scanoperator.c 131.4 KB
Newer Older
H
Haojun Liao 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

16
#include "executorimpl.h"
H
Haojun Liao 已提交
17
#include "filter.h"
18
#include "function.h"
19
#include "functionMgt.h"
L
Liu Jicong 已提交
20
#include "os.h"
H
Haojun Liao 已提交
21
#include "querynodes.h"
22
#include "systable.h"
H
Haojun Liao 已提交
23
#include "tname.h"
24
#include "ttime.h"
H
Haojun Liao 已提交
25 26 27 28 29 30 31 32 33

#include "tdatablock.h"
#include "tmsg.h"

#include "query.h"
#include "tcompare.h"
#include "thash.h"
#include "ttypes.h"

D
dapan1121 已提交
34 35
// Global scan debug switch, 0 by default. It is not referenced in the visible part of this file.
int32_t scanDebug = 0;

D
dapan1121 已提交
36
// NOTE(review): presumably the table-count threshold for using multiple readers; its users are not
// in the visible part of this file -- confirm.
#define MULTI_READER_MAX_TABLE_NUM 5000
H
Haojun Liao 已提交
37
// Sets the scanFlag member of the pointed-to scan info to REVERSE_SCAN.
#define SET_REVERSE_SCAN_FLAG(_info) ((_info)->scanFlag = REVERSE_SCAN)
38
// Flips the order stored in the lvalue n in place: TSDB_ORDER_ASC becomes TSDB_ORDER_DESC, any other
// value becomes TSDB_ORDER_ASC. The argument is evaluated twice, so it must be free of side effects.
#define SWITCH_ORDER(n)              (((n) = ((n) == TSDB_ORDER_ASC) ? TSDB_ORDER_DESC : TSDB_ORDER_ASC))
39

H
Haojun Liao 已提交
40 41 42 43 44 45 46 47 48
// Bundles a file-block load recorder with the sort execution info.
// NOTE(review): presumably the execution statistics reported by the table merge scan operator -- confirm.
typedef struct STableMergeScanExecInfo {
  SFileBlockLoadRecorder blockRecorder;
  SSortExecInfo          sortExecInfo;
} STableMergeScanExecInfo;

typedef struct STableMergeScanSortSourceParam {
  SOperatorInfo* pOperator;
  int32_t        readerIdx;
  uint64_t       uid;
49
  SSDataBlock*   inputBlock;  
D
dapan1121 已提交
50
  bool           multiReader;
51
  STsdbReader*   dataReader;
H
Haojun Liao 已提交
52 53
} STableMergeScanSortSourceParam;

54 55 56 57 58 59 60 61 62 63
// State kept by the table count scan operator.
// NOTE(review): the operator implementation is outside the visible part of the file; field roles
// other than the one documented inline are not established here.
typedef struct STableCountScanOperatorInfo {
  SReadHandle  readHandle;
  SSDataBlock* pRes;

  STableCountScanSupp supp;

  int32_t currGrpIdx;
  SArray* stbUidList;  // when group by db_name and/or stable_name
} STableCountScanOperatorInfo;

L
Liu Jicong 已提交
64
static bool processBlockWithProbability(const SSampleExecInfo* pInfo);

// Decides whether the current data block should be processed by the scan.
//
// Sampling is compiled out at the moment (the "#if 0" branch), so every block is accepted and pInfo
// is not read. The disabled branch keeps the intended rule: accept everything for a ratio of 1,
// otherwise accept when a pseudo random value is a multiple of 1/sampleRatio.
bool processBlockWithProbability(const SSampleExecInfo* pInfo) {
#if 0
  if (pInfo->sampleRatio == 1) {
    return true;
  }

  uint32_t randomVal = taosRandR((uint32_t*) &pInfo->seed);
  uint32_t period = (uint32_t)(1/pInfo->sampleRatio);
  return (randomVal % period) == 0;
#else
  return true;
#endif
}

79
// Flips the order member (ascending <-> descending, see SWITCH_ORDER) of the first numOfOutput
// function contexts in pCtx. Nothing is touched when numOfOutput is not positive.
static void switchCtxOrder(SqlFunctionCtx* pCtx, int32_t numOfOutput) {
  int32_t idx = 0;
  while (idx < numOfOutput) {
    SWITCH_ORDER(pCtx[idx].order);
    idx += 1;
  }
}

85 86 87 88 89 90 91 92 93
// Moves the time window *tw one step in the direction given by order.
//
// For every interval unit except 'n' and 'y' the start key is shifted by sliding * factor and the
// end key becomes start + interval - 1.
//
// For the 'n' and 'y' units the step is expressed in months (interval * 12 for 'y'): the start key is
// converted to a broken-down local time, its month count is shifted by interval * factor, and the
// result is converted back to the interval precision. The end key is the start of the window that
// begins another `interval` months later, minus one tick.
//
// The statement order matters: the second taosMktime() call works on the struct tm that the first
// call has already been given.
static void getNextTimeWindow(SInterval* pInterval, STimeWindow* tw, int32_t order) {
  int32_t factor = GET_FORWARD_DIRECTION_FACTOR(order);

  bool calendarUnit = (pInterval->intervalUnit == 'n') || (pInterval->intervalUnit == 'y');
  if (!calendarUnit) {
    tw->skey += pInterval->sliding * factor;
    tw->ekey = tw->skey + pInterval->interval - 1;
    return;
  }

  int64_t months = pInterval->interval;

  // convert the start key to seconds
  int64_t startSec = convertTimePrecision(tw->skey, pInterval->precision, TSDB_TIME_PRECISION_MILLI) / 1000;

  if (pInterval->intervalUnit == 'y') {
    months *= 12;
  }

  struct tm tm;
  time_t    t = (time_t)startSec;
  taosLocalTime(&t, &tm);

  // start of the next window
  int mon = (int)(tm.tm_year * 12 + tm.tm_mon + months * factor);
  tm.tm_year = mon / 12;
  tm.tm_mon = mon % 12;
  tw->skey = convertTimePrecision((int64_t)taosMktime(&tm) * 1000LL, TSDB_TIME_PRECISION_MILLI, pInterval->precision);

  // start of the window after that one; the end key is one tick before it
  mon = (int)(mon + months);
  tm.tm_year = mon / 12;
  tm.tm_mon = mon % 12;
  tw->ekey = convertTimePrecision((int64_t)taosMktime(&tm) * 1000LL, TSDB_TIME_PRECISION_MILLI, pInterval->precision);

  tw->ekey -= 1;
}

118
// Tells whether the time range of a data block spans more than one interval window, walking the
// windows in the scan direction given by order.
//
// Returns false when pInterval->interval is 0 (no interval operator upstream), or when the aligned
// window that holds the first key in scan direction also covers the rest of the block. Returns true
// as soon as a window boundary is found inside the block range.
//
// All invariants are checked with the project ASSERT macro; the descending branch used the plain
// assert() before, unlike the rest of this function.
static bool overlapWithTimeWindow(SInterval* pInterval, SDataBlockInfo* pBlockInfo, int32_t order) {
  STimeWindow w = {0};

  // 0 by default, which means it is not a interval operator of the upstream operator.
  if (pInterval->interval == 0) {
    return false;
  }

  if (order == TSDB_ORDER_ASC) {
    // window aligned to the first key of the block
    w = getAlignQueryTimeWindow(pInterval, pInterval->precision, pBlockInfo->window.skey);
    ASSERT(w.ekey >= pBlockInfo->window.skey);

    // the window ends before the block does: the block crosses the window boundary
    if (w.ekey < pBlockInfo->window.ekey) {
      return true;
    }

    while (1) {
      getNextTimeWindow(pInterval, &w, order);
      if (w.skey > pBlockInfo->window.ekey) {
        break;
      }

      ASSERT(w.ekey > pBlockInfo->window.ekey);
      if (TMAX(w.skey, pBlockInfo->window.skey) <= pBlockInfo->window.ekey) {
        return true;
      }
    }
  } else {
    // window aligned to the last key of the block
    w = getAlignQueryTimeWindow(pInterval, pInterval->precision, pBlockInfo->window.ekey);
    ASSERT(w.skey <= pBlockInfo->window.ekey);

    // the window starts after the block does: the block crosses the window boundary
    if (w.skey > pBlockInfo->window.skey) {
      return true;
    }

    while (1) {
      getNextTimeWindow(pInterval, &w, order);
      if (w.ekey < pBlockInfo->window.skey) {
        break;
      }

      ASSERT(w.skey < pBlockInfo->window.skey);
      if (pBlockInfo->window.skey <= TMIN(w.ekey, pBlockInfo->window.ekey)) {
        return true;
      }
    }
  }

  return false;
}

169 170 171 172 173 174 175 176 177 178 179
// Used by the table scanner to look up the temporary result row that the upstream aggregate keeps
// for the given group.
//
// Returns NULL when the operator is not a table scan, when no row position is registered for the
// group, or when the buffer page cannot be fetched. On success *pPage receives the buffer page that
// holds the returned row; the visible caller releases it with releaseBufPage().
static SResultRow* getTableGroupOutputBuf(SOperatorInfo* pOperator, uint64_t groupId, SFilePage** pPage) {
  if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN) {
    return NULL;
  }

  STableScanInfo* pScan = pOperator->info;

  // build the hash key of the result row from the group id
  int64_t keyBuf[2] = {0};
  SET_RES_WINDOW_KEY((char*)keyBuf, &groupId, sizeof(groupId), groupId);

  SResultRowPosition* pPos = (SResultRowPosition*)tSimpleHashGet(
      pScan->base.pdInfo.pAggSup->pResultRowHashTable, keyBuf, GET_RES_WINDOW_KEY_LEN(sizeof(groupId)));
  if (pPos == NULL) {
    return NULL;
  }

  *pPage = getBufPage(pScan->base.pdInfo.pAggSup->pResultBuf, pPos->pageId);
  if (*pPage == NULL) {
    return NULL;
  }

  char* pRowStart = (char*)(*pPage) + pPos->offset;
  return (SResultRow*)pRowStart;
}

// Checks, against the intermediate results already computed for the block's group, whether the block
// still has to be loaded.
//
// *status is set to FUNC_DATA_REQUIRED_NOT_LOAD only when every pushed-down function reports, through
// fmFuncDynDataRequired(), that it does not need the block; otherwise *status is left untouched.
// Nothing is done when no pushed-down expression support exists or no result row is found.
// Always returns TSDB_CODE_SUCCESS.
static int32_t doDynamicPruneDataBlock(SOperatorInfo* pOperator, SDataBlockInfo* pBlockInfo, uint32_t* status) {
  STableScanInfo* pScan = pOperator->info;

  SExprSupp* pExprSup = pScan->base.pdInfo.pExprSup;
  if (pExprSup == NULL) {
    return TSDB_CODE_SUCCESS;
  }

  SFilePage*  pPage = NULL;
  SResultRow* pRow = getTableGroupOutputBuf(pOperator, pBlockInfo->id.groupId, &pPage);
  if (pRow == NULL) {
    return TSDB_CODE_SUCCESS;
  }

  // stop at the first function that still needs the block
  bool mustLoad = false;
  for (int32_t idx = 0; idx < pExprSup->numOfExprs && !mustLoad; ++idx) {
    int32_t              funcId = pExprSup->pCtx[idx].functionId;
    SResultRowEntryInfo* pEntry = getResultEntryInfo(pRow, idx, pScan->base.pdInfo.pExprSup->rowEntryInfoOffset);

    mustLoad = (fmFuncDynDataRequired(funcId, pEntry, &pBlockInfo->window) != FUNC_DATA_REQUIRED_NOT_LOAD);
  }

  // release buffer pages
  releaseBufPage(pScan->base.pdInfo.pAggSup->pResultBuf, pPage);

  if (!mustLoad) {
    *status = FUNC_DATA_REQUIRED_NOT_LOAD;
  }

  return TSDB_CODE_SUCCESS;
}

H
Haojun Liao 已提交
234
// Evaluates the filter against the per-column block statistics (SMA).
//
// Returns the verdict of filterRangeExecute(). The block is kept (true) without consulting the
// filter when either the statistics or the filter is missing.
static bool doFilterByBlockSMA(SFilterInfo* pFilterInfo, SColumnDataAgg** pColsAgg, int32_t numOfCols,
                               int32_t numOfRows) {
  bool canEvaluate = (pColsAgg != NULL) && (pFilterInfo != NULL);
  if (!canEvaluate) {
    return true;
  }

  return filterRangeExecute(pFilterInfo, pColsAgg, numOfCols, numOfRows);
}

H
Haojun Liao 已提交
244
// Loads the block statistics (SMA) of the current data block through the tsdb reader.
//
// Returns true when the reader reports that all columns have statistics, false otherwise.
// A failure of the reader call does not return: it long-jumps to the task environment with the
// error code.
static bool doLoadBlockSMA(STableScanBase* pTableScanInfo, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo) {
  bool hasAllAgg = true;

  int32_t rc = tsdbRetrieveDatablockSMA(pTableScanInfo->dataReader, pBlock, &hasAllAgg);
  if (rc != TSDB_CODE_SUCCESS) {
    T_LONG_JMP(pTaskInfo->env, rc);
  }

  return hasAllAgg;
}

H
Haojun Liao 已提交
257
// Fills the tag / pseudo columns of pBlock for `rows` rows, if the scan has any such expressions.
//
// A TSDB_CODE_PAR_TABLE_NOT_EXIST result is tolerated, because the table may have been dropped while
// the scan was running. Any other failure long-jumps to the task environment. terrno is cleared
// afterwards whenever pseudo expressions exist.
static void doSetTagColumnData(STableScanBase* pTableScanInfo, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo,
                               int32_t rows) {
  SExprSupp* pPseudoSup = &pTableScanInfo->pseudoSup;
  if (pPseudoSup->numOfExprs <= 0) {
    return;
  }

  int32_t rc = addTagPseudoColumnData(&pTableScanInfo->readHandle, pPseudoSup->pExprInfo, pPseudoSup->numOfExprs,
                                      pBlock, rows, GET_TASKID(pTaskInfo), &pTableScanInfo->metaCache);

  // ignore the table not exists error, since this table may have been dropped during the scan procedure.
  bool fatal = (rc != TSDB_CODE_SUCCESS) && (rc != TSDB_CODE_PAR_TABLE_NOT_EXIST);
  if (fatal) {
    T_LONG_JMP(pTaskInfo->env, rc);
  }

  // reset the error code.
  terrno = 0;
}

274
// Applies the remaining OFFSET and the LIMIT of pLimitInfo to pBlock and updates the counters.
//
// Offset: a block that lies entirely inside the remaining offset is emptied and false is returned.
// Otherwise the first remainOffset rows are trimmed and the offset is consumed.
// Limit: when the limit is not -1 and is reached by this block, only the rows that still fit are kept.
//
// Returns true exactly when the output limit has been reached.
bool applyLimitOffset(SLimitInfo* pLimitInfo, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo) {
  const char* id = GET_TASKID(pTaskInfo);
  SLimit*     pLimit = &pLimitInfo->limit;

  if (pLimitInfo->remainOffset > 0) {
    if (pLimitInfo->remainOffset >= pBlock->info.rows) {
      // the whole block is skipped by the offset
      pLimitInfo->remainOffset -= pBlock->info.rows;
      blockDataEmpty(pBlock);
      qDebug("current block ignore due to offset, current:%" PRId64 ", %s", pLimitInfo->remainOffset, id);
      return false;
    }

    blockDataTrimFirstRows(pBlock, pLimitInfo->remainOffset);
    pLimitInfo->remainOffset = 0;
  }

  bool limitHit = (pLimit->limit != -1) && (pLimit->limit <= (pLimitInfo->numOfOutputRows + pBlock->info.rows));
  if (limitHit) {
    // limit the output rows
    int32_t keep = (int32_t)(pLimit->limit - pLimitInfo->numOfOutputRows);
    blockDataKeepFirstNRows(pBlock, keep);
  }

  pLimitInfo->numOfOutputRows += pBlock->info.rows;

  if (limitHit) {
    qDebug("output limit %" PRId64 " has reached, %s", pLimit->limit, id);
  }

  return limitHit;
}

H
Haojun Liao 已提交
304
// Decides how the current data block of the reader has to be handled and loads it when needed.
//
// On return *status holds the final decision:
//   FUNC_DATA_REQUIRED_FILTEROUT  the block is dropped (by plan, by block SMA, or by dynamic pruning)
//   FUNC_DATA_REQUIRED_NOT_LOAD   only tag / pseudo columns are filled, the data is not loaded
//   FUNC_DATA_REQUIRED_SMA_LOAD   only the block statistics are loaded
//   FUNC_DATA_REQUIRED_DATA_LOAD  the data is loaded, then filter and limit / offset are applied
//
// The load counters in pTableScanInfo->readRecorder are updated along the way. The rows of the block
// are added to totalRows exactly once on entry; on the full-load path that amount is replaced by the
// number of rows that survive filtering and limit.
//
// Returns TSDB_CODE_SUCCESS, or terrno when the reader fails to deliver the block. Helpers called
// from here may long-jump to the task environment on errors.
static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanBase* pTableScanInfo, SSDataBlock* pBlock,
                             uint32_t* status) {
  SExecTaskInfo*          pTaskInfo = pOperator->pTaskInfo;
  SFileBlockLoadRecorder* pCost = &pTableScanInfo->readRecorder;

  pCost->totalBlocks += 1;
  pCost->totalRows += pBlock->info.rows;

  bool loadSMA = false;
  *status = pTableScanInfo->dataBlockLoadFlag;

  // a filter, or a block that spans several interval windows, always requires the data itself
  if (pOperator->exprSupp.pFilterInfo != NULL ||
      overlapWithTimeWindow(&pTableScanInfo->pdInfo.interval, &pBlock->info, pTableScanInfo->cond.order)) {
    (*status) = FUNC_DATA_REQUIRED_DATA_LOAD;
  }

  SDataBlockInfo* pBlockInfo = &pBlock->info;
  taosMemoryFreeClear(pBlock->pBlockAgg);

  if (*status == FUNC_DATA_REQUIRED_FILTEROUT) {
    qDebug("%s data block filter out, brange:%" PRId64 "-%" PRId64 ", rows:%" PRId64, GET_TASKID(pTaskInfo),
           pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows);
    // the rows of this block were already added to totalRows on entry; do not count them again
    pCost->filterOutBlocks += 1;
    tsdbReleaseDataBlock(pTableScanInfo->dataReader);
    return TSDB_CODE_SUCCESS;
  } else if (*status == FUNC_DATA_REQUIRED_NOT_LOAD) {
    qDebug("%s data block skipped, brange:%" PRId64 "-%" PRId64 ", rows:%" PRId64 ", uid:%" PRIu64, GET_TASKID(pTaskInfo),
           pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows, pBlockInfo->id.uid);
    doSetTagColumnData(pTableScanInfo, pBlock, pTaskInfo, 1);
    pCost->skipBlocks += 1;
    tsdbReleaseDataBlock(pTableScanInfo->dataReader);
    return TSDB_CODE_SUCCESS;
  } else if (*status == FUNC_DATA_REQUIRED_SMA_LOAD) {
    pCost->loadBlockStatis += 1;
    loadSMA = true;  // mark the operation of load sma;
    bool success = doLoadBlockSMA(pTableScanInfo, pBlock, pTaskInfo);
    if (success) {
      qDebug("%s data block SMA loaded, brange:%" PRId64 "-%" PRId64 ", rows:%" PRId64, GET_TASKID(pTaskInfo),
             pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows);
      doSetTagColumnData(pTableScanInfo, pBlock, pTaskInfo, 1);
      tsdbReleaseDataBlock(pTableScanInfo->dataReader);
      return TSDB_CODE_SUCCESS;
    } else {
      // failed to load the block sma data, data block statistics does not exist, load data block instead
      qDebug("%s failed to load SMA, since not all columns have SMA", GET_TASKID(pTaskInfo));
      *status = FUNC_DATA_REQUIRED_DATA_LOAD;
    }
  }

  ASSERT(*status == FUNC_DATA_REQUIRED_DATA_LOAD);

  // try to filter data block according to sma info
  if (pOperator->exprSupp.pFilterInfo != NULL && (!loadSMA)) {
    bool success = doLoadBlockSMA(pTableScanInfo, pBlock, pTaskInfo);
    if (success) {
      size_t size = taosArrayGetSize(pBlock->pDataBlock);
      bool   keep = doFilterByBlockSMA(pOperator->exprSupp.pFilterInfo, pBlock->pBlockAgg, size, pBlockInfo->rows);
      if (!keep) {
        qDebug("%s data block filter out by block SMA, brange:%" PRId64 "-%" PRId64 ", rows:%" PRId64, GET_TASKID(pTaskInfo),
               pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows);
        pCost->filterOutBlocks += 1;
        (*status) = FUNC_DATA_REQUIRED_FILTEROUT;

        tsdbReleaseDataBlock(pTableScanInfo->dataReader);
        return TSDB_CODE_SUCCESS;
      }
    }
  }

  // free the sma info, since it should not be involved in later computing process.
  taosMemoryFreeClear(pBlock->pBlockAgg);

  // try to filter data block according to current results
  doDynamicPruneDataBlock(pOperator, pBlockInfo, status);
  if (*status == FUNC_DATA_REQUIRED_NOT_LOAD) {
    qDebug("%s data block skipped due to dynamic prune, brange:%" PRId64 "-%" PRId64 ", rows:%" PRId64, GET_TASKID(pTaskInfo),
           pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows);
    pCost->skipBlocks += 1;
    tsdbReleaseDataBlock(pTableScanInfo->dataReader);
    // reported to the caller as filtered out, so that it moves on to the next block
    *status = FUNC_DATA_REQUIRED_FILTEROUT;
    return TSDB_CODE_SUCCESS;
  }

  pCost->totalCheckedRows += pBlock->info.rows;
  pCost->loadBlocks += 1;

  SSDataBlock* p = tsdbRetrieveDataBlock(pTableScanInfo->dataReader, NULL);
  if (p == NULL) {
    return terrno;
  }

  ASSERT(p == pBlock);
  doSetTagColumnData(pTableScanInfo, pBlock, pTaskInfo, pBlock->info.rows);

  // restore the previous value; the surviving rows are added back at the end of this function
  pCost->totalRows -= pBlock->info.rows;

  if (pOperator->exprSupp.pFilterInfo != NULL) {
    int64_t st = taosGetTimestampUs();
    doFilter(pBlock, pOperator->exprSupp.pFilterInfo, &pTableScanInfo->matchInfo);

    // elapsed filter time in milliseconds
    double el = (taosGetTimestampUs() - st) / 1000.0;
    pTableScanInfo->readRecorder.filterTime += el;

    if (pBlock->info.rows == 0) {
      pCost->filterOutBlocks += 1;
      qDebug("%s data block filter out, brange:%" PRId64 "-%" PRId64 ", rows:%" PRId64 ", elapsed time:%.2f ms",
             GET_TASKID(pTaskInfo), pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows, el);
    } else {
      qDebug("%s data block filter applied, elapsed time:%.2f ms", GET_TASKID(pTaskInfo), el);
    }
  }

  bool limitReached = applyLimitOffset(&pTableScanInfo->limitInfo, pBlock, pTaskInfo);
  if (limitReached) {  // set operator flag is done
    setOperatorCompleted(pOperator);
  }

  pCost->totalRows += pBlock->info.rows;
  return TSDB_CODE_SUCCESS;
}

H
Haojun Liao 已提交
425
// Switches the scan state to a descending pass: marks the scan as reverse, flips the order of the
// given function contexts, sets the query condition order to TSDB_ORDER_DESC and swaps the start and
// end key of the query time window.
static void prepareForDescendingScan(STableScanBase* pTableScanInfo, SqlFunctionCtx* pCtx, int32_t numOfOutput) {
  SET_REVERSE_SCAN_FLAG(pTableScanInfo);
  switchCtxOrder(pCtx, numOfOutput);

  pTableScanInfo->cond.order = TSDB_ORDER_DESC;
  TSWAP(pTableScanInfo->cond.twindows.skey, pTableScanInfo->cond.twindows.ekey);
}

434 435
typedef struct STableCachedVal {
  const char* pName;
436
  STag*       pTags;
437 438
} STableCachedVal;

439 440 441 442 443 444 445 446 447 448 449
// Releases a STableCachedVal together with the name and tag buffers it owns.
// A NULL argument is accepted and ignored.
static void freeTableCachedVal(void* param) {
  STableCachedVal* pCached = param;
  if (pCached != NULL) {
    taosMemoryFree((void*)pCached->pName);
    taosMemoryFree(pCached->pTags);
    taosMemoryFree(pCached);
  }
}

H
Haojun Liao 已提交
450 451
static STableCachedVal* createTableCacheVal(const SMetaReader* pMetaReader) {
  STableCachedVal* pVal = taosMemoryMalloc(sizeof(STableCachedVal));
452
  pVal->pName = taosStrdup(pMetaReader->me.name);
H
Haojun Liao 已提交
453 454 455 456
  pVal->pTags = NULL;

  // only child table has tag value
  if (pMetaReader->me.type == TSDB_CHILD_TABLE) {
457
    STag* pTag = (STag*)pMetaReader->me.ctbEntry.pTags;
H
Haojun Liao 已提交
458 459 460 461 462 463 464
    pVal->pTags = taosMemoryMalloc(pTag->len);
    memcpy(pVal->pTags, pTag, pTag->len);
  }

  return pVal;
}

465 466
// Deleter handed to the table meta LRU cache (signature: const void *key, size_t keyLen, void *value).
// Only the value is released; the key arguments are not used.
static void freeCachedMetaItem(const void* key, size_t keyLen, void* value) {
  (void)key;
  (void)keyLen;
  freeTableCachedVal(value);
}
467

468 469 470 471 472
// Sets the first pBlock->info.rows rows of every output column named by the expressions to NULL.
static void doSetNullValue(SSDataBlock* pBlock, const SExprInfo* pExpr, int32_t numOfExpr) {
  int32_t idx = 0;
  while (idx < numOfExpr) {
    int32_t          slotId = pExpr[idx].base.resSchema.slotId;
    SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slotId);

    colDataSetNNULL(pCol, 0, pBlock->info.rows);
    idx += 1;
  }
}

