scanoperator.c 141.4 KB
Newer Older
H
Haojun Liao 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

16
#include "executorimpl.h"
H
Haojun Liao 已提交
17
#include "filter.h"
18
#include "function.h"
19
#include "functionMgt.h"
L
Liu Jicong 已提交
20
#include "os.h"
H
Haojun Liao 已提交
21
#include "querynodes.h"
22
#include "systable.h"
H
Haojun Liao 已提交
23
#include "tname.h"
24
#include "ttime.h"
H
Haojun Liao 已提交
25 26 27 28 29 30 31 32

#include "tdatablock.h"
#include "tmsg.h"

#include "query.h"
#include "tcompare.h"
#include "thash.h"
#include "ttypes.h"
33
#include "vnode.h"
H
Haojun Liao 已提交
34 35

// Mark a scan-info structure (anything with a scanFlag member) as running a reverse scan.
#define SET_REVERSE_SCAN_FLAG(_info) ((_info)->scanFlag = REVERSE_SCAN)
// Flip an order variable in place: TSDB_ORDER_ASC becomes TSDB_ORDER_DESC, any other value becomes TSDB_ORDER_ASC.
// (n) must be a modifiable lvalue; it is evaluated twice, so do not pass an expression with side effects.
#define SWITCH_ORDER(n)              (((n) = ((n) == TSDB_ORDER_ASC) ? TSDB_ORDER_DESC : TSDB_ORDER_ASC))
37

38
static int32_t buildSysDbTableInfo(const SSysTableScanInfo* pInfo, int32_t capacity);
39 40
static int32_t buildDbTableInfoBlock(bool sysInfo, const SSDataBlock* p, const SSysTableMeta* pSysDbTableMeta,
                                     size_t size, const char* dbName);
41

L
Liu Jicong 已提交
42
static bool processBlockWithProbability(const SSampleExecInfo* pInfo);
43

44 45 46
static int32_t sysTableUserTagsFillOneTableTags(const SSysTableScanInfo* pInfo, SMetaReader* smrSuperTable,
                                                SMetaReader* smrChildTable, const char* dbname, const char* tableName,
                                                int32_t* pNumOfRows, const SSDataBlock* dataBlock);
47 48 49

static void relocateAndFilterSysTagsScanResult(SSysTableScanInfo* pInfo, int32_t numOfRows, SSDataBlock* dataBlock);
// Decide whether the current data block should be processed when block sampling is requested.
// The sampling logic is compiled out (#if 0), so every block is accepted and pInfo is not read.
bool processBlockWithProbability(const SSampleExecInfo* pInfo) {
#if 0
  if (pInfo->sampleRatio == 1) {
    return true;
  }

  uint32_t val = taosRandR((uint32_t*) &pInfo->seed);
  return (val % ((uint32_t)(1/pInfo->sampleRatio))) == 0;
#else
  return true;
#endif
}

62
// Flip the `order` field of the first numOfOutput function contexts in pCtx (see SWITCH_ORDER).
// Does nothing when numOfOutput <= 0.
static void switchCtxOrder(SqlFunctionCtx* pCtx, int32_t numOfOutput) {
  for (int32_t i = 0; i < numOfOutput; ++i) {
    SWITCH_ORDER(pCtx[i].order);
  }
}

// Placeholder for adjusting per-table query ranges before a reverse scan.
// The whole implementation is disabled with #if 0, so this function currently does nothing.
static void setupQueryRangeForReverseScan(STableScanInfo* pTableScanInfo) {
  (void)pTableScanInfo;  // unused while the legacy implementation below stays disabled

#if 0
  // Legacy implementation, kept for reference only.
  int32_t numOfGroups = (int32_t)(GET_NUM_OF_TABLEGROUP(pRuntimeEnv));
  for (int32_t i = 0; i < numOfGroups; ++i) {
    SArray* group = GET_TABLEGROUP(pRuntimeEnv, i);
    SArray* tableKeyGroup = taosArrayGetP(pQueryAttr->tableGroupInfo.pGroupList, i);

    size_t t = taosArrayGetSize(group);
    for (int32_t j = 0; j < t; ++j) {
      STableQueryInfo* pCheckInfo = taosArrayGetP(group, j);
      updateTableQueryInfoForReverseScan(pCheckInfo);

      // update the last key in tableKeyInfo list, the tableKeyInfo is used to build the tsdbQueryHandle and decide
      // the start check timestamp of tsdbQueryHandle
      //   STableKeyInfo* pTableKeyInfo = taosArrayGet(tableKeyGroup, j);
      //   pTableKeyInfo->lastKey = pCheckInfo->lastKey;
      //   assert(pCheckInfo->pTable == pTableKeyInfo->pTable);
    }
  }
#endif
}

91 92 93 94 95 96 97 98 99
// Advance the time window *tw to the next window in the direction implied by `order`.
//
// pInterval: interval definition; reads intervalUnit, interval, sliding and precision.
// tw:        in/out window; skey and ekey are overwritten.
// order:     scan order, turned into a signed step factor by GET_FORWARD_DIRECTION_FACTOR.
//
// For every unit except 'n' and 'y' the start key moves by sliding * factor and the end key becomes
// skey + interval - 1. For 'n'/'y' the start key is broken down into local calendar time and shifted by
// `interval` months ('y' intervals are multiplied by 12 first); the end key is one precision unit before
// the start of the window that follows.
static void getNextTimeWindow(SInterval* pInterval, STimeWindow* tw, int32_t order) {
  int32_t factor = GET_FORWARD_DIRECTION_FACTOR(order);
  if (pInterval->intervalUnit != 'n' && pInterval->intervalUnit != 'y') {
    tw->skey += pInterval->sliding * factor;
    tw->ekey = tw->skey + pInterval->interval - 1;
    return;
  }

  int64_t key = tw->skey, interval = pInterval->interval;
  // convert key to second
  key = convertTimePrecision(key, pInterval->precision, TSDB_TIME_PRECISION_MILLI) / 1000;

  if (pInterval->intervalUnit == 'y') {
    interval *= 12;
  }

  struct tm tm;
  time_t    t = (time_t)key;
  taosLocalTime(&t, &tm);

  // work in "months since year 0 of struct tm" so that year and month can be shifted together
  int mon = (int)(tm.tm_year * 12 + tm.tm_mon + interval * factor);
  tm.tm_year = mon / 12;
  tm.tm_mon = mon % 12;
  tw->skey = convertTimePrecision((int64_t)taosMktime(&tm) * 1000LL, TSDB_TIME_PRECISION_MILLI, pInterval->precision);

  // the end key is derived from the start of the following window
  mon = (int)(mon + interval);
  tm.tm_year = mon / 12;
  tm.tm_mon = mon % 12;
  tw->ekey = convertTimePrecision((int64_t)taosMktime(&tm) * 1000LL, TSDB_TIME_PRECISION_MILLI, pInterval->precision);

  tw->ekey -= 1;
}

124
// Check whether the time range of a data block intersects an interval window that starts (ascending scan)
// or ends (descending scan) inside the block.
//
// pInterval:  interval definition of the upstream operator; interval == 0 means "no interval operator".
// pBlockInfo: block whose window [skey, ekey] is tested.
// order:      TSDB_ORDER_ASC walks windows forward from the block's skey, any other value walks them
//             backward from the block's ekey.
//
// Returns true when an overlap is found, false otherwise (always false when interval == 0).
static bool overlapWithTimeWindow(SInterval* pInterval, SDataBlockInfo* pBlockInfo, int32_t order) {
  STimeWindow w = {0};

  // 0 by default, which means it is not an interval operator of the upstream operator.
  if (pInterval->interval == 0) {
    return false;
  }

  if (order == TSDB_ORDER_ASC) {
    w = getAlignQueryTimeWindow(pInterval, pInterval->precision, pBlockInfo->window.skey);
    assert(w.ekey >= pBlockInfo->window.skey);

    if (TMAX(w.skey, pBlockInfo->window.skey) <= TMIN(w.ekey, pBlockInfo->window.ekey)) {
      return true;
    }

    while (1) {
      getNextTimeWindow(pInterval, &w, order);
      if (w.skey > pBlockInfo->window.ekey) {
        break;
      }

      assert(w.ekey > pBlockInfo->window.ekey);
      if (TMAX(w.skey, pBlockInfo->window.skey) <= pBlockInfo->window.ekey) {
        return true;
      }
    }
  } else {
    w = getAlignQueryTimeWindow(pInterval, pInterval->precision, pBlockInfo->window.ekey);
    assert(w.skey <= pBlockInfo->window.ekey);

    if (TMAX(w.skey, pBlockInfo->window.skey) <= TMIN(w.ekey, pBlockInfo->window.ekey)) {
      return true;
    }

    while (1) {
      getNextTimeWindow(pInterval, &w, order);
      if (w.ekey < pBlockInfo->window.skey) {
        break;
      }

      assert(w.skey < pBlockInfo->window.skey);
      if (pBlockInfo->window.skey <= TMIN(w.ekey, pBlockInfo->window.ekey)) {
        return true;
      }
    }
  }

  return false;
}

175 176 177 178 179 180 181 182 183 184 185
// This function is for the table scanner to extract temporary results of upstream aggregate results.
//
// pOperator: must be a QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN operator, otherwise NULL is returned.
// groupId:   group whose result row is looked up in pdInfo.pAggSup->pResultRowHashTable.
// pPage:     out; set to the buffer page holding the row when a row is found, left untouched otherwise.
//
// Returns the result row located at the stored offset inside *pPage, or NULL when the operator type does
// not match or no entry exists for groupId. The page is obtained with getBufPage; doDynamicPruneDataBlock
// hands it back with releaseBufPage.
static SResultRow* getTableGroupOutputBuf(SOperatorInfo* pOperator, uint64_t groupId, SFilePage** pPage) {
  if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN) {
    return NULL;
  }

  // build the hash key for this group id
  int64_t buf[2] = {0};
  SET_RES_WINDOW_KEY((char*)buf, &groupId, sizeof(groupId), groupId);

  STableScanInfo* pTableScanInfo = pOperator->info;

  SResultRowPosition* p1 = (SResultRowPosition*)tSimpleHashGet(pTableScanInfo->pdInfo.pAggSup->pResultRowHashTable, buf,
                                                               GET_RES_WINDOW_KEY_LEN(sizeof(groupId)));

  if (p1 == NULL) {
    return NULL;
  }

  *pPage = getBufPage(pTableScanInfo->pdInfo.pAggSup->pResultBuf, p1->pageId);
  return (SResultRow*)((char*)(*pPage) + p1->offset);
}

// Ask every push-down function whether, given the group's current intermediate result, the data of this
// block is still needed. *status is set to FUNC_DATA_REQUIRED_NOT_LOAD only when all of them answer
// "not load"; otherwise *status is left unchanged. Always returns TSDB_CODE_SUCCESS.
//
// Nothing is done when no push-down expression support is attached or when the group has no result row yet.
static int32_t doDynamicPruneDataBlock(SOperatorInfo* pOperator, SDataBlockInfo* pBlockInfo, uint32_t* status) {
  STableScanInfo* pScan = pOperator->info;
  SExprSupp*      pExprSup = pScan->pdInfo.pExprSup;
  if (pExprSup == NULL) {
    return TSDB_CODE_SUCCESS;
  }

  SFilePage*  pBufPage = NULL;
  SResultRow* pResRow = getTableGroupOutputBuf(pOperator, pBlockInfo->groupId, &pBufPage);
  if (pResRow == NULL) {
    return TSDB_CODE_SUCCESS;
  }

  // stop at the first function that still needs the block
  int32_t idx = 0;
  while (idx < pExprSup->numOfExprs) {
    int32_t              funcId = pExprSup->pCtx[idx].functionId;
    SResultRowEntryInfo* pEntryInfo = getResultEntryInfo(pResRow, idx, pExprSup->rowEntryInfoOffset);

    if (fmFuncDynDataRequired(funcId, pEntryInfo, &pBlockInfo->window) != FUNC_DATA_REQUIRED_NOT_LOAD) {
      break;
    }

    idx += 1;
  }

  // the loop ran to completion only if no function asked for the data
  bool allSkippable = (idx >= pExprSup->numOfExprs);

  // release buffer pages
  releaseBufPage(pScan->pdInfo.pAggSup->pResultBuf, pBufPage);

  if (allSkippable) {
    *status = FUNC_DATA_REQUIRED_NOT_LOAD;
  }

  return TSDB_CODE_SUCCESS;
}

H
Haojun Liao 已提交
236 237 238 239 240 241 242 243 244 245
// Decide from the block-level aggregates (SMA) whether a block can possibly satisfy the filter.
//
// pFilterNode: filter condition; NULL means "no filter".
// pColsAgg:    per-column aggregates of the block; NULL means "no SMA available".
// numOfCols:   number of entries in pColsAgg.
// numOfRows:   number of rows in the block.
//
// Returns true when the block must be kept (also when no decision can be made: missing inputs or the
// filter could not be built), false when filterRangeExecute reports that the block can be dropped.
static FORCE_INLINE bool doFilterByBlockSMA(const SNode* pFilterNode, SColumnDataAgg** pColsAgg, int32_t numOfCols,
                                            int32_t numOfRows) {
  if (pColsAgg == NULL || pFilterNode == NULL) {
    return true;
  }

  SFilterInfo* filter = NULL;

  // todo move to the initialization function
  int32_t code = filterInitFromNode((SNode*)pFilterNode, &filter, 0);
  if (code != TSDB_CODE_SUCCESS || filter == NULL) {
    // Without a usable filter nothing can be proven about the block, so keep it and let the row-level
    // filter decide later instead of handing an invalid filter to filterRangeExecute.
    if (filter != NULL) {
      filterFreeInfo(filter);
    }
    return true;
  }

  bool keep = filterRangeExecute(filter, pColsAgg, numOfCols, numOfRows);

  filterFreeInfo(filter);
  return keep;
}

// Load the block-level aggregates (SMA) of the current block into pBlock->pBlockAgg.
//
// pTableScanInfo: provides the data reader and the column match list.
// pBlock:         destination block; pBlockAgg is allocated here on first use (one pointer per column).
// pTaskInfo:      task whose jump environment is used to abort on error.
//
// Returns true when aggregates exist for all columns and were attached to the block, false when at least
// one column has no aggregate. Reader failures and allocation failures do not return: they leave through
// T_LONG_JMP with the corresponding error code.
static bool doLoadBlockSMA(STableScanInfo* pTableScanInfo, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo) {
  bool             allColumnsHaveAgg = true;
  SColumnDataAgg** pColAgg = NULL;

  int32_t code = tsdbRetrieveDatablockSMA(pTableScanInfo->dataReader, &pColAgg, &allColumnsHaveAgg);
  if (code != TSDB_CODE_SUCCESS) {
    T_LONG_JMP(pTaskInfo->env, code);
  }

  if (!allColumnsHaveAgg) {
    return false;
  }

  int32_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);

  // todo create this buffer during creating operator
  if (pBlock->pBlockAgg == NULL) {
    pBlock->pBlockAgg = taosMemoryCalloc(numOfCols, POINTER_BYTES);
    if (pBlock->pBlockAgg == NULL) {
      T_LONG_JMP(pTaskInfo->env, TSDB_CODE_OUT_OF_MEMORY);
    }
  }

  // copy the aggregate of every output column to its slot in the result block
  size_t numOfMatch = taosArrayGetSize(pTableScanInfo->pColMatchInfo);
  for (int32_t i = 0; i < numOfMatch; ++i) {
    SColMatchInfo* pColMatchInfo = taosArrayGet(pTableScanInfo->pColMatchInfo, i);
    if (!pColMatchInfo->output) {
      continue;
    }
    pBlock->pBlockAgg[pColMatchInfo->targetSlotId] = pColAgg[i];
  }

  return true;
}

H
Haojun Liao 已提交
287 288 289 290 291 292 293 294 295 296 297 298
// Fill the tag / pseudo columns of pBlock when the scan operator has pseudo-column expressions.
// A failure of addTagPseudoColumnData does not return: it leaves through T_LONG_JMP with that error code.
static void doSetTagColumnData(STableScanInfo* pTableScanInfo, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo) {
  SExprSupp* pPseudo = &pTableScanInfo->pseudoSup;

  // nothing to fill in
  if (pPseudo->numOfExprs <= 0) {
    return;
  }

  int32_t rc = addTagPseudoColumnData(&pTableScanInfo->readHandle, pPseudo->pExprInfo, pPseudo->numOfExprs, pBlock,
                                      GET_TASKID(pTaskInfo));
  if (rc != TSDB_CODE_SUCCESS) {
    T_LONG_JMP(pTaskInfo->env, rc);
  }
}

L
Liu Jicong 已提交
299 300
// Decide how much of the current data block has to be loaded and load it.
//
// pOperator:      the table scan operator; its info supplies the default load flag (dataBlockLoadFlag).
// pTableScanInfo: scan state (reader, filter node, cost recorder, column match list).
// pBlock:         result block; its info must already describe the current block.
// status:         out; the FUNC_DATA_REQUIRED_* decision that was finally taken.
//
// Steps, in order:
//   1. start from the operator's load flag, forced to DATA_LOAD when a filter exists or the block overlaps
//      an interval window;
//   2. FILTEROUT / NOT_LOAD return immediately (NOT_LOAD still fills the tag columns);
//   3. STATIS_LOAD tries to load the block SMA and returns on success, otherwise falls back to DATA_LOAD;
//   4. with a filter, the SMA is used to drop the block without reading it when possible;
//   5. dynamic pruning against the current intermediate results may still skip the block
//      (reported to the caller as FILTEROUT);
//   6. otherwise the column data is read, tag columns are filled and the filter is applied.
//
// Returns TSDB_CODE_SUCCESS, or terrno when tsdbRetrieveDataBlock yields no data. Helpers called from here
// may also abort through T_LONG_JMP.
static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableScanInfo, SSDataBlock* pBlock,
                             uint32_t* status) {
  SExecTaskInfo*  pTaskInfo = pOperator->pTaskInfo;
  STableScanInfo* pInfo = pOperator->info;

  SFileBlockLoadRecorder* pCost = &pTableScanInfo->readRecorder;

  pCost->totalBlocks += 1;
  pCost->totalRows += pBlock->info.rows;
  bool loadSMA = false;

  *status = pInfo->dataBlockLoadFlag;
  if (pTableScanInfo->pFilterNode != NULL ||
      overlapWithTimeWindow(&pTableScanInfo->pdInfo.interval, &pBlock->info, pTableScanInfo->cond.order)) {
    (*status) = FUNC_DATA_REQUIRED_DATA_LOAD;
  }

  SDataBlockInfo* pBlockInfo = &pBlock->info;
  taosMemoryFreeClear(pBlock->pBlockAgg);

  if (*status == FUNC_DATA_REQUIRED_FILTEROUT) {
    qDebug("%s data block filter out, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo),
           pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows);
    pCost->filterOutBlocks += 1;
    return TSDB_CODE_SUCCESS;
  } else if (*status == FUNC_DATA_REQUIRED_NOT_LOAD) {
    qDebug("%s data block skipped, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo),
           pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows);

    doSetTagColumnData(pTableScanInfo, pBlock, pTaskInfo);
    pCost->skipBlocks += 1;
    return TSDB_CODE_SUCCESS;
  } else if (*status == FUNC_DATA_REQUIRED_STATIS_LOAD) {
    pCost->loadBlockStatis += 1;
    loadSMA = true;  // mark the operation of load sma;
    bool success = doLoadBlockSMA(pTableScanInfo, pBlock, pTaskInfo);
    if (success) {
      qDebug("%s data block SMA loaded, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo),
             pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows);
      doSetTagColumnData(pTableScanInfo, pBlock, pTaskInfo);
      return TSDB_CODE_SUCCESS;
    } else {
      // failed to load the block sma data, data block statistics does not exist, load data block instead
      qDebug("%s failed to load SMA, since not all columns have SMA", GET_TASKID(pTaskInfo));
      *status = FUNC_DATA_REQUIRED_DATA_LOAD;
    }
  }

  ASSERT(*status == FUNC_DATA_REQUIRED_DATA_LOAD);

  // try to filter data block according to sma info
  if (pTableScanInfo->pFilterNode != NULL && (!loadSMA)) {
    bool success = doLoadBlockSMA(pTableScanInfo, pBlock, pTaskInfo);
    if (success) {
      size_t size = taosArrayGetSize(pBlock->pDataBlock);
      bool   keep = doFilterByBlockSMA(pTableScanInfo->pFilterNode, pBlock->pBlockAgg, (int32_t)size, pBlockInfo->rows);
      if (!keep) {
        qDebug("%s data block filter out by block SMA, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo),
               pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows);
        pCost->filterOutBlocks += 1;
        (*status) = FUNC_DATA_REQUIRED_FILTEROUT;

        return TSDB_CODE_SUCCESS;
      }
    }
  }

  // free the sma info, since it should not be involved in later computing process.
  taosMemoryFreeClear(pBlock->pBlockAgg);

  // try to filter data block according to current results
  doDynamicPruneDataBlock(pOperator, pBlockInfo, status);
  if (*status == FUNC_DATA_REQUIRED_NOT_LOAD) {
    qDebug("%s data block skipped due to dynamic prune, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo),
           pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows);
    pCost->skipBlocks += 1;

    // reported as FILTEROUT so that the caller moves on to the next block
    *status = FUNC_DATA_REQUIRED_FILTEROUT;
    return TSDB_CODE_SUCCESS;
  }

  pCost->totalCheckedRows += pBlock->info.rows;
  pCost->loadBlocks += 1;

  SArray* pCols = tsdbRetrieveDataBlock(pTableScanInfo->dataReader, NULL);
  if (pCols == NULL) {
    return terrno;
  }

  relocateColumnData(pBlock, pTableScanInfo->pColMatchInfo, pCols, true);
  doSetTagColumnData(pTableScanInfo, pBlock, pTaskInfo);

  if (pTableScanInfo->pFilterNode != NULL) {
    int64_t st = taosGetTimestampUs();
    doFilter(pTableScanInfo->pFilterNode, pBlock, pTableScanInfo->pColMatchInfo, pOperator->exprSupp.pFilterInfo);

    // elapsed filter time in milliseconds
    double el = (taosGetTimestampUs() - st) / 1000.0;
    pTableScanInfo->readRecorder.filterTime += el;

    if (pBlock->info.rows == 0) {
      pCost->filterOutBlocks += 1;
      qDebug("%s data block filter out, brange:%" PRId64 "-%" PRId64 ", rows:%d, elapsed time:%.2f ms",
             GET_TASKID(pTaskInfo), pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows, el);
    } else {
      qDebug("%s data block filter applied, elapsed time:%.2f ms", GET_TASKID(pTaskInfo), el);
    }
  }

  return TSDB_CODE_SUCCESS;
}

409
// Switch the scan state from ascending to descending order.
//
// Sets the reverse-scan flag, flips the order of the first numOfOutput function contexts, sets the query
// condition order to TSDB_ORDER_DESC and swaps the start and end keys of the query time window.
static void prepareForDescendingScan(STableScanInfo* pTableScanInfo, SqlFunctionCtx* pCtx, int32_t numOfOutput) {
  SET_REVERSE_SCAN_FLAG(pTableScanInfo);

  switchCtxOrder(pCtx, numOfOutput);
  //  setupQueryRangeForReverseScan(pTableScanInfo);

  pTableScanInfo->cond.order = TSDB_ORDER_DESC;
  STimeWindow* pTWindow = &pTableScanInfo->cond.twindows;
  TSWAP(pTWindow->skey, pTWindow->ekey);
}

H
Haojun Liao 已提交
420
int32_t addTagPseudoColumnData(SReadHandle* pHandle, SExprInfo* pPseudoExpr, int32_t numOfPseudoExpr,
L
Liu Jicong 已提交
421
                               SSDataBlock* pBlock, const char* idStr) {
422
  // currently only the tbname pseudo column
423
  if (numOfPseudoExpr == 0) {
H
Haojun Liao 已提交
424
    return TSDB_CODE_SUCCESS;
425 426 427
  }

  SMetaReader mr = {0};
428
  metaReaderInit(&mr, pHandle->meta, 0);
H
Haojun Liao 已提交
429 430
  int32_t code = metaGetTableEntryByUid(&mr, pBlock->info.uid);
  if (code != TSDB_CODE_SUCCESS) {
L
Liu Jicong 已提交
431
    qError("failed to get table meta, uid:0x%" PRIx64 ", code:%s, %s", pBlock->info.uid, tstrerror(terrno), idStr);
H
Haojun Liao 已提交
432 433 434
    metaReaderClear(&mr);
    return terrno;
  }
435

436 437
  metaReaderReleaseLock(&mr);

438 439
  for (int32_t j = 0; j < numOfPseudoExpr; ++j) {
    SExprInfo* pExpr = &pPseudoExpr[j];
440 441 442 443

    int32_t dstSlotId = pExpr->base.resSchema.slotId;

    SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, dstSlotId);
D
dapan1121 已提交
444
    colInfoDataCleanup(pColInfoData, pBlock->info.rows);
445 446 447 448 449

    int32_t functionId = pExpr->pExpr->_function.functionId;

    // this is to handle the tbname
    if (fmIsScanPseudoColumnFunc(functionId)) {
450
      setTbNameColData(pHandle->meta, pBlock, pColInfoData, functionId);
451
    } else {  // these are tags
wmmhello's avatar
wmmhello 已提交
452 453
      STagVal tagVal = {0};
      tagVal.cid = pExpr->base.pParam[0].pCol->colId;
454
      const char* p = metaGetTableTagVal(mr.me.ctbEntry.pTags, pColInfoData->info.type, &tagVal);
wmmhello's avatar
wmmhello 已提交
455

456 457 458 459
      char* data = NULL;
      if (pColInfoData->info.type != TSDB_DATA_TYPE_JSON && p != NULL) {
        data = tTagValToData((const STagVal*)p, false);
      } else {
wmmhello's avatar
wmmhello 已提交
460
        data = (char*)p;
wmmhello's avatar
wmmhello 已提交
461
      }
462

H
Haojun Liao 已提交
463 464 465
      bool isNullVal = (data == NULL) || (pColInfoData->info.type == TSDB_DATA_TYPE_JSON && tTagIsJsonNull(data));
      if (isNullVal) {
        colDataAppendNNULL(pColInfoData, 0, pBlock->info.rows);
H
Haojun Liao 已提交
466
      } else if (pColInfoData->info.type != TSDB_DATA_TYPE_JSON) {
H
Haojun Liao 已提交
467
        colDataAppendNItems(pColInfoData, 0, data, pBlock->info.rows);
H
Haojun Liao 已提交
468 469 470
        if (IS_VAR_DATA_TYPE(((const STagVal*)p)->type)) {
          taosMemoryFree(data);
        }
L
Liu Jicong 已提交
471
      } else {  // todo opt for json tag
H
Haojun Liao 已提交
472
        for (int32_t i = 0; i < pBlock->info.rows; ++i) {
H
Haojun Liao 已提交
473
          colDataAppend(pColInfoData, i, data, false);
H
Haojun Liao 已提交
474
        }
475 476 477 478 479
      }
    }
  }

  metaReaderClear(&mr);
H
Haojun Liao 已提交
480
  return TSDB_CODE_SUCCESS;
481 482
}

483 484 485 486
// Fill pColInfoData by running the scalar function identified by functionId with the block's table uid as
// its single input column.
//
// pMeta:        meta handle, passed to the scalar function through the input parameter.
// pBlock:       block providing the table uid and the number of rows.
// pColInfoData: destination column.
// functionId:   scalar function to execute; an error is logged and nothing is written when it has no
//               execution callback.
void setTbNameColData(void* pMeta, const SSDataBlock* pBlock, SColumnInfoData* pColInfoData, int32_t functionId) {
  struct SScalarFuncExecFuncs fpSet = {0};
  fmGetScalarFuncExecFuncs(functionId, &fpSet);

  // check the callback first, so that no temporary column is built when there is nothing to run
  if (fpSet.process == NULL) {
    qError("failed to get the corresponding callback function, functionId:%d", functionId);
    return;
  }

  // one-row BIGINT column carrying the table uid as the function input
  SColumnInfoData infoData = createColumnInfoData(TSDB_DATA_TYPE_BIGINT, sizeof(uint64_t), 1);
  colInfoDataEnsureCapacity(&infoData, 1);

  colDataAppendInt64(&infoData, 0, (int64_t*)&pBlock->info.uid);
  SScalarParam srcParam = {.numOfRows = pBlock->info.rows, .param = pMeta, .columnData = &infoData};

  SScalarParam param = {.columnData = pColInfoData};

  fpSet.process(&srcParam, 1, &param);

  colDataDestroy(&infoData);
}

