tsdbRead.c 127.7 KB
Newer Older
H
hjxilinx 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

H
Haojun Liao 已提交
16
#include "osDef.h"
H
Hongze Cheng 已提交
17
#include "tsdb.h"
18

H
Hongze Cheng 已提交
19
/* Non-zero when the traverse order `o` is ascending. The argument is parenthesized so that an expression
 * argument (e.g. `ASCENDING_TRAVERSE(cond ? a : b)`) is compared as a whole. */
#define ASCENDING_TRAVERSE(o) ((o) == TSDB_ORDER_ASC)
H
Hongze Cheng 已提交
20

21 22 23 24 25 26
/* Tags for groups of externally supplied rows (previous / main / next).
 * NOTE(review): the values look like bit flags, but NEXT (3) equals PREV | MAIN — confirm how the enum is used
 * before relying on bitwise tests. */
typedef enum {
  EXTERNAL_ROWS_PREV = 1,
  EXTERNAL_ROWS_MAIN = 2,
  EXTERNAL_ROWS_NEXT = 3,
} EContentData;

27
typedef struct {
dengyihao's avatar
dengyihao 已提交
28
  STbDataIter* iter;
29 30 31 32
  int32_t      index;
  bool         hasVal;
} SIterInfo;

33 34
/* Counters filled in by doLoadFileBlock() for the current file set. */
typedef struct {
  int32_t numOfBlocks;     // data blocks that overlap the query window and version range
  int32_t numOfLastFiles;  // number of last (stt) files in the file set
} SBlockNumber;

H
Haojun Liao 已提交
38
typedef struct STableBlockScanInfo {
dengyihao's avatar
dengyihao 已提交
39 40
  uint64_t  uid;
  TSKEY     lastKey;
H
Hongze Cheng 已提交
41 42 43 44 45 46 47 48
  SMapData  mapData;            // block info (compressed)
  SArray*   pBlockList;         // block data index list
  SIterInfo iter;               // mem buffer skip list iterator
  SIterInfo iiter;              // imem buffer skip list iterator
  SArray*   delSkyline;         // delete info for this table
  int32_t   fileDelIndex;       // file block delete index
  int32_t   lastBlockDelIndex;  // delete index for last block
  bool      iterInit;           // whether to initialize the in-memory skip list iterator or not
H
Haojun Liao 已提交
49 50 51
} STableBlockScanInfo;

/* A file block reference used when ordering blocks across tables. */
typedef struct SBlockOrderWrapper {
  int64_t uid;     // owning table
  int64_t offset;  // NOTE(review): presumably the block's position/offset used as sort key — confirm
} SBlockOrderWrapper;
H
Hongze Cheng 已提交
55 56

typedef struct SBlockOrderSupporter {
57 58 59 60
  SBlockOrderWrapper** pDataBlockInfo;
  int32_t*             indexPerTable;
  int32_t*             numOfBlocksPerTable;
  int32_t              numOfTables;
H
Hongze Cheng 已提交
61 62 63
} SBlockOrderSupporter;

/* I/O statistics gathered by one reader. The time fields updated in this file are in milliseconds. */
typedef struct SIOCostSummary {
  int64_t numOfBlocks;        // data blocks plus last files visited
  double  blockLoadTime;      // time spent loading/copying file blocks
  double  buildmemBlock;
  int64_t headFileLoad;       // number of data file readers opened
  double  headFileLoadTime;   // time spent reading block index / block info
  int64_t smaDataLoad;
  double  smaLoadTime;
  int64_t lastBlockLoad;
  double  lastBlockLoadTime;
} SIOCostSummary;

typedef struct SBlockLoadSuppInfo {
76
  SArray*          pColAgg;
77
  SColumnDataAgg   tsColAgg;
C
Cary Xu 已提交
78
  SColumnDataAgg** plist;
79 80
  int16_t*         colIds;    // column ids for loading file block data
  char**           buildBuf;  // build string tmp buffer, todo remove it later after all string format being updated.
H
Hongze Cheng 已提交
81 82
} SBlockLoadSuppInfo;

83 84 85
typedef struct SLastBlockReader {
  STimeWindow   window;
  SVersionRange verRange;
86
  int32_t       order;
87
  uint64_t      uid;
88
  SMergeTree    mergeTree;
89
  SSttBlockLoadInfo* pInfo;
90 91
} SLastBlockReader;

92
typedef struct SFilesetIter {
H
Hongze Cheng 已提交
93 94 95
  int32_t           numOfFiles;  // number of total files
  int32_t           index;       // current accessed index in the list
  SArray*           pFileList;   // data file list
96
  int32_t           order;
H
Hongze Cheng 已提交
97
  SLastBlockReader* pLastBlockReader;  // last file block reader
98
} SFilesetIter;
H
Haojun Liao 已提交
99 100

/* Entry of the block iterator's list: identifies one file block of one table. */
typedef struct SFileDataBlockInfo {
  uint64_t uid;         // owning table
  int32_t  tbBlockIdx;  // index position in the table's STableBlockScanInfo, used to check whether the
                        // neighbouring block overlaps with this one
} SFileDataBlockInfo;

typedef struct SDataBlockIter {
107
  int32_t   numOfBlocks;
108
  int32_t   index;
H
Hongze Cheng 已提交
109
  SArray*   blockList;  // SArray<SFileDataBlockInfo>
110
  int32_t   order;
H
Hongze Cheng 已提交
111
  SDataBlk  block;  // current SDataBlk data
112
  SHashObj* pTableMap;
H
Haojun Liao 已提交
113 114 115
} SDataBlockIter;

/* Progress of copying the loaded file block into result blocks. */
typedef struct SFileBlockDumpInfo {
  int32_t totalRows;
  int32_t rowIndex;   // next row of the file block to copy
  int64_t lastKey;    // set one step past the block boundary once the block is fully dumped
  bool    allDumped;  // whether every row of the block has been handed out
} SFileBlockDumpInfo;

122
/* Walks the queried tables in ascending uid order. */
typedef struct SUidOrderCheckInfo {
  uint64_t* tableUidList;  // table uids sorted ascending
  int32_t   currentIndex;  // position of the current table in tableUidList
} SUidOrderCheckInfo;

H
Haojun Liao 已提交
127
typedef struct SReaderStatus {
128
  bool                 loadFromFile;       // check file stage
129
  bool                 composedDataBlock;  // the returned data block is a composed block or not
130 131
  SHashObj*            pTableMap;          // SHash<STableBlockScanInfo>
  STableBlockScanInfo* pTableIter;         // table iterator used in building in-memory buffer data blocks.
132
  SUidOrderCheckInfo   uidCheckInfo;       // check all table in uid order
133
  SFileBlockDumpInfo   fBlockDumpInfo;
134 135 136 137
  SDFileSet*           pCurrentFileset;  // current opened file set
  SBlockData           fileBlockData;
  SFilesetIter         fileIter;
  SDataBlockIter       blockIter;
H
Haojun Liao 已提交
138 139
} SReaderStatus;

H
Hongze Cheng 已提交
140
struct STsdbReader {
H
Haojun Liao 已提交
141 142 143 144 145 146 147
  STsdb*             pTsdb;
  uint64_t           suid;
  int16_t            order;
  STimeWindow        window;  // the primary query time window that applies to all queries
  SSDataBlock*       pResBlock;
  int32_t            capacity;
  SReaderStatus      status;
148 149
  char*              idStr;  // query info handle, for debug purpose
  int32_t            type;   // query type: 1. retrieve all data blocks, 2. retrieve direct prev|next rows
H
Hongze Cheng 已提交
150
  SBlockLoadSuppInfo suppInfo;
H
Hongze Cheng 已提交
151
  STsdbReadSnap*     pReadSnap;
152
  SIOCostSummary     cost;
153 154
  STSchema*          pSchema;     // the newest version schema
  STSchema*          pMemSchema;  // the previous schema for in-memory data, to avoid load schema too many times
155 156
  SDataFReader*      pFileReader;
  SVersionRange      verRange;
157

158 159
  int32_t      step;
  STsdbReader* innerReader[2];
H
Hongze Cheng 已提交
160
};
H
Hongze Cheng 已提交
161

H
Haojun Liao 已提交
162
static SFileDataBlockInfo* getCurrentBlockInfo(SDataBlockIter* pBlockIter);
163 164
static int      buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, int64_t endKey, int32_t capacity,
                                          STsdbReader* pReader);
165
static TSDBROW* getValidMemRow(SIterInfo* pIter, const SArray* pDelList, STsdbReader* pReader);
166 167
static int32_t  doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pScanInfo, STsdbReader* pReader,
                                        SRowMerger* pMerger);
H
Hongze Cheng 已提交
168 169
static int32_t  doMergeRowsInLastBlock(SLastBlockReader* pLastBlockReader, STableBlockScanInfo* pScanInfo, int64_t ts,
                                       SRowMerger* pMerger);
170
static int32_t  doMergeRowsInBuf(SIterInfo* pIter, uint64_t uid, int64_t ts, SArray* pDelList, SRowMerger* pMerger,
dengyihao's avatar
dengyihao 已提交
171
                                 STsdbReader* pReader);
H
Haojun Liao 已提交
172
static int32_t  doAppendRowFromTSRow(SSDataBlock* pBlock, STsdbReader* pReader, STSRow* pTSRow, uint64_t uid);
173
static int32_t  doAppendRowFromFileBlock(SSDataBlock* pResBlock, STsdbReader* pReader, SBlockData* pBlockData,
H
Hongze Cheng 已提交
174
                                         int32_t rowIndex);
175
static void     setComposedBlockFlag(STsdbReader* pReader, bool composed);
176
static bool     hasBeenDropped(const SArray* pDelList, int32_t* index, TSDBKEY* pKey, int32_t order);
177

H
Hongze Cheng 已提交
178 179 180 181
static int32_t doMergeMemTableMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter, SArray* pDelList,
                                        STSRow** pTSRow, STsdbReader* pReader, bool* freeTSRow);
static int32_t doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo* pBlockScanInfo,
                                  STsdbReader* pReader, STSRow** pTSRow);
182 183
static int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pBlockScanInfo, int64_t key,
                                     STsdbReader* pReader);
184

dengyihao's avatar
dengyihao 已提交
185 186 187 188
static int32_t initDelSkylineIterator(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader, STbData* pMemTbData,
                                      STbData* piMemTbData);
static STsdb*  getTsdbByRetentions(SVnode* pVnode, TSKEY winSKey, SRetention* retentions, const char* idstr,
                                   int8_t* pLevel);
189
static SVersionRange getQueryVerRange(SVnode* pVnode, SQueryTableDataCond* pCond, int8_t level);
H
Hongze Cheng 已提交
190 191 192
static int64_t       getCurrentKeyInLastBlock(SLastBlockReader* pLastBlockReader);
static bool          hasDataInLastBlock(SLastBlockReader* pLastBlockReader);
static int32_t       doBuildDataBlock(STsdbReader* pReader);
H
Haojun Liao 已提交
193

194 195 196
/*
 * Prepare per-column load information for the output block pBlock:
 *  - pReader->suppInfo.colIds[i]   receives the column id of output column i;
 *  - pReader->suppInfo.buildBuf[i] receives a scratch buffer of the column width for variable-length columns
 *    (NULL for fixed-size columns), used to assemble var-data values.
 *
 * Returns TSDB_CODE_SUCCESS or TSDB_CODE_OUT_OF_MEMORY. On failure every buffer allocated here is released and
 * colIds/buildBuf are reset to NULL, so that no dangling pointer can be freed a second time later.
 * NOTE(review): the reader's cleanup path must tolerate a NULL buildBuf.
 */
static int32_t setColumnIdSlotList(STsdbReader* pReader, SSDataBlock* pBlock) {
  SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo;
  size_t              numOfCols = blockDataGetNumOfCols(pBlock);

  pSupInfo->colIds = taosMemoryMalloc(numOfCols * sizeof(int16_t));
  pSupInfo->buildBuf = taosMemoryCalloc(numOfCols, POINTER_BYTES);
  if (pSupInfo->buildBuf == NULL || pSupInfo->colIds == NULL) {
    taosMemoryFreeClear(pSupInfo->colIds);
    taosMemoryFreeClear(pSupInfo->buildBuf);
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  for (size_t i = 0; i < numOfCols; ++i) {
    SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, i);
    pSupInfo->colIds[i] = pCol->info.colId;

    if (IS_VAR_DATA_TYPE(pCol->info.type)) {
      pSupInfo->buildBuf[i] = taosMemoryMalloc(pCol->info.bytes);
      if (pSupInfo->buildBuf[i] == NULL) {
        // slots of fixed-size columns are NULL (calloc), so freeing every earlier slot is safe
        for (size_t j = 0; j < i; ++j) {
          taosMemoryFreeClear(pSupInfo->buildBuf[j]);
        }
        taosMemoryFreeClear(pSupInfo->buildBuf);
        taosMemoryFreeClear(pSupInfo->colIds);
        return TSDB_CODE_OUT_OF_MEMORY;
      }
    }
  }

  return TSDB_CODE_SUCCESS;
}
H
Hongze Cheng 已提交
218

219
/*
 * Build the uid -> STableBlockScanInfo map for the numOfTables tables in idList.
 * Every entry's lastKey is placed one tick before the query window on the side the scan starts from
 * (window.skey - 1 for ascending, window.ekey + 1 for descending), clamped at INT64_MIN / INT64_MAX.
 * Returns NULL if the hash map cannot be created.
 */
