scanoperator.c 88.3 KB
Newer Older
H
Haojun Liao 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

16
#include <vnode.h>
17
#include "filter.h"
18
#include "function.h"
19
#include "functionMgt.h"
H
Haojun Liao 已提交
20 21
#include "os.h"
#include "querynodes.h"
22
#include "systable.h"
23
#include "tglobal.h"
H
Haojun Liao 已提交
24
#include "tname.h"
25
#include "ttime.h"
H
Haojun Liao 已提交
26 27 28 29 30 31 32 33 34

#include "tdatablock.h"
#include "tmsg.h"

#include "executorimpl.h"
#include "query.h"
#include "tcompare.h"
#include "thash.h"
#include "ttypes.h"
35
#include "vnode.h"
H
Haojun Liao 已提交
36

wmmhello's avatar
wmmhello 已提交
37 38
#include "executorInt.h"

H
Haojun Liao 已提交
39
#define SET_REVERSE_SCAN_FLAG(_info) ((_info)->scanFlag = REVERSE_SCAN)
40
#define SWITCH_ORDER(n)              (((n) = ((n) == TSDB_ORDER_ASC) ? TSDB_ORDER_DESC : TSDB_ORDER_ASC))
41

42
static int32_t buildSysDbTableInfo(const SSysTableScanInfo* pInfo, int32_t capacity);
L
Liu Jicong 已提交
43 44
static int32_t buildDbTableInfoBlock(const SSDataBlock* p, const SSysTableMeta* pSysDbTableMeta, size_t size,
                                     const char* dbName);
45

46 47 48
static void addTagPseudoColumnData(SReadHandle* pHandle, SExprInfo* pPseudoExpr, int32_t numOfPseudoExpr,
                                   SSDataBlock* pBlock);
static bool processBlockWithProbability(const SSampleExecInfo* pInfo);
49

50
/*
 * Decide whether the current data block takes part in the scan, based on the
 * sampling settings carried by pInfo.
 *
 * The ratio based sampling branch is compiled out for the time being, hence every
 * block is accepted and true is always returned.
 */
bool processBlockWithProbability(const SSampleExecInfo* pInfo) {
#if 1
  return true;
#else
  if (pInfo->sampleRatio == 1) {
    return true;
  }

  uint32_t val = taosRandR((uint32_t*) &pInfo->seed);
  return (val % ((uint32_t)(1/pInfo->sampleRatio))) == 0;
#endif
}

63
// Flip the scan order (ascending <-> descending) stored in each of the numOfOutput function contexts.
static void switchCtxOrder(SqlFunctionCtx* pCtx, int32_t numOfOutput) {
  int32_t idx = 0;
  while (idx < numOfOutput) {
    SWITCH_ORDER(pCtx[idx].order);
    idx += 1;
  }
}

// Placeholder for adjusting the per-table query range before a reverse scan.
// The whole legacy implementation below is disabled with "#if 0", so this function currently does nothing.
static void setupQueryRangeForReverseScan(STableScanInfo* pTableScanInfo) {
#if 0
  int32_t numOfGroups = (int32_t)(GET_NUM_OF_TABLEGROUP(pRuntimeEnv));
  for(int32_t i = 0; i < numOfGroups; ++i) {
    SArray *group = GET_TABLEGROUP(pRuntimeEnv, i);
    SArray *tableKeyGroup = taosArrayGetP(pQueryAttr->tableGroupInfo.pGroupList, i);

    size_t t = taosArrayGetSize(group);
    for (int32_t j = 0; j < t; ++j) {
      STableQueryInfo *pCheckInfo = taosArrayGetP(group, j);
      updateTableQueryInfoForReverseScan(pCheckInfo);

      // update the last key in tableKeyInfo list, the tableKeyInfo is used to build the tsdbQueryHandle and decide
      // the start check timestamp of tsdbQueryHandle
//      STableKeyInfo *pTableKeyInfo = taosArrayGet(tableKeyGroup, j);
//      pTableKeyInfo->lastKey = pCheckInfo->lastKey;
//
//      assert(pCheckInfo->pTable == pTableKeyInfo->pTable);
    }
  }
#endif
}

92 93 94 95 96 97 98 99 100
/*
 * Move the time window tw one step along the scan direction given by order.
 *
 * For every interval unit except month ('n') and year ('y') the start key is shifted by
 * the sliding value and the end key is derived from the interval length. For month/year
 * units the shift is done on the broken-down local calendar time, in whole months.
 */
static void getNextTimeWindow(SInterval* pInterval, STimeWindow* tw, int32_t order) {
  const int32_t step = GET_FORWARD_DIRECTION_FACTOR(order);
  const bool    byMonth = (pInterval->intervalUnit == 'n');
  const bool    byYear = (pInterval->intervalUnit == 'y');

  if (!(byMonth || byYear)) {
    tw->skey += pInterval->sliding * step;
    tw->ekey = tw->skey + pInterval->interval - 1;
    return;
  }

  // number of months covered by one window; a year counts as 12 months
  int64_t months = pInterval->interval;
  if (byYear) {
    months *= 12;
  }

  // the start key expressed in seconds
  int64_t seconds = convertTimePrecision(tw->skey, pInterval->precision, TSDB_TIME_PRECISION_MILLI) / 1000;

  time_t    startSec = (time_t)seconds;
  struct tm cal;
  taosLocalTime(&startSec, &cal);

  // start of the shifted window
  int totalMonths = (int)(cal.tm_year * 12 + cal.tm_mon + months * step);
  cal.tm_year = totalMonths / 12;
  cal.tm_mon = totalMonths % 12;
  tw->skey = convertTimePrecision((int64_t)taosMktime(&cal) * 1000LL, TSDB_TIME_PRECISION_MILLI, pInterval->precision);

  // the end key is one tick before the start of the following window
  totalMonths = (int)(totalMonths + months);
  cal.tm_year = totalMonths / 12;
  cal.tm_mon = totalMonths % 12;
  tw->ekey =
      convertTimePrecision((int64_t)taosMktime(&cal) * 1000LL, TSDB_TIME_PRECISION_MILLI, pInterval->precision) - 1;
}

125
/*
 * Check whether the time range of a data block crosses the boundary of an interval window,
 * walking the windows in the direction given by order.
 *
 * Returns false right away when no interval is configured (interval == 0), i.e. the
 * upstream operator is not an interval operator.
 */
static bool overlapWithTimeWindow(SInterval* pInterval, SDataBlockInfo* pBlockInfo, int32_t order) {
  if (pInterval->interval == 0) {
    return false;
  }

  STimeWindow  win = {0};
  STimeWindow* pRange = &pBlockInfo->window;

  if (order != TSDB_ORDER_ASC) {
    // descending order: start from the window that holds the last key of the block
    getAlignQueryTimeWindow(pInterval, pInterval->precision, pRange->ekey, &win);
    assert(win.skey <= pRange->ekey);

    if (win.skey > pRange->skey) {
      return true;
    }

    for (;;) {
      getNextTimeWindow(pInterval, &win, order);
      if (win.ekey < pRange->skey) {
        return false;
      }

      assert(win.skey < pRange->skey);
      if (win.ekey < pRange->ekey && win.ekey >= pRange->skey) {
        return true;
      }
    }
  }

  // ascending order: start from the window that holds the first key of the block
  getAlignQueryTimeWindow(pInterval, pInterval->precision, pRange->skey, &win);
  assert(win.ekey >= pRange->skey);

  if (win.ekey < pRange->ekey) {
    return true;
  }

  for (;;) {
    getNextTimeWindow(pInterval, &win, order);
    if (win.skey > pRange->ekey) {
      return false;
    }

    assert(win.ekey > pRange->ekey);
    if (win.skey <= pRange->ekey && win.skey > pRange->skey) {
      return true;
    }
  }
}

L
Liu Jicong 已提交
176 177
/*
 * Load the current data block of the reader into pBlock, honoring the data requirement
 * configured for the scan.
 *
 * @param pOperator       table scan operator; its info supplies the required load flag
 * @param pTableScanInfo  scan state: reader, filter, column match info and cost recorder
 * @param pBlock          result block; its info must already describe the current block
 * @param status          [out] the FUNC_DATA_REQUIRED_* decision finally applied
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_OUT_OF_MEMORY when the aggregate pointer array cannot be
 *         allocated, or terrno when the reader fails to deliver the block data.
 */
static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableScanInfo, SSDataBlock* pBlock,
                             uint32_t* status) {
  SExecTaskInfo*  pTaskInfo = pOperator->pTaskInfo;
  STableScanInfo* pInfo = pOperator->info;

  SFileBlockLoadRecorder* pCost = &pTableScanInfo->readRecorder;

  pCost->totalBlocks += 1;
  pCost->totalRows += pBlock->info.rows;

  // a filter condition, or a block that crosses an interval window boundary, forces loading the raw data
  *status = pInfo->dataBlockLoadFlag;
  if (pTableScanInfo->pFilterNode != NULL ||
      overlapWithTimeWindow(&pTableScanInfo->interval, &pBlock->info, pTableScanInfo->cond.order)) {
    (*status) = FUNC_DATA_REQUIRED_DATA_LOAD;
  }

  SDataBlockInfo* pBlockInfo = &pBlock->info;
  taosMemoryFreeClear(pBlock->pBlockAgg);

  if (*status == FUNC_DATA_REQUIRED_FILTEROUT) {
    qDebug("%s data block filter out, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo),
           pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows);
    pCost->filterOutBlocks += 1;
    return TSDB_CODE_SUCCESS;
  } else if (*status == FUNC_DATA_REQUIRED_NOT_LOAD) {
    qDebug("%s data block skipped, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo),
           pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows);
    pCost->skipBlocks += 1;

    // clear all data in pBlock that are set when handling the previous block
    for (int32_t i = 0; i < taosArrayGetSize(pBlock->pDataBlock); ++i) {
      SColumnInfoData* pcol = taosArrayGet(pBlock->pDataBlock, i);
      pcol->pData = NULL;
    }

    return TSDB_CODE_SUCCESS;
  } else if (*status == FUNC_DATA_REQUIRED_STATIS_LOAD) {
    pCost->loadBlockStatis += 1;

    bool             allColumnsHaveAgg = true;
    SColumnDataAgg** pColAgg = NULL;
    tsdbRetrieveDataBlockStatisInfo(pTableScanInfo->dataReader, &pColAgg, &allColumnsHaveAgg);

    if (allColumnsHaveAgg == true) {
      int32_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);

      // todo create this buffer during creating operator
      if (pBlock->pBlockAgg == NULL) {
        pBlock->pBlockAgg = taosMemoryCalloc(numOfCols, POINTER_BYTES);
        if (pBlock->pBlockAgg == NULL) {
          // the array is indexed right below, so an allocation failure must not be ignored
          return TSDB_CODE_OUT_OF_MEMORY;
        }
      }

      for (int32_t i = 0; i < taosArrayGetSize(pTableScanInfo->pColMatchInfo); ++i) {
        SColMatchInfo* pColMatchInfo = taosArrayGet(pTableScanInfo->pColMatchInfo, i);
        if (!pColMatchInfo->output) {
          continue;
        }
        pBlock->pBlockAgg[pColMatchInfo->targetSlotId] = pColAgg[i];
      }

      return TSDB_CODE_SUCCESS;
    } else {  // failed to load the block sma data, data block statistics does not exist, load data block instead
      *status = FUNC_DATA_REQUIRED_DATA_LOAD;
    }
  }

  ASSERT(*status == FUNC_DATA_REQUIRED_DATA_LOAD);

  // todo filter data block according to the block sma data firstly
#if 0
  if (!doFilterByBlockStatistics(pBlock->pBlockStatis, pTableScanInfo->pCtx, pBlockInfo->rows)) {
    pCost->filterOutBlocks += 1;
    qDebug("%s data block filter out, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo), pBlockInfo->window.skey,
           pBlockInfo->window.ekey, pBlockInfo->rows);
    (*status) = FUNC_DATA_REQUIRED_FILTEROUT;
    return TSDB_CODE_SUCCESS;
  }
#endif

  pCost->totalCheckedRows += pBlock->info.rows;
  pCost->loadBlocks += 1;

  SArray* pCols = tsdbRetrieveDataBlock(pTableScanInfo->dataReader, NULL);
  if (pCols == NULL) {
    return terrno;
  }

  relocateColumnData(pBlock, pTableScanInfo->pColMatchInfo, pCols, true);

  // currently only the tbname pseudo column
  if (pTableScanInfo->pseudoSup.numOfExprs > 0) {
    SExprSupp* pSup = &pTableScanInfo->pseudoSup;
    addTagPseudoColumnData(&pTableScanInfo->readHandle, pSup->pExprInfo, pSup->numOfExprs, pBlock);
  }

  // apply the filter and account the elapsed time in the read recorder
  int64_t st = taosGetTimestampMs();
  doFilter(pTableScanInfo->pFilterNode, pBlock);

  int64_t et = taosGetTimestampMs();
  pTableScanInfo->readRecorder.filterTime += (et - st);

  if (pBlock->info.rows == 0) {
    pCost->filterOutBlocks += 1;
    qDebug("%s data block filter out, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo),
           pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows);
  }

  return TSDB_CODE_SUCCESS;
}

285
/*
 * Switch the table scan into descending mode: mark the scan as a reverse scan, flip the
 * order of the numOfOutput function contexts in pCtx, swap the start/end key of every
 * query time window and finally re-sort the windows with compareTimeWindow.
 */
static void prepareForDescendingScan(STableScanInfo* pTableScanInfo, SqlFunctionCtx* pCtx, int32_t numOfOutput) {
  SQueryTableDataCond* pCond = &pTableScanInfo->cond;

  SET_REVERSE_SCAN_FLAG(pTableScanInfo);
  switchCtxOrder(pCtx, numOfOutput);

  pCond->order = TSDB_ORDER_DESC;

  int32_t idx = 0;
  while (idx < pCond->numOfTWindows) {
    STimeWindow* pWin = &pCond->twindows[idx];
    TSWAP(pWin->skey, pWin->ekey);
    idx += 1;
  }

  taosqsort(pCond->twindows, pCond->numOfTWindows, sizeof(STimeWindow), pCond, compareTimeWindow);
}

301 302
/*
 * Fill the pseudo columns (tbname and tag columns) of pBlock for the table identified by
 * pBlock->info.uid. Every row of a pseudo column receives the same value.
 *
 * Does nothing when numOfPseudoExpr is 0.
 */
void addTagPseudoColumnData(SReadHandle* pHandle, SExprInfo* pPseudoExpr, int32_t numOfPseudoExpr,
                            SSDataBlock* pBlock) {
  // currently only the tbname pseudo column
  if (numOfPseudoExpr == 0) {
    return;
  }

  SMetaReader reader = {0};
  metaReaderInit(&reader, pHandle->meta, 0);
  metaGetTableEntryByUid(&reader, pBlock->info.uid);

  for (int32_t k = 0; k < numOfPseudoExpr; ++k) {
    SExprInfo*       pOneExpr = &pPseudoExpr[k];
    SColumnInfoData* pDst = taosArrayGet(pBlock->pDataBlock, pOneExpr->base.resSchema.slotId);

    colInfoDataEnsureCapacity(pDst, pBlock->info.rows);
    colInfoDataCleanup(pDst, pBlock->info.rows);

    int32_t funcId = pOneExpr->pExpr->_function.functionId;

    // the tbname is produced by a scan pseudo column function
    if (fmIsScanPseudoColumnFunc(funcId)) {
      setTbNameColData(pHandle->meta, pBlock, pDst, funcId);
      continue;
    }

    // everything else is a tag column
    STagVal tagVal = {0};
    tagVal.cid = pOneExpr->base.pParam[0].pCol->colId;
    const char* pTag = metaGetTableTagVal(&reader.me, pDst->info.type, &tagVal);

    // json tags are used as returned; other non-null tags are converted to column data first
    const bool needConvert = (pDst->info.type != TSDB_DATA_TYPE_JSON) && (pTag != NULL);
    char*      pVal = needConvert ? tTagValToData((const STagVal*)pTag, false) : (char*)pTag;

    int32_t row = 0;
    while (row < pBlock->info.rows) {
      colDataAppend(pDst, row, pVal,
                    (pVal == NULL) || (pDst->info.type == TSDB_DATA_TYPE_JSON && tTagIsJsonNull(pVal)));
      row += 1;
    }

    // release the converted value of a variable-length tag
    if (pVal && (pDst->info.type != TSDB_DATA_TYPE_JSON) && pTag != NULL &&
        IS_VAR_DATA_TYPE(((const STagVal*)pTag)->type)) {
      taosMemoryFree(pVal);
    }
  }

  metaReaderClear(&reader);
}

