scanoperator.c 120.0 KB
Newer Older
H
Haojun Liao 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

16
#include "executorimpl.h"
H
Haojun Liao 已提交
17
#include "filter.h"
18
#include "function.h"
19
#include "functionMgt.h"
L
Liu Jicong 已提交
20
#include "os.h"
H
Haojun Liao 已提交
21
#include "querynodes.h"
22
#include "systable.h"
H
Haojun Liao 已提交
23
#include "tname.h"
24
#include "ttime.h"
H
Haojun Liao 已提交
25 26 27 28 29 30 31 32 33 34

#include "tdatablock.h"
#include "tmsg.h"

#include "query.h"
#include "tcompare.h"
#include "thash.h"
#include "ttypes.h"

// Mark the scan described by _info as a reverse scan by storing REVERSE_SCAN in its scanFlag member.
#define SET_REVERSE_SCAN_FLAG(_info) ((_info)->scanFlag = REVERSE_SCAN)
35
// Toggle the order held in the lvalue n in place: TSDB_ORDER_ASC becomes TSDB_ORDER_DESC, any other value becomes
// TSDB_ORDER_ASC. The argument is evaluated twice, so it must not have side effects.
#define SWITCH_ORDER(n)              (((n) = ((n) == TSDB_ORDER_ASC) ? TSDB_ORDER_DESC : TSDB_ORDER_ASC))
36

H
Haojun Liao 已提交
37 38 39 40 41 42 43 44 45 46 47 48
// Execution statistics bundle; judging by the name it belongs to the table merge scan operator.
// NOTE(review): the field notes below are inferred from the type names only - confirm against the code that fills them.
typedef struct STableMergeScanExecInfo {
  SFileBlockLoadRecorder blockRecorder;  // presumably the file block load counters of the scan
  SSortExecInfo          sortExecInfo;   // presumably the statistics reported by the sort stage
} STableMergeScanExecInfo;

// Parameter bundle that, judging by the name, describes one sort source of the table merge scan.
// NOTE(review): none of these fields is used in the visible part of the file; the notes are assumptions to confirm.
typedef struct STableMergeScanSortSourceParam {
  SOperatorInfo* pOperator;   // presumably the owning scan operator
  int32_t        readerIdx;   // presumably the index of the reader serving this source
  uint64_t       uid;         // presumably the uid of the table read by this source
  SSDataBlock*   inputBlock;  // presumably the block handed to the sorter
} STableMergeScanSortSourceParam;

L
Liu Jicong 已提交
49
// Forward declaration. The `static` here gives the function internal linkage even though its definition omits it.
static bool processBlockWithProbability(const SSampleExecInfo* pInfo);
50

H
Haojun Liao 已提交
51
// Tell the scanner whether the current data block should be processed.
//
// The ratio-based sampling is compiled out (`#if 0`), so at present every block is accepted and pInfo is not read.
bool processBlockWithProbability(const SSampleExecInfo* pInfo) {
#if 0
  // disabled sampling: accept a block when a pseudo random draw is a multiple of (1 / sampleRatio)
  if (pInfo->sampleRatio != 1) {
    uint32_t draw = taosRandR((uint32_t*) &pInfo->seed);
    return (draw % ((uint32_t)(1/pInfo->sampleRatio))) == 0;
  }

  return true;
#else
  return true;
#endif
}

64
// Toggle, via SWITCH_ORDER, the `order` member of each of the first numOfOutput function contexts in pCtx.
static void switchCtxOrder(SqlFunctionCtx* pCtx, int32_t numOfOutput) {
  int32_t idx = 0;
  while (idx < numOfOutput) {
    SqlFunctionCtx* pCur = &pCtx[idx];
    SWITCH_ORDER(pCur->order);
    idx += 1;
  }
}

70 71 72 73 74 75 76 77 78
// Advance the time window *tw by one step in the direction derived from `order`.
//
// For every interval unit other than 'n' and 'y' the start key moves by sliding * factor and the end key is
// start + interval - 1. For 'n' and 'y' the window is moved on the calendar instead: the start key is converted to
// local broken-down time, shifted by `interval` months (interval * 12 for 'y') in the scan direction, and the end key
// is the key of the month boundary one interval later, minus one.
static void getNextTimeWindow(SInterval* pInterval, STimeWindow* tw, int32_t order) {
  const int32_t factor = GET_FORWARD_DIRECTION_FACTOR(order);
  const bool    calendarUnit = (pInterval->intervalUnit == 'n') || (pInterval->intervalUnit == 'y');

  if (!calendarUnit) {
    tw->skey += pInterval->sliding * factor;
    tw->ekey = tw->skey + pInterval->interval - 1;
    return;
  }

  // window length expressed in months
  int64_t months = pInterval->interval;
  if (pInterval->intervalUnit == 'y') {
    months *= 12;
  }

  // the start key expressed in seconds, as required by the broken-down time conversion
  int64_t seconds = convertTimePrecision(tw->skey, pInterval->precision, TSDB_TIME_PRECISION_MILLI) / 1000;

  struct tm tm;
  time_t    t = (time_t)seconds;
  taosLocalTime(&t, &tm);

  // new start: shift the month index by one window in the scan direction
  int startMon = (int)(tm.tm_year * 12 + tm.tm_mon + months * factor);
  tm.tm_year = startMon / 12;
  tm.tm_mon = startMon % 12;
  tw->skey = convertTimePrecision((int64_t)taosMktime(&tm) * 1000LL, TSDB_TIME_PRECISION_MILLI, pInterval->precision);

  // new end: one tick before the start of the window that begins `months` later
  int endMon = (int)(startMon + months);
  tm.tm_year = endMon / 12;
  tm.tm_mon = endMon % 12;
  tw->ekey =
      convertTimePrecision((int64_t)taosMktime(&tm) * 1000LL, TSDB_TIME_PRECISION_MILLI, pInterval->precision) - 1;
}

103
// Check whether the time range of a data block intersects one of the interval windows visited in scan order.
//
// Returns false right away when pInterval->interval is 0. Otherwise the walk starts at the window aligned to the
// block's start key (ascending order) or end key (any other order) and moves on with getNextTimeWindow until a window
// intersects the block range (true) or lies completely beyond it (false).
static bool overlapWithTimeWindow(SInterval* pInterval, SDataBlockInfo* pBlockInfo, int32_t order) {
  // 0 by default, which means the upstream operator is not an interval operator.
  if (pInterval->interval == 0) {
    return false;
  }

  const STimeWindow* pRange = &pBlockInfo->window;
  STimeWindow        win = {0};

  if (order == TSDB_ORDER_ASC) {
    win = getAlignQueryTimeWindow(pInterval, pInterval->precision, pRange->skey);
    assert(win.ekey >= pRange->skey);

    if (TMAX(win.skey, pRange->skey) <= TMIN(win.ekey, pRange->ekey)) {
      return true;
    }

    for (;;) {
      getNextTimeWindow(pInterval, &win, order);
      if (win.skey > pRange->ekey) {  // the window starts after the block ends
        break;
      }

      assert(win.ekey > pRange->ekey);
      if (TMAX(win.skey, pRange->skey) <= pRange->ekey) {
        return true;
      }
    }

    return false;
  }

  win = getAlignQueryTimeWindow(pInterval, pInterval->precision, pRange->ekey);
  assert(win.skey <= pRange->ekey);

  if (TMAX(win.skey, pRange->skey) <= TMIN(win.ekey, pRange->ekey)) {
    return true;
  }

  for (;;) {
    getNextTimeWindow(pInterval, &win, order);
    if (win.ekey < pRange->skey) {  // the window ends before the block starts
      break;
    }

    assert(win.skey < pRange->skey);
    if (pRange->skey <= TMIN(win.ekey, pRange->ekey)) {
      return true;
    }
  }

  return false;
}

154 155 156 157 158 159 160 161 162 163 164
// Used by the table scanner to fetch the temporary (intermediate) aggregate result row of a table group.
//
// pOperator: must be a QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN operator, otherwise NULL is returned.
// groupId:   group whose result row position is looked up in pdInfo.pAggSup->pResultRowHashTable.
// pPage:     out parameter; on success it receives the buffer page that holds the row. The caller hands it back
//            with releaseBufPage. It is left untouched when the operator type does not match or no row exists.
//
// Returns the result row, or NULL when the operator is not a table scan, when no row exists for the group, or when
// the buffer page cannot be obtained (in that case *pPage is NULL).
static SResultRow* getTableGroupOutputBuf(SOperatorInfo* pOperator, uint64_t groupId, SFilePage** pPage) {
  if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN) {
    return NULL;
  }

  // build the hash key of the group's result row
  int64_t buf[2] = {0};
  SET_RES_WINDOW_KEY((char*)buf, &groupId, sizeof(groupId), groupId);

  STableScanInfo* pTableScanInfo = pOperator->info;

  SResultRowPosition* p1 = (SResultRowPosition*)tSimpleHashGet(pTableScanInfo->base.pdInfo.pAggSup->pResultRowHashTable,
                                                               buf, GET_RES_WINDOW_KEY_LEN(sizeof(groupId)));
  if (p1 == NULL) {
    return NULL;
  }

  *pPage = getBufPage(pTableScanInfo->base.pdInfo.pAggSup->pResultBuf, p1->pageId);
  if (*pPage == NULL) {
    // never derive a row pointer from a page that could not be obtained
    return NULL;
  }

  return (SResultRow*)((char*)(*pPage) + p1->offset);
}

// Ask every pushed-down function whether, given its current intermediate result, the block still has to be loaded.
//
// *status is set to FUNC_DATA_REQUIRED_NOT_LOAD only when all functions answer FUNC_DATA_REQUIRED_NOT_LOAD; in every
// other case it is left as it was. Nothing happens when no pushed-down expression support exists or when the group
// has no result row yet. Always returns TSDB_CODE_SUCCESS.
static int32_t doDynamicPruneDataBlock(SOperatorInfo* pOperator, SDataBlockInfo* pBlockInfo, uint32_t* status) {
  STableScanInfo* pScan = pOperator->info;
  SExprSupp*      pExprSup = pScan->base.pdInfo.pExprSup;
  if (pExprSup == NULL) {
    return TSDB_CODE_SUCCESS;
  }

  SFilePage*  pPage = NULL;
  SResultRow* pResRow = getTableGroupOutputBuf(pOperator, pBlockInfo->id.groupId, &pPage);
  if (pResRow == NULL) {
    return TSDB_CODE_SUCCESS;
  }

  // stop at the first function that still needs the data of this block
  int32_t idx = 0;
  for (; idx < pExprSup->numOfExprs; ++idx) {
    int32_t              funcId = pExprSup->pCtx[idx].functionId;
    SResultRowEntryInfo* pEntry = getResultEntryInfo(pResRow, idx, pExprSup->rowEntryInfoOffset);

    if (fmFuncDynDataRequired(funcId, pEntry, &pBlockInfo->window) != FUNC_DATA_REQUIRED_NOT_LOAD) {
      break;
    }
  }

  // release buffer pages
  releaseBufPage(pScan->base.pdInfo.pAggSup->pResultBuf, pPage);

  if (idx >= pExprSup->numOfExprs) {  // no function asked for the data
    *status = FUNC_DATA_REQUIRED_NOT_LOAD;
  }

  return TSDB_CODE_SUCCESS;
}

H
Haojun Liao 已提交
215
// Evaluate the filter against the block level aggregates (SMA) through filterRangeExecute.
//
// Returns the result of filterRangeExecute, or true (keep the block) when either the filter or the aggregates are
// missing.
static bool doFilterByBlockSMA(SFilterInfo* pFilterInfo, SColumnDataAgg** pColsAgg, int32_t numOfCols,
                               int32_t numOfRows) {
  if (pFilterInfo != NULL && pColsAgg != NULL) {
    return filterRangeExecute(pFilterInfo, pColsAgg, numOfCols, numOfRows);
  }

  return true;
}

H
Haojun Liao 已提交
225
// Load the block level aggregates (SMA) of the current block and attach them to pBlock->pBlockAgg.
//
// Returns false when the reader reports that not every column has aggregates, true once the aggregates of all
// output columns are stored at their destination slots. Jumps out through T_LONG_JMP when the reader fails or when
// the pBlockAgg array cannot be allocated.
static bool doLoadBlockSMA(STableScanBase* pTableScanInfo, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo) {
  SColumnDataAgg** pSmaList = NULL;
  bool             complete = true;

  int32_t code = tsdbRetrieveDatablockSMA(pTableScanInfo->dataReader, &pSmaList, &complete);
  if (code != TSDB_CODE_SUCCESS) {
    T_LONG_JMP(pTaskInfo->env, code);
  }

  if (!complete) {
    return false;
  }

  int32_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);

  // todo create this buffer during creating operator
  if (pBlock->pBlockAgg == NULL) {
    pBlock->pBlockAgg = taosMemoryCalloc(numOfCols, POINTER_BYTES);
    if (pBlock->pBlockAgg == NULL) {
      T_LONG_JMP(pTaskInfo->env, TSDB_CODE_OUT_OF_MEMORY);
    }
  }

  // the i-th aggregate belongs to the i-th matched column; only output columns are kept
  SArray* pMatchList = pTableScanInfo->matchInfo.pList;
  size_t  numOfMatched = taosArrayGetSize(pMatchList);
  for (size_t i = 0; i < numOfMatched; ++i) {
    SColMatchItem* pItem = taosArrayGet(pMatchList, i);
    if (pItem->needOutput) {
      pBlock->pBlockAgg[pItem->dstSlotId] = pSmaList[i];
    }
  }

  return true;
}

H
Haojun Liao 已提交
262
// Fill the tag / pseudo columns of pBlock for `rows` rows when the scan has pseudo column expressions.
//
// A TSDB_CODE_PAR_TABLE_NOT_EXIST result is tolerated, since the table may have been dropped while the scan was
// running; any other failure aborts the task through T_LONG_JMP. terrno is reset to 0 afterwards.
static void doSetTagColumnData(STableScanBase* pTableScanInfo, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo,
                               int32_t rows) {
  SExprSupp* pPseudoSup = &pTableScanInfo->pseudoSup;
  if (pPseudoSup->numOfExprs <= 0) {
    return;
  }

  int32_t code = addTagPseudoColumnData(&pTableScanInfo->readHandle, pPseudoSup->pExprInfo, pPseudoSup->numOfExprs,
                                        pBlock, rows, GET_TASKID(pTaskInfo), &pTableScanInfo->metaCache);

  // ignore the table not exists error, since this table may have been dropped during the scan procedure.
  bool tolerated = (code == TSDB_CODE_SUCCESS) || (code == TSDB_CODE_PAR_TABLE_NOT_EXIST);
  if (!tolerated) {
    T_LONG_JMP(pTaskInfo->env, code);
  }

  // reset the error code.
  terrno = 0;
}

279 280
// Apply the OFFSET and LIMIT of pLimitInfo to pBlock.
//
// Offset: while rows remain to be skipped, either the whole block is dropped (rows set to 0) or its first
// remainOffset rows are trimmed. Limit: when the rows already produced plus the rows of this block reach the limit,
// the block is cut to the rows that still fit and the operator status becomes OP_EXEC_DONE.
//
// todo handle the slimit info
void applyLimitOffset(SLimitInfo* pLimitInfo, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo, SOperatorInfo* pOperator) {
  const char* id = GET_TASKID(pTaskInfo);
  SLimit*     pLimit = &pLimitInfo->limit;

  if (pLimit->offset > 0 && pLimitInfo->remainOffset > 0) {
    if (pLimitInfo->remainOffset < pBlock->info.rows) {
      // only the head of the block is skipped
      blockDataTrimFirstNRows(pBlock, pLimitInfo->remainOffset);
      pLimitInfo->remainOffset = 0;
    } else {
      pLimitInfo->remainOffset -= pBlock->info.rows;
      pBlock->info.rows = 0;
      qDebug("current block ignore due to offset, current:%" PRId64 ", %s", pLimitInfo->remainOffset, id);
    }
  }

  // -1 means that no limit was given
  if (pLimit->limit == -1) {
    return;
  }

  if (pLimit->limit > (pLimitInfo->numOfOutputRows + pBlock->info.rows)) {
    return;
  }

  // limit the output rows
  int32_t surplus = pLimitInfo->numOfOutputRows + pBlock->info.rows - pLimit->limit;
  int32_t numOfKept = pBlock->info.rows - surplus;

  blockDataKeepFirstNRows(pBlock, numOfKept);
  qDebug("output limit %" PRId64 " has reached, %s", pLimit->limit, id);
  pOperator->status = OP_EXEC_DONE;
}

H
Haojun Liao 已提交
306
// Decide how the block the reader is positioned on has to be handled and, when required, load and filter it.
//
// *status starts as the scan's dataBlockLoadFlag and is forced to FUNC_DATA_REQUIRED_DATA_LOAD when a filter exists
// or the block overlaps an interval window. On return it is
//   FUNC_DATA_REQUIRED_FILTEROUT  - the block was dropped (load flag, SMA based filter, or dynamic pruning),
//   FUNC_DATA_REQUIRED_NOT_LOAD   - the block is skipped; only the tag columns were filled for one row,
//   FUNC_DATA_REQUIRED_SMA_LOAD   - only the block aggregates were loaded (tag columns filled for one row),
//   FUNC_DATA_REQUIRED_DATA_LOAD  - the column data was loaded, filtered and limit/offset applied.
//
// Returns TSDB_CODE_SUCCESS, or terrno when the reader cannot deliver the column data. The read recorder of the scan
// is updated along the way.
static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanBase* pTableScanInfo, SSDataBlock* pBlock,
                             uint32_t* status) {
  SExecTaskInfo*          pTaskInfo = pOperator->pTaskInfo;
  SDataBlockInfo*         pBlockInfo = &pBlock->info;
  SFileBlockLoadRecorder* pRecorder = &pTableScanInfo->readRecorder;
  SFilterInfo*            pFilter = pOperator->exprSupp.pFilterInfo;

  pRecorder->totalBlocks += 1;
  pRecorder->totalRows += pBlockInfo->rows;

  *status = pTableScanInfo->dataBlockLoadFlag;
  if (pFilter != NULL ||
      overlapWithTimeWindow(&pTableScanInfo->pdInfo.interval, pBlockInfo, pTableScanInfo->cond.order)) {
    *status = FUNC_DATA_REQUIRED_DATA_LOAD;
  }

  taosMemoryFreeClear(pBlock->pBlockAgg);

  if (*status == FUNC_DATA_REQUIRED_FILTEROUT) {
    qDebug("%s data block filter out, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo),
           pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows);
    pRecorder->filterOutBlocks += 1;
    // NOTE(review): totalRows was already increased by the rows of this block above - confirm the second add is
    // intended.
    pRecorder->totalRows += pBlockInfo->rows;
    return TSDB_CODE_SUCCESS;
  }

  if (*status == FUNC_DATA_REQUIRED_NOT_LOAD) {
    qDebug("%s data block skipped, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo),
           pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows);
    doSetTagColumnData(pTableScanInfo, pBlock, pTaskInfo, 1);
    pRecorder->skipBlocks += 1;
    return TSDB_CODE_SUCCESS;
  }

  bool smaAttempted = false;  // mark the operation of load sma
  if (*status == FUNC_DATA_REQUIRED_SMA_LOAD) {
    pRecorder->loadBlockStatis += 1;
    smaAttempted = true;

    if (doLoadBlockSMA(pTableScanInfo, pBlock, pTaskInfo)) {
      qDebug("%s data block SMA loaded, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo),
             pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows);
      doSetTagColumnData(pTableScanInfo, pBlock, pTaskInfo, 1);
      return TSDB_CODE_SUCCESS;
    }

    // failed to load the block sma data, data block statistics does not exist, load data block instead
    qDebug("%s failed to load SMA, since not all columns have SMA", GET_TASKID(pTaskInfo));
    *status = FUNC_DATA_REQUIRED_DATA_LOAD;
  }

  ASSERT(*status == FUNC_DATA_REQUIRED_DATA_LOAD);

  // try to filter data block according to sma info
  if (pFilter != NULL && !smaAttempted && doLoadBlockSMA(pTableScanInfo, pBlock, pTaskInfo)) {
    size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
    if (!doFilterByBlockSMA(pFilter, pBlock->pBlockAgg, numOfCols, pBlockInfo->rows)) {
      qDebug("%s data block filter out by block SMA, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo),
             pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows);
      pRecorder->filterOutBlocks += 1;
      *status = FUNC_DATA_REQUIRED_FILTEROUT;

      return TSDB_CODE_SUCCESS;
    }
  }

  // free the sma info, since it should not be involved in later computing process.
  taosMemoryFreeClear(pBlock->pBlockAgg);

  // try to filter data block according to current results
  doDynamicPruneDataBlock(pOperator, pBlockInfo, status);
  if (*status == FUNC_DATA_REQUIRED_NOT_LOAD) {
    qDebug("%s data block skipped due to dynamic prune, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo),
           pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows);
    pRecorder->skipBlocks += 1;

    *status = FUNC_DATA_REQUIRED_FILTEROUT;
    return TSDB_CODE_SUCCESS;
  }

  pRecorder->totalCheckedRows += pBlockInfo->rows;
  pRecorder->loadBlocks += 1;

  SArray* pLoadedCols = tsdbRetrieveDataBlock(pTableScanInfo->dataReader, NULL);
  if (pLoadedCols == NULL) {
    return terrno;
  }

  relocateColumnData(pBlock, pTableScanInfo->matchInfo.pList, pLoadedCols, true);
  doSetTagColumnData(pTableScanInfo, pBlock, pTaskInfo, pBlockInfo->rows);

  // restore the previous value; the rows that survive filter and limit are added again below
  pRecorder->totalRows -= pBlockInfo->rows;

  if (pFilter != NULL) {
    int64_t begin = taosGetTimestampUs();
    doFilter(pBlock, pFilter, &pTableScanInfo->matchInfo);

    double elapsedMs = (taosGetTimestampUs() - begin) / 1000.0;
    pRecorder->filterTime += elapsedMs;

    if (pBlockInfo->rows == 0) {
      pRecorder->filterOutBlocks += 1;
      qDebug("%s data block filter out, brange:%" PRId64 "-%" PRId64 ", rows:%d, elapsed time:%.2f ms",
             GET_TASKID(pTaskInfo), pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows, elapsedMs);
    } else {
      qDebug("%s data block filter applied, elapsed time:%.2f ms", GET_TASKID(pTaskInfo), elapsedMs);
    }
  }

  applyLimitOffset(&pTableScanInfo->limitInfo, pBlock, pTaskInfo, pOperator);

  pRecorder->totalRows += pBlockInfo->rows;
  pTableScanInfo->limitInfo.numOfOutputRows = pRecorder->totalRows;
  return TSDB_CODE_SUCCESS;
}

H
Haojun Liao 已提交
421
// Switch the scan to descending order: flag it as a reverse scan, set the query condition order to
// TSDB_ORDER_DESC, swap the start and end key of the query time window and toggle the order of the first
// numOfOutput function contexts.
static void prepareForDescendingScan(STableScanBase* pTableScanInfo, SqlFunctionCtx* pCtx, int32_t numOfOutput) {
  SET_REVERSE_SCAN_FLAG(pTableScanInfo);

  pTableScanInfo->cond.order = TSDB_ORDER_DESC;

  STimeWindow* pQueryWin = &pTableScanInfo->cond.twindows;
  TSWAP(pQueryWin->skey, pQueryWin->ekey);

  switchCtxOrder(pCtx, numOfOutput);
}

430 431
typedef struct STableCachedVal {
  const char* pName;
432
  STag*       pTags;
433 434
} STableCachedVal;

