scanoperator.c 58.7 KB
Newer Older
H
Haojun Liao 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "function.h"
17
#include "filter.h"
18
#include "functionMgt.h"
H
Haojun Liao 已提交
19 20
#include "os.h"
#include "querynodes.h"
21
#include "systable.h"
22
#include "tglobal.h"
H
Haojun Liao 已提交
23
#include "tname.h"
24
#include "ttime.h"
H
Haojun Liao 已提交
25 26 27 28 29 30 31 32 33

#include "tdatablock.h"
#include "tmsg.h"

#include "executorimpl.h"
#include "query.h"
#include "tcompare.h"
#include "thash.h"
#include "ttypes.h"
34
#include "vnode.h"
H
Haojun Liao 已提交
35 36

// Mark the given scan info object (accessed through a pointer) as running a reverse-order scan.
#define SET_REVERSE_SCAN_FLAG(_info) ((_info)->scanFlag = REVERSE_SCAN)
37
// Flip n in place: TSDB_ORDER_ASC becomes TSDB_ORDER_DESC, any other value becomes TSDB_ORDER_ASC.
// n must be a modifiable lvalue; it is evaluated more than once.
#define SWITCH_ORDER(n)              (((n) = ((n) == TSDB_ORDER_ASC) ? TSDB_ORDER_DESC : TSDB_ORDER_ASC))
38

5
54liuyao 已提交
39 40 41 42 43
// Position of one cached row; stored as the value of the window hash table filled by catchWidonwInfo().
typedef struct SWindowPosition {
  int32_t pageId;  // id of the buffer page the block containing the row was written to
  int32_t rowId;   // index of the row inside that block
} SWindowPosition;

44
static int32_t buildSysDbTableInfo(const SSysTableScanInfo* pInfo, int32_t capacity);
L
Liu Jicong 已提交
45 46
static int32_t buildDbTableInfoBlock(const SSDataBlock* p, const SSysTableMeta* pSysDbTableMeta, size_t size,
                                     const char* dbName);
47

48
/**
 * Flip the scan order of every function context in the array.
 *
 * @param pCtx        array of function contexts
 * @param numOfOutput number of entries in pCtx to update
 */
static void switchCtxOrder(SqlFunctionCtx* pCtx, int32_t numOfOutput) {
  int32_t idx = 0;
  while (idx < numOfOutput) {
    SqlFunctionCtx* pCur = &pCtx[idx];
    SWITCH_ORDER(pCur->order);
    idx += 1;
  }
}

// Currently a no-op: the entire body is compiled out by the "#if 0" guard below, so calling this
// function has no effect. The disabled code refers to identifiers (pRuntimeEnv, pQueryAttr) that
// are not parameters of this function.
static void setupQueryRangeForReverseScan(STableScanInfo* pTableScanInfo) {
#if 0
  int32_t numOfGroups = (int32_t)(GET_NUM_OF_TABLEGROUP(pRuntimeEnv));
  for(int32_t i = 0; i < numOfGroups; ++i) {
    SArray *group = GET_TABLEGROUP(pRuntimeEnv, i);
    SArray *tableKeyGroup = taosArrayGetP(pQueryAttr->tableGroupInfo.pGroupList, i);

    size_t t = taosArrayGetSize(group);
    for (int32_t j = 0; j < t; ++j) {
      STableQueryInfo *pCheckInfo = taosArrayGetP(group, j);
      updateTableQueryInfoForReverseScan(pCheckInfo);

      // update the last key in tableKeyInfo list, the tableKeyInfo is used to build the tsdbQueryHandle and decide
      // the start check timestamp of tsdbQueryHandle
//      STableKeyInfo *pTableKeyInfo = taosArrayGet(tableKeyGroup, j);
//      pTableKeyInfo->lastKey = pCheckInfo->lastKey;
//
//      assert(pCheckInfo->pTable == pTableKeyInfo->pTable);
    }
  }
#endif
}

77 78 79 80 81 82 83 84 85
/**
 * Move the time window *tw one step along the scan direction.
 *
 * For every interval unit except 'n' and 'y' the start key moves by `sliding` and the
 * end key is `skey + interval - 1`. For 'n' and 'y' the window is moved by `interval`
 * calendar months (a 'y' interval counts as 12 months each) using local-time arithmetic.
 *
 * @param pInterval interval description (interval, sliding, intervalUnit, precision are read)
 * @param tw        window to advance; updated in place
 * @param order     scan order; converted to a direction factor by GET_FORWARD_DIRECTION_FACTOR
 */
static void getNextTimeWindow(SInterval* pInterval, STimeWindow* tw, int32_t order) {
  const int32_t direction = GET_FORWARD_DIRECTION_FACTOR(order);
  const bool    isYear = (pInterval->intervalUnit == 'y');
  const bool    isMonth = (pInterval->intervalUnit == 'n');

  if (!isMonth && !isYear) {
    tw->skey += pInterval->sliding * direction;
    tw->ekey = tw->skey + pInterval->interval - 1;
    return;
  }

  // length of the window expressed in months
  int64_t months = pInterval->interval;
  if (isYear) {
    months *= 12;
  }

  // the start key expressed in seconds, broken down into local calendar time
  int64_t seconds = convertTimePrecision(tw->skey, pInterval->precision, TSDB_TIME_PRECISION_MILLI) / 1000;
  time_t  startSec = (time_t)seconds;

  struct tm tm;
  taosLocalTime(&startSec, &tm);

  // new start of the window
  int mon = (int)(tm.tm_year * 12 + tm.tm_mon + months * direction);
  tm.tm_year = mon / 12;
  tm.tm_mon = mon % 12;
  tw->skey = convertTimePrecision((int64_t)taosMktime(&tm) * 1000L, TSDB_TIME_PRECISION_MILLI, pInterval->precision);

  // the end key is the last time unit before the start of the following window
  mon = (int)(mon + months);
  tm.tm_year = mon / 12;
  tm.tm_mon = mon % 12;
  tw->ekey =
      convertTimePrecision((int64_t)taosMktime(&tm) * 1000L, TSDB_TIME_PRECISION_MILLI, pInterval->precision) - 1;
}

110
/**
 * Check whether a data block spans more than one interval window.
 *
 * Starting from the aligned window that contains the first key of the block in scan
 * order, successive windows are generated until they leave the block range. true is
 * returned as soon as a window boundary is found strictly inside the block.
 *
 * @param pInterval  interval description; an interval of 0 means there is no interval
 *                   operator upstream, and false is returned
 * @param pBlockInfo block info; only window.skey/window.ekey are read
 * @param order      TSDB_ORDER_ASC for an ascending scan; any other value is handled as descending
 * @return true if some window boundary falls inside the block range
 */
static bool overlapWithTimeWindow(SInterval* pInterval, SDataBlockInfo* pBlockInfo, int32_t order) {
  // 0 by default, which means it is not a interval operator of the upstream operator.
  if (pInterval->interval == 0) {
    return false;
  }

  const TSKEY blockStart = pBlockInfo->window.skey;
  const TSKEY blockEnd = pBlockInfo->window.ekey;

  STimeWindow win = {0};

  if (order == TSDB_ORDER_ASC) {
    getAlignQueryTimeWindow(pInterval, pInterval->precision, blockStart, &win);
    assert(win.ekey >= blockStart);

    if (win.ekey < blockEnd) {
      return true;
    }

    for (getNextTimeWindow(pInterval, &win, order); win.skey <= blockEnd; getNextTimeWindow(pInterval, &win, order)) {
      assert(win.ekey > blockEnd);
      // win.skey <= blockEnd is guaranteed by the loop condition
      if (win.skey > blockStart) {
        return true;
      }
    }

    return false;
  }

  getAlignQueryTimeWindow(pInterval, pInterval->precision, blockEnd, &win);
  assert(win.skey <= blockEnd);

  if (win.skey > blockStart) {
    return true;
  }

  for (getNextTimeWindow(pInterval, &win, order); win.ekey >= blockStart; getNextTimeWindow(pInterval, &win, order)) {
    assert(win.skey < blockStart);
    // win.ekey >= blockStart is guaranteed by the loop condition
    if (win.ekey < blockEnd) {
      return true;
    }
  }

  return false;
}

161 162
static void addTagPseudoColumnData(STableScanInfo* pTableScanInfo, SSDataBlock* pBlock);

L
Liu Jicong 已提交
163 164
/**
 * Decide how much of the current data block is needed and load it accordingly.
 *
 * The required level starts from the operator's dataBlockLoadFlag and is forced to
 * FUNC_DATA_REQUIRED_DATA_LOAD when a filter exists or the block overlaps several
 * interval windows. Depending on the level the block is filtered out, skipped,
 * answered from the block statistics, or fully loaded, filtered and returned.
 *
 * @param pOperator      operator that owns the scan; its info supplies the load flag
 * @param pTableScanInfo scan state (reader, filter, column match info, cost recorder)
 * @param pBlock         result block; pBlock->info must already describe the current block
 * @param status         out: the load level that was finally applied
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_OUT_OF_MEMORY if the statistics buffer cannot be
 *         allocated, or terrno if the block data cannot be retrieved
 */
static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableScanInfo, SSDataBlock* pBlock,
                             uint32_t* status) {
  SExecTaskInfo*  pTaskInfo = pOperator->pTaskInfo;
  STableScanInfo* pInfo = pOperator->info;

  SFileBlockLoadRecorder* pCost = &pTableScanInfo->readRecorder;

  pCost->totalBlocks += 1;
  pCost->totalRows += pBlock->info.rows;

  *status = pInfo->dataBlockLoadFlag;
  if (pTableScanInfo->pFilterNode != NULL ||
      overlapWithTimeWindow(&pTableScanInfo->interval, &pBlock->info, pTableScanInfo->cond.order)) {
    (*status) = FUNC_DATA_REQUIRED_DATA_LOAD;
  }

  SDataBlockInfo* pBlockInfo = &pBlock->info;

  // drop the statistics pointer array that was set up for the previous block
  taosMemoryFreeClear(pBlock->pBlockAgg);

  if (*status == FUNC_DATA_REQUIRED_FILTEROUT) {
    qDebug("%s data block filter out, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo),
           pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows);
    pCost->filterOutBlocks += 1;
    return TSDB_CODE_SUCCESS;
  } else if (*status == FUNC_DATA_REQUIRED_NOT_LOAD) {
    qDebug("%s data block skipped, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo),
           pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows);
    pCost->skipBlocks += 1;

    // clear all data in pBlock that are set when handing the previous block
    for (int32_t i = 0; i < pBlockInfo->numOfCols; ++i) {
      SColumnInfoData* pcol = taosArrayGet(pBlock->pDataBlock, i);
      pcol->pData = NULL;
    }

    return TSDB_CODE_SUCCESS;
  } else if (*status == FUNC_DATA_REQUIRED_STATIS_LOAD) {
    pCost->loadBlockStatis += 1;

    bool             allColumnsHaveAgg = true;
    SColumnDataAgg** pColAgg = NULL;
    tsdbRetrieveDataBlockStatisInfo(pTableScanInfo->dataReader, &pColAgg, &allColumnsHaveAgg);

    if (allColumnsHaveAgg == true) {
      int32_t numOfCols = pBlock->info.numOfCols;

      // todo create this buffer during creating operator
      if (pBlock->pBlockAgg == NULL) {
        pBlock->pBlockAgg = taosMemoryCalloc(numOfCols, POINTER_BYTES);
        if (pBlock->pBlockAgg == NULL) {
          // the array is written through right below, so an allocation failure must be reported
          return TSDB_CODE_OUT_OF_MEMORY;
        }
      }

      for (int32_t i = 0; i < numOfCols; ++i) {
        SColMatchInfo* pColMatchInfo = taosArrayGet(pTableScanInfo->pColMatchInfo, i);
        if (!pColMatchInfo->output) {
          continue;
        }
        pBlock->pBlockAgg[pColMatchInfo->targetSlotId] = pColAgg[i];
      }

      return TSDB_CODE_SUCCESS;
    } else {  // failed to load the block sma data, data block statistics does not exist, load data block instead
      *status = FUNC_DATA_REQUIRED_DATA_LOAD;
    }
  }

  ASSERT(*status == FUNC_DATA_REQUIRED_DATA_LOAD);

  // todo filter data block according to the block sma data firstly

  pCost->totalCheckedRows += pBlock->info.rows;
  pCost->loadBlocks += 1;

  SArray* pCols = tsdbRetrieveDataBlock(pTableScanInfo->dataReader, NULL);
  if (pCols == NULL) {
    return terrno;
  }

  relocateColumnData(pBlock, pTableScanInfo->pColMatchInfo, pCols);

  // currently only the tbname pseudo column
  if (pTableScanInfo->numOfPseudoExpr > 0) {
    addTagPseudoColumnData(pTableScanInfo, pBlock);
  }

  // apply the filter and account the time spent on it
  int64_t st = taosGetTimestampMs();
  doFilter(pTableScanInfo->pFilterNode, pBlock, pTableScanInfo->pColMatchInfo);

  int64_t et = taosGetTimestampMs();
  pTableScanInfo->readRecorder.filterTime += (et - st);

  if (pBlock->info.rows == 0) {
    pCost->filterOutBlocks += 1;
    qDebug("%s data block filter out, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo),
           pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows);
  }

  return TSDB_CODE_SUCCESS;
}