354 355 356 357
/*
 * Fill pColInfoData with the output of the scalar function identified by functionId
 * (used by the scan for the tbname pseudo column), evaluated for the table pBlock->info.uid.
 *
 * A temporary one-row BIGINT column holding the table uid is built as the function input;
 * pMeta is handed to the function through the input parameter.
 */
void setTbNameColData(void* pMeta, const SSDataBlock* pBlock, SColumnInfoData* pColInfoData, int32_t functionId) {
  struct SScalarFuncExecFuncs fpSet = {0};
  fmGetScalarFuncExecFuncs(functionId, &fpSet);

  SColumnInfoData infoData = createColumnInfoData(TSDB_DATA_TYPE_BIGINT, sizeof(uint64_t), 1);
  colInfoDataEnsureCapacity(&infoData, 1);

  colDataAppendInt64(&infoData, 0, (int64_t*)&pBlock->info.uid);
  SScalarParam srcParam = {.numOfRows = pBlock->info.rows, .param = pMeta, .columnData = &infoData};

  SScalarParam param = {.columnData = pColInfoData};
  fpSet.process(&srcParam, 1, &param);

  // release the buffers of the temporary input column, otherwise they leak on every call
  colDataDestroy(&infoData);
}

368
/*
 * Fetch the next non-empty data block from the reader of the table scan operator.
 *
 * Blocks that are filtered out or end up with no rows are skipped. Returns the result
 * block of the operator, or NULL once the reader has no more blocks. A killed task or a
 * block load failure leaves this function through longjmp on the task environment.
 */
static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) {
  STableScanInfo* pScan = pOperator->info;
  SExecTaskInfo*  pTask = pOperator->pTaskInfo;
  SSDataBlock*    pRes = pScan->pResBlock;

  const int64_t startUs = taosGetTimestampUs();

  for (;;) {
    if (!tsdbNextDataBlock(pScan->dataReader)) {
      return NULL;
    }

    if (isTaskKilled(pTask)) {
      longjmp(pTask->env, TSDB_CODE_TSC_QUERY_CANCELLED);
    }

    // process this data block based on the probabilities
    if (!processBlockWithProbability(&pScan->sample)) {
      continue;
    }

    blockDataCleanup(pRes);

    // refresh the block description from the reader and size the result block accordingly
    SDataBlockInfo blockInfo = pRes->info;
    tsdbRetrieveDataBlockInfo(pScan->dataReader, &blockInfo);

    blockInfo.capacity = blockInfo.rows;
    blockDataEnsureCapacity(pRes, blockInfo.rows);
    pRes->info = blockInfo;

    uint32_t loadStatus = 0;
    int32_t  rc = loadDataBlock(pOperator, pScan, pRes, &loadStatus);
    if (rc != TSDB_CODE_SUCCESS) {
      longjmp(pOperator->pTaskInfo->env, rc);
    }

    // the block has been filtered out or is empty, move on to the next one
    const bool usable = (loadStatus != FUNC_DATA_REQUIRED_FILTEROUT) && (pRes->info.rows != 0);
    if (!usable) {
      continue;
    }

    uint64_t* pGroupId = taosHashGet(pTask->tableqinfoList.map, &pRes->info.uid, sizeof(int64_t));
    if (pGroupId) {
      pRes->info.groupId = *pGroupId;
    }

    pOperator->resultInfo.totalRows = pScan->readRecorder.totalRows;
    pScan->readRecorder.elapsedTime += (taosGetTimestampUs() - startUs) / 1000.0;

    pOperator->cost.totalCost = pScan->readRecorder.elapsedTime;
    return pRes;
  }
}

wmmhello's avatar
wmmhello 已提交
421
static SSDataBlock* doTableScanGroup(SOperatorInfo* pOperator) {
H
Haojun Liao 已提交
422 423 424 425
  STableScanInfo* pTableScanInfo = pOperator->info;
  SExecTaskInfo*  pTaskInfo = pOperator->pTaskInfo;

  // The read handle is not initialized yet, since no qualified tables exists
426
  if (pTableScanInfo->dataReader == NULL || pOperator->status == OP_EXEC_DONE) {
H
Haojun Liao 已提交
427 428 429
    return NULL;
  }

430 431
  // do the ascending order traverse in the first place.
  while (pTableScanInfo->scanTimes < pTableScanInfo->scanInfo.numOfAsc) {
432 433 434 435 436 437 438 439 440
    while (pTableScanInfo->curTWinIdx < pTableScanInfo->cond.numOfTWindows) {
      SSDataBlock* p = doTableScanImpl(pOperator);
      if (p != NULL) {
        return p;
      }
      pTableScanInfo->curTWinIdx += 1;
      if (pTableScanInfo->curTWinIdx < pTableScanInfo->cond.numOfTWindows) {
        tsdbResetReadHandle(pTableScanInfo->dataReader, &pTableScanInfo->cond, pTableScanInfo->curTWinIdx);
      }
H
Haojun Liao 已提交
441 442
    }

443
    pTableScanInfo->scanTimes += 1;
444

445
    if (pTableScanInfo->scanTimes < pTableScanInfo->scanInfo.numOfAsc) {
446 447
      setTaskStatus(pTaskInfo, TASK_NOT_COMPLETED);
      pTableScanInfo->scanFlag = REPEAT_SCAN;
448 449 450
      qDebug("%s start to repeat ascending order scan data blocks due to query func required", GET_TASKID(pTaskInfo));
      for (int32_t i = 0; i < pTableScanInfo->cond.numOfTWindows; ++i) {
        STimeWindow* pWin = &pTableScanInfo->cond.twindows[i];
451
        qDebug("%s qrange:%" PRId64 "-%" PRId64, GET_TASKID(pTaskInfo), pWin->skey, pWin->ekey);
452
      }
453 454 455
      // do prepare for the next round table scan operation
      tsdbResetReadHandle(pTableScanInfo->dataReader, &pTableScanInfo->cond, 0);
      pTableScanInfo->curTWinIdx = 0;
H
Haojun Liao 已提交
456
    }
457
  }
H
Haojun Liao 已提交
458

459
  int32_t total = pTableScanInfo->scanInfo.numOfAsc + pTableScanInfo->scanInfo.numOfDesc;
460
  if (pTableScanInfo->scanTimes < total) {
461 462
    if (pTableScanInfo->cond.order == TSDB_ORDER_ASC) {
      prepareForDescendingScan(pTableScanInfo, pTableScanInfo->pCtx, pTableScanInfo->numOfOutput);
463 464
      tsdbResetReadHandle(pTableScanInfo->dataReader, &pTableScanInfo->cond, 0);
      pTableScanInfo->curTWinIdx = 0;
465
    }
H
Haojun Liao 已提交
466

467 468 469
    qDebug("%s start to descending order scan data blocks due to query func required", GET_TASKID(pTaskInfo));
    for (int32_t i = 0; i < pTableScanInfo->cond.numOfTWindows; ++i) {
      STimeWindow* pWin = &pTableScanInfo->cond.twindows[i];
470
      qDebug("%s qrange:%" PRId64 "-%" PRId64, GET_TASKID(pTaskInfo), pWin->skey, pWin->ekey);
471
    }
472

473
    while (pTableScanInfo->scanTimes < total) {
474 475 476 477 478 479 480 481 482
      while (pTableScanInfo->curTWinIdx < pTableScanInfo->cond.numOfTWindows) {
        SSDataBlock* p = doTableScanImpl(pOperator);
        if (p != NULL) {
          return p;
        }
        pTableScanInfo->curTWinIdx += 1;
        if (pTableScanInfo->curTWinIdx < pTableScanInfo->cond.numOfTWindows) {
          tsdbResetReadHandle(pTableScanInfo->dataReader, &pTableScanInfo->cond, pTableScanInfo->curTWinIdx);
        }
483
      }
H
Haojun Liao 已提交
484

485
      pTableScanInfo->scanTimes += 1;
H
Haojun Liao 已提交
486

487
      if (pTableScanInfo->scanTimes < total) {
488 489
        setTaskStatus(pTaskInfo, TASK_NOT_COMPLETED);
        pTableScanInfo->scanFlag = REPEAT_SCAN;
H
Haojun Liao 已提交
490

491 492
        qDebug("%s start to repeat descending order scan data blocks due to query func required",
               GET_TASKID(pTaskInfo));
493 494
        for (int32_t i = 0; i < pTableScanInfo->cond.numOfTWindows; ++i) {
          STimeWindow* pWin = &pTableScanInfo->cond.twindows[i];
495
          qDebug("%s qrange:%" PRId64 "-%" PRId64, GET_TASKID(pTaskInfo), pWin->skey, pWin->ekey);
496
        }
497
        tsdbResetReadHandle(pTableScanInfo->dataReader, &pTableScanInfo->cond, 0);
498
        pTableScanInfo->curTWinIdx = 0;
499
      }
H
Haojun Liao 已提交
500 501 502
    }
  }

wmmhello's avatar
wmmhello 已提交
503 504 505 506 507 508 509
  return NULL;
}

/*
 * Entry point of the table scan operator: return the next data block, scanning the table
 * groups of the task one after another.
 *
 * On the first call (currentGroupId == -1) a reader is opened for the first group. When
 * the current group yields no block, the reader is re-bound to the following group and
 * scanned once more. The task is marked TASK_COMPLETED whenever NULL is returned because
 * no group is left or that following group yields nothing.
 */
static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
  STableScanInfo* pScan = pOperator->info;
  SExecTaskInfo*  pTask = pOperator->pTaskInfo;
  SSDataBlock*    pRes = NULL;

  if (pScan->currentGroupId == -1) {
    pScan->currentGroupId += 1;
    if (pScan->currentGroupId >= taosArrayGetSize(pTask->tableqinfoList.pGroupList)) {
      setTaskStatus(pTask, TASK_COMPLETED);
      return NULL;
    }

    // replace whatever reader is attached with one opened on the first table group
    SArray* pFirstGroup = taosArrayGetP(pTask->tableqinfoList.pGroupList, pScan->currentGroupId);
    tsdbCleanupReadHandle(pScan->dataReader);
    pScan->dataReader =
        tsdbReaderOpen(pScan->readHandle.vnode, &pScan->cond, pFirstGroup, pScan->queryId, pScan->taskId);
  }

  pRes = doTableScanGroup(pOperator);
  if (pRes != NULL) {
    return pRes;
  }

  // the current group is exhausted, advance to the next one
  pScan->currentGroupId += 1;
  if (pScan->currentGroupId >= taosArrayGetSize(pTask->tableqinfoList.pGroupList)) {
    setTaskStatus(pTask, TASK_COMPLETED);
    return NULL;
  }

  SArray* pNextGroup = taosArrayGetP(pTask->tableqinfoList.pGroupList, pScan->currentGroupId);
  tsdbSetTableList(pScan->dataReader, pNextGroup);

  tsdbResetReadHandle(pScan->dataReader, &pScan->cond, 0);
  pScan->curTWinIdx = 0;
  pScan->scanTimes = 0;

  pRes = doTableScanGroup(pOperator);
  if (pRes == NULL) {
    setTaskStatus(pTask, TASK_COMPLETED);
  }

  return pRes;
}

550 551
static int32_t getTableScannerExecInfo(struct SOperatorInfo* pOptr, void** pOptrExplain, uint32_t* len) {
  SFileBlockLoadRecorder* pRecorder = taosMemoryCalloc(1, sizeof(SFileBlockLoadRecorder));
552
  STableScanInfo*         pTableScanInfo = pOptr->info;
553 554 555 556 557 558
  *pRecorder = pTableScanInfo->readRecorder;
  *pOptrExplain = pRecorder;
  *len = sizeof(SFileBlockLoadRecorder);
  return 0;
}

559 560
static void destroyTableScanOperatorInfo(void* param, int32_t numOfOutput) {
  STableScanInfo* pTableScanInfo = (STableScanInfo*)param;
H
Haojun Liao 已提交
561
  blockDataDestroy(pTableScanInfo->pResBlock);
562
  cleanupQueryTableDataCond(&pTableScanInfo->cond);
H
Haojun Liao 已提交
563

564 565 566 567 568 569 570
  tsdbCleanupReadHandle(pTableScanInfo->dataReader);

  if (pTableScanInfo->pColMatchInfo != NULL) {
    taosArrayDestroy(pTableScanInfo->pColMatchInfo);
  }
}

wmmhello's avatar
wmmhello 已提交
571 572
/*
 * Create the table scan operator for the given physical plan node.
 *
 * @param pTableScanNode  physical plan node describing the scan
 * @param readHandle      read handle, copied into the operator info
 * @param pTaskInfo       owning task; its code is set on failure
 * @param queryId         query id, kept for opening the reader later
 * @param taskId          task id, kept for opening the reader later
 * @return the new operator, or NULL on failure with pTaskInfo->code set to
 *         TSDB_CODE_QRY_OUT_OF_MEMORY
 */
SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, SReadHandle* readHandle,
                                           SExecTaskInfo* pTaskInfo, uint64_t queryId, uint64_t taskId) {
  // declared before the first goto so that the error path never sees an indeterminate pointer
  SArray* pColList = NULL;

  STableScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STableScanInfo));
  SOperatorInfo*  pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
  if (pInfo == NULL || pOperator == NULL) {
    goto _error;
  }

  SDataBlockDescNode* pDescNode = pTableScanNode->scan.node.pOutputDataBlockDesc;
  int32_t             numOfCols = 0;
  pColList = extractColMatchInfo(pTableScanNode->scan.pScanCols, pDescNode, &numOfCols, COL_MATCH_FROM_COL_ID);

  int32_t code = initQueryTableDataCond(&pInfo->cond, pTableScanNode);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  if (pTableScanNode->scan.pScanPseudoCols != NULL) {
    SExprSupp* pSup = &pInfo->pseudoSup;
    pSup->pExprInfo = createExprInfo(pTableScanNode->scan.pScanPseudoCols, NULL, &pSup->numOfExprs);
    pSup->pCtx = createSqlFunctionCtx(pSup->pExprInfo, pSup->numOfExprs, &pSup->rowEntryInfoOffset);
  }

  pInfo->scanInfo = (SScanInfo){.numOfAsc = pTableScanNode->scanSeq[0], .numOfDesc = pTableScanNode->scanSeq[1]};
  //    pInfo->scanInfo = (SScanInfo){.numOfAsc = 0, .numOfDesc = 1}; // for debug purpose

  pInfo->readHandle = *readHandle;
  pInfo->interval = extractIntervalInfo(pTableScanNode);
  pInfo->sample.sampleRatio = pTableScanNode->ratio;
  pInfo->sample.seed = taosGetTimestampSec();

  pInfo->dataBlockLoadFlag = pTableScanNode->dataRequired;
  pInfo->pResBlock = createResDataBlock(pDescNode);
  pInfo->pFilterNode = pTableScanNode->scan.node.pConditions;
  pInfo->scanFlag = MAIN_SCAN;
  pInfo->pColMatchInfo = pColList;
  pInfo->curTWinIdx = 0;
  pInfo->queryId = queryId;
  pInfo->taskId = taskId;
  pInfo->currentGroupId = -1;  // -1: the reader has not been opened on any table group yet

  pOperator->name = "TableScanOperator";  // for debug purpose
  pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN;
  pOperator->blocking = false;
  pOperator->status = OP_NOT_OPENED;
  pOperator->info = pInfo;
  pOperator->exprSupp.numOfExprs = numOfCols;
  pOperator->pTaskInfo = pTaskInfo;

  pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doTableScan, NULL, NULL, destroyTableScanOperatorInfo,
                                         NULL, NULL, getTableScannerExecInfo);

  // for non-blocking operator, the open cost is always 0
  pOperator->cost.openCost = 0;
  return pOperator;

_error:
  // the column match list is not yet owned by the operator info on this path, release it here
  if (pColList != NULL) {
    taosArrayDestroy(pColList);
  }

  taosMemoryFreeClear(pInfo);
  taosMemoryFreeClear(pOperator);

  pTaskInfo->code = TSDB_CODE_QRY_OUT_OF_MEMORY;
  return NULL;
}

637
/*
 * Create a sequential table scan operator on top of an already opened reader.
 *
 * @param pReadHandle  data reader used by the operator
 * @param pTaskInfo    owning task; its code is set to TSDB_CODE_OUT_OF_MEMORY on failure
 * @return the new operator, or NULL when memory cannot be allocated
 */
SOperatorInfo* createTableSeqScanOperatorInfo(void* pReadHandle, SExecTaskInfo* pTaskInfo) {
  STableScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STableScanInfo));
  SOperatorInfo*  pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
  if (pInfo == NULL || pOperator == NULL) {
    // both objects are written below, so bail out before touching a NULL pointer
    taosMemoryFreeClear(pInfo);
    taosMemoryFreeClear(pOperator);
    pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  pInfo->dataReader = pReadHandle;

  pOperator->name = "TableSeqScanOperator";
  pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_TABLE_SEQ_SCAN;
  pOperator->blocking = false;
  pOperator->status = OP_NOT_OPENED;
  pOperator->info = pInfo;
  pOperator->pTaskInfo = pTaskInfo;

  pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doTableScanImpl, NULL, NULL, NULL, NULL, NULL, NULL);
  return pOperator;
}

