scanoperator.c 130.4 KB
Newer Older
H
Haojun Liao 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

16
#include "executorimpl.h"
H
Haojun Liao 已提交
17
#include "filter.h"
18
#include "function.h"
19
#include "functionMgt.h"
L
Liu Jicong 已提交
20
#include "os.h"
H
Haojun Liao 已提交
21
#include "querynodes.h"
22
#include "systable.h"
H
Haojun Liao 已提交
23
#include "tname.h"
24
#include "ttime.h"
H
Haojun Liao 已提交
25 26 27 28 29 30 31 32 33

#include "tdatablock.h"
#include "tmsg.h"

#include "query.h"
#include "tcompare.h"
#include "thash.h"
#include "ttypes.h"

D
dapan1121 已提交
34 35
int32_t scanDebug = 0;

X
Xiaoyu Wang 已提交
36
#define MULTI_READER_MAX_TABLE_NUM   5000
H
Haojun Liao 已提交
37
#define SET_REVERSE_SCAN_FLAG(_info) ((_info)->scanFlag = REVERSE_SCAN)
38
#define SWITCH_ORDER(n)              (((n) = ((n) == TSDB_ORDER_ASC) ? TSDB_ORDER_DESC : TSDB_ORDER_ASC))
L
fix bug  
liuyao 已提交
39
#define STREAM_SCAN_OP_NAME          "StreamScanOperator"
40

H
Haojun Liao 已提交
41 42 43 44 45 46 47 48 49 50
typedef struct STableMergeScanExecInfo {
  SFileBlockLoadRecorder blockRecorder;
  SSortExecInfo          sortExecInfo;
} STableMergeScanExecInfo;

typedef struct STableMergeScanSortSourceParam {
  SOperatorInfo* pOperator;
  int32_t        readerIdx;
  uint64_t       uid;
  SSDataBlock*   inputBlock;
D
dapan1121 已提交
51 52
  bool           multiReader;
  STsdbReader*   dataReader;
H
Haojun Liao 已提交
53 54
} STableMergeScanSortSourceParam;

55 56 57 58 59 60 61 62 63 64
typedef struct STableCountScanOperatorInfo {
  SReadHandle  readHandle;
  SSDataBlock* pRes;

  STableCountScanSupp supp;

  int32_t currGrpIdx;
  SArray* stbUidList;  // when group by db_name and/or stable_name
} STableCountScanOperatorInfo;

L
Liu Jicong 已提交
65
static bool processBlockWithProbability(const SSampleExecInfo* pInfo);
66

H
Haojun Liao 已提交
67
bool processBlockWithProbability(const SSampleExecInfo* pInfo) {
68 69 70 71 72 73 74 75 76 77 78 79
#if 0
  if (pInfo->sampleRatio == 1) {
    return true;
  }

  uint32_t val = taosRandR((uint32_t*) &pInfo->seed);
  return (val % ((uint32_t)(1/pInfo->sampleRatio))) == 0;
#else
  return true;
#endif
}

80
static void switchCtxOrder(SqlFunctionCtx* pCtx, int32_t numOfOutput) {
H
Haojun Liao 已提交
81 82 83 84 85
  for (int32_t i = 0; i < numOfOutput; ++i) {
    SWITCH_ORDER(pCtx[i].order);
  }
}

86 87 88 89 90 91 92 93 94
static void getNextTimeWindow(SInterval* pInterval, STimeWindow* tw, int32_t order) {
  int32_t factor = GET_FORWARD_DIRECTION_FACTOR(order);
  if (pInterval->intervalUnit != 'n' && pInterval->intervalUnit != 'y') {
    tw->skey += pInterval->sliding * factor;
    tw->ekey = tw->skey + pInterval->interval - 1;
    return;
  }

  int64_t key = tw->skey, interval = pInterval->interval;
95
  // convert key to second
96 97 98 99 100 101 102
  key = convertTimePrecision(key, pInterval->precision, TSDB_TIME_PRECISION_MILLI) / 1000;

  if (pInterval->intervalUnit == 'y') {
    interval *= 12;
  }

  struct tm tm;
103
  time_t    t = (time_t)key;
104
  taosLocalTime(&t, &tm, NULL);
105 106 107 108

  int mon = (int)(tm.tm_year * 12 + tm.tm_mon + interval * factor);
  tm.tm_year = mon / 12;
  tm.tm_mon = mon % 12;
wafwerar's avatar
wafwerar 已提交
109
  tw->skey = convertTimePrecision((int64_t)taosMktime(&tm) * 1000LL, TSDB_TIME_PRECISION_MILLI, pInterval->precision);
110 111 112 113

  mon = (int)(mon + interval);
  tm.tm_year = mon / 12;
  tm.tm_mon = mon % 12;
wafwerar's avatar
wafwerar 已提交
114
  tw->ekey = convertTimePrecision((int64_t)taosMktime(&tm) * 1000LL, TSDB_TIME_PRECISION_MILLI, pInterval->precision);
115 116 117 118

  tw->ekey -= 1;
}

119
static bool overlapWithTimeWindow(SInterval* pInterval, SDataBlockInfo* pBlockInfo, int32_t order) {
120 121 122 123 124 125 126
  STimeWindow w = {0};

  // 0 by default, which means it is not a interval operator of the upstream operator.
  if (pInterval->interval == 0) {
    return false;
  }

127
  if (order == TSDB_ORDER_ASC) {
128
    w = getAlignQueryTimeWindow(pInterval, pInterval->precision, pBlockInfo->window.skey);
129
    ASSERT(w.ekey >= pBlockInfo->window.skey);
130

131
    if (w.ekey < pBlockInfo->window.ekey) {
132 133 134
      return true;
    }

135 136
    while (1) {
      getNextTimeWindow(pInterval, &w, order);
137 138 139 140
      if (w.skey > pBlockInfo->window.ekey) {
        break;
      }

141
      ASSERT(w.ekey > pBlockInfo->window.ekey);
142
      if (TMAX(w.skey, pBlockInfo->window.skey) <= pBlockInfo->window.ekey) {
143 144 145 146
        return true;
      }
    }
  } else {
147
    w = getAlignQueryTimeWindow(pInterval, pInterval->precision, pBlockInfo->window.ekey);
148
    ASSERT(w.skey <= pBlockInfo->window.ekey);
149

150
    if (w.skey > pBlockInfo->window.skey) {
151 152 153
      return true;
    }

154
    while (1) {
155 156 157 158 159 160
      getNextTimeWindow(pInterval, &w, order);
      if (w.ekey < pBlockInfo->window.skey) {
        break;
      }

      assert(w.skey < pBlockInfo->window.skey);
161
      if (pBlockInfo->window.skey <= TMIN(w.ekey, pBlockInfo->window.ekey)) {
162 163 164
        return true;
      }
    }
165 166 167 168 169
  }

  return false;
}

170 171 172 173 174 175 176 177 178 179 180
// this function is for table scanner to extract temporary results of upstream aggregate results.
static SResultRow* getTableGroupOutputBuf(SOperatorInfo* pOperator, uint64_t groupId, SFilePage** pPage) {
  if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN) {
    return NULL;
  }

  int64_t buf[2] = {0};
  SET_RES_WINDOW_KEY((char*)buf, &groupId, sizeof(groupId), groupId);

  STableScanInfo* pTableScanInfo = pOperator->info;

S
slzhou 已提交
181 182
  SResultRowPosition* p1 = (SResultRowPosition*)tSimpleHashGet(pTableScanInfo->base.pdInfo.pAggSup->pResultRowHashTable,
                                                               buf, GET_RES_WINDOW_KEY_LEN(sizeof(groupId)));
183 184 185 186 187

  if (p1 == NULL) {
    return NULL;
  }

H
Haojun Liao 已提交
188
  *pPage = getBufPage(pTableScanInfo->base.pdInfo.pAggSup->pResultBuf, p1->pageId);
189 190 191
  if (NULL == *pPage) {
    return NULL;
  }
L
Liu Jicong 已提交
192

193 194 195 196 197 198
  return (SResultRow*)((char*)(*pPage) + p1->offset);
}

static int32_t doDynamicPruneDataBlock(SOperatorInfo* pOperator, SDataBlockInfo* pBlockInfo, uint32_t* status) {
  STableScanInfo* pTableScanInfo = pOperator->info;

H
Haojun Liao 已提交
199
  if (pTableScanInfo->base.pdInfo.pExprSup == NULL) {
200 201 202
    return TSDB_CODE_SUCCESS;
  }

H
Haojun Liao 已提交
203
  SExprSupp* pSup1 = pTableScanInfo->base.pdInfo.pExprSup;
204 205

  SFilePage*  pPage = NULL;
H
Haojun Liao 已提交
206
  SResultRow* pRow = getTableGroupOutputBuf(pOperator, pBlockInfo->id.groupId, &pPage);
207 208 209 210 211 212 213 214 215

  if (pRow == NULL) {
    return TSDB_CODE_SUCCESS;
  }

  bool notLoadBlock = true;
  for (int32_t i = 0; i < pSup1->numOfExprs; ++i) {
    int32_t functionId = pSup1->pCtx[i].functionId;

H
Haojun Liao 已提交
216
    SResultRowEntryInfo* pEntry = getResultEntryInfo(pRow, i, pTableScanInfo->base.pdInfo.pExprSup->rowEntryInfoOffset);
217 218 219 220 221 222 223 224 225

    int32_t reqStatus = fmFuncDynDataRequired(functionId, pEntry, &pBlockInfo->window);
    if (reqStatus != FUNC_DATA_REQUIRED_NOT_LOAD) {
      notLoadBlock = false;
      break;
    }
  }

  // release buffer pages
H
Haojun Liao 已提交
226
  releaseBufPage(pTableScanInfo->base.pdInfo.pAggSup->pResultBuf, pPage);
227 228 229 230 231 232 233 234

  if (notLoadBlock) {
    *status = FUNC_DATA_REQUIRED_NOT_LOAD;
  }

  return TSDB_CODE_SUCCESS;
}

H
Haojun Liao 已提交
235
static bool doFilterByBlockSMA(SFilterInfo* pFilterInfo, SColumnDataAgg** pColsAgg, int32_t numOfCols,
236
                               int32_t numOfRows) {
H
Haojun Liao 已提交
237
  if (pColsAgg == NULL || pFilterInfo == NULL) {
H
Haojun Liao 已提交
238 239 240
    return true;
  }

H
Haojun Liao 已提交
241
  bool keep = filterRangeExecute(pFilterInfo, pColsAgg, numOfCols, numOfRows);
H
Haojun Liao 已提交
242 243 244
  return keep;
}

H
Haojun Liao 已提交
245
static bool doLoadBlockSMA(STableScanBase* pTableScanInfo, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo) {
H
Haojun Liao 已提交
246
  bool    allColumnsHaveAgg = true;
247
  int32_t code = tsdbRetrieveDatablockSMA(pTableScanInfo->dataReader, pBlock, &allColumnsHaveAgg);
H
Haojun Liao 已提交
248
  if (code != TSDB_CODE_SUCCESS) {
249
    T_LONG_JMP(pTaskInfo->env, code);
H
Haojun Liao 已提交
250 251 252 253 254 255 256 257
  }

  if (!allColumnsHaveAgg) {
    return false;
  }
  return true;
}

H
Haojun Liao 已提交
258
static void doSetTagColumnData(STableScanBase* pTableScanInfo, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo,
259
                               int32_t rows) {
H
Haojun Liao 已提交
260 261 262
  if (pTableScanInfo->pseudoSup.numOfExprs > 0) {
    SExprSupp* pSup = &pTableScanInfo->pseudoSup;

263
    int32_t code = addTagPseudoColumnData(&pTableScanInfo->readHandle, pSup->pExprInfo, pSup->numOfExprs, pBlock, rows,
264
                                          GET_TASKID(pTaskInfo), &pTableScanInfo->metaCache);
H
Haojun Liao 已提交
265
    // ignore the table not exists error, since this table may have been dropped during the scan procedure.
H
Haojun Liao 已提交
266
    if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_PAR_TABLE_NOT_EXIST) {
H
Haojun Liao 已提交
267 268
      T_LONG_JMP(pTaskInfo->env, code);
    }
H
Haojun Liao 已提交
269 270 271

    // reset the error code.
    terrno = 0;
H
Haojun Liao 已提交
272 273 274
  }
}

275
bool applyLimitOffset(SLimitInfo* pLimitInfo, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo) {
276
  SLimit*     pLimit = &pLimitInfo->limit;
H
Haojun Liao 已提交
277
  const char* id = GET_TASKID(pTaskInfo);
278

279
  if (pLimitInfo->remainOffset > 0) {
280 281
    if (pLimitInfo->remainOffset >= pBlock->info.rows) {
      pLimitInfo->remainOffset -= pBlock->info.rows;
H
Haojun Liao 已提交
282
      blockDataEmpty(pBlock);
H
Haojun Liao 已提交
283
      qDebug("current block ignore due to offset, current:%" PRId64 ", %s", pLimitInfo->remainOffset, id);
284
      return false;
285
    } else {
286
      blockDataTrimFirstRows(pBlock, pLimitInfo->remainOffset);
287 288 289 290 291 292
      pLimitInfo->remainOffset = 0;
    }
  }

  if (pLimit->limit != -1 && pLimit->limit <= (pLimitInfo->numOfOutputRows + pBlock->info.rows)) {
    // limit the output rows
293
    int32_t keep = (int32_t)(pLimit->limit - pLimitInfo->numOfOutputRows);
294
    blockDataKeepFirstNRows(pBlock, keep);
295 296

    pLimitInfo->numOfOutputRows += pBlock->info.rows;
H
Haojun Liao 已提交
297
    qDebug("output limit %" PRId64 " has reached, %s", pLimit->limit, id);
298
    return true;
299
  }
300

301
  pLimitInfo->numOfOutputRows += pBlock->info.rows;
302
  return false;
303 304
}

H
Haojun Liao 已提交
305
static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanBase* pTableScanInfo, SSDataBlock* pBlock,
L
Liu Jicong 已提交
306
                             uint32_t* status) {
S
slzhou 已提交
307
  SExecTaskInfo*          pTaskInfo = pOperator->pTaskInfo;
308
  SFileBlockLoadRecorder* pCost = &pTableScanInfo->readRecorder;
H
Haojun Liao 已提交
309 310

  pCost->totalBlocks += 1;
311
  pCost->totalRows += pBlock->info.rows;
312

H
Haojun Liao 已提交
313
  bool loadSMA = false;
H
Haojun Liao 已提交
314
  *status = pTableScanInfo->dataBlockLoadFlag;
H
Haojun Liao 已提交
315
  if (pOperator->exprSupp.pFilterInfo != NULL ||
316
      overlapWithTimeWindow(&pTableScanInfo->pdInfo.interval, &pBlock->info, pTableScanInfo->cond.order)) {
317 318 319 320
    (*status) = FUNC_DATA_REQUIRED_DATA_LOAD;
  }

  SDataBlockInfo* pBlockInfo = &pBlock->info;
321
  taosMemoryFreeClear(pBlock->pBlockAgg);
322 323

  if (*status == FUNC_DATA_REQUIRED_FILTEROUT) {
X
Xiaoyu Wang 已提交
324
    qDebug("%s data block filter out, brange:%" PRId64 "-%" PRId64 ", rows:%" PRId64, GET_TASKID(pTaskInfo),
325
           pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows);
326
    pCost->filterOutBlocks += 1;
327
    pCost->totalRows += pBlock->info.rows;
328
    tsdbReleaseDataBlock(pTableScanInfo->dataReader);
329 330
    return TSDB_CODE_SUCCESS;
  } else if (*status == FUNC_DATA_REQUIRED_NOT_LOAD) {
X
Xiaoyu Wang 已提交
331 332 333
    qDebug("%s data block skipped, brange:%" PRId64 "-%" PRId64 ", rows:%" PRId64 ", uid:%" PRIu64,
           GET_TASKID(pTaskInfo), pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows,
           pBlockInfo->id.uid);
G
Ganlin Zhao 已提交
334
    doSetTagColumnData(pTableScanInfo, pBlock, pTaskInfo, pBlock->info.rows);
335
    pCost->skipBlocks += 1;
336
    tsdbReleaseDataBlock(pTableScanInfo->dataReader);
337
    return TSDB_CODE_SUCCESS;
H
Haojun Liao 已提交
338
  } else if (*status == FUNC_DATA_REQUIRED_SMA_LOAD) {
339
    pCost->loadBlockStatis += 1;
L
Liu Jicong 已提交
340
    loadSMA = true;  // mark the operation of load sma;
H
Haojun Liao 已提交
341
    bool success = doLoadBlockSMA(pTableScanInfo, pBlock, pTaskInfo);
L
Liu Jicong 已提交
342
    if (success) {  // failed to load the block sma data, data block statistics does not exist, load data block instead
X
Xiaoyu Wang 已提交
343
      qDebug("%s data block SMA loaded, brange:%" PRId64 "-%" PRId64 ", rows:%" PRId64, GET_TASKID(pTaskInfo),
344
             pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows);
G
Ganlin Zhao 已提交
345
      doSetTagColumnData(pTableScanInfo, pBlock, pTaskInfo, pBlock->info.rows);
346
      tsdbReleaseDataBlock(pTableScanInfo->dataReader);
347 348
      return TSDB_CODE_SUCCESS;
    } else {
349
      qDebug("%s failed to load SMA, since not all columns have SMA", GET_TASKID(pTaskInfo));
H
Haojun Liao 已提交
350
      *status = FUNC_DATA_REQUIRED_DATA_LOAD;
351
    }
H
Haojun Liao 已提交
352
  }
353

H
Haojun Liao 已提交
354
  ASSERT(*status == FUNC_DATA_REQUIRED_DATA_LOAD);
355

H
Haojun Liao 已提交
356
  // try to filter data block according to sma info
H
Haojun Liao 已提交
357
  if (pOperator->exprSupp.pFilterInfo != NULL && (!loadSMA)) {
358 359 360
    bool success = doLoadBlockSMA(pTableScanInfo, pBlock, pTaskInfo);
    if (success) {
      size_t size = taosArrayGetSize(pBlock->pDataBlock);
H
Haojun Liao 已提交
361
      bool   keep = doFilterByBlockSMA(pOperator->exprSupp.pFilterInfo, pBlock->pBlockAgg, size, pBlockInfo->rows);
362
      if (!keep) {
X
Xiaoyu Wang 已提交
363 364
        qDebug("%s data block filter out by block SMA, brange:%" PRId64 "-%" PRId64 ", rows:%" PRId64,
               GET_TASKID(pTaskInfo), pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows);
365 366 367
        pCost->filterOutBlocks += 1;
        (*status) = FUNC_DATA_REQUIRED_FILTEROUT;

368
        tsdbReleaseDataBlock(pTableScanInfo->dataReader);
369 370
        return TSDB_CODE_SUCCESS;
      }
371
    }
H
Haojun Liao 已提交
372
  }
373

374 375 376
  // free the sma info, since it should not be involved in later computing process.
  taosMemoryFreeClear(pBlock->pBlockAgg);

377
  // try to filter data block according to current results
378 379
  doDynamicPruneDataBlock(pOperator, pBlockInfo, status);
  if (*status == FUNC_DATA_REQUIRED_NOT_LOAD) {
X
Xiaoyu Wang 已提交
380 381
    qDebug("%s data block skipped due to dynamic prune, brange:%" PRId64 "-%" PRId64 ", rows:%" PRId64,
           GET_TASKID(pTaskInfo), pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows);
382
    pCost->skipBlocks += 1;
383
    tsdbReleaseDataBlock(pTableScanInfo->dataReader);
384
    *status = FUNC_DATA_REQUIRED_FILTEROUT;
385 386 387
    return TSDB_CODE_SUCCESS;
  }

H
Haojun Liao 已提交
388 389
  pCost->totalCheckedRows += pBlock->info.rows;
  pCost->loadBlocks += 1;
390

H
Haojun Liao 已提交
391 392
  SSDataBlock* p = tsdbRetrieveDataBlock(pTableScanInfo->dataReader, NULL);
  if (p == NULL) {
H
Haojun Liao 已提交
393
    return terrno;
H
Haojun Liao 已提交
394 395
  }

H
Haojun Liao 已提交
396
  ASSERT(p == pBlock);
397
  doSetTagColumnData(pTableScanInfo, pBlock, pTaskInfo, pBlock->info.rows);
398

H
Haojun Liao 已提交
399 400
  // restore the previous value
  pCost->totalRows -= pBlock->info.rows;
401

H
Haojun Liao 已提交
402
  if (pOperator->exprSupp.pFilterInfo != NULL) {
403
    int64_t st = taosGetTimestampUs();
H
Haojun Liao 已提交
404
    doFilter(pBlock, pOperator->exprSupp.pFilterInfo, &pTableScanInfo->matchInfo);
405

406 407
    double el = (taosGetTimestampUs() - st) / 1000.0;
    pTableScanInfo->readRecorder.filterTime += el;
408

409 410
    if (pBlock->info.rows == 0) {
      pCost->filterOutBlocks += 1;
D
dapan1121 已提交
411
      qDebug("%s data block filter out, brange:%" PRId64 "-%" PRId64 ", rows:%" PRId64 ", elapsed time:%.2f ms",
412 413 414 415
             GET_TASKID(pTaskInfo), pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows, el);
    } else {
      qDebug("%s data block filter applied, elapsed time:%.2f ms", GET_TASKID(pTaskInfo), el);
    }
416 417
  }

418
  bool limitReached = applyLimitOffset(&pTableScanInfo->limitInfo, pBlock, pTaskInfo);
X
Xiaoyu Wang 已提交
419
  if (limitReached) {  // set operator flag is done
420 421
    setOperatorCompleted(pOperator);
  }
422

H
Haojun Liao 已提交
423
  pCost->totalRows += pBlock->info.rows;
H
Haojun Liao 已提交
424 425 426
  return TSDB_CODE_SUCCESS;
}

H
Haojun Liao 已提交
427
static void prepareForDescendingScan(STableScanBase* pTableScanInfo, SqlFunctionCtx* pCtx, int32_t numOfOutput) {
H
Haojun Liao 已提交
428 429 430
  SET_REVERSE_SCAN_FLAG(pTableScanInfo);

  switchCtxOrder(pCtx, numOfOutput);
431
  pTableScanInfo->cond.order = TSDB_ORDER_DESC;
H
Haojun Liao 已提交
432 433
  STimeWindow* pTWindow = &pTableScanInfo->cond.twindows;
  TSWAP(pTWindow->skey, pTWindow->ekey);
H
Haojun Liao 已提交
434 435
}

436 437
typedef struct STableCachedVal {
  const char* pName;
438
  STag*       pTags;
439 440
} STableCachedVal;

441 442 443 444 445 446 447 448 449 450 451
static void freeTableCachedVal(void* param) {
  if (param == NULL) {
    return;
  }

  STableCachedVal* pVal = param;
  taosMemoryFree((void*)pVal->pName);
  taosMemoryFree(pVal->pTags);
  taosMemoryFree(pVal);
}

H
Haojun Liao 已提交
452 453
static STableCachedVal* createTableCacheVal(const SMetaReader* pMetaReader) {
  STableCachedVal* pVal = taosMemoryMalloc(sizeof(STableCachedVal));
454
  pVal->pName = taosStrdup(pMetaReader->me.name);
H
Haojun Liao 已提交
455 456 457 458
  pVal->pTags = NULL;

  // only child table has tag value
  if (pMetaReader->me.type == TSDB_CHILD_TABLE) {
459
    STag* pTag = (STag*)pMetaReader->me.ctbEntry.pTags;
H
Haojun Liao 已提交
460 461 462 463 464 465 466
    pVal->pTags = taosMemoryMalloc(pTag->len);
    memcpy(pVal->pTags, pTag, pTag->len);
  }

  return pVal;
}

467 468
// const void *key, size_t keyLen, void *value
static void freeCachedMetaItem(const void* key, size_t keyLen, void* value) { freeTableCachedVal(value); }
469

470 471 472 473 474
static void doSetNullValue(SSDataBlock* pBlock, const SExprInfo* pExpr, int32_t numOfExpr) {
  for (int32_t j = 0; j < numOfExpr; ++j) {
    int32_t dstSlotId = pExpr[j].base.resSchema.slotId;

    SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, dstSlotId);
475
    colDataSetNNULL(pColInfoData, 0, pBlock->info.rows);
476 477 478
  }
}