271
/**
 * Switch the scan state from ascending to descending order.
 *
 * Sets the reverse-scan flag, flips the order of every function context, swaps the two
 * ends of the query time window and records TSDB_ORDER_DESC in the query condition.
 * The caller is responsible for resetting the read handle afterwards.
 *
 * @param pTableScanInfo scan state to update
 * @param pCtx           function contexts whose order is flipped
 * @param numOfOutput    number of entries in pCtx
 */
static void prepareForDescendingScan(STableScanInfo* pTableScanInfo, SqlFunctionCtx* pCtx, int32_t numOfOutput) {
  SET_REVERSE_SCAN_FLAG(pTableScanInfo);
  switchCtxOrder(pCtx, numOfOutput);

  //  setupQueryRangeForReverseScan(pTableScanInfo);

  pTableScanInfo->cond.order = TSDB_ORDER_DESC;

  STimeWindow* pRange = &pTableScanInfo->cond.twindow;
  TSWAP(pRange->skey, pRange->ekey);
}

282
/**
 * Fill the pseudo columns (tbname-like functions and tags) of a block.
 *
 * The table entry of pBlock->info.uid is looked up through a meta reader and, for every
 * pseudo expression, the destination column is filled with the same value for all rows
 * of the block. JSON tags are copied into a temporary buffer prefixed with the type byte.
 *
 * @param pTableScanInfo scan state providing the pseudo expressions and the meta handle
 * @param pBlock         block whose pseudo columns are filled in place
 */
void addTagPseudoColumnData(STableScanInfo* pTableScanInfo, SSDataBlock* pBlock) {
  // currently only the tbname pseudo column
  if (pTableScanInfo->numOfPseudoExpr == 0) {
    return;
  }

  SMetaReader mr = {0};
  metaReaderInit(&mr, pTableScanInfo->readHandle.meta, 0);
  // NOTE(review): the result of the lookup is not checked here - confirm the behavior for a missing uid
  metaGetTableEntryByUid(&mr, pBlock->info.uid);

  for (int32_t j = 0; j < pTableScanInfo->numOfPseudoExpr; ++j) {
    SExprInfo* pExpr = &pTableScanInfo->pPseudoExpr[j];

    int32_t dstSlotId = pExpr->base.resSchema.slotId;

    SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, dstSlotId);
    colInfoDataEnsureCapacity(pColInfoData, 0, pBlock->info.rows);

    int32_t functionId = pExpr->pExpr->_function.functionId;

    // this is to handle the tbname
    if (fmIsScanPseudoColumnFunc(functionId)) {
      setTbNameColData(pTableScanInfo->readHandle.meta, pBlock, pColInfoData, functionId);
      continue;
    }

    // these are tags
    const bool  isJson = (pColInfoData->info.type == TSDB_DATA_TYPE_JSON);
    const char* p = NULL;
    if (isJson) {
      const uint8_t* tmp = mr.me.ctbEntry.pTags;
      char*          data = taosMemoryCalloc(kvRowLen(tmp) + 1, 1);
      if (data == NULL) {
        qError("doTagScan calloc error:%d", kvRowLen(tmp) + 1);
        // release the meta reader before bailing out; it was leaked on this path before
        metaReaderClear(&mr);
        return;
      }

      // the first byte carries the type, the tag payload follows
      *data = TSDB_DATA_TYPE_JSON;
      memcpy(data + 1, tmp, kvRowLen(tmp));
      p = data;
    } else {
      p = metaGetTableTagVal(&mr.me, pExpr->base.pParam[0].pCol->colId);
    }

    for (int32_t i = 0; i < pBlock->info.rows; ++i) {
      colDataAppend(pColInfoData, i, p, (p == NULL));
    }

    // only the JSON value lives in a buffer allocated by this function
    if (isJson) {
      taosMemoryFree((void*)p);
    }
  }

  metaReaderClear(&mr);
}

332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348
/**
 * Fill pColInfoData by running the scalar function `functionId` on the table uid of the block.
 *
 * A temporary one-row BIGINT column holding pBlock->info.uid is used as the only input
 * parameter; pMeta is handed to the function through the input parameter.
 *
 * @param pMeta        meta handle passed to the scalar function
 * @param pBlock       block providing the table uid and the number of rows
 * @param pColInfoData destination column
 * @param functionId   id of the scalar function to execute
 */
void setTbNameColData(void* pMeta, const SSDataBlock* pBlock, SColumnInfoData* pColInfoData, int32_t functionId) {
  // build the single-row input column that carries the table uid
  // NOTE(review): the buffers reserved for uidCol are not released in this function - confirm who owns them
  SColumnInfoData uidCol = {0};
  uidCol.info.type = TSDB_DATA_TYPE_BIGINT;
  uidCol.info.bytes = sizeof(uint64_t);
  colInfoDataEnsureCapacity(&uidCol, 0, 1);
  colDataAppendInt64(&uidCol, 0, (int64_t*)&pBlock->info.uid);

  SScalarParam input = {0};
  input.numOfRows = pBlock->info.rows;
  input.param = pMeta;
  input.columnData = &uidCol;

  SScalarParam output = {0};
  output.columnData = pColInfoData;

  struct SScalarFuncExecFuncs execFuncs = {0};
  fmGetScalarFuncExecFuncs(functionId, &execFuncs);
  execFuncs.process(&input, 1, &output);
}

349
/**
 * Return the next non-empty data block of the current scan pass.
 *
 * Blocks are pulled from the reader one by one; blocks that are filtered out or end up
 * with zero rows are skipped. A killed task or a load error leaves the function through
 * longjmp on the task's jump buffer.
 *
 * @param pOperator table scan operator
 * @return the operator's result block, or NULL when the reader has no more blocks
 */
static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) {
  STableScanInfo* pScan = pOperator->info;
  SSDataBlock*    pRes = pScan->pResBlock;

  const int64_t startUs = taosGetTimestampUs();

  while (tsdbNextDataBlock(pScan->dataReader)) {
    if (isTaskKilled(pOperator->pTaskInfo)) {
      longjmp(pOperator->pTaskInfo->env, TSDB_CODE_TSC_QUERY_CANCELLED);
    }

    tsdbRetrieveDataBlockInfo(pScan->dataReader, &pRes->info);

    uint32_t      loadStatus = 0;
    const int32_t rc = loadDataBlock(pOperator, pScan, pRes, &loadStatus);
    if (rc != TSDB_CODE_SUCCESS) {
      longjmp(pOperator->pTaskInfo->env, rc);
    }

    // current block is filter out according to filter condition, continue load the next block
    const bool discarded = (loadStatus == FUNC_DATA_REQUIRED_FILTEROUT) || (pRes->info.rows == 0);
    if (!discarded) {
      pOperator->resultInfo.totalRows = pScan->readRecorder.totalRows;
      pScan->readRecorder.elapsedTime += (taosGetTimestampUs() - startUs) / 1000.0;

      pOperator->cost.totalCost = pScan->readRecorder.elapsedTime;
      return pRes;
    }
  }

  return NULL;
}

384
/**
 * Produce the next block of the table scan, running all requested scan passes.
 *
 * scanInfo.numOfAsc ascending passes are executed first, followed by scanInfo.numOfDesc
 * descending passes. Between two passes of the same direction the read handle is reset
 * so that the range is scanned again; before the first descending pass the scan state
 * is switched to descending order.
 *
 * @param pOperator table scan operator
 * @return the next result block, or NULL when all passes are finished (the task status
 *         is then set to TASK_COMPLETED) or when there is nothing to scan
 */
static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
  STableScanInfo* pTableScanInfo = pOperator->info;
  SExecTaskInfo*  pTaskInfo = pOperator->pTaskInfo;

  // The read handle is not initialized yet, since no qualified tables exists
  if (pTableScanInfo->dataReader == NULL || pOperator->status == OP_EXEC_DONE) {
    return NULL;
  }

  // do the ascending order traverse in the first place.
  while (pTableScanInfo->scanTimes < pTableScanInfo->scanInfo.numOfAsc) {
    SSDataBlock* p = doTableScanImpl(pOperator);
    if (p != NULL) {
      return p;
    }

    pTableScanInfo->scanTimes += 1;

    if (pTableScanInfo->scanTimes < pTableScanInfo->scanInfo.numOfAsc) {
      setTaskStatus(pTaskInfo, TASK_NOT_COMPLETED);
      pTableScanInfo->scanFlag = REPEAT_SCAN;

      STimeWindow* pWin = &pTableScanInfo->cond.twindow;
      qDebug("%s start to repeat ascending order scan data blocks due to query func required, qrange:%" PRId64
             "-%" PRId64, GET_TASKID(pTaskInfo), pWin->skey, pWin->ekey);

      // do prepare for the next round table scan operation
      tsdbResetReadHandle(pTableScanInfo->dataReader, &pTableScanInfo->cond);
    }
  }

  int32_t total = pTableScanInfo->scanInfo.numOfAsc + pTableScanInfo->scanInfo.numOfDesc;
  if (pTableScanInfo->scanTimes < total) {
    if (pTableScanInfo->cond.order == TSDB_ORDER_ASC) {
      prepareForDescendingScan(pTableScanInfo, pTableScanInfo->pCtx, pTableScanInfo->numOfOutput);
      tsdbResetReadHandle(pTableScanInfo->dataReader, &pTableScanInfo->cond);
    }

    STimeWindow* pWin = &pTableScanInfo->cond.twindow;
    qDebug("%s start to descending order scan data blocks due to query func required, qrange:%" PRId64 "-%" PRId64,
           GET_TASKID(pTaskInfo), pWin->skey, pWin->ekey);

    while (pTableScanInfo->scanTimes < total) {
      SSDataBlock* p = doTableScanImpl(pOperator);
      if (p != NULL) {
        return p;
      }

      pTableScanInfo->scanTimes += 1;

      // Compare against the total number of passes: scanTimes is already >= numOfAsc here, so
      // comparing with numOfAsc could never be true and repeated descending passes were never
      // prepared.
      if (pTableScanInfo->scanTimes < total) {
        setTaskStatus(pTaskInfo, TASK_NOT_COMPLETED);
        pTableScanInfo->scanFlag = REPEAT_SCAN;

        qDebug("%s start to repeat descending order scan data blocks due to query func required, qrange:%" PRId64
               "-%" PRId64,
               GET_TASKID(pTaskInfo), pTaskInfo->window.skey, pTaskInfo->window.ekey);

        // do prepare for the next round table scan operation
        tsdbResetReadHandle(pTableScanInfo->dataReader, &pTableScanInfo->cond);
      }
    }
  }

  setTaskStatus(pTaskInfo, TASK_COMPLETED);
  return NULL;
}

