scanoperator.c 123.1 KB
Newer Older
H
Haojun Liao 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

16
#include "executorimpl.h"
H
Haojun Liao 已提交
17
#include "filter.h"
18
#include "function.h"
19
#include "functionMgt.h"
L
Liu Jicong 已提交
20
#include "os.h"
H
Haojun Liao 已提交
21
#include "querynodes.h"
22
#include "systable.h"
H
Haojun Liao 已提交
23
#include "tname.h"
24
#include "ttime.h"
H
Haojun Liao 已提交
25 26 27 28 29 30 31 32 33 34

#include "tdatablock.h"
#include "tmsg.h"

#include "query.h"
#include "tcompare.h"
#include "thash.h"
#include "ttypes.h"

/* Mark a scan-info object (anything with a `scanFlag` member) as performing a reverse scan. */
#define SET_REVERSE_SCAN_FLAG(_info) ((_info)->scanFlag = REVERSE_SCAN)

/* Flip an order value in place: TSDB_ORDER_ASC becomes TSDB_ORDER_DESC, every other value becomes TSDB_ORDER_ASC. */
#define SWITCH_ORDER(n)              (((n) = ((n) != TSDB_ORDER_ASC) ? TSDB_ORDER_ASC : TSDB_ORDER_DESC))

H
Haojun Liao 已提交
37 38 39 40 41 42 43 44 45 46 47 48
// Execution statistics kept by the table merge scan operator.
typedef struct STableMergeScanExecInfo {
  SFileBlockLoadRecorder blockRecorder;  // block/row load counters (same recorder type as STableScanBase.readRecorder)
  SSortExecInfo          sortExecInfo;   // presumably statistics of the merge sort step — confirm
} STableMergeScanExecInfo;

// Parameter attached to one source of the sort used by the table merge scan.
// NOTE(review): field meanings below are inferred from their names; the merge-scan code is not in this chunk.
typedef struct STableMergeScanSortSourceParam {
  SOperatorInfo* pOperator;   // presumably the owning merge scan operator
  int32_t        readerIdx;   // presumably the index of the reader feeding this source
  uint64_t       uid;         // presumably the uid of the table read by this source
  SSDataBlock*   inputBlock;  // presumably the block buffer filled by this source
} STableMergeScanSortSourceParam;

L
Liu Jicong 已提交
49
static bool processBlockWithProbability(const SSampleExecInfo* pInfo);

/*
 * Sampling hook of the table scan: tells whether the current data block should be processed.
 *
 * Block sampling is currently disabled, so every block is processed and pInfo is not inspected.
 * The disabled probabilistic variant kept roughly one block out of every 1/sampleRatio blocks:
 *
 *   if (pInfo->sampleRatio == 1) return true;
 *   uint32_t val = taosRandR((uint32_t*)&pInfo->seed);
 *   return (val % ((uint32_t)(1 / pInfo->sampleRatio))) == 0;
 */
bool processBlockWithProbability(const SSampleExecInfo* pInfo) {
  (void)pInfo;  // sampling disabled: argument intentionally unused
  return true;
}

64
// Flip the scan order (ASC <-> DESC) recorded in each of the first numOfOutput function contexts.
static void switchCtxOrder(SqlFunctionCtx* pCtx, int32_t numOfOutput) {
  int32_t idx = 0;
  while (idx < numOfOutput) {
    SWITCH_ORDER(pCtx[idx].order);
    idx += 1;
  }
}

70 71 72 73 74 75 76 77 78
/*
 * Move tw to the next time window of pInterval in the direction given by `order`.
 *
 * For units other than natural months/years ('n' / 'y') the window shifts by one sliding step and keeps its
 * length of `interval`. For natural units the start is shifted by `interval` months (a year counts as 12
 * months) using local calendar time, and the end is set one time unit before the following window start.
 */
static void getNextTimeWindow(SInterval* pInterval, STimeWindow* tw, int32_t order) {
  int32_t factor = GET_FORWARD_DIRECTION_FACTOR(order);

  bool naturalUnit = (pInterval->intervalUnit == 'n') || (pInterval->intervalUnit == 'y');
  if (!naturalUnit) {
    tw->skey += pInterval->sliding * factor;
    tw->ekey = tw->skey + pInterval->interval - 1;
    return;
  }

  // length of one interval, expressed in months
  int64_t months = pInterval->interval;
  if (pInterval->intervalUnit == 'y') {
    months *= 12;
  }

  // break the current window start (in seconds) down into local calendar fields
  time_t    secs = (time_t)(convertTimePrecision(tw->skey, pInterval->precision, TSDB_TIME_PRECISION_MILLI) / 1000);
  struct tm tm;
  taosLocalTime(&secs, &tm);

  // new window start: shift by one interval in the scan direction
  int monthIdx = (int)(tm.tm_year * 12 + tm.tm_mon + months * factor);
  tm.tm_year = monthIdx / 12;
  tm.tm_mon = monthIdx % 12;
  tw->skey = convertTimePrecision((int64_t)taosMktime(&tm) * 1000LL, TSDB_TIME_PRECISION_MILLI, pInterval->precision);

  // new window end: one unit before the start of the window after it
  monthIdx = (int)(monthIdx + months);
  tm.tm_year = monthIdx / 12;
  tm.tm_mon = monthIdx % 12;
  tw->ekey =
      convertTimePrecision((int64_t)taosMktime(&tm) * 1000LL, TSDB_TIME_PRECISION_MILLI, pInterval->precision) - 1;
}

103
/*
 * Check whether a data block overlaps more than one time window of the upstream interval operator.
 *
 * Returns false when pInterval->interval is 0, i.e. the upstream operator is not an interval operator.
 * Otherwise the window containing the block's first key (in scan order) is computed. The block overlaps
 * several windows if it extends beyond that window, or if a following window (in scan order) still
 * starts within the block's time range.
 */
static bool overlapWithTimeWindow(SInterval* pInterval, SDataBlockInfo* pBlockInfo, int32_t order) {
  STimeWindow w = {0};

  // 0 by default, which means the upstream operator is not an interval operator.
  if (pInterval->interval == 0) {
    return false;
  }

  if (order == TSDB_ORDER_ASC) {
    w = getAlignQueryTimeWindow(pInterval, pInterval->precision, pBlockInfo->window.skey);
    ASSERT(w.ekey >= pBlockInfo->window.skey);

    // the block extends beyond the window that contains its first key
    if (w.ekey < pBlockInfo->window.ekey) {
      return true;
    }

    // later windows (e.g. with sliding < interval) may still start inside the block
    while (1) {
      getNextTimeWindow(pInterval, &w, order);
      if (w.skey > pBlockInfo->window.ekey) {
        break;
      }

      ASSERT(w.ekey > pBlockInfo->window.ekey);
      if (TMAX(w.skey, pBlockInfo->window.skey) <= pBlockInfo->window.ekey) {
        return true;
      }
    }
  } else {
    w = getAlignQueryTimeWindow(pInterval, pInterval->precision, pBlockInfo->window.ekey);
    ASSERT(w.skey <= pBlockInfo->window.ekey);

    // the block extends before the window that contains its last key
    if (w.skey > pBlockInfo->window.skey) {
      return true;
    }

    // earlier windows may still end inside the block
    while (1) {
      getNextTimeWindow(pInterval, &w, order);
      if (w.ekey < pBlockInfo->window.skey) {
        break;
      }

      // use the project-wide ASSERT, consistent with the ascending branch
      ASSERT(w.skey < pBlockInfo->window.skey);
      if (pBlockInfo->window.skey <= TMIN(w.ekey, pBlockInfo->window.ekey)) {
        return true;
      }
    }
  }

  return false;
}

154 155 156 157 158 159 160 161 162 163 164
/*
 * Find the intermediate aggregate result row kept by the upstream aggregate operator for groupId.
 *
 * Only table scan operators carry this information; NULL is returned for any other operator type, when the
 * group has no result row yet, or when the row's buffer page cannot be loaded. On success *pPage is the page
 * holding the row, which the caller should release with releaseBufPage().
 */
static SResultRow* getTableGroupOutputBuf(SOperatorInfo* pOperator, uint64_t groupId, SFilePage** pPage) {
  if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN) {
    return NULL;
  }

  int64_t keyBuf[2] = {0};
  SET_RES_WINDOW_KEY((char*)keyBuf, &groupId, sizeof(groupId), groupId);

  STableScanInfo*     pScanInfo = pOperator->info;
  SResultRowPosition* pPos = (SResultRowPosition*)tSimpleHashGet(pScanInfo->base.pdInfo.pAggSup->pResultRowHashTable,
                                                                 keyBuf, GET_RES_WINDOW_KEY_LEN(sizeof(groupId)));
  if (pPos != NULL) {
    *pPage = getBufPage(pScanInfo->base.pdInfo.pAggSup->pResultBuf, pPos->pageId);
    if (*pPage != NULL) {
      return (SResultRow*)((char*)(*pPage) + pPos->offset);
    }
  }

  return NULL;
}

/*
 * Use the intermediate aggregate results already computed for the block's group to decide whether the block
 * still needs to be loaded. When every aggregate function reports FUNC_DATA_REQUIRED_NOT_LOAD for the block's
 * time window, *status is set to FUNC_DATA_REQUIRED_NOT_LOAD; otherwise *status is left unchanged.
 * Always returns TSDB_CODE_SUCCESS.
 */
static int32_t doDynamicPruneDataBlock(SOperatorInfo* pOperator, SDataBlockInfo* pBlockInfo, uint32_t* status) {
  STableScanInfo* pScanInfo = pOperator->info;
  SExprSupp*      pSup = pScanInfo->base.pdInfo.pExprSup;
  if (pSup == NULL) {
    return TSDB_CODE_SUCCESS;
  }

  SFilePage*  pPage = NULL;
  SResultRow* pRow = getTableGroupOutputBuf(pOperator, pBlockInfo->id.groupId, &pPage);
  if (pRow == NULL) {
    return TSDB_CODE_SUCCESS;
  }

  // stop at the first function that still needs the block's data
  int32_t idx = 0;
  for (; idx < pSup->numOfExprs; ++idx) {
    SResultRowEntryInfo* pEntry = getResultEntryInfo(pRow, idx, pSup->rowEntryInfoOffset);
    int32_t              required = fmFuncDynDataRequired(pSup->pCtx[idx].functionId, pEntry, &pBlockInfo->window);
    if (required != FUNC_DATA_REQUIRED_NOT_LOAD) {
      break;
    }
  }

  // release buffer pages
  releaseBufPage(pScanInfo->base.pdInfo.pAggSup->pResultBuf, pPage);

  // all functions agreed the block can be skipped
  if (idx == pSup->numOfExprs) {
    *status = FUNC_DATA_REQUIRED_NOT_LOAD;
  }

  return TSDB_CODE_SUCCESS;
}

H
Haojun Liao 已提交
219
/*
 * Evaluate the filter against the block-level SMA (per-column aggregates) of a block.
 * Returns true (keep the block) when either the SMA or the filter is missing; otherwise returns the verdict of
 * filterRangeExecute(), where false lets the caller drop the whole block without loading it.
 */
static bool doFilterByBlockSMA(SFilterInfo* pFilterInfo, SColumnDataAgg** pColsAgg, int32_t numOfCols,
                               int32_t numOfRows) {
  if (pColsAgg != NULL && pFilterInfo != NULL) {
    return filterRangeExecute(pFilterInfo, pColsAgg, numOfCols, numOfRows);
  }

  return true;
}

H
Haojun Liao 已提交
229
/*
 * Retrieve the block-level SMA of the current block from the reader into pBlock.
 * Returns true only if every column has SMA information. A retrieval error aborts the task via T_LONG_JMP.
 */
static bool doLoadBlockSMA(STableScanBase* pTableScanInfo, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo) {
  bool    everyColHasSma = true;
  int32_t code = tsdbRetrieveDatablockSMA(pTableScanInfo->dataReader, pBlock, &everyColHasSma);
  if (code != TSDB_CODE_SUCCESS) {
    T_LONG_JMP(pTaskInfo->env, code);
  }

  return everyColHasSma;
}

H
Haojun Liao 已提交
242
/*
 * Fill the tag / tbname pseudo columns of pBlock for its first `rows` rows, if the scan has pseudo column
 * expressions. TSDB_CODE_PAR_TABLE_NOT_EXIST is tolerated; any other error aborts the task via T_LONG_JMP.
 * terrno is reset to 0 afterwards.
 */
static void doSetTagColumnData(STableScanBase* pTableScanInfo, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo,
                               int32_t rows) {
  SExprSupp* pSup = &pTableScanInfo->pseudoSup;
  if (pSup->numOfExprs <= 0) {
    return;
  }

  int32_t code = addTagPseudoColumnData(&pTableScanInfo->readHandle, pSup->pExprInfo, pSup->numOfExprs, pBlock, rows,
                                        GET_TASKID(pTaskInfo), &pTableScanInfo->metaCache);

  // ignore the table not exists error, since this table may have been dropped during the scan procedure.
  bool tolerated = (code == TSDB_CODE_SUCCESS) || (code == TSDB_CODE_PAR_TABLE_NOT_EXIST);
  if (!tolerated) {
    T_LONG_JMP(pTaskInfo->env, code);
  }

  // reset the error code.
  terrno = 0;
}

259
/*
 * Apply the remaining OFFSET and the LIMIT of pLimitInfo to pBlock, in place.
 *
 * Rows still covered by the offset are removed from the head of the block; if the offset is not exhausted by
 * this block, the whole block is emptied and false is returned. Rows beyond the limit are removed from the
 * tail. numOfOutputRows is advanced by the number of rows kept.
 * Returns true when the limit has been reached with this block.
 */
bool applyLimitOffset(SLimitInfo* pLimitInfo, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo) {
  SLimit*     pLimit = &pLimitInfo->limit;
  const char* id = GET_TASKID(pTaskInfo);

  if (pLimitInfo->remainOffset > 0) {
    if (pLimitInfo->remainOffset < pBlock->info.rows) {
      // the offset ends inside this block
      blockDataTrimFirstNRows(pBlock, pLimitInfo->remainOffset);
      pLimitInfo->remainOffset = 0;
    } else {
      // the whole block is consumed by the offset
      pLimitInfo->remainOffset -= pBlock->info.rows;
      blockDataEmpty(pBlock);
      qDebug("current block ignore due to offset, current:%" PRId64 ", %s", pLimitInfo->remainOffset, id);
      return false;
    }
  }

  bool limitReached =
      (pLimit->limit != -1) && (pLimit->limit <= (pLimitInfo->numOfOutputRows + pBlock->info.rows));
  if (limitReached) {
    // limit the output rows
    int32_t keep = (int32_t)(pLimit->limit - pLimitInfo->numOfOutputRows);
    blockDataKeepFirstNRows(pBlock, keep);
  }

  pLimitInfo->numOfOutputRows += pBlock->info.rows;

  if (limitReached) {
    qDebug("output limit %" PRId64 " has reached, %s", pLimit->limit, id);
  }
  return limitReached;
}

H
Haojun Liao 已提交
289
/*
 * Decide how much of the current data block has to be read, and read it.
 *
 * On return *status tells the caller what happened to the block:
 *  - FUNC_DATA_REQUIRED_FILTEROUT:  the block is discarded (load flag, block-SMA filter or dynamic prune);
 *  - FUNC_DATA_REQUIRED_NOT_LOAD:   only the tag/tbname pseudo columns were filled, for one row;
 *  - FUNC_DATA_REQUIRED_SMA_LOAD:   the block SMA was loaded instead of the data;
 *  - FUNC_DATA_REQUIRED_DATA_LOAD:  the data was loaded, filtered and trimmed by limit/offset.
 * The statistics in pTableScanInfo->readRecorder are updated along the way; each block's rows are counted in
 * totalRows once (for loaded blocks, the count after filtering/limit).
 *
 * Returns TSDB_CODE_SUCCESS, or terrno when the data block cannot be retrieved from the reader.
 */
static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanBase* pTableScanInfo, SSDataBlock* pBlock,
                             uint32_t* status) {
  SExecTaskInfo*          pTaskInfo = pOperator->pTaskInfo;
  SFileBlockLoadRecorder* pCost = &pTableScanInfo->readRecorder;

  pCost->totalBlocks += 1;
  pCost->totalRows += pBlock->info.rows;

  bool loadSMA = false;
  *status = pTableScanInfo->dataBlockLoadFlag;
  // a filter, or a block spanning several windows of the upstream interval operator, requires the actual data
  if (pOperator->exprSupp.pFilterInfo != NULL ||
      overlapWithTimeWindow(&pTableScanInfo->pdInfo.interval, &pBlock->info, pTableScanInfo->cond.order)) {
    (*status) = FUNC_DATA_REQUIRED_DATA_LOAD;
  }

  SDataBlockInfo* pBlockInfo = &pBlock->info;
  taosMemoryFreeClear(pBlock->pBlockAgg);

  if (*status == FUNC_DATA_REQUIRED_FILTEROUT) {
    qDebug("%s data block filter out, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo),
           pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows);
    pCost->filterOutBlocks += 1;
    // the rows of this block were already added to totalRows on entry; adding them again would count them twice
    return TSDB_CODE_SUCCESS;
  } else if (*status == FUNC_DATA_REQUIRED_NOT_LOAD) {
    qDebug("%s data block skipped, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo),
           pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows);
    doSetTagColumnData(pTableScanInfo, pBlock, pTaskInfo, 1);
    pCost->skipBlocks += 1;
    return TSDB_CODE_SUCCESS;
  } else if (*status == FUNC_DATA_REQUIRED_SMA_LOAD) {
    pCost->loadBlockStatis += 1;
    loadSMA = true;  // mark the operation of load sma;
    bool success = doLoadBlockSMA(pTableScanInfo, pBlock, pTaskInfo);
    if (success) {  // the SMA covers every column, so the data block itself is not needed
      qDebug("%s data block SMA loaded, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo),
             pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows);
      doSetTagColumnData(pTableScanInfo, pBlock, pTaskInfo, 1);
      return TSDB_CODE_SUCCESS;
    } else {  // data block statistics do not exist for every column, load the data block instead
      qDebug("%s failed to load SMA, since not all columns have SMA", GET_TASKID(pTaskInfo));
      *status = FUNC_DATA_REQUIRED_DATA_LOAD;
    }
  }

  ASSERT(*status == FUNC_DATA_REQUIRED_DATA_LOAD);

  // try to filter data block according to sma info
  if (pOperator->exprSupp.pFilterInfo != NULL && (!loadSMA)) {
    bool success = doLoadBlockSMA(pTableScanInfo, pBlock, pTaskInfo);
    if (success) {
      size_t size = taosArrayGetSize(pBlock->pDataBlock);
      bool   keep = doFilterByBlockSMA(pOperator->exprSupp.pFilterInfo, pBlock->pBlockAgg, size, pBlockInfo->rows);
      if (!keep) {
        qDebug("%s data block filter out by block SMA, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo),
               pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows);
        pCost->filterOutBlocks += 1;
        (*status) = FUNC_DATA_REQUIRED_FILTEROUT;

        return TSDB_CODE_SUCCESS;
      }
    }
  }

  // free the sma info, since it should not be involved in later computing process.
  taosMemoryFreeClear(pBlock->pBlockAgg);

  // try to filter data block according to current results
  doDynamicPruneDataBlock(pOperator, pBlockInfo, status);
  if (*status == FUNC_DATA_REQUIRED_NOT_LOAD) {
    qDebug("%s data block skipped due to dynamic prune, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo),
           pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows);
    pCost->skipBlocks += 1;

    *status = FUNC_DATA_REQUIRED_FILTEROUT;
    return TSDB_CODE_SUCCESS;
  }

  pCost->totalCheckedRows += pBlock->info.rows;
  pCost->loadBlocks += 1;

  SSDataBlock* p = tsdbRetrieveDataBlock(pTableScanInfo->dataReader, NULL);
  if (p == NULL) {
    return terrno;
  }

  ASSERT(p == pBlock);
  doSetTagColumnData(pTableScanInfo, pBlock, pTaskInfo, pBlock->info.rows);

  // restore the previous value: the rows are counted again below, after filtering and limit/offset
  pCost->totalRows -= pBlock->info.rows;

  if (pOperator->exprSupp.pFilterInfo != NULL) {
    int64_t st = taosGetTimestampUs();
    doFilter(pBlock, pOperator->exprSupp.pFilterInfo, &pTableScanInfo->matchInfo);

    double el = (taosGetTimestampUs() - st) / 1000.0;
    pTableScanInfo->readRecorder.filterTime += el;

    if (pBlock->info.rows == 0) {
      pCost->filterOutBlocks += 1;
      qDebug("%s data block filter out, brange:%" PRId64 "-%" PRId64 ", rows:%d, elapsed time:%.2f ms",
             GET_TASKID(pTaskInfo), pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows, el);
    } else {
      qDebug("%s data block filter applied, elapsed time:%.2f ms", GET_TASKID(pTaskInfo), el);
    }
  }

  bool limitReached = applyLimitOffset(&pTableScanInfo->limitInfo, pBlock, pTaskInfo);
  if (limitReached) {  // set operator flag is done
    setOperatorCompleted(pOperator);
  }

  pCost->totalRows += pBlock->info.rows;
  return TSDB_CODE_SUCCESS;
}

H
Haojun Liao 已提交
406
/*
 * Switch the scan described by pTableScanInfo to descending order: mark it as a reverse scan, flip the order of
 * the first numOfOutput function contexts, and swap the start/end keys of the query time window.
 */
static void prepareForDescendingScan(STableScanBase* pTableScanInfo, SqlFunctionCtx* pCtx, int32_t numOfOutput) {
  STimeWindow* pWin = &pTableScanInfo->cond.twindows;

  SET_REVERSE_SCAN_FLAG(pTableScanInfo);
  switchCtxOrder(pCtx, numOfOutput);

  pTableScanInfo->cond.order = TSDB_ORDER_DESC;
  TSWAP(pWin->skey, pWin->ekey);
}

415 416
typedef struct STableCachedVal {
  const char* pName;
417
  STag*       pTags;
418 419
} STableCachedVal;

420 421 422 423 424 425 426 427 428 429 430
// Release an STableCachedVal built by createTableCacheVal(), together with its name and tag copies.
// A NULL argument is ignored.
static void freeTableCachedVal(void* param) {
  STableCachedVal* pCached = param;
  if (pCached != NULL) {
    taosMemoryFree((void*)pCached->pName);
    taosMemoryFree(pCached->pTags);
    taosMemoryFree(pCached);
  }
}

H
Haojun Liao 已提交
431 432 433 434 435 436 437
static STableCachedVal* createTableCacheVal(const SMetaReader* pMetaReader) {
  STableCachedVal* pVal = taosMemoryMalloc(sizeof(STableCachedVal));
  pVal->pName = strdup(pMetaReader->me.name);
  pVal->pTags = NULL;

  // only child table has tag value
  if (pMetaReader->me.type == TSDB_CHILD_TABLE) {
438
    STag* pTag = (STag*)pMetaReader->me.ctbEntry.pTags;
H
Haojun Liao 已提交
439 440 441 442 443 444 445
    pVal->pTags = taosMemoryMalloc(pTag->len);
    memcpy(pVal->pTags, pTag, pTag->len);
  }

  return pVal;
}

446 447
// Deleter passed to taosLRUCacheInsert() for the table meta cache: the key needs no cleanup and the value is an
// STableCachedVal built by createTableCacheVal().
static void freeCachedMetaItem(const void* key, size_t keyLen, void* value) {
  (void)key;
  (void)keyLen;
  freeTableCachedVal(value);
}
448

