scanoperator.c 139.8 KB
Newer Older
H
Haojun Liao 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

16
#include "executorimpl.h"
H
Haojun Liao 已提交
17
#include "filter.h"
18
#include "function.h"
19
#include "functionMgt.h"
L
Liu Jicong 已提交
20
#include "os.h"
H
Haojun Liao 已提交
21
#include "querynodes.h"
22
#include "systable.h"
H
Haojun Liao 已提交
23
#include "tname.h"
24
#include "ttime.h"
H
Haojun Liao 已提交
25 26 27 28 29 30 31 32

#include "tdatablock.h"
#include "tmsg.h"

#include "query.h"
#include "tcompare.h"
#include "thash.h"
#include "ttypes.h"
33
#include "vnode.h"
H
Haojun Liao 已提交
34 35

// Store REVERSE_SCAN into the scanFlag field of the object that `_scan` points to.
#define SET_REVERSE_SCAN_FLAG(_scan) ((_scan)->scanFlag = REVERSE_SCAN)
36
// Flip an order lvalue in place: TSDB_ORDER_ASC becomes TSDB_ORDER_DESC, any other value becomes TSDB_ORDER_ASC.
#define SWITCH_ORDER(n) ((n) = (((n) == TSDB_ORDER_ASC) ? TSDB_ORDER_DESC : TSDB_ORDER_ASC))
37

38
static int32_t buildSysDbTableInfo(const SSysTableScanInfo* pInfo, int32_t capacity);
39 40
static int32_t buildDbTableInfoBlock(bool sysInfo, const SSDataBlock* p, const SSysTableMeta* pSysDbTableMeta,
                                     size_t size, const char* dbName);
41

L
Liu Jicong 已提交
42
static bool processBlockWithProbability(const SSampleExecInfo* pInfo);
43

44 45 46
static int32_t sysTableUserTagsFillOneTableTags(const SSysTableScanInfo* pInfo, SMetaReader* smrSuperTable,
                                                SMetaReader* smrChildTable, const char* dbname, const char* tableName,
                                                int32_t* pNumOfRows, const SSDataBlock* dataBlock);
47 48 49

static void relocateAndFilterSysTagsScanResult(SSysTableScanInfo* pInfo, int32_t numOfRows, SSDataBlock* dataBlock);
// Decide whether the current data block should be processed.
// Sampling is compiled out at the moment, so every block is accepted; the
// ratio-based implementation is retained in the disabled branch below.
bool processBlockWithProbability(const SSampleExecInfo* pInfo) {
#if 1
  return true;
#else
  if (pInfo->sampleRatio == 1) {
    return true;
  }

  uint32_t val = taosRandR((uint32_t*) &pInfo->seed);
  return (val % ((uint32_t)(1/pInfo->sampleRatio))) == 0;
#endif
}

62
// Flip the `order` field of each of the first `numOfOutput` function contexts.
static void switchCtxOrder(SqlFunctionCtx* pCtx, int32_t numOfOutput) {
  int32_t idx = 0;
  while (idx < numOfOutput) {
    SWITCH_ORDER(pCtx[idx].order);
    idx += 1;
  }
}

// Placeholder for resetting per-table query ranges before a reverse scan.
// The previous body was entirely disabled (#if 0) and referenced names that are
// not in scope here (pRuntimeEnv, pQueryAttr), so it has been removed; the
// function intentionally does nothing.
static void setupQueryRangeForReverseScan(STableScanInfo* pTableScanInfo) {
  (void)pTableScanInfo;  // unused: kept only so the signature stays available
}

91 92 93 94 95 96 97 98 99
// Move `tw` to the next window in the direction given by `order`.
// For units other than 'n' and 'y' the window is shifted by `sliding` and its
// end recomputed from `interval`. For 'n'/'y' the shift is done in whole
// months on the broken-down local time ('y' intervals are multiplied by 12).
static void getNextTimeWindow(SInterval* pInterval, STimeWindow* tw, int32_t order) {
  int32_t direction = GET_FORWARD_DIRECTION_FACTOR(order);
  bool    monthBased = (pInterval->intervalUnit == 'n') || (pInterval->intervalUnit == 'y');

  if (!monthBased) {
    tw->skey += pInterval->sliding * direction;
    tw->ekey = tw->skey + pInterval->interval - 1;
    return;
  }

  int64_t numOfMonths = pInterval->interval;
  if (pInterval->intervalUnit == 'y') {
    numOfMonths *= 12;
  }

  // window start, converted to whole seconds
  int64_t startSec = convertTimePrecision(tw->skey, pInterval->precision, TSDB_TIME_PRECISION_MILLI) / 1000;

  time_t    t = (time_t)startSec;
  struct tm tm;
  taosLocalTime(&t, &tm);

  // new window start: shift by the interval (in months) in scan direction
  int mon = (int)(tm.tm_year * 12 + tm.tm_mon + numOfMonths * direction);
  tm.tm_year = mon / 12;
  tm.tm_mon = mon % 12;
  tw->skey = convertTimePrecision((int64_t)taosMktime(&tm) * 1000LL, TSDB_TIME_PRECISION_MILLI, pInterval->precision);

  // window end: one interval after the new start, minus one time unit
  mon = (int)(mon + numOfMonths);
  tm.tm_year = mon / 12;
  tm.tm_mon = mon % 12;
  tw->ekey =
      convertTimePrecision((int64_t)taosMktime(&tm) * 1000LL, TSDB_TIME_PRECISION_MILLI, pInterval->precision) - 1;
}

124
// Report whether the data block's time range intersects one of the interval
// windows, walking windows in the given scan order starting from the window
// aligned to the block's first (ascending) or last (descending) key.
static bool overlapWithTimeWindow(SInterval* pInterval, SDataBlockInfo* pBlockInfo, int32_t order) {
  // interval == 0 is the default and means the upstream operator is not an interval operator
  if (pInterval->interval == 0) {
    return false;
  }

  if (order == TSDB_ORDER_ASC) {
    STimeWindow win = getAlignQueryTimeWindow(pInterval, pInterval->precision, pBlockInfo->window.skey);
    assert(win.ekey >= pBlockInfo->window.skey);

    if (TMAX(win.skey, pBlockInfo->window.skey) <= TMIN(win.ekey, pBlockInfo->window.ekey)) {
      return true;
    }

    for (;;) {
      getNextTimeWindow(pInterval, &win, order);
      if (win.skey > pBlockInfo->window.ekey) {
        return false;  // walked past the end of the block
      }

      assert(win.ekey > pBlockInfo->window.ekey);
      if (TMAX(win.skey, pBlockInfo->window.skey) <= pBlockInfo->window.ekey) {
        return true;
      }
    }
  }

  // descending order: start from the window that holds the block's last key
  STimeWindow win = getAlignQueryTimeWindow(pInterval, pInterval->precision, pBlockInfo->window.ekey);
  assert(win.skey <= pBlockInfo->window.ekey);

  if (TMAX(win.skey, pBlockInfo->window.skey) <= TMIN(win.ekey, pBlockInfo->window.ekey)) {
    return true;
  }

  for (;;) {
    getNextTimeWindow(pInterval, &win, order);
    if (win.ekey < pBlockInfo->window.skey) {
      return false;  // walked past the start of the block
    }

    assert(win.skey < pBlockInfo->window.skey);
    if (pBlockInfo->window.skey <= TMIN(win.ekey, pBlockInfo->window.ekey)) {
      return true;
    }
  }
}

175 176 177 178 179 180 181 182 183 184 185
// Used by the table scanner to look up the intermediate result row that the
// upstream aggregate keeps for `groupId`.
// Returns NULL when the operator is not a table scan or no row exists for the
// group; otherwise stores the fetched buffer page in *pPage and returns the row
// located at the recorded offset inside that page.
static SResultRow* getTableGroupOutputBuf(SOperatorInfo* pOperator, uint64_t groupId, SFilePage** pPage) {
  if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN) {
    return NULL;
  }

  STableScanInfo* pScan = pOperator->info;

  int64_t keyBuf[2] = {0};
  SET_RES_WINDOW_KEY((char*)keyBuf, &groupId, sizeof(groupId), groupId);

  SResultRowPosition* pPos = (SResultRowPosition*)tSimpleHashGet(pScan->pdInfo.pAggSup->pResultRowHashTable, keyBuf,
                                                                 GET_RES_WINDOW_KEY_LEN(sizeof(groupId)));
  if (pPos == NULL) {
    return NULL;
  }

  *pPage = getBufPage(pScan->pdInfo.pAggSup->pResultBuf, pPos->pageId);
  return (SResultRow*)((char*)(*pPage) + pPos->offset);
}

// Ask every pushed-down function whether, given its current result for the
// block's group, it still needs the data of this block. If none does, *status
// is set to FUNC_DATA_REQUIRED_NOT_LOAD; otherwise *status is left untouched.
// Always returns TSDB_CODE_SUCCESS.
static int32_t doDynamicPruneDataBlock(SOperatorInfo* pOperator, SDataBlockInfo* pBlockInfo, uint32_t* status) {
  STableScanInfo* pScan = pOperator->info;
  SExprSupp*      pSup = pScan->pdInfo.pExprSup;
  if (pSup == NULL) {
    return TSDB_CODE_SUCCESS;
  }

  SFilePage*  pPage = NULL;
  SResultRow* pRow = getTableGroupOutputBuf(pOperator, pBlockInfo->groupId, &pPage);
  if (pRow == NULL) {
    return TSDB_CODE_SUCCESS;
  }

  // stop at the first function that requires the block
  bool blockNeeded = false;
  for (int32_t i = 0; i < pSup->numOfExprs && !blockNeeded; ++i) {
    SResultRowEntryInfo* pEntry = getResultEntryInfo(pRow, i, pSup->rowEntryInfoOffset);
    int32_t required = fmFuncDynDataRequired(pSup->pCtx[i].functionId, pEntry, &pBlockInfo->window);
    blockNeeded = (required != FUNC_DATA_REQUIRED_NOT_LOAD);
  }

  // release buffer pages
  releaseBufPage(pScan->pdInfo.pAggSup->pResultBuf, pPage);

  if (!blockNeeded) {
    *status = FUNC_DATA_REQUIRED_NOT_LOAD;
  }

  return TSDB_CODE_SUCCESS;
}

H
Haojun Liao 已提交
236 237 238 239 240 241 242 243 244 245
// Evaluate the filter condition against the block-level column aggregates (SMA).
// Returns true when the block must be kept (it may contain qualifying rows, or
// no decision can be made), false when the range check rules the block out.
//
// A filter is built from `pFilterNode` on every call and released before
// returning. If building the filter fails, the block is conservatively kept
// instead of running the range check on an invalid filter.
static FORCE_INLINE bool doFilterByBlockSMA(const SNode* pFilterNode, SColumnDataAgg** pColsAgg, int32_t numOfCols,
                                            int32_t numOfRows) {
  if (pColsAgg == NULL || pFilterNode == NULL) {
    return true;
  }

  SFilterInfo* filter = NULL;

  // todo move to the initialization function
  int32_t code = filterInitFromNode((SNode*)pFilterNode, &filter, 0);
  if (code != TSDB_CODE_SUCCESS || filter == NULL) {
    // cannot prune without a usable filter: keep the block
    if (filter != NULL) {
      filterFreeInfo(filter);
    }
    return true;
  }

  bool keep = filterRangeExecute(filter, pColsAgg, numOfCols, numOfRows);

  filterFreeInfo(filter);
  return keep;
}

// Fetch the block-level aggregates (SMA) of the current block from the reader
// and attach them to pBlock->pBlockAgg, indexed by target slot id.
// Returns false when not every column has aggregates. Long-jumps through
// pTaskInfo->env when the reader reports an error or the pointer array cannot
// be allocated.
static bool doLoadBlockSMA(STableScanInfo* pTableScanInfo, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo) {
  SColumnDataAgg** pAggList = NULL;
  bool             everyColHasAgg = true;

  int32_t code = tsdbRetrieveDatablockSMA(pTableScanInfo->dataReader, &pAggList, &everyColHasAgg);
  if (code != TSDB_CODE_SUCCESS) {
    T_LONG_JMP(pTaskInfo->env, code);
  }

  if (!everyColHasAgg) {
    return false;
  }

  int32_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);

  // todo create this buffer during creating operator
  if (pBlock->pBlockAgg == NULL) {
    pBlock->pBlockAgg = taosMemoryCalloc(numOfCols, POINTER_BYTES);
    if (pBlock->pBlockAgg == NULL) {
      T_LONG_JMP(pTaskInfo->env, TSDB_CODE_OUT_OF_MEMORY);
    }
  }

  // only columns flagged as output get an aggregate entry
  size_t numOfMatched = taosArrayGetSize(pTableScanInfo->pColMatchInfo);
  for (int32_t i = 0; i < numOfMatched; ++i) {
    SColMatchInfo* pMatch = taosArrayGet(pTableScanInfo->pColMatchInfo, i);
    if (pMatch->output) {
      pBlock->pBlockAgg[pMatch->targetSlotId] = pAggList[i];
    }
  }

  return true;
}

L
Liu Jicong 已提交
287 288
// Decide how the current block has to be handled and, if required, load it.
//
// *status starts from the operator's dataBlockLoadFlag and is forced to
// FUNC_DATA_REQUIRED_DATA_LOAD when a filter exists or the block overlaps an
// interval window. Depending on the outcome the block is filtered out, skipped,
// served from block SMA only, or fully loaded (followed by pseudo-column
// filling and row filtering). The read recorder counters are updated along the
// way. On return *status tells the caller what happened.
//
// Returns TSDB_CODE_SUCCESS, or terrno when the data block cannot be retrieved.
// A failure while adding pseudo columns long-jumps through pTaskInfo->env.
static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableScanInfo, SSDataBlock* pBlock,
                             uint32_t* status) {
  SExecTaskInfo*          pTaskInfo = pOperator->pTaskInfo;
  STableScanInfo*         pInfo = pOperator->info;
  SFileBlockLoadRecorder* pCost = &pTableScanInfo->readRecorder;
  SDataBlockInfo*         pBlockInfo = &pBlock->info;
  const bool              hasFilter = (pTableScanInfo->pFilterNode != NULL);

  pCost->totalBlocks += 1;
  pCost->totalRows += pBlockInfo->rows;

  *status = pInfo->dataBlockLoadFlag;
  if (hasFilter || overlapWithTimeWindow(&pTableScanInfo->pdInfo.interval, pBlockInfo, pTableScanInfo->cond.order)) {
    *status = FUNC_DATA_REQUIRED_DATA_LOAD;
  }

  taosMemoryFreeClear(pBlock->pBlockAgg);

  if (*status == FUNC_DATA_REQUIRED_FILTEROUT) {
    qDebug("%s data block filter out, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo),
           pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows);
    pCost->filterOutBlocks += 1;
    return TSDB_CODE_SUCCESS;
  }

  if (*status == FUNC_DATA_REQUIRED_NOT_LOAD) {
    qDebug("%s data block skipped, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo),
           pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows);
    pCost->skipBlocks += 1;
    return TSDB_CODE_SUCCESS;
  }

  bool smaAttempted = false;
  if (*status == FUNC_DATA_REQUIRED_STATIS_LOAD) {
    pCost->loadBlockStatis += 1;
    smaAttempted = true;  // mark the operation of load sma

    if (doLoadBlockSMA(pTableScanInfo, pBlock, pTaskInfo)) {
      qDebug("%s data block SMA loaded, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo),
             pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows);
      return TSDB_CODE_SUCCESS;
    }

    // block statistics are not available for all columns: load the data block instead
    qDebug("%s failed to load SMA, since not all columns have SMA", GET_TASKID(pTaskInfo));
    *status = FUNC_DATA_REQUIRED_DATA_LOAD;
  }

  ASSERT(*status == FUNC_DATA_REQUIRED_DATA_LOAD);

  // try to filter data block according to sma info
  if (hasFilter && !smaAttempted && doLoadBlockSMA(pTableScanInfo, pBlock, pTaskInfo)) {
    size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
    if (!doFilterByBlockSMA(pTableScanInfo->pFilterNode, pBlock->pBlockAgg, numOfCols, pBlockInfo->rows)) {
      qDebug("%s data block filter out by block SMA, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo),
             pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows);
      pCost->filterOutBlocks += 1;
      *status = FUNC_DATA_REQUIRED_FILTEROUT;
      return TSDB_CODE_SUCCESS;
    }
  }

  // free the sma info, since it should not be involved in later computing process.
  taosMemoryFreeClear(pBlock->pBlockAgg);

  // try to filter data block according to current results
  doDynamicPruneDataBlock(pOperator, pBlockInfo, status);
  if (*status == FUNC_DATA_REQUIRED_NOT_LOAD) {
    qDebug("%s data block skipped due to dynamic prune, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo),
           pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows);
    pCost->skipBlocks += 1;

    *status = FUNC_DATA_REQUIRED_FILTEROUT;
    return TSDB_CODE_SUCCESS;
  }

  // from here on the block content is actually read
  pCost->totalCheckedRows += pBlockInfo->rows;
  pCost->loadBlocks += 1;

  SArray* pCols = tsdbRetrieveDataBlock(pTableScanInfo->dataReader, NULL);
  if (pCols == NULL) {
    return terrno;
  }

  relocateColumnData(pBlock, pTableScanInfo->pColMatchInfo, pCols, true);

  // currently only the tbname pseudo column
  if (pTableScanInfo->pseudoSup.numOfExprs > 0) {
    SExprSupp* pPseudo = &pTableScanInfo->pseudoSup;

    int32_t code = addTagPseudoColumnData(&pTableScanInfo->readHandle, pPseudo->pExprInfo, pPseudo->numOfExprs, pBlock,
                                          GET_TASKID(pTaskInfo));
    if (code != TSDB_CODE_SUCCESS) {
      T_LONG_JMP(pTaskInfo->env, code);
    }
  }

  if (hasFilter) {
    int64_t filterStart = taosGetTimestampUs();
    doFilter(pTableScanInfo->pFilterNode, pBlock, pTableScanInfo->pColMatchInfo, pOperator->exprSupp.pFilterInfo);

    double el = (taosGetTimestampUs() - filterStart) / 1000.0;
    pCost->filterTime += el;

    if (pBlockInfo->rows == 0) {
      pCost->filterOutBlocks += 1;
      qDebug("%s data block filter out, brange:%" PRId64 "-%" PRId64 ", rows:%d, elapsed time:%.2f ms",
             GET_TASKID(pTaskInfo), pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows, el);
    } else {
      qDebug("%s data block filter applied, elapsed time:%.2f ms", GET_TASKID(pTaskInfo), el);
    }
  }

  return TSDB_CODE_SUCCESS;
}

405
// Switch the scan state to descending order: set the reverse-scan flag, flip
// the order of the first `numOfOutput` function contexts, set the query
// condition order to TSDB_ORDER_DESC and swap the start/end keys of its window.
static void prepareForDescendingScan(STableScanInfo* pTableScanInfo, SqlFunctionCtx* pCtx, int32_t numOfOutput) {
  SET_REVERSE_SCAN_FLAG(pTableScanInfo);
  switchCtxOrder(pCtx, numOfOutput);

  pTableScanInfo->cond.order = TSDB_ORDER_DESC;
  TSWAP(pTableScanInfo->cond.twindows.skey, pTableScanInfo->cond.twindows.ekey);
}

H
Haojun Liao 已提交
416
int32_t addTagPseudoColumnData(SReadHandle* pHandle, SExprInfo* pPseudoExpr, int32_t numOfPseudoExpr,
L
Liu Jicong 已提交
417
                               SSDataBlock* pBlock, const char* idStr) {
418
  // currently only the tbname pseudo column
419
  if (numOfPseudoExpr == 0) {
H
Haojun Liao 已提交
420
    return TSDB_CODE_SUCCESS;
421 422 423
  }

  SMetaReader mr = {0};
424
  metaReaderInit(&mr, pHandle->meta, 0);
H
Haojun Liao 已提交
425 426
  int32_t code = metaGetTableEntryByUid(&mr, pBlock->info.uid);
  if (code != TSDB_CODE_SUCCESS) {
L
Liu Jicong 已提交
427
    qError("failed to get table meta, uid:0x%" PRIx64 ", code:%s, %s", pBlock->info.uid, tstrerror(terrno), idStr);
H
Haojun Liao 已提交
428 429 430
    metaReaderClear(&mr);
    return terrno;
  }
431

432 433
  metaReaderReleaseLock(&mr);

434 435
  for (int32_t j = 0; j < numOfPseudoExpr; ++j) {
    SExprInfo* pExpr = &pPseudoExpr[j];
436 437 438 439

    int32_t dstSlotId = pExpr->base.resSchema.slotId;

    SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, dstSlotId);
D
dapan1121 已提交
440
    colInfoDataCleanup(pColInfoData, pBlock->info.rows);
441 442 443 444 445

    int32_t functionId = pExpr->pExpr->_function.functionId;

    // this is to handle the tbname
    if (fmIsScanPseudoColumnFunc(functionId)) {
446
      setTbNameColData(pHandle->meta, pBlock, pColInfoData, functionId);
447
    } else {  // these are tags
wmmhello's avatar
wmmhello 已提交
448 449
      STagVal tagVal = {0};
      tagVal.cid = pExpr->base.pParam[0].pCol->colId;
450
      const char* p = metaGetTableTagVal(mr.me.ctbEntry.pTags, pColInfoData->info.type, &tagVal);
wmmhello's avatar
wmmhello 已提交
451

452 453 454 455
      char* data = NULL;
      if (pColInfoData->info.type != TSDB_DATA_TYPE_JSON && p != NULL) {
        data = tTagValToData((const STagVal*)p, false);
      } else {
wmmhello's avatar
wmmhello 已提交
456
        data = (char*)p;
wmmhello's avatar
wmmhello 已提交
457
      }
458

H
Haojun Liao 已提交
459 460 461
      bool isNullVal = (data == NULL) || (pColInfoData->info.type == TSDB_DATA_TYPE_JSON && tTagIsJsonNull(data));
      if (isNullVal) {
        colDataAppendNNULL(pColInfoData, 0, pBlock->info.rows);
H
Haojun Liao 已提交
462
      } else if (pColInfoData->info.type != TSDB_DATA_TYPE_JSON) {
H
Haojun Liao 已提交
463
        colDataAppendNItems(pColInfoData, 0, data, pBlock->info.rows);
L
Liu Jicong 已提交
464
      } else {  // todo opt for json tag
H
Haojun Liao 已提交
465
        for (int32_t i = 0; i < pBlock->info.rows; ++i) {
H
Haojun Liao 已提交
466
          colDataAppend(pColInfoData, i, data, false);
H
Haojun Liao 已提交
467
        }
468
      }
469

470 471
      if (data && (pColInfoData->info.type != TSDB_DATA_TYPE_JSON) && p != NULL &&
          IS_VAR_DATA_TYPE(((const STagVal*)p)->type)) {
wmmhello's avatar
wmmhello 已提交
472
        taosMemoryFree(data);
wmmhello's avatar
wmmhello 已提交
473
      }
474 475 476 477
    }
  }

  metaReaderClear(&mr);
H
Haojun Liao 已提交
478
  return TSDB_CODE_SUCCESS;
479 480
}

481 482 483 484
// Fill `pColInfoData` by running the scalar function `functionId` over a
// temporary one-row BIGINT column that holds the block's table uid. `pMeta` is
// handed to the function through the input parameter's `param` field.
void setTbNameColData(void* pMeta, const SSDataBlock* pBlock, SColumnInfoData* pColInfoData, int32_t functionId) {
  struct SScalarFuncExecFuncs execFuncs = {0};
  fmGetScalarFuncExecFuncs(functionId, &execFuncs);

  // temporary input column: a single row containing the table uid
  SColumnInfoData uidCol = createColumnInfoData(TSDB_DATA_TYPE_BIGINT, sizeof(uint64_t), 1);
  colInfoDataEnsureCapacity(&uidCol, 1);
  colDataAppendInt64(&uidCol, 0, (int64_t*)&pBlock->info.uid);

  SScalarParam input = {.numOfRows = pBlock->info.rows, .param = pMeta, .columnData = &uidCol};
  SScalarParam output = {.columnData = pColInfoData};
  execFuncs.process(&input, 1, &output);

  colDataDestroy(&uidCol);
}

496
// Pull blocks from the reader until one survives loading/filtering, and return
// it (the operator's shared result block). Returns NULL when the reader has no
// more blocks. Long-jumps through the task env when the task is killed or a
// block fails to load.
static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) {
  STableScanInfo* pScan = pOperator->info;
  SExecTaskInfo*  pTaskInfo = pOperator->pTaskInfo;
  SSDataBlock*    pRes = pScan->pResBlock;

  int64_t startUs = taosGetTimestampUs();

  while (tsdbNextDataBlock(pScan->dataReader)) {
    if (isTaskKilled(pTaskInfo)) {
      T_LONG_JMP(pTaskInfo->env, TSDB_CODE_TSC_QUERY_CANCELLED);
    }

    // process this data block based on the probabilities
    if (!processBlockWithProbability(&pScan->sample)) {
      continue;
    }

    blockDataCleanup(pRes);

    // refresh the block info from the reader and size the result block to match
    SDataBlockInfo blockInfo = pRes->info;
    tsdbRetrieveDataBlockInfo(pScan->dataReader, &blockInfo);

    blockInfo.capacity = blockInfo.rows;
    blockDataEnsureCapacity(pRes, blockInfo.rows);
    pRes->info = blockInfo;
    ASSERT(blockInfo.uid != 0);

    uint64_t* pGroupId = taosHashGet(pTaskInfo->tableqinfoList.map, &pRes->info.uid, sizeof(int64_t));
    if (pGroupId) {
      pRes->info.groupId = *pGroupId;
    }

    uint32_t status = 0;
    int32_t  code = loadDataBlock(pOperator, pScan, pRes, &status);
    if (code != TSDB_CODE_SUCCESS) {
      T_LONG_JMP(pOperator->pTaskInfo->env, code);
    }

    // current block is filter out according to filter condition, continue load the next block
    if (status == FUNC_DATA_REQUIRED_FILTEROUT || pRes->info.rows == 0) {
      continue;
    }

    pOperator->resultInfo.totalRows = pScan->readRecorder.totalRows;
    pScan->readRecorder.elapsedTime += (taosGetTimestampUs() - startUs) / 1000.0;
    pOperator->cost.totalCost = pScan->readRecorder.elapsedTime;

    // todo refactor
    pTaskInfo->streamInfo.lastStatus.type = TMQ_OFFSET__SNAPSHOT_DATA;
    pTaskInfo->streamInfo.lastStatus.uid = pRes->info.uid;
    pTaskInfo->streamInfo.lastStatus.ts = pRes->info.window.ekey;

    ASSERT(pRes->info.uid != 0);
    return pRes;
  }

  return NULL;
}

