tsdbRead.c 164.4 KB
Newer Older
H
hjxilinx 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

H
Haojun Liao 已提交
16
#include "osDef.h"
H
Hongze Cheng 已提交
17
#include "tsdb.h"
18

H
Hongze Cheng 已提交
19
#define ASCENDING_TRAVERSE(o) (o == TSDB_ORDER_ASC)
H
Hongze Cheng 已提交
20

21 22 23 24 25 26
/*
 * Selector for the kind of rows a reader is asked to produce.
 * NOTE(review): presumably PREV/NEXT denote the rows directly before/after
 * the main query window and MAIN the window itself -- confirm against users.
 * The values are plain ordinals (3 is not a bit-or of 1 and 2 by intent, as far as visible here).
 */
typedef enum {
  EXTERNAL_ROWS_PREV = 1,
  EXTERNAL_ROWS_MAIN = 2,
  EXTERNAL_ROWS_NEXT = 3,
} EContentData;

27
typedef struct {
dengyihao's avatar
dengyihao 已提交
28
  STbDataIter* iter;
29 30 31 32
  int32_t      index;
  bool         hasVal;
} SIterInfo;

33 34
/* Pair of counters: number of data blocks and number of "last" files. */
typedef struct {
  int32_t numOfBlocks;
  int32_t numOfLastFiles;
} SBlockNumber;

38
typedef struct SBlockIndex {
39 40
  int32_t     ordinalIndex;
  int64_t     inFileOffset;
H
Haojun Liao 已提交
41
  STimeWindow window;  // todo replace it with overlap flag.
42 43
} SBlockIndex;

H
Haojun Liao 已提交
44
typedef struct STableBlockScanInfo {
dengyihao's avatar
dengyihao 已提交
45 46
  uint64_t  uid;
  TSKEY     lastKey;
H
Hongze Cheng 已提交
47
  SMapData  mapData;            // block info (compressed)
48
  SArray*   pBlockList;         // block data index list, SArray<SBlockIndex>
H
Hongze Cheng 已提交
49 50 51 52 53 54
  SIterInfo iter;               // mem buffer skip list iterator
  SIterInfo iiter;              // imem buffer skip list iterator
  SArray*   delSkyline;         // delete info for this table
  int32_t   fileDelIndex;       // file block delete index
  int32_t   lastBlockDelIndex;  // delete index for last block
  bool      iterInit;           // whether to initialize the in-memory skip list iterator or not
H
Haojun Liao 已提交
55 56 57
} STableBlockScanInfo;

/* (table uid, file offset) pair; NOTE(review): presumably the sort key used when ordering blocks -- confirm. */
typedef struct SBlockOrderWrapper {
  int64_t uid;
  int64_t offset;
} SBlockOrderWrapper;
H
Hongze Cheng 已提交
61 62

typedef struct SBlockOrderSupporter {
63 64 65 66
  SBlockOrderWrapper** pDataBlockInfo;
  int32_t*             indexPerTable;
  int32_t*             numOfBlocksPerTable;
  int32_t              numOfTables;
H
Hongze Cheng 已提交
67 68 69
} SBlockOrderSupporter;

/* I/O and CPU cost counters collected by a reader (counts are int64_t, timings are double). */
typedef struct SIOCostSummary {
  int64_t numOfBlocks;
  double  blockLoadTime;
  double  buildmemBlock;
  int64_t headFileLoad;
  double  headFileLoadTime;
  int64_t smaDataLoad;
  double  smaLoadTime;
  int64_t lastBlockLoad;
  double  lastBlockLoadTime;
  int64_t composedBlocks;
  double  buildComposedBlockTime;
  double  createScanInfoList;
  //  double  getTbFromMemTime;
  //  double  getTbFromIMemTime;
  double initDelSkylineIterTime;
} SIOCostSummary;

typedef struct SBlockLoadSuppInfo {
88 89 90 91 92 93 94
  SArray*        pColAgg;
  SColumnDataAgg tsColAgg;
  int16_t*       colId;
  int16_t*       slotId;
  int32_t        numOfCols;
  char**         buildBuf;  // build string tmp buffer, todo remove it later after all string format being updated.
  bool           smaValid;  // the sma on all queried columns are activated
H
Hongze Cheng 已提交
95 96
} SBlockLoadSuppInfo;

97
typedef struct SLastBlockReader {
H
Hongze Cheng 已提交
98 99 100 101 102
  STimeWindow        window;
  SVersionRange      verRange;
  int32_t            order;
  uint64_t           uid;
  SMergeTree         mergeTree;
103
  SSttBlockLoadInfo* pInfo;
104 105
} SLastBlockReader;

106
typedef struct SFilesetIter {
H
Hongze Cheng 已提交
107 108 109
  int32_t           numOfFiles;  // number of total files
  int32_t           index;       // current accessed index in the list
  SArray*           pFileList;   // data file list
110
  int32_t           order;
H
Hongze Cheng 已提交
111
  SLastBlockReader* pLastBlockReader;  // last file block reader
112
} SFilesetIter;
H
Haojun Liao 已提交
113 114

/* Identifies one file data block by owning table and position in that table's block list. */
typedef struct SFileDataBlockInfo {
  // index position in STableBlockScanInfo in order to check whether neighbor block overlaps with it
  uint64_t uid;
  int32_t  tbBlockIdx;
} SFileDataBlockInfo;

typedef struct SDataBlockIter {
121
  int32_t   numOfBlocks;
122
  int32_t   index;
H
Hongze Cheng 已提交
123
  SArray*   blockList;  // SArray<SFileDataBlockInfo>
124
  int32_t   order;
H
Hongze Cheng 已提交
125
  SDataBlk  block;  // current SDataBlk data
126
  SHashObj* pTableMap;
H
Haojun Liao 已提交
127 128 129
} SDataBlockIter;

/* Progress of dumping the rows of the current file block into the result block. */
typedef struct SFileBlockDumpInfo {
  int32_t totalRows;  // number of rows in the block
  int32_t rowIndex;   // index of the current row
  int64_t lastKey;    // last key handled
  bool    allDumped;  // whether every row has been dumped
} SFileBlockDumpInfo;

136
/* Table uid list plus a cursor into it. */
typedef struct STableUidList {
  uint64_t* tableUidList;  // access table uid list in uid ascending order list
  int32_t   currentIndex;  // index in table uid list
} STableUidList;
140

H
Haojun Liao 已提交
141
typedef struct SReaderStatus {
H
Hongze Cheng 已提交
142 143 144
  bool                  loadFromFile;       // check file stage
  bool                  composedDataBlock;  // the returned data block is a composed block or not
  SHashObj*             pTableMap;          // SHash<STableBlockScanInfo>
145
  STableBlockScanInfo** pTableIter;         // table iterator used in building in-memory buffer data blocks.
146
  STableUidList         uidList;            // check tables in uid order, to avoid the repeatly load of blocks in STT.
H
Hongze Cheng 已提交
147 148 149 150 151
  SFileBlockDumpInfo    fBlockDumpInfo;
  SDFileSet*            pCurrentFileset;  // current opened file set
  SBlockData            fileBlockData;
  SFilesetIter          fileIter;
  SDataBlockIter        blockIter;
H
Haojun Liao 已提交
152 153
} SReaderStatus;

154
typedef struct SBlockInfoBuf {
H
Hongze Cheng 已提交
155 156 157
  int32_t currentIndex;
  SArray* pData;
  int32_t numPerBucket;
D
dapan1121 已提交
158
  int32_t numOfTables;
159 160
} SBlockInfoBuf;

H
Hongze Cheng 已提交
161
struct STsdbReader {
H
Haojun Liao 已提交
162
  STsdb*             pTsdb;
163 164 165
  SVersionRange      verRange;
  TdThreadMutex      readerMutex;
  bool               suspended;
H
Haojun Liao 已提交
166 167
  uint64_t           suid;
  int16_t            order;
H
Haojun Liao 已提交
168
  bool               freeBlock;
H
Haojun Liao 已提交
169 170 171 172
  STimeWindow        window;  // the primary query time window that applies to all queries
  SSDataBlock*       pResBlock;
  int32_t            capacity;
  SReaderStatus      status;
173 174
  char*              idStr;  // query info handle, for debug purpose
  int32_t            type;   // query type: 1. retrieve all data blocks, 2. retrieve direct prev|next rows
H
Hongze Cheng 已提交
175
  SBlockLoadSuppInfo suppInfo;
H
Hongze Cheng 已提交
176
  STsdbReadSnap*     pReadSnap;
177
  SIOCostSummary     cost;
178 179 180 181 182
  STSchema*          pSchema;      // the newest version schema
  STSchema*          pMemSchema;   // the previous schema for in-memory data, to avoid load schema too many times
  SDataFReader*      pFileReader;  // the file reader
  SDelFReader*       pDelFReader;  // the del file reader
  SArray*            pDelIdx;      // del file block index;
183 184 185
  SBlockInfoBuf      blockInfoBuf;
  int32_t            step;
  STsdbReader*       innerReader[2];
H
Hongze Cheng 已提交
186
};
H
Hongze Cheng 已提交
187

H
Haojun Liao 已提交
188
static SFileDataBlockInfo* getCurrentBlockInfo(SDataBlockIter* pBlockIter);
189 190
static int      buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, int64_t endKey, int32_t capacity,
                                          STsdbReader* pReader);
191
static TSDBROW* getValidMemRow(SIterInfo* pIter, const SArray* pDelList, STsdbReader* pReader);
192 193
static int32_t  doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pScanInfo, STsdbReader* pReader,
                                        SRowMerger* pMerger);
H
Hongze Cheng 已提交
194
static int32_t  doMergeRowsInLastBlock(SLastBlockReader* pLastBlockReader, STableBlockScanInfo* pScanInfo, int64_t ts,
195
                                       SRowMerger* pMerger, SVersionRange* pVerRange);
196
static int32_t  doMergeRowsInBuf(SIterInfo* pIter, uint64_t uid, int64_t ts, SArray* pDelList, SRowMerger* pMerger,
dengyihao's avatar
dengyihao 已提交
197
                                 STsdbReader* pReader);
H
Hongze Cheng 已提交
198
static int32_t  doAppendRowFromTSRow(SSDataBlock* pBlock, STsdbReader* pReader, SRow* pTSRow,
H
Hongze Cheng 已提交
199
                                     STableBlockScanInfo* pInfo);
200
static int32_t  doAppendRowFromFileBlock(SSDataBlock* pResBlock, STsdbReader* pReader, SBlockData* pBlockData,
H
Hongze Cheng 已提交
201
                                         int32_t rowIndex);
202
static void     setComposedBlockFlag(STsdbReader* pReader, bool composed);
H
Hongze Cheng 已提交
203 204
static bool     hasBeenDropped(const SArray* pDelList, int32_t* index, TSDBKEY* pKey, int32_t order,
                               SVersionRange* pVerRange);
205

H
Hongze Cheng 已提交
206
static int32_t doMergeMemTableMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter, SArray* pDelList,
207
                                        TSDBROW* pTSRow, STsdbReader* pReader, bool* freeTSRow);
H
Hongze Cheng 已提交
208
static int32_t doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo* pBlockScanInfo,
H
Hongze Cheng 已提交
209
                                  STsdbReader* pReader, SRow** pTSRow);
210 211
static int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pBlockScanInfo, int64_t key,
                                     STsdbReader* pReader);
212

dengyihao's avatar
dengyihao 已提交
213 214 215 216
static int32_t initDelSkylineIterator(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader, STbData* pMemTbData,
                                      STbData* piMemTbData);
static STsdb*  getTsdbByRetentions(SVnode* pVnode, TSKEY winSKey, SRetention* retentions, const char* idstr,
                                   int8_t* pLevel);
217
static SVersionRange getQueryVerRange(SVnode* pVnode, SQueryTableDataCond* pCond, int8_t level);
H
Hongze Cheng 已提交
218 219 220
static int64_t       getCurrentKeyInLastBlock(SLastBlockReader* pLastBlockReader);
static bool          hasDataInLastBlock(SLastBlockReader* pLastBlockReader);
static int32_t       doBuildDataBlock(STsdbReader* pReader);
C
Cary Xu 已提交
221
static TSDBKEY       getCurrentKeyInBuf(STableBlockScanInfo* pScanInfo, STsdbReader* pReader);
222
static bool          hasDataInFileBlock(const SBlockData* pBlockData, const SFileBlockDumpInfo* pDumpInfo);
223
static void          initBlockDumpInfo(STsdbReader* pReader, SDataBlockIter* pBlockIter);
224
static int32_t       getInitialDelIndex(const SArray* pDelSkyline, int32_t order);
C
Cary Xu 已提交
225

H
Haojun Liao 已提交
226 227
static STableBlockScanInfo* getTableBlockScanInfo(SHashObj* pTableMap, uint64_t uid, const char* id);

228 229
static FORCE_INLINE STSchema* getLatestTableSchema(STsdbReader* pReader, uint64_t uid);

C
Cary Xu 已提交
230
/* Returns true when ts lies outside the closed interval [pWindow->skey, pWindow->ekey]. */
static bool outOfTimeWindow(int64_t ts, STimeWindow* pWindow) {
  const bool inside = (ts >= pWindow->skey) && (ts <= pWindow->ekey);
  return !inside;
}
H
Haojun Liao 已提交
231

232 233
/*
 * Fill pSupInfo with the column ids, slot ids and per-column string build buffers of the
 * queried columns.
 *
 * All three arrays live in ONE allocation whose base address is pSupInfo->colId:
 *   colId[numOfCols] | slotId[numOfCols] | padding | buildBuf[numOfCols]
 * The padding keeps buildBuf (an array of pointers) pointer-aligned even when numOfCols is odd.
 * The allocation is zero-filled, so buildBuf entries not (yet) assigned are NULL.
 *
 * @param pSupInfo     destination; smaValid is set to true and numOfCols recorded
 * @param pCols        column descriptors, numOfCols entries
 * @param pSlotIdList  slot ids, numOfCols entries
 * @param numOfCols    number of queried columns
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY. On a build-buffer allocation failure the
 *         already allocated memory stays owned by pSupInfo (buffers in buildBuf[i], base in colId).
 */