504
// Fetch the next non-empty data block from the reader within the current scan round.
//
// For every block delivered by tsdbNextDataBlock the result block is reset, sized for the block's rows,
// assigned its group id (when the table uid is known to the task) and handed to loadDataBlock. Blocks that
// are filtered out or end up with zero rows are skipped.
//
// Returns the operator's result block (pTableScanInfo->pResBlock) when a block with data was produced, or
// NULL when the reader is exhausted. A killed task or a loadDataBlock error leaves through T_LONG_JMP.
static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) {
  STableScanInfo* pTableScanInfo = pOperator->info;
  SExecTaskInfo*  pTaskInfo = pOperator->pTaskInfo;
  SSDataBlock*    pBlock = pTableScanInfo->pResBlock;

  int64_t st = taosGetTimestampUs();

  while (tsdbNextDataBlock(pTableScanInfo->dataReader)) {
    if (isTaskKilled(pTaskInfo)) {
      T_LONG_JMP(pTaskInfo->env, TSDB_CODE_TSC_QUERY_CANCELLED);
    }

    // process this data block based on the probabilities
    bool processThisBlock = processBlockWithProbability(&pTableScanInfo->sample);
    if (!processThisBlock) {
      continue;
    }

    blockDataCleanup(pBlock);

    // start from the current block info and let the reader overwrite what it knows about the new block
    SDataBlockInfo binfo = pBlock->info;
    tsdbRetrieveDataBlockInfo(pTableScanInfo->dataReader, &binfo);

    binfo.capacity = binfo.rows;
    blockDataEnsureCapacity(pBlock, binfo.rows);
    pBlock->info = binfo;
    ASSERT(binfo.uid != 0);

    uint64_t* groupId = taosHashGet(pTaskInfo->tableqinfoList.map, &pBlock->info.uid, sizeof(int64_t));
    if (groupId) {
      pBlock->info.groupId = *groupId;
    }

    uint32_t status = 0;
    int32_t  code = loadDataBlock(pOperator, pTableScanInfo, pBlock, &status);
    if (code != TSDB_CODE_SUCCESS) {
      T_LONG_JMP(pOperator->pTaskInfo->env, code);
    }

    // current block is filter out according to filter condition, continue load the next block
    if (status == FUNC_DATA_REQUIRED_FILTEROUT || pBlock->info.rows == 0) {
      continue;
    }

    pOperator->resultInfo.totalRows = pTableScanInfo->readRecorder.totalRows;
    pTableScanInfo->readRecorder.elapsedTime += (taosGetTimestampUs() - st) / 1000.0;

    pOperator->cost.totalCost = pTableScanInfo->readRecorder.elapsedTime;

    // todo refactor
    // record the position of the block just returned in the task's stream status
    pTaskInfo->streamInfo.lastStatus.type = TMQ_OFFSET__SNAPSHOT_DATA;
    pTaskInfo->streamInfo.lastStatus.uid = pBlock->info.uid;
    pTaskInfo->streamInfo.lastStatus.ts = pBlock->info.window.ekey;

    ASSERT(pBlock->info.uid != 0);
    return pBlock;
  }
  return NULL;
}

wmmhello's avatar
wmmhello 已提交
567
// Produce the next data block for the current table group, running the configured number of ascending
// scan rounds (scanInfo.numOfAsc) followed by the descending rounds (scanInfo.numOfDesc).
//
// Between rounds the reader is reset with the current query condition and the scan flag is set to
// REPEAT_SCAN. Before the first descending round the scan state is switched with prepareForDescendingScan.
//
// Returns a block with data, or NULL when there is no reader, the operator is already done, or all rounds
// are exhausted.
static SSDataBlock* doTableScanGroup(SOperatorInfo* pOperator) {
  STableScanInfo* pTableScanInfo = pOperator->info;
  SExecTaskInfo*  pTaskInfo = pOperator->pTaskInfo;

  // The read handle is not initialized yet, since no qualified tables exists
  if (pTableScanInfo->dataReader == NULL || pOperator->status == OP_EXEC_DONE) {
    return NULL;
  }

  // do the ascending order traverse in the first place.
  while (pTableScanInfo->scanTimes < pTableScanInfo->scanInfo.numOfAsc) {
    SSDataBlock* p = doTableScanImpl(pOperator);
    if (p != NULL) {
      ASSERT(p->info.uid != 0);
      return p;
    }

    pTableScanInfo->scanTimes += 1;

    if (pTableScanInfo->scanTimes < pTableScanInfo->scanInfo.numOfAsc) {
      setTaskStatus(pTaskInfo, TASK_NOT_COMPLETED);
      pTableScanInfo->scanFlag = REPEAT_SCAN;
      qDebug("%s start to repeat ascending order scan data blocks due to query func required",
             GET_TASKID(pTaskInfo));

      // do prepare for the next round table scan operation
      tsdbReaderReset(pTableScanInfo->dataReader, &pTableScanInfo->cond);
    }
  }

  int32_t total = pTableScanInfo->scanInfo.numOfAsc + pTableScanInfo->scanInfo.numOfDesc;
  if (pTableScanInfo->scanTimes < total) {
    // switch direction only once: the condition order is already DESC on later entries
    if (pTableScanInfo->cond.order == TSDB_ORDER_ASC) {
      prepareForDescendingScan(pTableScanInfo, pOperator->exprSupp.pCtx, 0);
      tsdbReaderReset(pTableScanInfo->dataReader, &pTableScanInfo->cond);
      qDebug("%s start to descending order scan data blocks due to query func required", GET_TASKID(pTaskInfo));
    }

    while (pTableScanInfo->scanTimes < total) {
      SSDataBlock* p = doTableScanImpl(pOperator);
      if (p != NULL) {
        return p;
      }

      pTableScanInfo->scanTimes += 1;

      if (pTableScanInfo->scanTimes < total) {
        setTaskStatus(pTaskInfo, TASK_NOT_COMPLETED);
        pTableScanInfo->scanFlag = REPEAT_SCAN;

        qDebug("%s start to repeat descending order scan data blocks due to query func required",
               GET_TASKID(pTaskInfo));
        tsdbReaderReset(pTableScanInfo->dataReader, &pTableScanInfo->cond);
      }
    }
  }

  return NULL;
}

// Entry point of the table scan operator: return the next data block, or NULL when the scan is finished.
//
// Two modes are handled:
//   - TABLE_SCAN__TABLE_ORDER: tables of the task's table list are scanned one after another; when a table
//     yields no more data, the reader is pointed to the next table, reset, and the scan-round counter
//     starts again at 0.
//   - otherwise: table groups are scanned in sequence. On the first call (currentGroupId == -1) the reader
//     is closed and reopened on the first group's table list; a failure to open leaves through T_LONG_JMP.
//     When a group is exhausted the next group is tried once, and the task is marked TASK_COMPLETED when
//     no group is left or that attempt yields nothing.
static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
  STableScanInfo* pInfo = pOperator->info;
  SExecTaskInfo*  pTaskInfo = pOperator->pTaskInfo;

  // if scan table by table
  if (pInfo->scanMode == TABLE_SCAN__TABLE_ORDER) {
    if (pInfo->noTable) {
      return NULL;
    }

    int32_t numOfTables = taosArrayGetSize(pTaskInfo->tableqinfoList.pTableList);

    while (1) {
      SSDataBlock* result = doTableScanGroup(pOperator);
      if (result) {
        return result;
      }

      // if no data, switch to next table and continue scan
      pInfo->currentTable++;
      if (pInfo->currentTable >= numOfTables) {
        return NULL;
      }

      STableKeyInfo* pTableInfo = taosArrayGet(pTaskInfo->tableqinfoList.pTableList, pInfo->currentTable);
      tsdbSetTableId(pInfo->dataReader, pTableInfo->uid);
      qDebug("set uid:%" PRIu64 " into scanner, total tables:%d, index:%d %s", pTableInfo->uid, numOfTables,
             pInfo->currentTable, pTaskInfo->id.str);

      tsdbReaderReset(pInfo->dataReader, &pInfo->cond);
      pInfo->scanTimes = 0;
    }
  }

  // first call in group mode: open the reader on the first group
  if (pInfo->currentGroupId == -1) {
    pInfo->currentGroupId++;
    if (pInfo->currentGroupId >= taosArrayGetSize(pTaskInfo->tableqinfoList.pGroupList)) {
      setTaskStatus(pTaskInfo, TASK_COMPLETED);
      return NULL;
    }

    SArray* tableList = taosArrayGetP(pTaskInfo->tableqinfoList.pGroupList, pInfo->currentGroupId);

    tsdbReaderClose(pInfo->dataReader);

    int32_t code = tsdbReaderOpen(pInfo->readHandle.vnode, &pInfo->cond, tableList, (STsdbReader**)&pInfo->dataReader,
                                  GET_TASKID(pTaskInfo));
    if (code != TSDB_CODE_SUCCESS) {
      T_LONG_JMP(pTaskInfo->env, code);
    }
  }

  SSDataBlock* result = doTableScanGroup(pOperator);
  if (result) {
    return result;
  }

  pInfo->currentGroupId++;
  if (pInfo->currentGroupId >= taosArrayGetSize(pTaskInfo->tableqinfoList.pGroupList)) {
    setTaskStatus(pTaskInfo, TASK_COMPLETED);
    return NULL;
  }

  // NOTE(review): the reader is only reset here; the table list of the new group is not handed to it
  // (the former tsdbSetTableList(pInfo->dataReader, tableList) call is disabled) - confirm this is intended.
  tsdbReaderReset(pInfo->dataReader, &pInfo->cond);
  pInfo->scanTimes = 0;

  result = doTableScanGroup(pOperator);
  if (result) {
    return result;
  }

  setTaskStatus(pTaskInfo, TASK_COMPLETED);
  return NULL;
}

708 709
static int32_t getTableScannerExecInfo(struct SOperatorInfo* pOptr, void** pOptrExplain, uint32_t* len) {
  SFileBlockLoadRecorder* pRecorder = taosMemoryCalloc(1, sizeof(SFileBlockLoadRecorder));
710
  STableScanInfo*         pTableScanInfo = pOptr->info;
711 712 713 714 715 716
  *pRecorder = pTableScanInfo->readRecorder;
  *pOptrExplain = pRecorder;
  *len = sizeof(SFileBlockLoadRecorder);
  return 0;
}

717
static void destroyTableScanOperatorInfo(void* param) {
718
  STableScanInfo* pTableScanInfo = (STableScanInfo*)param;
H
Haojun Liao 已提交
719
  blockDataDestroy(pTableScanInfo->pResBlock);
720
  cleanupQueryTableDataCond(&pTableScanInfo->cond);
H
Haojun Liao 已提交
721

H
refact  
Hongze Cheng 已提交
722
  tsdbReaderClose(pTableScanInfo->dataReader);
723
  pTableScanInfo->dataReader = NULL;
724 725 726 727

  if (pTableScanInfo->pColMatchInfo != NULL) {
    taosArrayDestroy(pTableScanInfo->pColMatchInfo);
  }
L
Liu Jicong 已提交
728

729
  cleanupExprSupp(&pTableScanInfo->pseudoSup);
D
dapan1121 已提交
730
  taosMemoryFreeClear(param);
731 732
}

wmmhello's avatar
wmmhello 已提交
733
SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, SReadHandle* readHandle,
734
                                           SExecTaskInfo* pTaskInfo) {
H
Haojun Liao 已提交
735 736 737
  STableScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STableScanInfo));
  SOperatorInfo*  pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
  if (pInfo == NULL || pOperator == NULL) {
738
    goto _error;
H
Haojun Liao 已提交
739 740
  }

741
  SDataBlockDescNode* pDescNode = pTableScanNode->scan.node.pOutputDataBlockDesc;
742
  int32_t             numOfCols = 0;
L
Liu Jicong 已提交
743 744
  pInfo->pColMatchInfo =
      extractColMatchInfo(pTableScanNode->scan.pScanCols, pDescNode, &numOfCols, COL_MATCH_FROM_COL_ID);
L
Liu Jicong 已提交
745

746 747
  int32_t code = initQueryTableDataCond(&pInfo->cond, pTableScanNode);
  if (code != TSDB_CODE_SUCCESS) {
748
    goto _error;
749 750 751
  }

  if (pTableScanNode->scan.pScanPseudoCols != NULL) {
752
    SExprSupp* pSup = &pInfo->pseudoSup;
753 754
    pSup->pExprInfo = createExprInfo(pTableScanNode->scan.pScanPseudoCols, NULL, &pSup->numOfExprs);
    pSup->pCtx = createSqlFunctionCtx(pSup->pExprInfo, pSup->numOfExprs, &pSup->rowEntryInfoOffset);
755 756
  }

757
  pInfo->scanInfo = (SScanInfo){.numOfAsc = pTableScanNode->scanSeq[0], .numOfDesc = pTableScanNode->scanSeq[1]};
758
  pInfo->pdInfo.interval = extractIntervalInfo(pTableScanNode);
759 760 761
  pInfo->readHandle = *readHandle;
  pInfo->sample.sampleRatio = pTableScanNode->ratio;
  pInfo->sample.seed = taosGetTimestampSec();
762

763
  pInfo->dataBlockLoadFlag = pTableScanNode->dataRequired;
764 765
  pInfo->pResBlock = createResDataBlock(pDescNode);
  pInfo->pFilterNode = pTableScanNode->scan.node.pConditions;
H
Haojun Liao 已提交
766 767 768 769 770

  if (pInfo->pFilterNode != NULL) {
    code = filterInitFromNode((SNode*)pInfo->pFilterNode, &pOperator->exprSupp.pFilterInfo, 0);
  }

771
  pInfo->scanFlag = MAIN_SCAN;
wmmhello's avatar
wmmhello 已提交
772
  pInfo->currentGroupId = -1;
773
  pInfo->assignBlockUid = pTableScanNode->assignBlockUid;
774 775

  pOperator->name = "TableScanOperator";  // for debug purpose
L
Liu Jicong 已提交
776
  pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN;
777 778 779
  pOperator->blocking = false;
  pOperator->status = OP_NOT_OPENED;
  pOperator->info = pInfo;
780
  pOperator->exprSupp.numOfExprs = numOfCols;
781
  pOperator->pTaskInfo = pTaskInfo;
782

783 784
  pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doTableScan, NULL, NULL, destroyTableScanOperatorInfo,
                                         NULL, NULL, getTableScannerExecInfo);
785 786 787

  // for non-blocking operator, the open cost is always 0
  pOperator->cost.openCost = 0;
H
Haojun Liao 已提交
788
  return pOperator;
789

790
_error:
791 792 793 794 795
  taosMemoryFreeClear(pInfo);
  taosMemoryFreeClear(pOperator);

  pTaskInfo->code = TSDB_CODE_QRY_OUT_OF_MEMORY;
  return NULL;
H
Haojun Liao 已提交
796 797
}

798
/**
 * Build a sequential table-scan operator around an already opened reader.
 *
 * @param pReadHandle  reader handle stored as the operator's data reader
 * @param pTaskInfo    owning task; receives the error code on failure
 * @return the new operator, or NULL if allocation fails
 */
SOperatorInfo* createTableSeqScanOperatorInfo(void* pReadHandle, SExecTaskInfo* pTaskInfo) {
  STableScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STableScanInfo));
  SOperatorInfo*  pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
  if (pInfo == NULL || pOperator == NULL) {
    // either allocation may have succeeded; release both before bailing out
    taosMemoryFreeClear(pInfo);
    taosMemoryFreeClear(pOperator);
    pTaskInfo->code = TSDB_CODE_QRY_OUT_OF_MEMORY;
    return NULL;
  }

  pInfo->dataReader = pReadHandle;

  pOperator->name = "TableSeqScanOperator";
  pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_TABLE_SEQ_SCAN;
  pOperator->blocking = false;
  pOperator->status = OP_NOT_OPENED;
  pOperator->info = pInfo;
  pOperator->pTaskInfo = pTaskInfo;

  pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doTableScanImpl, NULL, NULL, NULL, NULL, NULL, NULL);
  return pOperator;
}

H
Haojun Liao 已提交
816 817
static int32_t doGetTableRowSize(void* pMeta, uint64_t uid, int32_t* rowLen, const char* idstr) {
  *rowLen = 0;
H
Haojun Liao 已提交
818

819
  SMetaReader mr = {0};
820
  metaReaderInit(&mr, pMeta, 0);
H
Haojun Liao 已提交
821 822
  int32_t code = metaGetTableEntryByUid(&mr, uid);
  if (code != TSDB_CODE_SUCCESS) {
L
Liu Jicong 已提交
823
    qError("failed to get table meta, uid:0x%" PRIx64 ", code:%s, %s", uid, tstrerror(terrno), idstr);
H
Haojun Liao 已提交
824 825 826 827
    metaReaderClear(&mr);
    return terrno;
  }

828 829
  if (mr.me.type == TSDB_SUPER_TABLE) {
    int32_t numOfCols = mr.me.stbEntry.schemaRow.nCols;
830
    for (int32_t i = 0; i < numOfCols; ++i) {
H
Haojun Liao 已提交
831
      (*rowLen) += mr.me.stbEntry.schemaRow.pSchema[i].bytes;
832 833 834
    }
  } else if (mr.me.type == TSDB_CHILD_TABLE) {
    uint64_t suid = mr.me.ctbEntry.suid;
835
    tDecoderClear(&mr.coder);
H
Haojun Liao 已提交
836 837
    code = metaGetTableEntryByUid(&mr, suid);
    if (code != TSDB_CODE_SUCCESS) {
L
Liu Jicong 已提交
838
      qError("failed to get table meta, uid:0x%" PRIx64 ", code:%s, %s", suid, tstrerror(terrno), idstr);
H
Haojun Liao 已提交
839 840 841 842
      metaReaderClear(&mr);
      return terrno;
    }

843 844
    int32_t numOfCols = mr.me.stbEntry.schemaRow.nCols;

845
    for (int32_t i = 0; i < numOfCols; ++i) {
H
Haojun Liao 已提交
846
      (*rowLen) += mr.me.stbEntry.schemaRow.pSchema[i].bytes;
847 848 849
    }
  } else if (mr.me.type == TSDB_NORMAL_TABLE) {
    int32_t numOfCols = mr.me.ntbEntry.schemaRow.nCols;
850
    for (int32_t i = 0; i < numOfCols; ++i) {
H
Haojun Liao 已提交
851
      (*rowLen) += mr.me.ntbEntry.schemaRow.pSchema[i].bytes;
852 853 854 855
    }
  }

  metaReaderClear(&mr);
H
Haojun Liao 已提交
856
  return TSDB_CODE_SUCCESS;
857 858 859 860 861 862 863 864
}

/**
 * Produce the single-row result of a block-distribution scan.
 *
 * Collects the row size, file-block distribution and in-memory row count of
 * the table, serializes them into one var-length value and appends it as row 0
 * of the operator's result block. The operator is marked done afterwards, so a
 * second call returns NULL.
 *
 * Failures are reported by long-jumping to the task's env with the error code.
 */
static SSDataBlock* doBlockInfoScan(SOperatorInfo* pOperator) {
  if (pOperator->status == OP_EXEC_DONE) {
    return NULL;
  }

  SBlockDistInfo* pBlockScanInfo = pOperator->info;
  SExecTaskInfo*  pTaskInfo = pOperator->pTaskInfo;

  STableBlockDistInfo blockDistInfo = {.minRows = INT_MAX, .maxRows = INT_MIN};
  int32_t code = doGetTableRowSize(pBlockScanInfo->readHandle.meta, pBlockScanInfo->uid, &blockDistInfo.rowSize,
                                   GET_TASKID(pTaskInfo));
  if (code != TSDB_CODE_SUCCESS) {
    T_LONG_JMP(pTaskInfo->env, code);
  }

  tsdbGetFileBlocksDistInfo(pBlockScanInfo->pHandle, &blockDistInfo);
  blockDistInfo.numOfInmemRows = (int32_t)tsdbGetNumOfRowsInMemTable(pBlockScanInfo->pHandle);

  SSDataBlock* pBlock = pBlockScanInfo->pResBlock;

  int32_t          slotId = pOperator->exprSupp.pExprInfo->base.resSchema.slotId;
  SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, slotId);

  // first call only measures the serialized size, second call fills the buffer
  int32_t len = tSerializeBlockDistInfo(NULL, 0, &blockDistInfo);
  char*   p = taosMemoryCalloc(1, len + VARSTR_HEADER_SIZE);
  if (p == NULL) {
    T_LONG_JMP(pTaskInfo->env, TSDB_CODE_OUT_OF_MEMORY);
  }

  tSerializeBlockDistInfo(varDataVal(p), len, &blockDistInfo);
  varDataSetLen(p, len);

  code = blockDataEnsureCapacity(pBlock, 1);
  if (code != TSDB_CODE_SUCCESS) {
    // release the temporary buffer before leaving through the long jump
    taosMemoryFree(p);
    T_LONG_JMP(pTaskInfo->env, code);
  }

  colDataAppend(pColInfo, 0, p, false);
  taosMemoryFree(p);

  pBlock->info.rows = 1;

  pOperator->status = OP_EXEC_DONE;
  return pBlock;
}

897
static void destroyBlockDistScanOperatorInfo(void* param) {
898
  SBlockDistInfo* pDistInfo = (SBlockDistInfo*)param;
899
  blockDataDestroy(pDistInfo->pResBlock);
H
Hongze Cheng 已提交
900
  tsdbReaderClose(pDistInfo->pHandle);
D
dapan1121 已提交
901
  taosMemoryFreeClear(param);
902 903
}

904 905
/**
 * Build the operator that reports block-distribution information of a table.
 *
 * @param dataReader      reader handle stored in the operator info
 * @param readHandle      read handle; copied by value into the operator info
 * @param uid             uid of the table whose distribution is reported
 * @param pBlockScanNode  physical plan node (output block desc, pseudo columns)
 * @param pTaskInfo       owning task; receives the error code on failure
 * @return the new operator, or NULL on failure (pTaskInfo->code is set)
 */
SOperatorInfo* createDataBlockInfoScanOperator(void* dataReader, SReadHandle* readHandle, uint64_t uid,
                                               SBlockDistScanPhysiNode* pBlockScanNode, SExecTaskInfo* pTaskInfo) {
  SBlockDistInfo* pInfo = taosMemoryCalloc(1, sizeof(SBlockDistInfo));
  SOperatorInfo*  pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
  if (pInfo == NULL || pOperator == NULL) {
    pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY;
    goto _error;
  }

  pInfo->pHandle = dataReader;
  pInfo->readHandle = *readHandle;
  pInfo->uid = uid;
  pInfo->pResBlock = createResDataBlock(pBlockScanNode->node.pOutputDataBlockDesc);

  int32_t    numOfCols = 0;
  SExprInfo* pExprInfo = createExprInfo(pBlockScanNode->pScanPseudoCols, NULL, &numOfCols);
  int32_t    code = initExprSupp(&pOperator->exprSupp, pExprInfo, numOfCols);
  if (code != TSDB_CODE_SUCCESS) {
    // surface the failure to the task instead of returning NULL with no code
    pTaskInfo->code = code;
    goto _error;
  }

  pOperator->name = "DataBlockDistScanOperator";
  pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN;
  pOperator->blocking = false;
  pOperator->status = OP_NOT_OPENED;
  pOperator->info = pInfo;
  pOperator->pTaskInfo = pTaskInfo;

  pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doBlockInfoScan, NULL, NULL,
                                         destroyBlockDistScanOperatorInfo, NULL, NULL, NULL);
  return pOperator;

_error:
  // The result block is owned by pInfo and would leak if only pInfo were freed.
  // The reader passed in by the caller is intentionally left untouched here.
  if (pInfo != NULL && pInfo->pResBlock != NULL) {
    blockDataDestroy(pInfo->pResBlock);
  }
  taosMemoryFreeClear(pInfo);
  taosMemoryFreeClear(pOperator);
  return NULL;
}

942
/* Drop all buffered blocks of a stream scan and rewind the read cursor. */
static FORCE_INLINE void doClearBufferedBlocks(SStreamScanInfo* pInfo) {
  pInfo->validBlockIndex = 0;
  taosArrayClear(pInfo->pBlockLists);
}

947
/* True when the parent operator of this stream scan is a stream session window. */
static bool isSessionWindow(SStreamScanInfo* pInfo) {
  const bool parentIsSession = (pInfo->windowSup.parentType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION);
  return parentIsSession;
}

951
/* True when the parent operator of this stream scan is a stream state window. */
static bool isStateWindow(SStreamScanInfo* pInfo) {
  const bool parentIsState = (pInfo->windowSup.parentType == QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE);
  return parentIsState;
}
5
54liuyao 已提交
954

L
Liu Jicong 已提交
955
/*
 * True when the parent operator is any of the stream interval variants
 * (plain, semi or final interval).
 */
static bool isIntervalWindow(SStreamScanInfo* pInfo) {
  if (pInfo->windowSup.parentType == QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL) {
    return true;
  }
  if (pInfo->windowSup.parentType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL) {
    return true;
  }
  return pInfo->windowSup.parentType == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL;
}

/* True only for the plain stream interval parent (not the semi/final variants). */
static bool isSignleIntervalWindow(SStreamScanInfo* pInfo) {
  const bool parentIsPlainInterval = (pInfo->windowSup.parentType == QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL);
  return parentIsPlainInterval;
}

965 966 967 968
/* True for an interval-window parent whose interval length differs from its sliding step. */
static bool isSlidingWindow(SStreamScanInfo* pInfo) {
  if (!isIntervalWindow(pInfo)) {
    return false;
  }
  return pInfo->interval.interval != pInfo->interval.sliding;
}

969
/*
 * Load pInfo->groupId from one cell of a block.
 *
 * The column at groupColIndex is read as an array of uint64_t and the value at
 * rowIndex (which must be below the block's row count) becomes the current
 * group id of the stream scan.
 */
static void setGroupId(SStreamScanInfo* pInfo, SSDataBlock* pBlock, int32_t groupColIndex, int32_t rowIndex) {
  SColumnInfoData* pGroupCol = taosArrayGet(pBlock->pDataBlock, groupColIndex);
  uint64_t*        groupIds = (uint64_t*)pGroupCol->pData;

  ASSERT(rowIndex < pBlock->info.rows);
  pInfo->groupId = groupIds[rowIndex];
}

L
Liu Jicong 已提交
976
/*
 * Rewind a table scan so it restarts from before the first group with a new
 * time window: scan counter back to 0, current group to -1, query window to *pWin.
 */
void resetTableScanInfo(STableScanInfo* pTableScanInfo, STimeWindow* pWin) {
  pTableScanInfo->currentGroupId = -1;
  pTableScanInfo->scanTimes = 0;
  pTableScanInfo->cond.twindows = *pWin;
}

L
Liu Jicong 已提交
982
/* Element destructor callback: destroys the array passed as an untyped pointer. */
static void freeArray(void* array) {
  taosArrayDestroy(array);
}
983 984 985 986 987 988 989 990 991 992 993 994 995

/*
 * Put a table-scan operator back into its "scan everything" state:
 * no version bounds, a single group made of the task's full table list,
 * and an unbounded time window.
 */
static void resetTableScanOperator(SOperatorInfo* pTableScanOp) {
  STableScanInfo* pScan = pTableScanOp->info;
  pScan->cond.startVersion = -1;
  pScan->cond.endVersion = -1;

  SArray* pGroups = pTableScanOp->pTaskInfo->tableqinfoList.pGroupList;
  SArray* pAllTables = pTableScanOp->pTaskInfo->tableqinfoList.pTableList;

  // destroy the per-group arrays currently held, then register the full list as the only group
  taosArrayClearP(pGroups, freeArray);
  taosArrayPush(pGroups, &pAllTables);

  STimeWindow unbounded = {.skey = INT64_MIN, .ekey = INT64_MAX};
  resetTableScanInfo(pTableScanOp->info, &unbounded);
}