wmmhello's avatar
wmmhello 已提交
559
// Scan the current table group: first `scanInfo.numOfAsc` ascending passes,
// then `scanInfo.numOfDesc` descending passes, resetting the reader between
// passes. Returns the next non-empty block, or NULL once all passes are done,
// the reader is missing, or the operator has already finished.
static SSDataBlock* doTableScanGroup(SOperatorInfo* pOperator) {
  STableScanInfo* pTableScanInfo = pOperator->info;
  SExecTaskInfo*  pTaskInfo = pOperator->pTaskInfo;

  // The read handle is not initialized yet, since no qualified tables exists
  if (pTableScanInfo->dataReader == NULL || pOperator->status == OP_EXEC_DONE) {
    return NULL;
  }

  // do the ascending order traverse in the first place.
  while (pTableScanInfo->scanTimes < pTableScanInfo->scanInfo.numOfAsc) {
    SSDataBlock* p = doTableScanImpl(pOperator);
    if (p != NULL) {
      ASSERT(p->info.uid != 0);
      return p;
    }

    pTableScanInfo->scanTimes += 1;

    if (pTableScanInfo->scanTimes < pTableScanInfo->scanInfo.numOfAsc) {
      setTaskStatus(pTaskInfo, TASK_NOT_COMPLETED);
      pTableScanInfo->scanFlag = REPEAT_SCAN;
      qDebug("%s start to repeat ascending order scan data blocks due to query func required",
             GET_TASKID(pTaskInfo));

      // do prepare for the next round table scan operation
      tsdbReaderReset(pTableScanInfo->dataReader, &pTableScanInfo->cond);
    }
  }

  int32_t total = pTableScanInfo->scanInfo.numOfAsc + pTableScanInfo->scanInfo.numOfDesc;
  if (pTableScanInfo->scanTimes < total) {
    if (pTableScanInfo->cond.order == TSDB_ORDER_ASC) {
      // NOTE(review): numOfOutput is passed as 0, so no function context order is flipped here
      prepareForDescendingScan(pTableScanInfo, pOperator->exprSupp.pCtx, 0);
      tsdbReaderReset(pTableScanInfo->dataReader, &pTableScanInfo->cond);
      qDebug("%s start to descending order scan data blocks due to query func required", GET_TASKID(pTaskInfo));
    }

    while (pTableScanInfo->scanTimes < total) {
      SSDataBlock* p = doTableScanImpl(pOperator);
      if (p != NULL) {
        return p;
      }

      pTableScanInfo->scanTimes += 1;

      if (pTableScanInfo->scanTimes < total) {
        setTaskStatus(pTaskInfo, TASK_NOT_COMPLETED);
        pTableScanInfo->scanFlag = REPEAT_SCAN;

        qDebug("%s start to repeat descending order scan data blocks due to query func required",
               GET_TASKID(pTaskInfo));
        tsdbReaderReset(pTableScanInfo->dataReader, &pTableScanInfo->cond);
      }
    }
  }

  return NULL;
}

// Entry point for fetching the next block of the table scan operator.
//
// In TABLE_SCAN__TABLE_ORDER mode the tables of the task are scanned one after
// another: when the current table is exhausted the reader is pointed at the
// next table and reset. Otherwise the scan works group by group; on first use
// the reader is re-opened on the first group's table list.
//
// Returns the next block, or NULL when nothing is left. A failure to open the
// reader long-jumps through pTaskInfo->env.
static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
  STableScanInfo* pInfo = pOperator->info;
  SExecTaskInfo*  pTaskInfo = pOperator->pTaskInfo;

  // if scan table by table
  if (pInfo->scanMode == TABLE_SCAN__TABLE_ORDER) {
    if (pInfo->noTable) {
      return NULL;
    }

    int32_t numOfTables = taosArrayGetSize(pTaskInfo->tableqinfoList.pTableList);

    while (1) {
      SSDataBlock* result = doTableScanGroup(pOperator);
      if (result) {
        return result;
      }

      // if no data, switch to next table and continue scan
      pInfo->currentTable++;
      if (pInfo->currentTable >= numOfTables) {
        return NULL;
      }

      STableKeyInfo* pTableInfo = taosArrayGet(pTaskInfo->tableqinfoList.pTableList, pInfo->currentTable);
      tsdbSetTableId(pInfo->dataReader, pTableInfo->uid);
      qDebug("set uid:%" PRIu64 " into scanner, total tables:%d, index:%d %s", pTableInfo->uid, numOfTables,
             pInfo->currentTable, pTaskInfo->id.str);

      tsdbReaderReset(pInfo->dataReader, &pInfo->cond);
      pInfo->scanTimes = 0;
    }
  }

  // first call in group mode: open the reader on the first group
  if (pInfo->currentGroupId == -1) {
    pInfo->currentGroupId++;
    if (pInfo->currentGroupId >= taosArrayGetSize(pTaskInfo->tableqinfoList.pGroupList)) {
      setTaskStatus(pTaskInfo, TASK_COMPLETED);
      return NULL;
    }

    SArray* tableList = taosArrayGetP(pTaskInfo->tableqinfoList.pGroupList, pInfo->currentGroupId);

    // Drop the stale pointer right after closing: if the open below fails and
    // long-jumps, the operator destructor must not close the old reader again.
    tsdbReaderClose(pInfo->dataReader);
    pInfo->dataReader = NULL;

    int32_t code = tsdbReaderOpen(pInfo->readHandle.vnode, &pInfo->cond, tableList, (STsdbReader**)&pInfo->dataReader,
                                  GET_TASKID(pTaskInfo));
    if (code != TSDB_CODE_SUCCESS) {
      T_LONG_JMP(pTaskInfo->env, code);
      return NULL;
    }
  }

  SSDataBlock* result = doTableScanGroup(pOperator);
  if (result) {
    return result;
  }

  // current group is exhausted: advance to the next one
  pInfo->currentGroupId++;
  if (pInfo->currentGroupId >= taosArrayGetSize(pTaskInfo->tableqinfoList.pGroupList)) {
    setTaskStatus(pTaskInfo, TASK_COMPLETED);
    return NULL;
  }

  // NOTE(review): the new group's table list is not handed to the reader here
  // (the tsdbSetTableList call was disabled); only the reader state is reset.
  // Also, if this next group yields no block the task is marked completed even
  // though later groups may remain — confirm this is intended.
  tsdbReaderReset(pInfo->dataReader, &pInfo->cond);
  pInfo->scanTimes = 0;

  result = doTableScanGroup(pOperator);
  if (result) {
    return result;
  }

  setTaskStatus(pTaskInfo, TASK_COMPLETED);
  return NULL;
}

700 701
static int32_t getTableScannerExecInfo(struct SOperatorInfo* pOptr, void** pOptrExplain, uint32_t* len) {
  SFileBlockLoadRecorder* pRecorder = taosMemoryCalloc(1, sizeof(SFileBlockLoadRecorder));
702
  STableScanInfo*         pTableScanInfo = pOptr->info;
703 704 705 706 707 708
  *pRecorder = pTableScanInfo->readRecorder;
  *pOptrExplain = pRecorder;
  *len = sizeof(SFileBlockLoadRecorder);
  return 0;
}

709
static void destroyTableScanOperatorInfo(void* param) {
710
  STableScanInfo* pTableScanInfo = (STableScanInfo*)param;
H
Haojun Liao 已提交
711
  blockDataDestroy(pTableScanInfo->pResBlock);
712
  cleanupQueryTableDataCond(&pTableScanInfo->cond);
H
Haojun Liao 已提交
713

H
refact  
Hongze Cheng 已提交
714
  tsdbReaderClose(pTableScanInfo->dataReader);
715
  pTableScanInfo->dataReader = NULL;
716 717 718 719

  if (pTableScanInfo->pColMatchInfo != NULL) {
    taosArrayDestroy(pTableScanInfo->pColMatchInfo);
  }
L
Liu Jicong 已提交
720

721
  cleanupExprSupp(&pTableScanInfo->pseudoSup);
D
dapan1121 已提交
722
  taosMemoryFreeClear(param);
723 724
}

wmmhello's avatar
wmmhello 已提交
725
SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, SReadHandle* readHandle,
726
                                           SExecTaskInfo* pTaskInfo) {
H
Haojun Liao 已提交
727 728 729
  STableScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STableScanInfo));
  SOperatorInfo*  pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
  if (pInfo == NULL || pOperator == NULL) {
730
    goto _error;
H
Haojun Liao 已提交
731 732
  }

733
  SDataBlockDescNode* pDescNode = pTableScanNode->scan.node.pOutputDataBlockDesc;
734
  int32_t             numOfCols = 0;
735
  SArray* pColList = extractColMatchInfo(pTableScanNode->scan.pScanCols, pDescNode, &numOfCols, COL_MATCH_FROM_COL_ID);
L
Liu Jicong 已提交
736

737 738
  int32_t code = initQueryTableDataCond(&pInfo->cond, pTableScanNode);
  if (code != TSDB_CODE_SUCCESS) {
739
    goto _error;
740 741 742
  }

  if (pTableScanNode->scan.pScanPseudoCols != NULL) {
743
    SExprSupp* pSup = &pInfo->pseudoSup;
744 745
    pSup->pExprInfo = createExprInfo(pTableScanNode->scan.pScanPseudoCols, NULL, &pSup->numOfExprs);
    pSup->pCtx = createSqlFunctionCtx(pSup->pExprInfo, pSup->numOfExprs, &pSup->rowEntryInfoOffset);
746 747
  }

748
  pInfo->scanInfo = (SScanInfo){.numOfAsc = pTableScanNode->scanSeq[0], .numOfDesc = pTableScanNode->scanSeq[1]};
749
  pInfo->pdInfo.interval = extractIntervalInfo(pTableScanNode);
750 751 752
  pInfo->readHandle = *readHandle;
  pInfo->sample.sampleRatio = pTableScanNode->ratio;
  pInfo->sample.seed = taosGetTimestampSec();
753

754
  pInfo->dataBlockLoadFlag = pTableScanNode->dataRequired;
755 756
  pInfo->pResBlock = createResDataBlock(pDescNode);
  pInfo->pFilterNode = pTableScanNode->scan.node.pConditions;
H
Haojun Liao 已提交
757 758 759 760 761

  if (pInfo->pFilterNode != NULL) {
    code = filterInitFromNode((SNode*)pInfo->pFilterNode, &pOperator->exprSupp.pFilterInfo, 0);
  }

762 763
  pInfo->scanFlag = MAIN_SCAN;
  pInfo->pColMatchInfo = pColList;
wmmhello's avatar
wmmhello 已提交
764
  pInfo->currentGroupId = -1;
765
  pInfo->assignBlockUid = pTableScanNode->assignBlockUid;
766 767

  pOperator->name = "TableScanOperator";  // for debug purpose
L
Liu Jicong 已提交
768
  pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN;
769 770 771
  pOperator->blocking = false;
  pOperator->status = OP_NOT_OPENED;
  pOperator->info = pInfo;
772
  pOperator->exprSupp.numOfExprs = numOfCols;
773
  pOperator->pTaskInfo = pTaskInfo;
774

775 776
  pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doTableScan, NULL, NULL, destroyTableScanOperatorInfo,
                                         NULL, NULL, getTableScannerExecInfo);
777 778 779

  // for non-blocking operator, the open cost is always 0
  pOperator->cost.openCost = 0;
H
Haojun Liao 已提交
780
  return pOperator;
781

782
_error:
783 784 785 786 787
  taosMemoryFreeClear(pInfo);
  taosMemoryFreeClear(pOperator);

  pTaskInfo->code = TSDB_CODE_QRY_OUT_OF_MEMORY;
  return NULL;
H
Haojun Liao 已提交
788 789
}

790
/**
 * Build a sequential table scan operator on top of an already opened reader.
 *
 * @param pReadHandle reader stored as the operator's dataReader
 * @param pTaskInfo   owning task; receives TSDB_CODE_QRY_OUT_OF_MEMORY when allocation fails
 * @return the new operator, or NULL when either allocation fails
 */
SOperatorInfo* createTableSeqScanOperatorInfo(void* pReadHandle, SExecTaskInfo* pTaskInfo) {
  STableScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STableScanInfo));
  SOperatorInfo*  pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
  if (pInfo == NULL || pOperator == NULL) {
    // same failure contract as the other operator constructors in this file
    taosMemoryFreeClear(pInfo);
    taosMemoryFreeClear(pOperator);
    pTaskInfo->code = TSDB_CODE_QRY_OUT_OF_MEMORY;
    return NULL;
  }

  pInfo->dataReader = pReadHandle;

  pOperator->name = "TableSeqScanOperator";
  pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_TABLE_SEQ_SCAN;
  pOperator->blocking = false;
  pOperator->status = OP_NOT_OPENED;
  pOperator->info = pInfo;
  pOperator->pTaskInfo = pTaskInfo;

  pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doTableScanImpl, NULL, NULL, NULL, NULL, NULL, NULL);
  return pOperator;
}

H
Haojun Liao 已提交
808 809
static int32_t doGetTableRowSize(void* pMeta, uint64_t uid, int32_t* rowLen, const char* idstr) {
  *rowLen = 0;
H
Haojun Liao 已提交
810

811
  SMetaReader mr = {0};
812
  metaReaderInit(&mr, pMeta, 0);
H
Haojun Liao 已提交
813 814
  int32_t code = metaGetTableEntryByUid(&mr, uid);
  if (code != TSDB_CODE_SUCCESS) {
L
Liu Jicong 已提交
815
    qError("failed to get table meta, uid:0x%" PRIx64 ", code:%s, %s", uid, tstrerror(terrno), idstr);
H
Haojun Liao 已提交
816 817 818 819
    metaReaderClear(&mr);
    return terrno;
  }

820 821
  if (mr.me.type == TSDB_SUPER_TABLE) {
    int32_t numOfCols = mr.me.stbEntry.schemaRow.nCols;
822
    for (int32_t i = 0; i < numOfCols; ++i) {
H
Haojun Liao 已提交
823
      (*rowLen) += mr.me.stbEntry.schemaRow.pSchema[i].bytes;
824 825 826
    }
  } else if (mr.me.type == TSDB_CHILD_TABLE) {
    uint64_t suid = mr.me.ctbEntry.suid;
827
    tDecoderClear(&mr.coder);
H
Haojun Liao 已提交
828 829
    code = metaGetTableEntryByUid(&mr, suid);
    if (code != TSDB_CODE_SUCCESS) {
L
Liu Jicong 已提交
830
      qError("failed to get table meta, uid:0x%" PRIx64 ", code:%s, %s", suid, tstrerror(terrno), idstr);
H
Haojun Liao 已提交
831 832 833 834
      metaReaderClear(&mr);
      return terrno;
    }

835 836
    int32_t numOfCols = mr.me.stbEntry.schemaRow.nCols;

837
    for (int32_t i = 0; i < numOfCols; ++i) {
H
Haojun Liao 已提交
838
      (*rowLen) += mr.me.stbEntry.schemaRow.pSchema[i].bytes;
839 840 841
    }
  } else if (mr.me.type == TSDB_NORMAL_TABLE) {
    int32_t numOfCols = mr.me.ntbEntry.schemaRow.nCols;
842
    for (int32_t i = 0; i < numOfCols; ++i) {
H
Haojun Liao 已提交
843
      (*rowLen) += mr.me.ntbEntry.schemaRow.pSchema[i].bytes;
844 845 846 847
    }
  }

  metaReaderClear(&mr);
H
Haojun Liao 已提交
848
  return TSDB_CODE_SUCCESS;
849 850 851 852 853 854 855 856
}

/**
 * Produce the single-row block-distribution result of the operator.
 *
 * Collects the row size of the table, the file block distribution and the number
 * of in-memory rows, serializes them into one var-length value and stores it in
 * the output column identified by the operator's first expression. The operator
 * is marked OP_EXEC_DONE afterwards, so a second call returns NULL.
 *
 * Failures (row-size lookup, capacity reservation, buffer allocation) leave via
 * T_LONG_JMP on the task's env.
 */
static SSDataBlock* doBlockInfoScan(SOperatorInfo* pOperator) {
  if (pOperator->status == OP_EXEC_DONE) {
    return NULL;
  }

  SBlockDistInfo* pBlockScanInfo = pOperator->info;
  SExecTaskInfo*  pTaskInfo = pOperator->pTaskInfo;

  STableBlockDistInfo blockDistInfo = {.minRows = INT_MAX, .maxRows = INT_MIN};
  int32_t code = doGetTableRowSize(pBlockScanInfo->readHandle.meta, pBlockScanInfo->uid, &blockDistInfo.rowSize,
                                   GET_TASKID(pTaskInfo));
  if (code != TSDB_CODE_SUCCESS) {
    T_LONG_JMP(pTaskInfo->env, code);
  }

  tsdbGetFileBlocksDistInfo(pBlockScanInfo->pHandle, &blockDistInfo);
  blockDistInfo.numOfInmemRows = (int32_t)tsdbGetNumOfRowsInMemTable(pBlockScanInfo->pHandle);

  SSDataBlock* pBlock = pBlockScanInfo->pResBlock;

  int32_t          slotId = pOperator->exprSupp.pExprInfo->base.resSchema.slotId;
  SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, slotId);

  // reserve the output row before allocating the payload, so a failure here cannot leak the buffer
  code = blockDataEnsureCapacity(pBlock, 1);
  if (code != TSDB_CODE_SUCCESS) {
    T_LONG_JMP(pTaskInfo->env, code);
  }

  // first call only computes the encoded length, second call fills the buffer
  int32_t len = tSerializeBlockDistInfo(NULL, 0, &blockDistInfo);
  char*   p = taosMemoryCalloc(1, len + VARSTR_HEADER_SIZE);
  if (p == NULL) {
    T_LONG_JMP(pTaskInfo->env, TSDB_CODE_OUT_OF_MEMORY);
  }

  tSerializeBlockDistInfo(varDataVal(p), len, &blockDistInfo);
  varDataSetLen(p, len);

  colDataAppend(pColInfo, 0, p, false);
  taosMemoryFree(p);

  pBlock->info.rows = 1;

  pOperator->status = OP_EXEC_DONE;
  return pBlock;
}

889
static void destroyBlockDistScanOperatorInfo(void* param) {
890
  SBlockDistInfo* pDistInfo = (SBlockDistInfo*)param;
891
  blockDataDestroy(pDistInfo->pResBlock);
H
Hongze Cheng 已提交
892
  tsdbReaderClose(pDistInfo->pHandle);
D
dapan1121 已提交
893
  taosMemoryFreeClear(param);
894 895
}

896 897
/**
 * Build the block-distribution scan operator.
 *
 * @param dataReader     reader stored as the operator's handle
 * @param readHandle     read handle; copied by value
 * @param uid            uid of the table whose distribution is reported
 * @param pBlockScanNode physical plan node supplying the output description and pseudo columns
 * @param pTaskInfo      owning task; receives the error code when creation fails
 * @return the new operator, or NULL on failure with pTaskInfo->code set
 *
 * On failure the result block created here is released; `dataReader` is not closed,
 * exactly as before. NOTE(review): confirm the caller keeps ownership of the reader in that case.
 */
SOperatorInfo* createDataBlockInfoScanOperator(void* dataReader, SReadHandle* readHandle, uint64_t uid,
                                               SBlockDistScanPhysiNode* pBlockScanNode, SExecTaskInfo* pTaskInfo) {
  // declared ahead of the first goto so that the error path always reads an initialized value
  int32_t code = TSDB_CODE_SUCCESS;

  SBlockDistInfo* pInfo = taosMemoryCalloc(1, sizeof(SBlockDistInfo));
  SOperatorInfo*  pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
  if (pInfo == NULL || pOperator == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _error;
  }

  pInfo->pHandle = dataReader;
  pInfo->readHandle = *readHandle;
  pInfo->uid = uid;
  pInfo->pResBlock = createResDataBlock(pBlockScanNode->node.pOutputDataBlockDesc);

  int32_t    numOfCols = 0;
  SExprInfo* pExprInfo = createExprInfo(pBlockScanNode->pScanPseudoCols, NULL, &numOfCols);
  code = initExprSupp(&pOperator->exprSupp, pExprInfo, numOfCols);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  pOperator->name = "DataBlockDistScanOperator";
  pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN;
  pOperator->blocking = false;
  pOperator->status = OP_NOT_OPENED;
  pOperator->info = pInfo;
  pOperator->pTaskInfo = pTaskInfo;

  pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doBlockInfoScan, NULL, NULL,
                                         destroyBlockDistScanOperatorInfo, NULL, NULL, NULL);
  return pOperator;

_error:
  if (pInfo != NULL && pInfo->pResBlock != NULL) {
    // the result block is owned by pInfo and would otherwise leak
    blockDataDestroy(pInfo->pResBlock);
  }
  taosMemoryFreeClear(pInfo);
  taosMemoryFreeClear(pOperator);

  // report the failure to the task on every error path, not only on allocation failure
  pTaskInfo->code = code;
  return NULL;
}

934
/* Forget every buffered input block of the stream scan and rewind the read cursor. */
static FORCE_INLINE void doClearBufferedBlocks(SStreamScanInfo* pInfo) {
  pInfo->validBlockIndex = 0;
  taosArrayClear(pInfo->pBlockLists);
}

939
/**
 * True when the operator consuming this stream scan is a stream session window.
 *
 * The original expression compared against QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION twice;
 * the redundant operand is removed, the result is unchanged.
 * NOTE(review): by analogy with isIntervalWindow(), the duplicate looks like a copy-paste
 * slip where semi/final session variants were meant - confirm and extend if so.
 */
static bool isSessionWindow(SStreamScanInfo* pInfo) {
  return pInfo->windowSup.parentType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION;
}

944
/* True when the operator consuming this stream scan is a stream state window. */
static bool isStateWindow(SStreamScanInfo* pInfo) {
  return QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE == pInfo->windowSup.parentType;
}
5
54liuyao 已提交
947

L
Liu Jicong 已提交
948
/* True when the operator consuming this stream scan is a plain, semi or final stream interval window. */
static bool isIntervalWindow(SStreamScanInfo* pInfo) {
  switch (pInfo->windowSup.parentType) {
    case QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL:
    case QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL:
    case QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL:
      return true;
    default:
      return false;
  }
}

/* True only for the plain stream interval window (neither the semi nor the final variant). */
static bool isSignleIntervalWindow(SStreamScanInfo* pInfo) {
  return QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL == pInfo->windowSup.parentType;
}

958 959 960 961
/* An interval window is "sliding" when its sliding step differs from its interval length. */
static bool isSlidingWindow(SStreamScanInfo* pInfo) {
  if (!isIntervalWindow(pInfo)) {
    return false;
  }
  return pInfo->interval.sliding != pInfo->interval.interval;
}

962
/**
 * Remember, in pInfo->groupId, the group id stored at row `rowIndex` of column
 * `groupColIndex` of `pBlock`. The column data is read as an array of uint64_t.
 */
static void setGroupId(SStreamScanInfo* pInfo, SSDataBlock* pBlock, int32_t groupColIndex, int32_t rowIndex) {
  SColumnInfoData* pGroupCol = taosArrayGet(pBlock->pDataBlock, groupColIndex);
  const uint64_t*  ids = (const uint64_t*)pGroupCol->pData;

  ASSERT(rowIndex < pBlock->info.rows);
  pInfo->groupId = ids[rowIndex];
}

L
Liu Jicong 已提交
969
/**
 * Rewind a table scan so that its next run starts from the first group again and
 * covers the time window `*pWin`.
 */
void resetTableScanInfo(STableScanInfo* pTableScanInfo, STimeWindow* pWin) {
  pTableScanInfo->currentGroupId = -1;
  pTableScanInfo->scanTimes = 0;
  pTableScanInfo->cond.twindows = *pWin;
}

L
Liu Jicong 已提交
975
/* Element destructor handed to taosArrayClearP(): destroys one nested array. */
static void freeArray(void* array) {
  taosArrayDestroy(array);
}
976 977 978 979 980 981 982 983 984 985 986 987 988

/**
 * Put the table scan operator back into its unrestricted state: no version bounds,
 * the group list holding the task's complete table list as its single group, and
 * the widest possible time window.
 */
static void resetTableScanOperator(SOperatorInfo* pTableScanOp) {
  STableScanInfo* pScan = pTableScanOp->info;
  pScan->cond.startVersion = -1;
  pScan->cond.endVersion = -1;

  SArray* pGroups = pTableScanOp->pTaskInfo->tableqinfoList.pGroupList;
  SArray* pAllTables = pTableScanOp->pTaskInfo->tableqinfoList.pTableList;

  // destroy the per-group arrays currently in the list, then install the full table list
  taosArrayClearP(pGroups, freeArray);
  taosArrayPush(pGroups, &pAllTables);

  STimeWindow fullRange = {.skey = INT64_MIN, .ekey = INT64_MAX};
  resetTableScanInfo(pScan, &fullRange);
}

L
Liu Jicong 已提交
989 990
/**
 * Scan table `tbUid` over [startTs, endTs] with the end version capped at `maxVersion`.
 *
 * The task's group list is temporarily replaced by a single group holding only
 * that table; after one doTableScan() call the operator is restored through
 * resetTableScanOperator().
 *
 * @return the block returned by doTableScan(), possibly NULL
 */
static SSDataBlock* readPreVersionData(SOperatorInfo* pTableScanOp, uint64_t tbUid, TSKEY startTs, TSKEY endTs,
                                       int64_t maxVersion) {
  SArray* pGroups = pTableScanOp->pTaskInfo->tableqinfoList.pGroupList;
  taosArrayClear(pGroups);

  // one group containing exactly the requested table
  STableKeyInfo key = {.uid = tbUid, .groupId = 0};
  SArray*       pSingle = taosArrayInit(1, sizeof(STableKeyInfo));
  taosArrayPush(pSingle, &key);
  taosArrayPush(pGroups, &pSingle);

  STableScanInfo* pScan = pTableScanOp->info;
  pScan->cond.startVersion = -1;
  pScan->cond.endVersion = maxVersion;

  STimeWindow range = {.skey = startTs, .ekey = endTs};
  resetTableScanInfo(pScan, &range);

  SSDataBlock* pRes = doTableScan(pTableScanOp);
  resetTableScanOperator(pTableScanOp);
  return pRes;
}

/**
 * Compute the group id of the row (uid, ts) from its column data as of `maxVersion`.
 *
 * @return the calculated group id, or 0 when no such row can be read
 */
static uint64_t getGroupIdByCol(SStreamScanInfo* pInfo, uint64_t uid, TSKEY ts, int64_t maxVersion) {
  SSDataBlock* pPreRes = readPreVersionData(pInfo->pTableScanOp, uid, ts, ts, maxVersion);
  if (pPreRes != NULL && pPreRes->info.rows != 0) {
    ASSERT(pPreRes->info.rows == 1);
    return calGroupIdByData(&pInfo->partitionSup, pInfo->pPartScalarSup, pPreRes, 0);
  }
  return 0;
}

5
54liuyao 已提交
1017
/* Look the table uid up in the task's uid -> group id hash; 0 when the uid is unknown. */
static uint64_t getGroupIdByUid(SStreamScanInfo* pInfo, uint64_t uid) {
  SHashObj* pUidMap = pInfo->pTableScanOp->pTaskInfo->tableqinfoList.map;
  uint64_t* pGroupId = taosHashGet(pUidMap, &uid, sizeof(int64_t));
  return (pGroupId != NULL) ? *pGroupId : 0;
}

5
54liuyao 已提交
1026 1027 1028 1029 1030 1031 1032 1033
/**
 * Resolve the group id of a row: calculated from column data when the partition
 * support requires a calculation, otherwise taken from the uid -> group id map.
 */
static uint64_t getGroupIdByData(SStreamScanInfo* pInfo, uint64_t uid, TSKEY ts, int64_t maxVersion) {
  return pInfo->partitionSup.needCalc ? getGroupIdByCol(pInfo, uid, ts, maxVersion) : getGroupIdByUid(pInfo, uid);
}