477 478
int32_t addTagPseudoColumnData(SReadHandle* pHandle, const SExprInfo* pExpr, int32_t numOfExpr, SSDataBlock* pBlock,
                               int32_t rows, const char* idStr, STableMetaCacheInfo* pCache) {
479
  // currently only the tbname pseudo column
480
  if (numOfExpr <= 0) {
H
Haojun Liao 已提交
481
    return TSDB_CODE_SUCCESS;
482 483
  }

484 485
  int32_t code = 0;

486 487 488 489
  // backup the rows
  int32_t backupRows = pBlock->info.rows;
  pBlock->info.rows = rows;

490
  bool            freeReader = false;
491
  STableCachedVal val = {0};
492 493

  SMetaReader mr = {0};
494
  LRUHandle*  h = NULL;
495

496 497 498
  // todo refactor: extract method
  // the handling of the null data should be packed in the extracted method

499
  // 1. check if it is existed in meta cache
500
  if (pCache == NULL) {
501
    metaReaderInit(&mr, pHandle->meta, 0);
H
Haojun Liao 已提交
502
    code = metaGetTableEntryByUidCache(&mr, pBlock->info.id.uid);
503
    if (code != TSDB_CODE_SUCCESS) {
504
      // when encounter the TSDB_CODE_PAR_TABLE_NOT_EXIST error, we proceed.
H
Haojun Liao 已提交
505
      if (terrno == TSDB_CODE_PAR_TABLE_NOT_EXIST) {
S
slzhou 已提交
506 507
        qWarn("failed to get table meta, table may have been dropped, uid:0x%" PRIx64 ", code:%s, %s",
              pBlock->info.id.uid, tstrerror(terrno), idStr);
508 509 510

        // append null value before return to caller, since the caller will ignore this error code and proceed
        doSetNullValue(pBlock, pExpr, numOfExpr);
H
Haojun Liao 已提交
511
      } else {
S
slzhou 已提交
512 513
        qError("failed to get table meta, uid:0x%" PRIx64 ", code:%s, %s", pBlock->info.id.uid, tstrerror(terrno),
               idStr);
H
Haojun Liao 已提交
514
      }
515 516 517 518 519
      metaReaderClear(&mr);
      return terrno;
    }

    metaReaderReleaseLock(&mr);
520

521 522
    val.pName = mr.me.name;
    val.pTags = (STag*)mr.me.ctbEntry.pTags;
523 524

    freeReader = true;
525
  } else {
526 527
    pCache->metaFetch += 1;

H
Haojun Liao 已提交
528
    h = taosLRUCacheLookup(pCache->pTableMetaEntryCache, &pBlock->info.id.uid, sizeof(pBlock->info.id.uid));
529 530
    if (h == NULL) {
      metaReaderInit(&mr, pHandle->meta, 0);
H
Haojun Liao 已提交
531
      code = metaGetTableEntryByUidCache(&mr, pBlock->info.id.uid);
532
      if (code != TSDB_CODE_SUCCESS) {
H
Haojun Liao 已提交
533
        if (terrno == TSDB_CODE_PAR_TABLE_NOT_EXIST) {
534
          qWarn("failed to get table meta, table may have been dropped, uid:0x%" PRIx64 ", code:%s, %s",
H
Haojun Liao 已提交
535
                pBlock->info.id.uid, tstrerror(terrno), idStr);
536 537
          // append null value before return to caller, since the caller will ignore this error code and proceed
          doSetNullValue(pBlock, pExpr, numOfExpr);
H
Haojun Liao 已提交
538
        } else {
H
Haojun Liao 已提交
539
          qError("failed to get table meta, uid:0x%" PRIx64 ", code:%s, %s", pBlock->info.id.uid, tstrerror(terrno),
540
                 idStr);
H
Haojun Liao 已提交
541
        }
542 543 544 545 546 547
        metaReaderClear(&mr);
        return terrno;
      }

      metaReaderReleaseLock(&mr);

H
Haojun Liao 已提交
548
      STableCachedVal* pVal = createTableCacheVal(&mr);
549

H
Haojun Liao 已提交
550
      val = *pVal;
551
      freeReader = true;
H
Haojun Liao 已提交
552

H
Haojun Liao 已提交
553
      int32_t ret = taosLRUCacheInsert(pCache->pTableMetaEntryCache, &pBlock->info.id.uid, sizeof(uint64_t), pVal,
554
                                       sizeof(STableCachedVal), freeCachedMetaItem, NULL, TAOS_LRU_PRIORITY_LOW);
555 556 557 558 559 560 561 562
      if (ret != TAOS_LRU_STATUS_OK) {
        qError("failed to put meta into lru cache, code:%d, %s", ret, idStr);
        freeTableCachedVal(pVal);
      }
    } else {
      pCache->cacheHit += 1;
      STableCachedVal* pVal = taosLRUCacheValue(pCache->pTableMetaEntryCache, h);
      val = *pVal;
H
Haojun Liao 已提交
563

H
Haojun Liao 已提交
564
      taosLRUCacheRelease(pCache->pTableMetaEntryCache, h, false);
565
    }
H
Haojun Liao 已提交
566

567 568
    qDebug("retrieve table meta from cache:%" PRIu64 ", hit:%" PRIu64 " miss:%" PRIu64 ", %s", pCache->metaFetch,
           pCache->cacheHit, (pCache->metaFetch - pCache->cacheHit), idStr);
H
Haojun Liao 已提交
569
  }
570

571 572
  for (int32_t j = 0; j < numOfExpr; ++j) {
    const SExprInfo* pExpr1 = &pExpr[j];
573
    int32_t          dstSlotId = pExpr1->base.resSchema.slotId;
574 575

    SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, dstSlotId);
D
dapan1121 已提交
576
    colInfoDataCleanup(pColInfoData, pBlock->info.rows);
577

578
    int32_t functionId = pExpr1->pExpr->_function.functionId;
579 580 581

    // this is to handle the tbname
    if (fmIsScanPseudoColumnFunc(functionId)) {
582
      setTbNameColData(pBlock, pColInfoData, functionId, val.pName);
583
    } else {  // these are tags
wmmhello's avatar
wmmhello 已提交
584
      STagVal tagVal = {0};
585 586
      tagVal.cid = pExpr1->base.pParam[0].pCol->colId;
      const char* p = metaGetTableTagVal(val.pTags, pColInfoData->info.type, &tagVal);
wmmhello's avatar
wmmhello 已提交
587

588 589 590 591
      char* data = NULL;
      if (pColInfoData->info.type != TSDB_DATA_TYPE_JSON && p != NULL) {
        data = tTagValToData((const STagVal*)p, false);
      } else {
wmmhello's avatar
wmmhello 已提交
592
        data = (char*)p;
wmmhello's avatar
wmmhello 已提交
593
      }
594

H
Haojun Liao 已提交
595 596
      bool isNullVal = (data == NULL) || (pColInfoData->info.type == TSDB_DATA_TYPE_JSON && tTagIsJsonNull(data));
      if (isNullVal) {
597
        colDataSetNNULL(pColInfoData, 0, pBlock->info.rows);
H
Haojun Liao 已提交
598
      } else if (pColInfoData->info.type != TSDB_DATA_TYPE_JSON) {
D
dapan1121 已提交
599
        code = colDataSetNItems(pColInfoData, 0, data, pBlock->info.rows, false);
H
Haojun Liao 已提交
600 601 602
        if (IS_VAR_DATA_TYPE(((const STagVal*)p)->type)) {
          taosMemoryFree(data);
        }
D
dapan1121 已提交
603 604 605 606 607 608
        if (code) {
          if (freeReader) {
            metaReaderClear(&mr);
          }
          return code;
        }
L
Liu Jicong 已提交
609
      } else {  // todo opt for json tag
H
Haojun Liao 已提交
610
        for (int32_t i = 0; i < pBlock->info.rows; ++i) {
611
          colDataSetVal(pColInfoData, i, data, false);
H
Haojun Liao 已提交
612
        }
613 614 615 616
      }
    }
  }

617 618
  // restore the rows
  pBlock->info.rows = backupRows;
619 620 621 622
  if (freeReader) {
    metaReaderClear(&mr);
  }

H
Haojun Liao 已提交
623
  return TSDB_CODE_SUCCESS;
624 625
}

H
Haojun Liao 已提交
626
// Writes the table name into pColInfoData by running the scalar function registered for functionId.
//
// The name is packed as a var-string into a temporary one-row column, which is passed as the source
// parameter (with pBlock->info.rows as row count) to the function's process callback. An error is
// logged when no callback is registered. The temporary column is destroyed before returning.
void setTbNameColData(const SSDataBlock* pBlock, SColumnInfoData* pColInfoData, int32_t functionId, const char* name) {
  struct SScalarFuncExecFuncs execFuncs = {0};
  fmGetScalarFuncExecFuncs(functionId, &execFuncs);

  // the name as var-string: length header followed by the characters
  char   nameBuf[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
  size_t bufLen = TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE;
  STR_TO_VARSTR(nameBuf, name);

  // temporary single-row source column
  SColumnInfoData srcCol = createColumnInfoData(TSDB_DATA_TYPE_VARCHAR, bufLen, 1);
  colInfoDataEnsureCapacity(&srcCol, 1, false);
  colDataSetVal(&srcCol, 0, nameBuf, false);

  SScalarParam input = {.numOfRows = pBlock->info.rows, .columnData = &srcCol};
  SScalarParam output = {.columnData = pColInfoData};

  if (execFuncs.process == NULL) {
    qError("failed to get the corresponding callback function, functionId:%d", functionId);
  } else {
    execFuncs.process(&input, 1, &output);
  }

  colDataDestroy(&srcCol);
}

651
// Fetches data blocks from the tsdb reader until one with rows to output is found.
//
// Returns the operator's result block holding the next qualified data, or NULL when the reader has
// no more blocks or the operator has already been marked as done. Reader errors, load errors and a
// killed task do not return: they long-jump to the task environment.
//
// The function-level `code` is reused for the load step as well; it used to be shadowed by a second
// declaration inside the loop.
static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) {
  STableScanInfo* pTableScanInfo = pOperator->info;
  SExecTaskInfo*  pTaskInfo = pOperator->pTaskInfo;
  SSDataBlock*    pBlock = pTableScanInfo->pResBlock;
  bool            hasNext = false;
  int32_t         code = TSDB_CODE_SUCCESS;

  int64_t st = taosGetTimestampUs();

  while (true) {
    code = tsdbNextDataBlock(pTableScanInfo->base.dataReader, &hasNext);
    if (code) {
      tsdbReleaseDataBlock(pTableScanInfo->base.dataReader);
      T_LONG_JMP(pTaskInfo->env, code);
    }

    if (!hasNext) {
      break;
    }

    if (isTaskKilled(pTaskInfo)) {
      tsdbReleaseDataBlock(pTableScanInfo->base.dataReader);
      T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
    }

    if (pOperator->status == OP_EXEC_DONE) {
      tsdbReleaseDataBlock(pTableScanInfo->base.dataReader);
      break;
    }

    // process this data block based on the probabilities
    bool processThisBlock = processBlockWithProbability(&pTableScanInfo->sample);
    if (!processThisBlock) {
      continue;
    }

    if (pBlock->info.id.uid) {
      pBlock->info.id.groupId = getTableGroupId(pTableScanInfo->base.pTableListInfo, pBlock->info.id.uid);
    }

    uint32_t status = 0;
    code = loadDataBlock(pOperator, &pTableScanInfo->base, pBlock, &status);
    if (code != TSDB_CODE_SUCCESS) {
      T_LONG_JMP(pTaskInfo->env, code);
    }

    // current block is filter out according to filter condition, continue load the next block
    if (status == FUNC_DATA_REQUIRED_FILTEROUT || pBlock->info.rows == 0) {
      continue;
    }

    // publish the statistics; the elapsed time is accumulated in milliseconds
    pOperator->resultInfo.totalRows = pTableScanInfo->base.readRecorder.totalRows;
    pTableScanInfo->base.readRecorder.elapsedTime += (taosGetTimestampUs() - st) / 1000.0;

    pOperator->cost.totalCost = pTableScanInfo->base.readRecorder.elapsedTime;

    // todo refactor
    /*pTableScanInfo->lastStatus.uid = pBlock->info.id.uid;*/
    /*pTableScanInfo->lastStatus.ts = pBlock->info.window.ekey;*/
    pTaskInfo->streamInfo.lastStatus.type = TMQ_OFFSET__SNAPSHOT_DATA;
    pTaskInfo->streamInfo.lastStatus.uid = pBlock->info.id.uid;
    pTaskInfo->streamInfo.lastStatus.ts = pBlock->info.window.ekey;

    return pBlock;
  }
  return NULL;
}

H
Haojun Liao 已提交
719
// Produces the next block for the current table group, running the configured number of ascending
// passes first and the descending passes afterwards.
//
// Returns NULL when the reader has not been created, when the operator is done, or when all passes
// are exhausted. After each exhausted pass that is followed by another one, the task status is reset
// to TASK_NOT_COMPLETED and the reader is reset with the current query condition.
static SSDataBlock* doGroupedTableScan(SOperatorInfo* pOperator) {
  STableScanInfo* pScan = pOperator->info;
  SExecTaskInfo*  pTask = pOperator->pTaskInfo;

  // The read handle is not initialized yet, since no qualified tables exists
  if (pScan->base.dataReader == NULL || pOperator->status == OP_EXEC_DONE) {
    return NULL;
  }

  // do the ascending order traverse in the first place.
  while (pScan->scanTimes < pScan->scanInfo.numOfAsc) {
    SSDataBlock* pRes = doTableScanImpl(pOperator);
    if (pRes != NULL) {
      return pRes;
    }

    pScan->scanTimes += 1;
    if (pScan->scanTimes >= pScan->scanInfo.numOfAsc) {
      break;  // ascending passes are finished
    }

    setTaskStatus(pTask, TASK_NOT_COMPLETED);
    pScan->base.scanFlag = MAIN_SCAN;
    pScan->base.dataBlockLoadFlag = FUNC_DATA_REQUIRED_DATA_LOAD;
    qDebug("start to repeat ascending order scan data blocks due to query func required, %s", GET_TASKID(pTask));

    // do prepare for the next round table scan operation
    tsdbReaderReset(pScan->base.dataReader, &pScan->base.cond);
  }

  int32_t total = pScan->scanInfo.numOfAsc + pScan->scanInfo.numOfDesc;
  if (pScan->scanTimes >= total) {
    return NULL;
  }

  // switch to descending order once, on entering the descending passes
  if (pScan->base.cond.order == TSDB_ORDER_ASC) {
    prepareForDescendingScan(&pScan->base, pOperator->exprSupp.pCtx, 0);
    tsdbReaderReset(pScan->base.dataReader, &pScan->base.cond);
    qDebug("%s start to descending order scan data blocks due to query func required", GET_TASKID(pTask));
  }

  while (pScan->scanTimes < total) {
    SSDataBlock* pRes = doTableScanImpl(pOperator);
    if (pRes != NULL) {
      return pRes;
    }

    pScan->scanTimes += 1;
    if (pScan->scanTimes >= total) {
      break;  // descending passes are finished
    }

    setTaskStatus(pTask, TASK_NOT_COMPLETED);
    pScan->base.scanFlag = MAIN_SCAN;

    qDebug("%s start to repeat descending order scan data blocks", GET_TASKID(pTask));
    tsdbReaderReset(pScan->base.dataReader, &pScan->base.cond);
  }

  return NULL;
}

/*
 * Produce the next result block of a table scan operator, or NULL when no more data is available.
 *
 * Two modes are supported:
 *  - TABLE_SCAN__TABLE_ORDER: tables are scanned one at a time. When the current table yields
 *    nothing, the next entry of the table list is installed into the reader and the scan restarts.
 *  - otherwise: tables are scanned group by group. The reader is opened lazily for the first
 *    group; for each following group the reader's table list is replaced and the reader is reset.
 *
 * A reader-open failure is reported through T_LONG_JMP on the task's jump environment.
 */
static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
  STableScanInfo* pInfo = pOperator->info;
  SExecTaskInfo*  pTaskInfo = pOperator->pTaskInfo;

  // scan table one by one sequentially
  if (pInfo->scanMode == TABLE_SCAN__TABLE_ORDER) {
    int32_t       numOfTables = 0;
    STableKeyInfo tInfo = {0};

    while (1) {
      SSDataBlock* result = doGroupedTableScan(pOperator);
      if (result || (pOperator->status == OP_EXEC_DONE) || isTaskKilled(pTaskInfo)) {
        return result;
      }

      // if no data, switch to next table and continue scan
      pInfo->currentTable++;

      // the table list is read under the task latch; the entry is copied out so that the
      // latch can be released before the reader is touched
      taosRLockLatch(&pTaskInfo->lock);
      numOfTables = tableListGetSize(pInfo->base.pTableListInfo);

      if (pInfo->currentTable >= numOfTables) {
        qDebug("all table checked in table list, total:%d, return NULL, %s", numOfTables, GET_TASKID(pTaskInfo));
        taosRUnLockLatch(&pTaskInfo->lock);
        return NULL;
      }

      tInfo = *(STableKeyInfo*)tableListGetInfo(pInfo->base.pTableListInfo, pInfo->currentTable);
      taosRUnLockLatch(&pTaskInfo->lock);

      tsdbSetTableList(pInfo->base.dataReader, &tInfo, 1);
      qDebug("set uid:%" PRIu64 " into scanner, total tables:%d, index:%d/%d %s", tInfo.uid, numOfTables,
             pInfo->currentTable, numOfTables, GET_TASKID(pTaskInfo));

      tsdbReaderReset(pInfo->base.dataReader, &pInfo->base.cond);
      pInfo->scanTimes = 0;
    }
  } else {  // scan table group by group sequentially
    if (pInfo->currentGroupId == -1) {
      if ((++pInfo->currentGroupId) >= tableListGetOutputGroups(pInfo->base.pTableListInfo)) {
        setOperatorCompleted(pOperator);
        return NULL;
      }

      int32_t        num = 0;
      STableKeyInfo* pList = NULL;
      tableListGetGroupList(pInfo->base.pTableListInfo, pInfo->currentGroupId, &pList, &num);
      ASSERT(pInfo->base.dataReader == NULL);

      int32_t code = tsdbReaderOpen(pInfo->base.readHandle.vnode, &pInfo->base.cond, pList, num, pInfo->pResBlock,
                                    (STsdbReader**)&pInfo->base.dataReader, GET_TASKID(pTaskInfo), pInfo->countOnly);
      if (code != TSDB_CODE_SUCCESS) {
        T_LONG_JMP(pTaskInfo->env, code);
      }

      if (pInfo->pResBlock->info.capacity > pOperator->resultInfo.capacity) {
        pOperator->resultInfo.capacity = pInfo->pResBlock->info.capacity;
      }
    }

    SSDataBlock* result = doGroupedTableScan(pOperator);
    if (result != NULL) {
      return result;
    }

    // The current group is exhausted. Keep moving to the following groups until one of them
    // produces data; stopping after the first empty group would drop every group behind it.
    while (1) {
      if (isTaskKilled(pTaskInfo) ||
          (++pInfo->currentGroupId) >= tableListGetOutputGroups(pInfo->base.pTableListInfo)) {
        setOperatorCompleted(pOperator);
        return NULL;
      }

      // reset value for the next group data output
      pOperator->status = OP_OPENED;
      resetLimitInfoForNextGroup(&pInfo->base.limitInfo);

      int32_t        num = 0;
      STableKeyInfo* pList = NULL;
      tableListGetGroupList(pInfo->base.pTableListInfo, pInfo->currentGroupId, &pList, &num);

      tsdbSetTableList(pInfo->base.dataReader, pList, num);
      tsdbReaderReset(pInfo->base.dataReader, &pInfo->base.cond);
      pInfo->scanTimes = 0;

      result = doGroupedTableScan(pOperator);
      if (result != NULL) {
        return result;
      }
    }
  }
}

869 870
static int32_t getTableScannerExecInfo(struct SOperatorInfo* pOptr, void** pOptrExplain, uint32_t* len) {
  SFileBlockLoadRecorder* pRecorder = taosMemoryCalloc(1, sizeof(SFileBlockLoadRecorder));
871
  STableScanInfo*         pTableScanInfo = pOptr->info;
H
Haojun Liao 已提交
872
  *pRecorder = pTableScanInfo->base.readRecorder;
873 874 875 876 877
  *pOptrExplain = pRecorder;
  *len = sizeof(SFileBlockLoadRecorder);
  return 0;
}

878 879
/*
 * Release everything held by a STableScanBase: the query condition, the data reader,
 * the column match list, the table list, the table-meta LRU cache and the pseudo-column
 * expression support. The base structure itself is not freed here.
 */
static void destroyTableScanBase(STableScanBase* pBase) {
  cleanupQueryTableDataCond(&pBase->cond);

  // close the reader and drop the pointer so it cannot be reused afterwards
  tsdbReaderClose(pBase->dataReader);
  pBase->dataReader = NULL;

  if (NULL != pBase->matchInfo.pList) {
    taosArrayDestroy(pBase->matchInfo.pList);
  }

  tableListDestroy(pBase->pTableListInfo);

  taosLRUCacheCleanup(pBase->metaCache.pTableMetaEntryCache);

  cleanupExprSupp(&pBase->pseudoSup);
}

static void destroyTableScanOperatorInfo(void* param) {
  STableScanInfo* pTableScanInfo = (STableScanInfo*)param;
  blockDataDestroy(pTableScanInfo->pResBlock);
  destroyTableScanBase(&pTableScanInfo->base);
D
dapan1121 已提交
897
  taosMemoryFreeClear(param);
898 899
}

900
SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, SReadHandle* readHandle,
901
                                           STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo) {
H
Haojun Liao 已提交
902 903 904
  STableScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STableScanInfo));
  SOperatorInfo*  pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
  if (pInfo == NULL || pOperator == NULL) {
905
    goto _error;
H
Haojun Liao 已提交
906 907
  }

908
  SScanPhysiNode*     pScanNode = &pTableScanNode->scan;
H
Haojun Liao 已提交
909
  SDataBlockDescNode* pDescNode = pScanNode->node.pOutputDataBlockDesc;
910 911

  int32_t numOfCols = 0;
912
  int32_t code =
H
Haojun Liao 已提交
913
      extractColMatchInfo(pScanNode->pScanCols, pDescNode, &numOfCols, COL_MATCH_FROM_COL_ID, &pInfo->base.matchInfo);
914 915 916 917
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

H
Haojun Liao 已提交
918
  initLimitInfo(pScanNode->node.pLimit, pScanNode->node.pSlimit, &pInfo->base.limitInfo);
H
Haojun Liao 已提交
919
  code = initQueryTableDataCond(&pInfo->base.cond, pTableScanNode);
920
  if (code != TSDB_CODE_SUCCESS) {
921
    goto _error;
922 923
  }

H
Haojun Liao 已提交
924
  if (pScanNode->pScanPseudoCols != NULL) {
H
Haojun Liao 已提交
925
    SExprSupp* pSup = &pInfo->base.pseudoSup;
H
Haojun Liao 已提交
926
    pSup->pExprInfo = createExprInfo(pScanNode->pScanPseudoCols, NULL, &pSup->numOfExprs);
927
    pSup->pCtx = createSqlFunctionCtx(pSup->pExprInfo, pSup->numOfExprs, &pSup->rowEntryInfoOffset);
928 929
  }

930
  pInfo->scanInfo = (SScanInfo){.numOfAsc = pTableScanNode->scanSeq[0], .numOfDesc = pTableScanNode->scanSeq[1]};
G
Ganlin Zhao 已提交
931
  pInfo->base.scanFlag = (pInfo->scanInfo.numOfAsc > 1) ? PRE_SCAN : MAIN_SCAN;
H
Haojun Liao 已提交
932

H
Haojun Liao 已提交
933 934
  pInfo->base.pdInfo.interval = extractIntervalInfo(pTableScanNode);
  pInfo->base.readHandle = *readHandle;
H
Haojun Liao 已提交
935 936
  pInfo->base.dataBlockLoadFlag = pTableScanNode->dataRequired;

937 938
  pInfo->sample.sampleRatio = pTableScanNode->ratio;
  pInfo->sample.seed = taosGetTimestampSec();
939

H
Haojun Liao 已提交
940
  initResultSizeInfo(&pOperator->resultInfo, 4096);
H
Haojun Liao 已提交
941
  pInfo->pResBlock = createDataBlockFromDescNode(pDescNode);
X
Xiaoyu Wang 已提交
942
  //  blockDataEnsureCapacity(pInfo->pResBlock, pOperator->resultInfo.capacity);
H
Haojun Liao 已提交
943

H
Haojun Liao 已提交
944 945 946
  code = filterInitFromNode((SNode*)pTableScanNode->scan.node.pConditions, &pOperator->exprSupp.pFilterInfo, 0);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
H
Haojun Liao 已提交
947 948
  }

wmmhello's avatar
wmmhello 已提交
949
  pInfo->currentGroupId = -1;
950
  pInfo->assignBlockUid = pTableScanNode->assignBlockUid;
951
  pInfo->hasGroupByTag = pTableScanNode->pGroupTags ? true : false;
952

L
Liu Jicong 已提交
953 954
  setOperatorInfo(pOperator, "TableScanOperator", QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN, false, OP_NOT_OPENED, pInfo,
                  pTaskInfo);
955
  pOperator->exprSupp.numOfExprs = numOfCols;
956

957
  pInfo->base.pTableListInfo = pTableListInfo;
H
Haojun Liao 已提交
958 959
  pInfo->base.metaCache.pTableMetaEntryCache = taosLRUCacheInit(1024 * 128, -1, .5);
  if (pInfo->base.metaCache.pTableMetaEntryCache == NULL) {
960 961 962
    code = terrno;
    goto _error;
  }
963

D
dapan1121 已提交
964 965 966 967
  if (scanDebug) {
    pInfo->countOnly = true;
  }

H
Haojun Liao 已提交
968
  taosLRUCacheSetStrictCapacity(pInfo->base.metaCache.pTableMetaEntryCache, false);
969 970
  pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doTableScan, NULL, destroyTableScanOperatorInfo,
                                         optrDefaultBufFn, getTableScannerExecInfo);
971 972 973

  // for non-blocking operator, the open cost is always 0
  pOperator->cost.openCost = 0;
H
Haojun Liao 已提交
974
  return pOperator;
975

976
_error:
977 978 979
  if (pInfo != NULL) {
    destroyTableScanOperatorInfo(pInfo);
  }
980

981 982
  taosMemoryFreeClear(pOperator);
  pTaskInfo->code = code;
983
  return NULL;
H
Haojun Liao 已提交
984 985
}

986
/*
 * Build a sequential table scan operator around an already opened reader.
 *
 * @param pReadHandle  reader handle stored as the operator's data reader
 * @param pTaskInfo    owning task; its `code` field is set when allocation fails
 * @return the new operator, or NULL if memory could not be allocated
 */
SOperatorInfo* createTableSeqScanOperatorInfo(void* pReadHandle, SExecTaskInfo* pTaskInfo) {
  STableScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STableScanInfo));
  SOperatorInfo*  pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
  if (pInfo == NULL || pOperator == NULL) {
    // either allocation may have succeeded; release whichever one did
    taosMemoryFreeClear(pInfo);
    taosMemoryFreeClear(pOperator);
    pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  pInfo->base.dataReader = pReadHandle;

  setOperatorInfo(pOperator, "TableSeqScanOperator", QUERY_NODE_PHYSICAL_PLAN_TABLE_SEQ_SCAN, false, OP_NOT_OPENED,
                  pInfo, pTaskInfo);
  pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doTableScanImpl, NULL, NULL, optrDefaultBufFn, NULL);
  return pOperator;
}

999
/*
 * Drop all blocks buffered in the stream scan's block list and rewind the
 * valid-block cursor to the beginning. The number of dropped entries is logged.
 */
FORCE_INLINE void doClearBufferedBlocks(SStreamScanInfo* pInfo) {
  int32_t numOfBuffered = (int32_t)taosArrayGetSize(pInfo->pBlockLists);
  qDebug("clear buff blocks:%d", numOfBuffered);

  pInfo->validBlockIndex = 0;
  taosArrayClear(pInfo->pBlockLists);
}