static int32_t setColumnIdSlotList(SBlockLoadSuppInfo* pSupInfo, SColumnInfo* pCols, const int32_t* pSlotIdList,
                                   int32_t numOfCols) {
  const size_t ptrSize = POINTER_BYTES;
  const size_t idBytes = sizeof(int16_t) * 2 * (size_t)numOfCols;
  // round the start of the pointer array up to a multiple of the pointer size
  const size_t bufOffset = ((idBytes + ptrSize - 1) / ptrSize) * ptrSize;

  pSupInfo->smaValid = true;
  pSupInfo->numOfCols = numOfCols;

  pSupInfo->colId = taosMemoryCalloc(1, bufOffset + ptrSize * (size_t)numOfCols);
  if (pSupInfo->colId == NULL) {
    // leave a consistent empty state behind, nothing has been allocated
    pSupInfo->slotId = NULL;
    pSupInfo->buildBuf = NULL;
    pSupInfo->numOfCols = 0;
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  pSupInfo->slotId = pSupInfo->colId + numOfCols;
  pSupInfo->buildBuf = (char**)((char*)pSupInfo->colId + bufOffset);

  for (int32_t i = 0; i < numOfCols; ++i) {
    pSupInfo->colId[i] = pCols[i].colId;
    pSupInfo->slotId[i] = pSlotIdList[i];

    // only variable-length columns need a temporary build buffer; the others stay NULL
    if (IS_VAR_DATA_TYPE(pCols[i].type)) {
      pSupInfo->buildBuf[i] = taosMemoryMalloc(pCols[i].bytes);
      if (pSupInfo->buildBuf[i] == NULL && pCols[i].bytes > 0) {
        return TSDB_CODE_OUT_OF_MEMORY;
      }
    }
  }

  return TSDB_CODE_SUCCESS;
}
H
Hongze Cheng 已提交
257

H
Haojun Liao 已提交
258
/*
 * Check whether block-SMA is enabled on every queried column.
 *
 * Walks the schema columns and the queried column ids with two cursors (the walk only works
 * when both are ordered by ascending column id -- NOTE(review): confirm with callers).
 * pSupInfo->smaValid is cleared as soon as one queried column has BSMA switched off.
 *
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_INVALID_PARA when a queried column id is smaller
 *         than the current schema column id, i.e. it cannot be matched any more.
 */
static int32_t updateBlockSMAInfo(STSchema* pSchema, SBlockLoadSuppInfo* pSupInfo) {
  int32_t i = 0, j = 0;

  while (i < pSchema->numOfCols && j < pSupInfo->numOfCols) {
    STColumn* pTCol = &pSchema->columns[i];
    if (pTCol->colId == pSupInfo->colId[j]) {
      if (!IS_BSMA_ON(pTCol)) {
        pSupInfo->smaValid = false;
        return TSDB_CODE_SUCCESS;
      }

      i += 1;
      j += 1;
    } else if (pTCol->colId < pSupInfo->colId[j]) {
      // schema column is not queried, skip it
      i += 1;
    } else {
      return TSDB_CODE_INVALID_PARA;
    }
  }

  return TSDB_CODE_SUCCESS;
}

282
/*
 * Allocate zero-filled buckets for numOfTables STableBlockScanInfo objects:
 * numOfTables / numPerBucket full buckets followed by one bucket for the remainder (if any).
 * The bucket array pBuf->pData is created on demand.
 *
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY. Buckets that were already pushed
 *         stay in pBuf->pData on failure.
 */
static int32_t initBlockScanInfoBuf(SBlockInfoBuf* pBuf, int32_t numOfTables) {
  int32_t num = numOfTables / pBuf->numPerBucket;
  int32_t remainder = numOfTables % pBuf->numPerBucket;

  if (pBuf->pData == NULL) {
    pBuf->pData = taosArrayInit(num + 1, POINTER_BYTES);
    if (pBuf->pData == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }
  }

  for (int32_t i = 0; i < num; ++i) {
    char* p = taosMemoryCalloc(pBuf->numPerBucket, sizeof(STableBlockScanInfo));
    if (p == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }

    if (taosArrayPush(pBuf->pData, &p) == NULL) {
      taosMemoryFree(p);  // not owned by the array, release it here
      return TSDB_CODE_OUT_OF_MEMORY;
    }
  }

  if (remainder > 0) {
    char* p = taosMemoryCalloc(remainder, sizeof(STableBlockScanInfo));
    if (p == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }

    if (taosArrayPush(pBuf->pData, &p) == NULL) {
      taosMemoryFree(p);
      return TSDB_CODE_OUT_OF_MEMORY;
    }
  }

  pBuf->numOfTables = numOfTables;

  return TSDB_CODE_SUCCESS;
}

/*
 * Grow the bucket buffer so that it can hold at least numOfTables entries.
 *
 * Nothing happens when the buffer is already large enough. Otherwise the last bucket (the only
 * one that may be shorter than numPerBucket) is released -- its contents are discarded -- and
 * new buckets are appended behind the remaining full buckets.
 *
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY.
 */
static int32_t ensureBlockScanInfoBuf(SBlockInfoBuf* pBuf, int32_t numOfTables) {
  if (numOfTables <= pBuf->numOfTables) {
    return TSDB_CODE_SUCCESS;
  }

  if (pBuf->numOfTables > 0) {
    STableBlockScanInfo** p = (STableBlockScanInfo**)taosArrayPop(pBuf->pData);
    taosMemoryFree(*p);

    // Number of tables still covered by the remaining (all full) buckets. This has to stay a
    // table count: storing the bucket count here under-allocates by one slot when the buffer
    // consisted of exactly one full bucket.
    pBuf->numOfTables = ((pBuf->numOfTables - 1) / pBuf->numPerBucket) * pBuf->numPerBucket;
  }

  int32_t num = (numOfTables - pBuf->numOfTables) / pBuf->numPerBucket;
  int32_t remainder = (numOfTables - pBuf->numOfTables) % pBuf->numPerBucket;
  if (pBuf->pData == NULL) {
    pBuf->pData = taosArrayInit(num + 1, POINTER_BYTES);
    if (pBuf->pData == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }
  }

  for (int32_t i = 0; i < num; ++i) {
    char* p = taosMemoryCalloc(pBuf->numPerBucket, sizeof(STableBlockScanInfo));
    if (p == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }

    if (taosArrayPush(pBuf->pData, &p) == NULL) {
      taosMemoryFree(p);
      return TSDB_CODE_OUT_OF_MEMORY;
    }
  }

  if (remainder > 0) {
    char* p = taosMemoryCalloc(remainder, sizeof(STableBlockScanInfo));
    if (p == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }

    if (taosArrayPush(pBuf->pData, &p) == NULL) {
      taosMemoryFree(p);
      return TSDB_CODE_OUT_OF_MEMORY;
    }
  }

  pBuf->numOfTables = numOfTables;

  return TSDB_CODE_SUCCESS;
}
H
Hongze Cheng 已提交
349

350 351
/*
 * Release every bucket and the bucket array itself. The buffer is left empty
 * (pData cleared, numOfTables == 0) so that it does not keep a dangling array pointer.
 */
static void clearBlockScanInfoBuf(SBlockInfoBuf* pBuf) {
  size_t num = taosArrayGetSize(pBuf->pData);
  for (size_t i = 0; i < num; ++i) {
    char** p = taosArrayGet(pBuf->pData, i);
    taosMemoryFree(*p);
  }

  pBuf->pData = taosArrayDestroy(pBuf->pData);
  pBuf->numOfTables = 0;
}

/*
 * Return the address of the index-th STableBlockScanInfo slot: bucket index / numPerBucket,
 * slot index % numPerBucket inside that bucket. No range check is done on index.
 */
static void* getPosInBlockInfoBuf(SBlockInfoBuf* pBuf, int32_t index) {
  int32_t bucketIndex = index / pBuf->numPerBucket;
  char**  pBucket = taosArrayGet(pBuf->pData, bucketIndex);
  return (*pBucket) + (index % pBuf->numPerBucket) * sizeof(STableBlockScanInfo);
}

H
Haojun Liao 已提交
366 367 368 369 370 371 372 373 374 375
/* Three-way comparator for two uint64_t uids: returns -1, 0 or 1. */
static int32_t uidComparFunc(const void* p1, const void* p2) {
  const uint64_t lhs = *(const uint64_t*)p1;
  const uint64_t rhs = *(const uint64_t*)p2;

  if (lhs < rhs) {
    return -1;
  }
  return (lhs > rhs) ? 1 : 0;
}

376
// NOTE: speedup the whole processing by preparing the buffer for STableBlockScanInfo in batch model
H
Hongze Cheng 已提交
377
static SHashObj* createDataBlockScanInfo(STsdbReader* pTsdbReader, SBlockInfoBuf* pBuf, const STableKeyInfo* idList,
X
Xiaoyu Wang 已提交
378
                                         STableUidList* pUidList, int32_t numOfTables) {
H
Haojun Liao 已提交
379
  // allocate buffer in order to load data blocks from file
380
  // todo use simple hash instead, optimize the memory consumption
381 382 383
  SHashObj* pTableMap =
      taosHashInit(numOfTables, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
  if (pTableMap == NULL) {
H
Haojun Liao 已提交
384 385 386
    return NULL;
  }

H
Haojun Liao 已提交
387
  int64_t st = taosGetTimestampUs();
H
Haojun Liao 已提交
388
  initBlockScanInfoBuf(pBuf, numOfTables);
H
Haojun Liao 已提交
389

H
Haojun Liao 已提交
390 391
  pUidList->tableUidList = taosMemoryMalloc(numOfTables * sizeof(uint64_t));
  if (pUidList->tableUidList == NULL) {
H
Haojun Liao 已提交
392 393
    return NULL;
  }
H
Haojun Liao 已提交
394
  pUidList->currentIndex = 0;
H
Haojun Liao 已提交
395

396
  for (int32_t j = 0; j < numOfTables; ++j) {
H
Haojun Liao 已提交
397
    STableBlockScanInfo* pScanInfo = getPosInBlockInfoBuf(pBuf, j);
H
Haojun Liao 已提交
398

399
    pScanInfo->uid = idList[j].uid;
H
Haojun Liao 已提交
400
    pUidList->tableUidList[j] = idList[j].uid;
H
Haojun Liao 已提交
401

402
    if (ASCENDING_TRAVERSE(pTsdbReader->order)) {
H
Haojun Liao 已提交
403
      int64_t skey = pTsdbReader->window.skey;
404
      pScanInfo->lastKey = (skey > INT64_MIN) ? (skey - 1) : skey;
wmmhello's avatar
wmmhello 已提交
405
    } else {
H
Haojun Liao 已提交
406
      int64_t ekey = pTsdbReader->window.ekey;
407
      pScanInfo->lastKey = (ekey < INT64_MAX) ? (ekey + 1) : ekey;
H
Haojun Liao 已提交
408
    }
wmmhello's avatar
wmmhello 已提交
409

410
    taosHashPut(pTableMap, &pScanInfo->uid, sizeof(uint64_t), &pScanInfo, POINTER_BYTES);
H
Hongze Cheng 已提交
411 412
    tsdbTrace("%p check table uid:%" PRId64 " from lastKey:%" PRId64 " %s", pTsdbReader, pScanInfo->uid,
              pScanInfo->lastKey, pTsdbReader->idStr);
H
Haojun Liao 已提交
413 414
  }

H
Haojun Liao 已提交
415
  taosSort(pUidList->tableUidList, numOfTables, sizeof(uint64_t), uidComparFunc);
H
Haojun Liao 已提交
416

H
Haojun Liao 已提交
417 418 419 420
  pTsdbReader->cost.createScanInfoList = (taosGetTimestampUs() - st) / 1000.0;
  tsdbDebug("%p create %d tables scan-info, size:%.2f Kb, elapsed time:%.2f ms, %s", pTsdbReader, numOfTables,
            (sizeof(STableBlockScanInfo) * numOfTables) / 1024.0, pTsdbReader->cost.createScanInfoList,
            pTsdbReader->idStr);
421

422
  return pTableMap;
H
Hongze Cheng 已提交
423
}
H
Hongze Cheng 已提交
424

425 426
/*
 * Reset the scan state of every table in the map: the memory iterators and the delete skyline
 * are destroyed, and lastKey is set to ts. Block lists and map data are left untouched.
 */
static void resetAllDataBlockScanInfo(SHashObj* pTableMap, int64_t ts) {
  STableBlockScanInfo** p = NULL;
  while ((p = taosHashIterate(pTableMap, p)) != NULL) {
    STableBlockScanInfo* pInfo = *p;

    pInfo->iterInit = false;
    pInfo->iter.hasVal = false;
    pInfo->iiter.hasVal = false;

    if (pInfo->iter.iter != NULL) {
      pInfo->iter.iter = tsdbTbDataIterDestroy(pInfo->iter.iter);
    }

    if (pInfo->iiter.iter != NULL) {
      pInfo->iiter.iter = tsdbTbDataIterDestroy(pInfo->iiter.iter);
    }

    pInfo->delSkyline = taosArrayDestroy(pInfo->delSkyline);
    pInfo->lastKey = ts;
  }
}

447 448
/*
 * Release everything owned by one scan info: both memory iterators, the delete skyline,
 * the block list and the map data. The object itself is not freed.
 */
static void clearBlockScanInfo(STableBlockScanInfo* p) {
  p->iterInit = false;

  p->iter.hasVal = false;
  p->iiter.hasVal = false;

  if (p->iter.iter != NULL) {
    p->iter.iter = tsdbTbDataIterDestroy(p->iter.iter);
  }

  if (p->iiter.iter != NULL) {
    p->iiter.iter = tsdbTbDataIterDestroy(p->iiter.iter);
  }

  p->delSkyline = taosArrayDestroy(p->delSkyline);
  p->pBlockList = taosArrayDestroy(p->pBlockList);
  tMapDataClear(&p->mapData);
}
465

H
Haojun Liao 已提交
466
/* Clear every scan info referenced by the map, then destroy the map itself. */
static void destroyAllBlockScanInfo(SHashObj* pTableMap) {
  void* p = NULL;
  while ((p = taosHashIterate(pTableMap, p)) != NULL) {
    // the map values are pointers to the scan info objects
    clearBlockScanInfo(*(STableBlockScanInfo**)p);
  }

  taosHashCleanup(pTableMap);
}

475
/* A window is empty when its start key lies after its end key. */
static bool isEmptyQueryTimeWindow(STimeWindow* pWindow) {
  return pWindow->ekey < pWindow->skey;
}
H
Hongze Cheng 已提交
476

477 478 479
// Update the query time window according to the data retention (keep) configuration, in order to
// avoid returning expired data to the client, even if it is covered by the requested window.
// The start key is raised to (now - keep2 minutes + 1 tick) when it lies before that point;
// the end key is never changed. The adjusted window is returned by value.
static STimeWindow updateQueryTimeWindow(STsdb* pTsdb, STimeWindow* pWindow) {
  STsdbKeepCfg* pCfg = &pTsdb->keepCfg;

  int64_t now = taosGetTimestamp(pCfg->precision);
  int64_t earliestTs = now - (tsTickPerMin[pCfg->precision] * pCfg->keep2) + 1;  // needs to add one tick

  STimeWindow win = *pWindow;
  if (win.skey < earliestTs) {
    win.skey = earliestTs;
  }

  return win;
}
H
Hongze Cheng 已提交
492

H
Haojun Liao 已提交
493
// init file iterator
494
/*
 * Prepare the fileset iterator for a new traversal over aDFileSet.
 *
 * The cursor is placed one step before the first fileset in traverse order (-1 for ascending,
 * numOfFiles for descending). The last-block reader is created on first use and otherwise
 * re-armed with the reader's order, window and version range; its merge tree is closed.
 *
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_OUT_OF_MEMORY, or terrno when the last-block load info
 *         cannot be created.
 */
static int32_t initFilesetIterator(SFilesetIter* pIter, SArray* aDFileSet, STsdbReader* pReader) {
  int32_t numOfFileset = (int32_t)taosArrayGetSize(aDFileSet);

  pIter->index = ASCENDING_TRAVERSE(pReader->order) ? -1 : numOfFileset;
  pIter->order = pReader->order;
  pIter->pFileList = aDFileSet;
  pIter->numOfFiles = numOfFileset;

  if (pIter->pLastBlockReader == NULL) {
    pIter->pLastBlockReader = taosMemoryCalloc(1, sizeof(struct SLastBlockReader));
    if (pIter->pLastBlockReader == NULL) {
      int32_t code = TSDB_CODE_OUT_OF_MEMORY;
      tsdbError("failed to prepare the last block iterator, since:%s %s", tstrerror(code), pReader->idStr);
      return code;
    }
  }

  SLastBlockReader* pLReader = pIter->pLastBlockReader;
  pLReader->order = pReader->order;
  pLReader->window = pReader->window;
  pLReader->verRange = pReader->verRange;

  pLReader->uid = 0;
  tMergeTreeClose(&pLReader->mergeTree);

  if (pLReader->pInfo == NULL) {
    // here we ignore the first column, which is always the primary timestamp column
    SBlockLoadSuppInfo* pInfo = &pReader->suppInfo;

    int32_t numOfStt = pReader->pTsdb->pVnode->config.sttTrigger;
    pLReader->pInfo = tCreateLastBlockLoadInfo(pReader->pSchema, &pInfo->colId[1], pInfo->numOfCols - 1, numOfStt);
    if (pLReader->pInfo == NULL) {
      // this is a failure, report it on the error level like the allocation failure above
      tsdbError("init fileset iterator failed, code:%s %s", tstrerror(terrno), pReader->idStr);
      return terrno;
    }
  }

  tsdbDebug("init fileset iterator, total files:%d %s", pIter->numOfFiles, pReader->idStr);
  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
535
/*
 * Advance to the next fileset, in traverse order, whose key range overlaps the query window.
 *
 * For every candidate the previously opened data file reader is closed and a reader for the
 * candidate is opened. Candidates that end before the window (ascending) or start after it
 * (descending) are skipped; the first candidate that lies completely beyond the window ends
 * the iteration, since all remaining filesets lie beyond it too.
 *
 * @param hasNext  set to true when a qualified fileset is opened, false otherwise
 * @return TSDB_CODE_SUCCESS (also when the iteration is exhausted), or the error code of
 *         tsdbDataFReaderOpen().
 */
static int32_t filesetIteratorNext(SFilesetIter* pIter, STsdbReader* pReader, bool* hasNext) {
  bool    asc = ASCENDING_TRAVERSE(pIter->order);
  int32_t step = asc ? 1 : -1;
  int32_t code = 0;

  *hasNext = false;

  pIter->index += step;
  if ((asc && pIter->index >= pIter->numOfFiles) || ((!asc) && pIter->index < 0)) {
    return TSDB_CODE_SUCCESS;
  }

  // account the statistics of the last-block reader, then reset it for the next fileset
  SIOCostSummary* pSum = &pReader->cost;
  getLastBlockLoadInfo(pIter->pLastBlockReader->pInfo, &pSum->lastBlockLoad, &pReader->cost.lastBlockLoadTime);

  pIter->pLastBlockReader->uid = 0;
  tMergeTreeClose(&pIter->pLastBlockReader->mergeTree);
  resetLastBlockLoadInfo(pIter->pLastBlockReader->pInfo);

  // check file the time range of coverage
  STimeWindow win = {0};

  while (1) {
    if (pReader->pFileReader != NULL) {
      tsdbDataFReaderClose(&pReader->pFileReader);
    }

    pReader->status.pCurrentFileset = (SDFileSet*)taosArrayGet(pIter->pFileList, pIter->index);

    code = tsdbDataFReaderOpen(&pReader->pFileReader, pReader->pTsdb, pReader->status.pCurrentFileset);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    pReader->cost.headFileLoad += 1;

    int32_t fid = pReader->status.pCurrentFileset->fid;
    tsdbFidKeyRange(fid, pReader->pTsdb->keepCfg.days, pReader->pTsdb->keepCfg.precision, &win.skey, &win.ekey);

    // current file are no longer overlapped with query time window, ignore remain files
    if ((asc && win.skey > pReader->window.ekey) || (!asc && win.ekey < pReader->window.skey)) {
      tsdbDebug("%p remain files are not qualified for qrange:%" PRId64 "-%" PRId64 ", ignore, %s", pReader,
                pReader->window.skey, pReader->window.ekey, pReader->idStr);
      return TSDB_CODE_SUCCESS;
    }

    // the window is not reached yet, try the next fileset
    if ((asc && (win.ekey < pReader->window.skey)) || ((!asc) && (win.skey > pReader->window.ekey))) {
      pIter->index += step;
      if ((asc && pIter->index >= pIter->numOfFiles) || ((!asc) && pIter->index < 0)) {
        return TSDB_CODE_SUCCESS;
      }
      continue;
    }

    tsdbDebug("%p file found fid:%d for qrange:%" PRId64 "-%" PRId64 ", %s", pReader, fid, pReader->window.skey,
              pReader->window.ekey, pReader->idStr);
    *hasNext = true;
    return TSDB_CODE_SUCCESS;
  }
}

601
/*
 * Rewind the block iterator: store the order, place the cursor before the first block and
 * empty the block list (the list is created on first use and reused afterwards).
 */
static void resetDataBlockIterator(SDataBlockIter* pIter, int32_t order) {
  pIter->order = order;
  pIter->index = -1;
  pIter->numOfBlocks = 0;
  if (pIter->blockList == NULL) {
    pIter->blockList = taosArrayInit(4, sizeof(SFileDataBlockInfo));
  } else {
    taosArrayClear(pIter->blockList);
  }
}

L
Liu Jicong 已提交
612
/* Destroy the block list owned by the iterator; the iterator object itself is not freed. */
static void cleanupDataBlockIterator(SDataBlockIter* pIter) {
  taosArrayDestroy(pIter->blockList);
}
H
Haojun Liao 已提交
613

H
Haojun Liao 已提交
614
/* Initial reader status: no table iterator yet, and the reader starts in the file stage. */
static void initReaderStatus(SReaderStatus* pStatus) {
  pStatus->pTableIter = NULL;
  pStatus->loadFromFile = true;
}

619 620 621 622 623 624 625 626
/*
 * Create a result block with one column per queried column of pCond and room for
 * capacity rows.
 *
 * @return the new block, or NULL with terrno set when the block cannot be created or its
 *         capacity cannot be ensured.
 */
static SSDataBlock* createResBlock(SQueryTableDataCond* pCond, int32_t capacity) {
  SSDataBlock* pResBlock = createDataBlock();
  if (pResBlock == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  for (int32_t i = 0; i < pCond->numOfCols; ++i) {
    SColumnInfoData colInfo = {0};
    colInfo.info = pCond->colList[i];
    blockDataAppendColInfo(pResBlock, &colInfo);
  }

  int32_t code = blockDataEnsureCapacity(pResBlock, capacity);
  if (code != TSDB_CODE_SUCCESS) {
    terrno = code;
    // the block already owns the appended columns: destroy it as a whole, freeing only the
    // block header would leak the column array
    blockDataDestroy(pResBlock);
    return NULL;
  }

  return pResBlock;
}

641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664 665 666 667 668 669 670 671 672 673 674 675 676 677 678 679 680 681 682 683 684 685 686 687 688 689 690 691 692 693 694 695
/* Initialize the reader mutex, tracing before and after; returns the result of taosThreadMutexInit(). */
static int32_t tsdbInitReaderLock(STsdbReader* pReader) {
  TdThreadMutex* pMutex = &pReader->readerMutex;
  int32_t        ret = -1;  // placeholder value reported by the trace line before the call

  qTrace("tsdb/read: %p, pre-init read mutex: %p, code: %d", pReader, pMutex, ret);
  ret = taosThreadMutexInit(pMutex, NULL);
  qTrace("tsdb/read: %p, post-init read mutex: %p, code: %d", pReader, pMutex, ret);

  return ret;
}

/* Destroy the reader mutex, tracing before and after; returns the result of taosThreadMutexDestroy(). */
static int32_t tsdbUninitReaderLock(STsdbReader* pReader) {
  TdThreadMutex* pMutex = &pReader->readerMutex;
  int32_t        ret = -1;  // placeholder value reported by the trace line before the call

  qTrace("tsdb/read: %p, pre-uninit read mutex: %p, code: %d", pReader, pMutex, ret);
  ret = taosThreadMutexDestroy(pMutex);
  qTrace("tsdb/read: %p, post-uninit read mutex: %p, code: %d", pReader, pMutex, ret);

  return ret;
}

/* Lock the reader mutex, tracing before and after; returns the result of taosThreadMutexLock(). */
static int32_t tsdbAcquireReader(STsdbReader* pReader) {
  TdThreadMutex* pMutex = &pReader->readerMutex;
  int32_t        ret = -1;  // placeholder value reported by the trace line before the call

  qTrace("tsdb/read: %p, pre-take read mutex: %p, code: %d", pReader, pMutex, ret);
  ret = taosThreadMutexLock(pMutex);
  qTrace("tsdb/read: %p, post-take read mutex: %p, code: %d", pReader, pMutex, ret);

  return ret;
}

/* Try to lock the reader mutex, tracing before and after; returns the result of taosThreadMutexTryLock(). */
static int32_t tsdbTryAcquireReader(STsdbReader* pReader) {
  TdThreadMutex* pMutex = &pReader->readerMutex;
  int32_t        ret = -1;  // placeholder value reported by the trace line before the call

  qTrace("tsdb/read: %p, pre-trytake read mutex: %p, code: %d", pReader, pMutex, ret);
  ret = taosThreadMutexTryLock(pMutex);
  qTrace("tsdb/read: %p, post-trytake read mutex: %p, code: %d", pReader, pMutex, ret);

  return ret;
}

/* Unlock the reader mutex, tracing before and after; returns the result of taosThreadMutexUnlock(). */
static int32_t tsdbReleaseReader(STsdbReader* pReader) {
  TdThreadMutex* pMutex = &pReader->readerMutex;
  int32_t        ret = -1;  // placeholder value reported by the trace line before the call

  qTrace("tsdb/read: %p, pre-untake read mutex: %p, code: %d", pReader, pMutex, ret);
  ret = taosThreadMutexUnlock(pMutex);
  qTrace("tsdb/read: %p, post-untake read mutex: %p, code: %d", pReader, pMutex, ret);

  return ret;
}

696 697 698 699 700 701
/*
 * Release the reader mutex for the block returned last, unless that block was a composed one.
 * NOTE(review): presumably the mutex is kept locked while a non-composed block is handed out --
 * confirm against the code that returns blocks.
 */
void tsdbReleaseDataBlock(STsdbReader* pReader) {
  if (pReader->status.composedDataBlock) {
    return;
  }

  tsdbReleaseReader(pReader);
}
702

703
static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, STsdbReader** ppReader, int32_t capacity,
H
Haojun Liao 已提交
704
                                SSDataBlock* pResBlock, const char* idstr) {
H
Haojun Liao 已提交
705
  int32_t      code = 0;
706
  int8_t       level = 0;
H
Haojun Liao 已提交
707
  STsdbReader* pReader = (STsdbReader*)taosMemoryCalloc(1, sizeof(*pReader));
H
Hongze Cheng 已提交
708 709
  if (pReader == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
H
Haojun Liao 已提交
710
    goto _end;
H
Hongze Cheng 已提交
711 712
  }

C
Cary Xu 已提交
713
  if (VND_IS_TSMA(pVnode)) {
H
Haojun Liao 已提交
714
    tsdbDebug("vgId:%d, tsma is selected to query, %s", TD_VID(pVnode), idstr);
C
Cary Xu 已提交
715 716
  }

H
Haojun Liao 已提交
717
  initReaderStatus(&pReader->status);
718

L
Liu Jicong 已提交
719
  pReader->pTsdb = getTsdbByRetentions(pVnode, pCond->twindows.skey, pVnode->config.tsdbCfg.retentions, idstr, &level);
dengyihao's avatar
dengyihao 已提交
720 721
  pReader->suid = pCond->suid;
  pReader->order = pCond->order;
722
  pReader->capacity = capacity;
H
Haojun Liao 已提交
723
  pReader->pResBlock = pResBlock;
724
  pReader->idStr = (idstr != NULL) ? taosStrdup(idstr) : NULL;
dengyihao's avatar
dengyihao 已提交
725
  pReader->verRange = getQueryVerRange(pVnode, pCond, level);
726
  pReader->type = pCond->type;
727
  pReader->window = updateQueryTimeWindow(pReader->pTsdb, &pCond->twindows);
H
Hongze Cheng 已提交
728
  pReader->blockInfoBuf.numPerBucket = 1000;  // 1000 tables per bucket
H
Hongze Cheng 已提交
729

H
Haojun Liao 已提交
730 731 732 733 734 735 736 737
  if (pReader->pResBlock == NULL) {
    pReader->freeBlock = true;
    pReader->pResBlock = createResBlock(pCond, pReader->capacity);
    if (pReader->pResBlock == NULL) {
      code = terrno;
      goto _end;
    }
  }
738

H
Haojun Liao 已提交
739 740 741 742 743
  if (pCond->numOfCols <= 0) {
    tsdbError("vgId:%d, invalid column number %d in query cond, %s", TD_VID(pVnode), pCond->numOfCols, idstr);
    code = TSDB_CODE_INVALID_PARA;
    goto _end;
  }
H
Hongze Cheng 已提交
744

745 746
  // allocate buffer in order to load data blocks from file
  SBlockLoadSuppInfo* pSup = &pReader->suppInfo;
747
  pSup->pColAgg = taosArrayInit(pCond->numOfCols, sizeof(SColumnDataAgg));
H
Haojun Liao 已提交
748
  if (pSup->pColAgg == NULL) {
749 750 751
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _end;
  }
H
Haojun Liao 已提交
752

753 754
  pSup->tsColAgg.colId = PRIMARYKEY_TIMESTAMP_COL_ID;

H
Hongze Cheng 已提交
755
  code = tBlockDataCreate(&pReader->status.fileBlockData);
H
Haojun Liao 已提交
756 757 758 759 760
  if (code != TSDB_CODE_SUCCESS) {
    terrno = code;
    goto _end;
  }

761
  setColumnIdSlotList(pSup, pCond->colList, pCond->pSlotList, pCond->numOfCols);
762

763
  tsdbInitReaderLock(pReader);
764

H
Hongze Cheng 已提交
765 766
  *ppReader = pReader;
  return code;
H
Hongze Cheng 已提交
767

H
Haojun Liao 已提交
768 769
_end:
  tsdbReaderClose(pReader);
H
Hongze Cheng 已提交
770 771 772
  *ppReader = NULL;
  return code;
}
H
Hongze Cheng 已提交
773

H
Haojun Liao 已提交
774
/**
 * Collect the block-index entries of the current file that belong to queried tables.
 *
 * The cached SBlockIdx array of pFileReader is walked in step with the reader's uid list
 * (pReader->status.uidList). Every entry whose suid equals the reader's suid and whose uid equals the
 * current uid in the list is appended to pIndexList, and the matching table's pBlockList is created if it
 * does not exist yet.
 * NOTE(review): the two-cursor walk only works if both the cached array and the uid list are sorted by uid
 * in ascending order - confirm against the code that builds them.
 *
 * The cache handle obtained from tsdbCacheGetBlockIdx() is released on every exit path.
 *
 * @param pReader     reader providing the table map, the uid list and the suid
 * @param pFileReader file reader whose block index is loaded through the block-index cache
 * @param pIndexList  output array, receives the matching SBlockIdx entries
 * @return TSDB_CODE_SUCCESS, the error from tsdbCacheGetBlockIdx(), terrno when a table scan info lookup
 *         fails, or TSDB_CODE_OUT_OF_MEMORY when an array cannot be created or extended
 */
static int32_t doLoadBlockIndex(STsdbReader* pReader, SDataFReader* pFileReader, SArray* pIndexList) {
  int64_t    st = taosGetTimestampUs();
  LRUHandle* handle = NULL;
  int32_t    code = tsdbCacheGetBlockIdx(pFileReader->pTsdb->biCache, pFileReader, &handle);
  if (code != TSDB_CODE_SUCCESS || handle == NULL) {
    goto _end;
  }

  int32_t numOfTables = taosHashGetSize(pReader->status.pTableMap);

  SArray* aBlockIdx = (SArray*)taosLRUCacheValue(pFileReader->pTsdb->biCache, handle);
  size_t  num = taosArrayGetSize(aBlockIdx);
  if (num == 0) {
    goto _end;  // nothing in this file; code is TSDB_CODE_SUCCESS here
  }

  // todo binary search to the start position
  int64_t et1 = taosGetTimestampUs();

  STableUidList* pList = &pReader->status.uidList;

  int32_t i = 0, j = 0;
  while (i < num && j < numOfTables) {
    SBlockIdx* pBlockIdx = (SBlockIdx*)taosArrayGet(aBlockIdx, i);

    // entry of another super table: skip it without touching the uid list
    if (pBlockIdx->suid != pReader->suid) {
      i += 1;
      continue;
    }

    // this block belongs to a table that is not queried
    if (pBlockIdx->uid < pList->tableUidList[j]) {
      i += 1;
      continue;
    }

    // the queried table has no entry in this file
    if (pBlockIdx->uid > pList->tableUidList[j]) {
      j += 1;
      continue;
    }

    // uid matches: this block index belongs to a queried table
    STableBlockScanInfo* pScanInfo = getTableBlockScanInfo(pReader->status.pTableMap, pBlockIdx->uid, pReader->idStr);
    if (pScanInfo == NULL) {
      code = terrno;
      goto _end;  // release the cache handle instead of leaking it
    }

    if (pScanInfo->pBlockList == NULL) {
      pScanInfo->pBlockList = taosArrayInit(4, sizeof(SBlockIndex));
      if (pScanInfo->pBlockList == NULL) {
        code = TSDB_CODE_OUT_OF_MEMORY;
        goto _end;
      }
    }

    if (taosArrayPush(pIndexList, pBlockIdx) == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto _end;
    }

    i += 1;
    j += 1;
  }

  int64_t et2 = taosGetTimestampUs();
  tsdbDebug("load block index for %d/%d tables completed, elapsed time:%.2f ms, set blockIdx:%.2f ms, size:%.2f Kb %s",
            numOfTables, (int32_t)num, (et1 - st) / 1000.0, (et2 - et1) / 1000.0, num * sizeof(SBlockIdx) / 1024.0,
            pReader->idStr);

  pReader->cost.headFileLoadTime += (et1 - st) / 1000.0;

_end:
  tsdbBICacheRelease(pFileReader->pTsdb->biCache, handle);
  return code;
}
H
Hongze Cheng 已提交
844

845
/**
 * Walk every entry of the table map and drop its per-file state: the block map data is cleared with
 * tMapDataClear() and the block index list with taosArrayClear(). Used before a new file is handled.
 */
static void cleanupTableScanInfo(SHashObj* pTableMap) {
  STableBlockScanInfo** ppInfo = NULL;

  while ((ppInfo = taosHashIterate(pTableMap, ppInfo)) != NULL) {
    STableBlockScanInfo* pInfo = *ppInfo;

    tMapDataClear(&pInfo->mapData);
    taosArrayClear(pInfo->pBlockList);
  }
}

859
/**
 * Load the data-block descriptors of the current file for every table in pIndexList and keep those that
 * can contain rows of interest.
 *
 * For each SBlockIdx the table's block map is read from the file, and every SDataBlk is checked against
 *   1. the time window still to be scanned for that table (the query window narrowed by the table's lastKey);
 *   2. the reader's version range.
 * Each surviving block is appended to the table's pBlockList as a SBlockIndex.
 *
 * @param pReader    reader owning the table map, the file reader and the cost counters
 * @param pIndexList SBlockIdx entries of the queried tables in the current file
 * @param pBlockNum  numOfBlocks is incremented per accepted block; numOfLastFiles is set from the file set
 * @return TSDB_CODE_SUCCESS, terrno when a table scan info lookup fails, the error returned by
 *         tsdbReadDataBlk(), or TSDB_CODE_OUT_OF_MEMORY when the block list cannot grow
 */
static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, SBlockNumber* pBlockNum) {
  int32_t numOfQTable = 0;
  size_t  sizeInDisk = 0;
  size_t  numOfTables = taosArrayGetSize(pIndexList);

  int64_t st = taosGetTimestampUs();
  cleanupTableScanInfo(pReader->status.pTableMap);

  bool    asc = ASCENDING_TRAVERSE(pReader->order);
  int32_t step = asc ? 1 : -1;

  for (size_t i = 0; i < numOfTables; ++i) {
    SBlockIdx*           pBlockIdx = taosArrayGet(pIndexList, i);
    STableBlockScanInfo* pScanInfo = getTableBlockScanInfo(pReader->status.pTableMap, pBlockIdx->uid, pReader->idStr);
    if (pScanInfo == NULL) {
      return terrno;
    }

    tMapDataReset(&pScanInfo->mapData);

    // a failed read must not be mistaken for "this table has no blocks in the file"
    int32_t code = tsdbReadDataBlk(pReader->pFileReader, pBlockIdx, &pScanInfo->mapData);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    taosArrayEnsureCap(pScanInfo->pBlockList, pScanInfo->mapData.nItem);

    sizeInDisk += pScanInfo->mapData.nData;

    // narrow the query window to the part that lies beyond the last key already returned for this table
    STimeWindow w = pReader->window;
    if (asc) {
      w.skey = pScanInfo->lastKey + step;
    } else {
      w.ekey = pScanInfo->lastKey + step;
    }

    if (isEmptyQueryTimeWindow(&w)) {
      continue;
    }

    SDataBlk block = {0};
    for (int32_t j = 0; j < pScanInfo->mapData.nItem; ++j) {
      tGetDataBlk(pScanInfo->mapData.pData + pScanInfo->mapData.aOffset[j], &block);

      // 1. time range check
      if (block.minKey.ts > w.ekey || block.maxKey.ts < w.skey) {
        continue;
      }

      // 2. version range check
      if (block.minVer > pReader->verRange.maxVer || block.maxVer < pReader->verRange.minVer) {
        continue;
      }

      SBlockIndex bIndex = {.ordinalIndex = j, .inFileOffset = block.aSubBlock->offset};
      bIndex.window = (STimeWindow){.skey = block.minKey.ts, .ekey = block.maxKey.ts};

      void* p1 = taosArrayPush(pScanInfo->pBlockList, &bIndex);
      if (p1 == NULL) {
        tMapDataClear(&pScanInfo->mapData);
        return TSDB_CODE_OUT_OF_MEMORY;
      }

      pBlockNum->numOfBlocks += 1;
    }

    if ((pScanInfo->pBlockList != NULL) && (taosArrayGetSize(pScanInfo->pBlockList) > 0)) {
      numOfQTable += 1;
    }
  }

  pBlockNum->numOfLastFiles = pReader->pFileReader->pSet->nSttF;
  int32_t total = pBlockNum->numOfLastFiles + pBlockNum->numOfBlocks;

  double el = (taosGetTimestampUs() - st) / 1000.0;
  // numOfTables is a size_t: print it through a fixed-width cast so the format matches on every platform
  tsdbDebug(
      "load block of %" PRId64 " tables completed, blocks:%d in %d tables, last-files:%d, block-info-size:%.2f Kb, "
      "elapsed time:%.2f ms %s",
      (int64_t)numOfTables, pBlockNum->numOfBlocks, numOfQTable, pBlockNum->numOfLastFiles, sizeInDisk / 1000.0, el,
      pReader->idStr);

  pReader->cost.numOfBlocks += total;
  pReader->cost.headFileLoadTime += el;

  return TSDB_CODE_SUCCESS;
}
H
Hongze Cheng 已提交
939

940
/**
 * Flag the current file block as fully dumped and record the key one step past maxKey
 * (maxKey + 1 for an ascending scan, maxKey - 1 otherwise) as the dump info's lastKey.
 */
static void setBlockAllDumped(SFileBlockDumpInfo* pDumpInfo, int64_t maxKey, int32_t order) {
  int32_t delta = -1;
  if (ASCENDING_TRAVERSE(order)) {
    delta = 1;
  }

  pDumpInfo->lastKey = maxKey + delta;
  pDumpInfo->allDumped = true;
}

946 947
/**
 * Store one column value into row rowIndex of pColInfoData.
 *
 * Fixed-length types are written directly from pColVal->value, with the null flag derived from
 * COL_VAL_IS_VALUE(). Variable-length types are first assembled in the per-column scratch buffer
 * pSup->buildBuf[colIndex] (length header followed by the payload) and then written from that buffer;
 * a variable-length column without a value is set to NULL.
 */
static void doCopyColVal(SColumnInfoData* pColInfoData, int32_t rowIndex, int32_t colIndex, SColVal* pColVal,
                         SBlockLoadSuppInfo* pSup) {
  if (!IS_VAR_DATA_TYPE(pColVal->type)) {
    colDataSetVal(pColInfoData, rowIndex, (const char*)&pColVal->value, !COL_VAL_IS_VALUE(pColVal));
    return;
  }

  if (!COL_VAL_IS_VALUE(pColVal)) {
    colDataSetNULL(pColInfoData, rowIndex);
    return;
  }

  varDataSetLen(pSup->buildBuf[colIndex], pColVal->value.nData);
  ASSERT(pColVal->value.nData <= pColInfoData->info.bytes);

  // pData may be null when nData is 0, so only copy a non-empty payload
  if (pColVal->value.nData > 0) {
    memcpy(varDataVal(pSup->buildBuf[colIndex]), pColVal->value.pData, pColVal->value.nData);
  }

  colDataSetVal(pColInfoData, rowIndex, pSup->buildBuf[colIndex], false);
}

965
/**
 * Return the block info at the iterator's current index, or NULL when the iterator's block list is empty.
 */
static SFileDataBlockInfo* getCurrentBlockInfo(SDataBlockIter* pBlockIter) {
  size_t listSize = taosArrayGetSize(pBlockIter->blockList);

  if (listSize != 0) {
    return (SFileDataBlockInfo*)taosArrayGet(pBlockIter->blockList, pBlockIter->index);
  }

  // an empty list must agree with the recorded block count
  ASSERT(pBlockIter->numOfBlocks == listSize);
  return NULL;
}

H
Hongze Cheng 已提交
976
/** Return the address of the data block descriptor embedded in the iterator. */
static SDataBlk* getCurrentBlock(SDataBlockIter* pBlockIter) {
  SDataBlk* pCurrent = &pBlockIter->block;
  return pCurrent;
}
977

C
Cary Xu 已提交
978 979 980 981 982 983
/**
 * Binary search for key in keyList, starting from position pos.
 *
 * NOTE(review): the descriptions below assume keyList is sorted in ascending order - confirm at call sites.
 *
 * TSDB_ORDER_ASC: searches [pos, num - 1]. Returns -1 when key is smaller than keyList[pos]; otherwise the
 * index of key if present, else the index of the closest smaller key.
 *
 * Any other order (descending): searches [0, pos]. Returns -1 when key is larger than keyList[pos];
 * otherwise the index of key if present, else the index of the closest larger key.
 *
 * @param keyList array of keys
 * @param num     number of keys in keyList, must be > 0
 * @param pos     start position, must satisfy 0 <= pos < num
 * @param key     key to locate
 * @param order   TSDB_ORDER_ASC or descending
 */
static int doBinarySearchKey(TSKEY* keyList, int num, int pos, TSKEY key, int order) {
  ASSERT(pos >= 0 && pos < num && num > 0);

  int cur = pos;  // moving start of the search range
  int bound;      // moving end of the search range

  if (order == TSDB_ORDER_ASC) {
    bound = num - 1;
    if (key < keyList[pos]) {
      return -1;
    }

    for (;;) {
      // termination checks
      if (key >= keyList[bound]) {
        return bound;
      }
      if (key <= keyList[cur]) {
        return cur;
      }
      if (bound - cur <= 1) {
        return cur;
      }

      // shrink the range around the middle element
      int mid = cur + (bound - cur + 1) / 2;
      if (keyList[mid] > key) {
        bound = mid;
      } else if (keyList[mid] < key) {
        cur = mid;
      } else {
        return mid;
      }
    }
  }

  // descending traverse
  bound = 0;
  if (key > keyList[pos]) {
    return -1;
  }

  for (;;) {
    // termination checks
    if (key <= keyList[bound]) {
      return bound;
    }
    if (key >= keyList[cur]) {
      return cur;
    }
    if (cur - bound <= 1) {
      return cur;
    }

    // shrink the range around the middle element
    int mid = cur - (cur - bound + 1) / 2;
    if (keyList[mid] < key) {
      bound = mid;
    } else if (keyList[mid] > key) {
      cur = mid;
    } else {
      return mid;
    }
  }
}

H
Haojun Liao 已提交
1026
/**
 * Find the last row position of the current block that still falls inside the query window, seen in scan
 * direction.
 *
 * When the window covers the far end of the block the answer is the block's last row (ascending) or row 0
 * (descending). Otherwise the window boundary is located with doBinarySearchKey(), whose result
 * (possibly -1) is returned unchanged.
 */
static int32_t getEndPosInDataBlock(STsdbReader* pReader, SBlockData* pBlockData, SDataBlk* pBlock, int32_t pos) {
  bool asc = ASCENDING_TRAVERSE(pReader->order);

  if (asc) {
    if (pReader->window.ekey >= pBlock->maxKey.ts) {
      return pBlock->nRow - 1;
    }
  } else {
    if (pReader->window.skey <= pBlock->minKey.ts) {
      return 0;
    }
  }

  // the window ends inside the block: search for the boundary key
  int64_t boundaryKey = asc ? pReader->window.ekey : pReader->window.skey;
  return doBinarySearchKey(pBlockData->aTSKEY, pBlock->nRow, pos, boundaryKey, pReader->order);
}

H
Haojun Liao 已提交
1043
/**
 * Copy dumpedRows timestamps from the block's aTSKEY array into pColData->pData.
 *
 * Ascending: rows [rowIndex, rowIndex + dumpedRows) are copied as they are.
 * Descending: rows (rowIndex - dumpedRows, rowIndex] are copied and then reversed in place, so the output
 * starts with the timestamp at rowIndex.
 */
static void copyPrimaryTsCol(const SBlockData* pBlockData, SFileBlockDumpInfo* pDumpInfo, SColumnInfoData* pColData,
                             int32_t dumpedRows, bool asc) {
  int32_t firstRow = asc ? pDumpInfo->rowIndex : (pDumpInfo->rowIndex - dumpedRows + 1);
  memcpy(pColData->pData, &pBlockData->aTSKEY[firstRow], dumpedRows * sizeof(int64_t));

  if (asc) {
    return;
  }

  // todo: opt perf by extract the loop
  // reverse the copied timestamps
  int64_t* pTs = (int64_t*)pColData->pData;
  for (int32_t head = 0, tail = dumpedRows - 1; head < tail; ++head, --tail) {
    int64_t saved = pTs[head];
    pTs[head] = pTs[tail];
    pTs[tail] = saved;
  }
}

H
Haojun Liao 已提交
1063 1064
// a faster version of copy procedure.
/**
 * Batch copy of dumpedRows fixed-width values from a file column (pData) into a result column (pColData).
 *
 *   1. the value bytes are copied with a single memcpy;
 *   2. for a descending scan the copied items are reversed in place (only for the column types listed in
 *      the switch below);
 *   3. when the source column is not flagged HAS_VALUE only, every row's bit value is inspected and rows
 *      whose bit value is 0 or 1 are marked null in the result bitmap.
 */
static void copyNumericCols(const SColData* pData, SFileBlockDumpInfo* pDumpInfo, SColumnInfoData* pColData,
                            int32_t dumpedRows, bool asc) {
  int32_t step = asc ? 1 : -1;

  // first source row in memory order: the current row for asc, the lowest row of the range for desc
  int32_t  firstRow = asc ? pDumpInfo->rowIndex : (pDumpInfo->rowIndex - dumpedRows + 1);
  uint8_t* pSrc = pData->pData + tDataTypes[pData->type].bytes * firstRow;

  // 1. copy data in a batch model
  memcpy(pColData->pData, pSrc, dumpedRows * tDataTypes[pData->type].bytes);

  // 2. reverse the array list in case of descending order scan data block
  if (!asc) {
    int32_t width = 0;  // 0: this type is not reversed
    switch (pColData->info.type) {
      case TSDB_DATA_TYPE_TIMESTAMP:
      case TSDB_DATA_TYPE_DOUBLE:
      case TSDB_DATA_TYPE_BIGINT:
      case TSDB_DATA_TYPE_UBIGINT:
        width = (int32_t)sizeof(int64_t);
        break;

      case TSDB_DATA_TYPE_BOOL:
      case TSDB_DATA_TYPE_TINYINT:
      case TSDB_DATA_TYPE_UTINYINT:
        width = (int32_t)sizeof(int8_t);
        break;

      case TSDB_DATA_TYPE_SMALLINT:
      case TSDB_DATA_TYPE_USMALLINT:
        width = (int32_t)sizeof(int16_t);
        break;

      case TSDB_DATA_TYPE_FLOAT:
      case TSDB_DATA_TYPE_INT:
      case TSDB_DATA_TYPE_UINT:
        width = (int32_t)sizeof(int32_t);
        break;

      default:
        break;
    }

    if (width > 0 && dumpedRows > 1) {
      uint8_t* pFront = (uint8_t*)pColData->pData;
      uint8_t* pBack = pFront + (size_t)(dumpedRows - 1) * width;
      uint8_t  saved[sizeof(int64_t)];

      // swap items pairwise, walking from both ends towards the middle
      while (pFront < pBack) {
        memcpy(saved, pFront, width);
        memcpy(pFront, pBack, width);
        memcpy(pBack, saved, width);
        pFront += width;
        pBack -= width;
      }
    }
  }

  // 3. if the null value exists, check items one-by-one
  if (pData->flag != HAS_VALUE) {
    int32_t srcRow = pDumpInfo->rowIndex;
    for (int32_t dstRow = 0; dstRow < dumpedRows; ++dstRow, srcRow += step) {
      uint8_t v = tColDataGetBitValue(pData, srcRow);
      if (v == 0 || v == 1) {
        colDataSetNull_f(pColData->nullbitmap, dstRow);
        pColData->hasNull = true;
      }
    }
  }
}

1153
/**
 * Copy rows of the currently loaded file block (pReader->status.fileBlockData) into the result block.
 *
 * Steps:
 *   - return immediately with zero rows when the loaded block data is empty;
 *   - when the dump is at the block's first row in scan direction and the query window does not cover that
 *     end of the block, locate the first row inside the window;
 *   - determine the last row inside the window and clamp the row count to the reader capacity;
 *   - copy the primary timestamp column if it is the first requested column, then merge the requested
 *     columns with the block's columns by column id, filling columns that are missing from the block with
 *     NULLs;
 *   - advance the dump position and flag the block as fully dumped when nothing inside the window remains.
 *
 * @return TSDB_CODE_SUCCESS (0), or TSDB_CODE_INVALID_PARA when the start position cannot be located
 */
static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader) {
  SReaderStatus*      pStatus = &pReader->status;
  SDataBlockIter*     pBlockIter = &pStatus->blockIter;
  SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo;
  SFileBlockDumpInfo* pDumpInfo = &pStatus->fBlockDumpInfo;
  SBlockData*         pBlockData = &pStatus->fileBlockData;
  SSDataBlock*        pResBlock = pReader->pResBlock;

  SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(pBlockIter);
  SDataBlk*           pBlock = getCurrentBlock(pBlockIter);
  int32_t             numOfOutputCols = pSupInfo->numOfCols;

  SColVal cv = {0};
  int64_t st = taosGetTimestampUs();
  bool    asc = ASCENDING_TRAVERSE(pReader->order);
  int32_t step = asc ? 1 : -1;

  // no data exists, return directly.
  if (pBlockData->nRow == 0 || pBlockData->aTSKEY == NULL) {
    tsdbWarn("%p no need to copy since no data in blockData, table uid:%" PRIu64 " has been dropped, %s", pReader,
             pBlockInfo->uid, pReader->idStr);
    pResBlock->info.rows = 0;
    return 0;
  }

  // the dump has not moved away from the first row in scan direction
  int32_t firstRowInScanOrder = asc ? 0 : (pBlock->nRow - 1);
  if (pDumpInfo->rowIndex == firstRowInScanOrder) {
    bool windowCoversStart =
        asc ? (pReader->window.skey <= pBlock->minKey.ts) : (pReader->window.ekey >= pBlock->maxKey.ts);

    if (!windowCoversStart) {
      // find the appropriate start position in the current block and make it the current rowIndex;
      // the search runs in the reverse direction, starting from the opposite end of the block
      int32_t pos = asc ? pBlock->nRow - 1 : 0;
      int32_t order = asc ? TSDB_ORDER_DESC : TSDB_ORDER_ASC;
      int64_t key = asc ? pReader->window.skey : pReader->window.ekey;
      pDumpInfo->rowIndex = doBinarySearchKey(pBlockData->aTSKEY, pBlock->nRow, pos, key, order);

      if (pDumpInfo->rowIndex < 0) {
        tsdbError(
            "%p failed to locate the start position in current block, global index:%d, table index:%d, brange:%" PRId64
            "-%" PRId64 ", minVer:%" PRId64 ", maxVer:%" PRId64 " %s",
            pReader, pBlockIter->index, pBlockInfo->tbBlockIdx, pBlock->minKey.ts, pBlock->maxKey.ts, pBlock->minVer,
            pBlock->maxVer, pReader->idStr);
        return TSDB_CODE_INVALID_PARA;
      }
    }
  }

  // time window check
  int32_t endIndex = getEndPosInDataBlock(pReader, pBlockData, pBlock, pDumpInfo->rowIndex);
  if (endIndex == -1) {
    setBlockAllDumped(pDumpInfo, pReader->window.ekey, pReader->order);
    return TSDB_CODE_SUCCESS;
  }

  // make endIndex exclusive in scan direction, then derive the number of rows to emit
  endIndex += step;
  int32_t dumpedRows = asc ? (endIndex - pDumpInfo->rowIndex) : (pDumpInfo->rowIndex - endIndex);
  if (dumpedRows > pReader->capacity) {  // output buffer check
    dumpedRows = pReader->capacity;
  }

  int32_t i = 0;  // index into the requested (output) columns

  SColumnInfoData* pColData = taosArrayGet(pResBlock->pDataBlock, pSupInfo->slotId[i]);
  if (pSupInfo->colId[i] == PRIMARYKEY_TIMESTAMP_COL_ID) {
    copyPrimaryTsCol(pBlockData, pDumpInfo, pColData, dumpedRows, asc);
    i += 1;
  }

  // merge the requested columns with the columns present in the block, both walked by column id
  int32_t colIndex = 0;
  int32_t num = pBlockData->nColData;
  while (i < numOfOutputCols && colIndex < num) {
    SColData* pData = tBlockDataGetColDataByIdx(pBlockData, colIndex);

    if (pData->cid < pSupInfo->colId[i]) {
      // block column that was not requested
      colIndex += 1;
      continue;
    }

    pColData = taosArrayGet(pResBlock->pDataBlock, pSupInfo->slotId[i]);

    if (pData->cid > pSupInfo->colId[i]) {
      // the specified column does not exist in file block, fill with null data
      colDataSetNNULL(pColData, 0, dumpedRows);
      i += 1;
      continue;
    }

    // column ids match
    if (pData->flag == HAS_NONE || pData->flag == HAS_NULL || pData->flag == (HAS_NULL | HAS_NONE)) {
      colDataSetNNULL(pColData, 0, dumpedRows);
    } else if (IS_MATHABLE_TYPE(pColData->info.type)) {
      copyNumericCols(pData, pDumpInfo, pColData, dumpedRows, asc);
    } else {  // varchar/nchar type
      int32_t srcRow = pDumpInfo->rowIndex;
      for (int32_t dstRow = 0; dstRow < dumpedRows; ++dstRow, srcRow += step) {
        tColDataGetValue(pData, srcRow, &cv);
        doCopyColVal(pColData, dstRow, i, &cv, pSupInfo);
      }
    }

    colIndex += 1;
    i += 1;
  }

  // fill the mis-matched columns with null value
  for (; i < numOfOutputCols; i += 1) {
    pColData = taosArrayGet(pResBlock->pDataBlock, pSupInfo->slotId[i]);
    colDataSetNNULL(pColData, 0, dumpedRows);
  }

  pResBlock->info.dataLoad = 1;
  pResBlock->info.rows = dumpedRows;
  pDumpInfo->rowIndex += step * dumpedRows;

  // check if current block are all handled
  bool stillInsideBlock = (pDumpInfo->rowIndex >= 0 && pDumpInfo->rowIndex < pBlock->nRow);
  if (stillInsideBlock) {
    int64_t ts = pBlockData->aTSKEY[pDumpInfo->rowIndex];
    if (outOfTimeWindow(ts, &pReader->window)) {  // the remaining rows are outside the query window
      setBlockAllDumped(pDumpInfo, ts, pReader->order);
    }
  } else {
    int64_t ts = asc ? pBlock->maxKey.ts : pBlock->minKey.ts;
    setBlockAllDumped(pDumpInfo, ts, pReader->order);
  }

  double elapsedTime = (taosGetTimestampUs() - st) / 1000.0;
  pReader->cost.blockLoadTime += elapsedTime;

  int32_t unDumpedRows = asc ? pBlock->nRow - pDumpInfo->rowIndex : pDumpInfo->rowIndex + 1;
  tsdbDebug("%p copy file block to sdatablock, global index:%d, table index:%d, brange:%" PRId64 "-%" PRId64
            ", rows:%d, remain:%d, minVer:%" PRId64 ", maxVer:%" PRId64 ", uid:%" PRIu64 " elapsed time:%.2f ms, %s",
            pReader, pBlockIter->index, pBlockInfo->tbBlockIdx, pBlock->minKey.ts, pBlock->maxKey.ts, dumpedRows,
            unDumpedRows, pBlock->minVer, pBlock->maxVer, pBlockInfo->uid, elapsedTime, pReader->idStr);

  return TSDB_CODE_SUCCESS;
}

1289 1290
/**
 * Load the data of the iterator's current file block into pBlockData.
 *
 * pBlockData is reset, re-initialised for table uid with the reader's requested columns (all entries of
 * the column id list except the first one), and then filled through tsdbReadDataBlock(). When no schema is
 * found for uid the function logs that the table has been dropped and returns 0 with pBlockData left
 * reset.
 *
 * @return 0 / TSDB_CODE_SUCCESS, or the error returned by tBlockDataInit() or tsdbReadDataBlock()
 */
static int32_t doLoadFileBlockData(STsdbReader* pReader, SDataBlockIter* pBlockIter, SBlockData* pBlockData,
                                   uint64_t uid) {
  int64_t startUs = taosGetTimestampUs();

  tBlockDataReset(pBlockData);

  STSchema* pSchema = getLatestTableSchema(pReader, uid);
  if (pSchema == NULL) {
    tsdbDebug("%p table uid:%" PRIu64 " has been dropped, no data existed, %s", pReader, uid, pReader->idStr);
    return 0;
  }

  SBlockLoadSuppInfo* pSuppInfo = &pReader->suppInfo;
  TABLEID             tableId = {.suid = pReader->suid, .uid = uid};

  // the first entry of colId is skipped when the block data is initialised
  int32_t ret = tBlockDataInit(pBlockData, &tableId, pSchema, &pSuppInfo->colId[1], pSuppInfo->numOfCols - 1);
  if (ret != TSDB_CODE_SUCCESS) {
    return ret;
  }

  SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(pBlockIter);
  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
  SDataBlk*           pBlock = getCurrentBlock(pBlockIter);

  ret = tsdbReadDataBlock(pReader->pFileReader, pBlock, pBlockData);
  if (ret != TSDB_CODE_SUCCESS) {
    tsdbError("%p error occurs in loading file block, global index:%d, table index:%d, brange:%" PRId64 "-%" PRId64
              ", rows:%d, code:%s %s",
              pReader, pBlockIter->index, pBlockInfo->tbBlockIdx, pBlock->minKey.ts, pBlock->maxKey.ts, pBlock->nRow,
              tstrerror(ret), pReader->idStr);
    return ret;
  }

  double elapsedMs = (taosGetTimestampUs() - startUs) / 1000.0;

  tsdbDebug("%p load file block into buffer, global index:%d, index in table block list:%d, brange:%" PRId64 "-%" PRId64
            ", rows:%d, minVer:%" PRId64 ", maxVer:%" PRId64 ", elapsed time:%.2f ms, %s",
            pReader, pBlockIter->index, pBlockInfo->tbBlockIdx, pBlock->minKey.ts, pBlock->maxKey.ts, pBlock->nRow,
            pBlock->minVer, pBlock->maxVer, elapsedMs, pReader->idStr);

  pReader->cost.blockLoadTime += elapsedMs;
  pDumpInfo->allDumped = false;  // a freshly loaded block has not been dumped yet

  return TSDB_CODE_SUCCESS;
}
H
Hongze Cheng 已提交
1333

H
Haojun Liao 已提交
1334 1335 1336
/**
 * Free everything owned by the block order supporter: the two per-table counters, the block wrapper
 * array of each of the numOfTables tables, and finally the array holding those per-table pointers.
 */
static void cleanupBlockOrderSupporter(SBlockOrderSupporter* pSup) {
  taosMemoryFreeClear(pSup->numOfBlocksPerTable);
  taosMemoryFreeClear(pSup->indexPerTable);

  int32_t tableIdx = 0;
  while (tableIdx < pSup->numOfTables) {
    taosMemoryFreeClear(pSup->pDataBlockInfo[tableIdx]);
    tableIdx += 1;
  }

  taosMemoryFreeClear(pSup->pDataBlockInfo);
}
H
Hongze Cheng 已提交
1345

H
Haojun Liao 已提交
1346 1347
/**
 * Allocate the zero-initialised per-table arrays of the block order supporter.
 *
 * The element count and the element size are passed separately to the allocator, so the size computation
 * is left to calloc instead of being multiplied by hand.
 *
 * @param pSup        supporter to initialise; pSup->numOfTables is not modified here
 * @param numOfTables number of slots to allocate in each array
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY after releasing whatever had been allocated
 */
static int32_t initBlockOrderSupporter(SBlockOrderSupporter* pSup, int32_t numOfTables) {
  pSup->numOfBlocksPerTable = taosMemoryCalloc(numOfTables, sizeof(int32_t));
  pSup->indexPerTable = taosMemoryCalloc(numOfTables, sizeof(int32_t));
  pSup->pDataBlockInfo = taosMemoryCalloc(numOfTables, POINTER_BYTES);

  if (pSup->numOfBlocksPerTable == NULL || pSup->indexPerTable == NULL || pSup->pDataBlockInfo == NULL) {
    cleanupBlockOrderSupporter(pSup);
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  return TSDB_CODE_SUCCESS;
}
H
Hongze Cheng 已提交
1358

H
Haojun Liao 已提交
1359
static int32_t fileDataBlockOrderCompar(const void* pLeft, const void* pRight, void* param) {
1360
  int32_t leftIndex = *(int32_t*)pLeft;
H
Haojun Liao 已提交
1361
  int32_t rightIndex = *(int32_t*)pRight;
H
Hongze Cheng 已提交
1362

H
Haojun Liao 已提交
1363
  SBlockOrderSupporter* pSupporter = (SBlockOrderSupporter*)param;
H
Hongze Cheng 已提交
1364

H
Haojun Liao 已提交
1365 1366
  int32_t leftTableBlockIndex = pSupporter->indexPerTable[leftIndex];
  int32_t rightTableBlockIndex = pSupporter->indexPerTable[rightIndex];
H
Hongze Cheng 已提交
1367

H
Haojun Liao 已提交
1368 1369 1370 1371 1372 1373 1374
  if (leftTableBlockIndex > pSupporter->numOfBlocksPerTable[leftIndex]) {
    /* left block is empty */
    return 1;
  } else if (rightTableBlockIndex > pSupporter->numOfBlocksPerTable[rightIndex]) {
    /* right block is empty */
    return -1;
  }
H
Hongze Cheng 已提交
1375

1376
  SBlockOrderWrapper* pLeftBlock = &pSupporter->pDataBlockInfo[leftIndex][leftTableBlockIndex];
H
Haojun Liao 已提交
1377
  SBlockOrderWrapper* pRightBlock = &pSupporter->pDataBlockInfo[rightIndex][rightTableBlockIndex];
H
Hongze Cheng 已提交
1378

1379 1380 1381
  return pLeftBlock->offset > pRightBlock->offset ? 1 : -1;
}

H
Haojun Liao 已提交
1382
/**
 * Decode the data block descriptor the iterator currently points at into pBlockIter->block.
 *
 * The owning table's scan info is looked up by uid, the block's SBlockIndex is fetched from the table's
 * block list, and the descriptor at its ordinal index is decoded from the table's map data. Nothing is
 * decoded when the iterator has no current block.
 *
 * @return TSDB_CODE_SUCCESS, or terrno when the table scan info cannot be found
 */
static int32_t doSetCurrentBlock(SDataBlockIter* pBlockIter, const char* idStr) {
  SFileDataBlockInfo* pCurInfo = getCurrentBlockInfo(pBlockIter);

  if (pCurInfo != NULL) {
    STableBlockScanInfo* pTableInfo = getTableBlockScanInfo(pBlockIter->pTableMap, pCurInfo->uid, idStr);
    if (pTableInfo == NULL) {
      return terrno;
    }

    SBlockIndex* pBlockIndex = taosArrayGet(pTableInfo->pBlockList, pCurInfo->tbBlockIdx);
    tMapDataGetItemByIdx(&pTableInfo->mapData, pBlockIndex->ordinalIndex, &pBlockIter->block, tGetDataBlk);
  }

#if 0
  qDebug("check file block, table uid:%"PRIu64" index:%d offset:%"PRId64", ", pScanInfo->uid, *mapDataIndex, pBlockIter->block.aSubBlock[0].offset);
#endif

  return TSDB_CODE_SUCCESS;
}
H
Hongze Cheng 已提交
1400

1401
/**
 * Build the global, file-offset ordered list of data blocks for all tables of the reader and
 * position the iterator on the first block to visit (first entry for ascending scans, last
 * entry for descending scans).
 *
 * @param pReader     reader that owns the table map and the scan order
 * @param pBlockIter  iterator to (re)initialise; its blockList is cleared and refilled
 * @param numOfBlocks number of blocks the caller expects across all tables
 * @return TSDB_CODE_SUCCESS on success, TSDB_CODE_INVALID_PARA if the per-table block lists do
 *         not add up to numOfBlocks, TSDB_CODE_OUT_OF_MEMORY on allocation failure, or the
 *         error reported by initBlockOrderSupporter()/doSetCurrentBlock().
 */
static int32_t initBlockIterator(STsdbReader* pReader, SDataBlockIter* pBlockIter, int32_t numOfBlocks) {
  bool asc = ASCENDING_TRAVERSE(pReader->order);

  SBlockOrderSupporter sup = {0};
  pBlockIter->numOfBlocks = numOfBlocks;
  taosArrayClear(pBlockIter->blockList);
  pBlockIter->pTableMap = pReader->status.pTableMap;

  // access data blocks according to the offset of each block in asc/desc order.
  int32_t numOfTables = (int32_t)taosHashGetSize(pReader->status.pTableMap);

  int64_t st = taosGetTimestampUs();
  int32_t code = initBlockOrderSupporter(&sup, numOfTables);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  int32_t cnt = 0;
  void*   ptr = NULL;
  while (1) {
    ptr = taosHashIterate(pReader->status.pTableMap, ptr);
    if (ptr == NULL) {
      break;
    }

    // tables without any block in the current file do not take part in the ordering
    STableBlockScanInfo* pTableScanInfo = *(STableBlockScanInfo**)ptr;
    if (pTableScanInfo->pBlockList == NULL || taosArrayGetSize(pTableScanInfo->pBlockList) == 0) {
      continue;
    }

    size_t num = taosArrayGetSize(pTableScanInfo->pBlockList);
    sup.numOfBlocksPerTable[sup.numOfTables] = num;

    char* buf = taosMemoryMalloc(sizeof(SBlockOrderWrapper) * num);
    if (buf == NULL) {
      // NOTE(review): leaving a taosHashIterate loop early may require taosHashCancelIterate - confirm
      cleanupBlockOrderSupporter(&sup);
      return TSDB_CODE_OUT_OF_MEMORY;
    }

    sup.pDataBlockInfo[sup.numOfTables] = (SBlockOrderWrapper*)buf;

    for (int32_t k = 0; k < num; ++k) {
      SBlockIndex* pIndex = taosArrayGet(pTableScanInfo->pBlockList, k);
      sup.pDataBlockInfo[sup.numOfTables][k] =
          (SBlockOrderWrapper){.uid = pTableScanInfo->uid, .offset = pIndex->inFileOffset};
      cnt++;
    }

    sup.numOfTables += 1;
  }

  // The loops below index the collected wrappers with numOfBlocks, so any mismatch with the
  // number of collected blocks must be rejected, no matter how many tables contributed.
  if (numOfBlocks != cnt) {
    cleanupBlockOrderSupporter(&sup);
    return TSDB_CODE_INVALID_PARA;
  }

  // since there is only one table qualified, blocks are not sorted
  if (sup.numOfTables == 1) {
    for (int32_t i = 0; i < numOfBlocks; ++i) {
      SFileDataBlockInfo blockInfo = {.uid = sup.pDataBlockInfo[0][i].uid, .tbBlockIdx = i};
      if (taosArrayPush(pBlockIter->blockList, &blockInfo) == NULL) {
        cleanupBlockOrderSupporter(&sup);
        return TSDB_CODE_OUT_OF_MEMORY;
      }
    }

    int64_t et = taosGetTimestampUs();
    tsdbDebug("%p create blocks info struct completed for one table, %d blocks not sorted, elapsed time:%.2f ms %s",
              pReader, numOfBlocks, (et - st) / 1000.0, pReader->idStr);

    pBlockIter->index = asc ? 0 : (numOfBlocks - 1);
    cleanupBlockOrderSupporter(&sup);
    return doSetCurrentBlock(pBlockIter, pReader->idStr);
  }

  tsdbDebug("%p create data blocks info struct completed, %d blocks in %d tables %s", pReader, cnt, sup.numOfTables,
            pReader->idStr);

  SMultiwayMergeTreeInfo* pTree = NULL;

  // keep the full-width status code; narrowing it could hide a failure
  int32_t ret = tMergeTreeCreate(&pTree, sup.numOfTables, &sup, fileDataBlockOrderCompar);
  if (ret != TSDB_CODE_SUCCESS) {
    cleanupBlockOrderSupporter(&sup);
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  int32_t numOfTotal = 0;
  while (numOfTotal < cnt) {
    int32_t pos = tMergeTreeGetChosenIndex(pTree);
    int32_t index = sup.indexPerTable[pos]++;

    SFileDataBlockInfo blockInfo = {.uid = sup.pDataBlockInfo[pos][index].uid, .tbBlockIdx = index};
    if (taosArrayPush(pBlockIter->blockList, &blockInfo) == NULL) {
      cleanupBlockOrderSupporter(&sup);
      taosMemoryFree(pTree);
      return TSDB_CODE_OUT_OF_MEMORY;
    }

    // set data block index overflow, in order to disable the offset comparator
    if (sup.indexPerTable[pos] >= sup.numOfBlocksPerTable[pos]) {
      sup.indexPerTable[pos] = sup.numOfBlocksPerTable[pos] + 1;
    }

    numOfTotal += 1;
    tMergeTreeAdjust(pTree, tMergeTreeGetAdjustIndex(pTree));
  }

  int64_t et = taosGetTimestampUs();
  tsdbDebug("%p %d data blocks access order completed, elapsed time:%.2f ms %s", pReader, numOfBlocks,
            (et - st) / 1000.0, pReader->idStr);
  cleanupBlockOrderSupporter(&sup);
  taosMemoryFree(pTree);

  pBlockIter->index = asc ? 0 : (numOfBlocks - 1);
  return doSetCurrentBlock(pBlockIter, pReader->idStr);
}
H
Hongze Cheng 已提交
1513

H
Haojun Liao 已提交
1514
/**
 * Advance the iterator by one block in scan direction and refresh the current block.
 *
 * @return false when the iterator already stands on the last block of its direction
 *         (the position is left untouched), true otherwise.
 */
static bool blockIteratorNext(SDataBlockIter* pBlockIter, const char* idStr) {
  if (ASCENDING_TRAVERSE(pBlockIter->order)) {
    // ascending: stop at the tail of the list
    if (pBlockIter->index >= pBlockIter->numOfBlocks - 1) {
      return false;
    }
    pBlockIter->index += 1;
  } else {
    // descending: stop at the head of the list
    if (pBlockIter->index <= 0) {
      return false;
    }
    pBlockIter->index += -1;
  }

  doSetCurrentBlock(pBlockIter, idStr);
  return true;
}

1528 1529 1530
/**
 * This is an two rectangles overlap cases.
 */
H
Hongze Cheng 已提交
1531
static int32_t dataBlockPartiallyRequired(STimeWindow* pWindow, SVersionRange* pVerRange, SDataBlk* pBlock) {
1532 1533
  return (pWindow->ekey < pBlock->maxKey.ts && pWindow->ekey >= pBlock->minKey.ts) ||
         (pWindow->skey > pBlock->minKey.ts && pWindow->skey <= pBlock->maxKey.ts) ||
H
Hongze Cheng 已提交
1534 1535
         (pVerRange->minVer > pBlock->minVer && pVerRange->minVer <= pBlock->maxVer) ||
         (pVerRange->maxVer < pBlock->maxVer && pVerRange->maxVer >= pBlock->minVer);
H
Haojun Liao 已提交
1536
}
H
Hongze Cheng 已提交
1537

1538
/**
 * Look up the block that follows pBlockInfo in scan direction within the same table.
 *
 * @param pBlockInfo          current block; tbBlockIdx is its position in the table's block list
 * @param pTableBlockScanInfo scan info of the table owning the block
 * @param nextIndex           [out] position of the neighbor in the table's block list
 * @param order               scan order
 * @param pBlockIndex         [out] copy of the neighbor's block index entry
 * @return true if a neighbor exists and both outputs were filled, false otherwise
 *         (outputs are left untouched).
 */
static bool getNeighborBlockOfSameTable(SFileDataBlockInfo* pBlockInfo, STableBlockScanInfo* pTableBlockScanInfo,
                                        int32_t* nextIndex, int32_t order, SBlockIndex* pBlockIndex) {
  // Use signed arithmetic so an empty list (size 0) cannot wrap around in "size - 1".
  int32_t numOfBlocks = (int32_t)taosArrayGetSize(pTableBlockScanInfo->pBlockList);
  int32_t step = ASCENDING_TRAVERSE(order) ? 1 : -1;

  if (pBlockInfo->tbBlockIdx < 0) {
    return false;
  }

  int32_t neighbor = pBlockInfo->tbBlockIdx + step;
  if (neighbor < 0 || neighbor >= numOfBlocks) {
    // current block is the last one in scan direction, or the list is empty
    return false;
  }

  SBlockIndex* pNeighbor = taosArrayGet(pTableBlockScanInfo->pBlockList, neighbor);
  if (pNeighbor == NULL) {
    return false;
  }

  *nextIndex = neighbor;
  *pBlockIndex = *pNeighbor;
  return true;
}

/**
 * Search the global block list, starting at the iterator's current position and walking in
 * scan direction, for the entry with the same uid and tbBlockIdx as pFBlockInfo.
 *
 * @return position of the entry in the global block list, or -1 if it is not found.
 */
static int32_t findFileBlockInfoIndex(SDataBlockIter* pBlockIter, SFileDataBlockInfo* pFBlockInfo) {
  const int32_t delta = ASCENDING_TRAVERSE(pBlockIter->order) ? 1 : -1;

  for (int32_t pos = pBlockIter->index; pos >= 0 && pos < pBlockIter->numOfBlocks; pos += delta) {
    SFileDataBlockInfo* pCandidate = taosArrayGet(pBlockIter->blockList, pos);

    bool sameTable = (pCandidate->uid == pFBlockInfo->uid);
    if (sameTable && pCandidate->tbBlockIdx == pFBlockInfo->tbBlockIdx) {
      return pos;
    }
  }

  return -1;
}

1572
/**
 * Move the iterator by `step` and make the block stored at `index` the current one. If the
 * block is not already at the new position it is removed from `index` and re-inserted there.
 *
 * @param pBlockIter block iterator to modify
 * @param index      position in the global block list of the block to activate
 * @param step       signed amount added to the iterator position
 * @return -1 if index is out of range, otherwise the result of doSetCurrentBlock().
 */
static int32_t setFileBlockActiveInBlockIter(SDataBlockIter* pBlockIter, int32_t index, int32_t step) {
  if (index < 0 || index >= pBlockIter->numOfBlocks) {
    return -1;
  }

  // copy the entry first: removing it below invalidates pointers into the list
  SFileDataBlockInfo fblock = *(SFileDataBlockInfo*)taosArrayGet(pBlockIter->blockList, index);
  pBlockIter->index += step;

  if (index != pBlockIter->index) {
    taosArrayRemove(pBlockIter->blockList, index);
    taosArrayInsert(pBlockIter->blockList, pBlockIter->index, &fblock);

    SFileDataBlockInfo* pBlockInfo = taosArrayGet(pBlockIter->blockList, pBlockIter->index);
    ASSERT(pBlockInfo->uid == fblock.uid && pBlockInfo->tbBlockIdx == fblock.tbBlockIdx);
  }

  // report a failed block lookup instead of silently claiming success
  return doSetCurrentBlock(pBlockIter, "");
}

H
Haojun Liao 已提交
1592
// todo: this attribute could be acquired during extractin the global ordered block list.
1593
static bool overlapWithNeighborBlock(SDataBlk* pBlock, SBlockIndex* pNeighborBlockIndex, int32_t order) {
1594 1595
  // it is the last block in current file, no chance to overlap with neighbor blocks.
  if (ASCENDING_TRAVERSE(order)) {
1596
    return pBlock->maxKey.ts == pNeighborBlockIndex->window.skey;
1597
  } else {
1598
    return pBlock->minKey.ts == pNeighborBlockIndex->window.ekey;
1599
  }
H
Haojun Liao 已提交
1600
}
H
Hongze Cheng 已提交
1601

H
Hongze Cheng 已提交
1602
/**
 * Tell whether the given buffer key lies at or before the block in scan direction:
 * key.ts <= minKey.ts for ascending scans, key.ts >= maxKey.ts for descending scans.
 * A key whose ts is TSKEY_INITIAL_VAL never qualifies.
 */
static bool bufferDataInFileBlockGap(int32_t order, TSDBKEY key, SDataBlk* pBlock) {
  if (key.ts == TSKEY_INITIAL_VAL) {
    return false;
  }

  if (ASCENDING_TRAVERSE(order)) {
    return key.ts <= pBlock->minKey.ts;
  }

  return key.ts >= pBlock->maxKey.ts;
}
H
Hongze Cheng 已提交
1608

H
Hongze Cheng 已提交
1609
/**
 * Tell whether key.ts falls inside the block's [minKey.ts, maxKey.ts] range while the block's
 * version range intersects the queried version range.
 */
static bool keyOverlapFileBlock(TSDBKEY key, SDataBlk* pBlock, SVersionRange* pVerRange) {
  bool tsInside = (key.ts >= pBlock->minKey.ts) && (key.ts <= pBlock->maxKey.ts);
  if (!tsInside) {
    return false;
  }

  if (pBlock->maxVer < pVerRange->minVer) {
    return false;
  }

  return pBlock->minVer <= pVerRange->maxVer;
}

H
Hongze Cheng 已提交
1614 1615
/**
 * Walk the delete skyline from startIndex and report whether any skyline point affects the
 * block: either a point inside the block's ts range, or a point before the block whose
 * successor reaches into the block, in both cases with version >= pBlock->minVer.
 * The walk stops at the first point located after the block.
 */
static bool doCheckforDatablockOverlap(STableBlockScanInfo* pBlockScanInfo, const SDataBlk* pBlock,
                                       int32_t startIndex) {
  size_t num = taosArrayGetSize(pBlockScanInfo->delSkyline);

  for (int32_t i = startIndex; i < num; i += 1) {
    TSDBKEY* pCur = taosArrayGet(pBlockScanInfo->delSkyline, i);
    bool     versionHit = (pCur->version >= pBlock->minVer);

    // skyline point inside the block
    if (pCur->ts >= pBlock->minKey.ts && pCur->ts <= pBlock->maxKey.ts) {
      if (versionHit) {
        return true;
      }
      continue;
    }

    // skyline point after the block: no later point can overlap
    if (pCur->ts >= pBlock->minKey.ts) {
      return false;
    }

    // skyline point before the block
    if (!versionHit) {
      continue;
    }

    if (i < num - 1) {
      TSDBKEY* pFollowing = taosArrayGet(pBlockScanInfo->delSkyline, i + 1);
      if (pFollowing->ts >= pBlock->minKey.ts) {
        return true;
      }
    } else {  // it must be the last point
      ASSERT(pCur->version == 0);
    }
  }

  return false;
}

H
Hongze Cheng 已提交
1643
/**
 * Tell whether the table's delete skyline affects the given file block.
 *
 * @param pBlockScanInfo scan info holding the delete skyline and the current fileDelIndex
 * @param pBlock         file block to test
 * @param order          scan order; decides where the skyline walk starts
 * @return false when there is no skyline (NULL or empty) or it cannot touch the block,
 *         otherwise the result of doCheckforDatablockOverlap().
 */
static bool overlapWithDelSkyline(STableBlockScanInfo* pBlockScanInfo, const SDataBlk* pBlock, int32_t order) {
  // an empty skyline has no first/last point to inspect
  if (pBlockScanInfo->delSkyline == NULL || taosArrayGetSize(pBlockScanInfo->delSkyline) == 0) {
    return false;
  }

  // ts is not overlap
  TSDBKEY* pFirst = taosArrayGet(pBlockScanInfo->delSkyline, 0);
  TSDBKEY* pLast = taosArrayGetLast(pBlockScanInfo->delSkyline);
  if (pBlock->minKey.ts > pLast->ts || pBlock->maxKey.ts < pFirst->ts) {
    return false;
  }

  // version is not overlap
  if (ASCENDING_TRAVERSE(order)) {
    return doCheckforDatablockOverlap(pBlockScanInfo, pBlock, pBlockScanInfo->fileDelIndex);
  } else {
    int32_t index = pBlockScanInfo->fileDelIndex;
    while (1) {
      TSDBKEY* p = taosArrayGet(pBlockScanInfo->delSkyline, index);
      if (p->ts > pBlock->minKey.ts && index > 0) {
        index -= 1;
      } else {  // find the first point that is smaller than the minKey.ts of dataBlock.
        break;
      }
    }

    return doCheckforDatablockOverlap(pBlockScanInfo, pBlock, index);
  }
}

C
Cary Xu 已提交
1673 1674 1675 1676 1677 1678 1679 1680 1681 1682 1683 1684 1685
// Per-block flags filled by getBlockToLoadInfo(); callers combine them to decide whether a
// file block has to be loaded or can be treated as clean.
typedef struct {
  bool overlapWithNeighborBlock;  // result of overlapWithNeighborBlock() when a neighbor block exists
  bool hasDupTs;                  // pBlock->hasDup for a single sub-block, otherwise always true
  bool overlapWithDelInfo;        // result of overlapWithDelSkyline()
  bool overlapWithLastBlock;      // current key of the last block lies within [minKey.ts, maxKey.ts]
  bool overlapWithKeyInBuf;       // result of keyOverlapFileBlock() for the buffer key
  bool partiallyRequired;         // result of dataBlockPartiallyRequired()
  bool moreThanCapcity;           // pBlock->nRow exceeds pReader->capacity
} SDataBlockToLoadInfo;

/**
 * Evaluate every criterion that can force a file block to be loaded and store the outcome in
 * pInfo. overlapWithNeighborBlock and overlapWithLastBlock are only written when a neighbor
 * block / last-block data exists, so pInfo should be zero-initialised by the caller.
 */
static void getBlockToLoadInfo(SDataBlockToLoadInfo* pInfo, SFileDataBlockInfo* pBlockInfo, SDataBlk* pBlock,
                               STableBlockScanInfo* pScanInfo, TSDBKEY keyInBuf, SLastBlockReader* pLastBlockReader,
                               STsdbReader* pReader) {
  // overlap with neighbor
  int32_t     nextIdx = 0;
  SBlockIndex neighbor = {0};
  if (getNeighborBlockOfSameTable(pBlockInfo, pScanInfo, &nextIdx, pReader->order, &neighbor)) {
    pInfo->overlapWithNeighborBlock = overlapWithNeighborBlock(pBlock, &neighbor, pReader->order);
  }

  // has duplicated ts of different version in this block
  if (pBlock->nSubBlock == 1) {
    pInfo->hasDupTs = pBlock->hasDup;
  } else {
    pInfo->hasDupTs = true;
  }

  pInfo->overlapWithDelInfo = overlapWithDelSkyline(pScanInfo, pBlock, pReader->order);

  // current key of the last block falls into the block's ts range
  if (hasDataInLastBlock(pLastBlockReader)) {
    int64_t lastBlockKey = getCurrentKeyInLastBlock(pLastBlockReader);
    pInfo->overlapWithLastBlock = (pBlock->maxKey.ts >= lastBlockKey) && (pBlock->minKey.ts <= lastBlockKey);
  }

  pInfo->moreThanCapcity = (pBlock->nRow > pReader->capacity);
  pInfo->partiallyRequired = dataBlockPartiallyRequired(&pReader->window, &pReader->verRange, pBlock);
  pInfo->overlapWithKeyInBuf = keyOverlapFileBlock(keyInBuf, pBlock, &pReader->verRange);
}

// A file block can be skipped (not loaded) only if all of the following hold:
// 1. the version of all rows is within the queried version range
// 2. the block does not overlap with its next neighbor block
// 3. the block does not overlap with keys in the buffer or in the last block
// 4. the output buffer is large enough to hold all rows of the block
// 5. delete info does not overlap with the block data
// 6. the block does not contain duplicated timestamps
static bool fileBlockShouldLoad(STsdbReader* pReader, SFileDataBlockInfo* pBlockInfo, SDataBlk* pBlock,
                                STableBlockScanInfo* pScanInfo, TSDBKEY keyInBuf, SLastBlockReader* pLastBlockReader) {
  SDataBlockToLoadInfo info = {0};
  getBlockToLoadInfo(&info, pBlockInfo, pBlock, pScanInfo, keyInBuf, pLastBlockReader, pReader);

  bool overlapped = info.overlapWithNeighborBlock || info.overlapWithKeyInBuf || info.overlapWithDelInfo ||
                    info.overlapWithLastBlock;
  bool needLoad = overlapped || info.hasDupTs || info.partiallyRequired || info.moreThanCapcity;
  if (!needLoad) {
    return false;
  }

  // log the reason why load the datablock for profile
  tsdbDebug("%p uid:%" PRIu64
            " need to load the datablock, overlapneighbor:%d, hasDup:%d, partiallyRequired:%d, "
            "overlapWithKey:%d, greaterThanBuf:%d, overlapWithDel:%d, overlapWithlastBlock:%d, %s",
            pReader, pBlockInfo->uid, info.overlapWithNeighborBlock, info.hasDupTs, info.partiallyRequired,
            info.overlapWithKeyInBuf, info.moreThanCapcity, info.overlapWithDelInfo, info.overlapWithLastBlock,
            pReader->idStr);

  return true;
}

C
Cary Xu 已提交
1738 1739 1740 1741 1742 1743 1744 1745 1746
/**
 * A file block is "clean" when it overlaps with nothing else (neighbor block, buffer key,
 * delete info, last block) and carries no duplicated timestamps.
 */
static bool isCleanFileDataBlock(STsdbReader* pReader, SFileDataBlockInfo* pBlockInfo, SDataBlk* pBlock,
                                 STableBlockScanInfo* pScanInfo, TSDBKEY keyInBuf, SLastBlockReader* pLastBlockReader) {
  SDataBlockToLoadInfo info = {0};
  getBlockToLoadInfo(&info, pBlockInfo, pBlock, pScanInfo, keyInBuf, pLastBlockReader, pReader);

  if (info.overlapWithNeighborBlock || info.hasDupTs || info.overlapWithKeyInBuf) {
    return false;
  }

  return !info.overlapWithDelInfo && !info.overlapWithLastBlock;
}

1747
/**
 * Fill the reader's result block from the table's mem/imem iterators up to endKey, then
 * update the block's ts window, uid and the composed-block flag, and account the elapsed time.
 *
 * @return TSDB_CODE_SUCCESS right away when neither iterator has data, otherwise the result
 *         of buildDataBlockFromBufImpl().
 */
static int32_t buildDataBlockFromBuf(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo, int64_t endKey) {
  bool hasBufData = pBlockScanInfo->iiter.hasVal || pBlockScanInfo->iter.hasVal;
  if (!hasBufData) {
    return TSDB_CODE_SUCCESS;
  }

  SSDataBlock* pResBlock = pReader->pResBlock;

  int64_t startUs = taosGetTimestampUs();
  int32_t code = buildDataBlockFromBufImpl(pBlockScanInfo, endKey, pReader->capacity, pReader);

  blockDataUpdateTsWindow(pResBlock, pReader->suppInfo.slotId[0]);
  pResBlock->info.id.uid = pBlockScanInfo->uid;

  setComposedBlockFlag(pReader, true);

  double elapsedTime = (taosGetTimestampUs() - startUs) / 1000.0;
  tsdbDebug("%p build data block from cache completed, elapsed time:%.2f ms, numOfRows:%d, brange:%" PRId64
            " - %" PRId64 ", uid:%" PRIu64 ",  %s",
            pReader, elapsedTime, pResBlock->info.rows, pResBlock->info.window.skey, pResBlock->info.window.ekey,
            pBlockScanInfo->uid, pReader->idStr);

  pReader->cost.buildmemBlock += elapsedTime;
  return code;
}

1772 1773
/**
 * Fast path: copy the current file-block row straight into the result block when
 *   1. it is not the border row of the block in scan direction, and
 *   2. the directly following row carries a different timestamp (no merge needed).
 * On success the dump position is advanced by one row.
 *
 * @return true if the row was copied, false if the caller has to merge.
 */
static bool tryCopyDistinctRowFromFileBlock(STsdbReader* pReader, SBlockData* pBlockData, int64_t key,
                                            SFileBlockDumpInfo* pDumpInfo) {
  bool    forward = (pReader->order == TSDB_ORDER_ASC);
  int32_t delta = forward ? 1 : -1;

  bool hasFollower = forward ? (pDumpInfo->rowIndex < pDumpInfo->totalRows - 1) : (pDumpInfo->rowIndex > 0);
  if (!hasFollower) {
    return false;
  }

  int64_t followerKey = pBlockData->aTSKEY[pDumpInfo->rowIndex + delta];
  if (followerKey == key) {
    // same timestamp follows, merge is required
    return false;
  }

  doAppendRowFromFileBlock(pReader->pResBlock, pReader, pBlockData, pDumpInfo->rowIndex);
  pDumpInfo->rowIndex += delta;
  return true;
}

1792
static bool nextRowFromLastBlocks(SLastBlockReader* pLastBlockReader, STableBlockScanInfo* pScanInfo,
1793
                                  SVersionRange* pVerRange) {
X
Xiaoyu Wang 已提交
1794
  int32_t step = ASCENDING_TRAVERSE(pLastBlockReader->order) ? 1 : -1;
H
Haojun Liao 已提交
1795

1796 1797 1798 1799 1800 1801 1802 1803
  while (1) {
    bool hasVal = tMergeTreeNext(&pLastBlockReader->mergeTree);
    if (!hasVal) {
      return false;
    }

    TSDBROW row = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
    TSDBKEY k = TSDBROW_KEY(&row);
1804 1805 1806
    if (hasBeenDropped(pScanInfo->delSkyline, &pScanInfo->lastBlockDelIndex, &k, pLastBlockReader->order, pVerRange)) {
      pScanInfo->lastKey = k.ts;
    } else {
H
Haojun Liao 已提交
1807 1808 1809 1810 1811 1812
      // the qualifed ts may equal to k.ts, only a greater version one.
      // here we need to fallback one step.
      if (pScanInfo->lastKey == k.ts) {
        pScanInfo->lastKey -= step;
      }

1813 1814 1815 1816 1817 1818 1819
      return true;
    }
  }
}

/**
 * Fast path for last-block rows: step to the next qualified row and, unless that row has the
 * same timestamp `ts`, copy the row referenced by fRow directly into the result block.
 *
 * @return true if the row was copied, false if rows with the same timestamp must be merged.
 */
static bool tryCopyDistinctRowFromSttBlock(TSDBROW* fRow, SLastBlockReader* pLastBlockReader,
                                           STableBlockScanInfo* pScanInfo, int64_t ts, STsdbReader* pReader) {
  bool hasFollower = nextRowFromLastBlocks(pLastBlockReader, pScanInfo, &pReader->verRange);

  // a following row with the identical timestamp means a merge is needed
  if (hasFollower && getCurrentKeyInLastBlock(pLastBlockReader) == ts) {
    return false;
  }

  doAppendRowFromFileBlock(pReader->pResBlock, pReader, fRow->pBlockData, fRow->iRow);
  return true;
}

1835 1836 1837 1838 1839 1840
/**
 * Return the schema cached in pReader->pSchema, fetching it via metaGetTbTSchema() on first
 * use. Returns NULL (after logging an error) when the schema cannot be obtained.
 */
static FORCE_INLINE STSchema* getLatestTableSchema(STsdbReader* pReader, uint64_t uid) {
  if (pReader->pSchema == NULL) {
    pReader->pSchema = metaGetTbTSchema(pReader->pTsdb->pVnode->pMeta, uid, -1, 1);

    if (pReader->pSchema == NULL) {
      tsdbError("failed to get table schema, uid:%" PRIu64 ", it may have been dropped, ver:-1, %s", uid,
                pReader->idStr);
    }
  }

  return pReader->pSchema;
}

H
Haojun Liao 已提交
1848 1849 1850
/**
 * Resolve the schema matching `sversion` for a buffered row.
 *
 * pReader->pSchema is fetched on demand and returned when its version matches. Otherwise
 * pReader->pMemSchema is used as a one-entry cache: it is returned if its version matches,
 * else it is released and re-fetched for `sversion`.
 *
 * @return the matching schema, or NULL with terrno set when the lookup fails.
 */
static FORCE_INLINE STSchema* doGetSchemaForTSRow(int32_t sversion, STsdbReader* pReader, uint64_t uid) {
  // always set the newest schema version in pReader->pSchema
  if (pReader->pSchema == NULL) {
    pReader->pSchema = metaGetTbTSchema(pReader->pTsdb->pVnode->pMeta, uid, -1, 1);
  }

  if (pReader->pSchema != NULL && pReader->pSchema->version == sversion) {
    return pReader->pSchema;
  }

  // remember whether a cached entry existed: only the replacement path validates the new pointer
  bool hadCached = (pReader->pMemSchema != NULL);
  if (hadCached) {
    if (pReader->pMemSchema->version == sversion) {
      return pReader->pMemSchema;
    }

    taosMemoryFreeClear(pReader->pMemSchema);
  }

  int32_t code = metaGetTbTSchemaEx(pReader->pTsdb->pVnode->pMeta, pReader->suid, uid, sversion, &pReader->pMemSchema);

  bool failed = (code != TSDB_CODE_SUCCESS) || (hadCached && pReader->pMemSchema == NULL);
  if (failed) {
    terrno = code;
    return NULL;
  }

  return pReader->pMemSchema;
}

1883
static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo, TSDBROW* pRow,
1884 1885
                                     SIterInfo* pIter, int64_t key, SLastBlockReader* pLastBlockReader) {
  SRowMerger          merge = {0};
H
Hongze Cheng 已提交
1886
  SRow*               pTSRow = NULL;
1887 1888 1889
  SBlockData*         pBlockData = &pReader->status.fileBlockData;
  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;

1890
  int64_t tsLast = INT64_MIN;
1891
  if (hasDataInLastBlock(pLastBlockReader)) {
1892 1893
    tsLast = getCurrentKeyInLastBlock(pLastBlockReader);
  }
1894

H
Hongze Cheng 已提交
1895 1896
  TSDBKEY k = TSDBROW_KEY(pRow);
  TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
1897

1898 1899
  int64_t minKey = 0;
  if (pReader->order == TSDB_ORDER_ASC) {
H
Hongze Cheng 已提交
1900
    minKey = INT64_MAX;  // chosen the minimum value
1901
    if (minKey > tsLast && hasDataInLastBlock(pLastBlockReader)) {
1902 1903
      minKey = tsLast;
    }
1904

1905 1906 1907
    if (minKey > k.ts) {
      minKey = k.ts;
    }
1908

1909
    if (minKey > key && hasDataInFileBlock(pBlockData, pDumpInfo)) {
1910 1911 1912 1913
      minKey = key;
    }
  } else {
    minKey = INT64_MIN;
1914
    if (minKey < tsLast && hasDataInLastBlock(pLastBlockReader)) {
1915 1916 1917 1918 1919 1920 1921
      minKey = tsLast;
    }

    if (minKey < k.ts) {
      minKey = k.ts;
    }

1922
    if (minKey < key && hasDataInFileBlock(pBlockData, pDumpInfo)) {
1923 1924
      minKey = key;
    }
1925 1926 1927 1928
  }

  bool init = false;

1929
  // ASC: file block ---> last block -----> imem -----> mem
H
Hongze Cheng 已提交
1930
  // DESC: mem -----> imem -----> last block -----> file block
1931 1932
  if (pReader->order == TSDB_ORDER_ASC) {
    if (minKey == key) {
1933
      init = true;
H
Hongze Cheng 已提交
1934
      int32_t code = tsdbRowMergerInit(&merge, &fRow, pReader->pSchema);
H
Haojun Liao 已提交
1935 1936 1937
      if (code != TSDB_CODE_SUCCESS) {
        return code;
      }
1938
      doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
1939 1940
    }

1941
    if (minKey == tsLast) {
1942
      TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
H
Haojun Liao 已提交
1943
      if (init) {
H
Hongze Cheng 已提交
1944
        tsdbRowMerge(&merge, &fRow1);
H
Haojun Liao 已提交
1945
      } else {
1946
        init = true;
H
Hongze Cheng 已提交
1947
        int32_t code = tsdbRowMergerInit(&merge, &fRow1, pReader->pSchema);
H
Haojun Liao 已提交
1948 1949 1950
        if (code != TSDB_CODE_SUCCESS) {
          return code;
        }
1951
      }
1952
      doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, &merge, &pReader->verRange);
1953
    }
1954

1955
    if (minKey == k.ts) {
K
kailixu 已提交
1956 1957 1958 1959
      STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid);
      if (pSchema == NULL) {
        return terrno;
      }
H
Haojun Liao 已提交
1960
      if (init) {
X
Xiaoyu Wang 已提交
1961
        tsdbRowMergerAdd(&merge, pRow, pSchema);
H
Haojun Liao 已提交
1962
      } else {
1963
        init = true;
X
Xiaoyu Wang 已提交
1964
        int32_t code = tsdbRowMergerInit(&merge, pRow, pSchema);
H
Haojun Liao 已提交
1965 1966 1967 1968 1969 1970 1971
        if (code != TSDB_CODE_SUCCESS) {
          return code;
        }
      }
      int32_t code = doMergeRowsInBuf(pIter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge, pReader);
      if (code != TSDB_CODE_SUCCESS) {
        return code;
1972 1973 1974 1975 1976
      }
    }
  } else {
    if (minKey == k.ts) {
      init = true;
1977
      STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid);
H
Hongze Cheng 已提交
1978
      int32_t   code = tsdbRowMergerInit(&merge, pRow, pSchema);
H
Haojun Liao 已提交
1979 1980 1981 1982 1983
      if (code != TSDB_CODE_SUCCESS) {
        return code;
      }

      code = doMergeRowsInBuf(pIter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge, pReader);
1984
      if (code != TSDB_CODE_SUCCESS || merge.pTSchema == NULL) {
H
Haojun Liao 已提交
1985 1986
        return code;
      }
1987 1988
    }

1989
    if (minKey == tsLast) {
1990
      TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
H
Haojun Liao 已提交
1991
      if (init) {
H
Hongze Cheng 已提交
1992
        tsdbRowMerge(&merge, &fRow1);
H
Haojun Liao 已提交
1993
      } else {
1994
        init = true;
H
Hongze Cheng 已提交
1995
        int32_t code = tsdbRowMergerInit(&merge, &fRow1, pReader->pSchema);
H
Haojun Liao 已提交
1996 1997 1998
        if (code != TSDB_CODE_SUCCESS) {
          return code;
        }
1999
      }
2000
      doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, &merge, &pReader->verRange);
2001 2002 2003
    }

    if (minKey == key) {
H
Haojun Liao 已提交
2004
      if (init) {
H
Hongze Cheng 已提交
2005
        tsdbRowMerge(&merge, &fRow);
H
Haojun Liao 已提交
2006
      } else {
2007
        init = true;
H
Hongze Cheng 已提交
2008
        int32_t code = tsdbRowMergerInit(&merge, &fRow, pReader->pSchema);
H
Haojun Liao 已提交
2009 2010 2011
        if (code != TSDB_CODE_SUCCESS) {
          return code;
        }
2012 2013 2014
      }
      doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
    }
