tsdbRead.c 163.4 KB
Newer Older
H
hjxilinx 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

H
Haojun Liao 已提交
16
#include "osDef.h"
H
Hongze Cheng 已提交
17
#include "tsdb.h"
18

H
Hongze Cheng 已提交
19
// True when the traversal order `o` is ascending. The argument is parenthesized so that compound
// expressions (e.g. a conditional) are compared as a whole against TSDB_ORDER_ASC.
#define ASCENDING_TRAVERSE(o) ((o) == TSDB_ORDER_ASC)
H
Hongze Cheng 已提交
20

21 22 23 24 25 26
// Tags for the three external row sections: previous, main and next.
// NOTE(review): the values are 1, 2, 3, so NEXT == PREV | MAIN — they cannot be combined as bit flags.
typedef enum {
  EXTERNAL_ROWS_PREV = 1,
  EXTERNAL_ROWS_MAIN = 2,
  EXTERNAL_ROWS_NEXT = 3,
} EContentData;

27
typedef struct {
dengyihao's avatar
dengyihao 已提交
28
  STbDataIter* iter;
29 30 31 32
  int32_t      index;
  bool         hasVal;
} SIterInfo;

33 34
// Pair of counters: number of data blocks and number of last (stt) files.
// NOTE(review): not referenced in this part of the file — confirm what exactly is counted.
typedef struct {
  int32_t numOfBlocks;
  int32_t numOfLastFiles;
} SBlockNumber;

38
typedef struct SBlockIndex {
39 40
  int32_t     ordinalIndex;
  int64_t     inFileOffset;
H
Haojun Liao 已提交
41
  STimeWindow window;  // todo replace it with overlap flag.
42 43
} SBlockIndex;

H
Haojun Liao 已提交
44
typedef struct STableBlockScanInfo {
dengyihao's avatar
dengyihao 已提交
45 46
  uint64_t  uid;
  TSKEY     lastKey;
H
Hongze Cheng 已提交
47
  SMapData  mapData;            // block info (compressed)
48
  SArray*   pBlockList;         // block data index list, SArray<SBlockIndex>
H
Hongze Cheng 已提交
49 50 51 52 53 54
  SIterInfo iter;               // mem buffer skip list iterator
  SIterInfo iiter;              // imem buffer skip list iterator
  SArray*   delSkyline;         // delete info for this table
  int32_t   fileDelIndex;       // file block delete index
  int32_t   lastBlockDelIndex;  // delete index for last block
  bool      iterInit;           // whether to initialize the in-memory skip list iterator or not
H
Haojun Liao 已提交
55 56 57
} STableBlockScanInfo;

typedef struct SBlockOrderWrapper {
dengyihao's avatar
dengyihao 已提交
58
  int64_t uid;
59
  int64_t offset;
H
Haojun Liao 已提交
60
} SBlockOrderWrapper;
H
Hongze Cheng 已提交
61 62

typedef struct SBlockOrderSupporter {
63 64 65 66
  SBlockOrderWrapper** pDataBlockInfo;
  int32_t*             indexPerTable;
  int32_t*             numOfBlocksPerTable;
  int32_t              numOfTables;
H
Hongze Cheng 已提交
67 68 69
} SBlockOrderSupporter;

typedef struct SIOCostSummary {
70 71 72
  int64_t numOfBlocks;
  double  blockLoadTime;
  double  buildmemBlock;
73
  int64_t headFileLoad;
74
  double  headFileLoadTime;
75
  int64_t smaDataLoad;
76
  double  smaLoadTime;
77 78
  int64_t lastBlockLoad;
  double  lastBlockLoadTime;
H
Haojun Liao 已提交
79 80
  int64_t composedBlocks;
  double  buildComposedBlockTime;
H
Haojun Liao 已提交
81
  double  createScanInfoList;
X
Xiaoyu Wang 已提交
82 83 84
  //  double  getTbFromMemTime;
  //  double  getTbFromIMemTime;
  double initDelSkylineIterTime;
H
Hongze Cheng 已提交
85 86 87
} SIOCostSummary;

typedef struct SBlockLoadSuppInfo {
88 89 90 91 92 93 94
  SArray*        pColAgg;
  SColumnDataAgg tsColAgg;
  int16_t*       colId;
  int16_t*       slotId;
  int32_t        numOfCols;
  char**         buildBuf;  // build string tmp buffer, todo remove it later after all string format being updated.
  bool           smaValid;  // the sma on all queried columns are activated
H
Hongze Cheng 已提交
95 96
} SBlockLoadSuppInfo;

97
typedef struct SLastBlockReader {
H
Hongze Cheng 已提交
98 99 100 101 102
  STimeWindow        window;
  SVersionRange      verRange;
  int32_t            order;
  uint64_t           uid;
  SMergeTree         mergeTree;
103
  SSttBlockLoadInfo* pInfo;
104 105
} SLastBlockReader;

106
typedef struct SFilesetIter {
H
Hongze Cheng 已提交
107 108 109
  int32_t           numOfFiles;  // number of total files
  int32_t           index;       // current accessed index in the list
  SArray*           pFileList;   // data file list
110
  int32_t           order;
H
Hongze Cheng 已提交
111
  SLastBlockReader* pLastBlockReader;  // last file block reader
112
} SFilesetIter;
H
Haojun Liao 已提交
113 114

typedef struct SFileDataBlockInfo {
115
  // index position in STableBlockScanInfo in order to check whether neighbor block overlaps with it
dengyihao's avatar
dengyihao 已提交
116
  uint64_t uid;
117
  int32_t  tbBlockIdx;
H
Haojun Liao 已提交
118 119 120
} SFileDataBlockInfo;

typedef struct SDataBlockIter {
121
  int32_t   numOfBlocks;
122
  int32_t   index;
H
Hongze Cheng 已提交
123
  SArray*   blockList;  // SArray<SFileDataBlockInfo>
124
  int32_t   order;
H
Hongze Cheng 已提交
125
  SDataBlk  block;  // current SDataBlk data
126
  SHashObj* pTableMap;
H
Haojun Liao 已提交
127 128 129
} SDataBlockIter;

typedef struct SFileBlockDumpInfo {
dengyihao's avatar
dengyihao 已提交
130 131 132 133
  int32_t totalRows;
  int32_t rowIndex;
  int64_t lastKey;
  bool    allDumped;
H
Haojun Liao 已提交
134 135
} SFileBlockDumpInfo;

136
typedef struct STableUidList {
137 138
  uint64_t* tableUidList;  // access table uid list in uid ascending order list
  int32_t   currentIndex;  // index in table uid list
139
} STableUidList;
140

H
Haojun Liao 已提交
141
typedef struct SReaderStatus {
H
Hongze Cheng 已提交
142 143 144
  bool                  loadFromFile;       // check file stage
  bool                  composedDataBlock;  // the returned data block is a composed block or not
  SHashObj*             pTableMap;          // SHash<STableBlockScanInfo>
145
  STableBlockScanInfo** pTableIter;         // table iterator used in building in-memory buffer data blocks.
146
  STableUidList         uidList;            // check tables in uid order, to avoid the repeatly load of blocks in STT.
H
Hongze Cheng 已提交
147 148 149 150 151
  SFileBlockDumpInfo    fBlockDumpInfo;
  SDFileSet*            pCurrentFileset;  // current opened file set
  SBlockData            fileBlockData;
  SFilesetIter          fileIter;
  SDataBlockIter        blockIter;
H
Haojun Liao 已提交
152 153
} SReaderStatus;

154
typedef struct SBlockInfoBuf {
H
Hongze Cheng 已提交
155 156 157
  int32_t currentIndex;
  SArray* pData;
  int32_t numPerBucket;
158 159
} SBlockInfoBuf;

H
Hongze Cheng 已提交
160
struct STsdbReader {
H
Haojun Liao 已提交
161
  STsdb*             pTsdb;
162 163 164
  SVersionRange      verRange;
  TdThreadMutex      readerMutex;
  bool               suspended;
H
Haojun Liao 已提交
165 166
  uint64_t           suid;
  int16_t            order;
H
Haojun Liao 已提交
167
  bool               freeBlock;
H
Haojun Liao 已提交
168 169 170 171
  STimeWindow        window;  // the primary query time window that applies to all queries
  SSDataBlock*       pResBlock;
  int32_t            capacity;
  SReaderStatus      status;
172 173
  char*              idStr;  // query info handle, for debug purpose
  int32_t            type;   // query type: 1. retrieve all data blocks, 2. retrieve direct prev|next rows
H
Hongze Cheng 已提交
174
  SBlockLoadSuppInfo suppInfo;
H
Hongze Cheng 已提交
175
  STsdbReadSnap*     pReadSnap;
176
  SIOCostSummary     cost;
177 178 179 180 181
  STSchema*          pSchema;      // the newest version schema
  STSchema*          pMemSchema;   // the previous schema for in-memory data, to avoid load schema too many times
  SDataFReader*      pFileReader;  // the file reader
  SDelFReader*       pDelFReader;  // the del file reader
  SArray*            pDelIdx;      // del file block index;
182 183 184
  SBlockInfoBuf      blockInfoBuf;
  int32_t            step;
  STsdbReader*       innerReader[2];
H
Hongze Cheng 已提交
185
};
H
Hongze Cheng 已提交
186

H
Haojun Liao 已提交
187
static SFileDataBlockInfo* getCurrentBlockInfo(SDataBlockIter* pBlockIter);
188 189
static int      buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, int64_t endKey, int32_t capacity,
                                          STsdbReader* pReader);
190
static TSDBROW* getValidMemRow(SIterInfo* pIter, const SArray* pDelList, STsdbReader* pReader);
191 192
static int32_t  doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pScanInfo, STsdbReader* pReader,
                                        SRowMerger* pMerger);
H
Hongze Cheng 已提交
193
static int32_t  doMergeRowsInLastBlock(SLastBlockReader* pLastBlockReader, STableBlockScanInfo* pScanInfo, int64_t ts,
194
                                       SRowMerger* pMerger, SVersionRange* pVerRange);
195
static int32_t  doMergeRowsInBuf(SIterInfo* pIter, uint64_t uid, int64_t ts, SArray* pDelList, SRowMerger* pMerger,
dengyihao's avatar
dengyihao 已提交
196
                                 STsdbReader* pReader);
H
Hongze Cheng 已提交
197
static int32_t  doAppendRowFromTSRow(SSDataBlock* pBlock, STsdbReader* pReader, SRow* pTSRow,
H
Hongze Cheng 已提交
198
                                     STableBlockScanInfo* pInfo);
199
static int32_t  doAppendRowFromFileBlock(SSDataBlock* pResBlock, STsdbReader* pReader, SBlockData* pBlockData,
H
Hongze Cheng 已提交
200
                                         int32_t rowIndex);
201
static void     setComposedBlockFlag(STsdbReader* pReader, bool composed);
H
Hongze Cheng 已提交
202 203
static bool     hasBeenDropped(const SArray* pDelList, int32_t* index, TSDBKEY* pKey, int32_t order,
                               SVersionRange* pVerRange);
204

H
Hongze Cheng 已提交
205
static int32_t doMergeMemTableMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter, SArray* pDelList,
206
                                        TSDBROW* pTSRow, STsdbReader* pReader, bool* freeTSRow);
H
Hongze Cheng 已提交
207
static int32_t doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo* pBlockScanInfo,
H
Hongze Cheng 已提交
208
                                  STsdbReader* pReader, SRow** pTSRow);
209 210
static int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pBlockScanInfo, int64_t key,
                                     STsdbReader* pReader);
211

dengyihao's avatar
dengyihao 已提交
212 213 214 215
static int32_t initDelSkylineIterator(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader, STbData* pMemTbData,
                                      STbData* piMemTbData);
static STsdb*  getTsdbByRetentions(SVnode* pVnode, TSKEY winSKey, SRetention* retentions, const char* idstr,
                                   int8_t* pLevel);
216
static SVersionRange getQueryVerRange(SVnode* pVnode, SQueryTableDataCond* pCond, int8_t level);
H
Hongze Cheng 已提交
217 218 219
static int64_t       getCurrentKeyInLastBlock(SLastBlockReader* pLastBlockReader);
static bool          hasDataInLastBlock(SLastBlockReader* pLastBlockReader);
static int32_t       doBuildDataBlock(STsdbReader* pReader);
C
Cary Xu 已提交
220
static TSDBKEY       getCurrentKeyInBuf(STableBlockScanInfo* pScanInfo, STsdbReader* pReader);
221
static bool          hasDataInFileBlock(const SBlockData* pBlockData, const SFileBlockDumpInfo* pDumpInfo);
222
static void          initBlockDumpInfo(STsdbReader* pReader, SDataBlockIter* pBlockIter);
223
static int32_t       getInitialDelIndex(const SArray* pDelSkyline, int32_t order);
C
Cary Xu 已提交
224

225 226
static FORCE_INLINE STSchema* getLatestTableSchema(STsdbReader* pReader, uint64_t uid);

C
Cary Xu 已提交
227
// True when ts falls outside the closed interval [pWindow->skey, pWindow->ekey].
static bool outOfTimeWindow(int64_t ts, STimeWindow* pWindow) {
  bool inside = (ts >= pWindow->skey) && (ts <= pWindow->ekey);
  return !inside;
}
H
Haojun Liao 已提交
228

229 230
/*
 * Build the column lookup tables of pSupInfo for the numOfCols queried columns.
 *
 * colId, slotId and buildBuf are carved out of ONE zero-initialized allocation headed by pSupInfo->colId:
 *   [int16_t colId[n]] [int16_t slotId[n]] [padding] [char* buildBuf[n]]
 * The padding rounds the buildBuf offset up to a multiple of the pointer size. Without it, buildBuf would sit at
 * offset 4*n, which is misaligned for pointers whenever n is odd.
 *
 * Variable-length columns get a scratch buffer of pCols[i].bytes bytes in buildBuf[i]; other entries are NULL.
 * smaValid is set to true.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY. numOfCols is published only after the tables exist.
 * Because the table is zeroed, any buildBuf entry not allocated before a failure is NULL.
 */