L
Liu Jicong 已提交
1034 1035 1036 1037 1038 1039 1040 1041 1042 1043 1044
/**
 * Set up the table scan for the next range described by `pBlock`.
 *
 * Starting at row `*pRowIndex`, the row's [start, end] window and group id are taken
 * as the scan range. Following rows of the same group are merged into it while they
 * share the window's start key (extending the end) or end exactly at its start key
 * (extending the start). `*pRowIndex` is left on the first row that was not merged.
 *
 * @return false when `*pRowIndex` already equals the number of rows, true once the
 *         table scan has been reset to the merged window and marked OP_OPENED
 */
static bool prepareRangeScan(SStreamScanInfo* pInfo, SSDataBlock* pBlock, int32_t* pRowIndex) {
  if (pBlock->info.rows == (*pRowIndex)) {
    return false;
  }

  ASSERT(taosArrayGetSize(pBlock->pDataBlock) >= 3);

  SColumnInfoData* pStartTsCol = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX);
  SColumnInfoData* pEndTsCol = taosArrayGet(pBlock->pDataBlock, END_TS_COLUMN_INDEX);
  SColumnInfoData* pGpCol = taosArrayGet(pBlock->pDataBlock, GROUPID_COLUMN_INDEX);
  SColumnInfoData* pCalStartTsCol = taosArrayGet(pBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX);
  SColumnInfoData* pCalEndTsCol = taosArrayGet(pBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX);

  TSKEY*    startData = (TSKEY*)pStartTsCol->pData;
  TSKEY*    endData = (TSKEY*)pEndTsCol->pData;
  uint64_t* gpData = (uint64_t*)pGpCol->pData;
  TSKEY*    calStartData = (TSKEY*)pCalStartTsCol->pData;
  TSKEY*    calEndData = (TSKEY*)pCalEndTsCol->pData;

  const int32_t first = *pRowIndex;
  STimeWindow   win = {.skey = startData[first], .ekey = endData[first]};
  uint64_t      groupId = gpData[first];

  setGroupId(pInfo, pBlock, GROUPID_COLUMN_INDEX, first);
  if (isSlidingWindow(pInfo)) {
    pInfo->updateWin.skey = calStartData[first];
    pInfo->updateWin.ekey = calEndData[first];
  }

  // absorb the adjacent rows that belong to the same window and group
  *pRowIndex = first + 1;
  while (*pRowIndex < pBlock->info.rows) {
    const int32_t cur = *pRowIndex;
    const bool    sameGroup = (gpData[cur] == groupId);

    if (sameGroup && win.skey == startData[cur]) {
      win.ekey = TMAX(win.ekey, endData[cur]);
    } else if (sameGroup && win.skey == endData[cur]) {
      win.skey = TMIN(win.skey, startData[cur]);
    } else {
      ASSERT(!(win.skey > startData[cur] && win.ekey < endData[cur]) ||
             !(isInTimeWindow(&win, startData[cur], 0) || isInTimeWindow(&win, endData[cur], 0)));
      break;
    }

    (*pRowIndex)++;
  }

  resetTableScanInfo(pInfo->pTableScanOp->info, &win);
  pInfo->pTableScanOp->status = OP_OPENED;
  return true;
}

5
54liuyao 已提交
1080
static STimeWindow getSlidingWindow(TSKEY* startTsCol, TSKEY* endTsCol, uint64_t* gpIdCol, SInterval* pInterval,
1081
                                    SDataBlockInfo* pDataBlockInfo, int32_t* pRowIndex, bool hasGroup) {
1082
  SResultRowInfo dumyInfo;
5
54liuyao 已提交
1083
  dumyInfo.cur.pageId = -1;
1084
  STimeWindow win = getActiveTimeWindow(NULL, &dumyInfo, startTsCol[*pRowIndex], pInterval, TSDB_ORDER_ASC);
5
54liuyao 已提交
1085 1086
  STimeWindow endWin = win;
  STimeWindow preWin = win;
5
54liuyao 已提交
1087
  uint64_t    groupId = gpIdCol[*pRowIndex];
5
54liuyao 已提交
1088
  while (1) {
1089 1090 1091
    if (hasGroup) {
      (*pRowIndex) += 1;
    } else {
5
54liuyao 已提交
1092 1093 1094 1095 1096 1097
      while ((groupId == gpIdCol[(*pRowIndex)] && startTsCol[*pRowIndex] < endWin.ekey)) {
        (*pRowIndex) += 1;
        if ((*pRowIndex) == pDataBlockInfo->rows) {
          break;
        }
      }
1098
    }
5
54liuyao 已提交
1099

5
54liuyao 已提交
1100 1101 1102
    do {
      preWin = endWin;
      getNextTimeWindow(pInterval, &endWin, TSDB_ORDER_ASC);
1103
    } while (endTsCol[(*pRowIndex) - 1] >= endWin.skey);
5
54liuyao 已提交
1104
    endWin = preWin;
5
54liuyao 已提交
1105
    if (win.ekey == endWin.ekey || (*pRowIndex) == pDataBlockInfo->rows || groupId != gpIdCol[*pRowIndex]) {
5
54liuyao 已提交
1106 1107 1108 1109 1110 1111
      win.ekey = endWin.ekey;
      return win;
    }
    win.ekey = endWin.ekey;
  }
}
5
54liuyao 已提交
1112

L
Liu Jicong 已提交
1113 1114 1115 1116 1117 1118 1119 1120 1121 1122 1123
/**
 * Return the next non-empty block of the ranges listed in `pSDB` that belongs to the
 * current group (pInfo->groupId).
 *
 * The table scan is drained first; when it is exhausted, prepareRangeScan() sets up the
 * next range from `pSDB` starting at `*pRowIndex`. Each block is filtered with
 * pInfo->pCondition. When the partition support requires a calculation, only the rows
 * whose calculated group id equals pInfo->groupId are kept; otherwise the whole block is
 * accepted if its group id matches. Accepted blocks get calWin set to pInfo->updateWin.
 *
 * When no range is left, `pSDB` is cleaned, `*pRowIndex` and pInfo->updateWin are reset,
 * the table scan's reader is closed and NULL is returned.
 *
 * @param tsColIndex unused in this function
 */
static SSDataBlock* doRangeScan(SStreamScanInfo* pInfo, SSDataBlock* pSDB, int32_t tsColIndex, int32_t* pRowIndex) {
  while (1) {
    SSDataBlock* pResult = doTableScan(pInfo->pTableScanOp);
    if (!pResult && prepareRangeScan(pInfo, pSDB, pRowIndex)) {
      // scan next window data
      pResult = doTableScan(pInfo->pTableScanOp);
    }
    if (!pResult) {
      blockDataCleanup(pSDB);
      *pRowIndex = 0;
      pInfo->updateWin = (STimeWindow){.skey = INT64_MIN, .ekey = INT64_MAX};
      STableScanInfo* pTableScanInfo = pInfo->pTableScanOp->info;
      tsdbReaderClose(pTableScanInfo->dataReader);
      pTableScanInfo->dataReader = NULL;
      return NULL;
    }

    doFilter(pInfo->pCondition, pResult, NULL, NULL);
    if (pResult->info.rows == 0) {
      continue;
    }

    if (pInfo->partitionSup.needCalc) {
      // copy the block aside, then rebuild pResult from the rows of the wanted group only
      SSDataBlock* tmpBlock = createOneDataBlock(pResult, true);
      blockDataCleanup(pResult);
      for (int32_t i = 0; i < tmpBlock->info.rows; i++) {
        if (calGroupIdByData(&pInfo->partitionSup, pInfo->pPartScalarSup, tmpBlock, i) == pInfo->groupId) {
          for (int32_t j = 0; j < pInfo->pTableScanOp->exprSupp.numOfExprs; j++) {
            SColumnInfoData* pSrcCol = taosArrayGet(tmpBlock->pDataBlock, j);
            SColumnInfoData* pDestCol = taosArrayGet(pResult->pDataBlock, j);
            bool             isNull = colDataIsNull(pSrcCol, tmpBlock->info.rows, i, NULL);
            char*            pSrcData = colDataGetData(pSrcCol, i);
            colDataAppend(pDestCol, pResult->info.rows, pSrcData, isNull);
          }
          pResult->info.rows++;
        }
      }

      // the temporary copy is owned here; without this it leaked on every scanned block
      blockDataDestroy(tmpBlock);

      if (pResult->info.rows > 0) {
        pResult->info.calWin = pInfo->updateWin;
        return pResult;
      }
    } else if (pResult->info.groupId == pInfo->groupId) {
      pResult->info.calWin = pInfo->updateWin;
      return pResult;
    }
  }
}
1161

1162 1163 1164
/**
 * Translate the (start, end, uid) rows of `pSrcBlock` into session-window scan ranges
 * in `pDestBlock`.
 *
 * For every source row the group id is resolved and the session windows containing the
 * row's start and end timestamps are looked up; the output row carries the start of the
 * first window, the end of the second one and the group id. The uid and the two
 * calculate-timestamp columns are set to NULL. Rows whose start window is no longer
 * available are skipped.
 *
 * Output rows are appended at pDestBlock->info.rows, not at the source index, so
 * skipped rows do not leave unwritten holes below info.rows.
 *
 * @return TSDB_CODE_SUCCESS, or the error of blockDataEnsureCapacity. An empty source
 *         block returns success without touching `pDestBlock`.
 */
static int32_t generateSessionScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSrcBlock, SSDataBlock* pDestBlock) {
  if (pSrcBlock->info.rows == 0) {
    return TSDB_CODE_SUCCESS;
  }
  blockDataCleanup(pDestBlock);
  int32_t code = blockDataEnsureCapacity(pDestBlock, pSrcBlock->info.rows);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }
  ASSERT(taosArrayGetSize(pSrcBlock->pDataBlock) >= 3);
  SColumnInfoData* pStartTsCol = taosArrayGet(pSrcBlock->pDataBlock, START_TS_COLUMN_INDEX);
  TSKEY*           startData = (TSKEY*)pStartTsCol->pData;
  SColumnInfoData* pEndTsCol = taosArrayGet(pSrcBlock->pDataBlock, END_TS_COLUMN_INDEX);
  TSKEY*           endData = (TSKEY*)pEndTsCol->pData;
  SColumnInfoData* pUidCol = taosArrayGet(pSrcBlock->pDataBlock, UID_COLUMN_INDEX);
  uint64_t*        uidCol = (uint64_t*)pUidCol->pData;

  SColumnInfoData* pDestStartCol = taosArrayGet(pDestBlock->pDataBlock, START_TS_COLUMN_INDEX);
  SColumnInfoData* pDestEndCol = taosArrayGet(pDestBlock->pDataBlock, END_TS_COLUMN_INDEX);
  SColumnInfoData* pDestUidCol = taosArrayGet(pDestBlock->pDataBlock, UID_COLUMN_INDEX);
  SColumnInfoData* pDestGpCol = taosArrayGet(pDestBlock->pDataBlock, GROUPID_COLUMN_INDEX);
  SColumnInfoData* pDestCalStartTsCol = taosArrayGet(pDestBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX);
  SColumnInfoData* pDestCalEndTsCol = taosArrayGet(pDestBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX);
  int32_t          dummy = 0;
  int64_t          version = pSrcBlock->info.version - 1;
  for (int32_t i = 0; i < pSrcBlock->info.rows; i++) {
    uint64_t groupId = getGroupIdByData(pInfo, uidCol[i], startData[i], version);
    // gap must be 0.
    SResultWindowInfo* pStartWin =
        getCurSessionWindow(pInfo->windowSup.pStreamAggSup, startData[i], endData[i], groupId, 0, &dummy);
    if (!pStartWin) {
      // window has been closed.
      continue;
    }
    SResultWindowInfo* pEndWin =
        getCurSessionWindow(pInfo->windowSup.pStreamAggSup, endData[i], endData[i], groupId, 0, &dummy);
    ASSERT(pEndWin);

    // next free output row; differs from i as soon as one source row has been skipped
    int32_t destRow = pDestBlock->info.rows;
    colDataAppend(pDestStartCol, destRow, (const char*)&pStartWin->win.skey, false);
    colDataAppend(pDestEndCol, destRow, (const char*)&pEndWin->win.ekey, false);
    colDataAppendNULL(pDestUidCol, destRow);
    colDataAppend(pDestGpCol, destRow, (const char*)&groupId, false);
    colDataAppendNULL(pDestCalStartTsCol, destRow);
    colDataAppendNULL(pDestCalEndTsCol, destRow);
    pDestBlock->info.rows++;
  }
  return TSDB_CODE_SUCCESS;
}
1210 1211 1212 1213 1214 1215

/**
 * Translate the rows of `pSrcBlock` into interval-window scan ranges in `pDestBlock`.
 *
 * Source rows are consumed in runs: getSlidingWindow() advances the row cursor and
 * returns the window covering the run. Each run yields one output row holding the
 * window, the uid and group id of the run's first row (the group id is resolved via
 * getGroupIdByData when the source value is 0) and the start timestamps of the run's
 * first and last rows as the calculate range.
 *
 * @return TSDB_CODE_SUCCESS, or the error of blockDataEnsureCapacity
 */
static int32_t generateIntervalScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSrcBlock, SSDataBlock* pDestBlock) {
  blockDataCleanup(pDestBlock);

  const int32_t numOfRows = pSrcBlock->info.rows;
  if (0 == numOfRows) {
    return TSDB_CODE_SUCCESS;
  }

  int32_t code = blockDataEnsureCapacity(pDestBlock, numOfRows * 2);
  if (TSDB_CODE_SUCCESS != code) {
    return code;
  }

  // source columns
  SColumnInfoData* pSrcStartTsCol = (SColumnInfoData*)taosArrayGet(pSrcBlock->pDataBlock, START_TS_COLUMN_INDEX);
  SColumnInfoData* pSrcEndTsCol = (SColumnInfoData*)taosArrayGet(pSrcBlock->pDataBlock, END_TS_COLUMN_INDEX);
  SColumnInfoData* pSrcUidCol = taosArrayGet(pSrcBlock->pDataBlock, UID_COLUMN_INDEX);
  SColumnInfoData* pSrcGpCol = taosArrayGet(pSrcBlock->pDataBlock, GROUPID_COLUMN_INDEX);
  ASSERT(pSrcStartTsCol->info.type == TSDB_DATA_TYPE_TIMESTAMP);

  uint64_t* srcUidData = (uint64_t*)pSrcUidCol->pData;
  uint64_t* srcGp = (uint64_t*)pSrcGpCol->pData;
  TSKEY*    srcStartTsCol = (TSKEY*)pSrcStartTsCol->pData;
  TSKEY*    srcEndTsCol = (TSKEY*)pSrcEndTsCol->pData;

  // destination columns
  SColumnInfoData* pStartTsCol = taosArrayGet(pDestBlock->pDataBlock, START_TS_COLUMN_INDEX);
  SColumnInfoData* pEndTsCol = taosArrayGet(pDestBlock->pDataBlock, END_TS_COLUMN_INDEX);
  SColumnInfoData* pDeUidCol = taosArrayGet(pDestBlock->pDataBlock, UID_COLUMN_INDEX);
  SColumnInfoData* pGpCol = taosArrayGet(pDestBlock->pDataBlock, GROUPID_COLUMN_INDEX);
  SColumnInfoData* pCalStartTsCol = taosArrayGet(pDestBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX);
  SColumnInfoData* pCalEndTsCol = taosArrayGet(pDestBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX);

  int64_t version = pSrcBlock->info.version - 1;

  int32_t i = 0;
  while (i < numOfRows) {
    uint64_t srcUid = srcUidData[i];
    uint64_t groupId = srcGp[i];
    if (0 == groupId) {
      groupId = getGroupIdByData(pInfo, srcUid, srcStartTsCol[i], version);
    }

    TSKEY calStartTs = srcStartTsCol[i];
    colDataAppend(pCalStartTsCol, pDestBlock->info.rows, (const char*)(&calStartTs), false);

    // advances i past the rows merged into this window
    STimeWindow win = getSlidingWindow(srcStartTsCol, srcEndTsCol, srcGp, &pInfo->interval, &pSrcBlock->info, &i,
                                       pInfo->partitionSup.needCalc);

    TSKEY calEndTs = srcStartTsCol[i - 1];
    colDataAppend(pCalEndTsCol, pDestBlock->info.rows, (const char*)(&calEndTs), false);
    colDataAppend(pDeUidCol, pDestBlock->info.rows, (const char*)(&srcUid), false);
    colDataAppend(pStartTsCol, pDestBlock->info.rows, (const char*)(&win.skey), false);
    colDataAppend(pEndTsCol, pDestBlock->info.rows, (const char*)(&win.ekey), false);
    colDataAppend(pGpCol, pDestBlock->info.rows, (const char*)(&groupId), false);
    pDestBlock->info.rows++;
  }

  return TSDB_CODE_SUCCESS;
}
1258

1259 1260 1261 1262 1263 1264 1265 1266 1267 1268 1269 1270 1271 1272 1273 1274 1275 1276 1277 1278 1279 1280 1281 1282 1283 1284 1285 1286 1287 1288 1289 1290 1291 1292 1293 1294 1295 1296
/**
 * Copy the (start, end) ranges of `pSrcBlock` into `pDestBlock`, one output row per
 * source row, adding the group id resolved for each row. The uid and the two
 * calculate-timestamp columns are set to NULL.
 *
 * @return TSDB_CODE_SUCCESS, or the error of blockDataEnsureCapacity. An empty source
 *         block returns success without touching `pDestBlock`.
 */
static int32_t generateDeleteResultBlock(SStreamScanInfo* pInfo, SSDataBlock* pSrcBlock, SSDataBlock* pDestBlock) {
  if (0 == pSrcBlock->info.rows) {
    return TSDB_CODE_SUCCESS;
  }

  blockDataCleanup(pDestBlock);
  int32_t code = blockDataEnsureCapacity(pDestBlock, pSrcBlock->info.rows);
  if (TSDB_CODE_SUCCESS != code) {
    return code;
  }

  ASSERT(taosArrayGetSize(pSrcBlock->pDataBlock) >= 3);

  // source columns
  SColumnInfoData* pStartTsCol = taosArrayGet(pSrcBlock->pDataBlock, START_TS_COLUMN_INDEX);
  SColumnInfoData* pEndTsCol = taosArrayGet(pSrcBlock->pDataBlock, END_TS_COLUMN_INDEX);
  SColumnInfoData* pUidCol = taosArrayGet(pSrcBlock->pDataBlock, UID_COLUMN_INDEX);
  TSKEY*           startData = (TSKEY*)pStartTsCol->pData;
  TSKEY*           endData = (TSKEY*)pEndTsCol->pData;
  uint64_t*        uidCol = (uint64_t*)pUidCol->pData;

  // destination columns
  SColumnInfoData* pDestStartCol = taosArrayGet(pDestBlock->pDataBlock, START_TS_COLUMN_INDEX);
  SColumnInfoData* pDestEndCol = taosArrayGet(pDestBlock->pDataBlock, END_TS_COLUMN_INDEX);
  SColumnInfoData* pDestUidCol = taosArrayGet(pDestBlock->pDataBlock, UID_COLUMN_INDEX);
  SColumnInfoData* pDestGpCol = taosArrayGet(pDestBlock->pDataBlock, GROUPID_COLUMN_INDEX);
  SColumnInfoData* pDestCalStartTsCol = taosArrayGet(pDestBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX);
  SColumnInfoData* pDestCalEndTsCol = taosArrayGet(pDestBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX);

  int64_t version = pSrcBlock->info.version - 1;

  int32_t i = 0;
  while (i < pSrcBlock->info.rows) {
    uint64_t groupId = getGroupIdByData(pInfo, uidCol[i], startData[i], version);

    colDataAppend(pDestStartCol, i, (const char*)(&startData[i]), false);
    colDataAppend(pDestEndCol, i, (const char*)(&endData[i]), false);
    colDataAppendNULL(pDestUidCol, i);
    colDataAppend(pDestGpCol, i, (const char*)&groupId, false);
    colDataAppendNULL(pDestCalStartTsCol, i);
    colDataAppendNULL(pDestCalEndTsCol, i);

    pDestBlock->info.rows++;
    i++;
  }

  return TSDB_CODE_SUCCESS;
}

1297 1298 1299 1300
/**
 * Build the scan-range block for `pSrcBlock` according to the kind of window operator
 * consuming this stream scan (interval, or session/state), then stamp the result as a
 * STREAM_CLEAR block carrying the source version and refresh its timestamp window.
 *
 * @return the code of the range generator that ran; TSDB_CODE_SUCCESS when none applies
 */
static int32_t generateScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSrcBlock, SSDataBlock* pDestBlock) {
  int32_t code = TSDB_CODE_SUCCESS;

  if (isIntervalWindow(pInfo)) {
    code = generateIntervalScanRange(pInfo, pSrcBlock, pDestBlock);
  } else {
    if (isSessionWindow(pInfo) || isStateWindow(pInfo)) {
      code = generateSessionScanRange(pInfo, pSrcBlock, pDestBlock);
    }
  }

  pDestBlock->info.version = pSrcBlock->info.version;
  pDestBlock->info.type = STREAM_CLEAR;
  blockDataUpdateTsWindow(pDestBlock, 0);
  return code;
}

L
Liu Jicong 已提交
1310 1311 1312 1313 1314 1315 1316 1317 1318 1319 1320 1321 1322 1323 1324 1325 1326 1327 1328 1329 1330 1331 1332 1333 1334 1335 1336 1337 1338 1339 1340 1341 1342 1343 1344 1345 1346 1347 1348 1349 1350
/**
 * Evaluate the tag expressions of `pTagCalSup` on the first row of `pBlock`, writing
 * the single resulting row into `pResBlock`. Nothing happens when there are no
 * expressions or no input rows.
 */
static void calBlockTag(SExprSupp* pTagCalSup, SSDataBlock* pBlock, SSDataBlock* pResBlock) {
  if (pTagCalSup == NULL || pTagCalSup->numOfExprs == 0 || pBlock == NULL || pBlock->info.rows == 0) {
    return;
  }

  SSDataBlock* pFirstRow = blockCopyOneRow(pBlock, 0);
  ASSERT(pFirstRow->info.rows == 1);

  blockDataEnsureCapacity(pResBlock, 1);

  projectApplyFunctions(pTagCalSup->pExprInfo, pResBlock, pFirstRow, pTagCalSup->pCtx, 1, NULL);
  ASSERT(pResBlock->info.rows == 1);

  // build tagArray
  // build STag
  // set STag

  blockDataDestroy(pFirstRow);
}

/**
 * Evaluate the table-name expression of `pTbNameCalSup` on the first row of `pBlock`
 * and store the result as a NUL-terminated string in pBlock->info.parTbName.
 *
 * Nothing happens when there are no expressions or no input rows. When the expression
 * yields no data, parTbName becomes the empty string. Names longer than
 * TSDB_TABLE_NAME_LEN - 1 bytes are truncated.
 *
 * The buffer is cleared before the copy and terminated right after the copied bytes,
 * so a short name can no longer be followed by leftovers of a previous, longer name.
 */
static void calBlockTbName(SExprSupp* pTbNameCalSup, SSDataBlock* pBlock) {
  if (pTbNameCalSup == NULL || pTbNameCalSup->numOfExprs == 0) return;
  if (pBlock == NULL || pBlock->info.rows == 0) return;

  SSDataBlock* pSrcBlock = blockCopyOneRow(pBlock, 0);
  ASSERT(pSrcBlock->info.rows == 1);

  // single VARCHAR column receiving the computed name
  SSDataBlock* pResBlock = createDataBlock();
  pResBlock->info.rowSize = VARSTR_HEADER_SIZE + TSDB_TABLE_NAME_LEN;
  SColumnInfoData data = createColumnInfoData(TSDB_DATA_TYPE_VARCHAR, TSDB_TABLE_NAME_LEN, 0);
  taosArrayPush(pResBlock->pDataBlock, &data);
  blockDataEnsureCapacity(pResBlock, 1);

  projectApplyFunctions(pTbNameCalSup->pExprInfo, pResBlock, pSrcBlock, pTbNameCalSup->pCtx, 1, NULL);
  ASSERT(pResBlock->info.rows == 1);
  ASSERT(taosArrayGetSize(pResBlock->pDataBlock) == 1);
  SColumnInfoData* pCol = taosArrayGet(pResBlock->pDataBlock, 0);
  ASSERT(pCol->info.type == TSDB_DATA_TYPE_VARCHAR);

  void* pData = colDataGetData(pCol, 0);
  // TODO check tbname validation
  if (pData != (void*)-1 && pData != NULL) {
    // keep one byte for the terminator
    int32_t len = TMIN(varDataLen(pData), TSDB_TABLE_NAME_LEN - 1);
    memset(pBlock->info.parTbName, 0, TSDB_TABLE_NAME_LEN);
    memcpy(pBlock->info.parTbName, varDataVal(pData), len);
    pBlock->info.parTbName[len] = 0;
  } else {
    pBlock->info.parTbName[0] = 0;
  }

  blockDataDestroy(pSrcBlock);
  blockDataDestroy(pResBlock);
}

1361 1362
/*
 * Append one row to a stream "special" block (the column layout addressed by
 * the *_COLUMN_INDEX constants) and bump the block's row count.
 *
 * pStartTs/pEndTs are written both to the start/end timestamp columns and to
 * the calculate-start/calculate-end columns. pUid and pGp fill the uid and
 * group-id columns. pTbName fills the table-name column; a NULL pTbName is
 * stored as a NULL value.
 */
void appendOneRowToStreamSpecialBlock(SSDataBlock* pBlock, TSKEY* pStartTs, TSKEY* pEndTs, uint64_t* pUid,
                                      uint64_t* pGp, void* pTbName) {
  SArray* pCols = pBlock->pDataBlock;

  // window range
  colDataAppend(taosArrayGet(pCols, START_TS_COLUMN_INDEX), pBlock->info.rows, (const char*)pStartTs, false);
  colDataAppend(taosArrayGet(pCols, END_TS_COLUMN_INDEX), pBlock->info.rows, (const char*)pEndTs, false);

  // identity of the source table and its group
  colDataAppend(taosArrayGet(pCols, UID_COLUMN_INDEX), pBlock->info.rows, (const char*)pUid, false);
  colDataAppend(taosArrayGet(pCols, GROUPID_COLUMN_INDEX), pBlock->info.rows, (const char*)pGp, false);

  // the calculate range mirrors the window range
  colDataAppend(taosArrayGet(pCols, CALCULATE_START_TS_COLUMN_INDEX), pBlock->info.rows, (const char*)pStartTs,
                false);
  colDataAppend(taosArrayGet(pCols, CALCULATE_END_TS_COLUMN_INDEX), pBlock->info.rows, (const char*)pEndTs, false);

  // optional table name
  colDataAppend(taosArrayGet(pCols, TABLE_NAME_COLUMN_INDEX), pBlock->info.rows, (const char*)pTbName,
                pTbName == NULL);

  pBlock->info.rows++;
}

