tsdbRead.c 125.5 KB
Newer Older
H
hjxilinx 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

H
Haojun Liao 已提交
16
#include "osDef.h"
H
Hongze Cheng 已提交
17
#include "tsdb.h"
18

H
Hongze Cheng 已提交
19
// True when the traverse order `o` is ascending. The argument is parenthesized so that an expression argument
// such as ASCENDING_TRAVERSE(a & b) is compared as a whole instead of binding `==` to its last operand.
#define ASCENDING_TRAVERSE(o) ((o) == TSDB_ORDER_ASC)
H
Hongze Cheng 已提交
20

21 22 23 24 25 26
// Tags for the row groups of an external-range query (rows before / inside / after the window, judging by the
// names). NOTE(review): the values cannot be used as independent bit flags, because
// EXTERNAL_ROWS_NEXT == (EXTERNAL_ROWS_PREV | EXTERNAL_ROWS_MAIN); presumably they are plain identifiers — confirm.
typedef enum {
  EXTERNAL_ROWS_PREV = 1,
  EXTERNAL_ROWS_MAIN = 2,
  EXTERNAL_ROWS_NEXT = 3,
} EContentData;

27
typedef struct {
dengyihao's avatar
dengyihao 已提交
28
  STbDataIter* iter;
29 30 31 32
  int32_t      index;
  bool         hasVal;
} SIterInfo;

33 34
// Amount of work a data file set contributes to the scan (filled by doLoadFileBlock).
typedef struct {
  int32_t numOfBlocks;     // data blocks that overlap the query time window and version range
  int32_t numOfLastFiles;  // number of stt (last) files of the file set
} SBlockNumber;

H
Haojun Liao 已提交
38
typedef struct STableBlockScanInfo {
dengyihao's avatar
dengyihao 已提交
39 40
  uint64_t  uid;
  TSKEY     lastKey;
H
Hongze Cheng 已提交
41 42 43 44 45 46 47 48
  SMapData  mapData;            // block info (compressed)
  SArray*   pBlockList;         // block data index list
  SIterInfo iter;               // mem buffer skip list iterator
  SIterInfo iiter;              // imem buffer skip list iterator
  SArray*   delSkyline;         // delete info for this table
  int32_t   fileDelIndex;       // file block delete index
  int32_t   lastBlockDelIndex;  // delete index for last block
  bool      iterInit;           // whether to initialize the in-memory skip list iterator or not
H
Haojun Liao 已提交
49 50 51
} STableBlockScanInfo;

// One entry used while ordering the file blocks of several tables.
typedef struct SBlockOrderWrapper {
  int64_t uid;     // owning table uid
  int64_t offset;  // NOTE(review): presumably the position of the block in the table's block list — confirm
} SBlockOrderWrapper;
H
Hongze Cheng 已提交
55 56

typedef struct SBlockOrderSupporter {
57 58 59 60
  SBlockOrderWrapper** pDataBlockInfo;
  int32_t*             indexPerTable;
  int32_t*             numOfBlocksPerTable;
  int32_t              numOfTables;
H
Hongze Cheng 已提交
61 62 63
} SBlockOrderSupporter;

// I/O statistics gathered by the reader. Times computed in this file are in milliseconds.
typedef struct SIOCostSummary {
  int64_t numOfBlocks;        // qualified data blocks plus stt files, summed over the scanned file sets
  double  blockLoadTime;      // time spent loading/copying file block data
  double  buildmemBlock;      // time spent building blocks from in-memory data (presumably ms)
  int64_t headFileLoad;       // number of data file readers opened
  double  headFileLoadTime;   // time spent loading block indexes and block info
  int64_t smaDataLoad;        // number of SMA loads
  double  smaLoadTime;        // time spent loading SMA (presumably ms)
  int64_t lastBlockLoad;      // number of last (stt) block loads
  double  lastBlockLoadTime;  // time spent loading last (stt) blocks (presumably ms)
} SIOCostSummary;

typedef struct SBlockLoadSuppInfo {
76
  SArray*          pColAgg;
77
  SColumnDataAgg   tsColAgg;
C
Cary Xu 已提交
78
  SColumnDataAgg** plist;
79 80
  int16_t*         colIds;    // column ids for loading file block data
  char**           buildBuf;  // build string tmp buffer, todo remove it later after all string format being updated.
H
Hongze Cheng 已提交
81 82
} SBlockLoadSuppInfo;

83 84 85
typedef struct SLastBlockReader {
  STimeWindow   window;
  SVersionRange verRange;
86
  int32_t       order;
87
  uint64_t      uid;
88
  SMergeTree    mergeTree;
89
  SSttBlockLoadInfo* pInfo;
90 91
} SLastBlockReader;

92
typedef struct SFilesetIter {
H
Hongze Cheng 已提交
93 94 95
  int32_t           numOfFiles;  // number of total files
  int32_t           index;       // current accessed index in the list
  SArray*           pFileList;   // data file list
96
  int32_t           order;
H
Hongze Cheng 已提交
97
  SLastBlockReader* pLastBlockReader;  // last file block reader
98
} SFilesetIter;
H
Haojun Liao 已提交
99 100

// One entry of the file block iterator.
typedef struct SFileDataBlockInfo {
  uint64_t uid;         // owning table uid
  int32_t  tbBlockIdx;  // index position in STableBlockScanInfo, used to check whether a neighbor block overlaps it
} SFileDataBlockInfo;

typedef struct SDataBlockIter {
107
  int32_t   numOfBlocks;
108
  int32_t   index;
H
Hongze Cheng 已提交
109
  SArray*   blockList;  // SArray<SFileDataBlockInfo>
110
  int32_t   order;
H
Hongze Cheng 已提交
111
  SDataBlk  block;  // current SDataBlk data
112
  SHashObj* pTableMap;
H
Haojun Liao 已提交
113 114 115
} SDataBlockIter;

// Progress of copying the rows of the loaded file block into result blocks.
typedef struct SFileBlockDumpInfo {
  int32_t totalRows;  // NOTE(review): not updated in the visible part of the file
  int32_t rowIndex;   // next row of the loaded block to copy
  int64_t lastKey;    // key one step past the dumped block, set when it is fully consumed
  bool    allDumped;  // true once every row of the block has been consumed
} SFileBlockDumpInfo;

122
// Walks the queried tables in ascending uid order.
typedef struct SUidOrderCheckInfo {
  uint64_t* tableUidList;  // table uids sorted in ascending order
  int32_t   currentIndex;  // cursor into tableUidList
} SUidOrderCheckInfo;

H
Haojun Liao 已提交
127
typedef struct SReaderStatus {
128
  bool                 loadFromFile;       // check file stage
129
  bool                 composedDataBlock;  // the returned data block is a composed block or not
130 131
  SHashObj*            pTableMap;          // SHash<STableBlockScanInfo>
  STableBlockScanInfo* pTableIter;         // table iterator used in building in-memory buffer data blocks.
132
  SUidOrderCheckInfo   uidCheckInfo;       // check all table in uid order
133
  SFileBlockDumpInfo   fBlockDumpInfo;
134 135 136 137
  SDFileSet*           pCurrentFileset;  // current opened file set
  SBlockData           fileBlockData;
  SFilesetIter         fileIter;
  SDataBlockIter       blockIter;
H
Haojun Liao 已提交
138 139
} SReaderStatus;

H
Hongze Cheng 已提交
140
struct STsdbReader {
H
Haojun Liao 已提交
141 142 143 144 145 146 147
  STsdb*             pTsdb;
  uint64_t           suid;
  int16_t            order;
  STimeWindow        window;  // the primary query time window that applies to all queries
  SSDataBlock*       pResBlock;
  int32_t            capacity;
  SReaderStatus      status;
148 149
  char*              idStr;  // query info handle, for debug purpose
  int32_t            type;   // query type: 1. retrieve all data blocks, 2. retrieve direct prev|next rows
H
Hongze Cheng 已提交
150
  SBlockLoadSuppInfo suppInfo;
H
Hongze Cheng 已提交
151
  STsdbReadSnap*     pReadSnap;
152
  SIOCostSummary     cost;
153 154
  STSchema*          pSchema;     // the newest version schema
  STSchema*          pMemSchema;  // the previous schema for in-memory data, to avoid load schema too many times
155 156
  SDataFReader*      pFileReader;
  SVersionRange      verRange;
157

158 159
  int32_t      step;
  STsdbReader* innerReader[2];
H
Hongze Cheng 已提交
160
};
H
Hongze Cheng 已提交
161

H
Haojun Liao 已提交
162
static SFileDataBlockInfo* getCurrentBlockInfo(SDataBlockIter* pBlockIter);
163 164
static int      buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, int64_t endKey, int32_t capacity,
                                          STsdbReader* pReader);
165
static TSDBROW* getValidMemRow(SIterInfo* pIter, const SArray* pDelList, STsdbReader* pReader);
166 167
static int32_t  doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pScanInfo, STsdbReader* pReader,
                                        SRowMerger* pMerger);
H
Hongze Cheng 已提交
168 169
static int32_t  doMergeRowsInLastBlock(SLastBlockReader* pLastBlockReader, STableBlockScanInfo* pScanInfo, int64_t ts,
                                       SRowMerger* pMerger);
170
static int32_t  doMergeRowsInBuf(SIterInfo* pIter, uint64_t uid, int64_t ts, SArray* pDelList, SRowMerger* pMerger,
dengyihao's avatar
dengyihao 已提交
171
                                 STsdbReader* pReader);
H
Haojun Liao 已提交
172
static int32_t  doAppendRowFromTSRow(SSDataBlock* pBlock, STsdbReader* pReader, STSRow* pTSRow, uint64_t uid);
173
static int32_t  doAppendRowFromFileBlock(SSDataBlock* pResBlock, STsdbReader* pReader, SBlockData* pBlockData,
H
Hongze Cheng 已提交
174
                                         int32_t rowIndex);
175
static void     setComposedBlockFlag(STsdbReader* pReader, bool composed);
176
static bool     hasBeenDropped(const SArray* pDelList, int32_t* index, TSDBKEY* pKey, int32_t order);
177

H
Hongze Cheng 已提交
178 179 180 181
static int32_t doMergeMemTableMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter, SArray* pDelList,
                                        STSRow** pTSRow, STsdbReader* pReader, bool* freeTSRow);
static int32_t doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo* pBlockScanInfo,
                                  STsdbReader* pReader, STSRow** pTSRow);
182 183
static int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pBlockScanInfo, int64_t key,
                                     STsdbReader* pReader);
184

dengyihao's avatar
dengyihao 已提交
185 186 187 188
static int32_t initDelSkylineIterator(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader, STbData* pMemTbData,
                                      STbData* piMemTbData);
static STsdb*  getTsdbByRetentions(SVnode* pVnode, TSKEY winSKey, SRetention* retentions, const char* idstr,
                                   int8_t* pLevel);
189
static SVersionRange getQueryVerRange(SVnode* pVnode, SQueryTableDataCond* pCond, int8_t level);
H
Hongze Cheng 已提交
190 191 192
static int64_t       getCurrentKeyInLastBlock(SLastBlockReader* pLastBlockReader);
static bool          hasDataInLastBlock(SLastBlockReader* pLastBlockReader);
static int32_t       doBuildDataBlock(STsdbReader* pReader);
H
Haojun Liao 已提交
193

194 195 196
// Builds the per-output-column lookup tables in pReader->suppInfo for the columns of pBlock:
//  - colIds[i]   : column id of the i-th column of pBlock;
//  - buildBuf[i] : scratch buffer of pCol->info.bytes bytes for variable-length columns, NULL otherwise.
// Returns TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY when any allocation fails.
// On failure, whatever was already allocated stays attached to pReader->suppInfo and nothing is freed here. Freeing
// here would leave dangling pointers behind that a later reader cleanup would release again. The caller is
// expected to dispose of the reader (tsdbReaderCreate hands it to tsdbReaderClose on error).
static int32_t setColumnIdSlotList(STsdbReader* pReader, SSDataBlock* pBlock) {
  SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo;

  size_t numOfCols = blockDataGetNumOfCols(pBlock);

  pSupInfo->colIds = taosMemoryMalloc(numOfCols * sizeof(int16_t));
  pSupInfo->buildBuf = taosMemoryCalloc(numOfCols, POINTER_BYTES);
  if (pSupInfo->buildBuf == NULL || pSupInfo->colIds == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  for (int32_t i = 0; i < numOfCols; ++i) {
    SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, i);
    pSupInfo->colIds[i] = pCol->info.colId;

    if (IS_VAR_DATA_TYPE(pCol->info.type)) {
      // doCopyColVal writes into this buffer without further checks, so a failed allocation must be reported
      pSupInfo->buildBuf[i] = taosMemoryMalloc(pCol->info.bytes);
      if (pSupInfo->buildBuf[i] == NULL) {
        return TSDB_CODE_OUT_OF_MEMORY;
      }
    }
  }

  return TSDB_CODE_SUCCESS;
}
H
Hongze Cheng 已提交
218