2015 2016
  }

H
Hongze Cheng 已提交
2017
  int32_t code = tsdbRowMergerGetRow(&merge, &pTSRow);
2018 2019 2020 2021
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

2022
  doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo);
2023 2024

  taosMemoryFree(pTSRow);
H
Hongze Cheng 已提交
2025
  tsdbRowMergerClear(&merge);
2026 2027 2028
  return TSDB_CODE_SUCCESS;
}

2029 2030 2031
/**
 * Emit the row at the current last-block position, merging it with following last-block rows
 * of the same timestamp and, when requested and the timestamps match, with the file-block rows.
 *
 * @param pLastBlockReader reader of the last blocks; positioned on the row to emit
 * @param pReader          reader owning the dump position and the result block
 * @param pBlockScanInfo   scan info of the table being read
 * @param pBlockData       file block data of the table
 * @param mergeBlockData   whether file-block rows may take part in the merge
 * @return TSDB_CODE_SUCCESS, or the error of the failing merger call.
 */
static int32_t doMergeFileBlockAndLastBlock(SLastBlockReader* pLastBlockReader, STsdbReader* pReader,
                                            STableBlockScanInfo* pBlockScanInfo, SBlockData* pBlockData,
                                            bool mergeBlockData) {
  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
  int64_t             tsLastBlock = getCurrentKeyInLastBlock(pLastBlockReader);

  SRow*      pTSRow = NULL;
  SRowMerger merge = {0};
  TSDBROW    fRow = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
  tsdbTrace("fRow ptr:%p, %d, uid:%" PRIu64 ", %s", fRow.pBlockData, fRow.iRow, pLastBlockReader->uid, pReader->idStr);

  int32_t code = TSDB_CODE_SUCCESS;

  if ((!mergeBlockData) || (tsLastBlock != pBlockData->aTSKEY[pDumpInfo->rowIndex])) {
    // only last-block rows take part: try the copy-without-merge fast path first
    if (tryCopyDistinctRowFromSttBlock(&fRow, pLastBlockReader, pBlockScanInfo, tsLastBlock, pReader)) {
      pBlockScanInfo->lastKey = tsLastBlock;
      return TSDB_CODE_SUCCESS;
    }

    code = tsdbRowMergerInit(&merge, &fRow, pReader->pSchema);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
    tsdbRowMerge(&merge, &fRow1);
    doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLastBlock, &merge, &pReader->verRange);
  } else {
    // last-block rows and file-block rows share the timestamp: merge both sources
    code = tsdbRowMergerInit(&merge, &fRow, pReader->pSchema);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLastBlock, &merge, &pReader->verRange);

    // merge with block data if ts == key
    if (tsLastBlock == pBlockData->aTSKEY[pDumpInfo->rowIndex]) {
      doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
    }
  }

  code = tsdbRowMergerGetRow(&merge, &pTSRow);
  if (code != TSDB_CODE_SUCCESS) {
    // the merger is in the same state as on the success path below, release it here as well
    tsdbRowMergerClear(&merge);
    return code;
  }

  doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo);

  taosMemoryFree(pTSRow);
  tsdbRowMergerClear(&merge);

  return TSDB_CODE_SUCCESS;
}