static SHashObj* createDataBlockScanInfo(STsdbReader* pTsdbReader, const STableKeyInfo* idList, int32_t numOfTables) {
  // todo use simple hash instead, optimize the memory consumption
  SHashObj* pTableMap =
      taosHashInit(numOfTables, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
  if (pTableMap == NULL) {
    return NULL;
  }

  bool asc = ASCENDING_TRAVERSE(pTsdbReader->order);
  for (int32_t idx = 0; idx < numOfTables; ++idx) {
    STableBlockScanInfo info = {.uid = idList[idx].uid};

    if (asc) {
      int64_t startKey = pTsdbReader->window.skey;
      info.lastKey = (startKey == INT64_MIN) ? startKey : (startKey - 1);
    } else {
      int64_t endKey = pTsdbReader->window.ekey;
      info.lastKey = (endKey == INT64_MAX) ? endKey : (endKey + 1);
    }

    taosHashPut(pTableMap, &info.uid, sizeof(uint64_t), &info, sizeof(info));
    tsdbDebug("%p check table uid:%" PRId64 " from lastKey:%" PRId64 " %s", pTsdbReader, info.uid, info.lastKey,
              pTsdbReader->idStr);
  }

  tsdbDebug("%p create %d tables scan-info, size:%.2f Kb, %s", pTsdbReader, numOfTables,
            (sizeof(STableBlockScanInfo) * numOfTables) / 1024.0, pTsdbReader->idStr);

  return pTableMap;
}
H
Hongze Cheng 已提交
248

H
Haojun Liao 已提交
249
/*
 * Rewind the scan state of every table in pTableMap so the tables can be scanned again from key `ts`.
 * Both in-memory iterators (mem and imem) are destroyed and flagged as not initialized, and the cached delete
 * skyline is dropped. Both iterators must be released here: when they are initialized again new ones are
 * created, so an iterator kept alive at this point would leak, and a stale hasVal would describe an iterator
 * that no longer exists.
 */
static void resetDataBlockScanInfo(SHashObj* pTableMap, int64_t ts) {
  STableBlockScanInfo* p = NULL;

  while ((p = taosHashIterate(pTableMap, p)) != NULL) {
    p->iterInit = false;

    p->iter.hasVal = false;
    if (p->iter.iter != NULL) {
      p->iter.iter = tsdbTbDataIterDestroy(p->iter.iter);
    }

    p->iiter.hasVal = false;
    if (p->iiter.iter != NULL) {
      p->iiter.iter = tsdbTbDataIterDestroy(p->iiter.iter);
    }

    p->delSkyline = taosArrayDestroy(p->delSkyline);
    p->lastKey = ts;
  }
}

264 265 266 267 268 269 270 271
/*
 * Release all per-table scan state kept in pTableMap (both in-memory iterators, the delete skyline, the list of
 * qualified blocks and the block map data), then free the hash map itself.
 */
static void destroyBlockScanInfo(SHashObj* pTableMap) {
  for (STableBlockScanInfo* pInfo = taosHashIterate(pTableMap, NULL); pInfo != NULL;
       pInfo = taosHashIterate(pTableMap, pInfo)) {
    pInfo->iterInit = false;
    pInfo->iiter.hasVal = false;

    if (pInfo->iter.iter != NULL) {
      pInfo->iter.iter = tsdbTbDataIterDestroy(pInfo->iter.iter);
    }
    if (pInfo->iiter.iter != NULL) {
      pInfo->iiter.iter = tsdbTbDataIterDestroy(pInfo->iiter.iter);
    }

    pInfo->delSkyline = taosArrayDestroy(pInfo->delSkyline);
    pInfo->pBlockList = taosArrayDestroy(pInfo->pBlockList);
    tMapDataClear(&pInfo->mapData);
  }

  taosHashCleanup(pTableMap);
}

287
/* A query time window is empty when its end key precedes its start key. */
static bool isEmptyQueryTimeWindow(STimeWindow* pWindow) {
  ASSERT(pWindow != NULL);
  bool empty = (pWindow->ekey < pWindow->skey);
  return empty;
}
H
Hongze Cheng 已提交
291

292 293 294
// Clamp the start of the query window to the oldest retained key (now - keep2 minutes, plus one tick), so that data
// past its time-to-live is never returned to the client. The end of the window is left unchanged.
static STimeWindow updateQueryTimeWindow(STsdb* pTsdb, STimeWindow* pWindow) {
  STsdbKeepCfg* pCfg = &pTsdb->keepCfg;

  int64_t now = taosGetTimestamp(pCfg->precision);
  int64_t retentionTicks = tsTickPerMin[pCfg->precision] * pCfg->keep2;
  int64_t earliestKey = now - retentionTicks + 1;  // needs to add one tick

  STimeWindow win = *pWindow;
  win.skey = (win.skey < earliestKey) ? earliestKey : win.skey;
  return win;
}
H
Hongze Cheng 已提交
307

H
Haojun Liao 已提交
308
/*
 * Shrink *capacity so that one output SSDataBlock (capacity rows of all queried columns) stays within 2MB.
 * The size product is computed in 64 bits, because capacity * rowLen overflows int32_t for wide rows (signed
 * overflow is undefined behaviour). The capacity never drops below one row, so the reader can still make
 * progress when a single row is wider than the limit.
 */
static void limitOutputBufferSize(const SQueryTableDataCond* pCond, int32_t* capacity) {
  const int64_t TWOMB = 2 * 1024 * 1024;

  int64_t rowLen = 0;
  for (int32_t i = 0; i < pCond->numOfCols; ++i) {
    rowLen += pCond->colList[i].bytes;
  }

  if (rowLen <= 0) {
    return;
  }

  // make sure the output SSDataBlock size be less than 2MB.
  if ((int64_t)(*capacity) * rowLen > TWOMB) {
    int64_t limited = TWOMB / rowLen;
    (*capacity) = (limited > 0) ? (int32_t)limited : 1;
  }
}

// init file iterator
322
static int32_t initFilesetIterator(SFilesetIter* pIter, SArray* aDFileSet, STsdbReader* pReader) {
H
Hongze Cheng 已提交
323
  size_t numOfFileset = taosArrayGetSize(aDFileSet);
324

325 326
  pIter->index = ASCENDING_TRAVERSE(pReader->order) ? -1 : numOfFileset;
  pIter->order = pReader->order;
H
Hongze Cheng 已提交
327
  pIter->pFileList = aDFileSet;
328
  pIter->numOfFiles = numOfFileset;
H
Haojun Liao 已提交
329

330 331 332 333
  if (pIter->pLastBlockReader == NULL) {
    pIter->pLastBlockReader = taosMemoryCalloc(1, sizeof(struct SLastBlockReader));
    if (pIter->pLastBlockReader == NULL) {
      int32_t code = TSDB_CODE_OUT_OF_MEMORY;
334
      tsdbError("failed to prepare the last block iterator, code:%d %s", tstrerror(code), pReader->idStr);
335 336
      return code;
    }
337 338
  }

339 340 341 342 343 344 345 346
  SLastBlockReader* pLReader = pIter->pLastBlockReader;
  pLReader->order = pReader->order;
  pLReader->window = pReader->window;
  pLReader->verRange = pReader->verRange;

  pLReader->uid = 0;
  tMergeTreeClose(&pLReader->mergeTree);

347
  if (pLReader->pInfo == NULL) {
H
Haojun Liao 已提交
348 349 350 351 352
    pLReader->pInfo = tCreateLastBlockLoadInfo();
    if (pLReader->pInfo == NULL) {
      tsdbDebug("init fileset iterator failed, code:%s %s", tstrerror(terrno), pReader->idStr);
      return terrno;
    }
353 354
  }

355
  tsdbDebug("init fileset iterator, total files:%d %s", pIter->numOfFiles, pReader->idStr);
H
Haojun Liao 已提交
356 357 358
  return TSDB_CODE_SUCCESS;
}

359
/*
 * Move the iterator to the next file set (in the iterator's order) whose key range overlaps the query window and
 * open a data file reader on it (pReader->pFileReader, pReader->status.pCurrentFileset).
 * Before moving on, the last-block load statistics of the previous file set are added to pReader->cost and the last
 * block reader is reset.
 *
 * Returns true when such a file set has been opened. Returns false when:
 *  - the file sets are exhausted;
 *  - the remaining ones lie outside the query window;
 *  - the data file reader could not be opened (logged as an error, since a bool cannot carry the code).
 */
static bool filesetIteratorNext(SFilesetIter* pIter, STsdbReader* pReader) {
  bool    asc = ASCENDING_TRAVERSE(pIter->order);
  int32_t step = asc ? 1 : -1;
  pIter->index += step;

  if ((asc && pIter->index >= pIter->numOfFiles) || ((!asc) && pIter->index < 0)) {
    return false;
  }

  SIOCostSummary* pSum = &pReader->cost;
  getLastBlockLoadInfo(pIter->pLastBlockReader->pInfo, &pSum->lastBlockLoad, &pSum->lastBlockLoadTime);

  pIter->pLastBlockReader->uid = 0;
  tMergeTreeClose(&pIter->pLastBlockReader->mergeTree);
  resetLastBlockLoadInfo(pIter->pLastBlockReader->pInfo);

  // key range covered by the candidate file set
  STimeWindow win = {0};

  while (1) {
    if (pReader->pFileReader != NULL) {
      tsdbDataFReaderClose(&pReader->pFileReader);
    }

    pReader->status.pCurrentFileset = (SDFileSet*)taosArrayGet(pIter->pFileList, pIter->index);

    int32_t code = tsdbDataFReaderOpen(&pReader->pFileReader, pReader->pTsdb, pReader->status.pCurrentFileset);
    if (code != TSDB_CODE_SUCCESS) {
      tsdbError("%p failed to open data file reader, code:%s %s", pReader, tstrerror(code), pReader->idStr);
      return false;
    }

    pReader->cost.headFileLoad += 1;

    int32_t fid = pReader->status.pCurrentFileset->fid;
    tsdbFidKeyRange(fid, pReader->pTsdb->keepCfg.days, pReader->pTsdb->keepCfg.precision, &win.skey, &win.ekey);

    // current file are no longer overlapped with query time window, ignore remain files
    if ((asc && win.skey > pReader->window.ekey) || (!asc && win.ekey < pReader->window.skey)) {
      tsdbDebug("%p remain files are not qualified for qrange:%" PRId64 "-%" PRId64 ", ignore, %s", pReader,
                pReader->window.skey, pReader->window.ekey, pReader->idStr);
      return false;
    }

    // the file set ends before the query window starts (in traverse order): skip it
    if ((asc && (win.ekey < pReader->window.skey)) || ((!asc) && (win.skey > pReader->window.ekey))) {
      pIter->index += step;
      if ((asc && pIter->index >= pIter->numOfFiles) || ((!asc) && pIter->index < 0)) {
        return false;
      }
      continue;
    }

    tsdbDebug("%p file found fid:%d for qrange:%" PRId64 "-%" PRId64 ", %s", pReader, fid, pReader->window.skey,
              pReader->window.ekey, pReader->idStr);
    return true;
  }
}

419
/* Rewind the block iterator: no blocks, cursor before the first block, and an empty block list that is allocated
 * on first use and cleared on every later reset. */
static void resetDataBlockIterator(SDataBlockIter* pIter, int32_t order) {
  pIter->numOfBlocks = 0;
  pIter->index = -1;
  pIter->order = order;

  if (pIter->blockList != NULL) {
    taosArrayClear(pIter->blockList);
    return;
  }

  pIter->blockList = taosArrayInit(4, sizeof(SFileDataBlockInfo));
}

L
Liu Jicong 已提交
430
/* Free the iterator's block list and clear the pointer, so that a later resetDataBlockIterator() allocates a new list
 * instead of clearing freed memory. */
static void cleanupDataBlockIterator(SDataBlockIter* pIter) { pIter->blockList = taosArrayDestroy(pIter->blockList); }
H
Haojun Liao 已提交
431

H
Haojun Liao 已提交
432
/* A new reader begins in the file-scanning stage with no in-memory table cursor. */
static void initReaderStatus(SReaderStatus* pStatus) {
  pStatus->loadFromFile = true;
  pStatus->pTableIter = NULL;
}

437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459
/*
 * Create the result block with one column per queried column (pCond->colList) and room for `capacity` rows.
 * Returns NULL and sets terrno on failure. A partially built block is released with blockDataDestroy(), which also
 * frees the column data attached to it; a plain taosMemoryFree() of the block would leak that data.
 */
static SSDataBlock* createResBlock(SQueryTableDataCond* pCond, int32_t capacity) {
  SSDataBlock* pResBlock = createDataBlock();
  if (pResBlock == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  for (int32_t i = 0; i < pCond->numOfCols; ++i) {
    SColumnInfoData colInfo = {{0}, 0};
    colInfo.info = pCond->colList[i];

    int32_t code = blockDataAppendColInfo(pResBlock, &colInfo);
    if (code != TSDB_CODE_SUCCESS) {
      terrno = code;
      blockDataDestroy(pResBlock);
      return NULL;
    }
  }

  int32_t code = blockDataEnsureCapacity(pResBlock, capacity);
  if (code != TSDB_CODE_SUCCESS) {
    terrno = code;
    blockDataDestroy(pResBlock);
    return NULL;
  }

  return pResBlock;
}

460 461
/*
 * Allocate and initialize a reader for the query condition pCond on pVnode.
 *
 * The tsdb instance is picked from the vnode's retention levels by the start of the query window. The window
 * start is clamped by the keep setting. The output block holds `capacity` rows, possibly reduced so that a block
 * stays within 2MB.
 *
 * On success *ppReader receives the reader and TSDB_CODE_SUCCESS is returned. On failure *ppReader is NULL, the
 * partially built reader is closed and the error code is returned.
 */
static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, STsdbReader** ppReader, int32_t capacity,
                                const char* idstr) {
  int32_t code = TSDB_CODE_SUCCESS;
  int8_t  level = 0;

  *ppReader = NULL;

  STsdbReader* pReader = (STsdbReader*)taosMemoryCalloc(1, sizeof(*pReader));
  if (pReader == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  if (VND_IS_TSMA(pVnode)) {
    tsdbDebug("vgId:%d, tsma is selected to query", TD_VID(pVnode));
  }

  initReaderStatus(&pReader->status);

  pReader->pTsdb = getTsdbByRetentions(pVnode, pCond->twindows.skey, pVnode->config.tsdbCfg.retentions, idstr, &level);
  pReader->suid = pCond->suid;
  pReader->order = pCond->order;
  // honour the requested capacity (it used to be ignored in favour of a fixed 4096); fall back to that default
  // when the caller passes no usable value
  pReader->capacity = (capacity > 0) ? capacity : 4096;
  pReader->idStr = (idstr != NULL) ? strdup(idstr) : NULL;
  if (idstr != NULL && pReader->idStr == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _end;
  }
  pReader->verRange = getQueryVerRange(pVnode, pCond, level);
  pReader->type = pCond->type;
  pReader->window = updateQueryTimeWindow(pReader->pTsdb, &pCond->twindows);

  ASSERT(pCond->numOfCols > 0);

  limitOutputBufferSize(pCond, &pReader->capacity);

  // allocate buffer in order to load data blocks from file
  SBlockLoadSuppInfo* pSup = &pReader->suppInfo;
  pSup->pColAgg = taosArrayInit(4, sizeof(SColumnDataAgg));
  pSup->plist = taosMemoryCalloc(pCond->numOfCols, POINTER_BYTES);
  if (pSup->pColAgg == NULL || pSup->plist == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _end;
  }

  pSup->tsColAgg.colId = PRIMARYKEY_TIMESTAMP_COL_ID;

  code = tBlockDataCreate(&pReader->status.fileBlockData);
  if (code != TSDB_CODE_SUCCESS) {
    terrno = code;
    goto _end;
  }

  pReader->pResBlock = createResBlock(pCond, pReader->capacity);
  if (pReader->pResBlock == NULL) {
    code = terrno;
    goto _end;
  }

  code = setColumnIdSlotList(pReader, pReader->pResBlock);
  if (code != TSDB_CODE_SUCCESS) {
    goto _end;
  }

  *ppReader = pReader;
  return code;

_end:
  tsdbReaderClose(pReader);
  *ppReader = NULL;
  return code;
}
H
Hongze Cheng 已提交
522

H
Haojun Liao 已提交
523
/*
 * Read the block index (SBlockIdx entries) of the opened data file and append to pIndexList every entry that
 * belongs to the queried super table (pReader->suid) and to a table present in the reader's table map. Each such
 * table gets an empty pBlockList (SArray<int32_t>) if it has none yet.
 *
 * Returns TSDB_CODE_SUCCESS, the error of tsdbReadBlockIdx, or TSDB_CODE_OUT_OF_MEMORY.
 */
static int32_t doLoadBlockIndex(STsdbReader* pReader, SDataFReader* pFileReader, SArray* pIndexList) {
  SArray* aBlockIdx = taosArrayInit(8, sizeof(SBlockIdx));
  if (aBlockIdx == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  int64_t st = taosGetTimestampUs();
  int32_t code = tsdbReadBlockIdx(pFileReader, aBlockIdx);
  if (code != TSDB_CODE_SUCCESS) {
    goto _end;
  }

  size_t num = taosArrayGetSize(aBlockIdx);
  if (num == 0) {
    goto _end;  // nothing in this file; code is TSDB_CODE_SUCCESS
  }

  int64_t et1 = taosGetTimestampUs();

  for (size_t i = 0; i < num; ++i) {
    SBlockIdx* pBlockIdx = (SBlockIdx*)taosArrayGet(aBlockIdx, i);

    // uid check
    if (pBlockIdx->suid != pReader->suid) {
      continue;
    }

    // this block belongs to a table that is not queried.
    STableBlockScanInfo* pScanInfo = taosHashGet(pReader->status.pTableMap, &pBlockIdx->uid, sizeof(uint64_t));
    if (pScanInfo == NULL) {
      continue;
    }

    if (pScanInfo->pBlockList == NULL) {
      pScanInfo->pBlockList = taosArrayInit(4, sizeof(int32_t));
      if (pScanInfo->pBlockList == NULL) {
        code = TSDB_CODE_OUT_OF_MEMORY;
        goto _end;
      }
    }

    if (taosArrayPush(pIndexList, pBlockIdx) == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto _end;
    }
  }

  int64_t et2 = taosGetTimestampUs();
  tsdbDebug("load block index for %d tables completed, elapsed time:%.2f ms, set blockIdx:%.2f ms, size:%.2f Kb %s",
            (int32_t)num, (et1 - st) / 1000.0, (et2 - et1) / 1000.0, num * sizeof(SBlockIdx) / 1024.0, pReader->idStr);

  pReader->cost.headFileLoadTime += (et1 - st) / 1000.0;

_end:
  taosArrayDestroy(aBlockIdx);
  return code;
}
H
Hongze Cheng 已提交
573

574
/* Before the block index of a new file is loaded, drop every table's per-file block information: the block map
 * data and the list of qualified blocks. */
static void cleanupTableScanInfo(SHashObj* pTableMap) {
  for (STableBlockScanInfo* pInfo = taosHashIterate(pTableMap, NULL); pInfo != NULL;
       pInfo = taosHashIterate(pTableMap, pInfo)) {
    tMapDataClear(&pInfo->mapData);
    taosArrayClear(pInfo->pBlockList);
  }
}

588
/*
 * For each table in pIndexList, read its block map from the current data file and append to
 * pScanInfo->pBlockList the index of every block whose key range overlaps the query window and whose version
 * range overlaps the query version range.
 *
 * pBlockNum->numOfBlocks is incremented once per qualified block. pBlockNum->numOfLastFiles is set to the number
 * of stt files of the file set.
 *
 * Returns TSDB_CODE_SUCCESS, the error of tsdbReadBlock, or TSDB_CODE_OUT_OF_MEMORY.
 */
static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, SBlockNumber* pBlockNum) {
  int32_t numOfQTable = 0;
  size_t  sizeInDisk = 0;
  size_t  numOfTables = taosArrayGetSize(pIndexList);

  int64_t st = taosGetTimestampUs();
  cleanupTableScanInfo(pReader->status.pTableMap);

  for (size_t i = 0; i < numOfTables; ++i) {
    SBlockIdx*           pBlockIdx = taosArrayGet(pIndexList, i);
    STableBlockScanInfo* pScanInfo = taosHashGet(pReader->status.pTableMap, &pBlockIdx->uid, sizeof(int64_t));

    tMapDataReset(&pScanInfo->mapData);
    int32_t code = tsdbReadBlock(pReader->pFileReader, pBlockIdx, &pScanInfo->mapData);
    if (code != TSDB_CODE_SUCCESS) {
      tsdbError("%p failed to load block info of table uid:%" PRId64 ", code:%s %s", pReader, pBlockIdx->uid,
                tstrerror(code), pReader->idStr);
      return code;
    }

    sizeInDisk += pScanInfo->mapData.nData;
    for (int32_t j = 0; j < pScanInfo->mapData.nItem; ++j) {
      SDataBlk block = {0};
      tMapDataGetItemByIdx(&pScanInfo->mapData, j, &block, tGetDataBlk);

      // 1. time range check
      if (block.minKey.ts > pReader->window.ekey || block.maxKey.ts < pReader->window.skey) {
        continue;
      }

      // 2. version range check
      if (block.minVer > pReader->verRange.maxVer || block.maxVer < pReader->verRange.minVer) {
        continue;
      }

      if (taosArrayPush(pScanInfo->pBlockList, &j) == NULL) {
        tMapDataClear(&pScanInfo->mapData);
        return TSDB_CODE_OUT_OF_MEMORY;
      }

      pBlockNum->numOfBlocks += 1;
    }

    if (pScanInfo->pBlockList != NULL && taosArrayGetSize(pScanInfo->pBlockList) > 0) {
      numOfQTable += 1;
    }
  }

  pBlockNum->numOfLastFiles = pReader->pFileReader->pSet->nSttF;
  int32_t total = pBlockNum->numOfLastFiles + pBlockNum->numOfBlocks;

  double el = (taosGetTimestampUs() - st) / 1000.0;
  tsdbDebug(
      "load block of %d tables completed, blocks:%d in %d tables, last-files:%d, block-info-size:%.2f Kb, elapsed "
      "time:%.2f ms %s",
      (int32_t)numOfTables, pBlockNum->numOfBlocks, numOfQTable, pBlockNum->numOfLastFiles, sizeInDisk / 1024.0, el,
      pReader->idStr);

  pReader->cost.numOfBlocks += total;
  pReader->cost.headFileLoadTime += el;

  return TSDB_CODE_SUCCESS;
}
H
Hongze Cheng 已提交
648

649
/* Flag the current file block as fully dumped and set lastKey one step beyond `maxKey` in the traverse order. */
static void setBlockAllDumped(SFileBlockDumpInfo* pDumpInfo, int64_t maxKey, int32_t order) {
  pDumpInfo->allDumped = true;
  pDumpInfo->lastKey = ASCENDING_TRAVERSE(order) ? (maxKey + 1) : (maxKey - 1);
}

655 656
/*
 * Append pColVal as row `rowIndex` of pColInfoData. NULL/NONE values are appended as NULL.
 * A variable-length value is first assembled as var-data in the column's scratch buffer
 * pSup->buildBuf[colIndex], and that buffer is then appended.
 */
static void doCopyColVal(SColumnInfoData* pColInfoData, int32_t rowIndex, int32_t colIndex, SColVal* pColVal,
                         SBlockLoadSuppInfo* pSup) {
  bool isNullVal = pColVal->isNull || pColVal->isNone;

  if (!IS_VAR_DATA_TYPE(pColVal->type)) {
    colDataAppend(pColInfoData, rowIndex, (const char*)&pColVal->value, isNullVal);
    return;
  }

  if (isNullVal) {
    colDataAppendNULL(pColInfoData, rowIndex);
    return;
  }

  char* pBuf = pSup->buildBuf[colIndex];
  varDataSetLen(pBuf, pColVal->value.nData);
  ASSERT(pColVal->value.nData <= pColInfoData->info.bytes);
  memcpy(varDataVal(pBuf), pColVal->value.pData, pColVal->value.nData);
  colDataAppend(pColInfoData, rowIndex, pBuf, false);
}

671
/* Return the block-info entry at the iterator's current index, or NULL when the block list is empty. */
static SFileDataBlockInfo* getCurrentBlockInfo(SDataBlockIter* pBlockIter) {
  size_t numOfItems = taosArrayGetSize(pBlockIter->blockList);
  if (numOfItems != 0) {
    return taosArrayGet(pBlockIter->blockList, pBlockIter->index);
  }

  ASSERT(pBlockIter->numOfBlocks == numOfItems);
  return NULL;
}

H
Hongze Cheng 已提交
681
/* The SDataBlk the iterator currently points at (stored inside the iterator). */
static SDataBlk* getCurrentBlock(SDataBlockIter* pBlockIter) {
  SDataBlk* pCurrent = &pBlockIter->block;
  return pCurrent;
}
682

683
/*
 * Copy rows of the loaded file block (pReader->status.fileBlockData) into pReader->pResBlock.
 * Copying starts at pDumpInfo->rowIndex, walks in the reader's order, and takes at most pReader->capacity rows.
 * Output columns missing from the file block are filled with NULL.
 *
 * pDumpInfo->rowIndex is advanced past the copied rows. The block is marked as completely dumped only after its
 * last row (in traverse order) has been copied. Otherwise the remaining rows stay available for the next call.
 */
static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo) {
  SReaderStatus*  pStatus = &pReader->status;
  SDataBlockIter* pBlockIter = &pStatus->blockIter;

  SBlockData*         pBlockData = &pStatus->fileBlockData;
  SFileDataBlockInfo* pFBlock = getCurrentBlockInfo(pBlockIter);
  SDataBlk*           pBlock = getCurrentBlock(pBlockIter);
  SSDataBlock*        pResBlock = pReader->pResBlock;
  int32_t             numOfOutputCols = blockDataGetNumOfCols(pResBlock);

  SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo;
  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;

  SColVal cv = {0};
  int64_t st = taosGetTimestampUs();
  bool    asc = ASCENDING_TRAVERSE(pReader->order);
  int32_t step = asc ? 1 : -1;

  // Rows to copy in this call: everything left in traverse order, bounded by the output capacity.
  // The copy loops below count copied rows instead of comparing the row index with an end index, so the same
  // loop also works when walking backwards (descending order).
  int32_t remain = asc ? (pBlockData->nRow - pDumpInfo->rowIndex) : (pDumpInfo->rowIndex + 1);
  if (remain > pReader->capacity) {
    remain = pReader->capacity;
  }

  int32_t          i = 0;
  int32_t          rowIndex = 0;
  SColumnInfoData* pColData = taosArrayGet(pResBlock->pDataBlock, i);
  if (pColData->info.colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
    for (int32_t j = pDumpInfo->rowIndex; rowIndex < remain; j += step) {
      colDataAppend(pColData, rowIndex++, (const char*)&pBlockData->aTSKEY[j], false);
    }
    i += 1;
  }

  int32_t colIndex = 0;
  int32_t num = taosArrayGetSize(pBlockData->aIdx);
  while (i < numOfOutputCols && colIndex < num) {
    rowIndex = 0;
    pColData = taosArrayGet(pResBlock->pDataBlock, i);

    SColData* pData = tBlockDataGetColDataByIdx(pBlockData, colIndex);
    if (pData->cid < pColData->info.colId) {
      colIndex += 1;
    } else if (pData->cid == pColData->info.colId) {
      for (int32_t j = pDumpInfo->rowIndex; rowIndex < remain; j += step) {
        tColDataGetValue(pData, j, &cv);
        doCopyColVal(pColData, rowIndex++, i, &cv, pSupInfo);
      }
      colIndex += 1;
      i += 1;
      ASSERT(rowIndex == remain);
    } else {  // the specified column does not exist in file block, fill with null data
      colDataAppendNNULL(pColData, 0, remain);
      i += 1;
    }
  }

  while (i < numOfOutputCols) {
    pColData = taosArrayGet(pResBlock->pDataBlock, i);
    colDataAppendNNULL(pColData, 0, remain);
    i += 1;
  }

  pResBlock->info.rows = remain;
  pDumpInfo->rowIndex += step * remain;

  // Only a block whose cursor has left it is finished. When the output capacity cut the copy short, the rest of the
  // rows must not be dropped. lastKey goes just past the block boundary in traverse order (maxKey when ascending,
  // minKey when descending).
  if (pDumpInfo->rowIndex < 0 || pDumpInfo->rowIndex >= pBlockData->nRow) {
    int64_t boundaryKey = asc ? pBlock->maxKey.ts : pBlock->minKey.ts;
    setBlockAllDumped(pDumpInfo, boundaryKey, pReader->order);
  }

  double elapsedTime = (taosGetTimestampUs() - st) / 1000.0;
  pReader->cost.blockLoadTime += elapsedTime;

  int32_t unDumpedRows = asc ? pBlock->nRow - pDumpInfo->rowIndex : pDumpInfo->rowIndex + 1;
  tsdbDebug("%p copy file block to sdatablock, global index:%d, table index:%d, brange:%" PRId64 "-%" PRId64
            ", rows:%d, remain:%d, minVer:%" PRId64 ", maxVer:%" PRId64 ", elapsed time:%.2f ms, %s",
            pReader, pBlockIter->index, pFBlock->tbBlockIdx, pBlock->minKey.ts, pBlock->maxKey.ts, remain, unDumpedRows,
            pBlock->minVer, pBlock->maxVer, elapsedTime, pReader->idStr);

  return TSDB_CODE_SUCCESS;
}

767
/**
 * Read the data of the file block the iterator currently points at into pBlockData.
 *
 * On success the elapsed time is added to pReader->cost.blockLoadTime and the dump state
 * (pReader->status.fBlockDumpInfo.allDumped) is reset so the freshly loaded rows are
 * considered not yet dumped.
 *
 * @return TSDB_CODE_SUCCESS, or the error code returned by tsdbReadDataBlock (logged).
 */
static int32_t doLoadFileBlockData(STsdbReader* pReader, SDataBlockIter* pBlockIter, SBlockData* pBlockData) {
  int64_t startUs = taosGetTimestampUs();

  SFileDataBlockInfo* pInfo = getCurrentBlockInfo(pBlockIter);
  SFileBlockDumpInfo* pDump = &pReader->status.fBlockDumpInfo;
  ASSERT(pInfo != NULL);

  SDataBlk* pBlk = getCurrentBlock(pBlockIter);

  int32_t ret = tsdbReadDataBlock(pReader->pFileReader, pBlk, pBlockData);
  if (ret != TSDB_CODE_SUCCESS) {
    tsdbError("%p error occurs in loading file block, global index:%d, table index:%d, brange:%" PRId64 "-%" PRId64
              ", rows:%d, code:%s %s",
              pReader, pBlockIter->index, pInfo->tbBlockIdx, pBlk->minKey.ts, pBlk->maxKey.ts, pBlk->nRow,
              tstrerror(ret), pReader->idStr);
    return ret;
  }

  double costMs = (taosGetTimestampUs() - startUs) / 1000.0;

  tsdbDebug("%p load file block into buffer, global index:%d, index in table block list:%d, brange:%" PRId64 "-%" PRId64
            ", rows:%d, minVer:%" PRId64 ", maxVer:%" PRId64 ", elapsed time:%.2f ms, %s",
            pReader, pBlockIter->index, pInfo->tbBlockIdx, pBlk->minKey.ts, pBlk->maxKey.ts, pBlk->nRow,
            pBlk->minVer, pBlk->maxVer, costMs, pReader->idStr);

  pReader->cost.blockLoadTime += costMs;
  pDump->allDumped = false;

  return TSDB_CODE_SUCCESS;
}
H
Hongze Cheng 已提交
796

H
Haojun Liao 已提交
797 798 799
/*
 * Release every buffer owned by the block-order supporter: the per-table block counters,
 * the per-table cursors, each of the first numOfTables per-table wrapper arrays, and finally
 * the outer array of wrapper pointers. All freed pointers are reset to NULL.
 */
static void cleanupBlockOrderSupporter(SBlockOrderSupporter* pSup) {
  taosMemoryFreeClear(pSup->numOfBlocksPerTable);
  taosMemoryFreeClear(pSup->indexPerTable);

  for (int32_t t = 0; t < pSup->numOfTables; ++t) {
    taosMemoryFreeClear(pSup->pDataBlockInfo[t]);
  }

  taosMemoryFreeClear(pSup->pDataBlockInfo);
}
H
Hongze Cheng 已提交
808

H
Haojun Liao 已提交
809 810
/*
 * Allocate the zero-filled per-table bookkeeping arrays used to order the file blocks of
 * numOfTables tables (block counts, cursors and per-table wrapper pointers).
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY after releasing whatever was
 * allocated if any allocation fails.
 */
static int32_t initBlockOrderSupporter(SBlockOrderSupporter* pSup, int32_t numOfTables) {
  ASSERT(numOfTables >= 1);

  pSup->numOfBlocksPerTable = taosMemoryCalloc(1, sizeof(int32_t) * numOfTables);
  pSup->indexPerTable = taosMemoryCalloc(1, sizeof(int32_t) * numOfTables);
  pSup->pDataBlockInfo = taosMemoryCalloc(1, POINTER_BYTES * numOfTables);

  bool allAllocated =
      (pSup->numOfBlocksPerTable != NULL) && (pSup->indexPerTable != NULL) && (pSup->pDataBlockInfo != NULL);
  if (allAllocated) {
    return TSDB_CODE_SUCCESS;
  }

  cleanupBlockOrderSupporter(pSup);
  return TSDB_CODE_OUT_OF_MEMORY;
}
H
Hongze Cheng 已提交
823

H
Haojun Liao 已提交
824
static int32_t fileDataBlockOrderCompar(const void* pLeft, const void* pRight, void* param) {
825
  int32_t leftIndex = *(int32_t*)pLeft;
H
Haojun Liao 已提交
826
  int32_t rightIndex = *(int32_t*)pRight;
H
Hongze Cheng 已提交
827

H
Haojun Liao 已提交
828
  SBlockOrderSupporter* pSupporter = (SBlockOrderSupporter*)param;
H
Hongze Cheng 已提交
829

H
Haojun Liao 已提交
830 831
  int32_t leftTableBlockIndex = pSupporter->indexPerTable[leftIndex];
  int32_t rightTableBlockIndex = pSupporter->indexPerTable[rightIndex];
H
Hongze Cheng 已提交
832

H
Haojun Liao 已提交
833 834 835 836 837 838 839
  if (leftTableBlockIndex > pSupporter->numOfBlocksPerTable[leftIndex]) {
    /* left block is empty */
    return 1;
  } else if (rightTableBlockIndex > pSupporter->numOfBlocksPerTable[rightIndex]) {
    /* right block is empty */
    return -1;
  }
H
Hongze Cheng 已提交
840

841
  SBlockOrderWrapper* pLeftBlock = &pSupporter->pDataBlockInfo[leftIndex][leftTableBlockIndex];
H
Haojun Liao 已提交
842
  SBlockOrderWrapper* pRightBlock = &pSupporter->pDataBlockInfo[rightIndex][rightTableBlockIndex];
H
Hongze Cheng 已提交
843

844 845 846 847
  return pLeftBlock->offset > pRightBlock->offset ? 1 : -1;
}

/*
 * Decode the SDataBlk referenced by the iterator's current position into pBlockIter->block.
 * The entry's uid selects the table scan info in pBlockIter->pTableMap, and its tbBlockIdx
 * selects the position in that table's block list. Nothing happens when the iterator has no
 * current block. Always returns TSDB_CODE_SUCCESS.
 */
static int32_t doSetCurrentBlock(SDataBlockIter* pBlockIter) {
  SFileDataBlockInfo* pCur = getCurrentBlockInfo(pBlockIter);
  if (pCur == NULL) {
    return TSDB_CODE_SUCCESS;
  }

  STableBlockScanInfo* pScanInfo = taosHashGet(pBlockIter->pTableMap, &pCur->uid, sizeof(pCur->uid));
  int32_t*             pMapIdx = taosArrayGet(pScanInfo->pBlockList, pCur->tbBlockIdx);
  tMapDataGetItemByIdx(&pScanInfo->mapData, *pMapIdx, &pBlockIter->block, tGetDataBlk);

  return TSDB_CODE_SUCCESS;
}
H
Hongze Cheng 已提交
861

862
/**
 * Build the access order of the file blocks of all tables in pReader->status.pTableMap.
 *
 * Every block of every table is collected together with its on-disk offset (offset of its
 * first sub-block). The per-table lists are then k-way merged by offset, so blocks are
 * visited in file order. When only one table has blocks, its list is used as-is. The
 * resulting (uid, tbBlockIdx) pairs are stored in pBlockIter->blockList. The iterator is
 * positioned on the first block for the scan order (index 0 for ascending, the last index
 * for descending).
 *
 * @param pReader     reader providing the table map and the scan order
 * @param pBlockIter  iterator to (re)initialize
 * @param numOfBlocks total number of blocks across all tables (asserted against the count)
 * @return TSDB_CODE_SUCCESS, or an error code when an allocation or the merge tree fails
 */
static int32_t initBlockIterator(STsdbReader* pReader, SDataBlockIter* pBlockIter, int32_t numOfBlocks) {
  bool asc = ASCENDING_TRAVERSE(pReader->order);

  pBlockIter->numOfBlocks = numOfBlocks;
  taosArrayClear(pBlockIter->blockList);
  pBlockIter->pTableMap = pReader->status.pTableMap;

  // access data blocks according to the offset of each block in asc/desc order.
  int32_t numOfTables = (int32_t)taosHashGetSize(pReader->status.pTableMap);

  int64_t st = taosGetTimestampUs();

  SBlockOrderSupporter sup = {0};
  int32_t              code = initBlockOrderSupporter(&sup, numOfTables);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  int32_t cnt = 0;
  void*   ptr = NULL;
  while ((ptr = taosHashIterate(pReader->status.pTableMap, ptr)) != NULL) {
    STableBlockScanInfo* pTableScanInfo = (STableBlockScanInfo*)ptr;
    if (pTableScanInfo->pBlockList == NULL || taosArrayGetSize(pTableScanInfo->pBlockList) == 0) {
      continue;
    }

    size_t num = taosArrayGetSize(pTableScanInfo->pBlockList);
    sup.numOfBlocksPerTable[sup.numOfTables] = (int32_t)num;

    SBlockOrderWrapper* pWrappers = taosMemoryMalloc(sizeof(SBlockOrderWrapper) * num);
    if (pWrappers == NULL) {
      // the hash iteration is abandoned before it returned NULL, so release the iterator
      taosHashCancelIterate(pReader->status.pTableMap, ptr);
      cleanupBlockOrderSupporter(&sup);
      return TSDB_CODE_TDB_OUT_OF_MEMORY;
    }

    sup.pDataBlockInfo[sup.numOfTables] = pWrappers;

    SDataBlk block = {0};
    for (size_t k = 0; k < num; ++k) {
      int32_t* mapDataIndex = taosArrayGet(pTableScanInfo->pBlockList, k);
      tMapDataGetItemByIdx(&pTableScanInfo->mapData, *mapDataIndex, &block, tGetDataBlk);

      pWrappers[k] = (SBlockOrderWrapper){.uid = pTableScanInfo->uid, .offset = block.aSubBlock[0].offset};
      cnt++;
    }

    sup.numOfTables += 1;
  }

  ASSERT(numOfBlocks == cnt);

  // since there is only one table qualified, blocks are not sorted
  if (sup.numOfTables == 1) {
    for (int32_t i = 0; i < numOfBlocks; ++i) {
      SFileDataBlockInfo blockInfo = {.uid = sup.pDataBlockInfo[0][i].uid, .tbBlockIdx = i};
      if (taosArrayPush(pBlockIter->blockList, &blockInfo) == NULL) {
        cleanupBlockOrderSupporter(&sup);
        return TSDB_CODE_OUT_OF_MEMORY;
      }
    }

    int64_t et = taosGetTimestampUs();
    tsdbDebug("%p create blocks info struct completed for one table, %d blocks not sorted, elapsed time:%.2f ms %s",
              pReader, numOfBlocks, (et - st) / 1000.0, pReader->idStr);

    pBlockIter->index = asc ? 0 : (numOfBlocks - 1);
    cleanupBlockOrderSupporter(&sup);
    doSetCurrentBlock(pBlockIter);
    return TSDB_CODE_SUCCESS;
  }

  tsdbDebug("%p create data blocks info struct completed, %d blocks in %d tables %s", pReader, cnt, sup.numOfTables,
            pReader->idStr);

  ASSERT(cnt <= numOfBlocks && sup.numOfTables <= numOfTables);

  SMultiwayMergeTreeInfo* pTree = NULL;
  // keep the full int32_t result: narrowing it (e.g. to uint8_t) could turn a non-zero
  // error code into 0 and make a failed creation look successful
  code = tMergeTreeCreate(&pTree, sup.numOfTables, &sup, fileDataBlockOrderCompar);
  if (code != TSDB_CODE_SUCCESS) {
    cleanupBlockOrderSupporter(&sup);
    return code;
  }

  int32_t numOfTotal = 0;
  while (numOfTotal < cnt) {
    int32_t pos = tMergeTreeGetChosenIndex(pTree);
    int32_t index = sup.indexPerTable[pos]++;

    SFileDataBlockInfo blockInfo = {.uid = sup.pDataBlockInfo[pos][index].uid, .tbBlockIdx = index};
    if (taosArrayPush(pBlockIter->blockList, &blockInfo) == NULL) {
      cleanupBlockOrderSupporter(&sup);
      taosMemoryFree(pTree);
      return TSDB_CODE_OUT_OF_MEMORY;
    }

    // set data block index overflow, in order to disable the offset comparator
    if (sup.indexPerTable[pos] >= sup.numOfBlocksPerTable[pos]) {
      sup.indexPerTable[pos] = sup.numOfBlocksPerTable[pos] + 1;
    }

    numOfTotal += 1;
    tMergeTreeAdjust(pTree, tMergeTreeGetAdjustIndex(pTree));
  }

  int64_t et = taosGetTimestampUs();
  tsdbDebug("%p %d data blocks access order completed, elapsed time:%.2f ms %s", pReader, numOfBlocks,
            (et - st) / 1000.0, pReader->idStr);
  cleanupBlockOrderSupporter(&sup);
  taosMemoryFree(pTree);

  pBlockIter->index = asc ? 0 : (numOfBlocks - 1);
  doSetCurrentBlock(pBlockIter);

  return TSDB_CODE_SUCCESS;
}
H
Hongze Cheng 已提交
979

H
Haojun Liao 已提交
980
/*
 * Advance the block iterator by one position in scan order (forward for ascending,
 * backward for descending) and decode the new current block.
 * Returns false, leaving the iterator untouched, when it already sits on the last block
 * for that order.
 */
static bool blockIteratorNext(SDataBlockIter* pBlockIter) {
  if (ASCENDING_TRAVERSE(pBlockIter->order)) {
    if (pBlockIter->index >= pBlockIter->numOfBlocks - 1) {
      return false;
    }
    pBlockIter->index += 1;
  } else {
    if (pBlockIter->index <= 0) {
      return false;
    }
    pBlockIter->index -= 1;
  }

  doSetCurrentBlock(pBlockIter);
  return true;
}

994 995 996
/**
 * This is an two rectangles overlap cases.
 */
H
Hongze Cheng 已提交
997
static int32_t dataBlockPartiallyRequired(STimeWindow* pWindow, SVersionRange* pVerRange, SDataBlk* pBlock) {
998 999
  return (pWindow->ekey < pBlock->maxKey.ts && pWindow->ekey >= pBlock->minKey.ts) ||
         (pWindow->skey > pBlock->minKey.ts && pWindow->skey <= pBlock->maxKey.ts) ||
H
Hongze Cheng 已提交
1000 1001
         (pVerRange->minVer > pBlock->minVer && pVerRange->minVer <= pBlock->maxVer) ||
         (pVerRange->maxVer < pBlock->maxVer && pVerRange->maxVer >= pBlock->minVer);
H
Haojun Liao 已提交
1002
}
H
Hongze Cheng 已提交
1003

H
Hongze Cheng 已提交
1004 1005
/**
 * Return a heap-allocated copy of the block that follows pFBlockInfo within the same table
 * in scan order (next block for ascending, previous block for descending).
 *
 * @param pFBlockInfo          the current block (tbBlockIdx is its position in the table list)
 * @param pTableBlockScanInfo  scan info of the table owning the block
 * @param nextIndex            out: position of the neighbor in the table's block list
 * @param order                scan order (TSDB_ORDER_ASC / TSDB_ORDER_DESC)
 * @return the neighbor block (caller releases it with taosMemoryFree), or NULL when the
 *         current block is the first/last one for the scan order
 *
 * NOTE(review): the taosMemoryCalloc result is not checked; on allocation failure
 * tMapDataGetItemByIdx would write through NULL.
 */
static SDataBlk* getNeighborBlockOfSameTable(SFileDataBlockInfo* pFBlockInfo, STableBlockScanInfo* pTableBlockScanInfo,
                                             int32_t* nextIndex, int32_t order) {
  bool    asc = ASCENDING_TRAVERSE(order);
  int64_t numOfBlocks = (int64_t)taosArrayGetSize(pTableBlockScanInfo->pBlockList);

  // compare in signed arithmetic: "size - 1" on an unsigned size wraps around for an empty
  // list and would let an out-of-range neighbor index through
  if (asc && pFBlockInfo->tbBlockIdx >= numOfBlocks - 1) {
    return NULL;
  }

  if (!asc && pFBlockInfo->tbBlockIdx <= 0) {
    return NULL;
  }

  int32_t step = asc ? 1 : -1;
  *nextIndex = pFBlockInfo->tbBlockIdx + step;

  SDataBlk* pBlock = taosMemoryCalloc(1, sizeof(SDataBlk));
  int32_t*  indexInMapdata = taosArrayGet(pTableBlockScanInfo->pBlockList, *nextIndex);

  tMapDataGetItemByIdx(&pTableBlockScanInfo->mapData, *indexInMapdata, pBlock, tGetDataBlk);
  return pBlock;
}

/*
 * Search pBlockIter->blockList for the entry describing the same table block as
 * pFBlockInfo (same uid and tbBlockIdx). The search starts at the iterator's current
 * position and moves in scan order. Returns the position found. Not finding the entry is
 * treated as a logic error (ASSERT) and yields -1.
 */
static int32_t findFileBlockInfoIndex(SDataBlockIter* pBlockIter, SFileDataBlockInfo* pFBlockInfo) {
  ASSERT(pBlockIter != NULL && pFBlockInfo != NULL);

  int32_t step = ASCENDING_TRAVERSE(pBlockIter->order) ? 1 : -1;

  for (int32_t pos = pBlockIter->index; pos >= 0 && pos < pBlockIter->numOfBlocks; pos += step) {
    SFileDataBlockInfo* pCand = taosArrayGet(pBlockIter->blockList, pos);
    bool                sameBlock = (pCand->uid == pFBlockInfo->uid) && (pCand->tbBlockIdx == pFBlockInfo->tbBlockIdx);
    if (sameBlock) {
      return pos;
    }
  }

  ASSERT(0);
  return -1;
}

1044
/*
 * Move the block-list entry at position `index` to the iterator's next position
 * (current index + step), advance the iterator onto it, and decode it as the current block.
 * Returns -1 without touching the iterator when `index` is outside [0, numOfBlocks);
 * otherwise returns TSDB_CODE_SUCCESS.
 */
static int32_t setFileBlockActiveInBlockIter(SDataBlockIter* pBlockIter, int32_t index, int32_t step) {
  bool outOfRange = (index < 0) || (index >= pBlockIter->numOfBlocks);
  if (outOfRange) {
    return -1;
  }

  // copy the entry by value: removing it below shifts the array contents
  SFileDataBlockInfo target = *(SFileDataBlockInfo*)taosArrayGet(pBlockIter->blockList, index);

  int32_t dest = pBlockIter->index + step;
  pBlockIter->index = dest;

  if (index != dest) {
    taosArrayRemove(pBlockIter->blockList, index);
    taosArrayInsert(pBlockIter->blockList, dest, &target);

    SFileDataBlockInfo* pMoved = taosArrayGet(pBlockIter->blockList, dest);
    ASSERT(pMoved->uid == target.uid && pMoved->tbBlockIdx == target.tbBlockIdx);
  }

  doSetCurrentBlock(pBlockIter);
  return TSDB_CODE_SUCCESS;
}

H
Hongze Cheng 已提交
1064
/*
 * Report whether the current block shares its boundary timestamp with the adjacent block
 * of the same table in scan order:
 *   - ascending:  the current block's maxKey equals the neighbor's minKey;
 *   - descending: the current block's minKey equals the neighbor's maxKey.
 */
static bool overlapWithNeighborBlock(SDataBlk* pBlock, SDataBlk* pNeighbor, int32_t order) {
  return ASCENDING_TRAVERSE(order) ? (pBlock->maxKey.ts == pNeighbor->minKey.ts)
                                   : (pBlock->minKey.ts == pNeighbor->maxKey.ts);
}
H
Hongze Cheng 已提交
1072

H
Hongze Cheng 已提交
1073
/*
 * Report whether `key` lies at or before the file block in scan order:
 *   - ascending:  key.ts <= the block's minKey;
 *   - descending: key.ts >= the block's maxKey.
 * A key equal to TSKEY_INITIAL_VAL never qualifies.
 */
static bool bufferDataInFileBlockGap(int32_t order, TSDBKEY key, SDataBlk* pBlock) {
  if (key.ts == TSKEY_INITIAL_VAL) {
    return false;
  }

  if (ASCENDING_TRAVERSE(order)) {
    return key.ts <= pBlock->minKey.ts;
  }

  return key.ts >= pBlock->maxKey.ts;
}
H
Hongze Cheng 已提交
1079

H
Hongze Cheng 已提交
1080
/*
 * True when key.ts lies within the block's [minKey, maxKey] range and the block's version
 * span [minVer, maxVer] intersects the requested version range.
 */
static bool keyOverlapFileBlock(TSDBKEY key, SDataBlk* pBlock, SVersionRange* pVerRange) {
  bool tsInBlock = (key.ts >= pBlock->minKey.ts) && (key.ts <= pBlock->maxKey.ts);
  bool verIntersects = (pBlock->maxVer >= pVerRange->minVer) && (pBlock->minVer <= pVerRange->maxVer);

  return tsInBlock && verIntersects;
}

H
Hongze Cheng 已提交
1085
/**
 * Walk the table's delete skyline, starting at position startIndex, and report whether a
 * delete record may affect rows of pBlock.
 *
 * Each skyline point is a TSDBKEY (ts, version). The result is true when:
 *   - a point inside [minKey.ts, maxKey.ts] has version >= the block's minVer; or
 *   - a point before minKey.ts with version >= minVer is followed by a point at or after
 *     minKey.ts (the final point qualifies regardless of version; any other point also
 *     needs version >= minVer).
 * The walk stops with false at the first point after maxKey.ts.
 * NOTE(review): that early exit assumes the skyline is ordered by ts — confirm with the
 * skyline builder.
 */
static bool doCheckforDatablockOverlapFrom(STableBlockScanInfo* pBlockScanInfo, const SDataBlk* pBlock,
                                           int32_t startIndex) {
  size_t num = taosArrayGetSize(pBlockScanInfo->delSkyline);

  for (int32_t i = startIndex; i < num; i += 1) {
    TSDBKEY* p = taosArrayGet(pBlockScanInfo->delSkyline, i);
    if (p->ts >= pBlock->minKey.ts && p->ts <= pBlock->maxKey.ts) {
      if (p->version >= pBlock->minVer) {
        return true;
      }
    } else if (p->ts < pBlock->minKey.ts) {  // p->ts < pBlock->minKey.ts
      if (p->version >= pBlock->minVer) {
        if (i < num - 1) {
          TSDBKEY* pnext = taosArrayGet(pBlockScanInfo->delSkyline, i + 1);
          if (i + 1 == num - 1) {  // pnext is the last point
            if (pnext->ts >= pBlock->minKey.ts) {
              return true;
            }
          } else {
            if (pnext->ts >= pBlock->minKey.ts && pnext->version >= pBlock->minVer) {
              return true;
            }
          }
        } else {  // it must be the last point
          ASSERT(p->version == 0);
        }
      }
    } else {  // (p->ts > pBlock->maxKey.ts) {
      return false;
    }
  }

  return false;
}

/* Same check as doCheckforDatablockOverlapFrom, starting at the table's fileDelIndex. */
static bool doCheckforDatablockOverlap(STableBlockScanInfo* pBlockScanInfo, const SDataBlk* pBlock) {
  return doCheckforDatablockOverlapFrom(pBlockScanInfo, pBlock, pBlockScanInfo->fileDelIndex);
}

/**
 * Report whether the table's delete skyline may remove rows of pBlock, so the block has to
 * be loaded and filtered row by row.
 *
 * Returns false without scanning when there is no skyline, it is empty, or its ts range
 * does not intersect the block's key range.
 * - Ascending order: the scan starts at fileDelIndex.
 * - Descending order: the start is first walked back from fileDelIndex (clamped into the
 *   valid range) to the last point whose ts is not greater than the block's minKey. The
 *   scan then starts there, so delete points earlier than fileDelIndex that still cover the
 *   block are examined.
 */
static bool overlapWithDelSkyline(STableBlockScanInfo* pBlockScanInfo, const SDataBlk* pBlock, int32_t order) {
  if (pBlockScanInfo->delSkyline == NULL) {
    return false;
  }

  size_t num = taosArrayGetSize(pBlockScanInfo->delSkyline);
  if (num == 0) {  // first/last lookups below would be invalid on an empty skyline
    return false;
  }

  // ts is not overlap
  TSDBKEY* pFirst = taosArrayGet(pBlockScanInfo->delSkyline, 0);
  TSDBKEY* pLast = taosArrayGetLast(pBlockScanInfo->delSkyline);
  if (pBlock->minKey.ts > pLast->ts || pBlock->maxKey.ts < pFirst->ts) {
    return false;
  }

  // version is not overlap
  if (ASCENDING_TRAVERSE(order)) {
    return doCheckforDatablockOverlap(pBlockScanInfo, pBlock);
  }

  int32_t last = (int32_t)num - 1;
  int32_t index = pBlockScanInfo->fileDelIndex;
  if (index > last) {
    index = last;
  } else if (index < 0) {
    index = 0;
  }

  // find the first point that is not greater than the minKey.ts of the data block
  while (index > 0) {
    TSDBKEY* p = taosArrayGet(pBlockScanInfo->delSkyline, index);
    if (p->ts <= pBlock->minKey.ts) {
      break;
    }
    index -= 1;
  }

  // the walked-back position must be the start of the scan; starting at fileDelIndex
  // would skip the earlier points located above
  return doCheckforDatablockOverlapFrom(pBlockScanInfo, pBlock, index);
}

1149 1150 1151 1152
// 1. the version of all rows should be less than the endVersion
// 2. current block should not overlap with next neighbor block
// 3. current timestamp should not be overlap with each other
// 4. output buffer should be large enough to hold all rows in current block
1153
// 5. delete info should not overlap with current block data
H
Hongze Cheng 已提交
1154
static bool fileBlockShouldLoad(STsdbReader* pReader, SFileDataBlockInfo* pFBlock, SDataBlk* pBlock,
1155
                                STableBlockScanInfo* pScanInfo, TSDBKEY key, SLastBlockReader* pLastBlockReader) {
H
Hongze Cheng 已提交
1156 1157
  int32_t   neighborIndex = 0;
  SDataBlk* pNeighbor = getNeighborBlockOfSameTable(pFBlock, pScanInfo, &neighborIndex, pReader->order);
1158

1159
  // overlap with neighbor
1160 1161 1162
  bool overlapWithNeighbor = false;
  if (pNeighbor) {
    overlapWithNeighbor = overlapWithNeighborBlock(pBlock, pNeighbor, pReader->order);
1163
    taosMemoryFree(pNeighbor);
1164 1165
  }

1166
  // has duplicated ts of different version in this block
L
Liu Jicong 已提交
1167 1168
  bool hasDup = (pBlock->nSubBlock == 1) ? pBlock->hasDup : true;
  bool overlapWithDel = overlapWithDelSkyline(pScanInfo, pBlock, pReader->order);
1169

1170
  // todo here we need to each key in the last files to identify if it is really overlapped with last block
1171
  // todo
1172
  bool overlapWithlastBlock = false;
1173
#if 0
H
Hongze Cheng 已提交
1174
  if (taosArrayGetSize(pLastBlockReader->pSstBlk) > 0 && (pLastBlockReader->currentBlockIndex != -1)) {
H
Hongze Cheng 已提交
1175
    SSttBlk* pSstBlk = taosArrayGet(pLastBlockReader->pSstBlk, pLastBlockReader->currentBlockIndex);
H
Hongze Cheng 已提交
1176
    overlapWithlastBlock = !(pBlock->maxKey.ts < pSstBlk->minKey || pBlock->minKey.ts > pSstBlk->maxKey);
1177
  }
1178
#endif
1179

1180 1181 1182 1183 1184 1185 1186 1187 1188 1189
  bool moreThanOutputCapacity = pBlock->nRow > pReader->capacity;
  bool partiallyRequired = dataBlockPartiallyRequired(&pReader->window, &pReader->verRange, pBlock);
  bool overlapWithKey = keyOverlapFileBlock(key, pBlock, &pReader->verRange);

  bool loadDataBlock = (overlapWithNeighbor || hasDup || partiallyRequired || overlapWithKey ||
                        moreThanOutputCapacity || overlapWithDel || overlapWithlastBlock);

  // log the reason why load the datablock for profile
  if (loadDataBlock) {
    tsdbDebug("%p uid:%" PRIu64
1190
              " need to load the datablock, overlapwithneighborblock:%d, hasDup:%d, partiallyRequired:%d, "
1191 1192 1193 1194 1195 1196
              "overlapWithKey:%d, greaterThanBuf:%d, overlapWithDel:%d, overlapWithlastBlock:%d, %s",
              pReader, pFBlock->uid, overlapWithNeighbor, hasDup, partiallyRequired, overlapWithKey,
              moreThanOutputCapacity, overlapWithDel, overlapWithlastBlock, pReader->idStr);
  }

  return loadDataBlock;
H
Haojun Liao 已提交
1197 1198
}

1199
/*
 * Fill pReader->pResBlock with rows of this table taken from the mem/imem iterators, up to
 * endKey. Afterwards the block's ts window and uid are set, the reader is flagged as having
 * a composed block, and the elapsed time is added to pReader->cost.buildmemBlock.
 *
 * Returns TSDB_CODE_SUCCESS immediately when neither iterator has data. Otherwise returns
 * the code of buildDataBlockFromBufImpl; the block metadata is updated even when that code
 * is an error.
 */
static int32_t buildDataBlockFromBuf(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo, int64_t endKey) {
  bool hasBufferedRows = pBlockScanInfo->iiter.hasVal || pBlockScanInfo->iter.hasVal;
  if (!hasBufferedRows) {
    return TSDB_CODE_SUCCESS;
  }

  SSDataBlock* pRes = pReader->pResBlock;
  int64_t      startUs = taosGetTimestampUs();

  int32_t ret = buildDataBlockFromBufImpl(pBlockScanInfo, endKey, pReader->capacity, pReader);

  blockDataUpdateTsWindow(pRes, 0);
  pRes->info.uid = pBlockScanInfo->uid;
  setComposedBlockFlag(pReader, true);

  double costMs = (taosGetTimestampUs() - startUs) / 1000.0;
  tsdbDebug("%p build data block from cache completed, elapsed time:%.2f ms, numOfRows:%d, brange:%" PRId64
            " - %" PRId64 " %s",
            pReader, costMs, pRes->info.rows, pRes->info.window.skey, pRes->info.window.ekey, pReader->idStr);

  pReader->cost.buildmemBlock += costMs;
  return ret;
}

1224 1225
/*
 * Fast path for the current file-block row (pDumpInfo->rowIndex). The row is appended to
 * the result block directly when both of these hold:
 *   - it is not the border row for the scan order (not last when ascending, not first
 *     when descending);
 *   - the timestamp of the next row in scan order differs from `key` (presumably the
 *     current row's timestamp).
 * In that case rowIndex is advanced one step and true is returned. Otherwise false is
 * returned and nothing is modified, so the caller can merge duplicates.
 */
static bool tryCopyDistinctRowFromFileBlock(STsdbReader* pReader, SBlockData* pBlockData, int64_t key,
                                            SFileBlockDumpInfo* pDumpInfo) {
  bool    asc = (pReader->order == TSDB_ORDER_ASC);
  bool    desc = (pReader->order == TSDB_ORDER_DESC);
  int32_t cur = pDumpInfo->rowIndex;

  bool hasFollowingRow = (asc && cur < pDumpInfo->totalRows - 1) || (desc && cur > 0);
  if (!hasFollowingRow) {
    return false;
  }

  int32_t step = asc ? 1 : -1;
  if (pBlockData->aTSKEY[cur + step] == key) {  // duplicated ts: merge is needed
    return false;
  }

  doAppendRowFromFileBlock(pReader->pResBlock, pReader, pBlockData, cur);
  pDumpInfo->rowIndex += step;
  return true;
}

1244 1245 1246 1247 1248 1249 1250 1251 1252 1253 1254 1255 1256 1257 1258 1259 1260 1261 1262 1263 1264 1265 1266 1267 1268 1269 1270 1271 1272 1273 1274 1275
/*
 * Advance the last-block merge tree to the next row that hasBeenDropped() does not report
 * as removed by the table's delete skyline. Uses and updates lastBlockDelIndex.
 * Returns false once the merge tree is exhausted.
 */
static bool nextRowFromLastBlocks(SLastBlockReader* pLastBlockReader, STableBlockScanInfo* pBlockScanInfo) {
  while (tMergeTreeNext(&pLastBlockReader->mergeTree)) {
    TSDBROW row = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
    TSDBKEY rowKey = TSDBROW_KEY(&row);

    bool dropped =
        hasBeenDropped(pBlockScanInfo->delSkyline, &pBlockScanInfo->lastBlockDelIndex, &rowKey, pLastBlockReader->order);
    if (!dropped) {
      return true;
    }
  }

  return false;
}

/*
 * Fast path for a last (stt) block row fRow whose timestamp is ts.
 *
 * The last-block reader is always advanced past fRow. If no row follows, or the following
 * row has a different timestamp, fRow has no duplicate: it is appended to the result block
 * and true is returned. Otherwise false is returned and the reader is left on the
 * duplicate row so the caller can merge it.
 */
static bool tryCopyDistinctRowFromSttBlock(TSDBROW* fRow, SLastBlockReader* pLastBlockReader,
                                           STableBlockScanInfo* pScanInfo, int64_t ts, STsdbReader* pReader) {
  bool hasFollowingRow = nextRowFromLastBlocks(pLastBlockReader, pScanInfo);
  bool isDistinct = !hasFollowingRow || (getCurrentKeyInLastBlock(pLastBlockReader) != ts);

  if (isDistinct) {
    doAppendRowFromFileBlock(pReader->pResBlock, pReader, fRow->pBlockData, fRow->iRow);
  }

  return isDistinct;
}

H
Haojun Liao 已提交
1276 1277 1278 1279 1280 1281
/**
 * Return the table schema matching `sversion` for rows of table `uid`.
 *
 * pReader->pSchema caches the newest schema (fetched once with version -1). pReader->pMemSchema
 * caches the schema of the most recently requested other version and is replaced when a
 * different version is requested.
 *
 * @return the matching schema (owned by the reader), or NULL when it cannot be fetched
 */
static FORCE_INLINE STSchema* doGetSchemaForTSRow(int32_t sversion, STsdbReader* pReader, uint64_t uid) {
  // always set the newest schema version in pReader->pSchema
  if (pReader->pSchema == NULL) {
    pReader->pSchema = metaGetTbTSchema(pReader->pTsdb->pVnode->pMeta, uid, -1);
  }

  if (pReader->pSchema && sversion == pReader->pSchema->version) {
    return pReader->pSchema;
  }

  if (pReader->pMemSchema != NULL && pReader->pMemSchema->version == sversion) {
    return pReader->pMemSchema;
  }

  // drop the cached schema of another version and clear the pointer, so a failed fetch below
  // cannot leave a dangling pointer cached (it would be used or freed again later)
  taosMemoryFreeClear(pReader->pMemSchema);

  int32_t code = metaGetTbTSchemaEx(pReader->pTsdb->pVnode->pMeta, pReader->suid, uid, sversion, &pReader->pMemSchema);
  if (code != TSDB_CODE_SUCCESS) {
    tsdbError("%p failed to get table schema, uid:%" PRIu64 ", sversion:%d, code:%s %s", pReader, uid, sversion,
              tstrerror(code), pReader->idStr);
    return NULL;
  }

  return pReader->pMemSchema;
}

1301
static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo, TSDBROW* pRow,
1302 1303 1304 1305 1306 1307
                                     SIterInfo* pIter, int64_t key, SLastBlockReader* pLastBlockReader) {
  SRowMerger          merge = {0};
  STSRow*             pTSRow = NULL;
  SBlockData*         pBlockData = &pReader->status.fileBlockData;
  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;

1308
  int64_t tsLast = INT64_MIN;
1309
  if (hasDataInLastBlock(pLastBlockReader)) {
1310 1311
    tsLast = getCurrentKeyInLastBlock(pLastBlockReader);
  }
1312

H
Hongze Cheng 已提交
1313 1314
  TSDBKEY k = TSDBROW_KEY(pRow);
  TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
1315

1316 1317
  int64_t minKey = 0;
  if (pReader->order == TSDB_ORDER_ASC) {
H
Hongze Cheng 已提交
1318
    minKey = INT64_MAX;  // chosen the minimum value
1319
    if (minKey > tsLast && hasDataInLastBlock(pLastBlockReader)) {
1320 1321
      minKey = tsLast;
    }
1322

1323 1324 1325
    if (minKey > k.ts) {
      minKey = k.ts;
    }
1326

1327 1328 1329 1330 1331
    if (minKey > key && pBlockData->nRow > 0) {
      minKey = key;
    }
  } else {
    minKey = INT64_MIN;
1332
    if (minKey < tsLast && hasDataInLastBlock(pLastBlockReader)) {
1333 1334 1335 1336 1337 1338 1339 1340 1341 1342
      minKey = tsLast;
    }

    if (minKey < k.ts) {
      minKey = k.ts;
    }

    if (minKey < key && pBlockData->nRow > 0) {
      minKey = key;
    }
1343 1344 1345 1346
  }

  bool init = false;

1347
  // ASC: file block ---> last block -----> imem -----> mem
H
Hongze Cheng 已提交
1348
  // DESC: mem -----> imem -----> last block -----> file block
1349 1350
  if (pReader->order == TSDB_ORDER_ASC) {
    if (minKey == key) {
1351
      init = true;
1352 1353
      tRowMergerInit(&merge, &fRow, pReader->pSchema);
      doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
1354 1355
    }

1356
    if (minKey == tsLast) {
1357
      TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
H
Haojun Liao 已提交
1358 1359 1360
      if (init) {
        tRowMerge(&merge, &fRow1);
      } else {
1361 1362 1363
        init = true;
        tRowMergerInit(&merge, &fRow1, pReader->pSchema);
      }
H
Haojun Liao 已提交
1364
      doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, &merge);
1365
    }
1366

1367
    if (minKey == k.ts) {
H
Haojun Liao 已提交
1368 1369 1370
      if (init) {
        tRowMerge(&merge, pRow);
      } else {
1371 1372 1373 1374 1375 1376 1377 1378 1379
        init = true;
        STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid);
        tRowMergerInit(&merge, pRow, pSchema);
      }
      doMergeRowsInBuf(pIter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge, pReader);
    }
  } else {
    if (minKey == k.ts) {
      init = true;
1380 1381
      STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid);
      tRowMergerInit(&merge, pRow, pSchema);
1382
      doMergeRowsInBuf(pIter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge, pReader);
1383 1384
    }