L
Liu Jicong 已提交
996 997
/*
 * Scan one table over [startTs, endTs] with the end version capped at maxVersion.
 *
 * The task's group list is temporarily replaced by a single group holding only
 * tbUid; after the scan the operator is restored through resetTableScanOperator.
 *
 * @return the block produced by doTableScan (may be NULL)
 */
static SSDataBlock* readPreVersionData(SOperatorInfo* pTableScanOp, uint64_t tbUid, TSKEY startTs, TSKEY endTs,
                                       int64_t maxVersion) {
  SArray* pGroups = pTableScanOp->pTaskInfo->tableqinfoList.pGroupList;
  taosArrayClear(pGroups);

  // one group containing just the requested table
  STableKeyInfo key = {.uid = tbUid, .groupId = 0};
  SArray*       pSingle = taosArrayInit(1, sizeof(STableKeyInfo));
  taosArrayPush(pSingle, &key);
  taosArrayPush(pGroups, &pSingle);

  STimeWindow     range = {.skey = startTs, .ekey = endTs};
  STableScanInfo* pScan = pTableScanOp->info;
  pScan->cond.startVersion = -1;
  pScan->cond.endVersion = maxVersion;
  resetTableScanInfo(pTableScanOp->info, &range);

  SSDataBlock* pScanned = doTableScan(pTableScanOp);
  resetTableScanOperator(pTableScanOp);
  return pScanned;
}

/*
 * Derive the group id of a row by re-reading it (table uid, timestamp ts, up to
 * maxVersion) and evaluating the partition expressions on the row found.
 * Returns 0 when no row is found.
 */
static uint64_t getGroupIdByCol(SStreamScanInfo* pInfo, uint64_t uid, TSKEY ts, int64_t maxVersion) {
  SSDataBlock* pPrev = readPreVersionData(pInfo->pTableScanOp, uid, ts, ts, maxVersion);
  if (pPrev != NULL && pPrev->info.rows != 0) {
    ASSERT(pPrev->info.rows == 1);
    return calGroupIdByData(&pInfo->partitionSup, pInfo->pPartScalarSup, pPrev, 0);
  }
  return 0;
}

5
54liuyao 已提交
1024
/* Look up the group id registered for a table uid in the task's table map; 0 when absent. */
static uint64_t getGroupIdByUid(SStreamScanInfo* pInfo, uint64_t uid) {
  SHashObj* pUidMap = pInfo->pTableScanOp->pTaskInfo->tableqinfoList.map;
  uint64_t* pFound = taosHashGet(pUidMap, &uid, sizeof(int64_t));
  return (pFound != NULL) ? *pFound : 0;
}

5
54liuyao 已提交
1033 1034 1035 1036 1037 1038 1039 1040
/*
 * Resolve the group id of a row: computed from row data when the partition
 * needs calculation, otherwise taken from the uid lookup.
 */
static uint64_t getGroupIdByData(SStreamScanInfo* pInfo, uint64_t uid, TSKEY ts, int64_t maxVersion) {
  return pInfo->partitionSup.needCalc ? getGroupIdByCol(pInfo, uid, ts, maxVersion) : getGroupIdByUid(pInfo, uid);
}

L
Liu Jicong 已提交
1041 1042 1043 1044 1045 1046 1047 1048 1049 1050 1051
/*
 * Arm the table-scan operator for the next range described in pBlock.
 *
 * Starting at *pRowIndex, the row's [start, end] window and group id are taken,
 * then following rows of the same group are merged in while they share the
 * window's start key (extending the end) or end at the window's start key
 * (extending the start). *pRowIndex is advanced past every consumed row.
 *
 * @return false when *pRowIndex already equals the block's row count, true
 *         after the table scan has been reset to the merged window
 */
static bool prepareRangeScan(SStreamScanInfo* pInfo, SSDataBlock* pBlock, int32_t* pRowIndex) {
  if ((*pRowIndex) == pBlock->info.rows) {
    return false;
  }

  ASSERT(taosArrayGetSize(pBlock->pDataBlock) >= 3);
  SColumnInfoData* pStartTsCol = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX);
  SColumnInfoData* pEndTsCol = taosArrayGet(pBlock->pDataBlock, END_TS_COLUMN_INDEX);
  TSKEY*           startData = (TSKEY*)pStartTsCol->pData;
  TSKEY*           endData = (TSKEY*)pEndTsCol->pData;
  STimeWindow      win = {.skey = startData[*pRowIndex], .ekey = endData[*pRowIndex]};

  SColumnInfoData* pGpCol = taosArrayGet(pBlock->pDataBlock, GROUPID_COLUMN_INDEX);
  uint64_t*        gpData = (uint64_t*)pGpCol->pData;
  uint64_t         groupId = gpData[*pRowIndex];

  SColumnInfoData* pCalStartTsCol = taosArrayGet(pBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX);
  SColumnInfoData* pCalEndTsCol = taosArrayGet(pBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX);
  TSKEY*           calStartData = (TSKEY*)pCalStartTsCol->pData;
  TSKEY*           calEndData = (TSKEY*)pCalEndTsCol->pData;

  setGroupId(pInfo, pBlock, GROUPID_COLUMN_INDEX, *pRowIndex);
  if (isSlidingWindow(pInfo)) {
    // sliding windows carry their own calculation range in the block
    pInfo->updateWin.skey = calStartData[*pRowIndex];
    pInfo->updateWin.ekey = calEndData[*pRowIndex];
  }
  (*pRowIndex)++;

  while (*pRowIndex < pBlock->info.rows) {
    const int32_t cur = *pRowIndex;
    const bool    sameGroup = (groupId == gpData[cur]);

    if (win.skey == startData[cur] && sameGroup) {
      win.ekey = TMAX(win.ekey, endData[cur]);
    } else if (win.skey == endData[cur] && sameGroup) {
      win.skey = TMIN(win.skey, startData[cur]);
    } else {
      ASSERT(!(win.skey > startData[*pRowIndex] && win.ekey < endData[*pRowIndex]) ||
             !(isInTimeWindow(&win, startData[*pRowIndex], 0) || isInTimeWindow(&win, endData[*pRowIndex], 0)));
      break;
    }
    (*pRowIndex)++;
  }

  resetTableScanInfo(pInfo->pTableScanOp->info, &win);
  pInfo->pTableScanOp->status = OP_OPENED;
  return true;
}

5
54liuyao 已提交
1087
static STimeWindow getSlidingWindow(TSKEY* startTsCol, TSKEY* endTsCol, uint64_t* gpIdCol, SInterval* pInterval,
1088
                                    SDataBlockInfo* pDataBlockInfo, int32_t* pRowIndex, bool hasGroup) {
H
Haojun Liao 已提交
1089
  SResultRowInfo dumyInfo = {0};
5
54liuyao 已提交
1090
  dumyInfo.cur.pageId = -1;
1091
  STimeWindow win = getActiveTimeWindow(NULL, &dumyInfo, startTsCol[*pRowIndex], pInterval, TSDB_ORDER_ASC);
5
54liuyao 已提交
1092 1093
  STimeWindow endWin = win;
  STimeWindow preWin = win;
5
54liuyao 已提交
1094
  uint64_t    groupId = gpIdCol[*pRowIndex];
H
Haojun Liao 已提交
1095

5
54liuyao 已提交
1096
  while (1) {
1097 1098 1099
    if (hasGroup) {
      (*pRowIndex) += 1;
    } else {
5
54liuyao 已提交
1100 1101 1102 1103 1104 1105
      while ((groupId == gpIdCol[(*pRowIndex)] && startTsCol[*pRowIndex] < endWin.ekey)) {
        (*pRowIndex) += 1;
        if ((*pRowIndex) == pDataBlockInfo->rows) {
          break;
        }
      }
1106
    }
5
54liuyao 已提交
1107

5
54liuyao 已提交
1108 1109 1110
    do {
      preWin = endWin;
      getNextTimeWindow(pInterval, &endWin, TSDB_ORDER_ASC);
1111
    } while (endTsCol[(*pRowIndex) - 1] >= endWin.skey);
5
54liuyao 已提交
1112
    endWin = preWin;
5
54liuyao 已提交
1113
    if (win.ekey == endWin.ekey || (*pRowIndex) == pDataBlockInfo->rows || groupId != gpIdCol[*pRowIndex]) {
5
54liuyao 已提交
1114 1115 1116 1117 1118 1119
      win.ekey = endWin.ekey;
      return win;
    }
    win.ekey = endWin.ekey;
  }
}
5
54liuyao 已提交
1120

L
Liu Jicong 已提交
1121 1122 1123 1124 1125 1126 1127 1128 1129 1130 1131
/*
 * Return the next non-empty block of the current range scan that belongs to
 * pInfo->groupId.
 *
 * When the table scan runs dry, the next range is prepared from pSDB and the
 * scan retried. Once no range is left, pSDB is cleaned up, *pRowIndex and the
 * update window are reset, the table reader is closed, and NULL is returned.
 * Each returned block has its calWin set to pInfo->updateWin.
 */
static SSDataBlock* doRangeScan(SStreamScanInfo* pInfo, SSDataBlock* pSDB, int32_t tsColIndex, int32_t* pRowIndex) {
  for (;;) {
    SSDataBlock* pResult = doTableScan(pInfo->pTableScanOp);
    if (!pResult && prepareRangeScan(pInfo, pSDB, pRowIndex)) {
      // scan next window data
      pResult = doTableScan(pInfo->pTableScanOp);
    }

    if (pResult == NULL) {
      // all ranges consumed: reset the range state and release the reader
      blockDataCleanup(pSDB);
      *pRowIndex = 0;
      pInfo->updateWin = (STimeWindow){.skey = INT64_MIN, .ekey = INT64_MAX};

      STableScanInfo* pTableScanInfo = pInfo->pTableScanOp->info;
      tsdbReaderClose(pTableScanInfo->dataReader);
      pTableScanInfo->dataReader = NULL;
      return NULL;
    }

    doFilter(pInfo->pCondition, pResult, NULL, NULL);
    if (pResult->info.rows == 0) {
      continue;
    }

    if (!pInfo->partitionSup.needCalc) {
      if (pResult->info.groupId == pInfo->groupId) {
        pResult->info.calWin = pInfo->updateWin;
        return pResult;
      }
      continue;
    }

    // group id is computed per row: rebuild pResult from the rows of the wanted group only
    SSDataBlock* pCopy = createOneDataBlock(pResult, true);
    blockDataCleanup(pResult);
    for (int32_t row = 0; row < pCopy->info.rows; row++) {
      if (calGroupIdByData(&pInfo->partitionSup, pInfo->pPartScalarSup, pCopy, row) != pInfo->groupId) {
        continue;
      }

      for (int32_t col = 0; col < pInfo->pTableScanOp->exprSupp.numOfExprs; col++) {
        SColumnInfoData* pSrcCol = taosArrayGet(pCopy->pDataBlock, col);
        SColumnInfoData* pDestCol = taosArrayGet(pResult->pDataBlock, col);
        bool             isNull = colDataIsNull(pSrcCol, pCopy->info.rows, row, NULL);
        char*            pSrcData = colDataGetData(pSrcCol, row);
        colDataAppend(pDestCol, pResult->info.rows, pSrcData, isNull);
      }
      pResult->info.rows++;
    }

    blockDataDestroy(pCopy);

    if (pResult->info.rows > 0) {
      pResult->info.calWin = pInfo->updateWin;
      return pResult;
    }
  }
}
1172

1173 1174 1175
/*
 * Translate the rows of pSrcBlock into session-window scan ranges in pDestBlock.
 *
 * For each source row the session window containing its start timestamp and the
 * one containing its end timestamp are looked up; the destination row spans
 * from the first window's start key to the second window's end key. Source
 * rows whose start window is invalid (already closed) are skipped.
 *
 * Destination rows are written densely at pDestBlock->info.rows, so a skipped
 * source row does not leave an unwritten gap or push later rows past the
 * reported row count.
 *
 * @return TSDB_CODE_SUCCESS, or the error from blockDataEnsureCapacity
 */
static int32_t generateSessionScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSrcBlock, SSDataBlock* pDestBlock) {
  if (pSrcBlock->info.rows == 0) {
    return TSDB_CODE_SUCCESS;
  }
  blockDataCleanup(pDestBlock);
  int32_t code = blockDataEnsureCapacity(pDestBlock, pSrcBlock->info.rows);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }
  ASSERT(taosArrayGetSize(pSrcBlock->pDataBlock) >= 3);
  SColumnInfoData* pStartTsCol = taosArrayGet(pSrcBlock->pDataBlock, START_TS_COLUMN_INDEX);
  TSKEY*           startData = (TSKEY*)pStartTsCol->pData;
  SColumnInfoData* pEndTsCol = taosArrayGet(pSrcBlock->pDataBlock, END_TS_COLUMN_INDEX);
  TSKEY*           endData = (TSKEY*)pEndTsCol->pData;
  SColumnInfoData* pUidCol = taosArrayGet(pSrcBlock->pDataBlock, UID_COLUMN_INDEX);
  uint64_t*        uidCol = (uint64_t*)pUidCol->pData;

  SColumnInfoData* pDestStartCol = taosArrayGet(pDestBlock->pDataBlock, START_TS_COLUMN_INDEX);
  SColumnInfoData* pDestEndCol = taosArrayGet(pDestBlock->pDataBlock, END_TS_COLUMN_INDEX);
  SColumnInfoData* pDestUidCol = taosArrayGet(pDestBlock->pDataBlock, UID_COLUMN_INDEX);
  SColumnInfoData* pDestGpCol = taosArrayGet(pDestBlock->pDataBlock, GROUPID_COLUMN_INDEX);
  SColumnInfoData* pDestCalStartTsCol = taosArrayGet(pDestBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX);
  SColumnInfoData* pDestCalEndTsCol = taosArrayGet(pDestBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX);
  int64_t          version = pSrcBlock->info.version - 1;
  for (int32_t i = 0; i < pSrcBlock->info.rows; i++) {
    uint64_t groupId = getGroupIdByData(pInfo, uidCol[i], startData[i], version);
    // gap must be 0.
    SSessionKey startWin = {0};
    getCurSessionWindow(pInfo->windowSup.pStreamAggSup, startData[i], endData[i], groupId, &startWin);
    if (IS_INVALID_SESSION_WIN_KEY(startWin)) {
      // window has been closed.
      continue;
    }
    SSessionKey endWin = {0};
    getCurSessionWindow(pInfo->windowSup.pStreamAggSup, endData[i], endData[i], groupId, &endWin);
    ASSERT(!IS_INVALID_SESSION_WIN_KEY(endWin));

    // write at the next free destination row, not at the source index
    int32_t destRow = pDestBlock->info.rows;
    colDataAppend(pDestStartCol, destRow, (const char*)&startWin.win.skey, false);
    colDataAppend(pDestEndCol, destRow, (const char*)&endWin.win.ekey, false);

    colDataAppendNULL(pDestUidCol, destRow);
    colDataAppend(pDestGpCol, destRow, (const char*)&groupId, false);
    colDataAppendNULL(pDestCalStartTsCol, destRow);
    colDataAppendNULL(pDestCalEndTsCol, destRow);
    pDestBlock->info.rows++;
  }
  return TSDB_CODE_SUCCESS;
}
1220 1221 1222 1223 1224 1225

/*
 * Translate the rows of pSrcBlock into interval-window scan ranges in pDestBlock.
 *
 * Source rows are consumed in runs by getSlidingWindow; each run yields one
 * destination row holding the merged window, the uid and group id of the run's
 * first row, and the calculation range [start ts of first row, start ts of last
 * consumed row]. A source group id of 0 is resolved through getGroupIdByData.
 *
 * @return TSDB_CODE_SUCCESS, or the error from blockDataEnsureCapacity
 */
static int32_t generateIntervalScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSrcBlock, SSDataBlock* pDestBlock) {
  blockDataCleanup(pDestBlock);

  int32_t rows = pSrcBlock->info.rows;
  if (rows == 0) {
    return TSDB_CODE_SUCCESS;
  }

  int32_t code = blockDataEnsureCapacity(pDestBlock, rows * 2);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  // source columns
  SColumnInfoData* pSrcStartTsCol = (SColumnInfoData*)taosArrayGet(pSrcBlock->pDataBlock, START_TS_COLUMN_INDEX);
  SColumnInfoData* pSrcEndTsCol = (SColumnInfoData*)taosArrayGet(pSrcBlock->pDataBlock, END_TS_COLUMN_INDEX);
  SColumnInfoData* pSrcUidCol = taosArrayGet(pSrcBlock->pDataBlock, UID_COLUMN_INDEX);
  uint64_t*        srcUidData = (uint64_t*)pSrcUidCol->pData;
  SColumnInfoData* pSrcGpCol = taosArrayGet(pSrcBlock->pDataBlock, GROUPID_COLUMN_INDEX);
  uint64_t*        srcGp = (uint64_t*)pSrcGpCol->pData;
  ASSERT(pSrcStartTsCol->info.type == TSDB_DATA_TYPE_TIMESTAMP);
  TSKEY* srcStartTsCol = (TSKEY*)pSrcStartTsCol->pData;
  TSKEY* srcEndTsCol = (TSKEY*)pSrcEndTsCol->pData;

  // destination columns
  SColumnInfoData* pStartTsCol = taosArrayGet(pDestBlock->pDataBlock, START_TS_COLUMN_INDEX);
  SColumnInfoData* pEndTsCol = taosArrayGet(pDestBlock->pDataBlock, END_TS_COLUMN_INDEX);
  SColumnInfoData* pDeUidCol = taosArrayGet(pDestBlock->pDataBlock, UID_COLUMN_INDEX);
  SColumnInfoData* pGpCol = taosArrayGet(pDestBlock->pDataBlock, GROUPID_COLUMN_INDEX);
  SColumnInfoData* pCalStartTsCol = taosArrayGet(pDestBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX);
  SColumnInfoData* pCalEndTsCol = taosArrayGet(pDestBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX);

  int64_t version = pSrcBlock->info.version - 1;
  int32_t i = 0;
  while (i < rows) {
    const int32_t destRow = pDestBlock->info.rows;

    uint64_t srcUid = srcUidData[i];
    uint64_t groupId = srcGp[i];
    if (groupId == 0) {
      groupId = getGroupIdByData(pInfo, srcUid, srcStartTsCol[i], version);
    }

    TSKEY calStartTs = srcStartTsCol[i];
    colDataAppend(pCalStartTsCol, destRow, (const char*)(&calStartTs), false);

    // advances i past every source row merged into this window
    STimeWindow win = getSlidingWindow(srcStartTsCol, srcEndTsCol, srcGp, &pInfo->interval, &pSrcBlock->info, &i,
                                       pInfo->partitionSup.needCalc);
    TSKEY       calEndTs = srcStartTsCol[i - 1];

    colDataAppend(pCalEndTsCol, destRow, (const char*)(&calEndTs), false);
    colDataAppend(pDeUidCol, destRow, (const char*)(&srcUid), false);
    colDataAppend(pStartTsCol, destRow, (const char*)(&win.skey), false);
    colDataAppend(pEndTsCol, destRow, (const char*)(&win.ekey), false);
    colDataAppend(pGpCol, destRow, (const char*)(&groupId), false);
    pDestBlock->info.rows++;
  }
  return TSDB_CODE_SUCCESS;
}
1268

1269 1270 1271 1272 1273 1274 1275 1276 1277 1278 1279 1280 1281 1282 1283 1284 1285 1286 1287 1288 1289 1290 1291 1292 1293 1294 1295 1296 1297 1298 1299 1300 1301 1302 1303 1304 1305 1306
/*
 * Copy the delete ranges of pSrcBlock into pDestBlock, one destination row per
 * source row: start/end timestamps are copied, the group id is resolved through
 * getGroupIdByData, and the uid and calculation-range columns are set to NULL.
 *
 * @return TSDB_CODE_SUCCESS, or the error from blockDataEnsureCapacity
 */
static int32_t generateDeleteResultBlock(SStreamScanInfo* pInfo, SSDataBlock* pSrcBlock, SSDataBlock* pDestBlock) {
  if (pSrcBlock->info.rows == 0) {
    return TSDB_CODE_SUCCESS;
  }
  blockDataCleanup(pDestBlock);
  int32_t code = blockDataEnsureCapacity(pDestBlock, pSrcBlock->info.rows);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }
  ASSERT(taosArrayGetSize(pSrcBlock->pDataBlock) >= 3);
  SColumnInfoData* pStartTsCol = taosArrayGet(pSrcBlock->pDataBlock, START_TS_COLUMN_INDEX);
  TSKEY*           startData = (TSKEY*)pStartTsCol->pData;
  SColumnInfoData* pEndTsCol = taosArrayGet(pSrcBlock->pDataBlock, END_TS_COLUMN_INDEX);
  TSKEY*           endData = (TSKEY*)pEndTsCol->pData;
  SColumnInfoData* pUidCol = taosArrayGet(pSrcBlock->pDataBlock, UID_COLUMN_INDEX);
  uint64_t*        uidCol = (uint64_t*)pUidCol->pData;

  SColumnInfoData* pDestStartCol = taosArrayGet(pDestBlock->pDataBlock, START_TS_COLUMN_INDEX);
  SColumnInfoData* pDestEndCol = taosArrayGet(pDestBlock->pDataBlock, END_TS_COLUMN_INDEX);
  SColumnInfoData* pDestUidCol = taosArrayGet(pDestBlock->pDataBlock, UID_COLUMN_INDEX);
  SColumnInfoData* pDestGpCol = taosArrayGet(pDestBlock->pDataBlock, GROUPID_COLUMN_INDEX);
  SColumnInfoData* pDestCalStartTsCol = taosArrayGet(pDestBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX);
  SColumnInfoData* pDestCalEndTsCol = taosArrayGet(pDestBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX);
  int64_t          version = pSrcBlock->info.version - 1;
  for (int32_t i = 0; i < pSrcBlock->info.rows; i++) {
    uint64_t groupId = getGroupIdByData(pInfo, uidCol[i], startData[i], version);
    colDataAppend(pDestStartCol, i, (const char*)(startData + i), false);
    colDataAppend(pDestEndCol, i, (const char*)(endData + i), false);
    colDataAppendNULL(pDestUidCol, i);
    colDataAppend(pDestGpCol, i, (const char*)&groupId, false);
    colDataAppendNULL(pDestCalStartTsCol, i);
    colDataAppendNULL(pDestCalEndTsCol, i);
    pDestBlock->info.rows++;
  }
  return TSDB_CODE_SUCCESS;
}

1307 1308 1309 1310
/*
 * Build the scan-range block for the parent window type (interval, or
 * session/state), then stamp the destination block as STREAM_CLEAR with the
 * source block's version and refresh its timestamp window.
 *
 * @return the code of the range generator that ran, TSDB_CODE_SUCCESS if none did
 */
static int32_t generateScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSrcBlock, SSDataBlock* pDestBlock) {
  int32_t status = TSDB_CODE_SUCCESS;

  if (isIntervalWindow(pInfo)) {
    status = generateIntervalScanRange(pInfo, pSrcBlock, pDestBlock);
  } else {
    if (isSessionWindow(pInfo) || isStateWindow(pInfo)) {
      status = generateSessionScanRange(pInfo, pSrcBlock, pDestBlock);
    }
  }

  pDestBlock->info.version = pSrcBlock->info.version;
  pDestBlock->info.type = STREAM_CLEAR;
  blockDataUpdateTsWindow(pDestBlock, 0);
  return status;
}
}

L
Liu Jicong 已提交
1320 1321 1322 1323 1324 1325 1326 1327 1328
/*
 * Evaluate the tag expressions on the first row of pBlock and store the result
 * in pResBlock. Does nothing when there are no tag expressions or pBlock is
 * NULL/empty.
 */
static void calBlockTag(SExprSupp* pTagCalSup, SSDataBlock* pBlock, SSDataBlock* pResBlock) {
  const bool noExprs = (pTagCalSup == NULL || pTagCalSup->numOfExprs == 0);
  if (noExprs) {
    return;
  }

  const bool noRows = (pBlock == NULL || pBlock->info.rows == 0);
  if (noRows) {
    return;
  }

  SSDataBlock* pFirstRow = blockCopyOneRow(pBlock, 0);
  ASSERT(pFirstRow->info.rows == 1);

  blockDataEnsureCapacity(pResBlock, 1);

  projectApplyFunctions(pTagCalSup->pExprInfo, pResBlock, pFirstRow, pTagCalSup->pCtx, 1, NULL);
  ASSERT(pResBlock->info.rows == 1);

  // TODO: build the tag array and the STag from pResBlock, then attach it to the block

  blockDataDestroy(pFirstRow);
}

/*
 * Evaluate the table-name expression on the first row of pBlock and store the
 * resulting name in pBlock->info.parTbName.
 *
 * The name is truncated to TSDB_TABLE_NAME_LEN - 1 bytes and always
 * NUL-terminated directly after the copied bytes, so a short name is never
 * followed by stale buffer content. When no name is produced, parTbName
 * becomes the empty string. Does nothing when there are no expressions or
 * pBlock is NULL/empty.
 */
static void calBlockTbName(SExprSupp* pTbNameCalSup, SSDataBlock* pBlock) {
  if (pTbNameCalSup == NULL || pTbNameCalSup->numOfExprs == 0) return;
  if (pBlock == NULL || pBlock->info.rows == 0) return;

  SSDataBlock* pSrcBlock = blockCopyOneRow(pBlock, 0);
  ASSERT(pSrcBlock->info.rows == 1);

  // temporary one-column varchar block that receives the computed name
  SSDataBlock* pResBlock = createDataBlock();
  pResBlock->info.rowSize = VARSTR_HEADER_SIZE + TSDB_TABLE_NAME_LEN;
  SColumnInfoData data = createColumnInfoData(TSDB_DATA_TYPE_VARCHAR, TSDB_TABLE_NAME_LEN, 0);
  taosArrayPush(pResBlock->pDataBlock, &data);
  blockDataEnsureCapacity(pResBlock, 1);

  projectApplyFunctions(pTbNameCalSup->pExprInfo, pResBlock, pSrcBlock, pTbNameCalSup->pCtx, 1, NULL);
  ASSERT(pResBlock->info.rows == 1);
  ASSERT(taosArrayGetSize(pResBlock->pDataBlock) == 1);
  SColumnInfoData* pCol = taosArrayGet(pResBlock->pDataBlock, 0);
  ASSERT(pCol->info.type == TSDB_DATA_TYPE_VARCHAR);

  void* pData = colDataGetData(pCol, 0);
  // TODO check tbname validation
  if (pData != (void*)-1 && pData != NULL) {
    // leave room for the terminator and place it right after the copied bytes
    int32_t nameLen = TMIN(varDataLen(pData), TSDB_TABLE_NAME_LEN - 1);
    memcpy(pBlock->info.parTbName, varDataVal(pData), nameLen);
    pBlock->info.parTbName[nameLen] = 0;
  } else {
    pBlock->info.parTbName[0] = 0;
  }

  blockDataDestroy(pSrcBlock);
  blockDataDestroy(pResBlock);
}

1376 1377
/*
 * Append one row to a stream special block and bump its row counter.
 *
 * The start/end timestamps are written twice: once into the START/END columns
 * and once into the CALCULATE_START/CALCULATE_END columns. The table-name
 * column is written as NULL when pTbName is NULL.
 *
 * pBlock    destination block; the row is written at index pBlock->info.rows
 * pStartTs  start timestamp of the row
 * pEndTs    end timestamp of the row
 * pUid      table uid of the row
 * pGp       group id of the row
 * pTbName   table-name value for the row, or NULL
 */
void appendOneRowToStreamSpecialBlock(SSDataBlock* pBlock, TSKEY* pStartTs, TSKEY* pEndTs, uint64_t* pUid,
                                      uint64_t* pGp, void* pTbName) {
  SArray* pCols = pBlock->pDataBlock;

  colDataAppend(taosArrayGet(pCols, START_TS_COLUMN_INDEX), pBlock->info.rows, (const char*)pStartTs, false);
  colDataAppend(taosArrayGet(pCols, END_TS_COLUMN_INDEX), pBlock->info.rows, (const char*)pEndTs, false);
  colDataAppend(taosArrayGet(pCols, UID_COLUMN_INDEX), pBlock->info.rows, (const char*)pUid, false);
  colDataAppend(taosArrayGet(pCols, GROUPID_COLUMN_INDEX), pBlock->info.rows, (const char*)pGp, false);

  // the calculate window mirrors the data window
  colDataAppend(taosArrayGet(pCols, CALCULATE_START_TS_COLUMN_INDEX), pBlock->info.rows, (const char*)pStartTs,
                false);
  colDataAppend(taosArrayGet(pCols, CALCULATE_END_TS_COLUMN_INDEX), pBlock->info.rows, (const char*)pEndTs, false);

  colDataAppend(taosArrayGet(pCols, TABLE_NAME_COLUMN_INDEX), pBlock->info.rows, (const char*)pTbName,
                pTbName == NULL);

  pBlock->info.rows++;
}