449 450
int32_t addTagPseudoColumnData(SReadHandle* pHandle, const SExprInfo* pExpr, int32_t numOfExpr, SSDataBlock* pBlock,
                               int32_t rows, const char* idStr, STableMetaCacheInfo* pCache) {
451
  // currently only the tbname pseudo column
452
  if (numOfExpr <= 0) {
H
Haojun Liao 已提交
453
    return TSDB_CODE_SUCCESS;
454 455
  }

456 457
  int32_t code = 0;

458 459 460 461
  // backup the rows
  int32_t backupRows = pBlock->info.rows;
  pBlock->info.rows = rows;

462
  bool            freeReader = false;
463
  STableCachedVal val = {0};
464 465

  SMetaReader mr = {0};
466
  LRUHandle*  h = NULL;
467

468
  // 1. check if it is existed in meta cache
469
  if (pCache == NULL) {
470
    metaReaderInit(&mr, pHandle->meta, 0);
H
Haojun Liao 已提交
471
    code = metaGetTableEntryByUidCache(&mr, pBlock->info.id.uid);
472
    if (code != TSDB_CODE_SUCCESS) {
H
Haojun Liao 已提交
473
      if (terrno == TSDB_CODE_PAR_TABLE_NOT_EXIST) {
S
slzhou 已提交
474 475
        qWarn("failed to get table meta, table may have been dropped, uid:0x%" PRIx64 ", code:%s, %s",
              pBlock->info.id.uid, tstrerror(terrno), idStr);
H
Haojun Liao 已提交
476
      } else {
S
slzhou 已提交
477 478
        qError("failed to get table meta, uid:0x%" PRIx64 ", code:%s, %s", pBlock->info.id.uid, tstrerror(terrno),
               idStr);
H
Haojun Liao 已提交
479
      }
480 481 482 483 484
      metaReaderClear(&mr);
      return terrno;
    }

    metaReaderReleaseLock(&mr);
485

486 487
    val.pName = mr.me.name;
    val.pTags = (STag*)mr.me.ctbEntry.pTags;
488 489

    freeReader = true;
490
  } else {
491 492
    pCache->metaFetch += 1;

H
Haojun Liao 已提交
493
    h = taosLRUCacheLookup(pCache->pTableMetaEntryCache, &pBlock->info.id.uid, sizeof(pBlock->info.id.uid));
494 495
    if (h == NULL) {
      metaReaderInit(&mr, pHandle->meta, 0);
H
Haojun Liao 已提交
496
      code = metaGetTableEntryByUidCache(&mr, pBlock->info.id.uid);
497
      if (code != TSDB_CODE_SUCCESS) {
H
Haojun Liao 已提交
498
        if (terrno == TSDB_CODE_PAR_TABLE_NOT_EXIST) {
499
          qWarn("failed to get table meta, table may have been dropped, uid:0x%" PRIx64 ", code:%s, %s",
H
Haojun Liao 已提交
500
                pBlock->info.id.uid, tstrerror(terrno), idStr);
H
Haojun Liao 已提交
501
        } else {
H
Haojun Liao 已提交
502
          qError("failed to get table meta, uid:0x%" PRIx64 ", code:%s, %s", pBlock->info.id.uid, tstrerror(terrno),
503
                 idStr);
H
Haojun Liao 已提交
504
        }
505 506 507 508 509 510
        metaReaderClear(&mr);
        return terrno;
      }

      metaReaderReleaseLock(&mr);

H
Haojun Liao 已提交
511
      STableCachedVal* pVal = createTableCacheVal(&mr);
512

H
Haojun Liao 已提交
513
      val = *pVal;
514
      freeReader = true;
H
Haojun Liao 已提交
515

H
Haojun Liao 已提交
516
      int32_t ret = taosLRUCacheInsert(pCache->pTableMetaEntryCache, &pBlock->info.id.uid, sizeof(uint64_t), pVal,
517
                                       sizeof(STableCachedVal), freeCachedMetaItem, NULL, TAOS_LRU_PRIORITY_LOW);
518 519 520 521 522 523 524 525
      if (ret != TAOS_LRU_STATUS_OK) {
        qError("failed to put meta into lru cache, code:%d, %s", ret, idStr);
        freeTableCachedVal(pVal);
      }
    } else {
      pCache->cacheHit += 1;
      STableCachedVal* pVal = taosLRUCacheValue(pCache->pTableMetaEntryCache, h);
      val = *pVal;
H
Haojun Liao 已提交
526

H
Haojun Liao 已提交
527
      taosLRUCacheRelease(pCache->pTableMetaEntryCache, h, false);
528
    }
H
Haojun Liao 已提交
529

530 531
    qDebug("retrieve table meta from cache:%" PRIu64 ", hit:%" PRIu64 " miss:%" PRIu64 ", %s", pCache->metaFetch,
           pCache->cacheHit, (pCache->metaFetch - pCache->cacheHit), idStr);
H
Haojun Liao 已提交
532
  }
533

534 535
  for (int32_t j = 0; j < numOfExpr; ++j) {
    const SExprInfo* pExpr1 = &pExpr[j];
536
    int32_t          dstSlotId = pExpr1->base.resSchema.slotId;
537 538

    SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, dstSlotId);
D
dapan1121 已提交
539
    colInfoDataCleanup(pColInfoData, pBlock->info.rows);
540

541
    int32_t functionId = pExpr1->pExpr->_function.functionId;
542 543 544

    // this is to handle the tbname
    if (fmIsScanPseudoColumnFunc(functionId)) {
545
      setTbNameColData(pBlock, pColInfoData, functionId, val.pName);
546
    } else {  // these are tags
wmmhello's avatar
wmmhello 已提交
547
      STagVal tagVal = {0};
548 549
      tagVal.cid = pExpr1->base.pParam[0].pCol->colId;
      const char* p = metaGetTableTagVal(val.pTags, pColInfoData->info.type, &tagVal);
wmmhello's avatar
wmmhello 已提交
550

551 552 553 554
      char* data = NULL;
      if (pColInfoData->info.type != TSDB_DATA_TYPE_JSON && p != NULL) {
        data = tTagValToData((const STagVal*)p, false);
      } else {
wmmhello's avatar
wmmhello 已提交
555
        data = (char*)p;
wmmhello's avatar
wmmhello 已提交
556
      }
557

H
Haojun Liao 已提交
558 559 560
      bool isNullVal = (data == NULL) || (pColInfoData->info.type == TSDB_DATA_TYPE_JSON && tTagIsJsonNull(data));
      if (isNullVal) {
        colDataAppendNNULL(pColInfoData, 0, pBlock->info.rows);
H
Haojun Liao 已提交
561
      } else if (pColInfoData->info.type != TSDB_DATA_TYPE_JSON) {
H
Haojun Liao 已提交
562
        colDataAppendNItems(pColInfoData, 0, data, pBlock->info.rows);
H
Haojun Liao 已提交
563 564 565
        if (IS_VAR_DATA_TYPE(((const STagVal*)p)->type)) {
          taosMemoryFree(data);
        }
L
Liu Jicong 已提交
566
      } else {  // todo opt for json tag
H
Haojun Liao 已提交
567
        for (int32_t i = 0; i < pBlock->info.rows; ++i) {
H
Haojun Liao 已提交
568
          colDataAppend(pColInfoData, i, data, false);
H
Haojun Liao 已提交
569
        }
570 571 572 573
      }
    }
  }

574 575
  // restore the rows
  pBlock->info.rows = backupRows;
576 577 578 579
  if (freeReader) {
    metaReaderClear(&mr);
  }

H
Haojun Liao 已提交
580
  return TSDB_CODE_SUCCESS;
581 582
}

H
Haojun Liao 已提交
583
/*
 * Write into pColInfoData the result of scalar function functionId applied to the table name `name`, for
 * pBlock->info.rows rows. The name is handed to the function as a one-row VARCHAR column; if no scalar
 * callback exists for functionId, an error is logged and pColInfoData is left unchanged.
 */
void setTbNameColData(const SSDataBlock* pBlock, SColumnInfoData* pColInfoData, int32_t functionId, const char* name) {
  struct SScalarFuncExecFuncs fpSet = {0};
  fmGetScalarFuncExecFuncs(functionId, &fpSet);

  // wrap the table name into a single-row VARCHAR column
  char nameBuf[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
  STR_TO_VARSTR(nameBuf, name);

  SColumnInfoData nameCol = createColumnInfoData(TSDB_DATA_TYPE_VARCHAR, sizeof(nameBuf), 1);
  colInfoDataEnsureCapacity(&nameCol, 1, false);
  colDataAppend(&nameCol, 0, nameBuf, false);

  SScalarParam srcParam = {.numOfRows = pBlock->info.rows, .columnData = &nameCol};
  SScalarParam dstParam = {.columnData = pColInfoData};

  if (fpSet.process == NULL) {
    qError("failed to get the corresponding callback function, functionId:%d", functionId);
  } else {
    fpSet.process(&srcParam, 1, &dstParam);
  }

  colDataDestroy(&nameCol);
}

608
/*
 * Return the next data block of the current scan pass that survives loading and filtering, or NULL once the
 * reader has no more blocks. For every returned block the operator's cost/row statistics and the task's
 * stream last-status (uid and end key of the block) are updated. Aborts the task via T_LONG_JMP if it has been
 * killed or a block cannot be loaded.
 */
static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) {
  STableScanInfo* pInfo = pOperator->info;
  SExecTaskInfo*  pTaskInfo = pOperator->pTaskInfo;
  SSDataBlock*    pBlock = pInfo->pResBlock;
  int64_t         startUs = taosGetTimestampUs();

  while (tsdbNextDataBlock(pInfo->base.dataReader)) {
    if (isTaskKilled(pTaskInfo)) {
      T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
    }

    // process this data block based on the probabilities
    if (!processBlockWithProbability(&pInfo->sample)) {
      continue;
    }

    ASSERT(pBlock->info.id.uid != 0);
    pBlock->info.id.groupId = getTableGroupId(pTaskInfo->pTableInfoList, pBlock->info.id.uid);

    uint32_t status = 0;
    int32_t  code = loadDataBlock(pOperator, &pInfo->base, pBlock, &status);
    if (code != TSDB_CODE_SUCCESS) {
      T_LONG_JMP(pOperator->pTaskInfo->env, code);
    }

    // current block is filter out according to filter condition, continue load the next block
    bool skipBlock = (status == FUNC_DATA_REQUIRED_FILTEROUT) || (pBlock->info.rows == 0);
    if (skipBlock) {
      continue;
    }

    SFileBlockLoadRecorder* pRecorder = &pInfo->base.readRecorder;
    pOperator->resultInfo.totalRows = pRecorder->totalRows;
    pRecorder->elapsedTime += (taosGetTimestampUs() - startUs) / 1000.0;
    pOperator->cost.totalCost = pRecorder->elapsedTime;

    // record the position of the returned block in the task's stream status
    pTaskInfo->streamInfo.lastStatus.type = TMQ_OFFSET__SNAPSHOT_DATA;
    pTaskInfo->streamInfo.lastStatus.uid = pBlock->info.id.uid;
    pTaskInfo->streamInfo.lastStatus.ts = pBlock->info.window.ekey;

    ASSERT(pBlock->info.id.uid != 0);
    return pBlock;
  }

  return NULL;
}

H
Haojun Liao 已提交
659
/*
 * Scan the tables currently set in the reader as many times as the query functions require:
 * scanInfo.numOfAsc ascending passes first, then scanInfo.numOfDesc descending passes, resetting the reader
 * between passes. Returns the next qualified block, or NULL when every pass is finished, when no reader exists
 * (no qualified tables), or when the operator is already done.
 */
static SSDataBlock* doGroupedTableScan(SOperatorInfo* pOperator) {
  STableScanInfo* pInfo = pOperator->info;
  SExecTaskInfo*  pTaskInfo = pOperator->pTaskInfo;

  // The read handle is not initialized yet, since no qualified tables exists
  if (pInfo->base.dataReader == NULL || pOperator->status == OP_EXEC_DONE) {
    return NULL;
  }

  // do the ascending order traverse in the first place.
  while (pInfo->scanTimes < pInfo->scanInfo.numOfAsc) {
    SSDataBlock* pBlock = doTableScanImpl(pOperator);
    if (pBlock != NULL) {
      return pBlock;
    }

    pInfo->scanTimes += 1;
    if (pInfo->scanTimes >= pInfo->scanInfo.numOfAsc) {
      break;
    }

    setTaskStatus(pTaskInfo, TASK_NOT_COMPLETED);
    pInfo->base.scanFlag = REPEAT_SCAN;
    qDebug("start to repeat ascending order scan data blocks due to query func required, %s", GET_TASKID(pTaskInfo));

    // do prepare for the next round table scan operation
    tsdbReaderReset(pInfo->base.dataReader, &pInfo->base.cond);
  }

  int32_t total = pInfo->scanInfo.numOfAsc + pInfo->scanInfo.numOfDesc;
  if (pInfo->scanTimes >= total) {
    return NULL;
  }

  // switch to descending order once, before the first descending pass
  if (pInfo->base.cond.order == TSDB_ORDER_ASC) {
    prepareForDescendingScan(&pInfo->base, pOperator->exprSupp.pCtx, 0);
    tsdbReaderReset(pInfo->base.dataReader, &pInfo->base.cond);
    qDebug("%s start to descending order scan data blocks due to query func required", GET_TASKID(pTaskInfo));
  }

  while (pInfo->scanTimes < total) {
    SSDataBlock* pBlock = doTableScanImpl(pOperator);
    if (pBlock != NULL) {
      return pBlock;
    }

    pInfo->scanTimes += 1;
    if (pInfo->scanTimes >= total) {
      break;
    }

    setTaskStatus(pTaskInfo, TASK_NOT_COMPLETED);
    pInfo->base.scanFlag = REPEAT_SCAN;
    qDebug("%s start to repeat descending order scan data blocks", GET_TASKID(pTaskInfo));
    tsdbReaderReset(pInfo->base.dataReader, &pInfo->base.cond);
  }

  return NULL;
}

/*
 * "next" function of the table scan operator.
 *
 * TABLE_SCAN__TABLE_ORDER: the tables of the task's table list are scanned one after another; when the current table
 * yields nothing more, the reader is re-targeted to the next table and reset.
 *
 * Otherwise tables are scanned group by group: the reader is opened for the first output group on the first call
 * (currentGroupId == -1), and whenever a group is exhausted the reader is re-targeted to the next group, with the
 * operator status and the per-group limit state reset.
 *
 * Returns the next block produced by doGroupedTableScan, or NULL once every table/group has been scanned.
 */
static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
  STableScanInfo* pInfo = pOperator->info;
  SExecTaskInfo*  pTaskInfo = pOperator->pTaskInfo;

  // scan table one by one sequentially
  if (pInfo->scanMode == TABLE_SCAN__TABLE_ORDER) {
    int32_t numOfTables = tableListGetSize(pTaskInfo->pTableInfoList);

    while (1) {
      SSDataBlock* result = doGroupedTableScan(pOperator);
      if (result) {
        return result;
      }

      // if no data, switch to next table and continue scan
      pInfo->currentTable++;
      if (pInfo->currentTable >= numOfTables) {
        return NULL;
      }

      STableKeyInfo* pTableInfo = tableListGetInfo(pTaskInfo->pTableInfoList, pInfo->currentTable);
      tsdbSetTableList(pInfo->base.dataReader, pTableInfo, 1);
      qDebug("set uid:%" PRIu64 " into scanner, total tables:%d, index:%d %s", pTableInfo->uid, numOfTables,
             pInfo->currentTable, pTaskInfo->id.str);

      tsdbReaderReset(pInfo->base.dataReader, &pInfo->base.cond);
      pInfo->scanTimes = 0;
    }
  }

  // scan table group by group sequentially
  if (pInfo->currentGroupId == -1) {
    if ((++pInfo->currentGroupId) >= tableListGetOutputGroups(pTaskInfo->pTableInfoList)) {
      setOperatorCompleted(pOperator);
      return NULL;
    }

    int32_t        num = 0;
    STableKeyInfo* pList = NULL;
    tableListGetGroupList(pTaskInfo->pTableInfoList, pInfo->currentGroupId, &pList, &num);
    ASSERT(pInfo->base.dataReader == NULL);

    int32_t code = tsdbReaderOpen(pInfo->base.readHandle.vnode, &pInfo->base.cond, pList, num, pInfo->pResBlock,
                                  (STsdbReader**)&pInfo->base.dataReader, GET_TASKID(pTaskInfo));
    if (code != TSDB_CODE_SUCCESS) {
      T_LONG_JMP(pTaskInfo->env, code);
    }
  }

  SSDataBlock* result = doGroupedTableScan(pOperator);
  if (result != NULL) {
    ASSERT(result->info.id.uid != 0);
    return result;
  }

  // The current group is exhausted. Advance through the following groups until one of them produces data: a group
  // that yields no rows (e.g. nothing inside the query time window) must not end the scan of the groups after it.
  while (1) {
    if ((++pInfo->currentGroupId) >= tableListGetOutputGroups(pTaskInfo->pTableInfoList)) {
      setOperatorCompleted(pOperator);
      return NULL;
    }

    // reset value for the next group data output
    pOperator->status = OP_OPENED;
    resetLimitInfoForNextGroup(&pInfo->base.limitInfo);

    int32_t        num = 0;
    STableKeyInfo* pList = NULL;
    tableListGetGroupList(pTaskInfo->pTableInfoList, pInfo->currentGroupId, &pList, &num);

    tsdbSetTableList(pInfo->base.dataReader, pList, num);
    tsdbReaderReset(pInfo->base.dataReader, &pInfo->base.cond);
    pInfo->scanTimes = 0;

    result = doGroupedTableScan(pOperator);
    if (result != NULL) {
      return result;
    }
  }
}

796 797
static int32_t getTableScannerExecInfo(struct SOperatorInfo* pOptr, void** pOptrExplain, uint32_t* len) {
  SFileBlockLoadRecorder* pRecorder = taosMemoryCalloc(1, sizeof(SFileBlockLoadRecorder));
798
  STableScanInfo*         pTableScanInfo = pOptr->info;
H
Haojun Liao 已提交
799
  *pRecorder = pTableScanInfo->base.readRecorder;
800 801 802 803 804
  *pOptrExplain = pRecorder;
  *len = sizeof(SFileBlockLoadRecorder);
  return 0;
}

805
static void destroyTableScanOperatorInfo(void* param) {
806
  STableScanInfo* pTableScanInfo = (STableScanInfo*)param;
H
Haojun Liao 已提交
807
  blockDataDestroy(pTableScanInfo->pResBlock);
H
Haojun Liao 已提交
808
  cleanupQueryTableDataCond(&pTableScanInfo->base.cond);
H
Haojun Liao 已提交
809

H
Haojun Liao 已提交
810 811
  tsdbReaderClose(pTableScanInfo->base.dataReader);
  pTableScanInfo->base.dataReader = NULL;
812

H
Haojun Liao 已提交
813 814
  if (pTableScanInfo->base.matchInfo.pList != NULL) {
    taosArrayDestroy(pTableScanInfo->base.matchInfo.pList);
815
  }
L
Liu Jicong 已提交
816

H
Haojun Liao 已提交
817 818
  taosLRUCacheCleanup(pTableScanInfo->base.metaCache.pTableMetaEntryCache);
  cleanupExprSupp(&pTableScanInfo->base.pseudoSup);
D
dapan1121 已提交
819
  taosMemoryFreeClear(param);
820 821
}

822
SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, SReadHandle* readHandle,
823
                                           SExecTaskInfo* pTaskInfo) {
H
Haojun Liao 已提交
824 825 826
  STableScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STableScanInfo));
  SOperatorInfo*  pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
  if (pInfo == NULL || pOperator == NULL) {
827
    goto _error;
H
Haojun Liao 已提交
828 829
  }

830
  SScanPhysiNode*     pScanNode = &pTableScanNode->scan;
H
Haojun Liao 已提交
831
  SDataBlockDescNode* pDescNode = pScanNode->node.pOutputDataBlockDesc;
832 833

  int32_t numOfCols = 0;
834
  int32_t code =
H
Haojun Liao 已提交
835
      extractColMatchInfo(pScanNode->pScanCols, pDescNode, &numOfCols, COL_MATCH_FROM_COL_ID, &pInfo->base.matchInfo);
836 837 838 839
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

H
Haojun Liao 已提交
840
  initLimitInfo(pScanNode->node.pLimit, pScanNode->node.pSlimit, &pInfo->base.limitInfo);
H
Haojun Liao 已提交
841
  code = initQueryTableDataCond(&pInfo->base.cond, pTableScanNode);
842
  if (code != TSDB_CODE_SUCCESS) {
843
    goto _error;
844 845
  }

H
Haojun Liao 已提交
846
  if (pScanNode->pScanPseudoCols != NULL) {
H
Haojun Liao 已提交
847
    SExprSupp* pSup = &pInfo->base.pseudoSup;
H
Haojun Liao 已提交
848
    pSup->pExprInfo = createExprInfo(pScanNode->pScanPseudoCols, NULL, &pSup->numOfExprs);
849
    pSup->pCtx = createSqlFunctionCtx(pSup->pExprInfo, pSup->numOfExprs, &pSup->rowEntryInfoOffset);
850 851
  }

852
  pInfo->scanInfo = (SScanInfo){.numOfAsc = pTableScanNode->scanSeq[0], .numOfDesc = pTableScanNode->scanSeq[1]};
H
Haojun Liao 已提交
853 854

  pInfo->base.scanFlag = MAIN_SCAN;
H
Haojun Liao 已提交
855 856
  pInfo->base.pdInfo.interval = extractIntervalInfo(pTableScanNode);
  pInfo->base.readHandle = *readHandle;
H
Haojun Liao 已提交
857 858
  pInfo->base.dataBlockLoadFlag = pTableScanNode->dataRequired;

859 860
  pInfo->sample.sampleRatio = pTableScanNode->ratio;
  pInfo->sample.seed = taosGetTimestampSec();
861

H
Haojun Liao 已提交
862
  initResultSizeInfo(&pOperator->resultInfo, 4096);
H
Haojun Liao 已提交
863
  pInfo->pResBlock = createDataBlockFromDescNode(pDescNode);
H
Haojun Liao 已提交
864
  blockDataEnsureCapacity(pInfo->pResBlock, pOperator->resultInfo.capacity);
H
Haojun Liao 已提交
865

H
Haojun Liao 已提交
866 867 868
  code = filterInitFromNode((SNode*)pTableScanNode->scan.node.pConditions, &pOperator->exprSupp.pFilterInfo, 0);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
H
Haojun Liao 已提交
869 870
  }

wmmhello's avatar
wmmhello 已提交
871
  pInfo->currentGroupId = -1;
872
  pInfo->assignBlockUid = pTableScanNode->assignBlockUid;
873
  pInfo->hasGroupByTag = pTableScanNode->pGroupTags ? true : false;
874

L
Liu Jicong 已提交
875 876
  setOperatorInfo(pOperator, "TableScanOperator", QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN, false, OP_NOT_OPENED, pInfo,
                  pTaskInfo);
877
  pOperator->exprSupp.numOfExprs = numOfCols;
878

H
Haojun Liao 已提交
879 880
  pInfo->base.metaCache.pTableMetaEntryCache = taosLRUCacheInit(1024 * 128, -1, .5);
  if (pInfo->base.metaCache.pTableMetaEntryCache == NULL) {
881 882 883
    code = terrno;
    goto _error;
  }
884

H
Haojun Liao 已提交
885
  taosLRUCacheSetStrictCapacity(pInfo->base.metaCache.pTableMetaEntryCache, false);
886 887
  pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doTableScan, NULL, destroyTableScanOperatorInfo,
                                         optrDefaultBufFn, getTableScannerExecInfo);
888 889 890

  // for non-blocking operator, the open cost is always 0
  pOperator->cost.openCost = 0;
H
Haojun Liao 已提交
891
  return pOperator;
892

893
_error:
894 895 896
  if (pInfo != NULL) {
    destroyTableScanOperatorInfo(pInfo);
  }
897

898 899
  taosMemoryFreeClear(pOperator);
  pTaskInfo->code = code;
900
  return NULL;
H
Haojun Liao 已提交
901 902
}

903
/*
 * Create a "table sequence scan" operator: a table scan operator whose next function is doTableScanImpl, driving the
 * reader handle passed in (stored as base.dataReader). No close function is registered for it.
 *
 * Returns NULL and sets pTaskInfo->code to TSDB_CODE_OUT_OF_MEMORY when either allocation fails, instead of
 * dereferencing a NULL pointer.
 */
