tsdbRead.c 134.3 KB
Newer Older
H
hjxilinx 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

H
Haojun Liao 已提交
16
#include "osDef.h"
H
Hongze Cheng 已提交
17
#include "tsdb.h"
18

19
// True when the scan order `o` equals TSDB_ORDER_ASC. The argument is parenthesized so that
// compound expressions passed as `o` keep their intended grouping after expansion.
#define ASCENDING_TRAVERSE(o) ((o) == TSDB_ORDER_ASC)
// Row-index sentinel (INT16_MIN). NOTE(review): by its name it marks that all rows were checked -- confirm at use sites.
#define ALL_ROWS_CHECKED_INDEX (INT16_MIN)
// Initial value of a row index that has not been positioned yet (see STableBlockScanInfo.indexInBlockL).
#define DEFAULT_ROW_INDEX_VAL  (-1)
H
Hongze Cheng 已提交
22

23 24 25 26 27 28
// Tags for the kind of external rows being handled.
// NOTE(review): meaning inferred from the enumerator names only; no use is visible in this part of the file.
typedef enum {
  EXTERNAL_ROWS_PREV = 1,  // 0x1
  EXTERNAL_ROWS_MAIN = 2,  // 0x2
  EXTERNAL_ROWS_NEXT = 3,  // 0x3
} EContentData;

29
typedef struct {
dengyihao's avatar
dengyihao 已提交
30
  STbDataIter* iter;
31 32 33 34
  int32_t      index;
  bool         hasVal;
} SIterInfo;

35 36 37 38 39
// Counters filled while loading the block index of one file set (see doLoadFileBlock).
typedef struct {
  int32_t numOfBlocks;      // number of qualified data blocks
  int32_t numOfLastBlocks;  // number of qualified last blocks
} SBlockNumber;

H
Haojun Liao 已提交
40
typedef struct STableBlockScanInfo {
dengyihao's avatar
dengyihao 已提交
41 42
  uint64_t  uid;
  TSKEY     lastKey;
43 44 45 46 47
  SMapData  mapData;      // block info (compressed)
  SArray*   pBlockList;   // block data index list
  SIterInfo iter;         // mem buffer skip list iterator
  SIterInfo iiter;        // imem buffer skip list iterator
  SArray*   delSkyline;   // delete info for this table
48 49
  int32_t   fileDelIndex; // file block delete index
  int32_t   lastBlockDelIndex;// delete index for last block
50 51
  bool      iterInit;     // whether to initialize the in-memory skip list iterator or not
  int16_t   indexInBlockL;// row position in last block
H
Haojun Liao 已提交
52 53 54
} STableBlockScanInfo;

// Pair of table uid and offset describing one block entry.
// NOTE(review): presumably used when ordering data blocks across tables -- confirm at use sites.
typedef struct SBlockOrderWrapper {
  int64_t uid;
  int64_t offset;
} SBlockOrderWrapper;
H
Hongze Cheng 已提交
58 59

typedef struct SBlockOrderSupporter {
60 61 62 63
  SBlockOrderWrapper** pDataBlockInfo;
  int32_t*             indexPerTable;
  int32_t*             numOfBlocksPerTable;
  int32_t              numOfTables;
H
Hongze Cheng 已提交
64 65 66
} SBlockOrderSupporter;

// Accumulated I/O statistics of one reader. In the visible code the *Time fields are
// accumulated in milliseconds (microsecond deltas divided by 1000.0).
typedef struct SIOCostSummary {
  int64_t numOfBlocks;
  double  blockLoadTime;
  double  buildmemBlock;
  int64_t headFileLoad;
  double  headFileLoadTime;
  int64_t smaDataLoad;
  double  smaLoadTime;
  int64_t lastBlockLoad;
  double  lastBlockLoadTime;
} SIOCostSummary;

typedef struct SBlockLoadSuppInfo {
79
  SArray*          pColAgg;
80
  SColumnDataAgg   tsColAgg;
C
Cary Xu 已提交
81
  SColumnDataAgg** plist;
82 83
  int16_t*         colIds;    // column ids for loading file block data
  char**           buildBuf;  // build string tmp buffer, todo remove it later after all string format being updated.
H
Hongze Cheng 已提交
84 85
} SBlockLoadSuppInfo;

86 87 88 89 90 91 92 93 94 95 96
// Version bounds applied when filtering blocks: a block is skipped when its
// [minVer, maxVer] does not intersect this range.
typedef struct SVersionRange {
  uint64_t minVer;  // lower bound
  uint64_t maxVer;  // upper bound
} SVersionRange;

// Reader state for the "last" blocks of the current file set.
typedef struct SLastBlockReader {
  SArray*       pBlockL;            // SArray<SBlockL>
  int32_t       currentBlockIndex;  // initialised to -1 by initFilesetIterator
  SBlockData    lastBlockData;
  STimeWindow   window;
  SVersionRange verRange;
  int32_t       order;
  uint64_t      uid;
  int16_t*      rowIndex;           // row index ptr, usually from the STableBlockScanInfo->indexInBlockL
} SLastBlockReader;

102
typedef struct SFilesetIter {
103 104 105 106
  int32_t           numOfFiles;    // number of total files
  int32_t           index;         // current accessed index in the list
  SArray*           pFileList;     // data file list
  int32_t           order;
107
  SLastBlockReader* pLastBlockReader; // last file block reader
108
} SFilesetIter;
H
Haojun Liao 已提交
109 110

// Identifies one file data block: the owning table and the block's position for that table.
typedef struct SFileDataBlockInfo {
  // index position in STableBlockScanInfo in order to check whether neighbor block overlaps with it
  uint64_t uid;
  int32_t  tbBlockIdx;
} SFileDataBlockInfo;

typedef struct SDataBlockIter {
117
  int32_t   numOfBlocks;
118
  int32_t   index;
119
  SArray*   blockList;      // SArray<SFileDataBlockInfo>
120
  int32_t   order;
121
  SBlock    block;          // current SBlock data
122
  SHashObj* pTableMap;
H
Haojun Liao 已提交
123 124 125
} SDataBlockIter;

// Progress of dumping the rows of the current file block.
typedef struct SFileBlockDumpInfo {
  int32_t totalRows;
  int32_t rowIndex;
  int64_t lastKey;
  bool    allDumped;  // set by setBlockAllDumped
} SFileBlockDumpInfo;

typedef struct SReaderStatus {
dengyihao's avatar
dengyihao 已提交
133 134
  bool                 loadFromFile;  // check file stage
  SHashObj*            pTableMap;     // SHash<STableBlockScanInfo>
135
  STableBlockScanInfo* pTableIter;    // table iterator used in building in-memory buffer data blocks.
136
  SFileBlockDumpInfo   fBlockDumpInfo;
137 138 139 140 141
  SDFileSet*           pCurrentFileset;  // current opened file set
  SBlockData           fileBlockData;
  SFilesetIter         fileIter;
  SDataBlockIter       blockIter;
  bool                 composedDataBlock;  // the returned data block is a composed block or not
H
Haojun Liao 已提交
142 143
} SReaderStatus;

H
Hongze Cheng 已提交
144
struct STsdbReader {
H
Haojun Liao 已提交
145 146 147 148 149 150 151
  STsdb*             pTsdb;
  uint64_t           suid;
  int16_t            order;
  STimeWindow        window;  // the primary query time window that applies to all queries
  SSDataBlock*       pResBlock;
  int32_t            capacity;
  SReaderStatus      status;
152 153
  char*              idStr;  // query info handle, for debug purpose
  int32_t            type;   // query type: 1. retrieve all data blocks, 2. retrieve direct prev|next rows
H
Hongze Cheng 已提交
154
  SBlockLoadSuppInfo suppInfo;
H
Hongze Cheng 已提交
155
  STsdbReadSnap*     pReadSnap;
156
  SIOCostSummary     cost;
157 158
  STSchema*          pSchema;     // the newest version schema
  STSchema*          pMemSchema;  // the previous schema for in-memory data, to avoid load schema too many times
159 160
  SDataFReader*      pFileReader;
  SVersionRange      verRange;
161

162 163
  int32_t      step;
  STsdbReader* innerReader[2];
H
Hongze Cheng 已提交
164
};
H
Hongze Cheng 已提交
165

H
Haojun Liao 已提交
166
static SFileDataBlockInfo* getCurrentBlockInfo(SDataBlockIter* pBlockIter);
167 168
static int      buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, int64_t endKey, int32_t capacity,
                                          STsdbReader* pReader);
169
static TSDBROW* getValidRow(SIterInfo* pIter, const SArray* pDelList, STsdbReader* pReader);
170 171
static int32_t  doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pScanInfo, STsdbReader* pReader,
                                        SRowMerger* pMerger);
H
Haojun Liao 已提交
172
static int32_t  doMergeRowsInLastBlock(SLastBlockReader* pLastBlockReader, STableBlockScanInfo* pScanInfo, int64_t ts, SRowMerger* pMerger);
173
static int32_t  doMergeRowsInBuf(SIterInfo* pIter, uint64_t uid, int64_t ts, SArray* pDelList, SRowMerger* pMerger,
dengyihao's avatar
dengyihao 已提交
174
                                 STsdbReader* pReader);
H
Haojun Liao 已提交
175
static int32_t  doAppendRowFromTSRow(SSDataBlock* pBlock, STsdbReader* pReader, STSRow* pTSRow, uint64_t uid);
176
static int32_t  doAppendRowFromFileBlock(SSDataBlock* pResBlock, STsdbReader* pReader, SBlockData* pBlockData,
177
                                     int32_t rowIndex);
178
static void     setComposedBlockFlag(STsdbReader* pReader, bool composed);
179
static bool     hasBeenDropped(const SArray* pDelList, int32_t* index, TSDBKEY* pKey, int32_t order);
180

dengyihao's avatar
dengyihao 已提交
181
static void doMergeMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter, SArray* pDelList, STSRow** pTSRow,
182
                             STsdbReader* pReader, bool* freeTSRow);
183 184
static void doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader,
                               STSRow** pTSRow);
dengyihao's avatar
dengyihao 已提交
185 186 187 188
static int32_t initDelSkylineIterator(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader, STbData* pMemTbData,
                                      STbData* piMemTbData);
static STsdb*  getTsdbByRetentions(SVnode* pVnode, TSKEY winSKey, SRetention* retentions, const char* idstr,
                                   int8_t* pLevel);
189
static SVersionRange getQueryVerRange(SVnode* pVnode, SQueryTableDataCond* pCond, int8_t level);
190
static int64_t getCurrentKeyInLastBlock(SLastBlockReader* pLastBlockReader);
191 192
static bool    hasDataInLastBlock(SLastBlockReader* pLastBlockReader);
static int32_t doBuildDataBlock(STsdbReader* pReader);
H
Haojun Liao 已提交
193

194 195 196
// Builds the per-column helper arrays of pReader->suppInfo from the columns of pBlock:
//   colIds[i]   - column id of the i-th column of pBlock
//   buildBuf[i] - scratch buffer of pCol->info.bytes bytes for variable-length columns, NULL otherwise
//
// Returns TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY when any allocation fails. On failure
// everything allocated here is released and both pointers are reset to NULL, so no dangling
// pointer is left in suppInfo.
static int32_t setColumnIdSlotList(STsdbReader* pReader, SSDataBlock* pBlock) {
  SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo;

  size_t numOfCols = blockDataGetNumOfCols(pBlock);

  pSupInfo->colIds = taosMemoryMalloc(numOfCols * sizeof(int16_t));
  pSupInfo->buildBuf = taosMemoryCalloc(numOfCols, POINTER_BYTES);
  if (pSupInfo->buildBuf == NULL || pSupInfo->colIds == NULL) {
    goto _oom;
  }

  for (size_t i = 0; i < numOfCols; ++i) {
    SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, i);
    pSupInfo->colIds[i] = pCol->info.colId;

    if (IS_VAR_DATA_TYPE(pCol->info.type)) {
      pSupInfo->buildBuf[i] = taosMemoryMalloc(pCol->info.bytes);
      if (pSupInfo->buildBuf[i] == NULL) {
        goto _oom;
      }
    }
  }

  return TSDB_CODE_SUCCESS;

_oom:
  // buildBuf is zero-initialised, so slots that were never filled are NULL and safe to free
  if (pSupInfo->buildBuf != NULL) {
    for (size_t i = 0; i < numOfCols; ++i) {
      taosMemoryFree(pSupInfo->buildBuf[i]);
    }
  }

  taosMemoryFree(pSupInfo->colIds);
  taosMemoryFree(pSupInfo->buildBuf);
  pSupInfo->colIds = NULL;
  pSupInfo->buildBuf = NULL;
  return TSDB_CODE_OUT_OF_MEMORY;
}
H
Hongze Cheng 已提交
218

219
// Creates the uid -> STableBlockScanInfo hash map for the numOfTables tables listed in idList.
// Every entry starts with lastKey set to the start key of the reader's query window and with
// indexInBlockL set to DEFAULT_ROW_INDEX_VAL.
// Returns the new map, or NULL when the map itself cannot be allocated.
static SHashObj* createDataBlockScanInfo(STsdbReader* pTsdbReader, const STableKeyInfo* idList, int32_t numOfTables) {
  // allocate buffer in order to load data blocks from file
  // todo use simple hash instead, optimize the memory consumption
  SHashObj* pTableMap =
      taosHashInit(numOfTables, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
  if (pTableMap == NULL) {
    return NULL;
  }

  for (int32_t j = 0; j < numOfTables; ++j) {
    STableBlockScanInfo info = {.lastKey = 0, .uid = idList[j].uid, .indexInBlockL = DEFAULT_ROW_INDEX_VAL};
    if (ASCENDING_TRAVERSE(pTsdbReader->order)) {
      if (info.lastKey == INT64_MIN || info.lastKey < pTsdbReader->window.skey) {
        info.lastKey = pTsdbReader->window.skey;
      }

      ASSERT(info.lastKey >= pTsdbReader->window.skey && info.lastKey <= pTsdbReader->window.ekey);
    } else {
      info.lastKey = pTsdbReader->window.skey;
    }

    // NOTE(review): the result of taosHashPut is not checked; a failed insert would silently drop
    // the table from the scan. Confirm how duplicate uids are reported before treating it as fatal.
    taosHashPut(pTableMap, &info.uid, sizeof(uint64_t), &info, sizeof(info));
    tsdbDebug("%p check table uid:%" PRId64 " from lastKey:%" PRId64 " %s", pTsdbReader, info.uid, info.lastKey,
              pTsdbReader->idStr);
  }

  tsdbDebug("%p create %d tables scan-info, size:%.2f Kb, %s", pTsdbReader, numOfTables,
            (sizeof(STableBlockScanInfo) * numOfTables) / 1024.0, pTsdbReader->idStr);

  return pTableMap;
}
H
Hongze Cheng 已提交
250

251 252 253
// Resets the in-memory iteration state of every table in pTableMap so that the iterators are
// re-created on next use: both skip-list iterators are destroyed and flagged as empty, and the
// delete skyline is released. Block lists and block map data are left untouched.
static void resetDataBlockScanInfo(SHashObj* pTableMap) {
  STableBlockScanInfo* p = NULL;

  while ((p = taosHashIterate(pTableMap, p)) != NULL) {
    p->iterInit = false;
    p->iter.hasVal = false;
    p->iiter.hasVal = false;

    if (p->iter.iter != NULL) {
      p->iter.iter = tsdbTbDataIterDestroy(p->iter.iter);
    }

    // the imem iterator has to be released as well (as destroyBlockScanInfo does); otherwise it is
    // overwritten, and therefore leaked, when the iterators are initialised again
    if (p->iiter.iter != NULL) {
      p->iiter.iter = tsdbTbDataIterDestroy(p->iiter.iter);
    }

    p->delSkyline = taosArrayDestroy(p->delSkyline);
  }
}

265 266 267 268 269 270 271 272
// Releases every resource held by the entries of pTableMap (both skip-list iterators, delete
// skyline, block list, block map data) and then destroys the map itself.
static void destroyBlockScanInfo(SHashObj* pTableMap) {
  STableBlockScanInfo* p = NULL;

  while ((p = taosHashIterate(pTableMap, p)) != NULL) {
    p->iterInit = false;
    p->iiter.hasVal = false;

    if (p->iter.iter != NULL) {
      p->iter.iter = tsdbTbDataIterDestroy(p->iter.iter);
    }

    if (p->iiter.iter != NULL) {
      p->iiter.iter = tsdbTbDataIterDestroy(p->iiter.iter);
    }

    p->delSkyline = taosArrayDestroy(p->delSkyline);
    p->pBlockList = taosArrayDestroy(p->pBlockList);
    tMapDataClear(&p->mapData);
  }

  taosHashCleanup(pTableMap);
}

288
// Returns true when the window cannot contain any timestamp, i.e. its start key is greater
// than its end key. pWindow must not be NULL.
static bool isEmptyQueryTimeWindow(STimeWindow* pWindow) {
  ASSERT(pWindow != NULL);
  return pWindow->skey > pWindow->ekey;
}
H
Hongze Cheng 已提交
292

293 294 295
// Clamps the start of the query time window according to the retention (keep) configuration, so
// that data that has already expired is never returned to the client even if it is still stored.
// Returns the adjusted copy; *pWindow is not modified.
static STimeWindow updateQueryTimeWindow(STsdb* pTsdb, STimeWindow* pWindow) {
  STsdbKeepCfg* pCfg = &pTsdb->keepCfg;

  int64_t now = taosGetTimestamp(pCfg->precision);
  int64_t earliestTs = now - (tsTickPerMin[pCfg->precision] * pCfg->keep2) + 1;  // needs to add one tick

  STimeWindow win = *pWindow;
  if (win.skey < earliestTs) {
    win.skey = earliestTs;
  }

  return win;
}
H
Hongze Cheng 已提交
308

H
Haojun Liao 已提交
309
// Shrinks *capacity (number of rows) so that one output block, estimated as
// capacity * sum(column bytes), does not exceed 2MB. *capacity is left unchanged when the
// estimate already fits.
static void limitOutputBufferSize(const SQueryTableDataCond* pCond, int32_t* capacity) {
  // accumulate in 64 bits: the product capacity * rowLen below can exceed the int32_t range
  int64_t rowLen = 0;
  for (int32_t i = 0; i < pCond->numOfCols; ++i) {
    rowLen += pCond->colList[i].bytes;
  }

  // make sure the output SSDataBlock size be less than 2MB.
  const int32_t TWOMB = 2 * 1024 * 1024;
  if ((*capacity) * rowLen > TWOMB) {
    (*capacity) = (int32_t)(TWOMB / rowLen);
  }
}

// init file iterator
323
static int32_t initFilesetIterator(SFilesetIter* pIter, SArray* aDFileSet, STsdbReader* pReader/*int32_t order, const char* idstr*/) {
H
Hongze Cheng 已提交
324
  size_t numOfFileset = taosArrayGetSize(aDFileSet);
325

326 327
  pIter->index = ASCENDING_TRAVERSE(pReader->order) ? -1 : numOfFileset;
  pIter->order = pReader->order;
H
Hongze Cheng 已提交
328
  pIter->pFileList = aDFileSet;
329
  pIter->numOfFiles = numOfFileset;
H
Haojun Liao 已提交
330

331 332 333 334
  if (pIter->pLastBlockReader == NULL) {
    pIter->pLastBlockReader = taosMemoryCalloc(1, sizeof(struct SLastBlockReader));
    if (pIter->pLastBlockReader == NULL) {
      int32_t code = TSDB_CODE_OUT_OF_MEMORY;
335
      tsdbError("failed to prepare the last block iterator, code:%d %s", tstrerror(code), pReader->idStr);
336 337 338
      return code;
    }

339 340 341 342 343
    SLastBlockReader* pLReader = pIter->pLastBlockReader;
    pLReader->pBlockL = taosArrayInit(4, sizeof(SBlockL));
    pLReader->order   = pReader->order;
    pLReader->window  = pReader->window;
    pLReader->verRange = pReader->verRange;
344
    pLReader->currentBlockIndex = -1;
H
Haojun Liao 已提交
345 346 347 348 349

    int32_t code = tBlockDataCreate(&pLReader->lastBlockData);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }
350 351
  }

352
  tsdbDebug("init fileset iterator, total files:%d %s", pIter->numOfFiles, pReader->idStr);
H
Haojun Liao 已提交
353 354 355
  return TSDB_CODE_SUCCESS;
}

356
// Advances the iterator to the next file set whose key range overlaps the query window and opens
// it as pReader->pFileReader (a previously opened file reader is closed first).
// Returns true when such a file set has been opened. Returns false when the list is exhausted,
// when the remaining file sets lie beyond the query window, or when opening a file set failed;
// in the last case the error code is stored in terrno.
static bool filesetIteratorNext(SFilesetIter* pIter, STsdbReader* pReader) {
  bool    asc = ASCENDING_TRAVERSE(pIter->order);
  int32_t step = asc ? 1 : -1;
  pIter->index += step;

  if ((asc && pIter->index >= pIter->numOfFiles) || ((!asc) && pIter->index < 0)) {
    return false;
  }

  // check file the time range of coverage
  STimeWindow win = {0};

  while (1) {
    if (pReader->pFileReader != NULL) {
      tsdbDataFReaderClose(&pReader->pFileReader);
    }

    pReader->status.pCurrentFileset = (SDFileSet*)taosArrayGet(pIter->pFileList, pIter->index);

    int32_t code = tsdbDataFReaderOpen(&pReader->pFileReader, pReader->pTsdb, pReader->status.pCurrentFileset);
    if (code != TSDB_CODE_SUCCESS) {
      // the bool result cannot carry the reason, keep it available for the caller
      terrno = code;
      goto _err;
    }

    pReader->cost.headFileLoad += 1;

    int32_t fid = pReader->status.pCurrentFileset->fid;
    tsdbFidKeyRange(fid, pReader->pTsdb->keepCfg.days, pReader->pTsdb->keepCfg.precision, &win.skey, &win.ekey);

    // current file are no longer overlapped with query time window, ignore remain files
    if ((asc && win.skey > pReader->window.ekey) || (!asc && win.ekey < pReader->window.skey)) {
      tsdbDebug("%p remain files are not qualified for qrange:%" PRId64 "-%" PRId64 ", ignore, %s", pReader,
                pReader->window.skey, pReader->window.ekey, pReader->idStr);
      return false;
    }

    // the file set lies entirely before the window (in scan direction): skip it and try the next one
    if ((asc && (win.ekey < pReader->window.skey)) || ((!asc) && (win.skey > pReader->window.ekey))) {
      pIter->index += step;
      if ((asc && pIter->index >= pIter->numOfFiles) || ((!asc) && pIter->index < 0)) {
        return false;
      }
      continue;
    }

    tsdbDebug("%p file found fid:%d for qrange:%" PRId64 "-%" PRId64 ", %s", pReader, fid, pReader->window.skey,
              pReader->window.ekey, pReader->idStr);
    return true;
  }

_err:
  return false;
}

409
// Puts the data block iterator back to its initial state (position -1, no blocks) for the given
// scan order and table map. The block list is created on first use and only emptied afterwards.
static void resetDataBlockIterator(SDataBlockIter* pIter, int32_t order, SHashObj* pTableMap) {
  pIter->order = order;
  pIter->index = -1;
  pIter->numOfBlocks = 0;
  if (pIter->blockList == NULL) {
    pIter->blockList = taosArrayInit(4, sizeof(SFileDataBlockInfo));
  } else {
    taosArrayClear(pIter->blockList);
  }
  pIter->pTableMap = pTableMap;
}

L
Liu Jicong 已提交
421
// Releases the block list of the iterator. The pointer is reset as well (same idiom as
// `p->delSkyline = taosArrayDestroy(p->delSkyline)` elsewhere in this file), because
// resetDataBlockIterator re-uses a non-NULL blockList and must never see a freed one.
static void cleanupDataBlockIterator(SDataBlockIter* pIter) {
  pIter->blockList = taosArrayDestroy(pIter->blockList);
}
H
Haojun Liao 已提交
422

H
Haojun Liao 已提交
423
// Sets the initial traversal state: no current table, and data is read from files first.
static void initReaderStatus(SReaderStatus* pStatus) {
  pStatus->pTableIter = NULL;
  pStatus->loadFromFile = true;
}

428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450
// Creates the result block: one column per entry of pCond->colList, with room for `capacity` rows.
// Returns the block, or NULL with terrno set when the block or its row buffers cannot be allocated.
static SSDataBlock* createResBlock(SQueryTableDataCond* pCond, int32_t capacity) {
  SSDataBlock* pBlock = createDataBlock();
  if (NULL == pBlock) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  int32_t idx = 0;
  while (idx < pCond->numOfCols) {
    SColumnInfoData colData = {{0}, 0};
    colData.info = pCond->colList[idx];
    blockDataAppendColInfo(pBlock, &colData);
    idx += 1;
  }

  int32_t rc = blockDataEnsureCapacity(pBlock, capacity);
  if (rc == TSDB_CODE_SUCCESS) {
    return pBlock;
  }

  // NOTE(review): only the block object itself is freed here; confirm that the column descriptors
  // appended above do not need a dedicated destroy call.
  terrno = rc;
  taosMemoryFree(pBlock);
  return NULL;
}

451 452
// Allocates and initialises a reader for the query condition pCond on pVnode.
// On success *ppReader receives the reader and TSDB_CODE_SUCCESS is returned. On failure the
// partially built reader is passed to tsdbReaderClose, *ppReader is set to NULL and the error
// code is returned.
// NOTE(review): the `capacity` argument is currently not used; the reader always starts from 4096
// rows, which limitOutputBufferSize may reduce further.
static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, STsdbReader** ppReader, int32_t capacity,
                                const char* idstr) {
  int32_t      code = 0;
  int8_t       level = 0;
  STsdbReader* pReader = (STsdbReader*)taosMemoryCalloc(1, sizeof(*pReader));
  if (pReader == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _end;
  }

  if (VND_IS_TSMA(pVnode)) {
    tsdbDebug("vgId:%d, tsma is selected to query", TD_VID(pVnode));
  }

  initReaderStatus(&pReader->status);

  pReader->pTsdb = getTsdbByRetentions(pVnode, pCond->twindows.skey, pVnode->config.tsdbCfg.retentions, idstr, &level);
  pReader->suid = pCond->suid;
  pReader->order = pCond->order;
  pReader->capacity = 4096;
  pReader->idStr = (idstr != NULL) ? strdup(idstr) : NULL;
  pReader->verRange = getQueryVerRange(pVnode, pCond, level);
  pReader->type = pCond->type;
  pReader->window = updateQueryTimeWindow(pReader->pTsdb, &pCond->twindows);

  ASSERT(pCond->numOfCols > 0);

  limitOutputBufferSize(pCond, &pReader->capacity);

  // allocate buffer in order to load data blocks from file
  SBlockLoadSuppInfo* pSup = &pReader->suppInfo;
  pSup->pColAgg = taosArrayInit(4, sizeof(SColumnDataAgg));
  pSup->plist = taosMemoryCalloc(pCond->numOfCols, POINTER_BYTES);
  if (pSup->pColAgg == NULL || pSup->plist == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _end;
  }

  pSup->tsColAgg.colId = PRIMARYKEY_TIMESTAMP_COL_ID;

  code = tBlockDataCreate(&pReader->status.fileBlockData);
  if (code != TSDB_CODE_SUCCESS) {
    terrno = code;
    goto _end;
  }

  pReader->pResBlock = createResBlock(pCond, pReader->capacity);
  if (pReader->pResBlock == NULL) {
    code = terrno;
    goto _end;
  }

  // a reader without its column id / build buffers is unusable, so the failure must not be ignored
  code = setColumnIdSlotList(pReader, pReader->pResBlock);
  if (code != TSDB_CODE_SUCCESS) {
    terrno = code;
    goto _end;
  }

  *ppReader = pReader;
  return code;

_end:
  tsdbReaderClose(pReader);
  *ppReader = NULL;
  return code;
}
H
Hongze Cheng 已提交
513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545