452 453 454 455 456 457 458 459 460 461 462 463
/**
 * Build an SInterval from the interval settings of a table scan plan node.
 *
 * Only interval, sliding, intervalUnit, slidingUnit and offset are copied; every other
 * member of the result (for example precision) is left zero.
 *
 * @param pTableScanNode physical plan node of the table scan
 * @return the interval description, by value
 */
SInterval extractIntervalInfo(const STableScanPhysiNode* pTableScanNode) {
  SInterval result = {0};

  result.interval = pTableScanNode->interval;
  result.sliding = pTableScanNode->sliding;
  result.intervalUnit = pTableScanNode->intervalUnit;
  result.slidingUnit = pTableScanNode->slidingUnit;
  result.offset = pTableScanNode->offset;

  return result;
}

464 465 466 467 468 469 470 471 472
static int32_t getTableScannerExecInfo(struct SOperatorInfo* pOptr, void** pOptrExplain, uint32_t* len) {
  SFileBlockLoadRecorder* pRecorder = taosMemoryCalloc(1, sizeof(SFileBlockLoadRecorder));
  STableScanInfo* pTableScanInfo = pOptr->info;
  *pRecorder = pTableScanInfo->readRecorder;
  *pOptrExplain = pRecorder;
  *len = sizeof(SFileBlockLoadRecorder);
  return 0;
}

473 474 475 476 477 478 479 480 481 482
static void destroyTableScanOperatorInfo(void* param, int32_t numOfOutput) {
  STableScanInfo* pTableScanInfo = (STableScanInfo*)param;
  taosMemoryFree(pTableScanInfo->pResBlock);
  tsdbCleanupReadHandle(pTableScanInfo->dataReader);

  if (pTableScanInfo->pColMatchInfo != NULL) {
    taosArrayDestroy(pTableScanInfo->pColMatchInfo);
  }
}

483
/**
 * Create a table scan operator from its physical plan node.
 *
 * @param pTableScanNode plan node describing the scan (columns, pseudo columns, scan
 *                       sequence, interval, conditions)
 * @param pDataReader    read handle used to fetch the data blocks; may be NULL
 * @param readHandle     handle set copied into the operator
 * @param pTaskInfo      owning task; its code is set when the creation fails
 * @return the new operator, or NULL on failure
 */
SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, tsdbReaderT pDataReader, SReadHandle* readHandle, SExecTaskInfo* pTaskInfo) {
  STableScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STableScanInfo));
  SOperatorInfo*  pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
  if (pInfo == NULL || pOperator == NULL) {
    taosMemoryFreeClear(pInfo);
    taosMemoryFreeClear(pOperator);

    pTaskInfo->code = TSDB_CODE_QRY_OUT_OF_MEMORY;
    return NULL;
  }

  SDataBlockDescNode* pDescNode = pTableScanNode->scan.node.pOutputDataBlockDesc;

  int32_t numOfCols = 0;
  SArray* pColList = extractColMatchInfo(pTableScanNode->scan.pScanCols, pDescNode, &numOfCols, pTaskInfo, COL_MATCH_FROM_COL_ID);

  int32_t code = initQueryTableDataCond(&pInfo->cond, pTableScanNode);
  if (code != TSDB_CODE_SUCCESS) {
    // release everything allocated so far; these objects were leaked on this path before
    // NOTE(review): pDataReader is left untouched here - confirm that the caller keeps ownership on failure
    if (pColList != NULL) {
      taosArrayDestroy(pColList);
    }
    taosMemoryFreeClear(pInfo);
    taosMemoryFreeClear(pOperator);

    pTaskInfo->code = code;
    return NULL;
  }

  if (pTableScanNode->scan.pScanPseudoCols != NULL) {
    pInfo->pPseudoExpr = createExprInfo(pTableScanNode->scan.pScanPseudoCols, NULL, &pInfo->numOfPseudoExpr);
    pInfo->pPseudoCtx  = createSqlFunctionCtx(pInfo->pPseudoExpr, pInfo->numOfPseudoExpr, &pInfo->rowCellInfoOffset);
  }

  // number of ascending and descending passes requested by the plan
  pInfo->scanInfo = (SScanInfo){.numOfAsc = pTableScanNode->scanSeq[0], .numOfDesc = pTableScanNode->scanSeq[1]};
//  pInfo->scanInfo = (SScanInfo){.numOfAsc = 0, .numOfDesc = 1}; // for debug purpose

  pInfo->readHandle        = *readHandle;
  pInfo->interval          = extractIntervalInfo(pTableScanNode);
  pInfo->sampleRatio       = pTableScanNode->ratio;
  pInfo->dataBlockLoadFlag = pTableScanNode->dataRequired;
  pInfo->pResBlock         = createResDataBlock(pDescNode);
  pInfo->pFilterNode       = pTableScanNode->scan.node.pConditions;
  pInfo->dataReader        = pDataReader;
  pInfo->scanFlag          = MAIN_SCAN;
  pInfo->pColMatchInfo     = pColList;

  pOperator->name         = "TableScanOperator";  // for debug purpose
  pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN;
  pOperator->blocking     = false;
  pOperator->status       = OP_NOT_OPENED;
  pOperator->info         = pInfo;
  pOperator->numOfExprs   = numOfCols;
  pOperator->pTaskInfo    = pTaskInfo;

  pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doTableScan, NULL, NULL, destroyTableScanOperatorInfo, NULL, NULL, getTableScannerExecInfo);

  // for non-blocking operator, the open cost is always 0
  pOperator->cost.openCost = 0;

  return pOperator;
}

538
/**
 * Create a sequential table scan operator on top of an existing read handle.
 *
 * The operator returns the blocks of a single pass of doTableScanImpl; no close
 * function is registered.
 *
 * @param pReadHandle read handle used to fetch the data blocks
 * @param pTaskInfo   owning task; its code is set when the allocation fails
 * @return the new operator, or NULL if memory cannot be allocated
 */
SOperatorInfo* createTableSeqScanOperatorInfo(void* pReadHandle, SExecTaskInfo* pTaskInfo) {
  STableScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STableScanInfo));
  SOperatorInfo*  pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
  if (pInfo == NULL || pOperator == NULL) {
    // both objects are written right below, so a failed allocation must not be ignored
    taosMemoryFreeClear(pInfo);
    taosMemoryFreeClear(pOperator);

    pTaskInfo->code = TSDB_CODE_QRY_OUT_OF_MEMORY;
    return NULL;
  }

  pInfo->dataReader = pReadHandle;
  //  pInfo->prevGroupId       = -1;

  pOperator->name = "TableSeqScanOperator";
  pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_TABLE_SEQ_SCAN;
  pOperator->blocking = false;
  pOperator->status = OP_NOT_OPENED;
  pOperator->info = pInfo;
  pOperator->pTaskInfo = pTaskInfo;

  pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doTableScanImpl, NULL, NULL, NULL, NULL, NULL, NULL);
  return pOperator;
}

556
/**
 * Collect the file block distribution of the scanned table and return a one-row block.
 *
 * The distribution is gathered into a local STableBlockDistInfo; the code that would
 * serialize it into the result column is currently commented out, so the returned block
 * only has rows/numOfCols set to 1. The operator is marked OP_EXEC_DONE afterwards.
 *
 * NOTE(review): createDataBlockInfoScanOperator() does not assign pResBlock - confirm
 * that it is set somewhere else before this function runs.
 *
 * @param pOperator block info scan operator
 * @return the operator's result block, or NULL if the operator already finished
 */
static SSDataBlock* doBlockInfoScan(SOperatorInfo* pOperator) {
  if (pOperator->status == OP_EXEC_DONE) {
    return NULL;
  }

  STableScanInfo* pTableScanInfo = pOperator->info;

  STableBlockDistInfo tableBlockDist = {0};
  tableBlockDist.numOfTables = 1;  // TODO set the correct number of tables

  // number of row-count buckets, rounded up
  int32_t numRowSteps = TSDB_DEFAULT_MAXROWS_FBLOCK / TSDB_BLOCK_DIST_STEP_ROWS;
  if (TSDB_DEFAULT_MAXROWS_FBLOCK % TSDB_BLOCK_DIST_STEP_ROWS != 0) {
    ++numRowSteps;
  }

  tableBlockDist.dataBlockInfos = taosArrayInit(numRowSteps, sizeof(SFileBlockInfo));
  taosArraySetSize(tableBlockDist.dataBlockInfos, numRowSteps);

  tableBlockDist.maxRows = INT_MIN;
  tableBlockDist.minRows = INT_MAX;

  tsdbGetFileBlocksDistInfo(pTableScanInfo->dataReader, &tableBlockDist);
  tableBlockDist.numOfRowsInMemTable = (int32_t)tsdbGetNumOfRowsInMemTable(pTableScanInfo->dataReader);

  SSDataBlock* pBlock = pTableScanInfo->pResBlock;
  pBlock->info.rows = 1;
  pBlock->info.numOfCols = 1;

  //  SBufferWriter bw = tbufInitWriter(NULL, false);
  //  blockDistInfoToBinary(&tableBlockDist, &bw);
  SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, 0);

  //  int32_t len = (int32_t) tbufTell(&bw);
  //  pColInfo->pData = taosMemoryMalloc(len + sizeof(int32_t));
  //  *(int32_t*) pColInfo->pData = len;
  //  memcpy(pColInfo->pData + sizeof(int32_t), tbufGetData(&bw, false), len);
  //
  //  tbufCloseWriter(&bw);

  //  SArray* g = GET_TABLEGROUP(pOperator->, 0);
  //  pOperator->pRuntimeEnv->current = taosArrayGetP(g, 0);

  // the distribution buckets are local to this call; release them instead of leaking the array
  taosArrayDestroy(tableBlockDist.dataBlockInfos);
  tableBlockDist.dataBlockInfos = NULL;

  pOperator->status = OP_EXEC_DONE;
  return pBlock;
}

/**
 * Create the operator that reports data block distribution information.
 *
 * Only the open and get-next callbacks are installed; the setup of the result block is
 * still commented out.
 *
 * @param dataReader read handle of the table to inspect
 * @param pTaskInfo  owning task; its code is set to TSDB_CODE_OUT_OF_MEMORY on failure
 * @return the new operator, or NULL if memory cannot be allocated
 */
SOperatorInfo* createDataBlockInfoScanOperator(void* dataReader, SExecTaskInfo* pTaskInfo) {
  STableScanInfo* pScanInfo = taosMemoryCalloc(1, sizeof(STableScanInfo));
  SOperatorInfo*  pOptr = taosMemoryCalloc(1, sizeof(SOperatorInfo));

  if (pScanInfo == NULL || pOptr == NULL) {
    pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY;
    taosMemoryFreeClear(pScanInfo);
    taosMemoryFreeClear(pOptr);
    return NULL;
  }

  pScanInfo->dataReader = dataReader;
  //  pInfo->block.pDataBlock = taosArrayInit(1, sizeof(SColumnInfoData));

  // description of the single binary result column; not attached anywhere yet
  SColumnInfoData infoData = {0};
  infoData.info.colId = 0;
  infoData.info.bytes = 1024;
  infoData.info.type = TSDB_DATA_TYPE_BINARY;
  //  taosArrayPush(pInfo->block.pDataBlock, &infoData);

  pOptr->name = "DataBlockInfoScanOperator";
  //  pOperator->operatorType = OP_TableBlockInfoScan;
  pOptr->status = OP_NOT_OPENED;
  pOptr->blocking = false;
  pOptr->info = pScanInfo;
  pOptr->pTaskInfo = pTaskInfo;

  pOptr->fpSet.getNextFn = doBlockInfoScan;
  pOptr->fpSet._openFn = operatorDummyOpenFn;

  return pOptr;
}

