scanoperator.c 96.2 KB
Newer Older
H
Haojun Liao 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

16
#include "executorimpl.h"
17
#include "function.h"
18
#include "functionMgt.h"
H
Haojun Liao 已提交
19 20
#include "os.h"
#include "querynodes.h"
21
#include "systable.h"
H
Haojun Liao 已提交
22
#include "tname.h"
23
#include "ttime.h"
H
Haojun Liao 已提交
24 25 26 27 28 29 30 31 32

#include "tdatablock.h"
#include "tmsg.h"

#include "executorimpl.h"
#include "query.h"
#include "tcompare.h"
#include "thash.h"
#include "ttypes.h"
33
#include "vnode.h"
H
Haojun Liao 已提交
34 35

#define SET_REVERSE_SCAN_FLAG(_info) ((_info)->scanFlag = REVERSE_SCAN)
36
#define SWITCH_ORDER(n)              (((n) = ((n) == TSDB_ORDER_ASC) ? TSDB_ORDER_DESC : TSDB_ORDER_ASC))
37

38
static int32_t buildSysDbTableInfo(const SSysTableScanInfo* pInfo, int32_t capacity);
L
Liu Jicong 已提交
39 40
static int32_t buildDbTableInfoBlock(const SSDataBlock* p, const SSysTableMeta* pSysDbTableMeta, size_t size,
                                     const char* dbName);
41

42 43 44
static void addTagPseudoColumnData(SReadHandle* pHandle, SExprInfo* pPseudoExpr, int32_t numOfPseudoExpr,
                                   SSDataBlock* pBlock);
static bool processBlockWithProbability(const SSampleExecInfo* pInfo);
45

46
/*
 * Decide whether the current data block takes part in the scan when sampling is requested.
 *
 * The ratio/seed based selection is compiled out below, so every block is accepted.
 *
 * @param pInfo  sampling settings; only read by the disabled code path
 * @return always true while the sampling logic stays disabled
 */
bool processBlockWithProbability(const SSampleExecInfo* pInfo) {
#if 0
  if (pInfo->sampleRatio == 1) {
    return true;
  }

  uint32_t val = taosRandR((uint32_t*) &pInfo->seed);
  return (val % ((uint32_t)(1/pInfo->sampleRatio))) == 0;
#endif

  return true;
}

59
/*
 * Toggle the order field (ascending <-> descending) of the first numOfOutput function contexts.
 *
 * @param pCtx         array of function contexts
 * @param numOfOutput  number of entries of pCtx to update; nothing happens when it is <= 0
 */
static void switchCtxOrder(SqlFunctionCtx* pCtx, int32_t numOfOutput) {
  SqlFunctionCtx* pCur = pCtx;

  for (int32_t remaining = numOfOutput; remaining > 0; --remaining, ++pCur) {
    SWITCH_ORDER(pCur->order);
  }
}

/*
 * Placeholder for adjusting the per-table query range before a reverse scan.
 *
 * The whole body is compiled out with "#if 0", so calling this function currently does nothing.
 * The disabled code refers to names (pRuntimeEnv, pQueryAttr) that are not parameters of this function.
 *
 * @param pTableScanInfo  table scan state; unused while the body is disabled
 */
static void setupQueryRangeForReverseScan(STableScanInfo* pTableScanInfo) {
#if 0
  int32_t numOfGroups = (int32_t)(GET_NUM_OF_TABLEGROUP(pRuntimeEnv));
  for(int32_t i = 0; i < numOfGroups; ++i) {
    SArray *group = GET_TABLEGROUP(pRuntimeEnv, i);
    SArray *tableKeyGroup = taosArrayGetP(pQueryAttr->tableGroupInfo.pGroupList, i);

    size_t t = taosArrayGetSize(group);
    for (int32_t j = 0; j < t; ++j) {
      STableQueryInfo *pCheckInfo = taosArrayGetP(group, j);
      updateTableQueryInfoForReverseScan(pCheckInfo);

      // update the last key in tableKeyInfo list, the tableKeyInfo is used to build the tsdbQueryHandle and decide
      // the start check timestamp of tsdbQueryHandle
//      STableKeyInfo *pTableKeyInfo = taosArrayGet(tableKeyGroup, j);
//      pTableKeyInfo->lastKey = pCheckInfo->lastKey;
//
//      assert(pCheckInfo->pTable == pTableKeyInfo->pTable);
    }
  }
#endif
}

88 89 90 91 92 93 94 95 96
/*
 * Move the time window *tw one step forward in the scan direction.
 *
 * For every interval unit other than 'n' and 'y' the start key moves by sliding * direction and the
 * end key becomes skey + interval - 1. For the units 'n' and 'y' the step is done on the broken-down
 * local time in whole months ('y' counts as 12 months per interval unit); note that this branch steps
 * by pInterval->interval, not by pInterval->sliding.
 *
 * @param pInterval  interval definition (interval, sliding, unit, precision)
 * @param tw         [in/out] window to advance
 * @param order      scan order, mapped to a direction factor by GET_FORWARD_DIRECTION_FACTOR
 */
static void getNextTimeWindow(SInterval* pInterval, STimeWindow* tw, int32_t order) {
  const int32_t direction = GET_FORWARD_DIRECTION_FACTOR(order);
  const bool    monthBased = (pInterval->intervalUnit == 'n') || (pInterval->intervalUnit == 'y');

  if (!monthBased) {
    tw->skey += pInterval->sliding * direction;
    tw->ekey = tw->skey + pInterval->interval - 1;
    return;
  }

  // length of one window expressed in months
  int64_t months = pInterval->interval;
  if (pInterval->intervalUnit == 'y') {
    months *= 12;
  }

  // start key converted to seconds
  int64_t seconds = convertTimePrecision(tw->skey, pInterval->precision, TSDB_TIME_PRECISION_MILLI) / 1000;

  time_t    ts = (time_t)seconds;
  struct tm tm;
  taosLocalTime(&ts, &tm);

  // new window start
  int monthIndex = (int)(tm.tm_year * 12 + tm.tm_mon + months * direction);
  tm.tm_year = monthIndex / 12;
  tm.tm_mon = monthIndex % 12;
  tw->skey = convertTimePrecision((int64_t)taosMktime(&tm) * 1000LL, TSDB_TIME_PRECISION_MILLI, pInterval->precision);

  // new window end: one tick before the start of the following window
  monthIndex = (int)(monthIndex + months);
  tm.tm_year = monthIndex / 12;
  tm.tm_mon = monthIndex % 12;
  tw->ekey =
      convertTimePrecision((int64_t)taosMktime(&tm) * 1000LL, TSDB_TIME_PRECISION_MILLI, pInterval->precision) - 1;
}

121
/*
 * Check whether a data block spans more than one interval window.
 *
 * Starting from the window aligned to the first key in scan direction, the windows are walked with
 * getNextTimeWindow() until they leave the block range.
 *
 * @param pInterval   interval definition; an interval of 0 means no interval is in effect
 * @param pBlockInfo  block whose [window.skey, window.ekey] range is examined
 * @param order       scan order (TSDB_ORDER_ASC, anything else is handled as descending)
 * @return true when a window boundary falls inside the block range, false otherwise
 */
static bool overlapWithTimeWindow(SInterval* pInterval, SDataBlockInfo* pBlockInfo, int32_t order) {
  // 0 by default, which means there is no interval operator upstream
  if (pInterval->interval == 0) {
    return false;
  }

  const STimeWindow* pRange = &pBlockInfo->window;
  STimeWindow        win = {0};

  if (order != TSDB_ORDER_ASC) {
    getAlignQueryTimeWindow(pInterval, pInterval->precision, pRange->ekey, &win);
    assert(win.skey <= pRange->ekey);

    if (win.skey > pRange->skey) {
      return true;
    }

    for (;;) {
      getNextTimeWindow(pInterval, &win, order);
      if (win.ekey < pRange->skey) {
        return false;
      }

      assert(win.skey < pRange->skey);
      if (win.ekey < pRange->ekey && win.ekey >= pRange->skey) {
        return true;
      }
    }
  }

  getAlignQueryTimeWindow(pInterval, pInterval->precision, pRange->skey, &win);
  assert(win.ekey >= pRange->skey);

  if (win.ekey < pRange->ekey) {
    return true;
  }

  for (;;) {
    getNextTimeWindow(pInterval, &win, order);
    if (win.skey > pRange->ekey) {
      return false;
    }

    assert(win.ekey > pRange->ekey);
    if (win.skey <= pRange->ekey && win.skey > pRange->skey) {
      return true;
    }
  }
}

L
Liu Jicong 已提交
172 173
/*
 * Load the reader's current block into pBlock according to the data requirement of the scan.
 *
 * The requirement starts as the operator's dataBlockLoadFlag and is raised to
 * FUNC_DATA_REQUIRED_DATA_LOAD when a filter node exists or overlapWithTimeWindow() reports that the
 * block crosses an interval window boundary. Depending on the final requirement the block is
 * filtered out, skipped, served from the block statistics, or fully loaded and then filtered.
 *
 * @param pOperator       operator owning the scan; supplies the task info and the load flag
 * @param pTableScanInfo  scan state (reader, filter node, column match info, cost recorder)
 * @param pBlock          result block; pBlock->info must already describe the current block
 * @param status          [out] the data requirement that was finally applied
 * @return TSDB_CODE_SUCCESS on success, TSDB_CODE_OUT_OF_MEMORY when the statistics pointer array
 *         cannot be allocated, or terrno when tsdbRetrieveDataBlock() yields no columns
 */
static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableScanInfo, SSDataBlock* pBlock,
                             uint32_t* status) {
  SExecTaskInfo*  pTaskInfo = pOperator->pTaskInfo;
  STableScanInfo* pInfo = pOperator->info;

  SFileBlockLoadRecorder* pCost = &pTableScanInfo->readRecorder;

  pCost->totalBlocks += 1;
  pCost->totalRows += pBlock->info.rows;

  *status = pInfo->dataBlockLoadFlag;
  if (pTableScanInfo->pFilterNode != NULL ||
      overlapWithTimeWindow(&pTableScanInfo->interval, &pBlock->info, pTableScanInfo->cond.order)) {
    (*status) = FUNC_DATA_REQUIRED_DATA_LOAD;
  }

  SDataBlockInfo* pBlockInfo = &pBlock->info;

  // statistics of the previous block must not leak into this one
  taosMemoryFreeClear(pBlock->pBlockAgg);

  if (*status == FUNC_DATA_REQUIRED_FILTEROUT) {
    qDebug("%s data block filter out, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo),
           pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows);
    pCost->filterOutBlocks += 1;
    return TSDB_CODE_SUCCESS;
  } else if (*status == FUNC_DATA_REQUIRED_NOT_LOAD) {
    qDebug("%s data block skipped, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo),
           pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows);
    pCost->skipBlocks += 1;

    // clear all data in pBlock that were set when handling the previous block
    size_t numOfBlockCols = taosArrayGetSize(pBlock->pDataBlock);
    for (int32_t i = 0; i < numOfBlockCols; ++i) {
      SColumnInfoData* pcol = taosArrayGet(pBlock->pDataBlock, i);
      pcol->pData = NULL;
    }

    return TSDB_CODE_SUCCESS;
  } else if (*status == FUNC_DATA_REQUIRED_STATIS_LOAD) {
    pCost->loadBlockStatis += 1;

    bool             allColumnsHaveAgg = true;
    SColumnDataAgg** pColAgg = NULL;
    tsdbRetrieveDataBlockStatisInfo(pTableScanInfo->dataReader, &pColAgg, &allColumnsHaveAgg);

    if (allColumnsHaveAgg == true) {
      int32_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);

      // todo create this buffer during creating operator
      if (pBlock->pBlockAgg == NULL) {
        pBlock->pBlockAgg = taosMemoryCalloc(numOfCols, POINTER_BYTES);
        // a failed allocation must not be written through below
        if (pBlock->pBlockAgg == NULL && numOfCols > 0) {
          return TSDB_CODE_OUT_OF_MEMORY;
        }
      }

      size_t numOfMatch = taosArrayGetSize(pTableScanInfo->pColMatchInfo);
      for (int32_t i = 0; i < numOfMatch; ++i) {
        SColMatchInfo* pColMatchInfo = taosArrayGet(pTableScanInfo->pColMatchInfo, i);
        if (!pColMatchInfo->output) {
          continue;
        }
        pBlock->pBlockAgg[pColMatchInfo->targetSlotId] = pColAgg[i];
      }

      return TSDB_CODE_SUCCESS;
    } else {  // failed to load the block sma data, data block statistics does not exist, load data block instead
      *status = FUNC_DATA_REQUIRED_DATA_LOAD;
    }
  }

  ASSERT(*status == FUNC_DATA_REQUIRED_DATA_LOAD);

  // todo filter data block according to the block sma data firstly
#if 0
  if (!doFilterByBlockStatistics(pBlock->pBlockStatis, pTableScanInfo->pCtx, pBlockInfo->rows)) {
    pCost->filterOutBlocks += 1;
    qDebug("%s data block filter out, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo), pBlockInfo->window.skey,
           pBlockInfo->window.ekey, pBlockInfo->rows);
    (*status) = FUNC_DATA_REQUIRED_FILTEROUT;
    return TSDB_CODE_SUCCESS;
  }
#endif

  pCost->totalCheckedRows += pBlock->info.rows;
  pCost->loadBlocks += 1;

  SArray* pCols = tsdbRetrieveDataBlock(pTableScanInfo->dataReader, NULL);
  if (pCols == NULL) {
    return terrno;
  }

  relocateColumnData(pBlock, pTableScanInfo->pColMatchInfo, pCols, true);

  // fill the pseudo columns (table name / tags) requested by the scan
  if (pTableScanInfo->pseudoSup.numOfExprs > 0) {
    SExprSupp* pSup = &pTableScanInfo->pseudoSup;
    addTagPseudoColumnData(&pTableScanInfo->readHandle, pSup->pExprInfo, pSup->numOfExprs, pBlock);
  }

  // apply the filter and account the time it takes
  int64_t st = taosGetTimestampMs();
  doFilter(pTableScanInfo->pFilterNode, pBlock);

  int64_t et = taosGetTimestampMs();
  pTableScanInfo->readRecorder.filterTime += (et - st);

  if (pBlock->info.rows == 0) {
    pCost->filterOutBlocks += 1;
    qDebug("%s data block filter out, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo),
           pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows);
  }

  return TSDB_CODE_SUCCESS;
}

281
/*
 * Switch the table scan into descending mode.
 *
 * Marks the scan as a reverse scan, flips the order of the given function contexts, sets the query
 * condition order to TSDB_ORDER_DESC, swaps skey/ekey of every query time window and finally
 * re-sorts the windows with compareTimeWindow.
 *
 * @param pTableScanInfo  scan state to update
 * @param pCtx            function contexts whose order is flipped
 * @param numOfOutput     number of entries in pCtx
 */
static void prepareForDescendingScan(STableScanInfo* pTableScanInfo, SqlFunctionCtx* pCtx, int32_t numOfOutput) {
  SQueryTableDataCond* pCond = &pTableScanInfo->cond;

  SET_REVERSE_SCAN_FLAG(pTableScanInfo);
  switchCtxOrder(pCtx, numOfOutput);
  // setupQueryRangeForReverseScan() is intentionally not invoked here

  pCond->order = TSDB_ORDER_DESC;

  for (int32_t idx = 0; idx < pCond->numOfTWindows; ++idx) {
    STimeWindow* pWin = &pCond->twindows[idx];
    TSWAP(pWin->skey, pWin->ekey);
  }

  taosqsort(pCond->twindows, pCond->numOfTWindows, sizeof(STimeWindow), pCond, compareTimeWindow);
}

297 298
/*
 * Fill the pseudo columns (table name and tag values) of a data block.
 *
 * The table entry of pBlock->info.uid is looked up once; then, for each pseudo expression, the
 * destination column is sized for pBlock->info.rows rows and filled with the same value on every row.
 *
 * @param pHandle          read handle providing the meta store
 * @param pPseudoExpr      pseudo column expressions
 * @param numOfPseudoExpr  number of expressions; the function returns immediately when it is 0
 * @param pBlock           block whose pseudo columns are written
 */
void addTagPseudoColumnData(SReadHandle* pHandle, SExprInfo* pPseudoExpr, int32_t numOfPseudoExpr,
                            SSDataBlock* pBlock) {
  if (numOfPseudoExpr == 0) {
    return;
  }

  SMetaReader reader = {0};
  metaReaderInit(&reader, pHandle->meta, 0);
  metaGetTableEntryByUid(&reader, pBlock->info.uid);

  for (int32_t exprIdx = 0; exprIdx < numOfPseudoExpr; ++exprIdx) {
    SExprInfo*       pExpr = pPseudoExpr + exprIdx;
    SColumnInfoData* pDst = taosArrayGet(pBlock->pDataBlock, pExpr->base.resSchema.slotId);

    colInfoDataEnsureCapacity(pDst, pBlock->info.rows);
    colInfoDataCleanup(pDst, pBlock->info.rows);

    int32_t funcId = pExpr->pExpr->_function.functionId;

    // scan pseudo column functions (e.g. tbname) are produced by the scalar function itself
    if (fmIsScanPseudoColumnFunc(funcId)) {
      setTbNameColData(pHandle->meta, pBlock, pDst, funcId);
      continue;
    }

    // everything else is a tag value
    STagVal tagVal = {0};
    tagVal.cid = pExpr->base.pParam[0].pCol->colId;
    const char* pTag = metaGetTableTagVal(&reader.me, pDst->info.type, &tagVal);

    const bool convertible = (pDst->info.type != TSDB_DATA_TYPE_JSON) && (pTag != NULL);
    char*      pVal = convertible ? tTagValToData((const STagVal*)pTag, false) : (char*)pTag;

    for (int32_t row = 0; row < pBlock->info.rows; ++row) {
      colDataAppend(pDst, row, pVal,
                    (pVal == NULL) || (pDst->info.type == TSDB_DATA_TYPE_JSON && tTagIsJsonNull(pVal)));
    }

    // the converted value is released here only for variable-length tag types
    if (pVal && (pDst->info.type != TSDB_DATA_TYPE_JSON) && pTag != NULL &&
        IS_VAR_DATA_TYPE(((const STagVal*)pTag)->type)) {
      taosMemoryFree(pVal);
    }
  }

  metaReaderClear(&reader);
}

350 351 352 353
/*
 * Fill pColInfoData with the result of the scalar pseudo column function identified by functionId
 * (used for the table name column) for the table of pBlock.
 *
 * A temporary one-row BIGINT column holding pBlock->info.uid is passed to the function as its input;
 * the function writes pBlock->info.rows rows into pColInfoData.
 *
 * @param pMeta         meta store handle forwarded to the scalar function through the input parameter
 * @param pBlock        block supplying the table uid and the number of rows
 * @param pColInfoData  destination column
 * @param functionId    id of the scalar function to run
 */