// void tsdbResetQueryHandleForNewTable(STsdbReader* queryHandle, SQueryTableDataCond* pCond, STableListInfo* tableList,
//                                      int32_t tWinIdx) {
//   STsdbReader* pTsdbReadHandle = queryHandle;

//   pTsdbReadHandle->order = pCond->order;
//   pTsdbReadHandle->window = pCond->twindows[tWinIdx];
//   pTsdbReadHandle->type = TSDB_QUERY_TYPE_ALL;
//   pTsdbReadHandle->cur.fid = -1;
//   pTsdbReadHandle->cur.win = TSWINDOW_INITIALIZER;
//   pTsdbReadHandle->checkFiles = true;
//   pTsdbReadHandle->activeIndex = 0;  // current active table index
//   pTsdbReadHandle->locateStart = false;
//   pTsdbReadHandle->loadExternalRow = pCond->loadExternalRows;

//   if (ASCENDING_TRAVERSE(pCond->order)) {
//     assert(pTsdbReadHandle->window.skey <= pTsdbReadHandle->window.ekey);
//   } else {
//     assert(pTsdbReadHandle->window.skey >= pTsdbReadHandle->window.ekey);
//   }

//   // allocate buffer in order to load data blocks from file
//   memset(pTsdbReadHandle->suppInfo.pstatis, 0, sizeof(SColumnDataAgg));
//   memset(pTsdbReadHandle->suppInfo.plist, 0, POINTER_BYTES);

//   tsdbInitDataBlockLoadInfo(&pTsdbReadHandle->dataBlockLoadInfo);
//   tsdbInitCompBlockLoadInfo(&pTsdbReadHandle->compBlockLoadInfo);

//   SArray* pTable = NULL;
//   //  STsdbMeta* pMeta = tsdbGetMeta(pTsdbReadHandle->pTsdb);

//   //  pTsdbReadHandle->pTableCheckInfo = destroyTableCheckInfo(pTsdbReadHandle->pTableCheckInfo);

H
Haojun Liao 已提交
546
//   pTsdbReadHandle->pTableCheckInfo = NULL;  // createDataBlockScanInfo(pTsdbReadHandle, groupList, pMeta,
H
Hongze Cheng 已提交
547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569
//                                             // &pTable);
//   if (pTsdbReadHandle->pTableCheckInfo == NULL) {
//     //    tsdbReaderClose(pTsdbReadHandle);
//     terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
//   }

//   //  pTsdbReadHandle->prev = doFreeColumnInfoData(pTsdbReadHandle->prev);
//   //  pTsdbReadHandle->next = doFreeColumnInfoData(pTsdbReadHandle->next);
// }

// SArray* tsdbGetQueriedTableList(STsdbReader** pHandle) {
//   assert(pHandle != NULL);

//   STsdbReader* pTsdbReadHandle = (STsdbReader*)pHandle;

//   size_t  size = taosArrayGetSize(pTsdbReadHandle->pTableCheckInfo);
//   SArray* res = taosArrayInit(size, POINTER_BYTES);
//   return res;
// }

// static int32_t binarySearchForBlock(SBlock* pBlock, int32_t numOfBlocks, TSKEY skey, int32_t order) {
//   int32_t firstSlot = 0;
//   int32_t lastSlot = numOfBlocks - 1;
H
Hongze Cheng 已提交
570

H
Hongze Cheng 已提交
571
//   int32_t midSlot = firstSlot;
H
Hongze Cheng 已提交
572

H
Hongze Cheng 已提交
573 574 575
//   while (1) {
//     numOfBlocks = lastSlot - firstSlot + 1;
//     midSlot = (firstSlot + (numOfBlocks >> 1));
H
Hongze Cheng 已提交
576

H
Hongze Cheng 已提交
577
//     if (numOfBlocks == 1) break;
H
Hongze Cheng 已提交
578

H
Hongze Cheng 已提交
579 580 581 582 583 584 585 586 587 588 589
//     if (skey > pBlock[midSlot].maxKey.ts) {
//       if (numOfBlocks == 2) break;
//       if ((order == TSDB_ORDER_DESC) && (skey < pBlock[midSlot + 1].minKey.ts)) break;
//       firstSlot = midSlot + 1;
//     } else if (skey < pBlock[midSlot].minKey.ts) {
//       if ((order == TSDB_ORDER_ASC) && (skey > pBlock[midSlot - 1].maxKey.ts)) break;
//       lastSlot = midSlot - 1;
//     } else {
//       break;  // got the slot
//     }
//   }
H
Hongze Cheng 已提交
590

H
Hongze Cheng 已提交
591 592
//   return midSlot;
// }
H
Hongze Cheng 已提交
593

H
Haojun Liao 已提交
594
// Reads the block index of the file set opened by pFileReader and appends to pIndexList every
// SBlockIdx entry that belongs to the queried super table (suid) and to a table present in the
// reader's table map. The block list of each matching table is created on demand.
// Returns TSDB_CODE_SUCCESS (also when the file set has no block index entries),
// TSDB_CODE_OUT_OF_MEMORY, or the error reported by tsdbReadBlockIdx.
static int32_t doLoadBlockIndex(STsdbReader* pReader, SDataFReader* pFileReader, SArray* pIndexList) {
  SArray* aBlockIdx = taosArrayInit(8, sizeof(SBlockIdx));
  if (aBlockIdx == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  int64_t st = taosGetTimestampUs();
  int32_t code = tsdbReadBlockIdx(pFileReader, aBlockIdx);
  if (code != TSDB_CODE_SUCCESS) {
    goto _end;
  }

  size_t num = taosArrayGetSize(aBlockIdx);
  if (num == 0) {
    goto _end;  // nothing to do, code is still TSDB_CODE_SUCCESS
  }

  int64_t et1 = taosGetTimestampUs();

  SBlockIdx* pBlockIdx = NULL;
  for (size_t i = 0; i < num; ++i) {
    pBlockIdx = (SBlockIdx*)taosArrayGet(aBlockIdx, i);

    // uid check
    if (pBlockIdx->suid != pReader->suid) {
      continue;
    }

    // this block belongs to a table that is not queried.
    void* p = taosHashGet(pReader->status.pTableMap, &pBlockIdx->uid, sizeof(uint64_t));
    if (p == NULL) {
      continue;
    }

    STableBlockScanInfo* pScanInfo = p;
    if (pScanInfo->pBlockList == NULL) {
      pScanInfo->pBlockList = taosArrayInit(4, sizeof(int32_t));
      if (pScanInfo->pBlockList == NULL) {
        code = TSDB_CODE_OUT_OF_MEMORY;
        goto _end;
      }
    }

    if (taosArrayPush(pIndexList, pBlockIdx) == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto _end;
    }
  }

  int64_t et2 = taosGetTimestampUs();
  tsdbDebug("load block index for %d tables completed, elapsed time:%.2f ms, set blockIdx:%.2f ms, size:%.2f Kb %s",
            (int32_t)num, (et1 - st) / 1000.0, (et2 - et1) / 1000.0, num * sizeof(SBlockIdx) / 1024.0, pReader->idStr);

  pReader->cost.headFileLoadTime += (et1 - st) / 1000.0;

_end:
  taosArrayDestroy(aBlockIdx);
  return code;
}
H
Hongze Cheng 已提交
644

645
// Clears the per-file state of every table in pTableMap before a new file set is processed:
// the last-block row position, the block map data and the block index list.
static void cleanupTableScanInfo(SHashObj* pTableMap) {
  STableBlockScanInfo* px = NULL;
  while (1) {
    px = taosHashIterate(pTableMap, px);
    if (px == NULL) {
      break;
    }

    // reset the index in last block when handing a new file
    px->indexInBlockL = DEFAULT_ROW_INDEX_VAL;
    tMapDataClear(&px->mapData);
    taosArrayClear(px->pBlockList);
  }
}

// Loads the block map of every table listed in pIndexList and collects the blocks that overlap
// the query time window and version range:
//   - the positions of qualified data blocks are appended to the owning table's pBlockList,
//   - qualified last blocks of the queried super table are appended to pQualifiedLastBlock,
//   - both counters of pBlockNum are increased accordingly.
// Returns TSDB_CODE_SUCCESS, TSDB_CODE_OUT_OF_MEMORY, or the error reported by tsdbReadBlock.
static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, SArray* pLastBlockIndex,
                               SBlockNumber* pBlockNum, SArray* pQualifiedLastBlock) {
  int32_t numOfQTable = 0;
  size_t  sizeInDisk = 0;
  size_t  numOfTables = taosArrayGetSize(pIndexList);

  int64_t st = taosGetTimestampUs();
  cleanupTableScanInfo(pReader->status.pTableMap);

  for (size_t i = 0; i < numOfTables; ++i) {
    SBlockIdx* pBlockIdx = taosArrayGet(pIndexList, i);

    STableBlockScanInfo* pScanInfo = taosHashGet(pReader->status.pTableMap, &pBlockIdx->uid, sizeof(uint64_t));

    tMapDataReset(&pScanInfo->mapData);
    int32_t code = tsdbReadBlock(pReader->pFileReader, pBlockIdx, &pScanInfo->mapData);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    sizeInDisk += pScanInfo->mapData.nData;
    // j stays an int32_t: its value is what gets stored in pBlockList
    for (int32_t j = 0; j < pScanInfo->mapData.nItem; ++j) {
      SBlock block = {0};
      tMapDataGetItemByIdx(&pScanInfo->mapData, j, &block, tGetBlock);

      // 1. time range check
      if (block.minKey.ts > pReader->window.ekey || block.maxKey.ts < pReader->window.skey) {
        continue;
      }

      // 2. version range check
      if (block.minVer > pReader->verRange.maxVer || block.maxVer < pReader->verRange.minVer) {
        continue;
      }

      void* p = taosArrayPush(pScanInfo->pBlockList, &j);
      if (p == NULL) {
        tMapDataClear(&pScanInfo->mapData);
        return TSDB_CODE_OUT_OF_MEMORY;
      }

      pBlockNum->numOfBlocks += 1;
    }

    if (pScanInfo->pBlockList != NULL && taosArrayGetSize(pScanInfo->pBlockList) > 0) {
      numOfQTable += 1;
    }
  }

  size_t numOfLast = taosArrayGetSize(pLastBlockIndex);
  for (size_t i = 0; i < numOfLast; ++i) {
    SBlockL* pLastBlock = taosArrayGet(pLastBlockIndex, i);
    if (pLastBlock->suid != pReader->suid) {
      continue;
    }

    // 1. time range check
    if (pLastBlock->minKey > pReader->window.ekey || pLastBlock->maxKey < pReader->window.skey) {
      continue;
    }

    // 2. version range check
    if (pLastBlock->minVer > pReader->verRange.maxVer || pLastBlock->maxVer < pReader->verRange.minVer) {
      continue;
    }

    // count the block only once it has really been stored
    if (taosArrayPush(pQualifiedLastBlock, pLastBlock) == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }
    pBlockNum->numOfLastBlocks += 1;
  }

  int32_t total = pBlockNum->numOfLastBlocks + pBlockNum->numOfBlocks;

  double el = (taosGetTimestampUs() - st) / 1000.0;
  // numOfTables is a size_t and is converted explicitly to match %d; the size is reported in Kb (1024 bytes)
  tsdbDebug("load block of %d tables completed, blocks:%d in %d tables, lastBlock:%d, size:%.2f Kb, elapsed time:%.2f ms %s",
            (int32_t)numOfTables, pBlockNum->numOfBlocks, numOfQTable, pBlockNum->numOfLastBlocks, sizeInDisk / 1024.0,
            el, pReader->idStr);

  pReader->cost.numOfBlocks += total;
  pReader->cost.headFileLoadTime += el;

  return TSDB_CODE_SUCCESS;
}
H
Hongze Cheng 已提交
741

742
// Marks the current file block as completely dumped and moves lastKey one step past maxKey in
// scan direction (maxKey + 1 for ascending order, maxKey - 1 for descending order).
static void setBlockAllDumped(SFileBlockDumpInfo* pDumpInfo, int64_t maxKey, int32_t order) {
  int32_t step = ASCENDING_TRAVERSE(order) ? 1 : -1;
  pDumpInfo->allDumped = true;
  pDumpInfo->lastKey = maxKey + step;
}

748 749
/*
 * Append one column value to row `rowIndex` of the output column.
 *
 * Fixed-length values are appended straight from pColVal->value, with the
 * null flag set when the value is NULL or NONE.  Variable-length values are
 * first staged (length header + payload) in pSup->buildBuf[colIndex] and then
 * appended from that staging buffer; NULL/NONE variable-length values are
 * appended as NULL.
 */
static void doCopyColVal(SColumnInfoData* pColInfoData, int32_t rowIndex, int32_t colIndex, SColVal* pColVal,
                         SBlockLoadSuppInfo* pSup) {
  bool absent = pColVal->isNull || pColVal->isNone;

  if (!(IS_VAR_DATA_TYPE(pColVal->type))) {
    colDataAppend(pColInfoData, rowIndex, (const char*)&pColVal->value, absent);
    return;
  }

  if (absent) {
    colDataAppendNULL(pColInfoData, rowIndex);
    return;
  }

  // stage the var-length value: length header first, then the payload bytes
  varDataSetLen(pSup->buildBuf[colIndex], pColVal->value.nData);
  ASSERT(pColVal->value.nData <= pColInfoData->info.bytes);
  memcpy(varDataVal(pSup->buildBuf[colIndex]), pColVal->value.pData, pColVal->value.nData);

  colDataAppend(pColInfoData, rowIndex, pSup->buildBuf[colIndex], false);
}

764
/*
 * Return the block-list entry at the iterator's current index, or NULL when
 * the block list is empty.  The index itself is not range-checked here.
 */
static SFileDataBlockInfo* getCurrentBlockInfo(SDataBlockIter* pBlockIter) {
  size_t numInList = taosArrayGetSize(pBlockIter->blockList);
  if (numInList != 0) {
    return taosArrayGet(pBlockIter->blockList, pBlockIter->index);
  }

  // an empty list must agree with the recorded block count
  ASSERT(pBlockIter->numOfBlocks == numInList);
  return NULL;
}

774
// Return the SBlock slot embedded in the iterator (the slot doSetCurrentBlock decodes into).
static SBlock* getCurrentBlock(SDataBlockIter* pBlockIter) {
  SBlock* pCurrent = &pBlockIter->block;
  return pCurrent;
}
775

776
/*
 * Copy rows of the loaded file block (pReader->status.fileBlockData) into the
 * result block, starting at the dump cursor (fBlockDumpInfo.rowIndex) and
 * walking in the reader's scan direction.
 *
 * At most pReader->capacity rows are copied.  Output columns that have no
 * matching column id in the file block are filled with NULLs.  On return the
 * result block's row count is set, the dump cursor has been advanced by the
 * number of rows copied, and the block is flagged as all-dumped.
 *
 * Always returns TSDB_CODE_SUCCESS.  pBlockScanInfo is not used.
 */
static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo) {
  SReaderStatus*  pStatus = &pReader->status;
  SDataBlockIter* pBlockIter = &pStatus->blockIter;

  SBlockData*         pBlockData = &pStatus->fileBlockData;
  SFileDataBlockInfo* pFBlock = getCurrentBlockInfo(pBlockIter);
  SBlock*             pBlock = getCurrentBlock(pBlockIter);
  SSDataBlock*        pResBlock = pReader->pResBlock;
  int32_t             numOfOutputCols = blockDataGetNumOfCols(pResBlock);

  SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo;
  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;

  SColVal cv = {0};
  int64_t st = taosGetTimestampUs();
  bool    asc = ASCENDING_TRAVERSE(pReader->order);
  int32_t step = asc ? 1 : -1;

  // number of rows between the cursor and the end of the block in scan direction,
  // limited to what the output buffer can hold
  int32_t rowIndex = 0;
  int32_t remain = asc ? (pBlockData->nRow - pDumpInfo->rowIndex) : (pDumpInfo->rowIndex + 1);
  if (remain > pReader->capacity) {
    remain = pReader->capacity;
  }

  // The copy loops are bounded by the number of rows to copy (`remain`) instead of an
  // end index.  An end index of `rowIndex - capacity` can never satisfy `j < endIndex`
  // when scanning in descending order, which made a capacity-limited descending copy
  // produce zero rows.
  int32_t          i = 0;
  SColumnInfoData* pColData = taosArrayGet(pResBlock->pDataBlock, i);
  if (pColData->info.colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
    for (int32_t j = pDumpInfo->rowIndex, n = 0; n < remain && j >= 0 && j < pBlockData->nRow; j += step, ++n) {
      colDataAppend(pColData, rowIndex++, (const char*)&pBlockData->aTSKEY[j], false);
    }
    i += 1;
  }

  // merge-walk the output columns and the file-block columns by column id
  int32_t colIndex = 0;
  int32_t num = taosArrayGetSize(pBlockData->aIdx);
  while (i < numOfOutputCols && colIndex < num) {
    rowIndex = 0;
    pColData = taosArrayGet(pResBlock->pDataBlock, i);

    SColData* pData = tBlockDataGetColDataByIdx(pBlockData, colIndex);
    if (pData->cid < pColData->info.colId) {
      // file column is not requested, skip it
      colIndex += 1;
    } else if (pData->cid == pColData->info.colId) {
      for (int32_t j = pDumpInfo->rowIndex, n = 0; n < remain && j >= 0 && j < pBlockData->nRow; j += step, ++n) {
        tColDataGetValue(pData, j, &cv);
        doCopyColVal(pColData, rowIndex++, i, &cv, pSupInfo);
      }
      colIndex += 1;
      i += 1;
      ASSERT(rowIndex == remain);
    } else {  // the specified column does not exist in file block, fill with null data
      colDataAppendNNULL(pColData, 0, remain);
      i += 1;
    }
  }

  // remaining output columns have no counterpart in the file block
  while (i < numOfOutputCols) {
    pColData = taosArrayGet(pResBlock->pDataBlock, i);
    colDataAppendNNULL(pColData, 0, remain);
    i += 1;
  }

  pResBlock->info.rows = remain;
  pDumpInfo->rowIndex += step * remain;

  // NOTE(review): the block is flagged as all-dumped even when `remain` was limited by
  // the output capacity and rows are left in the block -- confirm this is what the
  // callers expect.
  setBlockAllDumped(pDumpInfo, pBlock->maxKey.ts, pReader->order);

  double elapsedTime = (taosGetTimestampUs() - st) / 1000.0;
  pReader->cost.blockLoadTime += elapsedTime;

  int32_t unDumpedRows = asc ? pBlock->nRow - pDumpInfo->rowIndex : pDumpInfo->rowIndex + 1;
  tsdbDebug("%p copy file block to sdatablock, global index:%d, table index:%d, brange:%" PRId64 "-%" PRId64
            ", rows:%d, remain:%d, minVer:%" PRId64 ", maxVer:%" PRId64 ", elapsed time:%.2f ms, %s",
            pReader, pBlockIter->index, pFBlock->tbBlockIdx, pBlock->minKey.ts, pBlock->maxKey.ts, remain, unDumpedRows,
            pBlock->minVer, pBlock->maxVer, elapsedTime, pReader->idStr);

  return TSDB_CODE_SUCCESS;
}

860
/*
 * Read the data of the iterator's current file block into pBlockData.
 *
 * When the iterator has a current block, it is read with tsdbReadDataBlock();
 * a read failure is logged and its error code returned.  When there is no
 * current block nothing is read (the last-block loading path is compiled out
 * in this version).  On success the elapsed time is added to the reader's
 * block-load cost and the dump state is reset to "not all dumped".
 */
static int32_t doLoadFileBlockData(STsdbReader* pReader, SDataBlockIter* pBlockIter, SBlockData* pBlockData) {
  int64_t startUs = taosGetTimestampUs();
  double  costMs = 0;

  SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(pBlockIter);
  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;

  if (pBlockInfo != NULL) {
    SBlock* pBlock = getCurrentBlock(pBlockIter);

    int32_t code = tsdbReadDataBlock(pReader->pFileReader, pBlock, pBlockData);
    if (code != TSDB_CODE_SUCCESS) {
      tsdbError("%p error occurs in loading file block, global index:%d, table index:%d, brange:%" PRId64 "-%" PRId64
                ", rows:%d, code:%s %s",
                pReader, pBlockIter->index, pBlockInfo->tbBlockIdx, pBlock->minKey.ts, pBlock->maxKey.ts, pBlock->nRow,
                tstrerror(code), pReader->idStr);
      return code;
    }

    costMs = (taosGetTimestampUs() - startUs) / 1000.0;

    tsdbDebug("%p load file block into buffer, global index:%d, table index:%d, brange:%" PRId64 "-%" PRId64
                  ", rows:%d, minVer:%" PRId64 ", maxVer:%" PRId64 ", elapsed time:%.2f ms, %s",
              pReader, pBlockIter->index, pBlockInfo->tbBlockIdx, pBlock->minKey.ts, pBlock->maxKey.ts, pBlock->nRow,
              pBlock->minVer, pBlock->maxVer, costMs, pReader->idStr);
  }

  pReader->cost.blockLoadTime += costMs;
  pDumpInfo->allDumped = false;

  return TSDB_CODE_SUCCESS;
}
H
Hongze Cheng 已提交
928

H
Haojun Liao 已提交
929 930 931
/*
 * Release everything owned by the block-order supporter: the two per-table
 * counter arrays, the first pSup->numOfTables per-table wrapper arrays, and
 * finally the array holding those per-table pointers.
 */
static void cleanupBlockOrderSupporter(SBlockOrderSupporter* pSup) {
  taosMemoryFreeClear(pSup->numOfBlocksPerTable);
  taosMemoryFreeClear(pSup->indexPerTable);

  int32_t slot = 0;
  while (slot < pSup->numOfTables) {
    SBlockOrderWrapper* pPerTable = pSup->pDataBlockInfo[slot];
    taosMemoryFreeClear(pPerTable);
    slot += 1;
  }

  taosMemoryFreeClear(pSup->pDataBlockInfo);
}
H
Hongze Cheng 已提交
940

H
Haojun Liao 已提交
941 942
/*
 * Allocate the zero-initialised per-table arrays of the block-order supporter
 * for `numOfTables` tables (must be >= 1).
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY after releasing any
 * partial allocation when one of the three allocations fails.
 */
static int32_t initBlockOrderSupporter(SBlockOrderSupporter* pSup, int32_t numOfTables) {
  ASSERT(numOfTables >= 1);

  pSup->numOfBlocksPerTable = taosMemoryCalloc(1, sizeof(int32_t) * numOfTables);
  pSup->indexPerTable = taosMemoryCalloc(1, sizeof(int32_t) * numOfTables);
  pSup->pDataBlockInfo = taosMemoryCalloc(1, POINTER_BYTES * numOfTables);

  bool allAllocated =
      (pSup->numOfBlocksPerTable != NULL) && (pSup->indexPerTable != NULL) && (pSup->pDataBlockInfo != NULL);
  if (allAllocated) {
    return TSDB_CODE_SUCCESS;
  }

  cleanupBlockOrderSupporter(pSup);
  return TSDB_CODE_OUT_OF_MEMORY;
}
H
Hongze Cheng 已提交
955

H
Haojun Liao 已提交
956
static int32_t fileDataBlockOrderCompar(const void* pLeft, const void* pRight, void* param) {
957
  int32_t leftIndex = *(int32_t*)pLeft;
H
Haojun Liao 已提交
958
  int32_t rightIndex = *(int32_t*)pRight;
H
Hongze Cheng 已提交
959

H
Haojun Liao 已提交
960
  SBlockOrderSupporter* pSupporter = (SBlockOrderSupporter*)param;
H
Hongze Cheng 已提交
961

H
Haojun Liao 已提交
962 963
  int32_t leftTableBlockIndex = pSupporter->indexPerTable[leftIndex];
  int32_t rightTableBlockIndex = pSupporter->indexPerTable[rightIndex];
H
Hongze Cheng 已提交
964

H
Haojun Liao 已提交
965 966 967 968 969 970 971
  if (leftTableBlockIndex > pSupporter->numOfBlocksPerTable[leftIndex]) {
    /* left block is empty */
    return 1;
  } else if (rightTableBlockIndex > pSupporter->numOfBlocksPerTable[rightIndex]) {
    /* right block is empty */
    return -1;
  }
H
Hongze Cheng 已提交
972

973
  SBlockOrderWrapper* pLeftBlock = &pSupporter->pDataBlockInfo[leftIndex][leftTableBlockIndex];
H
Haojun Liao 已提交
974
  SBlockOrderWrapper* pRightBlock = &pSupporter->pDataBlockInfo[rightIndex][rightTableBlockIndex];
H
Hongze Cheng 已提交
975

976 977 978 979
  return pLeftBlock->offset > pRightBlock->offset ? 1 : -1;
}

/*
 * Decode the SBlock that the iterator currently points at into
 * pBlockIter->block.  The owning table is looked up by uid in the iterator's
 * table map, and the block's position in that table's map data is taken from
 * the table's block list.  Does nothing when the iterator has no current
 * block.  Always returns TSDB_CODE_SUCCESS.
 */
static int32_t doSetCurrentBlock(SDataBlockIter* pBlockIter) {
  SFileDataBlockInfo* pCur = getCurrentBlockInfo(pBlockIter);
  if (pCur == NULL) {
    return TSDB_CODE_SUCCESS;
  }

  STableBlockScanInfo* pTable = taosHashGet(pBlockIter->pTableMap, &pCur->uid, sizeof(pCur->uid));
  int32_t*             pMapIdx = taosArrayGet(pTable->pBlockList, pCur->tbBlockIdx);
  tMapDataGetItemByIdx(&pTable->mapData, *pMapIdx, &pBlockIter->block, tGetBlock);

  return TSDB_CODE_SUCCESS;
}
H
Hongze Cheng 已提交
993

994
/*
 * (Re)build the iterator's block list for the current file.
 *
 * Every table in the reader's table map that has a non-empty block list
 * contributes its blocks.  With exactly one such table the blocks are pushed
 * in table order without sorting; otherwise the per-table lists are merged by
 * file offset with a multiway merge tree.  The iterator is then positioned on
 * the first block (ascending scan) or the last block (descending scan) and
 * that block is decoded into the iterator.
 *
 * numOfBlocks must equal the total number of blocks in all per-table lists.
 * Returns TSDB_CODE_SUCCESS, or an out-of-memory code when an allocation or
 * the merge-tree creation fails.
 */