/**
 * Destroy every block buffered in pInfo->pBlockLists, empty the list and reset the
 * index of the next valid block to 0.
 */
static void doClearBufferedBlocks(SStreamBlockScanInfo* pInfo) {
  const size_t numOfBlocks = taosArrayGetSize(pInfo->pBlockLists);

  pInfo->validBlockIndex = 0;

  int32_t idx = 0;
  while (idx < numOfBlocks) {
    SSDataBlock* pBuffered = taosArrayGetP(pInfo->pBlockLists, idx);
    blockDataDestroy(pBuffered);
    idx += 1;
  }

  taosArrayClear(pInfo->pBlockLists);
}

5
54liuyao 已提交
648 649 650 651
// A stream scan is treated as a session-window scan when a stream aggregate supporter is attached.
static bool isSessionWindow(SStreamBlockScanInfo* pInfo) {
  if (pInfo->sessionSup.pStreamAggSup == NULL) {
    return false;
  }
  return true;
}

5
54liuyao 已提交
652 653 654 655 656 657 658
/**
 * Prepare the inner table scan for the next window that has to be recomputed.
 *
 * Takes the timestamp at updateResIndex of the update result block, determines the
 * (session or interval) window it belongs to, advances updateResIndex past the rows
 * covered by that window and restarts the inner table scan on the window range.
 *
 * @param pInfo stream block scan state
 * @return true if a window was prepared, false if all rows of the update result block
 *         have been consumed
 */
static bool prepareDataScan(SStreamBlockScanInfo* pInfo) {
  SSDataBlock* pSDB = pInfo->pUpdateRes;
  if (pInfo->updateResIndex >= pSDB->info.rows) {
    return false;
  }

  SColumnInfoData* pColDataInfo = taosArrayGet(pSDB->pDataBlock, 0);
  TSKEY*           tsCols = (TSKEY*)pColDataInfo->pData;

  STimeWindow win = {0};
  if (isSessionWindow(pInfo)) {
    SStreamAggSupporter* pAggSup = pInfo->sessionSup.pStreamAggSup;
    int64_t              gap = pInfo->sessionSup.gap;
    int32_t              winIndex = 0;

    // NOTE(review): the returned window is used without a NULL check - confirm it can never be NULL
    SResultWindowInfo* pCurWin = getSessionTimeWindow(pAggSup->pResultRows,
         tsCols[pInfo->updateResIndex], gap, &winIndex);
    win = pCurWin->win;
    pInfo->updateResIndex += updateSessionWindowInfo(pCurWin, tsCols, pSDB->info.rows,
        pInfo->updateResIndex, gap, NULL);
  } else {
    // zero the whole structure so that the callee never reads indeterminate members;
    // previously only cur.pageId was initialized
    SResultRowInfo dumyInfo = {0};
    dumyInfo.cur.pageId = -1;

    win = getActiveTimeWindow(NULL, &dumyInfo, tsCols[pInfo->updateResIndex],
        &pInfo->interval, pInfo->interval.precision, NULL);
    pInfo->updateResIndex += getNumOfRowsInTimeWindow(&pSDB->info, tsCols, pInfo->updateResIndex,
      win.ekey, binarySearchForKey, NULL, TSDB_ORDER_ASC);
  }

  // restart the inner table scan on the selected window
  STableScanInfo* pTableScanInfo = pInfo->pOperatorDumy->info;
  pTableScanInfo->cond.twindow = win;
  tsdbResetReadHandle(pTableScanInfo->dataReader, &pTableScanInfo->cond);
  pTableScanInfo->scanTimes = 0;
  return true;
}

/**
 * Fetch the next block from the inner table scan. When the current window is exhausted
 * the scan is prepared for the next window and tried once more.
 *
 * @return the next block, or NULL when no block is available
 */
static SSDataBlock* doDataScan(SStreamBlockScanInfo* pInfo) {
  SSDataBlock* pBlock = doTableScan(pInfo->pOperatorDumy);
  if (pBlock != NULL) {
    return pBlock;
  }

  if (!prepareDataScan(pInfo)) {
    return NULL;
  }

  // scan next window data
  return doTableScan(pInfo->pOperatorDumy);
}

/**
 * Collect the timestamps of the current result block that are reported as updates and,
 * if requested, return them as a STREAM_REPROCESS block.
 *
 * Every timestamp for which updateInfoIsUpdated() returns true is appended to
 * pInfo->tsArray. When the array is not empty and `invertible` is true, a new block is
 * created whose column 0 holds the collected timestamps, and tsArray is cleared.
 * NOTE(review): tsArray is not cleared when `invertible` is false - confirm this is intended.
 *
 * @param pInfo      stream block scan state; pRes is the block that is inspected
 * @param invertible whether a reprocess block should be produced
 * @return the new block (to be released by the caller), or NULL
 */
static SSDataBlock* getUpdateDataBlock(SStreamBlockScanInfo* pInfo, bool invertible) {
  SSDataBlock*     pSrc = pInfo->pRes;
  SColumnInfoData* pTsCol = taosArrayGet(pSrc->pDataBlock, pInfo->primaryTsIndex);
  TSKEY*           tsList = (TSKEY*)pTsCol->pData;

  for (int32_t row = 0; row < pSrc->info.rows; row++) {
    if (!updateInfoIsUpdated(pInfo->pUpdateInfo, pSrc->info.uid, tsList[row])) {
      continue;
    }
    taosArrayPush(pInfo->tsArray, &tsList[row]);
  }

  int32_t numOfUpdated = taosArrayGetSize(pInfo->tsArray);
  if (numOfUpdated <= 0 || !invertible) {
    return NULL;
  }

  // TODO(liuyao) get from tsdb
  //  SSDataBlock* p = createOneDataBlock(pInfo->pRes, true);
  //  p->info.type = STREAM_INVERT;
  //  taosArrayClear(pInfo->tsArray);
  //  return p;
  SSDataBlock*     pReprocess = createOneDataBlock(pInfo->pRes, false);
  SColumnInfoData* pDstCol = (SColumnInfoData*)taosArrayGet(pReprocess->pDataBlock, 0);
  ASSERT(pDstCol->info.type == TSDB_DATA_TYPE_TIMESTAMP);
  colInfoDataEnsureCapacity(pDstCol, 0, numOfUpdated);

  int32_t k = 0;
  while (k < numOfUpdated) {
    TSKEY* pKey = (TSKEY*)taosArrayGet(pInfo->tsArray, k);
    colDataAppend(pDstCol, k, (char*)pKey, false);
    k += 1;
  }

  pReprocess->info.type = STREAM_REPROCESS;
  pReprocess->info.rows = numOfUpdated;
  blockDataUpdateTsWindow(pReprocess, 0);

  taosArrayClear(pInfo->tsArray);
  return pReprocess;
}

5
54liuyao 已提交
729 730 731 732 733 734 735 736 737 738 739 740 741 742 743 744 745 746 747 748 749 750 751 752 753 754 755 756 757 758 759 760 761 762 763 764 765 766 767 768 769 770 771 772 773 774 775 776 777 778 779 780 781 782 783 784 785 786 787 788 789 790 791 792 793 794 795 796 797 798 799 800 801 802 803 804 805 806 807 808
// Write the lookup key (groupId, childId, ts) as three consecutive int64 values into pSup->pKeyBuf.
static void setSupKeyBuf(SCatchSupporter* pSup, int64_t groupId, int64_t childId, TSKEY ts) {
  int64_t* slots = (int64_t*)pSup->pKeyBuf;

  *slots = groupId;
  *(slots + 1) = childId;
  *(slots + 2) = ts;
}

/**
 * Record, for every row of a block, in which buffer page and at which row index it is stored.
 *
 * The hash key is (groupId of the block, childId, timestamp of the row). An existing
 * entry is overwritten with the new position, otherwise a new entry is inserted.
 *
 * @param pDataBlock block whose rows are registered
 * @param pSup       supporter owning the key buffer and the window hash table
 * @param pageId     id of the buffer page that holds the block
 * @param tsIndex    index of the timestamp column inside the block
 * @param childId    id of the child the block came from
 * @return TSDB_CODE_SUCCESS, or the error code of a failed hash insertion
 */
static int32_t catchWidonwInfo(SSDataBlock* pDataBlock, SCatchSupporter* pSup,
    int32_t pageId, int32_t tsIndex, int64_t childId) {
  SColumnInfoData* pTsCol = taosArrayGet(pDataBlock->pDataBlock, tsIndex);
  TSKEY*           tsList = (int64_t*)pTsCol->pData;

  for (int32_t row = 0; row < pDataBlock->info.rows; row++) {
    setSupKeyBuf(pSup, pDataBlock->info.groupId, childId, tsList[row]);

    SWindowPosition* pExisting = (SWindowPosition*)taosHashGet(pSup->pWindowHashTable,
        pSup->pKeyBuf, pSup->keySize);
    if (pExisting != NULL) {
      // same key seen before: point it at the latest copy
      pExisting->pageId = pageId;
      pExisting->rowId = row;
      continue;
    }

    SWindowPosition newPos = {.pageId = pageId, .rowId = row};
    int32_t         rc = taosHashPut(pSup->pWindowHashTable, pSup->pKeyBuf, pSup->keySize, &newPos,
        sizeof(SWindowPosition));
    if (rc != TSDB_CODE_SUCCESS) {
      return rc;
    }
  }

  return TSDB_CODE_SUCCESS;
}

// Cache the content of pDataBlock in the paged buffer of pSup.
// The block is cut into pieces that fit into one buffer page each
// (blockDataSplitRows decides where each piece ends); every piece is written
// to a freshly allocated page and the position of each of its rows is
// registered through catchWidonwInfo().
// Returns TSDB_CODE_SUCCESS, terrno when a sub-block or page cannot be
// obtained, or the error code reported by catchWidonwInfo().
static int32_t catchDatablock(SSDataBlock* pDataBlock, SCatchSupporter* pSup,
    int32_t tsIndex, int64_t childId) {
  int32_t start = 0;
  int32_t stop = 0;
  int32_t pageSize = getBufPageSize(pSup->pDataBuf);
  while (start < pDataBlock->info.rows) {
    blockDataSplitRows(pDataBlock, pDataBlock->info.hasVarCol, start, &stop, pageSize);
    SSDataBlock* pDB = blockDataExtractBlock(pDataBlock, start, stop - start + 1);
    if (pDB == NULL) {
      return terrno;
    }

    int32_t pageId = -1;
    void*   pPage = getNewBufPage(pSup->pDataBuf, pDataBlock->info.groupId, &pageId);
    if (pPage == NULL) {
      blockDataDestroy(pDB);
      return terrno;
    }

    int32_t size = blockDataGetSize(pDB) + sizeof(int32_t) + pDB->info.numOfCols * sizeof(int32_t);
    assert(size <= pageSize);
    blockDataToBuf(pPage, pDB);
    setBufPageDirty(pPage, true);
    releaseBufPage(pSup->pDataBuf, pPage);

    // Index the rows while pDB is still alive; it used to be destroyed before
    // this call, which made catchWidonwInfo() read freed memory.
    int32_t code = catchWidonwInfo(pDB, pSup, pageId, tsIndex, childId);
    blockDataDestroy(pDB);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    start = stop + 1;
  }
  return TSDB_CODE_SUCCESS;
}