219
// Creates the uid -> STableBlockScanInfo map for the numOfTables tables of idList.
// Each entry's lastKey starts one step outside the query window on the side the scan begins from (skey - 1 for
// ascending, ekey + 1 for descending, saturating at INT64_MIN / INT64_MAX).
// A uid that occurs several times in idList keeps its first entry.
// Returns the new map, or NULL when the map cannot be created or an entry cannot be inserted (the partially
// filled map is released in that case).
static SHashObj* createDataBlockScanInfo(STsdbReader* pTsdbReader, const STableKeyInfo* idList, int32_t numOfTables) {
  // allocate buffer in order to load data blocks from file
  // todo use simple hash instead, optimize the memory consumption
  SHashObj* pTableMap =
      taosHashInit(numOfTables, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
  if (pTableMap == NULL) {
    return NULL;
  }

  for (int32_t j = 0; j < numOfTables; ++j) {
    STableBlockScanInfo info = {.lastKey = 0, .uid = idList[j].uid};

    // duplicated uid: keep the first entry, so that a failing insert below can only mean a real error
    if (taosHashGet(pTableMap, &info.uid, sizeof(uint64_t)) != NULL) {
      continue;
    }

    if (ASCENDING_TRAVERSE(pTsdbReader->order)) {
      int64_t skey = pTsdbReader->window.skey;
      info.lastKey = (skey > INT64_MIN) ? (skey - 1) : skey;
    } else {
      int64_t ekey = pTsdbReader->window.ekey;
      info.lastKey = (ekey < INT64_MAX) ? (ekey + 1) : ekey;
    }

    // a table missing from the map would silently be excluded from the scan, so a failed insert is fatal
    if (taosHashPut(pTableMap, &info.uid, sizeof(uint64_t), &info, sizeof(info)) != 0) {
      tsdbError("%p failed to add table uid:%" PRIu64 " into scan-info, %s", pTsdbReader, info.uid,
                pTsdbReader->idStr);
      taosHashCleanup(pTableMap);
      return NULL;
    }

    tsdbDebug("%p check table uid:%" PRIu64 " from lastKey:%" PRId64 " %s", pTsdbReader, info.uid, info.lastKey,
              pTsdbReader->idStr);
  }

  tsdbDebug("%p create %d tables scan-info, size:%.2f Kb, %s", pTsdbReader, numOfTables,
            (sizeof(STableBlockScanInfo) * numOfTables) / 1024.0, pTsdbReader->idStr);

  return pTableMap;
}
H
Hongze Cheng 已提交
248

H
Haojun Liao 已提交
249
// Rewinds the scan state of every table in pTableMap so that a new scan can start from key `ts`.
// Both in-memory iterators (mem and imem) are destroyed and flagged for re-initialization, and the cached delete
// skyline is dropped. Previously only the mem iterator was destroyed, which leaked the imem one; this now mirrors
// destroyBlockScanInfo.
static void resetDataBlockScanInfo(SHashObj* pTableMap, int64_t ts) {
  STableBlockScanInfo* p = NULL;

  while ((p = taosHashIterate(pTableMap, p)) != NULL) {
    p->iterInit = false;
    p->iter.hasVal = false;
    p->iiter.hasVal = false;

    if (p->iter.iter != NULL) {
      p->iter.iter = tsdbTbDataIterDestroy(p->iter.iter);
    }

    if (p->iiter.iter != NULL) {
      p->iiter.iter = tsdbTbDataIterDestroy(p->iiter.iter);
    }

    p->delSkyline = taosArrayDestroy(p->delSkyline);
    p->lastKey = ts;
  }
}

264 265 266 267 268 269 270 271
// Releases the resources owned by every table scan-info in pTableMap, then destroys the map itself. The released
// resources are the in-memory iterators, the delete skyline, the qualified block list and the block map data.
static void destroyBlockScanInfo(SHashObj* pTableMap) {
  for (STableBlockScanInfo* pInfo = taosHashIterate(pTableMap, NULL); pInfo != NULL;
       pInfo = taosHashIterate(pTableMap, pInfo)) {
    pInfo->iterInit = false;
    pInfo->iiter.hasVal = false;

    if (pInfo->iter.iter != NULL) {
      pInfo->iter.iter = tsdbTbDataIterDestroy(pInfo->iter.iter);
    }
    if (pInfo->iiter.iter != NULL) {
      pInfo->iiter.iter = tsdbTbDataIterDestroy(pInfo->iiter.iter);
    }

    pInfo->delSkyline = taosArrayDestroy(pInfo->delSkyline);
    pInfo->pBlockList = taosArrayDestroy(pInfo->pBlockList);
    tMapDataClear(&pInfo->mapData);
  }

  taosHashCleanup(pTableMap);
}

287
// A window is empty when its start key lies after its end key.
static bool isEmptyQueryTimeWindow(STimeWindow* pWindow) {
  ASSERT(pWindow != NULL);
  bool empty = (pWindow->ekey < pWindow->skey);
  return empty;
}
H
Hongze Cheng 已提交
291

292 293 294
// Returns a copy of *pWindow whose start is raised to the oldest timestamp still retained under keep2. This keeps
// expired data from being returned to the client even if it has not been physically removed yet.
static STimeWindow updateQueryTimeWindow(STsdb* pTsdb, STimeWindow* pWindow) {
  STsdbKeepCfg* pKeepCfg = &pTsdb->keepCfg;

  // keep2 is multiplied by ticks-per-minute, i.e. it is expressed in minutes; needs to add one tick
  int64_t earliestTs =
      taosGetTimestamp(pKeepCfg->precision) - (tsTickPerMin[pKeepCfg->precision] * pKeepCfg->keep2) + 1;

  STimeWindow adjusted = *pWindow;
  adjusted.skey = (adjusted.skey < earliestTs) ? earliestTs : adjusted.skey;
  return adjusted;
}
H
Hongze Cheng 已提交
307

H
Haojun Liao 已提交
308
// Lowers *capacity so that one output block (capacity rows of all requested columns) stays within 2MB.
// The row length and the product are computed in 64 bits so wide rows cannot overflow them. The capacity never
// drops below one row, otherwise the reader could not make any progress.
static void limitOutputBufferSize(const SQueryTableDataCond* pCond, int32_t* capacity) {
  int64_t rowLen = 0;
  for (int32_t i = 0; i < pCond->numOfCols; ++i) {
    rowLen += pCond->colList[i].bytes;
  }

  // make sure the output SSDataBlock size be less than 2MB.
  const int64_t TWOMB = 2 * 1024 * 1024;
  if ((*capacity) * rowLen > TWOMB) {
    int64_t limited = TWOMB / rowLen;  // rowLen > 0 here, since the product above is positive
    (*capacity) = (limited > 0) ? (int32_t)limited : 1;
  }
}

// init file iterator
322
static int32_t initFilesetIterator(SFilesetIter* pIter, SArray* aDFileSet, STsdbReader* pReader) {
H
Hongze Cheng 已提交
323
  size_t numOfFileset = taosArrayGetSize(aDFileSet);
324

325 326
  pIter->index = ASCENDING_TRAVERSE(pReader->order) ? -1 : numOfFileset;
  pIter->order = pReader->order;
H
Hongze Cheng 已提交
327
  pIter->pFileList = aDFileSet;
328
  pIter->numOfFiles = numOfFileset;
H
Haojun Liao 已提交
329

330 331 332 333
  if (pIter->pLastBlockReader == NULL) {
    pIter->pLastBlockReader = taosMemoryCalloc(1, sizeof(struct SLastBlockReader));
    if (pIter->pLastBlockReader == NULL) {
      int32_t code = TSDB_CODE_OUT_OF_MEMORY;
334
      tsdbError("failed to prepare the last block iterator, code:%d %s", tstrerror(code), pReader->idStr);
335 336
      return code;
    }
337 338
  }

339 340 341 342 343 344 345 346
  SLastBlockReader* pLReader = pIter->pLastBlockReader;
  pLReader->order = pReader->order;
  pLReader->window = pReader->window;
  pLReader->verRange = pReader->verRange;

  pLReader->uid = 0;
  tMergeTreeClose(&pLReader->mergeTree);

347
  if (pLReader->pInfo == NULL) {
H
Haojun Liao 已提交
348 349 350 351 352
    pLReader->pInfo = tCreateLastBlockLoadInfo();
    if (pLReader->pInfo == NULL) {
      tsdbDebug("init fileset iterator failed, code:%s %s", tstrerror(terrno), pReader->idStr);
      return terrno;
    }
353 354
  }

355
  tsdbDebug("init fileset iterator, total files:%d %s", pIter->numOfFiles, pReader->idStr);
H
Haojun Liao 已提交
356 357 358
  return TSDB_CODE_SUCCESS;
}

359
// Advances pIter, in traverse order, to the next file set whose key range overlaps the query window, and opens it
// as pReader->pFileReader.
// - File sets lying entirely before the window (in the traverse direction) are skipped.
// - The search stops at the first file set lying entirely past the window.
// Returns true when such a file set is found. Returns false when:
// - the file sets are exhausted;
// - the remaining ones are out of the query range; or
// - the data file reader cannot be opened. In this case the error is logged and stored in terrno, because the
//   bool result cannot carry it.
static bool filesetIteratorNext(SFilesetIter* pIter, STsdbReader* pReader) {
  bool    asc = ASCENDING_TRAVERSE(pIter->order);
  int32_t step = asc ? 1 : -1;
  pIter->index += step;

  if ((asc && pIter->index >= pIter->numOfFiles) || ((!asc) && pIter->index < 0)) {
    return false;
  }

  pIter->pLastBlockReader->uid = 0;
  tMergeTreeClose(&pIter->pLastBlockReader->mergeTree);
  resetLastBlockLoadInfo(pIter->pLastBlockReader->pInfo);

  // check file the time range of coverage
  STimeWindow win = {0};

  while (1) {
    if (pReader->pFileReader != NULL) {
      tsdbDataFReaderClose(&pReader->pFileReader);
    }

    pReader->status.pCurrentFileset = (SDFileSet*)taosArrayGet(pIter->pFileList, pIter->index);

    int32_t code = tsdbDataFReaderOpen(&pReader->pFileReader, pReader->pTsdb, pReader->status.pCurrentFileset);
    if (code != TSDB_CODE_SUCCESS) {
      // do not end the scan silently: leave the reason where callers and logs can see it
      terrno = code;
      tsdbError("%p failed to open data file reader, file index:%d, code:%s %s", pReader, pIter->index,
                tstrerror(code), pReader->idStr);
      return false;
    }

    pReader->cost.headFileLoad += 1;

    int32_t fid = pReader->status.pCurrentFileset->fid;
    tsdbFidKeyRange(fid, pReader->pTsdb->keepCfg.days, pReader->pTsdb->keepCfg.precision, &win.skey, &win.ekey);

    // current file are no longer overlapped with query time window, ignore remain files
    if ((asc && win.skey > pReader->window.ekey) || (!asc && win.ekey < pReader->window.skey)) {
      tsdbDebug("%p remain files are not qualified for qrange:%" PRId64 "-%" PRId64 ", ignore, %s", pReader,
                pReader->window.skey, pReader->window.ekey, pReader->idStr);
      return false;
    }

    // this file lies entirely before the window in the traverse direction: try the next one
    if ((asc && (win.ekey < pReader->window.skey)) || ((!asc) && (win.skey > pReader->window.ekey))) {
      pIter->index += step;
      if ((asc && pIter->index >= pIter->numOfFiles) || ((!asc) && pIter->index < 0)) {
        return false;
      }
      continue;
    }

    tsdbDebug("%p file found fid:%d for qrange:%" PRId64 "-%" PRId64 ", %s", pReader, fid, pReader->window.skey,
              pReader->window.ekey, pReader->idStr);
    return true;
  }
}

416
// Rewinds the block iterator: adopts `order`, parks the cursor before the first block (index -1), and empties the
// block list, creating it on first use.
static void resetDataBlockIterator(SDataBlockIter* pIter, int32_t order) {
  pIter->order = order;
  pIter->index = -1;
  pIter->numOfBlocks = 0;

  if (pIter->blockList != NULL) {
    taosArrayClear(pIter->blockList);
    return;
  }

  pIter->blockList = taosArrayInit(4, sizeof(SFileDataBlockInfo));
}

L
Liu Jicong 已提交
427
// Releases the block list of the iterator. The field is reset to NULL (as taosArrayDestroy's result is used elsewhere
// in this file) so that a later reset or cleanup of the same iterator never touches the freed array.
static void cleanupDataBlockIterator(SDataBlockIter* pIter) { pIter->blockList = taosArrayDestroy(pIter->blockList); }
H
Haojun Liao 已提交
428

H
Haojun Liao 已提交
429
// Initial reader state: the scan starts at the file stage and no in-memory table iterator is active yet.
static void initReaderStatus(SReaderStatus* pStatus) {
  pStatus->loadFromFile = true;
  pStatus->pTableIter = NULL;
}

434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456
// Creates the output block with one column per requested column of pCond, sized for `capacity` rows.
// Returns NULL with terrno set on failure. The partially built block, including any column already appended or
// allocated, is destroyed as a whole; freeing only the block struct would leak its columns.
static SSDataBlock* createResBlock(SQueryTableDataCond* pCond, int32_t capacity) {
  int32_t      code = TSDB_CODE_SUCCESS;
  SSDataBlock* pResBlock = createDataBlock();
  if (pResBlock == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  for (int32_t i = 0; i < pCond->numOfCols; ++i) {
    SColumnInfoData colInfo = {{0}, 0};
    colInfo.info = pCond->colList[i];
    code = blockDataAppendColInfo(pResBlock, &colInfo);
    if (code != TSDB_CODE_SUCCESS) {
      goto _error;
    }
  }

  code = blockDataEnsureCapacity(pResBlock, capacity);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  return pResBlock;

_error:
  terrno = code;
  blockDataDestroy(pResBlock);
  return NULL;
}

457 458
// Allocates and initializes a reader for the query described by pCond.
// On success *ppReader receives the reader and TSDB_CODE_SUCCESS is returned. On failure the partially built reader
// is passed to tsdbReaderClose, *ppReader is set to NULL and the error code is returned.
//
// NOTE(review): the `capacity` argument is not used; the output capacity is fixed to 4096 rows and may only be
// lowered by limitOutputBufferSize. Confirm with the callers whether the argument is supposed to be honoured.
static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, STsdbReader** ppReader, int32_t capacity,
                                const char* idstr) {
  int32_t      code = 0;
  int8_t       level = 0;
  STsdbReader* pReader = (STsdbReader*)taosMemoryCalloc(1, sizeof(*pReader));
  if (pReader == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _end;
  }

  if (VND_IS_TSMA(pVnode)) {
    tsdbDebug("vgId:%d, tsma is selected to query", TD_VID(pVnode));
  }

  initReaderStatus(&pReader->status);

  pReader->pTsdb = getTsdbByRetentions(pVnode, pCond->twindows.skey, pVnode->config.tsdbCfg.retentions, idstr, &level);
  pReader->suid = pCond->suid;
  pReader->order = pCond->order;
  pReader->capacity = 4096;
  pReader->idStr = (idstr != NULL) ? strdup(idstr) : NULL;
  if (idstr != NULL && pReader->idStr == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _end;
  }

  pReader->verRange = getQueryVerRange(pVnode, pCond, level);
  pReader->type = pCond->type;
  pReader->window = updateQueryTimeWindow(pReader->pTsdb, &pCond->twindows);

  ASSERT(pCond->numOfCols > 0);

  limitOutputBufferSize(pCond, &pReader->capacity);

  // allocate buffer in order to load data blocks from file
  SBlockLoadSuppInfo* pSup = &pReader->suppInfo;
  pSup->pColAgg = taosArrayInit(4, sizeof(SColumnDataAgg));
  pSup->plist = taosMemoryCalloc(pCond->numOfCols, POINTER_BYTES);
  if (pSup->pColAgg == NULL || pSup->plist == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _end;
  }

  pSup->tsColAgg.colId = PRIMARYKEY_TIMESTAMP_COL_ID;

  code = tBlockDataCreate(&pReader->status.fileBlockData);
  if (code != TSDB_CODE_SUCCESS) {
    terrno = code;
    goto _end;
  }

  pReader->pResBlock = createResBlock(pCond, pReader->capacity);
  if (pReader->pResBlock == NULL) {
    code = terrno;
    goto _end;
  }

  // column ids and string scratch buffers are required when copying file block data, so a failure here is fatal
  code = setColumnIdSlotList(pReader, pReader->pResBlock);
  if (code != TSDB_CODE_SUCCESS) {
    goto _end;
  }

  *ppReader = pReader;
  return code;

_end:
  tsdbReaderClose(pReader);
  *ppReader = NULL;
  return code;
}
H
Hongze Cheng 已提交
519

H
Haojun Liao 已提交
520
// Reads the block index of the opened data file and appends to pIndexList every SBlockIdx that belongs to the
// queried super table (pReader->suid) and to a table present in pReader->status.pTableMap. For each such table the
// (int32_t) pBlockList array is created if it does not exist yet.
// Returns TSDB_CODE_SUCCESS, the error of tsdbReadBlockIdx, or TSDB_CODE_OUT_OF_MEMORY.
static int32_t doLoadBlockIndex(STsdbReader* pReader, SDataFReader* pFileReader, SArray* pIndexList) {
  SArray* aBlockIdx = taosArrayInit(8, sizeof(SBlockIdx));
  if (aBlockIdx == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  int64_t st = taosGetTimestampUs();
  int32_t code = tsdbReadBlockIdx(pFileReader, aBlockIdx);
  if (code != TSDB_CODE_SUCCESS) {
    goto _end;
  }

  size_t num = taosArrayGetSize(aBlockIdx);
  if (num == 0) {
    taosArrayDestroy(aBlockIdx);
    return TSDB_CODE_SUCCESS;
  }

  int64_t et1 = taosGetTimestampUs();

  SBlockIdx* pBlockIdx = NULL;
  for (int32_t i = 0; i < num; ++i) {
    pBlockIdx = (SBlockIdx*)taosArrayGet(aBlockIdx, i);

    // super table uid check
    if (pBlockIdx->suid != pReader->suid) {
      continue;
    }

    // this block belongs to a table that is not queried.
    void* p = taosHashGet(pReader->status.pTableMap, &pBlockIdx->uid, sizeof(uint64_t));
    if (p == NULL) {
      continue;
    }

    STableBlockScanInfo* pScanInfo = p;
    if (pScanInfo->pBlockList == NULL) {
      // doLoadFileBlock pushes into this list for every table of pIndexList, so it must exist
      pScanInfo->pBlockList = taosArrayInit(4, sizeof(int32_t));
      if (pScanInfo->pBlockList == NULL) {
        code = TSDB_CODE_OUT_OF_MEMORY;
        goto _end;
      }
    }

    if (taosArrayPush(pIndexList, pBlockIdx) == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto _end;
    }
  }

  int64_t et2 = taosGetTimestampUs();
  tsdbDebug("load block index for %d tables completed, elapsed time:%.2f ms, set blockIdx:%.2f ms, size:%.2f Kb %s",
            (int32_t)num, (et1 - st) / 1000.0, (et2 - et1) / 1000.0, num * sizeof(SBlockIdx) / 1024.0, pReader->idStr);

  pReader->cost.headFileLoadTime += (et1 - st) / 1000.0;

_end:
  taosArrayDestroy(aBlockIdx);
  return code;
}
H
Hongze Cheng 已提交
570

571
// Prepares every table scan-info for a new data file. It drops the block map loaded from the previous file and
// empties the list of qualified block indexes; the list objects themselves are kept for reuse.
static void cleanupTableScanInfo(SHashObj* pTableMap) {
  for (STableBlockScanInfo* pInfo = taosHashIterate(pTableMap, NULL); pInfo != NULL;
       pInfo = taosHashIterate(pTableMap, pInfo)) {
    tMapDataClear(&pInfo->mapData);
    taosArrayClear(pInfo->pBlockList);
  }
}

585
// Loads, for every table listed in pIndexList, its block map from the current data file. It records in the table's
// pBlockList the indexes of the blocks that overlap both the query time window and the query version range.
// pBlockNum->numOfBlocks is increased by the number of qualified blocks. pBlockNum->numOfLastFiles is set to the
// number of stt files of the file set.
// Returns TSDB_CODE_SUCCESS, the error of tsdbReadBlock, or TSDB_CODE_OUT_OF_MEMORY.
static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, SBlockNumber* pBlockNum) {
  int32_t numOfQTable = 0;
  size_t  sizeInDisk = 0;
  size_t  numOfTables = taosArrayGetSize(pIndexList);

  int64_t st = taosGetTimestampUs();
  cleanupTableScanInfo(pReader->status.pTableMap);

  for (int32_t i = 0; i < numOfTables; ++i) {
    SBlockIdx* pBlockIdx = taosArrayGet(pIndexList, i);

    STableBlockScanInfo* pScanInfo = taosHashGet(pReader->status.pTableMap, &pBlockIdx->uid, sizeof(int64_t));

    tMapDataReset(&pScanInfo->mapData);
    int32_t code = tsdbReadBlock(pReader->pFileReader, pBlockIdx, &pScanInfo->mapData);
    if (code != TSDB_CODE_SUCCESS) {
      // scanning with a missing or partial block map would silently drop this table's data
      tMapDataClear(&pScanInfo->mapData);
      tsdbError("%p failed to load block info of table uid:%" PRId64 ", code:%s %s", pReader,
                (int64_t)pBlockIdx->uid, tstrerror(code), pReader->idStr);
      return code;
    }

    sizeInDisk += pScanInfo->mapData.nData;
    for (int32_t j = 0; j < pScanInfo->mapData.nItem; ++j) {
      SDataBlk block = {0};
      tMapDataGetItemByIdx(&pScanInfo->mapData, j, &block, tGetDataBlk);

      // 1. time range check
      if (block.minKey.ts > pReader->window.ekey || block.maxKey.ts < pReader->window.skey) {
        continue;
      }

      // 2. version range check
      if (block.minVer > pReader->verRange.maxVer || block.maxVer < pReader->verRange.minVer) {
        continue;
      }

      void* p = taosArrayPush(pScanInfo->pBlockList, &j);
      if (p == NULL) {
        tMapDataClear(&pScanInfo->mapData);
        return TSDB_CODE_OUT_OF_MEMORY;
      }

      pBlockNum->numOfBlocks += 1;
    }

    if (pScanInfo->pBlockList != NULL && taosArrayGetSize(pScanInfo->pBlockList) > 0) {
      numOfQTable += 1;
    }
  }

  pBlockNum->numOfLastFiles = pReader->pFileReader->pSet->nSttF;
  int32_t total = pBlockNum->numOfLastFiles + pBlockNum->numOfBlocks;

  double el = (taosGetTimestampUs() - st) / 1000.0;
  tsdbDebug(
      "load block of %d tables completed, blocks:%d in %d tables, last-files:%d, block-info-size:%.2f Kb, elapsed "
      "time:%.2f ms %s",
      (int32_t)numOfTables, pBlockNum->numOfBlocks, numOfQTable, pBlockNum->numOfLastFiles, sizeInDisk / 1024.0, el,
      pReader->idStr);

  pReader->cost.numOfBlocks += total;
  pReader->cost.headFileLoadTime += el;

  return TSDB_CODE_SUCCESS;
}
H
Hongze Cheng 已提交
645

646
// Flags the current file block as fully consumed and records as lastKey the key one step past maxKey in the
// traverse direction: +1 for ascending, -1 for descending.
static void setBlockAllDumped(SFileBlockDumpInfo* pDumpInfo, int64_t maxKey, int32_t order) {
  pDumpInfo->allDumped = true;
  pDumpInfo->lastKey = ASCENDING_TRAVERSE(order) ? (maxKey + 1) : (maxKey - 1);
}

652 653
// Appends one column value to pColInfoData at rowIndex.
// - NULL and None values produce a NULL cell.
// - Fixed-length values are appended straight from pColVal->value.
// - Variable-length values are first assembled (length header + payload) in the scratch buffer
//   pSup->buildBuf[colIndex].
static void doCopyColVal(SColumnInfoData* pColInfoData, int32_t rowIndex, int32_t colIndex, SColVal* pColVal,
                         SBlockLoadSuppInfo* pSup) {
  bool isNullVal = pColVal->isNull || pColVal->isNone;

  if (!IS_VAR_DATA_TYPE(pColVal->type)) {
    colDataAppend(pColInfoData, rowIndex, (const char*)&pColVal->value, isNullVal);
    return;
  }

  if (isNullVal) {
    colDataAppendNULL(pColInfoData, rowIndex);
    return;
  }

  char* pBuf = pSup->buildBuf[colIndex];
  varDataSetLen(pBuf, pColVal->value.nData);
  ASSERT(pColVal->value.nData <= pColInfoData->info.bytes);
  memcpy(varDataVal(pBuf), pColVal->value.pData, pColVal->value.nData);
  colDataAppend(pColInfoData, rowIndex, pBuf, false);
}

668
// Returns the block-info entry under the iterator cursor, or NULL when the iterator holds no blocks.
static SFileDataBlockInfo* getCurrentBlockInfo(SDataBlockIter* pBlockIter) {
  size_t numOfEntries = taosArrayGetSize(pBlockIter->blockList);
  if (numOfEntries > 0) {
    return taosArrayGet(pBlockIter->blockList, pBlockIter->index);
  }

  ASSERT(pBlockIter->numOfBlocks == numOfEntries);
  return NULL;
}

H
Hongze Cheng 已提交
678
// Returns the SDataBlk record of the current block, which is stored inline in the iterator.
static SDataBlk* getCurrentBlock(SDataBlockIter* pBlockIter) {
  SDataBlk* pCurrent = &pBlockIter->block;
  return pCurrent;
}
679

680
// Copies not-yet-dumped rows of the loaded file block (pReader->status.fileBlockData) into pReader->pResBlock. The
// copy starts at fBlockDumpInfo.rowIndex, moves in the traverse direction, and takes at most pReader->capacity rows.
// - Output columns are matched to block columns by column id, walking both in ascending-id order.
// - Output columns that are absent from the block are filled with NULL.
// - The block is flagged as all-dumped only once its last row (in the traverse direction) has been copied. When the
//   capacity cuts the copy short, rowIndex is left on the next row so the remaining rows are not lost.
static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo) {
  SReaderStatus*  pStatus = &pReader->status;
  SDataBlockIter* pBlockIter = &pStatus->blockIter;

  SBlockData*         pBlockData = &pStatus->fileBlockData;
  SFileDataBlockInfo* pFBlock = getCurrentBlockInfo(pBlockIter);
  SDataBlk*           pBlock = getCurrentBlock(pBlockIter);
  SSDataBlock*        pResBlock = pReader->pResBlock;
  int32_t             numOfOutputCols = blockDataGetNumOfCols(pResBlock);

  SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo;
  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;

  SColVal cv = {0};
  int64_t st = taosGetTimestampUs();
  bool    asc = ASCENDING_TRAVERSE(pReader->order);
  int32_t step = asc ? 1 : -1;

  // rows left in the traverse direction, capped by the output capacity
  int32_t remain = asc ? (pBlockData->nRow - pDumpInfo->rowIndex) : (pDumpInfo->rowIndex + 1);
  if (remain > pReader->capacity) {
    remain = pReader->capacity;
  }

  int32_t i = 0;
  int32_t rowIndex = 0;

  // The copy loops below are bounded by the number of copied rows. An "end index" bound (j < endIndex) cannot work
  // for a capped descending copy, whose end index lies below the start index.
  SColumnInfoData* pColData = taosArrayGet(pResBlock->pDataBlock, i);
  if (pColData->info.colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
    for (int32_t j = pDumpInfo->rowIndex; rowIndex < remain; j += step) {
      colDataAppend(pColData, rowIndex++, (const char*)&pBlockData->aTSKEY[j], false);
    }
    i += 1;
  }

  int32_t colIndex = 0;
  int32_t num = taosArrayGetSize(pBlockData->aIdx);
  while (i < numOfOutputCols && colIndex < num) {
    rowIndex = 0;
    pColData = taosArrayGet(pResBlock->pDataBlock, i);

    SColData* pData = tBlockDataGetColDataByIdx(pBlockData, colIndex);
    if (pData->cid < pColData->info.colId) {
      colIndex += 1;
    } else if (pData->cid == pColData->info.colId) {
      for (int32_t j = pDumpInfo->rowIndex; rowIndex < remain; j += step) {
        tColDataGetValue(pData, j, &cv);
        doCopyColVal(pColData, rowIndex++, i, &cv, pSupInfo);
      }
      colIndex += 1;
      i += 1;
      ASSERT(rowIndex == remain);
    } else {  // the specified column does not exist in file block, fill with null data
      colDataAppendNNULL(pColData, 0, remain);
      i += 1;
    }
  }

  while (i < numOfOutputCols) {
    pColData = taosArrayGet(pResBlock->pDataBlock, i);
    colDataAppendNNULL(pColData, 0, remain);
    i += 1;
  }

  pResBlock->info.rows = remain;
  pDumpInfo->rowIndex += step * remain;

  // only a block whose rows are exhausted in the traverse direction is marked as consumed
  // NOTE(review): maxKey is used for both traverse orders; for descending scans minKey may be intended — confirm.
  if (pDumpInfo->rowIndex < 0 || pDumpInfo->rowIndex >= pBlockData->nRow) {
    setBlockAllDumped(pDumpInfo, pBlock->maxKey.ts, pReader->order);
  }

  double elapsedTime = (taosGetTimestampUs() - st) / 1000.0;
  pReader->cost.blockLoadTime += elapsedTime;

  int32_t unDumpedRows = asc ? pBlock->nRow - pDumpInfo->rowIndex : pDumpInfo->rowIndex + 1;
  tsdbDebug("%p copy file block to sdatablock, global index:%d, table index:%d, brange:%" PRId64 "-%" PRId64
            ", rows:%d, remain:%d, minVer:%" PRId64 ", maxVer:%" PRId64 ", elapsed time:%.2f ms, %s",
            pReader, pBlockIter->index, pFBlock->tbBlockIdx, pBlock->minKey.ts, pBlock->maxKey.ts, remain, unDumpedRows,
            pBlock->minVer, pBlock->maxVer, elapsedTime, pReader->idStr);

  return TSDB_CODE_SUCCESS;
}

764
/*
 * Read the data block the iterator currently points at from the data file into pBlockData.
 *
 * On success the elapsed time is added to pReader->cost.blockLoadTime and the reader's dump info is
 * marked as not fully dumped, so the freshly loaded rows will be copied out.
 * Returns the error code of tsdbReadDataBlock when reading fails.
 */
static int32_t doLoadFileBlockData(STsdbReader* pReader, SDataBlockIter* pBlockIter, SBlockData* pBlockData) {
  int64_t startUs = taosGetTimestampUs();

  SFileDataBlockInfo* pInfo = getCurrentBlockInfo(pBlockIter);
  SFileBlockDumpInfo* pDump = &pReader->status.fBlockDumpInfo;
  ASSERT(pInfo != NULL);

  SDataBlk* pBlk = getCurrentBlock(pBlockIter);
  int32_t   ret = tsdbReadDataBlock(pReader->pFileReader, pBlk, pBlockData);
  if (ret != TSDB_CODE_SUCCESS) {
    tsdbError("%p error occurs in loading file block, global index:%d, table index:%d, brange:%" PRId64 "-%" PRId64
              ", rows:%d, code:%s %s",
              pReader, pBlockIter->index, pInfo->tbBlockIdx, pBlk->minKey.ts, pBlk->maxKey.ts, pBlk->nRow,
              tstrerror(ret), pReader->idStr);
    return ret;
  }

  double costMs = (taosGetTimestampUs() - startUs) / 1000.0;

  tsdbDebug("%p load file block into buffer, global index:%d, index in table block list:%d, brange:%" PRId64 "-%" PRId64
            ", rows:%d, minVer:%" PRId64 ", maxVer:%" PRId64 ", elapsed time:%.2f ms, %s",
            pReader, pBlockIter->index, pInfo->tbBlockIdx, pBlk->minKey.ts, pBlk->maxKey.ts, pBlk->nRow,
            pBlk->minVer, pBlk->maxVer, costMs, pReader->idStr);

  pReader->cost.blockLoadTime += costMs;
  pDump->allDumped = false;

  return TSDB_CODE_SUCCESS;
}
H
Hongze Cheng 已提交
793

H
Haojun Liao 已提交
794 795 796
/*
 * Release every buffer owned by an SBlockOrderSupporter.
 *
 * Safe on a partially initialised supporter: if the pDataBlockInfo array itself failed to allocate it is
 * NULL and the per-table loop is skipped. Every freed slot is set to NULL and numOfTables is reset to 0,
 * so calling this twice is harmless.
 */
static void cleanupBlockOrderSupporter(SBlockOrderSupporter* pSup) {
  taosMemoryFreeClear(pSup->numOfBlocksPerTable);
  taosMemoryFreeClear(pSup->indexPerTable);

  if (pSup->pDataBlockInfo != NULL) {
    for (int32_t i = 0; i < pSup->numOfTables; ++i) {
      // clear the slot itself (not a local copy) so no stale pointer remains in the array
      taosMemoryFreeClear(pSup->pDataBlockInfo[i]);
    }
  }

  taosMemoryFreeClear(pSup->pDataBlockInfo);
  pSup->numOfTables = 0;
}
H
Hongze Cheng 已提交
805

H
Haojun Liao 已提交
806 807
/*
 * Allocate the zero-filled per-table arrays of an SBlockOrderSupporter for numOfTables tables.
 * If any of the three allocations fails, whatever was obtained is released through
 * cleanupBlockOrderSupporter and TSDB_CODE_OUT_OF_MEMORY is returned.
 */
static int32_t initBlockOrderSupporter(SBlockOrderSupporter* pSup, int32_t numOfTables) {
  ASSERT(numOfTables >= 1);

  pSup->numOfBlocksPerTable = taosMemoryCalloc(numOfTables, sizeof(int32_t));
  pSup->indexPerTable = taosMemoryCalloc(numOfTables, sizeof(int32_t));
  pSup->pDataBlockInfo = taosMemoryCalloc(numOfTables, POINTER_BYTES);

  bool allAllocated =
      (pSup->numOfBlocksPerTable != NULL) && (pSup->indexPerTable != NULL) && (pSup->pDataBlockInfo != NULL);
  if (allAllocated) {
    return TSDB_CODE_SUCCESS;
  }

  cleanupBlockOrderSupporter(pSup);
  return TSDB_CODE_OUT_OF_MEMORY;
}
H
Hongze Cheng 已提交
820

H
Haojun Liao 已提交
821
static int32_t fileDataBlockOrderCompar(const void* pLeft, const void* pRight, void* param) {
822
  int32_t leftIndex = *(int32_t*)pLeft;
H
Haojun Liao 已提交
823
  int32_t rightIndex = *(int32_t*)pRight;
H
Hongze Cheng 已提交
824

H
Haojun Liao 已提交
825
  SBlockOrderSupporter* pSupporter = (SBlockOrderSupporter*)param;
H
Hongze Cheng 已提交
826

H
Haojun Liao 已提交
827 828
  int32_t leftTableBlockIndex = pSupporter->indexPerTable[leftIndex];
  int32_t rightTableBlockIndex = pSupporter->indexPerTable[rightIndex];
H
Hongze Cheng 已提交
829

H
Haojun Liao 已提交
830 831 832 833 834 835 836
  if (leftTableBlockIndex > pSupporter->numOfBlocksPerTable[leftIndex]) {
    /* left block is empty */
    return 1;
  } else if (rightTableBlockIndex > pSupporter->numOfBlocksPerTable[rightIndex]) {
    /* right block is empty */
    return -1;
  }
H
Hongze Cheng 已提交
837

838
  SBlockOrderWrapper* pLeftBlock = &pSupporter->pDataBlockInfo[leftIndex][leftTableBlockIndex];
H
Haojun Liao 已提交
839
  SBlockOrderWrapper* pRightBlock = &pSupporter->pDataBlockInfo[rightIndex][rightTableBlockIndex];
H
Hongze Cheng 已提交
840

841 842 843 844
  return pLeftBlock->offset > pRightBlock->offset ? 1 : -1;
}