static int32_t initBlockIterator(STsdbReader* pReader, SDataBlockIter* pBlockIter, int32_t numOfBlocks) {
  bool asc = ASCENDING_TRAVERSE(pReader->order);

  pBlockIter->numOfBlocks = numOfBlocks;
  taosArrayClear(pBlockIter->blockList);

  // access data blocks according to the offset of each block in asc/desc order.
  int32_t numOfTables = (int32_t)taosHashGetSize(pReader->status.pTableMap);

  int64_t st = taosGetTimestampUs();

  SBlockOrderSupporter sup = {0};
  int32_t              code = initBlockOrderSupporter(&sup, numOfTables);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  // collect (uid, offset) of every block, grouped per table
  int32_t cnt = 0;
  void*   ptr = NULL;
  while (1) {
    ptr = taosHashIterate(pReader->status.pTableMap, ptr);
    if (ptr == NULL) {
      break;
    }

    STableBlockScanInfo* pTableScanInfo = (STableBlockScanInfo*)ptr;
    if (pTableScanInfo->pBlockList == NULL || taosArrayGetSize(pTableScanInfo->pBlockList) == 0) {
      continue;
    }

    size_t num = taosArrayGetSize(pTableScanInfo->pBlockList);
    sup.numOfBlocksPerTable[sup.numOfTables] = num;

    char* buf = taosMemoryMalloc(sizeof(SBlockOrderWrapper) * num);
    if (buf == NULL) {
      // leaving the loop before the iteration is exhausted: the iterator must be cancelled
      taosHashCancelIterate(pReader->status.pTableMap, ptr);
      cleanupBlockOrderSupporter(&sup);
      return TSDB_CODE_TDB_OUT_OF_MEMORY;
    }

    sup.pDataBlockInfo[sup.numOfTables] = (SBlockOrderWrapper*)buf;
    SBlock block = {0};
    for (size_t k = 0; k < num; ++k) {
      SBlockOrderWrapper wrapper = {0};

      int32_t* mapDataIndex = taosArrayGet(pTableScanInfo->pBlockList, k);
      tMapDataGetItemByIdx(&pTableScanInfo->mapData, *mapDataIndex, &block, tGetBlock);

      wrapper.uid = pTableScanInfo->uid;
      wrapper.offset = block.aSubBlock[0].offset;

      sup.pDataBlockInfo[sup.numOfTables][k] = wrapper;
      cnt++;
    }

    sup.numOfTables += 1;
  }

  ASSERT(numOfBlocks == cnt);

  // since there is only one table qualified, blocks are not sorted
  if (sup.numOfTables == 1) {
    for (int32_t i = 0; i < numOfBlocks; ++i) {
      SFileDataBlockInfo blockInfo = {.uid = sup.pDataBlockInfo[0][i].uid, .tbBlockIdx = i};
      taosArrayPush(pBlockIter->blockList, &blockInfo);
    }

    int64_t et = taosGetTimestampUs();
    tsdbDebug("%p create blocks info struct completed for one table, %d blocks not sorted, elapsed time:%.2f ms %s",
              pReader, numOfBlocks, (et - st) / 1000.0, pReader->idStr);

    pBlockIter->index = asc ? 0 : (numOfBlocks - 1);
    cleanupBlockOrderSupporter(&sup);
    doSetCurrentBlock(pBlockIter);
    return TSDB_CODE_SUCCESS;
  }

  tsdbDebug("%p create data blocks info struct completed, %d blocks in %d tables %s", pReader, cnt, sup.numOfTables,
            pReader->idStr);

  ASSERT(cnt <= numOfBlocks && sup.numOfTables <= numOfTables);

  // keep the full-width return code; narrowing it could turn a failure into 0
  SMultiwayMergeTreeInfo* pTree = NULL;
  int32_t                 ret = tMergeTreeCreate(&pTree, sup.numOfTables, &sup, fileDataBlockOrderCompar);
  if (ret != TSDB_CODE_SUCCESS) {
    cleanupBlockOrderSupporter(&sup);
    return TSDB_CODE_TDB_OUT_OF_MEMORY;
  }

  // repeatedly take the table whose next block has the smallest offset
  int32_t numOfTotal = 0;
  while (numOfTotal < cnt) {
    int32_t pos = tMergeTreeGetChosenIndex(pTree);
    int32_t index = sup.indexPerTable[pos]++;

    SFileDataBlockInfo blockInfo = {.uid = sup.pDataBlockInfo[pos][index].uid, .tbBlockIdx = index};
    taosArrayPush(pBlockIter->blockList, &blockInfo);

    // set data block index overflow, in order to disable the offset comparator
    if (sup.indexPerTable[pos] >= sup.numOfBlocksPerTable[pos]) {
      sup.indexPerTable[pos] = sup.numOfBlocksPerTable[pos] + 1;
    }

    numOfTotal += 1;
    tMergeTreeAdjust(pTree, tMergeTreeGetAdjustIndex(pTree));
  }

  int64_t et = taosGetTimestampUs();
  tsdbDebug("%p %d data blocks access order completed, elapsed time:%.2f ms %s", pReader, numOfBlocks, (et - st) / 1000.0,
            pReader->idStr);
  cleanupBlockOrderSupporter(&sup);
  taosMemoryFree(pTree);

  pBlockIter->index = asc ? 0 : (numOfBlocks - 1);
  doSetCurrentBlock(pBlockIter);

  return TSDB_CODE_SUCCESS;
}
H
Hongze Cheng 已提交
1110

H
Haojun Liao 已提交
1111
/*
 * Advance the iterator by one block in its scan direction and decode the new
 * current block.  Returns false, leaving the iterator untouched, when it is
 * already on the last block (ascending) or the first block (descending).
 */
static bool blockIteratorNext(SDataBlockIter* pBlockIter) {
  if (ASCENDING_TRAVERSE(pBlockIter->order)) {
    if (pBlockIter->index >= pBlockIter->numOfBlocks - 1) {
      return false;
    }
    pBlockIter->index += 1;
  } else {
    if (pBlockIter->index <= 0) {
      return false;
    }
    pBlockIter->index -= 1;
  }

  doSetCurrentBlock(pBlockIter);
  return true;
}

1125 1126 1127
/**
 * This is an two rectangles overlap cases.
 */
1128
static int32_t dataBlockPartiallyRequired(STimeWindow* pWindow, SVersionRange* pVerRange, SBlock* pBlock) {
1129 1130
  return (pWindow->ekey < pBlock->maxKey.ts && pWindow->ekey >= pBlock->minKey.ts) ||
         (pWindow->skey > pBlock->minKey.ts && pWindow->skey <= pBlock->maxKey.ts) ||
H
Hongze Cheng 已提交
1131 1132
         (pVerRange->minVer > pBlock->minVer && pVerRange->minVer <= pBlock->maxVer) ||
         (pVerRange->maxVer < pBlock->maxVer && pVerRange->maxVer >= pBlock->minVer);
H
Haojun Liao 已提交
1133
}
H
Hongze Cheng 已提交
1134

1135 1136
/*
 * Decode the block that follows pFBlockInfo in the scan direction within the
 * same table's block list.
 *
 * On success the neighbor's position in the table's block list is stored in
 * *nextIndex and a newly allocated SBlock is returned; the caller owns it and
 * must release it with taosMemoryFree().  NULL is returned, and *nextIndex is
 * left untouched, when there is no neighbor in that direction or when the
 * neighbor cannot be allocated or located.
 */
static SBlock* getNeighborBlockOfSameTable(SFileDataBlockInfo* pFBlockInfo, STableBlockScanInfo* pTableBlockScanInfo,
                                           int32_t* nextIndex, int32_t order) {
  bool    asc = ASCENDING_TRAVERSE(order);
  int32_t step = asc ? 1 : -1;

  // Signed arithmetic on purpose: with an empty block list an unsigned `size - 1`
  // wraps around and would let the bounds test pass.
  int32_t numOfBlocks = (int32_t)taosArrayGetSize(pTableBlockScanInfo->pBlockList);
  int32_t candidate = pFBlockInfo->tbBlockIdx + step;
  if (candidate < 0 || candidate >= numOfBlocks) {
    return NULL;
  }

  int32_t* indexInMapdata = taosArrayGet(pTableBlockScanInfo->pBlockList, candidate);
  if (indexInMapdata == NULL) {
    return NULL;
  }

  SBlock* pBlock = taosMemoryCalloc(1, sizeof(SBlock));
  if (pBlock == NULL) {
    // out of memory: there is no error channel here, so report "no neighbor"
    return NULL;
  }

  *nextIndex = candidate;
  tMapDataGetItemByIdx(&pTableBlockScanInfo->mapData, *indexInMapdata, pBlock, tGetBlock);
  return pBlock;
}

/*
 * Search the iterator's block list, starting at the current index and moving
 * in the scan direction, for the entry with the same uid and table block
 * index as pFBlockInfo.  Returns its position in the list.  The entry is
 * expected to exist: not finding it trips an assertion and yields -1.
 */
static int32_t findFileBlockInfoIndex(SDataBlockIter* pBlockIter, SFileDataBlockInfo* pFBlockInfo) {
  ASSERT(pBlockIter != NULL && pFBlockInfo != NULL);

  int32_t stride = ASCENDING_TRAVERSE(pBlockIter->order) ? 1 : -1;

  for (int32_t pos = pBlockIter->index; pos >= 0 && pos < pBlockIter->numOfBlocks; pos += stride) {
    SFileDataBlockInfo* pCandidate = taosArrayGet(pBlockIter->blockList, pos);

    bool sameTable = (pCandidate->uid == pFBlockInfo->uid);
    if (sameTable && pCandidate->tbBlockIdx == pFBlockInfo->tbBlockIdx) {
      return pos;
    }
  }

  ASSERT(0);
  return -1;
}

1175
/*
 * Make the block-list entry at `index` the iterator's next current block.
 *
 * The iterator index is advanced by `step`; if the entry is not already at
 * that position it is removed from `index` and re-inserted there.  The new
 * current block is then decoded.  Returns -1 when `index` is outside
 * [0, numOfBlocks), TSDB_CODE_SUCCESS otherwise.
 */
static int32_t setFileBlockActiveInBlockIter(SDataBlockIter* pBlockIter, int32_t index, int32_t step) {
  bool inRange = (index >= 0) && (index < pBlockIter->numOfBlocks);
  if (!inRange) {
    return -1;
  }

  // take a copy first: the entry is about to be removed from the list
  SFileDataBlockInfo picked = *(SFileDataBlockInfo*)taosArrayGet(pBlockIter->blockList, index);

  pBlockIter->index += step;
  int32_t target = pBlockIter->index;

  if (target != index) {
    taosArrayRemove(pBlockIter->blockList, index);
    taosArrayInsert(pBlockIter->blockList, target, &picked);

    SFileDataBlockInfo* pMoved = taosArrayGet(pBlockIter->blockList, target);
    ASSERT(pMoved->uid == picked.uid && pMoved->tbBlockIdx == picked.tbBlockIdx);
  }

  doSetCurrentBlock(pBlockIter);
  return TSDB_CODE_SUCCESS;
}

/*
 * Report whether a block touches its neighbor in the scan direction: for an
 * ascending scan the block's max timestamp equals the neighbor's min
 * timestamp; for a descending scan the block's min timestamp equals the
 * neighbor's max timestamp.
 */
static bool overlapWithNeighborBlock(SBlock* pBlock, SBlock* pNeighbor, int32_t order) {
  bool asc = ASCENDING_TRAVERSE(order);
  return asc ? (pBlock->maxKey.ts == pNeighbor->minKey.ts) : (pBlock->minKey.ts == pNeighbor->maxKey.ts);
}
H
Hongze Cheng 已提交
1203

1204
/*
 * Report whether the buffered key sits at or before the start of the file
 * block in scan direction: key.ts <= minKey.ts for an ascending scan,
 * key.ts >= maxKey.ts for a descending scan.  A key still holding
 * TSKEY_INITIAL_VAL never qualifies.
 */
static bool bufferDataInFileBlockGap(int32_t order, TSDBKEY key, SBlock* pBlock) {
  if (key.ts == TSKEY_INITIAL_VAL) {
    return false;
  }

  if (ASCENDING_TRAVERSE(order)) {
    return key.ts <= pBlock->minKey.ts;
  }

  return key.ts >= pBlock->maxKey.ts;
}
H
Hongze Cheng 已提交
1210

H
Haojun Liao 已提交
1211
/*
 * Report whether key.ts lies inside the block's [minKey.ts, maxKey.ts] range
 * while the block's version range intersects the requested version range.
 */
static bool keyOverlapFileBlock(TSDBKEY key, SBlock* pBlock, SVersionRange* pVerRange) {
  if (key.ts < pBlock->minKey.ts || key.ts > pBlock->maxKey.ts) {
    return false;
  }

  if (pBlock->maxVer < pVerRange->minVer) {
    return false;
  }

  return pBlock->minVer <= pVerRange->maxVer;
}

1216 1217 1218 1219 1220 1221
/*
 * Scan the table's delete skyline forward from `startIndex` and report
 * whether a delete point affects the block:
 *   - a point inside [minKey.ts, maxKey.ts] whose version is >= the block's
 *     minVer, or
 *   - a point before minKey.ts with version >= minVer whose successor is at
 *     or after minKey.ts (the successor must also have version >= minVer
 *     unless it is the last skyline point).
 * The scan stops with `false` at the first point after maxKey.ts.
 */
static bool doCheckforDatablockOverlapFromIndex(STableBlockScanInfo* pBlockScanInfo, const SBlock* pBlock,
                                                int32_t startIndex) {
  size_t num = taosArrayGetSize(pBlockScanInfo->delSkyline);

  for (int32_t i = startIndex; i < num; i += 1) {
    TSDBKEY* p = taosArrayGet(pBlockScanInfo->delSkyline, i);
    if (p->ts >= pBlock->minKey.ts && p->ts <= pBlock->maxKey.ts) {
      if (p->version >= pBlock->minVer) {
        return true;
      }
    } else if (p->ts < pBlock->minKey.ts) {  // p->ts < pBlock->minKey.ts
      if (p->version >= pBlock->minVer) {
        if (i < num - 1) {
          TSDBKEY* pnext = taosArrayGet(pBlockScanInfo->delSkyline, i + 1);
          if (i + 1 == num - 1) {  // pnext is the last point
            if (pnext->ts >= pBlock->minKey.ts) {
              return true;
            }
          } else {
            if (pnext->ts >= pBlock->minKey.ts && pnext->version >= pBlock->minVer) {
              return true;
            }
          }
        } else {  // it must be the last point
          ASSERT(p->version == 0);
        }
      }
    } else {  // (p->ts > pBlock->maxKey.ts) {
      return false;
    }
  }

  return false;
}

/*
 * Same check as doCheckforDatablockOverlapFromIndex(), starting at the
 * table's current file delete index.
 */
static bool doCheckforDatablockOverlap(STableBlockScanInfo* pBlockScanInfo, const SBlock* pBlock) {
  return doCheckforDatablockOverlapFromIndex(pBlockScanInfo, pBlock, pBlockScanInfo->fileDelIndex);
}

/*
 * Report whether the table's delete skyline affects the given block.
 *
 * Returns false when there is no skyline, when it is empty, or when the
 * block's timestamp range lies completely outside the skyline's range.  For
 * an ascending scan the skyline is checked from the current file delete
 * index.  For a descending scan the start position is first moved back from
 * the file delete index to the first point that is not after the block's
 * minKey.ts (or to the first point of the skyline), and the check starts
 * there.
 */
static bool overlapWithDelSkyline(STableBlockScanInfo* pBlockScanInfo, const SBlock* pBlock, int32_t order) {
  if (pBlockScanInfo->delSkyline == NULL) {
    return false;
  }

  // an empty skyline has no first/last point to compare against
  int32_t numOfPoints = (int32_t)taosArrayGetSize(pBlockScanInfo->delSkyline);
  if (numOfPoints <= 0) {
    return false;
  }

  // ts is not overlap
  TSDBKEY* pFirst = taosArrayGet(pBlockScanInfo->delSkyline, 0);
  TSDBKEY* pLast = taosArrayGetLast(pBlockScanInfo->delSkyline);
  if (pBlock->minKey.ts > pLast->ts || pBlock->maxKey.ts < pFirst->ts) {
    return false;
  }

  // version is not overlap
  if (ASCENDING_TRAVERSE(order)) {
    return doCheckforDatablockOverlap(pBlockScanInfo, pBlock);
  }

  // Keep the start position inside the skyline.  Starting earlier than strictly needed
  // can only report an extra overlap, which just causes the block to be loaded.
  int32_t index = pBlockScanInfo->fileDelIndex;
  if (index >= numOfPoints) {
    index = numOfPoints - 1;
  }
  if (index < 0) {
    index = 0;
  }

  // find the first point that is not larger than the minKey.ts of dataBlock.
  while (index > 0) {
    TSDBKEY* p = taosArrayGet(pBlockScanInfo->delSkyline, index);
    if (p->ts <= pBlock->minKey.ts) {
      break;
    }
    index -= 1;
  }

  // the rewound position must actually be used as the start of the check
  return doCheckforDatablockOverlapFromIndex(pBlockScanInfo, pBlock, index);
}

1280 1281 1282 1283
// 1. the version of all rows should be less than the endVersion
// 2. current block should not overlap with next neighbor block
// 3. current timestamp should not be overlap with each other
// 4. output buffer should be large enough to hold all rows in current block
1284
// 5. delete info should not overlap with current block data
1285
static bool fileBlockShouldLoad(STsdbReader* pReader, SFileDataBlockInfo* pFBlock, SBlock* pBlock,
1286
                                STableBlockScanInfo* pScanInfo, TSDBKEY key, SLastBlockReader* pLastBlockReader) {
1287 1288 1289
  int32_t neighborIndex = 0;
  SBlock* pNeighbor = getNeighborBlockOfSameTable(pFBlock, pScanInfo, &neighborIndex, pReader->order);

1290
  // overlap with neighbor
1291 1292 1293
  bool overlapWithNeighbor = false;
  if (pNeighbor) {
    overlapWithNeighbor = overlapWithNeighborBlock(pBlock, pNeighbor, pReader->order);
1294
    taosMemoryFree(pNeighbor);
1295 1296
  }

1297
  // has duplicated ts of different version in this block
L
Liu Jicong 已提交
1298 1299
  bool hasDup = (pBlock->nSubBlock == 1) ? pBlock->hasDup : true;
  bool overlapWithDel = overlapWithDelSkyline(pScanInfo, pBlock, pReader->order);
1300

1301 1302
  // todo here we need to each key in the last files to identify if it is really overlapped with last block
  bool overlapWithlastBlock = false;
1303
  if (taosArrayGetSize(pLastBlockReader->pBlockL) > 0 && (pLastBlockReader->currentBlockIndex != -1)) {
1304 1305 1306 1307
    SBlockL *pBlockL = taosArrayGet(pLastBlockReader->pBlockL, pLastBlockReader->currentBlockIndex);
    overlapWithlastBlock = !(pBlock->maxKey.ts < pBlockL->minKey || pBlock->minKey.ts > pBlockL->maxKey);
  }

1308 1309 1310 1311 1312 1313 1314 1315 1316 1317
  bool moreThanOutputCapacity = pBlock->nRow > pReader->capacity;
  bool partiallyRequired = dataBlockPartiallyRequired(&pReader->window, &pReader->verRange, pBlock);
  bool overlapWithKey = keyOverlapFileBlock(key, pBlock, &pReader->verRange);

  bool loadDataBlock = (overlapWithNeighbor || hasDup || partiallyRequired || overlapWithKey ||
                        moreThanOutputCapacity || overlapWithDel || overlapWithlastBlock);

  // log the reason why load the datablock for profile
  if (loadDataBlock) {
    tsdbDebug("%p uid:%" PRIu64
1318
              " need to load the datablock, overlapwithneighborblock:%d, hasDup:%d, partiallyRequired:%d, "
1319 1320 1321 1322 1323 1324
              "overlapWithKey:%d, greaterThanBuf:%d, overlapWithDel:%d, overlapWithlastBlock:%d, %s",
              pReader, pFBlock->uid, overlapWithNeighbor, hasDup, partiallyRequired, overlapWithKey,
              moreThanOutputCapacity, overlapWithDel, overlapWithlastBlock, pReader->idStr);
  }

  return loadDataBlock;
H
Haojun Liao 已提交
1325 1326
}

1327
/*
 * Fill the reader's result block from the in-memory buffers (mem / imem) of
 * one table, up to endKey and the reader's row capacity.
 *
 * Returns TSDB_CODE_SUCCESS immediately when neither buffer iterator has a
 * value.  Otherwise the rows are built by buildDataBlockFromBufImpl(), the
 * block's time window and uid are updated, the block is flagged as composed,
 * the elapsed time is added to the reader's cost, and the code of the build
 * step is returned.
 */
static int32_t buildDataBlockFromBuf(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo, int64_t endKey) {
  bool memHasRows = pBlockScanInfo->iter.hasVal;
  bool imemHasRows = pBlockScanInfo->iiter.hasVal;
  if (!memHasRows && !imemHasRows) {
    return TSDB_CODE_SUCCESS;
  }

  SSDataBlock* pResBlock = pReader->pResBlock;
  int64_t      startUs = taosGetTimestampUs();

  int32_t code = buildDataBlockFromBufImpl(pBlockScanInfo, endKey, pReader->capacity, pReader);

  blockDataUpdateTsWindow(pResBlock, 0);
  pResBlock->info.uid = pBlockScanInfo->uid;
  setComposedBlockFlag(pReader, true);

  double costMs = (taosGetTimestampUs() - startUs) / 1000.0;
  tsdbDebug("%p build data block from cache completed, elapsed time:%.2f ms, numOfRows:%d, brange:%" PRId64
            " - %" PRId64 " %s",
            pReader, costMs, pResBlock->info.rows, pResBlock->info.window.skey, pResBlock->info.window.ekey,
            pReader->idStr);

  pReader->cost.buildmemBlock += costMs;
  return code;
}

1352 1353
/*
 * Fast path for a file-block row that needs no merging.
 *
 * When the row under the dump cursor is not the last row in scan direction
 * and the following row carries a timestamp different from `key`, the row is
 * appended to the result block as is, the cursor is advanced, and true is
 * returned.  In every other case nothing is changed and false is returned.
 */
static bool tryCopyDistinctRowFromFileBlock(STsdbReader* pReader, SBlockData* pBlockData, int64_t key,
                                            SFileBlockDumpInfo* pDumpInfo) {
  bool asc = (pReader->order == TSDB_ORDER_ASC);
  bool desc = (pReader->order == TSDB_ORDER_DESC);

  // a border row has no following row to compare with
  bool hasFollower = (asc && pDumpInfo->rowIndex < pDumpInfo->totalRows - 1) || (desc && pDumpInfo->rowIndex > 0);
  if (!hasFollower) {
    return false;
  }

  int32_t stride = asc ? 1 : -1;
  int64_t followerKey = pBlockData->aTSKEY[pDumpInfo->rowIndex + stride];
  if (followerKey == key) {
    // same timestamp follows, the rows have to be merged
    return false;
  }

  doAppendRowFromFileBlock(pReader->pResBlock, pReader, pBlockData, pDumpInfo->rowIndex);
  pDumpInfo->rowIndex += stride;
  return true;
}

H
Haojun Liao 已提交
1372 1373 1374 1375 1376 1377
/*
 * Return the schema with version `sversion` for table `uid`.
 *
 * pReader->pSchema is loaded on first use (requested with version -1) and is
 * returned when its version matches.  Otherwise pReader->pMemSchema is used:
 * it is returned when it already has the requested version, else it is
 * released and reloaded for `sversion`.
 *
 * The returned pointer is owned by the reader.  It is whatever the meta layer
 * stored in pReader->pMemSchema, so it may be NULL when the load failed.
 */
static FORCE_INLINE STSchema* doGetSchemaForTSRow(int32_t sversion, STsdbReader* pReader, uint64_t uid) {
  // always set the newest schema version in pReader->pSchema
  if (pReader->pSchema == NULL) {
    pReader->pSchema = metaGetTbTSchema(pReader->pTsdb->pVnode->pMeta, uid, -1);
  }

  if (pReader->pSchema && sversion == pReader->pSchema->version) {
    return pReader->pSchema;
  }

  if (pReader->pMemSchema != NULL) {
    if (pReader->pMemSchema->version == sversion) {
      return pReader->pMemSchema;
    }

    // drop the stale schema and clear the pointer, so that a failed reload below
    // cannot leave a dangling pointer in the reader
    taosMemoryFree(pReader->pMemSchema);
    pReader->pMemSchema = NULL;
  }

  int32_t code = metaGetTbTSchemaEx(pReader->pTsdb->pVnode->pMeta, pReader->suid, uid, sversion, &pReader->pMemSchema);
  if (code != TSDB_CODE_SUCCESS) {
    tsdbError("%p failed to load table schema, uid:%" PRIu64 ", sversion:%d, code:%d %s", pReader, uid, sversion, code,
              pReader->idStr);
  }

  return pReader->pMemSchema;
}

