scanoperator.c 59.9 KB
Newer Older
H
Haojun Liao 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

16
#include "filter.h"
17
#include "function.h"
18
#include "functionMgt.h"
H
Haojun Liao 已提交
19 20
#include "os.h"
#include "querynodes.h"
21
#include "systable.h"
22
#include "tglobal.h"
H
Haojun Liao 已提交
23
#include "tname.h"
24
#include "ttime.h"
H
Haojun Liao 已提交
25 26 27 28 29 30 31 32 33

#include "tdatablock.h"
#include "tmsg.h"

#include "executorimpl.h"
#include "query.h"
#include "tcompare.h"
#include "thash.h"
#include "ttypes.h"
34
#include "vnode.h"
H
Haojun Liao 已提交
35

wmmhello's avatar
wmmhello 已提交
36 37
#include "executorInt.h"

H
Haojun Liao 已提交
38
// Stores REVERSE_SCAN into the scanFlag member of the object that _info points to.
#define SET_REVERSE_SCAN_FLAG(_info) ((_info)->scanFlag = REVERSE_SCAN)
39
// Toggles n in place: TSDB_ORDER_ASC becomes TSDB_ORDER_DESC, any other value becomes TSDB_ORDER_ASC.
// The argument is evaluated more than once, so it must be a side-effect free lvalue.
#define SWITCH_ORDER(n)              (((n) = ((n) == TSDB_ORDER_ASC) ? TSDB_ORDER_DESC : TSDB_ORDER_ASC))
40

5
54liuyao 已提交
41 42 43 44 45
// Pair of 32-bit identifiers locating a window entry.
// NOTE(review): presumably a buffer page id plus a row index inside that page - confirm against the users.
typedef struct SWindowPosition {
  int32_t pageId;  // page identifier
  int32_t rowId;   // row identifier
} SWindowPosition;

46
static int32_t buildSysDbTableInfo(const SSysTableScanInfo* pInfo, int32_t capacity);
L
Liu Jicong 已提交
47 48
static int32_t buildDbTableInfoBlock(const SSDataBlock* p, const SSysTableMeta* pSysDbTableMeta, size_t size,
                                     const char* dbName);
49

50
/**
 * Toggle the scan order stored in each function context.
 *
 * @param pCtx         array of function contexts
 * @param numOfOutput  number of leading entries of pCtx to update; nothing is touched when it is <= 0
 */
static void switchCtxOrder(SqlFunctionCtx* pCtx, int32_t numOfOutput) {
  int32_t idx = 0;
  while (idx < numOfOutput) {
    SqlFunctionCtx* pCur = &pCtx[idx];
    SWITCH_ORDER(pCur->order);
    idx += 1;
  }
}

/**
 * Placeholder kept for the reverse-scan preparation step.
 *
 * The former body (a walk over every table group that refreshed the per-table query info for
 * the reverse scan) was entirely compiled out with "#if 0", so this function does nothing.
 *
 * @param pTableScanInfo  table scan operator state; currently unused
 */
static void setupQueryRangeForReverseScan(STableScanInfo* pTableScanInfo) {
  (void)pTableScanInfo;  // intentionally a no-op
}

79 80 81 82 83 84 85 86 87
/**
 * Advance a time window by one step in the scan direction.
 *
 * For every interval unit except 'n' and 'y' the window start is shifted by the sliding value
 * times the direction factor and the end becomes start + interval - 1. For 'n' and 'y' the
 * window is moved by whole calendar months (a 'y' interval counts as 12 months each) using the
 * local broken-down time of the current window start.
 *
 * @param pInterval  interval definition (interval, sliding, unit, precision)
 * @param tw         in/out: window to advance
 * @param order      scan order; converted to a direction factor by GET_FORWARD_DIRECTION_FACTOR
 */
static void getNextTimeWindow(SInterval* pInterval, STimeWindow* tw, int32_t order) {
  const int32_t factor = GET_FORWARD_DIRECTION_FACTOR(order);

  switch (pInterval->intervalUnit) {
    case 'n':
    case 'y':
      break;  // calendar based units are handled below
    default:
      tw->skey += pInterval->sliding * factor;
      tw->ekey = tw->skey + pInterval->interval - 1;
      return;
  }

  // length of one window expressed in months
  int64_t numOfMonths = pInterval->interval;
  if (pInterval->intervalUnit == 'y') {
    numOfMonths *= 12;
  }

  // current window start converted to whole seconds
  time_t startSec =
      (time_t)(convertTimePrecision(tw->skey, pInterval->precision, TSDB_TIME_PRECISION_MILLI) / 1000);

  struct tm tm;
  taosLocalTime(&startSec, &tm);

  // month index counted in the struct tm year base, moved by one window in the scan direction
  int mon = (int)(tm.tm_year * 12 + tm.tm_mon + numOfMonths * factor);
  tm.tm_year = mon / 12;
  tm.tm_mon = mon % 12;
  tw->skey = convertTimePrecision((int64_t)taosMktime(&tm) * 1000L, TSDB_TIME_PRECISION_MILLI, pInterval->precision);

  // the end key is the last tick before the start of the following window
  mon = (int)(mon + numOfMonths);
  tm.tm_year = mon / 12;
  tm.tm_mon = mon % 12;
  tw->ekey =
      convertTimePrecision((int64_t)taosMktime(&tm) * 1000L, TSDB_TIME_PRECISION_MILLI, pInterval->precision) - 1;
}

112
/**
 * Check whether a data block spans more than one interval window.
 *
 * @param pInterval   interval definition; an interval of 0 means the upstream operator is not an
 *                    interval operator and the result is always false
 * @param pBlockInfo  block whose [skey, ekey] range is tested
 * @param order       scan order, selects from which end of the block the windows are walked
 * @return true when a window boundary falls inside the block range, false otherwise
 */
static bool overlapWithTimeWindow(SInterval* pInterval, SDataBlockInfo* pBlockInfo, int32_t order) {
  // 0 by default, which means the upstream operator is not an interval operator.
  if (pInterval->interval == 0) {
    return false;
  }

  const STimeWindow* pRange = &pBlockInfo->window;
  STimeWindow        win = {0};

  if (order == TSDB_ORDER_ASC) {
    getAlignQueryTimeWindow(pInterval, pInterval->precision, pRange->skey, &win);
    assert(win.ekey >= pRange->skey);

    // the first window already ends inside the block
    if (win.ekey < pRange->ekey) {
      return true;
    }

    for (;;) {
      getNextTimeWindow(pInterval, &win, order);
      if (win.skey > pRange->ekey) {
        return false;
      }

      assert(win.ekey > pRange->ekey);
      // win.skey <= pRange->ekey is guaranteed by the check above
      if (win.skey > pRange->skey) {
        return true;
      }
    }
  }

  // descending order: start from the window that covers the end of the block
  getAlignQueryTimeWindow(pInterval, pInterval->precision, pRange->ekey, &win);
  assert(win.skey <= pRange->ekey);

  if (win.skey > pRange->skey) {
    return true;
  }

  for (;;) {
    getNextTimeWindow(pInterval, &win, order);
    if (win.ekey < pRange->skey) {
      return false;
    }

    assert(win.skey < pRange->skey);
    // win.ekey >= pRange->skey is guaranteed by the check above
    if (win.ekey < pRange->ekey) {
      return true;
    }
  }
}

163 164
static void addTagPseudoColumnData(STableScanInfo* pTableScanInfo, SSDataBlock* pBlock);

L
Liu Jicong 已提交
165 166
/**
 * Decide how much of the current data block is needed and load it accordingly.
 *
 * The requirement starts from the operator's dataBlockLoadFlag and is upgraded to
 * FUNC_DATA_REQUIRED_DATA_LOAD when a filter is attached or the block spans more than one
 * interval window. Depending on the final requirement the block is filtered out, skipped,
 * served from the block statistics, or fully loaded and filtered.
 *
 * @param pOperator       table scan operator
 * @param pTableScanInfo  scan state (reader, filter, column match info, cost recorder)
 * @param pBlock          in/out: result block; pBlock->info must already describe the block
 * @param status          out: the data requirement that was finally applied
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_OUT_OF_MEMORY when the statistics buffer cannot be
 *         allocated, or terrno when tsdbRetrieveDataBlock() returns NULL
 */
static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableScanInfo, SSDataBlock* pBlock,
                             uint32_t* status) {
  SExecTaskInfo*          pTaskInfo = pOperator->pTaskInfo;
  STableScanInfo*         pInfo = pOperator->info;
  SFileBlockLoadRecorder* pCost = &pTableScanInfo->readRecorder;
  SDataBlockInfo*         pBlockInfo = &pBlock->info;

  pCost->totalBlocks += 1;
  pCost->totalRows += pBlock->info.rows;

  *status = pInfo->dataBlockLoadFlag;
  if (pTableScanInfo->pFilterNode != NULL ||
      overlapWithTimeWindow(&pTableScanInfo->interval, &pBlock->info, pTableScanInfo->cond.order)) {
    (*status) = FUNC_DATA_REQUIRED_DATA_LOAD;
  }

  // statistics attached while handling the previous block are no longer valid
  taosMemoryFreeClear(pBlock->pBlockAgg);

  if (*status == FUNC_DATA_REQUIRED_FILTEROUT) {
    qDebug("%s data block filter out, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo),
           pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows);
    pCost->filterOutBlocks += 1;
    return TSDB_CODE_SUCCESS;
  } else if (*status == FUNC_DATA_REQUIRED_NOT_LOAD) {
    qDebug("%s data block skipped, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo),
           pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows);
    pCost->skipBlocks += 1;

    // clear all data in pBlock that were set when handling the previous block
    for (int32_t i = 0; i < pBlockInfo->numOfCols; ++i) {
      SColumnInfoData* pcol = taosArrayGet(pBlock->pDataBlock, i);
      pcol->pData = NULL;
    }

    return TSDB_CODE_SUCCESS;
  } else if (*status == FUNC_DATA_REQUIRED_STATIS_LOAD) {
    pCost->loadBlockStatis += 1;

    bool             allColumnsHaveAgg = true;
    SColumnDataAgg** pColAgg = NULL;
    tsdbRetrieveDataBlockStatisInfo(pTableScanInfo->dataReader, &pColAgg, &allColumnsHaveAgg);

    if (allColumnsHaveAgg == true) {
      int32_t numOfCols = pBlock->info.numOfCols;

      // todo create this buffer during creating operator
      if (pBlock->pBlockAgg == NULL) {
        pBlock->pBlockAgg = taosMemoryCalloc(numOfCols, POINTER_BYTES);
        if (pBlock->pBlockAgg == NULL) {
          // do not write through a NULL buffer below; let the caller abort the task
          return TSDB_CODE_OUT_OF_MEMORY;
        }
      }

      for (int32_t i = 0; i < numOfCols; ++i) {
        SColMatchInfo* pColMatchInfo = taosArrayGet(pTableScanInfo->pColMatchInfo, i);
        if (!pColMatchInfo->output) {
          continue;
        }
        pBlock->pBlockAgg[pColMatchInfo->targetSlotId] = pColAgg[i];
      }

      return TSDB_CODE_SUCCESS;
    } else {  // failed to load the block sma data, data block statistics does not exist, load data block instead
      *status = FUNC_DATA_REQUIRED_DATA_LOAD;
    }
  }

  ASSERT(*status == FUNC_DATA_REQUIRED_DATA_LOAD);

  // todo filter data block according to the block sma data firstly

  pCost->totalCheckedRows += pBlock->info.rows;
  pCost->loadBlocks += 1;

  SArray* pCols = tsdbRetrieveDataBlock(pTableScanInfo->dataReader, NULL);
  if (pCols == NULL) {
    return terrno;
  }

  relocateColumnData(pBlock, pTableScanInfo->pColMatchInfo, pCols);

  // fill the pseudo columns (tbname and tags)
  if (pTableScanInfo->numOfPseudoExpr > 0) {
    addTagPseudoColumnData(pTableScanInfo, pBlock);
  }

  // apply the filter and account the time spent in it
  int64_t st = taosGetTimestampMs();
  doFilter(pTableScanInfo->pFilterNode, pBlock, pTableScanInfo->pColMatchInfo);

  int64_t et = taosGetTimestampMs();
  pTableScanInfo->readRecorder.filterTime += (et - st);

  if (pBlock->info.rows == 0) {
    pCost->filterOutBlocks += 1;
    qDebug("%s data block filter out, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo),
           pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows);
  }

  return TSDB_CODE_SUCCESS;
}

