scanoperator.c 58.8 KB
Newer Older
H
Haojun Liao 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

16
#include "filter.h"
17
#include "function.h"
18
#include "functionMgt.h"
H
Haojun Liao 已提交
19 20
#include "os.h"
#include "querynodes.h"
21
#include "systable.h"
22
#include "tglobal.h"
H
Haojun Liao 已提交
23
#include "tname.h"
24
#include "ttime.h"
H
Haojun Liao 已提交
25 26 27 28 29 30 31 32 33

#include "tdatablock.h"
#include "tmsg.h"

#include "executorimpl.h"
#include "query.h"
#include "tcompare.h"
#include "thash.h"
#include "ttypes.h"
34
#include "vnode.h"
H
Haojun Liao 已提交
35

wmmhello's avatar
wmmhello 已提交
36 37
#include "executorInt.h"

H
Haojun Liao 已提交
38
// Marks a scan-info object as running its reverse-scan pass.
#define SET_REVERSE_SCAN_FLAG(_info) ((_info)->scanFlag = REVERSE_SCAN)
// Flips an order lvalue: ASC becomes DESC, anything else becomes ASC.
// The argument is evaluated more than once, so it must be free of side effects.
#define SWITCH_ORDER(n)              ((n) = (((n) == TSDB_ORDER_ASC) ? TSDB_ORDER_DESC : TSDB_ORDER_ASC))
40

5
54liuyao 已提交
41 42 43 44 45
// Location of a window expressed as a (page, row) pair.
// NOTE(review): presumably addresses a slot inside a paged result buffer — confirm against the users of this type.
typedef struct SWindowPosition {
  int32_t pageId;  // identifier of the page
  int32_t rowId;   // identifier of the row (presumably within pageId — TODO confirm)
} SWindowPosition;

46
static int32_t buildSysDbTableInfo(const SSysTableScanInfo* pInfo, int32_t capacity);
L
Liu Jicong 已提交
47 48
static int32_t buildDbTableInfoBlock(const SSDataBlock* p, const SSysTableMeta* pSysDbTableMeta, size_t size,
                                     const char* dbName);
49

50
/**
 * Flip the scan order stored in every function context.
 *
 * @param pCtx         array of function contexts
 * @param numOfOutput  number of entries in pCtx
 */
static void switchCtxOrder(SqlFunctionCtx* pCtx, int32_t numOfOutput) {
  int32_t idx = 0;
  while (idx < numOfOutput) {
    SWITCH_ORDER(pCtx[idx].order);
    idx += 1;
  }
}

/**
 * Placeholder: currently performs no work.
 *
 * The previous implementation (walking every table group and updating the per-table query info
 * for the reverse scan) was compiled out and has not been ported to the current operator model.
 */
static void setupQueryRangeForReverseScan(STableScanInfo* pTableScanInfo) {
  (void)pTableScanInfo;  // intentionally unused
}

79 80 81 82 83 84 85 86 87
/**
 * Advance a time window to the next one in the given scan direction.
 *
 * For fixed-length units the window start moves by `sliding` (signed by the direction factor) and
 * the end is `start + interval - 1`. For month ('n') and year ('y') units the start is converted to
 * broken-down local time, shifted by a whole number of months and converted back.
 *
 * @param pInterval  interval description (interval, sliding, unit, precision)
 * @param tw         in/out: window to advance
 * @param order      scan order used to derive the direction factor
 */
static void getNextTimeWindow(SInterval* pInterval, STimeWindow* tw, int32_t order) {
  const int32_t step = GET_FORWARD_DIRECTION_FACTOR(order);
  const bool    calendarUnit = (pInterval->intervalUnit == 'n' || pInterval->intervalUnit == 'y');

  if (!calendarUnit) {
    tw->skey += pInterval->sliding * step;
    tw->ekey = tw->skey + pInterval->interval - 1;
    return;
  }

  // express the interval in months; a year unit is 12 months
  int64_t months = pInterval->interval;
  if (pInterval->intervalUnit == 'y') {
    months *= 12;
  }

  // window start expressed in seconds
  int64_t seconds = convertTimePrecision(tw->skey, pInterval->precision, TSDB_TIME_PRECISION_MILLI) / 1000;

  struct tm tm;
  time_t    t = (time_t)seconds;
  taosLocalTime(&t, &tm);

  // new window start: shift by `months` in the scan direction
  int startMon = (int)(tm.tm_year * 12 + tm.tm_mon + months * step);
  tm.tm_year = startMon / 12;
  tm.tm_mon = startMon % 12;
  tw->skey = convertTimePrecision((int64_t)taosMktime(&tm) * 1000L, TSDB_TIME_PRECISION_MILLI, pInterval->precision);

  // new window end: one interval after the new start, minus one tick
  int endMon = (int)(startMon + months);
  tm.tm_year = endMon / 12;
  tm.tm_mon = endMon % 12;
  tw->ekey =
      convertTimePrecision((int64_t)taosMktime(&tm) * 1000L, TSDB_TIME_PRECISION_MILLI, pInterval->precision) - 1;
}

112
/**
 * Check whether a data block's time range crosses an interval-window boundary.
 *
 * @param pInterval   interval description; interval == 0 means the upstream operator is not an
 *                    interval operator and the function returns false
 * @param pBlockInfo  block info whose `window` is tested
 * @param order       scan order (TSDB_ORDER_ASC or descending)
 * @return true when the block spans more than one interval window, false otherwise
 */
static bool overlapWithTimeWindow(SInterval* pInterval, SDataBlockInfo* pBlockInfo, int32_t order) {
  // 0 by default, which means the upstream operator is not an interval operator
  if (pInterval->interval == 0) {
    return false;
  }

  const STimeWindow* pRange = &pBlockInfo->window;
  STimeWindow        win = {0};

  if (order != TSDB_ORDER_ASC) {
    // descending: start from the window that contains the block's end key
    getAlignQueryTimeWindow(pInterval, pInterval->precision, pRange->ekey, &win);
    assert(win.skey <= pRange->ekey);

    if (win.skey > pRange->skey) {
      return true;
    }

    for (;;) {
      getNextTimeWindow(pInterval, &win, order);
      if (win.ekey < pRange->skey) {
        return false;
      }

      assert(win.skey < pRange->skey);
      if (win.ekey < pRange->ekey && win.ekey >= pRange->skey) {
        return true;
      }
    }
  }

  // ascending: start from the window that contains the block's start key
  getAlignQueryTimeWindow(pInterval, pInterval->precision, pRange->skey, &win);
  assert(win.ekey >= pRange->skey);

  if (win.ekey < pRange->ekey) {
    return true;
  }

  for (;;) {
    getNextTimeWindow(pInterval, &win, order);
    if (win.skey > pRange->ekey) {
      return false;
    }

    assert(win.ekey > pRange->ekey);
    if (win.skey <= pRange->ekey && win.skey > pRange->skey) {
      return true;
    }
  }
}

163
static void addTagPseudoColumnData(SReadHandle *pHandle, SExprInfo* pPseudoExpr, int32_t numOfPseudoExpr, SSDataBlock* pBlock);
164

L
Liu Jicong 已提交
165 166
/**
 * Decide how much of the current data block is needed and load it accordingly.
 *
 * The required level starts from the operator's dataBlockLoadFlag and is raised to a full data load
 * when a filter is present or the block crosses an interval-window boundary. Depending on the level
 * the block is filtered out, skipped, served from block statistics (SMA), or fully loaded, relocated
 * into pBlock, extended with pseudo columns and filtered.
 *
 * @param pOperator       owning operator (provides the task info and the load flag)
 * @param pTableScanInfo  scan state: reader, cost recorder, filter, column match info
 * @param pBlock          in/out: result block; pBlock->info must already describe the current block
 * @param status          out: the FUNC_DATA_REQUIRED_* level that was finally applied
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_OUT_OF_MEMORY when the statistics buffer cannot be allocated,
 *         or terrno when the reader fails to deliver the block
 */
static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableScanInfo, SSDataBlock* pBlock,
                             uint32_t* status) {
  SExecTaskInfo*  pTaskInfo = pOperator->pTaskInfo;
  STableScanInfo* pInfo = pOperator->info;

  SFileBlockLoadRecorder* pCost = &pTableScanInfo->readRecorder;

  pCost->totalBlocks += 1;
  pCost->totalRows += pBlock->info.rows;

  *status = pInfo->dataBlockLoadFlag;
  if (pTableScanInfo->pFilterNode != NULL ||
      overlapWithTimeWindow(&pTableScanInfo->interval, &pBlock->info, pTableScanInfo->cond.order)) {
    (*status) = FUNC_DATA_REQUIRED_DATA_LOAD;
  }

  SDataBlockInfo* pBlockInfo = &pBlock->info;
  // drop the statistics pointers left over from the previous block
  taosMemoryFreeClear(pBlock->pBlockAgg);

  if (*status == FUNC_DATA_REQUIRED_FILTEROUT) {
    qDebug("%s data block filter out, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo),
           pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows);
    pCost->filterOutBlocks += 1;
    return TSDB_CODE_SUCCESS;
  } else if (*status == FUNC_DATA_REQUIRED_NOT_LOAD) {
    qDebug("%s data block skipped, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo),
           pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows);
    pCost->skipBlocks += 1;

    // clear all data in pBlock that are set when handling the previous block
    for (int32_t i = 0; i < pBlockInfo->numOfCols; ++i) {
      SColumnInfoData* pcol = taosArrayGet(pBlock->pDataBlock, i);
      pcol->pData = NULL;
    }

    return TSDB_CODE_SUCCESS;
  } else if (*status == FUNC_DATA_REQUIRED_STATIS_LOAD) {
    pCost->loadBlockStatis += 1;

    bool             allColumnsHaveAgg = true;
    SColumnDataAgg** pColAgg = NULL;
    tsdbRetrieveDataBlockStatisInfo(pTableScanInfo->dataReader, &pColAgg, &allColumnsHaveAgg);

    if (allColumnsHaveAgg == true) {
      int32_t numOfCols = pBlock->info.numOfCols;

      // todo create this buffer during creating operator
      if (pBlock->pBlockAgg == NULL) {
        pBlock->pBlockAgg = taosMemoryCalloc(numOfCols, POINTER_BYTES);
        if (pBlock->pBlockAgg == NULL) {
          // the buffer is indexed right below; report the failure instead of writing through NULL
          return TSDB_CODE_OUT_OF_MEMORY;
        }
      }

      for (int32_t i = 0; i < numOfCols; ++i) {
        SColMatchInfo* pColMatchInfo = taosArrayGet(pTableScanInfo->pColMatchInfo, i);
        if (!pColMatchInfo->output) {
          continue;
        }
        pBlock->pBlockAgg[pColMatchInfo->targetSlotId] = pColAgg[i];
      }

      return TSDB_CODE_SUCCESS;
    } else {  // failed to load the block sma data, data block statistics does not exist, load data block instead
      *status = FUNC_DATA_REQUIRED_DATA_LOAD;
    }
  }

  ASSERT(*status == FUNC_DATA_REQUIRED_DATA_LOAD);

  // todo filter data block according to the block sma data firstly
#if 0
  if (!doFilterByBlockStatistics(pBlock->pBlockStatis, pTableScanInfo->pCtx, pBlockInfo->rows)) {
    pCost->filterOutBlocks += 1;
    qDebug("%s data block filter out, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo), pBlockInfo->window.skey,
           pBlockInfo->window.ekey, pBlockInfo->rows);
    (*status) = FUNC_DATA_REQUIRED_FILTEROUT;
    return TSDB_CODE_SUCCESS;
  }
#endif

  pCost->totalCheckedRows += pBlock->info.rows;
  pCost->loadBlocks += 1;

  SArray* pCols = tsdbRetrieveDataBlock(pTableScanInfo->dataReader, NULL);
  if (pCols == NULL) {
    return terrno;
  }

  relocateColumnData(pBlock, pTableScanInfo->pColMatchInfo, pCols);

  // currently only the tbname pseudo column
  if (pTableScanInfo->numOfPseudoExpr > 0) {
    addTagPseudoColumnData(&pTableScanInfo->readHandle, pTableScanInfo->pPseudoExpr, pTableScanInfo->numOfPseudoExpr, pBlock);
  }

  // time the filter step and add it to the read recorder
  int64_t st = taosGetTimestampMs();
  doFilter(pTableScanInfo->pFilterNode, pBlock, pTableScanInfo->pColMatchInfo);

  int64_t et = taosGetTimestampMs();
  pTableScanInfo->readRecorder.filterTime += (et - st);

  if (pBlock->info.rows == 0) {
    pCost->filterOutBlocks += 1;
    qDebug("%s data block filter out, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo),
           pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows);
  }

  return TSDB_CODE_SUCCESS;
}