static int32_t setColumnIdSlotList(SBlockLoadSuppInfo* pSupInfo, SColumnInfo* pCols, const int32_t* pSlotIdList,
                                   int32_t numOfCols) {
  pSupInfo->smaValid = true;

  size_t idBytes = sizeof(int16_t) * 2 * (size_t)numOfCols;
  size_t bufOffset = (idBytes + POINTER_BYTES - 1) / POINTER_BYTES * POINTER_BYTES;
  size_t totalBytes = bufOffset + (size_t)numOfCols * POINTER_BYTES;

  pSupInfo->colId = taosMemoryCalloc(1, totalBytes);
  if (pSupInfo->colId == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  pSupInfo->numOfCols = numOfCols;
  pSupInfo->slotId = (int16_t*)((char*)pSupInfo->colId + (sizeof(int16_t) * numOfCols));
  pSupInfo->buildBuf = (char**)((char*)pSupInfo->colId + bufOffset);

  for (int32_t i = 0; i < numOfCols; ++i) {
    pSupInfo->colId[i] = pCols[i].colId;
    pSupInfo->slotId[i] = pSlotIdList[i];

    if (IS_VAR_DATA_TYPE(pCols[i].type)) {
      pSupInfo->buildBuf[i] = taosMemoryMalloc(pCols[i].bytes);
      if (pSupInfo->buildBuf[i] == NULL) {
        return TSDB_CODE_OUT_OF_MEMORY;
      }
    } else {
      pSupInfo->buildBuf[i] = NULL;
    }
  }

  return TSDB_CODE_SUCCESS;
}
H
Hongze Cheng 已提交
254

H
Haojun Liao 已提交
255
/*
 * Decide whether block-level SMA can be used for every queried column of pSupInfo.
 *
 * The schema columns and pSupInfo->colId are walked together in one merge pass. The pass is only correct if both
 * lists are in ascending column-id order.
 * If a queried column has BSMA switched off, pSupInfo->smaValid is cleared and the walk stops.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_INVALID_PARA when a queried column id is passed over by the schema walk,
 * i.e. it is not in the schema.
 * NOTE(review): queried ids larger than every schema id are not detected — confirm this is intended.
 */
static int32_t updateBlockSMAInfo(STSchema* pSchema, SBlockLoadSuppInfo* pSupInfo) {
  int32_t i = 0, j = 0;

  while (i < pSchema->numOfCols && j < pSupInfo->numOfCols) {
    STColumn* pTCol = &pSchema->columns[i];
    if (pTCol->colId == pSupInfo->colId[j]) {
      if (!IS_BSMA_ON(pTCol)) {
        pSupInfo->smaValid = false;
        return TSDB_CODE_SUCCESS;
      }

      i += 1;
      j += 1;
    } else if (pTCol->colId < pSupInfo->colId[j]) {
      // schema column that is not queried: skip it
      i += 1;
    } else {
      // queried column id missing from the schema
      return TSDB_CODE_INVALID_PARA;
    }
  }

  return TSDB_CODE_SUCCESS;
}

279
/*
 * Allocate zeroed STableBlockScanInfo storage for numOfTables tables.
 *
 * Entries are grouped in buckets of pBuf->numPerBucket; the last bucket holds the remainder. Bucket pointers are
 * appended to pBuf->pData, which is created on first use.
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_INVALID_PARA for a non-positive bucket size, or TSDB_CODE_OUT_OF_MEMORY.
 * Buckets pushed before a failure remain owned by pBuf; a bucket whose push fails is freed here.
 */
static int32_t initBlockScanInfoBuf(SBlockInfoBuf* pBuf, int32_t numOfTables) {
  if (pBuf->numPerBucket <= 0) {
    return TSDB_CODE_INVALID_PARA;
  }

  if (pBuf->pData == NULL) {
    pBuf->pData = taosArrayInit(numOfTables / pBuf->numPerBucket + 1, POINTER_BYTES);
    if (pBuf->pData == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }
  }

  for (int32_t remain = numOfTables; remain > 0; remain -= pBuf->numPerBucket) {
    int32_t numInBucket = (remain < pBuf->numPerBucket) ? remain : pBuf->numPerBucket;
    char*   pBucket = taosMemoryCalloc(numInBucket, sizeof(STableBlockScanInfo));
    if (pBucket == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }

    if (taosArrayPush(pBuf->pData, &pBucket) == NULL) {
      taosMemoryFree(pBucket);  // not yet owned by pData
      return TSDB_CODE_OUT_OF_MEMORY;
    }
  }

  return TSDB_CODE_SUCCESS;
}
H
Hongze Cheng 已提交
305

306 307
/*
 * Free every bucket of pBuf and then the bucket array itself.
 * pBuf->pData is reset to NULL so that a later initBlockScanInfoBuf() creates a fresh array instead of pushing
 * into a freed one.
 */
static void clearBlockScanInfoBuf(SBlockInfoBuf* pBuf) {
  size_t num = taosArrayGetSize(pBuf->pData);
  for (size_t i = 0; i < num; ++i) {
    char** p = taosArrayGet(pBuf->pData, i);
    taosMemoryFree(*p);
  }

  pBuf->pData = taosArrayDestroy(pBuf->pData);
}

/*
 * Address of the index-th STableBlockScanInfo slot in pBuf: bucket index / numPerBucket, offset index % numPerBucket.
 * Returns NULL when the bucket does not exist (e.g. the buffer was not fully allocated).
 */
static void* getPosInBlockInfoBuf(SBlockInfoBuf* pBuf, int32_t index) {
  int32_t bucketIndex = index / pBuf->numPerBucket;
  char**  pBucket = taosArrayGet(pBuf->pData, bucketIndex);
  if (pBucket == NULL || *pBucket == NULL) {
    return NULL;
  }

  return (*pBucket) + (index % pBuf->numPerBucket) * sizeof(STableBlockScanInfo);
}

H
Haojun Liao 已提交
322 323 324 325 326 327 328 329 330 331
/*
 * qsort-style comparator for uint64_t table uids in ascending order: -1, 0 or 1.
 * Comparisons are used instead of subtraction, which would wrap for unsigned 64-bit values.
 */
static int32_t uidComparFunc(const void* p1, const void* p2) {
  const uint64_t lhs = *(const uint64_t*)p1;
  const uint64_t rhs = *(const uint64_t*)p2;
  return (lhs > rhs) - (lhs < rhs);
}

332
// NOTE: speedup the whole processing by preparing the buffer for STableBlockScanInfo in batch model
H
Hongze Cheng 已提交
333
static SHashObj* createDataBlockScanInfo(STsdbReader* pTsdbReader, SBlockInfoBuf* pBuf, const STableKeyInfo* idList,
X
Xiaoyu Wang 已提交
334
                                         STableUidList* pUidList, int32_t numOfTables) {
H
Haojun Liao 已提交
335
  // allocate buffer in order to load data blocks from file
336
  // todo use simple hash instead, optimize the memory consumption
337 338 339
  SHashObj* pTableMap =
      taosHashInit(numOfTables, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
  if (pTableMap == NULL) {
H
Haojun Liao 已提交
340 341 342
    return NULL;
  }

H
Haojun Liao 已提交
343
  int64_t st = taosGetTimestampUs();
H
Haojun Liao 已提交
344
  initBlockScanInfoBuf(pBuf, numOfTables);
H
Haojun Liao 已提交
345

H
Haojun Liao 已提交
346 347
  pUidList->tableUidList = taosMemoryMalloc(numOfTables * sizeof(uint64_t));
  if (pUidList->tableUidList == NULL) {
H
Haojun Liao 已提交
348 349
    return NULL;
  }
H
Haojun Liao 已提交
350
  pUidList->currentIndex = 0;
H
Haojun Liao 已提交
351

352
  for (int32_t j = 0; j < numOfTables; ++j) {
H
Haojun Liao 已提交
353
    STableBlockScanInfo* pScanInfo = getPosInBlockInfoBuf(pBuf, j);
H
Haojun Liao 已提交
354

355
    pScanInfo->uid = idList[j].uid;
H
Haojun Liao 已提交
356
    pUidList->tableUidList[j] = idList[j].uid;
H
Haojun Liao 已提交
357

358
    if (ASCENDING_TRAVERSE(pTsdbReader->order)) {
H
Haojun Liao 已提交
359
      int64_t skey = pTsdbReader->window.skey;
360
      pScanInfo->lastKey = (skey > INT64_MIN) ? (skey - 1) : skey;
wmmhello's avatar
wmmhello 已提交
361
    } else {
H
Haojun Liao 已提交
362
      int64_t ekey = pTsdbReader->window.ekey;
363
      pScanInfo->lastKey = (ekey < INT64_MAX) ? (ekey + 1) : ekey;
H
Haojun Liao 已提交
364
    }
wmmhello's avatar
wmmhello 已提交
365

366
    taosHashPut(pTableMap, &pScanInfo->uid, sizeof(uint64_t), &pScanInfo, POINTER_BYTES);
H
Hongze Cheng 已提交
367 368
    tsdbTrace("%p check table uid:%" PRId64 " from lastKey:%" PRId64 " %s", pTsdbReader, pScanInfo->uid,
              pScanInfo->lastKey, pTsdbReader->idStr);
H
Haojun Liao 已提交
369 370
  }

H
Haojun Liao 已提交
371
  taosSort(pUidList->tableUidList, numOfTables, sizeof(uint64_t), uidComparFunc);
H
Haojun Liao 已提交
372

H
Haojun Liao 已提交
373 374 375 376
  pTsdbReader->cost.createScanInfoList = (taosGetTimestampUs() - st) / 1000.0;
  tsdbDebug("%p create %d tables scan-info, size:%.2f Kb, elapsed time:%.2f ms, %s", pTsdbReader, numOfTables,
            (sizeof(STableBlockScanInfo) * numOfTables) / 1024.0, pTsdbReader->cost.createScanInfoList,
            pTsdbReader->idStr);
377

378
  return pTableMap;
H
Hongze Cheng 已提交
379
}
H
Hongze Cheng 已提交
380

381 382
/*
 * Rewind the scan state of every table in pTableMap so the tables can be scanned again from key ts.
 *
 * Both in-memory iterators are destroyed and iterInit is cleared, so they will be initialized again. The delete
 * skyline is released and lastKey is set to ts. pBlockList and mapData are kept.
 */
static void resetAllDataBlockScanInfo(SHashObj* pTableMap, int64_t ts) {
  STableBlockScanInfo** p = NULL;
  while ((p = taosHashIterate(pTableMap, p)) != NULL) {
    STableBlockScanInfo* pInfo = *p;

    pInfo->iterInit = false;
    pInfo->iter.hasVal = false;
    pInfo->iiter.hasVal = false;

    if (pInfo->iter.iter != NULL) {
      pInfo->iter.iter = tsdbTbDataIterDestroy(pInfo->iter.iter);
    }

    if (pInfo->iiter.iter != NULL) {
      pInfo->iiter.iter = tsdbTbDataIterDestroy(pInfo->iiter.iter);
    }

    pInfo->delSkyline = taosArrayDestroy(pInfo->delSkyline);
    pInfo->lastKey = ts;
  }
}

403 404
/*
 * Release everything a table's scan info owns: both in-memory iterators, the delete skyline, the block index list
 * and the compressed block map. Pointers are reset and the struct itself is not freed.
 */
static void clearBlockScanInfo(STableBlockScanInfo* p) {
  p->iterInit = false;

  p->iter.hasVal = false;
  p->iiter.hasVal = false;

  if (p->iter.iter != NULL) {
    p->iter.iter = tsdbTbDataIterDestroy(p->iter.iter);
  }

  if (p->iiter.iter != NULL) {
    p->iiter.iter = tsdbTbDataIterDestroy(p->iiter.iter);
  }

  p->delSkyline = taosArrayDestroy(p->delSkyline);
  p->pBlockList = taosArrayDestroy(p->pBlockList);
  tMapDataClear(&p->mapData);
}
421

H
Haojun Liao 已提交
422
/*
 * Clear every scan info referenced by pTableMap (see clearBlockScanInfo) and then destroy the map.
 * The scan-info structs themselves are not freed here.
 */
static void destroyAllBlockScanInfo(SHashObj* pTableMap) {
  void* pIter = NULL;
  while ((pIter = taosHashIterate(pTableMap, pIter)) != NULL) {
    clearBlockScanInfo(*(STableBlockScanInfo**)pIter);
  }

  taosHashCleanup(pTableMap);
}

431
// A window is empty when its end key precedes its start key; skey == ekey is a non-empty, single-tick window.
static bool isEmptyQueryTimeWindow(STimeWindow* pWindow) {
  return pWindow->ekey < pWindow->skey;
}
H
Hongze Cheng 已提交
432

433 434 435
/*
 * Update the query time window according to the data time to live (TTL) information, so that expired data is not
 * returned to the client even if it is still stored.
 *
 * The start key is raised to (now - keep2 * ticks-per-minute + 1) at the database precision; keep2 is therefore
 * presumably expressed in minutes. The end key is left unchanged, so the result may be an empty window.
 */
static STimeWindow updateQueryTimeWindow(STsdb* pTsdb, STimeWindow* pWindow) {
  STsdbKeepCfg* pCfg = &pTsdb->keepCfg;

  int64_t now = taosGetTimestamp(pCfg->precision);
  int64_t earliestTs = now - (tsTickPerMin[pCfg->precision] * pCfg->keep2) + 1;  // needs to add one tick

  STimeWindow win = *pWindow;
  if (win.skey < earliestTs) {
    win.skey = earliestTs;
  }

  return win;
}
H
Hongze Cheng 已提交
448

H
Haojun Liao 已提交
449
// init file iterator
450
static int32_t initFilesetIterator(SFilesetIter* pIter, SArray* aDFileSet, STsdbReader* pReader) {
H
Hongze Cheng 已提交
451
  size_t numOfFileset = taosArrayGetSize(aDFileSet);
452

453 454
  pIter->index = ASCENDING_TRAVERSE(pReader->order) ? -1 : numOfFileset;
  pIter->order = pReader->order;
H
Hongze Cheng 已提交
455
  pIter->pFileList = aDFileSet;
456
  pIter->numOfFiles = numOfFileset;
H
Haojun Liao 已提交
457

458 459 460 461
  if (pIter->pLastBlockReader == NULL) {
    pIter->pLastBlockReader = taosMemoryCalloc(1, sizeof(struct SLastBlockReader));
    if (pIter->pLastBlockReader == NULL) {
      int32_t code = TSDB_CODE_OUT_OF_MEMORY;
462
      tsdbError("failed to prepare the last block iterator, since:%s %s", tstrerror(code), pReader->idStr);
463 464
      return code;
    }
465 466
  }

467 468 469 470 471 472 473 474
  SLastBlockReader* pLReader = pIter->pLastBlockReader;
  pLReader->order = pReader->order;
  pLReader->window = pReader->window;
  pLReader->verRange = pReader->verRange;

  pLReader->uid = 0;
  tMergeTreeClose(&pLReader->mergeTree);

475
  if (pLReader->pInfo == NULL) {
476
    // here we ignore the first column, which is always be the primary timestamp column
477 478 479
    SBlockLoadSuppInfo* pInfo = &pReader->suppInfo;

    int32_t numOfStt = pReader->pTsdb->pVnode->config.sttTrigger;
X
Xiaoyu Wang 已提交
480
    pLReader->pInfo = tCreateLastBlockLoadInfo(pReader->pSchema, &pInfo->colId[1], pInfo->numOfCols - 1, numOfStt);
H
Haojun Liao 已提交
481 482 483 484
    if (pLReader->pInfo == NULL) {
      tsdbDebug("init fileset iterator failed, code:%s %s", tstrerror(terrno), pReader->idStr);
      return terrno;
    }
485 486
  }

487
  tsdbDebug("init fileset iterator, total files:%d %s", pIter->numOfFiles, pReader->idStr);
H
Haojun Liao 已提交
488 489 490
  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
491
/*
 * Advance pIter to the next file set, in its traversal order, that overlaps pReader->window.
 * The matching file set is opened into pReader->pFileReader / pReader->status.pCurrentFileset.
 *
 * Files lying entirely before the window (in traversal order) are skipped. Once a file lies entirely beyond the
 * window, iteration stops, because the remaining files cannot overlap it either.
 *
 * *hasNext reports whether a qualified file set was found.
 * Returns TSDB_CODE_SUCCESS, or the error from tsdbDataFReaderOpen() (with *hasNext = false).
 */
static int32_t filesetIteratorNext(SFilesetIter* pIter, STsdbReader* pReader, bool* hasNext) {
  bool    asc = ASCENDING_TRAVERSE(pIter->order);
  int32_t step = asc ? 1 : -1;
  int32_t code = TSDB_CODE_SUCCESS;

  pIter->index += step;
  if ((asc && pIter->index >= pIter->numOfFiles) || ((!asc) && pIter->index < 0)) {
    *hasNext = false;
    return TSDB_CODE_SUCCESS;
  }

  // fetch the last block load statistics into the cost summary before resetLastBlockLoadInfo() resets them
  SIOCostSummary* pSum = &pReader->cost;
  getLastBlockLoadInfo(pIter->pLastBlockReader->pInfo, &pSum->lastBlockLoad, &pSum->lastBlockLoadTime);

  pIter->pLastBlockReader->uid = 0;
  tMergeTreeClose(&pIter->pLastBlockReader->mergeTree);
  resetLastBlockLoadInfo(pIter->pLastBlockReader->pInfo);

  // key range covered by the current file
  STimeWindow win = {0};

  while (1) {
    if (pReader->pFileReader != NULL) {
      tsdbDataFReaderClose(&pReader->pFileReader);
    }

    pReader->status.pCurrentFileset = (SDFileSet*)taosArrayGet(pIter->pFileList, pIter->index);

    code = tsdbDataFReaderOpen(&pReader->pFileReader, pReader->pTsdb, pReader->status.pCurrentFileset);
    if (code != TSDB_CODE_SUCCESS) {
      goto _err;
    }

    pSum->headFileLoad += 1;

    int32_t fid = pReader->status.pCurrentFileset->fid;
    tsdbFidKeyRange(fid, pReader->pTsdb->keepCfg.days, pReader->pTsdb->keepCfg.precision, &win.skey, &win.ekey);

    // the current file lies beyond the query window, and so do all remaining files: stop
    if ((asc && win.skey > pReader->window.ekey) || (!asc && win.ekey < pReader->window.skey)) {
      tsdbDebug("%p remain files are not qualified for qrange:%" PRId64 "-%" PRId64 ", ignore, %s", pReader,
                pReader->window.skey, pReader->window.ekey, pReader->idStr);
      *hasNext = false;
      return TSDB_CODE_SUCCESS;
    }

    // the current file lies before the query window: move on to the next one
    if ((asc && (win.ekey < pReader->window.skey)) || ((!asc) && (win.skey > pReader->window.ekey))) {
      pIter->index += step;
      if ((asc && pIter->index >= pIter->numOfFiles) || ((!asc) && pIter->index < 0)) {
        *hasNext = false;
        return TSDB_CODE_SUCCESS;
      }
      continue;
    }

    tsdbDebug("%p file found fid:%d for qrange:%" PRId64 "-%" PRId64 ", %s", pReader, fid, pReader->window.skey,
              pReader->window.ekey, pReader->idStr);
    *hasNext = true;
    return TSDB_CODE_SUCCESS;
  }

_err:
  *hasNext = false;
  return code;
}

557
/*
 * Reset pIter for a new round of block iteration in the given order.
 * After the reset there is no current block (index -1) and no blocks. The block list is empty; it is created on
 * first use.
 */
static void resetDataBlockIterator(SDataBlockIter* pIter, int32_t order) {
  pIter->order = order;
  pIter->index = -1;
  pIter->numOfBlocks = 0;
  if (pIter->blockList == NULL) {
    // NOTE(review): an allocation failure leaves blockList NULL and cannot be reported (void return)
    pIter->blockList = taosArrayInit(4, sizeof(SFileDataBlockInfo));
  } else {
    taosArrayClear(pIter->blockList);
  }
}

L
Liu Jicong 已提交
568
// Destroy the iterator's block list and reset the pointer to NULL, so a later resetDataBlockIterator() re-creates
// it instead of clearing freed memory.
static void cleanupDataBlockIterator(SDataBlockIter* pIter) { pIter->blockList = taosArrayDestroy(pIter->blockList); }
H
Haojun Liao 已提交
569

H
Haojun Liao 已提交
570
// Put a new reader status into its initial stage: the file-checking stage (loadFromFile), with no in-memory table
// iterator yet.
static void initReaderStatus(SReaderStatus* pStatus) {
  pStatus->pTableIter = NULL;
  pStatus->loadFromFile = true;
}

575 576 577 578 579 580 581 582
/*
 * Create a result block with one column per entry in pCond->colList, pre-sized for `capacity` rows.
 * Returns the block, or NULL with terrno set. On failure the partially built block is released with
 * blockDataDestroy(), which also frees its column array; a plain free of the block would leak that array.
 */
static SSDataBlock* createResBlock(SQueryTableDataCond* pCond, int32_t capacity) {
  SSDataBlock* pResBlock = createDataBlock();
  if (pResBlock == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  for (int32_t i = 0; i < pCond->numOfCols; ++i) {
    SColumnInfoData colInfo = {0};
    colInfo.info = pCond->colList[i];
    int32_t appendCode = blockDataAppendColInfo(pResBlock, &colInfo);
    if (appendCode != TSDB_CODE_SUCCESS) {
      terrno = appendCode;
      blockDataDestroy(pResBlock);
      return NULL;
    }
  }

  int32_t code = blockDataEnsureCapacity(pResBlock, capacity);
  if (code != TSDB_CODE_SUCCESS) {
    terrno = code;
    blockDataDestroy(pResBlock);
    return NULL;
  }
  return pResBlock;
}

597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651
// Initialize pReader->readerMutex with default attributes; returns the result of taosThreadMutexInit().
// The trace lines before and after the call are for debugging only.
static int32_t tsdbInitReaderLock(STsdbReader* pReader) {
  void*   pMutexAddr = &pReader->readerMutex;
  int32_t rc = -1;

  qTrace("tsdb/read: %p, pre-init read mutex: %p, code: %d", pReader, pMutexAddr, rc);
  rc = taosThreadMutexInit(pMutexAddr, NULL);
  qTrace("tsdb/read: %p, post-init read mutex: %p, code: %d", pReader, pMutexAddr, rc);

  return rc;
}

// Destroy pReader->readerMutex; returns the result of taosThreadMutexDestroy().
static int32_t tsdbUninitReaderLock(STsdbReader* pReader) {
  void*   pMutexAddr = &pReader->readerMutex;
  int32_t rc = -1;

  qTrace("tsdb/read: %p, pre-uninit read mutex: %p, code: %d", pReader, pMutexAddr, rc);
  rc = taosThreadMutexDestroy(pMutexAddr);
  qTrace("tsdb/read: %p, post-uninit read mutex: %p, code: %d", pReader, pMutexAddr, rc);

  return rc;
}

// Lock pReader->readerMutex (blocking); returns the result of taosThreadMutexLock().
static int32_t tsdbAcquireReader(STsdbReader* pReader) {
  void*   pMutexAddr = &pReader->readerMutex;
  int32_t rc = -1;

  qTrace("tsdb/read: %p, pre-take read mutex: %p, code: %d", pReader, pMutexAddr, rc);
  rc = taosThreadMutexLock(pMutexAddr);
  qTrace("tsdb/read: %p, post-take read mutex: %p, code: %d", pReader, pMutexAddr, rc);

  return rc;
}

// Try to lock pReader->readerMutex without blocking; returns the result of taosThreadMutexTryLock().
static int32_t tsdbTryAcquireReader(STsdbReader* pReader) {
  void*   pMutexAddr = &pReader->readerMutex;
  int32_t rc = -1;

  qTrace("tsdb/read: %p, pre-trytake read mutex: %p, code: %d", pReader, pMutexAddr, rc);
  rc = taosThreadMutexTryLock(pMutexAddr);
  qTrace("tsdb/read: %p, post-trytake read mutex: %p, code: %d", pReader, pMutexAddr, rc);

  return rc;
}

// Unlock pReader->readerMutex; returns the result of taosThreadMutexUnlock().
static int32_t tsdbReleaseReader(STsdbReader* pReader) {
  void*   pMutexAddr = &pReader->readerMutex;
  int32_t rc = -1;

  qTrace("tsdb/read: %p, pre-untake read mutex: %p, code: %d", pReader, pMutexAddr, rc);
  rc = taosThreadMutexUnlock(pMutexAddr);
  qTrace("tsdb/read: %p, post-untake read mutex: %p, code: %d", pReader, pMutexAddr, rc);

  return rc;
}

652 653 654 655 656 657
// Unlock the reader (tsdbReleaseReader) once the caller is done with the returned data block.
// Nothing is released when that block was a composed block (status.composedDataBlock).
void tsdbReleaseDataBlock(STsdbReader* pReader) {
  if (pReader->status.composedDataBlock) {
    return;
  }

  tsdbReleaseReader(pReader);
}
658

659
static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, STsdbReader** ppReader, int32_t capacity,
H
Haojun Liao 已提交
660
                                SSDataBlock* pResBlock, const char* idstr) {
H
Haojun Liao 已提交
661
  int32_t      code = 0;
662
  int8_t       level = 0;
H
Haojun Liao 已提交
663
  STsdbReader* pReader = (STsdbReader*)taosMemoryCalloc(1, sizeof(*pReader));
H
Hongze Cheng 已提交
664 665
  if (pReader == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
H
Haojun Liao 已提交
666
    goto _end;
H
Hongze Cheng 已提交
667 668
  }

C
Cary Xu 已提交
669
  if (VND_IS_TSMA(pVnode)) {
H
Haojun Liao 已提交
670
    tsdbDebug("vgId:%d, tsma is selected to query, %s", TD_VID(pVnode), idstr);
C
Cary Xu 已提交
671 672
  }

H
Haojun Liao 已提交
673
  initReaderStatus(&pReader->status);
674

L
Liu Jicong 已提交
675
  pReader->pTsdb = getTsdbByRetentions(pVnode, pCond->twindows.skey, pVnode->config.tsdbCfg.retentions, idstr, &level);
dengyihao's avatar
dengyihao 已提交
676 677
  pReader->suid = pCond->suid;
  pReader->order = pCond->order;
678
  pReader->capacity = capacity;
H
Haojun Liao 已提交
679
  pReader->pResBlock = pResBlock;
680
  pReader->idStr = (idstr != NULL) ? taosStrdup(idstr) : NULL;
dengyihao's avatar
dengyihao 已提交
681
  pReader->verRange = getQueryVerRange(pVnode, pCond, level);
682
  pReader->type = pCond->type;
683
  pReader->window = updateQueryTimeWindow(pReader->pTsdb, &pCond->twindows);
H
Hongze Cheng 已提交
684
  pReader->blockInfoBuf.numPerBucket = 1000;  // 1000 tables per bucket
H
Hongze Cheng 已提交
685

H
Haojun Liao 已提交
686 687 688 689 690 691 692 693
  if (pReader->pResBlock == NULL) {
    pReader->freeBlock = true;
    pReader->pResBlock = createResBlock(pCond, pReader->capacity);
    if (pReader->pResBlock == NULL) {
      code = terrno;
      goto _end;
    }
  }
694

H
Haojun Liao 已提交
695 696 697 698 699
  if (pCond->numOfCols <= 0) {
    tsdbError("vgId:%d, invalid column number %d in query cond, %s", TD_VID(pVnode), pCond->numOfCols, idstr);
    code = TSDB_CODE_INVALID_PARA;
    goto _end;
  }
H
Hongze Cheng 已提交
700

701 702
  // allocate buffer in order to load data blocks from file
  SBlockLoadSuppInfo* pSup = &pReader->suppInfo;
703
  pSup->pColAgg = taosArrayInit(pCond->numOfCols, sizeof(SColumnDataAgg));
H
Haojun Liao 已提交
704
  if (pSup->pColAgg == NULL) {
705 706 707
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _end;
  }
H
Haojun Liao 已提交
708

709 710
  pSup->tsColAgg.colId = PRIMARYKEY_TIMESTAMP_COL_ID;

H
Hongze Cheng 已提交
711
  code = tBlockDataCreate(&pReader->status.fileBlockData);
H
Haojun Liao 已提交
712 713 714 715 716
  if (code != TSDB_CODE_SUCCESS) {
    terrno = code;
    goto _end;
  }

717
  setColumnIdSlotList(pSup, pCond->colList, pCond->pSlotList, pCond->numOfCols);
718

719
  tsdbInitReaderLock(pReader);
720

H
Hongze Cheng 已提交
721 722
  *ppReader = pReader;
  return code;
H
Hongze Cheng 已提交
723

H
Haojun Liao 已提交
724 725
_end:
  tsdbReaderClose(pReader);
H
Hongze Cheng 已提交
726 727 728
  *ppReader = NULL;
  return code;
}
H
Hongze Cheng 已提交
729

H
Haojun Liao 已提交
730
/**
 * Load the block index (SBlockIdx list) of the current head file and collect the entries of the tables this reader
 * queries.
 *
 * The cached block-index array and pReader->status.uidList.tableUidList are walked together as a merge join: the
 * side with the smaller uid is advanced. This assumes both are sorted by uid ascending (NOTE(review): not enforced
 * here — confirm with the producers of both lists). Entries whose suid differs from the reader's suid are skipped.
 *
 * @param pReader     reader owning the table map and the queried uid list
 * @param pFileReader reader of the current file set; its tsdb's block-index cache supplies the index
 * @param pIndexList  output SArray<SBlockIdx>; every matched entry is appended
 * @return TSDB_CODE_SUCCESS, the cache lookup error, TSDB_CODE_OUT_OF_MEMORY or TSDB_CODE_APP_ERROR
 */
static int32_t doLoadBlockIndex(STsdbReader* pReader, SDataFReader* pFileReader, SArray* pIndexList) {
  int64_t    st = taosGetTimestampUs();
  LRUHandle* handle = NULL;
  int32_t    code = tsdbCacheGetBlockIdx(pFileReader->pTsdb->biCache, pFileReader, &handle);
  if (code != TSDB_CODE_SUCCESS || handle == NULL) {
    goto _end;
  }

  int32_t numOfTables = taosHashGetSize(pReader->status.pTableMap);

  SArray* aBlockIdx = (SArray*)taosLRUCacheValue(pFileReader->pTsdb->biCache, handle);
  size_t  num = taosArrayGetSize(aBlockIdx);
  if (num == 0) {
    goto _end;  // code is TSDB_CODE_SUCCESS here; the handle is still released below
  }

  // todo binary search to the start position
  int64_t et1 = taosGetTimestampUs();

  STableUidList* pList = &pReader->status.uidList;

  int32_t i = 0, j = 0;
  while (i < num && j < numOfTables) {
    SBlockIdx* pBlockIdx = (SBlockIdx*)taosArrayGet(aBlockIdx, i);
    if (pBlockIdx->suid != pReader->suid) {
      i += 1;
      continue;
    }

    if (pBlockIdx->uid < pList->tableUidList[j]) {
      i += 1;
      continue;
    }

    if (pBlockIdx->uid > pList->tableUidList[j]) {
      j += 1;
      continue;
    }

    // uid matched: the table is in the queried uid list, so it must also be present in the table map
    void* p = taosHashGet(pReader->status.pTableMap, &pBlockIdx->uid, sizeof(uint64_t));
    if (p == NULL) {
      tsdbError("failed to locate the tableBlockScan Info in hashmap, uid:%" PRIu64 ", %s", pBlockIdx->uid,
                pReader->idStr);
      code = TSDB_CODE_APP_ERROR;
      goto _end;  // do not return directly: the cache handle must be released
    }

    STableBlockScanInfo* pScanInfo = *(STableBlockScanInfo**)p;
    if (pScanInfo->pBlockList == NULL) {
      pScanInfo->pBlockList = taosArrayInit(4, sizeof(SBlockIndex));
      if (pScanInfo->pBlockList == NULL) {
        code = TSDB_CODE_OUT_OF_MEMORY;
        goto _end;
      }
    }

    if (taosArrayPush(pIndexList, pBlockIdx) == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto _end;
    }

    i += 1;
    j += 1;
  }

  int64_t et2 = taosGetTimestampUs();
  tsdbDebug("load block index for %d/%d tables completed, elapsed time:%.2f ms, set blockIdx:%.2f ms, size:%.2f Kb %s",
            numOfTables, (int32_t)num, (et1 - st) / 1000.0, (et2 - et1) / 1000.0, num * sizeof(SBlockIdx) / 1024.0,
            pReader->idStr);

  pReader->cost.headFileLoadTime += (et1 - st) / 1000.0;

_end:
  if (handle != NULL) {
    tsdbBICacheRelease(pFileReader->pTsdb->biCache, handle);
  }
  return code;
}
H
Hongze Cheng 已提交
803

804
/*
 * Forget the per-file block information of every table in pTableMap.
 * Both the decoded block map (mapData) and the qualified block list (pBlockList) are cleared, so the scan info can be
 * refilled when the reader switches to a new data file.
 */
static void cleanupTableScanInfo(SHashObj* pTableMap) {
  for (STableBlockScanInfo** ppInfo = taosHashIterate(pTableMap, NULL); ppInfo != NULL;
       ppInfo = taosHashIterate(pTableMap, ppInfo)) {
    STableBlockScanInfo* pInfo = *ppInfo;
    tMapDataClear(&pInfo->mapData);
    taosArrayClear(pInfo->pBlockList);
  }
}

818
/**
 * For every table in pIndexList, read its data-block map from the current file and record the blocks that overlap
 * the table's remaining query time window and the reader's version range.
 *
 * The remaining window starts just after pScanInfo->lastKey in the scan direction. Each qualified block is appended
 * to pScanInfo->pBlockList as an SBlockIndex.
 *
 * @param pReader    reader (table map, time window, version range and file reader)
 * @param pIndexList SArray<SBlockIdx> of the tables to load, as produced by doLoadBlockIndex
 * @param pBlockNum  output: numOfBlocks is incremented per qualified block; numOfLastFiles is set to the stt count
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_APP_ERROR, TSDB_CODE_OUT_OF_MEMORY or the error of tsdbReadDataBlk
 */
static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, SBlockNumber* pBlockNum) {
  int32_t numOfQTable = 0;
  size_t  sizeInDisk = 0;
  size_t  numOfTables = taosArrayGetSize(pIndexList);

  int64_t st = taosGetTimestampUs();
  cleanupTableScanInfo(pReader->status.pTableMap);

  bool    asc = ASCENDING_TRAVERSE(pReader->order);
  int32_t step = asc ? 1 : -1;

  for (int32_t i = 0; i < numOfTables; ++i) {
    SBlockIdx* pBlockIdx = taosArrayGet(pIndexList, i);

    STableBlockScanInfo** ppScanInfo = taosHashGet(pReader->status.pTableMap, &pBlockIdx->uid, sizeof(int64_t));
    if (ppScanInfo == NULL || *ppScanInfo == NULL) {
      tsdbError("failed to locate the tableBlockScan Info in hashmap, uid:%" PRIu64 ", %s", pBlockIdx->uid,
                pReader->idStr);
      return TSDB_CODE_APP_ERROR;
    }
    STableBlockScanInfo* pScanInfo = *ppScanInfo;

    tMapDataReset(&pScanInfo->mapData);
    int32_t code = tsdbReadDataBlk(pReader->pFileReader, pBlockIdx, &pScanInfo->mapData);
    if (code != TSDB_CODE_SUCCESS) {
      tsdbError("failed to read data block info of table uid:%" PRIu64 ", code:%s %s", pBlockIdx->uid,
                tstrerror(code), pReader->idStr);
      return code;
    }

    if (pScanInfo->pBlockList == NULL) {
      pScanInfo->pBlockList = taosArrayInit(4, sizeof(SBlockIndex));
      if (pScanInfo->pBlockList == NULL) {
        return TSDB_CODE_OUT_OF_MEMORY;
      }
    }

    if (taosArrayEnsureCap(pScanInfo->pBlockList, pScanInfo->mapData.nItem) != 0) {
      tMapDataClear(&pScanInfo->mapData);
      return TSDB_CODE_OUT_OF_MEMORY;
    }

    sizeInDisk += pScanInfo->mapData.nData;

    // rows up to lastKey were already returned: only the part after it, in scan direction, remains to be read
    STimeWindow w = pReader->window;
    if (asc) {
      w.skey = pScanInfo->lastKey + step;
    } else {
      w.ekey = pScanInfo->lastKey + step;
    }

    if (isEmptyQueryTimeWindow(&w)) {
      continue;
    }

    SDataBlk block = {0};
    for (int32_t j = 0; j < pScanInfo->mapData.nItem; ++j) {
      tGetDataBlk(pScanInfo->mapData.pData + pScanInfo->mapData.aOffset[j], &block);

      // 1. time range check
      if (block.minKey.ts > w.ekey || block.maxKey.ts < w.skey) {
        continue;
      }

      // 2. version range check
      if (block.minVer > pReader->verRange.maxVer || block.maxVer < pReader->verRange.minVer) {
        continue;
      }

      SBlockIndex bIndex = {.ordinalIndex = j, .inFileOffset = block.aSubBlock->offset};
      bIndex.window = (STimeWindow){.skey = block.minKey.ts, .ekey = block.maxKey.ts};

      if (taosArrayPush(pScanInfo->pBlockList, &bIndex) == NULL) {
        tMapDataClear(&pScanInfo->mapData);
        return TSDB_CODE_OUT_OF_MEMORY;
      }

      pBlockNum->numOfBlocks += 1;
    }

    if (taosArrayGetSize(pScanInfo->pBlockList) > 0) {
      numOfQTable += 1;
    }
  }

  pBlockNum->numOfLastFiles = pReader->pFileReader->pSet->nSttF;
  int32_t total = pBlockNum->numOfLastFiles + pBlockNum->numOfBlocks;

  double el = (taosGetTimestampUs() - st) / 1000.0;
  // numOfTables is a size_t: log it through an explicit int32_t cast (a size_t is not a long on LLP64 targets)
  tsdbDebug(
      "load block of %d tables completed, blocks:%d in %d tables, last-files:%d, block-info-size:%.2f Kb, elapsed "
      "time:%.2f ms %s",
      (int32_t)numOfTables, pBlockNum->numOfBlocks, numOfQTable, pBlockNum->numOfLastFiles, sizeInDisk / 1024.0, el,
      pReader->idStr);

  pReader->cost.numOfBlocks += total;
  pReader->cost.headFileLoadTime += el;

  return TSDB_CODE_SUCCESS;
}
H
Hongze Cheng 已提交
897

898
/*
 * Mark the current file block as completely dumped.
 * lastKey is set one step past maxKey in the scan direction: maxKey + 1 when ascending, maxKey - 1 otherwise.
 */
static void setBlockAllDumped(SFileBlockDumpInfo* pDumpInfo, int64_t maxKey, int32_t order) {
  pDumpInfo->lastKey = ASCENDING_TRAVERSE(order) ? (maxKey + 1) : (maxKey - 1);
  pDumpInfo->allDumped = true;
}

904 905
/*
 * Append one column value at row rowIndex of pColInfoData.
 * Fixed-length values are appended straight from the SColVal value. Variable-length values are first assembled
 * (length header + payload) in pSup->buildBuf[colIndex], and that buffer is then appended.
 * A value that is not COL_VAL_IS_VALUE is appended as NULL.
 */
static void doCopyColVal(SColumnInfoData* pColInfoData, int32_t rowIndex, int32_t colIndex, SColVal* pColVal,
                         SBlockLoadSuppInfo* pSup) {
  if (!IS_VAR_DATA_TYPE(pColVal->type)) {
    colDataAppend(pColInfoData, rowIndex, (const char*)&pColVal->value, !COL_VAL_IS_VALUE(pColVal));
    return;
  }

  if (!COL_VAL_IS_VALUE(pColVal)) {
    colDataAppendNULL(pColInfoData, rowIndex);
    return;
  }

  varDataSetLen(pSup->buildBuf[colIndex], pColVal->value.nData);
  ASSERT(pColVal->value.nData <= pColInfoData->info.bytes);

  // pData may be NULL when nData is 0, so only copy a non-empty payload
  if (pColVal->value.nData > 0) {
    memcpy(varDataVal(pSup->buildBuf[colIndex]), pColVal->value.pData, pColVal->value.nData);
  }

  colDataAppend(pColInfoData, rowIndex, pSup->buildBuf[colIndex], false);
}

923
/*
 * Return the SFileDataBlockInfo at the iterator's current index, or NULL when the iterator's block list is empty.
 */
static SFileDataBlockInfo* getCurrentBlockInfo(SDataBlockIter* pBlockIter) {
  if (taosArrayGetSize(pBlockIter->blockList) != 0) {
    return taosArrayGet(pBlockIter->blockList, pBlockIter->index);
  }

  // an empty block list must agree with the recorded number of blocks
  ASSERT(pBlockIter->numOfBlocks == 0);
  return NULL;
}

H
Hongze Cheng 已提交
934
/* Return the data block descriptor cached inside the iterator (it is filled by doSetCurrentBlock). */
static SDataBlk* getCurrentBlock(SDataBlockIter* pBlockIter) {
  SDataBlk* pCurrent = &pBlockIter->block;
  return pCurrent;
}
935

C
Cary Xu 已提交
936 937 938 939 940 941
/*
 * Binary search in keyList, assumed to be sorted in ascending order, starting from index pos.
 *
 * order == TSDB_ORDER_ASC: search forward in [pos, num - 1]. Returns -1 if key < keyList[pos]. Otherwise returns the
 *   last index whose key is <= key.
 * any other order: search backward in [0, pos]. Returns -1 if key > keyList[pos]. Otherwise returns the first index
 *   whose key is >= key.
 * If keyList contains duplicates, an exact match may return any of the equal positions.
 */
static int doBinarySearchKey(TSKEY* keyList, int num, int pos, TSKEY key, int order) {
  assert(pos >= 0 && pos < num);
  assert(num > 0);

  if (order == TSDB_ORDER_ASC) {
    int lo = pos;
    int hi = num - 1;
    if (key < keyList[lo]) {
      return -1;
    }

    for (;;) {
      if (keyList[hi] <= key) {
        return hi;
      }
      if (keyList[lo] >= key) {
        return lo;
      }
      if (hi - lo <= 1) {
        return lo;
      }

      int mid = lo + (hi - lo + 1) / 2;
      if (keyList[mid] == key) {
        return mid;
      }
      if (keyList[mid] > key) {
        hi = mid;
      } else {
        lo = mid;
      }
    }
  }

  // backward search: hi walks down from pos, lo starts at 0
  int hi = pos;
  int lo = 0;
  if (key > keyList[hi]) {
    return -1;
  }

  for (;;) {
    if (keyList[lo] >= key) {
      return lo;
    }
    if (keyList[hi] <= key) {
      return hi;
    }
    if (hi - lo <= 1) {
      return hi;
    }

    int mid = hi - (hi - lo + 1) / 2;
    if (keyList[mid] == key) {
      return mid;
    }
    if (keyList[mid] < key) {
      lo = mid;
    } else {
      hi = mid;
    }
  }
}

H
Haojun Liao 已提交
986
/*
 * Find the last row of the block, in scan direction, that still lies inside the reader's query window.
 * When the block ends inside the window, the block edge is returned without searching (nRow - 1 ascending, 0
 * descending). Otherwise doBinarySearchKey is used starting from pos. The result is -1 when no row qualifies.
 */
static int32_t getEndPosInDataBlock(STsdbReader* pReader, SBlockData* pBlockData, SDataBlk* pBlock, int32_t pos) {
  if (ASCENDING_TRAVERSE(pReader->order)) {
    if (pReader->window.ekey >= pBlock->maxKey.ts) {
      return pBlock->nRow - 1;
    }
    return doBinarySearchKey(pBlockData->aTSKEY, pBlock->nRow, pos, pReader->window.ekey, pReader->order);
  }

  if (pReader->window.skey <= pBlock->minKey.ts) {
    return 0;
  }
  return doBinarySearchKey(pBlockData->aTSKEY, pBlock->nRow, pos, pReader->window.skey, pReader->order);
}

H
Haojun Liao 已提交
1003
/*
 * Copy dumpedRows primary timestamps from the file block into pColData, in scan order.
 * Ascending order copies aTSKEY[rowIndex, rowIndex + dumpedRows).
 * Descending order copies aTSKEY[rowIndex - dumpedRows + 1, rowIndex] and then reverses it in place.
 */
static void copyPrimaryTsCol(const SBlockData* pBlockData, SFileBlockDumpInfo* pDumpInfo, SColumnInfoData* pColData,
                             int32_t dumpedRows, bool asc) {
  int32_t  firstRow = asc ? pDumpInfo->rowIndex : (pDumpInfo->rowIndex - dumpedRows + 1);
  int64_t* pDst = (int64_t*)pColData->pData;

  memcpy(pDst, &pBlockData->aTSKEY[firstRow], dumpedRows * sizeof(int64_t));
  if (asc) {
    return;
  }

  for (int32_t lo = 0, hi = dumpedRows - 1; lo < hi; ++lo, --hi) {
    int64_t tmp = pDst[lo];
    pDst[lo] = pDst[hi];
    pDst[hi] = tmp;
  }
}

H
Haojun Liao 已提交
1023 1024
/*
 * Reverse, in place, an array of num fixed-width items, each `bytes` bytes long.
 * The usual numeric widths (1/2/4/8 bytes) are swapped as whole integers of that width. Any other width falls back
 * to a bytewise swap, so no fixed-width type is ever left un-reversed.
 */
static void reverseFixedLenItems(char* pData, int32_t num, int32_t bytes) {
  switch (bytes) {
    case sizeof(int64_t): {
      int64_t* p = (int64_t*)pData;
      for (int32_t lo = 0, hi = num - 1; lo < hi; ++lo, --hi) {
        int64_t t = p[lo];
        p[lo] = p[hi];
        p[hi] = t;
      }
      break;
    }
    case sizeof(int32_t): {
      int32_t* p = (int32_t*)pData;
      for (int32_t lo = 0, hi = num - 1; lo < hi; ++lo, --hi) {
        int32_t t = p[lo];
        p[lo] = p[hi];
        p[hi] = t;
      }
      break;
    }
    case sizeof(int16_t): {
      int16_t* p = (int16_t*)pData;
      for (int32_t lo = 0, hi = num - 1; lo < hi; ++lo, --hi) {
        int16_t t = p[lo];
        p[lo] = p[hi];
        p[hi] = t;
      }
      break;
    }
    case sizeof(int8_t): {
      int8_t* p = (int8_t*)pData;
      for (int32_t lo = 0, hi = num - 1; lo < hi; ++lo, --hi) {
        int8_t t = p[lo];
        p[lo] = p[hi];
        p[hi] = t;
      }
      break;
    }
    default: {
      for (int32_t lo = 0, hi = num - 1; lo < hi; ++lo, --hi) {
        char* a = pData + (int64_t)lo * bytes;
        char* b = pData + (int64_t)hi * bytes;
        for (int32_t k = 0; k < bytes; ++k) {
          char t = a[k];
          a[k] = b[k];
          b[k] = t;
        }
      }
      break;
    }
  }
}

// a faster version of copy procedure.
/*
 * Copy dumpedRows values of a fixed-width column from the file block into pColData, in scan order.
 * Values are memcpy'd in one batch (descending order copies [rowIndex - dumpedRows + 1, rowIndex] and then reverses
 * it). The null bitmap is then patched row by row if the file column is not HAS_VALUE-only.
 * Reversal uses the same element width (tDataTypes[pData->type].bytes) as the copy, so the two can never disagree.
 */
static void copyNumericCols(const SColData* pData, SFileBlockDumpInfo* pDumpInfo, SColumnInfoData* pColData,
                            int32_t dumpedRows, bool asc) {
  int32_t bytes = tDataTypes[pData->type].bytes;
  int32_t step = asc ? 1 : -1;

  int32_t  startIndex = asc ? pDumpInfo->rowIndex : (pDumpInfo->rowIndex - dumpedRows + 1);
  uint8_t* p = pData->pData + bytes * startIndex;

  // make sure it is aligned to 8bit, the allocated memory address is aligned to 256bit
  //  ASSERT((((uint64_t)pColData->pData) & (0x8 - 1)) == 0);

  // 1. copy data in a batch model
  memcpy(pColData->pData, p, dumpedRows * bytes);

  // 2. reverse the array list in case of descending order scan data block
  if (!asc) {
    reverseFixedLenItems(pColData->pData, dumpedRows, bytes);
  }

  // 3. if the null value exists, check items one-by-one
  //    (a bit value of 0 or 1 marks a NONE or NULL cell; both are reported as NULL)
  if (pData->flag != HAS_VALUE) {
    int32_t rowIndex = 0;

    for (int32_t j = pDumpInfo->rowIndex; rowIndex < dumpedRows; j += step, rowIndex++) {
      uint8_t v = tColDataGetBitValue(pData, j);
      if (v == 0 || v == 1) {
        colDataSetNull_f(pColData->nullbitmap, rowIndex);
        pColData->hasNull = true;
      }
    }
  }
}

1113
/*
 * Copy the rows of the currently loaded file block (status.fileBlockData) that fall in the reader's query window
 * into pReader->pResBlock.
 *
 * On a fresh block (rowIndex at the first row in scan direction), the start row is moved to the window boundary by
 * binary search. At most pReader->capacity rows are copied. Requested columns missing from the file block are filled
 * with NULL. Afterwards fBlockDumpInfo.rowIndex is advanced, and the block is marked all-dumped when it is exhausted
 * or the next row is outside the window.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_INVALID_PARA when the start row cannot be located.
 */
static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader) {
  SReaderStatus*      pStatus = &pReader->status;
  SDataBlockIter*     pBlockIter = &pStatus->blockIter;
  SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo;
  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
  SBlockData*         pBlockData = &pStatus->fileBlockData;
  SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(pBlockIter);
  SDataBlk*           pBlock = getCurrentBlock(pBlockIter);
  SSDataBlock*        pResBlock = pReader->pResBlock;
  int32_t             numOfOutputCols = pSupInfo->numOfCols;

  SColVal cv = {0};
  int64_t st = taosGetTimestampUs();
  bool    asc = ASCENDING_TRAVERSE(pReader->order);
  int32_t step = asc ? 1 : -1;

  // nothing was loaded for this block: report an empty result
  if (pBlockData->nRow == 0 || pBlockData->aTSKEY == NULL) {
    tsdbWarn("%p no need to copy since no data in blockData, table uid:%" PRIu64 " has been dropped, %s", pReader,
             pBlockInfo->uid, pReader->idStr);
    pResBlock->info.rows = 0;
    return 0;
  }

  // a fresh block whose leading edge lies outside the window: locate the first row inside it
  bool atBlockStart = asc ? (pDumpInfo->rowIndex == 0) : (pDumpInfo->rowIndex == pBlock->nRow - 1);
  bool edgeInWindow = asc ? (pReader->window.skey <= pBlock->minKey.ts) : (pReader->window.ekey >= pBlock->maxKey.ts);
  if (atBlockStart && !edgeInWindow) {
    int32_t searchFrom = asc ? pBlock->nRow - 1 : 0;
    int32_t searchOrder = asc ? TSDB_ORDER_DESC : TSDB_ORDER_ASC;
    int64_t boundary = asc ? pReader->window.skey : pReader->window.ekey;
    pDumpInfo->rowIndex = doBinarySearchKey(pBlockData->aTSKEY, pBlock->nRow, searchFrom, boundary, searchOrder);

    if (pDumpInfo->rowIndex < 0) {
      tsdbError(
          "%p failed to locate the start position in current block, global index:%d, table index:%d, brange:%" PRId64
          "-%" PRId64 ", minVer:%" PRId64 ", maxVer:%" PRId64 " %s",
          pReader, pBlockIter->index, pBlockInfo->tbBlockIdx, pBlock->minKey.ts, pBlock->maxKey.ts, pBlock->minVer,
          pBlock->maxVer, pReader->idStr);
      return TSDB_CODE_INVALID_PARA;
    }
  }

  // time window check
  int32_t lastRow = getEndPosInDataBlock(pReader, pBlockData, pBlock, pDumpInfo->rowIndex);
  if (lastRow == -1) {
    setBlockAllDumped(pDumpInfo, pReader->window.ekey, pReader->order);
    return TSDB_CODE_SUCCESS;
  }

  // one step past the last qualified row, so the distance below is the row count
  int32_t stopRow = lastRow + step;
  int32_t dumpedRows = asc ? (stopRow - pDumpInfo->rowIndex) : (pDumpInfo->rowIndex - stopRow);
  if (dumpedRows > pReader->capacity) {  // output buffer check
    dumpedRows = pReader->capacity;
  }

  int32_t          outCol = 0;
  SColumnInfoData* pColData = taosArrayGet(pResBlock->pDataBlock, pSupInfo->slotId[outCol]);
  if (pSupInfo->colId[outCol] == PRIMARYKEY_TIMESTAMP_COL_ID) {
    copyPrimaryTsCol(pBlockData, pDumpInfo, pColData, dumpedRows, asc);
    outCol += 1;
  }

  // walk the requested columns and the file columns together, both ordered by column id
  int32_t numOfFileCols = pBlockData->nColData;
  for (int32_t fileCol = 0; outCol < numOfOutputCols && fileCol < numOfFileCols;) {
    SColData* pData = tBlockDataGetColDataByIdx(pBlockData, fileCol);

    if (pData->cid < pSupInfo->colId[outCol]) {  // file column not requested
      fileCol += 1;
      continue;
    }

    pColData = taosArrayGet(pResBlock->pDataBlock, pSupInfo->slotId[outCol]);

    if (pData->cid > pSupInfo->colId[outCol]) {  // requested column absent from this file block
      colDataAppendNNULL(pColData, 0, dumpedRows);
      outCol += 1;
      continue;
    }

    if (pData->flag == HAS_NONE || pData->flag == HAS_NULL || pData->flag == (HAS_NULL | HAS_NONE)) {
      colDataAppendNNULL(pColData, 0, dumpedRows);
    } else if (IS_MATHABLE_TYPE(pColData->info.type)) {
      copyNumericCols(pData, pDumpInfo, pColData, dumpedRows, asc);
    } else {  // varchar/nchar type
      int32_t srcRow = pDumpInfo->rowIndex;
      for (int32_t dstRow = 0; dstRow < dumpedRows; ++dstRow, srcRow += step) {
        tColDataGetValue(pData, srcRow, &cv);
        doCopyColVal(pColData, dstRow, outCol, &cv, pSupInfo);
      }
    }

    fileCol += 1;
    outCol += 1;
  }

  // fill the mis-matched columns with null value
  for (; outCol < numOfOutputCols; ++outCol) {
    pColData = taosArrayGet(pResBlock->pDataBlock, pSupInfo->slotId[outCol]);
    colDataAppendNNULL(pColData, 0, dumpedRows);
  }

  pResBlock->info.dataLoad = 1;
  pResBlock->info.rows = dumpedRows;
  pDumpInfo->rowIndex += step * dumpedRows;

  // check if current block are all handled
  bool stillInBlock = pDumpInfo->rowIndex >= 0 && pDumpInfo->rowIndex < pBlock->nRow;
  if (!stillInBlock) {
    setBlockAllDumped(pDumpInfo, asc ? pBlock->maxKey.ts : pBlock->minKey.ts, pReader->order);
  } else {
    int64_t nextTs = pBlockData->aTSKEY[pDumpInfo->rowIndex];
    if (outOfTimeWindow(nextTs, &pReader->window)) {  // the remaining rows are out of the query window
      setBlockAllDumped(pDumpInfo, nextTs, pReader->order);
    }
  }

  double elapsedTime = (taosGetTimestampUs() - st) / 1000.0;
  pReader->cost.blockLoadTime += elapsedTime;

  int32_t unDumpedRows = asc ? pBlock->nRow - pDumpInfo->rowIndex : pDumpInfo->rowIndex + 1;
  tsdbDebug("%p copy file block to sdatablock, global index:%d, table index:%d, brange:%" PRId64 "-%" PRId64
            ", rows:%d, remain:%d, minVer:%" PRId64 ", maxVer:%" PRId64 ", uid:%" PRIu64 " elapsed time:%.2f ms, %s",
            pReader, pBlockIter->index, pBlockInfo->tbBlockIdx, pBlock->minKey.ts, pBlock->maxKey.ts, dumpedRows,
            unDumpedRows, pBlock->minVer, pBlock->maxVer, pBlockInfo->uid, elapsedTime, pReader->idStr);

  return TSDB_CODE_SUCCESS;
}

1249 1250
/*
 * Load the current file block of table `uid` into pBlockData.
 * pBlockData is reset, initialised from the table's latest schema, and filled by tsdbReadDataBlock. On success,
 * fBlockDumpInfo.allDumped is cleared.
 * A dropped table (no schema) leaves pBlockData empty and returns success.
 */
static int32_t doLoadFileBlockData(STsdbReader* pReader, SDataBlockIter* pBlockIter, SBlockData* pBlockData,
                                   uint64_t uid) {
  int64_t st = taosGetTimestampUs();

  tBlockDataReset(pBlockData);

  STSchema* pSchema = getLatestTableSchema(pReader, uid);
  if (pSchema == NULL) {
    tsdbDebug("%p table uid:%" PRIu64 " has been dropped, no data existed, %s", pReader, uid, pReader->idStr);
    return TSDB_CODE_SUCCESS;
  }

  // colId[0] is not passed to tBlockDataInit (presumably the primary timestamp column — confirm)
  SBlockLoadSuppInfo* pSup = &pReader->suppInfo;
  TABLEID             tid = {.suid = pReader->suid, .uid = uid};
  int32_t             code = tBlockDataInit(pBlockData, &tid, pSchema, &pSup->colId[1], pSup->numOfCols - 1);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(pBlockIter);
  SDataBlk*           pBlock = getCurrentBlock(pBlockIter);

  code = tsdbReadDataBlock(pReader->pFileReader, pBlock, pBlockData);
  if (code != TSDB_CODE_SUCCESS) {
    tsdbError("%p error occurs in loading file block, global index:%d, table index:%d, brange:%" PRId64 "-%" PRId64
              ", rows:%d, code:%s %s",
              pReader, pBlockIter->index, pBlockInfo->tbBlockIdx, pBlock->minKey.ts, pBlock->maxKey.ts, pBlock->nRow,
              tstrerror(code), pReader->idStr);
    return code;
  }

  double elapsedTime = (taosGetTimestampUs() - st) / 1000.0;
  tsdbDebug("%p load file block into buffer, global index:%d, index in table block list:%d, brange:%" PRId64 "-%" PRId64
            ", rows:%d, minVer:%" PRId64 ", maxVer:%" PRId64 ", elapsed time:%.2f ms, %s",
            pReader, pBlockIter->index, pBlockInfo->tbBlockIdx, pBlock->minKey.ts, pBlock->maxKey.ts, pBlock->nRow,
            pBlock->minVer, pBlock->maxVer, elapsedTime, pReader->idStr);

  pReader->cost.blockLoadTime += elapsedTime;
  pReader->status.fBlockDumpInfo.allDumped = false;

  return TSDB_CODE_SUCCESS;
}
H
Hongze Cheng 已提交
1293

H
Haojun Liao 已提交
1294 1295 1296
/*
 * Release every buffer owned by the block-order supporter: the per-table counters, the per-table block arrays for
 * the first numOfTables tables, and the array holding them.
 */
static void cleanupBlockOrderSupporter(SBlockOrderSupporter* pSup) {
  for (int32_t t = 0; t < pSup->numOfTables; ++t) {
    taosMemoryFreeClear(pSup->pDataBlockInfo[t]);
  }
  taosMemoryFreeClear(pSup->pDataBlockInfo);

  taosMemoryFreeClear(pSup->indexPerTable);
  taosMemoryFreeClear(pSup->numOfBlocksPerTable);
}
H
Hongze Cheng 已提交
1305

H
Haojun Liao 已提交
1306 1307
/*
 * Allocate zero-filled per-table arrays for up to numOfTables tables: the block count, the cursor index, and the
 * pointer to the block array.
 * On any allocation failure, everything is released and TSDB_CODE_OUT_OF_MEMORY is returned.
 */
static int32_t initBlockOrderSupporter(SBlockOrderSupporter* pSup, int32_t numOfTables) {
  size_t counterBytes = sizeof(int32_t) * numOfTables;
  size_t pointerBytes = POINTER_BYTES * numOfTables;

  pSup->numOfBlocksPerTable = taosMemoryCalloc(1, counterBytes);
  pSup->indexPerTable = taosMemoryCalloc(1, counterBytes);
  pSup->pDataBlockInfo = taosMemoryCalloc(1, pointerBytes);

  bool ok = (pSup->numOfBlocksPerTable != NULL) && (pSup->indexPerTable != NULL) && (pSup->pDataBlockInfo != NULL);
  if (ok) {
    return TSDB_CODE_SUCCESS;
  }

  cleanupBlockOrderSupporter(pSup);
  return TSDB_CODE_OUT_OF_MEMORY;
}
H
Hongze Cheng 已提交
1318

H
Haojun Liao 已提交
1319
static int32_t fileDataBlockOrderCompar(const void* pLeft, const void* pRight, void* param) {
1320
  int32_t leftIndex = *(int32_t*)pLeft;
H
Haojun Liao 已提交
1321
  int32_t rightIndex = *(int32_t*)pRight;
H
Hongze Cheng 已提交
1322

H
Haojun Liao 已提交
1323
  SBlockOrderSupporter* pSupporter = (SBlockOrderSupporter*)param;
H
Hongze Cheng 已提交
1324

H
Haojun Liao 已提交
1325 1326
  int32_t leftTableBlockIndex = pSupporter->indexPerTable[leftIndex];
  int32_t rightTableBlockIndex = pSupporter->indexPerTable[rightIndex];
H
Hongze Cheng 已提交
1327

H
Haojun Liao 已提交
1328 1329 1330 1331 1332 1333 1334
  if (leftTableBlockIndex > pSupporter->numOfBlocksPerTable[leftIndex]) {
    /* left block is empty */
    return 1;
  } else if (rightTableBlockIndex > pSupporter->numOfBlocksPerTable[rightIndex]) {
    /* right block is empty */
    return -1;
  }
H
Hongze Cheng 已提交
1335

1336
  SBlockOrderWrapper* pLeftBlock = &pSupporter->pDataBlockInfo[leftIndex][leftTableBlockIndex];
H
Haojun Liao 已提交
1337
  SBlockOrderWrapper* pRightBlock = &pSupporter->pDataBlockInfo[rightIndex][rightTableBlockIndex];
H
Hongze Cheng 已提交
1338

1339 1340 1341
  return pLeftBlock->offset > pRightBlock->offset ? 1 : -1;
}

H
Haojun Liao 已提交
1342
/*
 * Refresh pBlockIter->block with the data block descriptor of the block at the iterator's current index.
 * The descriptor is decoded from the owning table's mapData. An empty iterator is left untouched.
 * Returns TSDB_CODE_INVALID_PARA if the block's table is not in pBlockIter->pTableMap.
 */
static int32_t doSetCurrentBlock(SDataBlockIter* pBlockIter, const char* idStr) {
  SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(pBlockIter);
  if (pBlockInfo == NULL) {
    return TSDB_CODE_SUCCESS;
  }

  STableBlockScanInfo** ppScanInfo = taosHashGet(pBlockIter->pTableMap, &pBlockInfo->uid, sizeof(pBlockInfo->uid));
  if (ppScanInfo == NULL) {
    tsdbError("failed to locate the uid:%" PRIu64 " in query table uid list, %s", pBlockInfo->uid, idStr);
    return TSDB_CODE_INVALID_PARA;
  }

  STableBlockScanInfo* pScanInfo = *ppScanInfo;
  SBlockIndex*         pIndex = taosArrayGet(pScanInfo->pBlockList, pBlockInfo->tbBlockIdx);
  tMapDataGetItemByIdx(&pScanInfo->mapData, pIndex->ordinalIndex, &pBlockIter->block, tGetDataBlk);

  return TSDB_CODE_SUCCESS;
}
H
Hongze Cheng 已提交
1361

1362
/**
 * Build the access order of the qualified data blocks of the current file across all tables.
 *
 * With a single qualified table, its blocks are used as-is. Otherwise a multiway merge tree interleaves the tables'
 * block lists by ascending in-file offset (see fileDataBlockOrderCompar). The resulting SFileDataBlockInfo list is
 * stored in pBlockIter->blockList. The iterator is then positioned at its first block (asc) or last block (desc).
 *
 * @param numOfBlocks expected number of qualified blocks. It must equal the number found in the tables' pBlockList,
 *                    otherwise the later index arithmetic would run out of bounds.
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_OUT_OF_MEMORY, TSDB_CODE_INVALID_PARA, or the merge-tree / positioning error
 */
static int32_t initBlockIterator(STsdbReader* pReader, SDataBlockIter* pBlockIter, int32_t numOfBlocks) {
  bool asc = ASCENDING_TRAVERSE(pReader->order);

  SBlockOrderSupporter sup = {0};
  pBlockIter->numOfBlocks = numOfBlocks;
  taosArrayClear(pBlockIter->blockList);
  pBlockIter->pTableMap = pReader->status.pTableMap;

  // access data blocks according to the offset of each block in asc/desc order.
  int32_t numOfTables = (int32_t)taosHashGetSize(pReader->status.pTableMap);

  int64_t st = taosGetTimestampUs();
  int32_t code = initBlockOrderSupporter(&sup, numOfTables);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  int32_t cnt = 0;
  void*   ptr = NULL;
  while ((ptr = taosHashIterate(pReader->status.pTableMap, ptr)) != NULL) {
    STableBlockScanInfo* pTableScanInfo = *(STableBlockScanInfo**)ptr;
    if (pTableScanInfo->pBlockList == NULL || taosArrayGetSize(pTableScanInfo->pBlockList) == 0) {
      continue;
    }

    size_t num = taosArrayGetSize(pTableScanInfo->pBlockList);
    sup.numOfBlocksPerTable[sup.numOfTables] = num;

    char* buf = taosMemoryMalloc(sizeof(SBlockOrderWrapper) * num);
    if (buf == NULL) {
      // leaving the iteration early: the hash iterator must be cancelled explicitly
      taosHashCancelIterate(pReader->status.pTableMap, ptr);
      cleanupBlockOrderSupporter(&sup);
      return TSDB_CODE_OUT_OF_MEMORY;
    }

    sup.pDataBlockInfo[sup.numOfTables] = (SBlockOrderWrapper*)buf;

    for (int32_t k = 0; k < num; ++k) {
      SBlockIndex* pIndex = taosArrayGet(pTableScanInfo->pBlockList, k);
      sup.pDataBlockInfo[sup.numOfTables][k] =
          (SBlockOrderWrapper){.uid = pTableScanInfo->uid, .offset = pIndex->inFileOffset};
      cnt++;
    }

    sup.numOfTables += 1;
  }

  // both the single-table path and the final index depend on numOfBlocks matching the real block count
  if (numOfBlocks != cnt) {
    tsdbError("%p inconsistent block number, expected:%d, found:%d in %d tables %s", pReader, numOfBlocks, cnt,
              sup.numOfTables, pReader->idStr);
    cleanupBlockOrderSupporter(&sup);
    return TSDB_CODE_INVALID_PARA;
  }

  // since there is only one table qualified, blocks are not sorted
  if (sup.numOfTables == 1) {
    for (int32_t i = 0; i < numOfBlocks; ++i) {
      SFileDataBlockInfo blockInfo = {.uid = sup.pDataBlockInfo[0][i].uid, .tbBlockIdx = i};
      if (taosArrayPush(pBlockIter->blockList, &blockInfo) == NULL) {
        cleanupBlockOrderSupporter(&sup);
        return TSDB_CODE_OUT_OF_MEMORY;
      }
    }

    int64_t et = taosGetTimestampUs();
    tsdbDebug("%p create blocks info struct completed for one table, %d blocks not sorted, elapsed time:%.2f ms %s",
              pReader, numOfBlocks, (et - st) / 1000.0, pReader->idStr);

    pBlockIter->index = asc ? 0 : (numOfBlocks - 1);
    cleanupBlockOrderSupporter(&sup);
    return doSetCurrentBlock(pBlockIter, pReader->idStr);
  }

  tsdbDebug("%p create data blocks info struct completed, %d blocks in %d tables %s", pReader, cnt, sup.numOfTables,
            pReader->idStr);

  SMultiwayMergeTreeInfo* pTree = NULL;

  // keep the full int32_t error code: narrowing it (e.g. to uint8_t) can turn an error into "success"
  int32_t ret = tMergeTreeCreate(&pTree, sup.numOfTables, &sup, fileDataBlockOrderCompar);
  if (ret != TSDB_CODE_SUCCESS) {
    cleanupBlockOrderSupporter(&sup);
    return ret;
  }

  int32_t numOfTotal = 0;
  while (numOfTotal < cnt) {
    int32_t pos = tMergeTreeGetChosenIndex(pTree);
    int32_t index = sup.indexPerTable[pos]++;

    SFileDataBlockInfo blockInfo = {.uid = sup.pDataBlockInfo[pos][index].uid, .tbBlockIdx = index};
    if (taosArrayPush(pBlockIter->blockList, &blockInfo) == NULL) {
      cleanupBlockOrderSupporter(&sup);
      taosMemoryFree(pTree);
      return TSDB_CODE_OUT_OF_MEMORY;
    }

    // set data block index overflow, in order to disable the offset comparator
    if (sup.indexPerTable[pos] >= sup.numOfBlocksPerTable[pos]) {
      sup.indexPerTable[pos] = sup.numOfBlocksPerTable[pos] + 1;
    }

    numOfTotal += 1;
    tMergeTreeAdjust(pTree, tMergeTreeGetAdjustIndex(pTree));
  }

  int64_t et = taosGetTimestampUs();
  tsdbDebug("%p %d data blocks access order completed, elapsed time:%.2f ms %s", pReader, numOfBlocks,
            (et - st) / 1000.0, pReader->idStr);
  cleanupBlockOrderSupporter(&sup);
  taosMemoryFree(pTree);

  pBlockIter->index = asc ? 0 : (numOfBlocks - 1);
  return doSetCurrentBlock(pBlockIter, pReader->idStr);
}
H
Hongze Cheng 已提交
1474

H
Haojun Liao 已提交
1475
/**
 * Move the file block iterator one position in its traversal direction and refresh the current block.
 *
 * @param pBlockIter  iterator over the globally ordered file data block list
 * @param idStr       reader id string, forwarded to doSetCurrentBlock
 * @return false if the iterator already sits on the last block (ascending) or on the first block (descending),
 *         true after a successful move
 */
static bool blockIteratorNext(SDataBlockIter* pBlockIter, const char* idStr) {
  if (ASCENDING_TRAVERSE(pBlockIter->order)) {
    if (pBlockIter->index >= pBlockIter->numOfBlocks - 1) {
      return false;
    }
    pBlockIter->index += 1;
  } else {
    if (pBlockIter->index <= 0) {
      return false;
    }
    pBlockIter->index -= 1;
  }

  doSetCurrentBlock(pBlockIter, idStr);
  return true;
}

1489 1490 1491
/**
 * This is an two rectangles overlap cases.
 */
H
Hongze Cheng 已提交
1492
static int32_t dataBlockPartiallyRequired(STimeWindow* pWindow, SVersionRange* pVerRange, SDataBlk* pBlock) {
1493 1494
  return (pWindow->ekey < pBlock->maxKey.ts && pWindow->ekey >= pBlock->minKey.ts) ||
         (pWindow->skey > pBlock->minKey.ts && pWindow->skey <= pBlock->maxKey.ts) ||
H
Hongze Cheng 已提交
1495 1496
         (pVerRange->minVer > pBlock->minVer && pVerRange->minVer <= pBlock->maxVer) ||
         (pVerRange->maxVer < pBlock->maxVer && pVerRange->maxVer >= pBlock->minVer);
H
Haojun Liao 已提交
1497
}
H
Hongze Cheng 已提交
1498

1499
/**
 * Find the next block, in scan order, that belongs to the same table as pBlockInfo.
 *
 * @param pBlockInfo           current file data block; tbBlockIdx is its position in the table's block list
 * @param pTableBlockScanInfo  table scan info; pBlockList is an SArray<SBlockIndex>
 * @param nextIndex            [out] position of the neighbor block in pBlockList (untouched when none exists)
 * @param order                scan order (TSDB_ORDER_ASC / TSDB_ORDER_DESC)
 * @param pBlockIndex          [out] copy of the neighbor's SBlockIndex (untouched when none exists)
 * @return true if a neighbor exists, false if pBlockInfo is the last (asc) / first (desc) block of the table
 */
static bool getNeighborBlockOfSameTable(SFileDataBlockInfo* pBlockInfo, STableBlockScanInfo* pTableBlockScanInfo,
                                        int32_t* nextIndex, int32_t order, SBlockIndex* pBlockIndex) {
  // Use a signed count. With the size_t returned by taosArrayGetSize, "size - 1" wraps around for an empty list
  // and the bound check would never fire.
  int32_t numOfBlocks = (int32_t)taosArrayGetSize(pTableBlockScanInfo->pBlockList);
  int32_t step = ASCENDING_TRAVERSE(order) ? 1 : -1;
  int32_t next = pBlockInfo->tbBlockIdx + step;

  // Covers both "last block in ascending order" and "first block in descending order", plus any out-of-range index.
  if (next < 0 || next >= numOfBlocks) {
    return false;
  }

  *nextIndex = next;
  *pBlockIndex = *(SBlockIndex*)taosArrayGet(pTableBlockScanInfo->pBlockList, next);
  return true;
}

/**
 * Search the iterator's block list for the entry describing the same block as pFBlockInfo (same uid and same
 * per-table block index). The search starts at the current position and walks in scan order.
 *
 * @return position of the matching entry in pBlockIter->blockList, or -1 if it is not found
 */
static int32_t findFileBlockInfoIndex(SDataBlockIter* pBlockIter, SFileDataBlockInfo* pFBlockInfo) {
  int32_t delta = ASCENDING_TRAVERSE(pBlockIter->order) ? 1 : -1;

  for (int32_t pos = pBlockIter->index; pos >= 0 && pos < pBlockIter->numOfBlocks; pos += delta) {
    SFileDataBlockInfo* pCandidate = taosArrayGet(pBlockIter->blockList, pos);
    bool                sameBlock = (pCandidate->uid == pFBlockInfo->uid) && (pCandidate->tbBlockIdx == pFBlockInfo->tbBlockIdx);
    if (sameBlock) {
      return pos;
    }
  }

  return -1;
}

1533
/**
 * Move the block found at `index` to position pBlockIter->index + step, advance the iterator to that position,
 * and make it the current block.
 *
 * @param pBlockIter  block iterator whose blockList is reordered in place
 * @param index       current position of the block to activate
 * @param step        +1 / -1 depending on scan direction
 * @return -1 if index is out of range, TSDB_CODE_SUCCESS otherwise
 */
static int32_t setFileBlockActiveInBlockIter(SDataBlockIter* pBlockIter, int32_t index, int32_t step) {
  bool validIndex = (index >= 0) && (index < pBlockIter->numOfBlocks);
  if (!validIndex) {
    return -1;
  }

  // copy the entry by value, since it is removed from the array below
  SFileDataBlockInfo target = *(SFileDataBlockInfo*)taosArrayGet(pBlockIter->blockList, index);

  int32_t destPos = pBlockIter->index + step;
  pBlockIter->index = destPos;

  if (destPos != index) {
    taosArrayRemove(pBlockIter->blockList, index);
    taosArrayInsert(pBlockIter->blockList, destPos, &target);

    SFileDataBlockInfo* pMoved = taosArrayGet(pBlockIter->blockList, destPos);
    ASSERT(pMoved->uid == target.uid && pMoved->tbBlockIdx == target.tbBlockIdx);
  }

  doSetCurrentBlock(pBlockIter, "");
  return TSDB_CODE_SUCCESS;
}

H
Haojun Liao 已提交
1553
// todo: this attribute could be acquired during extractin the global ordered block list.
1554
static bool overlapWithNeighborBlock(SDataBlk* pBlock, SBlockIndex* pNeighborBlockIndex, int32_t order) {
1555 1556
  // it is the last block in current file, no chance to overlap with neighbor blocks.
  if (ASCENDING_TRAVERSE(order)) {
1557
    return pBlock->maxKey.ts == pNeighborBlockIndex->window.skey;
1558
  } else {
1559
    return pBlock->minKey.ts == pNeighborBlockIndex->window.ekey;
1560
  }
H
Haojun Liao 已提交
1561
}
H
Hongze Cheng 已提交
1562

H
Hongze Cheng 已提交
1563
/**
 * Check whether the buffered key `key` comes no later than pBlock in scan order.
 *
 * In ascending order this means key.ts <= the block's minKey. In descending order it means key.ts >= the
 * block's maxKey. A key equal to TSKEY_INITIAL_VAL never qualifies.
 */
static bool bufferDataInFileBlockGap(int32_t order, TSDBKEY key, SDataBlk* pBlock) {
  if (key.ts == TSKEY_INITIAL_VAL) {
    return false;
  }

  if (ASCENDING_TRAVERSE(order)) {
    return key.ts <= pBlock->minKey.ts;
  }

  return key.ts >= pBlock->maxKey.ts;
}
H
Hongze Cheng 已提交
1569

H
Hongze Cheng 已提交
1570
/**
 * Check whether key.ts lies inside pBlock's [minKey, maxKey] range and the block's version span
 * [minVer, maxVer] intersects the version range pVerRange.
 */
static bool keyOverlapFileBlock(TSDBKEY key, SDataBlk* pBlock, SVersionRange* pVerRange) {
  bool tsInsideBlock = (pBlock->minKey.ts <= key.ts) && (key.ts <= pBlock->maxKey.ts);
  bool versionsIntersect = (pVerRange->minVer <= pBlock->maxVer) && (pBlock->minVer <= pVerRange->maxVer);

  return tsInsideBlock && versionsIntersect;
}

H
Hongze Cheng 已提交
1575 1576
/**
 * Walk the table's delete skyline (TSDBKEY points) starting at startIndex and report whether a delete record
 * may affect rows of pBlock.
 *
 * - A point inside [minKey, maxKey] whose version >= the block's minVer means overlap.
 * - A point before minKey whose version >= minVer means overlap if the following point reaches minKey.
 *   A point with no successor is expected to carry version 0.
 * - The first point after maxKey ends the walk with "no overlap".
 */
static bool doCheckforDatablockOverlap(STableBlockScanInfo* pBlockScanInfo, const SDataBlk* pBlock,
                                       int32_t startIndex) {
  size_t num = taosArrayGetSize(pBlockScanInfo->delSkyline);

  for (int32_t i = startIndex; i < num; ++i) {
    TSDBKEY* pKey = taosArrayGet(pBlockScanInfo->delSkyline, i);
    bool     insideBlock = (pKey->ts >= pBlock->minKey.ts) && (pKey->ts <= pBlock->maxKey.ts);

    if (insideBlock) {
      if (pKey->version >= pBlock->minVer) {
        return true;
      }
      continue;
    }

    if (pKey->ts >= pBlock->minKey.ts) {
      // not inside and not before the block: the point lies beyond maxKey
      return false;
    }

    // the point lies before minKey
    if (pKey->version < pBlock->minVer) {
      continue;
    }

    if (i < num - 1) {
      TSDBKEY* pNextKey = taosArrayGet(pBlockScanInfo->delSkyline, i + 1);
      if (pNextKey->ts >= pBlock->minKey.ts) {
        return true;
      }
    } else {  // it must be the last point
      ASSERT(pKey->version == 0);
    }
  }

  return false;
}

H
Hongze Cheng 已提交
1604
/**
 * Check whether any delete record in the table's delete skyline may affect rows of pBlock.
 *
 * @param pBlockScanInfo  table scan info; delSkyline holds TSDBKEY points and fileDelIndex is the skyline
 *                        position used as the starting point for file blocks
 * @param pBlock          file data block to check
 * @param order           scan order
 * @return true if a delete range overlaps pBlock in both timestamp and version
 */
static bool overlapWithDelSkyline(STableBlockScanInfo* pBlockScanInfo, const SDataBlk* pBlock, int32_t order) {
  // An empty skyline carries no delete info, just like a NULL one. Guard it so the first/last lookups below
  // never dereference a NULL element.
  if (pBlockScanInfo->delSkyline == NULL || taosArrayGetSize(pBlockScanInfo->delSkyline) == 0) {
    return false;
  }

  // the block's time range lies completely outside the skyline's time range
  TSDBKEY* pFirst = taosArrayGet(pBlockScanInfo->delSkyline, 0);
  TSDBKEY* pLast = taosArrayGetLast(pBlockScanInfo->delSkyline);
  if (pBlock->minKey.ts > pLast->ts || pBlock->maxKey.ts < pFirst->ts) {
    return false;
  }

  // time ranges overlap: check the skyline points (timestamp and version) from the current position on
  if (ASCENDING_TRAVERSE(order)) {
    return doCheckforDatablockOverlap(pBlockScanInfo, pBlock, pBlockScanInfo->fileDelIndex);
  }

  // Descending: step back to the last point whose ts is not greater than the block's minKey, or to index 0.
  // The index is checked before each lookup, so a negative index is never passed to taosArrayGet.
  int32_t index = pBlockScanInfo->fileDelIndex;
  while (index > 0) {
    TSDBKEY* p = taosArrayGet(pBlockScanInfo->delSkyline, index);
    if (p->ts <= pBlock->minKey.ts) {
      break;
    }
    index -= 1;
  }

  return doCheckforDatablockOverlap(pBlockScanInfo, pBlock, index);
}

C
Cary Xu 已提交
1634 1635 1636 1637 1638 1639 1640 1641 1642 1643 1644 1645 1646
// Reasons, filled in by getBlockToLoadInfo(), that decide whether a file data block must be loaded
// (fileBlockShouldLoad) or can be treated as clean (isCleanFileDataBlock).
typedef struct {
  bool overlapWithNeighborBlock;  // shares a boundary timestamp with the next block (scan order) of the same table
  bool hasDupTs;                  // block has sub-blocks, or its hasDup flag is set
  bool overlapWithDelInfo;        // the table's delete skyline overlaps the block in timestamp and version
  bool overlapWithLastBlock;      // the current stt (last block) key falls within [minKey, maxKey] of the block
  bool overlapWithKeyInBuf;       // the current in-memory key falls within the block (ts and version range)
  bool partiallyRequired;         // the query time window / version range cuts through the block
  bool moreThanCapcity;           // block row count exceeds the reader's output capacity
} SDataBlockToLoadInfo;

/**
 * Collect every reason that may force pBlock to be loaded into pInfo.
 *
 * @param pInfo             [out] reason flags; overlapWithNeighborBlock and overlapWithLastBlock are written only
 *                          when they can be evaluated, so callers zero-initialize the struct
 * @param pBlockInfo        position of the block in the global block list
 * @param pBlock            the file data block
 * @param pScanInfo         scan info of the block's table (block list, delete skyline)
 * @param keyInBuf          current key of the in-memory buffers
 * @param pLastBlockReader  reader over the stt (last) blocks
 * @param pReader           the tsdb reader (order, window, version range, capacity)
 */
static void getBlockToLoadInfo(SDataBlockToLoadInfo* pInfo, SFileDataBlockInfo* pBlockInfo, SDataBlk* pBlock,
                               STableBlockScanInfo* pScanInfo, TSDBKEY keyInBuf, SLastBlockReader* pLastBlockReader,
                               STsdbReader* pReader) {
  int32_t     neighborPos = 0;
  SBlockIndex neighbor = {0};

  // boundary shared with the next block (in scan order) of the same table
  if (getNeighborBlockOfSameTable(pBlockInfo, pScanInfo, &neighborPos, pReader->order, &neighbor)) {
    pInfo->overlapWithNeighborBlock = overlapWithNeighborBlock(pBlock, &neighbor, pReader->order);
  }

  // a block made of several sub-blocks is always treated as possibly holding duplicated timestamps
  pInfo->hasDupTs = (pBlock->nSubBlock != 1) || pBlock->hasDup;
  pInfo->overlapWithDelInfo = overlapWithDelSkyline(pScanInfo, pBlock, pReader->order);

  if (hasDataInLastBlock(pLastBlockReader)) {
    int64_t lastBlockKey = getCurrentKeyInLastBlock(pLastBlockReader);
    pInfo->overlapWithLastBlock = (lastBlockKey >= pBlock->minKey.ts) && (lastBlockKey <= pBlock->maxKey.ts);
  }

  pInfo->moreThanCapcity = pBlock->nRow > pReader->capacity;
  pInfo->partiallyRequired = dataBlockPartiallyRequired(&pReader->window, &pReader->verRange, pBlock);
  pInfo->overlapWithKeyInBuf = keyOverlapFileBlock(keyInBuf, pBlock, &pReader->verRange);
}

// A file data block has to be loaded, instead of being consumed as a whole, when any of these holds:
// 1. the query time window or version range only partially covers the block
// 2. the block shares a boundary timestamp with its neighbor block of the same table
// 3. the current in-memory key or stt (last block) key falls within the block
// 4. the block holds more rows than the output buffer capacity
// 5. delete info overlaps with the block data
// 6. the block may contain duplicated timestamps
static bool fileBlockShouldLoad(STsdbReader* pReader, SFileDataBlockInfo* pBlockInfo, SDataBlk* pBlock,
                                STableBlockScanInfo* pScanInfo, TSDBKEY keyInBuf, SLastBlockReader* pLastBlockReader) {
  SDataBlockToLoadInfo reason = {0};
  getBlockToLoadInfo(&reason, pBlockInfo, pBlock, pScanInfo, keyInBuf, pLastBlockReader, pReader);

  bool mustLoad = reason.overlapWithNeighborBlock || reason.hasDupTs || reason.partiallyRequired ||
                  reason.overlapWithKeyInBuf || reason.moreThanCapcity || reason.overlapWithDelInfo ||
                  reason.overlapWithLastBlock;
  if (!mustLoad) {
    return false;
  }

  // record why the block is loaded, for profiling
  tsdbDebug("%p uid:%" PRIu64
            " need to load the datablock, overlapneighbor:%d, hasDup:%d, partiallyRequired:%d, "
            "overlapWithKey:%d, greaterThanBuf:%d, overlapWithDel:%d, overlapWithlastBlock:%d, %s",
            pReader, pBlockInfo->uid, reason.overlapWithNeighborBlock, reason.hasDupTs, reason.partiallyRequired,
            reason.overlapWithKeyInBuf, reason.moreThanCapcity, reason.overlapWithDelInfo, reason.overlapWithLastBlock,
            pReader->idStr);
  return true;
}

C
Cary Xu 已提交
1699 1700 1701 1702 1703 1704 1705 1706 1707
/**
 * Check whether pBlock is "clean". A clean block does not overlap its neighbor block, the in-memory key, delete
 * info or the stt (last) block, and carries no duplicated timestamps.
 *
 * Unlike fileBlockShouldLoad, partial coverage by the query range and the output capacity are not considered.
 */
static bool isCleanFileDataBlock(STsdbReader* pReader, SFileDataBlockInfo* pBlockInfo, SDataBlk* pBlock,
                                 STableBlockScanInfo* pScanInfo, TSDBKEY keyInBuf, SLastBlockReader* pLastBlockReader) {
  SDataBlockToLoadInfo reason = {0};
  getBlockToLoadInfo(&reason, pBlockInfo, pBlock, pScanInfo, keyInBuf, pLastBlockReader, pReader);

  return !reason.overlapWithNeighborBlock && !reason.hasDupTs && !reason.overlapWithKeyInBuf &&
         !reason.overlapWithDelInfo && !reason.overlapWithLastBlock;
}

1708
/**
 * Build the reader's result block from the in-memory (mem/imem) rows of one table.
 *
 * The rows come from buildDataBlockFromBufImpl, which receives endKey and the reader capacity. Afterwards:
 * - the block time window is refreshed from column slotId[0] (presumably the primary timestamp column);
 * - the block uid is set and the block is flagged as composed;
 * - the elapsed time is logged and added to pReader->cost.buildmemBlock.
 *
 * @return TSDB_CODE_SUCCESS right away when neither buffer iterator has data, otherwise the code returned by
 *         buildDataBlockFromBufImpl
 */
static int32_t buildDataBlockFromBuf(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo, int64_t endKey) {
  bool memHasRows = pBlockScanInfo->iter.hasVal;
  bool imemHasRows = pBlockScanInfo->iiter.hasVal;
  if (!memHasRows && !imemHasRows) {
    return TSDB_CODE_SUCCESS;
  }

  SSDataBlock* pResBlock = pReader->pResBlock;
  int64_t      startUs = taosGetTimestampUs();

  int32_t code = buildDataBlockFromBufImpl(pBlockScanInfo, endKey, pReader->capacity, pReader);

  blockDataUpdateTsWindow(pResBlock, pReader->suppInfo.slotId[0]);
  pResBlock->info.id.uid = pBlockScanInfo->uid;
  setComposedBlockFlag(pReader, true);

  double elapsedMs = (taosGetTimestampUs() - startUs) / 1000.0;
  tsdbDebug("%p build data block from cache completed, elapsed time:%.2f ms, numOfRows:%d, brange:%" PRId64
            " - %" PRId64 ", uid:%" PRIu64 ",  %s",
            pReader, elapsedMs, pResBlock->info.rows, pResBlock->info.window.skey, pResBlock->info.window.ekey,
            pBlockScanInfo->uid, pReader->idStr);

  pReader->cost.buildmemBlock += elapsedMs;
  return code;
}

1733 1734
/**
 * Fast path for the current file block row (pDumpInfo->rowIndex, timestamp `key`).
 *
 * The row is appended to the result block directly, and rowIndex advances one step in scan direction, when:
 * 1. it is not the border row in scan direction, and
 * 2. the next row in that direction has a different timestamp.
 *
 * @return true if the row was copied; false if the caller has to merge it
 */
static bool tryCopyDistinctRowFromFileBlock(STsdbReader* pReader, SBlockData* pBlockData, int64_t key,
                                            SFileBlockDumpInfo* pDumpInfo) {
  bool asc = (pReader->order == TSDB_ORDER_ASC);
  bool hasFollowingRow = asc ? (pDumpInfo->rowIndex < pDumpInfo->totalRows - 1) : (pDumpInfo->rowIndex > 0);
  if (!hasFollowingRow) {
    return false;
  }

  int32_t step = asc ? 1 : -1;
  if (pBlockData->aTSKEY[pDumpInfo->rowIndex + step] == key) {
    return false;  // duplicated timestamp: the rows have to be merged
  }

  doAppendRowFromFileBlock(pReader->pResBlock, pReader, pBlockData, pDumpInfo->rowIndex);
  pDumpInfo->rowIndex += step;
  return true;
}

1753
static bool nextRowFromLastBlocks(SLastBlockReader* pLastBlockReader, STableBlockScanInfo* pScanInfo,
1754
                                  SVersionRange* pVerRange) {
X
Xiaoyu Wang 已提交
1755
  int32_t step = ASCENDING_TRAVERSE(pLastBlockReader->order) ? 1 : -1;
H
Haojun Liao 已提交
1756

1757 1758 1759 1760 1761 1762 1763 1764
  while (1) {
    bool hasVal = tMergeTreeNext(&pLastBlockReader->mergeTree);
    if (!hasVal) {
      return false;
    }

    TSDBROW row = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
    TSDBKEY k = TSDBROW_KEY(&row);
1765 1766 1767
    if (hasBeenDropped(pScanInfo->delSkyline, &pScanInfo->lastBlockDelIndex, &k, pLastBlockReader->order, pVerRange)) {
      pScanInfo->lastKey = k.ts;
    } else {
H
Haojun Liao 已提交
1768 1769 1770 1771 1772 1773
      // the qualifed ts may equal to k.ts, only a greater version one.
      // here we need to fallback one step.
      if (pScanInfo->lastKey == k.ts) {
        pScanInfo->lastKey -= step;
      }

1774 1775 1776 1777 1778 1779 1780
      return true;
    }
  }
}

/**
 * Fast path for a stt (last block) row fRow with timestamp ts.
 *
 * The last block reader is advanced to its next visible row. If there is no such row, or it has a different
 * timestamp, fRow is appended to the result block directly.
 *
 * @return true if fRow was appended; false if the next row shares ts and must be merged. The last block
 *         reader has been advanced in both cases.
 */
static bool tryCopyDistinctRowFromSttBlock(TSDBROW* fRow, SLastBlockReader* pLastBlockReader,
                                           STableBlockScanInfo* pScanInfo, int64_t ts, STsdbReader* pReader) {
  bool hasNextRow = nextRowFromLastBlocks(pLastBlockReader, pScanInfo, &pReader->verRange);
  if (hasNextRow && getCurrentKeyInLastBlock(pLastBlockReader) == ts) {
    return false;
  }

  doAppendRowFromFileBlock(pReader->pResBlock, pReader, fRow->pBlockData, fRow->iRow);
  return true;
}

1796 1797 1798 1799 1800 1801
/**
 * Return the schema of table `uid` fetched with version -1, cached in pReader->pSchema after the first successful
 * lookup. On failure an error is logged and NULL is returned; the lookup is retried on the next call.
 */
static FORCE_INLINE STSchema* getLatestTableSchema(STsdbReader* pReader, uint64_t uid) {
  if (pReader->pSchema == NULL) {
    pReader->pSchema = metaGetTbTSchema(pReader->pTsdb->pVnode->pMeta, uid, -1, 1);
    if (pReader->pSchema == NULL) {
      tsdbError("failed to get table schema, uid:%" PRIu64 ", it may have been dropped, ver:-1, %s", uid, pReader->idStr);
    }
  }

  return pReader->pSchema;
}

H
Haojun Liao 已提交
1809 1810 1811
/**
 * Return the table schema matching `sversion` for a row of table `uid`.
 *
 * pReader->pSchema caches the schema fetched with version -1 and is loaded on first use. A second schema version
 * is cached in pReader->pMemSchema and replaced whenever another version is requested.
 *
 * @return the schema, or NULL with terrno set to a non-zero error code on failure
 */
static FORCE_INLINE STSchema* doGetSchemaForTSRow(int32_t sversion, STsdbReader* pReader, uint64_t uid) {
  // always set the newest schema version in pReader->pSchema
  if (pReader->pSchema == NULL) {
    pReader->pSchema = metaGetTbTSchema(pReader->pTsdb->pVnode->pMeta, uid, -1, 1);
  }

  if (pReader->pSchema != NULL && sversion == pReader->pSchema->version) {
    return pReader->pSchema;
  }

  if (pReader->pMemSchema != NULL) {
    if (pReader->pMemSchema->version == sversion) {
      return pReader->pMemSchema;
    }
    // another version is requested: drop the cached one before fetching
    taosMemoryFreeClear(pReader->pMemSchema);
  }

  // A single fetch path, so the NULL check applies to the first fetch as well as to every refresh.
  int32_t code = metaGetTbTSchemaEx(pReader->pTsdb->pVnode->pMeta, pReader->suid, uid, sversion, &pReader->pMemSchema);
  if (code != TSDB_CODE_SUCCESS || pReader->pMemSchema == NULL) {
    // Callers propagate terrno as their return code. Never report success together with a NULL schema,
    // otherwise the row would be dropped silently.
    terrno = (code != TSDB_CODE_SUCCESS) ? code : TSDB_CODE_TDB_IVD_TB_SCHEMA_VERSION;
    return NULL;
  }

  return pReader->pMemSchema;
}

1844
static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo, TSDBROW* pRow,
1845 1846
                                     SIterInfo* pIter, int64_t key, SLastBlockReader* pLastBlockReader) {
  SRowMerger          merge = {0};
H
Hongze Cheng 已提交
1847
  SRow*               pTSRow = NULL;
1848 1849 1850
  SBlockData*         pBlockData = &pReader->status.fileBlockData;
  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;

1851
  int64_t tsLast = INT64_MIN;
1852
  if (hasDataInLastBlock(pLastBlockReader)) {
1853 1854
    tsLast = getCurrentKeyInLastBlock(pLastBlockReader);
  }
1855

H
Hongze Cheng 已提交
1856 1857
  TSDBKEY k = TSDBROW_KEY(pRow);
  TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
1858

1859 1860
  int64_t minKey = 0;
  if (pReader->order == TSDB_ORDER_ASC) {
H
Hongze Cheng 已提交
1861
    minKey = INT64_MAX;  // chosen the minimum value
1862
    if (minKey > tsLast && hasDataInLastBlock(pLastBlockReader)) {
1863 1864
      minKey = tsLast;
    }
1865

1866 1867 1868
    if (minKey > k.ts) {
      minKey = k.ts;
    }
1869

1870
    if (minKey > key && hasDataInFileBlock(pBlockData, pDumpInfo)) {
1871 1872 1873 1874
      minKey = key;
    }
  } else {
    minKey = INT64_MIN;
1875
    if (minKey < tsLast && hasDataInLastBlock(pLastBlockReader)) {
1876 1877 1878 1879 1880 1881 1882
      minKey = tsLast;
    }

    if (minKey < k.ts) {
      minKey = k.ts;
    }

1883
    if (minKey < key && hasDataInFileBlock(pBlockData, pDumpInfo)) {
1884 1885
      minKey = key;
    }
1886 1887 1888 1889
  }

  bool init = false;

1890
  // ASC: file block ---> last block -----> imem -----> mem
H
Hongze Cheng 已提交
1891
  // DESC: mem -----> imem -----> last block -----> file block
1892 1893
  if (pReader->order == TSDB_ORDER_ASC) {
    if (minKey == key) {
1894
      init = true;
H
Hongze Cheng 已提交
1895
      int32_t code = tsdbRowMergerInit(&merge, &fRow, pReader->pSchema);
H
Haojun Liao 已提交
1896 1897 1898
      if (code != TSDB_CODE_SUCCESS) {
        return code;
      }
1899
      doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
1900 1901
    }

1902
    if (minKey == tsLast) {
1903
      TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
H
Haojun Liao 已提交
1904
      if (init) {
H
Hongze Cheng 已提交
1905
        tsdbRowMerge(&merge, &fRow1);
H
Haojun Liao 已提交
1906
      } else {
1907
        init = true;
H
Hongze Cheng 已提交
1908
        int32_t code = tsdbRowMergerInit(&merge, &fRow1, pReader->pSchema);
H
Haojun Liao 已提交
1909 1910 1911
        if (code != TSDB_CODE_SUCCESS) {
          return code;
        }
1912
      }
1913
      doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, &merge, &pReader->verRange);
1914
    }
1915

1916
    if (minKey == k.ts) {
K
kailixu 已提交
1917 1918 1919 1920
      STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid);
      if (pSchema == NULL) {
        return terrno;
      }
H
Haojun Liao 已提交
1921
      if (init) {
X
Xiaoyu Wang 已提交
1922
        tsdbRowMergerAdd(&merge, pRow, pSchema);
H
Haojun Liao 已提交
1923
      } else {
1924
        init = true;
X
Xiaoyu Wang 已提交
1925
        int32_t code = tsdbRowMergerInit(&merge, pRow, pSchema);
H
Haojun Liao 已提交
1926 1927 1928 1929 1930 1931 1932
        if (code != TSDB_CODE_SUCCESS) {
          return code;
        }
      }
      int32_t code = doMergeRowsInBuf(pIter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge, pReader);
      if (code != TSDB_CODE_SUCCESS) {
        return code;
1933 1934 1935 1936 1937
      }
    }
  } else {
    if (minKey == k.ts) {
      init = true;
1938
      STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid);
H
Hongze Cheng 已提交
1939
      int32_t   code = tsdbRowMergerInit(&merge, pRow, pSchema);
H
Haojun Liao 已提交
1940 1941 1942 1943 1944
      if (code != TSDB_CODE_SUCCESS) {
        return code;
      }

      code = doMergeRowsInBuf(pIter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge, pReader);
1945
      if (code != TSDB_CODE_SUCCESS || merge.pTSchema == NULL) {
H
Haojun Liao 已提交
1946 1947
        return code;
      }
1948 1949
    }