/*
 * Decode the SDataBlk descriptor of the block the iterator currently points at into pBlockIter->block.
 * The descriptor is located through the table's scan info (looked up by uid in pBlockIter->pTableMap)
 * and its pBlockList / mapData. Does nothing when there is no current block. Always returns
 * TSDB_CODE_SUCCESS.
 */
static int32_t doSetCurrentBlock(SDataBlockIter* pBlockIter) {
  SFileDataBlockInfo* pCur = getCurrentBlockInfo(pBlockIter);
  if (pCur == NULL) {
    return TSDB_CODE_SUCCESS;
  }

  // NOTE(review): the hash lookup result is not NULL-checked; assumes every uid in blockList is in pTableMap.
  STableBlockScanInfo* pScanInfo = taosHashGet(pBlockIter->pTableMap, &pCur->uid, sizeof(pCur->uid));
  int32_t*             pMapIdx = taosArrayGet(pScanInfo->pBlockList, pCur->tbBlockIdx);
  tMapDataGetItemByIdx(&pScanInfo->mapData, *pMapIdx, &pBlockIter->block, tGetDataBlk);

  return TSDB_CODE_SUCCESS;
}
H
Hongze Cheng 已提交
858

859
/*
 * Build the order in which the file data blocks of the current file set are visited.
 *
 * The block lists of all tables in pReader->status.pTableMap are gathered into an SBlockOrderSupporter.
 * - With a single qualified table, its blocks keep their table order.
 * - With several tables, the blocks are interleaved through a multiway merge tree. The tree is ordered
 *   by the offset of each block's first sub-block, presumably so the data file is read sequentially.
 *
 * The resulting (uid, tbBlockIdx) entries are appended to pBlockIter->blockList. The iterator is then
 * positioned on its first block for the scan order: index 0 when ascending, numOfBlocks - 1 when
 * descending.
 *
 * numOfBlocks is expected to equal the total number of blocks of all tables (asserted).
 * Returns TSDB_CODE_SUCCESS or the error code of a failed allocation.
 */
static int32_t initBlockIterator(STsdbReader* pReader, SDataBlockIter* pBlockIter, int32_t numOfBlocks) {
  bool asc = ASCENDING_TRAVERSE(pReader->order);

  pBlockIter->numOfBlocks = numOfBlocks;
  taosArrayClear(pBlockIter->blockList);
  pBlockIter->pTableMap = pReader->status.pTableMap;

  // access data blocks according to the offset of each block in asc/desc order.
  int32_t numOfTables = (int32_t)taosHashGetSize(pReader->status.pTableMap);

  int64_t st = taosGetTimestampUs();

  SBlockOrderSupporter sup = {0};
  int32_t              code = initBlockOrderSupporter(&sup, numOfTables);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  int32_t cnt = 0;
  void*   ptr = NULL;
  while (1) {
    ptr = taosHashIterate(pReader->status.pTableMap, ptr);
    if (ptr == NULL) {
      break;
    }

    STableBlockScanInfo* pTableScanInfo = (STableBlockScanInfo*)ptr;
    if (pTableScanInfo->pBlockList == NULL || taosArrayGetSize(pTableScanInfo->pBlockList) == 0) {
      continue;
    }

    size_t num = taosArrayGetSize(pTableScanInfo->pBlockList);
    sup.numOfBlocksPerTable[sup.numOfTables] = num;

    char* buf = taosMemoryMalloc(sizeof(SBlockOrderWrapper) * num);
    if (buf == NULL) {
      // leaving a taosHashIterate loop early must release the iterator's reference on the current node
      taosHashCancelIterate(pReader->status.pTableMap, ptr);
      cleanupBlockOrderSupporter(&sup);
      return TSDB_CODE_TDB_OUT_OF_MEMORY;
    }

    sup.pDataBlockInfo[sup.numOfTables] = (SBlockOrderWrapper*)buf;
    SDataBlk block = {0};
    for (int32_t k = 0; k < num; ++k) {
      SBlockOrderWrapper wrapper = {0};

      int32_t* mapDataIndex = taosArrayGet(pTableScanInfo->pBlockList, k);
      tMapDataGetItemByIdx(&pTableScanInfo->mapData, *mapDataIndex, &block, tGetDataBlk);

      wrapper.uid = pTableScanInfo->uid;
      wrapper.offset = block.aSubBlock[0].offset;

      sup.pDataBlockInfo[sup.numOfTables][k] = wrapper;
      cnt++;
    }

    sup.numOfTables += 1;
  }

  ASSERT(numOfBlocks == cnt);

  // since there is only one table qualified, blocks are not sorted
  if (sup.numOfTables == 1) {
    for (int32_t i = 0; i < numOfBlocks; ++i) {
      SFileDataBlockInfo blockInfo = {.uid = sup.pDataBlockInfo[0][i].uid, .tbBlockIdx = i};
      taosArrayPush(pBlockIter->blockList, &blockInfo);
    }

    int64_t et = taosGetTimestampUs();
    tsdbDebug("%p create blocks info struct completed for one table, %d blocks not sorted, elapsed time:%.2f ms %s",
              pReader, numOfBlocks, (et - st) / 1000.0, pReader->idStr);

    pBlockIter->index = asc ? 0 : (numOfBlocks - 1);
    cleanupBlockOrderSupporter(&sup);
    doSetCurrentBlock(pBlockIter);
    return TSDB_CODE_SUCCESS;
  }

  tsdbDebug("%p create data blocks info struct completed, %d blocks in %d tables %s", pReader, cnt, sup.numOfTables,
            pReader->idStr);

  ASSERT(cnt <= numOfBlocks && sup.numOfTables <= numOfTables);

  // tMergeTreeCreate returns an int32_t error code: keep its full width (a uint8_t keeps only the low byte,
  // which can turn a failure into 0) and report the actual cause to the caller.
  SMultiwayMergeTreeInfo* pTree = NULL;
  code = tMergeTreeCreate(&pTree, sup.numOfTables, &sup, fileDataBlockOrderCompar);
  if (code != TSDB_CODE_SUCCESS) {
    cleanupBlockOrderSupporter(&sup);
    return code;
  }

  int32_t numOfTotal = 0;
  while (numOfTotal < cnt) {
    int32_t pos = tMergeTreeGetChosenIndex(pTree);
    int32_t index = sup.indexPerTable[pos]++;

    SFileDataBlockInfo blockInfo = {.uid = sup.pDataBlockInfo[pos][index].uid, .tbBlockIdx = index};
    taosArrayPush(pBlockIter->blockList, &blockInfo);

    // set data block index overflow, in order to disable the offset comparator
    if (sup.indexPerTable[pos] >= sup.numOfBlocksPerTable[pos]) {
      sup.indexPerTable[pos] = sup.numOfBlocksPerTable[pos] + 1;
    }

    numOfTotal += 1;
    tMergeTreeAdjust(pTree, tMergeTreeGetAdjustIndex(pTree));
  }

  int64_t et = taosGetTimestampUs();
  tsdbDebug("%p %d data blocks access order completed, elapsed time:%.2f ms %s", pReader, numOfBlocks,
            (et - st) / 1000.0, pReader->idStr);
  cleanupBlockOrderSupporter(&sup);
  taosMemoryFree(pTree);

  pBlockIter->index = asc ? 0 : (numOfBlocks - 1);
  doSetCurrentBlock(pBlockIter);

  return TSDB_CODE_SUCCESS;
}
H
Hongze Cheng 已提交
976

H
Haojun Liao 已提交
977
/*
 * Move the block iterator one position in its scan direction and load the new current block.
 * Returns false, leaving the iterator unchanged, when it already sits on the last block for that
 * direction (the final index for ascending, index 0 or below for descending).
 */
static bool blockIteratorNext(SDataBlockIter* pBlockIter) {
  if (ASCENDING_TRAVERSE(pBlockIter->order)) {
    if (pBlockIter->index >= pBlockIter->numOfBlocks - 1) {
      return false;
    }
    pBlockIter->index += 1;
  } else {
    if (pBlockIter->index <= 0) {
      return false;
    }
    pBlockIter->index -= 1;
  }

  doSetCurrentBlock(pBlockIter);
  return true;
}

991 992 993
/**
 * This is an two rectangles overlap cases.
 */
H
Hongze Cheng 已提交
994
static int32_t dataBlockPartiallyRequired(STimeWindow* pWindow, SVersionRange* pVerRange, SDataBlk* pBlock) {
995 996
  return (pWindow->ekey < pBlock->maxKey.ts && pWindow->ekey >= pBlock->minKey.ts) ||
         (pWindow->skey > pBlock->minKey.ts && pWindow->skey <= pBlock->maxKey.ts) ||
H
Hongze Cheng 已提交
997 998
         (pVerRange->minVer > pBlock->minVer && pVerRange->minVer <= pBlock->maxVer) ||
         (pVerRange->maxVer < pBlock->maxVer && pVerRange->maxVer >= pBlock->minVer);
H
Haojun Liao 已提交
999
}
H
Hongze Cheng 已提交
1000

H
Hongze Cheng 已提交
1001 1002
/*
 * Fetch the block that follows pFBlockInfo within the same table, in the given scan order.
 *
 * Returns NULL when pFBlockInfo is already the table's last block for that order; *nextIndex is then
 * left untouched. Otherwise *nextIndex receives the neighbour's table-local index and a
 * taosMemoryCalloc'ed copy of its SDataBlk is returned. The caller frees it with taosMemoryFree.
 */
static SDataBlk* getNeighborBlockOfSameTable(SFileDataBlockInfo* pFBlockInfo, STableBlockScanInfo* pTableBlockScanInfo,
                                             int32_t* nextIndex, int32_t order) {
  bool asc = ASCENDING_TRAVERSE(order);

  if (asc) {
    if (pFBlockInfo->tbBlockIdx >= taosArrayGetSize(pTableBlockScanInfo->pBlockList) - 1) {
      return NULL;
    }
  } else if (pFBlockInfo->tbBlockIdx == 0) {
    return NULL;
  }

  *nextIndex = pFBlockInfo->tbBlockIdx + (asc ? 1 : -1);

  SDataBlk* pNeighbor = taosMemoryCalloc(1, sizeof(SDataBlk));
  int32_t*  pMapIdx = taosArrayGet(pTableBlockScanInfo->pBlockList, *nextIndex);
  tMapDataGetItemByIdx(&pTableBlockScanInfo->mapData, *pMapIdx, pNeighbor, tGetDataBlk);

  return pNeighbor;
}

/*
 * Locate pFBlockInfo (matched by uid and table-local block index) in pBlockIter->blockList.
 * The search starts at the iterator's current position and walks in the scan direction.
 * Returns the global index of the match. The entry is expected to exist (asserted); -1 is
 * returned otherwise.
 */
static int32_t findFileBlockInfoIndex(SDataBlockIter* pBlockIter, SFileDataBlockInfo* pFBlockInfo) {
  ASSERT(pBlockIter != NULL && pFBlockInfo != NULL);

  int32_t step = ASCENDING_TRAVERSE(pBlockIter->order) ? 1 : -1;

  for (int32_t idx = pBlockIter->index; idx >= 0 && idx < pBlockIter->numOfBlocks; idx += step) {
    SFileDataBlockInfo* pCandidate = taosArrayGet(pBlockIter->blockList, idx);
    bool                sameBlock =
        (pCandidate->uid == pFBlockInfo->uid) && (pCandidate->tbBlockIdx == pFBlockInfo->tbBlockIdx);
    if (sameBlock) {
      return idx;
    }
  }

  ASSERT(0);
  return -1;
}

1041
/*
 * Make the block stored at global position `index` the next block visited by the iterator.
 *
 * The iterator position is advanced by `step`. If the requested block does not already sit at the new
 * position, it is removed from its old slot and re-inserted at the new one. pBlockIter->block is then
 * refreshed. Returns -1 for an out-of-range index, TSDB_CODE_SUCCESS otherwise.
 */
static int32_t setFileBlockActiveInBlockIter(SDataBlockIter* pBlockIter, int32_t index, int32_t step) {
  bool outOfRange = (index < 0) || (index >= pBlockIter->numOfBlocks);
  if (outOfRange) {
    return -1;
  }

  SFileDataBlockInfo target = *(SFileDataBlockInfo*)taosArrayGet(pBlockIter->blockList, index);
  pBlockIter->index += step;

  if (index != pBlockIter->index) {
    taosArrayRemove(pBlockIter->blockList, index);
    taosArrayInsert(pBlockIter->blockList, pBlockIter->index, &target);

    SFileDataBlockInfo* pPlaced = taosArrayGet(pBlockIter->blockList, pBlockIter->index);
    ASSERT(pPlaced->uid == target.uid && pPlaced->tbBlockIdx == target.tbBlockIdx);
  }

  doSetCurrentBlock(pBlockIter);
  return TSDB_CODE_SUCCESS;
}

H
Hongze Cheng 已提交
1061
/*
 * Report whether pBlock and its neighbour (the next block of the same table in scan order) share a
 * boundary timestamp. In ascending order the block's max key must equal the neighbour's min key; in
 * descending order the block's min key must equal the neighbour's max key.
 */
static bool overlapWithNeighborBlock(SDataBlk* pBlock, SDataBlk* pNeighbor, int32_t order) {
  return ASCENDING_TRAVERSE(order) ? (pBlock->maxKey.ts == pNeighbor->minKey.ts)
                                   : (pBlock->minKey.ts == pNeighbor->maxKey.ts);
}
H
Hongze Cheng 已提交
1069

H
Hongze Cheng 已提交
1070
/*
 * Report whether the buffered (in-memory) key comes no later than pBlock in scan order.
 * - An invalid key (TSKEY_INITIAL_VAL) never qualifies.
 * - Ascending scans require key.ts <= the block's min key.
 * - Descending scans require key.ts >= the block's max key.
 */
static bool bufferDataInFileBlockGap(int32_t order, TSDBKEY key, SDataBlk* pBlock) {
  if (key.ts == TSKEY_INITIAL_VAL) {
    return false;
  }

  return ASCENDING_TRAVERSE(order) ? (key.ts <= pBlock->minKey.ts) : (key.ts >= pBlock->maxKey.ts);
}
H
Hongze Cheng 已提交
1076

H
Hongze Cheng 已提交
1077
/*
 * Report whether key.ts falls inside the block's [minKey, maxKey] time range and the block's
 * [minVer, maxVer] range intersects the query version range pVerRange.
 */
static bool keyOverlapFileBlock(TSDBKEY key, SDataBlk* pBlock, SVersionRange* pVerRange) {
  bool tsInBlock = (key.ts >= pBlock->minKey.ts) && (key.ts <= pBlock->maxKey.ts);
  bool versionsIntersect = (pBlock->maxVer >= pVerRange->minVer) && (pBlock->minVer <= pVerRange->maxVer);
  return tsInBlock && versionsIntersect;
}

H
Hongze Cheng 已提交
1082
/*
 * Scan the table's delete skyline forward from startIndex and report whether a delete record may
 * remove rows of pBlock.
 *
 * - A skyline point inside the block's time range overlaps when its version is >= the block's minVer.
 * - A point before the block's min key, with version >= minVer, overlaps when the following point
 *   reaches into the block. If that following point is the final skyline point, only its timestamp is
 *   checked; otherwise its version must also be >= minVer.
 * - The scan stops at the first point after the block's max key.
 */
static bool doCheckforDatablockOverlapFrom(STableBlockScanInfo* pBlockScanInfo, const SDataBlk* pBlock,
                                           int32_t startIndex) {
  size_t num = taosArrayGetSize(pBlockScanInfo->delSkyline);

  for (int32_t i = startIndex; i < num; i += 1) {
    TSDBKEY* p = taosArrayGet(pBlockScanInfo->delSkyline, i);
    if (p->ts >= pBlock->minKey.ts && p->ts <= pBlock->maxKey.ts) {
      if (p->version >= pBlock->minVer) {
        return true;
      }
    } else if (p->ts < pBlock->minKey.ts) {  // p->ts < pBlock->minKey.ts
      if (p->version >= pBlock->minVer) {
        if (i < num - 1) {
          TSDBKEY* pnext = taosArrayGet(pBlockScanInfo->delSkyline, i + 1);
          if (i + 1 == num - 1) {  // pnext is the last point
            if (pnext->ts >= pBlock->minKey.ts) {
              return true;
            }
          } else {
            if (pnext->ts >= pBlock->minKey.ts && pnext->version >= pBlock->minVer) {
              return true;
            }
          }
        } else {  // it must be the last point
          ASSERT(p->version == 0);
        }
      }
    } else {  // (p->ts > pBlock->maxKey.ts) {
      return false;
    }
  }

  return false;
}

/*
 * Overlap check between pBlock and the delete skyline, starting at the table's current file delete
 * index (pBlockScanInfo->fileDelIndex).
 */
static bool doCheckforDatablockOverlap(STableBlockScanInfo* pBlockScanInfo, const SDataBlk* pBlock) {
  return doCheckforDatablockOverlapFrom(pBlockScanInfo, pBlock, pBlockScanInfo->fileDelIndex);
}

/*
 * Report whether the delete skyline of the table may affect rows of pBlock, for the given scan order.
 * Returns false when the table has no (or an empty) skyline, or when the skyline's time span does not
 * touch the block's key range.
 */
static bool overlapWithDelSkyline(STableBlockScanInfo* pBlockScanInfo, const SDataBlk* pBlock, int32_t order) {
  // an empty skyline would make taosArrayGet(.., 0) below return NULL
  if (pBlockScanInfo->delSkyline == NULL || taosArrayGetSize(pBlockScanInfo->delSkyline) == 0) {
    return false;
  }

  // ts is not overlap
  TSDBKEY* pFirst = taosArrayGet(pBlockScanInfo->delSkyline, 0);
  TSDBKEY* pLast = taosArrayGetLast(pBlockScanInfo->delSkyline);
  if (pBlock->minKey.ts > pLast->ts || pBlock->maxKey.ts < pFirst->ts) {
    return false;
  }

  if (ASCENDING_TRAVERSE(order)) {
    return doCheckforDatablockOverlapFrom(pBlockScanInfo, pBlock, pBlockScanInfo->fileDelIndex);
  }

  // In descending order fileDelIndex may already point past the block's key range. The forward scan would then stop
  // immediately at a point beyond maxKey and miss delete ranges covering the block. Step back to the first point whose
  // ts is not greater than the block's minKey, and scan from there.
  int32_t index = pBlockScanInfo->fileDelIndex;
  while (1) {
    TSDBKEY* p = taosArrayGet(pBlockScanInfo->delSkyline, index);
    if (p->ts > pBlock->minKey.ts && index > 0) {
      index -= 1;
    } else {  // find the first point that is smaller than the minKey.ts of dataBlock.
      break;
    }
  }

  return doCheckforDatablockOverlapFrom(pBlockScanInfo, pBlock, index);
}

1146 1147 1148 1149
// 1. the version of all rows should be less than the endVersion
// 2. current block should not overlap with next neighbor block
// 3. current timestamp should not be overlap with each other
// 4. output buffer should be large enough to hold all rows in current block
1150
// 5. delete info should not overlap with current block data
H
Hongze Cheng 已提交
1151
static bool fileBlockShouldLoad(STsdbReader* pReader, SFileDataBlockInfo* pFBlock, SDataBlk* pBlock,
1152
                                STableBlockScanInfo* pScanInfo, TSDBKEY key, SLastBlockReader* pLastBlockReader) {
H
Hongze Cheng 已提交
1153 1154
  int32_t   neighborIndex = 0;
  SDataBlk* pNeighbor = getNeighborBlockOfSameTable(pFBlock, pScanInfo, &neighborIndex, pReader->order);
1155

1156
  // overlap with neighbor
1157 1158 1159
  bool overlapWithNeighbor = false;
  if (pNeighbor) {
    overlapWithNeighbor = overlapWithNeighborBlock(pBlock, pNeighbor, pReader->order);
1160
    taosMemoryFree(pNeighbor);
1161 1162
  }

1163
  // has duplicated ts of different version in this block
L
Liu Jicong 已提交
1164 1165
  bool hasDup = (pBlock->nSubBlock == 1) ? pBlock->hasDup : true;
  bool overlapWithDel = overlapWithDelSkyline(pScanInfo, pBlock, pReader->order);
1166

1167
  // todo here we need to each key in the last files to identify if it is really overlapped with last block
1168
  // todo
1169
  bool overlapWithlastBlock = false;
1170
#if 0
H
Hongze Cheng 已提交
1171
  if (taosArrayGetSize(pLastBlockReader->pSstBlk) > 0 && (pLastBlockReader->currentBlockIndex != -1)) {
H
Hongze Cheng 已提交
1172
    SSttBlk* pSstBlk = taosArrayGet(pLastBlockReader->pSstBlk, pLastBlockReader->currentBlockIndex);
H
Hongze Cheng 已提交
1173
    overlapWithlastBlock = !(pBlock->maxKey.ts < pSstBlk->minKey || pBlock->minKey.ts > pSstBlk->maxKey);
1174
  }
1175
#endif
1176

1177 1178 1179 1180 1181 1182 1183 1184 1185 1186
  bool moreThanOutputCapacity = pBlock->nRow > pReader->capacity;
  bool partiallyRequired = dataBlockPartiallyRequired(&pReader->window, &pReader->verRange, pBlock);
  bool overlapWithKey = keyOverlapFileBlock(key, pBlock, &pReader->verRange);

  bool loadDataBlock = (overlapWithNeighbor || hasDup || partiallyRequired || overlapWithKey ||
                        moreThanOutputCapacity || overlapWithDel || overlapWithlastBlock);

  // log the reason why load the datablock for profile
  if (loadDataBlock) {
    tsdbDebug("%p uid:%" PRIu64
1187
              " need to load the datablock, overlapwithneighborblock:%d, hasDup:%d, partiallyRequired:%d, "
1188 1189 1190 1191 1192 1193
              "overlapWithKey:%d, greaterThanBuf:%d, overlapWithDel:%d, overlapWithlastBlock:%d, %s",
              pReader, pFBlock->uid, overlapWithNeighbor, hasDup, partiallyRequired, overlapWithKey,
              moreThanOutputCapacity, overlapWithDel, overlapWithlastBlock, pReader->idStr);
  }

  return loadDataBlock;
H
Haojun Liao 已提交
1194 1195
}

1196
/*
 * Fill pReader->pResBlock with rows of the table's mem / imem iterators, up to endKey.
 *
 * Returns TSDB_CODE_SUCCESS right away when neither iterator has a value. Otherwise:
 * - calls buildDataBlockFromBufImpl;
 * - refreshes the block's time window and uid and marks the result as composed;
 * - adds the elapsed time to pReader->cost.buildmemBlock.
 * Returns the code of buildDataBlockFromBufImpl.
 */
static int32_t buildDataBlockFromBuf(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo, int64_t endKey) {
  bool hasBufferedRows = pBlockScanInfo->iiter.hasVal || pBlockScanInfo->iter.hasVal;
  if (!hasBufferedRows) {
    return TSDB_CODE_SUCCESS;
  }

  SSDataBlock* pResBlock = pReader->pResBlock;
  int64_t      startUs = taosGetTimestampUs();

  int32_t code = buildDataBlockFromBufImpl(pBlockScanInfo, endKey, pReader->capacity, pReader);

  blockDataUpdateTsWindow(pResBlock, 0);
  pResBlock->info.uid = pBlockScanInfo->uid;
  setComposedBlockFlag(pReader, true);

  double costMs = (taosGetTimestampUs() - startUs) / 1000.0;
  tsdbDebug("%p build data block from cache completed, elapsed time:%.2f ms, numOfRows:%d, brange:%" PRId64
            " - %" PRId64 " %s",
            pReader, costMs, pResBlock->info.rows, pResBlock->info.window.skey, pResBlock->info.window.ekey,
            pReader->idStr);

  pReader->cost.buildmemBlock += costMs;
  return code;
}

1221 1222
/*
 * Fast path for one file-block row that needs no merging.
 *
 * It applies when the row at pDumpInfo->rowIndex is not the last row in scan direction and the next
 * row in that direction has a timestamp different from `key`. The row is then appended to
 * pReader->pResBlock, the cursor moves one step, and true is returned. Otherwise returns false, and
 * the caller has to take the merge path.
 */