435 436 437 438 439 440 441 442 443 444 445
// Release a STableCachedVal together with the name and the tag data it owns. A NULL argument is ignored.
static void freeTableCachedVal(void* param) {
  STableCachedVal* pCached = param;
  if (pCached != NULL) {
    taosMemoryFree((void*)pCached->pName);
    taosMemoryFree(pCached->pTags);
    taosMemoryFree(pCached);
  }
}

H
Haojun Liao 已提交
446 447 448 449 450 451 452
static STableCachedVal* createTableCacheVal(const SMetaReader* pMetaReader) {
  STableCachedVal* pVal = taosMemoryMalloc(sizeof(STableCachedVal));
  pVal->pName = strdup(pMetaReader->me.name);
  pVal->pTags = NULL;

  // only child table has tag value
  if (pMetaReader->me.type == TSDB_CHILD_TABLE) {
453
    STag* pTag = (STag*)pMetaReader->me.ctbEntry.pTags;
H
Haojun Liao 已提交
454 455 456 457 458 459 460
    pVal->pTags = taosMemoryMalloc(pTag->len);
    memcpy(pVal->pTags, pTag, pTag->len);
  }

  return pVal;
}

461 462
// Deleter handed to the table meta LRU cache: the key and its length are not needed, only the cached value is
// released.
static void freeCachedMetaItem(const void* key, size_t keyLen, void* value) {
  (void)key;
  (void)keyLen;
  freeTableCachedVal(value);
}
463

464 465
int32_t addTagPseudoColumnData(SReadHandle* pHandle, const SExprInfo* pExpr, int32_t numOfExpr, SSDataBlock* pBlock,
                               int32_t rows, const char* idStr, STableMetaCacheInfo* pCache) {
466
  // currently only the tbname pseudo column
467
  if (numOfExpr <= 0) {
H
Haojun Liao 已提交
468
    return TSDB_CODE_SUCCESS;
469 470
  }

471 472
  int32_t code = 0;

473 474 475 476
  // backup the rows
  int32_t backupRows = pBlock->info.rows;
  pBlock->info.rows = rows;

477
  bool            freeReader = false;
478
  STableCachedVal val = {0};
479 480

  SMetaReader mr = {0};
481
  LRUHandle*  h = NULL;
482

483
  // 1. check if it is existed in meta cache
484
  if (pCache == NULL) {
485
    metaReaderInit(&mr, pHandle->meta, 0);
H
Haojun Liao 已提交
486
    code = metaGetTableEntryByUidCache(&mr, pBlock->info.id.uid);
487
    if (code != TSDB_CODE_SUCCESS) {
H
Haojun Liao 已提交
488
      if (terrno == TSDB_CODE_PAR_TABLE_NOT_EXIST) {
H
Haojun Liao 已提交
489
        qWarn("failed to get table meta, table may have been dropped, uid:0x%" PRIx64 ", code:%s, %s", pBlock->info.id.uid,
490
              tstrerror(terrno), idStr);
H
Haojun Liao 已提交
491
      } else {
H
Haojun Liao 已提交
492
        qError("failed to get table meta, uid:0x%" PRIx64 ", code:%s, %s", pBlock->info.id.uid, tstrerror(terrno), idStr);
H
Haojun Liao 已提交
493
      }
494 495 496 497 498
      metaReaderClear(&mr);
      return terrno;
    }

    metaReaderReleaseLock(&mr);
499

500 501
    val.pName = mr.me.name;
    val.pTags = (STag*)mr.me.ctbEntry.pTags;
502 503

    freeReader = true;
504
  } else {
505 506
    pCache->metaFetch += 1;

H
Haojun Liao 已提交
507
    h = taosLRUCacheLookup(pCache->pTableMetaEntryCache, &pBlock->info.id.uid, sizeof(pBlock->info.id.uid));
508 509
    if (h == NULL) {
      metaReaderInit(&mr, pHandle->meta, 0);
H
Haojun Liao 已提交
510
      code = metaGetTableEntryByUidCache(&mr, pBlock->info.id.uid);
511
      if (code != TSDB_CODE_SUCCESS) {
H
Haojun Liao 已提交
512
        if (terrno == TSDB_CODE_PAR_TABLE_NOT_EXIST) {
513
          qWarn("failed to get table meta, table may have been dropped, uid:0x%" PRIx64 ", code:%s, %s",
H
Haojun Liao 已提交
514
                pBlock->info.id.uid, tstrerror(terrno), idStr);
H
Haojun Liao 已提交
515
        } else {
H
Haojun Liao 已提交
516
          qError("failed to get table meta, uid:0x%" PRIx64 ", code:%s, %s", pBlock->info.id.uid, tstrerror(terrno),
517
                 idStr);
H
Haojun Liao 已提交
518
        }
519 520 521 522 523 524
        metaReaderClear(&mr);
        return terrno;
      }

      metaReaderReleaseLock(&mr);

H
Haojun Liao 已提交
525
      STableCachedVal* pVal = createTableCacheVal(&mr);
526

H
Haojun Liao 已提交
527
      val = *pVal;
528
      freeReader = true;
H
Haojun Liao 已提交
529

H
Haojun Liao 已提交
530
      int32_t ret = taosLRUCacheInsert(pCache->pTableMetaEntryCache, &pBlock->info.id.uid, sizeof(uint64_t), pVal,
531
                                       sizeof(STableCachedVal), freeCachedMetaItem, NULL, TAOS_LRU_PRIORITY_LOW);
532 533 534 535 536 537 538 539
      if (ret != TAOS_LRU_STATUS_OK) {
        qError("failed to put meta into lru cache, code:%d, %s", ret, idStr);
        freeTableCachedVal(pVal);
      }
    } else {
      pCache->cacheHit += 1;
      STableCachedVal* pVal = taosLRUCacheValue(pCache->pTableMetaEntryCache, h);
      val = *pVal;
H
Haojun Liao 已提交
540

H
Haojun Liao 已提交
541
      taosLRUCacheRelease(pCache->pTableMetaEntryCache, h, false);
542
    }
H
Haojun Liao 已提交
543

544 545
    qDebug("retrieve table meta from cache:%" PRIu64 ", hit:%" PRIu64 " miss:%" PRIu64 ", %s", pCache->metaFetch,
           pCache->cacheHit, (pCache->metaFetch - pCache->cacheHit), idStr);
H
Haojun Liao 已提交
546
  }
547

548 549
  for (int32_t j = 0; j < numOfExpr; ++j) {
    const SExprInfo* pExpr1 = &pExpr[j];
550
    int32_t          dstSlotId = pExpr1->base.resSchema.slotId;
551 552

    SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, dstSlotId);
D
dapan1121 已提交
553
    colInfoDataCleanup(pColInfoData, pBlock->info.rows);
554

555
    int32_t functionId = pExpr1->pExpr->_function.functionId;
556 557 558

    // this is to handle the tbname
    if (fmIsScanPseudoColumnFunc(functionId)) {
559
      setTbNameColData(pBlock, pColInfoData, functionId, val.pName);
560
    } else {  // these are tags
wmmhello's avatar
wmmhello 已提交
561
      STagVal tagVal = {0};
562 563
      tagVal.cid = pExpr1->base.pParam[0].pCol->colId;
      const char* p = metaGetTableTagVal(val.pTags, pColInfoData->info.type, &tagVal);
wmmhello's avatar
wmmhello 已提交
564

565 566 567 568
      char* data = NULL;
      if (pColInfoData->info.type != TSDB_DATA_TYPE_JSON && p != NULL) {
        data = tTagValToData((const STagVal*)p, false);
      } else {
wmmhello's avatar
wmmhello 已提交
569
        data = (char*)p;
wmmhello's avatar
wmmhello 已提交
570
      }
571

H
Haojun Liao 已提交
572 573 574
      bool isNullVal = (data == NULL) || (pColInfoData->info.type == TSDB_DATA_TYPE_JSON && tTagIsJsonNull(data));
      if (isNullVal) {
        colDataAppendNNULL(pColInfoData, 0, pBlock->info.rows);
H
Haojun Liao 已提交
575
      } else if (pColInfoData->info.type != TSDB_DATA_TYPE_JSON) {
H
Haojun Liao 已提交
576
        colDataAppendNItems(pColInfoData, 0, data, pBlock->info.rows);
H
Haojun Liao 已提交
577 578 579
        if (IS_VAR_DATA_TYPE(((const STagVal*)p)->type)) {
          taosMemoryFree(data);
        }
L
Liu Jicong 已提交
580
      } else {  // todo opt for json tag
H
Haojun Liao 已提交
581
        for (int32_t i = 0; i < pBlock->info.rows; ++i) {
H
Haojun Liao 已提交
582
          colDataAppend(pColInfoData, i, data, false);
H
Haojun Liao 已提交
583
        }
584 585 586 587
      }
    }
  }

588 589
  // restore the rows
  pBlock->info.rows = backupRows;
590 591 592 593
  if (freeReader) {
    metaReaderClear(&mr);
  }

H
Haojun Liao 已提交
594
  return TSDB_CODE_SUCCESS;
595 596
}

H
Haojun Liao 已提交
597
// Fill pColInfoData with the table name by running the scalar function registered for functionId.
//
// The name is wrapped into a one row VARCHAR column that serves as the function input; the number of rows to produce
// is taken from pBlock->info.rows. When no process callback is registered for functionId an error is logged and the
// output column is left untouched.
void setTbNameColData(const SSDataBlock* pBlock, SColumnInfoData* pColInfoData, int32_t functionId, const char* name) {
  struct SScalarFuncExecFuncs fpSet = {0};
  fmGetScalarFuncExecFuncs(functionId, &fpSet);

  // the name in var-string layout
  char varName[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
  STR_TO_VARSTR(varName, name);

  // temporary input column holding that single value
  SColumnInfoData nameCol = createColumnInfoData(TSDB_DATA_TYPE_VARCHAR, sizeof(varName), 1);
  colInfoDataEnsureCapacity(&nameCol, 1, false);
  colDataAppend(&nameCol, 0, varName, false);

  SScalarParam input = {.numOfRows = pBlock->info.rows, .columnData = &nameCol};
  SScalarParam output = {.columnData = pColInfoData};

  if (fpSet.process == NULL) {
    qError("failed to get the corresponding callback function, functionId:%d", functionId);
  } else {
    fpSet.process(&input, 1, &output);
  }

  colDataDestroy(&nameCol);
}

622
// Fetch the next non-empty result block from the reader.
//
// Blocks are taken from the reader one by one; a block that loadDataBlock filters out, or that ends up without rows,
// is skipped. For the block that is returned, the operator's row counter and cost and the stream's last status
// (uid and end key of the block) are updated. Returns NULL when the reader has no more blocks. The task is aborted
// through T_LONG_JMP when it has been killed or when loading a block fails.
static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) {
  STableScanInfo* pScan = pOperator->info;
  STableScanBase* pBase = &pScan->base;
  SExecTaskInfo*  pTaskInfo = pOperator->pTaskInfo;
  SSDataBlock*    pBlock = pScan->pResBlock;

  int64_t st = taosGetTimestampUs();

  while (tsdbNextDataBlock(pBase->dataReader)) {
    if (isTaskKilled(pTaskInfo)) {
      T_LONG_JMP(pTaskInfo->env, TSDB_CODE_TSC_QUERY_CANCELLED);
    }

    // process this data block based on the probabilities
    if (!processBlockWithProbability(&pScan->sample)) {
      continue;
    }

    blockDataCleanup(pBlock);
    SDataBlockInfo* pBInfo = &pBlock->info;

    int32_t rows = 0;
    tsdbRetrieveDataBlockInfo(pBase->dataReader, &rows, &pBInfo->id.uid, &pBInfo->window);

    blockDataEnsureCapacity(pBlock, rows);  // todo remove it latter
    pBInfo->rows = rows;

    ASSERT(pBInfo->id.uid != 0);
    pBInfo->id.groupId = getTableGroupId(pTaskInfo->pTableInfoList, pBInfo->id.uid);

    uint32_t status = 0;
    int32_t  code = loadDataBlock(pOperator, pBase, pBlock, &status);
    if (code != TSDB_CODE_SUCCESS) {
      T_LONG_JMP(pOperator->pTaskInfo->env, code);
    }

    // current block is filter out according to filter condition, continue load the next block
    bool dropped = (status == FUNC_DATA_REQUIRED_FILTEROUT) || (pBInfo->rows == 0);
    if (dropped) {
      continue;
    }

    pOperator->resultInfo.totalRows = pBase->readRecorder.totalRows;
    pBase->readRecorder.elapsedTime += (taosGetTimestampUs() - st) / 1000.0;

    pOperator->cost.totalCost = pBase->readRecorder.elapsedTime;

    // todo refactor
    pTaskInfo->streamInfo.lastStatus.type = TMQ_OFFSET__SNAPSHOT_DATA;
    pTaskInfo->streamInfo.lastStatus.uid = pBInfo->id.uid;
    pTaskInfo->streamInfo.lastStatus.ts = pBInfo->window.ekey;

    ASSERT(pBInfo->id.uid != 0);
    return pBlock;
  }

  return NULL;
}

H
Haojun Liao 已提交
682
// Produce the next block of the current table list, running scanInfo.numOfAsc ascending passes followed by
// scanInfo.numOfDesc descending passes.
//
// Whenever a pass is exhausted and another one of the same direction follows, the reader is reset and the scan is
// flagged REPEAT_SCAN. Before the first descending pass the scan is switched to descending order. Returns NULL when
// the reader does not exist, the operator is already done, or all passes are finished.
static SSDataBlock* doGroupedTableScan(SOperatorInfo* pOperator) {
  STableScanInfo* pScan = pOperator->info;
  STableScanBase* pBase = &pScan->base;
  SExecTaskInfo*  pTaskInfo = pOperator->pTaskInfo;

  // The read handle is not initialized yet, since no qualified tables exists
  if (pOperator->status == OP_EXEC_DONE || pBase->dataReader == NULL) {
    return NULL;
  }

  // do the ascending order traverse in the first place.
  while (pScan->scanTimes < pScan->scanInfo.numOfAsc) {
    SSDataBlock* pRes = doTableScanImpl(pOperator);
    if (pRes != NULL) {
      return pRes;
    }

    pScan->scanTimes += 1;
    if (pScan->scanTimes >= pScan->scanInfo.numOfAsc) {
      break;
    }

    setTaskStatus(pTaskInfo, TASK_NOT_COMPLETED);
    pBase->scanFlag = REPEAT_SCAN;
    qDebug("start to repeat ascending order scan data blocks due to query func required, %s", GET_TASKID(pTaskInfo));

    // do prepare for the next round table scan operation
    tsdbReaderReset(pBase->dataReader, &pBase->cond);
  }

  int32_t total = pScan->scanInfo.numOfAsc + pScan->scanInfo.numOfDesc;
  if (pScan->scanTimes >= total) {
    return NULL;
  }

  // first descending pass: turn the scan around
  if (pBase->cond.order == TSDB_ORDER_ASC) {
    prepareForDescendingScan(pBase, pOperator->exprSupp.pCtx, 0);
    tsdbReaderReset(pBase->dataReader, &pBase->cond);
    qDebug("%s start to descending order scan data blocks due to query func required", GET_TASKID(pTaskInfo));
  }

  while (pScan->scanTimes < total) {
    SSDataBlock* pRes = doTableScanImpl(pOperator);
    if (pRes != NULL) {
      return pRes;
    }

    pScan->scanTimes += 1;
    if (pScan->scanTimes >= total) {
      break;
    }

    setTaskStatus(pTaskInfo, TASK_NOT_COMPLETED);
    pBase->scanFlag = REPEAT_SCAN;

    qDebug("%s start to repeat descending order scan data blocks", GET_TASKID(pTaskInfo));
    tsdbReaderReset(pBase->dataReader, &pBase->cond);
  }

  return NULL;
}

/*
 * Return the next result block of the table scan operator, or NULL when nothing more is available.
 *
 * Two modes are supported:
 *  - TABLE_SCAN__TABLE_ORDER: tables are scanned one at a time; when the current table yields no
 *    block the reader is switched to the next table until all tables have been visited.
 *  - otherwise: tables are scanned group by group; when the current group yields no block the
 *    reader is switched to the next output group.
 *
 * In group mode every remaining group is tried before the operator is marked completed, so a group
 * that produces no rows does not hide the groups that follow it.
 */
static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
  STableScanInfo* pInfo = pOperator->info;
  SExecTaskInfo*  pTaskInfo = pOperator->pTaskInfo;

  // scan table one by one sequentially
  if (pInfo->scanMode == TABLE_SCAN__TABLE_ORDER) {
    int32_t numOfTables = tableListGetSize(pTaskInfo->pTableInfoList);

    while (1) {
      SSDataBlock* result = doGroupedTableScan(pOperator);
      if (result) {
        return result;
      }

      // if no data, switch to next table and continue scan
      pInfo->currentTable++;
      if (pInfo->currentTable >= numOfTables) {
        return NULL;
      }

      STableKeyInfo* pTableInfo = tableListGetInfo(pTaskInfo->pTableInfoList, pInfo->currentTable);
      tsdbSetTableList(pInfo->base.dataReader, pTableInfo, 1);
      qDebug("set uid:%" PRIu64 " into scanner, total tables:%d, index:%d %s", pTableInfo->uid, numOfTables,
             pInfo->currentTable, pTaskInfo->id.str);

      tsdbReaderReset(pInfo->base.dataReader, &pInfo->base.cond);
      pInfo->scanTimes = 0;
    }
  }

  // scan table group by group sequentially
  if (pInfo->currentGroupId == -1) {
    // first invocation: move to the first group and open the reader on its table list
    if ((++pInfo->currentGroupId) >= tableListGetOutputGroups(pTaskInfo->pTableInfoList)) {
      setOperatorCompleted(pOperator);
      return NULL;
    }

    int32_t        num = 0;
    STableKeyInfo* pList = NULL;
    tableListGetGroupList(pTaskInfo->pTableInfoList, pInfo->currentGroupId, &pList, &num);
    ASSERT(pInfo->base.dataReader == NULL);

    int32_t code = tsdbReaderOpen(pInfo->base.readHandle.vnode, &pInfo->base.cond, pList, num,
                                  (STsdbReader**)&pInfo->base.dataReader, GET_TASKID(pTaskInfo));
    if (code != TSDB_CODE_SUCCESS) {
      T_LONG_JMP(pTaskInfo->env, code);
    }
  }

  SSDataBlock* result = doGroupedTableScan(pOperator);
  if (result != NULL) {
    ASSERT(result->info.id.uid != 0);
    return result;
  }

  // the current group is exhausted: advance until a group produces data or no group is left
  while (1) {
    if ((++pInfo->currentGroupId) >= tableListGetOutputGroups(pTaskInfo->pTableInfoList)) {
      setOperatorCompleted(pOperator);
      return NULL;
    }

    // reset value for the next group data output
    pOperator->status = OP_OPENED;
    pInfo->base.limitInfo.numOfOutputRows = 0;
    pInfo->base.limitInfo.remainOffset = pInfo->base.limitInfo.limit.offset;

    int32_t        num = 0;
    STableKeyInfo* pList = NULL;
    tableListGetGroupList(pTaskInfo->pTableInfoList, pInfo->currentGroupId, &pList, &num);

    tsdbSetTableList(pInfo->base.dataReader, pList, num);
    tsdbReaderReset(pInfo->base.dataReader, &pInfo->base.cond);
    pInfo->scanTimes = 0;

    result = doGroupedTableScan(pOperator);
    if (result != NULL) {
      return result;
    }
  }
}

820 821
static int32_t getTableScannerExecInfo(struct SOperatorInfo* pOptr, void** pOptrExplain, uint32_t* len) {
  SFileBlockLoadRecorder* pRecorder = taosMemoryCalloc(1, sizeof(SFileBlockLoadRecorder));
822
  STableScanInfo*         pTableScanInfo = pOptr->info;
H
Haojun Liao 已提交
823
  *pRecorder = pTableScanInfo->base.readRecorder;
824 825 826 827 828
  *pOptrExplain = pRecorder;
  *len = sizeof(SFileBlockLoadRecorder);
  return 0;
}

829
static void destroyTableScanOperatorInfo(void* param) {
830
  STableScanInfo* pTableScanInfo = (STableScanInfo*)param;
H
Haojun Liao 已提交
831
  blockDataDestroy(pTableScanInfo->pResBlock);
H
Haojun Liao 已提交
832
  cleanupQueryTableDataCond(&pTableScanInfo->base.cond);
H
Haojun Liao 已提交
833

H
Haojun Liao 已提交
834 835
  tsdbReaderClose(pTableScanInfo->base.dataReader);
  pTableScanInfo->base.dataReader = NULL;
836

H
Haojun Liao 已提交
837 838
  if (pTableScanInfo->base.matchInfo.pList != NULL) {
    taosArrayDestroy(pTableScanInfo->base.matchInfo.pList);
839
  }
L
Liu Jicong 已提交
840

H
Haojun Liao 已提交
841 842
  taosLRUCacheCleanup(pTableScanInfo->base.metaCache.pTableMetaEntryCache);
  cleanupExprSupp(&pTableScanInfo->base.pseudoSup);
D
dapan1121 已提交
843
  taosMemoryFreeClear(param);
844 845
}

846
SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, SReadHandle* readHandle,
847
                                           SExecTaskInfo* pTaskInfo) {
H
Haojun Liao 已提交
848 849 850
  STableScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STableScanInfo));
  SOperatorInfo*  pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
  if (pInfo == NULL || pOperator == NULL) {
851
    goto _error;
H
Haojun Liao 已提交
852 853
  }

854
  SScanPhysiNode*     pScanNode = &pTableScanNode->scan;
H
Haojun Liao 已提交
855
  SDataBlockDescNode* pDescNode = pScanNode->node.pOutputDataBlockDesc;
856 857

  int32_t numOfCols = 0;
858
  int32_t code =
H
Haojun Liao 已提交
859
      extractColMatchInfo(pScanNode->pScanCols, pDescNode, &numOfCols, COL_MATCH_FROM_COL_ID, &pInfo->base.matchInfo);
860 861 862 863
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

H
Haojun Liao 已提交
864
  initLimitInfo(pScanNode->node.pLimit, pScanNode->node.pSlimit, &pInfo->base.limitInfo);
H
Haojun Liao 已提交
865
  code = initQueryTableDataCond(&pInfo->base.cond, pTableScanNode);
866
  if (code != TSDB_CODE_SUCCESS) {
867
    goto _error;
868 869
  }

H
Haojun Liao 已提交
870
  if (pScanNode->pScanPseudoCols != NULL) {
H
Haojun Liao 已提交
871
    SExprSupp* pSup = &pInfo->base.pseudoSup;
H
Haojun Liao 已提交
872
    pSup->pExprInfo = createExprInfo(pScanNode->pScanPseudoCols, NULL, &pSup->numOfExprs);
873
    pSup->pCtx = createSqlFunctionCtx(pSup->pExprInfo, pSup->numOfExprs, &pSup->rowEntryInfoOffset);
874 875
  }

876
  pInfo->scanInfo = (SScanInfo){.numOfAsc = pTableScanNode->scanSeq[0], .numOfDesc = pTableScanNode->scanSeq[1]};
H
Haojun Liao 已提交
877 878

  pInfo->base.scanFlag = MAIN_SCAN;
H
Haojun Liao 已提交
879 880
  pInfo->base.pdInfo.interval = extractIntervalInfo(pTableScanNode);
  pInfo->base.readHandle = *readHandle;
881 882
  pInfo->sample.sampleRatio = pTableScanNode->ratio;
  pInfo->sample.seed = taosGetTimestampSec();
883

H
Haojun Liao 已提交
884
  pInfo->base.dataBlockLoadFlag = pTableScanNode->dataRequired;