479 480
int32_t addTagPseudoColumnData(SReadHandle* pHandle, const SExprInfo* pExpr, int32_t numOfExpr, SSDataBlock* pBlock,
                               int32_t rows, const char* idStr, STableMetaCacheInfo* pCache) {
481
  // currently only the tbname pseudo column
482
  if (numOfExpr <= 0) {
H
Haojun Liao 已提交
483
    return TSDB_CODE_SUCCESS;
484 485
  }

486 487
  int32_t code = 0;

488 489 490 491
  // backup the rows
  int32_t backupRows = pBlock->info.rows;
  pBlock->info.rows = rows;

492
  bool            freeReader = false;
493
  STableCachedVal val = {0};
494 495

  SMetaReader mr = {0};
496
  LRUHandle*  h = NULL;
497

498 499 500
  // todo refactor: extract method
  // the handling of the null data should be packed in the extracted method

501
  // 1. check if it is existed in meta cache
502
  if (pCache == NULL) {
503
    metaReaderInit(&mr, pHandle->meta, 0);
H
Haojun Liao 已提交
504
    code = metaGetTableEntryByUidCache(&mr, pBlock->info.id.uid);
505
    if (code != TSDB_CODE_SUCCESS) {
506
      // when encounter the TSDB_CODE_PAR_TABLE_NOT_EXIST error, we proceed.
H
Haojun Liao 已提交
507
      if (terrno == TSDB_CODE_PAR_TABLE_NOT_EXIST) {
S
slzhou 已提交
508 509
        qWarn("failed to get table meta, table may have been dropped, uid:0x%" PRIx64 ", code:%s, %s",
              pBlock->info.id.uid, tstrerror(terrno), idStr);
510 511 512

        // append null value before return to caller, since the caller will ignore this error code and proceed
        doSetNullValue(pBlock, pExpr, numOfExpr);
H
Haojun Liao 已提交
513
      } else {
S
slzhou 已提交
514 515
        qError("failed to get table meta, uid:0x%" PRIx64 ", code:%s, %s", pBlock->info.id.uid, tstrerror(terrno),
               idStr);
H
Haojun Liao 已提交
516
      }
517 518 519 520 521
      metaReaderClear(&mr);
      return terrno;
    }

    metaReaderReleaseLock(&mr);
522

523 524
    val.pName = mr.me.name;
    val.pTags = (STag*)mr.me.ctbEntry.pTags;
525 526

    freeReader = true;
527
  } else {
528 529
    pCache->metaFetch += 1;

H
Haojun Liao 已提交
530
    h = taosLRUCacheLookup(pCache->pTableMetaEntryCache, &pBlock->info.id.uid, sizeof(pBlock->info.id.uid));
531 532
    if (h == NULL) {
      metaReaderInit(&mr, pHandle->meta, 0);
H
Haojun Liao 已提交
533
      code = metaGetTableEntryByUidCache(&mr, pBlock->info.id.uid);
534
      if (code != TSDB_CODE_SUCCESS) {
H
Haojun Liao 已提交
535
        if (terrno == TSDB_CODE_PAR_TABLE_NOT_EXIST) {
536
          qWarn("failed to get table meta, table may have been dropped, uid:0x%" PRIx64 ", code:%s, %s",
H
Haojun Liao 已提交
537
                pBlock->info.id.uid, tstrerror(terrno), idStr);
538 539
          // append null value before return to caller, since the caller will ignore this error code and proceed
          doSetNullValue(pBlock, pExpr, numOfExpr);
H
Haojun Liao 已提交
540
        } else {
H
Haojun Liao 已提交
541
          qError("failed to get table meta, uid:0x%" PRIx64 ", code:%s, %s", pBlock->info.id.uid, tstrerror(terrno),
542
                 idStr);
H
Haojun Liao 已提交
543
        }
544 545 546 547 548 549
        metaReaderClear(&mr);
        return terrno;
      }

      metaReaderReleaseLock(&mr);

H
Haojun Liao 已提交
550
      STableCachedVal* pVal = createTableCacheVal(&mr);
551

H
Haojun Liao 已提交
552
      val = *pVal;
553
      freeReader = true;
H
Haojun Liao 已提交
554

H
Haojun Liao 已提交
555
      int32_t ret = taosLRUCacheInsert(pCache->pTableMetaEntryCache, &pBlock->info.id.uid, sizeof(uint64_t), pVal,
556
                                       sizeof(STableCachedVal), freeCachedMetaItem, NULL, TAOS_LRU_PRIORITY_LOW);
557 558 559 560 561 562 563 564
      if (ret != TAOS_LRU_STATUS_OK) {
        qError("failed to put meta into lru cache, code:%d, %s", ret, idStr);
        freeTableCachedVal(pVal);
      }
    } else {
      pCache->cacheHit += 1;
      STableCachedVal* pVal = taosLRUCacheValue(pCache->pTableMetaEntryCache, h);
      val = *pVal;
H
Haojun Liao 已提交
565

H
Haojun Liao 已提交
566
      taosLRUCacheRelease(pCache->pTableMetaEntryCache, h, false);
567
    }
H
Haojun Liao 已提交
568

569 570
    qDebug("retrieve table meta from cache:%" PRIu64 ", hit:%" PRIu64 " miss:%" PRIu64 ", %s", pCache->metaFetch,
           pCache->cacheHit, (pCache->metaFetch - pCache->cacheHit), idStr);
H
Haojun Liao 已提交
571
  }
572

573 574
  for (int32_t j = 0; j < numOfExpr; ++j) {
    const SExprInfo* pExpr1 = &pExpr[j];
575
    int32_t          dstSlotId = pExpr1->base.resSchema.slotId;
576 577

    SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, dstSlotId);
D
dapan1121 已提交
578
    colInfoDataCleanup(pColInfoData, pBlock->info.rows);
579

580
    int32_t functionId = pExpr1->pExpr->_function.functionId;
581 582 583

    // this is to handle the tbname
    if (fmIsScanPseudoColumnFunc(functionId)) {
584
      setTbNameColData(pBlock, pColInfoData, functionId, val.pName);
585
    } else {  // these are tags
wmmhello's avatar
wmmhello 已提交
586
      STagVal tagVal = {0};
587 588
      tagVal.cid = pExpr1->base.pParam[0].pCol->colId;
      const char* p = metaGetTableTagVal(val.pTags, pColInfoData->info.type, &tagVal);
wmmhello's avatar
wmmhello 已提交
589

590 591 592 593
      char* data = NULL;
      if (pColInfoData->info.type != TSDB_DATA_TYPE_JSON && p != NULL) {
        data = tTagValToData((const STagVal*)p, false);
      } else {
wmmhello's avatar
wmmhello 已提交
594
        data = (char*)p;
wmmhello's avatar
wmmhello 已提交
595
      }
596

H
Haojun Liao 已提交
597 598
      bool isNullVal = (data == NULL) || (pColInfoData->info.type == TSDB_DATA_TYPE_JSON && tTagIsJsonNull(data));
      if (isNullVal) {
599
        colDataSetNNULL(pColInfoData, 0, pBlock->info.rows);
H
Haojun Liao 已提交
600
      } else if (pColInfoData->info.type != TSDB_DATA_TYPE_JSON) {
D
dapan1121 已提交
601
        code = colDataSetNItems(pColInfoData, 0, data, pBlock->info.rows, false);
H
Haojun Liao 已提交
602 603 604
        if (IS_VAR_DATA_TYPE(((const STagVal*)p)->type)) {
          taosMemoryFree(data);
        }
D
dapan1121 已提交
605 606 607 608 609 610
        if (code) {
          if (freeReader) {
            metaReaderClear(&mr);
          }
          return code;
        }
L
Liu Jicong 已提交
611
      } else {  // todo opt for json tag
H
Haojun Liao 已提交
612
        for (int32_t i = 0; i < pBlock->info.rows; ++i) {
613
          colDataSetVal(pColInfoData, i, data, false);
H
Haojun Liao 已提交
614
        }
615 616 617 618
      }
    }
  }

619 620
  // restore the rows
  pBlock->info.rows = backupRows;
621 622 623 624
  if (freeReader) {
    metaReaderClear(&mr);
  }

H
Haojun Liao 已提交
625
  return TSDB_CODE_SUCCESS;
626 627
}

H
Haojun Liao 已提交
628
void setTbNameColData(const SSDataBlock* pBlock, SColumnInfoData* pColInfoData, int32_t functionId, const char* name) {
629 630 631
  struct SScalarFuncExecFuncs fpSet = {0};
  fmGetScalarFuncExecFuncs(functionId, &fpSet);

H
Haojun Liao 已提交
632
  size_t len = TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE;
633
  char   buf[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
H
Haojun Liao 已提交
634 635 636
  STR_TO_VARSTR(buf, name)

  SColumnInfoData infoData = createColumnInfoData(TSDB_DATA_TYPE_VARCHAR, len, 1);
637

H
Haojun Liao 已提交
638
  colInfoDataEnsureCapacity(&infoData, 1, false);
639
  colDataSetVal(&infoData, 0, buf, false);
640

H
Haojun Liao 已提交
641
  SScalarParam srcParam = {.numOfRows = pBlock->info.rows, .columnData = &infoData};
642
  SScalarParam param = {.columnData = pColInfoData};
H
Haojun Liao 已提交
643 644 645 646 647 648 649

  if (fpSet.process != NULL) {
    fpSet.process(&srcParam, 1, &param);
  } else {
    qError("failed to get the corresponding callback function, functionId:%d", functionId);
  }

D
dapan1121 已提交
650
  colDataDestroy(&infoData);
651 652
}

653
static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) {
H
Haojun Liao 已提交
654
  STableScanInfo* pTableScanInfo = pOperator->info;
655
  SExecTaskInfo*  pTaskInfo = pOperator->pTaskInfo;
L
Liu Jicong 已提交
656
  SSDataBlock*    pBlock = pTableScanInfo->pResBlock;
D
dapan1121 已提交
657 658
  bool            hasNext = false;
  int32_t         code = TSDB_CODE_SUCCESS;
H
Haojun Liao 已提交
659

660 661
  int64_t st = taosGetTimestampUs();

D
dapan1121 已提交
662 663 664 665 666 667 668 669 670 671
  while (true) {
    code = tsdbNextDataBlock(pTableScanInfo->base.dataReader, &hasNext);
    if (code) {
      tsdbReleaseDataBlock(pTableScanInfo->base.dataReader);
      T_LONG_JMP(pTaskInfo->env, code);
    }

    if (!hasNext) {
      break;
    }
X
Xiaoyu Wang 已提交
672

673
    if (isTaskKilled(pTaskInfo)) {
X
Xiaoyu Wang 已提交
674
      tsdbReleaseDataBlock(pTableScanInfo->base.dataReader);
675
      T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
676
    }
H
Haojun Liao 已提交
677

678
    if (pOperator->status == OP_EXEC_DONE) {
X
Xiaoyu Wang 已提交
679
      tsdbReleaseDataBlock(pTableScanInfo->base.dataReader);
680 681 682
      break;
    }

683 684 685 686 687 688
    // process this data block based on the probabilities
    bool processThisBlock = processBlockWithProbability(&pTableScanInfo->sample);
    if (!processThisBlock) {
      continue;
    }

D
dapan1121 已提交
689
    if (pBlock->info.id.uid) {
690
      pBlock->info.id.groupId = getTableGroupId(pTableScanInfo->base.pTableListInfo, pBlock->info.id.uid);
D
dapan1121 已提交
691
    }
692

693
    uint32_t status = 0;
694
    code = loadDataBlock(pOperator, &pTableScanInfo->base, pBlock, &status);
695
    if (code != TSDB_CODE_SUCCESS) {
696
      T_LONG_JMP(pTaskInfo->env, code);
697
    }
698

699 700 701
    // current block is filter out according to filter condition, continue load the next block
    if (status == FUNC_DATA_REQUIRED_FILTEROUT || pBlock->info.rows == 0) {
      continue;
702
    }
703

H
Haojun Liao 已提交
704 705
    pOperator->resultInfo.totalRows = pTableScanInfo->base.readRecorder.totalRows;
    pTableScanInfo->base.readRecorder.elapsedTime += (taosGetTimestampUs() - st) / 1000.0;
706

H
Haojun Liao 已提交
707
    pOperator->cost.totalCost = pTableScanInfo->base.readRecorder.elapsedTime;
708 709

    // todo refactor
H
Haojun Liao 已提交
710
    /*pTableScanInfo->lastStatus.uid = pBlock->info.id.uid;*/
L
Liu Jicong 已提交
711
    /*pTableScanInfo->lastStatus.ts = pBlock->info.window.ekey;*/
X
Xiaoyu Wang 已提交
712 713 714
    //    pTaskInfo->streamInfo.lastStatus.type = TMQ_OFFSET__SNAPSHOT_DATA;
    //    pTaskInfo->streamInfo.lastStatus.uid = pBlock->info.id.uid;
    //    pTaskInfo->streamInfo.lastStatus.ts = pBlock->info.window.ekey;
715

716
    return pBlock;
H
Haojun Liao 已提交
717 718 719 720
  }
  return NULL;
}

H
Haojun Liao 已提交
721
static SSDataBlock* doGroupedTableScan(SOperatorInfo* pOperator) {
H
Haojun Liao 已提交
722 723 724 725
  STableScanInfo* pTableScanInfo = pOperator->info;
  SExecTaskInfo*  pTaskInfo = pOperator->pTaskInfo;

  // The read handle is not initialized yet, since no qualified tables exists
H
Haojun Liao 已提交
726
  if (pTableScanInfo->base.dataReader == NULL || pOperator->status == OP_EXEC_DONE) {
H
Haojun Liao 已提交
727 728 729
    return NULL;
  }

730 731
  // do the ascending order traverse in the first place.
  while (pTableScanInfo->scanTimes < pTableScanInfo->scanInfo.numOfAsc) {
H
Haojun Liao 已提交
732 733 734
    SSDataBlock* p = doTableScanImpl(pOperator);
    if (p != NULL) {
      return p;
H
Haojun Liao 已提交
735 736
    }

737
    pTableScanInfo->scanTimes += 1;
738

739
    if (pTableScanInfo->scanTimes < pTableScanInfo->scanInfo.numOfAsc) {
740
      setTaskStatus(pTaskInfo, TASK_NOT_COMPLETED);
G
Ganlin Zhao 已提交
741 742
      pTableScanInfo->base.scanFlag = MAIN_SCAN;
      pTableScanInfo->base.dataBlockLoadFlag = FUNC_DATA_REQUIRED_DATA_LOAD;
743
      qDebug("start to repeat ascending order scan data blocks due to query func required, %s", GET_TASKID(pTaskInfo));
H
Haojun Liao 已提交
744

745
      // do prepare for the next round table scan operation
H
Haojun Liao 已提交
746
      tsdbReaderReset(pTableScanInfo->base.dataReader, &pTableScanInfo->base.cond);
H
Haojun Liao 已提交
747
    }
748
  }
H
Haojun Liao 已提交
749

750
  int32_t total = pTableScanInfo->scanInfo.numOfAsc + pTableScanInfo->scanInfo.numOfDesc;
751
  if (pTableScanInfo->scanTimes < total) {
H
Haojun Liao 已提交
752 753 754
    if (pTableScanInfo->base.cond.order == TSDB_ORDER_ASC) {
      prepareForDescendingScan(&pTableScanInfo->base, pOperator->exprSupp.pCtx, 0);
      tsdbReaderReset(pTableScanInfo->base.dataReader, &pTableScanInfo->base.cond);
755
      qDebug("%s start to descending order scan data blocks due to query func required", GET_TASKID(pTaskInfo));
756
    }
H
Haojun Liao 已提交
757

758
    while (pTableScanInfo->scanTimes < total) {
H
Haojun Liao 已提交
759 760 761
      SSDataBlock* p = doTableScanImpl(pOperator);
      if (p != NULL) {
        return p;
762
      }
H
Haojun Liao 已提交
763

764
      pTableScanInfo->scanTimes += 1;
H
Haojun Liao 已提交
765

766
      if (pTableScanInfo->scanTimes < total) {
767
        setTaskStatus(pTaskInfo, TASK_NOT_COMPLETED);
G
Ganlin Zhao 已提交
768
        pTableScanInfo->base.scanFlag = MAIN_SCAN;
H
Haojun Liao 已提交
769

770
        qDebug("%s start to repeat descending order scan data blocks", GET_TASKID(pTaskInfo));
H
Haojun Liao 已提交
771
        tsdbReaderReset(pTableScanInfo->base.dataReader, &pTableScanInfo->base.cond);
772
      }
H
Haojun Liao 已提交
773 774 775
    }
  }

wmmhello's avatar
wmmhello 已提交
776 777 778 779 780 781 782
  return NULL;
}

static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
  STableScanInfo* pInfo = pOperator->info;
  SExecTaskInfo*  pTaskInfo = pOperator->pTaskInfo;

783
  // scan table one by one sequentially
L
Liu Jicong 已提交
784
  if (pInfo->scanMode == TABLE_SCAN__TABLE_ORDER) {
X
Xiaoyu Wang 已提交
785
    int32_t       numOfTables = 0;  // tableListGetSize(pTaskInfo->pTableListInfo);
786
    STableKeyInfo tInfo = {0};
H
Haojun Liao 已提交
787

L
Liu Jicong 已提交
788
    while (1) {
H
Haojun Liao 已提交
789
      SSDataBlock* result = doGroupedTableScan(pOperator);
H
Haojun Liao 已提交
790
      if (result || (pOperator->status == OP_EXEC_DONE) || isTaskKilled(pTaskInfo)) {
L
Liu Jicong 已提交
791 792
        return result;
      }
H
Haojun Liao 已提交
793

L
Liu Jicong 已提交
794 795
      // if no data, switch to next table and continue scan
      pInfo->currentTable++;
796 797

      taosRLockLatch(&pTaskInfo->lock);
798
      numOfTables = tableListGetSize(pInfo->base.pTableListInfo);
799

H
Haojun Liao 已提交
800
      if (pInfo->currentTable >= numOfTables) {
H
Haojun Liao 已提交
801
        qDebug("all table checked in table list, total:%d, return NULL, %s", numOfTables, GET_TASKID(pTaskInfo));
802
        taosRUnLockLatch(&pTaskInfo->lock);
L
Liu Jicong 已提交
803 804
        return NULL;
      }
H
Haojun Liao 已提交
805

X
Xiaoyu Wang 已提交
806
      tInfo = *(STableKeyInfo*)tableListGetInfo(pInfo->base.pTableListInfo, pInfo->currentTable);
807 808 809 810
      taosRUnLockLatch(&pTaskInfo->lock);

      tsdbSetTableList(pInfo->base.dataReader, &tInfo, 1);
      qDebug("set uid:%" PRIu64 " into scanner, total tables:%d, index:%d/%d %s", tInfo.uid, numOfTables,
H
Haojun Liao 已提交
811
             pInfo->currentTable, numOfTables, GET_TASKID(pTaskInfo));
H
Haojun Liao 已提交
812

H
Haojun Liao 已提交
813
      tsdbReaderReset(pInfo->base.dataReader, &pInfo->base.cond);
L
Liu Jicong 已提交
814 815
      pInfo->scanTimes = 0;
    }
816 817
  } else {  // scan table group by group sequentially
    if (pInfo->currentGroupId == -1) {
818
      if ((++pInfo->currentGroupId) >= tableListGetOutputGroups(pInfo->base.pTableListInfo)) {
H
Haojun Liao 已提交
819
        setOperatorCompleted(pOperator);
820 821
        return NULL;
      }
822

5
54liuyao 已提交
823
      int32_t        num = 0;
824
      STableKeyInfo* pList = NULL;
825
      tableListGetGroupList(pInfo->base.pTableListInfo, pInfo->currentGroupId, &pList, &num);
H
Haojun Liao 已提交
826
      ASSERT(pInfo->base.dataReader == NULL);
827

L
Liu Jicong 已提交
828
      int32_t code = tsdbReaderOpen(pInfo->base.readHandle.vnode, &pInfo->base.cond, pList, num, pInfo->pResBlock,
D
dapan1121 已提交
829
                                    (STsdbReader**)&pInfo->base.dataReader, GET_TASKID(pTaskInfo), pInfo->countOnly);
830 831 832
      if (code != TSDB_CODE_SUCCESS) {
        T_LONG_JMP(pTaskInfo->env, code);
      }
833 834 835 836

      if (pInfo->pResBlock->info.capacity > pOperator->resultInfo.capacity) {
        pOperator->resultInfo.capacity = pInfo->pResBlock->info.capacity;
      }
wmmhello's avatar
wmmhello 已提交
837
    }
H
Haojun Liao 已提交
838

H
Haojun Liao 已提交
839
    SSDataBlock* result = doGroupedTableScan(pOperator);
840 841 842
    if (result != NULL) {
      return result;
    }
H
Haojun Liao 已提交
843

844
    if ((++pInfo->currentGroupId) >= tableListGetOutputGroups(pInfo->base.pTableListInfo)) {
H
Haojun Liao 已提交
845
      setOperatorCompleted(pOperator);
846 847
      return NULL;
    }
wmmhello's avatar
wmmhello 已提交
848

849 850
    // reset value for the next group data output
    pOperator->status = OP_OPENED;
851
    resetLimitInfoForNextGroup(&pInfo->base.limitInfo);
wmmhello's avatar
wmmhello 已提交
852

5
54liuyao 已提交
853
    int32_t        num = 0;
854
    STableKeyInfo* pList = NULL;
855
    tableListGetGroupList(pInfo->base.pTableListInfo, pInfo->currentGroupId, &pList, &num);
wmmhello's avatar
wmmhello 已提交
856

H
Haojun Liao 已提交
857 858
    tsdbSetTableList(pInfo->base.dataReader, pList, num);
    tsdbReaderReset(pInfo->base.dataReader, &pInfo->base.cond);
859
    pInfo->scanTimes = 0;
wmmhello's avatar
wmmhello 已提交
860

H
Haojun Liao 已提交
861
    result = doGroupedTableScan(pOperator);
862 863 864
    if (result != NULL) {
      return result;
    }
865

H
Haojun Liao 已提交
866
    setOperatorCompleted(pOperator);
867 868
    return NULL;
  }
H
Haojun Liao 已提交
869 870
}

871 872
static int32_t getTableScannerExecInfo(struct SOperatorInfo* pOptr, void** pOptrExplain, uint32_t* len) {
  SFileBlockLoadRecorder* pRecorder = taosMemoryCalloc(1, sizeof(SFileBlockLoadRecorder));
873
  STableScanInfo*         pTableScanInfo = pOptr->info;
H
Haojun Liao 已提交
874
  *pRecorder = pTableScanInfo->base.readRecorder;
875 876 877 878 879
  *pOptrExplain = pRecorder;
  *len = sizeof(SFileBlockLoadRecorder);
  return 0;
}

880 881
static void destroyTableScanBase(STableScanBase* pBase) {
  cleanupQueryTableDataCond(&pBase->cond);
H
Haojun Liao 已提交
882

883 884
  tsdbReaderClose(pBase->dataReader);
  pBase->dataReader = NULL;
885

886 887
  if (pBase->matchInfo.pList != NULL) {
    taosArrayDestroy(pBase->matchInfo.pList);
888
  }
L
Liu Jicong 已提交
889

890
  tableListDestroy(pBase->pTableListInfo);
891 892 893 894 895 896 897 898
  taosLRUCacheCleanup(pBase->metaCache.pTableMetaEntryCache);
  cleanupExprSupp(&pBase->pseudoSup);
}

static void destroyTableScanOperatorInfo(void* param) {
  STableScanInfo* pTableScanInfo = (STableScanInfo*)param;
  blockDataDestroy(pTableScanInfo->pResBlock);
  destroyTableScanBase(&pTableScanInfo->base);
D
dapan1121 已提交
899
  taosMemoryFreeClear(param);
900 901
}

902
SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, SReadHandle* readHandle,
903
                                           STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo) {
X
Xiaoyu Wang 已提交
904
  int32_t         code = 0;
H
Haojun Liao 已提交
905 906 907
  STableScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STableScanInfo));
  SOperatorInfo*  pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
  if (pInfo == NULL || pOperator == NULL) {
908
    code = TSDB_CODE_OUT_OF_MEMORY;
909
    goto _error;
H
Haojun Liao 已提交
910 911
  }

912
  SScanPhysiNode*     pScanNode = &pTableScanNode->scan;
H
Haojun Liao 已提交
913
  SDataBlockDescNode* pDescNode = pScanNode->node.pOutputDataBlockDesc;
914 915

  int32_t numOfCols = 0;
X
Xiaoyu Wang 已提交
916
  code =
H
Haojun Liao 已提交
917
      extractColMatchInfo(pScanNode->pScanCols, pDescNode, &numOfCols, COL_MATCH_FROM_COL_ID, &pInfo->base.matchInfo);
918 919 920 921
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

H
Haojun Liao 已提交
922
  initLimitInfo(pScanNode->node.pLimit, pScanNode->node.pSlimit, &pInfo->base.limitInfo);
H
Haojun Liao 已提交
923
  code = initQueryTableDataCond(&pInfo->base.cond, pTableScanNode);
924
  if (code != TSDB_CODE_SUCCESS) {
925
    goto _error;
926 927
  }

H
Haojun Liao 已提交
928
  if (pScanNode->pScanPseudoCols != NULL) {
H
Haojun Liao 已提交
929
    SExprSupp* pSup = &pInfo->base.pseudoSup;
H
Haojun Liao 已提交
930
    pSup->pExprInfo = createExprInfo(pScanNode->pScanPseudoCols, NULL, &pSup->numOfExprs);
931
    pSup->pCtx = createSqlFunctionCtx(pSup->pExprInfo, pSup->numOfExprs, &pSup->rowEntryInfoOffset);
932 933
  }

934
  pInfo->scanInfo = (SScanInfo){.numOfAsc = pTableScanNode->scanSeq[0], .numOfDesc = pTableScanNode->scanSeq[1]};
G
Ganlin Zhao 已提交
935
  pInfo->base.scanFlag = (pInfo->scanInfo.numOfAsc > 1) ? PRE_SCAN : MAIN_SCAN;
H
Haojun Liao 已提交
936

H
Haojun Liao 已提交
937 938
  pInfo->base.pdInfo.interval = extractIntervalInfo(pTableScanNode);
  pInfo->base.readHandle = *readHandle;
H
Haojun Liao 已提交
939 940
  pInfo->base.dataBlockLoadFlag = pTableScanNode->dataRequired;

941 942
  pInfo->sample.sampleRatio = pTableScanNode->ratio;
  pInfo->sample.seed = taosGetTimestampSec();
943

H
Haojun Liao 已提交
944
  initResultSizeInfo(&pOperator->resultInfo, 4096);
H
Haojun Liao 已提交
945
  pInfo->pResBlock = createDataBlockFromDescNode(pDescNode);
X
Xiaoyu Wang 已提交
946
  //  blockDataEnsureCapacity(pInfo->pResBlock, pOperator->resultInfo.capacity);
H
Haojun Liao 已提交
947

H
Haojun Liao 已提交
948 949 950
  code = filterInitFromNode((SNode*)pTableScanNode->scan.node.pConditions, &pOperator->exprSupp.pFilterInfo, 0);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
H
Haojun Liao 已提交
951 952
  }

wmmhello's avatar
wmmhello 已提交
953
  pInfo->currentGroupId = -1;
954
  pInfo->assignBlockUid = pTableScanNode->assignBlockUid;
955
  pInfo->hasGroupByTag = pTableScanNode->pGroupTags ? true : false;
956

L
Liu Jicong 已提交
957 958
  setOperatorInfo(pOperator, "TableScanOperator", QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN, false, OP_NOT_OPENED, pInfo,
                  pTaskInfo);
959
  pOperator->exprSupp.numOfExprs = numOfCols;
960

961
  pInfo->base.pTableListInfo = pTableListInfo;
H
Haojun Liao 已提交
962 963
  pInfo->base.metaCache.pTableMetaEntryCache = taosLRUCacheInit(1024 * 128, -1, .5);
  if (pInfo->base.metaCache.pTableMetaEntryCache == NULL) {
964 965 966
    code = terrno;
    goto _error;
  }
967

D
dapan1121 已提交
968 969 970 971
  if (scanDebug) {
    pInfo->countOnly = true;
  }

H
Haojun Liao 已提交
972
  taosLRUCacheSetStrictCapacity(pInfo->base.metaCache.pTableMetaEntryCache, false);
973 974
  pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doTableScan, NULL, destroyTableScanOperatorInfo,
                                         optrDefaultBufFn, getTableScannerExecInfo);
975 976 977

  // for non-blocking operator, the open cost is always 0
  pOperator->cost.openCost = 0;
H
Haojun Liao 已提交
978
  return pOperator;
979

980
_error:
981 982 983
  if (pInfo != NULL) {
    destroyTableScanOperatorInfo(pInfo);
  }
984

985 986
  taosMemoryFreeClear(pOperator);
  pTaskInfo->code = code;
987
  return NULL;
H
Haojun Liao 已提交
988 989
}

990
SOperatorInfo* createTableSeqScanOperatorInfo(void* pReadHandle, SExecTaskInfo* pTaskInfo) {
H
Haojun Liao 已提交
991
  STableScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STableScanInfo));
L
Liu Jicong 已提交
992
  SOperatorInfo*  pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
H
Haojun Liao 已提交
993

H
Haojun Liao 已提交
994
  pInfo->base.dataReader = pReadHandle;
L
Liu Jicong 已提交
995
  //  pInfo->prevGroupId       = -1;
H
Haojun Liao 已提交
996