SOperatorInfo* createTableSeqScanOperatorInfo(void* pReadHandle, SExecTaskInfo* pTaskInfo) {
  STableScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STableScanInfo));
  SOperatorInfo*  pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
  if (pInfo == NULL || pOperator == NULL) {
    taosMemoryFreeClear(pInfo);
    taosMemoryFreeClear(pOperator);
    pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  pInfo->base.dataReader = pReadHandle;

  setOperatorInfo(pOperator, "TableSeqScanOperator", QUERY_NODE_PHYSICAL_PLAN_TABLE_SEQ_SCAN, false, OP_NOT_OPENED,
                  pInfo, pTaskInfo);
  pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doTableScanImpl, NULL, NULL, optrDefaultBufFn, NULL);
  return pOperator;
}

916
/* Empty the list of buffered blocks and rewind the index of the next block to hand out. */
static FORCE_INLINE void doClearBufferedBlocks(SStreamScanInfo* pInfo) {
  pInfo->validBlockIndex = 0;
  taosArrayClear(pInfo->pBlockLists);
}

921
/* True when windowSup.parentType is the stream session-window operator. */
static bool isSessionWindow(SStreamScanInfo* pInfo) {
  bool isSession = (QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION == pInfo->windowSup.parentType);
  return isSession;
}

925
/* True when windowSup.parentType is the stream state-window operator. */
static bool isStateWindow(SStreamScanInfo* pInfo) {
  bool isState = (QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE == pInfo->windowSup.parentType);
  return isState;
}
5
54liuyao 已提交
928

L
Liu Jicong 已提交
929
/* True when windowSup.parentType is any of the stream interval operators (plain, semi or final). */
static bool isIntervalWindow(SStreamScanInfo* pInfo) {
  switch (pInfo->windowSup.parentType) {
    case QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL:
    case QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL:
    case QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL:
      return true;
    default:
      return false;
  }
}

/*
 * True only for the plain stream interval operator; the semi and final interval variants are excluded.
 * The misspelled name ("Signle") is kept unchanged so existing callers keep compiling.
 */
static bool isSignleIntervalWindow(SStreamScanInfo* pInfo) {
  bool isPlainInterval = (QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL == pInfo->windowSup.parentType);
  return isPlainInterval;
}

939 940 941 942
/* True for an interval window whose sliding step differs from its interval length. */
static bool isSlidingWindow(SStreamScanInfo* pInfo) {
  if (!isIntervalWindow(pInfo)) {
    return false;
  }
  return pInfo->interval.sliding != pInfo->interval.interval;
}

943
/* Copy the group id stored in column groupColIndex at row rowIndex of pBlock into pInfo->groupId. */
static void setGroupId(SStreamScanInfo* pInfo, SSDataBlock* pBlock, int32_t groupColIndex, int32_t rowIndex) {
  ASSERT(rowIndex < pBlock->info.rows);
  SColumnInfoData* pGroupCol = taosArrayGet(pBlock->pDataBlock, groupColIndex);
  pInfo->groupId = ((uint64_t*)pGroupCol->pData)[rowIndex];
}

L
Liu Jicong 已提交
950
/*
 * Point the table scan at a new time window and rewind it: the scan counter is cleared, the group cursor is put back
 * to -1 and the current tsdb reader is closed (doTableScan opens a new one when currentGroupId is -1).
 */
void resetTableScanInfo(STableScanInfo* pTableScanInfo, STimeWindow* pWin) {
  STableScanInfo* pScan = pTableScanInfo;

  pScan->base.cond.twindows = *pWin;
  pScan->scanTimes = 0;
  pScan->currentGroupId = -1;

  tsdbReaderClose(pScan->base.dataReader);
  pScan->base.dataReader = NULL;
}

L
Liu Jicong 已提交
958 959
/*
 * Read the first data block of table tbUid within [startTs, endTs] as of version maxVersion, using a temporary
 * tsdb reader built from a copy of the table scan's query condition.
 *
 * The data is loaded into the table scan operator's own result block (pTableScanInfo->pResBlock), with tag columns
 * filled and the group id looked up from the task's table list. Returns that block when it holds rows, NULL
 * otherwise. A reader-open failure long-jumps to the task's error handler.
 */
static SSDataBlock* readPreVersionData(SOperatorInfo* pTableScanOp, uint64_t tbUid, TSKEY startTs, TSKEY endTs,
                                       int64_t maxVersion) {
  STableKeyInfo tblInfo = {.uid = tbUid, .groupId = 0};

  STableScanInfo*     pTableScanInfo = pTableScanOp->info;
  SQueryTableDataCond cond = pTableScanInfo->base.cond;

  cond.startVersion = -1;
  cond.endVersion = maxVersion;
  cond.twindows = (STimeWindow){.skey = startTs, .ekey = endTs};

  SExecTaskInfo* pTaskInfo = pTableScanOp->pTaskInfo;

  SSDataBlock* pBlock = pTableScanInfo->pResBlock;
  STsdbReader* pReader = NULL;
  int32_t      code = tsdbReaderOpen(pTableScanInfo->base.readHandle.vnode, &cond, &tblInfo, 1, pBlock,
                                     (STsdbReader**)&pReader, GET_TASKID(pTaskInfo));
  if (code != TSDB_CODE_SUCCESS) {
    terrno = code;
    T_LONG_JMP(pTaskInfo->env, code);
    return NULL;
  }

  // pBlock is shared with the regular table scan. Drop any rows it still holds so that "no data in the range" is
  // reported as NULL instead of returning rows left over from an earlier scan.
  blockDataCleanup(pBlock);

  if (tsdbNextDataBlock(pReader)) {
    /*SSDataBlock* p = */ tsdbRetrieveDataBlock(pReader, NULL);
    doSetTagColumnData(&pTableScanInfo->base, pBlock, pTaskInfo, pBlock->info.rows);
    pBlock->info.id.groupId = getTableGroupId(pTaskInfo->pTableInfoList, pBlock->info.id.uid);
  }

  tsdbReaderClose(pReader);
  qDebug("retrieve prev rows:%d, skey:%" PRId64 ", ekey:%" PRId64 " uid:%" PRIu64 ", max ver:%" PRId64
         ", suid:%" PRIu64,
         pBlock->info.rows, startTs, endTs, tbUid, maxVersion, cond.suid);

  return pBlock->info.rows > 0 ? pBlock : NULL;
}

/*
 * Group id of the row of table uid at timestamp ts as of version maxVersion, computed from the row's values with the
 * partition expressions. Returns 0 when no such row is found.
 */
static uint64_t getGroupIdByCol(SStreamScanInfo* pInfo, uint64_t uid, TSKEY ts, int64_t maxVersion) {
  SSDataBlock* pOldRow = readPreVersionData(pInfo->pTableScanOp, uid, ts, ts, maxVersion);
  if (pOldRow != NULL && pOldRow->info.rows != 0) {
    ASSERT(pOldRow->info.rows == 1);
    return calGroupIdByData(&pInfo->partitionSup, pInfo->pPartScalarSup, pOldRow, 0);
  }
  return 0;
}

5
54liuyao 已提交
1004
/* Group id assigned to table uid in the task's table list. */
static uint64_t getGroupIdByUid(SStreamScanInfo* pInfo, uint64_t uid) {
  SExecTaskInfo* pTaskInfo = pInfo->pTableScanOp->pTaskInfo;
  return getTableGroupId(pTaskInfo->pTableInfoList, uid);
}

5
54liuyao 已提交
1008 1009 1010 1011 1012 1013 1014 1015
/*
 * Group id of a row: computed from the row's column values when the partition needs per-row calculation,
 * otherwise taken from the table list by uid.
 */
static uint64_t getGroupIdByData(SStreamScanInfo* pInfo, uint64_t uid, TSKEY ts, int64_t maxVersion) {
  return pInfo->partitionSup.needCalc ? getGroupIdByCol(pInfo, uid, ts, maxVersion) : getGroupIdByUid(pInfo, uid);
}

L
Liu Jicong 已提交
1016
/*
 * Set up the inner table scan for the next range listed in pBlock, starting at row *pRowIndex.
 *
 * pBlock holds one range per row (START_TS/END_TS, GROUPID and CALCULATE_START/END_TS columns). The range of row
 * *pRowIndex is taken as the window; following rows of the same group are merged into it while they share its start
 * ts (the end is extended) or their end ts equals its start ts (the start is pulled back). *pRowIndex is left at the
 * first row not merged. pInfo->groupId is set from the first row and, for sliding windows, pInfo->updateWin from its
 * calculate-ts columns.
 *
 * Returns false when pBlock has no more rows to consume; true after the table scan was reset to the merged window.
 */
static bool prepareRangeScan(SStreamScanInfo* pInfo, SSDataBlock* pBlock, int32_t* pRowIndex) {
  // ">=" rather than "==": never index past the last row even if the cursor was advanced beyond it
  if (pBlock->info.rows == 0 || (*pRowIndex) >= pBlock->info.rows) {
    return false;
  }

  ASSERT(taosArrayGetSize(pBlock->pDataBlock) >= 3);
  SColumnInfoData* pStartTsCol = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX);
  TSKEY*           startData = (TSKEY*)pStartTsCol->pData;
  SColumnInfoData* pEndTsCol = taosArrayGet(pBlock->pDataBlock, END_TS_COLUMN_INDEX);
  TSKEY*           endData = (TSKEY*)pEndTsCol->pData;
  STimeWindow      win = {.skey = startData[*pRowIndex], .ekey = endData[*pRowIndex]};
  SColumnInfoData* pGpCol = taosArrayGet(pBlock->pDataBlock, GROUPID_COLUMN_INDEX);
  uint64_t*        gpData = (uint64_t*)pGpCol->pData;
  uint64_t         groupId = gpData[*pRowIndex];

  SColumnInfoData* pCalStartTsCol = taosArrayGet(pBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX);
  TSKEY*           calStartData = (TSKEY*)pCalStartTsCol->pData;
  SColumnInfoData* pCalEndTsCol = taosArrayGet(pBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX);
  TSKEY*           calEndData = (TSKEY*)pCalEndTsCol->pData;

  setGroupId(pInfo, pBlock, GROUPID_COLUMN_INDEX, *pRowIndex);
  if (isSlidingWindow(pInfo)) {
    pInfo->updateWin.skey = calStartData[*pRowIndex];
    pInfo->updateWin.ekey = calEndData[*pRowIndex];
  }

  (*pRowIndex)++;

  for (; *pRowIndex < pBlock->info.rows; (*pRowIndex)++) {
    if (win.skey == startData[*pRowIndex] && groupId == gpData[*pRowIndex]) {
      win.ekey = TMAX(win.ekey, endData[*pRowIndex]);
      continue;
    }

    if (win.skey == endData[*pRowIndex] && groupId == gpData[*pRowIndex]) {
      win.skey = TMIN(win.skey, startData[*pRowIndex]);
      continue;
    }

    ASSERT(!(win.skey > startData[*pRowIndex] && win.ekey < endData[*pRowIndex]) ||
           !(isInTimeWindow(&win, startData[*pRowIndex], 0) || isInTimeWindow(&win, endData[*pRowIndex], 0)));
    break;
  }

  resetTableScanInfo(pInfo->pTableScanOp->info, &win);
  pInfo->pTableScanOp->status = OP_OPENED;
  return true;
}

5
54liuyao 已提交
1065
static STimeWindow getSlidingWindow(TSKEY* startTsCol, TSKEY* endTsCol, uint64_t* gpIdCol, SInterval* pInterval,
1066
                                    SDataBlockInfo* pDataBlockInfo, int32_t* pRowIndex, bool hasGroup) {
H
Haojun Liao 已提交
1067
  SResultRowInfo dumyInfo = {0};
5
54liuyao 已提交
1068
  dumyInfo.cur.pageId = -1;
1069
  STimeWindow win = getActiveTimeWindow(NULL, &dumyInfo, startTsCol[*pRowIndex], pInterval, TSDB_ORDER_ASC);
5
54liuyao 已提交
1070 1071
  STimeWindow endWin = win;
  STimeWindow preWin = win;
5
54liuyao 已提交
1072
  uint64_t    groupId = gpIdCol[*pRowIndex];
H
Haojun Liao 已提交
1073

5
54liuyao 已提交
1074
  while (1) {
1075 1076 1077
    if (hasGroup) {
      (*pRowIndex) += 1;
    } else {
5
54liuyao 已提交
1078
      while ((groupId == gpIdCol[(*pRowIndex)] && startTsCol[*pRowIndex] <= endWin.ekey)) {
5
54liuyao 已提交
1079 1080 1081 1082 1083
        (*pRowIndex) += 1;
        if ((*pRowIndex) == pDataBlockInfo->rows) {
          break;
        }
      }
1084
    }
5
54liuyao 已提交
1085

5
54liuyao 已提交
1086 1087 1088
    do {
      preWin = endWin;
      getNextTimeWindow(pInterval, &endWin, TSDB_ORDER_ASC);
1089
    } while (endTsCol[(*pRowIndex) - 1] >= endWin.skey);
5
54liuyao 已提交
1090
    endWin = preWin;
5
54liuyao 已提交
1091
    if (win.ekey == endWin.ekey || (*pRowIndex) == pDataBlockInfo->rows || groupId != gpIdCol[*pRowIndex]) {
5
54liuyao 已提交
1092 1093 1094 1095 1096 1097
      win.ekey = endWin.ekey;
      return win;
    }
    win.ekey = endWin.ekey;
  }
}
5
54liuyao 已提交
1098

L
Liu Jicong 已提交
1099 1100 1101 1102 1103 1104 1105 1106 1107 1108 1109
/*
 * Pull the data of the ranges listed in pSDB (one range per row, consumed through *pRowIndex by prepareRangeScan)
 * from the inner table scan, keeping only the rows of group pInfo->groupId.
 *
 * Returns the next filtered, non-empty block with calWin set to pInfo->updateWin. Returns NULL once every range has
 * been consumed; in that case pSDB is emptied, *pRowIndex reset to 0, updateWin reset to the full range and the inner
 * tsdb reader closed. tsColIndex is currently unused.
 */
static SSDataBlock* doRangeScan(SStreamScanInfo* pInfo, SSDataBlock* pSDB, int32_t tsColIndex, int32_t* pRowIndex) {
  while (1) {
    SSDataBlock* pResult = doTableScan(pInfo->pTableScanOp);
    if (pResult == NULL) {
      if (!prepareRangeScan(pInfo, pSDB, pRowIndex)) {
        // every range in pSDB has been consumed
        blockDataCleanup(pSDB);
        *pRowIndex = 0;
        pInfo->updateWin = (STimeWindow){.skey = INT64_MIN, .ekey = INT64_MAX};
        STableScanInfo* pTableScanInfo = pInfo->pTableScanOp->info;
        tsdbReaderClose(pTableScanInfo->base.dataReader);
        pTableScanInfo->base.dataReader = NULL;
        return NULL;
      }

      // scan next window data; a range without data must not end the scan of the ranges after it
      pResult = doTableScan(pInfo->pTableScanOp);
      if (pResult == NULL) {
        continue;
      }
    }

    doFilter(pResult, pInfo->pTableScanOp->exprSupp.pFilterInfo, NULL);
    if (pResult->info.rows == 0) {
      continue;
    }

    if (pInfo->partitionSup.needCalc) {
      // group ids are computed per row: keep only the rows of the group being recalculated
      SSDataBlock* tmpBlock = createOneDataBlock(pResult, true);
      if (tmpBlock == NULL) {
        T_LONG_JMP(pInfo->pTableScanOp->pTaskInfo->env, TSDB_CODE_OUT_OF_MEMORY);
      }

      blockDataCleanup(pResult);
      for (int32_t i = 0; i < tmpBlock->info.rows; i++) {
        if (calGroupIdByData(&pInfo->partitionSup, pInfo->pPartScalarSup, tmpBlock, i) == pInfo->groupId) {
          for (int32_t j = 0; j < pInfo->pTableScanOp->exprSupp.numOfExprs; j++) {
            SColumnInfoData* pSrcCol = taosArrayGet(tmpBlock->pDataBlock, j);
            SColumnInfoData* pDestCol = taosArrayGet(pResult->pDataBlock, j);
            bool             isNull = colDataIsNull(pSrcCol, tmpBlock->info.rows, i, NULL);
            char*            pSrcData = colDataGetData(pSrcCol, i);
            colDataAppend(pDestCol, pResult->info.rows, pSrcData, isNull);
          }
          pResult->info.rows++;
        }
      }

      blockDataDestroy(tmpBlock);

      if (pResult->info.rows > 0) {
        pResult->info.calWin = pInfo->updateWin;
        return pResult;
      }
    } else if (pResult->info.id.groupId == pInfo->groupId) {
      pResult->info.calWin = pInfo->updateWin;
      return pResult;
    }
  }
}
1150

1151
/*
 * Translate the ranges of pSrcBlock into the session windows they touch and write them to pDestBlock.
 *
 * For every source row the group id is resolved (as of the previous version) and the current session windows at
 * its start and end timestamps are looked up; rows whose start window no longer exists are skipped. Each kept row
 * contributes one destination row [startWin.skey, endWin.ekey] with its group id; the uid and calculate-ts columns
 * are set to NULL.
 */
static int32_t generateSessionScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSrcBlock, SSDataBlock* pDestBlock) {
  blockDataCleanup(pDestBlock);
  if (pSrcBlock->info.rows == 0) {
    return TSDB_CODE_SUCCESS;
  }
  int32_t code = blockDataEnsureCapacity(pDestBlock, pSrcBlock->info.rows);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  ASSERT(taosArrayGetSize(pSrcBlock->pDataBlock) >= 3);
  SColumnInfoData* pStartTsCol = taosArrayGet(pSrcBlock->pDataBlock, START_TS_COLUMN_INDEX);
  TSKEY*           startData = (TSKEY*)pStartTsCol->pData;
  SColumnInfoData* pEndTsCol = taosArrayGet(pSrcBlock->pDataBlock, END_TS_COLUMN_INDEX);
  TSKEY*           endData = (TSKEY*)pEndTsCol->pData;
  SColumnInfoData* pUidCol = taosArrayGet(pSrcBlock->pDataBlock, UID_COLUMN_INDEX);
  uint64_t*        uidCol = (uint64_t*)pUidCol->pData;

  SColumnInfoData* pDestStartCol = taosArrayGet(pDestBlock->pDataBlock, START_TS_COLUMN_INDEX);
  SColumnInfoData* pDestEndCol = taosArrayGet(pDestBlock->pDataBlock, END_TS_COLUMN_INDEX);
  SColumnInfoData* pDestUidCol = taosArrayGet(pDestBlock->pDataBlock, UID_COLUMN_INDEX);
  SColumnInfoData* pDestGpCol = taosArrayGet(pDestBlock->pDataBlock, GROUPID_COLUMN_INDEX);
  SColumnInfoData* pDestCalStartTsCol = taosArrayGet(pDestBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX);
  SColumnInfoData* pDestCalEndTsCol = taosArrayGet(pDestBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX);
  int64_t          version = pSrcBlock->info.version - 1;
  for (int32_t i = 0; i < pSrcBlock->info.rows; i++) {
    uint64_t groupId = getGroupIdByData(pInfo, uidCol[i], startData[i], version);
    // gap must be 0.
    SSessionKey startWin = {0};
    getCurSessionWindow(pInfo->windowSup.pStreamAggSup, startData[i], startData[i], groupId, &startWin);
    if (IS_INVALID_SESSION_WIN_KEY(startWin)) {
      // window has been closed.
      continue;
    }
    SSessionKey endWin = {0};
    getCurSessionWindow(pInfo->windowSup.pStreamAggSup, endData[i], endData[i], groupId, &endWin);
    ASSERT(!IS_INVALID_SESSION_WIN_KEY(endWin));

    // Append at the next free destination row, not at the source index i: rows with a closed window are skipped
    // above, so source and destination indices diverge after the first skip.
    colDataAppend(pDestStartCol, pDestBlock->info.rows, (const char*)&startWin.win.skey, false);
    colDataAppend(pDestEndCol, pDestBlock->info.rows, (const char*)&endWin.win.ekey, false);

    colDataAppendNULL(pDestUidCol, pDestBlock->info.rows);
    colDataAppend(pDestGpCol, pDestBlock->info.rows, (const char*)&groupId, false);
    colDataAppendNULL(pDestCalStartTsCol, pDestBlock->info.rows);
    colDataAppendNULL(pDestCalEndTsCol, pDestBlock->info.rows);
    pDestBlock->info.rows++;
  }
  return TSDB_CODE_SUCCESS;
}
1198 1199 1200 1201 1202 1203

/*
 * Translate the ranges of pSrcBlock into interval-aligned recalculation ranges written to pDestBlock.
 *
 * When the partition needs per-row group calculation and the first source row is a real range (start != end),
 * pSrcBlock is rebuilt from the rows of that table read at the previous version, one row per old row with its
 * computed group id. If no old rows exist in that range, pSrcBlock is emptied and nothing is produced.
 *
 * Each destination row carries the merged window (getSlidingWindow), the uid, the group id and the first/last
 * source start timestamps it covers as calculate-start/end.
 */
static int32_t generateIntervalScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSrcBlock, SSDataBlock* pDestBlock) {
  blockDataCleanup(pDestBlock);
  int32_t rows = pSrcBlock->info.rows;
  if (rows == 0) {
    return TSDB_CODE_SUCCESS;
  }

  SColumnInfoData* pSrcStartTsCol = (SColumnInfoData*)taosArrayGet(pSrcBlock->pDataBlock, START_TS_COLUMN_INDEX);
  SColumnInfoData* pSrcEndTsCol = (SColumnInfoData*)taosArrayGet(pSrcBlock->pDataBlock, END_TS_COLUMN_INDEX);
  SColumnInfoData* pSrcUidCol = taosArrayGet(pSrcBlock->pDataBlock, UID_COLUMN_INDEX);
  SColumnInfoData* pSrcGpCol = taosArrayGet(pSrcBlock->pDataBlock, GROUPID_COLUMN_INDEX);

  uint64_t* srcUidData = (uint64_t*)pSrcUidCol->pData;
  ASSERT(pSrcStartTsCol->info.type == TSDB_DATA_TYPE_TIMESTAMP);
  TSKEY*  srcStartTsCol = (TSKEY*)pSrcStartTsCol->pData;
  TSKEY*  srcEndTsCol = (TSKEY*)pSrcEndTsCol->pData;
  int64_t version = pSrcBlock->info.version - 1;

  if (pInfo->partitionSup.needCalc && srcStartTsCol[0] != srcEndTsCol[0]) {
    uint64_t     srcUid = srcUidData[0];
    TSKEY        startTs = srcStartTsCol[0];
    TSKEY        endTs = srcEndTsCol[0];
    SSDataBlock* pPreRes = readPreVersionData(pInfo->pTableScanOp, srcUid, startTs, endTs, version);
    if (pPreRes == NULL) {
      // readPreVersionData returns NULL when the range held no rows before this version: nothing to recalculate
      blockDataCleanup(pSrcBlock);
      return TSDB_CODE_SUCCESS;
    }
    printDataBlock(pPreRes, "pre res");
    blockDataCleanup(pSrcBlock);
    int32_t code = blockDataEnsureCapacity(pSrcBlock, pPreRes->info.rows);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    SColumnInfoData* pTsCol = (SColumnInfoData*)taosArrayGet(pPreRes->pDataBlock, pInfo->primaryTsIndex);
    rows = pPreRes->info.rows;

    for (int32_t i = 0; i < rows; i++) {
      uint64_t groupId = calGroupIdByData(&pInfo->partitionSup, pInfo->pPartScalarSup, pPreRes, i);
      appendOneRowToStreamSpecialBlock(pSrcBlock, ((TSKEY*)pTsCol->pData) + i, ((TSKEY*)pTsCol->pData) + i, &srcUid,
                                       &groupId, NULL);
    }
    printDataBlock(pSrcBlock, "new delete");
  }

  // the column buffers may have been reallocated while pSrcBlock was rebuilt: reload the data pointers
  uint64_t* srcGp = (uint64_t*)pSrcGpCol->pData;
  srcStartTsCol = (TSKEY*)pSrcStartTsCol->pData;
  srcEndTsCol = (TSKEY*)pSrcEndTsCol->pData;
  srcUidData = (uint64_t*)pSrcUidCol->pData;

  int32_t code = blockDataEnsureCapacity(pDestBlock, rows);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  SColumnInfoData* pStartTsCol = taosArrayGet(pDestBlock->pDataBlock, START_TS_COLUMN_INDEX);
  SColumnInfoData* pEndTsCol = taosArrayGet(pDestBlock->pDataBlock, END_TS_COLUMN_INDEX);
  SColumnInfoData* pDeUidCol = taosArrayGet(pDestBlock->pDataBlock, UID_COLUMN_INDEX);
  SColumnInfoData* pGpCol = taosArrayGet(pDestBlock->pDataBlock, GROUPID_COLUMN_INDEX);
  SColumnInfoData* pCalStartTsCol = taosArrayGet(pDestBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX);
  SColumnInfoData* pCalEndTsCol = taosArrayGet(pDestBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX);
  // getSlidingWindow advances i past every source row merged into the current window
  for (int32_t i = 0; i < rows;) {
    uint64_t srcUid = srcUidData[i];
    uint64_t groupId = srcGp[i];
    if (groupId == 0) {
      groupId = getGroupIdByData(pInfo, srcUid, srcStartTsCol[i], version);
    }
    TSKEY calStartTs = srcStartTsCol[i];
    colDataAppend(pCalStartTsCol, pDestBlock->info.rows, (const char*)(&calStartTs), false);
    STimeWindow win = getSlidingWindow(srcStartTsCol, srcEndTsCol, srcGp, &pInfo->interval, &pSrcBlock->info, &i,
                                       pInfo->partitionSup.needCalc);
    TSKEY       calEndTs = srcStartTsCol[i - 1];
    colDataAppend(pCalEndTsCol, pDestBlock->info.rows, (const char*)(&calEndTs), false);
    colDataAppend(pDeUidCol, pDestBlock->info.rows, (const char*)(&srcUid), false);
    colDataAppend(pStartTsCol, pDestBlock->info.rows, (const char*)(&win.skey), false);
    colDataAppend(pEndTsCol, pDestBlock->info.rows, (const char*)(&win.ekey), false);
    colDataAppend(pGpCol, pDestBlock->info.rows, (const char*)(&groupId), false);
    pDestBlock->info.rows++;
  }
  return TSDB_CODE_SUCCESS;
}
1275