655 656
static int32_t doGetTableRowSize(void* pMeta, uint64_t uid) {
  int32_t rowLen = 0;
H
Haojun Liao 已提交
657

658
  SMetaReader mr = {0};
659 660
  metaReaderInit(&mr, pMeta, 0);
  metaGetTableEntryByUid(&mr, uid);
661 662
  if (mr.me.type == TSDB_SUPER_TABLE) {
    int32_t numOfCols = mr.me.stbEntry.schemaRow.nCols;
663
    for (int32_t i = 0; i < numOfCols; ++i) {
664 665 666 667 668 669 670
      rowLen += mr.me.stbEntry.schemaRow.pSchema[i].bytes;
    }
  } else if (mr.me.type == TSDB_CHILD_TABLE) {
    uint64_t suid = mr.me.ctbEntry.suid;
    metaGetTableEntryByUid(&mr, suid);
    int32_t numOfCols = mr.me.stbEntry.schemaRow.nCols;

671
    for (int32_t i = 0; i < numOfCols; ++i) {
672 673 674 675
      rowLen += mr.me.stbEntry.schemaRow.pSchema[i].bytes;
    }
  } else if (mr.me.type == TSDB_NORMAL_TABLE) {
    int32_t numOfCols = mr.me.ntbEntry.schemaRow.nCols;
676
    for (int32_t i = 0; i < numOfCols; ++i) {
677 678 679 680 681
      rowLen += mr.me.ntbEntry.schemaRow.pSchema[i].bytes;
    }
  }

  metaReaderClear(&mr);
682 683 684 685 686 687 688 689 690 691 692 693
  return rowLen;
}

/*
 * Produce the single-row result of the block distribution scan: the block distribution
 * statistics of the table, serialized into one variable-length value.
 *
 * Returns NULL once the operator has reached OP_EXEC_DONE. When the serialization buffer
 * cannot be allocated the function leaves through longjmp on the task environment with
 * TSDB_CODE_OUT_OF_MEMORY.
 */
static SSDataBlock* doBlockInfoScan(SOperatorInfo* pOperator) {
  if (pOperator->status == OP_EXEC_DONE) {
    return NULL;
  }

  SBlockDistInfo* pBlockScanInfo = pOperator->info;

  STableBlockDistInfo blockDistInfo = {.minRows = INT_MAX, .maxRows = INT_MIN};
  blockDistInfo.rowSize = doGetTableRowSize(pBlockScanInfo->readHandle.meta, pBlockScanInfo->uid);

  tsdbGetFileBlocksDistInfo(pBlockScanInfo->pHandle, &blockDistInfo);
  blockDistInfo.numOfInmemRows = (int32_t)tsdbGetNumOfRowsInMemTable(pBlockScanInfo->pHandle);

  SSDataBlock* pBlock = pBlockScanInfo->pResBlock;

  int32_t          slotId = pOperator->exprSupp.pExprInfo->base.resSchema.slotId;
  SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, slotId);

  // the first call only computes the required length, the second one fills the buffer
  int32_t len = tSerializeBlockDistInfo(NULL, 0, &blockDistInfo);
  char*   p = taosMemoryCalloc(1, len + VARSTR_HEADER_SIZE);
  if (p == NULL) {
    // the buffer is written right below, so an allocation failure must not be ignored
    longjmp(pOperator->pTaskInfo->env, TSDB_CODE_OUT_OF_MEMORY);
  }

  tSerializeBlockDistInfo(varDataVal(p), len, &blockDistInfo);
  varDataSetLen(p, len);

  blockDataEnsureCapacity(pBlock, 1);
  colDataAppend(pColInfo, 0, p, false);
  taosMemoryFree(p);

  pBlock->info.rows = 1;

  pOperator->status = OP_EXEC_DONE;
  return pBlock;
}

718
static void destroyBlockDistScanOperatorInfo(void* param, int32_t numOfOutput) {
719
  SBlockDistInfo* pDistInfo = (SBlockDistInfo*)param;
720 721 722
  blockDataDestroy(pDistInfo->pResBlock);
}

723 724
/*
 * Create the operator that reports the data block distribution of one table.
 *
 * @param dataReader      reader handle used to collect the block distribution
 * @param readHandle      read handle, copied into the operator info
 * @param uid             uid of the table to inspect
 * @param pBlockScanNode  physical plan node describing the output
 * @param pTaskInfo       owning task; its code is set on failure
 * @return the new operator, or NULL on failure
 */
SOperatorInfo* createDataBlockInfoScanOperator(void* dataReader, SReadHandle* readHandle, uint64_t uid,
                                               SBlockDistScanPhysiNode* pBlockScanNode, SExecTaskInfo* pTaskInfo) {
  SBlockDistInfo* pInfo = taosMemoryCalloc(1, sizeof(SBlockDistInfo));
  SOperatorInfo*  pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
  if (pInfo == NULL || pOperator == NULL) {
    pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY;
    goto _error;
  }

  pInfo->pHandle = dataReader;
  pInfo->readHandle = *readHandle;
  pInfo->uid = uid;
  pInfo->pResBlock = createResDataBlock(pBlockScanNode->node.pOutputDataBlockDesc);

  int32_t    numOfCols = 0;
  SExprInfo* pExprInfo = createExprInfo(pBlockScanNode->pScanPseudoCols, NULL, &numOfCols);
  int32_t    code = initExprSupp(&pOperator->exprSupp, pExprInfo, numOfCols);
  if (code != TSDB_CODE_SUCCESS) {
    // report the failure to the task, otherwise the caller only sees a NULL operator
    pTaskInfo->code = code;
    goto _error;
  }

  pOperator->name = "DataBlockDistScanOperator";
  pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN;
  pOperator->blocking = false;
  pOperator->status = OP_NOT_OPENED;
  pOperator->info = pInfo;
  pOperator->pTaskInfo = pTaskInfo;

  pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doBlockInfoScan, NULL, NULL,
                                         destroyBlockDistScanOperatorInfo, NULL, NULL, NULL);
  return pOperator;

_error:
  // the result block may already exist when the failure happens after its creation
  if (pInfo != NULL && pInfo->pResBlock != NULL) {
    blockDataDestroy(pInfo->pResBlock);
  }

  taosMemoryFreeClear(pInfo);
  taosMemoryFreeClear(pOperator);
  return NULL;
}

// Destroy every block buffered in pInfo->pBlockLists, empty the list and rewind the read position.
static void doClearBufferedBlocks(SStreamBlockScanInfo* pInfo) {
  size_t numOfBlocks = taosArrayGetSize(pInfo->pBlockLists);
  pInfo->validBlockIndex = 0;

  int32_t idx = 0;
  while (idx < numOfBlocks) {
    SSDataBlock* pBuffered = taosArrayGetP(pInfo->pBlockLists, idx);
    blockDataDestroy(pBuffered);
    idx++;
  }

  taosArrayClear(pInfo->pBlockLists);
}

H
Haojun Liao 已提交
772
// True when the parent operator recorded in sessionSup is a stream session-window operator.
static bool isSessionWindow(SStreamBlockScanInfo* pInfo) {
  const bool parentIsSession = (QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION == pInfo->sessionSup.parentType);
  return parentIsSession;
}

H
Haojun Liao 已提交
776
// True when the parent operator recorded in sessionSup is a stream state-window operator.
static bool isStateWindow(SStreamBlockScanInfo* pInfo) {
  const bool parentIsState = (QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE == pInfo->sessionSup.parentType);
  return parentIsState;
}
5
54liuyao 已提交
779

5
54liuyao 已提交
780
// Pick the next time window that has to be re-read through the snapshot table-scan operator and
// configure that operator for it.
//
// pInfo:      stream scan state; pInfo->pSnapshotReadOp is the table-scan operator that gets reconfigured.
// pSDB:       block whose timestamps drive the window selection (not consulted for state windows).
// tsColIndex: index of the timestamp column inside pSDB.
// pRowIndex:  in/out cursor into pSDB; advanced past the rows covered by the selected window.
//
// Returns true when a window was selected and the table scan was prepared, false when there is nothing to read.
static bool prepareDataScan(SStreamBlockScanInfo* pInfo, SSDataBlock* pSDB, int32_t tsColIndex, int32_t* pRowIndex) {
  STimeWindow win = {
      .skey = INT64_MIN,
      .ekey = INT64_MAX,
  };
  bool needRead = false;
  if (!isStateWindow(pInfo) && (*pRowIndex) < pSDB->info.rows) {
    SColumnInfoData* pColDataInfo = taosArrayGet(pSDB->pDataBlock, tsColIndex);
    TSKEY*           tsCols = (TSKEY*)pColDataInfo->pData;
    // zero the whole struct first: it is handed to getActiveTimeWindow and must not carry indeterminate fields
    SResultRowInfo dumyInfo = {0};
    dumyInfo.cur.pageId = -1;
    if (isSessionWindow(pInfo)) {
      SStreamAggSupporter* pAggSup = pInfo->sessionSup.pStreamAggSup;
      int64_t              gap = pInfo->sessionSup.gap;
      int32_t              winIndex = 0;
      SResultWindowInfo*   pCurWin =
          getSessionTimeWindow(pAggSup, tsCols[(*pRowIndex)], INT64_MIN, pSDB->info.groupId, gap, &winIndex);
      // take the window before updateSessionWindowInfo gets a chance to modify it
      win = pCurWin->win;
      (*pRowIndex) += updateSessionWindowInfo(pCurWin, tsCols, NULL, pSDB->info.rows, (*pRowIndex), gap, NULL);
    } else {
      win =
          getActiveTimeWindow(NULL, &dumyInfo, tsCols[(*pRowIndex)], &pInfo->interval, pInfo->interval.precision, NULL);
      (*pRowIndex) += getNumOfRowsInTimeWindow(&pSDB->info, tsCols, (*pRowIndex), win.ekey, binarySearchForKey, NULL,
                                               TSDB_ORDER_ASC);
    }
    needRead = true;
  } else if (isStateWindow(pInfo)) {
    // state windows: walk the list of windows collected in pScanWindow, one per call
    SArray* pWins = pInfo->sessionSup.pStreamAggSup->pScanWindow;
    int32_t size = taosArrayGetSize(pWins);
    if (pInfo->scanWinIndex < size) {
      win = *(STimeWindow*)taosArrayGet(pWins, pInfo->scanWinIndex);
      pInfo->scanWinIndex++;
      needRead = true;
    } else {
      // list exhausted: reset the cursor and drop the processed windows
      pInfo->scanWinIndex = 0;
      taosArrayClear(pWins);
    }
  }
  if (!needRead) {
    return false;
  }

  STableScanInfo* pTableScanInfo = pInfo->pSnapshotReadOp->info;
  pTableScanInfo->cond.twindows[0] = win;
  pTableScanInfo->curTWinIdx = 0;
  // NOTE: the data reader is not reset here; the former tsdbResetReadHandle() call is disabled.
  pTableScanInfo->scanTimes = 0;
  pTableScanInfo->currentGroupId = -1;
  return true;
}

833
// Append row `sourceRowId` of `source` to the end of `dest`, column by column, and bump dest's row count.
// Columns are paired by position; the number of columns is taken from `source`.
static void copyOneRow(SSDataBlock* dest, SSDataBlock* source, int32_t sourceRowId) {
  size_t  numOfCols = taosArrayGetSize(source->pDataBlock);
  int32_t col = 0;

  while (col < numOfCols) {
    SColumnInfoData* pDst = (SColumnInfoData*)taosArrayGet(dest->pDataBlock, col);
    SColumnInfoData* pSrc = (SColumnInfoData*)taosArrayGet(source->pDataBlock, col);

    if (!colDataIsNull_s(pSrc, sourceRowId)) {
      colDataAppend(pDst, dest->info.rows, colDataGetData(pSrc, sourceRowId), false);
    } else {
      colDataAppendNULL(pDst, dest->info.rows);
    }
    col++;
  }

  dest->info.rows += 1;
}

846 847 848 849 850 851 852 853 854 855 856 857 858 859 860 861 862 863 864 865 866
// Look up the group id registered for pBlock's table uid in the task's table map.
// Returns 0 when the uid is not present. `rowId` is currently unused.
static uint64_t getGroupId(SOperatorInfo* pOperator, SSDataBlock* pBlock, int32_t rowId) {
  /* Todo(liuyao) for partition by column
  recordNewGroupKeys(pTableScanInfo->pGroupCols, pTableScanInfo->pGroupColVals, pBlock, rowId);
  int32_t len = buildGroupKeys(pTableScanInfo->keyBuf, pTableScanInfo->pGroupColVals);
  uint64_t resId = 0;
  uint64_t* groupId = taosHashGet(pTableScanInfo->pGroupSet, pTableScanInfo->keyBuf, len);
  if (groupId) {
    return *groupId;
  } else if (len != 0) {
    resId = calcGroupId(pTableScanInfo->keyBuf, len);
    taosHashPut(pTableScanInfo->pGroupSet, pTableScanInfo->keyBuf, len, &resId, sizeof(uint64_t));
  }
  return resId;
  */
  uint64_t* pGroupId = taosHashGet(pOperator->pTaskInfo->tableqinfoList.map, &pBlock->info.uid, sizeof(int64_t));
  return (pGroupId != NULL) ? *pGroupId : 0;
}

5
54liuyao 已提交
867
// Pull blocks from the snapshot table-scan operator until one belongs to pInfo->groupId.
// Whenever the table scan runs dry, prepareDataScan() is asked for the next window and the scan is retried once.
// Returns the matching block, or NULL when no more data is available.
static SSDataBlock* doDataScan(SStreamBlockScanInfo* pInfo, SSDataBlock* pSDB, int32_t tsColIndex, int32_t* pRowIndex) {
  for (;;) {
    SSDataBlock* pScanned = doTableScan(pInfo->pSnapshotReadOp);

    // current window exhausted: move on to the next window, if any, and scan again
    if (pScanned == NULL && prepareDataScan(pInfo, pSDB, tsColIndex, pRowIndex)) {
      pScanned = doTableScan(pInfo->pSnapshotReadOp);
    }

    if (pScanned == NULL) {
      return NULL;
    }

    // blocks of other groups are skipped
    if (pScanned->info.groupId == pInfo->groupId) {
      return pScanned;
    }
  }

  /* Todo(liuyao) for partition by column
    SSDataBlock* pBlock = createOneDataBlock(pResult, true);
    blockDataCleanup(pResult);
    for (int32_t i = 0; i < pBlock->info.rows; i++) {
      uint64_t id = getGroupId(pInfo->pOperatorDumy, pBlock, i);
      if (id == pInfo->groupId) {
        copyOneRow(pResult, pBlock, i);
      }
    }
    return pResult;
  */
}

// Fill pUpdateBlock with the rows of pBlock that were flagged as updated (row ids stored in pInfo->tsArray),
// starting at pInfo->tsArrayIndex and stopping at the first row that belongs to a different group.
//
// pInfo:        stream scan state; tsArray/tsArrayIndex are consumed and groupId is set to the group of the batch.
// pBlock:       source block the row ids refer to.
// pUpdateBlock: output block; always cleaned first, and tagged STREAM_CLEAR when rows are copied.
static void setUpdateData(SStreamBlockScanInfo* pInfo, SSDataBlock* pBlock, SSDataBlock* pUpdateBlock) {
  blockDataCleanup(pUpdateBlock);
  int32_t size = taosArrayGetSize(pInfo->tsArray);
  if (pInfo->tsArrayIndex < size) {
    SColumnInfoData* pCol = (SColumnInfoData*)taosArrayGet(pUpdateBlock->pDataBlock, pInfo->primaryTsIndex);
    ASSERT(pCol->info.type == TSDB_DATA_TYPE_TIMESTAMP);
    blockDataEnsureCapacity(pUpdateBlock, size);

    // the first pending row decides which group this batch belongs to
    int32_t rowId = *(int32_t*)taosArrayGet(pInfo->tsArray, pInfo->tsArrayIndex);
    pInfo->groupId = getGroupId(pInfo->pSnapshotReadOp, pBlock, rowId);
    int32_t i = 0;
    // bound the walk by the entries that remain after tsArrayIndex, so the lookup below never
    // indexes past the end of tsArray when tsArrayIndex is not zero
    for (; pInfo->tsArrayIndex + i < size; i++) {
      rowId = *(int32_t*)taosArrayGet(pInfo->tsArray, i + pInfo->tsArrayIndex);
      uint64_t id = getGroupId(pInfo->pSnapshotReadOp, pBlock, rowId);
      if (pInfo->groupId != id) {
        break;
      }
      copyOneRow(pUpdateBlock, pBlock, rowId);
    }
    pUpdateBlock->info.rows = i;
    pInfo->tsArrayIndex += i;
    pUpdateBlock->info.groupId = pInfo->groupId;
    pUpdateBlock->info.type = STREAM_CLEAR;
    blockDataUpdateTsWindow(pUpdateBlock, 0);
  }
  // all rows have same group id
  ASSERT(pInfo->tsArrayIndex >= size);
  if (size > 0 && pInfo->tsArrayIndex == size) {
    // every flagged row has been consumed
    taosArrayClear(pInfo->tsArray);
  }
}