2092 2093
/**
 * Merge the current row of the loaded file data block with the current row of the last block
 * when neither mem nor imem contributes a row.
 *
 * @param pReader          reader whose result block receives the merged row
 * @param pLastBlockReader reader over the last block (merge tree)
 * @param key              timestamp of the current file-block row, as computed by the caller
 * @param pBlockScanInfo   scan state of the table being read
 * @param pBlockData       currently loaded file data block
 * @return TSDB_CODE_SUCCESS, or the error code of the failing merger step
 */
static int32_t mergeFileBlockAndLastBlock(STsdbReader* pReader, SLastBlockReader* pLastBlockReader, int64_t key,
                                          STableBlockScanInfo* pBlockScanInfo, SBlockData* pBlockData) {
  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;

  // only last block exists
  if (!hasDataInFileBlock(pBlockData, pDumpInfo)) {
    return doMergeFileBlockAndLastBlock(pLastBlockReader, pReader, pBlockScanInfo, NULL, false);
  }

  // no last block available, only data block exists
  if (!hasDataInLastBlock(pLastBlockReader)) {
    return mergeRowsInFileBlocks(pBlockData, pBlockScanInfo, key, pReader);
  }

  // current row of the file data block and current timestamp of the last block
  TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
  int64_t ts = getCurrentKeyInLastBlock(pLastBlockReader);
  ASSERT(ts >= key);

  // desc order
  if (!ASCENDING_TRAVERSE(pReader->order)) {
    return doMergeFileBlockAndLastBlock(pLastBlockReader, pReader, pBlockScanInfo, pBlockData, true);
  }

  if (key < ts) {  // imem, mem are all empty, file blocks (data blocks and last block) exist
    return mergeRowsInFileBlocks(pBlockData, pBlockScanInfo, key, pReader);
  }

  if (key > ts) {  // contradicts the assertion above; nothing is emitted in that case
    return TSDB_CODE_SUCCESS;
  }

  // key == ts: rows from the file block and the last block share one timestamp and are merged
  SRow*      pTSRow = NULL;
  SRowMerger merge = {0};

  int32_t code = tsdbRowMergerInit(&merge, &fRow, pReader->pSchema);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);

  TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
  tsdbRowMerge(&merge, &fRow1);

  doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, ts, &merge, &pReader->verRange);

  code = tsdbRowMergerGetRow(&merge, &pTSRow);
  if (code == TSDB_CODE_SUCCESS) {
    doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo);
    taosMemoryFree(pTSRow);
  }

  // the merger was initialized successfully, so it is cleared on the failure path as well
  tsdbRowMergerClear(&merge);
  return code;
}

2147 2148
/**
 * Merge the rows that carry the next timestamp across all four data sources (file data block,
 * last block, imem, mem) into one row and append it to the result block.
 *
 * The next timestamp is the minimum (ascending scan) or maximum (descending scan) of the current
 * keys of every source that has data. Sources holding that timestamp are fed to the merger in the
 * order
 *   ASC:  file block -----> last block -----> imem -----> mem
 *   DESC: mem -----> imem -----> last block -----> file block
 *
 * The caller in this file only invokes this function when both mem and imem have a valid row.
 *
 * @return TSDB_CODE_SUCCESS, or the error code of the failing step
 */
static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo, SBlockData* pBlockData,
                                     SLastBlockReader* pLastBlockReader) {
  SRowMerger          merge = {0};
  SRow*               pTSRow = NULL;
  int32_t             code = TSDB_CODE_SUCCESS;
  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
  SArray*             pDelList = pBlockScanInfo->delSkyline;
  bool                asc = ASCENDING_TRAVERSE(pReader->order);

  // true once tsdbRowMergerInit() has succeeded; from then on every exit must clear the merger
  bool init = false;

  TSDBROW* pRow = getValidMemRow(&pBlockScanInfo->iter, pDelList, pReader);
  TSDBROW* piRow = getValidMemRow(&pBlockScanInfo->iiter, pDelList, pReader);

  // INT64_MIN is only a placeholder for "no data"; hasLast/hasFile are the authoritative flags
  bool    hasLast = hasDataInLastBlock(pLastBlockReader);
  int64_t tsLast = hasLast ? getCurrentKeyInLastBlock(pLastBlockReader) : INT64_MIN;

  bool    hasFile = hasDataInFileBlock(pBlockData, pDumpInfo);
  int64_t key = hasFile ? pBlockData->aTSKEY[pDumpInfo->rowIndex] : INT64_MIN;

  TSDBKEY k = TSDBROW_KEY(pRow);
  TSDBKEY ik = TSDBROW_KEY(piRow);

  // despite its name, minKey is the maximum timestamp when scanning in descending order
  int64_t minKey = 0;
  if (asc) {
    minKey = INT64_MAX;  // let's find the minimum
    if (minKey > k.ts) {
      minKey = k.ts;
    }

    if (minKey > ik.ts) {
      minKey = ik.ts;
    }

    if (hasFile && minKey > key) {
      minKey = key;
    }

    if (hasLast && minKey > tsLast) {
      minKey = tsLast;
    }
  } else {
    minKey = INT64_MIN;  // let's find the maximum ts value
    if (minKey < k.ts) {
      minKey = k.ts;
    }

    if (minKey < ik.ts) {
      minKey = ik.ts;
    }

    if (hasFile && minKey < key) {
      minKey = key;
    }

    if (hasLast && minKey < tsLast) {
      minKey = tsLast;
    }
  }

  if (asc) {
    // file block; guarded by hasFile so the INT64_MIN placeholder never matches a real timestamp
    if (hasFile && minKey == key) {
      TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
      code = tsdbRowMergerInit(&merge, &fRow, pReader->pSchema);
      if (code != TSDB_CODE_SUCCESS) {
        return code;
      }
      init = true;

      doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
    }

    // last block; guarded by hasLast for the same reason
    if (hasLast && minKey == tsLast) {
      TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
      if (init) {
        tsdbRowMerge(&merge, &fRow1);
      } else {
        code = tsdbRowMergerInit(&merge, &fRow1, pReader->pSchema);
        if (code != TSDB_CODE_SUCCESS) {
          return code;
        }
        init = true;
      }

      doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, &merge, &pReader->verRange);
    }

    // imem
    if (minKey == ik.ts) {
      if (init) {
        tsdbRowMerge(&merge, piRow);
      } else {
        STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(piRow), pReader, pBlockScanInfo->uid);
        if (pSchema == NULL) {
          // NOTE(review): code is still TSDB_CODE_SUCCESS here, so a missing schema is reported as
          // success; kept as in the original because no error code is available in this block.
          return code;
        }

        code = tsdbRowMergerInit(&merge, piRow, pSchema);
        if (code != TSDB_CODE_SUCCESS) {
          return code;
        }
        init = true;
      }

      code = doMergeRowsInBuf(&pBlockScanInfo->iiter, pBlockScanInfo->uid, ik.ts, pBlockScanInfo->delSkyline, &merge,
                              pReader);
      if (code != TSDB_CODE_SUCCESS) {
        goto _end;
      }
    }

    // mem
    if (minKey == k.ts) {
      if (init) {
        if (merge.pTSchema == NULL) {
          return code;
        }

        tsdbRowMerge(&merge, pRow);
      } else {
        STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid);
        code = tsdbRowMergerInit(&merge, pRow, pSchema);
        if (code != TSDB_CODE_SUCCESS) {
          return code;
        }
        init = true;
      }

      code = doMergeRowsInBuf(&pBlockScanInfo->iter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge,
                              pReader);
      if (code != TSDB_CODE_SUCCESS) {
        goto _end;
      }
    }
  } else {
    // mem
    if (minKey == k.ts) {
      STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid);
      code = tsdbRowMergerInit(&merge, pRow, pSchema);
      if (code != TSDB_CODE_SUCCESS) {
        return code;
      }
      init = true;

      code = doMergeRowsInBuf(&pBlockScanInfo->iter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge,
                              pReader);
      if (code != TSDB_CODE_SUCCESS) {
        goto _end;
      }
    }

    // imem
    if (minKey == ik.ts) {
      if (init) {
        tsdbRowMerge(&merge, piRow);
      } else {
        STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(piRow), pReader, pBlockScanInfo->uid);
        code = tsdbRowMergerInit(&merge, piRow, pSchema);
        if (code != TSDB_CODE_SUCCESS) {
          return code;
        }
        init = true;
      }

      code = doMergeRowsInBuf(&pBlockScanInfo->iiter, pBlockScanInfo->uid, ik.ts, pBlockScanInfo->delSkyline, &merge,
                              pReader);
      if (code != TSDB_CODE_SUCCESS) {
        goto _end;
      }
    }

    // last block
    if (hasLast && minKey == tsLast) {
      TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
      if (init) {
        tsdbRowMerge(&merge, &fRow1);
      } else {
        code = tsdbRowMergerInit(&merge, &fRow1, pReader->pSchema);
        if (code != TSDB_CODE_SUCCESS) {
          return code;
        }
        init = true;
      }

      doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, &merge, &pReader->verRange);
    }

    // file block
    if (hasFile && minKey == key) {
      TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
      if (!init) {
        code = tsdbRowMergerInit(&merge, &fRow, pReader->pSchema);
        if (code != TSDB_CODE_SUCCESS) {
          return code;
        }
        init = true;
      } else {
        if (merge.pTSchema == NULL) {
          return code;
        }

        tsdbRowMerge(&merge, &fRow);
      }

      doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
    }
  }

  // nothing usable was put into the merger
  if (merge.pTSchema == NULL) {
    return code;
  }

  code = tsdbRowMergerGetRow(&merge, &pTSRow);
  if (code != TSDB_CODE_SUCCESS) {
    goto _end;
  }

  doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo);
  taosMemoryFree(pTSRow);

_end:
  // clear the merger on success and on every failure that happens after a successful init;
  // a merger without a schema is left untouched, as the original code never cleared one
  if (init && merge.pTSchema != NULL) {
    tsdbRowMergerClear(&merge);
  }
  return code;
}

2360 2361 2362 2363 2364 2365 2366 2367 2368
/**
 * Create the mem and imem iterators of one table (once per scan info) and build its delete skyline.
 *
 * Both iterators start right after (ascending) or right before (descending) the table's lastKey,
 * not at the boundary of the reader's query window.
 *
 * @param pBlockScanInfo scan state of the table; iter, iiter and iterInit are updated
 * @param pReader        reader providing the read snapshot, scan order and version range
 * @return TSDB_CODE_SUCCESS, the error of a failed iterator creation, or the error returned by
 *         initDelSkylineIterator()
 */
static int32_t initMemDataIterator(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader) {
  if (pBlockScanInfo->iterInit) {
    return TSDB_CODE_SUCCESS;
  }

  int32_t code = TSDB_CODE_SUCCESS;
  bool    asc = ASCENDING_TRAVERSE(pReader->order);

  TSDBKEY startKey = {0};
  if (asc) {
    startKey = (TSDBKEY){.ts = pBlockScanInfo->lastKey + 1, .version = pReader->verRange.minVer};
  } else {
    startKey = (TSDBKEY){.ts = pBlockScanInfo->lastKey - 1, .version = pReader->verRange.maxVer};
  }

  int32_t backward = !asc;

  // mem
  STbData* d = NULL;
  if (pReader->pReadSnap->pMem != NULL) {
    d = tsdbGetTbDataFromMemTable(pReader->pReadSnap->pMem, pReader->suid, pBlockScanInfo->uid);
    if (d != NULL) {
      code = tsdbTbDataIterCreate(d, &startKey, backward, &pBlockScanInfo->iter.iter);
      if (code != TSDB_CODE_SUCCESS) {
        // this failure concerns the mem iterator (the message used to say "imem")
        tsdbError("%p uid:%" PRIu64 ", failed to create iterator for mem, code:%s, %s", pReader, pBlockScanInfo->uid,
                  tstrerror(code), pReader->idStr);
        return code;
      }

      pBlockScanInfo->iter.hasVal = (tsdbTbDataIterGet(pBlockScanInfo->iter.iter) != NULL);

      tsdbDebug("%p uid:%" PRIu64 ", check data in mem from skey:%" PRId64 ", order:%d, ts range in buf:%" PRId64
                "-%" PRId64 " %s",
                pReader, pBlockScanInfo->uid, startKey.ts, pReader->order, d->minKey, d->maxKey, pReader->idStr);
    }
  } else {
    tsdbDebug("%p uid:%" PRIu64 ", no data in mem, %s", pReader, pBlockScanInfo->uid, pReader->idStr);
  }

  // imem
  STbData* di = NULL;
  if (pReader->pReadSnap->pIMem != NULL) {
    di = tsdbGetTbDataFromMemTable(pReader->pReadSnap->pIMem, pReader->suid, pBlockScanInfo->uid);
    if (di != NULL) {
      code = tsdbTbDataIterCreate(di, &startKey, backward, &pBlockScanInfo->iiter.iter);
      if (code != TSDB_CODE_SUCCESS) {
        // this failure concerns the imem iterator (the message used to say "mem")
        tsdbError("%p uid:%" PRIu64 ", failed to create iterator for imem, code:%s, %s", pReader, pBlockScanInfo->uid,
                  tstrerror(code), pReader->idStr);
        return code;
      }

      pBlockScanInfo->iiter.hasVal = (tsdbTbDataIterGet(pBlockScanInfo->iiter.iter) != NULL);

      tsdbDebug("%p uid:%" PRIu64 ", check data in imem from skey:%" PRId64 ", order:%d, ts range in buf:%" PRId64
                "-%" PRId64 " %s",
                pReader, pBlockScanInfo->uid, startKey.ts, pReader->order, di->minKey, di->maxKey, pReader->idStr);
    }
  } else {
    tsdbDebug("%p uid:%" PRIu64 ", no data in imem, %s", pReader, pBlockScanInfo->uid, pReader->idStr);
  }

  int64_t st = taosGetTimestampUs();
  code = initDelSkylineIterator(pBlockScanInfo, pReader, d, di);
  pReader->cost.initDelSkylineIterTime += (taosGetTimestampUs() - st) / 1000.0;

  // The iterators exist at this point, so the flag is set even when the skyline failed; a later
  // call then does not create them a second time. The skyline error is returned, not discarded.
  pBlockScanInfo->iterInit = true;
  return code;
}