273
/**
 * Switch the scan state to a descending pass.
 *
 * Sets the reverse-scan flag, flips the order of every function context, sets the query condition
 * order to descending, swaps the start/end key of each query time window and re-sorts the windows
 * with compareTimeWindow.
 *
 * @param pTableScanInfo  scan state to update
 * @param pCtx            function contexts whose order is flipped
 * @param numOfOutput     number of entries in pCtx
 */
static void prepareForDescendingScan(STableScanInfo* pTableScanInfo, SqlFunctionCtx* pCtx, int32_t numOfOutput) {
  SQueryTableDataCond* pCond = &pTableScanInfo->cond;

  SET_REVERSE_SCAN_FLAG(pTableScanInfo);
  switchCtxOrder(pCtx, numOfOutput);

  pCond->order = TSDB_ORDER_DESC;

  int32_t idx = 0;
  while (idx < pCond->numOfTWindows) {
    STimeWindow* pWin = &pCond->twindows[idx];
    TSWAP(pWin->skey, pWin->ekey);
    idx += 1;
  }

  taosqsort(pCond->twindows, pCond->numOfTWindows, sizeof(STimeWindow), pCond, compareTimeWindow);
}

292
/**
 * Fill the pseudo columns (scan pseudo-column functions such as tbname, and tag values) of a block.
 *
 * Each pseudo expression's destination column is filled with the same value for all rows of the
 * block; the value is resolved from the meta entry of the table identified by pBlock->info.uid.
 *
 * @param pHandle          read handle providing the meta object
 * @param pPseudoExpr      array of pseudo-column expressions
 * @param numOfPseudoExpr  number of entries in pPseudoExpr; 0 makes the call a no-op
 * @param pBlock           block whose destination columns are filled
 */
void addTagPseudoColumnData(SReadHandle *pHandle, SExprInfo* pPseudoExpr, int32_t numOfPseudoExpr, SSDataBlock* pBlock) {
  // currently only the tbname pseudo column
  if (numOfPseudoExpr == 0) {
    return;
  }

  SMetaReader reader = {0};
  metaReaderInit(&reader, pHandle->meta, 0);
  // NOTE(review): the result of this lookup is not checked — confirm what reader.me holds on failure
  metaGetTableEntryByUid(&reader, pBlock->info.uid);

  for (int32_t exprIdx = 0; exprIdx < numOfPseudoExpr; ++exprIdx) {
    SExprInfo* pExpr = &pPseudoExpr[exprIdx];
    int32_t    dstSlotId = pExpr->base.resSchema.slotId;

    SColumnInfoData* pDst = taosArrayGet(pBlock->pDataBlock, dstSlotId);
    colInfoDataEnsureCapacity(pDst, 0, pBlock->info.rows);
    colInfoDataCleanup(pDst, pBlock->info.rows);

    int32_t functionId = pExpr->pExpr->_function.functionId;

    // scan pseudo-column functions (tbname) are resolved through the scalar function
    if (fmIsScanPseudoColumnFunc(functionId)) {
      setTbNameColData(pHandle->meta, pBlock, pDst, functionId);
      continue;
    }

    // everything else is a tag column
    STagVal tagVal = {0};
    tagVal.cid = pExpr->base.pParam[0].pCol->colId;
    const char* pTag = metaGetTableTagVal(&reader.me, pDst->info.type, &tagVal);

    // non-JSON tags are converted to column data; JSON tags (and missing tags) are used as-is
    const bool converted = (pDst->info.type != TSDB_DATA_TYPE_JSON) && (pTag != NULL);
    char*      pData = converted ? tTagValToData((const STagVal*)pTag, false) : (char*)pTag;

    for (int32_t row = 0; row < pBlock->info.rows; ++row) {
      colDataAppend(pDst, row, pData, (pData == NULL));
    }

    // only the converted value of a variable-length tag is released here
    if (pData != NULL && converted && IS_VAR_DATA_TYPE(((const STagVal*)pTag)->type)) {
      taosMemoryFree(pData);
    }
  }

  metaReaderClear(&reader);
}

342 343 344 345 346 347 348 349 350
/**
 * Fill a column with the result of a scan pseudo-column function (e.g. tbname) for the block's table.
 *
 * A temporary one-row BIGINT column holding pBlock->info.uid is built and passed, together with the
 * meta object, as the single input parameter of the scalar function identified by functionId. The
 * function writes its result into pColInfoData.
 *
 * @param pMeta         meta object handed to the scalar function through SScalarParam.param
 * @param pBlock        block providing the table uid and the number of rows
 * @param pColInfoData  destination column
 * @param functionId    id of the scalar function to execute
 */
void setTbNameColData(void* pMeta, const SSDataBlock* pBlock, SColumnInfoData* pColInfoData, int32_t functionId) {
  struct SScalarFuncExecFuncs fpSet = {0};
  fmGetScalarFuncExecFuncs(functionId, &fpSet);

  SColumnInfoData infoData = {0};
  infoData.info.type = TSDB_DATA_TYPE_BIGINT;
  infoData.info.bytes = sizeof(uint64_t);
  colInfoDataEnsureCapacity(&infoData, 0, 1);

  colDataAppendInt64(&infoData, 0, (int64_t*)&pBlock->info.uid);
  SScalarParam srcParam = {.numOfRows = pBlock->info.rows, .param = pMeta, .columnData = &infoData};

  SScalarParam param = {.columnData = pColInfoData};
  fpSet.process(&srcParam, 1, &param);

  // release the buffers allocated for the temporary uid column; they were leaked on every call before
  colDataDestroy(&infoData);
}

358
/**
 * Fetch the next non-empty data block from the reader.
 *
 * For every block delivered by the reader the block is loaded according to the required level, its
 * group id is resolved from the group key columns (cached in pGroupSet), and blocks that were
 * filtered out or ended up with zero rows are skipped.
 *
 * Errors and task cancellation are reported by longjmp to the task's env.
 *
 * @param pOperator  table scan operator
 * @return the operator's result block, or NULL when the reader has no more blocks
 */
static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) {
  STableScanInfo* pTableScanInfo = pOperator->info;
  SSDataBlock*    pBlock = pTableScanInfo->pResBlock;

  int64_t st = taosGetTimestampUs();

  while (tsdbNextDataBlock(pTableScanInfo->dataReader)) {
    if (isTaskKilled(pOperator->pTaskInfo)) {
      longjmp(pOperator->pTaskInfo->env, TSDB_CODE_TSC_QUERY_CANCELLED);
    }

    tsdbRetrieveDataBlockInfo(pTableScanInfo->dataReader, &pBlock->info);

    uint32_t status = 0;
    int32_t  code = loadDataBlock(pOperator, pTableScanInfo, pBlock, &status);
    if (code != TSDB_CODE_SUCCESS) {
      longjmp(pOperator->pTaskInfo->env, code);
    }

    recordNewGroupKeys(pTableScanInfo->pGroupCols, pTableScanInfo->pGroupColVals, pBlock, 0);
    int32_t len = buildGroupKeys(pTableScanInfo->keyBuf, pTableScanInfo->pGroupColVals);

    // Reuse the cached group id when the key is already known; otherwise compute and cache it.
    // (The previous check was inverted: it dereferenced the NULL result of a cache miss.)
    uint64_t* pGroupId = taosHashGet(pTableScanInfo->pGroupSet, pTableScanInfo->keyBuf, len);
    if (pGroupId != NULL) {
      pBlock->info.groupId = *pGroupId;
    } else {
      pBlock->info.groupId = calcGroupId(pTableScanInfo->keyBuf, len);
      taosHashPut(pTableScanInfo->pGroupSet, pTableScanInfo->keyBuf, len, &pBlock->info.groupId, sizeof(uint64_t));
    }

    // current block is filter out according to filter condition, continue load the next block
    if (status == FUNC_DATA_REQUIRED_FILTEROUT || pBlock->info.rows == 0) {
      continue;
    }

    pOperator->resultInfo.totalRows = pTableScanInfo->readRecorder.totalRows;
    pTableScanInfo->readRecorder.elapsedTime += (taosGetTimestampUs() - st) / 1000.0;

    pOperator->cost.totalCost = pTableScanInfo->readRecorder.elapsedTime;
    return pBlock;
  }
  return NULL;
}

404
static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
H
Haojun Liao 已提交
405 406 407 408
  STableScanInfo* pTableScanInfo = pOperator->info;
  SExecTaskInfo*  pTaskInfo = pOperator->pTaskInfo;

  // The read handle is not initialized yet, since no qualified tables exists
409
  if (pTableScanInfo->dataReader == NULL || pOperator->status == OP_EXEC_DONE) {
H
Haojun Liao 已提交
410 411 412
    return NULL;
  }