L
Liu Jicong 已提交
997 998
  setOperatorInfo(pOperator, "TableSeqScanOperator", QUERY_NODE_PHYSICAL_PLAN_TABLE_SEQ_SCAN, false, OP_NOT_OPENED,
                  pInfo, pTaskInfo);
999
  pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doTableScanImpl, NULL, NULL, optrDefaultBufFn, NULL);
H
Haojun Liao 已提交
1000 1001 1002
  return pOperator;
}

1003
FORCE_INLINE void doClearBufferedBlocks(SStreamScanInfo* pInfo) {
5
54liuyao 已提交
1004
  qDebug("clear buff blocks:%d", (int32_t)taosArrayGetSize(pInfo->pBlockLists));
L
Liu Jicong 已提交
1005 1006
  taosArrayClear(pInfo->pBlockLists);
  pInfo->validBlockIndex = 0;
H
Haojun Liao 已提交
1007 1008
}

1009
static bool isSessionWindow(SStreamScanInfo* pInfo) {
H
Haojun Liao 已提交
1010
  return pInfo->windowSup.parentType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION;
5
54liuyao 已提交
1011 1012
}

1013
static bool isStateWindow(SStreamScanInfo* pInfo) {
1014
  return pInfo->windowSup.parentType == QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE;
5
54liuyao 已提交
1015
}
5
54liuyao 已提交
1016

L
Liu Jicong 已提交
1017
static bool isIntervalWindow(SStreamScanInfo* pInfo) {
1018 1019 1020
  return pInfo->windowSup.parentType == QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL ||
         pInfo->windowSup.parentType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL ||
         pInfo->windowSup.parentType == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL;
5
54liuyao 已提交
1021 1022 1023
}

static bool isSignleIntervalWindow(SStreamScanInfo* pInfo) {
1024
  return pInfo->windowSup.parentType == QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL;
L
Liu Jicong 已提交
1025 1026
}

1027 1028 1029 1030
static bool isSlidingWindow(SStreamScanInfo* pInfo) {
  return isIntervalWindow(pInfo) && pInfo->interval.interval != pInfo->interval.sliding;
}

1031
static void setGroupId(SStreamScanInfo* pInfo, SSDataBlock* pBlock, int32_t groupColIndex, int32_t rowIndex) {
1032 1033
  SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, groupColIndex);
  uint64_t*        groupCol = (uint64_t*)pColInfo->pData;
1034
  ASSERT(rowIndex < pBlock->info.rows);
1035
  pInfo->groupId = groupCol[rowIndex];
1036 1037
}

L
fix bug  
liuyao 已提交
1038
void resetTableScanInfo(STableScanInfo* pTableScanInfo, STimeWindow* pWin, uint64_t version) {
H
Haojun Liao 已提交
1039
  pTableScanInfo->base.cond.twindows = *pWin;
L
fix bug  
liuyao 已提交
1040
  pTableScanInfo->base.cond.endVersion = version;
L
Liu Jicong 已提交
1041 1042
  pTableScanInfo->scanTimes = 0;
  pTableScanInfo->currentGroupId = -1;
H
Haojun Liao 已提交
1043
  tsdbReaderClose(pTableScanInfo->base.dataReader);
D
dapan1121 已提交
1044
  qDebug("1");
H
Haojun Liao 已提交
1045
  pTableScanInfo->base.dataReader = NULL;
1046 1047
}

L
Liu Jicong 已提交
1048 1049
static SSDataBlock* readPreVersionData(SOperatorInfo* pTableScanOp, uint64_t tbUid, TSKEY startTs, TSKEY endTs,
                                       int64_t maxVersion) {
1050
  STableKeyInfo tblInfo = {.uid = tbUid, .groupId = 0};
1051

1052
  STableScanInfo*     pTableScanInfo = pTableScanOp->info;
H
Haojun Liao 已提交
1053
  SQueryTableDataCond cond = pTableScanInfo->base.cond;
1054 1055 1056 1057 1058 1059 1060 1061 1062

  cond.startVersion = -1;
  cond.endVersion = maxVersion;
  cond.twindows = (STimeWindow){.skey = startTs, .ekey = endTs};

  SExecTaskInfo* pTaskInfo = pTableScanOp->pTaskInfo;

  SSDataBlock* pBlock = pTableScanInfo->pResBlock;
  STsdbReader* pReader = NULL;
L
Liu Jicong 已提交
1063
  int32_t      code = tsdbReaderOpen(pTableScanInfo->base.readHandle.vnode, &cond, &tblInfo, 1, pBlock,
D
dapan1121 已提交
1064
                                     (STsdbReader**)&pReader, GET_TASKID(pTaskInfo), false);
1065 1066
  if (code != TSDB_CODE_SUCCESS) {
    terrno = code;
dengyihao's avatar
dengyihao 已提交
1067
    T_LONG_JMP(pTaskInfo->env, code);
1068 1069 1070
    return NULL;
  }

D
dapan1121 已提交
1071 1072
  bool hasNext = false;
  code = tsdbNextDataBlock(pReader, &hasNext);
1073 1074
  if (code != TSDB_CODE_SUCCESS) {
    terrno = code;
dengyihao's avatar
dengyihao 已提交
1075
    T_LONG_JMP(pTaskInfo->env, code);
1076 1077 1078
    return NULL;
  }

D
dapan1121 已提交
1079
  if (hasNext) {
L
Liu Jicong 已提交
1080
    /*SSDataBlock* p = */ tsdbRetrieveDataBlock(pReader, NULL);
H
Haojun Liao 已提交
1081
    doSetTagColumnData(&pTableScanInfo->base, pBlock, pTaskInfo, pBlock->info.rows);
1082
    pBlock->info.id.groupId = getTableGroupId(pTableScanInfo->base.pTableListInfo, pBlock->info.id.uid);
1083 1084 1085
  }

  tsdbReaderClose(pReader);
D
dapan1121 已提交
1086
  qDebug("retrieve prev rows:%" PRId64 ", skey:%" PRId64 ", ekey:%" PRId64 " uid:%" PRIu64 ", max ver:%" PRId64
5
54liuyao 已提交
1087 1088
         ", suid:%" PRIu64,
         pBlock->info.rows, startTs, endTs, tbUid, maxVersion, cond.suid);
1089 1090

  return pBlock->info.rows > 0 ? pBlock : NULL;
1091 1092 1093 1094 1095 1096 1097 1098 1099 1100 1101
}

static uint64_t getGroupIdByCol(SStreamScanInfo* pInfo, uint64_t uid, TSKEY ts, int64_t maxVersion) {
  SSDataBlock* pPreRes = readPreVersionData(pInfo->pTableScanOp, uid, ts, ts, maxVersion);
  if (!pPreRes || pPreRes->info.rows == 0) {
    return 0;
  }
  ASSERT(pPreRes->info.rows == 1);
  return calGroupIdByData(&pInfo->partitionSup, pInfo->pPartScalarSup, pPreRes, 0);
}

5
54liuyao 已提交
1102
static uint64_t getGroupIdByUid(SStreamScanInfo* pInfo, uint64_t uid) {
1103
  STableScanInfo* pTableScanInfo = pInfo->pTableScanOp->info;
1104
  return getTableGroupId(pTableScanInfo->base.pTableListInfo, uid);
1105 1106
}

5
54liuyao 已提交
1107 1108 1109 1110 1111 1112 1113 1114
static uint64_t getGroupIdByData(SStreamScanInfo* pInfo, uint64_t uid, TSKEY ts, int64_t maxVersion) {
  if (pInfo->partitionSup.needCalc) {
    return getGroupIdByCol(pInfo, uid, ts, maxVersion);
  }

  return getGroupIdByUid(pInfo, uid);
}

L
Liu Jicong 已提交
1115
static bool prepareRangeScan(SStreamScanInfo* pInfo, SSDataBlock* pBlock, int32_t* pRowIndex) {
5
54liuyao 已提交
1116 1117 1118
  if (pBlock->info.rows == 0) {
    return false;
  }
L
Liu Jicong 已提交
1119 1120 1121 1122 1123 1124 1125 1126 1127 1128
  if ((*pRowIndex) == pBlock->info.rows) {
    return false;
  }

  ASSERT(taosArrayGetSize(pBlock->pDataBlock) >= 3);
  SColumnInfoData* pStartTsCol = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX);
  TSKEY*           startData = (TSKEY*)pStartTsCol->pData;
  SColumnInfoData* pEndTsCol = taosArrayGet(pBlock->pDataBlock, END_TS_COLUMN_INDEX);
  TSKEY*           endData = (TSKEY*)pEndTsCol->pData;
  STimeWindow      win = {.skey = startData[*pRowIndex], .ekey = endData[*pRowIndex]};
1129 1130 1131
  SColumnInfoData* pGpCol = taosArrayGet(pBlock->pDataBlock, GROUPID_COLUMN_INDEX);
  uint64_t*        gpData = (uint64_t*)pGpCol->pData;
  uint64_t         groupId = gpData[*pRowIndex];
1132 1133 1134 1135 1136 1137

  SColumnInfoData* pCalStartTsCol = taosArrayGet(pBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX);
  TSKEY*           calStartData = (TSKEY*)pCalStartTsCol->pData;
  SColumnInfoData* pCalEndTsCol = taosArrayGet(pBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX);
  TSKEY*           calEndData = (TSKEY*)pCalEndTsCol->pData;

L
Liu Jicong 已提交
1138
  setGroupId(pInfo, pBlock, GROUPID_COLUMN_INDEX, *pRowIndex);
1139 1140 1141 1142
  if (isSlidingWindow(pInfo)) {
    pInfo->updateWin.skey = calStartData[*pRowIndex];
    pInfo->updateWin.ekey = calEndData[*pRowIndex];
  }
L
Liu Jicong 已提交
1143 1144 1145
  (*pRowIndex)++;

  for (; *pRowIndex < pBlock->info.rows; (*pRowIndex)++) {
1146
    if (win.skey == startData[*pRowIndex] && groupId == gpData[*pRowIndex]) {
L
Liu Jicong 已提交
1147 1148 1149
      win.ekey = TMAX(win.ekey, endData[*pRowIndex]);
      continue;
    }
1150
    if (win.skey == endData[*pRowIndex] && groupId == gpData[*pRowIndex]) {
L
Liu Jicong 已提交
1151 1152 1153
      win.skey = TMIN(win.skey, startData[*pRowIndex]);
      continue;
    }
1154 1155
    ASSERT(!(win.skey > startData[*pRowIndex] && win.ekey < endData[*pRowIndex]) ||
           !(isInTimeWindow(&win, startData[*pRowIndex], 0) || isInTimeWindow(&win, endData[*pRowIndex], 0)));
L
Liu Jicong 已提交
1156 1157 1158
    break;
  }

L
fix bug  
liuyao 已提交
1159
  resetTableScanInfo(pInfo->pTableScanOp->info, &win, pInfo->pUpdateInfo->maxDataVersion);
1160
  pInfo->pTableScanOp->status = OP_OPENED;
L
Liu Jicong 已提交
1161 1162 1163
  return true;
}

5
54liuyao 已提交
1164
static STimeWindow getSlidingWindow(TSKEY* startTsCol, TSKEY* endTsCol, uint64_t* gpIdCol, SInterval* pInterval,
1165
                                    SDataBlockInfo* pDataBlockInfo, int32_t* pRowIndex, bool hasGroup) {
H
Haojun Liao 已提交
1166
  SResultRowInfo dumyInfo = {0};
5
54liuyao 已提交
1167
  dumyInfo.cur.pageId = -1;
1168
  STimeWindow win = getActiveTimeWindow(NULL, &dumyInfo, startTsCol[*pRowIndex], pInterval, TSDB_ORDER_ASC);
5
54liuyao 已提交
1169 1170
  STimeWindow endWin = win;
  STimeWindow preWin = win;
5
54liuyao 已提交
1171
  uint64_t    groupId = gpIdCol[*pRowIndex];
H
Haojun Liao 已提交
1172

5
54liuyao 已提交
1173
  while (1) {
1174 1175 1176
    if (hasGroup) {
      (*pRowIndex) += 1;
    } else {
5
54liuyao 已提交
1177
      while ((groupId == gpIdCol[(*pRowIndex)] && startTsCol[*pRowIndex] <= endWin.ekey)) {
5
54liuyao 已提交
1178 1179 1180 1181 1182
        (*pRowIndex) += 1;
        if ((*pRowIndex) == pDataBlockInfo->rows) {
          break;
        }
      }
1183
    }
5
54liuyao 已提交
1184

5
54liuyao 已提交
1185 1186 1187
    do {
      preWin = endWin;
      getNextTimeWindow(pInterval, &endWin, TSDB_ORDER_ASC);
1188
    } while (endTsCol[(*pRowIndex) - 1] >= endWin.skey);
5
54liuyao 已提交
1189
    endWin = preWin;
5
54liuyao 已提交
1190
    if (win.ekey == endWin.ekey || (*pRowIndex) == pDataBlockInfo->rows || groupId != gpIdCol[*pRowIndex]) {
5
54liuyao 已提交
1191 1192 1193 1194 1195 1196
      win.ekey = endWin.ekey;
      return win;
    }
    win.ekey = endWin.ekey;
  }
}
5
54liuyao 已提交
1197

L
Liu Jicong 已提交
1198
static SSDataBlock* doRangeScan(SStreamScanInfo* pInfo, SSDataBlock* pSDB, int32_t tsColIndex, int32_t* pRowIndex) {
L
liuyao 已提交
1199
  qInfo("do stream range scan. windows index:%d", *pRowIndex);
L
liuyao 已提交
1200
  bool prepareRes = true;
L
Liu Jicong 已提交
1201 1202 1203
  while (1) {
    SSDataBlock* pResult = NULL;
    pResult = doTableScan(pInfo->pTableScanOp);
L
liuyao 已提交
1204 1205
    if (!pResult) {
      prepareRes = prepareRangeScan(pInfo, pSDB, pRowIndex);
L
Liu Jicong 已提交
1206 1207 1208 1209
      // scan next window data
      pResult = doTableScan(pInfo->pTableScanOp);
    }
    if (!pResult) {
L
liuyao 已提交
1210 1211 1212
      if (prepareRes) {
        continue;
      }
L
Liu Jicong 已提交
1213 1214
      blockDataCleanup(pSDB);
      *pRowIndex = 0;
5
54liuyao 已提交
1215
      pInfo->updateWin = (STimeWindow){.skey = INT64_MIN, .ekey = INT64_MAX};
H
Hongze Cheng 已提交
1216
      STableScanInfo* pTableScanInfo = pInfo->pTableScanOp->info;
H
Haojun Liao 已提交
1217
      tsdbReaderClose(pTableScanInfo->base.dataReader);
D
dapan1121 已提交
1218
      qDebug("2");
H
Haojun Liao 已提交
1219
      pTableScanInfo->base.dataReader = NULL;
1220 1221
      return NULL;
    }
L
Liu Jicong 已提交
1222

H
Haojun Liao 已提交
1223
    doFilter(pResult, pInfo->pTableScanOp->exprSupp.pFilterInfo, NULL);
1224 1225 1226 1227
    if (pResult->info.rows == 0) {
      continue;
    }

1228 1229 1230 1231 1232 1233 1234 1235
    if (pInfo->partitionSup.needCalc) {
      SSDataBlock* tmpBlock = createOneDataBlock(pResult, true);
      blockDataCleanup(pResult);
      for (int32_t i = 0; i < tmpBlock->info.rows; i++) {
        if (calGroupIdByData(&pInfo->partitionSup, pInfo->pPartScalarSup, tmpBlock, i) == pInfo->groupId) {
          for (int32_t j = 0; j < pInfo->pTableScanOp->exprSupp.numOfExprs; j++) {
            SColumnInfoData* pSrcCol = taosArrayGet(tmpBlock->pDataBlock, j);
            SColumnInfoData* pDestCol = taosArrayGet(pResult->pDataBlock, j);
L
Liu Jicong 已提交
1236 1237
            bool             isNull = colDataIsNull(pSrcCol, tmpBlock->info.rows, i, NULL);
            char*            pSrcData = colDataGetData(pSrcCol, i);
1238
            colDataSetVal(pDestCol, pResult->info.rows, pSrcData, isNull);
1239 1240 1241 1242
          }
          pResult->info.rows++;
        }
      }
H
Haojun Liao 已提交
1243 1244 1245

      blockDataDestroy(tmpBlock);

1246 1247 1248 1249
      if (pResult->info.rows > 0) {
        pResult->info.calWin = pInfo->updateWin;
        return pResult;
      }
H
Haojun Liao 已提交
1250
    } else if (pResult->info.id.groupId == pInfo->groupId) {
5
54liuyao 已提交
1251
      pResult->info.calWin = pInfo->updateWin;
1252
      return pResult;
5
54liuyao 已提交
1253 1254
    }
  }
1255
}
1256

1257
static int32_t getPreSessionWindow(SStreamAggSupporter* pAggSup, TSKEY startTs, TSKEY endTs, uint64_t groupId,
X
Xiaoyu Wang 已提交
1258
                                   SSessionKey* pKey) {
1259 1260 1261
  pKey->win.skey = startTs;
  pKey->win.ekey = endTs;
  pKey->groupId = groupId;
X
Xiaoyu Wang 已提交
1262

1263 1264 1265 1266 1267
  SStreamStateCur* pCur = streamStateSessionSeekKeyCurrentPrev(pAggSup->pState, pKey);
  int32_t          code = streamStateSessionGetKVByCur(pCur, pKey, NULL, 0);
  if (code != TSDB_CODE_SUCCESS) {
    SET_SESSION_WIN_KEY_INVALID(pKey);
  }
D
dapan1121 已提交
1268 1269

  taosMemoryFree(pCur);
1270 1271 1272
  return code;
}

1273
static int32_t generateSessionScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSrcBlock, SSDataBlock* pDestBlock) {
5
54liuyao 已提交
1274
  blockDataCleanup(pDestBlock);
1275 1276
  if (pSrcBlock->info.rows == 0) {
    return TSDB_CODE_SUCCESS;
1277
  }
1278
  int32_t code = blockDataEnsureCapacity(pDestBlock, pSrcBlock->info.rows);
1279
  if (code != TSDB_CODE_SUCCESS) {
1280
    return code;
L
Liu Jicong 已提交
1281
  }
1282 1283
  ASSERT(taosArrayGetSize(pSrcBlock->pDataBlock) >= 3);
  SColumnInfoData* pStartTsCol = taosArrayGet(pSrcBlock->pDataBlock, START_TS_COLUMN_INDEX);
L
Liu Jicong 已提交
1284
  TSKEY*           startData = (TSKEY*)pStartTsCol->pData;
1285
  SColumnInfoData* pEndTsCol = taosArrayGet(pSrcBlock->pDataBlock, END_TS_COLUMN_INDEX);
L
Liu Jicong 已提交
1286
  TSKEY*           endData = (TSKEY*)pEndTsCol->pData;
1287 1288
  SColumnInfoData* pUidCol = taosArrayGet(pSrcBlock->pDataBlock, UID_COLUMN_INDEX);
  uint64_t*        uidCol = (uint64_t*)pUidCol->pData;
L
Liu Jicong 已提交
1289

1290 1291
  SColumnInfoData* pDestStartCol = taosArrayGet(pDestBlock->pDataBlock, START_TS_COLUMN_INDEX);
  SColumnInfoData* pDestEndCol = taosArrayGet(pDestBlock->pDataBlock, END_TS_COLUMN_INDEX);
5
54liuyao 已提交
1292
  SColumnInfoData* pDestUidCol = taosArrayGet(pDestBlock->pDataBlock, UID_COLUMN_INDEX);
1293
  SColumnInfoData* pDestGpCol = taosArrayGet(pDestBlock->pDataBlock, GROUPID_COLUMN_INDEX);
5
54liuyao 已提交
1294 1295
  SColumnInfoData* pDestCalStartTsCol = taosArrayGet(pDestBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX);
  SColumnInfoData* pDestCalEndTsCol = taosArrayGet(pDestBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX);
L
Liu Jicong 已提交
1296
  int64_t          version = pSrcBlock->info.version - 1;
1297
  for (int32_t i = 0; i < pSrcBlock->info.rows; i++) {
1298
    uint64_t groupId = getGroupIdByData(pInfo, uidCol[i], startData[i], version);
L
Liu Jicong 已提交
1299
    // gap must be 0.
5
54liuyao 已提交
1300
    SSessionKey startWin = {0};
1301
    getCurSessionWindow(pInfo->windowSup.pStreamAggSup, startData[i], startData[i], groupId, &startWin);
5
54liuyao 已提交
1302
    if (IS_INVALID_SESSION_WIN_KEY(startWin)) {
L
Liu Jicong 已提交
1303 1304 1305
      // window has been closed.
      continue;
    }
5
54liuyao 已提交
1306 1307
    SSessionKey endWin = {0};
    getCurSessionWindow(pInfo->windowSup.pStreamAggSup, endData[i], endData[i], groupId, &endWin);
X
Xiaoyu Wang 已提交
1308
    if (IS_INVALID_SESSION_WIN_KEY(endWin)) {
1309 1310 1311 1312
      getPreSessionWindow(pInfo->windowSup.pStreamAggSup, endData[i], endData[i], groupId, &endWin);
    }
    if (IS_INVALID_SESSION_WIN_KEY(startWin)) {
      // window has been closed.
X
Xiaoyu Wang 已提交
1313
      qError("generate session scan range failed. rang start:%" PRIx64 ", end:%" PRIx64, startData[i], endData[i]);
1314 1315
      continue;
    }
1316 1317
    colDataSetVal(pDestStartCol, i, (const char*)&startWin.win.skey, false);
    colDataSetVal(pDestEndCol, i, (const char*)&endWin.win.ekey, false);
5
54liuyao 已提交
1318

1319
    colDataSetNULL(pDestUidCol, i);
1320
    colDataSetVal(pDestGpCol, i, (const char*)&groupId, false);
1321 1322
    colDataSetNULL(pDestCalStartTsCol, i);
    colDataSetNULL(pDestCalEndTsCol, i);
1323
    pDestBlock->info.rows++;
L
Liu Jicong 已提交
1324
  }
1325
  return TSDB_CODE_SUCCESS;
L
Liu Jicong 已提交
1326
}
1327 1328 1329 1330 1331 1332

static int32_t generateIntervalScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSrcBlock, SSDataBlock* pDestBlock) {
  blockDataCleanup(pDestBlock);
  int32_t rows = pSrcBlock->info.rows;
  if (rows == 0) {
    return TSDB_CODE_SUCCESS;
1333
  }
1334

1335 1336
  SColumnInfoData* pSrcStartTsCol = (SColumnInfoData*)taosArrayGet(pSrcBlock->pDataBlock, START_TS_COLUMN_INDEX);
  SColumnInfoData* pSrcEndTsCol = (SColumnInfoData*)taosArrayGet(pSrcBlock->pDataBlock, END_TS_COLUMN_INDEX);
1337 1338
  SColumnInfoData* pSrcUidCol = taosArrayGet(pSrcBlock->pDataBlock, UID_COLUMN_INDEX);
  SColumnInfoData* pSrcGpCol = taosArrayGet(pSrcBlock->pDataBlock, GROUPID_COLUMN_INDEX);
5
54liuyao 已提交
1339

L
Liu Jicong 已提交
1340
  uint64_t* srcUidData = (uint64_t*)pSrcUidCol->pData;
1341
  ASSERT(pSrcStartTsCol->info.type == TSDB_DATA_TYPE_TIMESTAMP);
5
54liuyao 已提交
1342 1343 1344 1345 1346 1347 1348 1349 1350 1351 1352 1353 1354 1355 1356 1357 1358 1359 1360 1361 1362 1363 1364 1365 1366 1367 1368 1369 1370 1371 1372 1373 1374 1375 1376 1377
  TSKEY*  srcStartTsCol = (TSKEY*)pSrcStartTsCol->pData;
  TSKEY*  srcEndTsCol = (TSKEY*)pSrcEndTsCol->pData;
  int64_t version = pSrcBlock->info.version - 1;

  if (pInfo->partitionSup.needCalc && srcStartTsCol[0] != srcEndTsCol[0]) {
    uint64_t     srcUid = srcUidData[0];
    TSKEY        startTs = srcStartTsCol[0];
    TSKEY        endTs = srcEndTsCol[0];
    SSDataBlock* pPreRes = readPreVersionData(pInfo->pTableScanOp, srcUid, startTs, endTs, version);
    printDataBlock(pPreRes, "pre res");
    blockDataCleanup(pSrcBlock);
    int32_t code = blockDataEnsureCapacity(pSrcBlock, pPreRes->info.rows);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    SColumnInfoData* pTsCol = (SColumnInfoData*)taosArrayGet(pPreRes->pDataBlock, pInfo->primaryTsIndex);
    rows = pPreRes->info.rows;

    for (int32_t i = 0; i < rows; i++) {
      uint64_t groupId = calGroupIdByData(&pInfo->partitionSup, pInfo->pPartScalarSup, pPreRes, i);
      appendOneRowToStreamSpecialBlock(pSrcBlock, ((TSKEY*)pTsCol->pData) + i, ((TSKEY*)pTsCol->pData) + i, &srcUid,
                                       &groupId, NULL);
    }
    printDataBlock(pSrcBlock, "new delete");
  }
  uint64_t* srcGp = (uint64_t*)pSrcGpCol->pData;
  srcStartTsCol = (TSKEY*)pSrcStartTsCol->pData;
  srcEndTsCol = (TSKEY*)pSrcEndTsCol->pData;
  srcUidData = (uint64_t*)pSrcUidCol->pData;

  int32_t code = blockDataEnsureCapacity(pDestBlock, rows);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

1378 1379
  SColumnInfoData* pStartTsCol = taosArrayGet(pDestBlock->pDataBlock, START_TS_COLUMN_INDEX);
  SColumnInfoData* pEndTsCol = taosArrayGet(pDestBlock->pDataBlock, END_TS_COLUMN_INDEX);
1380
  SColumnInfoData* pDeUidCol = taosArrayGet(pDestBlock->pDataBlock, UID_COLUMN_INDEX);
1381 1382 1383
  SColumnInfoData* pGpCol = taosArrayGet(pDestBlock->pDataBlock, GROUPID_COLUMN_INDEX);
  SColumnInfoData* pCalStartTsCol = taosArrayGet(pDestBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX);
  SColumnInfoData* pCalEndTsCol = taosArrayGet(pDestBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX);
1384
  for (int32_t i = 0; i < rows;) {
1385
    uint64_t srcUid = srcUidData[i];
5
54liuyao 已提交
1386 1387 1388 1389 1390
    uint64_t groupId = srcGp[i];
    if (groupId == 0) {
      groupId = getGroupIdByData(pInfo, srcUid, srcStartTsCol[i], version);
    }
    TSKEY calStartTs = srcStartTsCol[i];
1391
    colDataSetVal(pCalStartTsCol, pDestBlock->info.rows, (const char*)(&calStartTs), false);
5
54liuyao 已提交
1392
    STimeWindow win = getSlidingWindow(srcStartTsCol, srcEndTsCol, srcGp, &pInfo->interval, &pSrcBlock->info, &i,
1393 1394
                                       pInfo->partitionSup.needCalc);
    TSKEY       calEndTs = srcStartTsCol[i - 1];
1395 1396 1397 1398 1399
    colDataSetVal(pCalEndTsCol, pDestBlock->info.rows, (const char*)(&calEndTs), false);
    colDataSetVal(pDeUidCol, pDestBlock->info.rows, (const char*)(&srcUid), false);
    colDataSetVal(pStartTsCol, pDestBlock->info.rows, (const char*)(&win.skey), false);
    colDataSetVal(pEndTsCol, pDestBlock->info.rows, (const char*)(&win.ekey), false);
    colDataSetVal(pGpCol, pDestBlock->info.rows, (const char*)(&groupId), false);
1400
    pDestBlock->info.rows++;
5
54liuyao 已提交
1401
  }
1402 1403
  return TSDB_CODE_SUCCESS;
}
1404