static bool tryCopyDistinctRowFromFileBlock(STsdbReader* pReader, SBlockData* pBlockData, int64_t key,
                                            SFileBlockDumpInfo* pDumpInfo) {
  bool    hasNextRow = false;
  int32_t step = 0;

  if (pReader->order == TSDB_ORDER_ASC) {
    hasNextRow = pDumpInfo->rowIndex < pDumpInfo->totalRows - 1;
    step = 1;
  } else if (pReader->order == TSDB_ORDER_DESC) {
    hasNextRow = pDumpInfo->rowIndex > 0;
    step = -1;
  }

  // a border row may have duplicates in the next block, so it is never copied directly
  if (!hasNextRow) {
    return false;
  }

  int64_t nextKey = pBlockData->aTSKEY[pDumpInfo->rowIndex + step];
  if (nextKey == key) {  // duplicated timestamp: the rows must be merged
    return false;
  }

  doAppendRowFromFileBlock(pReader->pResBlock, pReader, pBlockData, pDumpInfo->rowIndex);
  pDumpInfo->rowIndex += step;
  return true;
}

H
Haojun Liao 已提交
1241 1242 1243 1244 1245 1246
/*
 * Return the table schema whose version is `sversion`, for converting a row written with that version.
 *
 * Two schemas are cached:
 * - pReader->pSchema holds the newest schema of table `uid`, fetched with version -1 on first use.
 * - pReader->pMemSchema holds one additional schema version.
 * When the requested version matches neither, the cached pMemSchema is replaced by a fresh lookup.
 * Returns NULL when that lookup fails.
 */
static FORCE_INLINE STSchema* doGetSchemaForTSRow(int32_t sversion, STsdbReader* pReader, uint64_t uid) {
  // always set the newest schema version in pReader->pSchema
  if (pReader->pSchema == NULL) {
    pReader->pSchema = metaGetTbTSchema(pReader->pTsdb->pVnode->pMeta, uid, -1);
  }

  if (pReader->pSchema && sversion == pReader->pSchema->version) {
    return pReader->pSchema;
  }

  if (pReader->pMemSchema != NULL && pReader->pMemSchema->version == sversion) {
    return pReader->pMemSchema;
  }

  // Drop the schema cached for another version and clear the pointer before reloading. Otherwise a failed lookup
  // would leave pMemSchema dangling, and it would later be read or freed a second time.
  taosMemoryFreeClear(pReader->pMemSchema);

  int32_t code = metaGetTbTSchemaEx(pReader->pTsdb->pVnode->pMeta, pReader->suid, uid, sversion, &pReader->pMemSchema);
  if (code != TSDB_CODE_SUCCESS) {
    return NULL;
  }

  return pReader->pMemSchema;
}

1266
static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo, TSDBROW* pRow,
1267 1268 1269 1270 1271 1272
                                     SIterInfo* pIter, int64_t key, SLastBlockReader* pLastBlockReader) {
  SRowMerger          merge = {0};
  STSRow*             pTSRow = NULL;
  SBlockData*         pBlockData = &pReader->status.fileBlockData;
  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;

1273
  int64_t tsLast = INT64_MIN;
1274
  if (hasDataInLastBlock(pLastBlockReader)) {
1275 1276
    tsLast = getCurrentKeyInLastBlock(pLastBlockReader);
  }
1277

H
Hongze Cheng 已提交
1278 1279
  TSDBKEY k = TSDBROW_KEY(pRow);
  TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
1280

1281 1282
  int64_t minKey = 0;
  if (pReader->order == TSDB_ORDER_ASC) {
H
Hongze Cheng 已提交
1283
    minKey = INT64_MAX;  // chosen the minimum value
1284
    if (minKey > tsLast && hasDataInLastBlock(pLastBlockReader)) {
1285 1286
      minKey = tsLast;
    }
1287

1288 1289 1290
    if (minKey > k.ts) {
      minKey = k.ts;
    }
1291

1292 1293 1294 1295 1296
    if (minKey > key && pBlockData->nRow > 0) {
      minKey = key;
    }
  } else {
    minKey = INT64_MIN;
1297
    if (minKey < tsLast && hasDataInLastBlock(pLastBlockReader)) {
1298 1299 1300 1301 1302 1303 1304 1305 1306 1307
      minKey = tsLast;
    }

    if (minKey < k.ts) {
      minKey = k.ts;
    }

    if (minKey < key && pBlockData->nRow > 0) {
      minKey = key;
    }
1308 1309 1310 1311
  }

  bool init = false;

1312
  // ASC: file block ---> last block -----> imem -----> mem
H
Hongze Cheng 已提交
1313
  // DESC: mem -----> imem -----> last block -----> file block
1314 1315
  if (pReader->order == TSDB_ORDER_ASC) {
    if (minKey == key) {
1316
      init = true;
1317 1318
      tRowMergerInit(&merge, &fRow, pReader->pSchema);
      doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
1319 1320
    }

1321
    if (minKey == tsLast) {
1322
      TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
H
Haojun Liao 已提交
1323 1324 1325
      if (init) {
        tRowMerge(&merge, &fRow1);
      } else {
1326 1327 1328
        init = true;
        tRowMergerInit(&merge, &fRow1, pReader->pSchema);
      }
H
Haojun Liao 已提交
1329
      doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, &merge);
1330
    }
1331

1332
    if (minKey == k.ts) {
H
Haojun Liao 已提交
1333 1334 1335
      if (init) {
        tRowMerge(&merge, pRow);
      } else {
1336 1337 1338 1339 1340 1341 1342 1343 1344
        init = true;
        STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid);
        tRowMergerInit(&merge, pRow, pSchema);
      }
      doMergeRowsInBuf(pIter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge, pReader);
    }
  } else {
    if (minKey == k.ts) {
      init = true;
1345 1346
      STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid);
      tRowMergerInit(&merge, pRow, pSchema);
1347
      doMergeRowsInBuf(pIter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge, pReader);
1348 1349
    }

1350
    if (minKey == tsLast) {
1351
      TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
H
Haojun Liao 已提交
1352 1353 1354
      if (init) {
        tRowMerge(&merge, &fRow1);
      } else {
1355 1356 1357
        init = true;
        tRowMergerInit(&merge, &fRow1, pReader->pSchema);
      }
H
Haojun Liao 已提交
1358
      doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, &merge);
1359 1360 1361
    }

    if (minKey == key) {
H
Haojun Liao 已提交
1362 1363 1364
      if (init) {
        tRowMerge(&merge, &fRow);
      } else {
1365 1366 1367 1368 1369
        init = true;
        tRowMergerInit(&merge, &fRow, pReader->pSchema);
      }
      doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
    }
1370 1371
  }

1372 1373 1374 1375 1376
  int32_t code = tRowMergerGetRow(&merge, &pTSRow);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

1377 1378 1379 1380 1381 1382 1383
  doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo->uid);

  taosMemoryFree(pTSRow);
  tRowMergerClear(&merge);
  return TSDB_CODE_SUCCESS;
}

1384 1385 1386
/*
 * Emit the current row of the last (stt) block and append the result to pReader->pResBlock.
 *
 * The row is first merged with the further last-block rows via doMergeRowsInLastBlock. When
 * mergeBlockData is true and the file block row at pDumpInfo->rowIndex has the same timestamp, the
 * file block rows are merged in afterwards. pBlockData is not dereferenced when mergeBlockData is
 * false, so it may be NULL in that case.
 * Returns TSDB_CODE_SUCCESS or the error code of tRowMergerGetRow.
 */
static int32_t doMergeFileBlockAndLastBlock(SLastBlockReader* pLastBlockReader, STsdbReader* pReader,
                                            STableBlockScanInfo* pBlockScanInfo, SBlockData* pBlockData,
                                            bool mergeBlockData) {
  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
  int64_t             tsLastBlock = getCurrentKeyInLastBlock(pLastBlockReader);

  STSRow*    pTSRow = NULL;
  SRowMerger merge = {0};

  TSDBROW fRow = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
  tRowMergerInit(&merge, &fRow, pReader->pSchema);
  doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLastBlock, &merge);

  // merge with block data if ts == key
  if (mergeBlockData && (tsLastBlock == pBlockData->aTSKEY[pDumpInfo->rowIndex])) {
    doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
  }

  int32_t code = tRowMergerGetRow(&merge, &pTSRow);
  if (code != TSDB_CODE_SUCCESS) {
    // the merger owns an internal buffer; release it on the failure path too
    tRowMergerClear(&merge);
    return code;
  }

  doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo->uid);

  taosMemoryFree(pTSRow);
  tRowMergerClear(&merge);
  return TSDB_CODE_SUCCESS;
}

1414 1415
/*
 * Produce the next row from file data only: the data block and/or the last (stt) block.
 *
 * - The data block has rows but the last block reader has none: the data block rows at `key` are
 *   emitted via mergeRowsInFileBlocks.
 * - Both have rows, ascending scan:
 *   - key < last-block ts: the data block row is emitted alone;
 *   - equal timestamps: both are merged, data block first.
 * - Both have rows, descending scan: delegated to doMergeFileBlockAndLastBlock with data-block
 *   merging enabled.
 * - The data block is empty: only the last block row is emitted.
 * Returns TSDB_CODE_SUCCESS or the error code of the row merger.
 */
static int32_t mergeFileBlockAndLastBlock(STsdbReader* pReader, SLastBlockReader* pLastBlockReader, int64_t key,
                                          STableBlockScanInfo* pBlockScanInfo, SBlockData* pBlockData) {
  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;

  if (pBlockData->nRow > 0) {
    // no last block available, only data block exists
    if (!hasDataInLastBlock(pLastBlockReader)) {
      return mergeRowsInFileBlocks(pBlockData, pBlockScanInfo, key, pReader);
    }

    // row in last file block
    TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
    int64_t ts = getCurrentKeyInLastBlock(pLastBlockReader);
    ASSERT(ts >= key);

    if (ASCENDING_TRAVERSE(pReader->order)) {
      if (key < ts) {  // imem, mem are all empty, file blocks (data blocks and last block) exist
        return mergeRowsInFileBlocks(pBlockData, pBlockScanInfo, key, pReader);
      } else if (key == ts) {
        STSRow*    pTSRow = NULL;
        SRowMerger merge = {0};

        tRowMergerInit(&merge, &fRow, pReader->pSchema);
        doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);

        TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
        tRowMerge(&merge, &fRow1);

        doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, ts, &merge);

        int32_t code = tRowMergerGetRow(&merge, &pTSRow);
        if (code != TSDB_CODE_SUCCESS) {
          // the merger owns an internal buffer; release it on the failure path too
          tRowMergerClear(&merge);
          return code;
        }

        doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo->uid);

        taosMemoryFree(pTSRow);
        tRowMergerClear(&merge);
        return code;
      } else {
        ASSERT(0);
        return TSDB_CODE_SUCCESS;
      }
    } else {  // desc order
      return doMergeFileBlockAndLastBlock(pLastBlockReader, pReader, pBlockScanInfo, pBlockData, true);
    }
  } else {  // only last block exists
    return doMergeFileBlockAndLastBlock(pLastBlockReader, pReader, pBlockScanInfo, NULL, false);
  }
}

1466 1467
/*
 * Merge every row of one table that carries the next timestamp to emit (smallest for ASC, largest for
 * DESC) across up to four sources -- the loaded file data block, the last (stt) block merge tree,
 * imem and mem -- and append the merged row to pReader->pResBlock.
 *
 * Both the mem and the imem iterator must hold a valid row (asserted below). The file data block takes
 * part only when pBlockData->nRow > 0, the last block only when hasDataInLastBlock() is true.
 *
 * The first contributing source initializes the row merger, later ones are merged into it:
 *   ASC:  file block -> last block -> imem -> mem
 *   DESC: mem -> imem -> last block -> file block
 *
 * Returns TSDB_CODE_SUCCESS, or the error code of tRowMergerGetRow().
 */
static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo, SBlockData* pBlockData,
                                     SLastBlockReader* pLastBlockReader) {
  SRowMerger merge = {0};
  STSRow*    pTSRow = NULL;

  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
  SArray*             pDelList = pBlockScanInfo->delSkyline;

  TSDBROW* pRow = getValidMemRow(&pBlockScanInfo->iter, pDelList, pReader);
  TSDBROW* piRow = getValidMemRow(&pBlockScanInfo->iiter, pDelList, pReader);
  ASSERT(pRow != NULL && piRow != NULL);

  // Only read the file-block / last-block keys when those sources really hold a row: aTSKEY must not be
  // indexed when nRow == 0, and the key of an absent source must never be selected as minKey.
  bool hasFileRow = (pBlockData->nRow > 0);
  bool hasLastRow = hasDataInLastBlock(pLastBlockReader);

  int64_t key = hasFileRow ? pBlockData->aTSKEY[pDumpInfo->rowIndex] : INT64_MIN;
  int64_t tsLast = hasLastRow ? getCurrentKeyInLastBlock(pLastBlockReader) : INT64_MIN;

  TSDBKEY k = TSDBROW_KEY(pRow);
  TSDBKEY ik = TSDBROW_KEY(piRow);

  // minKey is the timestamp emitted next: the smallest key for ASC, the largest one for DESC.
  bool    asc = ASCENDING_TRAVERSE(pReader->order);
  int64_t minKey = k.ts;
  if (asc) {
    if (ik.ts < minKey) {
      minKey = ik.ts;
    }
    if (hasFileRow && key < minKey) {
      minKey = key;
    }
    if (hasLastRow && tsLast < minKey) {
      minKey = tsLast;
    }
  } else {
    if (ik.ts > minKey) {
      minKey = ik.ts;
    }
    if (hasFileRow && key > minKey) {
      minKey = key;
    }
    if (hasLastRow && tsLast > minKey) {
      minKey = tsLast;
    }
  }

  // which sources contribute a row with timestamp minKey
  bool fromFile = hasFileRow && (key == minKey);
  bool fromLast = hasLastRow && (tsLast == minKey);
  bool fromIMem = (ik.ts == minKey);
  bool fromMem = (k.ts == minKey);

  bool init = false;

  // ASC: file block -----> last block -----> imem -----> mem
  // DESC: mem -----> imem -----> last block -----> file block
  if (asc) {
    if (fromFile) {
      init = true;
      TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
      tRowMergerInit(&merge, &fRow, pReader->pSchema);
      doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
    }

    if (fromLast) {
      TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
      if (init) {
        tRowMerge(&merge, &fRow1);
      } else {
        init = true;
        tRowMergerInit(&merge, &fRow1, pReader->pSchema);
      }
      doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, &merge);
    }

    if (fromIMem) {
      if (init) {
        tRowMerge(&merge, piRow);
      } else {
        init = true;
        STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(piRow), pReader, pBlockScanInfo->uid);
        tRowMergerInit(&merge, piRow, pSchema);
      }
      doMergeRowsInBuf(&pBlockScanInfo->iiter, pBlockScanInfo->uid, ik.ts, pBlockScanInfo->delSkyline, &merge, pReader);
    }

    if (fromMem) {
      if (init) {
        tRowMerge(&merge, pRow);
      } else {
        init = true;
        STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid);
        tRowMergerInit(&merge, pRow, pSchema);
      }
      doMergeRowsInBuf(&pBlockScanInfo->iter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge, pReader);
    }
  } else {
    if (fromMem) {
      init = true;
      STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid);
      tRowMergerInit(&merge, pRow, pSchema);
      doMergeRowsInBuf(&pBlockScanInfo->iter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge, pReader);
    }

    if (fromIMem) {
      if (init) {
        tRowMerge(&merge, piRow);
      } else {
        init = true;
        STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(piRow), pReader, pBlockScanInfo->uid);
        tRowMergerInit(&merge, piRow, pSchema);
      }
      doMergeRowsInBuf(&pBlockScanInfo->iiter, pBlockScanInfo->uid, ik.ts, pBlockScanInfo->delSkyline, &merge, pReader);
    }

    if (fromLast) {
      TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
      if (init) {
        tRowMerge(&merge, &fRow1);
      } else {
        init = true;
        tRowMergerInit(&merge, &fRow1, pReader->pSchema);
      }
      doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, &merge);
    }

    if (fromFile) {
      TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
      if (init) {
        tRowMerge(&merge, &fRow);
      } else {
        init = true;
        tRowMergerInit(&merge, &fRow, pReader->pSchema);
      }
      doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
    }
  }

  int32_t code = tRowMergerGetRow(&merge, &pTSRow);
  if (code != TSDB_CODE_SUCCESS) {
    tRowMergerClear(&merge);  // release the merger's buffers on the error path as well
    return code;
  }

  doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo->uid);

  taosMemoryFree(pTSRow);
  tRowMergerClear(&merge);
  return code;
}

// Rows that exist in mem, imem, the file data block and the last blocks at the same time are merged by
// doMergeMultiLevelRows() above.

/*
 * Lazily create the mem / imem skip-list iterators of one table and build its delete skyline.
 *
 * The iterators start at the scan-order boundary of the query window: (window.skey, verRange.minVer)
 * for ASC and (window.ekey, verRange.maxVer) for DESC. The skyline is built by initDelSkylineIterator()
 * from the del file and from the mem/imem table data found here (either may be NULL).
 *
 * pBlockScanInfo->iterInit makes this a one-shot operation per table.
 * Returns TSDB_CODE_SUCCESS, the error of tsdbTbDataIterCreate(), or the error of
 * initDelSkylineIterator().
 */
static int32_t initMemDataIterator(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader) {
  if (pBlockScanInfo->iterInit) {
    return TSDB_CODE_SUCCESS;
  }

  int32_t code = TSDB_CODE_SUCCESS;

  TSDBKEY startKey = {0};
  if (ASCENDING_TRAVERSE(pReader->order)) {
    startKey = (TSDBKEY){.ts = pReader->window.skey, .version = pReader->verRange.minVer};
  } else {
    startKey = (TSDBKEY){.ts = pReader->window.ekey, .version = pReader->verRange.maxVer};
  }

  int32_t backward = (!ASCENDING_TRAVERSE(pReader->order));

  STbData* d = NULL;
  if (pReader->pReadSnap->pMem != NULL) {
    d = tsdbGetTbDataFromMemTable(pReader->pReadSnap->pMem, pReader->suid, pBlockScanInfo->uid);
    if (d != NULL) {
      code = tsdbTbDataIterCreate(d, &startKey, backward, &pBlockScanInfo->iter.iter);
      if (code == TSDB_CODE_SUCCESS) {
        pBlockScanInfo->iter.hasVal = (tsdbTbDataIterGet(pBlockScanInfo->iter.iter) != NULL);

        tsdbDebug("%p uid:%" PRId64 ", check data in mem from skey:%" PRId64 ", order:%d, ts range in buf:%" PRId64
                  "-%" PRId64 " %s",
                  pReader, pBlockScanInfo->uid, startKey.ts, pReader->order, d->minKey, d->maxKey, pReader->idStr);
      } else {
        tsdbError("%p uid:%" PRId64 ", failed to create iterator for mem, code:%s, %s", pReader, pBlockScanInfo->uid,
                  tstrerror(code), pReader->idStr);
        return code;
      }
    }
  } else {
    tsdbDebug("%p uid:%" PRId64 ", no data in mem, %s", pReader, pBlockScanInfo->uid, pReader->idStr);
  }

  STbData* di = NULL;
  if (pReader->pReadSnap->pIMem != NULL) {
    di = tsdbGetTbDataFromMemTable(pReader->pReadSnap->pIMem, pReader->suid, pBlockScanInfo->uid);
    if (di != NULL) {
      code = tsdbTbDataIterCreate(di, &startKey, backward, &pBlockScanInfo->iiter.iter);
      if (code == TSDB_CODE_SUCCESS) {
        pBlockScanInfo->iiter.hasVal = (tsdbTbDataIterGet(pBlockScanInfo->iiter.iter) != NULL);

        tsdbDebug("%p uid:%" PRId64 ", check data in imem from skey:%" PRId64 ", order:%d, ts range in buf:%" PRId64
                  "-%" PRId64 " %s",
                  pReader, pBlockScanInfo->uid, startKey.ts, pReader->order, di->minKey, di->maxKey, pReader->idStr);
      } else {
        tsdbError("%p uid:%" PRId64 ", failed to create iterator for imem, code:%s, %s", pReader, pBlockScanInfo->uid,
                  tstrerror(code), pReader->idStr);
        return code;
      }
    }
  } else {
    tsdbDebug("%p uid:%" PRId64 ", no data in imem, %s", pReader, pBlockScanInfo->uid, pReader->idStr);
  }

  // A failure to load the delete info must not be swallowed: scanning without it could return rows
  // that have been deleted.
  code = initDelSkylineIterator(pBlockScanInfo, pReader, d, di);
  if (code != TSDB_CODE_SUCCESS) {
    tsdbError("%p uid:%" PRId64 ", failed to init delete skyline, code:%s, %s", pReader, pBlockScanInfo->uid,
              tstrerror(code), pReader->idStr);
  }

  // The iterators exist at this point, so mark them initialized even when the skyline failed; a
  // retry must not create a second pair of iterators on top of these ones.
  pBlockScanInfo->iterInit = true;
  return code;
}

/*
 * Tell whether the file-block row at pDumpInfo->rowIndex is visible to this reader for the table in
 * pBlockScanInfo. The row is rejected when:
 *   - the block holds several tables (aUid != NULL) and the row belongs to another uid;
 *   - its version lies outside [verRange.minVer, verRange.maxVer];
 *   - its timestamp lies outside [window.skey, window.ekey];
 *   - the table's delete skyline covers it (checked last, through pBlockScanInfo->fileDelIndex).
 */
static bool isValidFileBlockRow(SBlockData* pBlockData, SFileBlockDumpInfo* pDumpInfo,
                                STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader) {
  bool multiTableBlock = (pBlockData->aUid != NULL);
  if (multiTableBlock && pBlockData->aUid[pDumpInfo->rowIndex] != pBlockScanInfo->uid) {
    return false;  // the row belongs to another table; the caller moves on to the next row
  }

  int64_t rowVer = pBlockData->aVersion[pDumpInfo->rowIndex];
  bool    verInRange = (rowVer >= pReader->verRange.minVer) && (rowVer <= pReader->verRange.maxVer);
  if (!verInRange) {
    return false;
  }

  int64_t rowTs = pBlockData->aTSKEY[pDumpInfo->rowIndex];
  bool    tsInRange = (rowTs >= pReader->window.skey) && (rowTs <= pReader->window.ekey);
  if (!tsInRange) {
    return false;
  }

  TSDBKEY rowKey = {.ts = rowTs, .version = rowVer};
  return !hasBeenDropped(pBlockScanInfo->delSkyline, &pBlockScanInfo->fileDelIndex, &rowKey, pReader->order);
}

// True when ts lies outside the closed interval [pWindow->skey, pWindow->ekey].
static bool outOfTimeWindow(int64_t ts, STimeWindow* pWindow) {
  bool inside = (ts >= pWindow->skey) && (ts <= pWindow->ekey);
  return !inside;
}

/*
 * Advance the last-block merge tree until it points at a row that the table's delete skyline does not
 * cover (the skyline cursor used is pBlockScanInfo->lastBlockDelIndex).
 * Returns true when such a row is found, false once tMergeTreeNext() reports no more rows.
 */
static bool nextRowFromLastBlocks(SLastBlockReader* pLastBlockReader, STableBlockScanInfo* pBlockScanInfo) {
  while (tMergeTreeNext(&pLastBlockReader->mergeTree)) {
    TSDBROW candidate = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
    TSDBKEY candKey = TSDBROW_KEY(&candidate);

    bool dropped = hasBeenDropped(pBlockScanInfo->delSkyline, &pBlockScanInfo->lastBlockDelIndex, &candKey,
                                  pLastBlockReader->order);
    if (!dropped) {
      return true;
    }
  }

  return false;
}

/*
 * Bind pLBlockReader to the table in pScanInfo and move it to the first visible last-block row.
 *
 * If the reader is already bound to this table, true is returned right away without touching it.
 * Otherwise any previously opened merge tree is closed, the table's mem/imem iterators are initialized,
 * and a merge tree is opened over the reader's window narrowed to start just past pScanInfo->lastKey in
 * scan direction.
 *
 * Returns false when the merge tree cannot be opened or holds no row outside the delete skyline.
 */