931 932 933 934 935 936 937 938 939 940 941
// Run every row of pBlock through the update detector and collect the ids of updated rows in pInfo->tsArray.
//
// pInfo:        stream scan state (pUpdateInfo, tsArray, primaryTsIndex).
// invertible:   currently unused.
// pBlock:       block to inspect.
// pUpdateBlock: when non-NULL it is filled from the collected rows via setUpdateData();
//               when NULL the collected row ids are discarded again.
static void getUpdateDataBlock(SStreamBlockScanInfo* pInfo, bool invertible, SSDataBlock* pBlock,
                               SSDataBlock* pUpdateBlock) {
  SColumnInfoData* pColDataInfo = taosArrayGet(pBlock->pDataBlock, pInfo->primaryTsIndex);
  ASSERT(pColDataInfo->info.type == TSDB_DATA_TYPE_TIMESTAMP);
  TSKEY* ts = (TSKEY*)pColDataInfo->pData;

  // pUpdateInfo is NULL for scans without an interval; in that case no row can be flagged
  if (pInfo->pUpdateInfo != NULL) {
    for (int32_t rowId = 0; rowId < pBlock->info.rows; rowId++) {
      if (updateInfoIsUpdated(pInfo->pUpdateInfo, pBlock->info.uid, ts[rowId])) {
        taosArrayPush(pInfo->tsArray, &rowId);
      }
    }
  }

  if (!pUpdateBlock) {
    taosArrayClear(pInfo->tsArray);
    return;
  }
  setUpdateData(pInfo, pBlock, pUpdateBlock);
  // Todo(liuyao) get from tsdb
  //  SSDataBlock* p = createOneDataBlock(pBlock, true);
  //  p->info.type = STREAM_INVERT;
  //  taosArrayClear(pInfo->tsArray);
  //  return p;
}

953
// Produce the next block of the stream scan operator.
//
// The behaviour depends on pInfo->blockType:
//   STREAM_INPUT__DATA_BLOCK  - hand out the blocks buffered in pInfo->pBlockLists one by one; a
//                               STREAM_RETRIEVE block additionally switches the operator to submit mode so
//                               that the following calls re-read the requested windows from the data reader.
//   STREAM_INPUT__DATA_SUBMIT - a small state machine on pInfo->scanMode that serves update/re-scan results
//                               first and otherwise decodes the next submit block from the stream reader.
//   STREAM_INPUT__DATA_SCAN   - plain pass-through of the snapshot table scan.
//
// Returns the next block, or NULL when nothing is available; on errors pTaskInfo->code is set.
static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) {
  // NOTE: this operator does never check if current status is done or not
  SExecTaskInfo*        pTaskInfo = pOperator->pTaskInfo;
  SStreamBlockScanInfo* pInfo = pOperator->info;

  pTaskInfo->code = pOperator->fpSet._openFn(pOperator);
  if (pTaskInfo->code != TSDB_CODE_SUCCESS || pOperator->status == OP_EXEC_DONE) {
    return NULL;
  }

  size_t total = taosArrayGetSize(pInfo->pBlockLists);
  // TODO: refactor
  if (pInfo->blockType == STREAM_INPUT__DATA_BLOCK) {
    if (pInfo->validBlockIndex >= total) {
      /*doClearBufferedBlocks(pInfo);*/
      pOperator->status = OP_EXEC_DONE;
      return NULL;
    }

    int32_t      current = pInfo->validBlockIndex++;
    SSDataBlock* pBlock = taosArrayGetP(pInfo->pBlockLists, current);
    blockDataUpdateTsWindow(pBlock, 0);
    if (pBlock->info.type == STREAM_RETRIEVE) {
      // switch to submit mode: the next calls serve the windows requested by this block
      pInfo->blockType = STREAM_INPUT__DATA_SUBMIT;
      pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RETRIEVE;
      copyDataBlock(pInfo->pPullDataRes, pBlock);
      pInfo->pullDataResIndex = 0;
      prepareDataScan(pInfo, pInfo->pPullDataRes, 0, &pInfo->pullDataResIndex);
      // pUpdateInfo only exists for interval scans; skip the call when there is none
      if (pInfo->pUpdateInfo != NULL) {
        updateInfoAddCloseWindowSBF(pInfo->pUpdateInfo);
      }
    }
    return pBlock;
  } else if (pInfo->blockType == STREAM_INPUT__DATA_SUBMIT) {
    if (pInfo->scanMode == STREAM_SCAN_FROM_RES) {
      // NOTE(review): pUpdateRes is destroyed here but the pointer is kept and dereferenced by later
      // calls; this mode is only entered for STREAM_INVERT results - confirm the intended ownership.
      blockDataDestroy(pInfo->pUpdateRes);
      pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
      return pInfo->pRes;
    } else if (pInfo->scanMode == STREAM_SCAN_FROM_UPDATERES) {
      pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER;
      if (!isStateWindow(pInfo)) {
        prepareDataScan(pInfo, pInfo->pUpdateRes, pInfo->primaryTsIndex, &pInfo->updateResIndex);
      }
      return pInfo->pUpdateRes;
    } else if (pInfo->scanMode == STREAM_SCAN_FROM_DATAREADER_RETRIEVE) {
      SSDataBlock* pSDB = doDataScan(pInfo, pInfo->pPullDataRes, 0, &pInfo->pullDataResIndex);
      if (pSDB != NULL) {
        getUpdateDataBlock(pInfo, true, pSDB, NULL);
        pSDB->info.type = STREAM_PUSH_DATA;
        return pSDB;
      }
      // retrieve finished: fall through to reading the next submit block below
      pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER;
    } else {
      if (isStateWindow(pInfo) && taosArrayGetSize(pInfo->sessionSup.pStreamAggSup->pScanWindow) > 0) {
        pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER;
        pInfo->updateResIndex = pInfo->pUpdateRes->info.rows;
        prepareDataScan(pInfo, pInfo->pUpdateRes, pInfo->primaryTsIndex, &pInfo->updateResIndex);
      }
      if (pInfo->scanMode == STREAM_SCAN_FROM_DATAREADER) {
        SSDataBlock* pSDB = doDataScan(pInfo, pInfo->pUpdateRes, pInfo->primaryTsIndex, &pInfo->updateResIndex);
        if (pSDB == NULL) {
          // re-scan of the current group finished: serve the next group of updated rows, if any
          setUpdateData(pInfo, pInfo->pRes, pInfo->pUpdateRes);
          if (pInfo->pUpdateRes->info.rows > 0) {
            if (!isStateWindow(pInfo)) {
              // Todo(liuyao) maybe can delete this.
              bool test = prepareDataScan(pInfo, pInfo->pUpdateRes, pInfo->primaryTsIndex, &pInfo->updateResIndex);
              ASSERT(test == false);
            }
            return pInfo->pUpdateRes;
          } else {
            pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
          }
        } else {
          pSDB->info.type = STREAM_NORMAL;
          getUpdateDataBlock(pInfo, true, pSDB, NULL);
          return pSDB;
        }
      }
    }

    SDataBlockInfo* pBlockInfo = &pInfo->pRes->info;
    blockDataCleanup(pInfo->pRes);

    while (tqNextDataBlock(pInfo->streamBlockReader)) {
      SSDataBlock block = {0};

      // todo refactor
      int32_t code = tqRetrieveDataBlock(&block, pInfo->streamBlockReader);

      uint64_t groupId = block.info.groupId;
      uint64_t uid = block.info.uid;
      int32_t  numOfRows = block.info.rows;

      if (code != TSDB_CODE_SUCCESS || numOfRows == 0) {
        // NOTE(review): `block` is not released on this path - confirm whether tqRetrieveDataBlock
        // can leave allocated columns behind here.
        pTaskInfo->code = code;
        return NULL;
      }

      pInfo->pRes->info.groupId = groupId;
      pInfo->pRes->info.rows = numOfRows;
      pInfo->pRes->info.uid = uid;
      pInfo->pRes->info.type = STREAM_NORMAL;
      pInfo->pRes->info.capacity = numOfRows;

      // for generating rollup SMA result, each time is an independent time serie.
      // TODO temporarily used, when the statement of "partition by tbname" is ready, remove this
      if (pInfo->assignBlockUid) {
        pInfo->pRes->info.groupId = uid;
      } else {
        pInfo->pRes->info.groupId = groupId;
      }

      // a group id registered for this table in the task's table map takes precedence
      uint64_t* groupIdPre = taosHashGet(pOperator->pTaskInfo->tableqinfoList.map, &uid, sizeof(int64_t));
      if (groupIdPre) {
        pInfo->pRes->info.groupId = *groupIdPre;
      }

      // todo extract method
      for (int32_t i = 0; i < taosArrayGetSize(pInfo->pColMatchInfo); ++i) {
        SColMatchInfo* pColMatchInfo = taosArrayGet(pInfo->pColMatchInfo, i);
        if (!pColMatchInfo->output) {
          continue;
        }

        bool colExists = false;
        for (int32_t j = 0; j < blockDataGetNumOfCols(&block); ++j) {
          SColumnInfoData* pResCol = bdGetColumnInfoData(&block, j);
          if (pResCol->info.colId == pColMatchInfo->colId) {
            taosArraySet(pInfo->pRes->pDataBlock, pColMatchInfo->targetSlotId, pResCol);
            colExists = true;
            break;
          }
        }

        // the required column does not exist in the submit block, let's set it to be all null value
        if (!colExists) {
          SColumnInfoData* pDst = taosArrayGet(pInfo->pRes->pDataBlock, pColMatchInfo->targetSlotId);
          colDataAppendNNULL(pDst, 0, pBlockInfo->rows);
        }
      }

      // only the column array of the temporary block is released; the column entries were copied into pRes above
      taosArrayDestroy(block.pDataBlock);
      if (pInfo->pRes->pDataBlock == NULL) {
        // TODO add log
        // NOTE(review): pRes->pDataBlock has already been used by the loop above when this check runs.
        updateInfoDestoryColseWinSBF(pInfo->pUpdateInfo);
        pOperator->status = OP_EXEC_DONE;
        pTaskInfo->code = terrno;
        return NULL;
      }

      // currently only the tbname pseudo column
      if (pInfo->numOfPseudoExpr > 0) {
        addTagPseudoColumnData(&pInfo->readHandle, pInfo->pPseudoExpr, pInfo->numOfPseudoExpr, pInfo->pRes);
      }

      doFilter(pInfo->pCondition, pInfo->pRes);
      blockDataUpdateTsWindow(pInfo->pRes, pInfo->primaryTsIndex);
      // keep reading while the filter leaves no rows
      if (pBlockInfo->rows > 0) {
        break;
      }
    }

    // record the scan action.
    pInfo->numOfExec++;
    pOperator->resultInfo.totalRows += pBlockInfo->rows;

    if (pBlockInfo->rows == 0) {
      updateInfoDestoryColseWinSBF(pInfo->pUpdateInfo);
      pOperator->status = OP_EXEC_DONE;
    } else if (pInfo->pUpdateInfo) {
      pInfo->tsArrayIndex = 0;
      getUpdateDataBlock(pInfo, true, pInfo->pRes, pInfo->pUpdateRes);
      if (pInfo->pUpdateRes->info.rows > 0) {
        if (pInfo->pUpdateRes->info.type == STREAM_CLEAR) {
          // pRes is returned now; the update block is served by the next call
          pInfo->updateResIndex = 0;
          pInfo->scanMode = STREAM_SCAN_FROM_UPDATERES;
        } else if (pInfo->pUpdateRes->info.type == STREAM_INVERT) {
          pInfo->scanMode = STREAM_SCAN_FROM_RES;
          return pInfo->pUpdateRes;
        }
      }
    }

    return (pBlockInfo->rows == 0) ? NULL : pInfo->pRes;

  } else if (pInfo->blockType == STREAM_INPUT__DATA_SCAN) {
    // check reader last status
    // if not match, reset status
    SSDataBlock* pResult = doTableScan(pInfo->pSnapshotReadOp);
    return pResult && pResult->info.rows > 0 ? pResult : NULL;

  } else {
    ASSERT(0);
    return NULL;
  }
}

1148 1149 1150 1151 1152 1153 1154 1155 1156 1157 1158 1159
// Build a new array holding the uid of every entry in pTableGroupInfo->pTableList.
// The caller receives the array and is responsible for destroying it.
static SArray* extractTableIdList(const STableListInfo* pTableGroupInfo) {
  SArray* pUidList = taosArrayInit(4, sizeof(uint64_t));

  // Transfer the Array of STableKeyInfo into uid list.
  size_t  numOfTables = taosArrayGetSize(pTableGroupInfo->pTableList);
  int32_t idx = 0;
  while (idx < numOfTables) {
    STableKeyInfo* pKey = taosArrayGet(pTableGroupInfo->pTableList, idx);
    taosArrayPush(pUidList, &pKey->uid);
    idx++;
  }

  return pUidList;
}

1160 1161 1162
// Create the stream scan operator.
//
// pHandle:        read handle; may be NULL, in which case no snapshot table scan / stream reader is set up.
// pTableScanNode: physical plan node describing scanned columns, pseudo columns, conditions and output.
// pTaskInfo:      owning task.
// pTwSup:         time-window support; its waterMark is used when the update detector is created.
// queryId/taskId: forwarded to the embedded table-scan operator.
//
// Returns the new operator, or NULL on failure.
SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhysiNode* pTableScanNode,
                                            SExecTaskInfo* pTaskInfo, STimeWindowAggSupp* pTwSup, uint64_t queryId,
                                            uint64_t taskId) {
  // declared before the first goto so the _error path always sees a defined value
  SArray* pColIds = NULL;

  SStreamBlockScanInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamBlockScanInfo));
  SOperatorInfo*        pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));

  if (pInfo == NULL || pOperator == NULL) {
    terrno = TSDB_CODE_QRY_OUT_OF_MEMORY;
    goto _error;
  }

  SScanPhysiNode* pScanPhyNode = &pTableScanNode->scan;

  SDataBlockDescNode* pDescNode = pScanPhyNode->node.pOutputDataBlockDesc;

  int32_t numOfCols = 0;
  pInfo->pColMatchInfo = extractColMatchInfo(pScanPhyNode->pScanCols, pDescNode, &numOfCols, COL_MATCH_FROM_COL_ID);

  int32_t numOfOutput = taosArrayGetSize(pInfo->pColMatchInfo);
  pColIds = taosArrayInit(numOfOutput, sizeof(int16_t));
  if (pColIds == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto _error;
  }

  // collect the scanned column ids and remember which output slot carries the primary timestamp
  for (int32_t i = 0; i < numOfOutput; ++i) {
    SColMatchInfo* id = taosArrayGet(pInfo->pColMatchInfo, i);

    int16_t colId = id->colId;
    taosArrayPush(pColIds, &colId);
    if (id->colId == pTableScanNode->tsColId) {
      pInfo->primaryTsIndex = id->targetSlotId;
    }
  }

  pInfo->pBlockLists = taosArrayInit(4, POINTER_BYTES);
  if (pInfo->pBlockLists == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto _error;
  }

  pInfo->tsArray = taosArrayInit(4, sizeof(int32_t));
  if (pInfo->tsArray == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto _error;
  }

  if (pHandle) {
    SOperatorInfo* pTableScanDummy = createTableScanOperatorInfo(pTableScanNode, pHandle, pTaskInfo, queryId, taskId);
    if (pTableScanDummy == NULL) {
      goto _error;
    }

    STableScanInfo* pSTInfo = (STableScanInfo*)pTableScanDummy->info;
    // update detection is only set up for interval scans
    if (pSTInfo->interval.interval > 0) {
      pInfo->pUpdateInfo = updateInfoInitP(&pSTInfo->interval, pTwSup->waterMark);
    } else {
      pInfo->pUpdateInfo = NULL;
    }
    pInfo->pSnapshotReadOp = pTableScanDummy;
    pInfo->interval = pSTInfo->interval;

    pInfo->readHandle = *pHandle;
    ASSERT(pHandle->reader);
    pInfo->streamBlockReader = pHandle->reader;
    pInfo->tableUid = pScanPhyNode->uid;

    // set the extract column id to streamHandle
    tqReadHandleSetColIdList((SStreamReader*)pHandle->reader, pColIds);
    // presumably the reader keeps the list from here on (the list was never freed by this function) -
    // stop tracking it so it is not released below
    pColIds = NULL;

    SArray* tableIdList = extractTableIdList(&pTaskInfo->tableqinfoList);
    int32_t code = tqReadHandleSetTbUidList(pHandle->reader, tableIdList);
    taosArrayDestroy(tableIdList);
    if (code != 0) {
      // NOTE(review): pSnapshotReadOp and pUpdateInfo created above are not released on this path
      terrno = code;
      goto _error;
    }
  }

  // create the pseudo columns info
  if (pTableScanNode->scan.pScanPseudoCols != NULL) {
    pInfo->pPseudoExpr = createExprInfo(pTableScanNode->scan.pScanPseudoCols, NULL, &pInfo->numOfPseudoExpr);
  }

  pInfo->pRes = createResDataBlock(pDescNode);
  pInfo->pUpdateRes = createResDataBlock(pDescNode);
  pInfo->pCondition = pScanPhyNode->node.pConditions;
  pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
  pInfo->sessionSup = (SessionWindowSupporter){.pStreamAggSup = NULL, .gap = -1};
  pInfo->groupId = 0;
  pInfo->pPullDataRes = createPullDataBlock();

  pOperator->name = "StreamBlockScanOperator";
  pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN;
  pOperator->blocking = false;
  pOperator->status = OP_NOT_OPENED;
  pOperator->info = pInfo;
  pOperator->exprSupp.numOfExprs = taosArrayGetSize(pInfo->pRes->pDataBlock);
  pOperator->pTaskInfo = pTaskInfo;

  pOperator->fpSet =
      createOperatorFpSet(operatorDummyOpenFn, doStreamBlockScan, NULL, NULL, operatorDummyCloseFn, NULL, NULL, NULL);

  // without a read handle nobody took the column id list; release it
  if (pColIds != NULL) {
    taosArrayDestroy(pColIds);
  }

  return pOperator;