1405
static int32_t generateDeleteResultBlock(SStreamScanInfo* pInfo, SSDataBlock* pSrcBlock, SSDataBlock* pDestBlock) {
5
54liuyao 已提交
1406 1407 1408
  blockDataCleanup(pDestBlock);
  int32_t rows = pSrcBlock->info.rows;
  if (rows == 0) {
1409 1410
    return TSDB_CODE_SUCCESS;
  }
5
54liuyao 已提交
1411
  int32_t code = blockDataEnsureCapacity(pDestBlock, rows);
1412 1413 1414 1415
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

5
54liuyao 已提交
1416 1417 1418 1419 1420 1421 1422 1423 1424 1425
  SColumnInfoData* pSrcStartTsCol = (SColumnInfoData*)taosArrayGet(pSrcBlock->pDataBlock, START_TS_COLUMN_INDEX);
  SColumnInfoData* pSrcEndTsCol = (SColumnInfoData*)taosArrayGet(pSrcBlock->pDataBlock, END_TS_COLUMN_INDEX);
  SColumnInfoData* pSrcUidCol = taosArrayGet(pSrcBlock->pDataBlock, UID_COLUMN_INDEX);
  uint64_t*        srcUidData = (uint64_t*)pSrcUidCol->pData;
  SColumnInfoData* pSrcGpCol = taosArrayGet(pSrcBlock->pDataBlock, GROUPID_COLUMN_INDEX);
  uint64_t*        srcGp = (uint64_t*)pSrcGpCol->pData;
  ASSERT(pSrcStartTsCol->info.type == TSDB_DATA_TYPE_TIMESTAMP);
  TSKEY*  srcStartTsCol = (TSKEY*)pSrcStartTsCol->pData;
  TSKEY*  srcEndTsCol = (TSKEY*)pSrcEndTsCol->pData;
  int64_t version = pSrcBlock->info.version - 1;
1426
  for (int32_t i = 0; i < pSrcBlock->info.rows; i++) {
5
54liuyao 已提交
1427 1428
    uint64_t srcUid = srcUidData[i];
    uint64_t groupId = srcGp[i];
L
Liu Jicong 已提交
1429
    char*    tbname[VARSTR_HEADER_SIZE + TSDB_TABLE_NAME_LEN] = {0};
5
54liuyao 已提交
1430 1431 1432
    if (groupId == 0) {
      groupId = getGroupIdByData(pInfo, srcUid, srcStartTsCol[i], version);
    }
L
Liu Jicong 已提交
1433
    if (pInfo->tbnameCalSup.pExprInfo) {
1434 1435 1436
      void* parTbname = NULL;
      streamStateGetParName(pInfo->pStreamScanOp->pTaskInfo->streamInfo.pState, groupId, &parTbname);

L
Liu Jicong 已提交
1437 1438
      memcpy(varDataVal(tbname), parTbname, TSDB_TABLE_NAME_LEN);
      varDataSetLen(tbname, strlen(varDataVal(tbname)));
dengyihao's avatar
dengyihao 已提交
1439
      streamFreeVal(parTbname);
L
Liu Jicong 已提交
1440 1441 1442
    }
    appendOneRowToStreamSpecialBlock(pDestBlock, srcStartTsCol + i, srcEndTsCol + i, srcUidData + i, &groupId,
                                     tbname[0] == 0 ? NULL : tbname);
1443 1444 1445 1446
  }
  return TSDB_CODE_SUCCESS;
}

1447 1448 1449 1450
static int32_t generateScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSrcBlock, SSDataBlock* pDestBlock) {
  int32_t code = TSDB_CODE_SUCCESS;
  if (isIntervalWindow(pInfo)) {
    code = generateIntervalScanRange(pInfo, pSrcBlock, pDestBlock);
1451
  } else if (isSessionWindow(pInfo) || isStateWindow(pInfo)) {
1452
    code = generateSessionScanRange(pInfo, pSrcBlock, pDestBlock);
5
54liuyao 已提交
1453 1454
  } else {
    code = generateDeleteResultBlock(pInfo, pSrcBlock, pDestBlock);
1455
  }
1456
  pDestBlock->info.type = STREAM_CLEAR;
1457
  pDestBlock->info.version = pSrcBlock->info.version;
1458
  pDestBlock->info.dataLoad = 1;
1459 1460 1461 1462
  blockDataUpdateTsWindow(pDestBlock, 0);
  return code;
}

5
54liuyao 已提交
1463
static void calBlockTbName(SStreamScanInfo* pInfo, SSDataBlock* pBlock) {
1464
  SExprSupp*    pTbNameCalSup = &pInfo->tbnameCalSup;
5
54liuyao 已提交
1465 1466
  blockDataCleanup(pInfo->pCreateTbRes);
  if (pInfo->tbnameCalSup.numOfExprs == 0 && pInfo->tagCalSup.numOfExprs == 0) {
L
Liu Jicong 已提交
1467
    pBlock->info.parTbName[0] = 0;
L
Liu Jicong 已提交
1468
  } else {
5
54liuyao 已提交
1469 1470
    appendCreateTableRow(pInfo->pStreamScanOp->pTaskInfo->streamInfo.pState, &pInfo->tbnameCalSup, &pInfo->tagCalSup,
                         pBlock->info.id.groupId, pBlock, 0, pInfo->pCreateTbRes);
L
Liu Jicong 已提交
1471
  }
L
Liu Jicong 已提交
1472 1473
}

1474 1475
void appendOneRowToStreamSpecialBlock(SSDataBlock* pBlock, TSKEY* pStartTs, TSKEY* pEndTs, uint64_t* pUid,
                                      uint64_t* pGp, void* pTbName) {
1476 1477
  SColumnInfoData* pStartTsCol = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX);
  SColumnInfoData* pEndTsCol = taosArrayGet(pBlock->pDataBlock, END_TS_COLUMN_INDEX);
1478 1479
  SColumnInfoData* pUidCol = taosArrayGet(pBlock->pDataBlock, UID_COLUMN_INDEX);
  SColumnInfoData* pGpCol = taosArrayGet(pBlock->pDataBlock, GROUPID_COLUMN_INDEX);
1480 1481
  SColumnInfoData* pCalStartCol = taosArrayGet(pBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX);
  SColumnInfoData* pCalEndCol = taosArrayGet(pBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX);
1482
  SColumnInfoData* pTableCol = taosArrayGet(pBlock->pDataBlock, TABLE_NAME_COLUMN_INDEX);
1483 1484 1485 1486 1487 1488 1489
  colDataSetVal(pStartTsCol, pBlock->info.rows, (const char*)pStartTs, false);
  colDataSetVal(pEndTsCol, pBlock->info.rows, (const char*)pEndTs, false);
  colDataSetVal(pUidCol, pBlock->info.rows, (const char*)pUid, false);
  colDataSetVal(pGpCol, pBlock->info.rows, (const char*)pGp, false);
  colDataSetVal(pCalStartCol, pBlock->info.rows, (const char*)pStartTs, false);
  colDataSetVal(pCalEndCol, pBlock->info.rows, (const char*)pEndTs, false);
  colDataSetVal(pTableCol, pBlock->info.rows, (const char*)pTbName, pTbName == NULL);
1490
  pBlock->info.rows++;
5
54liuyao 已提交
1491 1492
}

1493
static void checkUpdateData(SStreamScanInfo* pInfo, bool invertible, SSDataBlock* pBlock, bool out) {
1494 1495
  if (out) {
    blockDataCleanup(pInfo->pUpdateDataRes);
5
54liuyao 已提交
1496
    blockDataEnsureCapacity(pInfo->pUpdateDataRes, pBlock->info.rows * 2);
1497
  }
1498 1499
  SColumnInfoData* pColDataInfo = taosArrayGet(pBlock->pDataBlock, pInfo->primaryTsIndex);
  ASSERT(pColDataInfo->info.type == TSDB_DATA_TYPE_TIMESTAMP);
5
54liuyao 已提交
1500
  TSKEY* tsCol = (TSKEY*)pColDataInfo->pData;
H
Haojun Liao 已提交
1501
  bool   tableInserted = updateInfoIsTableInserted(pInfo->pUpdateInfo, pBlock->info.id.uid);
1502
  for (int32_t rowId = 0; rowId < pBlock->info.rows; rowId++) {
5
54liuyao 已提交
1503 1504
    SResultRowInfo dumyInfo;
    dumyInfo.cur.pageId = -1;
L
Liu Jicong 已提交
1505
    bool        isClosed = false;
5
54liuyao 已提交
1506
    STimeWindow win = {.skey = INT64_MIN, .ekey = INT64_MAX};
X
Xiaoyu Wang 已提交
1507
    bool        overDue = isOverdue(tsCol[rowId], &pInfo->twAggSup);
1508 1509 1510 1511 1512
    if (pInfo->igExpired && overDue) {
      continue;
    }

    if (tableInserted && overDue) {
5
54liuyao 已提交
1513 1514 1515
      win = getActiveTimeWindow(NULL, &dumyInfo, tsCol[rowId], &pInfo->interval, TSDB_ORDER_ASC);
      isClosed = isCloseWindow(&win, &pInfo->twAggSup);
    }
5
54liuyao 已提交
1516
    // must check update info first.
H
Haojun Liao 已提交
1517
    bool update = updateInfoIsUpdated(pInfo->pUpdateInfo, pBlock->info.id.uid, tsCol[rowId]);
L
Liu Jicong 已提交
1518
    bool closedWin = isClosed && isSignleIntervalWindow(pInfo) &&
H
Haojun Liao 已提交
1519
                     isDeletedStreamWindow(&win, pBlock->info.id.groupId,
L
liuyao 已提交
1520
                                           pInfo->pState, &pInfo->twAggSup);
L
Liu Jicong 已提交
1521
    if ((update || closedWin) && out) {
L
Liu Jicong 已提交
1522
      qDebug("stream update check not pass, update %d, closedWin %d", update, closedWin);
5
54liuyao 已提交
1523
      uint64_t gpId = 0;
H
Haojun Liao 已提交
1524
      appendOneRowToStreamSpecialBlock(pInfo->pUpdateDataRes, tsCol + rowId, tsCol + rowId, &pBlock->info.id.uid, &gpId,
1525
                                       NULL);
5
54liuyao 已提交
1526 1527
      if (closedWin && pInfo->partitionSup.needCalc) {
        gpId = calGroupIdByData(&pInfo->partitionSup, pInfo->pPartScalarSup, pBlock, rowId);
S
slzhou 已提交
1528 1529
        appendOneRowToStreamSpecialBlock(pInfo->pUpdateDataRes, tsCol + rowId, tsCol + rowId, &pBlock->info.id.uid,
                                         &gpId, NULL);
5
54liuyao 已提交
1530
      }
1531 1532
    }
  }
1533 1534
  if (out && pInfo->pUpdateDataRes->info.rows > 0) {
    pInfo->pUpdateDataRes->info.version = pBlock->info.version;
1535
    pInfo->pUpdateDataRes->info.dataLoad = 1;
1536
    blockDataUpdateTsWindow(pInfo->pUpdateDataRes, 0);
1537
    pInfo->pUpdateDataRes->info.type = pInfo->partitionSup.needCalc ? STREAM_DELETE_DATA : STREAM_CLEAR;
5
54liuyao 已提交
1538 1539
  }
}
L
Liu Jicong 已提交
1540

1541
static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock, bool filter) {
L
Liu Jicong 已提交
1542 1543
  SDataBlockInfo* pBlockInfo = &pInfo->pRes->info;
  SOperatorInfo*  pOperator = pInfo->pStreamScanOp;
L
Liu Jicong 已提交
1544
  SExecTaskInfo*  pTaskInfo = pOperator->pTaskInfo;
L
Liu Jicong 已提交
1545

1546 1547
  blockDataEnsureCapacity(pInfo->pRes, pBlock->info.rows);

L
Liu Jicong 已提交
1548
  pInfo->pRes->info.rows = pBlock->info.rows;
H
Haojun Liao 已提交
1549
  pInfo->pRes->info.id.uid = pBlock->info.id.uid;
L
Liu Jicong 已提交
1550
  pInfo->pRes->info.type = STREAM_NORMAL;
1551
  pInfo->pRes->info.version = pBlock->info.version;
L
Liu Jicong 已提交
1552

1553
  STableScanInfo* pTableScanInfo = pInfo->pTableScanOp->info;
1554
  pInfo->pRes->info.id.groupId = getTableGroupId(pTableScanInfo->base.pTableListInfo, pBlock->info.id.uid);
L
Liu Jicong 已提交
1555 1556

  // todo extract method
H
Haojun Liao 已提交
1557 1558 1559
  for (int32_t i = 0; i < taosArrayGetSize(pInfo->matchInfo.pList); ++i) {
    SColMatchItem* pColMatchInfo = taosArrayGet(pInfo->matchInfo.pList, i);
    if (!pColMatchInfo->needOutput) {
L
Liu Jicong 已提交
1560 1561 1562 1563 1564 1565 1566
      continue;
    }

    bool colExists = false;
    for (int32_t j = 0; j < blockDataGetNumOfCols(pBlock); ++j) {
      SColumnInfoData* pResCol = bdGetColumnInfoData(pBlock, j);
      if (pResCol->info.colId == pColMatchInfo->colId) {
H
Haojun Liao 已提交
1567
        SColumnInfoData* pDst = taosArrayGet(pInfo->pRes->pDataBlock, pColMatchInfo->dstSlotId);
1568
        colDataAssign(pDst, pResCol, pBlock->info.rows, &pInfo->pRes->info);
L
Liu Jicong 已提交
1569 1570 1571 1572 1573 1574 1575
        colExists = true;
        break;
      }
    }

    // the required column does not exists in submit block, let's set it to be all null value
    if (!colExists) {
H
Haojun Liao 已提交
1576
      SColumnInfoData* pDst = taosArrayGet(pInfo->pRes->pDataBlock, pColMatchInfo->dstSlotId);
1577
      colDataSetNNULL(pDst, 0, pBlockInfo->rows);
L
Liu Jicong 已提交
1578 1579 1580 1581 1582
    }
  }

  // currently only the tbname pseudo column
  if (pInfo->numOfPseudoExpr > 0) {
L
Liu Jicong 已提交
1583
    int32_t code = addTagPseudoColumnData(&pInfo->readHandle, pInfo->pPseudoExpr, pInfo->numOfPseudoExpr, pInfo->pRes,
1584
                                          pInfo->pRes->info.rows, GET_TASKID(pTaskInfo), NULL);
K
kailixu 已提交
1585 1586
    // ignore the table not exists error, since this table may have been dropped during the scan procedure.
    if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_PAR_TABLE_NOT_EXIST) {
L
Liu Jicong 已提交
1587
      blockDataFreeRes((SSDataBlock*)pBlock);
1588
      T_LONG_JMP(pTaskInfo->env, code);
H
Haojun Liao 已提交
1589
    }
K
kailixu 已提交
1590 1591 1592

    // reset the error code.
    terrno = 0;
L
Liu Jicong 已提交
1593 1594
  }

1595
  if (filter) {
H
Haojun Liao 已提交
1596
    doFilter(pInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL);
1597
  }
1598

1599
  pInfo->pRes->info.dataLoad = 1;
L
Liu Jicong 已提交
1600
  blockDataUpdateTsWindow(pInfo->pRes, pInfo->primaryTsIndex);
L
Liu Jicong 已提交
1601
  blockDataFreeRes((SSDataBlock*)pBlock);
L
Liu Jicong 已提交
1602

L
Liu Jicong 已提交
1603
  calBlockTbName(pInfo, pInfo->pRes);
L
Liu Jicong 已提交
1604 1605
  return 0;
}
5
54liuyao 已提交
1606

L
Liu Jicong 已提交
1607
static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) {
1608 1609
  SExecTaskInfo*   pTaskInfo = pOperator->pTaskInfo;
  SStreamScanInfo* pInfo = pOperator->info;
X
Xiaoyu Wang 已提交
1610
  const char*      id = GET_TASKID(pTaskInfo);
H
Haojun Liao 已提交
1611

1612
  qDebug("start to exec queue scan, %s", id);
L
Liu Jicong 已提交
1613

L
Liu Jicong 已提交
1614
  if (pTaskInfo->streamInfo.submit.msgStr != NULL) {
L
Liu Jicong 已提交
1615
    if (pInfo->tqReader->msg2.msgStr == NULL) {
L
Liu Jicong 已提交
1616
      SPackedData submit = pTaskInfo->streamInfo.submit;
1617
      if (tqReaderSetSubmitMsg(pInfo->tqReader, submit.msgStr, submit.msgLen, submit.ver) < 0) {
L
Liu Jicong 已提交
1618
        qError("submit msg messed up when initing stream submit block %p", submit.msgStr);
1619
        return NULL;
L
Liu Jicong 已提交
1620 1621 1622 1623 1624 1625
      }
    }

    blockDataCleanup(pInfo->pRes);
    SDataBlockInfo* pBlockInfo = &pInfo->pRes->info;

1626
    while (tqNextDataBlock(pInfo->tqReader)) {
L
Liu Jicong 已提交
1627 1628
      SSDataBlock block = {0};

1629
      int32_t code = tqRetrieveDataBlock2(&block, pInfo->tqReader, NULL);
L
Liu Jicong 已提交
1630 1631 1632 1633
      if (code != TSDB_CODE_SUCCESS || block.info.rows == 0) {
        continue;
      }

1634
      setBlockIntoRes(pInfo, &block, true);
L
Liu Jicong 已提交
1635 1636 1637 1638 1639 1640

      if (pBlockInfo->rows > 0) {
        return pInfo->pRes;
      }
    }

L
Liu Jicong 已提交
1641 1642
    pInfo->tqReader->msg2 = (SPackedData){0};
    pTaskInfo->streamInfo.submit = (SPackedData){0};
L
Liu Jicong 已提交
1643
    return NULL;
L
Liu Jicong 已提交
1644 1645
  }

1646
  if (pTaskInfo->streamInfo.currentOffset.type == TMQ_OFFSET__SNAPSHOT_DATA) {
L
Liu Jicong 已提交
1647 1648
    SSDataBlock* pResult = doTableScan(pInfo->pTableScanOp);
    if (pResult && pResult->info.rows > 0) {
X
Xiaoyu Wang 已提交
1649
      qDebug("queue scan tsdb return %" PRId64 " rows min:%" PRId64 " max:%" PRId64 " wal curVersion:%" PRId64,
dengyihao's avatar
dengyihao 已提交
1650
             pResult->info.rows, pResult->info.window.skey, pResult->info.window.ekey,
X
Xiaoyu Wang 已提交
1651
             pInfo->tqReader->pWalReader->curVersion);
1652
      tqOffsetResetToData(&pTaskInfo->streamInfo.currentOffset, pResult->info.id.uid, pResult->info.window.ekey);
L
Liu Jicong 已提交
1653
      return pResult;
1654
    }
1655 1656 1657 1658 1659 1660
    STableScanInfo* pTSInfo = pInfo->pTableScanOp->info;
    tsdbReaderClose(pTSInfo->base.dataReader);
    pTSInfo->base.dataReader = NULL;
    qDebug("queue scan tsdb over, switch to wal ver %" PRId64 "", pTaskInfo->streamInfo.snapshotVer + 1);
    if (tqSeekVer(pInfo->tqReader, pTaskInfo->streamInfo.snapshotVer + 1, pTaskInfo->id.str) < 0) {
      return NULL;
1661
    }
wmmhello's avatar
wmmhello 已提交
1662
    tqOffsetResetToLog(&pTaskInfo->streamInfo.currentOffset, pTaskInfo->streamInfo.snapshotVer);
1663 1664
  }

1665
  if (pTaskInfo->streamInfo.currentOffset.type == TMQ_OFFSET__LOG) {
L
Liu Jicong 已提交
1666 1667
    while (1) {
      SFetchRet ret = {0};
1668
      tqNextBlock(pInfo->tqReader, &ret);
X
Xiaoyu Wang 已提交
1669 1670 1671
      tqOffsetResetToLog(
          &pTaskInfo->streamInfo.currentOffset,
          pInfo->tqReader->pWalReader->curVersion - 1);  // curVersion move to next, so currentOffset = curVersion - 1
1672

L
Liu Jicong 已提交
1673
      if (ret.fetchType == FETCH_TYPE__DATA) {
X
Xiaoyu Wang 已提交
1674 1675
        qDebug("doQueueScan get data from log %" PRId64 " rows, version:%" PRId64, ret.data.info.rows,
               pTaskInfo->streamInfo.currentOffset.version);
L
Liu Jicong 已提交
1676
        blockDataCleanup(pInfo->pRes);
1677
        setBlockIntoRes(pInfo, &ret.data, true);
L
Liu Jicong 已提交
1678
        if (pInfo->pRes->info.rows > 0) {
X
Xiaoyu Wang 已提交
1679 1680
          qDebug("doQueueScan get data from log %" PRId64 " rows, return, version:%" PRId64, pInfo->pRes->info.rows,
                 pTaskInfo->streamInfo.currentOffset.version);
L
Liu Jicong 已提交
1681 1682
          return pInfo->pRes;
        }
X
Xiaoyu Wang 已提交
1683
      } else if (ret.fetchType == FETCH_TYPE__NONE) {
wmmhello's avatar
wmmhello 已提交
1684
        qDebug("doQueueScan get none from log, return, version:%" PRId64, pTaskInfo->streamInfo.currentOffset.version);
L
Liu Jicong 已提交
1685 1686 1687
        return NULL;
      }
    }
L
Liu Jicong 已提交
1688
  } else {
1689
    qError("unexpected streamInfo prepare type: %d", pTaskInfo->streamInfo.currentOffset.type);
L
Liu Jicong 已提交
1690
    return NULL;
H
Haojun Liao 已提交
1691
  }
L
Liu Jicong 已提交
1692 1693
}

L
Liu Jicong 已提交
1694 1695 1696 1697 1698 1699 1700 1701 1702 1703 1704 1705 1706 1707 1708 1709 1710 1711
static int32_t filterDelBlockByUid(SSDataBlock* pDst, const SSDataBlock* pSrc, SStreamScanInfo* pInfo) {
  STqReader* pReader = pInfo->tqReader;
  int32_t    rows = pSrc->info.rows;
  blockDataEnsureCapacity(pDst, rows);

  SColumnInfoData* pSrcStartCol = taosArrayGet(pSrc->pDataBlock, START_TS_COLUMN_INDEX);
  uint64_t*        startCol = (uint64_t*)pSrcStartCol->pData;
  SColumnInfoData* pSrcEndCol = taosArrayGet(pSrc->pDataBlock, END_TS_COLUMN_INDEX);
  uint64_t*        endCol = (uint64_t*)pSrcEndCol->pData;
  SColumnInfoData* pSrcUidCol = taosArrayGet(pSrc->pDataBlock, UID_COLUMN_INDEX);
  uint64_t*        uidCol = (uint64_t*)pSrcUidCol->pData;

  SColumnInfoData* pDstStartCol = taosArrayGet(pDst->pDataBlock, START_TS_COLUMN_INDEX);
  SColumnInfoData* pDstEndCol = taosArrayGet(pDst->pDataBlock, END_TS_COLUMN_INDEX);
  SColumnInfoData* pDstUidCol = taosArrayGet(pDst->pDataBlock, UID_COLUMN_INDEX);
  int32_t          j = 0;
  for (int32_t i = 0; i < rows; i++) {
    if (taosHashGet(pReader->tbIdHash, &uidCol[i], sizeof(uint64_t))) {
1712 1713 1714
      colDataSetVal(pDstStartCol, j, (const char*)&startCol[i], false);
      colDataSetVal(pDstEndCol, j, (const char*)&endCol[i], false);
      colDataSetVal(pDstUidCol, j, (const char*)&uidCol[i], false);
L
Liu Jicong 已提交
1715

1716 1717 1718
      colDataSetNULL(taosArrayGet(pDst->pDataBlock, GROUPID_COLUMN_INDEX), j);
      colDataSetNULL(taosArrayGet(pDst->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX), j);
      colDataSetNULL(taosArrayGet(pDst->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX), j);
L
Liu Jicong 已提交
1719 1720 1721
      j++;
    }
  }
L
Liu Jicong 已提交
1722
  uint32_t cap = pDst->info.capacity;
L
Liu Jicong 已提交
1723 1724
  pDst->info = pSrc->info;
  pDst->info.rows = j;
L
Liu Jicong 已提交
1725
  pDst->info.capacity = cap;
L
Liu Jicong 已提交
1726 1727 1728 1729

  return 0;
}

5
54liuyao 已提交
1730 1731 1732 1733 1734 1735 1736 1737 1738 1739 1740 1741
// for partition by tag
static void setBlockGroupIdByUid(SStreamScanInfo* pInfo, SSDataBlock* pBlock) {
  SColumnInfoData* pStartTsCol = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX);
  TSKEY*           startTsCol = (TSKEY*)pStartTsCol->pData;
  SColumnInfoData* pGpCol = taosArrayGet(pBlock->pDataBlock, GROUPID_COLUMN_INDEX);
  uint64_t*        gpCol = (uint64_t*)pGpCol->pData;
  SColumnInfoData* pUidCol = taosArrayGet(pBlock->pDataBlock, UID_COLUMN_INDEX);
  uint64_t*        uidCol = (uint64_t*)pUidCol->pData;
  int32_t          rows = pBlock->info.rows;
  if (!pInfo->partitionSup.needCalc) {
    for (int32_t i = 0; i < rows; i++) {
      uint64_t groupId = getGroupIdByUid(pInfo, uidCol[i]);
1742
      colDataSetVal(pGpCol, i, (const char*)&groupId, false);
5
54liuyao 已提交
1743 1744 1745 1746
    }
  }
}