1005
/* True when the operator consuming this stream scan is a stream session window. */
static bool isSessionWindow(SStreamScanInfo* pInfo) {
  if (pInfo->windowSup.parentType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION) {
    return true;
  }
  return false;
}

1009
/* True when the operator consuming this stream scan is a stream state window. */
static bool isStateWindow(SStreamScanInfo* pInfo) {
  if (pInfo->windowSup.parentType == QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE) {
    return true;
  }
  return false;
}
5
54liuyao 已提交
1012

L
Liu Jicong 已提交
1013
/*
 * True when the operator consuming this stream scan is any stream interval
 * variant: plain, semi or final interval.
 */
static bool isIntervalWindow(SStreamScanInfo* pInfo) {
  switch (pInfo->windowSup.parentType) {
    case QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL:
    case QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL:
    case QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL:
      return true;
    default:
      return false;
  }
}

/* True only for the plain stream interval parent (not the semi/final variants). */
static bool isSignleIntervalWindow(SStreamScanInfo* pInfo) {
  if (pInfo->windowSup.parentType == QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL) {
    return true;
  }
  return false;
}

1023 1024 1025 1026
/* True for an interval-window parent whose interval length differs from its sliding step. */
static bool isSlidingWindow(SStreamScanInfo* pInfo) {
  if (!isIntervalWindow(pInfo)) {
    return false;
  }
  return pInfo->interval.interval != pInfo->interval.sliding;
}

1027
/*
 * Store in pInfo->groupId the value found at row `rowIndex` of column `groupColIndex`
 * of pBlock. The column data is read as an array of uint64_t.
 */
static void setGroupId(SStreamScanInfo* pInfo, SSDataBlock* pBlock, int32_t groupColIndex, int32_t rowIndex) {
  SColumnInfoData* pGroupCol = taosArrayGet(pBlock->pDataBlock, groupColIndex);
  ASSERT(rowIndex < pBlock->info.rows);

  const uint64_t* pGroupIds = (uint64_t*)pGroupCol->pData;
  pInfo->groupId = pGroupIds[rowIndex];
}

L
Liu Jicong 已提交
1034
/*
 * Re-arm a table scan for a new time window: the scan counter and current group are
 * reset, the condition's time window is replaced by *pWin, and the current reader is
 * closed so that the next scan call opens a fresh one.
 */
void resetTableScanInfo(STableScanInfo* pTableScanInfo, STimeWindow* pWin) {
  pTableScanInfo->currentGroupId = -1;
  pTableScanInfo->scanTimes = 0;
  pTableScanInfo->base.cond.twindows = *pWin;

  tsdbReaderClose(pTableScanInfo->base.dataReader);
  // NOTE(review): bare trace marker with no context; looks like leftover debugging output
  qDebug("1");
  pTableScanInfo->base.dataReader = NULL;
}

L
Liu Jicong 已提交
1043 1044
/*
 * Read the data of one table as of version `maxVersion`, restricted to [startTs, endTs].
 *
 * A temporary reader is opened on a copy of the scan condition (start version -1, end
 * version maxVersion), at most one block is fetched into the scan operator's result
 * block, and the reader is closed again.
 *
 * @return the scan operator's result block if it holds at least one row, otherwise NULL.
 *         Reader failures are reported through T_LONG_JMP on the task's jump environment.
 */
static SSDataBlock* readPreVersionData(SOperatorInfo* pTableScanOp, uint64_t tbUid, TSKEY startTs, TSKEY endTs,
                                       int64_t maxVersion) {
  STableKeyInfo tblInfo = {.uid = tbUid, .groupId = 0};

  STableScanInfo*     pTableScanInfo = pTableScanOp->info;
  SQueryTableDataCond cond = pTableScanInfo->base.cond;

  cond.startVersion = -1;
  cond.endVersion = maxVersion;
  cond.twindows = (STimeWindow){.skey = startTs, .ekey = endTs};

  SExecTaskInfo* pTaskInfo = pTableScanOp->pTaskInfo;

  SSDataBlock* pBlock = pTableScanInfo->pResBlock;
  STsdbReader* pReader = NULL;
  int32_t      code = tsdbReaderOpen(pTableScanInfo->base.readHandle.vnode, &cond, &tblInfo, 1, pBlock,
                                     (STsdbReader**)&pReader, GET_TASKID(pTaskInfo), false);
  if (code != TSDB_CODE_SUCCESS) {
    terrno = code;
    T_LONG_JMP(pTaskInfo->env, code);
    return NULL;
  }

  bool hasNext = false;
  code = tsdbNextDataBlock(pReader, &hasNext);
  if (code != TSDB_CODE_SUCCESS) {
    // the reader is only referenced by this frame; close it before jumping out or it leaks
    tsdbReaderClose(pReader);
    terrno = code;
    T_LONG_JMP(pTaskInfo->env, code);
    return NULL;
  }

  if (hasNext) {
    // the rows are delivered into pBlock, which was handed to the reader at open time
    tsdbRetrieveDataBlock(pReader, NULL);
    doSetTagColumnData(&pTableScanInfo->base, pBlock, pTaskInfo, pBlock->info.rows);
    pBlock->info.id.groupId = getTableGroupId(pTableScanInfo->base.pTableListInfo, pBlock->info.id.uid);
  }

  tsdbReaderClose(pReader);
  qDebug("retrieve prev rows:%" PRId64 ", skey:%" PRId64 ", ekey:%" PRId64 " uid:%" PRIu64 ", max ver:%" PRId64
         ", suid:%" PRIu64,
         pBlock->info.rows, startTs, endTs, tbUid, maxVersion, cond.suid);

  return pBlock->info.rows > 0 ? pBlock : NULL;
}

/*
 * Compute the group id of the row of table `uid` at timestamp `ts` (as of `maxVersion`)
 * from the row's data. Returns 0 when no such row can be read.
 */
static uint64_t getGroupIdByCol(SStreamScanInfo* pInfo, uint64_t uid, TSKEY ts, int64_t maxVersion) {
  SSDataBlock* pPrev = readPreVersionData(pInfo->pTableScanOp, uid, ts, ts, maxVersion);
  if (pPrev != NULL && pPrev->info.rows != 0) {
    // a single timestamp on a single table is expected to match exactly one row
    ASSERT(pPrev->info.rows == 1);
    return calGroupIdByData(&pInfo->partitionSup, pInfo->pPartScalarSup, pPrev, 0);
  }

  return 0;
}

5
54liuyao 已提交
1097
/* Look up the group id of table `uid` in the table list of the underlying table scan. */
static uint64_t getGroupIdByUid(SStreamScanInfo* pInfo, uint64_t uid) {
  STableScanInfo* pScan = pInfo->pTableScanOp->info;
  uint64_t        gid = getTableGroupId(pScan->base.pTableListInfo, uid);
  return gid;
}

5
54liuyao 已提交
1102 1103 1104 1105 1106 1107 1108 1109
/*
 * Resolve the group id for (uid, ts): computed from the row's data when the partition
 * support requires calculation, otherwise taken from the table list by uid.
 */
static uint64_t getGroupIdByData(SStreamScanInfo* pInfo, uint64_t uid, TSKEY ts, int64_t maxVersion) {
  return pInfo->partitionSup.needCalc ? getGroupIdByCol(pInfo, uid, ts, maxVersion) : getGroupIdByUid(pInfo, uid);
}

L
Liu Jicong 已提交
1110
/*
 * Set up the table scan for the next range described by pBlock, starting at row *pRowIndex.
 *
 * The row's start/end timestamps form the initial window and its group id becomes the
 * current group. Following rows of the same group that share the window's start key
 * (or whose end equals it) are merged into the window. *pRowIndex is advanced past
 * all consumed rows.
 *
 * @return false when the block is empty or fully consumed, true when the table scan has
 *         been reset to the merged window and is ready to be scanned.
 */
static bool prepareRangeScan(SStreamScanInfo* pInfo, SSDataBlock* pBlock, int32_t* pRowIndex) {
  if (pBlock->info.rows == 0 || (*pRowIndex) == pBlock->info.rows) {
    return false;
  }

  ASSERT(taosArrayGetSize(pBlock->pDataBlock) >= 3);

  SColumnInfoData* pStartCol = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX);
  SColumnInfoData* pEndCol = taosArrayGet(pBlock->pDataBlock, END_TS_COLUMN_INDEX);
  SColumnInfoData* pGroupCol = taosArrayGet(pBlock->pDataBlock, GROUPID_COLUMN_INDEX);
  SColumnInfoData* pCalStartCol = taosArrayGet(pBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX);
  SColumnInfoData* pCalEndCol = taosArrayGet(pBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX);

  TSKEY*    startTs = (TSKEY*)pStartCol->pData;
  TSKEY*    endTs = (TSKEY*)pEndCol->pData;
  uint64_t* groupIds = (uint64_t*)pGroupCol->pData;
  TSKEY*    calStartTs = (TSKEY*)pCalStartCol->pData;
  TSKEY*    calEndTs = (TSKEY*)pCalEndCol->pData;

  const int32_t first = *pRowIndex;
  STimeWindow   range = {.skey = startTs[first], .ekey = endTs[first]};
  uint64_t      gid = groupIds[first];

  setGroupId(pInfo, pBlock, GROUPID_COLUMN_INDEX, first);
  if (isSlidingWindow(pInfo)) {
    pInfo->updateWin.skey = calStartTs[first];
    pInfo->updateWin.ekey = calEndTs[first];
  }

  // absorb the following rows that extend the same window of the same group
  (*pRowIndex)++;
  while (*pRowIndex < pBlock->info.rows) {
    const int32_t row = *pRowIndex;

    if (range.skey == startTs[row] && gid == groupIds[row]) {
      range.ekey = TMAX(range.ekey, endTs[row]);
    } else if (range.skey == endTs[row] && gid == groupIds[row]) {
      range.skey = TMIN(range.skey, startTs[row]);
    } else {
      ASSERT(!(range.skey > startTs[row] && range.ekey < endTs[row]) ||
             !(isInTimeWindow(&range, startTs[row], 0) || isInTimeWindow(&range, endTs[row], 0)));
      break;
    }

    (*pRowIndex)++;
  }

  resetTableScanInfo(pInfo->pTableScanOp->info, &range);
  pInfo->pTableScanOp->status = OP_OPENED;
  return true;
}

5
54liuyao 已提交
1159
static STimeWindow getSlidingWindow(TSKEY* startTsCol, TSKEY* endTsCol, uint64_t* gpIdCol, SInterval* pInterval,
1160
                                    SDataBlockInfo* pDataBlockInfo, int32_t* pRowIndex, bool hasGroup) {
H
Haojun Liao 已提交
1161
  SResultRowInfo dumyInfo = {0};
5
54liuyao 已提交
1162
  dumyInfo.cur.pageId = -1;
1163
  STimeWindow win = getActiveTimeWindow(NULL, &dumyInfo, startTsCol[*pRowIndex], pInterval, TSDB_ORDER_ASC);
5
54liuyao 已提交
1164 1165
  STimeWindow endWin = win;
  STimeWindow preWin = win;
5
54liuyao 已提交
1166
  uint64_t    groupId = gpIdCol[*pRowIndex];
H
Haojun Liao 已提交
1167

5
54liuyao 已提交
1168
  while (1) {
1169 1170 1171
    if (hasGroup) {
      (*pRowIndex) += 1;
    } else {
5
54liuyao 已提交
1172
      while ((groupId == gpIdCol[(*pRowIndex)] && startTsCol[*pRowIndex] <= endWin.ekey)) {
5
54liuyao 已提交
1173 1174 1175 1176 1177
        (*pRowIndex) += 1;
        if ((*pRowIndex) == pDataBlockInfo->rows) {
          break;
        }
      }
1178
    }
5
54liuyao 已提交
1179

5
54liuyao 已提交
1180 1181 1182
    do {
      preWin = endWin;
      getNextTimeWindow(pInterval, &endWin, TSDB_ORDER_ASC);
1183
    } while (endTsCol[(*pRowIndex) - 1] >= endWin.skey);
5
54liuyao 已提交
1184
    endWin = preWin;
5
54liuyao 已提交
1185
    if (win.ekey == endWin.ekey || (*pRowIndex) == pDataBlockInfo->rows || groupId != gpIdCol[*pRowIndex]) {
5
54liuyao 已提交
1186 1187 1188 1189 1190 1191
      win.ekey = endWin.ekey;
      return win;
    }
    win.ekey = endWin.ekey;
  }
}
5
54liuyao 已提交
1192

L
Liu Jicong 已提交
1193 1194 1195 1196 1197 1198 1199 1200 1201 1202 1203
/*
 * Scan the ranges listed in pSDB one after another and return the next block that belongs
 * to the group currently being processed (pInfo->groupId).
 *
 * When the running range is exhausted, prepareRangeScan() moves on to the next range
 * starting at *pRowIndex. When no range is left, pSDB is cleaned, *pRowIndex and the
 * update window are reset, the table scan's reader is closed and NULL is returned.
 *
 * With partition-by-expression (partitionSup.needCalc) every returned block is rebuilt
 * so that it only contains the rows whose computed group id equals pInfo->groupId.
 */
static SSDataBlock* doRangeScan(SStreamScanInfo* pInfo, SSDataBlock* pSDB, int32_t tsColIndex, int32_t* pRowIndex) {
  for (;;) {
    SSDataBlock* pRes = doTableScan(pInfo->pTableScanOp);
    if (pRes == NULL) {
      if (prepareRangeScan(pInfo, pSDB, pRowIndex)) {
        // scan next window data
        pRes = doTableScan(pInfo->pTableScanOp);
      }
    }

    if (pRes == NULL) {
      blockDataCleanup(pSDB);
      *pRowIndex = 0;
      pInfo->updateWin = (STimeWindow){.skey = INT64_MIN, .ekey = INT64_MAX};

      STableScanInfo* pScan = pInfo->pTableScanOp->info;
      tsdbReaderClose(pScan->base.dataReader);
      // NOTE(review): bare trace marker with no context; looks like leftover debugging output
      qDebug("2");
      pScan->base.dataReader = NULL;
      return NULL;
    }

    doFilter(pRes, pInfo->pTableScanOp->exprSupp.pFilterInfo, NULL);
    if (pRes->info.rows == 0) {
      continue;
    }

    if (!pInfo->partitionSup.needCalc) {
      if (pRes->info.id.groupId == pInfo->groupId) {
        pRes->info.calWin = pInfo->updateWin;
        return pRes;
      }
      continue;
    }

    // keep a copy of the rows, empty the result block, then copy back only matching rows
    SSDataBlock* pCopy = createOneDataBlock(pRes, true);
    blockDataCleanup(pRes);

    for (int32_t row = 0; row < pCopy->info.rows; row++) {
      if (calGroupIdByData(&pInfo->partitionSup, pInfo->pPartScalarSup, pCopy, row) != pInfo->groupId) {
        continue;
      }

      for (int32_t col = 0; col < pInfo->pTableScanOp->exprSupp.numOfExprs; col++) {
        SColumnInfoData* pFrom = taosArrayGet(pCopy->pDataBlock, col);
        SColumnInfoData* pTo = taosArrayGet(pRes->pDataBlock, col);
        bool             isNull = colDataIsNull(pFrom, pCopy->info.rows, row, NULL);
        char*            pVal = colDataGetData(pFrom, row);
        colDataSetVal(pTo, pRes->info.rows, pVal, isNull);
      }
      pRes->info.rows++;
    }

    blockDataDestroy(pCopy);

    if (pRes->info.rows > 0) {
      pRes->info.calWin = pInfo->updateWin;
      return pRes;
    }
  }
}
1245

1246
static int32_t getPreSessionWindow(SStreamAggSupporter* pAggSup, TSKEY startTs, TSKEY endTs, uint64_t groupId,
X
Xiaoyu Wang 已提交
1247
                                   SSessionKey* pKey) {
1248 1249 1250
  pKey->win.skey = startTs;
  pKey->win.ekey = endTs;
  pKey->groupId = groupId;
X
Xiaoyu Wang 已提交
1251

1252 1253 1254 1255 1256
  SStreamStateCur* pCur = streamStateSessionSeekKeyCurrentPrev(pAggSup->pState, pKey);
  int32_t          code = streamStateSessionGetKVByCur(pCur, pKey, NULL, 0);
  if (code != TSDB_CODE_SUCCESS) {
    SET_SESSION_WIN_KEY_INVALID(pKey);
  }
H
Haojun Liao 已提交
1257 1258

  taosMemoryFree(pCur);
1259 1260 1261
  return code;
}

1262
/*
 * Translate the ranges in pSrcBlock into session-window scan ranges in pDestBlock.
 *
 * For every source row the session window containing the start timestamp and the one
 * containing (or preceding) the end timestamp are looked up; the emitted range spans from
 * the first window's start to the second window's end. Rows whose windows cannot be found
 * are skipped. The uid and calculate-start/end columns of emitted rows are set to NULL.
 *
 * @return TSDB_CODE_SUCCESS, or the error from growing pDestBlock
 */
static int32_t generateSessionScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSrcBlock, SSDataBlock* pDestBlock) {
  blockDataCleanup(pDestBlock);
  if (pSrcBlock->info.rows == 0) {
    return TSDB_CODE_SUCCESS;
  }
  int32_t code = blockDataEnsureCapacity(pDestBlock, pSrcBlock->info.rows);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }
  ASSERT(taosArrayGetSize(pSrcBlock->pDataBlock) >= 3);
  SColumnInfoData* pStartTsCol = taosArrayGet(pSrcBlock->pDataBlock, START_TS_COLUMN_INDEX);
  TSKEY*           startData = (TSKEY*)pStartTsCol->pData;
  SColumnInfoData* pEndTsCol = taosArrayGet(pSrcBlock->pDataBlock, END_TS_COLUMN_INDEX);
  TSKEY*           endData = (TSKEY*)pEndTsCol->pData;
  SColumnInfoData* pUidCol = taosArrayGet(pSrcBlock->pDataBlock, UID_COLUMN_INDEX);
  uint64_t*        uidCol = (uint64_t*)pUidCol->pData;

  SColumnInfoData* pDestStartCol = taosArrayGet(pDestBlock->pDataBlock, START_TS_COLUMN_INDEX);
  SColumnInfoData* pDestEndCol = taosArrayGet(pDestBlock->pDataBlock, END_TS_COLUMN_INDEX);
  SColumnInfoData* pDestUidCol = taosArrayGet(pDestBlock->pDataBlock, UID_COLUMN_INDEX);
  SColumnInfoData* pDestGpCol = taosArrayGet(pDestBlock->pDataBlock, GROUPID_COLUMN_INDEX);
  SColumnInfoData* pDestCalStartTsCol = taosArrayGet(pDestBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX);
  SColumnInfoData* pDestCalEndTsCol = taosArrayGet(pDestBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX);
  int64_t          version = pSrcBlock->info.version - 1;
  for (int32_t i = 0; i < pSrcBlock->info.rows; i++) {
    uint64_t groupId = getGroupIdByData(pInfo, uidCol[i], startData[i], version);
    // gap must be 0.
    SSessionKey startWin = {0};
    getCurSessionWindow(pInfo->windowSup.pStreamAggSup, startData[i], startData[i], groupId, &startWin);
    if (IS_INVALID_SESSION_WIN_KEY(startWin)) {
      // window has been closed.
      continue;
    }
    SSessionKey endWin = {0};
    getCurSessionWindow(pInfo->windowSup.pStreamAggSup, endData[i], endData[i], groupId, &endWin);
    if (IS_INVALID_SESSION_WIN_KEY(endWin)) {
      // no window contains the end timestamp; fall back to the closest preceding window
      getPreSessionWindow(pInfo->windowSup.pStreamAggSup, endData[i], endData[i], groupId, &endWin);
    }
    // startWin was already validated above; it is the end window that may still be invalid
    if (IS_INVALID_SESSION_WIN_KEY(endWin)) {
      // window has been closed.
      qError("generate session scan range failed. rang start:%" PRIx64 ", end:%" PRIx64, startData[i], endData[i]);
      continue;
    }

    // Write at the destination row count, not at the source index: source rows may have
    // been skipped above, and the destination must stay densely packed.
    colDataSetVal(pDestStartCol, pDestBlock->info.rows, (const char*)&startWin.win.skey, false);
    colDataSetVal(pDestEndCol, pDestBlock->info.rows, (const char*)&endWin.win.ekey, false);

    colDataSetNULL(pDestUidCol, pDestBlock->info.rows);
    colDataSetVal(pDestGpCol, pDestBlock->info.rows, (const char*)&groupId, false);
    colDataSetNULL(pDestCalStartTsCol, pDestBlock->info.rows);
    colDataSetNULL(pDestCalEndTsCol, pDestBlock->info.rows);
    pDestBlock->info.rows++;
  }
  return TSDB_CODE_SUCCESS;
}
1316 1317 1318 1319 1320 1321

/*
 * Translate the ranges in pSrcBlock into interval-window scan ranges in pDestBlock.
 *
 * When the partition key is computed from row data and the first source row covers a real
 * range (start != end), the previous-version data of that range is read and pSrcBlock is
 * rebuilt with one row per pre-existing data row, each tagged with its computed group id.
 *
 * Afterwards consecutive source rows are merged through getSlidingWindow(), and for each
 * merged window one destination row (calculate start/end, uid, window start/end, group id)
 * is emitted.
 *
 * @return TSDB_CODE_SUCCESS, or the error from growing one of the blocks
 */
static int32_t generateIntervalScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSrcBlock, SSDataBlock* pDestBlock) {
  blockDataCleanup(pDestBlock);
  int32_t rows = pSrcBlock->info.rows;
  if (rows == 0) {
    return TSDB_CODE_SUCCESS;
  }

  SColumnInfoData* pSrcStartTsCol = (SColumnInfoData*)taosArrayGet(pSrcBlock->pDataBlock, START_TS_COLUMN_INDEX);
  SColumnInfoData* pSrcEndTsCol = (SColumnInfoData*)taosArrayGet(pSrcBlock->pDataBlock, END_TS_COLUMN_INDEX);
  SColumnInfoData* pSrcUidCol = taosArrayGet(pSrcBlock->pDataBlock, UID_COLUMN_INDEX);
  SColumnInfoData* pSrcGpCol = taosArrayGet(pSrcBlock->pDataBlock, GROUPID_COLUMN_INDEX);

  uint64_t* srcUidData = (uint64_t*)pSrcUidCol->pData;
  ASSERT(pSrcStartTsCol->info.type == TSDB_DATA_TYPE_TIMESTAMP);
  TSKEY*  srcStartTsCol = (TSKEY*)pSrcStartTsCol->pData;
  TSKEY*  srcEndTsCol = (TSKEY*)pSrcEndTsCol->pData;
  int64_t version = pSrcBlock->info.version - 1;

  if (pInfo->partitionSup.needCalc && srcStartTsCol[0] != srcEndTsCol[0]) {
    uint64_t     srcUid = srcUidData[0];
    TSKEY        startTs = srcStartTsCol[0];
    TSKEY        endTs = srcEndTsCol[0];
    SSDataBlock* pPreRes = readPreVersionData(pInfo->pTableScanOp, srcUid, startTs, endTs, version);
    if (pPreRes == NULL) {
      // readPreVersionData() yields NULL when the range holds no rows: there is nothing to
      // recalculate, so leave both blocks empty instead of dereferencing the NULL result
      blockDataCleanup(pSrcBlock);
      return TSDB_CODE_SUCCESS;
    }

    printDataBlock(pPreRes, "pre res");
    blockDataCleanup(pSrcBlock);
    int32_t code = blockDataEnsureCapacity(pSrcBlock, pPreRes->info.rows);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    SColumnInfoData* pTsCol = (SColumnInfoData*)taosArrayGet(pPreRes->pDataBlock, pInfo->primaryTsIndex);
    rows = pPreRes->info.rows;

    for (int32_t i = 0; i < rows; i++) {
      uint64_t groupId = calGroupIdByData(&pInfo->partitionSup, pInfo->pPartScalarSup, pPreRes, i);
      appendOneRowToStreamSpecialBlock(pSrcBlock, ((TSKEY*)pTsCol->pData) + i, ((TSKEY*)pTsCol->pData) + i, &srcUid,
                                       &groupId, NULL);
    }
    printDataBlock(pSrcBlock, "new delete");
  }

  // reload the raw column pointers: the source block may have been rebuilt above
  uint64_t* srcGp = (uint64_t*)pSrcGpCol->pData;
  srcStartTsCol = (TSKEY*)pSrcStartTsCol->pData;
  srcEndTsCol = (TSKEY*)pSrcEndTsCol->pData;
  srcUidData = (uint64_t*)pSrcUidCol->pData;

  int32_t code = blockDataEnsureCapacity(pDestBlock, rows);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  SColumnInfoData* pStartTsCol = taosArrayGet(pDestBlock->pDataBlock, START_TS_COLUMN_INDEX);
  SColumnInfoData* pEndTsCol = taosArrayGet(pDestBlock->pDataBlock, END_TS_COLUMN_INDEX);
  SColumnInfoData* pDeUidCol = taosArrayGet(pDestBlock->pDataBlock, UID_COLUMN_INDEX);
  SColumnInfoData* pGpCol = taosArrayGet(pDestBlock->pDataBlock, GROUPID_COLUMN_INDEX);
  SColumnInfoData* pCalStartTsCol = taosArrayGet(pDestBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX);
  SColumnInfoData* pCalEndTsCol = taosArrayGet(pDestBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX);
  // `i` is advanced inside getSlidingWindow(), which consumes one or more source rows per round
  for (int32_t i = 0; i < rows;) {
    uint64_t srcUid = srcUidData[i];
    uint64_t groupId = srcGp[i];
    if (groupId == 0) {
      groupId = getGroupIdByData(pInfo, srcUid, srcStartTsCol[i], version);
    }
    TSKEY calStartTs = srcStartTsCol[i];
    colDataSetVal(pCalStartTsCol, pDestBlock->info.rows, (const char*)(&calStartTs), false);
    STimeWindow win = getSlidingWindow(srcStartTsCol, srcEndTsCol, srcGp, &pInfo->interval, &pSrcBlock->info, &i,
                                       pInfo->partitionSup.needCalc);
    TSKEY       calEndTs = srcStartTsCol[i - 1];
    colDataSetVal(pCalEndTsCol, pDestBlock->info.rows, (const char*)(&calEndTs), false);
    colDataSetVal(pDeUidCol, pDestBlock->info.rows, (const char*)(&srcUid), false);
    colDataSetVal(pStartTsCol, pDestBlock->info.rows, (const char*)(&win.skey), false);
    colDataSetVal(pEndTsCol, pDestBlock->info.rows, (const char*)(&win.ekey), false);
    colDataSetVal(pGpCol, pDestBlock->info.rows, (const char*)(&groupId), false);
    pDestBlock->info.rows++;
  }
  return TSDB_CODE_SUCCESS;
}
1393