void setTbNameColData(void* pMeta, const SSDataBlock* pBlock, SColumnInfoData* pColInfoData, int32_t functionId) {
  struct SScalarFuncExecFuncs fpSet = {0};
  fmGetScalarFuncExecFuncs(functionId, &fpSet);

  SColumnInfoData infoData = createColumnInfoData(TSDB_DATA_TYPE_BIGINT, sizeof(uint64_t), 1);
  colInfoDataEnsureCapacity(&infoData, 1);

  colDataAppendInt64(&infoData, 0, (int64_t*)&pBlock->info.uid);
  SScalarParam srcParam = {.numOfRows = pBlock->info.rows, .param = pMeta, .columnData = &infoData};

  SScalarParam param = {.columnData = pColInfoData};
  fpSet.process(&srcParam, 1, &param);

  // release the buffer of the temporary input column; it was leaked on every call before
  colDataDestroy(&infoData);
}

364
/*
 * Return the next non-empty data block of the current reader position.
 *
 * Blocks are fetched from the reader one by one; blocks rejected by sampling, filtered out by
 * loadDataBlock() or left with zero rows are skipped. When the task has been killed, or when
 * loadDataBlock() fails, control leaves through longjmp on the task's jump buffer.
 *
 * @param pOperator  table scan operator
 * @return the operator's result block filled with the next block, or NULL when the reader is exhausted
 */
static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) {
  STableScanInfo*         pScan = pOperator->info;
  SExecTaskInfo*          pTask = pOperator->pTaskInfo;
  SSDataBlock*            pRes = pScan->pResBlock;
  SFileBlockLoadRecorder* pRecorder = &pScan->readRecorder;

  const int64_t startUs = taosGetTimestampUs();

  for (;;) {
    if (!tsdbNextDataBlock(pScan->dataReader)) {
      return NULL;
    }

    if (isTaskKilled(pTask)) {
      longjmp(pTask->env, TSDB_CODE_TSC_QUERY_CANCELLED);
    }

    // process this data block based on the probabilities
    if (!processBlockWithProbability(&pScan->sample)) {
      continue;
    }

    blockDataCleanup(pRes);

    // fetch the block description and size the result block accordingly
    SDataBlockInfo info = pRes->info;
    tsdbRetrieveDataBlockInfo(pScan->dataReader, &info);
    info.capacity = info.rows;

    blockDataEnsureCapacity(pRes, info.rows);
    pRes->info = info;
    ASSERT(info.uid != 0);

    uint32_t loadStatus = 0;
    int32_t  rc = loadDataBlock(pOperator, pScan, pRes, &loadStatus);
    if (rc != TSDB_CODE_SUCCESS) {
      longjmp(pOperator->pTaskInfo->env, rc);
    }

    // current block is filtered out according to the filter condition, continue with the next block
    if (loadStatus == FUNC_DATA_REQUIRED_FILTEROUT || pRes->info.rows == 0) {
      continue;
    }

    uint64_t* pGroupId = taosHashGet(pTask->tableqinfoList.map, &pRes->info.uid, sizeof(int64_t));
    if (pGroupId) {
      pRes->info.groupId = *pGroupId;
    }

    pOperator->resultInfo.totalRows = pRecorder->totalRows;
    pRecorder->elapsedTime += (taosGetTimestampUs() - startUs) / 1000.0;
    pOperator->cost.totalCost = pRecorder->elapsedTime;

    // todo refactor
    pScan->lastStatus.uid = pRes->info.uid;
    pScan->lastStatus.ts = pRes->info.window.ekey;

    ASSERT(pRes->info.uid != 0);
    return pRes;
  }
}

wmmhello's avatar
wmmhello 已提交
424
/*
 * Produce the next data block for the current table (group), running the configured number of
 * ascending scans first and the descending scans afterwards.
 *
 * Every scan round walks all query time windows; the reader is reset when moving to the next window
 * and when a new round starts. Progress is kept in pTableScanInfo (scanTimes, curTWinIdx), so
 * the function resumes where it left off on the next call.
 *
 * @param pOperator  table scan operator
 * @return the next data block, or NULL when all rounds are finished, the reader does not exist or
 *         the operator is already done
 */
static SSDataBlock* doTableScanGroup(SOperatorInfo* pOperator) {
  STableScanInfo*      pScan = pOperator->info;
  SExecTaskInfo*       pTask = pOperator->pTaskInfo;
  SQueryTableDataCond* pCond = &pScan->cond;

  // The read handle is not initialized yet, since no qualified tables exist
  if (pScan->dataReader == NULL || pOperator->status == OP_EXEC_DONE) {
    return NULL;
  }

  // ascending rounds come first
  while (pScan->scanTimes < pScan->scanInfo.numOfAsc) {
    for (;;) {
      if (pScan->curTWinIdx >= pCond->numOfTWindows) {
        break;
      }

      SSDataBlock* pRes = doTableScanImpl(pOperator);
      if (pRes != NULL) {
        ASSERT(pRes->info.uid != 0);
        return pRes;
      }

      // current window exhausted, move the reader to the next one
      pScan->curTWinIdx += 1;
      if (pScan->curTWinIdx < pCond->numOfTWindows) {
        tsdbReaderReset(pScan->dataReader, pCond, pScan->curTWinIdx);
      }
    }

    pScan->scanTimes += 1;
    if (pScan->scanTimes >= pScan->scanInfo.numOfAsc) {
      continue;  // no further ascending round, the loop condition ends the phase
    }

    setTaskStatus(pTask, TASK_NOT_COMPLETED);
    pScan->scanFlag = REPEAT_SCAN;
    qDebug("%s start to repeat ascending order scan data blocks due to query func required", GET_TASKID(pTask));
    for (int32_t w = 0; w < pCond->numOfTWindows; ++w) {
      STimeWindow* pWin = &pCond->twindows[w];
      qDebug("%s qrange:%" PRId64 "-%" PRId64, GET_TASKID(pTask), pWin->skey, pWin->ekey);
    }

    // do prepare for the next round table scan operation
    tsdbReaderReset(pScan->dataReader, pCond, 0);
    pScan->curTWinIdx = 0;
  }

  const int32_t totalRounds = pScan->scanInfo.numOfAsc + pScan->scanInfo.numOfDesc;
  if (pScan->scanTimes >= totalRounds) {
    return NULL;
  }

  // first entry into the descending phase: flip the scan direction and restart the reader
  if (pCond->order == TSDB_ORDER_ASC) {
    prepareForDescendingScan(pScan, pScan->pCtx, 0);
    tsdbReaderReset(pScan->dataReader, pCond, 0);
    pScan->curTWinIdx = 0;
  }

  qDebug("%s start to descending order scan data blocks due to query func required", GET_TASKID(pTask));
  for (int32_t w = 0; w < pCond->numOfTWindows; ++w) {
    STimeWindow* pWin = &pCond->twindows[w];
    qDebug("%s qrange:%" PRId64 "-%" PRId64, GET_TASKID(pTask), pWin->skey, pWin->ekey);
  }

  while (pScan->scanTimes < totalRounds) {
    for (;;) {
      if (pScan->curTWinIdx >= pCond->numOfTWindows) {
        break;
      }

      SSDataBlock* pRes = doTableScanImpl(pOperator);
      if (pRes != NULL) {
        return pRes;
      }

      pScan->curTWinIdx += 1;
      if (pScan->curTWinIdx < pCond->numOfTWindows) {
        tsdbReaderReset(pScan->dataReader, pCond, pScan->curTWinIdx);
      }
    }

    pScan->scanTimes += 1;
    if (pScan->scanTimes >= totalRounds) {
      continue;  // last descending round done
    }

    setTaskStatus(pTask, TASK_NOT_COMPLETED);
    pScan->scanFlag = REPEAT_SCAN;

    qDebug("%s start to repeat descending order scan data blocks due to query func required",
           GET_TASKID(pTask));
    for (int32_t w = 0; w < pCond->numOfTWindows; ++w) {
      STimeWindow* pWin = &pCond->twindows[w];
      qDebug("%s qrange:%" PRId64 "-%" PRId64, GET_TASKID(pTask), pWin->skey, pWin->ekey);
    }

    tsdbReaderReset(pScan->dataReader, pCond, 0);
    pScan->curTWinIdx = 0;
  }

  return NULL;
}

/*
 * Entry point of the table scan operator: return the next data block.
 *
 * In TABLE_SCAN__TABLE_ORDER mode the tables of the task are scanned one after another with the same
 * reader. Otherwise the scan works on table groups: on the first call the reader is re-opened for
 * the first group, and once a group yields no more data the next group is tried.
 *
 * If the reader cannot be opened, control leaves through longjmp on the task's jump buffer with the
 * error code returned by tsdbReaderOpen().
 *
 * @param pOperator  table scan operator
 * @return the next data block, or NULL when there is nothing more to return
 */
static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
  STableScanInfo* pInfo = pOperator->info;
  SExecTaskInfo*  pTaskInfo = pOperator->pTaskInfo;

  // if scan table by table
  if (pInfo->scanMode == TABLE_SCAN__TABLE_ORDER) {
    if (pInfo->noTable) return NULL;
    while (1) {
      SSDataBlock* result = doTableScanGroup(pOperator);
      if (result) {
        return result;
      }
      // if no data, switch to next table and continue scan
      pInfo->currentTable++;
      if (pInfo->currentTable >= taosArrayGetSize(pTaskInfo->tableqinfoList.pTableList)) {
        return NULL;
      }
      STableKeyInfo* pTableInfo = taosArrayGet(pTaskInfo->tableqinfoList.pTableList, pInfo->currentTable);
      tsdbSetTableId(pInfo->dataReader, pTableInfo->uid);
      tsdbReaderReset(pInfo->dataReader, &pInfo->cond, 0);
      pInfo->scanTimes = 0;
      pInfo->curTWinIdx = 0;
    }
  }

  if (pInfo->currentGroupId == -1) {
    pInfo->currentGroupId++;
    if (pInfo->currentGroupId >= taosArrayGetSize(pTaskInfo->tableqinfoList.pGroupList)) {
      setTaskStatus(pTaskInfo, TASK_COMPLETED);
      return NULL;
    }

    SArray* tableList = taosArrayGetP(pTaskInfo->tableqinfoList.pGroupList, pInfo->currentGroupId);

    // drop the old reader and forget it, so a failed open below cannot leave a dangling pointer
    tsdbReaderClose(pInfo->dataReader);
    pInfo->dataReader = NULL;

    int32_t code = tsdbReaderOpen(pInfo->readHandle.vnode, &pInfo->cond, tableList, (STsdbReader**)&pInfo->dataReader,
                                  GET_TASKID(pTaskInfo));
    if (code != TSDB_CODE_SUCCESS) {
      // without a reader the scan cannot continue; report the failure instead of ignoring it
      longjmp(pTaskInfo->env, code);
    }
  }

  SSDataBlock* result = doTableScanGroup(pOperator);
  if (result) {
    return result;
  }

  pInfo->currentGroupId++;
  if (pInfo->currentGroupId >= taosArrayGetSize(pTaskInfo->tableqinfoList.pGroupList)) {
    setTaskStatus(pTaskInfo, TASK_COMPLETED);
    return NULL;
  }

  // NOTE(review): the reader is only reset here; the table list of the new group is not handed to it
  // (the former tsdbSetTableList(pInfo->dataReader, tableList) call is disabled) - confirm this is intended.
  tsdbReaderReset(pInfo->dataReader, &pInfo->cond, 0);
  pInfo->curTWinIdx = 0;
  pInfo->scanTimes = 0;

  result = doTableScanGroup(pOperator);
  if (result) {
    return result;
  }

  setTaskStatus(pTaskInfo, TASK_COMPLETED);
  return NULL;
}

576 577
static int32_t getTableScannerExecInfo(struct SOperatorInfo* pOptr, void** pOptrExplain, uint32_t* len) {
  SFileBlockLoadRecorder* pRecorder = taosMemoryCalloc(1, sizeof(SFileBlockLoadRecorder));
578
  STableScanInfo*         pTableScanInfo = pOptr->info;
579 580 581 582 583 584
  *pRecorder = pTableScanInfo->readRecorder;
  *pOptrExplain = pRecorder;
  *len = sizeof(SFileBlockLoadRecorder);
  return 0;
}

585 586
static void destroyTableScanOperatorInfo(void* param, int32_t numOfOutput) {
  STableScanInfo* pTableScanInfo = (STableScanInfo*)param;
H
Haojun Liao 已提交
587
  blockDataDestroy(pTableScanInfo->pResBlock);
588
  cleanupQueryTableDataCond(&pTableScanInfo->cond);
H
Haojun Liao 已提交
589

H
refact  
Hongze Cheng 已提交
590
  tsdbReaderClose(pTableScanInfo->dataReader);
591 592 593 594 595 596

  if (pTableScanInfo->pColMatchInfo != NULL) {
    taosArrayDestroy(pTableScanInfo->pColMatchInfo);
  }
}

wmmhello's avatar
wmmhello 已提交
597
SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, SReadHandle* readHandle,
598
                                           SExecTaskInfo* pTaskInfo) {
H
Haojun Liao 已提交
599 600 601
  STableScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STableScanInfo));
  SOperatorInfo*  pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
  if (pInfo == NULL || pOperator == NULL) {
602
    goto _error;
H
Haojun Liao 已提交
603 604
  }

605
  SDataBlockDescNode* pDescNode = pTableScanNode->scan.node.pOutputDataBlockDesc;
606
  int32_t             numOfCols = 0;
607
  SArray* pColList = extractColMatchInfo(pTableScanNode->scan.pScanCols, pDescNode, &numOfCols, COL_MATCH_FROM_COL_ID);
L
Liu Jicong 已提交
608

609 610
  int32_t code = initQueryTableDataCond(&pInfo->cond, pTableScanNode);
  if (code != TSDB_CODE_SUCCESS) {
611
    goto _error;
612 613 614
  }

  if (pTableScanNode->scan.pScanPseudoCols != NULL) {
615
    SExprSupp* pSup = &pInfo->pseudoSup;
616 617
    pSup->pExprInfo = createExprInfo(pTableScanNode->scan.pScanPseudoCols, NULL, &pSup->numOfExprs);
    pSup->pCtx = createSqlFunctionCtx(pSup->pExprInfo, pSup->numOfExprs, &pSup->rowEntryInfoOffset);
618 619
  }

620
  pInfo->scanInfo = (SScanInfo){.numOfAsc = pTableScanNode->scanSeq[0], .numOfDesc = pTableScanNode->scanSeq[1]};
621
  //    pInfo->scanInfo = (SScanInfo){.numOfAsc = 0, .numOfDesc = 1}; // for debug purpose
622

623 624 625 626
  pInfo->readHandle = *readHandle;
  pInfo->interval = extractIntervalInfo(pTableScanNode);
  pInfo->sample.sampleRatio = pTableScanNode->ratio;
  pInfo->sample.seed = taosGetTimestampSec();
627

628
  pInfo->dataBlockLoadFlag = pTableScanNode->dataRequired;
629 630 631 632 633
  pInfo->pResBlock = createResDataBlock(pDescNode);
  pInfo->pFilterNode = pTableScanNode->scan.node.pConditions;
  pInfo->scanFlag = MAIN_SCAN;
  pInfo->pColMatchInfo = pColList;
  pInfo->curTWinIdx = 0;
wmmhello's avatar
wmmhello 已提交
634
  pInfo->currentGroupId = -1;
635 636

  pOperator->name = "TableScanOperator";  // for debug purpose
L
Liu Jicong 已提交
637
  pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN;
638 639 640
  pOperator->blocking = false;
  pOperator->status = OP_NOT_OPENED;
  pOperator->info = pInfo;
641
  pOperator->exprSupp.numOfExprs = numOfCols;
642
  pOperator->pTaskInfo = pTaskInfo;
643

644 645
  pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doTableScan, NULL, NULL, destroyTableScanOperatorInfo,
                                         NULL, NULL, getTableScannerExecInfo);
646 647 648

  // for non-blocking operator, the open cost is always 0
  pOperator->cost.openCost = 0;
H
Haojun Liao 已提交
649
  return pOperator;
650

651
_error:
652 653 654 655 656
  taosMemoryFreeClear(pInfo);
  taosMemoryFreeClear(pOperator);

  pTaskInfo->code = TSDB_CODE_QRY_OUT_OF_MEMORY;
  return NULL;
H
Haojun Liao 已提交
657 658
}

659
/*
 * Create a sequential table scan operator that reads directly from an existing reader via
 * doTableScanImpl().
 *
 * @param pReadHandle  reader stored as the operator's dataReader
 * @param pTaskInfo    task that owns the operator
 * @return the new operator, or NULL when memory cannot be allocated (pTaskInfo->code is then set to
 *         TSDB_CODE_OUT_OF_MEMORY)
 */
SOperatorInfo* createTableSeqScanOperatorInfo(void* pReadHandle, SExecTaskInfo* pTaskInfo) {
  STableScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STableScanInfo));
  SOperatorInfo*  pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
  if (pInfo == NULL || pOperator == NULL) {
    // either allocation may have succeeded; release whatever was obtained
    taosMemoryFreeClear(pInfo);
    taosMemoryFreeClear(pOperator);
    pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  pInfo->dataReader = pReadHandle;
  //  pInfo->prevGroupId       = -1;

  pOperator->name = "TableSeqScanOperator";
  pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_TABLE_SEQ_SCAN;
  pOperator->blocking = false;
  pOperator->status = OP_NOT_OPENED;
  pOperator->info = pInfo;
  pOperator->pTaskInfo = pTaskInfo;

  pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doTableScanImpl, NULL, NULL, NULL, NULL, NULL, NULL);
  return pOperator;
}

677 678
static int32_t doGetTableRowSize(void* pMeta, uint64_t uid) {
  int32_t rowLen = 0;
H
Haojun Liao 已提交
679

680
  SMetaReader mr = {0};
681 682
  metaReaderInit(&mr, pMeta, 0);
  metaGetTableEntryByUid(&mr, uid);
683 684
  if (mr.me.type == TSDB_SUPER_TABLE) {
    int32_t numOfCols = mr.me.stbEntry.schemaRow.nCols;
685
    for (int32_t i = 0; i < numOfCols; ++i) {
686 687 688 689 690 691 692
      rowLen += mr.me.stbEntry.schemaRow.pSchema[i].bytes;
    }
  } else if (mr.me.type == TSDB_CHILD_TABLE) {
    uint64_t suid = mr.me.ctbEntry.suid;
    metaGetTableEntryByUid(&mr, suid);
    int32_t numOfCols = mr.me.stbEntry.schemaRow.nCols;

693
    for (int32_t i = 0; i < numOfCols; ++i) {
694 695 696 697
      rowLen += mr.me.stbEntry.schemaRow.pSchema[i].bytes;
    }
  } else if (mr.me.type == TSDB_NORMAL_TABLE) {
    int32_t numOfCols = mr.me.ntbEntry.schemaRow.nCols;
698
    for (int32_t i = 0; i < numOfCols; ++i) {
699 700 701 702 703
      rowLen += mr.me.ntbEntry.schemaRow.pSchema[i].bytes;
    }
  }

  metaReaderClear(&mr);
704 705 706
  return rowLen;
}