_error:
  if (pColIds != NULL) {
    taosArrayDestroy(pColIds);
  }
  if (pInfo != NULL) {
    // release the arrays that were attached to pInfo before the failure
    if (pInfo->pColMatchInfo != NULL) {
      taosArrayDestroy(pInfo->pColMatchInfo);
    }
    if (pInfo->pBlockLists != NULL) {
      taosArrayDestroy(pInfo->pBlockLists);
    }
    if (pInfo->tsArray != NULL) {
      taosArrayDestroy(pInfo->tsArray);
    }
  }
  taosMemoryFreeClear(pInfo);
  taosMemoryFreeClear(pOperator);
  return NULL;
}

static void destroySysScanOperator(void* param, int32_t numOfOutput) {
  SSysTableScanInfo* pInfo = (SSysTableScanInfo*)param;
  tsem_destroy(&pInfo->ready);
  blockDataDestroy(pInfo->pRes);

1265
  const char* name = tNameGetTableName(&pInfo->name);
1266
  if (strncasecmp(name, TSDB_INS_TABLE_USER_TABLES, TSDB_TABLE_FNAME_LEN) == 0 || pInfo->pCur != NULL) {
H
Haojun Liao 已提交
1267
    metaCloseTbCursor(pInfo->pCur);
1268
    pInfo->pCur = NULL;
H
Haojun Liao 已提交
1269
  }
H
Haojun Liao 已提交
1270 1271

  taosArrayDestroy(pInfo->scanCols);
H
Haojun Liao 已提交
1272 1273 1274 1275 1276 1277 1278 1279 1280 1281 1282 1283 1284 1285 1286 1287 1288 1289 1290 1291 1292 1293 1294 1295 1296 1297 1298 1299 1300 1301 1302 1303 1304 1305 1306 1307 1308 1309 1310 1311
}

// Expression-walker callback that extracts the database name from an equality condition on the db-name column.
//
// pContext is used in two ways: while walking, its first int32_t holds a small state
// (0: nothing matched, 1: inside an '=' operator, 2: the db-name column was seen);
// once the value node is reached the buffer is overwritten with the NUL-terminated database name.
EDealRes getDBNameFromConditionWalker(SNode* pNode, void* pContext) {
  int32_t*  pState = (int32_t*)pContext;
  ENodeType type = nodeType(pNode);

  if (type == QUERY_NODE_OPERATOR) {
    SOperatorNode* pOp = (SOperatorNode*)pNode;
    if (pOp->opType != OP_TYPE_EQUAL) {
      // other operators are not inspected any further
      *pState = 0;
      return DEAL_RES_IGNORE_CHILD;
    }
    *pState = 1;
    return DEAL_RES_CONTINUE;
  }

  if (type == QUERY_NODE_COLUMN) {
    if (*pState == 1) {
      SColumnNode* pCol = (SColumnNode*)pNode;
      *pState = (pCol->colId == TSDB_INS_USER_STABLES_DBNAME_COLID) ? 2 : 0;
    }
    return DEAL_RES_CONTINUE;
  }

  if (type == QUERY_NODE_VALUE) {
    if (*pState != 2) {
      return DEAL_RES_CONTINUE;
    }

    SValueNode* pVal = (SValueNode*)pNode;
    char*       dbName = nodesGetValueFromNode(pVal);
    strncpy(pContext, varDataVal(dbName), varDataLen(dbName));
    *((char*)pContext + varDataLen(dbName)) = 0;
    return DEAL_RES_END;  // stop walk
  }

  return DEAL_RES_CONTINUE;
}

1320
// Walk pCondition with getDBNameFromConditionWalker, using dbName as the walker's context buffer.
// Does nothing when there is no condition.
static void getDBNameFromCondition(SNode* pCondition, const char* dbName) {
  if (pCondition != NULL) {
    nodesWalkExpr(pCondition, getDBNameFromConditionWalker, (char*)dbName);
  }
}

1327
static int32_t loadSysTableCallback(void* param, const SDataBuf* pMsg, int32_t code) {
H
Haojun Liao 已提交
1328 1329 1330 1331 1332 1333 1334
  SOperatorInfo*     operator=(SOperatorInfo*) param;
  SSysTableScanInfo* pScanResInfo = (SSysTableScanInfo*)operator->info;
  if (TSDB_CODE_SUCCESS == code) {
    pScanResInfo->pRsp = pMsg->pData;

    SRetrieveMetaTableRsp* pRsp = pScanResInfo->pRsp;
    pRsp->numOfRows = htonl(pRsp->numOfRows);
1335 1336 1337
    pRsp->useconds = htobe64(pRsp->useconds);
    pRsp->handle = htobe64(pRsp->handle);
    pRsp->compLen = htonl(pRsp->compLen);
H
Haojun Liao 已提交
1338 1339 1340 1341 1342
  } else {
    operator->pTaskInfo->code = code;
  }

  tsem_post(&pScanResInfo->ready);
wmmhello's avatar
wmmhello 已提交
1343
  return TSDB_CODE_SUCCESS;
H
Haojun Liao 已提交
1344 1345 1346 1347 1348 1349 1350
}

// Apply the scan condition (if any) to pInfo->pRes in place.
// Returns pInfo->pRes, or NULL when the block holds no rows.
static SSDataBlock* doFilterResult(SSysTableScanInfo* pInfo) {
  if (pInfo->pCondition != NULL) {
    doFilter(pInfo->pCondition, pInfo->pRes);
  }

  if (pInfo->pRes->info.rows == 0) {
    return NULL;
  }
  return pInfo->pRes;
}
1403
static SSDataBlock* buildSysTableMetaBlock() {
L
Liu Jicong 已提交
1404 1405
  size_t               size = 0;
  const SSysTableMeta* pMeta = NULL;
1406 1407 1408
  getInfosDbMeta(&pMeta, &size);

  int32_t index = 0;
L
Liu Jicong 已提交
1409 1410
  for (int32_t i = 0; i < size; ++i) {
    if (strcmp(pMeta[i].name, TSDB_INS_TABLE_USER_TABLES) == 0) {
1411 1412 1413 1414
      index = i;
      break;
    }
  }
1415

1416
  SSDataBlock* pBlock = createDataBlock();
L
Liu Jicong 已提交
1417
  for (int32_t i = 0; i < pMeta[index].colNum; ++i) {
L
Liu Jicong 已提交
1418 1419
    SColumnInfoData colInfoData =
        createColumnInfoData(pMeta[index].schema[i].type, pMeta[index].schema[i].bytes, i + 1);
1420
    blockDataAppendColInfo(pBlock, &colInfoData);
1421 1422
  }

1423 1424 1425
  return pBlock;
}