static bool initLastBlockReader(SLastBlockReader* pLBlockReader, STableBlockScanInfo* pScanInfo, STsdbReader* pReader) {
  bool alreadyBound = (pLBlockReader->uid == pScanInfo->uid);
  if (alreadyBound) {
    return true;
  }

  // uid 0 means no merge tree has been opened yet
  if (pLBlockReader->uid != 0) {
    tMergeTreeClose(&pLBlockReader->mergeTree);
  }

  // NOTE(review): the return code is ignored because this function only reports a bool.
  initMemDataIterator(pScanInfo, pReader);
  pLBlockReader->uid = pScanInfo->uid;

  STimeWindow scanWin = pLBlockReader->window;
  if (ASCENDING_TRAVERSE(pLBlockReader->order)) {
    scanWin.skey = pScanInfo->lastKey + 1;  // resume right after the last key already returned
  } else {
    scanWin.ekey = pScanInfo->lastKey - 1;
  }

  int8_t  backward = (pLBlockReader->order == TSDB_ORDER_DESC);
  int32_t code = tMergeTreeOpen(&pLBlockReader->mergeTree, backward, pReader->pFileReader, pReader->suid,
                                pScanInfo->uid, &scanWin, &pLBlockReader->verRange, pLBlockReader->pInfo);
  if (code != TSDB_CODE_SUCCESS) {
    return false;
  }

  return nextRowFromLastBlocks(pLBlockReader, pScanInfo);
}

/*
 * Timestamp of the row the last-block merge tree currently points at.
 * NOTE(review): assumes the tree has a current row; callers in this file check hasDataInLastBlock()
 * before calling.
 */
static int64_t getCurrentKeyInLastBlock(SLastBlockReader* pLastBlockReader) {
  TSDBROW current = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
  TSDBKEY currentKey = TSDBROW_KEY(&current);

  return currentKey.ts;
}

// True while the last-block merge tree has an active iterator (mergeTree.pIter is not NULL).
static bool hasDataInLastBlock(SLastBlockReader* pLastBlockReader) {
  return !(pLastBlockReader->mergeTree.pIter == NULL);
}

/*
 * Emit the file-block row at pDumpInfo->rowIndex, whose timestamp is `key`, into pReader->pResBlock.
 *
 * When tryCopyDistinctRowFromFileBlock() can copy the row directly, nothing else happens. Otherwise the
 * row is used to initialize an SRowMerger, doMergeRowsInFileBlocks() merges the following file-block
 * rows into it, and the merged row is appended.
 *
 * Returns TSDB_CODE_SUCCESS or the error code of tRowMergerGetRow().
 */
int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pBlockScanInfo, int64_t key,
                              STsdbReader* pReader) {
  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;

  // captured before the fast path is attempted, exactly as before
  TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);

  if (tryCopyDistinctRowFromFileBlock(pReader, pBlockData, key, pDumpInfo)) {
    return TSDB_CODE_SUCCESS;
  }

  STSRow*    pTSRow = NULL;
  SRowMerger merge = {0};

  tRowMergerInit(&merge, &fRow, pReader->pSchema);
  doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);

  int32_t code = tRowMergerGetRow(&merge, &pTSRow);
  if (code != TSDB_CODE_SUCCESS) {
    tRowMergerClear(&merge);  // do not leak the merger's buffers on failure
    return code;
  }

  doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo->uid);

  taosMemoryFree(pTSRow);
  tRowMergerClear(&merge);
  return TSDB_CODE_SUCCESS;
}

/*
 * Produce the next output row of one table, dispatching on which in-memory sources still hold rows:
 *   mem and imem -> doMergeMultiLevelRows()
 *   imem only    -> doMergeBufAndFileRows() over the imem iterator
 *   mem only     -> doMergeBufAndFileRows() over the mem iterator
 *   neither      -> mergeFileBlockAndLastBlock()
 * The file-block key passed on is INT64_MIN when no file-block rows are loaded (nRow == 0).
 */
static int32_t buildComposedDataBlockImpl(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo,
                                          SBlockData* pBlockData, SLastBlockReader* pLastBlockReader) {
  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;

  int64_t fileKey = INT64_MIN;
  if (pBlockData->nRow > 0) {
    fileKey = pBlockData->aTSKEY[pDumpInfo->rowIndex];
  }

  // getValidMemRow() runs on both iterators before their hasVal flags are read; it presumably skips
  // deleted rows and may clear hasVal, so this order must be kept.
  TSDBROW* pMemRow = getValidMemRow(&pBlockScanInfo->iter, pBlockScanInfo->delSkyline, pReader);
  TSDBROW* pIMemRow = getValidMemRow(&pBlockScanInfo->iiter, pBlockScanInfo->delSkyline, pReader);

  bool memHasRow = pBlockScanInfo->iter.hasVal;
  bool imemHasRow = pBlockScanInfo->iiter.hasVal;

  if (memHasRow && imemHasRow) {
    return doMergeMultiLevelRows(pReader, pBlockScanInfo, pBlockData, pLastBlockReader);
  }

  if (imemHasRow) {  // imem + file + last block
    return doMergeBufAndFileRows(pReader, pBlockScanInfo, pIMemRow, &pBlockScanInfo->iiter, fileKey, pLastBlockReader);
  }

  if (memHasRow) {  // mem + file + last block
    return doMergeBufAndFileRows(pReader, pBlockScanInfo, pMemRow, &pBlockScanInfo->iter, fileKey, pLastBlockReader);
  }

  // file data blocks + last block only
  return mergeFileBlockAndLastBlock(pReader, pLastBlockReader, fileKey, pBlockScanInfo, pBlockData);
}

/*
 * Fill pReader->pResBlock for one table by repeatedly merging the loaded file data block rows with the
 * last (stt) block rows and the mem/imem rows.
 *
 * The loop stops when:
 *   - neither the data block nor the last block has a row left; or
 *   - the loaded data block has been fully consumed; or
 *   - the result block has reached pReader->capacity rows.
 *
 * The table is the owner of the current file block, or pReader->status.pTableIter when there is no
 * current block.
 *
 * Returns TSDB_CODE_SUCCESS, or the first error reported while producing a row.
 */
static int32_t buildComposedDataBlock(STsdbReader* pReader) {
  SSDataBlock* pResBlock = pReader->pResBlock;

  SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(&pReader->status.blockIter);

  STableBlockScanInfo* pBlockScanInfo = NULL;
  if (pBlockInfo != NULL) {
    pBlockScanInfo = taosHashGet(pReader->status.pTableMap, &pBlockInfo->uid, sizeof(pBlockInfo->uid));
  } else {
    pBlockScanInfo = pReader->status.pTableIter;
  }

  SLastBlockReader*   pLastBlockReader = pReader->status.fileIter.pLastBlockReader;
  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
  SBlockData*         pBlockData = &pReader->status.fileBlockData;
  int32_t             step = ASCENDING_TRAVERSE(pReader->order) ? 1 : -1;

  int64_t st = taosGetTimestampUs();

  while (1) {
    // skip forward to the first row of the loaded data block that is visible to this reader
    bool hasBlockData = false;
    while (pBlockData->nRow > 0) {
      if (isValidFileBlockRow(pBlockData, pDumpInfo, pBlockScanInfo, pReader)) {
        hasBlockData = true;
        break;
      }

      pDumpInfo->rowIndex += step;

      SDataBlk* pBlock = getCurrentBlock(&pReader->status.blockIter);
      if (pDumpInfo->rowIndex >= pBlock->nRow || pDumpInfo->rowIndex < 0) {
        setBlockAllDumped(pDumpInfo, pBlock->maxKey.ts, pReader->order);
        break;
      }
    }

    bool hasBlockLData = hasDataInLastBlock(pLastBlockReader);

    // no data in last block and block, no need to proceed.
    if (!hasBlockData && !hasBlockLData) {
      break;
    }

    // NOTE(review): if the skip loop above exhausted the data block while the last block still has rows,
    // nRow is still > 0 and buildComposedDataBlockImpl() reads aTSKEY[rowIndex] with rowIndex out of
    // range -- confirm that a stale file key cannot be merged here.
    int32_t code = buildComposedDataBlockImpl(pReader, pBlockScanInfo, pBlockData, pLastBlockReader);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    // currently loaded file data block is consumed
    if ((pBlockData->nRow > 0) && (pDumpInfo->rowIndex >= pBlockData->nRow || pDumpInfo->rowIndex < 0)) {
      SDataBlk* pBlock = getCurrentBlock(&pReader->status.blockIter);
      setBlockAllDumped(pDumpInfo, pBlock->maxKey.ts, pReader->order);
      break;
    }

    if (pResBlock->info.rows >= pReader->capacity) {
      break;
    }
  }

  pResBlock->info.uid = pBlockScanInfo->uid;
  blockDataUpdateTsWindow(pResBlock, 0);

  setComposedBlockFlag(pReader, true);
  int64_t et = taosGetTimestampUs();

  if (pResBlock->info.rows > 0) {
    // window keys are signed timestamps, so they are printed with PRId64
    tsdbDebug("%p uid:%" PRIu64 ", composed data block created, brange:%" PRId64 "-%" PRId64
              " rows:%d, elapsed time:%.2f ms %s",
              pReader, pBlockScanInfo->uid, pResBlock->info.window.skey, pResBlock->info.window.ekey,
              pResBlock->info.rows, (et - st) / 1000.0, pReader->idStr);
  }

  return TSDB_CODE_SUCCESS;
}

// Store `composed` in pReader->status.composedDataBlock; buildComposedDataBlock() sets it to true once it
// has assembled a result block from merged rows.
void setComposedBlockFlag(STsdbReader* pReader, bool composed) {
  SReaderStatus* pStatus = &pReader->status;
  pStatus->composedDataBlock = composed;
}

/*
 * Build the delete skyline of one table (pBlockScanInfo->delSkyline) from every delete record that
 * applies to it:
 *   - the entries of the read snapshot's del file for (pReader->suid, uid);
 *   - the delete lists of the table's mem data (pMemTbData);
 *   - the delete lists of the table's imem data (piMemTbData).
 * Either of pMemTbData / piMemTbData may be NULL.
 *
 * The skyline is built at most once; if it already exists the function returns immediately. Afterwards
 * the mem/imem iterator cursors, fileDelIndex and lastBlockDelIndex are set to the first skyline entry in
 * scan order: 0 for ASC, the last entry for DESC (-1 when the skyline is empty).
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_OUT_OF_MEMORY, or the error of a del-file / skyline routine.
 */