5
54liuyao 已提交
1747
static void doCheckUpdate(SStreamScanInfo* pInfo, TSKEY endKey, SSDataBlock* pBlock) {
L
liuyao 已提交
1748
  if (!pInfo->igCheckUpdate && pInfo->pUpdateInfo) {
L
fix bug  
liuyao 已提交
1749
    pInfo->pUpdateInfo->maxDataVersion = TMAX(pInfo->pUpdateInfo->maxDataVersion, pBlock->info.version);
5
54liuyao 已提交
1750
    checkUpdateData(pInfo, true, pBlock, true);
5
54liuyao 已提交
1751 1752 1753 1754 1755 1756 1757 1758 1759 1760 1761
    pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, endKey);
    if (pInfo->pUpdateDataRes->info.rows > 0) {
      pInfo->updateResIndex = 0;
      if (pInfo->pUpdateDataRes->info.type == STREAM_CLEAR) {
        pInfo->scanMode = STREAM_SCAN_FROM_UPDATERES;
      } else if (pInfo->pUpdateDataRes->info.type == STREAM_INVERT) {
        pInfo->scanMode = STREAM_SCAN_FROM_RES;
        // return pInfo->pUpdateDataRes;
      } else if (pInfo->pUpdateDataRes->info.type == STREAM_DELETE_DATA) {
        pInfo->scanMode = STREAM_SCAN_FROM_DELETE_DATA;
      }
5
54liuyao 已提交
1762 1763 1764 1765
    }
  }
}

L
liuyao 已提交
1766 1767 1768 1769 1770 1771 1772 1773 1774
int32_t streamScanOperatorEncode(SStreamScanInfo* pInfo, void** pBuff) {
  int32_t len = updateInfoSerialize(NULL, 0, pInfo->pUpdateInfo);
  *pBuff = taosMemoryCalloc(1, len);
  updateInfoSerialize(*pBuff, len, pInfo->pUpdateInfo);
  return len;
}

// other properties are recovered from the execution plan
void streamScanOperatorDeocde(void* pBuff, int32_t len, SStreamScanInfo* pInfo) {
L
fix bug  
liuyao 已提交
1775
  if (!pBuff || len == 0) {
L
liuyao 已提交
1776 1777 1778
    return;
  }

1779 1780
  SUpdateInfo* pUpInfo = updateInfoInit(0, TSDB_TIME_PRECISION_MILLI, 0);
  int32_t      code = updateInfoDeserialize(pBuff, len, pUpInfo);
L
liuyao 已提交
1781 1782 1783 1784 1785
  if (code == TSDB_CODE_SUCCESS) {
    pInfo->pUpdateInfo = pUpInfo;
  }
}

L
Liu Jicong 已提交
1786 1787 1788 1789 1790
static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
  // NOTE: this operator does never check if current status is done or not
  SExecTaskInfo*   pTaskInfo = pOperator->pTaskInfo;
  SStreamScanInfo* pInfo = pOperator->info;

L
Liu Jicong 已提交
1791
  qDebug("stream scan called");
H
Haojun Liao 已提交
1792

1793 1794
  if (pTaskInfo->streamInfo.recoverStep == STREAM_RECOVER_STEP__PREPARE1 ||
      pTaskInfo->streamInfo.recoverStep == STREAM_RECOVER_STEP__PREPARE2) {
L
Liu Jicong 已提交
1795
    STableScanInfo* pTSInfo = pInfo->pTableScanOp->info;
H
Haojun Liao 已提交
1796
    memcpy(&pTSInfo->base.cond, &pTaskInfo->streamInfo.tableCond, sizeof(SQueryTableDataCond));
1797
    if (pTaskInfo->streamInfo.recoverStep == STREAM_RECOVER_STEP__PREPARE1) {
H
Haojun Liao 已提交
1798 1799 1800 1801
      pTSInfo->base.cond.startVersion = 0;
      pTSInfo->base.cond.endVersion = pTaskInfo->streamInfo.fillHistoryVer1;
      qDebug("stream recover step 1, from %" PRId64 " to %" PRId64, pTSInfo->base.cond.startVersion,
             pTSInfo->base.cond.endVersion);
5
54liuyao 已提交
1802
      pTaskInfo->streamInfo.recoverStep = STREAM_RECOVER_STEP__SCAN1;
1803
    } else {
H
Haojun Liao 已提交
1804 1805 1806 1807
      pTSInfo->base.cond.startVersion = pTaskInfo->streamInfo.fillHistoryVer1 + 1;
      pTSInfo->base.cond.endVersion = pTaskInfo->streamInfo.fillHistoryVer2;
      qDebug("stream recover step 2, from %" PRId64 " to %" PRId64, pTSInfo->base.cond.startVersion,
             pTSInfo->base.cond.endVersion);
5
54liuyao 已提交
1808
      pTaskInfo->streamInfo.recoverStep = STREAM_RECOVER_STEP__SCAN2;
1809
    }
L
Liu Jicong 已提交
1810

H
Haojun Liao 已提交
1811
    tsdbReaderClose(pTSInfo->base.dataReader);
D
dapan1121 已提交
1812

H
Haojun Liao 已提交
1813
    pTSInfo->base.dataReader = NULL;
L
Liu Jicong 已提交
1814
    pInfo->pTableScanOp->status = OP_OPENED;
L
Liu Jicong 已提交
1815

L
Liu Jicong 已提交
1816 1817
    pTSInfo->scanTimes = 0;
    pTSInfo->currentGroupId = -1;
L
Liu Jicong 已提交
1818
    pTaskInfo->streamInfo.recoverScanFinished = false;
L
Liu Jicong 已提交
1819 1820
  }

5
54liuyao 已提交
1821 1822
  if (pTaskInfo->streamInfo.recoverStep == STREAM_RECOVER_STEP__SCAN1 ||
      pTaskInfo->streamInfo.recoverStep == STREAM_RECOVER_STEP__SCAN2) {
L
Liu Jicong 已提交
1823 1824 1825 1826 1827
    if (pInfo->blockRecoverContiCnt > 100) {
      pInfo->blockRecoverTotCnt += pInfo->blockRecoverContiCnt;
      pInfo->blockRecoverContiCnt = 0;
      return NULL;
    }
5
54liuyao 已提交
1828 1829 1830 1831 1832 1833 1834

    switch (pInfo->scanMode) {
      case STREAM_SCAN_FROM_RES: {
        pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
        printDataBlock(pInfo->pRecoverRes, "scan recover");
        return pInfo->pRecoverRes;
      } break;
5
54liuyao 已提交
1835 1836 1837 1838
      case STREAM_SCAN_FROM_UPDATERES: {
        generateScanRange(pInfo, pInfo->pUpdateDataRes, pInfo->pUpdateRes);
        prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex);
        pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RANGE;
1839
        printDataBlock(pInfo->pUpdateRes, "recover update");
5
54liuyao 已提交
1840 1841
        return pInfo->pUpdateRes;
      } break;
1842 1843 1844 1845 1846 1847 1848 1849 1850
      case STREAM_SCAN_FROM_DELETE_DATA: {
        generateScanRange(pInfo, pInfo->pUpdateDataRes, pInfo->pUpdateRes);
        prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex);
        pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RANGE;
        copyDataBlock(pInfo->pDeleteDataRes, pInfo->pUpdateRes);
        pInfo->pDeleteDataRes->info.type = STREAM_DELETE_DATA;
        printDataBlock(pInfo->pDeleteDataRes, "recover delete");
        return pInfo->pDeleteDataRes;
      } break;
5
54liuyao 已提交
1851 1852 1853 1854 1855 1856
      case STREAM_SCAN_FROM_DATAREADER_RANGE: {
        SSDataBlock* pSDB = doRangeScan(pInfo, pInfo->pUpdateRes, pInfo->primaryTsIndex, &pInfo->updateResIndex);
        if (pSDB) {
          STableScanInfo* pTableScanInfo = pInfo->pTableScanOp->info;
          pSDB->info.type = pInfo->scanMode == STREAM_SCAN_FROM_DATAREADER_RANGE ? STREAM_NORMAL : STREAM_PULL_DATA;
          checkUpdateData(pInfo, true, pSDB, false);
1857
          printDataBlock(pSDB, "scan recover update");
5
54liuyao 已提交
1858 1859 1860 1861 1862 1863
          calBlockTbName(pInfo, pSDB);
          return pSDB;
        }
        blockDataCleanup(pInfo->pUpdateDataRes);
        pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
      } break;
5
54liuyao 已提交
1864 1865 1866 1867 1868 1869
      default:
        break;
    }

    pInfo->pRecoverRes = doTableScan(pInfo->pTableScanOp);
    if (pInfo->pRecoverRes != NULL) {
L
Liu Jicong 已提交
1870
      pInfo->blockRecoverContiCnt++;
5
54liuyao 已提交
1871
      calBlockTbName(pInfo, pInfo->pRecoverRes);
1872
      if (pInfo->pUpdateInfo) {
5
54liuyao 已提交
1873 1874 1875 1876 1877 1878
        if (pTaskInfo->streamInfo.recoverStep == STREAM_RECOVER_STEP__SCAN1) {
          TSKEY maxTs = updateInfoFillBlockData(pInfo->pUpdateInfo, pInfo->pRecoverRes, pInfo->primaryTsIndex);
          pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, maxTs);
        } else {
          doCheckUpdate(pInfo, pInfo->pRecoverRes->info.window.ekey, pInfo->pRecoverRes);
        }
1879
      }
5
54liuyao 已提交
1880 1881
      if (pInfo->pCreateTbRes->info.rows > 0) {
        pInfo->scanMode = STREAM_SCAN_FROM_RES;
1882
        printDataBlock(pInfo->pCreateTbRes, "recover createTbl");
5
54liuyao 已提交
1883 1884
        return pInfo->pCreateTbRes;
      }
X
Xiaoyu Wang 已提交
1885
      qDebug("stream recover scan get block, rows %" PRId64, pInfo->pRecoverRes->info.rows);
5
54liuyao 已提交
1886 1887
      printDataBlock(pInfo->pRecoverRes, "scan recover");
      return pInfo->pRecoverRes;
L
Liu Jicong 已提交
1888 1889
    }
    pTaskInfo->streamInfo.recoverStep = STREAM_RECOVER_STEP__NONE;
L
Liu Jicong 已提交
1890
    STableScanInfo* pTSInfo = pInfo->pTableScanOp->info;
H
Haojun Liao 已提交
1891
    tsdbReaderClose(pTSInfo->base.dataReader);
D
dapan1121 已提交
1892

H
Haojun Liao 已提交
1893
    pTSInfo->base.dataReader = NULL;
1894

H
Haojun Liao 已提交
1895 1896
    pTSInfo->base.cond.startVersion = -1;
    pTSInfo->base.cond.endVersion = -1;
L
Liu Jicong 已提交
1897

L
Liu Jicong 已提交
1898
    pTaskInfo->streamInfo.recoverScanFinished = true;
L
Liu Jicong 已提交
1899 1900 1901
    return NULL;
  }

5
54liuyao 已提交
1902
  size_t total = taosArrayGetSize(pInfo->pBlockLists);
5
54liuyao 已提交
1903
// TODO: refactor
L
Liu Jicong 已提交
1904
FETCH_NEXT_BLOCK:
L
Liu Jicong 已提交
1905
  if (pInfo->blockType == STREAM_INPUT__DATA_BLOCK) {
H
Haojun Liao 已提交
1906
    if (pInfo->validBlockIndex >= total) {
L
Liu Jicong 已提交
1907
      doClearBufferedBlocks(pInfo);
L
Liu Jicong 已提交
1908
      /*pOperator->status = OP_EXEC_DONE;*/
H
Haojun Liao 已提交
1909 1910 1911
      return NULL;
    }

1912
    int32_t      current = pInfo->validBlockIndex++;
L
Liu Jicong 已提交
1913 1914
    SPackedData* pPacked = taosArrayGet(pInfo->pBlockLists, current);
    SSDataBlock* pBlock = pPacked->pDataBlock;
5
54liuyao 已提交
1915
    if (pBlock->info.parTbName[0]) {
H
Haojun Liao 已提交
1916
      streamStatePutParName(pTaskInfo->streamInfo.pState, pBlock->info.id.groupId, pBlock->info.parTbName);
1917
    }
1918

1919
    // TODO move into scan
5
54liuyao 已提交
1920 1921
    pBlock->info.calWin.skey = INT64_MIN;
    pBlock->info.calWin.ekey = INT64_MAX;
1922
    pBlock->info.dataLoad = 1;
L
fix bug  
liuyao 已提交
1923 1924 1925
    if (pInfo->pUpdateInfo) {
      pInfo->pUpdateInfo->maxDataVersion = TMAX(pInfo->pUpdateInfo->maxDataVersion, pBlock->info.version);
    }
1926
    blockDataUpdateTsWindow(pBlock, 0);
1927
    switch (pBlock->info.type) {
L
Liu Jicong 已提交
1928 1929 1930
      case STREAM_NORMAL:
      case STREAM_GET_ALL:
        return pBlock;
1931 1932 1933
      case STREAM_RETRIEVE: {
        pInfo->blockType = STREAM_INPUT__DATA_SUBMIT;
        pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RETRIEVE;
1934
        copyDataBlock(pInfo->pUpdateRes, pBlock);
L
liuyao 已提交
1935
        pInfo->updateResIndex = 0;
1936
        prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex);
1937 1938 1939
        updateInfoAddCloseWindowSBF(pInfo->pUpdateInfo);
      } break;
      case STREAM_DELETE_DATA: {
1940
        printDataBlock(pBlock, "stream scan delete recv");
L
Liu Jicong 已提交
1941
        SSDataBlock* pDelBlock = NULL;
L
Liu Jicong 已提交
1942
        if (pInfo->tqReader) {
L
Liu Jicong 已提交
1943
          pDelBlock = createSpecialDataBlock(STREAM_DELETE_DATA);
L
Liu Jicong 已提交
1944
          filterDelBlockByUid(pDelBlock, pBlock, pInfo);
L
Liu Jicong 已提交
1945 1946
        } else {
          pDelBlock = pBlock;
L
Liu Jicong 已提交
1947
        }
5
54liuyao 已提交
1948 1949
        setBlockGroupIdByUid(pInfo, pDelBlock);
        printDataBlock(pDelBlock, "stream scan delete recv filtered");
5
54liuyao 已提交
1950 1951 1952 1953 1954 1955
        if (pDelBlock->info.rows == 0) {
          if (pInfo->tqReader) {
            blockDataDestroy(pDelBlock);
          }
          goto FETCH_NEXT_BLOCK;
        }
1956
        if (!isIntervalWindow(pInfo) && !isSessionWindow(pInfo) && !isStateWindow(pInfo)) {
L
Liu Jicong 已提交
1957
          generateDeleteResultBlock(pInfo, pDelBlock, pInfo->pDeleteDataRes);
1958
          pInfo->pDeleteDataRes->info.type = STREAM_DELETE_RESULT;
L
Liu Jicong 已提交
1959
          printDataBlock(pDelBlock, "stream scan delete result");
H
Haojun Liao 已提交
1960 1961
          blockDataDestroy(pDelBlock);

L
Liu Jicong 已提交
1962 1963 1964 1965 1966
          if (pInfo->pDeleteDataRes->info.rows > 0) {
            return pInfo->pDeleteDataRes;
          } else {
            goto FETCH_NEXT_BLOCK;
          }
1967 1968 1969
        } else {
          pInfo->blockType = STREAM_INPUT__DATA_SUBMIT;
          pInfo->updateResIndex = 0;
L
Liu Jicong 已提交
1970
          generateScanRange(pInfo, pDelBlock, pInfo->pUpdateRes);
1971 1972 1973
          prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex);
          copyDataBlock(pInfo->pDeleteDataRes, pInfo->pUpdateRes);
          pInfo->pDeleteDataRes->info.type = STREAM_DELETE_DATA;
L
Liu Jicong 已提交
1974 1975 1976 1977
          printDataBlock(pDelBlock, "stream scan delete data");
          if (pInfo->tqReader) {
            blockDataDestroy(pDelBlock);
          }
L
Liu Jicong 已提交
1978
          if (pInfo->pDeleteDataRes->info.rows > 0) {
5
54liuyao 已提交
1979
            pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RANGE;
L
Liu Jicong 已提交
1980 1981 1982 1983
            return pInfo->pDeleteDataRes;
          } else {
            goto FETCH_NEXT_BLOCK;
          }
1984
        }
1985 1986 1987
      } break;
      default:
        break;
5
54liuyao 已提交
1988
    }
1989
    // printDataBlock(pBlock, "stream scan recv");
1990
    return pBlock;
L
Liu Jicong 已提交
1991
  } else if (pInfo->blockType == STREAM_INPUT__DATA_SUBMIT) {
L
Liu Jicong 已提交
1992
    qDebug("scan mode %d", pInfo->scanMode);
5
54liuyao 已提交
1993 1994 1995
    switch (pInfo->scanMode) {
      case STREAM_SCAN_FROM_RES: {
        pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
5
54liuyao 已提交
1996
        doCheckUpdate(pInfo, pInfo->pRes->info.window.ekey, pInfo->pRes);
5
54liuyao 已提交
1997 1998 1999
        doFilter(pInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL);
        pInfo->pRes->info.dataLoad = 1;
        blockDataUpdateTsWindow(pInfo->pRes, pInfo->primaryTsIndex);
5
54liuyao 已提交
2000 2001 2002
        if (pInfo->pRes->info.rows > 0) {
          return pInfo->pRes;
        }
5
54liuyao 已提交
2003
      } break;
2004
      case STREAM_SCAN_FROM_DELETE_DATA: {
2005 2006 2007 2008 2009 2010 2011
        generateScanRange(pInfo, pInfo->pUpdateDataRes, pInfo->pUpdateRes);
        prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex);
        pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RANGE;
        copyDataBlock(pInfo->pDeleteDataRes, pInfo->pUpdateRes);
        pInfo->pDeleteDataRes->info.type = STREAM_DELETE_DATA;
        return pInfo->pDeleteDataRes;
      } break;
5
54liuyao 已提交
2012 2013 2014 2015 2016 2017 2018 2019 2020 2021
      case STREAM_SCAN_FROM_UPDATERES: {
        generateScanRange(pInfo, pInfo->pUpdateDataRes, pInfo->pUpdateRes);
        prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex);
        pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RANGE;
        return pInfo->pUpdateRes;
      } break;
      case STREAM_SCAN_FROM_DATAREADER_RANGE:
      case STREAM_SCAN_FROM_DATAREADER_RETRIEVE: {
        SSDataBlock* pSDB = doRangeScan(pInfo, pInfo->pUpdateRes, pInfo->primaryTsIndex, &pInfo->updateResIndex);
        if (pSDB) {
2022
          STableScanInfo* pTableScanInfo = pInfo->pTableScanOp->info;
5
54liuyao 已提交
2023 2024
          pSDB->info.type = pInfo->scanMode == STREAM_SCAN_FROM_DATAREADER_RANGE ? STREAM_NORMAL : STREAM_PULL_DATA;
          checkUpdateData(pInfo, true, pSDB, false);
L
liuyao 已提交
2025
          printDataBlock(pSDB, "stream scan update");
L
Liu Jicong 已提交
2026
          calBlockTbName(pInfo, pSDB);
5
54liuyao 已提交
2027 2028
          return pSDB;
        }
2029
        blockDataCleanup(pInfo->pUpdateDataRes);
5
54liuyao 已提交
2030 2031 2032 2033
        pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
      } break;
      default:
        break;
2034
    }
2035

2036
    SStreamAggSupporter* pSup = pInfo->windowSup.pStreamAggSup;
5
54liuyao 已提交
2037
    if (isStateWindow(pInfo) && pSup->pScanBlock->info.rows > 0) {
2038 2039
      pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RANGE;
      pInfo->updateResIndex = 0;
5
54liuyao 已提交
2040 2041
      copyDataBlock(pInfo->pUpdateRes, pSup->pScanBlock);
      blockDataCleanup(pSup->pScanBlock);
2042
      prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex);
2043
      pInfo->pUpdateRes->info.type = STREAM_DELETE_DATA;
2044
      return pInfo->pUpdateRes;
5
54liuyao 已提交
2045
    }
5
54liuyao 已提交
2046

H
Haojun Liao 已提交
2047 2048
    SDataBlockInfo* pBlockInfo = &pInfo->pRes->info;

2049
    int32_t totBlockNum = taosArrayGetSize(pInfo->pBlockLists);
2050

L
Liu Jicong 已提交
2051
  NEXT_SUBMIT_BLK:
2052
    while (1) {
L
Liu Jicong 已提交
2053
      if (pInfo->tqReader->msg2.msgStr == NULL) {
2054
        if (pInfo->validBlockIndex >= totBlockNum) {
5
54liuyao 已提交
2055
          updateInfoDestoryColseWinSBF(pInfo->pUpdateInfo);
L
Liu Jicong 已提交
2056
          doClearBufferedBlocks(pInfo);
L
Liu Jicong 已提交
2057
          qDebug("stream scan return empty, consume block %d", totBlockNum);
L
fix bug  
liuyao 已提交
2058
          void* buff = NULL;
L
liuyao 已提交
2059 2060 2061 2062
          // int32_t len = streamScanOperatorEncode(pInfo, &buff);
          // if (len > 0) {
          //   streamStateSaveInfo(pInfo->pState, STREAM_SCAN_OP_NAME, strlen(STREAM_SCAN_OP_NAME), buff, len);
          // }
L
fix bug  
liuyao 已提交
2063
          taosMemoryFreeClear(buff);
2064 2065
          return NULL;
        }
2066

L
Liu Jicong 已提交
2067 2068
        int32_t      current = pInfo->validBlockIndex++;
        SPackedData* pSubmit = taosArrayGet(pInfo->pBlockLists, current);
2069
        if (tqReaderSetSubmitMsg(pInfo->tqReader, pSubmit->msgStr, pSubmit->msgLen, pSubmit->ver) < 0) {
2070 2071 2072 2073
          qError("submit msg messed up when initing stream submit block %p, current %d, total %d", pSubmit, current,
                 totBlockNum);
          continue;
        }
H
Haojun Liao 已提交
2074 2075
      }

2076 2077
      blockDataCleanup(pInfo->pRes);

2078
      while (tqNextDataBlock(pInfo->tqReader)) {
2079
        SSDataBlock block = {0};
2080

2081
        int32_t code = tqRetrieveDataBlock2(&block, pInfo->tqReader, NULL);
2082 2083 2084 2085 2086

        if (code != TSDB_CODE_SUCCESS || block.info.rows == 0) {
          continue;
        }

2087
        setBlockIntoRes(pInfo, &block, false);
2088

5
54liuyao 已提交
2089 2090 2091
        if (pInfo->pCreateTbRes->info.rows > 0) {
          pInfo->scanMode = STREAM_SCAN_FROM_RES;
          return pInfo->pCreateTbRes;
2092 2093
        }

5
54liuyao 已提交
2094
        doCheckUpdate(pInfo, pBlockInfo->window.ekey, pInfo->pRes);
H
Haojun Liao 已提交
2095
        doFilter(pInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL);
2096
        pInfo->pRes->info.dataLoad = 1;
2097 2098 2099
        blockDataUpdateTsWindow(pInfo->pRes, pInfo->primaryTsIndex);

        if (pBlockInfo->rows > 0 || pInfo->pUpdateDataRes->info.rows > 0) {
2100 2101 2102
          break;
        }
      }
2103
      if (pBlockInfo->rows > 0 || pInfo->pUpdateDataRes->info.rows > 0) {
5
54liuyao 已提交
2104
        break;
J
jiacy-jcy 已提交
2105
      } else {
2106
        continue;
5
54liuyao 已提交
2107
      }
H
Haojun Liao 已提交
2108 2109 2110 2111
    }

    // record the scan action.
    pInfo->numOfExec++;
2112
    pOperator->resultInfo.totalRows += pBlockInfo->rows;
2113
    // printDataBlock(pInfo->pRes, "stream scan");
H
Haojun Liao 已提交
2114

X
Xiaoyu Wang 已提交
2115
    qDebug("scan rows: %" PRId64, pBlockInfo->rows);
L
Liu Jicong 已提交
2116 2117 2118
    if (pBlockInfo->rows > 0) {
      return pInfo->pRes;
    }
2119 2120 2121 2122 2123 2124

    if (pInfo->pUpdateDataRes->info.rows > 0) {
      goto FETCH_NEXT_BLOCK;
    }

    goto NEXT_SUBMIT_BLK;
L
Liu Jicong 已提交
2125 2126 2127
  } else {
    ASSERT(0);
    return NULL;
H
Haojun Liao 已提交
2128 2129 2130
  }
}

H
Haojun Liao 已提交
2131
static SArray* extractTableIdList(const STableListInfo* pTableListInfo) {
2132 2133 2134
  SArray* tableIdList = taosArrayInit(4, sizeof(uint64_t));

  // Transfer the Array of STableKeyInfo into uid list.
H
Haojun Liao 已提交
2135 2136 2137
  size_t size = tableListGetSize(pTableListInfo);
  for (int32_t i = 0; i < size; ++i) {
    STableKeyInfo* pkeyInfo = tableListGetInfo(pTableListInfo, i);
2138 2139 2140 2141 2142 2143
    taosArrayPush(tableIdList, &pkeyInfo->uid);
  }

  return tableIdList;
}