1397 1398 1399 1400 1401 1402 1403
// Produce one result row from up to three sources: one in-memory buffer (mem or imem, described by
// pRow/pIter), the currently loaded file data block and the last block.
//
// pRow  current valid row of the buffer iterator pIter
// key   timestamp of the current row in the file data block (only meaningful if that block has rows)
//
// The smallest (ascending scan) or largest (descending scan) timestamp among the available sources is
// selected, every source positioned on that timestamp is folded into one row merger, and the merged
// row is appended to pReader->pResBlock. Always returns TSDB_CODE_SUCCESS.
static int32_t doMergeBufAndFileRows_Rv(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo, TSDBROW* pRow,
                                     SIterInfo* pIter, int64_t key, SLastBlockReader* pLastBlockReader) {
  SRowMerger          merge = {0};
  STSRow*             pTSRow = NULL;
  SBlockData*         pBlockData = &pReader->status.fileBlockData;
  SBlockData*         pLastBlockData = &pLastBlockReader->lastBlockData;
  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;

  // A source takes part only if it still has a row to offer. The same flags are used for choosing the
  // key and for merging: testing only nRow > 0 for the last block would let the INT64_MIN placeholder
  // win the ascending comparison once the last block is exhausted, and the last block would then be
  // indexed with ALL_ROWS_CHECKED_INDEX.
  const bool hasFileRow = (pBlockData->nRow > 0);
  const bool hasLastRow = (pLastBlockData->nRow > 0) && hasDataInLastBlock(pLastBlockReader);

  int64_t tsLast = INT64_MIN;
  if (hasLastRow) {
    tsLast = getCurrentKeyInLastBlock(pLastBlockReader);
  }

  TSDBKEY  k = TSDBROW_KEY(pRow);
  TSDBROW  fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);

  // the buffer row always exists, so start from its timestamp
  int64_t minKey = k.ts;
  if (pReader->order == TSDB_ORDER_ASC) {
    // choose the minimum value
    if (hasLastRow && minKey > tsLast) {
      minKey = tsLast;
    }

    if (hasFileRow && minKey > key) {
      minKey = key;
    }
  } else {
    // choose the maximum value
    if (hasLastRow && minKey < tsLast) {
      minKey = tsLast;
    }

    if (hasFileRow && minKey < key) {
      minKey = key;
    }
  }

  bool init = false;

  //  ASC: file block -----> last block -----> imem -----> mem
  // DESC: mem -----> imem -----> last block -----> file block
  if (pReader->order == TSDB_ORDER_ASC) {
    if (hasFileRow && minKey == key) {
      init = true;
      tRowMergerInit(&merge, &fRow, pReader->pSchema);
      doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
    }

    if (hasLastRow && minKey == tsLast) {
      TSDBROW fRow1 = tsdbRowFromBlockData(pLastBlockData, *pLastBlockReader->rowIndex);
      if (init) {
        tRowMerge(&merge, &fRow1);
      } else {
        init = true;
        tRowMergerInit(&merge, &fRow1, pReader->pSchema);
      }
      doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, &merge);
    }

    if (minKey == k.ts) {
      if (init) {
        tRowMerge(&merge, pRow);
      } else {
        init = true;
        STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid);
        tRowMergerInit(&merge, pRow, pSchema);
      }
      doMergeRowsInBuf(pIter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge, pReader);
    }
  } else {
    if (minKey == k.ts) {
      init = true;
      STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid);
      tRowMergerInit(&merge, pRow, pSchema);
      doMergeRowsInBuf(pIter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge, pReader);
    }

    if (hasLastRow && minKey == tsLast) {
      TSDBROW fRow1 = tsdbRowFromBlockData(pLastBlockData, *pLastBlockReader->rowIndex);
      if (init) {
        tRowMerge(&merge, &fRow1);
      } else {
        init = true;
        tRowMergerInit(&merge, &fRow1, pReader->pSchema);
      }
      doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, &merge);
    }

    if (hasFileRow && minKey == key) {
      if (init) {
        tRowMerge(&merge, &fRow);
      } else {
        init = true;
        tRowMergerInit(&merge, &fRow, pReader->pSchema);
      }
      doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
    }
  }

  tRowMergerGetRow(&merge, &pTSRow);
  doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo->uid);

  taosMemoryFree(pTSRow);
  tRowMergerClear(&merge);
  return TSDB_CODE_SUCCESS;
}

1513
static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo, TSDBROW* pRow,
1514
                                     SIterInfo* pIter, int64_t key, SLastBlockReader* pLastBlockReader) {
1515
  SRowMerger          merge = {0};
H
Haojun Liao 已提交
1516
  STSRow*             pTSRow = NULL;
1517 1518 1519
  SBlockData*         pBlockData = &pReader->status.fileBlockData;
  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;

1520 1521 1522 1523
  TSDBKEY  k = TSDBROW_KEY(pRow);
  TSDBROW  fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
  SArray*  pDelList = pBlockScanInfo->delSkyline;
  bool     freeTSRow = false;
H
Haojun Liao 已提交
1524
  uint64_t uid = pBlockScanInfo->uid;
1525

1526 1527 1528
  // ascending order traverse
  if (ASCENDING_TRAVERSE(pReader->order)) {
    if (key < k.ts) {
1529 1530 1531 1532 1533 1534 1535
      // imem & mem are all empty, only file exist
      if (tryCopyDistinctRowFromFileBlock(pReader, pBlockData, key, pDumpInfo)) {
        return TSDB_CODE_SUCCESS;
      } else {
        tRowMergerInit(&merge, &fRow, pReader->pSchema);
        doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
        tRowMergerGetRow(&merge, &pTSRow);
H
Haojun Liao 已提交
1536
        freeTSRow = true;
1537
      }
1538
    } else if (k.ts < key) {  // k.ts < key
1539
      doMergeMultiRows(pRow, pBlockScanInfo->uid, pIter, pDelList, &pTSRow, pReader, &freeTSRow);
1540 1541 1542
    } else {  // k.ts == key, ascending order: file block ----> imem rows -----> mem rows
      tRowMergerInit(&merge, &fRow, pReader->pSchema);
      doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
1543 1544

      tRowMerge(&merge, pRow);
1545
      doMergeRowsInBuf(pIter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge, pReader);
1546 1547

      tRowMergerGetRow(&merge, &pTSRow);
H
Haojun Liao 已提交
1548
      freeTSRow = true;
1549
    }
1550 1551
  } else {  // descending order scan
    if (key < k.ts) {
1552
      doMergeMultiRows(pRow, pBlockScanInfo->uid, pIter, pDelList, &pTSRow, pReader, &freeTSRow);
1553
    } else if (k.ts < key) {
1554 1555 1556 1557 1558 1559
      if (tryCopyDistinctRowFromFileBlock(pReader, pBlockData, key, pDumpInfo)) {
        return TSDB_CODE_SUCCESS;
      } else {
        tRowMergerInit(&merge, &fRow, pReader->pSchema);
        doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
        tRowMergerGetRow(&merge, &pTSRow);
H
Haojun Liao 已提交
1560
        freeTSRow = true;
1561
      }
1562
    } else {  // descending order: mem rows -----> imem rows ------> file block
H
Haojun Liao 已提交
1563
      STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid);
1564

H
Haojun Liao 已提交
1565
      tRowMergerInit(&merge, pRow, pSchema);
1566
      doMergeRowsInBuf(pIter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge, pReader);
1567 1568 1569 1570 1571

      tRowMerge(&merge, &fRow);
      doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);

      tRowMergerGetRow(&merge, &pTSRow);
H
Haojun Liao 已提交
1572
      freeTSRow = true;
1573
    }
1574 1575
  }

1576
  tRowMergerClear(&merge);
H
Haojun Liao 已提交
1577
  doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, uid);
H
Haojun Liao 已提交
1578

H
Haojun Liao 已提交
1579 1580 1581
  if (freeTSRow) {
    taosMemoryFree(pTSRow);
  }
H
Haojun Liao 已提交
1582

1583 1584 1585
  return TSDB_CODE_SUCCESS;
}

1586
// Produce one result row when both in-memory buffers (mem and imem) have a valid row, additionally
// taking the file data block and the last block into account when they have data.
//
// The smallest (ascending scan) or largest (descending scan) timestamp among all available sources is
// selected, every source positioned on that timestamp is folded into one row merger, and the merged
// row is appended to pReader->pResBlock. Always returns TSDB_CODE_SUCCESS.
static int32_t doMergeMultiLevelRowsRv(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo, SBlockData* pBlockData, SLastBlockReader* pLastBlockReader) {
  SRowMerger merge = {0};
  STSRow*    pTSRow = NULL;

  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
  SArray*             pDelList = pBlockScanInfo->delSkyline;

  TSDBROW* pRow = getValidRow(&pBlockScanInfo->iter, pDelList, pReader);
  TSDBROW* piRow = getValidRow(&pBlockScanInfo->iiter, pDelList, pReader);
  ASSERT(pRow != NULL && piRow != NULL);

  SBlockData* pLastBlockData = &pLastBlockReader->lastBlockData;

  // the file block and the last block are optional sources
  const bool hasFileRow = (pBlockData->nRow > 0);
  const bool hasLastRow = hasDataInLastBlock(pLastBlockReader);

  int64_t tsLast = INT64_MIN;
  if (hasLastRow) {
    tsLast = getCurrentKeyInLastBlock(pLastBlockReader);
  }

  // read the timestamp column of the file block only when the block holds rows; with an empty block
  // the column must not be dereferenced
  int64_t key = hasFileRow ? pBlockData->aTSKEY[pDumpInfo->rowIndex] : INT64_MIN;

  TSDBKEY k = TSDBROW_KEY(pRow);
  TSDBKEY ik = TSDBROW_KEY(piRow);

  int64_t minKey = 0;
  if (ASCENDING_TRAVERSE(pReader->order)) {
    minKey = INT64_MAX;  // let's find the minimum
    if (minKey > k.ts) {
      minKey = k.ts;
    }

    if (minKey > ik.ts) {
      minKey = ik.ts;
    }

    if (hasFileRow && minKey > key) {
      minKey = key;
    }

    if (hasLastRow && minKey > tsLast) {
      minKey = tsLast;
    }
  } else {
    minKey = INT64_MIN;  // let's find the maximum ts value
    if (minKey < k.ts) {
      minKey = k.ts;
    }

    if (minKey < ik.ts) {
      minKey = ik.ts;
    }

    if (hasFileRow && minKey < key) {
      minKey = key;
    }

    if (hasLastRow && minKey < tsLast) {
      minKey = tsLast;
    }
  }

  bool init = false;

  //  ASC: file block -----> last block -----> imem -----> mem
  // DESC: mem -----> imem -----> last block -----> file block
  if (ASCENDING_TRAVERSE(pReader->order)) {
    if (hasFileRow && minKey == key) {
      init = true;
      TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
      tRowMergerInit(&merge, &fRow, pReader->pSchema);
      doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
    }

    if (hasLastRow && minKey == tsLast) {
      TSDBROW fRow1 = tsdbRowFromBlockData(pLastBlockData, *pLastBlockReader->rowIndex);
      if (init) {
        tRowMerge(&merge, &fRow1);
      } else {
        init = true;
        tRowMergerInit(&merge, &fRow1, pReader->pSchema);
      }
      doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, &merge);
    }

    if (minKey == ik.ts) {
      if (init) {
        tRowMerge(&merge, piRow);
      } else {
        init = true;
        STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(piRow), pReader, pBlockScanInfo->uid);
        tRowMergerInit(&merge, piRow, pSchema);
      }
      doMergeRowsInBuf(&pBlockScanInfo->iiter, pBlockScanInfo->uid, ik.ts, pBlockScanInfo->delSkyline, &merge, pReader);
    }

    if (minKey == k.ts) {
      if (init) {
        tRowMerge(&merge, pRow);
      } else {
        init = true;
        STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid);
        tRowMergerInit(&merge, pRow, pSchema);
      }
      doMergeRowsInBuf(&pBlockScanInfo->iter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge, pReader);
    }
  } else {
    if (minKey == k.ts) {
      init = true;
      STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid);
      tRowMergerInit(&merge, pRow, pSchema);
      doMergeRowsInBuf(&pBlockScanInfo->iter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge, pReader);
    }

    if (minKey == ik.ts) {
      if (init) {
        tRowMerge(&merge, piRow);
      } else {
        init = true;
        STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(piRow), pReader, pBlockScanInfo->uid);
        tRowMergerInit(&merge, piRow, pSchema);
      }
      doMergeRowsInBuf(&pBlockScanInfo->iiter, pBlockScanInfo->uid, ik.ts, pBlockScanInfo->delSkyline, &merge, pReader);
    }

    if (hasLastRow && minKey == tsLast) {
      TSDBROW fRow1 = tsdbRowFromBlockData(pLastBlockData, *pLastBlockReader->rowIndex);
      if (init) {
        tRowMerge(&merge, &fRow1);
      } else {
        init = true;
        tRowMergerInit(&merge, &fRow1, pReader->pSchema);
      }
      doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, &merge);
    }

    if (hasFileRow && minKey == key) {
      TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
      if (!init) {
        init = true;
        tRowMergerInit(&merge, &fRow, pReader->pSchema);
      } else {
        tRowMerge(&merge, &fRow);
      }
      doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
    }
  }

  tRowMergerGetRow(&merge, &pTSRow);
  doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo->uid);

  taosMemoryFree(pTSRow);
  tRowMergerClear(&merge);
  return TSDB_CODE_SUCCESS;
}

1737
// Produce one result row when mem, imem and the file data block all have a current row.
//
// key  : timestamp of the current file-block row
// k.ts : timestamp of the current mem row
// ik.ts: timestamp of the current imem row
//
// The source(s) holding the next timestamp in scan order are merged into a single row, which is
// appended to pReader->pResBlock. Merge order on equal timestamps:
//    ASC: file block -----> imem -----> mem
//   DESC: mem -----> imem -----> file block
// Every path releases the merged row (when it is owned here) and clears the row merger.
// Always returns TSDB_CODE_SUCCESS.
static int32_t doMergeThreeLevelRows(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo, SBlockData* pBlockData) {
  SRowMerger merge = {0};
  STSRow*    pTSRow = NULL;

  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
  SArray*             pDelList = pBlockScanInfo->delSkyline;

  TSDBROW* pRow = getValidRow(&pBlockScanInfo->iter, pDelList, pReader);
  TSDBROW* piRow = getValidRow(&pBlockScanInfo->iiter, pDelList, pReader);
  ASSERT(pRow != NULL && piRow != NULL);

  int64_t key = pBlockData->aTSKEY[pDumpInfo->rowIndex];
  bool    freeTSRow = false;  // whether pTSRow has to be released here

  uint64_t uid = pBlockScanInfo->uid;

  TSDBKEY k = TSDBROW_KEY(pRow);
  TSDBKEY ik = TSDBROW_KEY(piRow);

  if (ASCENDING_TRAVERSE(pReader->order)) {
    if (key <= k.ts && key <= ik.ts) {
      // [1&2] key <= [k.ts && ik.ts]: the file row is (one of) the smallest
      TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
      tRowMergerInit(&merge, &fRow, pReader->pSchema);
      doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);

      if (ik.ts == key) {
        tRowMerge(&merge, piRow);
        doMergeRowsInBuf(&pBlockScanInfo->iiter, uid, key, pBlockScanInfo->delSkyline, &merge, pReader);
      }

      if (k.ts == key) {
        tRowMerge(&merge, pRow);
        doMergeRowsInBuf(&pBlockScanInfo->iter, uid, key, pBlockScanInfo->delSkyline, &merge, pReader);
      }

      tRowMergerGetRow(&merge, &pTSRow);
      freeTSRow = true;
    } else if (ik.ts < k.ts) {
      // here: key > k.ts || key > ik.ts. Note that key == ik.ts is possible (with k.ts < key), so no
      // assertion on key != ik.ts can be made.
      // [3] ik.ts < key <= k.ts
      // [4] ik.ts < k.ts <= key
      doMergeMultiRows(piRow, uid, &pBlockScanInfo->iiter, pDelList, &pTSRow, pReader, &freeTSRow);
    } else if (k.ts < ik.ts) {
      // [5] k.ts < key   <= ik.ts
      // [6] k.ts < ik.ts <= key
      doMergeMultiRows(pRow, uid, &pBlockScanInfo->iter, pDelList, &pTSRow, pReader, &freeTSRow);
    } else {
      // [7] k.ts == ik.ts < key
      ASSERT(key > ik.ts && key > k.ts);

      doMergeMemIMemRows(pRow, piRow, pBlockScanInfo, pReader, &pTSRow);
      freeTSRow = true;
    }
  } else {  // descending order scan
    if (k.ts >= ik.ts && k.ts >= key) {
      // [1/2] k.ts >= ik.ts && k.ts >= key: the mem row is (one of) the largest
      STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid);

      // the timestamp being merged is k.ts; key may be smaller than k.ts in this branch
      tRowMergerInit(&merge, pRow, pSchema);
      doMergeRowsInBuf(&pBlockScanInfo->iter, uid, k.ts, pBlockScanInfo->delSkyline, &merge, pReader);

      if (ik.ts == k.ts) {
        tRowMerge(&merge, piRow);
        doMergeRowsInBuf(&pBlockScanInfo->iiter, uid, ik.ts, pBlockScanInfo->delSkyline, &merge, pReader);
      }

      if (k.ts == key) {
        TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
        tRowMerge(&merge, &fRow);
        doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
      }

      tRowMergerGetRow(&merge, &pTSRow);
      freeTSRow = true;
    } else if (ik.ts > key) {
      // here: k.ts < ik.ts || k.ts < key. Note that k.ts == ik.ts is possible (with key > k.ts); that
      // case is served by the "key > ik.ts" branch below, so no assertion on ik.ts != k.ts can be made.
      // [3] ik.ts > k.ts >= key
      // [4] ik.ts > key >= k.ts
      doMergeMultiRows(piRow, uid, &pBlockScanInfo->iiter, pDelList, &pTSRow, pReader, &freeTSRow);
    } else if (key > ik.ts) {
      // [5] key > ik.ts >= k.ts
      // [6] key > k.ts > ik.ts
      TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
      tRowMergerInit(&merge, &fRow, pReader->pSchema);

      doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
      tRowMergerGetRow(&merge, &pTSRow);
      freeTSRow = true;
    } else {
      // [7] key == ik.ts > k.ts: imem rows first, then the file block. The merger has to be
      // initialized with the imem row before the file row can be merged into it.
      STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(piRow), pReader, pBlockScanInfo->uid);

      tRowMergerInit(&merge, piRow, pSchema);
      doMergeRowsInBuf(&pBlockScanInfo->iiter, uid, ik.ts, pBlockScanInfo->delSkyline, &merge, pReader);

      TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
      tRowMerge(&merge, &fRow);
      doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);

      tRowMergerGetRow(&merge, &pTSRow);
      freeTSRow = true;
    }
  }

  doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, uid);

  if (freeTSRow) {
    taosMemoryFree(pTSRow);
  }

  tRowMergerClear(&merge);
  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
1880 1881
// Check whether the file-block row at pDumpInfo->rowIndex qualifies for the current scan.
//
// A row qualifies when
//   - it belongs to the scanned table (only checked if the block carries a uid column, i.e. it is a
//     multi-table data block),
//   - its version lies within pReader->verRange,
//   - its timestamp lies within pReader->window, and
//   - hasBeenDropped() does not report it as deleted by the delete skyline of the table.
// hasBeenDropped() receives a pointer to pBlockScanInfo->fileDelIndex and is consulted only for rows
// that passed all other checks.
static bool isValidFileBlockRow(SBlockData* pBlockData, SFileBlockDumpInfo* pDumpInfo,
                                STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader) {
  // multi-table data block: rows of other tables are skipped
  if (pBlockData->aUid != NULL && pBlockData->aUid[pDumpInfo->rowIndex] != pBlockScanInfo->uid) {
    return false;
  }

  // version range
  int64_t rowVer = pBlockData->aVersion[pDumpInfo->rowIndex];
  bool    verInRange = (rowVer >= pReader->verRange.minVer) && (rowVer <= pReader->verRange.maxVer);
  if (!verInRange) {
    return false;
  }

  // time window
  int64_t rowTs = pBlockData->aTSKEY[pDumpInfo->rowIndex];
  bool    tsInWindow = (rowTs >= pReader->window.skey) && (rowTs <= pReader->window.ekey);
  if (!tsInWindow) {
    return false;
  }

  // delete markers
  TSDBKEY rowKey = {.ts = rowTs, .version = rowVer};
  return !hasBeenDropped(pBlockScanInfo->delSkyline, &pBlockScanInfo->fileDelIndex, &rowKey, pReader->order);
}

1909
// Return true when ts lies outside the closed interval [pWindow->skey, pWindow->ekey].
static bool outOfTimeWindow(int64_t ts, STimeWindow* pWindow) {
  bool inWindow = (ts >= pWindow->skey) && (ts <= pWindow->ekey);
  return !inWindow;
}
1910

1911
// Bind the last block reader to table `uid` and to the caller-owned row cursor `startPos`.
//
// A cursor value of -1 means that the cursor has not been positioned yet. For a descending scan it is
// then moved to lastBlockData.nRow (one past the final row); for an ascending scan it is left at -1.
// Any other cursor value is kept as it is.
static void initLastBlockReader(SLastBlockReader* pLastBlockReader, uint64_t uid, int16_t* startPos) {
  pLastBlockReader->uid = uid;
  pLastBlockReader->rowIndex = startPos;

  const bool unpositioned = (*startPos == -1);
  if (unpositioned && !ASCENDING_TRAVERSE(pLastBlockReader->order)) {
    *startPos = pLastBlockReader->lastBlockData.nRow;
  }
}

// Mark the last block as completely consumed by storing ALL_ROWS_CHECKED_INDEX in the row cursor.
static void setAllRowsChecked(SLastBlockReader *pLastBlockReader) {
  int16_t* pCursor = pLastBlockReader->rowIndex;
  *pCursor = ALL_ROWS_CHECKED_INDEX;
}

H
Haojun Liao 已提交
1928
// Advance the cursor of the last block to the next qualified row in scan direction.
//
// The cursor first moves one step, then rows are skipped until one is found that
//   - belongs to pLastBlockReader->uid (only checked if the block carries a uid column),
//   - has its timestamp within pLastBlockReader->window and its version within
//     pLastBlockReader->verRange, and
//   - is not reported as deleted by hasBeenDropped(), which is consulted only for rows that passed
//     the checks above and receives a pointer to pBlockScanInfo->lastBlockDelIndex.
// Returns true with the cursor placed on that row. Returns false if the block was already marked as
// consumed, or if no such row is left; in the latter case the block gets marked as consumed.
static bool nextRowInLastBlock(SLastBlockReader *pLastBlockReader, STableBlockScanInfo* pBlockScanInfo) {
  int32_t step = (pLastBlockReader->order == TSDB_ORDER_ASC) ? 1 : -1;
  if (*pLastBlockReader->rowIndex == ALL_ROWS_CHECKED_INDEX) {
    return false;
  }

  *(pLastBlockReader->rowIndex) += step;

  SBlockData* pData = &pLastBlockReader->lastBlockData;
  int32_t     pos = *(pLastBlockReader->rowIndex);

  // todo opt handle desc case: rows beyond the window/version range are skipped one by one
  while (pos < pData->nRow && pos >= 0) {
    bool qualified = (pData->aUid == NULL) || (pData->aUid[pos] == pLastBlockReader->uid);

    if (qualified) {
      int64_t ts = pData->aTSKEY[pos];
      int64_t ver = pData->aVersion[pos];

      qualified = (ts >= pLastBlockReader->window.skey) && (ts <= pLastBlockReader->window.ekey) &&
                  (ver >= pLastBlockReader->verRange.minVer) && (ver <= pLastBlockReader->verRange.maxVer);

      if (qualified) {
        TSDBKEY rowKey = {.ts = ts, .version = ver};
        if (!hasBeenDropped(pBlockScanInfo->delSkyline, &pBlockScanInfo->lastBlockDelIndex, &rowKey,
                            pLastBlockReader->order)) {
          *(pLastBlockReader->rowIndex) = pos;
          return true;
        }
      }
    }

    pos += step;
  }

  // all data in the last block has been consumed
  setAllRowsChecked(pLastBlockReader);
  return false;
}

// Return the timestamp of the row the last block cursor points at. The cursor is used as an index
// without any check, so this must not be called once the block is marked as consumed.
static int64_t getCurrentKeyInLastBlock(SLastBlockReader* pLastBlockReader) {
  int16_t cursor = *pLastBlockReader->rowIndex;
  return pLastBlockReader->lastBlockData.aTSKEY[cursor];
}

// Return false once the last block cursor holds ALL_ROWS_CHECKED_INDEX (block marked as consumed),
// true otherwise.
static bool hasDataInLastBlock(SLastBlockReader* pLastBlockReader) {
  return *pLastBlockReader->rowIndex != ALL_ROWS_CHECKED_INDEX;
}

1988
// todo refactor
H
Haojun Liao 已提交
1989 1990
static int32_t buildComposedDataBlockImpl(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo,
                                          SBlockData* pBlockData, SLastBlockReader* pLastBlockReader) {
1991 1992
  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;

1993
  int64_t key = (pBlockData->nRow > 0)? pBlockData->aTSKEY[pDumpInfo->rowIndex]:INT64_MIN;
1994 1995
  TSDBROW* pRow = getValidRow(&pBlockScanInfo->iter, pBlockScanInfo->delSkyline, pReader);
  TSDBROW* piRow = getValidRow(&pBlockScanInfo->iiter, pBlockScanInfo->delSkyline, pReader);
1996

1997
  if (pBlockScanInfo->iter.hasVal && pBlockScanInfo->iiter.hasVal) {
1998
    return doMergeMultiLevelRowsRv(pReader, pBlockScanInfo, pBlockData, pLastBlockReader);
1999
  } else {
2000
    // imem + file + last block
2001
    if (pBlockScanInfo->iiter.hasVal) {
2002
      return doMergeBufAndFileRows_Rv(pReader, pBlockScanInfo, piRow, &pBlockScanInfo->iiter, key, pLastBlockReader);
2003 2004
    }

2005
    // mem + file
2006
    if (pBlockScanInfo->iter.hasVal) {
2007
      return doMergeBufAndFileRows_Rv(pReader, pBlockScanInfo, pRow, &pBlockScanInfo->iter, key, pLastBlockReader);
H
Haojun Liao 已提交
2008
    }
2009

2010
    if (pBlockData->nRow > 0) {
2011
      TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
2012

2013
      // no last block available, only data block exists
2014
      if (pLastBlockReader->lastBlockData.nRow == 0 || (!hasDataInLastBlock(pLastBlockReader))) {
2015 2016 2017 2018 2019 2020 2021 2022 2023 2024 2025 2026 2027 2028 2029 2030 2031
        if (tryCopyDistinctRowFromFileBlock(pReader, pBlockData, key, pDumpInfo)) {
          return TSDB_CODE_SUCCESS;
        } else {
          STSRow*    pTSRow = NULL;
          SRowMerger merge = {0};

          tRowMergerInit(&merge, &fRow, pReader->pSchema);
          doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
          tRowMergerGetRow(&merge, &pTSRow);
          doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo->uid);

          taosMemoryFree(pTSRow);
          tRowMergerClear(&merge);
          return TSDB_CODE_SUCCESS;
        }
      }

2032 2033
      // row in last file block
      int64_t ts = getCurrentKeyInLastBlock(pLastBlockReader);
2034
      ASSERT(ts >= key);
2035

2036 2037 2038 2039 2040 2041 2042 2043
      if (ASCENDING_TRAVERSE(pReader->order)) {
        if (key < ts) {
          // imem & mem are all empty, only file exist
          if (tryCopyDistinctRowFromFileBlock(pReader, pBlockData, key, pDumpInfo)) {
            return TSDB_CODE_SUCCESS;
          } else {
            STSRow*    pTSRow = NULL;
            SRowMerger merge = {0};
2044

2045 2046 2047 2048
            tRowMergerInit(&merge, &fRow, pReader->pSchema);
            doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
            tRowMergerGetRow(&merge, &pTSRow);
            doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo->uid);
2049

2050 2051 2052 2053 2054
            taosMemoryFree(pTSRow);
            tRowMergerClear(&merge);
            return TSDB_CODE_SUCCESS;
          }
        } else if (key == ts) {
2055 2056 2057 2058 2059
          STSRow*    pTSRow = NULL;
          SRowMerger merge = {0};

          tRowMergerInit(&merge, &fRow, pReader->pSchema);
          doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
2060 2061
          doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, ts, &merge);

2062 2063 2064 2065 2066 2067
          tRowMergerGetRow(&merge, &pTSRow);
          doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo->uid);

          taosMemoryFree(pTSRow);
          tRowMergerClear(&merge);
          return TSDB_CODE_SUCCESS;
2068 2069 2070
        } else {
          ASSERT(0);
          return TSDB_CODE_SUCCESS;
2071
        }
