scanoperator.c 88.4 KB
Newer Older
H
Haojun Liao 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

16
#include <executorimpl.h>
17
#include <vnode.h>
18
#include "filter.h"
19
#include "function.h"
20
#include "functionMgt.h"
H
Haojun Liao 已提交
21 22
#include "os.h"
#include "querynodes.h"
23
#include "systable.h"
24
#include "tglobal.h"
H
Haojun Liao 已提交
25
#include "tname.h"
26
#include "ttime.h"
H
Haojun Liao 已提交
27 28 29 30 31 32 33 34 35

#include "tdatablock.h"
#include "tmsg.h"

#include "executorimpl.h"
#include "query.h"
#include "tcompare.h"
#include "thash.h"
#include "ttypes.h"
36
#include "vnode.h"
H
Haojun Liao 已提交
37

wmmhello's avatar
wmmhello 已提交
38 39
#include "executorInt.h"

H
Haojun Liao 已提交
40
#define SET_REVERSE_SCAN_FLAG(_info) ((_info)->scanFlag = REVERSE_SCAN)
41
#define SWITCH_ORDER(n)              (((n) = ((n) == TSDB_ORDER_ASC) ? TSDB_ORDER_DESC : TSDB_ORDER_ASC))
42

43
static int32_t buildSysDbTableInfo(const SSysTableScanInfo* pInfo, int32_t capacity);
L
Liu Jicong 已提交
44 45
static int32_t buildDbTableInfoBlock(const SSDataBlock* p, const SSysTableMeta* pSysDbTableMeta, size_t size,
                                     const char* dbName);
46

47 48 49
static void addTagPseudoColumnData(SReadHandle* pHandle, SExprInfo* pPseudoExpr, int32_t numOfPseudoExpr,
                                   SSDataBlock* pBlock);
static bool processBlockWithProbability(const SSampleExecInfo* pInfo);
50

51
/*
 * Decide whether the current data block takes part in the scan.
 *
 * The sampling logic is compiled out (see the "#if 0" branch), so the function
 * accepts every block and does not read pInfo.
 */
bool processBlockWithProbability(const SSampleExecInfo* pInfo) {
#if 0
  if (pInfo->sampleRatio == 1) {
    return true;
  }

  uint32_t val = taosRandR((uint32_t*) &pInfo->seed);
  return (val % ((uint32_t)(1/pInfo->sampleRatio))) == 0;
#else
  (void)pInfo;  // unused while sampling is disabled

  const bool acceptBlock = true;
  return acceptBlock;
#endif
}

64
/*
 * Flip the order field (ascending <-> descending) of the first numOfOutput
 * function contexts in pCtx.
 */
static void switchCtxOrder(SqlFunctionCtx* pCtx, int32_t numOfOutput) {
  int32_t idx = 0;
  while (idx < numOfOutput) {
    SqlFunctionCtx* pCur = &pCtx[idx];
    SWITCH_ORDER(pCur->order);
    idx += 1;
  }
}

/*
 * Placeholder for re-deriving the per-table query range before a reverse scan.
 * The whole implementation is compiled out, so this function currently does
 * nothing with pTableScanInfo.
 */
static void setupQueryRangeForReverseScan(STableScanInfo* pTableScanInfo) {
  (void)pTableScanInfo;  // unused while the implementation below is disabled
#if 0
  int32_t numOfGroups = (int32_t)(GET_NUM_OF_TABLEGROUP(pRuntimeEnv));
  for(int32_t i = 0; i < numOfGroups; ++i) {
    SArray *group = GET_TABLEGROUP(pRuntimeEnv, i);
    SArray *tableKeyGroup = taosArrayGetP(pQueryAttr->tableGroupInfo.pGroupList, i);

    size_t t = taosArrayGetSize(group);
    for (int32_t j = 0; j < t; ++j) {
      STableQueryInfo *pCheckInfo = taosArrayGetP(group, j);
      updateTableQueryInfoForReverseScan(pCheckInfo);

      // update the last key in tableKeyInfo list, the tableKeyInfo is used to build the tsdbQueryHandle and decide
      // the start check timestamp of tsdbQueryHandle
//      STableKeyInfo *pTableKeyInfo = taosArrayGet(tableKeyGroup, j);
//      pTableKeyInfo->lastKey = pCheckInfo->lastKey;
//
//      assert(pCheckInfo->pTable == pTableKeyInfo->pTable);
    }
  }
#endif
}

93 94 95 96 97 98 99 100 101
/*
 * Advance the time window *tw by one step in the direction given by order.
 *
 * For every interval unit except 'n' and 'y' the window start is shifted by
 * the sliding value and the end is derived from the interval length. For 'n'
 * and 'y' the step is done on a broken-down local time in whole months (a 'y'
 * interval counts as 12 months each), then converted back to the precision of
 * pInterval.
 */
static void getNextTimeWindow(SInterval* pInterval, STimeWindow* tw, int32_t order) {
  int32_t direction = GET_FORWARD_DIRECTION_FACTOR(order);

  bool monthBased = (pInterval->intervalUnit == 'n') || (pInterval->intervalUnit == 'y');
  if (!monthBased) {
    tw->skey += pInterval->sliding * direction;
    tw->ekey = tw->skey + pInterval->interval - 1;
    return;
  }

  // the window start expressed in seconds
  int64_t startSec = convertTimePrecision(tw->skey, pInterval->precision, TSDB_TIME_PRECISION_MILLI) / 1000;

  // interval length in months
  int64_t months = pInterval->interval;
  if (pInterval->intervalUnit == 'y') {
    months *= 12;
  }

  time_t    startTime = (time_t)startSec;
  struct tm brokenDown;
  taosLocalTime(&startTime, &brokenDown);

  // new window start: shift by one interval in the scan direction
  int monthIndex = (int)(brokenDown.tm_year * 12 + brokenDown.tm_mon + months * direction);
  brokenDown.tm_year = monthIndex / 12;
  brokenDown.tm_mon = monthIndex % 12;
  tw->skey =
      convertTimePrecision((int64_t)taosMktime(&brokenDown) * 1000LL, TSDB_TIME_PRECISION_MILLI, pInterval->precision);

  // new window end: one interval after the new start, minus one time unit
  monthIndex = (int)(monthIndex + months);
  brokenDown.tm_year = monthIndex / 12;
  brokenDown.tm_mon = monthIndex % 12;
  tw->ekey =
      convertTimePrecision((int64_t)taosMktime(&brokenDown) * 1000LL, TSDB_TIME_PRECISION_MILLI, pInterval->precision);
  tw->ekey -= 1;
}

126
/*
 * Report whether the time range of a data block crosses an interval window
 * boundary, walking the windows in the given scan order.
 *
 * Returns false immediately when pInterval->interval is 0, i.e. when the
 * upstream operator is not an interval operator.
 */
static bool overlapWithTimeWindow(SInterval* pInterval, SDataBlockInfo* pBlockInfo, int32_t order) {
  // 0 by default, which means the upstream operator is not an interval operator.
  if (pInterval->interval == 0) {
    return false;
  }

  STimeWindow   win = {0};
  const int64_t blockStart = pBlockInfo->window.skey;
  const int64_t blockEnd = pBlockInfo->window.ekey;

  if (order != TSDB_ORDER_ASC) {
    // descending: start from the window aligned to the block end and walk backwards
    getAlignQueryTimeWindow(pInterval, pInterval->precision, blockEnd, &win);
    assert(win.skey <= blockEnd);

    if (win.skey > blockStart) {
      return true;
    }

    for (;;) {
      getNextTimeWindow(pInterval, &win, order);
      if (win.ekey < blockStart) {
        return false;
      }

      assert(win.skey < blockStart);
      if (win.ekey < blockEnd && win.ekey >= blockStart) {
        return true;
      }
    }
  }

  // ascending: start from the window aligned to the block start and walk forwards
  getAlignQueryTimeWindow(pInterval, pInterval->precision, blockStart, &win);
  assert(win.ekey >= blockStart);

  if (win.ekey < blockEnd) {
    return true;
  }

  for (;;) {
    getNextTimeWindow(pInterval, &win, order);
    if (win.skey > blockEnd) {
      return false;
    }

    assert(win.ekey > blockEnd);
    if (win.skey <= blockEnd && win.skey > blockStart) {
      return true;
    }
  }
}

L
Liu Jicong 已提交
177 178
/*
 * Decide how much of the current file block is required and load it into pBlock.
 *
 * The initial requirement comes from the operator's dataBlockLoadFlag and is
 * raised to a full data load when a filter is present or when the block range
 * crosses an interval window boundary. The decision is written to *status:
 *   - FUNC_DATA_REQUIRED_FILTEROUT : block dropped, nothing loaded
 *   - FUNC_DATA_REQUIRED_NOT_LOAD  : block skipped, column data pointers cleared
 *   - FUNC_DATA_REQUIRED_STATIS_LOAD: only the block statistics are attached;
 *     falls back to a full data load when the statistics are not available
 *   - FUNC_DATA_REQUIRED_DATA_LOAD : column data loaded, pseudo columns filled
 *     and the filter applied
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_OUT_OF_MEMORY when the statistics
 * pointer array cannot be allocated, or terrno when the block data cannot be
 * retrieved.
 */
static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableScanInfo, SSDataBlock* pBlock,
                             uint32_t* status) {
  SExecTaskInfo*  pTaskInfo = pOperator->pTaskInfo;
  STableScanInfo* pInfo = pOperator->info;

  SFileBlockLoadRecorder* pCost = &pTableScanInfo->readRecorder;

  pCost->totalBlocks += 1;
  pCost->totalRows += pBlock->info.rows;

  *status = pInfo->dataBlockLoadFlag;
  if (pTableScanInfo->pFilterNode != NULL ||
      overlapWithTimeWindow(&pTableScanInfo->interval, &pBlock->info, pTableScanInfo->cond.order)) {
    (*status) = FUNC_DATA_REQUIRED_DATA_LOAD;
  }

  SDataBlockInfo* pBlockInfo = &pBlock->info;
  taosMemoryFreeClear(pBlock->pBlockAgg);

  if (*status == FUNC_DATA_REQUIRED_FILTEROUT) {
    qDebug("%s data block filter out, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo),
           pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows);
    pCost->filterOutBlocks += 1;
    return TSDB_CODE_SUCCESS;
  } else if (*status == FUNC_DATA_REQUIRED_NOT_LOAD) {
    qDebug("%s data block skipped, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo),
           pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows);
    pCost->skipBlocks += 1;

    // clear all data in pBlock that were set when handling the previous block
    int32_t numOfBlockCols = (int32_t)taosArrayGetSize(pBlock->pDataBlock);
    for (int32_t i = 0; i < numOfBlockCols; ++i) {
      SColumnInfoData* pcol = taosArrayGet(pBlock->pDataBlock, i);
      pcol->pData = NULL;
    }

    return TSDB_CODE_SUCCESS;
  } else if (*status == FUNC_DATA_REQUIRED_STATIS_LOAD) {
    pCost->loadBlockStatis += 1;

    bool             allColumnsHaveAgg = true;
    SColumnDataAgg** pColAgg = NULL;
    tsdbRetrieveDataBlockStatisInfo(pTableScanInfo->dataReader, &pColAgg, &allColumnsHaveAgg);

    if (allColumnsHaveAgg == true) {
      int32_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);

      // todo create this buffer during creating operator
      if (pBlock->pBlockAgg == NULL) {
        pBlock->pBlockAgg = taosMemoryCalloc(numOfCols, POINTER_BYTES);
        if (pBlock->pBlockAgg == NULL) {
          // the array is indexed right below, so a failed allocation must not be ignored
          return TSDB_CODE_OUT_OF_MEMORY;
        }
      }

      int32_t numOfMatch = (int32_t)taosArrayGetSize(pTableScanInfo->pColMatchInfo);
      for (int32_t i = 0; i < numOfMatch; ++i) {
        SColMatchInfo* pColMatchInfo = taosArrayGet(pTableScanInfo->pColMatchInfo, i);
        if (!pColMatchInfo->output) {
          continue;
        }
        pBlock->pBlockAgg[pColMatchInfo->targetSlotId] = pColAgg[i];
      }

      return TSDB_CODE_SUCCESS;
    } else {  // failed to load the block sma data, data block statistics does not exist, load data block instead
      *status = FUNC_DATA_REQUIRED_DATA_LOAD;
    }
  }

  ASSERT(*status == FUNC_DATA_REQUIRED_DATA_LOAD);

  // todo filter data block according to the block sma data firstly
#if 0
  if (!doFilterByBlockStatistics(pBlock->pBlockStatis, pTableScanInfo->pCtx, pBlockInfo->rows)) {
    pCost->filterOutBlocks += 1;
    qDebug("%s data block filter out, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo), pBlockInfo->window.skey,
           pBlockInfo->window.ekey, pBlockInfo->rows);
    (*status) = FUNC_DATA_REQUIRED_FILTEROUT;
    return TSDB_CODE_SUCCESS;
  }
#endif

  pCost->totalCheckedRows += pBlock->info.rows;
  pCost->loadBlocks += 1;

  SArray* pCols = tsdbRetrieveDataBlock(pTableScanInfo->dataReader, NULL);
  if (pCols == NULL) {
    return terrno;
  }

  relocateColumnData(pBlock, pTableScanInfo->pColMatchInfo, pCols, true);

  // currently only the tbname pseudo column
  if (pTableScanInfo->pseudoSup.numOfExprs > 0) {
    SExprSupp* pSup = &pTableScanInfo->pseudoSup;
    addTagPseudoColumnData(&pTableScanInfo->readHandle, pSup->pExprInfo, pSup->numOfExprs, pBlock);
  }

  // apply the filter and account the time spent in it
  int64_t st = taosGetTimestampMs();
  doFilter(pTableScanInfo->pFilterNode, pBlock);

  int64_t et = taosGetTimestampMs();
  pTableScanInfo->readRecorder.filterTime += (et - st);

  if (pBlock->info.rows == 0) {
    pCost->filterOutBlocks += 1;
    qDebug("%s data block filter out, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo),
           pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows);
  }

  return TSDB_CODE_SUCCESS;
}

286
/*
 * Switch the table scan into descending mode: mark the scan as reverse, flip
 * the order of the first numOfOutput function contexts, set the query
 * condition order to descending, swap start/end of every query time window and
 * re-sort the windows with compareTimeWindow.
 */
static void prepareForDescendingScan(STableScanInfo* pTableScanInfo, SqlFunctionCtx* pCtx, int32_t numOfOutput) {
  SQueryTableDataCond* pCond = &pTableScanInfo->cond;

  SET_REVERSE_SCAN_FLAG(pTableScanInfo);
  switchCtxOrder(pCtx, numOfOutput);

  pCond->order = TSDB_ORDER_DESC;

  int32_t winIdx = 0;
  while (winIdx < pCond->numOfTWindows) {
    STimeWindow* pWin = &pCond->twindows[winIdx];
    TSWAP(pWin->skey, pWin->ekey);
    winIdx += 1;
  }

  taosqsort(pCond->twindows, pCond->numOfTWindows, sizeof(STimeWindow), pCond, compareTimeWindow);
}

302 303
/*
 * Fill the pseudo columns (table name and tag columns) of pBlock.
 *
 * For each pseudo expression the destination column is resized to the block's
 * row count and cleaned, then either filled through setTbNameColData (scan
 * pseudo column functions) or with the tag value of the table identified by
 * pBlock->info.uid, repeated for every row.
 */
void addTagPseudoColumnData(SReadHandle* pHandle, SExprInfo* pPseudoExpr, int32_t numOfPseudoExpr,
                            SSDataBlock* pBlock) {
  // currently only the tbname pseudo column
  if (numOfPseudoExpr == 0) {
    return;
  }

  SMetaReader reader = {0};
  metaReaderInit(&reader, pHandle->meta, 0);
  metaGetTableEntryByUid(&reader, pBlock->info.uid);

  for (int32_t exprIdx = 0; exprIdx < numOfPseudoExpr; ++exprIdx) {
    SExprInfo*       pExpr = pPseudoExpr + exprIdx;
    int32_t          dstSlotId = pExpr->base.resSchema.slotId;
    SColumnInfoData* pDstCol = taosArrayGet(pBlock->pDataBlock, dstSlotId);

    colInfoDataEnsureCapacity(pDstCol, pBlock->info.rows);
    colInfoDataCleanup(pDstCol, pBlock->info.rows);

    int32_t functionId = pExpr->pExpr->_function.functionId;

    // this is to handle the tbname
    if (fmIsScanPseudoColumnFunc(functionId)) {
      setTbNameColData(pHandle->meta, pBlock, pDstCol, functionId);
      continue;
    }

    // everything else is a tag column
    STagVal tagVal = {0};
    tagVal.cid = pExpr->base.pParam[0].pCol->colId;
    const char* pTag = metaGetTableTagVal(&reader.me, pDstCol->info.type, &tagVal);

    const bool isJson = (pDstCol->info.type == TSDB_DATA_TYPE_JSON);
    const bool converted = (!isJson && pTag != NULL);

    // non-JSON tags are converted to column data; JSON tags are used as returned
    char* data = NULL;
    if (converted) {
      data = tTagValToData((const STagVal*)pTag, false);
    } else {
      data = (char*)pTag;
    }

    for (int32_t row = 0; row < pBlock->info.rows; ++row) {
      colDataAppend(pDstCol, row, data, (data == NULL) || (isJson && tTagIsJsonNull(data)));
    }

    // only the converted buffer of a variable-length tag is released here
    if (data != NULL && converted && IS_VAR_DATA_TYPE(((const STagVal*)pTag)->type)) {
      taosMemoryFree(data);
    }
  }

  metaReaderClear(&reader);
}