1385
    if (minKey == tsLast) {
1386
      TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
H
Haojun Liao 已提交
1387 1388 1389
      if (init) {
        tRowMerge(&merge, &fRow1);
      } else {
1390 1391 1392
        init = true;
        tRowMergerInit(&merge, &fRow1, pReader->pSchema);
      }
H
Haojun Liao 已提交
1393
      doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, &merge);
1394 1395 1396
    }

    if (minKey == key) {
H
Haojun Liao 已提交
1397 1398 1399
      if (init) {
        tRowMerge(&merge, &fRow);
      } else {
1400 1401 1402 1403 1404
        init = true;
        tRowMergerInit(&merge, &fRow, pReader->pSchema);
      }
      doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
    }
1405 1406
  }

1407 1408 1409 1410 1411
  int32_t code = tRowMergerGetRow(&merge, &pTSRow);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

1412 1413 1414 1415 1416 1417 1418
  doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo->uid);

  taosMemoryFree(pTSRow);
  tRowMergerClear(&merge);
  return TSDB_CODE_SUCCESS;
}

1419 1420 1421
/**
 * Emit the row at the current position of the last (stt) block reader, merged with its
 * duplicates, into pReader->pResBlock.
 *
 * Case 1: mergeBlockData is false, or the last-block timestamp differs from the current
 * file-block row. Only last-block rows are involved. When the row has no duplicate it is
 * copied directly. Otherwise it is merged with the following rows of the same timestamp.
 *
 * Case 2: mergeBlockData is true and both timestamps are equal. The last-block rows of that
 * timestamp are merged first, then the file-block rows of the same timestamp.
 *
 * @return TSDB_CODE_SUCCESS, or the error returned by tRowMergerGetRow
 */