1276
/*
 * Copy the ranges of pSrcBlock into pDestBlock as delete-result rows.
 *
 * Each row keeps its start/end ts and uid; a missing group id (0) is resolved as of the previous version. When a
 * table-name expression is configured, the partition table name stored in the stream state for the group is
 * attached to the row; if no name can be fetched the row is written without one.
 */
static int32_t generateDeleteResultBlock(SStreamScanInfo* pInfo, SSDataBlock* pSrcBlock, SSDataBlock* pDestBlock) {
  blockDataCleanup(pDestBlock);
  int32_t rows = pSrcBlock->info.rows;
  if (rows == 0) {
    return TSDB_CODE_SUCCESS;
  }
  int32_t code = blockDataEnsureCapacity(pDestBlock, rows);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  SColumnInfoData* pSrcStartTsCol = (SColumnInfoData*)taosArrayGet(pSrcBlock->pDataBlock, START_TS_COLUMN_INDEX);
  SColumnInfoData* pSrcEndTsCol = (SColumnInfoData*)taosArrayGet(pSrcBlock->pDataBlock, END_TS_COLUMN_INDEX);
  SColumnInfoData* pSrcUidCol = taosArrayGet(pSrcBlock->pDataBlock, UID_COLUMN_INDEX);
  uint64_t*        srcUidData = (uint64_t*)pSrcUidCol->pData;
  SColumnInfoData* pSrcGpCol = taosArrayGet(pSrcBlock->pDataBlock, GROUPID_COLUMN_INDEX);
  uint64_t*        srcGp = (uint64_t*)pSrcGpCol->pData;
  ASSERT(pSrcStartTsCol->info.type == TSDB_DATA_TYPE_TIMESTAMP);
  TSKEY*  srcStartTsCol = (TSKEY*)pSrcStartTsCol->pData;
  TSKEY*  srcEndTsCol = (TSKEY*)pSrcEndTsCol->pData;
  int64_t version = pSrcBlock->info.version - 1;
  for (int32_t i = 0; i < pSrcBlock->info.rows; i++) {
    uint64_t srcUid = srcUidData[i];
    uint64_t groupId = srcGp[i];
    // var-length string buffer (length header + name); a zero length means "no table name"
    char tbname[VARSTR_HEADER_SIZE + TSDB_TABLE_NAME_LEN] = {0};
    if (groupId == 0) {
      groupId = getGroupIdByData(pInfo, srcUid, srcStartTsCol[i], version);
    }
    if (pInfo->tbnameCalSup.pExprInfo) {
      void* parTbname = NULL;
      // a failed lookup leaves parTbname unset: never memcpy from it in that case
      if (streamStateGetParName(pInfo->pStreamScanOp->pTaskInfo->streamInfo.pState, groupId, &parTbname) >= 0 &&
          parTbname != NULL) {
        memcpy(varDataVal(tbname), parTbname, TSDB_TABLE_NAME_LEN);
        varDataSetLen(tbname, strlen(varDataVal(tbname)));
      }
      tdbFree(parTbname);
    }
    appendOneRowToStreamSpecialBlock(pDestBlock, srcStartTsCol + i, srcEndTsCol + i, srcUidData + i, &groupId,
                                     varDataLen(tbname) == 0 ? NULL : tbname);
  }
  return TSDB_CODE_SUCCESS;
}

1318 1319 1320 1321
/*
 * Build the STREAM_CLEAR block for pSrcBlock, choosing the generator by the parent window type: interval windows,
 * session/state windows, or plain delete results otherwise. Whatever the generator returns, pDestBlock is tagged
 * with the source version, marked as loaded and its ts window refreshed; the generator's code is returned.
 */
static int32_t generateScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSrcBlock, SSDataBlock* pDestBlock) {
  int32_t code = TSDB_CODE_SUCCESS;

  if (isIntervalWindow(pInfo)) {
    code = generateIntervalScanRange(pInfo, pSrcBlock, pDestBlock);
  } else if (isSessionWindow(pInfo) || isStateWindow(pInfo)) {
    code = generateSessionScanRange(pInfo, pSrcBlock, pDestBlock);
  } else {
    code = generateDeleteResultBlock(pInfo, pSrcBlock, pDestBlock);
  }

  pDestBlock->info.version = pSrcBlock->info.version;
  pDestBlock->info.type = STREAM_CLEAR;
  pDestBlock->info.dataLoad = 1;
  blockDataUpdateTsWindow(pDestBlock, 0);

  return code;
}

L
Liu Jicong 已提交
1334
/*
 * Resolve the partition table name (info.parTbName) of pBlock.
 *
 * 1. parTbName is seeded from the stream state entry stored for the block's group id
 *    (cleared when no entry exists).
 * 2. The tbname expressions in pInfo->tbnameCalSup are evaluated on a one-row copy of
 *    the block. If they produce a value, it replaces parTbName (truncated to
 *    TSDB_TABLE_NAME_LEN - 1 bytes and NUL-terminated); otherwise parTbName is cleared.
 * 3. A non-empty name for a non-zero group id is written back to the stream state.
 *
 * Nothing is done when no tbname expressions are configured or the block is empty.
 * If the temporary blocks cannot be allocated, the name from step 1 is kept.
 */
void calBlockTbName(SStreamScanInfo* pInfo, SSDataBlock* pBlock) {
  SExprSupp*    pTbNameCalSup = &pInfo->tbnameCalSup;
  SStreamState* pState = pInfo->pStreamScanOp->pTaskInfo->streamInfo.pState;

  // pTbNameCalSup is the address of a member of pInfo, so only the expression count matters.
  if (pTbNameCalSup->numOfExprs == 0) return;
  if (pBlock == NULL || pBlock->info.rows == 0) return;

  void* tbname = NULL;
  if (streamStateGetParName(pState, pBlock->info.id.groupId, &tbname) < 0) {
    pBlock->info.parTbName[0] = 0;
  } else {
    memcpy(pBlock->info.parTbName, tbname, TSDB_TABLE_NAME_LEN);
  }
  tdbFree(tbname);

  // Evaluate the tbname expressions on a single-row copy of the block.
  SSDataBlock* pSrcBlock = blockCopyOneRow(pBlock, 0);
  SSDataBlock* pResBlock = createDataBlock();
  if (pSrcBlock == NULL || pResBlock == NULL) {
    // Allocation failure: release what we got and keep the name from the stream state.
    if (pSrcBlock != NULL) {
      blockDataDestroy(pSrcBlock);
    }
    if (pResBlock != NULL) {
      blockDataDestroy(pResBlock);
    }
    return;
  }
  ASSERT(pSrcBlock->info.rows == 1);

  pResBlock->info.rowSize = VARSTR_HEADER_SIZE + TSDB_TABLE_NAME_LEN;
  SColumnInfoData data = createColumnInfoData(TSDB_DATA_TYPE_VARCHAR, TSDB_TABLE_NAME_LEN, 0);
  taosArrayPush(pResBlock->pDataBlock, &data);
  blockDataEnsureCapacity(pResBlock, 1);

  projectApplyFunctions(pTbNameCalSup->pExprInfo, pResBlock, pSrcBlock, pTbNameCalSup->pCtx, 1, NULL);
  ASSERT(pResBlock->info.rows == 1);
  ASSERT(taosArrayGetSize(pResBlock->pDataBlock) == 1);
  SColumnInfoData* pCol = taosArrayGet(pResBlock->pDataBlock, 0);
  ASSERT(pCol->info.type == TSDB_DATA_TYPE_VARCHAR);

  void* pData = colDataGetData(pCol, 0);
  // TODO check tbname validation
  if (pData != (void*)-1 && pData != NULL) {
    // Zero first so the copied (possibly truncated) name is always NUL-terminated.
    memset(pBlock->info.parTbName, 0, TSDB_TABLE_NAME_LEN);
    int32_t len = TMIN(varDataLen(pData), TSDB_TABLE_NAME_LEN - 1);
    memcpy(pBlock->info.parTbName, varDataVal(pData), len);
  } else {
    pBlock->info.parTbName[0] = 0;
  }

  if (pBlock->info.id.groupId && pBlock->info.parTbName[0]) {
    streamStatePutParName(pState, pBlock->info.id.groupId, pBlock->info.parTbName);
  }

  blockDataDestroy(pSrcBlock);
  blockDataDestroy(pResBlock);
}

1382 1383
/*
 * Append one row at index pBlock->info.rows of a stream "special" block and bump the
 * row count.
 *
 * pStartTs / pEndTs are written both to the start/end ts columns and to the calculate
 * start/end ts columns. pUid and pGp go to the uid and group-id columns. pTbName goes
 * to the table-name column, which is appended as NULL when pTbName is NULL.
 */
void appendOneRowToStreamSpecialBlock(SSDataBlock* pBlock, TSKEY* pStartTs, TSKEY* pEndTs, uint64_t* pUid,
                                      uint64_t* pGp, void* pTbName) {
  int32_t row = pBlock->info.rows;
  bool    noTbName = (pTbName == NULL);

  // range columns
  colDataAppend(taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX), row, (const char*)pStartTs, false);
  colDataAppend(taosArrayGet(pBlock->pDataBlock, END_TS_COLUMN_INDEX), row, (const char*)pEndTs, false);

  // identity columns
  colDataAppend(taosArrayGet(pBlock->pDataBlock, UID_COLUMN_INDEX), row, (const char*)pUid, false);
  colDataAppend(taosArrayGet(pBlock->pDataBlock, GROUPID_COLUMN_INDEX), row, (const char*)pGp, false);

  // calculate range mirrors the range columns
  colDataAppend(taosArrayGet(pBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX), row, (const char*)pStartTs, false);
  colDataAppend(taosArrayGet(pBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX), row, (const char*)pEndTs, false);

  colDataAppend(taosArrayGet(pBlock->pDataBlock, TABLE_NAME_COLUMN_INDEX), row, (const char*)pTbName, noTbName);

  pBlock->info.rows = row + 1;
}

1401
/*
 * Run the update-info checks on every row of pBlock.
 *
 * For each row, updateInfoIsUpdated() is consulted. When the table was already inserted
 * and the row's timestamp is overdue, the row's interval window is also checked for being
 * closed/deleted (single interval window streams only).
 *
 * When `out` is true, pInfo->pUpdateDataRes is reset first and receives one row for each
 * row that failed a check. A second row with a recomputed group id is added when the
 * window is closed and partitionSup.needCalc is set, which is why capacity is reserved
 * for 2x the input rows. The result is then typed STREAM_DELETE_DATA (needCalc) or
 * STREAM_CLEAR.
 *
 * `invertible` is currently not used by this function.
 */
static void checkUpdateData(SStreamScanInfo* pInfo, bool invertible, SSDataBlock* pBlock, bool out) {
  SSDataBlock* pUpdateRes = pInfo->pUpdateDataRes;

  if (out) {
    blockDataCleanup(pUpdateRes);
    blockDataEnsureCapacity(pUpdateRes, pBlock->info.rows * 2);
  }

  SColumnInfoData* pTsColInfo = taosArrayGet(pBlock->pDataBlock, pInfo->primaryTsIndex);
  ASSERT(pTsColInfo->info.type == TSDB_DATA_TYPE_TIMESTAMP);
  TSKEY* pTs = (TSKEY*)pTsColInfo->pData;

  bool tableInserted = updateInfoIsTableInserted(pInfo->pUpdateInfo, pBlock->info.id.uid);

  for (int32_t row = 0; row < pBlock->info.rows; ++row) {
    TSKEY       ts = pTs[row];
    STimeWindow win = {.skey = INT64_MIN, .ekey = INT64_MAX};
    bool        isClosed = false;

    if (tableInserted && isOverdue(ts, &pInfo->twAggSup)) {
      SResultRowInfo dumyInfo;
      dumyInfo.cur.pageId = -1;
      win = getActiveTimeWindow(NULL, &dumyInfo, ts, &pInfo->interval, TSDB_ORDER_ASC);
      isClosed = isCloseWindow(&win, &pInfo->twAggSup);
    }

    // The update-info lookup must happen before the deleted-window check below.
    bool update = updateInfoIsUpdated(pInfo->pUpdateInfo, pBlock->info.id.uid, ts);

    bool closedWin = false;
    if (isClosed && isSignleIntervalWindow(pInfo)) {
      closedWin = isDeletedStreamWindow(&win, pBlock->info.id.groupId,
                                        pInfo->pTableScanOp->pTaskInfo->streamInfo.pState, &pInfo->twAggSup);
    }

    if (!out || (!update && !closedWin)) {
      continue;
    }

    qDebug("stream update check not pass, update %d, closedWin %d", update, closedWin);
    uint64_t gpId = 0;
    appendOneRowToStreamSpecialBlock(pUpdateRes, pTs + row, pTs + row, &pBlock->info.id.uid, &gpId, NULL);

    if (closedWin && pInfo->partitionSup.needCalc) {
      gpId = calGroupIdByData(&pInfo->partitionSup, pInfo->pPartScalarSup, pBlock, row);
      appendOneRowToStreamSpecialBlock(pUpdateRes, pTs + row, pTs + row, &pBlock->info.id.uid, &gpId, NULL);
    }
  }

  if (!out || pUpdateRes->info.rows <= 0) {
    return;
  }

  pUpdateRes->info.version = pBlock->info.version;
  pUpdateRes->info.dataLoad = 1;
  blockDataUpdateTsWindow(pUpdateRes, 0);
  pUpdateRes->info.type = pInfo->partitionSup.needCalc ? STREAM_DELETE_DATA : STREAM_CLEAR;
}
L
Liu Jicong 已提交
1443

1444
/*
 * Materialize a block retrieved from the tq reader into pInfo->pRes.
 *
 * - pRes takes the row count, uid and version of pBlock, type STREAM_NORMAL, and the
 *   group id looked up from the task's table list.
 * - Every output column in pInfo->matchInfo is filled from the pBlock column with the
 *   same column id, or with NULLs when pBlock has no such column.
 * - Tag pseudo columns are computed when configured. Any error other than
 *   TSDB_CODE_PAR_TABLE_NOT_EXIST frees pBlock's buffers and long-jumps to
 *   pTaskInfo->env.
 * - The operator filter is applied when `filter` is true.
 * - pBlock's buffers are released and the partition table name of pRes is resolved.
 *
 * Returns 0.
 */
static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock, bool filter) {
  SSDataBlock*   pRes = pInfo->pRes;
  SOperatorInfo* pOperator = pInfo->pStreamScanOp;
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;

  blockDataEnsureCapacity(pRes, pBlock->info.rows);

  pRes->info.rows = pBlock->info.rows;
  pRes->info.id.uid = pBlock->info.id.uid;
  pRes->info.type = STREAM_NORMAL;
  pRes->info.version = pBlock->info.version;
  pRes->info.id.groupId = getTableGroupId(pTaskInfo->pTableInfoList, pBlock->info.id.uid);

  size_t numOfMatch = taosArrayGetSize(pInfo->matchInfo.pList);
  size_t numOfSrcCols = blockDataGetNumOfCols(pBlock);
  for (int32_t i = 0; i < numOfMatch; ++i) {
    SColMatchItem* pItem = taosArrayGet(pInfo->matchInfo.pList, i);
    if (!pItem->needOutput) {
      continue;
    }

    // locate the source column carrying the required column id
    SColumnInfoData* pSrc = NULL;
    for (int32_t j = 0; pSrc == NULL && j < numOfSrcCols; ++j) {
      SColumnInfoData* pCandidate = bdGetColumnInfoData(pBlock, j);
      if (pCandidate->info.colId == pItem->colId) {
        pSrc = pCandidate;
      }
    }

    SColumnInfoData* pDst = taosArrayGet(pRes->pDataBlock, pItem->dstSlotId);
    if (pSrc != NULL) {
      colDataAssign(pDst, pSrc, pBlock->info.rows, &pRes->info);
    } else {
      // column missing from the submit block: output all NULLs
      colDataAppendNNULL(pDst, 0, pRes->info.rows);
    }
  }

  // currently only the tbname pseudo column
  if (pInfo->numOfPseudoExpr > 0) {
    int32_t code = addTagPseudoColumnData(&pInfo->readHandle, pInfo->pPseudoExpr, pInfo->numOfPseudoExpr, pRes,
                                          pRes->info.rows, GET_TASKID(pTaskInfo), NULL);
    // A table dropped while scanning reports TSDB_CODE_PAR_TABLE_NOT_EXIST; that one is tolerated.
    bool tolerated = (code == TSDB_CODE_SUCCESS) || (code == TSDB_CODE_PAR_TABLE_NOT_EXIST);
    if (!tolerated) {
      blockDataFreeRes((SSDataBlock*)pBlock);
      T_LONG_JMP(pTaskInfo->env, code);
    }

    // clear the (possibly tolerated) error code
    terrno = 0;
  }

  if (filter) {
    doFilter(pRes, pOperator->exprSupp.pFilterInfo, NULL);
  }

  pRes->info.dataLoad = 1;
  blockDataUpdateTsWindow(pRes, pInfo->primaryTsIndex);
  blockDataFreeRes((SSDataBlock*)pBlock);

  calBlockTbName(pInfo, pRes);
  return 0;
}
5
54liuyao 已提交
1508

L
Liu Jicong 已提交
1509
/*
 * Next-block function of the queue (TMQ) scan operator.
 *
 * Sources are tried in this order:
 *  1. An explicit submit request in pTaskInfo->streamInfo.pReq. Its non-empty blocks are
 *     returned one per call (via pInfo->pRes). When exhausted, the request is cleared and
 *     NULL is returned. If the message cannot be loaded into the reader, the request is
 *     dropped and NULL is returned.
 *  2. Snapshot data (prepareStatus.type == TMQ_OFFSET__SNAPSHOT_DATA). Table-scan blocks
 *     are returned. When the snapshot yields nothing and nothing was returned before, the
 *     tsdb reader is closed and the offset is switched to the WAL at snapshotVer + 1.
 *  3. WAL log (prepareStatus.type == TMQ_OFFSET__LOG). Data blocks fetched through the tq
 *     reader are returned. When no further data is available (or a separator arrives after
 *     data was returned), the reached offset is stored in lastStatus and NULL is returned.
 */
static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) {
  SExecTaskInfo*   pTaskInfo = pOperator->pTaskInfo;
  SStreamScanInfo* pInfo = pOperator->info;

  qDebug("queue scan called");

  if (pTaskInfo->streamInfo.pReq != NULL) {
    if (pInfo->tqReader->pMsg == NULL) {
      pInfo->tqReader->pMsg = pTaskInfo->streamInfo.pReq;
      const SSubmitReq* pSubmit = pInfo->tqReader->pMsg;
      if (tqReaderSetDataMsg(pInfo->tqReader, pSubmit, 0) < 0) {
        qError("submit msg messed up when initing stream submit block %p", pSubmit);
        pInfo->tqReader->pMsg = NULL;
        pTaskInfo->streamInfo.pReq = NULL;
        ASSERT(0);
        // The request has just been dropped; do not iterate the reader on a message
        // that failed to load.
        return NULL;
      }
    }

    blockDataCleanup(pInfo->pRes);
    SDataBlockInfo* pBlockInfo = &pInfo->pRes->info;

    while (tqNextDataBlock(pInfo->tqReader)) {
      SSDataBlock block = {0};

      int32_t code = tqRetrieveDataBlock(&block, pInfo->tqReader);
      if (code != TSDB_CODE_SUCCESS || block.info.rows == 0) {
        continue;
      }

      setBlockIntoRes(pInfo, &block, true);

      if (pBlockInfo->rows > 0) {
        return pInfo->pRes;
      }
    }

    pInfo->tqReader->pMsg = NULL;
    pTaskInfo->streamInfo.pReq = NULL;
    return NULL;
  }

  if (pTaskInfo->streamInfo.prepareStatus.type == TMQ_OFFSET__SNAPSHOT_DATA) {
    SSDataBlock* pResult = doTableScan(pInfo->pTableScanOp);
    if (pResult && pResult->info.rows > 0) {
      qDebug("queue scan tsdb return %d rows", pResult->info.rows);
      pTaskInfo->streamInfo.returned = 1;
      return pResult;
    } else {
      if (!pTaskInfo->streamInfo.returned) {
        // Snapshot produced no data: close the tsdb reader and continue from the WAL.
        STableScanInfo* pTSInfo = pInfo->pTableScanOp->info;
        tsdbReaderClose(pTSInfo->base.dataReader);
        pTSInfo->base.dataReader = NULL;
        tqOffsetResetToLog(&pTaskInfo->streamInfo.prepareStatus, pTaskInfo->streamInfo.snapshotVer);
        qDebug("queue scan tsdb over, switch to wal ver %" PRId64 "", pTaskInfo->streamInfo.snapshotVer + 1);
        if (tqSeekVer(pInfo->tqReader, pTaskInfo->streamInfo.snapshotVer + 1) < 0) {
          tqOffsetResetToLog(&pTaskInfo->streamInfo.lastStatus, pTaskInfo->streamInfo.snapshotVer);
          return NULL;
        }
        ASSERT(pInfo->tqReader->pWalReader->curVersion == pTaskInfo->streamInfo.snapshotVer + 1);
      } else {
        return NULL;
      }
    }
  }

  if (pTaskInfo->streamInfo.prepareStatus.type == TMQ_OFFSET__LOG) {
    while (1) {
      SFetchRet ret = {0};
      tqNextBlock(pInfo->tqReader, &ret);
      if (ret.fetchType == FETCH_TYPE__DATA) {
        blockDataCleanup(pInfo->pRes);
        if (setBlockIntoRes(pInfo, &ret.data, true) < 0) {
          ASSERT(0);
        }
        if (pInfo->pRes->info.rows > 0) {
          pOperator->status = OP_EXEC_RECV;
          qDebug("queue scan log return %d rows", pInfo->pRes->info.rows);
          return pInfo->pRes;
        }
      } else if (ret.fetchType == FETCH_TYPE__META) {
        // meta blocks are not expected in a queue scan
        ASSERT(0);
      } else if (ret.fetchType == FETCH_TYPE__NONE ||
                 (ret.fetchType == FETCH_TYPE__SEP && pOperator->status == OP_EXEC_RECV)) {
        pTaskInfo->streamInfo.lastStatus = ret.offset;
        ASSERT(pTaskInfo->streamInfo.lastStatus.version >= pTaskInfo->streamInfo.prepareStatus.version);
        ASSERT(pTaskInfo->streamInfo.lastStatus.version + 1 == pInfo->tqReader->pWalReader->curVersion);
        char formatBuf[80];
        tFormatOffset(formatBuf, 80, &ret.offset);
        qDebug("queue scan log return null, offset %s", formatBuf);
        pOperator->status = OP_OPENED;
        return NULL;
      }
    }
  } else {
    ASSERT(0);
    return NULL;
  }
}