// Rebuild pInfo->pRes from the cached pages for the timestamp at
// pInfo->updateResIndex of pInfo->pUpdateRes: for every child id the cached
// row position is looked up and that single row is merged into pInfo->pRes.
// Child ids that have no cached position (or whose page/row cannot be loaded)
// are skipped.
// Returns pInfo->pRes and advances updateResIndex, or NULL when there is no
// update block or all of its rows have been consumed.
static SSDataBlock* getDataFromCatch(SStreamBlockScanInfo* pInfo) {
  SSDataBlock* pBlock = pInfo->pUpdateRes;
  if (pBlock == NULL || pInfo->updateResIndex >= pBlock->info.rows) {
    return NULL;
  }

  blockDataCleanup(pInfo->pRes);
  SCatchSupporter* pCSup = &pInfo->childAggSup;
  SColumnInfoData* pColDataInfo = taosArrayGet(pBlock->pDataBlock, 0);
  TSKEY*           tsCols = (TSKEY*)pColDataInfo->pData;
  int32_t          size = taosArrayGetSize(pInfo->childIds);
  for (int32_t i = 0; i < size; i++) {
    int64_t id = *(int64_t*)taosArrayGet(pInfo->childIds, i);
    setSupKeyBuf(pCSup, pBlock->info.groupId, id, tsCols[pInfo->updateResIndex]);

    SWindowPosition* pos =
        (SWindowPosition*)taosHashGet(pCSup->pWindowHashTable, pCSup->pKeyBuf, pCSup->keySize);
    if (pos == NULL) {
      // nothing cached for this (group, child, ts) combination
      continue;
    }

    void* buf = getBufPage(pCSup->pDataBuf, pos->pageId);
    if (buf == NULL) {
      continue;
    }

    SSDataBlock* pDB = createOneDataBlock(pInfo->pRes, false);
    if (pDB != NULL) {
      blockDataFromBuf(pDB, buf);
      SSDataBlock* pSub = blockDataExtractBlock(pDB, pos->rowId, 1);
      if (pSub != NULL) {
        blockDataMerge(pInfo->pRes, pSub);
        blockDataDestroy(pSub);
      }
      blockDataDestroy(pDB);
    }

    // hand the page back to the buffer, mirroring what catchDatablock() does
    // after writing a page
    releaseBufPage(pCSup->pDataBuf, buf);
  }

  pInfo->updateResIndex++;
  return pInfo->pRes;
}

819
// Produce the next data block of a stream scan.
// Two sources are supported, selected by pInfo->blockType:
//  * STREAM_DATA_TYPE_SSDATA_BLOCK: blocks were queued in pInfo->pBlockLists;
//    they are returned one by one. Normal blocks are also cached through
//    catchDatablock(); a STREAM_REPROCESS block switches the operator to
//    replaying cached rows via getDataFromCatch().
//  * otherwise: blocks are pulled from pInfo->streamBlockReader, projected to
//    the output columns, filtered and returned; pInfo->scanMode drives the
//    extra steps taken when getUpdateDataBlock() reports updated data.
// Returns NULL when no more data is available or on error (pTaskInfo->code is
// set in the latter case).
static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) {
  // NOTE: this operator does never check if current status is done or not
  SExecTaskInfo*        pTaskInfo = pOperator->pTaskInfo;
  SStreamBlockScanInfo* pInfo = pOperator->info;
  int32_t               rows = 0;

  pTaskInfo->code = pOperator->fpSet._openFn(pOperator);
  if (pTaskInfo->code != TSDB_CODE_SUCCESS || pOperator->status == OP_EXEC_DONE) {
    return NULL;
  }

  size_t total = taosArrayGetSize(pInfo->pBlockLists);
  if (pInfo->blockType == STREAM_DATA_TYPE_SSDATA_BLOCK) {
    if (pInfo->scanMode == STREAM_SCAN_FROM_UPDATERES) {
      SSDataBlock* pDB = getDataFromCatch(pInfo);
      if (pDB != NULL) {
        return pDB;
      } else {
        pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
      }
    }

    if (pInfo->validBlockIndex >= total) {
      doClearBufferedBlocks(pInfo);
      pOperator->status = OP_EXEC_DONE;
      return NULL;
    }

    int32_t      current = pInfo->validBlockIndex++;
    SSDataBlock* pBlock = taosArrayGetP(pInfo->pBlockLists, current);
    if (pBlock->info.type == STREAM_REPROCESS) {
      pInfo->scanMode = STREAM_SCAN_FROM_UPDATERES;
    } else {
      int32_t code = catchDatablock(pBlock, &pInfo->childAggSup, pInfo->primaryTsIndex, 0);
      // use the same success constant as the rest of this file
      if (code != TSDB_CODE_SUCCESS) {
        pTaskInfo->code = code;
        longjmp(pTaskInfo->env, code);
      }
    }
    return pBlock;
  } else {
    if (pInfo->scanMode == STREAM_SCAN_FROM_RES) {
      blockDataDestroy(pInfo->pUpdateRes);
      pInfo->pUpdateRes = NULL;  // do not keep a pointer to the destroyed block
      pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
      return pInfo->pRes;
    } else if (pInfo->scanMode == STREAM_SCAN_FROM_UPDATERES) {
      blockDataCleanup(pInfo->pRes);
      pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER;
      prepareDataScan(pInfo);
      return pInfo->pUpdateRes;
    } else if (pInfo->scanMode == STREAM_SCAN_FROM_DATAREADER) {
      SSDataBlock* pSDB = doDataScan(pInfo);
      if (pSDB == NULL) {
        pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
      } else {
        return pSDB;
      }
    }

    SDataBlockInfo* pBlockInfo = &pInfo->pRes->info;
    blockDataCleanup(pInfo->pRes);

    while (tqNextDataBlock(pInfo->streamBlockReader)) {
      SArray*  pCols = NULL;
      uint64_t groupId = 0;
      uint64_t uid = 0;
      int32_t  numOfRows = 0;
      int16_t  outputCol = 0;

      int32_t code = tqRetrieveDataBlock(&pCols, pInfo->streamBlockReader, &groupId, &uid, &numOfRows, &outputCol);

      if (code != TSDB_CODE_SUCCESS || numOfRows == 0) {
        pTaskInfo->code = code;
        return NULL;
      }

      pInfo->pRes->info.groupId = groupId;
      pInfo->pRes->info.rows = numOfRows;
      pInfo->pRes->info.uid = uid;
      pInfo->pRes->info.type = STREAM_NORMAL;

      int32_t numOfCols = pInfo->pRes->info.numOfCols;
      for (int32_t i = 0; i < numOfCols; ++i) {
        SColMatchInfo* pColMatchInfo = taosArrayGet(pInfo->pColMatchInfo, i);
        if (!pColMatchInfo->output) {
          continue;
        }

        bool colExists = false;
        for (int32_t j = 0; j < taosArrayGetSize(pCols); ++j) {
          SColumnInfoData* pResCol = taosArrayGet(pCols, j);
          if (pResCol->info.colId == pColMatchInfo->colId) {
            taosArraySet(pInfo->pRes->pDataBlock, pColMatchInfo->targetSlotId, pResCol);
            colExists = true;
            break;
          }
        }

        // the required column does not exists in submit block, let's set it to be all null value
        if (!colExists) {
          SColumnInfoData* pDst = taosArrayGet(pInfo->pRes->pDataBlock, pColMatchInfo->targetSlotId);
          colInfoDataEnsureCapacity(pDst, 0, pBlockInfo->rows);
          colDataAppendNNULL(pDst, 0, pBlockInfo->rows);
        }
      }

      if (pInfo->pRes->pDataBlock == NULL) {
        // TODO add log
        pOperator->status = OP_EXEC_DONE;
        pTaskInfo->code = terrno;
        return NULL;
      }

      // remember the unfiltered row count: it decides below whether the
      // operator is finished
      rows = pBlockInfo->rows;
      doFilter(pInfo->pCondition, pInfo->pRes, NULL);
      blockDataUpdateTsWindow(pInfo->pRes, 0);

      // only one block is handled per invocation
      break;
    }

    // record the scan action.
    pInfo->numOfExec++;
    pInfo->numOfRows += pBlockInfo->rows;

    if (rows == 0) {
      pOperator->status = OP_EXEC_DONE;
    } else if (pInfo->pUpdateInfo) {
      SSDataBlock* upRes = getUpdateDataBlock(pInfo, true);  // TODO(liuyao) get invertible from plan
      if (upRes) {
        pInfo->pUpdateRes = upRes;
        if (upRes->info.type == STREAM_REPROCESS) {
          pInfo->updateResIndex = 0;
          pInfo->scanMode = STREAM_SCAN_FROM_UPDATERES;
        } else if (upRes->info.type == STREAM_INVERT) {
          pInfo->scanMode = STREAM_SCAN_FROM_RES;
          return upRes;
        }
      }
    }

    return (rows == 0) ? NULL : pInfo->pRes;
  }
}

962 963 964 965
// Create the stream block scan operator.
// The column ids found in pColList are handed to the stream read handle, the
// table uid list is installed on it, and the operator state
// (SStreamBlockScanInfo) is initialised from the table scan info carried by
// pOperatorDumy.
// Returns the new operator, or NULL on failure. On failure everything that
// was allocated for the operator state here is released again.
SOperatorInfo* createStreamScanOperatorInfo(void* streamReadHandle, void* pDataReader, SReadHandle* pHandle,
                                            uint64_t uid, SSDataBlock* pResBlock, SArray* pColList,
                                            SArray* pTableIdList, SExecTaskInfo* pTaskInfo, SNode* pCondition,
                                            SOperatorInfo* pOperatorDumy) {
  SStreamBlockScanInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamBlockScanInfo));
  SOperatorInfo*        pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
  if (pInfo == NULL || pOperator == NULL) {
    terrno = TSDB_CODE_QRY_OUT_OF_MEMORY;
    goto _error;
  }

  STableScanInfo* pSTInfo = (STableScanInfo*)pOperatorDumy->info;

  int32_t numOfOutput = taosArrayGetSize(pColList);

  SArray* pColIds = taosArrayInit(4, sizeof(int16_t));
  if (pColIds == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto _error;
  }

  for (int32_t i = 0; i < numOfOutput; ++i) {
    SColMatchInfo* id = taosArrayGet(pColList, i);
    int16_t        colId = id->colId;
    taosArrayPush(pColIds, &colId);
  }

  pInfo->pColMatchInfo = pColList;

  // set the extract column id to streamHandle
  // NOTE(review): pColIds is not released here on any path; presumably the
  // read handle keeps the array -- confirm against tqReadHandleSetColIdList.
  tqReadHandleSetColIdList((STqReadHandle*)streamReadHandle, pColIds);
  int32_t code = tqReadHandleSetTbUidList(streamReadHandle, pTableIdList);
  if (code != 0) {
    goto _error;
  }

  pInfo->pBlockLists = taosArrayInit(4, POINTER_BYTES);
  if (pInfo->pBlockLists == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto _error;
  }

  pInfo->tsArray = taosArrayInit(4, sizeof(TSKEY));
  if (pInfo->tsArray == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto _error;
  }

  pInfo->primaryTsIndex = 0;  // TODO(liuyao) get it from physical plan
  if (pSTInfo->interval.interval > 0) {
    pInfo->pUpdateInfo = updateInfoInitP(&pSTInfo->interval, 10000);  // TODO(liuyao) get watermark from physical plan
  } else {
    pInfo->pUpdateInfo = NULL;
  }

  pInfo->readHandle        = *pHandle;
  pInfo->tableUid          = uid;
  pInfo->streamBlockReader = streamReadHandle;
  pInfo->pRes              = pResBlock;
  pInfo->pCondition        = pCondition;
  pInfo->pDataReader       = pDataReader;
  pInfo->scanMode          = STREAM_SCAN_FROM_READERHANDLE;
  pInfo->pOperatorDumy     = pOperatorDumy;
  pInfo->interval          = pSTInfo->interval;
  pInfo->sessionSup        = (SessionWindowSupporter){.pStreamAggSup = NULL, .gap = -1};

  // TODO(liuyao) get row size from phy plan
  initCatchSupporter(&pInfo->childAggSup, 1024, "StreamFinalInterval", "/tmp/");

  pOperator->name         = "StreamBlockScanOperator";
  pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN;
  pOperator->blocking     = false;
  pOperator->status       = OP_NOT_OPENED;
  pOperator->info         = pInfo;
  pOperator->numOfExprs   = pResBlock->info.numOfCols;
  pOperator->pTaskInfo    = pTaskInfo;

  pOperator->fpSet =
      createOperatorFpSet(operatorDummyOpenFn, doStreamBlockScan, NULL, NULL, operatorDummyCloseFn, NULL, NULL, NULL);

  return pOperator;