413 414
  // do the ascending order traverse in the first place.
  while (pTableScanInfo->scanTimes < pTableScanInfo->scanInfo.numOfAsc) {
415 416 417 418 419 420 421 422 423
    while (pTableScanInfo->curTWinIdx < pTableScanInfo->cond.numOfTWindows) {
      SSDataBlock* p = doTableScanImpl(pOperator);
      if (p != NULL) {
        return p;
      }
      pTableScanInfo->curTWinIdx += 1;
      if (pTableScanInfo->curTWinIdx < pTableScanInfo->cond.numOfTWindows) {
        tsdbResetReadHandle(pTableScanInfo->dataReader, &pTableScanInfo->cond, pTableScanInfo->curTWinIdx);
      }
H
Haojun Liao 已提交
424 425
    }

426
    pTableScanInfo->scanTimes += 1;
427

428
    if (pTableScanInfo->scanTimes < pTableScanInfo->scanInfo.numOfAsc) {
429 430
      setTaskStatus(pTaskInfo, TASK_NOT_COMPLETED);
      pTableScanInfo->scanFlag = REPEAT_SCAN;
431 432 433 434 435
      qDebug("%s start to repeat ascending order scan data blocks due to query func required", GET_TASKID(pTaskInfo));
      for (int32_t i = 0; i < pTableScanInfo->cond.numOfTWindows; ++i) {
        STimeWindow* pWin = &pTableScanInfo->cond.twindows[i];
        qDebug("%s\t qrange:%" PRId64 "-%" PRId64, GET_TASKID(pTaskInfo), pWin->skey, pWin->ekey);
      }
436
      // do prepare for the next round table scan operation
437 438
      tsdbResetReadHandle(pTableScanInfo->dataReader, &pTableScanInfo->cond, 0);
      pTableScanInfo->curTWinIdx = 0;
H
Haojun Liao 已提交
439
    }
440
  }
H
Haojun Liao 已提交
441

442
  int32_t total = pTableScanInfo->scanInfo.numOfAsc + pTableScanInfo->scanInfo.numOfDesc;
443
  if (pTableScanInfo->scanTimes < total) {
444 445
    if (pTableScanInfo->cond.order == TSDB_ORDER_ASC) {
      prepareForDescendingScan(pTableScanInfo, pTableScanInfo->pCtx, pTableScanInfo->numOfOutput);
446 447
      tsdbResetReadHandle(pTableScanInfo->dataReader, &pTableScanInfo->cond, 0);
      pTableScanInfo->curTWinIdx = 0;
448
    }
H
Haojun Liao 已提交
449

450 451 452 453 454
    qDebug("%s start to descending order scan data blocks due to query func required", GET_TASKID(pTaskInfo));
    for (int32_t i = 0; i < pTableScanInfo->cond.numOfTWindows; ++i) {
      STimeWindow* pWin = &pTableScanInfo->cond.twindows[i];
      qDebug("%s\t qrange:%" PRId64 "-%" PRId64, GET_TASKID(pTaskInfo), pWin->skey, pWin->ekey);
    }
455
    while (pTableScanInfo->scanTimes < total) {
456 457 458 459 460 461 462 463 464
      while (pTableScanInfo->curTWinIdx < pTableScanInfo->cond.numOfTWindows) {
        SSDataBlock* p = doTableScanImpl(pOperator);
        if (p != NULL) {
          return p;
        }
        pTableScanInfo->curTWinIdx += 1;
        if (pTableScanInfo->curTWinIdx < pTableScanInfo->cond.numOfTWindows) {
          tsdbResetReadHandle(pTableScanInfo->dataReader, &pTableScanInfo->cond, pTableScanInfo->curTWinIdx);
        }
465
      }
H
Haojun Liao 已提交
466

467
      pTableScanInfo->scanTimes += 1;
H
Haojun Liao 已提交
468

469
      if (pTableScanInfo->scanTimes < total) {
470 471
        setTaskStatus(pTaskInfo, TASK_NOT_COMPLETED);
        pTableScanInfo->scanFlag = REPEAT_SCAN;
H
Haojun Liao 已提交
472

473 474 475 476 477
        qDebug("%s start to repeat descending order scan data blocks due to query func required", GET_TASKID(pTaskInfo));
        for (int32_t i = 0; i < pTableScanInfo->cond.numOfTWindows; ++i) {
          STimeWindow* pWin = &pTableScanInfo->cond.twindows[i];
          qDebug("%s\t qrange:%" PRId64 "-%" PRId64, GET_TASKID(pTaskInfo), pWin->skey, pWin->ekey);
        }
478
        tsdbResetReadHandle(pTableScanInfo->dataReader, &pTableScanInfo->cond, 0);
479
        pTableScanInfo->curTWinIdx = 0;
480
      }
H
Haojun Liao 已提交
481 482 483
    }
  }

484 485
  setTaskStatus(pTaskInfo, TASK_COMPLETED);
  return NULL;
H
Haojun Liao 已提交
486 487
}

488 489 490 491 492 493 494 495 496 497 498 499
/**
 * Build an SInterval from the interval settings of a table scan physical plan node.
 *
 * Only interval, sliding, their units and the offset are copied; every other field is zero.
 * NOTE(review): `precision` is left at 0 although other code in this file reads
 * pInterval->precision — confirm whether it should be taken from the plan node.
 */
SInterval extractIntervalInfo(const STableScanPhysiNode* pTableScanNode) {
  SInterval result = {0};

  result.interval = pTableScanNode->interval;
  result.sliding = pTableScanNode->sliding;
  result.intervalUnit = pTableScanNode->intervalUnit;
  result.slidingUnit = pTableScanNode->slidingUnit;
  result.offset = pTableScanNode->offset;

  return result;
}

500 501
static int32_t getTableScannerExecInfo(struct SOperatorInfo* pOptr, void** pOptrExplain, uint32_t* len) {
  SFileBlockLoadRecorder* pRecorder = taosMemoryCalloc(1, sizeof(SFileBlockLoadRecorder));
502
  STableScanInfo*         pTableScanInfo = pOptr->info;
503 504 505 506 507 508
  *pRecorder = pTableScanInfo->readRecorder;
  *pOptrExplain = pRecorder;
  *len = sizeof(SFileBlockLoadRecorder);
  return 0;
}

509 510 511 512 513
static void destroyTableScanOperatorInfo(void* param, int32_t numOfOutput) {
  STableScanInfo* pTableScanInfo = (STableScanInfo*)param;
  taosMemoryFree(pTableScanInfo->pResBlock);
  tsdbCleanupReadHandle(pTableScanInfo->dataReader);

wmmhello's avatar
wmmhello 已提交
514 515 516 517 518 519 520 521
  taosArrayDestroy(pTableScanInfo->pGroupCols);
  for(int i = 0; i < taosArrayGetSize(pTableScanInfo->pGroupColVals); i++){
    SGroupKeys key = *(SGroupKeys*)taosArrayGet(pTableScanInfo->pGroupColVals, i);
    taosMemoryFree(key.pData);
  }
  taosArrayDestroy(pTableScanInfo->pGroupColVals);
  taosMemoryFree(pTableScanInfo->keyBuf);
  taosHashCleanup(pTableScanInfo->pGroupSet);
522 523 524 525 526
  if (pTableScanInfo->pColMatchInfo != NULL) {
    taosArrayDestroy(pTableScanInfo->pColMatchInfo);
  }
}

527
SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, tsdbReaderT pDataReader,
wmmhello's avatar
wmmhello 已提交
528
                                           SReadHandle* readHandle, SArray* groupKyes, SExecTaskInfo* pTaskInfo) {
H
Haojun Liao 已提交
529 530 531
  STableScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STableScanInfo));
  SOperatorInfo*  pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
  if (pInfo == NULL || pOperator == NULL) {
wmmhello's avatar
wmmhello 已提交
532
    goto _error;
H
Haojun Liao 已提交
533 534
  }

535
  SDataBlockDescNode* pDescNode = pTableScanNode->scan.node.pOutputDataBlockDesc;
536

537
  int32_t numOfCols = 0;
538 539
  SArray* pColList =
      extractColMatchInfo(pTableScanNode->scan.pScanCols, pDescNode, &numOfCols, pTaskInfo, COL_MATCH_FROM_COL_ID);
L
Liu Jicong 已提交
540

541 542
  int32_t code = initQueryTableDataCond(&pInfo->cond, pTableScanNode);
  if (code != TSDB_CODE_SUCCESS) {
wmmhello's avatar
wmmhello 已提交
543
    goto _error;
544 545 546 547
  }

  if (pTableScanNode->scan.pScanPseudoCols != NULL) {
    pInfo->pPseudoExpr = createExprInfo(pTableScanNode->scan.pScanPseudoCols, NULL, &pInfo->numOfPseudoExpr);
548
    pInfo->pPseudoCtx = createSqlFunctionCtx(pInfo->pPseudoExpr, pInfo->numOfPseudoExpr, &pInfo->rowCellInfoOffset);
549 550 551
  }

  pInfo->scanInfo = (SScanInfo){.numOfAsc = pTableScanNode->scanSeq[0], .numOfDesc = pTableScanNode->scanSeq[1]};
552
//    pInfo->scanInfo = (SScanInfo){.numOfAsc = 0, .numOfDesc = 1}; // for debug purpose
553

554 555 556
  pInfo->readHandle = *readHandle;
  pInfo->interval = extractIntervalInfo(pTableScanNode);
  pInfo->sampleRatio = pTableScanNode->ratio;
557
  pInfo->dataBlockLoadFlag = pTableScanNode->dataRequired;
558 559 560 561 562
  pInfo->pResBlock = createResDataBlock(pDescNode);
  pInfo->pFilterNode = pTableScanNode->scan.node.pConditions;
  pInfo->dataReader = pDataReader;
  pInfo->scanFlag = MAIN_SCAN;
  pInfo->pColMatchInfo = pColList;
563
  pInfo->curTWinIdx = 0;
564

565
  pOperator->name = "TableScanOperator";  // for debug purpose
L
Liu Jicong 已提交
566
  pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN;
567 568 569 570 571
  pOperator->blocking = false;
  pOperator->status = OP_NOT_OPENED;
  pOperator->info = pInfo;
  pOperator->numOfExprs = numOfCols;
  pOperator->pTaskInfo = pTaskInfo;
572

wmmhello's avatar
wmmhello 已提交
573 574 575 576 577 578 579 580 581 582 583 584
  // for table group
  pInfo->pGroupCols = groupKyes;
  _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
  pInfo->pGroupSet = taosHashInit(100, hashFn, false, HASH_NO_LOCK);
  if (pInfo->pGroupSet == NULL) {
    goto _error;
  }
  code = initGroupOptrInfo(&pInfo->pGroupColVals, &pInfo->groupKeyLen, &pInfo->keyBuf, groupKyes);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

585 586
  pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doTableScan, NULL, NULL, destroyTableScanOperatorInfo,
                                         NULL, NULL, getTableScannerExecInfo);
587 588 589

  // for non-blocking operator, the open cost is always 0
  pOperator->cost.openCost = 0;
D
dapan1121 已提交
590

H
Haojun Liao 已提交
591
  return pOperator;
wmmhello's avatar
wmmhello 已提交
592 593 594 595 596 597

_error:
  pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY;
  taosMemoryFreeClear(pInfo);
  taosMemoryFreeClear(pOperator);
  return NULL;
H
Haojun Liao 已提交
598 599
}

600
/**
 * Create a sequential table scan operator that pulls blocks directly through doTableScanImpl.
 *
 * @param pReadHandle  reader stored as the scan's dataReader
 * @param pTaskInfo    owning task; pTaskInfo->code is set to TSDB_CODE_OUT_OF_MEMORY on failure
 * @return the new operator, or NULL when allocation fails
 */