2144
static SSDataBlock* doRawScan(SOperatorInfo* pOperator) {
L
Liu Jicong 已提交
2145 2146
  // NOTE: this operator does never check if current status is done or not
  SExecTaskInfo*      pTaskInfo = pOperator->pTaskInfo;
2147
  SStreamRawScanInfo* pInfo = pOperator->info;
D
dapan1121 已提交
2148
  int32_t             code = TSDB_CODE_SUCCESS;
L
Liu Jicong 已提交
2149
  pTaskInfo->streamInfo.metaRsp.metaRspLen = 0;  // use metaRspLen !=0 to judge if data is meta
wmmhello's avatar
wmmhello 已提交
2150
  pTaskInfo->streamInfo.metaRsp.metaRsp = NULL;
2151

wmmhello's avatar
wmmhello 已提交
2152
  qDebug("tmqsnap doRawScan called");
2153
  if (pTaskInfo->streamInfo.currentOffset.type == TMQ_OFFSET__SNAPSHOT_DATA) {
D
dapan1121 已提交
2154 2155 2156 2157 2158
    bool hasNext = false;
    if (pInfo->dataReader) {
      code = tsdbNextDataBlock(pInfo->dataReader, &hasNext);
      if (code) {
        tsdbReleaseDataBlock(pInfo->dataReader);
2159
        T_LONG_JMP(pTaskInfo->env, code);
D
dapan1121 已提交
2160 2161
      }
    }
X
Xiaoyu Wang 已提交
2162

D
dapan1121 已提交
2163
    if (pInfo->dataReader && hasNext) {
wmmhello's avatar
wmmhello 已提交
2164
      if (isTaskKilled(pTaskInfo)) {
X
Xiaoyu Wang 已提交
2165
        tsdbReleaseDataBlock(pInfo->dataReader);
2166
        T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
wmmhello's avatar
wmmhello 已提交
2167
      }
2168

H
Haojun Liao 已提交
2169 2170
      SSDataBlock* pBlock = tsdbRetrieveDataBlock(pInfo->dataReader, NULL);
      if (pBlock == NULL) {
2171
        T_LONG_JMP(pTaskInfo->env, terrno);
wmmhello's avatar
wmmhello 已提交
2172 2173
      }

H
Haojun Liao 已提交
2174
      qDebug("tmqsnap doRawScan get data uid:%" PRId64 "", pBlock->info.id.uid);
2175
      tqOffsetResetToData(&pTaskInfo->streamInfo.currentOffset, pBlock->info.id.uid, pBlock->info.window.ekey);
wmmhello's avatar
wmmhello 已提交
2176 2177
      return pBlock;
    }
wmmhello's avatar
wmmhello 已提交
2178 2179

    SMetaTableInfo mtInfo = getUidfromSnapShot(pInfo->sContext);
X
Xiaoyu Wang 已提交
2180
    STqOffsetVal   offset = {0};
L
Liu Jicong 已提交
2181
    if (mtInfo.uid == 0) {  // read snapshot done, change to get data from wal
wmmhello's avatar
wmmhello 已提交
2182
      qDebug("tmqsnap read snapshot done, change to get data from wal");
2183
      tqOffsetResetToLog(&offset, pInfo->sContext->snapVersion);
L
Liu Jicong 已提交
2184
    } else {
2185
      tqOffsetResetToData(&offset, mtInfo.uid, INT64_MIN);
2186
      qDebug("tmqsnap change get data uid:%" PRId64 "", mtInfo.uid);
wmmhello's avatar
wmmhello 已提交
2187
    }
2188
    qStreamPrepareScan(pTaskInfo, &offset, pInfo->sContext->subType);
2189
    tDeleteSSchemaWrapper(mtInfo.schema);
wmmhello's avatar
wmmhello 已提交
2190
    return NULL;
2191
  } else if (pTaskInfo->streamInfo.currentOffset.type == TMQ_OFFSET__SNAPSHOT_META) {
L
Liu Jicong 已提交
2192 2193 2194 2195 2196 2197
    SSnapContext* sContext = pInfo->sContext;
    void*         data = NULL;
    int32_t       dataLen = 0;
    int16_t       type = 0;
    int64_t       uid = 0;
    if (getMetafromSnapShot(sContext, &data, &dataLen, &type, &uid) < 0) {
wmmhello's avatar
wmmhello 已提交
2198
      qError("tmqsnap getMetafromSnapShot error");
wmmhello's avatar
wmmhello 已提交
2199
      taosMemoryFreeClear(data);
2200 2201 2202
      return NULL;
    }

2203
    if (!sContext->queryMeta) {  // change to get data next poll request
2204 2205 2206
      STqOffsetVal offset = {0};
      tqOffsetResetToData(&offset, 0, INT64_MIN);
      qStreamPrepareScan(pTaskInfo, &offset, pInfo->sContext->subType);
L
Liu Jicong 已提交
2207
    } else {
2208
      tqOffsetResetToMeta(&pTaskInfo->streamInfo.currentOffset, uid);
wmmhello's avatar
wmmhello 已提交
2209 2210 2211 2212
      pTaskInfo->streamInfo.metaRsp.resMsgType = type;
      pTaskInfo->streamInfo.metaRsp.metaRspLen = dataLen;
      pTaskInfo->streamInfo.metaRsp.metaRsp = data;
    }
2213

wmmhello's avatar
wmmhello 已提交
2214
    return NULL;
2215
  }
L
Liu Jicong 已提交
2216 2217 2218 2219 2220 2221 2222 2223 2224 2225 2226 2227 2228 2229 2230 2231 2232 2233 2234 2235 2236 2237 2238 2239 2240 2241 2242 2243 2244 2245 2246 2247 2248 2249 2250 2251 2252 2253
  //  else if (pTaskInfo->streamInfo.prepareStatus.type == TMQ_OFFSET__LOG) {
  //    int64_t fetchVer = pTaskInfo->streamInfo.prepareStatus.version + 1;
  //
  //    while(1){
  //      if (tqFetchLog(pInfo->tqReader->pWalReader, pInfo->sContext->withMeta, &fetchVer, &pInfo->pCkHead) < 0) {
  //        qDebug("tmqsnap tmq poll: consumer log end. offset %" PRId64, fetchVer);
  //        pTaskInfo->streamInfo.lastStatus.version = fetchVer;
  //        pTaskInfo->streamInfo.lastStatus.type = TMQ_OFFSET__LOG;
  //        return NULL;
  //      }
  //      SWalCont* pHead = &pInfo->pCkHead->head;
  //      qDebug("tmqsnap tmq poll: consumer log offset %" PRId64 " msgType %d", fetchVer, pHead->msgType);
  //
  //      if (pHead->msgType == TDMT_VND_SUBMIT) {
  //        SSubmitReq* pCont = (SSubmitReq*)&pHead->body;
  //        tqReaderSetDataMsg(pInfo->tqReader, pCont, 0);
  //        SSDataBlock* block = tqLogScanExec(pInfo->sContext->subType, pInfo->tqReader, pInfo->pFilterOutTbUid,
  //        &pInfo->pRes); if(block){
  //          pTaskInfo->streamInfo.lastStatus.type = TMQ_OFFSET__LOG;
  //          pTaskInfo->streamInfo.lastStatus.version = fetchVer;
  //          qDebug("tmqsnap fetch data msg, ver:%" PRId64 ", type:%d", pHead->version, pHead->msgType);
  //          return block;
  //        }else{
  //          fetchVer++;
  //        }
  //      } else{
  //        ASSERT(pInfo->sContext->withMeta);
  //        ASSERT(IS_META_MSG(pHead->msgType));
  //        qDebug("tmqsnap fetch meta msg, ver:%" PRId64 ", type:%d", pHead->version, pHead->msgType);
  //        pTaskInfo->streamInfo.metaRsp.rspOffset.version = fetchVer;
  //        pTaskInfo->streamInfo.metaRsp.rspOffset.type = TMQ_OFFSET__LOG;
  //        pTaskInfo->streamInfo.metaRsp.resMsgType = pHead->msgType;
  //        pTaskInfo->streamInfo.metaRsp.metaRspLen = pHead->bodyLen;
  //        pTaskInfo->streamInfo.metaRsp.metaRsp = taosMemoryMalloc(pHead->bodyLen);
  //        memcpy(pTaskInfo->streamInfo.metaRsp.metaRsp, pHead->body, pHead->bodyLen);
  //        return NULL;
  //      }
  //    }
2254 2255 2256
  return NULL;
}

wmmhello's avatar
wmmhello 已提交
2257
static void destroyRawScanOperatorInfo(void* param) {
wmmhello's avatar
wmmhello 已提交
2258 2259 2260
  SStreamRawScanInfo* pRawScan = (SStreamRawScanInfo*)param;
  tsdbReaderClose(pRawScan->dataReader);
  destroySnapContext(pRawScan->sContext);
2261
  tableListDestroy(pRawScan->pTableListInfo);
wmmhello's avatar
wmmhello 已提交
2262 2263 2264
  taosMemoryFree(pRawScan);
}

L
Liu Jicong 已提交
2265 2266 2267
// for subscribing db or stb (not including column),
// if this scan is used, meta data can be return
// and schemas are decided when scanning
2268
SOperatorInfo* createRawScanOperatorInfo(SReadHandle* pHandle, SExecTaskInfo* pTaskInfo) {
L
Liu Jicong 已提交
2269 2270 2271 2272 2273
  // create operator
  // create tb reader
  // create meta reader
  // create tq reader

H
Haojun Liao 已提交
2274 2275
  int32_t code = TSDB_CODE_SUCCESS;

2276
  SStreamRawScanInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamRawScanInfo));
L
Liu Jicong 已提交
2277
  SOperatorInfo*      pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
2278
  if (pInfo == NULL || pOperator == NULL) {
H
Haojun Liao 已提交
2279 2280
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _end;
2281 2282
  }

2283
  pInfo->pTableListInfo = tableListCreate();
wmmhello's avatar
wmmhello 已提交
2284 2285
  pInfo->vnode = pHandle->vnode;

2286
  pInfo->sContext = pHandle->sContext;
L
Liu Jicong 已提交
2287 2288
  setOperatorInfo(pOperator, "RawScanOperator", QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN, false, OP_NOT_OPENED, pInfo,
                  pTaskInfo);
2289

2290
  pOperator->fpSet = createOperatorFpSet(NULL, doRawScan, NULL, destroyRawScanOperatorInfo, optrDefaultBufFn, NULL);
2291
  return pOperator;
H
Haojun Liao 已提交
2292

L
Liu Jicong 已提交
2293
_end:
H
Haojun Liao 已提交
2294 2295 2296 2297
  taosMemoryFree(pInfo);
  taosMemoryFree(pOperator);
  pTaskInfo->code = code;
  return NULL;
L
Liu Jicong 已提交
2298 2299
}

2300
static void destroyStreamScanOperatorInfo(void* param) {
2301
  SStreamScanInfo* pStreamScan = (SStreamScanInfo*)param;
2302

2303
  if (pStreamScan->pTableScanOp && pStreamScan->pTableScanOp->info) {
5
54liuyao 已提交
2304
    destroyOperatorInfo(pStreamScan->pTableScanOp);
2305
  }
2306

2307 2308 2309
  if (pStreamScan->tqReader) {
    tqCloseReader(pStreamScan->tqReader);
  }
H
Haojun Liao 已提交
2310 2311
  if (pStreamScan->matchInfo.pList) {
    taosArrayDestroy(pStreamScan->matchInfo.pList);
2312
  }
C
Cary Xu 已提交
2313 2314
  if (pStreamScan->pPseudoExpr) {
    destroyExprInfo(pStreamScan->pPseudoExpr, pStreamScan->numOfPseudoExpr);
L
Liu Jicong 已提交
2315
    taosMemoryFree(pStreamScan->pPseudoExpr);
C
Cary Xu 已提交
2316
  }
C
Cary Xu 已提交
2317

L
Liu Jicong 已提交
2318
  cleanupExprSupp(&pStreamScan->tbnameCalSup);
5
54liuyao 已提交
2319
  cleanupExprSupp(&pStreamScan->tagCalSup);
L
Liu Jicong 已提交
2320

L
Liu Jicong 已提交
2321
  updateInfoDestroy(pStreamScan->pUpdateInfo);
2322 2323 2324 2325
  blockDataDestroy(pStreamScan->pRes);
  blockDataDestroy(pStreamScan->pUpdateRes);
  blockDataDestroy(pStreamScan->pPullDataRes);
  blockDataDestroy(pStreamScan->pDeleteDataRes);
5
54liuyao 已提交
2326
  blockDataDestroy(pStreamScan->pUpdateDataRes);
5
54liuyao 已提交
2327
  blockDataDestroy(pStreamScan->pCreateTbRes);
2328 2329 2330 2331
  taosArrayDestroy(pStreamScan->pBlockLists);
  taosMemoryFree(pStreamScan);
}

2332
SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhysiNode* pTableScanNode, SNode* pTagCond,
2333
                                            STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo) {
H
Haojun Liao 已提交
2334
  SArray*          pColIds = NULL;
2335 2336
  SStreamScanInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamScanInfo));
  SOperatorInfo*   pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
2337

H
Haojun Liao 已提交
2338
  if (pInfo == NULL || pOperator == NULL) {
S
Shengliang Guan 已提交
2339
    terrno = TSDB_CODE_OUT_OF_MEMORY;
2340
    tableListDestroy(pTableListInfo);
2341
    goto _error;
H
Haojun Liao 已提交
2342 2343
  }

2344
  SScanPhysiNode*     pScanPhyNode = &pTableScanNode->scan;
2345
  SDataBlockDescNode* pDescNode = pScanPhyNode->node.pOutputDataBlockDesc;
H
Haojun Liao 已提交
2346

2347
  pInfo->pTagCond = pTagCond;
2348
  pInfo->pGroupTags = pTableScanNode->pGroupTags;
2349

2350
  int32_t numOfCols = 0;
2351 2352
  int32_t code =
      extractColMatchInfo(pScanPhyNode->pScanCols, pDescNode, &numOfCols, COL_MATCH_FROM_COL_ID, &pInfo->matchInfo);
H
Haojun Liao 已提交
2353
  if (code != TSDB_CODE_SUCCESS) {
2354
    tableListDestroy(pTableListInfo);
H
Haojun Liao 已提交
2355 2356
    goto _error;
  }
2357

H
Haojun Liao 已提交
2358
  int32_t numOfOutput = taosArrayGetSize(pInfo->matchInfo.pList);
H
Haojun Liao 已提交
2359
  pColIds = taosArrayInit(numOfOutput, sizeof(int16_t));
2360
  for (int32_t i = 0; i < numOfOutput; ++i) {
H
Haojun Liao 已提交
2361
    SColMatchItem* id = taosArrayGet(pInfo->matchInfo.pList, i);
2362 2363

    int16_t colId = id->colId;
2364
    taosArrayPush(pColIds, &colId);
2365
    if (id->colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
H
Haojun Liao 已提交
2366
      pInfo->primaryTsIndex = id->dstSlotId;
5
54liuyao 已提交
2367
    }
H
Haojun Liao 已提交
2368 2369
  }

L
Liu Jicong 已提交
2370 2371 2372 2373
  if (pTableScanNode->pSubtable != NULL) {
    SExprInfo* pSubTableExpr = taosMemoryCalloc(1, sizeof(SExprInfo));
    if (pSubTableExpr == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
2374
      tableListDestroy(pTableListInfo);
L
Liu Jicong 已提交
2375 2376
      goto _error;
    }
2377

L
Liu Jicong 已提交
2378 2379 2380
    pInfo->tbnameCalSup.pExprInfo = pSubTableExpr;
    createExprFromOneNode(pSubTableExpr, pTableScanNode->pSubtable, 0);
    if (initExprSupp(&pInfo->tbnameCalSup, pSubTableExpr, 1) != 0) {
2381
      tableListDestroy(pTableListInfo);
L
Liu Jicong 已提交
2382 2383 2384 2385
      goto _error;
    }
  }

2386 2387
  if (pTableScanNode->pTags != NULL) {
    int32_t    numOfTags;
5
54liuyao 已提交
2388
    SExprInfo* pTagExpr = createExpr(pTableScanNode->pTags, &numOfTags);
2389 2390
    if (pTagExpr == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
2391
      tableListDestroy(pTableListInfo);
2392 2393 2394 2395
      goto _error;
    }
    if (initExprSupp(&pInfo->tagCalSup, pTagExpr, numOfTags) != 0) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
2396
      tableListDestroy(pTableListInfo);
2397 2398 2399 2400
      goto _error;
    }
  }

L
Liu Jicong 已提交
2401
  pInfo->pBlockLists = taosArrayInit(4, sizeof(SPackedData));
H
Haojun Liao 已提交
2402
  if (pInfo->pBlockLists == NULL) {
2403
    terrno = TSDB_CODE_OUT_OF_MEMORY;
2404
    tableListDestroy(pTableListInfo);
2405
    goto _error;
H
Haojun Liao 已提交
2406 2407
  }

5
54liuyao 已提交
2408
  if (pHandle->vnode) {
2409
    SOperatorInfo*  pTableScanOp = createTableScanOperatorInfo(pTableScanNode, pHandle, pTableListInfo, pTaskInfo);
L
Liu Jicong 已提交
2410
    STableScanInfo* pTSInfo = (STableScanInfo*)pTableScanOp->info;
2411
    if (pHandle->version > 0) {
H
Haojun Liao 已提交
2412
      pTSInfo->base.cond.endVersion = pHandle->version;
2413
    }
L
Liu Jicong 已提交
2414

2415
    STableKeyInfo* pList = NULL;
5
54liuyao 已提交
2416
    int32_t        num = 0;
2417
    tableListGetGroupList(pTableListInfo, 0, &pList, &num);
2418

2419
    if (pHandle->initTableReader) {
L
Liu Jicong 已提交
2420
      pTSInfo->scanMode = TABLE_SCAN__TABLE_ORDER;
H
Haojun Liao 已提交
2421
      pTSInfo->base.dataReader = NULL;
L
Liu Jicong 已提交
2422 2423
    }

L
Liu Jicong 已提交
2424 2425 2426 2427
    if (pHandle->initTqReader) {
      ASSERT(pHandle->tqReader == NULL);
      pInfo->tqReader = tqOpenReader(pHandle->vnode);
      ASSERT(pInfo->tqReader);
2428
    } else {
L
Liu Jicong 已提交
2429 2430
      ASSERT(pHandle->tqReader);
      pInfo->tqReader = pHandle->tqReader;
2431 2432
    }

2433
    pInfo->pUpdateInfo = NULL;
2434
    pInfo->pTableScanOp = pTableScanOp;
2435 2436 2437
    if (pInfo->pTableScanOp->pTaskInfo->streamInfo.pState) {
      streamStateSetNumber(pInfo->pTableScanOp->pTaskInfo->streamInfo.pState, -1);
    }
L
Liu Jicong 已提交
2438

L
Liu Jicong 已提交
2439
    pInfo->readHandle = *pHandle;
L
Liu Jicong 已提交
2440
    pTaskInfo->streamInfo.snapshotVer = pHandle->version;
5
54liuyao 已提交
2441 2442
    pInfo->pCreateTbRes = buildCreateTableBlock(&pInfo->tbnameCalSup, &pInfo->tagCalSup);
    blockDataEnsureCapacity(pInfo->pCreateTbRes, 8);
L
Liu Jicong 已提交
2443

L
Liu Jicong 已提交
2444
    // set the extract column id to streamHandle
L
Liu Jicong 已提交
2445
    tqReaderSetColIdList(pInfo->tqReader, pColIds);
2446
    SArray* tableIdList = extractTableIdList(((STableScanInfo*)(pInfo->pTableScanOp->info))->base.pTableListInfo);
2447
    code = tqReaderSetTbUidList(pInfo->tqReader, tableIdList);
L
Liu Jicong 已提交
2448 2449 2450 2451
    if (code != 0) {
      taosArrayDestroy(tableIdList);
      goto _error;
    }
2452

L
Liu Jicong 已提交
2453
    taosArrayDestroy(tableIdList);
H
Haojun Liao 已提交
2454
    memcpy(&pTaskInfo->streamInfo.tableCond, &pTSInfo->base.cond, sizeof(SQueryTableDataCond));
L
Liu Jicong 已提交
2455 2456
  } else {
    taosArrayDestroy(pColIds);
2457
    tableListDestroy(pTableListInfo);
H
Haojun Liao 已提交
2458
    pColIds = NULL;
5
54liuyao 已提交
2459 2460
  }

2461 2462 2463 2464 2465
  // create the pseduo columns info
  if (pTableScanNode->scan.pScanPseudoCols != NULL) {
    pInfo->pPseudoExpr = createExprInfo(pTableScanNode->scan.pScanPseudoCols, NULL, &pInfo->numOfPseudoExpr);
  }

H
Haojun Liao 已提交
2466 2467 2468 2469 2470
  code = filterInitFromNode((SNode*)pScanPhyNode->node.pConditions, &pOperator->exprSupp.pFilterInfo, 0);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

H
Haojun Liao 已提交
2471
  pInfo->pRes = createDataBlockFromDescNode(pDescNode);
2472
  pInfo->pUpdateRes = createSpecialDataBlock(STREAM_CLEAR);
2473
  pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
L
Liu Jicong 已提交
2474
  pInfo->windowSup = (SWindowSupporter){.pStreamAggSup = NULL, .gap = -1, .parentType = QUERY_NODE_PHYSICAL_PLAN};
2475
  pInfo->groupId = 0;
2476
  pInfo->pPullDataRes = createSpecialDataBlock(STREAM_RETRIEVE);
2477
  pInfo->pStreamScanOp = pOperator;
2478
  pInfo->deleteDataIndex = 0;
2479
  pInfo->pDeleteDataRes = createSpecialDataBlock(STREAM_DELETE_DATA);
5
54liuyao 已提交
2480
  pInfo->updateWin = (STimeWindow){.skey = INT64_MAX, .ekey = INT64_MAX};
2481
  pInfo->pUpdateDataRes = createSpecialDataBlock(STREAM_CLEAR);
X
Xiaoyu Wang 已提交
2482
  pInfo->assignBlockUid = pTableScanNode->assignBlockUid;
2483
  pInfo->partitionSup.needCalc = false;
5
54liuyao 已提交
2484 2485
  pInfo->igCheckUpdate = pTableScanNode->igCheckUpdate;
  pInfo->igExpired = pTableScanNode->igExpired;
2486
  pInfo->twAggSup.maxTs = INT64_MIN;
L
liuyao 已提交
2487
  pInfo->pState = NULL;
L
Liu Jicong 已提交
2488

L
fix bug  
liuyao 已提交
2489 2490 2491 2492 2493 2494 2495
  // for stream
  if (pTaskInfo->streamInfo.pState) {
    void*   buff = NULL;
    int32_t len = 0;
    streamStateGetInfo(pTaskInfo->streamInfo.pState, STREAM_SCAN_OP_NAME, strlen(STREAM_SCAN_OP_NAME), &buff, &len);
    streamScanOperatorDeocde(buff, len, pInfo);
  }
L
liuyao 已提交
2496

L
fix bug  
liuyao 已提交
2497
  setOperatorInfo(pOperator, STREAM_SCAN_OP_NAME, QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN, false, OP_NOT_OPENED, pInfo,
L
Liu Jicong 已提交
2498
                  pTaskInfo);
2499
  pOperator->exprSupp.numOfExprs = taosArrayGetSize(pInfo->pRes->pDataBlock);
H
Haojun Liao 已提交
2500

2501
  __optr_fn_t nextFn = (pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM) ? doStreamScan : doQueueScan;
L
Liu Jicong 已提交
2502 2503
  pOperator->fpSet =
      createOperatorFpSet(optrDummyOpenFn, nextFn, NULL, destroyStreamScanOperatorInfo, optrDefaultBufFn, NULL);
2504

H
Haojun Liao 已提交
2505
  return pOperator;
2506

L
Liu Jicong 已提交
2507
_error:
H
Haojun Liao 已提交
2508 2509 2510 2511 2512 2513 2514 2515
  if (pColIds != NULL) {
    taosArrayDestroy(pColIds);
  }

  if (pInfo != NULL) {
    destroyStreamScanOperatorInfo(pInfo);
  }

2516 2517
  taosMemoryFreeClear(pOperator);
  return NULL;
H
Haojun Liao 已提交
2518 2519
}

2520
static SSDataBlock* doTagScan(SOperatorInfo* pOperator) {
H
Haojun Liao 已提交
2521 2522 2523 2524
  if (pOperator->status == OP_EXEC_DONE) {
    return NULL;
  }

2525 2526 2527
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;

  STagScanInfo* pInfo = pOperator->info;
2528
  SExprInfo*    pExprInfo = &pOperator->exprSupp.pExprInfo[0];
2529
  SSDataBlock*  pRes = pInfo->pRes;
2530
  blockDataCleanup(pRes);
H
Haojun Liao 已提交
2531

2532
  int32_t size = tableListGetSize(pInfo->pTableListInfo);
wmmhello's avatar
wmmhello 已提交
2533
  if (size == 0) {
H
Haojun Liao 已提交
2534 2535 2536 2537
    setTaskStatus(pTaskInfo, TASK_COMPLETED);
    return NULL;
  }

2538 2539 2540
  char        str[512] = {0};
  int32_t     count = 0;
  SMetaReader mr = {0};
2541
  metaReaderInit(&mr, pInfo->readHandle.meta, 0);
H
Haojun Liao 已提交
2542

wmmhello's avatar
wmmhello 已提交
2543
  while (pInfo->curPos < size && count < pOperator->resultInfo.capacity) {
2544
    STableKeyInfo* item = tableListGetInfo(pInfo->pTableListInfo, pInfo->curPos);
L
Liu Jicong 已提交
2545
    int32_t        code = metaGetTableEntryByUid(&mr, item->uid);
2546
    tDecoderClear(&mr.coder);
H
Haojun Liao 已提交
2547
    if (code != TSDB_CODE_SUCCESS) {
L
Liu Jicong 已提交
2548 2549
      qError("failed to get table meta, uid:0x%" PRIx64 ", code:%s, %s", item->uid, tstrerror(terrno),
             GET_TASKID(pTaskInfo));
H
Haojun Liao 已提交
2550
      metaReaderClear(&mr);
2551
      T_LONG_JMP(pTaskInfo->env, terrno);
H
Haojun Liao 已提交
2552
    }
H
Haojun Liao 已提交
2553

2554
    for (int32_t j = 0; j < pOperator->exprSupp.numOfExprs; ++j) {
2555 2556 2557 2558 2559
      SColumnInfoData* pDst = taosArrayGet(pRes->pDataBlock, pExprInfo[j].base.resSchema.slotId);

      // refactor later
      if (fmIsScanPseudoColumnFunc(pExprInfo[j].pExpr->_function.functionId)) {
        STR_TO_VARSTR(str, mr.me.name);
2560
        colDataSetVal(pDst, count, str, false);
2561
      } else {  // it is a tag value
wmmhello's avatar
wmmhello 已提交
2562 2563
        STagVal val = {0};
        val.cid = pExprInfo[j].base.pParam[0].pCol->colId;
2564
        const char* p = metaGetTableTagVal(mr.me.ctbEntry.pTags, pDst->info.type, &val);
wmmhello's avatar
wmmhello 已提交
2565

2566 2567 2568 2569
        char* data = NULL;
        if (pDst->info.type != TSDB_DATA_TYPE_JSON && p != NULL) {
          data = tTagValToData((const STagVal*)p, false);
        } else {
wmmhello's avatar
wmmhello 已提交
2570 2571
          data = (char*)p;
        }
2572
        colDataSetVal(pDst, count, data,
L
Liu Jicong 已提交
2573
                      (data == NULL) || (pDst->info.type == TSDB_DATA_TYPE_JSON && tTagIsJsonNull(data)));
2574

2575 2576
        if (pDst->info.type != TSDB_DATA_TYPE_JSON && p != NULL && IS_VAR_DATA_TYPE(((const STagVal*)p)->type) &&
            data != NULL) {
wmmhello's avatar
wmmhello 已提交
2577
          taosMemoryFree(data);
wmmhello's avatar
wmmhello 已提交
2578
        }
H
Haojun Liao 已提交
2579 2580 2581
      }
    }

2582
    count += 1;
wmmhello's avatar
wmmhello 已提交
2583
    if (++pInfo->curPos >= size) {
H
Haojun Liao 已提交
2584
      setOperatorCompleted(pOperator);
H
Haojun Liao 已提交
2585 2586 2587
    }
  }

2588 2589
  metaReaderClear(&mr);

2590
  // qDebug("QInfo:0x%"PRIx64" create tag values results completed, rows:%d", GET_TASKID(pRuntimeEnv), count);
H
Haojun Liao 已提交
2591
  if (pOperator->status == OP_EXEC_DONE) {
2592
    setTaskStatus(pTaskInfo, TASK_COMPLETED);
H
Haojun Liao 已提交
2593 2594 2595
  }

  pRes->info.rows = count;
wmmhello's avatar
wmmhello 已提交
2596
  pOperator->resultInfo.totalRows += count;
2597

2598
  return (pRes->info.rows == 0) ? NULL : pInfo->pRes;
H
Haojun Liao 已提交
2599 2600
}