273
/**
 * Switch the table scan state over to a descending pass.
 *
 * Marks the scan as reverse, toggles the order of every function context, swaps the start and
 * end keys of the query time window and sets the query order to TSDB_ORDER_DESC.
 *
 * @param pTableScanInfo  scan state to update
 * @param pCtx            function contexts whose order is toggled
 * @param numOfOutput     number of entries in pCtx
 */
static void prepareForDescendingScan(STableScanInfo* pTableScanInfo, SqlFunctionCtx* pCtx, int32_t numOfOutput) {
  SET_REVERSE_SCAN_FLAG(pTableScanInfo);
  switchCtxOrder(pCtx, numOfOutput);

  // the query range is walked backwards: exchange both ends of the window
  pTableScanInfo->cond.order = TSDB_ORDER_DESC;
  TSWAP(pTableScanInfo->cond.twindow.skey, pTableScanInfo->cond.twindow.ekey);
}

284
/**
 * Fill the pseudo columns (table name and tag values) of a data block.
 *
 * The table entry is looked up by pBlock->info.uid. For every pseudo expression the destination
 * column is sized to pBlock->info.rows and the same value is appended to each row. JSON tags
 * are copied into a temporary buffer prefixed with the TSDB_DATA_TYPE_JSON type byte.
 *
 * If the temporary JSON buffer cannot be allocated the error is logged and the function returns
 * early, leaving the remaining pseudo columns unfilled.
 *
 * @param pTableScanInfo  scan state providing the meta handle and the pseudo expressions
 * @param pBlock          block whose pseudo columns are filled
 */
void addTagPseudoColumnData(STableScanInfo* pTableScanInfo, SSDataBlock* pBlock) {
  if (pTableScanInfo->numOfPseudoExpr == 0) {
    return;
  }

  SMetaReader mr = {0};
  metaReaderInit(&mr, pTableScanInfo->readHandle.meta, 0);
  metaGetTableEntryByUid(&mr, pBlock->info.uid);

  for (int32_t j = 0; j < pTableScanInfo->numOfPseudoExpr; ++j) {
    SExprInfo* pExpr = &pTableScanInfo->pPseudoExpr[j];

    int32_t dstSlotId = pExpr->base.resSchema.slotId;

    SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, dstSlotId);
    colInfoDataEnsureCapacity(pColInfoData, 0, pBlock->info.rows);

    int32_t functionId = pExpr->pExpr->_function.functionId;

    // this is to handle the tbname
    if (fmIsScanPseudoColumnFunc(functionId)) {
      setTbNameColData(pTableScanInfo->readHandle.meta, pBlock, pColInfoData, functionId);
      continue;
    }

    // these are tags
    const bool  isJson = (pColInfoData->info.type == TSDB_DATA_TYPE_JSON);
    char*       jsonBuf = NULL;
    const char* p = NULL;

    if (isJson) {
      const uint8_t* tmp = mr.me.ctbEntry.pTags;
      const int32_t  tagLen = kvRowLen(tmp);  // read once, while the meta reader is still open

      jsonBuf = taosMemoryCalloc(tagLen + 1, 1);
      if (jsonBuf == NULL) {
        // log before clearing the reader: tmp is read from mr and must not be used afterwards
        qError("doTagScan calloc error:%d", tagLen + 1);
        metaReaderClear(&mr);
        return;
      }

      *jsonBuf = TSDB_DATA_TYPE_JSON;
      memcpy(jsonBuf + 1, tmp, tagLen);
      p = jsonBuf;
    } else {
      p = metaGetTableTagVal(&mr.me, pExpr->base.pParam[0].pCol->colId);
    }

    // every row of the block carries the same tag value; a NULL value is appended as null
    for (int32_t i = 0; i < pBlock->info.rows; ++i) {
      colDataAppend(pColInfoData, i, p, (p == NULL));
    }

    if (isJson) {
      taosMemoryFree(jsonBuf);
    }
  }

  metaReaderClear(&mr);
}

338 339 340 341 342 343 344 345 346
/**
 * Fill a column by running the scalar function identified by functionId on the block's table uid.
 *
 * A temporary one-row BIGINT column holding pBlock->info.uid is built and passed, together with
 * the meta handle, as the single input parameter of the scalar function; the function writes its
 * output into pColInfoData.
 *
 * @param pMeta         meta handle forwarded to the scalar function through SScalarParam.param
 * @param pBlock        block providing the table uid and the row count
 * @param pColInfoData  destination column
 * @param functionId    id of the scalar function to execute
 */
void setTbNameColData(void* pMeta, const SSDataBlock* pBlock, SColumnInfoData* pColInfoData, int32_t functionId) {
  struct SScalarFuncExecFuncs fpSet = {0};
  fmGetScalarFuncExecFuncs(functionId, &fpSet);

  SColumnInfoData infoData = {0};
  infoData.info.type = TSDB_DATA_TYPE_BIGINT;
  infoData.info.bytes = sizeof(uint64_t);
  colInfoDataEnsureCapacity(&infoData, 0, 1);

  colDataAppendInt64(&infoData, 0, (int64_t*)&pBlock->info.uid);
  SScalarParam srcParam = {.numOfRows = pBlock->info.rows, .param = pMeta, .columnData = &infoData};

  SScalarParam param = {.columnData = pColInfoData};
  fpSet.process(&srcParam, 1, &param);

  // The temporary uid column lives on the stack, but the buffers attached to it by
  // colInfoDataEnsureCapacity() do not; release them so they do not leak on every call.
  // NOTE(review): assumes a fixed-length column owns exactly nullbitmap and pData - confirm.
  taosMemoryFree(infoData.nullbitmap);
  taosMemoryFree(infoData.pData);
}

354
/**
 * Fetch the next non-empty data block from the reader.
 *
 * Blocks are pulled from the reader until one survives loading and filtering. The block's
 * group id is looked up in (or added to) the operator's group set using the key built from the
 * group columns. Cost counters of the operator are updated before the block is returned.
 *
 * Leaves through longjmp() on pTaskInfo->env when the task has been killed or when loading a
 * block fails.
 *
 * @param pOperator  table scan operator
 * @return the operator's result block holding the next data, or NULL when the reader is exhausted
 */
static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) {
  STableScanInfo* pTableScanInfo = pOperator->info;
  SSDataBlock*    pBlock = pTableScanInfo->pResBlock;

  int64_t st = taosGetTimestampUs();

  while (tsdbNextDataBlock(pTableScanInfo->dataReader)) {
    if (isTaskKilled(pOperator->pTaskInfo)) {
      longjmp(pOperator->pTaskInfo->env, TSDB_CODE_TSC_QUERY_CANCELLED);
    }

    tsdbRetrieveDataBlockInfo(pTableScanInfo->dataReader, &pBlock->info);

    uint32_t status = 0;
    int32_t  code = loadDataBlock(pOperator, pTableScanInfo, pBlock, &status);
    if (code != TSDB_CODE_SUCCESS) {
      longjmp(pOperator->pTaskInfo->env, code);
    }

    // current block is filtered out according to the filter condition, continue with the next
    // block; checked first so that no group key is built from row 0 of an empty block
    if (status == FUNC_DATA_REQUIRED_FILTEROUT || pBlock->info.rows == 0) {
      continue;
    }

    int32_t numOfGroupCols = taosArrayGetSize(pTableScanInfo->pGroupCols);
    recordNewGroupKeys(pTableScanInfo->pGroupCols, pTableScanInfo->pGroupColVals, pBlock, 0, numOfGroupCols);
    int32_t len = buildGroupKeys(pTableScanInfo->keyBuf, pTableScanInfo->pGroupColVals);

    uint64_t* groupId = taosHashGet(pTableScanInfo->pGroupSet, pTableScanInfo->keyBuf, len);
    if (groupId != NULL) {
      // known group: reuse the cached id
      pBlock->info.groupId = *groupId;
    } else {
      // first block of this group: compute the id and remember it
      pBlock->info.groupId = calcGroupId(pTableScanInfo->keyBuf, len);
      taosHashPut(pTableScanInfo->pGroupSet, pTableScanInfo->keyBuf, len, &pBlock->info.groupId, sizeof(uint64_t));
    }

    pOperator->resultInfo.totalRows = pTableScanInfo->readRecorder.totalRows;
    pTableScanInfo->readRecorder.elapsedTime += (taosGetTimestampUs() - st) / 1000.0;

    pOperator->cost.totalCost = pTableScanInfo->readRecorder.elapsedTime;
    return pBlock;
  }

  return NULL;
}

402
/**
 * Produce the next block of a table scan that may need several ascending and descending passes.
 *
 * scanInfo.numOfAsc ascending passes are executed first, followed by scanInfo.numOfDesc
 * descending passes. Between two passes the read handle is reset with the current query
 * condition; before the first descending pass the scan state is switched to descending order.
 * When all passes are exhausted the task is marked TASK_COMPLETED.
 *
 * @param pOperator  table scan operator
 * @return the next data block, or NULL when there is no reader, the operator is already done,
 *         or every pass has been exhausted
 */