static int32_t doMergeFileBlockAndLastBlock(SLastBlockReader* pLastBlockReader, STsdbReader* pReader,
                                            STableBlockScanInfo* pBlockScanInfo, SBlockData* pBlockData,
                                            bool mergeBlockData) {
  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
  int64_t             tsLastBlock = getCurrentKeyInLastBlock(pLastBlockReader);

  STSRow*    pTSRow = NULL;
  SRowMerger merge = {0};
  TSDBROW    fRow = tMergeTreeGetRow(&pLastBlockReader->mergeTree);

  // the last-block row does not coincide with a file-block row: only last-block rows involved
  if ((!mergeBlockData) || (tsLastBlock != pBlockData->aTSKEY[pDumpInfo->rowIndex])) {
    if (tryCopyDistinctRowFromSttBlock(&fRow, pLastBlockReader, pBlockScanInfo, tsLastBlock, pReader)) {
      return TSDB_CODE_SUCCESS;
    } else {
      tRowMergerInit(&merge, &fRow, pReader->pSchema);

      // the reader has been advanced onto the duplicate row of the same timestamp
      TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
      tRowMerge(&merge, &fRow1);
      doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLastBlock, &merge);

      int32_t code = tRowMergerGetRow(&merge, &pTSRow);
      if (code != TSDB_CODE_SUCCESS) {
        tRowMergerClear(&merge);  // release the merger on the error path as well
        return code;
      }

      doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo->uid);

      taosMemoryFree(pTSRow);
      tRowMergerClear(&merge);
    }
  } else {  // merge the last-block rows with the file-block rows of the same timestamp
    tRowMergerInit(&merge, &fRow, pReader->pSchema);
    doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLastBlock, &merge);
    ASSERT(mergeBlockData);

    // merge with block data if ts == key
    if (tsLastBlock == pBlockData->aTSKEY[pDumpInfo->rowIndex]) {
      doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
    }

    int32_t code = tRowMergerGetRow(&merge, &pTSRow);
    if (code != TSDB_CODE_SUCCESS) {
      tRowMergerClear(&merge);  // release the merger on the error path as well
      return code;
    }

    doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo->uid);

    taosMemoryFree(pTSRow);
    tRowMergerClear(&merge);
  }

  return TSDB_CODE_SUCCESS;
}

1474 1475
/*
 * Merge the current row of the loaded file data block with the current row of the last (stt) block reader and
 * append the result to pReader->pResBlock.
 *
 * key is the timestamp of the current data-block row (the caller passes INT64_MIN when pBlockData->nRow == 0).
 * When the data block is empty only the last block contributes rows; descending scans are delegated to
 * doMergeFileBlockAndLastBlock.
 *
 * Returns TSDB_CODE_SUCCESS, or the error reported while building the merged row.
 */
static int32_t mergeFileBlockAndLastBlock(STsdbReader* pReader, SLastBlockReader* pLastBlockReader, int64_t key,
                                          STableBlockScanInfo* pBlockScanInfo, SBlockData* pBlockData) {
  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;

  if (pBlockData->nRow <= 0) {  // only last block exists
    return doMergeFileBlockAndLastBlock(pLastBlockReader, pReader, pBlockScanInfo, NULL, false);
  }

  // no last block available, only data block exists
  if (!hasDataInLastBlock(pLastBlockReader)) {
    return mergeRowsInFileBlocks(pBlockData, pBlockScanInfo, key, pReader);
  }

  // current row of the file data block, and the current key of the last block
  TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
  int64_t ts = getCurrentKeyInLastBlock(pLastBlockReader);
  ASSERT(ts >= key);

  if (!ASCENDING_TRAVERSE(pReader->order)) {  // desc order
    return doMergeFileBlockAndLastBlock(pLastBlockReader, pReader, pBlockScanInfo, pBlockData, true);
  }

  if (key < ts) {  // imem, mem are all empty, file blocks (data blocks and last block) exist
    return mergeRowsInFileBlocks(pBlockData, pBlockScanInfo, key, pReader);
  }

  if (key != ts) {
    ASSERT(0);  // key > ts contradicts the ASSERT above
    return TSDB_CODE_SUCCESS;
  }

  // key == ts: merge every row with this timestamp from the data block and then from the last block
  STSRow*    pTSRow = NULL;
  SRowMerger merge = {0};

  tRowMergerInit(&merge, &fRow, pReader->pSchema);
  doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);

  TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
  tRowMerge(&merge, &fRow1);
  doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, ts, &merge);

  int32_t code = tRowMergerGetRow(&merge, &pTSRow);
  if (code != TSDB_CODE_SUCCESS) {
    tRowMergerClear(&merge);  // release the merger's buffers on the error path as well
    return code;
  }

  doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo->uid);

  taosMemoryFree(pTSRow);
  tRowMergerClear(&merge);
  return code;
}

1526 1527
/*
 * Merge all rows of the current table that carry the next timestamp to emit (smallest for ASC, largest for DESC)
 * across the four row sources -- mem buffer, imem buffer, last (stt) block and the loaded file data block -- into
 * a single row and append it to pReader->pResBlock.
 *
 * The caller only invokes this when both the mem and imem iterators have a value (asserted below). The file data
 * block (pBlockData->nRow may be 0) and the last block (may be exhausted) are optional.
 *
 * Merge order:
 *   ASC:  file block -> last block -> imem -> mem
 *   DESC: mem -> imem -> last block -> file block
 *
 * Returns TSDB_CODE_SUCCESS, or the error reported by tRowMergerGetRow.
 */
static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo, SBlockData* pBlockData,
                                     SLastBlockReader* pLastBlockReader) {
  SRowMerger merge = {0};
  STSRow*    pTSRow = NULL;

  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
  SArray*             pDelList = pBlockScanInfo->delSkyline;

  TSDBROW* pRow = getValidMemRow(&pBlockScanInfo->iter, pDelList, pReader);
  TSDBROW* piRow = getValidMemRow(&pBlockScanInfo->iiter, pDelList, pReader);
  ASSERT(pRow != NULL && piRow != NULL);

  // The file data block may be empty and the last block may be exhausted. Their keys are only read, and only
  // compared against minKey, when the source actually has a row. Otherwise aTSKEY[rowIndex] would be read out of
  // bounds and a garbage key could spuriously equal minKey.
  bool    hasFileRow = (pBlockData->nRow > 0);
  bool    hasLastRow = hasDataInLastBlock(pLastBlockReader);
  int64_t key = hasFileRow ? pBlockData->aTSKEY[pDumpInfo->rowIndex] : INT64_MIN;
  int64_t tsLast = hasLastRow ? getCurrentKeyInLastBlock(pLastBlockReader) : INT64_MIN;

  TSDBKEY k = TSDBROW_KEY(pRow);
  TSDBKEY ik = TSDBROW_KEY(piRow);

  // minKey: the timestamp emitted by this call -- minimum of all sources for ASC, maximum for DESC
  int64_t minKey = 0;
  if (ASCENDING_TRAVERSE(pReader->order)) {
    minKey = (k.ts < ik.ts) ? k.ts : ik.ts;
    if (hasFileRow && key < minKey) {
      minKey = key;
    }
    if (hasLastRow && tsLast < minKey) {
      minKey = tsLast;
    }
  } else {
    minKey = (k.ts > ik.ts) ? k.ts : ik.ts;
    if (hasFileRow && key > minKey) {
      minKey = key;
    }
    if (hasLastRow && tsLast > minKey) {
      minKey = tsLast;
    }
  }

  bool init = false;

  // ASC: file block -----> last block -----> imem -----> mem
  // DESC: mem -----> imem -----> last block -----> file block
  if (ASCENDING_TRAVERSE(pReader->order)) {
    if (hasFileRow && minKey == key) {
      init = true;
      TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
      tRowMergerInit(&merge, &fRow, pReader->pSchema);
      doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
    }

    if (hasLastRow && minKey == tsLast) {
      TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
      if (init) {
        tRowMerge(&merge, &fRow1);
      } else {
        init = true;
        tRowMergerInit(&merge, &fRow1, pReader->pSchema);
      }
      doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, &merge);
    }

    if (minKey == ik.ts) {
      if (init) {
        tRowMerge(&merge, piRow);
      } else {
        init = true;
        STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(piRow), pReader, pBlockScanInfo->uid);
        tRowMergerInit(&merge, piRow, pSchema);
      }
      doMergeRowsInBuf(&pBlockScanInfo->iiter, pBlockScanInfo->uid, ik.ts, pBlockScanInfo->delSkyline, &merge, pReader);
    }

    if (minKey == k.ts) {
      if (init) {
        tRowMerge(&merge, pRow);
      } else {
        init = true;
        STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid);
        tRowMergerInit(&merge, pRow, pSchema);
      }
      doMergeRowsInBuf(&pBlockScanInfo->iter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge, pReader);
    }
  } else {
    if (minKey == k.ts) {
      init = true;
      STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid);
      tRowMergerInit(&merge, pRow, pSchema);
      doMergeRowsInBuf(&pBlockScanInfo->iter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge, pReader);
    }

    if (minKey == ik.ts) {
      if (init) {
        tRowMerge(&merge, piRow);
      } else {
        init = true;
        STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(piRow), pReader, pBlockScanInfo->uid);
        tRowMergerInit(&merge, piRow, pSchema);
      }
      doMergeRowsInBuf(&pBlockScanInfo->iiter, pBlockScanInfo->uid, ik.ts, pBlockScanInfo->delSkyline, &merge, pReader);
    }

    if (hasLastRow && minKey == tsLast) {
      TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
      if (init) {
        tRowMerge(&merge, &fRow1);
      } else {
        init = true;
        tRowMergerInit(&merge, &fRow1, pReader->pSchema);
      }
      doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, &merge);
    }

    if (hasFileRow && minKey == key) {
      TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
      if (!init) {
        init = true;
        tRowMergerInit(&merge, &fRow, pReader->pSchema);
      } else {
        tRowMerge(&merge, &fRow);
      }
      doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
    }
  }

  int32_t code = tRowMergerGetRow(&merge, &pTSRow);
  if (code != TSDB_CODE_SUCCESS) {
    tRowMergerClear(&merge);  // release the merger's buffers on the error path as well
    return code;
  }

  doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo->uid);

  taosMemoryFree(pTSRow);
  tRowMergerClear(&merge);
  return code;
}

1681
/* The mem + imem + file-block merge path is implemented by doMergeMultiLevelRows (which also covers the last block). */
1825

1826 1827 1828 1829 1830 1831 1832 1833 1834 1835 1836 1837 1838 1839 1840 1841 1842 1843 1844 1845 1846 1847 1848 1849 1850
/*
 * Lazily create the mem (iter) and imem (iiter) skip-list iterators of a table and build its delete skyline.
 *
 * Iterators start at window.skey/verRange.minVer for ASC scans and at window.ekey/verRange.maxVer for DESC
 * scans (backward iteration). After the first successful call, pBlockScanInfo->iterInit makes further calls no-ops.
 *
 * Returns TSDB_CODE_SUCCESS, or the error from tsdbTbDataIterCreate.
 */
static int32_t initMemDataIterator(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader) {
  if (pBlockScanInfo->iterInit) {
    return TSDB_CODE_SUCCESS;
  }

  int32_t code = TSDB_CODE_SUCCESS;

  TSDBKEY startKey = {0};
  if (ASCENDING_TRAVERSE(pReader->order)) {
    startKey = (TSDBKEY){.ts = pReader->window.skey, .version = pReader->verRange.minVer};
  } else {
    startKey = (TSDBKEY){.ts = pReader->window.ekey, .version = pReader->verRange.maxVer};
  }

  int32_t backward = (!ASCENDING_TRAVERSE(pReader->order));

  STbData* d = NULL;
  if (pReader->pReadSnap->pMem != NULL) {
    d = tsdbGetTbDataFromMemTable(pReader->pReadSnap->pMem, pReader->suid, pBlockScanInfo->uid);
    if (d != NULL) {
      code = tsdbTbDataIterCreate(d, &startKey, backward, &pBlockScanInfo->iter.iter);
      if (code == TSDB_CODE_SUCCESS) {
        pBlockScanInfo->iter.hasVal = (tsdbTbDataIterGet(pBlockScanInfo->iter.iter) != NULL);

        tsdbDebug("%p uid:%" PRIu64 ", check data in mem from skey:%" PRId64 ", order:%d, ts range in buf:%" PRId64
                  "-%" PRId64 " %s",
                  pReader, pBlockScanInfo->uid, startKey.ts, pReader->order, d->minKey, d->maxKey, pReader->idStr);
      } else {
        // this branch handles the mem iterator (the message previously said "imem")
        tsdbError("%p uid:%" PRIu64 ", failed to create iterator for mem, code:%s, %s", pReader, pBlockScanInfo->uid,
                  tstrerror(code), pReader->idStr);
        return code;
      }
    }
  } else {
    tsdbDebug("%p uid:%" PRIu64 ", no data in mem, %s", pReader, pBlockScanInfo->uid, pReader->idStr);
  }

  STbData* di = NULL;
  if (pReader->pReadSnap->pIMem != NULL) {
    di = tsdbGetTbDataFromMemTable(pReader->pReadSnap->pIMem, pReader->suid, pBlockScanInfo->uid);
    if (di != NULL) {
      code = tsdbTbDataIterCreate(di, &startKey, backward, &pBlockScanInfo->iiter.iter);
      if (code == TSDB_CODE_SUCCESS) {
        pBlockScanInfo->iiter.hasVal = (tsdbTbDataIterGet(pBlockScanInfo->iiter.iter) != NULL);

        tsdbDebug("%p uid:%" PRIu64 ", check data in imem from skey:%" PRId64 ", order:%d, ts range in buf:%" PRId64
                  "-%" PRId64 " %s",
                  pReader, pBlockScanInfo->uid, startKey.ts, pReader->order, di->minKey, di->maxKey, pReader->idStr);
      } else {
        // this branch handles the imem iterator (the message previously said "mem")
        tsdbError("%p uid:%" PRIu64 ", failed to create iterator for imem, code:%s, %s", pReader, pBlockScanInfo->uid,
                  tstrerror(code), pReader->idStr);
        return code;
      }
    }
  } else {
    tsdbDebug("%p uid:%" PRIu64 ", no data in imem, %s", pReader, pBlockScanInfo->uid, pReader->idStr);
  }

  // NOTE(review): the return code of initDelSkylineIterator is ignored here, as before.
  initDelSkylineIterator(pBlockScanInfo, pReader, d, di);

  pBlockScanInfo->iterInit = true;
  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
1890 1891
/*
 * Decide whether the row at pDumpInfo->rowIndex of pBlockData should be returned for the table in pBlockScanInfo.
 *
 * A row is rejected when:
 *   - the block carries a per-row uid array (multi-table block) and the row belongs to another table;
 *   - its version lies outside [verRange.minVer, verRange.maxVer];
 *   - its timestamp lies outside [window.skey, window.ekey];
 *   - hasBeenDropped() reports it as deleted. That helper receives fileDelIndex by pointer and may update it, so it
 *     is only consulted after the cheaper checks pass.
 */
static bool isValidFileBlockRow(SBlockData* pBlockData, SFileBlockDumpInfo* pDumpInfo,
                                STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader) {
  bool belongsToOtherTable =
      (pBlockData->aUid != NULL) && (pBlockData->aUid[pDumpInfo->rowIndex] != pBlockScanInfo->uid);
  if (belongsToOtherTable) {
    return false;
  }

  TSDBKEY rowKey = {.ts = pBlockData->aTSKEY[pDumpInfo->rowIndex],
                    .version = pBlockData->aVersion[pDumpInfo->rowIndex]};

  bool versionInRange = (rowKey.version >= pReader->verRange.minVer) && (rowKey.version <= pReader->verRange.maxVer);
  if (!versionInRange) {
    return false;
  }

  bool tsInWindow = (rowKey.ts >= pReader->window.skey) && (rowKey.ts <= pReader->window.ekey);
  if (!tsInWindow) {
    return false;
  }

  bool dropped = hasBeenDropped(pBlockScanInfo->delSkyline, &pBlockScanInfo->fileDelIndex, &rowKey, pReader->order);
  return !dropped;
}

1919
/* True when ts falls outside the closed interval [pWindow->skey, pWindow->ekey]. */
static bool outOfTimeWindow(int64_t ts, STimeWindow* pWindow) {
  bool insideWindow = (ts >= pWindow->skey) && (ts <= pWindow->ekey);
  return !insideWindow;
}
1920

1921
/*
 * Position the last (stt) block reader on table pScanInfo->uid.
 *
 * If the reader already serves this uid nothing is done. Otherwise the merge tree of the previous table (if any) is
 * closed, the table's memory iterators are initialized, and a new merge tree is opened. The tree covers the reader's
 * window, narrowed so that the scan resumes just after pScanInfo->lastKey in scan direction.
 *
 * Returns false when tMergeTreeOpen fails. Otherwise it returns whatever nextRowFromLastBlocks returns for the
 * freshly opened tree.
 *
 * NOTE(review): the result of initMemDataIterator is ignored, and uid is recorded before tMergeTreeOpen. A failed
 * open is therefore not retried on the next call for the same uid -- confirm this is intended.
 */
static bool initLastBlockReader(SLastBlockReader* pLBlockReader, STableBlockScanInfo* pScanInfo, STsdbReader* pReader) {
  if (pLBlockReader->uid == pScanInfo->uid) {
    return true;  // already initialized for this table
  }

  if (pLBlockReader->uid != 0) {
    tMergeTreeClose(&pLBlockReader->mergeTree);  // drop the tree of the previously scanned table
  }

  initMemDataIterator(pScanInfo, pReader);
  pLBlockReader->uid = pScanInfo->uid;

  STimeWindow range = pLBlockReader->window;
  if (!ASCENDING_TRAVERSE(pLBlockReader->order)) {
    range.ekey = pScanInfo->lastKey - 1;
  } else {
    range.skey = pScanInfo->lastKey + 1;
  }

  bool    backward = (pLBlockReader->order == TSDB_ORDER_DESC);
  int32_t code = tMergeTreeOpen(&pLBlockReader->mergeTree, backward, pReader->pFileReader, pReader->suid,
                                pScanInfo->uid, &range, &pLBlockReader->verRange, pLBlockReader->pInfo,
                                pReader->idStr);

  return (code == TSDB_CODE_SUCCESS) ? nextRowFromLastBlocks(pLBlockReader, pScanInfo) : false;
}

1952
/* Timestamp of the row that the last-block merge tree currently points at. */
static int64_t getCurrentKeyInLastBlock(SLastBlockReader* pLastBlockReader) {
  TSDBROW current = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
  int64_t ts = TSDBROW_TS(&current);
  return ts;
}

H
Hongze Cheng 已提交
1957
/*
 * Report whether the last-block merge tree currently has an iterator (mergeTree.pIter != NULL). Callers treat a
 * false result as "no rows left in the last block".
 */
static bool hasDataInLastBlock(SLastBlockReader* pLastBlockReader) {
  if (pLastBlockReader->mergeTree.pIter == NULL) {
    return false;
  }
  return true;
}
1958

1959 1960
/*
 * Emit the file-block row at pDumpInfo->rowIndex (timestamp key) into pReader->pResBlock.
 *
 * If tryCopyDistinctRowFromFileBlock can copy the row directly, that is done. Otherwise every row with the same
 * timestamp in the block is merged through doMergeRowsInFileBlocks and the merged row is appended.
 *
 * Returns TSDB_CODE_SUCCESS, or the error reported by tRowMergerGetRow.
 */
int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pBlockScanInfo, int64_t key,
                              STsdbReader* pReader) {
  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;

  // captured before the copy attempt so it always refers to the row this call started on
  TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);

  if (tryCopyDistinctRowFromFileBlock(pReader, pBlockData, key, pDumpInfo)) {
    return TSDB_CODE_SUCCESS;
  }

  STSRow*    pTSRow = NULL;
  SRowMerger merge = {0};

  tRowMergerInit(&merge, &fRow, pReader->pSchema);
  doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);

  int32_t code = tRowMergerGetRow(&merge, &pTSRow);
  if (code != TSDB_CODE_SUCCESS) {
    tRowMergerClear(&merge);  // release the merger's buffers on the error path as well
    return code;
  }

  doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo->uid);

  taosMemoryFree(pTSRow);
  tRowMergerClear(&merge);
  return TSDB_CODE_SUCCESS;
}