355 356 357 358
/*
 * Fill pColInfoData by running the scalar function identified by functionId
 * on the uid of pBlock.
 *
 * A temporary one-row BIGINT column holding pBlock->info.uid is built as the
 * function input; pMeta is handed to the function through the scalar
 * parameter, and the number of rows is taken from pBlock->info.rows.
 */
void setTbNameColData(void* pMeta, const SSDataBlock* pBlock, SColumnInfoData* pColInfoData, int32_t functionId) {
  struct SScalarFuncExecFuncs fpSet = {0};
  fmGetScalarFuncExecFuncs(functionId, &fpSet);

  SColumnInfoData infoData = createColumnInfoData(TSDB_DATA_TYPE_BIGINT, sizeof(uint64_t), 1);
  colInfoDataEnsureCapacity(&infoData, 1);

  colDataAppendInt64(&infoData, 0, (int64_t*)&pBlock->info.uid);
  SScalarParam srcParam = {.numOfRows = pBlock->info.rows, .param = pMeta, .columnData = &infoData};

  SScalarParam param = {.columnData = pColInfoData};
  fpSet.process(&srcParam, 1, &param);

  // release the buffer of the temporary input column; it was leaked on every call before
  colDataDestroy(&infoData);
}

369
/*
 * Fetch the next non-empty data block from the reader of the table scan.
 *
 * Blocks that are filtered out or end up with zero rows are skipped. The
 * function leaves through longjmp when the task has been killed or when
 * loading a block fails. Returns the operator's result block, or NULL when
 * the reader has no more blocks.
 */
static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) {
  STableScanInfo* pScan = pOperator->info;
  SExecTaskInfo*  pTask = pOperator->pTaskInfo;
  SSDataBlock*    pRes = pScan->pResBlock;

  const int64_t startUs = taosGetTimestampUs();

  for (;;) {
    if (!tsdbNextDataBlock(pScan->dataReader)) {
      return NULL;
    }

    if (isTaskKilled(pTask)) {
      longjmp(pTask->env, TSDB_CODE_TSC_QUERY_CANCELLED);
    }

    // process this data block based on the probabilities
    if (!processBlockWithProbability(&pScan->sample)) {
      continue;
    }

    blockDataCleanup(pRes);

    // fetch the block meta data and size the result block accordingly
    SDataBlockInfo blockInfo = pRes->info;
    tsdbRetrieveDataBlockInfo(pScan->dataReader, &blockInfo);

    blockInfo.capacity = blockInfo.rows;
    blockDataEnsureCapacity(pRes, blockInfo.rows);
    pRes->info = blockInfo;

    uint32_t loadStatus = 0;
    int32_t  rc = loadDataBlock(pOperator, pScan, pRes, &loadStatus);
    if (rc != TSDB_CODE_SUCCESS) {
      longjmp(pOperator->pTaskInfo->env, rc);
    }

    // the block was filtered out by the filter condition; continue with the next block
    if (loadStatus == FUNC_DATA_REQUIRED_FILTEROUT || pRes->info.rows == 0) {
      continue;
    }

    uint64_t* pGroupId = taosHashGet(pTask->tableqinfoList.map, &pRes->info.uid, sizeof(int64_t));
    if (pGroupId != NULL) {
      pRes->info.groupId = *pGroupId;
    }

    pOperator->resultInfo.totalRows = pScan->readRecorder.totalRows;
    pScan->readRecorder.elapsedTime += (taosGetTimestampUs() - startUs) / 1000.0;

    pOperator->cost.totalCost = pScan->readRecorder.elapsedTime;

    // todo refactor
    pScan->scanStatus.uid = pRes->info.uid;
    pScan->scanStatus.t = pRes->info.window.ekey;

    return pRes;
  }
}

wmmhello's avatar
wmmhello 已提交
427
static SSDataBlock* doTableScanGroup(SOperatorInfo* pOperator) {
H
Haojun Liao 已提交
428 429 430 431
  STableScanInfo* pTableScanInfo = pOperator->info;
  SExecTaskInfo*  pTaskInfo = pOperator->pTaskInfo;

  // The read handle is not initialized yet, since no qualified tables exists
432
  if (pTableScanInfo->dataReader == NULL || pOperator->status == OP_EXEC_DONE) {
H
Haojun Liao 已提交
433 434 435
    return NULL;
  }

436 437
  // do the ascending order traverse in the first place.
  while (pTableScanInfo->scanTimes < pTableScanInfo->scanInfo.numOfAsc) {
438 439 440 441 442 443 444 445 446
    while (pTableScanInfo->curTWinIdx < pTableScanInfo->cond.numOfTWindows) {
      SSDataBlock* p = doTableScanImpl(pOperator);
      if (p != NULL) {
        return p;
      }
      pTableScanInfo->curTWinIdx += 1;
      if (pTableScanInfo->curTWinIdx < pTableScanInfo->cond.numOfTWindows) {
        tsdbResetReadHandle(pTableScanInfo->dataReader, &pTableScanInfo->cond, pTableScanInfo->curTWinIdx);
      }
H
Haojun Liao 已提交
447 448
    }

449
    pTableScanInfo->scanTimes += 1;
450

451
    if (pTableScanInfo->scanTimes < pTableScanInfo->scanInfo.numOfAsc) {
452 453
      setTaskStatus(pTaskInfo, TASK_NOT_COMPLETED);
      pTableScanInfo->scanFlag = REPEAT_SCAN;
454 455 456
      qDebug("%s start to repeat ascending order scan data blocks due to query func required", GET_TASKID(pTaskInfo));
      for (int32_t i = 0; i < pTableScanInfo->cond.numOfTWindows; ++i) {
        STimeWindow* pWin = &pTableScanInfo->cond.twindows[i];
457
        qDebug("%s qrange:%" PRId64 "-%" PRId64, GET_TASKID(pTaskInfo), pWin->skey, pWin->ekey);
458
      }
459 460 461
      // do prepare for the next round table scan operation
      tsdbResetReadHandle(pTableScanInfo->dataReader, &pTableScanInfo->cond, 0);
      pTableScanInfo->curTWinIdx = 0;
H
Haojun Liao 已提交
462
    }
463
  }
H
Haojun Liao 已提交
464

465
  int32_t total = pTableScanInfo->scanInfo.numOfAsc + pTableScanInfo->scanInfo.numOfDesc;
466
  if (pTableScanInfo->scanTimes < total) {
467
    if (pTableScanInfo->cond.order == TSDB_ORDER_ASC) {
468
      prepareForDescendingScan(pTableScanInfo, pTableScanInfo->pCtx, 0);
469 470
      tsdbResetReadHandle(pTableScanInfo->dataReader, &pTableScanInfo->cond, 0);
      pTableScanInfo->curTWinIdx = 0;
471
    }
H
Haojun Liao 已提交
472

473 474 475
    qDebug("%s start to descending order scan data blocks due to query func required", GET_TASKID(pTaskInfo));
    for (int32_t i = 0; i < pTableScanInfo->cond.numOfTWindows; ++i) {
      STimeWindow* pWin = &pTableScanInfo->cond.twindows[i];
476
      qDebug("%s qrange:%" PRId64 "-%" PRId64, GET_TASKID(pTaskInfo), pWin->skey, pWin->ekey);
477
    }
478

479
    while (pTableScanInfo->scanTimes < total) {
480 481 482 483 484 485 486 487 488
      while (pTableScanInfo->curTWinIdx < pTableScanInfo->cond.numOfTWindows) {
        SSDataBlock* p = doTableScanImpl(pOperator);
        if (p != NULL) {
          return p;
        }
        pTableScanInfo->curTWinIdx += 1;
        if (pTableScanInfo->curTWinIdx < pTableScanInfo->cond.numOfTWindows) {
          tsdbResetReadHandle(pTableScanInfo->dataReader, &pTableScanInfo->cond, pTableScanInfo->curTWinIdx);
        }
489
      }
H
Haojun Liao 已提交
490

491
      pTableScanInfo->scanTimes += 1;
H
Haojun Liao 已提交
492

493
      if (pTableScanInfo->scanTimes < total) {
494 495
        setTaskStatus(pTaskInfo, TASK_NOT_COMPLETED);
        pTableScanInfo->scanFlag = REPEAT_SCAN;
H
Haojun Liao 已提交
496

497 498
        qDebug("%s start to repeat descending order scan data blocks due to query func required",
               GET_TASKID(pTaskInfo));
499 500
        for (int32_t i = 0; i < pTableScanInfo->cond.numOfTWindows; ++i) {
          STimeWindow* pWin = &pTableScanInfo->cond.twindows[i];
501
          qDebug("%s qrange:%" PRId64 "-%" PRId64, GET_TASKID(pTaskInfo), pWin->skey, pWin->ekey);
502
        }
503
        tsdbResetReadHandle(pTableScanInfo->dataReader, &pTableScanInfo->cond, 0);
504
        pTableScanInfo->curTWinIdx = 0;
505
      }
H
Haojun Liao 已提交
506 507 508
    }
  }

wmmhello's avatar
wmmhello 已提交
509 510 511 512 513 514 515
  return NULL;
}

/*
 * Return the next data block of the table scan, walking the table groups of
 * the task one after another.
 *
 * On the first call (currentGroupId == -1) the reader is opened for the first
 * group. When a group yields no more blocks, the reader is switched to the
 * next group and the scan counters are reset. The task is marked
 * TASK_COMPLETED and NULL is returned once every group has been consumed.
 *
 * Unlike the previous implementation, a group that produces no block no longer
 * ends the scan: all remaining groups are still visited.
 */
static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
  STableScanInfo* pInfo = pOperator->info;
  SExecTaskInfo*  pTaskInfo = pOperator->pTaskInfo;

  if (pInfo->currentGroupId == -1) {
    pInfo->currentGroupId++;
    if (pInfo->currentGroupId >= taosArrayGetSize(pTaskInfo->tableqinfoList.pGroupList)) {
      setTaskStatus(pTaskInfo, TASK_COMPLETED);
      return NULL;
    }

    SArray* tableList = taosArrayGetP(pTaskInfo->tableqinfoList.pGroupList, pInfo->currentGroupId);
    tsdbCleanupReadHandle(pInfo->dataReader);
    tsdbReaderT* pReader =
        tsdbReaderOpen(pInfo->readHandle.vnode, &pInfo->cond, tableList, pInfo->queryId, pInfo->taskId);
    pInfo->dataReader = pReader;
  }

  for (;;) {
    SSDataBlock* result = doTableScanGroup(pOperator);
    if (result) {
      return result;
    }

    // without a reader there is nothing to switch to the next group
    if (pInfo->dataReader == NULL) {
      setTaskStatus(pTaskInfo, TASK_COMPLETED);
      return NULL;
    }

    pInfo->currentGroupId++;
    if (pInfo->currentGroupId >= taosArrayGetSize(pTaskInfo->tableqinfoList.pGroupList)) {
      setTaskStatus(pTaskInfo, TASK_COMPLETED);
      return NULL;
    }

    // move the reader to the next group and restart the scan rounds
    SArray* tableList = taosArrayGetP(pTaskInfo->tableqinfoList.pGroupList, pInfo->currentGroupId);
    tsdbSetTableList(pInfo->dataReader, tableList);

    tsdbResetReadHandle(pInfo->dataReader, &pInfo->cond, 0);
    pInfo->curTWinIdx = 0;
    pInfo->scanTimes = 0;
  }
}

556 557
static int32_t getTableScannerExecInfo(struct SOperatorInfo* pOptr, void** pOptrExplain, uint32_t* len) {
  SFileBlockLoadRecorder* pRecorder = taosMemoryCalloc(1, sizeof(SFileBlockLoadRecorder));
558
  STableScanInfo*         pTableScanInfo = pOptr->info;
559 560 561 562 563 564
  *pRecorder = pTableScanInfo->readRecorder;
  *pOptrExplain = pRecorder;
  *len = sizeof(SFileBlockLoadRecorder);
  return 0;
}

565 566
static void destroyTableScanOperatorInfo(void* param, int32_t numOfOutput) {
  STableScanInfo* pTableScanInfo = (STableScanInfo*)param;
H
Haojun Liao 已提交
567
  blockDataDestroy(pTableScanInfo->pResBlock);
568
  cleanupQueryTableDataCond(&pTableScanInfo->cond);
H
Haojun Liao 已提交
569

570 571 572 573 574 575 576
  tsdbCleanupReadHandle(pTableScanInfo->dataReader);

  if (pTableScanInfo->pColMatchInfo != NULL) {
    taosArrayDestroy(pTableScanInfo->pColMatchInfo);
  }
}

wmmhello's avatar
wmmhello 已提交
577 578
/*
 * Create the table scan operator for the given physical plan node.
 *
 * readHandle is copied into the operator info; queryId and taskId are stored
 * for opening the reader later. Returns the new operator, or NULL on failure
 * with pTaskInfo->code set to the failure code (TSDB_CODE_QRY_OUT_OF_MEMORY for
 * allocation failures, otherwise the code returned by initQueryTableDataCond).
 */
SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, SReadHandle* readHandle,
                                           SExecTaskInfo* pTaskInfo, uint64_t queryId, uint64_t taskId) {
  // declared up front so that the _error path never sees an uninitialized value
  SArray* pColList = NULL;
  int32_t code = TSDB_CODE_QRY_OUT_OF_MEMORY;
  int32_t numOfCols = 0;

  STableScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STableScanInfo));
  SOperatorInfo*  pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
  if (pInfo == NULL || pOperator == NULL) {
    goto _error;
  }

  SDataBlockDescNode* pDescNode = pTableScanNode->scan.node.pOutputDataBlockDesc;
  pColList = extractColMatchInfo(pTableScanNode->scan.pScanCols, pDescNode, &numOfCols, COL_MATCH_FROM_COL_ID);

  code = initQueryTableDataCond(&pInfo->cond, pTableScanNode);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  if (pTableScanNode->scan.pScanPseudoCols != NULL) {
    SExprSupp* pSup = &pInfo->pseudoSup;
    pSup->pExprInfo = createExprInfo(pTableScanNode->scan.pScanPseudoCols, NULL, &pSup->numOfExprs);
    pSup->pCtx = createSqlFunctionCtx(pSup->pExprInfo, pSup->numOfExprs, &pSup->rowEntryInfoOffset);
  }

  pInfo->scanInfo = (SScanInfo){.numOfAsc = pTableScanNode->scanSeq[0], .numOfDesc = pTableScanNode->scanSeq[1]};
  //    pInfo->scanInfo = (SScanInfo){.numOfAsc = 0, .numOfDesc = 1}; // for debug purpose

  pInfo->readHandle = *readHandle;
  pInfo->interval = extractIntervalInfo(pTableScanNode);
  pInfo->sample.sampleRatio = pTableScanNode->ratio;
  pInfo->sample.seed = taosGetTimestampSec();

  pInfo->dataBlockLoadFlag = pTableScanNode->dataRequired;
  pInfo->pResBlock = createResDataBlock(pDescNode);
  pInfo->pFilterNode = pTableScanNode->scan.node.pConditions;
  pInfo->scanFlag = MAIN_SCAN;
  pInfo->pColMatchInfo = pColList;
  pInfo->curTWinIdx = 0;
  pInfo->queryId = queryId;
  pInfo->taskId = taskId;
  pInfo->currentGroupId = -1;  // the reader is opened lazily on the first doTableScan call

  pOperator->name = "TableScanOperator";  // for debug purpose
  pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN;
  pOperator->blocking = false;
  pOperator->status = OP_NOT_OPENED;
  pOperator->info = pInfo;
  pOperator->exprSupp.numOfExprs = numOfCols;
  pOperator->pTaskInfo = pTaskInfo;

  pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doTableScan, NULL, NULL, destroyTableScanOperatorInfo,
                                         NULL, NULL, getTableScannerExecInfo);

  // for non-blocking operator, the open cost is always 0
  pOperator->cost.openCost = 0;
  return pOperator;

_error:
  // the column match list is not yet owned by pInfo on this path, release it here
  if (pColList != NULL) {
    taosArrayDestroy(pColList);
  }

  taosMemoryFreeClear(pInfo);
  taosMemoryFreeClear(pOperator);

  pTaskInfo->code = code;
  return NULL;
}