707
/*
 * Produce the single result row of the block distribution scan.
 *
 * The file block distribution and the number of in-memory rows are collected from the reader,
 * serialized with tSerializeBlockDistInfo() and stored as one variable-length value in the output
 * column. The operator is marked OP_EXEC_DONE afterwards, so subsequent calls return NULL.
 *
 * If the serialization buffer cannot be allocated, control leaves through longjmp on the task's
 * jump buffer with TSDB_CODE_OUT_OF_MEMORY.
 *
 * @param pOperator  block distribution scan operator
 * @return the result block holding one row, or NULL when the operator is already done
 */
static SSDataBlock* doBlockInfoScan(SOperatorInfo* pOperator) {
  if (pOperator->status == OP_EXEC_DONE) {
    return NULL;
  }

  SBlockDistInfo* pBlockScanInfo = pOperator->info;

  STableBlockDistInfo blockDistInfo = {.minRows = INT_MAX, .maxRows = INT_MIN};
  blockDistInfo.rowSize = doGetTableRowSize(pBlockScanInfo->readHandle.meta, pBlockScanInfo->uid);

  tsdbGetFileBlocksDistInfo(pBlockScanInfo->pHandle, &blockDistInfo);
  blockDistInfo.numOfInmemRows = (int32_t)tsdbGetNumOfRowsInMemTable(pBlockScanInfo->pHandle);

  SSDataBlock* pBlock = pBlockScanInfo->pResBlock;

  int32_t          slotId = pOperator->exprSupp.pExprInfo->base.resSchema.slotId;
  SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, slotId);

  // first call only computes the serialized length, second call writes into the buffer
  int32_t len = tSerializeBlockDistInfo(NULL, 0, &blockDistInfo);
  char*   p = taosMemoryCalloc(1, len + VARSTR_HEADER_SIZE);
  if (p == NULL) {
    // writing through a failed allocation would crash; abort the task instead
    longjmp(pOperator->pTaskInfo->env, TSDB_CODE_OUT_OF_MEMORY);
  }

  tSerializeBlockDistInfo(varDataVal(p), len, &blockDistInfo);
  varDataSetLen(p, len);

  blockDataEnsureCapacity(pBlock, 1);
  colDataAppend(pColInfo, 0, p, false);
  taosMemoryFree(p);

  pBlock->info.rows = 1;

  pOperator->status = OP_EXEC_DONE;
  return pBlock;
}

740
// Close callback of the block-distribution scan operator: releases the result block held by
// the SBlockDistInfo passed in `param`. `numOfOutput` is not used.
static void destroyBlockDistScanOperatorInfo(void* param, int32_t numOfOutput) {
  SBlockDistInfo* pScanInfo = param;
  SSDataBlock*    pResult = pScanInfo->pResBlock;

  blockDataDestroy(pResult);
}

745 746
// Create the operator that reports the data block distribution of one table.
//   dataReader      reader handle stored as SBlockDistInfo.pHandle
//   readHandle      copied by value into the operator info
//   uid             uid of the table whose distribution is reported
//   pBlockScanNode  physical plan node; supplies the output block description and pseudo columns
//   pTaskInfo       owning task; receives the error code on failure
// Returns the new operator, or NULL on failure. On failure everything allocated here that this
// function can release is freed and pTaskInfo->code holds the reason.
SOperatorInfo* createDataBlockInfoScanOperator(void* dataReader, SReadHandle* readHandle, uint64_t uid,
                                               SBlockDistScanPhysiNode* pBlockScanNode, SExecTaskInfo* pTaskInfo) {
  SBlockDistInfo* pInfo = taosMemoryCalloc(1, sizeof(SBlockDistInfo));
  SOperatorInfo*  pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
  if (pInfo == NULL || pOperator == NULL) {
    pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY;
    goto _error;
  }

  pInfo->pHandle = dataReader;
  pInfo->readHandle = *readHandle;
  pInfo->uid = uid;
  pInfo->pResBlock = createResDataBlock(pBlockScanNode->node.pOutputDataBlockDesc);

  int32_t    numOfCols = 0;
  SExprInfo* pExprInfo = createExprInfo(pBlockScanNode->pScanPseudoCols, NULL, &numOfCols);
  int32_t    code = initExprSupp(&pOperator->exprSupp, pExprInfo, numOfCols);
  if (code != TSDB_CODE_SUCCESS) {
    // propagate the reason instead of failing silently
    // NOTE(review): ownership of pExprInfo after a failed initExprSupp is not visible here - confirm
    pTaskInfo->code = code;
    goto _error;
  }

  pOperator->name = "DataBlockDistScanOperator";
  pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN;
  pOperator->blocking = false;
  pOperator->status = OP_NOT_OPENED;
  pOperator->info = pInfo;
  pOperator->pTaskInfo = pTaskInfo;

  pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doBlockInfoScan, NULL, NULL,
                                         destroyBlockDistScanOperatorInfo, NULL, NULL, NULL);
  return pOperator;

_error:
  // the result block is owned by pInfo; release it before pInfo itself goes away
  if (pInfo != NULL && pInfo->pResBlock != NULL) {
    blockDataDestroy(pInfo->pResBlock);
    pInfo->pResBlock = NULL;
  }
  taosMemoryFreeClear(pInfo);
  taosMemoryFreeClear(pOperator);
  return NULL;
}

// Destroy every data block buffered in pInfo->pBlockLists, empty the list and rewind the
// read position (validBlockIndex) to the start.
static void doClearBufferedBlocks(SStreamBlockScanInfo* pInfo) {
  size_t numOfBlocks = taosArrayGetSize(pInfo->pBlockLists);

  pInfo->validBlockIndex = 0;

  size_t idx = 0;
  while (idx < numOfBlocks) {
    blockDataDestroy((SSDataBlock*)taosArrayGetP(pInfo->pBlockLists, idx));
    idx += 1;
  }

  taosArrayClear(pInfo->pBlockLists);
}

H
Haojun Liao 已提交
794
// True when the parent type recorded in sessionSup is the stream session window operator.
static bool isSessionWindow(SStreamBlockScanInfo* pInfo) {
  bool parentIsSession = (QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION == pInfo->sessionSup.parentType);
  return parentIsSession;
}

H
Haojun Liao 已提交
798
// True when the parent type recorded in sessionSup is the stream state window operator.
static bool isStateWindow(SStreamBlockScanInfo* pInfo) {
  bool parentIsState = (QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE == pInfo->sessionSup.parentType);
  return parentIsState;
}
5
54liuyao 已提交
801

802 803 804 805 806 807 808 809 810 811 812 813 814 815 816 817 818 819 820 821 822
// Look up the group id registered for table `uid` in the task's tableqinfoList.map.
// Returns the stored group id, or 0 when the uid has no entry in the map.
static uint64_t getGroupId(SOperatorInfo* pOperator, uint64_t uid) {
  /* Todo(liuyao) for partition by column
  recordNewGroupKeys(pTableScanInfo->pGroupCols, pTableScanInfo->pGroupColVals, pBlock, rowId);
  int32_t len = buildGroupKeys(pTableScanInfo->keyBuf, pTableScanInfo->pGroupColVals);
  uint64_t resId = 0;
  uint64_t* groupId = taosHashGet(pTableScanInfo->pGroupSet, pTableScanInfo->keyBuf, len);
  if (groupId) {
    return *groupId;
  } else if (len != 0) {
    resId = calcGroupId(pTableScanInfo->keyBuf, len);
    taosHashPut(pTableScanInfo->pGroupSet, pTableScanInfo->keyBuf, len, &resId, sizeof(uint64_t));
  }
  return resId;
  */
  uint64_t* pGroupId = taosHashGet(pOperator->pTaskInfo->tableqinfoList.map, &uid, sizeof(int64_t));
  return (pGroupId != NULL) ? *pGroupId : 0;
}

823 824 825 826
// For STREAM_DELETE_DATA and STREAM_RETRIEVE blocks, copy the value found at `rowIndex` of the
// uint64 column `groupColIndex` into pInfo->groupId. Other block types leave pInfo untouched.
static void setGroupId(SStreamBlockScanInfo* pInfo, SSDataBlock* pBlock, int32_t groupColIndex, int32_t rowIndex) {
  ASSERT(rowIndex < pBlock->info.rows);

  bool carriesGroupCol = (pBlock->info.type == STREAM_DELETE_DATA) || (pBlock->info.type == STREAM_RETRIEVE);
  if (!carriesGroupCol) {
    return;
  }

  SColumnInfoData* pGroupCol = taosArrayGet(pBlock->pDataBlock, groupColIndex);
  const uint64_t*  groupIds = (uint64_t*)pGroupCol->pData;
  pInfo->groupId = groupIds[rowIndex];
}

5
54liuyao 已提交
839
// Pick the next time window that has to be re-read from the snapshot reader and program it
// into the table-scan operator behind pInfo->pSnapshotReadOp.
//   pSDB        block whose timestamp column (tsColIndex) drives the window selection
//   pRowIndex   in/out cursor into pSDB; advanced past the rows covered by the chosen window
// Window selection:
//   - session window parent: window returned by getSessionTimeWindow for the current row
//   - state window parent:   next entry of pStreamAggSup->pScanWindow (list is cleared when exhausted)
//   - otherwise:             the interval window containing the current row
// Returns true when a window was set up and a scan should follow, false when nothing is left.
static bool prepareDataScan(SStreamBlockScanInfo* pInfo, SSDataBlock* pSDB, int32_t tsColIndex, int32_t* pRowIndex) {
  STimeWindow win = {
      .skey = INT64_MIN,
      .ekey = INT64_MAX,
  };
  bool needRead = false;

  if (!isStateWindow(pInfo) && (*pRowIndex) < pSDB->info.rows) {
    SColumnInfoData* pColDataInfo = taosArrayGet(pSDB->pDataBlock, tsColIndex);
    TSKEY*           tsCols = (TSKEY*)pColDataInfo->pData;

    if (isSessionWindow(pInfo)) {
      SStreamAggSupporter* pAggSup = pInfo->sessionSup.pStreamAggSup;
      int64_t              gap = pInfo->sessionSup.gap;
      int32_t              winIndex = 0;
      SResultWindowInfo*   pCurWin =
          getSessionTimeWindow(pAggSup, tsCols[*pRowIndex], INT64_MIN, pSDB->info.groupId, gap, &winIndex);
      win = pCurWin->win;
      (*pRowIndex) += updateSessionWindowInfo(pCurWin, tsCols, NULL, pSDB->info.rows, *pRowIndex, gap, NULL);
    } else {
      // zero the whole struct: previously only cur.pageId was set and the remaining fields were
      // handed to getActiveTimeWindow uninitialized
      SResultRowInfo dumyInfo = {0};
      dumyInfo.cur.pageId = -1;
      win =
          getActiveTimeWindow(NULL, &dumyInfo, tsCols[*pRowIndex], &pInfo->interval, pInfo->interval.precision, NULL);
      setGroupId(pInfo, pSDB, GROUPID_COLUMN_INDEX, *pRowIndex);
      (*pRowIndex) += getNumOfRowsInTimeWindow(&pSDB->info, tsCols, *pRowIndex, win.ekey, binarySearchForKey, NULL,
                                               TSDB_ORDER_ASC);
    }
    needRead = true;
  } else if (isStateWindow(pInfo)) {
    SArray* pWins = pInfo->sessionSup.pStreamAggSup->pScanWindow;
    int32_t size = taosArrayGetSize(pWins);
    if (pInfo->scanWinIndex < size) {
      win = *(STimeWindow*)taosArrayGet(pWins, pInfo->scanWinIndex);
      pInfo->scanWinIndex++;
      needRead = true;
    } else {
      // every pending window has been handed out: reset for the next round
      pInfo->scanWinIndex = 0;
      taosArrayClear(pWins);
    }
  }

  if (!needRead) {
    return false;
  }

  STableScanInfo* pTableScanInfo = pInfo->pSnapshotReadOp->info;
  pTableScanInfo->cond.twindows[0] = win;
  pTableScanInfo->curTWinIdx = 0;
  //  tsdbReaderReset(pTableScanInfo->dataReader, &pTableScanInfo->cond, 0);
  // if (!pTableScanInfo->dataReader) {
  //   return false;
  // }
  pTableScanInfo->scanTimes = 0;
  pTableScanInfo->currentGroupId = -1;
  return true;
}

893
// Append row `sourceRowId` of `source` to the end of `dest`, column by column (NULL cells stay
// NULL), then bump dest->info.rows. Column j of `source` is written to column j of `dest`.
static void copyOneRow(SSDataBlock* dest, SSDataBlock* source, int32_t sourceRowId) {
  int32_t numOfCols = (int32_t)taosArrayGetSize(source->pDataBlock);
  int32_t destRow = dest->info.rows;

  for (int32_t col = 0; col < numOfCols; ++col) {
    SColumnInfoData* pTo = (SColumnInfoData*)taosArrayGet(dest->pDataBlock, col);
    SColumnInfoData* pFrom = (SColumnInfoData*)taosArrayGet(source->pDataBlock, col);

    if (!colDataIsNull_s(pFrom, sourceRowId)) {
      colDataAppend(pTo, destRow, colDataGetData(pFrom, sourceRowId), false);
      continue;
    }
    colDataAppendNULL(pTo, destRow);
  }

  dest->info.rows = destRow + 1;
}

5
54liuyao 已提交
906
// Pull blocks from the snapshot table scan until one belongs to pInfo->groupId.
// When the scan of the current window is exhausted, prepareDataScan() is asked for the next
// window derived from pSDB; NULL is returned once no block and no further window is available.
static SSDataBlock* doDataScan(SStreamBlockScanInfo* pInfo, SSDataBlock* pSDB, int32_t tsColIndex, int32_t* pRowIndex) {
  /* Todo(liuyao) for partition by column
    SSDataBlock* pBlock = createOneDataBlock(pResult, true);
    blockDataCleanup(pResult);
    for (int32_t i = 0; i < pBlock->info.rows; i++) {
      uint64_t id = getGroupId(pInfo->pOperatorDumy, pBlock->info.uid);
      if (id == pInfo->groupId) {
        copyOneRow(pResult, pBlock, i);
      }
    }
    return pResult;
  */
  for (;;) {
    SSDataBlock* pScanned = doTableScan(pInfo->pSnapshotReadOp);

    // current window drained: move on to the next window, if any, and scan it
    if (pScanned == NULL && prepareDataScan(pInfo, pSDB, tsColIndex, pRowIndex)) {
      pScanned = doTableScan(pInfo->pSnapshotReadOp);
    }

    if (pScanned == NULL) {
      return NULL;
    }

    if (pInfo->groupId == pScanned->info.groupId) {
      return pScanned;
    }
  }
}

938 939 940 941 942 943 944 945 946 947 948 949 950 951 952 953 954 955 956 957 958 959 960 961 962 963 964 965 966 967 968 969 970 971
// Expand delete ranges into one (timestamp, group id) row per interval step.
//   pDelBlock   source block: start ts, end ts and table uid columns (at least 3 columns)
//   pOperator   operator whose task maps table uid -> group id (see getGroupId)
//   pUpdateRes  destination block; cleaned first, then filled starting at row 0
// Processing resumes at pInfo->deleteDataIndex and advances it per consumed source row. Once
// every source row has been consumed pDelBlock is cleaned and the index is reset to 0.
// Does nothing when pDelBlock has no rows.
static void copyDeleteDataBlock(SStreamBlockScanInfo* pInfo, SSDataBlock* pDelBlock, SOperatorInfo* pOperator, SSDataBlock* pUpdateRes) {
  if (pDelBlock->info.rows == 0) {
    return;
  }
  blockDataCleanup(pUpdateRes);
  blockDataEnsureCapacity(pUpdateRes, 64);
  ASSERT(taosArrayGetSize(pDelBlock->pDataBlock) >= 3);
  SColumnInfoData* pStartTsCol = taosArrayGet(pDelBlock->pDataBlock, START_TS_COLUMN_INDEX);
  TSKEY*           startData = (TSKEY*)pStartTsCol->pData;
  SColumnInfoData* pEndTsCol = taosArrayGet(pDelBlock->pDataBlock, END_TS_COLUMN_INDEX);
  TSKEY*           endData = (TSKEY*)pEndTsCol->pData;
  SColumnInfoData* pGpCol = taosArrayGet(pDelBlock->pDataBlock, UID_COLUMN_INDEX);
  uint64_t*        uidCol = (uint64_t*)pGpCol->pData;

  SColumnInfoData* pDestTsCol = taosArrayGet(pUpdateRes->pDataBlock, START_TS_COLUMN_INDEX);
  SColumnInfoData* pDestGpCol = taosArrayGet(pUpdateRes->pDataBlock, DELETE_GROUPID_COLUMN_INDEX);
  // NOTE(review): the second loop bound compares the source row index against the *source*
  // block capacity minus the number of interval steps; its intent is not clear from here and it
  // is kept unchanged. It divides by interval.interval, so a zero interval is not supported.
  for (int32_t i = pInfo->deleteDataIndex; i < pDelBlock->info.rows &&
       i < pDelBlock->info.capacity - (endData[i] - startData[i]) / pInfo->interval.interval - 1; i++) {
    uint64_t groupId = getGroupId(pOperator, uidCol[i]);
    for (TSKEY startTs = startData[i]; startTs <= endData[i];) {
      // one source range can expand to more rows than were reserved above: grow the
      // destination before appending instead of writing past its buffers
      if (pUpdateRes->info.rows >= pUpdateRes->info.capacity) {
        blockDataEnsureCapacity(pUpdateRes, (pUpdateRes->info.rows + 1) * 2);
      }
      colDataAppend(pDestTsCol, pUpdateRes->info.rows, (const char*)&startTs, false);
      colDataAppend(pDestGpCol, pUpdateRes->info.rows, (const char*)&groupId, false);
      pUpdateRes->info.rows++;
      startTs = taosTimeAdd(startTs, pInfo->interval.interval, pInfo->interval.intervalUnit, pInfo->interval.precision);
    }
    pInfo->deleteDataIndex++;
  }

  if (pInfo->deleteDataIndex > 0 && pInfo->deleteDataIndex == pDelBlock->info.rows) {
    blockDataCleanup(pDelBlock);
    pInfo->deleteDataIndex = 0;
  }
}