1950
    if (minKey == tsLast) {
1951
      TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
H
Haojun Liao 已提交
1952
      if (init) {
H
Hongze Cheng 已提交
1953
        tsdbRowMerge(&merge, &fRow1);
H
Haojun Liao 已提交
1954
      } else {
1955
        init = true;
H
Hongze Cheng 已提交
1956
        int32_t code = tsdbRowMergerInit(&merge, &fRow1, pReader->pSchema);
H
Haojun Liao 已提交
1957 1958 1959
        if (code != TSDB_CODE_SUCCESS) {
          return code;
        }
1960
      }
1961
      doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, &merge, &pReader->verRange);
1962 1963 1964
    }

    if (minKey == key) {
H
Haojun Liao 已提交
1965
      if (init) {
H
Hongze Cheng 已提交
1966
        tsdbRowMerge(&merge, &fRow);
H
Haojun Liao 已提交
1967
      } else {
1968
        init = true;
H
Hongze Cheng 已提交
1969
        int32_t code = tsdbRowMergerInit(&merge, &fRow, pReader->pSchema);
H
Haojun Liao 已提交
1970 1971 1972
        if (code != TSDB_CODE_SUCCESS) {
          return code;
        }
1973 1974 1975
      }
      doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
    }
1976 1977
  }