643
/*
 * Create a sequential table scan operator on top of an already opened reader.
 *
 * pReadHandle is stored as the data reader of the operator info. Returns the
 * new operator, or NULL with pTaskInfo->code set to TSDB_CODE_OUT_OF_MEMORY
 * when an allocation fails.
 */
SOperatorInfo* createTableSeqScanOperatorInfo(void* pReadHandle, SExecTaskInfo* pTaskInfo) {
  STableScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STableScanInfo));
  SOperatorInfo*  pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
  if (pInfo == NULL || pOperator == NULL) {
    // either allocation may have succeeded; release whichever did
    taosMemoryFreeClear(pInfo);
    taosMemoryFreeClear(pOperator);
    pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  pInfo->dataReader = pReadHandle;
  //  pInfo->prevGroupId       = -1;

  pOperator->name = "TableSeqScanOperator";
  pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_TABLE_SEQ_SCAN;
  pOperator->blocking = false;
  pOperator->status = OP_NOT_OPENED;
  pOperator->info = pInfo;
  pOperator->pTaskInfo = pTaskInfo;

  pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doTableScanImpl, NULL, NULL, NULL, NULL, NULL, NULL);
  return pOperator;
}

661 662
static int32_t doGetTableRowSize(void* pMeta, uint64_t uid) {
  int32_t rowLen = 0;
H
Haojun Liao 已提交
663

664
  SMetaReader mr = {0};
665 666
  metaReaderInit(&mr, pMeta, 0);
  metaGetTableEntryByUid(&mr, uid);
667 668
  if (mr.me.type == TSDB_SUPER_TABLE) {
    int32_t numOfCols = mr.me.stbEntry.schemaRow.nCols;
669
    for (int32_t i = 0; i < numOfCols; ++i) {
670 671 672 673 674 675 676
      rowLen += mr.me.stbEntry.schemaRow.pSchema[i].bytes;
    }
  } else if (mr.me.type == TSDB_CHILD_TABLE) {
    uint64_t suid = mr.me.ctbEntry.suid;
    metaGetTableEntryByUid(&mr, suid);
    int32_t numOfCols = mr.me.stbEntry.schemaRow.nCols;

677
    for (int32_t i = 0; i < numOfCols; ++i) {
678 679 680 681
      rowLen += mr.me.stbEntry.schemaRow.pSchema[i].bytes;
    }
  } else if (mr.me.type == TSDB_NORMAL_TABLE) {
    int32_t numOfCols = mr.me.ntbEntry.schemaRow.nCols;
682
    for (int32_t i = 0; i < numOfCols; ++i) {
683 684 685 686 687
      rowLen += mr.me.ntbEntry.schemaRow.pSchema[i].bytes;
    }
  }

  metaReaderClear(&mr);
688 689 690 691 692 693 694 695 696 697 698 699
  return rowLen;
}

/*
 * Produce the single-row result of the block distribution scan.
 *
 * The distribution info is collected from the reader, serialized and appended
 * as one variable-length value to the output column. The operator is marked
 * OP_EXEC_DONE afterwards, so subsequent calls return NULL. Leaves through
 * longjmp with TSDB_CODE_OUT_OF_MEMORY when the serialization buffer cannot
 * be allocated.
 */
static SSDataBlock* doBlockInfoScan(SOperatorInfo* pOperator) {
  if (pOperator->status == OP_EXEC_DONE) {
    return NULL;
  }

  SBlockDistInfo* pBlockScanInfo = pOperator->info;

  STableBlockDistInfo blockDistInfo = {.minRows = INT_MAX, .maxRows = INT_MIN};
  blockDistInfo.rowSize = doGetTableRowSize(pBlockScanInfo->readHandle.meta, pBlockScanInfo->uid);

  tsdbGetFileBlocksDistInfo(pBlockScanInfo->pHandle, &blockDistInfo);
  blockDistInfo.numOfInmemRows = (int32_t)tsdbGetNumOfRowsInMemTable(pBlockScanInfo->pHandle);

  SSDataBlock* pBlock = pBlockScanInfo->pResBlock;

  int32_t          slotId = pOperator->exprSupp.pExprInfo->base.resSchema.slotId;
  SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, slotId);

  // first call computes the serialized length, second call fills the buffer
  int32_t len = tSerializeBlockDistInfo(NULL, 0, &blockDistInfo);
  char*   p = taosMemoryCalloc(1, len + VARSTR_HEADER_SIZE);
  if (p == NULL) {
    // the buffer is written right below, so a failed allocation must abort the task
    longjmp(pOperator->pTaskInfo->env, TSDB_CODE_OUT_OF_MEMORY);
  }

  tSerializeBlockDistInfo(varDataVal(p), len, &blockDistInfo);
  varDataSetLen(p, len);

  blockDataEnsureCapacity(pBlock, 1);
  colDataAppend(pColInfo, 0, p, false);
  taosMemoryFree(p);

  pBlock->info.rows = 1;

  pOperator->status = OP_EXEC_DONE;
  return pBlock;
}

724
static void destroyBlockDistScanOperatorInfo(void* param, int32_t numOfOutput) {
725
  SBlockDistInfo* pDistInfo = (SBlockDistInfo*)param;
726 727 728
  blockDataDestroy(pDistInfo->pResBlock);
}

729 730
/*
 * Create the operator that reports the data block distribution of a table.
 *
 * dataReader is stored as the handle used to collect the distribution,
 * readHandle is copied, and uid identifies the table whose row size is
 * computed. Returns the new operator, or NULL on failure with pTaskInfo->code
 * set (TSDB_CODE_OUT_OF_MEMORY for allocation failures, otherwise the code
 * returned by initExprSupp).
 */
SOperatorInfo* createDataBlockInfoScanOperator(void* dataReader, SReadHandle* readHandle, uint64_t uid,
                                               SBlockDistScanPhysiNode* pBlockScanNode, SExecTaskInfo* pTaskInfo) {
  // declared up front so that no goto jumps over an initialization
  int32_t    code = TSDB_CODE_OUT_OF_MEMORY;
  int32_t    numOfCols = 0;
  SExprInfo* pExprInfo = NULL;

  SBlockDistInfo* pInfo = taosMemoryCalloc(1, sizeof(SBlockDistInfo));
  SOperatorInfo*  pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
  if (pInfo == NULL || pOperator == NULL) {
    goto _error;
  }

  pInfo->pHandle = dataReader;
  pInfo->readHandle = *readHandle;
  pInfo->uid = uid;
  pInfo->pResBlock = createResDataBlock(pBlockScanNode->node.pOutputDataBlockDesc);

  pExprInfo = createExprInfo(pBlockScanNode->pScanPseudoCols, NULL, &numOfCols);
  code = initExprSupp(&pOperator->exprSupp, pExprInfo, numOfCols);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  pOperator->name = "DataBlockDistScanOperator";
  pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN;
  pOperator->blocking = false;
  pOperator->status = OP_NOT_OPENED;
  pOperator->info = pInfo;
  pOperator->pTaskInfo = pTaskInfo;

  pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doBlockInfoScan, NULL, NULL,
                                         destroyBlockDistScanOperatorInfo, NULL, NULL, NULL);
  return pOperator;

_error:
  // the result block may already exist when initExprSupp fails; do not leak it
  if (pInfo != NULL && pInfo->pResBlock != NULL) {
    blockDataDestroy(pInfo->pResBlock);
  }

  taosMemoryFreeClear(pInfo);
  taosMemoryFreeClear(pOperator);

  pTaskInfo->code = code;
  return NULL;
}

/*
 * Destroy every data block buffered in pInfo->pBlockLists, empty the list and
 * rewind validBlockIndex to the first slot.
 */
static void doClearBufferedBlocks(SStreamBlockScanInfo* pInfo) {
  size_t numOfBlocks = taosArrayGetSize(pInfo->pBlockLists);

  int32_t idx = 0;
  while (idx < numOfBlocks) {
    SSDataBlock* pBuffered = taosArrayGetP(pInfo->pBlockLists, idx);
    blockDataDestroy(pBuffered);
    ++idx;
  }

  taosArrayClear(pInfo->pBlockLists);
  pInfo->validBlockIndex = 0;
}

H
Haojun Liao 已提交
778
/* True when the parent operator recorded in sessionSup is a stream session-window operator. */
static bool isSessionWindow(SStreamBlockScanInfo* pInfo) {
  const bool parentIsSession = (QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION == pInfo->sessionSup.parentType);
  return parentIsSession;
}

H
Haojun Liao 已提交
782
/* True when the parent operator recorded in sessionSup is a stream state-window operator. */
static bool isStateWindow(SStreamBlockScanInfo* pInfo) {
  const bool parentIsState = (QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE == pInfo->sessionSup.parentType);
  return parentIsState;
}
5
54liuyao 已提交
785

5
54liuyao 已提交
786
/*
 * Pick the next time window that has to be re-read through the snapshot table
 * scan operator and re-arm that operator for it.
 *
 * pInfo      - stream scan state
 * pSDB       - block whose timestamps drive the window selection (not used for state windows)
 * tsColIndex - index of the timestamp column inside pSDB
 * pRowIndex  - in/out cursor into pSDB; advanced past the rows covered by the chosen window
 *
 * Window selection:
 *   - session / interval parent: the window is derived from the timestamp at *pRowIndex;
 *   - state parent: the next entry of pStreamAggSup->pScanWindow is taken; once the list is
 *     exhausted the cursor is reset and the list is cleared.
 *
 * Returns true when a window was selected and the table scan was reset, false otherwise.
 */
static bool prepareDataScan(SStreamBlockScanInfo* pInfo, SSDataBlock* pSDB, int32_t tsColIndex, int32_t* pRowIndex) {
  STimeWindow win = {
      .skey = INT64_MIN,
      .ekey = INT64_MAX,
  };
  bool needRead = false;
  if (!isStateWindow(pInfo) && (*pRowIndex) < pSDB->info.rows) {
    SColumnInfoData* pColDataInfo = taosArrayGet(pSDB->pDataBlock, tsColIndex);
    TSKEY*           tsCols = (TSKEY*)pColDataInfo->pData;
    // zero the whole struct so the helper never sees indeterminate fields; pageId == -1 is set explicitly below
    SResultRowInfo dumyInfo = {0};
    dumyInfo.cur.pageId = -1;
    if (isSessionWindow(pInfo)) {
      SStreamAggSupporter* pAggSup = pInfo->sessionSup.pStreamAggSup;
      int64_t              gap = pInfo->sessionSup.gap;
      int32_t              winIndex = 0;
      SResultWindowInfo*   pCurWin =
          getSessionTimeWindow(pAggSup, tsCols[(*pRowIndex)], INT64_MIN, pSDB->info.groupId, gap, &winIndex);
      win = pCurWin->win;
      (*pRowIndex) += updateSessionWindowInfo(pCurWin, tsCols, NULL, pSDB->info.rows, (*pRowIndex), gap, NULL);
    } else {
      win =
          getActiveTimeWindow(NULL, &dumyInfo, tsCols[(*pRowIndex)], &pInfo->interval, pInfo->interval.precision, NULL);
      (*pRowIndex) += getNumOfRowsInTimeWindow(&pSDB->info, tsCols, (*pRowIndex), win.ekey, binarySearchForKey, NULL,
                                               TSDB_ORDER_ASC);
    }
    needRead = true;
  } else if (isStateWindow(pInfo)) {
    SArray* pWins = pInfo->sessionSup.pStreamAggSup->pScanWindow;
    int32_t size = taosArrayGetSize(pWins);
    if (pInfo->scanWinIndex < size) {
      win = *(STimeWindow*)taosArrayGet(pWins, pInfo->scanWinIndex);
      pInfo->scanWinIndex++;
      needRead = true;
    } else {
      // every pending window has been consumed: start over with an empty list
      pInfo->scanWinIndex = 0;
      taosArrayClear(pWins);
    }
  }
  if (!needRead) {
    return false;
  }

  // restrict the snapshot table scan to the chosen window and restart it
  STableScanInfo* pTableScanInfo = pInfo->pSnapshotReadOp->info;
  pTableScanInfo->cond.twindows[0] = win;
  pTableScanInfo->curTWinIdx = 0;
  //  tsdbResetReadHandle(pTableScanInfo->dataReader, &pTableScanInfo->cond, 0);
  // if (!pTableScanInfo->dataReader) {
  //   return false;
  // }
  pTableScanInfo->scanTimes = 0;
  pTableScanInfo->currentGroupId = -1;
  return true;
}

839
/*
 * Append row `sourceRowId` of `source` to the end of `dest`, column by column,
 * then bump dest's row count by one. A NULL cell in the source is appended as NULL.
 * Columns are matched by position.
 */
static void copyOneRow(SSDataBlock* dest, SSDataBlock* source, int32_t sourceRowId) {
  for (int32_t col = 0; col < taosArrayGetSize(source->pDataBlock); ++col) {
    SColumnInfoData* pTo = (SColumnInfoData*)taosArrayGet(dest->pDataBlock, col);
    SColumnInfoData* pFrom = (SColumnInfoData*)taosArrayGet(source->pDataBlock, col);

    if (!colDataIsNull_s(pFrom, sourceRowId)) {
      colDataAppend(pTo, dest->info.rows, colDataGetData(pFrom, sourceRowId), false);
      continue;
    }
    colDataAppendNULL(pTo, dest->info.rows);
  }

  dest->info.rows += 1;
}

852 853 854 855 856 857 858 859 860 861 862 863 864 865 866 867 868 869 870 871 872
/*
 * Look up the group id registered for the table the block belongs to
 * (keyed by pBlock->info.uid in the task's tableqinfoList.map).
 * Returns 0 when the uid is not present. `rowId` is currently unused.
 */
static uint64_t getGroupId(SOperatorInfo* pOperator, SSDataBlock* pBlock, int32_t rowId) {
  uint64_t* pGroupId = taosHashGet(pOperator->pTaskInfo->tableqinfoList.map, &pBlock->info.uid, sizeof(int64_t));
  return (pGroupId != NULL) ? *pGroupId : 0;
  /* Todo(liuyao) for partition by column
  recordNewGroupKeys(pTableScanInfo->pGroupCols, pTableScanInfo->pGroupColVals, pBlock, rowId);
  int32_t len = buildGroupKeys(pTableScanInfo->keyBuf, pTableScanInfo->pGroupColVals);
  uint64_t resId = 0;
  uint64_t* groupId = taosHashGet(pTableScanInfo->pGroupSet, pTableScanInfo->keyBuf, len);
  if (groupId) {
    return *groupId;
  } else if (len != 0) {
    resId = calcGroupId(pTableScanInfo->keyBuf, len);
    taosHashPut(pTableScanInfo->pGroupSet, pTableScanInfo->keyBuf, len, &resId, sizeof(uint64_t));
  }
  return resId;
  */
}

5
54liuyao 已提交
873
/*
 * Pull blocks from the snapshot table scan until one belonging to
 * pInfo->groupId shows up.
 *
 * When the table scan runs dry, prepareDataScan() is asked for the next window
 * (driven by pSDB / tsColIndex / pRowIndex) and the scan is retried once for it.
 *
 * Returns the matching block, or NULL when no further data is available.
 */
static SSDataBlock* doDataScan(SStreamBlockScanInfo* pInfo, SSDataBlock* pSDB, int32_t tsColIndex, int32_t* pRowIndex) {
  for (;;) {
    SSDataBlock* pScanned = doTableScan(pInfo->pSnapshotReadOp);

    // current window drained: move on to the next window, if there is one, and scan it
    if (pScanned == NULL && prepareDataScan(pInfo, pSDB, tsColIndex, pRowIndex)) {
      pScanned = doTableScan(pInfo->pSnapshotReadOp);
    }

    if (pScanned == NULL) {
      return NULL;
    }

    // blocks of other groups are skipped
    if (pScanned->info.groupId == pInfo->groupId) {
      return pScanned;
    }
  }

  /* Todo(liuyao) for partition by column
    SSDataBlock* pBlock = createOneDataBlock(pResult, true);
    blockDataCleanup(pResult);
    for (int32_t i = 0; i < pBlock->info.rows; i++) {
      uint64_t id = getGroupId(pInfo->pOperatorDumy, pBlock, i);
      if (id == pInfo->groupId) {
        copyOneRow(pResult, pBlock, i);
      }
    }
    return pResult;
  */
}

/*
 * Fill pUpdateBlock with the rows of pBlock that were flagged as updates.
 *
 * pInfo->tsArray holds the row ids (into pBlock) of the updated rows and
 * pInfo->tsArrayIndex is the cursor of the first row id not yet emitted.
 * Starting at the cursor, rows are copied while they share the group id of the
 * first pending row; the cursor is advanced by the number of rows copied.
 * The produced block is tagged STREAM_CLEAR and carries that group id.
 *
 * pUpdateBlock is always emptied first, so it has zero rows when nothing is pending.
 * Once the cursor reaches the end of tsArray the array is cleared.
 */