1380
/*
 * Run the update check for every row of pBlock and, when `out` is true,
 * collect the offending rows in pInfo->pUpdateDataRes.
 *
 * A row is reported when updateInfoIsUpdated() flags its timestamp, or when it
 * falls into an already closed window that isDeletedStreamWindow() reports as
 * deleted (only evaluated for single-interval scans). Each reported row is
 * appended with group id 0; if the window was closed and the partition needs
 * calculation, a second row carrying the calculated group id is appended too.
 *
 * When rows were collected, pUpdateDataRes gets the version of pBlock, an
 * updated ts window, and type STREAM_DELETE_DATA (partition needs calculation)
 * or STREAM_CLEAR.
 *
 * `invertible` is not used by this function. With `out` false only the update
 * bookkeeping calls are made; pUpdateDataRes is left untouched.
 */
static void checkUpdateData(SStreamScanInfo* pInfo, bool invertible, SSDataBlock* pBlock, bool out) {
  if (out) {
    // up to two output rows may be produced per input row
    blockDataCleanup(pInfo->pUpdateDataRes);
    blockDataEnsureCapacity(pInfo->pUpdateDataRes, pBlock->info.rows * 2);
  }

  SColumnInfoData* pTsColInfo = taosArrayGet(pBlock->pDataBlock, pInfo->primaryTsIndex);
  ASSERT(pTsColInfo->info.type == TSDB_DATA_TYPE_TIMESTAMP);
  TSKEY* pTs = (TSKEY*)pTsColInfo->pData;

  bool tableInserted = updateInfoIsTableInserted(pInfo->pUpdateInfo, pBlock->info.uid);

  for (int32_t i = 0; i < pBlock->info.rows; i++) {
    SResultRowInfo dumyInfo;
    dumyInfo.cur.pageId = -1;

    STimeWindow win = {.skey = INT64_MIN, .ekey = INT64_MAX};
    bool        isClosed = false;
    if (tableInserted && isOverdue(pTs[i], &pInfo->twAggSup)) {
      win = getActiveTimeWindow(NULL, &dumyInfo, pTs[i], &pInfo->interval, TSDB_ORDER_ASC);
      isClosed = isCloseWindow(&win, &pInfo->twAggSup);
    }

    // must check update info first.
    bool update = updateInfoIsUpdated(pInfo->pUpdateInfo, pBlock->info.uid, pTs[i]);

    bool closedWin = false;
    if (isClosed && isSignleIntervalWindow(pInfo)) {
      closedWin = isDeletedStreamWindow(&win, pBlock->info.groupId, pInfo->pTableScanOp->pTaskInfo->streamInfo.pState,
                                        &pInfo->twAggSup);
    }

    if (!out || !(update || closedWin)) {
      continue;
    }

    qDebug("stream update check not pass, update %d, closedWin %d", update, closedWin);
    uint64_t gpId = 0;
    appendOneRowToStreamSpecialBlock(pInfo->pUpdateDataRes, pTs + i, pTs + i, &pBlock->info.uid, &gpId, NULL);

    if (closedWin && pInfo->partitionSup.needCalc) {
      gpId = calGroupIdByData(&pInfo->partitionSup, pInfo->pPartScalarSup, pBlock, i);
      appendOneRowToStreamSpecialBlock(pInfo->pUpdateDataRes, pTs + i, pTs + i, &pBlock->info.uid, &gpId, NULL);
    }
  }

  if (!out || pInfo->pUpdateDataRes->info.rows <= 0) {
    return;
  }

  pInfo->pUpdateDataRes->info.version = pBlock->info.version;
  blockDataUpdateTsWindow(pInfo->pUpdateDataRes, 0);
  if (pInfo->partitionSup.needCalc) {
    pInfo->pUpdateDataRes->info.type = STREAM_DELETE_DATA;
  } else {
    pInfo->pUpdateDataRes->info.type = STREAM_CLEAR;
  }
}
L
Liu Jicong 已提交
1421

1422
/*
 * Move the content of a freshly retrieved block into pInfo->pRes.
 *
 * Steps: size pRes for the incoming rows, copy the block identity (rows, uid,
 * version; type is set to STREAM_NORMAL), look up the group id of the table,
 * copy every output column listed in pInfo->pColMatchInfo (columns missing
 * from pBlock are filled with NULLs), add the pseudo columns, apply the filter
 * condition, refresh the ts window, release the resources of pBlock and
 * finally compute the partition table name.
 *
 * pBlock is declared const but its resources are released here via
 * blockDataFreeRes(). `filter` is not consulted: doFilter() is always applied.
 *
 * Always returns 0. A failure while adding pseudo columns does not return;
 * it jumps out through T_LONG_JMP on the task environment.
 */
static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock, bool filter) {
  SSDataBlock*   pRes = pInfo->pRes;
  SExecTaskInfo* pTaskInfo = pInfo->pStreamScanOp->pTaskInfo;

  blockDataEnsureCapacity(pRes, pBlock->info.rows);

  pRes->info.rows = pBlock->info.rows;
  pRes->info.uid = pBlock->info.uid;
  pRes->info.type = STREAM_NORMAL;
  pRes->info.version = pBlock->info.version;

  // tables that are not registered in the task's table map fall into group 0
  uint64_t* groupIdPre = taosHashGet(pTaskInfo->tableqinfoList.map, &pBlock->info.uid, sizeof(int64_t));
  pRes->info.groupId = (groupIdPre != NULL) ? *groupIdPre : 0;

  // todo extract method
  int32_t numOfMatch = (int32_t)taosArrayGetSize(pInfo->pColMatchInfo);
  int32_t numOfSrcCols = (int32_t)blockDataGetNumOfCols(pBlock);
  for (int32_t i = 0; i < numOfMatch; ++i) {
    SColMatchInfo* pMatch = taosArrayGet(pInfo->pColMatchInfo, i);
    if (!pMatch->output) {
      continue;
    }

    // locate the source column carrying the requested column id
    SColumnInfoData* pSrc = NULL;
    for (int32_t j = 0; j < numOfSrcCols; ++j) {
      SColumnInfoData* pCandidate = bdGetColumnInfoData(pBlock, j);
      if (pCandidate->info.colId == pMatch->colId) {
        pSrc = pCandidate;
        break;
      }
    }

    SColumnInfoData* pDst = taosArrayGet(pRes->pDataBlock, pMatch->targetSlotId);
    if (pSrc != NULL) {
      colDataAssign(pDst, pSrc, pBlock->info.rows, &pRes->info);
    } else {
      // the required column does not exists in submit block, let's set it to be all null value
      colDataAppendNNULL(pDst, 0, pRes->info.rows);
    }
  }

  // currently only the tbname pseudo column
  if (pInfo->numOfPseudoExpr > 0) {
    int32_t code = addTagPseudoColumnData(&pInfo->readHandle, pInfo->pPseudoExpr, pInfo->numOfPseudoExpr, pRes,
                                          GET_TASKID(pTaskInfo));
    if (code != TSDB_CODE_SUCCESS) {
      blockDataFreeRes((SSDataBlock*)pBlock);
      T_LONG_JMP(pTaskInfo->env, code);
    }
  }

  doFilter(pInfo->pCondition, pRes, NULL, NULL);
  blockDataUpdateTsWindow(pRes, pInfo->primaryTsIndex);
  blockDataFreeRes((SSDataBlock*)pBlock);

  calBlockTbName(&pInfo->tbnameCalSup, pRes);

  return 0;
}
5
54liuyao 已提交
1484

L
Liu Jicong 已提交
1485
/*
 * Produce the next result block for a queue scan, or NULL when nothing is
 * available right now.
 *
 * Three inputs are tried in this order:
 *   1. a pending submit request (pTaskInfo->streamInfo.pReq), decoded block by
 *      block through the tq reader;
 *   2. snapshot data (prepareStatus.type == TMQ_OFFSET__SNAPSHOT_DATA), read
 *      through the table scan operator. When the table scan returns nothing
 *      and no snapshot block was ever returned, the data reader is closed and
 *      the scan is repositioned onto the WAL right after snapshotVer;
 *   3. WAL data (prepareStatus.type == TMQ_OFFSET__LOG), read through
 *      tqNextBlock() until a non-empty block or the end of the log is seen.
 *      On end of log the reached offset is stored in streamInfo.lastStatus.
 */
static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) {
  SExecTaskInfo*   pTaskInfo = pOperator->pTaskInfo;
  SStreamScanInfo* pInfo = pOperator->info;

  qDebug("queue scan called");

  if (pTaskInfo->streamInfo.pReq != NULL) {
    if (pInfo->tqReader->pMsg == NULL) {
      pInfo->tqReader->pMsg = pTaskInfo->streamInfo.pReq;
      const SSubmitReq* pSubmit = pInfo->tqReader->pMsg;
      if (tqReaderSetDataMsg(pInfo->tqReader, pSubmit, 0) < 0) {
        qError("submit msg messed up when initing stream submit block %p", pSubmit);
        pInfo->tqReader->pMsg = NULL;
        pTaskInfo->streamInfo.pReq = NULL;
        ASSERT(0);
        // with assertions compiled out, do not keep going with a reader that has no message
        return NULL;
      }
    }

    blockDataCleanup(pInfo->pRes);
    SDataBlockInfo* pBlockInfo = &pInfo->pRes->info;

    while (tqNextDataBlock(pInfo->tqReader)) {
      SSDataBlock block = {0};

      int32_t code = tqRetrieveDataBlock(&block, pInfo->tqReader);

      if (code != TSDB_CODE_SUCCESS || block.info.rows == 0) {
        continue;
      }

      setBlockIntoRes(pInfo, &block, true);

      // the filter inside setBlockIntoRes() may have removed every row
      if (pBlockInfo->rows > 0) {
        return pInfo->pRes;
      }
    }

    // the submit request is exhausted
    pInfo->tqReader->pMsg = NULL;
    pTaskInfo->streamInfo.pReq = NULL;
    return NULL;
  }

  if (pTaskInfo->streamInfo.prepareStatus.type == TMQ_OFFSET__SNAPSHOT_DATA) {
    SSDataBlock* pResult = doTableScan(pInfo->pTableScanOp);
    if (pResult && pResult->info.rows > 0) {
      qDebug("queue scan tsdb return %d rows", pResult->info.rows);
      pTaskInfo->streamInfo.returned = 1;
      return pResult;
    } else {
      if (!pTaskInfo->streamInfo.returned) {
        // nothing was read from the snapshot: switch over to the WAL
        STableScanInfo* pTSInfo = pInfo->pTableScanOp->info;
        tsdbReaderClose(pTSInfo->dataReader);
        pTSInfo->dataReader = NULL;
        tqOffsetResetToLog(&pTaskInfo->streamInfo.prepareStatus, pTaskInfo->streamInfo.snapshotVer);
        // print the version as a 64-bit value; "%d" does not match a 64-bit argument
        qDebug("queue scan tsdb over, switch to wal ver %" PRId64, (int64_t)(pTaskInfo->streamInfo.snapshotVer + 1));
        if (tqSeekVer(pInfo->tqReader, pTaskInfo->streamInfo.snapshotVer + 1) < 0) {
          return NULL;
        }
        ASSERT(pInfo->tqReader->pWalReader->curVersion == pTaskInfo->streamInfo.snapshotVer + 1);
      } else {
        return NULL;
      }
    }
  }

  // NOTE(review): reached directly for log offsets, and presumably also after the
  // snapshot branch above re-targeted prepareStatus via tqOffsetResetToLog() - confirm
  if (pTaskInfo->streamInfo.prepareStatus.type == TMQ_OFFSET__LOG) {
    while (1) {
      SFetchRet ret = {0};
      tqNextBlock(pInfo->tqReader, &ret);
      if (ret.fetchType == FETCH_TYPE__DATA) {
        blockDataCleanup(pInfo->pRes);
        if (setBlockIntoRes(pInfo, &ret.data, true) < 0) {
          ASSERT(0);
        }
        if (pInfo->pRes->info.rows > 0) {
          pOperator->status = OP_EXEC_RECV;
          qDebug("queue scan log return %d rows", pInfo->pRes->info.rows);
          return pInfo->pRes;
        }
      } else if (ret.fetchType == FETCH_TYPE__META) {
        // meta entries are not expected by this operator
        ASSERT(0);
      } else if (ret.fetchType == FETCH_TYPE__NONE ||
                 (ret.fetchType == FETCH_TYPE__SEP && pOperator->status == OP_EXEC_RECV)) {
        // end of log (or a separator after data was returned): remember how far we got
        pTaskInfo->streamInfo.lastStatus = ret.offset;
        ASSERT(pTaskInfo->streamInfo.lastStatus.version >= pTaskInfo->streamInfo.prepareStatus.version);
        ASSERT(pTaskInfo->streamInfo.lastStatus.version + 1 == pInfo->tqReader->pWalReader->curVersion);
        char formatBuf[80];
        tFormatOffset(formatBuf, 80, &ret.offset);
        qDebug("queue scan log return null, offset %s", formatBuf);
        pOperator->status = OP_OPENED;
        return NULL;
      }
    }
  } else {
    ASSERT(0);
    return NULL;
  }
}

L
Liu Jicong 已提交
1597 1598 1599 1600 1601 1602 1603 1604 1605 1606 1607 1608 1609 1610 1611 1612 1613 1614 1615 1616 1617 1618 1619 1620 1621 1622 1623 1624 1625 1626 1627 1628 1629 1630
/*
 * Copy into pDst only those rows of the delete block pSrc whose uid is present
 * in the tq reader's table id hash (pInfo->tqReader->tbIdHash).
 *
 * For every kept row the start ts, end ts and uid are copied; the group id and
 * the calculate start/end columns are set to NULL. The block info of pSrc is
 * copied to pDst, with `rows` replaced by the number of kept rows.
 *
 * Always returns 0.
 */
static int32_t filterDelBlockByUid(SSDataBlock* pDst, const SSDataBlock* pSrc, SStreamScanInfo* pInfo) {
  STqReader* pReader = pInfo->tqReader;
  int32_t    rows = pSrc->info.rows;
  blockDataEnsureCapacity(pDst, rows);

  SColumnInfoData* pSrcStartCol = taosArrayGet(pSrc->pDataBlock, START_TS_COLUMN_INDEX);
  uint64_t*        startCol = (uint64_t*)pSrcStartCol->pData;
  SColumnInfoData* pSrcEndCol = taosArrayGet(pSrc->pDataBlock, END_TS_COLUMN_INDEX);
  uint64_t*        endCol = (uint64_t*)pSrcEndCol->pData;
  SColumnInfoData* pSrcUidCol = taosArrayGet(pSrc->pDataBlock, UID_COLUMN_INDEX);
  uint64_t*        uidCol = (uint64_t*)pSrcUidCol->pData;

  SColumnInfoData* pDstStartCol = taosArrayGet(pDst->pDataBlock, START_TS_COLUMN_INDEX);
  SColumnInfoData* pDstEndCol = taosArrayGet(pDst->pDataBlock, END_TS_COLUMN_INDEX);
  SColumnInfoData* pDstUidCol = taosArrayGet(pDst->pDataBlock, UID_COLUMN_INDEX);
  SColumnInfoData* pDstGpCol = taosArrayGet(pDst->pDataBlock, GROUPID_COLUMN_INDEX);
  SColumnInfoData* pDstCalStartCol = taosArrayGet(pDst->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX);
  SColumnInfoData* pDstCalEndCol = taosArrayGet(pDst->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX);

  int32_t j = 0;  // number of rows kept so far, i.e. the next output row
  for (int32_t i = 0; i < rows; i++) {
    if (taosHashGet(pReader->tbIdHash, &uidCol[i], sizeof(uint64_t)) == NULL) {
      continue;  // this table is not tracked by the reader
    }

    colDataAppend(pDstStartCol, j, (const char*)&startCol[i], false);
    colDataAppend(pDstEndCol, j, (const char*)&endCol[i], false);
    colDataAppend(pDstUidCol, j, (const char*)&uidCol[i], false);

    colDataAppendNULL(pDstGpCol, j);
    colDataAppendNULL(pDstCalStartCol, j);
    colDataAppendNULL(pDstCalEndCol, j);
    j++;
  }

  // Copying the whole info struct would also replace the capacity that
  // blockDataEnsureCapacity() just recorded for pDst with the one of pSrc, which
  // says nothing about the buffers of pDst. Keep pDst's own capacity.
  // NOTE(review): assumes the capacity is tracked in SDataBlockInfo.capacity - confirm
  uint32_t dstCapacity = pDst->info.capacity;
  pDst->info = pSrc->info;
  pDst->info.capacity = dstCapacity;
  pDst->info.rows = j;

  return 0;
}

5
54liuyao 已提交
1631 1632 1633 1634 1635 1636 1637 1638 1639 1640 1641 1642 1643 1644 1645 1646 1647 1648 1649 1650 1651 1652 1653 1654
// for partition by tag
/*
 * Fill the group id column of a special block from the uid of each row, using
 * getGroupIdByUid().
 *
 * When the partition requires calculation (pInfo->partitionSup.needCalc) the
 * block is left untouched: deriving the group id from the row data is not
 * implemented here yet.
 */
static void setBlockGroupIdByUid(SStreamScanInfo* pInfo, SSDataBlock* pBlock) {
  if (pInfo->partitionSup.needCalc) {
    // TODO: read the previous version of each row (readPreVersionData) and derive
    // the group id with calGroupIdByData()
    return;
  }

  SColumnInfoData* pGpCol = taosArrayGet(pBlock->pDataBlock, GROUPID_COLUMN_INDEX);
  SColumnInfoData* pUidCol = taosArrayGet(pBlock->pDataBlock, UID_COLUMN_INDEX);
  uint64_t*        uidCol = (uint64_t*)pUidCol->pData;
  int32_t          rows = pBlock->info.rows;

  for (int32_t i = 0; i < rows; i++) {
    uint64_t groupId = getGroupIdByUid(pInfo, uidCol[i]);
    colDataAppend(pGpCol, i, (const char*)&groupId, false);
  }
}

L
Liu Jicong 已提交
1655 1656 1657 1658 1659
/*
 * Produce the next block for a stream scan, or NULL when the buffered input is
 * exhausted.
 *
 * Recovery: in the PREPARE step the table scan is reset with the recovery
 * condition; in the SCAN step blocks are served from the table scan until it
 * is drained, after which the recover step is cleared.
 *
 * Normal operation depends on pInfo->blockType:
 *  - STREAM_INPUT__DATA_BLOCK: buffered data blocks are handed out one by one.
 *    STREAM_RETRIEVE and STREAM_DELETE_DATA blocks additionally set up a range
 *    scan and switch the operator to the submit mode.
 *  - STREAM_INPUT__DATA_SUBMIT: first finishes whatever pInfo->scanMode has
 *    pending (update/delete ranges, range scan results), then decodes buffered
 *    submit messages through the tq reader, runs the update check and the
 *    filter, and returns pInfo->pRes once it holds rows.
 */
static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
  // NOTE: this operator does never check if current status is done or not
  SExecTaskInfo*   pTaskInfo = pOperator->pTaskInfo;
  SStreamScanInfo* pInfo = pOperator->info;

  qDebug("stream scan called");

  if (pTaskInfo->streamInfo.recoverStep == STREAM_RECOVER_STEP__PREPARE) {
    STableScanInfo* pTSInfo = pInfo->pTableScanOp->info;
    memcpy(&pTSInfo->cond, &pTaskInfo->streamInfo.tableCond, sizeof(SQueryTableDataCond));
    pTSInfo->scanTimes = 0;
    pTSInfo->currentGroupId = -1;
    pTaskInfo->streamInfo.recoverStep = STREAM_RECOVER_STEP__SCAN;
  }

  if (pTaskInfo->streamInfo.recoverStep == STREAM_RECOVER_STEP__SCAN) {
    SSDataBlock* pBlock = doTableScan(pInfo->pTableScanOp);
    if (pBlock != NULL) {
      return pBlock;
    }
    // TODO fill in bloom filter
    pTaskInfo->streamInfo.recoverStep = STREAM_RECOVER_STEP__NONE;
    return NULL;
  }

  size_t total = taosArrayGetSize(pInfo->pBlockLists);
  // TODO: refactor
FETCH_NEXT_BLOCK:
  if (pInfo->blockType == STREAM_INPUT__DATA_BLOCK) {
    if (pInfo->validBlockIndex >= total) {
      doClearBufferedBlocks(pInfo);
      /*pOperator->status = OP_EXEC_DONE;*/
      return NULL;
    }

    int32_t      current = pInfo->validBlockIndex++;
    SSDataBlock* pBlock = taosArrayGetP(pInfo->pBlockLists, current);
    // TODO move into scan
    pBlock->info.calWin.skey = INT64_MIN;
    pBlock->info.calWin.ekey = INT64_MAX;
    blockDataUpdateTsWindow(pBlock, 0);
    switch (pBlock->info.type) {
      case STREAM_NORMAL:
      case STREAM_GET_ALL:
        return pBlock;
      case STREAM_RETRIEVE: {
        // the following calls re-read the requested ranges through the data reader
        pInfo->blockType = STREAM_INPUT__DATA_SUBMIT;
        pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RETRIEVE;
        copyDataBlock(pInfo->pUpdateRes, pBlock);
        prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex);
        updateInfoAddCloseWindowSBF(pInfo->pUpdateInfo);
      } break;
      case STREAM_DELETE_DATA: {
        printDataBlock(pBlock, "stream scan delete recv");
        SSDataBlock* pDelBlock = NULL;
        if (pInfo->tqReader) {
          // keep only the tables tracked by the reader; this block is owned by us
          pDelBlock = createSpecialDataBlock(STREAM_DELETE_DATA);
          filterDelBlockByUid(pDelBlock, pBlock, pInfo);
        } else {
          // no reader: work directly on the buffered block (not owned by us)
          pDelBlock = pBlock;
        }
        setBlockGroupIdByUid(pInfo, pDelBlock);
        printDataBlock(pDelBlock, "stream scan delete recv filtered");
        if (!isIntervalWindow(pInfo) && !isSessionWindow(pInfo) && !isStateWindow(pInfo)) {
          generateDeleteResultBlock(pInfo, pDelBlock, pInfo->pDeleteDataRes);
          pInfo->pDeleteDataRes->info.type = STREAM_DELETE_RESULT;
          printDataBlock(pDelBlock, "stream scan delete result");
          // release the filtered copy here as well, like the window branch below does
          if (pInfo->tqReader) {
            blockDataDestroy(pDelBlock);
          }
          if (pInfo->pDeleteDataRes->info.rows > 0) {
            return pInfo->pDeleteDataRes;
          } else {
            goto FETCH_NEXT_BLOCK;
          }
        } else {
          pInfo->blockType = STREAM_INPUT__DATA_SUBMIT;
          pInfo->updateResIndex = 0;
          generateScanRange(pInfo, pDelBlock, pInfo->pUpdateRes);
          prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex);
          copyDataBlock(pInfo->pDeleteDataRes, pInfo->pUpdateRes);
          pInfo->pDeleteDataRes->info.type = STREAM_DELETE_DATA;
          pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RANGE;
          printDataBlock(pDelBlock, "stream scan delete data");
          if (pInfo->tqReader) {
            blockDataDestroy(pDelBlock);
          }
          if (pInfo->pDeleteDataRes->info.rows > 0) {
            return pInfo->pDeleteDataRes;
          } else {
            goto FETCH_NEXT_BLOCK;
          }
        }
      } break;
      default:
        break;
    }
    // printDataBlock(pBlock, "stream scan recv");
    return pBlock;
  } else if (pInfo->blockType == STREAM_INPUT__DATA_SUBMIT) {
    qDebug("scan mode %d", pInfo->scanMode);
    switch (pInfo->scanMode) {
      case STREAM_SCAN_FROM_RES: {
        blockDataDestroy(pInfo->pUpdateRes);
        pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
        return pInfo->pRes;
      } break;
      case STREAM_SCAN_FROM_DELETE_DATA: {
        generateScanRange(pInfo, pInfo->pUpdateDataRes, pInfo->pUpdateRes);
        prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex);
        pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RANGE;
        copyDataBlock(pInfo->pDeleteDataRes, pInfo->pUpdateRes);
        pInfo->pDeleteDataRes->info.type = STREAM_DELETE_DATA;
        return pInfo->pDeleteDataRes;
      } break;
      case STREAM_SCAN_FROM_UPDATERES: {
        generateScanRange(pInfo, pInfo->pUpdateDataRes, pInfo->pUpdateRes);
        prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex);
        pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RANGE;
        return pInfo->pUpdateRes;
      } break;
      case STREAM_SCAN_FROM_DATAREADER_RANGE:
      case STREAM_SCAN_FROM_DATAREADER_RETRIEVE: {
        SSDataBlock* pSDB = doRangeScan(pInfo, pInfo->pUpdateRes, pInfo->primaryTsIndex, &pInfo->updateResIndex);
        if (pSDB) {
          STableScanInfo* pTableScanInfo = pInfo->pTableScanOp->info;
          uint64_t        version = getReaderMaxVersion(pTableScanInfo->dataReader);
          updateInfoSetScanRange(pInfo->pUpdateInfo, &pTableScanInfo->cond.twindows, pInfo->groupId, version);
          pSDB->info.type = pInfo->scanMode == STREAM_SCAN_FROM_DATAREADER_RANGE ? STREAM_NORMAL : STREAM_PULL_DATA;
          // out == false: only the update bookkeeping runs, pUpdateDataRes is not modified
          checkUpdateData(pInfo, true, pSDB, false);
          // printDataBlock(pSDB, "stream scan update");
          calBlockTbName(&pInfo->tbnameCalSup, pSDB);
          return pSDB;
        }
        // the range scan is drained: go back to reading submit messages
        blockDataCleanup(pInfo->pUpdateDataRes);
        pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
      } break;
      default:
        break;
    }

    SStreamAggSupporter* pSup = pInfo->windowSup.pStreamAggSup;
    if (isStateWindow(pInfo) && pSup->pScanBlock->info.rows > 0) {
      // the state window operator left ranges to re-scan in pScanBlock
      pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RANGE;
      pInfo->updateResIndex = 0;
      copyDataBlock(pInfo->pUpdateRes, pSup->pScanBlock);
      blockDataCleanup(pSup->pScanBlock);
      prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex);
      return pInfo->pUpdateRes;
    }

    SDataBlockInfo* pBlockInfo = &pInfo->pRes->info;

    int32_t totBlockNum = taosArrayGetSize(pInfo->pBlockLists);

  NEXT_SUBMIT_BLK:
    while (1) {
      if (pInfo->tqReader->pMsg == NULL) {
        // no submit message in progress: pick the next buffered one
        if (pInfo->validBlockIndex >= totBlockNum) {
          updateInfoDestoryColseWinSBF(pInfo->pUpdateInfo);
          doClearBufferedBlocks(pInfo);
          return NULL;
        }

        int32_t     current = pInfo->validBlockIndex++;
        SSubmitReq* pSubmit = taosArrayGetP(pInfo->pBlockLists, current);
        if (tqReaderSetDataMsg(pInfo->tqReader, pSubmit, 0) < 0) {
          qError("submit msg messed up when initing stream submit block %p, current %d, total %d", pSubmit, current,
                 totBlockNum);
          pInfo->tqReader->pMsg = NULL;
          continue;
        }
      }

      blockDataCleanup(pInfo->pRes);

      while (tqNextDataBlock(pInfo->tqReader)) {
        SSDataBlock block = {0};

        int32_t code = tqRetrieveDataBlock(&block, pInfo->tqReader);

        if (code != TSDB_CODE_SUCCESS || block.info.rows == 0) {
          continue;
        }

        setBlockIntoRes(pInfo, &block, false);

        if (updateInfoIgnore(pInfo->pUpdateInfo, &pInfo->pRes->info.window, pInfo->pRes->info.groupId,
                             pInfo->pRes->info.version)) {
          printDataBlock(pInfo->pRes, "stream scan ignore");
          blockDataCleanup(pInfo->pRes);
          continue;
        }

        if (pInfo->pUpdateInfo) {
          // out == true: offending rows are collected in pUpdateDataRes
          checkUpdateData(pInfo, true, pInfo->pRes, true);
          pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, pBlockInfo->window.ekey);
          if (pInfo->pUpdateDataRes->info.rows > 0) {
            // choose how the collected rows are processed on the next call
            pInfo->updateResIndex = 0;
            if (pInfo->pUpdateDataRes->info.type == STREAM_CLEAR) {
              pInfo->scanMode = STREAM_SCAN_FROM_UPDATERES;
            } else if (pInfo->pUpdateDataRes->info.type == STREAM_INVERT) {
              pInfo->scanMode = STREAM_SCAN_FROM_RES;
              return pInfo->pUpdateDataRes;
            } else if (pInfo->pUpdateDataRes->info.type == STREAM_DELETE_DATA) {
              pInfo->scanMode = STREAM_SCAN_FROM_DELETE_DATA;
            }
          }
        }

        doFilter(pInfo->pCondition, pInfo->pRes, NULL, NULL);
        blockDataUpdateTsWindow(pInfo->pRes, pInfo->primaryTsIndex);

        if (pBlockInfo->rows > 0 || pInfo->pUpdateDataRes->info.rows > 0) {
          break;
        }
      }
      if (pBlockInfo->rows > 0 || pInfo->pUpdateDataRes->info.rows > 0) {
        break;
      } else {
        // this submit message produced nothing: move on to the next one
        pInfo->tqReader->pMsg = NULL;
        continue;
      }
      /*blockDataCleanup(pInfo->pRes);*/
    }

    // record the scan action.
    pInfo->numOfExec++;
    pOperator->resultInfo.totalRows += pBlockInfo->rows;
    // printDataBlock(pInfo->pRes, "stream scan");

    qDebug("scan rows: %d", pBlockInfo->rows);
    if (pBlockInfo->rows > 0) {
      return pInfo->pRes;
    }

    // no data rows survived, but update rows were collected: handle them via scanMode
    if (pInfo->pUpdateDataRes->info.rows > 0) {
      goto FETCH_NEXT_BLOCK;
    }

    goto NEXT_SUBMIT_BLK;
  } else {
    ASSERT(0);
    return NULL;
  }
}