2072 2073 2074 2075 2076 2077 2078 2079 2080 2081 2082 2083 2084 2085 2086 2087 2088 2089 2090
      } else {  // desc order
        SBlockData* pLastBlockData = &pLastBlockReader->lastBlockData;
        TSDBROW fRow1 = tsdbRowFromBlockData(pLastBlockData, *pLastBlockReader->rowIndex);

        STSRow*    pTSRow = NULL;
        SRowMerger merge = {0};
        tRowMergerInit(&merge, &fRow1, pReader->pSchema);
        doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, ts, &merge);

        if (ts == key) {
          doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
        }

        tRowMergerGetRow(&merge, &pTSRow);
        doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo->uid);

        taosMemoryFree(pTSRow);
        tRowMergerClear(&merge);
        return TSDB_CODE_SUCCESS;
2091 2092 2093 2094 2095
      }
    } else {  // only last block exists
      SBlockData* pLastBlockData = &pLastBlockReader->lastBlockData;
      int64_t     tsLastBlock = getCurrentKeyInLastBlock(pLastBlockReader);

2096 2097
      STSRow*    pTSRow = NULL;
      SRowMerger merge = {0};
H
Haojun Liao 已提交
2098

2099
      TSDBROW fRow = tsdbRowFromBlockData(pLastBlockData, *pLastBlockReader->rowIndex);
2100

2101
      tRowMergerInit(&merge, &fRow, pReader->pSchema);
H
Haojun Liao 已提交
2102
      doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLastBlock, &merge);
2103
      tRowMergerGetRow(&merge, &pTSRow);
2104

H
Haojun Liao 已提交
2105
      doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo->uid);
2106

2107 2108 2109 2110
      taosMemoryFree(pTSRow);
      tRowMergerClear(&merge);
      return TSDB_CODE_SUCCESS;
    }
2111 2112 2113
  }
}

2114
// Fill pReader->pResBlock with rows composed from the file data block, the last block and the
// in-memory buffers of the current table.
//
// One row is appended per iteration by buildComposedDataBlockImpl(). The loop ends when
//   - neither the file block nor the last block has a qualified row left,
//   - the currently loaded file block has been consumed, or
//   - the result block holds pReader->capacity rows.
// Afterwards the uid and the time window of the result block are set and the reader is flagged as
// holding a composed data block. Always returns TSDB_CODE_SUCCESS.
static int32_t buildComposedDataBlock(STsdbReader* pReader) {
  SSDataBlock* pResBlock = pReader->pResBlock;

  // the table is identified by the current file block if there is one, by the table iterator otherwise
  SFileDataBlockInfo*  pBlockInfo = getCurrentBlockInfo(&pReader->status.blockIter);
  STableBlockScanInfo* pBlockScanInfo = pReader->status.pTableIter;
  if (pBlockInfo != NULL) {
    pBlockScanInfo = taosHashGet(pReader->status.pTableMap, &pBlockInfo->uid, sizeof(pBlockInfo->uid));
  }

  SLastBlockReader*   pLastBlockReader = pReader->status.fileIter.pLastBlockReader;
  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
  SBlockData*         pBlockData = &pReader->status.fileBlockData;
  int32_t             step = ASCENDING_TRAVERSE(pReader->order) ? 1 : -1;

  int64_t st = taosGetTimestampUs();

  for (;;) {
    // todo check the validate of row in file block
    // move the file block cursor to the first qualified row
    bool fileRowReady = false;
    while (pBlockData->nRow > 0) {
      if (isValidFileBlockRow(pBlockData, pDumpInfo, pBlockScanInfo, pReader)) {
        fileRowReady = true;
        break;
      }

      pDumpInfo->rowIndex += step;

      SBlock* pCurBlock = getCurrentBlock(&pReader->status.blockIter);
      if (pDumpInfo->rowIndex >= pCurBlock->nRow || pDumpInfo->rowIndex < 0) {
        setBlockAllDumped(pDumpInfo, pCurBlock->maxKey.ts, pReader->order);
        break;
      }
    }

    // no data in last block and block, no need to proceed.
    if (!fileRowReady && !hasDataInLastBlock(pLastBlockReader)) {
      break;
    }

    buildComposedDataBlockImpl(pReader, pBlockScanInfo, pBlockData, pLastBlockReader);

    // currently loaded file data block is consumed
    bool cursorOutOfBlock = (pDumpInfo->rowIndex >= pBlockData->nRow) || (pDumpInfo->rowIndex < 0);
    if ((pBlockData->nRow > 0) && cursorOutOfBlock) {
      SBlock* pCurBlock = getCurrentBlock(&pReader->status.blockIter);
      setBlockAllDumped(pDumpInfo, pCurBlock->maxKey.ts, pReader->order);
      break;
    }

    // the result block is full
    if (pResBlock->info.rows >= pReader->capacity) {
      break;
    }
  }

  pResBlock->info.uid = pBlockScanInfo->uid;
  blockDataUpdateTsWindow(pResBlock, 0);

  setComposedBlockFlag(pReader, true);
  int64_t et = taosGetTimestampUs();

  tsdbDebug("%p uid:%" PRIu64 ", composed data block created, brange:%" PRIu64 "-%" PRIu64 " rows:%d, elapsed time:%.2f ms %s",
            pReader, pBlockScanInfo->uid, pResBlock->info.window.skey, pResBlock->info.window.ekey,
            pResBlock->info.rows, (et - st) / 1000.0, pReader->idStr);

  return TSDB_CODE_SUCCESS;
}

// Store `composed` in the reader status field `composedDataBlock`.
void setComposedBlockFlag(STsdbReader* pReader, bool composed) {
  pReader->status.composedDataBlock = composed;
}

2190
/**
 * Lazily create the iterators over the mem and imem tables of the read snapshot for one table,
 * then build that table's delete skyline.
 *
 * The start key is (window.skey, verRange.minVer) for ascending scans and
 * (window.ekey, verRange.maxVer) for descending scans; descending scans request backward iterators.
 * The function is a no-op once pBlockScanInfo->iterInit has been set.
 *
 * @param pBlockScanInfo per-table scan state; iter, iiter and iterInit are updated here
 * @param pReader        reader supplying the read snapshot, query window, version range and order
 * @return TSDB_CODE_SUCCESS, or the error code reported by iterator creation or by
 *         initDelSkylineIterator
 */
static int32_t initMemDataIterator(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader) {
  if (pBlockScanInfo->iterInit) {
    return TSDB_CODE_SUCCESS;
  }

  int32_t code = TSDB_CODE_SUCCESS;

  TSDBKEY startKey = {0};
  if (ASCENDING_TRAVERSE(pReader->order)) {
    startKey = (TSDBKEY){.ts = pReader->window.skey, .version = pReader->verRange.minVer};
  } else {
    startKey = (TSDBKEY){.ts = pReader->window.ekey, .version = pReader->verRange.maxVer};
  }

  int32_t backward = (!ASCENDING_TRAVERSE(pReader->order));

  // iterator over the mem table
  STbData* d = NULL;
  if (pReader->pReadSnap->pMem != NULL) {
    d = tsdbGetTbDataFromMemTable(pReader->pReadSnap->pMem, pReader->suid, pBlockScanInfo->uid);
    if (d != NULL) {
      code = tsdbTbDataIterCreate(d, &startKey, backward, &pBlockScanInfo->iter.iter);
      if (code == TSDB_CODE_SUCCESS) {
        pBlockScanInfo->iter.hasVal = (tsdbTbDataIterGet(pBlockScanInfo->iter.iter) != NULL);

        tsdbDebug("%p uid:%" PRIu64 ", check data in mem from skey:%" PRId64 ", order:%d, ts range in buf:%" PRId64
                  "-%" PRId64 " %s",
                  pReader, pBlockScanInfo->uid, startKey.ts, pReader->order, d->minKey, d->maxKey, pReader->idStr);
      } else {
        // this branch handles the mem iterator (the message previously said "imem")
        tsdbError("%p uid:%" PRIu64 ", failed to create iterator for mem, code:%s, %s", pReader, pBlockScanInfo->uid,
                  tstrerror(code), pReader->idStr);
        return code;
      }
    }
  } else {
    tsdbDebug("%p uid:%" PRIu64 ", no data in mem, %s", pReader, pBlockScanInfo->uid, pReader->idStr);
  }

  // iterator over the imem table
  STbData* di = NULL;
  if (pReader->pReadSnap->pIMem != NULL) {
    di = tsdbGetTbDataFromMemTable(pReader->pReadSnap->pIMem, pReader->suid, pBlockScanInfo->uid);
    if (di != NULL) {
      code = tsdbTbDataIterCreate(di, &startKey, backward, &pBlockScanInfo->iiter.iter);
      if (code == TSDB_CODE_SUCCESS) {
        pBlockScanInfo->iiter.hasVal = (tsdbTbDataIterGet(pBlockScanInfo->iiter.iter) != NULL);

        tsdbDebug("%p uid:%" PRIu64 ", check data in imem from skey:%" PRId64 ", order:%d, ts range in buf:%" PRId64
                  "-%" PRId64 " %s",
                  pReader, pBlockScanInfo->uid, startKey.ts, pReader->order, di->minKey, di->maxKey, pReader->idStr);
      } else {
        // this branch handles the imem iterator (the message previously said "mem")
        tsdbError("%p uid:%" PRIu64 ", failed to create iterator for imem, code:%s, %s", pReader, pBlockScanInfo->uid,
                  tstrerror(code), pReader->idStr);
        return code;
      }
    }
  } else {
    tsdbDebug("%p uid:%" PRIu64 ", no data in imem, %s", pReader, pBlockScanInfo->uid, pReader->idStr);
  }

  // Both iterators exist now, so mark the table as initialized even if the skyline fails:
  // re-entering this function would create the iterators a second time.
  code = initDelSkylineIterator(pBlockScanInfo, pReader, d, di);
  if (code != TSDB_CODE_SUCCESS) {
    tsdbError("%p uid:%" PRIu64 ", failed to build delete skyline, code:%s, %s", pReader, pBlockScanInfo->uid,
              tstrerror(code), pReader->idStr);
  }

  pBlockScanInfo->iterInit = true;
  return code;
}

dengyihao's avatar
dengyihao 已提交
2254 2255
/**
 * Build the delete skyline of one table from the delete file of the read snapshot plus the
 * delete records chained on the mem/imem table data, and reset every delete cursor kept in the
 * scan info to the first skyline position for the current scan order.
 *
 * Nothing is done when pBlockScanInfo->delSkyline already exists.
 *
 * @param pBlockScanInfo per-table scan state; delSkyline and the four delete indexes are set here
 * @param pReader        reader supplying the tsdb handle, read snapshot, suid and scan order
 * @param pMemTbData     table data in the mem table, may be NULL
 * @param piMemTbData    table data in the imem table, may be NULL
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_OUT_OF_MEMORY on allocation failure, or the error code of
 *         the failing delete-file / skyline routine
 */