static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
  STableScanInfo* pTableScanInfo = pOperator->info;
  SExecTaskInfo*  pTaskInfo = pOperator->pTaskInfo;

  // The read handle is not initialized yet, since no qualified tables exist
  if (pTableScanInfo->dataReader == NULL || pOperator->status == OP_EXEC_DONE) {
    return NULL;
  }

  // do the ascending order traverse in the first place.
  while (pTableScanInfo->scanTimes < pTableScanInfo->scanInfo.numOfAsc) {
    SSDataBlock* p = doTableScanImpl(pOperator);
    if (p != NULL) {
      return p;
    }

    pTableScanInfo->scanTimes += 1;

    if (pTableScanInfo->scanTimes < pTableScanInfo->scanInfo.numOfAsc) {
      setTaskStatus(pTaskInfo, TASK_NOT_COMPLETED);
      pTableScanInfo->scanFlag = REPEAT_SCAN;

      STimeWindow* pWin = &pTableScanInfo->cond.twindow;
      qDebug("%s start to repeat ascending order scan data blocks due to query func required, qrange:%" PRId64
             "-%" PRId64,
             GET_TASKID(pTaskInfo), pWin->skey, pWin->ekey);

      // do prepare for the next round table scan operation
      tsdbResetReadHandle(pTableScanInfo->dataReader, &pTableScanInfo->cond);
    }
  }

  int32_t total = pTableScanInfo->scanInfo.numOfAsc + pTableScanInfo->scanInfo.numOfDesc;
  if (pTableScanInfo->scanTimes < total) {
    if (pTableScanInfo->cond.order == TSDB_ORDER_ASC) {
      prepareForDescendingScan(pTableScanInfo, pTableScanInfo->pCtx, pTableScanInfo->numOfOutput);
      tsdbResetReadHandle(pTableScanInfo->dataReader, &pTableScanInfo->cond);
    }

    STimeWindow* pWin = &pTableScanInfo->cond.twindow;
    qDebug("%s start to descending order scan data blocks due to query func required, qrange:%" PRId64 "-%" PRId64,
           GET_TASKID(pTaskInfo), pWin->skey, pWin->ekey);

    while (pTableScanInfo->scanTimes < total) {
      SSDataBlock* p = doTableScanImpl(pOperator);
      if (p != NULL) {
        return p;
      }

      pTableScanInfo->scanTimes += 1;

      // Compare against the total number of passes: scanTimes is already >= numOfAsc here, so
      // testing against numOfAsc would never rewind the reader for a repeated descending pass.
      if (pTableScanInfo->scanTimes < total) {
        setTaskStatus(pTaskInfo, TASK_NOT_COMPLETED);
        pTableScanInfo->scanFlag = REPEAT_SCAN;

        qDebug("%s start to repeat descending order scan data blocks due to query func required, qrange:%" PRId64
               "-%" PRId64,
               GET_TASKID(pTaskInfo), pTaskInfo->window.skey, pTaskInfo->window.ekey);

        // do prepare for the next round table scan operation
        tsdbResetReadHandle(pTableScanInfo->dataReader, &pTableScanInfo->cond);
      }
    }
  }

  setTaskStatus(pTaskInfo, TASK_COMPLETED);
  return NULL;
}

471 472 473 474 475 476 477 478 479 480 481 482
/**
 * Copy the interval settings of a table scan plan node into an SInterval value.
 *
 * Only interval, sliding, intervalUnit, slidingUnit and offset are taken from the node; every
 * other member of the returned structure (including precision) is left zero-initialized.
 *
 * @param pTableScanNode  physical plan node of the table scan
 * @return the interval description, by value
 */
SInterval extractIntervalInfo(const STableScanPhysiNode* pTableScanNode) {
  SInterval result = {0};

  result.interval = pTableScanNode->interval;
  result.sliding = pTableScanNode->sliding;
  result.intervalUnit = pTableScanNode->intervalUnit;
  result.slidingUnit = pTableScanNode->slidingUnit;
  result.offset = pTableScanNode->offset;

  return result;
}

483 484
static int32_t getTableScannerExecInfo(struct SOperatorInfo* pOptr, void** pOptrExplain, uint32_t* len) {
  SFileBlockLoadRecorder* pRecorder = taosMemoryCalloc(1, sizeof(SFileBlockLoadRecorder));
485
  STableScanInfo*         pTableScanInfo = pOptr->info;
486 487 488 489 490 491
  *pRecorder = pTableScanInfo->readRecorder;
  *pOptrExplain = pRecorder;
  *len = sizeof(SFileBlockLoadRecorder);
  return 0;
}

492 493 494 495 496
static void destroyTableScanOperatorInfo(void* param, int32_t numOfOutput) {
  STableScanInfo* pTableScanInfo = (STableScanInfo*)param;
  taosMemoryFree(pTableScanInfo->pResBlock);
  tsdbCleanupReadHandle(pTableScanInfo->dataReader);

wmmhello's avatar
wmmhello 已提交
497 498 499 500 501 502 503 504
  taosArrayDestroy(pTableScanInfo->pGroupCols);
  for(int i = 0; i < taosArrayGetSize(pTableScanInfo->pGroupColVals); i++){
    SGroupKeys key = *(SGroupKeys*)taosArrayGet(pTableScanInfo->pGroupColVals, i);
    taosMemoryFree(key.pData);
  }
  taosArrayDestroy(pTableScanInfo->pGroupColVals);
  taosMemoryFree(pTableScanInfo->keyBuf);
  taosHashCleanup(pTableScanInfo->pGroupSet);
505 506 507 508 509
  if (pTableScanInfo->pColMatchInfo != NULL) {
    taosArrayDestroy(pTableScanInfo->pColMatchInfo);
  }
}

510
SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, tsdbReaderT pDataReader,
wmmhello's avatar
wmmhello 已提交
511
                                           SReadHandle* readHandle, SArray* groupKyes, SExecTaskInfo* pTaskInfo) {
H
Haojun Liao 已提交
512 513 514
  STableScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STableScanInfo));
  SOperatorInfo*  pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
  if (pInfo == NULL || pOperator == NULL) {
wmmhello's avatar
wmmhello 已提交
515
    goto _error;
H
Haojun Liao 已提交
516 517
  }

518
  SDataBlockDescNode* pDescNode = pTableScanNode->scan.node.pOutputDataBlockDesc;
519

520
  int32_t numOfCols = 0;
521 522
  SArray* pColList =
      extractColMatchInfo(pTableScanNode->scan.pScanCols, pDescNode, &numOfCols, pTaskInfo, COL_MATCH_FROM_COL_ID);
L
Liu Jicong 已提交
523

524 525
  int32_t code = initQueryTableDataCond(&pInfo->cond, pTableScanNode);
  if (code != TSDB_CODE_SUCCESS) {
wmmhello's avatar
wmmhello 已提交
526
    goto _error;
527 528 529 530
  }

  if (pTableScanNode->scan.pScanPseudoCols != NULL) {
    pInfo->pPseudoExpr = createExprInfo(pTableScanNode->scan.pScanPseudoCols, NULL, &pInfo->numOfPseudoExpr);
531
    pInfo->pPseudoCtx = createSqlFunctionCtx(pInfo->pPseudoExpr, pInfo->numOfPseudoExpr, &pInfo->rowCellInfoOffset);
532 533 534
  }

  pInfo->scanInfo = (SScanInfo){.numOfAsc = pTableScanNode->scanSeq[0], .numOfDesc = pTableScanNode->scanSeq[1]};
535
  //  pInfo->scanInfo = (SScanInfo){.numOfAsc = 0, .numOfDesc = 1}; // for debug purpose
536

537 538 539
  pInfo->readHandle = *readHandle;
  pInfo->interval = extractIntervalInfo(pTableScanNode);
  pInfo->sampleRatio = pTableScanNode->ratio;
540
  pInfo->dataBlockLoadFlag = pTableScanNode->dataRequired;
541 542 543 544 545
  pInfo->pResBlock = createResDataBlock(pDescNode);
  pInfo->pFilterNode = pTableScanNode->scan.node.pConditions;
  pInfo->dataReader = pDataReader;
  pInfo->scanFlag = MAIN_SCAN;
  pInfo->pColMatchInfo = pColList;
546

547
  pOperator->name = "TableScanOperator";  // for debug purpose
L
Liu Jicong 已提交
548
  pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN;
549 550 551 552 553
  pOperator->blocking = false;
  pOperator->status = OP_NOT_OPENED;
  pOperator->info = pInfo;
  pOperator->numOfExprs = numOfCols;
  pOperator->pTaskInfo = pTaskInfo;
554

wmmhello's avatar
wmmhello 已提交
555 556 557 558 559 560 561 562 563 564 565 566
  // for table group
  pInfo->pGroupCols = groupKyes;
  _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
  pInfo->pGroupSet = taosHashInit(100, hashFn, false, HASH_NO_LOCK);
  if (pInfo->pGroupSet == NULL) {
    goto _error;
  }
  code = initGroupOptrInfo(&pInfo->pGroupColVals, &pInfo->groupKeyLen, &pInfo->keyBuf, groupKyes);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

567 568
  pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doTableScan, NULL, NULL, destroyTableScanOperatorInfo,
                                         NULL, NULL, getTableScannerExecInfo);
569 570 571

  // for non-blocking operator, the open cost is always 0
  pOperator->cost.openCost = 0;
D
dapan1121 已提交
572

H
Haojun Liao 已提交
573
  return pOperator;
wmmhello's avatar
wmmhello 已提交
574 575 576 577 578 579

_error:
  pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY;
  taosMemoryFreeClear(pInfo);
  taosMemoryFreeClear(pOperator);
  return NULL;
H
Haojun Liao 已提交
580 581
}

582
/**
 * Create a sequential table scan operator that returns blocks straight from doTableScanImpl().
 *
 * @param pReadHandle  reader the operator pulls blocks from
 * @param pTaskInfo    owning task; its code member is set to TSDB_CODE_OUT_OF_MEMORY on failure
 * @return the new operator, or NULL when the operator or its state cannot be allocated
 */
SOperatorInfo* createTableSeqScanOperatorInfo(void* pReadHandle, SExecTaskInfo* pTaskInfo) {
  STableScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STableScanInfo));
  SOperatorInfo*  pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
  if (pInfo == NULL || pOperator == NULL) {
    // same failure handling as the other scan operator constructors in this file
    pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY;
    taosMemoryFreeClear(pInfo);
    taosMemoryFreeClear(pOperator);
    return NULL;
  }

  pInfo->dataReader = pReadHandle;

  pOperator->name = "TableSeqScanOperator";
  pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_TABLE_SEQ_SCAN;
  pOperator->blocking = false;
  pOperator->status = OP_NOT_OPENED;
  pOperator->info = pInfo;
  pOperator->pTaskInfo = pTaskInfo;

  pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doTableScanImpl, NULL, NULL, NULL, NULL, NULL, NULL);
  return pOperator;
}

600
/**
 * Collect the file block distribution of the scanned table and return a one-row result block.
 *
 * The distribution is gathered into a local STableBlockDistInfo. Serializing it into the result
 * column is not implemented yet, so the returned block only has its row and column counts set
 * to 1. The operator is marked OP_EXEC_DONE, so the next call returns NULL.
 *
 * @param pOperator  data block info scan operator
 * @return the operator's result block on the first call, NULL afterwards
 */
static SSDataBlock* doBlockInfoScan(SOperatorInfo* pOperator) {
  if (pOperator->status == OP_EXEC_DONE) {
    return NULL;
  }

  STableScanInfo* pTableScanInfo = pOperator->info;

  STableBlockDistInfo tableBlockDist = {0};
  tableBlockDist.numOfTables = 1;  // TODO set the correct number of tables

  // number of row-count buckets, rounded up
  int32_t numRowSteps = TSDB_DEFAULT_MAXROWS_FBLOCK / TSDB_BLOCK_DIST_STEP_ROWS;
  if (TSDB_DEFAULT_MAXROWS_FBLOCK % TSDB_BLOCK_DIST_STEP_ROWS != 0) {
    ++numRowSteps;
  }

  tableBlockDist.dataBlockInfos = taosArrayInit(numRowSteps, sizeof(SFileBlockInfo));
  taosArraySetSize(tableBlockDist.dataBlockInfos, numRowSteps);

  tableBlockDist.maxRows = INT_MIN;
  tableBlockDist.minRows = INT_MAX;

  tsdbGetFileBlocksDistInfo(pTableScanInfo->dataReader, &tableBlockDist);
  tableBlockDist.numOfRowsInMemTable = (int32_t)tsdbGetNumOfRowsInMemTable(pTableScanInfo->dataReader);

  SSDataBlock* pBlock = pTableScanInfo->pResBlock;
  pBlock->info.rows = 1;
  pBlock->info.numOfCols = 1;

  // TODO serialize tableBlockDist (length-prefixed binary) into the first column of pBlock

  // the distribution buckets are local to this call; release them so they do not leak
  taosArrayDestroy(tableBlockDist.dataBlockInfos);

  pOperator->status = OP_EXEC_DONE;
  return pBlock;
}