L
Liu Jicong 已提交
1622 1623 1624 1625 1626 1627 1628 1629 1630 1631 1632 1633 1634 1635 1636 1637 1638 1639 1640 1641 1642 1643 1644 1645 1646 1647 1648 1649
/*
 * Copy into pDst the rows of the delete block pSrc whose uid is present in the tq
 * reader's table-id hash (pInfo->tqReader->tbIdHash).
 *
 * For each kept row, the start ts, end ts and uid are copied. The group-id and
 * calculate start/end ts cells are set to NULL. pDst->info is afterwards copied from
 * pSrc, except that rows becomes the number of kept rows and pDst keeps its own capacity.
 *
 * Returns 0.
 */
static int32_t filterDelBlockByUid(SSDataBlock* pDst, const SSDataBlock* pSrc, SStreamScanInfo* pInfo) {
  STqReader* pReader = pInfo->tqReader;
  int32_t    numOfRows = pSrc->info.rows;
  blockDataEnsureCapacity(pDst, numOfRows);

  SColumnInfoData* pSrcStartCol = taosArrayGet(pSrc->pDataBlock, START_TS_COLUMN_INDEX);
  SColumnInfoData* pSrcEndCol = taosArrayGet(pSrc->pDataBlock, END_TS_COLUMN_INDEX);
  SColumnInfoData* pSrcUidCol = taosArrayGet(pSrc->pDataBlock, UID_COLUMN_INDEX);
  uint64_t*        srcStart = (uint64_t*)pSrcStartCol->pData;
  uint64_t*        srcEnd = (uint64_t*)pSrcEndCol->pData;
  uint64_t*        srcUid = (uint64_t*)pSrcUidCol->pData;

  SColumnInfoData* pDstStartCol = taosArrayGet(pDst->pDataBlock, START_TS_COLUMN_INDEX);
  SColumnInfoData* pDstEndCol = taosArrayGet(pDst->pDataBlock, END_TS_COLUMN_INDEX);
  SColumnInfoData* pDstUidCol = taosArrayGet(pDst->pDataBlock, UID_COLUMN_INDEX);
  SColumnInfoData* pDstGpCol = taosArrayGet(pDst->pDataBlock, GROUPID_COLUMN_INDEX);
  SColumnInfoData* pDstCalStartCol = taosArrayGet(pDst->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX);
  SColumnInfoData* pDstCalEndCol = taosArrayGet(pDst->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX);

  int32_t numOfKept = 0;
  for (int32_t row = 0; row < numOfRows; ++row) {
    // skip rows of tables the reader does not track
    if (taosHashGet(pReader->tbIdHash, &srcUid[row], sizeof(uint64_t)) == NULL) {
      continue;
    }

    colDataAppend(pDstStartCol, numOfKept, (const char*)&srcStart[row], false);
    colDataAppend(pDstEndCol, numOfKept, (const char*)&srcEnd[row], false);
    colDataAppend(pDstUidCol, numOfKept, (const char*)&srcUid[row], false);

    colDataAppendNULL(pDstGpCol, numOfKept);
    colDataAppendNULL(pDstCalStartCol, numOfKept);
    colDataAppendNULL(pDstCalEndCol, numOfKept);
    ++numOfKept;
  }

  uint32_t dstCapacity = pDst->info.capacity;
  pDst->info = pSrc->info;
  pDst->info.rows = numOfKept;
  pDst->info.capacity = dstCapacity;

  return 0;
}

5
54liuyao 已提交
1658 1659 1660 1661 1662 1663 1664 1665 1666 1667 1668 1669 1670 1671 1672 1673 1674
// For partition-by-tag streams: fill the group-id column of a stream special block with
// the group id looked up from each row's uid. When the group id is computed from data
// (partitionSup.needCalc), the block is left unchanged.
static void setBlockGroupIdByUid(SStreamScanInfo* pInfo, SSDataBlock* pBlock) {
  if (pInfo->partitionSup.needCalc) {
    return;
  }

  SColumnInfoData* pGpCol = taosArrayGet(pBlock->pDataBlock, GROUPID_COLUMN_INDEX);
  SColumnInfoData* pUidCol = taosArrayGet(pBlock->pDataBlock, UID_COLUMN_INDEX);
  uint64_t*        uidCol = (uint64_t*)pUidCol->pData;
  int32_t          rows = pBlock->info.rows;
  for (int32_t i = 0; i < rows; i++) {
    uint64_t groupId = getGroupIdByUid(pInfo, uidCol[i]);
    colDataAppend(pGpCol, i, (const char*)&groupId, false);
  }
}

L
Liu Jicong 已提交
1675 1676 1677 1678 1679
/*
 * Next-block function of the stream scan operator.
 *
 * Phases, in the order they are checked:
 *  - Recover preparation (recoverStep PREPARE1/PREPARE2): configures the table scan for
 *    the fill-history version range [0, fillHistoryVer1] or
 *    [fillHistoryVer1 + 1, fillHistoryVer2], resets the reader and switches to the SCAN
 *    step.
 *  - Recover scan (recoverStep SCAN): returns table-scan blocks. It returns NULL to yield
 *    once more than 100 consecutive blocks were produced. When the scan is exhausted, it
 *    resets the reader and marks recovery finished.
 *  - Buffered data blocks (blockType STREAM_INPUT__DATA_BLOCK): returns the blocks in
 *    pInfo->pBlockLists. Retrieve and delete blocks switch the operator into range-scan
 *    mode (blockType STREAM_INPUT__DATA_SUBMIT).
 *  - Submit messages (blockType STREAM_INPUT__DATA_SUBMIT): first serves any pending
 *    range scan according to pInfo->scanMode, then decodes the buffered submit messages
 *    through the tq reader.
 *
 * NOTE: this operator does never check if current status is done or not.
 */
static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
  SExecTaskInfo*   pTaskInfo = pOperator->pTaskInfo;
  SStreamScanInfo* pInfo = pOperator->info;

  qDebug("stream scan called");

  if (pTaskInfo->streamInfo.recoverStep == STREAM_RECOVER_STEP__PREPARE1 ||
      pTaskInfo->streamInfo.recoverStep == STREAM_RECOVER_STEP__PREPARE2) {
    STableScanInfo* pTSInfo = pInfo->pTableScanOp->info;
    memcpy(&pTSInfo->base.cond, &pTaskInfo->streamInfo.tableCond, sizeof(SQueryTableDataCond));
    if (pTaskInfo->streamInfo.recoverStep == STREAM_RECOVER_STEP__PREPARE1) {
      pTSInfo->base.cond.startVersion = 0;
      pTSInfo->base.cond.endVersion = pTaskInfo->streamInfo.fillHistoryVer1;
      qDebug("stream recover step 1, from %" PRId64 " to %" PRId64, pTSInfo->base.cond.startVersion,
             pTSInfo->base.cond.endVersion);
    } else {
      pTSInfo->base.cond.startVersion = pTaskInfo->streamInfo.fillHistoryVer1 + 1;
      pTSInfo->base.cond.endVersion = pTaskInfo->streamInfo.fillHistoryVer2;
      qDebug("stream recover step 2, from %" PRId64 " to %" PRId64, pTSInfo->base.cond.startVersion,
             pTSInfo->base.cond.endVersion);
    }

    tsdbReaderClose(pTSInfo->base.dataReader);
    pTSInfo->base.dataReader = NULL;
    pInfo->pTableScanOp->status = OP_OPENED;

    pTSInfo->scanTimes = 0;
    pTSInfo->currentGroupId = -1;
    pTaskInfo->streamInfo.recoverStep = STREAM_RECOVER_STEP__SCAN;
    pTaskInfo->streamInfo.recoverScanFinished = false;
  }

  if (pTaskInfo->streamInfo.recoverStep == STREAM_RECOVER_STEP__SCAN) {
    // yield after more than 100 consecutive recover blocks
    if (pInfo->blockRecoverContiCnt > 100) {
      pInfo->blockRecoverTotCnt += pInfo->blockRecoverContiCnt;
      pInfo->blockRecoverContiCnt = 0;
      return NULL;
    }

    SSDataBlock* pBlock = doTableScan(pInfo->pTableScanOp);
    if (pBlock != NULL) {
      pInfo->blockRecoverContiCnt++;
      calBlockTbName(pInfo, pBlock);
      if (pInfo->pUpdateInfo) {
        TSKEY maxTs = updateInfoFillBlockData(pInfo->pUpdateInfo, pBlock, pInfo->primaryTsIndex);
        pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, maxTs);
      }
      qDebug("stream recover scan get block, rows %d", pBlock->info.rows);
      printDataBlock(pBlock, "scan recover");
      return pBlock;
    }
    pTaskInfo->streamInfo.recoverStep = STREAM_RECOVER_STEP__NONE;
    STableScanInfo* pTSInfo = pInfo->pTableScanOp->info;
    tsdbReaderClose(pTSInfo->base.dataReader);
    pTSInfo->base.dataReader = NULL;

    pTSInfo->base.cond.startVersion = -1;
    pTSInfo->base.cond.endVersion = -1;

    pTaskInfo->streamInfo.recoverScanFinished = true;
    return NULL;
  }

  size_t total = taosArrayGetSize(pInfo->pBlockLists);
// TODO: refactor
FETCH_NEXT_BLOCK:
  if (pInfo->blockType == STREAM_INPUT__DATA_BLOCK) {
    if (pInfo->validBlockIndex >= total) {
      doClearBufferedBlocks(pInfo);
      return NULL;
    }

    int32_t      current = pInfo->validBlockIndex++;
    SSDataBlock* pBlock = taosArrayGetP(pInfo->pBlockLists, current);
    if (pBlock->info.id.groupId && pBlock->info.parTbName[0]) {
      streamStatePutParName(pTaskInfo->streamInfo.pState, pBlock->info.id.groupId, pBlock->info.parTbName);
    }
    // TODO move into scan
    pBlock->info.calWin.skey = INT64_MIN;
    pBlock->info.calWin.ekey = INT64_MAX;
    pBlock->info.dataLoad = 1;
    blockDataUpdateTsWindow(pBlock, 0);
    switch (pBlock->info.type) {
      case STREAM_NORMAL:
      case STREAM_GET_ALL:
        return pBlock;
      case STREAM_RETRIEVE: {
        pInfo->blockType = STREAM_INPUT__DATA_SUBMIT;
        pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RETRIEVE;
        copyDataBlock(pInfo->pUpdateRes, pBlock);
        prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex);
        updateInfoAddCloseWindowSBF(pInfo->pUpdateInfo);
      } break;
      case STREAM_DELETE_DATA: {
        printDataBlock(pBlock, "stream scan delete recv");
        // pDelBlock is a block created here (and owned by this function) only when a tq
        // reader is present; otherwise it aliases the input block pBlock.
        SSDataBlock* pDelBlock = NULL;
        if (pInfo->tqReader) {
          pDelBlock = createSpecialDataBlock(STREAM_DELETE_DATA);
          filterDelBlockByUid(pDelBlock, pBlock, pInfo);
        } else {
          pDelBlock = pBlock;
        }

        setBlockGroupIdByUid(pInfo, pDelBlock);
        printDataBlock(pDelBlock, "stream scan delete recv filtered");
        if (pDelBlock->info.rows == 0) {
          if (pInfo->tqReader) {
            blockDataDestroy(pDelBlock);
          }
          goto FETCH_NEXT_BLOCK;
        }
        if (!isIntervalWindow(pInfo) && !isSessionWindow(pInfo) && !isStateWindow(pInfo)) {
          generateDeleteResultBlock(pInfo, pDelBlock, pInfo->pDeleteDataRes);
          pInfo->pDeleteDataRes->info.type = STREAM_DELETE_RESULT;
          printDataBlock(pDelBlock, "stream scan delete result");
          // Only destroy the block created above, never the aliased input block.
          if (pInfo->tqReader) {
            blockDataDestroy(pDelBlock);
          }

          if (pInfo->pDeleteDataRes->info.rows > 0) {
            return pInfo->pDeleteDataRes;
          } else {
            goto FETCH_NEXT_BLOCK;
          }
        } else {
          pInfo->blockType = STREAM_INPUT__DATA_SUBMIT;
          pInfo->updateResIndex = 0;
          generateScanRange(pInfo, pDelBlock, pInfo->pUpdateRes);
          prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex);
          copyDataBlock(pInfo->pDeleteDataRes, pInfo->pUpdateRes);
          pInfo->pDeleteDataRes->info.type = STREAM_DELETE_DATA;
          printDataBlock(pDelBlock, "stream scan delete data");
          if (pInfo->tqReader) {
            blockDataDestroy(pDelBlock);
          }
          if (pInfo->pDeleteDataRes->info.rows > 0) {
            pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RANGE;
            return pInfo->pDeleteDataRes;
          } else {
            goto FETCH_NEXT_BLOCK;
          }
        }
      } break;
      default:
        break;
    }
    return pBlock;
  } else if (pInfo->blockType == STREAM_INPUT__DATA_SUBMIT) {
    qDebug("scan mode %d", pInfo->scanMode);
    switch (pInfo->scanMode) {
      case STREAM_SCAN_FROM_RES: {
        // pUpdateRes is reused later in this function (copyDataBlock / prepareRangeScan),
        // so it is only reset here; destroying it would leave a dangling pointer.
        blockDataCleanup(pInfo->pUpdateRes);
        pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
        return pInfo->pRes;
      } break;
      case STREAM_SCAN_FROM_DELETE_DATA: {
        generateScanRange(pInfo, pInfo->pUpdateDataRes, pInfo->pUpdateRes);
        prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex);
        pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RANGE;
        copyDataBlock(pInfo->pDeleteDataRes, pInfo->pUpdateRes);
        pInfo->pDeleteDataRes->info.type = STREAM_DELETE_DATA;
        return pInfo->pDeleteDataRes;
      } break;
      case STREAM_SCAN_FROM_UPDATERES: {
        generateScanRange(pInfo, pInfo->pUpdateDataRes, pInfo->pUpdateRes);
        prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex);
        pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RANGE;
        return pInfo->pUpdateRes;
      } break;
      case STREAM_SCAN_FROM_DATAREADER_RANGE:
      case STREAM_SCAN_FROM_DATAREADER_RETRIEVE: {
        SSDataBlock* pSDB = doRangeScan(pInfo, pInfo->pUpdateRes, pInfo->primaryTsIndex, &pInfo->updateResIndex);
        if (pSDB) {
          STableScanInfo* pTableScanInfo = pInfo->pTableScanOp->info;
          uint64_t        version = getReaderMaxVersion(pTableScanInfo->base.dataReader);
          updateInfoSetScanRange(pInfo->pUpdateInfo, &pTableScanInfo->base.cond.twindows, pInfo->groupId, version);
          pSDB->info.type = pInfo->scanMode == STREAM_SCAN_FROM_DATAREADER_RANGE ? STREAM_NORMAL : STREAM_PULL_DATA;
          checkUpdateData(pInfo, true, pSDB, false);
          calBlockTbName(pInfo, pSDB);
          return pSDB;
        }
        blockDataCleanup(pInfo->pUpdateDataRes);
        pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
      } break;
      default:
        break;
    }

    SStreamAggSupporter* pSup = pInfo->windowSup.pStreamAggSup;
    if (isStateWindow(pInfo) && pSup->pScanBlock->info.rows > 0) {
      pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RANGE;
      pInfo->updateResIndex = 0;
      copyDataBlock(pInfo->pUpdateRes, pSup->pScanBlock);
      blockDataCleanup(pSup->pScanBlock);
      prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex);
      return pInfo->pUpdateRes;
    }

    SDataBlockInfo* pBlockInfo = &pInfo->pRes->info;

    int32_t totBlockNum = taosArrayGetSize(pInfo->pBlockLists);

  NEXT_SUBMIT_BLK:
    while (1) {
      if (pInfo->tqReader->pMsg == NULL) {
        if (pInfo->validBlockIndex >= totBlockNum) {
          updateInfoDestoryColseWinSBF(pInfo->pUpdateInfo);
          doClearBufferedBlocks(pInfo);
          qDebug("stream scan return empty, consume block %d", totBlockNum);
          return NULL;
        }

        int32_t     current = pInfo->validBlockIndex++;
        SSubmitReq* pSubmit = taosArrayGetP(pInfo->pBlockLists, current);
        if (tqReaderSetDataMsg(pInfo->tqReader, pSubmit, 0) < 0) {
          qError("submit msg messed up when initing stream submit block %p, current %d, total %d", pSubmit, current,
                 totBlockNum);
          pInfo->tqReader->pMsg = NULL;
          continue;
        }
      }

      blockDataCleanup(pInfo->pRes);

      while (tqNextDataBlock(pInfo->tqReader)) {
        SSDataBlock block = {0};

        int32_t code = tqRetrieveDataBlock(&block, pInfo->tqReader);

        if (code != TSDB_CODE_SUCCESS || block.info.rows == 0) {
          continue;
        }

        setBlockIntoRes(pInfo, &block, false);

        if (updateInfoIgnore(pInfo->pUpdateInfo, &pInfo->pRes->info.window, pInfo->pRes->info.id.groupId,
                             pInfo->pRes->info.version)) {
          printDataBlock(pInfo->pRes, "stream scan ignore");
          blockDataCleanup(pInfo->pRes);
          continue;
        }

        if (pInfo->pUpdateInfo) {
          checkUpdateData(pInfo, true, pInfo->pRes, true);
          pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, pBlockInfo->window.ekey);
          if (pInfo->pUpdateDataRes->info.rows > 0) {
            pInfo->updateResIndex = 0;
            if (pInfo->pUpdateDataRes->info.type == STREAM_CLEAR) {
              pInfo->scanMode = STREAM_SCAN_FROM_UPDATERES;
            } else if (pInfo->pUpdateDataRes->info.type == STREAM_INVERT) {
              pInfo->scanMode = STREAM_SCAN_FROM_RES;
              return pInfo->pUpdateDataRes;
            } else if (pInfo->pUpdateDataRes->info.type == STREAM_DELETE_DATA) {
              pInfo->scanMode = STREAM_SCAN_FROM_DELETE_DATA;
            }
          }
        }

        doFilter(pInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL);
        pInfo->pRes->info.dataLoad = 1;
        blockDataUpdateTsWindow(pInfo->pRes, pInfo->primaryTsIndex);

        if (pBlockInfo->rows > 0 || pInfo->pUpdateDataRes->info.rows > 0) {
          break;
        }
      }
      if (pBlockInfo->rows > 0 || pInfo->pUpdateDataRes->info.rows > 0) {
        break;
      } else {
        // current submit message exhausted without output; move on to the next one
        pInfo->tqReader->pMsg = NULL;
        continue;
      }
    }

    // record the scan action.
    pInfo->numOfExec++;
    pOperator->resultInfo.totalRows += pBlockInfo->rows;

    qDebug("scan rows: %d", pBlockInfo->rows);
    if (pBlockInfo->rows > 0) {
      return pInfo->pRes;
    }

    if (pInfo->pUpdateDataRes->info.rows > 0) {
      goto FETCH_NEXT_BLOCK;
    }

    goto NEXT_SUBMIT_BLK;
  } else {
    ASSERT(0);
    return NULL;
  }
}

H
Haojun Liao 已提交
2007
static SArray* extractTableIdList(const STableListInfo* pTableListInfo) {
2008 2009 2010
  SArray* tableIdList = taosArrayInit(4, sizeof(uint64_t));

  // Transfer the Array of STableKeyInfo into uid list.
H
Haojun Liao 已提交
2011 2012 2013
  size_t size = tableListGetSize(pTableListInfo);
  for (int32_t i = 0; i < size; ++i) {
    STableKeyInfo* pkeyInfo = tableListGetInfo(pTableListInfo, i);
2014 2015 2016 2017 2018 2019
    taosArrayPush(tableIdList, &pkeyInfo->uid);
  }

  return tableIdList;
}

2020
// Raw (TMQ snapshot) scan step. What it does depends on streamInfo.prepareStatus.type:
//  - TMQ_OFFSET__SNAPSHOT_DATA: return the next block from pInfo->dataReader. When the reader yields nothing more,
//    ask the snapshot context for the next table. uid == 0 means the snapshot is exhausted: the status switches to
//    TMQ_OFFSET__LOG at the snapshot version. Otherwise the scan is re-prepared for the new table uid.
//    Both of those cases return NULL.
//  - TMQ_OFFSET__SNAPSHOT_META: fetch one meta record from the snapshot context. A meta record is handed back through
//    streamInfo.metaRsp rather than as a data block, so this branch always returns NULL.
//  - any other type: nothing is produced, returns NULL.
// streamInfo.metaRsp is reset on every call; a non-zero metaRspLen signals that a meta response was produced.
// A killed task or a block-retrieval failure is raised by long-jumping to pTaskInfo->env.
static SSDataBlock* doRawScan(SOperatorInfo* pOperator) {
  // NOTE: this operator does never check if current status is done or not
  SExecTaskInfo*      pTaskInfo = pOperator->pTaskInfo;
  SStreamRawScanInfo* pInfo = pOperator->info;
  pTaskInfo->streamInfo.metaRsp.metaRspLen = 0;  // use metaRspLen !=0 to judge if data is meta
  pTaskInfo->streamInfo.metaRsp.metaRsp = NULL;

  qDebug("tmqsnap doRawScan called");
  if (pTaskInfo->streamInfo.prepareStatus.type == TMQ_OFFSET__SNAPSHOT_DATA) {
    if (pInfo->dataReader && tsdbNextDataBlock(pInfo->dataReader)) {
      if (isTaskKilled(pTaskInfo)) {
        T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
      }

      SSDataBlock* pBlock = tsdbRetrieveDataBlock(pInfo->dataReader, NULL);
      if (pBlock == NULL) {
        T_LONG_JMP(pTaskInfo->env, terrno);
      }

      qDebug("tmqsnap doRawScan get data uid:%" PRId64 "", pBlock->info.id.uid);
      // record the position (table uid, last timestamp) of the block being returned
      pTaskInfo->streamInfo.lastStatus.type = TMQ_OFFSET__SNAPSHOT_DATA;
      pTaskInfo->streamInfo.lastStatus.uid = pBlock->info.id.uid;
      pTaskInfo->streamInfo.lastStatus.ts = pBlock->info.window.ekey;
      return pBlock;
    }

    // current table exhausted (or no reader yet): move on to the next table of the snapshot
    SMetaTableInfo mtInfo = getUidfromSnapShot(pInfo->sContext);
    if (mtInfo.uid == 0) {  // read snapshot done, change to get data from wal
      qDebug("tmqsnap read snapshot done, change to get data from wal");
      pTaskInfo->streamInfo.prepareStatus.uid = mtInfo.uid;
      pTaskInfo->streamInfo.lastStatus.type = TMQ_OFFSET__LOG;
      pTaskInfo->streamInfo.lastStatus.version = pInfo->sContext->snapVersion;
    } else {
      pTaskInfo->streamInfo.prepareStatus.uid = mtInfo.uid;
      pTaskInfo->streamInfo.prepareStatus.ts = INT64_MIN;
      qDebug("tmqsnap change get data uid:%" PRId64 "", mtInfo.uid);
      qStreamPrepareScan(pTaskInfo, &pTaskInfo->streamInfo.prepareStatus, pInfo->sContext->subType);
    }
    tDeleteSSchemaWrapper(mtInfo.schema);
    qDebug("tmqsnap stream scan tsdb return null");
    return NULL;
  } else if (pTaskInfo->streamInfo.prepareStatus.type == TMQ_OFFSET__SNAPSHOT_META) {
    SSnapContext* sContext = pInfo->sContext;
    void*         data = NULL;
    int32_t       dataLen = 0;
    int16_t       type = 0;
    int64_t       uid = 0;
    if (getMetafromSnapShot(sContext, &data, &dataLen, &type, &uid) < 0) {
      qError("tmqsnap getMetafromSnapShot error");
      taosMemoryFreeClear(data);
      return NULL;
    }

    if (!sContext->queryMetaOrData) {  // change to get data next poll request
      pTaskInfo->streamInfo.lastStatus.type = TMQ_OFFSET__SNAPSHOT_META;
      pTaskInfo->streamInfo.lastStatus.uid = uid;
      pTaskInfo->streamInfo.metaRsp.rspOffset.type = TMQ_OFFSET__SNAPSHOT_DATA;
      pTaskInfo->streamInfo.metaRsp.rspOffset.uid = 0;
      pTaskInfo->streamInfo.metaRsp.rspOffset.ts = INT64_MIN;
    } else {
      // hand the meta record out through metaRsp; the caller checks metaRspLen != 0
      pTaskInfo->streamInfo.lastStatus.type = TMQ_OFFSET__SNAPSHOT_META;
      pTaskInfo->streamInfo.lastStatus.uid = uid;
      pTaskInfo->streamInfo.metaRsp.rspOffset = pTaskInfo->streamInfo.lastStatus;
      pTaskInfo->streamInfo.metaRsp.resMsgType = type;
      pTaskInfo->streamInfo.metaRsp.metaRspLen = dataLen;
      pTaskInfo->streamInfo.metaRsp.metaRsp = data;
    }

    return NULL;
  }

  // Any other prepare-status type produces nothing here; a WAL (TMQ_OFFSET__LOG) path is not implemented in this
  // operator.
  return NULL;
}