int32_t initDelSkylineIterator(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader, STbData* pMemTbData,
                               STbData* piMemTbData) {
  if (pBlockScanInfo->delSkyline != NULL) {
    return TSDB_CODE_SUCCESS;
  }

  int32_t code = TSDB_CODE_SUCCESS;
  STsdb*  pTsdb = pReader->pTsdb;

  SArray* pDelData = taosArrayInit(4, sizeof(SDelData));
  if (pDelData == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  // 1. delete records persisted in the delete file
  SDelFile* pDelFile = pReader->pReadSnap->fs.pDelFile;
  if (pDelFile) {
    SDelFReader* pDelFReader = NULL;
    code = tsdbDelFReaderOpen(&pDelFReader, pDelFile, pTsdb);
    if (code != TSDB_CODE_SUCCESS) {
      goto _err;
    }

    SArray* aDelIdx = taosArrayInit(4, sizeof(SDelIdx));
    if (aDelIdx == NULL) {
      // previously `code` was still 0 here, so the failure was reported as success
      code = TSDB_CODE_OUT_OF_MEMORY;
      tsdbDelFReaderClose(&pDelFReader);
      goto _err;
    }

    code = tsdbReadDelIdx(pDelFReader, aDelIdx);
    if (code != TSDB_CODE_SUCCESS) {
      taosArrayDestroy(aDelIdx);
      tsdbDelFReaderClose(&pDelFReader);
      goto _err;
    }

    SDelIdx  idx = {.suid = pReader->suid, .uid = pBlockScanInfo->uid};
    SDelIdx* pIdx = taosArraySearch(aDelIdx, &idx, tCmprDelIdx, TD_EQ);

    if (pIdx != NULL) {
      code = tsdbReadDelData(pDelFReader, pIdx, pDelData);
    }

    taosArrayDestroy(aDelIdx);
    tsdbDelFReaderClose(&pDelFReader);

    if (code != TSDB_CODE_SUCCESS) {
      goto _err;
    }
  }

  // 2. delete records chained on the mem and imem table data
  for (SDelData* p = (pMemTbData != NULL) ? pMemTbData->pHead : NULL; p != NULL; p = p->pNext) {
    taosArrayPush(pDelData, p);
  }

  for (SDelData* p = (piMemTbData != NULL) ? piMemTbData->pHead : NULL; p != NULL; p = p->pNext) {
    taosArrayPush(pDelData, p);
  }

  // 3. merge all records into the skyline
  if (taosArrayGetSize(pDelData) > 0) {
    pBlockScanInfo->delSkyline = taosArrayInit(4, sizeof(TSDBKEY));
    if (pBlockScanInfo->delSkyline == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto _err;
    }

    code = tsdbBuildDeleteSkyline(pDelData, 0, (int32_t)(taosArrayGetSize(pDelData) - 1), pBlockScanInfo->delSkyline);
  }

  taosArrayDestroy(pDelData);

  // delSkyline may still be NULL here (no delete records); the size is converted to a signed
  // value before subtracting so an empty skyline yields -1 without relying on unsigned wrap-around.
  int32_t startIndex = ASCENDING_TRAVERSE(pReader->order) ? 0 : (int32_t)taosArrayGetSize(pBlockScanInfo->delSkyline) - 1;

  pBlockScanInfo->iter.index = startIndex;
  pBlockScanInfo->iiter.index = startIndex;
  pBlockScanInfo->fileDelIndex = startIndex;
  pBlockScanInfo->lastBlockDelIndex = startIndex;
  return code;

_err:
  taosArrayDestroy(pDelData);
  return code;
}

2336
/**
 * Return the key of the next valid buffered row (mem or imem) of a table in scan order.
 *
 * When both buffers have a valid row, the one that is reached first in the scan direction is
 * returned: the smaller ts for ascending scans, the larger ts for descending scans (the mem key
 * wins on equal ts). When only one buffer has a valid row, its key is returned. When neither has,
 * the returned key has ts == TSKEY_INITIAL_VAL.
 *
 * Previously the imem key was only taken when it was smaller than the current key, which is the
 * initial sentinel when mem has no row, and the smaller ts was chosen for descending scans too.
 *
 * @param pScanInfo per-table scan state holding the mem/imem iterators and delete skyline
 * @param pReader   reader supplying scan order, window and version range
 */
static TSDBKEY getCurrentKeyInBuf(STableBlockScanInfo* pScanInfo, STsdbReader* pReader) {
  TSDBKEY key = {.ts = TSKEY_INITIAL_VAL};
  bool    asc = ASCENDING_TRAVERSE(pReader->order);

  initMemDataIterator(pScanInfo, pReader);

  TSDBROW* pRow = getValidRow(&pScanInfo->iter, pScanInfo->delSkyline, pReader);
  bool     hasKey = (pRow != NULL);
  if (hasKey) {
    key = TSDBROW_KEY(pRow);
  }

  TSDBROW* piRow = getValidRow(&pScanInfo->iiter, pScanInfo->delSkyline, pReader);
  if (piRow != NULL) {
    TSDBKEY ikey = TSDBROW_KEY(piRow);

    if (!hasKey) {
      key = ikey;  // only imem holds data
    } else if (asc ? (ikey.ts < key.ts) : (ikey.ts > key.ts)) {
      key = ikey;  // imem row comes first in scan order
    }
  }

  return key;
}

2356
/**
 * Advance the file-set iterator until a file set is found that contains data blocks or last
 * blocks for the reader, as counted by doLoadFileBlock, or until no file set is left.
 *
 * On return the last-block list of the last-block reader holds the last blocks collected by
 * doLoadFileBlock for the selected file set.
 *
 * @param pReader   reader whose file iterator is advanced
 * @param pBlockNum [out] reset to zero on entry, then filled by doLoadFileBlock
 * @return TSDB_CODE_SUCCESS (also when all file sets are exhausted), TSDB_CODE_OUT_OF_MEMORY on
 *         allocation failure, or the error code of the failing load routine
 */
static int32_t moveToNextFile(STsdbReader* pReader, SBlockNumber* pBlockNum) {
  SReaderStatus* pStatus = &pReader->status;
  pBlockNum->numOfBlocks = 0;
  pBlockNum->numOfLastBlocks = 0;

  size_t  numOfTables = taosHashGetSize(pReader->status.pTableMap);
  SArray* pIndexList = taosArrayInit(numOfTables, sizeof(SBlockIdx));
  if (pIndexList == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  SArray* pLastBlocks = pStatus->fileIter.pLastBlockReader->pBlockL;
  taosArrayClear(pLastBlocks);

  int32_t code = TSDB_CODE_SUCCESS;

  while (1) {
    bool hasNext = filesetIteratorNext(&pStatus->fileIter, pReader);
    if (!hasNext) {  // no data files on disk
      break;
    }

    taosArrayClear(pIndexList);
    code = doLoadBlockIndex(pReader, pReader->pFileReader, pIndexList);
    if (code != TSDB_CODE_SUCCESS) {
      break;
    }

    code = tsdbReadBlockL(pReader->pFileReader, pLastBlocks);
    if (code != TSDB_CODE_SUCCESS) {
      break;
    }

    if (taosArrayGetSize(pIndexList) == 0 && taosArrayGetSize(pLastBlocks) == 0) {
      continue;  // no blocks in current file, try next files
    }

    SArray* pQLastBlock = taosArrayInit(4, sizeof(SBlockL));
    if (pQLastBlock == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      break;
    }

    code = doLoadFileBlock(pReader, pIndexList, pLastBlocks, pBlockNum, pQLastBlock);
    if (code != TSDB_CODE_SUCCESS) {
      taosArrayDestroy(pQLastBlock);
      break;
    }

    bool found = (pBlockNum->numOfBlocks + pBlockNum->numOfLastBlocks > 0);
    if (found) {
      ASSERT(taosArrayGetSize(pQLastBlock) == pBlockNum->numOfLastBlocks);

      // keep only the qualified last blocks
      taosArrayClear(pLastBlocks);
      taosArrayAddAll(pLastBlocks, pQLastBlock);
    }

    taosArrayDestroy(pQLastBlock);
    if (found) {
      break;
    }
  }

  taosArrayDestroy(pIndexList);
  return code;
}

2414
/**
 * Make sure the last block whose [minUid, maxUid] range covers the given table is loaded into
 * pLastBlockReader->lastBlockData.
 *
 * - No covering block: currentBlockIndex is set to -1, the buffer is reset, success is returned.
 * - Covering block already current: nothing is loaded.
 * - Otherwise the block is read from the file and becomes the current one.
 *
 * A failure while initializing or reading the block is now returned to the caller and
 * currentBlockIndex is set to -1, so a later call does not treat the block as loaded
 * (previously the read error was only logged and success was returned).
 *
 * @param pLastBlockReader last-block reader holding the block list and the load buffer
 * @param pBlockScanInfo   scan state of the table to load the last block for
 * @param pReader          owning reader
 * @return TSDB_CODE_SUCCESS or the error code of the failing step
 */
static int32_t doLoadRelatedLastBlock(SLastBlockReader* pLastBlockReader, STableBlockScanInfo *pBlockScanInfo, STsdbReader* pReader) {
  SArray*  pBlocks = pLastBlockReader->pBlockL;
  SBlockL* pBlock = NULL;

  uint64_t uid = pBlockScanInfo->uid;
  int32_t  totalLastBlocks = (int32_t)taosArrayGetSize(pBlocks);

  int32_t code = initMemDataIterator(pBlockScanInfo, pReader);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  // find the correct SBlockL. todo binary search
  int32_t index = -1;
  for (int32_t i = 0; i < totalLastBlocks; ++i) {
    SBlockL* p = taosArrayGet(pBlocks, i);
    if (p->minUid <= uid && p->maxUid >= uid) {
      index = i;
      pBlock = p;
      break;
    }
  }

  if (index == -1) {
    pLastBlockReader->currentBlockIndex = index;
    tBlockDataReset(&pLastBlockReader->lastBlockData);
    return TSDB_CODE_SUCCESS;
  }

  // the required last datablock has already loaded
  if (index == pLastBlockReader->currentBlockIndex) {
    return TSDB_CODE_SUCCESS;
  }

  int64_t st = taosGetTimestampUs();
  code = tBlockDataInit(&pLastBlockReader->lastBlockData, pReader->suid, pReader->suid ? 0 : uid, pReader->pSchema);
  if (code != TSDB_CODE_SUCCESS) {
    tsdbError("%p init block data failed, code:%s %s", pReader, tstrerror(code), pReader->idStr);
    pLastBlockReader->currentBlockIndex = -1;  // the buffer no longer holds the previous block
    return code;
  }

  code = tsdbReadLastBlock(pReader->pFileReader, pBlock, &pLastBlockReader->lastBlockData);

  double el = (taosGetTimestampUs() - st) / 1000.0;
  if (code != TSDB_CODE_SUCCESS) {
    // report the index that was being loaded, not the previously loaded one
    tsdbError("%p error occurs in loading last block into buffer, last block index:%d, total:%d code:%s %s", pReader,
              index, totalLastBlocks, tstrerror(code), pReader->idStr);
    pLastBlockReader->currentBlockIndex = -1;
    return code;
  }

  tsdbDebug("%p load last block completed, uid:%" PRIu64
            " last block index:%d, total:%d rows:%d, minVer:%d, maxVer:%d, brange:%" PRId64 "-%" PRId64
            " elapsed time:%.2f ms, %s",
            pReader, uid, index, totalLastBlocks, pBlock->nRow, pBlock->minVer, pBlock->maxVer, pBlock->minKey,
            pBlock->maxKey, el, pReader->idStr);

  pLastBlockReader->currentBlockIndex = index;
  pReader->cost.lastBlockLoad += 1;
  pReader->cost.lastBlockLoadTime += el;

  return TSDB_CODE_SUCCESS;
}

/**
 * Walk the table map, table by table, and build a result block from the first table that still
 * has rows in its last block.
 *
 * Returns TSDB_CODE_SUCCESS either with rows in pReader->pResBlock, or with pStatus->pTableIter
 * set to NULL once every table has been visited. Errors from doLoadRelatedLastBlock and
 * doBuildDataBlock are returned as-is.
 */
static int32_t doLoadLastBlockSequentially(STsdbReader* pReader) {
  SReaderStatus*    pStatus = &pReader->status;
  SLastBlockReader* pLastReader = pStatus->fileIter.pLastBlockReader;

  for (;;) {
    if (pStatus->pTableIter == NULL) {
      pStatus->pTableIter = taosHashIterate(pStatus->pTableMap, NULL);
      if (pStatus->pTableIter == NULL) {
        return TSDB_CODE_SUCCESS;
      }
    }

    // load the last data block of current table
    // todo opt perf by avoiding load last block repeatly
    STableBlockScanInfo* pTableInfo = pStatus->pTableIter;

    int32_t code = doLoadRelatedLastBlock(pLastReader, pTableInfo, pReader);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    // does the current table have rows left in the loaded last block?
    bool hasRows = (pLastReader->currentBlockIndex != -1);
    if (hasRows) {
      initLastBlockReader(pLastReader, pTableInfo->uid, &pTableInfo->indexInBlockL);

      int32_t rowPos = pTableInfo->indexInBlockL;
      if (rowPos == DEFAULT_ROW_INDEX_VAL || rowPos == pLastReader->lastBlockData.nRow) {
        hasRows = nextRowInLastBlock(pLastReader, pTableInfo);
      }
    }

    if (hasRows) {
      code = doBuildDataBlock(pReader);
      if (code != TSDB_CODE_SUCCESS) {
        return code;
      }

      if (pReader->pResBlock->info.rows > 0) {
        return TSDB_CODE_SUCCESS;
      }
    }

    // no (more) rows for the current table, let's try next table
    pStatus->pTableIter = taosHashIterate(pStatus->pTableMap, pStatus->pTableIter);
    if (pStatus->pTableIter == NULL) {
      return TSDB_CODE_SUCCESS;
    }
  }
}

2531
/**
 * Build the next result block for the table addressed by the current file block, or by the
 * table iterator when the block iterator has no current block.
 *
 * Depending on how the buffered key, the file block and the last block relate, the result is
 * composed row by row, taken from the buffer only, or the whole file block is reported through
 * the result block info without composing it.
 */
static int32_t doBuildDataBlock(STsdbReader* pReader) {
  SReaderStatus*      pStatus = &pReader->status;
  SDataBlockIter*     pBlockIter = &pStatus->blockIter;
  SLastBlockReader*   pLastBlockReader = pReader->status.fileIter.pLastBlockReader;
  SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(pBlockIter);

  STableBlockScanInfo* pScanInfo = NULL;
  SBlock*              pBlock = NULL;

  if (pBlockInfo == NULL) {
    pScanInfo = pReader->status.pTableIter;
  } else {
    pScanInfo = taosHashGet(pReader->status.pTableMap, &pBlockInfo->uid, sizeof(pBlockInfo->uid));
    pBlock = getCurrentBlock(pBlockIter);
  }

  TSDBKEY key = getCurrentKeyInBuf(pScanInfo, pReader);

  // load the last data block of current table
  int32_t code = doLoadRelatedLastBlock(pLastBlockReader, pScanInfo, pReader);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  // note: the lastblock may be null here
  initLastBlockReader(pLastBlockReader, pScanInfo->uid, &pScanInfo->indexInBlockL);
  if (pScanInfo->indexInBlockL == DEFAULT_ROW_INDEX_VAL ||
      pScanInfo->indexInBlockL == pLastBlockReader->lastBlockData.nRow) {
    (void)nextRowInLastBlock(pLastBlockReader, pScanInfo);
  }

  // build data block from last data file
  if (pBlockInfo == NULL) {
    ASSERT(pBlockIter->numOfBlocks == 0);
    return buildComposedDataBlock(pReader);
  }

  if (fileBlockShouldLoad(pReader, pBlockInfo, pBlock, pScanInfo, key, pLastBlockReader)) {
    tBlockDataReset(&pStatus->fileBlockData);
    code = tBlockDataInit(&pStatus->fileBlockData, pReader->suid, pScanInfo->uid, pReader->pSchema);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    code = doLoadFileBlockData(pReader, pBlockIter, &pStatus->fileBlockData);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    // build composed data block
    return buildComposedDataBlock(pReader);
  }

  if (bufferDataInFileBlockGap(pReader->order, key, pBlock)) {
    // data in memory that are earlier than current file block
    // todo rows in buffer should be less than the file block in asc, greater than file block in desc
    int64_t endKey = pBlock->maxKey.ts;
    if (ASCENDING_TRAVERSE(pReader->order)) {
      endKey = pBlock->minKey.ts;
    }
    return buildDataBlockFromBuf(pReader, pScanInfo, endKey);
  }

  if (hasDataInLastBlock(pLastBlockReader) && !ASCENDING_TRAVERSE(pReader->order)) {
    // only return the rows in last block
    int64_t tsLast = getCurrentKeyInLastBlock(pLastBlockReader);
    ASSERT (tsLast >= pBlock->maxKey.ts);
    tBlockDataReset(&pReader->status.fileBlockData);

    return buildComposedDataBlock(pReader);
  }

  // whole block is required, return it directly
  SDataBlockInfo* pResInfo = &pReader->pResBlock->info;
  pResInfo->rows = pBlock->nRow;
  pResInfo->uid = pScanInfo->uid;
  pResInfo->window = (STimeWindow){.skey = pBlock->minKey.ts, .ekey = pBlock->maxKey.ts};
  setComposedBlockFlag(pReader, false);
  setBlockAllDumped(&pStatus->fBlockDumpInfo, pBlock->maxKey.ts, pReader->order);

  return code;
}

H
Haojun Liao 已提交
2611
/**
 * Build a result block from buffered (mem/imem) rows only, visiting the tables of the table map
 * one after another until one of them yields rows.
 *
 * Returns TSDB_CODE_SUCCESS either with rows in pReader->pResBlock or with the table iterator
 * exhausted (pStatus->pTableIter == NULL); an error from buildDataBlockFromBuf is returned as-is.
 */
static int32_t buildBlockFromBufferSequentially(STsdbReader* pReader) {
  SReaderStatus* pStatus = &pReader->status;

  // start a fresh iteration if there is no current table
  if (pStatus->pTableIter == NULL) {
    pStatus->pTableIter = taosHashIterate(pStatus->pTableMap, NULL);
  }

  while (pStatus->pTableIter != NULL) {
    STableBlockScanInfo* pTableInfo = pStatus->pTableIter;
    initMemDataIterator(pTableInfo, pReader);

    // no upper/lower bound: consume everything buffered for this table
    int64_t endKey = INT64_MIN;
    if (ASCENDING_TRAVERSE(pReader->order)) {
      endKey = INT64_MAX;
    }

    int32_t code = buildDataBlockFromBuf(pReader, pTableInfo, endKey);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    if (pReader->pResBlock->info.rows > 0) {
      return TSDB_CODE_SUCCESS;
    }

    // current table is exhausted, let's try the next table
    pStatus->pTableIter = taosHashIterate(pStatus->pTableMap, pStatus->pTableIter);
  }

  return TSDB_CODE_SUCCESS;
}

2643
// set the correct start position in case of the first/last file block, according to the query time window
2644
static void initBlockDumpInfo(STsdbReader* pReader, SDataBlockIter* pBlockIter) {
2645
  SBlock* pBlock = getCurrentBlock(pBlockIter);
2646

2647 2648 2649
  SReaderStatus* pStatus = &pReader->status;

  SFileBlockDumpInfo* pDumpInfo = &pStatus->fBlockDumpInfo;
2650 2651 2652

  pDumpInfo->totalRows = pBlock->nRow;
  pDumpInfo->allDumped = false;
2653
  pDumpInfo->rowIndex = ASCENDING_TRAVERSE(pReader->order) ? 0 : pBlock->nRow - 1;
2654 2655
}

2656
/**
 * Move to the next file set and prepare the reader for its first block.
 *
 * When no file set with blocks is left, status.loadFromFile is cleared and the function returns.
 * Otherwise the block iterator is initialized (or reset when only last blocks exist), the
 * last-block reader is marked as having no block loaded, and the dump info is re-initialized.
 */
static int32_t initForFirstBlockInFile(STsdbReader* pReader, SDataBlockIter* pBlockIter) {
  SBlockNumber blockNum = {0};

  int32_t code = moveToNextFile(pReader, &blockNum);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  // all data files are consumed, try data in buffer
  if (blockNum.numOfBlocks + blockNum.numOfLastBlocks == 0) {
    pReader->status.loadFromFile = false;
    return code;
  }

  // initialize the block iterator for a new fileset
  if (blockNum.numOfBlocks <= 0) {
    // no block data, only last block exists
    tBlockDataReset(&pReader->status.fileBlockData);
    resetDataBlockIterator(pBlockIter, pReader->order, pReader->status.pTableMap);
  } else {
    code = initBlockIterator(pReader, pBlockIter, blockNum.numOfBlocks);
  }

  // no last block has been loaded for the new fileset yet
  pReader->status.fileIter.pLastBlockReader->currentBlockIndex = -1;

  // set the correct start position according to the query time window
  initBlockDumpInfo(pReader, pBlockIter);
  return code;
}

2686
// True when the current file block has been started but not finished: it is not flagged as fully
// dumped and the row cursor has left its start position (row 0 for ascending scans, the last row
// for descending scans).
static bool fileBlockPartiallyRead(SFileBlockDumpInfo* pDumpInfo, bool asc) {
  if (pDumpInfo->allDumped) {
    return false;
  }

  if (asc) {
    return pDumpInfo->rowIndex > 0;
  }

  return pDumpInfo->rowIndex < (pDumpInfo->totalRows - 1);
}

2691
/**
 * Produce the next non-empty result block from data files.
 *
 * Two phases are interleaved through the `_begin` label:
 *  - when the block iterator has no data blocks (numOfBlocks == 0), rows are taken from the last
 *    blocks table by table via doLoadLastBlockSequentially; once every table was visited
 *    (status.pTableIter == NULL) the next file set is opened;
 *  - otherwise data blocks are consumed in block-iterator order; a partially read block is
 *    continued with buildComposedDataBlock, a finished one advances to the next block, to the
 *    last blocks of the same file set, or to the next file set.
 *
 * Returns as soon as pReader->pResBlock has rows, an error occurs, or status.loadFromFile was
 * cleared because all file sets are consumed (the result block may then be empty).
 */
static int32_t buildBlockFromFiles(STsdbReader* pReader) {
H
Haojun Liao 已提交
2692
  int32_t code = TSDB_CODE_SUCCESS;
2693 2694
  bool    asc = ASCENDING_TRAVERSE(pReader->order);

2695 2696
  SDataBlockIter* pBlockIter = &pReader->status.blockIter;

2697 2698 2699 2700 2701 2702 2703
  if (pBlockIter->numOfBlocks == 0) {
    // NOTE: the label lives inside this if-body; the gotos below jump into it from the while loop
    _begin:
    code = doLoadLastBlockSequentially(pReader);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

2704 2705 2706 2707
    if (pReader->pResBlock->info.rows > 0) {
      return TSDB_CODE_SUCCESS;
    }

2708
    // all data blocks are checked in this last block file, now let's try the next file
2709 2710 2711 2712 2713 2714 2715 2716
    if (pReader->status.pTableIter == NULL) {
      code = initForFirstBlockInFile(pReader, pBlockIter);

      // error happens or all the data files are completely checked
      if ((code != TSDB_CODE_SUCCESS) || (pReader->status.loadFromFile == false)) {
        return code;
      }

2717
      // this file does not have data files, let's start check the last block file if exists
2718 2719 2720 2721 2722 2723 2724 2725 2726 2727 2728 2729 2730 2731 2732
      if (pBlockIter->numOfBlocks == 0) {
        goto _begin;
      }
    }

    code = doBuildDataBlock(pReader);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    if (pReader->pResBlock->info.rows > 0) {
      return TSDB_CODE_SUCCESS;
    }
  }

2733
  // consume data blocks until a non-empty result block is produced
  while (1) {
2734 2735
    SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;

2736
    if (fileBlockPartiallyRead(pDumpInfo, asc)) {  // file data block is partially loaded
2737
      code = buildComposedDataBlock(pReader);
2738 2739 2740 2741 2742 2743 2744
    } else {
      // current block are exhausted, try the next file block
      if (pDumpInfo->allDumped) {
        // try next data block in current file
        bool hasNext = blockIteratorNext(&pReader->status.blockIter);
        if (hasNext) {  // check for the next block in the block accessed order list
          initBlockDumpInfo(pReader, pBlockIter);
2745 2746
        } else if (taosArrayGetSize(pReader->status.fileIter.pLastBlockReader->pBlockL) > 0) {  // data blocks in current file are exhausted, let's try the next file now
          // the current file set still has last blocks: switch to the last-block phase
          tBlockDataReset(&pReader->status.fileBlockData);
2747
          resetDataBlockIterator(pBlockIter, pReader->order, pReader->status.pTableMap);
2748 2749
          goto _begin;
        } else {
2750 2751 2752 2753 2754 2755
          code = initForFirstBlockInFile(pReader, pBlockIter);

          // error happens or all the data files are completely checked
          if ((code != TSDB_CODE_SUCCESS) || (pReader->status.loadFromFile == false)) {
            return code;
          }
2756 2757 2758 2759 2760

          // this file does not have blocks, let's start check the last block file
          if (pBlockIter->numOfBlocks == 0) {
            goto _begin;
          }
2761
        }
H
Haojun Liao 已提交
2762
      }
2763 2764

      code = doBuildDataBlock(pReader);
2765 2766
    }

2767 2768 2769 2770 2771 2772 2773 2774
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    if (pReader->pResBlock->info.rows > 0) {
      return TSDB_CODE_SUCCESS;
    }
  }
2775
}
H
refact  
Hongze Cheng 已提交
2776

2777 2778
/**
 * Select the tsdb instance that should serve a query starting at winSKey.
 *
 * For a non-rsma vnode VND_TSDB(pVnode) is returned and *pLevel is left untouched. For an rsma
 * vnode the retention entries are walked from level 0 upwards: the walk stops at the first level
 * whose keep range (now - keep) still covers winSKey, or steps back one level when an entry with
 * keep <= 0 is met. The selected level is written to *pLevel.
 *
 * @param idStr query id string used for logging only, may be NULL
 */
static STsdb* getTsdbByRetentions(SVnode* pVnode, TSKEY winSKey, SRetention* retentions, const char* idStr,
                                  int8_t* pLevel) {
  if (!VND_IS_RSMA(pVnode)) {
    return VND_TSDB(pVnode);
  }

  int8_t  level = 0;
  int64_t now = taosGetTimestamp(pVnode->config.tsdbCfg.precision);

  for (int8_t round = 0; round < TSDB_RETENTION_MAX; ++round) {
    SRetention* pCandidate = retentions + level;

    if (pCandidate->keep <= 0) {
      // unused retention slot: fall back to the previous level, if any
      if (level > 0) {
        --level;
      }
      break;
    }

    if ((now - pCandidate->keep) <= winSKey) {
      break;
    }

    ++level;
  }

  const char* str = "";
  if (idStr != NULL) {
    str = idStr;
  }

  switch (level) {
    case TSDB_RETENTION_L0:
      *pLevel = TSDB_RETENTION_L0;
      tsdbDebug("vgId:%d, rsma level %d is selected to query %s", TD_VID(pVnode), TSDB_RETENTION_L0, str);
      return VND_RSMA0(pVnode);

    case TSDB_RETENTION_L1:
      *pLevel = TSDB_RETENTION_L1;
      tsdbDebug("vgId:%d, rsma level %d is selected to query %s", TD_VID(pVnode), TSDB_RETENTION_L1, str);
      return VND_RSMA1(pVnode);

    default:
      *pLevel = TSDB_RETENTION_L2;
      tsdbDebug("vgId:%d, rsma level %d is selected to query %s", TD_VID(pVnode), TSDB_RETENTION_L2, str);
      return VND_RSMA2(pVnode);
  }
}

H
Haojun Liao 已提交
2817
/**
 * Compute the version range a query may read.
 *
 * minVer is pCond->startVersion, or 0 when it is -1. maxVer is pCond->endVersion capped at the
 * vnode's applied version; an endVersion of -1 (not specified by the user) selects the applied
 * version. The `level` argument is not used.
 */
SVersionRange getQueryVerRange(SVnode* pVnode, SQueryTableDataCond* pCond, int8_t level) {
  int64_t startVer = 0;
  if (pCond->startVersion != -1) {
    startVer = pCond->startVersion;
  }

  int64_t endVer = 0;
  if (pCond->endVersion == -1 || pCond->endVersion > pVnode->state.applied) {
    // end version not specified, or beyond what the vnode has applied
    endVer = pVnode->state.applied;
  } else {
    endVer = pCond->endVersion;
  }

  return (SVersionRange){.minVer = startVer, .maxVer = endVer};
}

2831
/**
 * Check whether the row identified by pKey (ts, version) is covered by the delete list.
 *
 * pDelList holds TSDBKEY entries; consecutive entries (current, next) are treated as a range
 * [current.ts, next.ts] whose deleting version is the version of the entry with the smaller ts.
 * A row counts as dropped when its ts lies in such a range and the range version is >= the row
 * version. *index is a cursor into the list that is only ever moved in scan direction
 * (+1 ascending, -1 descending), so callers are expected to pass keys in scan order.
 *
 * @param pDelList delete list, may be NULL (nothing is dropped then)
 * @param index    [in/out] cursor into pDelList, advanced while searching
 * @param pKey     key of the row to test, must not be NULL
 * @param order    scan order
 * @return true when the row is covered by a delete range, false otherwise
 *
 * NOTE(review): `num` is size_t while *index is int32_t, so the comparisons against `num - 1`
 * are done in unsigned arithmetic, and `num - 2` wraps when the list has fewer than two entries.
 * Presumably the list always holds at least two entries - confirm.
 */
bool hasBeenDropped(const SArray* pDelList, int32_t* index, TSDBKEY* pKey, int32_t order) {
2832 2833 2834 2835
  ASSERT(pKey != NULL);
  if (pDelList == NULL) {
    return false;
  }
L
Liu Jicong 已提交
2836 2837 2838
  size_t  num = taosArrayGetSize(pDelList);
  bool    asc = ASCENDING_TRAVERSE(order);
  int32_t step = asc ? 1 : -1;
2839

2840 2841 2842 2843 2844 2845
  if (asc) {
    // cursor already sits on (or past) the last entry: only the last point can still match
    if (*index >= num - 1) {
      TSDBKEY* last = taosArrayGetLast(pDelList);
      ASSERT(pKey->ts >= last->ts);

      if (pKey->ts > last->ts) {
2846
        return false;
2847 2848 2849
      } else if (pKey->ts == last->ts) {
        // the range ending at the last point carries the version of the entry before it
        TSDBKEY* prev = taosArrayGet(pDelList, num - 2);
        return (prev->version >= pKey->version);
2850 2851
      }
    } else {
2852 2853 2854 2855 2856 2857 2858 2859 2860 2861 2862 2863 2864 2865 2866 2867 2868 2869 2870 2871 2872 2873 2874 2875 2876 2877 2878 2879 2880 2881
      TSDBKEY* pCurrent = taosArrayGet(pDelList, *index);
      TSDBKEY* pNext = taosArrayGet(pDelList, (*index) + 1);

      // the key lies before the range the cursor points at
      if (pKey->ts < pCurrent->ts) {
        return false;
      }

      if (pCurrent->ts <= pKey->ts && pNext->ts >= pKey->ts && pCurrent->version >= pKey->version) {
        return true;
      }

      // move the cursor forward while the key is at or beyond the end of the current range
      while (pNext->ts <= pKey->ts && (*index) < num - 1) {
        (*index) += 1;

        if ((*index) < num - 1) {
          pCurrent = taosArrayGet(pDelList, *index);
          pNext = taosArrayGet(pDelList, (*index) + 1);

          // it is not a consecutive deletion range, ignore it
          if (pCurrent->version == 0 && pNext->version > 0) {
            continue;
          }

          if (pCurrent->ts <= pKey->ts && pNext->ts >= pKey->ts && pCurrent->version >= pKey->version) {
            return true;
          }
        }
      }

      return false;
2882 2883
    }
  } else {
2884 2885
    // descending scan: cursor already sits on the first entry
    if (*index <= 0) {
      TSDBKEY* pFirst = taosArrayGet(pDelList, 0);
2886

2887 2888 2889 2890 2891 2892 2893
      if (pKey->ts < pFirst->ts) {
        return false;
      } else if (pKey->ts == pFirst->ts) {
        return pFirst->version >= pKey->version;
      } else {
        ASSERT(0);
      }
2894
    } else {
2895 2896 2897 2898 2899 2900 2901 2902 2903 2904 2905 2906 2907 2908 2909 2910 2911 2912 2913 2914 2915 2916 2917 2918 2919 2920 2921
      TSDBKEY* pCurrent = taosArrayGet(pDelList, *index);
      TSDBKEY* pPrev = taosArrayGet(pDelList, (*index) - 1);

      // the key lies after the range the cursor points at
      if (pKey->ts > pCurrent->ts) {
        return false;
      }

      if (pPrev->ts <= pKey->ts && pCurrent->ts >= pKey->ts && pPrev->version >= pKey->version) {
        return true;
      }

      // move the cursor backward while the key is at or before the start of the current range
      while (pPrev->ts >= pKey->ts && (*index) > 1) {
        (*index) += step;

        if ((*index) >= 1) {
          pCurrent = taosArrayGet(pDelList, *index);
          pPrev = taosArrayGet(pDelList, (*index) - 1);

          // it is not a consecutive deletion range, ignore it
          if (pCurrent->version > 0 && pPrev->version == 0) {
            continue;
          }

          if (pPrev->ts <= pKey->ts && pCurrent->ts >= pKey->ts && pPrev->version >= pKey->version) {
            return true;
          }
        }
2922 2923 2924 2925 2926
      }

      return false;
    }
  }
2927 2928

  // reached only from the boundary branches above when none of their conditions returned
  return false;
2929 2930 2931 2932
}

/**
 * Return the row the iterator currently points at if it is visible to the reader, otherwise
 * advance the iterator until a visible row is found.
 *
 * A row is visible when its version lies inside pReader->verRange and it is not covered by the
 * delete list. The search stops, clearing pIter->hasVal, as soon as a row falls outside the
 * reader's time window or the iterator is exhausted.
 *
 * @return the visible row, or NULL when the iterator has no (more) visible rows
 */
TSDBROW* getValidRow(SIterInfo* pIter, const SArray* pDelList, STsdbReader* pReader) {
  if (!pIter->hasVal) {
    return NULL;
  }

  TSDBROW* pRow = tsdbTbDataIterGet(pIter->iter);
  TSDBKEY  key = {.ts = pRow->pTSRow->ts, .version = pRow->version};

  for (;;) {
    if (outOfTimeWindow(key.ts, &pReader->window)) {
      pIter->hasVal = false;
      return NULL;
    }

    // it is a valid data version
    bool versionOk = (key.version <= pReader->verRange.maxVer && key.version >= pReader->verRange.minVer);
    if (versionOk && !hasBeenDropped(pDelList, &pIter->index, &key, pReader->order)) {
      return pRow;
    }

    // current row is filtered out, move on to the next one
    pIter->hasVal = tsdbTbDataIterNext(pIter->iter);
    if (!pIter->hasVal) {
      return NULL;
    }

    pRow = tsdbTbDataIterGet(pIter->iter);
    key = TSDBROW_KEY(pRow);
  }
}
H
Hongze Cheng 已提交
2969

2970 2971
/**
 * Merge all following buffered rows that share the timestamp `ts` into pMerger.
 *
 * The iterator is advanced first, so the row it pointed at on entry is not merged here. Merging
 * stops when the iterator is exhausted, no further valid row exists, or the next valid row has a
 * different timestamp. Always returns TSDB_CODE_SUCCESS.
 */
int32_t doMergeRowsInBuf(SIterInfo* pIter, uint64_t uid, int64_t ts, SArray* pDelList, SRowMerger* pMerger,
                         STsdbReader* pReader) {
  while ((pIter->hasVal = tsdbTbDataIterNext(pIter->iter))) {
    // data exists but not valid
    TSDBROW* pNextRow = getValidRow(pIter, pDelList, pReader);
    if (pNextRow == NULL) {
      break;
    }

    // ts is not identical, quit
    TSDBKEY nextKey = TSDBROW_KEY(pNextRow);
    if (nextKey.ts != ts) {
      break;
    }

    STSchema* pRowSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pNextRow), pReader, uid);
    tRowMergerAdd(pMerger, pNextRow, pRowSchema);
  }

  return TSDB_CODE_SUCCESS;
}

2997
static int32_t doMergeRowsInFileBlockImpl(SBlockData* pBlockData, int32_t rowIndex, int64_t key, SRowMerger* pMerger,
2998
                                          SVersionRange* pVerRange, int32_t step) {
2999 3000
  while (pBlockData->aTSKEY[rowIndex] == key && rowIndex < pBlockData->nRow && rowIndex >= 0) {
    if (pBlockData->aVersion[rowIndex] > pVerRange->maxVer || pBlockData->aVersion[rowIndex] < pVerRange->minVer) {
3001
      rowIndex += step;
3002 3003 3004 3005 3006 3007 3008 3009 3010 3011 3012 3013 3014 3015 3016 3017 3018
      continue;
    }

    TSDBROW fRow = tsdbRowFromBlockData(pBlockData, rowIndex);
    tRowMerge(pMerger, &fRow);
    rowIndex += step;
  }

  return rowIndex;
}

// Result reported by checkForNeighborFileBlock() to the loop in doMergeRowsInFileBlocks().
typedef enum {
  CHECK_FILEBLOCK_CONT = 1,  // the neighbor block was fully consumed: probe the next neighbor as well
  CHECK_FILEBLOCK_QUIT = 2,  // stop probing neighbor blocks
} CHECK_FILEBLOCK_STATE;

static int32_t checkForNeighborFileBlock(STsdbReader* pReader, STableBlockScanInfo* pScanInfo, SBlock* pBlock,
3019 3020
                                         SFileDataBlockInfo* pFBlock, SRowMerger* pMerger, int64_t key,
                                         CHECK_FILEBLOCK_STATE* state) {
3021
  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
3022
  SBlockData*         pBlockData = &pReader->status.fileBlockData;
3023

3024
  *state = CHECK_FILEBLOCK_QUIT;
3025
  int32_t step = ASCENDING_TRAVERSE(pReader->order) ? 1 : -1;
3026 3027 3028

  int32_t nextIndex = -1;
  SBlock* pNeighborBlock = getNeighborBlockOfSameTable(pFBlock, pScanInfo, &nextIndex, pReader->order);
3029
  if (pNeighborBlock == NULL) {  // do nothing
3030 3031 3032 3033
    return 0;
  }

  bool overlap = overlapWithNeighborBlock(pBlock, pNeighborBlock, pReader->order);
3034 3035
  taosMemoryFree(pNeighborBlock);

3036
  if (overlap) {  // load next block
3037
    SReaderStatus*  pStatus = &pReader->status;
3038 3039
    SDataBlockIter* pBlockIter = &pStatus->blockIter;

3040
    // 1. find the next neighbor block in the scan block list
3041
    SFileDataBlockInfo fb = {.uid = pFBlock->uid, .tbBlockIdx = nextIndex};
3042
    int32_t            neighborIndex = findFileBlockInfoIndex(pBlockIter, &fb);
3043

3044
    // 2. remove it from the scan block list
3045
    setFileBlockActiveInBlockIter(pBlockIter, neighborIndex, step);
3046

3047
    // 3. load the neighbor block, and set it to be the currently accessed file data block
H
Haojun Liao 已提交
3048
    tBlockDataReset(&pStatus->fileBlockData);
3049 3050 3051 3052 3053 3054
    int32_t code = tBlockDataInit(&pStatus->fileBlockData, pReader->suid, pFBlock->uid, pReader->pSchema);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    code = doLoadFileBlockData(pReader, pBlockIter, &pStatus->fileBlockData);
3055 3056 3057 3058
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

3059
    // 4. check the data values
3060 3061 3062 3063
    initBlockDumpInfo(pReader, pBlockIter);

    pDumpInfo->rowIndex =
        doMergeRowsInFileBlockImpl(pBlockData, pDumpInfo->rowIndex, key, pMerger, &pReader->verRange, step);
H
Haojun Liao 已提交
3064
    if (pDumpInfo->rowIndex >= pDumpInfo->totalRows) {
3065 3066 3067 3068 3069 3070 3071
      *state = CHECK_FILEBLOCK_CONT;
    }
  }

  return TSDB_CODE_SUCCESS;
}

3072 3073
/**
 * Merge every file-block row that shares the timestamp of the current dump position.
 *
 * The current row (pDumpInfo->rowIndex) is assumed to be already held by `pMerger`; this function steps past it,
 * merges the following rows of the same block with the same timestamp and, when the block is exhausted, keeps
 * probing overlapping neighbor blocks of the same table through checkForNeighborFileBlock().
 *
 * @param pBlockData file block currently being dumped
 * @param pScanInfo  scan info of the table being read
 * @param pReader    reader context; pReader->status.fBlockDumpInfo.rowIndex is advanced
 * @param pMerger    merger receiving the rows
 * @return TSDB_CODE_SUCCESS, or the error reported while loading a neighbor block
 */
int32_t doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pScanInfo, STsdbReader* pReader,
                                SRowMerger* pMerger) {
  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;

  bool    asc = ASCENDING_TRAVERSE(pReader->order);
  int64_t key = pBlockData->aTSKEY[pDumpInfo->rowIndex];
  int32_t step = asc ? 1 : -1;

  // skip the row the merger already holds, then merge the rest of the rows with the same key in this block
  pDumpInfo->rowIndex += step;
  if ((pDumpInfo->rowIndex <= pBlockData->nRow - 1 && asc) || (pDumpInfo->rowIndex >= 0 && !asc)) {
    pDumpInfo->rowIndex =
        doMergeRowsInFileBlockImpl(pBlockData, pDumpInfo->rowIndex, key, pMerger, &pReader->verRange, step);
  }

  // all rows are consumed, let's try next file block
  if ((pDumpInfo->rowIndex >= pBlockData->nRow && asc) || (pDumpInfo->rowIndex < 0 && !asc)) {
    while (1) {
      CHECK_FILEBLOCK_STATE st = CHECK_FILEBLOCK_QUIT;

      SFileDataBlockInfo* pFileBlockInfo = getCurrentBlockInfo(&pReader->status.blockIter);
      SBlock*             pCurrentBlock = getCurrentBlock(&pReader->status.blockIter);

      // a failure to initialise/load the neighbor block must reach the caller instead of being
      // silently reported as success
      int32_t code = checkForNeighborFileBlock(pReader, pScanInfo, pCurrentBlock, pFileBlockInfo, pMerger, key, &st);
      if (code != TSDB_CODE_SUCCESS) {
        return code;
      }

      if (st == CHECK_FILEBLOCK_QUIT) {
        break;
      }
    }
  }

  return TSDB_CODE_SUCCESS;
}