1394
// Translate every row of pSrcBlock into one row of pDestBlock: start ts, end ts, uid, group id and,
// when a table-name expression is configured (pInfo->tbnameCalSup.pExprInfo), the partition table
// name looked up in the stream state for that group id.
//
// pDestBlock is cleaned before it is filled. A source row whose group id is 0 gets its group id
// recomputed through getGroupIdByData(), using the version that precedes pSrcBlock's version.
//
// Returns TSDB_CODE_SUCCESS, or the error code reported by blockDataEnsureCapacity().
static int32_t generateDeleteResultBlock(SStreamScanInfo* pInfo, SSDataBlock* pSrcBlock, SSDataBlock* pDestBlock) {
  blockDataCleanup(pDestBlock);
  int32_t rows = pSrcBlock->info.rows;
  if (rows == 0) {
    return TSDB_CODE_SUCCESS;
  }

  int32_t code = blockDataEnsureCapacity(pDestBlock, rows);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  SColumnInfoData* pSrcStartTsCol = (SColumnInfoData*)taosArrayGet(pSrcBlock->pDataBlock, START_TS_COLUMN_INDEX);
  SColumnInfoData* pSrcEndTsCol = (SColumnInfoData*)taosArrayGet(pSrcBlock->pDataBlock, END_TS_COLUMN_INDEX);
  SColumnInfoData* pSrcUidCol = taosArrayGet(pSrcBlock->pDataBlock, UID_COLUMN_INDEX);
  uint64_t*        srcUidData = (uint64_t*)pSrcUidCol->pData;
  SColumnInfoData* pSrcGpCol = taosArrayGet(pSrcBlock->pDataBlock, GROUPID_COLUMN_INDEX);
  uint64_t*        srcGp = (uint64_t*)pSrcGpCol->pData;
  ASSERT(pSrcStartTsCol->info.type == TSDB_DATA_TYPE_TIMESTAMP);
  TSKEY*  srcStartTsCol = (TSKEY*)pSrcStartTsCol->pData;
  TSKEY*  srcEndTsCol = (TSKEY*)pSrcEndTsCol->pData;
  int64_t version = pSrcBlock->info.version - 1;

  for (int32_t i = 0; i < pSrcBlock->info.rows; i++) {
    uint64_t srcUid = srcUidData[i];
    uint64_t groupId = srcGp[i];
    // var-length string buffer: length header followed by the table name bytes.
    // This must be a byte array; an array of pointers here would only work by accident.
    char  tbname[VARSTR_HEADER_SIZE + TSDB_TABLE_NAME_LEN] = {0};
    void* pTbName = NULL;  // stays NULL (-> NULL cell in the result) unless a non-empty name is found

    if (groupId == 0) {
      groupId = getGroupIdByData(pInfo, srcUid, srcStartTsCol[i], version);
    }

    if (pInfo->tbnameCalSup.pExprInfo) {
      void* parTbname = NULL;
      streamStateGetParName(pInfo->pStreamScanOp->pTaskInfo->streamInfo.pState, groupId, &parTbname);

      // The lookup may leave parTbname untouched (no name recorded for this group); copying from
      // NULL would be undefined behavior, so only copy when a value was actually returned.
      if (parTbname != NULL) {
        memcpy(varDataVal(tbname), parTbname, TSDB_TABLE_NAME_LEN);
        // force termination inside the buffer so the strlen() below can never run past its end
        tbname[sizeof(tbname) - 1] = 0;
        tdbFree(parTbname);
      }

      size_t nameLen = strlen(varDataVal(tbname));
      varDataSetLen(tbname, nameLen);
      if (nameLen > 0) {
        pTbName = tbname;
      }
    }

    appendOneRowToStreamSpecialBlock(pDestBlock, srcStartTsCol + i, srcEndTsCol + i, srcUidData + i, &groupId,
                                     pTbName);
  }
  return TSDB_CODE_SUCCESS;
}

1436 1437 1438 1439
// Fill pDestBlock with the ranges that have to be re-scanned for the rows in pSrcBlock.
// The generator is chosen by window kind: interval, session/state, or (otherwise) the plain
// delete-result generator. Whatever the generator returned, pDestBlock is then tagged as a
// STREAM_CLEAR block carrying pSrcBlock's version and its ts window is refreshed.
//
// Returns the code produced by the selected generator.
static int32_t generateScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSrcBlock, SSDataBlock* pDestBlock) {
  int32_t code;

  if (isIntervalWindow(pInfo)) {
    code = generateIntervalScanRange(pInfo, pSrcBlock, pDestBlock);
  } else {
    bool sessionLike = isSessionWindow(pInfo) || isStateWindow(pInfo);
    code = sessionLike ? generateSessionScanRange(pInfo, pSrcBlock, pDestBlock)
                       : generateDeleteResultBlock(pInfo, pSrcBlock, pDestBlock);
  }

  pDestBlock->info.version = pSrcBlock->info.version;
  pDestBlock->info.type = STREAM_CLEAR;
  pDestBlock->info.dataLoad = 1;
  blockDataUpdateTsWindow(pDestBlock, 0);

  return code;
}

L
Liu Jicong 已提交
1452 1453 1454
#if 0
// Disabled code: load the tag value stored in the stream state for the block's group id and
// attach a private copy of it to pBlock->info.pTag / tagLen.
void calBlockTag(SStreamScanInfo* pInfo, SSDataBlock* pBlock) {
  SExprSupp*    pTagCalSup = &pInfo->tagCalSup;
  SStreamState* pState = pInfo->pStreamScanOp->pTaskInfo->streamInfo.pState;

  if (pTagCalSup == NULL || pTagCalSup->numOfExprs == 0) {
    return;
  }
  if (pBlock == NULL || pBlock->info.rows == 0) {
    return;
  }

  void*   tag = NULL;
  int32_t tagLen = 0;

  // lookup failed: the block carries no tag
  if (streamStateGetParTag(pState, pBlock->info.id.groupId, &tag, &tagLen) != 0) {
    pBlock->info.pTag = NULL;
    tdbFree(tag);
    return;
  }

  pBlock->info.tagLen = tagLen;
  void* pNewTag = taosMemoryRealloc(pBlock->info.pTag, tagLen);
  if (pNewTag == NULL) {
    // realloc failed: drop the state value and the old tag buffer
    tdbFree(tag);
    taosMemoryFree(pBlock->info.pTag);
    pBlock->info.pTag = NULL;
    pBlock->info.tagLen = 0;
    return;
  }

  pBlock->info.pTag = pNewTag;
  memcpy(pBlock->info.pTag, tag, tagLen);
  tdbFree(tag);
}
#endif
L
Liu Jicong 已提交
1481

5
54liuyao 已提交
1482
// Reset pInfo->pCreateTbRes and, if a table-name or tag expression is configured, append the
// create-table row for pBlock's group id to it. When neither expression exists the block's
// partition table name is cleared instead.
static void calBlockTbName(SStreamScanInfo* pInfo, SSDataBlock* pBlock) {
  SExprSupp*    pTbNameCalSup = &pInfo->tbnameCalSup;
  SStreamState* pState = pInfo->pStreamScanOp->pTaskInfo->streamInfo.pState;

  blockDataCleanup(pInfo->pCreateTbRes);

  bool hasNameOrTagExpr = (pTbNameCalSup->numOfExprs != 0) || (pInfo->tagCalSup.numOfExprs != 0);
  if (!hasNameOrTagExpr) {
    pBlock->info.parTbName[0] = 0;
    return;
  }

  appendCreateTableRow(pState, pTbNameCalSup, &pInfo->tagCalSup, pBlock->info.id.groupId, pBlock, 0,
                       pInfo->pCreateTbRes);
}

1494 1495
// Append one row to a stream "special" block at index pBlock->info.rows and bump the row count.
//   pStartTs / pEndTs : written to the start/end ts columns and also to the calculate start/end columns
//   pUid, pGp         : written to the uid and group id columns
//   pTbName           : value for the table name column; NULL stores a NULL cell
// The caller is responsible for the block having room for the new row.
void appendOneRowToStreamSpecialBlock(SSDataBlock* pBlock, TSKEY* pStartTs, TSKEY* pEndTs, uint64_t* pUid,
                                      uint64_t* pGp, void* pTbName) {
  const int64_t row = pBlock->info.rows;

  // window start goes to both the start column and the calculate-start column
  SColumnInfoData* pStartTsCol = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX);
  SColumnInfoData* pCalStartCol = taosArrayGet(pBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX);
  colDataSetVal(pStartTsCol, row, (const char*)pStartTs, false);
  colDataSetVal(pCalStartCol, row, (const char*)pStartTs, false);

  // window end goes to both the end column and the calculate-end column
  SColumnInfoData* pEndTsCol = taosArrayGet(pBlock->pDataBlock, END_TS_COLUMN_INDEX);
  SColumnInfoData* pCalEndCol = taosArrayGet(pBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX);
  colDataSetVal(pEndTsCol, row, (const char*)pEndTs, false);
  colDataSetVal(pCalEndCol, row, (const char*)pEndTs, false);

  SColumnInfoData* pUidCol = taosArrayGet(pBlock->pDataBlock, UID_COLUMN_INDEX);
  colDataSetVal(pUidCol, row, (const char*)pUid, false);

  SColumnInfoData* pGpCol = taosArrayGet(pBlock->pDataBlock, GROUPID_COLUMN_INDEX);
  colDataSetVal(pGpCol, row, (const char*)pGp, false);

  SColumnInfoData* pTableCol = taosArrayGet(pBlock->pDataBlock, TABLE_NAME_COLUMN_INDEX);
  colDataSetVal(pTableCol, row, (const char*)pTbName, pTbName == NULL);

  pBlock->info.rows++;
}

1513
// Run the update check over every row of pBlock and, when `out` is set, collect the rows that
// are updates (or that hit an already closed and deleted window) into pInfo->pUpdateDataRes.
//
//   invertible : not used by this function
//   out        : true  -> pUpdateDataRes is cleaned, sized for 2 * rows and filled
//                false -> only the per-row update check is executed
//
// NOTE(review): the return value of blockDataEnsureCapacity() is ignored here.
static void checkUpdateData(SStreamScanInfo* pInfo, bool invertible, SSDataBlock* pBlock, bool out) {
  if (out) {
    blockDataCleanup(pInfo->pUpdateDataRes);
    // a single source row can produce up to two result rows, see the loop below
    blockDataEnsureCapacity(pInfo->pUpdateDataRes, pBlock->info.rows * 2);
  }

  SColumnInfoData* pTsColInfo = taosArrayGet(pBlock->pDataBlock, pInfo->primaryTsIndex);
  ASSERT(pTsColInfo->info.type == TSDB_DATA_TYPE_TIMESTAMP);
  TSKEY* tsCol = (TSKEY*)pTsColInfo->pData;

  bool tableInserted = updateInfoIsTableInserted(pInfo->pUpdateInfo, pBlock->info.id.uid);

  for (int32_t rowId = 0; rowId < pBlock->info.rows; rowId++) {
    TSKEY* pRowTs = tsCol + rowId;

    bool overDue = isOverdue(*pRowTs, &pInfo->twAggSup);
    if (pInfo->igExpired && overDue) {
      continue;
    }

    STimeWindow win = {.skey = INT64_MIN, .ekey = INT64_MAX};
    bool        isClosed = false;
    if (tableInserted && overDue) {
      SResultRowInfo dumyInfo;
      dumyInfo.cur.pageId = -1;
      win = getActiveTimeWindow(NULL, &dumyInfo, *pRowTs, &pInfo->interval, TSDB_ORDER_ASC);
      isClosed = isCloseWindow(&win, &pInfo->twAggSup);
    }

    // must check update info first.
    bool update = updateInfoIsUpdated(pInfo->pUpdateInfo, pBlock->info.id.uid, *pRowTs);

    // the deleted-window lookup is only performed for a closed window of a single interval stream
    bool closedWin = false;
    if (isClosed && isSignleIntervalWindow(pInfo)) {
      closedWin = isDeletedStreamWindow(&win, pBlock->info.id.groupId,
                                        pInfo->pTableScanOp->pTaskInfo->streamInfo.pState, &pInfo->twAggSup);
    }

    if (!out || !(update || closedWin)) {
      continue;
    }

    qDebug("stream update check not pass, update %d, closedWin %d", update, closedWin);

    // first row: group id 0
    uint64_t gpId = 0;
    appendOneRowToStreamSpecialBlock(pInfo->pUpdateDataRes, pRowTs, pRowTs, &pBlock->info.id.uid, &gpId, NULL);

    // second row: the group id calculated from the row data, only for partitioned streams
    if (closedWin && pInfo->partitionSup.needCalc) {
      gpId = calGroupIdByData(&pInfo->partitionSup, pInfo->pPartScalarSup, pBlock, rowId);
      appendOneRowToStreamSpecialBlock(pInfo->pUpdateDataRes, pRowTs, pRowTs, &pBlock->info.id.uid, &gpId, NULL);
    }
  }

  if (!out || pInfo->pUpdateDataRes->info.rows <= 0) {
    return;
  }

  pInfo->pUpdateDataRes->info.version = pBlock->info.version;
  pInfo->pUpdateDataRes->info.dataLoad = 1;
  blockDataUpdateTsWindow(pInfo->pUpdateDataRes, 0);
  if (pInfo->partitionSup.needCalc) {
    pInfo->pUpdateDataRes->info.type = STREAM_DELETE_DATA;
  } else {
    pInfo->pUpdateDataRes->info.type = STREAM_CLEAR;
  }
}
L
Liu Jicong 已提交
1560

1561
// Copy the content of pBlock into the operator's result block (pInfo->pRes).
//
// Columns are matched by column id through pInfo->matchInfo; a required column that is missing
// in pBlock is filled with NULLs. Pseudo columns are appended afterwards, the operator filter is
// applied when `filter` is true, and finally the create-table result is recalculated.
//
// The resources of pBlock are released on every path (blockDataFreeRes), although the
// parameter is declared const.
//
// Returns 0 on success, or the error code of blockDataEnsureCapacity() when pRes cannot be
// grown; in that case pRes is left as it was. A failure while adding the pseudo columns (other
// than "table not exists") does not return: it long-jumps through pTaskInfo->env.
static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock, bool filter) {
  SDataBlockInfo* pBlockInfo = &pInfo->pRes->info;
  SOperatorInfo*  pOperator = pInfo->pStreamScanOp;
  SExecTaskInfo*  pTaskInfo = pOperator->pTaskInfo;

  // Without enough capacity the column copies below would write past the column buffers.
  int32_t code = blockDataEnsureCapacity(pInfo->pRes, pBlock->info.rows);
  if (code != TSDB_CODE_SUCCESS) {
    blockDataFreeRes((SSDataBlock*)pBlock);
    return code;
  }

  pInfo->pRes->info.rows = pBlock->info.rows;
  pInfo->pRes->info.id.uid = pBlock->info.id.uid;
  pInfo->pRes->info.type = STREAM_NORMAL;
  pInfo->pRes->info.version = pBlock->info.version;

  STableScanInfo* pTableScanInfo = pInfo->pTableScanOp->info;
  pInfo->pRes->info.id.groupId = getTableGroupId(pTableScanInfo->base.pTableListInfo, pBlock->info.id.uid);

  // todo extract method
  for (int32_t i = 0; i < taosArrayGetSize(pInfo->matchInfo.pList); ++i) {
    SColMatchItem* pColMatchInfo = taosArrayGet(pInfo->matchInfo.pList, i);
    if (!pColMatchInfo->needOutput) {
      continue;
    }

    bool colExists = false;
    for (int32_t j = 0; j < blockDataGetNumOfCols(pBlock); ++j) {
      SColumnInfoData* pResCol = bdGetColumnInfoData(pBlock, j);
      if (pResCol->info.colId == pColMatchInfo->colId) {
        SColumnInfoData* pDst = taosArrayGet(pInfo->pRes->pDataBlock, pColMatchInfo->dstSlotId);
        colDataAssign(pDst, pResCol, pBlock->info.rows, &pInfo->pRes->info);
        colExists = true;
        break;
      }
    }

    // the required column does not exists in submit block, let's set it to be all null value
    if (!colExists) {
      SColumnInfoData* pDst = taosArrayGet(pInfo->pRes->pDataBlock, pColMatchInfo->dstSlotId);
      colDataSetNNULL(pDst, 0, pBlockInfo->rows);
    }
  }

  // currently only the tbname pseudo column
  if (pInfo->numOfPseudoExpr > 0) {
    code = addTagPseudoColumnData(&pInfo->readHandle, pInfo->pPseudoExpr, pInfo->numOfPseudoExpr, pInfo->pRes,
                                  pInfo->pRes->info.rows, GET_TASKID(pTaskInfo), NULL);
    // ignore the table not exists error, since this table may have been dropped during the scan procedure.
    if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_PAR_TABLE_NOT_EXIST) {
      blockDataFreeRes((SSDataBlock*)pBlock);
      T_LONG_JMP(pTaskInfo->env, code);
    }

    // reset the error code.
    terrno = 0;
  }

  if (filter) {
    doFilter(pInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL);
  }

  pInfo->pRes->info.dataLoad = 1;
  blockDataUpdateTsWindow(pInfo->pRes, pInfo->primaryTsIndex);
  blockDataFreeRes((SSDataBlock*)pBlock);

  calBlockTbName(pInfo, pInfo->pRes);
  return 0;
}
5
54liuyao 已提交
1626

L
Liu Jicong 已提交
1627
// Produce the next result block for a queue scan. Three sources are tried in this order:
//   1. a submit message attached to the task (streamInfo.submit),
//   2. the tsdb snapshot, while prepareStatus is TMQ_OFFSET__SNAPSHOT_DATA,
//   3. the WAL, while prepareStatus is TMQ_OFFSET__LOG.
// Returns a block with at least one row, or NULL when the current source has nothing to return.
static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) {
  SStreamScanInfo* pInfo = pOperator->info;
  SExecTaskInfo*   pTaskInfo = pOperator->pTaskInfo;
  const char*      id = GET_TASKID(pTaskInfo);

  qDebug("start to exec queue scan, %s", id);

  // 1. data delivered through a submit message
  if (pTaskInfo->streamInfo.submit.msgStr != NULL) {
    if (pInfo->tqReader->msg2.msgStr == NULL) {
      // the reader has no message yet, hand the pending one over
      SPackedData submit = pTaskInfo->streamInfo.submit;
      if (tqReaderSetSubmitReq2(pInfo->tqReader, submit.msgStr, submit.msgLen, submit.ver) < 0) {
        qError("submit msg messed up when initing stream submit block %p, %s", submit.msgStr, id);
        pInfo->tqReader->msg2 = (SPackedData){0};
        pInfo->tqReader->setMsg = 0;
        ASSERT(0);
      }
    }

    blockDataCleanup(pInfo->pRes);

    while (tqNextDataBlock2(pInfo->tqReader)) {
      SSDataBlock block = {0};

      if (tqRetrieveDataBlock2(&block, pInfo->tqReader, NULL) != TSDB_CODE_SUCCESS) {
        continue;
      }
      if (block.info.rows == 0) {
        continue;
      }

      setBlockIntoRes(pInfo, &block, true);

      // rows may all have been filtered out; keep going until something is left
      if (pInfo->pRes->info.rows > 0) {
        return pInfo->pRes;
      }
    }

    // the message is exhausted: detach it from both the reader and the task
    pInfo->tqReader->msg2 = (SPackedData){0};
    pInfo->tqReader->setMsg = 0;
    pTaskInfo->streamInfo.submit = (SPackedData){0};
    return NULL;
  }

  // 2. data from the tsdb snapshot
  if (pTaskInfo->streamInfo.prepareStatus.type == TMQ_OFFSET__SNAPSHOT_DATA) {
    SSDataBlock* pResult = doTableScan(pInfo->pTableScanOp);
    if (pResult && pResult->info.rows > 0) {
      qDebug("queue scan tsdb return %" PRId64 " rows min:%" PRId64 " max:%" PRId64 " wal curVersion:%" PRId64" %s", pResult->info.rows,
             pResult->info.window.skey, pResult->info.window.ekey, pInfo->tqReader->pWalReader->curVersion, id);
      pTaskInfo->streamInfo.returned = 1;
      return pResult;
    }

    // something has been returned before, so this round is simply over
    if (pTaskInfo->streamInfo.returned) {
      return NULL;
    }

    // no data has return already, try to extract data in the WAL
    STableScanInfo* pTSInfo = pInfo->pTableScanOp->info;
    tsdbReaderClose(pTSInfo->base.dataReader);
    pTSInfo->base.dataReader = NULL;

    // presumably this turns prepareStatus into a log offset, so the WAL branch below runs next
    tqOffsetResetToLog(&pTaskInfo->streamInfo.prepareStatus, pTaskInfo->streamInfo.snapshotVer);

    qDebug("queue scan tsdb over, switch to wal ver:%" PRId64 " %s", pTaskInfo->streamInfo.snapshotVer + 1, id);
    if (tqSeekVer(pInfo->tqReader, pTaskInfo->streamInfo.snapshotVer + 1, pTaskInfo->id.str) < 0) {
      tqOffsetResetToLog(&pTaskInfo->streamInfo.lastStatus, pTaskInfo->streamInfo.snapshotVer);
      return NULL;
    }
  }

  // 3. data from the WAL
  if (pTaskInfo->streamInfo.prepareStatus.type != TMQ_OFFSET__LOG) {
    qError("unexpected streamInfo prepare type: %d %s", pTaskInfo->streamInfo.prepareStatus.type, id);
    return NULL;
  }

  for (;;) {
    SFetchRet ret = {0};
    terrno = 0;

    // if the end is reached, terrno is 0
    if (tqNextBlock(pInfo->tqReader, &ret) < 0 && terrno != 0) {
      qError("failed to get next log block since %s, %s", terrstr(), id);
    }

    if (ret.fetchType == FETCH_TYPE__DATA) {
      blockDataCleanup(pInfo->pRes);
      setBlockIntoRes(pInfo, &ret.data, true);
      if (pInfo->pRes->info.rows > 0) {
        pOperator->status = OP_EXEC_RECV;
        qDebug("queue scan log return %" PRId64 " rows", pInfo->pRes->info.rows);
        return pInfo->pRes;
      }
    } else if (ret.fetchType == FETCH_TYPE__META) {
      qError("unexpected ret.fetchType:%d", ret.fetchType);
    } else if (ret.fetchType == FETCH_TYPE__NONE ||
               (ret.fetchType == FETCH_TYPE__SEP && pOperator->status == OP_EXEC_RECV)) {
      // remember where the scan stopped and report "no more data" for this round
      pTaskInfo->streamInfo.lastStatus = ret.offset;

      char formatBuf[80];
      tFormatOffset(formatBuf, sizeof(formatBuf), &ret.offset);
      qDebug("queue scan log return null, offset %s", formatBuf);

      pOperator->status = OP_OPENED;
      return NULL;
    }
  }
}

L
Liu Jicong 已提交
1736 1737 1738 1739 1740 1741 1742 1743 1744 1745 1746 1747 1748 1749 1750 1751 1752 1753
// Copy into pDst the rows of the delete block pSrc whose uid is present in the reader's table id
// hash (pInfo->tqReader->tbIdHash). For every kept row the start ts, end ts and uid are copied;
// the group id and the two calculate-ts columns are set to NULL.
//
// On success pDst->info becomes a copy of pSrc->info, except that `rows` is the number of rows
// kept and `capacity` keeps pDst's own value.
//
// Returns 0 on success, or the error code of blockDataEnsureCapacity(); on error pDst's info is
// not modified.
static int32_t filterDelBlockByUid(SSDataBlock* pDst, const SSDataBlock* pSrc, SStreamScanInfo* pInfo) {
  STqReader* pReader = pInfo->tqReader;
  int32_t    rows = pSrc->info.rows;

  // never write rows into pDst unless the capacity is really there
  int32_t code = blockDataEnsureCapacity(pDst, rows);
  if (code != 0) {
    return code;
  }

  SColumnInfoData* pSrcStartCol = taosArrayGet(pSrc->pDataBlock, START_TS_COLUMN_INDEX);
  uint64_t*        startCol = (uint64_t*)pSrcStartCol->pData;
  SColumnInfoData* pSrcEndCol = taosArrayGet(pSrc->pDataBlock, END_TS_COLUMN_INDEX);
  uint64_t*        endCol = (uint64_t*)pSrcEndCol->pData;
  SColumnInfoData* pSrcUidCol = taosArrayGet(pSrc->pDataBlock, UID_COLUMN_INDEX);
  uint64_t*        uidCol = (uint64_t*)pSrcUidCol->pData;

  SColumnInfoData* pDstStartCol = taosArrayGet(pDst->pDataBlock, START_TS_COLUMN_INDEX);
  SColumnInfoData* pDstEndCol = taosArrayGet(pDst->pDataBlock, END_TS_COLUMN_INDEX);
  SColumnInfoData* pDstUidCol = taosArrayGet(pDst->pDataBlock, UID_COLUMN_INDEX);
  // the columns that only receive NULLs are looked up once instead of once per row
  SColumnInfoData* pDstGpCol = taosArrayGet(pDst->pDataBlock, GROUPID_COLUMN_INDEX);
  SColumnInfoData* pDstCalStartCol = taosArrayGet(pDst->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX);
  SColumnInfoData* pDstCalEndCol = taosArrayGet(pDst->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX);

  int32_t j = 0;  // next free row in pDst
  for (int32_t i = 0; i < rows; i++) {
    if (taosHashGet(pReader->tbIdHash, &uidCol[i], sizeof(uint64_t))) {
      colDataSetVal(pDstStartCol, j, (const char*)&startCol[i], false);
      colDataSetVal(pDstEndCol, j, (const char*)&endCol[i], false);
      colDataSetVal(pDstUidCol, j, (const char*)&uidCol[i], false);

      colDataSetNULL(pDstGpCol, j);
      colDataSetNULL(pDstCalStartCol, j);
      colDataSetNULL(pDstCalEndCol, j);
      j++;
    }
  }

  // take over the block info of the source, but keep the capacity that belongs to pDst's buffers
  uint32_t cap = pDst->info.capacity;
  pDst->info = pSrc->info;
  pDst->info.rows = j;
  pDst->info.capacity = cap;

  return 0;
}

5
54liuyao 已提交
1772 1773 1774 1775 1776 1777 1778 1779 1780 1781 1782 1783
// for partition by tag
// Overwrite the group id column of pBlock, row by row, with the group id that getGroupIdByUid()
// returns for the row's uid. Nothing is changed when the group id has to be calculated from the
// data (pInfo->partitionSup.needCalc).
static void setBlockGroupIdByUid(SStreamScanInfo* pInfo, SSDataBlock* pBlock) {
  if (pInfo->partitionSup.needCalc) {
    return;
  }

  SColumnInfoData* pGpCol = taosArrayGet(pBlock->pDataBlock, GROUPID_COLUMN_INDEX);
  SColumnInfoData* pUidCol = taosArrayGet(pBlock->pDataBlock, UID_COLUMN_INDEX);
  uint64_t*        uidCol = (uint64_t*)pUidCol->pData;
  int32_t          rows = pBlock->info.rows;

  for (int32_t i = 0; i < rows; i++) {
    uint64_t groupId = getGroupIdByUid(pInfo, uidCol[i]);
    colDataSetVal(pGpCol, i, (const char*)&groupId, false);
  }
}