1426
static SSDataBlock* doSysTableScan(SOperatorInfo* pOperator) {
H
Haojun Liao 已提交
1427 1428 1429 1430 1431
  // build message and send to mnode to fetch the content of system tables.
  SExecTaskInfo*     pTaskInfo = pOperator->pTaskInfo;
  SSysTableScanInfo* pInfo = pOperator->info;

  // retrieve local table list info from vnode
1432 1433
  const char* name = tNameGetTableName(&pInfo->name);
  if (strncasecmp(name, TSDB_INS_TABLE_USER_TABLES, TSDB_TABLE_FNAME_LEN) == 0) {
1434 1435 1436 1437
    if (pOperator->status == OP_EXEC_DONE) {
      return NULL;
    }

1438 1439
    // the retrieve is executed on the mnode, so return tables that belongs to the information schema database.
    if (pInfo->readHandle.mnd != NULL) {
1440
      buildSysDbTableInfo(pInfo, pOperator->resultInfo.capacity);
1441

1442 1443
      doFilterResult(pInfo);
      pInfo->loadInfo.totalRows += pInfo->pRes->info.rows;
1444

1445
      doSetOperatorCompleted(pOperator);
1446 1447 1448 1449 1450
      return (pInfo->pRes->info.rows == 0) ? NULL : pInfo->pRes;
    } else {
      if (pInfo->pCur == NULL) {
        pInfo->pCur = metaOpenTbCursor(pInfo->readHandle.meta);
      }
1451

1452 1453
      blockDataCleanup(pInfo->pRes);
      int32_t numOfRows = 0;
1454

1455 1456 1457
      const char* db = NULL;
      int32_t     vgId = 0;
      vnodeGetInfo(pInfo->readHandle.vnode, &db, &vgId);
1458

1459 1460 1461
      SName sn = {0};
      char  dbname[TSDB_DB_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
      tNameFromString(&sn, db, T_NAME_ACCT | T_NAME_DB);
1462

1463 1464
      tNameGetDbName(&sn, varDataVal(dbname));
      varDataSetLen(dbname, strlen(varDataVal(dbname)));
1465

1466
      SSDataBlock* p = buildSysTableMetaBlock();
1467
      blockDataEnsureCapacity(p, pOperator->resultInfo.capacity);
1468

1469
      char n[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
1470 1471 1472

      int32_t ret = 0;
      while ((ret = metaTbCursorNext(pInfo->pCur)) == 0) {
1473
        STR_TO_VARSTR(n, pInfo->pCur->mr.me.name);
1474

1475 1476 1477
        // table name
        SColumnInfoData* pColInfoData = taosArrayGet(p->pDataBlock, 0);
        colDataAppend(pColInfoData, numOfRows, n, false);
1478

1479 1480 1481
        // database name
        pColInfoData = taosArrayGet(p->pDataBlock, 1);
        colDataAppend(pColInfoData, numOfRows, dbname, false);
1482

1483 1484 1485
        // vgId
        pColInfoData = taosArrayGet(p->pDataBlock, 6);
        colDataAppend(pColInfoData, numOfRows, (char*)&vgId, false);
1486

1487 1488 1489 1490 1491 1492 1493 1494 1495 1496 1497 1498 1499
        int32_t tableType = pInfo->pCur->mr.me.type;
        if (tableType == TSDB_CHILD_TABLE) {
          // create time
          int64_t ts = pInfo->pCur->mr.me.ctbEntry.ctime;
          pColInfoData = taosArrayGet(p->pDataBlock, 2);
          colDataAppend(pColInfoData, numOfRows, (char*)&ts, false);

          SMetaReader mr = {0};
          metaReaderInit(&mr, pInfo->readHandle.meta, 0);
          metaGetTableEntryByUid(&mr, pInfo->pCur->mr.me.ctbEntry.suid);

          // number of columns
          pColInfoData = taosArrayGet(p->pDataBlock, 3);
1500
          colDataAppend(pColInfoData, numOfRows, (char*)&mr.me.stbEntry.schemaRow.nCols, false);
1501 1502

          // super table name
wmmhello's avatar
wmmhello 已提交
1503
          STR_TO_VARSTR(n, mr.me.name);
1504
          pColInfoData = taosArrayGet(p->pDataBlock, 4);
wmmhello's avatar
wmmhello 已提交
1505
          colDataAppend(pColInfoData, numOfRows, n, false);
1506 1507
          metaReaderClear(&mr);

wmmhello's avatar
wmmhello 已提交
1508 1509
          // table comment
          pColInfoData = taosArrayGet(p->pDataBlock, 8);
L
Liu Jicong 已提交
1510
          if (pInfo->pCur->mr.me.ctbEntry.commentLen > 0) {
wmmhello's avatar
wmmhello 已提交
1511 1512 1513
            char comment[TSDB_TB_COMMENT_LEN + VARSTR_HEADER_SIZE] = {0};
            STR_TO_VARSTR(comment, pInfo->pCur->mr.me.ctbEntry.comment);
            colDataAppend(pColInfoData, numOfRows, comment, false);
L
Liu Jicong 已提交
1514
          } else if (pInfo->pCur->mr.me.ctbEntry.commentLen == 0) {
wmmhello's avatar
wmmhello 已提交
1515 1516 1517
            char comment[VARSTR_HEADER_SIZE + VARSTR_HEADER_SIZE] = {0};
            STR_TO_VARSTR(comment, "");
            colDataAppend(pColInfoData, numOfRows, comment, false);
L
Liu Jicong 已提交
1518
          } else {
wmmhello's avatar
wmmhello 已提交
1519 1520 1521
            colDataAppendNULL(pColInfoData, numOfRows);
          }

1522 1523 1524 1525 1526 1527 1528 1529
          // uid
          pColInfoData = taosArrayGet(p->pDataBlock, 5);
          colDataAppend(pColInfoData, numOfRows, (char*)&pInfo->pCur->mr.me.uid, false);

          // ttl
          pColInfoData = taosArrayGet(p->pDataBlock, 7);
          colDataAppend(pColInfoData, numOfRows, (char*)&pInfo->pCur->mr.me.ctbEntry.ttlDays, false);

wmmhello's avatar
wmmhello 已提交
1530
          STR_TO_VARSTR(n, "CHILD_TABLE");
1531 1532 1533 1534 1535 1536 1537
        } else if (tableType == TSDB_NORMAL_TABLE) {
          // create time
          pColInfoData = taosArrayGet(p->pDataBlock, 2);
          colDataAppend(pColInfoData, numOfRows, (char*)&pInfo->pCur->mr.me.ntbEntry.ctime, false);

          // number of columns
          pColInfoData = taosArrayGet(p->pDataBlock, 3);
1538
          colDataAppend(pColInfoData, numOfRows, (char*)&pInfo->pCur->mr.me.ntbEntry.schemaRow.nCols, false);
1539 1540 1541 1542 1543

          // super table name
          pColInfoData = taosArrayGet(p->pDataBlock, 4);
          colDataAppendNULL(pColInfoData, numOfRows);

wmmhello's avatar
wmmhello 已提交
1544 1545
          // table comment
          pColInfoData = taosArrayGet(p->pDataBlock, 8);
L
Liu Jicong 已提交
1546
          if (pInfo->pCur->mr.me.ntbEntry.commentLen > 0) {
wmmhello's avatar
wmmhello 已提交
1547 1548 1549
            char comment[TSDB_TB_COMMENT_LEN + VARSTR_HEADER_SIZE] = {0};
            STR_TO_VARSTR(comment, pInfo->pCur->mr.me.ntbEntry.comment);
            colDataAppend(pColInfoData, numOfRows, comment, false);
L
Liu Jicong 已提交
1550
          } else if (pInfo->pCur->mr.me.ntbEntry.commentLen == 0) {
wmmhello's avatar
wmmhello 已提交
1551 1552 1553
            char comment[VARSTR_HEADER_SIZE + VARSTR_HEADER_SIZE] = {0};
            STR_TO_VARSTR(comment, "");
            colDataAppend(pColInfoData, numOfRows, comment, false);
L
Liu Jicong 已提交
1554
          } else {
wmmhello's avatar
wmmhello 已提交
1555 1556 1557
            colDataAppendNULL(pColInfoData, numOfRows);
          }

1558 1559 1560 1561 1562 1563 1564 1565
          // uid
          pColInfoData = taosArrayGet(p->pDataBlock, 5);
          colDataAppend(pColInfoData, numOfRows, (char*)&pInfo->pCur->mr.me.uid, false);

          // ttl
          pColInfoData = taosArrayGet(p->pDataBlock, 7);
          colDataAppend(pColInfoData, numOfRows, (char*)&pInfo->pCur->mr.me.ntbEntry.ttlDays, false);

wmmhello's avatar
wmmhello 已提交
1566
          STR_TO_VARSTR(n, "NORMAL_TABLE");
1567
        }
1568

1569
        pColInfoData = taosArrayGet(p->pDataBlock, 9);
wmmhello's avatar
wmmhello 已提交
1570
        colDataAppend(pColInfoData, numOfRows, n, false);
1571

1572
        if (++numOfRows >= pOperator->resultInfo.capacity) {
1573 1574
          break;
        }
H
Haojun Liao 已提交
1575 1576
      }

1577 1578 1579 1580 1581 1582 1583
      // todo temporarily free the cursor here, the true reason why the free is not valid needs to be found
      if (ret != 0) {
        metaCloseTbCursor(pInfo->pCur);
        pInfo->pCur = NULL;
        doSetOperatorCompleted(pOperator);
      }

1584 1585
      p->info.rows = numOfRows;
      pInfo->pRes->info.rows = numOfRows;
H
Haojun Liao 已提交
1586

1587
      relocateColumnData(pInfo->pRes, pInfo->scanCols, p->pDataBlock, false);
1588
      doFilterResult(pInfo);
H
Haojun Liao 已提交
1589

1590 1591
      blockDataDestroy(p);

1592 1593 1594
      pInfo->loadInfo.totalRows += pInfo->pRes->info.rows;
      return (pInfo->pRes->info.rows == 0) ? NULL : pInfo->pRes;
    }
H
Haojun Liao 已提交
1595 1596 1597 1598 1599
  } else {  // load the meta from mnode of the given epset
    if (pOperator->status == OP_EXEC_DONE) {
      return NULL;
    }

1600 1601 1602
    while (1) {
      int64_t startTs = taosGetTimestampUs();
      strncpy(pInfo->req.tb, tNameGetTableName(&pInfo->name), tListLen(pInfo->req.tb));
1603
      strcpy(pInfo->req.user, pInfo->pUser);
H
Haojun Liao 已提交
1604

1605 1606 1607 1608 1609
      if (pInfo->showRewrite) {
        char dbName[TSDB_DB_NAME_LEN] = {0};
        getDBNameFromCondition(pInfo->pCondition, dbName);
        sprintf(pInfo->req.db, "%d.%s", pInfo->accountId, dbName);
      }
H
Haojun Liao 已提交
1610

1611 1612 1613 1614 1615 1616 1617 1618 1619 1620 1621
      int32_t contLen = tSerializeSRetrieveTableReq(NULL, 0, &pInfo->req);
      char*   buf1 = taosMemoryCalloc(1, contLen);
      tSerializeSRetrieveTableReq(buf1, contLen, &pInfo->req);

      // send the fetch remote task result reques
      SMsgSendInfo* pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
      if (NULL == pMsgSendInfo) {
        qError("%s prepare message %d failed", GET_TASKID(pTaskInfo), (int32_t)sizeof(SMsgSendInfo));
        pTaskInfo->code = TSDB_CODE_QRY_OUT_OF_MEMORY;
        return NULL;
      }
H
Haojun Liao 已提交
1622

L
Liu Jicong 已提交
1623 1624
      int32_t msgType = (strcasecmp(name, TSDB_INS_TABLE_DNODE_VARIABLES) == 0) ? TDMT_DND_SYSTABLE_RETRIEVE
                                                                                : TDMT_MND_SYSTABLE_RETRIEVE;
D
dapan1121 已提交
1625

1626 1627 1628
      pMsgSendInfo->param = pOperator;
      pMsgSendInfo->msgInfo.pData = buf1;
      pMsgSendInfo->msgInfo.len = contLen;
D
dapan1121 已提交
1629
      pMsgSendInfo->msgType = msgType;
1630
      pMsgSendInfo->fp = loadSysTableCallback;
D
dapan1121 已提交
1631
      pMsgSendInfo->requestId = pTaskInfo->id.queryId;
H
Haojun Liao 已提交
1632

1633
      int64_t transporterId = 0;
1634 1635
      int32_t code =
          asyncSendMsgToServer(pInfo->readHandle.pMsgCb->clientRpc, &pInfo->epSet, &transporterId, pMsgSendInfo);
1636
      tsem_wait(&pInfo->ready);
H
Haojun Liao 已提交
1637

1638 1639 1640 1641 1642
      if (pTaskInfo->code) {
        qDebug("%s load meta data from mnode failed, totalRows:%" PRIu64 ", code:%s", GET_TASKID(pTaskInfo),
               pInfo->loadInfo.totalRows, tstrerror(pTaskInfo->code));
        return NULL;
      }
H
Haojun Liao 已提交
1643

1644 1645
      SRetrieveMetaTableRsp* pRsp = pInfo->pRsp;
      pInfo->req.showId = pRsp->handle;
H
Haojun Liao 已提交
1646

1647 1648
      if (pRsp->numOfRows == 0 || pRsp->completed) {
        pOperator->status = OP_EXEC_DONE;
1649
        qDebug("%s load meta data from mnode completed, rowsOfSource:%d, totalRows:%" PRIu64, GET_TASKID(pTaskInfo),
1650
               pRsp->numOfRows, pInfo->loadInfo.totalRows);
H
Haojun Liao 已提交
1651

1652
        if (pRsp->numOfRows == 0) {
H
Haojun Liao 已提交
1653
          taosMemoryFree(pRsp);
1654 1655 1656
          return NULL;
        }
      }
H
Haojun Liao 已提交
1657

1658
      extractDataBlockFromFetchRsp(pInfo->pRes, &pInfo->loadInfo, pRsp->numOfRows, pRsp->data, pRsp->compLen,
L
Liu Jicong 已提交
1659
                                   pOperator->exprSupp.numOfExprs, startTs, NULL, pInfo->scanCols);
H
Haojun Liao 已提交
1660

1661 1662
      // todo log the filter info
      doFilterResult(pInfo);
H
Haojun Liao 已提交
1663
      taosMemoryFree(pRsp);
1664 1665
      if (pInfo->pRes->info.rows > 0) {
        return pInfo->pRes;
D
dapan1121 已提交
1666 1667
      } else if (pOperator->status == OP_EXEC_DONE) {
        return NULL;
1668
      }
1669
    }
H
Haojun Liao 已提交
1670 1671 1672
  }
}

1673
int32_t buildSysDbTableInfo(const SSysTableScanInfo* pInfo, int32_t capacity) {
1674
  SSDataBlock* p = buildSysTableMetaBlock();
1675
  blockDataEnsureCapacity(p, capacity);
1676

L
Liu Jicong 已提交
1677
  size_t               size = 0;
1678 1679 1680 1681 1682 1683 1684 1685
  const SSysTableMeta* pSysDbTableMeta = NULL;

  getInfosDbMeta(&pSysDbTableMeta, &size);
  p->info.rows = buildDbTableInfoBlock(p, pSysDbTableMeta, size, TSDB_INFORMATION_SCHEMA_DB);

  getPerfDbMeta(&pSysDbTableMeta, &size);
  p->info.rows = buildDbTableInfoBlock(p, pSysDbTableMeta, size, TSDB_PERFORMANCE_SCHEMA_DB);

1686
  relocateColumnData(pInfo->pRes, pInfo->scanCols, p->pDataBlock, false);
1687
  pInfo->pRes->info.rows = p->info.rows;
1688 1689 1690
  blockDataDestroy(p);

  return pInfo->pRes->info.rows;
1691 1692
}

L
Liu Jicong 已提交
1693 1694 1695
int32_t buildDbTableInfoBlock(const SSDataBlock* p, const SSysTableMeta* pSysDbTableMeta, size_t size,
                              const char* dbName) {
  char    n[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
1696 1697
  int32_t numOfRows = p->info.rows;

L
Liu Jicong 已提交
1698
  for (int32_t i = 0; i < size; ++i) {
1699 1700 1701 1702 1703 1704 1705 1706 1707 1708 1709 1710 1711 1712 1713 1714 1715 1716 1717 1718
    const SSysTableMeta* pm = &pSysDbTableMeta[i];

    SColumnInfoData* pColInfoData = taosArrayGet(p->pDataBlock, 0);

    STR_TO_VARSTR(n, pm->name);
    colDataAppend(pColInfoData, numOfRows, n, false);

    // database name
    STR_TO_VARSTR(n, dbName);
    pColInfoData = taosArrayGet(p->pDataBlock, 1);
    colDataAppend(pColInfoData, numOfRows, n, false);

    // create time
    pColInfoData = taosArrayGet(p->pDataBlock, 2);
    colDataAppendNULL(pColInfoData, numOfRows);

    // number of columns
    pColInfoData = taosArrayGet(p->pDataBlock, 3);
    colDataAppend(pColInfoData, numOfRows, (char*)&pm->colNum, false);

L
Liu Jicong 已提交
1719
    for (int32_t j = 4; j <= 8; ++j) {
1720 1721 1722 1723 1724 1725 1726 1727 1728 1729 1730 1731 1732 1733 1734
      pColInfoData = taosArrayGet(p->pDataBlock, j);
      colDataAppendNULL(pColInfoData, numOfRows);
    }

    STR_TO_VARSTR(n, "SYSTEM_TABLE");

    pColInfoData = taosArrayGet(p->pDataBlock, 9);
    colDataAppend(pColInfoData, numOfRows, n, false);

    numOfRows += 1;
  }

  return numOfRows;
}

1735
/*
 * Create the operator that scans a system table.
 *
 * readHandle   : points to an SReadHandle; it is copied into the operator info.
 * pScanPhyNode : physical plan node describing the scan (table name, scan columns, conditions,
 *                account id, show-rewrite flag, mgmt ep set).
 * pUser        : user name; duplicated with taosMemoryStrDup().
 * pTaskInfo    : task the operator belongs to.
 *
 * For TSDB_INS_TABLE_USER_TABLES the result block capacity is reserved up front; for every other
 * table the `ready` semaphore and the mgmt ep set used by the remote retrieve are set up instead.
 *
 * Returns the new operator, or NULL with terrno = TSDB_CODE_QRY_OUT_OF_MEMORY if either of the
 * two initial allocations fails.
 */
SOperatorInfo* createSysTableScanOperatorInfo(void* readHandle, SSystemTableScanPhysiNode* pScanPhyNode,
                                              const char* pUser, SExecTaskInfo* pTaskInfo) {
  SSysTableScanInfo* pInfo = taosMemoryCalloc(1, sizeof(SSysTableScanInfo));
  SOperatorInfo*     pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
  if (pInfo == NULL || pOperator == NULL) {
    goto _error;
  }

  SScanPhysiNode* pScanNode = &pScanPhyNode->scan;

  SDataBlockDescNode* pDescNode = pScanNode->node.pOutputDataBlockDesc;
  SSDataBlock*        pResBlock = createResDataBlock(pDescNode);

  int32_t num = 0;
  SArray* colList = extractColMatchInfo(pScanNode->pScanCols, pDescNode, &num, COL_MATCH_FROM_COL_ID);

  pInfo->accountId = pScanPhyNode->accountId;
  pInfo->pUser = taosMemoryStrDup((void*)pUser);
  pInfo->showRewrite = pScanPhyNode->showRewrite;
  pInfo->pRes = pResBlock;
  pInfo->pCondition = pScanNode->node.pConditions;
  pInfo->scanCols = colList;
  pInfo->readHandle = *(SReadHandle*)readHandle;  // needed by both scan flavours

  initResultSizeInfo(pOperator, 4096);

  tNameAssign(&pInfo->name, &pScanNode->tableName);
  const char* name = tNameGetTableName(&pInfo->name);

  if (strncasecmp(name, TSDB_INS_TABLE_USER_TABLES, TSDB_TABLE_FNAME_LEN) == 0) {
    blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity);
  } else {
    // the remote retrieve in doSysTableScan blocks on this semaphore until the response arrives
    tsem_init(&pInfo->ready, 0, 0);
    pInfo->epSet = pScanPhyNode->mgmtEpSet;
  }

  pOperator->name = "SysTableScanOperator";
  pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN;
  pOperator->blocking = false;
  pOperator->status = OP_NOT_OPENED;
  pOperator->info = pInfo;
  pOperator->exprSupp.numOfExprs = taosArrayGetSize(pResBlock->pDataBlock);
  pOperator->pTaskInfo = pTaskInfo;

  pOperator->fpSet =
      createOperatorFpSet(operatorDummyOpenFn, doSysTableScan, NULL, NULL, destroySysScanOperator, NULL, NULL, NULL);

  return pOperator;

_error:
  // only the two initial allocations can exist at this point
  taosMemoryFreeClear(pInfo);
  taosMemoryFreeClear(pOperator);
  terrno = TSDB_CODE_QRY_OUT_OF_MEMORY;
  return NULL;
}
H
Haojun Liao 已提交
1791

1792
/*
 * Produce the next block of a tag scan.
 *
 * Starting at pInfo->curPos, up to resultInfo.capacity tables of pInfo->pTableList are visited.
 * For each table its meta entry is read by uid and every output expression is evaluated:
 *   - scan pseudo column functions (fmIsScanPseudoColumnFunc) yield the table name;
 *   - every other expression is treated as a tag and yields the tag value of column
 *     pParam[0].pCol->colId, or NULL when the tag is absent or is a JSON null.
 *
 * The operator is completed once the last table has been visited, and the task is marked
 * TASK_COMPLETED when the operator is done or the table list is empty.
 *
 * Returns pInfo->pRes when at least one row was produced, otherwise NULL.
 */
static SSDataBlock* doTagScan(SOperatorInfo* pOperator) {
  if (pOperator->status == OP_EXEC_DONE) {
    return NULL;
  }

  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;

  STagScanInfo* pInfo = pOperator->info;
  SExprInfo*    pExprInfo = &pOperator->exprSupp.pExprInfo[0];
  SSDataBlock*  pRes = pInfo->pRes;

  int32_t size = taosArrayGetSize(pInfo->pTableList->pTableList);
  if (size == 0) {
    setTaskStatus(pTaskInfo, TASK_COMPLETED);
    return NULL;
  }

  char        str[512] = {0};  // scratch var-string for the table name
  int32_t     count = 0;
  SMetaReader mr = {0};
  metaReaderInit(&mr, pInfo->readHandle.meta, 0);

  while (pInfo->curPos < size && count < pOperator->resultInfo.capacity) {
    STableKeyInfo* item = taosArrayGet(pInfo->pTableList->pTableList, pInfo->curPos);
    metaGetTableEntryByUid(&mr, item->uid);

    for (int32_t j = 0; j < pOperator->exprSupp.numOfExprs; ++j) {
      SColumnInfoData* pDst = taosArrayGet(pRes->pDataBlock, pExprInfo[j].base.resSchema.slotId);

      // refactor later
      if (fmIsScanPseudoColumnFunc(pExprInfo[j].pExpr->_function.functionId)) {
        STR_TO_VARSTR(str, mr.me.name);
        colDataAppend(pDst, count, str, false);
      } else {  // it is a tag value
        STagVal val = {0};
        val.cid = pExprInfo[j].base.pParam[0].pCol->colId;
        const char* p = metaGetTableTagVal(&mr.me, pDst->info.type, &val);

        // JSON tags are appended as returned; other types are converted by tTagValToData()
        char* data = NULL;
        if (pDst->info.type != TSDB_DATA_TYPE_JSON && p != NULL) {
          data = tTagValToData((const STagVal*)p, false);
        } else {
          data = (char*)p;
        }
        colDataAppend(pDst, count, data,
                      (data == NULL) || (pDst->info.type == TSDB_DATA_TYPE_JSON && tTagIsJsonNull(data)));

        // only the converted buffer of a variable-length, non-JSON tag is released here
        if (pDst->info.type != TSDB_DATA_TYPE_JSON && p != NULL && IS_VAR_DATA_TYPE(((const STagVal*)p)->type) &&
            data != NULL) {
          taosMemoryFree(data);
        }
      }
    }

    count += 1;
    if (++pInfo->curPos >= size) {
      doSetOperatorCompleted(pOperator);
    }
  }

  metaReaderClear(&mr);

  if (pOperator->status == OP_EXEC_DONE) {
    setTaskStatus(pTaskInfo, TASK_COMPLETED);
  }

  pRes->info.rows = count;
  pOperator->resultInfo.totalRows += count;

  return (pRes->info.rows == 0) ? NULL : pInfo->pRes;
}

static void destroyTagScanOperatorInfo(void* param, int32_t numOfOutput) {
  STagScanInfo* pInfo = (STagScanInfo*)param;
  pInfo->pRes = blockDataDestroy(pInfo->pRes);
}

1945 1946
/*
 * Create the operator that returns tag values / table names for a list of tables.
 *
 * pReadHandle    : read handle; copied into the operator info.
 * pPhyNode       : physical plan node; pScanPseudoCols provides the output expressions.
 * pTableListInfo : tables to visit; stored by pointer, not copied.
 * pTaskInfo      : task the operator belongs to.
 *
 * Returns the new operator, or NULL on failure. On failure terrno is set to
 * TSDB_CODE_OUT_OF_MEMORY and the column match list built here is released.
 */
SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysiNode* pPhyNode,
                                         STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo) {
  // declared before the first goto so that the error path can always inspect it
  SArray* colList = NULL;

  STagScanInfo*  pInfo = taosMemoryCalloc(1, sizeof(STagScanInfo));
  SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
  if (pInfo == NULL || pOperator == NULL) {
    goto _error;
  }

  SDataBlockDescNode* pDescNode = pPhyNode->node.pOutputDataBlockDesc;

  int32_t    num = 0;
  int32_t    numOfExprs = 0;
  SExprInfo* pExprInfo = createExprInfo(pPhyNode->pScanPseudoCols, NULL, &numOfExprs);
  colList = extractColMatchInfo(pPhyNode->pScanPseudoCols, pDescNode, &num, COL_MATCH_FROM_COL_ID);

  int32_t code = initExprSupp(&pOperator->exprSupp, pExprInfo, numOfExprs);
  if (code != TSDB_CODE_SUCCESS) {
    // NOTE(review): ownership of pExprInfo after a failed initExprSupp() is not visible here - confirm
    goto _error;
  }

  pInfo->pTableList = pTableListInfo;
  pInfo->pColMatchInfo = colList;
  pInfo->pRes = createResDataBlock(pDescNode);
  pInfo->readHandle = *pReadHandle;
  pInfo->curPos = 0;

  pOperator->name = "TagScanOperator";
  pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN;

  pOperator->blocking = false;
  pOperator->status = OP_NOT_OPENED;
  pOperator->info = pInfo;
  pOperator->pTaskInfo = pTaskInfo;

  initResultSizeInfo(pOperator, 4096);
  blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity);

  pOperator->fpSet =
      createOperatorFpSet(operatorDummyOpenFn, doTagScan, NULL, NULL, destroyTagScanOperatorInfo, NULL, NULL, NULL);

  return pOperator;

_error:
  if (colList != NULL) {
    taosArrayDestroy(colList);
  }
  taosMemoryFree(pInfo);
  taosMemoryFree(pOperator);
  terrno = TSDB_CODE_OUT_OF_MEMORY;
  return NULL;
}
1993 1994