1395
/*
 * Run the update check over every row of pBlock and, when `out` is true,
 * collect the rows that failed the check into pInfo->pUpdateDataRes.
 *
 * A row fails the check when updateInfoIsUpdated() reports it as an update, or
 * when it falls into an already closed window of a single-interval stream that
 * isDeletedStreamWindow() reports as deleted. updateInfoIsUpdated() is called
 * for every row regardless of `out`.
 *
 * When `out` is true and at least one row was collected, pUpdateDataRes gets
 * the version of pBlock, its ts window is refreshed, and its type is set to
 * STREAM_DELETE_DATA (partition needs calculation) or STREAM_CLEAR.
 *
 * pInfo       stream scan state
 * invertible  not used by this function
 * pBlock      block to check; the primary ts column must be TIMESTAMP
 * out         whether failing rows are written to pInfo->pUpdateDataRes
 */
static void checkUpdateData(SStreamScanInfo* pInfo, bool invertible, SSDataBlock* pBlock, bool out) {
  if (out) {
    blockDataCleanup(pInfo->pUpdateDataRes);
    // a single input row appends at most two rows below
    blockDataEnsureCapacity(pInfo->pUpdateDataRes, pBlock->info.rows * 2);
  }

  SColumnInfoData* pColDataInfo = taosArrayGet(pBlock->pDataBlock, pInfo->primaryTsIndex);
  ASSERT(pColDataInfo->info.type == TSDB_DATA_TYPE_TIMESTAMP);
  TSKEY* tsCol = (TSKEY*)pColDataInfo->pData;

  bool tableInserted = updateInfoIsTableInserted(pInfo->pUpdateInfo, pBlock->info.uid);
  for (int32_t rowId = 0; rowId < pBlock->info.rows; rowId++) {
    // Zero the whole scratch struct before handing it to getActiveTimeWindow();
    // previously only cur.pageId was set and every other field was indeterminate.
    SResultRowInfo dumyInfo = {0};
    dumyInfo.cur.pageId = -1;

    bool        isClosed = false;
    STimeWindow win = {.skey = INT64_MIN, .ekey = INT64_MAX};
    if (tableInserted && isOverdue(tsCol[rowId], &pInfo->twAggSup)) {
      win = getActiveTimeWindow(NULL, &dumyInfo, tsCol[rowId], &pInfo->interval, TSDB_ORDER_ASC);
      isClosed = isCloseWindow(&win, &pInfo->twAggSup);
    }

    // must check update info first.
    bool update = updateInfoIsUpdated(pInfo->pUpdateInfo, pBlock->info.uid, tsCol[rowId]);
    bool closedWin = isClosed && isSignleIntervalWindow(pInfo) &&
                     isDeletedStreamWindow(&win, pBlock->info.groupId,
                                           pInfo->pTableScanOp->pTaskInfo->streamInfo.pState, &pInfo->twAggSup);
    if ((update || closedWin) && out) {
      qDebug("stream update check not pass, update %d, closedWin %d", update, closedWin);

      // first row is always emitted with group id 0
      uint64_t gpId = 0;
      appendOneRowToStreamSpecialBlock(pInfo->pUpdateDataRes, tsCol + rowId, tsCol + rowId, &pBlock->info.uid, &gpId,
                                       NULL);

      // closed window with a calculated partition: emit a second row carrying
      // the group id computed from the row data
      if (closedWin && pInfo->partitionSup.needCalc) {
        gpId = calGroupIdByData(&pInfo->partitionSup, pInfo->pPartScalarSup, pBlock, rowId);
        appendOneRowToStreamSpecialBlock(pInfo->pUpdateDataRes, tsCol + rowId, tsCol + rowId, &pBlock->info.uid, &gpId,
                                         NULL);
      }
    }
  }

  if (out && pInfo->pUpdateDataRes->info.rows > 0) {
    pInfo->pUpdateDataRes->info.version = pBlock->info.version;
    blockDataUpdateTsWindow(pInfo->pUpdateDataRes, 0);
    pInfo->pUpdateDataRes->info.type = pInfo->partitionSup.needCalc ? STREAM_DELETE_DATA : STREAM_CLEAR;
  }
}
L
Liu Jicong 已提交
1436

1437
/*
 * Copy the content of pBlock into pInfo->pRes, then release pBlock's resources.
 *
 * Steps, in order:
 *   1. size pRes and copy rows/uid/version; the type becomes STREAM_NORMAL
 *   2. look up the group id of the table uid (0 when the uid is not in the map)
 *   3. fill every output column from the matching source column, or with NULLs
 *      when the source block does not carry that column
 *   4. add pseudo columns (on failure pBlock is released and the task long-jumps)
 *   5. optionally apply the scan filter, refresh the ts window of pRes
 *   6. release pBlock and compute the partition table name of pRes
 *
 * pInfo   stream scan state owning pRes
 * pBlock  source block; its resources are freed via blockDataFreeRes()
 * filter  whether pInfo->pCondition is applied to pRes
 *
 * Returns 0 on every path that returns.
 */
static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock, bool filter) {
  SSDataBlock*    pRes = pInfo->pRes;
  SDataBlockInfo* pResInfo = &pRes->info;
  SOperatorInfo*  pOperator = pInfo->pStreamScanOp;
  SExecTaskInfo*  pTaskInfo = pOperator->pTaskInfo;

  blockDataEnsureCapacity(pRes, pBlock->info.rows);

  pResInfo->rows = pBlock->info.rows;
  pResInfo->uid = pBlock->info.uid;
  pResInfo->type = STREAM_NORMAL;
  pResInfo->version = pBlock->info.version;

  uint64_t* pGroupId = taosHashGet(pTaskInfo->tableqinfoList.map, &pBlock->info.uid, sizeof(int64_t));
  pResInfo->groupId = (pGroupId != NULL) ? *pGroupId : 0;

  // todo extract method
  size_t numOfMatch = taosArrayGetSize(pInfo->pColMatchInfo);
  for (int32_t i = 0; i < numOfMatch; ++i) {
    SColMatchInfo* pMatch = taosArrayGet(pInfo->pColMatchInfo, i);
    if (!pMatch->output) {
      continue;
    }

    // locate the source column that carries the requested column id
    SColumnInfoData* pSrcCol = NULL;
    for (int32_t j = 0; j < blockDataGetNumOfCols(pBlock); ++j) {
      SColumnInfoData* pCandidate = bdGetColumnInfoData(pBlock, j);
      if (pCandidate->info.colId == pMatch->colId) {
        pSrcCol = pCandidate;
        break;
      }
    }

    SColumnInfoData* pDst = taosArrayGet(pRes->pDataBlock, pMatch->targetSlotId);
    if (pSrcCol != NULL) {
      colDataAssign(pDst, pSrcCol, pBlock->info.rows, pResInfo);
    } else {
      // the required column does not exist in the submit block: all NULL
      colDataAppendNNULL(pDst, 0, pResInfo->rows);
    }
  }

  // currently only the tbname pseudo column
  if (pInfo->numOfPseudoExpr > 0) {
    int32_t code = addTagPseudoColumnData(&pInfo->readHandle, pInfo->pPseudoExpr, pInfo->numOfPseudoExpr, pRes,
                                          GET_TASKID(pTaskInfo));
    if (code != TSDB_CODE_SUCCESS) {
      // release the source block before leaving through the long jump
      blockDataFreeRes((SSDataBlock*)pBlock);
      T_LONG_JMP(pTaskInfo->env, code);
    }
  }

  if (filter) {
    doFilter(pInfo->pCondition, pRes, NULL, NULL);
  }

  blockDataUpdateTsWindow(pRes, pInfo->primaryTsIndex);
  blockDataFreeRes((SSDataBlock*)pBlock);

  calBlockTbName(&pInfo->tbnameCalSup, pRes);

  return 0;
}
5
54liuyao 已提交
1501

L
Liu Jicong 已提交
1502
/*
 * Fetch the next result block for a queue (tmq) scan.
 *
 * Three sources are tried in order:
 *   1. a submit request attached to the task (streamInfo.pReq): blocks are
 *      pulled from the tq reader until one yields rows; the request is detached
 *      once exhausted
 *   2. TMQ_OFFSET__SNAPSHOT_DATA: rows come from the table scan; when the scan
 *      ends without having returned anything in this round, the reader is
 *      closed and the scan switches to the WAL right after the snapshot version
 *   3. TMQ_OFFSET__LOG: blocks are fetched from the WAL through the tq reader
 *
 * Returns the block holding rows, or NULL when nothing is available.
 */
static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) {
  SExecTaskInfo*   pTaskInfo = pOperator->pTaskInfo;
  SStreamScanInfo* pInfo = pOperator->info;

  qDebug("queue scan called");

  // 1. data comes from an attached submit request
  if (pTaskInfo->streamInfo.pReq != NULL) {
    if (pInfo->tqReader->pMsg == NULL) {
      pInfo->tqReader->pMsg = pTaskInfo->streamInfo.pReq;
      const SSubmitReq* pSubmit = pInfo->tqReader->pMsg;
      if (tqReaderSetDataMsg(pInfo->tqReader, pSubmit, 0) < 0) {
        qError("submit msg messed up when initing stream submit block %p", pSubmit);
        pInfo->tqReader->pMsg = NULL;
        pTaskInfo->streamInfo.pReq = NULL;
        ASSERT(0);
      }
    }

    blockDataCleanup(pInfo->pRes);
    SDataBlockInfo* pResInfo = &pInfo->pRes->info;

    while (tqNextDataBlock(pInfo->tqReader)) {
      SSDataBlock block = {0};

      int32_t code = tqRetrieveDataBlock(&block, pInfo->tqReader);
      if (code != TSDB_CODE_SUCCESS || block.info.rows == 0) {
        continue;
      }

      setBlockIntoRes(pInfo, &block, true);

      if (pResInfo->rows > 0) {
        return pInfo->pRes;
      }
    }

    // the request is exhausted: detach it from both the reader and the task
    pInfo->tqReader->pMsg = NULL;
    pTaskInfo->streamInfo.pReq = NULL;
    return NULL;
  }

  // 2. data comes from the tsdb snapshot
  if (pTaskInfo->streamInfo.prepareStatus.type == TMQ_OFFSET__SNAPSHOT_DATA) {
    SSDataBlock* pResult = doTableScan(pInfo->pTableScanOp);
    if (pResult && pResult->info.rows > 0) {
      qDebug("queue scan tsdb return %d rows", pResult->info.rows);
      pTaskInfo->streamInfo.returned = 1;
      return pResult;
    }

    if (pTaskInfo->streamInfo.returned) {
      return NULL;
    }

    // nothing was returned from the snapshot: close the reader and move to the wal
    STableScanInfo* pTSInfo = pInfo->pTableScanOp->info;
    tsdbReaderClose(pTSInfo->dataReader);
    pTSInfo->dataReader = NULL;
    tqOffsetResetToLog(&pTaskInfo->streamInfo.prepareStatus, pTaskInfo->streamInfo.snapshotVer);
    qDebug("queue scan tsdb over, switch to wal ver %" PRId64 "", pTaskInfo->streamInfo.snapshotVer + 1);
    if (tqSeekVer(pInfo->tqReader, pTaskInfo->streamInfo.snapshotVer + 1) < 0) {
      return NULL;
    }
    ASSERT(pInfo->tqReader->pWalReader->curVersion == pTaskInfo->streamInfo.snapshotVer + 1);
  }

  // 3. data comes from the wal; the status type is re-read on purpose because
  //    the snapshot branch above may just have reset it
  if (pTaskInfo->streamInfo.prepareStatus.type != TMQ_OFFSET__LOG) {
    ASSERT(0);
    return NULL;
  }

  while (1) {
    SFetchRet ret = {0};
    tqNextBlock(pInfo->tqReader, &ret);

    if (ret.fetchType == FETCH_TYPE__DATA) {
      blockDataCleanup(pInfo->pRes);
      if (setBlockIntoRes(pInfo, &ret.data, true) < 0) {
        ASSERT(0);
      }
      if (pInfo->pRes->info.rows > 0) {
        pOperator->status = OP_EXEC_RECV;
        qDebug("queue scan log return %d rows", pInfo->pRes->info.rows);
        return pInfo->pRes;
      }
    } else if (ret.fetchType == FETCH_TYPE__META) {
      // meta entries are not expected on this path
      ASSERT(0);
    } else if (ret.fetchType == FETCH_TYPE__NONE ||
               (ret.fetchType == FETCH_TYPE__SEP && pOperator->status == OP_EXEC_RECV)) {
      pTaskInfo->streamInfo.lastStatus = ret.offset;
      ASSERT(pTaskInfo->streamInfo.lastStatus.version >= pTaskInfo->streamInfo.prepareStatus.version);
      ASSERT(pTaskInfo->streamInfo.lastStatus.version + 1 == pInfo->tqReader->pWalReader->curVersion);
      char formatBuf[80];
      tFormatOffset(formatBuf, 80, &ret.offset);
      qDebug("queue scan log return null, offset %s", formatBuf);
      pOperator->status = OP_OPENED;
      return NULL;
    }
  }
}

L
Liu Jicong 已提交
1614 1615 1616 1617 1618 1619 1620 1621 1622 1623 1624 1625 1626 1627 1628 1629 1630 1631 1632 1633 1634 1635 1636 1637 1638 1639 1640 1641 1642 1643 1644 1645 1646 1647
/*
 * Copy into pDst only those delete rows of pSrc whose table uid is present in
 * the tq reader's table-id hash.
 *
 * For every kept row the start ts, end ts and uid are copied; the group id and
 * both calculate-window columns are written as NULL. Afterwards pDst->info is
 * overwritten with pSrc->info and its row count set to the number of kept rows.
 *
 * NOTE(review): the whole-struct assignment also replaces whatever bookkeeping
 * blockDataEnsureCapacity() stored in pDst->info - confirm that is intended.
 *
 * Always returns 0.
 */
static int32_t filterDelBlockByUid(SSDataBlock* pDst, const SSDataBlock* pSrc, SStreamScanInfo* pInfo) {
  STqReader* pReader = pInfo->tqReader;
  int32_t    rows = pSrc->info.rows;
  blockDataEnsureCapacity(pDst, rows);

  // raw views over the three source columns that are copied through
  SColumnInfoData* pSrcStartCol = taosArrayGet(pSrc->pDataBlock, START_TS_COLUMN_INDEX);
  SColumnInfoData* pSrcEndCol = taosArrayGet(pSrc->pDataBlock, END_TS_COLUMN_INDEX);
  SColumnInfoData* pSrcUidCol = taosArrayGet(pSrc->pDataBlock, UID_COLUMN_INDEX);
  uint64_t*        srcStart = (uint64_t*)pSrcStartCol->pData;
  uint64_t*        srcEnd = (uint64_t*)pSrcEndCol->pData;
  uint64_t*        srcUid = (uint64_t*)pSrcUidCol->pData;

  SColumnInfoData* pDstStartCol = taosArrayGet(pDst->pDataBlock, START_TS_COLUMN_INDEX);
  SColumnInfoData* pDstEndCol = taosArrayGet(pDst->pDataBlock, END_TS_COLUMN_INDEX);
  SColumnInfoData* pDstUidCol = taosArrayGet(pDst->pDataBlock, UID_COLUMN_INDEX);

  int32_t kept = 0;
  for (int32_t i = 0; i < rows; i++) {
    if (!taosHashGet(pReader->tbIdHash, &srcUid[i], sizeof(uint64_t))) {
      continue;  // uid is not in the reader's table-id hash
    }

    colDataAppend(pDstStartCol, kept, (const char*)&srcStart[i], false);
    colDataAppend(pDstEndCol, kept, (const char*)&srcEnd[i], false);
    colDataAppend(pDstUidCol, kept, (const char*)&srcUid[i], false);

    colDataAppendNULL(taosArrayGet(pDst->pDataBlock, GROUPID_COLUMN_INDEX), kept);
    colDataAppendNULL(taosArrayGet(pDst->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX), kept);
    colDataAppendNULL(taosArrayGet(pDst->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX), kept);
    kept++;
  }

  pDst->info = pSrc->info;
  pDst->info.rows = kept;

  return 0;
}

5
54liuyao 已提交
1648 1649 1650 1651 1652 1653 1654 1655 1656 1657 1658 1659 1660 1661 1662 1663 1664 1665 1666 1667 1668 1669 1670 1671
/*
 * For partition by tag: fill the group-id column of a special block from the
 * uid column, one getGroupIdByUid() lookup per row.
 *
 * When the partition requires calculation (partitionSup.needCalc) nothing is
 * written and the group-id column is left as it is.
 */
static void setBlockGroupIdByUid(SStreamScanInfo* pInfo, SSDataBlock* pBlock) {
  if (pInfo->partitionSup.needCalc) {
    // TODO: not implemented. The disabled draft read the previous-version row
    // (readPreVersionData) and derived the group id with calGroupIdByData().
    return;
  }

  SColumnInfoData* pGpCol = taosArrayGet(pBlock->pDataBlock, GROUPID_COLUMN_INDEX);
  SColumnInfoData* pUidCol = taosArrayGet(pBlock->pDataBlock, UID_COLUMN_INDEX);
  uint64_t*        uids = (uint64_t*)pUidCol->pData;

  int32_t rows = pBlock->info.rows;
  for (int32_t row = 0; row < rows; row++) {
    uint64_t groupId = getGroupIdByUid(pInfo, uids[row]);
    colDataAppend(pGpCol, row, (const char*)&groupId, false);
  }
}

L
Liu Jicong 已提交
1672 1673 1674 1675 1676
/*
 * Fetch the next block for a stream scan.
 *
 * Recovery comes first: in the PREPARE step the table scan is re-armed with the
 * task's table condition (up to fillHistoryVer1); in the SCAN step blocks are
 * returned straight from the table scan until it is exhausted.
 *
 * Otherwise the behaviour depends on pInfo->blockType:
 *   - STREAM_INPUT__DATA_BLOCK: buffered blocks are handed out one by one;
 *     retrieve and delete blocks switch the scan to range-scan mode.
 *   - STREAM_INPUT__DATA_SUBMIT: pInfo->scanMode drives a small state machine
 *     (pending result / delete data / update result / range scan); once it is
 *     idle, buffered submit messages are decoded through the tq reader, checked
 *     for updates, filtered, and returned.
 *
 * Returns the next block, or NULL when no more input is buffered.
 */
static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
  // NOTE: this operator does never check if current status is done or not
  SExecTaskInfo*   pTaskInfo = pOperator->pTaskInfo;
  SStreamScanInfo* pInfo = pOperator->info;

  qDebug("stream scan called");

  // recover step 1: re-arm the table scan for the history range
  if (pTaskInfo->streamInfo.recoverStep == STREAM_RECOVER_STEP__PREPARE) {
    STableScanInfo* pTSInfo = pInfo->pTableScanOp->info;
    memcpy(&pTSInfo->cond, &pTaskInfo->streamInfo.tableCond, sizeof(SQueryTableDataCond));
    pTSInfo->cond.startVersion = -1;
    pTSInfo->cond.endVersion = pTaskInfo->streamInfo.fillHistoryVer1;
    pTSInfo->scanTimes = 0;
    pTSInfo->currentGroupId = -1;
    pTaskInfo->streamInfo.recoverStep = STREAM_RECOVER_STEP__SCAN;
  }

  // recover step 2: drain the table scan
  if (pTaskInfo->streamInfo.recoverStep == STREAM_RECOVER_STEP__SCAN) {
    SSDataBlock* pHistBlock = doTableScan(pInfo->pTableScanOp);
    if (pHistBlock != NULL) {
      calBlockTbName(&pInfo->tbnameCalSup, pHistBlock);
      updateInfoFillBlockData(pInfo->pUpdateInfo, pHistBlock, pInfo->primaryTsIndex);
      return pHistBlock;
    }
    pTaskInfo->streamInfo.recoverStep = STREAM_RECOVER_STEP__NONE;
    return NULL;
  }

  size_t total = taosArrayGetSize(pInfo->pBlockLists);
  // TODO: refactor
FETCH_NEXT_BLOCK:
  if (pInfo->blockType == STREAM_INPUT__DATA_BLOCK) {
    if (pInfo->validBlockIndex >= total) {
      doClearBufferedBlocks(pInfo);
      /*pOperator->status = OP_EXEC_DONE;*/
      return NULL;
    }

    int32_t      current = pInfo->validBlockIndex++;
    SSDataBlock* pBlock = taosArrayGetP(pInfo->pBlockLists, current);
    // TODO move into scan
    pBlock->info.calWin.skey = INT64_MIN;
    pBlock->info.calWin.ekey = INT64_MAX;
    blockDataUpdateTsWindow(pBlock, 0);

    switch (pBlock->info.type) {
      case STREAM_NORMAL:
      case STREAM_GET_ALL:
        return pBlock;

      case STREAM_RETRIEVE: {
        // switch to range-scan mode; the retrieve block itself is still returned below
        pInfo->blockType = STREAM_INPUT__DATA_SUBMIT;
        pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RETRIEVE;
        copyDataBlock(pInfo->pUpdateRes, pBlock);
        prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex);
        updateInfoAddCloseWindowSBF(pInfo->pUpdateInfo);
        break;
      }

      case STREAM_DELETE_DATA: {
        printDataBlock(pBlock, "stream scan delete recv");

        // with a tq reader the delete rows are first filtered by uid into a new block
        SSDataBlock* pDelBlock = NULL;
        if (pInfo->tqReader) {
          pDelBlock = createSpecialDataBlock(STREAM_DELETE_DATA);
          filterDelBlockByUid(pDelBlock, pBlock, pInfo);
        } else {
          pDelBlock = pBlock;
        }

        setBlockGroupIdByUid(pInfo, pDelBlock);
        printDataBlock(pDelBlock, "stream scan delete recv filtered");

        if (!isIntervalWindow(pInfo) && !isSessionWindow(pInfo) && !isStateWindow(pInfo)) {
          generateDeleteResultBlock(pInfo, pDelBlock, pInfo->pDeleteDataRes);
          pInfo->pDeleteDataRes->info.type = STREAM_DELETE_RESULT;
          printDataBlock(pDelBlock, "stream scan delete result");
          // NOTE(review): destroyed unconditionally here, while the branch below
          // only destroys pDelBlock when it was created from the tq reader -
          // confirm ownership of the buffered block when tqReader is NULL.
          blockDataDestroy(pDelBlock);

          if (pInfo->pDeleteDataRes->info.rows > 0) {
            return pInfo->pDeleteDataRes;
          }
          goto FETCH_NEXT_BLOCK;
        }

        // windowed stream: turn the delete rows into scan ranges
        pInfo->blockType = STREAM_INPUT__DATA_SUBMIT;
        pInfo->updateResIndex = 0;
        generateScanRange(pInfo, pDelBlock, pInfo->pUpdateRes);
        prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex);
        copyDataBlock(pInfo->pDeleteDataRes, pInfo->pUpdateRes);
        pInfo->pDeleteDataRes->info.type = STREAM_DELETE_DATA;
        pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RANGE;
        printDataBlock(pDelBlock, "stream scan delete data");
        if (pInfo->tqReader) {
          blockDataDestroy(pDelBlock);
        }

        if (pInfo->pDeleteDataRes->info.rows > 0) {
          return pInfo->pDeleteDataRes;
        }
        goto FETCH_NEXT_BLOCK;
      }

      default:
        break;
    }
    // printDataBlock(pBlock, "stream scan recv");
    return pBlock;
  } else if (pInfo->blockType == STREAM_INPUT__DATA_SUBMIT) {
    qDebug("scan mode %d", pInfo->scanMode);
    switch (pInfo->scanMode) {
      case STREAM_SCAN_FROM_RES: {
        blockDataDestroy(pInfo->pUpdateRes);
        pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
        return pInfo->pRes;
      }

      case STREAM_SCAN_FROM_DELETE_DATA: {
        generateScanRange(pInfo, pInfo->pUpdateDataRes, pInfo->pUpdateRes);
        prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex);
        pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RANGE;
        copyDataBlock(pInfo->pDeleteDataRes, pInfo->pUpdateRes);
        pInfo->pDeleteDataRes->info.type = STREAM_DELETE_DATA;
        return pInfo->pDeleteDataRes;
      }

      case STREAM_SCAN_FROM_UPDATERES: {
        generateScanRange(pInfo, pInfo->pUpdateDataRes, pInfo->pUpdateRes);
        prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex);
        pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RANGE;
        return pInfo->pUpdateRes;
      }

      case STREAM_SCAN_FROM_DATAREADER_RANGE:
      case STREAM_SCAN_FROM_DATAREADER_RETRIEVE: {
        SSDataBlock* pSDB = doRangeScan(pInfo, pInfo->pUpdateRes, pInfo->primaryTsIndex, &pInfo->updateResIndex);
        if (pSDB) {
          STableScanInfo* pTableScanInfo = pInfo->pTableScanOp->info;
          uint64_t        version = getReaderMaxVersion(pTableScanInfo->dataReader);
          updateInfoSetScanRange(pInfo->pUpdateInfo, &pTableScanInfo->cond.twindows, pInfo->groupId, version);
          pSDB->info.type = pInfo->scanMode == STREAM_SCAN_FROM_DATAREADER_RANGE ? STREAM_NORMAL : STREAM_PULL_DATA;
          checkUpdateData(pInfo, true, pSDB, false);
          // printDataBlock(pSDB, "stream scan update");
          calBlockTbName(&pInfo->tbnameCalSup, pSDB);
          return pSDB;
        }
        // range scan exhausted: go back to reading submit messages
        blockDataCleanup(pInfo->pUpdateDataRes);
        pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
        break;
      }

      default:
        break;
    }

    // a state window may have queued ranges that must be rescanned first
    SStreamAggSupporter* pSup = pInfo->windowSup.pStreamAggSup;
    if (isStateWindow(pInfo) && pSup->pScanBlock->info.rows > 0) {
      pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RANGE;
      pInfo->updateResIndex = 0;
      copyDataBlock(pInfo->pUpdateRes, pSup->pScanBlock);
      blockDataCleanup(pSup->pScanBlock);
      prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex);
      return pInfo->pUpdateRes;
    }

    SDataBlockInfo* pBlockInfo = &pInfo->pRes->info;

    int32_t totBlockNum = taosArrayGetSize(pInfo->pBlockLists);

  NEXT_SUBMIT_BLK:
    while (1) {
      // no message loaded in the reader: take the next buffered submit message
      if (pInfo->tqReader->pMsg == NULL) {
        if (pInfo->validBlockIndex >= totBlockNum) {
          updateInfoDestoryColseWinSBF(pInfo->pUpdateInfo);
          doClearBufferedBlocks(pInfo);
          return NULL;
        }

        int32_t     current = pInfo->validBlockIndex++;
        SSubmitReq* pSubmit = taosArrayGetP(pInfo->pBlockLists, current);
        if (tqReaderSetDataMsg(pInfo->tqReader, pSubmit, 0) < 0) {
          qError("submit msg messed up when initing stream submit block %p, current %d, total %d", pSubmit, current,
                 totBlockNum);
          pInfo->tqReader->pMsg = NULL;
          continue;
        }
      }

      blockDataCleanup(pInfo->pRes);

      while (tqNextDataBlock(pInfo->tqReader)) {
        SSDataBlock block = {0};

        int32_t code = tqRetrieveDataBlock(&block, pInfo->tqReader);
        if (code != TSDB_CODE_SUCCESS || block.info.rows == 0) {
          continue;
        }

        // the filter is applied further down, after the update check
        setBlockIntoRes(pInfo, &block, false);

        if (updateInfoIgnore(pInfo->pUpdateInfo, &pInfo->pRes->info.window, pInfo->pRes->info.groupId,
                             pInfo->pRes->info.version)) {
          printDataBlock(pInfo->pRes, "stream scan ignore");
          blockDataCleanup(pInfo->pRes);
          continue;
        }

        if (pInfo->pUpdateInfo) {
          checkUpdateData(pInfo, true, pInfo->pRes, true);
          pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, pBlockInfo->window.ekey);
          if (pInfo->pUpdateDataRes->info.rows > 0) {
            pInfo->updateResIndex = 0;
            // the type of the update block selects the next scan mode
            if (pInfo->pUpdateDataRes->info.type == STREAM_CLEAR) {
              pInfo->scanMode = STREAM_SCAN_FROM_UPDATERES;
            } else if (pInfo->pUpdateDataRes->info.type == STREAM_INVERT) {
              pInfo->scanMode = STREAM_SCAN_FROM_RES;
              return pInfo->pUpdateDataRes;
            } else if (pInfo->pUpdateDataRes->info.type == STREAM_DELETE_DATA) {
              pInfo->scanMode = STREAM_SCAN_FROM_DELETE_DATA;
            }
          }
        }

        doFilter(pInfo->pCondition, pInfo->pRes, NULL, NULL);
        blockDataUpdateTsWindow(pInfo->pRes, pInfo->primaryTsIndex);

        if (pBlockInfo->rows > 0 || pInfo->pUpdateDataRes->info.rows > 0) {
          break;
        }
      }

      if (pBlockInfo->rows > 0 || pInfo->pUpdateDataRes->info.rows > 0) {
        break;
      }

      // this message produced nothing: drop it and load the next one
      pInfo->tqReader->pMsg = NULL;
      /*blockDataCleanup(pInfo->pRes);*/
    }

    // record the scan action.
    pInfo->numOfExec++;
    pOperator->resultInfo.totalRows += pBlockInfo->rows;
    // printDataBlock(pInfo->pRes, "stream scan");

    qDebug("scan rows: %d", pBlockInfo->rows);
    if (pBlockInfo->rows > 0) {
      return pInfo->pRes;
    }

    // no data rows but pending update rows: let the scan-mode switch handle them
    if (pInfo->pUpdateDataRes->info.rows > 0) {
      goto FETCH_NEXT_BLOCK;
    }

    goto NEXT_SUBMIT_BLK;
  } else {
    ASSERT(0);
    return NULL;
  }
}