int32_t initDelSkylineIterator(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader, STbData* pMemTbData,
                               STbData* piMemTbData) {
  if (pBlockScanInfo->delSkyline != NULL) {
    return TSDB_CODE_SUCCESS;
  }

  int32_t code = 0;
  STsdb*  pTsdb = pReader->pTsdb;

  SArray* pDelData = taosArrayInit(4, sizeof(SDelData));
  if (pDelData == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  SDelFile* pDelFile = pReader->pReadSnap->fs.pDelFile;
  if (pDelFile) {
    SDelFReader* pDelFReader = NULL;
    code = tsdbDelFReaderOpen(&pDelFReader, pDelFile, pTsdb);
    if (code != TSDB_CODE_SUCCESS) {
      goto _err;
    }

    SArray* aDelIdx = taosArrayInit(4, sizeof(SDelIdx));
    if (aDelIdx == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;  // report the allocation failure, code is still 0 at this point
      tsdbDelFReaderClose(&pDelFReader);
      goto _err;
    }

    code = tsdbReadDelIdx(pDelFReader, aDelIdx);
    if (code != TSDB_CODE_SUCCESS) {
      taosArrayDestroy(aDelIdx);
      tsdbDelFReaderClose(&pDelFReader);
      goto _err;
    }

    SDelIdx  idx = {.suid = pReader->suid, .uid = pBlockScanInfo->uid};
    SDelIdx* pIdx = taosArraySearch(aDelIdx, &idx, tCmprDelIdx, TD_EQ);

    if (pIdx != NULL) {
      code = tsdbReadDelData(pDelFReader, pIdx, pDelData);
    }

    taosArrayDestroy(aDelIdx);
    tsdbDelFReaderClose(&pDelFReader);

    if (code != TSDB_CODE_SUCCESS) {
      goto _err;
    }
  }

  SDelData* p = NULL;
  if (pMemTbData != NULL) {
    p = pMemTbData->pHead;
    while (p) {
      taosArrayPush(pDelData, p);
      p = p->pNext;
    }
  }

  if (piMemTbData != NULL) {
    p = piMemTbData->pHead;
    while (p) {
      taosArrayPush(pDelData, p);
      p = p->pNext;
    }
  }

  if (taosArrayGetSize(pDelData) > 0) {
    pBlockScanInfo->delSkyline = taosArrayInit(4, sizeof(TSDBKEY));
    if (pBlockScanInfo->delSkyline == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto _err;
    }
    code = tsdbBuildDeleteSkyline(pDelData, 0, (int32_t)(taosArrayGetSize(pDelData) - 1), pBlockScanInfo->delSkyline);
  }

  taosArrayDestroy(pDelData);

  // Convert the size to a signed value before subtracting: with an empty (NULL) skyline the size is 0,
  // and the DESC start index must be -1 rather than a wrapped-around size_t.
  int32_t numOfSkyline = (int32_t)taosArrayGetSize(pBlockScanInfo->delSkyline);
  pBlockScanInfo->iter.index = ASCENDING_TRAVERSE(pReader->order) ? 0 : numOfSkyline - 1;
  pBlockScanInfo->iiter.index = pBlockScanInfo->iter.index;
  pBlockScanInfo->fileDelIndex = pBlockScanInfo->iter.index;
  pBlockScanInfo->lastBlockDelIndex = pBlockScanInfo->iter.index;
  return code;

_err:
  taosArrayDestroy(pDelData);
  return code;
}

/*
 * Return the smaller key of the table's current valid mem row and current valid imem row.
 *
 * A source without a valid row is ignored. When neither source has one, a key whose ts is
 * TSKEY_INITIAL_VAL is returned. On equal timestamps the mem key wins.
 */
static TSDBKEY getCurrentKeyInBuf(STableBlockScanInfo* pScanInfo, STsdbReader* pReader) {
  TSDBROW* pRow = getValidMemRow(&pScanInfo->iter, pScanInfo->delSkyline, pReader);
  TSDBROW* piRow = getValidMemRow(&pScanInfo->iiter, pScanInfo->delSkyline, pReader);

  if (pRow == NULL && piRow == NULL) {
    return (TSDBKEY){.ts = TSKEY_INITIAL_VAL};
  }

  // Handle the single-source cases explicitly instead of comparing against the TSKEY_INITIAL_VAL
  // sentinel: a sentinel that never compares greater than a real key would hide an imem-only row.
  if (piRow == NULL) {
    return TSDBROW_KEY(pRow);
  }
  if (pRow == NULL) {
    return TSDBROW_KEY(piRow);
  }

  TSDBKEY k = TSDBROW_KEY(pRow);
  TSDBKEY ik = TSDBROW_KEY(piRow);
  return (ik.ts < k.ts) ? ik : k;
}

/*
 * Advance the file-set iterator to the next file set that yields data for this reader.
 *
 * For each file set, the block index is loaded into a scratch list. doLoadFileBlock() is called when that
 * list is non-empty or the file set has stt files (nSttF > 0). Iteration stops at the first file set
 * for which pBlockNum reports any data blocks or last files, or when the file sets run out.
 *
 * pBlockNum is reset on entry.
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_OUT_OF_MEMORY, or the error of doLoadBlockIndex() /
 * doLoadFileBlock().
 */
static int32_t moveToNextFile(STsdbReader* pReader, SBlockNumber* pBlockNum) {
  SReaderStatus* pStatus = &pReader->status;
  pBlockNum->numOfBlocks = 0;
  pBlockNum->numOfLastFiles = 0;

  size_t  numOfTables = taosHashGetSize(pReader->status.pTableMap);
  SArray* pIndexList = taosArrayInit(numOfTables, sizeof(SBlockIdx));
  if (pIndexList == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  int32_t code = TSDB_CODE_SUCCESS;
  while (filesetIteratorNext(&pStatus->fileIter, pReader)) {
    taosArrayClear(pIndexList);

    code = doLoadBlockIndex(pReader, pReader->pFileReader, pIndexList);
    if (code != TSDB_CODE_SUCCESS) {
      break;
    }

    if (taosArrayGetSize(pIndexList) > 0 || pReader->pFileReader->pSet->nSttF > 0) {
      code = doLoadFileBlock(pReader, pIndexList, pBlockNum);
      if (code != TSDB_CODE_SUCCESS) {
        break;
      }

      if (pBlockNum->numOfBlocks + pBlockNum->numOfLastFiles > 0) {
        break;
      }
    }

    // no blocks in current file, try next files
  }

  // single cleanup point for both the success and the error paths
  taosArrayDestroy(pIndexList);
  return code;
}

/*
 * qsort-style comparator for uint64_t table uids.
 * Returns -1, 0 or 1 when *p1 is less than, equal to or greater than *p2.
 */
static int32_t uidComparFunc(const void* p1, const void* p2) {
  const uint64_t lhs = *(const uint64_t*)p1;
  const uint64_t rhs = *(const uint64_t*)p2;

  return (int32_t)(lhs > rhs) - (int32_t)(lhs < rhs);
}

/*
 * Copy the uid of every table in pStatus->pTableMap into pOrderCheckInfo->tableUidList and sort the list
 * in ascending uid order. The list must have room for taosHashGetSize(pStatus->pTableMap) entries.
 */
static void extractOrderedTableUidList(SUidOrderCheckInfo* pOrderCheckInfo, SReaderStatus* pStatus) {
  int32_t numOfTables = taosHashGetSize(pStatus->pTableMap);
  int32_t slot = 0;

  for (void* pEntry = taosHashIterate(pStatus->pTableMap, NULL); pEntry != NULL;
       pEntry = taosHashIterate(pStatus->pTableMap, pEntry)) {
    STableBlockScanInfo* pInfo = pEntry;
    pOrderCheckInfo->tableUidList[slot] = pInfo->uid;
    slot += 1;
  }

  taosSort(pOrderCheckInfo->tableUidList, numOfTables, sizeof(uint64_t), uidComparFunc);
}

/*
 * Prepare the uid-ordered table traversal used when scanning last blocks table by table.
 *
 * - On the first call, allocates and fills the sorted uid list and points pStatus->pTableIter at the
 *   first table.
 * - On later calls, does nothing while pStatus->pTableIter is still set. Once it is NULL (a new file's
 *   last blocks are about to be scanned), rewinds to the first uid. If that uid is no longer in the
 *   table map, the map has been updated, so the list is reallocated and rebuilt first.
 *
 * Returns TSDB_CODE_SUCCESS (also when the table map is empty) or TSDB_CODE_OUT_OF_MEMORY.
 */
static int32_t initOrderCheckInfo(SUidOrderCheckInfo* pOrderCheckInfo, SReaderStatus* pStatus) {
  int32_t total = taosHashGetSize(pStatus->pTableMap);
  if (total == 0) {
    return TSDB_CODE_SUCCESS;
  }

  if (pOrderCheckInfo->tableUidList == NULL) {
    pOrderCheckInfo->currentIndex = 0;
    pOrderCheckInfo->tableUidList = taosMemoryMalloc(total * sizeof(uint64_t));
    if (pOrderCheckInfo->tableUidList == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }

    extractOrderedTableUidList(pOrderCheckInfo, pStatus);

    uint64_t firstUid = pOrderCheckInfo->tableUidList[0];
    pStatus->pTableIter = taosHashGet(pStatus->pTableMap, &firstUid, sizeof(firstUid));
    return TSDB_CODE_SUCCESS;
  }

  if (pStatus->pTableIter != NULL) {
    return TSDB_CODE_SUCCESS;  // traversal still in progress
  }

  // it is the last block of a new file: start over from the first table
  pOrderCheckInfo->currentIndex = 0;
  uint64_t firstUid = pOrderCheckInfo->tableUidList[pOrderCheckInfo->currentIndex];
  pStatus->pTableIter = taosHashGet(pStatus->pTableMap, &firstUid, sizeof(firstUid));
  if (pStatus->pTableIter != NULL) {
    return TSDB_CODE_SUCCESS;
  }

  // the tableMap has already updated: rebuild the ordered uid list
  void* pResized = taosMemoryRealloc(pOrderCheckInfo->tableUidList, total * sizeof(uint64_t));
  if (pResized == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  pOrderCheckInfo->tableUidList = pResized;
  extractOrderedTableUidList(pOrderCheckInfo, pStatus);

  firstUid = pOrderCheckInfo->tableUidList[0];
  pStatus->pTableIter = taosHashGet(pStatus->pTableMap, &firstUid, sizeof(firstUid));
  return TSDB_CODE_SUCCESS;
}

// Advances the uid-ordered cursor to the next table and points pStatus->pTableIter at it.
// Returns false (and clears pTableIter) once every table in pStatus->pTableMap has been visited.
static bool moveToNextTable(SUidOrderCheckInfo* pOrderedCheckInfo, SReaderStatus* pStatus) {
  pOrderedCheckInfo->currentIndex++;

  bool exhausted = (pOrderedCheckInfo->currentIndex >= taosHashGetSize(pStatus->pTableMap));
  if (exhausted) {
    pStatus->pTableIter = NULL;
    return false;
  }

  uint64_t nextUid = pOrderedCheckInfo->tableUidList[pOrderedCheckInfo->currentIndex];
  pStatus->pTableIter = taosHashGet(pStatus->pTableMap, &nextUid, sizeof(nextUid));
  ASSERT(pStatus->pTableIter != NULL);
  return true;
}

2266
// Walks the tables in uid order and builds the next result block from their last-block data.
// Returns as soon as pReader->pResBlock holds rows, when all tables are exhausted, or on error.
static int32_t doLoadLastBlockSequentially(STsdbReader* pReader) {
  SReaderStatus*      pStatus = &pReader->status;
  SLastBlockReader*   pLastBlockReader = pStatus->fileIter.pLastBlockReader;
  SUidOrderCheckInfo* pOrderedCheckInfo = &pStatus->uidCheckInfo;

  int32_t code = initOrderCheckInfo(pOrderedCheckInfo, pStatus);
  if ((code != TSDB_CODE_SUCCESS) || (taosHashGetSize(pStatus->pTableMap) == 0)) {
    return code;
  }

  do {
    STableBlockScanInfo* pScanInfo = pStatus->pTableIter;

    // only tables that actually have last-block rows are turned into a data block
    if (initLastBlockReader(pLastBlockReader, pScanInfo, pReader)) {
      code = doBuildDataBlock(pReader);
      if (code != TSDB_CODE_SUCCESS) {
        return code;
      }

      if (pReader->pResBlock->info.rows > 0) {
        return TSDB_CODE_SUCCESS;
      }
    }
    // current table has nothing (more) to offer: continue with the next one
  } while (moveToNextTable(pOrderedCheckInfo, pStatus));

  return TSDB_CODE_SUCCESS;
}

2305
// Builds the next result block for the current table from its current file block, its last blocks
// and/or its in-memory rows, choosing one of the strategies below.
static int32_t doBuildDataBlock(STsdbReader* pReader) {
  SReaderStatus*      pStatus = &pReader->status;
  SDataBlockIter*     pBlockIter = &pStatus->blockIter;
  SLastBlockReader*   pLastBlockReader = pStatus->fileIter.pLastBlockReader;
  SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(pBlockIter);

  // resolve the table being scanned and, if there is one, its current file block
  STableBlockScanInfo* pScanInfo = NULL;
  SDataBlk*            pBlock = NULL;
  if (pBlockInfo != NULL) {
    pScanInfo = taosHashGet(pStatus->pTableMap, &pBlockInfo->uid, sizeof(pBlockInfo->uid));
    pBlock = getCurrentBlock(pBlockIter);
  } else {
    pScanInfo = pStatus->pTableIter;
  }

  initLastBlockReader(pLastBlockReader, pScanInfo, pReader);
  TSDBKEY key = getCurrentKeyInBuf(pScanInfo, pReader);

  // no file block at all: build the block from the last data file
  if (pBlockInfo == NULL) {
    ASSERT(pBlockIter->numOfBlocks == 0);
    return buildComposedDataBlock(pReader);
  }

  // the file block has to be loaded and merged row by row
  if (fileBlockShouldLoad(pReader, pBlockInfo, pBlock, pScanInfo, key, pLastBlockReader)) {
    tBlockDataReset(&pStatus->fileBlockData);
    int32_t code = tBlockDataInit(&pStatus->fileBlockData, pReader->suid, pScanInfo->uid, pReader->pSchema);
    if (code == TSDB_CODE_SUCCESS) {
      code = doLoadFileBlockData(pReader, pBlockIter, &pStatus->fileBlockData);
    }
    return (code == TSDB_CODE_SUCCESS) ? buildComposedDataBlock(pReader) : code;
  }

  // in-memory rows lie in the gap before the current file block (in scan order)
  if (bufferDataInFileBlockGap(pReader->order, key, pBlock)) {
    // todo rows in buffer should be less than the file block in asc, greater than file block in desc
    int64_t endKey = ASCENDING_TRAVERSE(pReader->order) ? pBlock->minKey.ts : pBlock->maxKey.ts;
    return buildDataBlockFromBuf(pReader, pScanInfo, endKey);
  }

  // descending scan with last-block rows pending: only return the rows in last block
  if (hasDataInLastBlock(pLastBlockReader) && !ASCENDING_TRAVERSE(pReader->order)) {
    int64_t tsLast = getCurrentKeyInLastBlock(pLastBlockReader);
    ASSERT(tsLast >= pBlock->maxKey.ts);
    tBlockDataReset(&pStatus->fileBlockData);
    return buildComposedDataBlock(pReader);
  }

  // the whole file block is required: describe it in the result block without loading it
  SDataBlockInfo* pInfo = &pReader->pResBlock->info;
  pInfo->rows = pBlock->nRow;
  pInfo->uid = pScanInfo->uid;
  pInfo->window = (STimeWindow){.skey = pBlock->minKey.ts, .ekey = pBlock->maxKey.ts};
  setComposedBlockFlag(pReader, false);
  setBlockAllDumped(&pStatus->fBlockDumpInfo, pBlock->maxKey.ts, pReader->order);
  return TSDB_CODE_SUCCESS;
}

H
Haojun Liao 已提交
2371
// Iterates over pStatus->pTableMap (resuming at pTableIter when set) and builds the next result block
// from in-memory data only. Returns once a non-empty block is built, all tables are visited, or on error.
static int32_t buildBlockFromBufferSequentially(STsdbReader* pReader) {
  SReaderStatus* pStatus = &pReader->status;

  if (pStatus->pTableIter == NULL) {
    pStatus->pTableIter = taosHashIterate(pStatus->pTableMap, NULL);
  }

  while (pStatus->pTableIter != NULL) {
    STableBlockScanInfo* pBlockScanInfo = pStatus->pTableIter;
    initMemDataIterator(pBlockScanInfo, pReader);

    // no upper bound from file data: read the buffer to its end in scan direction
    int64_t endKey = INT64_MIN;
    if (ASCENDING_TRAVERSE(pReader->order)) {
      endKey = INT64_MAX;
    }

    int32_t code = buildDataBlockFromBuf(pReader, pBlockScanInfo, endKey);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    if (pReader->pResBlock->info.rows > 0) {
      return TSDB_CODE_SUCCESS;
    }

    // current table is exhausted, let's try the next table
    pStatus->pTableIter = taosHashIterate(pStatus->pTableMap, pStatus->pTableIter);
  }

  return TSDB_CODE_SUCCESS;
}

2403
// set the correct start position in case of the first/last file block, according to the query time window
2404
static void initBlockDumpInfo(STsdbReader* pReader, SDataBlockIter* pBlockIter) {
  SDataBlk*           pBlock = getCurrentBlock(pBlockIter);
  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;

  pDumpInfo->totalRows = pBlock->nRow;
  pDumpInfo->allDumped = false;

  // ascending scans start at the first row, descending scans at the last one
  if (ASCENDING_TRAVERSE(pReader->order)) {
    pDumpInfo->rowIndex = 0;
  } else {
    pDumpInfo->rowIndex = pBlock->nRow - 1;
  }
}

2416
// Moves to the next data file set and prepares the block iterator and dump info for its first block.
//
// Clears status.loadFromFile when no file set with blocks or last files remains. Errors from
// moveToNextFile() and initBlockIterator() are returned as-is; in that case the dump info is left
// untouched rather than initialized from a partially built block iterator.
static int32_t initForFirstBlockInFile(STsdbReader* pReader, SDataBlockIter* pBlockIter) {
  SBlockNumber num = {0};

  int32_t code = moveToNextFile(pReader, &num);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  // all data files are consumed, try data in buffer
  if (num.numOfBlocks + num.numOfLastFiles == 0) {
    pReader->status.loadFromFile = false;
    return code;
  }

  // initialize the block iterator for a new fileset
  if (num.numOfBlocks > 0) {
    code = initBlockIterator(pReader, pBlockIter, num.numOfBlocks);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }
  } else {  // no block data, only last block exists
    tBlockDataReset(&pReader->status.fileBlockData);
    resetDataBlockIterator(pBlockIter, pReader->order);
  }

  // set the correct start position according to the query time window
  initBlockDumpInfo(pReader, pBlockIter);
  return TSDB_CODE_SUCCESS;
}

2443
// True when the current file block has been started but not finished in the given scan direction.
static bool fileBlockPartiallyRead(SFileBlockDumpInfo* pDumpInfo, bool asc) {
  if (pDumpInfo->allDumped) {
    return false;
  }

  return asc ? (pDumpInfo->rowIndex > 0) : (pDumpInfo->rowIndex < (pDumpInfo->totalRows - 1));
}

2448
// Produces the next non-empty result block from data files, moving through file sets as needed.
//
// Two phases are visible below:
//  - when the block iterator holds no file blocks, rows come from the last blocks, table by table,
//    via doLoadLastBlockSequentially() (re-entered through the _begin label);
//  - otherwise file blocks are visited in the iterator's order: a partially dumped block is resumed
//    with buildComposedDataBlock(), a fully dumped one advances the iterator first.
// Returns TSDB_CODE_SUCCESS with an empty pReader->pResBlock once initForFirstBlockInFile() clears
// status.loadFromFile (no data files left). Error codes from the helpers are returned as-is.
static int32_t buildBlockFromFiles(STsdbReader* pReader) {
H
Haojun Liao 已提交
2449
  int32_t code = TSDB_CODE_SUCCESS;
2450 2451
  bool    asc = ASCENDING_TRAVERSE(pReader->order);

2452 2453
  SDataBlockIter* pBlockIter = &pReader->status.blockIter;

2454
  // no file blocks in the current file set: scan its last blocks first
  if (pBlockIter->numOfBlocks == 0) {
H
Hongze Cheng 已提交
2455
  // jump target for every path that needs the last blocks of the current file set scanned
  _begin:
2456 2457 2458 2459 2460
    code = doLoadLastBlockSequentially(pReader);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

2461 2462 2463 2464
    if (pReader->pResBlock->info.rows > 0) {
      return TSDB_CODE_SUCCESS;
    }

2465
    // all tables were checked in this last block file (moveToNextTable() cleared pTableIter),
    // now let's try the next file
2466 2467 2468 2469 2470 2471 2472 2473
    if (pReader->status.pTableIter == NULL) {
      code = initForFirstBlockInFile(pReader, pBlockIter);

      // error happens or all the data files are completely checked
      if ((code != TSDB_CODE_SUCCESS) || (pReader->status.loadFromFile == false)) {
        return code;
      }

2474
      // the new file set has no data blocks, only last blocks: scan them from _begin
2475 2476 2477 2478 2479 2480 2481 2482 2483 2484 2485 2486 2487 2488 2489
      if (pBlockIter->numOfBlocks == 0) {
        goto _begin;
      }
    }

    // either a table is still pending, or the new file set has data blocks: build from them
    code = doBuildDataBlock(pReader);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    if (pReader->pResBlock->info.rows > 0) {
      return TSDB_CODE_SUCCESS;
    }
  }

2490
  // file-block phase: loop until a non-empty result block is built or an exit path is taken
  while (1) {
2491 2492
    SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;

2493
    if (fileBlockPartiallyRead(pDumpInfo, asc)) {  // file data block is partially loaded
2494
      code = buildComposedDataBlock(pReader);
2495 2496 2497 2498 2499 2500 2501
    } else {
      // current block is exhausted, try the next file block
      if (pDumpInfo->allDumped) {
        // try next data block in current file
        bool hasNext = blockIteratorNext(&pReader->status.blockIter);
        if (hasNext) {  // check for the next block in the block accessed order list
          initBlockDumpInfo(pReader, pBlockIter);
2502
        } else {
H
Haojun Liao 已提交
2503
          if (pReader->status.pCurrentFileset->nSttF > 0) {
2504 2505 2506 2507 2508 2509
            // data blocks in current file are exhausted, but the file set has stt (last) files:
            // scan its last blocks from _begin before moving to the next file
            tBlockDataReset(&pReader->status.fileBlockData);
            resetDataBlockIterator(pBlockIter, pReader->order);
            goto _begin;
          } else {
            code = initForFirstBlockInFile(pReader, pBlockIter);
2510

2511 2512 2513 2514
            // error happens or all the data files are completely checked
            if ((code != TSDB_CODE_SUCCESS) || (pReader->status.loadFromFile == false)) {
              return code;
            }
2515

2516 2517 2518 2519
            // the new file set does not have blocks, let's start check the last block file
            if (pBlockIter->numOfBlocks == 0) {
              goto _begin;
            }
2520
          }
2521
        }
H
Haojun Liao 已提交
2522
      }
2523 2524

      code = doBuildDataBlock(pReader);
2525 2526
    }

2527 2528 2529 2530 2531 2532 2533 2534
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    if (pReader->pResBlock->info.rows > 0) {
      return TSDB_CODE_SUCCESS;
    }
  }
2535
}
H
refact  
Hongze Cheng 已提交
2536

2537 2538
// Picks the tsdb instance to query. For non-RSMA vnodes this is always VND_TSDB(pVnode).
// For RSMA vnodes it walks the retention levels in order and selects the first one whose keep window
// (now - keep) still covers winSKey; a level with keep <= 0 ends the walk and falls back one level.
// The selected level is written to *pLevel.
static STsdb* getTsdbByRetentions(SVnode* pVnode, TSKEY winSKey, SRetention* retentions, const char* idStr,
                                  int8_t* pLevel) {
  if (!VND_IS_RSMA(pVnode)) {
    return VND_TSDB(pVnode);
  }

  int8_t  level = 0;
  int64_t now = taosGetTimestamp(pVnode->config.tsdbCfg.precision);

  for (int8_t i = 0; i < TSDB_RETENTION_MAX; ++i) {
    SRetention* pRetention = &retentions[level];

    if (pRetention->keep <= 0) {  // unused level: step back to the previous one, if any
      if (level > 0) {
        --level;
      }
      break;
    }

    if ((now - pRetention->keep) <= winSKey) {  // this level still holds the query start
      break;
    }

    ++level;
  }

  const char* str = (idStr != NULL) ? idStr : "";
  STsdb*      pTsdb = NULL;

  if (level == TSDB_RETENTION_L0) {
    *pLevel = TSDB_RETENTION_L0;
    pTsdb = VND_RSMA0(pVnode);
  } else if (level == TSDB_RETENTION_L1) {
    *pLevel = TSDB_RETENTION_L1;
    pTsdb = VND_RSMA1(pVnode);
  } else {
    *pLevel = TSDB_RETENTION_L2;
    pTsdb = VND_RSMA2(pVnode);
  }

  tsdbDebug("vgId:%d, rsma level %d is selected to query %s", TD_VID(pVnode), *pLevel, str);
  return pTsdb;
}

H
Haojun Liao 已提交
2577
// Computes the [minVer, maxVer] range of data versions visible to a query.
// A start/end version of -1 means "not specified". The end version never exceeds the version the
// vnode has applied. `level` is not used here.
SVersionRange getQueryVerRange(SVnode* pVnode, SQueryTableDataCond* pCond, int8_t level) {
  SVersionRange range = {0};

  range.minVer = (pCond->startVersion == -1) ? 0 : pCond->startVersion;

  if ((pCond->endVersion == -1) || (pCond->endVersion > pVnode->state.applied)) {
    range.maxVer = pVnode->state.applied;
  } else {
    range.maxVer = pCond->endVersion;
  }

  return range;
}

2591
// Reports whether the row identified by pKey is covered by the delete skyline pDelList.
//
// pDelList holds TSDBKEY points. A key counts as dropped when it lies within [p[i].ts, p[i+1].ts] and
// p[i].version >= pKey->version (see the range checks below). *index is a cursor into pDelList that
// is kept by the caller between calls and advanced here: forward in ascending order, backward in
// descending order.
// NOTE(review): assumes pDelList is sorted by ts and consecutive entries form ranges -- confirm.
//
// An empty (or NULL) list drops nothing. Without the explicit empty check, `num - 1` would wrap
// around (size_t) and an element would be fetched from an empty array.
bool hasBeenDropped(const SArray* pDelList, int32_t* index, TSDBKEY* pKey, int32_t order) {
  ASSERT(pKey != NULL);
  if (pDelList == NULL) {
    return false;
  }

  size_t num = taosArrayGetSize(pDelList);
  if (num == 0) {
    return false;
  }

  bool    asc = ASCENDING_TRAVERSE(order);
  int32_t step = asc ? 1 : -1;

  if (asc) {
    if (*index >= num - 1) {
      TSDBKEY* last = taosArrayGetLast(pDelList);
      ASSERT(pKey->ts >= last->ts);

      if (pKey->ts > last->ts) {
        return false;
      } else if (pKey->ts == last->ts) {
        // a single point forms no range; also keeps `num - 2` from wrapping around
        if (num < 2) {
          return false;
        }

        TSDBKEY* prev = taosArrayGet(pDelList, num - 2);
        return (prev->version >= pKey->version);
      }
    } else {
      TSDBKEY* pCurrent = taosArrayGet(pDelList, *index);
      TSDBKEY* pNext = taosArrayGet(pDelList, (*index) + 1);

      if (pKey->ts < pCurrent->ts) {
        return false;
      }

      if (pCurrent->ts <= pKey->ts && pNext->ts >= pKey->ts && pCurrent->version >= pKey->version) {
        return true;
      }

      while (pNext->ts <= pKey->ts && (*index) < num - 1) {
        (*index) += 1;

        if ((*index) < num - 1) {
          pCurrent = taosArrayGet(pDelList, *index);
          pNext = taosArrayGet(pDelList, (*index) + 1);

          // it is not a consecutive deletion range, ignore it
          if (pCurrent->version == 0 && pNext->version > 0) {
            continue;
          }

          if (pCurrent->ts <= pKey->ts && pNext->ts >= pKey->ts && pCurrent->version >= pKey->version) {
            return true;
          }
        }
      }

      return false;
    }
  } else {
    if (*index <= 0) {
      TSDBKEY* pFirst = taosArrayGet(pDelList, 0);

      if (pKey->ts < pFirst->ts) {
        return false;
      } else if (pKey->ts == pFirst->ts) {
        return pFirst->version >= pKey->version;
      } else {
        ASSERT(0);
      }
    } else {
      TSDBKEY* pCurrent = taosArrayGet(pDelList, *index);
      TSDBKEY* pPrev = taosArrayGet(pDelList, (*index) - 1);

      if (pKey->ts > pCurrent->ts) {
        return false;
      }

      if (pPrev->ts <= pKey->ts && pCurrent->ts >= pKey->ts && pPrev->version >= pKey->version) {
        return true;
      }

      while (pPrev->ts >= pKey->ts && (*index) > 1) {
        (*index) += step;

        if ((*index) >= 1) {
          pCurrent = taosArrayGet(pDelList, *index);
          pPrev = taosArrayGet(pDelList, (*index) - 1);

          // it is not a consecutive deletion range, ignore it
          if (pCurrent->version > 0 && pPrev->version == 0) {
            continue;
          }

          if (pPrev->ts <= pKey->ts && pCurrent->ts >= pKey->ts && pPrev->version >= pKey->version) {
            return true;
          }
        }
      }

      return false;
    }
  }

  return false;
}

2691
// Returns the first row, starting at the iterator's current position, that lies in the reader's version
// range and is not covered by pDelList. Rows that fail these checks are skipped by advancing the iterator.
// Returns NULL (with pIter->hasVal == false) when the iterator is exhausted or reaches a row outside
// the query time window.
TSDBROW* getValidMemRow(SIterInfo* pIter, const SArray* pDelList, STsdbReader* pReader) {
  if (!pIter->hasVal) {
    return NULL;
  }

  TSDBROW* pRow = tsdbTbDataIterGet(pIter->iter);
  TSDBKEY  key = {.ts = pRow->pTSRow->ts, .version = pRow->version};

  while (true) {
    // a row outside the query window ends the scan of this iterator
    if (outOfTimeWindow(key.ts, &pReader->window)) {
      pIter->hasVal = false;
      return NULL;
    }

    // it is a valid data version (hasBeenDropped() is only consulted for in-range versions)
    bool inVerRange = (key.version <= pReader->verRange.maxVer) && (key.version >= pReader->verRange.minVer);
    if (inVerRange && !hasBeenDropped(pDelList, &pIter->index, &key, pReader->order)) {
      return pRow;
    }

    pIter->hasVal = tsdbTbDataIterNext(pIter->iter);
    if (!pIter->hasVal) {
      return NULL;
    }

    pRow = tsdbTbDataIterGet(pIter->iter);
    key = TSDBROW_KEY(pRow);
  }
}
H
Hongze Cheng 已提交
2729

2730 2731
// Advances pIter and adds every following valid in-memory row whose timestamp equals ts to pMerger.
// Stops at the first valid row with a different timestamp, or when no valid row is left.
int32_t doMergeRowsInBuf(SIterInfo* pIter, uint64_t uid, int64_t ts, SArray* pDelList, SRowMerger* pMerger,
                         STsdbReader* pReader) {
  while ((pIter->hasVal = tsdbTbDataIterNext(pIter->iter))) {
    // skips rows that are invalid for this query; NULL means nothing valid remains
    TSDBROW* pRow = getValidMemRow(pIter, pDelList, pReader);
    if (pRow == NULL) {
      break;
    }

    TSDBKEY rowKey = TSDBROW_KEY(pRow);
    if (rowKey.ts != ts) {  // ts is not identical, quit
      break;
    }

    STSchema* pRowSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, uid);
    tRowMergerAdd(pMerger, pRow, pRowSchema);
  }

  return TSDB_CODE_SUCCESS;
}

2757
static int32_t doMergeRowsInFileBlockImpl(SBlockData* pBlockData, int32_t rowIndex, int64_t key, SRowMerger* pMerger,
2758
                                          SVersionRange* pVerRange, int32_t step) {
2759 2760
  while (pBlockData->aTSKEY[rowIndex] == key && rowIndex < pBlockData->nRow && rowIndex >= 0) {
    if (pBlockData->aVersion[rowIndex] > pVerRange->maxVer || pBlockData->aVersion[rowIndex] < pVerRange->minVer) {
2761
      rowIndex += step;
2762 2763 2764 2765 2766 2767 2768 2769 2770 2771 2772 2773 2774 2775 2776 2777
      continue;
    }

    TSDBROW fRow = tsdbRowFromBlockData(pBlockData, rowIndex);
    tRowMerge(pMerger, &fRow);
    rowIndex += step;
  }

  return rowIndex;
}

// Outcome reported by checkForNeighborFileBlock().
typedef enum {
  CHECK_FILEBLOCK_CONT = 1,  // the neighbor block's rows were all consumed; check the next neighbor too
  CHECK_FILEBLOCK_QUIT = 2,  // stop looking at neighbor blocks
} CHECK_FILEBLOCK_STATE;

H
Hongze Cheng 已提交
2778
static int32_t checkForNeighborFileBlock(STsdbReader* pReader, STableBlockScanInfo* pScanInfo, SDataBlk* pBlock,
2779 2780
                                         SFileDataBlockInfo* pFBlock, SRowMerger* pMerger, int64_t key,
                                         CHECK_FILEBLOCK_STATE* state) {
2781
  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
2782
  SBlockData*         pBlockData = &pReader->status.fileBlockData;
2783

2784
  *state = CHECK_FILEBLOCK_QUIT;
2785
  int32_t step = ASCENDING_TRAVERSE(pReader->order) ? 1 : -1;
2786

H
Hongze Cheng 已提交
2787 2788
  int32_t   nextIndex = -1;
  SDataBlk* pNeighborBlock = getNeighborBlockOfSameTable(pFBlock, pScanInfo, &nextIndex, pReader->order);
2789
  if (pNeighborBlock == NULL) {  // do nothing
2790 2791 2792 2793
    return 0;
  }

  bool overlap = overlapWithNeighborBlock(pBlock, pNeighborBlock, pReader->order);
2794 2795
  taosMemoryFree(pNeighborBlock);

2796
  if (overlap) {  // load next block
2797
    SReaderStatus*  pStatus = &pReader->status;
2798 2799
    SDataBlockIter* pBlockIter = &pStatus->blockIter;

2800
    // 1. find the next neighbor block in the scan block list
2801
    SFileDataBlockInfo fb = {.uid = pFBlock->uid, .tbBlockIdx = nextIndex};
2802
    int32_t            neighborIndex = findFileBlockInfoIndex(pBlockIter, &fb);
2803

2804
    // 2. remove it from the scan block list
2805
    setFileBlockActiveInBlockIter(pBlockIter, neighborIndex, step);
2806

2807
    // 3. load the neighbor block, and set it to be the currently accessed file data block
H
Haojun Liao 已提交
2808
    tBlockDataReset(&pStatus->fileBlockData);
2809 2810 2811 2812 2813 2814
    int32_t code = tBlockDataInit(&pStatus->fileBlockData, pReader->suid, pFBlock->uid, pReader->pSchema);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    code = doLoadFileBlockData(pReader, pBlockIter, &pStatus->fileBlockData);
2815 2816 2817 2818
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

2819
    // 4. check the data values
2820 2821 2822 2823
    initBlockDumpInfo(pReader, pBlockIter);

    pDumpInfo->rowIndex =
        doMergeRowsInFileBlockImpl(pBlockData, pDumpInfo->rowIndex, key, pMerger, &pReader->verRange, step);
H
Haojun Liao 已提交
2824
    if (pDumpInfo->rowIndex >= pDumpInfo->totalRows) {
2825 2826 2827 2828 2829 2830 2831
      *state = CHECK_FILEBLOCK_CONT;
    }
  }

  return TSDB_CODE_SUCCESS;
}

2832 2833
// Merges into pMerger every file row that shares the timestamp of the row at pDumpInfo->rowIndex.
// It first scans the rest of the current block; if that block is exhausted it continues through
// overlapping neighbor blocks via checkForNeighborFileBlock().
// The row at the starting rowIndex itself is skipped (presumably already added to pMerger by the
// caller -- confirm). Errors from loading a neighbor block are now propagated instead of being dropped.
int32_t doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pScanInfo, STsdbReader* pReader,
                                SRowMerger* pMerger) {
  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;

  bool    asc = ASCENDING_TRAVERSE(pReader->order);
  int64_t key = pBlockData->aTSKEY[pDumpInfo->rowIndex];
  int32_t step = asc ? 1 : -1;

  pDumpInfo->rowIndex += step;
  if ((pDumpInfo->rowIndex <= pBlockData->nRow - 1 && asc) || (pDumpInfo->rowIndex >= 0 && !asc)) {
    pDumpInfo->rowIndex =
        doMergeRowsInFileBlockImpl(pBlockData, pDumpInfo->rowIndex, key, pMerger, &pReader->verRange, step);
  }

  // all rows are consumed, let's try next file block
  if ((pDumpInfo->rowIndex >= pBlockData->nRow && asc) || (pDumpInfo->rowIndex < 0 && !asc)) {
    while (1) {
      CHECK_FILEBLOCK_STATE st = CHECK_FILEBLOCK_QUIT;

      SFileDataBlockInfo* pFileBlockInfo = getCurrentBlockInfo(&pReader->status.blockIter);
      SDataBlk*           pCurrentBlock = getCurrentBlock(&pReader->status.blockIter);

      int32_t code =
          checkForNeighborFileBlock(pReader, pScanInfo, pCurrentBlock, pFileBlockInfo, pMerger, key, &st);
      if (code != TSDB_CODE_SUCCESS) {
        return code;
      }

      if (st == CHECK_FILEBLOCK_QUIT) {
        break;
      }
    }
  }

  return TSDB_CODE_SUCCESS;
}

H
Hongze Cheng 已提交
2863 2864
// Records ts as the table's lastKey, then merges into pMerger the following last-block rows whose
// timestamp equals ts. Stops at the first row with a different timestamp or when no row is left.
int32_t doMergeRowsInLastBlock(SLastBlockReader* pLastBlockReader, STableBlockScanInfo* pScanInfo, int64_t ts,
                               SRowMerger* pMerger) {
  pScanInfo->lastKey = ts;

  while (nextRowFromLastBlocks(pLastBlockReader, pScanInfo) && (getCurrentKeyInLastBlock(pLastBlockReader) == ts)) {
    TSDBROW lastRow = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
    tRowMerge(pMerger, &lastRow);
  }

  return TSDB_CODE_SUCCESS;
}