typedef struct STableMergeScanInfo {
1995
  STableListInfo* tableListInfo;
S
slzhou 已提交
1996 1997 1998 1999
  int32_t         tableStartIndex;
  int32_t         tableEndIndex;
  bool            hasGroupId;
  uint64_t        groupId;
2000

2001 2002 2003 2004 2005 2006 2007 2008 2009 2010 2011 2012
  SArray*     dataReaders;  // array of tsdbReaderT*
  SReadHandle readHandle;

  int32_t  bufPageSize;
  uint32_t sortBufSize;  // max buffer size for in-memory sort

  SArray*      pSortInfo;
  SSortHandle* pSortHandle;

  SSDataBlock* pSortInputBlock;
  int64_t      startTs;  // sort start time

2013 2014 2015
  SArray*  sortSourceParams;
  uint64_t queryId;
  uint64_t taskId;
2016 2017 2018 2019 2020 2021 2022 2023 2024

  SFileBlockLoadRecorder readRecorder;
  int64_t                numOfRows;
  //  int32_t         prevGroupId;  // previous table group id
  SScanInfo       scanInfo;
  int32_t         scanTimes;
  SNode*          pFilterNode;  // filter info, which is push down by optimizer
  SqlFunctionCtx* pCtx;         // which belongs to the direct upstream operator operator query context
  SResultRowInfo* pResultRowInfo;
2025
  int32_t*        rowEntryInfoOffset;
2026 2027 2028 2029 2030 2031 2032 2033
  SExprInfo*      pExpr;
  SSDataBlock*    pResBlock;
  SArray*         pColMatchInfo;
  int32_t         numOfOutput;

  SExprInfo*      pPseudoExpr;
  int32_t         numOfPseudoExpr;
  SqlFunctionCtx* pPseudoCtx;
2034
  //  int32_t*        rowEntryInfoOffset;
2035 2036 2037 2038 2039 2040 2041 2042 2043 2044

  SQueryTableDataCond cond;
  int32_t             scanFlag;  // table scan flag to denote if it is a repeat/reverse/main scan
  int32_t             dataBlockLoadFlag;
  SInterval interval;  // if the upstream is an interval operator, the interval info is also kept here to get the time
                       // window to check if current data block needs to be loaded.

  SSampleExecInfo sample;  // sample execution info
} STableMergeScanInfo;

2045
int32_t createScanTableListInfo(STableScanPhysiNode* pTableScanNode, SReadHandle* pHandle,
H
Haojun Liao 已提交
2046
                                STableListInfo* pTableListInfo, uint64_t queryId, uint64_t taskId) {
wmmhello's avatar
wmmhello 已提交
2047
  int32_t code = getTableList(pHandle->meta, &pTableScanNode->scan, pTableListInfo);
2048
  if (code != TSDB_CODE_SUCCESS) {
2049
    return code;
2050 2051 2052 2053
  }

  if (taosArrayGetSize(pTableListInfo->pTableList) == 0) {
    qDebug("no table qualified for query, TID:0x%" PRIx64 ", QID:0x%" PRIx64, taskId, queryId);
2054 2055
    return TSDB_CODE_SUCCESS;
  }
2056
  code = generateGroupIdMap(pTableListInfo, pHandle, pTableScanNode->pGroupTags);
2057
  if (code != TSDB_CODE_SUCCESS) {
wmmhello's avatar
wmmhello 已提交
2058
    return code;
2059 2060 2061 2062 2063
  }

  return TSDB_CODE_SUCCESS;
}

S
slzhou 已提交
2064 2065 2066 2067 2068 2069 2070 2071 2072 2073 2074 2075 2076 2077 2078 2079
/*
 * Open one data reader per table for the tables at indexes tableStartIdx..tableEndIdx
 * (both inclusive) of pTableListInfo->pTableList and push each reader into arrayReader.
 *
 * Returns TSDB_CODE_SUCCESS when every reader was opened. On failure an error code is returned
 * (terrno if it is set, otherwise TSDB_CODE_OUT_OF_MEMORY); readers opened before the failure
 * stay in arrayReader and remain the caller's responsibility.
 */
int32_t createMultipleDataReaders(SQueryTableDataCond* pQueryCond, SReadHandle* pHandle, STableListInfo* pTableListInfo,
                                  int32_t tableStartIdx, int32_t tableEndIdx, SArray* arrayReader, uint64_t queryId,
                                  uint64_t taskId) {
  for (int32_t i = tableStartIdx; i <= tableEndIdx; ++i) {
    // each reader is opened on a single-table list
    SArray* subTableList = taosArrayInit(1, sizeof(STableKeyInfo));
    if (subTableList == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }
    taosArrayPush(subTableList, taosArrayGet(pTableListInfo->pTableList, i));

    tsdbReaderT* pReader = tsdbReaderOpen(pHandle->vnode, pQueryCond, subTableList, queryId, taskId);
    taosArrayDestroy(subTableList);

    // do not hand a NULL reader to the caller
    if (pReader == NULL) {
      return (terrno != TSDB_CODE_SUCCESS) ? terrno : TSDB_CODE_OUT_OF_MEMORY;
    }
    taosArrayPush(arrayReader, &pReader);
  }

  return TSDB_CODE_SUCCESS;
}

H
Haojun Liao 已提交
2080
// todo refactor
2081 2082
/*
 * Load the current data block of reader `readerIdx` into pBlock, doing only as much work as the
 * required load level demands.
 *
 * The load level starts from the operator's dataBlockLoadFlag and is raised to
 * FUNC_DATA_REQUIRED_DATA_LOAD when a filter exists or the block overlaps the interval window.
 * The level finally used is written to *status:
 *   FILTEROUT   : nothing is loaded;
 *   NOT_LOAD    : nothing is loaded, the column data pointers of pBlock are reset to NULL;
 *   STATIS_LOAD : the block statistics are attached to pBlock->pBlockAgg; if not every column
 *                 has statistics, the level falls back to DATA_LOAD;
 *   DATA_LOAD   : the column data is retrieved and relocated, pseudo columns are added and the
 *                 filter is applied.
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_OUT_OF_MEMORY if the statistics pointer array cannot be
 * allocated, or terrno if the data block cannot be retrieved.
 */
static int32_t loadDataBlockFromOneTable(SOperatorInfo* pOperator, STableMergeScanInfo* pTableScanInfo,
                                         int32_t readerIdx, SSDataBlock* pBlock, uint32_t* status) {
  SExecTaskInfo*       pTaskInfo = pOperator->pTaskInfo;
  STableMergeScanInfo* pInfo = pOperator->info;

  SFileBlockLoadRecorder* pCost = &pTableScanInfo->readRecorder;

  pCost->totalBlocks += 1;
  pCost->totalRows += pBlock->info.rows;

  *status = pInfo->dataBlockLoadFlag;
  if (pTableScanInfo->pFilterNode != NULL ||
      overlapWithTimeWindow(&pTableScanInfo->interval, &pBlock->info, pTableScanInfo->cond.order)) {
    (*status) = FUNC_DATA_REQUIRED_DATA_LOAD;
  }

  SDataBlockInfo* pBlockInfo = &pBlock->info;
  taosMemoryFreeClear(pBlock->pBlockAgg);

  if (*status == FUNC_DATA_REQUIRED_FILTEROUT) {
    qDebug("%s data block filter out, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo),
           pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows);
    pCost->filterOutBlocks += 1;
    return TSDB_CODE_SUCCESS;
  } else if (*status == FUNC_DATA_REQUIRED_NOT_LOAD) {
    qDebug("%s data block skipped, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo),
           pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows);
    pCost->skipBlocks += 1;

    // clear all data in pBlock that are set when handling the previous block
    int32_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
    for (int32_t i = 0; i < numOfCols; ++i) {
      SColumnInfoData* pcol = taosArrayGet(pBlock->pDataBlock, i);
      pcol->pData = NULL;
    }

    return TSDB_CODE_SUCCESS;
  } else if (*status == FUNC_DATA_REQUIRED_STATIS_LOAD) {
    pCost->loadBlockStatis += 1;

    bool             allColumnsHaveAgg = true;
    SColumnDataAgg** pColAgg = NULL;
    tsdbReaderT*     reader = taosArrayGetP(pTableScanInfo->dataReaders, readerIdx);
    tsdbRetrieveDataBlockStatisInfo(reader, &pColAgg, &allColumnsHaveAgg);

    if (allColumnsHaveAgg) {
      int32_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);

      // todo create this buffer during creating operator
      if (pBlock->pBlockAgg == NULL) {
        pBlock->pBlockAgg = taosMemoryCalloc(numOfCols, POINTER_BYTES);
        if (pBlock->pBlockAgg == NULL) {
          return TSDB_CODE_OUT_OF_MEMORY;
        }
      }

      // attach the statistics of every output column at its target slot
      for (int32_t i = 0; i < numOfCols; ++i) {
        SColMatchInfo* pColMatchInfo = taosArrayGet(pTableScanInfo->pColMatchInfo, i);
        if (!pColMatchInfo->output) {
          continue;
        }
        pBlock->pBlockAgg[pColMatchInfo->targetSlotId] = pColAgg[i];
      }

      return TSDB_CODE_SUCCESS;
    } else {  // failed to load the block sma data, data block statistics does not exist, load data block instead
      *status = FUNC_DATA_REQUIRED_DATA_LOAD;
    }
  }

  ASSERT(*status == FUNC_DATA_REQUIRED_DATA_LOAD);

  // todo filter data block according to the block sma data firstly

  pCost->totalCheckedRows += pBlock->info.rows;
  pCost->loadBlocks += 1;

  tsdbReaderT* reader = taosArrayGetP(pTableScanInfo->dataReaders, readerIdx);
  SArray*      pCols = tsdbRetrieveDataBlock(reader, NULL);
  if (pCols == NULL) {
    return terrno;
  }

  relocateColumnData(pBlock, pTableScanInfo->pColMatchInfo, pCols, true);

  // currently only the tbname pseudo column
  if (pTableScanInfo->numOfPseudoExpr > 0) {
    addTagPseudoColumnData(&pTableScanInfo->readHandle, pTableScanInfo->pPseudoExpr, pTableScanInfo->numOfPseudoExpr,
                           pBlock);
  }

  // apply the pushed-down filter and account the time spent in it
  int64_t st = taosGetTimestampMs();
  doFilter(pTableScanInfo->pFilterNode, pBlock);

  int64_t et = taosGetTimestampMs();
  pTableScanInfo->readRecorder.filterTime += (et - st);

  if (pBlock->info.rows == 0) {
    pCost->filterOutBlocks += 1;
    qDebug("%s data block filter out, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo),
           pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows);
  }

  return TSDB_CODE_SUCCESS;
}

typedef struct STableMergeScanSortSourceParam {
  SOperatorInfo* pOperator;
  int32_t        readerIdx;
2195
  SSDataBlock*   inputBlock;
2196 2197 2198 2199 2200 2201
} STableMergeScanSortSourceParam;

static SSDataBlock* getTableDataBlock(void* param) {
  STableMergeScanSortSourceParam* source = param;
  SOperatorInfo*                  pOperator = source->pOperator;
  int32_t                         readerIdx = source->readerIdx;
2202
  SSDataBlock*                    pBlock = source->inputBlock;
2203 2204 2205 2206
  STableMergeScanInfo*            pTableScanInfo = pOperator->info;

  int64_t st = taosGetTimestampUs();

2207 2208
  blockDataCleanup(pBlock);

2209 2210 2211 2212 2213 2214 2215 2216 2217 2218 2219 2220
  tsdbReaderT* reader = taosArrayGetP(pTableScanInfo->dataReaders, readerIdx);
  while (tsdbNextDataBlock(reader)) {
    if (isTaskKilled(pOperator->pTaskInfo)) {
      longjmp(pOperator->pTaskInfo->env, TSDB_CODE_TSC_QUERY_CANCELLED);
    }

    // process this data block based on the probabilities
    bool processThisBlock = processBlockWithProbability(&pTableScanInfo->sample);
    if (!processThisBlock) {
      continue;
    }

2221 2222 2223 2224 2225 2226 2227
    blockDataCleanup(pBlock);
    SDataBlockInfo binfo = pBlock->info;
    tsdbRetrieveDataBlockInfo(reader, &binfo);

    binfo.capacity = binfo.rows;
    blockDataEnsureCapacity(pBlock, binfo.capacity);
    pBlock->info = binfo;
2228 2229 2230 2231 2232 2233 2234 2235 2236 2237 2238 2239 2240 2241 2242 2243 2244 2245 2246 2247 2248 2249 2250 2251 2252 2253 2254 2255 2256 2257 2258 2259 2260 2261 2262 2263 2264 2265

    uint32_t status = 0;
    int32_t  code = loadDataBlockFromOneTable(pOperator, pTableScanInfo, readerIdx, pBlock, &status);
    //    int32_t  code = loadDataBlockOnDemand(pOperator->pRuntimeEnv, pTableScanInfo, pBlock, &status);
    if (code != TSDB_CODE_SUCCESS) {
      longjmp(pOperator->pTaskInfo->env, code);
    }

    // current block is filter out according to filter condition, continue load the next block
    if (status == FUNC_DATA_REQUIRED_FILTEROUT || pBlock->info.rows == 0) {
      continue;
    }

    uint64_t* groupId = taosHashGet(pOperator->pTaskInfo->tableqinfoList.map, &pBlock->info.uid, sizeof(int64_t));
    if (groupId) {
      pBlock->info.groupId = *groupId;
    }

    pOperator->resultInfo.totalRows = pTableScanInfo->readRecorder.totalRows;
    pTableScanInfo->readRecorder.elapsedTime += (taosGetTimestampUs() - st) / 1000.0;

    return pBlock;
  }
  return NULL;
}

SArray* generateSortByTsInfo(int32_t order) {
  SArray*         pList = taosArrayInit(1, sizeof(SBlockOrderInfo));
  SBlockOrderInfo bi = {0};
  bi.order = order;
  bi.slotId = 0;
  bi.nullFirst = NULL_ORDER_FIRST;

  taosArrayPush(pList, &bi);

  return pList;
}