1958 1959 1960 1961 1962 1963 1964 1965 1966 1967 1968 1969
/*
 * Build a new array of table uids (uint64_t) from the STableKeyInfo entries of
 * pTableGroupInfo->pTableList, in list order. The array is returned to the
 * caller.
 */
static SArray* extractTableIdList(const STableListInfo* pTableGroupInfo) {
  SArray* pUidList = taosArrayInit(4, sizeof(uint64_t));

  // Transfer the Array of STableKeyInfo into uid list.
  size_t numOfTables = taosArrayGetSize(pTableGroupInfo->pTableList);
  for (int32_t idx = 0; idx < numOfTables; ++idx) {
    STableKeyInfo* pKey = taosArrayGet(pTableGroupInfo->pTableList, idx);
    taosArrayPush(pUidList, &pKey->uid);
  }

  return pUidList;
}

1970
/*
 * Fetch the next item for a raw (tmq snapshot) scan.
 *
 * The meta response of the task is reset on entry; a non-zero metaRspLen
 * afterwards tells the caller that meta, not data, was produced.
 *
 *   - TMQ_OFFSET__SNAPSHOT_DATA: returns the next data block of the current
 *     table and records uid/ts in lastStatus. When the reader has no more
 *     blocks, the next table uid is taken from the snapshot context: uid 0
 *     means the snapshot is finished (lastStatus switches to the log at the
 *     snapshot version), otherwise the scan is prepared for that uid. NULL is
 *     returned in both cases.
 *   - TMQ_OFFSET__SNAPSHOT_META: reads one meta entry from the snapshot
 *     context into the task's meta response and returns NULL.
 *   - any other status: returns NULL.
 */
static SSDataBlock* doRawScan(SOperatorInfo* pOperator) {
  // NOTE: this operator does never check if current status is done or not
  SExecTaskInfo*      pTaskInfo = pOperator->pTaskInfo;
  SStreamRawScanInfo* pInfo = pOperator->info;

  pTaskInfo->streamInfo.metaRsp.metaRspLen = 0;  // use metaRspLen !=0 to judge if data is meta
  pTaskInfo->streamInfo.metaRsp.metaRsp = NULL;

  qDebug("tmqsnap doRawScan called");

  if (pTaskInfo->streamInfo.prepareStatus.type == TMQ_OFFSET__SNAPSHOT_DATA) {
    SSDataBlock* pBlock = &pInfo->pRes;

    if (pInfo->dataReader && tsdbNextDataBlock(pInfo->dataReader)) {
      if (isTaskKilled(pTaskInfo)) {
        longjmp(pTaskInfo->env, TSDB_CODE_TSC_QUERY_CANCELLED);
      }

      tsdbRetrieveDataBlockInfo(pInfo->dataReader, &pBlock->info);

      SArray* pCols = tsdbRetrieveDataBlock(pInfo->dataReader, NULL);
      pBlock->pDataBlock = pCols;
      if (pCols == NULL) {
        longjmp(pTaskInfo->env, terrno);
      }

      qDebug("tmqsnap doRawScan get data uid:%" PRId64 "", pBlock->info.uid);
      pTaskInfo->streamInfo.lastStatus.type = TMQ_OFFSET__SNAPSHOT_DATA;
      pTaskInfo->streamInfo.lastStatus.uid = pBlock->info.uid;
      pTaskInfo->streamInfo.lastStatus.ts = pBlock->info.window.ekey;
      return pBlock;
    }

    // current table is exhausted: move on to the next table of the snapshot
    SMetaTableInfo mtInfo = getUidfromSnapShot(pInfo->sContext);
    pTaskInfo->streamInfo.prepareStatus.uid = mtInfo.uid;
    if (mtInfo.uid == 0) {  // read snapshot done, change to get data from wal
      qDebug("tmqsnap read snapshot done, change to get data from wal");
      pTaskInfo->streamInfo.lastStatus.type = TMQ_OFFSET__LOG;
      pTaskInfo->streamInfo.lastStatus.version = pInfo->sContext->snapVersion;
    } else {
      pTaskInfo->streamInfo.prepareStatus.ts = INT64_MIN;
      qDebug("tmqsnap change get data uid:%" PRId64 "", mtInfo.uid);
      qStreamPrepareScan(pTaskInfo, &pTaskInfo->streamInfo.prepareStatus, pInfo->sContext->subType);
    }
    tDeleteSSchemaWrapper(mtInfo.schema);
    qDebug("tmqsnap stream scan tsdb return null");
    return NULL;
  } else if (pTaskInfo->streamInfo.prepareStatus.type == TMQ_OFFSET__SNAPSHOT_META) {
    SSnapContext* sContext = pInfo->sContext;
    void*         data = NULL;
    int32_t       dataLen = 0;
    int16_t       type = 0;
    int64_t       uid = 0;
    if (getMetafromSnapShot(sContext, &data, &dataLen, &type, &uid) < 0) {
      qError("tmqsnap getMetafromSnapShot error");
      taosMemoryFreeClear(data);
      return NULL;
    }

    // both outcomes record the meta position that was just read
    pTaskInfo->streamInfo.lastStatus.type = TMQ_OFFSET__SNAPSHOT_META;
    pTaskInfo->streamInfo.lastStatus.uid = uid;

    if (!sContext->queryMetaOrData) {  // change to get data next poll request
      pTaskInfo->streamInfo.metaRsp.rspOffset.type = TMQ_OFFSET__SNAPSHOT_DATA;
      pTaskInfo->streamInfo.metaRsp.rspOffset.uid = 0;
      pTaskInfo->streamInfo.metaRsp.rspOffset.ts = INT64_MIN;
    } else {
      pTaskInfo->streamInfo.metaRsp.rspOffset = pTaskInfo->streamInfo.lastStatus;
      pTaskInfo->streamInfo.metaRsp.resMsgType = type;
      pTaskInfo->streamInfo.metaRsp.metaRspLen = dataLen;
      pTaskInfo->streamInfo.metaRsp.metaRsp = data;
    }

    return NULL;
  }

  // TMQ_OFFSET__LOG is not served by this operator (the former wal-fetch path is disabled)
  return NULL;
}

wmmhello's avatar
wmmhello 已提交
2086
static void destroyRawScanOperatorInfo(void* param) {
wmmhello's avatar
wmmhello 已提交
2087 2088 2089 2090 2091 2092
  SStreamRawScanInfo* pRawScan = (SStreamRawScanInfo*)param;
  tsdbReaderClose(pRawScan->dataReader);
  destroySnapContext(pRawScan->sContext);
  taosMemoryFree(pRawScan);
}

L
Liu Jicong 已提交
2093 2094 2095
// Used when subscribing to a whole db or a super table (without a column list).
// When this scan is used, meta data can be returned as well,
// and schemas are decided while scanning.
2096
// Builds the "RawStreamScanOperator".
//
// pHandle   supplies the vnode and the snapshot context that the operator scans with.
// pTaskInfo owning task; on allocation failure its `code` is set to TSDB_CODE_OUT_OF_MEMORY.
//
// Returns the new operator, or NULL if either allocation failed (nothing is leaked in that case).
SOperatorInfo* createRawScanOperatorInfo(SReadHandle* pHandle, SExecTaskInfo* pTaskInfo) {
  SStreamRawScanInfo* pRawInfo = taosMemoryCalloc(1, sizeof(SStreamRawScanInfo));
  SOperatorInfo*      pOptr = taosMemoryCalloc(1, sizeof(SOperatorInfo));

  if (pRawInfo != NULL && pOptr != NULL) {
    // scan state: only the handles are recorded here
    pRawInfo->vnode = pHandle->vnode;
    pRawInfo->sContext = pHandle->sContext;

    // operator shell
    pOptr->name = "RawStreamScanOperator";
    pOptr->info = pRawInfo;
    pOptr->pTaskInfo = pTaskInfo;
    pOptr->fpSet = createOperatorFpSet(NULL, doRawScan, NULL, NULL, destroyRawScanOperatorInfo, NULL, NULL, NULL);
    return pOptr;
  }

  // at least one allocation failed; release whichever one succeeded
  taosMemoryFree(pRawInfo);
  taosMemoryFree(pOptr);
  pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY;
  return NULL;
}

2128
static void destroyStreamScanOperatorInfo(void* param) {
2129 2130 2131
  SStreamScanInfo* pStreamScan = (SStreamScanInfo*)param;
  if (pStreamScan->pTableScanOp && pStreamScan->pTableScanOp->info) {
    STableScanInfo* pTableScanInfo = pStreamScan->pTableScanOp->info;
2132
    destroyTableScanOperatorInfo(pTableScanInfo);
5
54liuyao 已提交
2133
    taosMemoryFreeClear(pStreamScan->pTableScanOp);
2134 2135 2136 2137 2138 2139 2140
  }
  if (pStreamScan->tqReader) {
    tqCloseReader(pStreamScan->tqReader);
  }
  if (pStreamScan->pColMatchInfo) {
    taosArrayDestroy(pStreamScan->pColMatchInfo);
  }
C
Cary Xu 已提交
2141 2142
  if (pStreamScan->pPseudoExpr) {
    destroyExprInfo(pStreamScan->pPseudoExpr, pStreamScan->numOfPseudoExpr);
L
Liu Jicong 已提交
2143
    taosMemoryFree(pStreamScan->pPseudoExpr);
C
Cary Xu 已提交
2144
  }
C
Cary Xu 已提交
2145

L
Liu Jicong 已提交
2146
  updateInfoDestroy(pStreamScan->pUpdateInfo);
2147 2148 2149 2150
  blockDataDestroy(pStreamScan->pRes);
  blockDataDestroy(pStreamScan->pUpdateRes);
  blockDataDestroy(pStreamScan->pPullDataRes);
  blockDataDestroy(pStreamScan->pDeleteDataRes);
5
54liuyao 已提交
2151
  blockDataDestroy(pStreamScan->pUpdateDataRes);
2152 2153 2154 2155
  taosArrayDestroy(pStreamScan->pBlockLists);
  taosMemoryFree(pStreamScan);
}

2156
SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhysiNode* pTableScanNode, SNode* pTagCond,
2157
                                            SExecTaskInfo* pTaskInfo) {
2158 2159
  SStreamScanInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamScanInfo));
  SOperatorInfo*   pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
2160

H
Haojun Liao 已提交
2161 2162
  if (pInfo == NULL || pOperator == NULL) {
    terrno = TSDB_CODE_QRY_OUT_OF_MEMORY;
2163
    goto _error;
H
Haojun Liao 已提交
2164 2165
  }

2166
  SScanPhysiNode*     pScanPhyNode = &pTableScanNode->scan;
2167
  SDataBlockDescNode* pDescNode = pScanPhyNode->node.pOutputDataBlockDesc;
H
Haojun Liao 已提交
2168

2169
  pInfo->pTagCond = pTagCond;
2170
  pInfo->pGroupTags = pTableScanNode->pGroupTags;
2171

2172
  int32_t numOfCols = 0;
2173
  pInfo->pColMatchInfo = extractColMatchInfo(pScanPhyNode->pScanCols, pDescNode, &numOfCols, COL_MATCH_FROM_COL_ID);
2174 2175 2176

  int32_t numOfOutput = taosArrayGetSize(pInfo->pColMatchInfo);
  SArray* pColIds = taosArrayInit(numOfOutput, sizeof(int16_t));
2177
  for (int32_t i = 0; i < numOfOutput; ++i) {
2178 2179 2180
    SColMatchInfo* id = taosArrayGet(pInfo->pColMatchInfo, i);

    int16_t colId = id->colId;
2181
    taosArrayPush(pColIds, &colId);
2182
    if (id->colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
5
54liuyao 已提交
2183 2184
      pInfo->primaryTsIndex = id->targetSlotId;
    }
H
Haojun Liao 已提交
2185 2186
  }

L
Liu Jicong 已提交
2187 2188 2189 2190 2191 2192 2193 2194 2195 2196 2197 2198 2199
  if (pTableScanNode->pSubtable != NULL) {
    SExprInfo* pSubTableExpr = taosMemoryCalloc(1, sizeof(SExprInfo));
    if (pSubTableExpr == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      goto _error;
    }
    pInfo->tbnameCalSup.pExprInfo = pSubTableExpr;
    createExprFromOneNode(pSubTableExpr, pTableScanNode->pSubtable, 0);
    if (initExprSupp(&pInfo->tbnameCalSup, pSubTableExpr, 1) != 0) {
      goto _error;
    }
  }

2200 2201 2202 2203 2204 2205 2206 2207 2208 2209 2210 2211 2212
  if (pTableScanNode->pTags != NULL) {
    int32_t    numOfTags;
    SExprInfo* pTagExpr = createExprInfo(pTableScanNode->pTags, NULL, &numOfTags);
    if (pTagExpr == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      goto _error;
    }
    if (initExprSupp(&pInfo->tagCalSup, pTagExpr, numOfTags) != 0) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      goto _error;
    }
  }

H
Haojun Liao 已提交
2213 2214
  pInfo->pBlockLists = taosArrayInit(4, POINTER_BYTES);
  if (pInfo->pBlockLists == NULL) {
2215 2216
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto _error;
H
Haojun Liao 已提交
2217 2218
  }

5
54liuyao 已提交
2219
  if (pHandle->vnode) {
L
Liu Jicong 已提交
2220
    SOperatorInfo*  pTableScanOp = createTableScanOperatorInfo(pTableScanNode, pHandle, pTaskInfo);
L
Liu Jicong 已提交
2221
    STableScanInfo* pTSInfo = (STableScanInfo*)pTableScanOp->info;
2222
    if (pHandle->version > 0) {
L
Liu Jicong 已提交
2223
      pTSInfo->cond.endVersion = pHandle->version;
2224
    }
L
Liu Jicong 已提交
2225 2226

    SArray* tableList = taosArrayGetP(pTaskInfo->tableqinfoList.pGroupList, 0);
2227
    if (pHandle->initTableReader) {
L
Liu Jicong 已提交
2228 2229 2230
      pTSInfo->scanMode = TABLE_SCAN__TABLE_ORDER;
      pTSInfo->dataReader = NULL;
      if (tsdbReaderOpen(pHandle->vnode, &pTSInfo->cond, tableList, &pTSInfo->dataReader, NULL) < 0) {
L
Liu Jicong 已提交
2231 2232
        ASSERT(0);
      }
L
Liu Jicong 已提交
2233 2234
    }

L
Liu Jicong 已提交
2235 2236 2237 2238
    if (pHandle->initTqReader) {
      ASSERT(pHandle->tqReader == NULL);
      pInfo->tqReader = tqOpenReader(pHandle->vnode);
      ASSERT(pInfo->tqReader);
2239
    } else {
L
Liu Jicong 已提交
2240 2241
      ASSERT(pHandle->tqReader);
      pInfo->tqReader = pHandle->tqReader;
2242 2243
    }

2244
    pInfo->pUpdateInfo = NULL;
2245
    pInfo->pTableScanOp = pTableScanOp;
2246 2247 2248
    if (pInfo->pTableScanOp->pTaskInfo->streamInfo.pState) {
      streamStateSetNumber(pInfo->pTableScanOp->pTaskInfo->streamInfo.pState, -1);
    }
L
Liu Jicong 已提交
2249

L
Liu Jicong 已提交
2250 2251
    pInfo->readHandle = *pHandle;
    pInfo->tableUid = pScanPhyNode->uid;
L
Liu Jicong 已提交
2252
    pTaskInfo->streamInfo.snapshotVer = pHandle->version;
L
Liu Jicong 已提交
2253

L
Liu Jicong 已提交
2254
    // set the extract column id to streamHandle
L
Liu Jicong 已提交
2255
    tqReaderSetColIdList(pInfo->tqReader, pColIds);
L
Liu Jicong 已提交
2256
    SArray* tableIdList = extractTableIdList(&pTaskInfo->tableqinfoList);
L
Liu Jicong 已提交
2257
    int32_t code = tqReaderSetTbUidList(pInfo->tqReader, tableIdList);
L
Liu Jicong 已提交
2258 2259 2260 2261 2262
    if (code != 0) {
      taosArrayDestroy(tableIdList);
      goto _error;
    }
    taosArrayDestroy(tableIdList);
L
Liu Jicong 已提交
2263
    memcpy(&pTaskInfo->streamInfo.tableCond, &pTSInfo->cond, sizeof(SQueryTableDataCond));
L
Liu Jicong 已提交
2264 2265
  } else {
    taosArrayDestroy(pColIds);
5
54liuyao 已提交
2266 2267
  }

2268 2269 2270 2271 2272
  // create the pseduo columns info
  if (pTableScanNode->scan.pScanPseudoCols != NULL) {
    pInfo->pPseudoExpr = createExprInfo(pTableScanNode->scan.pScanPseudoCols, NULL, &pInfo->numOfPseudoExpr);
  }

2273
  pInfo->pRes = createResDataBlock(pDescNode);
2274
  pInfo->pUpdateRes = createSpecialDataBlock(STREAM_CLEAR);
2275 2276
  pInfo->pCondition = pScanPhyNode->node.pConditions;
  pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
L
Liu Jicong 已提交
2277
  pInfo->windowSup = (SWindowSupporter){.pStreamAggSup = NULL, .gap = -1, .parentType = QUERY_NODE_PHYSICAL_PLAN};
2278
  pInfo->groupId = 0;
2279
  pInfo->pPullDataRes = createSpecialDataBlock(STREAM_RETRIEVE);
2280
  pInfo->pStreamScanOp = pOperator;
2281
  pInfo->deleteDataIndex = 0;
2282
  pInfo->pDeleteDataRes = createSpecialDataBlock(STREAM_DELETE_DATA);
5
54liuyao 已提交
2283
  pInfo->updateWin = (STimeWindow){.skey = INT64_MAX, .ekey = INT64_MAX};
2284
  pInfo->pUpdateDataRes = createSpecialDataBlock(STREAM_CLEAR);
X
Xiaoyu Wang 已提交
2285
  pInfo->assignBlockUid = pTableScanNode->assignBlockUid;
2286
  pInfo->partitionSup.needCalc = false;
L
Liu Jicong 已提交
2287

2288
  pOperator->name = "StreamScanOperator";
L
Liu Jicong 已提交
2289
  pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN;
2290 2291 2292
  pOperator->blocking = false;
  pOperator->status = OP_NOT_OPENED;
  pOperator->info = pInfo;
2293
  pOperator->exprSupp.numOfExprs = taosArrayGetSize(pInfo->pRes->pDataBlock);
2294
  pOperator->pTaskInfo = pTaskInfo;
H
Haojun Liao 已提交
2295

L
Liu Jicong 已提交
2296 2297 2298
  __optr_fn_t nextFn = pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM ? doStreamScan : doQueueScan;
  pOperator->fpSet =
      createOperatorFpSet(operatorDummyOpenFn, nextFn, NULL, NULL, destroyStreamScanOperatorInfo, NULL, NULL, NULL);
2299

H
Haojun Liao 已提交
2300
  return pOperator;
2301

L
Liu Jicong 已提交
2302
_error:
H
Haojun Liao 已提交
2303 2304 2305 2306 2307 2308 2309 2310
  if (pColIds != NULL) {
    taosArrayDestroy(pColIds);
  }

  if (pInfo != NULL) {
    destroyStreamScanOperatorInfo(pInfo);
  }

2311 2312
  taosMemoryFreeClear(pOperator);
  return NULL;
H
Haojun Liao 已提交
2313 2314
}

2315
static void destroySysScanOperator(void* param) {
H
Haojun Liao 已提交
2316 2317 2318 2319
  SSysTableScanInfo* pInfo = (SSysTableScanInfo*)param;
  tsem_destroy(&pInfo->ready);
  blockDataDestroy(pInfo->pRes);

2320
  const char* name = tNameGetTableName(&pInfo->name);
D
dapan1121 已提交
2321 2322
  if (strncasecmp(name, TSDB_INS_TABLE_TABLES, TSDB_TABLE_FNAME_LEN) == 0 ||
      strncasecmp(name, TSDB_INS_TABLE_TAGS, TSDB_TABLE_FNAME_LEN) == 0 || pInfo->pCur != NULL) {
H
Haojun Liao 已提交
2323
    metaCloseTbCursor(pInfo->pCur);
2324
    pInfo->pCur = NULL;
H
Haojun Liao 已提交
2325
  }
H
Haojun Liao 已提交
2326 2327

  taosArrayDestroy(pInfo->scanCols);
2328
  taosMemoryFreeClear(pInfo->pUser);
D
dapan1121 已提交
2329 2330

  taosMemoryFreeClear(param);
H
Haojun Liao 已提交
2331 2332
}

X
Xiaoyu Wang 已提交
2333
static int32_t getSysTableDbNameColId(const char* pTable) {
D
dapan1121 已提交
2334
  // if (0 == strcmp(TSDB_INS_TABLE_INDEXES, pTable)) {
X
Xiaoyu Wang 已提交
2335 2336
  //   return 1;
  // }
X
Xiaoyu Wang 已提交
2337 2338 2339
  return TSDB_INS_USER_STABLES_DBNAME_COLID;
}

H
Haojun Liao 已提交
2340 2341 2342 2343 2344 2345 2346 2347 2348 2349 2350 2351 2352 2353 2354 2355 2356 2357 2358 2359 2360
// Expression-walker callback that looks for a `<db name column> = <value>` condition.
//
// pContext plays two roles: while walking, its first int32_t is a small state value
//   0 - nothing matched, 1 - inside an '=' operator, 2 - the db-name column was just seen;
// once the value node following the column is reached, the value is copied to pContext as a
// NUL-terminated string and the walk stops.
// NOTE(review): nothing here bounds the copy by the size of the caller's buffer - confirm the
// caller always provides enough room.
EDealRes getDBNameFromConditionWalker(SNode* pNode, void* pContext) {
  int32_t*  pState = (int32_t*)pContext;
  ENodeType type = nodeType(pNode);

  if (type == QUERY_NODE_OPERATOR) {
    SOperatorNode* pOp = (SOperatorNode*)pNode;
    if (pOp->opType == OP_TYPE_EQUAL) {
      *pState = 1;
      return DEAL_RES_CONTINUE;
    }

    // any other operator: reset the state and do not descend into its operands
    *pState = 0;
    return DEAL_RES_IGNORE_CHILD;
  }

  if (type == QUERY_NODE_COLUMN) {
    if (*pState == 1) {
      SColumnNode* pCol = (SColumnNode*)pNode;
      *pState = (getSysTableDbNameColId(pCol->tableName) == pCol->colId) ? 2 : 0;
    }
    return DEAL_RES_CONTINUE;
  }

  if (type == QUERY_NODE_VALUE) {
    if (*pState != 2) {
      return DEAL_RES_CONTINUE;
    }

    SValueNode* pVal = (SValueNode*)pNode;
    char*       pName = nodesGetValueFromNode(pVal);
    strncpy(pContext, varDataVal(pName), varDataLen(pName));
    *((char*)pContext + varDataLen(pName)) = 0;
    return DEAL_RES_END;  // stop walk
  }

  return DEAL_RES_CONTINUE;
}

2386
// Walks pCondition with getDBNameFromConditionWalker, passing dbName as the walker context.
// Although dbName is declared const, the walker writes through it (the const is cast away), so
// the caller must pass a writable buffer. A NULL condition is a no-op.
static void getDBNameFromCondition(SNode* pCondition, const char* dbName) {
  if (pCondition != NULL) {
    nodesWalkExpr(pCondition, getDBNameFromConditionWalker, (char*)dbName);
  }
}

D
dapan1121 已提交
2393
// Completion callback for a system-table retrieve request.
//
// param  the SOperatorInfo that issued the request
// pMsg   response buffer; on success its pData is stored as the scan's pRsp
// code   result of the request; a failure is recorded in the task's code
//
// On success the numeric header fields of the response are byte-swapped in place with
// htonl/htobe64. In both cases the `ready` semaphore is posted. Always returns TSDB_CODE_SUCCESS.
static int32_t loadSysTableCallback(void* param, SDataBuf* pMsg, int32_t code) {
  SOperatorInfo*     pOperator = (SOperatorInfo*)param;
  SSysTableScanInfo* pScanInfo = (SSysTableScanInfo*)pOperator->info;

  if (code != TSDB_CODE_SUCCESS) {
    pOperator->pTaskInfo->code = code;
  } else {
    pScanInfo->pRsp = pMsg->pData;

    SRetrieveMetaTableRsp* pRetrieveRsp = pScanInfo->pRsp;
    pRetrieveRsp->numOfRows = htonl(pRetrieveRsp->numOfRows);
    pRetrieveRsp->useconds = htobe64(pRetrieveRsp->useconds);
    pRetrieveRsp->handle = htobe64(pRetrieveRsp->handle);
    pRetrieveRsp->compLen = htonl(pRetrieveRsp->compLen);
  }

  tsem_post(&pScanInfo->ready);
  return TSDB_CODE_SUCCESS;
}

// Applies the scan condition (if there is one) to pInfo->pRes.
// Returns pInfo->pRes, or NULL when the block holds no rows afterwards.
static SSDataBlock* doFilterResult(SSysTableScanInfo* pInfo) {
  if (pInfo->pCondition != NULL) {
    doFilter(pInfo->pCondition, pInfo->pRes, NULL, NULL);
  }

  SSDataBlock* pResult = pInfo->pRes;
  return (pResult->info.rows != 0) ? pResult : NULL;
}

2421
static SSDataBlock* buildInfoSchemaTableMetaBlock(char* tableName) {
L
Liu Jicong 已提交
2422 2423
  size_t               size = 0;
  const SSysTableMeta* pMeta = NULL;
2424 2425 2426
  getInfosDbMeta(&pMeta, &size);

  int32_t index = 0;
L
Liu Jicong 已提交
2427
  for (int32_t i = 0; i < size; ++i) {
2428
    if (strcmp(pMeta[i].name, tableName) == 0) {
2429 2430 2431 2432
      index = i;
      break;
    }
  }
2433

2434
  SSDataBlock* pBlock = createDataBlock();
L
Liu Jicong 已提交
2435
  for (int32_t i = 0; i < pMeta[index].colNum; ++i) {
L
Liu Jicong 已提交
2436 2437
    SColumnInfoData colInfoData =
        createColumnInfoData(pMeta[index].schema[i].type, pMeta[index].schema[i].bytes, i + 1);
2438
    blockDataAppendColInfo(pBlock, &colInfoData);
2439 2440
  }

2441 2442 2443
  return pBlock;
}