2601
static void destroyTagScanOperatorInfo(void* param) {
H
Haojun Liao 已提交
2602 2603
  STagScanInfo* pInfo = (STagScanInfo*)param;
  pInfo->pRes = blockDataDestroy(pInfo->pRes);
H
Haojun Liao 已提交
2604
  taosArrayDestroy(pInfo->matchInfo.pList);
2605
  pInfo->pTableListInfo = tableListDestroy(pInfo->pTableListInfo);
D
dapan1121 已提交
2606
  taosMemoryFreeClear(param);
H
Haojun Liao 已提交
2607 2608
}

S
slzhou 已提交
2609
SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysiNode* pPhyNode,
X
Xiaoyu Wang 已提交
2610
                                         STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo) {
2611
  STagScanInfo*  pInfo = taosMemoryCalloc(1, sizeof(STagScanInfo));
H
Haojun Liao 已提交
2612 2613 2614 2615 2616
  SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
  if (pInfo == NULL || pOperator == NULL) {
    goto _error;
  }

2617 2618 2619 2620
  SDataBlockDescNode* pDescNode = pPhyNode->node.pOutputDataBlockDesc;

  int32_t    numOfExprs = 0;
  SExprInfo* pExprInfo = createExprInfo(pPhyNode->pScanPseudoCols, NULL, &numOfExprs);
2621
  int32_t    code = initExprSupp(&pOperator->exprSupp, pExprInfo, numOfExprs);
2622 2623 2624
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }
2625

H
Haojun Liao 已提交
2626 2627
  int32_t num = 0;
  code = extractColMatchInfo(pPhyNode->pScanPseudoCols, pDescNode, &num, COL_MATCH_FROM_COL_ID, &pInfo->matchInfo);
2628 2629 2630
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }
2631

2632
  pInfo->pTableListInfo = pTableListInfo;
H
Haojun Liao 已提交
2633
  pInfo->pRes = createDataBlockFromDescNode(pDescNode);
2634 2635
  pInfo->readHandle = *pReadHandle;
  pInfo->curPos = 0;
2636

L
Liu Jicong 已提交
2637 2638
  setOperatorInfo(pOperator, "TagScanOperator", QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN, false, OP_NOT_OPENED, pInfo,
                  pTaskInfo);
2639
  initResultSizeInfo(&pOperator->resultInfo, 4096);
2640 2641
  blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity);

L
Liu Jicong 已提交
2642 2643
  pOperator->fpSet =
      createOperatorFpSet(optrDummyOpenFn, doTagScan, NULL, destroyTagScanOperatorInfo, optrDefaultBufFn, NULL);
H
Haojun Liao 已提交
2644 2645

  return pOperator;
2646

2647
_error:
H
Haojun Liao 已提交
2648 2649 2650 2651 2652
  taosMemoryFree(pInfo);
  taosMemoryFree(pOperator);
  terrno = TSDB_CODE_OUT_OF_MEMORY;
  return NULL;
}
2653

dengyihao's avatar
dengyihao 已提交
2654
static SSDataBlock* getTableDataBlockImpl(void* param) {
dengyihao's avatar
opt mem  
dengyihao 已提交
2655 2656 2657 2658 2659 2660
  STableMergeScanSortSourceParam* source = param;
  SOperatorInfo*                  pOperator = source->pOperator;
  STableMergeScanInfo*            pInfo = pOperator->info;
  SExecTaskInfo*                  pTaskInfo = pOperator->pTaskInfo;
  int32_t                         readIdx = source->readerIdx;
  SSDataBlock*                    pBlock = source->inputBlock;
D
dapan1121 已提交
2661
  int32_t                         code = 0;
dengyihao's avatar
opt mem  
dengyihao 已提交
2662

H
Haojun Liao 已提交
2663
  SQueryTableDataCond* pQueryCond = taosArrayGet(pInfo->queryConds, readIdx);
dengyihao's avatar
opt mem  
dengyihao 已提交
2664

L
Liu Jicong 已提交
2665
  int64_t      st = taosGetTimestampUs();
2666
  void*        p = tableListGetInfo(pInfo->base.pTableListInfo, readIdx + pInfo->tableStartIndex);
H
Haojun Liao 已提交
2667
  SReadHandle* pHandle = &pInfo->base.readHandle;
dengyihao's avatar
dengyihao 已提交
2668

D
dapan1121 已提交
2669
  if (NULL == source->dataReader || !source->multiReader) {
D
dapan1121 已提交
2670
    code = tsdbReaderOpen(pHandle->vnode, pQueryCond, p, 1, pBlock, &source->dataReader, GET_TASKID(pTaskInfo), false);
D
dapan1121 已提交
2671 2672 2673
    if (code != 0) {
      T_LONG_JMP(pTaskInfo->env, code);
    }
dengyihao's avatar
dengyihao 已提交
2674
  }
dengyihao's avatar
opt mem  
dengyihao 已提交
2675

D
dapan1121 已提交
2676
  pInfo->base.dataReader = source->dataReader;
H
Haojun Liao 已提交
2677
  STsdbReader* reader = pInfo->base.dataReader;
X
Xiaoyu Wang 已提交
2678
  bool         hasNext = false;
2679
  qTrace("tsdb/read-table-data: %p, enter next reader", reader);
D
dapan1121 已提交
2680 2681 2682 2683 2684 2685 2686 2687 2688 2689 2690 2691

  while (true) {
    code = tsdbNextDataBlock(reader, &hasNext);
    if (code != 0) {
      tsdbReleaseDataBlock(reader);
      pInfo->base.dataReader = NULL;
      T_LONG_JMP(pTaskInfo->env, code);
    }

    if (!hasNext) {
      break;
    }
X
Xiaoyu Wang 已提交
2692

H
Haojun Liao 已提交
2693
    if (isTaskKilled(pTaskInfo)) {
X
Xiaoyu Wang 已提交
2694
      tsdbReleaseDataBlock(reader);
D
dapan1121 已提交
2695
      pInfo->base.dataReader = NULL;
2696
      T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
dengyihao's avatar
opt mem  
dengyihao 已提交
2697 2698 2699
    }

    // process this data block based on the probabilities
H
Haojun Liao 已提交
2700
    bool processThisBlock = processBlockWithProbability(&pInfo->sample);
dengyihao's avatar
opt mem  
dengyihao 已提交
2701 2702 2703 2704
    if (!processThisBlock) {
      continue;
    }

H
Haojun Liao 已提交
2705
    if (pQueryCond->order == TSDB_ORDER_ASC) {
dengyihao's avatar
opt mem  
dengyihao 已提交
2706 2707 2708 2709
      pQueryCond->twindows.skey = pBlock->info.window.ekey + 1;
    } else {
      pQueryCond->twindows.ekey = pBlock->info.window.skey - 1;
    }
dengyihao's avatar
opt mem  
dengyihao 已提交
2710 2711

    uint32_t status = 0;
2712
    code = loadDataBlock(pOperator, &pInfo->base, pBlock, &status);
S
slzhou 已提交
2713
    //    code = loadDataBlockFromOneTable(pOperator, pTableScanInfo, pBlock, &status);
dengyihao's avatar
opt mem  
dengyihao 已提交
2714
    if (code != TSDB_CODE_SUCCESS) {
H
Haojun Liao 已提交
2715
      T_LONG_JMP(pTaskInfo->env, code);
dengyihao's avatar
opt mem  
dengyihao 已提交
2716 2717 2718 2719 2720 2721 2722
    }

    // current block is filter out according to filter condition, continue load the next block
    if (status == FUNC_DATA_REQUIRED_FILTEROUT || pBlock->info.rows == 0) {
      continue;
    }

2723
    pBlock->info.id.groupId = getTableGroupId(pInfo->base.pTableListInfo, pBlock->info.id.uid);
dengyihao's avatar
opt mem  
dengyihao 已提交
2724

H
Haojun Liao 已提交
2725
    pOperator->resultInfo.totalRows += pBlock->info.rows;
H
Haojun Liao 已提交
2726
    pInfo->base.readRecorder.elapsedTime += (taosGetTimestampUs() - st) / 1000.0;
dengyihao's avatar
opt mem  
dengyihao 已提交
2727

2728
    qTrace("tsdb/read-table-data: %p, close reader", reader);
D
dapan1121 已提交
2729 2730 2731 2732
    if (!source->multiReader) {
      tsdbReaderClose(pInfo->base.dataReader);
      source->dataReader = NULL;
    }
H
Haojun Liao 已提交
2733
    pInfo->base.dataReader = NULL;
dengyihao's avatar
opt mem  
dengyihao 已提交
2734 2735
    return pBlock;
  }
H
Haojun Liao 已提交
2736

D
dapan1121 已提交
2737 2738 2739 2740
  if (!source->multiReader) {
    tsdbReaderClose(pInfo->base.dataReader);
    source->dataReader = NULL;
  }
H
Haojun Liao 已提交
2741
  pInfo->base.dataReader = NULL;
dengyihao's avatar
opt mem  
dengyihao 已提交
2742 2743 2744
  return NULL;
}

2745 2746 2747
SArray* generateSortByTsInfo(SArray* colMatchInfo, int32_t order) {
  int32_t tsTargetSlotId = 0;
  for (int32_t i = 0; i < taosArrayGetSize(colMatchInfo); ++i) {
H
Haojun Liao 已提交
2748
    SColMatchItem* colInfo = taosArrayGet(colMatchInfo, i);
2749
    if (colInfo->colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
H
Haojun Liao 已提交
2750
      tsTargetSlotId = colInfo->dstSlotId;
2751 2752 2753
    }
  }

2754 2755 2756
  SArray*         pList = taosArrayInit(1, sizeof(SBlockOrderInfo));
  SBlockOrderInfo bi = {0};
  bi.order = order;
2757
  bi.slotId = tsTargetSlotId;
2758 2759 2760 2761 2762 2763 2764
  bi.nullFirst = NULL_ORDER_FIRST;

  taosArrayPush(pList, &bi);

  return pList;
}

H
Haojun Liao 已提交
2765
int32_t dumpQueryTableCond(const SQueryTableDataCond* src, SQueryTableDataCond* dst) {
dengyihao's avatar
opt mem  
dengyihao 已提交
2766 2767 2768 2769 2770 2771 2772
  memcpy((void*)dst, (void*)src, sizeof(SQueryTableDataCond));
  dst->colList = taosMemoryCalloc(src->numOfCols, sizeof(SColumnInfo));
  for (int i = 0; i < src->numOfCols; i++) {
    dst->colList[i] = src->colList[i];
  }
  return 0;
}
H
Haojun Liao 已提交
2773

2774
int32_t startGroupTableMergeScan(SOperatorInfo* pOperator) {
2775 2776 2777
  STableMergeScanInfo* pInfo = pOperator->info;
  SExecTaskInfo*       pTaskInfo = pOperator->pTaskInfo;

S
slzhou 已提交
2778
  {
2779
    size_t  numOfTables = tableListGetSize(pInfo->base.pTableListInfo);
S
slzhou 已提交
2780
    int32_t i = pInfo->tableStartIndex + 1;
H
Haojun Liao 已提交
2781
    for (; i < numOfTables; ++i) {
2782
      STableKeyInfo* tableKeyInfo = tableListGetInfo(pInfo->base.pTableListInfo, i);
S
slzhou 已提交
2783 2784 2785 2786 2787 2788
      if (tableKeyInfo->groupId != pInfo->groupId) {
        break;
      }
    }
    pInfo->tableEndIndex = i - 1;
  }
2789

S
slzhou 已提交
2790 2791
  int32_t tableStartIdx = pInfo->tableStartIndex;
  int32_t tableEndIdx = pInfo->tableEndIndex;
2792

H
Haojun Liao 已提交
2793
  pInfo->base.dataReader = NULL;
2794

2795 2796
  // todo the total available buffer should be determined by total capacity of buffer of this task.
  // the additional one is reserved for merge result
S
slzhou 已提交
2797
  pInfo->sortBufSize = pInfo->bufPageSize * (tableEndIdx - tableStartIdx + 1 + 1);
2798
  int32_t numOfBufPage = pInfo->sortBufSize / pInfo->bufPageSize;
L
Liu Jicong 已提交
2799 2800
  pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, SORT_MULTISOURCE_MERGE, pInfo->bufPageSize, numOfBufPage,
                                             pInfo->pSortInputBlock, pTaskInfo->id.str);
2801

dengyihao's avatar
dengyihao 已提交
2802
  tsortSetFetchRawDataFp(pInfo->pSortHandle, getTableDataBlockImpl, NULL, NULL);
dengyihao's avatar
opt mem  
dengyihao 已提交
2803 2804 2805 2806 2807 2808

  // one table has one data block
  int32_t numOfTable = tableEndIdx - tableStartIdx + 1;
  pInfo->queryConds = taosArrayInit(numOfTable, sizeof(SQueryTableDataCond));

  for (int32_t i = 0; i < numOfTable; ++i) {
2809 2810 2811
    STableMergeScanSortSourceParam param = {0};
    param.readerIdx = i;
    param.pOperator = pOperator;
D
dapan1121 已提交
2812
    param.multiReader = (numOfTable <= MULTI_READER_MAX_TABLE_NUM) ? true : false;
2813
    param.inputBlock = createOneDataBlock(pInfo->pResBlock, false);
H
Haojun Liao 已提交
2814 2815
    blockDataEnsureCapacity(param.inputBlock, pOperator->resultInfo.capacity);

2816
    taosArrayPush(pInfo->sortSourceParams, &param);
dengyihao's avatar
opt mem  
dengyihao 已提交
2817 2818

    SQueryTableDataCond cond;
H
Haojun Liao 已提交
2819
    dumpQueryTableCond(&pInfo->base.cond, &cond);
dengyihao's avatar
opt mem  
dengyihao 已提交
2820
    taosArrayPush(pInfo->queryConds, &cond);
2821 2822
  }

dengyihao's avatar
opt mem  
dengyihao 已提交
2823
  for (int32_t i = 0; i < numOfTable; ++i) {
2824
    SSortSource*                    ps = taosMemoryCalloc(1, sizeof(SSortSource));
2825
    STableMergeScanSortSourceParam* param = taosArrayGet(pInfo->sortSourceParams, i);
2826
    ps->param = param;
2827
    ps->onlyRef = true;
2828 2829 2830 2831 2832 2833
    tsortAddSource(pInfo->pSortHandle, ps);
  }

  int32_t code = tsortOpen(pInfo->pSortHandle);

  if (code != TSDB_CODE_SUCCESS) {
2834
    T_LONG_JMP(pTaskInfo->env, terrno);
2835 2836
  }

2837 2838 2839 2840 2841 2842 2843
  return TSDB_CODE_SUCCESS;
}

int32_t stopGroupTableMergeScan(SOperatorInfo* pOperator) {
  STableMergeScanInfo* pInfo = pOperator->info;
  SExecTaskInfo*       pTaskInfo = pOperator->pTaskInfo;

dengyihao's avatar
dengyihao 已提交
2844
  int32_t numOfTable = taosArrayGetSize(pInfo->queryConds);
2845

2846 2847 2848 2849 2850 2851 2852
  SSortExecInfo sortExecInfo = tsortGetSortExecInfo(pInfo->pSortHandle);
  pInfo->sortExecInfo.sortMethod = sortExecInfo.sortMethod;
  pInfo->sortExecInfo.sortBuffer = sortExecInfo.sortBuffer;
  pInfo->sortExecInfo.loops += sortExecInfo.loops;
  pInfo->sortExecInfo.readBytes += sortExecInfo.readBytes;
  pInfo->sortExecInfo.writeBytes += sortExecInfo.writeBytes;

dengyihao's avatar
dengyihao 已提交
2853
  for (int32_t i = 0; i < numOfTable; ++i) {
2854 2855
    STableMergeScanSortSourceParam* param = taosArrayGet(pInfo->sortSourceParams, i);
    blockDataDestroy(param->inputBlock);
D
dapan1121 已提交
2856 2857
    tsdbReaderClose(param->dataReader);
    param->dataReader = NULL;
2858
  }
2859 2860
  taosArrayClear(pInfo->sortSourceParams);

2861
  tsortDestroySortHandle(pInfo->pSortHandle);
dengyihao's avatar
dengyihao 已提交
2862
  pInfo->pSortHandle = NULL;
2863

dengyihao's avatar
opt mem  
dengyihao 已提交
2864 2865 2866
  for (int32_t i = 0; i < taosArrayGetSize(pInfo->queryConds); i++) {
    SQueryTableDataCond* cond = taosArrayGet(pInfo->queryConds, i);
    taosMemoryFree(cond->colList);
2867
  }
dengyihao's avatar
opt mem  
dengyihao 已提交
2868 2869 2870
  taosArrayDestroy(pInfo->queryConds);
  pInfo->queryConds = NULL;

2871
  resetLimitInfoForNextGroup(&pInfo->limitInfo);
2872 2873 2874
  return TSDB_CODE_SUCCESS;
}

2875 2876
// all data produced by this function only belongs to one group
// slimit/soffset does not need to be concerned here, since this function only deal with data within one group.
L
Liu Jicong 已提交
2877 2878
SSDataBlock* getSortedTableMergeScanBlockData(SSortHandle* pHandle, SSDataBlock* pResBlock, int32_t capacity,
                                              SOperatorInfo* pOperator) {
2879 2880 2881
  STableMergeScanInfo* pInfo = pOperator->info;
  SExecTaskInfo*       pTaskInfo = pOperator->pTaskInfo;

2882
  blockDataCleanup(pResBlock);
2883 2884

  while (1) {
2885
    STupleHandle* pTupleHandle = tsortNextTuple(pHandle);
2886 2887 2888 2889
    if (pTupleHandle == NULL) {
      break;
    }

2890 2891
    appendOneRowToDataBlock(pResBlock, pTupleHandle);
    if (pResBlock->info.rows >= capacity) {
2892 2893 2894 2895
      break;
    }
  }

2896
  bool limitReached = applyLimitOffset(&pInfo->limitInfo, pResBlock, pTaskInfo);
D
dapan1121 已提交
2897
  qDebug("%s get sorted row block, rows:%" PRId64 ", limit:%" PRId64, GET_TASKID(pTaskInfo), pResBlock->info.rows,
2898
         pInfo->limitInfo.numOfOutputRows);
2899

2900
  return (pResBlock->info.rows > 0) ? pResBlock : NULL;
2901 2902 2903 2904 2905 2906 2907 2908 2909 2910 2911 2912
}

SSDataBlock* doTableMergeScan(SOperatorInfo* pOperator) {
  if (pOperator->status == OP_EXEC_DONE) {
    return NULL;
  }

  SExecTaskInfo*       pTaskInfo = pOperator->pTaskInfo;
  STableMergeScanInfo* pInfo = pOperator->info;

  int32_t code = pOperator->fpSet._openFn(pOperator);
  if (code != TSDB_CODE_SUCCESS) {
2913
    T_LONG_JMP(pTaskInfo->env, code);
2914
  }
2915

2916
  size_t tableListSize = tableListGetSize(pInfo->base.pTableListInfo);
S
slzhou 已提交
2917 2918
  if (!pInfo->hasGroupId) {
    pInfo->hasGroupId = true;
2919

S
slzhou 已提交
2920
    if (tableListSize == 0) {
H
Haojun Liao 已提交
2921
      setOperatorCompleted(pOperator);
2922 2923
      return NULL;
    }
S
slzhou 已提交
2924
    pInfo->tableStartIndex = 0;
2925
    pInfo->groupId = ((STableKeyInfo*)tableListGetInfo(pInfo->base.pTableListInfo, pInfo->tableStartIndex))->groupId;
2926 2927
    startGroupTableMergeScan(pOperator);
  }
2928

S
slzhou 已提交
2929 2930
  SSDataBlock* pBlock = NULL;
  while (pInfo->tableStartIndex < tableListSize) {
2931 2932 2933 2934
    if (isTaskKilled(pTaskInfo)) {
      T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
    }

L
Liu Jicong 已提交
2935 2936
    pBlock = getSortedTableMergeScanBlockData(pInfo->pSortHandle, pInfo->pResBlock, pOperator->resultInfo.capacity,
                                              pOperator);
S
slzhou 已提交
2937
    if (pBlock != NULL) {
H
Haojun Liao 已提交
2938
      pBlock->info.id.groupId = pInfo->groupId;
S
slzhou 已提交
2939 2940 2941
      pOperator->resultInfo.totalRows += pBlock->info.rows;
      return pBlock;
    } else {
2942
      // Data of this group are all dumped, let's try the next group
S
slzhou 已提交
2943 2944
      stopGroupTableMergeScan(pOperator);
      if (pInfo->tableEndIndex >= tableListSize - 1) {
H
Haojun Liao 已提交
2945
        setOperatorCompleted(pOperator);
S
slzhou 已提交
2946 2947
        break;
      }
2948

S
slzhou 已提交
2949
      pInfo->tableStartIndex = pInfo->tableEndIndex + 1;
2950
      pInfo->groupId = tableListGetInfo(pInfo->base.pTableListInfo, pInfo->tableStartIndex)->groupId;
S
slzhou 已提交
2951
      startGroupTableMergeScan(pOperator);
X
Xiaoyu Wang 已提交
2952
      resetLimitInfoForNextGroup(&pInfo->limitInfo);
S
slzhou 已提交
2953
    }
wmmhello's avatar
wmmhello 已提交
2954 2955
  }

2956 2957 2958
  return pBlock;
}

2959
void destroyTableMergeScanOperatorInfo(void* param) {
2960
  STableMergeScanInfo* pTableScanInfo = (STableMergeScanInfo*)param;
H
Haojun Liao 已提交
2961
  cleanupQueryTableDataCond(&pTableScanInfo->base.cond);
2962

dengyihao's avatar
dengyihao 已提交
2963 2964 2965
  int32_t numOfTable = taosArrayGetSize(pTableScanInfo->queryConds);

  for (int32_t i = 0; i < numOfTable; i++) {
H
Haojun Liao 已提交
2966 2967
    STableMergeScanSortSourceParam* p = taosArrayGet(pTableScanInfo->sortSourceParams, i);
    blockDataDestroy(p->inputBlock);
D
dapan1121 已提交
2968 2969
    tsdbReaderClose(p->dataReader);
    p->dataReader = NULL;
2970
  }
H
Haojun Liao 已提交
2971

D
dapan1121 已提交
2972 2973 2974
  tsdbReaderClose(pTableScanInfo->base.dataReader);
  pTableScanInfo->base.dataReader = NULL;

2975
  taosArrayDestroy(pTableScanInfo->sortSourceParams);
dengyihao's avatar
dengyihao 已提交
2976 2977
  tsortDestroySortHandle(pTableScanInfo->pSortHandle);
  pTableScanInfo->pSortHandle = NULL;
2978

dengyihao's avatar
opt mem  
dengyihao 已提交
2979 2980 2981
  for (int i = 0; i < taosArrayGetSize(pTableScanInfo->queryConds); i++) {
    SQueryTableDataCond* pCond = taosArrayGet(pTableScanInfo->queryConds, i);
    taosMemoryFree(pCond->colList);
2982 2983
  }

2984 2985
  taosArrayDestroy(pTableScanInfo->queryConds);
  destroyTableScanBase(&pTableScanInfo->base);
2986 2987 2988 2989 2990

  pTableScanInfo->pResBlock = blockDataDestroy(pTableScanInfo->pResBlock);
  pTableScanInfo->pSortInputBlock = blockDataDestroy(pTableScanInfo->pSortInputBlock);

  taosArrayDestroy(pTableScanInfo->pSortInfo);
D
dapan1121 已提交
2991
  taosMemoryFreeClear(param);
2992 2993 2994 2995
}

int32_t getTableMergeScanExplainExecInfo(SOperatorInfo* pOptr, void** pOptrExplain, uint32_t* len) {
  ASSERT(pOptr != NULL);
2996 2997
  // TODO: merge these two info into one struct
  STableMergeScanExecInfo* execInfo = taosMemoryCalloc(1, sizeof(STableMergeScanExecInfo));
L
Liu Jicong 已提交
2998
  STableMergeScanInfo*     pInfo = pOptr->info;
H
Haojun Liao 已提交
2999
  execInfo->blockRecorder = pInfo->base.readRecorder;
3000
  execInfo->sortExecInfo = pInfo->sortExecInfo;
3001 3002 3003

  *pOptrExplain = execInfo;
  *len = sizeof(STableMergeScanExecInfo);
L
Liu Jicong 已提交
3004

3005 3006 3007
  return TSDB_CODE_SUCCESS;
}

H
Haojun Liao 已提交
3008
SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanNode, SReadHandle* readHandle,
3009
                                                STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo) {
3010 3011 3012 3013 3014
  STableMergeScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STableMergeScanInfo));
  SOperatorInfo*       pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
  if (pInfo == NULL || pOperator == NULL) {
    goto _error;
  }
3015

3016 3017 3018
  SDataBlockDescNode* pDescNode = pTableScanNode->scan.node.pOutputDataBlockDesc;

  int32_t numOfCols = 0;
3019
  int32_t code = extractColMatchInfo(pTableScanNode->scan.pScanCols, pDescNode, &numOfCols, COL_MATCH_FROM_COL_ID,
H
Haojun Liao 已提交
3020
                                     &pInfo->base.matchInfo);
H
Haojun Liao 已提交
3021 3022 3023
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }
3024

H
Haojun Liao 已提交
3025
  code = initQueryTableDataCond(&pInfo->base.cond, pTableScanNode);
3026
  if (code != TSDB_CODE_SUCCESS) {
H
Haojun Liao 已提交
3027
    taosArrayDestroy(pInfo->base.matchInfo.pList);
3028 3029 3030 3031
    goto _error;
  }

  if (pTableScanNode->scan.pScanPseudoCols != NULL) {
H
Haojun Liao 已提交
3032
    SExprSupp* pSup = &pInfo->base.pseudoSup;
3033 3034
    pSup->pExprInfo = createExprInfo(pTableScanNode->scan.pScanPseudoCols, NULL, &pSup->numOfExprs);
    pSup->pCtx = createSqlFunctionCtx(pSup->pExprInfo, pSup->numOfExprs, &pSup->rowEntryInfoOffset);
3035 3036 3037 3038
  }

  pInfo->scanInfo = (SScanInfo){.numOfAsc = pTableScanNode->scanSeq[0], .numOfDesc = pTableScanNode->scanSeq[1]};

H
Haojun Liao 已提交
3039 3040 3041 3042 3043 3044
  pInfo->base.metaCache.pTableMetaEntryCache = taosLRUCacheInit(1024 * 128, -1, .5);
  if (pInfo->base.metaCache.pTableMetaEntryCache == NULL) {
    code = terrno;
    goto _error;
  }