5
54liuyao 已提交
1789
// When update tracking is enabled (pInfo->pUpdateInfo is set): run the update check on pBlock,
// advance twAggSup.maxTs to endKey if that is larger, and, if the check produced rows in
// pUpdateDataRes, select the scan mode that matches the type of that result block.
static void doCheckUpdate(SStreamScanInfo* pInfo, TSKEY endKey, SSDataBlock* pBlock) {
  if (!pInfo->pUpdateInfo) {
    return;
  }

  checkUpdateData(pInfo, true, pBlock, true);
  pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, endKey);

  if (pInfo->pUpdateDataRes->info.rows <= 0) {
    return;
  }

  pInfo->updateResIndex = 0;
  switch (pInfo->pUpdateDataRes->info.type) {
    case STREAM_CLEAR:
      pInfo->scanMode = STREAM_SCAN_FROM_UPDATERES;
      break;
    case STREAM_INVERT:
      pInfo->scanMode = STREAM_SCAN_FROM_RES;
      break;
    case STREAM_DELETE_DATA:
      pInfo->scanMode = STREAM_SCAN_FROM_DELETE_DATA;
      break;
    default:
      // any other block type leaves the scan mode as it is
      break;
  }
}

L
Liu Jicong 已提交
1807 1808 1809 1810 1811
static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
  // NOTE: this operator does never check if current status is done or not
  SExecTaskInfo*   pTaskInfo = pOperator->pTaskInfo;
  SStreamScanInfo* pInfo = pOperator->info;

L
Liu Jicong 已提交
1812
  qDebug("stream scan called");
H
Haojun Liao 已提交
1813

1814 1815
  if (pTaskInfo->streamInfo.recoverStep == STREAM_RECOVER_STEP__PREPARE1 ||
      pTaskInfo->streamInfo.recoverStep == STREAM_RECOVER_STEP__PREPARE2) {
L
Liu Jicong 已提交
1816
    STableScanInfo* pTSInfo = pInfo->pTableScanOp->info;
H
Haojun Liao 已提交
1817
    memcpy(&pTSInfo->base.cond, &pTaskInfo->streamInfo.tableCond, sizeof(SQueryTableDataCond));
1818
    if (pTaskInfo->streamInfo.recoverStep == STREAM_RECOVER_STEP__PREPARE1) {
H
Haojun Liao 已提交
1819 1820 1821 1822
      pTSInfo->base.cond.startVersion = 0;
      pTSInfo->base.cond.endVersion = pTaskInfo->streamInfo.fillHistoryVer1;
      qDebug("stream recover step 1, from %" PRId64 " to %" PRId64, pTSInfo->base.cond.startVersion,
             pTSInfo->base.cond.endVersion);
5
54liuyao 已提交
1823
      pTaskInfo->streamInfo.recoverStep = STREAM_RECOVER_STEP__SCAN1;
1824
    } else {
H
Haojun Liao 已提交
1825 1826 1827 1828
      pTSInfo->base.cond.startVersion = pTaskInfo->streamInfo.fillHistoryVer1 + 1;
      pTSInfo->base.cond.endVersion = pTaskInfo->streamInfo.fillHistoryVer2;
      qDebug("stream recover step 2, from %" PRId64 " to %" PRId64, pTSInfo->base.cond.startVersion,
             pTSInfo->base.cond.endVersion);
5
54liuyao 已提交
1829
      pTaskInfo->streamInfo.recoverStep = STREAM_RECOVER_STEP__SCAN2;
1830
    }
L
Liu Jicong 已提交
1831 1832

    /*resetTableScanInfo(pTSInfo, pWin);*/
H
Haojun Liao 已提交
1833
    tsdbReaderClose(pTSInfo->base.dataReader);
H
Haojun Liao 已提交
1834 1835
    qDebug("4");

H
Haojun Liao 已提交
1836
    pTSInfo->base.dataReader = NULL;
L
Liu Jicong 已提交
1837
    pInfo->pTableScanOp->status = OP_OPENED;
L
Liu Jicong 已提交
1838

L
Liu Jicong 已提交
1839 1840
    pTSInfo->scanTimes = 0;
    pTSInfo->currentGroupId = -1;
L
Liu Jicong 已提交
1841
    pTaskInfo->streamInfo.recoverScanFinished = false;
L
Liu Jicong 已提交
1842 1843
  }

5
54liuyao 已提交
1844 1845
  if (pTaskInfo->streamInfo.recoverStep == STREAM_RECOVER_STEP__SCAN1 ||
      pTaskInfo->streamInfo.recoverStep == STREAM_RECOVER_STEP__SCAN2) {
L
Liu Jicong 已提交
1846 1847 1848 1849 1850
    if (pInfo->blockRecoverContiCnt > 100) {
      pInfo->blockRecoverTotCnt += pInfo->blockRecoverContiCnt;
      pInfo->blockRecoverContiCnt = 0;
      return NULL;
    }
5
54liuyao 已提交
1851 1852 1853 1854 1855 1856 1857

    switch (pInfo->scanMode) {
      case STREAM_SCAN_FROM_RES: {
        pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
        printDataBlock(pInfo->pRecoverRes, "scan recover");
        return pInfo->pRecoverRes;
      } break;
5
54liuyao 已提交
1858 1859 1860 1861
      case STREAM_SCAN_FROM_UPDATERES: {
        generateScanRange(pInfo, pInfo->pUpdateDataRes, pInfo->pUpdateRes);
        prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex);
        pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RANGE;
1862
        printDataBlock(pInfo->pUpdateRes, "recover update");
5
54liuyao 已提交
1863 1864
        return pInfo->pUpdateRes;
      } break;
1865 1866 1867 1868 1869 1870 1871 1872 1873
      case STREAM_SCAN_FROM_DELETE_DATA: {
        generateScanRange(pInfo, pInfo->pUpdateDataRes, pInfo->pUpdateRes);
        prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex);
        pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RANGE;
        copyDataBlock(pInfo->pDeleteDataRes, pInfo->pUpdateRes);
        pInfo->pDeleteDataRes->info.type = STREAM_DELETE_DATA;
        printDataBlock(pInfo->pDeleteDataRes, "recover delete");
        return pInfo->pDeleteDataRes;
      } break;
5
54liuyao 已提交
1874 1875 1876 1877 1878 1879 1880 1881
      case STREAM_SCAN_FROM_DATAREADER_RANGE: {
        SSDataBlock* pSDB = doRangeScan(pInfo, pInfo->pUpdateRes, pInfo->primaryTsIndex, &pInfo->updateResIndex);
        if (pSDB) {
          STableScanInfo* pTableScanInfo = pInfo->pTableScanOp->info;
          uint64_t        version = getReaderMaxVersion(pTableScanInfo->base.dataReader);
          updateInfoSetScanRange(pInfo->pUpdateInfo, &pTableScanInfo->base.cond.twindows, pInfo->groupId, version);
          pSDB->info.type = pInfo->scanMode == STREAM_SCAN_FROM_DATAREADER_RANGE ? STREAM_NORMAL : STREAM_PULL_DATA;
          checkUpdateData(pInfo, true, pSDB, false);
1882
          printDataBlock(pSDB, "scan recover update");
5
54liuyao 已提交
1883 1884 1885 1886 1887 1888
          calBlockTbName(pInfo, pSDB);
          return pSDB;
        }
        blockDataCleanup(pInfo->pUpdateDataRes);
        pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
      } break;
5
54liuyao 已提交
1889 1890 1891 1892 1893 1894
      default:
        break;
    }

    pInfo->pRecoverRes = doTableScan(pInfo->pTableScanOp);
    if (pInfo->pRecoverRes != NULL) {
L
Liu Jicong 已提交
1895
      pInfo->blockRecoverContiCnt++;
5
54liuyao 已提交
1896
      calBlockTbName(pInfo, pInfo->pRecoverRes);
1897
      if (pInfo->pUpdateInfo) {
5
54liuyao 已提交
1898 1899 1900 1901 1902 1903
        if (pTaskInfo->streamInfo.recoverStep == STREAM_RECOVER_STEP__SCAN1) {
          TSKEY maxTs = updateInfoFillBlockData(pInfo->pUpdateInfo, pInfo->pRecoverRes, pInfo->primaryTsIndex);
          pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, maxTs);
        } else {
          doCheckUpdate(pInfo, pInfo->pRecoverRes->info.window.ekey, pInfo->pRecoverRes);
        }
1904
      }
5
54liuyao 已提交
1905 1906
      if (pInfo->pCreateTbRes->info.rows > 0) {
        pInfo->scanMode = STREAM_SCAN_FROM_RES;
1907
        printDataBlock(pInfo->pCreateTbRes, "recover createTbl");
5
54liuyao 已提交
1908 1909
        return pInfo->pCreateTbRes;
      }
D
dapan1121 已提交
1910
      qDebug("stream recover scan get block, rows %" PRId64 , pInfo->pRecoverRes->info.rows);
5
54liuyao 已提交
1911 1912
      printDataBlock(pInfo->pRecoverRes, "scan recover");
      return pInfo->pRecoverRes;
L
Liu Jicong 已提交
1913 1914
    }
    pTaskInfo->streamInfo.recoverStep = STREAM_RECOVER_STEP__NONE;
L
Liu Jicong 已提交
1915
    STableScanInfo* pTSInfo = pInfo->pTableScanOp->info;
H
Haojun Liao 已提交
1916
    tsdbReaderClose(pTSInfo->base.dataReader);
H
Haojun Liao 已提交
1917 1918
    qDebug("5");

H
Haojun Liao 已提交
1919
    pTSInfo->base.dataReader = NULL;
1920

H
Haojun Liao 已提交
1921 1922
    pTSInfo->base.cond.startVersion = -1;
    pTSInfo->base.cond.endVersion = -1;
L
Liu Jicong 已提交
1923

L
Liu Jicong 已提交
1924
    pTaskInfo->streamInfo.recoverScanFinished = true;
L
Liu Jicong 已提交
1925 1926 1927
    return NULL;
  }

5
54liuyao 已提交
1928
  size_t total = taosArrayGetSize(pInfo->pBlockLists);
5
54liuyao 已提交
1929
// TODO: refactor
L
Liu Jicong 已提交
1930
FETCH_NEXT_BLOCK:
L
Liu Jicong 已提交
1931
  if (pInfo->blockType == STREAM_INPUT__DATA_BLOCK) {
H
Haojun Liao 已提交
1932
    if (pInfo->validBlockIndex >= total) {
L
Liu Jicong 已提交
1933
      doClearBufferedBlocks(pInfo);
L
Liu Jicong 已提交
1934
      /*pOperator->status = OP_EXEC_DONE;*/
H
Haojun Liao 已提交
1935 1936 1937
      return NULL;
    }

1938
    int32_t      current = pInfo->validBlockIndex++;
L
Liu Jicong 已提交
1939 1940
    SPackedData* pPacked = taosArrayGet(pInfo->pBlockLists, current);
    SSDataBlock* pBlock = pPacked->pDataBlock;
5
54liuyao 已提交
1941
    if (pBlock->info.parTbName[0]) {
H
Haojun Liao 已提交
1942
      streamStatePutParName(pTaskInfo->streamInfo.pState, pBlock->info.id.groupId, pBlock->info.parTbName);
1943
    }
1944
    // TODO move into scan
5
54liuyao 已提交
1945 1946
    pBlock->info.calWin.skey = INT64_MIN;
    pBlock->info.calWin.ekey = INT64_MAX;
1947
    pBlock->info.dataLoad = 1;
1948
    blockDataUpdateTsWindow(pBlock, 0);
1949
    switch (pBlock->info.type) {
L
Liu Jicong 已提交
1950 1951 1952
      case STREAM_NORMAL:
      case STREAM_GET_ALL:
        return pBlock;
1953 1954 1955
      case STREAM_RETRIEVE: {
        pInfo->blockType = STREAM_INPUT__DATA_SUBMIT;
        pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RETRIEVE;
1956 1957
        copyDataBlock(pInfo->pUpdateRes, pBlock);
        prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex);
1958 1959 1960
        updateInfoAddCloseWindowSBF(pInfo->pUpdateInfo);
      } break;
      case STREAM_DELETE_DATA: {
1961
        printDataBlock(pBlock, "stream scan delete recv");
L
Liu Jicong 已提交
1962
        SSDataBlock* pDelBlock = NULL;
L
Liu Jicong 已提交
1963
        if (pInfo->tqReader) {
L
Liu Jicong 已提交
1964
          pDelBlock = createSpecialDataBlock(STREAM_DELETE_DATA);
L
Liu Jicong 已提交
1965
          filterDelBlockByUid(pDelBlock, pBlock, pInfo);
L
Liu Jicong 已提交
1966 1967
        } else {
          pDelBlock = pBlock;
L
Liu Jicong 已提交
1968
        }
5
54liuyao 已提交
1969 1970
        setBlockGroupIdByUid(pInfo, pDelBlock);
        printDataBlock(pDelBlock, "stream scan delete recv filtered");
5
54liuyao 已提交
1971 1972 1973 1974 1975 1976
        if (pDelBlock->info.rows == 0) {
          if (pInfo->tqReader) {
            blockDataDestroy(pDelBlock);
          }
          goto FETCH_NEXT_BLOCK;
        }
1977
        if (!isIntervalWindow(pInfo) && !isSessionWindow(pInfo) && !isStateWindow(pInfo)) {
L
Liu Jicong 已提交
1978
          generateDeleteResultBlock(pInfo, pDelBlock, pInfo->pDeleteDataRes);
1979
          pInfo->pDeleteDataRes->info.type = STREAM_DELETE_RESULT;
L
Liu Jicong 已提交
1980
          printDataBlock(pDelBlock, "stream scan delete result");
H
Haojun Liao 已提交
1981 1982
          blockDataDestroy(pDelBlock);

L
Liu Jicong 已提交
1983 1984 1985 1986 1987
          if (pInfo->pDeleteDataRes->info.rows > 0) {
            return pInfo->pDeleteDataRes;
          } else {
            goto FETCH_NEXT_BLOCK;
          }
1988 1989 1990
        } else {
          pInfo->blockType = STREAM_INPUT__DATA_SUBMIT;
          pInfo->updateResIndex = 0;
L
Liu Jicong 已提交
1991
          generateScanRange(pInfo, pDelBlock, pInfo->pUpdateRes);
1992 1993 1994
          prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex);
          copyDataBlock(pInfo->pDeleteDataRes, pInfo->pUpdateRes);
          pInfo->pDeleteDataRes->info.type = STREAM_DELETE_DATA;
L
Liu Jicong 已提交
1995 1996 1997 1998
          printDataBlock(pDelBlock, "stream scan delete data");
          if (pInfo->tqReader) {
            blockDataDestroy(pDelBlock);
          }
L
Liu Jicong 已提交
1999
          if (pInfo->pDeleteDataRes->info.rows > 0) {
5
54liuyao 已提交
2000
            pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RANGE;
L
Liu Jicong 已提交
2001 2002 2003 2004
            return pInfo->pDeleteDataRes;
          } else {
            goto FETCH_NEXT_BLOCK;
          }
2005
        }
2006 2007 2008
      } break;
      default:
        break;
5
54liuyao 已提交
2009
    }
2010
    // printDataBlock(pBlock, "stream scan recv");
2011
    return pBlock;
L
Liu Jicong 已提交
2012
  } else if (pInfo->blockType == STREAM_INPUT__DATA_SUBMIT) {
L
Liu Jicong 已提交
2013
    qDebug("scan mode %d", pInfo->scanMode);
5
54liuyao 已提交
2014 2015 2016
    switch (pInfo->scanMode) {
      case STREAM_SCAN_FROM_RES: {
        pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
5
54liuyao 已提交
2017
        doCheckUpdate(pInfo, pInfo->pRes->info.window.ekey, pInfo->pRes);
5
54liuyao 已提交
2018 2019 2020
        doFilter(pInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL);
        pInfo->pRes->info.dataLoad = 1;
        blockDataUpdateTsWindow(pInfo->pRes, pInfo->primaryTsIndex);
5
54liuyao 已提交
2021 2022 2023
        if (pInfo->pRes->info.rows > 0) {
          return pInfo->pRes;
        }
5
54liuyao 已提交
2024
      } break;
2025
      case STREAM_SCAN_FROM_DELETE_DATA: {
2026 2027 2028 2029 2030 2031 2032
        generateScanRange(pInfo, pInfo->pUpdateDataRes, pInfo->pUpdateRes);
        prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex);
        pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RANGE;
        copyDataBlock(pInfo->pDeleteDataRes, pInfo->pUpdateRes);
        pInfo->pDeleteDataRes->info.type = STREAM_DELETE_DATA;
        return pInfo->pDeleteDataRes;
      } break;
5
54liuyao 已提交
2033 2034 2035 2036 2037 2038 2039 2040 2041 2042
      case STREAM_SCAN_FROM_UPDATERES: {
        generateScanRange(pInfo, pInfo->pUpdateDataRes, pInfo->pUpdateRes);
        prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex);
        pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RANGE;
        return pInfo->pUpdateRes;
      } break;
      case STREAM_SCAN_FROM_DATAREADER_RANGE:
      case STREAM_SCAN_FROM_DATAREADER_RETRIEVE: {
        SSDataBlock* pSDB = doRangeScan(pInfo, pInfo->pUpdateRes, pInfo->primaryTsIndex, &pInfo->updateResIndex);
        if (pSDB) {
2043
          STableScanInfo* pTableScanInfo = pInfo->pTableScanOp->info;
H
Haojun Liao 已提交
2044 2045
          uint64_t        version = getReaderMaxVersion(pTableScanInfo->base.dataReader);
          updateInfoSetScanRange(pInfo->pUpdateInfo, &pTableScanInfo->base.cond.twindows, pInfo->groupId, version);
5
54liuyao 已提交
2046 2047
          pSDB->info.type = pInfo->scanMode == STREAM_SCAN_FROM_DATAREADER_RANGE ? STREAM_NORMAL : STREAM_PULL_DATA;
          checkUpdateData(pInfo, true, pSDB, false);
2048
          // printDataBlock(pSDB, "stream scan update");
L
Liu Jicong 已提交
2049
          calBlockTbName(pInfo, pSDB);
5
54liuyao 已提交
2050 2051
          return pSDB;
        }
2052
        blockDataCleanup(pInfo->pUpdateDataRes);
5
54liuyao 已提交
2053 2054 2055 2056
        pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
      } break;
      default:
        break;
2057
    }
2058

2059
    SStreamAggSupporter* pSup = pInfo->windowSup.pStreamAggSup;
5
54liuyao 已提交
2060
    if (isStateWindow(pInfo) && pSup->pScanBlock->info.rows > 0) {
2061 2062
      pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RANGE;
      pInfo->updateResIndex = 0;
5
54liuyao 已提交
2063 2064
      copyDataBlock(pInfo->pUpdateRes, pSup->pScanBlock);
      blockDataCleanup(pSup->pScanBlock);
2065
      prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex);
2066
      pInfo->pUpdateRes->info.type = STREAM_DELETE_DATA;
2067
      return pInfo->pUpdateRes;
5
54liuyao 已提交
2068
    }
5
54liuyao 已提交
2069

H
Haojun Liao 已提交
2070 2071
    SDataBlockInfo* pBlockInfo = &pInfo->pRes->info;

2072
    int32_t totBlockNum = taosArrayGetSize(pInfo->pBlockLists);
2073

L
Liu Jicong 已提交
2074
  NEXT_SUBMIT_BLK:
2075
    while (1) {
L
Liu Jicong 已提交
2076
      if (pInfo->tqReader->msg2.msgStr == NULL) {
2077
        if (pInfo->validBlockIndex >= totBlockNum) {
5
54liuyao 已提交
2078
          updateInfoDestoryColseWinSBF(pInfo->pUpdateInfo);
L
Liu Jicong 已提交
2079
          doClearBufferedBlocks(pInfo);
L
Liu Jicong 已提交
2080
          qDebug("stream scan return empty, consume block %d", totBlockNum);
2081 2082
          return NULL;
        }
2083

L
Liu Jicong 已提交
2084 2085
        int32_t      current = pInfo->validBlockIndex++;
        SPackedData* pSubmit = taosArrayGet(pInfo->pBlockLists, current);
L
Liu Jicong 已提交
2086
        /*if (tqReaderSetDataMsg(pInfo->tqReader, pSubmit, 0) < 0) {*/
L
Liu Jicong 已提交
2087
        if (tqReaderSetSubmitReq2(pInfo->tqReader, pSubmit->msgStr, pSubmit->msgLen, pSubmit->ver) < 0) {
2088 2089 2090 2091
          qError("submit msg messed up when initing stream submit block %p, current %d, total %d", pSubmit, current,
                 totBlockNum);
          continue;
        }
H
Haojun Liao 已提交
2092 2093
      }

2094 2095
      blockDataCleanup(pInfo->pRes);

L
Liu Jicong 已提交
2096
      while (tqNextDataBlock2(pInfo->tqReader)) {
2097
        SSDataBlock block = {0};
2098

2099
        int32_t code = tqRetrieveDataBlock2(&block, pInfo->tqReader, NULL);
2100 2101 2102 2103 2104

        if (code != TSDB_CODE_SUCCESS || block.info.rows == 0) {
          continue;
        }

2105
        setBlockIntoRes(pInfo, &block, false);
2106

H
Haojun Liao 已提交
2107
        if (updateInfoIgnore(pInfo->pUpdateInfo, &pInfo->pRes->info.window, pInfo->pRes->info.id.groupId,
L
Liu Jicong 已提交
2108
                             pInfo->pRes->info.version)) {
2109 2110 2111 2112 2113
          printDataBlock(pInfo->pRes, "stream scan ignore");
          blockDataCleanup(pInfo->pRes);
          continue;
        }

5
54liuyao 已提交
2114 2115 2116
        if (pInfo->pCreateTbRes->info.rows > 0) {
          pInfo->scanMode = STREAM_SCAN_FROM_RES;
          return pInfo->pCreateTbRes;
2117 2118
        }

5
54liuyao 已提交
2119
        doCheckUpdate(pInfo, pBlockInfo->window.ekey, pInfo->pRes);
H
Haojun Liao 已提交
2120
        doFilter(pInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL);
2121
        pInfo->pRes->info.dataLoad = 1;
2122 2123 2124
        blockDataUpdateTsWindow(pInfo->pRes, pInfo->primaryTsIndex);

        if (pBlockInfo->rows > 0 || pInfo->pUpdateDataRes->info.rows > 0) {
2125 2126 2127
          break;
        }
      }
2128
      if (pBlockInfo->rows > 0 || pInfo->pUpdateDataRes->info.rows > 0) {
5
54liuyao 已提交
2129
        break;
J
jiacy-jcy 已提交
2130
      } else {
2131
        continue;
5
54liuyao 已提交
2132
      }
H
Haojun Liao 已提交
2133 2134 2135 2136
    }

    // record the scan action.
    pInfo->numOfExec++;
2137
    pOperator->resultInfo.totalRows += pBlockInfo->rows;
2138
    // printDataBlock(pInfo->pRes, "stream scan");
H
Haojun Liao 已提交
2139

D
dapan1121 已提交
2140
    qDebug("scan rows: %" PRId64 , pBlockInfo->rows);
L
Liu Jicong 已提交
2141 2142 2143
    if (pBlockInfo->rows > 0) {
      return pInfo->pRes;
    }
2144 2145 2146 2147 2148 2149

    if (pInfo->pUpdateDataRes->info.rows > 0) {
      goto FETCH_NEXT_BLOCK;
    }

    goto NEXT_SUBMIT_BLK;
L
Liu Jicong 已提交
2150 2151 2152
  } else {
    ASSERT(0);
    return NULL;
H
Haojun Liao 已提交
2153 2154 2155
  }
}

H
Haojun Liao 已提交
2156
static SArray* extractTableIdList(const STableListInfo* pTableListInfo) {
2157 2158 2159
  SArray* tableIdList = taosArrayInit(4, sizeof(uint64_t));

  // Transfer the Array of STableKeyInfo into uid list.
H
Haojun Liao 已提交
2160 2161 2162
  size_t size = tableListGetSize(pTableListInfo);
  for (int32_t i = 0; i < size; ++i) {
    STableKeyInfo* pkeyInfo = tableListGetInfo(pTableListInfo, i);
2163 2164 2165 2166 2167 2168
    taosArrayPush(tableIdList, &pkeyInfo->uid);
  }

  return tableIdList;
}