H
Hongze Cheng 已提交
1978
  int32_t code = tsdbRowMergerGetRow(&merge, &pTSRow);
1979 1980 1981 1982
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

1983
  doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo);
1984 1985

  taosMemoryFree(pTSRow);
H
Hongze Cheng 已提交
1986
  tsdbRowMergerClear(&merge);
1987 1988 1989
  return TSDB_CODE_SUCCESS;
}

1990 1991 1992
/**
 * Emit the row at the current position of the stt (last block) reader, merging it with later stt rows of the
 * same timestamp. When mergeBlockData is true and the timestamps match, it is also merged with the current file
 * block row(s).
 *
 * @param pLastBlockReader  reader over the stt (last) blocks
 * @param pReader           the tsdb reader
 * @param pBlockScanInfo    scan info of the current table
 * @param pBlockData        current file block data; only dereferenced when mergeBlockData is true
 * @param mergeBlockData    whether the current file block row may take part in the merge
 * @return TSDB_CODE_SUCCESS or an error code; the row merger is released on every path once initialized
 */
static int32_t doMergeFileBlockAndLastBlock(SLastBlockReader* pLastBlockReader, STsdbReader* pReader,
                                            STableBlockScanInfo* pBlockScanInfo, SBlockData* pBlockData,
                                            bool mergeBlockData) {
  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
  int64_t             tsLastBlock = getCurrentKeyInLastBlock(pLastBlockReader);

  SRow*      pTSRow = NULL;
  SRowMerger merge = {0};
  TSDBROW    fRow = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
  tsdbTrace("fRow ptr:%p, %d, uid:%" PRIu64 ", %s", fRow.pBlockData, fRow.iRow, pLastBlockReader->uid, pReader->idStr);

  int32_t code = TSDB_CODE_SUCCESS;

  if ((!mergeBlockData) || (tsLastBlock != pBlockData->aTSKEY[pDumpInfo->rowIndex])) {
    // the file block row does not take part: only last block rows are involved
    if (tryCopyDistinctRowFromSttBlock(&fRow, pLastBlockReader, pBlockScanInfo, tsLastBlock, pReader)) {
      pBlockScanInfo->lastKey = tsLastBlock;
      return TSDB_CODE_SUCCESS;
    }

    code = tsdbRowMergerInit(&merge, &fRow, pReader->pSchema);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    // tryCopyDistinctRowFromSttBlock has already moved to the next row with the same ts: merge it explicitly
    TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
    tsdbRowMerge(&merge, &fRow1);
    doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLastBlock, &merge, &pReader->verRange);
  } else {
    // the file block row has the same ts: merge last block rows and file block rows together
    code = tsdbRowMergerInit(&merge, &fRow, pReader->pSchema);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLastBlock, &merge, &pReader->verRange);

    // merge with block data if ts == key
    if (tsLastBlock == pBlockData->aTSKEY[pDumpInfo->rowIndex]) {
      doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
    }
  }

  code = tsdbRowMergerGetRow(&merge, &pTSRow);
  if (code == TSDB_CODE_SUCCESS) {
    doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo);
    taosMemoryFree(pTSRow);
  }

  // the merger was initialized above, so release it whether or not the merged row could be built
  tsdbRowMergerClear(&merge);
  return code;
}

2053 2054
/**
 * Emit the next row of one table from its file data: the current data block row and/or the stt (last) block row.
 *
 * @param pReader           the tsdb reader
 * @param pLastBlockReader  reader over the stt (last) blocks
 * @param key               timestamp of the current file block row
 * @param pBlockScanInfo    scan info of the current table
 * @param pBlockData        current file block data
 * @return TSDB_CODE_SUCCESS or an error code
 */
static int32_t mergeFileBlockAndLastBlock(STsdbReader* pReader, SLastBlockReader* pLastBlockReader, int64_t key,
                                          STableBlockScanInfo* pBlockScanInfo, SBlockData* pBlockData) {
  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;

  // only the last block exists
  if (!hasDataInFileBlock(pBlockData, pDumpInfo)) {
    return doMergeFileBlockAndLastBlock(pLastBlockReader, pReader, pBlockScanInfo, NULL, false);
  }

  // no last block available, only the data block exists
  if (!hasDataInLastBlock(pLastBlockReader)) {
    return mergeRowsInFileBlocks(pBlockData, pBlockScanInfo, key, pReader);
  }

  int64_t ts = getCurrentKeyInLastBlock(pLastBlockReader);
  ASSERT(ts >= key);

  if (!ASCENDING_TRAVERSE(pReader->order)) {
    return doMergeFileBlockAndLastBlock(pLastBlockReader, pReader, pBlockScanInfo, pBlockData, true);
  }

  if (key < ts) {  // imem, mem are all empty, file blocks (data blocks and last block) exist
    return mergeRowsInFileBlocks(pBlockData, pBlockScanInfo, key, pReader);
  }

  if (key > ts) {  // not expected, given the ASSERT above
    return TSDB_CODE_SUCCESS;
  }

  // key == ts: merge the file block row(s) first, then the last block row(s) with the same timestamp
  TSDBROW    fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
  SRow*      pTSRow = NULL;
  SRowMerger merge = {0};

  int32_t code = tsdbRowMergerInit(&merge, &fRow, pReader->pSchema);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);

  TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
  tsdbRowMerge(&merge, &fRow1);
  doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, ts, &merge, &pReader->verRange);

  code = tsdbRowMergerGetRow(&merge, &pTSRow);
  if (code == TSDB_CODE_SUCCESS) {
    doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo);
    taosMemoryFree(pTSRow);
  }

  // the merger was initialized above, so release it whether or not the merged row could be built
  tsdbRowMergerClear(&merge);
  return code;
}

2108 2109
/*
 * Merge every pending row that shares the selected timestamp across the four data sources of one table
 * (file data block, last/stt block, imem and mem) and append the merged row to pReader->pResBlock.
 *
 * The selected timestamp is the smallest pending key in ASC order and the largest one in DESC order.
 * Merge order (the first source found becomes the merge base):
 *   ASC : file block -> last block -> imem -> mem
 *   DESC: mem -> imem -> last block -> file block
 *
 * pRow / piRow are dereferenced unconditionally; the visible caller (buildComposedDataBlockImpl) only calls
 * this when both mem and imem returned a valid row.
 *
 * Returns TSDB_CODE_SUCCESS or the first error reported by a merge step. Once tsdbRowMergerInit() has
 * succeeded, the merger is released on every exit path (previously several error paths leaked it).
 */
static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo, SBlockData* pBlockData,
                                     SLastBlockReader* pLastBlockReader) {
  SRowMerger          merge = {0};
  SRow*               pTSRow = NULL;
  int32_t             code = TSDB_CODE_SUCCESS;
  bool                init = false;  // true only after tsdbRowMergerInit() succeeded on `merge`
  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
  SArray*             pDelList = pBlockScanInfo->delSkyline;

  TSDBROW* pRow = getValidMemRow(&pBlockScanInfo->iter, pDelList, pReader);
  TSDBROW* piRow = getValidMemRow(&pBlockScanInfo->iiter, pDelList, pReader);

  int64_t tsLast = INT64_MIN;
  if (hasDataInLastBlock(pLastBlockReader)) {
    tsLast = getCurrentKeyInLastBlock(pLastBlockReader);
  }

  int64_t key = hasDataInFileBlock(pBlockData, pDumpInfo) ? pBlockData->aTSKEY[pDumpInfo->rowIndex] : INT64_MIN;

  TSDBKEY k = TSDBROW_KEY(pRow);
  TSDBKEY ik = TSDBROW_KEY(piRow);

  // ASC: minKey is the smallest pending timestamp; DESC: it is the largest one.
  int64_t minKey = 0;
  if (ASCENDING_TRAVERSE(pReader->order)) {
    minKey = INT64_MAX;
    if (minKey > k.ts) {
      minKey = k.ts;
    }

    if (minKey > ik.ts) {
      minKey = ik.ts;
    }

    if (minKey > key && hasDataInFileBlock(pBlockData, pDumpInfo)) {
      minKey = key;
    }

    if (minKey > tsLast && hasDataInLastBlock(pLastBlockReader)) {
      minKey = tsLast;
    }
  } else {
    minKey = INT64_MIN;
    if (minKey < k.ts) {
      minKey = k.ts;
    }

    if (minKey < ik.ts) {
      minKey = ik.ts;
    }

    if (minKey < key && hasDataInFileBlock(pBlockData, pDumpInfo)) {
      minKey = key;
    }

    if (minKey < tsLast && hasDataInLastBlock(pLastBlockReader)) {
      minKey = tsLast;
    }
  }

  // NOTE(review): when doGetSchemaForTSRow() returns NULL the row is skipped and `code` (still
  // TSDB_CODE_SUCCESS at that point) is returned, mirroring the pre-existing check; the guard is now applied at
  // every call site so a NULL schema never reaches tsdbRowMergerInit().
  if (ASCENDING_TRAVERSE(pReader->order)) {
    if (minKey == key) {
      TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
      code = tsdbRowMergerInit(&merge, &fRow, pReader->pSchema);
      if (code != TSDB_CODE_SUCCESS) {
        goto _end;
      }
      init = true;

      doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
    }

    if (minKey == tsLast) {
      TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
      if (init) {
        tsdbRowMerge(&merge, &fRow1);
      } else {
        code = tsdbRowMergerInit(&merge, &fRow1, pReader->pSchema);
        if (code != TSDB_CODE_SUCCESS) {
          goto _end;
        }
        init = true;
      }

      doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, &merge, &pReader->verRange);
    }

    if (minKey == ik.ts) {
      if (init) {
        tsdbRowMerge(&merge, piRow);
      } else {
        STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(piRow), pReader, pBlockScanInfo->uid);
        if (pSchema == NULL) {
          goto _end;
        }

        code = tsdbRowMergerInit(&merge, piRow, pSchema);
        if (code != TSDB_CODE_SUCCESS) {
          goto _end;
        }
        init = true;
      }

      code = doMergeRowsInBuf(&pBlockScanInfo->iiter, pBlockScanInfo->uid, ik.ts, pBlockScanInfo->delSkyline, &merge,
                              pReader);
      if (code != TSDB_CODE_SUCCESS) {
        goto _end;
      }
    }

    if (minKey == k.ts) {
      if (init) {
        if (merge.pTSchema == NULL) {
          goto _end;
        }

        tsdbRowMerge(&merge, pRow);
      } else {
        STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid);
        if (pSchema == NULL) {
          goto _end;
        }

        code = tsdbRowMergerInit(&merge, pRow, pSchema);
        if (code != TSDB_CODE_SUCCESS) {
          goto _end;
        }
        init = true;
      }

      code = doMergeRowsInBuf(&pBlockScanInfo->iter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge,
                              pReader);
      if (code != TSDB_CODE_SUCCESS) {
        goto _end;
      }
    }
  } else {
    if (minKey == k.ts) {
      STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid);
      if (pSchema == NULL) {
        goto _end;
      }

      code = tsdbRowMergerInit(&merge, pRow, pSchema);
      if (code != TSDB_CODE_SUCCESS) {
        goto _end;
      }
      init = true;

      code = doMergeRowsInBuf(&pBlockScanInfo->iter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge,
                              pReader);
      if (code != TSDB_CODE_SUCCESS) {
        goto _end;
      }
    }

    if (minKey == ik.ts) {
      if (init) {
        tsdbRowMerge(&merge, piRow);
      } else {
        STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(piRow), pReader, pBlockScanInfo->uid);
        if (pSchema == NULL) {
          goto _end;
        }

        code = tsdbRowMergerInit(&merge, piRow, pSchema);
        if (code != TSDB_CODE_SUCCESS) {
          goto _end;
        }
        init = true;
      }

      code = doMergeRowsInBuf(&pBlockScanInfo->iiter, pBlockScanInfo->uid, ik.ts, pBlockScanInfo->delSkyline, &merge,
                              pReader);
      if (code != TSDB_CODE_SUCCESS) {
        goto _end;
      }
    }

    if (minKey == tsLast) {
      TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
      if (init) {
        tsdbRowMerge(&merge, &fRow1);
      } else {
        code = tsdbRowMergerInit(&merge, &fRow1, pReader->pSchema);
        if (code != TSDB_CODE_SUCCESS) {
          goto _end;
        }
        init = true;
      }

      doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, &merge, &pReader->verRange);
    }

    if (minKey == key) {
      TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
      if (!init) {
        code = tsdbRowMergerInit(&merge, &fRow, pReader->pSchema);
        if (code != TSDB_CODE_SUCCESS) {
          goto _end;
        }
        init = true;
      } else {
        if (merge.pTSchema == NULL) {
          goto _end;
        }

        tsdbRowMerge(&merge, &fRow);
      }

      doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
    }
  }

  // no source contributed a row: nothing to emit
  if (merge.pTSchema == NULL) {
    goto _end;
  }

  code = tsdbRowMergerGetRow(&merge, &pTSRow);
  if (code != TSDB_CODE_SUCCESS) {
    goto _end;
  }

  doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo);
  taosMemoryFree(pTSRow);

_end:
  // only release a merger that was fully initialized and carries a schema
  if (init && merge.pTSchema != NULL) {
    tsdbRowMergerClear(&merge);
  }
  return code;
}

2321 2322 2323 2324 2325 2326 2327 2328 2329
/*
 * Lazily create the mem and imem skip-list iterators of one table and build its delete skyline.
 * Does nothing once pBlockScanInfo->iterInit is set.
 *
 * The iterators start right after pBlockScanInfo->lastKey in scan direction:
 *   ASC : ts = lastKey + 1, version = verRange.minVer
 *   DESC: ts = lastKey - 1, version = verRange.maxVer
 *
 * Returns TSDB_CODE_SUCCESS, or the error from iterator creation / delete-skyline construction. A failed
 * skyline is now reported instead of being ignored, so rows are not returned without their delete info.
 */