_error:
  if (pInfo != NULL) {
    // release the arrays owned by the half-built operator state
    taosArrayDestroy(pInfo->pBlockLists);
    taosArrayDestroy(pInfo->tsArray);
  }
  taosMemoryFreeClear(pInfo);
  taosMemoryFreeClear(pOperator);
  return NULL;
}

static void destroySysScanOperator(void* param, int32_t numOfOutput) {
  SSysTableScanInfo* pInfo = (SSysTableScanInfo*)param;
  tsem_destroy(&pInfo->ready);
  blockDataDestroy(pInfo->pRes);

1048
  const char* name = tNameGetTableName(&pInfo->name);
1049
  if (strncasecmp(name, TSDB_INS_TABLE_USER_TABLES, TSDB_TABLE_FNAME_LEN) == 0 || pInfo->pCur != NULL) {
H
Haojun Liao 已提交
1050
    metaCloseTbCursor(pInfo->pCur);
1051
    pInfo->pCur = NULL;
H
Haojun Liao 已提交
1052
  }
H
Haojun Liao 已提交
1053 1054

  taosArrayDestroy(pInfo->scanCols);
H
Haojun Liao 已提交
1055 1056 1057 1058 1059 1060 1061 1062 1063 1064 1065 1066 1067 1068 1069 1070 1071 1072 1073 1074 1075 1076 1077 1078 1079 1080 1081 1082 1083 1084 1085 1086 1087 1088 1089 1090 1091 1092 1093 1094
}

// Expression walker that extracts the database name from a condition of the
// form "<db name column> = <value>".
// pContext serves two purposes: while walking, its first int32_t holds a small
// state (0: nothing matched, 1: an equality operator was seen, 2: the db name
// column was seen under it); when the value node is reached the buffer is
// overwritten with the NUL-terminated database name and the walk stops.
// The copy is limited to TSDB_DB_NAME_LEN - 1 characters, matching the
// TSDB_DB_NAME_LEN sized buffer handed in by the caller in this file.
EDealRes getDBNameFromConditionWalker(SNode* pNode, void* pContext) {
  ENodeType nType = nodeType(pNode);

  switch (nType) {
    case QUERY_NODE_OPERATOR: {
      SOperatorNode* node = (SOperatorNode*)pNode;
      if (OP_TYPE_EQUAL == node->opType) {
        *(int32_t*)pContext = 1;
        return DEAL_RES_CONTINUE;
      }

      *(int32_t*)pContext = 0;
      return DEAL_RES_IGNORE_CHILD;
    }
    case QUERY_NODE_COLUMN: {
      if (1 != *(int32_t*)pContext) {
        return DEAL_RES_CONTINUE;
      }

      SColumnNode* node = (SColumnNode*)pNode;
      if (TSDB_INS_USER_STABLES_DBNAME_COLID == node->colId) {
        *(int32_t*)pContext = 2;
        return DEAL_RES_CONTINUE;
      }

      *(int32_t*)pContext = 0;
      return DEAL_RES_CONTINUE;
    }
    case QUERY_NODE_VALUE: {
      if (2 != *(int32_t*)pContext) {
        return DEAL_RES_CONTINUE;
      }

      SValueNode* node = (SValueNode*)pNode;
      char*       dbName = nodesGetValueFromNode(node);

      // the value comes from the query text: never copy more than the
      // destination buffer can hold
      int32_t len = varDataLen(dbName);
      if (len > TSDB_DB_NAME_LEN - 1) {
        len = TSDB_DB_NAME_LEN - 1;
      }

      strncpy(pContext, varDataVal(dbName), len);
      *((char*)pContext + len) = 0;
      return DEAL_RES_END;  // stop walk
    }
    default:
      break;
  }
  return DEAL_RES_CONTINUE;
}

1103
// Walk pCondition with getDBNameFromConditionWalker, using dbName as the
// walker context (the walker writes into that buffer, hence the cast).
// Nothing happens when there is no condition.
static void getDBNameFromCondition(SNode* pCondition, const char* dbName) {
  if (pCondition != NULL) {
    char* pWalkCtx = (char*)dbName;
    nodesWalkExpr(pCondition, getDBNameFromConditionWalker, pWalkCtx);
  }
}

1110
static int32_t loadSysTableCallback(void* param, const SDataBuf* pMsg, int32_t code) {
H
Haojun Liao 已提交
1111 1112 1113 1114 1115 1116 1117
  SOperatorInfo*     operator=(SOperatorInfo*) param;
  SSysTableScanInfo* pScanResInfo = (SSysTableScanInfo*)operator->info;
  if (TSDB_CODE_SUCCESS == code) {
    pScanResInfo->pRsp = pMsg->pData;

    SRetrieveMetaTableRsp* pRsp = pScanResInfo->pRsp;
    pRsp->numOfRows = htonl(pRsp->numOfRows);
1118 1119 1120
    pRsp->useconds = htobe64(pRsp->useconds);
    pRsp->handle = htobe64(pRsp->handle);
    pRsp->compLen = htonl(pRsp->compLen);
H
Haojun Liao 已提交
1121 1122 1123 1124 1125
  } else {
    operator->pTaskInfo->code = code;
  }

  tsem_post(&pScanResInfo->ready);
wmmhello's avatar
wmmhello 已提交
1126
  return TSDB_CODE_SUCCESS;
H
Haojun Liao 已提交
1127 1128 1129 1130 1131 1132 1133 1134
}

// Apply pInfo->pCondition to pInfo->pRes.
// Without a condition the current result block is returned unchanged.
// Otherwise a new block holding only the qualified rows is built and
// installed as pInfo->pRes.
// Returns pInfo->pRes, or NULL when it holds no rows. When the filter cannot
// be set up or the new block cannot be allocated, terrno is set, the result
// block is marked empty (so unfiltered rows are never handed out) and NULL is
// returned.
static SSDataBlock* doFilterResult(SSysTableScanInfo* pInfo) {
  if (pInfo->pCondition == NULL) {
    return pInfo->pRes->info.rows == 0 ? NULL : pInfo->pRes;
  }

  SFilterInfo* filter = NULL;

  int32_t code = filterInitFromNode(pInfo->pCondition, &filter, 0);
  if (code != TSDB_CODE_SUCCESS || filter == NULL) {
    if (filter != NULL) {
      filterFreeInfo(filter);
    }
    terrno = code;
    pInfo->pRes->info.rows = 0;
    return NULL;
  }

  SFilterColumnParam param1 = {.numOfCols = pInfo->pRes->info.numOfCols, .pDataBlock = pInfo->pRes->pDataBlock};
  code = filterSetDataFromSlotId(filter, &param1);
  if (code != TSDB_CODE_SUCCESS) {
    filterFreeInfo(filter);
    terrno = code;
    pInfo->pRes->info.rows = 0;
    return NULL;
  }

  int8_t* rowRes = NULL;
  bool    keep = filterExecute(filter, pInfo->pRes, &rowRes, NULL, param1.numOfCols);
  filterFreeInfo(filter);

  SSDataBlock* px = createOneDataBlock(pInfo->pRes, false);
  if (px == NULL) {
    taosMemoryFreeClear(rowRes);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    pInfo->pRes->info.rows = 0;
    return NULL;
  }
  blockDataEnsureCapacity(px, pInfo->pRes->info.rows);

  // TODO refactor
  int32_t numOfRow = 0;
  for (int32_t i = 0; i < pInfo->pRes->info.numOfCols; ++i) {
    SColumnInfoData* pDest = taosArrayGet(px->pDataBlock, i);
    SColumnInfoData* pSrc = taosArrayGet(pInfo->pRes->pDataBlock, i);

    if (keep) {
      // every row qualifies: copy the column as a whole
      colDataAssign(pDest, pSrc, pInfo->pRes->info.rows);
      numOfRow = pInfo->pRes->info.rows;
    } else if (NULL != rowRes) {
      // copy only the rows flagged by the filter
      numOfRow = 0;
      for (int32_t j = 0; j < pInfo->pRes->info.rows; ++j) {
        if (rowRes[j] == 0) {
          continue;
        }

        if (colDataIsNull_s(pSrc, j)) {
          colDataAppendNULL(pDest, numOfRow);
        } else {
          colDataAppend(pDest, numOfRow, colDataGetData(pSrc, j), false);
        }

        numOfRow += 1;
      }
    } else {
      numOfRow = 0;
    }
  }

  // the per-row result of filterExecute() was previously never released
  taosMemoryFreeClear(rowRes);

  px->info.rows = numOfRow;
  // NOTE(review): the block previously referenced by pInfo->pRes is not
  // released here -- confirm who owns it before adding a destroy call.
  pInfo->pRes = px;

  return pInfo->pRes->info.rows == 0 ? NULL : pInfo->pRes;
}

1183 1184
static SSDataBlock* buildSysTableMetaBlock() {
  SSDataBlock* pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock));
1185

L
Liu Jicong 已提交
1186 1187
  size_t               size = 0;
  const SSysTableMeta* pMeta = NULL;
1188 1189 1190
  getInfosDbMeta(&pMeta, &size);

  int32_t index = 0;
L
Liu Jicong 已提交
1191 1192
  for (int32_t i = 0; i < size; ++i) {
    if (strcmp(pMeta[i].name, TSDB_INS_TABLE_USER_TABLES) == 0) {
1193 1194 1195 1196
      index = i;
      break;
    }
  }
1197 1198 1199

  pBlock->pDataBlock = taosArrayInit(pBlock->info.numOfCols, sizeof(SColumnInfoData));

L
Liu Jicong 已提交
1200
  for (int32_t i = 0; i < pMeta[index].colNum; ++i) {
1201 1202 1203 1204 1205 1206 1207 1208 1209
    SColumnInfoData colInfoData = {0};
    colInfoData.info.colId = i + 1;
    colInfoData.info.type = pMeta[index].schema[i].type;
    colInfoData.info.bytes = pMeta[index].schema[i].bytes;
    taosArrayPush(pBlock->pDataBlock, &colInfoData);
  }

  pBlock->info.numOfCols = pMeta[index].colNum;
  pBlock->info.hasVarCol = true;
1210 1211 1212 1213

  return pBlock;
}