2266
// Starts the merge scan of one table group.
//
// Expects pInfo->tableStartIndex and pInfo->groupId to describe the first table of the group.
// The function
//   1. extends the range to the last consecutive table carrying the same group id and stores it
//      in pInfo->tableEndIndex,
//   2. creates the data readers for [tableStartIndex, tableEndIndex] into pInfo->dataReaders,
//   3. creates a multi-source merge sort handle with one source per reader, each source being
//      fed by getTableDataBlock(), and opens it.
//
// Returns TSDB_CODE_SUCCESS. Failures (out of memory, tsortOpen error) leave through longjmp()
// on the task environment.
int32_t startGroupTableMergeScan(SOperatorInfo* pOperator) {
  STableMergeScanInfo* pInfo = pOperator->info;
  SExecTaskInfo*       pTaskInfo = pOperator->pTaskInfo;
  STableListInfo*      tableListInfo = pInfo->tableListInfo;

  // find the last table that still belongs to the current group
  size_t  tableListSize = taosArrayGetSize(tableListInfo->pTableList);
  int32_t next = pInfo->tableStartIndex + 1;
  for (; next < tableListSize; ++next) {
    STableKeyInfo* tableKeyInfo = taosArrayGet(tableListInfo->pTableList, next);
    if (tableKeyInfo->groupId != pInfo->groupId) {
      break;
    }
  }
  pInfo->tableEndIndex = next - 1;

  int32_t tableStartIdx = pInfo->tableStartIndex;
  int32_t tableEndIdx = pInfo->tableEndIndex;

  createMultipleDataReaders(&pInfo->cond, &pInfo->readHandle, tableListInfo, tableStartIdx, tableEndIdx,
                            pInfo->dataReaders, pInfo->queryId, pInfo->taskId);

  // todo the total available buffer should be determined by total capacity of buffer of this task.
  // one page per table of the group, the additional one is reserved for merge result
  pInfo->sortBufSize = pInfo->bufPageSize * (tableEndIdx - tableStartIdx + 1 + 1);
  int32_t numOfBufPage = pInfo->sortBufSize / pInfo->bufPageSize;

  pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, SORT_MULTISOURCE_MERGE, pInfo->bufPageSize, numOfBufPage,
                                             pInfo->pSortInputBlock, pTaskInfo->id.str);
  if (pInfo->pSortHandle == NULL) {
    longjmp(pTaskInfo->env, TSDB_CODE_OUT_OF_MEMORY);
  }

  tsortSetFetchRawDataFp(pInfo->pSortHandle, getTableDataBlock, NULL, NULL);

  size_t numReaders = taosArrayGetSize(pInfo->dataReaders);

  // First register every per-reader parameter ...
  for (int32_t i = 0; i < numReaders; ++i) {
    STableMergeScanSortSourceParam param = {0};
    param.readerIdx = i;
    param.pOperator = pOperator;
    param.inputBlock = createOneDataBlock(pInfo->pResBlock, false);
    taosArrayPush(pInfo->sortSourceParams, &param);
  }

  // ... and only then hand out pointers to the stored elements. The two passes are kept separate
  // so that no element address is taken while the array is still growing (presumably a push may
  // move the array storage -- confirm against the SArray implementation).
  for (int32_t i = 0; i < numReaders; ++i) {
    SSortSource* ps = taosMemoryCalloc(1, sizeof(SSortSource));
    if (ps == NULL) {
      longjmp(pTaskInfo->env, TSDB_CODE_OUT_OF_MEMORY);
    }

    ps->param = taosArrayGet(pInfo->sortSourceParams, i);
    tsortAddSource(pInfo->pSortHandle, ps);
  }

  int32_t code = tsortOpen(pInfo->pSortHandle);
  if (code != TSDB_CODE_SUCCESS) {
    // prefer terrno as before, but never jump with a zero value: fall back to the returned code
    longjmp(pTaskInfo->env, (terrno != TSDB_CODE_SUCCESS) ? terrno : code);
  }

  return TSDB_CODE_SUCCESS;
}

// Tears down the per-group state created by startGroupTableMergeScan(): destroys the sort
// handle, forgets the sort source parameters and cleans up every data reader of the group.
//
// Always returns TSDB_CODE_SUCCESS.
int32_t stopGroupTableMergeScan(SOperatorInfo* pOperator) {
  STableMergeScanInfo* pInfo = pOperator->info;

  // NOTE(review): pInfo->pSortHandle keeps pointing at the destroyed handle after this call, and
  // getTableMergeScanExplainExecInfo() reads it -- confirm the explain path cannot run after stop.
  tsortDestroySortHandle(pInfo->pSortHandle);

  // NOTE(review): the inputBlock created for each parameter in startGroupTableMergeScan() is not
  // released here; verify whether the sort handle owns it, otherwise it leaks once per group.
  taosArrayClear(pInfo->sortSourceParams);

  size_t numReaders = taosArrayGetSize(pInfo->dataReaders);
  for (size_t i = 0; i < numReaders; ++i) {
    tsdbReaderT* reader = taosArrayGetP(pInfo->dataReaders, i);
    tsdbCleanupReadHandle(reader);
  }

  // Only empty the reader list. The array is allocated once when the operator is created and
  // startGroupTableMergeScan() passes the same array to createMultipleDataReaders() again for
  // the next group, so destroying it here would leave a dangling pointer behind.
  // NOTE(review): the array itself still has to be released when the operator is destroyed.
  taosArrayClear(pInfo->dataReaders);

  return TSDB_CODE_SUCCESS;
}

2339
// Collects up to `capacity` sorted rows from the sort handle into one data block.
//
// pHandle:   sort handle to read tuples from.
// capacity:  row capacity ensured on the result block; reading stops once the block holds at
//            least this many rows.
// pOperator: only used to obtain the task id for the debug log.
//
// Returns the block obtained from tsortGetSortedDataBlock() when it received at least one row,
// NULL when that call returned NULL or no tuple was available.
SSDataBlock* getSortedTableMergeScanBlockData(SSortHandle* pHandle, int32_t capacity, SOperatorInfo* pOperator) {
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;

  SSDataBlock* pSorted = tsortGetSortedDataBlock(pHandle);
  if (pSorted == NULL) {
    return NULL;
  }

  blockDataEnsureCapacity(pSorted, capacity);

  // at least one tuple is requested, the capacity is only checked after a row was appended
  for (STupleHandle* pTuple = tsortNextTuple(pHandle); pTuple != NULL; pTuple = tsortNextTuple(pHandle)) {
    appendOneRowToDataBlock(pSorted, pTuple);
    if (pSorted->info.rows >= capacity) {
      break;
    }
  }

  qDebug("%s get sorted row blocks, rows:%d", GET_TASKID(pTaskInfo), pSorted->info.rows);

  if (pSorted->info.rows > 0) {
    return pSorted;
  }
  return NULL;
}

// Produces the next result block of the table merge scan operator.
//
// Tables are processed group by group: the range [tableStartIndex, tableEndIndex] of the table
// list is merged through a sort handle; when that group has no more rows it is stopped and the
// group beginning at tableEndIndex + 1 is started. Every returned block carries the id of the
// group it was read from.
//
// Returns NULL when the operator is already done, when the table list is empty, or after the
// last group has been drained (the operator is marked completed in the latter two cases).
// Leaves through longjmp() on the task environment if the open function fails.
SSDataBlock* doTableMergeScan(SOperatorInfo* pOperator) {
  if (pOperator->status == OP_EXEC_DONE) {
    return NULL;
  }

  STableMergeScanInfo* pInfo = pOperator->info;
  SExecTaskInfo*       pTaskInfo = pOperator->pTaskInfo;

  int32_t openCode = pOperator->fpSet._openFn(pOperator);
  if (openCode != TSDB_CODE_SUCCESS) {
    longjmp(pTaskInfo->env, openCode);
  }

  SArray* pTableList = pInfo->tableListInfo->pTableList;
  size_t  numOfTables = taosArrayGetSize(pTableList);

  // first invocation: set up the first group
  if (!pInfo->hasGroupId) {
    pInfo->hasGroupId = true;

    if (numOfTables == 0) {
      doSetOperatorCompleted(pOperator);
      return NULL;
    }

    pInfo->tableStartIndex = 0;
    STableKeyInfo* pFirst = taosArrayGet(pTableList, pInfo->tableStartIndex);
    pInfo->groupId = pFirst->groupId;
    startGroupTableMergeScan(pOperator);
  }

  while (pInfo->tableStartIndex < numOfTables) {
    SSDataBlock* pRes = getSortedTableMergeScanBlockData(pInfo->pSortHandle, pOperator->resultInfo.capacity, pOperator);
    if (pRes != NULL) {
      pRes->info.groupId = pInfo->groupId;
      pOperator->resultInfo.totalRows += pRes->info.rows;
      return pRes;
    }

    // current group is exhausted
    stopGroupTableMergeScan(pOperator);
    if (pInfo->tableEndIndex >= numOfTables - 1) {
      doSetOperatorCompleted(pOperator);
      return NULL;
    }

    // move on to the group that starts right behind the one just finished
    pInfo->tableStartIndex = pInfo->tableEndIndex + 1;
    STableKeyInfo* pNext = taosArrayGet(pTableList, pInfo->tableStartIndex);
    pInfo->groupId = pNext->groupId;
    startGroupTableMergeScan(pOperator);
  }

  return NULL;
}

void destroyTableMergeScanOperatorInfo(void* param, int32_t numOfOutput) {
  STableMergeScanInfo* pTableScanInfo = (STableMergeScanInfo*)param;
2415
  cleanupQueryTableDataCond(&pTableScanInfo->cond);
2416 2417 2418 2419 2420 2421 2422 2423 2424 2425 2426

  if (pTableScanInfo->pColMatchInfo != NULL) {
    taosArrayDestroy(pTableScanInfo->pColMatchInfo);
  }

  pTableScanInfo->pResBlock = blockDataDestroy(pTableScanInfo->pResBlock);
  pTableScanInfo->pSortInputBlock = blockDataDestroy(pTableScanInfo->pSortInputBlock);

  taosArrayDestroy(pTableScanInfo->pSortInfo);
}

2427 2428
typedef struct STableMergeScanExecInfo {
  SFileBlockLoadRecorder blockRecorder;
L
Liu Jicong 已提交
2429
  SSortExecInfo          sortExecInfo;
2430 2431
} STableMergeScanExecInfo;

2432 2433
// Explain callback of the table merge scan operator.
//
// pOptr:        the operator, must not be NULL.
// pOptrExplain: receives a newly allocated STableMergeScanExecInfo holding the block load
//               recorder and the sort execution info; set to NULL on failure.
// len:          receives the size of that struct in bytes; set to 0 on failure.
//
// Returns TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY when the result cannot be allocated.
int32_t getTableMergeScanExplainExecInfo(SOperatorInfo* pOptr, void** pOptrExplain, uint32_t* len) {
  ASSERT(pOptr != NULL);
  STableMergeScanInfo* pInfo = pOptr->info;

  // TODO: merge these two info into one struct
  STableMergeScanExecInfo* execInfo = taosMemoryCalloc(1, sizeof(STableMergeScanExecInfo));
  if (execInfo == NULL) {
    *pOptrExplain = NULL;
    *len = 0;
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  execInfo->blockRecorder = pInfo->readRecorder;

  // The sort handle is only created once a group scan has been started; until then it is NULL
  // and the zero-initialized sort info is reported.
  if (pInfo->pSortHandle != NULL) {
    execInfo->sortExecInfo = tsortGetSortExecInfo(pInfo->pSortHandle);
  }

  *pOptrExplain = execInfo;
  *len = sizeof(STableMergeScanExecInfo);

  return TSDB_CODE_SUCCESS;
}

S
slzhou 已提交
2446 2447 2448 2449 2450 2451
// Sort comparator ordering STableKeyInfo entries by ascending groupId.
//
// p1, p2: pointers to the two STableKeyInfo elements to compare.
// Returns -1, 0 or 1 when the group id of p1 is smaller than, equal to or greater than p2's.
//
// The ids are compared explicitly rather than subtracted: a difference that does not fit the
// int32_t return type would be truncated and could report the wrong order, or equality for
// distinct ids.
int32_t compareTableKeyInfoByGid(const void* p1, const void* p2) {
  const STableKeyInfo* info1 = p1;
  const STableKeyInfo* info2 = p2;

  if (info1->groupId < info2->groupId) {
    return -1;
  }
  if (info1->groupId > info2->groupId) {
    return 1;
  }
  return 0;
}

2452 2453 2454
// Creates the table merge scan operator.
//
// pTableScanNode: physical plan node describing the scan (columns, pseudo columns, conditions,
//                 scan sequence, sample ratio, required data, optional group tags).
// pTableListInfo: tables to scan; its table list is sorted by group id in place when the plan
//                 node has group tags.
// readHandle:     read handle, copied into the operator info.
// pTaskInfo:      owning task; receives the error code on failure.
// queryId/taskId: stored in the operator info and later passed on when the data readers are
//                 created.
//
// Returns the new operator, or NULL on failure with pTaskInfo->code set to the failure code
// (TSDB_CODE_OUT_OF_MEMORY for allocation failures, otherwise the code reported by
// initQueryTableDataCond()).
SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanNode, STableListInfo* pTableListInfo,
                                                SReadHandle* readHandle, SExecTaskInfo* pTaskInfo, uint64_t queryId,
                                                uint64_t taskId) {
  int32_t code = TSDB_CODE_OUT_OF_MEMORY;
  SArray* pColList = NULL;

  STableMergeScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STableMergeScanInfo));
  SOperatorInfo*       pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
  if (pInfo == NULL || pOperator == NULL) {
    goto _error;
  }

  // tables of the same group must be adjacent: the scan walks the list one group range at a time
  if (pTableScanNode->pGroupTags) {
    taosArraySort(pTableListInfo->pTableList, compareTableKeyInfoByGid);
  }

  SDataBlockDescNode* pDescNode = pTableScanNode->scan.node.pOutputDataBlockDesc;

  int32_t numOfCols = 0;
  pColList = extractColMatchInfo(pTableScanNode->scan.pScanCols, pDescNode, &numOfCols, COL_MATCH_FROM_COL_ID);

  code = initQueryTableDataCond(&pInfo->cond, pTableScanNode);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  if (pTableScanNode->scan.pScanPseudoCols != NULL) {
    pInfo->pPseudoExpr = createExprInfo(pTableScanNode->scan.pScanPseudoCols, NULL, &pInfo->numOfPseudoExpr);
    pInfo->pPseudoCtx = createSqlFunctionCtx(pInfo->pPseudoExpr, pInfo->numOfPseudoExpr, &pInfo->rowEntryInfoOffset);
  }

  pInfo->scanInfo = (SScanInfo){.numOfAsc = pTableScanNode->scanSeq[0], .numOfDesc = pTableScanNode->scanSeq[1]};

  pInfo->readHandle = *readHandle;
  pInfo->interval = extractIntervalInfo(pTableScanNode);
  pInfo->sample.sampleRatio = pTableScanNode->ratio;
  pInfo->sample.seed = taosGetTimestampSec();
  pInfo->dataBlockLoadFlag = pTableScanNode->dataRequired;
  pInfo->pFilterNode = pTableScanNode->scan.node.pConditions;
  pInfo->tableListInfo = pTableListInfo;
  pInfo->scanFlag = MAIN_SCAN;
  pInfo->pColMatchInfo = pColList;  // owned by the operator info from here on

  pInfo->pResBlock = createResDataBlock(pDescNode);
  pInfo->dataReaders = taosArrayInit(64, POINTER_BYTES);
  pInfo->queryId = queryId;
  pInfo->taskId = taskId;

  pInfo->sortSourceParams = taosArrayInit(64, sizeof(STableMergeScanSortSourceParam));

  pInfo->pSortInfo = generateSortByTsInfo(pInfo->cond.order);
  pInfo->pSortInputBlock = createOneDataBlock(pInfo->pResBlock, false);

  int32_t rowSize = pInfo->pResBlock->info.rowSize;
  pInfo->bufPageSize = getProperSortPageSize(rowSize);

  pOperator->name = "TableMergeScanOperator";
  pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN;
  pOperator->blocking = false;
  pOperator->status = OP_NOT_OPENED;
  pOperator->info = pInfo;
  pOperator->exprSupp.numOfExprs = numOfCols;
  pOperator->pTaskInfo = pTaskInfo;
  initResultSizeInfo(pOperator, 1024);

  pOperator->fpSet =
      createOperatorFpSet(operatorDummyOpenFn, doTableMergeScan, NULL, NULL, destroyTableMergeScanOperatorInfo, NULL,
                          NULL, getTableMergeScanExplainExecInfo);
  pOperator->cost.openCost = 0;
  return pOperator;

_error:
  // report the real failure reason instead of unconditionally claiming out-of-memory
  pTaskInfo->code = code;

  // the column match list is not yet reachable through pInfo on this path, release it here
  if (pColList != NULL) {
    taosArrayDestroy(pColList);
  }
  taosMemoryFree(pInfo);
  taosMemoryFree(pOperator);
  return NULL;
}