static int32_t initMemDataIterator(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader) {
  if (pBlockScanInfo->iterInit) {
    return TSDB_CODE_SUCCESS;
  }

  int32_t code = TSDB_CODE_SUCCESS;

  TSDBKEY startKey = {0};
  if (ASCENDING_TRAVERSE(pReader->order)) {
    startKey = (TSDBKEY){.ts = pBlockScanInfo->lastKey + 1, .version = pReader->verRange.minVer};
  } else {
    startKey = (TSDBKEY){.ts = pBlockScanInfo->lastKey - 1, .version = pReader->verRange.maxVer};
  }

  int32_t backward = (!ASCENDING_TRAVERSE(pReader->order));

  STbData* d = NULL;
  if (pReader->pReadSnap->pMem != NULL) {
    d = tsdbGetTbDataFromMemTable(pReader->pReadSnap->pMem, pReader->suid, pBlockScanInfo->uid);
    if (d != NULL) {
      code = tsdbTbDataIterCreate(d, &startKey, backward, &pBlockScanInfo->iter.iter);
      if (code == TSDB_CODE_SUCCESS) {
        pBlockScanInfo->iter.hasVal = (tsdbTbDataIterGet(pBlockScanInfo->iter.iter) != NULL);

        tsdbDebug("%p uid:%" PRIu64 ", check data in mem from skey:%" PRId64 ", order:%d, ts range in buf:%" PRId64
                  "-%" PRId64 " %s",
                  pReader, pBlockScanInfo->uid, startKey.ts, pReader->order, d->minKey, d->maxKey, pReader->idStr);
      } else {
        // this branch handles the mem (not imem) iterator
        tsdbError("%p uid:%" PRIu64 ", failed to create iterator for mem, code:%s, %s", pReader, pBlockScanInfo->uid,
                  tstrerror(code), pReader->idStr);
        return code;
      }
    }
  } else {
    tsdbDebug("%p uid:%" PRIu64 ", no data in mem, %s", pReader, pBlockScanInfo->uid, pReader->idStr);
  }

  STbData* di = NULL;
  if (pReader->pReadSnap->pIMem != NULL) {
    di = tsdbGetTbDataFromMemTable(pReader->pReadSnap->pIMem, pReader->suid, pBlockScanInfo->uid);
    if (di != NULL) {
      code = tsdbTbDataIterCreate(di, &startKey, backward, &pBlockScanInfo->iiter.iter);
      if (code == TSDB_CODE_SUCCESS) {
        pBlockScanInfo->iiter.hasVal = (tsdbTbDataIterGet(pBlockScanInfo->iiter.iter) != NULL);

        tsdbDebug("%p uid:%" PRIu64 ", check data in imem from skey:%" PRId64 ", order:%d, ts range in buf:%" PRId64
                  "-%" PRId64 " %s",
                  pReader, pBlockScanInfo->uid, startKey.ts, pReader->order, di->minKey, di->maxKey, pReader->idStr);
      } else {
        // this branch handles the imem iterator
        tsdbError("%p uid:%" PRIu64 ", failed to create iterator for imem, code:%s, %s", pReader, pBlockScanInfo->uid,
                  tstrerror(code), pReader->idStr);
        return code;
      }
    }
  } else {
    tsdbDebug("%p uid:%" PRIu64 ", no data in imem, %s", pReader, pBlockScanInfo->uid, pReader->idStr);
  }

  int64_t st = taosGetTimestampUs();
  code = initDelSkylineIterator(pBlockScanInfo, pReader, d, di);
  pReader->cost.initDelSkylineIterTime += (taosGetTimestampUs() - st) / 1000.0;
  if (code != TSDB_CODE_SUCCESS) {
    tsdbError("%p uid:%" PRIu64 ", failed to init delete skyline, code:%s, %s", pReader, pBlockScanInfo->uid,
              tstrerror(code), pReader->idStr);
    return code;
  }

  pBlockScanInfo->iterInit = true;
  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
2390 2391
/*
 * Decide whether the row at pDumpInfo->rowIndex of the loaded file block may be emitted for this table.
 * The row is rejected when:
 *   - the block carries rows of several tables (aUid != NULL) and this row belongs to another uid;
 *   - its version lies outside [verRange.minVer, verRange.maxVer];
 *   - its timestamp lies outside [window.skey, window.ekey];
 *   - hasBeenDropped() reports it as deleted (it receives &fileDelIndex and may update it).
 */
static bool isValidFileBlockRow(SBlockData* pBlockData, SFileBlockDumpInfo* pDumpInfo,
                                STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader) {
  const int32_t row = pDumpInfo->rowIndex;

  bool belongsToOtherTable = (pBlockData->aUid != NULL) && (pBlockData->aUid[row] != pBlockScanInfo->uid);
  if (belongsToOtherTable) {
    return false;
  }

  int64_t rowVer = pBlockData->aVersion[row];
  bool    versionVisible = (rowVer >= pReader->verRange.minVer) && (rowVer <= pReader->verRange.maxVer);
  if (!versionVisible) {
    return false;
  }

  int64_t rowTs = pBlockData->aTSKEY[row];
  bool    insideWindow = (rowTs >= pReader->window.skey) && (rowTs <= pReader->window.ekey);
  if (!insideWindow) {
    return false;
  }

  TSDBKEY rowKey = {.ts = rowTs, .version = rowVer};
  return !hasBeenDropped(pBlockScanInfo->delSkyline, &pBlockScanInfo->fileDelIndex, &rowKey, pReader->order,
                         &pReader->verRange);
}

2420
/*
 * Position the last-block (stt) merge tree on the table described by pScanInfo.
 *
 * If the reader is already bound to this table, only report whether rows remain. Otherwise close the tree of
 * the previously bound table (uid != 0), initialize the table's mem iterators, and open a new merge tree. The
 * tree is restricted to the part of pLBlockReader->window that lies after pScanInfo->lastKey in scan direction.
 *
 * Returns true when the freshly opened tree yields a row; false on open failure or when no row is available.
 */
static bool initLastBlockReader(SLastBlockReader* pLBlockReader, STableBlockScanInfo* pScanInfo, STsdbReader* pReader) {
  if (pLBlockReader->uid == pScanInfo->uid) {
    return hasDataInLastBlock(pLBlockReader);
  }

  bool boundToOtherTable = (pLBlockReader->uid != 0);
  if (boundToOtherTable) {
    tMergeTreeClose(&pLBlockReader->mergeTree);
  }

  // NOTE(review): the return code of initMemDataIterator() is not checked here.
  initMemDataIterator(pScanInfo, pReader);
  pLBlockReader->uid = pScanInfo->uid;

  // skip everything up to and including lastKey in scan direction
  STimeWindow range = pLBlockReader->window;
  if (ASCENDING_TRAVERSE(pLBlockReader->order)) {
    range.skey = pScanInfo->lastKey + 1;
  } else {
    range.ekey = pScanInfo->lastKey - 1;
  }

  tsdbDebug("init last block reader, window:%" PRId64 "-%" PRId64 ", uid:%" PRIu64 ", %s", range.skey, range.ekey,
            pScanInfo->uid, pReader->idStr);

  bool    descending = (pLBlockReader->order == TSDB_ORDER_DESC);
  int32_t rc = tMergeTreeOpen(&pLBlockReader->mergeTree, descending, pReader->pFileReader, pReader->suid,
                              pScanInfo->uid, &range, &pLBlockReader->verRange, pLBlockReader->pInfo, false,
                              pReader->idStr);
  if (rc != TSDB_CODE_SUCCESS) {
    return false;
  }

  return nextRowFromLastBlocks(pLBlockReader, pScanInfo, &pReader->verRange);
}

2453
/* Timestamp of the row the last-block merge tree currently points at. */
static int64_t getCurrentKeyInLastBlock(SLastBlockReader* pLastBlockReader) {
  TSDBROW current = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
  int64_t ts = TSDBROW_TS(&current);
  return ts;
}

H
Hongze Cheng 已提交
2458
/* True while the last-block merge tree holds an iterator (mergeTree.pIter is non-NULL). */
static bool hasDataInLastBlock(SLastBlockReader* pLastBlockReader) {
  const bool hasIter = (pLastBlockReader->mergeTree.pIter != NULL);
  return hasIter;
}
2459

2460
bool hasDataInFileBlock(const SBlockData* pBlockData, const SFileBlockDumpInfo* pDumpInfo) {
H
Haojun Liao 已提交
2461
  if ((pBlockData->nRow > 0) && (pBlockData->nRow != pDumpInfo->totalRows)) {
2462
    return false;  // this is an invalid result.
2463
  }
2464
  return pBlockData->nRow > 0 && (!pDumpInfo->allDumped);
2465
}
2466

2467 2468
/*
 * Emit the file-block row(s) with timestamp `key` into pReader->pResBlock.
 *
 * First tryCopyDistinctRowFromFileBlock() is given a chance to copy the row directly. When it returns true,
 * lastKey is updated and nothing else is done. Otherwise all rows for `key` are merged through an SRowMerger,
 * and the merged row is appended.
 *
 * Returns TSDB_CODE_SUCCESS or the merger error. The merger is now released on the tsdbRowMergerGetRow()
 * failure path as well; previously that path leaked it.
 */
int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pBlockScanInfo, int64_t key,
                              STsdbReader* pReader) {
  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;

  if (tryCopyDistinctRowFromFileBlock(pReader, pBlockData, key, pDumpInfo)) {
    pBlockScanInfo->lastKey = key;
    return TSDB_CODE_SUCCESS;
  }

  TSDBROW    fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
  SRow*      pTSRow = NULL;
  SRowMerger merge = {0};

  int32_t code = tsdbRowMergerInit(&merge, &fRow, pReader->pSchema);
  if (code != TSDB_CODE_SUCCESS) {
    return code;  // merger not initialized, nothing to release
  }

  doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);

  code = tsdbRowMergerGetRow(&merge, &pTSRow);
  if (code == TSDB_CODE_SUCCESS) {
    doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo);
    taosMemoryFree(pTSRow);
  }

  tsdbRowMergerClear(&merge);
  return code;
}

H
Haojun Liao 已提交
2498 2499
/*
 * Produce the next output row for one table by dispatching to the right merge routine.
 * The choice depends on which in-memory sources (mem, imem) still hold a valid row:
 *   - both mem and imem        -> doMergeMultiLevelRows
 *   - imem still has a value   -> doMergeBufAndFileRows over the imem iterator
 *   - mem still has a value    -> doMergeBufAndFileRows over the mem iterator
 *   - neither                  -> mergeFileBlockAndLastBlock
 * `key` is the current file-block timestamp, or INT64_MIN when no undumped file row exists.
 */
static int32_t buildComposedDataBlockImpl(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo,
                                          SBlockData* pBlockData, SLastBlockReader* pLastBlockReader) {
  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;

  bool    fileRowPending = (pBlockData->nRow > 0) && !pDumpInfo->allDumped;
  int64_t key = fileRowPending ? pBlockData->aTSKEY[pDumpInfo->rowIndex] : INT64_MIN;

  // getValidMemRow() is evaluated in the same order as before: mem first, then imem
  TSDBROW* pRow = pBlockScanInfo->iter.hasVal
                      ? getValidMemRow(&pBlockScanInfo->iter, pBlockScanInfo->delSkyline, pReader)
                      : NULL;
  TSDBROW* piRow = pBlockScanInfo->iiter.hasVal
                       ? getValidMemRow(&pBlockScanInfo->iiter, pBlockScanInfo->delSkyline, pReader)
                       : NULL;

  if ((pRow != NULL) && (piRow != NULL)) {
    return doMergeMultiLevelRows(pReader, pBlockScanInfo, pBlockData, pLastBlockReader);
  } else if (pBlockScanInfo->iiter.hasVal) {
    return doMergeBufAndFileRows(pReader, pBlockScanInfo, piRow, &pBlockScanInfo->iiter, key, pLastBlockReader);
  } else if (pBlockScanInfo->iter.hasVal) {
    return doMergeBufAndFileRows(pReader, pBlockScanInfo, pRow, &pBlockScanInfo->iter, key, pLastBlockReader);
  } else {
    return mergeFileBlockAndLastBlock(pReader, pLastBlockReader, key, pBlockScanInfo, pBlockData);
  }
}

H
Haojun Liao 已提交
2531 2532 2533 2534 2535 2536 2537 2538 2539 2540 2541 2542 2543 2544 2545 2546 2547 2548 2549 2550 2551 2552 2553 2554 2555 2556 2557 2558 2559 2560 2561 2562 2563 2564 2565 2566 2567 2568 2569 2570
/*
 * If the current file block overlaps its neighbor block of the same table in scan direction, make that
 * neighbor the active block. This locates it in the block iterator, activates it, loads its data, and
 * resets the dump info.
 *
 * *loadNeighbor is set to true only when the neighbor was loaded successfully.
 * Returns TSDB_CODE_SUCCESS or the error from doLoadFileBlockData().
 */
static int32_t loadNeighborIfOverlap(SFileDataBlockInfo* pBlockInfo, STableBlockScanInfo* pBlockScanInfo,
                                     STsdbReader* pReader, bool* loadNeighbor) {
  *loadNeighbor = false;

  int32_t     nextIndex = -1;
  SBlockIndex nxtBIndex = {0};
  SDataBlk*   pCurBlock = getCurrentBlock(&pReader->status.blockIter);

  if (!getNeighborBlockOfSameTable(pBlockInfo, pBlockScanInfo, &nextIndex, pReader->order, &nxtBIndex)) {
    return TSDB_CODE_SUCCESS;  // no neighbor block
  }

  if (!overlapWithNeighborBlock(pCurBlock, &nxtBIndex, pReader->order)) {
    return TSDB_CODE_SUCCESS;  // neighbor exists but does not overlap
  }

  SReaderStatus*  pStatus = &pReader->status;
  SDataBlockIter* pBlockIter = &pStatus->blockIter;
  int32_t         step = ASCENDING_TRAVERSE(pReader->order) ? 1 : -1;

  // locate the neighbor in the scan list and make it the active block
  SFileDataBlockInfo probe = {.uid = pBlockInfo->uid, .tbBlockIdx = nextIndex};
  int32_t            neighborIndex = findFileBlockInfoIndex(pBlockIter, &probe);
  setFileBlockActiveInBlockIter(pBlockIter, neighborIndex, step);

  int32_t rc = doLoadFileBlockData(pReader, pBlockIter, &pStatus->fileBlockData, pBlockInfo->uid);
  if (rc != TSDB_CODE_SUCCESS) {
    return rc;
  }

  initBlockDumpInfo(pReader, pBlockIter);
  *loadNeighbor = true;
  return TSDB_CODE_SUCCESS;
}

2571 2572 2573 2574 2575 2576 2577 2578 2579 2580 2581 2582 2583
/*
 * Finalize the metadata of a composed result block and account its cost.
 * The finalized fields are the uid (0 without a scan info), the dataLoad flag, and the ts window from the
 * primary-key slot. The composed flag is set, and the composed-block counters are updated by `el`
 * milliseconds.
 */
static void updateComposedBlockInfo(STsdbReader* pReader, double el, STableBlockScanInfo* pBlockScanInfo) {
  SSDataBlock* pRes = pReader->pResBlock;

  if (pBlockScanInfo == NULL) {
    pRes->info.id.uid = 0;
  } else {
    pRes->info.id.uid = pBlockScanInfo->uid;
  }
  pRes->info.dataLoad = 1;
  blockDataUpdateTsWindow(pRes, pReader->suppInfo.slotId[0]);

  setComposedBlockFlag(pReader, true);

  pReader->cost.composedBlocks += 1;
  pReader->cost.buildComposedBlockTime += el;
}

2584
/*
 * Fill pReader->pResBlock by merging file-block, last-block and in-memory rows of the current table.
 *
 * A clean file block that fits into the result capacity is copied directly. In DESC order this only
 * happens when the last block has no data. Otherwise rows are produced one at a time through
 * buildComposedDataBlockImpl(). The loop stops when the file block is exhausted (loading an overlapping
 * neighbor block when needed) or the result block is full.
 *
 * Returns TSDB_CODE_SUCCESS or the first error encountered. Errors from buildComposedDataBlockImpl() were
 * previously ignored.
 */
static int32_t buildComposedDataBlock(STsdbReader* pReader) {
  int32_t code = TSDB_CODE_SUCCESS;

  SSDataBlock* pResBlock = pReader->pResBlock;

  SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(&pReader->status.blockIter);
  SLastBlockReader*   pLastBlockReader = pReader->status.fileIter.pLastBlockReader;

  bool    asc = ASCENDING_TRAVERSE(pReader->order);
  int64_t st = taosGetTimestampUs();
  int32_t step = asc ? 1 : -1;
  double  el = 0;

  STableBlockScanInfo* pBlockScanInfo = NULL;
  if (pBlockInfo != NULL) {
    void* p = taosHashGet(pReader->status.pTableMap, &pBlockInfo->uid, sizeof(pBlockInfo->uid));
    if (p == NULL) {
      code = TSDB_CODE_INVALID_PARA;
      tsdbError("failed to locate the uid:%" PRIu64 " in query table uid list, total tables:%d, %s", pBlockInfo->uid,
                taosHashGetSize(pReader->status.pTableMap), pReader->idStr);
      goto _end;
    }

    pBlockScanInfo = *(STableBlockScanInfo**)p;

    SDataBlk* pBlock = getCurrentBlock(&pReader->status.blockIter);
    TSDBKEY   keyInBuf = getCurrentKeyInBuf(pBlockScanInfo, pReader);

    // it is a clean block, load it directly
    if (isCleanFileDataBlock(pReader, pBlockInfo, pBlock, pBlockScanInfo, keyInBuf, pLastBlockReader) &&
        pBlock->nRow <= pReader->capacity) {
      if (asc || !hasDataInLastBlock(pLastBlockReader)) {
        copyBlockDataToSDataBlock(pReader);

        // record the last key value
        pBlockScanInfo->lastKey = asc ? pBlock->maxKey.ts : pBlock->minKey.ts;
        goto _end;
      }
    }
  } else {  // file blocks not exist
    pBlockScanInfo = *pReader->status.pTableIter;
  }

  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
  SBlockData*         pBlockData = &pReader->status.fileBlockData;

  while (1) {
    bool hasBlockData = false;

    // find the first qualified row in the loaded data block of this table
    while (pBlockData->nRow > 0 && pBlockData->uid == pBlockScanInfo->uid) {
      if (isValidFileBlockRow(pBlockData, pDumpInfo, pBlockScanInfo, pReader)) {
        hasBlockData = true;
        break;
      }

      pDumpInfo->rowIndex += step;

      SDataBlk* pBlock = getCurrentBlock(&pReader->status.blockIter);
      if (pDumpInfo->rowIndex >= pBlock->nRow || pDumpInfo->rowIndex < 0) {
        pBlockInfo = getCurrentBlockInfo(&pReader->status.blockIter);  // NOTE: get the new block info

        // continue check for the next file block if the last ts in the current block
        // is overlapped with the next neighbor block
        bool loadNeighbor = false;
        code = loadNeighborIfOverlap(pBlockInfo, pBlockScanInfo, pReader, &loadNeighbor);
        if ((!loadNeighbor) || (code != 0)) {
          setBlockAllDumped(pDumpInfo, pBlock->maxKey.ts, pReader->order);
          break;
        }
      }
    }

    // no data in last block and block, no need to proceed.
    if (hasBlockData == false) {
      break;
    }

    code = buildComposedDataBlockImpl(pReader, pBlockScanInfo, pBlockData, pLastBlockReader);
    if (code != TSDB_CODE_SUCCESS) {
      goto _end;
    }

    // currently loaded file data block is consumed
    if ((pBlockData->nRow > 0) && (pDumpInfo->rowIndex >= pBlockData->nRow || pDumpInfo->rowIndex < 0)) {
      SDataBlk* pBlock = getCurrentBlock(&pReader->status.blockIter);
      setBlockAllDumped(pDumpInfo, pBlock->maxKey.ts, pReader->order);
      break;
    }

    if (pResBlock->info.rows >= pReader->capacity) {
      break;
    }
  }

_end:
  el = (taosGetTimestampUs() - st) / 1000.0;
  updateComposedBlockInfo(pReader, el, pBlockScanInfo);

  if (pResBlock->info.rows > 0) {
    // window bounds are signed timestamps, print them as such
    tsdbDebug("%p uid:%" PRIu64 ", composed data block created, brange:%" PRId64 "-%" PRId64
              " rows:%d, elapsed time:%.2f ms %s",
              pReader, pResBlock->info.id.uid, pResBlock->info.window.skey, pResBlock->info.window.ekey,
              pResBlock->info.rows, el, pReader->idStr);
  }

  return code;
}

/* Record whether the current result block was built by merging several sources. */
void setComposedBlockFlag(STsdbReader* pReader, bool composed) {
  SReaderStatus* pStatus = &pReader->status;
  pStatus->composedDataBlock = composed;
}

2693 2694 2695 2696 2697 2698 2699 2700
/*
 * Starting position in a delete skyline for the given scan order: the last element for a descending scan
 * over a non-NULL skyline, otherwise 0.
 */
int32_t getInitialDelIndex(const SArray* pDelSkyline, int32_t order) {
  if (pDelSkyline != NULL && !ASCENDING_TRAVERSE(order)) {
    return taosArrayGetSize(pDelSkyline) - 1;
  }

  return 0;
}

dengyihao's avatar
dengyihao 已提交
2701 2702
/*
 * Build the delete skyline of one table, if it has not been built yet.
 *
 * Delete records are gathered from three sources:
 *   1. the del file (via pReader->pDelIdx), when present;
 *   2. the table's mem data (pMemTbData), when non-NULL;
 *   3. the table's imem data (piMemTbData), when non-NULL.
 * They are folded into pBlockScanInfo->delSkyline with tsdbBuildDeleteSkyline(). On success the mem, imem,
 * file and last-block delete cursors are reset to the start position for the scan order.
 *
 * Returns TSDB_CODE_SUCCESS or an error. On error no partially built skyline is left attached, so a later call
 * starts over instead of silently accepting incomplete delete information. Allocation results are now checked.
 */