H
Haojun Liao 已提交
885 886

  initResultSizeInfo(&pOperator->resultInfo, 4096);
H
Haojun Liao 已提交
887
  pInfo->pResBlock = createDataBlockFromDescNode(pDescNode);
H
Haojun Liao 已提交
888
  blockDataEnsureCapacity(pInfo->pResBlock, pOperator->resultInfo.capacity);
H
Haojun Liao 已提交
889

H
Haojun Liao 已提交
890 891 892
  code = filterInitFromNode((SNode*)pTableScanNode->scan.node.pConditions, &pOperator->exprSupp.pFilterInfo, 0);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
H
Haojun Liao 已提交
893 894
  }

wmmhello's avatar
wmmhello 已提交
895
  pInfo->currentGroupId = -1;
896
  pInfo->assignBlockUid = pTableScanNode->assignBlockUid;
897
  pInfo->hasGroupByTag = pTableScanNode->pGroupTags ? true : false;
898

L
Liu Jicong 已提交
899 900
  setOperatorInfo(pOperator, "TableScanOperator", QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN, false, OP_NOT_OPENED, pInfo,
                  pTaskInfo);
901
  pOperator->exprSupp.numOfExprs = numOfCols;
902

H
Haojun Liao 已提交
903 904
  pInfo->base.metaCache.pTableMetaEntryCache = taosLRUCacheInit(1024 * 128, -1, .5);
  if (pInfo->base.metaCache.pTableMetaEntryCache == NULL) {
905 906 907
    code = terrno;
    goto _error;
  }
908

H
Haojun Liao 已提交
909
  taosLRUCacheSetStrictCapacity(pInfo->base.metaCache.pTableMetaEntryCache, false);
H
Haojun Liao 已提交
910
  pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doTableScan, NULL, destroyTableScanOperatorInfo,
911
                                         getTableScannerExecInfo);
912 913 914

  // for non-blocking operator, the open cost is always 0
  pOperator->cost.openCost = 0;
H
Haojun Liao 已提交
915
  return pOperator;
916

917
_error:
918 919 920
  if (pInfo != NULL) {
    destroyTableScanOperatorInfo(pInfo);
  }
921

922 923
  taosMemoryFreeClear(pOperator);
  pTaskInfo->code = code;
924
  return NULL;
H
Haojun Liao 已提交
925 926
}

927
/*
 * Build a sequential table scan operator around an already opened reader.
 *
 * @param pReadHandle  reader stored as the operator's data reader
 * @param pTaskInfo    owning task; on allocation failure its `code` field is set
 * @return the new operator, or NULL if memory could not be allocated
 */
SOperatorInfo* createTableSeqScanOperatorInfo(void* pReadHandle, SExecTaskInfo* pTaskInfo) {
  STableScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STableScanInfo));
  SOperatorInfo*  pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
  if (pInfo == NULL || pOperator == NULL) {
    // release whichever allocation succeeded instead of dereferencing a NULL pointer
    taosMemoryFreeClear(pInfo);
    taosMemoryFreeClear(pOperator);
    pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  pInfo->base.dataReader = pReadHandle;

  setOperatorInfo(pOperator, "TableSeqScanOperator", QUERY_NODE_PHYSICAL_PLAN_TABLE_SEQ_SCAN, false, OP_NOT_OPENED,
                  pInfo, pTaskInfo);
  pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doTableScanImpl, NULL, NULL, NULL);
  return pOperator;
}

940
/* Drop all buffered blocks of the stream scan and rewind the valid-block cursor to 0. */
static FORCE_INLINE void doClearBufferedBlocks(SStreamScanInfo* pInfo) {
  pInfo->validBlockIndex = 0;
  taosArrayClear(pInfo->pBlockLists);
}

945
/* True when the parent operator type recorded in windowSup is the stream session window. */
static bool isSessionWindow(SStreamScanInfo* pInfo) {
  if (pInfo->windowSup.parentType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION) {
    return true;
  }
  return false;
}

949
/* True when the parent operator type recorded in windowSup is the stream state window. */
static bool isStateWindow(SStreamScanInfo* pInfo) {
  if (pInfo->windowSup.parentType == QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE) {
    return true;
  }
  return false;
}
5
54liuyao 已提交
952

L
Liu Jicong 已提交
953
/* True when the parent operator is one of the stream interval variants (plain, semi or final). */
static bool isIntervalWindow(SStreamScanInfo* pInfo) {
  switch (pInfo->windowSup.parentType) {
    case QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL:
    case QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL:
    case QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL:
      return true;
    default:
      return false;
  }
}

/* True only for the plain stream interval parent (not the semi or final variants). */
static bool isSignleIntervalWindow(SStreamScanInfo* pInfo) {
  if (pInfo->windowSup.parentType == QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL) {
    return true;
  }
  return false;
}

963 964 965 966
/* True for an interval window whose sliding step differs from its interval length. */
static bool isSlidingWindow(SStreamScanInfo* pInfo) {
  if (!isIntervalWindow(pInfo)) {
    return false;
  }
  return pInfo->interval.sliding != pInfo->interval.interval;
}

967
/*
 * Store into pInfo->groupId the value found at row `rowIndex` of column `groupColIndex` of pBlock.
 * The column data is read as an array of uint64_t.
 */
static void setGroupId(SStreamScanInfo* pInfo, SSDataBlock* pBlock, int32_t groupColIndex, int32_t rowIndex) {
  SColumnInfoData* pGroupCol = taosArrayGet(pBlock->pDataBlock, groupColIndex);
  const uint64_t*  pGroupIds = (const uint64_t*)pGroupCol->pData;

  ASSERT(rowIndex < pBlock->info.rows);
  pInfo->groupId = pGroupIds[rowIndex];
}

L
Liu Jicong 已提交
974
/*
 * Prepare a table scan for a fresh pass over the time window *pWin: the current reader is closed,
 * the query window is replaced and the scan counters are put back to their initial values.
 */
void resetTableScanInfo(STableScanInfo* pTableScanInfo, STimeWindow* pWin) {
  // new time range for the next scan
  pTableScanInfo->base.cond.twindows = *pWin;

  // close the reader and clear the handle
  tsdbReaderClose(pTableScanInfo->base.dataReader);
  pTableScanInfo->base.dataReader = NULL;

  // restart from the first scan round and before the first group
  pTableScanInfo->currentGroupId = -1;
  pTableScanInfo->scanTimes = 0;
}

L
Liu Jicong 已提交
982 983
/*
 * Read data of one table as it was up to version `maxVersion`, restricted to [startTs, endTs].
 *
 * A temporary reader is opened with a copy of the scan operator's query condition, at most one data
 * block is loaded into the operator's result block, and the reader is closed again.
 *
 * @return the operator's result block when it holds at least one row, otherwise NULL
 */
static SSDataBlock* readPreVersionData(SOperatorInfo* pTableScanOp, uint64_t tbUid, TSKEY startTs, TSKEY endTs,
                                       int64_t maxVersion) {
  STableScanInfo* pScanInfo = pTableScanOp->info;
  SExecTaskInfo*  pTaskInfo = pTableScanOp->pTaskInfo;
  STableKeyInfo   tableKey = {.uid = tbUid, .groupId = 0};

  // private copy of the scan condition, narrowed to the requested version and time range
  SQueryTableDataCond cond = pScanInfo->base.cond;
  cond.startVersion = -1;
  cond.endVersion = maxVersion;
  cond.twindows = (STimeWindow){.skey = startTs, .ekey = endTs};

  SSDataBlock* pRes = pScanInfo->pResBlock;
  blockDataCleanup(pRes);

  STsdbReader* pReader = NULL;
  int32_t      code =
      tsdbReaderOpen(pScanInfo->base.readHandle.vnode, &cond, &tableKey, 1, &pReader, GET_TASKID(pTaskInfo));
  if (code != TSDB_CODE_SUCCESS) {
    terrno = code;
    T_LONG_JMP(pTaskInfo->env, code);
    return NULL;
  }

  if (tsdbNextDataBlock(pReader)) {
    SDataBlockInfo* pResInfo = &pRes->info;
    int32_t         numOfRows = 0;
    tsdbRetrieveDataBlockInfo(pReader, &numOfRows, &pResInfo->id.uid, &pResInfo->window);

    SArray* pLoadedCols = tsdbRetrieveDataBlock(pReader, NULL);
    blockDataEnsureCapacity(pRes, numOfRows);
    pRes->info.rows = numOfRows;

    // move the loaded columns into the result block, then fill the tag columns
    relocateColumnData(pRes, pScanInfo->base.matchInfo.pList, pLoadedCols, true);
    doSetTagColumnData(&pScanInfo->base, pRes, pTaskInfo, numOfRows);

    pRes->info.id.groupId = getTableGroupId(pTaskInfo->pTableInfoList, pResInfo->id.uid);
  }

  tsdbReaderClose(pReader);
  qDebug("retrieve prev rows:%d, skey:%" PRId64 ", ekey:%" PRId64 " uid:%" PRIu64 ", max ver:%" PRId64
         ", suid:%" PRIu64,
         pRes->info.rows, startTs, endTs, tbUid, maxVersion, cond.suid);

  if (pRes->info.rows > 0) {
    return pRes;
  }
  return NULL;
}

/*
 * Compute the group id of the row of table `uid` at timestamp `ts`, as stored up to `maxVersion`.
 * Returns 0 when no such row could be read.
 */
static uint64_t getGroupIdByCol(SStreamScanInfo* pInfo, uint64_t uid, TSKEY ts, int64_t maxVersion) {
  SSDataBlock* pPrevRow = readPreVersionData(pInfo->pTableScanOp, uid, ts, ts, maxVersion);
  if (pPrevRow == NULL) {
    return 0;
  }
  if (pPrevRow->info.rows == 0) {
    return 0;
  }

  // a single timestamp of a single table is expected to match exactly one row
  ASSERT(pPrevRow->info.rows == 1);
  return calGroupIdByData(&pInfo->partitionSup, pInfo->pPartScalarSup, pPrevRow, 0);
}

5
54liuyao 已提交
1041
/* Look up the group id of table `uid` in the table list of the scan operator's task. */
static uint64_t getGroupIdByUid(SStreamScanInfo* pInfo, uint64_t uid) {
  SExecTaskInfo* pScanTask = pInfo->pTableScanOp->pTaskInfo;
  return getTableGroupId(pScanTask->pTableInfoList, uid);
}

5
54liuyao 已提交
1045 1046 1047 1048 1049 1050 1051 1052
/*
 * Resolve the group id for a row: computed from the row's data when partitionSup.needCalc is set,
 * otherwise looked up by table uid.
 */
static uint64_t getGroupIdByData(SStreamScanInfo* pInfo, uint64_t uid, TSKEY ts, int64_t maxVersion) {
  return pInfo->partitionSup.needCalc ? getGroupIdByCol(pInfo, uid, ts, maxVersion) : getGroupIdByUid(pInfo, uid);
}

L
Liu Jicong 已提交
1053
/*
 * Set up the table scan for the next time range described by pBlock, starting at row *pRowIndex.
 *
 * The window of the starting row is extended with the following rows of the same group that share
 * its start key (their end keys widen the window) or whose end key equals its start key (their
 * start keys widen it). *pRowIndex is left on the first row that was not merged.
 *
 * @return false when the block is empty or fully consumed, true when a range was prepared
 */
static bool prepareRangeScan(SStreamScanInfo* pInfo, SSDataBlock* pBlock, int32_t* pRowIndex) {
  if (pBlock->info.rows == 0 || (*pRowIndex) == pBlock->info.rows) {
    return false;
  }

  ASSERT(taosArrayGetSize(pBlock->pDataBlock) >= 3);
  SColumnInfoData* pStartTsCol = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX);
  SColumnInfoData* pEndTsCol = taosArrayGet(pBlock->pDataBlock, END_TS_COLUMN_INDEX);
  SColumnInfoData* pGpCol = taosArrayGet(pBlock->pDataBlock, GROUPID_COLUMN_INDEX);
  SColumnInfoData* pCalStartTsCol = taosArrayGet(pBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX);
  SColumnInfoData* pCalEndTsCol = taosArrayGet(pBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX);

  TSKEY*    startData = (TSKEY*)pStartTsCol->pData;
  TSKEY*    endData = (TSKEY*)pEndTsCol->pData;
  uint64_t* gpData = (uint64_t*)pGpCol->pData;
  TSKEY*    calStartData = (TSKEY*)pCalStartTsCol->pData;
  TSKEY*    calEndData = (TSKEY*)pCalEndTsCol->pData;

  // the first unconsumed row seeds the window and the group
  const int32_t first = *pRowIndex;
  STimeWindow   win = {.skey = startData[first], .ekey = endData[first]};
  uint64_t      groupId = gpData[first];

  setGroupId(pInfo, pBlock, GROUPID_COLUMN_INDEX, first);
  if (isSlidingWindow(pInfo)) {
    pInfo->updateWin.skey = calStartData[first];
    pInfo->updateWin.ekey = calEndData[first];
  }

  // merge the following rows of the same group that touch the window
  *pRowIndex = first + 1;
  while (*pRowIndex < pBlock->info.rows) {
    const int32_t cur = *pRowIndex;
    const bool    sameGroup = (gpData[cur] == groupId);

    if (sameGroup && startData[cur] == win.skey) {
      win.ekey = TMAX(win.ekey, endData[cur]);
    } else if (sameGroup && endData[cur] == win.skey) {
      win.skey = TMIN(win.skey, startData[cur]);
    } else {
      ASSERT(!(win.skey > startData[cur] && win.ekey < endData[cur]) ||
             !(isInTimeWindow(&win, startData[cur], 0) || isInTimeWindow(&win, endData[cur], 0)));
      break;
    }

    (*pRowIndex)++;
  }

  resetTableScanInfo(pInfo->pTableScanOp->info, &win);
  pInfo->pTableScanOp->status = OP_OPENED;
  return true;
}

5
54liuyao 已提交
1102
static STimeWindow getSlidingWindow(TSKEY* startTsCol, TSKEY* endTsCol, uint64_t* gpIdCol, SInterval* pInterval,
1103
                                    SDataBlockInfo* pDataBlockInfo, int32_t* pRowIndex, bool hasGroup) {
H
Haojun Liao 已提交
1104
  SResultRowInfo dumyInfo = {0};
5
54liuyao 已提交
1105
  dumyInfo.cur.pageId = -1;
1106
  STimeWindow win = getActiveTimeWindow(NULL, &dumyInfo, startTsCol[*pRowIndex], pInterval, TSDB_ORDER_ASC);
5
54liuyao 已提交
1107 1108
  STimeWindow endWin = win;
  STimeWindow preWin = win;
5
54liuyao 已提交
1109
  uint64_t    groupId = gpIdCol[*pRowIndex];
H
Haojun Liao 已提交
1110

5
54liuyao 已提交
1111
  while (1) {
1112 1113 1114
    if (hasGroup) {
      (*pRowIndex) += 1;
    } else {
5
54liuyao 已提交
1115 1116 1117 1118 1119 1120
      while ((groupId == gpIdCol[(*pRowIndex)] && startTsCol[*pRowIndex] < endWin.ekey)) {
        (*pRowIndex) += 1;
        if ((*pRowIndex) == pDataBlockInfo->rows) {
          break;
        }
      }
1121
    }
5
54liuyao 已提交
1122

5
54liuyao 已提交
1123 1124 1125
    do {
      preWin = endWin;
      getNextTimeWindow(pInterval, &endWin, TSDB_ORDER_ASC);
1126
    } while (endTsCol[(*pRowIndex) - 1] >= endWin.skey);
5
54liuyao 已提交
1127
    endWin = preWin;
5
54liuyao 已提交
1128
    if (win.ekey == endWin.ekey || (*pRowIndex) == pDataBlockInfo->rows || groupId != gpIdCol[*pRowIndex]) {
5
54liuyao 已提交
1129 1130 1131 1132 1133 1134
      win.ekey = endWin.ekey;
      return win;
    }
    win.ekey = endWin.ekey;
  }
}
5
54liuyao 已提交
1135

L
Liu Jicong 已提交
1136 1137 1138 1139 1140 1141 1142 1143 1144 1145 1146
/*
 * Produce the next block of a range re-scan driven by the ranges in pSDB.
 *
 * The table scan is polled; when it is exhausted the next range of pSDB is prepared and scanned.
 * Blocks are filtered and only data of the group currently being processed (pInfo->groupId) is
 * returned. When no range is left, pSDB and *pRowIndex are reset, the update window is set to the
 * full int64 range, the reader is closed and NULL is returned.
 */
static SSDataBlock* doRangeScan(SStreamScanInfo* pInfo, SSDataBlock* pSDB, int32_t tsColIndex, int32_t* pRowIndex) {
  for (;;) {
    SSDataBlock* pRes = doTableScan(pInfo->pTableScanOp);
    if (pRes == NULL && prepareRangeScan(pInfo, pSDB, pRowIndex)) {
      // scan next window data
      pRes = doTableScan(pInfo->pTableScanOp);
    }

    if (pRes == NULL) {
      blockDataCleanup(pSDB);
      *pRowIndex = 0;
      pInfo->updateWin = (STimeWindow){.skey = INT64_MIN, .ekey = INT64_MAX};

      STableScanInfo* pScanInfo = pInfo->pTableScanOp->info;
      tsdbReaderClose(pScanInfo->base.dataReader);
      pScanInfo->base.dataReader = NULL;
      return NULL;
    }

    doFilter(pRes, pInfo->pTableScanOp->exprSupp.pFilterInfo, NULL);
    if (pRes->info.rows == 0) {
      continue;
    }

    if (!pInfo->partitionSup.needCalc) {
      // group id is carried by the block itself
      if (pRes->info.id.groupId == pInfo->groupId) {
        pRes->info.calWin = pInfo->updateWin;
        return pRes;
      }
      continue;
    }

    // group id must be computed per row: rebuild pRes from the rows that belong to the group
    SSDataBlock* pCopy = createOneDataBlock(pRes, true);
    blockDataCleanup(pRes);

    for (int32_t row = 0; row < pCopy->info.rows; row++) {
      if (calGroupIdByData(&pInfo->partitionSup, pInfo->pPartScalarSup, pCopy, row) != pInfo->groupId) {
        continue;
      }

      for (int32_t col = 0; col < pInfo->pTableScanOp->exprSupp.numOfExprs; col++) {
        SColumnInfoData* pFrom = taosArrayGet(pCopy->pDataBlock, col);
        SColumnInfoData* pTo = taosArrayGet(pRes->pDataBlock, col);
        bool             isNull = colDataIsNull(pFrom, pCopy->info.rows, row, NULL);
        char*            pVal = colDataGetData(pFrom, row);
        colDataAppend(pTo, pRes->info.rows, pVal, isNull);
      }
      pRes->info.rows++;
    }

    blockDataDestroy(pCopy);

    if (pRes->info.rows > 0) {
      pRes->info.calWin = pInfo->updateWin;
      return pRes;
    }
  }
}
1187

1188
/*
 * Translate the rows of pSrcBlock into session-window scan ranges stored in pDestBlock.
 *
 * For every source row the session windows containing its start and end timestamps are looked up;
 * rows whose start window is no longer valid (already closed) are skipped. Output rows are written
 * densely, so pDestBlock->info.rows always matches the number of rows actually stored.
 *
 * @return TSDB_CODE_SUCCESS, or the error code of blockDataEnsureCapacity
 */
static int32_t generateSessionScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSrcBlock, SSDataBlock* pDestBlock) {
  blockDataCleanup(pDestBlock);
  if (pSrcBlock->info.rows == 0) {
    return TSDB_CODE_SUCCESS;
  }

  int32_t code = blockDataEnsureCapacity(pDestBlock, pSrcBlock->info.rows);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  ASSERT(taosArrayGetSize(pSrcBlock->pDataBlock) >= 3);
  SColumnInfoData* pStartTsCol = taosArrayGet(pSrcBlock->pDataBlock, START_TS_COLUMN_INDEX);
  TSKEY*           startData = (TSKEY*)pStartTsCol->pData;
  SColumnInfoData* pEndTsCol = taosArrayGet(pSrcBlock->pDataBlock, END_TS_COLUMN_INDEX);
  TSKEY*           endData = (TSKEY*)pEndTsCol->pData;
  SColumnInfoData* pUidCol = taosArrayGet(pSrcBlock->pDataBlock, UID_COLUMN_INDEX);
  uint64_t*        uidCol = (uint64_t*)pUidCol->pData;

  SColumnInfoData* pDestStartCol = taosArrayGet(pDestBlock->pDataBlock, START_TS_COLUMN_INDEX);
  SColumnInfoData* pDestEndCol = taosArrayGet(pDestBlock->pDataBlock, END_TS_COLUMN_INDEX);
  SColumnInfoData* pDestUidCol = taosArrayGet(pDestBlock->pDataBlock, UID_COLUMN_INDEX);
  SColumnInfoData* pDestGpCol = taosArrayGet(pDestBlock->pDataBlock, GROUPID_COLUMN_INDEX);
  SColumnInfoData* pDestCalStartTsCol = taosArrayGet(pDestBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX);
  SColumnInfoData* pDestCalEndTsCol = taosArrayGet(pDestBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX);
  int64_t          version = pSrcBlock->info.version - 1;

  for (int32_t i = 0; i < pSrcBlock->info.rows; i++) {
    uint64_t groupId = getGroupIdByData(pInfo, uidCol[i], startData[i], version);
    // gap must be 0.
    SSessionKey startWin = {0};
    getCurSessionWindow(pInfo->windowSup.pStreamAggSup, startData[i], startData[i], groupId, &startWin);
    if (IS_INVALID_SESSION_WIN_KEY(startWin)) {
      // window has been closed.
      continue;
    }

    SSessionKey endWin = {0};
    getCurSessionWindow(pInfo->windowSup.pStreamAggSup, endData[i], endData[i], groupId, &endWin);
    ASSERT(!IS_INVALID_SESSION_WIN_KEY(endWin));

    // write at the next free destination row, not at the source index: skipped source rows must
    // not leave gaps between the stored rows
    colDataAppend(pDestStartCol, pDestBlock->info.rows, (const char*)&startWin.win.skey, false);
    colDataAppend(pDestEndCol, pDestBlock->info.rows, (const char*)&endWin.win.ekey, false);

    colDataAppendNULL(pDestUidCol, pDestBlock->info.rows);
    colDataAppend(pDestGpCol, pDestBlock->info.rows, (const char*)&groupId, false);
    colDataAppendNULL(pDestCalStartTsCol, pDestBlock->info.rows);
    colDataAppendNULL(pDestCalEndTsCol, pDestBlock->info.rows);
    pDestBlock->info.rows++;
  }

  return TSDB_CODE_SUCCESS;
}
1235 1236 1237 1238 1239 1240

/*
 * Translate the rows of pSrcBlock into interval-window scan ranges stored in pDestBlock.
 *
 * Source rows are consumed in runs: getSlidingWindow() merges consecutive rows into one window and
 * advances the row cursor. Each run yields one destination row holding the merged window, the uid
 * and group id of the run's first row, and the start timestamps of its first and last rows.
 *
 * @return TSDB_CODE_SUCCESS, or the error code of blockDataEnsureCapacity
 */