/**
 * Create the operator that reports data block distribution information.
 *
 * @param dataReader  reader the distribution is collected from
 * @param pTaskInfo   owning task; its code member is set to TSDB_CODE_OUT_OF_MEMORY on failure
 * @return the new operator, or NULL when the operator or its state cannot be allocated
 */
SOperatorInfo* createDataBlockInfoScanOperator(void* dataReader, SExecTaskInfo* pTaskInfo) {
  STableScanInfo* pScanInfo = taosMemoryCalloc(1, sizeof(STableScanInfo));
  SOperatorInfo*  pOptr = taosMemoryCalloc(1, sizeof(SOperatorInfo));

  if (pScanInfo == NULL || pOptr == NULL) {
    pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY;
    taosMemoryFreeClear(pScanInfo);
    taosMemoryFreeClear(pOptr);
    return NULL;
  }

  pScanInfo->dataReader = dataReader;

  // description of the single binary output column; it is not attached to any block yet
  SColumnInfoData infoData = {0};
  infoData.info.type = TSDB_DATA_TYPE_BINARY;
  infoData.info.bytes = 1024;
  infoData.info.colId = 0;

  pOptr->name = "DataBlockInfoScanOperator";
  pOptr->blocking = false;
  pOptr->status = OP_NOT_OPENED;
  pOptr->info = pScanInfo;
  pOptr->pTaskInfo = pTaskInfo;

  pOptr->fpSet._openFn = operatorDummyOpenFn;
  pOptr->fpSet.getNextFn = doBlockInfoScan;

  return pOptr;
}

/**
 * Destroy every buffered block of a stream scan and reset the buffer to empty.
 *
 * @param pInfo  stream block scan state; validBlockIndex is reset to 0 and pBlockLists is cleared
 */
static void doClearBufferedBlocks(SStreamBlockScanInfo* pInfo) {
  pInfo->validBlockIndex = 0;

  const size_t numOfBlocks = taosArrayGetSize(pInfo->pBlockLists);
  for (size_t idx = 0; idx < numOfBlocks; ++idx) {
    SSDataBlock* pBuffered = taosArrayGetP(pInfo->pBlockLists, idx);
    blockDataDestroy(pBuffered);
  }

  // the list only holds pointers; drop them now that the blocks are gone
  taosArrayClear(pInfo->pBlockLists);
}

692
/* Returns true when the stream scan has a stream aggregate supporter attached to its session support. */
static bool isSessionWindow(SStreamBlockScanInfo* pInfo) {
  if (pInfo->sessionSup.pStreamAggSup == NULL) {
    return false;
  }
  return true;
}
5
54liuyao 已提交
693

5
54liuyao 已提交
694 695 696 697
/**
 * Set up the dummy table scan operator to re-read the window that covers the next pending
 * timestamp of the update result block.
 *
 * The window is either the session window or the interval window containing the timestamp at
 * updateResIndex. updateResIndex is advanced past the rows handled by that window, the scan
 * condition of the dummy operator is pointed at the window, its read handle is reset and its
 * scan counter is cleared.
 *
 * @param pInfo  stream block scan state
 * @return true when a window was prepared, false when every row of the update block is consumed
 */
static bool prepareDataScan(SStreamBlockScanInfo* pInfo) {
  SSDataBlock* pUpdateBlock = pInfo->pUpdateRes;
  if (pInfo->updateResIndex >= pUpdateBlock->info.rows) {
    return false;
  }

  SColumnInfoData* pTsColumn = taosArrayGet(pUpdateBlock->pDataBlock, 0);
  TSKEY*           tsList = (TSKEY*)pTsColumn->pData;
  STimeWindow      scanWin;

  if (isSessionWindow(pInfo)) {
    SStreamAggSupporter* pAggSup = pInfo->sessionSup.pStreamAggSup;
    int64_t              gap = pInfo->sessionSup.gap;
    int32_t              winIndex = 0;

    SResultWindowInfo* pCurWin =
        getSessionTimeWindow(pAggSup->pResultRows, tsList[pInfo->updateResIndex], gap, &winIndex);

    // take the window before it is updated with the rows of the update block
    scanWin = pCurWin->win;
    pInfo->updateResIndex +=
        updateSessionWindowInfo(pCurWin, tsList, pUpdateBlock->info.rows, pInfo->updateResIndex, gap, NULL);
  } else {
    // only cur.pageId of the dummy row info is set before it is passed on
    SResultRowInfo dumyInfo;
    dumyInfo.cur.pageId = -1;

    scanWin = getActiveTimeWindow(NULL, &dumyInfo, tsList[pInfo->updateResIndex], &pInfo->interval,
                                  pInfo->interval.precision, NULL);
    pInfo->updateResIndex += getNumOfRowsInTimeWindow(&pUpdateBlock->info, tsList, pInfo->updateResIndex,
                                                      scanWin.ekey, binarySearchForKey, NULL, TSDB_ORDER_ASC);
  }

  // restart the dummy scan operator on the selected window
  STableScanInfo* pScan = pInfo->pOperatorDumy->info;
  pScan->cond.twindow = scanWin;
  tsdbResetReadHandle(pScan->dataReader, &pScan->cond);
  pScan->scanTimes = 0;

  return true;
}

/**
 * Fetch the next block through the dummy table scan operator.
 *
 * When the current scan is exhausted, the scan for the next pending window is prepared and the
 * operator is asked once more.
 *
 * @param pInfo  stream block scan state
 * @return the next block, or NULL when neither the current nor the next window yields data
 */
static SSDataBlock* doDataScan(SStreamBlockScanInfo* pInfo) {
  SSDataBlock* pBlock = doTableScan(pInfo->pOperatorDumy);
  if (pBlock != NULL) {
    return pBlock;
  }

  if (!prepareDataScan(pInfo)) {
    return NULL;
  }

  // scan next window data
  return doTableScan(pInfo->pOperatorDumy);
}

/*
 * Collect the timestamps of the current result block (pInfo->pRes) that updateInfoIsUpdated()
 * flags, and, when at least one was collected and `invertible` is set, return a new block of
 * type STREAM_REPROCESS whose first column holds those timestamps.
 *
 * The collected timestamps are kept in pInfo->tsArray and cleared only when a block is returned.
 * Returns NULL when no block is produced.
 */
static SSDataBlock* getUpdateDataBlock(SStreamBlockScanInfo* pInfo, bool invertible) {
  SSDataBlock*     pSrc = pInfo->pRes;
  SColumnInfoData* pTsCol = taosArrayGet(pSrc->pDataBlock, pInfo->primaryTsIndex);
  TSKEY*           tsList = (TSKEY*)pTsCol->pData;

  for (int32_t row = 0; row < pSrc->info.rows; ++row) {
    if (updateInfoIsUpdated(pInfo->pUpdateInfo, pSrc->info.uid, tsList[row])) {
      taosArrayPush(pInfo->tsArray, &tsList[row]);
    }
  }

  int32_t numOfTs = taosArrayGetSize(pInfo->tsArray);
  if (numOfTs <= 0 || !invertible) {
    return NULL;
  }

  // TODO(liuyao) get from tsdb
  //  SSDataBlock* p = createOneDataBlock(pInfo->pRes, true);
  //  p->info.type = STREAM_INVERT;
  //  taosArrayClear(pInfo->tsArray);
  //  return p;
  SSDataBlock*     pUpdated = createOneDataBlock(pSrc, false);
  SColumnInfoData* pDstCol = (SColumnInfoData*)taosArrayGet(pUpdated->pDataBlock, 0);
  ASSERT(pDstCol->info.type == TSDB_DATA_TYPE_TIMESTAMP);
  colInfoDataEnsureCapacity(pDstCol, 0, numOfTs);

  for (int32_t k = 0; k < numOfTs; ++k) {
    TSKEY* pKey = (TSKEY*)taosArrayGet(pInfo->tsArray, k);
    colDataAppend(pDstCol, k, (char*)pKey, false);
  }

  pUpdated->info.rows = numOfTs;
  pUpdated->info.type = STREAM_REPROCESS;
  blockDataUpdateTsWindow(pUpdated, 0);

  taosArrayClear(pInfo->tsArray);
  return pUpdated;
}

5
54liuyao 已提交
771 772 773 774 775 776 777
/*
 * Write the lookup key into pSup->pKeyBuf as three consecutive int64_t values:
 * group id, child id and timestamp, in that order.
 */
static void setSupKeyBuf(SCatchSupporter* pSup, int64_t groupId, int64_t childId, TSKEY ts) {
  int64_t* keyParts = (int64_t*)pSup->pKeyBuf;

  keyParts[0] = groupId;
  keyParts[1] = childId;
  keyParts[2] = ts;
}

778 779
/*
 * Record, for every row of pDataBlock, the position (pageId, row index inside the block)
 * in pSup->pWindowHashTable, keyed by (groupId, childId, row timestamp).
 * An existing entry for the same key is overwritten in place.
 *
 * Returns TSDB_CODE_SUCCESS, or the error code reported by taosHashPut().
 */
static int32_t catchWidonwInfo(SSDataBlock* pDataBlock, SCatchSupporter* pSup, int32_t pageId, int32_t tsIndex,
                               int64_t childId) {
  SColumnInfoData* pTsCol = taosArrayGet(pDataBlock->pDataBlock, tsIndex);
  TSKEY*           tsList = (int64_t*)pTsCol->pData;

  for (int32_t row = 0; row < pDataBlock->info.rows; ++row) {
    setSupKeyBuf(pSup, pDataBlock->info.groupId, childId, tsList[row]);

    SWindowPosition* pExisting = (SWindowPosition*)taosHashGet(pSup->pWindowHashTable, pSup->pKeyBuf, pSup->keySize);
    if (pExisting != NULL) {
      // the key is already known, just point it to the newest location
      pExisting->pageId = pageId;
      pExisting->rowId = row;
      continue;
    }

    SWindowPosition newPos = {.pageId = pageId, .rowId = row};
    int32_t code = taosHashPut(pSup->pWindowHashTable, pSup->pKeyBuf, pSup->keySize, &newPos, sizeof(SWindowPosition));
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }
  }

  return TSDB_CODE_SUCCESS;
}

799
/*
 * Split pDataBlock into page-sized pieces, write every piece into a new page of pSup->pDataBuf
 * and register the position of each row through catchWidonwInfo().
 *
 * pDataBlock  block to cache
 * pSup        cache supporter holding the paged buffer and the position hash table
 * tsIndex     index of the timestamp column inside pDataBlock
 * childId     id of the child that produced the block
 *
 * Returns TSDB_CODE_SUCCESS, terrno when extracting a sub block or getting a page fails,
 * or the error code returned by catchWidonwInfo().
 */