SOperatorInfo* createTableSeqScanOperatorInfo(void* pReadHandle, SExecTaskInfo* pTaskInfo) {
  STableScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STableScanInfo));
  SOperatorInfo*  pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
  if (pInfo == NULL || pOperator == NULL) {
    // both pointers were dereferenced unchecked before; fail the same way the other creators do
    pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY;
    taosMemoryFreeClear(pInfo);
    taosMemoryFreeClear(pOperator);
    return NULL;
  }

  pInfo->dataReader = pReadHandle;

  pOperator->name = "TableSeqScanOperator";
  pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_TABLE_SEQ_SCAN;
  pOperator->blocking = false;
  pOperator->status = OP_NOT_OPENED;
  pOperator->info = pInfo;
  pOperator->pTaskInfo = pTaskInfo;

  pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doTableScanImpl, NULL, NULL, NULL, NULL, NULL, NULL);
  return pOperator;
}

618
/**
 * Produce the single result block of the data-block-distribution scan.
 *
 * Collects the file block distribution and the number of in-memory rows from the reader, then
 * returns the operator's result block marked as holding one row and one column. The operator is
 * marked OP_EXEC_DONE afterwards, so later calls return NULL.
 *
 * TODO: the collected distribution is not yet serialized into the result column.
 *
 * @param pOperator  block-info scan operator
 * @return the result block, or NULL when the operator is done or has no result block
 */
static SSDataBlock* doBlockInfoScan(SOperatorInfo* pOperator) {
  if (pOperator->status == OP_EXEC_DONE) {
    return NULL;
  }

  STableScanInfo* pTableScanInfo = pOperator->info;

  SSDataBlock* pBlock = pTableScanInfo->pResBlock;
  if (pBlock == NULL) {
    // nothing to write the result into; finish instead of dereferencing NULL below
    pOperator->status = OP_EXEC_DONE;
    return NULL;
  }

  STableBlockDistInfo tableBlockDist = {0};
  tableBlockDist.numOfTables = 1;  // TODO set the correct number of tables

  // number of row-count buckets, rounded up
  int32_t numRowSteps = TSDB_DEFAULT_MAXROWS_FBLOCK / TSDB_BLOCK_DIST_STEP_ROWS;
  if (TSDB_DEFAULT_MAXROWS_FBLOCK % TSDB_BLOCK_DIST_STEP_ROWS != 0) {
    ++numRowSteps;
  }

  tableBlockDist.dataBlockInfos = taosArrayInit(numRowSteps, sizeof(SFileBlockInfo));
  if (tableBlockDist.dataBlockInfos == NULL) {
    longjmp(pOperator->pTaskInfo->env, TSDB_CODE_OUT_OF_MEMORY);
  }
  taosArraySetSize(tableBlockDist.dataBlockInfos, numRowSteps);

  tableBlockDist.maxRows = INT_MIN;
  tableBlockDist.minRows = INT_MAX;

  tsdbGetFileBlocksDistInfo(pTableScanInfo->dataReader, &tableBlockDist);
  tableBlockDist.numOfRowsInMemTable = (int32_t)tsdbGetNumOfRowsInMemTable(pTableScanInfo->dataReader);

  pBlock->info.rows = 1;
  pBlock->info.numOfCols = 1;

  // the distribution is local to this call; release it (it was leaked on every call before)
  taosArrayDestroy(tableBlockDist.dataBlockInfos);

  pOperator->status = OP_EXEC_DONE;
  return pBlock;
}

/**
 * Create the operator that reports data block distribution information.
 *
 * NOTE(review): the result block (pInfo->pResBlock) is not created here although doBlockInfoScan
 * reads it — confirm where it is expected to be set up.
 *
 * @param dataReader  reader stored as the scan's dataReader
 * @param pTaskInfo   owning task; pTaskInfo->code is set to TSDB_CODE_OUT_OF_MEMORY on failure
 * @return the new operator, or NULL when allocation fails
 */
SOperatorInfo* createDataBlockInfoScanOperator(void* dataReader, SExecTaskInfo* pTaskInfo) {
  STableScanInfo* pScanInfo = taosMemoryCalloc(1, sizeof(STableScanInfo));
  SOperatorInfo*  pOptr = taosMemoryCalloc(1, sizeof(SOperatorInfo));

  const bool allocated = (pScanInfo != NULL) && (pOptr != NULL);
  if (!allocated) {
    pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY;
    taosMemoryFreeClear(pScanInfo);
    taosMemoryFreeClear(pOptr);
    return NULL;
  }

  pScanInfo->dataReader = dataReader;

  // TODO: the operator type and the binary result column are not set up yet
  pOptr->name = "DataBlockInfoScanOperator";
  pOptr->blocking = false;
  pOptr->status = OP_NOT_OPENED;
  pOptr->info = pScanInfo;
  pOptr->pTaskInfo = pTaskInfo;

  pOptr->fpSet._openFn = operatorDummyOpenFn;
  pOptr->fpSet.getNextFn = doBlockInfoScan;

  return pOptr;
}

/**
 * Destroy every block buffered in pInfo->pBlockLists, empty the list and reset validBlockIndex to 0.
 */
static void doClearBufferedBlocks(SStreamBlockScanInfo* pInfo) {
  SArray* pBuffered = pInfo->pBlockLists;
  size_t  numOfBlocks = taosArrayGetSize(pBuffered);

  pInfo->validBlockIndex = 0;

  int32_t idx = 0;
  while (idx < numOfBlocks) {
    SSDataBlock* pBlock = taosArrayGetP(pBuffered, idx);
    blockDataDestroy(pBlock);
    ++idx;
  }

  taosArrayClear(pBuffered);
}

710
// A stream scan serves a session window when a stream aggregate supporter is attached to it.
static bool isSessionWindow(SStreamBlockScanInfo* pInfo) {
  if (pInfo->sessionSup.pStreamAggSup == NULL) {
    return false;
  }
  return true;
}
5
54liuyao 已提交
711

5
54liuyao 已提交
712 713 714 715
/**
 * Position the helper table scan on the window that covers the next updated timestamp.
 *
 * Takes the timestamp at updateResIndex from the first column of pUpdateRes, determines the session
 * or interval window containing it, advances updateResIndex past the rows that fall into that
 * window, and resets the helper scan (time window 0, reader, scanTimes) to that window.
 *
 * @return true when a window was prepared, false when all rows of pUpdateRes have been consumed
 */
static bool prepareDataScan(SStreamBlockScanInfo* pInfo) {
  SSDataBlock* pUpdated = pInfo->pUpdateRes;
  if (pInfo->updateResIndex >= pUpdated->info.rows) {
    return false;
  }

  SColumnInfoData* pTsCol = taosArrayGet(pUpdated->pDataBlock, 0);
  TSKEY*           tsList = (TSKEY*)pTsCol->pData;

  // NOTE(review): only cur.pageId is initialized before this is handed to getActiveTimeWindow
  SResultRowInfo dumyInfo;
  dumyInfo.cur.pageId = -1;

  STimeWindow win;
  if (!isSessionWindow(pInfo)) {
    win = getActiveTimeWindow(NULL, &dumyInfo, tsList[pInfo->updateResIndex], &pInfo->interval,
                              pInfo->interval.precision, NULL);
    pInfo->updateResIndex += getNumOfRowsInTimeWindow(&pUpdated->info, tsList, pInfo->updateResIndex, win.ekey,
                                                      binarySearchForKey, NULL, TSDB_ORDER_ASC);
  } else {
    SStreamAggSupporter* pAggSup = pInfo->sessionSup.pStreamAggSup;
    int64_t              gap = pInfo->sessionSup.gap;
    int32_t              winIndex = 0;
    SResultWindowInfo*   pCurWin =
        getSessionTimeWindow(pAggSup->pResultRows, tsList[pInfo->updateResIndex], gap, &winIndex);
    win = pCurWin->win;
    pInfo->updateResIndex +=
        updateSessionWindowInfo(pCurWin, tsList, pUpdated->info.rows, pInfo->updateResIndex, gap, NULL);
  }

  // restart the helper scan on the selected window
  STableScanInfo* pScan = pInfo->pOperatorDumy->info;
  pScan->cond.twindows[0] = win;
  pScan->curTWinIdx = 0;
  tsdbResetReadHandle(pScan->dataReader, &pScan->cond, 0);
  pScan->scanTimes = 0;
  return true;
}

/*
 * Fetch the next block from the dummy table-scan operator. When the scan of
 * the current window is exhausted, try to move on to the next window of the
 * update block and scan once more. Returns NULL when no window is left or the
 * new window yields no data.
 */
static SSDataBlock* doDataScan(SStreamBlockScanInfo* pInfo) {
  SSDataBlock* pBlock = doTableScan(pInfo->pOperatorDumy);
  if (pBlock != NULL) {
    return pBlock;
  }

  if (!prepareDataScan(pInfo)) {
    return NULL;
  }

  // scan next window data
  return doTableScan(pInfo->pOperatorDumy);
}

/*
 * Collect the timestamps of pInfo->pRes that updateInfoIsUpdated() reports as
 * updates and, when `invertible` is true and at least one was found, return a
 * new STREAM_REPROCESS block whose column 0 holds exactly those timestamps.
 *
 * Returns NULL when nothing was updated, when `invertible` is false, or when
 * the result block could not be allocated. The caller owns a returned block.
 */
static SSDataBlock* getUpdateDataBlock(SStreamBlockScanInfo* pInfo, bool invertible) {
  // Start from an empty list: entries left behind by an earlier call that produced no block
  // must not be reported again together with the rows of the current block.
  taosArrayClear(pInfo->tsArray);

  SColumnInfoData* pColDataInfo = taosArrayGet(pInfo->pRes->pDataBlock, pInfo->primaryTsIndex);
  TSKEY*           ts = (TSKEY*)pColDataInfo->pData;
  for (int32_t i = 0; i < pInfo->pRes->info.rows; i++) {
    if (updateInfoIsUpdated(pInfo->pUpdateInfo, pInfo->pRes->info.uid, ts[i])) {
      taosArrayPush(pInfo->tsArray, ts + i);
    }
  }

  int32_t size = taosArrayGetSize(pInfo->tsArray);
  if (size == 0 || !invertible) {
    return NULL;
  }

  // TODO(liuyao) get from tsdb: an invertible update should eventually yield a STREAM_INVERT block
  SSDataBlock* pDataBlock = createOneDataBlock(pInfo->pRes, false);
  if (pDataBlock == NULL) {
    // out of memory: give up on this round instead of dereferencing a NULL block
    taosArrayClear(pInfo->tsArray);
    return NULL;
  }

  SColumnInfoData* pCol = (SColumnInfoData*)taosArrayGet(pDataBlock->pDataBlock, 0);
  ASSERT(pCol->info.type == TSDB_DATA_TYPE_TIMESTAMP);
  colInfoDataEnsureCapacity(pCol, 0, size);
  for (int32_t i = 0; i < size; i++) {
    TSKEY* pTs = (TSKEY*)taosArrayGet(pInfo->tsArray, i);
    colDataAppend(pCol, i, (char*)pTs, false);
  }

  pDataBlock->info.rows = size;
  pDataBlock->info.type = STREAM_REPROCESS;
  blockDataUpdateTsWindow(pDataBlock, 0);

  taosArrayClear(pInfo->tsArray);
  return pDataBlock;
}