972 973
// Build the "updated rows" block for the rows of pBlock recorded in pInfo->tsArray.
//   pBlock        block the recorded row ids refer to
//   pUpdateBlock  output; cleaned first, then filled with the pending rows that share the group
//                 id of pBlock's table and tagged as STREAM_CLEAR
// pInfo->tsArrayIndex is advanced by the number of rows copied and the array is cleared once
// all entries are consumed. When tsArray is empty, pending delete data (pInfo->pDeleteDataRes)
// is expanded into pUpdateBlock instead.
static void setUpdateData(SStreamBlockScanInfo* pInfo, SSDataBlock* pBlock, SSDataBlock* pUpdateBlock) {
  blockDataCleanup(pUpdateBlock);
  int32_t size = taosArrayGetSize(pInfo->tsArray);
  if (pInfo->tsArrayIndex < size) {
    SColumnInfoData* pCol = (SColumnInfoData*)taosArrayGet(pUpdateBlock->pDataBlock, pInfo->primaryTsIndex);
    ASSERT(pCol->info.type == TSDB_DATA_TYPE_TIMESTAMP);
    blockDataEnsureCapacity(pUpdateBlock, size);

    pInfo->groupId = getGroupId(pInfo->pSnapshotReadOp, pBlock->info.uid);
    int32_t i = 0;
    // bound the absolute position, not the offset: with a non-zero tsArrayIndex the previous
    // `i < size` condition let `i + tsArrayIndex` run past the end of tsArray
    for (; pInfo->tsArrayIndex + i < size; i++) {
      int32_t  rowId = *(int32_t*)taosArrayGet(pInfo->tsArray, pInfo->tsArrayIndex + i);
      uint64_t id = getGroupId(pInfo->pSnapshotReadOp, pBlock->info.uid);
      if (pInfo->groupId != id) {
        break;
      }
      copyOneRow(pUpdateBlock, pBlock, rowId);
    }
    pUpdateBlock->info.rows = i;
    pInfo->tsArrayIndex += i;
    pUpdateBlock->info.groupId = pInfo->groupId;
    pUpdateBlock->info.type = STREAM_CLEAR;
    blockDataUpdateTsWindow(pUpdateBlock, 0);
  }
  // all rows have same group id
  ASSERT(pInfo->tsArrayIndex >= size);
  if (size > 0 && pInfo->tsArrayIndex == size) {
    taosArrayClear(pInfo->tsArray);
  }

  if (size == 0) {
    copyDeleteDataBlock(pInfo, pInfo->pDeleteDataRes, pInfo->pSnapshotReadOp, pUpdateBlock);
  }
}

1008 1009
// Feed every timestamp of pBlock's primary ts column to updateInfoIsUpdated(). When `out` is
// true, the ids of the rows reported as updated are appended to pInfo->tsArray.
// updateInfoIsUpdated() is called for every row regardless of `out`. `invertible` is unused.
static void checkUpdateData(SStreamBlockScanInfo* pInfo, bool invertible, SSDataBlock* pBlock,
    bool out) {
  SColumnInfoData* pTsCol = taosArrayGet(pBlock->pDataBlock, pInfo->primaryTsIndex);
  ASSERT(pTsCol->info.type == TSDB_DATA_TYPE_TIMESTAMP);

  TSKEY*  tsList = (TSKEY*)pTsCol->pData;
  int32_t rowId = 0;
  while (rowId < pBlock->info.rows) {
    bool updated = updateInfoIsUpdated(pInfo->pUpdateInfo, pBlock->info.uid, tsList[rowId]);
    if (updated && out) {
      taosArrayPush(pInfo->tsArray, &rowId);
    }
    rowId++;
  }
}

// Rewrite column `uidColIndex` of pBlock in place: every table uid stored there is replaced by
// the group id getGroupId() reports for it. The block must have at least 3 columns and 1 row.
static void setBlockGroupId(SOperatorInfo* pOperator, SSDataBlock* pBlock, int32_t uidColIndex) {
  ASSERT(taosArrayGetSize(pBlock->pDataBlock) >= 3);
  SColumnInfoData* pUidCol = taosArrayGet(pBlock->pDataBlock, uidColIndex);
  uint64_t*        ids = (uint64_t*)pUidCol->pData;
  ASSERT(pBlock->info.rows > 0);

  int32_t row = 0;
  while (row < pBlock->info.rows) {
    uint64_t groupId = getGroupId(pOperator, ids[row]);
    ids[row] = groupId;
    row++;
  }
}

1030
// Next-block function of the stream scan operator. The behaviour depends on pInfo->blockType:
//   STREAM_INPUT__DATA_BLOCK   hand out the buffered blocks of pInfo->pBlockLists one by one;
//                              STREAM_RETRIEVE / STREAM_DELETE_DATA blocks additionally switch
//                              the operator to submit mode and set up a snapshot re-scan
//   STREAM_INPUT__DATA_SUBMIT  small state machine on pInfo->scanMode; in
//                              STREAM_SCAN_FROM_READERHANDLE mode the next submit block is read
//                              from pInfo->streamBlockReader, mapped to the output columns,
//                              filtered, and checked for updated rows
//   STREAM_INPUT__DATA_SCAN    plain pass-through of the snapshot table scan
// Returns the next block, or NULL when there is nothing (more) to return; errors are reported
// through pTaskInfo->code.
static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) {
  // NOTE: this operator does never check if current status is done or not
  SExecTaskInfo*        pTaskInfo = pOperator->pTaskInfo;
  SStreamBlockScanInfo* pInfo = pOperator->info;

  pTaskInfo->code = pOperator->fpSet._openFn(pOperator);
  if (pTaskInfo->code != TSDB_CODE_SUCCESS || pOperator->status == OP_EXEC_DONE) {
    return NULL;
  }

  size_t total = taosArrayGetSize(pInfo->pBlockLists);
  // TODO: refactor
  if (pInfo->blockType == STREAM_INPUT__DATA_BLOCK) {
    if (pInfo->validBlockIndex >= total) {
      /*doClearBufferedBlocks(pInfo);*/
      pOperator->status = OP_EXEC_DONE;
      return NULL;
    }

    int32_t      current = pInfo->validBlockIndex++;
    SSDataBlock* pBlock = taosArrayGetP(pInfo->pBlockLists, current);
    blockDataUpdateTsWindow(pBlock, 0);
    switch (pBlock->info.type) {
      case STREAM_RETRIEVE: {
        // the retrieve request itself is returned below; following calls re-scan its windows
        pInfo->blockType = STREAM_INPUT__DATA_SUBMIT;
        pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RETRIEVE;
        copyDataBlock(pInfo->pPullDataRes, pBlock);
        pInfo->pullDataResIndex = 0;
        prepareDataScan(pInfo, pInfo->pPullDataRes, START_TS_COLUMN_INDEX, &pInfo->pullDataResIndex);
        updateInfoAddCloseWindowSBF(pInfo->pUpdateInfo);
        break;
      }
      case STREAM_DELETE_DATA: {
        // expand the delete ranges and return them instead of the raw delete block
        pInfo->blockType = STREAM_INPUT__DATA_SUBMIT;
        pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER;
        copyDataBlock(pInfo->pDeleteDataRes, pBlock);
        copyDeleteDataBlock(pInfo, pInfo->pDeleteDataRes, pInfo->pSnapshotReadOp, pInfo->pUpdateRes);
        pInfo->updateResIndex = 0;
        prepareDataScan(pInfo, pInfo->pUpdateRes, START_TS_COLUMN_INDEX, &pInfo->updateResIndex);
        pInfo->pUpdateRes->info.type = STREAM_DELETE_DATA;
        return pInfo->pUpdateRes;
      }
      default:
        break;
    }
    return pBlock;
  } else if (pInfo->blockType == STREAM_INPUT__DATA_SUBMIT) {
    if (pInfo->scanMode == STREAM_SCAN_FROM_RES) {
      // pUpdateRes stays owned by the operator and is reused by later calls (setUpdateData,
      // prepareDataScan). Destroying it here left a dangling pointer, so only empty it.
      blockDataCleanup(pInfo->pUpdateRes);
      pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
      return pInfo->pRes;
    } else if (pInfo->scanMode == STREAM_SCAN_FROM_UPDATERES) {
      pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER;
      if (!isStateWindow(pInfo)) {
        prepareDataScan(pInfo, pInfo->pUpdateRes, pInfo->primaryTsIndex, &pInfo->updateResIndex);
      }
      return pInfo->pUpdateRes;
    } else if (pInfo->scanMode == STREAM_SCAN_FROM_DATAREADER_RETRIEVE) {
      SSDataBlock* pSDB = doDataScan(pInfo, pInfo->pPullDataRes, 0, &pInfo->pullDataResIndex);
      if (pSDB != NULL) {
        checkUpdateData(pInfo, true, pSDB, false);
        pSDB->info.type = STREAM_PULL_DATA;
        return pSDB;
      }
      // retrieve scan drained: fall through to reading the next submit block
      pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER;
    } else if (pInfo->scanMode == STREAM_SCAN_FROM_DATAREADER) {
      SSDataBlock* pSDB = doDataScan(pInfo, pInfo->pUpdateRes, pInfo->primaryTsIndex, &pInfo->updateResIndex);
      if (pSDB) {
        pSDB->info.type = STREAM_NORMAL;
        checkUpdateData(pInfo, true, pSDB, false);
        return pSDB;
      }
      setUpdateData(pInfo, pInfo->pRes, pInfo->pUpdateRes);
      if (pInfo->pUpdateRes->info.rows > 0) {
        prepareDataScan(pInfo, pInfo->pUpdateRes, pInfo->primaryTsIndex, &pInfo->updateResIndex);
        return pInfo->pUpdateRes;
      }
      pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
    } else if (isStateWindow(pInfo)) {
      pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER;
      pInfo->updateResIndex = pInfo->pUpdateRes->info.rows;
      if (prepareDataScan(pInfo, pInfo->pUpdateRes, pInfo->primaryTsIndex, &pInfo->updateResIndex)) {
        ASSERT(pInfo->pUpdateRes->info.rows == 0);
        // return empty data block
        return pInfo->pUpdateRes;
      }
      pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
    }

    SDataBlockInfo* pBlockInfo = &pInfo->pRes->info;
    blockDataCleanup(pInfo->pRes);

    while (tqNextDataBlock(pInfo->streamBlockReader)) {
      SSDataBlock block = {0};

      // todo refactor
      int32_t code = tqRetrieveDataBlock(&block, pInfo->streamBlockReader);

      uint64_t groupId = block.info.groupId;
      uint64_t uid = block.info.uid;
      int32_t  numOfRows = block.info.rows;

      if (code != TSDB_CODE_SUCCESS || numOfRows == 0) {
        // NOTE(review): block.pDataBlock is not released on this path - confirm whether
        // tqRetrieveDataBlock can leave it allocated here
        pTaskInfo->code = code;
        return NULL;
      }

      pInfo->pRes->info.rows = numOfRows;
      pInfo->pRes->info.uid = uid;
      pInfo->pRes->info.type = STREAM_NORMAL;
      pInfo->pRes->info.capacity = numOfRows;

      // for generating rollup SMA result, each time is an independent time serie.
      // TODO temporarily used, when the statement of "partition by tbname" is ready, remove this
      if (pInfo->assignBlockUid) {
        pInfo->pRes->info.groupId = uid;
      } else {
        pInfo->pRes->info.groupId = groupId;
      }

      // a group id registered for this table in the task overrides the one chosen above
      uint64_t* groupIdPre = taosHashGet(pOperator->pTaskInfo->tableqinfoList.map, &uid, sizeof(int64_t));
      if (groupIdPre) {
        pInfo->pRes->info.groupId = *groupIdPre;
      }

      // todo extract method
      for (int32_t i = 0; i < taosArrayGetSize(pInfo->pColMatchInfo); ++i) {
        SColMatchInfo* pColMatchInfo = taosArrayGet(pInfo->pColMatchInfo, i);
        if (!pColMatchInfo->output) {
          continue;
        }

        bool colExists = false;
        for (int32_t j = 0; j < blockDataGetNumOfCols(&block); ++j) {
          SColumnInfoData* pResCol = bdGetColumnInfoData(&block, j);
          if (pResCol->info.colId == pColMatchInfo->colId) {
            taosArraySet(pInfo->pRes->pDataBlock, pColMatchInfo->targetSlotId, pResCol);
            colExists = true;
            break;
          }
        }

        // the required column does not exists in submit block, let's set it to be all null value
        if (!colExists) {
          SColumnInfoData* pDst = taosArrayGet(pInfo->pRes->pDataBlock, pColMatchInfo->targetSlotId);
          colDataAppendNNULL(pDst, 0, pBlockInfo->rows);
        }
      }

      // only the temporary column array is released; its column descriptors were copied into pRes above
      taosArrayDestroy(block.pDataBlock);
      if (pInfo->pRes->pDataBlock == NULL) {
        // TODO add log
        updateInfoDestoryColseWinSBF(pInfo->pUpdateInfo);
        pOperator->status = OP_EXEC_DONE;
        pTaskInfo->code = terrno;
        return NULL;
      }

      // currently only the tbname pseudo column
      if (pInfo->numOfPseudoExpr > 0) {
        addTagPseudoColumnData(&pInfo->readHandle, pInfo->pPseudoExpr, pInfo->numOfPseudoExpr, pInfo->pRes);
      }

      doFilter(pInfo->pCondition, pInfo->pRes);
      blockDataUpdateTsWindow(pInfo->pRes, pInfo->primaryTsIndex);
      // keep reading until a block survives the filter
      if (pBlockInfo->rows > 0) {
        break;
      }
    }

    // record the scan action.
    pInfo->numOfExec++;
    pOperator->resultInfo.totalRows += pBlockInfo->rows;

    if (pBlockInfo->rows == 0) {
      updateInfoDestoryColseWinSBF(pInfo->pUpdateInfo);
      pOperator->status = OP_EXEC_DONE;
    } else if (pInfo->pUpdateInfo) {
      pInfo->tsArrayIndex = 0;
      checkUpdateData(pInfo, true, pInfo->pRes, true);
      setUpdateData(pInfo, pInfo->pRes, pInfo->pUpdateRes);
      if (pInfo->pUpdateRes->info.rows > 0) {
        if (pInfo->pUpdateRes->info.type == STREAM_CLEAR) {
          // pRes is returned now; the next call returns pUpdateRes
          pInfo->updateResIndex = 0;
          pInfo->scanMode = STREAM_SCAN_FROM_UPDATERES;
        } else if (pInfo->pUpdateRes->info.type == STREAM_INVERT) {
          // pUpdateRes is returned now; the next call returns pRes
          pInfo->scanMode = STREAM_SCAN_FROM_RES;
          return pInfo->pUpdateRes;
        }
      }
    }
    return (pBlockInfo->rows == 0) ? NULL : pInfo->pRes;
  } else if (pInfo->blockType == STREAM_INPUT__DATA_SCAN) {
    // check reader last status
    // if not match, reset status
    SSDataBlock* pResult = doTableScan(pInfo->pSnapshotReadOp);
    return pResult && pResult->info.rows > 0 ? pResult : NULL;
  } else {
    ASSERT(0);
    return NULL;
  }
}

1236 1237 1238 1239 1240 1241 1242 1243 1244 1245 1246 1247
// Build a new array holding the uid of every STableKeyInfo in pTableGroupInfo->pTableList,
// in the same order. The returned array is owned by the caller.
static SArray* extractTableIdList(const STableListInfo* pTableGroupInfo) {
  SArray* pUidList = taosArrayInit(4, sizeof(uint64_t));

  // Transfer the Array of STableKeyInfo into uid list.
  int32_t idx = 0;
  while (idx < taosArrayGetSize(pTableGroupInfo->pTableList)) {
    STableKeyInfo* pKey = taosArrayGet(pTableGroupInfo->pTableList, idx);
    taosArrayPush(pUidList, &pKey->uid);
    ++idx;
  }

  return pUidList;
}

1248 1249 1250
// Create the stream scan operator.
//   pHandle         read handle; when non-NULL a snapshot table-scan operator is created on top
//                   of it and the stream reader (pHandle->reader) is configured with the scanned
//                   column ids and table uids. May be NULL.
//   pTableScanNode  physical plan node describing scan columns, pseudo columns and conditions
//   pTaskInfo       owning task
//   pTwSup          time window support; its waterMark seeds the update info for interval scans
//   queryId, taskId not used by this function
// Returns the new operator, or NULL on failure (terrno is set for the allocation failures
// detected here).
SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhysiNode* pTableScanNode,
                                            SExecTaskInfo* pTaskInfo, STimeWindowAggSupp* pTwSup, uint64_t queryId,
                                            uint64_t taskId) {
  // declared before the first goto so the cleanup code always sees a defined value
  SArray* pColIds = NULL;

  SStreamBlockScanInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamBlockScanInfo));
  SOperatorInfo*        pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));

  if (pInfo == NULL || pOperator == NULL) {
    terrno = TSDB_CODE_QRY_OUT_OF_MEMORY;
    goto _error;
  }

  SScanPhysiNode* pScanPhyNode = &pTableScanNode->scan;

  SDataBlockDescNode* pDescNode = pScanPhyNode->node.pOutputDataBlockDesc;

  int32_t numOfCols = 0;
  pInfo->pColMatchInfo = extractColMatchInfo(pScanPhyNode->pScanCols, pDescNode, &numOfCols, COL_MATCH_FROM_COL_ID);

  int32_t numOfOutput = taosArrayGetSize(pInfo->pColMatchInfo);
  pColIds = taosArrayInit(numOfOutput, sizeof(int16_t));
  if (pColIds == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto _error;
  }

  for (int32_t i = 0; i < numOfOutput; ++i) {
    SColMatchInfo* id = taosArrayGet(pInfo->pColMatchInfo, i);

    int16_t colId = id->colId;
    taosArrayPush(pColIds, &colId);
    if (id->colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
      pInfo->primaryTsIndex = id->targetSlotId;
    }
  }

  pInfo->pBlockLists = taosArrayInit(4, POINTER_BYTES);
  if (pInfo->pBlockLists == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto _error;
  }

  pInfo->tsArray = taosArrayInit(4, sizeof(int32_t));
  if (pInfo->tsArray == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto _error;
  }

  if (pHandle) {
    SOperatorInfo* pTableScanDummy = createTableScanOperatorInfo(pTableScanNode, pHandle, pTaskInfo);
    if (pTableScanDummy == NULL) {
      goto _error;
    }
    STableScanInfo* pSTInfo = (STableScanInfo*)pTableScanDummy->info;

    SArray* tableList = taosArrayGetP(pTaskInfo->tableqinfoList.pGroupList, 0);
    if (pHandle->tqReader) {
      pSTInfo->scanMode = TABLE_SCAN__TABLE_ORDER;
      tsdbReaderOpen(pHandle->vnode, &pSTInfo->cond, tableList, &pSTInfo->dataReader, 0);
    }

    // update detection is only set up for interval scans
    if (pSTInfo->interval.interval > 0) {
      pInfo->pUpdateInfo = updateInfoInitP(&pSTInfo->interval, pTwSup->waterMark);
    } else {
      pInfo->pUpdateInfo = NULL;
    }
    pInfo->pSnapshotReadOp = pTableScanDummy;
    pInfo->interval = pSTInfo->interval;

    pInfo->readHandle = *pHandle;
    ASSERT(pHandle->reader);
    pInfo->streamBlockReader = pHandle->reader;
    pInfo->tableUid = pScanPhyNode->uid;

    // set the extract column id to streamHandle
    tqReadHandleSetColIdList((SStreamReader*)pHandle->reader, pColIds);
    // NOTE(review): presumably the reader keeps (owns) the column id list from here on - confirm;
    // it is therefore never released by this function after this point
    pColIds = NULL;

    SArray* tableIdList = extractTableIdList(&pTaskInfo->tableqinfoList);
    int32_t code = tqReadHandleSetTbUidList(pHandle->reader, tableIdList);
    if (code != 0) {
      // NOTE(review): pTableScanDummy and pUpdateInfo are not released on this path
      taosArrayDestroy(tableIdList);
      goto _error;
    }
    taosArrayDestroy(tableIdList);
  }

  // without a read handle nobody took the column id list: release it here
  if (pColIds != NULL) {
    taosArrayDestroy(pColIds);
    pColIds = NULL;
  }

  // create the pseduo columns info
  if (pTableScanNode->scan.pScanPseudoCols != NULL) {
    pInfo->pPseudoExpr = createExprInfo(pTableScanNode->scan.pScanPseudoCols, NULL, &pInfo->numOfPseudoExpr);
  }

  pInfo->pRes = createResDataBlock(pDescNode);
  pInfo->pUpdateRes = createResDataBlock(pDescNode);
  pInfo->pCondition = pScanPhyNode->node.pConditions;
  pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
  pInfo->sessionSup = (SessionWindowSupporter){.pStreamAggSup = NULL, .gap = -1};
  pInfo->groupId = 0;
  pInfo->pPullDataRes = createPullDataBlock();
  pInfo->pStreamScanOp = pOperator;
  pInfo->deleteDataIndex = 0;
  pInfo->pDeleteDataRes = createPullDataBlock();

  pOperator->name = "StreamBlockScanOperator";
  pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN;
  pOperator->blocking = false;
  pOperator->status = OP_NOT_OPENED;
  pOperator->info = pInfo;
  pOperator->exprSupp.numOfExprs = taosArrayGetSize(pInfo->pRes->pDataBlock);
  pOperator->pTaskInfo = pTaskInfo;

  pOperator->fpSet =
      createOperatorFpSet(operatorDummyOpenFn, doStreamBlockScan, NULL, NULL, operatorDummyCloseFn, NULL, NULL, NULL);

  return pOperator;