int32_t initDelSkylineIterator(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader, STbData* pMemTbData,
                               STbData* piMemTbData) {
  if (pBlockScanInfo->delSkyline != NULL) {
    return TSDB_CODE_SUCCESS;
  }

  int32_t code = TSDB_CODE_SUCCESS;
  SArray* pDelData = taosArrayInit(4, sizeof(SDelData));
  if (pDelData == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  // 1. persisted delete records of this table
  SDelFile* pDelFile = pReader->pReadSnap->fs.pDelFile;
  if (pDelFile && taosArrayGetSize(pReader->pDelIdx) > 0) {
    SDelIdx  idx = {.suid = pReader->suid, .uid = pBlockScanInfo->uid};
    SDelIdx* pIdx = taosArraySearch(pReader->pDelIdx, &idx, tCmprDelIdx, TD_EQ);

    if (pIdx != NULL) {
      code = tsdbReadDelData(pReader->pDelFReader, pIdx, pDelData);
      if (code != TSDB_CODE_SUCCESS) {
        goto _end;
      }
    }
  }

  // 2./3. delete records kept in mem and imem (linked lists starting at pHead)
  STbData* memSources[2] = {pMemTbData, piMemTbData};
  for (int32_t i = 0; i < 2; ++i) {
    if (memSources[i] == NULL) {
      continue;
    }

    for (SDelData* p = memSources[i]->pHead; p != NULL; p = p->pNext) {
      if (taosArrayPush(pDelData, p) == NULL) {
        code = TSDB_CODE_OUT_OF_MEMORY;
        goto _end;
      }
    }
  }

  if (taosArrayGetSize(pDelData) > 0) {
    pBlockScanInfo->delSkyline = taosArrayInit(4, sizeof(TSDBKEY));
    if (pBlockScanInfo->delSkyline == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto _end;
    }

    code = tsdbBuildDeleteSkyline(pDelData, 0, (int32_t)(taosArrayGetSize(pDelData) - 1), pBlockScanInfo->delSkyline);
    if (code != TSDB_CODE_SUCCESS) {
      // do not keep a half-built skyline around
      taosArrayDestroy(pBlockScanInfo->delSkyline);
      pBlockScanInfo->delSkyline = NULL;
      goto _end;
    }
  }

  int32_t delIndex = getInitialDelIndex(pBlockScanInfo->delSkyline, pReader->order);

  pBlockScanInfo->iter.index = delIndex;
  pBlockScanInfo->iiter.index = delIndex;
  pBlockScanInfo->fileDelIndex = delIndex;
  pBlockScanInfo->lastBlockDelIndex = delIndex;

_end:
  taosArrayDestroy(pDelData);
  return code;
}

C
Cary Xu 已提交
2760
/*
 * Key of the next in-memory row to consume for this table, taken from whichever of mem / imem comes first
 * in scan order.
 *
 * On a timestamp tie the mem key wins in ASC order and the imem key wins in DESC order. When neither buffer
 * has a valid row, the returned key has ts = TSKEY_INITIAL_VAL. When only imem has one, its key is returned.
 */
TSDBKEY getCurrentKeyInBuf(STableBlockScanInfo* pScanInfo, STsdbReader* pReader) {
  TSDBKEY  memKey = {.ts = TSKEY_INITIAL_VAL};
  TSDBROW* pMemRow = getValidMemRow(&pScanInfo->iter, pScanInfo->delSkyline, pReader);
  if (pMemRow != NULL) {
    memKey = TSDBROW_KEY(pMemRow);
  }

  TSDBKEY  imemKey = {.ts = TSKEY_INITIAL_VAL};
  TSDBROW* pIMemRow = getValidMemRow(&pScanInfo->iiter, pScanInfo->delSkyline, pReader);
  if (pIMemRow != NULL) {
    imemKey = TSDBROW_KEY(pIMemRow);
  }

  if (pMemRow == NULL) {
    return imemKey;  // only imem (or nothing) holds data
  }

  if (pIMemRow == NULL) {
    return memKey;  // only mem holds data
  }

  bool memComesFirst =
      ASCENDING_TRAVERSE(pReader->order) ? (memKey.ts <= imemKey.ts) : (memKey.ts > imemKey.ts);
  return memComesFirst ? memKey : imemKey;
}

2796
/*
 * Advance the file-set iterator to the next file set that holds at least one data block or last (stt) file
 * for the queried tables.
 *
 * The block counts are returned in *pBlockNum; both are 0 when no file set remains. Afterwards the del-file
 * reader and its index are opened lazily, if the read snapshot has a del file and they are not open yet.
 *
 * Returns TSDB_CODE_SUCCESS or an error. The index-list allocation is now checked. After a failed
 * tsdbReadDelIdx(), pReader->pDelIdx is reset to NULL instead of left dangling.
 */
static int32_t moveToNextFile(STsdbReader* pReader, SBlockNumber* pBlockNum) {
  SReaderStatus* pStatus = &pReader->status;
  int32_t        code = TSDB_CODE_SUCCESS;

  pBlockNum->numOfBlocks = 0;
  pBlockNum->numOfLastFiles = 0;

  size_t  numOfTables = taosHashGetSize(pReader->status.pTableMap);
  SArray* pIndexList = taosArrayInit(numOfTables, sizeof(SBlockIdx));
  if (pIndexList == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  while (1) {
    bool hasNext = false;
    code = filesetIteratorNext(&pStatus->fileIter, pReader, &hasNext);
    if (code) {
      taosArrayDestroy(pIndexList);
      return code;
    }

    if (!hasNext) {  // no data files on disk
      break;
    }

    taosArrayClear(pIndexList);
    code = doLoadBlockIndex(pReader, pReader->pFileReader, pIndexList);
    if (code != TSDB_CODE_SUCCESS) {
      taosArrayDestroy(pIndexList);
      return code;
    }

    if (taosArrayGetSize(pIndexList) > 0 || pReader->pFileReader->pSet->nSttF > 0) {
      code = doLoadFileBlock(pReader, pIndexList, pBlockNum);
      if (code != TSDB_CODE_SUCCESS) {
        taosArrayDestroy(pIndexList);
        return code;
      }

      if (pBlockNum->numOfBlocks + pBlockNum->numOfLastFiles > 0) {
        break;
      }
    }

    // no blocks in current file, try next files
  }

  taosArrayDestroy(pIndexList);

  if (pReader->pReadSnap != NULL) {
    SDelFile* pDelFile = pReader->pReadSnap->fs.pDelFile;
    if (pReader->pDelFReader == NULL && pDelFile != NULL) {
      code = tsdbDelFReaderOpen(&pReader->pDelFReader, pDelFile, pReader->pTsdb);
      if (code != TSDB_CODE_SUCCESS) {
        return code;
      }

      pReader->pDelIdx = taosArrayInit(4, sizeof(SDelIdx));
      if (pReader->pDelIdx == NULL) {
        return TSDB_CODE_OUT_OF_MEMORY;
      }

      code = tsdbReadDelIdx(pReader->pDelFReader, pReader->pDelIdx);
      if (code != TSDB_CODE_SUCCESS) {
        taosArrayDestroy(pReader->pDelIdx);
        pReader->pDelIdx = NULL;  // avoid a dangling pointer (later use-after-free / double free)
        return code;
      }
    }
  }

  return TSDB_CODE_SUCCESS;
}

X
Xiaoyu Wang 已提交
2865
/*
 * Rewind the ordered table cursor to the first uid of pStatus->uidList and point pTableIter at its scan info.
 * Reads tableUidList[0] unconditionally.
 */
static void resetTableListIndex(SReaderStatus* pStatus) {
  STableUidList* pUidList = &pStatus->uidList;
  pUidList->currentIndex = 0;

  uint64_t firstUid = pUidList->tableUidList[0];
  pStatus->pTableIter = taosHashGet(pStatus->pTableMap, &firstUid, sizeof(firstUid));
}

2873
/*
 * Step the ordered table cursor forward by one and refresh pStatus->pTableIter.
 *
 * Returns false, with pTableIter cleared, once the cursor has passed the number of
 * entries in pTableMap. Also returns false when the next uid has no entry in
 * pTableMap (pTableIter is then whatever taosHashGet returned, i.e. NULL).
 */
static bool moveToNextTable(STableUidList* pOrderedCheckInfo, SReaderStatus* pStatus) {
  ++pOrderedCheckInfo->currentIndex;

  bool exhausted = (pOrderedCheckInfo->currentIndex >= taosHashGetSize(pStatus->pTableMap));
  if (exhausted) {
    pStatus->pTableIter = NULL;
  } else {
    uint64_t nextUid = pOrderedCheckInfo->tableUidList[pOrderedCheckInfo->currentIndex];
    pStatus->pTableIter = taosHashGet(pStatus->pTableMap, &nextUid, sizeof(nextUid));
  }

  return !exhausted && (pStatus->pTableIter != NULL);
}

2885
/*
 * Build a result block from the last blocks only. Tables are visited in the order
 * of pStatus->uidList, starting at pStatus->pTableIter.
 *
 * For the current table, last-block rows are merged into pReader->pResBlock until
 * the last block reader runs dry or the block reaches pReader->capacity. The
 * function returns as soon as a non-empty block has been produced; otherwise it
 * moves on to the next table. When every table is exhausted, pTableIter is NULL
 * and pResBlock is left empty. Always returns TSDB_CODE_SUCCESS.
 */
static int32_t doLoadLastBlockSequentially(STsdbReader* pReader) {
  SReaderStatus*    pStatus = &pReader->status;
  SLastBlockReader* pLastBlockReader = pStatus->fileIter.pLastBlockReader;
  STableUidList*    pUidList = &pStatus->uidList;

  if (taosHashGetSize(pStatus->pTableMap) == 0) {
    return TSDB_CODE_SUCCESS;
  }

  SSDataBlock* pResBlock = pReader->pResBlock;

  while (1) {
    // no current table: the cursor is past the last table, or its uid was not found in
    // pTableMap. Nothing to load, and pTableIter must not be dereferenced.
    if (pStatus->pTableIter == NULL) {
      return TSDB_CODE_SUCCESS;
    }

    // load the last data block of current table
    STableBlockScanInfo* pScanInfo = *(STableBlockScanInfo**)pStatus->pTableIter;

    bool hasVal = initLastBlockReader(pLastBlockReader, pScanInfo, pReader);
    if (!hasVal) {
      bool hasNexTable = moveToNextTable(pUidList, pStatus);
      if (!hasNexTable) {
        return TSDB_CODE_SUCCESS;
      }

      continue;
    }

    int64_t st = taosGetTimestampUs();

    // merge the last-block rows of this table until they run out or the result block is full
    while (hasDataInLastBlock(pLastBlockReader)) {
      buildComposedDataBlockImpl(pReader, pScanInfo, &pReader->status.fileBlockData, pLastBlockReader);
      if (pResBlock->info.rows >= pReader->capacity) {
        break;
      }
    }

    double el = (taosGetTimestampUs() - st) / 1000.0;
    updateComposedBlockInfo(pReader, el, pScanInfo);

    if (pResBlock->info.rows > 0) {
      tsdbDebug("%p uid:%" PRIu64 ", composed data block created, brange:%" PRIu64 "-%" PRIu64
                " rows:%d, elapsed time:%.2f ms %s",
                pReader, pResBlock->info.id.uid, pResBlock->info.window.skey, pResBlock->info.window.ekey,
                pResBlock->info.rows, el, pReader->idStr);
      return TSDB_CODE_SUCCESS;
    }

    // current table is exhausted, let's try next table
    bool hasNexTable = moveToNextTable(pUidList, pStatus);
    if (!hasNexTable) {
      return TSDB_CODE_SUCCESS;
    }
  }
}

2944
/*
 * Build the next result block for the file data block the block iterator currently
 * points at.
 *
 * The cases are tried in order:
 *  1. fileBlockShouldLoad() says the block must be read. The block is loaded and
 *     merged with buffer and last-block rows (buildComposedDataBlock).
 *  2. Buffered rows fall in the gap before the block (bufferDataInFileBlockGap).
 *     Those buffer rows are emitted first, bounded by the block's minKey (asc) /
 *     maxKey (desc).
 *  3. Otherwise:
 *     - For a descending scan with pending last-block rows, only last-block rows
 *       are emitted.
 *     - Else the whole block is reported without loading its data
 *       (info.dataLoad = 0) and marked as fully dumped.
 *
 * Returns TSDB_CODE_INVALID_PARA when the block's table has no scan info in
 * status.pTableMap. Otherwise returns the code of the loading/building helper.
 */
static int32_t doBuildDataBlock(STsdbReader* pReader) {
  int32_t   code = TSDB_CODE_SUCCESS;
  SDataBlk* pBlock = NULL;

  SReaderStatus*       pStatus = &pReader->status;
  SDataBlockIter*      pBlockIter = &pStatus->blockIter;
  STableBlockScanInfo* pScanInfo = NULL;
  SFileDataBlockInfo*  pBlockInfo = getCurrentBlockInfo(pBlockIter);
  SLastBlockReader*    pLastBlockReader = pReader->status.fileIter.pLastBlockReader;

  ASSERT(pBlockInfo != NULL);

  // look the entry up first and dereference it only afterwards: taosHashGet returns
  // NULL for an unknown uid, and the map values are STableBlockScanInfo pointers
  STableBlockScanInfo** ppScanInfo =
      taosHashGet(pReader->status.pTableMap, &pBlockInfo->uid, sizeof(pBlockInfo->uid));
  if (ppScanInfo == NULL || *ppScanInfo == NULL) {
    tsdbError("failed to get table scan-info, %s", pReader->idStr);
    code = TSDB_CODE_INVALID_PARA;
    return code;
  }
  pScanInfo = *ppScanInfo;

  pBlock = getCurrentBlock(pBlockIter);

  initLastBlockReader(pLastBlockReader, pScanInfo, pReader);
  TSDBKEY keyInBuf = getCurrentKeyInBuf(pScanInfo, pReader);

  if (fileBlockShouldLoad(pReader, pBlockInfo, pBlock, pScanInfo, keyInBuf, pLastBlockReader)) {
    code = doLoadFileBlockData(pReader, pBlockIter, &pStatus->fileBlockData, pScanInfo->uid);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    // build composed data block
    code = buildComposedDataBlock(pReader);
  } else if (bufferDataInFileBlockGap(pReader->order, keyInBuf, pBlock)) {
    // data in memory that are earlier than current file block
    // rows in buffer should be less than the file block in asc, greater than file block in desc
    int64_t endKey = (ASCENDING_TRAVERSE(pReader->order)) ? pBlock->minKey.ts : pBlock->maxKey.ts;
    code = buildDataBlockFromBuf(pReader, pScanInfo, endKey);
  } else {
    if (hasDataInLastBlock(pLastBlockReader) && !ASCENDING_TRAVERSE(pReader->order)) {
      // only return the rows in last block
      int64_t tsLast = getCurrentKeyInLastBlock(pLastBlockReader);
      ASSERT(tsLast >= pBlock->maxKey.ts);

      SBlockData* pBData = &pReader->status.fileBlockData;
      tBlockDataReset(pBData);

      SSDataBlock* pResBlock = pReader->pResBlock;
      tsdbDebug("load data in last block firstly, due to desc scan data, %s", pReader->idStr);

      int64_t st = taosGetTimestampUs();

      while (1) {
        bool hasBlockLData = hasDataInLastBlock(pLastBlockReader);

        // no data in last block and block, no need to proceed.
        if (hasBlockLData == false) {
          break;
        }

        buildComposedDataBlockImpl(pReader, pScanInfo, &pReader->status.fileBlockData, pLastBlockReader);
        if (pResBlock->info.rows >= pReader->capacity) {
          break;
        }
      }

      double el = (taosGetTimestampUs() - st) / 1000.0;
      updateComposedBlockInfo(pReader, el, pScanInfo);

      if (pResBlock->info.rows > 0) {
        tsdbDebug("%p uid:%" PRIu64 ", composed data block created, brange:%" PRIu64 "-%" PRIu64
                  " rows:%d, elapsed time:%.2f ms %s",
                  pReader, pResBlock->info.id.uid, pResBlock->info.window.skey, pResBlock->info.window.ekey,
                  pResBlock->info.rows, el, pReader->idStr);
      }
    } else {  // whole block is required, return it directly
      SDataBlockInfo* pInfo = &pReader->pResBlock->info;
      pInfo->rows = pBlock->nRow;
      pInfo->id.uid = pScanInfo->uid;
      pInfo->dataLoad = 0;
      pInfo->window = (STimeWindow){.skey = pBlock->minKey.ts, .ekey = pBlock->maxKey.ts};
      setComposedBlockFlag(pReader, false);
      setBlockAllDumped(&pStatus->fBlockDumpInfo, pBlock->maxKey.ts, pReader->order);

      // update the last key for the corresponding table
      pScanInfo->lastKey = ASCENDING_TRAVERSE(pReader->order) ? pInfo->window.ekey : pInfo->window.skey;
      tsdbDebug("%p uid:%" PRIu64
                " clean file block retrieved from file, global index:%d, "
                "table index:%d, rows:%d, brange:%" PRId64 "-%" PRId64 ", %s",
                pReader, pScanInfo->uid, pBlockIter->index, pBlockInfo->tbBlockIdx, pBlock->nRow, pBlock->minKey.ts,
                pBlock->maxKey.ts, pReader->idStr);
    }
  }

  return code;
}

H
Haojun Liao 已提交
3040
/*
 * Build a result block from in-memory (mem/imem) data only. Tables are visited in
 * the order of pStatus->uidList, starting at pStatus->pTableIter.
 *
 * Returns as soon as a table yields at least one row; a table that produces no
 * rows is skipped. Returns TSDB_CODE_SUCCESS with an empty pResBlock once no
 * current table remains. Returns the error code of buildDataBlockFromBuf() if it
 * fails.
 */
static int32_t buildBlockFromBufferSequentially(STsdbReader* pReader) {
  SReaderStatus* pStatus = &pReader->status;
  STableUidList* pUidList = &pStatus->uidList;

  while (1) {
    // no current table (e.g. empty table map, or the cursor already ran past the end):
    // nothing to read, and pTableIter must not be dereferenced
    if (pStatus->pTableIter == NULL) {
      return TSDB_CODE_SUCCESS;
    }

    STableBlockScanInfo** pBlockScanInfo = pStatus->pTableIter;
    initMemDataIterator(*pBlockScanInfo, pReader);

    // no key bound in the scan direction: take everything left in the buffer
    int64_t endKey = (ASCENDING_TRAVERSE(pReader->order)) ? INT64_MAX : INT64_MIN;
    int32_t code = buildDataBlockFromBuf(pReader, *pBlockScanInfo, endKey);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    if (pReader->pResBlock->info.rows > 0) {
      return TSDB_CODE_SUCCESS;
    }

    // current table is exhausted, let's try next table
    bool hasNexTable = moveToNextTable(pUidList, pStatus);
    if (!hasNexTable) {
      return TSDB_CODE_SUCCESS;
    }
  }
}

3073
// set the correct start position in case of the first/last file block, according to the query time window
3074
static void initBlockDumpInfo(STsdbReader* pReader, SDataBlockIter* pBlockIter) {
3075 3076 3077 3078 3079 3080 3081 3082
  int64_t             lastKey = ASCENDING_TRAVERSE(pReader->order) ? INT64_MIN : INT64_MAX;
  SDataBlk*           pBlock = getCurrentBlock(pBlockIter);
  SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(pBlockIter);
  if (pBlockInfo) {
    STableBlockScanInfo* pScanInfo = taosHashGet(pBlockIter->pTableMap, &pBlockInfo->uid, sizeof(pBlockInfo->uid));
    if (pScanInfo) {
      lastKey = pScanInfo->lastKey;
    }
3083
  }
3084 3085 3086
  SReaderStatus* pStatus = &pReader->status;

  SFileBlockDumpInfo* pDumpInfo = &pStatus->fBlockDumpInfo;
3087 3088 3089

  pDumpInfo->totalRows = pBlock->nRow;
  pDumpInfo->allDumped = false;
3090
  pDumpInfo->rowIndex = ASCENDING_TRAVERSE(pReader->order) ? 0 : pBlock->nRow - 1;
3091
  pDumpInfo->lastKey = lastKey;
3092 3093
}

3094
/*
 * Open the next file set and prepare the block iterator for its first block.
 *
 * - If the remaining file sets hold neither data blocks nor last files,
 *   status.loadFromFile is cleared so that the caller switches to buffered data.
 * - If the file set has data blocks, the block iterator is initialized over them.
 * - If it only has last blocks, the block data, the block iterator and the table
 *   cursor are reset instead.
 * - In the two non-empty cases, the dump info is then positioned at the first row
 *   in scan order.
 *
 * Returns the first error from moveToNextFile() or initBlockIterator(), otherwise
 * TSDB_CODE_SUCCESS.
 */
static int32_t initForFirstBlockInFile(STsdbReader* pReader, SDataBlockIter* pBlockIter) {
  SBlockNumber num = {0};
  int32_t      code = moveToNextFile(pReader, &num);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  // all data files are consumed, try data in buffer
  if (num.numOfBlocks + num.numOfLastFiles == 0) {
    pReader->status.loadFromFile = false;
    return code;
  }

  // initialize the block iterator for a new fileset
  if (num.numOfBlocks > 0) {
    code = initBlockIterator(pReader, pBlockIter, num.numOfBlocks);
    if (code != TSDB_CODE_SUCCESS) {
      // the iterator is not usable; do not position the dump info on it
      return code;
    }
  } else {  // no block data, only last block exists
    tBlockDataReset(&pReader->status.fileBlockData);
    resetDataBlockIterator(pBlockIter, pReader->order);
    resetTableListIndex(&pReader->status);
  }

  // set the correct start position according to the query time window
  initBlockDumpInfo(pReader, pBlockIter);
  return code;
}

3121
/*
 * A file block counts as partially read when it has not been fully dumped yet and
 * its row cursor has already left the starting row of the scan: past row 0 when
 * ascending, before row totalRows - 1 when descending.
 */
static bool fileBlockPartiallyRead(SFileBlockDumpInfo* pDumpInfo, bool asc) {
  if (pDumpInfo->allDumped) {
    return false;
  }

  if (asc) {
    return pDumpInfo->rowIndex > 0;
  }

  return pDumpInfo->rowIndex < (pDumpInfo->totalRows - 1);
}

3126
/*
 * Produce the next non-empty result block from the file sets, i.e. from their data
 * blocks and their last blocks.
 *
 * Last-block path: when the block iterator holds no data blocks (numOfBlocks == 0),
 * rows are taken from the last blocks, table by table (label _begin). Once every
 * table is done (status.pTableIter == NULL), the next file set is opened with
 * initForFirstBlockInFile().
 *
 * Data-block path: otherwise the data blocks are walked in iterator order. A
 * partially dumped block is resumed first; a fully dumped one advances the
 * iterator. When the data blocks of a file set are used up, the reader either
 * switches to that file set's last blocks (if nSttF > 0) via goto _begin, or moves
 * on to the next file set.
 *
 * Returns on the first error, as soon as pReader->pResBlock holds rows, or with
 * TSDB_CODE_SUCCESS once status.loadFromFile is false (all data files checked).
 */
static int32_t buildBlockFromFiles(STsdbReader* pReader) {
H
Haojun Liao 已提交
3127
  int32_t code = TSDB_CODE_SUCCESS;
3128 3129
  bool    asc = ASCENDING_TRAVERSE(pReader->order);

3130 3131
  SDataBlockIter* pBlockIter = &pReader->status.blockIter;

3132
  // no data blocks to iterate in the current file set: serve its last blocks first
  if (pBlockIter->numOfBlocks == 0) {
H
Hongze Cheng 已提交
3133
  // also entered via goto from the data-block loop below, after the table cursor was reset
  _begin:
3134 3135 3136 3137 3138
    code = doLoadLastBlockSequentially(pReader);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

3139 3140 3141 3142
    if (pReader->pResBlock->info.rows > 0) {
      return TSDB_CODE_SUCCESS;
    }

3143
    // all tables have been checked in the last blocks of this file set, now let's try the next file
3144 3145 3146 3147 3148 3149 3150 3151
    if (pReader->status.pTableIter == NULL) {
      code = initForFirstBlockInFile(pReader, pBlockIter);

      // error happens or all the data files are completely checked
      if ((code != TSDB_CODE_SUCCESS) || (pReader->status.loadFromFile == false)) {
        return code;
      }

3152
      // this file set has no data blocks: start checking its last blocks, if any
3153
      if (pBlockIter->numOfBlocks == 0) {
H
Haojun Liao 已提交
3154
        resetTableListIndex(&pReader->status);
3155 3156 3157 3158 3159 3160 3161 3162 3163 3164 3165 3166 3167 3168
        goto _begin;
      }
    }

    // pTableIter is NULL only if a new file set with data blocks was just opened; build from its current block
    code = doBuildDataBlock(pReader);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    if (pReader->pResBlock->info.rows > 0) {
      return TSDB_CODE_SUCCESS;
    }
  }

3169
  // data-block path: loop until a block yields rows, an error occurs, or the files run out
  while (1) {
3170 3171
    SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;

3172
    if (fileBlockPartiallyRead(pDumpInfo, asc)) {  // file data block is partially loaded
3173
      code = buildComposedDataBlock(pReader);
3174 3175 3176 3177
    } else {
      // the current block is exhausted: try the next file block
      // (a block that is neither partially read nor fully dumped is built directly below)
      if (pDumpInfo->allDumped) {
        // try next data block in current file
H
Haojun Liao 已提交
3178
        bool hasNext = blockIteratorNext(&pReader->status.blockIter, pReader->idStr);
3179 3180
        if (hasNext) {  // check for the next block in the block accessed order list
          initBlockDumpInfo(pReader, pBlockIter);
3181
        } else {
H
Haojun Liao 已提交
3182
          if (pReader->status.pCurrentFileset->nSttF > 0) {
3183
            // data blocks in current file are exhausted; serve this file set's last blocks before moving on
H
Haojun Liao 已提交
3184 3185 3186 3187 3188 3189
            SBlockData* pBlockData = &pReader->status.fileBlockData;
            if (pBlockData->uid != 0) {
              tBlockDataClear(pBlockData);
            }

            tBlockDataReset(pBlockData);
3190
            resetDataBlockIterator(pBlockIter, pReader->order);
H
Haojun Liao 已提交
3191
            resetTableListIndex(&pReader->status);
3192 3193 3194
            goto _begin;
          } else {
            // no last blocks in this file set: open the next one
            code = initForFirstBlockInFile(pReader, pBlockIter);
3195

3196 3197 3198 3199
            // error happens or all the data files are completely checked
            if ((code != TSDB_CODE_SUCCESS) || (pReader->status.loadFromFile == false)) {
              return code;
            }
3200

3201 3202
            // this file does not have blocks, let's start check the last block file
            if (pBlockIter->numOfBlocks == 0) {
H
Haojun Liao 已提交
3203
              resetTableListIndex(&pReader->status);
3204 3205
              goto _begin;
            }
3206
          }
3207
        }
H
Haojun Liao 已提交
3208
      }
3209 3210

      code = doBuildDataBlock(pReader);
3211 3212
    }

3213 3214 3215 3216 3217 3218 3219 3220
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    if (pReader->pResBlock->info.rows > 0) {
      return TSDB_CODE_SUCCESS;
    }
  }
3221
}
H
refact  
Hongze Cheng 已提交
3222

3223 3224
/*
 * Pick the tsdb instance that a query starting at winSKey should read from.
 *
 * Non-rsma vnodes always use VND_TSDB and leave *pLevel untouched.
 *
 * For an rsma vnode, the retention levels are walked upward from level 0:
 * - The first level with (now - keep) <= winSKey + offset is selected.
 * - A level with keep <= 0 stops the walk and selects the level before it (or 0).
 * - If no level stops the walk, the highest tsdb (L2) is used.
 * The chosen level is reported through *pLevel.
 */
static STsdb* getTsdbByRetentions(SVnode* pVnode, TSKEY winSKey, SRetention* retentions, const char* idStr,
                                  int8_t* pLevel) {
  if (!VND_IS_RSMA(pVnode)) {
    return VND_TSDB(pVnode);
  }

  int8_t  precision = pVnode->config.tsdbCfg.precision;
  int64_t now = taosGetTimestamp(precision);

  // tsQueryRsmaTolerance is scaled by 1 / 1000 / 1000000 for milli / micro / other precisions
  // (presumably the tolerance is expressed in milliseconds)
  long unitsPerTolerance = (precision == TSDB_TIME_PRECISION_MILLI)   ? 1L
                           : (precision == TSDB_TIME_PRECISION_MICRO) ? 1000L
                                                                      : 1000000L;
  int64_t offset = tsQueryRsmaTolerance * unitsPerTolerance;

  int8_t level = 0;
  while (level < TSDB_RETENTION_MAX) {
    SRetention* pRetention = &retentions[level];

    if (pRetention->keep <= 0) {
      // this level is not configured: fall back to the previous one
      if (level > 0) {
        level--;
      }
      break;
    }

    if ((now - pRetention->keep) <= (winSKey + offset)) {
      break;
    }

    level++;
  }

  const char* str = (idStr != NULL) ? idStr : "";

  switch (level) {
    case TSDB_RETENTION_L0:
      *pLevel = TSDB_RETENTION_L0;
      tsdbDebug("vgId:%d, rsma level %d is selected to query %s", TD_VID(pVnode), TSDB_RETENTION_L0, str);
      return VND_RSMA0(pVnode);
    case TSDB_RETENTION_L1:
      *pLevel = TSDB_RETENTION_L1;
      tsdbDebug("vgId:%d, rsma level %d is selected to query %s", TD_VID(pVnode), TSDB_RETENTION_L1, str);
      return VND_RSMA1(pVnode);
    default:
      *pLevel = TSDB_RETENTION_L2;
      tsdbDebug("vgId:%d, rsma level %d is selected to query %s", TD_VID(pVnode), TSDB_RETENTION_L2, str);
      return VND_RSMA2(pVnode);
  }
}

H
Haojun Liao 已提交
3267
/*
 * Derive the version range a query may see.
 *
 * - minVer: pCond->startVersion, or 0 when it is -1 (not specified).
 * - maxVer: pCond->endVersion capped at the vnode's applied version; the applied
 *   version itself when endVersion is -1.
 *
 * `level` is not used.
 */
SVersionRange getQueryVerRange(SVnode* pVnode, SQueryTableDataCond* pCond, int8_t level) {
  SVersionRange range = {0};

  range.minVer = (pCond->startVersion == -1) ? 0 : pCond->startVersion;

  if (pCond->endVersion == -1) {
    // user not specified end version: use the current maximum (applied) version of the vnode
    range.maxVer = pVnode->state.applied;
  } else if (pCond->endVersion > pVnode->state.applied) {
    // never read beyond what the vnode has applied
    range.maxVer = pVnode->state.applied;
  } else {
    range.maxVer = pCond->endVersion;
  }

  return range;
}

3281
/*
 * Check whether the row key pKey is covered by a deletion recorded in pDelList.
 * Returns true when the row is considered dropped.
 *
 * pDelList holds TSDBKEY points (ts, version). The code treats two adjacent entries
 * as a candidate range covering ts in [first.ts, second.ts]. A row is dropped by a
 * range when the range's version is >= the row's version. Pairs are skipped as
 * "not a consecutive deletion range" when:
 *   - ascending: the first entry has version 0 and the second a non-zero one;
 *   - descending: the mirror case.
 * NOTE(review): the exact skyline encoding is defined where delSkyline is built.
 *
 * *index is a cursor into pDelList that persists across calls. It only moves
 * forward for ascending order and backward for descending order, so keys are
 * presumably presented in scan order.
 *
 * NOTE(review): the version-range checks differ between branches:
 *   - the ascending "last point" case checks both minVer and maxVer;
 *   - the other ascending checks use only maxVer;
 *   - the descending checks use neither.
 */
bool hasBeenDropped(const SArray* pDelList, int32_t* index, TSDBKEY* pKey, int32_t order, SVersionRange* pVerRange) {
  // Nothing deleted. This also keeps the code below from fetching element 0 or num - 1
  // of an empty array, which would yield NULL and be dereferenced.
  if (pDelList == NULL || taosArrayGetSize(pDelList) == 0) {
    return false;
  }

  size_t  num = taosArrayGetSize(pDelList);
  bool    asc = ASCENDING_TRAVERSE(order);
  int32_t step = asc ? 1 : -1;

  if (asc) {
    if (*index >= num - 1) {
      // cursor reached the last point: only an exact hit on it can still be covered
      TSDBKEY* last = taosArrayGetLast(pDelList);
      ASSERT(pKey->ts >= last->ts);

      if (pKey->ts > last->ts) {
        return false;
      } else if (pKey->ts == last->ts) {
        TSDBKEY* prev = taosArrayGet(pDelList, num - 2);
        return (prev->version >= pKey->version && prev->version <= pVerRange->maxVer &&
                prev->version >= pVerRange->minVer);
      }
    } else {
      TSDBKEY* pCurrent = taosArrayGet(pDelList, *index);
      TSDBKEY* pNext = taosArrayGet(pDelList, (*index) + 1);

      if (pKey->ts < pCurrent->ts) {
        return false;
      }

      if (pCurrent->ts <= pKey->ts && pNext->ts >= pKey->ts && pCurrent->version >= pKey->version &&
          pVerRange->maxVer >= pCurrent->version) {
        return true;
      }

      // advance the cursor to the range that may contain pKey->ts
      while (pNext->ts <= pKey->ts && (*index) < num - 1) {
        (*index) += 1;

        if ((*index) < num - 1) {
          pCurrent = taosArrayGet(pDelList, *index);
          pNext = taosArrayGet(pDelList, (*index) + 1);

          // it is not a consecutive deletion range, ignore it
          if (pCurrent->version == 0 && pNext->version > 0) {
            continue;
          }

          if (pCurrent->ts <= pKey->ts && pNext->ts >= pKey->ts && pCurrent->version >= pKey->version &&
              pVerRange->maxVer >= pCurrent->version) {
            return true;
          }
        }
      }

      return false;
    }
  } else {
    if (*index <= 0) {
      // cursor reached the first point: only an exact hit on it can still be covered
      TSDBKEY* pFirst = taosArrayGet(pDelList, 0);

      if (pKey->ts < pFirst->ts) {
        return false;
      } else if (pKey->ts == pFirst->ts) {
        return pFirst->version >= pKey->version;
      } else {
        ASSERT(0);
      }
    } else {
      TSDBKEY* pCurrent = taosArrayGet(pDelList, *index);
      TSDBKEY* pPrev = taosArrayGet(pDelList, (*index) - 1);

      if (pKey->ts > pCurrent->ts) {
        return false;
      }

      if (pPrev->ts <= pKey->ts && pCurrent->ts >= pKey->ts && pPrev->version >= pKey->version) {
        return true;
      }

      // move the cursor backward to the range that may contain pKey->ts
      while (pPrev->ts >= pKey->ts && (*index) > 1) {
        (*index) += step;

        if ((*index) >= 1) {
          pCurrent = taosArrayGet(pDelList, *index);
          pPrev = taosArrayGet(pDelList, (*index) - 1);

          // it is not a consecutive deletion range, ignore it
          if (pCurrent->version > 0 && pPrev->version == 0) {
            continue;
          }

          if (pPrev->ts <= pKey->ts && pCurrent->ts >= pKey->ts && pPrev->version >= pKey->version) {
            return true;
          }
        }
      }

      return false;
    }
  }

  return false;
}

3384
/*
 * Return the first row, starting at the iterator's current position, that is
 * visible to the reader. A row is visible when its version lies in
 * pReader->verRange and hasBeenDropped() does not report it as deleted.
 * Invisible rows are skipped by advancing the iterator.
 *
 * Returns NULL when the iterator has no more rows. Also returns NULL when a row's
 * timestamp is outside pReader->window; in that case pIter->hasVal is set to false.
 */
TSDBROW* getValidMemRow(SIterInfo* pIter, const SArray* pDelList, STsdbReader* pReader) {
  if (!pIter->hasVal) {
    return NULL;
  }

  for (;;) {
    TSDBROW* pRow = tsdbTbDataIterGet(pIter->iter);
    TSDBKEY  key = TSDBROW_KEY(pRow);

    // a row outside the query window marks the iterator as exhausted
    if (outOfTimeWindow(key.ts, &pReader->window)) {
      pIter->hasVal = false;
      return NULL;
    }

    bool inVerRange = (key.version >= pReader->verRange.minVer) && (key.version <= pReader->verRange.maxVer);
    // hasBeenDropped() moves pIter->index, so it is consulted only for rows in the version range
    if (inVerRange && !hasBeenDropped(pDelList, &pIter->index, &key, pReader->order, &pReader->verRange)) {
      return pRow;
    }

    pIter->hasVal = tsdbTbDataIterNext(pIter->iter);
    if (!pIter->hasVal) {
      return NULL;
    }
  }
}
H
Hongze Cheng 已提交
3423

3424 3425
/*
 * Advance the mem/imem iterator and fold every following visible row whose
 * timestamp equals ts into pMerger. Stops at the first visible row with a
 * different timestamp, or when no visible row is left.
 *
 * Row-format rows are merged with the schema of their own version; column-format
 * rows are merged directly. Returns terrno when that schema cannot be obtained,
 * otherwise TSDB_CODE_SUCCESS.
 */
int32_t doMergeRowsInBuf(SIterInfo* pIter, uint64_t uid, int64_t ts, SArray* pDelList, SRowMerger* pMerger,
                         STsdbReader* pReader) {
  for (;;) {
    pIter->hasVal = tsdbTbDataIterNext(pIter->iter);
    if (!pIter->hasVal) {
      return TSDB_CODE_SUCCESS;
    }

    // skip invalid rows (wrong version / deleted); stop if none is left
    TSDBROW* pRow = getValidMemRow(pIter, pDelList, pReader);
    if (pRow == NULL) {
      return TSDB_CODE_SUCCESS;
    }

    // a different timestamp ends the run of duplicates
    TSDBKEY k = TSDBROW_KEY(pRow);
    if (k.ts != ts) {
      return TSDB_CODE_SUCCESS;
    }

    if (pRow->type != TSDBROW_ROW_FMT) {  // column format
      tsdbRowMerge(pMerger, pRow);
      continue;
    }

    STSchema* pTSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, uid);
    if (pTSchema == NULL) {
      return terrno;
    }

    tsdbRowMergerAdd(pMerger, pRow, pTSchema);
  }
}

3459
static int32_t doMergeRowsInFileBlockImpl(SBlockData* pBlockData, int32_t rowIndex, int64_t key, SRowMerger* pMerger,
3460
                                          SVersionRange* pVerRange, int32_t step) {
3461
  while (rowIndex < pBlockData->nRow && rowIndex >= 0 && pBlockData->aTSKEY[rowIndex] == key) {
3462
    if (pBlockData->aVersion[rowIndex] > pVerRange->maxVer || pBlockData->aVersion[rowIndex] < pVerRange->minVer) {
3463
      rowIndex += step;
3464 3465 3466 3467
      continue;
    }

    TSDBROW fRow = tsdbRowFromBlockData(pBlockData, rowIndex);
H
Hongze Cheng 已提交
3468
    tsdbRowMerge(pMerger, &fRow);
3469 3470 3471 3472 3473 3474 3475 3476 3477 3478 3479
    rowIndex += step;
  }

  return rowIndex;
}

/* Outcome reported by checkForNeighborFileBlock() to its caller's neighbour-block loop. */
typedef enum {
  CHECK_FILEBLOCK_CONT = 1,  // every row of the checked block was consumed: look at the next neighbour too
  CHECK_FILEBLOCK_QUIT = 2,  // stop examining neighbouring blocks
} CHECK_FILEBLOCK_STATE;

H
Hongze Cheng 已提交
3480
static int32_t checkForNeighborFileBlock(STsdbReader* pReader, STableBlockScanInfo* pScanInfo, SDataBlk* pBlock,
3481 3482
                                         SFileDataBlockInfo* pFBlock, SRowMerger* pMerger, int64_t key,
                                         CHECK_FILEBLOCK_STATE* state) {
3483
  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
3484
  SBlockData*         pBlockData = &pReader->status.fileBlockData;
3485

3486
  *state = CHECK_FILEBLOCK_QUIT;
3487
  int32_t step = ASCENDING_TRAVERSE(pReader->order) ? 1 : -1;
3488

3489
  bool    loadNeighbor = true;
H
Haojun Liao 已提交
3490
  int32_t code = loadNeighborIfOverlap(pFBlock, pScanInfo, pReader, &loadNeighbor);
3491

H
Haojun Liao 已提交
3492
  if (loadNeighbor && (code == TSDB_CODE_SUCCESS)) {
3493 3494
    pDumpInfo->rowIndex =
        doMergeRowsInFileBlockImpl(pBlockData, pDumpInfo->rowIndex, key, pMerger, &pReader->verRange, step);
H
Haojun Liao 已提交
3495
    if (pDumpInfo->rowIndex >= pDumpInfo->totalRows) {
3496 3497 3498 3499
      *state = CHECK_FILEBLOCK_CONT;
    }
  }

H
Haojun Liao 已提交
3500
  return code;
3501 3502
}

3503 3504
/*
 * Merge into pMerger every following row that has the same timestamp as the row at
 * the current dump cursor. The row at the cursor itself is not merged here.
 *
 * Rows are first taken from the current block. If that block is consumed in scan
 * order, neighbouring file blocks are checked one after another until
 * checkForNeighborFileBlock() reports CHECK_FILEBLOCK_QUIT or no current block is
 * left.
 *
 * Returns the first error reported by checkForNeighborFileBlock(), otherwise
 * TSDB_CODE_SUCCESS.
 */
int32_t doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pScanInfo, STsdbReader* pReader,
                                SRowMerger* pMerger) {
  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;

  bool    asc = ASCENDING_TRAVERSE(pReader->order);
  int64_t key = pBlockData->aTSKEY[pDumpInfo->rowIndex];
  int32_t step = asc ? 1 : -1;

  pDumpInfo->rowIndex += step;
  if ((pDumpInfo->rowIndex <= pBlockData->nRow - 1 && asc) || (pDumpInfo->rowIndex >= 0 && !asc)) {
    pDumpInfo->rowIndex =
        doMergeRowsInFileBlockImpl(pBlockData, pDumpInfo->rowIndex, key, pMerger, &pReader->verRange, step);
  }

  // all rows are consumed, let's try next file block
  if ((pDumpInfo->rowIndex >= pBlockData->nRow && asc) || (pDumpInfo->rowIndex < 0 && !asc)) {
    while (1) {
      SFileDataBlockInfo* pFileBlockInfo = getCurrentBlockInfo(&pReader->status.blockIter);
      SDataBlk*           pCurrentBlock = getCurrentBlock(&pReader->status.blockIter);
      if (pFileBlockInfo == NULL) {
        break;
      }

      CHECK_FILEBLOCK_STATE st = CHECK_FILEBLOCK_QUIT;
      int32_t code = checkForNeighborFileBlock(pReader, pScanInfo, pCurrentBlock, pFileBlockInfo, pMerger, key, &st);
      if (code != TSDB_CODE_SUCCESS) {
        // loading the neighbour failed: report it instead of silently ending the merge
        return code;
      }

      if (st == CHECK_FILEBLOCK_QUIT) {
        break;
      }
    }
  }

  return TSDB_CODE_SUCCESS;
}

H
Hongze Cheng 已提交
3539
int32_t doMergeRowsInLastBlock(SLastBlockReader* pLastBlockReader, STableBlockScanInfo* pScanInfo, int64_t ts,
3540 3541
                               SRowMerger* pMerger, SVersionRange* pVerRange) {
  while (nextRowFromLastBlocks(pLastBlockReader, pScanInfo, pVerRange)) {
3542 3543
    int64_t next1 = getCurrentKeyInLastBlock(pLastBlockReader);
    if (next1 == ts) {
3544
      TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
H
Hongze Cheng 已提交
3545
      tsdbRowMerge(pMerger, &fRow1);
3546 3547 3548 3549 3550 3551 3552 3553
    } else {
      break;
    }
  }

  return TSDB_CODE_SUCCESS;
}

3554
int32_t doMergeMemTableMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter, SArray* pDelList, TSDBROW* pResRow,
3555
                                 STsdbReader* pReader, bool* freeTSRow) {
H
Haojun Liao 已提交
3556
  TSDBROW* pNextRow = NULL;
3557
  TSDBROW  current = *pRow;
3558

3559 3560
  {  // if the timestamp of the next valid row has a different ts, return current row directly
    pIter->hasVal = tsdbTbDataIterNext(pIter->iter);
3561

3562
    if (!pIter->hasVal) {
3563
      *pResRow = *pRow;
3564
      *freeTSRow = false;
3565
      return TSDB_CODE_SUCCESS;
3566
    } else {  // has next point in mem/imem
3567
      pNextRow = getValidMemRow(pIter, pDelList, pReader);
3568
      if (pNextRow == NULL) {
H
Haojun Liao 已提交
3569
        *pResRow = current;
3570
        *freeTSRow = false;
3571
        return TSDB_CODE_SUCCESS;
3572 3573
      }

H
Hongze Cheng 已提交
3574
      if (TSDBROW_TS(&current) != TSDBROW_TS(pNextRow)) {
H
Haojun Liao 已提交
3575
        *pResRow = current;
3576
        *freeTSRow = false;
3577
        return TSDB_CODE_SUCCESS;
3578
      }
3579
    }
3580 3581
  }

3582
  SRowMerger merge = {0};
H
Haojun Liao 已提交
3583
  terrno = 0;
3584
  int32_t code = 0;
H
Haojun Liao 已提交
3585

3586 3587 3588 3589 3590 3591 3592
  // start to merge duplicated rows
  if (current.type == TSDBROW_ROW_FMT) {
    // get the correct schema for data in memory
    STSchema* pTSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(&current), pReader, uid);
    if (pTSchema == NULL) {
      return terrno;
    }
H
Haojun Liao 已提交
3593

3594 3595 3596
    if (pReader->pSchema == NULL) {
      pReader->pSchema = pTSchema;
    }
H
Haojun Liao 已提交
3597

3598
    code = tsdbRowMergerInit2(&merge, pReader->pSchema, &current, pTSchema);
3599 3600 3601
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }
H
Haojun Liao 已提交
3602

3603 3604 3605 3606
    STSchema* pTSchema1 = doGetSchemaForTSRow(TSDBROW_SVERSION(pNextRow), pReader, uid);
    if (pTSchema1 == NULL) {
      return terrno;
    }
H
Haojun Liao 已提交
3607

3608
    tsdbRowMergerAdd(&merge, pNextRow, pTSchema1);
3609 3610
  } else {  // let's merge rows in file block
    code = tsdbRowMergerInit(&merge, &current, pReader->pSchema);
3611 3612 3613
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }
H
Haojun Liao 已提交
3614

3615 3616
    tsdbRowMerge(&merge, pNextRow);
  }
H
Haojun Liao 已提交
3617

wmmhello's avatar
wmmhello 已提交
3618
  code = doMergeRowsInBuf(pIter, uid, TSDBROW_TS(&current), pDelList, &merge, pReader);
H
Haojun Liao 已提交
3619 3620 3621 3622
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

3623
  code = tsdbRowMergerGetRow(&merge, &pResRow->pTSRow);
3624 3625 3626
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }
M
Minglei Jin 已提交
3627

wmmhello's avatar
wmmhello 已提交
3628
  pResRow->type = TSDBROW_ROW_FMT;
3629
  tsdbRowMergerClear(&merge);
3630
  *freeTSRow = true;
3631

3632
  return TSDB_CODE_SUCCESS;
3633 3634
}

3635
int32_t doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader,
H
Hongze Cheng 已提交
3636
                           SRow** pTSRow) {
H
Haojun Liao 已提交
3637 3638
  SRowMerger merge = {0};

3639 3640 3641
  TSDBKEY k = TSDBROW_KEY(pRow);
  TSDBKEY ik = TSDBROW_KEY(piRow);

3642
  if (ASCENDING_TRAVERSE(pReader->order)) {  // ascending order imem --> mem
H
Haojun Liao 已提交
3643
    STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid);
3644

H
Hongze Cheng 已提交
3645
    int32_t code = tsdbRowMergerInit(&merge, piRow, pSchema);
H
Haojun Liao 已提交
3646 3647 3648 3649 3650 3651 3652 3653 3654
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    code = doMergeRowsInBuf(&pBlockScanInfo->iiter, pBlockScanInfo->uid, ik.ts, pBlockScanInfo->delSkyline, &merge,
                            pReader);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }
3655

H
Hongze Cheng 已提交
3656
    tsdbRowMerge(&merge, pRow);
H
Haojun Liao 已提交
3657 3658 3659 3660 3661 3662
    code =
        doMergeRowsInBuf(&pBlockScanInfo->iter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge, pReader);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

3663
  } else {
H
Haojun Liao 已提交
3664
    STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid);
3665

H
Hongze Cheng 已提交
3666
    int32_t code = tsdbRowMergerInit(&merge, pRow, pSchema);
3667
    if (code != TSDB_CODE_SUCCESS || merge.pTSchema == NULL) {
H
Haojun Liao 已提交
3668 3669 3670 3671 3672 3673 3674 3675
      return code;
    }

    code =
        doMergeRowsInBuf(&pBlockScanInfo->iter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge, pReader);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }
3676

H
Hongze Cheng 已提交
3677
    tsdbRowMerge(&merge, piRow);
H
Haojun Liao 已提交
3678 3679 3680 3681 3682
    code = doMergeRowsInBuf(&pBlockScanInfo->iiter, pBlockScanInfo->uid, ik.ts, pBlockScanInfo->delSkyline, &merge,
                            pReader);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }
3683
  }
3684

H
Haojun Liao 已提交
3685 3686
  int32_t code = tsdbRowMergerGetRow(&merge, pTSRow);
  tsdbRowMergerClear(&merge);
3687
  return code;
3688 3689
}

3690
// Produces the next row of one table from its in-memory buffers (mem = iter, imem = iiter).
//
// A buffer row that lies at or beyond endKey in the traverse direction is ignored. If both buffers hold a
// row with the same timestamp, the two are merged into a newly built row (*freeTSRow is set to true).
// Otherwise the row that comes first in traverse order is produced through doMergeMemTableMultiRows().
// When neither buffer has a candidate, pResRow is left untouched and TSDB_CODE_SUCCESS is returned.
int32_t tsdbGetNextRowInMem(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader, TSDBROW* pResRow, int64_t endKey,
                            bool* freeTSRow) {
  SIterInfo* pMemIter = &pBlockScanInfo->iter;
  SIterInfo* pIMemIter = &pBlockScanInfo->iiter;

  TSDBROW* pMemRow = getValidMemRow(pMemIter, pBlockScanInfo->delSkyline, pReader);
  TSDBROW* pIMemRow = getValidMemRow(pIMemIter, pBlockScanInfo->delSkyline, pReader);

  SArray*  pDelList = pBlockScanInfo->delSkyline;
  uint64_t uid = pBlockScanInfo->uid;
  bool     asc = ASCENDING_TRAVERSE(pReader->order);

  // discard candidates that lie at or beyond endKey in the traverse direction
  if (pMemIter->hasVal) {
    TSDBKEY key = TSDBROW_KEY(pMemRow);
    if (asc ? (key.ts >= endKey) : (key.ts <= endKey)) {
      pMemRow = NULL;
    }
  }

  if (pIMemIter->hasVal) {
    TSDBKEY key = TSDBROW_KEY(pIMemRow);
    if (asc ? (key.ts >= endKey) : (key.ts <= endKey)) {
      pIMemRow = NULL;
    }
  }

  bool hasMem = pMemIter->hasVal && (pMemRow != NULL);
  bool hasIMem = pIMemIter->hasVal && (pIMemRow != NULL);

  if (hasMem && hasIMem) {
    TSDBKEY memKey = TSDBROW_KEY(pMemRow);
    TSDBKEY imemKey = TSDBROW_KEY(pIMemRow);

    if (imemKey.ts == memKey.ts) {
      // same timestamp in both buffers: build one merged row that the caller must free
      *freeTSRow = true;
      pResRow->type = TSDBROW_ROW_FMT;
      return doMergeMemIMemRows(pMemRow, pIMemRow, pBlockScanInfo, pReader, &pResRow->pTSRow);
    }

    bool imemComesFirst = asc ? (imemKey.ts < memKey.ts) : (imemKey.ts > memKey.ts);
    if (imemComesFirst) {
      return doMergeMemTableMultiRows(pIMemRow, uid, pIMemIter, pDelList, pResRow, pReader, freeTSRow);
    }
    return doMergeMemTableMultiRows(pMemRow, uid, pMemIter, pDelList, pResRow, pReader, freeTSRow);
  }

  if (hasMem) {
    return doMergeMemTableMultiRows(pMemRow, uid, pMemIter, pDelList, pResRow, pReader, freeTSRow);
  }

  if (hasIMem) {
    return doMergeMemTableMultiRows(pIMemRow, uid, pIMemIter, pDelList, pResRow, pReader, freeTSRow);
  }

  return TSDB_CODE_SUCCESS;
}

H
Hongze Cheng 已提交
3748
// Appends one row-format (SRow) row at the end of pBlock, filling the output columns listed in
// pReader->suppInfo. Output columns that are not in the row's schema version are set to NULL. On success
// the block's row count grows by one and pScanInfo->lastKey becomes the row's timestamp.
//
// Returns terrno, and appends nothing, when no schema exists for the row's sversion.
int32_t doAppendRowFromTSRow(SSDataBlock* pBlock, STsdbReader* pReader, SRow* pTSRow, STableBlockScanInfo* pScanInfo) {
  int32_t             outputRowIndex = pBlock->info.rows;
  int64_t             uid = pScanInfo->uid;
  SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo;

  STSchema* pSchema = doGetSchemaForTSRow(pTSRow->sver, pReader, uid);
  if (pSchema == NULL) {
    // the row cannot be decoded without its schema; report instead of dereferencing NULL below
    return terrno;
  }

  SColVal colVal = {0};
  int32_t i = 0, j = 0;

  // the primary timestamp is copied straight from the row header
  if (pSupInfo->numOfCols > 0 && pSupInfo->colId[i] == PRIMARYKEY_TIMESTAMP_COL_ID) {
    SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, pSupInfo->slotId[i]);
    ((int64_t*)pColData->pData)[outputRowIndex] = pTSRow->ts;
    i += 1;
  }

  // walk the requested columns and the schema columns in step (presumes both are ordered by column id)
  while (i < pSupInfo->numOfCols && j < pSchema->numOfCols) {
    col_id_t colId = pSupInfo->colId[i];

    if (colId == pSchema->columns[j].colId) {
      SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, pSupInfo->slotId[i]);

      tRowGet(pTSRow, pSchema, j, &colVal);
      doCopyColVal(pColInfoData, outputRowIndex, i, &colVal, pSupInfo);
      i += 1;
      j += 1;
    } else if (colId < pSchema->columns[j].colId) {
      // requested column is absent from this schema version
      SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, pSupInfo->slotId[i]);

      colDataAppendNULL(pColInfoData, outputRowIndex);
      i += 1;
    } else {
      // schema column that was not requested
      j += 1;
    }
  }

  // set null value since current column does not exist in the "pSchema"
  while (i < pSupInfo->numOfCols) {
    SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, pSupInfo->slotId[i]);
    colDataAppendNULL(pColInfoData, outputRowIndex);
    i += 1;
  }

  pBlock->info.dataLoad = 1;
  pBlock->info.rows += 1;
  pScanInfo->lastKey = pTSRow->ts;
  return TSDB_CODE_SUCCESS;
}

H
Hongze Cheng 已提交
3799 3800
// Appends row `rowIndex` of a loaded file block (pBlockData) at the end of pResBlock, for the output
// columns listed in pReader->suppInfo. Requested columns that are missing from the file block are set to
// NULL. Always returns TSDB_CODE_SUCCESS.
int32_t doAppendRowFromFileBlock(SSDataBlock* pResBlock, STsdbReader* pReader, SBlockData* pBlockData,
                                 int32_t rowIndex) {
  SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo;
  int32_t             outputRowIndex = pResBlock->info.rows;
  int32_t             numOfInputCols = pBlockData->nColData;
  int32_t             numOfOutputCols = pSupInfo->numOfCols;
  int32_t             i = 0, j = 0;

  // the primary timestamp column is stored separately in aTSKEY
  if (numOfOutputCols > 0 && pSupInfo->colId[i] == PRIMARYKEY_TIMESTAMP_COL_ID) {
    SColumnInfoData* pColData = taosArrayGet(pResBlock->pDataBlock, pSupInfo->slotId[i]);
    ((int64_t*)pColData->pData)[outputRowIndex] = pBlockData->aTSKEY[rowIndex];
    i += 1;
  }

  // walk the requested columns and the block's columns in step (presumes both are ordered by column id)
  SColVal cv = {0};
  while (i < numOfOutputCols && j < numOfInputCols) {
    SColData* pData = tBlockDataGetColDataByIdx(pBlockData, j);
    if (pData->cid < pSupInfo->colId[i]) {
      // block column that was not requested
      j += 1;
      continue;
    }

    SColumnInfoData* pCol = TARRAY_GET_ELEM(pResBlock->pDataBlock, pSupInfo->slotId[i]);
    if (pData->cid == pSupInfo->colId[i]) {
      tColDataGetValue(pData, rowIndex, &cv);
      doCopyColVal(pCol, outputRowIndex, i, &cv, pSupInfo);
      j += 1;
    } else {
      // pData->cid > pSupInfo->colId[i]: the requested column does not exist in this file block. Compare
      // against the requested id (not pCol->info.colId) so that this output cell is always written.
      colDataAppendNULL(pCol, outputRowIndex);
    }

    i += 1;
  }

  // requested columns beyond the last column of the file block
  while (i < numOfOutputCols) {
    SColumnInfoData* pCol = taosArrayGet(pResBlock->pDataBlock, pSupInfo->slotId[i]);
    colDataAppendNULL(pCol, outputRowIndex);
    i += 1;
  }

  pResBlock->info.dataLoad = 1;
  pResBlock->info.rows += 1;
  return TSDB_CODE_SUCCESS;
}

3846 3847
// Fills pReader->pResBlock with rows of one table taken from the in-memory buffers, up to (excluding)
// endKey in traverse order. It stops when the buffers are exhausted, when the block holds `capacity` rows,
// or on the first error, whose code is returned.
int32_t buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, int64_t endKey, int32_t capacity,
                                  STsdbReader* pReader) {
  SSDataBlock* pBlock = pReader->pResBlock;
  int32_t      code = TSDB_CODE_SUCCESS;

  do {
    TSDBROW row = {.type = -1};
    bool    freeTSRow = false;

    code = tsdbGetNextRowInMem(pBlockScanInfo, pReader, &row, endKey, &freeTSRow);
    if (code != TSDB_CODE_SUCCESS) {
      // A failed mem/imem merge may already have marked the row as an owned ROW_FMT row; release whatever
      // was produced and never append a possibly incomplete row.
      if (freeTSRow && row.type == TSDBROW_ROW_FMT) {
        taosMemoryFree(row.pTSRow);
      }
      break;
    }

    if (row.type == -1) {  // no (more) qualified rows in the buffers
      break;
    }

    if (row.type == TSDBROW_ROW_FMT) {
      code = doAppendRowFromTSRow(pBlock, pReader, row.pTSRow, pBlockScanInfo);
      if (freeTSRow) {
        taosMemoryFree(row.pTSRow);
      }
    } else {
      code = doAppendRowFromFileBlock(pBlock, pReader, row.pBlockData, row.iRow);
    }

    if (code != TSDB_CODE_SUCCESS) {
      break;
    }

    // no data in buffer, return immediately
    if (!(pBlockScanInfo->iter.hasVal || pBlockScanInfo->iiter.hasVal)) {
      break;
    }

    if (pBlock->info.rows >= capacity) {
      break;
    }
  } while (1);

  return code;
}
H
Hongze Cheng 已提交
3881

3882 3883
// TODO refactor: with createDataBlockScanInfo
// Replaces the reader's table set with the `num` tables in pTableList (an array of STableKeyInfo).
// The existing scan info entries are cleared, and their buffer slots are reused for the new tables.
// Returns terrno if a table cannot be registered in the table map.
int32_t tsdbSetTableList(STsdbReader* pReader, const void* pTableList, int32_t num) {
  int32_t size = taosHashGetSize(pReader->status.pTableMap);

  STableBlockScanInfo** p = NULL;
  while ((p = taosHashIterate(pReader->status.pTableMap, p)) != NULL) {
    clearBlockScanInfo(*p);
  }

  // todo handle the case where size is less than the value of num
  ASSERT(size >= num);

  taosHashClear(pReader->status.pTableMap);
  STableUidList* pUidList = &pReader->status.uidList;
  pUidList->currentIndex = 0;

  const STableKeyInfo* pList = (const STableKeyInfo*)pTableList;
  for (int32_t i = 0; i < num; ++i) {
    STableBlockScanInfo* pInfo = getPosInBlockInfoBuf(&pReader->blockInfoBuf, i);
    pInfo->uid = pList[i].uid;

    pUidList->tableUidList[i] = pList[i].uid;

    // a table missing from the map would silently never be scanned, so report the failure
    if (taosHashPut(pReader->status.pTableMap, &pInfo->uid, sizeof(uint64_t), &pInfo, POINTER_BYTES) != 0) {
      return terrno;
    }
  }

  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
3910 3911 3912 3913 3914 3915
// Returns the meta index handle of pMeta (via metaGetIdx), or NULL when no meta handle is given.
void* tsdbGetIdx(SMeta* pMeta) {
  void* pIdx = NULL;
  if (pMeta != NULL) {
    pIdx = metaGetIdx(pMeta);
  }
  return pIdx;
}
dengyihao's avatar
dengyihao 已提交
3916

dengyihao's avatar
dengyihao 已提交
3917 3918 3919 3920 3921 3922
// Returns the inverted index handle of pMeta (via metaGetIvtIdx), or NULL when no meta handle is given.
void* tsdbGetIvtIdx(SMeta* pMeta) {
  void* pIvtIdx = NULL;
  if (pMeta != NULL) {
    pIvtIdx = metaGetIvtIdx(pMeta);
  }
  return pIvtIdx;
}
L
Liu Jicong 已提交
3923

H
Hongze Cheng 已提交
3924
// Returns the upper bound of the reader's version range (pReader->verRange.maxVer).
uint64_t getReaderMaxVersion(STsdbReader* pReader) {
  return pReader->verRange.maxVer;
}
3925

3926
// (Re)starts a reader's scan. It initializes the file set iterator over the read snapshot, resets the data
// block iterator, and positions on the first file block when there are data files. If the scan is not
// reading from files afterwards, the table list index is reset so that the in-memory buffers are scanned
// from the first table. Returns the code of initForFirstBlockInFile (or success when there are no files).
static int32_t doOpenReaderImpl(STsdbReader* pReader) {
  SReaderStatus* pStatus = &pReader->status;

  initFilesetIterator(&pStatus->fileIter, pReader->pReadSnap->fs.aDFileSet, pReader);
  resetDataBlockIterator(&pStatus->blockIter, pReader->order);

  int32_t code = TSDB_CODE_SUCCESS;
  if (pStatus->fileIter.numOfFiles != 0) {
    code = initForFirstBlockInFile(pReader, &pStatus->blockIter);
  } else {
    pStatus->loadFromFile = false;
  }

  if (!pStatus->loadFromFile) {
    resetTableListIndex(pStatus);
  }

  return code;
}

H
refact  
Hongze Cheng 已提交
3947
// ====================================== EXPOSED APIs ======================================
3948
int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, void* pTableList, int32_t numOfTables,
H
Haojun Liao 已提交
3949
                       SSDataBlock* pResBlock, STsdbReader** ppReader, const char* idstr) {
3950 3951 3952 3953 3954 3955
  STimeWindow window = pCond->twindows;
  if (pCond->type == TIMEWINDOW_RANGE_EXTERNAL) {
    pCond->twindows.skey += 1;
    pCond->twindows.ekey -= 1;
  }

3956 3957 3958
  int32_t capacity = pVnode->config.tsdbCfg.maxRows;
  if (pResBlock != NULL) {
    blockDataEnsureCapacity(pResBlock, capacity);
H
Haojun Liao 已提交
3959 3960 3961
  }

  int32_t code = tsdbReaderCreate(pVnode, pCond, ppReader, capacity, pResBlock, idstr);
3962
  if (code != TSDB_CODE_SUCCESS) {
H
Haojun Liao 已提交
3963 3964
    goto _err;
  }
H
Hongze Cheng 已提交
3965

3966
  // check for query time window
H
Haojun Liao 已提交
3967
  STsdbReader* pReader = *ppReader;
3968
  if (isEmptyQueryTimeWindow(&pReader->window) && pCond->type == TIMEWINDOW_RANGE_CONTAINED) {
H
Haojun Liao 已提交
3969 3970 3971
    tsdbDebug("%p query window not overlaps with the data set, no result returned, %s", pReader, pReader->idStr);
    return TSDB_CODE_SUCCESS;
  }
H
Hongze Cheng 已提交
3972

3973 3974
  if (pCond->type == TIMEWINDOW_RANGE_EXTERNAL) {
    // update the SQueryTableDataCond to create inner reader
3975
    int32_t order = pCond->order;
3976
    if (order == TSDB_ORDER_ASC) {
3977
      pCond->twindows.ekey = window.skey;
3978 3979 3980
      pCond->twindows.skey = INT64_MIN;
      pCond->order = TSDB_ORDER_DESC;
    } else {
3981
      pCond->twindows.skey = window.ekey;
3982 3983 3984 3985
      pCond->twindows.ekey = INT64_MAX;
      pCond->order = TSDB_ORDER_ASC;
    }

3986
    // here we only need one more row, so the capacity is set to be ONE.
H
Haojun Liao 已提交
3987
    code = tsdbReaderCreate(pVnode, pCond, &pReader->innerReader[0], 1, pResBlock, idstr);
3988 3989 3990 3991 3992
    if (code != TSDB_CODE_SUCCESS) {
      goto _err;
    }

    if (order == TSDB_ORDER_ASC) {
3993
      pCond->twindows.skey = window.ekey;
3994
      pCond->twindows.ekey = INT64_MAX;
3995
    } else {
3996
      pCond->twindows.skey = INT64_MIN;
3997
      pCond->twindows.ekey = window.ekey;
3998
    }
3999 4000
    pCond->order = order;

H
Haojun Liao 已提交
4001
    code = tsdbReaderCreate(pVnode, pCond, &pReader->innerReader[1], 1, pResBlock, idstr);
4002 4003 4004 4005 4006
    if (code != TSDB_CODE_SUCCESS) {
      goto _err;
    }
  }

H
Haojun Liao 已提交
4007
  // NOTE: the endVersion in pCond is the data version not schema version, so pCond->endVersion is not correct here.
4008 4009
  //  no valid error code set in metaGetTbTSchema, so let's set the error code here.
  //  we should proceed in case of tmq processing.
4010
  if (pCond->suid != 0) {
4011
    pReader->pSchema = metaGetTbTSchema(pReader->pTsdb->pVnode->pMeta, pReader->suid, -1, 1);
H
Haojun Liao 已提交
4012
    if (pReader->pSchema == NULL) {
H
Haojun Liao 已提交
4013
      tsdbError("failed to get table schema, suid:%" PRIu64 ", ver:-1, %s", pReader->suid, pReader->idStr);
H
Haojun Liao 已提交
4014
    }
4015 4016
  } else if (numOfTables > 0) {
    STableKeyInfo* pKey = pTableList;
4017
    pReader->pSchema = metaGetTbTSchema(pReader->pTsdb->pVnode->pMeta, pKey->uid, -1, 1);
H
Haojun Liao 已提交
4018
    if (pReader->pSchema == NULL) {
H
Haojun Liao 已提交
4019
      tsdbError("failed to get table schema, uid:%" PRIu64 ", ver:-1, %s", pKey->uid, pReader->idStr);
H
Haojun Liao 已提交
4020
    }
4021 4022
  }

4023
  if (pReader->pSchema != NULL) {
H
Haojun Liao 已提交
4024 4025 4026 4027
    code = updateBlockSMAInfo(pReader->pSchema, &pReader->suppInfo);
    if (code != TSDB_CODE_SUCCESS) {
      goto _err;
    }
4028
  }
4029

4030
  STsdbReader* p = (pReader->innerReader[0] != NULL) ? pReader->innerReader[0] : pReader;
X
Xiaoyu Wang 已提交
4031 4032
  pReader->status.pTableMap =
      createDataBlockScanInfo(p, &pReader->blockInfoBuf, pTableList, &pReader->status.uidList, numOfTables);
H
Haojun Liao 已提交
4033 4034
  if (pReader->status.pTableMap == NULL) {
    *ppReader = NULL;
S
Shengliang Guan 已提交
4035
    code = TSDB_CODE_OUT_OF_MEMORY;
H
Haojun Liao 已提交
4036 4037
    goto _err;
  }
H
Hongze Cheng 已提交
4038

4039
  pReader->suspended = true;
4040

4041
  tsdbDebug("%p total numOfTable:%d in this query %s", pReader, numOfTables, pReader->idStr);
H
Hongze Cheng 已提交
4042
  return code;
H
Hongze Cheng 已提交
4043 4044

_err:
H
Haojun Liao 已提交
4045
  tsdbError("failed to create data reader, code:%s %s", tstrerror(code), idstr);
4046
  tsdbReaderClose(pReader);
X
Xiaoyu Wang 已提交
4047
  *ppReader = NULL;  // reset the pointer value.
H
Hongze Cheng 已提交
4048
  return code;
H
refact  
Hongze Cheng 已提交
4049 4050 4051
}

// Releases a reader and everything it owns. This covers:
//   - the inner (prev/next row) readers;
//   - the SMA and column build buffers;
//   - the result block (only when pReader->freeBlock is set);
//   - the data/del file readers and the table scan info;
//   - the read snapshot and the last-block reader;
//   - the id string and the schemas.
// The I/O cost summary is logged before the reader is freed. A NULL reader is ignored.
void tsdbReaderClose(STsdbReader* pReader) {
  if (pReader == NULL) {
    return;
  }

  tsdbAcquireReader(pReader);

  // The inner readers are handed this reader's table map, uid list, read snapshot and schemas (see
  // tsdbReaderResume). Detach those before closing them, so they are released only once, further below.
  // Each inner reader is checked on its own: a failed tsdbReaderOpen may leave only innerReader[0] set.
  for (int32_t i = 0; i < 2; ++i) {
    STsdbReader* p = pReader->innerReader[i];
    if (p == NULL) {
      continue;
    }

    p->status.pTableMap = NULL;
    p->status.uidList.tableUidList = NULL;
    p->pReadSnap = NULL;
    p->pSchema = NULL;
    p->pMemSchema = NULL;

    tsdbReaderClose(p);
    pReader->innerReader[i] = NULL;
  }

  SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo;

  taosArrayDestroy(pSupInfo->pColAgg);
  for (int32_t i = 0; i < pSupInfo->numOfCols; ++i) {
    if (pSupInfo->buildBuf[i] != NULL) {
      taosMemoryFreeClear(pSupInfo->buildBuf[i]);
    }
  }

  if (pReader->freeBlock) {
    pReader->pResBlock = blockDataDestroy(pReader->pResBlock);
  }

  taosMemoryFree(pSupInfo->colId);
  tBlockDataDestroy(&pReader->status.fileBlockData);
  cleanupDataBlockIterator(&pReader->status.blockIter);

  size_t numOfTables = taosHashGetSize(pReader->status.pTableMap);
  if (pReader->status.pTableMap != NULL) {
    destroyAllBlockScanInfo(pReader->status.pTableMap);
    clearBlockScanInfoBuf(&pReader->blockInfoBuf);
  }

  if (pReader->pFileReader != NULL) {
    tsdbDataFReaderClose(&pReader->pFileReader);
  }

  if (pReader->pDelFReader != NULL) {
    tsdbDelFReaderClose(&pReader->pDelFReader);
  }

  if (pReader->pDelIdx != NULL) {
    taosArrayDestroy(pReader->pDelIdx);
    pReader->pDelIdx = NULL;
  }

  qTrace("tsdb/reader-close: %p, untake snapshot", pReader);
  tsdbUntakeReadSnap(pReader, pReader->pReadSnap, true);
  pReader->pReadSnap = NULL;

  tsdbReleaseReader(pReader);

  tsdbUninitReaderLock(pReader);

  taosMemoryFree(pReader->status.uidList.tableUidList);
  SIOCostSummary* pCost = &pReader->cost;

  SFilesetIter* pFilesetIter = &pReader->status.fileIter;
  if (pFilesetIter->pLastBlockReader != NULL) {
    SLastBlockReader* pLReader = pFilesetIter->pLastBlockReader;
    tMergeTreeClose(&pLReader->mergeTree);

    getLastBlockLoadInfo(pLReader->pInfo, &pCost->lastBlockLoad, &pCost->lastBlockLoadTime);

    pLReader->pInfo = destroyLastBlockLoadInfo(pLReader->pInfo);
    taosMemoryFree(pLReader);
  }

  tsdbDebug(
      "%p :io-cost summary: head-file:%" PRIu64 ", head-file time:%.2f ms, SMA:%" PRId64
      " SMA-time:%.2f ms, fileBlocks:%" PRId64
      ", fileBlocks-load-time:%.2f ms, "
      "build in-memory-block-time:%.2f ms, lastBlocks:%" PRId64 ", lastBlocks-time:%.2f ms, composed-blocks:%" PRId64
      ", composed-blocks-time:%.2fms, STableBlockScanInfo size:%.2f Kb, createTime:%.2f ms,initDelSkylineIterTime:%.2f "
      "ms, %s",
      pReader, pCost->headFileLoad, pCost->headFileLoadTime, pCost->smaDataLoad, pCost->smaLoadTime, pCost->numOfBlocks,
      pCost->blockLoadTime, pCost->buildmemBlock, pCost->lastBlockLoad, pCost->lastBlockLoadTime, pCost->composedBlocks,
      pCost->buildComposedBlockTime, numOfTables * sizeof(STableBlockScanInfo) / 1000.0, pCost->createScanInfoList,
      pCost->initDelSkylineIterTime, pReader->idStr);

  taosMemoryFree(pReader->idStr);

  // compare before freeing: a pointer value must not be used once the object it points to has been freed
  if (pReader->pMemSchema != pReader->pSchema) {
    taosMemoryFree(pReader->pMemSchema);
  }
  taosMemoryFree(pReader->pSchema);

  taosMemoryFreeClear(pReader);
}

4160 4161 4162 4163 4164 4165 4166 4167 4168 4169
int32_t tsdbReaderSuspend(STsdbReader* pReader) {
  int32_t code = 0;

  // save reader's base state & reset top state to be reconstructed from base state
  SReaderStatus*       pStatus = &pReader->status;
  STableBlockScanInfo* pBlockScanInfo = NULL;

  if (pStatus->loadFromFile) {
    SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(&pReader->status.blockIter);
    if (pBlockInfo != NULL) {
4170 4171
      pBlockScanInfo =
          *(STableBlockScanInfo**)taosHashGet(pStatus->pTableMap, &pBlockInfo->uid, sizeof(pBlockInfo->uid));
4172 4173 4174 4175 4176 4177 4178
      if (pBlockScanInfo == NULL) {
        code = TSDB_CODE_INVALID_PARA;
        tsdbError("failed to locate the uid:%" PRIu64 " in query table uid list, total tables:%d, %s", pBlockInfo->uid,
                  taosHashGetSize(pReader->status.pTableMap), pReader->idStr);
        goto _err;
      }
    } else {
4179
      pBlockScanInfo = *pStatus->pTableIter;
4180 4181 4182 4183 4184
    }

    tsdbDataFReaderClose(&pReader->pFileReader);

    // resetDataBlockScanInfo excluding lastKey
4185
    STableBlockScanInfo** p = NULL;
4186 4187

    while ((p = taosHashIterate(pStatus->pTableMap, p)) != NULL) {
4188 4189 4190 4191 4192 4193 4194 4195
      STableBlockScanInfo* pInfo = *(STableBlockScanInfo**)p;

      pInfo->iterInit = false;
      pInfo->iter.hasVal = false;
      pInfo->iiter.hasVal = false;

      if (pInfo->iter.iter != NULL) {
        pInfo->iter.iter = tsdbTbDataIterDestroy(pInfo->iter.iter);
4196 4197
      }

4198 4199 4200 4201 4202 4203
      if (pInfo->iiter.iter != NULL) {
        pInfo->iiter.iter = tsdbTbDataIterDestroy(pInfo->iiter.iter);
      }

      pInfo->delSkyline = taosArrayDestroy(pInfo->delSkyline);
      // pInfo->lastKey = ts;
4204 4205
    }
  } else {
4206 4207 4208 4209 4210 4211 4212 4213 4214 4215 4216 4217 4218 4219 4220 4221 4222 4223 4224 4225 4226 4227
    // resetDataBlockScanInfo excluding lastKey
    STableBlockScanInfo** p = NULL;

    while ((p = taosHashIterate(pStatus->pTableMap, p)) != NULL) {
      STableBlockScanInfo* pInfo = *(STableBlockScanInfo**)p;

      pInfo->iterInit = false;
      pInfo->iter.hasVal = false;
      pInfo->iiter.hasVal = false;

      if (pInfo->iter.iter != NULL) {
        pInfo->iter.iter = tsdbTbDataIterDestroy(pInfo->iter.iter);
      }

      if (pInfo->iiter.iter != NULL) {
        pInfo->iiter.iter = tsdbTbDataIterDestroy(pInfo->iiter.iter);
      }

      pInfo->delSkyline = taosArrayDestroy(pInfo->delSkyline);
      // pInfo->lastKey = ts;
    }

4228
    pBlockScanInfo = pStatus->pTableIter == NULL ? NULL : *pStatus->pTableIter;
4229 4230 4231 4232 4233 4234 4235
    if (pBlockScanInfo) {
      // save lastKey to restore memory iterator
      STimeWindow w = pReader->pResBlock->info.window;
      pBlockScanInfo->lastKey = ASCENDING_TRAVERSE(pReader->order) ? w.ekey : w.skey;

      // reset current current table's data block scan info,
      pBlockScanInfo->iterInit = false;
4236 4237
      pBlockScanInfo->iter.hasVal = false;
      pBlockScanInfo->iiter.hasVal = false;
4238 4239 4240 4241 4242 4243 4244 4245 4246 4247 4248 4249 4250 4251 4252
      if (pBlockScanInfo->iter.iter != NULL) {
        pBlockScanInfo->iter.iter = tsdbTbDataIterDestroy(pBlockScanInfo->iter.iter);
      }

      if (pBlockScanInfo->iiter.iter != NULL) {
        pBlockScanInfo->iiter.iter = tsdbTbDataIterDestroy(pBlockScanInfo->iiter.iter);
      }

      pBlockScanInfo->pBlockList = taosArrayDestroy(pBlockScanInfo->pBlockList);
      tMapDataClear(&pBlockScanInfo->mapData);
      // TODO: keep skyline for reuse
      pBlockScanInfo->delSkyline = taosArrayDestroy(pBlockScanInfo->delSkyline);
    }
  }

4253
  tsdbUntakeReadSnap(pReader, pReader->pReadSnap, false);
4254
  pReader->pReadSnap = NULL;
4255 4256 4257

  pReader->suspended = true;

4258 4259
  tsdbDebug("reader: %p suspended uid %" PRIu64 " in this query %s", pReader, pBlockScanInfo ? pBlockScanInfo->uid : 0,
            pReader->idStr);
4260 4261 4262 4263 4264 4265 4266 4267 4268 4269 4270
  return code;

_err:
  tsdbError("failed to suspend data reader, code:%s %s", tstrerror(code), pReader->idStr);
  return code;
}

static int32_t tsdbSetQueryReseek(void* pQHandle) {
  int32_t      code = 0;
  STsdbReader* pReader = pQHandle;

4271
  code = tsdbTryAcquireReader(pReader);
4272 4273
  if (code == 0) {
    if (pReader->suspended) {
4274
      tsdbReleaseReader(pReader);
4275 4276 4277 4278
      return code;
    }

    tsdbReaderSuspend(pReader);
4279

4280
    tsdbReleaseReader(pReader);
4281

4282
    return code;
4283 4284 4285
  } else if (code == EBUSY) {
    return TSDB_CODE_VND_QUERY_BUSY;
  } else {
4286 4287
    terrno = TAOS_SYSTEM_ERROR(code);
    return TSDB_CODE_FAILED;
4288 4289 4290 4291 4292 4293
  }
}