790
/*
 * Produce the next data block of a stream scan.
 *
 * The operator is opened through its _openFn first; NULL is returned when the
 * open fails or the operator is already marked OP_EXEC_DONE.
 *
 * - blockType == STREAM_DATA_TYPE_SSDATA_BLOCK: the buffered blocks in
 *   pBlockLists are handed out one by one; once they are exhausted the buffer
 *   is cleared and the operator is marked done.
 * - otherwise a small state machine driven by pInfo->scanMode runs:
 *     STREAM_SCAN_FROM_RES          release the update block, return pRes
 *     STREAM_SCAN_FROM_UPDATERES    prepare the first re-scan window, return pUpdateRes
 *     STREAM_SCAN_FROM_DATAREADER   return re-scanned data until it is exhausted
 *     STREAM_SCAN_FROM_READERHANDLE read the next submit block from the stream reader
 */
static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) {
  SExecTaskInfo*        pTaskInfo = pOperator->pTaskInfo;
  SStreamBlockScanInfo* pInfo = pOperator->info;
  int32_t               rows = 0;

  pTaskInfo->code = pOperator->fpSet._openFn(pOperator);
  if (pTaskInfo->code != TSDB_CODE_SUCCESS || pOperator->status == OP_EXEC_DONE) {
    return NULL;
  }

  size_t total = taosArrayGetSize(pInfo->pBlockLists);
  if (pInfo->blockType == STREAM_DATA_TYPE_SSDATA_BLOCK) {
    if (pInfo->validBlockIndex >= total) {
      doClearBufferedBlocks(pInfo);
      pOperator->status = OP_EXEC_DONE;
      return NULL;
    }

    int32_t current = pInfo->validBlockIndex++;
    return taosArrayGetP(pInfo->pBlockLists, current);
  } else {
    if (pInfo->scanMode == STREAM_SCAN_FROM_RES) {
      blockDataDestroy(pInfo->pUpdateRes);
      pInfo->pUpdateRes = NULL;  // do not keep a dangling pointer to the destroyed block
      pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
      return pInfo->pRes;
    } else if (pInfo->scanMode == STREAM_SCAN_FROM_UPDATERES) {
      blockDataCleanup(pInfo->pRes);
      pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER;
      prepareDataScan(pInfo);
      return pInfo->pUpdateRes;
    } else if (pInfo->scanMode == STREAM_SCAN_FROM_DATAREADER) {
      SSDataBlock* pSDB = doDataScan(pInfo);
      if (pSDB == NULL) {
        // re-scan finished: fall through and read from the stream reader again
        pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
      } else {
        return pSDB;
      }
    }

    SDataBlockInfo* pBlockInfo = &pInfo->pRes->info;
    blockDataCleanup(pInfo->pRes);

    // the loop body always ends with `break` or `return`, so at most one block is consumed per call
    while (tqNextDataBlock(pInfo->streamBlockReader)) {
      SArray*  pCols = NULL;
      uint64_t groupId = 0;
      uint64_t uid = 0;
      int32_t  numOfRows = 0;
      int16_t  outputCol = 0;

      int32_t code = tqRetrieveDataBlock(&pCols, pInfo->streamBlockReader, &groupId, &uid, &numOfRows, &outputCol);

      if (code != TSDB_CODE_SUCCESS || numOfRows == 0) {
        pTaskInfo->code = code;
        return NULL;
      }

      pInfo->pRes->info.rows = numOfRows;
      pInfo->pRes->info.uid = uid;
      pInfo->pRes->info.type = STREAM_NORMAL;

      // for generating rollup SMA result, each time is an independent time serie.
      // TODO temporarily used, when the statement of "partition by tbname" is ready, remove this
      if (pInfo->assignBlockUid) {
        pInfo->pRes->info.groupId = uid;
      } else {
        pInfo->pRes->info.groupId = groupId;
      }

      for (int32_t i = 0; i < taosArrayGetSize(pInfo->pColMatchInfo); ++i) {
        SColMatchInfo* pColMatchInfo = taosArrayGet(pInfo->pColMatchInfo, i);
        if (!pColMatchInfo->output) {
          continue;
        }

        bool colExists = false;
        for (int32_t j = 0; j < taosArrayGetSize(pCols); ++j) {
          SColumnInfoData* pResCol = taosArrayGet(pCols, j);
          if (pResCol->info.colId == pColMatchInfo->colId) {
            taosArraySet(pInfo->pRes->pDataBlock, pColMatchInfo->targetSlotId, pResCol);
            colExists = true;
            break;
          }
        }

        // the required column does not exists in submit block, let's set it to be all null value
        if (!colExists) {
          SColumnInfoData* pDst = taosArrayGet(pInfo->pRes->pDataBlock, pColMatchInfo->targetSlotId);
          colInfoDataEnsureCapacity(pDst, 0, pBlockInfo->rows);
          colDataAppendNNULL(pDst, 0, pBlockInfo->rows);
        }
      }

      if (pInfo->pRes->pDataBlock == NULL) {
        // TODO add log
        pOperator->status = OP_EXEC_DONE;
        pTaskInfo->code = terrno;
        return NULL;
      }

      // remember the row count before filtering; it decides below whether the operator is done
      rows = pBlockInfo->rows;

      // currently only the tbname pseudo column
      if (pInfo->numOfPseudoExpr > 0) {
        addTagPseudoColumnData(&pInfo->readHandle, pInfo->pPseudoExpr, pInfo->numOfPseudoExpr, pInfo->pRes);
      }

      doFilter(pInfo->pCondition, pInfo->pRes, NULL);
      blockDataUpdateTsWindow(pInfo->pRes, 0);
      break;
    }

    // record the scan action.
    pInfo->numOfExec++;
    pOperator->resultInfo.totalRows += pBlockInfo->rows;

    if (rows == 0) {
      pOperator->status = OP_EXEC_DONE;
    } else if (pInfo->pUpdateInfo) {
      SSDataBlock* upRes = getUpdateDataBlock(pInfo, true);
      if (upRes) {
        pInfo->pUpdateRes = upRes;
        if (upRes->info.type == STREAM_REPROCESS) {
          // pRes is returned now; the update block and the re-scanned data follow on later calls
          pInfo->updateResIndex = 0;
          pInfo->scanMode = STREAM_SCAN_FROM_UPDATERES;
        } else if (upRes->info.type == STREAM_INVERT) {
          // the update block goes first; pRes is returned by the next call
          pInfo->scanMode = STREAM_SCAN_FROM_RES;
          return upRes;
        }
      }
    }

    return (rows == 0) ? NULL : pInfo->pRes;
  }
}

5
54liuyao 已提交
927 928
SOperatorInfo* createStreamScanOperatorInfo(void* pDataReader, SReadHandle* pHandle,
    SArray* pTableIdList, STableScanPhysiNode* pTableScanNode, SExecTaskInfo* pTaskInfo,
5
54liuyao 已提交
929
    STimeWindowAggSupp* pTwSup) {
H
Haojun Liao 已提交
930 931 932 933
  SStreamBlockScanInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamBlockScanInfo));
  SOperatorInfo*        pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
  if (pInfo == NULL || pOperator == NULL) {
    terrno = TSDB_CODE_QRY_OUT_OF_MEMORY;
934
    goto _error;
H
Haojun Liao 已提交
935 936
  }

937 938 939
  SScanPhysiNode* pScanPhyNode = &pTableScanNode->scan;

  SDataBlockDescNode* pDescNode = pScanPhyNode->node.pOutputDataBlockDesc;
wmmhello's avatar
wmmhello 已提交
940
  SOperatorInfo*      pTableScanDummy = createTableScanOperatorInfo(pTableScanNode, pDataReader, pHandle, NULL, pTaskInfo);
5
54liuyao 已提交
941

942
  STableScanInfo* pSTInfo = (STableScanInfo*)pTableScanDummy->info;
5
54liuyao 已提交
943

944 945
  int32_t numOfCols = 0;
  pInfo->pColMatchInfo = extractColMatchInfo(pScanPhyNode->pScanCols, pDescNode, &numOfCols, pTaskInfo, COL_MATCH_FROM_COL_ID);
H
Haojun Liao 已提交
946

947 948
  int32_t numOfOutput = taosArrayGetSize(pInfo->pColMatchInfo);
  SArray* pColIds = taosArrayInit(numOfOutput, sizeof(int16_t));
949
  for (int32_t i = 0; i < numOfOutput; ++i) {
950 951 952
    SColMatchInfo* id = taosArrayGet(pInfo->pColMatchInfo, i);

    int16_t colId = id->colId;
953
    taosArrayPush(pColIds, &colId);
H
Haojun Liao 已提交
954 955 956
  }

  // set the extract column id to streamHandle
957 958
  tqReadHandleSetColIdList((STqReadHandle*)pHandle->reader, pColIds);
  int32_t code = tqReadHandleSetTbUidList(pHandle->reader, pTableIdList);
H
Haojun Liao 已提交
959
  if (code != 0) {
960
    goto _error;
H
Haojun Liao 已提交
961 962 963 964
  }

  pInfo->pBlockLists = taosArrayInit(4, POINTER_BYTES);
  if (pInfo->pBlockLists == NULL) {
965 966
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto _error;
H
Haojun Liao 已提交
967 968
  }

5
54liuyao 已提交
969 970
  pInfo->tsArray = taosArrayInit(4, sizeof(TSKEY));
  if (pInfo->tsArray == NULL) {
971
    goto _error;
5
54liuyao 已提交
972 973
  }

5
54liuyao 已提交
974 975 976 977 978 979
  if (isSmaStream(pTableScanNode->triggerType)) {
    pTwSup->waterMark = getSmaWaterMark(pSTInfo->interval.interval,
        pTableScanNode->filesFactor);
  }
  pInfo->primaryTsIndex = 0; // pTableScanNode->tsColId;
  if (pSTInfo->interval.interval > 0 && pDataReader) {
5
54liuyao 已提交
980
    pInfo->pUpdateInfo = updateInfoInitP(&pSTInfo->interval, pTwSup->waterMark);
981 982
  } else {
    pInfo->pUpdateInfo = NULL;
5
54liuyao 已提交
983 984
  }

985 986 987 988
  // create the pseduo columns info
  if (pTableScanNode->scan.pScanPseudoCols != NULL) {
    pInfo->pPseudoExpr = createExprInfo(pTableScanNode->scan.pScanPseudoCols, NULL, &pInfo->numOfPseudoExpr);
  }
5
54liuyao 已提交
989

990 991 992 993 994 995 996 997 998 999 1000 1001
  pInfo->readHandle        = *pHandle;
  pInfo->tableUid          = pScanPhyNode->uid;
  pInfo->streamBlockReader = pHandle->reader;
  pInfo->pRes              = createResDataBlock(pDescNode);
  pInfo->pCondition        = pScanPhyNode->node.pConditions;
  pInfo->pDataReader       = pDataReader;
  pInfo->scanMode          = STREAM_SCAN_FROM_READERHANDLE;
  pInfo->pOperatorDumy     = pTableScanDummy;
  pInfo->interval          = pSTInfo->interval;
  pInfo->sessionSup        = (SessionWindowSupporter){.pStreamAggSup = NULL, .gap = -1};

  pOperator->name         = "StreamBlockScanOperator";