2169
static SSDataBlock* doRawScan(SOperatorInfo* pOperator) {
L
Liu Jicong 已提交
2170 2171
  // NOTE: this operator does never check if current status is done or not
  SExecTaskInfo*      pTaskInfo = pOperator->pTaskInfo;
2172
  SStreamRawScanInfo* pInfo = pOperator->info;
D
dapan1121 已提交
2173
  int32_t             code = TSDB_CODE_SUCCESS;
L
Liu Jicong 已提交
2174
  pTaskInfo->streamInfo.metaRsp.metaRspLen = 0;  // use metaRspLen !=0 to judge if data is meta
wmmhello's avatar
wmmhello 已提交
2175
  pTaskInfo->streamInfo.metaRsp.metaRsp = NULL;
2176

wmmhello's avatar
wmmhello 已提交
2177
  qDebug("tmqsnap doRawScan called");
L
Liu Jicong 已提交
2178
  if (pTaskInfo->streamInfo.prepareStatus.type == TMQ_OFFSET__SNAPSHOT_DATA) {
D
dapan1121 已提交
2179 2180 2181 2182 2183
    bool hasNext = false;
    if (pInfo->dataReader) {
      code = tsdbNextDataBlock(pInfo->dataReader, &hasNext);
      if (code) {
        tsdbReleaseDataBlock(pInfo->dataReader);
2184
        T_LONG_JMP(pTaskInfo->env, code);
D
dapan1121 已提交
2185 2186 2187 2188
      }
    }
    
    if (pInfo->dataReader && hasNext) {
wmmhello's avatar
wmmhello 已提交
2189
      if (isTaskKilled(pTaskInfo)) {
X
Xiaoyu Wang 已提交
2190
        tsdbReleaseDataBlock(pInfo->dataReader);
2191
        T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
wmmhello's avatar
wmmhello 已提交
2192
      }
2193

H
Haojun Liao 已提交
2194 2195
      SSDataBlock* pBlock = tsdbRetrieveDataBlock(pInfo->dataReader, NULL);
      if (pBlock == NULL) {
2196
        T_LONG_JMP(pTaskInfo->env, terrno);
wmmhello's avatar
wmmhello 已提交
2197 2198
      }

H
Haojun Liao 已提交
2199
      qDebug("tmqsnap doRawScan get data uid:%" PRId64 "", pBlock->info.id.uid);
wmmhello's avatar
wmmhello 已提交
2200
      pTaskInfo->streamInfo.lastStatus.type = TMQ_OFFSET__SNAPSHOT_DATA;
H
Haojun Liao 已提交
2201
      pTaskInfo->streamInfo.lastStatus.uid = pBlock->info.id.uid;
wmmhello's avatar
wmmhello 已提交
2202 2203 2204
      pTaskInfo->streamInfo.lastStatus.ts = pBlock->info.window.ekey;
      return pBlock;
    }
wmmhello's avatar
wmmhello 已提交
2205 2206

    SMetaTableInfo mtInfo = getUidfromSnapShot(pInfo->sContext);
L
Liu Jicong 已提交
2207
    if (mtInfo.uid == 0) {  // read snapshot done, change to get data from wal
wmmhello's avatar
wmmhello 已提交
2208 2209
      qDebug("tmqsnap read snapshot done, change to get data from wal");
      pTaskInfo->streamInfo.prepareStatus.uid = mtInfo.uid;
wmmhello's avatar
wmmhello 已提交
2210 2211
      pTaskInfo->streamInfo.lastStatus.type = TMQ_OFFSET__LOG;
      pTaskInfo->streamInfo.lastStatus.version = pInfo->sContext->snapVersion;
L
Liu Jicong 已提交
2212
    } else {
wmmhello's avatar
wmmhello 已提交
2213 2214
      pTaskInfo->streamInfo.prepareStatus.uid = mtInfo.uid;
      pTaskInfo->streamInfo.prepareStatus.ts = INT64_MIN;
2215
      qDebug("tmqsnap change get data uid:%" PRId64 "", mtInfo.uid);
wmmhello's avatar
wmmhello 已提交
2216 2217
      qStreamPrepareScan(pTaskInfo, &pTaskInfo->streamInfo.prepareStatus, pInfo->sContext->subType);
    }
2218
    tDeleteSSchemaWrapper(mtInfo.schema);
wmmhello's avatar
wmmhello 已提交
2219
    qDebug("tmqsnap stream scan tsdb return null");
wmmhello's avatar
wmmhello 已提交
2220
    return NULL;
L
Liu Jicong 已提交
2221 2222 2223 2224 2225 2226 2227
  } else if (pTaskInfo->streamInfo.prepareStatus.type == TMQ_OFFSET__SNAPSHOT_META) {
    SSnapContext* sContext = pInfo->sContext;
    void*         data = NULL;
    int32_t       dataLen = 0;
    int16_t       type = 0;
    int64_t       uid = 0;
    if (getMetafromSnapShot(sContext, &data, &dataLen, &type, &uid) < 0) {
wmmhello's avatar
wmmhello 已提交
2228
      qError("tmqsnap getMetafromSnapShot error");
wmmhello's avatar
wmmhello 已提交
2229
      taosMemoryFreeClear(data);
2230 2231 2232
      return NULL;
    }

L
Liu Jicong 已提交
2233
    if (!sContext->queryMetaOrData) {  // change to get data next poll request
wmmhello's avatar
wmmhello 已提交
2234 2235 2236 2237
      pTaskInfo->streamInfo.lastStatus.type = TMQ_OFFSET__SNAPSHOT_META;
      pTaskInfo->streamInfo.lastStatus.uid = uid;
      pTaskInfo->streamInfo.metaRsp.rspOffset.type = TMQ_OFFSET__SNAPSHOT_DATA;
      pTaskInfo->streamInfo.metaRsp.rspOffset.uid = 0;
wmmhello's avatar
wmmhello 已提交
2238
      pTaskInfo->streamInfo.metaRsp.rspOffset.ts = INT64_MIN;
L
Liu Jicong 已提交
2239
    } else {
wmmhello's avatar
wmmhello 已提交
2240 2241 2242 2243 2244 2245 2246
      pTaskInfo->streamInfo.lastStatus.type = TMQ_OFFSET__SNAPSHOT_META;
      pTaskInfo->streamInfo.lastStatus.uid = uid;
      pTaskInfo->streamInfo.metaRsp.rspOffset = pTaskInfo->streamInfo.lastStatus;
      pTaskInfo->streamInfo.metaRsp.resMsgType = type;
      pTaskInfo->streamInfo.metaRsp.metaRspLen = dataLen;
      pTaskInfo->streamInfo.metaRsp.metaRsp = data;
    }
2247

wmmhello's avatar
wmmhello 已提交
2248
    return NULL;
2249
  }
L
Liu Jicong 已提交
2250 2251 2252 2253 2254 2255 2256 2257 2258 2259 2260 2261 2262 2263 2264 2265 2266 2267 2268 2269 2270 2271 2272 2273 2274 2275 2276 2277 2278 2279 2280 2281 2282 2283 2284 2285 2286 2287
  //  else if (pTaskInfo->streamInfo.prepareStatus.type == TMQ_OFFSET__LOG) {
  //    int64_t fetchVer = pTaskInfo->streamInfo.prepareStatus.version + 1;
  //
  //    while(1){
  //      if (tqFetchLog(pInfo->tqReader->pWalReader, pInfo->sContext->withMeta, &fetchVer, &pInfo->pCkHead) < 0) {
  //        qDebug("tmqsnap tmq poll: consumer log end. offset %" PRId64, fetchVer);
  //        pTaskInfo->streamInfo.lastStatus.version = fetchVer;
  //        pTaskInfo->streamInfo.lastStatus.type = TMQ_OFFSET__LOG;
  //        return NULL;
  //      }
  //      SWalCont* pHead = &pInfo->pCkHead->head;
  //      qDebug("tmqsnap tmq poll: consumer log offset %" PRId64 " msgType %d", fetchVer, pHead->msgType);
  //
  //      if (pHead->msgType == TDMT_VND_SUBMIT) {
  //        SSubmitReq* pCont = (SSubmitReq*)&pHead->body;
  //        tqReaderSetDataMsg(pInfo->tqReader, pCont, 0);
  //        SSDataBlock* block = tqLogScanExec(pInfo->sContext->subType, pInfo->tqReader, pInfo->pFilterOutTbUid,
  //        &pInfo->pRes); if(block){
  //          pTaskInfo->streamInfo.lastStatus.type = TMQ_OFFSET__LOG;
  //          pTaskInfo->streamInfo.lastStatus.version = fetchVer;
  //          qDebug("tmqsnap fetch data msg, ver:%" PRId64 ", type:%d", pHead->version, pHead->msgType);
  //          return block;
  //        }else{
  //          fetchVer++;
  //        }
  //      } else{
  //        ASSERT(pInfo->sContext->withMeta);
  //        ASSERT(IS_META_MSG(pHead->msgType));
  //        qDebug("tmqsnap fetch meta msg, ver:%" PRId64 ", type:%d", pHead->version, pHead->msgType);
  //        pTaskInfo->streamInfo.metaRsp.rspOffset.version = fetchVer;
  //        pTaskInfo->streamInfo.metaRsp.rspOffset.type = TMQ_OFFSET__LOG;
  //        pTaskInfo->streamInfo.metaRsp.resMsgType = pHead->msgType;
  //        pTaskInfo->streamInfo.metaRsp.metaRspLen = pHead->bodyLen;
  //        pTaskInfo->streamInfo.metaRsp.metaRsp = taosMemoryMalloc(pHead->bodyLen);
  //        memcpy(pTaskInfo->streamInfo.metaRsp.metaRsp, pHead->body, pHead->bodyLen);
  //        return NULL;
  //      }
  //    }
2288 2289 2290
  return NULL;
}

wmmhello's avatar
wmmhello 已提交
2291
static void destroyRawScanOperatorInfo(void* param) {
wmmhello's avatar
wmmhello 已提交
2292 2293 2294
  SStreamRawScanInfo* pRawScan = (SStreamRawScanInfo*)param;
  tsdbReaderClose(pRawScan->dataReader);
  destroySnapContext(pRawScan->sContext);
2295
  tableListDestroy(pRawScan->pTableListInfo);
wmmhello's avatar
wmmhello 已提交
2296 2297 2298
  taosMemoryFree(pRawScan);
}

L
Liu Jicong 已提交
2299 2300 2301
// for subscribing db or stb (not including column),
// if this scan is used, meta data can be return
// and schemas are decided when scanning
2302
SOperatorInfo* createRawScanOperatorInfo(SReadHandle* pHandle, SExecTaskInfo* pTaskInfo) {
L
Liu Jicong 已提交
2303 2304 2305 2306 2307
  // create operator
  // create tb reader
  // create meta reader
  // create tq reader

H
Haojun Liao 已提交
2308 2309
  int32_t code = TSDB_CODE_SUCCESS;

2310
  SStreamRawScanInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamRawScanInfo));
L
Liu Jicong 已提交
2311
  SOperatorInfo*      pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
2312
  if (pInfo == NULL || pOperator == NULL) {
H
Haojun Liao 已提交
2313 2314
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _end;
2315 2316
  }

2317
  pInfo->pTableListInfo = tableListCreate();
wmmhello's avatar
wmmhello 已提交
2318 2319
  pInfo->vnode = pHandle->vnode;

2320
  pInfo->sContext = pHandle->sContext;
L
Liu Jicong 已提交
2321 2322
  setOperatorInfo(pOperator, "RawScanOperator", QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN, false, OP_NOT_OPENED, pInfo,
                  pTaskInfo);
2323

2324
  pOperator->fpSet = createOperatorFpSet(NULL, doRawScan, NULL, destroyRawScanOperatorInfo, optrDefaultBufFn, NULL);
2325
  return pOperator;
H
Haojun Liao 已提交
2326

L
Liu Jicong 已提交
2327
_end:
H
Haojun Liao 已提交
2328 2329 2330 2331
  taosMemoryFree(pInfo);
  taosMemoryFree(pOperator);
  pTaskInfo->code = code;
  return NULL;
L
Liu Jicong 已提交
2332 2333
}

2334
static void destroyStreamScanOperatorInfo(void* param) {
2335
  SStreamScanInfo* pStreamScan = (SStreamScanInfo*)param;
2336

2337
  if (pStreamScan->pTableScanOp && pStreamScan->pTableScanOp->info) {
5
54liuyao 已提交
2338
    destroyOperatorInfo(pStreamScan->pTableScanOp);
2339
  }
2340

2341 2342 2343
  if (pStreamScan->tqReader) {
    tqCloseReader(pStreamScan->tqReader);
  }
H
Haojun Liao 已提交
2344 2345
  if (pStreamScan->matchInfo.pList) {
    taosArrayDestroy(pStreamScan->matchInfo.pList);
2346
  }
C
Cary Xu 已提交
2347 2348
  if (pStreamScan->pPseudoExpr) {
    destroyExprInfo(pStreamScan->pPseudoExpr, pStreamScan->numOfPseudoExpr);
L
Liu Jicong 已提交
2349
    taosMemoryFree(pStreamScan->pPseudoExpr);
C
Cary Xu 已提交
2350
  }
C
Cary Xu 已提交
2351

L
Liu Jicong 已提交
2352
  cleanupExprSupp(&pStreamScan->tbnameCalSup);
5
54liuyao 已提交
2353
  cleanupExprSupp(&pStreamScan->tagCalSup);
L
Liu Jicong 已提交
2354

L
Liu Jicong 已提交
2355
  updateInfoDestroy(pStreamScan->pUpdateInfo);
2356 2357 2358 2359
  blockDataDestroy(pStreamScan->pRes);
  blockDataDestroy(pStreamScan->pUpdateRes);
  blockDataDestroy(pStreamScan->pPullDataRes);
  blockDataDestroy(pStreamScan->pDeleteDataRes);
5
54liuyao 已提交
2360
  blockDataDestroy(pStreamScan->pUpdateDataRes);
5
54liuyao 已提交
2361
  blockDataDestroy(pStreamScan->pCreateTbRes);
2362 2363 2364 2365
  taosArrayDestroy(pStreamScan->pBlockLists);
  taosMemoryFree(pStreamScan);
}

2366
SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhysiNode* pTableScanNode, SNode* pTagCond,
2367
                                            STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo) {
H
Haojun Liao 已提交
2368
  SArray*          pColIds = NULL;
2369 2370
  SStreamScanInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamScanInfo));
  SOperatorInfo*   pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
2371

H
Haojun Liao 已提交
2372
  if (pInfo == NULL || pOperator == NULL) {
S
Shengliang Guan 已提交
2373
    terrno = TSDB_CODE_OUT_OF_MEMORY;
2374
    tableListDestroy(pTableListInfo);
2375
    goto _error;
H
Haojun Liao 已提交
2376 2377
  }

2378
  SScanPhysiNode*     pScanPhyNode = &pTableScanNode->scan;
2379
  SDataBlockDescNode* pDescNode = pScanPhyNode->node.pOutputDataBlockDesc;
H
Haojun Liao 已提交
2380

2381
  pInfo->pTagCond = pTagCond;
2382
  pInfo->pGroupTags = pTableScanNode->pGroupTags;
2383

2384
  int32_t numOfCols = 0;
2385 2386
  int32_t code =
      extractColMatchInfo(pScanPhyNode->pScanCols, pDescNode, &numOfCols, COL_MATCH_FROM_COL_ID, &pInfo->matchInfo);
H
Haojun Liao 已提交
2387
  if (code != TSDB_CODE_SUCCESS) {
2388
    tableListDestroy(pTableListInfo);
H
Haojun Liao 已提交
2389 2390
    goto _error;
  }
2391

H
Haojun Liao 已提交
2392
  int32_t numOfOutput = taosArrayGetSize(pInfo->matchInfo.pList);
H
Haojun Liao 已提交
2393
  pColIds = taosArrayInit(numOfOutput, sizeof(int16_t));
2394
  for (int32_t i = 0; i < numOfOutput; ++i) {
H
Haojun Liao 已提交
2395
    SColMatchItem* id = taosArrayGet(pInfo->matchInfo.pList, i);
2396 2397

    int16_t colId = id->colId;
2398
    taosArrayPush(pColIds, &colId);
2399
    if (id->colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
H
Haojun Liao 已提交
2400
      pInfo->primaryTsIndex = id->dstSlotId;
5
54liuyao 已提交
2401
    }
H
Haojun Liao 已提交
2402 2403
  }

L
Liu Jicong 已提交
2404 2405 2406 2407
  if (pTableScanNode->pSubtable != NULL) {
    SExprInfo* pSubTableExpr = taosMemoryCalloc(1, sizeof(SExprInfo));
    if (pSubTableExpr == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
2408
      tableListDestroy(pTableListInfo);
L
Liu Jicong 已提交
2409 2410
      goto _error;
    }
2411

L
Liu Jicong 已提交
2412 2413 2414
    pInfo->tbnameCalSup.pExprInfo = pSubTableExpr;
    createExprFromOneNode(pSubTableExpr, pTableScanNode->pSubtable, 0);
    if (initExprSupp(&pInfo->tbnameCalSup, pSubTableExpr, 1) != 0) {
2415
      tableListDestroy(pTableListInfo);
L
Liu Jicong 已提交
2416 2417 2418 2419
      goto _error;
    }
  }

2420 2421
  if (pTableScanNode->pTags != NULL) {
    int32_t    numOfTags;
5
54liuyao 已提交
2422
    SExprInfo* pTagExpr = createExpr(pTableScanNode->pTags, &numOfTags);
2423 2424
    if (pTagExpr == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
2425
      tableListDestroy(pTableListInfo);
2426 2427 2428 2429
      goto _error;
    }
    if (initExprSupp(&pInfo->tagCalSup, pTagExpr, numOfTags) != 0) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
2430
      tableListDestroy(pTableListInfo);
2431 2432 2433 2434
      goto _error;
    }
  }

L
Liu Jicong 已提交
2435
  pInfo->pBlockLists = taosArrayInit(4, sizeof(SPackedData));
H
Haojun Liao 已提交
2436
  if (pInfo->pBlockLists == NULL) {
2437
    terrno = TSDB_CODE_OUT_OF_MEMORY;
2438
    tableListDestroy(pTableListInfo);
2439
    goto _error;
H
Haojun Liao 已提交
2440 2441
  }

5
54liuyao 已提交
2442
  if (pHandle->vnode) {
2443
    SOperatorInfo*  pTableScanOp = createTableScanOperatorInfo(pTableScanNode, pHandle, pTableListInfo, pTaskInfo);
L
Liu Jicong 已提交
2444
    STableScanInfo* pTSInfo = (STableScanInfo*)pTableScanOp->info;
2445
    if (pHandle->version > 0) {
H
Haojun Liao 已提交
2446
      pTSInfo->base.cond.endVersion = pHandle->version;
2447
    }
L
Liu Jicong 已提交
2448

2449
    STableKeyInfo* pList = NULL;
5
54liuyao 已提交
2450
    int32_t        num = 0;
2451
    tableListGetGroupList(pTableListInfo, 0, &pList, &num);
2452

2453
    if (pHandle->initTableReader) {
L
Liu Jicong 已提交
2454
      pTSInfo->scanMode = TABLE_SCAN__TABLE_ORDER;
H
Haojun Liao 已提交
2455
      pTSInfo->base.dataReader = NULL;
2456
      pTaskInfo->streamInfo.lastStatus.uid = -1;
L
Liu Jicong 已提交
2457 2458
    }

L
Liu Jicong 已提交
2459 2460 2461 2462
    if (pHandle->initTqReader) {
      ASSERT(pHandle->tqReader == NULL);
      pInfo->tqReader = tqOpenReader(pHandle->vnode);
      ASSERT(pInfo->tqReader);
2463
    } else {
L
Liu Jicong 已提交
2464 2465
      ASSERT(pHandle->tqReader);
      pInfo->tqReader = pHandle->tqReader;
2466 2467
    }

2468
    pInfo->pUpdateInfo = NULL;
2469
    pInfo->pTableScanOp = pTableScanOp;
2470 2471 2472
    if (pInfo->pTableScanOp->pTaskInfo->streamInfo.pState) {
      streamStateSetNumber(pInfo->pTableScanOp->pTaskInfo->streamInfo.pState, -1);
    }
L
Liu Jicong 已提交
2473

L
Liu Jicong 已提交
2474 2475
    pInfo->readHandle = *pHandle;
    pInfo->tableUid = pScanPhyNode->uid;
L
Liu Jicong 已提交
2476
    pTaskInfo->streamInfo.snapshotVer = pHandle->version;
5
54liuyao 已提交
2477 2478
    pInfo->pCreateTbRes = buildCreateTableBlock(&pInfo->tbnameCalSup, &pInfo->tagCalSup);
    blockDataEnsureCapacity(pInfo->pCreateTbRes, 8);
L
Liu Jicong 已提交
2479

L
Liu Jicong 已提交
2480
    // set the extract column id to streamHandle
L
Liu Jicong 已提交
2481
    tqReaderSetColIdList(pInfo->tqReader, pColIds);
2482
    SArray* tableIdList = extractTableIdList(((STableScanInfo*)(pInfo->pTableScanOp->info))->base.pTableListInfo);
2483
    code = tqReaderSetTbUidList(pInfo->tqReader, tableIdList);
L
Liu Jicong 已提交
2484 2485 2486 2487
    if (code != 0) {
      taosArrayDestroy(tableIdList);
      goto _error;
    }
2488

L
Liu Jicong 已提交
2489
    taosArrayDestroy(tableIdList);
H
Haojun Liao 已提交
2490
    memcpy(&pTaskInfo->streamInfo.tableCond, &pTSInfo->base.cond, sizeof(SQueryTableDataCond));
L
Liu Jicong 已提交
2491 2492
  } else {
    taosArrayDestroy(pColIds);
2493
    tableListDestroy(pTableListInfo);
H
Haojun Liao 已提交
2494
    pColIds = NULL;
5
54liuyao 已提交
2495 2496
  }

2497 2498 2499 2500 2501
  // create the pseduo columns info
  if (pTableScanNode->scan.pScanPseudoCols != NULL) {
    pInfo->pPseudoExpr = createExprInfo(pTableScanNode->scan.pScanPseudoCols, NULL, &pInfo->numOfPseudoExpr);
  }

H
Haojun Liao 已提交
2502 2503 2504 2505 2506
  code = filterInitFromNode((SNode*)pScanPhyNode->node.pConditions, &pOperator->exprSupp.pFilterInfo, 0);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

H
Haojun Liao 已提交
2507
  pInfo->pRes = createDataBlockFromDescNode(pDescNode);
2508
  pInfo->pUpdateRes = createSpecialDataBlock(STREAM_CLEAR);
2509
  pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
L
Liu Jicong 已提交
2510
  pInfo->windowSup = (SWindowSupporter){.pStreamAggSup = NULL, .gap = -1, .parentType = QUERY_NODE_PHYSICAL_PLAN};
2511
  pInfo->groupId = 0;
2512
  pInfo->pPullDataRes = createSpecialDataBlock(STREAM_RETRIEVE);
2513
  pInfo->pStreamScanOp = pOperator;
2514
  pInfo->deleteDataIndex = 0;
2515
  pInfo->pDeleteDataRes = createSpecialDataBlock(STREAM_DELETE_DATA);
5
54liuyao 已提交
2516
  pInfo->updateWin = (STimeWindow){.skey = INT64_MAX, .ekey = INT64_MAX};
2517
  pInfo->pUpdateDataRes = createSpecialDataBlock(STREAM_CLEAR);
X
Xiaoyu Wang 已提交
2518
  pInfo->assignBlockUid = pTableScanNode->assignBlockUid;
2519
  pInfo->partitionSup.needCalc = false;
5
54liuyao 已提交
2520 2521
  pInfo->igCheckUpdate = pTableScanNode->igCheckUpdate;
  pInfo->igExpired = pTableScanNode->igExpired;
2522
  pInfo->twAggSup.maxTs = INT64_MIN;
L
Liu Jicong 已提交
2523

L
Liu Jicong 已提交
2524 2525
  setOperatorInfo(pOperator, "StreamScanOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN, false, OP_NOT_OPENED, pInfo,
                  pTaskInfo);
2526
  pOperator->exprSupp.numOfExprs = taosArrayGetSize(pInfo->pRes->pDataBlock);
H
Haojun Liao 已提交
2527

2528
  __optr_fn_t nextFn = (pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM) ? doStreamScan : doQueueScan;
L
Liu Jicong 已提交
2529 2530
  pOperator->fpSet =
      createOperatorFpSet(optrDummyOpenFn, nextFn, NULL, destroyStreamScanOperatorInfo, optrDefaultBufFn, NULL);
2531

H
Haojun Liao 已提交
2532
  return pOperator;
2533

L
Liu Jicong 已提交
2534
_error:
H
Haojun Liao 已提交
2535 2536 2537 2538 2539 2540 2541 2542
  if (pColIds != NULL) {
    taosArrayDestroy(pColIds);
  }

  if (pInfo != NULL) {
    destroyStreamScanOperatorInfo(pInfo);
  }

2543 2544
  taosMemoryFreeClear(pOperator);
  return NULL;
H
Haojun Liao 已提交
2545 2546
}

2547
// Produce the next batch of tag-scan rows.
//
// Walks the operator's table list starting at pInfo->curPos, and for every table fills one row of
// pInfo->pRes: pseudo-column expressions receive the table name, all other expressions receive the
// value of the tag identified by the expression's first parameter column.
//
// Returns pInfo->pRes when at least one row was produced, NULL otherwise (operator already done,
// empty table list, or nothing left to emit). A failed meta lookup aborts via T_LONG_JMP.
static SSDataBlock* doTagScan(SOperatorInfo* pOperator) {
  if (OP_EXEC_DONE == pOperator->status) {
    return NULL;
  }

  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
  STagScanInfo*  pScan = pOperator->info;
  SExprInfo*     pExprs = &pOperator->exprSupp.pExprInfo[0];
  SSDataBlock*   pOut = pScan->pRes;

  blockDataCleanup(pOut);

  int32_t numOfTables = tableListGetSize(pScan->pTableListInfo);
  if (0 == numOfTables) {
    setTaskStatus(pTaskInfo, TASK_COMPLETED);
    return NULL;
  }

  char        nameBuf[512] = {0};
  int32_t     rows = 0;
  SMetaReader reader = {0};
  metaReaderInit(&reader, pScan->readHandle.meta, 0);

  while (pScan->curPos < numOfTables && rows < pOperator->resultInfo.capacity) {
    STableKeyInfo* pKey = tableListGetInfo(pScan->pTableListInfo, pScan->curPos);

    int32_t code = metaGetTableEntryByUid(&reader, pKey->uid);
    tDecoderClear(&reader.coder);
    if (code != TSDB_CODE_SUCCESS) {
      qError("failed to get table meta, uid:0x%" PRIx64 ", code:%s, %s", pKey->uid, tstrerror(terrno),
             GET_TASKID(pTaskInfo));
      metaReaderClear(&reader);
      T_LONG_JMP(pTaskInfo->env, terrno);
    }

    for (int32_t e = 0; e < pOperator->exprSupp.numOfExprs; ++e) {
      SExprInfo*       pExpr = &pExprs[e];
      SColumnInfoData* pCol = taosArrayGet(pOut->pDataBlock, pExpr->base.resSchema.slotId);

      // refactor later
      if (fmIsScanPseudoColumnFunc(pExpr->pExpr->_function.functionId)) {
        STR_TO_VARSTR(nameBuf, reader.me.name);
        colDataSetVal(pCol, rows, nameBuf, false);
        continue;
      }

      // otherwise the expression refers to a tag value
      bool    isJson = (TSDB_DATA_TYPE_JSON == pCol->info.type);
      STagVal tagVal = {0};
      tagVal.cid = pExpr->base.pParam[0].pCol->colId;

      const char* pTag = metaGetTableTagVal(reader.me.ctbEntry.pTags, pCol->info.type, &tagVal);

      // non-JSON tags are converted by tTagValToData; JSON tags (and misses) are used as returned
      char* pData = (!isJson && pTag != NULL) ? tTagValToData((const STagVal*)pTag, false) : (char*)pTag;

      bool isNull = (NULL == pData) || (isJson && tTagIsJsonNull(pData));
      colDataSetVal(pCol, rows, pData, isNull);

      // release the converted buffer for variable-length, non-JSON tags
      if (!isJson && pTag != NULL && IS_VAR_DATA_TYPE(((const STagVal*)pTag)->type) && pData != NULL) {
        taosMemoryFree(pData);
      }
    }

    rows += 1;

    pScan->curPos += 1;
    if (pScan->curPos >= numOfTables) {
      setOperatorCompleted(pOperator);
    }
  }

  metaReaderClear(&reader);

  if (OP_EXEC_DONE == pOperator->status) {
    setTaskStatus(pTaskInfo, TASK_COMPLETED);
  }

  pOut->info.rows = rows;
  pOperator->resultInfo.totalRows += rows;

  if (0 == pOut->info.rows) {
    return NULL;
  }
  return pScan->pRes;
}

2628
static void destroyTagScanOperatorInfo(void* param) {
H
Haojun Liao 已提交
2629 2630
  STagScanInfo* pInfo = (STagScanInfo*)param;
  pInfo->pRes = blockDataDestroy(pInfo->pRes);
H
Haojun Liao 已提交
2631
  taosArrayDestroy(pInfo->matchInfo.pList);
2632
  pInfo->pTableListInfo = tableListDestroy(pInfo->pTableListInfo);
D
dapan1121 已提交
2633
  taosMemoryFreeClear(param);
H
Haojun Liao 已提交
2634 2635
}