static void setUpdateData(SStreamBlockScanInfo* pInfo, SSDataBlock* pBlock, SSDataBlock* pUpdateBlock) {
  blockDataCleanup(pUpdateBlock);
  int32_t size = taosArrayGetSize(pInfo->tsArray);
  if (pInfo->tsArrayIndex < size) {
    SColumnInfoData* pCol = (SColumnInfoData*)taosArrayGet(pUpdateBlock->pDataBlock, pInfo->primaryTsIndex);
    ASSERT(pCol->info.type == TSDB_DATA_TYPE_TIMESTAMP);
    blockDataEnsureCapacity(pUpdateBlock, size);

    int32_t rowId = *(int32_t*)taosArrayGet(pInfo->tsArray, pInfo->tsArrayIndex);
    pInfo->groupId = getGroupId(pInfo->pSnapshotReadOp, pBlock, rowId);

    // only the entries from the cursor to the end are pending; bounding the loop by
    // `size` would index tsArray past its end whenever the cursor is not 0
    int32_t pending = size - pInfo->tsArrayIndex;
    int32_t i = 0;
    for (; i < pending; i++) {
      rowId = *(int32_t*)taosArrayGet(pInfo->tsArray, i + pInfo->tsArrayIndex);
      uint64_t id = getGroupId(pInfo->pSnapshotReadOp, pBlock, rowId);
      if (pInfo->groupId != id) {
        break;
      }
      copyOneRow(pUpdateBlock, pBlock, rowId);
    }
    pUpdateBlock->info.rows = i;
    pInfo->tsArrayIndex += i;
    pUpdateBlock->info.groupId = pInfo->groupId;
    pUpdateBlock->info.type = STREAM_CLEAR;
    blockDataUpdateTsWindow(pUpdateBlock, 0);
  }
  // all rows have same group id
  ASSERT(pInfo->tsArrayIndex >= size);
  if (size > 0 && pInfo->tsArrayIndex == size) {
    taosArrayClear(pInfo->tsArray);
  }
}

937 938 939 940 941 942 943 944 945 946 947
/*
 * Detect updated rows in pBlock and, when requested, materialise them.
 *
 * Every row whose (uid, timestamp) is reported as updated by the update info is
 * recorded (by row id) in pInfo->tsArray. When pUpdateBlock is non-NULL the
 * recorded rows are handed to setUpdateData(); when it is NULL the recorded row
 * ids are discarded again. `invertible` is currently unused.
 */
static void getUpdateDataBlock(SStreamBlockScanInfo* pInfo, bool invertible, SSDataBlock* pBlock,
                               SSDataBlock* pUpdateBlock) {
  SColumnInfoData* pTsCol = taosArrayGet(pBlock->pDataBlock, pInfo->primaryTsIndex);
  ASSERT(pTsCol->info.type == TSDB_DATA_TYPE_TIMESTAMP);

  TSKEY*  tsList = (TSKEY*)pTsCol->pData;
  int32_t row = 0;
  while (row < pBlock->info.rows) {
    if (updateInfoIsUpdated(pInfo->pUpdateInfo, pBlock->info.uid, tsList[row])) {
      taosArrayPush(pInfo->tsArray, &row);
    }
    row++;
  }

  if (pUpdateBlock != NULL) {
    setUpdateData(pInfo, pBlock, pUpdateBlock);
  } else {
    taosArrayClear(pInfo->tsArray);
  }
  // Todo(liuyao) get from tsdb
  //  SSDataBlock* p = createOneDataBlock(pBlock, true);
  //  p->info.type = STREAM_INVERT;
  //  taosArrayClear(pInfo->tsArray);
  //  return p;
}

959
/*
 * getNext function of the stream block scan operator.
 *
 * Behaviour depends on pInfo->blockType:
 *   STREAM_INPUT__DATA_BLOCK  - hand out the blocks buffered in pBlockLists one by one. A block of
 *                               type STREAM_RETRIEVE additionally switches the operator to the
 *                               submit path in STREAM_SCAN_FROM_DATAREADER_RETRIEVE mode.
 *   STREAM_INPUT__DATA_SUBMIT - small state machine on pInfo->scanMode (result / update-result /
 *                               data-reader replay); when none of those yields a block, the next
 *                               submit block is read from the stream reader, mapped onto pRes,
 *                               filtered and checked for updated rows.
 *   STREAM_INPUT__DATA_SCAN   - forward to the snapshot table scan.
 *
 * Returns the next block, or NULL when nothing (more) is available or an error occurred
 * (the error code is stored in pTaskInfo->code).
 */
static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) {
  // NOTE: this operator does never check if current status is done or not
  SExecTaskInfo*        pTaskInfo = pOperator->pTaskInfo;
  SStreamBlockScanInfo* pInfo = pOperator->info;

  pTaskInfo->code = pOperator->fpSet._openFn(pOperator);
  if (pTaskInfo->code != TSDB_CODE_SUCCESS || pOperator->status == OP_EXEC_DONE) {
    return NULL;
  }

  size_t total = taosArrayGetSize(pInfo->pBlockLists);
  // TODO: refactor
  if (pInfo->blockType == STREAM_INPUT__DATA_BLOCK) {
    if (pInfo->validBlockIndex >= total) {
      /*doClearBufferedBlocks(pInfo);*/
      pOperator->status = OP_EXEC_DONE;
      return NULL;
    }

    int32_t      current = pInfo->validBlockIndex++;
    SSDataBlock* pBlock = taosArrayGetP(pInfo->pBlockLists, current);
    blockDataUpdateTsWindow(pBlock, 0);
    if (pBlock->info.type == STREAM_RETRIEVE) {
      // a retrieve request: keep a copy and replay the requested windows from the data reader on the next calls
      pInfo->blockType = STREAM_INPUT__DATA_SUBMIT;
      pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RETRIEVE;
      copyDataBlock(pInfo->pPullDataRes, pBlock);
      pInfo->pullDataResIndex = 0;
      prepareDataScan(pInfo, pInfo->pPullDataRes, 0, &pInfo->pullDataResIndex);
      updateInfoAddCloseWindowSBF(pInfo->pUpdateInfo);
    }
    return pBlock;
  } else if (pInfo->blockType == STREAM_INPUT__DATA_SUBMIT) {
    if (pInfo->scanMode == STREAM_SCAN_FROM_RES) {
      // pUpdateRes is a long-lived member that is dereferenced again on later calls, so it must
      // only be emptied here; destroying it would leave a dangling pointer in pInfo
      blockDataCleanup(pInfo->pUpdateRes);
      pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
      return pInfo->pRes;
    } else if (pInfo->scanMode == STREAM_SCAN_FROM_UPDATERES) {
      pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER;
      if (!isStateWindow(pInfo)) {
        prepareDataScan(pInfo, pInfo->pUpdateRes, pInfo->primaryTsIndex, &pInfo->updateResIndex);
      }
      return pInfo->pUpdateRes;
    } else if (pInfo->scanMode == STREAM_SCAN_FROM_DATAREADER_RETRIEVE) {
      SSDataBlock* pSDB = doDataScan(pInfo, pInfo->pPullDataRes, 0, &pInfo->pullDataResIndex);
      if (pSDB != NULL) {
        getUpdateDataBlock(pInfo, true, pSDB, NULL);
        pSDB->info.type = STREAM_PULL_DATA;
        return pSDB;
      }
      // replay finished: fall through to reading the next submit block below
      pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER;
    } else {
      if (isStateWindow(pInfo) && taosArrayGetSize(pInfo->sessionSup.pStreamAggSup->pScanWindow) > 0) {
        pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER;
        pInfo->updateResIndex = pInfo->pUpdateRes->info.rows;
        prepareDataScan(pInfo, pInfo->pUpdateRes, pInfo->primaryTsIndex, &pInfo->updateResIndex);
      }
      if (pInfo->scanMode == STREAM_SCAN_FROM_DATAREADER) {
        SSDataBlock* pSDB = doDataScan(pInfo, pInfo->pUpdateRes, pInfo->primaryTsIndex, &pInfo->updateResIndex);
        if (pSDB == NULL) {
          // this group is done; emit the update rows of the next group, if any
          setUpdateData(pInfo, pInfo->pRes, pInfo->pUpdateRes);
          if (pInfo->pUpdateRes->info.rows > 0) {
            if (!isStateWindow(pInfo)) {
              // Todo(liuyao) mybe can delete this.
              bool test = prepareDataScan(pInfo, pInfo->pUpdateRes, pInfo->primaryTsIndex, &pInfo->updateResIndex);
              ASSERT(test == false);
            }
            return pInfo->pUpdateRes;
          } else {
            pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
          }
        } else {
          pSDB->info.type = STREAM_NORMAL;
          getUpdateDataBlock(pInfo, true, pSDB, NULL);
          return pSDB;
        }
      }
    }

    SDataBlockInfo* pBlockInfo = &pInfo->pRes->info;
    blockDataCleanup(pInfo->pRes);

    while (tqNextDataBlock(pInfo->streamBlockReader)) {
      SSDataBlock block = {0};

      // todo refactor
      int32_t code = tqRetrieveDataBlock(&block, pInfo->streamBlockReader);

      uint64_t groupId = block.info.groupId;
      uint64_t uid = block.info.uid;
      int32_t  numOfRows = block.info.rows;

      if (code != TSDB_CODE_SUCCESS || numOfRows == 0) {
        pTaskInfo->code = code;
        return NULL;
      }

      pInfo->pRes->info.groupId = groupId;
      pInfo->pRes->info.rows = numOfRows;
      pInfo->pRes->info.uid = uid;
      pInfo->pRes->info.type = STREAM_NORMAL;
      pInfo->pRes->info.capacity = numOfRows;

      // for generating rollup SMA result, each time is an independent time serie.
      // TODO temporarily used, when the statement of "partition by tbname" is ready, remove this
      if (pInfo->assignBlockUid) {
        pInfo->pRes->info.groupId = uid;
      } else {
        pInfo->pRes->info.groupId = groupId;
      }

      // a group id registered for this table in the task takes precedence
      uint64_t* groupIdPre = taosHashGet(pOperator->pTaskInfo->tableqinfoList.map, &uid, sizeof(int64_t));
      if (groupIdPre) {
        pInfo->pRes->info.groupId = *groupIdPre;
      }

      // todo extract method
      for (int32_t i = 0; i < taosArrayGetSize(pInfo->pColMatchInfo); ++i) {
        SColMatchInfo* pColMatchInfo = taosArrayGet(pInfo->pColMatchInfo, i);
        if (!pColMatchInfo->output) {
          continue;
        }

        bool colExists = false;
        for (int32_t j = 0; j < blockDataGetNumOfCols(&block); ++j) {
          SColumnInfoData* pResCol = bdGetColumnInfoData(&block, j);
          if (pResCol->info.colId == pColMatchInfo->colId) {
            taosArraySet(pInfo->pRes->pDataBlock, pColMatchInfo->targetSlotId, pResCol);
            colExists = true;
            break;
          }
        }

        // the required column does not exists in submit block, let's set it to be all null value
        if (!colExists) {
          SColumnInfoData* pDst = taosArrayGet(pInfo->pRes->pDataBlock, pColMatchInfo->targetSlotId);
          colDataAppendNNULL(pDst, 0, pBlockInfo->rows);
        }
      }

      taosArrayDestroy(block.pDataBlock);
      if (pInfo->pRes->pDataBlock == NULL) {
        // TODO add log
        updateInfoDestoryColseWinSBF(pInfo->pUpdateInfo);
        pOperator->status = OP_EXEC_DONE;
        pTaskInfo->code = terrno;
        return NULL;
      }

      // currently only the tbname pseudo column
      if (pInfo->numOfPseudoExpr > 0) {
        addTagPseudoColumnData(&pInfo->readHandle, pInfo->pPseudoExpr, pInfo->numOfPseudoExpr, pInfo->pRes);
      }

      doFilter(pInfo->pCondition, pInfo->pRes);
      blockDataUpdateTsWindow(pInfo->pRes, pInfo->primaryTsIndex);
      // keep reading until a block survives the filter
      if (pBlockInfo->rows > 0) {
        break;
      }
    }

    // record the scan action.
    pInfo->numOfExec++;
    pOperator->resultInfo.totalRows += pBlockInfo->rows;

    if (pBlockInfo->rows == 0) {
      updateInfoDestoryColseWinSBF(pInfo->pUpdateInfo);
      pOperator->status = OP_EXEC_DONE;
    } else if (pInfo->pUpdateInfo) {
      pInfo->tsArrayIndex = 0;
      getUpdateDataBlock(pInfo, true, pInfo->pRes, pInfo->pUpdateRes);
      if (pInfo->pUpdateRes->info.rows > 0) {
        if (pInfo->pUpdateRes->info.type == STREAM_CLEAR) {
          pInfo->updateResIndex = 0;
          pInfo->scanMode = STREAM_SCAN_FROM_UPDATERES;
        } else if (pInfo->pUpdateRes->info.type == STREAM_INVERT) {
          pInfo->scanMode = STREAM_SCAN_FROM_RES;
          return pInfo->pUpdateRes;
        }
      }
    }

    return (pBlockInfo->rows == 0) ? NULL : pInfo->pRes;

  } else if (pInfo->blockType == STREAM_INPUT__DATA_SCAN) {
    // check reader last status
    // if not match, reset status
    SSDataBlock* pResult = doTableScan(pInfo->pSnapshotReadOp);
    return pResult && pResult->info.rows > 0 ? pResult : NULL;

  } else {
    ASSERT(0);
    return NULL;
  }
}

1154 1155 1156 1157 1158 1159 1160 1161 1162 1163 1164 1165
/*
 * Build a new array holding the uid of every STableKeyInfo in
 * pTableGroupInfo->pTableList, in list order. The caller owns the returned array.
 */
static SArray* extractTableIdList(const STableListInfo* pTableGroupInfo) {
  SArray* pUidList = taosArrayInit(4, sizeof(uint64_t));

  int32_t idx = 0;
  while (idx < taosArrayGetSize(pTableGroupInfo->pTableList)) {
    STableKeyInfo* pEntry = taosArrayGet(pTableGroupInfo->pTableList, idx);
    taosArrayPush(pUidList, &pEntry->uid);
    ++idx;
  }

  return pUidList;
}

1166 1167 1168
/*
 * Create the "StreamBlockScanOperator" (QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN).
 *
 * pHandle        - read handle; may be NULL, in which case no snapshot table scan and no
 *                  stream reader are attached
 * pTableScanNode - physical plan node describing scanned columns, pseudo columns and conditions
 * pTaskInfo      - owning task
 * pTwSup         - supplies the watermark used when update detection is enabled
 * queryId/taskId - forwarded to the embedded table scan operator
 *
 * Returns the new operator, or NULL on failure (terrno is set for allocation failures).
 */
SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhysiNode* pTableScanNode,
                                            SExecTaskInfo* pTaskInfo, STimeWindowAggSupp* pTwSup, uint64_t queryId,
                                            uint64_t taskId) {
  SStreamBlockScanInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamBlockScanInfo));
  SOperatorInfo*        pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
  // column id list; non-NULL only while this function is still responsible for releasing it
  SArray* pColIds = NULL;

  if (pInfo == NULL || pOperator == NULL) {
    terrno = TSDB_CODE_QRY_OUT_OF_MEMORY;
    goto _error;
  }

  SScanPhysiNode* pScanPhyNode = &pTableScanNode->scan;

  SDataBlockDescNode* pDescNode = pScanPhyNode->node.pOutputDataBlockDesc;

  int32_t numOfCols = 0;
  pInfo->pColMatchInfo = extractColMatchInfo(pScanPhyNode->pScanCols, pDescNode, &numOfCols, COL_MATCH_FROM_COL_ID);

  int32_t numOfOutput = taosArrayGetSize(pInfo->pColMatchInfo);
  pColIds = taosArrayInit(numOfOutput, sizeof(int16_t));
  for (int32_t i = 0; i < numOfOutput; ++i) {
    SColMatchInfo* id = taosArrayGet(pInfo->pColMatchInfo, i);

    int16_t colId = id->colId;
    taosArrayPush(pColIds, &colId);
    // remember which output slot carries the primary timestamp
    if (id->colId == pTableScanNode->tsColId) {
      pInfo->primaryTsIndex = id->targetSlotId;
    }
  }

  pInfo->pBlockLists = taosArrayInit(4, POINTER_BYTES);
  if (pInfo->pBlockLists == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto _error;
  }

  pInfo->tsArray = taosArrayInit(4, sizeof(int32_t));
  if (pInfo->tsArray == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto _error;
  }

  if (pHandle) {
    SOperatorInfo* pTableScanDummy = createTableScanOperatorInfo(pTableScanNode, pHandle, pTaskInfo, queryId, taskId);
    if (pTableScanDummy == NULL) {
      goto _error;
    }

    STableScanInfo* pSTInfo = (STableScanInfo*)pTableScanDummy->info;
    // update detection is only set up for interval queries
    if (pSTInfo->interval.interval > 0) {
      pInfo->pUpdateInfo = updateInfoInitP(&pSTInfo->interval, pTwSup->waterMark);
    } else {
      pInfo->pUpdateInfo = NULL;
    }
    pInfo->pSnapshotReadOp = pTableScanDummy;
    pInfo->interval = pSTInfo->interval;

    pInfo->readHandle = *pHandle;
    ASSERT(pHandle->reader);
    pInfo->streamBlockReader = pHandle->reader;
    pInfo->tableUid = pScanPhyNode->uid;

    // set the extract column id to streamHandle
    tqReadHandleSetColIdList((SStreamReader*)pHandle->reader, pColIds);
    // NOTE(review): the reader presumably keeps the list it was given - confirm; it is not released here afterwards
    pColIds = NULL;

    SArray* tableIdList = extractTableIdList(&pTaskInfo->tableqinfoList);
    int32_t code = tqReadHandleSetTbUidList(pHandle->reader, tableIdList);
    if (code != 0) {
      taosArrayDestroy(tableIdList);
      goto _error;
    }
    taosArrayDestroy(tableIdList);
  } else {
    // without a read handle the column id list is never handed to anyone
    taosArrayDestroy(pColIds);
    pColIds = NULL;
  }

  // create the pseduo columns info
  if (pTableScanNode->scan.pScanPseudoCols != NULL) {
    pInfo->pPseudoExpr = createExprInfo(pTableScanNode->scan.pScanPseudoCols, NULL, &pInfo->numOfPseudoExpr);
  }

  pInfo->pRes = createResDataBlock(pDescNode);
  pInfo->pUpdateRes = createResDataBlock(pDescNode);
  pInfo->pCondition = pScanPhyNode->node.pConditions;
  pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
  pInfo->sessionSup = (SessionWindowSupporter){.pStreamAggSup = NULL, .gap = -1};
  pInfo->groupId = 0;
  pInfo->pPullDataRes = createPullDataBlock();

  pOperator->name = "StreamBlockScanOperator";
  pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN;
  pOperator->blocking = false;
  pOperator->status = OP_NOT_OPENED;
  pOperator->info = pInfo;
  pOperator->exprSupp.numOfExprs = taosArrayGetSize(pInfo->pRes->pDataBlock);
  pOperator->pTaskInfo = pTaskInfo;

  pOperator->fpSet =
      createOperatorFpSet(operatorDummyOpenFn, doStreamBlockScan, NULL, NULL, operatorDummyCloseFn, NULL, NULL, NULL);

  return pOperator;

_error:
  if (pColIds != NULL) {
    taosArrayDestroy(pColIds);
  }
  // pInfo may be NULL when the very first allocation failed
  if (pInfo != NULL) {
    if (pInfo->pColMatchInfo != NULL) {
      taosArrayDestroy(pInfo->pColMatchInfo);
    }
    if (pInfo->pBlockLists != NULL) {
      taosArrayDestroy(pInfo->pBlockLists);
    }
    if (pInfo->tsArray != NULL) {
      taosArrayDestroy(pInfo->tsArray);
    }
    // NOTE(review): pSnapshotReadOp and pUpdateInfo, when already created, are not released here
  }
  taosMemoryFreeClear(pInfo);
  taosMemoryFreeClear(pOperator);
  return NULL;
}

static void destroySysScanOperator(void* param, int32_t numOfOutput) {
  SSysTableScanInfo* pInfo = (SSysTableScanInfo*)param;
  tsem_destroy(&pInfo->ready);
  blockDataDestroy(pInfo->pRes);

1271
  const char* name = tNameGetTableName(&pInfo->name);
1272
  if (strncasecmp(name, TSDB_INS_TABLE_USER_TABLES, TSDB_TABLE_FNAME_LEN) == 0 || pInfo->pCur != NULL) {
H
Haojun Liao 已提交
1273
    metaCloseTbCursor(pInfo->pCur);
1274
    pInfo->pCur = NULL;
H
Haojun Liao 已提交
1275
  }
H
Haojun Liao 已提交
1276 1277

  taosArrayDestroy(pInfo->scanCols);
H
Haojun Liao 已提交
1278 1279 1280 1281 1282 1283 1284 1285 1286 1287 1288 1289 1290 1291 1292 1293 1294 1295 1296 1297 1298 1299 1300 1301 1302 1303 1304 1305 1306 1307 1308 1309 1310 1311 1312 1313 1314 1315 1316 1317
}

/*
 * Expression-walker callback that extracts a database name from a condition.
 *
 * pContext is used in two ways: while walking, its first int32_t is a small state
 * (0 = idle, 1 = inside an "=" operator, 2 = the db-name column was seen under that
 * operator); when a value node is reached in state 2, the value's string payload is
 * copied into pContext as a NUL-terminated string and the walk is stopped.
 */
EDealRes getDBNameFromConditionWalker(SNode* pNode, void* pContext) {
  int32_t* pState = (int32_t*)pContext;

  switch (nodeType(pNode)) {
    case QUERY_NODE_OPERATOR: {
      // only equality comparisons are of interest; anything else is skipped with its children
      if (OP_TYPE_EQUAL != ((SOperatorNode*)pNode)->opType) {
        *pState = 0;
        return DEAL_RES_IGNORE_CHILD;
      }
      *pState = 1;
      return DEAL_RES_CONTINUE;
    }
    case QUERY_NODE_COLUMN: {
      if (*pState == 1) {
        bool isDbNameCol = (TSDB_INS_USER_STABLES_DBNAME_COLID == ((SColumnNode*)pNode)->colId);
        *pState = isDbNameCol ? 2 : 0;
      }
      return DEAL_RES_CONTINUE;
    }
    case QUERY_NODE_VALUE: {
      if (*pState != 2) {
        return DEAL_RES_CONTINUE;
      }

      char* pVarName = nodesGetValueFromNode((SValueNode*)pNode);
      char* pOut = (char*)pContext;
      strncpy(pOut, varDataVal(pVarName), varDataLen(pVarName));
      *(pOut + varDataLen(pVarName)) = 0;
      return DEAL_RES_END;  // stop walk
    }
    default:
      break;
  }
  return DEAL_RES_CONTINUE;
}

1326
/*
 * Walk pCondition with getDBNameFromConditionWalker, using dbName as the walker
 * context. Does nothing when there is no condition.
 * Note: despite the const qualifier, the walker writes through dbName.
 */
static void getDBNameFromCondition(SNode* pCondition, const char* dbName) {
  if (pCondition != NULL) {
    nodesWalkExpr(pCondition, getDBNameFromConditionWalker, (char*)dbName);
  }
}

1333
static int32_t loadSysTableCallback(void* param, const SDataBuf* pMsg, int32_t code) {
H
Haojun Liao 已提交
1334 1335 1336 1337 1338 1339 1340
  SOperatorInfo*     operator=(SOperatorInfo*) param;
  SSysTableScanInfo* pScanResInfo = (SSysTableScanInfo*)operator->info;
  if (TSDB_CODE_SUCCESS == code) {
    pScanResInfo->pRsp = pMsg->pData;

    SRetrieveMetaTableRsp* pRsp = pScanResInfo->pRsp;
    pRsp->numOfRows = htonl(pRsp->numOfRows);
1341 1342 1343
    pRsp->useconds = htobe64(pRsp->useconds);
    pRsp->handle = htobe64(pRsp->handle);
    pRsp->compLen = htonl(pRsp->compLen);
H
Haojun Liao 已提交
1344 1345 1346 1347 1348
  } else {
    operator->pTaskInfo->code = code;
  }

  tsem_post(&pScanResInfo->ready);
wmmhello's avatar
wmmhello 已提交
1349
  return TSDB_CODE_SUCCESS;
H
Haojun Liao 已提交
1350 1351 1352 1353 1354 1355 1356
}

/*
 * Apply the scan condition (if any) to pInfo->pRes in place.
 *
 * Returns pInfo->pRes, or NULL when the block has no rows (left).
 */
static SSDataBlock* doFilterResult(SSysTableScanInfo* pInfo) {
  if (pInfo->pCondition != NULL) {
    doFilter(pInfo->pCondition, pInfo->pRes);
  }

  return pInfo->pRes->info.rows == 0 ? NULL : pInfo->pRes;
}
1409
static SSDataBlock* buildSysTableMetaBlock() {
L
Liu Jicong 已提交
1410 1411
  size_t               size = 0;
  const SSysTableMeta* pMeta = NULL;
1412 1413 1414
  getInfosDbMeta(&pMeta, &size);

  int32_t index = 0;
L
Liu Jicong 已提交
1415 1416
  for (int32_t i = 0; i < size; ++i) {
    if (strcmp(pMeta[i].name, TSDB_INS_TABLE_USER_TABLES) == 0) {
1417 1418 1419 1420
      index = i;
      break;
    }
  }
1421

1422
  SSDataBlock* pBlock = createDataBlock();
L
Liu Jicong 已提交
1423
  for (int32_t i = 0; i < pMeta[index].colNum; ++i) {
L
Liu Jicong 已提交
1424 1425
    SColumnInfoData colInfoData =
        createColumnInfoData(pMeta[index].schema[i].type, pMeta[index].schema[i].bytes, i + 1);
1426
    blockDataAppendColInfo(pBlock, &colInfoData);
1427 1428
  }

1429 1430 1431
  return pBlock;
}