// Resumes a suspended reader: takes a new read snapshot (registering tsdbSetQueryReseek) and reopens the
// scan. For TIMEWINDOW_RANGE_CONTAINED the reader itself is reopened. Otherwise both inner readers are
// pointed at this reader's table map, uid list, schemas and snapshot, and the "prev" inner reader is
// reopened. Nothing is reopened when there are no tables. Every failure is logged and its code returned.
int32_t tsdbReaderResume(STsdbReader* pReader) {
  int32_t code = 0;

  STableBlockScanInfo** pBlockScanInfo = pReader->status.pTableIter;

  //  restore reader's state
  //  take snapshot
  int32_t numOfTables = taosHashGetSize(pReader->status.pTableMap);
  if (numOfTables > 0) {
    qTrace("tsdb/reader: %p, take snapshot", pReader);
    code = tsdbTakeReadSnap(pReader, tsdbSetQueryReseek, &pReader->pReadSnap);
    if (code != TSDB_CODE_SUCCESS) {
      goto _err;
    }

    if (pReader->type == TIMEWINDOW_RANGE_CONTAINED) {
      code = doOpenReaderImpl(pReader);
    } else {
      STsdbReader* pPrevReader = pReader->innerReader[0];
      STsdbReader* pNextReader = pReader->innerReader[1];

      // we need only one row
      pPrevReader->capacity = 1;
      pPrevReader->status.pTableMap = pReader->status.pTableMap;
      pPrevReader->status.uidList = pReader->status.uidList;
      pPrevReader->pSchema = pReader->pSchema;
      pPrevReader->pMemSchema = pReader->pMemSchema;
      pPrevReader->pReadSnap = pReader->pReadSnap;

      pNextReader->capacity = 1;
      pNextReader->status.pTableMap = pReader->status.pTableMap;
      pNextReader->status.uidList = pReader->status.uidList;
      pNextReader->pSchema = pReader->pSchema;
      pNextReader->pMemSchema = pReader->pMemSchema;
      pNextReader->pReadSnap = pReader->pReadSnap;

      code = doOpenReaderImpl(pPrevReader);
    }

    // report open failures through the same path as snapshot failures
    if (code != TSDB_CODE_SUCCESS) {
      goto _err;
    }
  }

  pReader->suspended = false;

  tsdbDebug("reader: %p resumed uid %" PRIu64 ", numOfTable:%" PRId32 ", in this query %s", pReader,
            pBlockScanInfo ? (*pBlockScanInfo)->uid : 0, numOfTables, pReader->idStr);
  return code;

_err:
  tsdbError("failed to resume data reader, code:%s %s", tstrerror(code), pReader->idStr);
  return code;
}

4348
// Loads the next data block of a single reader into pReader->pResBlock. File blocks come first while
// loadFromFile is set. Once the files yield no rows, the table list is rewound and the in-memory buffers
// are scanned. Returns true when the result block holds at least one row, and false when there are no
// tables, when building the file block fails, or when no rows are left.
static bool doTsdbNextDataBlock(STsdbReader* pReader) {
  SReaderStatus* pStatus = &pReader->status;
  SSDataBlock*   pBlock = pReader->pResBlock;

  // cleanup the data that belongs to the previous data block
  blockDataCleanup(pBlock);

  if (taosHashGetSize(pStatus->pTableMap) == 0) {
    return false;
  }

  if (pStatus->loadFromFile) {
    if (buildBlockFromFiles(pReader) != TSDB_CODE_SUCCESS) {
      return false;
    }

    if (pBlock->info.rows > 0) {
      return true;
    }

    // the files are exhausted: continue with the buffers, starting again from the first table
    resetTableListIndex(pStatus);
  }

  // no (more) data in files, let's try the buffer
  buildBlockFromBufferSequentially(pReader);
  return pBlock->info.rows > 0;
}

4377
// Advances the reader to its next data block. For TIMEWINDOW_RANGE_EXTERNAL readers, the rows come from
// three stages in order:
//   1. the "prev" inner reader (EXTERNAL_ROWS_PREV);
//   2. the main scan (EXTERNAL_ROWS_MAIN);
//   3. the "next" inner reader (EXTERNAL_ROWS_NEXT).
//
// The reader lock is taken on entry. When a block is returned it is released only if the block is a
// composed block; otherwise it stays held. The lock is always released when false is returned.
// Returns false when no more data is available, or on failure; a failure also sets terrno, because the
// bool result cannot carry an error code.
bool tsdbNextDataBlock(STsdbReader* pReader) {
  if (isEmptyQueryTimeWindow(&pReader->window) || pReader->step == EXTERNAL_ROWS_NEXT) {
    return false;
  }

  SReaderStatus* pStatus = &pReader->status;
  bool           ret = false;

  int32_t code = tsdbAcquireReader(pReader);
  qTrace("tsdb/read: %p, take read mutex, code: %d", pReader, code);

  if (pReader->suspended) {
    tsdbReaderResume(pReader);
  }

  if (pReader->innerReader[0] != NULL && pReader->step == 0) {
    ret = doTsdbNextDataBlock(pReader->innerReader[0]);
    pReader->step = EXTERNAL_ROWS_PREV;
    if (ret) {
      pStatus = &pReader->innerReader[0]->status;
      if (pStatus->composedDataBlock) {
        qTrace("tsdb/read: %p, unlock read mutex", pReader);
        tsdbReleaseReader(pReader);
      }

      return ret;
    }
  }

  if (pReader->step == EXTERNAL_ROWS_PREV) {
    // prepare for the main scan
    code = doOpenReaderImpl(pReader);
    resetAllDataBlockScanInfo(pReader->status.pTableMap, pReader->innerReader[0]->window.ekey);

    if (code != TSDB_CODE_SUCCESS) {
      goto _err;
    }

    pReader->step = EXTERNAL_ROWS_MAIN;
  }

  ret = doTsdbNextDataBlock(pReader);
  if (ret) {
    if (pStatus->composedDataBlock) {
      qTrace("tsdb/read: %p, unlock read mutex", pReader);
      tsdbReleaseReader(pReader);
    }

    return ret;
  }

  if (pReader->step == EXTERNAL_ROWS_MAIN && pReader->innerReader[1] != NULL) {
    // prepare for the next row scan
    code = doOpenReaderImpl(pReader->innerReader[1]);
    resetAllDataBlockScanInfo(pReader->innerReader[1]->status.pTableMap, pReader->window.ekey);
    if (code != TSDB_CODE_SUCCESS) {
      goto _err;
    }

    ret = doTsdbNextDataBlock(pReader->innerReader[1]);
    pReader->step = EXTERNAL_ROWS_NEXT;
    if (ret) {
      pStatus = &pReader->innerReader[1]->status;
      if (pStatus->composedDataBlock) {
        qTrace("tsdb/read: %p, unlock read mutex", pReader);
        tsdbReleaseReader(pReader);
      }

      return ret;
    }
  }

  qTrace("tsdb/read: %p, unlock read mutex", pReader);
  tsdbReleaseReader(pReader);

  return false;

_err:
  // Previously the error code itself was returned, which converted to "true" (block available) and kept
  // the lock held; report "no block", expose the code via terrno and release the lock instead.
  terrno = code;
  qTrace("tsdb/read: %p, unlock read mutex", pReader);
  tsdbReleaseReader(pReader);
  return false;
}

4454
/*
 * Complete pSup->pColAgg so that it holds one entry per requested column.
 *
 * pTsAgg (the synthesized primary-timestamp SMA) is inserted at position 0. Then every column in
 * pSup->colId[0..numOfCols) that has no entry in pColAgg gets an "all NULL" entry
 * (numOfNull == numOfRows) at its sorted position. The primary timestamp column is never given a
 * null entry.
 *
 * NOTE(review): the merge-style walk assumes both pColAgg and pSup->colId are sorted by ascending
 * colId, and that pTsAgg's colId sorts first — confirm against tsdbReadBlockSma.
 */
static void doFillNullColSMA(SBlockLoadSuppInfo* pSup, int32_t numOfRows, int32_t numOfCols, SColumnDataAgg* pTsAgg) {
  int32_t i = 0, j = 0;
  int32_t size = (int32_t)taosArrayGetSize(pSup->pColAgg);

  taosArrayInsert(pSup->pColAgg, 0, pTsAgg);
  size += 1;  // account for the timestamp agg just inserted, or the last stored agg is never visited

  while (j < numOfCols && i < size) {
    SColumnDataAgg* pAgg = taosArrayGet(pSup->pColAgg, i);
    if (pAgg->colId == pSup->colId[j]) {
      i += 1;
      j += 1;
    } else if (pAgg->colId < pSup->colId[j]) {
      i += 1;
    } else {  // pSup->colId[j] < pAgg->colId: column j has no stored SMA
      if (pSup->colId[j] != PRIMARYKEY_TIMESTAMP_COL_ID) {
        SColumnDataAgg nullColAgg = {.colId = pSup->colId[j], .numOfNull = numOfRows};
        taosArrayInsert(pSup->pColAgg, i, &nullColAgg);
        // step over the entry just inserted and keep the bound in sync with the array
        i += 1;
        size += 1;
      }
      j += 1;
    }
  }

  // Remaining requested columns sort after every stored agg (here i == size): append null entries for them.
  while (j < numOfCols) {
    if (pSup->colId[j] != PRIMARYKEY_TIMESTAMP_COL_ID) {
      SColumnDataAgg nullColAgg = {.colId = pSup->colId[j], .numOfNull = numOfRows};
      taosArrayInsert(pSup->pColAgg, i, &nullColAgg);
      i += 1;
    }
    j += 1;
  }
}

4477
/*
 * Attach the per-column SMA (stored block statistics) of the current file block to pDataBlock.
 *
 * When SMA is loaded: *allHave is set to true, and pDataBlock->pBlockAgg is pointed at
 * pReader->pResBlock->pBlockAgg, an array indexed by result slot id.
 *
 * In every other case — external-range reader, composed block, SMA not valid, result block
 * belonging to another table, block without SMA — *allHave stays false,
 * pDataBlock->pBlockAgg is left NULL, and TSDB_CODE_SUCCESS is returned.
 *
 * Returns the error from tsdbReadBlockSma, or TSDB_CODE_OUT_OF_MEMORY if the slot array cannot
 * be allocated.
 */
int32_t tsdbRetrieveDatablockSMA(STsdbReader* pReader, SSDataBlock* pDataBlock, bool* allHave) {
  SColumnDataAgg*** pBlockSMA = &pDataBlock->pBlockAgg;

  int32_t code = 0;
  *allHave = false;
  *pBlockSMA = NULL;

  if (pReader->type == TIMEWINDOW_RANGE_EXTERNAL) {
    return TSDB_CODE_SUCCESS;
  }

  // there is no statistics data for composed block
  if (pReader->status.composedDataBlock || (!pReader->suppInfo.smaValid)) {
    return TSDB_CODE_SUCCESS;
  }

  SFileDataBlockInfo* pFBlock = getCurrentBlockInfo(&pReader->status.blockIter);
  SBlockLoadSuppInfo* pSup = &pReader->suppInfo;

  // the result block currently holds rows of another table than the current file block
  if (pReader->pResBlock->info.id.uid != pFBlock->uid) {
    return TSDB_CODE_SUCCESS;
  }

  SDataBlk* pBlock = getCurrentBlock(&pReader->status.blockIter);
  if (!tDataBlkHasSma(pBlock)) {
    return TSDB_CODE_SUCCESS;  // *pBlockSMA is already NULL
  }

  code = tsdbReadBlockSma(pReader->pFileReader, pBlock, pSup->pColAgg);
  if (code != TSDB_CODE_SUCCESS) {
    tsdbDebug("vgId:%d, failed to load block SMA for uid %" PRIu64 ", code:%s, %s", 0, pFBlock->uid, tstrerror(code),
              pReader->idStr);
    return code;
  }

  // the primary timestamp column gets an SMA synthesized from the result block's time window
  SColumnDataAgg* pTsAgg = &pSup->tsColAgg;
  pTsAgg->numOfNull = 0;
  pTsAgg->colId = PRIMARYKEY_TIMESTAMP_COL_ID;
  pTsAgg->min = pReader->pResBlock->info.window.skey;
  pTsAgg->max = pReader->pResBlock->info.window.ekey;

  size_t numOfCols = pSup->numOfCols;

  // ensure capacity
  if (pDataBlock->pDataBlock) {
    size_t colsNum = taosArrayGetSize(pDataBlock->pDataBlock);
    taosArrayEnsureCap(pSup->pColAgg, colsNum);
  }

  SSDataBlock* pResBlock = pReader->pResBlock;
  size_t       numOfSlots = taosArrayGetSize(pResBlock->pDataBlock);
  if (pResBlock->pBlockAgg == NULL) {
    pResBlock->pBlockAgg = taosMemoryCalloc(numOfSlots, POINTER_BYTES);
    if (pResBlock->pBlockAgg == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }
  } else {
    // Drop slot pointers left from the previous block. They point into pSup->pColAgg, which is
    // grown by taosArrayInsert below, so they may no longer be valid.
    memset(pResBlock->pBlockAgg, 0, numOfSlots * POINTER_BYTES);
  }

  // do fill all null column value SMA info
  doFillNullColSMA(pSup, pBlock->nRow, numOfCols, pTsAgg);
  size_t size = taosArrayGetSize(pSup->pColAgg);

  // map every requested column (by colId) to its agg entry, stored at the column's result slot
  size_t i = 0, j = 0;
  while (j < numOfCols && i < size) {
    SColumnDataAgg* pAgg = taosArrayGet(pSup->pColAgg, i);
    if (pAgg->colId == pSup->colId[j]) {
      pResBlock->pBlockAgg[pSup->slotId[j]] = pAgg;
      i += 1;
      j += 1;
    } else if (pAgg->colId < pSup->colId[j]) {
      i += 1;
    } else {  // pSup->colId[j] < pAgg->colId
      // NOTE(review): expected to be reached only for the primary timestamp column — confirm
      pResBlock->pBlockAgg[pSup->slotId[j]] = &pSup->tsColAgg;
      j += 1;
    }
  }

  *allHave = true;
  *pBlockSMA = pResBlock->pBlockAgg;
  pReader->cost.smaDataLoad += 1;

  tsdbDebug("vgId:%d, succeed to load block SMA for uid %" PRIu64 ", %s", 0, pFBlock->uid, pReader->idStr);
  return code;
}

H
Haojun Liao 已提交
4565
/*
 * Load the data of the current file block of pReader and copy it into pReader->pResBlock.
 *
 * Returns pReader->pResBlock on success. Returns NULL with terrno set if:
 *   - the block's table is not in the reader's table map (TSDB_CODE_INVALID_PARA), or
 *   - loading the block fails (the loader's error code).
 */
static SSDataBlock* doRetrieveDataBlock(STsdbReader* pReader) {
  SReaderStatus*      pStatus = &pReader->status;
  SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(&pStatus->blockIter);

  // taosHashGet yields NULL for an unknown uid: check it before dereferencing the stored pointer.
  STableBlockScanInfo** ppScanInfo =
      (STableBlockScanInfo**)taosHashGet(pStatus->pTableMap, &pBlockInfo->uid, sizeof(pBlockInfo->uid));
  STableBlockScanInfo* pBlockScanInfo = (ppScanInfo != NULL) ? *ppScanInfo : NULL;
  if (pBlockScanInfo == NULL) {
    terrno = TSDB_CODE_INVALID_PARA;
    tsdbError("failed to locate the uid:%" PRIu64 " in query table uid list, total tables:%d, %s", pBlockInfo->uid,
              taosHashGetSize(pReader->status.pTableMap), pReader->idStr);
    return NULL;
  }

  int32_t code = doLoadFileBlockData(pReader, &pStatus->blockIter, &pStatus->fileBlockData, pBlockScanInfo->uid);
  if (code != TSDB_CODE_SUCCESS) {
    tBlockDataDestroy(&pStatus->fileBlockData);
    terrno = code;
    return NULL;
  }

  copyBlockDataToSDataBlock(pReader);
  return pReader->pResBlock;
}

H
Haojun Liao 已提交
4588
/*
 * Return the data block produced by the last successful "next block" step.
 *
 * In TIMEWINDOW_RANGE_EXTERNAL mode the block comes from the inner reader of the current step:
 * innerReader[0] for EXTERNAL_ROWS_PREV, innerReader[1] for EXTERNAL_ROWS_NEXT. Otherwise it
 * comes from pReader itself.
 *
 * For a composed block the already-filled result block is returned and the reader mutex is not
 * touched here; the next-block step releases it in that case. Otherwise the block is loaded from
 * file and pReader's mutex is released before returning. pIdList is unused.
 */
SSDataBlock* tsdbRetrieveDataBlock(STsdbReader* pReader, SArray* pIdList) {
  STsdbReader* pSrcReader = pReader;

  if (pReader->type == TIMEWINDOW_RANGE_EXTERNAL) {
    switch (pReader->step) {
      case EXTERNAL_ROWS_PREV:
        pSrcReader = pReader->innerReader[0];
        break;
      case EXTERNAL_ROWS_NEXT:
        pSrcReader = pReader->innerReader[1];
        break;
      default:
        break;
    }
  }

  if (pSrcReader->status.composedDataBlock) {
    return pSrcReader->pResBlock;
  }

  SSDataBlock* pBlock = doRetrieveDataBlock(pSrcReader);

  qTrace("tsdb/read-retrieve: %p, unlock read mutex", pReader);
  tsdbReleaseReader(pReader);

  return pBlock;
}

H
Haojun Liao 已提交
4611
/*
 * Body of tsdbReaderReset. The caller holds the reader mutex and releases it afterwards.
 *
 * Returns TSDB_CODE_SUCCESS, or the error of initForFirstBlockInFile.
 */
static int32_t doTsdbReaderResetImpl(STsdbReader* pReader, SQueryTableDataCond* pCond) {
  if (pReader->suspended) {
    tsdbReaderResume(pReader);
  }

  // nothing to rewind: empty query window, or no read snapshot available
  if (isEmptyQueryTimeWindow(&pReader->window) || pReader->pReadSnap == NULL) {
    tsdbDebug("tsdb reader reset return %p", pReader->pReadSnap);
    return TSDB_CODE_SUCCESS;
  }

  SReaderStatus*  pStatus = &pReader->status;
  SDataBlockIter* pBlockIter = &pStatus->blockIter;

  pReader->order = pCond->order;
  pReader->type = TIMEWINDOW_RANGE_CONTAINED;
  pStatus->loadFromFile = true;
  pStatus->pTableIter = NULL;
  pReader->window = updateQueryTimeWindow(pReader->pTsdb, &pCond->twindows);

  // start over with an empty timestamp-column SMA and no open data file reader
  memset(&pReader->suppInfo.tsColAgg, 0, sizeof(SColumnDataAgg));
  pReader->suppInfo.tsColAgg.colId = PRIMARYKEY_TIMESTAMP_COL_ID;
  tsdbDataFReaderClose(&pReader->pFileReader);

  int32_t numOfTables = taosHashGetSize(pStatus->pTableMap);

  initFilesetIterator(&pStatus->fileIter, pReader->pReadSnap->fs.aDFileSet, pReader);
  resetDataBlockIterator(pBlockIter, pReader->order);
  resetTableListIndex(pStatus);

  // reset per-table scan state with a key just outside the window, on the side the scan starts from
  int64_t startKey = ASCENDING_TRAVERSE(pReader->order) ? pReader->window.skey - 1 : pReader->window.ekey + 1;
  resetAllDataBlockScanInfo(pStatus->pTableMap, startKey);

  if (pStatus->fileIter.numOfFiles == 0) {
    // no data in files, let's try buffer in memory
    pStatus->loadFromFile = false;
    resetTableListIndex(pStatus);
  } else {
    int32_t code = initForFirstBlockInFile(pReader, pBlockIter);
    if (code != TSDB_CODE_SUCCESS) {
      tsdbError("%p reset reader failed, numOfTables:%d, query range:%" PRId64 " - %" PRId64 " in query %s", pReader,
                numOfTables, pReader->window.skey, pReader->window.ekey, pReader->idStr);
      return code;
    }
  }

  tsdbDebug("%p reset reader, suid:%" PRIu64 ", numOfTables:%d, skey:%" PRId64 ", query range:%" PRId64 " - %" PRId64
            " in query %s",
            pReader, pReader->suid, numOfTables, pCond->twindows.skey, pReader->window.skey, pReader->window.ekey,
            pReader->idStr);
  return TSDB_CODE_SUCCESS;
}

/*
 * Rewind pReader so that it scans again under pCond's order and time window.
 *
 * The whole reset runs with the reader mutex held, and the mutex is released on every path.
 * Returns TSDB_CODE_SUCCESS, or the error raised while positioning on the first file block.
 */
int32_t tsdbReaderReset(STsdbReader* pReader, SQueryTableDataCond* pCond) {
  qTrace("tsdb/reader-reset: %p, take read mutex", pReader);
  tsdbAcquireReader(pReader);

  int32_t code = doTsdbReaderResetImpl(pReader, pCond);

  tsdbReleaseReader(pReader);
  return code;
}
H
Hongze Cheng 已提交
4679

4680 4681 4682
/*
 * Histogram bucket for a block with numOfRows rows, when buckets are bucketRange rows wide and
 * start at startRow.
 *
 * Returns 0 for rows below startRow, and for a non-positive bucketRange (e.g. minRows == maxRows
 * makes the computed width 0). Without that guard the function would divide by zero or return a
 * negative index.
 */
static int32_t getBucketIndex(int32_t startRow, int32_t bucketRange, int32_t numOfRows) {
  if (bucketRange <= 0 || numOfRows < startRow) {
    return 0;
  }
  return (numOfRows - startRow) / bucketRange;
}
H
Hongze Cheng 已提交
4683

4684 4685 4686 4687
/*
 * Accumulate block-distribution statistics of the reader's file blocks into pTableBlockInfo.
 *
 * Collected: total rows and size, min/max rows per block, number of small blocks
 * (< 4096 rows), and a histogram of rows per block. Walks every block of every file set
 * reachable through the reader's block iterator.
 *
 * Returns TSDB_CODE_SUCCESS, or the error of initForFirstBlockInFile.
 */
int32_t tsdbGetFileBlocksDistInfo(STsdbReader* pReader, STableBlockDistInfo* pTableBlockInfo) {
  // The [minRows, maxRows] span is split into this many equally wide histogram buckets.
  // NOTE(review): assumes pTableBlockInfo->blockRowsHisto has exactly this many entries — confirm
  // against the STableBlockDistInfo declaration.
  const int32_t numOfBuckets = 20;

  int32_t code = TSDB_CODE_SUCCESS;
  pTableBlockInfo->totalSize = 0;
  pTableBlockInfo->totalRows = 0;
  pTableBlockInfo->numOfVgroups = 1;

  // find the start data block in file
  SReaderStatus* pStatus = &pReader->status;

  STsdbCfg* pc = &pReader->pTsdb->pVnode->config.tsdbCfg;
  pTableBlockInfo->defMinRows = pc->minRows;
  pTableBlockInfo->defMaxRows = pc->maxRows;

  int32_t bucketRange = ceil((pc->maxRows - pc->minRows) / (double)numOfBuckets);

  pTableBlockInfo->numOfFiles += 1;

  int32_t numOfTables = (int32_t)taosHashGetSize(pStatus->pTableMap);
  int     defaultRows = 4096;

  SDataBlockIter* pBlockIter = &pStatus->blockIter;
  pTableBlockInfo->numOfFiles += pStatus->fileIter.numOfFiles;

  if (pBlockIter->numOfBlocks > 0) {
    pTableBlockInfo->numOfBlocks += pBlockIter->numOfBlocks;
  }

  pTableBlockInfo->numOfTables = numOfTables;
  bool hasNext = (pBlockIter->numOfBlocks > 0);

  while (true) {
    if (hasNext) {
      SDataBlk* pBlock = getCurrentBlock(pBlockIter);

      int32_t numOfRows = pBlock->nRow;
      pTableBlockInfo->totalRows += numOfRows;

      if (numOfRows > pTableBlockInfo->maxRows) {
        pTableBlockInfo->maxRows = numOfRows;
      }

      if (numOfRows < pTableBlockInfo->minRows) {
        pTableBlockInfo->minRows = numOfRows;
      }

      if (numOfRows < defaultRows) {
        pTableBlockInfo->numOfSmallBlocks += 1;
      }

      pTableBlockInfo->totalSize += pBlock->aSubBlock[0].szBlock;

      // Keep the histogram index in range:
      //  - a zero bucket width (minRows == maxRows) must not reach the division;
      //  - blocks smaller than minRows go to the first bucket;
      //  - a block of exactly maxRows rows computes index numOfBuckets and goes to the last bucket.
      int32_t bucketIndex = 0;
      if (bucketRange > 0) {
        bucketIndex = getBucketIndex(pTableBlockInfo->defMinRows, bucketRange, numOfRows);
      }
      if (bucketIndex < 0) {
        bucketIndex = 0;
      } else if (bucketIndex >= numOfBuckets) {
        bucketIndex = numOfBuckets - 1;
      }
      pTableBlockInfo->blockRowsHisto[bucketIndex]++;

      hasNext = blockIteratorNext(&pStatus->blockIter, pReader->idStr);
    } else {
      // current file set exhausted: move on to the first block of the next file set, if any
      code = initForFirstBlockInFile(pReader, pBlockIter);
      if ((code != TSDB_CODE_SUCCESS) || (pStatus->loadFromFile == false)) {
        break;
      }

      pTableBlockInfo->numOfBlocks += pBlockIter->numOfBlocks;
      hasNext = (pBlockIter->numOfBlocks > 0);
    }
  }

  return code;
}
H
Hongze Cheng 已提交
4755

H
refact  
Hongze Cheng 已提交
4756
/*
 * Count the rows that the reader's tables hold in the snapshot's memtables (pMem and pIMem).
 *
 * Runs with the reader mutex held, resuming a suspended reader first. Returns 0 when the reader
 * has no read snapshot.
 *
 * A local hash iterator is used so that the reader's own scan position (status.pTableIter) is not
 * disturbed by this query.
 */
int64_t tsdbGetNumOfRowsInMemTable(STsdbReader* pReader) {
  int64_t        rows = 0;
  SReaderStatus* pStatus = &pReader->status;

  tsdbAcquireReader(pReader);
  if (pReader->suspended) {
    tsdbReaderResume(pReader);
  }

  // pReadSnap may be NULL (tsdbReaderReset checks for it as well); there is nothing to count then.
  if (pReader->pReadSnap != NULL) {
    void* pIter = taosHashIterate(pStatus->pTableMap, NULL);
    while (pIter != NULL) {
      STableBlockScanInfo* pBlockScanInfo = *(STableBlockScanInfo**)pIter;

      if (pReader->pReadSnap->pMem != NULL) {
        STbData* d = tsdbGetTbDataFromMemTable(pReader->pReadSnap->pMem, pReader->suid, pBlockScanInfo->uid);
        if (d != NULL) {
          rows += tsdbGetNRowsInTbData(d);
        }
      }

      if (pReader->pReadSnap->pIMem != NULL) {
        STbData* di = tsdbGetTbDataFromMemTable(pReader->pReadSnap->pIMem, pReader->suid, pBlockScanInfo->uid);
        if (di != NULL) {
          rows += tsdbGetNRowsInTbData(di);
        }
      }

      // current table is exhausted, let's try the next table
      pIter = taosHashIterate(pStatus->pTableMap, pIter);
    }
  }

  tsdbReleaseReader(pReader);

  return rows;
}
D
dapan1121 已提交
4794

L
Liu Jicong 已提交
4795
/*
 * Look up the schema of table `uid`.
 *
 * For a child table the schema version is read from its super table entry, and *suid receives the
 * super table uid. For a normal table *suid is 0.
 *
 * On success *pSchema receives the result of metaGetTbTSchema and TSDB_CODE_SUCCESS is returned.
 * On failure terrno is set and returned:
 *   - TSDB_CODE_TDB_INVALID_TABLE_ID when a table entry is missing;
 *   - TSDB_CODE_INVALID_PARA for any other table type.
 */
int32_t tsdbGetTableSchema(SVnode* pVnode, int64_t uid, STSchema** pSchema, int64_t* suid) {
  SMetaReader mr = {0};
  int32_t     sversion = 1;

  metaReaderInit(&mr, pVnode->pMeta, 0);

  if (metaGetTableEntryByUidCache(&mr, uid) != TSDB_CODE_SUCCESS) {
    terrno = TSDB_CODE_TDB_INVALID_TABLE_ID;
    goto _err;
  }

  *suid = 0;

  switch (mr.me.type) {
    case TSDB_CHILD_TABLE:
      // the version comes from the super table: reload the reader with the super table entry
      tDecoderClear(&mr.coder);
      *suid = mr.me.ctbEntry.suid;
      if (metaGetTableEntryByUidCache(&mr, *suid) != TSDB_CODE_SUCCESS) {
        terrno = TSDB_CODE_TDB_INVALID_TABLE_ID;
        goto _err;
      }
      sversion = mr.me.stbEntry.schemaRow.version;
      break;

    case TSDB_NORMAL_TABLE:
      sversion = mr.me.ntbEntry.schemaRow.version;
      break;

    default:
      terrno = TSDB_CODE_INVALID_PARA;
      goto _err;
  }

  metaReaderClear(&mr);
  *pSchema = metaGetTbTSchema(pVnode->pMeta, uid, sversion, 1);
  return TSDB_CODE_SUCCESS;

_err:
  metaReaderClear(&mr);
  return terrno;
}
H
Hongze Cheng 已提交
4832

H
Hongze Cheng 已提交
4833
/*
 * Take a read snapshot for pReader. Under pTsdb->rwLock (read) it:
 *   - pins the mutable and immutable memtables whose [minVer, maxVer] overlaps pReader->verRange;
 *   - references the current file set.
 *
 * Each pinned memtable is registered with a query node carrying pReader and `reseek`.
 *
 * On success *ppSnap receives the snapshot, to be released with tsdbUntakeReadSnap, and 0 is
 * returned. On failure *ppSnap is NULL and the error code is returned.
 */
int32_t tsdbTakeReadSnap(STsdbReader* pReader, _query_reseek_func_t reseek, STsdbReadSnap** ppSnap) {
  int32_t        code = 0;
  STsdb*         pTsdb = pReader->pTsdb;
  SVersionRange* pRange = &pReader->verRange;
  bool           useMem = false;
  bool           useIMem = false;

  // alloc
  STsdbReadSnap* pSnap = (STsdbReadSnap*)taosMemoryCalloc(1, sizeof(*pSnap));
  if (pSnap == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _exit;
  }

  // lock
  taosThreadRwlockRdlock(&pTsdb->rwLock);

  // select the memtables whose version span overlaps the requested version range
  useMem = pTsdb->mem && (pRange->minVer <= pTsdb->mem->maxVer && pRange->maxVer >= pTsdb->mem->minVer);
  useIMem = pTsdb->imem && (pRange->minVer <= pTsdb->imem->maxVer && pRange->maxVer >= pTsdb->imem->minVer);

  // The error path at _exit frees the query nodes without calling tsdbUnrefMemTable. So every step
  // that can fail (node allocation, file-set reference) must happen before any tsdbRefMemTable call.
  if (useMem) {
    pSnap->pNode = taosMemoryMalloc(sizeof(*pSnap->pNode));
    if (pSnap->pNode == NULL) {
      taosThreadRwlockUnlock(&pTsdb->rwLock);
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto _exit;
    }
  }

  if (useIMem) {
    pSnap->pINode = taosMemoryMalloc(sizeof(*pSnap->pINode));
    if (pSnap->pINode == NULL) {
      taosThreadRwlockUnlock(&pTsdb->rwLock);
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto _exit;
    }
  }

  // fs
  code = tsdbFSRef(pTsdb, &pSnap->fs);
  if (code) {
    taosThreadRwlockUnlock(&pTsdb->rwLock);
    goto _exit;
  }

  // take snapshot of the memtables; nothing below can fail
  if (useMem) {
    pSnap->pMem = pTsdb->mem;
    pSnap->pNode->pQHandle = pReader;
    pSnap->pNode->reseek = reseek;

    tsdbRefMemTable(pTsdb->mem, pSnap->pNode);
  }

  if (useIMem) {
    pSnap->pIMem = pTsdb->imem;
    pSnap->pINode->pQHandle = pReader;
    pSnap->pINode->reseek = reseek;

    tsdbRefMemTable(pTsdb->imem, pSnap->pINode);
  }

  // unlock
  taosThreadRwlockUnlock(&pTsdb->rwLock);

  tsdbTrace("vgId:%d, take read snapshot", TD_VID(pTsdb->pVnode));

_exit:
  if (code) {
    *ppSnap = NULL;
    if (pSnap) {
      if (pSnap->pNode) taosMemoryFree(pSnap->pNode);
      if (pSnap->pINode) taosMemoryFree(pSnap->pINode);
      taosMemoryFree(pSnap);
    }
  } else {
    *ppSnap = pSnap;
  }

  return code;
}

4903
/*
 * Release a snapshot obtained from tsdbTakeReadSnap. A NULL pSnap is accepted.
 *
 * Steps, in order:
 *   1. drop the memtable references (passing the snapshot's query nodes and `proactive`);
 *   2. drop the file-set reference;
 *   3. free the query nodes and the snapshot itself.
 */
void tsdbUntakeReadSnap(STsdbReader* pReader, STsdbReadSnap* pSnap, bool proactive) {
  STsdb* pTsdb = pReader->pTsdb;

  if (pSnap != NULL) {
    // unpin the memtables first, while the nodes handed to them are still allocated
    if (pSnap->pMem != NULL) {
      tsdbUnrefMemTable(pSnap->pMem, pSnap->pNode, proactive);
    }
    if (pSnap->pIMem != NULL) {
      tsdbUnrefMemTable(pSnap->pIMem, pSnap->pINode, proactive);
    }

    tsdbFSUnref(pTsdb, &pSnap->fs);

    if (pSnap->pNode != NULL) {
      taosMemoryFree(pSnap->pNode);
    }
    if (pSnap->pINode != NULL) {
      taosMemoryFree(pSnap->pINode);
    }
    taosMemoryFree(pSnap);
  }

  tsdbTrace("vgId:%d, untake read snapshot", TD_VID(pTsdb->pVnode));
}