2879 2880
// Returns in *pTSRow the in-memory row pRow, merged with any following valid rows of pIter that share
// its timestamp.
//  - No merge needed: *pTSRow is pRow's own STSRow and *freeTSRow is false.
//  - Merged: *pTSRow is a new row from tRowMergerGetRow() and *freeTSRow is true; the caller owns it.
// Sets pReader->pSchema from the current row's schema when it is not set yet.
// The merger is released on both the success and the error path; it previously leaked when
// tRowMergerGetRow() failed.
int32_t doMergeMemTableMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter, SArray* pDelList, STSRow** pTSRow,
                                 STsdbReader* pReader, bool* freeTSRow) {
  TSDBROW* pNextRow = NULL;
  TSDBROW  current = *pRow;  // copied: pRow may refer to iterator state that advancing changes (assumed)

  // advance past the current row; the next valid row decides whether a merge is needed
  pIter->hasVal = tsdbTbDataIterNext(pIter->iter);
  if (pIter->hasVal) {
    pNextRow = getValidMemRow(pIter, pDelList, pReader);
  }

  // no next valid row, or it has a different timestamp: return the current row directly
  if (pNextRow == NULL || current.pTSRow->ts != pNextRow->pTSRow->ts) {
    *pTSRow = current.pTSRow;
    *freeTSRow = false;
    return TSDB_CODE_SUCCESS;
  }

  SRowMerger merge = {0};

  // get the correct schema for data in memory
  STSchema* pTSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(&current), pReader, uid);
  if (pReader->pSchema == NULL) {
    pReader->pSchema = pTSchema;
  }

  tRowMergerInit2(&merge, pReader->pSchema, &current, pTSchema);

  STSchema* pTSchema1 = doGetSchemaForTSRow(TSDBROW_SVERSION(pNextRow), pReader, uid);
  tRowMergerAdd(&merge, pNextRow, pTSchema1);

  doMergeRowsInBuf(pIter, uid, current.pTSRow->ts, pDelList, &merge, pReader);

  int32_t code = tRowMergerGetRow(&merge, pTSRow);
  tRowMergerClear(&merge);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  *freeTSRow = true;
  return TSDB_CODE_SUCCESS;
}

2932
int32_t doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader,
H
Hongze Cheng 已提交
2933
                           STSRow** pTSRow) {
H
Haojun Liao 已提交
2934 2935
  SRowMerger merge = {0};

2936 2937 2938
  TSDBKEY k = TSDBROW_KEY(pRow);
  TSDBKEY ik = TSDBROW_KEY(piRow);

2939
  if (ASCENDING_TRAVERSE(pReader->order)) {  // ascending order imem --> mem
H
Haojun Liao 已提交
2940
    STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid);
2941

H
Haojun Liao 已提交
2942
    tRowMergerInit(&merge, piRow, pSchema);
2943
    doMergeRowsInBuf(&pBlockScanInfo->iiter, pBlockScanInfo->uid, ik.ts, pBlockScanInfo->delSkyline, &merge, pReader);
2944

2945
    tRowMerge(&merge, pRow);
2946
    doMergeRowsInBuf(&pBlockScanInfo->iter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge, pReader);
2947
  } else {
H
Haojun Liao 已提交
2948
    STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid);
2949

H
Haojun Liao 已提交
2950
    tRowMergerInit(&merge, pRow, pSchema);
2951
    doMergeRowsInBuf(&pBlockScanInfo->iter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge, pReader);
2952 2953

    tRowMerge(&merge, piRow);
2954
    doMergeRowsInBuf(&pBlockScanInfo->iiter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge, pReader);
2955
  }
2956

2957 2958
  int32_t code = tRowMergerGetRow(&merge, pTSRow);
  return code;
2959 2960
}

2961 2962
// Produces the next in-memory row (mem and/or imem) for the table, stopping before endKey in scan
// direction. Same-timestamp rows are merged.
// *pTSRow is left untouched when no candidate row remains. *freeTSRow tells the caller whether
// *pTSRow was newly built and must be freed.
int32_t tsdbGetNextRowInMem(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader, STSRow** pTSRow, int64_t endKey,
                            bool* freeTSRow) {
  SArray*  pDelList = pBlockScanInfo->delSkyline;
  uint64_t uid = pBlockScanInfo->uid;
  bool     asc = ASCENDING_TRAVERSE(pReader->order);

  TSDBROW* pRow = getValidMemRow(&pBlockScanInfo->iter, pDelList, pReader);
  TSDBROW* piRow = getValidMemRow(&pBlockScanInfo->iiter, pDelList, pReader);

  // discard candidates that are at or beyond endKey in scan direction
  if (pBlockScanInfo->iter.hasVal) {
    TSDBKEY k = TSDBROW_KEY(pRow);
    if (asc ? (k.ts >= endKey) : (k.ts <= endKey)) {
      pRow = NULL;
    }
  }

  if (pBlockScanInfo->iiter.hasVal) {
    TSDBKEY ik = TSDBROW_KEY(piRow);
    if (asc ? (ik.ts >= endKey) : (ik.ts <= endKey)) {
      piRow = NULL;
    }
  }

  bool useMem = pBlockScanInfo->iter.hasVal && (pRow != NULL);
  bool useIMem = pBlockScanInfo->iiter.hasVal && (piRow != NULL);

  if (useMem && useIMem) {
    TSDBKEY k = TSDBROW_KEY(pRow);
    TSDBKEY ik = TSDBROW_KEY(piRow);

    if (ik.ts == k.ts) {  // same timestamp in both buffers: merge them
      *freeTSRow = true;
      return doMergeMemIMemRows(pRow, piRow, pBlockScanInfo, pReader, pTSRow);
    }

    // otherwise take whichever comes first in scan direction
    bool iMemComesFirst = asc ? (ik.ts < k.ts) : (ik.ts > k.ts);
    if (iMemComesFirst) {
      return doMergeMemTableMultiRows(piRow, uid, &pBlockScanInfo->iiter, pDelList, pTSRow, pReader, freeTSRow);
    }
    return doMergeMemTableMultiRows(pRow, uid, &pBlockScanInfo->iter, pDelList, pTSRow, pReader, freeTSRow);
  }

  if (useMem) {
    return doMergeMemTableMultiRows(pRow, uid, &pBlockScanInfo->iter, pDelList, pTSRow, pReader, freeTSRow);
  }

  if (useIMem) {
    return doMergeMemTableMultiRows(piRow, uid, &pBlockScanInfo->iiter, pDelList, pTSRow, pReader, freeTSRow);
  }

  return TSDB_CODE_SUCCESS;
}

H
Haojun Liao 已提交
3018
// Appends one STSRow as a new row of pBlock. Output columns are matched against the row's schema by
// column id, walking both sides in step. Output columns missing from the schema get NULL.
int32_t doAppendRowFromTSRow(SSDataBlock* pBlock, STsdbReader* pReader, STSRow* pTSRow, uint64_t uid) {
  int32_t             outRow = pBlock->info.rows;
  int32_t             numOfOutputCols = (int32_t)taosArrayGetSize(pBlock->pDataBlock);
  SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo;
  STSchema*           pSchema = doGetSchemaForTSRow(pTSRow->sver, pReader, uid);

  int32_t outCol = 0;     // cursor over the output columns of pBlock
  int32_t schemaCol = 0;  // cursor over the columns of pSchema

  // the timestamp column is only looked for at output position 0
  SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, outCol);
  if (pColInfoData->info.colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
    colDataAppend(pColInfoData, outRow, (const char*)&pTSRow->ts, false);
    outCol++;
  }

  SColVal colVal = {0};
  while (outCol < numOfOutputCols && schemaCol < pSchema->numOfCols) {
    pColInfoData = taosArrayGet(pBlock->pDataBlock, outCol);
    col_id_t outColId = pColInfoData->info.colId;
    col_id_t schemaColId = pSchema->columns[schemaCol].colId;

    if (outColId < schemaColId) {  // requested column absent from this schema version
      colDataAppendNULL(pColInfoData, outRow);
      outCol++;
    } else if (outColId > schemaColId) {  // schema column not requested
      schemaCol++;
    } else {
      tTSRowGetVal(pTSRow, pSchema, schemaCol, &colVal);
      doCopyColVal(pColInfoData, outRow, outCol, &colVal, pSupInfo);
      outCol++;
      schemaCol++;
    }
  }

  // set null value since current column does not exist in the "pSchema"
  for (; outCol < numOfOutputCols; ++outCol) {
    pColInfoData = taosArrayGet(pBlock->pDataBlock, outCol);
    colDataAppendNULL(pColInfoData, outRow);
  }

  pBlock->info.rows += 1;
  return TSDB_CODE_SUCCESS;
}

H
Hongze Cheng 已提交
3062 3063
// Appends row `rowIndex` of pBlockData as a new row of pResBlock. Output columns are matched against
// the block's column data by column id, walking both sides in step as doAppendRowFromTSRow() does.
// Output columns missing from the file block get NULL.
//
// Input columns that are not requested by the output (cid smaller than the current output colId) are
// now skipped. Before, they made the output column be filled with NULL while the input cursor stayed
// put, which misaligned the remaining columns.
// NOTE(review): assumes both column lists are sorted by ascending column id -- confirm.
int32_t doAppendRowFromFileBlock(SSDataBlock* pResBlock, STsdbReader* pReader, SBlockData* pBlockData,
                                 int32_t rowIndex) {
  int32_t i = 0, j = 0;
  int32_t outputRowIndex = pResBlock->info.rows;

  SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo;

  SColumnInfoData* pColData = taosArrayGet(pResBlock->pDataBlock, i);
  if (pColData->info.colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
    colDataAppendInt64(pColData, outputRowIndex, &pBlockData->aTSKEY[rowIndex]);
    i += 1;
  }

  SColVal cv = {0};
  int32_t numOfInputCols = taosArrayGetSize(pBlockData->aIdx);
  int32_t numOfOutputCols = blockDataGetNumOfCols(pResBlock);

  while (i < numOfOutputCols && j < numOfInputCols) {
    SColumnInfoData* pCol = taosArrayGet(pResBlock->pDataBlock, i);
    SColData*        pData = tBlockDataGetColDataByIdx(pBlockData, j);

    if (pData->cid < pCol->info.colId) {  // input column not requested by the output, skip it
      j += 1;
      continue;
    }

    if (pData->cid == pCol->info.colId) {
      tColDataGetValue(pData, rowIndex, &cv);
      doCopyColVal(pCol, outputRowIndex, i, &cv, pSupInfo);
      j += 1;
    } else {  // the specified column does not exist in file block, fill with null data
      colDataAppendNULL(pCol, outputRowIndex);
    }

    i += 1;
  }

  while (i < numOfOutputCols) {
    SColumnInfoData* pCol = taosArrayGet(pResBlock->pDataBlock, i);
    colDataAppendNULL(pCol, outputRowIndex);
    i += 1;
  }

  pResBlock->info.rows += 1;
  return TSDB_CODE_SUCCESS;
}

3104 3105
// Appends in-memory rows of one table (up to, but not including, endKey in scan direction) to
// pReader->pResBlock. Stops when the buffers are exhausted or the block holds `capacity` rows.
// Errors from fetching or appending a row are now returned; they were previously ignored.
int32_t buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, int64_t endKey, int32_t capacity,
                                  STsdbReader* pReader) {
  SSDataBlock* pBlock = pReader->pResBlock;

  do {
    STSRow* pTSRow = NULL;
    bool    freeTSRow = false;

    int32_t code = tsdbGetNextRowInMem(pBlockScanInfo, pReader, &pTSRow, endKey, &freeTSRow);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    if (pTSRow == NULL) {  // no more qualified rows before endKey
      break;
    }

    code = doAppendRowFromTSRow(pBlock, pReader, pTSRow, pBlockScanInfo->uid);
    if (freeTSRow) {  // the row was built by a merge and is owned here
      taosMemoryFree(pTSRow);
    }

    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    // no data in buffer, return immediately
    if (!(pBlockScanInfo->iter.hasVal || pBlockScanInfo->iiter.hasVal)) {
      break;
    }

    if (pBlock->info.rows >= capacity) {
      break;
    }
  } while (1);

  ASSERT(pBlock->info.rows <= capacity);
  return TSDB_CODE_SUCCESS;
}
H
Hongze Cheng 已提交
3134

3135
// todo refactor, use arraylist instead
H
Hongze Cheng 已提交
3136
/*
 * Restrict the reader to a single table.
 *
 * The reader's table map is emptied and then holds exactly one scan-info entry, keyed by `uid`,
 * with lastKey = 0 and every other field zeroed. The result of taosHashPut is not checked.
 */
int32_t tsdbSetTableId(STsdbReader* pReader, int64_t uid) {
  ASSERT(pReader != NULL);

  STableBlockScanInfo scanInfo = {0};
  scanInfo.uid = uid;
  scanInfo.lastKey = 0;

  taosHashClear(pReader->status.pTableMap);
  taosHashPut(pReader->status.pTableMap, &scanInfo.uid, sizeof(uint64_t), &scanInfo, sizeof(scanInfo));

  return TDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
3145 3146 3147 3148 3149 3150
// Return the index handle exposed by metaGetIdx for pMeta, or NULL when pMeta is NULL.
void* tsdbGetIdx(SMeta* pMeta) { return (pMeta != NULL) ? metaGetIdx(pMeta) : NULL; }
dengyihao's avatar
dengyihao 已提交
3151

dengyihao's avatar
dengyihao 已提交
3152 3153 3154 3155 3156 3157
// Return the inverted-index handle exposed by metaGetIvtIdx for pMeta, or NULL when pMeta is NULL.
void* tsdbGetIvtIdx(SMeta* pMeta) { return (pMeta != NULL) ? metaGetIvtIdx(pMeta) : NULL; }
L
Liu Jicong 已提交
3158

H
Hongze Cheng 已提交
3159
// Upper bound (verRange.maxVer) of the data versions visible to this reader.
uint64_t getReaderMaxVersion(STsdbReader* pReader) {
  uint64_t maxVer = pReader->verRange.maxVer;
  return maxVer;
}
3160

H
refact  
Hongze Cheng 已提交
3161
// ====================================== EXPOSED APIs ======================================
3162 3163
/*
 * Create a reader for the tables in pTableList over the time window described by pCond.
 *
 * For TIMEWINDOW_RANGE_EXTERNAL conditions two auxiliary readers with capacity 1 are created:
 *   innerReader[0] - the closest row before the query window, in scan order;
 *   innerReader[1] - the closest row after the query window, in scan order.
 * pCond is modified temporarily to build them and restored before this function returns.
 *
 * On success *ppReader owns the new reader. If the reader itself cannot be created, the error is
 * returned as-is. For any later failure the reader and its inner readers are closed and *ppReader is
 * set to NULL.
 */
int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, SArray* pTableList, STsdbReader** ppReader,
                       const char* idstr) {
  int32_t code = tsdbReaderCreate(pVnode, pCond, ppReader, 4096, idstr);
  if (code != TSDB_CODE_SUCCESS) {
    // no usable reader exists here, so only the caller-supplied id can be logged
    tsdbError("failed to create data reader, code:%s %s", tstrerror(code), (idstr != NULL) ? idstr : "");
    return code;
  }

  STsdbReader* pReader = *ppReader;

  // check for query time window
  if (isEmptyQueryTimeWindow(&pReader->window)) {
    tsdbDebug("%p query window not overlaps with the data set, no result returned, %s", pReader, pReader->idStr);
    return TSDB_CODE_SUCCESS;
  }

  if (pCond->type == TIMEWINDOW_RANGE_EXTERNAL) {
    // update the SQueryTableDataCond to create inner readers; restored once both are built
    STimeWindow w = pCond->twindows;
    int32_t     order = pCond->order;

    // innerReader[0]: scan away from the window start to find the preceding row
    if (order == TSDB_ORDER_ASC) {
      pCond->twindows.skey = INT64_MIN;
      pCond->twindows.ekey = w.skey;
      pCond->order = TSDB_ORDER_DESC;
    } else {
      pCond->twindows.skey = w.ekey;
      pCond->twindows.ekey = INT64_MAX;
      pCond->order = TSDB_ORDER_ASC;
    }

    // here we only need one more row, so the capacity is set to be ONE.
    code = tsdbReaderCreate(pVnode, pCond, &pReader->innerReader[0], 1, idstr);
    if (code == TSDB_CODE_SUCCESS) {
      // innerReader[1]: continue past the window end in the original scan direction
      if (order == TSDB_ORDER_ASC) {
        pCond->twindows.skey = w.ekey;
        pCond->twindows.ekey = INT64_MAX;
      } else {
        pCond->twindows.skey = INT64_MIN;
        pCond->twindows.ekey = w.skey;
      }
      pCond->order = order;

      code = tsdbReaderCreate(pVnode, pCond, &pReader->innerReader[1], 1, idstr);
    }

    // hand the caller's condition back unchanged
    pCond->twindows = w;
    pCond->order = order;

    if (code != TSDB_CODE_SUCCESS) {
      goto _err;
    }
  }

  if (pCond->suid != 0) {
    pReader->pSchema = metaGetTbTSchema(pReader->pTsdb->pVnode->pMeta, pReader->suid, pCond->endVersion);
  } else if (taosArrayGetSize(pTableList) > 0) {
    STableKeyInfo* pKey = taosArrayGet(pTableList, 0);
    pReader->pSchema = metaGetTbTSchema(pReader->pTsdb->pVnode->pMeta, pKey->uid, pCond->endVersion);
  }

  int32_t numOfTables = taosArrayGetSize(pTableList);
  pReader->status.pTableMap = createDataBlockScanInfo(pReader, pTableList->pData, numOfTables);
  if (pReader->status.pTableMap == NULL) {
    code = TSDB_CODE_TDB_OUT_OF_MEMORY;
    goto _err;
  }

  code = tsdbTakeReadSnap(pReader->pTsdb, &pReader->pReadSnap);
  if (code != TSDB_CODE_SUCCESS) {
    goto _err;
  }

  if (pReader->type == TIMEWINDOW_RANGE_CONTAINED) {
    SDataBlockIter* pBlockIter = &pReader->status.blockIter;

    initFilesetIterator(&pReader->status.fileIter, pReader->pReadSnap->fs.aDFileSet, pReader);
    resetDataBlockIterator(&pReader->status.blockIter, pReader->order);

    // no data in files, let's try buffer in memory
    if (pReader->status.fileIter.numOfFiles == 0) {
      pReader->status.loadFromFile = false;
    } else {
      code = initForFirstBlockInFile(pReader, pBlockIter);
      if (code != TSDB_CODE_SUCCESS) {
        goto _err;
      }
    }
  } else {
    STsdbReader*    pPrevReader = pReader->innerReader[0];
    SDataBlockIter* pBlockIter = &pPrevReader->status.blockIter;

    code = tsdbTakeReadSnap(pPrevReader->pTsdb, &pPrevReader->pReadSnap);
    if (code != TSDB_CODE_SUCCESS) {
      goto _err;
    }

    initFilesetIterator(&pPrevReader->status.fileIter, pPrevReader->pReadSnap->fs.aDFileSet, pPrevReader);
    resetDataBlockIterator(&pPrevReader->status.blockIter, pPrevReader->order);

    // no data in files, let's try buffer in memory
    if (pPrevReader->status.fileIter.numOfFiles == 0) {
      pPrevReader->status.loadFromFile = false;
    } else {
      code = initForFirstBlockInFile(pPrevReader, pBlockIter);
      if (code != TSDB_CODE_SUCCESS) {
        goto _err;
      }
    }
  }

  tsdbDebug("%p total numOfTable:%d in this query %s", pReader, numOfTables, pReader->idStr);
  return code;

_err:
  // pReader is valid on every path that reaches this label
  tsdbError("failed to create data reader, code:%s %s", tstrerror(code), pReader->idStr);

  // close the inner readers first and clear their slots so each one is released exactly once
  for (int32_t i = 0; i < 2; ++i) {
    if (pReader->innerReader[i] != NULL) {
      tsdbReaderClose(pReader->innerReader[i]);
      pReader->innerReader[i] = NULL;
    }
  }

  tsdbReaderClose(pReader);
  *ppReader = NULL;
  return code;
}

/*
 * Release a reader and everything it owns: auxiliary inner readers, SMA/column scratch buffers,
 * the file block data, the block iterator, the table map, the result block, the file reader, the
 * read snapshot, the last-block reader, the schemas and the reader struct itself.
 * Logs an I/O cost summary before freeing. A NULL reader is ignored.
 */
void tsdbReaderClose(STsdbReader* pReader) {
  if (pReader == NULL) {
    return;
  }

  // tsdbNextDataBlock clears these slots once an inner reader is exhausted, but they are still
  // populated when the scan stops early; release them here so they do not leak
  for (int32_t i = 0; i < 2; ++i) {
    if (pReader->innerReader[i] != NULL) {
      tsdbReaderClose(pReader->innerReader[i]);
      pReader->innerReader[i] = NULL;
    }
  }

  SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo;

  taosMemoryFreeClear(pSupInfo->plist);
  taosMemoryFree(pSupInfo->colIds);

  taosArrayDestroy(pSupInfo->pColAgg);

  // buildBuf has one slot per result column; it must be released before pResBlock is destroyed
  if (pSupInfo->buildBuf != NULL) {
    size_t numOfCols = blockDataGetNumOfCols(pReader->pResBlock);
    for (size_t i = 0; i < numOfCols; ++i) {
      taosMemoryFreeClear(pSupInfo->buildBuf[i]);
    }
    taosMemoryFree(pSupInfo->buildBuf);
  }

  tBlockDataDestroy(&pReader->status.fileBlockData, true);

  cleanupDataBlockIterator(&pReader->status.blockIter);

  size_t numOfTables = taosHashGetSize(pReader->status.pTableMap);
  destroyBlockScanInfo(pReader->status.pTableMap);
  blockDataDestroy(pReader->pResBlock);

  if (pReader->pFileReader != NULL) {
    tsdbDataFReaderClose(&pReader->pFileReader);
  }

  tsdbUntakeReadSnap(pReader->pTsdb, pReader->pReadSnap);

  taosMemoryFree(pReader->status.uidCheckInfo.tableUidList);

  SFilesetIter* pFilesetIter = &pReader->status.fileIter;
  if (pFilesetIter->pLastBlockReader != NULL) {
    tMergeTreeClose(&pFilesetIter->pLastBlockReader->mergeTree);
    pFilesetIter->pLastBlockReader->pInfo = destroyLastBlockLoadInfo(pFilesetIter->pLastBlockReader->pInfo);
    taosMemoryFree(pFilesetIter->pLastBlockReader);
  }

  SIOCostSummary* pCost = &pReader->cost;

  tsdbDebug("%p :io-cost summary: head-file:%" PRIu64 ", head-file time:%.2f ms, SMA:%" PRId64
            " SMA-time:%.2f ms, fileBlocks:%" PRId64
            ", fileBlocks-time:%.2f ms, "
            "build in-memory-block-time:%.2f ms, lastBlocks:%" PRId64
            ", lastBlocks-time:%.2f ms, STableBlockScanInfo size:%.2f Kb %s",
            pReader, pCost->headFileLoad, pCost->headFileLoadTime, pCost->smaDataLoad, pCost->smaLoadTime,
            pCost->numOfBlocks, pCost->blockLoadTime, pCost->buildmemBlock, pCost->lastBlockLoad,
            pCost->lastBlockLoadTime, numOfTables * sizeof(STableBlockScanInfo) / 1000.0, pReader->idStr);

  taosMemoryFree(pReader->idStr);
  taosMemoryFree(pReader->pSchema);
  // pMemSchema may alias pSchema; free it only when it is a separate allocation
  if (pReader->pMemSchema != pReader->pSchema) {
    taosMemoryFree(pReader->pMemSchema);
  }
  taosMemoryFreeClear(pReader);
}

3337
/*
 * Fill pReader->pResBlock with the next batch of rows for a single reader.
 *
 * When status.loadFromFile is set, file blocks are tried first. If that fails, false is returned.
 * If the files yield rows, true is returned. Otherwise (or when file loading is off) the in-memory
 * buffer is scanned.
 * Returns true when the result block holds at least one row.
 */