_error:
  // release the arrays created above before the struct that points to them is freed
  if (pColIds != NULL) {
    taosArrayDestroy(pColIds);
  }
  if (pInfo != NULL) {
    if (pInfo->pColMatchInfo != NULL) {
      taosArrayDestroy(pInfo->pColMatchInfo);
    }
    if (pInfo->pBlockLists != NULL) {
      taosArrayDestroy(pInfo->pBlockLists);
    }
    if (pInfo->tsArray != NULL) {
      taosArrayDestroy(pInfo->tsArray);
    }
  }
  taosMemoryFreeClear(pInfo);
  taosMemoryFreeClear(pOperator);
  return NULL;
}

// Close callback of the system-table scan operator: destroys the ready semaphore and the result
// block, closes the meta table cursor (when scanning the user-tables system table or when a
// cursor is open), and releases the scan column list and the user string.
// `numOfOutput` is not used.
static void destroySysScanOperator(void* param, int32_t numOfOutput) {
  SSysTableScanInfo* pScan = param;

  tsem_destroy(&pScan->ready);
  blockDataDestroy(pScan->pRes);

  const char* tbName = tNameGetTableName(&pScan->name);
  bool        isUserTables = (0 == strncasecmp(tbName, TSDB_INS_TABLE_USER_TABLES, TSDB_TABLE_FNAME_LEN));
  if (isUserTables || NULL != pScan->pCur) {
    metaCloseTbCursor(pScan->pCur);
    pScan->pCur = NULL;
  }

  taosArrayDestroy(pScan->scanCols);
  taosMemoryFreeClear(pScan->pUser);
}

X
Xiaoyu Wang 已提交
1373
static int32_t getSysTableDbNameColId(const char* pTable) {
X
Xiaoyu Wang 已提交
1374 1375 1376
  // if (0 == strcmp(TSDB_INS_TABLE_USER_INDEXES, pTable)) {
  //   return 1;
  // }
X
Xiaoyu Wang 已提交
1377 1378 1379
  return TSDB_INS_USER_STABLES_DBNAME_COLID;
}

H
Haojun Liao 已提交
1380 1381 1382 1383 1384 1385 1386 1387 1388 1389 1390 1391 1392 1393 1394 1395 1396 1397 1398 1399 1400
// Expression walker that extracts a db name from a condition.
// pContext is used both as the walker state (read/written as an int32_t) and, at the end, as
// the output character buffer:
//   state 1  an `=` operator was entered
//   state 2  a column whose id is the db-name column id was seen under that operator
//   state 0  anything else (search reset)
// When a value node is visited in state 2 its var-string payload is copied into pContext,
// NUL-terminated, and the walk is stopped.
// NOTE(review): the copy length is not bounded here; the caller's buffer must be large enough.
EDealRes getDBNameFromConditionWalker(SNode* pNode, void* pContext) {
  int32_t* pState = (int32_t*)pContext;

  switch (nodeType(pNode)) {
    case QUERY_NODE_OPERATOR: {
      SOperatorNode* pOper = (SOperatorNode*)pNode;
      if (OP_TYPE_EQUAL != pOper->opType) {
        // not an equality: nothing below this operator is of interest
        *pState = 0;
        return DEAL_RES_IGNORE_CHILD;
      }

      *pState = 1;
      return DEAL_RES_CONTINUE;
    }
    case QUERY_NODE_COLUMN: {
      if (*pState == 1) {
        SColumnNode* pCol = (SColumnNode*)pNode;
        bool         isDbNameCol = (getSysTableDbNameColId(pCol->tableName) == pCol->colId);
        *pState = isDbNameCol ? 2 : 0;
      }
      return DEAL_RES_CONTINUE;
    }
    case QUERY_NODE_VALUE: {
      if (*pState != 2) {
        return DEAL_RES_CONTINUE;
      }

      char* pVarStr = nodesGetValueFromNode((SValueNode*)pNode);
      strncpy(pContext, varDataVal(pVarStr), varDataLen(pVarStr));
      *((char*)pContext + varDataLen(pVarStr)) = 0;
      return DEAL_RES_END;  // stop walk
    }
    default:
      break;
  }

  return DEAL_RES_CONTINUE;
}

1426
// Walk `pCondition` with getDBNameFromConditionWalker, using `dbName` as the walker context
// (the walker writes through it despite the const qualifier). No-op when pCondition is NULL.
static void getDBNameFromCondition(SNode* pCondition, const char* dbName) {
  if (pCondition != NULL) {
    char* pWalkCtx = (char*)dbName;
    nodesWalkExpr(pCondition, getDBNameFromConditionWalker, pWalkCtx);
  }
}

D
dapan1121 已提交
1433
// Completion callback of the system-table retrieve request.
//   param  the SOperatorInfo of the system-table scan operator
//   pMsg   response buffer; on success its pData is stored as the scan's pRsp and the
//          numOfRows/useconds/handle/compLen fields are byte-swapped in place
//   code   result of the request; a failure is recorded in the task's code
// Always posts the scan's `ready` semaphore and returns TSDB_CODE_SUCCESS.
static int32_t loadSysTableCallback(void* param, SDataBuf* pMsg, int32_t code) {
  SOperatorInfo*     pScanOp = (SOperatorInfo*)param;
  SSysTableScanInfo* pScanInfo = (SSysTableScanInfo*)pScanOp->info;

  if (code != TSDB_CODE_SUCCESS) {
    pScanOp->pTaskInfo->code = code;
  } else {
    pScanInfo->pRsp = pMsg->pData;

    SRetrieveMetaTableRsp* pRetrieveRsp = pScanInfo->pRsp;
    pRetrieveRsp->numOfRows = htonl(pRetrieveRsp->numOfRows);
    pRetrieveRsp->useconds = htobe64(pRetrieveRsp->useconds);
    pRetrieveRsp->handle = htobe64(pRetrieveRsp->handle);
    pRetrieveRsp->compLen = htonl(pRetrieveRsp->compLen);
  }

  tsem_post(&pScanInfo->ready);
  return TSDB_CODE_SUCCESS;
}

/*
 * Apply the scan's filter condition, if there is one, to the current result block.
 *
 * Returns pInfo->pRes when it still holds at least one row, NULL otherwise.
 * The row count is read after doFilter() returns, so doFilter() is expected to update
 * pInfo->pRes in place.
 */
static SSDataBlock* doFilterResult(SSysTableScanInfo* pInfo) {
  if (pInfo->pCondition != NULL) {
    doFilter(pInfo->pCondition, pInfo->pRes);
  }

  return pInfo->pRes->info.rows == 0 ? NULL : pInfo->pRes;
}

1509
static SSDataBlock* buildSysTableMetaBlock() {
L
Liu Jicong 已提交
1510 1511
  size_t               size = 0;
  const SSysTableMeta* pMeta = NULL;
1512 1513 1514
  getInfosDbMeta(&pMeta, &size);

  int32_t index = 0;
L
Liu Jicong 已提交
1515 1516
  for (int32_t i = 0; i < size; ++i) {
    if (strcmp(pMeta[i].name, TSDB_INS_TABLE_USER_TABLES) == 0) {
1517 1518 1519 1520
      index = i;
      break;
    }
  }
1521

1522
  SSDataBlock* pBlock = createDataBlock();
L
Liu Jicong 已提交
1523
  for (int32_t i = 0; i < pMeta[index].colNum; ++i) {
L
Liu Jicong 已提交
1524 1525
    SColumnInfoData colInfoData =
        createColumnInfoData(pMeta[index].schema[i].type, pMeta[index].schema[i].bytes, i + 1);
1526
    blockDataAppendColInfo(pBlock, &colInfoData);
1527 1528
  }

1529 1530 1531
  return pBlock;
}