1432
// Produce the next result block of a system-table scan, or NULL when the scan is finished
// (or failed, in which case pTaskInfo->code carries the error).
//
// Three paths exist:
//   1. user-tables scan executed on the mnode: the rows describing the information-schema and
//      performance-schema tables are generated locally in a single call;
//   2. user-tables scan executed on a vnode: the local table cursor is walked and one row is
//      emitted per table, at most resultInfo.capacity rows per call;
//   3. any other system table: a retrieve request is sent to the node in pInfo->epSet and the
//      reply is unpacked; the request is repeated until a non-empty filtered block is obtained
//      or the remote side reports completion.
//
// Column slots of the user-tables block, as filled below:
//   0 table name, 1 db name, 2 create time, 3 number of columns, 4 super table name,
//   5 uid, 6 vgId, 7 ttl, 8 table comment, 9 table type.
static SSDataBlock* doSysTableScan(SOperatorInfo* pOperator) {
  SExecTaskInfo*     pTaskInfo = pOperator->pTaskInfo;
  SSysTableScanInfo* pInfo = pOperator->info;

  // retrieve local table list info from vnode
  const char* name = tNameGetTableName(&pInfo->name);
  if (strncasecmp(name, TSDB_INS_TABLE_USER_TABLES, TSDB_TABLE_FNAME_LEN) == 0) {
    if (pOperator->status == OP_EXEC_DONE) {
      return NULL;
    }

    // the retrieve is executed on the mnode, so return tables that belong to the information schema database.
    if (pInfo->readHandle.mnd != NULL) {
      buildSysDbTableInfo(pInfo, pOperator->resultInfo.capacity);

      doFilterResult(pInfo);
      pInfo->loadInfo.totalRows += pInfo->pRes->info.rows;

      doSetOperatorCompleted(pOperator);
      return (pInfo->pRes->info.rows == 0) ? NULL : pInfo->pRes;
    } else {
      // the cursor is kept across calls so that a scan larger than one block can be resumed
      if (pInfo->pCur == NULL) {
        pInfo->pCur = metaOpenTbCursor(pInfo->readHandle.meta);
      }

      blockDataCleanup(pInfo->pRes);
      int32_t numOfRows = 0;

      const char* db = NULL;
      int32_t     vgId = 0;
      vnodeGetInfo(pInfo->readHandle.vnode, &db, &vgId);

      // convert the full db name into a var-string holding only the database part
      SName sn = {0};
      char  dbname[TSDB_DB_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
      tNameFromString(&sn, db, T_NAME_ACCT | T_NAME_DB);

      tNameGetDbName(&sn, varDataVal(dbname));
      varDataSetLen(dbname, strlen(varDataVal(dbname)));

      // temporary block with the full user-tables schema; the requested columns are moved
      // into pInfo->pRes once the rows have been collected
      SSDataBlock* p = buildSysTableMetaBlock();
      blockDataEnsureCapacity(p, pOperator->resultInfo.capacity);

      // scratch var-string, reused for the table name, the super table name and the type label
      char n[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0};

      int32_t ret = 0;
      while ((ret = metaTbCursorNext(pInfo->pCur)) == 0) {
        STR_TO_VARSTR(n, pInfo->pCur->mr.me.name);

        // table name
        SColumnInfoData* pColInfoData = taosArrayGet(p->pDataBlock, 0);
        colDataAppend(pColInfoData, numOfRows, n, false);

        // database name
        pColInfoData = taosArrayGet(p->pDataBlock, 1);
        colDataAppend(pColInfoData, numOfRows, dbname, false);

        // vgId
        pColInfoData = taosArrayGet(p->pDataBlock, 6);
        colDataAppend(pColInfoData, numOfRows, (char*)&vgId, false);

        int32_t tableType = pInfo->pCur->mr.me.type;
        if (tableType == TSDB_CHILD_TABLE) {
          // create time
          int64_t ts = pInfo->pCur->mr.me.ctbEntry.ctime;
          pColInfoData = taosArrayGet(p->pDataBlock, 2);
          colDataAppend(pColInfoData, numOfRows, (char*)&ts, false);

          // the column count and the super table name come from the super table entry
          // NOTE(review): the result of metaGetTableEntryByUid is not checked here; confirm that the
          // lookup cannot fail for the suid of an existing child table.
          SMetaReader mr = {0};
          metaReaderInit(&mr, pInfo->readHandle.meta, 0);
          metaGetTableEntryByUid(&mr, pInfo->pCur->mr.me.ctbEntry.suid);

          // number of columns
          pColInfoData = taosArrayGet(p->pDataBlock, 3);
          colDataAppend(pColInfoData, numOfRows, (char*)&mr.me.stbEntry.schemaRow.nCols, false);

          // super table name
          STR_TO_VARSTR(n, mr.me.name);
          pColInfoData = taosArrayGet(p->pDataBlock, 4);
          colDataAppend(pColInfoData, numOfRows, n, false);
          metaReaderClear(&mr);

          // table comment: positive length -> text, zero -> empty string, negative -> NULL
          pColInfoData = taosArrayGet(p->pDataBlock, 8);
          if (pInfo->pCur->mr.me.ctbEntry.commentLen > 0) {
            char comment[TSDB_TB_COMMENT_LEN + VARSTR_HEADER_SIZE] = {0};
            STR_TO_VARSTR(comment, pInfo->pCur->mr.me.ctbEntry.comment);
            colDataAppend(pColInfoData, numOfRows, comment, false);
          } else if (pInfo->pCur->mr.me.ctbEntry.commentLen == 0) {
            char comment[VARSTR_HEADER_SIZE + VARSTR_HEADER_SIZE] = {0};
            STR_TO_VARSTR(comment, "");
            colDataAppend(pColInfoData, numOfRows, comment, false);
          } else {
            colDataAppendNULL(pColInfoData, numOfRows);
          }

          // uid
          pColInfoData = taosArrayGet(p->pDataBlock, 5);
          colDataAppend(pColInfoData, numOfRows, (char*)&pInfo->pCur->mr.me.uid, false);

          // ttl
          pColInfoData = taosArrayGet(p->pDataBlock, 7);
          colDataAppend(pColInfoData, numOfRows, (char*)&pInfo->pCur->mr.me.ctbEntry.ttlDays, false);

          STR_TO_VARSTR(n, "CHILD_TABLE");
        } else if (tableType == TSDB_NORMAL_TABLE) {
          // create time
          pColInfoData = taosArrayGet(p->pDataBlock, 2);
          colDataAppend(pColInfoData, numOfRows, (char*)&pInfo->pCur->mr.me.ntbEntry.ctime, false);

          // number of columns
          pColInfoData = taosArrayGet(p->pDataBlock, 3);
          colDataAppend(pColInfoData, numOfRows, (char*)&pInfo->pCur->mr.me.ntbEntry.schemaRow.nCols, false);

          // super table name: a normal table has none
          pColInfoData = taosArrayGet(p->pDataBlock, 4);
          colDataAppendNULL(pColInfoData, numOfRows);

          // table comment: positive length -> text, zero -> empty string, negative -> NULL
          pColInfoData = taosArrayGet(p->pDataBlock, 8);
          if (pInfo->pCur->mr.me.ntbEntry.commentLen > 0) {
            char comment[TSDB_TB_COMMENT_LEN + VARSTR_HEADER_SIZE] = {0};
            STR_TO_VARSTR(comment, pInfo->pCur->mr.me.ntbEntry.comment);
            colDataAppend(pColInfoData, numOfRows, comment, false);
          } else if (pInfo->pCur->mr.me.ntbEntry.commentLen == 0) {
            char comment[VARSTR_HEADER_SIZE + VARSTR_HEADER_SIZE] = {0};
            STR_TO_VARSTR(comment, "");
            colDataAppend(pColInfoData, numOfRows, comment, false);
          } else {
            colDataAppendNULL(pColInfoData, numOfRows);
          }

          // uid
          pColInfoData = taosArrayGet(p->pDataBlock, 5);
          colDataAppend(pColInfoData, numOfRows, (char*)&pInfo->pCur->mr.me.uid, false);

          // ttl
          pColInfoData = taosArrayGet(p->pDataBlock, 7);
          colDataAppend(pColInfoData, numOfRows, (char*)&pInfo->pCur->mr.me.ntbEntry.ttlDays, false);

          STR_TO_VARSTR(n, "NORMAL_TABLE");
        }

        // table type
        pColInfoData = taosArrayGet(p->pDataBlock, 9);
        colDataAppend(pColInfoData, numOfRows, n, false);

        // block is full: stop here and keep the cursor for the next call
        if (++numOfRows >= pOperator->resultInfo.capacity) {
          break;
        }
      }

      // todo temporarily free the cursor here, the true reason why the free is not valid needs to be found
      if (ret != 0) {
        metaCloseTbCursor(pInfo->pCur);
        pInfo->pCur = NULL;
        doSetOperatorCompleted(pOperator);
      }

      p->info.rows = numOfRows;
      pInfo->pRes->info.rows = numOfRows;

      relocateColumnData(pInfo->pRes, pInfo->scanCols, p->pDataBlock, false);
      doFilterResult(pInfo);

      blockDataDestroy(p);

      pInfo->loadInfo.totalRows += pInfo->pRes->info.rows;
      return (pInfo->pRes->info.rows == 0) ? NULL : pInfo->pRes;
    }
  } else {  // load the meta from mnode of the given epset
    if (pOperator->status == OP_EXEC_DONE) {
      return NULL;
    }

    while (1) {
      int64_t startTs = taosGetTimestampUs();

      // strncpy does not terminate the destination when the source fills it completely
      strncpy(pInfo->req.tb, tNameGetTableName(&pInfo->name), tListLen(pInfo->req.tb));
      pInfo->req.tb[tListLen(pInfo->req.tb) - 1] = 0;

      // bounded copies: never write past the end of the request fields
      snprintf(pInfo->req.user, sizeof(pInfo->req.user), "%s", pInfo->pUser);

      if (pInfo->showRewrite) {
        char dbName[TSDB_DB_NAME_LEN] = {0};
        getDBNameFromCondition(pInfo->pCondition, dbName);
        snprintf(pInfo->req.db, sizeof(pInfo->req.db), "%d.%s", pInfo->accountId, dbName);
      }

      // first call computes the encoded length, second call encodes into the buffer
      int32_t contLen = tSerializeSRetrieveTableReq(NULL, 0, &pInfo->req);
      char*   buf1 = taosMemoryCalloc(1, contLen);
      if (NULL == buf1) {
        qError("%s prepare message %d failed", GET_TASKID(pTaskInfo), contLen);
        pTaskInfo->code = TSDB_CODE_QRY_OUT_OF_MEMORY;
        return NULL;
      }
      tSerializeSRetrieveTableReq(buf1, contLen, &pInfo->req);

      // send the fetch remote task result request
      SMsgSendInfo* pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
      if (NULL == pMsgSendInfo) {
        qError("%s prepare message %d failed", GET_TASKID(pTaskInfo), (int32_t)sizeof(SMsgSendInfo));
        pTaskInfo->code = TSDB_CODE_QRY_OUT_OF_MEMORY;
        taosMemoryFree(buf1);  // not yet handed over to the send info, so it must be released here
        return NULL;
      }

      int32_t msgType = (strcasecmp(name, TSDB_INS_TABLE_DNODE_VARIABLES) == 0) ? TDMT_DND_SYSTABLE_RETRIEVE
                                                                                : TDMT_MND_SYSTABLE_RETRIEVE;

      pMsgSendInfo->param = pOperator;
      pMsgSendInfo->msgInfo.pData = buf1;
      pMsgSendInfo->msgInfo.len = contLen;
      pMsgSendInfo->msgType = msgType;
      pMsgSendInfo->fp = loadSysTableCallback;
      pMsgSendInfo->requestId = pTaskInfo->id.queryId;

      int64_t transporterId = 0;
      int32_t code =
          asyncSendMsgToServer(pInfo->readHandle.pMsgCb->clientRpc, &pInfo->epSet, &transporterId, pMsgSendInfo);
      if (code != TSDB_CODE_SUCCESS) {
        // The request never left this node, so waiting on the semaphore could block forever.
        // NOTE(review): ownership of pMsgSendInfo/buf1 after a failed send is not visible from here;
        // they are intentionally not released to avoid a double free - confirm against the sender.
        qError("%s failed to send the retrieve request, code:%s", GET_TASKID(pTaskInfo), tstrerror(code));
        pTaskInfo->code = code;
        return NULL;
      }

      // wait until the response callback has filled pInfo->pRsp (or set pTaskInfo->code)
      tsem_wait(&pInfo->ready);

      if (pTaskInfo->code) {
        qDebug("%s load meta data from mnode failed, totalRows:%" PRIu64 ", code:%s", GET_TASKID(pTaskInfo),
               pInfo->loadInfo.totalRows, tstrerror(pTaskInfo->code));
        return NULL;
      }

      SRetrieveMetaTableRsp* pRsp = pInfo->pRsp;
      pInfo->req.showId = pRsp->handle;  // the handle is sent back with the next request

      if (pRsp->numOfRows == 0 || pRsp->completed) {
        pOperator->status = OP_EXEC_DONE;
        qDebug("%s load meta data from mnode completed, rowsOfSource:%d, totalRows:%" PRIu64, GET_TASKID(pTaskInfo),
               pRsp->numOfRows, pInfo->loadInfo.totalRows);

        if (pRsp->numOfRows == 0) {
          taosMemoryFree(pRsp);
          return NULL;
        }
      }

      extractDataBlockFromFetchRsp(pInfo->pRes, &pInfo->loadInfo, pRsp->numOfRows, pRsp->data, pRsp->compLen,
                                   pOperator->exprSupp.numOfExprs, startTs, NULL, pInfo->scanCols);

      // todo log the filter info
      doFilterResult(pInfo);
      taosMemoryFree(pRsp);

      // an empty block after filtering triggers another fetch unless the source is exhausted
      if (pInfo->pRes->info.rows > 0) {
        return pInfo->pRes;
      } else if (pOperator->status == OP_EXEC_DONE) {
        return NULL;
      }
    }
  }
}

1679
int32_t buildSysDbTableInfo(const SSysTableScanInfo* pInfo, int32_t capacity) {
1680
  SSDataBlock* p = buildSysTableMetaBlock();
1681
  blockDataEnsureCapacity(p, capacity);
1682

L
Liu Jicong 已提交
1683
  size_t               size = 0;
1684 1685 1686 1687 1688 1689 1690 1691
  const SSysTableMeta* pSysDbTableMeta = NULL;

  getInfosDbMeta(&pSysDbTableMeta, &size);
  p->info.rows = buildDbTableInfoBlock(p, pSysDbTableMeta, size, TSDB_INFORMATION_SCHEMA_DB);

  getPerfDbMeta(&pSysDbTableMeta, &size);
  p->info.rows = buildDbTableInfoBlock(p, pSysDbTableMeta, size, TSDB_PERFORMANCE_SCHEMA_DB);

1692
  relocateColumnData(pInfo->pRes, pInfo->scanCols, p->pDataBlock, false);
1693
  pInfo->pRes->info.rows = p->info.rows;
1694 1695 1696
  blockDataDestroy(p);

  return pInfo->pRes->info.rows;
1697 1698
}

L
Liu Jicong 已提交
1699 1700 1701
int32_t buildDbTableInfoBlock(const SSDataBlock* p, const SSysTableMeta* pSysDbTableMeta, size_t size,
                              const char* dbName) {
  char    n[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
1702 1703
  int32_t numOfRows = p->info.rows;

L
Liu Jicong 已提交
1704
  for (int32_t i = 0; i < size; ++i) {
1705 1706 1707 1708 1709 1710 1711 1712 1713 1714 1715 1716 1717 1718 1719 1720 1721 1722 1723 1724
    const SSysTableMeta* pm = &pSysDbTableMeta[i];

    SColumnInfoData* pColInfoData = taosArrayGet(p->pDataBlock, 0);

    STR_TO_VARSTR(n, pm->name);
    colDataAppend(pColInfoData, numOfRows, n, false);

    // database name
    STR_TO_VARSTR(n, dbName);
    pColInfoData = taosArrayGet(p->pDataBlock, 1);
    colDataAppend(pColInfoData, numOfRows, n, false);

    // create time
    pColInfoData = taosArrayGet(p->pDataBlock, 2);
    colDataAppendNULL(pColInfoData, numOfRows);

    // number of columns
    pColInfoData = taosArrayGet(p->pDataBlock, 3);
    colDataAppend(pColInfoData, numOfRows, (char*)&pm->colNum, false);

L
Liu Jicong 已提交
1725
    for (int32_t j = 4; j <= 8; ++j) {
1726 1727 1728 1729 1730 1731 1732 1733 1734 1735 1736 1737 1738 1739 1740
      pColInfoData = taosArrayGet(p->pDataBlock, j);
      colDataAppendNULL(pColInfoData, numOfRows);
    }

    STR_TO_VARSTR(n, "SYSTEM_TABLE");

    pColInfoData = taosArrayGet(p->pDataBlock, 9);
    colDataAppend(pColInfoData, numOfRows, n, false);

    numOfRows += 1;
  }

  return numOfRows;
}

1741
// Build the operator that scans a system table.
//   readHandle    points to an SReadHandle; its content is copied into the operator info
//   pScanPhyNode  physical plan node describing the scan (output layout, table name, conditions)
//   pUser         user name; duplicated and later placed in the retrieve request
//   pTaskInfo     task that owns the operator
// Returns the new operator, or NULL with terrno = TSDB_CODE_QRY_OUT_OF_MEMORY when one of the two
// initial allocations fails.
SOperatorInfo* createSysTableScanOperatorInfo(void* readHandle, SSystemTableScanPhysiNode* pScanPhyNode,
                                              const char* pUser, SExecTaskInfo* pTaskInfo) {
  SSysTableScanInfo* pInfo = taosMemoryCalloc(1, sizeof(SSysTableScanInfo));
  SOperatorInfo*     pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
  if (pInfo == NULL || pOperator == NULL) {
    goto _error;
  }

  SScanPhysiNode*     pScanNode = &pScanPhyNode->scan;
  SDataBlockDescNode* pDescNode = pScanNode->node.pOutputDataBlockDesc;

  // result block and the scan-column to output-slot mapping
  SSDataBlock* pResBlock = createResDataBlock(pDescNode);
  int32_t      numOfMatched = 0;
  SArray*      pMatchList = extractColMatchInfo(pScanNode->pScanCols, pDescNode, &numOfMatched, COL_MATCH_FROM_COL_ID);

  pInfo->accountId = pScanPhyNode->accountId;
  pInfo->pUser = taosMemoryStrDup((void*)pUser);
  pInfo->showRewrite = pScanPhyNode->showRewrite;
  pInfo->pRes = pResBlock;
  pInfo->pCondition = pScanNode->node.pConditions;
  pInfo->scanCols = pMatchList;

  initResultSizeInfo(pOperator, 4096);

  tNameAssign(&pInfo->name, &pScanNode->tableName);
  const char* tbName = tNameGetTableName(&pInfo->name);

  // the read handle is needed by every kind of system-table scan
  pInfo->readHandle = *(SReadHandle*)readHandle;

  bool isUserTables = (strncasecmp(tbName, TSDB_INS_TABLE_USER_TABLES, TSDB_TABLE_FNAME_LEN) == 0);
  if (isUserTables) {
    // rows are produced locally, so the result buffer is reserved up front
    blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity);
  } else {
    // rows are fetched remotely: prepare the wake-up semaphore and the target endpoint set
    tsem_init(&pInfo->ready, 0, 0);
    pInfo->epSet = pScanPhyNode->mgmtEpSet;
  }

  pOperator->name = "SysTableScanOperator";
  pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN;
  pOperator->blocking = false;
  pOperator->status = OP_NOT_OPENED;
  pOperator->info = pInfo;
  pOperator->exprSupp.numOfExprs = taosArrayGetSize(pResBlock->pDataBlock);
  pOperator->pTaskInfo = pTaskInfo;

  pOperator->fpSet =
      createOperatorFpSet(operatorDummyOpenFn, doSysTableScan, NULL, NULL, destroySysScanOperator, NULL, NULL, NULL);

  return pOperator;

_error:
  taosMemoryFreeClear(pInfo);
  taosMemoryFreeClear(pOperator);
  terrno = TSDB_CODE_QRY_OUT_OF_MEMORY;
  return NULL;
}
H
Haojun Liao 已提交
1797

1798
// Produce the next block of a tag scan: one output row per table in pInfo->pTableList, at most
// resultInfo.capacity rows per call. pInfo->curPos remembers where the previous call stopped.
// For every output expression the destination column receives either the table name (scan
// pseudo-column functions) or the value of the tag identified by the expression's first parameter.
// Returns NULL when there is nothing (more) to return; the task is marked TASK_COMPLETED once
// the table list is empty or exhausted.
static SSDataBlock* doTagScan(SOperatorInfo* pOperator) {
  if (pOperator->status == OP_EXEC_DONE) {
    return NULL;
  }

  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
  STagScanInfo*  pInfo = pOperator->info;
  SExprInfo*     pExprInfo = &pOperator->exprSupp.pExprInfo[0];
  SSDataBlock*   pRes = pInfo->pRes;

  int32_t size = taosArrayGetSize(pInfo->pTableList->pTableList);
  if (size == 0) {
    setTaskStatus(pTaskInfo, TASK_COMPLETED);
    return NULL;
  }

  char        str[512] = {0};  // scratch var-string for the table name
  int32_t     count = 0;       // rows written into pRes during this call
  SMetaReader mr = {0};
  metaReaderInit(&mr, pInfo->readHandle.meta, 0);

  while (pInfo->curPos < size && count < pOperator->resultInfo.capacity) {
    STableKeyInfo* item = taosArrayGet(pInfo->pTableList->pTableList, pInfo->curPos);
    metaGetTableEntryByUid(&mr, item->uid);

    for (int32_t j = 0; j < pOperator->exprSupp.numOfExprs; ++j) {
      SColumnInfoData* pDst = taosArrayGet(pRes->pDataBlock, pExprInfo[j].base.resSchema.slotId);

      // refactor later
      if (fmIsScanPseudoColumnFunc(pExprInfo[j].pExpr->_function.functionId)) {
        STR_TO_VARSTR(str, mr.me.name);
        colDataAppend(pDst, count, str, false);
      } else {  // it is a tag value
        STagVal val = {0};
        val.cid = pExprInfo[j].base.pParam[0].pCol->colId;
        const char* p = metaGetTableTagVal(&mr.me, pDst->info.type, &val);

        // non-JSON tags are converted to column data; a JSON tag is appended as returned
        char* data = NULL;
        if (pDst->info.type != TSDB_DATA_TYPE_JSON && p != NULL) {
          data = tTagValToData((const STagVal*)p, false);
        } else {
          data = (char*)p;
        }

        // a missing tag or a JSON null is stored as NULL
        colDataAppend(pDst, count, data,
                      (data == NULL) || (pDst->info.type == TSDB_DATA_TYPE_JSON && tTagIsJsonNull(data)));

        // the converted buffer is released only for variable-length, non-JSON tags
        if (pDst->info.type != TSDB_DATA_TYPE_JSON && p != NULL && IS_VAR_DATA_TYPE(((const STagVal*)p)->type) &&
            data != NULL) {
          taosMemoryFree(data);
        }
      }
    }

    count += 1;
    if (++pInfo->curPos >= size) {
      doSetOperatorCompleted(pOperator);
    }
  }

  metaReaderClear(&mr);

  if (pOperator->status == OP_EXEC_DONE) {
    setTaskStatus(pTaskInfo, TASK_COMPLETED);
  }

  pRes->info.rows = count;
  pOperator->resultInfo.totalRows += count;

  return (pRes->info.rows == 0) ? NULL : pInfo->pRes;
}

static void destroyTagScanOperatorInfo(void* param, int32_t numOfOutput) {
  STagScanInfo* pInfo = (STagScanInfo*)param;
  pInfo->pRes = blockDataDestroy(pInfo->pRes);
}

1951 1952
// Build the operator that returns tag values / table names for the tables in pTableListInfo.
//   pReadHandle     read handle; its content is copied into the operator info
//   pPhyNode        physical plan node (pseudo columns to output, output block description)
//   pTableListInfo  tables to scan; stored by reference in the operator info
//   pTaskInfo       task that owns the operator
// Returns the new operator, or NULL with terrno = TSDB_CODE_OUT_OF_MEMORY when an allocation or
// the expression support initialisation fails.
SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysiNode* pPhyNode,
                                         STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo) {
  // declared before the first goto so that the error path never reads an uninitialised pointer
  SArray*    colList = NULL;
  SExprInfo* pExprInfo = NULL;
  int32_t    num = 0;
  int32_t    numOfExprs = 0;
  int32_t    code = TSDB_CODE_SUCCESS;

  STagScanInfo*  pInfo = taosMemoryCalloc(1, sizeof(STagScanInfo));
  SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
  if (pInfo == NULL || pOperator == NULL) {
    goto _error;
  }

  SDataBlockDescNode* pDescNode = pPhyNode->node.pOutputDataBlockDesc;

  pExprInfo = createExprInfo(pPhyNode->pScanPseudoCols, NULL, &numOfExprs);
  colList = extractColMatchInfo(pPhyNode->pScanPseudoCols, pDescNode, &num, COL_MATCH_FROM_COL_ID);

  code = initExprSupp(&pOperator->exprSupp, pExprInfo, numOfExprs);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  pInfo->pTableList = pTableListInfo;
  pInfo->pColMatchInfo = colList;  // ownership moves to the operator info from here on
  pInfo->pRes = createResDataBlock(pDescNode);
  pInfo->readHandle = *pReadHandle;
  pInfo->curPos = 0;

  pOperator->name = "TagScanOperator";
  pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN;

  pOperator->blocking = false;
  pOperator->status = OP_NOT_OPENED;
  pOperator->info = pInfo;
  pOperator->pTaskInfo = pTaskInfo;

  initResultSizeInfo(pOperator, 4096);
  blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity);

  pOperator->fpSet =
      createOperatorFpSet(operatorDummyOpenFn, doTagScan, NULL, NULL, destroyTagScanOperatorInfo, NULL, NULL, NULL);

  return pOperator;