L
Liu Jicong 已提交
1002
  pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN;
1003 1004 1005 1006 1007
  pOperator->blocking     = false;
  pOperator->status       = OP_NOT_OPENED;
  pOperator->info         = pInfo;
  pOperator->numOfExprs   = pInfo->pRes->info.numOfCols;
  pOperator->pTaskInfo    = pTaskInfo;
H
Haojun Liao 已提交
1008

5
54liuyao 已提交
1009 1010
  pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doStreamBlockScan, NULL,
      NULL, operatorDummyCloseFn, NULL, NULL, NULL);
1011

H
Haojun Liao 已提交
1012
  return pOperator;
1013

L
Liu Jicong 已提交
1014
_error:
1015 1016 1017
  taosMemoryFreeClear(pInfo);
  taosMemoryFreeClear(pOperator);
  return NULL;
H
Haojun Liao 已提交
1018 1019 1020 1021 1022 1023 1024
}

static void destroySysScanOperator(void* param, int32_t numOfOutput) {
  SSysTableScanInfo* pInfo = (SSysTableScanInfo*)param;
  tsem_destroy(&pInfo->ready);
  blockDataDestroy(pInfo->pRes);

1025
  const char* name = tNameGetTableName(&pInfo->name);
1026
  if (strncasecmp(name, TSDB_INS_TABLE_USER_TABLES, TSDB_TABLE_FNAME_LEN) == 0 || pInfo->pCur != NULL) {
H
Haojun Liao 已提交
1027
    metaCloseTbCursor(pInfo->pCur);
1028
    pInfo->pCur = NULL;
H
Haojun Liao 已提交
1029
  }
H
Haojun Liao 已提交
1030 1031

  taosArrayDestroy(pInfo->scanCols);
H
Haojun Liao 已提交
1032 1033 1034 1035 1036 1037 1038 1039 1040 1041 1042 1043 1044 1045 1046 1047 1048 1049 1050 1051 1052 1053 1054 1055 1056 1057 1058 1059 1060 1061 1062 1063 1064 1065 1066 1067 1068 1069 1070 1071
}

/*
 * Expression-walker callback that extracts the database name from a condition
 * of the form `<db name column> = <value>`.
 *
 * pContext serves two purposes. While walking, its first int32_t holds a state:
 *   0  nothing matched
 *   1  an `=` operator was seen
 *   2  the db-name column of that operator was seen
 * Once the value node is reached in state 2, the name is written to pContext
 * as a NUL-terminated string and the walk stops.
 *
 * NOTE(review): if the walk ends without a match, the buffer still holds the
 * raw state integer -- callers should be checked for this.
 */
EDealRes getDBNameFromConditionWalker(SNode* pNode, void* pContext) {
  ENodeType nType = nodeType(pNode);

  switch (nType) {
    case QUERY_NODE_OPERATOR: {
      SOperatorNode* node = (SOperatorNode*)pNode;
      if (OP_TYPE_EQUAL == node->opType) {
        *(int32_t*)pContext = 1;
        return DEAL_RES_CONTINUE;
      }

      // not an equality: reset the state and do not descend into the operands
      *(int32_t*)pContext = 0;
      return DEAL_RES_IGNORE_CHILD;
    }
    case QUERY_NODE_COLUMN: {
      if (1 != *(int32_t*)pContext) {
        return DEAL_RES_CONTINUE;
      }

      SColumnNode* node = (SColumnNode*)pNode;
      if (TSDB_INS_USER_STABLES_DBNAME_COLID == node->colId) {
        *(int32_t*)pContext = 2;
        return DEAL_RES_CONTINUE;
      }

      *(int32_t*)pContext = 0;
      return DEAL_RES_CONTINUE;
    }
    case QUERY_NODE_VALUE: {
      if (2 != *(int32_t*)pContext) {
        return DEAL_RES_CONTINUE;
      }

      SValueNode* node = (SValueNode*)pNode;
      char*       dbName = nodesGetValueFromNode(node);

      // The caller in this file passes a buffer of TSDB_DB_NAME_LEN bytes; clamp the copy so an
      // over-long literal in the condition cannot write past it.
      size_t len = varDataLen(dbName);
      if (len >= TSDB_DB_NAME_LEN) {
        len = TSDB_DB_NAME_LEN - 1;
      }

      strncpy(pContext, varDataVal(dbName), len);
      *((char*)pContext + len) = 0;
      return DEAL_RES_END;  // stop walk
    }
    default:
      break;
  }
  return DEAL_RES_CONTINUE;
}

1080
/*
 * Walk `pCondition` with getDBNameFromConditionWalker, using `dbName` as the
 * walker context. Does nothing when there is no condition.
 *
 * Although declared const, the buffer behind `dbName` is written by the
 * walker, so it must be writable.
 */
static void getDBNameFromCondition(SNode* pCondition, const char* dbName) {
  if (pCondition != NULL) {
    nodesWalkExpr(pCondition, getDBNameFromConditionWalker, (char*)dbName);
  }
}

1087
static int32_t loadSysTableCallback(void* param, const SDataBuf* pMsg, int32_t code) {
H
Haojun Liao 已提交
1088 1089 1090 1091 1092 1093 1094
  SOperatorInfo*     operator=(SOperatorInfo*) param;
  SSysTableScanInfo* pScanResInfo = (SSysTableScanInfo*)operator->info;
  if (TSDB_CODE_SUCCESS == code) {
    pScanResInfo->pRsp = pMsg->pData;

    SRetrieveMetaTableRsp* pRsp = pScanResInfo->pRsp;
    pRsp->numOfRows = htonl(pRsp->numOfRows);
1095 1096 1097
    pRsp->useconds = htobe64(pRsp->useconds);
    pRsp->handle = htobe64(pRsp->handle);
    pRsp->compLen = htonl(pRsp->compLen);
H
Haojun Liao 已提交
1098 1099 1100 1101 1102
  } else {
    operator->pTaskInfo->code = code;
  }

  tsem_post(&pScanResInfo->ready);
wmmhello's avatar
wmmhello 已提交
1103
  return TSDB_CODE_SUCCESS;
H
Haojun Liao 已提交
1104 1105 1106 1107 1108 1109 1110 1111
}

/*
 * Apply pInfo->pCondition to pInfo->pRes.
 *
 * Without a condition the block is left untouched. Otherwise a new block with
 * the same layout is created, the qualifying rows are copied into it and
 * pInfo->pRes is switched to the new block.
 *
 * Returns pInfo->pRes, or NULL when it holds no rows.
 *
 * NOTE(review): the block previously referenced by pInfo->pRes is not released
 * here, and the codes returned by the filter setup calls are not acted upon.
 */
static SSDataBlock* doFilterResult(SSysTableScanInfo* pInfo) {
  if (pInfo->pCondition == NULL) {
    return pInfo->pRes->info.rows == 0 ? NULL : pInfo->pRes;
  }

  SFilterInfo* filter = NULL;

  int32_t code = filterInitFromNode(pInfo->pCondition, &filter, 0);

  SFilterColumnParam param1 = {.numOfCols = pInfo->pRes->info.numOfCols, .pDataBlock = pInfo->pRes->pDataBlock};
  code = filterSetDataFromSlotId(filter, &param1);

  // keep == true: every row qualifies; otherwise rowRes (when set) flags the qualifying rows
  int8_t* rowRes = NULL;
  bool    keep = filterExecute(filter, pInfo->pRes, &rowRes, NULL, param1.numOfCols);
  filterFreeInfo(filter);

  SSDataBlock* px = createOneDataBlock(pInfo->pRes, false);
  blockDataEnsureCapacity(px, pInfo->pRes->info.rows);

  // TODO refactor
  int32_t numOfRow = 0;
  for (int32_t i = 0; i < pInfo->pRes->info.numOfCols; ++i) {
    SColumnInfoData* pDest = taosArrayGet(px->pDataBlock, i);
    SColumnInfoData* pSrc = taosArrayGet(pInfo->pRes->pDataBlock, i);

    if (keep) {
      colDataAssign(pDest, pSrc, pInfo->pRes->info.rows);
      numOfRow = pInfo->pRes->info.rows;
    } else if (NULL != rowRes) {
      numOfRow = 0;
      for (int32_t j = 0; j < pInfo->pRes->info.rows; ++j) {
        if (rowRes[j] == 0) {
          continue;
        }

        if (colDataIsNull_s(pSrc, j)) {
          colDataAppendNULL(pDest, numOfRow);
        } else {
          colDataAppend(pDest, numOfRow, colDataGetData(pSrc, j), false);
        }

        numOfRow += 1;
      }
    } else {
      numOfRow = 0;
    }
  }

  // rowRes is still read after the filter itself was freed, so it is a separate buffer that
  // has to be released here; previously it leaked on every call.
  taosMemoryFreeClear(rowRes);

  px->info.rows = numOfRow;
  pInfo->pRes = px;

  return pInfo->pRes->info.rows == 0 ? NULL : pInfo->pRes;
}

1160 1161
static SSDataBlock* buildSysTableMetaBlock() {
  SSDataBlock* pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock));
1162

L
Liu Jicong 已提交
1163 1164
  size_t               size = 0;
  const SSysTableMeta* pMeta = NULL;
1165 1166 1167
  getInfosDbMeta(&pMeta, &size);

  int32_t index = 0;
L
Liu Jicong 已提交
1168 1169
  for (int32_t i = 0; i < size; ++i) {
    if (strcmp(pMeta[i].name, TSDB_INS_TABLE_USER_TABLES) == 0) {
1170 1171 1172 1173
      index = i;
      break;
    }
  }
1174 1175 1176

  pBlock->pDataBlock = taosArrayInit(pBlock->info.numOfCols, sizeof(SColumnInfoData));

L
Liu Jicong 已提交
1177
  for (int32_t i = 0; i < pMeta[index].colNum; ++i) {
1178 1179 1180 1181 1182 1183 1184 1185 1186
    SColumnInfoData colInfoData = {0};
    colInfoData.info.colId = i + 1;
    colInfoData.info.type = pMeta[index].schema[i].type;
    colInfoData.info.bytes = pMeta[index].schema[i].bytes;
    taosArrayPush(pBlock->pDataBlock, &colInfoData);
  }

  pBlock->info.numOfCols = pMeta[index].colNum;
  pBlock->info.hasVarCol = true;
1187 1188 1189 1190

  return pBlock;
}

1191
/*
 * Produce the next result block of a system-table scan.
 *
 * - user-tables system table, executed on the mnode (readHandle.mnd set): the
 *   tables of the built-in schema databases are listed in one shot.
 * - user-tables system table, executed on a vnode: the local tables are
 *   listed through a meta cursor, at most resultInfo.capacity rows per call.
 * - any other system table: a retrieve request is sent to the configured
 *   epset and the call blocks on the `ready` semaphore until the response
 *   callback posts it; this repeats until a non-empty filtered block is
 *   available or the source reports completion.
 *
 * Returns NULL when the operator is done, when the produced block is empty,
 * or on error (pTaskInfo->code is set in that case).
 */