1214
// Produce the next result block of a system table scan.
//  * For the user tables system table:
//      - when running with an mnode handle, the tables of the information /
//        performance schema databases are listed once (buildSysDbTableInfo);
//      - otherwise the tables are read through a meta table cursor, at most
//        pOperator->resultInfo.capacity rows per call.
//  * For every other system table a retrieve request is sent to the mnode of
//    pInfo->epSet and the response is converted into pInfo->pRes; this is
//    repeated until a batch with qualified rows arrives or the mnode reports
//    completion.
// The scan condition is applied through doFilterResult() in all cases.
// Returns NULL when there is nothing (more) to return or on error
// (pTaskInfo->code is set in the latter case).
static SSDataBlock* doSysTableScan(SOperatorInfo* pOperator) {
  // build message and send to mnode to fetch the content of system tables.
  SExecTaskInfo*     pTaskInfo = pOperator->pTaskInfo;
  SSysTableScanInfo* pInfo = pOperator->info;

  // retrieve local table list info from vnode
  const char* name = tNameGetTableName(&pInfo->name);
  if (strncasecmp(name, TSDB_INS_TABLE_USER_TABLES, TSDB_TABLE_FNAME_LEN) == 0) {
    // the retrieve is executed on the mnode, so return tables that belongs to the information schema database.
    if (pInfo->readHandle.mnd != NULL) {
      if (pOperator->status == OP_EXEC_DONE) {
        return NULL;
      }

      buildSysDbTableInfo(pInfo, pOperator->resultInfo.capacity);

      doFilterResult(pInfo);
      pInfo->loadInfo.totalRows += pInfo->pRes->info.rows;

      pOperator->status = OP_EXEC_DONE;
      return (pInfo->pRes->info.rows == 0) ? NULL : pInfo->pRes;
    } else {
      if (pInfo->pCur == NULL) {
        pInfo->pCur = metaOpenTbCursor(pInfo->readHandle.meta);
      }

      blockDataCleanup(pInfo->pRes);

      int32_t numOfRows = 0;

      const char* db = NULL;
      int32_t     vgId = 0;
      vnodeGetInfo(pInfo->readHandle.vnode, &db, &vgId);

      SName sn = {0};
      char  dbname[TSDB_DB_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
      tNameFromString(&sn, db, T_NAME_ACCT | T_NAME_DB);

      tNameGetDbName(&sn, varDataVal(dbname));
      varDataSetLen(dbname, strlen(varDataVal(dbname)));

      SSDataBlock* p = buildSysTableMetaBlock();
      if (p == NULL) {
        pTaskInfo->code = TSDB_CODE_QRY_OUT_OF_MEMORY;
        return NULL;
      }
      blockDataEnsureCapacity(p, pOperator->resultInfo.capacity);

      char n[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
      while (metaTbCursorNext(pInfo->pCur) == 0) {
        STR_TO_VARSTR(n, pInfo->pCur->mr.me.name);

        // table name
        SColumnInfoData* pColInfoData = taosArrayGet(p->pDataBlock, 0);
        colDataAppend(pColInfoData, numOfRows, n, false);

        // database name
        pColInfoData = taosArrayGet(p->pDataBlock, 1);
        colDataAppend(pColInfoData, numOfRows, dbname, false);

        // vgId
        pColInfoData = taosArrayGet(p->pDataBlock, 6);
        colDataAppend(pColInfoData, numOfRows, (char*)&vgId, false);

        // table comment
        // todo: set the correct comment
        pColInfoData = taosArrayGet(p->pDataBlock, 8);
        colDataAppendNULL(pColInfoData, numOfRows);

        char    str[256] = {0};
        int32_t tableType = pInfo->pCur->mr.me.type;
        if (tableType == TSDB_CHILD_TABLE) {
          // create time
          int64_t ts = pInfo->pCur->mr.me.ctbEntry.ctime;
          pColInfoData = taosArrayGet(p->pDataBlock, 2);
          colDataAppend(pColInfoData, numOfRows, (char*)&ts, false);

          // the super table entry provides the column count and the name
          SMetaReader mr = {0};
          metaReaderInit(&mr, pInfo->readHandle.meta, 0);
          metaGetTableEntryByUid(&mr, pInfo->pCur->mr.me.ctbEntry.suid);

          // number of columns
          pColInfoData = taosArrayGet(p->pDataBlock, 3);
          colDataAppend(pColInfoData, numOfRows, (char*)&mr.me.stbEntry.schema.nCols, false);

          // super table name
          STR_TO_VARSTR(str, mr.me.name);
          pColInfoData = taosArrayGet(p->pDataBlock, 4);
          colDataAppend(pColInfoData, numOfRows, str, false);
          metaReaderClear(&mr);

          // uid
          pColInfoData = taosArrayGet(p->pDataBlock, 5);
          colDataAppend(pColInfoData, numOfRows, (char*)&pInfo->pCur->mr.me.uid, false);

          // ttl
          pColInfoData = taosArrayGet(p->pDataBlock, 7);
          colDataAppend(pColInfoData, numOfRows, (char*)&pInfo->pCur->mr.me.ctbEntry.ttlDays, false);

          STR_TO_VARSTR(str, "CHILD_TABLE");
        } else if (tableType == TSDB_NORMAL_TABLE) {
          // create time
          pColInfoData = taosArrayGet(p->pDataBlock, 2);
          colDataAppend(pColInfoData, numOfRows, (char*)&pInfo->pCur->mr.me.ntbEntry.ctime, false);

          // number of columns
          pColInfoData = taosArrayGet(p->pDataBlock, 3);
          colDataAppend(pColInfoData, numOfRows, (char*)&pInfo->pCur->mr.me.ntbEntry.schema.nCols, false);

          // super table name
          pColInfoData = taosArrayGet(p->pDataBlock, 4);
          colDataAppendNULL(pColInfoData, numOfRows);

          // uid
          pColInfoData = taosArrayGet(p->pDataBlock, 5);
          colDataAppend(pColInfoData, numOfRows, (char*)&pInfo->pCur->mr.me.uid, false);

          // ttl
          pColInfoData = taosArrayGet(p->pDataBlock, 7);
          colDataAppend(pColInfoData, numOfRows, (char*)&pInfo->pCur->mr.me.ntbEntry.ttlDays, false);

          STR_TO_VARSTR(str, "NORMAL_TABLE");
        }

        // table type
        pColInfoData = taosArrayGet(p->pDataBlock, 9);
        colDataAppend(pColInfoData, numOfRows, str, false);

        if (++numOfRows >= pOperator->resultInfo.capacity) {
          break;
        }
      }

      p->info.rows = numOfRows;
      pInfo->pRes->info.rows = numOfRows;

      // NOTE(review): p is not released after the relocation (the same open
      // item is marked as a todo in buildSysDbTableInfo).
      relocateColumnData(pInfo->pRes, pInfo->scanCols, p->pDataBlock);
      doFilterResult(pInfo);

      pInfo->loadInfo.totalRows += pInfo->pRes->info.rows;
      return (pInfo->pRes->info.rows == 0) ? NULL : pInfo->pRes;
    }
  } else {  // load the meta from mnode of the given epset
    if (pOperator->status == OP_EXEC_DONE) {
      return NULL;
    }

    while (1) {
      int64_t startTs = taosGetTimestampUs();
      strncpy(pInfo->req.tb, tNameGetTableName(&pInfo->name), tListLen(pInfo->req.tb));

      if (pInfo->showRewrite) {
        char dbName[TSDB_DB_NAME_LEN] = {0};
        getDBNameFromCondition(pInfo->pCondition, dbName);
        sprintf(pInfo->req.db, "%d.%s", pInfo->accountId, dbName);
      }

      int32_t contLen = tSerializeSRetrieveTableReq(NULL, 0, &pInfo->req);
      char*   buf1 = taosMemoryCalloc(1, contLen);
      if (NULL == buf1) {
        pTaskInfo->code = TSDB_CODE_QRY_OUT_OF_MEMORY;
        return NULL;
      }
      tSerializeSRetrieveTableReq(buf1, contLen, &pInfo->req);

      // send the fetch remote task result reques
      SMsgSendInfo* pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
      if (NULL == pMsgSendInfo) {
        qError("%s prepare message %d failed", GET_TASKID(pTaskInfo), (int32_t)sizeof(SMsgSendInfo));
        // the serialized request has not been handed over to anyone yet
        taosMemoryFreeClear(buf1);
        pTaskInfo->code = TSDB_CODE_QRY_OUT_OF_MEMORY;
        return NULL;
      }

      pMsgSendInfo->param = pOperator;
      pMsgSendInfo->msgInfo.pData = buf1;
      pMsgSendInfo->msgInfo.len = contLen;
      pMsgSendInfo->msgType = TDMT_MND_SYSTABLE_RETRIEVE;
      pMsgSendInfo->fp = loadSysTableCallback;

      int64_t transporterId = 0;
      int32_t code =
          asyncSendMsgToServer(pInfo->readHandle.pMsgCb->clientRpc, &pInfo->epSet, &transporterId, pMsgSendInfo);
      // loadSysTableCallback() posts the semaphore once the response arrived
      tsem_wait(&pInfo->ready);

      if (pTaskInfo->code) {
        qDebug("%s load meta data from mnode failed, totalRows:%" PRIu64 ", code:%s", GET_TASKID(pTaskInfo),
               pInfo->loadInfo.totalRows, tstrerror(pTaskInfo->code));
        return NULL;
      }

      SRetrieveMetaTableRsp* pRsp = pInfo->pRsp;
      pInfo->req.showId = pRsp->handle;

      if (pRsp->numOfRows == 0 || pRsp->completed) {
        pOperator->status = OP_EXEC_DONE;
        qDebug("%s load meta data from mnode completed, rowsOfSource:%d, totalRows:%" PRIu64 " ", GET_TASKID(pTaskInfo),
               pRsp->numOfRows, pInfo->loadInfo.totalRows);

        if (pRsp->numOfRows == 0) {
          return NULL;
        }
      }

      SRetrieveMetaTableRsp* pTableRsp = pInfo->pRsp;
      setSDataBlockFromFetchRsp(pInfo->pRes, &pInfo->loadInfo, pTableRsp->numOfRows, pTableRsp->data,
                                pTableRsp->compLen, pOperator->numOfExprs, startTs, NULL, pInfo->scanCols);

      // todo log the filter info
      doFilterResult(pInfo);
      if (pInfo->pRes->info.rows > 0) {
        return pInfo->pRes;
      }
    }
  }
}

1421
int32_t buildSysDbTableInfo(const SSysTableScanInfo* pInfo, int32_t capacity) {
1422
  SSDataBlock* p = buildSysTableMetaBlock();
1423
  blockDataEnsureCapacity(p, capacity);
1424

L
Liu Jicong 已提交
1425
  size_t               size = 0;
1426 1427 1428 1429 1430 1431 1432 1433
  const SSysTableMeta* pSysDbTableMeta = NULL;

  getInfosDbMeta(&pSysDbTableMeta, &size);
  p->info.rows = buildDbTableInfoBlock(p, pSysDbTableMeta, size, TSDB_INFORMATION_SCHEMA_DB);

  getPerfDbMeta(&pSysDbTableMeta, &size);
  p->info.rows = buildDbTableInfoBlock(p, pSysDbTableMeta, size, TSDB_PERFORMANCE_SCHEMA_DB);

L
Liu Jicong 已提交
1434 1435
  relocateColumnData(pInfo->pRes, pInfo->scanCols, p->pDataBlock);
  //  blockDataDestroy(p);  todo handle memory leak
1436 1437 1438 1439 1440

  pInfo->pRes->info.rows = p->info.rows;
  return p->info.rows;
}

L
Liu Jicong 已提交
1441 1442 1443
int32_t buildDbTableInfoBlock(const SSDataBlock* p, const SSysTableMeta* pSysDbTableMeta, size_t size,
                              const char* dbName) {
  char    n[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
1444 1445
  int32_t numOfRows = p->info.rows;

L
Liu Jicong 已提交
1446
  for (int32_t i = 0; i < size; ++i) {
1447 1448 1449 1450 1451 1452 1453 1454 1455 1456 1457 1458 1459 1460 1461 1462 1463 1464 1465 1466
    const SSysTableMeta* pm = &pSysDbTableMeta[i];

    SColumnInfoData* pColInfoData = taosArrayGet(p->pDataBlock, 0);

    STR_TO_VARSTR(n, pm->name);
    colDataAppend(pColInfoData, numOfRows, n, false);

    // database name
    STR_TO_VARSTR(n, dbName);
    pColInfoData = taosArrayGet(p->pDataBlock, 1);
    colDataAppend(pColInfoData, numOfRows, n, false);

    // create time
    pColInfoData = taosArrayGet(p->pDataBlock, 2);
    colDataAppendNULL(pColInfoData, numOfRows);

    // number of columns
    pColInfoData = taosArrayGet(p->pDataBlock, 3);
    colDataAppend(pColInfoData, numOfRows, (char*)&pm->colNum, false);

L
Liu Jicong 已提交
1467
    for (int32_t j = 4; j <= 8; ++j) {
1468 1469 1470 1471 1472 1473 1474 1475 1476 1477 1478 1479 1480 1481 1482
      pColInfoData = taosArrayGet(p->pDataBlock, j);
      colDataAppendNULL(pColInfoData, numOfRows);
    }

    STR_TO_VARSTR(n, "SYSTEM_TABLE");

    pColInfoData = taosArrayGet(p->pDataBlock, 9);
    colDataAppend(pColInfoData, numOfRows, n, false);

    numOfRows += 1;
  }

  return numOfRows;
}

1483
SOperatorInfo* createSysTableScanOperatorInfo(void* readHandle, SSDataBlock* pResBlock, const SName* pName,
H
Haojun Liao 已提交
1484 1485 1486 1487 1488 1489 1490 1491 1492 1493 1494
                                              SNode* pCondition, SEpSet epset, SArray* colList,
                                              SExecTaskInfo* pTaskInfo, bool showRewrite, int32_t accountId) {
  SSysTableScanInfo* pInfo = taosMemoryCalloc(1, sizeof(SSysTableScanInfo));
  SOperatorInfo*     pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
  if (pInfo == NULL || pOperator == NULL) {
    taosMemoryFreeClear(pInfo);
    taosMemoryFreeClear(pOperator);
    terrno = TSDB_CODE_QRY_OUT_OF_MEMORY;
    return NULL;
  }

L
Liu Jicong 已提交
1495
  pInfo->accountId = accountId;
H
Haojun Liao 已提交
1496
  pInfo->showRewrite = showRewrite;
L
Liu Jicong 已提交
1497 1498 1499
  pInfo->pRes = pResBlock;
  pInfo->pCondition = pCondition;
  pInfo->scanCols = colList;
1500 1501

  initResultSizeInfo(pOperator, 4096);
H
Haojun Liao 已提交
1502 1503

  tNameAssign(&pInfo->name, pName);
1504 1505
  const char* name = tNameGetTableName(&pInfo->name);
  if (strncasecmp(name, TSDB_INS_TABLE_USER_TABLES, TSDB_TABLE_FNAME_LEN) == 0) {
L
Liu Jicong 已提交
1506
    pInfo->readHandle = *(SReadHandle*)readHandle;
1507
    blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity);
H
Haojun Liao 已提交
1508 1509 1510
  } else {
    tsem_init(&pInfo->ready, 0, 0);
    pInfo->epSet = epset;
1511
    pInfo->readHandle = *(SReadHandle*)readHandle;
H
Haojun Liao 已提交
1512 1513
  }

L
Liu Jicong 已提交
1514
  pOperator->name = "SysTableScanOperator";
H
Haojun Liao 已提交
1515
  pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN;
L
Liu Jicong 已提交
1516 1517 1518 1519 1520 1521
  pOperator->blocking = false;
  pOperator->status = OP_NOT_OPENED;
  pOperator->info = pInfo;
  pOperator->numOfExprs = pResBlock->info.numOfCols;
  pOperator->fpSet =
      createOperatorFpSet(operatorDummyOpenFn, doSysTableScan, NULL, NULL, destroySysScanOperator, NULL, NULL, NULL);
1522
  pOperator->pTaskInfo = pTaskInfo;
H
Haojun Liao 已提交
1523 1524 1525

  return pOperator;
}
H
Haojun Liao 已提交
1526

1527
/*
 * Produce the next block of results for a tag scan operator.
 *
 * For each table starting at pInfo->curPos, the table's meta entry is loaded and one row is
 * appended to pInfo->pRes: scan pseudo column expressions receive the table name, every other
 * expression receives the corresponding tag value (JSON tags are copied verbatim, prefixed with
 * a one-byte TSDB_DATA_TYPE_JSON marker).
 *
 * At most pOperator->resultInfo.capacity rows are produced per call. Once the last table has been
 * consumed the operator is marked completed and the task status is set to TASK_COMPLETED.
 *
 * Returns pInfo->pRes when at least one row was produced, NULL otherwise (operator already done,
 * no table groups, or nothing left to scan).
 *
 * Errors are reported by longjmp() to pTaskInfo->env; the meta reader is released first.
 */
static SSDataBlock* doTagScan(SOperatorInfo* pOperator) {
  if (pOperator->status == OP_EXEC_DONE) {
    return NULL;
  }

  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
  STagScanInfo*  pInfo = pOperator->info;
  SExprInfo*     pExprInfo = &pOperator->pExpr[0];
  SSDataBlock*   pRes = pInfo->pRes;

  if (taosArrayGetSize(pInfo->pTableGroups->pGroupList) == 0) {
    setTaskStatus(pTaskInfo, TASK_COMPLETED);
    return NULL;
  }

  // NOTE(review): only the first group is read while the loop below is bounded by the total
  // numOfTables; presumably the tag scan always receives a single group -- confirm with the caller.
  SArray* pa = taosArrayGetP(pInfo->pTableGroups->pGroupList, 0);

  char        str[512] = {0};
  int32_t     count = 0;
  SMetaReader mr = {0};
  metaReaderInit(&mr, pInfo->readHandle.meta, 0);

  while (pInfo->curPos < pInfo->pTableGroups->numOfTables && count < pOperator->resultInfo.capacity) {
    STableKeyInfo* item = taosArrayGet(pa, pInfo->curPos);

    // Do not read mr.me when the lookup failed: release the reader and abort the task.
    // NOTE(review): assumes a non-zero return means failure and that terrno has been set by the
    // meta layer -- confirm against metaGetTableEntryByUid.
    if (metaGetTableEntryByUid(&mr, item->uid) != 0) {
      qError("%s failed to get table meta, uid:0x%" PRIx64, GET_TASKID(pTaskInfo), (uint64_t)item->uid);
      metaReaderClear(&mr);
      longjmp(pTaskInfo->env, terrno);
    }

    for (int32_t j = 0; j < pOperator->numOfExprs; ++j) {
      SColumnInfoData* pDst = taosArrayGet(pRes->pDataBlock, pExprInfo[j].base.resSchema.slotId);

      // refactor later
      if (fmIsScanPseudoColumnFunc(pExprInfo[j].pExpr->_function.functionId)) {
        STR_TO_VARSTR(str, mr.me.name);
        colDataAppend(pDst, count, str, false);
      } else {  // it is a tag value
        if (pDst->info.type == TSDB_DATA_TYPE_JSON) {
          const uint8_t* tmp = mr.me.ctbEntry.pTags;
          // TODO opt perf by realloc memory
          char* data = taosMemoryCalloc(kvRowLen(tmp) + 1, 1);
          if (data == NULL) {
            qError("%s failed to malloc memory, size:%d", GET_TASKID(pTaskInfo), kvRowLen(tmp) + 1);
            // release the meta reader before leaving this frame, longjmp skips the normal cleanup
            metaReaderClear(&mr);
            longjmp(pTaskInfo->env, TSDB_CODE_OUT_OF_MEMORY);
          }

          // first byte carries the type marker, the raw tag row follows
          *data = TSDB_DATA_TYPE_JSON;
          memcpy(data + 1, tmp, kvRowLen(tmp));
          colDataAppend(pDst, count, data, false);
          taosMemoryFree(data);
        } else {
          const char* p = metaGetTableTagVal(&mr.me, pExprInfo[j].base.pParam[0].pCol->colId);
          // a missing tag value is stored as NULL
          colDataAppend(pDst, count, p, (p == NULL));
        }
      }
    }

    count += 1;
    if (++pInfo->curPos >= pInfo->pTableGroups->numOfTables) {
      doSetOperatorCompleted(pOperator);
    }
  }

  metaReaderClear(&mr);

  if (pOperator->status == OP_EXEC_DONE) {
    setTaskStatus(pTaskInfo, TASK_COMPLETED);
  }

  pRes->info.rows = count;
  pOperator->resultInfo.totalRows += count;

  return (pRes->info.rows == 0) ? NULL : pInfo->pRes;
}

/*
 * Destroy callback of the tag scan operator.
 *
 * param is the operator's STagScanInfo; its result block is handed to blockDataDestroy() and the
 * pRes field is overwritten with whatever that call returns. numOfOutput is not used.
 */
static void destroyTagScanOperatorInfo(void* param, int32_t numOfOutput) {
  STagScanInfo* pTagScanInfo = param;

  pTagScanInfo->pRes = blockDataDestroy(pTagScanInfo->pRes);
}

1681
SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, SExprInfo* pExpr, int32_t numOfOutput,
L
Liu Jicong 已提交
1682 1683
                                         SSDataBlock* pResBlock, SArray* pColMatchInfo,
                                         STableGroupInfo* pTableGroupInfo, SExecTaskInfo* pTaskInfo) {
1684
  STagScanInfo*  pInfo = taosMemoryCalloc(1, sizeof(STagScanInfo));
H
Haojun Liao 已提交
1685 1686 1687 1688 1689
  SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
  if (pInfo == NULL || pOperator == NULL) {
    goto _error;
  }

L
Liu Jicong 已提交
1690 1691 1692 1693 1694 1695
  pInfo->pTableGroups = pTableGroupInfo;
  pInfo->pColMatchInfo = pColMatchInfo;
  pInfo->pRes = pResBlock;
  pInfo->readHandle = *pReadHandle;
  pInfo->curPos = 0;
  pOperator->name = "TagScanOperator";
1696
  pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN;
L
Liu Jicong 已提交
1697 1698 1699 1700 1701 1702
  pOperator->blocking = false;
  pOperator->status = OP_NOT_OPENED;
  pOperator->info = pInfo;
  pOperator->pExpr = pExpr;
  pOperator->numOfExprs = numOfOutput;
  pOperator->pTaskInfo = pTaskInfo;
1703

1704 1705 1706
  initResultSizeInfo(pOperator, 4096);
  blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity);

1707 1708
  pOperator->fpSet =
      createOperatorFpSet(operatorDummyOpenFn, doTagScan, NULL, NULL, destroyTagScanOperatorInfo, NULL, NULL, NULL);
H
Haojun Liao 已提交
1709 1710

  return pOperator;
1711

1712
_error:
H
Haojun Liao 已提交
1713 1714 1715 1716 1717
  taosMemoryFree(pInfo);
  taosMemoryFree(pOperator);
  terrno = TSDB_CODE_OUT_OF_MEMORY;
  return NULL;
}