H
Haojun Liao 已提交
1986 1987
/*
 * Produce one output row for the current table by dispatching on which in-memory sources have rows:
 *   mem and imem     -> doMergeMultiLevelRows (mem + imem + file block + last block)
 *   imem only        -> doMergeBufAndFileRows with the imem iterator
 *   mem only         -> doMergeBufAndFileRows with the mem iterator
 *   neither          -> mergeFileBlockAndLastBlock (file block + last block)
 *
 * key is the timestamp of the current file-block row, or INT64_MIN when the loaded data block is empty.
 */
static int32_t buildComposedDataBlockImpl(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo,
                                          SBlockData* pBlockData, SLastBlockReader* pLastBlockReader) {
  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;

  int64_t key = INT64_MIN;
  if (pBlockData->nRow > 0) {
    key = pBlockData->aTSKEY[pDumpInfo->rowIndex];
  }

  // getValidMemRow receives the SIterInfo by pointer and may update it, so both lookups run (mem, then imem)
  // before hasVal is inspected below.
  TSDBROW* pMemRow = getValidMemRow(&pBlockScanInfo->iter, pBlockScanInfo->delSkyline, pReader);
  TSDBROW* pIMemRow = getValidMemRow(&pBlockScanInfo->iiter, pBlockScanInfo->delSkyline, pReader);

  bool memHasRow = pBlockScanInfo->iter.hasVal;
  bool imemHasRow = pBlockScanInfo->iiter.hasVal;

  if (memHasRow && imemHasRow) {
    return doMergeMultiLevelRows(pReader, pBlockScanInfo, pBlockData, pLastBlockReader);
  }

  if (imemHasRow) {  // imem + file + last block
    return doMergeBufAndFileRows(pReader, pBlockScanInfo, pIMemRow, &pBlockScanInfo->iiter, key, pLastBlockReader);
  }

  if (memHasRow) {  // mem + file + last block
    return doMergeBufAndFileRows(pReader, pBlockScanInfo, pMemRow, &pBlockScanInfo->iter, key, pLastBlockReader);
  }

  // files data blocks + last block
  return mergeFileBlockAndLastBlock(pReader, pLastBlockReader, key, pBlockScanInfo, pBlockData);
}

2012
/*
 * Fill pReader->pResBlock with merged rows of one table, combining the loaded file data block, the last (stt)
 * block and the in-memory buffers.
 *
 * The table is the one owning the current file block, or pReader->status.pTableIter when there is no current
 * block info. Rows are produced until both the data block and the last block are exhausted, the data block is
 * fully consumed, or the result block reaches pReader->capacity.
 *
 * Returns TSDB_CODE_SUCCESS, or the first error reported while merging a row. Previously such errors were
 * silently dropped.
 */
static int32_t buildComposedDataBlock(STsdbReader* pReader) {
  SSDataBlock* pResBlock = pReader->pResBlock;

  SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(&pReader->status.blockIter);

  STableBlockScanInfo* pBlockScanInfo = NULL;
  if (pBlockInfo != NULL) {
    pBlockScanInfo = taosHashGet(pReader->status.pTableMap, &pBlockInfo->uid, sizeof(pBlockInfo->uid));
  } else {
    pBlockScanInfo = pReader->status.pTableIter;
  }

  SLastBlockReader*   pLastBlockReader = pReader->status.fileIter.pLastBlockReader;
  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
  SBlockData*         pBlockData = &pReader->status.fileBlockData;
  int32_t             step = ASCENDING_TRAVERSE(pReader->order) ? 1 : -1;

  int64_t st = taosGetTimestampUs();

  while (1) {
    // find the first qualified row in the loaded data block, marking the block dumped if none is left
    bool hasBlockData = false;
    while (pBlockData->nRow > 0) {
      if (isValidFileBlockRow(pBlockData, pDumpInfo, pBlockScanInfo, pReader)) {
        hasBlockData = true;
        break;
      }

      pDumpInfo->rowIndex += step;

      SDataBlk* pBlock = getCurrentBlock(&pReader->status.blockIter);
      if (pDumpInfo->rowIndex >= pBlock->nRow || pDumpInfo->rowIndex < 0) {
        setBlockAllDumped(pDumpInfo, pBlock->maxKey.ts, pReader->order);
        break;
      }
    }

    bool hasBlockLData = hasDataInLastBlock(pLastBlockReader);

    // no data in last block and block, no need to proceed.
    if ((hasBlockData == false) && (hasBlockLData == false)) {
      break;
    }

    // NOTE(review): when the loop above exhausts the data block while the last block still has rows,
    // buildComposedDataBlockImpl is called with pBlockData->nRow > 0 and rowIndex outside [0, nRow) -- confirm that
    // setBlockAllDumped/the callee handle this.
    int32_t code = buildComposedDataBlockImpl(pReader, pBlockScanInfo, pBlockData, pLastBlockReader);
    if (code != TSDB_CODE_SUCCESS) {
      tsdbError("%p uid:%" PRIu64 ", failed to merge rows for composed data block, code:%s, %s", pReader,
                pBlockScanInfo->uid, tstrerror(code), pReader->idStr);
      return code;
    }

    // currently loaded file data block is consumed
    if ((pBlockData->nRow > 0) && (pDumpInfo->rowIndex >= pBlockData->nRow || pDumpInfo->rowIndex < 0)) {
      SDataBlk* pBlock = getCurrentBlock(&pReader->status.blockIter);
      setBlockAllDumped(pDumpInfo, pBlock->maxKey.ts, pReader->order);
      break;
    }

    if (pResBlock->info.rows >= pReader->capacity) {
      break;
    }
  }

  pResBlock->info.uid = pBlockScanInfo->uid;
  blockDataUpdateTsWindow(pResBlock, 0);

  setComposedBlockFlag(pReader, true);
  int64_t et = taosGetTimestampUs();

  if (pResBlock->info.rows > 0) {
    tsdbDebug("%p uid:%" PRIu64 ", composed data block created, brange:%" PRId64 "-%" PRId64
              " rows:%d, elapsed time:%.2f ms %s",
              pReader, pBlockScanInfo->uid, pResBlock->info.window.skey, pResBlock->info.window.ekey,
              pResBlock->info.rows, (et - st) / 1000.0, pReader->idStr);
  }

  return TSDB_CODE_SUCCESS;
}

/* Record whether the block currently being returned was composed by merging several row sources. */
void setComposedBlockFlag(STsdbReader* pReader, bool composed) {
  SReaderStatus* pStatus = &pReader->status;
  pStatus->composedDataBlock = composed;
}

dengyihao's avatar
dengyihao 已提交
2090 2091
/*
 * Build the delete skyline for table pBlockScanInfo->uid and position the delete indexes on it.
 *
 * Delete records are collected from three places:
 *   - the del file of the read snapshot;
 *   - the records chained on the mem table data (pMemTbData);
 *   - the records chained on the imem table data (piMemTbData).
 * They are turned into pBlockScanInfo->delSkyline, which stays NULL when there are no delete records. The
 * iter/iiter/file/last-block delete indexes are then set to the first entry (ASC) or the last entry (DESC).
 *
 * Nothing is done when a skyline already exists. Returns TSDB_CODE_SUCCESS, TSDB_CODE_OUT_OF_MEMORY, or the error
 * reported while reading the del file or building the skyline.
 */