static int32_t generateIntervalScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSrcBlock, SSDataBlock* pDestBlock) {
  blockDataCleanup(pDestBlock);

  const int32_t numOfSrcRows = pSrcBlock->info.rows;
  if (numOfSrcRows == 0) {
    return TSDB_CODE_SUCCESS;
  }

  int32_t code = blockDataEnsureCapacity(pDestBlock, numOfSrcRows);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  // source columns
  SColumnInfoData* pSrcStartTsCol = (SColumnInfoData*)taosArrayGet(pSrcBlock->pDataBlock, START_TS_COLUMN_INDEX);
  SColumnInfoData* pSrcEndTsCol = (SColumnInfoData*)taosArrayGet(pSrcBlock->pDataBlock, END_TS_COLUMN_INDEX);
  SColumnInfoData* pSrcUidCol = taosArrayGet(pSrcBlock->pDataBlock, UID_COLUMN_INDEX);
  SColumnInfoData* pSrcGpCol = taosArrayGet(pSrcBlock->pDataBlock, GROUPID_COLUMN_INDEX);
  ASSERT(pSrcStartTsCol->info.type == TSDB_DATA_TYPE_TIMESTAMP);

  uint64_t* srcUids = (uint64_t*)pSrcUidCol->pData;
  uint64_t* srcGroups = (uint64_t*)pSrcGpCol->pData;
  TSKEY*    srcStartTs = (TSKEY*)pSrcStartTsCol->pData;
  TSKEY*    srcEndTs = (TSKEY*)pSrcEndTsCol->pData;

  // destination columns
  SColumnInfoData* pDstStartCol = taosArrayGet(pDestBlock->pDataBlock, START_TS_COLUMN_INDEX);
  SColumnInfoData* pDstEndCol = taosArrayGet(pDestBlock->pDataBlock, END_TS_COLUMN_INDEX);
  SColumnInfoData* pDstUidCol = taosArrayGet(pDestBlock->pDataBlock, UID_COLUMN_INDEX);
  SColumnInfoData* pDstGpCol = taosArrayGet(pDestBlock->pDataBlock, GROUPID_COLUMN_INDEX);
  SColumnInfoData* pDstCalStartCol = taosArrayGet(pDestBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX);
  SColumnInfoData* pDstCalEndCol = taosArrayGet(pDestBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX);

  const int64_t version = pSrcBlock->info.version - 1;

  int32_t row = 0;
  while (row < numOfSrcRows) {
    const uint64_t uid = srcUids[row];
    uint64_t       groupId = srcGroups[row];
    if (groupId == 0) {
      groupId = getGroupIdByData(pInfo, uid, srcStartTs[row], version);
    }

    const TSKEY calStartTs = srcStartTs[row];

    // `row` is advanced by getSlidingWindow past the rows merged into this window
    STimeWindow win = getSlidingWindow(srcStartTs, srcEndTs, srcGroups, &pInfo->interval, &pSrcBlock->info, &row,
                                       pInfo->partitionSup.needCalc);
    const TSKEY calEndTs = srcStartTs[row - 1];

    colDataAppend(pDstCalStartCol, pDestBlock->info.rows, (const char*)(&calStartTs), false);
    colDataAppend(pDstCalEndCol, pDestBlock->info.rows, (const char*)(&calEndTs), false);
    colDataAppend(pDstUidCol, pDestBlock->info.rows, (const char*)(&uid), false);
    colDataAppend(pDstStartCol, pDestBlock->info.rows, (const char*)(&win.skey), false);
    colDataAppend(pDstEndCol, pDestBlock->info.rows, (const char*)(&win.ekey), false);
    colDataAppend(pDstGpCol, pDestBlock->info.rows, (const char*)(&groupId), false);
    pDestBlock->info.rows++;
  }

  return TSDB_CODE_SUCCESS;
}
1283

1284
/*
 * Copy the rows of pSrcBlock into the delete-result block pDestBlock.
 *
 * A group id of 0 in the source is resolved through getGroupIdByData(). When table-name
 * expressions are configured, the partition table name stored for the group is looked up and
 * attached to the row; rows without a stored name get a NULL table-name column.
 *
 * @return TSDB_CODE_SUCCESS, or the error code of blockDataEnsureCapacity
 */
static int32_t generateDeleteResultBlock(SStreamScanInfo* pInfo, SSDataBlock* pSrcBlock, SSDataBlock* pDestBlock) {
  blockDataCleanup(pDestBlock);
  int32_t rows = pSrcBlock->info.rows;
  if (rows == 0) {
    return TSDB_CODE_SUCCESS;
  }

  int32_t code = blockDataEnsureCapacity(pDestBlock, rows);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  SColumnInfoData* pSrcStartTsCol = (SColumnInfoData*)taosArrayGet(pSrcBlock->pDataBlock, START_TS_COLUMN_INDEX);
  SColumnInfoData* pSrcEndTsCol = (SColumnInfoData*)taosArrayGet(pSrcBlock->pDataBlock, END_TS_COLUMN_INDEX);
  SColumnInfoData* pSrcUidCol = taosArrayGet(pSrcBlock->pDataBlock, UID_COLUMN_INDEX);
  uint64_t*        srcUidData = (uint64_t*)pSrcUidCol->pData;
  SColumnInfoData* pSrcGpCol = taosArrayGet(pSrcBlock->pDataBlock, GROUPID_COLUMN_INDEX);
  uint64_t*        srcGp = (uint64_t*)pSrcGpCol->pData;
  ASSERT(pSrcStartTsCol->info.type == TSDB_DATA_TYPE_TIMESTAMP);
  TSKEY*  srcStartTsCol = (TSKEY*)pSrcStartTsCol->pData;
  TSKEY*  srcEndTsCol = (TSKEY*)pSrcEndTsCol->pData;
  int64_t version = pSrcBlock->info.version - 1;

  for (int32_t i = 0; i < pSrcBlock->info.rows; i++) {
    uint64_t srcUid = srcUidData[i];
    uint64_t groupId = srcGp[i];

    // var-string buffer (length header + name); the extra zero byte guarantees that strlen()
    // below terminates even if the stored name fills all TSDB_TABLE_NAME_LEN bytes
    char tbname[VARSTR_HEADER_SIZE + TSDB_TABLE_NAME_LEN + 1] = {0};

    if (groupId == 0) {
      groupId = getGroupIdByData(pInfo, srcUid, srcStartTsCol[i], version);
    }

    if (pInfo->tbnameCalSup.pExprInfo) {
      void*   parTbname = NULL;
      int32_t ret = streamStateGetParName(pInfo->pStreamScanOp->pTaskInfo->streamInfo.pState, groupId, &parTbname);
      // only copy when the lookup succeeded; otherwise the row keeps an empty (NULL) table name
      if (ret >= 0 && parTbname != NULL) {
        memcpy(varDataVal(tbname), parTbname, TSDB_TABLE_NAME_LEN);
        varDataSetLen(tbname, strlen(varDataVal(tbname)));
      }
      tdbFree(parTbname);
    }

    appendOneRowToStreamSpecialBlock(pDestBlock, srcStartTsCol + i, srcEndTsCol + i, srcUidData + i, &groupId,
                                     varDataLen(tbname) == 0 ? NULL : tbname);
  }

  return TSDB_CODE_SUCCESS;
}

1326 1327 1328 1329
/*
 * Build the scan-range block pDestBlock from pSrcBlock using the generator that matches the parent
 * window type (interval, session/state, or the delete-result fallback). The destination is then
 * tagged STREAM_CLEAR, inherits the source version, and has its timestamp window refreshed.
 *
 * @return the code returned by the selected generator
 */
static int32_t generateScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSrcBlock, SSDataBlock* pDestBlock) {
  int32_t code = TSDB_CODE_SUCCESS;

  if (isIntervalWindow(pInfo)) {
    code = generateIntervalScanRange(pInfo, pSrcBlock, pDestBlock);
  } else {
    const bool sessionLike = isSessionWindow(pInfo) || isStateWindow(pInfo);
    code = sessionLike ? generateSessionScanRange(pInfo, pSrcBlock, pDestBlock)
                       : generateDeleteResultBlock(pInfo, pSrcBlock, pDestBlock);
  }

  pDestBlock->info.version = pSrcBlock->info.version;
  pDestBlock->info.type = STREAM_CLEAR;
  blockDataUpdateTsWindow(pDestBlock, 0);
  return code;
}

L
Liu Jicong 已提交
1341
/*
 * Fill pBlock->info.parTbName for the block's group.
 *
 * Nothing is done when no table-name expressions are configured or the block is empty. Otherwise
 * the name stored in the stream state for the group is copied first (or the field is cleared when
 * there is none); then the table-name expressions are evaluated on the first row of the block and
 * their result, when available, replaces the name. A non-empty name for a non-zero group id is
 * written back to the stream state.
 */
void calBlockTbName(SStreamScanInfo* pInfo, SSDataBlock* pBlock) {
  SExprSupp*    pNameSup = &pInfo->tbnameCalSup;
  SStreamState* pState = pInfo->pStreamScanOp->pTaskInfo->streamInfo.pState;

  if (pNameSup == NULL || pNameSup->numOfExprs == 0 || pBlock == NULL || pBlock->info.rows == 0) {
    return;
  }

  // start from the name already recorded for this group, if any
  void* pStoredName = NULL;
  if (streamStateGetParName(pState, pBlock->info.id.groupId, &pStoredName) >= 0) {
    memcpy(pBlock->info.parTbName, pStoredName, TSDB_TABLE_NAME_LEN);
  } else {
    pBlock->info.parTbName[0] = 0;
  }
  tdbFree(pStoredName);

  // evaluate the table-name expressions on a one-row copy of the block
  SSDataBlock* pOneRow = blockCopyOneRow(pBlock, 0);
  ASSERT(pOneRow->info.rows == 1);

  SSDataBlock* pNameBlock = createDataBlock();
  pNameBlock->info.rowSize = VARSTR_HEADER_SIZE + TSDB_TABLE_NAME_LEN;
  SColumnInfoData nameCol = createColumnInfoData(TSDB_DATA_TYPE_VARCHAR, TSDB_TABLE_NAME_LEN, 0);
  taosArrayPush(pNameBlock->pDataBlock, &nameCol);
  blockDataEnsureCapacity(pNameBlock, 1);

  projectApplyFunctions(pNameSup->pExprInfo, pNameBlock, pOneRow, pNameSup->pCtx, 1, NULL);
  ASSERT(pNameBlock->info.rows == 1);
  ASSERT(taosArrayGetSize(pNameBlock->pDataBlock) == 1);
  SColumnInfoData* pResCol = taosArrayGet(pNameBlock->pDataBlock, 0);
  ASSERT(pResCol->info.type == TSDB_DATA_TYPE_VARCHAR);

  void* pName = colDataGetData(pResCol, 0);
  // TODO check tbname validation
  if (pName == (void*)-1 || pName == NULL) {
    pBlock->info.parTbName[0] = 0;
  } else {
    // zero the whole field first so the copied name is always NUL-terminated
    memset(pBlock->info.parTbName, 0, TSDB_TABLE_NAME_LEN);
    int32_t len = TMIN(varDataLen(pName), TSDB_TABLE_NAME_LEN - 1);
    memcpy(pBlock->info.parTbName, varDataVal(pName), len);
  }

  if (pBlock->info.id.groupId && pBlock->info.parTbName[0]) {
    streamStatePutParName(pState, pBlock->info.id.groupId, pBlock->info.parTbName);
  }

  blockDataDestroy(pOneRow);
  blockDataDestroy(pNameBlock);
}

1389 1390
/*
 * Append a single row to a stream "special" block and bump its row counter.
 *
 * The row is written at index pBlock->info.rows. The start/end timestamps are
 * stored twice: once in the START_TS/END_TS columns and once in the
 * CALCULATE_START_TS/CALCULATE_END_TS columns. The table-name cell is written
 * as NULL when pTbName is NULL.
 *
 * pBlock    destination block; must already have capacity for one more row
 * pStartTs  pointer to the window start timestamp
 * pEndTs    pointer to the window end timestamp
 * pUid      pointer to the table uid
 * pGp       pointer to the group id
 * pTbName   table-name value, or NULL for a NULL cell
 */
void appendOneRowToStreamSpecialBlock(SSDataBlock* pBlock, TSKEY* pStartTs, TSKEY* pEndTs, uint64_t* pUid,
                                      uint64_t* pGp, void* pTbName) {
  // One entry per destination column, listed in the order the cells are written.
  const struct {
    int32_t     slotId;
    const char* pVal;
    bool        isNull;
  } cells[] = {
      {START_TS_COLUMN_INDEX, (const char*)pStartTs, false},
      {END_TS_COLUMN_INDEX, (const char*)pEndTs, false},
      {UID_COLUMN_INDEX, (const char*)pUid, false},
      {GROUPID_COLUMN_INDEX, (const char*)pGp, false},
      {CALCULATE_START_TS_COLUMN_INDEX, (const char*)pStartTs, false},
      {CALCULATE_END_TS_COLUMN_INDEX, (const char*)pEndTs, false},
      {TABLE_NAME_COLUMN_INDEX, (const char*)pTbName, pTbName == NULL},
  };

  for (size_t k = 0; k < sizeof(cells) / sizeof(cells[0]); ++k) {
    SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, cells[k].slotId);
    colDataAppend(pCol, pBlock->info.rows, cells[k].pVal, cells[k].isNull);
  }

  pBlock->info.rows += 1;
}

1408
/*
 * Walk every row of pBlock and run it through the update filter; when `out`
 * is set, collect the rows that are updates (or that fall into an already
 * deleted closed window) into pInfo->pUpdateDataRes.
 *
 * pInfo       stream scan state (update filter, window config, output block)
 * invertible  not used by this function
 * pBlock      block whose primary timestamp column is inspected
 * out         true: reset pUpdateDataRes and fill it; false: only feed the
 *             update filter, nothing is emitted
 *
 * When rows were collected, pUpdateDataRes is stamped with pBlock's version
 * and typed STREAM_DELETE_DATA if partitionSup.needCalc is set, otherwise
 * STREAM_CLEAR.
 */
static void checkUpdateData(SStreamScanInfo* pInfo, bool invertible, SSDataBlock* pBlock, bool out) {
  SSDataBlock* pUpdated = pInfo->pUpdateDataRes;
  if (out) {
    blockDataCleanup(pUpdated);
    // each input row can contribute up to two output rows (see below)
    blockDataEnsureCapacity(pUpdated, pBlock->info.rows * 2);
  }

  SColumnInfoData* pTsColumn = taosArrayGet(pBlock->pDataBlock, pInfo->primaryTsIndex);
  ASSERT(pTsColumn->info.type == TSDB_DATA_TYPE_TIMESTAMP);
  TSKEY* pTs = (TSKEY*)pTsColumn->pData;

  const bool knownTable = updateInfoIsTableInserted(pInfo->pUpdateInfo, pBlock->info.id.uid);

  for (int32_t idx = 0; idx < pBlock->info.rows; ++idx) {
    TSKEY*      pRowTs = &pTs[idx];
    STimeWindow window = {.skey = INT64_MIN, .ekey = INT64_MAX};
    bool        windowClosed = false;

    if (knownTable && isOverdue(*pRowTs, &pInfo->twAggSup)) {
      // placeholder row info: only cur.pageId is initialised before the call
      SResultRowInfo rowInfo;
      rowInfo.cur.pageId = -1;
      window = getActiveTimeWindow(NULL, &rowInfo, *pRowTs, &pInfo->interval, TSDB_ORDER_ASC);
      windowClosed = isCloseWindow(&window, &pInfo->twAggSup);
    }

    // The update filter must be consulted before the closed-window check.
    const bool isUpdate = updateInfoIsUpdated(pInfo->pUpdateInfo, pBlock->info.id.uid, *pRowTs);

    bool deletedClosedWin = false;
    if (windowClosed && isSignleIntervalWindow(pInfo)) {
      deletedClosedWin = isDeletedStreamWindow(&window, pBlock->info.id.groupId,
                                               pInfo->pTableScanOp->pTaskInfo->streamInfo.pState, &pInfo->twAggSup);
    }

    if (!out || !(isUpdate || deletedClosedWin)) {
      continue;
    }

    qDebug("stream update check not pass, update %d, closedWin %d", isUpdate, deletedClosedWin);

    // first row always carries group id 0
    uint64_t groupId = 0;
    appendOneRowToStreamSpecialBlock(pUpdated, pRowTs, pRowTs, &pBlock->info.id.uid, &groupId, NULL);

    // second row, with the group id computed from the row data, for closed windows under computed partitioning
    if (deletedClosedWin && pInfo->partitionSup.needCalc) {
      groupId = calGroupIdByData(&pInfo->partitionSup, pInfo->pPartScalarSup, pBlock, idx);
      appendOneRowToStreamSpecialBlock(pUpdated, pRowTs, pRowTs, &pBlock->info.id.uid, &groupId, NULL);
    }
  }

  if (!out || pUpdated->info.rows <= 0) {
    return;
  }

  pUpdated->info.version = pBlock->info.version;
  blockDataUpdateTsWindow(pUpdated, 0);
  if (pInfo->partitionSup.needCalc) {
    pUpdated->info.type = STREAM_DELETE_DATA;
  } else {
    pUpdated->info.type = STREAM_CLEAR;
  }
}
L
Liu Jicong 已提交
1449

1450
/*
 * Copy the columns of a freshly decoded block into pInfo->pRes, add pseudo
 * columns, optionally filter, and release the source block.
 *
 * pInfo   stream scan state; pInfo->pRes is the destination block
 * pBlock  source block; its resources are released with blockDataFreeRes()
 *         before this function returns (and also before the long jump on the
 *         error path)
 * filter  when true, the operator's filter is applied to the result
 *
 * Returns 0. A failure of addTagPseudoColumnData() other than
 * TSDB_CODE_PAR_TABLE_NOT_EXIST leaves via T_LONG_JMP and does not return.
 */
static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock, bool filter) {
  SOperatorInfo*  pOperator = pInfo->pStreamScanOp;
  SExecTaskInfo*  pTaskInfo = pOperator->pTaskInfo;
  SSDataBlock*    pRes = pInfo->pRes;
  SDataBlockInfo* pResInfo = &pRes->info;

  blockDataEnsureCapacity(pRes, pBlock->info.rows);

  // carry over the block identity from the source
  pResInfo->rows = pBlock->info.rows;
  pResInfo->id.uid = pBlock->info.id.uid;
  pResInfo->type = STREAM_NORMAL;
  pResInfo->version = pBlock->info.version;
  pResInfo->id.groupId = getTableGroupId(pTaskInfo->pTableInfoList, pBlock->info.id.uid);

  // todo extract method
  const int32_t numOfMatch = (int32_t)taosArrayGetSize(pInfo->matchInfo.pList);
  const int32_t numOfSrcCols = (int32_t)blockDataGetNumOfCols(pBlock);

  for (int32_t m = 0; m < numOfMatch; ++m) {
    SColMatchItem* pItem = taosArrayGet(pInfo->matchInfo.pList, m);
    if (!pItem->needOutput) {
      continue;
    }

    // look for the source column carrying the requested column id
    SColumnInfoData* pSrcCol = NULL;
    for (int32_t c = 0; c < numOfSrcCols; ++c) {
      SColumnInfoData* pCandidate = bdGetColumnInfoData(pBlock, c);
      if (pCandidate->info.colId == pItem->colId) {
        pSrcCol = pCandidate;
        break;
      }
    }

    SColumnInfoData* pDst = taosArrayGet(pRes->pDataBlock, pItem->dstSlotId);
    if (pSrcCol != NULL) {
      colDataAssign(pDst, pSrcCol, pBlock->info.rows, pResInfo);
    } else {
      // the required column does not exist in the submit block: fill it with NULLs
      colDataAppendNNULL(pDst, 0, pResInfo->rows);
    }
  }

  // currently only the tbname pseudo column
  if (pInfo->numOfPseudoExpr > 0) {
    int32_t code = addTagPseudoColumnData(&pInfo->readHandle, pInfo->pPseudoExpr, pInfo->numOfPseudoExpr, pRes,
                                          pResInfo->rows, GET_TASKID(pTaskInfo), NULL);

    // A missing table is tolerated: it may have been dropped during the scan procedure.
    bool fatal = (code != TSDB_CODE_SUCCESS) && (code != TSDB_CODE_PAR_TABLE_NOT_EXIST);
    if (fatal) {
      blockDataFreeRes((SSDataBlock*)pBlock);
      T_LONG_JMP(pTaskInfo->env, code);
    }

    // reset the error code.
    terrno = 0;
  }

  if (filter) {
    doFilter(pRes, pOperator->exprSupp.pFilterInfo, NULL);
  }

  blockDataUpdateTsWindow(pRes, pInfo->primaryTsIndex);
  blockDataFreeRes((SSDataBlock*)pBlock);

  calBlockTbName(pInfo, pRes);
  return 0;
}
5
54liuyao 已提交
1513

L
Liu Jicong 已提交
1514
/*
 * Produce the next result block for a queue (subscription) scan.
 *
 * Three sources are tried in this order:
 *   1. A pending submit request (pTaskInfo->streamInfo.pReq): it is loaded
 *      into the tq reader and its data blocks are copied into pInfo->pRes one
 *      at a time; the first non-empty result is returned. When the request is
 *      exhausted, both the reader message and pReq are cleared and NULL is
 *      returned.
 *   2. prepareStatus.type == TMQ_OFFSET__SNAPSHOT_DATA: blocks come from the
 *      table scan operator. If the table scan yields nothing and no block has
 *      been returned so far (streamInfo.returned == 0), the data reader is
 *      closed, prepareStatus is reset to the log at snapshotVer and the tq
 *      reader seeks to snapshotVer + 1; execution then continues with (3).
 *   3. prepareStatus.type == TMQ_OFFSET__LOG: blocks are fetched through
 *      tqNextBlock() until a non-empty data block is produced or the reader
 *      reports FETCH_TYPE__NONE (or FETCH_TYPE__SEP after data was returned),
 *      in which case lastStatus is updated and NULL is returned.
 *
 * Returns the result block, or NULL when there is currently nothing to return.
 */