_error:
  // the match list is not yet attached to the operator info on this path, so release it here
  taosArrayDestroy(colList);
  taosMemoryFree(pInfo);
  taosMemoryFree(pOperator);
  terrno = TSDB_CODE_OUT_OF_MEMORY;
  return NULL;
}
1999 2000

typedef struct STableMergeScanInfo {
2001
  STableListInfo* tableListInfo;
S
slzhou 已提交
2002 2003 2004 2005
  int32_t         tableStartIndex;
  int32_t         tableEndIndex;
  bool            hasGroupId;
  uint64_t        groupId;
2006

2007 2008 2009 2010 2011 2012 2013 2014 2015 2016 2017 2018
  SArray*     dataReaders;  // array of tsdbReaderT*
  SReadHandle readHandle;

  int32_t  bufPageSize;
  uint32_t sortBufSize;  // max buffer size for in-memory sort

  SArray*      pSortInfo;
  SSortHandle* pSortHandle;

  SSDataBlock* pSortInputBlock;
  int64_t      startTs;  // sort start time

2019 2020 2021
  SArray*  sortSourceParams;
  uint64_t queryId;
  uint64_t taskId;
2022 2023 2024 2025 2026 2027 2028 2029 2030

  SFileBlockLoadRecorder readRecorder;
  int64_t                numOfRows;
  //  int32_t         prevGroupId;  // previous table group id
  SScanInfo       scanInfo;
  int32_t         scanTimes;
  SNode*          pFilterNode;  // filter info, which is push down by optimizer
  SqlFunctionCtx* pCtx;         // which belongs to the direct upstream operator operator query context
  SResultRowInfo* pResultRowInfo;
2031
  int32_t*        rowEntryInfoOffset;
2032 2033 2034 2035 2036 2037 2038 2039
  SExprInfo*      pExpr;
  SSDataBlock*    pResBlock;
  SArray*         pColMatchInfo;
  int32_t         numOfOutput;

  SExprInfo*      pPseudoExpr;
  int32_t         numOfPseudoExpr;
  SqlFunctionCtx* pPseudoCtx;
2040
  //  int32_t*        rowEntryInfoOffset;
2041 2042 2043 2044 2045 2046 2047 2048 2049 2050

  SQueryTableDataCond cond;
  int32_t             scanFlag;  // table scan flag to denote if it is a repeat/reverse/main scan
  int32_t             dataBlockLoadFlag;
  SInterval interval;  // if the upstream is an interval operator, the interval info is also kept here to get the time
                       // window to check if current data block needs to be loaded.

  SSampleExecInfo sample;  // sample execution info
} STableMergeScanInfo;

2051
int32_t createScanTableListInfo(STableScanPhysiNode* pTableScanNode, SReadHandle* pHandle,
H
Haojun Liao 已提交
2052
                                STableListInfo* pTableListInfo, uint64_t queryId, uint64_t taskId) {
wmmhello's avatar
wmmhello 已提交
2053
  int32_t code = getTableList(pHandle->meta, &pTableScanNode->scan, pTableListInfo);
2054
  if (code != TSDB_CODE_SUCCESS) {
2055
    return code;
2056 2057 2058 2059
  }

  if (taosArrayGetSize(pTableListInfo->pTableList) == 0) {
    qDebug("no table qualified for query, TID:0x%" PRIx64 ", QID:0x%" PRIx64, taskId, queryId);
2060 2061
    return TSDB_CODE_SUCCESS;
  }
2062
  code = generateGroupIdMap(pTableListInfo, pHandle, pTableScanNode->pGroupTags);
2063
  if (code != TSDB_CODE_SUCCESS) {
wmmhello's avatar
wmmhello 已提交
2064
    return code;
2065 2066 2067 2068 2069
  }

  return TSDB_CODE_SUCCESS;
}

S
slzhou 已提交
2070 2071 2072 2073 2074 2075 2076 2077 2078 2079 2080 2081 2082 2083 2084 2085
// Open one data reader per table of pTableListInfo->pTableList in the inclusive index range
// [tableStartIdx, tableEndIdx] and append each reader to arrayReader.
// Returns TSDB_CODE_SUCCESS when every reader was opened. On failure an error code is returned
// and the readers opened so far remain in arrayReader, so the owner of the array can close them.
int32_t createMultipleDataReaders(SQueryTableDataCond* pQueryCond, SReadHandle* pHandle, STableListInfo* pTableListInfo,
                                  int32_t tableStartIdx, int32_t tableEndIdx, SArray* arrayReader, uint64_t queryId,
                                  uint64_t taskId) {
  for (int32_t i = tableStartIdx; i <= tableEndIdx; ++i) {
    // each reader is opened on a temporary list that contains exactly one table
    SArray* subTableList = taosArrayInit(1, sizeof(STableKeyInfo));
    if (subTableList == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }
    taosArrayPush(subTableList, taosArrayGet(pTableListInfo->pTableList, i));

    tsdbReaderT* pReader = tsdbReaderOpen(pHandle->vnode, pQueryCond, subTableList, queryId, taskId);
    taosArrayDestroy(subTableList);

    // never store a NULL reader: it would be dereferenced later by the code that loads blocks
    if (pReader == NULL) {
      return (terrno != TSDB_CODE_SUCCESS) ? terrno : TSDB_CODE_OUT_OF_MEMORY;
    }

    taosArrayPush(arrayReader, &pReader);
  }

  return TSDB_CODE_SUCCESS;
}

H
Haojun Liao 已提交
2086
// todo refactor
2087 2088
// Decide how much of the current block of reader #readerIdx is needed and load it into pBlock.
//   *status receives the load decision that was finally applied:
//     FUNC_DATA_REQUIRED_FILTEROUT   - block discarded, nothing loaded
//     FUNC_DATA_REQUIRED_NOT_LOAD    - block skipped, column data pointers of pBlock are cleared
//     FUNC_DATA_REQUIRED_STATIS_LOAD - only the pre-computed per-column aggregates are attached
//     FUNC_DATA_REQUIRED_DATA_LOAD   - column data loaded, pseudo columns added, filter applied
// A filter condition or an overlap with the interval window always forces a full data load, and
// a statistics load falls back to a full data load when the aggregates are not available for
// all columns.
// Returns TSDB_CODE_SUCCESS, terrno when the column data cannot be retrieved, or
// TSDB_CODE_OUT_OF_MEMORY when the aggregate pointer array cannot be allocated.
static int32_t loadDataBlockFromOneTable(SOperatorInfo* pOperator, STableMergeScanInfo* pTableScanInfo,
                                         int32_t readerIdx, SSDataBlock* pBlock, uint32_t* status) {
  SExecTaskInfo*       pTaskInfo = pOperator->pTaskInfo;
  STableMergeScanInfo* pInfo = pOperator->info;

  SFileBlockLoadRecorder* pCost = &pTableScanInfo->readRecorder;

  pCost->totalBlocks += 1;
  pCost->totalRows += pBlock->info.rows;

  *status = pInfo->dataBlockLoadFlag;
  if (pTableScanInfo->pFilterNode != NULL ||
      overlapWithTimeWindow(&pTableScanInfo->interval, &pBlock->info, pTableScanInfo->cond.order)) {
    (*status) = FUNC_DATA_REQUIRED_DATA_LOAD;
  }

  SDataBlockInfo* pBlockInfo = &pBlock->info;
  // aggregates attached for the previous block are no longer valid
  taosMemoryFreeClear(pBlock->pBlockAgg);

  if (*status == FUNC_DATA_REQUIRED_FILTEROUT) {
    qDebug("%s data block filter out, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo),
           pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows);
    pCost->filterOutBlocks += 1;
    return TSDB_CODE_SUCCESS;
  } else if (*status == FUNC_DATA_REQUIRED_NOT_LOAD) {
    qDebug("%s data block skipped, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo),
           pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows);
    pCost->skipBlocks += 1;

    // clear all data in pBlock that are set when handling the previous block
    for (int32_t i = 0; i < taosArrayGetSize(pBlock->pDataBlock); ++i) {
      SColumnInfoData* pcol = taosArrayGet(pBlock->pDataBlock, i);
      pcol->pData = NULL;
    }

    return TSDB_CODE_SUCCESS;
  } else if (*status == FUNC_DATA_REQUIRED_STATIS_LOAD) {
    pCost->loadBlockStatis += 1;

    bool             allColumnsHaveAgg = true;
    SColumnDataAgg** pColAgg = NULL;
    tsdbReaderT*     reader = taosArrayGetP(pTableScanInfo->dataReaders, readerIdx);
    tsdbRetrieveDataBlockStatisInfo(reader, &pColAgg, &allColumnsHaveAgg);

    if (allColumnsHaveAgg == true) {
      int32_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);

      // todo create this buffer during creating operator
      if (pBlock->pBlockAgg == NULL) {
        pBlock->pBlockAgg = taosMemoryCalloc(numOfCols, POINTER_BYTES);
        if (pBlock->pBlockAgg == NULL) {
          // the array is indexed right below, so an allocation failure must stop here
          return TSDB_CODE_OUT_OF_MEMORY;
        }
      }

      // attach the aggregate of every output column to its target slot
      for (int32_t i = 0; i < numOfCols; ++i) {
        SColMatchInfo* pColMatchInfo = taosArrayGet(pTableScanInfo->pColMatchInfo, i);
        if (!pColMatchInfo->output) {
          continue;
        }
        pBlock->pBlockAgg[pColMatchInfo->targetSlotId] = pColAgg[i];
      }

      return TSDB_CODE_SUCCESS;
    } else {  // failed to load the block sma data, data block statistics does not exist, load data block instead
      *status = FUNC_DATA_REQUIRED_DATA_LOAD;
    }
  }

  ASSERT(*status == FUNC_DATA_REQUIRED_DATA_LOAD);

  // todo filter data block according to the block sma data firstly

  pCost->totalCheckedRows += pBlock->info.rows;
  pCost->loadBlocks += 1;

  tsdbReaderT* reader = taosArrayGetP(pTableScanInfo->dataReaders, readerIdx);
  SArray*      pCols = tsdbRetrieveDataBlock(reader, NULL);
  if (pCols == NULL) {
    return terrno;
  }

  relocateColumnData(pBlock, pTableScanInfo->pColMatchInfo, pCols, true);

  // currently only the tbname pseudo column
  if (pTableScanInfo->numOfPseudoExpr > 0) {
    addTagPseudoColumnData(&pTableScanInfo->readHandle, pTableScanInfo->pPseudoExpr, pTableScanInfo->numOfPseudoExpr,
                           pBlock);
  }

  // apply the pushed-down filter and account for the time it took
  int64_t st = taosGetTimestampMs();
  doFilter(pTableScanInfo->pFilterNode, pBlock);

  int64_t et = taosGetTimestampMs();
  pTableScanInfo->readRecorder.filterTime += (et - st);

  if (pBlock->info.rows == 0) {
    pCost->filterOutBlocks += 1;
    qDebug("%s data block filter out, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo),
           pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows);
  }

  return TSDB_CODE_SUCCESS;
}

typedef struct STableMergeScanSortSourceParam {
  SOperatorInfo* pOperator;
  int32_t        readerIdx;
2201
  SSDataBlock*   inputBlock;
2202 2203 2204 2205 2206 2207
} STableMergeScanSortSourceParam;

static SSDataBlock* getTableDataBlock(void* param) {
  STableMergeScanSortSourceParam* source = param;
  SOperatorInfo*                  pOperator = source->pOperator;
  int32_t                         readerIdx = source->readerIdx;
2208
  SSDataBlock*                    pBlock = source->inputBlock;
2209 2210 2211 2212
  STableMergeScanInfo*            pTableScanInfo = pOperator->info;

  int64_t st = taosGetTimestampUs();

2213 2214
  blockDataCleanup(pBlock);

2215 2216 2217 2218 2219 2220 2221 2222 2223 2224 2225 2226
  tsdbReaderT* reader = taosArrayGetP(pTableScanInfo->dataReaders, readerIdx);
  while (tsdbNextDataBlock(reader)) {
    if (isTaskKilled(pOperator->pTaskInfo)) {
      longjmp(pOperator->pTaskInfo->env, TSDB_CODE_TSC_QUERY_CANCELLED);
    }

    // process this data block based on the probabilities
    bool processThisBlock = processBlockWithProbability(&pTableScanInfo->sample);
    if (!processThisBlock) {
      continue;
    }

2227 2228 2229 2230 2231 2232 2233
    blockDataCleanup(pBlock);
    SDataBlockInfo binfo = pBlock->info;
    tsdbRetrieveDataBlockInfo(reader, &binfo);

    binfo.capacity = binfo.rows;
    blockDataEnsureCapacity(pBlock, binfo.capacity);
    pBlock->info = binfo;
2234 2235 2236 2237 2238 2239 2240 2241 2242 2243 2244 2245 2246 2247 2248 2249 2250 2251 2252 2253 2254 2255 2256 2257 2258 2259 2260 2261 2262 2263 2264 2265 2266 2267 2268 2269 2270 2271

    uint32_t status = 0;
    int32_t  code = loadDataBlockFromOneTable(pOperator, pTableScanInfo, readerIdx, pBlock, &status);
    //    int32_t  code = loadDataBlockOnDemand(pOperator->pRuntimeEnv, pTableScanInfo, pBlock, &status);
    if (code != TSDB_CODE_SUCCESS) {
      longjmp(pOperator->pTaskInfo->env, code);
    }

    // current block is filter out according to filter condition, continue load the next block
    if (status == FUNC_DATA_REQUIRED_FILTEROUT || pBlock->info.rows == 0) {
      continue;
    }

    uint64_t* groupId = taosHashGet(pOperator->pTaskInfo->tableqinfoList.map, &pBlock->info.uid, sizeof(int64_t));
    if (groupId) {
      pBlock->info.groupId = *groupId;
    }

    pOperator->resultInfo.totalRows = pTableScanInfo->readRecorder.totalRows;
    pTableScanInfo->readRecorder.elapsedTime += (taosGetTimestampUs() - st) / 1000.0;

    return pBlock;
  }
  return NULL;
}

SArray* generateSortByTsInfo(int32_t order) {
  SArray*         pList = taosArrayInit(1, sizeof(SBlockOrderInfo));
  SBlockOrderInfo bi = {0};
  bi.order = order;
  bi.slotId = 0;
  bi.nullFirst = NULL_ORDER_FIRST;

  taosArrayPush(pList, &bi);

  return pList;
}

2272
/**
 * Start the merge scan of the table group that begins at pInfo->tableStartIndex.
 *
 * Steps:
 *  1. find the last consecutive table whose groupId equals pInfo->groupId and store its
 *     index in pInfo->tableEndIndex;
 *  2. create one data reader per table of the group into pInfo->dataReaders;
 *  3. create a multi-source merge sort handle, register getTableDataBlock() as its fetch
 *     function and add one sort source per reader;
 *  4. open the sort handle.
 *
 * @param pOperator  table-merge-scan operator (pOperator->info is a STableMergeScanInfo)
 * @return TSDB_CODE_SUCCESS; on failure the function does not return but long-jumps to
 *         pTaskInfo->env
 */