1532
static SSDataBlock* doSysTableScan(SOperatorInfo* pOperator) {
H
Haojun Liao 已提交
1533 1534 1535 1536 1537
  // build message and send to mnode to fetch the content of system tables.
  SExecTaskInfo*     pTaskInfo = pOperator->pTaskInfo;
  SSysTableScanInfo* pInfo = pOperator->info;

  // retrieve local table list info from vnode
1538 1539
  const char* name = tNameGetTableName(&pInfo->name);
  if (strncasecmp(name, TSDB_INS_TABLE_USER_TABLES, TSDB_TABLE_FNAME_LEN) == 0) {
1540 1541 1542 1543
    if (pOperator->status == OP_EXEC_DONE) {
      return NULL;
    }

1544 1545
    // the retrieve is executed on the mnode, so return tables that belongs to the information schema database.
    if (pInfo->readHandle.mnd != NULL) {
1546
      buildSysDbTableInfo(pInfo, pOperator->resultInfo.capacity);
1547

1548 1549
      doFilterResult(pInfo);
      pInfo->loadInfo.totalRows += pInfo->pRes->info.rows;
1550

1551
      doSetOperatorCompleted(pOperator);
1552 1553 1554 1555 1556
      return (pInfo->pRes->info.rows == 0) ? NULL : pInfo->pRes;
    } else {
      if (pInfo->pCur == NULL) {
        pInfo->pCur = metaOpenTbCursor(pInfo->readHandle.meta);
      }
1557

1558 1559
      blockDataCleanup(pInfo->pRes);
      int32_t numOfRows = 0;
1560

1561 1562 1563
      const char* db = NULL;
      int32_t     vgId = 0;
      vnodeGetInfo(pInfo->readHandle.vnode, &db, &vgId);
1564

1565 1566 1567
      SName sn = {0};
      char  dbname[TSDB_DB_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
      tNameFromString(&sn, db, T_NAME_ACCT | T_NAME_DB);
1568

1569 1570
      tNameGetDbName(&sn, varDataVal(dbname));
      varDataSetLen(dbname, strlen(varDataVal(dbname)));
1571

1572
      SSDataBlock* p = buildSysTableMetaBlock();
1573
      blockDataEnsureCapacity(p, pOperator->resultInfo.capacity);
1574

1575
      char n[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
1576 1577 1578

      int32_t ret = 0;
      while ((ret = metaTbCursorNext(pInfo->pCur)) == 0) {
1579
        STR_TO_VARSTR(n, pInfo->pCur->mr.me.name);
1580

1581 1582 1583
        // table name
        SColumnInfoData* pColInfoData = taosArrayGet(p->pDataBlock, 0);
        colDataAppend(pColInfoData, numOfRows, n, false);
1584

1585 1586 1587
        // database name
        pColInfoData = taosArrayGet(p->pDataBlock, 1);
        colDataAppend(pColInfoData, numOfRows, dbname, false);
1588

1589 1590 1591
        // vgId
        pColInfoData = taosArrayGet(p->pDataBlock, 6);
        colDataAppend(pColInfoData, numOfRows, (char*)&vgId, false);
1592

1593 1594 1595 1596 1597 1598 1599 1600 1601 1602 1603 1604 1605
        int32_t tableType = pInfo->pCur->mr.me.type;
        if (tableType == TSDB_CHILD_TABLE) {
          // create time
          int64_t ts = pInfo->pCur->mr.me.ctbEntry.ctime;
          pColInfoData = taosArrayGet(p->pDataBlock, 2);
          colDataAppend(pColInfoData, numOfRows, (char*)&ts, false);

          SMetaReader mr = {0};
          metaReaderInit(&mr, pInfo->readHandle.meta, 0);
          metaGetTableEntryByUid(&mr, pInfo->pCur->mr.me.ctbEntry.suid);

          // number of columns
          pColInfoData = taosArrayGet(p->pDataBlock, 3);
1606
          colDataAppend(pColInfoData, numOfRows, (char*)&mr.me.stbEntry.schemaRow.nCols, false);
1607 1608

          // super table name
wmmhello's avatar
wmmhello 已提交
1609
          STR_TO_VARSTR(n, mr.me.name);
1610
          pColInfoData = taosArrayGet(p->pDataBlock, 4);
wmmhello's avatar
wmmhello 已提交
1611
          colDataAppend(pColInfoData, numOfRows, n, false);
1612 1613
          metaReaderClear(&mr);

wmmhello's avatar
wmmhello 已提交
1614 1615
          // table comment
          pColInfoData = taosArrayGet(p->pDataBlock, 8);
L
Liu Jicong 已提交
1616
          if (pInfo->pCur->mr.me.ctbEntry.commentLen > 0) {
wmmhello's avatar
wmmhello 已提交
1617 1618 1619
            char comment[TSDB_TB_COMMENT_LEN + VARSTR_HEADER_SIZE] = {0};
            STR_TO_VARSTR(comment, pInfo->pCur->mr.me.ctbEntry.comment);
            colDataAppend(pColInfoData, numOfRows, comment, false);
L
Liu Jicong 已提交
1620
          } else if (pInfo->pCur->mr.me.ctbEntry.commentLen == 0) {
wmmhello's avatar
wmmhello 已提交
1621 1622 1623
            char comment[VARSTR_HEADER_SIZE + VARSTR_HEADER_SIZE] = {0};
            STR_TO_VARSTR(comment, "");
            colDataAppend(pColInfoData, numOfRows, comment, false);
L
Liu Jicong 已提交
1624
          } else {
wmmhello's avatar
wmmhello 已提交
1625 1626 1627
            colDataAppendNULL(pColInfoData, numOfRows);
          }

1628 1629 1630 1631 1632 1633 1634 1635
          // uid
          pColInfoData = taosArrayGet(p->pDataBlock, 5);
          colDataAppend(pColInfoData, numOfRows, (char*)&pInfo->pCur->mr.me.uid, false);

          // ttl
          pColInfoData = taosArrayGet(p->pDataBlock, 7);
          colDataAppend(pColInfoData, numOfRows, (char*)&pInfo->pCur->mr.me.ctbEntry.ttlDays, false);

wmmhello's avatar
wmmhello 已提交
1636
          STR_TO_VARSTR(n, "CHILD_TABLE");
1637 1638 1639 1640 1641 1642 1643
        } else if (tableType == TSDB_NORMAL_TABLE) {
          // create time
          pColInfoData = taosArrayGet(p->pDataBlock, 2);
          colDataAppend(pColInfoData, numOfRows, (char*)&pInfo->pCur->mr.me.ntbEntry.ctime, false);

          // number of columns
          pColInfoData = taosArrayGet(p->pDataBlock, 3);
1644
          colDataAppend(pColInfoData, numOfRows, (char*)&pInfo->pCur->mr.me.ntbEntry.schemaRow.nCols, false);
1645 1646 1647 1648 1649

          // super table name
          pColInfoData = taosArrayGet(p->pDataBlock, 4);
          colDataAppendNULL(pColInfoData, numOfRows);

wmmhello's avatar
wmmhello 已提交
1650 1651
          // table comment
          pColInfoData = taosArrayGet(p->pDataBlock, 8);
L
Liu Jicong 已提交
1652
          if (pInfo->pCur->mr.me.ntbEntry.commentLen > 0) {
wmmhello's avatar
wmmhello 已提交
1653 1654 1655
            char comment[TSDB_TB_COMMENT_LEN + VARSTR_HEADER_SIZE] = {0};
            STR_TO_VARSTR(comment, pInfo->pCur->mr.me.ntbEntry.comment);
            colDataAppend(pColInfoData, numOfRows, comment, false);
L
Liu Jicong 已提交
1656
          } else if (pInfo->pCur->mr.me.ntbEntry.commentLen == 0) {
wmmhello's avatar
wmmhello 已提交
1657 1658 1659
            char comment[VARSTR_HEADER_SIZE + VARSTR_HEADER_SIZE] = {0};
            STR_TO_VARSTR(comment, "");
            colDataAppend(pColInfoData, numOfRows, comment, false);
L
Liu Jicong 已提交
1660
          } else {
wmmhello's avatar
wmmhello 已提交
1661 1662 1663
            colDataAppendNULL(pColInfoData, numOfRows);
          }

1664 1665 1666 1667 1668 1669 1670 1671
          // uid
          pColInfoData = taosArrayGet(p->pDataBlock, 5);
          colDataAppend(pColInfoData, numOfRows, (char*)&pInfo->pCur->mr.me.uid, false);

          // ttl
          pColInfoData = taosArrayGet(p->pDataBlock, 7);
          colDataAppend(pColInfoData, numOfRows, (char*)&pInfo->pCur->mr.me.ntbEntry.ttlDays, false);

wmmhello's avatar
wmmhello 已提交
1672
          STR_TO_VARSTR(n, "NORMAL_TABLE");
1673
        }
1674

1675
        pColInfoData = taosArrayGet(p->pDataBlock, 9);
wmmhello's avatar
wmmhello 已提交
1676
        colDataAppend(pColInfoData, numOfRows, n, false);
1677

1678
        if (++numOfRows >= pOperator->resultInfo.capacity) {
1679 1680
          break;
        }
H
Haojun Liao 已提交
1681 1682
      }

1683 1684 1685 1686 1687 1688 1689
      // todo temporarily free the cursor here, the true reason why the free is not valid needs to be found
      if (ret != 0) {
        metaCloseTbCursor(pInfo->pCur);
        pInfo->pCur = NULL;
        doSetOperatorCompleted(pOperator);
      }

1690 1691
      p->info.rows = numOfRows;
      pInfo->pRes->info.rows = numOfRows;
H
Haojun Liao 已提交
1692

1693
      relocateColumnData(pInfo->pRes, pInfo->scanCols, p->pDataBlock, false);
1694
      doFilterResult(pInfo);
H
Haojun Liao 已提交
1695

1696 1697
      blockDataDestroy(p);

1698 1699 1700
      pInfo->loadInfo.totalRows += pInfo->pRes->info.rows;
      return (pInfo->pRes->info.rows == 0) ? NULL : pInfo->pRes;
    }
H
Haojun Liao 已提交
1701 1702 1703 1704 1705
  } else {  // load the meta from mnode of the given epset
    if (pOperator->status == OP_EXEC_DONE) {
      return NULL;
    }

1706 1707 1708
    while (1) {
      int64_t startTs = taosGetTimestampUs();
      strncpy(pInfo->req.tb, tNameGetTableName(&pInfo->name), tListLen(pInfo->req.tb));
1709
      strcpy(pInfo->req.user, pInfo->pUser);
H
Haojun Liao 已提交
1710

1711 1712 1713 1714 1715
      if (pInfo->showRewrite) {
        char dbName[TSDB_DB_NAME_LEN] = {0};
        getDBNameFromCondition(pInfo->pCondition, dbName);
        sprintf(pInfo->req.db, "%d.%s", pInfo->accountId, dbName);
      }
H
Haojun Liao 已提交
1716

1717 1718 1719 1720 1721 1722 1723 1724 1725 1726 1727
      int32_t contLen = tSerializeSRetrieveTableReq(NULL, 0, &pInfo->req);
      char*   buf1 = taosMemoryCalloc(1, contLen);
      tSerializeSRetrieveTableReq(buf1, contLen, &pInfo->req);

      // send the fetch remote task result reques
      SMsgSendInfo* pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
      if (NULL == pMsgSendInfo) {
        qError("%s prepare message %d failed", GET_TASKID(pTaskInfo), (int32_t)sizeof(SMsgSendInfo));
        pTaskInfo->code = TSDB_CODE_QRY_OUT_OF_MEMORY;
        return NULL;
      }
H
Haojun Liao 已提交
1728

L
Liu Jicong 已提交
1729 1730
      int32_t msgType = (strcasecmp(name, TSDB_INS_TABLE_DNODE_VARIABLES) == 0) ? TDMT_DND_SYSTABLE_RETRIEVE
                                                                                : TDMT_MND_SYSTABLE_RETRIEVE;
D
dapan1121 已提交
1731

1732 1733 1734
      pMsgSendInfo->param = pOperator;
      pMsgSendInfo->msgInfo.pData = buf1;
      pMsgSendInfo->msgInfo.len = contLen;
D
dapan1121 已提交
1735
      pMsgSendInfo->msgType = msgType;
1736
      pMsgSendInfo->fp = loadSysTableCallback;
D
dapan1121 已提交
1737
      pMsgSendInfo->requestId = pTaskInfo->id.queryId;
H
Haojun Liao 已提交
1738

1739
      int64_t transporterId = 0;
1740 1741
      int32_t code =
          asyncSendMsgToServer(pInfo->readHandle.pMsgCb->clientRpc, &pInfo->epSet, &transporterId, pMsgSendInfo);
1742
      tsem_wait(&pInfo->ready);
H
Haojun Liao 已提交
1743

1744 1745 1746 1747 1748
      if (pTaskInfo->code) {
        qDebug("%s load meta data from mnode failed, totalRows:%" PRIu64 ", code:%s", GET_TASKID(pTaskInfo),
               pInfo->loadInfo.totalRows, tstrerror(pTaskInfo->code));
        return NULL;
      }
H
Haojun Liao 已提交
1749

1750 1751
      SRetrieveMetaTableRsp* pRsp = pInfo->pRsp;
      pInfo->req.showId = pRsp->handle;
H
Haojun Liao 已提交
1752

1753 1754
      if (pRsp->numOfRows == 0 || pRsp->completed) {
        pOperator->status = OP_EXEC_DONE;
1755
        qDebug("%s load meta data from mnode completed, rowsOfSource:%d, totalRows:%" PRIu64, GET_TASKID(pTaskInfo),
1756
               pRsp->numOfRows, pInfo->loadInfo.totalRows);
H
Haojun Liao 已提交
1757

1758
        if (pRsp->numOfRows == 0) {
H
Haojun Liao 已提交
1759
          taosMemoryFree(pRsp);
1760 1761 1762
          return NULL;
        }
      }
H
Haojun Liao 已提交
1763

1764
      extractDataBlockFromFetchRsp(pInfo->pRes, &pInfo->loadInfo, pRsp->numOfRows, pRsp->data, pRsp->compLen,
L
Liu Jicong 已提交
1765
                                   pOperator->exprSupp.numOfExprs, startTs, NULL, pInfo->scanCols);
H
Haojun Liao 已提交
1766

1767 1768
      // todo log the filter info
      doFilterResult(pInfo);
H
Haojun Liao 已提交
1769
      taosMemoryFree(pRsp);
1770 1771
      if (pInfo->pRes->info.rows > 0) {
        return pInfo->pRes;
D
dapan1121 已提交
1772 1773
      } else if (pOperator->status == OP_EXEC_DONE) {
        return NULL;
1774
      }
1775
    }
H
Haojun Liao 已提交
1776 1777 1778
  }
}

1779
/*
 * Fill pInfo->pRes with one row per table of the information-schema and
 * performance-schema databases.
 *
 * Rows are first assembled in a temporary block with the full user-tables schema, then the
 * columns selected by pInfo->scanCols are moved into pInfo->pRes and the temporary block
 * is destroyed.
 *
 * capacity : row capacity reserved in the temporary block.
 * Returns the row count stored in pInfo->pRes.
 */
int32_t buildSysDbTableInfo(const SSysTableScanInfo* pInfo, int32_t capacity) {
  SSDataBlock* pTmpBlock = buildSysTableMetaBlock();
  blockDataEnsureCapacity(pTmpBlock, capacity);

  const SSysTableMeta* pMetaList = NULL;
  size_t               numOfMeta = 0;

  // information schema tables come first
  getInfosDbMeta(&pMetaList, &numOfMeta);
  pTmpBlock->info.rows = buildDbTableInfoBlock(pTmpBlock, pMetaList, numOfMeta, TSDB_INFORMATION_SCHEMA_DB);

  // performance schema tables are appended after them
  getPerfDbMeta(&pMetaList, &numOfMeta);
  pTmpBlock->info.rows = buildDbTableInfoBlock(pTmpBlock, pMetaList, numOfMeta, TSDB_PERFORMANCE_SCHEMA_DB);

  pInfo->pRes->info.rows = pTmpBlock->info.rows;
  relocateColumnData(pInfo->pRes, pInfo->scanCols, pTmpBlock->pDataBlock, false);
  blockDataDestroy(pTmpBlock);

  return pInfo->pRes->info.rows;
}

L
Liu Jicong 已提交
1799 1800 1801
int32_t buildDbTableInfoBlock(const SSDataBlock* p, const SSysTableMeta* pSysDbTableMeta, size_t size,
                              const char* dbName) {
  char    n[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
1802 1803
  int32_t numOfRows = p->info.rows;

L
Liu Jicong 已提交
1804
  for (int32_t i = 0; i < size; ++i) {
1805 1806 1807 1808 1809 1810 1811 1812 1813 1814 1815 1816 1817 1818 1819 1820 1821 1822 1823 1824
    const SSysTableMeta* pm = &pSysDbTableMeta[i];

    SColumnInfoData* pColInfoData = taosArrayGet(p->pDataBlock, 0);

    STR_TO_VARSTR(n, pm->name);
    colDataAppend(pColInfoData, numOfRows, n, false);

    // database name
    STR_TO_VARSTR(n, dbName);
    pColInfoData = taosArrayGet(p->pDataBlock, 1);
    colDataAppend(pColInfoData, numOfRows, n, false);

    // create time
    pColInfoData = taosArrayGet(p->pDataBlock, 2);
    colDataAppendNULL(pColInfoData, numOfRows);

    // number of columns
    pColInfoData = taosArrayGet(p->pDataBlock, 3);
    colDataAppend(pColInfoData, numOfRows, (char*)&pm->colNum, false);

L
Liu Jicong 已提交
1825
    for (int32_t j = 4; j <= 8; ++j) {
1826 1827 1828 1829 1830 1831 1832 1833 1834 1835 1836 1837 1838 1839 1840
      pColInfoData = taosArrayGet(p->pDataBlock, j);
      colDataAppendNULL(pColInfoData, numOfRows);
    }

    STR_TO_VARSTR(n, "SYSTEM_TABLE");

    pColInfoData = taosArrayGet(p->pDataBlock, 9);
    colDataAppend(pColInfoData, numOfRows, n, false);

    numOfRows += 1;
  }

  return numOfRows;
}

1841
/*
 * Create the operator that scans a system table.
 *
 * readHandle   : pointer to a SReadHandle; its content is copied into the scan info
 * pScanPhyNode : physical plan node describing the scan (table name, columns, conditions,
 *                account id, show-rewrite flag, mgmt epSet)
 * pUser        : user name; duplicated into the scan info
 * pTaskInfo    : owning task
 *
 * Returns the new operator, or NULL with terrno = TSDB_CODE_QRY_OUT_OF_MEMORY when either
 * of the two top-level allocations fails.
 */
SOperatorInfo* createSysTableScanOperatorInfo(void* readHandle, SSystemTableScanPhysiNode* pScanPhyNode,
                                              const char* pUser, SExecTaskInfo* pTaskInfo) {
  SSysTableScanInfo* pInfo = taosMemoryCalloc(1, sizeof(SSysTableScanInfo));
  SOperatorInfo*     pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
  if (pInfo == NULL || pOperator == NULL) {
    taosMemoryFreeClear(pInfo);
    taosMemoryFreeClear(pOperator);
    terrno = TSDB_CODE_QRY_OUT_OF_MEMORY;
    return NULL;
  }

  SScanPhysiNode*     pScanNode = &pScanPhyNode->scan;
  SDataBlockDescNode* pDescNode = pScanNode->node.pOutputDataBlockDesc;

  int32_t numOfMatched = 0;
  pInfo->pRes = createResDataBlock(pDescNode);
  pInfo->scanCols = extractColMatchInfo(pScanNode->pScanCols, pDescNode, &numOfMatched, COL_MATCH_FROM_COL_ID);

  pInfo->accountId = pScanPhyNode->accountId;
  pInfo->pUser = taosMemoryStrDup((void*)pUser);
  pInfo->showRewrite = pScanPhyNode->showRewrite;
  pInfo->pCondition = pScanNode->node.pConditions;
  pInfo->readHandle = *(SReadHandle*)readHandle;

  initResultSizeInfo(pOperator, 4096);

  tNameAssign(&pInfo->name, &pScanNode->tableName);
  const char* tableName = tNameGetTableName(&pInfo->name);

  bool isUserTables = (strncasecmp(tableName, TSDB_INS_TABLE_USER_TABLES, TSDB_TABLE_FNAME_LEN) == 0);
  if (isUserTables) {
    // the user-tables scan builds its rows locally, so the result block is sized up front
    blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity);
  } else {
    // every other system table is fetched remotely; `ready` signals the arrival of a response
    tsem_init(&pInfo->ready, 0, 0);
    pInfo->epSet = pScanPhyNode->mgmtEpSet;
  }

  pOperator->name = "SysTableScanOperator";
  pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN;
  pOperator->blocking = false;
  pOperator->status = OP_NOT_OPENED;
  pOperator->info = pInfo;
  pOperator->exprSupp.numOfExprs = taosArrayGetSize(pInfo->pRes->pDataBlock);
  pOperator->pTaskInfo = pTaskInfo;

  pOperator->fpSet =
      createOperatorFpSet(operatorDummyOpenFn, doSysTableScan, NULL, NULL, destroySysScanOperator, NULL, NULL, NULL);

  return pOperator;
}
H
Haojun Liao 已提交
1897

1898
/*
 * Produce the next block of a tag scan: one row per table in pInfo->pTableList, starting at
 * pInfo->curPos, up to pOperator->resultInfo.capacity rows per call.
 *
 * For every output expression the row holds either the table name (scan pseudo-column
 * functions) or the value of the tag identified by the expression's column id.
 *
 * Returns pInfo->pRes when at least one row was produced, NULL otherwise. The operator and
 * the task are marked completed once the last table has been consumed.
 */
static SSDataBlock* doTagScan(SOperatorInfo* pOperator) {
  if (pOperator->status == OP_EXEC_DONE) {
    return NULL;
  }

  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;

  STagScanInfo* pInfo = pOperator->info;
  SExprInfo*    pExprInfo = &pOperator->exprSupp.pExprInfo[0];
  SSDataBlock*  pRes = pInfo->pRes;

  int32_t size = taosArrayGetSize(pInfo->pTableList->pTableList);
  if (size == 0) {
    setTaskStatus(pTaskInfo, TASK_COMPLETED);
    return NULL;
  }

  char        str[512] = {0};
  int32_t     count = 0;
  SMetaReader mr = {0};
  metaReaderInit(&mr, pInfo->readHandle.meta, 0);

  while (pInfo->curPos < size && count < pOperator->resultInfo.capacity) {
    STableKeyInfo* item = taosArrayGet(pInfo->pTableList->pTableList, pInfo->curPos);
    // NOTE(review): the lookup result is not checked before mr.me is used below.
    metaGetTableEntryByUid(&mr, item->uid);

    for (int32_t j = 0; j < pOperator->exprSupp.numOfExprs; ++j) {
      SColumnInfoData* pDst = taosArrayGet(pRes->pDataBlock, pExprInfo[j].base.resSchema.slotId);

      // refactor later
      if (fmIsScanPseudoColumnFunc(pExprInfo[j].pExpr->_function.functionId)) {
        STR_TO_VARSTR(str, mr.me.name);
        colDataAppend(pDst, count, str, false);
      } else {  // it is a tag value
        STagVal val = {0};
        val.cid = pExprInfo[j].base.pParam[0].pCol->colId;
        const char* p = metaGetTableTagVal(&mr.me, pDst->info.type, &val);

        char* data = NULL;
        if (pDst->info.type != TSDB_DATA_TYPE_JSON && p != NULL) {
          data = tTagValToData((const STagVal*)p, false);
        } else {
          data = (char*)p;
        }
        colDataAppend(pDst, count, data,
                      (data == NULL) || (pDst->info.type == TSDB_DATA_TYPE_JSON && tTagIsJsonNull(data)));

        // release the converted value; only freed for non-JSON var-length tag types
        if (pDst->info.type != TSDB_DATA_TYPE_JSON && p != NULL && IS_VAR_DATA_TYPE(((const STagVal*)p)->type) &&
            data != NULL) {
          taosMemoryFree(data);
        }
      }
    }

    count += 1;
    if (++pInfo->curPos >= size) {
      doSetOperatorCompleted(pOperator);
    }
  }

  metaReaderClear(&mr);

  // qDebug("QInfo:0x%"PRIx64" create tag values results completed, rows:%d", GET_TASKID(pRuntimeEnv), count);
  if (pOperator->status == OP_EXEC_DONE) {
    setTaskStatus(pTaskInfo, TASK_COMPLETED);
  }

  pRes->info.rows = count;
  pOperator->resultInfo.totalRows += count;

  return (pRes->info.rows == 0) ? NULL : pInfo->pRes;
}

/*
 * Release the resources owned by a tag-scan operator's info: the result block and the
 * column-match array created in createTagScanOperatorInfo().
 *
 * param       : the STagScanInfo to clean up (the struct itself is not freed here)
 * numOfOutput : unused
 */
static void destroyTagScanOperatorInfo(void* param, int32_t numOfOutput) {
  STagScanInfo* pInfo = (STagScanInfo*)param;
  pInfo->pRes = blockDataDestroy(pInfo->pRes);

  // previously leaked: the array is allocated by extractColMatchInfo() at creation time
  taosArrayDestroy(pInfo->pColMatchInfo);
  pInfo->pColMatchInfo = NULL;
}

2051 2052
/*
 * Create the operator that scans tag values / table names for a list of tables.
 *
 * pReadHandle    : read handle; copied into the scan info
 * pPhyNode       : physical plan node providing the pseudo columns and output description
 * pTableListInfo : tables to scan; referenced, not copied
 * pTaskInfo      : owning task
 *
 * Returns the new operator, or NULL with terrno = TSDB_CODE_OUT_OF_MEMORY on failure.
 */
SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysiNode* pPhyNode,
                                         STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo) {
  // declared before the first `goto _error` so the error path can release it safely
  SArray* colList = NULL;

  STagScanInfo*  pInfo = taosMemoryCalloc(1, sizeof(STagScanInfo));
  SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
  if (pInfo == NULL || pOperator == NULL) {
    goto _error;
  }

  SDataBlockDescNode* pDescNode = pPhyNode->node.pOutputDataBlockDesc;

  int32_t    num = 0;
  int32_t    numOfExprs = 0;
  SExprInfo* pExprInfo = createExprInfo(pPhyNode->pScanPseudoCols, NULL, &numOfExprs);
  colList = extractColMatchInfo(pPhyNode->pScanPseudoCols, pDescNode, &num, COL_MATCH_FROM_COL_ID);

  // NOTE(review): ownership of pExprInfo when initExprSupp() fails is not visible here;
  // confirm whether it must be released on the error path as well.
  int32_t code = initExprSupp(&pOperator->exprSupp, pExprInfo, numOfExprs);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  pInfo->pTableList = pTableListInfo;
  pInfo->pColMatchInfo = colList;
  pInfo->pRes = createResDataBlock(pDescNode);
  pInfo->readHandle = *pReadHandle;
  pInfo->curPos = 0;

  pOperator->name = "TagScanOperator";
  pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN;

  pOperator->blocking = false;
  pOperator->status = OP_NOT_OPENED;
  pOperator->info = pInfo;
  pOperator->pTaskInfo = pTaskInfo;

  initResultSizeInfo(pOperator, 4096);
  blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity);

  pOperator->fpSet =
      createOperatorFpSet(operatorDummyOpenFn, doTagScan, NULL, NULL, destroyTagScanOperatorInfo, NULL, NULL, NULL);

  return pOperator;

_error:
  // colList has not been handed to pInfo on any path that reaches here, so free it directly
  taosArrayDestroy(colList);
  taosMemoryFree(pInfo);
  taosMemoryFree(pOperator);
  terrno = TSDB_CODE_OUT_OF_MEMORY;
  return NULL;
}
2099 2100