static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) {
  SExecTaskInfo*   pTaskInfo = pOperator->pTaskInfo;
  SStreamScanInfo* pInfo = pOperator->info;

  qDebug("queue scan called");

  if (pTaskInfo->streamInfo.pReq != NULL) {
    if (pInfo->tqReader->pMsg == NULL) {
      pInfo->tqReader->pMsg = pTaskInfo->streamInfo.pReq;
      const SSubmitReq* pSubmit = pInfo->tqReader->pMsg;
      if (tqReaderSetDataMsg(pInfo->tqReader, pSubmit, 0) < 0) {
        qError("submit msg messed up when initing stream submit block %p", pSubmit);
        pInfo->tqReader->pMsg = NULL;
        pTaskInfo->streamInfo.pReq = NULL;
        ASSERT(0);
        // If ASSERT is compiled out, do not iterate a reader whose message was
        // rejected and has just been cleared: report "no data" instead.
        return NULL;
      }
    }

    blockDataCleanup(pInfo->pRes);
    SDataBlockInfo* pBlockInfo = &pInfo->pRes->info;

    while (tqNextDataBlock(pInfo->tqReader)) {
      SSDataBlock block = {0};

      int32_t code = tqRetrieveDataBlock(&block, pInfo->tqReader);

      // skip blocks that failed to decode or carry no rows
      if (code != TSDB_CODE_SUCCESS || block.info.rows == 0) {
        continue;
      }

      setBlockIntoRes(pInfo, &block, true);

      // the filter inside setBlockIntoRes may have removed every row
      if (pBlockInfo->rows > 0) {
        return pInfo->pRes;
      }
    }

    // request fully consumed
    pInfo->tqReader->pMsg = NULL;
    pTaskInfo->streamInfo.pReq = NULL;
    return NULL;
  }

  if (pTaskInfo->streamInfo.prepareStatus.type == TMQ_OFFSET__SNAPSHOT_DATA) {
    SSDataBlock* pResult = doTableScan(pInfo->pTableScanOp);
    if (pResult && pResult->info.rows > 0) {
      qDebug("queue scan tsdb return %d rows", pResult->info.rows);
      pTaskInfo->streamInfo.returned = 1;
      return pResult;
    }

    if (pTaskInfo->streamInfo.returned) {
      return NULL;
    }

    // Nothing was ever returned from the snapshot: switch over to the log and
    // fall through to the log branch below.
    STableScanInfo* pTSInfo = pInfo->pTableScanOp->info;
    tsdbReaderClose(pTSInfo->base.dataReader);
    pTSInfo->base.dataReader = NULL;
    tqOffsetResetToLog(&pTaskInfo->streamInfo.prepareStatus, pTaskInfo->streamInfo.snapshotVer);
    qDebug("queue scan tsdb over, switch to wal ver %" PRId64 "", pTaskInfo->streamInfo.snapshotVer + 1);
    if (tqSeekVer(pInfo->tqReader, pTaskInfo->streamInfo.snapshotVer + 1) < 0) {
      tqOffsetResetToLog(&pTaskInfo->streamInfo.lastStatus, pTaskInfo->streamInfo.snapshotVer);
      return NULL;
    }
    ASSERT(pInfo->tqReader->pWalReader->curVersion == pTaskInfo->streamInfo.snapshotVer + 1);
  }

  if (pTaskInfo->streamInfo.prepareStatus.type == TMQ_OFFSET__LOG) {
    while (1) {
      SFetchRet ret = {0};
      tqNextBlock(pInfo->tqReader, &ret);
      if (ret.fetchType == FETCH_TYPE__DATA) {
        blockDataCleanup(pInfo->pRes);
        if (setBlockIntoRes(pInfo, &ret.data, true) < 0) {
          ASSERT(0);
        }
        if (pInfo->pRes->info.rows > 0) {
          pOperator->status = OP_EXEC_RECV;
          qDebug("queue scan log return %d rows", pInfo->pRes->info.rows);
          return pInfo->pRes;
        }
      } else if (ret.fetchType == FETCH_TYPE__META) {
        // meta entries are not expected by this operator
        ASSERT(0);
      } else if (ret.fetchType == FETCH_TYPE__NONE ||
                 (ret.fetchType == FETCH_TYPE__SEP && pOperator->status == OP_EXEC_RECV)) {
        pTaskInfo->streamInfo.lastStatus = ret.offset;
        ASSERT(pTaskInfo->streamInfo.lastStatus.version >= pTaskInfo->streamInfo.prepareStatus.version);
        ASSERT(pTaskInfo->streamInfo.lastStatus.version + 1 == pInfo->tqReader->pWalReader->curVersion);
        char formatBuf[80];
        tFormatOffset(formatBuf, 80, &ret.offset);
        qDebug("queue scan log return null, offset %s", formatBuf);
        pOperator->status = OP_OPENED;
        return NULL;
      }
      // any other fetch type: keep polling the reader
    }
  } else {
    ASSERT(0);
    return NULL;
  }
}

L
Liu Jicong 已提交
1627 1628 1629 1630 1631 1632 1633 1634 1635 1636 1637 1638 1639 1640 1641 1642 1643 1644 1645 1646 1647 1648 1649 1650 1651 1652 1653 1654
/*
 * Copy into pDst only those rows of the delete block pSrc whose uid is
 * present in the tq reader's table-id hash (pInfo->tqReader->tbIdHash).
 *
 * For every kept row the start ts, end ts and uid are copied; the group id and
 * the two calculate-window columns are written as NULL. Afterwards pDst->info
 * is overwritten with pSrc->info, except that `rows` becomes the number of
 * kept rows and the destination's own `capacity` is retained.
 *
 * Always returns 0.
 */
static int32_t filterDelBlockByUid(SSDataBlock* pDst, const SSDataBlock* pSrc, SStreamScanInfo* pInfo) {
  STqReader*    pTq = pInfo->tqReader;
  const int32_t numOfSrcRows = pSrc->info.rows;
  blockDataEnsureCapacity(pDst, numOfSrcRows);

  // source columns, viewed as raw 64-bit arrays
  SColumnInfoData* pFromStart = taosArrayGet(pSrc->pDataBlock, START_TS_COLUMN_INDEX);
  SColumnInfoData* pFromEnd = taosArrayGet(pSrc->pDataBlock, END_TS_COLUMN_INDEX);
  SColumnInfoData* pFromUid = taosArrayGet(pSrc->pDataBlock, UID_COLUMN_INDEX);
  uint64_t*        srcStart = (uint64_t*)pFromStart->pData;
  uint64_t*        srcEnd = (uint64_t*)pFromEnd->pData;
  uint64_t*        srcUid = (uint64_t*)pFromUid->pData;

  // destination columns
  SColumnInfoData* pToStart = taosArrayGet(pDst->pDataBlock, START_TS_COLUMN_INDEX);
  SColumnInfoData* pToEnd = taosArrayGet(pDst->pDataBlock, END_TS_COLUMN_INDEX);
  SColumnInfoData* pToUid = taosArrayGet(pDst->pDataBlock, UID_COLUMN_INDEX);
  SColumnInfoData* pToGroup = taosArrayGet(pDst->pDataBlock, GROUPID_COLUMN_INDEX);
  SColumnInfoData* pToCalStart = taosArrayGet(pDst->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX);
  SColumnInfoData* pToCalEnd = taosArrayGet(pDst->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX);

  int32_t kept = 0;
  for (int32_t r = 0; r < numOfSrcRows; ++r) {
    if (taosHashGet(pTq->tbIdHash, &srcUid[r], sizeof(uint64_t)) == NULL) {
      continue;  // uid is not in the reader's table-id hash
    }

    colDataAppend(pToStart, kept, (const char*)&srcStart[r], false);
    colDataAppend(pToEnd, kept, (const char*)&srcEnd[r], false);
    colDataAppend(pToUid, kept, (const char*)&srcUid[r], false);

    colDataAppendNULL(pToGroup, kept);
    colDataAppendNULL(pToCalStart, kept);
    colDataAppendNULL(pToCalEnd, kept);
    kept += 1;
  }

  // take over the source block info but keep our own capacity and row count
  uint32_t ownCapacity = pDst->info.capacity;
  pDst->info = pSrc->info;
  pDst->info.rows = kept;
  pDst->info.capacity = ownCapacity;

  return 0;
}

5
54liuyao 已提交
1663 1664 1665 1666 1667 1668 1669 1670 1671 1672 1673 1674 1675 1676 1677 1678 1679 1680 1681 1682 1683 1684 1685 1686
/*
 * For partition by tag: fill the group-id column of a special block by
 * resolving each row's uid through getGroupIdByUid().
 *
 * pInfo   stream scan state
 * pBlock  special block; its UID column is read and its GROUPID column is
 *         written for rows [0, pBlock->info.rows)
 *
 * When pInfo->partitionSup.needCalc is set the block is left untouched.
 */
static void setBlockGroupIdByUid(SStreamScanInfo* pInfo, SSDataBlock* pBlock) {
  if (pInfo->partitionSup.needCalc) {
    // NOTE(review): no group id is assigned on this path. A disabled draft
    // looked up the row's previous version (readPreVersionData) and derived
    // the id with calGroupIdByData; confirm whether that is still intended.
    return;
  }

  SColumnInfoData* pGpCol = taosArrayGet(pBlock->pDataBlock, GROUPID_COLUMN_INDEX);
  SColumnInfoData* pUidCol = taosArrayGet(pBlock->pDataBlock, UID_COLUMN_INDEX);
  uint64_t*        uidCol = (uint64_t*)pUidCol->pData;
  int32_t          rows = pBlock->info.rows;

  for (int32_t i = 0; i < rows; i++) {
    uint64_t groupId = getGroupIdByUid(pInfo, uidCol[i]);
    colDataAppend(pGpCol, i, (const char*)&groupId, false);
  }
}

L
Liu Jicong 已提交
1687 1688 1689 1690 1691
/*
 * Produce the next block for a stream scan operator.
 *
 * The function is a state machine driven by three pieces of state:
 *   - pTaskInfo->streamInfo.recoverStep: while in PREPARE1/PREPARE2/SCAN the
 *     operator replays a version range through the table scan operator and
 *     returns those blocks; when the replay is exhausted the step is reset to
 *     NONE and NULL is returned.
 *   - pInfo->blockType: STREAM_INPUT__DATA_BLOCK consumes already-built blocks
 *     from pInfo->pBlockLists; STREAM_INPUT__DATA_SUBMIT decodes submit
 *     messages from pInfo->pBlockLists through the tq reader.
 *   - pInfo->scanMode: in the submit branch, selects whether a pending
 *     update/delete result or a range re-scan must be served before reading
 *     the next submit message.
 *
 * Returns the next block to hand to the parent operator, or NULL when the
 * buffered input is exhausted.
 *
 * NOTE: this operator does never check if current status is done or not
 */
static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
  SExecTaskInfo*   pTaskInfo = pOperator->pTaskInfo;
  SStreamScanInfo* pInfo = pOperator->info;

  qDebug("stream scan called");

  // Recover preparation: configure the table scan for the version range of the requested step.
  if (pTaskInfo->streamInfo.recoverStep == STREAM_RECOVER_STEP__PREPARE1 ||
      pTaskInfo->streamInfo.recoverStep == STREAM_RECOVER_STEP__PREPARE2) {
    STableScanInfo* pTSInfo = pInfo->pTableScanOp->info;
    memcpy(&pTSInfo->base.cond, &pTaskInfo->streamInfo.tableCond, sizeof(SQueryTableDataCond));
    if (pTaskInfo->streamInfo.recoverStep == STREAM_RECOVER_STEP__PREPARE1) {
      pTSInfo->base.cond.startVersion = 0;
      pTSInfo->base.cond.endVersion = pTaskInfo->streamInfo.fillHistoryVer1;
      qDebug("stream recover step 1, from %" PRId64 " to %" PRId64, pTSInfo->base.cond.startVersion,
             pTSInfo->base.cond.endVersion);
    } else {
      pTSInfo->base.cond.startVersion = pTaskInfo->streamInfo.fillHistoryVer1 + 1;
      pTSInfo->base.cond.endVersion = pTaskInfo->streamInfo.fillHistoryVer2;
      qDebug("stream recover step 2, from %" PRId64 " to %" PRId64, pTSInfo->base.cond.startVersion,
             pTSInfo->base.cond.endVersion);
    }

    // drop the current reader and reset the table scan progress before the recover scan
    tsdbReaderClose(pTSInfo->base.dataReader);
    pTSInfo->base.dataReader = NULL;

    pTSInfo->scanTimes = 0;
    pTSInfo->currentGroupId = -1;
    pTaskInfo->streamInfo.recoverStep = STREAM_RECOVER_STEP__SCAN;
  }

  // Recover scan: return table scan blocks until exhausted, then leave recover mode.
  if (pTaskInfo->streamInfo.recoverStep == STREAM_RECOVER_STEP__SCAN) {
    SSDataBlock* pBlock = doTableScan(pInfo->pTableScanOp);
    if (pBlock != NULL) {
      calBlockTbName(pInfo, pBlock);
      if (pInfo->pUpdateInfo) {
        TSKEY maxTs = updateInfoFillBlockData(pInfo->pUpdateInfo, pBlock, pInfo->primaryTsIndex);
        pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, maxTs);
      }
      qDebug("stream recover scan get block, rows %d", pBlock->info.rows);
      printDataBlock(pBlock, "scan recover");
      return pBlock;
    }
    pTaskInfo->streamInfo.recoverStep = STREAM_RECOVER_STEP__NONE;
    STableScanInfo* pTSInfo = pInfo->pTableScanOp->info;
    tsdbReaderClose(pTSInfo->base.dataReader);
    pTSInfo->base.dataReader = NULL;

    pTSInfo->base.cond.startVersion = -1;
    pTSInfo->base.cond.endVersion = -1;

    return NULL;
  }

  size_t total = taosArrayGetSize(pInfo->pBlockLists);
// TODO: refactor
FETCH_NEXT_BLOCK:
  if (pInfo->blockType == STREAM_INPUT__DATA_BLOCK) {
    if (pInfo->validBlockIndex >= total) {
      doClearBufferedBlocks(pInfo);
      return NULL;
    }

    int32_t      current = pInfo->validBlockIndex++;
    SSDataBlock* pBlock = taosArrayGetP(pInfo->pBlockLists, current);
    if (pBlock->info.id.groupId && pBlock->info.parTbName[0]) {
      streamStatePutParName(pTaskInfo->streamInfo.pState, pBlock->info.id.groupId, pBlock->info.parTbName);
    }
    // TODO move into scan
    pBlock->info.calWin.skey = INT64_MIN;
    pBlock->info.calWin.ekey = INT64_MAX;
    blockDataUpdateTsWindow(pBlock, 0);
    switch (pBlock->info.type) {
      case STREAM_NORMAL:
      case STREAM_GET_ALL:
        return pBlock;
      case STREAM_RETRIEVE: {
        // switch to submit mode and set up a range scan over the requested windows;
        // the retrieve block itself is still returned below
        pInfo->blockType = STREAM_INPUT__DATA_SUBMIT;
        pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RETRIEVE;
        copyDataBlock(pInfo->pUpdateRes, pBlock);
        prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex);
        updateInfoAddCloseWindowSBF(pInfo->pUpdateInfo);
      } break;
      case STREAM_DELETE_DATA: {
        printDataBlock(pBlock, "stream scan delete recv");

        // With a tq reader the delete block is filtered into a freshly created
        // block that this function owns. Without one, pDelBlock aliases the
        // buffered input block and must not be destroyed here.
        const bool   ownsDelBlock = (pInfo->tqReader != NULL);
        SSDataBlock* pDelBlock = NULL;
        if (ownsDelBlock) {
          pDelBlock = createSpecialDataBlock(STREAM_DELETE_DATA);
          filterDelBlockByUid(pDelBlock, pBlock, pInfo);
        } else {
          pDelBlock = pBlock;
        }
        setBlockGroupIdByUid(pInfo, pDelBlock);
        printDataBlock(pDelBlock, "stream scan delete recv filtered");
        if (pDelBlock->info.rows == 0) {
          if (ownsDelBlock) {
            blockDataDestroy(pDelBlock);
          }
          goto FETCH_NEXT_BLOCK;
        }
        if (!isIntervalWindow(pInfo) && !isSessionWindow(pInfo) && !isStateWindow(pInfo)) {
          // no window operator downstream: emit the delete result directly
          generateDeleteResultBlock(pInfo, pDelBlock, pInfo->pDeleteDataRes);
          pInfo->pDeleteDataRes->info.type = STREAM_DELETE_RESULT;
          printDataBlock(pDelBlock, "stream scan delete result");
          // Destroy only the block created above; the other exits of this case
          // apply the same ownership rule.
          if (ownsDelBlock) {
            blockDataDestroy(pDelBlock);
          }

          if (pInfo->pDeleteDataRes->info.rows > 0) {
            return pInfo->pDeleteDataRes;
          } else {
            goto FETCH_NEXT_BLOCK;
          }
        } else {
          // window operator downstream: turn the delete into a range re-scan
          pInfo->blockType = STREAM_INPUT__DATA_SUBMIT;
          pInfo->updateResIndex = 0;
          generateScanRange(pInfo, pDelBlock, pInfo->pUpdateRes);
          prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex);
          copyDataBlock(pInfo->pDeleteDataRes, pInfo->pUpdateRes);
          pInfo->pDeleteDataRes->info.type = STREAM_DELETE_DATA;
          printDataBlock(pDelBlock, "stream scan delete data");
          if (ownsDelBlock) {
            blockDataDestroy(pDelBlock);
          }
          if (pInfo->pDeleteDataRes->info.rows > 0) {
            pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RANGE;
            return pInfo->pDeleteDataRes;
          } else {
            goto FETCH_NEXT_BLOCK;
          }
        }
      } break;
      default:
        break;
    }
    return pBlock;
  } else if (pInfo->blockType == STREAM_INPUT__DATA_SUBMIT) {
    qDebug("scan mode %d", pInfo->scanMode);
    switch (pInfo->scanMode) {
      case STREAM_SCAN_FROM_RES: {
        // NOTE(review): pUpdateRes is destroyed here but the pointer is not
        // cleared. This mode is only entered for STREAM_INVERT results, which
        // nothing in this function's visible callees appears to produce.
        blockDataDestroy(pInfo->pUpdateRes);
        pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
        return pInfo->pRes;
      } break;
      case STREAM_SCAN_FROM_DELETE_DATA: {
        generateScanRange(pInfo, pInfo->pUpdateDataRes, pInfo->pUpdateRes);
        prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex);
        pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RANGE;
        copyDataBlock(pInfo->pDeleteDataRes, pInfo->pUpdateRes);
        pInfo->pDeleteDataRes->info.type = STREAM_DELETE_DATA;
        return pInfo->pDeleteDataRes;
      } break;
      case STREAM_SCAN_FROM_UPDATERES: {
        generateScanRange(pInfo, pInfo->pUpdateDataRes, pInfo->pUpdateRes);
        prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex);
        pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RANGE;
        return pInfo->pUpdateRes;
      } break;
      case STREAM_SCAN_FROM_DATAREADER_RANGE:
      case STREAM_SCAN_FROM_DATAREADER_RETRIEVE: {
        SSDataBlock* pSDB = doRangeScan(pInfo, pInfo->pUpdateRes, pInfo->primaryTsIndex, &pInfo->updateResIndex);
        if (pSDB) {
          STableScanInfo* pTableScanInfo = pInfo->pTableScanOp->info;
          uint64_t        version = getReaderMaxVersion(pTableScanInfo->base.dataReader);
          updateInfoSetScanRange(pInfo->pUpdateInfo, &pTableScanInfo->base.cond.twindows, pInfo->groupId, version);
          pSDB->info.type = pInfo->scanMode == STREAM_SCAN_FROM_DATAREADER_RANGE ? STREAM_NORMAL : STREAM_PULL_DATA;
          // feed the update filter only; nothing is emitted (out == false)
          checkUpdateData(pInfo, true, pSDB, false);
          calBlockTbName(pInfo, pSDB);
          return pSDB;
        }
        // range scan exhausted: go back to reading submit messages
        blockDataCleanup(pInfo->pUpdateDataRes);
        pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
      } break;
      default:
        break;
    }

    // A state window operator may have queued ranges to re-scan.
    SStreamAggSupporter* pSup = pInfo->windowSup.pStreamAggSup;
    if (isStateWindow(pInfo) && pSup->pScanBlock->info.rows > 0) {
      pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RANGE;
      pInfo->updateResIndex = 0;
      copyDataBlock(pInfo->pUpdateRes, pSup->pScanBlock);
      blockDataCleanup(pSup->pScanBlock);
      prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex);
      return pInfo->pUpdateRes;
    }

    SDataBlockInfo* pBlockInfo = &pInfo->pRes->info;

    int32_t totBlockNum = taosArrayGetSize(pInfo->pBlockLists);

  NEXT_SUBMIT_BLK:
    while (1) {
      // load the next submit message when the reader has none in progress
      if (pInfo->tqReader->pMsg == NULL) {
        if (pInfo->validBlockIndex >= totBlockNum) {
          updateInfoDestoryColseWinSBF(pInfo->pUpdateInfo);
          doClearBufferedBlocks(pInfo);
          qDebug("stream scan return empty, consume block %d", totBlockNum);
          return NULL;
        }

        int32_t     current = pInfo->validBlockIndex++;
        SSubmitReq* pSubmit = taosArrayGetP(pInfo->pBlockLists, current);
        if (tqReaderSetDataMsg(pInfo->tqReader, pSubmit, 0) < 0) {
          qError("submit msg messed up when initing stream submit block %p, current %d, total %d", pSubmit, current,
                 totBlockNum);
          pInfo->tqReader->pMsg = NULL;
          continue;
        }
      }

      blockDataCleanup(pInfo->pRes);

      while (tqNextDataBlock(pInfo->tqReader)) {
        SSDataBlock block = {0};

        int32_t code = tqRetrieveDataBlock(&block, pInfo->tqReader);

        if (code != TSDB_CODE_SUCCESS || block.info.rows == 0) {
          continue;
        }

        // copy without filtering; the filter is applied after the update check below
        setBlockIntoRes(pInfo, &block, false);

        if (updateInfoIgnore(pInfo->pUpdateInfo, &pInfo->pRes->info.window, pInfo->pRes->info.id.groupId,
                             pInfo->pRes->info.version)) {
          printDataBlock(pInfo->pRes, "stream scan ignore");
          blockDataCleanup(pInfo->pRes);
          continue;
        }

        if (pInfo->pUpdateInfo) {
          checkUpdateData(pInfo, true, pInfo->pRes, true);
          pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, pBlockInfo->window.ekey);
          if (pInfo->pUpdateDataRes->info.rows > 0) {
            // schedule how the collected update rows are served on the next call
            pInfo->updateResIndex = 0;
            if (pInfo->pUpdateDataRes->info.type == STREAM_CLEAR) {
              pInfo->scanMode = STREAM_SCAN_FROM_UPDATERES;
            } else if (pInfo->pUpdateDataRes->info.type == STREAM_INVERT) {
              pInfo->scanMode = STREAM_SCAN_FROM_RES;
              return pInfo->pUpdateDataRes;
            } else if (pInfo->pUpdateDataRes->info.type == STREAM_DELETE_DATA) {
              pInfo->scanMode = STREAM_SCAN_FROM_DELETE_DATA;
            }
          }
        }

        doFilter(pInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL);
        blockDataUpdateTsWindow(pInfo->pRes, pInfo->primaryTsIndex);

        if (pBlockInfo->rows > 0 || pInfo->pUpdateDataRes->info.rows > 0) {
          break;
        }
      }
      if (pBlockInfo->rows > 0 || pInfo->pUpdateDataRes->info.rows > 0) {
        break;
      } else {
        // current message produced nothing: move on to the next one
        pInfo->tqReader->pMsg = NULL;
        continue;
      }
    }

    // record the scan action.
    pInfo->numOfExec++;
    pOperator->resultInfo.totalRows += pBlockInfo->rows;

    qDebug("scan rows: %d", pBlockInfo->rows);
    if (pBlockInfo->rows > 0) {
      return pInfo->pRes;
    }

    // no data rows survived, but update rows are pending: serve them via scanMode
    if (pInfo->pUpdateDataRes->info.rows > 0) {
      goto FETCH_NEXT_BLOCK;
    }

    goto NEXT_SUBMIT_BLK;
  } else {
    ASSERT(0);
    return NULL;
  }
}

H
Haojun Liao 已提交
2008
static SArray* extractTableIdList(const STableListInfo* pTableListInfo) {
2009 2010 2011
  SArray* tableIdList = taosArrayInit(4, sizeof(uint64_t));

  // Transfer the Array of STableKeyInfo into uid list.
H
Haojun Liao 已提交
2012 2013 2014
  size_t size = tableListGetSize(pTableListInfo);
  for (int32_t i = 0; i < size; ++i) {
    STableKeyInfo* pkeyInfo = tableListGetInfo(pTableListInfo, i);
2015 2016 2017 2018 2019 2020
    taosArrayPush(tableIdList, &pkeyInfo->uid);
  }

  return tableIdList;
}

2021
/*
 * Produce the next result of a raw (tmq snapshot) scan.
 *
 * The behaviour is selected by pTaskInfo->streamInfo.prepareStatus.type:
 *  - TMQ_OFFSET__SNAPSHOT_DATA: return the next data block of the current
 *    reader and record its uid/end key in lastStatus. When the reader has no
 *    more blocks, fetch the next table from the snapshot context and either
 *    prepare the scan for it or, when no table is left (uid == 0), switch
 *    lastStatus to TMQ_OFFSET__LOG. NULL is returned in that case.
 *  - TMQ_OFFSET__SNAPSHOT_META: read one meta entry from the snapshot context
 *    and publish it through streamInfo.metaRsp. Always returns NULL.
 *  - anything else: returns NULL without touching the offsets.
 *
 * metaRsp.metaRspLen/metaRsp are reset on every call; a non-zero metaRspLen
 * afterwards marks the response as meta rather than data.
 */