wmmhello's avatar
wmmhello 已提交
2131
static void destroyRawScanOperatorInfo(void* param) {
wmmhello's avatar
wmmhello 已提交
2132 2133 2134 2135 2136 2137
  SStreamRawScanInfo* pRawScan = (SStreamRawScanInfo*)param;
  tsdbReaderClose(pRawScan->dataReader);
  destroySnapContext(pRawScan->sContext);
  taosMemoryFree(pRawScan);
}

L
Liu Jicong 已提交
2138 2139 2140
// for subscribing db or stb (not including column),
// if this scan is used, meta data can be return
// and schemas are decided when scanning
2141
SOperatorInfo* createRawScanOperatorInfo(SReadHandle* pHandle, SExecTaskInfo* pTaskInfo) {
L
Liu Jicong 已提交
2142 2143 2144 2145 2146
  // create operator
  // create tb reader
  // create meta reader
  // create tq reader

H
Haojun Liao 已提交
2147 2148
  int32_t code = TSDB_CODE_SUCCESS;

2149
  SStreamRawScanInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamRawScanInfo));
L
Liu Jicong 已提交
2150
  SOperatorInfo*      pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
2151
  if (pInfo == NULL || pOperator == NULL) {
H
Haojun Liao 已提交
2152 2153
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _end;
2154 2155
  }

wmmhello's avatar
wmmhello 已提交
2156 2157
  pInfo->vnode = pHandle->vnode;

2158
  pInfo->sContext = pHandle->sContext;
L
Liu Jicong 已提交
2159 2160
  setOperatorInfo(pOperator, "RawScanOperator", QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN, false, OP_NOT_OPENED, pInfo,
                  pTaskInfo);
2161

2162
  pOperator->fpSet = createOperatorFpSet(NULL, doRawScan, NULL, destroyRawScanOperatorInfo, optrDefaultBufFn, NULL);
2163
  return pOperator;
H
Haojun Liao 已提交
2164

L
Liu Jicong 已提交
2165
_end:
H
Haojun Liao 已提交
2166 2167 2168 2169
  taosMemoryFree(pInfo);
  taosMemoryFree(pOperator);
  pTaskInfo->code = code;
  return NULL;
L
Liu Jicong 已提交
2170 2171
}

2172
static void destroyStreamScanOperatorInfo(void* param) {
2173 2174
  SStreamScanInfo* pStreamScan = (SStreamScanInfo*)param;
  if (pStreamScan->pTableScanOp && pStreamScan->pTableScanOp->info) {
5
54liuyao 已提交
2175
    destroyOperatorInfo(pStreamScan->pTableScanOp);
2176 2177 2178 2179
  }
  if (pStreamScan->tqReader) {
    tqCloseReader(pStreamScan->tqReader);
  }
H
Haojun Liao 已提交
2180 2181
  if (pStreamScan->matchInfo.pList) {
    taosArrayDestroy(pStreamScan->matchInfo.pList);
2182
  }
C
Cary Xu 已提交
2183 2184
  if (pStreamScan->pPseudoExpr) {
    destroyExprInfo(pStreamScan->pPseudoExpr, pStreamScan->numOfPseudoExpr);
L
Liu Jicong 已提交
2185
    taosMemoryFree(pStreamScan->pPseudoExpr);
C
Cary Xu 已提交
2186
  }
C
Cary Xu 已提交
2187

L
Liu Jicong 已提交
2188 2189
  cleanupExprSupp(&pStreamScan->tbnameCalSup);

L
Liu Jicong 已提交
2190
  updateInfoDestroy(pStreamScan->pUpdateInfo);
2191 2192 2193 2194
  blockDataDestroy(pStreamScan->pRes);
  blockDataDestroy(pStreamScan->pUpdateRes);
  blockDataDestroy(pStreamScan->pPullDataRes);
  blockDataDestroy(pStreamScan->pDeleteDataRes);
5
54liuyao 已提交
2195
  blockDataDestroy(pStreamScan->pUpdateDataRes);
2196 2197 2198 2199
  taosArrayDestroy(pStreamScan->pBlockLists);
  taosMemoryFree(pStreamScan);
}

2200
SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhysiNode* pTableScanNode, SNode* pTagCond,
2201
                                            SExecTaskInfo* pTaskInfo) {
H
Haojun Liao 已提交
2202
  SArray*          pColIds = NULL;
2203 2204
  SStreamScanInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamScanInfo));
  SOperatorInfo*   pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
2205

H
Haojun Liao 已提交
2206
  if (pInfo == NULL || pOperator == NULL) {
S
Shengliang Guan 已提交
2207
    terrno = TSDB_CODE_OUT_OF_MEMORY;
2208
    goto _error;
H
Haojun Liao 已提交
2209 2210
  }

2211
  SScanPhysiNode*     pScanPhyNode = &pTableScanNode->scan;
2212
  SDataBlockDescNode* pDescNode = pScanPhyNode->node.pOutputDataBlockDesc;
H
Haojun Liao 已提交
2213

2214
  pInfo->pTagCond = pTagCond;
2215
  pInfo->pGroupTags = pTableScanNode->pGroupTags;
2216

2217
  int32_t numOfCols = 0;
2218 2219
  int32_t code =
      extractColMatchInfo(pScanPhyNode->pScanCols, pDescNode, &numOfCols, COL_MATCH_FROM_COL_ID, &pInfo->matchInfo);
H
Haojun Liao 已提交
2220 2221 2222
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }
2223

H
Haojun Liao 已提交
2224
  int32_t numOfOutput = taosArrayGetSize(pInfo->matchInfo.pList);
H
Haojun Liao 已提交
2225
  pColIds = taosArrayInit(numOfOutput, sizeof(int16_t));
2226
  for (int32_t i = 0; i < numOfOutput; ++i) {
H
Haojun Liao 已提交
2227
    SColMatchItem* id = taosArrayGet(pInfo->matchInfo.pList, i);
2228 2229

    int16_t colId = id->colId;
2230
    taosArrayPush(pColIds, &colId);
2231
    if (id->colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
H
Haojun Liao 已提交
2232
      pInfo->primaryTsIndex = id->dstSlotId;
5
54liuyao 已提交
2233
    }
H
Haojun Liao 已提交
2234 2235
  }

L
Liu Jicong 已提交
2236 2237 2238 2239 2240 2241 2242 2243 2244 2245 2246 2247 2248
  if (pTableScanNode->pSubtable != NULL) {
    SExprInfo* pSubTableExpr = taosMemoryCalloc(1, sizeof(SExprInfo));
    if (pSubTableExpr == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      goto _error;
    }
    pInfo->tbnameCalSup.pExprInfo = pSubTableExpr;
    createExprFromOneNode(pSubTableExpr, pTableScanNode->pSubtable, 0);
    if (initExprSupp(&pInfo->tbnameCalSup, pSubTableExpr, 1) != 0) {
      goto _error;
    }
  }

2249 2250 2251 2252 2253 2254 2255 2256 2257 2258 2259 2260 2261
  if (pTableScanNode->pTags != NULL) {
    int32_t    numOfTags;
    SExprInfo* pTagExpr = createExprInfo(pTableScanNode->pTags, NULL, &numOfTags);
    if (pTagExpr == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      goto _error;
    }
    if (initExprSupp(&pInfo->tagCalSup, pTagExpr, numOfTags) != 0) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      goto _error;
    }
  }

H
Haojun Liao 已提交
2262 2263
  pInfo->pBlockLists = taosArrayInit(4, POINTER_BYTES);
  if (pInfo->pBlockLists == NULL) {
2264 2265
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto _error;
H
Haojun Liao 已提交
2266 2267
  }

5
54liuyao 已提交
2268
  if (pHandle->vnode) {
L
Liu Jicong 已提交
2269
    SOperatorInfo*  pTableScanOp = createTableScanOperatorInfo(pTableScanNode, pHandle, pTaskInfo);
L
Liu Jicong 已提交
2270
    STableScanInfo* pTSInfo = (STableScanInfo*)pTableScanOp->info;
2271
    if (pHandle->version > 0) {
H
Haojun Liao 已提交
2272
      pTSInfo->base.cond.endVersion = pHandle->version;
2273
    }
L
Liu Jicong 已提交
2274

2275
    STableKeyInfo* pList = NULL;
5
54liuyao 已提交
2276
    int32_t        num = 0;
H
Haojun Liao 已提交
2277
    tableListGetGroupList(pTaskInfo->pTableInfoList, 0, &pList, &num);
2278

2279
    if (pHandle->initTableReader) {
L
Liu Jicong 已提交
2280
      pTSInfo->scanMode = TABLE_SCAN__TABLE_ORDER;
H
Haojun Liao 已提交
2281
      pTSInfo->base.dataReader = NULL;
L
Liu Jicong 已提交
2282 2283
      code = tsdbReaderOpen(pHandle->vnode, &pTSInfo->base.cond, pList, num, pTSInfo->pResBlock,
                            &pTSInfo->base.dataReader, NULL);
dengyihao's avatar
dengyihao 已提交
2284 2285
      if (code != 0) {
        terrno = code;
H
Haojun Liao 已提交
2286
        destroyTableScanOperatorInfo(pTableScanOp);
2287
        goto _error;
L
Liu Jicong 已提交
2288
      }
L
Liu Jicong 已提交
2289 2290
    }

L
Liu Jicong 已提交
2291 2292 2293 2294
    if (pHandle->initTqReader) {
      ASSERT(pHandle->tqReader == NULL);
      pInfo->tqReader = tqOpenReader(pHandle->vnode);
      ASSERT(pInfo->tqReader);
2295
    } else {
L
Liu Jicong 已提交
2296 2297
      ASSERT(pHandle->tqReader);
      pInfo->tqReader = pHandle->tqReader;
2298 2299
    }

2300
    pInfo->pUpdateInfo = NULL;
2301
    pInfo->pTableScanOp = pTableScanOp;
2302 2303 2304
    if (pInfo->pTableScanOp->pTaskInfo->streamInfo.pState) {
      streamStateSetNumber(pInfo->pTableScanOp->pTaskInfo->streamInfo.pState, -1);
    }
L
Liu Jicong 已提交
2305

L
Liu Jicong 已提交
2306 2307
    pInfo->readHandle = *pHandle;
    pInfo->tableUid = pScanPhyNode->uid;
L
Liu Jicong 已提交
2308
    pTaskInfo->streamInfo.snapshotVer = pHandle->version;
L
Liu Jicong 已提交
2309

L
Liu Jicong 已提交
2310
    // set the extract column id to streamHandle
L
Liu Jicong 已提交
2311
    tqReaderSetColIdList(pInfo->tqReader, pColIds);
H
Haojun Liao 已提交
2312
    SArray* tableIdList = extractTableIdList(pTaskInfo->pTableInfoList);
2313
    code = tqReaderSetTbUidList(pInfo->tqReader, tableIdList);
L
Liu Jicong 已提交
2314 2315 2316 2317 2318
    if (code != 0) {
      taosArrayDestroy(tableIdList);
      goto _error;
    }
    taosArrayDestroy(tableIdList);
H
Haojun Liao 已提交
2319
    memcpy(&pTaskInfo->streamInfo.tableCond, &pTSInfo->base.cond, sizeof(SQueryTableDataCond));
L
Liu Jicong 已提交
2320 2321
  } else {
    taosArrayDestroy(pColIds);
H
Haojun Liao 已提交
2322
    pColIds = NULL;
5
54liuyao 已提交
2323 2324
  }

2325 2326 2327 2328 2329
  // create the pseduo columns info
  if (pTableScanNode->scan.pScanPseudoCols != NULL) {
    pInfo->pPseudoExpr = createExprInfo(pTableScanNode->scan.pScanPseudoCols, NULL, &pInfo->numOfPseudoExpr);
  }

H
Haojun Liao 已提交
2330 2331 2332 2333 2334
  code = filterInitFromNode((SNode*)pScanPhyNode->node.pConditions, &pOperator->exprSupp.pFilterInfo, 0);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

H
Haojun Liao 已提交
2335
  pInfo->pRes = createDataBlockFromDescNode(pDescNode);
2336
  pInfo->pUpdateRes = createSpecialDataBlock(STREAM_CLEAR);
2337
  pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
L
Liu Jicong 已提交
2338
  pInfo->windowSup = (SWindowSupporter){.pStreamAggSup = NULL, .gap = -1, .parentType = QUERY_NODE_PHYSICAL_PLAN};
2339
  pInfo->groupId = 0;
2340
  pInfo->pPullDataRes = createSpecialDataBlock(STREAM_RETRIEVE);
2341
  pInfo->pStreamScanOp = pOperator;
2342
  pInfo->deleteDataIndex = 0;
2343
  pInfo->pDeleteDataRes = createSpecialDataBlock(STREAM_DELETE_DATA);
5
54liuyao 已提交
2344
  pInfo->updateWin = (STimeWindow){.skey = INT64_MAX, .ekey = INT64_MAX};
2345
  pInfo->pUpdateDataRes = createSpecialDataBlock(STREAM_CLEAR);
X
Xiaoyu Wang 已提交
2346
  pInfo->assignBlockUid = pTableScanNode->assignBlockUid;
2347
  pInfo->partitionSup.needCalc = false;
L
Liu Jicong 已提交
2348

L
Liu Jicong 已提交
2349 2350
  setOperatorInfo(pOperator, "StreamScanOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN, false, OP_NOT_OPENED, pInfo,
                  pTaskInfo);
2351
  pOperator->exprSupp.numOfExprs = taosArrayGetSize(pInfo->pRes->pDataBlock);
H
Haojun Liao 已提交
2352

L
Liu Jicong 已提交
2353
  __optr_fn_t nextFn = pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM ? doStreamScan : doQueueScan;
L
Liu Jicong 已提交
2354 2355
  pOperator->fpSet =
      createOperatorFpSet(optrDummyOpenFn, nextFn, NULL, destroyStreamScanOperatorInfo, optrDefaultBufFn, NULL);
2356

H
Haojun Liao 已提交
2357
  return pOperator;
2358

L
Liu Jicong 已提交
2359
_error:
H
Haojun Liao 已提交
2360 2361 2362 2363 2364 2365 2366 2367
  if (pColIds != NULL) {
    taosArrayDestroy(pColIds);
  }

  if (pInfo != NULL) {
    destroyStreamScanOperatorInfo(pInfo);
  }

2368 2369
  taosMemoryFreeClear(pOperator);
  return NULL;
H
Haojun Liao 已提交
2370 2371
}

2372
// Tag scan next() function: emits one row per table of the task's table list, resuming at pInfo->curPos. Stops after
// resultInfo.capacity rows or at the end of the list, whichever comes first.
// For every output expression, a scan pseudo-column function is filled with the table name; any other expression is
// treated as a tag column and filled from the table's tag values (NULL when the tag is absent).
// Marks the operator and task completed once the last table is emitted. Returns NULL when no rows were produced.
// A failed meta lookup is raised by long-jumping to pTaskInfo->env with terrno.
static SSDataBlock* doTagScan(SOperatorInfo* pOperator) {
  if (pOperator->status == OP_EXEC_DONE) {
    return NULL;
  }

  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
  STagScanInfo*  pInfo = pOperator->info;
  SExprInfo*     pExprInfo = &pOperator->exprSupp.pExprInfo[0];
  SSDataBlock*   pRes = pInfo->pRes;
  blockDataCleanup(pRes);

  int32_t numOfTables = tableListGetSize(pTaskInfo->pTableInfoList);
  if (numOfTables == 0) {
    setTaskStatus(pTaskInfo, TASK_COMPLETED);
    return NULL;
  }

  char        nameBuf[512] = {0};
  int32_t     numOfRows = 0;
  SMetaReader mr = {0};
  metaReaderInit(&mr, pInfo->readHandle.meta, 0);

  while (pInfo->curPos < numOfTables && numOfRows < pOperator->resultInfo.capacity) {
    STableKeyInfo* pKeyInfo = tableListGetInfo(pTaskInfo->pTableInfoList, pInfo->curPos);
    int32_t        code = metaGetTableEntryByUid(&mr, pKeyInfo->uid);
    tDecoderClear(&mr.coder);
    if (code != TSDB_CODE_SUCCESS) {
      qError("failed to get table meta, uid:0x%" PRIx64 ", code:%s, %s", pKeyInfo->uid, tstrerror(terrno),
             GET_TASKID(pTaskInfo));
      metaReaderClear(&mr);
      T_LONG_JMP(pTaskInfo->env, terrno);
    }

    for (int32_t k = 0; k < pOperator->exprSupp.numOfExprs; ++k) {
      SExprInfo*       pExpr = &pExprInfo[k];
      SColumnInfoData* pDst = taosArrayGet(pRes->pDataBlock, pExpr->base.resSchema.slotId);

      // refactor later: every scan pseudo-column is currently answered with the table name
      if (fmIsScanPseudoColumnFunc(pExpr->pExpr->_function.functionId)) {
        STR_TO_VARSTR(nameBuf, mr.me.name);
        colDataAppend(pDst, numOfRows, nameBuf, false);
        continue;
      }

      // tag value of this child table
      STagVal val = {0};
      val.cid = pExpr->base.pParam[0].pCol->colId;
      const char* p = metaGetTableTagVal(mr.me.ctbEntry.pTags, pDst->info.type, &val);

      bool  isJson = (pDst->info.type == TSDB_DATA_TYPE_JSON);
      bool  convert = (!isJson && p != NULL);
      char* data = convert ? tTagValToData((const STagVal*)p, false) : (char*)p;
      bool  isNull = (data == NULL) || (isJson && tTagIsJsonNull(data));
      colDataAppend(pDst, numOfRows, data, isNull);

      // a converted var-length tag value is a private copy that must be released here
      if (convert && IS_VAR_DATA_TYPE(((const STagVal*)p)->type) && data != NULL) {
        taosMemoryFree(data);
      }
    }

    ++numOfRows;
    ++pInfo->curPos;
    if (pInfo->curPos >= numOfTables) {
      setOperatorCompleted(pOperator);
    }
  }

  metaReaderClear(&mr);

  if (pOperator->status == OP_EXEC_DONE) {
    setTaskStatus(pTaskInfo, TASK_COMPLETED);
  }

  pRes->info.rows = numOfRows;
  pOperator->resultInfo.totalRows += numOfRows;

  if (pRes->info.rows == 0) {
    return NULL;
  }
  return pInfo->pRes;
}

2453
static void destroyTagScanOperatorInfo(void* param) {
H
Haojun Liao 已提交
2454 2455
  STagScanInfo* pInfo = (STagScanInfo*)param;
  pInfo->pRes = blockDataDestroy(pInfo->pRes);
H
Haojun Liao 已提交
2456
  taosArrayDestroy(pInfo->matchInfo.pList);
D
dapan1121 已提交
2457
  taosMemoryFreeClear(param);
H
Haojun Liao 已提交
2458 2459
}

S
slzhou 已提交
2460 2461
// Create the tag scan operator. It builds the pseudo-column/tag expressions and column match info from pPhyNode,
// allocates a result block sized to the operator capacity (4096 rows) and installs doTagScan as next().
// Returns NULL on failure with terrno set to the failing call's code (TSDB_CODE_OUT_OF_MEMORY for allocation
// failures); partially built state is released before returning.
SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysiNode* pPhyNode,
                                         SExecTaskInfo* pTaskInfo) {
  int32_t        code = TSDB_CODE_OUT_OF_MEMORY;
  STagScanInfo*  pInfo = taosMemoryCalloc(1, sizeof(STagScanInfo));
  SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
  if (pInfo == NULL || pOperator == NULL) {
    goto _error;
  }

  SDataBlockDescNode* pDescNode = pPhyNode->node.pOutputDataBlockDesc;

  int32_t    numOfExprs = 0;
  SExprInfo* pExprInfo = createExprInfo(pPhyNode->pScanPseudoCols, NULL, &numOfExprs);
  code = initExprSupp(&pOperator->exprSupp, pExprInfo, numOfExprs);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  int32_t num = 0;
  code = extractColMatchInfo(pPhyNode->pScanPseudoCols, pDescNode, &num, COL_MATCH_FROM_COL_ID, &pInfo->matchInfo);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  pInfo->pRes = createDataBlockFromDescNode(pDescNode);
  pInfo->readHandle = *pReadHandle;
  pInfo->curPos = 0;

  setOperatorInfo(pOperator, "TagScanOperator", QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN, false, OP_NOT_OPENED, pInfo,
                  pTaskInfo);
  initResultSizeInfo(&pOperator->resultInfo, 4096);
  blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity);

  pOperator->fpSet =
      createOperatorFpSet(optrDummyOpenFn, doTagScan, NULL, destroyTagScanOperatorInfo, optrDefaultBufFn, NULL);

  return pOperator;

_error:
  if (pInfo != NULL) {
    taosArrayDestroy(pInfo->matchInfo.pList);
  }
  if (pOperator != NULL) {
    // the supp is either zeroed (calloc) or was filled by initExprSupp above
    cleanupExprSupp(&pOperator->exprSupp);
  }
  taosMemoryFree(pInfo);
  taosMemoryFree(pOperator);
  // keep the real cause instead of always reporting out-of-memory
  terrno = (code != TSDB_CODE_SUCCESS) ? code : TSDB_CODE_OUT_OF_MEMORY;
  return NULL;
}
2503