static SSDataBlock* doSysTableScan(SOperatorInfo* pOperator) {
  SExecTaskInfo*     pTaskInfo = pOperator->pTaskInfo;
  SSysTableScanInfo* pInfo = pOperator->info;

  if (pOperator->status == OP_EXEC_DONE) {
    return NULL;
  }

  // retrieve local table list info from vnode
  const char* name = tNameGetTableName(&pInfo->name);
  if (strncasecmp(name, TSDB_INS_TABLE_USER_TABLES, TSDB_TABLE_FNAME_LEN) == 0) {
    // the retrieve is executed on the mnode, so return tables that belongs to the information schema database.
    if (pInfo->readHandle.mnd != NULL) {
      buildSysDbTableInfo(pInfo, pOperator->resultInfo.capacity);

      doFilterResult(pInfo);
      pInfo->loadInfo.totalRows += pInfo->pRes->info.rows;

      doSetOperatorCompleted(pOperator);
      return (pInfo->pRes->info.rows == 0) ? NULL : pInfo->pRes;
    } else {
      if (pInfo->pCur == NULL) {
        pInfo->pCur = metaOpenTbCursor(pInfo->readHandle.meta);
      }

      blockDataCleanup(pInfo->pRes);

      int32_t numOfRows = 0;

      const char* db = NULL;
      int32_t     vgId = 0;
      vnodeGetInfo(pInfo->readHandle.vnode, &db, &vgId);

      // database name of this vnode, stored as a var-length string
      SName sn = {0};
      char  dbname[TSDB_DB_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
      tNameFromString(&sn, db, T_NAME_ACCT | T_NAME_DB);

      tNameGetDbName(&sn, varDataVal(dbname));
      varDataSetLen(dbname, strlen(varDataVal(dbname)));

      // NOTE(review): p is not released in this function
      SSDataBlock* p = buildSysTableMetaBlock();
      blockDataEnsureCapacity(p, pOperator->resultInfo.capacity);

      char n[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0};

      int32_t ret = 0;
      while ((ret = metaTbCursorNext(pInfo->pCur)) == 0) {
        STR_TO_VARSTR(n, pInfo->pCur->mr.me.name);

        // table name
        SColumnInfoData* pColInfoData = taosArrayGet(p->pDataBlock, 0);
        colDataAppend(pColInfoData, numOfRows, n, false);

        // database name
        pColInfoData = taosArrayGet(p->pDataBlock, 1);
        colDataAppend(pColInfoData, numOfRows, dbname, false);

        // vgId
        pColInfoData = taosArrayGet(p->pDataBlock, 6);
        colDataAppend(pColInfoData, numOfRows, (char*)&vgId, false);

        // table comment
        // todo: set the correct comment
        pColInfoData = taosArrayGet(p->pDataBlock, 8);
        colDataAppendNULL(pColInfoData, numOfRows);

        char    str[256] = {0};
        int32_t tableType = pInfo->pCur->mr.me.type;
        if (tableType == TSDB_CHILD_TABLE) {
          // create time
          int64_t ts = pInfo->pCur->mr.me.ctbEntry.ctime;
          pColInfoData = taosArrayGet(p->pDataBlock, 2);
          colDataAppend(pColInfoData, numOfRows, (char*)&ts, false);

          // the column count and the super table name come from the super table's entry
          SMetaReader mr = {0};
          metaReaderInit(&mr, pInfo->readHandle.meta, 0);
          metaGetTableEntryByUid(&mr, pInfo->pCur->mr.me.ctbEntry.suid);

          // number of columns
          pColInfoData = taosArrayGet(p->pDataBlock, 3);
          colDataAppend(pColInfoData, numOfRows, (char*)&mr.me.stbEntry.schemaRow.nCols, false);

          // super table name
          STR_TO_VARSTR(str, mr.me.name);
          pColInfoData = taosArrayGet(p->pDataBlock, 4);
          colDataAppend(pColInfoData, numOfRows, str, false);
          metaReaderClear(&mr);

          // uid
          pColInfoData = taosArrayGet(p->pDataBlock, 5);
          colDataAppend(pColInfoData, numOfRows, (char*)&pInfo->pCur->mr.me.uid, false);

          // ttl
          pColInfoData = taosArrayGet(p->pDataBlock, 7);
          colDataAppend(pColInfoData, numOfRows, (char*)&pInfo->pCur->mr.me.ctbEntry.ttlDays, false);

          STR_TO_VARSTR(str, "CHILD_TABLE");
        } else if (tableType == TSDB_NORMAL_TABLE) {
          // create time
          pColInfoData = taosArrayGet(p->pDataBlock, 2);
          colDataAppend(pColInfoData, numOfRows, (char*)&pInfo->pCur->mr.me.ntbEntry.ctime, false);

          // number of columns
          pColInfoData = taosArrayGet(p->pDataBlock, 3);
          colDataAppend(pColInfoData, numOfRows, (char*)&pInfo->pCur->mr.me.ntbEntry.schemaRow.nCols, false);

          // super table name
          pColInfoData = taosArrayGet(p->pDataBlock, 4);
          colDataAppendNULL(pColInfoData, numOfRows);

          // uid
          pColInfoData = taosArrayGet(p->pDataBlock, 5);
          colDataAppend(pColInfoData, numOfRows, (char*)&pInfo->pCur->mr.me.uid, false);

          // ttl
          pColInfoData = taosArrayGet(p->pDataBlock, 7);
          colDataAppend(pColInfoData, numOfRows, (char*)&pInfo->pCur->mr.me.ntbEntry.ttlDays, false);

          STR_TO_VARSTR(str, "NORMAL_TABLE");
        }

        // table type
        pColInfoData = taosArrayGet(p->pDataBlock, 9);
        colDataAppend(pColInfoData, numOfRows, str, false);

        if (++numOfRows >= pOperator->resultInfo.capacity) {
          break;
        }
      }

      // todo temporarily free the cursor here, the true reason why the free is not valid needs to be found
      if (ret != 0) {
        metaCloseTbCursor(pInfo->pCur);
        pInfo->pCur = NULL;
        doSetOperatorCompleted(pOperator);
      }

      p->info.rows = numOfRows;
      pInfo->pRes->info.rows = numOfRows;

      relocateColumnData(pInfo->pRes, pInfo->scanCols, p->pDataBlock);
      doFilterResult(pInfo);

      pInfo->loadInfo.totalRows += pInfo->pRes->info.rows;
      return (pInfo->pRes->info.rows == 0) ? NULL : pInfo->pRes;
    }
  } else {  // load the meta from mnode of the given epset
    while (1) {
      int64_t startTs = taosGetTimestampUs();
      strncpy(pInfo->req.tb, tNameGetTableName(&pInfo->name), tListLen(pInfo->req.tb));

      if (pInfo->showRewrite) {
        char dbName[TSDB_DB_NAME_LEN] = {0};
        getDBNameFromCondition(pInfo->pCondition, dbName);
        sprintf(pInfo->req.db, "%d.%s", pInfo->accountId, dbName);
      }

      // first call computes the serialized length, second call fills the buffer
      int32_t contLen = tSerializeSRetrieveTableReq(NULL, 0, &pInfo->req);
      char*   buf1 = taosMemoryCalloc(1, contLen);
      if (NULL == buf1) {
        qError("%s prepare message %d failed", GET_TASKID(pTaskInfo), contLen);
        pTaskInfo->code = TSDB_CODE_QRY_OUT_OF_MEMORY;
        return NULL;
      }
      tSerializeSRetrieveTableReq(buf1, contLen, &pInfo->req);

      // send the fetch remote task result request
      SMsgSendInfo* pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
      if (NULL == pMsgSendInfo) {
        qError("%s prepare message %d failed", GET_TASKID(pTaskInfo), (int32_t)sizeof(SMsgSendInfo));
        taosMemoryFreeClear(buf1);  // the request buffer was not handed over yet
        pTaskInfo->code = TSDB_CODE_QRY_OUT_OF_MEMORY;
        return NULL;
      }

      pMsgSendInfo->param = pOperator;
      pMsgSendInfo->msgInfo.pData = buf1;
      pMsgSendInfo->msgInfo.len = contLen;
      pMsgSendInfo->msgType = TDMT_MND_SYSTABLE_RETRIEVE;
      pMsgSendInfo->fp = loadSysTableCallback;

      // NOTE(review): the send result is not inspected; the code below relies on the callback
      // posting `ready` -- confirm that this also happens when the send itself fails.
      int64_t transporterId = 0;
      int32_t code =
          asyncSendMsgToServer(pInfo->readHandle.pMsgCb->clientRpc, &pInfo->epSet, &transporterId, pMsgSendInfo);
      tsem_wait(&pInfo->ready);

      if (pTaskInfo->code) {
        qDebug("%s load meta data from mnode failed, totalRows:%" PRIu64 ", code:%s", GET_TASKID(pTaskInfo),
               pInfo->loadInfo.totalRows, tstrerror(pTaskInfo->code));
        return NULL;
      }

      SRetrieveMetaTableRsp* pRsp = pInfo->pRsp;
      pInfo->req.showId = pRsp->handle;

      if (pRsp->numOfRows == 0 || pRsp->completed) {
        pOperator->status = OP_EXEC_DONE;
        qDebug("%s load meta data from mnode completed, rowsOfSource:%d, totalRows:%" PRIu64 " ", GET_TASKID(pTaskInfo),
               pRsp->numOfRows, pInfo->loadInfo.totalRows);

        if (pRsp->numOfRows == 0) {
          return NULL;
        }
      }

      SRetrieveMetaTableRsp* pTableRsp = pInfo->pRsp;
      setSDataBlockFromFetchRsp(pInfo->pRes, &pInfo->loadInfo, pTableRsp->numOfRows, pTableRsp->data,
                                pTableRsp->compLen, pOperator->numOfExprs, startTs, NULL, pInfo->scanCols);

      // todo log the filter info
      doFilterResult(pInfo);
      if (pInfo->pRes->info.rows > 0) {
        return pInfo->pRes;
      }
    }
  }
}

1407
int32_t buildSysDbTableInfo(const SSysTableScanInfo* pInfo, int32_t capacity) {
1408
  SSDataBlock* p = buildSysTableMetaBlock();
1409
  blockDataEnsureCapacity(p, capacity);
1410

L
Liu Jicong 已提交
1411
  size_t               size = 0;
1412 1413 1414 1415 1416 1417 1418 1419
  const SSysTableMeta* pSysDbTableMeta = NULL;

  getInfosDbMeta(&pSysDbTableMeta, &size);
  p->info.rows = buildDbTableInfoBlock(p, pSysDbTableMeta, size, TSDB_INFORMATION_SCHEMA_DB);

  getPerfDbMeta(&pSysDbTableMeta, &size);
  p->info.rows = buildDbTableInfoBlock(p, pSysDbTableMeta, size, TSDB_PERFORMANCE_SCHEMA_DB);

L
Liu Jicong 已提交
1420 1421
  relocateColumnData(pInfo->pRes, pInfo->scanCols, p->pDataBlock);
  //  blockDataDestroy(p);  todo handle memory leak
1422 1423 1424 1425 1426

  pInfo->pRes->info.rows = p->info.rows;
  return p->info.rows;
}

L
Liu Jicong 已提交
1427 1428 1429
int32_t buildDbTableInfoBlock(const SSDataBlock* p, const SSysTableMeta* pSysDbTableMeta, size_t size,
                              const char* dbName) {
  char    n[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
1430 1431
  int32_t numOfRows = p->info.rows;

L
Liu Jicong 已提交
1432
  for (int32_t i = 0; i < size; ++i) {
1433 1434 1435 1436 1437 1438 1439 1440 1441 1442 1443 1444 1445 1446 1447 1448 1449 1450 1451 1452
    const SSysTableMeta* pm = &pSysDbTableMeta[i];

    SColumnInfoData* pColInfoData = taosArrayGet(p->pDataBlock, 0);

    STR_TO_VARSTR(n, pm->name);
    colDataAppend(pColInfoData, numOfRows, n, false);

    // database name
    STR_TO_VARSTR(n, dbName);
    pColInfoData = taosArrayGet(p->pDataBlock, 1);
    colDataAppend(pColInfoData, numOfRows, n, false);

    // create time
    pColInfoData = taosArrayGet(p->pDataBlock, 2);
    colDataAppendNULL(pColInfoData, numOfRows);

    // number of columns
    pColInfoData = taosArrayGet(p->pDataBlock, 3);
    colDataAppend(pColInfoData, numOfRows, (char*)&pm->colNum, false);

L
Liu Jicong 已提交
1453
    for (int32_t j = 4; j <= 8; ++j) {
1454 1455 1456 1457 1458 1459 1460 1461 1462 1463 1464 1465 1466 1467 1468
      pColInfoData = taosArrayGet(p->pDataBlock, j);
      colDataAppendNULL(pColInfoData, numOfRows);
    }

    STR_TO_VARSTR(n, "SYSTEM_TABLE");

    pColInfoData = taosArrayGet(p->pDataBlock, 9);
    colDataAppend(pColInfoData, numOfRows, n, false);

    numOfRows += 1;
  }

  return numOfRows;
}

1469
SOperatorInfo* createSysTableScanOperatorInfo(void* readHandle, SSDataBlock* pResBlock, const SName* pName,
H
Haojun Liao 已提交
1470 1471 1472 1473 1474 1475 1476 1477 1478 1479 1480
                                              SNode* pCondition, SEpSet epset, SArray* colList,
                                              SExecTaskInfo* pTaskInfo, bool showRewrite, int32_t accountId) {
  SSysTableScanInfo* pInfo = taosMemoryCalloc(1, sizeof(SSysTableScanInfo));
  SOperatorInfo*     pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
  if (pInfo == NULL || pOperator == NULL) {
    taosMemoryFreeClear(pInfo);
    taosMemoryFreeClear(pOperator);
    terrno = TSDB_CODE_QRY_OUT_OF_MEMORY;
    return NULL;
  }

L
Liu Jicong 已提交
1481
  pInfo->accountId = accountId;
H
Haojun Liao 已提交
1482
  pInfo->showRewrite = showRewrite;
L
Liu Jicong 已提交
1483 1484 1485
  pInfo->pRes = pResBlock;
  pInfo->pCondition = pCondition;
  pInfo->scanCols = colList;
1486 1487

  initResultSizeInfo(pOperator, 4096);
H
Haojun Liao 已提交
1488 1489

  tNameAssign(&pInfo->name, pName);
1490 1491
  const char* name = tNameGetTableName(&pInfo->name);
  if (strncasecmp(name, TSDB_INS_TABLE_USER_TABLES, TSDB_TABLE_FNAME_LEN) == 0) {
L
Liu Jicong 已提交
1492
    pInfo->readHandle = *(SReadHandle*)readHandle;
1493
    blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity);
H
Haojun Liao 已提交
1494 1495 1496
  } else {
    tsem_init(&pInfo->ready, 0, 0);
    pInfo->epSet = epset;
1497
    pInfo->readHandle = *(SReadHandle*)readHandle;
H
Haojun Liao 已提交
1498 1499
  }

L
Liu Jicong 已提交
1500
  pOperator->name = "SysTableScanOperator";
H
Haojun Liao 已提交
1501
  pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN;
L
Liu Jicong 已提交
1502 1503 1504 1505 1506 1507
  pOperator->blocking = false;
  pOperator->status = OP_NOT_OPENED;
  pOperator->info = pInfo;
  pOperator->numOfExprs = pResBlock->info.numOfCols;
  pOperator->fpSet =
      createOperatorFpSet(operatorDummyOpenFn, doSysTableScan, NULL, NULL, destroySysScanOperator, NULL, NULL, NULL);
1508
  pOperator->pTaskInfo = pTaskInfo;
H
Haojun Liao 已提交
1509 1510 1511

  return pOperator;
}
H
Haojun Liao 已提交
1512

1513
/*
 * Produce the next block of tag-scan output.
 *
 * Starting at pInfo->curPos, walks the operator's table list and writes one row per table into
 * the result block, stopping when either the list is exhausted or resultInfo.capacity rows have
 * been written. For every output expression the destination column is looked up through the
 * expression's result slot id; scan pseudo-column functions receive the table name, every other
 * expression is treated as a tag value fetched from the table's meta entry.
 *
 * Returns NULL when the operator is already done, when the table list is empty, or when no row
 * was produced; otherwise returns the operator's result block. The task is marked
 * TASK_COMPLETED once the last table has been consumed (or immediately if the list is empty).
 */
static SSDataBlock* doTagScan(SOperatorInfo* pOperator) {
  if (pOperator->status == OP_EXEC_DONE) {
    return NULL;
  }

  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
  STagScanInfo*  pInfo = pOperator->info;
  SExprInfo*     pExprInfo = &pOperator->pExpr[0];
  SSDataBlock*   pRes = pInfo->pRes;

  int32_t numOfTables = taosArrayGetSize(pInfo->pTableList->pTableList);
  if (numOfTables == 0) {
    setTaskStatus(pTaskInfo, TASK_COMPLETED);
    return NULL;
  }

  char        nameBuf[512] = {0};  // scratch buffer for the var-string encoded table name
  int32_t     rows = 0;
  SMetaReader reader = {0};
  metaReaderInit(&reader, pInfo->readHandle.meta, 0);

  while (pInfo->curPos < numOfTables && rows < pOperator->resultInfo.capacity) {
    STableKeyInfo* pTable = taosArrayGet(pInfo->pTableList->pTableList, pInfo->curPos);

    // NOTE(review): the lookup result is not checked; if it can fail, reader.me would not describe
    // this table when it is read below - confirm whether a failure needs handling here.
    metaGetTableEntryByUid(&reader, pTable->uid);

    for (int32_t k = 0; k < pOperator->numOfExprs; ++k) {
      SExprInfo*       pOneExpr = &pExprInfo[k];
      SColumnInfoData* pCol = taosArrayGet(pRes->pDataBlock, pOneExpr->base.resSchema.slotId);

      // refactor later
      if (fmIsScanPseudoColumnFunc(pOneExpr->pExpr->_function.functionId)) {
        STR_TO_VARSTR(nameBuf, reader.me.name);
        colDataAppend(pCol, rows, nameBuf, false);
        continue;
      }

      // anything else is a tag value
      STagVal tagVal = {0};
      tagVal.cid = pOneExpr->base.pParam[0].pCol->colId;
      const char* pTag = metaGetTableTagVal(&reader.me, pCol->info.type, &tagVal);

      // JSON tags and missing tags are appended as returned; all other tags go through tTagValToData
      bool  converted = (pCol->info.type != TSDB_DATA_TYPE_JSON) && (pTag != NULL);
      char* pData = converted ? tTagValToData((const STagVal*)pTag, false) : (char*)pTag;

      colDataAppend(pCol, rows, pData, (pData == NULL));

      // the converted buffer is released only for variable-length tag types
      if (converted && IS_VAR_DATA_TYPE(((const STagVal*)pTag)->type) && pData != NULL) {
        taosMemoryFree(pData);
      }
    }

    rows += 1;
    pInfo->curPos += 1;
    if (pInfo->curPos >= numOfTables) {
      doSetOperatorCompleted(pOperator);
    }
  }

  metaReaderClear(&reader);

  // qDebug("QInfo:0x%"PRIx64" create tag values results completed, rows:%d", GET_TASKID(pRuntimeEnv), count);
  if (pOperator->status == OP_EXEC_DONE) {
    setTaskStatus(pTaskInfo, TASK_COMPLETED);
  }

  pRes->info.rows = rows;
  pOperator->resultInfo.totalRows += rows;

  if (pRes->info.rows == 0) {
    return NULL;
  }
  return pInfo->pRes;
}

static void destroyTagScanOperatorInfo(void* param, int32_t numOfOutput) {
  STagScanInfo* pInfo = (STagScanInfo*)param;
  pInfo->pRes = blockDataDestroy(pInfo->pRes);
}

1665
SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, SExprInfo* pExpr, int32_t numOfOutput,
C
Cary Xu 已提交
1666 1667
                                         SSDataBlock* pResBlock, SArray* pColMatchInfo, STableListInfo* pTableListInfo,
                                         SExecTaskInfo* pTaskInfo) {
1668
  STagScanInfo*  pInfo = taosMemoryCalloc(1, sizeof(STagScanInfo));
H
Haojun Liao 已提交
1669 1670 1671 1672 1673
  SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
  if (pInfo == NULL || pOperator == NULL) {
    goto _error;
  }

wmmhello's avatar
wmmhello 已提交
1674
  pInfo->pTableList = pTableListInfo;
L
Liu Jicong 已提交
1675 1676 1677 1678 1679
  pInfo->pColMatchInfo = pColMatchInfo;
  pInfo->pRes = pResBlock;
  pInfo->readHandle = *pReadHandle;
  pInfo->curPos = 0;
  pOperator->name = "TagScanOperator";
1680
  pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN;
L
Liu Jicong 已提交
1681 1682 1683 1684 1685 1686
  pOperator->blocking = false;
  pOperator->status = OP_NOT_OPENED;
  pOperator->info = pInfo;
  pOperator->pExpr = pExpr;
  pOperator->numOfExprs = numOfOutput;
  pOperator->pTaskInfo = pTaskInfo;
1687

1688 1689 1690
  initResultSizeInfo(pOperator, 4096);
  blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity);

1691 1692
  pOperator->fpSet =
      createOperatorFpSet(operatorDummyOpenFn, doTagScan, NULL, NULL, destroyTagScanOperatorInfo, NULL, NULL, NULL);
H
Haojun Liao 已提交
1693 1694

  return pOperator;
1695

1696
_error:
H
Haojun Liao 已提交
1697 1698 1699 1700 1701
  taosMemoryFree(pInfo);
  taosMemoryFree(pOperator);
  terrno = TSDB_CODE_OUT_OF_MEMORY;
  return NULL;
}