static SSDataBlock* doRawScan(SOperatorInfo* pOperator) {
  // NOTE: this operator never checks whether the current status is "done"
  SExecTaskInfo*      pTask = pOperator->pTaskInfo;
  SStreamRawScanInfo* pRawInfo = pOperator->info;

  pTask->streamInfo.metaRsp.metaRspLen = 0;  // metaRspLen != 0 is used to judge whether the data is meta
  pTask->streamInfo.metaRsp.metaRsp = NULL;

  qDebug("tmqsnap doRawScan called");

  if (pTask->streamInfo.prepareStatus.type == TMQ_OFFSET__SNAPSHOT_DATA) {
    SSDataBlock* pDataBlock = &pRawInfo->pRes;

    if (pRawInfo->dataReader && tsdbNextDataBlock(pRawInfo->dataReader)) {
      if (isTaskKilled(pTask)) {
        longjmp(pTask->env, TSDB_CODE_TSC_QUERY_CANCELLED);
      }

      int32_t numOfRows = 0;
      tsdbRetrieveDataBlockInfo(pRawInfo->dataReader, &numOfRows, &pDataBlock->info.id.uid, &pDataBlock->info.window);
      pDataBlock->info.rows = numOfRows;

      pDataBlock->pDataBlock = tsdbRetrieveDataBlock(pRawInfo->dataReader, NULL);
      if (pDataBlock->pDataBlock == NULL) {
        longjmp(pTask->env, terrno);
      }

      qDebug("tmqsnap doRawScan get data uid:%" PRId64 "", pDataBlock->info.id.uid);

      // remember where this block ends so the offset can be reported back
      pTask->streamInfo.lastStatus.type = TMQ_OFFSET__SNAPSHOT_DATA;
      pTask->streamInfo.lastStatus.uid = pDataBlock->info.id.uid;
      pTask->streamInfo.lastStatus.ts = pDataBlock->info.window.ekey;
      return pDataBlock;
    }

    // current reader is exhausted (or absent): move on to the next table of the snapshot
    SMetaTableInfo nextTable = getUidfromSnapShot(pRawInfo->sContext);
    if (nextTable.uid != 0) {
      pTask->streamInfo.prepareStatus.uid = nextTable.uid;
      pTask->streamInfo.prepareStatus.ts = INT64_MIN;
      qDebug("tmqsnap change get data uid:%" PRId64 "", nextTable.uid);
      qStreamPrepareScan(pTask, &pTask->streamInfo.prepareStatus, pRawInfo->sContext->subType);
    } else {
      // read snapshot done, change to get data from wal
      qDebug("tmqsnap read snapshot done, change to get data from wal");
      pTask->streamInfo.prepareStatus.uid = nextTable.uid;
      pTask->streamInfo.lastStatus.type = TMQ_OFFSET__LOG;
      pTask->streamInfo.lastStatus.version = pRawInfo->sContext->snapVersion;
    }

    tDeleteSSchemaWrapper(nextTable.schema);
    qDebug("tmqsnap stream scan tsdb return null");
    return NULL;
  }

  if (pTask->streamInfo.prepareStatus.type == TMQ_OFFSET__SNAPSHOT_META) {
    SSnapContext* pSnapCtx = pRawInfo->sContext;
    void*         pMeta = NULL;
    int32_t       metaLen = 0;
    int16_t       msgType = 0;
    int64_t       tbUid = 0;

    if (getMetafromSnapShot(pSnapCtx, &pMeta, &metaLen, &msgType, &tbUid) < 0) {
      qError("tmqsnap getMetafromSnapShot error");
      taosMemoryFreeClear(pMeta);
      return NULL;
    }

    pTask->streamInfo.lastStatus.type = TMQ_OFFSET__SNAPSHOT_META;
    pTask->streamInfo.lastStatus.uid = tbUid;

    if (pSnapCtx->queryMetaOrData) {
      // hand the meta entry over to the response
      pTask->streamInfo.metaRsp.rspOffset = pTask->streamInfo.lastStatus;
      pTask->streamInfo.metaRsp.resMsgType = msgType;
      pTask->streamInfo.metaRsp.metaRspLen = metaLen;
      pTask->streamInfo.metaRsp.metaRsp = pMeta;
    } else {
      // change to get data on the next poll request
      pTask->streamInfo.metaRsp.rspOffset.type = TMQ_OFFSET__SNAPSHOT_DATA;
      pTask->streamInfo.metaRsp.rspOffset.uid = 0;
      pTask->streamInfo.metaRsp.rspOffset.ts = INT64_MIN;
    }

    return NULL;
  }

  // Any other offset type (a formerly sketched TMQ_OFFSET__LOG branch is disabled) is not served here.
  return NULL;
}

wmmhello's avatar
wmmhello 已提交
2139
static void destroyRawScanOperatorInfo(void* param) {
wmmhello's avatar
wmmhello 已提交
2140 2141 2142 2143 2144 2145
  SStreamRawScanInfo* pRawScan = (SStreamRawScanInfo*)param;
  tsdbReaderClose(pRawScan->dataReader);
  destroySnapContext(pRawScan->sContext);
  taosMemoryFree(pRawScan);
}

L
Liu Jicong 已提交
2146 2147 2148
// for subscribing db or stb (not including column),
// if this scan is used, meta data can be return
// and schemas are decided when scanning
2149
SOperatorInfo* createRawScanOperatorInfo(SReadHandle* pHandle, SExecTaskInfo* pTaskInfo) {
L
Liu Jicong 已提交
2150 2151 2152 2153 2154
  // create operator
  // create tb reader
  // create meta reader
  // create tq reader

H
Haojun Liao 已提交
2155 2156
  int32_t code = TSDB_CODE_SUCCESS;

2157
  SStreamRawScanInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamRawScanInfo));
L
Liu Jicong 已提交
2158
  SOperatorInfo*      pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
2159
  if (pInfo == NULL || pOperator == NULL) {
H
Haojun Liao 已提交
2160 2161
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _end;
2162 2163
  }

wmmhello's avatar
wmmhello 已提交
2164 2165
  pInfo->vnode = pHandle->vnode;

2166
  pInfo->sContext = pHandle->sContext;
L
Liu Jicong 已提交
2167 2168
  setOperatorInfo(pOperator, "RawScanOperator", QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN, false, OP_NOT_OPENED, pInfo,
                  pTaskInfo);
2169

H
Haojun Liao 已提交
2170
  pOperator->fpSet = createOperatorFpSet(NULL, doRawScan, NULL, destroyRawScanOperatorInfo, NULL);
2171
  return pOperator;
H
Haojun Liao 已提交
2172

L
Liu Jicong 已提交
2173
_end:
H
Haojun Liao 已提交
2174 2175 2176 2177
  taosMemoryFree(pInfo);
  taosMemoryFree(pOperator);
  pTaskInfo->code = code;
  return NULL;
L
Liu Jicong 已提交
2178 2179
}

2180
static void destroyStreamScanOperatorInfo(void* param) {
2181 2182
  SStreamScanInfo* pStreamScan = (SStreamScanInfo*)param;
  if (pStreamScan->pTableScanOp && pStreamScan->pTableScanOp->info) {
5
54liuyao 已提交
2183
    destroyOperatorInfo(pStreamScan->pTableScanOp);
2184 2185 2186 2187
  }
  if (pStreamScan->tqReader) {
    tqCloseReader(pStreamScan->tqReader);
  }
H
Haojun Liao 已提交
2188 2189
  if (pStreamScan->matchInfo.pList) {
    taosArrayDestroy(pStreamScan->matchInfo.pList);
2190
  }
C
Cary Xu 已提交
2191 2192
  if (pStreamScan->pPseudoExpr) {
    destroyExprInfo(pStreamScan->pPseudoExpr, pStreamScan->numOfPseudoExpr);
L
Liu Jicong 已提交
2193
    taosMemoryFree(pStreamScan->pPseudoExpr);
C
Cary Xu 已提交
2194
  }
C
Cary Xu 已提交
2195

L
Liu Jicong 已提交
2196 2197
  cleanupExprSupp(&pStreamScan->tbnameCalSup);

L
Liu Jicong 已提交
2198
  updateInfoDestroy(pStreamScan->pUpdateInfo);
2199 2200 2201 2202
  blockDataDestroy(pStreamScan->pRes);
  blockDataDestroy(pStreamScan->pUpdateRes);
  blockDataDestroy(pStreamScan->pPullDataRes);
  blockDataDestroy(pStreamScan->pDeleteDataRes);
5
54liuyao 已提交
2203
  blockDataDestroy(pStreamScan->pUpdateDataRes);
2204 2205 2206 2207
  taosArrayDestroy(pStreamScan->pBlockLists);
  taosMemoryFree(pStreamScan);
}

2208
SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhysiNode* pTableScanNode, SNode* pTagCond,
2209
                                            SExecTaskInfo* pTaskInfo) {
2210 2211
  SStreamScanInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamScanInfo));
  SOperatorInfo*   pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
2212

H
Haojun Liao 已提交
2213 2214
  if (pInfo == NULL || pOperator == NULL) {
    terrno = TSDB_CODE_QRY_OUT_OF_MEMORY;
2215
    goto _error;
H
Haojun Liao 已提交
2216 2217
  }

2218
  SScanPhysiNode*     pScanPhyNode = &pTableScanNode->scan;
2219
  SDataBlockDescNode* pDescNode = pScanPhyNode->node.pOutputDataBlockDesc;
H
Haojun Liao 已提交
2220

2221
  pInfo->pTagCond = pTagCond;
2222
  pInfo->pGroupTags = pTableScanNode->pGroupTags;
2223

2224
  int32_t numOfCols = 0;
2225 2226
  int32_t code =
      extractColMatchInfo(pScanPhyNode->pScanCols, pDescNode, &numOfCols, COL_MATCH_FROM_COL_ID, &pInfo->matchInfo);
H
Haojun Liao 已提交
2227 2228 2229
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }
2230

H
Haojun Liao 已提交
2231
  int32_t numOfOutput = taosArrayGetSize(pInfo->matchInfo.pList);
2232
  SArray* pColIds = taosArrayInit(numOfOutput, sizeof(int16_t));
2233
  for (int32_t i = 0; i < numOfOutput; ++i) {
H
Haojun Liao 已提交
2234
    SColMatchItem* id = taosArrayGet(pInfo->matchInfo.pList, i);
2235 2236

    int16_t colId = id->colId;
2237
    taosArrayPush(pColIds, &colId);
2238
    if (id->colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
H
Haojun Liao 已提交
2239
      pInfo->primaryTsIndex = id->dstSlotId;
5
54liuyao 已提交
2240
    }
H
Haojun Liao 已提交
2241 2242
  }

L
Liu Jicong 已提交
2243 2244 2245 2246 2247 2248 2249 2250 2251 2252 2253 2254 2255
  if (pTableScanNode->pSubtable != NULL) {
    SExprInfo* pSubTableExpr = taosMemoryCalloc(1, sizeof(SExprInfo));
    if (pSubTableExpr == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      goto _error;
    }
    pInfo->tbnameCalSup.pExprInfo = pSubTableExpr;
    createExprFromOneNode(pSubTableExpr, pTableScanNode->pSubtable, 0);
    if (initExprSupp(&pInfo->tbnameCalSup, pSubTableExpr, 1) != 0) {
      goto _error;
    }
  }

2256 2257 2258 2259 2260 2261 2262 2263 2264 2265 2266 2267 2268
  if (pTableScanNode->pTags != NULL) {
    int32_t    numOfTags;
    SExprInfo* pTagExpr = createExprInfo(pTableScanNode->pTags, NULL, &numOfTags);
    if (pTagExpr == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      goto _error;
    }
    if (initExprSupp(&pInfo->tagCalSup, pTagExpr, numOfTags) != 0) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      goto _error;
    }
  }

H
Haojun Liao 已提交
2269 2270
  pInfo->pBlockLists = taosArrayInit(4, POINTER_BYTES);
  if (pInfo->pBlockLists == NULL) {
2271 2272
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto _error;
H
Haojun Liao 已提交
2273 2274
  }

5
54liuyao 已提交
2275
  if (pHandle->vnode) {
L
Liu Jicong 已提交
2276
    SOperatorInfo*  pTableScanOp = createTableScanOperatorInfo(pTableScanNode, pHandle, pTaskInfo);
L
Liu Jicong 已提交
2277
    STableScanInfo* pTSInfo = (STableScanInfo*)pTableScanOp->info;
2278
    if (pHandle->version > 0) {
H
Haojun Liao 已提交
2279
      pTSInfo->base.cond.endVersion = pHandle->version;
2280
    }
L
Liu Jicong 已提交
2281

2282
    STableKeyInfo* pList = NULL;
5
54liuyao 已提交
2283
    int32_t        num = 0;
H
Haojun Liao 已提交
2284
    tableListGetGroupList(pTaskInfo->pTableInfoList, 0, &pList, &num);
2285

2286
    if (pHandle->initTableReader) {
L
Liu Jicong 已提交
2287
      pTSInfo->scanMode = TABLE_SCAN__TABLE_ORDER;
H
Haojun Liao 已提交
2288 2289
      pTSInfo->base.dataReader = NULL;
      code = tsdbReaderOpen(pHandle->vnode, &pTSInfo->base.cond, pList, num, &pTSInfo->base.dataReader, NULL);
dengyihao's avatar
dengyihao 已提交
2290 2291
      if (code != 0) {
        terrno = code;
H
Haojun Liao 已提交
2292
        destroyTableScanOperatorInfo(pTableScanOp);
2293
        goto _error;
L
Liu Jicong 已提交
2294
      }
L
Liu Jicong 已提交
2295 2296
    }

L
Liu Jicong 已提交
2297 2298 2299 2300
    if (pHandle->initTqReader) {
      ASSERT(pHandle->tqReader == NULL);
      pInfo->tqReader = tqOpenReader(pHandle->vnode);
      ASSERT(pInfo->tqReader);
2301
    } else {
L
Liu Jicong 已提交
2302 2303
      ASSERT(pHandle->tqReader);
      pInfo->tqReader = pHandle->tqReader;
2304 2305
    }

2306
    pInfo->pUpdateInfo = NULL;
2307
    pInfo->pTableScanOp = pTableScanOp;
2308 2309 2310
    if (pInfo->pTableScanOp->pTaskInfo->streamInfo.pState) {
      streamStateSetNumber(pInfo->pTableScanOp->pTaskInfo->streamInfo.pState, -1);
    }
L
Liu Jicong 已提交
2311

L
Liu Jicong 已提交
2312 2313
    pInfo->readHandle = *pHandle;
    pInfo->tableUid = pScanPhyNode->uid;
L
Liu Jicong 已提交
2314
    pTaskInfo->streamInfo.snapshotVer = pHandle->version;
L
Liu Jicong 已提交
2315

L
Liu Jicong 已提交
2316
    // set the extract column id to streamHandle
L
Liu Jicong 已提交
2317
    tqReaderSetColIdList(pInfo->tqReader, pColIds);
H
Haojun Liao 已提交
2318
    SArray* tableIdList = extractTableIdList(pTaskInfo->pTableInfoList);
2319
    code = tqReaderSetTbUidList(pInfo->tqReader, tableIdList);
L
Liu Jicong 已提交
2320 2321 2322 2323 2324
    if (code != 0) {
      taosArrayDestroy(tableIdList);
      goto _error;
    }
    taosArrayDestroy(tableIdList);
H
Haojun Liao 已提交
2325
    memcpy(&pTaskInfo->streamInfo.tableCond, &pTSInfo->base.cond, sizeof(SQueryTableDataCond));
L
Liu Jicong 已提交
2326 2327
  } else {
    taosArrayDestroy(pColIds);
5
54liuyao 已提交
2328 2329
  }

2330 2331 2332 2333 2334
  // create the pseduo columns info
  if (pTableScanNode->scan.pScanPseudoCols != NULL) {
    pInfo->pPseudoExpr = createExprInfo(pTableScanNode->scan.pScanPseudoCols, NULL, &pInfo->numOfPseudoExpr);
  }

H
Haojun Liao 已提交
2335 2336 2337 2338 2339
  code = filterInitFromNode((SNode*)pScanPhyNode->node.pConditions, &pOperator->exprSupp.pFilterInfo, 0);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

H
Haojun Liao 已提交
2340
  pInfo->pRes = createDataBlockFromDescNode(pDescNode);
2341
  pInfo->pUpdateRes = createSpecialDataBlock(STREAM_CLEAR);
2342
  pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
L
Liu Jicong 已提交
2343
  pInfo->windowSup = (SWindowSupporter){.pStreamAggSup = NULL, .gap = -1, .parentType = QUERY_NODE_PHYSICAL_PLAN};
2344
  pInfo->groupId = 0;
2345
  pInfo->pPullDataRes = createSpecialDataBlock(STREAM_RETRIEVE);
2346
  pInfo->pStreamScanOp = pOperator;
2347
  pInfo->deleteDataIndex = 0;
2348
  pInfo->pDeleteDataRes = createSpecialDataBlock(STREAM_DELETE_DATA);
5
54liuyao 已提交
2349
  pInfo->updateWin = (STimeWindow){.skey = INT64_MAX, .ekey = INT64_MAX};
2350
  pInfo->pUpdateDataRes = createSpecialDataBlock(STREAM_CLEAR);
X
Xiaoyu Wang 已提交
2351
  pInfo->assignBlockUid = pTableScanNode->assignBlockUid;
2352
  pInfo->partitionSup.needCalc = false;
L
Liu Jicong 已提交
2353

L
Liu Jicong 已提交
2354 2355
  setOperatorInfo(pOperator, "StreamScanOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN, false, OP_NOT_OPENED, pInfo,
                  pTaskInfo);
2356
  pOperator->exprSupp.numOfExprs = taosArrayGetSize(pInfo->pRes->pDataBlock);
H
Haojun Liao 已提交
2357

L
Liu Jicong 已提交
2358
  __optr_fn_t nextFn = pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM ? doStreamScan : doQueueScan;
H
Haojun Liao 已提交
2359
  pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, nextFn, NULL, destroyStreamScanOperatorInfo, NULL);
2360

H
Haojun Liao 已提交
2361
  return pOperator;
2362

L
Liu Jicong 已提交
2363
_error:
H
Haojun Liao 已提交
2364 2365 2366 2367 2368 2369 2370 2371
  if (pColIds != NULL) {
    taosArrayDestroy(pColIds);
  }

  if (pInfo != NULL) {
    destroyStreamScanOperatorInfo(pInfo);
  }

2372 2373
  taosMemoryFreeClear(pOperator);
  return NULL;
H
Haojun Liao 已提交
2374 2375
}

2376
/*
 * Produce the next block of a tag scan.
 *
 * Starting at pInfo->curPos, one output row is generated per table of the
 * task's table list until either the list is exhausted or the operator's
 * result capacity is reached. For every output expression the row receives
 * the table name (scan pseudo column function) or the value of the referenced
 * tag. A failed meta lookup aborts the task through T_LONG_JMP.
 *
 * Returns the result block, or NULL when no row was produced or the operator
 * has already finished.
 */
static SSDataBlock* doTagScan(SOperatorInfo* pOperator) {
  if (pOperator->status == OP_EXEC_DONE) {
    return NULL;
  }

  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
  STagScanInfo*  pTagInfo = pOperator->info;
  SExprInfo*     pExprs = pOperator->exprSupp.pExprInfo;
  SSDataBlock*   pResBlock = pTagInfo->pRes;

  blockDataCleanup(pResBlock);

  int32_t numOfTables = tableListGetSize(pTaskInfo->pTableInfoList);
  if (numOfTables == 0) {
    setTaskStatus(pTaskInfo, TASK_COMPLETED);
    return NULL;
  }

  char        nameBuf[512] = {0};
  int32_t     numOfRows = 0;
  SMetaReader reader = {0};
  metaReaderInit(&reader, pTagInfo->readHandle.meta, 0);

  while (pTagInfo->curPos < numOfTables && numOfRows < pOperator->resultInfo.capacity) {
    STableKeyInfo* pKey = tableListGetInfo(pTaskInfo->pTableInfoList, pTagInfo->curPos);

    int32_t code = metaGetTableEntryByUid(&reader, pKey->uid);
    tDecoderClear(&reader.coder);
    if (code != TSDB_CODE_SUCCESS) {
      qError("failed to get table meta, uid:0x%" PRIx64 ", code:%s, %s", pKey->uid, tstrerror(terrno),
             GET_TASKID(pTaskInfo));
      metaReaderClear(&reader);
      T_LONG_JMP(pTaskInfo->env, terrno);
    }

    for (int32_t k = 0; k < pOperator->exprSupp.numOfExprs; ++k) {
      SExprInfo*       pOne = &pExprs[k];
      SColumnInfoData* pCol = taosArrayGet(pResBlock->pDataBlock, pOne->base.resSchema.slotId);

      if (fmIsScanPseudoColumnFunc(pOne->pExpr->_function.functionId)) {
        // pseudo column: emit the table name as a var-string
        STR_TO_VARSTR(nameBuf, reader.me.name);
        colDataAppend(pCol, numOfRows, nameBuf, false);
        continue;
      }

      // otherwise it is a tag value
      STagVal tagVal = {0};
      tagVal.cid = pOne->base.pParam[0].pCol->colId;
      const char* pTag = metaGetTableTagVal(reader.me.ctbEntry.pTags, pCol->info.type, &tagVal);

      bool  isJson = (pCol->info.type == TSDB_DATA_TYPE_JSON);
      char* pData = (!isJson && pTag != NULL) ? tTagValToData((const STagVal*)pTag, false) : (char*)pTag;

      bool isNull = (pData == NULL) || (isJson && tTagIsJsonNull(pData));
      colDataAppend(pCol, numOfRows, pData, isNull);

      // only the converted buffer of a variable-length, non-JSON tag is released here
      if (!isJson && pTag != NULL && pData != NULL && IS_VAR_DATA_TYPE(((const STagVal*)pTag)->type)) {
        taosMemoryFree(pData);
      }
    }

    numOfRows += 1;
    pTagInfo->curPos += 1;
    if (pTagInfo->curPos >= numOfTables) {
      setOperatorCompleted(pOperator);
    }
  }

  metaReaderClear(&reader);

  if (pOperator->status == OP_EXEC_DONE) {
    setTaskStatus(pTaskInfo, TASK_COMPLETED);
  }

  pResBlock->info.rows = numOfRows;
  pOperator->resultInfo.totalRows += numOfRows;

  if (pResBlock->info.rows == 0) {
    return NULL;
  }
  return pTagInfo->pRes;
}

2457
static void destroyTagScanOperatorInfo(void* param) {
H
Haojun Liao 已提交
2458 2459
  STagScanInfo* pInfo = (STagScanInfo*)param;
  pInfo->pRes = blockDataDestroy(pInfo->pRes);
H
Haojun Liao 已提交
2460
  taosArrayDestroy(pInfo->matchInfo.pList);
D
dapan1121 已提交
2461
  taosMemoryFreeClear(param);
H
Haojun Liao 已提交
2462 2463
}

H
Haojun Liao 已提交
2464
/*
 * Create the tag-scan operator.
 *
 * pReadHandle read handle copied into the operator info
 * pPhyNode    physical plan node; its pScanPseudoCols define the output
 *             expressions and the column match info
 * pTaskInfo   owning task
 *
 * Returns the operator, or NULL on failure. On failure everything that was
 * set up so far is released and terrno carries the code of the step that
 * failed (TSDB_CODE_OUT_OF_MEMORY for the initial allocations).
 */
SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo) {
  // declared before the first `goto _error`; the default covers the allocation failure below
  int32_t code = TSDB_CODE_OUT_OF_MEMORY;

  STagScanInfo*  pInfo = taosMemoryCalloc(1, sizeof(STagScanInfo));
  SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
  if (pInfo == NULL || pOperator == NULL) {
    goto _error;
  }

  SDataBlockDescNode* pDescNode = pPhyNode->node.pOutputDataBlockDesc;

  int32_t    numOfExprs = 0;
  SExprInfo* pExprInfo = createExprInfo(pPhyNode->pScanPseudoCols, NULL, &numOfExprs);
  code = initExprSupp(&pOperator->exprSupp, pExprInfo, numOfExprs);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  int32_t num = 0;
  code = extractColMatchInfo(pPhyNode->pScanPseudoCols, pDescNode, &num, COL_MATCH_FROM_COL_ID, &pInfo->matchInfo);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  pInfo->pRes = createDataBlockFromDescNode(pDescNode);
  pInfo->readHandle = *pReadHandle;
  pInfo->curPos = 0;

  setOperatorInfo(pOperator, "TagScanOperator", QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN, false, OP_NOT_OPENED, pInfo,
                  pTaskInfo);
  initResultSizeInfo(&pOperator->resultInfo, 4096);
  blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity);

  pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doTagScan, NULL, destroyTagScanOperatorInfo, NULL);

  return pOperator;

_error:
  // release what the steps above may already have attached to the two structures
  if (pOperator != NULL) {
    cleanupExprSupp(&pOperator->exprSupp);
  }
  if (pInfo != NULL) {
    taosArrayDestroy(pInfo->matchInfo.pList);
  }

  taosMemoryFree(pInfo);
  taosMemoryFree(pOperator);
  terrno = code;
  return NULL;
}
2505

dengyihao's avatar
dengyihao 已提交
2506
static SSDataBlock* getTableDataBlockImpl(void* param) {
dengyihao's avatar
opt mem  
dengyihao 已提交
2507 2508 2509 2510 2511 2512 2513 2514 2515 2516 2517 2518 2519
  STableMergeScanSortSourceParam* source = param;
  SOperatorInfo*                  pOperator = source->pOperator;
  STableMergeScanInfo*            pInfo = pOperator->info;
  SExecTaskInfo*                  pTaskInfo = pOperator->pTaskInfo;
  int32_t                         readIdx = source->readerIdx;
  SSDataBlock*                    pBlock = source->inputBlock;
  STableMergeScanInfo*            pTableScanInfo = pOperator->info;

  SQueryTableDataCond* pQueryCond = taosArrayGet(pTableScanInfo->queryConds, readIdx);
  blockDataCleanup(pBlock);

  int64_t st = taosGetTimestampUs();

H
Haojun Liao 已提交
2520
  void*        p = tableListGetInfo(pTaskInfo->pTableInfoList, readIdx + pInfo->tableStartIndex);
H
Haojun Liao 已提交
2521
  SReadHandle* pHandle = &pInfo->base.readHandle;
dengyihao's avatar
dengyihao 已提交
2522

H
Haojun Liao 已提交
2523
  int32_t code = tsdbReaderOpen(pHandle->vnode, pQueryCond, p, 1, &pInfo->base.dataReader, GET_TASKID(pTaskInfo));
dengyihao's avatar
dengyihao 已提交
2524
  if (code != 0) {
H
Haojun Liao 已提交
2525
    T_LONG_JMP(pTaskInfo->env, code);
dengyihao's avatar
dengyihao 已提交
2526
  }
dengyihao's avatar
opt mem  
dengyihao 已提交
2527

H
Haojun Liao 已提交
2528
  STsdbReader* reader = pInfo->base.dataReader;
dengyihao's avatar
opt mem  
dengyihao 已提交
2529
  while (tsdbNextDataBlock(reader)) {
H
Haojun Liao 已提交
2530 2531
    if (isTaskKilled(pTaskInfo)) {
      T_LONG_JMP(pTaskInfo->env, TSDB_CODE_TSC_QUERY_CANCELLED);
dengyihao's avatar
opt mem  
dengyihao 已提交
2532 2533 2534 2535 2536 2537 2538 2539 2540 2541
    }

    // process this data block based on the probabilities
    bool processThisBlock = processBlockWithProbability(&pTableScanInfo->sample);
    if (!processThisBlock) {
      continue;
    }

    blockDataCleanup(pBlock);

H
Haojun Liao 已提交
2542
    int32_t rows = 0;
H
Haojun Liao 已提交
2543
    tsdbRetrieveDataBlockInfo(reader, &rows, &pBlock->info.id.uid, &pBlock->info.window);
H
Haojun Liao 已提交
2544 2545
    blockDataEnsureCapacity(pBlock, rows);
    pBlock->info.rows = rows;
dengyihao's avatar
opt mem  
dengyihao 已提交
2546

H
Haojun Liao 已提交
2547
    if (pQueryCond->order == TSDB_ORDER_ASC) {
dengyihao's avatar
opt mem  
dengyihao 已提交
2548 2549 2550 2551
      pQueryCond->twindows.skey = pBlock->info.window.ekey + 1;
    } else {
      pQueryCond->twindows.ekey = pBlock->info.window.skey - 1;
    }
dengyihao's avatar
opt mem  
dengyihao 已提交
2552 2553

    uint32_t status = 0;
H
Haojun Liao 已提交
2554
    loadDataBlock(pOperator, &pTableScanInfo->base, pBlock, &status);
S
slzhou 已提交
2555
    //    code = loadDataBlockFromOneTable(pOperator, pTableScanInfo, pBlock, &status);
dengyihao's avatar
opt mem  
dengyihao 已提交
2556
    if (code != TSDB_CODE_SUCCESS) {
H
Haojun Liao 已提交
2557
      T_LONG_JMP(pTaskInfo->env, code);
dengyihao's avatar
opt mem  
dengyihao 已提交
2558 2559 2560 2561 2562 2563 2564
    }

    // current block is filter out according to filter condition, continue load the next block
    if (status == FUNC_DATA_REQUIRED_FILTEROUT || pBlock->info.rows == 0) {
      continue;
    }

H
Haojun Liao 已提交
2565
    pBlock->info.id.groupId = getTableGroupId(pTaskInfo->pTableInfoList, pBlock->info.id.uid);
dengyihao's avatar
opt mem  
dengyihao 已提交
2566

H
Haojun Liao 已提交
2567
    pOperator->resultInfo.totalRows += pBlock->info.rows;
H
Haojun Liao 已提交
2568
    pTableScanInfo->base.readRecorder.elapsedTime += (taosGetTimestampUs() - st) / 1000.0;
dengyihao's avatar
opt mem  
dengyihao 已提交
2569

H
Haojun Liao 已提交
2570 2571
    tsdbReaderClose(pInfo->base.dataReader);
    pInfo->base.dataReader = NULL;
dengyihao's avatar
opt mem  
dengyihao 已提交
2572 2573
    return pBlock;
  }
H
Haojun Liao 已提交
2574

H
Haojun Liao 已提交
2575 2576
  tsdbReaderClose(pInfo->base.dataReader);
  pInfo->base.dataReader = NULL;
dengyihao's avatar
opt mem  
dengyihao 已提交
2577 2578 2579
  return NULL;
}

2580 2581 2582
/*
 * Build a one-element sort specification that orders by the primary timestamp.
 *
 * colMatchInfo array of SColMatchItem; the destination slot of the entry whose
 *              colId is PRIMARYKEY_TIMESTAMP_COL_ID is used (slot 0 when there
 *              is no such entry, the last one when there are several)
 * order        sort order stored in the specification
 *
 * Returns a new array holding a single SBlockOrderInfo with nulls first.
 */
SArray* generateSortByTsInfo(SArray* colMatchInfo, int32_t order) {
  int32_t tsSlot = 0;

  size_t numOfItems = taosArrayGetSize(colMatchInfo);
  for (int32_t idx = 0; idx < numOfItems; ++idx) {
    SColMatchItem* pItem = taosArrayGet(colMatchInfo, idx);
    if (pItem->colId != PRIMARYKEY_TIMESTAMP_COL_ID) {
      continue;
    }
    tsSlot = pItem->dstSlotId;
  }

  SBlockOrderInfo orderInfo = {0};
  orderInfo.nullFirst = NULL_ORDER_FIRST;
  orderInfo.slotId = tsSlot;
  orderInfo.order = order;

  SArray* pOrderList = taosArrayInit(1, sizeof(SBlockOrderInfo));
  taosArrayPush(pOrderList, &orderInfo);
  return pOrderList;
}

H
Haojun Liao 已提交
2600
/*
 * Copy a query condition, giving the copy its own column list.
 *
 * All fields of src are copied to dst bit-wise; dst->colList is then replaced
 * by a newly allocated array holding a copy of src's numOfCols column entries.
 * Other pointer members, if any, still alias src.
 *
 * Returns 0 on success, or TSDB_CODE_OUT_OF_MEMORY when the column list cannot
 * be allocated; in that case dst->colList is NULL rather than an alias of
 * src->colList.
 */
int32_t dumpQueryTableCond(const SQueryTableDataCond* src, SQueryTableDataCond* dst) {
  memcpy(dst, src, sizeof(SQueryTableDataCond));

  dst->colList = taosMemoryCalloc(src->numOfCols, sizeof(SColumnInfo));
  // a NULL result is only an error when there is something to copy
  if (dst->colList == NULL && src->numOfCols > 0) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  for (int i = 0; i < src->numOfCols; i++) {
    dst->colList[i] = src->colList[i];
  }
  return 0;
}
H
Haojun Liao 已提交
2608

2609
/*
 * Start the merge scan of one table group.
 *
 * Beginning at pInfo->tableStartIndex, the run of consecutive tables that
 * belong to pInfo->groupId is determined and stored in tableEndIndex. A
 * multi-source merge sort handle is created with one source per table of the
 * run; each source gets its own input block and its own copy of the base
 * query condition. Finally the sort handle is opened.
 *
 * Returns TSDB_CODE_SUCCESS. Allocation failures and a failing tsortOpen()
 * abort the task via T_LONG_JMP.
 */
int32_t startGroupTableMergeScan(SOperatorInfo* pOperator) {
  STableMergeScanInfo* pInfo = pOperator->info;
  SExecTaskInfo*       pTaskInfo = pOperator->pTaskInfo;

  {
    // find the last table that still belongs to the current group
    size_t  numOfTables = tableListGetSize(pTaskInfo->pTableInfoList);
    int32_t i = pInfo->tableStartIndex + 1;
    for (; i < numOfTables; ++i) {
      STableKeyInfo* tableKeyInfo = tableListGetInfo(pTaskInfo->pTableInfoList, i);
      if (tableKeyInfo->groupId != pInfo->groupId) {
        break;
      }
    }
    pInfo->tableEndIndex = i - 1;
  }

  int32_t tableStartIdx = pInfo->tableStartIndex;
  int32_t tableEndIdx = pInfo->tableEndIndex;

  pInfo->base.dataReader = NULL;

  // todo the total available buffer should be determined by total capacity of buffer of this task.
  // the additional one is reserved for merge result
  pInfo->sortBufSize = pInfo->bufPageSize * (tableEndIdx - tableStartIdx + 1 + 1);
  int32_t numOfBufPage = pInfo->sortBufSize / pInfo->bufPageSize;
  pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, SORT_MULTISOURCE_MERGE, pInfo->bufPageSize, numOfBufPage,
                                             pInfo->pSortInputBlock, pTaskInfo->id.str);

  tsortSetFetchRawDataFp(pInfo->pSortHandle, getTableDataBlockImpl, NULL, NULL);

  // one table has one data block
  int32_t numOfTable = tableEndIdx - tableStartIdx + 1;
  pInfo->queryConds = taosArrayInit(numOfTable, sizeof(SQueryTableDataCond));

  for (int32_t i = 0; i < numOfTable; ++i) {
    // copy the condition first, so a failure leaves no half-registered source behind
    SQueryTableDataCond cond;
    if (dumpQueryTableCond(&pInfo->base.cond, &cond) != TSDB_CODE_SUCCESS) {
      T_LONG_JMP(pTaskInfo->env, TSDB_CODE_OUT_OF_MEMORY);
    }

    STableMergeScanSortSourceParam param = {0};
    param.readerIdx = i;
    param.pOperator = pOperator;
    param.inputBlock = createOneDataBlock(pInfo->pResBlock, false);
    taosArrayPush(pInfo->sortSourceParams, &param);

    taosArrayPush(pInfo->queryConds, &cond);
  }

  for (int32_t i = 0; i < numOfTable; ++i) {
    SSortSource* ps = taosMemoryCalloc(1, sizeof(SSortSource));
    if (ps == NULL) {
      T_LONG_JMP(pTaskInfo->env, TSDB_CODE_OUT_OF_MEMORY);
    }

    STableMergeScanSortSourceParam* param = taosArrayGet(pInfo->sortSourceParams, i);
    ps->param = param;
    ps->onlyRef = true;
    tsortAddSource(pInfo->pSortHandle, ps);
  }

  int32_t code = tsortOpen(pInfo->pSortHandle);

  if (code != TSDB_CODE_SUCCESS) {
    T_LONG_JMP(pTaskInfo->env, terrno);
  }

  return TSDB_CODE_SUCCESS;
}

/*
 * Finish the merge scan of the current table group.
 *
 * The statistics of the sort handle are folded into pInfo->sortExecInfo
 * (method and buffer are overwritten, loops and byte counters accumulate).
 * Afterwards the per-source input blocks, the sort handle and the per-source
 * query conditions created by startGroupTableMergeScan are released.
 *
 * Always returns TSDB_CODE_SUCCESS.
 */
int32_t stopGroupTableMergeScan(SOperatorInfo* pOperator) {
  STableMergeScanInfo* pScan = pOperator->info;

  int32_t numOfSources = taosArrayGetSize(pScan->queryConds);

  // merge the statistics of this group's sort into the operator totals
  SSortExecInfo stats = tsortGetSortExecInfo(pScan->pSortHandle);
  pScan->sortExecInfo.sortMethod = stats.sortMethod;
  pScan->sortExecInfo.sortBuffer = stats.sortBuffer;
  pScan->sortExecInfo.loops += stats.loops;
  pScan->sortExecInfo.readBytes += stats.readBytes;
  pScan->sortExecInfo.writeBytes += stats.writeBytes;

  // one input block was created per source
  for (int32_t k = 0; k < numOfSources; ++k) {
    STableMergeScanSortSourceParam* pSrc = taosArrayGet(pScan->sortSourceParams, k);
    blockDataDestroy(pSrc->inputBlock);
  }
  taosArrayClear(pScan->sortSourceParams);

  tsortDestroySortHandle(pScan->pSortHandle);
  pScan->pSortHandle = NULL;

  // every condition owns its column list
  size_t numOfConds = taosArrayGetSize(pScan->queryConds);
  for (int32_t k = 0; k < numOfConds; k++) {
    SQueryTableDataCond* pCond = taosArrayGet(pScan->queryConds, k);
    taosMemoryFree(pCond->colList);
  }
  taosArrayDestroy(pScan->queryConds);
  pScan->queryConds = NULL;

  return TSDB_CODE_SUCCESS;
}

L
Liu Jicong 已提交
2704 2705
// Drains sorted tuples from pHandle into pResBlock until either the sort
// handle is exhausted or the block holds at least `capacity` rows, then
// applies the operator's limit/offset to the block.
// Returns pResBlock when it ends up with at least one row, NULL otherwise.
SSDataBlock* getSortedTableMergeScanBlockData(SSortHandle* pHandle, SSDataBlock* pResBlock, int32_t capacity,
                                              SOperatorInfo* pOperator) {
  SExecTaskInfo*       pTask = pOperator->pTaskInfo;
  STableMergeScanInfo* pScan = pOperator->info;

  blockDataCleanup(pResBlock);

  STupleHandle* pTuple = NULL;
  while ((pTuple = tsortNextTuple(pHandle)) != NULL) {
    appendOneRowToDataBlock(pResBlock, pTuple);
    if (pResBlock->info.rows >= capacity) {
      break;  // output block is full
    }
  }

  qDebug("%s get sorted row blocks, rows:%d", GET_TASKID(pTask), pResBlock->info.rows);
  applyLimitOffset(&pScan->limitInfo, pResBlock, pTask, pOperator);
  pScan->limitInfo.numOfOutputRows += pResBlock->info.rows;

  if (pResBlock->info.rows > 0) {
    return pResBlock;
  }
  return NULL;
}

// Produces the next sorted result block of the table merge scan.
// Tables are processed group by group: a group scan is started, drained via
// getSortedTableMergeScanBlockData(), stopped, and then the next group (the
// tables following tableEndIndex) is started. Returns NULL once every group is
// exhausted or when the table list is empty.
SSDataBlock* doTableMergeScan(SOperatorInfo* pOperator) {
  if (OP_EXEC_DONE == pOperator->status) {
    return NULL;
  }

  STableMergeScanInfo* pScan = pOperator->info;
  SExecTaskInfo*       pTask = pOperator->pTaskInfo;

  int32_t openCode = pOperator->fpSet._openFn(pOperator);
  if (openCode != TSDB_CODE_SUCCESS) {
    T_LONG_JMP(pTask->env, openCode);
  }

  size_t numOfTables = tableListGetSize(pTask->pTableInfoList);

  // first invocation: start scanning the first group
  if (!pScan->hasGroupId) {
    pScan->hasGroupId = true;

    if (0 == numOfTables) {
      setOperatorCompleted(pOperator);
      return NULL;
    }

    pScan->tableStartIndex = 0;
    STableKeyInfo* pFirstKey = (STableKeyInfo*)tableListGetInfo(pTask->pTableInfoList, pScan->tableStartIndex);
    pScan->groupId = pFirstKey->groupId;
    startGroupTableMergeScan(pOperator);
  }

  while (pScan->tableStartIndex < numOfTables) {
    SSDataBlock* pSorted = getSortedTableMergeScanBlockData(pScan->pSortHandle, pScan->pResBlock,
                                                            pOperator->resultInfo.capacity, pOperator);
    if (pSorted != NULL) {
      pSorted->info.id.groupId = pScan->groupId;
      pOperator->resultInfo.totalRows += pSorted->info.rows;
      return pSorted;
    }

    // current group is drained: close it and move on to the next one, if any
    stopGroupTableMergeScan(pOperator);
    if (pScan->tableEndIndex >= numOfTables - 1) {
      setOperatorCompleted(pOperator);
      break;
    }

    pScan->tableStartIndex = pScan->tableEndIndex + 1;
    pScan->groupId = tableListGetInfo(pTask->pTableInfoList, pScan->tableStartIndex)->groupId;
    startGroupTableMergeScan(pOperator);
  }

  // reaching this point means no block was produced
  return NULL;
}

2779
void destroyTableMergeScanOperatorInfo(void* param) {
2780
  STableMergeScanInfo* pTableScanInfo = (STableMergeScanInfo*)param;
H
Haojun Liao 已提交
2781
  cleanupQueryTableDataCond(&pTableScanInfo->base.cond);
2782

dengyihao's avatar
dengyihao 已提交
2783 2784 2785
  int32_t numOfTable = taosArrayGetSize(pTableScanInfo->queryConds);

  for (int32_t i = 0; i < numOfTable; i++) {
H
Haojun Liao 已提交
2786 2787
    STableMergeScanSortSourceParam* p = taosArrayGet(pTableScanInfo->sortSourceParams, i);
    blockDataDestroy(p->inputBlock);
2788
  }
H
Haojun Liao 已提交
2789

2790
  taosArrayDestroy(pTableScanInfo->sortSourceParams);
dengyihao's avatar
dengyihao 已提交
2791 2792
  tsortDestroySortHandle(pTableScanInfo->pSortHandle);
  pTableScanInfo->pSortHandle = NULL;
2793

H
Haojun Liao 已提交
2794 2795
  tsdbReaderClose(pTableScanInfo->base.dataReader);
  pTableScanInfo->base.dataReader = NULL;
dengyihao's avatar
opt mem  
dengyihao 已提交
2796

dengyihao's avatar
opt mem  
dengyihao 已提交
2797 2798 2799
  for (int i = 0; i < taosArrayGetSize(pTableScanInfo->queryConds); i++) {
    SQueryTableDataCond* pCond = taosArrayGet(pTableScanInfo->queryConds, i);
    taosMemoryFree(pCond->colList);
2800
  }
dengyihao's avatar
opt mem  
dengyihao 已提交
2801
  taosArrayDestroy(pTableScanInfo->queryConds);
2802

H
Haojun Liao 已提交
2803 2804
  if (pTableScanInfo->base.matchInfo.pList != NULL) {
    taosArrayDestroy(pTableScanInfo->base.matchInfo.pList);
2805 2806 2807 2808 2809 2810
  }

  pTableScanInfo->pResBlock = blockDataDestroy(pTableScanInfo->pResBlock);
  pTableScanInfo->pSortInputBlock = blockDataDestroy(pTableScanInfo->pSortInputBlock);

  taosArrayDestroy(pTableScanInfo->pSortInfo);
H
Haojun Liao 已提交
2811
  cleanupExprSupp(&pTableScanInfo->base.pseudoSup);
L
Liu Jicong 已提交
2812

H
Haojun Liao 已提交
2813 2814 2815 2816
  tsdbReaderClose(pTableScanInfo->base.dataReader);
  pTableScanInfo->base.dataReader = NULL;
  taosLRUCacheCleanup(pTableScanInfo->base.metaCache.pTableMetaEntryCache);

D
dapan1121 已提交
2817
  taosMemoryFreeClear(param);
2818 2819 2820 2821
}