2444 2445 2446 2447 2448 2449 2450 2451 2452 2453 2454 2455 2456 2457 2458 2459 2460 2461 2462 2463 2464 2465 2466 2467 2468 2469 2470 2471 2472 2473 2474 2475 2476 2477 2478 2479 2480 2481 2482 2483 2484 2485 2486 2487 2488 2489 2490 2491 2492 2493 2494 2495 2496 2497 2498 2499 2500 2501 2502 2503 2504 2505 2506 2507 2508 2509 2510 2511 2512 2513 2514 2515 2516 2517 2518 2519 2520 2521 2522 2523 2524
// Renders one tag value as text.
//
// str      output buffer; the caller must make it large enough, no bound is checked here
// type     TSDB_DATA_TYPE_* of the value
// buf      raw value
// bufSize  size of buf in bytes; only used for BINARY and NCHAR values
// len      if not NULL, receives the number of bytes written to str
//
// BINARY values are copied verbatim and NCHAR values are converted with taosUcs4ToMbs; neither
// result is NUL-terminated by this function.
// Returns TSDB_CODE_SUCCESS, or TSDB_CODE_TSC_INVALID_VALUE for an unknown type, a negative
// bufSize, or an NCHAR value whose conversion yields no output.
int32_t convertTagDataToStr(char* str, int type, void* buf, int32_t bufSize, int32_t* len) {
  int32_t written = 0;

  switch (type) {
    case TSDB_DATA_TYPE_NULL: {
      written = sprintf(str, "null");
      break;
    }

    case TSDB_DATA_TYPE_BOOL: {
      int8_t flag = *(int8_t*)buf;
      written = sprintf(str, flag ? "true" : "false");
      break;
    }

    // signed integers
    case TSDB_DATA_TYPE_TINYINT: {
      written = sprintf(str, "%d", *(int8_t*)buf);
      break;
    }
    case TSDB_DATA_TYPE_SMALLINT: {
      written = sprintf(str, "%d", *(int16_t*)buf);
      break;
    }
    case TSDB_DATA_TYPE_INT: {
      written = sprintf(str, "%d", *(int32_t*)buf);
      break;
    }
    case TSDB_DATA_TYPE_BIGINT:
    case TSDB_DATA_TYPE_TIMESTAMP: {
      written = sprintf(str, "%" PRId64, *(int64_t*)buf);
      break;
    }

    // unsigned integers
    case TSDB_DATA_TYPE_UTINYINT: {
      written = sprintf(str, "%u", *(uint8_t*)buf);
      break;
    }
    case TSDB_DATA_TYPE_USMALLINT: {
      written = sprintf(str, "%u", *(uint16_t*)buf);
      break;
    }
    case TSDB_DATA_TYPE_UINT: {
      written = sprintf(str, "%u", *(uint32_t*)buf);
      break;
    }
    case TSDB_DATA_TYPE_UBIGINT: {
      written = sprintf(str, "%" PRIu64, *(uint64_t*)buf);
      break;
    }

    // floating point
    case TSDB_DATA_TYPE_FLOAT: {
      written = sprintf(str, "%.5f", GET_FLOAT_VAL(buf));
      break;
    }
    case TSDB_DATA_TYPE_DOUBLE: {
      written = sprintf(str, "%.9f", GET_DOUBLE_VAL(buf));
      break;
    }

    // variable-length values
    case TSDB_DATA_TYPE_BINARY: {
      if (bufSize < 0) {
        return TSDB_CODE_TSC_INVALID_VALUE;
      }
      memcpy(str, buf, bufSize);
      written = bufSize;
      break;
    }
    case TSDB_DATA_TYPE_NCHAR: {
      if (bufSize < 0) {
        return TSDB_CODE_TSC_INVALID_VALUE;
      }
      int32_t mbsLen = taosUcs4ToMbs((TdUcs4*)buf, bufSize, str);
      if (mbsLen <= 0) {
        return TSDB_CODE_TSC_INVALID_VALUE;
      }
      written = mbsLen;
      break;
    }

    default:
      return TSDB_CODE_TSC_INVALID_VALUE;
  }

  if (len != NULL) {
    *len = written;
  }

  return TSDB_CODE_SUCCESS;
}

2525 2526 2527 2528 2529 2530 2531
// Tests whether pCond (an operator node) has the form `table_name = '<literal>'`.
//
// pCond      must point to an SOperatorNode
// condTable  output buffer of at least TSDB_TABLE_NAME_LEN bytes; on a match it receives the
//            literal as a NUL-terminated string, truncated to TSDB_TABLE_NAME_LEN - 1 bytes
//
// Returns true only when the condition matched and condTable was filled in.
static bool sysTableIsOperatorCondOnOneTable(SNode* pCond, char* condTable) {
  SOperatorNode* node = (SOperatorNode*)pCond;
  if (node->opType != OP_TYPE_EQUAL) {
    return false;
  }

  if (nodeType(node->pLeft) != QUERY_NODE_COLUMN ||
      strcasecmp(nodesGetNameFromColumnNode(node->pLeft), "table_name") != 0 ||
      nodeType(node->pRight) != QUERY_NODE_VALUE) {
    return false;
  }

  SValueNode* pValue = (SValueNode*)node->pRight;
  if (pValue->node.resType.type != TSDB_DATA_TYPE_NCHAR && pValue->node.resType.type != TSDB_DATA_TYPE_VARCHAR &&
      pValue->node.resType.type != TSDB_DATA_TYPE_BINARY) {
    return false;
  }

  char* value = nodesGetValueFromNode(pValue);
  if (value == NULL) {
    return false;
  }

  // The literal is a length-prefixed var string: copy exactly the bytes it holds (never reading
  // past them), cap the copy so it fits the output buffer, and always terminate the result.
  int32_t len = (int32_t)varDataLen(value);
  if (len < 0) {
    len = 0;
  }
  if (len > TSDB_TABLE_NAME_LEN - 1) {
    len = TSDB_TABLE_NAME_LEN - 1;
  }
  memcpy(condTable, varDataVal(value), (size_t)len);
  condTable[len] = '\0';
  return true;
}

// Tests whether pCond restricts the scan to one table through `table_name = '<literal>'`.
// Accepted shapes: a single operator node, or an AND condition in which at least one direct
// operand is such an operator. On a match the table name is stored in condTable.
static bool sysTableIsCondOnOneTable(SNode* pCond, char* condTable) {
  if (pCond == NULL) {
    return false;
  }

  switch (nodeType(pCond)) {
    case QUERY_NODE_LOGIC_CONDITION: {
      SLogicConditionNode* pLogic = (SLogicConditionNode*)pCond;
      if (pLogic->condType != LOGIC_COND_TYPE_AND) {
        return false;
      }

      // the first matching operand wins
      SNode* pOperand = NULL;
      FOREACH(pOperand, pLogic->pParameterList) {
        if (nodeType(pOperand) == QUERY_NODE_OPERATOR && sysTableIsOperatorCondOnOneTable(pOperand, condTable)) {
          return true;
        }
      }
      return false;
    }

    case QUERY_NODE_OPERATOR:
      return sysTableIsOperatorCondOnOneTable(pCond, condTable);

    default:
      return false;
  }
}