3103
// todo check if the rows are dropped or not
H
Haojun Liao 已提交
3104 3105
/**
 * Advance the last-block reader and merge every following row whose key equals `ts`.
 *
 * The reader is stepped with nextRowInLastBlock() before each comparison; the loop ends when no further row is
 * available or at the first row whose key differs from `ts`.
 *
 * @param pLastBlockReader last-block reader to advance
 * @param pScanInfo        scan info of the table, forwarded to nextRowInLastBlock()
 * @param ts               timestamp the merged rows must carry
 * @param pMerger          merger receiving the rows
 * @return always TSDB_CODE_SUCCESS
 */
int32_t doMergeRowsInLastBlock(SLastBlockReader* pLastBlockReader, STableBlockScanInfo* pScanInfo, int64_t ts, SRowMerger* pMerger) {
  for (;;) {
    if (!nextRowInLastBlock(pLastBlockReader, pScanInfo)) {
      break;
    }

    int64_t currentKey = getCurrentKeyInLastBlock(pLastBlockReader);
    if (currentKey != ts) {
      break;
    }

    TSDBROW lastRow = tsdbRowFromBlockData(&pLastBlockReader->lastBlockData, *pLastBlockReader->rowIndex);
    tRowMerge(pMerger, &lastRow);
  }

  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
3118
void doMergeMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter, SArray* pDelList, STSRow** pTSRow,
3119
                      STsdbReader* pReader, bool* freeTSRow) {
H
Haojun Liao 已提交
3120
  TSDBROW* pNextRow = NULL;
3121
  TSDBROW  current = *pRow;
3122

3123 3124
  {  // if the timestamp of the next valid row has a different ts, return current row directly
    pIter->hasVal = tsdbTbDataIterNext(pIter->iter);
3125

3126 3127 3128 3129 3130
    if (!pIter->hasVal) {
      *pTSRow = current.pTSRow;
      *freeTSRow = false;
      return;
    } else {  // has next point in mem/imem
H
Haojun Liao 已提交
3131
      pNextRow = getValidRow(pIter, pDelList, pReader);
3132 3133 3134 3135 3136 3137
      if (pNextRow == NULL) {
        *pTSRow = current.pTSRow;
        *freeTSRow = false;
        return;
      }

H
Haojun Liao 已提交
3138
      if (current.pTSRow->ts != pNextRow->pTSRow->ts) {
3139 3140 3141 3142
        *pTSRow = current.pTSRow;
        *freeTSRow = false;
        return;
      }
3143
    }
3144 3145
  }

3146 3147
  SRowMerger merge = {0};

3148
  // get the correct schema for data in memory
H
Haojun Liao 已提交
3149
  STSchema* pTSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(&current), pReader, uid);
H
Haojun Liao 已提交
3150

3151 3152
  if (pReader->pSchema == NULL) {
    pReader->pSchema = pTSchema;
3153
  }
H
Haojun Liao 已提交
3154

H
Haojun Liao 已提交
3155 3156 3157 3158 3159 3160
  tRowMergerInit2(&merge, pReader->pSchema, &current, pTSchema);

  STSchema* pTSchema1 = doGetSchemaForTSRow(TSDBROW_SVERSION(pNextRow), pReader, uid);
  tRowMergerAdd(&merge, pNextRow, pTSchema1);

  doMergeRowsInBuf(pIter, uid, current.pTSRow->ts, pDelList, &merge, pReader);
3161
  tRowMergerGetRow(&merge, pTSRow);
3162
  tRowMergerClear(&merge);
M
Minglei Jin 已提交
3163

3164
  *freeTSRow = true;
3165 3166
}

3167 3168
/**
 * Merge the current mem row and the current imem row (plus all following rows with the same timestamp in each
 * buffer) into one output row.
 *
 * For ascending scans the imem rows are merged first and the mem rows afterwards; descending scans use the
 * opposite order. The merger's internal resources are released with tRowMergerClear() once the result row has
 * been produced, matching doMergeMultiRows().
 *
 * @param pRow           current row of the mem iterator
 * @param piRow          current row of the imem iterator
 * @param pBlockScanInfo scan info holding both iterators, the table uid and the delete skyline
 * @param pReader        reader context
 * @param pTSRow         [out] merged row produced by tRowMergerGetRow()
 */
void doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader,
                        STSRow** pTSRow) {
  SRowMerger merge = {0};

  TSDBKEY k = TSDBROW_KEY(pRow);
  TSDBKEY ik = TSDBROW_KEY(piRow);

  // NOTE(review): both branches look the schema up with the schema version of pRow, including the ascending
  // branch that initialises the merger with piRow - confirm this is intended when the two versions differ.
  if (ASCENDING_TRAVERSE(pReader->order)) {  // ascending order imem --> mem
    STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid);

    tRowMergerInit(&merge, piRow, pSchema);
    doMergeRowsInBuf(&pBlockScanInfo->iiter, pBlockScanInfo->uid, ik.ts, pBlockScanInfo->delSkyline, &merge, pReader);

    tRowMerge(&merge, pRow);
    doMergeRowsInBuf(&pBlockScanInfo->iter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge, pReader);
  } else {  // descending order mem --> imem
    STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid);

    tRowMergerInit(&merge, pRow, pSchema);
    doMergeRowsInBuf(&pBlockScanInfo->iter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge, pReader);

    tRowMerge(&merge, piRow);
    // k.ts is used for the imem iterator too; the caller visible in this file only gets here when k.ts == ik.ts
    doMergeRowsInBuf(&pBlockScanInfo->iiter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge, pReader);
  }

  tRowMergerGetRow(&merge, pTSRow);

  // release the merger once the result row exists, as doMergeMultiRows() does; it was previously left unreleased
  tRowMergerClear(&merge);
}

3195 3196
/**
 * Fetch the next output row from the in-memory buffers (mem and imem) of one table.
 *
 * A buffer row is ignored when its timestamp has reached `endKey` in scan direction (>= endKey when ascending,
 * <= endKey when descending). When both buffers provide a row, the one with the smaller timestamp is emitted;
 * rows with equal timestamps are merged through doMergeMemIMemRows(). When no row qualifies, *pTSRow and
 * *freeTSRow are left untouched.
 *
 * @param pBlockScanInfo scan info holding both iterators, the table uid and the delete skyline
 * @param pReader        reader context
 * @param pTSRow         [out] resulting row, if any
 * @param endKey         exclusive boundary in scan direction
 * @param freeTSRow      [out] whether *pTSRow must be released by the caller
 * @return always TSDB_CODE_SUCCESS
 */
int32_t tsdbGetNextRowInMem(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader, STSRow** pTSRow, int64_t endKey,
                            bool* freeTSRow) {
  SIterInfo* pMemIter = &pBlockScanInfo->iter;
  SIterInfo* pImemIter = &pBlockScanInfo->iiter;

  TSDBROW* pRow = getValidRow(pMemIter, pBlockScanInfo->delSkyline, pReader);
  TSDBROW* piRow = getValidRow(pImemIter, pBlockScanInfo->delSkyline, pReader);
  SArray*  pDelList = pBlockScanInfo->delSkyline;

  bool asc = ASCENDING_TRAVERSE(pReader->order);

  // drop the mem row when it lies at or beyond the end key in scan direction
  if (pMemIter->hasVal) {
    TSDBKEY memKey = TSDBROW_KEY(pRow);
    bool    beyondEnd = asc ? (memKey.ts >= endKey) : (memKey.ts <= endKey);
    if (beyondEnd) {
      pRow = NULL;
    }
  }

  // same filter for the imem row
  if (pImemIter->hasVal) {
    TSDBKEY imemKey = TSDBROW_KEY(piRow);
    bool    beyondEnd = asc ? (imemKey.ts >= endKey) : (imemKey.ts <= endKey);
    if (beyondEnd) {
      piRow = NULL;
    }
  }

  bool memUsable = pMemIter->hasVal && (pRow != NULL);
  bool imemUsable = pImemIter->hasVal && (piRow != NULL);

  if (memUsable && imemUsable) {
    TSDBKEY k = TSDBROW_KEY(pRow);
    TSDBKEY ik = TSDBROW_KEY(piRow);

    if (k.ts == ik.ts) {
      // both buffers hold the same timestamp: merge across mem and imem
      doMergeMemIMemRows(pRow, piRow, pBlockScanInfo, pReader, pTSRow);
      *freeTSRow = true;
    } else if (ik.ts < k.ts) {
      doMergeMultiRows(piRow, pBlockScanInfo->uid, pImemIter, pDelList, pTSRow, pReader, freeTSRow);
    } else {
      doMergeMultiRows(pRow, pBlockScanInfo->uid, pMemIter, pDelList, pTSRow, pReader, freeTSRow);
    }
  } else if (memUsable) {
    doMergeMultiRows(pRow, pBlockScanInfo->uid, pMemIter, pDelList, pTSRow, pReader, freeTSRow);
  } else if (imemUsable) {
    doMergeMultiRows(piRow, pBlockScanInfo->uid, pImemIter, pDelList, pTSRow, pReader, freeTSRow);
  }

  return TSDB_CODE_SUCCESS;
}

H
Haojun Liao 已提交
3246
/**
 * Append one STSRow to the result block as a new row.
 *
 * The output columns and the schema columns are walked in parallel by column id: matching ids are copied,
 * output columns missing from the schema are filled with NULL, and schema columns that were not requested are
 * skipped. The walk relies on both lists being ordered by ascending column id.
 *
 * @param pBlock  result block; pBlock->info.rows is incremented
 * @param pReader reader context (supplies the copy helper info and the schema lookup)
 * @param pTSRow  row to append
 * @param uid     table uid, forwarded to doGetSchemaForTSRow()
 * @return always TSDB_CODE_SUCCESS
 */
int32_t doAppendRowFromTSRow(SSDataBlock* pBlock, STsdbReader* pReader, STSRow* pTSRow, uint64_t uid) {
  int32_t rowPos = pBlock->info.rows;
  int32_t numOfCols = (int32_t)taosArrayGetSize(pBlock->pDataBlock);

  SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo;
  STSchema*           pSchema = doGetSchemaForTSRow(pTSRow->sver, pReader, uid);

  SColVal colVal = {0};
  int32_t i = 0;  // output column cursor
  int32_t j = 0;  // schema column cursor

  // the primary timestamp, when it is the first output column, comes straight from the row header
  SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
  if (pColInfoData->info.colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
    colDataAppend(pColInfoData, rowPos, (const char*)&pTSRow->ts, false);
    i += 1;
  }

  while (i < numOfCols && j < pSchema->numOfCols) {
    pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
    col_id_t colId = pColInfoData->info.colId;

    if (colId > pSchema->columns[j].colId) {
      // schema column that was not requested
      j += 1;
      continue;
    }

    if (colId < pSchema->columns[j].colId) {
      // requested column that this schema version does not have
      colDataAppendNULL(pColInfoData, rowPos);
      i += 1;
      continue;
    }

    tTSRowGetVal(pTSRow, pSchema, j, &colVal);
    doCopyColVal(pColInfoData, rowPos, i, &colVal, pSupInfo);
    i += 1;
    j += 1;
  }

  // set null value since the remaining columns do not exist in "pSchema"
  for (; i < numOfCols; i += 1) {
    pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
    colDataAppendNULL(pColInfoData, rowPos);
  }

  pBlock->info.rows += 1;
  return TSDB_CODE_SUCCESS;
}

3290
/**
 * Append row `rowIndex` of a file block to the result block as a new row.
 *
 * Output columns and file-block columns are walked in parallel by column id (both lists are expected to be
 * ordered by ascending column id, as in doAppendRowFromTSRow()):
 *   - file column id <  output column id : the file column was not requested and is skipped;
 *   - file column id == output column id : the value is copied;
 *   - file column id >  output column id : the output column is absent from the file block and set to NULL.
 * The first case used to be treated like the third one, which left the file cursor stuck and filled the
 * remaining output columns with NULL.
 *
 * @param pResBlock  result block; pResBlock->info.rows is incremented
 * @param pReader    reader context (supplies the copy helper info)
 * @param pBlockData file block to read from
 * @param rowIndex   row of the file block to append
 * @return always TSDB_CODE_SUCCESS
 */
int32_t doAppendRowFromFileBlock(SSDataBlock* pResBlock, STsdbReader* pReader, SBlockData* pBlockData, int32_t rowIndex) {
  int32_t i = 0, j = 0;
  int32_t outputRowIndex = pResBlock->info.rows;

  SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo;

  // the primary timestamp, when it is the first output column, comes from the block's key array
  SColumnInfoData* pColData = taosArrayGet(pResBlock->pDataBlock, i);
  if (pColData->info.colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
    colDataAppendInt64(pColData, outputRowIndex, &pBlockData->aTSKEY[rowIndex]);
    i += 1;
  }

  SColVal cv = {0};
  int32_t numOfInputCols = taosArrayGetSize(pBlockData->aIdx);
  int32_t numOfOutputCols = blockDataGetNumOfCols(pResBlock);

  while (i < numOfOutputCols && j < numOfInputCols) {
    SColumnInfoData* pCol = taosArrayGet(pResBlock->pDataBlock, i);
    SColData*        pData = tBlockDataGetColDataByIdx(pBlockData, j);

    if (pData->cid < pCol->info.colId) {
      // column stored in the file block but not requested: advance the input cursor only
      j += 1;
    } else if (pData->cid == pCol->info.colId) {
      tColDataGetValue(pData, rowIndex, &cv);
      doCopyColVal(pCol, outputRowIndex, i, &cv, pSupInfo);
      i += 1;
      j += 1;
    } else {  // the specified column does not exist in file block, fill with null data
      colDataAppendNULL(pCol, outputRowIndex);
      i += 1;
    }
  }

  // output columns left over once the file block columns are exhausted
  while (i < numOfOutputCols) {
    SColumnInfoData* pCol = taosArrayGet(pResBlock->pDataBlock, i);
    colDataAppendNULL(pCol, outputRowIndex);
    i += 1;
  }

  pResBlock->info.rows += 1;
  return TSDB_CODE_SUCCESS;
}

3331 3332
/**
 * Fill the reader's result block with rows taken from the in-memory buffers of one table.
 *
 * Rows are pulled with tsdbGetNextRowInMem() until no row is returned, both buffer iterators are exhausted, or
 * the result block holds `capacity` rows.
 *
 * @param pBlockScanInfo scan info of the table being read
 * @param endKey         boundary forwarded to tsdbGetNextRowInMem()
 * @param capacity       maximum number of rows of the result block
 * @param pReader        reader context owning the result block
 * @return always TSDB_CODE_SUCCESS
 */
int32_t buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, int64_t endKey, int32_t capacity,
                                  STsdbReader* pReader) {
  SSDataBlock* pBlock = pReader->pResBlock;

  for (;;) {
    STSRow* pTSRow = NULL;
    bool    freeTSRow = false;

    tsdbGetNextRowInMem(pBlockScanInfo, pReader, &pTSRow, endKey, &freeTSRow);
    if (pTSRow == NULL) {
      break;
    }

    doAppendRowFromTSRow(pBlock, pReader, pTSRow, pBlockScanInfo->uid);

    // merged rows are owned by this function; rows handed back directly are not
    if (freeTSRow) {
      taosMemoryFree(pTSRow);
    }

    // no data left in either buffer, or the block is full
    bool bufferDrained = !pBlockScanInfo->iter.hasVal && !pBlockScanInfo->iiter.hasVal;
    if (bufferDrained || pBlock->info.rows >= capacity) {
      break;
    }
  }

  ASSERT(pBlock->info.rows <= capacity);
  return TSDB_CODE_SUCCESS;
}
H
Hongze Cheng 已提交
3361

3362
// todo refactor, use arraylist instead
H
Hongze Cheng 已提交
3363
/**
 * Make the reader scan exactly one table: the table map is cleared and a fresh scan-info entry for `uid`
 * (lastKey 0, last-block row index at its default value) is inserted.
 *
 * NOTE(review): the return value is spelled TDB_CODE_SUCCESS while the rest of this file uses
 * TSDB_CODE_SUCCESS, and the result of taosHashPut() is not checked - confirm both are intended.
 *
 * @param pReader reader to reconfigure; must not be NULL
 * @param uid     uid of the table to scan
 * @return TDB_CODE_SUCCESS
 */
int32_t tsdbSetTableId(STsdbReader* pReader, int64_t uid) {
  ASSERT(pReader != NULL);

  STableBlockScanInfo scanInfo = {.uid = uid, .lastKey = 0, .indexInBlockL = DEFAULT_ROW_INDEX_VAL};

  taosHashClear(pReader->status.pTableMap);
  taosHashPut(pReader->status.pTableMap, &scanInfo.uid, sizeof(scanInfo.uid), &scanInfo, sizeof(scanInfo));

  return TDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
3372 3373 3374 3375 3376 3377
/**
 * NULL-tolerant wrapper around metaGetIdx().
 *
 * @param pMeta meta handle, may be NULL
 * @return the value of metaGetIdx(pMeta), or NULL when pMeta is NULL
 */
void* tsdbGetIdx(SMeta* pMeta) {
  void* pIdx = NULL;

  if (pMeta != NULL) {
    pIdx = metaGetIdx(pMeta);
  }

  return pIdx;
}
dengyihao's avatar
dengyihao 已提交
3378

dengyihao's avatar
dengyihao 已提交
3379 3380 3381 3382 3383 3384
/**
 * NULL-tolerant wrapper around metaGetIvtIdx().
 *
 * @param pMeta meta handle, may be NULL
 * @return the value of metaGetIvtIdx(pMeta), or NULL when pMeta is NULL
 */
void* tsdbGetIvtIdx(SMeta* pMeta) {
  void* pIvtIdx = NULL;

  if (pMeta != NULL) {
    pIvtIdx = metaGetIvtIdx(pMeta);
  }

  return pIvtIdx;
}
L
Liu Jicong 已提交
3385

H
Hongze Cheng 已提交
3386
/**
 * @param pReader reader to query
 * @return upper bound (verRange.maxVer) of the version range this reader accepts
 */
uint64_t getReaderMaxVersion(STsdbReader* pReader) {
  return pReader->verRange.maxVer;
}
3387

C
Cary Xu 已提交
3388

H
refact  
Hongze Cheng 已提交
3389
// ====================================== EXPOSED APIs ======================================
3390 3391
/**
 * Create a data reader for the given tables and position it on its first data block.
 *
 * For pCond->type == TIMEWINDOW_RANGE_EXTERNAL two extra single-row readers are created: innerReader[0] for the
 * range in front of the query window and innerReader[1] for the range behind it. While doing so
 * pCond->twindows and pCond->order are overwritten and not restored.
 *
 * NOTE(review): only the main reader (TIMEWINDOW_RANGE_CONTAINED) or innerReader[0] (otherwise) gets a read
 * snapshot and file/block iterators here; innerReader[1] is created but not prepared in this function.
 * NOTE(review): except for the table-map failure, error paths leave *ppReader pointing at the partially
 * initialised reader - confirm that callers close it.
 *
 * @param pVnode     vnode to read from
 * @param pCond      query condition (modified for external time-window queries, see above)
 * @param pTableList list of STableKeyInfo entries to scan
 * @param ppReader   [out] created reader; set to NULL when building the table map fails
 * @param idstr      identifier string used in log output
 * @return TSDB_CODE_SUCCESS or an error code
 */
int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, SArray* pTableList, STsdbReader** ppReader,
                       const char* idstr) {
  // Declared and NULL-initialised ahead of every `goto _err`: the error path must never look at a reader
  // that does not exist yet or that has already been destroyed.
  STsdbReader* pReader = NULL;

  int32_t code = tsdbReaderCreate(pVnode, pCond, ppReader, 4096, idstr);
  if (code != TSDB_CODE_SUCCESS) {
    goto _err;
  }

  // check for query time window
  pReader = *ppReader;
  if (isEmptyQueryTimeWindow(&pReader->window)) {
    tsdbDebug("%p query window not overlaps with the data set, no result returned, %s", pReader, pReader->idStr);
    return TSDB_CODE_SUCCESS;
  }

  if (pCond->type == TIMEWINDOW_RANGE_EXTERNAL) {
    // update the SQueryTableDataCond to create inner reader
    STimeWindow w = pCond->twindows;
    int32_t     order = pCond->order;
    if (order == TSDB_ORDER_ASC) {
      pCond->twindows.ekey = pCond->twindows.skey;
      pCond->twindows.skey = INT64_MIN;
      pCond->order = TSDB_ORDER_DESC;
    } else {
      pCond->twindows.skey = pCond->twindows.ekey;
      pCond->twindows.ekey = INT64_MAX;
      pCond->order = TSDB_ORDER_ASC;
    }

    // here we only need one more row, so the capacity is set to be ONE.
    code = tsdbReaderCreate(pVnode, pCond, &pReader->innerReader[0], 1, idstr);
    if (code != TSDB_CODE_SUCCESS) {
      goto _err;
    }

    if (order == TSDB_ORDER_ASC) {
      pCond->twindows.skey = w.ekey;
      pCond->twindows.ekey = INT64_MAX;
    } else {
      pCond->twindows.skey = INT64_MIN;
      pCond->twindows.ekey = w.ekey;
    }
    code = tsdbReaderCreate(pVnode, pCond, &pReader->innerReader[1], 1, idstr);
    if (code != TSDB_CODE_SUCCESS) {
      goto _err;
    }
  }

  // pick the schema: the super table's one when available, otherwise the one of the first listed table
  if (pCond->suid != 0) {
    pReader->pSchema = metaGetTbTSchema(pReader->pTsdb->pVnode->pMeta, pReader->suid, -1);
  } else if (taosArrayGetSize(pTableList) > 0) {
    STableKeyInfo* pKey = taosArrayGet(pTableList, 0);
    pReader->pSchema = metaGetTbTSchema(pReader->pTsdb->pVnode->pMeta, pKey->uid, -1);
  }

  int32_t numOfTables = taosArrayGetSize(pTableList);
  pReader->status.pTableMap = createDataBlockScanInfo(pReader, pTableList->pData, numOfTables);
  if (pReader->status.pTableMap == NULL) {
    tsdbReaderClose(pReader);
    *ppReader = NULL;
    pReader = NULL;  // the reader is gone: keep the error path from touching freed memory

    code = TSDB_CODE_TDB_OUT_OF_MEMORY;
    goto _err;
  }

  code = tsdbTakeReadSnap(pReader->pTsdb, &pReader->pReadSnap);
  if (code != TSDB_CODE_SUCCESS) {
    goto _err;
  }

  if (pReader->type == TIMEWINDOW_RANGE_CONTAINED) {
    SDataBlockIter* pBlockIter = &pReader->status.blockIter;

    initFilesetIterator(&pReader->status.fileIter, pReader->pReadSnap->fs.aDFileSet, pReader);
    resetDataBlockIterator(&pReader->status.blockIter, pReader->order, pReader->status.pTableMap);

    // no data in files, let's try buffer in memory
    if (pReader->status.fileIter.numOfFiles == 0) {
      pReader->status.loadFromFile = false;
    } else {
      code = initForFirstBlockInFile(pReader, pBlockIter);
      if (code != TSDB_CODE_SUCCESS) {
        return code;
      }
    }
  } else {
    // external time window: the reader for the range in front of the window is consumed first
    STsdbReader*    pPrevReader = pReader->innerReader[0];
    SDataBlockIter* pBlockIter = &pPrevReader->status.blockIter;

    code = tsdbTakeReadSnap(pPrevReader->pTsdb, &pPrevReader->pReadSnap);
    if (code != TSDB_CODE_SUCCESS) {
      goto _err;
    }

    initFilesetIterator(&pPrevReader->status.fileIter, pPrevReader->pReadSnap->fs.aDFileSet, pPrevReader);
    resetDataBlockIterator(&pPrevReader->status.blockIter, pPrevReader->order, pReader->status.pTableMap);

    // no data in files, let's try buffer in memory
    if (pPrevReader->status.fileIter.numOfFiles == 0) {
      pPrevReader->status.loadFromFile = false;
    } else {
      code = initForFirstBlockInFile(pPrevReader, pBlockIter);
      if (code != TSDB_CODE_SUCCESS) {
        return code;
      }
    }
  }

  tsdbDebug("%p total numOfTable:%d in this query %s", pReader, numOfTables, pReader->idStr);
  return code;

_err:
  // pReader is NULL when creation failed or when the reader was already closed above; fall back to the
  // caller-supplied id string in that case
  tsdbError("failed to create data reader, code:%s %s", tstrerror(code), (pReader != NULL) ? pReader->idStr : idstr);
  return code;
}

/**
 * Release a reader and everything it owns: read snapshot, column helper buffers, file block data, block
 * iterator, table map, result block, file reader, last-block reader, id string and schemas. An I/O cost summary
 * is written to the debug log before the reader itself is freed.
 *
 * NOTE(review): innerReader[0]/innerReader[1] are not released here; tsdbNextDataBlock() closes each of them
 * only once it is exhausted - confirm they cannot still be open when this function runs.
 *
 * @param pReader reader to destroy; NULL is accepted and ignored
 */
void tsdbReaderClose(STsdbReader* pReader) {
  if (pReader == NULL) {
    return;
  }

  SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo;
  tsdbUntakeReadSnap(pReader->pTsdb, pReader->pReadSnap);

  taosMemoryFreeClear(pSupInfo->plist);
  taosMemoryFree(pSupInfo->colIds);

  taosArrayDestroy(pSupInfo->pColAgg);
  for (int32_t i = 0; i < blockDataGetNumOfCols(pReader->pResBlock); ++i) {
    if (pSupInfo->buildBuf[i] != NULL) {
      taosMemoryFreeClear(pSupInfo->buildBuf[i]);
    }
  }
  taosMemoryFree(pSupInfo->buildBuf);
  tBlockDataDestroy(&pReader->status.fileBlockData, true);

  cleanupDataBlockIterator(&pReader->status.blockIter);

  // the table count is only needed for the cost summary below; fetch it before the map is destroyed
  size_t numOfTables = taosHashGetSize(pReader->status.pTableMap);
  destroyBlockScanInfo(pReader->status.pTableMap);
  blockDataDestroy(pReader->pResBlock);

  if (pReader->pFileReader != NULL) {
    tsdbDataFReaderClose(&pReader->pFileReader);
  }

  SFilesetIter* pFilesetIter = &pReader->status.fileIter;
  if (pFilesetIter->pLastBlockReader != NULL) {
    tBlockDataDestroy(&pFilesetIter->pLastBlockReader->lastBlockData, true);
    taosArrayDestroy(pFilesetIter->pLastBlockReader->pBlockL);
    taosMemoryFree(pFilesetIter->pLastBlockReader);
  }

  SIOCostSummary* pCost = &pReader->cost;

  tsdbDebug("%p :io-cost summary: head-file:%" PRIu64 ", head-file time:%.2f ms, SMA:%" PRId64
            " SMA-time:%.2f ms, fileBlocks:%" PRId64
            ", fileBlocks-time:%.2f ms, "
            "build in-memory-block-time:%.2f ms, lastBlocks:%" PRId64
            ", lastBlocks-time:%.2f ms, STableBlockScanInfo size:%.2f Kb %s",
            pReader, pCost->headFileLoad, pCost->headFileLoadTime, pCost->smaDataLoad, pCost->smaLoadTime,
            pCost->numOfBlocks, pCost->blockLoadTime, pCost->buildmemBlock, pCost->lastBlockLoad,
            pCost->lastBlockLoadTime, numOfTables * sizeof(STableBlockScanInfo) / 1000.0, pReader->idStr);

  taosMemoryFree(pReader->idStr);

  // pMemSchema may alias pSchema. Decide whether it needs its own free BEFORE pSchema is released, so the
  // comparison never reads a pointer that has already been freed.
  if (pReader->pMemSchema != pReader->pSchema) {
    taosMemoryFree(pReader->pMemSchema);
  }
  taosMemoryFree(pReader->pSchema);

  taosMemoryFreeClear(pReader);
}