int32_t initDelSkylineIterator(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader, STbData* pMemTbData,
                               STbData* piMemTbData) {
  if (pBlockScanInfo->delSkyline != NULL) {
    return TSDB_CODE_SUCCESS;
  }

  int32_t code = TSDB_CODE_SUCCESS;
  STsdb*  pTsdb = pReader->pTsdb;

  SArray* pDelData = taosArrayInit(4, sizeof(SDelData));
  if (pDelData == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  SDelFile* pDelFile = pReader->pReadSnap->fs.pDelFile;
  if (pDelFile) {
    SDelFReader* pDelFReader = NULL;
    code = tsdbDelFReaderOpen(&pDelFReader, pDelFile, pTsdb);
    if (code != TSDB_CODE_SUCCESS) {
      goto _err;
    }

    SArray* aDelIdx = taosArrayInit(4, sizeof(SDelIdx));
    if (aDelIdx == NULL) {
      tsdbDelFReaderClose(&pDelFReader);
      code = TSDB_CODE_OUT_OF_MEMORY;  // code was still 0 here, so this failure used to be reported as success
      goto _err;
    }

    code = tsdbReadDelIdx(pDelFReader, aDelIdx);
    if (code != TSDB_CODE_SUCCESS) {
      taosArrayDestroy(aDelIdx);
      tsdbDelFReaderClose(&pDelFReader);
      goto _err;
    }

    SDelIdx  idx = {.suid = pReader->suid, .uid = pBlockScanInfo->uid};
    SDelIdx* pIdx = taosArraySearch(aDelIdx, &idx, tCmprDelIdx, TD_EQ);

    if (pIdx != NULL) {
      code = tsdbReadDelData(pDelFReader, pIdx, pDelData);
    }

    taosArrayDestroy(aDelIdx);
    tsdbDelFReaderClose(&pDelFReader);

    if (code != TSDB_CODE_SUCCESS) {
      goto _err;
    }
  }

  // append the delete records kept in memory: mem first, then imem
  for (SDelData* p = (pMemTbData != NULL) ? pMemTbData->pHead : NULL; p != NULL; p = p->pNext) {
    if (taosArrayPush(pDelData, p) == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto _err;
    }
  }

  for (SDelData* p = (piMemTbData != NULL) ? piMemTbData->pHead : NULL; p != NULL; p = p->pNext) {
    if (taosArrayPush(pDelData, p) == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto _err;
    }
  }

  if (taosArrayGetSize(pDelData) > 0) {
    pBlockScanInfo->delSkyline = taosArrayInit(4, sizeof(TSDBKEY));
    if (pBlockScanInfo->delSkyline == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto _err;
    }
    code = tsdbBuildDeleteSkyline(pDelData, 0, (int32_t)(taosArrayGetSize(pDelData) - 1), pBlockScanInfo->delSkyline);
  }

  taosArrayDestroy(pDelData);

  // ASC scans start from the first skyline entry, DESC scans from the last
  pBlockScanInfo->iter.index =
      ASCENDING_TRAVERSE(pReader->order) ? 0 : taosArrayGetSize(pBlockScanInfo->delSkyline) - 1;
  pBlockScanInfo->iiter.index = pBlockScanInfo->iter.index;
  pBlockScanInfo->fileDelIndex = pBlockScanInfo->iter.index;
  pBlockScanInfo->lastBlockDelIndex = pBlockScanInfo->iter.index;
  return code;

_err:
  taosArrayDestroy(pDelData);
  return code;
}

2172
/*
 * Return the smaller of the current valid keys of the mem (iter) and imem (iiter) buffers.
 *
 * When only one buffer has a valid row, its key is returned. When neither has one, the result is
 * {.ts = TSKEY_INITIAL_VAL}.
 *
 * Previously, a missing mem row left key.ts at TSKEY_INITIAL_VAL, and the `key.ts > k.ts` test then prevented the
 * imem key from ever replacing it. The hasKey flag fixes that case.
 *
 * NOTE(review): the minimum is taken regardless of scan order -- confirm callers only rely on this for ASC scans.
 */
static TSDBKEY getCurrentKeyInBuf(STableBlockScanInfo* pScanInfo, STsdbReader* pReader) {
  TSDBKEY key = {.ts = TSKEY_INITIAL_VAL};
  bool    hasKey = false;

  TSDBROW* pRow = getValidMemRow(&pScanInfo->iter, pScanInfo->delSkyline, pReader);
  if (pRow != NULL) {
    key = TSDBROW_KEY(pRow);
    hasKey = true;
  }

  pRow = getValidMemRow(&pScanInfo->iiter, pScanInfo->delSkyline, pReader);
  if (pRow != NULL) {
    TSDBKEY k = TSDBROW_KEY(pRow);
    if (!hasKey || key.ts > k.ts) {
      key = k;
    }
  }

  return key;
}

2190
/*
 * Advance the file-set iterator until a file set yields at least one data block or last (stt) file, as counted by
 * doLoadFileBlock into pBlockNum.
 *
 * pBlockNum is zeroed first. If filesetIteratorNext reports no further file set before such a file set is found,
 * the loop ends and the function returns TSDB_CODE_SUCCESS.
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_OUT_OF_MEMORY (previously an unchecked allocation), or the error from
 * doLoadBlockIndex / doLoadFileBlock.
 */
static int32_t moveToNextFile(STsdbReader* pReader, SBlockNumber* pBlockNum) {
  SReaderStatus* pStatus = &pReader->status;
  pBlockNum->numOfBlocks = 0;
  pBlockNum->numOfLastFiles = 0;

  size_t  numOfTables = taosHashGetSize(pReader->status.pTableMap);
  SArray* pIndexList = taosArrayInit(numOfTables, sizeof(SBlockIdx));
  if (pIndexList == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  // stops when no data files remain on disk
  while (filesetIteratorNext(&pStatus->fileIter, pReader)) {
    taosArrayClear(pIndexList);

    int32_t code = doLoadBlockIndex(pReader, pReader->pFileReader, pIndexList);
    if (code != TSDB_CODE_SUCCESS) {
      taosArrayDestroy(pIndexList);
      return code;
    }

    // only file sets with block index entries or stt files can contribute blocks
    if (taosArrayGetSize(pIndexList) > 0 || pReader->pFileReader->pSet->nSttF > 0) {
      code = doLoadFileBlock(pReader, pIndexList, pBlockNum);
      if (code != TSDB_CODE_SUCCESS) {
        taosArrayDestroy(pIndexList);
        return code;
      }

      if (pBlockNum->numOfBlocks + pBlockNum->numOfLastFiles > 0) {
        break;
      }
    }

    // no blocks in current file, try next files
  }

  taosArrayDestroy(pIndexList);
  return TSDB_CODE_SUCCESS;
}

2230
/*
 * Three-way comparator for uint64_t table uids, suitable for taosSort/qsort.
 * Returns -1, 0 or 1 when *p1 is less than, equal to, or greater than *p2.
 */
static int32_t uidComparFunc(const void* p1, const void* p2) {
  const uint64_t lhs = *(const uint64_t*)p1;
  const uint64_t rhs = *(const uint64_t*)p2;

  // (a > b) - (a < b) yields -1/0/1 without risking the wrap-around of a plain subtraction
  return (int32_t)((lhs > rhs) - (lhs < rhs));
}
2239

2240
/*
 * Copy the uid of every table in pStatus->pTableMap into pOrderCheckInfo->tableUidList and sort it ascending.
 * The caller must have sized tableUidList for at least taosHashGetSize(pStatus->pTableMap) entries.
 */
static void extractOrderedTableUidList(SUidOrderCheckInfo* pOrderCheckInfo, SReaderStatus* pStatus) {
  int32_t numOfTables = taosHashGetSize(pStatus->pTableMap);
  int32_t pos = 0;

  for (void* pEntry = taosHashIterate(pStatus->pTableMap, NULL); pEntry != NULL;
       pEntry = taosHashIterate(pStatus->pTableMap, pEntry)) {
    STableBlockScanInfo* pInfo = pEntry;
    pOrderCheckInfo->tableUidList[pos] = pInfo->uid;
    pos += 1;
  }

  taosSort(pOrderCheckInfo->tableUidList, numOfTables, sizeof(uint64_t), uidComparFunc);
}

2254
/*
 * Prepare the uid-ordered table list that drives last-block scanning and position
 * pStatus->pTableIter on the first table in that order.
 *
 * - First call: allocate and fill the sorted uid list.
 * - Later calls with pTableIter == NULL (start of a new file): rewind to index 0.
 *   If the first uid is no longer in the table map, the list is rebuilt from the map.
 * - Later calls with pTableIter != NULL: nothing to do.
 *
 * Returns TSDB_CODE_OUT_OF_MEMORY on allocation failure, otherwise TSDB_CODE_SUCCESS.
 */
static int32_t initOrderCheckInfo(SUidOrderCheckInfo* pOrderCheckInfo, SReaderStatus* pStatus) {
  int32_t total = taosHashGetSize(pStatus->pTableMap);
  if (total == 0) {
    return TSDB_CODE_SUCCESS;
  }

  if (pOrderCheckInfo->tableUidList == NULL) {
    pOrderCheckInfo->currentIndex = 0;
    pOrderCheckInfo->tableUidList = taosMemoryMalloc(total * sizeof(uint64_t));
    if (pOrderCheckInfo->tableUidList == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }

    extractOrderedTableUidList(pOrderCheckInfo, pStatus);

    uint64_t firstUid = pOrderCheckInfo->tableUidList[0];
    pStatus->pTableIter = taosHashGet(pStatus->pTableMap, &firstUid, sizeof(firstUid));
    return TSDB_CODE_SUCCESS;
  }

  if (pStatus->pTableIter != NULL) {  // a scan over the ordered list is still in progress
    return TSDB_CODE_SUCCESS;
  }

  // it is the last block of a new file: restart from the smallest uid
  pOrderCheckInfo->currentIndex = 0;
  uint64_t firstUid = pOrderCheckInfo->tableUidList[pOrderCheckInfo->currentIndex];
  pStatus->pTableIter = taosHashGet(pStatus->pTableMap, &firstUid, sizeof(firstUid));
  if (pStatus->pTableIter != NULL) {
    // NOTE(review): if the map grew while tableUidList[0] survived, the list is not resized here,
    // yet moveToNextTable bounds currentIndex by the map size -- confirm the map cannot grow.
    return TSDB_CODE_SUCCESS;
  }

  // the tableMap has already updated: rebuild the ordered uid list from it
  void* pNewList = taosMemoryRealloc(pOrderCheckInfo->tableUidList, total * sizeof(uint64_t));
  if (pNewList == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  pOrderCheckInfo->tableUidList = pNewList;
  extractOrderedTableUidList(pOrderCheckInfo, pStatus);

  firstUid = pOrderCheckInfo->tableUidList[0];
  pStatus->pTableIter = taosHashGet(pStatus->pTableMap, &firstUid, sizeof(firstUid));
  return TSDB_CODE_SUCCESS;
}

2295
/*
 * Advance to the next table in ascending uid order.
 * On success, points pStatus->pTableIter at that table's scan info and returns true.
 * Once every table has been visited, sets pTableIter to NULL and returns false.
 */
static bool moveToNextTable(SUidOrderCheckInfo* pOrderedCheckInfo, SReaderStatus* pStatus) {
  ++pOrderedCheckInfo->currentIndex;

  if (pOrderedCheckInfo->currentIndex < taosHashGetSize(pStatus->pTableMap)) {
    uint64_t nextUid = pOrderedCheckInfo->tableUidList[pOrderedCheckInfo->currentIndex];
    pStatus->pTableIter = taosHashGet(pStatus->pTableMap, &nextUid, sizeof(nextUid));
    ASSERT(pStatus->pTableIter != NULL);
    return true;
  }

  pStatus->pTableIter = NULL;
  return false;
}

2308
/*
 * Visit tables in ascending uid order and build a result block from the last (stt) file
 * blocks of the current file set.
 *
 * Returns as soon as pReader->pResBlock holds rows, when every table has been tried,
 * or when doBuildDataBlock reports an error (that error code is returned).
 */
static int32_t doLoadLastBlockSequentially(STsdbReader* pReader) {
  SReaderStatus*      pStatus = &pReader->status;
  SLastBlockReader*   pLastReader = pStatus->fileIter.pLastBlockReader;
  SUidOrderCheckInfo* pCheckInfo = &pStatus->uidCheckInfo;

  int32_t code = initOrderCheckInfo(pCheckInfo, pStatus);
  if (code != TSDB_CODE_SUCCESS || (taosHashGetSize(pStatus->pTableMap) == 0)) {
    return code;
  }

  for (;;) {
    STableBlockScanInfo* pScanInfo = pStatus->pTableIter;

    // only tables that actually have rows in the last blocks are worth building
    if (initLastBlockReader(pLastReader, pScanInfo, pReader)) {
      code = doBuildDataBlock(pReader);
      if (code != TSDB_CODE_SUCCESS) {
        return code;
      }

      if (pReader->pResBlock->info.rows > 0) {
        return TSDB_CODE_SUCCESS;
      }
    }

    // current table is exhausted (or empty), let's try the next one
    if (!moveToNextTable(pCheckInfo, pStatus)) {
      return TSDB_CODE_SUCCESS;
    }
  }
}

2347
/*
 * Fill pReader->pResBlock for the current table, either from the current file data block
 * or (when no file block is active) from the last blocks and the in-memory buffers.
 *
 * Depending on how the current file block relates to buffered/last-block data, it:
 * - loads and merges the file block (composed block),
 * - emits only buffered rows that fall before the file block,
 * - emits only last-block rows (descending order), or
 * - marks the whole file block as returned without loading it.
 */
static int32_t doBuildDataBlock(STsdbReader* pReader) {
  SReaderStatus*       pStatus = &pReader->status;
  SDataBlockIter*      pBlockIter = &pStatus->blockIter;
  SFileDataBlockInfo*  pBlockInfo = getCurrentBlockInfo(pBlockIter);
  SLastBlockReader*    pLastBlockReader = pStatus->fileIter.pLastBlockReader;
  STableBlockScanInfo* pScanInfo = NULL;
  SDataBlk*            pBlock = NULL;

  if (pBlockInfo != NULL) {
    pScanInfo = taosHashGet(pStatus->pTableMap, &pBlockInfo->uid, sizeof(pBlockInfo->uid));
    pBlock = getCurrentBlock(pBlockIter);
  } else {
    pScanInfo = pStatus->pTableIter;
  }

  initLastBlockReader(pLastBlockReader, pScanInfo, pReader);
  TSDBKEY key = getCurrentKeyInBuf(pScanInfo, pReader);

  if (pBlockInfo == NULL) {  // build data block from last data file
    ASSERT(pBlockIter->numOfBlocks == 0);
    return buildComposedDataBlock(pReader);
  }

  if (fileBlockShouldLoad(pReader, pBlockInfo, pBlock, pScanInfo, key, pLastBlockReader)) {
    tBlockDataReset(&pStatus->fileBlockData);

    int32_t code = tBlockDataInit(&pStatus->fileBlockData, pReader->suid, pScanInfo->uid, pReader->pSchema);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    code = doLoadFileBlockData(pReader, pBlockIter, &pStatus->fileBlockData);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    // the file block is loaded, now merge it with buffered and last-block rows
    return buildComposedDataBlock(pReader);
  }

  if (bufferDataInFileBlockGap(pReader->order, key, pBlock)) {
    // data in memory that are earlier than current file block
    // todo rows in buffer should be less than the file block in asc, greater than file block in desc
    int64_t endKey = (ASCENDING_TRAVERSE(pReader->order)) ? pBlock->minKey.ts : pBlock->maxKey.ts;
    return buildDataBlockFromBuf(pReader, pScanInfo, endKey);
  }

  if (hasDataInLastBlock(pLastBlockReader) && !ASCENDING_TRAVERSE(pReader->order)) {
    // only return the rows in last block
    int64_t tsLast = getCurrentKeyInLastBlock(pLastBlockReader);
    ASSERT(tsLast >= pBlock->maxKey.ts);
    tBlockDataReset(&pStatus->fileBlockData);

    return buildComposedDataBlock(pReader);
  }

  // whole block is required, return it directly without loading it
  SDataBlockInfo* pInfo = &pReader->pResBlock->info;
  pInfo->rows = pBlock->nRow;
  pInfo->uid = pScanInfo->uid;
  pInfo->window = (STimeWindow){.skey = pBlock->minKey.ts, .ekey = pBlock->maxKey.ts};
  setComposedBlockFlag(pReader, false);
  setBlockAllDumped(&pStatus->fBlockDumpInfo, pBlock->maxKey.ts, pReader->order);

  return TSDB_CODE_SUCCESS;
}

H
Haojun Liao 已提交
2413
/*
 * Build a result block purely from in-memory (mem/imem) data.
 *
 * Tables are visited in hash-map iteration order, starting from pStatus->pTableIter
 * (or from the beginning when it is NULL). Returns once a table yields rows, when all
 * tables are exhausted, or on the first error from buildDataBlockFromBuf.
 */
static int32_t buildBlockFromBufferSequentially(STsdbReader* pReader) {
  SReaderStatus* pStatus = &pReader->status;

  // no upper bound in the scan direction: everything left in the buffers qualifies
  int64_t endKey = (ASCENDING_TRAVERSE(pReader->order)) ? INT64_MAX : INT64_MIN;

  if (pStatus->pTableIter == NULL) {
    pStatus->pTableIter = taosHashIterate(pStatus->pTableMap, NULL);
  }

  while (pStatus->pTableIter != NULL) {
    STableBlockScanInfo* pScanInfo = pStatus->pTableIter;
    initMemDataIterator(pScanInfo, pReader);

    int32_t code = buildDataBlockFromBuf(pReader, pScanInfo, endKey);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    if (pReader->pResBlock->info.rows > 0) {
      return TSDB_CODE_SUCCESS;
    }

    // current table is exhausted, let's try the next table
    pStatus->pTableIter = taosHashIterate(pStatus->pTableMap, pStatus->pTableIter);
  }

  return TSDB_CODE_SUCCESS;
}

2445
// set the correct start position in case of the first/last file block, according to the query time window
2446
/*
 * Reset the dump cursor for the block currently selected in pBlockIter.
 * The cursor starts at the first row for ascending scans and at the last row for
 * descending scans; the block is marked as not yet dumped.
 */
static void initBlockDumpInfo(STsdbReader* pReader, SDataBlockIter* pBlockIter) {
  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
  SDataBlk*           pBlock = getCurrentBlock(pBlockIter);

  pDumpInfo->totalRows = pBlock->nRow;
  pDumpInfo->allDumped = false;

  if (ASCENDING_TRAVERSE(pReader->order)) {
    pDumpInfo->rowIndex = 0;
  } else {
    pDumpInfo->rowIndex = pBlock->nRow - 1;
  }
}

2458
/*
 * Advance to the next data file set that has any data blocks or last (stt) files, and
 * prepare the block iterator and dump cursor for it.
 *
 * If no file set is left, pReader->status.loadFromFile is cleared so the caller falls
 * back to in-memory data.
 *
 * Returns the error from moveToNextFile/initBlockIterator. On failure the block
 * iterator is left untouched beyond what those calls did; the dump cursor is not reset
 * from a half-initialised iterator.
 */
static int32_t initForFirstBlockInFile(STsdbReader* pReader, SDataBlockIter* pBlockIter) {
  SBlockNumber num = {0};

  int32_t code = moveToNextFile(pReader, &num);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  // all data files are consumed, try data in buffer
  if (num.numOfBlocks + num.numOfLastFiles == 0) {
    pReader->status.loadFromFile = false;
    return code;
  }

  // initialize the block iterator for a new fileset
  if (num.numOfBlocks > 0) {
    code = initBlockIterator(pReader, pBlockIter, num.numOfBlocks);
    if (code != TSDB_CODE_SUCCESS) {
      return code;  // do not position the dump cursor on an iterator that failed to initialize
    }
  } else {  // no block data, only last block exists
    tBlockDataReset(&pReader->status.fileBlockData);
    resetDataBlockIterator(pBlockIter, pReader->order);
  }

  // set the correct start position according to the query time window
  initBlockDumpInfo(pReader, pBlockIter);
  return TSDB_CODE_SUCCESS;
}

2485
/*
 * True when the current file block has been started but not finished:
 * - not marked all-dumped, and
 * - the cursor has moved past its initial position (row 0 for ascending, the last row for descending).
 */
static bool fileBlockPartiallyRead(SFileBlockDumpInfo* pDumpInfo, bool asc) {
  if (pDumpInfo->allDumped) {
    return false;
  }

  if (asc) {
    return pDumpInfo->rowIndex > 0;
  }

  return pDumpInfo->rowIndex < (pDumpInfo->totalRows - 1);
}

2490
/*
 * Produce the next result block from data files, walking file blocks of the current file
 * set and switching to last (stt) block scanning when required.
 *
 * Returns TSDB_CODE_SUCCESS when pReader->pResBlock holds rows, or when all data files are
 * consumed (pReader->status.loadFromFile becomes false). Otherwise returns the first error
 * encountered.
 */
static int32_t buildBlockFromFiles(STsdbReader* pReader) {
  bool            asc = ASCENDING_TRAVERSE(pReader->order);
  SDataBlockIter* pBlockIter = &pReader->status.blockIter;

  // When set, this round scans the last blocks of the current file set table by table
  // before looking at regular file data blocks again.
  bool scanLastBlocks = (pBlockIter->numOfBlocks == 0);

  while (1) {
    int32_t code = TSDB_CODE_SUCCESS;

    if (scanLastBlocks) {
      scanLastBlocks = false;

      code = doLoadLastBlockSequentially(pReader);
      if (code != TSDB_CODE_SUCCESS) {
        return code;
      }

      if (pReader->pResBlock->info.rows > 0) {
        return TSDB_CODE_SUCCESS;
      }

      // all data blocks are checked in this last block file, now let's try the next file
      if (pReader->status.pTableIter == NULL) {
        code = initForFirstBlockInFile(pReader, pBlockIter);

        // error happens or all the data files are completely checked
        if ((code != TSDB_CODE_SUCCESS) || !pReader->status.loadFromFile) {
          return code;
        }

        // this file does not have data files, let's start check the last block file if exists
        if (pBlockIter->numOfBlocks == 0) {
          scanLastBlocks = true;
          continue;
        }
      }

      code = doBuildDataBlock(pReader);
    } else {
      SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;

      if (fileBlockPartiallyRead(pDumpInfo, asc)) {  // file data block is partially loaded
        code = buildComposedDataBlock(pReader);
      } else {
        // current block are exhausted, try the next file block
        if (pDumpInfo->allDumped) {
          if (blockIteratorNext(&pReader->status.blockIter)) {
            // check for the next block in the block accessed order list
            initBlockDumpInfo(pReader, pBlockIter);
          } else if (pReader->status.pCurrentFileset->nSttF > 0) {
            // data blocks in current file are exhausted, scan its last blocks now
            tBlockDataReset(&pReader->status.fileBlockData);
            resetDataBlockIterator(pBlockIter, pReader->order);
            scanLastBlocks = true;
            continue;
          } else {
            code = initForFirstBlockInFile(pReader, pBlockIter);

            // error happens or all the data files are completely checked
            if ((code != TSDB_CODE_SUCCESS) || !pReader->status.loadFromFile) {
              return code;
            }

            // this file does not have blocks, let's start check the last block file
            if (pBlockIter->numOfBlocks == 0) {
              scanLastBlocks = true;
              continue;
            }
          }
        }

        code = doBuildDataBlock(pReader);
      }
    }

    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    if (pReader->pResBlock->info.rows > 0) {
      return TSDB_CODE_SUCCESS;
    }
  }
}
H
refact  
Hongze Cheng 已提交
2578

2579 2580
/*
 * Choose which tsdb instance serves a query whose window starts at winSKey.
 *
 * - Vnodes without rollup SMA always use VND_TSDB.
 * - Otherwise, the retention levels are walked until one whose keep period still covers
 *   winSKey. The chosen level (L0/L1/L2) is stored in *pLevel and logged with idStr.
 */
static STsdb* getTsdbByRetentions(SVnode* pVnode, TSKEY winSKey, SRetention* retentions, const char* idStr,
                                  int8_t* pLevel) {
  if (!(VND_IS_RSMA(pVnode))) {
    return VND_TSDB(pVnode);
  }

  int8_t  level = 0;
  int64_t now = taosGetTimestamp(pVnode->config.tsdbCfg.precision);

  for (int8_t i = 0; i < TSDB_RETENTION_MAX; ++i) {
    SRetention* pRetention = retentions + level;

    if (pRetention->keep <= 0) {
      // presumably an unconfigured level: fall back to the previous one
      if (level > 0) {
        --level;
      }
      break;
    }

    if ((now - pRetention->keep) <= winSKey) {  // this level still keeps data from winSKey
      break;
    }

    ++level;
  }

  const char* str = (idStr != NULL) ? idStr : "";
  int8_t      selected = TSDB_RETENTION_L2;
  STsdb*      pTsdb = NULL;

  if (level == TSDB_RETENTION_L0) {
    selected = TSDB_RETENTION_L0;
    pTsdb = VND_RSMA0(pVnode);
  } else if (level == TSDB_RETENTION_L1) {
    selected = TSDB_RETENTION_L1;
    pTsdb = VND_RSMA1(pVnode);
  } else {
    pTsdb = VND_RSMA2(pVnode);
  }

  *pLevel = selected;
  tsdbDebug("vgId:%d, rsma level %d is selected to query %s", TD_VID(pVnode), selected, str);
  return pTsdb;
}

H
Haojun Liao 已提交
2619
/*
 * Compute the data version range visible to a query.
 * - A start version of -1 means 0.
 * - An end version of -1 means the vnode's applied version.
 * - Any explicit end version is capped at the applied version.
 * The level argument is currently unused.
 */
SVersionRange getQueryVerRange(SVnode* pVnode, SQueryTableDataCond* pCond, int8_t level) {
  int64_t startVer = 0;
  if (pCond->startVersion != -1) {
    startVer = pCond->startVersion;
  }

  // default: up to the current maximum version of the vnode
  int64_t endVer = pVnode->state.applied;
  if (pCond->endVersion != -1 && pCond->endVersion <= pVnode->state.applied) {
    endVer = pCond->endVersion;
  }

  SVersionRange range = {.minVer = startVer, .maxVer = endVer};
  return range;
}

2633
/*
 * Check whether the row identified by pKey (ts + version) is covered by the delete skyline pDelList.
 *
 * *index is a cursor into pDelList that persists across calls for the same scan. It is
 * advanced (ascending) or moved back (descending) as keys progress.
 *
 * Returns true when the row is deleted. Returns false when there is no delete info,
 * including a NULL or empty skyline.
 */
bool hasBeenDropped(const SArray* pDelList, int32_t* index, TSDBKEY* pKey, int32_t order) {
  ASSERT(pKey != NULL);
  if (pDelList == NULL) {
    return false;
  }

  size_t num = taosArrayGetSize(pDelList);
  if (num == 0) {
    // nothing deleted; also prevents "num - 1" below from wrapping around to SIZE_MAX
    return false;
  }

  bool    asc = ASCENDING_TRAVERSE(order);
  int32_t step = asc ? 1 : -1;

  if (asc) {
    if (*index >= num - 1) {
      TSDBKEY* last = taosArrayGetLast(pDelList);
      ASSERT(pKey->ts >= last->ts);

      if (pKey->ts > last->ts) {
        return false;
      } else if (pKey->ts == last->ts) {
        if (num < 2) {
          // no preceding skyline point, so no deletion range ends at this key
          return false;
        }

        TSDBKEY* prev = taosArrayGet(pDelList, num - 2);
        return (prev->version >= pKey->version);
      }
    } else {
      TSDBKEY* pCurrent = taosArrayGet(pDelList, *index);
      TSDBKEY* pNext = taosArrayGet(pDelList, (*index) + 1);

      if (pKey->ts < pCurrent->ts) {
        return false;
      }

      if (pCurrent->ts <= pKey->ts && pNext->ts >= pKey->ts && pCurrent->version >= pKey->version) {
        return true;
      }

      while (pNext->ts <= pKey->ts && (*index) < num - 1) {
        (*index) += 1;

        if ((*index) < num - 1) {
          pCurrent = taosArrayGet(pDelList, *index);
          pNext = taosArrayGet(pDelList, (*index) + 1);

          // it is not a consecutive deletion range, ignore it
          if (pCurrent->version == 0 && pNext->version > 0) {
            continue;
          }

          if (pCurrent->ts <= pKey->ts && pNext->ts >= pKey->ts && pCurrent->version >= pKey->version) {
            return true;
          }
        }
      }

      return false;
    }
  } else {
    if (*index <= 0) {
      TSDBKEY* pFirst = taosArrayGet(pDelList, 0);

      if (pKey->ts < pFirst->ts) {
        return false;
      } else if (pKey->ts == pFirst->ts) {
        return pFirst->version >= pKey->version;
      } else {
        ASSERT(0);
      }
    } else {
      TSDBKEY* pCurrent = taosArrayGet(pDelList, *index);
      TSDBKEY* pPrev = taosArrayGet(pDelList, (*index) - 1);

      if (pKey->ts > pCurrent->ts) {
        return false;
      }

      if (pPrev->ts <= pKey->ts && pCurrent->ts >= pKey->ts && pPrev->version >= pKey->version) {
        return true;
      }

      while (pPrev->ts >= pKey->ts && (*index) > 1) {
        (*index) += step;

        if ((*index) >= 1) {
          pCurrent = taosArrayGet(pDelList, *index);
          pPrev = taosArrayGet(pDelList, (*index) - 1);

          // it is not a consecutive deletion range, ignore it
          if (pCurrent->version > 0 && pPrev->version == 0) {
            continue;
          }

          if (pPrev->ts <= pKey->ts && pCurrent->ts >= pKey->ts && pPrev->version >= pKey->version) {
            return true;
          }
        }
      }

      return false;
    }
  }

  return false;
}

2733
/*
 * Return the first row at or after the iterator's current position that is
 * - inside the query time window,
 * - within the reader's version range, and
 * - not covered by the delete skyline pDelList.
 *
 * The iterator is advanced past rejected rows. Returns NULL (and clears pIter->hasVal)
 * when the iterator runs out or the time window is left.
 */
TSDBROW* getValidMemRow(SIterInfo* pIter, const SArray* pDelList, STsdbReader* pReader) {
  if (!pIter->hasVal) {
    return NULL;
  }

  for (;;) {
    TSDBROW* pRow = tsdbTbDataIterGet(pIter->iter);
    TSDBKEY  key = TSDBROW_KEY(pRow);

    if (outOfTimeWindow(key.ts, &pReader->window)) {
      pIter->hasVal = false;
      return NULL;
    }

    // it is a valid data version; the delete check only runs for in-range versions,
    // since hasBeenDropped moves pIter->index
    bool versionVisible = (key.version >= pReader->verRange.minVer && key.version <= pReader->verRange.maxVer);
    if (versionVisible && !hasBeenDropped(pDelList, &pIter->index, &key, pReader->order)) {
      return pRow;
    }

    pIter->hasVal = tsdbTbDataIterNext(pIter->iter);
    if (!pIter->hasVal) {
      return NULL;
    }
  }
}
H
Hongze Cheng 已提交
2771

2772 2773
/*
 * Step the in-memory iterator forward and merge into pMerger every following valid row
 * whose timestamp equals ts.
 *
 * Stops at the first valid row with a different timestamp, leaving the iterator on it,
 * or when the iterator is exhausted. Always returns TSDB_CODE_SUCCESS.
 */
int32_t doMergeRowsInBuf(SIterInfo* pIter, uint64_t uid, int64_t ts, SArray* pDelList, SRowMerger* pMerger,
                         STsdbReader* pReader) {
  for (;;) {
    pIter->hasVal = tsdbTbDataIterNext(pIter->iter);
    if (!pIter->hasVal) {
      return TSDB_CODE_SUCCESS;
    }

    // data exists but not valid (deleted/out of range/out of window)
    TSDBROW* pRow = getValidMemRow(pIter, pDelList, pReader);
    if (pRow == NULL) {
      return TSDB_CODE_SUCCESS;
    }

    TSDBKEY rowKey = TSDBROW_KEY(pRow);
    if (rowKey.ts != ts) {  // ts is not identical, quit
      return TSDB_CODE_SUCCESS;
    }

    STSchema* pRowSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, uid);
    tRowMergerAdd(pMerger, pRow, pRowSchema);
  }
}

2799
static int32_t doMergeRowsInFileBlockImpl(SBlockData* pBlockData, int32_t rowIndex, int64_t key, SRowMerger* pMerger,
2800
                                          SVersionRange* pVerRange, int32_t step) {
2801 2802
  while (pBlockData->aTSKEY[rowIndex] == key && rowIndex < pBlockData->nRow && rowIndex >= 0) {
    if (pBlockData->aVersion[rowIndex] > pVerRange->maxVer || pBlockData->aVersion[rowIndex] < pVerRange->minVer) {
2803
      rowIndex += step;
2804 2805 2806 2807 2808 2809 2810 2811 2812 2813 2814 2815 2816 2817 2818 2819
      continue;
    }

    TSDBROW fRow = tsdbRowFromBlockData(pBlockData, rowIndex);
    tRowMerge(pMerger, &fRow);
    rowIndex += step;
  }

  return rowIndex;
}

/* Result of checkForNeighborFileBlock(), telling the caller whether to keep probing neighbor blocks. */
typedef enum {
  CHECK_FILEBLOCK_CONT = 1,  // the neighbor block was consumed entirely by the merge: check the next neighbor
  CHECK_FILEBLOCK_QUIT = 2,  // no overlapping neighbor, or rows with other keys remain: stop
} CHECK_FILEBLOCK_STATE;

H
Hongze Cheng 已提交
2820
static int32_t checkForNeighborFileBlock(STsdbReader* pReader, STableBlockScanInfo* pScanInfo, SDataBlk* pBlock,
2821 2822
                                         SFileDataBlockInfo* pFBlock, SRowMerger* pMerger, int64_t key,
                                         CHECK_FILEBLOCK_STATE* state) {
2823
  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
2824
  SBlockData*         pBlockData = &pReader->status.fileBlockData;
2825

2826
  *state = CHECK_FILEBLOCK_QUIT;
2827
  int32_t step = ASCENDING_TRAVERSE(pReader->order) ? 1 : -1;
2828

H
Hongze Cheng 已提交
2829 2830
  int32_t   nextIndex = -1;
  SDataBlk* pNeighborBlock = getNeighborBlockOfSameTable(pFBlock, pScanInfo, &nextIndex, pReader->order);
2831
  if (pNeighborBlock == NULL) {  // do nothing
2832 2833 2834 2835
    return 0;
  }

  bool overlap = overlapWithNeighborBlock(pBlock, pNeighborBlock, pReader->order);
2836 2837
  taosMemoryFree(pNeighborBlock);

2838
  if (overlap) {  // load next block
2839
    SReaderStatus*  pStatus = &pReader->status;
2840 2841
    SDataBlockIter* pBlockIter = &pStatus->blockIter;

2842
    // 1. find the next neighbor block in the scan block list
2843
    SFileDataBlockInfo fb = {.uid = pFBlock->uid, .tbBlockIdx = nextIndex};
2844
    int32_t            neighborIndex = findFileBlockInfoIndex(pBlockIter, &fb);
2845

2846
    // 2. remove it from the scan block list
2847
    setFileBlockActiveInBlockIter(pBlockIter, neighborIndex, step);
2848

2849
    // 3. load the neighbor block, and set it to be the currently accessed file data block
H
Haojun Liao 已提交
2850
    tBlockDataReset(&pStatus->fileBlockData);
2851 2852 2853 2854 2855 2856
    int32_t code = tBlockDataInit(&pStatus->fileBlockData, pReader->suid, pFBlock->uid, pReader->pSchema);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    code = doLoadFileBlockData(pReader, pBlockIter, &pStatus->fileBlockData);
2857 2858 2859 2860
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

2861
    // 4. check the data values
2862 2863 2864 2865
    initBlockDumpInfo(pReader, pBlockIter);

    pDumpInfo->rowIndex =
        doMergeRowsInFileBlockImpl(pBlockData, pDumpInfo->rowIndex, key, pMerger, &pReader->verRange, step);
H
Haojun Liao 已提交
2866
    if (pDumpInfo->rowIndex >= pDumpInfo->totalRows) {
2867 2868 2869 2870 2871 2872 2873
      *state = CHECK_FILEBLOCK_CONT;
    }
  }

  return TSDB_CODE_SUCCESS;
}

2874 2875
/*
 * Merge into pMerger all file rows that share the timestamp of the row at the current
 * dump position, consuming them from the current block. If the block ends while still
 * on that timestamp, overlapping neighbor blocks of the same table are loaded and
 * merged as well.
 *
 * Returns TSDB_CODE_SUCCESS, or the error raised while loading a neighbor block (this
 * error used to be silently dropped).
 */
int32_t doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pScanInfo, STsdbReader* pReader,
                                SRowMerger* pMerger) {
  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;

  bool    asc = ASCENDING_TRAVERSE(pReader->order);
  int64_t key = pBlockData->aTSKEY[pDumpInfo->rowIndex];
  int32_t step = asc ? 1 : -1;

  pDumpInfo->rowIndex += step;
  if ((pDumpInfo->rowIndex <= pBlockData->nRow - 1 && asc) || (pDumpInfo->rowIndex >= 0 && !asc)) {
    pDumpInfo->rowIndex =
        doMergeRowsInFileBlockImpl(pBlockData, pDumpInfo->rowIndex, key, pMerger, &pReader->verRange, step);
  }

  // all rows are consumed, let's try next file block
  if ((pDumpInfo->rowIndex >= pBlockData->nRow && asc) || (pDumpInfo->rowIndex < 0 && !asc)) {
    while (1) {
      CHECK_FILEBLOCK_STATE st = CHECK_FILEBLOCK_QUIT;

      SFileDataBlockInfo* pFileBlockInfo = getCurrentBlockInfo(&pReader->status.blockIter);
      SDataBlk*           pCurrentBlock = getCurrentBlock(&pReader->status.blockIter);

      int32_t code = checkForNeighborFileBlock(pReader, pScanInfo, pCurrentBlock, pFileBlockInfo, pMerger, key, &st);
      if (code != TSDB_CODE_SUCCESS) {
        return code;
      }

      if (st == CHECK_FILEBLOCK_QUIT) {
        break;
      }
    }
  }

  return TSDB_CODE_SUCCESS;
}

H
Hongze Cheng 已提交
2905 2906
/*
 * Record ts as the table's last key, then merge into pMerger every following last-block
 * row whose timestamp equals ts. Stops at the first row with another timestamp or when
 * the last blocks are exhausted. Always returns TSDB_CODE_SUCCESS.
 */
int32_t doMergeRowsInLastBlock(SLastBlockReader* pLastBlockReader, STableBlockScanInfo* pScanInfo, int64_t ts,
                               SRowMerger* pMerger) {
  pScanInfo->lastKey = ts;

  while (nextRowFromLastBlocks(pLastBlockReader, pScanInfo) && getCurrentKeyInLastBlock(pLastBlockReader) == ts) {
    TSDBROW sameKeyRow = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
    tRowMerge(pMerger, &sameKeyRow);
  }

  return TSDB_CODE_SUCCESS;
}

2921 2922
/*
 * Produce the output row for pRow, merging in any following valid rows from the same
 * in-memory iterator that share its timestamp.
 *
 * - If the next valid row has a different timestamp (or there is none), *pTSRow is set
 *   to pRow's own row and *freeTSRow to false: the caller must not free it.
 * - Otherwise a new merged row is returned in *pTSRow and *freeTSRow is set to true.
 *
 * The row merger is always released, including on error paths (it used to leak when
 * tRowMergerGetRow failed).
 */
int32_t doMergeMemTableMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter, SArray* pDelList, STSRow** pTSRow,
                                 STsdbReader* pReader, bool* freeTSRow) {
  // take a copy before the iterator moves on; pRow may refer to iterator-owned storage -- TODO confirm
  TSDBROW  current = *pRow;
  TSDBROW* pNextRow = NULL;

  // if the timestamp of the next valid row has a different ts, return current row directly
  pIter->hasVal = tsdbTbDataIterNext(pIter->iter);
  if (pIter->hasVal) {  // has next point in mem/imem
    pNextRow = getValidMemRow(pIter, pDelList, pReader);
  }

  if (pNextRow == NULL || current.pTSRow->ts != pNextRow->pTSRow->ts) {
    *pTSRow = current.pTSRow;
    *freeTSRow = false;
    return TSDB_CODE_SUCCESS;
  }

  SRowMerger merge = {0};

  // get the correct schema for data in memory
  STSchema* pTSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(&current), pReader, uid);
  if (pReader->pSchema == NULL) {
    pReader->pSchema = pTSchema;
  }

  int32_t code = tRowMergerInit2(&merge, pReader->pSchema, &current, pTSchema);
  if (code == TSDB_CODE_SUCCESS) {
    STSchema* pTSchema1 = doGetSchemaForTSRow(TSDBROW_SVERSION(pNextRow), pReader, uid);
    code = tRowMergerAdd(&merge, pNextRow, pTSchema1);
  }

  if (code == TSDB_CODE_SUCCESS) {
    doMergeRowsInBuf(pIter, uid, current.pTSRow->ts, pDelList, &merge, pReader);
    code = tRowMergerGetRow(&merge, pTSRow);
  }

  tRowMergerClear(&merge);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  *freeTSRow = true;
  return TSDB_CODE_SUCCESS;
}