1934 1935 1936 1937 1938 1939 1940 1941 1942 1943 1944 1945
static SArray* extractTableIdList(const STableListInfo* pTableGroupInfo) {
  SArray* tableIdList = taosArrayInit(4, sizeof(uint64_t));

  // Transfer the Array of STableKeyInfo into uid list.
  for (int32_t i = 0; i < taosArrayGetSize(pTableGroupInfo->pTableList); ++i) {
    STableKeyInfo* pkeyInfo = taosArrayGet(pTableGroupInfo->pTableList, i);
    taosArrayPush(tableIdList, &pkeyInfo->uid);
  }

  return tableIdList;
}

1946
/*
 * Produce the next piece of a raw (snapshot) scan used for subscriptions.
 *
 * - TMQ_OFFSET__SNAPSHOT_DATA: returns the next data block of the current
 *   table (the block embedded in pInfo->pRes) and records uid/ts of that block
 *   in streamInfo.lastStatus. When the current table is exhausted, the next
 *   table is taken from the snapshot context and the scan is re-prepared for
 *   it; when there is no next table (uid 0), lastStatus is switched to the log
 *   offset at the snapshot version. NULL is returned in both cases.
 * - TMQ_OFFSET__SNAPSHOT_META: fetches the next meta entry. The entry is
 *   handed over through streamInfo.metaRsp (metaRspLen != 0 marks a meta
 *   result), or the response offset is switched to snapshot data once the
 *   context no longer serves meta. NULL is always returned.
 * - any other offset type: returns NULL.
 */
static SSDataBlock* doRawScan(SOperatorInfo* pOperator) {
  // NOTE: this operator does never check if current status is done or not
  SExecTaskInfo*      pTaskInfo = pOperator->pTaskInfo;
  SStreamRawScanInfo* pInfo = pOperator->info;
  pTaskInfo->streamInfo.metaRsp.metaRspLen = 0;  // use metaRspLen !=0 to judge if data is meta
  pTaskInfo->streamInfo.metaRsp.metaRsp = NULL;

  qDebug("tmqsnap doRawScan called");
  if (pTaskInfo->streamInfo.prepareStatus.type == TMQ_OFFSET__SNAPSHOT_DATA) {
    SSDataBlock* pBlock = &pInfo->pRes;

    if (pInfo->dataReader && tsdbNextDataBlock(pInfo->dataReader)) {
      if (isTaskKilled(pTaskInfo)) {
        longjmp(pTaskInfo->env, TSDB_CODE_TSC_QUERY_CANCELLED);
      }

      tsdbRetrieveDataBlockInfo(pInfo->dataReader, &pBlock->info);

      SArray* pCols = tsdbRetrieveDataBlock(pInfo->dataReader, NULL);
      pBlock->pDataBlock = pCols;
      if (pCols == NULL) {
        longjmp(pTaskInfo->env, terrno);
      }

      // uid is printed as a 64-bit value; "%ld" is too narrow where long is 32 bits
      qDebug("tmqsnap doRawScan get data uid:%" PRId64, (int64_t)pBlock->info.uid);
      pTaskInfo->streamInfo.lastStatus.type = TMQ_OFFSET__SNAPSHOT_DATA;
      pTaskInfo->streamInfo.lastStatus.uid = pBlock->info.uid;
      pTaskInfo->streamInfo.lastStatus.ts = pBlock->info.window.ekey;
      return pBlock;
    }

    // the current table is exhausted (or no reader is open): move to the next table
    SMetaTableInfo mtInfo = getUidfromSnapShot(pInfo->sContext);
    if (mtInfo.uid == 0) {  // read snapshot done, change to get data from wal
      qDebug("tmqsnap read snapshot done, change to get data from wal");
      pTaskInfo->streamInfo.prepareStatus.uid = mtInfo.uid;
      pTaskInfo->streamInfo.lastStatus.type = TMQ_OFFSET__LOG;
      pTaskInfo->streamInfo.lastStatus.version = pInfo->sContext->snapVersion;
    } else {
      pTaskInfo->streamInfo.prepareStatus.uid = mtInfo.uid;
      pTaskInfo->streamInfo.prepareStatus.ts = INT64_MIN;
      qDebug("tmqsnap change get data uid:%" PRId64, (int64_t)mtInfo.uid);
      qStreamPrepareScan(pTaskInfo, &pTaskInfo->streamInfo.prepareStatus, pInfo->sContext->subType);
    }
    tDeleteSSchemaWrapper(mtInfo.schema);
    qDebug("tmqsnap stream scan tsdb return null");
    return NULL;
  } else if (pTaskInfo->streamInfo.prepareStatus.type == TMQ_OFFSET__SNAPSHOT_META) {
    SSnapContext* sContext = pInfo->sContext;
    void*         data = NULL;
    int32_t       dataLen = 0;
    int16_t       type = 0;
    int64_t       uid = 0;
    if (getMetafromSnapShot(sContext, &data, &dataLen, &type, &uid) < 0) {
      qError("tmqsnap getMetafromSnapShot error");
      taosMemoryFreeClear(data);
      return NULL;
    }

    if (!sContext->queryMetaOrData) {  // change to get data next poll request
      pTaskInfo->streamInfo.lastStatus.type = TMQ_OFFSET__SNAPSHOT_META;
      pTaskInfo->streamInfo.lastStatus.uid = uid;
      pTaskInfo->streamInfo.metaRsp.rspOffset.type = TMQ_OFFSET__SNAPSHOT_DATA;
      pTaskInfo->streamInfo.metaRsp.rspOffset.uid = 0;
      pTaskInfo->streamInfo.metaRsp.rspOffset.ts = INT64_MIN;
    } else {
      // hand the meta entry over through metaRsp
      pTaskInfo->streamInfo.lastStatus.type = TMQ_OFFSET__SNAPSHOT_META;
      pTaskInfo->streamInfo.lastStatus.uid = uid;
      pTaskInfo->streamInfo.metaRsp.rspOffset = pTaskInfo->streamInfo.lastStatus;
      pTaskInfo->streamInfo.metaRsp.resMsgType = type;
      pTaskInfo->streamInfo.metaRsp.metaRspLen = dataLen;
      pTaskInfo->streamInfo.metaRsp.metaRsp = data;
    }

    return NULL;
  }

  // other offset types (e.g. TMQ_OFFSET__LOG) are not served by this operator
  return NULL;
}

wmmhello's avatar
wmmhello 已提交
2062
static void destroyRawScanOperatorInfo(void* param) {
wmmhello's avatar
wmmhello 已提交
2063 2064 2065 2066 2067 2068
  SStreamRawScanInfo* pRawScan = (SStreamRawScanInfo*)param;
  tsdbReaderClose(pRawScan->dataReader);
  destroySnapContext(pRawScan->sContext);
  taosMemoryFree(pRawScan);
}

L
Liu Jicong 已提交
2069 2070 2071
// for subscribing db or stb (not including column),
// if this scan is used, meta data can be returned
// and schemas are decided when scanning
2072
// Create the raw scan operator used when subscribing to a db or stb.
//
// pHandle   - read handle; its vnode and snapshot context (sContext) are copied into the operator info.
// pTaskInfo - task the new operator is attached to.
//
// Returns the new operator, or NULL with terrno set to TSDB_CODE_QRY_OUT_OF_MEMORY when an
// allocation fails.
SOperatorInfo* createRawScanOperatorInfo(SReadHandle* pHandle, SExecTaskInfo* pTaskInfo) {
  SStreamRawScanInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamRawScanInfo));
  SOperatorInfo*      pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
  if (pInfo == NULL || pOperator == NULL) {
    // only one of the two allocations may have failed: release the other one instead of leaking it
    taosMemoryFreeClear(pInfo);
    taosMemoryFreeClear(pOperator);
    terrno = TSDB_CODE_QRY_OUT_OF_MEMORY;
    return NULL;
  }

  pInfo->vnode = pHandle->vnode;
  pInfo->sContext = pHandle->sContext;

  pOperator->name = "RawStreamScanOperator";
  pOperator->info = pInfo;
  pOperator->pTaskInfo = pTaskInfo;

  // only the "next" and "close" callbacks are provided for this operator
  pOperator->fpSet = createOperatorFpSet(NULL, doRawScan, NULL, NULL, destroyRawScanOperatorInfo, NULL, NULL, NULL);
  return pOperator;
}

2098
static void destroyStreamScanOperatorInfo(void* param) {
2099 2100 2101
  SStreamScanInfo* pStreamScan = (SStreamScanInfo*)param;
  if (pStreamScan->pTableScanOp && pStreamScan->pTableScanOp->info) {
    STableScanInfo* pTableScanInfo = pStreamScan->pTableScanOp->info;
2102
    destroyTableScanOperatorInfo(pTableScanInfo);
5
54liuyao 已提交
2103
    taosMemoryFreeClear(pStreamScan->pTableScanOp);
2104 2105 2106 2107 2108 2109 2110
  }
  if (pStreamScan->tqReader) {
    tqCloseReader(pStreamScan->tqReader);
  }
  if (pStreamScan->pColMatchInfo) {
    taosArrayDestroy(pStreamScan->pColMatchInfo);
  }
C
Cary Xu 已提交
2111 2112
  if (pStreamScan->pPseudoExpr) {
    destroyExprInfo(pStreamScan->pPseudoExpr, pStreamScan->numOfPseudoExpr);
L
Liu Jicong 已提交
2113
    taosMemoryFree(pStreamScan->pPseudoExpr);
C
Cary Xu 已提交
2114
  }
C
Cary Xu 已提交
2115

L
Liu Jicong 已提交
2116
  updateInfoDestroy(pStreamScan->pUpdateInfo);
2117 2118 2119 2120
  blockDataDestroy(pStreamScan->pRes);
  blockDataDestroy(pStreamScan->pUpdateRes);
  blockDataDestroy(pStreamScan->pPullDataRes);
  blockDataDestroy(pStreamScan->pDeleteDataRes);
5
54liuyao 已提交
2121
  blockDataDestroy(pStreamScan->pUpdateDataRes);
2122 2123 2124 2125
  taosArrayDestroy(pStreamScan->pBlockLists);
  taosMemoryFree(pStreamScan);
}

2126
SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhysiNode* pTableScanNode, SNode* pTagCond,
2127
                                            SExecTaskInfo* pTaskInfo) {
2128 2129
  SStreamScanInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamScanInfo));
  SOperatorInfo*   pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
2130

H
Haojun Liao 已提交
2131 2132
  if (pInfo == NULL || pOperator == NULL) {
    terrno = TSDB_CODE_QRY_OUT_OF_MEMORY;
2133
    goto _error;
H
Haojun Liao 已提交
2134 2135
  }

2136
  SScanPhysiNode*     pScanPhyNode = &pTableScanNode->scan;
2137
  SDataBlockDescNode* pDescNode = pScanPhyNode->node.pOutputDataBlockDesc;
H
Haojun Liao 已提交
2138

2139
  pInfo->pTagCond = pTagCond;
2140
  pInfo->pGroupTags = pTableScanNode->pGroupTags;
2141

2142
  int32_t numOfCols = 0;
2143
  pInfo->pColMatchInfo = extractColMatchInfo(pScanPhyNode->pScanCols, pDescNode, &numOfCols, COL_MATCH_FROM_COL_ID);
2144 2145 2146

  int32_t numOfOutput = taosArrayGetSize(pInfo->pColMatchInfo);
  SArray* pColIds = taosArrayInit(numOfOutput, sizeof(int16_t));
2147
  for (int32_t i = 0; i < numOfOutput; ++i) {
2148 2149 2150
    SColMatchInfo* id = taosArrayGet(pInfo->pColMatchInfo, i);

    int16_t colId = id->colId;
2151
    taosArrayPush(pColIds, &colId);
2152
    if (id->colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
5
54liuyao 已提交
2153 2154
      pInfo->primaryTsIndex = id->targetSlotId;
    }
H
Haojun Liao 已提交
2155 2156
  }

L
Liu Jicong 已提交
2157 2158 2159 2160 2161 2162 2163 2164 2165 2166 2167 2168 2169
  if (pTableScanNode->pSubtable != NULL) {
    SExprInfo* pSubTableExpr = taosMemoryCalloc(1, sizeof(SExprInfo));
    if (pSubTableExpr == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      goto _error;
    }
    pInfo->tbnameCalSup.pExprInfo = pSubTableExpr;
    createExprFromOneNode(pSubTableExpr, pTableScanNode->pSubtable, 0);
    if (initExprSupp(&pInfo->tbnameCalSup, pSubTableExpr, 1) != 0) {
      goto _error;
    }
  }

H
Haojun Liao 已提交
2170 2171
  pInfo->pBlockLists = taosArrayInit(4, POINTER_BYTES);
  if (pInfo->pBlockLists == NULL) {
2172 2173
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto _error;
H
Haojun Liao 已提交
2174 2175
  }

5
54liuyao 已提交
2176
  if (pHandle->vnode) {
L
Liu Jicong 已提交
2177
    SOperatorInfo*  pTableScanOp = createTableScanOperatorInfo(pTableScanNode, pHandle, pTaskInfo);
L
Liu Jicong 已提交
2178
    STableScanInfo* pTSInfo = (STableScanInfo*)pTableScanOp->info;
2179
    if (pHandle->version > 0) {
L
Liu Jicong 已提交
2180
      pTSInfo->cond.endVersion = pHandle->version;
2181
    }
L
Liu Jicong 已提交
2182 2183

    SArray* tableList = taosArrayGetP(pTaskInfo->tableqinfoList.pGroupList, 0);
2184
    if (pHandle->initTableReader) {
L
Liu Jicong 已提交
2185 2186 2187
      pTSInfo->scanMode = TABLE_SCAN__TABLE_ORDER;
      pTSInfo->dataReader = NULL;
      if (tsdbReaderOpen(pHandle->vnode, &pTSInfo->cond, tableList, &pTSInfo->dataReader, NULL) < 0) {
L
Liu Jicong 已提交
2188 2189
        ASSERT(0);
      }
L
Liu Jicong 已提交
2190 2191
    }

L
Liu Jicong 已提交
2192 2193 2194 2195
    if (pHandle->initTqReader) {
      ASSERT(pHandle->tqReader == NULL);
      pInfo->tqReader = tqOpenReader(pHandle->vnode);
      ASSERT(pInfo->tqReader);
2196
    } else {
L
Liu Jicong 已提交
2197 2198
      ASSERT(pHandle->tqReader);
      pInfo->tqReader = pHandle->tqReader;
2199 2200
    }

2201
    pInfo->pUpdateInfo = NULL;
2202
    pInfo->pTableScanOp = pTableScanOp;
2203 2204 2205
    if (pInfo->pTableScanOp->pTaskInfo->streamInfo.pState) {
      streamStateSetNumber(pInfo->pTableScanOp->pTaskInfo->streamInfo.pState, -1);
    }
L
Liu Jicong 已提交
2206

L
Liu Jicong 已提交
2207 2208
    pInfo->readHandle = *pHandle;
    pInfo->tableUid = pScanPhyNode->uid;
L
Liu Jicong 已提交
2209
    pTaskInfo->streamInfo.snapshotVer = pHandle->version;
L
Liu Jicong 已提交
2210

L
Liu Jicong 已提交
2211
    // set the extract column id to streamHandle
L
Liu Jicong 已提交
2212
    tqReaderSetColIdList(pInfo->tqReader, pColIds);
L
Liu Jicong 已提交
2213
    SArray* tableIdList = extractTableIdList(&pTaskInfo->tableqinfoList);
L
Liu Jicong 已提交
2214
    int32_t code = tqReaderSetTbUidList(pInfo->tqReader, tableIdList);
L
Liu Jicong 已提交
2215 2216 2217 2218 2219
    if (code != 0) {
      taosArrayDestroy(tableIdList);
      goto _error;
    }
    taosArrayDestroy(tableIdList);
L
Liu Jicong 已提交
2220
    memcpy(&pTaskInfo->streamInfo.tableCond, &pTSInfo->cond, sizeof(SQueryTableDataCond));
L
Liu Jicong 已提交
2221 2222
  } else {
    taosArrayDestroy(pColIds);
5
54liuyao 已提交
2223 2224
  }

2225 2226 2227 2228 2229
  // create the pseduo columns info
  if (pTableScanNode->scan.pScanPseudoCols != NULL) {
    pInfo->pPseudoExpr = createExprInfo(pTableScanNode->scan.pScanPseudoCols, NULL, &pInfo->numOfPseudoExpr);
  }

2230
  pInfo->pRes = createResDataBlock(pDescNode);
2231
  pInfo->pUpdateRes = createSpecialDataBlock(STREAM_CLEAR);
2232 2233
  pInfo->pCondition = pScanPhyNode->node.pConditions;
  pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
L
Liu Jicong 已提交
2234
  pInfo->windowSup = (SWindowSupporter){.pStreamAggSup = NULL, .gap = -1, .parentType = QUERY_NODE_PHYSICAL_PLAN};
2235
  pInfo->groupId = 0;
2236
  pInfo->pPullDataRes = createSpecialDataBlock(STREAM_RETRIEVE);
2237
  pInfo->pStreamScanOp = pOperator;
2238
  pInfo->deleteDataIndex = 0;
2239
  pInfo->pDeleteDataRes = createSpecialDataBlock(STREAM_DELETE_DATA);
5
54liuyao 已提交
2240
  pInfo->updateWin = (STimeWindow){.skey = INT64_MAX, .ekey = INT64_MAX};
2241
  pInfo->pUpdateDataRes = createSpecialDataBlock(STREAM_CLEAR);
X
Xiaoyu Wang 已提交
2242
  pInfo->assignBlockUid = pTableScanNode->assignBlockUid;
2243
  pInfo->partitionSup.needCalc = false;
L
Liu Jicong 已提交
2244

2245
  pOperator->name = "StreamScanOperator";
L
Liu Jicong 已提交
2246
  pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN;
2247 2248 2249
  pOperator->blocking = false;
  pOperator->status = OP_NOT_OPENED;
  pOperator->info = pInfo;
2250
  pOperator->exprSupp.numOfExprs = taosArrayGetSize(pInfo->pRes->pDataBlock);
2251
  pOperator->pTaskInfo = pTaskInfo;
H
Haojun Liao 已提交
2252

L
Liu Jicong 已提交
2253 2254 2255
  __optr_fn_t nextFn = pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM ? doStreamScan : doQueueScan;
  pOperator->fpSet =
      createOperatorFpSet(operatorDummyOpenFn, nextFn, NULL, NULL, destroyStreamScanOperatorInfo, NULL, NULL, NULL);
2256

H
Haojun Liao 已提交
2257
  return pOperator;
2258

L
Liu Jicong 已提交
2259
_error:
2260 2261 2262
  taosMemoryFreeClear(pInfo);
  taosMemoryFreeClear(pOperator);
  return NULL;
H
Haojun Liao 已提交
2263 2264
}

2265
static void destroySysScanOperator(void* param) {
H
Haojun Liao 已提交
2266 2267 2268 2269
  SSysTableScanInfo* pInfo = (SSysTableScanInfo*)param;
  tsem_destroy(&pInfo->ready);
  blockDataDestroy(pInfo->pRes);

2270
  const char* name = tNameGetTableName(&pInfo->name);
D
dapan1121 已提交
2271 2272
  if (strncasecmp(name, TSDB_INS_TABLE_TABLES, TSDB_TABLE_FNAME_LEN) == 0 ||
      strncasecmp(name, TSDB_INS_TABLE_TAGS, TSDB_TABLE_FNAME_LEN) == 0 || pInfo->pCur != NULL) {
H
Haojun Liao 已提交
2273
    metaCloseTbCursor(pInfo->pCur);
2274
    pInfo->pCur = NULL;
H
Haojun Liao 已提交
2275
  }
H
Haojun Liao 已提交
2276 2277

  taosArrayDestroy(pInfo->scanCols);
2278
  taosMemoryFreeClear(pInfo->pUser);
D
dapan1121 已提交
2279 2280

  taosMemoryFreeClear(param);
H
Haojun Liao 已提交
2281 2282
}

X
Xiaoyu Wang 已提交
2283
static int32_t getSysTableDbNameColId(const char* pTable) {
D
dapan1121 已提交
2284
  // if (0 == strcmp(TSDB_INS_TABLE_INDEXES, pTable)) {
X
Xiaoyu Wang 已提交
2285 2286
  //   return 1;
  // }
X
Xiaoyu Wang 已提交
2287 2288 2289
  return TSDB_INS_USER_STABLES_DBNAME_COLID;
}

H
Haojun Liao 已提交
2290 2291 2292 2293 2294 2295 2296 2297 2298 2299 2300 2301 2302 2303 2304 2305 2306 2307 2308 2309 2310
// Expression walker that extracts the database name from a "<db name column> = '<value>'" condition.
//
// pContext does double duty: while walking, its first int32_t is a small state machine
//   0 - nothing matched yet, 1 - inside an '=' operator, 2 - the db-name column was seen;
// once the value node is reached in state 2, the same memory is overwritten with the
// NUL-terminated database name and the walk is stopped.
//
// The name is truncated to TSDB_DB_NAME_LEN - 1 bytes, which is the size of the buffer supplied
// by getDBNameFromCondition's caller in this file, so an over-long literal can no longer overflow it.
EDealRes getDBNameFromConditionWalker(SNode* pNode, void* pContext) {
  int32_t* pState = (int32_t*)pContext;

  switch (nodeType(pNode)) {
    case QUERY_NODE_OPERATOR: {
      SOperatorNode* node = (SOperatorNode*)pNode;
      if (OP_TYPE_EQUAL == node->opType) {
        *pState = 1;
        return DEAL_RES_CONTINUE;
      }

      // any other operator is of no interest, together with its children
      *pState = 0;
      return DEAL_RES_IGNORE_CHILD;
    }
    case QUERY_NODE_COLUMN: {
      if (1 != *pState) {
        return DEAL_RES_CONTINUE;
      }

      SColumnNode* node = (SColumnNode*)pNode;
      if (getSysTableDbNameColId(node->tableName) == node->colId) {
        *pState = 2;
        return DEAL_RES_CONTINUE;
      }

      *pState = 0;
      return DEAL_RES_CONTINUE;
    }
    case QUERY_NODE_VALUE: {
      if (2 != *pState) {
        return DEAL_RES_CONTINUE;
      }

      SValueNode* node = (SValueNode*)pNode;
      char*       dbName = nodesGetValueFromNode(node);

      // never write more than the destination can hold (including the terminator)
      int32_t len = varDataLen(dbName);
      if (len < 0) {
        len = 0;
      } else if (len >= TSDB_DB_NAME_LEN) {
        len = TSDB_DB_NAME_LEN - 1;
      }

      strncpy(pContext, varDataVal(dbName), len);
      *((char*)pContext + len) = 0;
      return DEAL_RES_END;  // stop walk
    }
    default:
      break;
  }
  return DEAL_RES_CONTINUE;
}

2336
// Walk pCondition with getDBNameFromConditionWalker, using the dbName buffer as the walker context
// (the walker writes into it, hence the cast that drops the const qualifier).
// Nothing is done when there is no condition.
static void getDBNameFromCondition(SNode* pCondition, const char* dbName) {
  if (pCondition != NULL) {
    char* pWalkerCtx = (char*)dbName;
    nodesWalkExpr(pCondition, getDBNameFromConditionWalker, pWalkerCtx);
  }
}

D
dapan1121 已提交
2343
// Response callback for the system-table retrieve request.
// On success the response is stored in the scan info and its header fields are byte-swapped in
// place; on failure the error code is recorded in the task. In both cases the "ready" semaphore
// is posted. Always returns TSDB_CODE_SUCCESS.
static int32_t loadSysTableCallback(void* param, SDataBuf* pMsg, int32_t code) {
  SOperatorInfo*     pOperator = (SOperatorInfo*)param;
  SSysTableScanInfo* pScanInfo = (SSysTableScanInfo*)pOperator->info;

  if (code != TSDB_CODE_SUCCESS) {
    pOperator->pTaskInfo->code = code;
  } else {
    pScanInfo->pRsp = pMsg->pData;

    SRetrieveMetaTableRsp* pRetrieveRsp = pScanInfo->pRsp;
    pRetrieveRsp->numOfRows = htonl(pRetrieveRsp->numOfRows);
    pRetrieveRsp->useconds = htobe64(pRetrieveRsp->useconds);
    pRetrieveRsp->handle = htobe64(pRetrieveRsp->handle);
    pRetrieveRsp->compLen = htonl(pRetrieveRsp->compLen);
  }

  tsem_post(&pScanInfo->ready);
  return TSDB_CODE_SUCCESS;
}