dengyihao's avatar
dengyihao 已提交
2429 2430
/**
 * Check whether the row at pDumpInfo->rowIndex of the loaded file block qualifies for the scan.
 *
 * The row is rejected when it belongs to another table (blocks with an aUid array), when its
 * version or timestamp lies outside the reader's version range / time window, or when
 * hasBeenDropped() reports it as covered by the table's delete skyline.
 */
static bool isValidFileBlockRow(SBlockData* pBlockData, SFileBlockDumpInfo* pDumpInfo,
                                STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader) {
  // aUid is only present for a multi-table data block
  if (pBlockData->aUid != NULL) {
    uint64_t rowUid = pBlockData->aUid[pDumpInfo->rowIndex];
    if (rowUid != pBlockScanInfo->uid) {
      return false;  // row of another table
    }
  }

  // version range
  int64_t rowVer = pBlockData->aVersion[pDumpInfo->rowIndex];
  bool    verInRange = (rowVer >= pReader->verRange.minVer) && (rowVer <= pReader->verRange.maxVer);
  if (!verInRange) {
    return false;
  }

  // time window
  int64_t rowTs = pBlockData->aTSKEY[pDumpInfo->rowIndex];
  bool    tsInWindow = (rowTs >= pReader->window.skey) && (rowTs <= pReader->window.ekey);
  if (!tsInWindow) {
    return false;
  }

  // delete markers
  TSDBKEY rowKey = {.ts = rowTs, .version = rowVer};
  return !hasBeenDropped(pBlockScanInfo->delSkyline, &pBlockScanInfo->fileDelIndex, &rowKey, pReader->order,
                         &pReader->verRange);
}

2459
/**
 * Point the last-block reader at the table described by pScanInfo.
 *
 * If the reader already serves this table, only its data availability is reported. Otherwise a
 * merge tree left open for another table is closed, the table's mem iterators are initialized and
 * a merge tree is opened on a window that starts just past the table's lastKey in scan direction.
 *
 * @return true when a row is available from the last blocks; false otherwise, including when the
 *         merge tree could not be opened
 */
static bool initLastBlockReader(SLastBlockReader* pLBlockReader, STableBlockScanInfo* pScanInfo, STsdbReader* pReader) {
  // the last block reader has been initialized for this table.
  if (pLBlockReader->uid == pScanInfo->uid) {
    return hasDataInLastBlock(pLBlockReader);
  }

  // uid 0 means no merge tree has been opened yet
  if (pLBlockReader->uid != 0) {
    tMergeTreeClose(&pLBlockReader->mergeTree);
  }

  initMemDataIterator(pScanInfo, pReader);
  pLBlockReader->uid = pScanInfo->uid;

  // resume right behind the last returned key: only the leading edge of the window is moved
  STimeWindow win = pLBlockReader->window;
  if (ASCENDING_TRAVERSE(pLBlockReader->order)) {
    win.skey = pScanInfo->lastKey + 1;
  } else {
    win.ekey = pScanInfo->lastKey - 1;
  }

  tsdbDebug("init last block reader, window:%" PRId64 "-%" PRId64 ", uid:%" PRIu64 ", %s", win.skey, win.ekey,
            pScanInfo->uid, pReader->idStr);

  int32_t openCode = tMergeTreeOpen(&pLBlockReader->mergeTree, (pLBlockReader->order == TSDB_ORDER_DESC),
                                    pReader->pFileReader, pReader->suid, pScanInfo->uid, &win,
                                    &pLBlockReader->verRange, pLBlockReader->pInfo, false, pReader->idStr);
  if (openCode != TSDB_CODE_SUCCESS) {
    return false;
  }

  return nextRowFromLastBlocks(pLBlockReader, pScanInfo, &pReader->verRange);
}

2492
// Timestamp of the row the last-block merge tree currently yields.
// The caller is expected to have checked hasDataInLastBlock() first, as the callers in this file do.
static int64_t getCurrentKeyInLastBlock(SLastBlockReader* pLastBlockReader) {
  TSDBROW curRow = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
  return TSDBROW_TS(&curRow);
}

H
Hongze Cheng 已提交
2497
// The last-block reader has data as long as its merge tree holds an iterator.
static bool hasDataInLastBlock(SLastBlockReader* pLastBlockReader) {
  if (pLastBlockReader->mergeTree.pIter == NULL) {
    return false;
  }
  return true;
}
2498

2499
bool hasDataInFileBlock(const SBlockData* pBlockData, const SFileBlockDumpInfo* pDumpInfo) {
H
Haojun Liao 已提交
2500
  if ((pBlockData->nRow > 0) && (pBlockData->nRow != pDumpInfo->totalRows)) {
2501
    return false;  // this is an invalid result.
2502
  }
2503
  return pBlockData->nRow > 0 && (!pDumpInfo->allDumped);
2504
}
2505

2506 2507
/**
 * Emit the file-block row at the current dump position into the reader's result block.
 *
 * When tryCopyDistinctRowFromFileBlock() handles the row, only the table's lastKey is updated
 * here. Otherwise the row is run through a row merger together with whatever
 * doMergeRowsInFileBlocks() adds, and the merged row is appended to the result block.
 *
 * @param key timestamp of the current file-block row, as supplied by the caller
 * @return TSDB_CODE_SUCCESS, or the error code of the failing merger step
 */
int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pBlockScanInfo, int64_t key,
                              STsdbReader* pReader) {
  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;

  if (tryCopyDistinctRowFromFileBlock(pReader, pBlockData, key, pDumpInfo)) {
    pBlockScanInfo->lastKey = key;
    return TSDB_CODE_SUCCESS;
  }

  TSDBROW    fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
  SRow*      pTSRow = NULL;
  SRowMerger merge = {0};

  int32_t code = tsdbRowMergerInit(&merge, &fRow, pReader->pSchema);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);

  code = tsdbRowMergerGetRow(&merge, &pTSRow);
  if (code == TSDB_CODE_SUCCESS) {
    doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo);
    taosMemoryFree(pTSRow);
  }

  // the merger was initialized successfully, so it is cleared on the failure path as well
  tsdbRowMergerClear(&merge);
  return code;
}

H
Haojun Liao 已提交
2537 2538
/**
 * Produce the next output row for one table by dispatching to the merge routine that matches the
 * sources currently holding data:
 *   - mem and imem both have a valid row -> doMergeMultiLevelRows()
 *   - imem iterator still has a value    -> doMergeBufAndFileRows() with the imem row
 *   - mem iterator still has a value     -> doMergeBufAndFileRows() with the mem row
 *   - otherwise                          -> mergeFileBlockAndLastBlock()
 *
 * @return the result code of the selected merge routine
 */
static int32_t buildComposedDataBlockImpl(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo,
                                          SBlockData* pBlockData, SLastBlockReader* pLastBlockReader) {
  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
  SIterInfo*          pMemIter = &pBlockScanInfo->iter;
  SIterInfo*          pImemIter = &pBlockScanInfo->iiter;

  // timestamp of the current file-block row; INT64_MIN stands for "no row available"
  int64_t key = INT64_MIN;
  if (pBlockData->nRow > 0 && (!pDumpInfo->allDumped)) {
    key = pBlockData->aTSKEY[pDumpInfo->rowIndex];
  }

  TSDBROW* pRow = NULL;
  if (pMemIter->hasVal) {
    pRow = getValidMemRow(pMemIter, pBlockScanInfo->delSkyline, pReader);
  }

  TSDBROW* piRow = NULL;
  if (pImemIter->hasVal) {
    piRow = getValidMemRow(pImemIter, pBlockScanInfo->delSkyline, pReader);
  }

  // two levels of mem-table does contain the valid rows
  if (pRow != NULL && piRow != NULL) {
    return doMergeMultiLevelRows(pReader, pBlockScanInfo, pBlockData, pLastBlockReader);
  }

  // the hasVal flags are read again here, after getValidMemRow() has run on the iterators

  // imem + file + last block
  if (pImemIter->hasVal) {
    return doMergeBufAndFileRows(pReader, pBlockScanInfo, piRow, pImemIter, key, pLastBlockReader);
  }

  // mem + file + last block
  if (pMemIter->hasVal) {
    return doMergeBufAndFileRows(pReader, pBlockScanInfo, pRow, pMemIter, key, pLastBlockReader);
  }

  // files data blocks + last block
  return mergeFileBlockAndLastBlock(pReader, pLastBlockReader, key, pBlockScanInfo, pBlockData);
}

H
Haojun Liao 已提交
2570 2571 2572 2573 2574 2575 2576 2577 2578 2579 2580 2581 2582 2583 2584 2585 2586 2587 2588 2589 2590 2591 2592 2593 2594 2595 2596 2597 2598 2599 2600 2601 2602 2603 2604 2605 2606 2607 2608 2609
/**
 * Load the neighbor file block of the same table when it overlaps the current block.
 *
 * @param pBlockInfo     info of the block that has just been consumed
 * @param pBlockScanInfo scan state of the table
 * @param pReader        reader owning the block iterator and the file block buffer
 * @param loadNeighbor   [out] true only when a neighbor block was loaded and made current
 * @return TSDB_CODE_SUCCESS, or the error returned by doLoadFileBlockData()
 */
static int32_t loadNeighborIfOverlap(SFileDataBlockInfo* pBlockInfo, STableBlockScanInfo* pBlockScanInfo,
                                     STsdbReader* pReader, bool* loadNeighbor) {
  *loadNeighbor = false;

  SDataBlk*   pCurBlock = getCurrentBlock(&pReader->status.blockIter);
  int32_t     nextIndex = -1;
  SBlockIndex nxtBIndex = {0};

  // no neighbor block of this table: do nothing
  if (!getNeighborBlockOfSameTable(pBlockInfo, pBlockScanInfo, &nextIndex, pReader->order, &nxtBIndex)) {
    return TSDB_CODE_SUCCESS;
  }

  // neighbor exists but does not overlap: do nothing
  if (!overlapWithNeighborBlock(pCurBlock, &nxtBIndex, pReader->order)) {
    return TSDB_CODE_SUCCESS;
  }

  SReaderStatus*  pStatus = &pReader->status;
  SDataBlockIter* pBlockIter = &pStatus->blockIter;
  int32_t         step = ASCENDING_TRAVERSE(pReader->order) ? 1 : -1;

  // 1. find the next neighbor block in the scan block list
  SFileDataBlockInfo fb = {.uid = pBlockInfo->uid, .tbBlockIdx = nextIndex};
  int32_t            neighborIndex = findFileBlockInfoIndex(pBlockIter, &fb);

  // 2. remove it from the scan block list
  setFileBlockActiveInBlockIter(pBlockIter, neighborIndex, step);

  // 3. load the neighbor block, and set it to be the currently accessed file data block
  int32_t code = doLoadFileBlockData(pReader, pBlockIter, &pStatus->fileBlockData, pBlockInfo->uid);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  // 4. check the data values
  initBlockDumpInfo(pReader, pBlockIter);
  *loadNeighbor = true;
  return code;
}

2610 2611 2612 2613 2614 2615 2616 2617 2618 2619 2620 2621 2622
// Finalize the result block after a composed block was built: record the table uid (0 when no
// scan info is given), mark the data as loaded, refresh the timestamp window, flag the block as
// composed and add the elapsed time `el` to the reader's cost counters.
static void updateComposedBlockInfo(STsdbReader* pReader, double el, STableBlockScanInfo* pBlockScanInfo) {
  SSDataBlock* pBlock = pReader->pResBlock;

  if (pBlockScanInfo != NULL) {
    pBlock->info.id.uid = pBlockScanInfo->uid;
  } else {
    pBlock->info.id.uid = 0;
  }

  pBlock->info.dataLoad = 1;
  blockDataUpdateTsWindow(pBlock, pReader->suppInfo.slotId[0]);

  setComposedBlockFlag(pReader, true);

  pReader->cost.composedBlocks += 1;
  pReader->cost.buildComposedBlockTime += el;
}

2623
/**
 * Fill the reader's result block with rows of one table, composed from the current file data
 * block, the last block and the mem/imem buffers.
 *
 * A file block that isCleanFileDataBlock() accepts and that fits the reader's capacity is copied
 * as a whole (for descending scans only when the last block has no data). Otherwise rows are
 * produced one at a time by buildComposedDataBlockImpl() until the file block is consumed, the
 * result block reaches the reader's capacity, or an error occurs.
 *
 * @return TSDB_CODE_SUCCESS, or the first error reported while loading a neighbor block or
 *         merging a row
 */
static int32_t buildComposedDataBlock(STsdbReader* pReader) {
  int32_t code = TSDB_CODE_SUCCESS;

  SSDataBlock* pResBlock = pReader->pResBlock;

  SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(&pReader->status.blockIter);
  SLastBlockReader*   pLastBlockReader = pReader->status.fileIter.pLastBlockReader;

  bool    asc = ASCENDING_TRAVERSE(pReader->order);
  int64_t st = taosGetTimestampUs();
  int32_t step = asc ? 1 : -1;
  double  el = 0;

  STableBlockScanInfo* pBlockScanInfo = NULL;
  if (pBlockInfo != NULL) {
    pBlockScanInfo = getTableBlockScanInfo(pReader->status.pTableMap, pBlockInfo->uid, pReader->idStr);
    if (pBlockScanInfo == NULL) {
      goto _end;
    }

    SDataBlk* pBlock = getCurrentBlock(&pReader->status.blockIter);
    TSDBKEY   keyInBuf = getCurrentKeyInBuf(pBlockScanInfo, pReader);

    // it is a clean block, load it directly
    if (isCleanFileDataBlock(pReader, pBlockInfo, pBlock, pBlockScanInfo, keyInBuf, pLastBlockReader) &&
        pBlock->nRow <= pReader->capacity) {
      if (asc || (!hasDataInLastBlock(pLastBlockReader))) {
        copyBlockDataToSDataBlock(pReader);

        // record the last key value
        pBlockScanInfo->lastKey = asc ? pBlock->maxKey.ts : pBlock->minKey.ts;
        goto _end;
      }
    }
  } else {  // file blocks not exist
    pBlockScanInfo = *pReader->status.pTableIter;
  }

  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
  SBlockData*         pBlockData = &pReader->status.fileBlockData;

  while (1) {
    bool hasBlockData = false;

    // find the first qualified row in data block
    while (pBlockData->nRow > 0 && pBlockData->uid == pBlockScanInfo->uid) {
      if (isValidFileBlockRow(pBlockData, pDumpInfo, pBlockScanInfo, pReader)) {
        hasBlockData = true;
        break;
      }

      pDumpInfo->rowIndex += step;

      SDataBlk* pBlock = getCurrentBlock(&pReader->status.blockIter);
      if (pDumpInfo->rowIndex >= pBlock->nRow || pDumpInfo->rowIndex < 0) {
        pBlockInfo = getCurrentBlockInfo(&pReader->status.blockIter);  // NOTE: get the new block info

        // continue check for the next file block if the last ts in the current block
        // is overlapped with the next neighbor block
        bool loadNeighbor = false;
        code = loadNeighborIfOverlap(pBlockInfo, pBlockScanInfo, pReader, &loadNeighbor);
        if ((!loadNeighbor) || (code != 0)) {
          setBlockAllDumped(pDumpInfo, pBlock->maxKey.ts, pReader->order);
          break;
        }
      }
    }

    // no qualified row in the file block (or loading the neighbor failed), no need to proceed.
    if (hasBlockData == false) {
      break;
    }

    // a failed merge must stop the loop and reach the caller, not be dropped
    code = buildComposedDataBlockImpl(pReader, pBlockScanInfo, pBlockData, pLastBlockReader);
    if (code != TSDB_CODE_SUCCESS) {
      goto _end;
    }

    // currently loaded file data block is consumed
    if ((pBlockData->nRow > 0) && (pDumpInfo->rowIndex >= pBlockData->nRow || pDumpInfo->rowIndex < 0)) {
      SDataBlk* pBlock = getCurrentBlock(&pReader->status.blockIter);
      setBlockAllDumped(pDumpInfo, pBlock->maxKey.ts, pReader->order);
      break;
    }

    if (pResBlock->info.rows >= pReader->capacity) {
      break;
    }
  }

_end:
  // elapsed time in milliseconds; the block info is updated on the error paths as well
  el = (taosGetTimestampUs() - st) / 1000.0;
  updateComposedBlockInfo(pReader, el, pBlockScanInfo);

  if (pResBlock->info.rows > 0) {
    tsdbDebug("%p uid:%" PRIu64 ", composed data block created, brange:%" PRIu64 "-%" PRIu64
              " rows:%d, elapsed time:%.2f ms %s",
              pReader, pResBlock->info.id.uid, pResBlock->info.window.skey, pResBlock->info.window.ekey,
              pResBlock->info.rows, el, pReader->idStr);
  }

  return code;
}

// Record in the reader status whether the current result block is a composed block.
void setComposedBlockFlag(STsdbReader* pReader, bool composed) {
  pReader->status.composedDataBlock = composed;
}

2727 2728 2729 2730 2731 2732 2733 2734
// Starting position inside the delete skyline for a scan in the given order: the first entry for
// ascending scans, the last entry for descending scans, and 0 when there is no skyline at all.
int32_t getInitialDelIndex(const SArray* pDelSkyline, int32_t order) {
  if (pDelSkyline == NULL || ASCENDING_TRAVERSE(order)) {
    return 0;
  }

  return taosArrayGetSize(pDelSkyline) - 1;
}

dengyihao's avatar
dengyihao 已提交
2735 2736
/**
 * Build the delete skyline of one table (once) and reset all delete indices of its scan info.
 *
 * Delete records are collected from the delete file (when the reader has a delete index entry for
 * the table) and from the delete lists of the mem and imem table data, then handed to
 * tsdbBuildDeleteSkyline().
 *
 * @param pMemTbData  table data in mem, may be NULL
 * @param piMemTbData table data in imem, may be NULL
 * @return 0 on success, otherwise the error from tsdbReadDelData() or tsdbBuildDeleteSkyline()
 */
int32_t initDelSkylineIterator(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader, STbData* pMemTbData,
                               STbData* piMemTbData) {
  // already built for this table
  if (pBlockScanInfo->delSkyline != NULL) {
    return TSDB_CODE_SUCCESS;
  }

  int32_t code = 0;
  SArray* pDelData = taosArrayInit(4, sizeof(SDelData));

  // 1. delete records stored in the delete file
  SDelFile* pDelFile = pReader->pReadSnap->fs.pDelFile;
  if (pDelFile && taosArrayGetSize(pReader->pDelIdx) > 0) {
    SDelIdx  target = {.suid = pReader->suid, .uid = pBlockScanInfo->uid};
    SDelIdx* pFound = taosArraySearch(pReader->pDelIdx, &target, tCmprDelIdx, TD_EQ);

    if (pFound != NULL) {
      code = tsdbReadDelData(pReader->pDelFReader, pFound, pDelData);
    }

    if (code != TSDB_CODE_SUCCESS) {
      taosArrayDestroy(pDelData);
      return code;
    }
  }

  // 2. delete records kept in the mem-table data, mem first, then imem
  STbData* aTbData[2] = {pMemTbData, piMemTbData};
  for (int32_t i = 0; i < 2; ++i) {
    if (aTbData[i] == NULL) {
      continue;
    }

    for (SDelData* pDel = aTbData[i]->pHead; pDel != NULL; pDel = pDel->pNext) {
      taosArrayPush(pDelData, pDel);
    }
  }

  // 3. build the skyline only when there is at least one delete record
  if (taosArrayGetSize(pDelData) > 0) {
    pBlockScanInfo->delSkyline = taosArrayInit(4, sizeof(TSDBKEY));
    code = tsdbBuildDeleteSkyline(pDelData, 0, (int32_t)(taosArrayGetSize(pDelData) - 1), pBlockScanInfo->delSkyline);
  }

  taosArrayDestroy(pDelData);

  // 4. every consumer of the skyline starts from the same position
  int32_t startIndex = getInitialDelIndex(pBlockScanInfo->delSkyline, pReader->order);

  pBlockScanInfo->iter.index = startIndex;
  pBlockScanInfo->iiter.index = startIndex;
  pBlockScanInfo->fileDelIndex = startIndex;
  pBlockScanInfo->lastBlockDelIndex = startIndex;

  return code;
}

C
Cary Xu 已提交
2794
/**
 * Return the key of the next row held in the memory buffers (mem and imem) of a table.
 *
 * When both buffers have a valid row, the smaller timestamp wins for ascending scans and the
 * larger one for descending scans; on equal timestamps the mem key is returned when ascending and
 * the imem key when descending. When only one buffer has a row, its key is returned. When neither
 * has one, the returned key carries TSKEY_INITIAL_VAL as timestamp.
 */
TSDBKEY getCurrentKeyInBuf(STableBlockScanInfo* pScanInfo, STsdbReader* pReader) {
  const bool asc = ASCENDING_TRAVERSE(pReader->order);

  TSDBKEY memKey = {.ts = TSKEY_INITIAL_VAL};
  TSDBKEY imemKey = {.ts = TSKEY_INITIAL_VAL};

  TSDBROW* pMemRow = getValidMemRow(&pScanInfo->iter, pScanInfo->delSkyline, pReader);
  if (pMemRow != NULL) {
    memKey = TSDBROW_KEY(pMemRow);
  }

  TSDBROW* pImemRow = getValidMemRow(&pScanInfo->iiter, pScanInfo->delSkyline, pReader);
  if (pImemRow != NULL) {
    imemKey = TSDBROW_KEY(pImemRow);
  }

  // no data in mem: the imem key, or the initial value when imem is empty as well
  if (pMemRow == NULL) {
    return imemKey;
  }

  // no data in imem
  if (pImemRow == NULL) {
    return memKey;
  }

  // has data in mem & imem
  const bool memNotLater = (memKey.ts <= imemKey.ts);
  return (asc == memNotLater) ? memKey : imemKey;
}