static int32_t catchDatablock(SSDataBlock* pDataBlock, SCatchSupporter* pSup, int32_t tsIndex, int64_t childId) {
  int32_t start = 0;
  int32_t stop = 0;
  int32_t pageSize = getBufPageSize(pSup->pDataBuf);

  while (start < pDataBlock->info.rows) {
    blockDataSplitRows(pDataBlock, pDataBlock->info.hasVarCol, start, &stop, pageSize);
    SSDataBlock* pDB = blockDataExtractBlock(pDataBlock, start, stop - start + 1);
    if (pDB == NULL) {
      return terrno;
    }

    int32_t pageId = -1;
    void*   pPage = getNewBufPage(pSup->pDataBuf, pDataBlock->info.groupId, &pageId);
    if (pPage == NULL) {
      blockDataDestroy(pDB);
      return terrno;
    }

    int32_t size = blockDataGetSize(pDB) + sizeof(int32_t) + pDB->info.numOfCols * sizeof(int32_t);
    assert(size <= pageSize);
    blockDataToBuf(pPage, pDB);
    setBufPageDirty(pPage, true);
    releaseBufPage(pSup->pDataBuf, pPage);

    // The rows must be indexed while pDB is still alive: the previous code destroyed pDB first
    // and then handed the freed block to catchWidonwInfo().
    int32_t code = catchWidonwInfo(pDB, pSup, pageId, tsIndex, childId);
    blockDataDestroy(pDB);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    start = stop + 1;
  }

  return TSDB_CODE_SUCCESS;
}

/*
 * Build pInfo->pRes from the cached child data that matches the current row
 * (pInfo->updateResIndex) of pInfo->pUpdateRes: for every id in pInfo->childIds the cached
 * position of (groupId, childId, timestamp) is looked up and the single cached row at that
 * position is merged into pInfo->pRes. The index is advanced by one afterwards.
 *
 * Children without a cached position for the timestamp, or whose page/row cannot be loaded,
 * are skipped instead of dereferencing a NULL pointer.
 *
 * Returns pInfo->pRes, or NULL when every row of pInfo->pUpdateRes has been handled.
 */
static SSDataBlock* getDataFromCatch(SStreamBlockScanInfo* pInfo) {
  SSDataBlock* pBlock = pInfo->pUpdateRes;
  if (pInfo->updateResIndex >= pBlock->info.rows) {
    return NULL;
  }

  blockDataCleanup(pInfo->pRes);
  SCatchSupporter* pCSup = &pInfo->childAggSup;
  SColumnInfoData* pColDataInfo = taosArrayGet(pBlock->pDataBlock, 0);
  TSKEY*           tsCols = (TSKEY*)pColDataInfo->pData;
  int32_t          size = taosArrayGetSize(pInfo->childIds);

  for (int32_t i = 0; i < size; i++) {
    int64_t id = *(int64_t*)taosArrayGet(pInfo->childIds, i);
    setSupKeyBuf(pCSup, pBlock->info.groupId, id, tsCols[pInfo->updateResIndex]);

    SWindowPosition* pos = (SWindowPosition*)taosHashGet(pCSup->pWindowHashTable, pCSup->pKeyBuf, pCSup->keySize);
    if (pos == NULL) {
      // nothing was cached for this child and timestamp
      continue;
    }

    void* buf = getBufPage(pCSup->pDataBuf, pos->pageId);
    if (buf == NULL) {
      continue;
    }

    SSDataBlock* pDB = createOneDataBlock(pInfo->pRes, false);
    if (pDB != NULL) {
      blockDataFromBuf(pDB, buf);
      SSDataBlock* pSub = blockDataExtractBlock(pDB, pos->rowId, 1);
      if (pSub != NULL) {
        blockDataMerge(pInfo->pRes, pSub);
        blockDataDestroy(pSub);
      }
      blockDataDestroy(pDB);
    }

    // hand the page back to the paged buffer, mirroring what catchDatablock() does after writing
    releaseBufPage(pCSup->pDataBuf, buf);
  }

  pInfo->updateResIndex++;
  return pInfo->pRes;
}

856
/*
 * Produce the next block of the stream scan operator.
 *
 * - When the input is a list of ready-made data blocks (STREAM_DATA_TYPE_SSDATA_BLOCK), the
 *   buffered blocks are returned one by one and the operator is marked done afterwards.
 * - Otherwise the behaviour depends on pInfo->scanMode:
 *     STREAM_SCAN_FROM_RES         destroy the update block, return the pending pInfo->pRes
 *     STREAM_SCAN_FROM_UPDATERES   prepare the data scan and return pInfo->pUpdateRes
 *     STREAM_SCAN_FROM_DATAREADER  return blocks from doDataScan() until it is exhausted
 *     STREAM_SCAN_FROM_READERHANDLE (and fall through of the above) read the next submit block
 *                                  from the stream reader, map its columns to pInfo->pRes,
 *                                  filter it, and check it for updated rows.
 *
 * Returns NULL when no block is available; pTaskInfo->code carries the error code, if any.
 */
static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) {
  // NOTE: this operator does never check if current status is done or not
  SExecTaskInfo*        pTaskInfo = pOperator->pTaskInfo;
  SStreamBlockScanInfo* pInfo = pOperator->info;
  int32_t               rows = 0;

  pTaskInfo->code = pOperator->fpSet._openFn(pOperator);
  if (pTaskInfo->code != TSDB_CODE_SUCCESS || pOperator->status == OP_EXEC_DONE) {
    return NULL;
  }

  size_t total = taosArrayGetSize(pInfo->pBlockLists);
  if (pInfo->blockType == STREAM_DATA_TYPE_SSDATA_BLOCK) {
    if (pInfo->validBlockIndex >= total) {
      doClearBufferedBlocks(pInfo);
      pOperator->status = OP_EXEC_DONE;
      return NULL;
    }

    int32_t current = pInfo->validBlockIndex++;
    return taosArrayGetP(pInfo->pBlockLists, current);
  } else {
    if (pInfo->scanMode == STREAM_SCAN_FROM_RES) {
      blockDataDestroy(pInfo->pUpdateRes);
      // do not keep a pointer to the block that has just been destroyed
      pInfo->pUpdateRes = NULL;
      pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
      return pInfo->pRes;
    } else if (pInfo->scanMode == STREAM_SCAN_FROM_UPDATERES) {
      blockDataCleanup(pInfo->pRes);
      pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER;
      prepareDataScan(pInfo);
      return pInfo->pUpdateRes;
    } else if (pInfo->scanMode == STREAM_SCAN_FROM_DATAREADER) {
      SSDataBlock* pSDB = doDataScan(pInfo);
      if (pSDB == NULL) {
        pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
      } else {
        return pSDB;
      }
    }

    SDataBlockInfo* pBlockInfo = &pInfo->pRes->info;
    blockDataCleanup(pInfo->pRes);

    // at most one submit block is consumed per call, see the unconditional break below
    while (tqNextDataBlock(pInfo->streamBlockReader)) {
      SArray*  pCols = NULL;
      uint64_t groupId = 0;
      uint64_t uid = 0;
      int32_t  numOfRows = 0;
      int16_t  outputCol = 0;

      int32_t code = tqRetrieveDataBlock(&pCols, pInfo->streamBlockReader, &groupId, &uid, &numOfRows, &outputCol);

      if (code != TSDB_CODE_SUCCESS || numOfRows == 0) {
        pTaskInfo->code = code;
        return NULL;
      }

      pInfo->pRes->info.groupId = groupId;
      pInfo->pRes->info.rows = numOfRows;
      pInfo->pRes->info.uid = uid;
      pInfo->pRes->info.type = STREAM_NORMAL;

      int32_t numOfCols = pInfo->pRes->info.numOfCols;
      int32_t numOfSrcCols = (int32_t)taosArrayGetSize(pCols);
      for (int32_t i = 0; i < numOfCols; ++i) {
        SColMatchInfo* pColMatchInfo = taosArrayGet(pInfo->pColMatchInfo, i);
        if (!pColMatchInfo->output) {
          continue;
        }

        bool colExists = false;
        for (int32_t j = 0; j < numOfSrcCols; ++j) {
          SColumnInfoData* pResCol = taosArrayGet(pCols, j);
          if (pResCol->info.colId == pColMatchInfo->colId) {
            taosArraySet(pInfo->pRes->pDataBlock, pColMatchInfo->targetSlotId, pResCol);
            colExists = true;
            break;
          }
        }

        // the required column does not exists in submit block, let's set it to be all null value
        if (!colExists) {
          SColumnInfoData* pDst = taosArrayGet(pInfo->pRes->pDataBlock, pColMatchInfo->targetSlotId);
          colInfoDataEnsureCapacity(pDst, 0, pBlockInfo->rows);
          colDataAppendNNULL(pDst, 0, pBlockInfo->rows);
        }
      }

      if (pInfo->pRes->pDataBlock == NULL) {
        // TODO add log
        pOperator->status = OP_EXEC_DONE;
        pTaskInfo->code = terrno;
        return NULL;
      }

      // remember the row count before filtering: it decides below whether a block is returned
      rows = pBlockInfo->rows;
      doFilter(pInfo->pCondition, pInfo->pRes, NULL);
      blockDataUpdateTsWindow(pInfo->pRes, 0);

      break;
    }

    // record the scan action.
    pInfo->numOfExec++;
    pInfo->numOfRows += pBlockInfo->rows;

    if (rows == 0) {
      pOperator->status = OP_EXEC_DONE;
    } else if (pInfo->pUpdateInfo) {
      SSDataBlock* upRes = getUpdateDataBlock(pInfo, true);  // TODO(liuyao) get invertible from plan
      if (upRes) {
        pInfo->pUpdateRes = upRes;
        if (upRes->info.type == STREAM_REPROCESS) {
          pInfo->updateResIndex = 0;
          pInfo->scanMode = STREAM_SCAN_FROM_UPDATERES;
        } else if (upRes->info.type == STREAM_INVERT) {
          pInfo->scanMode = STREAM_SCAN_FROM_RES;
          return upRes;
        }
      }
    }

    return (rows == 0) ? NULL : pInfo->pRes;
  }
}

980 981 982 983
/*
 * Create the stream scan operator.
 *
 * streamReadHandle  stream reader; receives the list of column ids and the table uid list
 * pDataReader       data reader stored in the operator info
 * pHandle           read handle, copied into the operator info
 * uid               table uid stored in the operator info
 * pResBlock         result block used by the operator
 * pColList          array of SColMatchInfo describing the scanned columns
 * pTableIdList      table uid list handed to the stream reader
 * pTaskInfo         owning task
 * pCondition        filter condition applied to scanned blocks
 * pOperatorDumy     table scan operator used for re-scanning; its info must be a STableScanInfo
 *
 * Returns the new operator, or NULL on failure. On failure everything allocated here that is
 * still owned by this function is released.
 */