// Apply the scan condition (if any) to the result block of pInfo.
// Returns the result block, or NULL when it holds no rows.
static SSDataBlock* doFilterResult(SSysTableScanInfo* pInfo) {
  if (pInfo->pCondition != NULL) {
    doFilter(pInfo->pCondition, pInfo->pRes, NULL, NULL);
  }

  SSDataBlock* pResult = pInfo->pRes;
  return (pResult->info.rows != 0) ? pResult : NULL;
}

2371
static SSDataBlock* buildInfoSchemaTableMetaBlock(char* tableName) {
L
Liu Jicong 已提交
2372 2373
  size_t               size = 0;
  const SSysTableMeta* pMeta = NULL;
2374 2375 2376
  getInfosDbMeta(&pMeta, &size);

  int32_t index = 0;
L
Liu Jicong 已提交
2377
  for (int32_t i = 0; i < size; ++i) {
2378
    if (strcmp(pMeta[i].name, tableName) == 0) {
2379 2380 2381 2382
      index = i;
      break;
    }
  }
2383

2384
  SSDataBlock* pBlock = createDataBlock();
L
Liu Jicong 已提交
2385
  for (int32_t i = 0; i < pMeta[index].colNum; ++i) {
L
Liu Jicong 已提交
2386 2387
    SColumnInfoData colInfoData =
        createColumnInfoData(pMeta[index].schema[i].type, pMeta[index].schema[i].bytes, i + 1);
2388
    blockDataAppendColInfo(pBlock, &colInfoData);
2389 2390
  }

2391 2392 2393
  return pBlock;
}

2394 2395 2396 2397 2398 2399 2400 2401 2402 2403 2404 2405 2406 2407 2408 2409 2410 2411 2412 2413 2414 2415 2416 2417 2418 2419 2420 2421 2422 2423 2424 2425 2426 2427 2428 2429 2430 2431 2432 2433 2434 2435 2436 2437 2438 2439 2440 2441 2442 2443 2444 2445 2446 2447 2448 2449 2450 2451 2452 2453 2454 2455 2456 2457 2458 2459 2460 2461 2462 2463 2464 2465 2466 2467 2468 2469 2470 2471 2472 2473 2474
// Render one tag value as text.
//
// str     - destination buffer; the caller must size it for the rendered value (no terminator is
//           appended for BINARY values, which are copied verbatim).
// type    - TSDB_DATA_TYPE_* of the value.
// buf     - raw value.
// bufSize - size of the raw value in bytes; only consulted for BINARY and NCHAR.
// len     - optional; receives the number of bytes written to str.
//
// Returns TSDB_CODE_SUCCESS, or TSDB_CODE_TSC_INVALID_VALUE for an unsupported type, a negative
// bufSize on a BINARY/NCHAR value, or an NCHAR value whose conversion yields no bytes.
int32_t convertTagDataToStr(char* str, int type, void* buf, int32_t bufSize, int32_t* len) {
  int32_t written = 0;

  switch (type) {
    case TSDB_DATA_TYPE_NULL: {
      written = sprintf(str, "null");
      break;
    }

    case TSDB_DATA_TYPE_BOOL: {
      written = sprintf(str, (*(int8_t*)buf) ? "true" : "false");
      break;
    }

    // signed integers
    case TSDB_DATA_TYPE_TINYINT: {
      written = sprintf(str, "%d", *(int8_t*)buf);
      break;
    }
    case TSDB_DATA_TYPE_SMALLINT: {
      written = sprintf(str, "%d", *(int16_t*)buf);
      break;
    }
    case TSDB_DATA_TYPE_INT: {
      written = sprintf(str, "%d", *(int32_t*)buf);
      break;
    }
    case TSDB_DATA_TYPE_BIGINT:
    case TSDB_DATA_TYPE_TIMESTAMP: {
      written = sprintf(str, "%" PRId64, *(int64_t*)buf);
      break;
    }

    // floating point
    case TSDB_DATA_TYPE_FLOAT: {
      written = sprintf(str, "%.5f", GET_FLOAT_VAL(buf));
      break;
    }
    case TSDB_DATA_TYPE_DOUBLE: {
      written = sprintf(str, "%.9f", GET_DOUBLE_VAL(buf));
      break;
    }

    // variable length values
    case TSDB_DATA_TYPE_BINARY:
    case TSDB_DATA_TYPE_NCHAR: {
      if (bufSize < 0) {
        return TSDB_CODE_TSC_INVALID_VALUE;
      }

      if (type == TSDB_DATA_TYPE_BINARY) {
        memcpy(str, buf, bufSize);
        written = bufSize;
      } else {
        int32_t mbsLen = taosUcs4ToMbs((TdUcs4*)buf, bufSize, str);
        if (mbsLen <= 0) {
          return TSDB_CODE_TSC_INVALID_VALUE;
        }
        written = mbsLen;
      }
      break;
    }

    // unsigned integers
    case TSDB_DATA_TYPE_UTINYINT: {
      written = sprintf(str, "%u", *(uint8_t*)buf);
      break;
    }
    case TSDB_DATA_TYPE_USMALLINT: {
      written = sprintf(str, "%u", *(uint16_t*)buf);
      break;
    }
    case TSDB_DATA_TYPE_UINT: {
      written = sprintf(str, "%u", *(uint32_t*)buf);
      break;
    }
    case TSDB_DATA_TYPE_UBIGINT: {
      written = sprintf(str, "%" PRIu64, *(uint64_t*)buf);
      break;
    }

    default:
      return TSDB_CODE_TSC_INVALID_VALUE;
  }

  if (len != NULL) {
    *len = written;
  }

  return TSDB_CODE_SUCCESS;
}

2475 2476 2477 2478 2479 2480 2481
// Check whether the operator node pCond has the form  table_name = '<string literal>'.
//
// pCond     - must be an operator node (the caller checks the node type).
// condTable - receives the NUL-terminated table name on a match; the caller in this file supplies
//             a buffer of TSDB_TABLE_NAME_LEN bytes.
//
// Returns true when the condition matches and condTable has been filled in.
static bool sysTableIsOperatorCondOnOneTable(SNode* pCond, char* condTable) {
  SOperatorNode* node = (SOperatorNode*)pCond;
  if (node->opType != OP_TYPE_EQUAL) {
    return false;
  }

  if (nodeType(node->pLeft) != QUERY_NODE_COLUMN ||
      strcasecmp(nodesGetNameFromColumnNode(node->pLeft), "table_name") != 0 ||
      nodeType(node->pRight) != QUERY_NODE_VALUE) {
    return false;
  }

  SValueNode* pValue = (SValueNode*)node->pRight;
  if (pValue->node.resType.type != TSDB_DATA_TYPE_NCHAR && pValue->node.resType.type != TSDB_DATA_TYPE_VARCHAR &&
      pValue->node.resType.type != TSDB_DATA_TYPE_BINARY) {
    return false;
  }

  char* value = nodesGetValueFromNode(pValue);

  // Copy no more than the literal actually holds and no more than fits, and always terminate the
  // result: the callers use condTable as a C string.
  int32_t len = varDataLen(value);
  if (len < 0) {
    len = 0;
  } else if (len >= TSDB_TABLE_NAME_LEN) {
    len = TSDB_TABLE_NAME_LEN - 1;
  }
  strncpy(condTable, varDataVal(value), len);
  condTable[len] = '\0';
  return true;
}

// Check whether pCond restricts the scan to a single table, i.e. it is either a
// table_name = '<literal>' operator itself or an AND condition with such an operator among its
// direct parameters. On a match the table name is stored in condTable.
static bool sysTableIsCondOnOneTable(SNode* pCond, char* condTable) {
  if (pCond == NULL) {
    return false;
  }

  // a plain operator condition
  if (nodeType(pCond) == QUERY_NODE_OPERATOR) {
    return sysTableIsOperatorCondOnOneTable(pCond, condTable);
  }

  // otherwise only AND-ed logic conditions qualify
  if (nodeType(pCond) != QUERY_NODE_LOGIC_CONDITION) {
    return false;
  }

  SLogicConditionNode* pLogic = (SLogicConditionNode*)pCond;
  if (pLogic->condType != LOGIC_COND_TYPE_AND) {
    return false;
  }

  SNode* pParam = NULL;
  FOREACH(pParam, pLogic->pParameterList) {
    if (nodeType(pParam) == QUERY_NODE_OPERATOR && sysTableIsOperatorCondOnOneTable(pParam, condTable)) {
      return true;
    }
  }

  return false;
}