H
Haojun Liao 已提交
3045 3046
  pInfo->base.dataBlockLoadFlag = FUNC_DATA_REQUIRED_DATA_LOAD;
  pInfo->base.scanFlag = MAIN_SCAN;
H
Haojun Liao 已提交
3047
  pInfo->base.readHandle = *readHandle;
3048 3049 3050

  pInfo->base.limitInfo.limit.limit = -1;
  pInfo->base.limitInfo.slimit.limit = -1;
3051
  pInfo->base.pTableListInfo = pTableListInfo;
H
Haojun Liao 已提交
3052

3053
  pInfo->sample.sampleRatio = pTableScanNode->ratio;
L
Liu Jicong 已提交
3054
  pInfo->sample.seed = taosGetTimestampSec();
H
Haojun Liao 已提交
3055 3056 3057 3058 3059 3060

  code = filterInitFromNode((SNode*)pTableScanNode->scan.node.pConditions, &pOperator->exprSupp.pFilterInfo, 0);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

H
Haojun Liao 已提交
3061
  initResultSizeInfo(&pOperator->resultInfo, 1024);
H
Haojun Liao 已提交
3062
  pInfo->pResBlock = createDataBlockFromDescNode(pDescNode);
H
Haojun Liao 已提交
3063 3064
  blockDataEnsureCapacity(pInfo->pResBlock, pOperator->resultInfo.capacity);

3065
  pInfo->sortSourceParams = taosArrayInit(64, sizeof(STableMergeScanSortSourceParam));
3066

H
Haojun Liao 已提交
3067
  pInfo->pSortInfo = generateSortByTsInfo(pInfo->base.matchInfo.pList, pInfo->base.cond.order);
3068
  pInfo->pSortInputBlock = createOneDataBlock(pInfo->pResBlock, false);
3069
  initLimitInfo(pTableScanNode->scan.node.pLimit, pTableScanNode->scan.node.pSlimit, &pInfo->limitInfo);
3070

dengyihao's avatar
dengyihao 已提交
3071
  int32_t  rowSize = pInfo->pResBlock->info.rowSize;
A
Alex Duan 已提交
3072 3073
  uint32_t nCols = taosArrayGetSize(pInfo->pResBlock->pDataBlock);
  pInfo->bufPageSize = getProperSortPageSize(rowSize, nCols);
3074

L
Liu Jicong 已提交
3075 3076
  setOperatorInfo(pOperator, "TableMergeScanOperator", QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN, false, OP_NOT_OPENED,
                  pInfo, pTaskInfo);
L
Liu Jicong 已提交
3077
  pOperator->exprSupp.numOfExprs = numOfCols;
3078

3079 3080
  pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doTableMergeScan, NULL, destroyTableMergeScanOperatorInfo,
                                         optrDefaultBufFn, getTableMergeScanExplainExecInfo);
3081 3082 3083 3084 3085 3086 3087 3088 3089
  pOperator->cost.openCost = 0;
  return pOperator;

_error:
  pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY;
  taosMemoryFree(pInfo);
  taosMemoryFree(pOperator);
  return NULL;
}
S
shenglian zhou 已提交
3090 3091 3092 3093

// ====================================================================================================================
// TableCountScanOperator
static SSDataBlock* doTableCountScan(SOperatorInfo* pOperator);
S
slzhou 已提交
3094
static void         destoryTableCountScanOperator(void* param);
S
slzhou 已提交
3095 3096 3097 3098 3099 3100
static void         buildVnodeGroupedStbTableCount(STableCountScanOperatorInfo* pInfo, STableCountScanSupp* pSupp,
                                                   SSDataBlock* pRes, char* dbName, tb_uid_t stbUid);
static void         buildVnodeGroupedNtbTableCount(STableCountScanOperatorInfo* pInfo, STableCountScanSupp* pSupp,
                                                   SSDataBlock* pRes, char* dbName);
static void         buildVnodeFilteredTbCount(SOperatorInfo* pOperator, STableCountScanOperatorInfo* pInfo,
                                              STableCountScanSupp* pSupp, SSDataBlock* pRes, char* dbName);
L
Liu Jicong 已提交
3101 3102
static void         buildVnodeGroupedTableCount(SOperatorInfo* pOperator, STableCountScanOperatorInfo* pInfo,
                                                STableCountScanSupp* pSupp, SSDataBlock* pRes, int32_t vgId, char* dbName);
S
slzhou 已提交
3103 3104 3105 3106 3107 3108 3109
static SSDataBlock* buildVnodeDbTableCount(SOperatorInfo* pOperator, STableCountScanOperatorInfo* pInfo,
                                           STableCountScanSupp* pSupp, SSDataBlock* pRes);
static void         buildSysDbGroupedTableCount(SOperatorInfo* pOperator, STableCountScanOperatorInfo* pInfo,
                                                STableCountScanSupp* pSupp, SSDataBlock* pRes, size_t infodbTableNum,
                                                size_t perfdbTableNum);
static void         buildSysDbFilterTableCount(SOperatorInfo* pOperator, STableCountScanSupp* pSupp, SSDataBlock* pRes,
                                               size_t infodbTableNum, size_t perfdbTableNum);
S
slzhou 已提交
3110 3111 3112 3113 3114 3115 3116 3117 3118 3119 3120 3121 3122 3123 3124 3125 3126 3127 3128 3129 3130 3131 3132 3133 3134 3135 3136 3137 3138 3139 3140 3141 3142 3143 3144 3145 3146 3147 3148 3149 3150 3151 3152 3153 3154 3155 3156 3157 3158 3159 3160 3161 3162 3163 3164 3165 3166 3167 3168 3169 3170
static const char*  GROUP_TAG_DB_NAME = "db_name";
static const char*  GROUP_TAG_STABLE_NAME = "stable_name";

int32_t tblCountScanGetGroupTagsSlotId(const SNodeList* scanCols, STableCountScanSupp* supp) {
  if (scanCols != NULL) {
    SNode* pNode = NULL;
    FOREACH(pNode, scanCols) {
      if (nodeType(pNode) != QUERY_NODE_TARGET) {
        return TSDB_CODE_QRY_SYS_ERROR;
      }
      STargetNode* targetNode = (STargetNode*)pNode;
      if (nodeType(targetNode->pExpr) != QUERY_NODE_COLUMN) {
        return TSDB_CODE_QRY_SYS_ERROR;
      }
      SColumnNode* colNode = (SColumnNode*)(targetNode->pExpr);
      if (strcmp(colNode->colName, GROUP_TAG_DB_NAME) == 0) {
        supp->dbNameSlotId = targetNode->slotId;
      } else if (strcmp(colNode->colName, GROUP_TAG_STABLE_NAME) == 0) {
        supp->stbNameSlotId = targetNode->slotId;
      }
    }
  }
  return TSDB_CODE_SUCCESS;
}

int32_t tblCountScanGetCountSlotId(const SNodeList* pseudoCols, STableCountScanSupp* supp) {
  if (pseudoCols != NULL) {
    SNode* pNode = NULL;
    FOREACH(pNode, pseudoCols) {
      if (nodeType(pNode) != QUERY_NODE_TARGET) {
        return TSDB_CODE_QRY_SYS_ERROR;
      }
      STargetNode* targetNode = (STargetNode*)pNode;
      if (nodeType(targetNode->pExpr) != QUERY_NODE_FUNCTION) {
        return TSDB_CODE_QRY_SYS_ERROR;
      }
      SFunctionNode* funcNode = (SFunctionNode*)(targetNode->pExpr);
      if (funcNode->funcType == FUNCTION_TYPE_TABLE_COUNT) {
        supp->tbCountSlotId = targetNode->slotId;
      }
    }
  }
  return TSDB_CODE_SUCCESS;
}

int32_t tblCountScanGetInputs(SNodeList* groupTags, SName* tableName, STableCountScanSupp* supp) {
  if (groupTags != NULL) {
    SNode* pNode = NULL;
    FOREACH(pNode, groupTags) {
      if (nodeType(pNode) != QUERY_NODE_COLUMN) {
        return TSDB_CODE_QRY_SYS_ERROR;
      }
      SColumnNode* colNode = (SColumnNode*)pNode;
      if (strcmp(colNode->colName, GROUP_TAG_DB_NAME) == 0) {
        supp->groupByDbName = true;
      }
      if (strcmp(colNode->colName, GROUP_TAG_STABLE_NAME) == 0) {
        supp->groupByStbName = true;
      }
    }
  } else {
H
Haojun Liao 已提交
3171 3172
    tstrncpy(supp->dbNameFilter, tNameGetDbNameP(tableName), TSDB_DB_NAME_LEN);
    tstrncpy(supp->stbNameFilter, tNameGetTableName(tableName), TSDB_TABLE_NAME_LEN);
S
slzhou 已提交
3173 3174 3175 3176 3177 3178 3179 3180 3181 3182 3183 3184 3185 3186 3187 3188 3189 3190 3191 3192 3193 3194 3195 3196 3197 3198 3199 3200
  }
  return TSDB_CODE_SUCCESS;
}

int32_t getTableCountScanSupp(SNodeList* groupTags, SName* tableName, SNodeList* scanCols, SNodeList* pseudoCols,
                              STableCountScanSupp* supp, SExecTaskInfo* taskInfo) {
  int32_t code = 0;
  code = tblCountScanGetInputs(groupTags, tableName, supp);
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s get table count scan supp. get inputs error", GET_TASKID(taskInfo));
    return code;
  }
  supp->dbNameSlotId = -1;
  supp->stbNameSlotId = -1;
  supp->tbCountSlotId = -1;

  code = tblCountScanGetGroupTagsSlotId(scanCols, supp);
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s get table count scan supp. get group tags slot id error", GET_TASKID(taskInfo));
    return code;
  }
  code = tblCountScanGetCountSlotId(pseudoCols, supp);
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s get table count scan supp. get count error", GET_TASKID(taskInfo));
    return code;
  }
  return code;
}
S
shenglian zhou 已提交
3201

S
slzhou 已提交
3202
SOperatorInfo* createTableCountScanOperatorInfo(SReadHandle* readHandle, STableCountScanPhysiNode* pTblCountScanNode,
S
shenglian zhou 已提交
3203 3204 3205
                                                SExecTaskInfo* pTaskInfo) {
  int32_t code = TSDB_CODE_SUCCESS;

S
slzhou 已提交
3206
  SScanPhysiNode*              pScanNode = &pTblCountScanNode->scan;
S
slzhou 已提交
3207
  STableCountScanOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(STableCountScanOperatorInfo));
S
slzhou 已提交
3208
  SOperatorInfo*               pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
S
shenglian zhou 已提交
3209 3210 3211 3212 3213 3214 3215 3216 3217

  if (!pInfo || !pOperator) {
    goto _error;
  }

  pInfo->readHandle = *readHandle;

  SDataBlockDescNode* pDescNode = pScanNode->node.pOutputDataBlockDesc;
  initResultSizeInfo(&pOperator->resultInfo, 1);
3218
  pInfo->pRes = createDataBlockFromDescNode(pDescNode);
S
shenglian zhou 已提交
3219 3220
  blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity);

S
slzhou 已提交
3221 3222 3223
  getTableCountScanSupp(pTblCountScanNode->pGroupTags, &pTblCountScanNode->scan.tableName,
                        pTblCountScanNode->scan.pScanCols, pTblCountScanNode->scan.pScanPseudoCols, &pInfo->supp,
                        pTaskInfo);
S
shenglian zhou 已提交
3224 3225 3226

  setOperatorInfo(pOperator, "TableCountScanOperator", QUERY_NODE_PHYSICAL_PLAN_TABLE_COUNT_SCAN, false, OP_NOT_OPENED,
                  pInfo, pTaskInfo);
L
Liu Jicong 已提交
3227 3228
  pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doTableCountScan, NULL, destoryTableCountScanOperator,
                                         optrDefaultBufFn, NULL);
S
shenglian zhou 已提交
3229 3230 3231 3232 3233 3234 3235 3236 3237 3238 3239
  return pOperator;

_error:
  if (pInfo != NULL) {
    destoryTableCountScanOperator(pInfo);
  }
  taosMemoryFreeClear(pOperator);
  pTaskInfo->code = code;
  return NULL;
}

S
slzhou 已提交
3240 3241 3242
void fillTableCountScanDataBlock(STableCountScanSupp* pSupp, char* dbName, char* stbName, int64_t count,
                                 SSDataBlock* pRes) {
  if (pSupp->dbNameSlotId != -1) {
3243
    ASSERT(strlen(dbName));
S
slzhou 已提交
3244
    SColumnInfoData* colInfoData = taosArrayGet(pRes->pDataBlock, pSupp->dbNameSlotId);
H
Haojun Liao 已提交
3245 3246 3247 3248

    char varDbName[TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
    tstrncpy(varDataVal(varDbName), dbName, TSDB_DB_NAME_LEN);

S
slzhou 已提交
3249
    varDataSetLen(varDbName, strlen(dbName));
3250
    colDataSetVal(colInfoData, 0, varDbName, false);
S
slzhou 已提交
3251 3252 3253 3254
  }

  if (pSupp->stbNameSlotId != -1) {
    SColumnInfoData* colInfoData = taosArrayGet(pRes->pDataBlock, pSupp->stbNameSlotId);
3255
    if (strlen(stbName) != 0) {
S
slzhou 已提交
3256
      char varStbName[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
H
Haojun Liao 已提交
3257
      strncpy(varDataVal(varStbName), stbName, TSDB_TABLE_NAME_LEN);
3258
      varDataSetLen(varStbName, strlen(stbName));
3259
      colDataSetVal(colInfoData, 0, varStbName, false);
3260
    } else {
3261
      colDataSetNULL(colInfoData, 0);
3262
    }
S
slzhou 已提交
3263 3264 3265
  }

  if (pSupp->tbCountSlotId != -1) {
S
slzhou 已提交
3266
    SColumnInfoData* colInfoData = taosArrayGet(pRes->pDataBlock, pSupp->tbCountSlotId);
3267
    colDataSetVal(colInfoData, 0, (char*)&count, false);
S
slzhou 已提交
3268 3269 3270 3271
  }
  pRes->info.rows = 1;
}

S
slzhou 已提交
3272
static SSDataBlock* buildSysDbTableCount(SOperatorInfo* pOperator, STableCountScanOperatorInfo* pInfo) {
S
slzhou 已提交
3273 3274 3275
  STableCountScanSupp* pSupp = &pInfo->supp;
  SSDataBlock*         pRes = pInfo->pRes;

S
slzhou 已提交
3276
  size_t infodbTableNum;
S
slzhou 已提交
3277
  getInfosDbMeta(NULL, &infodbTableNum);
S
slzhou 已提交
3278
  size_t perfdbTableNum;
S
slzhou 已提交
3279 3280
  getPerfDbMeta(NULL, &perfdbTableNum);

D
dapan1121 已提交
3281
  if (pSupp->groupByDbName || pSupp->groupByStbName) {
S
slzhou 已提交
3282
    buildSysDbGroupedTableCount(pOperator, pInfo, pSupp, pRes, infodbTableNum, perfdbTableNum);
S
slzhou 已提交
3283 3284
    return (pRes->info.rows > 0) ? pRes : NULL;
  } else {
S
slzhou 已提交
3285
    buildSysDbFilterTableCount(pOperator, pSupp, pRes, infodbTableNum, perfdbTableNum);
S
slzhou 已提交
3286 3287 3288 3289
    return (pRes->info.rows > 0) ? pRes : NULL;
  }
}

S
slzhou 已提交
3290 3291 3292 3293 3294 3295 3296 3297 3298 3299 3300 3301 3302 3303 3304 3305
static void buildSysDbFilterTableCount(SOperatorInfo* pOperator, STableCountScanSupp* pSupp, SSDataBlock* pRes,
                                       size_t infodbTableNum, size_t perfdbTableNum) {
  if (strcmp(pSupp->dbNameFilter, TSDB_INFORMATION_SCHEMA_DB) == 0) {
    fillTableCountScanDataBlock(pSupp, TSDB_INFORMATION_SCHEMA_DB, "", infodbTableNum, pRes);
  } else if (strcmp(pSupp->dbNameFilter, TSDB_PERFORMANCE_SCHEMA_DB) == 0) {
    fillTableCountScanDataBlock(pSupp, TSDB_PERFORMANCE_SCHEMA_DB, "", perfdbTableNum, pRes);
  } else if (strlen(pSupp->dbNameFilter) == 0) {
    fillTableCountScanDataBlock(pSupp, "", "", infodbTableNum + perfdbTableNum, pRes);
  }
  setOperatorCompleted(pOperator);
}

static void buildSysDbGroupedTableCount(SOperatorInfo* pOperator, STableCountScanOperatorInfo* pInfo,
                                        STableCountScanSupp* pSupp, SSDataBlock* pRes, size_t infodbTableNum,
                                        size_t perfdbTableNum) {
  if (pInfo->currGrpIdx == 0) {
D
dapan1121 已提交
3306 3307 3308 3309 3310 3311
    uint64_t groupId = 0;
    if (pSupp->groupByDbName) {
      groupId = calcGroupId(TSDB_INFORMATION_SCHEMA_DB, strlen(TSDB_INFORMATION_SCHEMA_DB));
    } else {
      groupId = calcGroupId("", 0);
    }
X
Xiaoyu Wang 已提交
3312

S
slzhou 已提交
3313 3314 3315
    pRes->info.id.groupId = groupId;
    fillTableCountScanDataBlock(pSupp, TSDB_INFORMATION_SCHEMA_DB, "", infodbTableNum, pRes);
  } else if (pInfo->currGrpIdx == 1) {
D
dapan1121 已提交
3316 3317 3318 3319 3320 3321 3322
    uint64_t groupId = 0;
    if (pSupp->groupByDbName) {
      groupId = calcGroupId(TSDB_PERFORMANCE_SCHEMA_DB, strlen(TSDB_PERFORMANCE_SCHEMA_DB));
    } else {
      groupId = calcGroupId("", 0);
    }

S
slzhou 已提交
3323 3324 3325 3326 3327 3328 3329 3330
    pRes->info.id.groupId = groupId;
    fillTableCountScanDataBlock(pSupp, TSDB_PERFORMANCE_SCHEMA_DB, "", perfdbTableNum, pRes);
  } else {
    setOperatorCompleted(pOperator);
  }
  pInfo->currGrpIdx++;
}

S
shenglian zhou 已提交
3331
static SSDataBlock* doTableCountScan(SOperatorInfo* pOperator) {
S
slzhou 已提交
3332 3333 3334 3335
  SExecTaskInfo*               pTaskInfo = pOperator->pTaskInfo;
  STableCountScanOperatorInfo* pInfo = pOperator->info;
  STableCountScanSupp*         pSupp = &pInfo->supp;
  SSDataBlock*                 pRes = pInfo->pRes;
S
slzhou 已提交
3336
  blockDataCleanup(pRes);
3337

S
slzhou 已提交
3338 3339 3340
  if (pOperator->status == OP_EXEC_DONE) {
    return NULL;
  }
S
slzhou 已提交
3341
  if (pInfo->readHandle.mnd != NULL) {
S
slzhou 已提交
3342
    return buildSysDbTableCount(pOperator, pInfo);
S
slzhou 已提交
3343
  }
S
slzhou 已提交
3344

S
slzhou 已提交
3345 3346 3347 3348 3349
  return buildVnodeDbTableCount(pOperator, pInfo, pSupp, pRes);
}

static SSDataBlock* buildVnodeDbTableCount(SOperatorInfo* pOperator, STableCountScanOperatorInfo* pInfo,
                                           STableCountScanSupp* pSupp, SSDataBlock* pRes) {
S
slzhou 已提交
3350 3351
  const char* db = NULL;
  int32_t     vgId = 0;
S
slzhou 已提交
3352
  char        dbName[TSDB_DB_NAME_LEN] = {0};
S
slzhou 已提交
3353

S
slzhou 已提交
3354 3355 3356 3357 3358 3359
  // get dbname
  vnodeGetInfo(pInfo->readHandle.vnode, &db, &vgId);
  SName sn = {0};
  tNameFromString(&sn, db, T_NAME_ACCT | T_NAME_DB);
  tNameGetDbName(&sn, dbName);

D
dapan1121 已提交
3360
  if (pSupp->groupByDbName || pSupp->groupByStbName) {
S
slzhou 已提交
3361 3362 3363 3364 3365 3366 3367 3368 3369 3370 3371 3372 3373 3374
    buildVnodeGroupedTableCount(pOperator, pInfo, pSupp, pRes, vgId, dbName);
  } else {
    buildVnodeFilteredTbCount(pOperator, pInfo, pSupp, pRes, dbName);
  }
  return pRes->info.rows > 0 ? pRes : NULL;
}

static void buildVnodeGroupedTableCount(SOperatorInfo* pOperator, STableCountScanOperatorInfo* pInfo,
                                        STableCountScanSupp* pSupp, SSDataBlock* pRes, int32_t vgId, char* dbName) {
  if (pSupp->groupByStbName) {
    if (pInfo->stbUidList == NULL) {
      pInfo->stbUidList = taosArrayInit(16, sizeof(tb_uid_t));
      if (vnodeGetStbIdList(pInfo->readHandle.vnode, 0, pInfo->stbUidList) < 0) {
        qError("vgId:%d, failed to get stb id list error: %s", vgId, terrstr());
S
slzhou 已提交
3375
      }
S
slzhou 已提交
3376 3377 3378 3379 3380 3381 3382 3383 3384 3385
    }
    if (pInfo->currGrpIdx < taosArrayGetSize(pInfo->stbUidList)) {
      tb_uid_t stbUid = *(tb_uid_t*)taosArrayGet(pInfo->stbUidList, pInfo->currGrpIdx);
      buildVnodeGroupedStbTableCount(pInfo, pSupp, pRes, dbName, stbUid);

      pInfo->currGrpIdx++;
    } else if (pInfo->currGrpIdx == taosArrayGetSize(pInfo->stbUidList)) {
      buildVnodeGroupedNtbTableCount(pInfo, pSupp, pRes, dbName);

      pInfo->currGrpIdx++;
S
slzhou 已提交
3386
    } else {
S
slzhou 已提交
3387
      setOperatorCompleted(pOperator);
S
slzhou 已提交
3388 3389
    }
  } else {
S
slzhou 已提交
3390 3391 3392 3393 3394 3395 3396 3397 3398 3399 3400 3401 3402 3403 3404 3405 3406
    uint64_t groupId = calcGroupId(dbName, strlen(dbName));
    pRes->info.id.groupId = groupId;
    int64_t dbTableCount = metaGetTbNum(pInfo->readHandle.meta);
    fillTableCountScanDataBlock(pSupp, dbName, "", dbTableCount, pRes);
    setOperatorCompleted(pOperator);
  }
}

static void buildVnodeFilteredTbCount(SOperatorInfo* pOperator, STableCountScanOperatorInfo* pInfo,
                                      STableCountScanSupp* pSupp, SSDataBlock* pRes, char* dbName) {
  if (strlen(pSupp->dbNameFilter) != 0) {
    if (strlen(pSupp->stbNameFilter) != 0) {
      tb_uid_t      uid = metaGetTableEntryUidByName(pInfo->readHandle.meta, pSupp->stbNameFilter);
      SMetaStbStats stats = {0};
      metaGetStbStats(pInfo->readHandle.meta, uid, &stats);
      int64_t ctbNum = stats.ctbNum;
      fillTableCountScanDataBlock(pSupp, dbName, pSupp->stbNameFilter, ctbNum, pRes);
S
slzhou 已提交
3407 3408 3409
    } else {
      int64_t tbNumVnode = metaGetTbNum(pInfo->readHandle.meta);
      fillTableCountScanDataBlock(pSupp, dbName, "", tbNumVnode, pRes);
S
slzhou 已提交
3410
    }
S
slzhou 已提交
3411 3412 3413
  } else {
    int64_t tbNumVnode = metaGetTbNum(pInfo->readHandle.meta);
    fillTableCountScanDataBlock(pSupp, dbName, "", tbNumVnode, pRes);
S
slzhou 已提交
3414
  }
S
slzhou 已提交
3415 3416 3417 3418 3419 3420
  setOperatorCompleted(pOperator);
}

static void buildVnodeGroupedNtbTableCount(STableCountScanOperatorInfo* pInfo, STableCountScanSupp* pSupp,
                                           SSDataBlock* pRes, char* dbName) {
  char fullStbName[TSDB_TABLE_FNAME_LEN] = {0};
D
dapan1121 已提交
3421 3422 3423
  if (pSupp->groupByDbName) {
    snprintf(fullStbName, TSDB_TABLE_FNAME_LEN, "%s.%s", dbName, "");
  }
X
Xiaoyu Wang 已提交
3424

S
slzhou 已提交
3425 3426 3427
  uint64_t groupId = calcGroupId(fullStbName, strlen(fullStbName));
  pRes->info.id.groupId = groupId;
  int64_t ntbNum = metaGetNtbNum(pInfo->readHandle.meta);
3428 3429 3430
  if (ntbNum != 0) {
    fillTableCountScanDataBlock(pSupp, dbName, "", ntbNum, pRes);
  }
S
slzhou 已提交
3431 3432 3433 3434 3435 3436 3437 3438
}

static void buildVnodeGroupedStbTableCount(STableCountScanOperatorInfo* pInfo, STableCountScanSupp* pSupp,
                                           SSDataBlock* pRes, char* dbName, tb_uid_t stbUid) {
  char stbName[TSDB_TABLE_NAME_LEN] = {0};
  metaGetTableSzNameByUid(pInfo->readHandle.meta, stbUid, stbName);

  char fullStbName[TSDB_TABLE_FNAME_LEN] = {0};
D
dapan1121 已提交
3439 3440 3441 3442 3443
  if (pSupp->groupByDbName) {
    snprintf(fullStbName, TSDB_TABLE_FNAME_LEN, "%s.%s", dbName, stbName);
  } else {
    snprintf(fullStbName, TSDB_TABLE_FNAME_LEN, "%s", stbName);
  }
X
Xiaoyu Wang 已提交
3444

S
slzhou 已提交
3445 3446 3447 3448 3449 3450 3451 3452
  uint64_t groupId = calcGroupId(fullStbName, strlen(fullStbName));
  pRes->info.id.groupId = groupId;

  SMetaStbStats stats = {0};
  metaGetStbStats(pInfo->readHandle.meta, stbUid, &stats);
  int64_t ctbNum = stats.ctbNum;

  fillTableCountScanDataBlock(pSupp, dbName, stbName, ctbNum, pRes);
S
shenglian zhou 已提交
3453 3454 3455
}

static void destoryTableCountScanOperator(void* param) {
S
slzhou 已提交
3456
  STableCountScanOperatorInfo* pTableCountScanInfo = param;
S
shenglian zhou 已提交
3457 3458
  blockDataDestroy(pTableCountScanInfo->pRes);

S
slzhou 已提交
3459
  taosArrayDestroy(pTableCountScanInfo->stbUidList);
S
shenglian zhou 已提交
3460 3461
  taosMemoryFreeClear(param);
}