2830
// Advances the file set iterator until a file set is found that contributes at least one data block or last file
// (as counted by doLoadFileBlock into pBlockNum), or until no file set is left. Afterwards, if the read snapshot
// references a delete file that has not been opened yet, the delete file reader is opened and its index is
// loaded into pReader->pDelIdx.
//
// pBlockNum is reset to zero on entry; both counters stay zero when all file sets are exhausted.
// Returns TSDB_CODE_SUCCESS, or the error code of the first failing step.
static int32_t moveToNextFile(STsdbReader* pReader, SBlockNumber* pBlockNum) {
  SReaderStatus* pStatus = &pReader->status;
  int32_t        code = TSDB_CODE_SUCCESS;

  pBlockNum->numOfBlocks = 0;
  pBlockNum->numOfLastFiles = 0;

  size_t  numOfTables = taosHashGetSize(pStatus->pTableMap);
  SArray* pIndexList = taosArrayInit(numOfTables, sizeof(SBlockIdx));
  if (pIndexList == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  while (1) {
    bool hasNext = false;
    code = filesetIteratorNext(&pStatus->fileIter, pReader, &hasNext);
    if (code != TSDB_CODE_SUCCESS) {
      goto _end;
    }

    if (!hasNext) {  // no data files on disk
      break;
    }

    taosArrayClear(pIndexList);
    code = doLoadBlockIndex(pReader, pReader->pFileReader, pIndexList);
    if (code != TSDB_CODE_SUCCESS) {
      goto _end;
    }

    if (taosArrayGetSize(pIndexList) > 0 || pReader->pFileReader->pSet->nSttF > 0) {
      code = doLoadFileBlock(pReader, pIndexList, pBlockNum);
      if (code != TSDB_CODE_SUCCESS) {
        goto _end;
      }

      if (pBlockNum->numOfBlocks + pBlockNum->numOfLastFiles > 0) {
        break;
      }
    }

    // no blocks in current file, try next files
  }

_end:
  // single release point for the temporary block index list, on success and on failure
  taosArrayDestroy(pIndexList);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  if (pReader->pReadSnap == NULL) {
    return TSDB_CODE_SUCCESS;
  }

  SDelFile* pDelFile = pReader->pReadSnap->fs.pDelFile;
  if (pReader->pDelFReader != NULL || pDelFile == NULL) {
    // delete file already opened, or there is no delete file at all
    return TSDB_CODE_SUCCESS;
  }

  code = tsdbDelFReaderOpen(&pReader->pDelFReader, pDelFile, pReader->pTsdb);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  pReader->pDelIdx = taosArrayInit(4, sizeof(SDelIdx));
  if (pReader->pDelIdx == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  code = tsdbReadDelIdx(pReader->pDelFReader, pReader->pDelIdx);
  if (code != TSDB_CODE_SUCCESS) {
    taosArrayDestroy(pReader->pDelIdx);
    // do not leave a dangling pointer in the reader; it would be released a second time or read later
    pReader->pDelIdx = NULL;
    return code;
  }

  return TSDB_CODE_SUCCESS;
}

X
Xiaoyu Wang 已提交
2899
// Rewinds the ordered uid list to its first entry and points the table iterator at the map entry of that uid.
// The first slot of tableUidList is read unconditionally.
static void resetTableListIndex(SReaderStatus* pStatus) {
  STableUidList* pUidList = &pStatus->uidList;
  uint64_t       firstUid = pUidList->tableUidList[0];

  pUidList->currentIndex = 0;
  pStatus->pTableIter = taosHashGet(pStatus->pTableMap, &firstUid, sizeof(firstUid));
}

2907
// Steps the ordered uid list forward by one table and repositions pStatus->pTableIter on the map entry of the
// new uid. Returns false (with pTableIter set to NULL) when the index moved past the number of tables in the
// map, or when the uid is not found in the map.
static bool moveToNextTable(STableUidList* pOrderedCheckInfo, SReaderStatus* pStatus) {
  pOrderedCheckInfo->currentIndex++;

  if (pOrderedCheckInfo->currentIndex < taosHashGetSize(pStatus->pTableMap)) {
    uint64_t nextUid = pOrderedCheckInfo->tableUidList[pOrderedCheckInfo->currentIndex];
    pStatus->pTableIter = taosHashGet(pStatus->pTableMap, &nextUid, sizeof(nextUid));
    return pStatus->pTableIter != NULL;
  }

  // all tables are visited
  pStatus->pTableIter = NULL;
  return false;
}

2919
// Builds the next result block from last-file data only, walking the tables in uid-list order starting at the
// table pStatus->pTableIter currently points to.
//
// For each table the last block reader is initialized; when it has data, rows are composed into pReader->pResBlock
// until the reader runs dry or the result block reaches pReader->capacity. The function returns as soon as the
// result block holds rows, or when there is no further table to visit (result block left without rows).
// Always returns TSDB_CODE_SUCCESS.
static int32_t doLoadLastBlockSequentially(STsdbReader* pReader) {
  SReaderStatus*    pStatus = &pReader->status;
  SLastBlockReader* pLastBlockReader = pStatus->fileIter.pLastBlockReader;
  STableUidList*    pUidList = &pStatus->uidList;

  if (taosHashGetSize(pStatus->pTableMap) == 0) {
    return TSDB_CODE_SUCCESS;
  }

  SSDataBlock* pResBlock = pReader->pResBlock;

  while (1) {
    // load the last data block of current table
    STableBlockScanInfo* pScanInfo = *(STableBlockScanInfo**)pStatus->pTableIter;

    if (initLastBlockReader(pLastBlockReader, pScanInfo, pReader)) {
      int64_t st = taosGetTimestampUs();

      // keep composing rows until the last block reader is drained or the result block is full
      while (hasDataInLastBlock(pLastBlockReader)) {
        buildComposedDataBlockImpl(pReader, pScanInfo, &pReader->status.fileBlockData, pLastBlockReader);
        if (pResBlock->info.rows >= pReader->capacity) {
          break;
        }
      }

      double el = (taosGetTimestampUs() - st) / 1000.0;
      updateComposedBlockInfo(pReader, el, pScanInfo);

      if (pResBlock->info.rows > 0) {
        // the block range holds signed timestamps, print it with the signed conversion
        tsdbDebug("%p uid:%" PRIu64 ", composed data block created, brange:%" PRId64 "-%" PRId64
                  " rows:%d, elapsed time:%.2f ms %s",
                  pReader, pResBlock->info.id.uid, pResBlock->info.window.skey, pResBlock->info.window.ekey,
                  pResBlock->info.rows, el, pReader->idStr);
        return TSDB_CODE_SUCCESS;
      }
    }

    // current table has no (more) data in last files, let's try next table
    if (!moveToNextTable(pUidList, pStatus)) {
      return TSDB_CODE_SUCCESS;
    }
  }
}

2978
// Produces the next result block for the file block the block iterator currently points to. One of three paths
// is taken:
//   1. fileBlockShouldLoad() says the block must be read: load it and build a composed block.
//   2. buffered rows fall into the gap before this file block: return those buffered rows first.
//   3. otherwise: for a descending scan with pending last-file data, return rows from the last block first;
//      else hand the whole file block out as-is (only block info is filled in, data is not loaded here).
// Returns TSDB_CODE_SUCCESS, terrno when the table scan info cannot be found, or the error of the failing step.
static int32_t doBuildDataBlock(STsdbReader* pReader) {
  int32_t code = TSDB_CODE_SUCCESS;
  bool    asc = ASCENDING_TRAVERSE(pReader->order);

  SReaderStatus*      pStatus = &pReader->status;
  SDataBlockIter*     pBlockIter = &pStatus->blockIter;
  SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(pBlockIter);
  SLastBlockReader*   pLastBlockReader = pReader->status.fileIter.pLastBlockReader;

  STableBlockScanInfo* pScanInfo = getTableBlockScanInfo(pReader->status.pTableMap, pBlockInfo->uid, pReader->idStr);
  if (pScanInfo == NULL) {
    return terrno;
  }

  SDataBlk* pBlock = getCurrentBlock(pBlockIter);

  initLastBlockReader(pLastBlockReader, pScanInfo, pReader);
  TSDBKEY keyInBuf = getCurrentKeyInBuf(pScanInfo, pReader);

  if (fileBlockShouldLoad(pReader, pBlockInfo, pBlock, pScanInfo, keyInBuf, pLastBlockReader)) {
    code = doLoadFileBlockData(pReader, pBlockIter, &pStatus->fileBlockData, pScanInfo->uid);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    // build composed data block
    return buildComposedDataBlock(pReader);
  }

  if (bufferDataInFileBlockGap(pReader->order, keyInBuf, pBlock)) {
    // data in memory that are earlier than current file block
    // rows in buffer should be less than the file block in asc, greater than file block in desc
    int64_t endKey = asc ? pBlock->minKey.ts : pBlock->maxKey.ts;
    return buildDataBlockFromBuf(pReader, pScanInfo, endKey);
  }

  if (hasDataInLastBlock(pLastBlockReader) && !asc) {
    // only return the rows in last block
    int64_t tsLast = getCurrentKeyInLastBlock(pLastBlockReader);
    ASSERT(tsLast >= pBlock->maxKey.ts);

    SBlockData* pBData = &pReader->status.fileBlockData;
    tBlockDataReset(pBData);

    SSDataBlock* pResBlock = pReader->pResBlock;
    tsdbDebug("load data in last block firstly, due to desc scan data, %s", pReader->idStr);

    int64_t st = taosGetTimestampUs();

    // keep composing rows until the last block reader is drained or the result block is full
    while (hasDataInLastBlock(pLastBlockReader)) {
      buildComposedDataBlockImpl(pReader, pScanInfo, &pReader->status.fileBlockData, pLastBlockReader);
      if (pResBlock->info.rows >= pReader->capacity) {
        break;
      }
    }

    double el = (taosGetTimestampUs() - st) / 1000.0;
    updateComposedBlockInfo(pReader, el, pScanInfo);

    if (pResBlock->info.rows > 0) {
      // the block range holds signed timestamps, print it with the signed conversion
      tsdbDebug("%p uid:%" PRIu64 ", composed data block created, brange:%" PRId64 "-%" PRId64
                " rows:%d, elapsed time:%.2f ms %s",
                pReader, pResBlock->info.id.uid, pResBlock->info.window.skey, pResBlock->info.window.ekey,
                pResBlock->info.rows, el, pReader->idStr);
    }

    return code;
  }

  // whole block is required, return it directly
  SDataBlockInfo* pInfo = &pReader->pResBlock->info;
  pInfo->rows = pBlock->nRow;
  pInfo->id.uid = pScanInfo->uid;
  pInfo->dataLoad = 0;
  pInfo->window = (STimeWindow){.skey = pBlock->minKey.ts, .ekey = pBlock->maxKey.ts};
  setComposedBlockFlag(pReader, false);
  setBlockAllDumped(&pStatus->fBlockDumpInfo, pBlock->maxKey.ts, pReader->order);

  // update the last key for the corresponding table
  pScanInfo->lastKey = asc ? pInfo->window.ekey : pInfo->window.skey;
  tsdbDebug("%p uid:%" PRIu64
            " clean file block retrieved from file, global index:%d, "
            "table index:%d, rows:%d, brange:%" PRId64 "-%" PRId64 ", %s",
            pReader, pScanInfo->uid, pBlockIter->index, pBlockInfo->tbBlockIdx, pBlock->nRow, pBlock->minKey.ts,
            pBlock->maxKey.ts, pReader->idStr);

  return code;
}

H
Haojun Liao 已提交
3070
// Builds the next result block purely from buffered rows, table by table in uid-list order, starting at the table
// pStatus->pTableIter currently points to. Returns once the result block holds rows, once no table is left, or
// with the error code of buildDataBlockFromBuf.
static int32_t buildBlockFromBufferSequentially(STsdbReader* pReader) {
  SReaderStatus* pStatus = &pReader->status;
  STableUidList* pUidList = &pStatus->uidList;

  do {
    STableBlockScanInfo** ppScanInfo = pStatus->pTableIter;
    initMemDataIterator(*ppScanInfo, pReader);

    // no file block bounds the buffer here, so read up to the extreme key of the scan direction
    int64_t endKey = INT64_MIN;
    if (ASCENDING_TRAVERSE(pReader->order)) {
      endKey = INT64_MAX;
    }

    int32_t code = buildDataBlockFromBuf(pReader, *ppScanInfo, endKey);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    if (pReader->pResBlock->info.rows > 0) {
      return TSDB_CODE_SUCCESS;
    }

    // current table is exhausted, let's try next table
  } while (moveToNextTable(pUidList, pStatus));

  return TSDB_CODE_SUCCESS;
}

3103
// set the correct start position in case of the first/last file block, according to the query time window
3104
static void initBlockDumpInfo(STsdbReader* pReader, SDataBlockIter* pBlockIter) {
3105 3106 3107 3108 3109 3110 3111 3112
  int64_t             lastKey = ASCENDING_TRAVERSE(pReader->order) ? INT64_MIN : INT64_MAX;
  SDataBlk*           pBlock = getCurrentBlock(pBlockIter);
  SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(pBlockIter);
  if (pBlockInfo) {
    STableBlockScanInfo* pScanInfo = taosHashGet(pBlockIter->pTableMap, &pBlockInfo->uid, sizeof(pBlockInfo->uid));
    if (pScanInfo) {
      lastKey = pScanInfo->lastKey;
    }
3113
  }
3114 3115 3116
  SReaderStatus* pStatus = &pReader->status;

  SFileBlockDumpInfo* pDumpInfo = &pStatus->fBlockDumpInfo;
3117 3118 3119

  pDumpInfo->totalRows = pBlock->nRow;
  pDumpInfo->allDumped = false;
3120
  pDumpInfo->rowIndex = ASCENDING_TRAVERSE(pReader->order) ? 0 : pBlock->nRow - 1;
3121
  pDumpInfo->lastKey = lastKey;
3122 3123
}

3124
// Moves on to the next file set that has something to read and prepares iteration over it.
// When no file set is left, pReader->status.loadFromFile is cleared and the function returns successfully.
// Otherwise the block iterator is initialized (data blocks exist), or - when only last files exist - the block
// data, the block iterator and the table list index are reset. The block dump info is re-initialized last.
static int32_t initForFirstBlockInFile(STsdbReader* pReader, SDataBlockIter* pBlockIter) {
  SBlockNumber blockNum = {0};

  int32_t code = moveToNextFile(pReader, &blockNum);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  // all data files are consumed, try data in buffer
  if (blockNum.numOfBlocks + blockNum.numOfLastFiles == 0) {
    pReader->status.loadFromFile = false;
    return code;
  }

  if (blockNum.numOfBlocks <= 0) {
    // no block data, only last block exists
    tBlockDataReset(&pReader->status.fileBlockData);
    resetDataBlockIterator(pBlockIter, pReader->order);
    resetTableListIndex(&pReader->status);
  } else {
    // initialize the block iterator for a new fileset
    code = initBlockIterator(pReader, pBlockIter, blockNum.numOfBlocks);
  }

  // set the correct start position according to the query time window
  initBlockDumpInfo(pReader, pBlockIter);
  return code;
}

3151
// Tells whether the current file block has been consumed partially: it is not marked as fully dumped, and the
// row cursor has already left its starting position (row 0 for ascending, the last row for descending scans).
static bool fileBlockPartiallyRead(SFileBlockDumpInfo* pDumpInfo, bool asc) {
  if (pDumpInfo->allDumped) {
    return false;
  }

  if (asc) {
    return pDumpInfo->rowIndex > 0;
  }

  return pDumpInfo->rowIndex < (pDumpInfo->totalRows - 1);
}

3156
// Produces the next result block from data files. Two sources are interleaved:
//   - the "last files" of a file set, read table by table through doLoadLastBlockSequentially (label _begin);
//   - the regular data blocks of a file set, read through the block iterator.
// The function returns as soon as pReader->pResBlock holds rows, when an error occurs, or when all file sets
// are consumed (initForFirstBlockInFile cleared status.loadFromFile).
static int32_t buildBlockFromFiles(STsdbReader* pReader) {
  int32_t         code = TSDB_CODE_SUCCESS;
  bool            asc = ASCENDING_TRAVERSE(pReader->order);
  SDataBlockIter* pBlockIter = &pReader->status.blockIter;

  if (pBlockIter->numOfBlocks == 0) {
  _begin:
    // no data blocks to iterate: serve rows from the last files first
    code = doLoadLastBlockSequentially(pReader);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    if (pReader->pResBlock->info.rows > 0) {
      return TSDB_CODE_SUCCESS;
    }

    // all data blocks are checked in this last block file, now let's try the next file
    if (pReader->status.pTableIter == NULL) {
      code = initForFirstBlockInFile(pReader, pBlockIter);

      // error happens or all the data files are completely checked
      if ((code != TSDB_CODE_SUCCESS) || !pReader->status.loadFromFile) {
        return code;
      }

      // this file does not have data files, let's start check the last block file if exists
      if (pBlockIter->numOfBlocks == 0) {
        resetTableListIndex(&pReader->status);
        goto _begin;
      }
    }

    code = doBuildDataBlock(pReader);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    if (pReader->pResBlock->info.rows > 0) {
      return TSDB_CODE_SUCCESS;
    }
  }

  while (1) {
    SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;

    if (fileBlockPartiallyRead(pDumpInfo, asc)) {
      // file data block is partially loaded, continue with it
      code = buildComposedDataBlock(pReader);
    } else {
      // current block are exhausted, try the next file block
      if (pDumpInfo->allDumped) {
        if (blockIteratorNext(&pReader->status.blockIter, pReader->idStr)) {
          // there is a next block in the block accessed order list of current file
          initBlockDumpInfo(pReader, pBlockIter);
        } else if (pReader->status.pCurrentFileset->nSttF > 0) {
          // data blocks in current file are exhausted, switch over to the last files of this file set
          SBlockData* pBlockData = &pReader->status.fileBlockData;
          if (pBlockData->uid != 0) {
            tBlockDataClear(pBlockData);
          }

          tBlockDataReset(pBlockData);
          resetDataBlockIterator(pBlockIter, pReader->order);
          resetTableListIndex(&pReader->status);
          goto _begin;
        } else {
          // nothing else in this file set, move to the next one
          code = initForFirstBlockInFile(pReader, pBlockIter);

          // error happens or all the data files are completely checked
          if ((code != TSDB_CODE_SUCCESS) || !pReader->status.loadFromFile) {
            return code;
          }

          // this file does not have blocks, let's start check the last block file
          if (pBlockIter->numOfBlocks == 0) {
            resetTableListIndex(&pReader->status);
            goto _begin;
          }
        }
      }

      code = doBuildDataBlock(pReader);
    }

    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    if (pReader->pResBlock->info.rows > 0) {
      return TSDB_CODE_SUCCESS;
    }
  }
}
H
refact  
Hongze Cheng 已提交
3252

3253 3254
// Chooses the tsdb instance to query. For a non-rsma vnode this is always VND_TSDB(pVnode) and *pLevel is left
// untouched. For an rsma vnode the retention levels are walked from level 0 upwards and the first level whose
// keep span still covers winSKey (plus the tsQueryRsmaTolerance offset scaled by the time precision) is selected;
// a level with keep <= 0 ends the walk and falls back to the previous level. The selected level is written to
// *pLevel and logged together with idStr (an empty string is logged when idStr is NULL).
static STsdb* getTsdbByRetentions(SVnode* pVnode, TSKEY winSKey, SRetention* retentions, const char* idStr,
                                  int8_t* pLevel) {
  if (!VND_IS_RSMA(pVnode)) {
    return VND_TSDB(pVnode);
  }

  int8_t  precision = pVnode->config.tsdbCfg.precision;
  int64_t now = taosGetTimestamp(precision);

  // scale factor applied to the tolerance, depending on the time precision
  long scale = 1000000L;
  if (precision == TSDB_TIME_PRECISION_MILLI) {
    scale = 1L;
  } else if (precision == TSDB_TIME_PRECISION_MICRO) {
    scale = 1000L;
  }
  int64_t offset = tsQueryRsmaTolerance * scale;

  int8_t level = 0;
  while (level < TSDB_RETENTION_MAX) {
    SRetention* pRetention = &retentions[level];

    if (pRetention->keep <= 0) {
      // this level is not configured, use the previous one if there is any
      if (level > 0) {
        level--;
      }
      break;
    }

    if ((now - pRetention->keep) <= (winSKey + offset)) {
      break;
    }

    level++;
  }

  const char* str = (idStr == NULL) ? "" : idStr;

  if (level == TSDB_RETENTION_L0) {
    *pLevel = TSDB_RETENTION_L0;
    tsdbDebug("vgId:%d, rsma level %d is selected to query %s", TD_VID(pVnode), TSDB_RETENTION_L0, str);
    return VND_RSMA0(pVnode);
  }

  if (level == TSDB_RETENTION_L1) {
    *pLevel = TSDB_RETENTION_L1;
    tsdbDebug("vgId:%d, rsma level %d is selected to query %s", TD_VID(pVnode), TSDB_RETENTION_L1, str);
    return VND_RSMA1(pVnode);
  }

  // every other value maps to the highest level
  *pLevel = TSDB_RETENTION_L2;
  tsdbDebug("vgId:%d, rsma level %d is selected to query %s", TD_VID(pVnode), TSDB_RETENTION_L2, str);
  return VND_RSMA2(pVnode);
}

H
Haojun Liao 已提交
3297
// Derives the version range of a query from its condition.
//   minVer: pCond->startVersion, where -1 is mapped to 0.
//   maxVer: pCond->endVersion capped by the vnode's applied version; -1 (end version not specified by the user)
//           selects the applied version itself.
// The level argument is not used by this function.
SVersionRange getQueryVerRange(SVnode* pVnode, SQueryTableDataCond* pCond, int8_t level) {
  int64_t minVer = pCond->startVersion;
  if (minVer == -1) {
    minVer = 0;
  }

  int64_t maxVer = pCond->endVersion;
  if (maxVer == -1 || maxVer > pVnode->state.applied) {
    // never look beyond what the vnode has applied so far
    maxVer = pVnode->state.applied;
  }

  return (SVersionRange){.minVer = minVer, .maxVer = maxVer};
}

3311
// Checks whether the row identified by pKey is covered by a deletion recorded in pDelList.
//
// pDelList is a list of TSDBKEY entries walked with a cursor (*index) that persists across calls: it only moves
// forward for ascending scans and backward for descending scans, so successive keys must follow the scan order.
// Two neighbouring entries form a range; the row counts as deleted when its ts lies inside the range and the
// version of the range's first entry is not smaller than the row version (the ascending path additionally
// requires that version to be within pVerRange).
//
// Returns false when pDelList is NULL or empty.
//
// NOTE(review): the descending path does not consult pVerRange at all while the ascending path does, so the two
// scan orders can disagree for deletions newer than pVerRange->maxVer - confirm whether this is intended.
bool hasBeenDropped(const SArray* pDelList, int32_t* index, TSDBKEY* pKey, int32_t order, SVersionRange* pVerRange) {
  if (pDelList == NULL) {
    return false;
  }

  size_t num = taosArrayGetSize(pDelList);
  if (num == 0) {
    // "num - 1" below is unsigned arithmetic and would wrap around for an empty list
    return false;
  }

  if (ASCENDING_TRAVERSE(order)) {
    if (*index >= num - 1) {
      // the cursor sits on the last entry, only the end of the final range can still match
      TSDBKEY* last = taosArrayGetLast(pDelList);
      ASSERT(pKey->ts >= last->ts);

      if (pKey->ts > last->ts) {
        return false;
      } else if (pKey->ts == last->ts) {
        if (num < 2) {
          // there is no previous entry to form a range with ("num - 2" would wrap around)
          return false;
        }

        TSDBKEY* prev = taosArrayGet(pDelList, num - 2);
        return (prev->version >= pKey->version && prev->version <= pVerRange->maxVer &&
                prev->version >= pVerRange->minVer);
      }
    } else {
      TSDBKEY* pCurrent = taosArrayGet(pDelList, *index);
      TSDBKEY* pNext = taosArrayGet(pDelList, (*index) + 1);

      if (pKey->ts < pCurrent->ts) {
        return false;
      }

      if (pCurrent->ts <= pKey->ts && pNext->ts >= pKey->ts && pCurrent->version >= pKey->version &&
          pVerRange->maxVer >= pCurrent->version) {
        return true;
      }

      // the key is beyond the current range, advance the cursor until a range reaches the key
      while (pNext->ts <= pKey->ts && (*index) < num - 1) {
        (*index) += 1;

        if ((*index) < num - 1) {
          pCurrent = taosArrayGet(pDelList, *index);
          pNext = taosArrayGet(pDelList, (*index) + 1);

          // it is not a consecutive deletion range, ignore it
          if (pCurrent->version == 0 && pNext->version > 0) {
            continue;
          }

          if (pCurrent->ts <= pKey->ts && pNext->ts >= pKey->ts && pCurrent->version >= pKey->version &&
              pVerRange->maxVer >= pCurrent->version) {
            return true;
          }
        }
      }

      return false;
    }
  } else {
    if (*index <= 0) {
      // the cursor sits on the first entry, only the start of the first range can still match
      TSDBKEY* pFirst = taosArrayGet(pDelList, 0);

      if (pKey->ts < pFirst->ts) {
        return false;
      } else if (pKey->ts == pFirst->ts) {
        return pFirst->version >= pKey->version;
      } else {
        ASSERT(0);
      }
    } else {
      TSDBKEY* pCurrent = taosArrayGet(pDelList, *index);
      TSDBKEY* pPrev = taosArrayGet(pDelList, (*index) - 1);

      if (pKey->ts > pCurrent->ts) {
        return false;
      }

      if (pPrev->ts <= pKey->ts && pCurrent->ts >= pKey->ts && pPrev->version >= pKey->version) {
        return true;
      }

      // the key is before the current range, move the cursor backwards until a range reaches the key
      while (pPrev->ts >= pKey->ts && (*index) > 1) {
        (*index) -= 1;

        if ((*index) >= 1) {
          pCurrent = taosArrayGet(pDelList, *index);
          pPrev = taosArrayGet(pDelList, (*index) - 1);

          // it is not a consecutive deletion range, ignore it
          if (pCurrent->version > 0 && pPrev->version == 0) {
            continue;
          }

          if (pPrev->ts <= pKey->ts && pCurrent->ts >= pKey->ts && pPrev->version >= pKey->version) {
            return true;
          }
        }
      }

      return false;
    }
  }

  return false;
}

3414
// Returns the first row at or after the iterator's current position that is visible to the reader: its version
// lies within pReader->verRange and it is not covered by a deletion in pDelList. Invisible rows are skipped by
// advancing the iterator. NULL is returned - with pIter->hasVal cleared - when the iterator is exhausted or when
// a row outside of the query time window is reached.
TSDBROW* getValidMemRow(SIterInfo* pIter, const SArray* pDelList, STsdbReader* pReader) {
  if (!pIter->hasVal) {
    return NULL;
  }

  for (;;) {
    TSDBROW* pRow = tsdbTbDataIterGet(pIter->iter);
    TSDBKEY  key = TSDBROW_KEY(pRow);

    if (outOfTimeWindow(key.ts, &pReader->window)) {
      pIter->hasVal = false;
      return NULL;
    }

    // it is a valid data version; the deletion check is only made for rows inside the version range
    bool inVerRange = (key.version <= pReader->verRange.maxVer && key.version >= pReader->verRange.minVer);
    if (inVerRange && !hasBeenDropped(pDelList, &pIter->index, &key, pReader->order, &pReader->verRange)) {
      return pRow;
    }

    // current row is not visible, step to the next one
    pIter->hasVal = tsdbTbDataIterNext(pIter->iter);
    if (!pIter->hasVal) {
      return NULL;
    }
  }
}
H
Hongze Cheng 已提交
3453

3454 3455
// Feeds every following valid buffered row that carries the timestamp ts into pMerger. The iterator is advanced
// before each check, so the row it currently points to is expected to have been handled by the caller already.
// Merging stops when the iterator is exhausted, no valid row is left, or a row with another timestamp shows up.
// Returns TSDB_CODE_SUCCESS, or terrno when the schema of a row-format row cannot be obtained.
int32_t doMergeRowsInBuf(SIterInfo* pIter, uint64_t uid, int64_t ts, SArray* pDelList, SRowMerger* pMerger,
                         STsdbReader* pReader) {
  for (;;) {
    pIter->hasVal = tsdbTbDataIterNext(pIter->iter);
    if (!pIter->hasVal) {
      break;
    }

    // data exists but not valid
    TSDBROW* pRow = getValidMemRow(pIter, pDelList, pReader);
    if (pRow == NULL) {
      break;
    }

    // ts is not identical, quit
    TSDBKEY rowKey = TSDBROW_KEY(pRow);
    if (rowKey.ts != ts) {
      break;
    }

    if (pRow->type != TSDBROW_ROW_FMT) {
      // column format
      tsdbRowMerge(pMerger, pRow);
      continue;
    }

    // row format, the matching schema version is required for merging
    STSchema* pTSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, uid);
    if (pTSchema == NULL) {
      return terrno;
    }

    tsdbRowMergerAdd(pMerger, pRow, pTSchema);
  }

  return TSDB_CODE_SUCCESS;
}

3489
static int32_t doMergeRowsInFileBlockImpl(SBlockData* pBlockData, int32_t rowIndex, int64_t key, SRowMerger* pMerger,
3490
                                          SVersionRange* pVerRange, int32_t step) {
3491
  while (rowIndex < pBlockData->nRow && rowIndex >= 0 && pBlockData->aTSKEY[rowIndex] == key) {
3492
    if (pBlockData->aVersion[rowIndex] > pVerRange->maxVer || pBlockData->aVersion[rowIndex] < pVerRange->minVer) {
3493
      rowIndex += step;
3494 3495 3496 3497
      continue;
    }

    TSDBROW fRow = tsdbRowFromBlockData(pBlockData, rowIndex);
H
Hongze Cheng 已提交
3498
    tsdbRowMerge(pMerger, &fRow);
3499 3500 3501 3502 3503 3504 3505 3506 3507 3508 3509
    rowIndex += step;
  }

  return rowIndex;
}

// Outcome reported by checkForNeighborFileBlock through its state argument.
typedef enum {
  CHECK_FILEBLOCK_CONT = 0x1,  // set when, after merging in the neighbor block, the row index reached totalRows
  CHECK_FILEBLOCK_QUIT = 0x2,  // the default outcome, reported in every other case
} CHECK_FILEBLOCK_STATE;

H
Hongze Cheng 已提交
3510
static int32_t checkForNeighborFileBlock(STsdbReader* pReader, STableBlockScanInfo* pScanInfo, SDataBlk* pBlock,
3511 3512
                                         SFileDataBlockInfo* pFBlock, SRowMerger* pMerger, int64_t key,
                                         CHECK_FILEBLOCK_STATE* state) {
3513
  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
3514
  SBlockData*         pBlockData = &pReader->status.fileBlockData;
3515

3516
  *state = CHECK_FILEBLOCK_QUIT;
3517
  int32_t step = ASCENDING_TRAVERSE(pReader->order) ? 1 : -1;
3518

3519
  bool    loadNeighbor = true;
H
Haojun Liao 已提交
3520
  int32_t code = loadNeighborIfOverlap(pFBlock, pScanInfo, pReader, &loadNeighbor);
3521

H
Haojun Liao 已提交
3522
  if (loadNeighbor && (code == TSDB_CODE_SUCCESS)) {
3523 3524
    pDumpInfo->rowIndex =
        doMergeRowsInFileBlockImpl(pBlockData, pDumpInfo->rowIndex, key, pMerger, &pReader->verRange, step);
H
Haojun Liao 已提交
3525
    if (pDumpInfo->rowIndex >= pDumpInfo->totalRows) {
3526 3527 3528 3529
      *state = CHECK_FILEBLOCK_CONT;
    }
  }

H
Haojun Liao 已提交
3530
  return code;
3531 3532
}

3533 3534
// Merges all file-block rows that share the timestamp of the current row
// (pBlockData->aTSKEY[fBlockDumpInfo.rowIndex]) into `pMerger`.
//
// The current row itself is skipped (the caller is expected to have handed it to the merger
// already -- NOTE(review): confirm against callers); the cursor is advanced by one step and the
// remaining duplicates in this block are merged. When the block is exhausted, neighboring file
// blocks are checked one after another until checkForNeighborFileBlock() reports
// CHECK_FILEBLOCK_QUIT or there is no current block any more.
//
// Returns TSDB_CODE_SUCCESS, or the error reported while loading a neighbor block.
int32_t doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pScanInfo, STsdbReader* pReader,
                                SRowMerger* pMerger) {
  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;

  bool    asc = ASCENDING_TRAVERSE(pReader->order);
  int64_t key = pBlockData->aTSKEY[pDumpInfo->rowIndex];
  int32_t step = asc ? 1 : -1;

  pDumpInfo->rowIndex += step;
  if ((pDumpInfo->rowIndex <= pBlockData->nRow - 1 && asc) || (pDumpInfo->rowIndex >= 0 && !asc)) {
    pDumpInfo->rowIndex =
        doMergeRowsInFileBlockImpl(pBlockData, pDumpInfo->rowIndex, key, pMerger, &pReader->verRange, step);
  }

  // rows are still left in the current block, nothing more to do
  bool exhausted = (pDumpInfo->rowIndex >= pBlockData->nRow && asc) || (pDumpInfo->rowIndex < 0 && !asc);
  if (!exhausted) {
    return TSDB_CODE_SUCCESS;
  }

  // all rows are consumed, let's try next file block
  while (1) {
    SFileDataBlockInfo* pFileBlockInfo = getCurrentBlockInfo(&pReader->status.blockIter);
    if (pFileBlockInfo == NULL) {
      break;
    }

    // only query the block descriptor once a current block is known to exist
    SDataBlk* pCurrentBlock = getCurrentBlock(&pReader->status.blockIter);

    CHECK_FILEBLOCK_STATE st = CHECK_FILEBLOCK_QUIT;
    int32_t code = checkForNeighborFileBlock(pReader, pScanInfo, pCurrentBlock, pFileBlockInfo, pMerger, key, &st);
    if (code != TSDB_CODE_SUCCESS) {
      // do not hide a failed neighbor-block load behind a success code
      return code;
    }

    if (st == CHECK_FILEBLOCK_QUIT) {
      break;
    }
  }

  return TSDB_CODE_SUCCESS;
}

H
Hongze Cheng 已提交
3569
int32_t doMergeRowsInLastBlock(SLastBlockReader* pLastBlockReader, STableBlockScanInfo* pScanInfo, int64_t ts,
3570 3571
                               SRowMerger* pMerger, SVersionRange* pVerRange) {
  while (nextRowFromLastBlocks(pLastBlockReader, pScanInfo, pVerRange)) {
3572 3573
    int64_t next1 = getCurrentKeyInLastBlock(pLastBlockReader);
    if (next1 == ts) {
3574
      TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
H
Hongze Cheng 已提交
3575
      tsdbRowMerge(pMerger, &fRow1);
3576 3577 3578 3579 3580 3581 3582 3583
    } else {
      break;
    }
  }

  return TSDB_CODE_SUCCESS;
}

3584
int32_t doMergeMemTableMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter, SArray* pDelList, TSDBROW* pResRow,
3585
                                 STsdbReader* pReader, bool* freeTSRow) {
H
Haojun Liao 已提交
3586
  TSDBROW* pNextRow = NULL;
3587
  TSDBROW  current = *pRow;
3588

3589 3590
  {  // if the timestamp of the next valid row has a different ts, return current row directly
    pIter->hasVal = tsdbTbDataIterNext(pIter->iter);
3591

3592
    if (!pIter->hasVal) {
3593
      *pResRow = *pRow;
3594
      *freeTSRow = false;
3595
      return TSDB_CODE_SUCCESS;
3596
    } else {  // has next point in mem/imem
3597
      pNextRow = getValidMemRow(pIter, pDelList, pReader);
3598
      if (pNextRow == NULL) {
H
Haojun Liao 已提交
3599
        *pResRow = current;
3600
        *freeTSRow = false;
3601
        return TSDB_CODE_SUCCESS;
3602 3603
      }

H
Hongze Cheng 已提交
3604
      if (TSDBROW_TS(&current) != TSDBROW_TS(pNextRow)) {
H
Haojun Liao 已提交
3605
        *pResRow = current;
3606
        *freeTSRow = false;
3607
        return TSDB_CODE_SUCCESS;
3608
      }
3609
    }
3610 3611
  }

3612
  SRowMerger merge = {0};
H
Haojun Liao 已提交
3613
  terrno = 0;
3614
  int32_t code = 0;
H
Haojun Liao 已提交
3615

3616 3617 3618 3619 3620 3621 3622
  // start to merge duplicated rows
  if (current.type == TSDBROW_ROW_FMT) {
    // get the correct schema for data in memory
    STSchema* pTSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(&current), pReader, uid);
    if (pTSchema == NULL) {
      return terrno;
    }
H
Haojun Liao 已提交
3623

3624 3625 3626
    if (pReader->pSchema == NULL) {
      pReader->pSchema = pTSchema;
    }
H
Haojun Liao 已提交
3627

3628
    code = tsdbRowMergerInit2(&merge, pReader->pSchema, &current, pTSchema);
3629 3630 3631
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }
H
Haojun Liao 已提交
3632

3633 3634 3635 3636
    STSchema* pTSchema1 = doGetSchemaForTSRow(TSDBROW_SVERSION(pNextRow), pReader, uid);
    if (pTSchema1 == NULL) {
      return terrno;
    }
H
Haojun Liao 已提交
3637

3638
    tsdbRowMergerAdd(&merge, pNextRow, pTSchema1);
3639 3640
  } else {  // let's merge rows in file block
    code = tsdbRowMergerInit(&merge, &current, pReader->pSchema);
3641 3642 3643
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }
H
Haojun Liao 已提交
3644

3645 3646
    tsdbRowMerge(&merge, pNextRow);
  }
H
Haojun Liao 已提交
3647

wmmhello's avatar
wmmhello 已提交
3648
  code = doMergeRowsInBuf(pIter, uid, TSDBROW_TS(&current), pDelList, &merge, pReader);
H
Haojun Liao 已提交
3649 3650 3651 3652
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

3653
  code = tsdbRowMergerGetRow(&merge, &pResRow->pTSRow);
3654 3655 3656
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }
M
Minglei Jin 已提交
3657

wmmhello's avatar
wmmhello 已提交
3658
  pResRow->type = TSDBROW_ROW_FMT;
3659
  tsdbRowMergerClear(&merge);
3660
  *freeTSRow = true;
3661

3662
  return TSDB_CODE_SUCCESS;
3663 3664
}

3665
int32_t doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader,
H
Hongze Cheng 已提交
3666
                           SRow** pTSRow) {
H
Haojun Liao 已提交
3667 3668
  SRowMerger merge = {0};

3669 3670 3671
  TSDBKEY k = TSDBROW_KEY(pRow);
  TSDBKEY ik = TSDBROW_KEY(piRow);

3672
  if (ASCENDING_TRAVERSE(pReader->order)) {  // ascending order imem --> mem
H
Haojun Liao 已提交
3673
    STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid);
3674

H
Hongze Cheng 已提交
3675
    int32_t code = tsdbRowMergerInit(&merge, piRow, pSchema);
H
Haojun Liao 已提交
3676 3677 3678 3679 3680 3681 3682 3683 3684
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    code = doMergeRowsInBuf(&pBlockScanInfo->iiter, pBlockScanInfo->uid, ik.ts, pBlockScanInfo->delSkyline, &merge,
                            pReader);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }
3685

H
Hongze Cheng 已提交
3686
    tsdbRowMerge(&merge, pRow);
H
Haojun Liao 已提交
3687 3688 3689 3690 3691 3692
    code =
        doMergeRowsInBuf(&pBlockScanInfo->iter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge, pReader);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

3693
  } else {
H
Haojun Liao 已提交
3694
    STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid);
3695

H
Hongze Cheng 已提交
3696
    int32_t code = tsdbRowMergerInit(&merge, pRow, pSchema);
3697
    if (code != TSDB_CODE_SUCCESS || merge.pTSchema == NULL) {
H
Haojun Liao 已提交
3698 3699 3700 3701 3702 3703 3704 3705
      return code;
    }

    code =
        doMergeRowsInBuf(&pBlockScanInfo->iter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge, pReader);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }
3706

H
Hongze Cheng 已提交
3707
    tsdbRowMerge(&merge, piRow);
H
Haojun Liao 已提交
3708 3709 3710 3711 3712
    code = doMergeRowsInBuf(&pBlockScanInfo->iiter, pBlockScanInfo->uid, ik.ts, pBlockScanInfo->delSkyline, &merge,
                            pReader);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }
3713
  }
3714

H
Haojun Liao 已提交
3715 3716
  int32_t code = tsdbRowMergerGetRow(&merge, pTSRow);
  tsdbRowMergerClear(&merge);
3717
  return code;
3718 3719
}

3720
// Fetches the next row from the in-memory buffers (mem and imem) of one table.
//
//   pBlockScanInfo  per-table scan state holding the mem (`iter`) and imem (`iiter`) iterators
//   pResRow         [out] next row; left untouched when no row qualifies or an error occurs
//   endKey          exclusive bound: rows with ts >= endKey (ascending) or ts <= endKey
//                   (descending) are not returned
//   freeTSRow       [out] set by the merge helpers; true when pResRow->pTSRow must be freed by the caller
//
// When both buffers have a qualifying row, the one that comes first in scan order is returned;
// rows with identical timestamps are merged across both buffers.
int32_t tsdbGetNextRowInMem(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader, TSDBROW* pResRow, int64_t endKey,
                            bool* freeTSRow) {
  TSDBROW* pRow = getValidMemRow(&pBlockScanInfo->iter, pBlockScanInfo->delSkyline, pReader);
  TSDBROW* piRow = getValidMemRow(&pBlockScanInfo->iiter, pBlockScanInfo->delSkyline, pReader);
  SArray*  pDelList = pBlockScanInfo->delSkyline;
  uint64_t uid = pBlockScanInfo->uid;

  // todo refactor
  bool asc = ASCENDING_TRAVERSE(pReader->order);

  // drop rows that lie beyond endKey; the NULL guard protects TSDBROW_KEY() in case the iterator
  // still reports hasVal while getValidMemRow() returned no row
  if (pBlockScanInfo->iter.hasVal && pRow != NULL) {
    TSDBKEY k = TSDBROW_KEY(pRow);
    if ((k.ts >= endKey && asc) || (k.ts <= endKey && !asc)) {
      pRow = NULL;
    }
  }

  if (pBlockScanInfo->iiter.hasVal && piRow != NULL) {
    TSDBKEY k = TSDBROW_KEY(piRow);
    if ((k.ts >= endKey && asc) || (k.ts <= endKey && !asc)) {
      piRow = NULL;
    }
  }

  bool hasMemRow = pBlockScanInfo->iter.hasVal && pRow != NULL;
  bool hasIMemRow = pBlockScanInfo->iiter.hasVal && piRow != NULL;

  if (hasMemRow && hasIMemRow) {
    TSDBKEY k = TSDBROW_KEY(pRow);
    TSDBKEY ik = TSDBROW_KEY(piRow);

    if (ik.ts == k.ts) {
      // same timestamp in both buffers: merge them into one freshly built row
      SRow*   pMerged = NULL;
      int32_t code = doMergeMemIMemRows(pRow, piRow, pBlockScanInfo, pReader, &pMerged);
      if (code != TSDB_CODE_SUCCESS) {
        return code;
      }

      // publish the result only after the merge succeeded, so that a failure never leaves a
      // row-format result with a dangling row pointer behind
      pResRow->type = TSDBROW_ROW_FMT;
      pResRow->pTSRow = pMerged;
      *freeTSRow = true;
      return TSDB_CODE_SUCCESS;
    }

    // different timestamps: take the buffer whose row comes first in scan order
    bool imemFirst = asc ? (ik.ts < k.ts) : (ik.ts > k.ts);
    if (imemFirst) {
      return doMergeMemTableMultiRows(piRow, uid, &pBlockScanInfo->iiter, pDelList, pResRow, pReader, freeTSRow);
    }

    return doMergeMemTableMultiRows(pRow, uid, &pBlockScanInfo->iter, pDelList, pResRow, pReader, freeTSRow);
  }

  if (hasMemRow) {
    return doMergeMemTableMultiRows(pRow, uid, &pBlockScanInfo->iter, pDelList, pResRow, pReader, freeTSRow);
  }

  if (hasIMemRow) {
    return doMergeMemTableMultiRows(piRow, uid, &pBlockScanInfo->iiter, pDelList, pResRow, pReader, freeTSRow);
  }

  return TSDB_CODE_SUCCESS;
}