S
shenglian zhou 已提交
2516 2517 2518 2519 2520 2521 2522 2523 2524 2525
// Produce the next block of the tags system table (one row per tag of every child table).
//
// When the condition pins a single table (table_name = '...'), only that table is looked up and
// the operator completes immediately. Otherwise the meta table cursor is iterated until the
// scratch block reaches the operator's capacity and at least one row survives filtering, or the
// cursor is exhausted.
//
// Returns the result block, or NULL when it holds no rows. A failed super-table lookup is
// reported through T_LONG_JMP after the locally owned resources have been released.
static SSDataBlock* sysTableScanUserTags(SOperatorInfo* pOperator) {
  SExecTaskInfo*     pTaskInfo = pOperator->pTaskInfo;
  SSysTableScanInfo* pInfo = pOperator->info;
  if (pOperator->status == OP_EXEC_DONE) {
    return NULL;
  }

  blockDataCleanup(pInfo->pRes);
  int32_t numOfRows = 0;

  // scratch block with the full schema of the tags table; rows are relocated into pInfo->pRes
  SSDataBlock* dataBlock = buildInfoSchemaTableMetaBlock(TSDB_INS_TABLE_TAGS);
  blockDataEnsureCapacity(dataBlock, pOperator->resultInfo.capacity);

  const char* db = NULL;
  int32_t     vgId = 0;
  vnodeGetInfo(pInfo->readHandle.vnode, &db, &vgId);

  SName sn = {0};
  char  dbname[TSDB_DB_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
  tNameFromString(&sn, db, T_NAME_ACCT | T_NAME_DB);

  tNameGetDbName(&sn, varDataVal(dbname));
  varDataSetLen(dbname, strlen(varDataVal(dbname)));

  char condTableName[TSDB_TABLE_NAME_LEN] = {0};
  // optimize when sql like where table_name='tablename' and xxx.
  if (pInfo->pCondition && sysTableIsCondOnOneTable(pInfo->pCondition, condTableName)) {
    char tableName[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
    STR_TO_VARSTR(tableName, condTableName);

    SMetaReader smrChildTable = {0};
    metaReaderInit(&smrChildTable, pInfo->readHandle.meta, 0);
    metaGetTableEntryByName(&smrChildTable, condTableName);
    if (smrChildTable.me.type != TSDB_CHILD_TABLE) {
      metaReaderClear(&smrChildTable);
      blockDataDestroy(dataBlock);
      pInfo->loadInfo.totalRows = 0;
      return NULL;
    }

    SMetaReader smrSuperTable = {0};
    metaReaderInit(&smrSuperTable, pInfo->readHandle.meta, META_READER_NOLOCK);
    uint64_t suid = smrChildTable.me.ctbEntry.suid;
    int32_t  code = metaGetTableEntryByUid(&smrSuperTable, suid);
    if (code != TSDB_CODE_SUCCESS) {
      // without the super table entry the fill helper would read an empty entry (NULL name)
      qError("failed to get super table meta, uid:0x%" PRIx64 ", code:%s, %s", suid, tstrerror(terrno),
             GET_TASKID(pTaskInfo));
      metaReaderClear(&smrSuperTable);
      metaReaderClear(&smrChildTable);
      blockDataDestroy(dataBlock);
      T_LONG_JMP(pTaskInfo->env, terrno);
    }

    sysTableUserTagsFillOneTableTags(pInfo, &smrSuperTable, &smrChildTable, dbname, tableName, &numOfRows, dataBlock);
    metaReaderClear(&smrSuperTable);
    metaReaderClear(&smrChildTable);
    if (numOfRows > 0) {
      relocateAndFilterSysTagsScanResult(pInfo, numOfRows, dataBlock);
      numOfRows = 0;
    }
    blockDataDestroy(dataBlock);
    pInfo->loadInfo.totalRows += pInfo->pRes->info.rows;
    doSetOperatorCompleted(pOperator);
    return (pInfo->pRes->info.rows == 0) ? NULL : pInfo->pRes;
  }

  int32_t ret = 0;
  if (pInfo->pCur == NULL) {
    pInfo->pCur = metaOpenTbCursor(pInfo->readHandle.meta);
  }

  while ((ret = metaTbCursorNext(pInfo->pCur)) == 0) {
    // only child tables carry tag values
    if (pInfo->pCur->mr.me.type != TSDB_CHILD_TABLE) {
      continue;
    }

    char tableName[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
    STR_TO_VARSTR(tableName, pInfo->pCur->mr.me.name);

    SMetaReader smrSuperTable = {0};
    metaReaderInit(&smrSuperTable, pInfo->readHandle.meta, 0);
    uint64_t suid = pInfo->pCur->mr.me.ctbEntry.suid;
    int32_t  code = metaGetTableEntryByUid(&smrSuperTable, suid);
    if (code != TSDB_CODE_SUCCESS) {
      qError("failed to get super table meta, uid:0x%" PRIx64 ", code:%s, %s", suid, tstrerror(terrno),
             GET_TASKID(pTaskInfo));
      metaReaderClear(&smrSuperTable);
      metaCloseTbCursor(pInfo->pCur);
      pInfo->pCur = NULL;
      // the scratch block is owned by this call only: release it before leaving through the jump
      blockDataDestroy(dataBlock);
      T_LONG_JMP(pTaskInfo->env, terrno);
    }

    sysTableUserTagsFillOneTableTags(pInfo, &smrSuperTable, &pInfo->pCur->mr, dbname, tableName, &numOfRows, dataBlock);

    metaReaderClear(&smrSuperTable);

    if (numOfRows >= pOperator->resultInfo.capacity) {
      relocateAndFilterSysTagsScanResult(pInfo, numOfRows, dataBlock);
      numOfRows = 0;

      // keep scanning when the filter removed every row of this batch
      if (pInfo->pRes->info.rows > 0) {
        break;
      }
    }
  }

  // flush the rows of the last, partially filled batch
  if (numOfRows > 0) {
    relocateAndFilterSysTagsScanResult(pInfo, numOfRows, dataBlock);
    numOfRows = 0;
  }

  blockDataDestroy(dataBlock);
  if (ret != 0) {
    // cursor exhausted
    metaCloseTbCursor(pInfo->pCur);
    pInfo->pCur = NULL;
    doSetOperatorCompleted(pOperator);
  }

  pInfo->loadInfo.totalRows += pInfo->pRes->info.rows;
  return (pInfo->pRes->info.rows == 0) ? NULL : pInfo->pRes;
}

2627 2628 2629 2630 2631 2632 2633 2634 2635 2636
// Move numOfRows rows from the scratch block into the result block of pInfo (keeping only the
// scanned columns), apply the scan condition to the result, and reset the scratch block.
static void relocateAndFilterSysTagsScanResult(SSysTableScanInfo* pInfo, int32_t numOfRows, SSDataBlock* dataBlock) {
  SSDataBlock* pResult = pInfo->pRes;

  dataBlock->info.rows = numOfRows;
  pResult->info.rows = numOfRows;
  relocateColumnData(pResult, pInfo->scanCols, dataBlock->pDataBlock, false);

  doFilterResult(pInfo);

  // the scratch block is reused for the next batch
  blockDataCleanup(dataBlock);
}

2637 2638 2639
// Append one row per tag of a child table to dataBlock.
//
// pInfo         - scan info (not used by this function).
// smrSuperTable - reader positioned on the super table; supplies the tag schema and the stable name.
// smrChildTable - reader positioned on the child table; supplies the tag values.
// dbname        - database name as a var-string.
// tableName     - child table name as a var-string.
// pNumOfRows    - in: index of the first row to write; out: index after the last completed row.
// dataBlock     - destination; columns 0..5 are table name, db name, stable name, tag name,
//                 tag type and tag value.
//
// Returns TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY when a value buffer cannot be allocated
// (rows completed so far are still reported through pNumOfRows). A tag value that cannot be
// rendered is stored as NULL.
static int32_t sysTableUserTagsFillOneTableTags(const SSysTableScanInfo* pInfo, SMetaReader* smrSuperTable,
                                                SMetaReader* smrChildTable, const char* dbname, const char* tableName,
                                                int32_t* pNumOfRows, const SSDataBlock* dataBlock) {
  char stableName[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
  STR_TO_VARSTR(stableName, (*smrSuperTable).me.name);

  int32_t numOfRows = *pNumOfRows;

  int32_t numOfTags = (*smrSuperTable).me.stbEntry.schemaTag.nCols;
  for (int32_t i = 0; i < numOfTags; ++i) {
    SColumnInfoData* pColInfoData = NULL;

    // table name
    pColInfoData = taosArrayGet(dataBlock->pDataBlock, 0);
    colDataAppend(pColInfoData, numOfRows, tableName, false);

    // database name
    pColInfoData = taosArrayGet(dataBlock->pDataBlock, 1);
    colDataAppend(pColInfoData, numOfRows, dbname, false);

    // super table name
    pColInfoData = taosArrayGet(dataBlock->pDataBlock, 2);
    colDataAppend(pColInfoData, numOfRows, stableName, false);

    // tag name
    char tagName[TSDB_COL_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
    STR_TO_VARSTR(tagName, (*smrSuperTable).me.stbEntry.schemaTag.pSchema[i].name);
    pColInfoData = taosArrayGet(dataBlock->pDataBlock, 3);
    colDataAppend(pColInfoData, numOfRows, tagName, false);

    // tag type, with the declared length appended for VARCHAR / NCHAR
    int8_t tagType = (*smrSuperTable).me.stbEntry.schemaTag.pSchema[i].type;
    pColInfoData = taosArrayGet(dataBlock->pDataBlock, 4);
    char tagTypeStr[VARSTR_HEADER_SIZE + 32];
    int  tagTypeLen = sprintf(varDataVal(tagTypeStr), "%s", tDataTypes[tagType].name);
    if (tagType == TSDB_DATA_TYPE_VARCHAR) {
      tagTypeLen += sprintf(varDataVal(tagTypeStr) + tagTypeLen, "(%d)",
                            (int32_t)((*smrSuperTable).me.stbEntry.schemaTag.pSchema[i].bytes - VARSTR_HEADER_SIZE));
    } else if (tagType == TSDB_DATA_TYPE_NCHAR) {
      tagTypeLen += sprintf(
          varDataVal(tagTypeStr) + tagTypeLen, "(%d)",
          (int32_t)(((*smrSuperTable).me.stbEntry.schemaTag.pSchema[i].bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE));
    }
    varDataSetLen(tagTypeStr, tagTypeLen);
    colDataAppend(pColInfoData, numOfRows, (char*)tagTypeStr, false);

    // locate the raw tag value of the child table
    STagVal tagVal = {0};
    tagVal.cid = (*smrSuperTable).me.stbEntry.schemaTag.pSchema[i].colId;
    char*    tagData = NULL;
    uint32_t tagLen = 0;

    if (tagType == TSDB_DATA_TYPE_JSON) {
      tagData = (char*)smrChildTable->me.ctbEntry.pTags;
    } else {
      bool exist = tTagGet((STag*)smrChildTable->me.ctbEntry.pTags, &tagVal);
      if (exist) {
        if (IS_VAR_DATA_TYPE(tagType)) {
          tagData = (char*)tagVal.pData;
          tagLen = tagVal.nData;
        } else {
          tagData = (char*)&tagVal.i64;
          tagLen = tDataTypes[tagType].bytes;
        }
      }
    }

    // render the value as a var-string; tagVarChar stays NULL when there is nothing to show
    char* tagVarChar = NULL;
    if (tagData != NULL) {
      if (tagType == TSDB_DATA_TYPE_JSON) {
        char* tagJson = parseTagDatatoJson(tagData);
        if (tagJson != NULL) {
          size_t jsonLen = strlen(tagJson);
          tagVarChar = taosMemoryMalloc(jsonLen + VARSTR_HEADER_SIZE);
          if (tagVarChar == NULL) {
            taosMemoryFree(tagJson);
            *pNumOfRows = numOfRows;
            return TSDB_CODE_OUT_OF_MEMORY;
          }
          memcpy(varDataVal(tagVarChar), tagJson, jsonLen);
          varDataSetLen(tagVarChar, jsonLen);
          taosMemoryFree(tagJson);
        }
      } else {
        int32_t bufSize = IS_VAR_DATA_TYPE(tagType) ? (tagLen + VARSTR_HEADER_SIZE)
                                                    : (3 + DBL_MANT_DIG - DBL_MIN_EXP + VARSTR_HEADER_SIZE);
        tagVarChar = taosMemoryMalloc(bufSize);
        if (tagVarChar == NULL) {
          *pNumOfRows = numOfRows;
          return TSDB_CODE_OUT_OF_MEMORY;
        }

        int32_t len = -1;
        int32_t code = convertTagDataToStr(varDataVal(tagVarChar), tagType, tagData, tagLen, &len);
        if (code == TSDB_CODE_SUCCESS && len >= 0) {
          varDataSetLen(tagVarChar, len);
        } else if (IS_VAR_DATA_TYPE(tagType) && tagLen == 0) {
          // an empty string value is not an error: show it as an empty string
          varDataSetLen(tagVarChar, 0);
        } else {
          // never publish a var-string whose length was not set by the conversion
          taosMemoryFreeClear(tagVarChar);
        }
      }
    }

    // tag value
    pColInfoData = taosArrayGet(dataBlock->pDataBlock, 5);
    colDataAppend(pColInfoData, numOfRows, tagVarChar,
                  (tagVarChar == NULL) || (tagType == TSDB_DATA_TYPE_JSON && tTagIsJsonNull(tagData)));
    taosMemoryFree(tagVarChar);
    ++numOfRows;
  }

  *pNumOfRows = numOfRows;

  return TSDB_CODE_SUCCESS;
}

2732
// Produce the next block of the tables system table.
//
// On the mnode (readHandle.mnd set) the tables of the system databases are returned in one go and
// the operator completes. On a vnode the meta table cursor is iterated: one row is built per
// table until the scratch block reaches the operator's capacity and at least one row survives
// filtering, or the cursor is exhausted.
//
// Scratch block columns: 0 table name, 1 db name, 2 create time, 3 number of columns,
// 4 super table name, 5 uid, 6 vgId, 7 ttl, 8 table comment, 9 table type.
//
// Returns the result block, or NULL when it holds no rows. A failed super-table lookup is
// reported through T_LONG_JMP after the locally owned resources have been released.
static SSDataBlock* sysTableScanUserTables(SOperatorInfo* pOperator) {
  SExecTaskInfo*     pTaskInfo = pOperator->pTaskInfo;
  SSysTableScanInfo* pInfo = pOperator->info;
  if (pOperator->status == OP_EXEC_DONE) {
    return NULL;
  }

  // the retrieve is executed on the mnode, so return tables that belongs to the information schema database.
  if (pInfo->readHandle.mnd != NULL) {
    buildSysDbTableInfo(pInfo, pOperator->resultInfo.capacity);

    doFilterResult(pInfo);
    pInfo->loadInfo.totalRows += pInfo->pRes->info.rows;

    doSetOperatorCompleted(pOperator);
    return (pInfo->pRes->info.rows == 0) ? NULL : pInfo->pRes;
  }

  if (pInfo->pCur == NULL) {
    pInfo->pCur = metaOpenTbCursor(pInfo->readHandle.meta);
  }

  blockDataCleanup(pInfo->pRes);
  int32_t numOfRows = 0;

  const char* db = NULL;
  int32_t     vgId = 0;
  vnodeGetInfo(pInfo->readHandle.vnode, &db, &vgId);

  SName sn = {0};
  char  dbname[TSDB_DB_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
  tNameFromString(&sn, db, T_NAME_ACCT | T_NAME_DB);

  tNameGetDbName(&sn, varDataVal(dbname));
  varDataSetLen(dbname, strlen(varDataVal(dbname)));

  // scratch block with the full schema of the tables table; rows are relocated into pInfo->pRes
  SSDataBlock* p = buildInfoSchemaTableMetaBlock(TSDB_INS_TABLE_TABLES);
  blockDataEnsureCapacity(p, pOperator->resultInfo.capacity);

  // reusable var-string buffer: table name first, then super table name, then table type
  char n[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0};

  int32_t ret = 0;
  while ((ret = metaTbCursorNext(pInfo->pCur)) == 0) {
    STR_TO_VARSTR(n, pInfo->pCur->mr.me.name);

    // table name
    SColumnInfoData* pColInfoData = taosArrayGet(p->pDataBlock, 0);
    colDataAppend(pColInfoData, numOfRows, n, false);

    // database name
    pColInfoData = taosArrayGet(p->pDataBlock, 1);
    colDataAppend(pColInfoData, numOfRows, dbname, false);

    // vgId
    pColInfoData = taosArrayGet(p->pDataBlock, 6);
    colDataAppend(pColInfoData, numOfRows, (char*)&vgId, false);

    int32_t tableType = pInfo->pCur->mr.me.type;
    if (tableType == TSDB_CHILD_TABLE) {
      // create time
      int64_t ts = pInfo->pCur->mr.me.ctbEntry.ctime;
      pColInfoData = taosArrayGet(p->pDataBlock, 2);
      colDataAppend(pColInfoData, numOfRows, (char*)&ts, false);

      SMetaReader mr = {0};
      metaReaderInit(&mr, pInfo->readHandle.meta, META_READER_NOLOCK);

      uint64_t suid = pInfo->pCur->mr.me.ctbEntry.suid;
      int32_t  code = metaGetTableEntryByUid(&mr, suid);
      if (code != TSDB_CODE_SUCCESS) {
        qError("failed to get super table meta, cname:%s, suid:0x%" PRIx64 ", code:%s, %s", pInfo->pCur->mr.me.name,
               suid, tstrerror(terrno), GET_TASKID(pTaskInfo));
        metaReaderClear(&mr);
        metaCloseTbCursor(pInfo->pCur);
        pInfo->pCur = NULL;
        // the scratch block is owned by this call only: release it before leaving through the jump
        blockDataDestroy(p);
        T_LONG_JMP(pTaskInfo->env, terrno);
      }

      // number of columns
      pColInfoData = taosArrayGet(p->pDataBlock, 3);
      colDataAppend(pColInfoData, numOfRows, (char*)&mr.me.stbEntry.schemaRow.nCols, false);

      // super table name
      STR_TO_VARSTR(n, mr.me.name);
      pColInfoData = taosArrayGet(p->pDataBlock, 4);
      colDataAppend(pColInfoData, numOfRows, n, false);
      metaReaderClear(&mr);

      // table comment: a negative length is stored as NULL, zero as an empty string
      pColInfoData = taosArrayGet(p->pDataBlock, 8);
      if (pInfo->pCur->mr.me.ctbEntry.commentLen > 0) {
        char comment[TSDB_TB_COMMENT_LEN + VARSTR_HEADER_SIZE] = {0};
        STR_TO_VARSTR(comment, pInfo->pCur->mr.me.ctbEntry.comment);
        colDataAppend(pColInfoData, numOfRows, comment, false);
      } else if (pInfo->pCur->mr.me.ctbEntry.commentLen == 0) {
        char comment[VARSTR_HEADER_SIZE + VARSTR_HEADER_SIZE] = {0};
        STR_TO_VARSTR(comment, "");
        colDataAppend(pColInfoData, numOfRows, comment, false);
      } else {
        colDataAppendNULL(pColInfoData, numOfRows);
      }

      // uid
      pColInfoData = taosArrayGet(p->pDataBlock, 5);
      colDataAppend(pColInfoData, numOfRows, (char*)&pInfo->pCur->mr.me.uid, false);

      // ttl
      pColInfoData = taosArrayGet(p->pDataBlock, 7);
      colDataAppend(pColInfoData, numOfRows, (char*)&pInfo->pCur->mr.me.ctbEntry.ttlDays, false);

      STR_TO_VARSTR(n, "CHILD_TABLE");
    } else if (tableType == TSDB_NORMAL_TABLE) {
      // create time
      pColInfoData = taosArrayGet(p->pDataBlock, 2);
      colDataAppend(pColInfoData, numOfRows, (char*)&pInfo->pCur->mr.me.ntbEntry.ctime, false);

      // number of columns
      pColInfoData = taosArrayGet(p->pDataBlock, 3);
      colDataAppend(pColInfoData, numOfRows, (char*)&pInfo->pCur->mr.me.ntbEntry.schemaRow.nCols, false);

      // super table name
      pColInfoData = taosArrayGet(p->pDataBlock, 4);
      colDataAppendNULL(pColInfoData, numOfRows);

      // table comment: a negative length is stored as NULL, zero as an empty string
      pColInfoData = taosArrayGet(p->pDataBlock, 8);
      if (pInfo->pCur->mr.me.ntbEntry.commentLen > 0) {
        char comment[TSDB_TB_COMMENT_LEN + VARSTR_HEADER_SIZE] = {0};
        STR_TO_VARSTR(comment, pInfo->pCur->mr.me.ntbEntry.comment);
        colDataAppend(pColInfoData, numOfRows, comment, false);
      } else if (pInfo->pCur->mr.me.ntbEntry.commentLen == 0) {
        char comment[VARSTR_HEADER_SIZE + VARSTR_HEADER_SIZE] = {0};
        STR_TO_VARSTR(comment, "");
        colDataAppend(pColInfoData, numOfRows, comment, false);
      } else {
        colDataAppendNULL(pColInfoData, numOfRows);
      }

      // uid
      pColInfoData = taosArrayGet(p->pDataBlock, 5);
      colDataAppend(pColInfoData, numOfRows, (char*)&pInfo->pCur->mr.me.uid, false);

      // ttl
      pColInfoData = taosArrayGet(p->pDataBlock, 7);
      colDataAppend(pColInfoData, numOfRows, (char*)&pInfo->pCur->mr.me.ntbEntry.ttlDays, false);

      STR_TO_VARSTR(n, "NORMAL_TABLE");
    }

    // table type
    pColInfoData = taosArrayGet(p->pDataBlock, 9);
    colDataAppend(pColInfoData, numOfRows, n, false);

    if (++numOfRows >= pOperator->resultInfo.capacity) {
      // batch is full: relocate into the result block, filter and reset the scratch block
      relocateAndFilterSysTagsScanResult(pInfo, numOfRows, p);
      numOfRows = 0;

      // keep scanning when the filter removed every row of this batch
      if (pInfo->pRes->info.rows > 0) {
        break;
      }
    }
  }

  // flush the rows of the last, partially filled batch
  if (numOfRows > 0) {
    relocateAndFilterSysTagsScanResult(pInfo, numOfRows, p);
    numOfRows = 0;
  }

  blockDataDestroy(p);

  // todo temporarily free the cursor here, the true reason why the free is not valid needs to be found
  if (ret != 0) {
    metaCloseTbCursor(pInfo->pCur);
    pInfo->pCur = NULL;
    doSetOperatorCompleted(pOperator);
  }

  pInfo->loadInfo.totalRows += pInfo->pRes->info.rows;
  return (pInfo->pRes->info.rows == 0) ? NULL : pInfo->pRes;
}

2924 2925 2926 2927 2928 2929 2930 2931
// Scan handler that yields no rows: the result block is reset to zero rows, the operator is
// marked as finished, and NULL is returned. Later calls return NULL immediately.
static SSDataBlock* sysTableScanUserSTables(SOperatorInfo* pOperator) {
  if (OP_EXEC_DONE == pOperator->status) {
    return NULL;
  }

  SSysTableScanInfo* pScan = pOperator->info;
  SSDataBlock*       pOut = pScan->pRes;

  pOut->info.rows = 0;
  pOperator->status = OP_EXEC_DONE;

  // rows was just reset to zero, so the running total does not change here
  pScan->loadInfo.totalRows += pOut->info.rows;
  if (pOut->info.rows == 0) {
    return NULL;
  }

  return pOut;
}

2938 2939 2940 2941
// Entry point of the system-table scan operator.
//
// Scans of the tables and tags system tables, and of the super-table system table for a
// rewritten SHOW on a system database, are delegated to dedicated handlers. Every other system
// table is fetched remotely: a retrieve request is serialized and sent asynchronously, this
// function blocks on pInfo->ready, and the response in pInfo->pRsp is decoded into pInfo->pRes
// and filtered. The loop repeats until a filtered block with rows is available or the remote
// side reports completion.
//
// Returns the result block, or NULL when the scan is finished or has failed; on the failure
// paths handled here pTaskInfo->code is non-zero.
static SSDataBlock* doSysTableScan(SOperatorInfo* pOperator) {
  // build message and send to mnode to fetch the content of system tables.
  SExecTaskInfo*     pTaskInfo = pOperator->pTaskInfo;
  SSysTableScanInfo* pInfo = pOperator->info;
  char               dbName[TSDB_DB_NAME_LEN] = {0};

  const char* name = tNameGetTableName(&pInfo->name);
  if (pInfo->showRewrite) {
    getDBNameFromCondition(pInfo->pCondition, dbName);
    // bounded formatting: never write past the end of req.db, always NUL-terminated
    snprintf(pInfo->req.db, sizeof(pInfo->req.db), "%d.%s", pInfo->accountId, dbName);
  }

  if (strncasecmp(name, TSDB_INS_TABLE_TABLES, TSDB_TABLE_FNAME_LEN) == 0) {
    return sysTableScanUserTables(pOperator);
  } else if (strncasecmp(name, TSDB_INS_TABLE_TAGS, TSDB_TABLE_FNAME_LEN) == 0) {
    return sysTableScanUserTags(pOperator);
  } else if (strncasecmp(name, TSDB_INS_TABLE_STABLES, TSDB_TABLE_FNAME_LEN) == 0 && pInfo->showRewrite &&
             IS_SYS_DBNAME(dbName)) {
    return sysTableScanUserSTables(pOperator);
  } else {  // load the meta from mnode of the given epset
    if (pOperator->status == OP_EXEC_DONE) {
      return NULL;
    }

    while (1) {
      int64_t startTs = taosGetTimestampUs();

      // bounded copies that always leave the destination NUL-terminated
      snprintf(pInfo->req.tb, sizeof(pInfo->req.tb), "%s", tNameGetTableName(&pInfo->name));
      snprintf(pInfo->req.user, sizeof(pInfo->req.user), "%s", pInfo->pUser);

      // first call computes the serialized length, second call fills the buffer
      int32_t contLen = tSerializeSRetrieveTableReq(NULL, 0, &pInfo->req);
      char*   buf1 = taosMemoryCalloc(1, contLen);
      if (NULL == buf1) {
        qError("%s prepare message %d failed", GET_TASKID(pTaskInfo), contLen);
        pTaskInfo->code = TSDB_CODE_QRY_OUT_OF_MEMORY;
        return NULL;
      }
      tSerializeSRetrieveTableReq(buf1, contLen, &pInfo->req);

      // send the fetch remote task result request
      SMsgSendInfo* pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
      if (NULL == pMsgSendInfo) {
        qError("%s prepare message %d failed", GET_TASKID(pTaskInfo), (int32_t)sizeof(SMsgSendInfo));
        taosMemoryFree(buf1);  // not yet handed over to the send info, release it here
        pTaskInfo->code = TSDB_CODE_QRY_OUT_OF_MEMORY;
        return NULL;
      }

      // dnode variables are served by a different message type than the other system tables
      int32_t msgType = (strcasecmp(name, TSDB_INS_TABLE_DNODE_VARIABLES) == 0) ? TDMT_DND_SYSTABLE_RETRIEVE
                                                                                : TDMT_MND_SYSTABLE_RETRIEVE;

      pMsgSendInfo->param = pOperator;
      pMsgSendInfo->msgInfo.pData = buf1;
      pMsgSendInfo->msgInfo.len = contLen;
      pMsgSendInfo->msgType = msgType;
      pMsgSendInfo->fp = loadSysTableCallback;
      pMsgSendInfo->requestId = pTaskInfo->id.queryId;

      int64_t transporterId = 0;
      // NOTE(review): the send result is not inspected before blocking on the semaphore. If a
      // failed send never triggers the callback this wait would not return -- confirm against
      // asyncSendMsgToServer before adding an early exit here.
      int32_t code =
          asyncSendMsgToServer(pInfo->readHandle.pMsgCb->clientRpc, &pInfo->epSet, &transporterId, pMsgSendInfo);
      tsem_wait(&pInfo->ready);

      if (pTaskInfo->code) {
        qDebug("%s load meta data from mnode failed, totalRows:%" PRIu64 ", code:%s", GET_TASKID(pTaskInfo),
               pInfo->loadInfo.totalRows, tstrerror(pTaskInfo->code));
        return NULL;
      }

      SRetrieveMetaTableRsp* pRsp = pInfo->pRsp;
      // remember the handle so that the next request continues the same retrieval
      pInfo->req.showId = pRsp->handle;

      if (pRsp->numOfRows == 0 || pRsp->completed) {
        pOperator->status = OP_EXEC_DONE;
        qDebug("%s load meta data from mnode completed, rowsOfSource:%d, totalRows:%" PRIu64, GET_TASKID(pTaskInfo),
               pRsp->numOfRows, pInfo->loadInfo.totalRows);

        if (pRsp->numOfRows == 0) {
          taosMemoryFree(pRsp);
          return NULL;
        }
      }

      char* pStart = pRsp->data;
      extractDataBlockFromFetchRsp(pInfo->pRes, pRsp->data, pInfo->scanCols, &pStart);
      updateLoadRemoteInfo(&pInfo->loadInfo, pRsp->numOfRows, pRsp->compLen, startTs, pOperator);

      // todo log the filter info
      doFilterResult(pInfo);
      taosMemoryFree(pRsp);
      if (pInfo->pRes->info.rows > 0) {
        return pInfo->pRes;
      } else if (pOperator->status == OP_EXEC_DONE) {
        // everything in the last batch was filtered out and there is nothing more to fetch
        return NULL;
      }
    }
  }
}

3030
int32_t buildSysDbTableInfo(const SSysTableScanInfo* pInfo, int32_t capacity) {
D
dapan1121 已提交
3031
  SSDataBlock* p = buildInfoSchemaTableMetaBlock(TSDB_INS_TABLE_TABLES);
3032
  blockDataEnsureCapacity(p, capacity);
3033

L
Liu Jicong 已提交
3034
  size_t               size = 0;
3035 3036 3037
  const SSysTableMeta* pSysDbTableMeta = NULL;

  getInfosDbMeta(&pSysDbTableMeta, &size);
3038
  p->info.rows = buildDbTableInfoBlock(pInfo->sysInfo, p, pSysDbTableMeta, size, TSDB_INFORMATION_SCHEMA_DB);
3039 3040

  getPerfDbMeta(&pSysDbTableMeta, &size);
3041
  p->info.rows = buildDbTableInfoBlock(pInfo->sysInfo, p, pSysDbTableMeta, size, TSDB_PERFORMANCE_SCHEMA_DB);
3042 3043

  pInfo->pRes->info.rows = p->info.rows;
3044
  relocateColumnData(pInfo->pRes, pInfo->scanCols, p->pDataBlock, false);
3045 3046 3047
  blockDataDestroy(p);

  return pInfo->pRes->info.rows;
3048 3049
}

3050
int32_t buildDbTableInfoBlock(bool sysInfo, const SSDataBlock* p, const SSysTableMeta* pSysDbTableMeta, size_t size,
L
Liu Jicong 已提交
3051 3052
                              const char* dbName) {
  char    n[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
3053 3054
  int32_t numOfRows = p->info.rows;

L
Liu Jicong 已提交
3055
  for (int32_t i = 0; i < size; ++i) {
3056
    const SSysTableMeta* pm = &pSysDbTableMeta[i];
3057 3058 3059
    if (!sysInfo && pm->sysInfo) {
      continue;
    }
3060 3061 3062 3063 3064 3065 3066 3067 3068 3069 3070 3071 3072 3073 3074 3075 3076 3077 3078

    SColumnInfoData* pColInfoData = taosArrayGet(p->pDataBlock, 0);

    STR_TO_VARSTR(n, pm->name);
    colDataAppend(pColInfoData, numOfRows, n, false);

    // database name
    STR_TO_VARSTR(n, dbName);
    pColInfoData = taosArrayGet(p->pDataBlock, 1);
    colDataAppend(pColInfoData, numOfRows, n, false);

    // create time
    pColInfoData = taosArrayGet(p->pDataBlock, 2);
    colDataAppendNULL(pColInfoData, numOfRows);

    // number of columns
    pColInfoData = taosArrayGet(p->pDataBlock, 3);
    colDataAppend(pColInfoData, numOfRows, (char*)&pm->colNum, false);

L
Liu Jicong 已提交
3079
    for (int32_t j = 4; j <= 8; ++j) {
3080 3081 3082 3083 3084 3085 3086 3087 3088 3089 3090 3091 3092 3093 3094
      pColInfoData = taosArrayGet(p->pDataBlock, j);
      colDataAppendNULL(pColInfoData, numOfRows);
    }

    STR_TO_VARSTR(n, "SYSTEM_TABLE");

    pColInfoData = taosArrayGet(p->pDataBlock, 9);
    colDataAppend(pColInfoData, numOfRows, n, false);

    numOfRows += 1;
  }

  return numOfRows;
}

3095
// Creates the system-table scan operator for the given physical plan node.
// The scan info keeps its own copy of pUser and of the read handle. For the tables and tags
// system tables the result block is sized up front; for every other table the ready semaphore
// and the management endpoint set are prepared instead.
// Returns the operator, or NULL with terrno set when the operator or its info cannot be allocated.
SOperatorInfo* createSysTableScanOperatorInfo(void* readHandle, SSystemTableScanPhysiNode* pScanPhyNode,
                                              const char* pUser, SExecTaskInfo* pTaskInfo) {
  SSysTableScanInfo* pInfo = taosMemoryCalloc(1, sizeof(SSysTableScanInfo));
  SOperatorInfo*     pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
  if (pInfo == NULL || pOperator == NULL) {
    taosMemoryFreeClear(pInfo);
    taosMemoryFreeClear(pOperator);
    terrno = TSDB_CODE_QRY_OUT_OF_MEMORY;
    return NULL;
  }

  SScanPhysiNode*     pScanNode = &pScanPhyNode->scan;
  SDataBlockDescNode* pDescNode = pScanNode->node.pOutputDataBlockDesc;

  // result block and column mapping derived from the output descriptor
  int32_t numOfMatched = 0;
  pInfo->pRes = createResDataBlock(pDescNode);
  pInfo->scanCols = extractColMatchInfo(pScanNode->pScanCols, pDescNode, &numOfMatched, COL_MATCH_FROM_COL_ID);
  pInfo->pCondition = pScanNode->node.pConditions;

  // settings taken over from the physical plan node
  pInfo->accountId = pScanPhyNode->accountId;
  pInfo->pUser = taosMemoryStrDup((void*)pUser);
  pInfo->sysInfo = pScanPhyNode->sysInfo;
  pInfo->showRewrite = pScanPhyNode->showRewrite;

  initResultSizeInfo(&pOperator->resultInfo, 4096);

  tNameAssign(&pInfo->name, &pScanNode->tableName);
  const char* name = tNameGetTableName(&pInfo->name);

  // both kinds of scan need the read handle
  pInfo->readHandle = *(SReadHandle*)readHandle;

  bool isTablesOrTags = (strncasecmp(name, TSDB_INS_TABLE_TABLES, TSDB_TABLE_FNAME_LEN) == 0) ||
                        (strncasecmp(name, TSDB_INS_TABLE_TAGS, TSDB_TABLE_FNAME_LEN) == 0);
  if (isTablesOrTags) {
    blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity);
  } else {
    tsem_init(&pInfo->ready, 0, 0);
    pInfo->epSet = pScanPhyNode->mgmtEpSet;
  }

  pOperator->name = "SysTableScanOperator";
  pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN;
  pOperator->blocking = false;
  pOperator->status = OP_NOT_OPENED;
  pOperator->info = pInfo;
  pOperator->exprSupp.numOfExprs = taosArrayGetSize(pInfo->pRes->pDataBlock);
  pOperator->pTaskInfo = pTaskInfo;

  pOperator->fpSet =
      createOperatorFpSet(operatorDummyOpenFn, doSysTableScan, NULL, NULL, destroySysScanOperator, NULL, NULL, NULL);

  return pOperator;
}
H
Haojun Liao 已提交
3153

3154
// Produces the next block of the tag scan: one row per table of pInfo->pTableList, starting at
// pInfo->curPos and limited by the operator's result capacity. For every output expression
// either the table name (scan pseudo-column functions) or the value of the referenced tag is
// appended. Marks the operator, and then the task, as completed once the last table has been
// consumed. A failed meta lookup aborts the task through T_LONG_JMP.
// Returns the result block, or NULL when no row was produced.
static SSDataBlock* doTagScan(SOperatorInfo* pOperator) {
  if (pOperator->status == OP_EXEC_DONE) {
    return NULL;
  }

  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
  STagScanInfo*  pInfo = pOperator->info;
  SExprInfo*     pExprs = &pOperator->exprSupp.pExprInfo[0];
  SSDataBlock*   pRes = pInfo->pRes;
  blockDataCleanup(pRes);

  int32_t numOfTables = taosArrayGetSize(pInfo->pTableList->pTableList);
  if (numOfTables == 0) {
    setTaskStatus(pTaskInfo, TASK_COMPLETED);
    return NULL;
  }

  char        nameBuf[512] = {0};
  int32_t     numOfRows = 0;
  SMetaReader reader = {0};
  metaReaderInit(&reader, pInfo->readHandle.meta, 0);

  while (pInfo->curPos < numOfTables && numOfRows < pOperator->resultInfo.capacity) {
    STableKeyInfo* pKey = taosArrayGet(pInfo->pTableList->pTableList, pInfo->curPos);
    int32_t        code = metaGetTableEntryByUid(&reader, pKey->uid);
    tDecoderClear(&reader.coder);
    if (code != TSDB_CODE_SUCCESS) {
      qError("failed to get table meta, uid:0x%" PRIx64 ", code:%s, %s", pKey->uid, tstrerror(terrno),
             GET_TASKID(pTaskInfo));
      metaReaderClear(&reader);
      T_LONG_JMP(pTaskInfo->env, terrno);
    }

    for (int32_t j = 0; j < pOperator->exprSupp.numOfExprs; ++j) {
      SExprInfo*       pExpr = &pExprs[j];
      SColumnInfoData* pDst = taosArrayGet(pRes->pDataBlock, pExpr->base.resSchema.slotId);

      // refactor later
      if (fmIsScanPseudoColumnFunc(pExpr->pExpr->_function.functionId)) {
        STR_TO_VARSTR(nameBuf, reader.me.name);
        colDataAppend(pDst, numOfRows, nameBuf, false);
        continue;
      }

      // otherwise the expression refers to a tag value
      STagVal val = {0};
      val.cid = pExpr->base.pParam[0].pCol->colId;
      const char* p = metaGetTableTagVal(reader.me.ctbEntry.pTags, pDst->info.type, &val);

      // non-JSON tags are converted into column data; JSON tags are appended as returned
      char* data = NULL;
      if (p != NULL && pDst->info.type != TSDB_DATA_TYPE_JSON) {
        data = tTagValToData((const STagVal*)p, false);
      } else {
        data = (char*)p;
      }

      bool isNull = (data == NULL) || (pDst->info.type == TSDB_DATA_TYPE_JSON && tTagIsJsonNull(data));
      colDataAppend(pDst, numOfRows, data, isNull);

      // the converted buffer is released only for variable-length tag values
      if (pDst->info.type != TSDB_DATA_TYPE_JSON && p != NULL && IS_VAR_DATA_TYPE(((const STagVal*)p)->type) &&
          data != NULL) {
        taosMemoryFree(data);
      }
    }

    numOfRows += 1;
    pInfo->curPos += 1;
    if (pInfo->curPos >= numOfTables) {
      doSetOperatorCompleted(pOperator);
    }
  }

  metaReaderClear(&reader);

  if (pOperator->status == OP_EXEC_DONE) {
    setTaskStatus(pTaskInfo, TASK_COMPLETED);
  }

  pRes->info.rows = numOfRows;
  pOperator->resultInfo.totalRows += numOfRows;

  if (pRes->info.rows == 0) {
    return NULL;
  }
  return pInfo->pRes;
}

3235
static void destroyTagScanOperatorInfo(void* param) {
H
Haojun Liao 已提交
3236 3237
  STagScanInfo* pInfo = (STagScanInfo*)param;
  pInfo->pRes = blockDataDestroy(pInfo->pRes);
S
shenglian zhou 已提交
3238
  taosArrayDestroy(pInfo->pColMatchInfo);
D
dapan1121 已提交
3239
  taosMemoryFreeClear(param);
H
Haojun Liao 已提交
3240 3241
}

3242 3243
// Creates the tag scan operator for the given physical plan node.
// pTableListInfo is stored by pointer; *pReadHandle is copied into the operator info.
// Returns the operator, or NULL with terrno set when an allocation or the expression setup
// fails.
SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysiNode* pPhyNode,
                                         STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo) {
  // declared ahead of the first goto so that the error path can always release it
  SArray* colList = NULL;

  STagScanInfo*  pInfo = taosMemoryCalloc(1, sizeof(STagScanInfo));
  SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
  if (pInfo == NULL || pOperator == NULL) {
    goto _error;
  }

  SDataBlockDescNode* pDescNode = pPhyNode->node.pOutputDataBlockDesc;

  int32_t    num = 0;
  int32_t    numOfExprs = 0;
  SExprInfo* pExprInfo = createExprInfo(pPhyNode->pScanPseudoCols, NULL, &numOfExprs);
  colList = extractColMatchInfo(pPhyNode->pScanPseudoCols, pDescNode, &num, COL_MATCH_FROM_COL_ID);

  int32_t code = initExprSupp(&pOperator->exprSupp, pExprInfo, numOfExprs);
  if (code != TSDB_CODE_SUCCESS) {
    // NOTE(review): who owns pExprInfo after a failed initExprSupp is not visible here -- confirm
    // whether it needs an explicit release on this path as well.
    goto _error;
  }

  pInfo->pTableList = pTableListInfo;
  pInfo->pColMatchInfo = colList;  // from here on the list is released together with the info
  pInfo->pRes = createResDataBlock(pDescNode);
  pInfo->readHandle = *pReadHandle;
  pInfo->curPos = 0;

  pOperator->name = "TagScanOperator";
  pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN;

  pOperator->blocking = false;
  pOperator->status = OP_NOT_OPENED;
  pOperator->info = pInfo;
  pOperator->pTaskInfo = pTaskInfo;

  initResultSizeInfo(&pOperator->resultInfo, 4096);
  blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity);

  pOperator->fpSet =
      createOperatorFpSet(operatorDummyOpenFn, doTagScan, NULL, NULL, destroyTagScanOperatorInfo, NULL, NULL, NULL);

  return pOperator;

_error:
  // the column match list was previously leaked when initExprSupp failed
  taosArrayDestroy(colList);
  taosMemoryFree(pInfo);
  taosMemoryFree(pOperator);
  terrno = TSDB_CODE_OUT_OF_MEMORY;
  return NULL;
}
3290

H
Haojun Liao 已提交
3291
int32_t createScanTableListInfo(SScanPhysiNode* pScanNode, SNodeList* pGroupTags, bool groupSort, SReadHandle* pHandle,
3292 3293
                                STableListInfo* pTableListInfo, SNode* pTagCond, SNode* pTagIndexCond,
                                const char* idStr) {
H
Haojun Liao 已提交
3294 3295
  int64_t st = taosGetTimestampUs();

3296
  int32_t code = getTableList(pHandle->meta, pHandle->vnode, pScanNode, pTagCond, pTagIndexCond, pTableListInfo);
3297
  if (code != TSDB_CODE_SUCCESS) {
3298
    qError("failed to getTableList, code: %s", tstrerror(code));
3299
    return code;
3300 3301
  }

H
Haojun Liao 已提交
3302
  int64_t st1 = taosGetTimestampUs();
L
Liu Jicong 已提交
3303
  qDebug("generate queried table list completed, elapsed time:%.2f ms %s", (st1 - st) / 1000.0, idStr);
H
Haojun Liao 已提交
3304

3305
  if (taosArrayGetSize(pTableListInfo->pTableList) == 0) {
3306
    qDebug("no table qualified for query, %s" PRIx64, idStr);
3307 3308
    return TSDB_CODE_SUCCESS;
  }
3309

H
Haojun Liao 已提交
3310 3311
  pTableListInfo->needSortTableByGroupId = groupSort;
  code = generateGroupIdMap(pTableListInfo, pHandle, pGroupTags);
3312
  if (code != TSDB_CODE_SUCCESS) {
wmmhello's avatar
wmmhello 已提交
3313
    return code;
3314 3315
  }

H
Haojun Liao 已提交
3316
  int64_t st2 = taosGetTimestampUs();
L
Liu Jicong 已提交
3317
  qDebug("generate group id map completed, elapsed time:%.2f ms %s", (st2 - st1) / 1000.0, idStr);
H
Haojun Liao 已提交
3318

3319 3320 3321
  return TSDB_CODE_SUCCESS;
}

S
slzhou 已提交
3322
// Opens one data reader per table in [tableStartIdx, tableEndIdx] of pTableListInfo and appends
// the reader pointers to arrayReader in table order.
// Returns TSDB_CODE_SUCCESS when every reader was opened. On the first failure the error code
// is returned at once; readers opened before that stay in arrayReader and remain the caller's
// responsibility, and no NULL reader is ever stored.
int32_t createMultipleDataReaders(SQueryTableDataCond* pQueryCond, SReadHandle* pHandle, STableListInfo* pTableListInfo,
                                  int32_t tableStartIdx, int32_t tableEndIdx, SArray* arrayReader, const char* idstr) {
  for (int32_t i = tableStartIdx; i <= tableEndIdx; ++i) {
    // each reader is opened on a temporary list holding exactly one table
    SArray* subTableList = taosArrayInit(1, sizeof(STableKeyInfo));
    if (subTableList == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }
    taosArrayPush(subTableList, taosArrayGet(pTableListInfo->pTableList, i));

    STsdbReader* pReader = NULL;
    int32_t      code = tsdbReaderOpen(pHandle->vnode, pQueryCond, subTableList, &pReader, idstr);
    taosArrayDestroy(subTableList);

    if (code != TSDB_CODE_SUCCESS) {
      qError("failed to open data reader, table index:%d, code: %s %s", i, tstrerror(code), idstr);
      return code;
    }

    taosArrayPush(arrayReader, &pReader);
  }

  return TSDB_CODE_SUCCESS;
}

H
Haojun Liao 已提交
3338
// todo refactor
3339 3340
// Loads the content required for the current block of the reader at readerIdx into pBlock.
//
// *status starts from the operator's dataBlockLoadFlag and is forced to
// FUNC_DATA_REQUIRED_DATA_LOAD when a filter is present or the block overlaps the time window:
//   - FILTEROUT:   nothing is loaded, the block is counted as filtered out;
//   - NOT_LOAD:    nothing is loaded, the column data pointers of pBlock are reset;
//   - STATIS_LOAD: only the block SMA is attached; falls back to a full load when not every
//                  column has SMA data;
//   - DATA_LOAD:   column data is retrieved, pseudo columns are added and the filter is applied.
//
// Returns TSDB_CODE_SUCCESS or an error code. A failure while adding the pseudo columns aborts
// the task through T_LONG_JMP instead of returning.
static int32_t loadDataBlockFromOneTable(SOperatorInfo* pOperator, STableMergeScanInfo* pTableScanInfo,
                                         int32_t readerIdx, SSDataBlock* pBlock, uint32_t* status) {
  SExecTaskInfo*       pTaskInfo = pOperator->pTaskInfo;
  STableMergeScanInfo* pInfo = pOperator->info;

  SFileBlockLoadRecorder* pCost = &pTableScanInfo->readRecorder;

  pCost->totalBlocks += 1;
  pCost->totalRows += pBlock->info.rows;

  *status = pInfo->dataBlockLoadFlag;
  if (pTableScanInfo->pFilterNode != NULL ||
      overlapWithTimeWindow(&pTableScanInfo->interval, &pBlock->info, pTableScanInfo->cond.order)) {
    (*status) = FUNC_DATA_REQUIRED_DATA_LOAD;
  }

  SDataBlockInfo* pBlockInfo = &pBlock->info;
  taosMemoryFreeClear(pBlock->pBlockAgg);

  if (*status == FUNC_DATA_REQUIRED_FILTEROUT) {
    qDebug("%s data block filter out, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo),
           pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows);
    pCost->filterOutBlocks += 1;
    return TSDB_CODE_SUCCESS;
  } else if (*status == FUNC_DATA_REQUIRED_NOT_LOAD) {
    qDebug("%s data block skipped, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo),
           pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows);
    pCost->skipBlocks += 1;

    // clear all data in pBlock that are set when handing the previous block
    for (int32_t i = 0; i < taosArrayGetSize(pBlock->pDataBlock); ++i) {
      SColumnInfoData* pcol = taosArrayGet(pBlock->pDataBlock, i);
      pcol->pData = NULL;
    }

    return TSDB_CODE_SUCCESS;
  } else if (*status == FUNC_DATA_REQUIRED_STATIS_LOAD) {
    pCost->loadBlockStatis += 1;

    bool             allColumnsHaveAgg = true;
    SColumnDataAgg** pColAgg = NULL;
    STsdbReader*     reader = taosArrayGetP(pTableScanInfo->dataReaders, readerIdx);
    tsdbRetrieveDatablockSMA(reader, &pColAgg, &allColumnsHaveAgg);

    if (allColumnsHaveAgg == true) {
      int32_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);

      // todo create this buffer during creating operator
      if (pBlock->pBlockAgg == NULL) {
        pBlock->pBlockAgg = taosMemoryCalloc(numOfCols, POINTER_BYTES);
        if (pBlock->pBlockAgg == NULL) {
          // the slots below would otherwise be written through a NULL pointer
          return TSDB_CODE_OUT_OF_MEMORY;
        }
      }

      // only output columns get their SMA entry, stored at the target slot of the result block
      for (int32_t i = 0; i < numOfCols; ++i) {
        SColMatchInfo* pColMatchInfo = taosArrayGet(pTableScanInfo->pColMatchInfo, i);
        if (!pColMatchInfo->output) {
          continue;
        }
        pBlock->pBlockAgg[pColMatchInfo->targetSlotId] = pColAgg[i];
      }

      return TSDB_CODE_SUCCESS;
    } else {  // failed to load the block sma data, data block statistics does not exist, load data block instead
      *status = FUNC_DATA_REQUIRED_DATA_LOAD;
    }
  }

  ASSERT(*status == FUNC_DATA_REQUIRED_DATA_LOAD);

  // todo filter data block according to the block sma data firstly
#if 0
  if (!doFilterByBlockSMA(pBlock->pBlockStatis, pTableScanInfo->pCtx, pBlockInfo->rows)) {
    pCost->filterOutBlocks += 1;
    qDebug("%s data block filter out, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo), pBlockInfo->window.skey,
           pBlockInfo->window.ekey, pBlockInfo->rows);
    (*status) = FUNC_DATA_REQUIRED_FILTEROUT;
    return TSDB_CODE_SUCCESS;
  }
#endif

  pCost->totalCheckedRows += pBlock->info.rows;
  pCost->loadBlocks += 1;

  STsdbReader* reader = taosArrayGetP(pTableScanInfo->dataReaders, readerIdx);
  SArray*      pCols = tsdbRetrieveDataBlock(reader, NULL);
  if (pCols == NULL) {
    return terrno;
  }

  relocateColumnData(pBlock, pTableScanInfo->pColMatchInfo, pCols, true);

  // currently only the tbname pseudo column
  if (pTableScanInfo->pseudoSup.numOfExprs > 0) {
    int32_t code = addTagPseudoColumnData(&pTableScanInfo->readHandle, pTableScanInfo->pseudoSup.pExprInfo,
                                          pTableScanInfo->pseudoSup.numOfExprs, pBlock, GET_TASKID(pTaskInfo));
    if (code != TSDB_CODE_SUCCESS) {
      T_LONG_JMP(pTaskInfo->env, code);
    }
  }

  if (pTableScanInfo->pFilterNode != NULL) {
    // start and end must use the same clock unit; the start used to be taken in milliseconds
    int64_t st = taosGetTimestampUs();
    doFilter(pTableScanInfo->pFilterNode, pBlock, pTableScanInfo->pColMatchInfo, NULL);

    double el = (taosGetTimestampUs() - st) / 1000.0;  // microseconds -> milliseconds
    pTableScanInfo->readRecorder.filterTime += el;

    if (pBlock->info.rows == 0) {
      pCost->filterOutBlocks += 1;
      qDebug("%s data block filter out, brange:%" PRId64 "-%" PRId64 ", rows:%d, elapsed time:%.2f ms",
             GET_TASKID(pTaskInfo), pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows, el);
    } else {
      qDebug("%s data block filter applied, elapsed time:%.2f ms", GET_TASKID(pTaskInfo), el);
    }
  }

  return TSDB_CODE_SUCCESS;
}

typedef struct STableMergeScanSortSourceParam {
  SOperatorInfo* pOperator;
  int32_t        readerIdx;
3460
  SSDataBlock*   inputBlock;
3461 3462 3463 3464 3465 3466
} STableMergeScanSortSourceParam;

static SSDataBlock* getTableDataBlock(void* param) {
  STableMergeScanSortSourceParam* source = param;
  SOperatorInfo*                  pOperator = source->pOperator;
  int32_t                         readerIdx = source->readerIdx;
3467
  SSDataBlock*                    pBlock = source->inputBlock;
3468 3469 3470 3471
  STableMergeScanInfo*            pTableScanInfo = pOperator->info;

  int64_t st = taosGetTimestampUs();

3472 3473
  blockDataCleanup(pBlock);

H
Hongze Cheng 已提交
3474
  STsdbReader* reader = taosArrayGetP(pTableScanInfo->dataReaders, readerIdx);
3475 3476
  while (tsdbNextDataBlock(reader)) {
    if (isTaskKilled(pOperator->pTaskInfo)) {
3477
      T_LONG_JMP(pOperator->pTaskInfo->env, TSDB_CODE_TSC_QUERY_CANCELLED);
3478 3479 3480 3481 3482 3483 3484 3485
    }

    // process this data block based on the probabilities
    bool processThisBlock = processBlockWithProbability(&pTableScanInfo->sample);
    if (!processThisBlock) {
      continue;
    }

3486 3487 3488 3489
    blockDataCleanup(pBlock);
    SDataBlockInfo binfo = pBlock->info;
    tsdbRetrieveDataBlockInfo(reader, &binfo);

3490
    blockDataEnsureCapacity(pBlock, binfo.rows);
3491 3492 3493 3494
    pBlock->info.type = binfo.type;
    pBlock->info.uid = binfo.uid;
    pBlock->info.window = binfo.window;
    pBlock->info.rows = binfo.rows;
3495 3496 3497 3498 3499

    uint32_t status = 0;
    int32_t  code = loadDataBlockFromOneTable(pOperator, pTableScanInfo, readerIdx, pBlock, &status);
    //    int32_t  code = loadDataBlockOnDemand(pOperator->pRuntimeEnv, pTableScanInfo, pBlock, &status);
    if (code != TSDB_CODE_SUCCESS) {
3500
      T_LONG_JMP(pOperator->pTaskInfo->env, code);
3501 3502 3503 3504 3505 3506 3507 3508 3509 3510 3511 3512 3513 3514 3515 3516 3517 3518 3519 3520
    }

    // current block is filter out according to filter condition, continue load the next block
    if (status == FUNC_DATA_REQUIRED_FILTEROUT || pBlock->info.rows == 0) {
      continue;
    }

    uint64_t* groupId = taosHashGet(pOperator->pTaskInfo->tableqinfoList.map, &pBlock->info.uid, sizeof(int64_t));
    if (groupId) {
      pBlock->info.groupId = *groupId;
    }

    pOperator->resultInfo.totalRows = pTableScanInfo->readRecorder.totalRows;
    pTableScanInfo->readRecorder.elapsedTime += (taosGetTimestampUs() - st) / 1000.0;

    return pBlock;
  }
  return NULL;
}

3521 3522 3523 3524 3525 3526 3527 3528 3529
// Builds a one-element sort specification that orders by the primary timestamp column in the
// given order, with nullFirst set to NULL_ORDER_FIRST. The slot is taken from the last entry of
// colMatchInfo whose column id is the primary timestamp id; it stays 0 when no entry matches.
SArray* generateSortByTsInfo(SArray* colMatchInfo, int32_t order) {
  int32_t tsSlot = 0;
  int32_t numOfCols = taosArrayGetSize(colMatchInfo);

  for (int32_t idx = 0; idx < numOfCols; ++idx) {
    SColMatchInfo* pMatch = taosArrayGet(colMatchInfo, idx);
    if (pMatch->colId != PRIMARYKEY_TIMESTAMP_COL_ID) {
      continue;
    }
    tsSlot = pMatch->targetSlotId;
  }

  SArray* pOrderList = taosArrayInit(1, sizeof(SBlockOrderInfo));

  SBlockOrderInfo orderInfo = {0};
  orderInfo.nullFirst = NULL_ORDER_FIRST;
  orderInfo.slotId = tsSlot;
  orderInfo.order = order;
  taosArrayPush(pOrderList, &orderInfo);

  return pOrderList;
}

3541
// Starts the merge scan of the table group beginning at pInfo->tableStartIndex.
// The end of the group is the last consecutive table that shares pInfo->groupId. One data reader
// and one sort source are created per table of the group, then the multi-source merge sort is
// opened.
// Returns TSDB_CODE_SUCCESS; a failure to open the readers, to allocate a sort source or to open
// the sort aborts the task through T_LONG_JMP.
int32_t startGroupTableMergeScan(SOperatorInfo* pOperator) {
  STableMergeScanInfo* pInfo = pOperator->info;
  SExecTaskInfo*       pTaskInfo = pOperator->pTaskInfo;

  {
    // extend the group until the first table that carries a different group id
    size_t  tableListSize = taosArrayGetSize(pInfo->tableListInfo->pTableList);
    int32_t i = pInfo->tableStartIndex + 1;
    for (; i < tableListSize; ++i) {
      STableKeyInfo* tableKeyInfo = taosArrayGet(pInfo->tableListInfo->pTableList, i);
      if (tableKeyInfo->groupId != pInfo->groupId) {
        break;
      }
    }
    pInfo->tableEndIndex = i - 1;
  }

  int32_t tableStartIdx = pInfo->tableStartIndex;
  int32_t tableEndIdx = pInfo->tableEndIndex;

  STableListInfo* tableListInfo = pInfo->tableListInfo;
  pInfo->dataReaders = taosArrayInit(64, POINTER_BYTES);
  int32_t code = createMultipleDataReaders(&pInfo->cond, &pInfo->readHandle, tableListInfo, tableStartIdx, tableEndIdx,
                                           pInfo->dataReaders, GET_TASKID(pTaskInfo));
  if (code != TSDB_CODE_SUCCESS) {
    // do not continue with an incomplete set of readers
    T_LONG_JMP(pTaskInfo->env, code);
  }

  // todo the total available buffer should be determined by total capacity of buffer of this task.
  // the additional one is reserved for merge result
  pInfo->sortBufSize = pInfo->bufPageSize * (tableEndIdx - tableStartIdx + 1 + 1);
  int32_t numOfBufPage = pInfo->sortBufSize / pInfo->bufPageSize;
  pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, SORT_MULTISOURCE_MERGE, pInfo->bufPageSize, numOfBufPage,
                                             pInfo->pSortInputBlock, pTaskInfo->id.str);

  tsortSetFetchRawDataFp(pInfo->pSortHandle, getTableDataBlock, NULL, NULL);

  size_t numReaders = taosArrayGetSize(pInfo->dataReaders);

  // first pass: store every source parameter in the array ...
  for (int32_t i = 0; i < numReaders; ++i) {
    STableMergeScanSortSourceParam param = {0};
    param.readerIdx = i;
    param.pOperator = pOperator;
    param.inputBlock = createOneDataBlock(pInfo->pResBlock, false);
    taosArrayPush(pInfo->sortSourceParams, &param);
  }

  // ... second pass: hand out element addresses only after the array has stopped growing
  for (int32_t i = 0; i < numReaders; ++i) {
    SSortSource* ps = taosMemoryCalloc(1, sizeof(SSortSource));
    if (ps == NULL) {
      T_LONG_JMP(pTaskInfo->env, TSDB_CODE_OUT_OF_MEMORY);
    }

    STableMergeScanSortSourceParam* param = taosArrayGet(pInfo->sortSourceParams, i);
    ps->param = param;
    tsortAddSource(pInfo->pSortHandle, ps);
  }

  code = tsortOpen(pInfo->pSortHandle);

  if (code != TSDB_CODE_SUCCESS) {
    T_LONG_JMP(pTaskInfo->env, terrno);
  }

  return TSDB_CODE_SUCCESS;
}

// Finishes the merge scan of the current table group: the sort statistics are folded into
// pInfo->sortExecInfo, then the per-source input blocks, the sort handle and all data readers of
// the group are released. Always returns TSDB_CODE_SUCCESS.
int32_t stopGroupTableMergeScan(SOperatorInfo* pOperator) {
  STableMergeScanInfo* pInfo = pOperator->info;

  size_t readerCount = taosArrayGetSize(pInfo->dataReaders);

  // method and buffer reflect the latest sort; the counters accumulate over all groups
  SSortExecInfo groupSortInfo = tsortGetSortExecInfo(pInfo->pSortHandle);
  pInfo->sortExecInfo.sortMethod = groupSortInfo.sortMethod;
  pInfo->sortExecInfo.sortBuffer = groupSortInfo.sortBuffer;
  pInfo->sortExecInfo.loops += groupSortInfo.loops;
  pInfo->sortExecInfo.readBytes += groupSortInfo.readBytes;
  pInfo->sortExecInfo.writeBytes += groupSortInfo.writeBytes;

  for (int32_t idx = 0; idx < readerCount; ++idx) {
    STableMergeScanSortSourceParam* pSrc = taosArrayGet(pInfo->sortSourceParams, idx);
    blockDataDestroy(pSrc->inputBlock);
  }
  taosArrayClear(pInfo->sortSourceParams);

  tsortDestroySortHandle(pInfo->pSortHandle);

  for (int32_t idx = 0; idx < readerCount; ++idx) {
    STsdbReader* pReader = taosArrayGetP(pInfo->dataReaders, idx);
    tsdbReaderClose(pReader);
  }
  taosArrayDestroy(pInfo->dataReaders);
  pInfo->dataReaders = NULL;

  return TSDB_CODE_SUCCESS;
}

L
Liu Jicong 已提交
3629 3630
// Drains sorted tuples from pHandle into pResBlock until either the sort is exhausted or the
// block holds `capacity` rows. Returns pResBlock when it contains rows, NULL otherwise.
SSDataBlock* getSortedTableMergeScanBlockData(SSortHandle* pHandle, SSDataBlock* pResBlock, int32_t capacity,
                                              SOperatorInfo* pOperator) {
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;

  blockDataCleanup(pResBlock);
  blockDataEnsureCapacity(pResBlock, capacity);

  STupleHandle* pTuple = NULL;
  while ((pTuple = tsortNextTuple(pHandle)) != NULL) {
    appendOneRowToDataBlock(pResBlock, pTuple);
    if (pResBlock->info.rows >= capacity) {
      break;
    }
  }

  qDebug("%s get sorted row blocks, rows:%d", GET_TASKID(pTaskInfo), pResBlock->info.rows);

  if (pResBlock->info.rows > 0) {
    return pResBlock;
  }
  return NULL;
}

// Produces the next sorted block of the table merge scan. Tables are processed group by group:
// once the merge sort of the current group is drained, the group is stopped and the scan of the
// next group is started. The operator is marked as completed after the last group.
// Returns a block tagged with the current group id, or NULL when the scan is finished.
SSDataBlock* doTableMergeScan(SOperatorInfo* pOperator) {
  if (pOperator->status == OP_EXEC_DONE) {
    return NULL;
  }

  SExecTaskInfo*       pTaskInfo = pOperator->pTaskInfo;
  STableMergeScanInfo* pInfo = pOperator->info;

  int32_t code = pOperator->fpSet._openFn(pOperator);
  if (code != TSDB_CODE_SUCCESS) {
    T_LONG_JMP(pTaskInfo->env, code);
  }

  size_t numOfTables = taosArrayGetSize(pInfo->tableListInfo->pTableList);

  // first call: start with the group of the very first table
  if (!pInfo->hasGroupId) {
    pInfo->hasGroupId = true;

    if (numOfTables == 0) {
      doSetOperatorCompleted(pOperator);
      return NULL;
    }

    pInfo->tableStartIndex = 0;
    pInfo->groupId = ((STableKeyInfo*)taosArrayGet(pInfo->tableListInfo->pTableList, pInfo->tableStartIndex))->groupId;
    startGroupTableMergeScan(pOperator);
  }

  while (pInfo->tableStartIndex < numOfTables) {
    SSDataBlock* pSorted = getSortedTableMergeScanBlockData(pInfo->pSortHandle, pInfo->pResBlock,
                                                            pOperator->resultInfo.capacity, pOperator);
    if (pSorted != NULL) {
      pSorted->info.groupId = pInfo->groupId;
      pOperator->resultInfo.totalRows += pSorted->info.rows;
      return pSorted;
    }

    // the current group is drained
    stopGroupTableMergeScan(pOperator);
    if (pInfo->tableEndIndex >= numOfTables - 1) {
      doSetOperatorCompleted(pOperator);
      break;
    }

    // move on to the group that starts right behind the one just finished
    pInfo->tableStartIndex = pInfo->tableEndIndex + 1;
    pInfo->groupId = ((STableKeyInfo*)taosArrayGet(pInfo->tableListInfo->pTableList, pInfo->tableStartIndex))->groupId;
    startGroupTableMergeScan(pOperator);
  }

  return NULL;
}

3701
void destroyTableMergeScanOperatorInfo(void* param) {
3702
  STableMergeScanInfo* pTableScanInfo = (STableMergeScanInfo*)param;
3703
  cleanupQueryTableDataCond(&pTableScanInfo->cond);
3704
  taosArrayDestroy(pTableScanInfo->sortSourceParams);
3705 3706

  for (int32_t i = 0; i < taosArrayGetSize(pTableScanInfo->dataReaders); ++i) {
H
Hongze Cheng 已提交
3707
    STsdbReader* reader = taosArrayGetP(pTableScanInfo->dataReaders, i);
H
refact  
Hongze Cheng 已提交
3708
    tsdbReaderClose(reader);
3709 3710 3711 3712 3713 3714 3715 3716 3717 3718 3719
  }
  taosArrayDestroy(pTableScanInfo->dataReaders);

  if (pTableScanInfo->pColMatchInfo != NULL) {
    taosArrayDestroy(pTableScanInfo->pColMatchInfo);
  }

  pTableScanInfo->pResBlock = blockDataDestroy(pTableScanInfo->pResBlock);
  pTableScanInfo->pSortInputBlock = blockDataDestroy(pTableScanInfo->pSortInputBlock);

  taosArrayDestroy(pTableScanInfo->pSortInfo);
3720
  cleanupExprSupp(&pTableScanInfo->pseudoSup);
L
Liu Jicong 已提交
3721

3722
  taosMemoryFreeClear(pTableScanInfo->rowEntryInfoOffset);
D
dapan1121 已提交
3723
  taosMemoryFreeClear(param);
3724 3725
}

3726 3727
typedef struct STableMergeScanExecInfo {
  SFileBlockLoadRecorder blockRecorder;
L
Liu Jicong 已提交
3728
  SSortExecInfo          sortExecInfo;
3729 3730
} STableMergeScanExecInfo;

3731 3732
/*
 * Build the explain information of a table-merge-scan operator.
 *
 * pOptr:        the operator; must not be NULL. Its info is read as a STableMergeScanInfo.
 * pOptrExplain: out; receives a newly allocated STableMergeScanExecInfo holding copies
 *               of the operator's read recorder and sort statistics, or NULL on failure.
 * len:          out; receives sizeof(STableMergeScanExecInfo), or 0 on failure.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY when the allocation fails.
 * NOTE(review): the buffer is not freed here; presumably the caller releases it -- confirm.
 */
int32_t getTableMergeScanExplainExecInfo(SOperatorInfo* pOptr, void** pOptrExplain, uint32_t* len) {
  ASSERT(pOptr != NULL);

  // TODO: merge these two info into one struct
  STableMergeScanExecInfo* execInfo = taosMemoryCalloc(1, sizeof(STableMergeScanExecInfo));
  if (execInfo == NULL) {
    // Do not dereference a failed allocation; leave the outputs in a well-defined state.
    *pOptrExplain = NULL;
    *len = 0;
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  STableMergeScanInfo* pInfo = pOptr->info;
  execInfo->blockRecorder = pInfo->readRecorder;
  execInfo->sortExecInfo = pInfo->sortExecInfo;

  *pOptrExplain = execInfo;
  *len = sizeof(STableMergeScanExecInfo);

  return TSDB_CODE_SUCCESS;
}

S
slzhou 已提交
3745 3746 3747
int32_t compareTableKeyInfoByGid(const void* p1, const void* p2) {
  const STableKeyInfo* info1 = p1;
  const STableKeyInfo* info2 = p2;
3748 3749 3750 3751 3752 3753 3754
  if (info1->groupId < info2->groupId) {
    return -1;
  } else if (info1->groupId > info2->groupId) {
    return 1;
  } else {
    return 0;
  }
S
slzhou 已提交
3755 3756
}

3757
SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanNode, STableListInfo* pTableListInfo,
3758
                                                SReadHandle* readHandle, SExecTaskInfo* pTaskInfo) {
3759 3760 3761 3762 3763
  STableMergeScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STableMergeScanInfo));
  SOperatorInfo*       pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
  if (pInfo == NULL || pOperator == NULL) {
    goto _error;
  }
3764
  if (pTableScanNode->pGroupTags) {
S
slzhou 已提交
3765 3766
    taosArraySort(pTableListInfo->pTableList, compareTableKeyInfoByGid);
  }
3767 3768 3769 3770

  SDataBlockDescNode* pDescNode = pTableScanNode->scan.node.pOutputDataBlockDesc;

  int32_t numOfCols = 0;
L
Liu Jicong 已提交
3771
  SArray* pColList = extractColMatchInfo(pTableScanNode->scan.pScanCols, pDescNode, &numOfCols, COL_MATCH_FROM_COL_ID);
3772 3773 3774 3775 3776 3777 3778

  int32_t code = initQueryTableDataCond(&pInfo->cond, pTableScanNode);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  if (pTableScanNode->scan.pScanPseudoCols != NULL) {
3779 3780 3781
    SExprSupp* pSup = &pInfo->pseudoSup;
    pSup->pExprInfo = createExprInfo(pTableScanNode->scan.pScanPseudoCols, NULL, &pSup->numOfExprs);
    pSup->pCtx = createSqlFunctionCtx(pSup->pExprInfo, pSup->numOfExprs, &pSup->rowEntryInfoOffset);
3782 3783 3784 3785
  }

  pInfo->scanInfo = (SScanInfo){.numOfAsc = pTableScanNode->scanSeq[0], .numOfDesc = pTableScanNode->scanSeq[1]};

L
Liu Jicong 已提交
3786 3787
  pInfo->readHandle = *readHandle;
  pInfo->interval = extractIntervalInfo(pTableScanNode);
3788
  pInfo->sample.sampleRatio = pTableScanNode->ratio;
L
Liu Jicong 已提交
3789 3790 3791
  pInfo->sample.seed = taosGetTimestampSec();
  pInfo->dataBlockLoadFlag = pTableScanNode->dataRequired;
  pInfo->pFilterNode = pTableScanNode->scan.node.pConditions;
3792
  pInfo->tableListInfo = pTableListInfo;
L
Liu Jicong 已提交
3793 3794
  pInfo->scanFlag = MAIN_SCAN;
  pInfo->pColMatchInfo = pColList;
3795 3796

  pInfo->pResBlock = createResDataBlock(pDescNode);
3797
  pInfo->sortSourceParams = taosArrayInit(64, sizeof(STableMergeScanSortSourceParam));
3798

3799
  pInfo->pSortInfo = generateSortByTsInfo(pInfo->pColMatchInfo, pInfo->cond.order);
3800
  pInfo->pSortInputBlock = createOneDataBlock(pInfo->pResBlock, false);
3801

3802
  int32_t rowSize = pInfo->pResBlock->info.rowSize;
L
Liu Jicong 已提交
3803
  pInfo->bufPageSize = getProperSortPageSize(rowSize);
3804

L
Liu Jicong 已提交
3805
  pOperator->name = "TableMergeScanOperator";
3806
  pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN;
L
Liu Jicong 已提交
3807 3808 3809
  pOperator->blocking = false;
  pOperator->status = OP_NOT_OPENED;
  pOperator->info = pInfo;
L
Liu Jicong 已提交
3810
  pOperator->exprSupp.numOfExprs = numOfCols;
L
Liu Jicong 已提交
3811
  pOperator->pTaskInfo = pTaskInfo;
3812
  initResultSizeInfo(&pOperator->resultInfo, 1024);
3813 3814

  pOperator->fpSet =
3815 3816
      createOperatorFpSet(operatorDummyOpenFn, doTableMergeScan, NULL, NULL, destroyTableMergeScanOperatorInfo, NULL,
                          NULL, getTableMergeScanExplainExecInfo);
3817 3818 3819 3820 3821 3822 3823 3824 3825
  pOperator->cost.openCost = 0;
  return pOperator;

_error:
  pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY;
  taosMemoryFree(pInfo);
  taosMemoryFree(pOperator);
  return NULL;
}