2974
int32_t doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader,
H
Hongze Cheng 已提交
2975
                           STSRow** pTSRow) {
H
Haojun Liao 已提交
2976 2977
  SRowMerger merge = {0};

2978 2979 2980
  TSDBKEY k = TSDBROW_KEY(pRow);
  TSDBKEY ik = TSDBROW_KEY(piRow);

2981
  if (ASCENDING_TRAVERSE(pReader->order)) {  // ascending order imem --> mem
H
Haojun Liao 已提交
2982
    STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid);
2983

H
Haojun Liao 已提交
2984
    tRowMergerInit(&merge, piRow, pSchema);
2985
    doMergeRowsInBuf(&pBlockScanInfo->iiter, pBlockScanInfo->uid, ik.ts, pBlockScanInfo->delSkyline, &merge, pReader);
2986

2987
    tRowMerge(&merge, pRow);
2988
    doMergeRowsInBuf(&pBlockScanInfo->iter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge, pReader);
2989
  } else {
H
Haojun Liao 已提交
2990
    STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid);
2991

H
Haojun Liao 已提交
2992
    tRowMergerInit(&merge, pRow, pSchema);
2993
    doMergeRowsInBuf(&pBlockScanInfo->iter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge, pReader);
2994 2995

    tRowMerge(&merge, piRow);
2996
    doMergeRowsInBuf(&pBlockScanInfo->iiter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge, pReader);
2997
  }
2998

2999 3000
  int32_t code = tRowMergerGetRow(&merge, pTSRow);
  return code;
3001 3002
}

3003 3004
/*
 * Fetch the next row (before endKey in scan order) from a table's mem and imem buffers.
 *
 * - When both buffers have a candidate, the one earlier in scan order wins; equal
 *   timestamps are merged.
 * - Same-timestamp duplicates within one buffer are merged too.
 * - *freeTSRow reports whether the caller owns *pTSRow.
 * - If neither buffer has a qualifying row, TSDB_CODE_SUCCESS is returned and *pTSRow is left untouched.
 */
int32_t tsdbGetNextRowInMem(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader, STSRow** pTSRow, int64_t endKey,
                            bool* freeTSRow) {
  TSDBROW* pRow = getValidMemRow(&pBlockScanInfo->iter, pBlockScanInfo->delSkyline, pReader);
  TSDBROW* piRow = getValidMemRow(&pBlockScanInfo->iiter, pBlockScanInfo->delSkyline, pReader);
  SArray*  pDelList = pBlockScanInfo->delSkyline;
  uint64_t uid = pBlockScanInfo->uid;
  bool     asc = ASCENDING_TRAVERSE(pReader->order);

  // rows at or beyond endKey (in scan direction) are not candidates
  if (pBlockScanInfo->iter.hasVal) {
    int64_t ts = TSDBROW_KEY(pRow).ts;
    if (asc ? (ts >= endKey) : (ts <= endKey)) {
      pRow = NULL;
    }
  }

  if (pBlockScanInfo->iiter.hasVal) {
    int64_t its = TSDBROW_KEY(piRow).ts;
    if (asc ? (its >= endKey) : (its <= endKey)) {
      piRow = NULL;
    }
  }

  bool hasMemRow = pBlockScanInfo->iter.hasVal && pRow != NULL;
  bool hasIMemRow = pBlockScanInfo->iiter.hasVal && piRow != NULL;

  if (hasMemRow && hasIMemRow) {
    TSDBKEY k = TSDBROW_KEY(pRow);
    TSDBKEY ik = TSDBROW_KEY(piRow);

    if (ik.ts == k.ts) {
      *freeTSRow = true;
      return doMergeMemIMemRows(pRow, piRow, pBlockScanInfo, pReader, pTSRow);
    }

    // timestamps differ: emit whichever comes first in scan order
    bool imemFirst = asc ? (ik.ts < k.ts) : (ik.ts > k.ts);
    if (imemFirst) {
      return doMergeMemTableMultiRows(piRow, uid, &pBlockScanInfo->iiter, pDelList, pTSRow, pReader, freeTSRow);
    }
    return doMergeMemTableMultiRows(pRow, uid, &pBlockScanInfo->iter, pDelList, pTSRow, pReader, freeTSRow);
  }

  if (hasMemRow) {
    return doMergeMemTableMultiRows(pRow, pBlockScanInfo->uid, &pBlockScanInfo->iter, pDelList, pTSRow, pReader,
                                    freeTSRow);
  }

  if (hasIMemRow) {
    return doMergeMemTableMultiRows(piRow, uid, &pBlockScanInfo->iiter, pDelList, pTSRow, pReader, freeTSRow);
  }

  return TSDB_CODE_SUCCESS;
}

H
Haojun Liao 已提交
3060
/*
 * Append one STSRow as a new row at the end of pBlock.
 *
 * A leading primary-timestamp output column is filled from pTSRow->ts. The remaining output columns are matched
 * against the row's schema by column id (both walks advance in increasing id order, which assumes both lists are
 * sorted by id). Output columns with no schema counterpart receive NULL. Always returns TSDB_CODE_SUCCESS.
 */
int32_t doAppendRowFromTSRow(SSDataBlock* pBlock, STsdbReader* pReader, STSRow* pTSRow, uint64_t uid) {
  int32_t             dstRow = pBlock->info.rows;
  int32_t             numOfOutputCols = (int32_t)taosArrayGetSize(pBlock->pDataBlock);
  SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo;
  STSchema*           pSchema = doGetSchemaForTSRow(pTSRow->sver, pReader, uid);

  int32_t outIdx = 0;
  int32_t schemaIdx = 0;

  SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, outIdx);
  if (pCol->info.colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
    colDataAppend(pCol, dstRow, (const char*)&pTSRow->ts, false);
    outIdx++;
  }

  SColVal colVal = {0};
  for (; outIdx < numOfOutputCols && schemaIdx < pSchema->numOfCols;) {
    pCol = taosArrayGet(pBlock->pDataBlock, outIdx);
    col_id_t colId = pCol->info.colId;

    if (colId < pSchema->columns[schemaIdx].colId) {
      // requested column is not part of this row's schema
      colDataAppendNULL(pCol, dstRow);
      outIdx++;
    } else if (colId > pSchema->columns[schemaIdx].colId) {
      // schema column not requested by the output block
      schemaIdx++;
    } else {
      tTSRowGetVal(pTSRow, pSchema, schemaIdx, &colVal);
      doCopyColVal(pCol, dstRow, outIdx, &colVal, pSupInfo);
      outIdx++;
      schemaIdx++;
    }
  }

  // schema exhausted: the remaining output columns do not exist in pSchema
  for (; outIdx < numOfOutputCols; ++outIdx) {
    pCol = taosArrayGet(pBlock->pDataBlock, outIdx);
    colDataAppendNULL(pCol, dstRow);
  }

  pBlock->info.rows += 1;
  return TSDB_CODE_SUCCESS;
}

H
Hongze Cheng 已提交
3104 3105
/*
 * Append row `rowIndex` of a loaded file block (pBlockData) as a new row at the end of pResBlock.
 *
 * A leading primary-timestamp output column is filled from pBlockData->aTSKEY. The other output columns are matched
 * to the block's column data by column id, advancing both sides in increasing id order (assumes both are sorted by
 * id). Output columns missing from the file block receive NULL. Always returns TSDB_CODE_SUCCESS.
 */
int32_t doAppendRowFromFileBlock(SSDataBlock* pResBlock, STsdbReader* pReader, SBlockData* pBlockData,
                                 int32_t rowIndex) {
  SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo;
  int32_t             dstRow = pResBlock->info.rows;
  int32_t             outIdx = 0;
  int32_t             inIdx = 0;

  SColumnInfoData* pTsCol = taosArrayGet(pResBlock->pDataBlock, outIdx);
  if (pTsCol->info.colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
    colDataAppendInt64(pTsCol, dstRow, &pBlockData->aTSKEY[rowIndex]);
    outIdx++;
  }

  SColVal cv = {0};
  int32_t numOfInputCols = taosArrayGetSize(pBlockData->aIdx);
  int32_t numOfOutputCols = blockDataGetNumOfCols(pResBlock);

  while (outIdx < numOfOutputCols && inIdx < numOfInputCols) {
    SColumnInfoData* pCol = taosArrayGet(pResBlock->pDataBlock, outIdx);
    SColData*        pData = tBlockDataGetColDataByIdx(pBlockData, inIdx);

    if (pData->cid < pCol->info.colId) {
      // file column not requested by the output block
      inIdx++;
    } else if (pData->cid == pCol->info.colId) {
      tColDataGetValue(pData, rowIndex, &cv);
      doCopyColVal(pCol, dstRow, outIdx, &cv, pSupInfo);
      inIdx++;
      outIdx++;
    } else {
      // the specified column does not exist in file block, fill with null data
      colDataAppendNULL(pCol, dstRow);
      outIdx++;
    }
  }

  for (; outIdx < numOfOutputCols; ++outIdx) {
    SColumnInfoData* pCol = taosArrayGet(pResBlock->pDataBlock, outIdx);
    colDataAppendNULL(pCol, dstRow);
  }

  pResBlock->info.rows += 1;
  return TSDB_CODE_SUCCESS;
}

3151 3152
/*
 * Fill pReader->pResBlock with rows of one table taken from the in-memory buffers.
 *
 * Rows are pulled via tsdbGetNextRowInMem (which stops at endKey) until no row is returned, both buffer iterators
 * are exhausted, or the block holds `capacity` rows.
 *
 * Returns TSDB_CODE_SUCCESS, or the first error reported while fetching or appending a row. Previously such errors
 * were dropped and a silently truncated block was reported as success.
 */
int32_t buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, int64_t endKey, int32_t capacity,
                                  STsdbReader* pReader) {
  SSDataBlock* pBlock = pReader->pResBlock;

  do {
    STSRow* pTSRow = NULL;
    bool    freeTSRow = false;

    int32_t code = tsdbGetNextRowInMem(pBlockScanInfo, pReader, &pTSRow, endKey, &freeTSRow);
    if (code != TSDB_CODE_SUCCESS) {
      if (freeTSRow && pTSRow != NULL) {
        taosMemoryFree(pTSRow);
      }
      return code;
    }

    if (pTSRow == NULL) {
      break;
    }

    code = doAppendRowFromTSRow(pBlock, pReader, pTSRow, pBlockScanInfo->uid);
    if (freeTSRow) {
      taosMemoryFree(pTSRow);
    }
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    // no data in buffer, return immediately
    if (!(pBlockScanInfo->iter.hasVal || pBlockScanInfo->iiter.hasVal)) {
      break;
    }

    if (pBlock->info.rows >= capacity) {
      break;
    }
  } while (1);

  ASSERT(pBlock->info.rows <= capacity);
  return TSDB_CODE_SUCCESS;
}
H
Hongze Cheng 已提交
3181

3182
// todo refactor, use arraylist instead
H
Hongze Cheng 已提交
3183
int32_t tsdbSetTableId(STsdbReader* pReader, int64_t uid) {
3184 3185 3186
  ASSERT(pReader != NULL);
  taosHashClear(pReader->status.pTableMap);

3187
  STableBlockScanInfo info = {.lastKey = 0, .uid = uid};
3188
  taosHashPut(pReader->status.pTableMap, &info.uid, sizeof(uint64_t), &info, sizeof(info));
H
Hongze Cheng 已提交
3189 3190 3191
  return TDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
3192 3193 3194 3195 3196 3197
/* Return the meta index handle from metaGetIdx, or NULL when no meta object is supplied. */
void* tsdbGetIdx(SMeta* pMeta) {
  return (pMeta == NULL) ? NULL : metaGetIdx(pMeta);
}
dengyihao's avatar
dengyihao 已提交
3198

dengyihao's avatar
dengyihao 已提交
3199 3200 3201 3202 3203 3204
/* Return the inverted-index handle from metaGetIvtIdx, or NULL when no meta object is supplied. */
void* tsdbGetIvtIdx(SMeta* pMeta) {
  return (pMeta == NULL) ? NULL : metaGetIvtIdx(pMeta);
}
L
Liu Jicong 已提交
3205

H
Hongze Cheng 已提交
3206
/* Upper bound (maxVer) of the reader's data-version range. */
uint64_t getReaderMaxVersion(STsdbReader* pReader) {
  uint64_t maxVer = pReader->verRange.maxVer;
  return maxVer;
}
3207

H
refact  
Hongze Cheng 已提交
3208
// ====================================== EXPOSED APIs ======================================
3209 3210
/*
 * Create a reader over pTableList for the query condition pCond and store it in *ppReader.
 *
 * For TIMEWINDOW_RANGE_EXTERNAL queries two extra one-row readers are created:
 *   - innerReader[0] scans away from the window start;
 *   - innerReader[1] scans past the window end.
 * pCond is rewritten temporarily for that and restored before returning.
 *
 * Returns TSDB_CODE_SUCCESS or an error code. On failure after the reader was created, the reader is closed and
 * *ppReader is set to NULL.
 */
int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, SArray* pTableList, STsdbReader** ppReader,
                       const char* idstr) {
  // declared up front so the error path never reads an uninitialized pointer
  STsdbReader* pReader = NULL;

  int32_t code = tsdbReaderCreate(pVnode, pCond, ppReader, 4096, idstr);
  if (code != TSDB_CODE_SUCCESS) {
    goto _err;
  }

  pReader = *ppReader;

  // check for query time window
  if (isEmptyQueryTimeWindow(&pReader->window)) {
    tsdbDebug("%p query window not overlaps with the data set, no result returned, %s", pReader, pReader->idStr);
    return TSDB_CODE_SUCCESS;
  }

  if (pCond->type == TIMEWINDOW_RANGE_EXTERNAL) {
    // update the SQueryTableDataCond to create inner readers; the original values are restored below
    STimeWindow w = pCond->twindows;
    int32_t     order = pCond->order;
    if (order == TSDB_ORDER_ASC) {
      pCond->twindows.ekey = w.skey;
      pCond->twindows.skey = INT64_MIN;
      pCond->order = TSDB_ORDER_DESC;
    } else {
      pCond->twindows.skey = w.ekey;
      pCond->twindows.ekey = INT64_MAX;
      pCond->order = TSDB_ORDER_ASC;
    }

    // here we only need one more row, so the capacity is set to be ONE.
    code = tsdbReaderCreate(pVnode, pCond, &pReader->innerReader[0], 1, idstr);
    if (code == TSDB_CODE_SUCCESS) {
      if (order == TSDB_ORDER_ASC) {
        pCond->twindows.skey = w.ekey;
        pCond->twindows.ekey = INT64_MAX;
      } else {
        // NOTE(review): for DESC the upper bound is w.ekey (not w.skey), so this range spans the whole query
        // window. Confirm this is intended.
        pCond->twindows.skey = INT64_MIN;
        pCond->twindows.ekey = w.ekey;
      }

      // The "next" reader scans in the caller's order. Undo the flip made for the "prev" reader; otherwise it
      // would start from the far end of the range.
      pCond->order = order;
      code = tsdbReaderCreate(pVnode, pCond, &pReader->innerReader[1], 1, idstr);
    }

    // leave the caller's query condition as it was handed in
    pCond->twindows = w;
    pCond->order = order;

    if (code != TSDB_CODE_SUCCESS) {
      goto _err;
    }
  }

  // NOTE: the endVersion in pCond is the data version not schema version, so pCond->endVersion is not correct here.
  if (pCond->suid != 0) {
    pReader->pSchema = metaGetTbTSchema(pReader->pTsdb->pVnode->pMeta, pReader->suid, /*pCond->endVersion*/ -1);
    if (pReader->pSchema == NULL) {
      // the version argument must be an int64_t to match PRId64
      tsdbError("failed to get table schema, suid:%" PRIu64 ", ver:%" PRId64 " , %s", pReader->suid, (int64_t)-1,
                pReader->idStr);
    }
  } else if (taosArrayGetSize(pTableList) > 0) {
    STableKeyInfo* pKey = taosArrayGet(pTableList, 0);
    pReader->pSchema = metaGetTbTSchema(pReader->pTsdb->pVnode->pMeta, pKey->uid, /*pCond->endVersion*/ -1);
    if (pReader->pSchema == NULL) {
      tsdbError("failed to get table schema, uid:%" PRIu64 ", ver:%" PRId64 " , %s", pKey->uid, (int64_t)-1,
                pReader->idStr);
    }
  }

  int32_t numOfTables = taosArrayGetSize(pTableList);
  pReader->status.pTableMap = createDataBlockScanInfo(pReader, pTableList->pData, numOfTables);
  if (pReader->status.pTableMap == NULL) {
    code = TSDB_CODE_TDB_OUT_OF_MEMORY;
    goto _err;
  }

  code = tsdbTakeReadSnap(pReader->pTsdb, &pReader->pReadSnap);
  if (code != TSDB_CODE_SUCCESS) {
    goto _err;
  }

  if (pReader->type == TIMEWINDOW_RANGE_CONTAINED) {
    SDataBlockIter* pBlockIter = &pReader->status.blockIter;

    initFilesetIterator(&pReader->status.fileIter, pReader->pReadSnap->fs.aDFileSet, pReader);
    resetDataBlockIterator(&pReader->status.blockIter, pReader->order);

    // no data in files, let's try buffer in memory
    if (pReader->status.fileIter.numOfFiles == 0) {
      pReader->status.loadFromFile = false;
    } else {
      code = initForFirstBlockInFile(pReader, pBlockIter);
      if (code != TSDB_CODE_SUCCESS) {
        goto _err;
      }
    }
  } else {
    STsdbReader*    pPrevReader = pReader->innerReader[0];
    SDataBlockIter* pBlockIter = &pPrevReader->status.blockIter;

    code = tsdbTakeReadSnap(pPrevReader->pTsdb, &pPrevReader->pReadSnap);
    if (code != TSDB_CODE_SUCCESS) {
      goto _err;
    }

    initFilesetIterator(&pPrevReader->status.fileIter, pPrevReader->pReadSnap->fs.aDFileSet, pPrevReader);
    resetDataBlockIterator(&pPrevReader->status.blockIter, pPrevReader->order);

    // no data in files, let's try buffer in memory
    if (pPrevReader->status.fileIter.numOfFiles == 0) {
      pPrevReader->status.loadFromFile = false;
    } else {
      code = initForFirstBlockInFile(pPrevReader, pBlockIter);
      if (code != TSDB_CODE_SUCCESS) {
        goto _err;
      }
    }
  }

  tsdbDebug("%p total numOfTable:%d in this query %s", pReader, numOfTables, pReader->idStr);
  return code;

_err:
  // Log with the caller's idstr: pReader may be NULL here, or about to be freed.
  tsdbError("failed to create data reader, code:%s %s", tstrerror(code), idstr);
  if (pReader != NULL) {
    tsdbReaderClose(pReader);
    *ppReader = NULL;
  }
  return code;
}

/*
 * Release every resource owned by pReader, log its I/O cost summary, and free it. A NULL reader is ignored.
 */
void tsdbReaderClose(STsdbReader* pReader) {
  if (pReader == NULL) {
    return;
  }

  // tsdbNextDataBlock closes each inner reader once it is drained. Close whatever is still open so that a query
  // abandoned early does not leak them. tsdbReaderClose(NULL) is a no-op.
  tsdbReaderClose(pReader->innerReader[0]);
  pReader->innerReader[0] = NULL;
  tsdbReaderClose(pReader->innerReader[1]);
  pReader->innerReader[1] = NULL;

  SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo;

  taosMemoryFreeClear(pSupInfo->plist);
  taosMemoryFree(pSupInfo->colIds);

  taosArrayDestroy(pSupInfo->pColAgg);

  // Guard against a reader whose buildBuf / pResBlock was never set up.
  // Assumption: this can happen for a partially constructed reader.
  if (pSupInfo->buildBuf != NULL && pReader->pResBlock != NULL) {
    for (int32_t i = 0; i < blockDataGetNumOfCols(pReader->pResBlock); ++i) {
      if (pSupInfo->buildBuf[i] != NULL) {
        taosMemoryFreeClear(pSupInfo->buildBuf[i]);
      }
    }
  }

  taosMemoryFree(pSupInfo->buildBuf);
  tBlockDataDestroy(&pReader->status.fileBlockData, true);

  cleanupDataBlockIterator(&pReader->status.blockIter);

  size_t numOfTables = taosHashGetSize(pReader->status.pTableMap);
  destroyBlockScanInfo(pReader->status.pTableMap);
  blockDataDestroy(pReader->pResBlock);

  if (pReader->pFileReader != NULL) {
    tsdbDataFReaderClose(&pReader->pFileReader);
  }

  tsdbUntakeReadSnap(pReader->pTsdb, pReader->pReadSnap);

  taosMemoryFree(pReader->status.uidCheckInfo.tableUidList);
  SIOCostSummary* pCost = &pReader->cost;

  SFilesetIter* pFilesetIter = &pReader->status.fileIter;
  if (pFilesetIter->pLastBlockReader != NULL) {
    SLastBlockReader* pLReader = pFilesetIter->pLastBlockReader;
    tMergeTreeClose(&pLReader->mergeTree);

    getLastBlockLoadInfo(pLReader->pInfo, &pCost->lastBlockLoad, &pCost->lastBlockLoadTime);

    pLReader->pInfo = destroyLastBlockLoadInfo(pLReader->pInfo);
    taosMemoryFree(pLReader);
  }

  tsdbDebug("%p :io-cost summary: head-file:%" PRIu64 ", head-file time:%.2f ms, SMA:%" PRId64
            " SMA-time:%.2f ms, fileBlocks:%" PRId64
            ", fileBlocks-time:%.2f ms, "
            "build in-memory-block-time:%.2f ms, lastBlocks:%" PRId64
            ", lastBlocks-time:%.2f ms, STableBlockScanInfo size:%.2f Kb %s",
            pReader, pCost->headFileLoad, pCost->headFileLoadTime, pCost->smaDataLoad, pCost->smaLoadTime,
            pCost->numOfBlocks, pCost->blockLoadTime, pCost->buildmemBlock, pCost->lastBlockLoad,
            pCost->lastBlockLoadTime, numOfTables * sizeof(STableBlockScanInfo) / 1000.0, pReader->idStr);

  taosMemoryFree(pReader->idStr);

  // Compare the two schema pointers before either is freed: using a pointer value after free is indeterminate.
  if (pReader->pMemSchema != pReader->pSchema) {
    taosMemoryFree(pReader->pMemSchema);
  }
  taosMemoryFree(pReader->pSchema);

  taosMemoryFreeClear(pReader);
}