dengyihao's avatar
dengyihao 已提交
2504
static SSDataBlock* getTableDataBlockImpl(void* param) {
dengyihao's avatar
opt mem  
dengyihao 已提交
2505 2506 2507 2508 2509 2510 2511
  STableMergeScanSortSourceParam* source = param;
  SOperatorInfo*                  pOperator = source->pOperator;
  STableMergeScanInfo*            pInfo = pOperator->info;
  SExecTaskInfo*                  pTaskInfo = pOperator->pTaskInfo;
  int32_t                         readIdx = source->readerIdx;
  SSDataBlock*                    pBlock = source->inputBlock;

H
Haojun Liao 已提交
2512
  SQueryTableDataCond* pQueryCond = taosArrayGet(pInfo->queryConds, readIdx);
dengyihao's avatar
opt mem  
dengyihao 已提交
2513

L
Liu Jicong 已提交
2514 2515
  int64_t      st = taosGetTimestampUs();
  void*        p = tableListGetInfo(pTaskInfo->pTableInfoList, readIdx + pInfo->tableStartIndex);
H
Haojun Liao 已提交
2516
  SReadHandle* pHandle = &pInfo->base.readHandle;
dengyihao's avatar
dengyihao 已提交
2517

L
Liu Jicong 已提交
2518 2519
  int32_t code =
      tsdbReaderOpen(pHandle->vnode, pQueryCond, p, 1, pBlock, &pInfo->base.dataReader, GET_TASKID(pTaskInfo));
dengyihao's avatar
dengyihao 已提交
2520
  if (code != 0) {
H
Haojun Liao 已提交
2521
    T_LONG_JMP(pTaskInfo->env, code);
dengyihao's avatar
dengyihao 已提交
2522
  }
dengyihao's avatar
opt mem  
dengyihao 已提交
2523

H
Haojun Liao 已提交
2524
  STsdbReader* reader = pInfo->base.dataReader;
dengyihao's avatar
opt mem  
dengyihao 已提交
2525
  while (tsdbNextDataBlock(reader)) {
H
Haojun Liao 已提交
2526
    if (isTaskKilled(pTaskInfo)) {
2527
      T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
dengyihao's avatar
opt mem  
dengyihao 已提交
2528 2529 2530
    }

    // process this data block based on the probabilities
H
Haojun Liao 已提交
2531
    bool processThisBlock = processBlockWithProbability(&pInfo->sample);
dengyihao's avatar
opt mem  
dengyihao 已提交
2532 2533 2534 2535
    if (!processThisBlock) {
      continue;
    }

H
Haojun Liao 已提交
2536
    if (pQueryCond->order == TSDB_ORDER_ASC) {
dengyihao's avatar
opt mem  
dengyihao 已提交
2537 2538 2539 2540
      pQueryCond->twindows.skey = pBlock->info.window.ekey + 1;
    } else {
      pQueryCond->twindows.ekey = pBlock->info.window.skey - 1;
    }
dengyihao's avatar
opt mem  
dengyihao 已提交
2541 2542

    uint32_t status = 0;
H
Haojun Liao 已提交
2543
    loadDataBlock(pOperator, &pInfo->base, pBlock, &status);
S
slzhou 已提交
2544
    //    code = loadDataBlockFromOneTable(pOperator, pTableScanInfo, pBlock, &status);
dengyihao's avatar
opt mem  
dengyihao 已提交
2545
    if (code != TSDB_CODE_SUCCESS) {
H
Haojun Liao 已提交
2546
      T_LONG_JMP(pTaskInfo->env, code);
dengyihao's avatar
opt mem  
dengyihao 已提交
2547 2548 2549 2550 2551 2552 2553
    }

    // current block is filter out according to filter condition, continue load the next block
    if (status == FUNC_DATA_REQUIRED_FILTEROUT || pBlock->info.rows == 0) {
      continue;
    }

H
Haojun Liao 已提交
2554
    pBlock->info.id.groupId = getTableGroupId(pTaskInfo->pTableInfoList, pBlock->info.id.uid);
dengyihao's avatar
opt mem  
dengyihao 已提交
2555

H
Haojun Liao 已提交
2556
    pOperator->resultInfo.totalRows += pBlock->info.rows;
H
Haojun Liao 已提交
2557
    pInfo->base.readRecorder.elapsedTime += (taosGetTimestampUs() - st) / 1000.0;
dengyihao's avatar
opt mem  
dengyihao 已提交
2558

H
Haojun Liao 已提交
2559 2560
    tsdbReaderClose(pInfo->base.dataReader);
    pInfo->base.dataReader = NULL;
dengyihao's avatar
opt mem  
dengyihao 已提交
2561 2562
    return pBlock;
  }
H
Haojun Liao 已提交
2563

H
Haojun Liao 已提交
2564 2565
  tsdbReaderClose(pInfo->base.dataReader);
  pInfo->base.dataReader = NULL;
dengyihao's avatar
opt mem  
dengyihao 已提交
2566 2567 2568
  return NULL;
}

2569 2570 2571
// Build a one-element SBlockOrderInfo array describing a sort on the primary timestamp column in `order` direction,
// with nulls first. The slot is the dstSlotId of the LAST colMatchInfo entry whose colId is the primary timestamp
// column; if there is no such entry, slot 0 is used.
SArray* generateSortByTsInfo(SArray* colMatchInfo, int32_t order) {
  int32_t tsSlotId = 0;

  // walk backwards so the first hit is the last matching entry
  for (int32_t i = (int32_t)taosArrayGetSize(colMatchInfo) - 1; i >= 0; --i) {
    SColMatchItem* pItem = taosArrayGet(colMatchInfo, i);
    if (pItem->colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
      tsSlotId = pItem->dstSlotId;
      break;
    }
  }

  SBlockOrderInfo orderInfo = {.order = order, .slotId = tsSlotId, .nullFirst = NULL_ORDER_FIRST};
  SArray*         pOrderList = taosArrayInit(1, sizeof(SBlockOrderInfo));
  taosArrayPush(pOrderList, &orderInfo);
  return pOrderList;
}

H
Haojun Liao 已提交
2589
// Copy `src` into `dst`, giving `dst` its own heap copy of the colList array (numOfCols entries); every other
// member is copied by value. The caller must free dst->colList.
// Returns TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY when the column list cannot be allocated (dst->colList is
// NULL in that case). Previously an allocation failure was not detected and the copy wrote through NULL.
int32_t dumpQueryTableCond(const SQueryTableDataCond* src, SQueryTableDataCond* dst) {
  *dst = *src;
  dst->colList = taosMemoryCalloc(src->numOfCols, sizeof(SColumnInfo));
  if (src->numOfCols <= 0) {
    return TSDB_CODE_SUCCESS;
  }
  if (dst->colList == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  memcpy(dst->colList, src->colList, (size_t)src->numOfCols * sizeof(SColumnInfo));
  return TSDB_CODE_SUCCESS;
}
H
Haojun Liao 已提交
2597

2598
// Prepare the merge sort for the current group.
//  - Starting at tableStartIndex, extend the range over consecutive entries of the task's table list that share
//    pInfo->groupId. The last such entry becomes tableEndIndex.
//  - Create a multi-source merge sort handle with one source per table. Each source has its own input block and its
//    own copy of base.cond, and is read through getTableDataBlockImpl.
//  - Open the sort handle.
// Returns TSDB_CODE_SUCCESS; failures (allocation, sort open) are raised by long-jumping to pTaskInfo->env.
int32_t startGroupTableMergeScan(SOperatorInfo* pOperator) {
  STableMergeScanInfo* pInfo = pOperator->info;
  SExecTaskInfo*       pTaskInfo = pOperator->pTaskInfo;
  int32_t              code = TSDB_CODE_SUCCESS;

  {
    // the group ends right before the first table whose group id differs
    size_t  numOfTables = tableListGetSize(pTaskInfo->pTableInfoList);
    int32_t i = pInfo->tableStartIndex + 1;
    for (; i < numOfTables; ++i) {
      STableKeyInfo* tableKeyInfo = tableListGetInfo(pTaskInfo->pTableInfoList, i);
      if (tableKeyInfo->groupId != pInfo->groupId) {
        break;
      }
    }
    pInfo->tableEndIndex = i - 1;
  }

  int32_t tableStartIdx = pInfo->tableStartIndex;
  int32_t tableEndIdx = pInfo->tableEndIndex;

  pInfo->base.dataReader = NULL;

  // todo the total available buffer should be determined by total capacity of buffer of this task.
  // the additional one is reserved for merge result
  pInfo->sortBufSize = pInfo->bufPageSize * (tableEndIdx - tableStartIdx + 1 + 1);
  int32_t numOfBufPage = pInfo->sortBufSize / pInfo->bufPageSize;
  pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, SORT_MULTISOURCE_MERGE, pInfo->bufPageSize, numOfBufPage,
                                             pInfo->pSortInputBlock, pTaskInfo->id.str);

  tsortSetFetchRawDataFp(pInfo->pSortHandle, getTableDataBlockImpl, NULL, NULL);

  // one table has one data block
  int32_t numOfTable = tableEndIdx - tableStartIdx + 1;
  pInfo->queryConds = taosArrayInit(numOfTable, sizeof(SQueryTableDataCond));
  if (pInfo->queryConds == NULL) {
    T_LONG_JMP(pTaskInfo->env, TSDB_CODE_OUT_OF_MEMORY);
  }

  for (int32_t i = 0; i < numOfTable; ++i) {
    STableMergeScanSortSourceParam param = {0};
    param.readerIdx = i;
    param.pOperator = pOperator;
    param.inputBlock = createOneDataBlock(pInfo->pResBlock, false);
    if (param.inputBlock == NULL) {
      T_LONG_JMP(pTaskInfo->env, TSDB_CODE_OUT_OF_MEMORY);
    }
    blockDataEnsureCapacity(param.inputBlock, pOperator->resultInfo.capacity);

    taosArrayPush(pInfo->sortSourceParams, &param);

    // each source narrows its own copy of the condition while it reads
    SQueryTableDataCond cond;
    code = dumpQueryTableCond(&pInfo->base.cond, &cond);
    if (code != TSDB_CODE_SUCCESS) {
      T_LONG_JMP(pTaskInfo->env, code);
    }
    taosArrayPush(pInfo->queryConds, &cond);
  }

  for (int32_t i = 0; i < numOfTable; ++i) {
    SSortSource* ps = taosMemoryCalloc(1, sizeof(SSortSource));
    if (ps == NULL) {
      T_LONG_JMP(pTaskInfo->env, TSDB_CODE_OUT_OF_MEMORY);
    }
    STableMergeScanSortSourceParam* param = taosArrayGet(pInfo->sortSourceParams, i);
    ps->param = param;
    // presumably: the sort handle only references param, which stays owned by pInfo->sortSourceParams
    // (released in stopGroupTableMergeScan) -- TODO confirm against tsort
    ps->onlyRef = true;
    tsortAddSource(pInfo->pSortHandle, ps);
  }

  code = tsortOpen(pInfo->pSortHandle);
  if (code != TSDB_CODE_SUCCESS) {
    T_LONG_JMP(pTaskInfo->env, terrno);
  }

  return TSDB_CODE_SUCCESS;
}

// Tear down the per-group state built by startGroupTableMergeScan. The steps, in order:
//  - fold this group's sort statistics into pInfo->sortExecInfo (method and buffer are overwritten, the loop and
//    byte counters accumulate);
//  - free every source's input block and clear the source list;
//  - destroy the sort handle;
//  - free the per-table query conditions;
//  - reset the limit info for the next group.
// Always returns TSDB_CODE_SUCCESS.
int32_t stopGroupTableMergeScan(SOperatorInfo* pOperator) {
  STableMergeScanInfo* pInfo = pOperator->info;
  size_t               numOfSources = taosArrayGetSize(pInfo->queryConds);

  SSortExecInfo groupStat = tsortGetSortExecInfo(pInfo->pSortHandle);
  pInfo->sortExecInfo.sortMethod = groupStat.sortMethod;
  pInfo->sortExecInfo.sortBuffer = groupStat.sortBuffer;
  pInfo->sortExecInfo.loops += groupStat.loops;
  pInfo->sortExecInfo.readBytes += groupStat.readBytes;
  pInfo->sortExecInfo.writeBytes += groupStat.writeBytes;

  for (size_t idx = 0; idx < numOfSources; ++idx) {
    STableMergeScanSortSourceParam* pParam = taosArrayGet(pInfo->sortSourceParams, idx);
    blockDataDestroy(pParam->inputBlock);
  }
  taosArrayClear(pInfo->sortSourceParams);

  tsortDestroySortHandle(pInfo->pSortHandle);
  pInfo->pSortHandle = NULL;

  for (size_t idx = 0; idx < numOfSources; ++idx) {
    SQueryTableDataCond* pCond = taosArrayGet(pInfo->queryConds, idx);
    taosMemoryFree(pCond->colList);
  }
  taosArrayDestroy(pInfo->queryConds);
  pInfo->queryConds = NULL;

  resetLimitInfoForNextGroup(&pInfo->limitInfo);
  return TSDB_CODE_SUCCESS;
}

2696 2697
// all data produced by this function only belongs to one group
// slimit/soffset does not need to be concerned here, since this function only deal with data within one group.
L
Liu Jicong 已提交
2698 2699
SSDataBlock* getSortedTableMergeScanBlockData(SSortHandle* pHandle, SSDataBlock* pResBlock, int32_t capacity,
                                              SOperatorInfo* pOperator) {
2700 2701 2702
  STableMergeScanInfo* pInfo = pOperator->info;
  SExecTaskInfo*       pTaskInfo = pOperator->pTaskInfo;

2703
  blockDataCleanup(pResBlock);
2704 2705

  while (1) {
2706
    STupleHandle* pTupleHandle = tsortNextTuple(pHandle);
2707 2708 2709 2710
    if (pTupleHandle == NULL) {
      break;
    }

2711 2712
    appendOneRowToDataBlock(pResBlock, pTupleHandle);
    if (pResBlock->info.rows >= capacity) {
2713 2714 2715 2716
      break;
    }
  }

2717
  applyLimitOffset(&pInfo->limitInfo, pResBlock, pTaskInfo);
2718 2719 2720
  qDebug("%s get sorted row block, rows:%d, limit:%"PRId64, GET_TASKID(pTaskInfo), pResBlock->info.rows,
      pInfo->limitInfo.numOfOutputRows);

2721
  return (pResBlock->info.rows > 0) ? pResBlock : NULL;
2722 2723 2724 2725 2726 2727 2728 2729 2730 2731 2732 2733
}

/*
 * Next-block entry point of the TableMergeScan operator.
 *
 * Groups in the task's table list are processed one after another. For each group a sort handle
 * is set up (startGroupTableMergeScan) and drained block by block. Once it yields nothing, the
 * group is torn down (stopGroupTableMergeScan) and the next group starting at tableEndIndex + 1
 * is opened.
 *
 * Returns the next non-empty block tagged with its group id, or NULL once every group is
 * exhausted. Errors from the open function long-jump through pTaskInfo->env.
 */
SSDataBlock* doTableMergeScan(SOperatorInfo* pOperator) {
  if (pOperator->status == OP_EXEC_DONE) {
    return NULL;
  }

  STableMergeScanInfo* pInfo = pOperator->info;
  SExecTaskInfo*       pTaskInfo = pOperator->pTaskInfo;

  int32_t openCode = pOperator->fpSet._openFn(pOperator);
  if (openCode != TSDB_CODE_SUCCESS) {
    T_LONG_JMP(pTaskInfo->env, openCode);
  }

  size_t numOfTables = tableListGetSize(pTaskInfo->pTableInfoList);

  // First call: position on the first group and start merging it.
  if (!pInfo->hasGroupId) {
    pInfo->hasGroupId = true;
    if (numOfTables == 0) {
      setOperatorCompleted(pOperator);
      return NULL;
    }
    pInfo->tableStartIndex = 0;
    pInfo->groupId = ((STableKeyInfo*)tableListGetInfo(pTaskInfo->pTableInfoList, pInfo->tableStartIndex))->groupId;
    startGroupTableMergeScan(pOperator);
  }

  while (pInfo->tableStartIndex < numOfTables) {
    SSDataBlock* pResult = getSortedTableMergeScanBlockData(pInfo->pSortHandle, pInfo->pResBlock,
                                                            pOperator->resultInfo.capacity, pOperator);
    if (pResult != NULL) {
      pResult->info.id.groupId = pInfo->groupId;
      pOperator->resultInfo.totalRows += pResult->info.rows;
      return pResult;
    }

    // Current group is drained: close it and move on to the following one, if any.
    stopGroupTableMergeScan(pOperator);
    if (pInfo->tableEndIndex >= numOfTables - 1) {
      setOperatorCompleted(pOperator);
      return NULL;
    }

    pInfo->tableStartIndex = pInfo->tableEndIndex + 1;
    pInfo->groupId = ((STableKeyInfo*)tableListGetInfo(pTaskInfo->pTableInfoList, pInfo->tableStartIndex))->groupId;
    startGroupTableMergeScan(pOperator);
  }

  return NULL;
}

void destroyTableMergeScanOperatorInfo(void* param) {
2776
  STableMergeScanInfo* pTableScanInfo = (STableMergeScanInfo*)param;
H
Haojun Liao 已提交
2777
  cleanupQueryTableDataCond(&pTableScanInfo->base.cond);
2778

dengyihao's avatar
dengyihao 已提交
2779 2780 2781
  int32_t numOfTable = taosArrayGetSize(pTableScanInfo->queryConds);

  for (int32_t i = 0; i < numOfTable; i++) {
H
Haojun Liao 已提交
2782 2783
    STableMergeScanSortSourceParam* p = taosArrayGet(pTableScanInfo->sortSourceParams, i);
    blockDataDestroy(p->inputBlock);
2784
  }
H
Haojun Liao 已提交
2785

2786
  taosArrayDestroy(pTableScanInfo->sortSourceParams);
dengyihao's avatar
dengyihao 已提交
2787 2788
  tsortDestroySortHandle(pTableScanInfo->pSortHandle);
  pTableScanInfo->pSortHandle = NULL;
2789

H
Haojun Liao 已提交
2790 2791
  tsdbReaderClose(pTableScanInfo->base.dataReader);
  pTableScanInfo->base.dataReader = NULL;
dengyihao's avatar
opt mem  
dengyihao 已提交
2792

dengyihao's avatar
opt mem  
dengyihao 已提交
2793 2794 2795
  for (int i = 0; i < taosArrayGetSize(pTableScanInfo->queryConds); i++) {
    SQueryTableDataCond* pCond = taosArrayGet(pTableScanInfo->queryConds, i);
    taosMemoryFree(pCond->colList);
2796
  }
dengyihao's avatar
opt mem  
dengyihao 已提交
2797
  taosArrayDestroy(pTableScanInfo->queryConds);
2798

H
Haojun Liao 已提交
2799 2800
  if (pTableScanInfo->base.matchInfo.pList != NULL) {
    taosArrayDestroy(pTableScanInfo->base.matchInfo.pList);
2801 2802 2803 2804 2805 2806
  }

  pTableScanInfo->pResBlock = blockDataDestroy(pTableScanInfo->pResBlock);
  pTableScanInfo->pSortInputBlock = blockDataDestroy(pTableScanInfo->pSortInputBlock);

  taosArrayDestroy(pTableScanInfo->pSortInfo);
H
Haojun Liao 已提交
2807
  cleanupExprSupp(&pTableScanInfo->base.pseudoSup);
L
Liu Jicong 已提交
2808

H
Haojun Liao 已提交
2809 2810 2811 2812
  tsdbReaderClose(pTableScanInfo->base.dataReader);
  pTableScanInfo->base.dataReader = NULL;
  taosLRUCacheCleanup(pTableScanInfo->base.metaCache.pTableMetaEntryCache);

D
dapan1121 已提交
2813
  taosMemoryFreeClear(param);
2814 2815 2816 2817
}

/*
 * Produce the EXPLAIN ANALYZE payload for a TableMergeScan operator.
 *
 * On success, *pOptrExplain receives a newly allocated STableMergeScanExecInfo (block-load
 * recorder plus accumulated sort statistics) and *len its size. Ownership passes to the caller
 * (presumably freed by the explain framework — TODO confirm).
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY with *pOptrExplain = NULL and *len = 0.
 */
int32_t getTableMergeScanExplainExecInfo(SOperatorInfo* pOptr, void** pOptrExplain, uint32_t* len) {
  ASSERT(pOptr != NULL);
  // TODO: merge these two info into one struct
  STableMergeScanExecInfo* execInfo = taosMemoryCalloc(1, sizeof(STableMergeScanExecInfo));
  if (execInfo == NULL) {
    // Previously this was dereferenced unconditionally.
    *pOptrExplain = NULL;
    *len = 0;
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  STableMergeScanInfo* pInfo = pOptr->info;
  execInfo->blockRecorder = pInfo->base.readRecorder;
  execInfo->sortExecInfo = pInfo->sortExecInfo;

  *pOptrExplain = execInfo;
  *len = sizeof(STableMergeScanExecInfo);

  return TSDB_CODE_SUCCESS;
}

/*
 * Build a TableMergeScan operator from its physical plan node.
 *
 * The operator scans the tables of each group and merges their rows through a sort handle
 * ordered by timestamp (see generateSortByTsInfo below and startGroupTableMergeScan).
 *
 * Returns the operator, or NULL on failure. On failure pTaskInfo->code holds the real error code,
 * falling back to TSDB_CODE_OUT_OF_MEMORY when none was captured.
 */
SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanNode, SReadHandle* readHandle,
                                                SExecTaskInfo* pTaskInfo) {
  // Declared before the first goto so that _error always reads a defined value.
  int32_t              code = TSDB_CODE_SUCCESS;
  STableMergeScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STableMergeScanInfo));
  SOperatorInfo*       pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
  if (pInfo == NULL || pOperator == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _error;
  }

  SDataBlockDescNode* pDescNode = pTableScanNode->scan.node.pOutputDataBlockDesc;

  int32_t numOfCols = 0;
  code = extractColMatchInfo(pTableScanNode->scan.pScanCols, pDescNode, &numOfCols, COL_MATCH_FROM_COL_ID,
                             &pInfo->base.matchInfo);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  code = initQueryTableDataCond(&pInfo->base.cond, pTableScanNode);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;  // matchInfo.pList is released under _error
  }

  if (pTableScanNode->scan.pScanPseudoCols != NULL) {
    SExprSupp* pSup = &pInfo->base.pseudoSup;
    pSup->pExprInfo = createExprInfo(pTableScanNode->scan.pScanPseudoCols, NULL, &pSup->numOfExprs);
    pSup->pCtx = createSqlFunctionCtx(pSup->pExprInfo, pSup->numOfExprs, &pSup->rowEntryInfoOffset);
  }

  pInfo->scanInfo = (SScanInfo){.numOfAsc = pTableScanNode->scanSeq[0], .numOfDesc = pTableScanNode->scanSeq[1]};

  pInfo->base.metaCache.pTableMetaEntryCache = taosLRUCacheInit(1024 * 128, -1, .5);
  if (pInfo->base.metaCache.pTableMetaEntryCache == NULL) {
    code = terrno;
    goto _error;
  }

  pInfo->base.dataBlockLoadFlag = FUNC_DATA_REQUIRED_DATA_LOAD;
  pInfo->base.scanFlag = MAIN_SCAN;
  pInfo->base.readHandle = *readHandle;

  pInfo->base.limitInfo.limit.limit = -1;
  pInfo->base.limitInfo.slimit.limit = -1;

  pInfo->sample.sampleRatio = pTableScanNode->ratio;
  pInfo->sample.seed = taosGetTimestampSec();

  code = filterInitFromNode((SNode*)pTableScanNode->scan.node.pConditions, &pOperator->exprSupp.pFilterInfo, 0);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  initResultSizeInfo(&pOperator->resultInfo, 1024);
  pInfo->pResBlock = createDataBlockFromDescNode(pDescNode);
  blockDataEnsureCapacity(pInfo->pResBlock, pOperator->resultInfo.capacity);

  pInfo->sortSourceParams = taosArrayInit(64, sizeof(STableMergeScanSortSourceParam));

  pInfo->pSortInfo = generateSortByTsInfo(pInfo->base.matchInfo.pList, pInfo->base.cond.order);
  pInfo->pSortInputBlock = createOneDataBlock(pInfo->pResBlock, false);
  initLimitInfo(pTableScanNode->scan.node.pLimit, pTableScanNode->scan.node.pSlimit, &pInfo->limitInfo);

  int32_t  rowSize = pInfo->pResBlock->info.rowSize;
  uint32_t nCols = taosArrayGetSize(pInfo->pResBlock->pDataBlock);
  pInfo->bufPageSize = getProperSortPageSize(rowSize, nCols);

  setOperatorInfo(pOperator, "TableMergeScanOperator", QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN, false, OP_NOT_OPENED,
                  pInfo, pTaskInfo);
  pOperator->exprSupp.numOfExprs = numOfCols;

  pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doTableMergeScan, NULL, destroyTableMergeScanOperatorInfo,
                                         optrDefaultBufFn, getTableMergeScanExplainExecInfo);
  pOperator->cost.openCost = 0;
  return pOperator;

_error:
  if (pInfo != NULL) {
    // pInfo is calloc'ed, so anything not yet acquired is still NULL/zero.
    if (pInfo->base.matchInfo.pList != NULL) {
      taosArrayDestroy(pInfo->base.matchInfo.pList);
    }
    cleanupQueryTableDataCond(&pInfo->base.cond);
    cleanupExprSupp(&pInfo->base.pseudoSup);
    if (pInfo->base.metaCache.pTableMetaEntryCache != NULL) {
      taosLRUCacheCleanup(pInfo->base.metaCache.pTableMetaEntryCache);
    }
  }
  // Report the actual failure instead of always claiming out-of-memory.
  pTaskInfo->code = (code != TSDB_CODE_SUCCESS) ? code : TSDB_CODE_OUT_OF_MEMORY;
  taosMemoryFree(pInfo);
  taosMemoryFree(pOperator);
  return NULL;
}