typedef struct STableMergeScanInfo {
2101
  STableListInfo* tableListInfo;
S
slzhou 已提交
2102 2103 2104 2105
  int32_t         tableStartIndex;
  int32_t         tableEndIndex;
  bool            hasGroupId;
  uint64_t        groupId;
2106

2107 2108 2109 2110 2111 2112 2113 2114 2115 2116 2117 2118
  SArray*     dataReaders;  // array of tsdbReaderT*
  SReadHandle readHandle;

  int32_t  bufPageSize;
  uint32_t sortBufSize;  // max buffer size for in-memory sort

  SArray*      pSortInfo;
  SSortHandle* pSortHandle;

  SSDataBlock* pSortInputBlock;
  int64_t      startTs;  // sort start time

2119 2120 2121
  SArray*  sortSourceParams;
  uint64_t queryId;
  uint64_t taskId;
2122 2123 2124 2125 2126 2127 2128 2129 2130

  SFileBlockLoadRecorder readRecorder;
  int64_t                numOfRows;
  //  int32_t         prevGroupId;  // previous table group id
  SScanInfo       scanInfo;
  int32_t         scanTimes;
  SNode*          pFilterNode;  // filter info, which is push down by optimizer
  SqlFunctionCtx* pCtx;         // which belongs to the direct upstream operator operator query context
  SResultRowInfo* pResultRowInfo;
2131
  int32_t*        rowEntryInfoOffset;
2132 2133 2134 2135 2136 2137 2138 2139
  SExprInfo*      pExpr;
  SSDataBlock*    pResBlock;
  SArray*         pColMatchInfo;
  int32_t         numOfOutput;

  SExprInfo*      pPseudoExpr;
  int32_t         numOfPseudoExpr;
  SqlFunctionCtx* pPseudoCtx;
2140
  //  int32_t*        rowEntryInfoOffset;
2141 2142 2143 2144 2145 2146 2147 2148 2149 2150

  SQueryTableDataCond cond;
  int32_t             scanFlag;  // table scan flag to denote if it is a repeat/reverse/main scan
  int32_t             dataBlockLoadFlag;
  SInterval interval;  // if the upstream is an interval operator, the interval info is also kept here to get the time
                       // window to check if current data block needs to be loaded.

  SSampleExecInfo sample;  // sample execution info
} STableMergeScanInfo;

2151
int32_t createScanTableListInfo(STableScanPhysiNode* pTableScanNode, SReadHandle* pHandle,
H
Haojun Liao 已提交
2152
                                STableListInfo* pTableListInfo, uint64_t queryId, uint64_t taskId) {
2153
  int32_t code = getTableList(pHandle->meta, pHandle->vnode, &pTableScanNode->scan, pTableListInfo);
2154
  if (code != TSDB_CODE_SUCCESS) {
2155
    return code;
2156 2157 2158 2159
  }

  if (taosArrayGetSize(pTableListInfo->pTableList) == 0) {
    qDebug("no table qualified for query, TID:0x%" PRIx64 ", QID:0x%" PRIx64, taskId, queryId);
2160
    return TSDB_CODE_SUCCESS;
2161
  }
wmmhello's avatar
wmmhello 已提交
2162
  pTableListInfo->needSortTableByGroupId = pTableScanNode->groupSort;
2163
  code = generateGroupIdMap(pTableListInfo, pHandle, pTableScanNode->pGroupTags);
2164
  if (code != TSDB_CODE_SUCCESS) {
wmmhello's avatar
wmmhello 已提交
2165
    return code;
2166
  }
2167

2168 2169 2170
  return TSDB_CODE_SUCCESS;
}

S
slzhou 已提交
2171
/*
 * Open one tsdb reader per table in pTableListInfo->pTableList[tableStartIdx..tableEndIdx]
 * (both inclusive) and append each reader pointer to arrayReader.
 *
 * Returns TSDB_CODE_SUCCESS when every reader was opened. On failure the error code is
 * returned immediately; readers appended before the failure stay in arrayReader and remain
 * the caller's responsibility.
 */
int32_t createMultipleDataReaders(SQueryTableDataCond* pQueryCond, SReadHandle* pHandle, STableListInfo* pTableListInfo,
                                  int32_t tableStartIdx, int32_t tableEndIdx, SArray* arrayReader, const char* idstr) {
  for (int32_t i = tableStartIdx; i <= tableEndIdx; ++i) {
    // each reader is opened on a single-table list
    SArray* subTableList = taosArrayInit(1, sizeof(STableKeyInfo));
    if (subTableList == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }
    taosArrayPush(subTableList, taosArrayGet(pTableListInfo->pTableList, i));

    STsdbReader* pReader = NULL;
    int32_t      code = tsdbReaderOpen(pHandle->vnode, pQueryCond, subTableList, &pReader, idstr);
    taosArrayDestroy(subTableList);

    // do not hand a reader that failed to open to the caller
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    taosArrayPush(arrayReader, &pReader);
  }

  return TSDB_CODE_SUCCESS;
}

H
Haojun Liao 已提交
2187
// todo refactor
2188 2189
static int32_t loadDataBlockFromOneTable(SOperatorInfo* pOperator, STableMergeScanInfo* pTableScanInfo,
                                         int32_t readerIdx, SSDataBlock* pBlock, uint32_t* status) {
H
Hongze Cheng 已提交
2190
  SExecTaskInfo*       pTaskInfo = pOperator->pTaskInfo;
S
shenglian zhou 已提交
2191
  STableMergeScanInfo* pInfo = pOperator->info;
2192 2193 2194 2195 2196 2197 2198 2199 2200 2201 2202 2203 2204 2205 2206 2207 2208 2209 2210 2211 2212 2213 2214 2215 2216 2217

  SFileBlockLoadRecorder* pCost = &pTableScanInfo->readRecorder;

  pCost->totalBlocks += 1;
  pCost->totalRows += pBlock->info.rows;

  *status = pInfo->dataBlockLoadFlag;
  if (pTableScanInfo->pFilterNode != NULL ||
      overlapWithTimeWindow(&pTableScanInfo->interval, &pBlock->info, pTableScanInfo->cond.order)) {
    (*status) = FUNC_DATA_REQUIRED_DATA_LOAD;
  }

  SDataBlockInfo* pBlockInfo = &pBlock->info;
  taosMemoryFreeClear(pBlock->pBlockAgg);

  if (*status == FUNC_DATA_REQUIRED_FILTEROUT) {
    qDebug("%s data block filter out, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo),
           pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows);
    pCost->filterOutBlocks += 1;
    return TSDB_CODE_SUCCESS;
  } else if (*status == FUNC_DATA_REQUIRED_NOT_LOAD) {
    qDebug("%s data block skipped, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo),
           pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows);
    pCost->skipBlocks += 1;

    // clear all data in pBlock that are set when handing the previous block
2218
    for (int32_t i = 0; i < taosArrayGetSize(pBlock->pDataBlock); ++i) {
2219 2220 2221 2222 2223 2224 2225 2226 2227 2228
      SColumnInfoData* pcol = taosArrayGet(pBlock->pDataBlock, i);
      pcol->pData = NULL;
    }

    return TSDB_CODE_SUCCESS;
  } else if (*status == FUNC_DATA_REQUIRED_STATIS_LOAD) {
    pCost->loadBlockStatis += 1;

    bool             allColumnsHaveAgg = true;
    SColumnDataAgg** pColAgg = NULL;
H
Hongze Cheng 已提交
2229
    STsdbReader*     reader = taosArrayGetP(pTableScanInfo->dataReaders, readerIdx);
2230 2231 2232
    tsdbRetrieveDataBlockStatisInfo(reader, &pColAgg, &allColumnsHaveAgg);

    if (allColumnsHaveAgg == true) {
2233
      int32_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
2234 2235 2236 2237 2238 2239 2240 2241 2242 2243 2244 2245 2246 2247 2248 2249 2250 2251 2252 2253 2254 2255 2256 2257 2258 2259 2260 2261 2262 2263 2264 2265 2266 2267 2268 2269

      // todo create this buffer during creating operator
      if (pBlock->pBlockAgg == NULL) {
        pBlock->pBlockAgg = taosMemoryCalloc(numOfCols, POINTER_BYTES);
      }

      for (int32_t i = 0; i < numOfCols; ++i) {
        SColMatchInfo* pColMatchInfo = taosArrayGet(pTableScanInfo->pColMatchInfo, i);
        if (!pColMatchInfo->output) {
          continue;
        }
        pBlock->pBlockAgg[pColMatchInfo->targetSlotId] = pColAgg[i];
      }

      return TSDB_CODE_SUCCESS;
    } else {  // failed to load the block sma data, data block statistics does not exist, load data block instead
      *status = FUNC_DATA_REQUIRED_DATA_LOAD;
    }
  }

  ASSERT(*status == FUNC_DATA_REQUIRED_DATA_LOAD);

  // todo filter data block according to the block sma data firstly
#if 0
  if (!doFilterByBlockStatistics(pBlock->pBlockStatis, pTableScanInfo->pCtx, pBlockInfo->rows)) {
    pCost->filterOutBlocks += 1;
    qDebug("%s data block filter out, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo), pBlockInfo->window.skey,
           pBlockInfo->window.ekey, pBlockInfo->rows);
    (*status) = FUNC_DATA_REQUIRED_FILTEROUT;
    return TSDB_CODE_SUCCESS;
  }
#endif

  pCost->totalCheckedRows += pBlock->info.rows;
  pCost->loadBlocks += 1;

H
Hongze Cheng 已提交
2270
  STsdbReader* reader = taosArrayGetP(pTableScanInfo->dataReaders, readerIdx);
2271 2272 2273 2274 2275
  SArray*      pCols = tsdbRetrieveDataBlock(reader, NULL);
  if (pCols == NULL) {
    return terrno;
  }

2276
  relocateColumnData(pBlock, pTableScanInfo->pColMatchInfo, pCols, true);
2277 2278 2279 2280 2281 2282 2283 2284 2285 2286 2287 2288 2289 2290 2291 2292 2293 2294 2295 2296 2297 2298 2299 2300 2301

  // currently only the tbname pseudo column
  if (pTableScanInfo->numOfPseudoExpr > 0) {
    addTagPseudoColumnData(&pTableScanInfo->readHandle, pTableScanInfo->pPseudoExpr, pTableScanInfo->numOfPseudoExpr,
                           pBlock);
  }

  int64_t st = taosGetTimestampMs();
  doFilter(pTableScanInfo->pFilterNode, pBlock);

  int64_t et = taosGetTimestampMs();
  pTableScanInfo->readRecorder.filterTime += (et - st);

  if (pBlock->info.rows == 0) {
    pCost->filterOutBlocks += 1;
    qDebug("%s data block filter out, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo),
           pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows);
  }

  return TSDB_CODE_SUCCESS;
}

typedef struct STableMergeScanSortSourceParam {
  SOperatorInfo* pOperator;
  int32_t        readerIdx;
2302
  SSDataBlock*   inputBlock;
2303 2304 2305 2306 2307 2308
} STableMergeScanSortSourceParam;

static SSDataBlock* getTableDataBlock(void* param) {
  STableMergeScanSortSourceParam* source = param;
  SOperatorInfo*                  pOperator = source->pOperator;
  int32_t                         readerIdx = source->readerIdx;
2309
  SSDataBlock*                    pBlock = source->inputBlock;
2310 2311 2312 2313
  STableMergeScanInfo*            pTableScanInfo = pOperator->info;

  int64_t st = taosGetTimestampUs();

2314 2315
  blockDataCleanup(pBlock);

H
Hongze Cheng 已提交
2316
  STsdbReader* reader = taosArrayGetP(pTableScanInfo->dataReaders, readerIdx);
2317 2318 2319 2320 2321 2322 2323 2324 2325 2326 2327
  while (tsdbNextDataBlock(reader)) {
    if (isTaskKilled(pOperator->pTaskInfo)) {
      longjmp(pOperator->pTaskInfo->env, TSDB_CODE_TSC_QUERY_CANCELLED);
    }

    // process this data block based on the probabilities
    bool processThisBlock = processBlockWithProbability(&pTableScanInfo->sample);
    if (!processThisBlock) {
      continue;
    }

2328 2329 2330 2331 2332 2333 2334
    blockDataCleanup(pBlock);
    SDataBlockInfo binfo = pBlock->info;
    tsdbRetrieveDataBlockInfo(reader, &binfo);

    binfo.capacity = binfo.rows;
    blockDataEnsureCapacity(pBlock, binfo.capacity);
    pBlock->info = binfo;
2335 2336 2337 2338 2339 2340 2341 2342 2343 2344 2345 2346 2347 2348 2349 2350 2351 2352 2353 2354 2355 2356 2357 2358 2359 2360 2361 2362 2363 2364 2365 2366 2367 2368 2369 2370 2371 2372

    uint32_t status = 0;
    int32_t  code = loadDataBlockFromOneTable(pOperator, pTableScanInfo, readerIdx, pBlock, &status);
    //    int32_t  code = loadDataBlockOnDemand(pOperator->pRuntimeEnv, pTableScanInfo, pBlock, &status);
    if (code != TSDB_CODE_SUCCESS) {
      longjmp(pOperator->pTaskInfo->env, code);
    }

    // current block is filter out according to filter condition, continue load the next block
    if (status == FUNC_DATA_REQUIRED_FILTEROUT || pBlock->info.rows == 0) {
      continue;
    }

    uint64_t* groupId = taosHashGet(pOperator->pTaskInfo->tableqinfoList.map, &pBlock->info.uid, sizeof(int64_t));
    if (groupId) {
      pBlock->info.groupId = *groupId;
    }

    pOperator->resultInfo.totalRows = pTableScanInfo->readRecorder.totalRows;
    pTableScanInfo->readRecorder.elapsedTime += (taosGetTimestampUs() - st) / 1000.0;

    return pBlock;
  }
  return NULL;
}

SArray* generateSortByTsInfo(int32_t order) {
  SArray*         pList = taosArrayInit(1, sizeof(SBlockOrderInfo));
  SBlockOrderInfo bi = {0};
  bi.order = order;
  bi.slotId = 0;
  bi.nullFirst = NULL_ORDER_FIRST;

  taosArrayPush(pList, &bi);

  return pList;
}

2373
/*
 * Start the merge scan of one table group.
 *
 * The group begins at pInfo->tableStartIndex and extends over the following tables of the
 * table list that carry pInfo->groupId; the last index is stored in pInfo->tableEndIndex.
 * One reader and one sort source are created for every table of the group, then a
 * multi-source merge sort handle is created and opened.
 *
 * Returns TSDB_CODE_SUCCESS. On failure (reader creation, allocation, opening the sort) the
 * function does not return: it longjmp()s to the task environment.
 */
int32_t startGroupTableMergeScan(SOperatorInfo* pOperator) {
  STableMergeScanInfo* pInfo = pOperator->info;
  SExecTaskInfo*       pTaskInfo = pOperator->pTaskInfo;

  // locate the last table that still belongs to the current group
  {
    size_t  tableListSize = taosArrayGetSize(pInfo->tableListInfo->pTableList);
    int32_t i = pInfo->tableStartIndex + 1;
    for (; i < tableListSize; ++i) {
      STableKeyInfo* tableKeyInfo = taosArrayGet(pInfo->tableListInfo->pTableList, i);
      if (tableKeyInfo->groupId != pInfo->groupId) {
        break;
      }
    }
    pInfo->tableEndIndex = i - 1;
  }

  int32_t tableStartIdx = pInfo->tableStartIndex;
  int32_t tableEndIdx = pInfo->tableEndIndex;

  // the reader array may have been released when the previous group was stopped; make sure
  // there is a valid array to collect the readers of this group
  if (pInfo->dataReaders == NULL) {
    pInfo->dataReaders = taosArrayInit(64, POINTER_BYTES);
    if (pInfo->dataReaders == NULL) {
      longjmp(pTaskInfo->env, TSDB_CODE_OUT_OF_MEMORY);
    }
  }

  STableListInfo* tableListInfo = pInfo->tableListInfo;
  int32_t         code = createMultipleDataReaders(&pInfo->cond, &pInfo->readHandle, tableListInfo, tableStartIdx,
                                                   tableEndIdx, pInfo->dataReaders, GET_TASKID(pTaskInfo));
  if (code != TSDB_CODE_SUCCESS) {
    longjmp(pTaskInfo->env, code);
  }

  // todo the total available buffer should be determined by total capacity of buffer of this task.
  // the additional one is reserved for merge result
  pInfo->sortBufSize = pInfo->bufPageSize * (tableEndIdx - tableStartIdx + 1 + 1);
  int32_t numOfBufPage = pInfo->sortBufSize / pInfo->bufPageSize;
  pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, SORT_MULTISOURCE_MERGE, pInfo->bufPageSize, numOfBufPage,
                                             pInfo->pSortInputBlock, pTaskInfo->id.str);

  tsortSetFetchRawDataFp(pInfo->pSortHandle, getTableDataBlock, NULL, NULL);

  // all parameters are pushed before any pointer into the array is taken, so the pointers
  // handed to the sort sources below are not invalidated by a later push
  size_t numReaders = taosArrayGetSize(pInfo->dataReaders);
  for (int32_t i = 0; i < numReaders; ++i) {
    STableMergeScanSortSourceParam param = {0};
    param.readerIdx = i;
    param.pOperator = pOperator;
    param.inputBlock = createOneDataBlock(pInfo->pResBlock, false);
    taosArrayPush(pInfo->sortSourceParams, &param);
  }

  for (int32_t i = 0; i < numReaders; ++i) {
    SSortSource* ps = taosMemoryCalloc(1, sizeof(SSortSource));
    if (ps == NULL) {
      longjmp(pTaskInfo->env, TSDB_CODE_OUT_OF_MEMORY);
    }

    STableMergeScanSortSourceParam* param = taosArrayGet(pInfo->sortSourceParams, i);
    ps->param = param;
    tsortAddSource(pInfo->pSortHandle, ps);
  }

  code = tsortOpen(pInfo->pSortHandle);

  if (code != TSDB_CODE_SUCCESS) {
    longjmp(pTaskInfo->env, terrno);
  }

  return TSDB_CODE_SUCCESS;
}