3561
/**
 * Produce the next result block of a single reader.
 *
 * The result block is cleaned first. While the reader is still loading from files, a block is built from the
 * files; if that yields no rows (or the reader does not load from files at all) the in-memory buffers are
 * tried.
 *
 * @param pReader reader to advance
 * @return true when the result block holds at least one row; false when there is no more data or building the
 *         block from files failed
 */
static bool doTsdbNextDataBlock(STsdbReader* pReader) {
  // cleanup the data that belongs to the previous data block
  SSDataBlock* pBlock = pReader->pResBlock;
  blockDataCleanup(pBlock);

  if (pReader->status.loadFromFile) {
    int32_t code = buildBlockFromFiles(pReader);
    if (code != TSDB_CODE_SUCCESS) {
      return false;
    }

    if (pBlock->info.rows > 0) {
      return true;
    }
    // the files produced nothing: fall through to the buffer
  }

  buildBlockFromBufferSequentially(pReader);
  return pBlock->info.rows > 0;
}

3588 3589 3590 3591 3592 3593 3594 3595 3596 3597 3598 3599 3600 3601 3602 3603 3604 3605 3606 3607 3608 3609 3610 3611 3612 3613 3614 3615 3616 3617 3618 3619 3620 3621 3622 3623 3624
/**
 * Advance the reader to its next data block.
 *
 * Data is served in three stages, recorded in pReader->step: innerReader[0] (EXTERNAL_ROWS_PREV) while it
 * exists and still has data, then the reader itself (EXTERNAL_ROWS_MAIN), then innerReader[1]
 * (EXTERNAL_ROWS_NEXT). An inner reader is closed and its slot cleared as soon as it runs dry.
 *
 * @param pReader reader to advance
 * @return true when a block is available, false when the query window is empty or all data is consumed
 */
bool tsdbNextDataBlock(STsdbReader* pReader) {
  if (isEmptyQueryTimeWindow(&pReader->window)) {
    return false;
  }

  // stage 1: rows in front of the query window
  if (pReader->innerReader[0] != NULL) {
    if (doTsdbNextDataBlock(pReader->innerReader[0])) {
      pReader->step = EXTERNAL_ROWS_PREV;
      return true;
    }

    tsdbReaderClose(pReader->innerReader[0]);
    pReader->innerReader[0] = NULL;
  }

  // stage 2: rows of the query window itself
  pReader->step = EXTERNAL_ROWS_MAIN;
  if (doTsdbNextDataBlock(pReader)) {
    return true;
  }

  // stage 3: rows behind the query window
  if (pReader->innerReader[1] != NULL) {
    if (doTsdbNextDataBlock(pReader->innerReader[1])) {
      pReader->step = EXTERNAL_ROWS_NEXT;
      return true;
    }

    tsdbReaderClose(pReader->innerReader[1]);
    pReader->innerReader[1] = NULL;
  }

  return false;
}

/**
 * Copy the row count, table uid and time window of the reader's current result block into `pDataBlockInfo`.
 *
 * @param pReader        reader whose result block is described; must not be NULL
 * @param pDataBlockInfo [out] destination; must not be NULL
 */
static void setBlockInfo(STsdbReader* pReader, SDataBlockInfo* pDataBlockInfo) {
  ASSERT(pDataBlockInfo != NULL && pReader != NULL);

  SSDataBlock* pResBlock = pReader->pResBlock;

  pDataBlockInfo->window = pResBlock->info.window;
  pDataBlockInfo->uid = pResBlock->info.uid;
  pDataBlockInfo->rows = pResBlock->info.rows;
}

3631 3632
/**
 * Report the info of the block produced by the last tsdbNextDataBlock() call.
 *
 * For external time-window readers the block may belong to one of the inner readers; pReader->step tells which
 * one (EXTERNAL_ROWS_MAIN: the reader itself, EXTERNAL_ROWS_PREV: innerReader[0], anything else:
 * innerReader[1]).
 *
 * @param pReader        reader that produced the block
 * @param pDataBlockInfo [out] block info
 */
void tsdbRetrieveDataBlockInfo(STsdbReader* pReader, SDataBlockInfo* pDataBlockInfo) {
  STsdbReader* pSource = pReader;

  if (pReader->type == TIMEWINDOW_RANGE_EXTERNAL && pReader->step != EXTERNAL_ROWS_MAIN) {
    pSource = (pReader->step == EXTERNAL_ROWS_PREV) ? pReader->innerReader[0] : pReader->innerReader[1];
  }

  setBlockInfo(pSource, pDataBlockInfo);
}

3645
/**
 * Load the pre-computed column aggregates (SMA) of the current file block.
 *
 * No statistics are returned (*pBlockStatis = NULL, success) for external time-window readers, for composed
 * data blocks, and for blocks without SMA. Otherwise the aggregates are read and matched by column id against
 * the requested columns; slot 0 of the returned list always describes the primary timestamp column, built from
 * the result block's time window.
 *
 * @param pReader      reader positioned on a file block
 * @param pBlockStatis [out] per-column aggregate list (pReader->suppInfo.plist) or NULL
 * @param allHave      [out] set to true once aggregates were loaded, reset to false when a matched column has
 *                     block SMA switched off
 * @return TSDB_CODE_SUCCESS or the error code from tsdbReadBlockSma()
 */
int32_t tsdbRetrieveDatablockSMA(STsdbReader* pReader, SColumnDataAgg*** pBlockStatis, bool* allHave) {
  int32_t code = 0;
  *allHave = false;

  // external-window readers and composed blocks carry no statistics data
  if (pReader->type == TIMEWINDOW_RANGE_EXTERNAL || pReader->status.composedDataBlock) {
    *pBlockStatis = NULL;
    return TSDB_CODE_SUCCESS;
  }

  SFileDataBlockInfo* pFBlock = getCurrentBlockInfo(&pReader->status.blockIter);
  SBlock*             pBlock = getCurrentBlock(&pReader->status.blockIter);
  int64_t             stime = taosGetTimestampUs();
  SBlockLoadSuppInfo* pSup = &pReader->suppInfo;

  if (!tBlockHasSma(pBlock)) {
    *pBlockStatis = NULL;
    return TSDB_CODE_SUCCESS;
  }

  code = tsdbReadBlockSma(pReader->pFileReader, pBlock, pSup->pColAgg);
  if (code != TSDB_CODE_SUCCESS) {
    tsdbDebug("vgId:%d, failed to load block SMA for uid %" PRIu64 ", code:%s, %s", 0, pFBlock->uid, tstrerror(code),
              pReader->idStr);
    return code;
  }

  *allHave = true;

  // always load the first primary timestamp column data
  SColumnDataAgg* pTsAgg = &pSup->tsColAgg;
  pTsAgg->colId = PRIMARYKEY_TIMESTAMP_COL_ID;
  pTsAgg->numOfNull = 0;
  pTsAgg->min = pReader->pResBlock->info.window.skey;
  pTsAgg->max = pReader->pResBlock->info.window.ekey;
  pSup->plist[0] = pTsAgg;

  // match loaded aggregates (cursor i) against the requested column ids (cursor j)
  size_t numOfCols = blockDataGetNumOfCols(pReader->pResBlock);

  int32_t i = 0, j = 0;
  while (j < numOfCols && i < taosArrayGetSize(pSup->pColAgg)) {
    SColumnDataAgg* pAgg = taosArrayGet(pSup->pColAgg, i);

    if (pAgg->colId < pSup->colIds[j]) {
      i += 1;
      continue;
    }

    if (pSup->colIds[j] < pAgg->colId) {
      j += 1;
      continue;
    }

    // NOTE(review): the schema column is picked with the aggregate cursor `i`, not by looking up the column
    // id - confirm that pColAgg positions line up with pSchema->columns.
    if (IS_BSMA_ON(&(pReader->pSchema->columns[i]))) {
      pSup->plist[j] = pAgg;
    } else {
      *allHave = false;
    }
    i += 1;
    j += 1;
  }

  double elapsed = (taosGetTimestampUs() - stime) / 1000.0;
  pReader->cost.smaLoadTime += elapsed;
  pReader->cost.smaDataLoad += 1;

  *pBlockStatis = pSup->plist;

  tsdbDebug("vgId:%d, succeed to load block SMA for uid %" PRIu64 ", elapsed time:%.2f ms, %s", 0, pFBlock->uid,
            elapsed, pReader->idStr);

  return code;
}

3723
/**
 * Return the column array of the reader's current result block, loading the underlying file block first when
 * the block has not been composed yet.
 *
 * @param pReader reader positioned on a block
 * @return pReader->pResBlock->pDataBlock, or NULL with terrno set when initialising or loading the file block
 *         fails
 */
static SArray* doRetrieveDataBlock(STsdbReader* pReader) {
  SReaderStatus* pStatus = &pReader->status;

  if (!pStatus->composedDataBlock) {
    SFileDataBlockInfo*  pFBlock = getCurrentBlockInfo(&pStatus->blockIter);
    STableBlockScanInfo* pBlockScanInfo = taosHashGet(pStatus->pTableMap, &pFBlock->uid, sizeof(pFBlock->uid));

    tBlockDataReset(&pStatus->fileBlockData);
    int32_t code = tBlockDataInit(&pStatus->fileBlockData, pReader->suid, pBlockScanInfo->uid, pReader->pSchema);
    if (code == TSDB_CODE_SUCCESS) {
      code = doLoadFileBlockData(pReader, &pStatus->blockIter, &pStatus->fileBlockData);
      if (code != TSDB_CODE_SUCCESS) {
        // a failed load leaves nothing usable behind: drop the block data
        tBlockDataDestroy(&pStatus->fileBlockData, 1);
      }
    }

    if (code != TSDB_CODE_SUCCESS) {
      terrno = code;
      return NULL;
    }

    copyBlockDataToSDataBlock(pReader, pBlockScanInfo);
  }

  return pReader->pResBlock->pDataBlock;
}

3751 3752 3753 3754 3755 3756 3757 3758 3759 3760 3761 3762
/**
 * Return the column data of the block produced by the last tsdbNextDataBlock() call.
 *
 * For external time-window readers the request is routed to innerReader[0] (step EXTERNAL_ROWS_PREV) or
 * innerReader[1] (step EXTERNAL_ROWS_NEXT); in every other case the reader itself serves it.
 *
 * @param pReader reader that produced the block
 * @param pIdList not used by this function
 * @return column array of the block, or NULL on failure (see doRetrieveDataBlock())
 */
SArray* tsdbRetrieveDataBlock(STsdbReader* pReader, SArray* pIdList) {
  STsdbReader* pSource = pReader;

  if (pReader->type == TIMEWINDOW_RANGE_EXTERNAL) {
    if (pReader->step == EXTERNAL_ROWS_PREV) {
      pSource = pReader->innerReader[0];
    } else if (pReader->step == EXTERNAL_ROWS_NEXT) {
      pSource = pReader->innerReader[1];
    }
  }

  return doRetrieveDataBlock(pSource);
}

H
Haojun Liao 已提交
3763
/**
 * Rewind an existing reader so that it can be scanned again with a new order and time window.
 *
 * The reader is switched to TIMEWINDOW_RANGE_CONTAINED, its file reader is closed, the fileset and block
 * iterators and the per-table scan info are reset, and the first file block is prepared again. Nothing is done
 * when the reader's current query window is empty.
 *
 * @param pReader reader to reset
 * @param pCond   new query condition; only `order` and `twindows` are read here
 * @return TSDB_CODE_SUCCESS or the error code from initForFirstBlockInFile()
 */
int32_t tsdbReaderReset(STsdbReader* pReader, SQueryTableDataCond* pCond) {
  if (isEmptyQueryTimeWindow(&pReader->window)) {
    return TSDB_CODE_SUCCESS;
  }

  pReader->order = pCond->order;
  pReader->type = TIMEWINDOW_RANGE_CONTAINED;
  pReader->status.loadFromFile = true;
  pReader->status.pTableIter = NULL;
  pReader->window = updateQueryTimeWindow(pReader->pTsdb, &pCond->twindows);

  // allocate buffer in order to load data blocks from file
  memset(&pReader->suppInfo.tsColAgg, 0, sizeof(SColumnDataAgg));
  memset(pReader->suppInfo.plist, 0, POINTER_BYTES);

  pReader->suppInfo.tsColAgg.colId = PRIMARYKEY_TIMESTAMP_COL_ID;

  // Close the file reader exactly once, and only when one is open (same guard as tsdbReaderClose()); this
  // function used to issue the close call twice on the same handle.
  if (pReader->pFileReader != NULL) {
    tsdbDataFReaderClose(&pReader->pFileReader);
  }

  int32_t numOfTables = taosHashGetSize(pReader->status.pTableMap);

  initFilesetIterator(&pReader->status.fileIter, pReader->pReadSnap->fs.aDFileSet, pReader);
  resetDataBlockIterator(&pReader->status.blockIter, pReader->order, pReader->status.pTableMap);
  resetDataBlockScanInfo(pReader->status.pTableMap);

  int32_t         code = 0;
  SDataBlockIter* pBlockIter = &pReader->status.blockIter;

  // no data in files, let's try buffer in memory
  if (pReader->status.fileIter.numOfFiles == 0) {
    pReader->status.loadFromFile = false;
  } else {
    code = initForFirstBlockInFile(pReader, pBlockIter);
    if (code != TSDB_CODE_SUCCESS) {
      tsdbError("%p reset reader failed, numOfTables:%d, query range:%" PRId64 " - %" PRId64 " in query %s", pReader,
                numOfTables, pReader->window.skey, pReader->window.ekey, pReader->idStr);
      return code;
    }
  }

  tsdbDebug("%p reset reader, suid:%" PRIu64 ", numOfTables:%d, query range:%" PRId64 " - %" PRId64 " in query %s",
            pReader, pReader->suid, numOfTables, pReader->window.skey, pReader->window.ekey, pReader->idStr);

  return code;
}
H
Hongze Cheng 已提交
3808

3809 3810 3811
/**
 * Map a block's row count onto a histogram bucket index.
 *
 * @param startRow    row count at which the first bucket starts
 * @param bucketRange number of rows covered by each bucket
 * @param numOfRows   row count of the block being classified
 * @return (numOfRows - startRow) / bucketRange, or 0 when the block holds fewer
 *         rows than startRow or when bucketRange is not positive.
 *
 * The result is not bounded from above; callers must clamp it to the size of
 * the histogram they index.
 */
static int32_t getBucketIndex(int32_t startRow, int32_t bucketRange, int32_t numOfRows) {
  // a non-positive range would divide by zero (or flip the sign of the index)
  if (bucketRange <= 0) {
    return 0;
  }

  // blocks smaller than the lower bound belong to the first bucket, never to a negative one
  if (numOfRows < startRow) {
    return 0;
  }

  return (numOfRows - startRow) / bucketRange;
}
H
Hongze Cheng 已提交
3812

3813 3814 3815 3816
/**
 * Walk the data blocks reachable through the reader's block iterator and
 * accumulate their distribution statistics into pTableBlockInfo.
 *
 * totalSize and totalRows are reset to 0 on entry; numOfFiles, numOfBlocks,
 * numOfSmallBlocks, maxRows, minRows and blockRowsHisto are updated on top of
 * whatever values the caller stored in them. defMinRows/defMaxRows are copied
 * from the vnode's tsdb configuration and numOfTables from the reader's table map.
 *
 * @param pReader         reader whose file/block iterators are consumed
 * @param pTableBlockInfo statistics to update
 * @return TSDB_CODE_SUCCESS, or the error code returned by initForFirstBlockInFile()
 */
int32_t tsdbGetFileBlocksDistInfo(STsdbReader* pReader, STableBlockDistInfo* pTableBlockInfo) {
  int32_t code = TSDB_CODE_SUCCESS;
  pTableBlockInfo->totalSize = 0;
  pTableBlockInfo->totalRows = 0;

  // find the start data block in file
  SReaderStatus* pStatus = &pReader->status;

  STsdbCfg* pc = &pReader->pTsdb->pVnode->config.tsdbCfg;
  pTableBlockInfo->defMinRows = pc->minRows;
  pTableBlockInfo->defMaxRows = pc->maxRows;

  // The [minRows, maxRows] span is split into 20 equally sized buckets.
  // NOTE(review): assumes blockRowsHisto holds at least 20 entries - confirm against its declaration.
  const int32_t numOfBuckets = 20;

  int32_t bucketRange = ceil((pc->maxRows - pc->minRows) / 20.0);
  if (bucketRange < 1) {
    // maxRows == minRows (or a misconfiguration) must not end up as a zero divisor
    bucketRange = 1;
  }

  pTableBlockInfo->numOfFiles += 1;

  int32_t numOfTables = (int32_t)taosHashGetSize(pStatus->pTableMap);
  int     defaultRows = 4096;

  SDataBlockIter* pBlockIter = &pStatus->blockIter;
  pTableBlockInfo->numOfFiles += pStatus->fileIter.numOfFiles;

  if (pBlockIter->numOfBlocks > 0) {
    pTableBlockInfo->numOfBlocks += pBlockIter->numOfBlocks;
  }

  pTableBlockInfo->numOfTables = numOfTables;
  bool hasNext = (pBlockIter->numOfBlocks > 0);

  while (true) {
    if (hasNext) {
      SBlock* pBlock = getCurrentBlock(pBlockIter);

      int32_t numOfRows = pBlock->nRow;
      pTableBlockInfo->totalRows += numOfRows;

      if (numOfRows > pTableBlockInfo->maxRows) {
        pTableBlockInfo->maxRows = numOfRows;
      }

      if (numOfRows < pTableBlockInfo->minRows) {
        pTableBlockInfo->minRows = numOfRows;
      }

      if (numOfRows < defaultRows) {
        pTableBlockInfo->numOfSmallBlocks += 1;
      }

      // Blocks with fewer than minRows rows would yield a negative index, and a block with
      // exactly maxRows rows can yield numOfBuckets; keep the write inside the histogram.
      int32_t bucketIndex = getBucketIndex(pTableBlockInfo->defMinRows, bucketRange, numOfRows);
      if (bucketIndex < 0) {
        bucketIndex = 0;
      } else if (bucketIndex >= numOfBuckets) {
        bucketIndex = numOfBuckets - 1;
      }
      pTableBlockInfo->blockRowsHisto[bucketIndex]++;

      hasNext = blockIteratorNext(&pStatus->blockIter);
    } else {
      // current block list is exhausted, move on to the next file
      code = initForFirstBlockInFile(pReader, pBlockIter);
      if ((code != TSDB_CODE_SUCCESS) || (pReader->status.loadFromFile == false)) {
        break;
      }

      pTableBlockInfo->numOfBlocks += pBlockIter->numOfBlocks;
      hasNext = (pBlockIter->numOfBlocks > 0);
    }
  }

  return code;
}
H
Hongze Cheng 已提交
3881

H
refact  
Hongze Cheng 已提交
3882
/**
 * Sum the number of rows held in the snapshot's mem and imem buffers for every
 * table in the reader's table map.
 *
 * The reader's table iterator (status.pTableIter) is used for the traversal and
 * is left as NULL once the whole map has been visited.
 *
 * @param pReader reader holding the table map and the read snapshot
 * @return total number of buffered rows over all tables
 */
int64_t tsdbGetNumOfRowsInMemTable(STsdbReader* pReader) {
  int64_t rows = 0;

  SReaderStatus* pStatus = &pReader->status;
  pStatus->pTableIter = taosHashIterate(pStatus->pTableMap, NULL);

  while (pStatus->pTableIter != NULL) {
    STableBlockScanInfo* pBlockScanInfo = pStatus->pTableIter;

    // Test the snapshot pointers that are actually dereferenced below; the live
    // pTsdb->mem/imem may differ from what the snapshot captured.
    if (pReader->pReadSnap->pMem != NULL) {
      STbData* d = tsdbGetTbDataFromMemTable(pReader->pReadSnap->pMem, pReader->suid, pBlockScanInfo->uid);
      if (d != NULL) {
        rows += tsdbGetNRowsInTbData(d);
      }
    }

    if (pReader->pReadSnap->pIMem != NULL) {
      STbData* di = tsdbGetTbDataFromMemTable(pReader->pReadSnap->pIMem, pReader->suid, pBlockScanInfo->uid);
      if (di != NULL) {
        rows += tsdbGetNRowsInTbData(di);
      }
    }

    // current table is exhausted, let's try the next table
    pStatus->pTableIter = taosHashIterate(pStatus->pTableMap, pStatus->pTableIter);
  }

  return rows;
}
D
dapan1121 已提交
3913

L
Liu Jicong 已提交
3914
/**
 * Fetch the schema of the table identified by uid.
 *
 * For a child table the schema version is read from its super table entry and
 * *suid receives the super table uid; for a normal table *suid is set to 0 and
 * the version comes from the table's own entry.
 *
 * @param pVnode  vnode whose meta store is queried
 * @param uid     uid of the table
 * @param pSchema receives the result of metaGetTbTSchema()
 * @param suid    receives the super table uid, or 0
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_TDB_INVALID_TABLE_ID (also stored in
 *         terrno) when a meta lookup fails
 */
int32_t tsdbGetTableSchema(SVnode* pVnode, int64_t uid, STSchema** pSchema, int64_t* suid) {
  SMetaReader reader = {0};
  int32_t     schemaVer = 1;

  metaReaderInit(&reader, pVnode->pMeta, 0);
  if (metaGetTableEntryByUid(&reader, uid) != TSDB_CODE_SUCCESS) {
    goto _invalid_table;
  }

  *suid = 0;

  if (reader.me.type != TSDB_CHILD_TABLE) {
    ASSERT(reader.me.type == TSDB_NORMAL_TABLE);
    schemaVer = reader.me.ntbEntry.schemaRow.version;
  } else {
    // switch the reader over to the super table entry, which carries the schema version
    tDecoderClear(&reader.coder);
    *suid = reader.me.ctbEntry.suid;
    if (metaGetTableEntryByUid(&reader, *suid) != TSDB_CODE_SUCCESS) {
      goto _invalid_table;
    }
    schemaVer = reader.me.stbEntry.schemaRow.version;
  }

  metaReaderClear(&reader);
  *pSchema = metaGetTbTSchema(pVnode->pMeta, uid, schemaVer);
  return TSDB_CODE_SUCCESS;

_invalid_table:
  terrno = TSDB_CODE_TDB_INVALID_TABLE_ID;
  metaReaderClear(&reader);
  return terrno;
}
H
Hongze Cheng 已提交
3948 3949 3950 3951 3952 3953 3954 3955 3956 3957 3958 3959 3960 3961 3962 3963 3964 3965 3966 3967 3968 3969 3970 3971 3972 3973 3974 3975 3976 3977

/**
 * Take a read snapshot of the tsdb: the current mem/imem tables (each one
 * referenced if present) and a reference to the file system state, all captured
 * while holding the tsdb read lock.
 *
 * @param pTsdb  tsdb instance to snapshot
 * @param ppSnap receives the newly allocated snapshot on success; set to NULL on
 *               failure, in which case everything acquired so far is released
 * @return 0 on success, otherwise an error code
 */
int32_t tsdbTakeReadSnap(STsdb* pTsdb, STsdbReadSnap** ppSnap) {
  int32_t code = 0;
  bool    fsRefed = false;

  // alloc
  *ppSnap = (STsdbReadSnap*)taosMemoryCalloc(1, sizeof(STsdbReadSnap));
  if (*ppSnap == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _exit;
  }

  // lock
  code = taosThreadRwlockRdlock(&pTsdb->rwLock);
  if (code) {
    code = TAOS_SYSTEM_ERROR(code);
    goto _exit;
  }

  // take snapshot
  (*ppSnap)->pMem = pTsdb->mem;
  (*ppSnap)->pIMem = pTsdb->imem;

  if ((*ppSnap)->pMem) {
    tsdbRefMemTable((*ppSnap)->pMem);
  }

  if ((*ppSnap)->pIMem) {
    tsdbRefMemTable((*ppSnap)->pIMem);
  }

  // fs
  code = tsdbFSRef(pTsdb, &(*ppSnap)->fs);
  if (code) {
    taosThreadRwlockUnlock(&pTsdb->rwLock);
    goto _exit;
  }
  fsRefed = true;

  // unlock
  code = taosThreadRwlockUnlock(&pTsdb->rwLock);
  if (code) {
    code = TAOS_SYSTEM_ERROR(code);
    goto _exit;
  }

  tsdbTrace("vgId:%d, take read snapshot", TD_VID(pTsdb->pVnode));

_exit:
  if (code && *ppSnap != NULL) {
    // Undo the partial snapshot so that neither the references nor the struct leak.
    // pMem/pIMem are still NULL (calloc) if the failure happened before they were captured.
    if ((*ppSnap)->pMem) {
      tsdbUnrefMemTable((*ppSnap)->pMem);
    }

    if ((*ppSnap)->pIMem) {
      tsdbUnrefMemTable((*ppSnap)->pIMem);
    }

    if (fsRefed) {
      tsdbFSUnref(pTsdb, &(*ppSnap)->fs);
    }

    taosMemoryFree(*ppSnap);
    *ppSnap = NULL;
  }

  return code;
}

/**
 * Release a snapshot obtained from tsdbTakeReadSnap(): drop the references on
 * the captured mem/imem tables and on the file system state, then free the
 * snapshot itself. A NULL snapshot is accepted and only produces the trace log.
 *
 * @param pTsdb tsdb instance the snapshot was taken from
 * @param pSnap snapshot to release, may be NULL
 */
void tsdbUntakeReadSnap(STsdb* pTsdb, STsdbReadSnap* pSnap) {
  if (NULL == pSnap) {
    goto _done;
  }

  if (NULL != pSnap->pMem) {
    tsdbUnrefMemTable(pSnap->pMem);
  }

  if (NULL != pSnap->pIMem) {
    tsdbUnrefMemTable(pSnap->pIMem);
  }

  tsdbFSUnref(pTsdb, &pSnap->fs);
  taosMemoryFree(pSnap);

_done:
  tsdbTrace("vgId:%d, untake read snapshot", TD_VID(pTsdb->pVnode));
}