SOperatorInfo* createStreamScanOperatorInfo(void* streamReadHandle, void* pDataReader, SReadHandle* pHandle,
                                            uint64_t uid, SSDataBlock* pResBlock, SArray* pColList,
                                            SArray* pTableIdList, SExecTaskInfo* pTaskInfo, SNode* pCondition,
                                            SOperatorInfo* pOperatorDumy) {
  SStreamBlockScanInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamBlockScanInfo));
  SOperatorInfo*        pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
  if (pInfo == NULL || pOperator == NULL) {
    terrno = TSDB_CODE_QRY_OUT_OF_MEMORY;
    goto _error;
  }

  STableScanInfo* pSTInfo = (STableScanInfo*)pOperatorDumy->info;

  int32_t numOfOutput = taosArrayGetSize(pColList);

  SArray* pColIds = taosArrayInit(4, sizeof(int16_t));
  if (pColIds == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto _error;
  }

  for (int32_t i = 0; i < numOfOutput; ++i) {
    SColMatchInfo* id = taosArrayGet(pColList, i);
    int16_t        colId = id->colId;
    taosArrayPush(pColIds, &colId);
  }

  pInfo->pColMatchInfo = pColList;

  // set the extract column id to streamHandle
  tqReadHandleSetColIdList((STqReadHandle*)streamReadHandle, pColIds);
  int32_t code = tqReadHandleSetTbUidList(streamReadHandle, pTableIdList);
  if (code != 0) {
    goto _error;
  }

  pInfo->pBlockLists = taosArrayInit(4, POINTER_BYTES);
  if (pInfo->pBlockLists == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto _error;
  }

  pInfo->tsArray = taosArrayInit(4, sizeof(TSKEY));
  if (pInfo->tsArray == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto _error;
  }

  pInfo->primaryTsIndex = 0;  // TODO(liuyao) get it from physical plan
  if (pSTInfo->interval.interval > 0) {
    pInfo->pUpdateInfo = updateInfoInitP(&pSTInfo->interval, 10000);  // TODO(liuyao) get watermark from physical plan
  } else {
    pInfo->pUpdateInfo = NULL;
  }

  pInfo->readHandle = *pHandle;
  pInfo->tableUid = uid;
  pInfo->streamBlockReader = streamReadHandle;
  pInfo->pRes = pResBlock;
  pInfo->pCondition = pCondition;
  pInfo->pDataReader = pDataReader;
  pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
  pInfo->pOperatorDumy = pOperatorDumy;
  pInfo->interval = pSTInfo->interval;
  pInfo->sessionSup = (SessionWindowSupporter){.pStreamAggSup = NULL, .gap = -1};

  initCacheSupporter(&pInfo->childAggSup, 1024, "StreamFinalInterval",
                     "/tmp/");  // TODO(liuyao) get row size from phy plan

  pOperator->name = "StreamBlockScanOperator";
  pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN;
  pOperator->blocking = false;
  pOperator->status = OP_NOT_OPENED;
  pOperator->info = pInfo;
  pOperator->numOfExprs = pResBlock->info.numOfCols;
  pOperator->pTaskInfo = pTaskInfo;

  pOperator->fpSet =
      createOperatorFpSet(operatorDummyOpenFn, doStreamBlockScan, NULL, NULL, operatorDummyCloseFn, NULL, NULL, NULL);

  return pOperator;

_error:
  // release the arrays that may already hang off pInfo; both fields are NULL after calloc
  if (pInfo != NULL) {
    taosArrayDestroy(pInfo->pBlockLists);
    taosArrayDestroy(pInfo->tsArray);
  }
  taosMemoryFreeClear(pInfo);
  taosMemoryFreeClear(pOperator);
  return NULL;
}

static void destroySysScanOperator(void* param, int32_t numOfOutput) {
  SSysTableScanInfo* pInfo = (SSysTableScanInfo*)param;
  tsem_destroy(&pInfo->ready);
  blockDataDestroy(pInfo->pRes);

1067
  const char* name = tNameGetTableName(&pInfo->name);
1068
  if (strncasecmp(name, TSDB_INS_TABLE_USER_TABLES, TSDB_TABLE_FNAME_LEN) == 0 || pInfo->pCur != NULL) {
H
Haojun Liao 已提交
1069
    metaCloseTbCursor(pInfo->pCur);
1070
    pInfo->pCur = NULL;
H
Haojun Liao 已提交
1071
  }
H
Haojun Liao 已提交
1072 1073

  taosArrayDestroy(pInfo->scanCols);
H
Haojun Liao 已提交
1074 1075 1076 1077 1078 1079 1080 1081 1082 1083 1084 1085 1086 1087 1088 1089 1090 1091 1092 1093 1094 1095 1096 1097 1098 1099 1100 1101 1102 1103 1104 1105 1106 1107 1108 1109 1110 1111 1112 1113
}

/*
 * Expression walker that extracts a database name from a condition.
 *
 * pContext is used in two ways: while walking, its first int32_t is a small state value
 *   0 - nothing matched, 1 - an equality operator was seen,
 *   2 - the column compared is TSDB_INS_USER_STABLES_DBNAME_COLID;
 * once a value node is reached in state 2, the value is copied into pContext as a
 * NUL-terminated string and the walk stops.
 */
EDealRes getDBNameFromConditionWalker(SNode* pNode, void* pContext) {
  int32_t* pState = (int32_t*)pContext;

  switch (nodeType(pNode)) {
    case QUERY_NODE_OPERATOR: {
      SOperatorNode* pOp = (SOperatorNode*)pNode;
      if (OP_TYPE_EQUAL != pOp->opType) {
        *pState = 0;
        return DEAL_RES_IGNORE_CHILD;
      }

      *pState = 1;
      return DEAL_RES_CONTINUE;
    }
    case QUERY_NODE_COLUMN: {
      if (1 != *pState) {
        return DEAL_RES_CONTINUE;
      }

      SColumnNode* pCol = (SColumnNode*)pNode;
      *pState = (TSDB_INS_USER_STABLES_DBNAME_COLID == pCol->colId) ? 2 : 0;
      return DEAL_RES_CONTINUE;
    }
    case QUERY_NODE_VALUE: {
      if (2 != *pState) {
        return DEAL_RES_CONTINUE;
      }

      SValueNode* pVal = (SValueNode*)pNode;
      char*       dbName = nodesGetValueFromNode(pVal);
      strncpy(pContext, varDataVal(dbName), varDataLen(dbName));
      *((char*)pContext + varDataLen(dbName)) = 0;
      return DEAL_RES_END;  // stop walk
    }
    default:
      break;
  }

  return DEAL_RES_CONTINUE;
}

1122
/*
 * Walk pCondition with getDBNameFromConditionWalker(), using dbName as the walker context.
 * Nothing is done when there is no condition.
 */
static void getDBNameFromCondition(SNode* pCondition, const char* dbName) {
  if (pCondition != NULL) {
    nodesWalkExpr(pCondition, getDBNameFromConditionWalker, (char*)dbName);
  }
}

1129
static int32_t loadSysTableCallback(void* param, const SDataBuf* pMsg, int32_t code) {
H
Haojun Liao 已提交
1130 1131 1132 1133 1134 1135 1136
  SOperatorInfo*     operator=(SOperatorInfo*) param;
  SSysTableScanInfo* pScanResInfo = (SSysTableScanInfo*)operator->info;
  if (TSDB_CODE_SUCCESS == code) {
    pScanResInfo->pRsp = pMsg->pData;

    SRetrieveMetaTableRsp* pRsp = pScanResInfo->pRsp;
    pRsp->numOfRows = htonl(pRsp->numOfRows);
1137 1138 1139
    pRsp->useconds = htobe64(pRsp->useconds);
    pRsp->handle = htobe64(pRsp->handle);
    pRsp->compLen = htonl(pRsp->compLen);
H
Haojun Liao 已提交
1140 1141 1142 1143 1144
  } else {
    operator->pTaskInfo->code = code;
  }

  tsem_post(&pScanResInfo->ready);
wmmhello's avatar
wmmhello 已提交
1145
  return TSDB_CODE_SUCCESS;
H
Haojun Liao 已提交
1146 1147 1148 1149 1150 1151 1152 1153
}

static SSDataBlock* doFilterResult(SSysTableScanInfo* pInfo) {
  if (pInfo->pCondition == NULL) {
    return pInfo->pRes->info.rows == 0 ? NULL : pInfo->pRes;
  }

  SFilterInfo* filter = NULL;
1154 1155

  int32_t code = filterInitFromNode(pInfo->pCondition, &filter, 0);
H
Haojun Liao 已提交
1156 1157 1158 1159 1160

  SFilterColumnParam param1 = {.numOfCols = pInfo->pRes->info.numOfCols, .pDataBlock = pInfo->pRes->pDataBlock};
  code = filterSetDataFromSlotId(filter, &param1);

  int8_t* rowRes = NULL;
L
Liu Jicong 已提交
1161
  bool    keep = filterExecute(filter, pInfo->pRes, &rowRes, NULL, param1.numOfCols);
D
dapan1121 已提交
1162
  filterFreeInfo(filter);
H
Haojun Liao 已提交
1163

1164
  SSDataBlock* px = createOneDataBlock(pInfo->pRes, false);
H
Haojun Liao 已提交
1165 1166 1167 1168 1169 1170 1171 1172
  blockDataEnsureCapacity(px, pInfo->pRes->info.rows);

  // TODO refactor
  int32_t numOfRow = 0;
  for (int32_t i = 0; i < pInfo->pRes->info.numOfCols; ++i) {
    SColumnInfoData* pDest = taosArrayGet(px->pDataBlock, i);
    SColumnInfoData* pSrc = taosArrayGet(pInfo->pRes->pDataBlock, i);

D
dapan1121 已提交
1173 1174 1175 1176 1177 1178 1179 1180 1181
    if (keep) {
      colDataAssign(pDest, pSrc, pInfo->pRes->info.rows);
      numOfRow = pInfo->pRes->info.rows;
    } else if (NULL != rowRes) {
      numOfRow = 0;
      for (int32_t j = 0; j < pInfo->pRes->info.rows; ++j) {
        if (rowRes[j] == 0) {
          continue;
        }
1182

1183 1184 1185 1186 1187 1188
        if (colDataIsNull_s(pSrc, j)) {
          colDataAppendNULL(pDest, numOfRow);
        } else {
          colDataAppend(pDest, numOfRow, colDataGetData(pSrc, j), false);
        }

D
dapan1121 已提交
1189
        numOfRow += 1;
H
Haojun Liao 已提交
1190
      }
D
dapan1121 已提交
1191 1192
    } else {
      numOfRow = 0;
H
Haojun Liao 已提交
1193 1194 1195 1196 1197 1198 1199 1200 1201
    }
  }

  px->info.rows = numOfRow;
  pInfo->pRes = px;

  return pInfo->pRes->info.rows == 0 ? NULL : pInfo->pRes;
}

1202 1203
static SSDataBlock* buildSysTableMetaBlock() {
  SSDataBlock* pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock));
1204

L
Liu Jicong 已提交
1205 1206
  size_t               size = 0;
  const SSysTableMeta* pMeta = NULL;
1207 1208 1209
  getInfosDbMeta(&pMeta, &size);

  int32_t index = 0;
L
Liu Jicong 已提交
1210 1211
  for (int32_t i = 0; i < size; ++i) {
    if (strcmp(pMeta[i].name, TSDB_INS_TABLE_USER_TABLES) == 0) {
1212 1213 1214 1215
      index = i;
      break;
    }
  }
1216 1217 1218

  pBlock->pDataBlock = taosArrayInit(pBlock->info.numOfCols, sizeof(SColumnInfoData));

L
Liu Jicong 已提交
1219
  for (int32_t i = 0; i < pMeta[index].colNum; ++i) {
1220 1221 1222 1223 1224 1225 1226 1227 1228
    SColumnInfoData colInfoData = {0};
    colInfoData.info.colId = i + 1;
    colInfoData.info.type = pMeta[index].schema[i].type;
    colInfoData.info.bytes = pMeta[index].schema[i].bytes;
    taosArrayPush(pBlock->pDataBlock, &colInfoData);
  }

  pBlock->info.numOfCols = pMeta[index].colNum;
  pBlock->info.hasVarCol = true;
1229 1230 1231 1232

  return pBlock;
}

1233
/*
 * Produce the next result block of a system table scan.
 *
 * Three sources are supported:
 *  1. user-tables system table, executed on the mnode (readHandle.mnd != NULL): the tables of the
 *     information/performance schema are listed by buildSysDbTableInfo() in a single shot;
 *  2. user-tables system table, executed elsewhere: the local meta cursor is iterated and up to
 *     resultInfo.capacity tables are returned per call;
 *  3. any other system table: a retrieve request is sent to pInfo->epSet and the function blocks
 *     on the `ready` semaphore until loadSysTableCallback() delivers the response; the request is
 *     repeated until a block with rows survives the filter or the response reports completion.
 *
 * Returns pInfo->pRes, or NULL when there is no row to return. Errors are reported through
 * pTaskInfo->code.
 */