H
Hongze Cheng 已提交
3778
// Appends one row-format row (`pTSRow`) to the result block `pBlock`.
//
// The requested output columns (pReader->suppInfo) and the columns of the row schema are walked
// in parallel by column id: matching columns are copied, requested columns that do not exist in
// the schema are set to NULL. On success the block row counter is increased, the block is marked
// as loaded and pScanInfo->lastKey is updated to the timestamp of the row.
//
// Returns TSDB_CODE_SUCCESS, or terrno when the schema of the row cannot be resolved (nothing is
// appended in that case).
int32_t doAppendRowFromTSRow(SSDataBlock* pBlock, STsdbReader* pReader, SRow* pTSRow, STableBlockScanInfo* pScanInfo) {
  int32_t outputRowIndex = pBlock->info.rows;
  int64_t uid = pScanInfo->uid;

  SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo;
  STSchema*           pSchema = doGetSchemaForTSRow(pTSRow->sver, pReader, uid);
  if (pSchema == NULL) {
    // the schema is dereferenced below; bail out instead of crashing
    return terrno;
  }

  SColVal colVal = {0};
  int32_t i = 0, j = 0;

  if (pSupInfo->colId[i] == PRIMARYKEY_TIMESTAMP_COL_ID) {
    SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, pSupInfo->slotId[i]);
    ((int64_t*)pColData->pData)[outputRowIndex] = pTSRow->ts;
    i += 1;
  }

  while (i < pSupInfo->numOfCols && j < pSchema->numOfCols) {
    col_id_t colId = pSupInfo->colId[i];

    if (colId == pSchema->columns[j].colId) {
      SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, pSupInfo->slotId[i]);

      tRowGet(pTSRow, pSchema, j, &colVal);
      doCopyColVal(pColInfoData, outputRowIndex, i, &colVal, pSupInfo);
      i += 1;
      j += 1;
    } else if (colId < pSchema->columns[j].colId) {
      // requested column is missing in the schema
      SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, pSupInfo->slotId[i]);

      colDataSetNULL(pColInfoData, outputRowIndex);
      i += 1;
    } else {
      // schema column is not requested, skip it
      j += 1;
    }
  }

  // set null value since current column does not exist in the "pSchema"
  while (i < pSupInfo->numOfCols) {
    SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, pSupInfo->slotId[i]);
    colDataSetNULL(pColInfoData, outputRowIndex);
    i += 1;
  }

  pBlock->info.dataLoad = 1;
  pBlock->info.rows += 1;
  pScanInfo->lastKey = pTSRow->ts;
  return TSDB_CODE_SUCCESS;
}

H
Hongze Cheng 已提交
3829 3830
// Appends row `rowIndex` of the column-format block `pBlockData` to the result block `pResBlock`.
//
// The requested output columns (pReader->suppInfo) and the column data of the file block are
// walked in parallel by column id. Matching columns are copied; requested columns that the file
// block does not carry are set to NULL. The block row counter is increased and the block is
// marked as loaded. Always returns TSDB_CODE_SUCCESS.
int32_t doAppendRowFromFileBlock(SSDataBlock* pResBlock, STsdbReader* pReader, SBlockData* pBlockData,
                                 int32_t rowIndex) {
  SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo;

  int32_t dstRow = pResBlock->info.rows;
  int32_t outIdx = 0;
  int32_t inIdx = 0;

  // the primary timestamp column, when requested, always comes first
  if (pReader->suppInfo.colId[outIdx] == PRIMARYKEY_TIMESTAMP_COL_ID) {
    SColumnInfoData* pTsCol = taosArrayGet(pResBlock->pDataBlock, pSupInfo->slotId[outIdx]);
    ((int64_t*)pTsCol->pData)[dstRow] = pBlockData->aTSKEY[rowIndex];
    outIdx += 1;
  }

  SColVal cv = {0};
  int32_t numOfInputCols = pBlockData->nColData;
  int32_t numOfOutputCols = pSupInfo->numOfCols;

  while (outIdx < numOfOutputCols && inIdx < numOfInputCols) {
    SColData* pSrc = tBlockDataGetColDataByIdx(pBlockData, inIdx);

    if (pSrc->cid >= pSupInfo->colId[outIdx]) {
      SColumnInfoData* pDst = TARRAY_GET_ELEM(pResBlock->pDataBlock, pSupInfo->slotId[outIdx]);

      if (pSrc->cid == pSupInfo->colId[outIdx]) {
        tColDataGetValue(pSrc, rowIndex, &cv);
        doCopyColVal(pDst, dstRow, outIdx, &cv, pSupInfo);
        inIdx += 1;
      } else if (pSrc->cid > pDst->info.colId) {
        // the specified column does not exist in file block, fill with null data
        colDataSetNULL(pDst, dstRow);
      }

      outIdx += 1;
    } else {
      // file block column is not requested, move on to the next one
      inIdx += 1;
    }
  }

  // remaining requested columns have no counterpart in the file block
  for (; outIdx < numOfOutputCols; outIdx += 1) {
    SColumnInfoData* pDst = taosArrayGet(pResBlock->pDataBlock, pSupInfo->slotId[outIdx]);
    colDataSetNULL(pDst, dstRow);
  }

  pResBlock->info.dataLoad = 1;
  pResBlock->info.rows += 1;
  return TSDB_CODE_SUCCESS;
}

3876 3877
// Fills pReader->pResBlock with rows from the in-memory buffers of one table.
//
// Rows are fetched one by one through tsdbGetNextRowInMem() until no row qualifies (bounded by
// `endKey`), both buffer iterators are drained, or the block holds `capacity` rows.
//
// Returns TSDB_CODE_SUCCESS, or the first error reported while fetching or appending a row.
int32_t buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, int64_t endKey, int32_t capacity,
                                  STsdbReader* pReader) {
  SSDataBlock* pBlock = pReader->pResBlock;
  int32_t      code = TSDB_CODE_SUCCESS;

  do {
    TSDBROW row = {.type = -1};  // type -1 marks "no row produced"
    bool    freeTSRow = false;

    code = tsdbGetNextRowInMem(pBlockScanInfo, pReader, &row, endKey, &freeTSRow);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    if (row.type == -1) {
      break;
    }

    if (row.type == TSDBROW_ROW_FMT) {
      code = doAppendRowFromTSRow(pBlock, pReader, row.pTSRow, pBlockScanInfo);

      // a merged row is owned by this function and has to be released even if appending failed
      if (freeTSRow) {
        taosMemoryFree(row.pTSRow);
      }
    } else {
      code = doAppendRowFromFileBlock(pBlock, pReader, row.pBlockData, row.iRow);
    }

    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    // no data in buffer, return immediately
    if (!(pBlockScanInfo->iter.hasVal || pBlockScanInfo->iiter.hasVal)) {
      break;
    }

    if (pBlock->info.rows >= capacity) {
      break;
    }
  } while (1);

  return TSDB_CODE_SUCCESS;
}
H
Hongze Cheng 已提交
3911

3912 3913
// TODO refactor: with createDataBlockScanInfo
// Replaces the set of tables scanned by `pReader` with the `num` entries of `pTableList`
// (an array of STableKeyInfo).
//
// Every existing per-table scan info is cleared, the scan-info buffer and the uid list are grown
// when the new list is larger than the current table map, and the table map is rebuilt from
// scratch. The uid list cursor is reset to the first table.
//
// Returns TSDB_CODE_SUCCESS, the error of ensureBlockScanInfoBuf(), or TSDB_CODE_OUT_OF_MEMORY
// when the uid list cannot be grown (the previous uid list stays valid in that case).
int32_t tsdbSetTableList(STsdbReader* pReader, const void* pTableList, int32_t num) {
  int32_t size = taosHashGetSize(pReader->status.pTableMap);

  STableBlockScanInfo** p = NULL;
  while ((p = taosHashIterate(pReader->status.pTableMap, p)) != NULL) {
    clearBlockScanInfo(*p);
  }

  if (size < num) {
    int32_t code = ensureBlockScanInfoBuf(&pReader->blockInfoBuf, num);
    if (code) {
      return code;
    }

    // keep the old pointer until the reallocation is known to have succeeded
    uint64_t* pNewList = taosMemoryRealloc(pReader->status.uidList.tableUidList, sizeof(uint64_t) * num);
    if (pNewList == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }

    pReader->status.uidList.tableUidList = pNewList;
  }

  taosHashClear(pReader->status.pTableMap);
  STableUidList* pUidList = &pReader->status.uidList;
  pUidList->currentIndex = 0;

  STableKeyInfo* pList = (STableKeyInfo*)pTableList;
  for (int32_t i = 0; i < num; ++i) {
    STableBlockScanInfo* pInfo = getPosInBlockInfoBuf(&pReader->blockInfoBuf, i);
    pInfo->uid = pList[i].uid;
    pUidList->tableUidList[i] = pList[i].uid;

    // the map stores a pointer to the scan info, keyed by table uid
    taosHashPut(pReader->status.pTableMap, &pInfo->uid, sizeof(uint64_t), &pInfo, POINTER_BYTES);
  }

  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
3945 3946 3947 3948 3949 3950
// Returns the index handle that metaGetIdx() reports for `pMeta`, or NULL when `pMeta` is NULL.
void* tsdbGetIdx(SMeta* pMeta) {
  if (pMeta != NULL) {
    return metaGetIdx(pMeta);
  }

  return NULL;
}
dengyihao's avatar
dengyihao 已提交
3951

dengyihao's avatar
dengyihao 已提交
3952 3953 3954 3955 3956 3957
// Returns the index handle that metaGetIvtIdx() reports for `pMeta`, or NULL when `pMeta` is NULL.
void* tsdbGetIvtIdx(SMeta* pMeta) {
  if (pMeta != NULL) {
    return metaGetIvtIdx(pMeta);
  }

  return NULL;
}
L
Liu Jicong 已提交
3958

H
Hongze Cheng 已提交
3959
// Returns the upper bound of the version range (verRange.maxVer) this reader is restricted to.
uint64_t getReaderMaxVersion(STsdbReader* pReader) {
  uint64_t maxVersion = pReader->verRange.maxVer;
  return maxVersion;
}
3960

3961
// Prepares the reader for scanning: initialises the file-set iterator from the read snapshot,
// resets the data block iterator and, when file sets exist, positions on the first block in file.
// If nothing is to be loaded from file, the table list index is reset (via resetTableListIndex()).
//
// Returns TSDB_CODE_SUCCESS or the error of initForFirstBlockInFile().
static int32_t doOpenReaderImpl(STsdbReader* pReader) {
  SReaderStatus*  pStatus = &pReader->status;
  SDataBlockIter* pBlockIter = &pStatus->blockIter;
  int32_t         code = TSDB_CODE_SUCCESS;

  initFilesetIterator(&pStatus->fileIter, pReader->pReadSnap->fs.aDFileSet, pReader);
  resetDataBlockIterator(pBlockIter, pReader->order);

  if (pStatus->fileIter.numOfFiles != 0) {
    code = initForFirstBlockInFile(pReader, pBlockIter);
  } else {
    // no file set at all: only the in-memory buffers can provide data
    pStatus->loadFromFile = false;
  }

  if (!pStatus->loadFromFile) {
    resetTableListIndex(pStatus);
  }

  return code;
}

H
refact  
Hongze Cheng 已提交
3982
// ====================================== EXPOSED APIs ======================================
3983
int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, void* pTableList, int32_t numOfTables,
H
Haojun Liao 已提交
3984
                       SSDataBlock* pResBlock, STsdbReader** ppReader, const char* idstr) {
3985 3986 3987 3988 3989 3990
  STimeWindow window = pCond->twindows;
  if (pCond->type == TIMEWINDOW_RANGE_EXTERNAL) {
    pCond->twindows.skey += 1;
    pCond->twindows.ekey -= 1;
  }

3991 3992 3993
  int32_t capacity = pVnode->config.tsdbCfg.maxRows;
  if (pResBlock != NULL) {
    blockDataEnsureCapacity(pResBlock, capacity);
H
Haojun Liao 已提交
3994 3995 3996
  }

  int32_t code = tsdbReaderCreate(pVnode, pCond, ppReader, capacity, pResBlock, idstr);
3997
  if (code != TSDB_CODE_SUCCESS) {
H
Haojun Liao 已提交
3998 3999
    goto _err;
  }
H
Hongze Cheng 已提交
4000

4001
  // check for query time window
H
Haojun Liao 已提交
4002
  STsdbReader* pReader = *ppReader;
4003
  if (isEmptyQueryTimeWindow(&pReader->window) && pCond->type == TIMEWINDOW_RANGE_CONTAINED) {
H
Haojun Liao 已提交
4004 4005 4006
    tsdbDebug("%p query window not overlaps with the data set, no result returned, %s", pReader, pReader->idStr);
    return TSDB_CODE_SUCCESS;
  }
H
Hongze Cheng 已提交
4007

4008 4009
  if (pCond->type == TIMEWINDOW_RANGE_EXTERNAL) {
    // update the SQueryTableDataCond to create inner reader
4010
    int32_t order = pCond->order;
4011
    if (order == TSDB_ORDER_ASC) {
4012
      pCond->twindows.ekey = window.skey;
4013 4014 4015
      pCond->twindows.skey = INT64_MIN;
      pCond->order = TSDB_ORDER_DESC;
    } else {
4016
      pCond->twindows.skey = window.ekey;
4017 4018 4019 4020
      pCond->twindows.ekey = INT64_MAX;
      pCond->order = TSDB_ORDER_ASC;
    }

4021
    // here we only need one more row, so the capacity is set to be ONE.
H
Haojun Liao 已提交
4022
    code = tsdbReaderCreate(pVnode, pCond, &pReader->innerReader[0], 1, pResBlock, idstr);
4023 4024 4025 4026 4027
    if (code != TSDB_CODE_SUCCESS) {
      goto _err;
    }

    if (order == TSDB_ORDER_ASC) {
4028
      pCond->twindows.skey = window.ekey;
4029
      pCond->twindows.ekey = INT64_MAX;
4030
    } else {
4031
      pCond->twindows.skey = INT64_MIN;
4032
      pCond->twindows.ekey = window.ekey;
4033
    }
4034 4035
    pCond->order = order;

H
Haojun Liao 已提交
4036
    code = tsdbReaderCreate(pVnode, pCond, &pReader->innerReader[1], 1, pResBlock, idstr);
4037 4038 4039 4040 4041
    if (code != TSDB_CODE_SUCCESS) {
      goto _err;
    }
  }

H
Haojun Liao 已提交
4042
  // NOTE: the endVersion in pCond is the data version not schema version, so pCond->endVersion is not correct here.
4043 4044
  //  no valid error code set in metaGetTbTSchema, so let's set the error code here.
  //  we should proceed in case of tmq processing.
4045
  if (pCond->suid != 0) {
4046
    pReader->pSchema = metaGetTbTSchema(pReader->pTsdb->pVnode->pMeta, pReader->suid, -1, 1);
H
Haojun Liao 已提交
4047
    if (pReader->pSchema == NULL) {
H
Haojun Liao 已提交
4048
      tsdbError("failed to get table schema, suid:%" PRIu64 ", ver:-1, %s", pReader->suid, pReader->idStr);
H
Haojun Liao 已提交
4049
    }
4050 4051
  } else if (numOfTables > 0) {
    STableKeyInfo* pKey = pTableList;
4052
    pReader->pSchema = metaGetTbTSchema(pReader->pTsdb->pVnode->pMeta, pKey->uid, -1, 1);
H
Haojun Liao 已提交
4053
    if (pReader->pSchema == NULL) {
H
Haojun Liao 已提交
4054
      tsdbError("failed to get table schema, uid:%" PRIu64 ", ver:-1, %s", pKey->uid, pReader->idStr);
H
Haojun Liao 已提交
4055
    }
4056 4057
  }

4058
  if (pReader->pSchema != NULL) {
H
Haojun Liao 已提交
4059 4060 4061 4062
    code = updateBlockSMAInfo(pReader->pSchema, &pReader->suppInfo);
    if (code != TSDB_CODE_SUCCESS) {
      goto _err;
    }
4063
  }
4064

4065
  STsdbReader* p = (pReader->innerReader[0] != NULL) ? pReader->innerReader[0] : pReader;
X
Xiaoyu Wang 已提交
4066 4067
  pReader->status.pTableMap =
      createDataBlockScanInfo(p, &pReader->blockInfoBuf, pTableList, &pReader->status.uidList, numOfTables);
H
Haojun Liao 已提交
4068 4069
  if (pReader->status.pTableMap == NULL) {
    *ppReader = NULL;
S
Shengliang Guan 已提交
4070
    code = TSDB_CODE_OUT_OF_MEMORY;
H
Haojun Liao 已提交
4071 4072
    goto _err;
  }
H
Hongze Cheng 已提交
4073

4074
  pReader->suspended = true;
4075

4076
  tsdbDebug("%p total numOfTable:%d in this query %s", pReader, numOfTables, pReader->idStr);
H
Hongze Cheng 已提交
4077
  return code;
H
Hongze Cheng 已提交
4078 4079

_err:
H
Haojun Liao 已提交
4080
  tsdbError("failed to create data reader, code:%s %s", tstrerror(code), idstr);
4081
  tsdbReaderClose(pReader);
X
Xiaoyu Wang 已提交
4082
  *ppReader = NULL;  // reset the pointer value.
H
Hongze Cheng 已提交
4083
  return code;
H
refact  
Hongze Cheng 已提交
4084 4085 4086
}

// Releases a reader together with all resources it owns: inner readers, column build buffers,
// the result block (when owned), file block data, block iterator, table scan infos, file/del file
// readers, the read snapshot, the uid list, the last-block reader, schemas and the id string.
// An io-cost summary is written to the debug log before the reader itself is freed.
//
// Passing NULL is allowed and does nothing. The reader must not be used afterwards.
void tsdbReaderClose(STsdbReader* pReader) {
  if (pReader == NULL) {
    return;
  }

  tsdbAcquireReader(pReader);

  // Inner readers borrow the table map, uid list, snapshot and schemas from this reader (see
  // tsdbReaderResume()), so those references are dropped before each inner reader is closed.
  // Each inner reader is handled on its own, since only one of them may exist if opening failed
  // half way.
  for (int32_t i = 0; i < 2; ++i) {
    STsdbReader* p = pReader->innerReader[i];
    if (p == NULL) {
      continue;
    }

    p->status.pTableMap = NULL;
    p->status.uidList.tableUidList = NULL;
    p->pReadSnap = NULL;
    p->pSchema = NULL;
    p->pMemSchema = NULL;

    tsdbReaderClose(p);
  }

  SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo;

  taosArrayDestroy(pSupInfo->pColAgg);
  for (int32_t i = 0; i < pSupInfo->numOfCols; ++i) {
    if (pSupInfo->buildBuf[i] != NULL) {
      taosMemoryFreeClear(pSupInfo->buildBuf[i]);
    }
  }

  if (pReader->freeBlock) {
    pReader->pResBlock = blockDataDestroy(pReader->pResBlock);
  }

  taosMemoryFree(pSupInfo->colId);
  tBlockDataDestroy(&pReader->status.fileBlockData);
  cleanupDataBlockIterator(&pReader->status.blockIter);

  // remembered for the cost summary below, the map is destroyed right after
  size_t numOfTables = taosHashGetSize(pReader->status.pTableMap);
  if (pReader->status.pTableMap != NULL) {
    destroyAllBlockScanInfo(pReader->status.pTableMap);
    clearBlockScanInfoBuf(&pReader->blockInfoBuf);
  }

  if (pReader->pFileReader != NULL) {
    tsdbDataFReaderClose(&pReader->pFileReader);
  }

  if (pReader->pDelFReader != NULL) {
    tsdbDelFReaderClose(&pReader->pDelFReader);
  }

  if (pReader->pDelIdx != NULL) {
    taosArrayDestroy(pReader->pDelIdx);
    pReader->pDelIdx = NULL;
  }

  qTrace("tsdb/reader-close: %p, untake snapshot", pReader);
  tsdbUntakeReadSnap(pReader, pReader->pReadSnap, true);
  pReader->pReadSnap = NULL;

  tsdbReleaseReader(pReader);

  tsdbUninitReaderLock(pReader);

  taosMemoryFree(pReader->status.uidList.tableUidList);
  SIOCostSummary* pCost = &pReader->cost;

  SFilesetIter* pFilesetIter = &pReader->status.fileIter;
  if (pFilesetIter->pLastBlockReader != NULL) {
    SLastBlockReader* pLReader = pFilesetIter->pLastBlockReader;
    tMergeTreeClose(&pLReader->mergeTree);

    // collect the last-block load statistics before the load info is destroyed
    getLastBlockLoadInfo(pLReader->pInfo, &pCost->lastBlockLoad, &pCost->lastBlockLoadTime);

    pLReader->pInfo = destroyLastBlockLoadInfo(pLReader->pInfo);
    taosMemoryFree(pLReader);
  }

  tsdbDebug(
      "%p :io-cost summary: head-file:%" PRIu64 ", head-file time:%.2f ms, SMA:%" PRId64
      " SMA-time:%.2f ms, fileBlocks:%" PRId64
      ", fileBlocks-load-time:%.2f ms, "
      "build in-memory-block-time:%.2f ms, lastBlocks:%" PRId64 ", lastBlocks-time:%.2f ms, composed-blocks:%" PRId64
      ", composed-blocks-time:%.2fms, STableBlockScanInfo size:%.2f Kb, createTime:%.2f ms,initDelSkylineIterTime:%.2f "
      "ms, %s",
      pReader, pCost->headFileLoad, pCost->headFileLoadTime, pCost->smaDataLoad, pCost->smaLoadTime, pCost->numOfBlocks,
      pCost->blockLoadTime, pCost->buildmemBlock, pCost->lastBlockLoad, pCost->lastBlockLoadTime, pCost->composedBlocks,
      pCost->buildComposedBlockTime, numOfTables * sizeof(STableBlockScanInfo) / 1000.0, pCost->createScanInfoList,
      pCost->initDelSkylineIterTime, pReader->idStr);

  taosMemoryFree(pReader->idStr);

  // pMemSchema may alias pSchema; compare before either of them is freed to avoid a double free
  // and to avoid inspecting a pointer that has already been released
  if (pReader->pMemSchema != pReader->pSchema) {
    taosMemoryFree(pReader->pMemSchema);
  }
  taosMemoryFree(pReader->pSchema);

  taosMemoryFreeClear(pReader);
}

4195 4196 4197 4198 4199 4200 4201 4202 4203 4204
int32_t tsdbReaderSuspend(STsdbReader* pReader) {
  int32_t code = 0;

  // save reader's base state & reset top state to be reconstructed from base state
  SReaderStatus*       pStatus = &pReader->status;
  STableBlockScanInfo* pBlockScanInfo = NULL;

  if (pStatus->loadFromFile) {
    SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(&pReader->status.blockIter);
    if (pBlockInfo != NULL) {
4205 4206
      pBlockScanInfo =
          *(STableBlockScanInfo**)taosHashGet(pStatus->pTableMap, &pBlockInfo->uid, sizeof(pBlockInfo->uid));
4207 4208 4209 4210 4211 4212 4213
      if (pBlockScanInfo == NULL) {
        code = TSDB_CODE_INVALID_PARA;
        tsdbError("failed to locate the uid:%" PRIu64 " in query table uid list, total tables:%d, %s", pBlockInfo->uid,
                  taosHashGetSize(pReader->status.pTableMap), pReader->idStr);
        goto _err;
      }
    } else {
4214
      pBlockScanInfo = *pStatus->pTableIter;
4215 4216 4217 4218 4219
    }

    tsdbDataFReaderClose(&pReader->pFileReader);

    // resetDataBlockScanInfo excluding lastKey
4220
    STableBlockScanInfo** p = NULL;
4221 4222

    while ((p = taosHashIterate(pStatus->pTableMap, p)) != NULL) {
4223 4224 4225 4226 4227 4228 4229 4230
      STableBlockScanInfo* pInfo = *(STableBlockScanInfo**)p;

      pInfo->iterInit = false;
      pInfo->iter.hasVal = false;
      pInfo->iiter.hasVal = false;

      if (pInfo->iter.iter != NULL) {
        pInfo->iter.iter = tsdbTbDataIterDestroy(pInfo->iter.iter);
4231 4232
      }

4233 4234 4235 4236 4237 4238
      if (pInfo->iiter.iter != NULL) {
        pInfo->iiter.iter = tsdbTbDataIterDestroy(pInfo->iiter.iter);
      }

      pInfo->delSkyline = taosArrayDestroy(pInfo->delSkyline);
      // pInfo->lastKey = ts;
4239 4240
    }
  } else {
4241 4242 4243 4244 4245 4246 4247 4248 4249 4250 4251 4252 4253 4254 4255 4256 4257 4258 4259 4260 4261 4262
    // resetDataBlockScanInfo excluding lastKey
    STableBlockScanInfo** p = NULL;

    while ((p = taosHashIterate(pStatus->pTableMap, p)) != NULL) {
      STableBlockScanInfo* pInfo = *(STableBlockScanInfo**)p;

      pInfo->iterInit = false;
      pInfo->iter.hasVal = false;
      pInfo->iiter.hasVal = false;

      if (pInfo->iter.iter != NULL) {
        pInfo->iter.iter = tsdbTbDataIterDestroy(pInfo->iter.iter);
      }

      if (pInfo->iiter.iter != NULL) {
        pInfo->iiter.iter = tsdbTbDataIterDestroy(pInfo->iiter.iter);
      }

      pInfo->delSkyline = taosArrayDestroy(pInfo->delSkyline);
      // pInfo->lastKey = ts;
    }

4263
    pBlockScanInfo = pStatus->pTableIter == NULL ? NULL : *pStatus->pTableIter;
4264 4265 4266 4267 4268 4269 4270
    if (pBlockScanInfo) {
      // save lastKey to restore memory iterator
      STimeWindow w = pReader->pResBlock->info.window;
      pBlockScanInfo->lastKey = ASCENDING_TRAVERSE(pReader->order) ? w.ekey : w.skey;

      // reset current current table's data block scan info,
      pBlockScanInfo->iterInit = false;
4271 4272
      pBlockScanInfo->iter.hasVal = false;
      pBlockScanInfo->iiter.hasVal = false;
4273 4274 4275 4276 4277 4278 4279 4280 4281 4282 4283 4284 4285 4286 4287
      if (pBlockScanInfo->iter.iter != NULL) {
        pBlockScanInfo->iter.iter = tsdbTbDataIterDestroy(pBlockScanInfo->iter.iter);
      }

      if (pBlockScanInfo->iiter.iter != NULL) {
        pBlockScanInfo->iiter.iter = tsdbTbDataIterDestroy(pBlockScanInfo->iiter.iter);
      }

      pBlockScanInfo->pBlockList = taosArrayDestroy(pBlockScanInfo->pBlockList);
      tMapDataClear(&pBlockScanInfo->mapData);
      // TODO: keep skyline for reuse
      pBlockScanInfo->delSkyline = taosArrayDestroy(pBlockScanInfo->delSkyline);
    }
  }

4288
  tsdbUntakeReadSnap(pReader, pReader->pReadSnap, false);
4289
  pReader->pReadSnap = NULL;
4290 4291 4292

  pReader->suspended = true;

4293 4294
  tsdbDebug("reader: %p suspended uid %" PRIu64 " in this query %s", pReader, pBlockScanInfo ? pBlockScanInfo->uid : 0,
            pReader->idStr);
4295 4296 4297 4298 4299 4300 4301 4302 4303 4304 4305
  return code;

_err:
  tsdbError("failed to suspend data reader, code:%s %s", tstrerror(code), pReader->idStr);
  return code;
}

static int32_t tsdbSetQueryReseek(void* pQHandle) {
  int32_t      code = 0;
  STsdbReader* pReader = pQHandle;

4306
  code = tsdbTryAcquireReader(pReader);
4307 4308
  if (code == 0) {
    if (pReader->suspended) {
4309
      tsdbReleaseReader(pReader);
4310 4311 4312 4313
      return code;
    }

    tsdbReaderSuspend(pReader);
4314

4315
    tsdbReleaseReader(pReader);
4316

4317
    return code;
4318 4319 4320
  } else if (code == EBUSY) {
    return TSDB_CODE_VND_QUERY_BUSY;
  } else {
4321 4322
    terrno = TAOS_SYSTEM_ERROR(code);
    return TSDB_CODE_FAILED;
4323 4324 4325 4326 4327 4328
  }
}

// Restores the state a suspended reader needs in order to continue scanning.
//
// When the table map is not empty a new read snapshot is taken. For a
// TIMEWINDOW_RANGE_CONTAINED reader the reader itself is re-opened; otherwise
// the two inner readers are pointed at the outer reader's table map, uid list,
// schemas and snapshot, and the first inner reader is re-opened.
//
// pReader->suspended is cleared only on success. Every failure is logged through
// the _err path and its code is returned to the caller.
int32_t tsdbReaderResume(STsdbReader* pReader) {
  int32_t code = 0;

  // captured up front; only used for the debug message at the end
  STableBlockScanInfo** pBlockScanInfo = pReader->status.pTableIter;

  //  restore reader's state
  //  task snapshot
  int32_t numOfTables = taosHashGetSize(pReader->status.pTableMap);
  if (numOfTables > 0) {
    qTrace("tsdb/reader: %p, take snapshot", pReader);
    code = tsdbTakeReadSnap(pReader, tsdbSetQueryReseek, &pReader->pReadSnap);
    if (code != TSDB_CODE_SUCCESS) {
      goto _err;
    }

    if (pReader->type == TIMEWINDOW_RANGE_CONTAINED) {
      code = doOpenReaderImpl(pReader);
      if (code != TSDB_CODE_SUCCESS) {
        goto _err;
      }
    } else {
      STsdbReader* pPrevReader = pReader->innerReader[0];
      STsdbReader* pNextReader = pReader->innerReader[1];

      // we need only one row
      pPrevReader->capacity = 1;
      pPrevReader->status.pTableMap = pReader->status.pTableMap;
      pPrevReader->status.uidList = pReader->status.uidList;
      pPrevReader->pSchema = pReader->pSchema;
      pPrevReader->pMemSchema = pReader->pMemSchema;
      pPrevReader->pReadSnap = pReader->pReadSnap;

      pNextReader->capacity = 1;
      pNextReader->status.pTableMap = pReader->status.pTableMap;
      pNextReader->status.uidList = pReader->status.uidList;
      pNextReader->pSchema = pReader->pSchema;
      pNextReader->pMemSchema = pReader->pMemSchema;
      pNextReader->pReadSnap = pReader->pReadSnap;

      code = doOpenReaderImpl(pPrevReader);
      if (code != TSDB_CODE_SUCCESS) {
        goto _err;
      }
    }
  }

  pReader->suspended = false;

  // the iterator slot itself may hold NULL, so check both levels before reading the uid
  uint64_t uid = (pBlockScanInfo != NULL && *pBlockScanInfo != NULL) ? (*pBlockScanInfo)->uid : 0;
  tsdbDebug("reader: %p resumed uid %" PRIu64 ", numOfTable:%" PRId32 ", in this query %s", pReader, uid, numOfTables,
            pReader->idStr);
  return code;

_err:
  tsdbError("failed to resume data reader, code:%s %s", tstrerror(code), pReader->idStr);
  return code;
}