/*
 * Tear down the per-group state created by startGroupTableMergeScan(): the sort handle is
 * destroyed, the sort source parameters are cleared and every reader of the group is closed.
 *
 * The reader array itself is only emptied, not destroyed: startGroupTableMergeScan() appends
 * the readers of the next group to the same array, and destroyTableMergeScanOperatorInfo()
 * releases it when the operator goes away.
 *
 * NOTE(review): the inputBlock of each sort source parameter is not released here - confirm
 * whether the sort handle takes ownership of it.
 *
 * Always returns TSDB_CODE_SUCCESS.
 */
int32_t stopGroupTableMergeScan(SOperatorInfo* pOperator) {
  STableMergeScanInfo* pInfo = pOperator->info;

  tsortDestroySortHandle(pInfo->pSortHandle);
  taosArrayClear(pInfo->sortSourceParams);

  size_t numOfReaders = taosArrayGetSize(pInfo->dataReaders);
  for (int32_t i = 0; i < numOfReaders; ++i) {
    STsdbReader* reader = taosArrayGetP(pInfo->dataReaders, i);
    tsdbReaderClose(reader);
  }

  // keep the (now empty) array alive for the next group
  taosArrayClear(pInfo->dataReaders);
  return TSDB_CODE_SUCCESS;
}

2447
/*
 * Pull sorted rows from the sort handle into the handle's sorted data block.
 *
 * Rows are appended one tuple at a time until the sort is exhausted or the block holds at
 * least `capacity` rows.
 *
 * Returns the block when it contains at least one row; NULL when the handle provides no
 * block or no row could be fetched.
 */
SSDataBlock* getSortedTableMergeScanBlockData(SSortHandle* pHandle, int32_t capacity, SOperatorInfo* pOperator) {
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;

  SSDataBlock* pSorted = tsortGetSortedDataBlock(pHandle);
  if (NULL == pSorted) {
    return NULL;
  }

  blockDataEnsureCapacity(pSorted, capacity);

  // at least one fetch is always attempted; stop when the source dries up or the block is full
  do {
    STupleHandle* pTuple = tsortNextTuple(pHandle);
    if (NULL == pTuple) {
      break;
    }

    appendOneRowToDataBlock(pSorted, pTuple);
  } while (pSorted->info.rows < capacity);

  qDebug("%s get sorted row blocks, rows:%d", GET_TASKID(pTaskInfo), pSorted->info.rows);

  if (pSorted->info.rows > 0) {
    return pSorted;
  }
  return NULL;
}

/*
 * Produce the next result block of the table merge scan operator.
 *
 * Tables are processed group by group: the readers of one group are merge-sorted and their
 * rows returned block by block; when a group is exhausted it is stopped and the next group
 * (starting right after tableEndIndex) is started. Every returned block carries the id of
 * the group it belongs to.
 *
 * Returns NULL when the operator is already done, when there is no table at all, or after
 * the last group has been drained (the operator is then marked as completed).
 * A failure of the open function is reported through longjmp() to the task environment.
 */
SSDataBlock* doTableMergeScan(SOperatorInfo* pOperator) {
  if (OP_EXEC_DONE == pOperator->status) {
    return NULL;
  }

  STableMergeScanInfo* pInfo = pOperator->info;
  SExecTaskInfo*       pTaskInfo = pOperator->pTaskInfo;

  int32_t openCode = pOperator->fpSet._openFn(pOperator);
  if (TSDB_CODE_SUCCESS != openCode) {
    longjmp(pTaskInfo->env, openCode);
  }

  size_t tableListSize = taosArrayGetSize(pInfo->tableListInfo->pTableList);

  // first invocation: start scanning the first group
  if (!pInfo->hasGroupId) {
    pInfo->hasGroupId = true;

    if (tableListSize == 0) {
      doSetOperatorCompleted(pOperator);
      return NULL;
    }

    pInfo->tableStartIndex = 0;
    STableKeyInfo* pFirstKey = taosArrayGet(pInfo->tableListInfo->pTableList, pInfo->tableStartIndex);
    pInfo->groupId = pFirstKey->groupId;
    startGroupTableMergeScan(pOperator);
  }

  while (pInfo->tableStartIndex < tableListSize) {
    SSDataBlock* pRes = getSortedTableMergeScanBlockData(pInfo->pSortHandle, pOperator->resultInfo.capacity, pOperator);
    if (pRes != NULL) {
      pRes->info.groupId = pInfo->groupId;
      pOperator->resultInfo.totalRows += pRes->info.rows;
      return pRes;
    }

    // current group is drained
    stopGroupTableMergeScan(pOperator);
    if (pInfo->tableEndIndex >= tableListSize - 1) {
      doSetOperatorCompleted(pOperator);
      return NULL;
    }

    // move on to the group that starts right after the one just finished
    pInfo->tableStartIndex = pInfo->tableEndIndex + 1;
    STableKeyInfo* pNextKey = taosArrayGet(pInfo->tableListInfo->pTableList, pInfo->tableStartIndex);
    pInfo->groupId = pNextKey->groupId;
    startGroupTableMergeScan(pOperator);
  }

  return NULL;
}

void destroyTableMergeScanOperatorInfo(void* param, int32_t numOfOutput) {
  STableMergeScanInfo* pTableScanInfo = (STableMergeScanInfo*)param;
2523
  cleanupQueryTableDataCond(&pTableScanInfo->cond);
2524 2525

  for (int32_t i = 0; i < taosArrayGetSize(pTableScanInfo->dataReaders); ++i) {
H
Hongze Cheng 已提交
2526
    STsdbReader* reader = taosArrayGetP(pTableScanInfo->dataReaders, i);
H
refact  
Hongze Cheng 已提交
2527
    tsdbReaderClose(reader);
2528 2529 2530 2531 2532 2533 2534 2535 2536 2537 2538 2539 2540
  }
  taosArrayDestroy(pTableScanInfo->dataReaders);

  if (pTableScanInfo->pColMatchInfo != NULL) {
    taosArrayDestroy(pTableScanInfo->pColMatchInfo);
  }

  pTableScanInfo->pResBlock = blockDataDestroy(pTableScanInfo->pResBlock);
  pTableScanInfo->pSortInputBlock = blockDataDestroy(pTableScanInfo->pSortInputBlock);

  taosArrayDestroy(pTableScanInfo->pSortInfo);
}

2541 2542
typedef struct STableMergeScanExecInfo {
  SFileBlockLoadRecorder blockRecorder;
H
Hongze Cheng 已提交
2543
  SSortExecInfo          sortExecInfo;
2544 2545
} STableMergeScanExecInfo;

2546 2547
/*
 * Build the explain/analyze statistics of a table merge scan operator.
 *
 * On success *pOptrExplain points to a newly allocated STableMergeScanExecInfo and *len holds
 * its size; the caller releases the buffer.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY when the buffer cannot be allocated
 * (the output arguments are left untouched in that case).
 */
int32_t getTableMergeScanExplainExecInfo(SOperatorInfo* pOptr, void** pOptrExplain, uint32_t* len) {
  ASSERT(pOptr != NULL);
  // TODO: merge these two info into one struct
  STableMergeScanExecInfo* execInfo = taosMemoryCalloc(1, sizeof(STableMergeScanExecInfo));
  if (execInfo == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  STableMergeScanInfo* pInfo = pOptr->info;
  execInfo->blockRecorder = pInfo->readRecorder;

  // no sort handle exists when no group was ever started (empty table list); the sort
  // statistics then stay zero-initialized
  if (pInfo->pSortHandle != NULL) {
    execInfo->sortExecInfo = tsortGetSortExecInfo(pInfo->pSortHandle);
  }

  *pOptrExplain = execInfo;
  *len = sizeof(STableMergeScanExecInfo);

  return TSDB_CODE_SUCCESS;
}

S
slzhou 已提交
2560 2561 2562 2563 2564 2565
/*
 * Comparator ordering STableKeyInfo entries by ascending groupId (used with taosArraySort).
 *
 * The ids are compared explicitly instead of being subtracted: a difference narrowed to
 * int32_t can wrap or collapse to 0 for ids that differ, which yields an inconsistent order.
 *
 * Returns -1, 0 or 1 when the group id of p1 is lower than, equal to or greater than the one
 * of p2.
 */
int32_t compareTableKeyInfoByGid(const void* p1, const void* p2) {
  const STableKeyInfo* info1 = p1;
  const STableKeyInfo* info2 = p2;

  if (info1->groupId < info2->groupId) {
    return -1;
  }
  return (info1->groupId > info2->groupId) ? 1 : 0;
}

2566 2567 2568
/*
 * Create the table merge scan operator for a table scan physical node.
 *
 * When the node has group tags, pTableListInfo->pTableList is sorted by group id so that the
 * tables of one group are contiguous. The operator keeps a reference to pTableListInfo and a
 * copy of *readHandle; queryId and taskId are stored in the operator info.
 *
 * Returns the new operator, or NULL on failure, in which case pTaskInfo->code is set to
 * TSDB_CODE_OUT_OF_MEMORY and the memory allocated so far by this function is released.
 */
SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanNode, STableListInfo* pTableListInfo,
                                                SReadHandle* readHandle, SExecTaskInfo* pTaskInfo, uint64_t queryId,
                                                uint64_t taskId) {
  // declared ahead of the first goto so that the error path can always release it
  SArray* pColList = NULL;

  STableMergeScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STableMergeScanInfo));
  SOperatorInfo*       pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
  if (pInfo == NULL || pOperator == NULL) {
    goto _error;
  }
  if (pTableScanNode->pGroupTags) {
    taosArraySort(pTableListInfo->pTableList, compareTableKeyInfoByGid);
  }

  SDataBlockDescNode* pDescNode = pTableScanNode->scan.node.pOutputDataBlockDesc;

  int32_t numOfCols = 0;
  pColList = extractColMatchInfo(pTableScanNode->scan.pScanCols, pDescNode, &numOfCols, COL_MATCH_FROM_COL_ID);

  int32_t code = initQueryTableDataCond(&pInfo->cond, pTableScanNode);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  if (pTableScanNode->scan.pScanPseudoCols != NULL) {
    pInfo->pPseudoExpr = createExprInfo(pTableScanNode->scan.pScanPseudoCols, NULL, &pInfo->numOfPseudoExpr);
    pInfo->pPseudoCtx = createSqlFunctionCtx(pInfo->pPseudoExpr, pInfo->numOfPseudoExpr, &pInfo->rowEntryInfoOffset);
  }

  pInfo->scanInfo = (SScanInfo){.numOfAsc = pTableScanNode->scanSeq[0], .numOfDesc = pTableScanNode->scanSeq[1]};

  pInfo->readHandle = *readHandle;
  pInfo->interval = extractIntervalInfo(pTableScanNode);
  pInfo->sample.sampleRatio = pTableScanNode->ratio;
  pInfo->sample.seed = taosGetTimestampSec();
  pInfo->dataBlockLoadFlag = pTableScanNode->dataRequired;
  pInfo->pFilterNode = pTableScanNode->scan.node.pConditions;
  pInfo->tableListInfo = pTableListInfo;
  pInfo->scanFlag = MAIN_SCAN;
  pInfo->pColMatchInfo = pColList;  // ownership moves to the operator info from here on

  pInfo->pResBlock = createResDataBlock(pDescNode);
  pInfo->dataReaders = taosArrayInit(64, POINTER_BYTES);
  pInfo->queryId = queryId;
  pInfo->taskId = taskId;

  pInfo->sortSourceParams = taosArrayInit(64, sizeof(STableMergeScanSortSourceParam));

  pInfo->pSortInfo = generateSortByTsInfo(pInfo->cond.order);
  pInfo->pSortInputBlock = createOneDataBlock(pInfo->pResBlock, false);

  int32_t rowSize = pInfo->pResBlock->info.rowSize;
  pInfo->bufPageSize = getProperSortPageSize(rowSize);

  pOperator->name = "TableMergeScanOperator";
  pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN;
  pOperator->blocking = false;
  pOperator->status = OP_NOT_OPENED;
  pOperator->info = pInfo;
  pOperator->exprSupp.numOfExprs = numOfCols;
  pOperator->pTaskInfo = pTaskInfo;
  initResultSizeInfo(pOperator, 1024);

  pOperator->fpSet =
      createOperatorFpSet(operatorDummyOpenFn, doTableMergeScan, NULL, NULL, destroyTableMergeScanOperatorInfo, NULL,
                          NULL, getTableMergeScanExplainExecInfo);
  pOperator->cost.openCost = 0;
  return pOperator;

_error:
  pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY;
  // the column match list is not yet attached to pInfo on every path that reaches this label
  taosArrayDestroy(pColList);
  taosMemoryFree(pInfo);
  taosMemoryFree(pOperator);
  return NULL;
}
2639 2640 2641 2642 2643 2644 2645

/*
 * Produce the next result block of the last-row scan operator.
 *
 * When the table list is empty the task is marked TASK_COMPLETED and NULL is returned.
 * Otherwise the result block is cleaned up, refilled through tsdbRetrieveLastRow() and
 * returned if it holds at least one row.
 *
 * Returns NULL when the operator is already done or when no row is available.
 */
static SSDataBlock* doScanLastrow(SOperatorInfo* pOperator) {
  if (OP_EXEC_DONE == pOperator->status) {
    return NULL;
  }

  SExecTaskInfo*    pTaskInfo = pOperator->pTaskInfo;
  SLastrowScanInfo* pInfo = pOperator->info;

  int32_t numOfTables = taosArrayGetSize(pInfo->pTableList);
  if (0 == numOfTables) {
    setTaskStatus(pTaskInfo, TASK_COMPLETED);
    return NULL;
  }

  // check if it is a group by tbname
  if (numOfTables != taosArrayGetSize(pInfo->pTableList)) {
    // todo fetch the result for each group
    return pInfo->pRes->info.rows == 0 ? NULL : pInfo->pRes;
  }

  blockDataCleanup(pInfo->pRes);
  tsdbRetrieveLastRow(pInfo->pLastrowReader, pInfo->pRes, pInfo->pSlotIds);

  if (pInfo->pRes->info.rows == 0) {
    return NULL;
  }
  return pInfo->pRes;
}

static void destroyLastrowScanOperator(void* param, int32_t numOfOutput) {
2667
  SLastrowScanInfo* pInfo = (SLastrowScanInfo*)param;
2668
  blockDataDestroy(pInfo->pRes);
2669
  tsdbLastrowReaderClose(pInfo->pLastrowReader);
2670 2671
}

2672 2673
/*
 * Create the last-row scan operator for a last-row scan physical node.
 *
 * For every scanned column the function records its column id (passed to the last-row
 * reader) and, in pInfo->pSlotIds[targetSlotId], the index of the column in the task schema,
 * or -1 for the primary timestamp column.
 *
 * The operator keeps the pTableList pointer and a copy of *readHandle.
 *
 * Returns the new operator, or NULL when an allocation fails, in which case pTaskInfo->code
 * is set to TSDB_CODE_OUT_OF_MEMORY and everything allocated so far by this function is
 * released.
 *
 * NOTE(review): a pSlotIds entry stays unset when its column id is not found in the task
 * schema - confirm that this cannot happen.
 */
SOperatorInfo* createLastrowScanOperator(SLastRowScanPhysiNode* pScanNode, SReadHandle* readHandle, SArray* pTableList,
                                         SExecTaskInfo* pTaskInfo) {
  // declared ahead of the first goto so that the error path can always release it
  int32_t* pCols = NULL;

  SLastrowScanInfo* pInfo = taosMemoryCalloc(1, sizeof(SLastrowScanInfo));
  SOperatorInfo*    pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
  if (pInfo == NULL || pOperator == NULL) {
    goto _error;
  }

  pInfo->pTableList = pTableList;
  pInfo->readHandle = *readHandle;
  pInfo->pRes = createResDataBlock(pScanNode->node.pOutputDataBlockDesc);

  int32_t numOfCols = 0;
  pInfo->pColMatchInfo = extractColMatchInfo(pScanNode->pScanCols, pScanNode->node.pOutputDataBlockDesc, &numOfCols,
                                             COL_MATCH_FROM_COL_ID);

  pCols = taosMemoryMalloc(numOfCols * sizeof(int32_t));
  pInfo->pSlotIds = taosMemoryMalloc(numOfCols * sizeof(pInfo->pSlotIds[0]));
  // a zero-sized request may legitimately yield NULL, so only treat NULL as a failure when
  // there is something to store
  if (numOfCols > 0 && (pCols == NULL || pInfo->pSlotIds == NULL)) {
    goto _error;
  }

  for (int32_t i = 0; i < numOfCols; ++i) {
    SColMatchInfo* pColMatch = taosArrayGet(pInfo->pColMatchInfo, i);
    pCols[i] = pColMatch->colId;

    for (int32_t j = 0; j < pTaskInfo->schemaVer.sw->nCols; ++j) {
      if (pColMatch->colId != pTaskInfo->schemaVer.sw->pSchema[j].colId) {
        continue;
      }

      // the primary timestamp column is marked with -1 instead of its schema index
      pInfo->pSlotIds[pColMatch->targetSlotId] = (pColMatch->colId == PRIMARYKEY_TIMESTAMP_COL_ID) ? -1 : j;
      break;
    }
  }

  tsdbLastRowReaderOpen(readHandle->vnode, LASTROW_RETRIEVE_TYPE_ALL, pTableList, pCols, numOfCols,
                        &pInfo->pLastrowReader);
  taosMemoryFree(pCols);

  pOperator->name = "LastrowScanOperator";
  pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN;
  pOperator->blocking = false;
  pOperator->status = OP_NOT_OPENED;
  pOperator->info = pInfo;
  pOperator->pTaskInfo = pTaskInfo;
  pOperator->exprSupp.numOfExprs = taosArrayGetSize(pInfo->pRes->pDataBlock);

  initResultSizeInfo(pOperator, 1024);
  blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity);

  pOperator->fpSet =
      createOperatorFpSet(operatorDummyOpenFn, doScanLastrow, NULL, NULL, destroyLastrowScanOperator, NULL, NULL, NULL);
  pOperator->cost.openCost = 0;
  return pOperator;

_error:
  pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY;
  taosMemoryFree(pCols);
  if (pInfo != NULL) {
    // pInfo is zero-initialized, so members not yet created are NULL here
    if (pInfo->pRes != NULL) {
      blockDataDestroy(pInfo->pRes);
    }
    taosArrayDestroy(pInfo->pColMatchInfo);
    taosMemoryFree(pInfo->pSlotIds);
  }
  taosMemoryFree(pInfo);
  taosMemoryFree(pOperator);
  return NULL;
}