// ====================================================================================================================
// TableCountScanOperator
static SSDataBlock* doTableCountScan(SOperatorInfo* pOperator);
S
slzhou 已提交
2915
static void         destoryTableCountScanOperator(void* param);
S
slzhou 已提交
2916 2917 2918 2919 2920 2921
static void         buildVnodeGroupedStbTableCount(STableCountScanOperatorInfo* pInfo, STableCountScanSupp* pSupp,
                                                   SSDataBlock* pRes, char* dbName, tb_uid_t stbUid);
static void         buildVnodeGroupedNtbTableCount(STableCountScanOperatorInfo* pInfo, STableCountScanSupp* pSupp,
                                                   SSDataBlock* pRes, char* dbName);
static void         buildVnodeFilteredTbCount(SOperatorInfo* pOperator, STableCountScanOperatorInfo* pInfo,
                                              STableCountScanSupp* pSupp, SSDataBlock* pRes, char* dbName);
L
Liu Jicong 已提交
2922 2923
static void         buildVnodeGroupedTableCount(SOperatorInfo* pOperator, STableCountScanOperatorInfo* pInfo,
                                                STableCountScanSupp* pSupp, SSDataBlock* pRes, int32_t vgId, char* dbName);
S
slzhou 已提交
2924 2925 2926 2927 2928 2929 2930
static SSDataBlock* buildVnodeDbTableCount(SOperatorInfo* pOperator, STableCountScanOperatorInfo* pInfo,
                                           STableCountScanSupp* pSupp, SSDataBlock* pRes);
static void         buildSysDbGroupedTableCount(SOperatorInfo* pOperator, STableCountScanOperatorInfo* pInfo,
                                                STableCountScanSupp* pSupp, SSDataBlock* pRes, size_t infodbTableNum,
                                                size_t perfdbTableNum);
static void         buildSysDbFilterTableCount(SOperatorInfo* pOperator, STableCountScanSupp* pSupp, SSDataBlock* pRes,
                                               size_t infodbTableNum, size_t perfdbTableNum);
S
slzhou 已提交
2931 2932 2933 2934 2935 2936 2937 2938 2939 2940 2941 2942 2943 2944 2945 2946 2947 2948 2949 2950 2951 2952 2953 2954 2955 2956 2957 2958 2959 2960 2961 2962 2963 2964 2965 2966 2967 2968 2969 2970 2971 2972 2973 2974 2975 2976 2977 2978 2979 2980 2981 2982 2983 2984 2985 2986 2987 2988 2989 2990 2991
static const char*  GROUP_TAG_DB_NAME = "db_name";
static const char*  GROUP_TAG_STABLE_NAME = "stable_name";

/*
 * Find the output slots of the "db_name" / "stable_name" group-tag columns among the scan columns.
 *
 * Each entry of scanCols must be a target node wrapping a column node. Matching slot ids are
 * stored into supp; slots for other names are left untouched.
 *
 * Returns TSDB_CODE_QRY_SYS_ERROR on an unexpected node type, otherwise TSDB_CODE_SUCCESS.
 */
int32_t tblCountScanGetGroupTagsSlotId(const SNodeList* scanCols, STableCountScanSupp* supp) {
  if (scanCols == NULL) {
    return TSDB_CODE_SUCCESS;
  }

  SNode* pCol = NULL;
  FOREACH(pCol, scanCols) {
    if (QUERY_NODE_TARGET != nodeType(pCol)) {
      return TSDB_CODE_QRY_SYS_ERROR;
    }
    STargetNode* pTarget = (STargetNode*)pCol;
    if (QUERY_NODE_COLUMN != nodeType(pTarget->pExpr)) {
      return TSDB_CODE_QRY_SYS_ERROR;
    }

    const char* colName = ((SColumnNode*)pTarget->pExpr)->colName;
    if (0 == strcmp(colName, GROUP_TAG_DB_NAME)) {
      supp->dbNameSlotId = pTarget->slotId;
    } else if (0 == strcmp(colName, GROUP_TAG_STABLE_NAME)) {
      supp->stbNameSlotId = pTarget->slotId;
    }
  }
  return TSDB_CODE_SUCCESS;
}

/*
 * Find the output slot of the table_count() pseudo column.
 *
 * Each entry of pseudoCols must be a target node wrapping a function node. The slot of the
 * FUNCTION_TYPE_TABLE_COUNT function is stored into supp->tbCountSlotId.
 *
 * Returns TSDB_CODE_QRY_SYS_ERROR on an unexpected node type, otherwise TSDB_CODE_SUCCESS.
 */
int32_t tblCountScanGetCountSlotId(const SNodeList* pseudoCols, STableCountScanSupp* supp) {
  if (pseudoCols == NULL) {
    return TSDB_CODE_SUCCESS;
  }

  SNode* pCol = NULL;
  FOREACH(pCol, pseudoCols) {
    if (QUERY_NODE_TARGET != nodeType(pCol)) {
      return TSDB_CODE_QRY_SYS_ERROR;
    }
    STargetNode* pTarget = (STargetNode*)pCol;
    if (QUERY_NODE_FUNCTION != nodeType(pTarget->pExpr)) {
      return TSDB_CODE_QRY_SYS_ERROR;
    }

    if (FUNCTION_TYPE_TABLE_COUNT == ((SFunctionNode*)pTarget->pExpr)->funcType) {
      supp->tbCountSlotId = pTarget->slotId;
    }
  }
  return TSDB_CODE_SUCCESS;
}

/*
 * Decide how the table-count scan is grouped or filtered.
 *
 * With group tags present, sets supp->groupByDbName / groupByStbName for the "db_name" /
 * "stable_name" columns. Without group tags, copies the db and table names of tableName into
 * supp->dbNameFilter / supp->stbNameFilter.
 *
 * Returns TSDB_CODE_QRY_SYS_ERROR if a group tag is not a column node, otherwise TSDB_CODE_SUCCESS.
 */
int32_t tblCountScanGetInputs(SNodeList* groupTags, SName* tableName, STableCountScanSupp* supp) {
  if (groupTags != NULL) {
    SNode* pNode = NULL;
    FOREACH(pNode, groupTags) {
      if (nodeType(pNode) != QUERY_NODE_COLUMN) {
        return TSDB_CODE_QRY_SYS_ERROR;
      }
      SColumnNode* colNode = (SColumnNode*)pNode;
      if (strcmp(colNode->colName, GROUP_TAG_DB_NAME) == 0) {
        supp->groupByDbName = true;
      }
      if (strcmp(colNode->colName, GROUP_TAG_STABLE_NAME) == 0) {
        supp->groupByStbName = true;
      }
    }
  } else {
    // tstrncpy always NUL-terminates. Plain strncpy leaves the filter unterminated when the source
    // fills the buffer, and later strlen/strcmp calls on the filters would then overrun it.
    tstrncpy(supp->dbNameFilter, tNameGetDbNameP(tableName), TSDB_DB_NAME_LEN);
    tstrncpy(supp->stbNameFilter, tNameGetTableName(tableName), TSDB_TABLE_NAME_LEN);
  }
  return TSDB_CODE_SUCCESS;
}

/*
 * Fill the STableCountScanSupp that drives the table-count scan.
 *
 * Determines grouping/filters from the group tags and table name. Initialises all output slot
 * ids to -1 (column not requested), then resolves the slots that the plan does request.
 *
 * Returns the first non-success code from the helpers (logged with the task id), otherwise
 * TSDB_CODE_SUCCESS.
 */
int32_t getTableCountScanSupp(SNodeList* groupTags, SName* tableName, SNodeList* scanCols, SNodeList* pseudoCols,
                              STableCountScanSupp* supp, SExecTaskInfo* taskInfo) {
  int32_t rc = tblCountScanGetInputs(groupTags, tableName, supp);
  if (rc != TSDB_CODE_SUCCESS) {
    qError("%s get table count scan supp. get inputs error", GET_TASKID(taskInfo));
    return rc;
  }

  // -1 marks an output column that the plan does not ask for.
  supp->dbNameSlotId = -1;
  supp->stbNameSlotId = -1;
  supp->tbCountSlotId = -1;

  rc = tblCountScanGetGroupTagsSlotId(scanCols, supp);
  if (rc != TSDB_CODE_SUCCESS) {
    qError("%s get table count scan supp. get group tags slot id error", GET_TASKID(taskInfo));
    return rc;
  }

  rc = tblCountScanGetCountSlotId(pseudoCols, supp);
  if (rc != TSDB_CODE_SUCCESS) {
    qError("%s get table count scan supp. get count error", GET_TASKID(taskInfo));
  }
  return rc;
}

SOperatorInfo* createTableCountScanOperatorInfo(SReadHandle* readHandle, STableCountScanPhysiNode* pTblCountScanNode,
S
shenglian zhou 已提交
3024 3025 3026
                                                SExecTaskInfo* pTaskInfo) {
  int32_t code = TSDB_CODE_SUCCESS;

S
slzhou 已提交
3027
  SScanPhysiNode*              pScanNode = &pTblCountScanNode->scan;
S
slzhou 已提交
3028
  STableCountScanOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(STableCountScanOperatorInfo));
S
slzhou 已提交
3029
  SOperatorInfo*               pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
S
shenglian zhou 已提交
3030 3031 3032 3033 3034 3035 3036 3037 3038

  if (!pInfo || !pOperator) {
    goto _error;
  }

  pInfo->readHandle = *readHandle;

  SDataBlockDescNode* pDescNode = pScanNode->node.pOutputDataBlockDesc;
  initResultSizeInfo(&pOperator->resultInfo, 1);
3039
  pInfo->pRes = createDataBlockFromDescNode(pDescNode);
S
shenglian zhou 已提交
3040 3041
  blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity);

S
slzhou 已提交
3042 3043 3044
  getTableCountScanSupp(pTblCountScanNode->pGroupTags, &pTblCountScanNode->scan.tableName,
                        pTblCountScanNode->scan.pScanCols, pTblCountScanNode->scan.pScanPseudoCols, &pInfo->supp,
                        pTaskInfo);
S
shenglian zhou 已提交
3045 3046 3047

  setOperatorInfo(pOperator, "TableCountScanOperator", QUERY_NODE_PHYSICAL_PLAN_TABLE_COUNT_SCAN, false, OP_NOT_OPENED,
                  pInfo, pTaskInfo);
L
Liu Jicong 已提交
3048 3049
  pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doTableCountScan, NULL, destoryTableCountScanOperator,
                                         optrDefaultBufFn, NULL);
S
shenglian zhou 已提交
3050 3051 3052 3053 3054 3055 3056 3057 3058 3059 3060
  return pOperator;

_error:
  if (pInfo != NULL) {
    destoryTableCountScanOperator(pInfo);
  }
  taosMemoryFreeClear(pOperator);
  pTaskInfo->code = code;
  return NULL;
}

S
slzhou 已提交
3061 3062 3063
/*
 * Write a single result row (row 0) into pRes and set pRes->info.rows to 1.
 *
 * Each column is written only when its slot id is not -1:
 *   - db name as a varstr (dbName must be non-empty);
 *   - super-table name as a varstr, or NULL when stbName is empty;
 *   - count as int64_t.
 *
 * Names longer than their buffers are truncated. The varstr length is taken from the copied
 * text, so it never exceeds the stack buffer that colDataAppend reads from.
 */
void fillTableCountScanDataBlock(STableCountScanSupp* pSupp, char* dbName, char* stbName, int64_t count,
                                 SSDataBlock* pRes) {
  if (pSupp->dbNameSlotId != -1) {
    ASSERT(strlen(dbName));
    SColumnInfoData* colInfoData = taosArrayGet(pRes->pDataBlock, pSupp->dbNameSlotId);

    char varDbName[TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
    tstrncpy(varDataVal(varDbName), dbName, TSDB_DB_NAME_LEN);
    // Use the length of what was copied (possibly truncated), not of the source string.
    varDataSetLen(varDbName, strlen(varDataVal(varDbName)));
    colDataAppend(colInfoData, 0, varDbName, false);
  }

  if (pSupp->stbNameSlotId != -1) {
    SColumnInfoData* colInfoData = taosArrayGet(pRes->pDataBlock, pSupp->stbNameSlotId);
    if (strlen(stbName) != 0) {
      char varStbName[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
      tstrncpy(varDataVal(varStbName), stbName, TSDB_TABLE_NAME_LEN);
      varDataSetLen(varStbName, strlen(varDataVal(varStbName)));
      colDataAppend(colInfoData, 0, varStbName, false);
    } else {
      colDataAppendNULL(colInfoData, 0);
    }
  }

  if (pSupp->tbCountSlotId != -1) {
    SColumnInfoData* colInfoData = taosArrayGet(pRes->pDataBlock, pSupp->tbCountSlotId);
    colDataAppend(colInfoData, 0, (char*)&count, false);
  }
  pRes->info.rows = 1;
}

/*
 * Count tables of the system databases (information_schema / performance_schema).
 *
 * Table counts come from getInfosDbMeta / getPerfDbMeta. The row is grouped per database when
 * grouping by db name, otherwise it is filtered by supp->dbNameFilter.
 *
 * Returns pInfo->pRes when it holds a row, otherwise NULL.
 */
static SSDataBlock* buildSysDbTableCount(SOperatorInfo* pOperator, STableCountScanOperatorInfo* pInfo) {
  size_t numOfInfoTables = 0;
  getInfosDbMeta(NULL, &numOfInfoTables);
  size_t numOfPerfTables = 0;
  getPerfDbMeta(NULL, &numOfPerfTables);

  STableCountScanSupp* pSupp = &pInfo->supp;
  SSDataBlock*         pRes = pInfo->pRes;

  if (pSupp->groupByDbName) {
    buildSysDbGroupedTableCount(pOperator, pInfo, pSupp, pRes, numOfInfoTables, numOfPerfTables);
  } else {
    buildSysDbFilterTableCount(pOperator, pSupp, pRes, numOfInfoTables, numOfPerfTables);
  }

  return (pRes->info.rows > 0) ? pRes : NULL;
}

/*
 * Emit the system-database table count selected by supp->dbNameFilter, then mark the operator
 * complete.
 *
 * An empty filter counts both system databases together. The information_schema and
 * performance_schema filters count that database only. Any other filter produces no row.
 */
static void buildSysDbFilterTableCount(SOperatorInfo* pOperator, STableCountScanSupp* pSupp, SSDataBlock* pRes,
                                       size_t infodbTableNum, size_t perfdbTableNum) {
  const char* dbFilter = pSupp->dbNameFilter;

  if (dbFilter[0] == '\0') {
    fillTableCountScanDataBlock(pSupp, "", "", infodbTableNum + perfdbTableNum, pRes);
  } else if (0 == strcmp(dbFilter, TSDB_INFORMATION_SCHEMA_DB)) {
    fillTableCountScanDataBlock(pSupp, TSDB_INFORMATION_SCHEMA_DB, "", infodbTableNum, pRes);
  } else if (0 == strcmp(dbFilter, TSDB_PERFORMANCE_SCHEMA_DB)) {
    fillTableCountScanDataBlock(pSupp, TSDB_PERFORMANCE_SCHEMA_DB, "", perfdbTableNum, pRes);
  }

  setOperatorCompleted(pOperator);
}

/*
 * Emit one group per call when grouping system tables by database.
 *
 * Call 0 emits the information_schema row and call 1 the performance_schema row, each tagged
 * with the group id of its db name. Any later call marks the operator complete.
 * pInfo->currGrpIdx advances on every call.
 */
static void buildSysDbGroupedTableCount(SOperatorInfo* pOperator, STableCountScanOperatorInfo* pInfo,
                                        STableCountScanSupp* pSupp, SSDataBlock* pRes, size_t infodbTableNum,
                                        size_t perfdbTableNum) {
  switch (pInfo->currGrpIdx) {
    case 0:
      pRes->info.id.groupId = calcGroupId(TSDB_INFORMATION_SCHEMA_DB, strlen(TSDB_INFORMATION_SCHEMA_DB));
      fillTableCountScanDataBlock(pSupp, TSDB_INFORMATION_SCHEMA_DB, "", infodbTableNum, pRes);
      break;
    case 1:
      pRes->info.id.groupId = calcGroupId(TSDB_PERFORMANCE_SCHEMA_DB, strlen(TSDB_PERFORMANCE_SCHEMA_DB));
      fillTableCountScanDataBlock(pSupp, TSDB_PERFORMANCE_SCHEMA_DB, "", perfdbTableNum, pRes);
      break;
    default:
      setOperatorCompleted(pOperator);
      break;
  }
  pInfo->currGrpIdx++;
}

/*
 * Next-block entry point of the TableCountScan operator.
 *
 * The result block is reset on every call, even after completion. With an mnode handle the
 * system-database counts are produced; otherwise the counts of the vnode's database are.
 *
 * Returns the result block, or NULL when there is nothing (more) to emit.
 */
static SSDataBlock* doTableCountScan(SOperatorInfo* pOperator) {
  STableCountScanOperatorInfo* pInfo = pOperator->info;
  SSDataBlock*                 pRes = pInfo->pRes;

  blockDataCleanup(pRes);
  if (pOperator->status == OP_EXEC_DONE) {
    return NULL;
  }

  return (pInfo->readHandle.mnd != NULL) ? buildSysDbTableCount(pOperator, pInfo)
                                         : buildVnodeDbTableCount(pOperator, pInfo, &pInfo->supp, pRes);
}

/*
 * Produce table counts for the database served by this vnode.
 *
 * The short db name is derived from the vnode's full db name (acct.db). The row is then grouped
 * per database/super table or filtered, depending on supp->groupByDbName.
 *
 * Returns pRes when it holds a row, otherwise NULL.
 */
static SSDataBlock* buildVnodeDbTableCount(SOperatorInfo* pOperator, STableCountScanOperatorInfo* pInfo,
                                           STableCountScanSupp* pSupp, SSDataBlock* pRes) {
  const char* fullDbName = NULL;
  int32_t     vgroupId = 0;
  vnodeGetInfo(pInfo->readHandle.vnode, &fullDbName, &vgroupId);

  SName name = {0};
  tNameFromString(&name, fullDbName, T_NAME_ACCT | T_NAME_DB);
  char dbName[TSDB_DB_NAME_LEN] = {0};
  tNameGetDbName(&name, dbName);

  if (!pSupp->groupByDbName) {
    buildVnodeFilteredTbCount(pOperator, pInfo, pSupp, pRes, dbName);
  } else {
    buildVnodeGroupedTableCount(pOperator, pInfo, pSupp, pRes, vgroupId, dbName);
  }

  return (pRes->info.rows > 0) ? pRes : NULL;
}

/*
 * Emit grouped table counts for a vnode database, one group per call.
 *
 * Without super-table grouping, one row with the whole database count is emitted and the
 * operator completes.
 *
 * With super-table grouping, the super-table uid list is fetched lazily on the first call, and
 * each call then emits:
 *   - one super table's child-table count,
 *   - then, after the last super table, the normal-table count,
 *   - and the next call completes the operator.
 */
static void buildVnodeGroupedTableCount(SOperatorInfo* pOperator, STableCountScanOperatorInfo* pInfo,
                                        STableCountScanSupp* pSupp, SSDataBlock* pRes, int32_t vgId, char* dbName) {
  if (!pSupp->groupByStbName) {
    pRes->info.id.groupId = calcGroupId(dbName, strlen(dbName));
    fillTableCountScanDataBlock(pSupp, dbName, "", metaGetTbNum(pInfo->readHandle.meta), pRes);
    setOperatorCompleted(pOperator);
    return;
  }

  if (pInfo->stbUidList == NULL) {
    pInfo->stbUidList = taosArrayInit(16, sizeof(tb_uid_t));
    if (vnodeGetStbIdList(pInfo->readHandle.vnode, 0, pInfo->stbUidList) < 0) {
      // Best effort: continue with whatever uids were collected.
      qError("vgId:%d, failed to get stb id list error: %s", vgId, terrstr());
    }
  }

  size_t numOfStbs = taosArrayGetSize(pInfo->stbUidList);
  if (pInfo->currGrpIdx > numOfStbs) {
    setOperatorCompleted(pOperator);
    return;
  }

  if (pInfo->currGrpIdx < numOfStbs) {
    tb_uid_t stbUid = *(tb_uid_t*)taosArrayGet(pInfo->stbUidList, pInfo->currGrpIdx);
    buildVnodeGroupedStbTableCount(pInfo, pSupp, pRes, dbName, stbUid);
  } else {
    buildVnodeGroupedNtbTableCount(pInfo, pSupp, pRes, dbName);
  }
  pInfo->currGrpIdx++;
}

/*
 * Emit one ungrouped table count for a vnode database, then mark the operator complete.
 *
 * When both a db filter and a super-table filter are set, the row holds that super table's
 * child-table count. Otherwise it holds the vnode's total table count.
 */
static void buildVnodeFilteredTbCount(SOperatorInfo* pOperator, STableCountScanOperatorInfo* pInfo,
                                      STableCountScanSupp* pSupp, SSDataBlock* pRes, char* dbName) {
  bool filterByStb = (pSupp->dbNameFilter[0] != '\0') && (pSupp->stbNameFilter[0] != '\0');

  if (filterByStb) {
    tb_uid_t      stbUid = metaGetTableEntryUidByName(pInfo->readHandle.meta, pSupp->stbNameFilter);
    SMetaStbStats stbStats = {0};
    metaGetStbStats(pInfo->readHandle.meta, stbUid, &stbStats);
    fillTableCountScanDataBlock(pSupp, dbName, pSupp->stbNameFilter, stbStats.ctbNum, pRes);
  } else {
    int64_t numOfTables = metaGetTbNum(pInfo->readHandle.meta);
    fillTableCountScanDataBlock(pSupp, dbName, "", numOfTables, pRes);
  }

  setOperatorCompleted(pOperator);
}

/*
 * Emit the normal-table count of a vnode database as its own group.
 *
 * The group id is derived from "<db>." (the db name with an empty super-table name).
 */
static void buildVnodeGroupedNtbTableCount(STableCountScanOperatorInfo* pInfo, STableCountScanSupp* pSupp,
                                           SSDataBlock* pRes, char* dbName) {
  char groupKey[TSDB_TABLE_FNAME_LEN] = {0};
  snprintf(groupKey, sizeof(groupKey), "%s.%s", dbName, "");
  pRes->info.id.groupId = calcGroupId(groupKey, strlen(groupKey));

  fillTableCountScanDataBlock(pSupp, dbName, "", metaGetNtbNum(pInfo->readHandle.meta), pRes);
}

/*
 * Emit the child-table count of one super table (by uid) as its own group.
 *
 * The group id is derived from "<db>.<stb>".
 */
static void buildVnodeGroupedStbTableCount(STableCountScanOperatorInfo* pInfo, STableCountScanSupp* pSupp,
                                           SSDataBlock* pRes, char* dbName, tb_uid_t stbUid) {
  SMeta* pMeta = pInfo->readHandle.meta;

  char stbName[TSDB_TABLE_NAME_LEN] = {0};
  metaGetTableSzNameByUid(pMeta, stbUid, stbName);

  char groupKey[TSDB_TABLE_FNAME_LEN] = {0};
  snprintf(groupKey, sizeof(groupKey), "%s.%s", dbName, stbName);
  pRes->info.id.groupId = calcGroupId(groupKey, strlen(groupKey));

  SMetaStbStats stbStats = {0};
  metaGetStbStats(pMeta, stbUid, &stbStats);
  fillTableCountScanDataBlock(pSupp, dbName, stbName, stbStats.ctbNum, pRes);
}

static void destoryTableCountScanOperator(void* param) {
S
slzhou 已提交
3255
  STableCountScanOperatorInfo* pTableCountScanInfo = param;
S
shenglian zhou 已提交
3256 3257
  blockDataDestroy(pTableCountScanInfo->pRes);

S
slzhou 已提交
3258
  taosArrayDestroy(pTableCountScanInfo->stbUidList);
S
shenglian zhou 已提交
3259 3260
  taosMemoryFreeClear(param);
}