4383
// Produces the next result block for one reader, without any locking.
//
// The result block is cleaned first. File data is tried when
// status.loadFromFile is set; if the file pass succeeds but leaves the block
// empty, the table list index is reset and the in-memory buffers are tried.
// Without loadFromFile only the buffers are read.
//
// Returns true when the result block holds at least one row. An empty table map
// or a failing file pass yields false.
static bool doTsdbNextDataBlock(STsdbReader* pReader) {
  SSDataBlock*   pRes = pReader->pResBlock;
  SReaderStatus* pReaderStatus = &pReader->status;

  // drop whatever the previous call left in the result block
  blockDataCleanup(pRes);

  if (taosHashGetSize(pReaderStatus->pTableMap) == 0) {
    return false;
  }

  if (pReaderStatus->loadFromFile) {
    if (buildBlockFromFiles(pReader) != TSDB_CODE_SUCCESS) {
      return false;
    }

    if (pRes->info.rows > 0) {
      return true;
    }

    // files produced nothing: restart the table list for the buffer pass
    resetTableListIndex(pReaderStatus);
  }

  buildBlockFromBufferSequentially(pReader);
  return pRes->info.rows > 0;
}

4412
// Advances the reader to its next data block.
//
// A reader with inner readers scans in three steps tracked in pReader->step:
// the first inner reader (EXTERNAL_ROWS_PREV), the main scan
// (EXTERNAL_ROWS_MAIN) and the second inner reader (EXTERNAL_ROWS_NEXT).
//
// Locking: the reader lock is taken on entry. It is released before every
// `false` return, and before a `true` return when the block is a composed block
// (status.composedDataBlock). When `true` is returned for a block that is not
// composed the lock is still held; tsdbRetrieveDataBlock releases it after
// loading that block.
//
// Returns true when a block is available. Returns false when the scan is
// exhausted, or when resuming / re-opening a reader failed; in the failure case
// terrno is set to the error code. (Returning the raw code from this bool
// function would have been read as "block available" with the lock still held.)
bool tsdbNextDataBlock(STsdbReader* pReader) {
  if (isEmptyQueryTimeWindow(&pReader->window) || pReader->step == EXTERNAL_ROWS_NEXT) {
    return false;
  }

  SReaderStatus* pStatus = &pReader->status;
  bool           ret = false;

  int32_t code = tsdbAcquireReader(pReader);
  qTrace("tsdb/read: %p, take read mutex, code: %d", pReader, code);

  if (pReader->suspended) {
    code = tsdbReaderResume(pReader);
    if (code != TSDB_CODE_SUCCESS) {
      goto _err;
    }
  }

  if (pReader->innerReader[0] != NULL && pReader->step == 0) {
    ret = doTsdbNextDataBlock(pReader->innerReader[0]);
    pReader->step = EXTERNAL_ROWS_PREV;
    if (ret) {
      pStatus = &pReader->innerReader[0]->status;
      if (pStatus->composedDataBlock) {
        qTrace("tsdb/read: %p, unlock read mutex", pReader);
        tsdbReleaseReader(pReader);
      }

      return ret;
    }
  }

  if (pReader->step == EXTERNAL_ROWS_PREV) {
    // prepare for the main scan
    code = doOpenReaderImpl(pReader);
    resetAllDataBlockScanInfo(pReader->status.pTableMap, pReader->innerReader[0]->window.ekey);

    if (code != TSDB_CODE_SUCCESS) {
      goto _err;
    }

    pReader->step = EXTERNAL_ROWS_MAIN;
  }

  ret = doTsdbNextDataBlock(pReader);
  if (ret) {
    if (pStatus->composedDataBlock) {
      qTrace("tsdb/read: %p, unlock read mutex", pReader);
      tsdbReleaseReader(pReader);
    }

    return ret;
  }

  if (pReader->step == EXTERNAL_ROWS_MAIN && pReader->innerReader[1] != NULL) {
    // prepare for the next row scan
    code = doOpenReaderImpl(pReader->innerReader[1]);
    resetAllDataBlockScanInfo(pReader->innerReader[1]->status.pTableMap, pReader->window.ekey);
    if (code != TSDB_CODE_SUCCESS) {
      goto _err;
    }

    ret = doTsdbNextDataBlock(pReader->innerReader[1]);
    pReader->step = EXTERNAL_ROWS_NEXT;
    if (ret) {
      pStatus = &pReader->innerReader[1]->status;
      if (pStatus->composedDataBlock) {
        qTrace("tsdb/read: %p, unlock read mutex", pReader);
        tsdbReleaseReader(pReader);
      }

      return ret;
    }
  }

  qTrace("tsdb/read: %p, unlock read mutex", pReader);
  tsdbReleaseReader(pReader);

  return false;

_err:
  // report the failure through terrno and never leave the lock held on a false return
  terrno = code;
  qTrace("tsdb/read: %p, unlock read mutex", pReader);
  tsdbReleaseReader(pReader);
  return false;
}

4489
// Completes pSup->pColAgg so that it can be merged against pSup->colId[].
//
// pTsAgg is inserted at index 0. The array is then walked in step with the
// requested column ids (both are compared in ascending colId order): for every
// requested column other than the primary timestamp that has no entry, an entry
// with numOfNull == numOfRows is inserted at the matching position.
//
// The element count is taken after the timestamp insert and refreshed after each
// later insert, so the walk reaches the real end of the array. Requested columns
// whose ids lie beyond the last existing entry are appended as well; previously
// they were left without an entry.
static void doFillNullColSMA(SBlockLoadSuppInfo* pSup, int32_t numOfRows, int32_t numOfCols, SColumnDataAgg* pTsAgg) {
  int32_t i = 0, j = 0;

  taosArrayInsert(pSup->pColAgg, 0, pTsAgg);
  int32_t size = (int32_t)taosArrayGetSize(pSup->pColAgg);

  while (j < numOfCols && i < size) {
    SColumnDataAgg* pAgg = taosArrayGet(pSup->pColAgg, i);
    if (pAgg->colId == pSup->colId[j]) {
      i += 1;
      j += 1;
    } else if (pAgg->colId < pSup->colId[j]) {
      i += 1;
    } else {
      // requested column j has no entry: add one in front of the current element
      if (pSup->colId[j] != PRIMARYKEY_TIMESTAMP_COL_ID) {
        SColumnDataAgg nullColAgg = {.colId = pSup->colId[j], .numOfNull = numOfRows};
        taosArrayInsert(pSup->pColAgg, i, &nullColAgg);

        // step over the new entry only if the array really grew
        int32_t newSize = (int32_t)taosArrayGetSize(pSup->pColAgg);
        if (newSize > size) {
          size = newSize;
          i += 1;
        }
      }
      j += 1;
    }
  }

  // requested columns past the last existing entry: insert at the end index
  for (; j < numOfCols; ++j) {
    if (pSup->colId[j] == PRIMARYKEY_TIMESTAMP_COL_ID) {
      continue;
    }

    SColumnDataAgg nullColAgg = {.colId = pSup->colId[j], .numOfNull = numOfRows};
    taosArrayInsert(pSup->pColAgg, size, &nullColAgg);
    size = (int32_t)taosArrayGetSize(pSup->pColAgg);
  }
}

4512
// Loads the block SMA of the current file block and exposes it per result slot.
//
// *allHave and pDataBlock->pBlockAgg are reset to false / NULL first. Nothing is
// loaded (and TSDB_CODE_SUCCESS is returned) when the reader type is
// TIMEWINDOW_RANGE_EXTERNAL, when the current block is a composed block, when
// suppInfo.smaValid is false, when the result block's uid differs from the
// current file block's uid, or when the block carries no SMA.
//
// Otherwise the SMA is read into suppInfo.pColAgg, the timestamp entry is filled
// from the result block's time window, missing columns are completed by
// doFillNullColSMA, and pReader->pResBlock->pBlockAgg[slot] is pointed at the
// entry of each requested column. On success *allHave is true and
// pDataBlock->pBlockAgg is set to that pointer table.
//
// Returns TSDB_CODE_SUCCESS, the code from tsdbReadBlockSma, or
// TSDB_CODE_OUT_OF_MEMORY when the pointer table cannot be allocated.
//
// NOTE(review): if pDataBlock is the same object as pReader->pResBlock, the
// reset at the top clears the previously allocated pointer table without freeing
// it - confirm against the callers.
int32_t tsdbRetrieveDatablockSMA(STsdbReader* pReader, SSDataBlock* pDataBlock, bool* allHave) {
  SColumnDataAgg*** pBlockSMA = &pDataBlock->pBlockAgg;

  int32_t code = 0;
  *allHave = false;
  *pBlockSMA = NULL;

  if (pReader->type == TIMEWINDOW_RANGE_EXTERNAL) {
    return TSDB_CODE_SUCCESS;
  }

  // there is no statistics data for composed block
  if (pReader->status.composedDataBlock || (!pReader->suppInfo.smaValid)) {
    return TSDB_CODE_SUCCESS;
  }

  SFileDataBlockInfo* pFBlock = getCurrentBlockInfo(&pReader->status.blockIter);
  SBlockLoadSuppInfo* pSup = &pReader->suppInfo;

  if (pReader->pResBlock->info.id.uid != pFBlock->uid) {
    return TSDB_CODE_SUCCESS;
  }

  SDataBlk* pBlock = getCurrentBlock(&pReader->status.blockIter);
  if (!tDataBlkHasSma(pBlock)) {
    // *pBlockSMA is already NULL
    return TSDB_CODE_SUCCESS;
  }

  code = tsdbReadBlockSma(pReader->pFileReader, pBlock, pSup->pColAgg);
  if (code != TSDB_CODE_SUCCESS) {
    tsdbDebug("vgId:%d, failed to load block SMA for uid %" PRIu64 ", code:%s, %s", 0, pFBlock->uid, tstrerror(code),
              pReader->idStr);
    return code;
  }

  *allHave = true;

  // always load the first primary timestamp column data
  SColumnDataAgg* pTsAgg = &pSup->tsColAgg;

  pTsAgg->numOfNull = 0;
  pTsAgg->colId = PRIMARYKEY_TIMESTAMP_COL_ID;
  pTsAgg->min = pReader->pResBlock->info.window.skey;
  pTsAgg->max = pReader->pResBlock->info.window.ekey;

  // update the number of NULL data rows
  size_t numOfCols = pSup->numOfCols;

  // ensure capacity
  if (pDataBlock->pDataBlock) {
    size_t colsNum = taosArrayGetSize(pDataBlock->pDataBlock);
    taosArrayEnsureCap(pSup->pColAgg, colsNum);
  }

  SSDataBlock* pResBlock = pReader->pResBlock;
  if (pResBlock->pBlockAgg == NULL) {
    size_t num = taosArrayGetSize(pResBlock->pDataBlock);
    pResBlock->pBlockAgg = taosMemoryCalloc(num, POINTER_BYTES);
    if (pResBlock->pBlockAgg == NULL && num > 0) {
      // the table is indexed by slot id below, so it must exist before going on
      *allHave = false;
      return TSDB_CODE_OUT_OF_MEMORY;
    }
  }

  // do fill all null column value SMA info
  doFillNullColSMA(pSup, pBlock->nRow, numOfCols, pTsAgg);
  size_t size = taosArrayGetSize(pSup->pColAgg);

  // merge the completed SMA list with the requested columns and bind each slot
  int32_t i = 0, j = 0;
  while (j < numOfCols && i < size) {
    SColumnDataAgg* pAgg = taosArrayGet(pSup->pColAgg, i);
    if (pAgg->colId == pSup->colId[j]) {
      pResBlock->pBlockAgg[pSup->slotId[j]] = pAgg;
      i += 1;
      j += 1;
    } else if (pAgg->colId < pSup->colId[j]) {
      i += 1;
    } else if (pSup->colId[j] < pAgg->colId) {
      // ASSERT(pSup->colId[j] == PRIMARYKEY_TIMESTAMP_COL_ID);
      pResBlock->pBlockAgg[pSup->slotId[j]] = &pSup->tsColAgg;
      j += 1;
    }
  }

  *pBlockSMA = pResBlock->pBlockAgg;
  pReader->cost.smaDataLoad += 1;

  tsdbDebug("vgId:%d, succeed to load block SMA for uid %" PRIu64 ", %s", 0, pFBlock->uid, pReader->idStr);
  return code;
}

H
Haojun Liao 已提交
4600 4601 4602 4603 4604 4605 4606 4607 4608 4609 4610 4611
// Looks up the scan info registered for `uid` in the table map.
//
// Returns the stored STableBlockScanInfo pointer. When the uid has no entry, or
// the entry holds NULL, terrno is set to TSDB_CODE_INVALID_PARA, an error
// carrying `id` is logged, and NULL is returned.
STableBlockScanInfo* getTableBlockScanInfo(SHashObj* pTableMap, uint64_t uid, const char* id) {
  STableBlockScanInfo** ppInfo = taosHashGet(pTableMap, &uid, sizeof(uid));
  if (ppInfo != NULL && *ppInfo != NULL) {
    return *ppInfo;
  }

  terrno = TSDB_CODE_INVALID_PARA;

  int32_t numOfTables = taosHashGetSize(pTableMap);
  tsdbError("failed to locate the uid:%" PRIu64 " in query table uid list, total tables:%d, %s", uid, numOfTables, id);
  return NULL;
}

H
Haojun Liao 已提交
4612
// Loads the file block the block iterator currently points at and copies it
// into the reader's result block.
//
// Returns pReader->pResBlock on success. Returns NULL when the block's table is
// not found in the table map, or when loading the file block fails; in the
// latter case the file block data is destroyed and terrno is set to the load
// error.
static SSDataBlock* doRetrieveDataBlock(STsdbReader* pReader) {
  SReaderStatus*      pReaderStatus = &pReader->status;
  SFileDataBlockInfo* pCurBlock = getCurrentBlockInfo(&pReaderStatus->blockIter);

  STableBlockScanInfo* pScanInfo = getTableBlockScanInfo(pReaderStatus->pTableMap, pCurBlock->uid, pReader->idStr);
  if (pScanInfo == NULL) {
    return NULL;
  }

  int32_t loadCode =
      doLoadFileBlockData(pReader, &pReaderStatus->blockIter, &pReaderStatus->fileBlockData, pScanInfo->uid);
  if (loadCode == TSDB_CODE_SUCCESS) {
    copyBlockDataToSDataBlock(pReader);
    return pReader->pResBlock;
  }

  // loading failed: drop the partially loaded data and report the code
  tBlockDataDestroy(&pReaderStatus->fileBlockData);
  terrno = loadCode;
  return NULL;
}

H
Haojun Liao 已提交
4631
// Returns the data of the block that the preceding tsdbNextDataBlock call
// reported.
//
// For a TIMEWINDOW_RANGE_EXTERNAL reader the block comes from the inner reader
// that matches the current step (innerReader[0] for EXTERNAL_ROWS_PREV,
// innerReader[1] for EXTERNAL_ROWS_NEXT); otherwise from pReader itself.
//
// A composed block is already in the result block and is returned as is. Any
// other block is loaded from file first, after which the reader lock of the
// outer reader is released. pIdList is not used here.
SSDataBlock* tsdbRetrieveDataBlock(STsdbReader* pReader, SArray* pIdList) {
  STsdbReader* pActive = pReader;

  if (pReader->type == TIMEWINDOW_RANGE_EXTERNAL) {
    switch (pReader->step) {
      case EXTERNAL_ROWS_PREV:
        pActive = pReader->innerReader[0];
        break;
      case EXTERNAL_ROWS_NEXT:
        pActive = pReader->innerReader[1];
        break;
      default:
        break;
    }
  }

  if (pActive->status.composedDataBlock) {
    return pActive->pResBlock;
  }

  SSDataBlock* pLoaded = doRetrieveDataBlock(pActive);

  qTrace("tsdb/read-retrieve: %p, unlock read mutex", pReader);
  tsdbReleaseReader(pReader);

  return pLoaded;
}

H
Haojun Liao 已提交
4654
// Rewinds the reader so that it can scan again with the order and time window
// given in pCond.
//
// The reader lock is held for the duration of the call and released on every
// return path. A suspended reader is resumed first; if that fails the resume
// error is returned instead of being reported as success.
//
// Nothing is reset (TSDB_CODE_SUCCESS is returned) when the reader's current
// query window is empty or when the reader holds no read snapshot.
//
// Otherwise the reader is switched to TIMEWINDOW_RANGE_CONTAINED, the file
// reader is closed, the fileset iterator, block iterator, table list index and
// per-table scan info are reset, and the first file block is located when the
// snapshot contains files.
//
// Returns TSDB_CODE_SUCCESS, the code from tsdbReaderResume, or the code from
// initForFirstBlockInFile.
int32_t tsdbReaderReset(STsdbReader* pReader, SQueryTableDataCond* pCond) {
  qTrace("tsdb/reader-reset: %p, take read mutex", pReader);
  tsdbAcquireReader(pReader);

  if (pReader->suspended) {
    int32_t resumeCode = tsdbReaderResume(pReader);
    if (resumeCode != TSDB_CODE_SUCCESS) {
      tsdbReleaseReader(pReader);
      return resumeCode;
    }
  }

  if (isEmptyQueryTimeWindow(&pReader->window) || pReader->pReadSnap == NULL) {
    tsdbDebug("tsdb reader reset return %p", pReader->pReadSnap);

    tsdbReleaseReader(pReader);

    return TSDB_CODE_SUCCESS;
  }

  SReaderStatus* pStatus = &pReader->status;

  SDataBlockIter* pBlockIter = &pStatus->blockIter;

  pReader->order = pCond->order;
  pReader->type = TIMEWINDOW_RANGE_CONTAINED;
  pStatus->loadFromFile = true;
  pStatus->pTableIter = NULL;
  pReader->window = updateQueryTimeWindow(pReader->pTsdb, &pCond->twindows);

  // start from a clean timestamp aggregate entry
  memset(&pReader->suppInfo.tsColAgg, 0, sizeof(SColumnDataAgg));

  pReader->suppInfo.tsColAgg.colId = PRIMARYKEY_TIMESTAMP_COL_ID;
  tsdbDataFReaderClose(&pReader->pFileReader);

  int32_t numOfTables = taosHashGetSize(pStatus->pTableMap);

  initFilesetIterator(&pStatus->fileIter, pReader->pReadSnap->fs.aDFileSet, pReader);
  resetDataBlockIterator(pBlockIter, pReader->order);
  resetTableListIndex(&pReader->status);

  // one step outside the window on the side the scan starts from
  int64_t ts = ASCENDING_TRAVERSE(pReader->order) ? pReader->window.skey - 1 : pReader->window.ekey + 1;
  resetAllDataBlockScanInfo(pStatus->pTableMap, ts);

  int32_t code = 0;

  // no data in files, let's try buffer in memory
  if (pStatus->fileIter.numOfFiles == 0) {
    pStatus->loadFromFile = false;
    resetTableListIndex(pStatus);
  } else {
    code = initForFirstBlockInFile(pReader, pBlockIter);
    if (code != TSDB_CODE_SUCCESS) {
      tsdbError("%p reset reader failed, numOfTables:%d, query range:%" PRId64 " - %" PRId64 " in query %s", pReader,
                numOfTables, pReader->window.skey, pReader->window.ekey, pReader->idStr);

      tsdbReleaseReader(pReader);

      return code;
    }
  }

  tsdbDebug("%p reset reader, suid:%" PRIu64 ", numOfTables:%d, skey:%" PRId64 ", query range:%" PRId64 " - %" PRId64
            " in query %s",
            pReader, pReader->suid, numOfTables, pCond->twindows.skey, pReader->window.skey, pReader->window.ekey,
            pReader->idStr);

  tsdbReleaseReader(pReader);

  return code;
}
H
Hongze Cheng 已提交
4722

4723 4724 4725
// Maps a block's row count to a histogram bucket.
//
// Buckets are `bucketRange` rows wide and start at `startRow`. Row counts below
// `startRow` are placed in bucket 0 instead of producing a negative index, and a
// non-positive `bucketRange` yields bucket 0 instead of dividing by zero.
// No upper bound is applied here; the caller has to keep the result inside its
// histogram.
static int32_t getBucketIndex(int32_t startRow, int32_t bucketRange, int32_t numOfRows) {
  if (bucketRange <= 0 || numOfRows < startRow) {
    return 0;
  }

  return (numOfRows - startRow) / bucketRange;
}
H
Hongze Cheng 已提交
4726

4727 4728 4729 4730
// Collects block distribution statistics for the tables of this reader by
// walking every file block reachable through the reader's block iterator.
//
// totalSize and totalRows are reset and numOfVgroups is set to 1; the other
// counters of pTableBlockInfo (numOfFiles, numOfBlocks, numOfSmallBlocks,
// blockRowsHisto, minRows, maxRows) are accumulated on top of the values passed
// in. The reader lock is held for the duration of the call.
//
// Returns TSDB_CODE_SUCCESS, the code from tsdbReaderResume when a suspended
// reader cannot be resumed, or the code from initForFirstBlockInFile.
int32_t tsdbGetFileBlocksDistInfo(STsdbReader* pReader, STableBlockDistInfo* pTableBlockInfo) {
  // The row range [minRows, maxRows] is split into this many buckets.
  // NOTE(review): assumes blockRowsHisto has at least this many slots - confirm
  // against the STableBlockDistInfo declaration.
  const int32_t numOfBuckets = 20;

  int32_t code = TSDB_CODE_SUCCESS;
  pTableBlockInfo->totalSize = 0;
  pTableBlockInfo->totalRows = 0;
  pTableBlockInfo->numOfVgroups = 1;

  // find the start data block in file

  tsdbAcquireReader(pReader);
  if (pReader->suspended) {
    code = tsdbReaderResume(pReader);
    if (code != TSDB_CODE_SUCCESS) {
      tsdbReleaseReader(pReader);
      return code;
    }
  }
  SReaderStatus* pStatus = &pReader->status;

  STsdbCfg* pc = &pReader->pTsdb->pVnode->config.tsdbCfg;
  pTableBlockInfo->defMinRows = pc->minRows;
  pTableBlockInfo->defMaxRows = pc->maxRows;

  int32_t bucketRange = ceil((pc->maxRows - pc->minRows) / (double)numOfBuckets);

  pTableBlockInfo->numOfFiles += 1;

  int32_t numOfTables = (int32_t)taosHashGetSize(pStatus->pTableMap);
  int     defaultRows = 4096;

  SDataBlockIter* pBlockIter = &pStatus->blockIter;
  pTableBlockInfo->numOfFiles += pStatus->fileIter.numOfFiles;

  if (pBlockIter->numOfBlocks > 0) {
    pTableBlockInfo->numOfBlocks += pBlockIter->numOfBlocks;
  }

  pTableBlockInfo->numOfTables = numOfTables;
  bool hasNext = (pBlockIter->numOfBlocks > 0);

  while (true) {
    if (hasNext) {
      SDataBlk* pBlock = getCurrentBlock(pBlockIter);

      int32_t numOfRows = pBlock->nRow;
      pTableBlockInfo->totalRows += numOfRows;

      if (numOfRows > pTableBlockInfo->maxRows) {
        pTableBlockInfo->maxRows = numOfRows;
      }

      if (numOfRows < pTableBlockInfo->minRows) {
        pTableBlockInfo->minRows = numOfRows;
      }

      if (numOfRows < defaultRows) {
        pTableBlockInfo->numOfSmallBlocks += 1;
      }

      pTableBlockInfo->totalSize += pBlock->aSubBlock[0].szBlock;

      // Keep the histogram index inside [0, numOfBuckets): blocks smaller than
      // defMinRows go to the first bucket, blocks at or above the upper end go
      // to the last one, and a zero-width bucket range never reaches the
      // division.
      int32_t bucketIndex = 0;
      if (bucketRange > 0 && numOfRows > pTableBlockInfo->defMinRows) {
        bucketIndex = getBucketIndex(pTableBlockInfo->defMinRows, bucketRange, numOfRows);
        if (bucketIndex >= numOfBuckets) {
          bucketIndex = numOfBuckets - 1;
        }
      }
      pTableBlockInfo->blockRowsHisto[bucketIndex]++;

      hasNext = blockIteratorNext(&pStatus->blockIter, pReader->idStr);
    } else {
      // current block list is exhausted: move on to the next file, if any
      code = initForFirstBlockInFile(pReader, pBlockIter);
      if ((code != TSDB_CODE_SUCCESS) || (pStatus->loadFromFile == false)) {
        break;
      }

      pTableBlockInfo->numOfBlocks += pBlockIter->numOfBlocks;
      hasNext = (pBlockIter->numOfBlocks > 0);
    }
  }
  tsdbReleaseReader(pReader);
  return code;
}
H
Hongze Cheng 已提交
4803

H
refact  
Hongze Cheng 已提交
4804
// Sums the rows that the snapshot's memtable and immutable memtable hold for
// every table in the reader's table map.
//
// The reader lock is held for the duration of the call, and a suspended reader
// is resumed first. If resuming fails, terrno is set to the resume error and 0
// is returned. The walk uses status.pTableIter as its cursor, so that field is
// NULL once the call has visited every table.
int64_t tsdbGetNumOfRowsInMemTable(STsdbReader* pReader) {
  int64_t rows = 0;

  SReaderStatus* pStatus = &pReader->status;
  tsdbAcquireReader(pReader);
  if (pReader->suspended) {
    int32_t code = tsdbReaderResume(pReader);
    if (code != TSDB_CODE_SUCCESS) {
      // no snapshot is available to count from
      terrno = code;
      tsdbReleaseReader(pReader);
      return rows;
    }
  }

  STsdbReadSnap* pSnap = pReader->pReadSnap;

  pStatus->pTableIter = taosHashIterate(pStatus->pTableMap, NULL);

  while (pStatus->pTableIter != NULL) {
    STableBlockScanInfo* pBlockScanInfo = *(STableBlockScanInfo**)pStatus->pTableIter;

    if (pSnap != NULL && pSnap->pMem != NULL) {
      STbData* d = tsdbGetTbDataFromMemTable(pSnap->pMem, pReader->suid, pBlockScanInfo->uid);
      if (d != NULL) {
        rows += tsdbGetNRowsInTbData(d);
      }
    }

    if (pSnap != NULL && pSnap->pIMem != NULL) {
      STbData* di = tsdbGetTbDataFromMemTable(pSnap->pIMem, pReader->suid, pBlockScanInfo->uid);
      if (di != NULL) {
        rows += tsdbGetNRowsInTbData(di);
      }
    }

    // current table is exhausted, let's try the next table
    pStatus->pTableIter = taosHashIterate(pStatus->pTableMap, pStatus->pTableIter);
  }

  tsdbReleaseReader(pReader);

  return rows;
}
D
dapan1121 已提交
4842

L
Liu Jicong 已提交
4843
// Resolves the schema of table `uid`.
//
// *suid is set to 0, or for a child table to the suid stored in its entry. The
// schema version is taken from the entry looked up under that suid for a child
// table, and from the table's own entry for a normal table.
// *pSchema receives the result of metaGetTbTSchema for that version.
//
// Returns TSDB_CODE_SUCCESS on success. On failure the value of terrno is
// returned: TSDB_CODE_TDB_INVALID_TABLE_ID is stored there when a meta lookup
// fails, TSDB_CODE_INVALID_PARA when the table is neither a child table nor a
// normal table.
int32_t tsdbGetTableSchema(SVnode* pVnode, int64_t uid, STSchema** pSchema, int64_t* suid) {
  int32_t     schemaVer = 1;
  SMetaReader reader = {0};

  metaReaderInit(&reader, pVnode->pMeta, 0);

  if (metaGetTableEntryByUidCache(&reader, uid) != TSDB_CODE_SUCCESS) {
    terrno = TSDB_CODE_TDB_INVALID_TABLE_ID;
    goto _fail;
  }

  *suid = 0;

  if (reader.me.type == TSDB_CHILD_TABLE) {
    tDecoderClear(&reader.coder);
    *suid = reader.me.ctbEntry.suid;

    // second lookup, this time by the suid
    if (metaGetTableEntryByUidCache(&reader, *suid) != TSDB_CODE_SUCCESS) {
      terrno = TSDB_CODE_TDB_INVALID_TABLE_ID;
      goto _fail;
    }

    schemaVer = reader.me.stbEntry.schemaRow.version;
  } else if (reader.me.type == TSDB_NORMAL_TABLE) {
    schemaVer = reader.me.ntbEntry.schemaRow.version;
  } else {
    terrno = TSDB_CODE_INVALID_PARA;
    goto _fail;
  }

  metaReaderClear(&reader);
  *pSchema = metaGetTbTSchema(pVnode->pMeta, uid, schemaVer, 1);

  return TSDB_CODE_SUCCESS;

_fail:
  metaReaderClear(&reader);
  return terrno;
}
H
Hongze Cheng 已提交
4880

H
Hongze Cheng 已提交
4881
// Takes a read snapshot of the tsdb for `pReader`.
//
// Under the tsdb read lock the snapshot records the memtable and the immutable
// memtable (each only when its version range overlaps pReader->verRange) and a
// reference to the file system state. For every recorded memtable a query node
// carrying `pReader` and `reseek` is allocated and passed to tsdbRefMemTable.
//
// Everything that can fail (node allocation, tsdbFSRef) is done before any
// memtable is referenced. tsdbUntakeReadSnap pairs each reference with
// tsdbUnrefMemTable before freeing the node; the failure path here frees the
// nodes directly, so it must not run once a node has been passed to
// tsdbRefMemTable.
//
// On success *ppSnap receives the snapshot and 0 is returned. On failure
// *ppSnap is set to NULL, everything allocated here is freed, and
// TSDB_CODE_OUT_OF_MEMORY or the code from tsdbFSRef is returned.
int32_t tsdbTakeReadSnap(STsdbReader* pReader, _query_reseek_func_t reseek, STsdbReadSnap** ppSnap) {
  int32_t        code = 0;
  STsdb*         pTsdb = pReader->pTsdb;
  SVersionRange* pRange = &pReader->verRange;
  bool           useMem = false;
  bool           useIMem = false;

  // alloc
  STsdbReadSnap* pSnap = (STsdbReadSnap*)taosMemoryCalloc(1, sizeof(*pSnap));
  if (pSnap == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _exit;
  }

  // lock
  taosThreadRwlockRdlock(&pTsdb->rwLock);

  // decide which memtables belong to the snapshot
  useMem = pTsdb->mem && (pRange->minVer <= pTsdb->mem->maxVer && pRange->maxVer >= pTsdb->mem->minVer);
  useIMem = pTsdb->imem && (pRange->minVer <= pTsdb->imem->maxVer && pRange->maxVer >= pTsdb->imem->minVer);

  // fallible steps first: query nodes ...
  if (useMem) {
    pSnap->pNode = taosMemoryMalloc(sizeof(*pSnap->pNode));
    if (pSnap->pNode == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto _unlock;
    }
    pSnap->pNode->pQHandle = pReader;
    pSnap->pNode->reseek = reseek;
  }

  if (useIMem) {
    pSnap->pINode = taosMemoryMalloc(sizeof(*pSnap->pINode));
    if (pSnap->pINode == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto _unlock;
    }
    pSnap->pINode->pQHandle = pReader;
    pSnap->pINode->reseek = reseek;
  }

  // ... and the fs reference
  code = tsdbFSRef(pTsdb, &pSnap->fs);
  if (code) {
    goto _unlock;
  }

  // nothing below can fail, so the memtables can be referenced now
  if (useMem) {
    pSnap->pMem = pTsdb->mem;
    tsdbRefMemTable(pTsdb->mem, pSnap->pNode);
  }

  if (useIMem) {
    pSnap->pIMem = pTsdb->imem;
    tsdbRefMemTable(pTsdb->imem, pSnap->pINode);
  }

_unlock:
  taosThreadRwlockUnlock(&pTsdb->rwLock);

  if (code == 0) {
    tsdbTrace("vgId:%d, take read snapshot", TD_VID(pTsdb->pVnode));
  }

_exit:
  if (code) {
    *ppSnap = NULL;
    if (pSnap) {
      if (pSnap->pNode) taosMemoryFree(pSnap->pNode);
      if (pSnap->pINode) taosMemoryFree(pSnap->pINode);
      taosMemoryFree(pSnap);
    }
  } else {
    *ppSnap = pSnap;
  }
  return code;
}

4951
// Gives back a snapshot obtained from tsdbTakeReadSnap.
//
// For a non-NULL snapshot: each recorded memtable is unreferenced together with
// its query node (`proactive` is forwarded unchanged), the file system reference
// is dropped, and the two nodes and the snapshot itself are freed. A NULL
// snapshot is accepted; only the trace message is written in that case.
void tsdbUntakeReadSnap(STsdbReader* pReader, STsdbReadSnap* pSnap, bool proactive) {
  STsdb* pTsdb = pReader->pTsdb;

  if (pSnap != NULL) {
    // memtable references first, while their nodes are still alive
    if (pSnap->pMem != NULL) {
      tsdbUnrefMemTable(pSnap->pMem, pSnap->pNode, proactive);
    }
    if (pSnap->pIMem != NULL) {
      tsdbUnrefMemTable(pSnap->pIMem, pSnap->pINode, proactive);
    }

    tsdbFSUnref(pTsdb, &pSnap->fs);

    if (pSnap->pNode != NULL) {
      taosMemoryFree(pSnap->pNode);
    }
    if (pSnap->pINode != NULL) {
      taosMemoryFree(pSnap->pINode);
    }
    taosMemoryFree(pSnap);
  }

  tsdbTrace("vgId:%d, untake read snapshot", TD_VID(pTsdb->pVnode));
}