S
shenglian zhou 已提交
2566 2567 2568 2569 2570 2571 2572 2573 2574 2575
// Produces the next result block of a scan on the ins_tags system table: one row per
// (child table, tag) pair.
//
// Two modes:
//  - if the condition pins a single table (`table_name = '...'`), only that table is looked up,
//    its tags are emitted and the operator is marked completed;
//  - otherwise the meta table cursor is advanced over all tables; rows are buffered in a
//    temporary block and moved to pInfo->pRes (filtered by the condition) once at least
//    resultInfo.capacity rows have been collected or the cursor is exhausted.
//
// Returns pInfo->pRes, or NULL when it holds no rows. The temporary block and every meta reader
// opened here are released on all exit paths, including the error paths.
//
// NOTE(review): numOfRows is compared with the capacity only after a table's tags have been
// appended, so the temporary block can receive more rows than the capacity it was sized for -
// confirm that colDataAppend tolerates this.
static SSDataBlock* sysTableScanUserTags(SOperatorInfo* pOperator) {
  SExecTaskInfo*     pTaskInfo = pOperator->pTaskInfo;
  SSysTableScanInfo* pInfo = pOperator->info;
  if (pOperator->status == OP_EXEC_DONE) {
    return NULL;
  }

  blockDataCleanup(pInfo->pRes);
  int32_t numOfRows = 0;

  // temporary block with the full ins_tags schema; owned by this call
  SSDataBlock* dataBlock = buildInfoSchemaTableMetaBlock(TSDB_INS_TABLE_TAGS);
  blockDataEnsureCapacity(dataBlock, pOperator->resultInfo.capacity);

  const char* db = NULL;
  int32_t     vgId = 0;
  vnodeGetInfo(pInfo->readHandle.vnode, &db, &vgId);

  // database name as a var string
  SName sn = {0};
  char  dbname[TSDB_DB_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
  tNameFromString(&sn, db, T_NAME_ACCT | T_NAME_DB);

  tNameGetDbName(&sn, varDataVal(dbname));
  varDataSetLen(dbname, strlen(varDataVal(dbname)));

  char condTableName[TSDB_TABLE_NAME_LEN] = {0};
  // optimize when sql like where table_name='tablename' and xxx.
  if (pInfo->pCondition && sysTableIsCondOnOneTable(pInfo->pCondition, condTableName)) {
    char tableName[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
    STR_TO_VARSTR(tableName, condTableName);

    SMetaReader smrChildTable = {0};
    metaReaderInit(&smrChildTable, pInfo->readHandle.meta, 0);
    int32_t code = metaGetTableEntryByName(&smrChildTable, condTableName);
    if (code != TSDB_CODE_SUCCESS) {
      // terrno has been set by metaGetTableEntryByName; release what was acquired and return
      metaReaderClear(&smrChildTable);
      blockDataDestroy(dataBlock);
      return NULL;
    }

    // only child tables carry tag values
    if (smrChildTable.me.type != TSDB_CHILD_TABLE) {
      metaReaderClear(&smrChildTable);
      blockDataDestroy(dataBlock);
      pInfo->loadInfo.totalRows = 0;
      return NULL;
    }

    SMetaReader smrSuperTable = {0};
    metaReaderInit(&smrSuperTable, pInfo->readHandle.meta, META_READER_NOLOCK);
    code = metaGetTableEntryByUid(&smrSuperTable, smrChildTable.me.ctbEntry.suid);
    if (code != TSDB_CODE_SUCCESS) {
      // terrno has been set by metaGetTableEntryByUid; release what was acquired and return
      metaReaderClear(&smrSuperTable);
      metaReaderClear(&smrChildTable);
      blockDataDestroy(dataBlock);
      return NULL;
    }

    sysTableUserTagsFillOneTableTags(pInfo, &smrSuperTable, &smrChildTable, dbname, tableName, &numOfRows, dataBlock);
    metaReaderClear(&smrSuperTable);
    metaReaderClear(&smrChildTable);
    if (numOfRows > 0) {
      relocateAndFilterSysTagsScanResult(pInfo, numOfRows, dataBlock);
      numOfRows = 0;
    }
    blockDataDestroy(dataBlock);
    pInfo->loadInfo.totalRows += pInfo->pRes->info.rows;
    doSetOperatorCompleted(pOperator);
    return (pInfo->pRes->info.rows == 0) ? NULL : pInfo->pRes;
  }

  int32_t ret = 0;
  if (pInfo->pCur == NULL) {
    pInfo->pCur = metaOpenTbCursor(pInfo->readHandle.meta);
  }

  while ((ret = metaTbCursorNext(pInfo->pCur)) == 0) {
    if (pInfo->pCur->mr.me.type != TSDB_CHILD_TABLE) {
      continue;
    }

    char tableName[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
    STR_TO_VARSTR(tableName, pInfo->pCur->mr.me.name);

    SMetaReader smrSuperTable = {0};
    metaReaderInit(&smrSuperTable, pInfo->readHandle.meta, 0);
    uint64_t suid = pInfo->pCur->mr.me.ctbEntry.suid;
    int32_t  code = metaGetTableEntryByUid(&smrSuperTable, suid);
    if (code != TSDB_CODE_SUCCESS) {
      qError("failed to get super table meta, uid:0x%" PRIx64 ", code:%s, %s", suid, tstrerror(terrno),
             GET_TASKID(pTaskInfo));
      metaReaderClear(&smrSuperTable);
      metaCloseTbCursor(pInfo->pCur);
      pInfo->pCur = NULL;
      // the long jump does not return here, so the temporary block must be released first
      blockDataDestroy(dataBlock);
      T_LONG_JMP(pTaskInfo->env, terrno);
    }

    sysTableUserTagsFillOneTableTags(pInfo, &smrSuperTable, &pInfo->pCur->mr, dbname, tableName, &numOfRows, dataBlock);

    metaReaderClear(&smrSuperTable);

    if (numOfRows >= pOperator->resultInfo.capacity) {
      relocateAndFilterSysTagsScanResult(pInfo, numOfRows, dataBlock);
      numOfRows = 0;

      // hand the batch to the caller unless the filter removed every row
      if (pInfo->pRes->info.rows > 0) {
        break;
      }
    }
  }

  // flush the rows collected since the last full batch
  if (numOfRows > 0) {
    relocateAndFilterSysTagsScanResult(pInfo, numOfRows, dataBlock);
    numOfRows = 0;
  }

  blockDataDestroy(dataBlock);
  if (ret != 0) {
    // cursor exhausted: this was the last block
    metaCloseTbCursor(pInfo->pCur);
    pInfo->pCur = NULL;
    doSetOperatorCompleted(pOperator);
  }

  pInfo->loadInfo.totalRows += pInfo->pRes->info.rows;
  return (pInfo->pRes->info.rows == 0) ? NULL : pInfo->pRes;
}

2688 2689 2690 2691 2692 2693 2694 2695 2696 2697
// Moves the numOfRows rows buffered in dataBlock into pInfo->pRes (column placement follows
// pInfo->scanCols), applies the scan condition to pInfo->pRes, and then resets dataBlock so it
// can be refilled.
static void relocateAndFilterSysTagsScanResult(SSysTableScanInfo* pInfo, int32_t numOfRows, SSDataBlock* dataBlock) {
  SSDataBlock* pTarget = pInfo->pRes;

  pTarget->info.rows = numOfRows;
  dataBlock->info.rows = numOfRows;
  relocateColumnData(pTarget, pInfo->scanCols, dataBlock->pDataBlock, false);

  doFilterResult(pInfo);

  blockDataCleanup(dataBlock);
}

2698 2699 2700
// Appends one row per tag of a child table to dataBlock (layout of the ins_tags table):
//   col 0 table name, col 1 db name, col 2 super table name, col 3 tag name, col 4 tag type,
//   col 5 tag value rendered as text (NULL if the tag has no value or cannot be rendered).
//
// pInfo          not used by this function
// smrSuperTable  meta reader positioned on the super table (provides the tag schema)
// smrChildTable  meta reader positioned on the child table (provides the tag values)
// dbname         database name as a var string
// tableName      child table name as a var string
// pNumOfRows     in: index of the first row to write; out: index after the last row written
// dataBlock      destination block
//
// Returns TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY (terrno is set as well) if a value buffer
// cannot be allocated. In the failure case *pNumOfRows is left untouched, so the caller sees
// either all rows of this table or none of them.
static int32_t sysTableUserTagsFillOneTableTags(const SSysTableScanInfo* pInfo, SMetaReader* smrSuperTable,
                                                SMetaReader* smrChildTable, const char* dbname, const char* tableName,
                                                int32_t* pNumOfRows, const SSDataBlock* dataBlock) {
  char stableName[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
  STR_TO_VARSTR(stableName, (*smrSuperTable).me.name);

  int32_t numOfRows = *pNumOfRows;

  int32_t numOfTags = (*smrSuperTable).me.stbEntry.schemaTag.nCols;
  for (int32_t i = 0; i < numOfTags; ++i) {
    SColumnInfoData* pColInfoData = NULL;

    // table name
    pColInfoData = taosArrayGet(dataBlock->pDataBlock, 0);
    colDataAppend(pColInfoData, numOfRows, tableName, false);

    // database name
    pColInfoData = taosArrayGet(dataBlock->pDataBlock, 1);
    colDataAppend(pColInfoData, numOfRows, dbname, false);

    // super table name
    pColInfoData = taosArrayGet(dataBlock->pDataBlock, 2);
    colDataAppend(pColInfoData, numOfRows, stableName, false);

    // tag name
    char tagName[TSDB_COL_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
    STR_TO_VARSTR(tagName, (*smrSuperTable).me.stbEntry.schemaTag.pSchema[i].name);
    pColInfoData = taosArrayGet(dataBlock->pDataBlock, 3);
    colDataAppend(pColInfoData, numOfRows, tagName, false);

    // tag type: the type name, followed by "(length)" for VARCHAR and NCHAR tags
    int8_t tagType = (*smrSuperTable).me.stbEntry.schemaTag.pSchema[i].type;
    pColInfoData = taosArrayGet(dataBlock->pDataBlock, 4);
    char tagTypeStr[VARSTR_HEADER_SIZE + 32];
    int  tagTypeLen = sprintf(varDataVal(tagTypeStr), "%s", tDataTypes[tagType].name);
    if (tagType == TSDB_DATA_TYPE_VARCHAR) {
      tagTypeLen += sprintf(varDataVal(tagTypeStr) + tagTypeLen, "(%d)",
                            (int32_t)((*smrSuperTable).me.stbEntry.schemaTag.pSchema[i].bytes - VARSTR_HEADER_SIZE));
    } else if (tagType == TSDB_DATA_TYPE_NCHAR) {
      tagTypeLen += sprintf(
          varDataVal(tagTypeStr) + tagTypeLen, "(%d)",
          (int32_t)(((*smrSuperTable).me.stbEntry.schemaTag.pSchema[i].bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE));
    }
    varDataSetLen(tagTypeStr, tagTypeLen);
    colDataAppend(pColInfoData, numOfRows, (char*)tagTypeStr, false);

    // locate the raw tag value in the child table entry
    STagVal tagVal = {0};
    tagVal.cid = (*smrSuperTable).me.stbEntry.schemaTag.pSchema[i].colId;
    char*    tagData = NULL;
    uint32_t tagLen = 0;

    if (tagType == TSDB_DATA_TYPE_JSON) {
      tagData = (char*)smrChildTable->me.ctbEntry.pTags;
    } else {
      bool exist = tTagGet((STag*)smrChildTable->me.ctbEntry.pTags, &tagVal);
      if (exist) {
        if (IS_VAR_DATA_TYPE(tagType)) {
          tagData = (char*)tagVal.pData;
          tagLen = tagVal.nData;
        } else {
          tagData = (char*)&tagVal.i64;
          tagLen = tDataTypes[tagType].bytes;
        }
      }
    }

    // render the value as a var string; tagVarChar stays NULL when there is nothing to show
    char* tagVarChar = NULL;
    if (tagData != NULL) {
      if (tagType == TSDB_DATA_TYPE_JSON) {
        char* tagJson = parseTagDatatoJson(tagData);
        // NOTE(review): a NULL result is treated as "no printable value" - confirm that is the
        // intended meaning of a NULL return from parseTagDatatoJson
        if (tagJson != NULL) {
          size_t jsonLen = strlen(tagJson);
          tagVarChar = taosMemoryMalloc(jsonLen + VARSTR_HEADER_SIZE);
          if (tagVarChar == NULL) {
            taosMemoryFree(tagJson);
            terrno = TSDB_CODE_OUT_OF_MEMORY;
            return TSDB_CODE_OUT_OF_MEMORY;
          }
          memcpy(varDataVal(tagVarChar), tagJson, jsonLen);
          varDataSetLen(tagVarChar, jsonLen);
          taosMemoryFree(tagJson);
        }
      } else {
        int32_t bufSize = IS_VAR_DATA_TYPE(tagType) ? (tagLen + VARSTR_HEADER_SIZE)
                                                    : (3 + DBL_MANT_DIG - DBL_MIN_EXP + VARSTR_HEADER_SIZE);
        tagVarChar = taosMemoryMalloc(bufSize);
        if (tagVarChar == NULL) {
          terrno = TSDB_CODE_OUT_OF_MEMORY;
          return TSDB_CODE_OUT_OF_MEMORY;
        }

        int32_t len = -1;
        int32_t code = convertTagDataToStr(varDataVal(tagVarChar), tagType, tagData, tagLen, &len);
        if (code != TSDB_CODE_SUCCESS || len < 0) {
          // the value cannot be rendered: report it as NULL rather than storing a bogus length
          taosMemoryFreeClear(tagVarChar);
        } else {
          varDataSetLen(tagVarChar, len);
        }
      }
    }

    // tagVarChar is NULL exactly when there is no value or it could not be rendered
    pColInfoData = taosArrayGet(dataBlock->pDataBlock, 5);
    colDataAppend(pColInfoData, numOfRows, tagVarChar,
                  (tagVarChar == NULL) || (tagType == TSDB_DATA_TYPE_JSON && tTagIsJsonNull(tagData)));
    taosMemoryFree(tagVarChar);
    ++numOfRows;
  }

  *pNumOfRows = numOfRows;

  return TSDB_CODE_SUCCESS;
}

2793
static SSDataBlock* sysTableScanUserTables(SOperatorInfo* pOperator) {
H
Haojun Liao 已提交
2794 2795
  SExecTaskInfo*     pTaskInfo = pOperator->pTaskInfo;
  SSysTableScanInfo* pInfo = pOperator->info;
2796 2797 2798
  if (pOperator->status == OP_EXEC_DONE) {
    return NULL;
  }
H
Haojun Liao 已提交
2799

2800 2801 2802 2803 2804 2805 2806 2807 2808 2809 2810 2811
  // the retrieve is executed on the mnode, so return tables that belongs to the information schema database.
  if (pInfo->readHandle.mnd != NULL) {
    buildSysDbTableInfo(pInfo, pOperator->resultInfo.capacity);

    doFilterResult(pInfo);
    pInfo->loadInfo.totalRows += pInfo->pRes->info.rows;

    doSetOperatorCompleted(pOperator);
    return (pInfo->pRes->info.rows == 0) ? NULL : pInfo->pRes;
  } else {
    if (pInfo->pCur == NULL) {
      pInfo->pCur = metaOpenTbCursor(pInfo->readHandle.meta);
2812 2813
    }

2814 2815
    blockDataCleanup(pInfo->pRes);
    int32_t numOfRows = 0;
2816

2817 2818 2819
    const char* db = NULL;
    int32_t     vgId = 0;
    vnodeGetInfo(pInfo->readHandle.vnode, &db, &vgId);
2820

2821 2822 2823
    SName sn = {0};
    char  dbname[TSDB_DB_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
    tNameFromString(&sn, db, T_NAME_ACCT | T_NAME_DB);
2824

2825 2826
    tNameGetDbName(&sn, varDataVal(dbname));
    varDataSetLen(dbname, strlen(varDataVal(dbname)));
2827

D
dapan1121 已提交
2828
    SSDataBlock* p = buildInfoSchemaTableMetaBlock(TSDB_INS_TABLE_TABLES);
2829
    blockDataEnsureCapacity(p, pOperator->resultInfo.capacity);
2830

2831
    char n[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
2832

2833 2834 2835
    int32_t ret = 0;
    while ((ret = metaTbCursorNext(pInfo->pCur)) == 0) {
      STR_TO_VARSTR(n, pInfo->pCur->mr.me.name);
2836

2837 2838 2839
      // table name
      SColumnInfoData* pColInfoData = taosArrayGet(p->pDataBlock, 0);
      colDataAppend(pColInfoData, numOfRows, n, false);
2840

2841 2842 2843 2844 2845 2846 2847 2848 2849 2850 2851 2852 2853 2854 2855 2856
      // database name
      pColInfoData = taosArrayGet(p->pDataBlock, 1);
      colDataAppend(pColInfoData, numOfRows, dbname, false);

      // vgId
      pColInfoData = taosArrayGet(p->pDataBlock, 6);
      colDataAppend(pColInfoData, numOfRows, (char*)&vgId, false);

      int32_t tableType = pInfo->pCur->mr.me.type;
      if (tableType == TSDB_CHILD_TABLE) {
        // create time
        int64_t ts = pInfo->pCur->mr.me.ctbEntry.ctime;
        pColInfoData = taosArrayGet(p->pDataBlock, 2);
        colDataAppend(pColInfoData, numOfRows, (char*)&ts, false);

        SMetaReader mr = {0};
2857
        metaReaderInit(&mr, pInfo->readHandle.meta, META_READER_NOLOCK);
2858 2859 2860 2861

        uint64_t suid = pInfo->pCur->mr.me.ctbEntry.suid;
        int32_t  code = metaGetTableEntryByUid(&mr, suid);
        if (code != TSDB_CODE_SUCCESS) {
H
Hongze Cheng 已提交
2862 2863
          qError("failed to get super table meta, cname:%s, suid:0x%" PRIx64 ", code:%s, %s", pInfo->pCur->mr.me.name,
                 suid, tstrerror(terrno), GET_TASKID(pTaskInfo));
2864 2865 2866
          metaReaderClear(&mr);
          metaCloseTbCursor(pInfo->pCur);
          pInfo->pCur = NULL;
2867
          T_LONG_JMP(pTaskInfo->env, terrno);
2868
        }
2869

2870 2871 2872
        // number of columns
        pColInfoData = taosArrayGet(p->pDataBlock, 3);
        colDataAppend(pColInfoData, numOfRows, (char*)&mr.me.stbEntry.schemaRow.nCols, false);
2873

2874 2875 2876
        // super table name
        STR_TO_VARSTR(n, mr.me.name);
        pColInfoData = taosArrayGet(p->pDataBlock, 4);
2877
        colDataAppend(pColInfoData, numOfRows, n, false);
2878
        metaReaderClear(&mr);
2879

2880 2881 2882 2883 2884 2885 2886 2887 2888 2889 2890
        // table comment
        pColInfoData = taosArrayGet(p->pDataBlock, 8);
        if (pInfo->pCur->mr.me.ctbEntry.commentLen > 0) {
          char comment[TSDB_TB_COMMENT_LEN + VARSTR_HEADER_SIZE] = {0};
          STR_TO_VARSTR(comment, pInfo->pCur->mr.me.ctbEntry.comment);
          colDataAppend(pColInfoData, numOfRows, comment, false);
        } else if (pInfo->pCur->mr.me.ctbEntry.commentLen == 0) {
          char comment[VARSTR_HEADER_SIZE + VARSTR_HEADER_SIZE] = {0};
          STR_TO_VARSTR(comment, "");
          colDataAppend(pColInfoData, numOfRows, comment, false);
        } else {
2891 2892
          colDataAppendNULL(pColInfoData, numOfRows);
        }
2893

2894 2895 2896 2897 2898 2899 2900 2901 2902 2903 2904 2905 2906 2907 2908 2909 2910 2911 2912 2913 2914
        // uid
        pColInfoData = taosArrayGet(p->pDataBlock, 5);
        colDataAppend(pColInfoData, numOfRows, (char*)&pInfo->pCur->mr.me.uid, false);

        // ttl
        pColInfoData = taosArrayGet(p->pDataBlock, 7);
        colDataAppend(pColInfoData, numOfRows, (char*)&pInfo->pCur->mr.me.ctbEntry.ttlDays, false);

        STR_TO_VARSTR(n, "CHILD_TABLE");
      } else if (tableType == TSDB_NORMAL_TABLE) {
        // create time
        pColInfoData = taosArrayGet(p->pDataBlock, 2);
        colDataAppend(pColInfoData, numOfRows, (char*)&pInfo->pCur->mr.me.ntbEntry.ctime, false);

        // number of columns
        pColInfoData = taosArrayGet(p->pDataBlock, 3);
        colDataAppend(pColInfoData, numOfRows, (char*)&pInfo->pCur->mr.me.ntbEntry.schemaRow.nCols, false);

        // super table name
        pColInfoData = taosArrayGet(p->pDataBlock, 4);
        colDataAppendNULL(pColInfoData, numOfRows);
2915

2916 2917 2918 2919 2920 2921 2922 2923 2924 2925 2926 2927
        // table comment
        pColInfoData = taosArrayGet(p->pDataBlock, 8);
        if (pInfo->pCur->mr.me.ntbEntry.commentLen > 0) {
          char comment[TSDB_TB_COMMENT_LEN + VARSTR_HEADER_SIZE] = {0};
          STR_TO_VARSTR(comment, pInfo->pCur->mr.me.ntbEntry.comment);
          colDataAppend(pColInfoData, numOfRows, comment, false);
        } else if (pInfo->pCur->mr.me.ntbEntry.commentLen == 0) {
          char comment[VARSTR_HEADER_SIZE + VARSTR_HEADER_SIZE] = {0};
          STR_TO_VARSTR(comment, "");
          colDataAppend(pColInfoData, numOfRows, comment, false);
        } else {
          colDataAppendNULL(pColInfoData, numOfRows);
2928
        }
2929 2930 2931 2932 2933 2934 2935 2936 2937 2938

        // uid
        pColInfoData = taosArrayGet(p->pDataBlock, 5);
        colDataAppend(pColInfoData, numOfRows, (char*)&pInfo->pCur->mr.me.uid, false);

        // ttl
        pColInfoData = taosArrayGet(p->pDataBlock, 7);
        colDataAppend(pColInfoData, numOfRows, (char*)&pInfo->pCur->mr.me.ntbEntry.ttlDays, false);

        STR_TO_VARSTR(n, "NORMAL_TABLE");
H
Haojun Liao 已提交
2939 2940
      }

2941 2942 2943 2944
      pColInfoData = taosArrayGet(p->pDataBlock, 9);
      colDataAppend(pColInfoData, numOfRows, n, false);

      if (++numOfRows >= pOperator->resultInfo.capacity) {
2945 2946 2947 2948 2949 2950 2951
        p->info.rows = numOfRows;
        pInfo->pRes->info.rows = numOfRows;

        relocateColumnData(pInfo->pRes, pInfo->scanCols, p->pDataBlock, false);
        doFilterResult(pInfo);

        blockDataCleanup(p);
2952 2953
        numOfRows = 0;

2954
        if (pInfo->pRes->info.rows > 0) {
2955
          break;
2956
        }
2957
      }
2958
    }
2959 2960 2961 2962 2963 2964 2965 2966 2967 2968 2969 2970

    if (numOfRows > 0) {
      p->info.rows = numOfRows;
      pInfo->pRes->info.rows = numOfRows;

      relocateColumnData(pInfo->pRes, pInfo->scanCols, p->pDataBlock, false);
      doFilterResult(pInfo);

      blockDataCleanup(p);
      numOfRows = 0;
    }

2971
    blockDataDestroy(p);
2972

2973 2974 2975 2976 2977 2978
    // todo temporarily free the cursor here, the true reason why the free is not valid needs to be found
    if (ret != 0) {
      metaCloseTbCursor(pInfo->pCur);
      pInfo->pCur = NULL;
      doSetOperatorCompleted(pOperator);
    }
H
Haojun Liao 已提交
2979

2980 2981 2982 2983 2984
    pInfo->loadInfo.totalRows += pInfo->pRes->info.rows;
    return (pInfo->pRes->info.rows == 0) ? NULL : pInfo->pRes;
  }
}

2985 2986 2987 2988 2989 2990 2991 2992
/**
 * Scan handler for the super-table system table when the query targets a system database.
 *
 * The result block is emptied and the operator is marked done, so the function always yields
 * no rows.
 *
 * @param pOperator  the system-table scan operator; pOperator->info is a SSysTableScanInfo
 * @return NULL, since the result block row count is reset to zero before it is inspected
 */
static SSDataBlock* sysTableScanUserSTables(SOperatorInfo* pOperator) {
  if (OP_EXEC_DONE == pOperator->status) {
    return NULL;
  }

  SSysTableScanInfo* pScan = pOperator->info;
  SSDataBlock*       pResult = pScan->pRes;

  pResult->info.rows = 0;
  pOperator->status = OP_EXEC_DONE;

  pScan->loadInfo.totalRows += pResult->info.rows;
  if (pResult->info.rows == 0) {
    return NULL;
  }

  return pResult;
}

2999 3000 3001 3002
/**
 * Entry point of the system-table scan operator.
 *
 * Dispatches on the scanned table name:
 *  - TSDB_INS_TABLE_TABLES  -> sysTableScanUserTables()
 *  - TSDB_INS_TABLE_TAGS    -> sysTableScanUserTags()
 *  - TSDB_INS_TABLE_STABLES with showRewrite on a system database -> sysTableScanUserSTables()
 *  - anything else: a retrieve request is serialized and sent to pInfo->epSet, and the function
 *    blocks on pInfo->ready until loadSysTableCallback signals it. The loop repeats until a
 *    response yields rows after filtering or the source reports completion.
 *
 * @param pOperator  the system-table scan operator; pOperator->info is a SSysTableScanInfo
 * @return pInfo->pRes when rows are available; NULL when the scan is finished or failed
 *         (on failure pTaskInfo->code holds the error)
 */
static SSDataBlock* doSysTableScan(SOperatorInfo* pOperator) {
  // build message and send to mnode to fetch the content of system tables.
  SExecTaskInfo*     pTaskInfo = pOperator->pTaskInfo;
  SSysTableScanInfo* pInfo = pOperator->info;
  char               dbName[TSDB_DB_NAME_LEN] = {0};

  const char* name = tNameGetTableName(&pInfo->name);
  if (pInfo->showRewrite) {
    getDBNameFromCondition(pInfo->pCondition, dbName);
    // bounded write: never run past the end of req.db
    snprintf(pInfo->req.db, tListLen(pInfo->req.db), "%d.%s", pInfo->accountId, dbName);
  }

  if (strncasecmp(name, TSDB_INS_TABLE_TABLES, TSDB_TABLE_FNAME_LEN) == 0) {
    return sysTableScanUserTables(pOperator);
  } else if (strncasecmp(name, TSDB_INS_TABLE_TAGS, TSDB_TABLE_FNAME_LEN) == 0) {
    return sysTableScanUserTags(pOperator);
  } else if (strncasecmp(name, TSDB_INS_TABLE_STABLES, TSDB_TABLE_FNAME_LEN) == 0 && pInfo->showRewrite &&
             IS_SYS_DBNAME(dbName)) {
    return sysTableScanUserSTables(pOperator);
  } else {  // load the meta from mnode of the given epset
    if (pOperator->status == OP_EXEC_DONE) {
      return NULL;
    }

    while (1) {
      int64_t startTs = taosGetTimestampUs();
      tstrncpy(pInfo->req.tb, tNameGetTableName(&pInfo->name), tListLen(pInfo->req.tb));
      tstrncpy(pInfo->req.user, pInfo->pUser, tListLen(pInfo->req.user));

      // first pass computes the serialized length, second pass fills the buffer
      int32_t contLen = tSerializeSRetrieveTableReq(NULL, 0, &pInfo->req);
      char*   buf1 = taosMemoryCalloc(1, contLen);
      if (NULL == buf1) {
        qError("%s prepare message %d failed", GET_TASKID(pTaskInfo), contLen);
        pTaskInfo->code = TSDB_CODE_QRY_OUT_OF_MEMORY;
        return NULL;
      }
      tSerializeSRetrieveTableReq(buf1, contLen, &pInfo->req);

      // send the fetch remote task result request
      SMsgSendInfo* pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
      if (NULL == pMsgSendInfo) {
        qError("%s prepare message %d failed", GET_TASKID(pTaskInfo), (int32_t)sizeof(SMsgSendInfo));
        // the request buffer has not been handed over yet, release it here
        taosMemoryFree(buf1);
        pTaskInfo->code = TSDB_CODE_QRY_OUT_OF_MEMORY;
        return NULL;
      }

      int32_t msgType = (strcasecmp(name, TSDB_INS_TABLE_DNODE_VARIABLES) == 0) ? TDMT_DND_SYSTABLE_RETRIEVE
                                                                                : TDMT_MND_SYSTABLE_RETRIEVE;

      pMsgSendInfo->param = pOperator;
      pMsgSendInfo->msgInfo.pData = buf1;
      pMsgSendInfo->msgInfo.len = contLen;
      pMsgSendInfo->msgType = msgType;
      pMsgSendInfo->fp = loadSysTableCallback;
      pMsgSendInfo->requestId = pTaskInfo->id.queryId;

      // NOTE(review): the send result is not inspected; this presumably relies on the callback
      // posting pInfo->ready even when the send fails -- confirm against asyncSendMsgToServer.
      int64_t transporterId = 0;
      asyncSendMsgToServer(pInfo->readHandle.pMsgCb->clientRpc, &pInfo->epSet, &transporterId, pMsgSendInfo);
      tsem_wait(&pInfo->ready);

      if (pTaskInfo->code) {
        qDebug("%s load meta data from mnode failed, totalRows:%" PRIu64 ", code:%s", GET_TASKID(pTaskInfo),
               pInfo->loadInfo.totalRows, tstrerror(pTaskInfo->code));
        return NULL;
      }

      SRetrieveMetaTableRsp* pRsp = pInfo->pRsp;
      // remember the handle so that the next request continues the same retrieval
      pInfo->req.showId = pRsp->handle;

      if (pRsp->numOfRows == 0 || pRsp->completed) {
        pOperator->status = OP_EXEC_DONE;
        qDebug("%s load meta data from mnode completed, rowsOfSource:%d, totalRows:%" PRIu64, GET_TASKID(pTaskInfo),
               pRsp->numOfRows, pInfo->loadInfo.totalRows);

        if (pRsp->numOfRows == 0) {
          taosMemoryFree(pRsp);
          return NULL;
        }
      }

      char* pStart = pRsp->data;
      extractDataBlockFromFetchRsp(pInfo->pRes, pRsp->data, pInfo->scanCols, &pStart);
      updateLoadRemoteInfo(&pInfo->loadInfo, pRsp->numOfRows, pRsp->compLen, startTs, pOperator);

      // todo log the filter info
      doFilterResult(pInfo);
      taosMemoryFree(pRsp);
      if (pInfo->pRes->info.rows > 0) {
        return pInfo->pRes;
      } else if (pOperator->status == OP_EXEC_DONE) {
        return NULL;
      }
      // every row was filtered out and the source is not exhausted: fetch the next batch
    }
  }
}

3091
int32_t buildSysDbTableInfo(const SSysTableScanInfo* pInfo, int32_t capacity) {
D
dapan1121 已提交
3092
  SSDataBlock* p = buildInfoSchemaTableMetaBlock(TSDB_INS_TABLE_TABLES);
3093
  blockDataEnsureCapacity(p, capacity);
3094

L
Liu Jicong 已提交
3095
  size_t               size = 0;
3096 3097 3098
  const SSysTableMeta* pSysDbTableMeta = NULL;

  getInfosDbMeta(&pSysDbTableMeta, &size);
3099
  p->info.rows = buildDbTableInfoBlock(pInfo->sysInfo, p, pSysDbTableMeta, size, TSDB_INFORMATION_SCHEMA_DB);
3100 3101

  getPerfDbMeta(&pSysDbTableMeta, &size);
3102
  p->info.rows = buildDbTableInfoBlock(pInfo->sysInfo, p, pSysDbTableMeta, size, TSDB_PERFORMANCE_SCHEMA_DB);
3103 3104

  pInfo->pRes->info.rows = p->info.rows;
3105
  relocateColumnData(pInfo->pRes, pInfo->scanCols, p->pDataBlock, false);
3106 3107 3108
  blockDataDestroy(p);

  return pInfo->pRes->info.rows;
3109 3110
}

3111
int32_t buildDbTableInfoBlock(bool sysInfo, const SSDataBlock* p, const SSysTableMeta* pSysDbTableMeta, size_t size,
L
Liu Jicong 已提交
3112 3113
                              const char* dbName) {
  char    n[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
3114 3115
  int32_t numOfRows = p->info.rows;

L
Liu Jicong 已提交
3116
  for (int32_t i = 0; i < size; ++i) {
3117
    const SSysTableMeta* pm = &pSysDbTableMeta[i];
3118 3119 3120
    if (!sysInfo && pm->sysInfo) {
      continue;
    }
3121 3122 3123 3124 3125 3126 3127 3128 3129 3130 3131 3132 3133 3134 3135 3136 3137 3138 3139

    SColumnInfoData* pColInfoData = taosArrayGet(p->pDataBlock, 0);

    STR_TO_VARSTR(n, pm->name);
    colDataAppend(pColInfoData, numOfRows, n, false);

    // database name
    STR_TO_VARSTR(n, dbName);
    pColInfoData = taosArrayGet(p->pDataBlock, 1);
    colDataAppend(pColInfoData, numOfRows, n, false);

    // create time
    pColInfoData = taosArrayGet(p->pDataBlock, 2);
    colDataAppendNULL(pColInfoData, numOfRows);

    // number of columns
    pColInfoData = taosArrayGet(p->pDataBlock, 3);
    colDataAppend(pColInfoData, numOfRows, (char*)&pm->colNum, false);

L
Liu Jicong 已提交
3140
    for (int32_t j = 4; j <= 8; ++j) {
3141 3142 3143 3144 3145 3146 3147 3148 3149 3150 3151 3152 3153 3154 3155
      pColInfoData = taosArrayGet(p->pDataBlock, j);
      colDataAppendNULL(pColInfoData, numOfRows);
    }

    STR_TO_VARSTR(n, "SYSTEM_TABLE");

    pColInfoData = taosArrayGet(p->pDataBlock, 9);
    colDataAppend(pColInfoData, numOfRows, n, false);

    numOfRows += 1;
  }

  return numOfRows;
}

3156
/**
 * Create the operator that scans a system table.
 *
 * For TSDB_INS_TABLE_TABLES and TSDB_INS_TABLE_TAGS the result block capacity is reserved up
 * front; for every other table the `ready` semaphore is initialised and the management epSet is
 * recorded, which doSysTableScan uses for its remote fetch.
 *
 * @param readHandle    pointer to a SReadHandle; its content is copied into the scan info
 * @param pScanPhyNode  physical plan node describing the scan
 * @param pUser         user name; duplicated into the scan info
 * @param pTaskInfo     owning task
 * @return the new operator, or NULL with terrno set to TSDB_CODE_QRY_OUT_OF_MEMORY when either
 *         the scan info or the operator cannot be allocated
 */
SOperatorInfo* createSysTableScanOperatorInfo(void* readHandle, SSystemTableScanPhysiNode* pScanPhyNode,
                                              const char* pUser, SExecTaskInfo* pTaskInfo) {
  SSysTableScanInfo* pInfo = taosMemoryCalloc(1, sizeof(SSysTableScanInfo));
  SOperatorInfo*     pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
  if (pInfo == NULL || pOperator == NULL) {
    taosMemoryFreeClear(pInfo);
    taosMemoryFreeClear(pOperator);
    terrno = TSDB_CODE_QRY_OUT_OF_MEMORY;
    return NULL;
  }

  SScanPhysiNode*     pScanNode = &pScanPhyNode->scan;
  SDataBlockDescNode* pDescNode = pScanNode->node.pOutputDataBlockDesc;

  SSDataBlock* pResBlock = createResDataBlock(pDescNode);

  int32_t numOfMatched = 0;
  SArray* pMatchList = extractColMatchInfo(pScanNode->pScanCols, pDescNode, &numOfMatched, COL_MATCH_FROM_COL_ID);

  // scan-specific state
  pInfo->accountId = pScanPhyNode->accountId;
  pInfo->pUser = taosMemoryStrDup((void*)pUser);
  pInfo->sysInfo = pScanPhyNode->sysInfo;
  pInfo->showRewrite = pScanPhyNode->showRewrite;
  pInfo->pRes = pResBlock;
  pInfo->pCondition = pScanNode->node.pConditions;
  pInfo->scanCols = pMatchList;

  initResultSizeInfo(&pOperator->resultInfo, 4096);

  tNameAssign(&pInfo->name, &pScanNode->tableName);
  const char* tbName = tNameGetTableName(&pInfo->name);

  // both kinds of scan keep a copy of the read handle
  pInfo->readHandle = *(SReadHandle*)readHandle;

  bool isTablesScan = (strncasecmp(tbName, TSDB_INS_TABLE_TABLES, TSDB_TABLE_FNAME_LEN) == 0);
  bool isTagsScan = !isTablesScan && (strncasecmp(tbName, TSDB_INS_TABLE_TAGS, TSDB_TABLE_FNAME_LEN) == 0);
  if (isTablesScan || isTagsScan) {
    blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity);
  } else {
    tsem_init(&pInfo->ready, 0, 0);
    pInfo->epSet = pScanPhyNode->mgmtEpSet;
  }

  // generic operator wiring
  pOperator->name = "SysTableScanOperator";
  pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN;
  pOperator->blocking = false;
  pOperator->status = OP_NOT_OPENED;
  pOperator->info = pInfo;
  pOperator->exprSupp.numOfExprs = taosArrayGetSize(pResBlock->pDataBlock);
  pOperator->pTaskInfo = pTaskInfo;

  pOperator->fpSet =
      createOperatorFpSet(operatorDummyOpenFn, doSysTableScan, NULL, NULL, destroySysScanOperator, NULL, NULL, NULL);

  return pOperator;
}
H
Haojun Liao 已提交
3214

3215
/**
 * Produce the next block of the tag scan operator.
 *
 * Starting at pInfo->curPos, one row is emitted per table of the table list until the list is
 * exhausted or resultInfo.capacity rows were written. For each output expression the row holds
 * either the table name (scan pseudo column functions) or the value of the referenced tag.
 *
 * @param pOperator  the tag scan operator; pOperator->info is a STagScanInfo
 * @return pInfo->pRes when at least one row was produced, otherwise NULL
 *
 * Long-jumps to pTaskInfo->env with terrno when a table entry cannot be read.
 */
static SSDataBlock* doTagScan(SOperatorInfo* pOperator) {
  if (OP_EXEC_DONE == pOperator->status) {
    return NULL;
  }

  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
  STagScanInfo*  pInfo = pOperator->info;
  SExprInfo*     pExprs = pOperator->exprSupp.pExprInfo;
  SSDataBlock*   pOut = pInfo->pRes;
  blockDataCleanup(pOut);

  int32_t numOfTables = taosArrayGetSize(pInfo->pTableList->pTableList);
  if (0 == numOfTables) {
    setTaskStatus(pTaskInfo, TASK_COMPLETED);
    return NULL;
  }

  char        nameBuf[512] = {0};
  int32_t     rows = 0;
  SMetaReader reader = {0};
  metaReaderInit(&reader, pInfo->readHandle.meta, 0);

  while (pInfo->curPos < numOfTables && rows < pOperator->resultInfo.capacity) {
    STableKeyInfo* pKey = taosArrayGet(pInfo->pTableList->pTableList, pInfo->curPos);

    int32_t code = metaGetTableEntryByUid(&reader, pKey->uid);
    tDecoderClear(&reader.coder);
    if (code != TSDB_CODE_SUCCESS) {
      qError("failed to get table meta, uid:0x%" PRIx64 ", code:%s, %s", pKey->uid, tstrerror(terrno),
             GET_TASKID(pTaskInfo));
      metaReaderClear(&reader);
      T_LONG_JMP(pTaskInfo->env, terrno);
    }

    for (int32_t k = 0; k < pOperator->exprSupp.numOfExprs; ++k) {
      SExprInfo*       pExpr = &pExprs[k];
      SColumnInfoData* pDst = taosArrayGet(pOut->pDataBlock, pExpr->base.resSchema.slotId);

      // refactor later
      if (fmIsScanPseudoColumnFunc(pExpr->pExpr->_function.functionId)) {
        STR_TO_VARSTR(nameBuf, reader.me.name);
        colDataAppend(pDst, rows, nameBuf, false);
        continue;
      }

      // otherwise the expression refers to a tag value
      STagVal tagVal = {0};
      tagVal.cid = pExpr->base.pParam[0].pCol->colId;
      const char* pTag = metaGetTableTagVal(reader.me.ctbEntry.pTags, pDst->info.type, &tagVal);

      const bool isJson = (pDst->info.type == TSDB_DATA_TYPE_JSON);
      const bool converted = (!isJson) && (pTag != NULL);

      // non-JSON tags go through tTagValToData; JSON (or missing) tags are appended as returned
      char* pData = converted ? tTagValToData((const STagVal*)pTag, false) : (char*)pTag;

      bool isNull = (pData == NULL) || (isJson && tTagIsJsonNull(pData));
      colDataAppend(pDst, rows, pData, isNull);

      // only the converted data of a variable-length tag is released here
      if (converted && pData != NULL && IS_VAR_DATA_TYPE(((const STagVal*)pTag)->type)) {
        taosMemoryFree(pData);
      }
    }

    rows += 1;
    pInfo->curPos += 1;
    if (pInfo->curPos >= numOfTables) {
      doSetOperatorCompleted(pOperator);
    }
  }

  metaReaderClear(&reader);

  // qDebug("QInfo:0x%"PRIx64" create tag values results completed, rows:%d", GET_TASKID(pRuntimeEnv), count);
  if (OP_EXEC_DONE == pOperator->status) {
    setTaskStatus(pTaskInfo, TASK_COMPLETED);
  }

  pOut->info.rows = rows;
  pOperator->resultInfo.totalRows += rows;

  if (pOut->info.rows == 0) {
    return NULL;
  }
  return pInfo->pRes;
}

3296
static void destroyTagScanOperatorInfo(void* param) {
H
Haojun Liao 已提交
3297 3298
  STagScanInfo* pInfo = (STagScanInfo*)param;
  pInfo->pRes = blockDataDestroy(pInfo->pRes);
S
shenglian zhou 已提交
3299
  taosArrayDestroy(pInfo->pColMatchInfo);
D
dapan1121 已提交
3300
  taosMemoryFreeClear(param);
H
Haojun Liao 已提交
3301 3302
}

3303 3304
/**
 * Create the tag scan operator.
 *
 * @param pReadHandle     read handle; its content is copied into the scan info
 * @param pPhyNode        physical plan node; pScanPseudoCols supplies the output expressions
 * @param pTableListInfo  tables to scan; stored by pointer, not copied
 * @param pTaskInfo       owning task
 * @return the new operator, or NULL with terrno set to TSDB_CODE_OUT_OF_MEMORY when an
 *         allocation or initExprSupp() fails
 */
SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysiNode* pPhyNode,
                                         STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo) {
  // declared before the first goto so that the error path can always inspect it
  SArray* colList = NULL;

  STagScanInfo*  pInfo = taosMemoryCalloc(1, sizeof(STagScanInfo));
  SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
  if (pInfo == NULL || pOperator == NULL) {
    goto _error;
  }

  SDataBlockDescNode* pDescNode = pPhyNode->node.pOutputDataBlockDesc;

  int32_t    num = 0;
  int32_t    numOfExprs = 0;
  SExprInfo* pExprInfo = createExprInfo(pPhyNode->pScanPseudoCols, NULL, &numOfExprs);
  colList = extractColMatchInfo(pPhyNode->pScanPseudoCols, pDescNode, &num, COL_MATCH_FROM_COL_ID);

  int32_t code = initExprSupp(&pOperator->exprSupp, pExprInfo, numOfExprs);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  pInfo->pTableList = pTableListInfo;
  pInfo->pColMatchInfo = colList;
  pInfo->pRes = createResDataBlock(pDescNode);
  pInfo->readHandle = *pReadHandle;
  pInfo->curPos = 0;

  pOperator->name = "TagScanOperator";
  pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN;

  pOperator->blocking = false;
  pOperator->status = OP_NOT_OPENED;
  pOperator->info = pInfo;
  pOperator->pTaskInfo = pTaskInfo;

  initResultSizeInfo(&pOperator->resultInfo, 4096);
  blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity);

  pOperator->fpSet =
      createOperatorFpSet(operatorDummyOpenFn, doTagScan, NULL, NULL, destroyTagScanOperatorInfo, NULL, NULL, NULL);

  return pOperator;

_error:
  // the match list is not yet owned by pInfo on this path, so it has to be released separately
  if (colList != NULL) {
    taosArrayDestroy(colList);
  }
  taosMemoryFree(pInfo);
  taosMemoryFree(pOperator);
  terrno = TSDB_CODE_OUT_OF_MEMORY;
  return NULL;
}
3351

H
Haojun Liao 已提交
3352
int32_t createScanTableListInfo(SScanPhysiNode* pScanNode, SNodeList* pGroupTags, bool groupSort, SReadHandle* pHandle,
3353 3354
                                STableListInfo* pTableListInfo, SNode* pTagCond, SNode* pTagIndexCond,
                                const char* idStr) {
H
Haojun Liao 已提交
3355 3356
  int64_t st = taosGetTimestampUs();

H
Haojun Liao 已提交
3357
  if (pHandle == NULL) {
H
Haojun Liao 已提交
3358
    qError("invalid handle, in creating operator tree, %s", idStr);
H
Haojun Liao 已提交
3359 3360 3361
    return TSDB_CODE_INVALID_PARA;
  }

3362
  int32_t code = getTableList(pHandle->meta, pHandle->vnode, pScanNode, pTagCond, pTagIndexCond, pTableListInfo);
3363
  if (code != TSDB_CODE_SUCCESS) {
3364
    qError("failed to getTableList, code: %s", tstrerror(code));
3365
    return code;
3366 3367
  }

H
Haojun Liao 已提交
3368
  int64_t st1 = taosGetTimestampUs();
L
Liu Jicong 已提交
3369
  qDebug("generate queried table list completed, elapsed time:%.2f ms %s", (st1 - st) / 1000.0, idStr);
H
Haojun Liao 已提交
3370

3371
  if (taosArrayGetSize(pTableListInfo->pTableList) == 0) {
3372
    qDebug("no table qualified for query, %s" PRIx64, idStr);
3373 3374
    return TSDB_CODE_SUCCESS;
  }
3375

H
Haojun Liao 已提交
3376 3377
  pTableListInfo->needSortTableByGroupId = groupSort;
  code = generateGroupIdMap(pTableListInfo, pHandle, pGroupTags);
3378
  if (code != TSDB_CODE_SUCCESS) {
wmmhello's avatar
wmmhello 已提交
3379
    return code;
3380 3381
  }

H
Haojun Liao 已提交
3382
  int64_t st2 = taosGetTimestampUs();
L
Liu Jicong 已提交
3383
  qDebug("generate group id map completed, elapsed time:%.2f ms %s", (st2 - st1) / 1000.0, idStr);
H
Haojun Liao 已提交
3384

3385 3386 3387
  return TSDB_CODE_SUCCESS;
}

S
slzhou 已提交
3388
/**
 * Open one tsdb reader per table in the index range [tableStartIdx, tableEndIdx] of the table
 * list and append each reader to arrayReader.
 *
 * @param pQueryCond      query condition passed to every reader
 * @param pHandle         read handle supplying the vnode
 * @param pTableListInfo  table list the indexes refer to
 * @param tableStartIdx   first table index (inclusive)
 * @param tableEndIdx     last table index (inclusive)
 * @param arrayReader     output array of STsdbReader*; readers opened before a failure stay in
 *                        the array and remain owned by the caller
 * @param idstr           task id string passed to the reader and used in log messages
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_OUT_OF_MEMORY when the temporary table list cannot be
 *         allocated, or the code returned by tsdbReaderOpen()
 */
int32_t createMultipleDataReaders(SQueryTableDataCond* pQueryCond, SReadHandle* pHandle, STableListInfo* pTableListInfo,
                                  int32_t tableStartIdx, int32_t tableEndIdx, SArray* arrayReader, const char* idstr) {
  for (int32_t i = tableStartIdx; i <= tableEndIdx; ++i) {
    // each reader is opened on a single-table list
    SArray* subTableList = taosArrayInit(1, sizeof(STableKeyInfo));
    if (subTableList == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return TSDB_CODE_OUT_OF_MEMORY;
    }
    taosArrayPush(subTableList, taosArrayGet(pTableListInfo->pTableList, i));

    STsdbReader* pReader = NULL;
    int32_t      code = tsdbReaderOpen(pHandle->vnode, pQueryCond, subTableList, &pReader, idstr);
    taosArrayDestroy(subTableList);

    // never hand a reader that failed to open to the caller
    if (code != TSDB_CODE_SUCCESS) {
      qError("failed to open tsdb reader for table at index %d, code:%s, %s", i, tstrerror(code), idstr);
      return code;
    }

    taosArrayPush(arrayReader, &pReader);
  }

  return TSDB_CODE_SUCCESS;
}

H
Haojun Liao 已提交
3404
// todo refactor
3405 3406
static int32_t loadDataBlockFromOneTable(SOperatorInfo* pOperator, STableMergeScanInfo* pTableScanInfo,
                                         int32_t readerIdx, SSDataBlock* pBlock, uint32_t* status) {
L
Liu Jicong 已提交
3407
  SExecTaskInfo*       pTaskInfo = pOperator->pTaskInfo;
S
shenglian zhou 已提交
3408
  STableMergeScanInfo* pInfo = pOperator->info;
3409 3410 3411 3412 3413 3414 3415 3416 3417 3418 3419 3420 3421 3422 3423 3424 3425 3426 3427 3428 3429 3430 3431 3432 3433 3434

  SFileBlockLoadRecorder* pCost = &pTableScanInfo->readRecorder;

  pCost->totalBlocks += 1;
  pCost->totalRows += pBlock->info.rows;

  *status = pInfo->dataBlockLoadFlag;
  if (pTableScanInfo->pFilterNode != NULL ||
      overlapWithTimeWindow(&pTableScanInfo->interval, &pBlock->info, pTableScanInfo->cond.order)) {
    (*status) = FUNC_DATA_REQUIRED_DATA_LOAD;
  }

  SDataBlockInfo* pBlockInfo = &pBlock->info;
  taosMemoryFreeClear(pBlock->pBlockAgg);

  if (*status == FUNC_DATA_REQUIRED_FILTEROUT) {
    qDebug("%s data block filter out, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo),
           pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows);
    pCost->filterOutBlocks += 1;
    return TSDB_CODE_SUCCESS;
  } else if (*status == FUNC_DATA_REQUIRED_NOT_LOAD) {
    qDebug("%s data block skipped, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo),
           pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows);
    pCost->skipBlocks += 1;

    // clear all data in pBlock that are set when handing the previous block
3435
    for (int32_t i = 0; i < taosArrayGetSize(pBlock->pDataBlock); ++i) {
3436 3437 3438 3439 3440 3441 3442 3443 3444 3445
      SColumnInfoData* pcol = taosArrayGet(pBlock->pDataBlock, i);
      pcol->pData = NULL;
    }

    return TSDB_CODE_SUCCESS;
  } else if (*status == FUNC_DATA_REQUIRED_STATIS_LOAD) {
    pCost->loadBlockStatis += 1;

    bool             allColumnsHaveAgg = true;
    SColumnDataAgg** pColAgg = NULL;
H
Hongze Cheng 已提交
3446
    STsdbReader*     reader = taosArrayGetP(pTableScanInfo->dataReaders, readerIdx);
3447
    tsdbRetrieveDatablockSMA(reader, &pColAgg, &allColumnsHaveAgg);
3448 3449

    if (allColumnsHaveAgg == true) {
3450
      int32_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
3451 3452 3453 3454 3455 3456 3457 3458 3459 3460 3461 3462 3463 3464 3465 3466 3467 3468 3469 3470 3471 3472 3473 3474

      // todo create this buffer during creating operator
      if (pBlock->pBlockAgg == NULL) {
        pBlock->pBlockAgg = taosMemoryCalloc(numOfCols, POINTER_BYTES);
      }

      for (int32_t i = 0; i < numOfCols; ++i) {
        SColMatchInfo* pColMatchInfo = taosArrayGet(pTableScanInfo->pColMatchInfo, i);
        if (!pColMatchInfo->output) {
          continue;
        }
        pBlock->pBlockAgg[pColMatchInfo->targetSlotId] = pColAgg[i];
      }

      return TSDB_CODE_SUCCESS;
    } else {  // failed to load the block sma data, data block statistics does not exist, load data block instead
      *status = FUNC_DATA_REQUIRED_DATA_LOAD;
    }
  }

  ASSERT(*status == FUNC_DATA_REQUIRED_DATA_LOAD);

  // todo filter data block according to the block sma data firstly
#if 0
H
Haojun Liao 已提交
3475
  if (!doFilterByBlockSMA(pBlock->pBlockStatis, pTableScanInfo->pCtx, pBlockInfo->rows)) {
3476 3477 3478 3479 3480 3481 3482 3483 3484 3485 3486
    pCost->filterOutBlocks += 1;
    qDebug("%s data block filter out, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo), pBlockInfo->window.skey,
           pBlockInfo->window.ekey, pBlockInfo->rows);
    (*status) = FUNC_DATA_REQUIRED_FILTEROUT;
    return TSDB_CODE_SUCCESS;
  }
#endif

  pCost->totalCheckedRows += pBlock->info.rows;
  pCost->loadBlocks += 1;

H
Hongze Cheng 已提交
3487
  STsdbReader* reader = taosArrayGetP(pTableScanInfo->dataReaders, readerIdx);
3488 3489 3490 3491 3492
  SArray*      pCols = tsdbRetrieveDataBlock(reader, NULL);
  if (pCols == NULL) {
    return terrno;
  }

3493
  relocateColumnData(pBlock, pTableScanInfo->pColMatchInfo, pCols, true);
3494 3495

  // currently only the tbname pseudo column
S
shenglian zhou 已提交
3496 3497 3498
  if (pTableScanInfo->pseudoSup.numOfExprs > 0) {
    int32_t code = addTagPseudoColumnData(&pTableScanInfo->readHandle, pTableScanInfo->pseudoSup.pExprInfo,
                                          pTableScanInfo->pseudoSup.numOfExprs, pBlock, GET_TASKID(pTaskInfo));
H
Haojun Liao 已提交
3499
    if (code != TSDB_CODE_SUCCESS) {
3500
      T_LONG_JMP(pTaskInfo->env, code);
H
Haojun Liao 已提交
3501
    }
3502 3503
  }

3504 3505
  if (pTableScanInfo->pFilterNode != NULL) {
    int64_t st = taosGetTimestampMs();
H
Haojun Liao 已提交
3506
    doFilter(pTableScanInfo->pFilterNode, pBlock, pTableScanInfo->pColMatchInfo, NULL);
3507

3508 3509
    double el = (taosGetTimestampUs() - st) / 1000.0;
    pTableScanInfo->readRecorder.filterTime += el;
3510

3511 3512 3513 3514 3515 3516 3517
    if (pBlock->info.rows == 0) {
      pCost->filterOutBlocks += 1;
      qDebug("%s data block filter out, brange:%" PRId64 "-%" PRId64 ", rows:%d, elapsed time:%.2f ms",
             GET_TASKID(pTaskInfo), pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows, el);
    } else {
      qDebug("%s data block filter applied, elapsed time:%.2f ms", GET_TASKID(pTaskInfo), el);
    }
3518 3519 3520 3521 3522 3523 3524 3525
  }

  return TSDB_CODE_SUCCESS;
}

typedef struct STableMergeScanSortSourceParam {
  SOperatorInfo* pOperator;
  int32_t        readerIdx;
3526
  SSDataBlock*   inputBlock;
3527 3528 3529 3530 3531 3532
} STableMergeScanSortSourceParam;

static SSDataBlock* getTableDataBlock(void* param) {
  STableMergeScanSortSourceParam* source = param;
  SOperatorInfo*                  pOperator = source->pOperator;
  int32_t                         readerIdx = source->readerIdx;
3533
  SSDataBlock*                    pBlock = source->inputBlock;
3534 3535 3536 3537
  STableMergeScanInfo*            pTableScanInfo = pOperator->info;

  int64_t st = taosGetTimestampUs();

3538 3539
  blockDataCleanup(pBlock);

H
Hongze Cheng 已提交
3540
  STsdbReader* reader = taosArrayGetP(pTableScanInfo->dataReaders, readerIdx);
3541 3542
  while (tsdbNextDataBlock(reader)) {
    if (isTaskKilled(pOperator->pTaskInfo)) {
3543
      T_LONG_JMP(pOperator->pTaskInfo->env, TSDB_CODE_TSC_QUERY_CANCELLED);
3544 3545 3546 3547 3548 3549 3550 3551
    }

    // process this data block based on the probabilities
    bool processThisBlock = processBlockWithProbability(&pTableScanInfo->sample);
    if (!processThisBlock) {
      continue;
    }

3552 3553 3554 3555
    blockDataCleanup(pBlock);
    SDataBlockInfo binfo = pBlock->info;
    tsdbRetrieveDataBlockInfo(reader, &binfo);

3556
    blockDataEnsureCapacity(pBlock, binfo.rows);
3557 3558 3559 3560
    pBlock->info.type = binfo.type;
    pBlock->info.uid = binfo.uid;
    pBlock->info.window = binfo.window;
    pBlock->info.rows = binfo.rows;
3561 3562 3563 3564 3565

    uint32_t status = 0;
    int32_t  code = loadDataBlockFromOneTable(pOperator, pTableScanInfo, readerIdx, pBlock, &status);
    //    int32_t  code = loadDataBlockOnDemand(pOperator->pRuntimeEnv, pTableScanInfo, pBlock, &status);
    if (code != TSDB_CODE_SUCCESS) {
3566
      T_LONG_JMP(pOperator->pTaskInfo->env, code);
3567 3568 3569 3570 3571 3572 3573 3574 3575 3576 3577 3578 3579 3580 3581 3582 3583 3584 3585 3586
    }

    // current block is filter out according to filter condition, continue load the next block
    if (status == FUNC_DATA_REQUIRED_FILTEROUT || pBlock->info.rows == 0) {
      continue;
    }

    uint64_t* groupId = taosHashGet(pOperator->pTaskInfo->tableqinfoList.map, &pBlock->info.uid, sizeof(int64_t));
    if (groupId) {
      pBlock->info.groupId = *groupId;
    }

    pOperator->resultInfo.totalRows = pTableScanInfo->readRecorder.totalRows;
    pTableScanInfo->readRecorder.elapsedTime += (taosGetTimestampUs() - st) / 1000.0;

    return pBlock;
  }
  return NULL;
}

3587 3588 3589 3590 3591 3592 3593 3594 3595
// Build a one-element sort specification that orders rows by the primary timestamp column.
//
// colMatchInfo - array of SColMatchInfo; the entry whose colId is PRIMARYKEY_TIMESTAMP_COL_ID
//                supplies the slot to sort on (slot 0 is used when no entry matches; if several
//                match, the last one wins).
// order        - sort order stored in the resulting SBlockOrderInfo.
// return       - newly created SArray holding a single SBlockOrderInfo.
SArray* generateSortByTsInfo(SArray* colMatchInfo, int32_t order) {
  int32_t tsSlot = 0;

  int32_t idx = 0;
  while (idx < taosArrayGetSize(colMatchInfo)) {
    SColMatchInfo* pMatch = taosArrayGet(colMatchInfo, idx);
    if (pMatch->colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
      tsSlot = pMatch->targetSlotId;
    }
    ++idx;
  }

  SBlockOrderInfo orderInfo = {0};
  orderInfo.nullFirst = NULL_ORDER_FIRST;
  orderInfo.slotId = tsSlot;
  orderInfo.order = order;

  SArray* pOrderList = taosArrayInit(1, sizeof(SBlockOrderInfo));
  taosArrayPush(pOrderList, &orderInfo);
  return pOrderList;
}

3607
// Open the merge scan for the table group that starts at pInfo->tableStartIndex.
//
// Steps:
//   1. find the last consecutive table sharing pInfo->groupId and record it in tableEndIndex;
//   2. create one data reader per table in [tableStartIndex, tableEndIndex];
//   3. create a multi-source merge sort handle with one buffer page per table plus one extra;
//   4. register one sort source per reader and open the sort.
//
// return - TSDB_CODE_SUCCESS. Failures (source allocation, tsortOpen) long-jump out of the task.
int32_t startGroupTableMergeScan(SOperatorInfo* pOperator) {
  STableMergeScanInfo* pInfo = pOperator->info;
  SExecTaskInfo*       pTaskInfo = pOperator->pTaskInfo;

  {
    // scan forward until the group id changes; i stops one past the last table of this group
    size_t  tableListSize = taosArrayGetSize(pInfo->tableListInfo->pTableList);
    int32_t i = pInfo->tableStartIndex + 1;
    for (; i < tableListSize; ++i) {
      STableKeyInfo* tableKeyInfo = taosArrayGet(pInfo->tableListInfo->pTableList, i);
      if (tableKeyInfo->groupId != pInfo->groupId) {
        break;
      }
    }
    pInfo->tableEndIndex = i - 1;
  }

  int32_t tableStartIdx = pInfo->tableStartIndex;
  int32_t tableEndIdx = pInfo->tableEndIndex;

  STableListInfo* tableListInfo = pInfo->tableListInfo;
  pInfo->dataReaders = taosArrayInit(64, POINTER_BYTES);
  createMultipleDataReaders(&pInfo->cond, &pInfo->readHandle, tableListInfo, tableStartIdx, tableEndIdx,
                            pInfo->dataReaders, GET_TASKID(pTaskInfo));

  // todo the total available buffer should be determined by total capacity of buffer of this task.
  // the additional one is reserved for merge result
  pInfo->sortBufSize = pInfo->bufPageSize * (tableEndIdx - tableStartIdx + 1 + 1);
  int32_t numOfBufPage = pInfo->sortBufSize / pInfo->bufPageSize;
  pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, SORT_MULTISOURCE_MERGE, pInfo->bufPageSize, numOfBufPage,
                                             pInfo->pSortInputBlock, pTaskInfo->id.str);

  tsortSetFetchRawDataFp(pInfo->pSortHandle, getTableDataBlock, NULL, NULL);

  size_t numReaders = taosArrayGetSize(pInfo->dataReaders);

  // First pass: append every source param. The addresses of the array elements are only taken
  // in the second pass, after all pushes are done, so that a push cannot invalidate a pointer
  // already handed to the sort handle.
  for (int32_t i = 0; i < numReaders; ++i) {
    STableMergeScanSortSourceParam param = {0};
    param.readerIdx = i;
    param.pOperator = pOperator;
    param.inputBlock = createOneDataBlock(pInfo->pResBlock, false);
    taosArrayPush(pInfo->sortSourceParams, &param);
  }

  for (int32_t i = 0; i < numReaders; ++i) {
    SSortSource* ps = taosMemoryCalloc(1, sizeof(SSortSource));
    if (ps == NULL) {
      // do not dereference a failed allocation; abort the task like the other failures here
      T_LONG_JMP(pTaskInfo->env, TSDB_CODE_OUT_OF_MEMORY);
    }

    STableMergeScanSortSourceParam* param = taosArrayGet(pInfo->sortSourceParams, i);
    ps->param = param;
    tsortAddSource(pInfo->pSortHandle, ps);
  }

  int32_t code = tsortOpen(pInfo->pSortHandle);

  if (code != TSDB_CODE_SUCCESS) {
    T_LONG_JMP(pTaskInfo->env, terrno);
  }

  return TSDB_CODE_SUCCESS;
}

// Close the merge scan of the current table group and release its per-group resources.
//
// Accumulates the sort statistics into pInfo->sortExecInfo, destroys the per-source input
// blocks, the sort handle and the data readers, and resets the corresponding pointers so the
// operator holds no reference to freed objects.
//
// return - always TSDB_CODE_SUCCESS.
int32_t stopGroupTableMergeScan(SOperatorInfo* pOperator) {
  STableMergeScanInfo* pInfo = pOperator->info;

  size_t numReaders = taosArrayGetSize(pInfo->dataReaders);

  // method/buffer describe the latest sort; loops and byte counters accumulate across groups
  SSortExecInfo sortExecInfo = tsortGetSortExecInfo(pInfo->pSortHandle);
  pInfo->sortExecInfo.sortMethod = sortExecInfo.sortMethod;
  pInfo->sortExecInfo.sortBuffer = sortExecInfo.sortBuffer;
  pInfo->sortExecInfo.loops += sortExecInfo.loops;
  pInfo->sortExecInfo.readBytes += sortExecInfo.readBytes;
  pInfo->sortExecInfo.writeBytes += sortExecInfo.writeBytes;

  for (int32_t i = 0; i < numReaders; ++i) {
    STableMergeScanSortSourceParam* param = taosArrayGet(pInfo->sortSourceParams, i);
    blockDataDestroy(param->inputBlock);
  }
  // keep the array itself for the next group, only drop its elements
  taosArrayClear(pInfo->sortSourceParams);

  tsortDestroySortHandle(pInfo->pSortHandle);
  pInfo->pSortHandle = NULL;  // avoid leaving a dangling handle behind

  for (int32_t i = 0; i < numReaders; ++i) {
    STsdbReader* reader = taosArrayGetP(pInfo->dataReaders, i);
    tsdbReaderClose(reader);
  }
  taosArrayDestroy(pInfo->dataReaders);
  pInfo->dataReaders = NULL;
  return TSDB_CODE_SUCCESS;
}

L
Liu Jicong 已提交
3695 3696
// Drain up to `capacity` sorted rows from the sort handle into pResBlock.
//
// pHandle   - sort handle to pull tuples from.
// pResBlock - output block; it is cleaned and resized to `capacity` before being filled.
// capacity  - row count at which filling stops.
// pOperator - owning operator, used here only for the task id in the debug log.
// return    - pResBlock when at least one row was produced, NULL otherwise.
SSDataBlock* getSortedTableMergeScanBlockData(SSortHandle* pHandle, SSDataBlock* pResBlock, int32_t capacity,
                                              SOperatorInfo* pOperator) {
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;

  blockDataCleanup(pResBlock);
  blockDataEnsureCapacity(pResBlock, capacity);

  // at least one tuple is always requested; stop when the sorter is exhausted or the block is full
  do {
    STupleHandle* pTuple = tsortNextTuple(pHandle);
    if (pTuple == NULL) {
      break;
    }

    appendOneRowToDataBlock(pResBlock, pTuple);
  } while (pResBlock->info.rows < capacity);

  qDebug("%s get sorted row blocks, rows:%d", GET_TASKID(pTaskInfo), pResBlock->info.rows);

  if (pResBlock->info.rows > 0) {
    return pResBlock;
  }
  return NULL;
}

// Produce the next result block of the table merge scan operator.
//
// Tables are processed group by group: a group is opened with startGroupTableMergeScan, its
// sorted rows are returned block by block, and once it is exhausted it is closed with
// stopGroupTableMergeScan and the next group (if any) is opened.
//
// return - the next non-empty block tagged with the current group id, or NULL when the operator
//          is already done, the table list is empty, or every group has been consumed.
SSDataBlock* doTableMergeScan(SOperatorInfo* pOperator) {
  if (pOperator->status == OP_EXEC_DONE) {
    return NULL;
  }

  SExecTaskInfo*       pTaskInfo = pOperator->pTaskInfo;
  STableMergeScanInfo* pInfo = pOperator->info;

  int32_t code = pOperator->fpSet._openFn(pOperator);
  if (code != TSDB_CODE_SUCCESS) {
    T_LONG_JMP(pTaskInfo->env, code);
  }

  size_t numOfTables = taosArrayGetSize(pInfo->tableListInfo->pTableList);

  // first invocation: open the first group, or finish immediately when there is nothing to scan
  if (!pInfo->hasGroupId) {
    pInfo->hasGroupId = true;

    if (numOfTables == 0) {
      doSetOperatorCompleted(pOperator);
      return NULL;
    }

    pInfo->tableStartIndex = 0;
    STableKeyInfo* pFirst = taosArrayGet(pInfo->tableListInfo->pTableList, pInfo->tableStartIndex);
    pInfo->groupId = pFirst->groupId;
    startGroupTableMergeScan(pOperator);
  }

  while (pInfo->tableStartIndex < numOfTables) {
    SSDataBlock* pRes = getSortedTableMergeScanBlockData(pInfo->pSortHandle, pInfo->pResBlock,
                                                         pOperator->resultInfo.capacity, pOperator);
    if (pRes != NULL) {
      pRes->info.groupId = pInfo->groupId;
      pOperator->resultInfo.totalRows += pRes->info.rows;
      return pRes;
    }

    // current group is exhausted: close it and advance to the next one, if there is one
    stopGroupTableMergeScan(pOperator);
    if (pInfo->tableEndIndex >= numOfTables - 1) {
      doSetOperatorCompleted(pOperator);
      break;
    }

    pInfo->tableStartIndex = pInfo->tableEndIndex + 1;
    STableKeyInfo* pNext = taosArrayGet(pInfo->tableListInfo->pTableList, pInfo->tableStartIndex);
    pInfo->groupId = pNext->groupId;
    startGroupTableMergeScan(pOperator);
  }

  // reaching this point means no block was produced
  return NULL;
}

3767
void destroyTableMergeScanOperatorInfo(void* param) {
3768
  STableMergeScanInfo* pTableScanInfo = (STableMergeScanInfo*)param;
3769
  cleanupQueryTableDataCond(&pTableScanInfo->cond);
3770
  taosArrayDestroy(pTableScanInfo->sortSourceParams);
3771 3772

  for (int32_t i = 0; i < taosArrayGetSize(pTableScanInfo->dataReaders); ++i) {
H
Hongze Cheng 已提交
3773
    STsdbReader* reader = taosArrayGetP(pTableScanInfo->dataReaders, i);
H
refact  
Hongze Cheng 已提交
3774
    tsdbReaderClose(reader);
3775 3776 3777 3778 3779 3780 3781 3782 3783 3784 3785
  }
  taosArrayDestroy(pTableScanInfo->dataReaders);

  if (pTableScanInfo->pColMatchInfo != NULL) {
    taosArrayDestroy(pTableScanInfo->pColMatchInfo);
  }

  pTableScanInfo->pResBlock = blockDataDestroy(pTableScanInfo->pResBlock);
  pTableScanInfo->pSortInputBlock = blockDataDestroy(pTableScanInfo->pSortInputBlock);

  taosArrayDestroy(pTableScanInfo->pSortInfo);
3786
  cleanupExprSupp(&pTableScanInfo->pseudoSup);
L
Liu Jicong 已提交
3787

3788
  taosMemoryFreeClear(pTableScanInfo->rowEntryInfoOffset);
D
dapan1121 已提交
3789
  taosMemoryFreeClear(param);
3790 3791
}

3792 3793
typedef struct STableMergeScanExecInfo {
  SFileBlockLoadRecorder blockRecorder;
L
Liu Jicong 已提交
3794
  SSortExecInfo          sortExecInfo;
3795 3796
} STableMergeScanExecInfo;

3797 3798
// Export the operator's execution statistics for EXPLAIN.
//
// pOptr        - table merge scan operator (must not be NULL).
// pOptrExplain - out: newly allocated STableMergeScanExecInfo holding a snapshot of the
//                operator's readRecorder and sortExecInfo; set to NULL on failure.
// len          - out: size in bytes of the returned structure; set to 0 on failure.
// return       - TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY when the snapshot cannot be
//                allocated.
int32_t getTableMergeScanExplainExecInfo(SOperatorInfo* pOptr, void** pOptrExplain, uint32_t* len) {
  ASSERT(pOptr != NULL);
  // TODO: merge these two info into one struct
  STableMergeScanExecInfo* execInfo = taosMemoryCalloc(1, sizeof(STableMergeScanExecInfo));
  if (execInfo == NULL) {
    // never dereference a failed allocation; report it to the caller instead
    *pOptrExplain = NULL;
    *len = 0;
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  STableMergeScanInfo* pInfo = pOptr->info;
  execInfo->blockRecorder = pInfo->readRecorder;
  execInfo->sortExecInfo = pInfo->sortExecInfo;

  *pOptrExplain = execInfo;
  *len = sizeof(STableMergeScanExecInfo);

  return TSDB_CODE_SUCCESS;
}

S
slzhou 已提交
3811 3812 3813
int32_t compareTableKeyInfoByGid(const void* p1, const void* p2) {
  const STableKeyInfo* info1 = p1;
  const STableKeyInfo* info2 = p2;
3814 3815 3816 3817 3818 3819 3820
  if (info1->groupId < info2->groupId) {
    return -1;
  } else if (info1->groupId > info2->groupId) {
    return 1;
  } else {
    return 0;
  }
S
slzhou 已提交
3821 3822
}

3823
SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanNode, STableListInfo* pTableListInfo,
3824
                                                SReadHandle* readHandle, SExecTaskInfo* pTaskInfo) {
3825 3826 3827 3828 3829
  STableMergeScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STableMergeScanInfo));
  SOperatorInfo*       pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
  if (pInfo == NULL || pOperator == NULL) {
    goto _error;
  }
3830
  if (pTableScanNode->pGroupTags) {
S
slzhou 已提交
3831 3832
    taosArraySort(pTableListInfo->pTableList, compareTableKeyInfoByGid);
  }
3833 3834 3835 3836

  SDataBlockDescNode* pDescNode = pTableScanNode->scan.node.pOutputDataBlockDesc;

  int32_t numOfCols = 0;
L
Liu Jicong 已提交
3837
  SArray* pColList = extractColMatchInfo(pTableScanNode->scan.pScanCols, pDescNode, &numOfCols, COL_MATCH_FROM_COL_ID);
3838 3839 3840

  int32_t code = initQueryTableDataCond(&pInfo->cond, pTableScanNode);
  if (code != TSDB_CODE_SUCCESS) {
H
Haojun Liao 已提交
3841
    taosArrayDestroy(pColList);
3842 3843 3844 3845
    goto _error;
  }

  if (pTableScanNode->scan.pScanPseudoCols != NULL) {
3846 3847 3848
    SExprSupp* pSup = &pInfo->pseudoSup;
    pSup->pExprInfo = createExprInfo(pTableScanNode->scan.pScanPseudoCols, NULL, &pSup->numOfExprs);
    pSup->pCtx = createSqlFunctionCtx(pSup->pExprInfo, pSup->numOfExprs, &pSup->rowEntryInfoOffset);
3849 3850 3851 3852
  }

  pInfo->scanInfo = (SScanInfo){.numOfAsc = pTableScanNode->scanSeq[0], .numOfDesc = pTableScanNode->scanSeq[1]};

L
Liu Jicong 已提交
3853 3854
  pInfo->readHandle = *readHandle;
  pInfo->interval = extractIntervalInfo(pTableScanNode);
3855
  pInfo->sample.sampleRatio = pTableScanNode->ratio;
L
Liu Jicong 已提交
3856 3857 3858
  pInfo->sample.seed = taosGetTimestampSec();
  pInfo->dataBlockLoadFlag = pTableScanNode->dataRequired;
  pInfo->pFilterNode = pTableScanNode->scan.node.pConditions;
3859
  pInfo->tableListInfo = pTableListInfo;
L
Liu Jicong 已提交
3860 3861
  pInfo->scanFlag = MAIN_SCAN;
  pInfo->pColMatchInfo = pColList;
3862 3863

  pInfo->pResBlock = createResDataBlock(pDescNode);
3864
  pInfo->sortSourceParams = taosArrayInit(64, sizeof(STableMergeScanSortSourceParam));
3865

3866
  pInfo->pSortInfo = generateSortByTsInfo(pInfo->pColMatchInfo, pInfo->cond.order);
3867
  pInfo->pSortInputBlock = createOneDataBlock(pInfo->pResBlock, false);
3868

3869
  int32_t rowSize = pInfo->pResBlock->info.rowSize;
L
Liu Jicong 已提交
3870
  pInfo->bufPageSize = getProperSortPageSize(rowSize);
3871

L
Liu Jicong 已提交
3872
  pOperator->name = "TableMergeScanOperator";
3873
  pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN;
L
Liu Jicong 已提交
3874 3875 3876
  pOperator->blocking = false;
  pOperator->status = OP_NOT_OPENED;
  pOperator->info = pInfo;
L
Liu Jicong 已提交
3877
  pOperator->exprSupp.numOfExprs = numOfCols;
L
Liu Jicong 已提交
3878
  pOperator->pTaskInfo = pTaskInfo;
3879
  initResultSizeInfo(&pOperator->resultInfo, 1024);
3880 3881

  pOperator->fpSet =
3882 3883
      createOperatorFpSet(operatorDummyOpenFn, doTableMergeScan, NULL, NULL, destroyTableMergeScanOperatorInfo, NULL,
                          NULL, getTableMergeScanExplainExecInfo);
3884 3885 3886 3887 3888 3889 3890 3891 3892
  pOperator->cost.openCost = 0;
  return pOperator;

_error:
  pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY;
  taosMemoryFree(pInfo);
  taosMemoryFree(pOperator);
  return NULL;
}