int32_t startGroupTableMergeScan(SOperatorInfo* pOperator) {
  STableMergeScanInfo* pInfo = pOperator->info;
  SExecTaskInfo*       pTaskInfo = pOperator->pTaskInfo;

  // locate the end of the current group: the run of tables sharing pInfo->groupId
  {
    size_t  tableListSize = taosArrayGetSize(pInfo->tableListInfo->pTableList);
    int32_t i = pInfo->tableStartIndex + 1;
    for (; i < tableListSize; ++i) {
      STableKeyInfo* tableKeyInfo = taosArrayGet(pInfo->tableListInfo->pTableList, i);
      if (tableKeyInfo->groupId != pInfo->groupId) {
        break;
      }
    }
    pInfo->tableEndIndex = i - 1;
  }

  int32_t tableStartIdx = pInfo->tableStartIndex;
  int32_t tableEndIdx = pInfo->tableEndIndex;

  STableListInfo* tableListInfo = pInfo->tableListInfo;
  createMultipleDataReaders(&pInfo->cond, &pInfo->readHandle, tableListInfo, tableStartIdx, tableEndIdx,
                            pInfo->dataReaders, pInfo->queryId, pInfo->taskId);

  // todo the total available buffer should be determined by total capacity of buffer of this task.
  // the additional one is reserved for merge result
  pInfo->sortBufSize = pInfo->bufPageSize * (tableEndIdx - tableStartIdx + 1 + 1);
  int32_t numOfBufPage = pInfo->sortBufSize / pInfo->bufPageSize;
  pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, SORT_MULTISOURCE_MERGE, pInfo->bufPageSize, numOfBufPage,
                                             pInfo->pSortInputBlock, pTaskInfo->id.str);

  tsortSetFetchRawDataFp(pInfo->pSortHandle, getTableDataBlock, NULL, NULL);

  size_t numReaders = taosArrayGetSize(pInfo->dataReaders);

  // First pass: append every source parameter. The second pass stores the addresses of the
  // array elements, so all pushes are completed before any address is taken (presumably
  // because a later push could move the array storage).
  for (int32_t i = 0; i < numReaders; ++i) {
    STableMergeScanSortSourceParam param = {0};
    param.readerIdx = i;
    param.pOperator = pOperator;
    param.inputBlock = createOneDataBlock(pInfo->pResBlock, false);
    taosArrayPush(pInfo->sortSourceParams, &param);
  }

  for (int32_t i = 0; i < numReaders; ++i) {
    SSortSource* ps = taosMemoryCalloc(1, sizeof(SSortSource));
    if (ps == NULL) {
      // fail the task instead of dereferencing a NULL source below
      longjmp(pTaskInfo->env, TSDB_CODE_OUT_OF_MEMORY);
    }

    STableMergeScanSortSourceParam* param = taosArrayGet(pInfo->sortSourceParams, i);
    ps->param = param;
    tsortAddSource(pInfo->pSortHandle, ps);
  }

  int32_t code = tsortOpen(pInfo->pSortHandle);

  if (code != TSDB_CODE_SUCCESS) {
    // NOTE(review): terrno (not code) is propagated; confirm tsortOpen always sets terrno on failure
    longjmp(pTaskInfo->env, terrno);
  }

  return TSDB_CODE_SUCCESS;
}

/**
 * Tear down the per-group state created by startGroupTableMergeScan(): the sort handle, the
 * sort source parameters and the data readers of the group that just finished.
 *
 * The dataReaders array is reused by startGroupTableMergeScan() for the next group, so it is
 * only emptied while further groups remain. Destroying it unconditionally (as was done before)
 * left the next group working on freed memory. After the last group the array is released and
 * the pointer reset.
 *
 * @param pOperator  table-merge-scan operator (pOperator->info is a STableMergeScanInfo)
 * @return TSDB_CODE_SUCCESS
 */
int32_t stopGroupTableMergeScan(SOperatorInfo* pOperator) {
  STableMergeScanInfo* pInfo = pOperator->info;

  tsortDestroySortHandle(pInfo->pSortHandle);

  // NOTE(review): the inputBlock of each parameter is created per group in
  // startGroupTableMergeScan(); confirm who owns/frees it before the entries are dropped here.
  taosArrayClear(pInfo->sortSourceParams);

  size_t numReaders = taosArrayGetSize(pInfo->dataReaders);
  for (size_t i = 0; i < numReaders; ++i) {
    tsdbReaderT* reader = taosArrayGetP(pInfo->dataReaders, i);
    tsdbCleanupReadHandle(reader);
  }

  // same "last group" test as the caller, doTableMergeScan()
  size_t tableListSize = taosArrayGetSize(pInfo->tableListInfo->pTableList);
  bool   moreGroups = ((size_t)pInfo->tableEndIndex + 1 < tableListSize);
  if (moreGroups) {
    // keep the array itself: the next group pushes its readers into it
    taosArrayClear(pInfo->dataReaders);
  } else {
    taosArrayDestroy(pInfo->dataReaders);
    pInfo->dataReaders = NULL;
  }

  return TSDB_CODE_SUCCESS;
}

2345
/**
 * Pull sorted tuples from the sort handle into its result block until the block holds at
 * least `capacity` rows or the handle has no more tuples.
 *
 * @param pHandle    sort handle to read from
 * @param capacity   row count at which filling stops
 * @param pOperator  operator on whose behalf the data is fetched (used for the task id in logs)
 * @return the sort handle's data block when it contains at least one row, otherwise NULL
 */
SSDataBlock* getSortedTableMergeScanBlockData(SSortHandle* pHandle, int32_t capacity, SOperatorInfo* pOperator) {
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;

  SSDataBlock* pSorted = tsortGetSortedDataBlock(pHandle);
  if (pSorted == NULL) {
    return NULL;
  }

  blockDataEnsureCapacity(pSorted, capacity);

  // the capacity test runs after each append, so no tuple is fetched once the block is full
  for (STupleHandle* pTuple = tsortNextTuple(pHandle); pTuple != NULL; pTuple = tsortNextTuple(pHandle)) {
    appendOneRowToDataBlock(pSorted, pTuple);
    if (pSorted->info.rows >= capacity) {
      break;
    }
  }

  qDebug("%s get sorted row blocks, rows:%d", GET_TASKID(pTaskInfo), pSorted->info.rows);

  if (pSorted->info.rows > 0) {
    return pSorted;
  }
  return NULL;
}

/**
 * Produce the next result block of the table merge scan.
 *
 * Tables are processed group by group: the sorted output of the current group is drained
 * first; when it is exhausted the group is stopped and the scan of the next group (starting
 * right after pInfo->tableEndIndex) is started. Every returned block carries the id of the
 * group it came from.
 *
 * @param pOperator  table-merge-scan operator
 * @return next non-empty block, or NULL when the operator is done or the table list is empty
 */
SSDataBlock* doTableMergeScan(SOperatorInfo* pOperator) {
  if (pOperator->status == OP_EXEC_DONE) {
    return NULL;
  }

  SExecTaskInfo*       pTaskInfo = pOperator->pTaskInfo;
  STableMergeScanInfo* pInfo = pOperator->info;

  int32_t code = pOperator->fpSet._openFn(pOperator);
  if (code != TSDB_CODE_SUCCESS) {
    longjmp(pTaskInfo->env, code);
  }

  size_t tableListSize = taosArrayGetSize(pInfo->tableListInfo->pTableList);

  // one-time setup on the first call: start scanning the first group
  if (!pInfo->hasGroupId) {
    pInfo->hasGroupId = true;

    if (tableListSize == 0) {
      doSetOperatorCompleted(pOperator);
      return NULL;
    }

    pInfo->tableStartIndex = 0;
    STableKeyInfo* pFirstTable = taosArrayGet(pInfo->tableListInfo->pTableList, pInfo->tableStartIndex);
    pInfo->groupId = pFirstTable->groupId;
    startGroupTableMergeScan(pOperator);
  }

  while (pInfo->tableStartIndex < tableListSize) {
    SSDataBlock* pRes = getSortedTableMergeScanBlockData(pInfo->pSortHandle, pOperator->resultInfo.capacity, pOperator);
    if (pRes != NULL) {
      pRes->info.groupId = pInfo->groupId;
      pOperator->resultInfo.totalRows += pRes->info.rows;
      return pRes;
    }

    // current group is exhausted
    stopGroupTableMergeScan(pOperator);
    if (pInfo->tableEndIndex >= tableListSize - 1) {
      doSetOperatorCompleted(pOperator);
      break;
    }

    // switch to the group that starts right after the one just finished
    pInfo->tableStartIndex = pInfo->tableEndIndex + 1;
    STableKeyInfo* pNextTable = taosArrayGet(pInfo->tableListInfo->pTableList, pInfo->tableStartIndex);
    pInfo->groupId = pNextTable->groupId;
    startGroupTableMergeScan(pOperator);
  }

  return NULL;
}

void destroyTableMergeScanOperatorInfo(void* param, int32_t numOfOutput) {
  STableMergeScanInfo* pTableScanInfo = (STableMergeScanInfo*)param;
2421
  cleanupQueryTableDataCond(&pTableScanInfo->cond);
2422 2423 2424 2425 2426 2427 2428 2429 2430 2431 2432

  if (pTableScanInfo->pColMatchInfo != NULL) {
    taosArrayDestroy(pTableScanInfo->pColMatchInfo);
  }

  pTableScanInfo->pResBlock = blockDataDestroy(pTableScanInfo->pResBlock);
  pTableScanInfo->pSortInputBlock = blockDataDestroy(pTableScanInfo->pSortInputBlock);

  taosArrayDestroy(pTableScanInfo->pSortInfo);
}

2433 2434
typedef struct STableMergeScanExecInfo {
  SFileBlockLoadRecorder blockRecorder;
L
Liu Jicong 已提交
2435
  SSortExecInfo          sortExecInfo;
2436 2437
} STableMergeScanExecInfo;

2438 2439
/**
 * Collect the execution statistics of a table-merge-scan operator for EXPLAIN.
 *
 * @param pOptr         operator to inspect; must not be NULL
 * @param pOptrExplain  out: newly allocated STableMergeScanExecInfo (NULL on failure);
 *                      presumably released by the caller - confirm against the call site
 * @param len           out: size in bytes of the returned struct (0 on failure)
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY when the result cannot be allocated
 */
int32_t getTableMergeScanExplainExecInfo(SOperatorInfo* pOptr, void** pOptrExplain, uint32_t* len) {
  ASSERT(pOptr != NULL);
  // TODO: merge these two info into one struct
  STableMergeScanExecInfo* execInfo = taosMemoryCalloc(1, sizeof(STableMergeScanExecInfo));
  if (execInfo == NULL) {
    // report the failure instead of writing through a NULL pointer
    *pOptrExplain = NULL;
    *len = 0;
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  STableMergeScanInfo* pInfo = pOptr->info;
  execInfo->blockRecorder = pInfo->readRecorder;
  // NOTE(review): stopGroupTableMergeScan() destroys pSortHandle without resetting the pointer;
  // confirm the handle is still valid whenever this function can be called.
  execInfo->sortExecInfo = tsortGetSortExecInfo(pInfo->pSortHandle);

  *pOptrExplain = execInfo;
  *len = sizeof(STableMergeScanExecInfo);

  return TSDB_CODE_SUCCESS;
}

S
slzhou 已提交
2452 2453 2454 2455 2456 2457
/**
 * Comparison callback ordering STableKeyInfo entries by ascending groupId.
 *
 * The ids are compared directly rather than subtracted: a difference squeezed into the
 * int32_t return value can report the wrong sign, or 0 for distinct ids, once the ids are
 * wider than 32 bits or unsigned.
 *
 * @param p1  pointer to the first STableKeyInfo
 * @param p2  pointer to the second STableKeyInfo
 * @return -1, 0 or 1 when the first groupId is less than, equal to or greater than the second
 */
int32_t compareTableKeyInfoByGid(const void* p1, const void* p2) {
  const STableKeyInfo* info1 = p1;
  const STableKeyInfo* info2 = p2;

  if (info1->groupId < info2->groupId) {
    return -1;
  }
  if (info1->groupId > info2->groupId) {
    return 1;
  }
  return 0;
}

2458 2459 2460
/**
 * Create the table-merge-scan operator.
 *
 * When the scan node carries group tags, the table list is first sorted by group id so that
 * the tables of one group are adjacent; the scan then processes one such run at a time.
 *
 * @param pTableScanNode  physical plan node describing the scan
 * @param pTableListInfo  tables to scan; stored in the operator and possibly re-sorted in place
 * @param readHandle      read handle, copied into the operator
 * @param pTaskInfo       owning task; pTaskInfo->code is set on failure
 * @param queryId         query id stored in the operator and passed on when creating data readers
 * @param taskId          task id stored in the operator and passed on when creating data readers
 * @return the new operator, or NULL on failure (pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY)
 */
SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanNode, STableListInfo* pTableListInfo,
                                                SReadHandle* readHandle, SExecTaskInfo* pTaskInfo, uint64_t queryId,
                                                uint64_t taskId) {
  // declared before the first goto so that the error path can always release it
  SArray* pColList = NULL;

  STableMergeScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STableMergeScanInfo));
  SOperatorInfo*       pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
  if (pInfo == NULL || pOperator == NULL) {
    goto _error;
  }
  if (pTableScanNode->pGroupTags) {
    taosArraySort(pTableListInfo->pTableList, compareTableKeyInfoByGid);
  }

  SDataBlockDescNode* pDescNode = pTableScanNode->scan.node.pOutputDataBlockDesc;

  int32_t numOfCols = 0;
  pColList = extractColMatchInfo(pTableScanNode->scan.pScanCols, pDescNode, &numOfCols, COL_MATCH_FROM_COL_ID);

  int32_t code = initQueryTableDataCond(&pInfo->cond, pTableScanNode);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  if (pTableScanNode->scan.pScanPseudoCols != NULL) {
    pInfo->pPseudoExpr = createExprInfo(pTableScanNode->scan.pScanPseudoCols, NULL, &pInfo->numOfPseudoExpr);
    pInfo->pPseudoCtx = createSqlFunctionCtx(pInfo->pPseudoExpr, pInfo->numOfPseudoExpr, &pInfo->rowEntryInfoOffset);
  }

  pInfo->scanInfo = (SScanInfo){.numOfAsc = pTableScanNode->scanSeq[0], .numOfDesc = pTableScanNode->scanSeq[1]};

  pInfo->readHandle = *readHandle;
  pInfo->interval = extractIntervalInfo(pTableScanNode);
  pInfo->sample.sampleRatio = pTableScanNode->ratio;
  pInfo->sample.seed = taosGetTimestampSec();
  pInfo->dataBlockLoadFlag = pTableScanNode->dataRequired;
  pInfo->pFilterNode = pTableScanNode->scan.node.pConditions;
  pInfo->tableListInfo = pTableListInfo;
  pInfo->scanFlag = MAIN_SCAN;
  pInfo->pColMatchInfo = pColList;  // ownership moves to pInfo; released by the destroy callback

  pInfo->pResBlock = createResDataBlock(pDescNode);
  pInfo->dataReaders = taosArrayInit(64, POINTER_BYTES);
  pInfo->queryId = queryId;
  pInfo->taskId = taskId;

  pInfo->sortSourceParams = taosArrayInit(64, sizeof(STableMergeScanSortSourceParam));

  pInfo->pSortInfo = generateSortByTsInfo(pInfo->cond.order);
  pInfo->pSortInputBlock = createOneDataBlock(pInfo->pResBlock, false);

  int32_t rowSize = pInfo->pResBlock->info.rowSize;
  pInfo->bufPageSize = getProperSortPageSize(rowSize);

  pOperator->name = "TableMergeScanOperator";
  pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN;
  pOperator->blocking = false;
  pOperator->status = OP_NOT_OPENED;
  pOperator->info = pInfo;
  pOperator->exprSupp.numOfExprs = numOfCols;
  pOperator->pTaskInfo = pTaskInfo;
  initResultSizeInfo(pOperator, 1024);

  pOperator->fpSet =
      createOperatorFpSet(operatorDummyOpenFn, doTableMergeScan, NULL, NULL, destroyTableMergeScanOperatorInfo, NULL,
                          NULL, getTableMergeScanExplainExecInfo);
  pOperator->cost.openCost = 0;
  return pOperator;

_error:
  pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY;
  // the column match list is not yet owned by pInfo on this path; release it here so that it
  // is not leaked when initQueryTableDataCond() fails
  if (pColList != NULL) {
    taosArrayDestroy(pColList);
  }
  taosMemoryFree(pInfo);
  taosMemoryFree(pOperator);
  return NULL;
}