static bool doTsdbNextDataBlock(STsdbReader* pReader) {
  SSDataBlock* pResBlock = pReader->pResBlock;

  // discard rows that belong to the previously returned block
  blockDataCleanup(pResBlock);

  if (pReader->status.loadFromFile) {
    if (buildBlockFromFiles(pReader) != TSDB_CODE_SUCCESS) {
      return false;
    }
    if (pResBlock->info.rows > 0) {
      return true;
    }
  }

  // nothing came from the files: fall back to the buffered rows
  buildBlockFromBufferSequentially(pReader);
  return pResBlock->info.rows > 0;
}

3364 3365 3366 3367 3368 3369 3370 3371 3372 3373 3374 3375 3376 3377 3378 3379 3380 3381 3382 3383 3384 3385 3386 3387 3388 3389 3390 3391 3392 3393 3394 3395 3396 3397 3398 3399 3400
/*
 * Advance the reader to its next non-empty result block.
 *
 * Data is served in up to three phases, recorded in pReader->step:
 *   innerReader[0] (EXTERNAL_ROWS_PREV) -> the reader itself (EXTERNAL_ROWS_MAIN) -> innerReader[1]
 *   (EXTERNAL_ROWS_NEXT).
 * An inner reader that has no more data is closed and its slot set to NULL.
 * Returns false once every phase is exhausted or the query window is empty.
 */
bool tsdbNextDataBlock(STsdbReader* pReader) {
  if (isEmptyQueryTimeWindow(&pReader->window)) {
    return false;
  }

  STsdbReader* pPrev = pReader->innerReader[0];
  if (pPrev != NULL) {
    if (doTsdbNextDataBlock(pPrev)) {
      pReader->step = EXTERNAL_ROWS_PREV;
      return true;
    }
    tsdbReaderClose(pPrev);
    pReader->innerReader[0] = NULL;
  }

  pReader->step = EXTERNAL_ROWS_MAIN;
  if (doTsdbNextDataBlock(pReader)) {
    return true;
  }

  STsdbReader* pNext = pReader->innerReader[1];
  if (pNext == NULL) {
    return false;
  }

  if (doTsdbNextDataBlock(pNext)) {
    pReader->step = EXTERNAL_ROWS_NEXT;
    return true;
  }

  tsdbReaderClose(pNext);
  pReader->innerReader[1] = NULL;
  return false;
}

// Copy the row count, table uid and time window of pReader's current result block into pDataBlockInfo.
static void setBlockInfo(STsdbReader* pReader, SDataBlockInfo* pDataBlockInfo) {
  ASSERT(pDataBlockInfo != NULL && pReader != NULL);

  SSDataBlock* pRes = pReader->pResBlock;
  pDataBlockInfo->window = pRes->info.window;
  pDataBlockInfo->uid = pRes->info.uid;
  pDataBlockInfo->rows = pRes->info.rows;
}

3407 3408
/*
 * Report the info (rows, uid, window) of the block most recently produced by tsdbNextDataBlock.
 * For TIMEWINDOW_RANGE_EXTERNAL readers the block may come from an inner reader, depending on
 * pReader->step: PREV -> innerReader[0], MAIN -> the reader itself, anything else -> innerReader[1].
 */
void tsdbRetrieveDataBlockInfo(STsdbReader* pReader, SDataBlockInfo* pDataBlockInfo) {
  STsdbReader* pSource = pReader;

  if (pReader->type == TIMEWINDOW_RANGE_EXTERNAL && pReader->step != EXTERNAL_ROWS_MAIN) {
    pSource = (pReader->step == EXTERNAL_ROWS_PREV) ? pReader->innerReader[0] : pReader->innerReader[1];
  }

  setBlockInfo(pSource, pDataBlockInfo);
}

3421
/*
 * Fetch the pre-computed per-column aggregates (SMA) of the current file block.
 *
 * *pBlockStatis is set to NULL when no SMA is available: an external-range reader, a composed block,
 * or a block without SMA. Otherwise it is set to pReader->suppInfo.plist, an array indexed like
 * suppInfo.colIds. Entry 0 is the timestamp aggregate built from the result block window. Entries
 * of columns with no usable aggregate in this block are NULL.
 * *allHave is true only when SMA was loaded and every matched column has BSMA enabled in the schema.
 *
 * Returns the error from tsdbReadBlockSma, otherwise TSDB_CODE_SUCCESS.
 */
int32_t tsdbRetrieveDatablockSMA(STsdbReader* pReader, SColumnDataAgg*** pBlockStatis, bool* allHave) {
  int32_t code = 0;
  *allHave = false;

  if (pReader->type == TIMEWINDOW_RANGE_EXTERNAL) {
    *pBlockStatis = NULL;
    return TSDB_CODE_SUCCESS;
  }

  // there is no statistics data for composed block
  if (pReader->status.composedDataBlock) {
    *pBlockStatis = NULL;
    return TSDB_CODE_SUCCESS;
  }

  SFileDataBlockInfo* pFBlock = getCurrentBlockInfo(&pReader->status.blockIter);
  SDataBlk*           pBlock = getCurrentBlock(&pReader->status.blockIter);
  int64_t             stime = taosGetTimestampUs();
  int32_t             vgId = TD_VID(pReader->pTsdb->pVnode);

  SBlockLoadSuppInfo* pSup = &pReader->suppInfo;

  if (!tDataBlkHasSma(pBlock)) {
    *pBlockStatis = NULL;
    return TSDB_CODE_SUCCESS;
  }

  code = tsdbReadBlockSma(pReader->pFileReader, pBlock, pSup->pColAgg);
  if (code != TSDB_CODE_SUCCESS) {
    tsdbDebug("vgId:%d, failed to load block SMA for uid %" PRIu64 ", code:%s, %s", vgId, pFBlock->uid,
              tstrerror(code), pReader->idStr);
    return code;
  }

  *allHave = true;

  // always load the first primary timestamp column data
  SColumnDataAgg* pTsAgg = &pSup->tsColAgg;
  pTsAgg->numOfNull = 0;
  pTsAgg->colId = PRIMARYKEY_TIMESTAMP_COL_ID;
  pTsAgg->min = pReader->pResBlock->info.window.skey;
  pTsAgg->max = pReader->pResBlock->info.window.ekey;

  size_t numOfCols = blockDataGetNumOfCols(pReader->pResBlock);

  // entries left from the previous block would point at unrelated (re-filled) pColAgg elements;
  // clear them so a column without an aggregate in this block is reported as NULL
  for (size_t k = 1; k < numOfCols; ++k) {
    pSup->plist[k] = NULL;
  }
  pSup->plist[0] = pTsAgg;

  // merge-join the aggregates and the requested columns by column id (both walked in ascending id
  // order, as the comparisons below require)
  size_t numOfAggs = taosArrayGetSize(pSup->pColAgg);
  size_t i = 0, j = 0;
  while (j < numOfCols && i < numOfAggs) {
    SColumnDataAgg* pAgg = taosArrayGet(pSup->pColAgg, i);
    if (pAgg->colId == pSup->colIds[j]) {
      // pColAgg positions do not correspond to schema positions, so look the column up by id
      STColumn* pCol = NULL;
      for (int32_t k = 0; k < pReader->pSchema->numOfCols; ++k) {
        if (pReader->pSchema->columns[k].colId == pAgg->colId) {
          pCol = &pReader->pSchema->columns[k];
          break;
        }
      }

      if (pCol != NULL && IS_BSMA_ON(pCol)) {
        pSup->plist[j] = pAgg;
      } else {
        *allHave = false;
      }

      i += 1;
      j += 1;
    } else if (pAgg->colId < pSup->colIds[j]) {
      i += 1;
    } else {
      j += 1;
    }
  }

  double elapsed = (taosGetTimestampUs() - stime) / 1000.0;
  pReader->cost.smaLoadTime += elapsed;
  pReader->cost.smaDataLoad += 1;

  *pBlockStatis = pSup->plist;

  tsdbDebug("vgId:%d, succeed to load block SMA for uid %" PRIu64 ", elapsed time:%.2f ms, %s", vgId, pFBlock->uid,
            elapsed, pReader->idStr);

  return code;
}

3499
/*
 * Make pReader->pResBlock hold the rows of the current block and return its column array.
 *
 * A composed block is already in pResBlock. Otherwise the current file block is reset, initialized
 * for its table, loaded from file and copied into pResBlock. On failure terrno is set and NULL is
 * returned; after a load failure the file block data is destroyed.
 */
static SArray* doRetrieveDataBlock(STsdbReader* pReader) {
  SReaderStatus* pStatus = &pReader->status;

  if (pStatus->composedDataBlock) {
    return pReader->pResBlock->pDataBlock;
  }

  SFileDataBlockInfo*  pFBlock = getCurrentBlockInfo(&pStatus->blockIter);
  STableBlockScanInfo* pScanInfo = taosHashGet(pStatus->pTableMap, &pFBlock->uid, sizeof(pFBlock->uid));

  tBlockDataReset(&pStatus->fileBlockData);

  int32_t code = tBlockDataInit(&pStatus->fileBlockData, pReader->suid, pScanInfo->uid, pReader->pSchema);
  if (code == TSDB_CODE_SUCCESS) {
    code = doLoadFileBlockData(pReader, &pStatus->blockIter, &pStatus->fileBlockData);
    if (code != TSDB_CODE_SUCCESS) {
      tBlockDataDestroy(&pStatus->fileBlockData, 1);
    }
  }

  if (code != TSDB_CODE_SUCCESS) {
    terrno = code;
    return NULL;
  }

  copyBlockDataToSDataBlock(pReader, pScanInfo);
  return pReader->pResBlock->pDataBlock;
}

3527 3528 3529 3530 3531 3532 3533 3534 3535 3536 3537 3538
/*
 * Return the column data of the block most recently produced by tsdbNextDataBlock.
 * For TIMEWINDOW_RANGE_EXTERNAL readers, step PREV reads innerReader[0] and step NEXT reads
 * innerReader[1]; any other case reads the reader itself. pIdList is not used.
 */
SArray* tsdbRetrieveDataBlock(STsdbReader* pReader, SArray* pIdList) {
  STsdbReader* pTarget = pReader;

  if (pReader->type == TIMEWINDOW_RANGE_EXTERNAL) {
    switch (pReader->step) {
      case EXTERNAL_ROWS_PREV:
        pTarget = pReader->innerReader[0];
        break;
      case EXTERNAL_ROWS_NEXT:
        pTarget = pReader->innerReader[1];
        break;
      default:
        break;
    }
  }

  return doRetrieveDataBlock(pTarget);
}

H
Haojun Liao 已提交
3539
/*
 * Rewind the reader so it can scan again with the order and time window from pCond.
 *
 * The reader is switched to TIMEWINDOW_RANGE_CONTAINED, the SMA scratch state and file reader are
 * reset, and the file-set and block iterators are re-initialized against the existing read snapshot.
 * Returns the error from initForFirstBlockInFile, otherwise TSDB_CODE_SUCCESS. A reader whose current
 * window is empty is left untouched.
 */
int32_t tsdbReaderReset(STsdbReader* pReader, SQueryTableDataCond* pCond) {
  if (isEmptyQueryTimeWindow(&pReader->window)) {
    return TSDB_CODE_SUCCESS;
  }

  SReaderStatus*      pStatus = &pReader->status;
  SBlockLoadSuppInfo* pSup = &pReader->suppInfo;

  pReader->order = pCond->order;
  pReader->type = TIMEWINDOW_RANGE_CONTAINED;
  pStatus->loadFromFile = true;
  pStatus->pTableIter = NULL;
  pReader->window = updateQueryTimeWindow(pReader->pTsdb, &pCond->twindows);

  // reset the timestamp aggregate and the first SMA output slot
  memset(&pSup->tsColAgg, 0, sizeof(SColumnDataAgg));
  memset(pSup->plist, 0, POINTER_BYTES);
  pSup->tsColAgg.colId = PRIMARYKEY_TIMESTAMP_COL_ID;

  tsdbDataFReaderClose(&pReader->pFileReader);

  int32_t numOfTables = taosHashGetSize(pStatus->pTableMap);

  initFilesetIterator(&pStatus->fileIter, pReader->pReadSnap->fs.aDFileSet, pReader);
  resetDataBlockIterator(&pStatus->blockIter, pReader->order);

  // key one step outside the window on the side the scan starts from (skey-1 ascending, ekey+1 descending)
  int64_t startKey = ASCENDING_TRAVERSE(pReader->order) ? pReader->window.skey - 1 : pReader->window.ekey + 1;
  resetDataBlockScanInfo(pStatus->pTableMap, startKey);

  // no data in files, let's try buffer in memory
  if (pStatus->fileIter.numOfFiles == 0) {
    pStatus->loadFromFile = false;
  } else {
    int32_t code = initForFirstBlockInFile(pReader, &pStatus->blockIter);
    if (code != TSDB_CODE_SUCCESS) {
      tsdbError("%p reset reader failed, numOfTables:%d, query range:%" PRId64 " - %" PRId64 " in query %s", pReader,
                numOfTables, pReader->window.skey, pReader->window.ekey, pReader->idStr);
      return code;
    }
  }

  tsdbDebug("%p reset reader, suid:%" PRIu64 ", numOfTables:%d, query range:%" PRId64 " - %" PRId64 " in query %s",
            pReader, pReader->suid, numOfTables, pReader->window.skey, pReader->window.ekey, pReader->idStr);

  return TSDB_CODE_SUCCESS;
}
H
Hongze Cheng 已提交
3585

3586 3587 3588
/*
 * Map a block's row count to a histogram bucket of width bucketRange that starts at startRow.
 *
 * Row counts below startRow, and a non-positive bucketRange, map to bucket 0. Without this guard
 * they would produce a negative index or a division by zero. The caller must still clamp the result
 * to its own number of buckets.
 */
static int32_t getBucketIndex(int32_t startRow, int32_t bucketRange, int32_t numOfRows) {
  if (bucketRange <= 0 || numOfRows < startRow) {
    return 0;
  }
  return (numOfRows - startRow) / bucketRange;
}
H
Hongze Cheng 已提交
3589

3590 3591 3592 3593
/*
 * Accumulate block-distribution statistics of the reader's file blocks into pTableBlockInfo.
 *
 * totalSize and totalRows are reset. The other counters (numOfFiles, numOfBlocks, min/max rows,
 * small-block count, row histogram) are added to the caller's values. Blocks are walked with
 * blockIteratorNext. When the current block list is exhausted, initForFirstBlockInFile is called
 * (presumably advancing to the next file set) until it fails or clears status.loadFromFile.
 *
 * Returns the error from initForFirstBlockInFile, if any, otherwise TSDB_CODE_SUCCESS.
 */
int32_t tsdbGetFileBlocksDistInfo(STsdbReader* pReader, STableBlockDistInfo* pTableBlockInfo) {
  // NOTE(review): assumed to equal the number of entries in pTableBlockInfo->blockRowsHisto — confirm
  const int32_t numOfBuckets = 20;

  int32_t code = TSDB_CODE_SUCCESS;
  pTableBlockInfo->totalSize = 0;
  pTableBlockInfo->totalRows = 0;

  // find the start data block in file
  SReaderStatus* pStatus = &pReader->status;

  STsdbCfg* pc = &pReader->pTsdb->pVnode->config.tsdbCfg;
  pTableBlockInfo->defMinRows = pc->minRows;
  pTableBlockInfo->defMaxRows = pc->maxRows;

  int32_t bucketRange = ceil((pc->maxRows - pc->minRows) / (double)numOfBuckets);
  if (bucketRange < 1) {
    // minRows == maxRows: keep the bucket width positive to avoid dividing by zero
    bucketRange = 1;
  }

  pTableBlockInfo->numOfFiles += 1;

  int32_t numOfTables = (int32_t)taosHashGetSize(pStatus->pTableMap);
  int     defaultRows = 4096;

  SDataBlockIter* pBlockIter = &pStatus->blockIter;
  pTableBlockInfo->numOfFiles += pStatus->fileIter.numOfFiles;

  if (pBlockIter->numOfBlocks > 0) {
    pTableBlockInfo->numOfBlocks += pBlockIter->numOfBlocks;
  }

  pTableBlockInfo->numOfTables = numOfTables;
  bool hasNext = (pBlockIter->numOfBlocks > 0);

  while (true) {
    if (hasNext) {
      SDataBlk* pBlock = getCurrentBlock(pBlockIter);

      int32_t numOfRows = pBlock->nRow;
      pTableBlockInfo->totalRows += numOfRows;

      if (numOfRows > pTableBlockInfo->maxRows) {
        pTableBlockInfo->maxRows = numOfRows;
      }

      if (numOfRows < pTableBlockInfo->minRows) {
        pTableBlockInfo->minRows = numOfRows;
      }

      if (numOfRows < defaultRows) {
        pTableBlockInfo->numOfSmallBlocks += 1;
      }

      // blocks below minRows or at the top of the range would otherwise index outside the histogram
      int32_t bucketIndex = getBucketIndex(pTableBlockInfo->defMinRows, bucketRange, numOfRows);
      if (bucketIndex < 0) {
        bucketIndex = 0;
      } else if (bucketIndex >= numOfBuckets) {
        bucketIndex = numOfBuckets - 1;
      }
      pTableBlockInfo->blockRowsHisto[bucketIndex]++;

      hasNext = blockIteratorNext(&pStatus->blockIter);
    } else {
      code = initForFirstBlockInFile(pReader, pBlockIter);
      if ((code != TSDB_CODE_SUCCESS) || (pReader->status.loadFromFile == false)) {
        break;
      }

      pTableBlockInfo->numOfBlocks += pBlockIter->numOfBlocks;
      hasNext = (pBlockIter->numOfBlocks > 0);
    }
  }

  return code;
}
H
Hongze Cheng 已提交
3658

H
refact  
Hongze Cheng 已提交
3659
/*
 * Count the rows that the reader's tables have in the memtables captured by its read snapshot
 * (pMem and pIMem). Iterates the whole table map, leaving status.pTableIter at NULL.
 * Returns 0 when the reader holds no snapshot.
 */
int64_t tsdbGetNumOfRowsInMemTable(STsdbReader* pReader) {
  int64_t rows = 0;

  // no snapshot is taken for an empty query window, so there is nothing to count
  if (pReader->pReadSnap == NULL) {
    return rows;
  }

  SReaderStatus* pStatus = &pReader->status;
  pStatus->pTableIter = taosHashIterate(pStatus->pTableMap, NULL);

  while (pStatus->pTableIter != NULL) {
    STableBlockScanInfo* pBlockScanInfo = pStatus->pTableIter;

    // test the snapshot's memtables, not pTsdb->mem/imem: the latter may differ from what the
    // snapshot referenced, and the snapshot pointers are the ones dereferenced below
    if (pReader->pReadSnap->pMem != NULL) {
      STbData* d = tsdbGetTbDataFromMemTable(pReader->pReadSnap->pMem, pReader->suid, pBlockScanInfo->uid);
      if (d != NULL) {
        rows += tsdbGetNRowsInTbData(d);
      }
    }

    if (pReader->pReadSnap->pIMem != NULL) {
      STbData* di = tsdbGetTbDataFromMemTable(pReader->pReadSnap->pIMem, pReader->suid, pBlockScanInfo->uid);
      if (di != NULL) {
        rows += tsdbGetNRowsInTbData(di);
      }
    }

    // current table is exhausted, let's try the next table
    pStatus->pTableIter = taosHashIterate(pStatus->pTableMap, pStatus->pTableIter);
  }

  return rows;
}
D
dapan1121 已提交
3690

L
Liu Jicong 已提交
3691
/*
 * Look up table `uid` in the vnode meta and return its schema.
 *
 * *suid receives the super-table uid for a child table and 0 for a normal table. The schema version
 * comes from the super table entry (child table) or the table's own entry (normal table). *pSchema is
 * then obtained with metaGetTbTSchema for `uid` at that version.
 * Returns TSDB_CODE_TDB_INVALID_TABLE_ID (also stored in terrno) when either lookup fails.
 */
int32_t tsdbGetTableSchema(SVnode* pVnode, int64_t uid, STSchema** pSchema, int64_t* suid) {
  SMetaReader mr = {0};
  metaReaderInit(&mr, pVnode->pMeta, 0);

  if (metaGetTableEntryByUid(&mr, uid) != TSDB_CODE_SUCCESS) {
    terrno = TSDB_CODE_TDB_INVALID_TABLE_ID;
    metaReaderClear(&mr);
    return terrno;
  }

  int32_t sversion = 1;
  *suid = 0;

  if (mr.me.type != TSDB_CHILD_TABLE) {
    ASSERT(mr.me.type == TSDB_NORMAL_TABLE);
    sversion = mr.me.ntbEntry.schemaRow.version;
  } else {
    // re-use the reader for the super table entry, which carries the schema version
    tDecoderClear(&mr.coder);
    *suid = mr.me.ctbEntry.suid;
    if (metaGetTableEntryByUid(&mr, *suid) != TSDB_CODE_SUCCESS) {
      terrno = TSDB_CODE_TDB_INVALID_TABLE_ID;
      metaReaderClear(&mr);
      return terrno;
    }
    sversion = mr.me.stbEntry.schemaRow.version;
  }

  metaReaderClear(&mr);
  *pSchema = metaGetTbTSchema(pVnode->pMeta, uid, sversion);

  return TSDB_CODE_SUCCESS;
}
H
Hongze Cheng 已提交
3725 3726 3727 3728 3729 3730 3731 3732 3733 3734 3735 3736 3737 3738 3739 3740 3741 3742 3743 3744 3745 3746 3747 3748 3749 3750 3751 3752 3753 3754

/*
 * Take a read snapshot of pTsdb. Under the read lock, the current mem/imem tables and the file set
 * are referenced.
 *
 * On success *ppSnap receives the snapshot, to be released with tsdbUntakeReadSnap.
 * On failure every reference taken so far is dropped, the snapshot is freed and *ppSnap is NULL.
 */
int32_t tsdbTakeReadSnap(STsdb* pTsdb, STsdbReadSnap** ppSnap) {
  int32_t code = 0;

  *ppSnap = NULL;

  // alloc
  STsdbReadSnap* pSnap = (STsdbReadSnap*)taosMemoryCalloc(1, sizeof(STsdbReadSnap));
  if (pSnap == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  // lock
  code = taosThreadRwlockRdlock(&pTsdb->rwLock);
  if (code) {
    taosMemoryFree(pSnap);
    return TAOS_SYSTEM_ERROR(code);
  }

  // take snapshot
  pSnap->pMem = pTsdb->mem;
  pSnap->pIMem = pTsdb->imem;

  if (pSnap->pMem) {
    tsdbRefMemTable(pSnap->pMem);
  }

  if (pSnap->pIMem) {
    tsdbRefMemTable(pSnap->pIMem);
  }

  // fs
  code = tsdbFSRef(pTsdb, &pSnap->fs);
  if (code) {
    taosThreadRwlockUnlock(&pTsdb->rwLock);

    // the file set was not referenced, so only the memtable references have to be dropped
    if (pSnap->pMem) {
      tsdbUnrefMemTable(pSnap->pMem);
    }
    if (pSnap->pIMem) {
      tsdbUnrefMemTable(pSnap->pIMem);
    }
    taosMemoryFree(pSnap);
    return code;
  }

  // unlock
  code = taosThreadRwlockUnlock(&pTsdb->rwLock);
  if (code) {
    // every reference was taken: release them all and free the snapshot
    tsdbUntakeReadSnap(pTsdb, pSnap);
    return TAOS_SYSTEM_ERROR(code);
  }

  tsdbTrace("vgId:%d, take read snapshot", TD_VID(pTsdb->pVnode));

  *ppSnap = pSnap;
  return TSDB_CODE_SUCCESS;
}

/*
 * Release a snapshot produced by tsdbTakeReadSnap.
 * The mem/imem and file-set references are dropped and the snapshot struct is freed.
 * A NULL pSnap is accepted; the trace line is written in either case.
 */
void tsdbUntakeReadSnap(STsdb* pTsdb, STsdbReadSnap* pSnap) {
  if (pSnap != NULL) {
    if (pSnap->pMem != NULL) tsdbUnrefMemTable(pSnap->pMem);
    if (pSnap->pIMem != NULL) tsdbUnrefMemTable(pSnap->pIMem);

    tsdbFSUnref(pTsdb, &pSnap->fs);
    taosMemoryFree(pSnap);
  }

  tsdbTrace("vgId:%d, untake read snapshot", TD_VID(pTsdb->pVnode));
}