// Builds the EXPLAIN execution statistics of a table merge scan operator.
//
// pOptr:        the table merge scan operator (must not be NULL).
// pOptrExplain: receives a newly allocated STableMergeScanExecInfo holding a
//               copy of the block-load recorder and the sort statistics.
// len:          receives the size in bytes of the returned structure.
//
// Returns TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY when the result
// structure cannot be allocated (in which case *pOptrExplain is NULL and *len
// is 0).
int32_t getTableMergeScanExplainExecInfo(SOperatorInfo* pOptr, void** pOptrExplain, uint32_t* len) {
  ASSERT(pOptr != NULL);
  // TODO: merge these two info into one struct
  STableMergeScanExecInfo* execInfo = taosMemoryCalloc(1, sizeof(STableMergeScanExecInfo));
  if (execInfo == NULL) {
    // do not dereference a failed allocation; report it to the caller instead
    *pOptrExplain = NULL;
    *len = 0;
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  STableMergeScanInfo* pInfo = pOptr->info;
  execInfo->blockRecorder = pInfo->base.readRecorder;
  execInfo->sortExecInfo = pInfo->sortExecInfo;

  *pOptrExplain = execInfo;
  *len = sizeof(STableMergeScanExecInfo);

  return TSDB_CODE_SUCCESS;
}

H
Haojun Liao 已提交
2834 2835
// Creates the table merge scan operator for the given physical scan node.
//
// pTableScanNode: physical plan node describing the scan (columns, pseudo
//                 columns, conditions, limit/slimit, sample ratio, scan order).
// readHandle:     storage read handle; copied into the operator info.
// pTaskInfo:      owning task; on failure its `code` field receives the error.
//
// Returns the new operator, or NULL on failure. On failure the error code of
// the step that failed is stored in pTaskInfo->code (TSDB_CODE_OUT_OF_MEMORY
// for allocation failures) and everything acquired so far through pInfo is
// released via destroyTableMergeScanOperatorInfo().
SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanNode, SReadHandle* readHandle,
                                                SExecTaskInfo* pTaskInfo) {
  // reported when one of the two allocations below fails
  int32_t code = TSDB_CODE_OUT_OF_MEMORY;

  STableMergeScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STableMergeScanInfo));
  SOperatorInfo*       pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
  if (pInfo == NULL || pOperator == NULL) {
    goto _error;
  }

  SDataBlockDescNode* pDescNode = pTableScanNode->scan.node.pOutputDataBlockDesc;

  int32_t numOfCols = 0;
  code = extractColMatchInfo(pTableScanNode->scan.pScanCols, pDescNode, &numOfCols, COL_MATCH_FROM_COL_ID,
                             &pInfo->base.matchInfo);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  code = initQueryTableDataCond(&pInfo->base.cond, pTableScanNode);
  if (code != TSDB_CODE_SUCCESS) {
    // matchInfo.pList is released by the common cleanup in _error
    goto _error;
  }

  if (pTableScanNode->scan.pScanPseudoCols != NULL) {
    SExprSupp* pSup = &pInfo->base.pseudoSup;
    pSup->pExprInfo = createExprInfo(pTableScanNode->scan.pScanPseudoCols, NULL, &pSup->numOfExprs);
    pSup->pCtx = createSqlFunctionCtx(pSup->pExprInfo, pSup->numOfExprs, &pSup->rowEntryInfoOffset);
  }

  pInfo->scanInfo = (SScanInfo){.numOfAsc = pTableScanNode->scanSeq[0], .numOfDesc = pTableScanNode->scanSeq[1]};

  pInfo->base.metaCache.pTableMetaEntryCache = taosLRUCacheInit(1024 * 128, -1, .5);
  if (pInfo->base.metaCache.pTableMetaEntryCache == NULL) {
    code = terrno;
    goto _error;
  }

  pInfo->base.dataBlockLoadFlag = FUNC_DATA_REQUIRED_DATA_LOAD;
  pInfo->base.scanFlag = MAIN_SCAN;
  pInfo->base.readHandle = *readHandle;

  pInfo->base.limitInfo.limit.limit = -1;
  pInfo->base.limitInfo.slimit.limit = -1;

  pInfo->sample.sampleRatio = pTableScanNode->ratio;
  pInfo->sample.seed = taosGetTimestampSec();

  code = filterInitFromNode((SNode*)pTableScanNode->scan.node.pConditions, &pOperator->exprSupp.pFilterInfo, 0);
  if (code != TSDB_CODE_SUCCESS) {
    // NOTE(review): whether filterInitFromNode leaves a partially built
    // pFilterInfo behind on failure is not visible here - confirm.
    goto _error;
  }

  initResultSizeInfo(&pOperator->resultInfo, 1024);
  pInfo->pResBlock = createDataBlockFromDescNode(pDescNode);
  blockDataEnsureCapacity(pInfo->pResBlock, pOperator->resultInfo.capacity);

  pInfo->sortSourceParams = taosArrayInit(64, sizeof(STableMergeScanSortSourceParam));

  pInfo->pSortInfo = generateSortByTsInfo(pInfo->base.matchInfo.pList, pInfo->base.cond.order);
  pInfo->pSortInputBlock = createOneDataBlock(pInfo->pResBlock, false);
  initLimitInfo(pTableScanNode->scan.node.pLimit, pTableScanNode->scan.node.pSlimit, &pInfo->limitInfo);

  // the sort buffer page size is derived from the result row layout
  int32_t  rowSize = pInfo->pResBlock->info.rowSize;
  uint32_t nCols = taosArrayGetSize(pInfo->pResBlock->pDataBlock);
  pInfo->bufPageSize = getProperSortPageSize(rowSize, nCols);

  setOperatorInfo(pOperator, "TableMergeScanOperator", QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN, false, OP_NOT_OPENED,
                  pInfo, pTaskInfo);
  pOperator->exprSupp.numOfExprs = numOfCols;

  pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doTableMergeScan, NULL, destroyTableMergeScanOperatorInfo,
                                         getTableMergeScanExplainExecInfo);
  pOperator->cost.openCost = 0;
  return pOperator;

_error:
  // Keep the real failure reason; fall back to out-of-memory only when the
  // failing step did not provide a code (e.g. terrno was left at 0).
  pTaskInfo->code = (code != TSDB_CODE_SUCCESS) ? code : TSDB_CODE_OUT_OF_MEMORY;
  if (pInfo != NULL) {
    // releases the match list, query cond, pseudo-column support, LRU cache
    // and finally pInfo itself
    destroyTableMergeScanOperatorInfo(pInfo);
  }
  taosMemoryFree(pOperator);
  return NULL;
}
S
shenglian zhou 已提交
2915 2916 2917 2918

// ====================================================================================================================
// TableCountScanOperator
// Forward declaration: the scan callback is referenced by
// createTableCountScanOperatorInfo() before its definition appears.
static SSDataBlock* doTableCountScan(SOperatorInfo* pOperator);
S
slzhou 已提交
2919 2920 2921 2922 2923 2924 2925 2926 2927 2928 2929 2930 2931 2932 2933 2934 2935 2936 2937 2938 2939 2940 2941 2942 2943 2944 2945 2946 2947 2948 2949 2950 2951 2952 2953 2954 2955 2956 2957 2958 2959 2960 2961 2962 2963 2964 2965 2966 2967 2968 2969 2970 2971 2972 2973 2974 2975 2976 2977 2978 2979 2980 2981 2982 2983 2984 2985 2986 2987 2988 2989 2990 2991 2992 2993 2994 2995 2996 2997 2998 2999 3000 3001 3002 3003 3004 3005 3006 3007 3008 3009 3010
// Forward declaration of the operator-info destructor; it is used by
// createTableCountScanOperatorInfo() before its definition.
static void         destoryTableCountScanOperator(void* param);
// Column names compared (via strcmp) against SColumnNode::colName to detect
// the database-name and super-table-name group tag / scan columns.
static const char*  GROUP_TAG_DB_NAME = "db_name";
static const char*  GROUP_TAG_STABLE_NAME = "stable_name";

// Records in `supp` the output slot ids of the "db_name" and "stable_name"
// scan columns. Every entry of scanCols must be a target node wrapping a
// column node; otherwise TSDB_CODE_QRY_SYS_ERROR is returned. A NULL list is
// accepted and leaves `supp` untouched.
int32_t tblCountScanGetGroupTagsSlotId(const SNodeList* scanCols, STableCountScanSupp* supp) {
  if (scanCols == NULL) {
    return TSDB_CODE_SUCCESS;
  }

  SNode* pItem = NULL;
  FOREACH(pItem, scanCols) {
    if (QUERY_NODE_TARGET != nodeType(pItem)) {
      return TSDB_CODE_QRY_SYS_ERROR;
    }

    STargetNode* pTarget = (STargetNode*)pItem;
    if (QUERY_NODE_COLUMN != nodeType(pTarget->pExpr)) {
      return TSDB_CODE_QRY_SYS_ERROR;
    }

    const char* name = ((SColumnNode*)(pTarget->pExpr))->colName;
    if (0 == strcmp(name, GROUP_TAG_DB_NAME)) {
      supp->dbNameSlotId = pTarget->slotId;
    } else if (0 == strcmp(name, GROUP_TAG_STABLE_NAME)) {
      supp->stbNameSlotId = pTarget->slotId;
    }
  }

  return TSDB_CODE_SUCCESS;
}

// Records in `supp` the output slot id of the table-count pseudo column.
// Every entry of pseudoCols must be a target node wrapping a function node;
// otherwise TSDB_CODE_QRY_SYS_ERROR is returned. A NULL list is accepted and
// leaves `supp` untouched.
int32_t tblCountScanGetCountSlotId(const SNodeList* pseudoCols, STableCountScanSupp* supp) {
  if (pseudoCols == NULL) {
    return TSDB_CODE_SUCCESS;
  }

  SNode* pItem = NULL;
  FOREACH(pItem, pseudoCols) {
    if (QUERY_NODE_TARGET != nodeType(pItem)) {
      return TSDB_CODE_QRY_SYS_ERROR;
    }

    STargetNode* pTarget = (STargetNode*)pItem;
    if (QUERY_NODE_FUNCTION != nodeType(pTarget->pExpr)) {
      return TSDB_CODE_QRY_SYS_ERROR;
    }

    SFunctionNode* pFunc = (SFunctionNode*)(pTarget->pExpr);
    if (FUNCTION_TYPE_TABLE_COUNT == pFunc->funcType) {
      supp->tbCountSlotId = pTarget->slotId;
    }
  }

  return TSDB_CODE_SUCCESS;
}

// Derives the scan inputs from the plan node.
// Without group tags, the database and table name of `tableName` are copied
// into supp->dbName / supp->stbName. With group tags, every tag must be a
// column node (else TSDB_CODE_QRY_SYS_ERROR) and the groupByDbName /
// groupByStbName flags are raised for the "db_name" / "stable_name" columns.
int32_t tblCountScanGetInputs(SNodeList* groupTags, SName* tableName, STableCountScanSupp* supp) {
  if (groupTags == NULL) {
    strncpy(supp->dbName, tNameGetDbNameP(tableName), TSDB_DB_NAME_LEN);
    strncpy(supp->stbName, tNameGetTableName(tableName), TSDB_TABLE_NAME_LEN);
    return TSDB_CODE_SUCCESS;
  }

  SNode* pItem = NULL;
  FOREACH(pItem, groupTags) {
    if (QUERY_NODE_COLUMN != nodeType(pItem)) {
      return TSDB_CODE_QRY_SYS_ERROR;
    }

    const char* name = ((SColumnNode*)pItem)->colName;
    if (0 == strcmp(name, GROUP_TAG_DB_NAME)) {
      supp->groupByDbName = true;
    }
    if (0 == strcmp(name, GROUP_TAG_STABLE_NAME)) {
      supp->groupByStbName = true;
    }
  }

  return TSDB_CODE_SUCCESS;
}

// Fills `supp` from the table-count scan plan node in three steps: the inputs
// (group-by flags or db/stable names), the slot ids of the group tag columns,
// and the slot id of the count pseudo column. All three slot ids start at -1.
// Returns the code of the first step that fails, logging which one it was.
int32_t getTableCountScanSupp(SNodeList* groupTags, SName* tableName, SNodeList* scanCols, SNodeList* pseudoCols,
                              STableCountScanSupp* supp, SExecTaskInfo* taskInfo) {
  int32_t rc = tblCountScanGetInputs(groupTags, tableName, supp);
  if (TSDB_CODE_SUCCESS != rc) {
    qError("%s get table count scan supp. get inputs error", GET_TASKID(taskInfo));
    return rc;
  }

  // -1 marks a slot that is absent from the output
  supp->tbCountSlotId = -1;
  supp->stbNameSlotId = -1;
  supp->dbNameSlotId = -1;

  rc = tblCountScanGetGroupTagsSlotId(scanCols, supp);
  if (TSDB_CODE_SUCCESS != rc) {
    qError("%s get table count scan supp. get group tags slot id error", GET_TASKID(taskInfo));
    return rc;
  }

  rc = tblCountScanGetCountSlotId(pseudoCols, supp);
  if (TSDB_CODE_SUCCESS != rc) {
    qError("%s get table count scan supp. get count error", GET_TASKID(taskInfo));
  }
  return rc;
}
S
shenglian zhou 已提交
3011

S
slzhou 已提交
3012
SOperatorInfo* createTableCountScanOperatorInfo(SReadHandle* readHandle, STableCountScanPhysiNode* pTblCountScanNode,
S
shenglian zhou 已提交
3013 3014 3015
                                                SExecTaskInfo* pTaskInfo) {
  int32_t code = TSDB_CODE_SUCCESS;

S
slzhou 已提交
3016
  SScanPhysiNode*              pScanNode = &pTblCountScanNode->scan;
S
slzhou 已提交
3017
  STableCountScanOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(STableCountScanOperatorInfo));
S
slzhou 已提交
3018
  SOperatorInfo*               pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
S
shenglian zhou 已提交
3019 3020 3021 3022 3023 3024 3025 3026 3027

  if (!pInfo || !pOperator) {
    goto _error;
  }

  pInfo->readHandle = *readHandle;

  SDataBlockDescNode* pDescNode = pScanNode->node.pOutputDataBlockDesc;
  initResultSizeInfo(&pOperator->resultInfo, 1);
3028
  pInfo->pRes = createDataBlockFromDescNode(pDescNode);
S
shenglian zhou 已提交
3029 3030
  blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity);

S
slzhou 已提交
3031 3032 3033
  getTableCountScanSupp(pTblCountScanNode->pGroupTags, &pTblCountScanNode->scan.tableName,
                        pTblCountScanNode->scan.pScanCols, pTblCountScanNode->scan.pScanPseudoCols, &pInfo->supp,
                        pTaskInfo);
S
shenglian zhou 已提交
3034 3035 3036

  setOperatorInfo(pOperator, "TableCountScanOperator", QUERY_NODE_PHYSICAL_PLAN_TABLE_COUNT_SCAN, false, OP_NOT_OPENED,
                  pInfo, pTaskInfo);
S
slzhou 已提交
3037 3038
  pOperator->fpSet =
      createOperatorFpSet(operatorDummyOpenFn, doTableCountScan, NULL, destoryTableCountScanOperator, NULL);
S
shenglian zhou 已提交
3039 3040 3041 3042 3043 3044 3045 3046 3047 3048 3049
  return pOperator;

_error:
  if (pInfo != NULL) {
    destoryTableCountScanOperator(pInfo);
  }
  taosMemoryFreeClear(pOperator);
  pTaskInfo->code = code;
  return NULL;
}

S
slzhou 已提交
3050 3051 3052
// Writes one result row (row index 0) into pRes and sets its row count to 1.
//
// pSupp:   slot ids of the output columns; a slot id of -1 means the column
//          is not part of the output and is skipped.
// dbName:  database name; must be non-empty when the db-name column is output.
// stbName: super table name; an empty string is emitted as NULL.
// count:   table count written to the count column.
// pRes:    destination block.
//
// Names are copied into fixed-size var-string buffers; the copy length is
// clamped to the buffer capacity so an over-long name cannot overflow them.
void fillTableCountScanDataBlock(STableCountScanSupp* pSupp, char* dbName, char* stbName, int64_t count,
                                 SSDataBlock* pRes) {
  if (pSupp->dbNameSlotId != -1) {
    size_t dbNameLen = strlen(dbName);
    ASSERT(dbNameLen);
    if (dbNameLen > TSDB_DB_NAME_LEN) {
      dbNameLen = TSDB_DB_NAME_LEN;  // never write past varDbName
    }

    SColumnInfoData* colInfoData = taosArrayGet(pRes->pDataBlock, pSupp->dbNameSlotId);
    char             varDbName[TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
    memcpy(varDataVal(varDbName), dbName, dbNameLen);
    varDataSetLen(varDbName, dbNameLen);
    colDataAppend(colInfoData, 0, varDbName, false);
  }

  if (pSupp->stbNameSlotId != -1) {
    SColumnInfoData* colInfoData = taosArrayGet(pRes->pDataBlock, pSupp->stbNameSlotId);
    size_t           stbNameLen = strlen(stbName);
    if (stbNameLen != 0) {
      if (stbNameLen > TSDB_TABLE_NAME_LEN) {
        stbNameLen = TSDB_TABLE_NAME_LEN;  // never write past varStbName
      }

      char varStbName[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
      memcpy(varDataVal(varStbName), stbName, stbNameLen);
      varDataSetLen(varStbName, stbNameLen);
      colDataAppend(colInfoData, 0, varStbName, false);
    } else {
      colDataAppendNULL(colInfoData, 0);
    }
  }

  if (pSupp->tbCountSlotId != -1) {
    SColumnInfoData* colInfoData = taosArrayGet(pRes->pDataBlock, pSupp->tbCountSlotId);
    colDataAppend(colInfoData, 0, (char*)&count, false);
  }
  pRes->info.rows = 1;
}

S
slzhou 已提交
3080
// Produces table counts for the two system databases (information schema and
// performance schema).
// Grouped by db name: one row per call, first the information schema, then
// the performance schema; the third call completes the operator.
// Not grouped: a single row for the database named in pSupp->dbName, or the
// sum of both when no database name is set, after which the operator is
// completed.
// Returns pRes when it holds a row, NULL otherwise.
static SSDataBlock* buildSysDbTableCount(SOperatorInfo* pOperator, STableCountScanOperatorInfo* pInfo) {
  SSDataBlock*         pRes = pInfo->pRes;
  STableCountScanSupp* pSupp = &pInfo->supp;

  size_t numOfInfoTables;
  getInfosDbMeta(NULL, &numOfInfoTables);
  size_t numOfPerfTables;
  getPerfDbMeta(NULL, &numOfPerfTables);

  if (!pSupp->groupByDbName) {
    if (0 == strcmp(pSupp->dbName, TSDB_INFORMATION_SCHEMA_DB)) {
      fillTableCountScanDataBlock(pSupp, TSDB_INFORMATION_SCHEMA_DB, "", numOfInfoTables, pRes);
    } else if (0 == strcmp(pSupp->dbName, TSDB_PERFORMANCE_SCHEMA_DB)) {
      fillTableCountScanDataBlock(pSupp, TSDB_PERFORMANCE_SCHEMA_DB, "", numOfPerfTables, pRes);
    } else if (0 == strlen(pSupp->dbName)) {
      fillTableCountScanDataBlock(pSupp, "", "", numOfInfoTables + numOfPerfTables, pRes);
    }

    setOperatorCompleted(pOperator);
    return (pRes->info.rows > 0) ? pRes : NULL;
  }

  // grouped by database: currGrpIdx selects which system database to report
  switch (pInfo->currGrpIdx) {
    case 0:
      pRes->info.id.groupId = calcGroupId(TSDB_INFORMATION_SCHEMA_DB, strlen(TSDB_INFORMATION_SCHEMA_DB));
      fillTableCountScanDataBlock(pSupp, TSDB_INFORMATION_SCHEMA_DB, "", numOfInfoTables, pRes);
      break;
    case 1:
      pRes->info.id.groupId = calcGroupId(TSDB_PERFORMANCE_SCHEMA_DB, strlen(TSDB_PERFORMANCE_SCHEMA_DB));
      fillTableCountScanDataBlock(pSupp, TSDB_PERFORMANCE_SCHEMA_DB, "", numOfPerfTables, pRes);
      break;
    default:
      setOperatorCompleted(pOperator);
      return NULL;
  }

  pInfo->currGrpIdx++;
  return (pRes->info.rows > 0) ? pRes : NULL;
}

S
shenglian zhou 已提交
3117
// Scan callback of the table count operator: emits one row per call.
// When the read handle carries an mnode handle, the system databases are
// reported via buildSysDbTableCount(). Otherwise the vnode is queried:
//   - not grouped: child-table count of the named super table, or the total
//     table count of the vnode; the operator completes after this row;
//   - grouped by db only: total table count of the vnode, then complete;
//   - grouped by db and stable: one row per super table (child-table count),
//     then one row for normal tables, then complete.
// Returns the result block when it holds a row, NULL otherwise.
static SSDataBlock* doTableCountScan(SOperatorInfo* pOperator) {
  STableCountScanOperatorInfo* pScan = pOperator->info;
  STableCountScanSupp*         pSupp = &pScan->supp;
  SSDataBlock*                 pRes = pScan->pRes;

  // the block is reset even when the operator is already done
  blockDataCleanup(pRes);

  if (OP_EXEC_DONE == pOperator->status) {
    return NULL;
  }
  if (pScan->readHandle.mnd != NULL) {
    return buildSysDbTableCount(pOperator, pScan);
  }

  // resolve the plain database name of this vnode
  const char* db = NULL;
  int32_t     vgId = 0;
  char        dbName[TSDB_DB_NAME_LEN] = {0};
  vnodeGetInfo(pScan->readHandle.vnode, &db, &vgId);
  SName sn = {0};
  tNameFromString(&sn, db, T_NAME_ACCT | T_NAME_DB);
  tNameGetDbName(&sn, dbName);

  if (!pSupp->groupByDbName) {
    // a super table count is only produced when both names are given
    if (strlen(pSupp->dbName) != 0 && strlen(pSupp->stbName) != 0) {
      tb_uid_t      uid = metaGetTableEntryUidByName(pScan->readHandle.meta, pSupp->stbName);
      SMetaStbStats stats = {0};
      metaGetStbStats(pScan->readHandle.meta, uid, &stats);
      int64_t ctbNum = stats.ctbNum;
      fillTableCountScanDataBlock(pSupp, dbName, pSupp->stbName, ctbNum, pRes);
    } else {
      int64_t tbNumVnode = metaGetTbNum(pScan->readHandle.meta);
      fillTableCountScanDataBlock(pSupp, dbName, "", tbNumVnode, pRes);
    }
    setOperatorCompleted(pOperator);
  } else if (!pSupp->groupByStbName) {
    pRes->info.id.groupId = calcGroupId(dbName, strlen(dbName));
    int64_t dbTableCount = metaGetTbNum(pScan->readHandle.meta);
    fillTableCountScanDataBlock(pSupp, dbName, "", dbTableCount, pRes);
    setOperatorCompleted(pOperator);
  } else {
    // the super table uid list is loaded on first use
    if (NULL == pScan->stbUidList) {
      pScan->stbUidList = taosArrayInit(16, sizeof(tb_uid_t));
      if (vnodeGetStbIdList(pScan->readHandle.vnode, 0, pScan->stbUidList) < 0) {
        qError("vgId:%d, failed to get stb id list error: %s", vgId, terrstr());
      }
    }

    size_t numOfStb = taosArrayGetSize(pScan->stbUidList);
    char   fullStbName[TSDB_TABLE_FNAME_LEN] = {0};

    if (pScan->currGrpIdx < numOfStb) {
      // one group per super table
      tb_uid_t stbUid = *(tb_uid_t*)taosArrayGet(pScan->stbUidList, pScan->currGrpIdx);

      char stbName[TSDB_TABLE_NAME_LEN] = {0};
      metaGetTableSzNameByUid(pScan->readHandle.meta, stbUid, stbName);

      snprintf(fullStbName, TSDB_TABLE_FNAME_LEN, "%s.%s", dbName, stbName);
      pRes->info.id.groupId = calcGroupId(fullStbName, strlen(fullStbName));

      SMetaStbStats stats = {0};
      metaGetStbStats(pScan->readHandle.meta, stbUid, &stats);
      int64_t ctbNum = stats.ctbNum;

      fillTableCountScanDataBlock(pSupp, dbName, stbName, ctbNum, pRes);
    } else if (pScan->currGrpIdx == numOfStb) {
      // trailing group: normal tables, reported with an empty stable name
      snprintf(fullStbName, TSDB_TABLE_FNAME_LEN, "%s.%s", dbName, "");
      pRes->info.id.groupId = calcGroupId(fullStbName, strlen(fullStbName));

      int64_t ntbNum = metaGetNtbNum(pScan->readHandle.meta);
      fillTableCountScanDataBlock(pSupp, dbName, "", ntbNum, pRes);
    } else {
      setOperatorCompleted(pOperator);
      return NULL;
    }

    pScan->currGrpIdx++;
  }

  return pRes->info.rows > 0 ? pRes : NULL;
}

static void destoryTableCountScanOperator(void* param) {
S
slzhou 已提交
3210
  STableCountScanOperatorInfo* pTableCountScanInfo = param;
S
shenglian zhou 已提交
3211 3212
  blockDataDestroy(pTableCountScanInfo->pRes);

S
slzhou 已提交
3213
  nodesDestroyList(pTableCountScanInfo->groupTags);
S
slzhou 已提交
3214
  taosArrayDestroy(pTableCountScanInfo->stbUidList);
S
shenglian zhou 已提交
3215 3216
  taosMemoryFreeClear(param);
}