3394
/*
 * Build the next result block for a single reader.
 *
 * File blocks are tried first while status.loadFromFile is set; once they produce no rows, or when there is no
 * file data, the in-memory buffers are used. Returns true when pResBlock holds at least one row. A failure in
 * buildBlockFromFiles is reported as "no more data" (false).
 */
static bool doTsdbNextDataBlock(STsdbReader* pReader) {
  // cleanup the data that belongs to the previous data block
  SSDataBlock* pBlock = pReader->pResBlock;
  blockDataCleanup(pBlock);

  SReaderStatus* pStatus = &pReader->status;

  if (pStatus->loadFromFile) {
    if (buildBlockFromFiles(pReader) != TSDB_CODE_SUCCESS) {
      return false;
    }

    if (pBlock->info.rows > 0) {
      return true;
    }
  }

  // no (more) rows from files, let's try the buffer
  buildBlockFromBufferSequentially(pReader);
  return pBlock->info.rows > 0;
}

3421 3422 3423 3424 3425 3426 3427 3428 3429 3430 3431 3432 3433 3434 3435 3436 3437 3438 3439 3440 3441 3442 3443 3444 3445 3446 3447 3448 3449 3450 3451 3452 3453 3454 3455 3456 3457
/*
 * Advance to the next result block.
 *
 * Sources are consumed in three phases:
 *   1. innerReader[0] (the "prev" reader), if present;
 *   2. the reader itself;
 *   3. innerReader[1] (the "next" reader), if present.
 * An inner reader is closed and cleared as soon as it yields no block. pReader->step records which source produced
 * the current block.
 *
 * NOTE(review): tsdbReaderOpen creates the inner readers with capacity 1 ("only need one more row"). Here they are
 * drained completely, one block at a time. Confirm this is intended.
 */
bool tsdbNextDataBlock(STsdbReader* pReader) {
  if (isEmptyQueryTimeWindow(&pReader->window)) {
    return false;
  }

  STsdbReader** ppPrev = &pReader->innerReader[0];
  if (*ppPrev != NULL) {
    if (doTsdbNextDataBlock(*ppPrev)) {
      pReader->step = EXTERNAL_ROWS_PREV;
      return true;
    }

    tsdbReaderClose(*ppPrev);
    *ppPrev = NULL;
  }

  pReader->step = EXTERNAL_ROWS_MAIN;
  if (doTsdbNextDataBlock(pReader)) {
    return true;
  }

  STsdbReader** ppNext = &pReader->innerReader[1];
  if (*ppNext != NULL) {
    if (doTsdbNextDataBlock(*ppNext)) {
      pReader->step = EXTERNAL_ROWS_NEXT;
      return true;
    }

    tsdbReaderClose(*ppNext);
    *ppNext = NULL;
  }

  return false;
}

/* Copy the row count, uid and time window of pReader's current result block into *pDataBlockInfo. */
static void setBlockInfo(STsdbReader* pReader, SDataBlockInfo* pDataBlockInfo) {
  ASSERT(pDataBlockInfo != NULL && pReader != NULL);

  pDataBlockInfo->window = pReader->pResBlock->info.window;
  pDataBlockInfo->uid = pReader->pResBlock->info.uid;
  pDataBlockInfo->rows = pReader->pResBlock->info.rows;
}

3464 3465
/*
 * Describe the block produced by the last tsdbNextDataBlock call.
 *
 * For TIMEWINDOW_RANGE_EXTERNAL readers, pReader->step selects the source:
 *   - EXTERNAL_ROWS_MAIN: the reader itself;
 *   - EXTERNAL_ROWS_PREV: innerReader[0];
 *   - anything else: innerReader[1].
 * Other reader types always use the reader itself.
 */
void tsdbRetrieveDataBlockInfo(STsdbReader* pReader, SDataBlockInfo* pDataBlockInfo) {
  STsdbReader* pSource = pReader;

  if (pReader->type == TIMEWINDOW_RANGE_EXTERNAL) {
    switch (pReader->step) {
      case EXTERNAL_ROWS_MAIN:
        break;
      case EXTERNAL_ROWS_PREV:
        pSource = pReader->innerReader[0];
        break;
      default:
        pSource = pReader->innerReader[1];
        break;
    }
  }

  setBlockInfo(pSource, pDataBlockInfo);
}

3478
/*
 * Load the SMA (pre-aggregates) of the current file block.
 *
 * *pBlockStatis is set to NULL (and TSDB_CODE_SUCCESS returned) in three cases:
 *   - external-range readers;
 *   - composed blocks;
 *   - blocks without SMA.
 * Otherwise it points at suppInfo.plist:
 *   - slot 0 is the primary-timestamp aggregate;
 *   - other slots are filled for output columns whose aggregate was loaded.
 * *allHave ends true only if the block has SMA and no matched column has BSMA disabled.
 * Returns the error from tsdbReadBlockSma on failure.
 */
int32_t tsdbRetrieveDatablockSMA(STsdbReader* pReader, SColumnDataAgg*** pBlockStatis, bool* allHave) {
  *allHave = false;

  // there is no statistics data for external-range readers or composed blocks
  if (pReader->type == TIMEWINDOW_RANGE_EXTERNAL || pReader->status.composedDataBlock) {
    *pBlockStatis = NULL;
    return TSDB_CODE_SUCCESS;
  }

  SFileDataBlockInfo* pFBlock = getCurrentBlockInfo(&pReader->status.blockIter);
  SDataBlk*           pBlock = getCurrentBlock(&pReader->status.blockIter);
  int64_t             stime = taosGetTimestampUs();
  SBlockLoadSuppInfo* pSup = &pReader->suppInfo;

  if (!tDataBlkHasSma(pBlock)) {
    *pBlockStatis = NULL;
    return TSDB_CODE_SUCCESS;
  }

  int32_t code = tsdbReadBlockSma(pReader->pFileReader, pBlock, pSup->pColAgg);
  if (code != TSDB_CODE_SUCCESS) {
    tsdbDebug("vgId:%d, failed to load block SMA for uid %" PRIu64 ", code:%s, %s", 0, pFBlock->uid, tstrerror(code),
              pReader->idStr);
    return code;
  }

  *allHave = true;

  // always load the first primary timestamp column data
  SColumnDataAgg* pTsAgg = &pSup->tsColAgg;
  pTsAgg->numOfNull = 0;
  pTsAgg->colId = PRIMARYKEY_TIMESTAMP_COL_ID;
  pTsAgg->min = pReader->pResBlock->info.window.skey;
  pTsAgg->max = pReader->pResBlock->info.window.ekey;
  pSup->plist[0] = pTsAgg;

  // Pair the loaded aggregates with the output column ids by advancing the smaller id.
  // Assumes both sequences are sorted by column id.
  // NOTE(review): plist slots of unmatched columns are not reset here and may still point at a previous block's
  // aggregates. Confirm that callers never read them.
  size_t  numOfCols = blockDataGetNumOfCols(pReader->pResBlock);
  int32_t aggIdx = 0;
  int32_t colIdx = 0;
  while (colIdx < numOfCols && aggIdx < taosArrayGetSize(pSup->pColAgg)) {
    SColumnDataAgg* pAgg = taosArrayGet(pSup->pColAgg, aggIdx);

    if (pAgg->colId == pSup->colIds[colIdx]) {
      // NOTE(review): the schema is indexed by the aggregate index, not by a schema column index. Confirm the two
      // line up.
      if (IS_BSMA_ON(&(pReader->pSchema->columns[aggIdx]))) {
        pSup->plist[colIdx] = pAgg;
      } else {
        *allHave = false;
      }
      aggIdx += 1;
      colIdx += 1;
    } else if (pAgg->colId < pSup->colIds[colIdx]) {
      aggIdx += 1;
    } else {
      colIdx += 1;
    }
  }

  double elapsed = (taosGetTimestampUs() - stime) / 1000.0;
  pReader->cost.smaLoadTime += elapsed;
  pReader->cost.smaDataLoad += 1;

  *pBlockStatis = pSup->plist;

  tsdbDebug("vgId:%d, succeed to load block SMA for uid %" PRIu64 ", elapsed time:%.2f ms, %s", 0, pFBlock->uid,
            elapsed, pReader->idStr);

  return code;
}

3556
/*
 * Return the column array of the current result block for a single reader.
 *
 * Composed blocks are returned from pResBlock as-is. Otherwise the current file block is loaded into
 * status.fileBlockData and copied into pResBlock.
 *
 * On failure, terrno is set and NULL is returned. A load failure also destroys fileBlockData.
 */
static SArray* doRetrieveDataBlock(STsdbReader* pReader) {
  SReaderStatus* pStatus = &pReader->status;

  if (pStatus->composedDataBlock) {
    return pReader->pResBlock->pDataBlock;
  }

  SFileDataBlockInfo*  pFBlock = getCurrentBlockInfo(&pStatus->blockIter);
  STableBlockScanInfo* pScanInfo = taosHashGet(pStatus->pTableMap, &pFBlock->uid, sizeof(pFBlock->uid));

  tBlockDataReset(&pStatus->fileBlockData);

  int32_t code = tBlockDataInit(&pStatus->fileBlockData, pReader->suid, pScanInfo->uid, pReader->pSchema);
  if (code == TSDB_CODE_SUCCESS) {
    code = doLoadFileBlockData(pReader, &pStatus->blockIter, &pStatus->fileBlockData);
    if (code != TSDB_CODE_SUCCESS) {
      tBlockDataDestroy(&pStatus->fileBlockData, 1);
    }
  }

  if (code != TSDB_CODE_SUCCESS) {
    terrno = code;
    return NULL;
  }

  copyBlockDataToSDataBlock(pReader, pScanInfo);
  return pReader->pResBlock->pDataBlock;
}

3584 3585 3586 3587 3588 3589 3590 3591 3592 3593 3594 3595
/*
 * Return the columns of the block produced by the last tsdbNextDataBlock call.
 *
 * For TIMEWINDOW_RANGE_EXTERNAL readers, step EXTERNAL_ROWS_PREV / EXTERNAL_ROWS_NEXT selects innerReader[0] /
 * innerReader[1]. Every other case reads from pReader itself. pIdList is not used.
 */
SArray* tsdbRetrieveDataBlock(STsdbReader* pReader, SArray* pIdList) {
  STsdbReader* pSource = pReader;

  if (pReader->type == TIMEWINDOW_RANGE_EXTERNAL) {
    if (pReader->step == EXTERNAL_ROWS_PREV) {
      pSource = pReader->innerReader[0];
    } else if (pReader->step == EXTERNAL_ROWS_NEXT) {
      pSource = pReader->innerReader[1];
    }
  }

  return doRetrieveDataBlock(pSource);
}

H
Haojun Liao 已提交
3596
/*
 * Re-arm an open reader for a new order/time window taken from pCond.
 *
 * The reader becomes TIMEWINDOW_RANGE_CONTAINED and its file-set and block iterators are restarted. A reader whose
 * current window is empty is left untouched.
 *
 * Returns TSDB_CODE_SUCCESS or the error from initForFirstBlockInFile.
 */
int32_t tsdbReaderReset(STsdbReader* pReader, SQueryTableDataCond* pCond) {
  if (isEmptyQueryTimeWindow(&pReader->window)) {
    return TSDB_CODE_SUCCESS;
  }

  pReader->order = pCond->order;
  pReader->type = TIMEWINDOW_RANGE_CONTAINED;
  pReader->status.loadFromFile = true;
  pReader->status.pTableIter = NULL;
  pReader->window = updateQueryTimeWindow(pReader->pTsdb, &pCond->twindows);

  // reset the SMA bookkeeping: clear the timestamp aggregate and the first plist slot
  memset(&pReader->suppInfo.tsColAgg, 0, sizeof(SColumnDataAgg));
  memset(pReader->suppInfo.plist, 0, POINTER_BYTES);

  pReader->suppInfo.tsColAgg.colId = PRIMARYKEY_TIMESTAMP_COL_ID;

  // only close a file reader that is actually open (same guard as in tsdbReaderClose)
  if (pReader->pFileReader != NULL) {
    tsdbDataFReaderClose(&pReader->pFileReader);
  }

  int32_t numOfTables = taosHashGetSize(pReader->status.pTableMap);

  initFilesetIterator(&pReader->status.fileIter, pReader->pReadSnap->fs.aDFileSet, pReader);
  resetDataBlockIterator(&pReader->status.blockIter, pReader->order);

  // Restart key handed to resetDataBlockScanInfo: one tick before the window in scan order. It saturates at the
  // int64 limits because skey - 1 / ekey + 1 would be signed overflow (undefined behavior) for an unbounded window
  // (e.g. ekey == INT64_MAX).
  int64_t ts = 0;
  if (ASCENDING_TRAVERSE(pReader->order)) {
    ts = (pReader->window.skey > INT64_MIN) ? (pReader->window.skey - 1) : INT64_MIN;
  } else {
    ts = (pReader->window.ekey < INT64_MAX) ? (pReader->window.ekey + 1) : INT64_MAX;
  }
  resetDataBlockScanInfo(pReader->status.pTableMap, ts);

  int32_t         code = 0;
  SDataBlockIter* pBlockIter = &pReader->status.blockIter;

  // no data in files, let's try buffer in memory
  if (pReader->status.fileIter.numOfFiles == 0) {
    pReader->status.loadFromFile = false;
  } else {
    code = initForFirstBlockInFile(pReader, pBlockIter);
    if (code != TSDB_CODE_SUCCESS) {
      tsdbError("%p reset reader failed, numOfTables:%d, query range:%" PRId64 " - %" PRId64 " in query %s", pReader,
                numOfTables, pReader->window.skey, pReader->window.ekey, pReader->idStr);
      return code;
    }
  }

  tsdbDebug("%p reset reader, suid:%" PRIu64 ", numOfTables:%d, query range:%" PRId64 " - %" PRId64 " in query %s",
            pReader, pReader->suid, numOfTables, pReader->window.skey, pReader->window.ekey, pReader->idStr);

  return code;
}
H
Hongze Cheng 已提交
3642

3643 3644 3645
/*
 * Map a block's row count onto a histogram bucket of width bucketRange, counting from startRow.
 *
 * Row counts below startRow, and a non-positive bucket width, map to bucket 0. Previously these produced a negative
 * index or a division by zero. The result is NOT bounded above; the caller must clamp it to the histogram size.
 */
static int32_t getBucketIndex(int32_t startRow, int32_t bucketRange, int32_t numOfRows) {
  if (bucketRange <= 0 || numOfRows < startRow) {
    return 0;
  }
  return (numOfRows - startRow) / bucketRange;
}
H
Hongze Cheng 已提交
3646

3647 3648 3649 3650
/*
 * Walk all file blocks visible to pReader and accumulate block-distribution statistics into pTableBlockInfo.
 *
 * Statistics collected: row totals, min/max rows, small-block count, and a row-count histogram.
 *
 * Returns TSDB_CODE_SUCCESS, or the error from initForFirstBlockInFile when moving to the next file fails.
 */
int32_t tsdbGetFileBlocksDistInfo(STsdbReader* pReader, STableBlockDistInfo* pTableBlockInfo) {
  int32_t code = TSDB_CODE_SUCCESS;
  pTableBlockInfo->totalSize = 0;
  pTableBlockInfo->totalRows = 0;

  // find the start data block in file
  SReaderStatus* pStatus = &pReader->status;

  STsdbCfg* pc = &pReader->pTsdb->pVnode->config.tsdbCfg;
  pTableBlockInfo->defMinRows = pc->minRows;
  pTableBlockInfo->defMaxRows = pc->maxRows;

  // the histogram spans [minRows, maxRows] in as many equally wide buckets as blockRowsHisto holds
  const int32_t numOfBuckets =
      (int32_t)(sizeof(pTableBlockInfo->blockRowsHisto) / sizeof(pTableBlockInfo->blockRowsHisto[0]));
  int32_t bucketRange = ceil((pc->maxRows - pc->minRows) / (double)numOfBuckets);
  if (bucketRange <= 0) {
    bucketRange = 1;  // minRows == maxRows would otherwise divide by zero
  }

  pTableBlockInfo->numOfFiles += 1;

  int32_t numOfTables = (int32_t)taosHashGetSize(pStatus->pTableMap);
  int     defaultRows = 4096;

  SDataBlockIter* pBlockIter = &pStatus->blockIter;
  pTableBlockInfo->numOfFiles += pStatus->fileIter.numOfFiles;

  if (pBlockIter->numOfBlocks > 0) {
    pTableBlockInfo->numOfBlocks += pBlockIter->numOfBlocks;
  }

  pTableBlockInfo->numOfTables = numOfTables;
  bool hasNext = (pBlockIter->numOfBlocks > 0);

  while (true) {
    if (hasNext) {
      SDataBlk* pBlock = getCurrentBlock(pBlockIter);

      int32_t numOfRows = pBlock->nRow;
      pTableBlockInfo->totalRows += numOfRows;

      if (numOfRows > pTableBlockInfo->maxRows) {
        pTableBlockInfo->maxRows = numOfRows;
      }

      if (numOfRows < pTableBlockInfo->minRows) {
        pTableBlockInfo->minRows = numOfRows;
      }

      if (numOfRows < defaultRows) {
        pTableBlockInfo->numOfSmallBlocks += 1;
      }

      // getBucketIndex is not bounded by the histogram size (e.g. a block of maxRows rows can map to
      // numOfBuckets), so clamp before indexing
      int32_t bucketIndex = getBucketIndex(pTableBlockInfo->defMinRows, bucketRange, numOfRows);
      if (bucketIndex < 0) {
        bucketIndex = 0;
      } else if (bucketIndex >= numOfBuckets) {
        bucketIndex = numOfBuckets - 1;
      }
      pTableBlockInfo->blockRowsHisto[bucketIndex]++;

      hasNext = blockIteratorNext(&pStatus->blockIter);
    } else {
      code = initForFirstBlockInFile(pReader, pBlockIter);
      if ((code != TSDB_CODE_SUCCESS) || (pReader->status.loadFromFile == false)) {
        break;
      }

      pTableBlockInfo->numOfBlocks += pBlockIter->numOfBlocks;
      hasNext = (pBlockIter->numOfBlocks > 0);
    }
  }

  return code;
}
H
Hongze Cheng 已提交
3715

H
refact  
Hongze Cheng 已提交
3716
/*
 * Count the rows that the reader's tables have in the snapshotted mem and imem tables.
 *
 * Side effect: iterates with pReader->status.pTableIter, which ends up NULL.
 *
 * The snapshot's own pMem/pIMem pointers are now tested, rather than the live pTsdb->mem/imem. The live tables can
 * differ from the snapshot taken at open time. Testing them could pass a NULL memtable to
 * tsdbGetTbDataFromMemTable, or skip rows the snapshot still holds.
 */
int64_t tsdbGetNumOfRowsInMemTable(STsdbReader* pReader) {
  int64_t rows = 0;

  SReaderStatus* pStatus = &pReader->status;
  pStatus->pTableIter = taosHashIterate(pStatus->pTableMap, NULL);

  while (pStatus->pTableIter != NULL) {
    STableBlockScanInfo* pBlockScanInfo = pStatus->pTableIter;

    if (pReader->pReadSnap != NULL && pReader->pReadSnap->pMem != NULL) {
      STbData* d = tsdbGetTbDataFromMemTable(pReader->pReadSnap->pMem, pReader->suid, pBlockScanInfo->uid);
      if (d != NULL) {
        rows += tsdbGetNRowsInTbData(d);
      }
    }

    if (pReader->pReadSnap != NULL && pReader->pReadSnap->pIMem != NULL) {
      STbData* di = tsdbGetTbDataFromMemTable(pReader->pReadSnap->pIMem, pReader->suid, pBlockScanInfo->uid);
      if (di != NULL) {
        rows += tsdbGetNRowsInTbData(di);
      }
    }

    // current table is exhausted, let's try the next table
    pStatus->pTableIter = taosHashIterate(pStatus->pTableMap, pStatus->pTableIter);
  }

  return rows;
}
D
dapan1121 已提交
3747

L
Liu Jicong 已提交
3748
/*
 * Look up the schema of table `uid` at its current schema version.
 *
 * For a child table:
 *   - *suid receives its super table uid;
 *   - the version is read from the super table's entry.
 * For a normal table, *suid is 0.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_TDB_INVALID_TABLE_ID (also stored in terrno) if an entry is missing.
 */
int32_t tsdbGetTableSchema(SVnode* pVnode, int64_t uid, STSchema** pSchema, int64_t* suid) {
  SMetaReader mr = {0};
  int32_t     sversion = 1;

  metaReaderInit(&mr, pVnode->pMeta, 0);
  if (metaGetTableEntryByUid(&mr, uid) != TSDB_CODE_SUCCESS) {
    goto _invalid;
  }

  *suid = 0;

  if (mr.me.type == TSDB_CHILD_TABLE) {
    tDecoderClear(&mr.coder);
    *suid = mr.me.ctbEntry.suid;
    if (metaGetTableEntryByUid(&mr, *suid) != TSDB_CODE_SUCCESS) {
      goto _invalid;
    }
    sversion = mr.me.stbEntry.schemaRow.version;
  } else {
    ASSERT(mr.me.type == TSDB_NORMAL_TABLE);
    sversion = mr.me.ntbEntry.schemaRow.version;
  }

  metaReaderClear(&mr);
  *pSchema = metaGetTbTSchema(pVnode->pMeta, uid, sversion);
  return TSDB_CODE_SUCCESS;

_invalid:
  terrno = TSDB_CODE_TDB_INVALID_TABLE_ID;
  metaReaderClear(&mr);
  return terrno;
}
H
Hongze Cheng 已提交
3782 3783 3784 3785 3786 3787 3788 3789 3790 3791 3792 3793 3794 3795 3796 3797 3798 3799 3800 3801 3802 3803 3804 3805 3806 3807 3808 3809 3810 3811

/*
 * Take a read snapshot of pTsdb: reference the current mem/imem tables and the file set under the read lock.
 *
 * On success *ppSnap owns the snapshot and the caller releases it with tsdbUntakeReadSnap. On failure every
 * reference taken so far is dropped, the snapshot is freed, and *ppSnap is set to NULL. Previously a half-built
 * snapshot was left in *ppSnap and its references leaked.
 */
int32_t tsdbTakeReadSnap(STsdb* pTsdb, STsdbReadSnap** ppSnap) {
  int32_t code = 0;
  bool    fsRefed = false;

  // alloc
  STsdbReadSnap* pSnap = (STsdbReadSnap*)taosMemoryCalloc(1, sizeof(STsdbReadSnap));
  if (pSnap == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _exit;
  }

  // lock
  code = taosThreadRwlockRdlock(&pTsdb->rwLock);
  if (code) {
    code = TAOS_SYSTEM_ERROR(code);
    goto _exit;
  }

  // take snapshot
  pSnap->pMem = pTsdb->mem;
  pSnap->pIMem = pTsdb->imem;

  if (pSnap->pMem) {
    tsdbRefMemTable(pSnap->pMem);
  }

  if (pSnap->pIMem) {
    tsdbRefMemTable(pSnap->pIMem);
  }

  // fs
  code = tsdbFSRef(pTsdb, &pSnap->fs);
  if (code) {
    taosThreadRwlockUnlock(&pTsdb->rwLock);
    goto _exit;
  }
  fsRefed = true;

  // unlock
  code = taosThreadRwlockUnlock(&pTsdb->rwLock);
  if (code) {
    code = TAOS_SYSTEM_ERROR(code);
    goto _exit;
  }

  tsdbTrace("vgId:%d, take read snapshot", TD_VID(pTsdb->pVnode));

_exit:
  if (code != TSDB_CODE_SUCCESS && pSnap != NULL) {
    // pMem/pIMem are non-NULL only if their reference was taken above (pSnap was zero-initialized)
    if (pSnap->pMem) {
      tsdbUnrefMemTable(pSnap->pMem);
    }
    if (pSnap->pIMem) {
      tsdbUnrefMemTable(pSnap->pIMem);
    }
    if (fsRefed) {
      tsdbFSUnref(pTsdb, &pSnap->fs);
    }
    taosMemoryFree(pSnap);
    pSnap = NULL;
  }

  *ppSnap = pSnap;
  return code;
}

/*
 * Release a snapshot obtained from tsdbTakeReadSnap: drop its mem/imem and file-set references and free it.
 * A NULL snapshot is accepted. The trace line is emitted in either case.
 */
void tsdbUntakeReadSnap(STsdb* pTsdb, STsdbReadSnap* pSnap) {
  if (pSnap != NULL) {
    if (pSnap->pMem != NULL) tsdbUnrefMemTable(pSnap->pMem);
    if (pSnap->pIMem != NULL) tsdbUnrefMemTable(pSnap->pIMem);

    tsdbFSUnref(pTsdb, &pSnap->fs);
    taosMemoryFree(pSnap);
  }

  tsdbTrace("vgId:%d, untake read snapshot", TD_VID(pTsdb->pVnode));
}