2636
SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysiNode* pPhyNode, STableListInfo* pTableListInfo,
S
slzhou 已提交
2637
                                         SExecTaskInfo* pTaskInfo) {
2638
  STagScanInfo*  pInfo = taosMemoryCalloc(1, sizeof(STagScanInfo));
H
Haojun Liao 已提交
2639 2640 2641 2642 2643
  SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
  if (pInfo == NULL || pOperator == NULL) {
    goto _error;
  }

2644 2645 2646 2647
  SDataBlockDescNode* pDescNode = pPhyNode->node.pOutputDataBlockDesc;

  int32_t    numOfExprs = 0;
  SExprInfo* pExprInfo = createExprInfo(pPhyNode->pScanPseudoCols, NULL, &numOfExprs);
2648
  int32_t    code = initExprSupp(&pOperator->exprSupp, pExprInfo, numOfExprs);
2649 2650 2651
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }
2652

H
Haojun Liao 已提交
2653 2654
  int32_t num = 0;
  code = extractColMatchInfo(pPhyNode->pScanPseudoCols, pDescNode, &num, COL_MATCH_FROM_COL_ID, &pInfo->matchInfo);
2655 2656 2657
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }
2658

2659
  pInfo->pTableListInfo = pTableListInfo;
H
Haojun Liao 已提交
2660
  pInfo->pRes = createDataBlockFromDescNode(pDescNode);
2661 2662
  pInfo->readHandle = *pReadHandle;
  pInfo->curPos = 0;
2663

L
Liu Jicong 已提交
2664 2665
  setOperatorInfo(pOperator, "TagScanOperator", QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN, false, OP_NOT_OPENED, pInfo,
                  pTaskInfo);
2666
  initResultSizeInfo(&pOperator->resultInfo, 4096);
2667 2668
  blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity);

L
Liu Jicong 已提交
2669 2670
  pOperator->fpSet =
      createOperatorFpSet(optrDummyOpenFn, doTagScan, NULL, destroyTagScanOperatorInfo, optrDefaultBufFn, NULL);
H
Haojun Liao 已提交
2671 2672

  return pOperator;
2673

2674
_error:
H
Haojun Liao 已提交
2675 2676 2677 2678 2679
  taosMemoryFree(pInfo);
  taosMemoryFree(pOperator);
  terrno = TSDB_CODE_OUT_OF_MEMORY;
  return NULL;
}
2680

dengyihao's avatar
dengyihao 已提交
2681
static SSDataBlock* getTableDataBlockImpl(void* param) {
dengyihao's avatar
opt mem  
dengyihao 已提交
2682 2683 2684 2685 2686 2687
  STableMergeScanSortSourceParam* source = param;
  SOperatorInfo*                  pOperator = source->pOperator;
  STableMergeScanInfo*            pInfo = pOperator->info;
  SExecTaskInfo*                  pTaskInfo = pOperator->pTaskInfo;
  int32_t                         readIdx = source->readerIdx;
  SSDataBlock*                    pBlock = source->inputBlock;
2688
  int32_t                         code = 0;
dengyihao's avatar
opt mem  
dengyihao 已提交
2689

H
Haojun Liao 已提交
2690
  SQueryTableDataCond* pQueryCond = taosArrayGet(pInfo->queryConds, readIdx);
dengyihao's avatar
opt mem  
dengyihao 已提交
2691

L
Liu Jicong 已提交
2692
  int64_t      st = taosGetTimestampUs();
2693
  void*        p = tableListGetInfo(pInfo->base.pTableListInfo, readIdx + pInfo->tableStartIndex);
H
Haojun Liao 已提交
2694
  SReadHandle* pHandle = &pInfo->base.readHandle;
dengyihao's avatar
dengyihao 已提交
2695

D
dapan1121 已提交
2696
  if (NULL == source->dataReader || !source->multiReader) {
D
dapan1121 已提交
2697
    code = tsdbReaderOpen(pHandle->vnode, pQueryCond, p, 1, pBlock, &source->dataReader, GET_TASKID(pTaskInfo), false);
2698 2699 2700
    if (code != 0) {
      T_LONG_JMP(pTaskInfo->env, code);
    }
dengyihao's avatar
dengyihao 已提交
2701
  }
2702 2703
  
  pInfo->base.dataReader = source->dataReader;
H
Haojun Liao 已提交
2704
  STsdbReader* reader = pInfo->base.dataReader;
D
dapan1121 已提交
2705
  bool hasNext = false;
2706
  qTrace("tsdb/read-table-data: %p, enter next reader", reader);
D
dapan1121 已提交
2707 2708 2709 2710 2711 2712 2713 2714 2715 2716 2717 2718 2719

  while (true) {
    code = tsdbNextDataBlock(reader, &hasNext);
    if (code != 0) {
      tsdbReleaseDataBlock(reader);
      pInfo->base.dataReader = NULL;
      T_LONG_JMP(pTaskInfo->env, code);
    }

    if (!hasNext) {
      break;
    }
  
H
Haojun Liao 已提交
2720
    if (isTaskKilled(pTaskInfo)) {
X
Xiaoyu Wang 已提交
2721
      tsdbReleaseDataBlock(reader);
D
dapan1121 已提交
2722
      pInfo->base.dataReader = NULL;
2723
      T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
dengyihao's avatar
opt mem  
dengyihao 已提交
2724 2725 2726
    }

    // process this data block based on the probabilities
H
Haojun Liao 已提交
2727
    bool processThisBlock = processBlockWithProbability(&pInfo->sample);
dengyihao's avatar
opt mem  
dengyihao 已提交
2728 2729 2730 2731
    if (!processThisBlock) {
      continue;
    }

H
Haojun Liao 已提交
2732
    if (pQueryCond->order == TSDB_ORDER_ASC) {
dengyihao's avatar
opt mem  
dengyihao 已提交
2733 2734 2735 2736
      pQueryCond->twindows.skey = pBlock->info.window.ekey + 1;
    } else {
      pQueryCond->twindows.ekey = pBlock->info.window.skey - 1;
    }
dengyihao's avatar
opt mem  
dengyihao 已提交
2737 2738

    uint32_t status = 0;
2739
    code = loadDataBlock(pOperator, &pInfo->base, pBlock, &status);
S
slzhou 已提交
2740
    //    code = loadDataBlockFromOneTable(pOperator, pTableScanInfo, pBlock, &status);
dengyihao's avatar
opt mem  
dengyihao 已提交
2741
    if (code != TSDB_CODE_SUCCESS) {
H
Haojun Liao 已提交
2742
      T_LONG_JMP(pTaskInfo->env, code);
dengyihao's avatar
opt mem  
dengyihao 已提交
2743 2744 2745 2746 2747 2748 2749
    }

    // current block is filter out according to filter condition, continue load the next block
    if (status == FUNC_DATA_REQUIRED_FILTEROUT || pBlock->info.rows == 0) {
      continue;
    }

2750
    pBlock->info.id.groupId = getTableGroupId(pInfo->base.pTableListInfo, pBlock->info.id.uid);
dengyihao's avatar
opt mem  
dengyihao 已提交
2751

H
Haojun Liao 已提交
2752
    pOperator->resultInfo.totalRows += pBlock->info.rows;
H
Haojun Liao 已提交
2753
    pInfo->base.readRecorder.elapsedTime += (taosGetTimestampUs() - st) / 1000.0;
dengyihao's avatar
opt mem  
dengyihao 已提交
2754

2755
    qTrace("tsdb/read-table-data: %p, close reader", reader);
D
dapan1121 已提交
2756 2757 2758 2759
    if (!source->multiReader) {
      tsdbReaderClose(pInfo->base.dataReader);
      source->dataReader = NULL;
    }
H
Haojun Liao 已提交
2760
    pInfo->base.dataReader = NULL;
dengyihao's avatar
opt mem  
dengyihao 已提交
2761 2762
    return pBlock;
  }
H
Haojun Liao 已提交
2763

D
dapan1121 已提交
2764 2765 2766 2767
  if (!source->multiReader) {
    tsdbReaderClose(pInfo->base.dataReader);
    source->dataReader = NULL;
  }
H
Haojun Liao 已提交
2768
  pInfo->base.dataReader = NULL;
dengyihao's avatar
opt mem  
dengyihao 已提交
2769 2770 2771
  return NULL;
}

2772 2773 2774
// Build a one-element sort specification that orders rows by the primary timestamp column.
//
//   colMatchInfo - array of SColMatchItem; the destination slot of the entry whose colId is
//                  PRIMARYKEY_TIMESTAMP_COL_ID is used as the sort slot (slot 0 when none matches,
//                  the last match when several do)
//   order        - sort order stored in the resulting SBlockOrderInfo
//
// Returns a new SArray of SBlockOrderInfo owned by the caller, or NULL if the array could not be
// allocated.
SArray* generateSortByTsInfo(SArray* colMatchInfo, int32_t order) {
  int32_t tsTargetSlotId = 0;
  int32_t numOfItems = (int32_t)taosArrayGetSize(colMatchInfo);

  for (int32_t i = 0; i < numOfItems; ++i) {
    SColMatchItem* colInfo = taosArrayGet(colMatchInfo, i);
    if (colInfo->colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
      tsTargetSlotId = colInfo->dstSlotId;
    }
  }

  SArray* pList = taosArrayInit(1, sizeof(SBlockOrderInfo));
  if (pList == NULL) {
    // do not push into an array that was never allocated
    return NULL;
  }

  SBlockOrderInfo bi = {0};
  bi.order = order;
  bi.slotId = tsTargetSlotId;
  bi.nullFirst = NULL_ORDER_FIRST;

  taosArrayPush(pList, &bi);

  return pList;
}

H
Haojun Liao 已提交
2792
// Copy a query condition, giving the copy its own column list.
//
// All fields of *src are copied bitwise into *dst; dst->colList is then replaced by a freshly
// allocated array holding the same src->numOfCols entries. Any other pointer members stay shared
// with *src. The caller releases dst->colList with taosMemoryFree.
//
// Returns 0 on success. If the column list cannot be allocated, returns TSDB_CODE_OUT_OF_MEMORY
// and leaves *dst with colList == NULL and numOfCols == 0.
int32_t dumpQueryTableCond(const SQueryTableDataCond* src, SQueryTableDataCond* dst) {
  memcpy((void*)dst, (const void*)src, sizeof(SQueryTableDataCond));

  dst->colList = taosMemoryCalloc(src->numOfCols, sizeof(SColumnInfo));
  if (src->numOfCols <= 0) {
    // nothing to copy; a zero-sized allocation may legitimately be NULL
    return 0;
  }

  if (dst->colList == NULL) {
    dst->numOfCols = 0;
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  memcpy(dst->colList, src->colList, (size_t)src->numOfCols * sizeof(SColumnInfo));
  return 0;
}
H
Haojun Liao 已提交
2800

2801
// Start the merge scan of the table group that begins at pInfo->tableStartIndex.
//
// Determines the last table of the group (consecutive entries sharing pInfo->groupId), creates a
// multi-source merge sort handle sized for one buffer page per table plus one for the merge
// result, registers one sort source per table (each with its own input block and its own copy of
// the query condition) and opens the sort.
//
// Returns TSDB_CODE_SUCCESS. Allocation failures and a failed tsortOpen abort via T_LONG_JMP.
int32_t startGroupTableMergeScan(SOperatorInfo* pOperator) {
  STableMergeScanInfo* pInfo = pOperator->info;
  SExecTaskInfo*       pTaskInfo = pOperator->pTaskInfo;
  int32_t              code = TSDB_CODE_SUCCESS;

  {
    size_t  numOfTables = tableListGetSize(pInfo->base.pTableListInfo);
    int32_t i = pInfo->tableStartIndex + 1;
    for (; i < numOfTables; ++i) {
      STableKeyInfo* tableKeyInfo = tableListGetInfo(pInfo->base.pTableListInfo, i);
      if (tableKeyInfo->groupId != pInfo->groupId) {
        break;
      }
    }
    pInfo->tableEndIndex = i - 1;
  }

  int32_t tableStartIdx = pInfo->tableStartIndex;
  int32_t tableEndIdx = pInfo->tableEndIndex;

  pInfo->base.dataReader = NULL;

  // todo the total available buffer should be determined by total capacity of buffer of this task.
  // the additional one is reserved for merge result
  pInfo->sortBufSize = pInfo->bufPageSize * (tableEndIdx - tableStartIdx + 1 + 1);
  int32_t numOfBufPage = pInfo->sortBufSize / pInfo->bufPageSize;
  pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, SORT_MULTISOURCE_MERGE, pInfo->bufPageSize, numOfBufPage,
                                             pInfo->pSortInputBlock, pTaskInfo->id.str);
  if (pInfo->pSortHandle == NULL) {
    T_LONG_JMP(pTaskInfo->env, TSDB_CODE_OUT_OF_MEMORY);
  }

  tsortSetFetchRawDataFp(pInfo->pSortHandle, getTableDataBlockImpl, NULL, NULL);

  // one table has one data block
  int32_t numOfTable = tableEndIdx - tableStartIdx + 1;
  pInfo->queryConds = taosArrayInit(numOfTable, sizeof(SQueryTableDataCond));
  if (pInfo->queryConds == NULL) {
    T_LONG_JMP(pTaskInfo->env, TSDB_CODE_OUT_OF_MEMORY);
  }

  for (int32_t i = 0; i < numOfTable; ++i) {
    // Build both per-table pieces completely before publishing either of them, so that the two
    // arrays always hold the same number of entries when teardown walks them.
    SQueryTableDataCond cond;
    code = dumpQueryTableCond(&pInfo->base.cond, &cond);
    if (code != TSDB_CODE_SUCCESS) {
      T_LONG_JMP(pTaskInfo->env, code);
    }

    STableMergeScanSortSourceParam param = {0};
    param.readerIdx = i;
    param.pOperator = pOperator;
    param.multiReader = (numOfTable <= MULTI_READER_MAX_TABLE_NUM) ? true : false;
    param.inputBlock = createOneDataBlock(pInfo->pResBlock, false);
    if (param.inputBlock == NULL) {
      taosMemoryFree(cond.colList);
      T_LONG_JMP(pTaskInfo->env, TSDB_CODE_OUT_OF_MEMORY);
    }

    code = blockDataEnsureCapacity(param.inputBlock, pOperator->resultInfo.capacity);
    if (code != TSDB_CODE_SUCCESS) {
      blockDataDestroy(param.inputBlock);
      taosMemoryFree(cond.colList);
      T_LONG_JMP(pTaskInfo->env, code);
    }

    taosArrayPush(pInfo->sortSourceParams, &param);
    taosArrayPush(pInfo->queryConds, &cond);
  }

  // Sources are registered in a second pass: they keep pointers into sortSourceParams, which must
  // not grow any more once those pointers are handed out.
  for (int32_t i = 0; i < numOfTable; ++i) {
    SSortSource* ps = taosMemoryCalloc(1, sizeof(SSortSource));
    if (ps == NULL) {
      T_LONG_JMP(pTaskInfo->env, TSDB_CODE_OUT_OF_MEMORY);
    }

    STableMergeScanSortSourceParam* param = taosArrayGet(pInfo->sortSourceParams, i);
    ps->param = param;
    ps->onlyRef = true;
    tsortAddSource(pInfo->pSortHandle, ps);
  }

  code = tsortOpen(pInfo->pSortHandle);
  if (code != TSDB_CODE_SUCCESS) {
    // Prefer terrno as before, but never jump with a "success" value when it was not set.
    T_LONG_JMP(pTaskInfo->env, (terrno != TSDB_CODE_SUCCESS) ? terrno : code);
  }

  return TSDB_CODE_SUCCESS;
}

// Tear down the merge scan of the current table group.
//
// Folds the sort handle's execution statistics into pInfo->sortExecInfo, releases every sort
// source's input block and reader, destroys the sort handle and the per-table query conditions,
// and resets the limit state for the next group. Always returns TSDB_CODE_SUCCESS.
int32_t stopGroupTableMergeScan(SOperatorInfo* pOperator) {
  STableMergeScanInfo* pScan = pOperator->info;

  int32_t numOfSources = taosArrayGetSize(pScan->queryConds);

  // accumulate sort statistics before the handle goes away
  SSortExecInfo stats = tsortGetSortExecInfo(pScan->pSortHandle);
  pScan->sortExecInfo.sortMethod = stats.sortMethod;
  pScan->sortExecInfo.sortBuffer = stats.sortBuffer;
  pScan->sortExecInfo.loops += stats.loops;
  pScan->sortExecInfo.readBytes += stats.readBytes;
  pScan->sortExecInfo.writeBytes += stats.writeBytes;

  for (int32_t idx = 0; idx < numOfSources; ++idx) {
    STableMergeScanSortSourceParam* pSrc = taosArrayGet(pScan->sortSourceParams, idx);
    blockDataDestroy(pSrc->inputBlock);
    tsdbReaderClose(pSrc->dataReader);
    pSrc->dataReader = NULL;
  }
  taosArrayClear(pScan->sortSourceParams);

  tsortDestroySortHandle(pScan->pSortHandle);
  pScan->pSortHandle = NULL;

  // each condition owns only its column list
  for (int32_t idx = 0; idx < numOfSources; ++idx) {
    SQueryTableDataCond* pCond = taosArrayGet(pScan->queryConds, idx);
    taosMemoryFree(pCond->colList);
  }
  taosArrayDestroy(pScan->queryConds);
  pScan->queryConds = NULL;

  resetLimitInfoForNextGroup(&pScan->limitInfo);
  return TSDB_CODE_SUCCESS;
}

2902 2903
// all data produced by this function only belongs to one group
// slimit/soffset does not need to be concerned here, since this function only deal with data within one group.
L
Liu Jicong 已提交
2904 2905
SSDataBlock* getSortedTableMergeScanBlockData(SSortHandle* pHandle, SSDataBlock* pResBlock, int32_t capacity,
                                              SOperatorInfo* pOperator) {
2906 2907 2908
  STableMergeScanInfo* pInfo = pOperator->info;
  SExecTaskInfo*       pTaskInfo = pOperator->pTaskInfo;

2909
  blockDataCleanup(pResBlock);
2910 2911

  while (1) {
2912
    STupleHandle* pTupleHandle = tsortNextTuple(pHandle);
2913 2914 2915 2916
    if (pTupleHandle == NULL) {
      break;
    }

2917 2918
    appendOneRowToDataBlock(pResBlock, pTupleHandle);
    if (pResBlock->info.rows >= capacity) {
2919 2920 2921 2922
      break;
    }
  }

2923
  bool limitReached = applyLimitOffset(&pInfo->limitInfo, pResBlock, pTaskInfo);
D
dapan1121 已提交
2924
  qDebug("%s get sorted row block, rows:%" PRId64 ", limit:%" PRId64, GET_TASKID(pTaskInfo), pResBlock->info.rows,
2925
         pInfo->limitInfo.numOfOutputRows);
2926

2927
  return (pResBlock->info.rows > 0) ? pResBlock : NULL;
2928 2929 2930 2931 2932 2933 2934 2935 2936 2937 2938 2939
}

// Next-block function of the table merge scan operator.
//
// Tables are processed group by group: on the first call the first group is started, and each
// call returns the next sorted block of the current group. When a group is drained it is stopped
// and the following group is started; once the last group is drained the operator is marked
// completed. Returns NULL when there is nothing (more) to return.
SSDataBlock* doTableMergeScan(SOperatorInfo* pOperator) {
  if (OP_EXEC_DONE == pOperator->status) {
    return NULL;
  }

  STableMergeScanInfo* pScan = pOperator->info;
  SExecTaskInfo*       pTask = pOperator->pTaskInfo;

  int32_t openCode = pOperator->fpSet._openFn(pOperator);
  if (openCode != TSDB_CODE_SUCCESS) {
    T_LONG_JMP(pTask->env, openCode);
  }

  size_t numOfTables = tableListGetSize(pScan->base.pTableListInfo);

  // first invocation: kick off the first group
  if (!pScan->hasGroupId) {
    pScan->hasGroupId = true;

    if (0 == numOfTables) {
      setOperatorCompleted(pOperator);
      return NULL;
    }

    pScan->tableStartIndex = 0;
    STableKeyInfo* pFirst = tableListGetInfo(pScan->base.pTableListInfo, pScan->tableStartIndex);
    pScan->groupId = pFirst->groupId;
    startGroupTableMergeScan(pOperator);
  }

  while (pScan->tableStartIndex < numOfTables) {
    if (isTaskKilled(pTask)) {
      T_LONG_JMP(pTask->env, pTask->code);
    }

    SSDataBlock* pSorted = getSortedTableMergeScanBlockData(pScan->pSortHandle, pScan->pResBlock,
                                                            pOperator->resultInfo.capacity, pOperator);
    if (pSorted != NULL) {
      pSorted->info.id.groupId = pScan->groupId;
      pOperator->resultInfo.totalRows += pSorted->info.rows;
      return pSorted;
    }

    // Data of this group are all dumped, let's try the next group
    stopGroupTableMergeScan(pOperator);
    if (pScan->tableEndIndex >= numOfTables - 1) {
      setOperatorCompleted(pOperator);
      break;
    }

    pScan->tableStartIndex = pScan->tableEndIndex + 1;
    STableKeyInfo* pNext = tableListGetInfo(pScan->base.pTableListInfo, pScan->tableStartIndex);
    pScan->groupId = pNext->groupId;
    startGroupTableMergeScan(pOperator);
    resetLimitInfoForNextGroup(&pScan->limitInfo);
  }

  // reaching this point means no block was produced
  return NULL;
}

2986
void destroyTableMergeScanOperatorInfo(void* param) {
2987
  STableMergeScanInfo* pTableScanInfo = (STableMergeScanInfo*)param;
H
Haojun Liao 已提交
2988
  cleanupQueryTableDataCond(&pTableScanInfo->base.cond);
2989

dengyihao's avatar
dengyihao 已提交
2990 2991 2992
  int32_t numOfTable = taosArrayGetSize(pTableScanInfo->queryConds);

  for (int32_t i = 0; i < numOfTable; i++) {
H
Haojun Liao 已提交
2993 2994
    STableMergeScanSortSourceParam* p = taosArrayGet(pTableScanInfo->sortSourceParams, i);
    blockDataDestroy(p->inputBlock);
2995 2996
    tsdbReaderClose(p->dataReader);
    p->dataReader = NULL;
2997
  }
H
Haojun Liao 已提交
2998

2999
  taosArrayDestroy(pTableScanInfo->sortSourceParams);
dengyihao's avatar
dengyihao 已提交
3000 3001
  tsortDestroySortHandle(pTableScanInfo->pSortHandle);
  pTableScanInfo->pSortHandle = NULL;
3002

dengyihao's avatar
opt mem  
dengyihao 已提交
3003 3004 3005
  for (int i = 0; i < taosArrayGetSize(pTableScanInfo->queryConds); i++) {
    SQueryTableDataCond* pCond = taosArrayGet(pTableScanInfo->queryConds, i);
    taosMemoryFree(pCond->colList);
3006 3007
  }

3008 3009
  taosArrayDestroy(pTableScanInfo->queryConds);
  destroyTableScanBase(&pTableScanInfo->base);
3010 3011 3012 3013 3014

  pTableScanInfo->pResBlock = blockDataDestroy(pTableScanInfo->pResBlock);
  pTableScanInfo->pSortInputBlock = blockDataDestroy(pTableScanInfo->pSortInputBlock);

  taosArrayDestroy(pTableScanInfo->pSortInfo);
D
dapan1121 已提交
3015
  taosMemoryFreeClear(param);
3016 3017 3018 3019
}

// Explain callback: hand out a snapshot of the operator's block-load and sort statistics.
//
// On success *pOptrExplain receives a newly allocated STableMergeScanExecInfo (owned by the
// caller), *len its size, and TSDB_CODE_SUCCESS is returned. If the snapshot cannot be allocated,
// *pOptrExplain is set to NULL, *len to 0 and TSDB_CODE_OUT_OF_MEMORY is returned.
int32_t getTableMergeScanExplainExecInfo(SOperatorInfo* pOptr, void** pOptrExplain, uint32_t* len) {
  ASSERT(pOptr != NULL);

  // TODO: merge these two info into one struct
  STableMergeScanExecInfo* execInfo = taosMemoryCalloc(1, sizeof(STableMergeScanExecInfo));
  if (execInfo == NULL) {
    *pOptrExplain = NULL;
    *len = 0;
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  STableMergeScanInfo* pInfo = pOptr->info;
  execInfo->blockRecorder = pInfo->base.readRecorder;
  execInfo->sortExecInfo = pInfo->sortExecInfo;

  *pOptrExplain = execInfo;
  *len = sizeof(STableMergeScanExecInfo);

  return TSDB_CODE_SUCCESS;
}

H
Haojun Liao 已提交
3032
SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanNode, SReadHandle* readHandle,
3033
                                                STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo) {
3034 3035 3036 3037 3038
  STableMergeScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STableMergeScanInfo));
  SOperatorInfo*       pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
  if (pInfo == NULL || pOperator == NULL) {
    goto _error;
  }
3039

3040 3041 3042
  SDataBlockDescNode* pDescNode = pTableScanNode->scan.node.pOutputDataBlockDesc;

  int32_t numOfCols = 0;
3043
  int32_t code = extractColMatchInfo(pTableScanNode->scan.pScanCols, pDescNode, &numOfCols, COL_MATCH_FROM_COL_ID,
H
Haojun Liao 已提交
3044
                                     &pInfo->base.matchInfo);
H
Haojun Liao 已提交
3045 3046 3047
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }
3048

H
Haojun Liao 已提交
3049
  code = initQueryTableDataCond(&pInfo->base.cond, pTableScanNode);
3050
  if (code != TSDB_CODE_SUCCESS) {
H
Haojun Liao 已提交
3051
    taosArrayDestroy(pInfo->base.matchInfo.pList);
3052 3053 3054 3055
    goto _error;
  }

  if (pTableScanNode->scan.pScanPseudoCols != NULL) {
H
Haojun Liao 已提交
3056
    SExprSupp* pSup = &pInfo->base.pseudoSup;
3057 3058
    pSup->pExprInfo = createExprInfo(pTableScanNode->scan.pScanPseudoCols, NULL, &pSup->numOfExprs);
    pSup->pCtx = createSqlFunctionCtx(pSup->pExprInfo, pSup->numOfExprs, &pSup->rowEntryInfoOffset);
3059 3060 3061 3062
  }

  pInfo->scanInfo = (SScanInfo){.numOfAsc = pTableScanNode->scanSeq[0], .numOfDesc = pTableScanNode->scanSeq[1]};

H
Haojun Liao 已提交
3063 3064 3065 3066 3067 3068
  pInfo->base.metaCache.pTableMetaEntryCache = taosLRUCacheInit(1024 * 128, -1, .5);
  if (pInfo->base.metaCache.pTableMetaEntryCache == NULL) {
    code = terrno;
    goto _error;
  }

H
Haojun Liao 已提交
3069 3070
  pInfo->base.dataBlockLoadFlag = FUNC_DATA_REQUIRED_DATA_LOAD;
  pInfo->base.scanFlag = MAIN_SCAN;
H
Haojun Liao 已提交
3071
  pInfo->base.readHandle = *readHandle;
3072 3073 3074

  pInfo->base.limitInfo.limit.limit = -1;
  pInfo->base.limitInfo.slimit.limit = -1;