static SSDataBlock* doSysTableScan(SOperatorInfo* pOperator) {
  // build message and send to mnode to fetch the content of system tables.
  SExecTaskInfo*     pTaskInfo = pOperator->pTaskInfo;
  SSysTableScanInfo* pInfo = pOperator->info;

  // retrieve local table list info from vnode
  const char* name = tNameGetTableName(&pInfo->name);
  if (strncasecmp(name, TSDB_INS_TABLE_USER_TABLES, TSDB_TABLE_FNAME_LEN) == 0) {
    if (pOperator->status == OP_EXEC_DONE) {
      return NULL;
    }

    // the retrieve is executed on the mnode, so return tables that belongs to the information schema database.
    if (pInfo->readHandle.mnd != NULL) {
      buildSysDbTableInfo(pInfo, pOperator->resultInfo.capacity);

      doFilterResult(pInfo);
      pInfo->loadInfo.totalRows += pInfo->pRes->info.rows;

      doSetOperatorCompleted(pOperator);
      return (pInfo->pRes->info.rows == 0) ? NULL : pInfo->pRes;
    } else {
      if (pInfo->pCur == NULL) {
        pInfo->pCur = metaOpenTbCursor(pInfo->readHandle.meta);
      }

      blockDataCleanup(pInfo->pRes);

      int32_t numOfRows = 0;

      const char* db = NULL;
      int32_t     vgId = 0;
      vnodeGetInfo(pInfo->readHandle.vnode, &db, &vgId);

      SName sn = {0};
      char  dbname[TSDB_DB_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
      tNameFromString(&sn, db, T_NAME_ACCT | T_NAME_DB);

      tNameGetDbName(&sn, varDataVal(dbname));
      varDataSetLen(dbname, strlen(varDataVal(dbname)));

      SSDataBlock* p = buildSysTableMetaBlock();
      blockDataEnsureCapacity(p, pOperator->resultInfo.capacity);

      char n[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0};

      int32_t ret = 0;
      while ((ret = metaTbCursorNext(pInfo->pCur)) == 0) {
        STR_TO_VARSTR(n, pInfo->pCur->mr.me.name);

        // table name
        SColumnInfoData* pColInfoData = taosArrayGet(p->pDataBlock, 0);
        colDataAppend(pColInfoData, numOfRows, n, false);

        // database name
        pColInfoData = taosArrayGet(p->pDataBlock, 1);
        colDataAppend(pColInfoData, numOfRows, dbname, false);

        // vgId
        pColInfoData = taosArrayGet(p->pDataBlock, 6);
        colDataAppend(pColInfoData, numOfRows, (char*)&vgId, false);

        // table comment
        // todo: set the correct comment
        pColInfoData = taosArrayGet(p->pDataBlock, 8);
        colDataAppendNULL(pColInfoData, numOfRows);

        char    str[256] = {0};
        int32_t tableType = pInfo->pCur->mr.me.type;
        if (tableType == TSDB_CHILD_TABLE) {
          // create time
          int64_t ts = pInfo->pCur->mr.me.ctbEntry.ctime;
          pColInfoData = taosArrayGet(p->pDataBlock, 2);
          colDataAppend(pColInfoData, numOfRows, (char*)&ts, false);

          // the number of columns and the name come from the super table entry
          SMetaReader mr = {0};
          metaReaderInit(&mr, pInfo->readHandle.meta, 0);
          metaGetTableEntryByUid(&mr, pInfo->pCur->mr.me.ctbEntry.suid);

          // number of columns
          pColInfoData = taosArrayGet(p->pDataBlock, 3);
          colDataAppend(pColInfoData, numOfRows, (char*)&mr.me.stbEntry.schemaRow.nCols, false);

          // super table name
          STR_TO_VARSTR(str, mr.me.name);
          pColInfoData = taosArrayGet(p->pDataBlock, 4);
          colDataAppend(pColInfoData, numOfRows, str, false);
          metaReaderClear(&mr);

          // uid
          pColInfoData = taosArrayGet(p->pDataBlock, 5);
          colDataAppend(pColInfoData, numOfRows, (char*)&pInfo->pCur->mr.me.uid, false);

          // ttl
          pColInfoData = taosArrayGet(p->pDataBlock, 7);
          colDataAppend(pColInfoData, numOfRows, (char*)&pInfo->pCur->mr.me.ctbEntry.ttlDays, false);

          STR_TO_VARSTR(str, "CHILD_TABLE");
        } else if (tableType == TSDB_NORMAL_TABLE) {
          // create time
          pColInfoData = taosArrayGet(p->pDataBlock, 2);
          colDataAppend(pColInfoData, numOfRows, (char*)&pInfo->pCur->mr.me.ntbEntry.ctime, false);

          // number of columns
          pColInfoData = taosArrayGet(p->pDataBlock, 3);
          colDataAppend(pColInfoData, numOfRows, (char*)&pInfo->pCur->mr.me.ntbEntry.schemaRow.nCols, false);

          // super table name
          pColInfoData = taosArrayGet(p->pDataBlock, 4);
          colDataAppendNULL(pColInfoData, numOfRows);

          // uid
          pColInfoData = taosArrayGet(p->pDataBlock, 5);
          colDataAppend(pColInfoData, numOfRows, (char*)&pInfo->pCur->mr.me.uid, false);

          // ttl
          pColInfoData = taosArrayGet(p->pDataBlock, 7);
          colDataAppend(pColInfoData, numOfRows, (char*)&pInfo->pCur->mr.me.ntbEntry.ttlDays, false);

          STR_TO_VARSTR(str, "NORMAL_TABLE");
        }

        // table type
        pColInfoData = taosArrayGet(p->pDataBlock, 9);
        colDataAppend(pColInfoData, numOfRows, str, false);

        if (++numOfRows >= pOperator->resultInfo.capacity) {
          break;
        }
      }

      // todo temporarily free the cursor here, the true reason why the free is not valid needs to be found
      if (ret != 0) {
        metaCloseTbCursor(pInfo->pCur);
        pInfo->pCur = NULL;
        doSetOperatorCompleted(pOperator);
      }

      p->info.rows = numOfRows;
      pInfo->pRes->info.rows = numOfRows;

      relocateColumnData(pInfo->pRes, pInfo->scanCols, p->pDataBlock);
      doFilterResult(pInfo);

      pInfo->loadInfo.totalRows += pInfo->pRes->info.rows;
      return (pInfo->pRes->info.rows == 0) ? NULL : pInfo->pRes;
    }
  } else {  // load the meta from mnode of the given epset
    if (pOperator->status == OP_EXEC_DONE) {
      return NULL;
    }

    while (1) {
      int64_t startTs = taosGetTimestampUs();
      strncpy(pInfo->req.tb, tNameGetTableName(&pInfo->name), tListLen(pInfo->req.tb));
      // strncpy() does not terminate the destination when the source fills it completely
      pInfo->req.tb[tListLen(pInfo->req.tb) - 1] = 0;

      if (pInfo->showRewrite) {
        char dbName[TSDB_DB_NAME_LEN] = {0};
        getDBNameFromCondition(pInfo->pCondition, dbName);
        // bounded write: the full name must never run past req.db
        snprintf(pInfo->req.db, sizeof(pInfo->req.db), "%d.%s", pInfo->accountId, dbName);
      }

      int32_t contLen = tSerializeSRetrieveTableReq(NULL, 0, &pInfo->req);
      char*   buf1 = taosMemoryCalloc(1, contLen);
      if (NULL == buf1) {
        qError("%s prepare message %d failed", GET_TASKID(pTaskInfo), contLen);
        pTaskInfo->code = TSDB_CODE_QRY_OUT_OF_MEMORY;
        return NULL;
      }
      tSerializeSRetrieveTableReq(buf1, contLen, &pInfo->req);

      // send the fetch remote task result reques
      SMsgSendInfo* pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
      if (NULL == pMsgSendInfo) {
        qError("%s prepare message %d failed", GET_TASKID(pTaskInfo), (int32_t)sizeof(SMsgSendInfo));
        // the serialized request has not been handed over yet, so it must be released here
        taosMemoryFreeClear(buf1);
        pTaskInfo->code = TSDB_CODE_QRY_OUT_OF_MEMORY;
        return NULL;
      }

      pMsgSendInfo->param = pOperator;
      pMsgSendInfo->msgInfo.pData = buf1;
      pMsgSendInfo->msgInfo.len = contLen;
      pMsgSendInfo->msgType = TDMT_MND_SYSTABLE_RETRIEVE;
      pMsgSendInfo->fp = loadSysTableCallback;

      int64_t transporterId = 0;
      int32_t code =
          asyncSendMsgToServer(pInfo->readHandle.pMsgCb->clientRpc, &pInfo->epSet, &transporterId, pMsgSendInfo);
      if (code != TSDB_CODE_SUCCESS) {
        // do not wait for a response of a request that could not be sent
        // NOTE(review): ownership of pMsgSendInfo after a failed send is not visible here - confirm
        pTaskInfo->code = code;
        return NULL;
      }

      // blocks until loadSysTableCallback() posts the semaphore
      tsem_wait(&pInfo->ready);

      if (pTaskInfo->code) {
        qDebug("%s load meta data from mnode failed, totalRows:%" PRIu64 ", code:%s", GET_TASKID(pTaskInfo),
               pInfo->loadInfo.totalRows, tstrerror(pTaskInfo->code));
        return NULL;
      }

      SRetrieveMetaTableRsp* pRsp = pInfo->pRsp;
      pInfo->req.showId = pRsp->handle;

      if (pRsp->numOfRows == 0 || pRsp->completed) {
        pOperator->status = OP_EXEC_DONE;
        qDebug("%s load meta data from mnode completed, rowsOfSource:%d, totalRows:%" PRIu64 " ", GET_TASKID(pTaskInfo),
               pRsp->numOfRows, pInfo->loadInfo.totalRows);

        if (pRsp->numOfRows == 0) {
          return NULL;
        }
      }

      SRetrieveMetaTableRsp* pTableRsp = pInfo->pRsp;
      setSDataBlockFromFetchRsp(pInfo->pRes, &pInfo->loadInfo, pTableRsp->numOfRows, pTableRsp->data,
                                pTableRsp->compLen, pOperator->numOfExprs, startTs, NULL, pInfo->scanCols);

      // todo log the filter info
      doFilterResult(pInfo);
      if (pInfo->pRes->info.rows > 0) {
        return pInfo->pRes;
      }
    }
  }
}

1449
int32_t buildSysDbTableInfo(const SSysTableScanInfo* pInfo, int32_t capacity) {
1450
  SSDataBlock* p = buildSysTableMetaBlock();
1451
  blockDataEnsureCapacity(p, capacity);
1452

L
Liu Jicong 已提交
1453
  size_t               size = 0;
1454 1455 1456 1457 1458 1459 1460 1461
  const SSysTableMeta* pSysDbTableMeta = NULL;

  getInfosDbMeta(&pSysDbTableMeta, &size);
  p->info.rows = buildDbTableInfoBlock(p, pSysDbTableMeta, size, TSDB_INFORMATION_SCHEMA_DB);

  getPerfDbMeta(&pSysDbTableMeta, &size);
  p->info.rows = buildDbTableInfoBlock(p, pSysDbTableMeta, size, TSDB_PERFORMANCE_SCHEMA_DB);

L
Liu Jicong 已提交
1462 1463
  relocateColumnData(pInfo->pRes, pInfo->scanCols, p->pDataBlock);
  //  blockDataDestroy(p);  todo handle memory leak
1464 1465 1466 1467 1468

  pInfo->pRes->info.rows = p->info.rows;
  return p->info.rows;
}

L
Liu Jicong 已提交
1469 1470 1471
int32_t buildDbTableInfoBlock(const SSDataBlock* p, const SSysTableMeta* pSysDbTableMeta, size_t size,
                              const char* dbName) {
  char    n[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
1472 1473
  int32_t numOfRows = p->info.rows;

L
Liu Jicong 已提交
1474
  for (int32_t i = 0; i < size; ++i) {
1475 1476 1477 1478 1479 1480 1481 1482 1483 1484 1485 1486 1487 1488 1489 1490 1491 1492 1493 1494
    const SSysTableMeta* pm = &pSysDbTableMeta[i];

    SColumnInfoData* pColInfoData = taosArrayGet(p->pDataBlock, 0);

    STR_TO_VARSTR(n, pm->name);
    colDataAppend(pColInfoData, numOfRows, n, false);

    // database name
    STR_TO_VARSTR(n, dbName);
    pColInfoData = taosArrayGet(p->pDataBlock, 1);
    colDataAppend(pColInfoData, numOfRows, n, false);

    // create time
    pColInfoData = taosArrayGet(p->pDataBlock, 2);
    colDataAppendNULL(pColInfoData, numOfRows);

    // number of columns
    pColInfoData = taosArrayGet(p->pDataBlock, 3);
    colDataAppend(pColInfoData, numOfRows, (char*)&pm->colNum, false);

L
Liu Jicong 已提交
1495
    for (int32_t j = 4; j <= 8; ++j) {
1496 1497 1498 1499 1500 1501 1502 1503 1504 1505 1506 1507 1508 1509 1510
      pColInfoData = taosArrayGet(p->pDataBlock, j);
      colDataAppendNULL(pColInfoData, numOfRows);
    }

    STR_TO_VARSTR(n, "SYSTEM_TABLE");

    pColInfoData = taosArrayGet(p->pDataBlock, 9);
    colDataAppend(pColInfoData, numOfRows, n, false);

    numOfRows += 1;
  }

  return numOfRows;
}

1511
SOperatorInfo* createSysTableScanOperatorInfo(void* readHandle, SSDataBlock* pResBlock, const SName* pName,
H
Haojun Liao 已提交
1512 1513 1514 1515 1516 1517 1518 1519 1520 1521 1522
                                              SNode* pCondition, SEpSet epset, SArray* colList,
                                              SExecTaskInfo* pTaskInfo, bool showRewrite, int32_t accountId) {
  SSysTableScanInfo* pInfo = taosMemoryCalloc(1, sizeof(SSysTableScanInfo));
  SOperatorInfo*     pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
  if (pInfo == NULL || pOperator == NULL) {
    taosMemoryFreeClear(pInfo);
    taosMemoryFreeClear(pOperator);
    terrno = TSDB_CODE_QRY_OUT_OF_MEMORY;
    return NULL;
  }

L
Liu Jicong 已提交
1523
  pInfo->accountId = accountId;
H
Haojun Liao 已提交
1524
  pInfo->showRewrite = showRewrite;
L
Liu Jicong 已提交
1525 1526 1527
  pInfo->pRes = pResBlock;
  pInfo->pCondition = pCondition;
  pInfo->scanCols = colList;
1528 1529

  initResultSizeInfo(pOperator, 4096);
H
Haojun Liao 已提交
1530 1531

  tNameAssign(&pInfo->name, pName);
1532 1533
  const char* name = tNameGetTableName(&pInfo->name);
  if (strncasecmp(name, TSDB_INS_TABLE_USER_TABLES, TSDB_TABLE_FNAME_LEN) == 0) {
L
Liu Jicong 已提交
1534
    pInfo->readHandle = *(SReadHandle*)readHandle;
1535
    blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity);
H
Haojun Liao 已提交
1536 1537 1538
  } else {
    tsem_init(&pInfo->ready, 0, 0);
    pInfo->epSet = epset;
1539
    pInfo->readHandle = *(SReadHandle*)readHandle;
H
Haojun Liao 已提交
1540 1541
  }

L
Liu Jicong 已提交
1542
  pOperator->name = "SysTableScanOperator";
H
Haojun Liao 已提交
1543
  pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN;
L
Liu Jicong 已提交
1544 1545 1546 1547 1548 1549
  pOperator->blocking = false;
  pOperator->status = OP_NOT_OPENED;
  pOperator->info = pInfo;
  pOperator->numOfExprs = pResBlock->info.numOfCols;
  pOperator->fpSet =
      createOperatorFpSet(operatorDummyOpenFn, doSysTableScan, NULL, NULL, destroySysScanOperator, NULL, NULL, NULL);
1550
  pOperator->pTaskInfo = pTaskInfo;
H
Haojun Liao 已提交
1551 1552 1553

  return pOperator;
}
H
Haojun Liao 已提交
1554

1555
/**
 * Produce the next batch of rows for the tag-scan operator.
 *
 * Starting at pInfo->curPos, one row is written per entry of pInfo->pTableList->pTableList
 * until either the list is exhausted or pOperator->resultInfo.capacity rows have been
 * produced. For every output expression the destination column is looked up by the
 * expression's result slot id; scan pseudo-column functions receive the table name read
 * from the meta entry, every other expression receives a tag value (JSON tags are copied
 * into a temporary buffer prefixed with a TSDB_DATA_TYPE_JSON type byte).
 *
 * @param pOperator  the tag-scan operator; pOperator->info must be a STagScanInfo.
 * @return the operator's result block holding the produced rows, or NULL when the operator
 *         is already done, the table list is empty, or no row was produced.
 *
 * On allocation failure the function does not return: it jumps to pTaskInfo->env with
 * TSDB_CODE_OUT_OF_MEMORY.
 */
static SSDataBlock* doTagScan(SOperatorInfo* pOperator) {
  if (pOperator->status == OP_EXEC_DONE) {
    return NULL;
  }

  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;

  STagScanInfo* pInfo = pOperator->info;
  SExprInfo*    pExprInfo = &pOperator->pExpr[0];
  SSDataBlock*  pRes = pInfo->pRes;

  int32_t size = taosArrayGetSize(pInfo->pTableList->pTableList);
  if (size == 0) {
    setTaskStatus(pTaskInfo, TASK_COMPLETED);
    return NULL;
  }

  char        str[512] = {0};
  int32_t     count = 0;
  SMetaReader mr = {0};
  metaReaderInit(&mr, pInfo->readHandle.meta, 0);

  while (pInfo->curPos < size && count < pOperator->resultInfo.capacity) {
    STableKeyInfo* item = taosArrayGet(pInfo->pTableList->pTableList, pInfo->curPos);
    // NOTE(review): the result of this lookup is not checked; if it can fail, mr.me below
    // may not describe item->uid -- confirm against the meta API and handle the failure.
    metaGetTableEntryByUid(&mr, item->uid);

    for (int32_t j = 0; j < pOperator->numOfExprs; ++j) {
      SColumnInfoData* pDst = taosArrayGet(pRes->pDataBlock, pExprInfo[j].base.resSchema.slotId);

      // refactor later
      if (fmIsScanPseudoColumnFunc(pExprInfo[j].pExpr->_function.functionId)) {
        STR_TO_VARSTR(str, mr.me.name);
        colDataAppend(pDst, count, str, false);
      } else {  // it is a tag value
        if (pDst->info.type == TSDB_DATA_TYPE_JSON) {
          const uint8_t* tmp = mr.me.ctbEntry.pTags;
          // TODO opt perf by realloc memory
          char* data = taosMemoryCalloc(kvRowLen(tmp) + 1, 1);
          if (data == NULL) {
            // Log first: tmp points into the reader's entry and must not be read after the clear.
            qError("%s failed to malloc memory, size:%d", GET_TASKID(pTaskInfo), kvRowLen(tmp) + 1);
            // longjmp bypasses the metaReaderClear() after the loop, so release the reader here.
            metaReaderClear(&mr);
            longjmp(pTaskInfo->env, TSDB_CODE_OUT_OF_MEMORY);
          }

          // first byte carries the type marker, the raw tag payload follows
          *data = TSDB_DATA_TYPE_JSON;
          memcpy(data + 1, tmp, kvRowLen(tmp));
          colDataAppend(pDst, count, data, false);
          taosMemoryFree(data);
        } else {
          const char* p = metaGetTableTagVal(&mr.me, pExprInfo[j].base.pParam[0].pCol->colId);
          colDataAppend(pDst, count, p, (p == NULL));
        }
      }
    }

    count += 1;
    if (++pInfo->curPos >= size) {
      doSetOperatorCompleted(pOperator);
    }
  }

  metaReaderClear(&mr);

  // qDebug("QInfo:0x%"PRIx64" create tag values results completed, rows:%d", GET_TASKID(pRuntimeEnv), count);
  if (pOperator->status == OP_EXEC_DONE) {
    setTaskStatus(pTaskInfo, TASK_COMPLETED);
  }

  pRes->info.rows = count;
  pOperator->resultInfo.totalRows += count;

  return (pRes->info.rows == 0) ? NULL : pInfo->pRes;
}

static void destroyTagScanOperatorInfo(void* param, int32_t numOfOutput) {
  STagScanInfo* pInfo = (STagScanInfo*)param;
  pInfo->pRes = blockDataDestroy(pInfo->pRes);
}

1708
SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, SExprInfo* pExpr, int32_t numOfOutput,
L
Liu Jicong 已提交
1709
                                         SSDataBlock* pResBlock, SArray* pColMatchInfo,
wmmhello's avatar
wmmhello 已提交
1710
                                         STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo) {
1711
  STagScanInfo*  pInfo = taosMemoryCalloc(1, sizeof(STagScanInfo));
H
Haojun Liao 已提交
1712 1713 1714 1715 1716
  SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
  if (pInfo == NULL || pOperator == NULL) {
    goto _error;
  }

wmmhello's avatar
wmmhello 已提交
1717
  pInfo->pTableList = pTableListInfo;
L
Liu Jicong 已提交
1718 1719 1720 1721 1722
  pInfo->pColMatchInfo = pColMatchInfo;
  pInfo->pRes = pResBlock;
  pInfo->readHandle = *pReadHandle;
  pInfo->curPos = 0;
  pOperator->name = "TagScanOperator";
1723
  pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN;
L
Liu Jicong 已提交
1724 1725 1726 1727 1728 1729
  pOperator->blocking = false;
  pOperator->status = OP_NOT_OPENED;
  pOperator->info = pInfo;
  pOperator->pExpr = pExpr;
  pOperator->numOfExprs = numOfOutput;
  pOperator->pTaskInfo = pTaskInfo;
1730

1731 1732 1733
  initResultSizeInfo(pOperator, 4096);
  blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity);

1734 1735
  pOperator->fpSet =
      createOperatorFpSet(operatorDummyOpenFn, doTagScan, NULL, NULL, destroyTagScanOperatorInfo, NULL, NULL, NULL);
H
Haojun Liao 已提交
1736 1737

  return pOperator;
1738

1739
_error:
H
Haojun Liao 已提交
1740 1741 1742 1743 1744
  taosMemoryFree(pInfo);
  taosMemoryFree(pOperator);
  terrno = TSDB_CODE_OUT_OF_MEMORY;
  return NULL;
}