3075
  pInfo->base.pTableListInfo = pTableListInfo;
H
Haojun Liao 已提交
3076

3077
  pInfo->sample.sampleRatio = pTableScanNode->ratio;
L
Liu Jicong 已提交
3078
  pInfo->sample.seed = taosGetTimestampSec();
H
Haojun Liao 已提交
3079 3080 3081 3082 3083 3084

  code = filterInitFromNode((SNode*)pTableScanNode->scan.node.pConditions, &pOperator->exprSupp.pFilterInfo, 0);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

H
Haojun Liao 已提交
3085
  initResultSizeInfo(&pOperator->resultInfo, 1024);
H
Haojun Liao 已提交
3086
  pInfo->pResBlock = createDataBlockFromDescNode(pDescNode);
H
Haojun Liao 已提交
3087 3088
  blockDataEnsureCapacity(pInfo->pResBlock, pOperator->resultInfo.capacity);

3089
  pInfo->sortSourceParams = taosArrayInit(64, sizeof(STableMergeScanSortSourceParam));
3090

H
Haojun Liao 已提交
3091
  pInfo->pSortInfo = generateSortByTsInfo(pInfo->base.matchInfo.pList, pInfo->base.cond.order);
3092
  pInfo->pSortInputBlock = createOneDataBlock(pInfo->pResBlock, false);
3093
  initLimitInfo(pTableScanNode->scan.node.pLimit, pTableScanNode->scan.node.pSlimit, &pInfo->limitInfo);
3094

dengyihao's avatar
dengyihao 已提交
3095
  int32_t  rowSize = pInfo->pResBlock->info.rowSize;
A
Alex Duan 已提交
3096 3097
  uint32_t nCols = taosArrayGetSize(pInfo->pResBlock->pDataBlock);
  pInfo->bufPageSize = getProperSortPageSize(rowSize, nCols);
3098

L
Liu Jicong 已提交
3099 3100
  setOperatorInfo(pOperator, "TableMergeScanOperator", QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN, false, OP_NOT_OPENED,
                  pInfo, pTaskInfo);
L
Liu Jicong 已提交
3101
  pOperator->exprSupp.numOfExprs = numOfCols;
3102

3103 3104
  pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doTableMergeScan, NULL, destroyTableMergeScanOperatorInfo,
                                         optrDefaultBufFn, getTableMergeScanExplainExecInfo);
3105 3106 3107 3108 3109 3110 3111 3112 3113
  pOperator->cost.openCost = 0;
  return pOperator;

_error:
  pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY;
  taosMemoryFree(pInfo);
  taosMemoryFree(pOperator);
  return NULL;
}
S
shenglian zhou 已提交
3114 3115 3116 3117

// ====================================================================================================================
// TableCountScanOperator
static SSDataBlock* doTableCountScan(SOperatorInfo* pOperator);
S
slzhou 已提交
3118
static void         destoryTableCountScanOperator(void* param);
S
slzhou 已提交
3119 3120 3121 3122 3123 3124
static void         buildVnodeGroupedStbTableCount(STableCountScanOperatorInfo* pInfo, STableCountScanSupp* pSupp,
                                                   SSDataBlock* pRes, char* dbName, tb_uid_t stbUid);
static void         buildVnodeGroupedNtbTableCount(STableCountScanOperatorInfo* pInfo, STableCountScanSupp* pSupp,
                                                   SSDataBlock* pRes, char* dbName);
static void         buildVnodeFilteredTbCount(SOperatorInfo* pOperator, STableCountScanOperatorInfo* pInfo,
                                              STableCountScanSupp* pSupp, SSDataBlock* pRes, char* dbName);
L
Liu Jicong 已提交
3125 3126
static void         buildVnodeGroupedTableCount(SOperatorInfo* pOperator, STableCountScanOperatorInfo* pInfo,
                                                STableCountScanSupp* pSupp, SSDataBlock* pRes, int32_t vgId, char* dbName);
S
slzhou 已提交
3127 3128 3129 3130 3131 3132 3133
static SSDataBlock* buildVnodeDbTableCount(SOperatorInfo* pOperator, STableCountScanOperatorInfo* pInfo,
                                           STableCountScanSupp* pSupp, SSDataBlock* pRes);
static void         buildSysDbGroupedTableCount(SOperatorInfo* pOperator, STableCountScanOperatorInfo* pInfo,
                                                STableCountScanSupp* pSupp, SSDataBlock* pRes, size_t infodbTableNum,
                                                size_t perfdbTableNum);
static void         buildSysDbFilterTableCount(SOperatorInfo* pOperator, STableCountScanSupp* pSupp, SSDataBlock* pRes,
                                               size_t infodbTableNum, size_t perfdbTableNum);
S
slzhou 已提交
3134 3135 3136 3137 3138 3139 3140 3141 3142 3143 3144 3145 3146 3147 3148 3149 3150 3151 3152 3153 3154 3155 3156 3157 3158 3159 3160 3161 3162 3163 3164 3165 3166 3167 3168 3169 3170 3171 3172 3173 3174 3175 3176 3177 3178 3179 3180 3181 3182 3183 3184 3185 3186 3187 3188 3189 3190 3191 3192 3193 3194
// Column names that the table-count scan matches against SColumnNode::colName to recognise
// the database-name and super-table-name group tags.
static const char*  GROUP_TAG_DB_NAME = "db_name";
static const char*  GROUP_TAG_STABLE_NAME = "stable_name";

// Looks up the output slot ids of the "db_name" / "stable_name" columns in the scan column list.
//
// scanCols may be NULL, in which case nothing is written. Every list entry must be a target node
// that wraps a column node; any other node type yields TSDB_CODE_QRY_SYS_ERROR. Columns with other
// names are skipped. Slot ids that are not found keep whatever value supp already holds.
int32_t tblCountScanGetGroupTagsSlotId(const SNodeList* scanCols, STableCountScanSupp* supp) {
  if (scanCols == NULL) {
    return TSDB_CODE_SUCCESS;
  }

  SNode* pItem = NULL;
  FOREACH(pItem, scanCols) {
    if (QUERY_NODE_TARGET != nodeType(pItem)) {
      return TSDB_CODE_QRY_SYS_ERROR;
    }

    STargetNode* pTarget = (STargetNode*)pItem;
    if (QUERY_NODE_COLUMN != nodeType(pTarget->pExpr)) {
      return TSDB_CODE_QRY_SYS_ERROR;
    }

    SColumnNode* pCol = (SColumnNode*)(pTarget->pExpr);
    if (0 == strcmp(pCol->colName, GROUP_TAG_DB_NAME)) {
      supp->dbNameSlotId = pTarget->slotId;
    } else if (0 == strcmp(pCol->colName, GROUP_TAG_STABLE_NAME)) {
      supp->stbNameSlotId = pTarget->slotId;
    }
  }

  return TSDB_CODE_SUCCESS;
}

// Looks up the output slot id of the table-count pseudo column.
//
// pseudoCols may be NULL, in which case nothing is written. Every list entry must be a target node
// that wraps a function node; any other node type yields TSDB_CODE_QRY_SYS_ERROR. Only functions of
// type FUNCTION_TYPE_TABLE_COUNT update supp->tbCountSlotId.
int32_t tblCountScanGetCountSlotId(const SNodeList* pseudoCols, STableCountScanSupp* supp) {
  if (pseudoCols == NULL) {
    return TSDB_CODE_SUCCESS;
  }

  SNode* pItem = NULL;
  FOREACH(pItem, pseudoCols) {
    if (QUERY_NODE_TARGET != nodeType(pItem)) {
      return TSDB_CODE_QRY_SYS_ERROR;
    }

    STargetNode* pTarget = (STargetNode*)pItem;
    if (QUERY_NODE_FUNCTION != nodeType(pTarget->pExpr)) {
      return TSDB_CODE_QRY_SYS_ERROR;
    }

    SFunctionNode* pFunc = (SFunctionNode*)(pTarget->pExpr);
    if (FUNCTION_TYPE_TABLE_COUNT == pFunc->funcType) {
      supp->tbCountSlotId = pTarget->slotId;
    }
  }

  return TSDB_CODE_SUCCESS;
}

// Derives the scan mode from the plan node inputs.
//
// Without group tags the scan is filter based: the db name and table name carried by tableName are
// copied into supp->dbNameFilter / supp->stbNameFilter. With group tags, each tag must be a column
// node (otherwise TSDB_CODE_QRY_SYS_ERROR); "db_name" / "stable_name" tags switch on the matching
// groupBy flag.
int32_t tblCountScanGetInputs(SNodeList* groupTags, SName* tableName, STableCountScanSupp* supp) {
  if (groupTags == NULL) {
    tstrncpy(supp->dbNameFilter, tNameGetDbNameP(tableName), TSDB_DB_NAME_LEN);
    tstrncpy(supp->stbNameFilter, tNameGetTableName(tableName), TSDB_TABLE_NAME_LEN);
    return TSDB_CODE_SUCCESS;
  }

  SNode* pItem = NULL;
  FOREACH(pItem, groupTags) {
    if (QUERY_NODE_COLUMN != nodeType(pItem)) {
      return TSDB_CODE_QRY_SYS_ERROR;
    }

    SColumnNode* pCol = (SColumnNode*)pItem;
    if (0 == strcmp(pCol->colName, GROUP_TAG_DB_NAME)) {
      supp->groupByDbName = true;
    }
    if (0 == strcmp(pCol->colName, GROUP_TAG_STABLE_NAME)) {
      supp->groupByStbName = true;
    }
  }

  return TSDB_CODE_SUCCESS;
}

// Fills the table-count scan support structure from the physical plan inputs.
//
// Steps, each logged and returned on failure:
//   1. grouping flags / name filters (tblCountScanGetInputs)
//   2. slot ids of the group tag columns (tblCountScanGetGroupTagsSlotId)
//   3. slot id of the count pseudo column (tblCountScanGetCountSlotId)
// All three slot ids are reset to -1 ("not present") before steps 2 and 3 run.
int32_t getTableCountScanSupp(SNodeList* groupTags, SName* tableName, SNodeList* scanCols, SNodeList* pseudoCols,
                              STableCountScanSupp* supp, SExecTaskInfo* taskInfo) {
  int32_t code = tblCountScanGetInputs(groupTags, tableName, supp);
  if (TSDB_CODE_SUCCESS != code) {
    qError("%s get table count scan supp. get inputs error", GET_TASKID(taskInfo));
    return code;
  }

  supp->tbCountSlotId = -1;
  supp->stbNameSlotId = -1;
  supp->dbNameSlotId = -1;

  code = tblCountScanGetGroupTagsSlotId(scanCols, supp);
  if (TSDB_CODE_SUCCESS != code) {
    qError("%s get table count scan supp. get group tags slot id error", GET_TASKID(taskInfo));
    return code;
  }

  code = tblCountScanGetCountSlotId(pseudoCols, supp);
  if (TSDB_CODE_SUCCESS != code) {
    qError("%s get table count scan supp. get count error", GET_TASKID(taskInfo));
  }
  return code;
}
S
shenglian zhou 已提交
3225

S
slzhou 已提交
3226
SOperatorInfo* createTableCountScanOperatorInfo(SReadHandle* readHandle, STableCountScanPhysiNode* pTblCountScanNode,
S
shenglian zhou 已提交
3227 3228 3229
                                                SExecTaskInfo* pTaskInfo) {
  int32_t code = TSDB_CODE_SUCCESS;

S
slzhou 已提交
3230
  SScanPhysiNode*              pScanNode = &pTblCountScanNode->scan;
S
slzhou 已提交
3231
  STableCountScanOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(STableCountScanOperatorInfo));
S
slzhou 已提交
3232
  SOperatorInfo*               pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
S
shenglian zhou 已提交
3233 3234 3235 3236 3237 3238 3239 3240 3241

  if (!pInfo || !pOperator) {
    goto _error;
  }

  pInfo->readHandle = *readHandle;

  SDataBlockDescNode* pDescNode = pScanNode->node.pOutputDataBlockDesc;
  initResultSizeInfo(&pOperator->resultInfo, 1);
3242
  pInfo->pRes = createDataBlockFromDescNode(pDescNode);
S
shenglian zhou 已提交
3243 3244
  blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity);

S
slzhou 已提交
3245 3246 3247
  getTableCountScanSupp(pTblCountScanNode->pGroupTags, &pTblCountScanNode->scan.tableName,
                        pTblCountScanNode->scan.pScanCols, pTblCountScanNode->scan.pScanPseudoCols, &pInfo->supp,
                        pTaskInfo);
S
shenglian zhou 已提交
3248 3249 3250

  setOperatorInfo(pOperator, "TableCountScanOperator", QUERY_NODE_PHYSICAL_PLAN_TABLE_COUNT_SCAN, false, OP_NOT_OPENED,
                  pInfo, pTaskInfo);
L
Liu Jicong 已提交
3251 3252
  pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doTableCountScan, NULL, destoryTableCountScanOperator,
                                         optrDefaultBufFn, NULL);
S
shenglian zhou 已提交
3253 3254 3255 3256 3257 3258 3259 3260 3261 3262 3263
  return pOperator;

_error:
  if (pInfo != NULL) {
    destoryTableCountScanOperator(pInfo);
  }
  taosMemoryFreeClear(pOperator);
  pTaskInfo->code = code;
  return NULL;
}

S
slzhou 已提交
3264 3265 3266
// Writes one result row (row index 0) into pRes and sets its row count to 1.
//
// pSupp    slot ids of the output columns; a slot id of -1 means the column is not requested
// dbName   database name for the db-name column; asserted to be non-empty when that column is requested
// stbName  super table name for the stable-name column; an empty string is stored as NULL
// count    value for the table-count column
//
// Names are copied with tstrncpy so the var-string buffers are always terminated, and the stored
// var-string length is taken from the copied (possibly truncated) data so it can never exceed
// the bytes actually present in the buffer.
void fillTableCountScanDataBlock(STableCountScanSupp* pSupp, char* dbName, char* stbName, int64_t count,
                                 SSDataBlock* pRes) {
  if (pSupp->dbNameSlotId != -1) {
    ASSERT(strlen(dbName));
    SColumnInfoData* colInfoData = taosArrayGet(pRes->pDataBlock, pSupp->dbNameSlotId);

    char varDbName[TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
    tstrncpy(varDataVal(varDbName), dbName, TSDB_DB_NAME_LEN);

    varDataSetLen(varDbName, strlen(varDataVal(varDbName)));
    colDataSetVal(colInfoData, 0, varDbName, false);
  }

  if (pSupp->stbNameSlotId != -1) {
    SColumnInfoData* colInfoData = taosArrayGet(pRes->pDataBlock, pSupp->stbNameSlotId);
    if (strlen(stbName) != 0) {
      char varStbName[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
      tstrncpy(varDataVal(varStbName), stbName, TSDB_TABLE_NAME_LEN);

      varDataSetLen(varStbName, strlen(varDataVal(varStbName)));
      colDataSetVal(colInfoData, 0, varStbName, false);
    } else {
      colDataSetNULL(colInfoData, 0);
    }
  }

  if (pSupp->tbCountSlotId != -1) {
    SColumnInfoData* colInfoData = taosArrayGet(pRes->pDataBlock, pSupp->tbCountSlotId);
    colDataSetVal(colInfoData, 0, (char*)&count, false);
  }
  pRes->info.rows = 1;
}

S
slzhou 已提交
3296
// Produces the next result block for the system databases.
//
// The table counts of the information-schema and performance-schema databases are queried first,
// then either the grouped or the filter based builder fills the result block.
// Returns the result block when it holds at least one row, otherwise NULL.
static SSDataBlock* buildSysDbTableCount(SOperatorInfo* pOperator, STableCountScanOperatorInfo* pInfo) {
  SSDataBlock*         pBlock = pInfo->pRes;
  STableCountScanSupp* pScanSupp = &pInfo->supp;

  size_t numInfoTables;
  getInfosDbMeta(NULL, &numInfoTables);

  size_t numPerfTables;
  getPerfDbMeta(NULL, &numPerfTables);

  bool grouped = pScanSupp->groupByDbName || pScanSupp->groupByStbName;
  if (grouped) {
    buildSysDbGroupedTableCount(pOperator, pInfo, pScanSupp, pBlock, numInfoTables, numPerfTables);
  } else {
    buildSysDbFilterTableCount(pOperator, pScanSupp, pBlock, numInfoTables, numPerfTables);
  }

  return (pBlock->info.rows > 0) ? pBlock : NULL;
}

S
slzhou 已提交
3314 3315 3316 3317 3318 3319 3320 3321 3322 3323 3324 3325 3326 3327 3328 3329
// Filter based table count for the system databases.
//
// The db-name filter selects the row: information schema, performance schema, or (empty filter)
// the sum of both. Any other filter produces no row. The operator is completed in every case.
static void buildSysDbFilterTableCount(SOperatorInfo* pOperator, STableCountScanSupp* pSupp, SSDataBlock* pRes,
                                       size_t infodbTableNum, size_t perfdbTableNum) {
  const char* filter = pSupp->dbNameFilter;

  if (0 == strcmp(filter, TSDB_INFORMATION_SCHEMA_DB)) {
    fillTableCountScanDataBlock(pSupp, TSDB_INFORMATION_SCHEMA_DB, "", infodbTableNum, pRes);
  } else if (0 == strcmp(filter, TSDB_PERFORMANCE_SCHEMA_DB)) {
    fillTableCountScanDataBlock(pSupp, TSDB_PERFORMANCE_SCHEMA_DB, "", perfdbTableNum, pRes);
  } else if (filter[0] == '\0') {
    fillTableCountScanDataBlock(pSupp, "", "", infodbTableNum + perfdbTableNum, pRes);
  }

  setOperatorCompleted(pOperator);
}

// Grouped table count for the system databases, one row per call.
//
// Group index 0 emits the information-schema row, index 1 the performance-schema row; any other
// index completes the operator. The group id is derived from the db name when grouping by
// db name, otherwise from the empty string. The group index is advanced on every call.
static void buildSysDbGroupedTableCount(SOperatorInfo* pOperator, STableCountScanOperatorInfo* pInfo,
                                        STableCountScanSupp* pSupp, SSDataBlock* pRes, size_t infodbTableNum,
                                        size_t perfdbTableNum) {
  char*  sysDbName = NULL;
  size_t sysTableNum = 0;

  switch (pInfo->currGrpIdx) {
    case 0:
      sysDbName = TSDB_INFORMATION_SCHEMA_DB;
      sysTableNum = infodbTableNum;
      break;
    case 1:
      sysDbName = TSDB_PERFORMANCE_SCHEMA_DB;
      sysTableNum = perfdbTableNum;
      break;
    default:
      break;
  }

  if (sysDbName == NULL) {
    setOperatorCompleted(pOperator);
  } else {
    if (pSupp->groupByDbName) {
      pRes->info.id.groupId = calcGroupId(sysDbName, strlen(sysDbName));
    } else {
      pRes->info.id.groupId = calcGroupId("", 0);
    }
    fillTableCountScanDataBlock(pSupp, sysDbName, "", sysTableNum, pRes);
  }

  pInfo->currGrpIdx++;
}

S
shenglian zhou 已提交
3355
// Operator "next" function of the table-count scan.
//
// The result block is cleared on every call. A finished operator returns NULL. Otherwise the
// system-database path is taken when the read handle carries an mnode handle, and the vnode
// path in all other cases.
static SSDataBlock* doTableCountScan(SOperatorInfo* pOperator) {
  STableCountScanOperatorInfo* pScanInfo = pOperator->info;
  SSDataBlock*                 pBlock = pScanInfo->pRes;

  blockDataCleanup(pBlock);

  if (OP_EXEC_DONE == pOperator->status) {
    return NULL;
  }

  if (pScanInfo->readHandle.mnd == NULL) {
    return buildVnodeDbTableCount(pOperator, pScanInfo, &pScanInfo->supp, pBlock);
  }

  return buildSysDbTableCount(pOperator, pScanInfo);
}

// Produces the next result block for a user database hosted on this vnode.
//
// The full database name reported by the vnode is parsed (account + db parts) to obtain the
// plain db name, then the grouped or the filter based builder fills pRes.
// Returns pRes when it holds at least one row, otherwise NULL.
static SSDataBlock* buildVnodeDbTableCount(SOperatorInfo* pOperator, STableCountScanOperatorInfo* pInfo,
                                           STableCountScanSupp* pSupp, SSDataBlock* pRes) {
  int32_t     vnodeId = 0;
  const char* fullDbName = NULL;
  vnodeGetInfo(pInfo->readHandle.vnode, &fullDbName, &vnodeId);

  // strip the account prefix from the full name
  SName parsed = {0};
  tNameFromString(&parsed, fullDbName, T_NAME_ACCT | T_NAME_DB);

  char shortDbName[TSDB_DB_NAME_LEN] = {0};
  tNameGetDbName(&parsed, shortDbName);

  bool grouped = pSupp->groupByDbName || pSupp->groupByStbName;
  if (!grouped) {
    buildVnodeFilteredTbCount(pOperator, pInfo, pSupp, pRes, shortDbName);
  } else {
    buildVnodeGroupedTableCount(pOperator, pInfo, pSupp, pRes, vnodeId, shortDbName);
  }

  return (pRes->info.rows > 0) ? pRes : NULL;
}

// Grouped table count for a vnode, one group per call.
//
// Grouping by db name only: a single row with the total table count of the vnode is emitted and
// the operator is completed.
// Grouping by stable name: the super table uid list is loaded lazily on the first call; group
// indexes below the list size emit one row per super table, the index equal to the list size
// emits the normal-table row, and any larger index completes the operator.
static void buildVnodeGroupedTableCount(SOperatorInfo* pOperator, STableCountScanOperatorInfo* pInfo,
                                        STableCountScanSupp* pSupp, SSDataBlock* pRes, int32_t vgId, char* dbName) {
  if (!pSupp->groupByStbName) {
    pRes->info.id.groupId = calcGroupId(dbName, strlen(dbName));
    int64_t numTables = metaGetTbNum(pInfo->readHandle.meta);
    fillTableCountScanDataBlock(pSupp, dbName, "", numTables, pRes);
    setOperatorCompleted(pOperator);
    return;
  }

  if (pInfo->stbUidList == NULL) {
    pInfo->stbUidList = taosArrayInit(16, sizeof(tb_uid_t));
    if (vnodeGetStbIdList(pInfo->readHandle.vnode, 0, pInfo->stbUidList) < 0) {
      // best effort: the failure is only logged and the scan goes on with whatever the list holds
      qError("vgId:%d, failed to get stb id list error: %s", vgId, terrstr());
    }
  }

  size_t numStables = taosArrayGetSize(pInfo->stbUidList);
  if (pInfo->currGrpIdx < numStables) {
    tb_uid_t* pUid = taosArrayGet(pInfo->stbUidList, pInfo->currGrpIdx);
    buildVnodeGroupedStbTableCount(pInfo, pSupp, pRes, dbName, *pUid);
    pInfo->currGrpIdx++;
  } else if (pInfo->currGrpIdx == numStables) {
    buildVnodeGroupedNtbTableCount(pInfo, pSupp, pRes, dbName);
    pInfo->currGrpIdx++;
  } else {
    setOperatorCompleted(pOperator);
  }
}

// Filter based table count for a vnode; always emits exactly one row and completes the operator.
//
// When both the db-name and the stable-name filters are set, the row carries the child-table
// count of that super table. In every other case it carries the total table count of the vnode.
static void buildVnodeFilteredTbCount(SOperatorInfo* pOperator, STableCountScanOperatorInfo* pInfo,
                                      STableCountScanSupp* pSupp, SSDataBlock* pRes, char* dbName) {
  bool filterByStable = (pSupp->dbNameFilter[0] != '\0') && (pSupp->stbNameFilter[0] != '\0');

  if (filterByStable) {
    tb_uid_t      stbUid = metaGetTableEntryUidByName(pInfo->readHandle.meta, pSupp->stbNameFilter);
    SMetaStbStats stbStats = {0};
    metaGetStbStats(pInfo->readHandle.meta, stbUid, &stbStats);
    int64_t numChildTables = stbStats.ctbNum;
    fillTableCountScanDataBlock(pSupp, dbName, pSupp->stbNameFilter, numChildTables, pRes);
  } else {
    int64_t numTables = metaGetTbNum(pInfo->readHandle.meta);
    fillTableCountScanDataBlock(pSupp, dbName, "", numTables, pRes);
  }

  setOperatorCompleted(pOperator);
}

// Emits the normal-table group row of a vnode.
//
// The group id is derived from "<dbName>." when grouping by db name, otherwise from the empty
// string. A row is written only when the vnode holds at least one normal table; the group id of
// pRes is set either way.
static void buildVnodeGroupedNtbTableCount(STableCountScanOperatorInfo* pInfo, STableCountScanSupp* pSupp,
                                           SSDataBlock* pRes, char* dbName) {
  char groupKey[TSDB_TABLE_FNAME_LEN] = {0};
  if (pSupp->groupByDbName) {
    snprintf(groupKey, TSDB_TABLE_FNAME_LEN, "%s.%s", dbName, "");
  }

  pRes->info.id.groupId = calcGroupId(groupKey, strlen(groupKey));

  int64_t numNormalTables = metaGetNtbNum(pInfo->readHandle.meta);
  if (numNormalTables == 0) {
    return;
  }

  fillTableCountScanDataBlock(pSupp, dbName, "", numNormalTables, pRes);
}

// Emits the group row of one super table: its child-table count.
//
// The super table name is resolved from stbUid. The group id is derived from
// "<dbName>.<stbName>" when grouping by db name, otherwise from "<stbName>" alone.
static void buildVnodeGroupedStbTableCount(STableCountScanOperatorInfo* pInfo, STableCountScanSupp* pSupp,
                                           SSDataBlock* pRes, char* dbName, tb_uid_t stbUid) {
  char superTableName[TSDB_TABLE_NAME_LEN] = {0};
  metaGetTableSzNameByUid(pInfo->readHandle.meta, stbUid, superTableName);

  char groupKey[TSDB_TABLE_FNAME_LEN] = {0};
  if (!pSupp->groupByDbName) {
    snprintf(groupKey, TSDB_TABLE_FNAME_LEN, "%s", superTableName);
  } else {
    snprintf(groupKey, TSDB_TABLE_FNAME_LEN, "%s.%s", dbName, superTableName);
  }

  pRes->info.id.groupId = calcGroupId(groupKey, strlen(groupKey));

  SMetaStbStats stbStats = {0};
  metaGetStbStats(pInfo->readHandle.meta, stbUid, &stbStats);

  int64_t numChildTables = stbStats.ctbNum;
  fillTableCountScanDataBlock(pSupp, dbName, superTableName, numChildTables, pRes);
}

static void destoryTableCountScanOperator(void* param) {
S
slzhou 已提交
3480
  STableCountScanOperatorInfo* pTableCountScanInfo = param;
S
shenglian zhou 已提交
3481 3482
  blockDataDestroy(pTableCountScanInfo->pRes);

S
slzhou 已提交
3483
  taosArrayDestroy(pTableCountScanInfo->stbUidList);
S
shenglian zhou 已提交
3484 3485
  taosMemoryFreeClear(param);
}