tsdbRead.c 163.4 KB
Newer Older
H
hjxilinx 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

H
Haojun Liao 已提交
16
#include "osDef.h"
H
Hongze Cheng 已提交
17
#include "tsdb.h"
18

H
Hongze Cheng 已提交
19
// Evaluates to true when the scan order `o` is ascending. The argument is parenthesized so that
// compound expressions (e.g. `a | b`) are compared as a whole.
#define ASCENDING_TRAVERSE(o) ((o) == TSDB_ORDER_ASC)
H
Hongze Cheng 已提交
20

21 22 23 24 25 26
// Tags for externally requested row sets (PREV / MAIN / NEXT).
// NOTE(review): presumably selects between the main scan and the rows directly before/after the
// query window -- confirm against the users of this enum.
typedef enum {
  EXTERNAL_ROWS_PREV = 0x1,
  EXTERNAL_ROWS_MAIN = 0x2,
  EXTERNAL_ROWS_NEXT = 0x3,
} EContentData;

27
typedef struct {
dengyihao's avatar
dengyihao 已提交
28
  STbDataIter* iter;
29 30 31 32
  int32_t      index;
  bool         hasVal;
} SIterInfo;

33 34
// Pair of counters: number of data blocks and number of last files.
typedef struct {
  int32_t numOfBlocks;
  int32_t numOfLastFiles;
} SBlockNumber;

38
typedef struct SBlockIndex {
39 40
  int32_t     ordinalIndex;
  int64_t     inFileOffset;
H
Haojun Liao 已提交
41
  STimeWindow window;  // todo replace it with overlap flag.
42 43
} SBlockIndex;

H
Haojun Liao 已提交
44
typedef struct STableBlockScanInfo {
dengyihao's avatar
dengyihao 已提交
45 46
  uint64_t  uid;
  TSKEY     lastKey;
H
Hongze Cheng 已提交
47
  SMapData  mapData;            // block info (compressed)
48
  SArray*   pBlockList;         // block data index list, SArray<SBlockIndex>
H
Hongze Cheng 已提交
49 50 51 52 53 54
  SIterInfo iter;               // mem buffer skip list iterator
  SIterInfo iiter;              // imem buffer skip list iterator
  SArray*   delSkyline;         // delete info for this table
  int32_t   fileDelIndex;       // file block delete index
  int32_t   lastBlockDelIndex;  // delete index for last block
  bool      iterInit;           // whether to initialize the in-memory skip list iterator or not
H
Haojun Liao 已提交
55 56 57
} STableBlockScanInfo;

// (table uid, offset) pair referenced by SBlockOrderSupporter.
typedef struct SBlockOrderWrapper {
  int64_t uid;
  int64_t offset;
} SBlockOrderWrapper;
H
Hongze Cheng 已提交
61 62

typedef struct SBlockOrderSupporter {
63 64 65 66
  SBlockOrderWrapper** pDataBlockInfo;
  int32_t*             indexPerTable;
  int32_t*             numOfBlocksPerTable;
  int32_t              numOfTables;
H
Hongze Cheng 已提交
67 68 69
} SBlockOrderSupporter;

// Counters and elapsed-time accumulators describing the I/O cost of one reader.
typedef struct SIOCostSummary {
  int64_t numOfBlocks;
  double  blockLoadTime;
  double  buildmemBlock;
  int64_t headFileLoad;  // incremented once per successfully opened file set
  double  headFileLoadTime;
  int64_t smaDataLoad;
  double  smaLoadTime;
  int64_t lastBlockLoad;
  double  lastBlockLoadTime;
  int64_t composedBlocks;
  double  buildComposedBlockTime;
  double  createScanInfoList;  // milliseconds spent building the per-table scan info
  //  double  getTbFromMemTime;
  //  double  getTbFromIMemTime;
  double initDelSkylineIterTime;
} SIOCostSummary;

typedef struct SBlockLoadSuppInfo {
88 89 90 91 92 93 94
  SArray*        pColAgg;
  SColumnDataAgg tsColAgg;
  int16_t*       colId;
  int16_t*       slotId;
  int32_t        numOfCols;
  char**         buildBuf;  // build string tmp buffer, todo remove it later after all string format being updated.
  bool           smaValid;  // the sma on all queried columns are activated
H
Hongze Cheng 已提交
95 96
} SBlockLoadSuppInfo;

97
typedef struct SLastBlockReader {
H
Hongze Cheng 已提交
98 99 100 101 102
  STimeWindow        window;
  SVersionRange      verRange;
  int32_t            order;
  uint64_t           uid;
  SMergeTree         mergeTree;
103
  SSttBlockLoadInfo* pInfo;
104 105
} SLastBlockReader;

106
typedef struct SFilesetIter {
H
Hongze Cheng 已提交
107 108 109
  int32_t           numOfFiles;  // number of total files
  int32_t           index;       // current accessed index in the list
  SArray*           pFileList;   // data file list
110
  int32_t           order;
H
Hongze Cheng 已提交
111
  SLastBlockReader* pLastBlockReader;  // last file block reader
112
} SFilesetIter;
H
Haojun Liao 已提交
113 114

// Identifies one file data block by owning table and position in that table's block list.
typedef struct SFileDataBlockInfo {
  // index position in STableBlockScanInfo in order to check whether neighbor block overlaps with it
  uint64_t uid;
  int32_t  tbBlockIdx;
} SFileDataBlockInfo;

typedef struct SDataBlockIter {
121
  int32_t   numOfBlocks;
122
  int32_t   index;
H
Hongze Cheng 已提交
123
  SArray*   blockList;  // SArray<SFileDataBlockInfo>
124
  int32_t   order;
H
Hongze Cheng 已提交
125
  SDataBlk  block;  // current SDataBlk data
126
  SHashObj* pTableMap;
H
Haojun Liao 已提交
127 128 129
} SDataBlockIter;

// Progress of dumping the rows of the current file block.
typedef struct SFileBlockDumpInfo {
  int32_t totalRows;
  int32_t rowIndex;
  int64_t lastKey;
  bool    allDumped;
} SFileBlockDumpInfo;

136
// Sorted list of the queried table uids plus a cursor into it.
typedef struct STableUidList {
  uint64_t* tableUidList;  // access table uid list in uid ascending order list
  int32_t   currentIndex;  // index in table uid list
} STableUidList;
140

H
Haojun Liao 已提交
141
typedef struct SReaderStatus {
H
Hongze Cheng 已提交
142 143 144
  bool                  loadFromFile;       // check file stage
  bool                  composedDataBlock;  // the returned data block is a composed block or not
  SHashObj*             pTableMap;          // SHash<STableBlockScanInfo>
145
  STableBlockScanInfo** pTableIter;         // table iterator used in building in-memory buffer data blocks.
146
  STableUidList         uidList;            // check tables in uid order, to avoid the repeatly load of blocks in STT.
H
Hongze Cheng 已提交
147 148 149 150 151
  SFileBlockDumpInfo    fBlockDumpInfo;
  SDFileSet*            pCurrentFileset;  // current opened file set
  SBlockData            fileBlockData;
  SFilesetIter          fileIter;
  SDataBlockIter        blockIter;
H
Haojun Liao 已提交
152 153
} SReaderStatus;

154
typedef struct SBlockInfoBuf {
H
Hongze Cheng 已提交
155 156 157
  int32_t currentIndex;
  SArray* pData;
  int32_t numPerBucket;
158 159
} SBlockInfoBuf;

H
Hongze Cheng 已提交
160
struct STsdbReader {
H
Haojun Liao 已提交
161
  STsdb*             pTsdb;
162 163 164
  SVersionRange      verRange;
  TdThreadMutex      readerMutex;
  bool               suspended;
H
Haojun Liao 已提交
165 166
  uint64_t           suid;
  int16_t            order;
H
Haojun Liao 已提交
167
  bool               freeBlock;
H
Haojun Liao 已提交
168 169 170 171
  STimeWindow        window;  // the primary query time window that applies to all queries
  SSDataBlock*       pResBlock;
  int32_t            capacity;
  SReaderStatus      status;
172 173
  char*              idStr;  // query info handle, for debug purpose
  int32_t            type;   // query type: 1. retrieve all data blocks, 2. retrieve direct prev|next rows
H
Hongze Cheng 已提交
174
  SBlockLoadSuppInfo suppInfo;
H
Hongze Cheng 已提交
175
  STsdbReadSnap*     pReadSnap;
176
  SIOCostSummary     cost;
177 178 179 180 181
  STSchema*          pSchema;      // the newest version schema
  STSchema*          pMemSchema;   // the previous schema for in-memory data, to avoid load schema too many times
  SDataFReader*      pFileReader;  // the file reader
  SDelFReader*       pDelFReader;  // the del file reader
  SArray*            pDelIdx;      // del file block index;
182 183 184
  SBlockInfoBuf      blockInfoBuf;
  int32_t            step;
  STsdbReader*       innerReader[2];
H
Hongze Cheng 已提交
185
};
H
Hongze Cheng 已提交
186

H
Haojun Liao 已提交
187
static SFileDataBlockInfo* getCurrentBlockInfo(SDataBlockIter* pBlockIter);
188 189
static int      buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, int64_t endKey, int32_t capacity,
                                          STsdbReader* pReader);
190
static TSDBROW* getValidMemRow(SIterInfo* pIter, const SArray* pDelList, STsdbReader* pReader);
191 192
static int32_t  doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pScanInfo, STsdbReader* pReader,
                                        SRowMerger* pMerger);
H
Hongze Cheng 已提交
193
static int32_t  doMergeRowsInLastBlock(SLastBlockReader* pLastBlockReader, STableBlockScanInfo* pScanInfo, int64_t ts,
194
                                       SRowMerger* pMerger, SVersionRange* pVerRange);
195
static int32_t  doMergeRowsInBuf(SIterInfo* pIter, uint64_t uid, int64_t ts, SArray* pDelList, SRowMerger* pMerger,
dengyihao's avatar
dengyihao 已提交
196
                                 STsdbReader* pReader);
H
Hongze Cheng 已提交
197
static int32_t  doAppendRowFromTSRow(SSDataBlock* pBlock, STsdbReader* pReader, SRow* pTSRow,
H
Hongze Cheng 已提交
198
                                     STableBlockScanInfo* pInfo);
199
static int32_t  doAppendRowFromFileBlock(SSDataBlock* pResBlock, STsdbReader* pReader, SBlockData* pBlockData,
H
Hongze Cheng 已提交
200
                                         int32_t rowIndex);
201
static void     setComposedBlockFlag(STsdbReader* pReader, bool composed);
H
Hongze Cheng 已提交
202 203
static bool     hasBeenDropped(const SArray* pDelList, int32_t* index, TSDBKEY* pKey, int32_t order,
                               SVersionRange* pVerRange);
204

H
Hongze Cheng 已提交
205
static int32_t doMergeMemTableMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter, SArray* pDelList,
206
                                        TSDBROW* pTSRow, STsdbReader* pReader, bool* freeTSRow);
H
Hongze Cheng 已提交
207
static int32_t doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo* pBlockScanInfo,
H
Hongze Cheng 已提交
208
                                  STsdbReader* pReader, SRow** pTSRow);
209 210
static int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pBlockScanInfo, int64_t key,
                                     STsdbReader* pReader);
211

dengyihao's avatar
dengyihao 已提交
212 213 214 215
static int32_t initDelSkylineIterator(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader, STbData* pMemTbData,
                                      STbData* piMemTbData);
static STsdb*  getTsdbByRetentions(SVnode* pVnode, TSKEY winSKey, SRetention* retentions, const char* idstr,
                                   int8_t* pLevel);
216
static SVersionRange getQueryVerRange(SVnode* pVnode, SQueryTableDataCond* pCond, int8_t level);
H
Hongze Cheng 已提交
217 218 219
static int64_t       getCurrentKeyInLastBlock(SLastBlockReader* pLastBlockReader);
static bool          hasDataInLastBlock(SLastBlockReader* pLastBlockReader);
static int32_t       doBuildDataBlock(STsdbReader* pReader);
C
Cary Xu 已提交
220
static TSDBKEY       getCurrentKeyInBuf(STableBlockScanInfo* pScanInfo, STsdbReader* pReader);
221
static bool          hasDataInFileBlock(const SBlockData* pBlockData, const SFileBlockDumpInfo* pDumpInfo);
222
static void          initBlockDumpInfo(STsdbReader* pReader, SDataBlockIter* pBlockIter);
223
static int32_t       getInitialDelIndex(const SArray* pDelSkyline, int32_t order);
C
Cary Xu 已提交
224

225 226
static FORCE_INLINE STSchema* getLatestTableSchema(STsdbReader* pReader, uint64_t uid);

C
Cary Xu 已提交
227
// Reports whether `ts` lies outside the closed interval [pWindow->skey, pWindow->ekey].
static bool outOfTimeWindow(int64_t ts, STimeWindow* pWindow) {
  const bool inside = (ts >= pWindow->skey) && (ts <= pWindow->ekey);
  return !inside;
}
H
Haojun Liao 已提交
228

229 230
/**
 * Records the queried columns in pSupInfo: their column ids, their output slot ids and one scratch
 * buffer for every variable-length column.
 *
 * colId, slotId and buildBuf live in one allocation whose base address is pSupInfo->colId.
 *
 * @param pSupInfo    support info to fill
 * @param pCols       queried column descriptors, numOfCols entries
 * @param pSlotIdList output slot id of each queried column, numOfCols entries
 * @param numOfCols   number of queried columns
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY when an allocation fails. After a failure
 *         numOfCols is 0 if the shared buffer could not be allocated; otherwise every buildBuf
 *         entry is either NULL or a live buffer, so the structure can still be released entry by entry.
 */
static int32_t setColumnIdSlotList(SBlockLoadSuppInfo* pSupInfo, SColumnInfo* pCols, const int32_t* pSlotIdList,
                                   int32_t numOfCols) {
  pSupInfo->smaValid = true;
  pSupInfo->numOfCols = 0;  // published only after the arrays exist
  pSupInfo->slotId = NULL;
  pSupInfo->buildBuf = NULL;

  // The pointer array follows the two int16_t arrays; round its offset up so that it is aligned for char*.
  size_t idBytes = sizeof(int16_t) * 2 * numOfCols;
  size_t bufOffset = (idBytes + sizeof(char*) - 1) / sizeof(char*) * sizeof(char*);

  pSupInfo->colId = taosMemoryMalloc(bufOffset + sizeof(char*) * numOfCols);
  if (pSupInfo->colId == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  pSupInfo->slotId = pSupInfo->colId + numOfCols;
  pSupInfo->buildBuf = (char**)((char*)pSupInfo->colId + bufOffset);
  pSupInfo->numOfCols = numOfCols;

  // Fill the ids and clear every buffer slot first, so a later allocation failure leaves nothing uninitialized.
  for (int32_t i = 0; i < numOfCols; ++i) {
    pSupInfo->colId[i] = pCols[i].colId;
    pSupInfo->slotId[i] = pSlotIdList[i];
    pSupInfo->buildBuf[i] = NULL;
  }

  // only variable-length columns need a scratch buffer
  for (int32_t i = 0; i < numOfCols; ++i) {
    if (!IS_VAR_DATA_TYPE(pCols[i].type)) {
      continue;
    }

    pSupInfo->buildBuf[i] = taosMemoryMalloc(pCols[i].bytes);
    if (pSupInfo->buildBuf[i] == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }
  }

  return TSDB_CODE_SUCCESS;
}
H
Hongze Cheng 已提交
254

H
Haojun Liao 已提交
255
/**
 * Walks the schema columns and the queried column ids in step and clears pSupInfo->smaValid as soon
 * as a queried column is found whose block SMA is switched off.
 *
 * @return TSDB_CODE_SUCCESS when the walk finishes (or stops early after clearing smaValid);
 *         TSDB_CODE_INVALID_PARA when a schema column id is greater than the queried id currently
 *         being looked for, i.e. that queried id was not matched by the walk.
 */
static int32_t updateBlockSMAInfo(STSchema* pSchema, SBlockLoadSuppInfo* pSupInfo) {
  int32_t i = 0, j = 0;

  while (i < pSchema->numOfCols && j < pSupInfo->numOfCols) {
    STColumn* pTCol = &pSchema->columns[i];
    if (pTCol->colId == pSupInfo->colId[j]) {
      if (!IS_BSMA_ON(pTCol)) {
        pSupInfo->smaValid = false;
        return TSDB_CODE_SUCCESS;
      }

      i += 1;
      j += 1;
    } else if (pTCol->colId < pSupInfo->colId[j]) {
      // schema column that is not queried: skip it
      i += 1;
    } else {
      return TSDB_CODE_INVALID_PARA;
    }
  }

  return TSDB_CODE_SUCCESS;
}

279
// Allocates one zero-filled bucket of numOfEntries STableBlockScanInfo and appends it to pBuf->pData.
// The bucket is released again when it cannot be appended.
static int32_t appendBlockScanInfoBucket(SBlockInfoBuf* pBuf, int32_t numOfEntries) {
  char* p = taosMemoryCalloc(numOfEntries, sizeof(STableBlockScanInfo));
  if (p == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  if (taosArrayPush(pBuf->pData, &p) == NULL) {
    taosMemoryFree(p);
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  return TSDB_CODE_SUCCESS;
}

/**
 * Allocates the buckets needed to hold numOfTables scan-info entries: numOfTables / numPerBucket
 * full buckets plus one bucket for the remainder, if any. The bucket array is created on first use.
 *
 * @return TSDB_CODE_SUCCESS; TSDB_CODE_INVALID_PARA when pBuf->numPerBucket is not positive;
 *         TSDB_CODE_OUT_OF_MEMORY when an allocation fails. Buckets appended before a failure stay
 *         in pBuf->pData.
 */
static int32_t initBlockScanInfoBuf(SBlockInfoBuf* pBuf, int32_t numOfTables) {
  if (pBuf->numPerBucket <= 0) {
    return TSDB_CODE_INVALID_PARA;  // would otherwise divide by zero below
  }

  int32_t num = numOfTables / pBuf->numPerBucket;
  int32_t remainder = numOfTables % pBuf->numPerBucket;
  if (pBuf->pData == NULL) {
    pBuf->pData = taosArrayInit(num + 1, POINTER_BYTES);
    if (pBuf->pData == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }
  }

  for (int32_t i = 0; i < num; ++i) {
    int32_t code = appendBlockScanInfoBucket(pBuf, pBuf->numPerBucket);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }
  }

  if (remainder > 0) {
    int32_t code = appendBlockScanInfoBucket(pBuf, remainder);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }
  }

  return TSDB_CODE_SUCCESS;
}
H
Hongze Cheng 已提交
305

306 307
// Frees every bucket held by pBuf and then the bucket array itself. pData is overwritten with the
// value returned by taosArrayDestroy (the idiom this file uses for other arrays) so that it does
// not keep pointing at the destroyed array; initBlockScanInfoBuf tests pData against NULL.
static void clearBlockScanInfoBuf(SBlockInfoBuf* pBuf) {
  size_t num = taosArrayGetSize(pBuf->pData);
  for (size_t i = 0; i < num; ++i) {
    char** p = taosArrayGet(pBuf->pData, i);
    taosMemoryFree(*p);
  }

  pBuf->pData = taosArrayDestroy(pBuf->pData);
}

// Returns the address of the index-th STableBlockScanInfo slot: index / numPerBucket selects the
// bucket and index % numPerBucket the entry inside it. No range check is performed on index.
static void* getPosInBlockInfoBuf(SBlockInfoBuf* pBuf, int32_t index) {
  int32_t bucketIndex = index / pBuf->numPerBucket;
  char**  pBucket = taosArrayGet(pBuf->pData, bucketIndex);
  return (*pBucket) + (index % pBuf->numPerBucket) * sizeof(STableBlockScanInfo);
}

H
Haojun Liao 已提交
322 323 324 325 326 327 328 329 330 331
// Three-way comparison of two uint64_t uids for sorting in ascending order: -1, 0 or 1.
static int32_t uidComparFunc(const void* p1, const void* p2) {
  const uint64_t lhs = *(const uint64_t*)p1;
  const uint64_t rhs = *(const uint64_t*)p2;

  if (lhs < rhs) {
    return -1;
  }
  if (lhs > rhs) {
    return 1;
  }
  return 0;
}

332
// NOTE: speedup the whole processing by preparing the buffer for STableBlockScanInfo in batch model
H
Hongze Cheng 已提交
333
static SHashObj* createDataBlockScanInfo(STsdbReader* pTsdbReader, SBlockInfoBuf* pBuf, const STableKeyInfo* idList,
X
Xiaoyu Wang 已提交
334
                                         STableUidList* pUidList, int32_t numOfTables) {
H
Haojun Liao 已提交
335
  // allocate buffer in order to load data blocks from file
336
  // todo use simple hash instead, optimize the memory consumption
337 338 339
  SHashObj* pTableMap =
      taosHashInit(numOfTables, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
  if (pTableMap == NULL) {
H
Haojun Liao 已提交
340 341 342
    return NULL;
  }

H
Haojun Liao 已提交
343
  int64_t st = taosGetTimestampUs();
H
Haojun Liao 已提交
344
  initBlockScanInfoBuf(pBuf, numOfTables);
H
Haojun Liao 已提交
345

H
Haojun Liao 已提交
346 347
  pUidList->tableUidList = taosMemoryMalloc(numOfTables * sizeof(uint64_t));
  if (pUidList->tableUidList == NULL) {
H
Haojun Liao 已提交
348 349
    return NULL;
  }
H
Haojun Liao 已提交
350
  pUidList->currentIndex = 0;
H
Haojun Liao 已提交
351

352
  for (int32_t j = 0; j < numOfTables; ++j) {
H
Haojun Liao 已提交
353
    STableBlockScanInfo* pScanInfo = getPosInBlockInfoBuf(pBuf, j);
H
Haojun Liao 已提交
354

355
    pScanInfo->uid = idList[j].uid;
H
Haojun Liao 已提交
356
    pUidList->tableUidList[j] = idList[j].uid;
H
Haojun Liao 已提交
357

358
    if (ASCENDING_TRAVERSE(pTsdbReader->order)) {
H
Haojun Liao 已提交
359
      int64_t skey = pTsdbReader->window.skey;
360
      pScanInfo->lastKey = (skey > INT64_MIN) ? (skey - 1) : skey;
wmmhello's avatar
wmmhello 已提交
361
    } else {
H
Haojun Liao 已提交
362
      int64_t ekey = pTsdbReader->window.ekey;
363
      pScanInfo->lastKey = (ekey < INT64_MAX) ? (ekey + 1) : ekey;
H
Haojun Liao 已提交
364
    }
wmmhello's avatar
wmmhello 已提交
365

366
    taosHashPut(pTableMap, &pScanInfo->uid, sizeof(uint64_t), &pScanInfo, POINTER_BYTES);
H
Hongze Cheng 已提交
367 368
    tsdbTrace("%p check table uid:%" PRId64 " from lastKey:%" PRId64 " %s", pTsdbReader, pScanInfo->uid,
              pScanInfo->lastKey, pTsdbReader->idStr);
H
Haojun Liao 已提交
369 370
  }

H
Haojun Liao 已提交
371
  taosSort(pUidList->tableUidList, numOfTables, sizeof(uint64_t), uidComparFunc);
H
Haojun Liao 已提交
372

H
Haojun Liao 已提交
373 374 375 376
  pTsdbReader->cost.createScanInfoList = (taosGetTimestampUs() - st) / 1000.0;
  tsdbDebug("%p create %d tables scan-info, size:%.2f Kb, elapsed time:%.2f ms, %s", pTsdbReader, numOfTables,
            (sizeof(STableBlockScanInfo) * numOfTables) / 1024.0, pTsdbReader->cost.createScanInfoList,
            pTsdbReader->idStr);
377

378
  return pTableMap;
H
Hongze Cheng 已提交
379
}
H
Hongze Cheng 已提交
380

381 382
// Resets the scan state of every table in pTableMap: destroys both in-memory iterators and the
// delete skyline, and sets lastKey to ts. The block list and map data are left untouched.
static void resetAllDataBlockScanInfo(SHashObj* pTableMap, int64_t ts) {
  STableBlockScanInfo** p = NULL;
  while ((p = taosHashIterate(pTableMap, p)) != NULL) {
    STableBlockScanInfo* pInfo = *(STableBlockScanInfo**)p;

    pInfo->iterInit = false;
    pInfo->iter.hasVal = false;
    pInfo->iiter.hasVal = false;

    if (pInfo->iter.iter != NULL) {
      pInfo->iter.iter = tsdbTbDataIterDestroy(pInfo->iter.iter);
    }

    if (pInfo->iiter.iter != NULL) {
      pInfo->iiter.iter = tsdbTbDataIterDestroy(pInfo->iiter.iter);
    }

    pInfo->delSkyline = taosArrayDestroy(pInfo->delSkyline);
    pInfo->lastKey = ts;
  }
}

403 404
// Releases everything owned by one scan-info entry: both in-memory iterators, the delete skyline,
// the block list and the map data. The entry itself is not freed.
static void clearBlockScanInfo(STableBlockScanInfo* p) {
  p->iterInit = false;

  p->iter.hasVal = false;
  p->iiter.hasVal = false;

  if (p->iter.iter != NULL) {
    p->iter.iter = tsdbTbDataIterDestroy(p->iter.iter);
  }

  if (p->iiter.iter != NULL) {
    p->iiter.iter = tsdbTbDataIterDestroy(p->iiter.iter);
  }

  p->delSkyline = taosArrayDestroy(p->delSkyline);
  p->pBlockList = taosArrayDestroy(p->pBlockList);
  tMapDataClear(&p->mapData);
}
421

H
Haojun Liao 已提交
422
// Clears every scan-info entry referenced by pTableMap and then destroys the map itself.
static void destroyAllBlockScanInfo(SHashObj* pTableMap) {
  void* p = NULL;
  while ((p = taosHashIterate(pTableMap, p)) != NULL) {
    // each map value is a pointer to the entry
    clearBlockScanInfo(*(STableBlockScanInfo**)p);
  }

  taosHashCleanup(pTableMap);
}

431
// A window is empty when its end key precedes its start key.
static bool isEmptyQueryTimeWindow(STimeWindow* pWindow) {
  return pWindow->ekey < pWindow->skey;
}
H
Hongze Cheng 已提交
432

433 434 435
// Clamp the query time window according to the data retention (keep2) setting, so that expired data
// is never returned to the client even if it is still present.
// Returns a copy of *pWindow whose skey is raised to (now - tsTickPerMin[precision] * keep2 + 1)
// when it lies before that point; ekey is never changed.
static STimeWindow updateQueryTimeWindow(STsdb* pTsdb, STimeWindow* pWindow) {
  STsdbKeepCfg* pCfg = &pTsdb->keepCfg;

  int64_t now = taosGetTimestamp(pCfg->precision);
  int64_t earliestTs = now - (tsTickPerMin[pCfg->precision] * pCfg->keep2) + 1;  // needs to add one tick

  STimeWindow win = *pWindow;
  if (win.skey < earliestTs) {
    win.skey = earliestTs;
  }

  return win;
}
H
Hongze Cheng 已提交
448

H
Haojun Liao 已提交
449
// init file iterator
450
static int32_t initFilesetIterator(SFilesetIter* pIter, SArray* aDFileSet, STsdbReader* pReader) {
H
Hongze Cheng 已提交
451
  size_t numOfFileset = taosArrayGetSize(aDFileSet);
452

453 454
  pIter->index = ASCENDING_TRAVERSE(pReader->order) ? -1 : numOfFileset;
  pIter->order = pReader->order;
H
Hongze Cheng 已提交
455
  pIter->pFileList = aDFileSet;
456
  pIter->numOfFiles = numOfFileset;
H
Haojun Liao 已提交
457

458 459 460 461
  if (pIter->pLastBlockReader == NULL) {
    pIter->pLastBlockReader = taosMemoryCalloc(1, sizeof(struct SLastBlockReader));
    if (pIter->pLastBlockReader == NULL) {
      int32_t code = TSDB_CODE_OUT_OF_MEMORY;
462
      tsdbError("failed to prepare the last block iterator, since:%s %s", tstrerror(code), pReader->idStr);
463 464
      return code;
    }
465 466
  }

467 468 469 470 471 472 473 474
  SLastBlockReader* pLReader = pIter->pLastBlockReader;
  pLReader->order = pReader->order;
  pLReader->window = pReader->window;
  pLReader->verRange = pReader->verRange;

  pLReader->uid = 0;
  tMergeTreeClose(&pLReader->mergeTree);

475
  if (pLReader->pInfo == NULL) {
476
    // here we ignore the first column, which is always be the primary timestamp column
477 478 479
    SBlockLoadSuppInfo* pInfo = &pReader->suppInfo;

    int32_t numOfStt = pReader->pTsdb->pVnode->config.sttTrigger;
X
Xiaoyu Wang 已提交
480
    pLReader->pInfo = tCreateLastBlockLoadInfo(pReader->pSchema, &pInfo->colId[1], pInfo->numOfCols - 1, numOfStt);
H
Haojun Liao 已提交
481 482 483 484
    if (pLReader->pInfo == NULL) {
      tsdbDebug("init fileset iterator failed, code:%s %s", tstrerror(terrno), pReader->idStr);
      return terrno;
    }
485 486
  }

487
  tsdbDebug("init fileset iterator, total files:%d %s", pIter->numOfFiles, pReader->idStr);
H
Haojun Liao 已提交
488 489 490
  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
491
/**
 * Moves the iterator to the next file set (in scan order) that overlaps the query window and opens
 * a file reader on it.
 *
 * @param hasNext set to true when a qualifying file set was opened; set to false when the list is
 *                exhausted, when the remaining file sets lie beyond the query window, or on error.
 * @return TSDB_CODE_SUCCESS, or the error code returned when opening a file set fails.
 */
static int32_t filesetIteratorNext(SFilesetIter* pIter, STsdbReader* pReader, bool *hasNext) {
  bool    asc = ASCENDING_TRAVERSE(pIter->order);
  int32_t step = asc ? 1 : -1;
  pIter->index += step;
  int32_t code = 0;

  if ((asc && pIter->index >= pIter->numOfFiles) || ((!asc) && pIter->index < 0)) {
    *hasNext = false;
    return TSDB_CODE_SUCCESS;
  }

  // collect the last-block load statistics before the load info is reset below
  SIOCostSummary* pSum = &pReader->cost;
  getLastBlockLoadInfo(pIter->pLastBlockReader->pInfo, &pSum->lastBlockLoad, &pReader->cost.lastBlockLoadTime);

  pIter->pLastBlockReader->uid = 0;
  tMergeTreeClose(&pIter->pLastBlockReader->mergeTree);
  resetLastBlockLoadInfo(pIter->pLastBlockReader->pInfo);

  // check the time range covered by the file
  STimeWindow win = {0};

  while (1) {
    // close the reader of the previously visited file set, if any
    if (pReader->pFileReader != NULL) {
      tsdbDataFReaderClose(&pReader->pFileReader);
    }

    pReader->status.pCurrentFileset = (SDFileSet*)taosArrayGet(pIter->pFileList, pIter->index);

    code = tsdbDataFReaderOpen(&pReader->pFileReader, pReader->pTsdb, pReader->status.pCurrentFileset);
    if (code != TSDB_CODE_SUCCESS) {
      goto _err;
    }

    pReader->cost.headFileLoad += 1;

    int32_t fid = pReader->status.pCurrentFileset->fid;
    tsdbFidKeyRange(fid, pReader->pTsdb->keepCfg.days, pReader->pTsdb->keepCfg.precision, &win.skey, &win.ekey);

    // the current file no longer overlaps the query time window, ignore the remaining files
    if ((asc && win.skey > pReader->window.ekey) || (!asc && win.ekey < pReader->window.skey)) {
      tsdbDebug("%p remain files are not qualified for qrange:%" PRId64 "-%" PRId64 ", ignore, %s", pReader,
                pReader->window.skey, pReader->window.ekey, pReader->idStr);
      *hasNext = false;
      return TSDB_CODE_SUCCESS;
    }

    // the current file lies before the query window in scan order: try the next one
    if ((asc && (win.ekey < pReader->window.skey)) || ((!asc) && (win.skey > pReader->window.ekey))) {
      pIter->index += step;
      if ((asc && pIter->index >= pIter->numOfFiles) || ((!asc) && pIter->index < 0)) {
        *hasNext = false;
        return TSDB_CODE_SUCCESS;
      }
      continue;
    }

    tsdbDebug("%p file found fid:%d for qrange:%" PRId64 "-%" PRId64 ", %s", pReader, fid, pReader->window.skey,
              pReader->window.ekey, pReader->idStr);
    *hasNext = true;
    return TSDB_CODE_SUCCESS;
  }

_err:
  *hasNext = false;
  return code;
}

557
// Puts the block iterator back to its initial state for the given order: index -1 and no blocks.
// The block list is created on first use and emptied (but kept) afterwards.
static void resetDataBlockIterator(SDataBlockIter* pIter, int32_t order) {
  pIter->order = order;
  pIter->index = -1;
  pIter->numOfBlocks = 0;
  if (pIter->blockList == NULL) {
    pIter->blockList = taosArrayInit(4, sizeof(SFileDataBlockInfo));
  } else {
    taosArrayClear(pIter->blockList);
  }
}

L
Liu Jicong 已提交
568
// Destroys the iterator's block list. The field is overwritten with the value returned by
// taosArrayDestroy (the idiom this file uses for other arrays) so that it does not keep pointing
// at the destroyed list; resetDataBlockIterator tests blockList against NULL.
static void cleanupDataBlockIterator(SDataBlockIter* pIter) {
  pIter->blockList = taosArrayDestroy(pIter->blockList);
}
H
Haojun Liao 已提交
569

H
Haojun Liao 已提交
570
// Sets the initial reader stage: no table iterator yet, and data is read from files first.
static void initReaderStatus(SReaderStatus* pStatus) {
  pStatus->pTableIter = NULL;
  pStatus->loadFromFile = true;
}

575 576 577 578 579 580 581 582
/**
 * Creates a result block with one column per queried column of pCond and room for `capacity` rows.
 *
 * @return the new block, or NULL with terrno set when the block cannot be created or grown.
 */
static SSDataBlock* createResBlock(SQueryTableDataCond* pCond, int32_t capacity) {
  SSDataBlock* pResBlock = createDataBlock();
  if (pResBlock == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  for (int32_t i = 0; i < pCond->numOfCols; ++i) {
    SColumnInfoData colInfo = {0};
    colInfo.info = pCond->colList[i];
    blockDataAppendColInfo(pResBlock, &colInfo);
  }

  int32_t code = blockDataEnsureCapacity(pResBlock, capacity);
  if (code != TSDB_CODE_SUCCESS) {
    terrno = code;
    // release the appended columns and their buffers as well, not only the block header
    blockDataDestroy(pResBlock);
    return NULL;
  }
  return pResBlock;
}

597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651
// Initializes the reader mutex, tracing the state before and after; returns the init result.
static int32_t tsdbInitReaderLock(STsdbReader* pReader) {
  TdThreadMutex* pMutex = &pReader->readerMutex;
  int32_t        code = -1;

  qTrace("tsdb/read: %p, pre-init read mutex: %p, code: %d", pReader, pMutex, code);
  code = taosThreadMutexInit(pMutex, NULL);
  qTrace("tsdb/read: %p, post-init read mutex: %p, code: %d", pReader, pMutex, code);

  return code;
}

// Destroys the reader mutex, tracing the state before and after; returns the destroy result.
static int32_t tsdbUninitReaderLock(STsdbReader* pReader) {
  TdThreadMutex* pMutex = &pReader->readerMutex;
  int32_t        code = -1;

  qTrace("tsdb/read: %p, pre-uninit read mutex: %p, code: %d", pReader, pMutex, code);
  code = taosThreadMutexDestroy(pMutex);
  qTrace("tsdb/read: %p, post-uninit read mutex: %p, code: %d", pReader, pMutex, code);

  return code;
}

// Locks the reader mutex, tracing the state before and after; returns the lock result.
static int32_t tsdbAcquireReader(STsdbReader* pReader) {
  TdThreadMutex* pMutex = &pReader->readerMutex;
  int32_t        code = -1;

  qTrace("tsdb/read: %p, pre-take read mutex: %p, code: %d", pReader, pMutex, code);
  code = taosThreadMutexLock(pMutex);
  qTrace("tsdb/read: %p, post-take read mutex: %p, code: %d", pReader, pMutex, code);

  return code;
}

// Attempts to lock the reader mutex via the try-lock call, tracing the state before and after;
// returns the try-lock result.
static int32_t tsdbTryAcquireReader(STsdbReader* pReader) {
  TdThreadMutex* pMutex = &pReader->readerMutex;
  int32_t        code = -1;

  qTrace("tsdb/read: %p, pre-trytake read mutex: %p, code: %d", pReader, pMutex, code);
  code = taosThreadMutexTryLock(pMutex);
  qTrace("tsdb/read: %p, post-trytake read mutex: %p, code: %d", pReader, pMutex, code);

  return code;
}

// Unlocks the reader mutex, tracing the state before and after; returns the unlock result.
static int32_t tsdbReleaseReader(STsdbReader* pReader) {
  TdThreadMutex* pMutex = &pReader->readerMutex;
  int32_t        code = -1;

  qTrace("tsdb/read: %p, pre-untake read mutex: %p, code: %d", pReader, pMutex, code);
  code = taosThreadMutexUnlock(pMutex);
  qTrace("tsdb/read: %p, post-untake read mutex: %p, code: %d", pReader, pMutex, code);

  return code;
}

652 653 654 655 656 657
// Unlocks the reader mutex unless the block last returned was a composed block.
void tsdbReleaseDataBlock(STsdbReader* pReader) {
  if (pReader->status.composedDataBlock) {
    return;  // nothing to release for a composed block
  }

  tsdbReleaseReader(pReader);
}
658

659
static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, STsdbReader** ppReader, int32_t capacity,
H
Haojun Liao 已提交
660
                                SSDataBlock* pResBlock, const char* idstr) {
H
Haojun Liao 已提交
661
  int32_t      code = 0;
662
  int8_t       level = 0;
H
Haojun Liao 已提交
663
  STsdbReader* pReader = (STsdbReader*)taosMemoryCalloc(1, sizeof(*pReader));
H
Hongze Cheng 已提交
664 665
  if (pReader == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
H
Haojun Liao 已提交
666
    goto _end;
H
Hongze Cheng 已提交
667 668
  }

C
Cary Xu 已提交
669
  if (VND_IS_TSMA(pVnode)) {
H
Haojun Liao 已提交
670
    tsdbDebug("vgId:%d, tsma is selected to query, %s", TD_VID(pVnode), idstr);
C
Cary Xu 已提交
671 672
  }

H
Haojun Liao 已提交
673
  initReaderStatus(&pReader->status);
674

L
Liu Jicong 已提交
675
  pReader->pTsdb = getTsdbByRetentions(pVnode, pCond->twindows.skey, pVnode->config.tsdbCfg.retentions, idstr, &level);
dengyihao's avatar
dengyihao 已提交
676 677
  pReader->suid = pCond->suid;
  pReader->order = pCond->order;
678
  pReader->capacity = capacity;
H
Haojun Liao 已提交
679
  pReader->pResBlock = pResBlock;
680
  pReader->idStr = (idstr != NULL) ? taosStrdup(idstr) : NULL;
dengyihao's avatar
dengyihao 已提交
681
  pReader->verRange = getQueryVerRange(pVnode, pCond, level);
682
  pReader->type = pCond->type;
683
  pReader->window = updateQueryTimeWindow(pReader->pTsdb, &pCond->twindows);
H
Hongze Cheng 已提交
684
  pReader->blockInfoBuf.numPerBucket = 1000;  // 1000 tables per bucket
H
Hongze Cheng 已提交
685

H
Haojun Liao 已提交
686 687 688 689 690 691 692 693
  if (pReader->pResBlock == NULL) {
    pReader->freeBlock = true;
    pReader->pResBlock = createResBlock(pCond, pReader->capacity);
    if (pReader->pResBlock == NULL) {
      code = terrno;
      goto _end;
    }
  }
694

H
Haojun Liao 已提交
695 696 697 698 699
  if (pCond->numOfCols <= 0) {
    tsdbError("vgId:%d, invalid column number %d in query cond, %s", TD_VID(pVnode), pCond->numOfCols, idstr);
    code = TSDB_CODE_INVALID_PARA;
    goto _end;
  }
H
Hongze Cheng 已提交
700

701 702
  // allocate buffer in order to load data blocks from file
  SBlockLoadSuppInfo* pSup = &pReader->suppInfo;
703
  pSup->pColAgg = taosArrayInit(pCond->numOfCols, sizeof(SColumnDataAgg));
H
Haojun Liao 已提交
704
  if (pSup->pColAgg == NULL) {
705 706 707
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _end;
  }
H
Haojun Liao 已提交
708

709 710
  pSup->tsColAgg.colId = PRIMARYKEY_TIMESTAMP_COL_ID;

H
Hongze Cheng 已提交
711
  code = tBlockDataCreate(&pReader->status.fileBlockData);
H
Haojun Liao 已提交
712 713 714 715 716
  if (code != TSDB_CODE_SUCCESS) {
    terrno = code;
    goto _end;
  }

717
  setColumnIdSlotList(pSup, pCond->colList, pCond->pSlotList, pCond->numOfCols);
718

719
  tsdbInitReaderLock(pReader);
720

H
Hongze Cheng 已提交
721 722
  *ppReader = pReader;
  return code;
H
Hongze Cheng 已提交
723

H
Haojun Liao 已提交
724 725
_end:
  tsdbReaderClose(pReader);
H
Hongze Cheng 已提交
726 727 728
  *ppReader = NULL;
  return code;
}
H
Hongze Cheng 已提交
729

H
Haojun Liao 已提交
730
/**
 * Collect the SBlockIdx entries of the current file set that belong to the queried tables.
 *
 * The block index array is obtained through the block-index LRU cache of the tsdb instance. It is
 * then walked in lock step with pReader->status.uidList: entries whose suid differs from the
 * reader's suid are skipped, and whichever side has the smaller uid is advanced.
 * NOTE(review): the walk only works if both sequences are ordered by uid ascending -- confirm at
 * the producers of the two lists.
 *
 * For every match the SBlockIdx is appended to pIndexList and the table's block list is created
 * on first use.
 *
 * @param pReader      reader providing suid, uid list and the table scan-info map
 * @param pFileReader  file reader whose block index is loaded
 * @param pIndexList   output, receives one SBlockIdx per matched table
 * @return TSDB_CODE_SUCCESS, the code returned by tsdbCacheGetBlockIdx, TSDB_CODE_APP_ERROR if a
 *         matched uid is missing from the table map, or TSDB_CODE_OUT_OF_MEMORY
 */
static int32_t doLoadBlockIndex(STsdbReader* pReader, SDataFReader* pFileReader, SArray* pIndexList) {
  int64_t    st = taosGetTimestampUs();
  LRUHandle* handle = NULL;
  int32_t    code = tsdbCacheGetBlockIdx(pFileReader->pTsdb->biCache, pFileReader, &handle);
  if (code != TSDB_CODE_SUCCESS || handle == NULL) {
    goto _end;
  }

  int32_t numOfTables = taosHashGetSize(pReader->status.pTableMap);

  SArray* aBlockIdx = (SArray*)taosLRUCacheValue(pFileReader->pTsdb->biCache, handle);
  size_t  num = taosArrayGetSize(aBlockIdx);
  if (num == 0) {
    // nothing stored in this file; code is still TSDB_CODE_SUCCESS here
    goto _end;
  }

  // todo binary search to the start position
  int64_t et1 = taosGetTimestampUs();

  SBlockIdx*     pBlockIdx = NULL;
  STableUidList* pList = &pReader->status.uidList;

  int32_t i = 0, j = 0;
  while (i < num && j < numOfTables) {
    pBlockIdx = (SBlockIdx*)taosArrayGet(aBlockIdx, i);
    if (pBlockIdx->suid != pReader->suid) {
      i += 1;
      continue;
    }

    if (pBlockIdx->uid < pList->tableUidList[j]) {
      i += 1;  // block index entry of a table that is not queried
      continue;
    }

    if (pBlockIdx->uid > pList->tableUidList[j]) {
      j += 1;  // queried table has no entry in this file
      continue;
    }

    // uid matches: the entry belongs to a queried table
    void* p = taosHashGet(pReader->status.pTableMap, &pBlockIdx->uid, sizeof(uint64_t));
    if (p == NULL) {
      tsdbError("failed to locate the tableBlockScan Info in hashmap, uid:%" PRIu64 ", %s", pBlockIdx->uid,
                pReader->idStr);
      // leave through _end so that the cache handle is released on this path as well
      code = TSDB_CODE_APP_ERROR;
      goto _end;
    }

    STableBlockScanInfo* pScanInfo = *(STableBlockScanInfo**)p;
    if (pScanInfo->pBlockList == NULL) {
      pScanInfo->pBlockList = taosArrayInit(4, sizeof(SBlockIndex));
      if (pScanInfo->pBlockList == NULL) {
        code = TSDB_CODE_OUT_OF_MEMORY;
        goto _end;
      }
    }

    if (taosArrayPush(pIndexList, pBlockIdx) == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto _end;
    }

    i += 1;
    j += 1;
  }

  int64_t et2 = taosGetTimestampUs();
  tsdbDebug("load block index for %d/%d tables completed, elapsed time:%.2f ms, set blockIdx:%.2f ms, size:%.2f Kb %s",
            numOfTables, (int32_t)num, (et1 - st) / 1000.0, (et2 - et1) / 1000.0, num * sizeof(SBlockIdx) / 1024.0,
            pReader->idStr);

  pReader->cost.headFileLoadTime += (et1 - st) / 1000.0;

_end:
  tsdbBICacheRelease(pFileReader->pTsdb->biCache, handle);
  return code;
}
H
Hongze Cheng 已提交
803

804
/**
 * Reset the per-file block bookkeeping of every table in the scan-info map: for each entry the
 * block-info map data is cleared with tMapDataClear() and the block index list is emptied with
 * taosArrayClear().
 */
static void cleanupTableScanInfo(SHashObj* pTableMap) {
  // taosHashIterate() is fed its previous result and yields NULL once the map is exhausted
  for (STableBlockScanInfo** ppInfo = taosHashIterate(pTableMap, NULL); ppInfo != NULL;
       ppInfo = taosHashIterate(pTableMap, ppInfo)) {
    STableBlockScanInfo* pInfo = *ppInfo;

    tMapDataClear(&pInfo->mapData);
    taosArrayClear(pInfo->pBlockList);
  }
}

818
/**
 * Load the data-block descriptors of every table listed in pIndexList and remember, per table,
 * the blocks that have to be visited.
 *
 * For each SBlockIdx the block map of the table is read with tsdbReadDataBlk(). A block is kept
 * when it intersects both
 *   - the query time window, narrowed on the traverse side to start one step past the table's
 *     lastKey, and
 *   - the reader's version range.
 * Kept blocks are appended to the table's pBlockList as SBlockIndex items.
 *
 * @param pReader     reader; its table map, file reader, window, version range and cost are used
 * @param pIndexList  SArray<SBlockIdx> to process
 * @param pBlockNum   numOfBlocks is increased by the number of kept blocks, numOfLastFiles is set
 *                    from the file set's nSttF
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_APP_ERROR if a uid is missing from the table map, the error
 *         of tsdbReadDataBlk, or TSDB_CODE_OUT_OF_MEMORY
 */
static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, SBlockNumber* pBlockNum) {
  int32_t numOfQTable = 0;
  size_t  sizeInDisk = 0;
  size_t  numOfTables = taosArrayGetSize(pIndexList);
  bool    asc = ASCENDING_TRAVERSE(pReader->order);
  int32_t step = asc ? 1 : -1;

  int64_t st = taosGetTimestampUs();
  cleanupTableScanInfo(pReader->status.pTableMap);

  for (size_t i = 0; i < numOfTables; ++i) {
    SBlockIdx* pBlockIdx = taosArrayGet(pIndexList, i);

    void* p = taosHashGet(pReader->status.pTableMap, &pBlockIdx->uid, sizeof(int64_t));
    if (p == NULL) {
      tsdbError("failed to locate the tableBlockScan Info in hashmap, uid:%" PRIu64 ", %s", pBlockIdx->uid,
                pReader->idStr);
      return TSDB_CODE_APP_ERROR;
    }

    STableBlockScanInfo* pScanInfo = *(STableBlockScanInfo**)p;

    tMapDataReset(&pScanInfo->mapData);
    int32_t code = tsdbReadDataBlk(pReader->pFileReader, pBlockIdx, &pScanInfo->mapData);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    taosArrayEnsureCap(pScanInfo->pBlockList, pScanInfo->mapData.nItem);

    sizeInDisk += pScanInfo->mapData.nData;

    // rows up to lastKey were handled already: shrink the window on the side the scan starts from
    STimeWindow w = pReader->window;
    if (asc) {
      w.skey = pScanInfo->lastKey + step;
    } else {
      w.ekey = pScanInfo->lastKey + step;
    }

    if (isEmptyQueryTimeWindow(&w)) {
      continue;
    }

    SDataBlk block = {0};
    for (int32_t j = 0; j < pScanInfo->mapData.nItem; ++j) {
      tGetDataBlk(pScanInfo->mapData.pData + pScanInfo->mapData.aOffset[j], &block);

      // 1. time range check
      if (block.minKey.ts > w.ekey || block.maxKey.ts < w.skey) {
        continue;
      }

      // 2. version range check
      if (block.minVer > pReader->verRange.maxVer || block.maxVer < pReader->verRange.minVer) {
        continue;
      }

      SBlockIndex bIndex = {.ordinalIndex = j, .inFileOffset = block.aSubBlock->offset};
      bIndex.window = (STimeWindow){.skey = block.minKey.ts, .ekey = block.maxKey.ts};

      void* p1 = taosArrayPush(pScanInfo->pBlockList, &bIndex);
      if (p1 == NULL) {
        tMapDataClear(&pScanInfo->mapData);
        return TSDB_CODE_OUT_OF_MEMORY;
      }

      pBlockNum->numOfBlocks += 1;
    }

    if (pScanInfo->pBlockList != NULL && taosArrayGetSize(pScanInfo->pBlockList) > 0) {
      numOfQTable += 1;
    }
  }

  pBlockNum->numOfLastFiles = pReader->pFileReader->pSet->nSttF;
  int32_t total = pBlockNum->numOfLastFiles + pBlockNum->numOfBlocks;

  double el = (taosGetTimestampUs() - st) / 1000.0;
  // numOfTables is a size_t; cast it so that the argument matches the %ld conversion
  tsdbDebug(
      "load block of %ld tables completed, blocks:%d in %d tables, last-files:%d, block-info-size:%.2f Kb, elapsed "
      "time:%.2f ms %s",
      (long)numOfTables, pBlockNum->numOfBlocks, numOfQTable, pBlockNum->numOfLastFiles, sizeInDisk / 1000.0, el,
      pReader->idStr);

  pReader->cost.numOfBlocks += total;
  pReader->cost.headFileLoadTime += el;

  return TSDB_CODE_SUCCESS;
}
H
Hongze Cheng 已提交
899

900
/**
 * Mark the current file block as fully dumped and store in lastKey the key one step past maxKey
 * in the traverse direction (maxKey + 1 for ascending order, maxKey - 1 otherwise).
 */
static void setBlockAllDumped(SFileBlockDumpInfo* pDumpInfo, int64_t maxKey, int32_t order) {
  pDumpInfo->allDumped = true;
  pDumpInfo->lastKey = ASCENDING_TRAVERSE(order) ? (maxKey + 1) : (maxKey - 1);
}

906 907
/**
 * Append one column value to row rowIndex of pColInfoData.
 *
 * Fixed-length values are appended directly, with the null flag derived from the value itself.
 * Variable-length values are first packed (length header followed by payload) into the
 * per-column scratch buffer pSup->buildBuf[colIndex] and appended from there; a variable-length
 * column without a value is appended as NULL.
 */
static void doCopyColVal(SColumnInfoData* pColInfoData, int32_t rowIndex, int32_t colIndex, SColVal* pColVal,
                         SBlockLoadSuppInfo* pSup) {
  if (!IS_VAR_DATA_TYPE(pColVal->type)) {
    colDataAppend(pColInfoData, rowIndex, (const char*)&pColVal->value, !COL_VAL_IS_VALUE(pColVal));
    return;
  }

  if (!COL_VAL_IS_VALUE(pColVal)) {
    colDataAppendNULL(pColInfoData, rowIndex);
    return;
  }

  varDataSetLen(pSup->buildBuf[colIndex], pColVal->value.nData);
  ASSERT(pColVal->value.nData <= pColInfoData->info.bytes);

  // pData may be null when nData is 0, so only copy a non-empty payload
  if (pColVal->value.nData > 0) {
    memcpy(varDataVal(pSup->buildBuf[colIndex]), pColVal->value.pData, pColVal->value.nData);
  }

  colDataAppend(pColInfoData, rowIndex, pSup->buildBuf[colIndex], false);
}

925
/**
 * Return the SFileDataBlockInfo at the iterator's current index, or NULL when the iterator's
 * block list holds no entries.
 */
static SFileDataBlockInfo* getCurrentBlockInfo(SDataBlockIter* pBlockIter) {
  size_t numOfEntries = taosArrayGetSize(pBlockIter->blockList);
  if (numOfEntries > 0) {
    return (SFileDataBlockInfo*)taosArrayGet(pBlockIter->blockList, pBlockIter->index);
  }

  // an empty list must agree with the recorded block count
  ASSERT(pBlockIter->numOfBlocks == numOfEntries);
  return NULL;
}

H
Hongze Cheng 已提交
936
/**
 * Return the address of the SDataBlk stored inside the iterator (pBlockIter->block).
 */
static SDataBlk* getCurrentBlock(SDataBlockIter* pBlockIter) {
  SDataBlk* pCurrent = &pBlockIter->block;
  return pCurrent;
}
937

C
Cary Xu 已提交
938 939 940 941 942 943
/**
 * Binary search in a key list, starting from position pos.
 * NOTE(review): the search assumes keyList is sorted ascending -- not checked here.
 *
 * TSDB_ORDER_ASC : searches [pos, num - 1]; returns -1 if key < keyList[pos], otherwise (for
 *                  strictly increasing keys) the index of the last entry that is <= key.
 * otherwise      : searches [0, pos]; returns -1 if key > keyList[pos], otherwise (for strictly
 *                  increasing keys) the index of the first entry that is >= key.
 *
 * @param keyList  key array
 * @param num      number of entries in keyList, must be > 0
 * @param pos      position the search is anchored at, must satisfy 0 <= pos < num
 * @param key      key to look for
 * @param order    TSDB_ORDER_ASC selects the forward search, any other value the backward one
 */
static int doBinarySearchKey(TSKEY* keyList, int num, int pos, TSKEY key, int order) {
  assert(pos >= 0 && pos < num);
  assert(num > 0);

  int anchor = pos;  // side the search starts from; its key never passes "key"
  int bound;         // opposite end of the search range

  if (order == TSDB_ORDER_ASC) {
    bound = num - 1;
    if (key < keyList[pos]) {
      return -1;
    }

    for (;;) {
      if (key >= keyList[bound]) {
        return bound;
      }
      if (key <= keyList[anchor]) {
        return anchor;
      }
      if (bound - anchor <= 1) {
        return anchor;
      }

      // probe the upper middle and shrink the range towards the key
      int probe = anchor + (bound - anchor + 1) / 2;
      if (keyList[probe] > key) {
        bound = probe;
      } else if (keyList[probe] < key) {
        anchor = probe;
      } else {
        return probe;
      }
    }
  }

  // descending: walk from pos towards the beginning of the list
  bound = 0;
  if (key > keyList[pos]) {
    return -1;
  }

  for (;;) {
    if (key <= keyList[bound]) {
      return bound;
    }
    if (key >= keyList[anchor]) {
      return anchor;
    }
    if (anchor - bound <= 1) {
      return anchor;
    }

    // probe the lower middle and shrink the range towards the key
    int probe = anchor - (anchor - bound + 1) / 2;
    if (keyList[probe] < key) {
      bound = probe;
    } else if (keyList[probe] > key) {
      anchor = probe;
    } else {
      return probe;
    }
  }
}

H
Haojun Liao 已提交
988
/**
 * Find the last row of the current block that still lies inside the query time window when
 * scanning in the reader's order, starting the lookup at row pos.
 *
 * If the window covers the block end on the traverse side the answer is the block's last row
 * (ascending) or row 0 (descending); otherwise the row is located with doBinarySearchKey(), which
 * yields -1 when no row qualifies.
 */
static int32_t getEndPosInDataBlock(STsdbReader* pReader, SBlockData* pBlockData, SDataBlk* pBlock, int32_t pos) {
  if (ASCENDING_TRAVERSE(pReader->order)) {
    if (pReader->window.ekey >= pBlock->maxKey.ts) {
      return pBlock->nRow - 1;
    }

    return doBinarySearchKey(pBlockData->aTSKEY, pBlock->nRow, pos, pReader->window.ekey, pReader->order);
  }

  if (pReader->window.skey <= pBlock->minKey.ts) {
    return 0;
  }

  return doBinarySearchKey(pBlockData->aTSKEY, pBlock->nRow, pos, pReader->window.skey, pReader->order);
}

H
Haojun Liao 已提交
1005
/**
 * Copy dumpedRows primary timestamps of the file block into pColData.
 *
 * Ascending: rows [rowIndex, rowIndex + dumpedRows) are copied as they are.
 * Descending: rows (rowIndex - dumpedRows, rowIndex] are copied and then reversed in place, so
 * the output starts with the timestamp at rowIndex.
 */
static void copyPrimaryTsCol(const SBlockData* pBlockData, SFileBlockDumpInfo* pDumpInfo, SColumnInfoData* pColData,
                             int32_t dumpedRows, bool asc) {
  int32_t firstRow = asc ? pDumpInfo->rowIndex : (pDumpInfo->rowIndex - dumpedRows + 1);
  memcpy(pColData->pData, &pBlockData->aTSKEY[firstRow], dumpedRows * sizeof(int64_t));

  if (asc) {
    return;
  }

  // todo: opt perf by extract the loop
  int64_t* pTs = (int64_t*)pColData->pData;
  for (int32_t head = 0, tail = dumpedRows - 1; head < tail; ++head, --tail) {
    int64_t tmp = pTs[head];
    pTs[head] = pTs[tail];
    pTs[tail] = tmp;
  }
}

H
Haojun Liao 已提交
1025 1026
/**
 * Batch copy of a fixed-width column from the file block into pColData.
 *
 * 1. dumpedRows values are copied with a single memcpy (rows starting at rowIndex for ascending
 *    scans, rows ending at rowIndex for descending scans).
 * 2. For descending scans the copied values are reversed in place, using the element width that
 *    belongs to the output column type.
 * 3. If the source column is not flagged HAS_VALUE, the per-row bit value is inspected and rows
 *    whose bit value is 0 or 1 are marked null in the output bitmap.
 */
static void copyNumericCols(const SColData* pData, SFileBlockDumpInfo* pDumpInfo, SColumnInfoData* pColData,
                            int32_t dumpedRows, bool asc) {
  int32_t  firstRow = asc ? pDumpInfo->rowIndex : (pDumpInfo->rowIndex - dumpedRows + 1);
  uint8_t* pSrc = pData->pData + tDataTypes[pData->type].bytes * firstRow;
  int32_t  step = asc ? 1 : -1;

  // 1. copy data in a batch model
  memcpy(pColData->pData, pSrc, dumpedRows * tDataTypes[pData->type].bytes);

  // 2. reverse the array list in case of descending order scan data block
  if (!asc) {
    int32_t head = 0;
    int32_t tail = dumpedRows - 1;

    switch (pColData->info.type) {
      case TSDB_DATA_TYPE_TIMESTAMP:
      case TSDB_DATA_TYPE_DOUBLE:
      case TSDB_DATA_TYPE_BIGINT:
      case TSDB_DATA_TYPE_UBIGINT: {
        int64_t* pVals = (int64_t*)pColData->pData;
        for (; head < tail; ++head, --tail) {
          int64_t tmp = pVals[head];
          pVals[head] = pVals[tail];
          pVals[tail] = tmp;
        }
        break;
      }

      case TSDB_DATA_TYPE_BOOL:
      case TSDB_DATA_TYPE_TINYINT:
      case TSDB_DATA_TYPE_UTINYINT: {
        int8_t* pVals = (int8_t*)pColData->pData;
        for (; head < tail; ++head, --tail) {
          int8_t tmp = pVals[head];
          pVals[head] = pVals[tail];
          pVals[tail] = tmp;
        }
        break;
      }

      case TSDB_DATA_TYPE_SMALLINT:
      case TSDB_DATA_TYPE_USMALLINT: {
        int16_t* pVals = (int16_t*)pColData->pData;
        for (; head < tail; ++head, --tail) {
          int16_t tmp = pVals[head];
          pVals[head] = pVals[tail];
          pVals[tail] = tmp;
        }
        break;
      }

      case TSDB_DATA_TYPE_FLOAT:
      case TSDB_DATA_TYPE_INT:
      case TSDB_DATA_TYPE_UINT: {
        int32_t* pVals = (int32_t*)pColData->pData;
        for (; head < tail; ++head, --tail) {
          int32_t tmp = pVals[head];
          pVals[head] = pVals[tail];
          pVals[tail] = tmp;
        }
        break;
      }

      default:
        // other types are left in copy order
        break;
    }
  }

  // 3. if the null value exists, check items one-by-one
  if (pData->flag != HAS_VALUE) {
    int32_t srcRow = pDumpInfo->rowIndex;
    for (int32_t dstRow = 0; dstRow < dumpedRows; ++dstRow, srcRow += step) {
      uint8_t v = tColDataGetBitValue(pData, srcRow);
      if (v == 0 || v == 1) {
        colDataSetNull_f(pColData->nullbitmap, dstRow);
        pColData->hasNull = true;
      }
    }
  }
}

1115
/**
 * Copy the next batch of rows of the loaded file block (pReader->status.fileBlockData) into the
 * result block pReader->pResBlock.
 *
 * Steps:
 *  1. If the dump position is still at the first row in traverse order and the query window does
 *     not cover that side of the block, locate the first qualifying row by binary search.
 *  2. Determine the last qualifying row, limit the batch to pReader->capacity rows.
 *  3. Copy the primary timestamp column (if it is the first requested column), then merge the
 *     requested column ids with the column ids of the block; requested columns that are absent
 *     from the block, or hold no values at all, are filled with NULL.
 *  4. Advance the dump position and flag the block as fully dumped when no row is left or the
 *     next row is outside of the query window.
 *
 * @return TSDB_CODE_SUCCESS (0 when the block data is empty), or TSDB_CODE_INVALID_PARA if the
 *         start position cannot be located
 */
static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader) {
  SReaderStatus*      pStatus = &pReader->status;
  SDataBlockIter*     pBlockIter = &pStatus->blockIter;
  SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo;
  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;

  SBlockData*         pBlockData = &pStatus->fileBlockData;
  SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(pBlockIter);
  SDataBlk*           pBlock = getCurrentBlock(pBlockIter);
  SSDataBlock*        pResBlock = pReader->pResBlock;
  int32_t             numOfOutputCols = pSupInfo->numOfCols;

  SColVal cv = {0};
  int64_t st = taosGetTimestampUs();
  bool    asc = ASCENDING_TRAVERSE(pReader->order);
  int32_t step = asc ? 1 : -1;

  // no data exists, return directly.
  if (pBlockData->nRow == 0 || pBlockData->aTSKEY == NULL) {
    tsdbWarn("%p no need to copy since no data in blockData, table uid:%" PRIu64 " has been dropped, %s", pReader,
             pBlockInfo->uid, pReader->idStr);
    pResBlock->info.rows = 0;
    return 0;
  }

  // 1. locate the start row, only needed while the dump position is at the block's first row
  bool atFirstRow = asc ? (pDumpInfo->rowIndex == 0) : (pDumpInfo->rowIndex == pBlock->nRow - 1);
  if (atFirstRow) {
    bool windowCoversFirstRow =
        asc ? (pReader->window.skey <= pBlock->minKey.ts) : (pReader->window.ekey >= pBlock->maxKey.ts);

    if (!windowCoversFirstRow) {
      // search from the far end of the block, i.e. in the order opposite to the scan
      int32_t searchPos = asc ? pBlock->nRow - 1 : 0;
      int32_t searchOrder = asc ? TSDB_ORDER_DESC : TSDB_ORDER_ASC;
      int64_t searchKey = asc ? pReader->window.skey : pReader->window.ekey;
      pDumpInfo->rowIndex = doBinarySearchKey(pBlockData->aTSKEY, pBlock->nRow, searchPos, searchKey, searchOrder);

      if (pDumpInfo->rowIndex < 0) {
        tsdbError(
            "%p failed to locate the start position in current block, global index:%d, table index:%d, brange:%" PRId64
            "-%" PRId64 ", minVer:%" PRId64 ", maxVer:%" PRId64 " %s",
            pReader, pBlockIter->index, pBlockInfo->tbBlockIdx, pBlock->minKey.ts, pBlock->maxKey.ts, pBlock->minVer,
            pBlock->maxVer, pReader->idStr);
        return TSDB_CODE_INVALID_PARA;
      }
    }
  }

  // 2. time window check
  int32_t endIndex = getEndPosInDataBlock(pReader, pBlockData, pBlock, pDumpInfo->rowIndex);
  if (endIndex == -1) {
    setBlockAllDumped(pDumpInfo, pReader->window.ekey, pReader->order);
    return TSDB_CODE_SUCCESS;
  }

  endIndex += step;
  int32_t dumpedRows = asc ? (endIndex - pDumpInfo->rowIndex) : (pDumpInfo->rowIndex - endIndex);
  if (dumpedRows > pReader->capacity) {  // output buffer check
    dumpedRows = pReader->capacity;
  }

  // 3. copy the columns; outCol walks the requested columns, blkCol the columns of the file block
  int32_t outCol = 0;

  SColumnInfoData* pColData = taosArrayGet(pResBlock->pDataBlock, pSupInfo->slotId[outCol]);
  if (pSupInfo->colId[outCol] == PRIMARYKEY_TIMESTAMP_COL_ID) {
    copyPrimaryTsCol(pBlockData, pDumpInfo, pColData, dumpedRows, asc);
    outCol += 1;
  }

  int32_t blkCol = 0;
  int32_t numOfBlkCols = pBlockData->nColData;
  while (outCol < numOfOutputCols && blkCol < numOfBlkCols) {
    SColData* pData = tBlockDataGetColDataByIdx(pBlockData, blkCol);

    if (pData->cid < pSupInfo->colId[outCol]) {
      blkCol += 1;
      continue;
    }

    pColData = taosArrayGet(pResBlock->pDataBlock, pSupInfo->slotId[outCol]);

    if (pData->cid > pSupInfo->colId[outCol]) {
      // the specified column does not exist in file block, fill with null data
      colDataAppendNNULL(pColData, 0, dumpedRows);
      outCol += 1;
      continue;
    }

    // column ids match
    if (pData->flag == HAS_NONE || pData->flag == HAS_NULL || pData->flag == (HAS_NULL | HAS_NONE)) {
      colDataAppendNNULL(pColData, 0, dumpedRows);
    } else if (IS_MATHABLE_TYPE(pColData->info.type)) {
      copyNumericCols(pData, pDumpInfo, pColData, dumpedRows, asc);
    } else {  // varchar/nchar type
      int32_t srcRow = pDumpInfo->rowIndex;
      for (int32_t dstRow = 0; dstRow < dumpedRows; ++dstRow, srcRow += step) {
        tColDataGetValue(pData, srcRow, &cv);
        doCopyColVal(pColData, dstRow, outCol, &cv, pSupInfo);
      }
    }

    blkCol += 1;
    outCol += 1;
  }

  // fill the mis-matched columns with null value
  for (; outCol < numOfOutputCols; ++outCol) {
    pColData = taosArrayGet(pResBlock->pDataBlock, pSupInfo->slotId[outCol]);
    colDataAppendNNULL(pColData, 0, dumpedRows);
  }

  pResBlock->info.dataLoad = 1;
  pResBlock->info.rows = dumpedRows;
  pDumpInfo->rowIndex += step * dumpedRows;

  // 4. check if current block are all handled
  bool hasRemainRows = (pDumpInfo->rowIndex >= 0 && pDumpInfo->rowIndex < pBlock->nRow);
  if (hasRemainRows) {
    int64_t ts = pBlockData->aTSKEY[pDumpInfo->rowIndex];
    if (outOfTimeWindow(ts, &pReader->window)) {  // the remain data has out of query time window, ignore current block
      setBlockAllDumped(pDumpInfo, ts, pReader->order);
    }
  } else {
    int64_t ts = asc ? pBlock->maxKey.ts : pBlock->minKey.ts;
    setBlockAllDumped(pDumpInfo, ts, pReader->order);
  }

  double elapsedTime = (taosGetTimestampUs() - st) / 1000.0;
  pReader->cost.blockLoadTime += elapsedTime;

  int32_t unDumpedRows = asc ? pBlock->nRow - pDumpInfo->rowIndex : pDumpInfo->rowIndex + 1;
  tsdbDebug("%p copy file block to sdatablock, global index:%d, table index:%d, brange:%" PRId64 "-%" PRId64
            ", rows:%d, remain:%d, minVer:%" PRId64 ", maxVer:%" PRId64 ", uid:%" PRIu64 " elapsed time:%.2f ms, %s",
            pReader, pBlockIter->index, pBlockInfo->tbBlockIdx, pBlock->minKey.ts, pBlock->maxKey.ts, dumpedRows,
            unDumpedRows, pBlock->minVer, pBlock->maxVer, pBlockInfo->uid, elapsedTime, pReader->idStr);

  return TSDB_CODE_SUCCESS;
}

1251 1252
/**
 * Read the data of the iterator's current block from the data file into pBlockData.
 *
 * pBlockData is reset and re-initialised for table uid with the latest table schema and the
 * requested column ids (the first entry of the column id list is skipped). If no schema is found
 * the function returns 0 without loading anything.
 *
 * On success the block load time is accounted and the dump state is marked as "not all dumped".
 *
 * @return TSDB_CODE_SUCCESS, or the error returned by tBlockDataInit / tsdbReadDataBlock
 */
static int32_t doLoadFileBlockData(STsdbReader* pReader, SDataBlockIter* pBlockIter, SBlockData* pBlockData,
                                   uint64_t uid) {
  int32_t code = 0;
  int64_t st = taosGetTimestampUs();

  tBlockDataReset(pBlockData);

  STSchema* pSchema = getLatestTableSchema(pReader, uid);
  if (pSchema == NULL) {
    tsdbDebug("%p table uid:%" PRIu64 " has been dropped, no data existed, %s", pReader, uid, pReader->idStr);
    return code;
  }

  SBlockLoadSuppInfo* pSup = &pReader->suppInfo;
  TABLEID             tid = {.suid = pReader->suid, .uid = uid};

  code = tBlockDataInit(pBlockData, &tid, pSchema, &pSup->colId[1], pSup->numOfCols - 1);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(pBlockIter);
  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
  SDataBlk*           pBlock = getCurrentBlock(pBlockIter);

  code = tsdbReadDataBlock(pReader->pFileReader, pBlock, pBlockData);
  if (code != TSDB_CODE_SUCCESS) {
    tsdbError("%p error occurs in loading file block, global index:%d, table index:%d, brange:%" PRId64 "-%" PRId64
              ", rows:%d, code:%s %s",
              pReader, pBlockIter->index, pBlockInfo->tbBlockIdx, pBlock->minKey.ts, pBlock->maxKey.ts, pBlock->nRow,
              tstrerror(code), pReader->idStr);
    return code;
  }

  double elapsedTime = (taosGetTimestampUs() - st) / 1000.0;
  tsdbDebug("%p load file block into buffer, global index:%d, index in table block list:%d, brange:%" PRId64 "-%" PRId64
            ", rows:%d, minVer:%" PRId64 ", maxVer:%" PRId64 ", elapsed time:%.2f ms, %s",
            pReader, pBlockIter->index, pBlockInfo->tbBlockIdx, pBlock->minKey.ts, pBlock->maxKey.ts, pBlock->nRow,
            pBlock->minVer, pBlock->maxVer, elapsedTime, pReader->idStr);

  pReader->cost.blockLoadTime += elapsedTime;
  pDumpInfo->allDumped = false;

  return TSDB_CODE_SUCCESS;
}
H
Hongze Cheng 已提交
1295

H
Haojun Liao 已提交
1296 1297 1298
/**
 * Release everything owned by a SBlockOrderSupporter: the two per-table counter arrays, the
 * per-table SBlockOrderWrapper arrays (the first pSup->numOfTables slots) and finally the array
 * of pointers to them.
 */
static void cleanupBlockOrderSupporter(SBlockOrderSupporter* pSup) {
  taosMemoryFreeClear(pSup->numOfBlocksPerTable);
  taosMemoryFreeClear(pSup->indexPerTable);

  int32_t tableIdx = 0;
  while (tableIdx < pSup->numOfTables) {
    SBlockOrderWrapper* pWrappers = pSup->pDataBlockInfo[tableIdx];
    taosMemoryFreeClear(pWrappers);
    tableIdx += 1;
  }

  taosMemoryFreeClear(pSup->pDataBlockInfo);
}
H
Hongze Cheng 已提交
1307

H
Haojun Liao 已提交
1308 1309
/**
 * Allocate the zero-initialised per-table arrays of a SBlockOrderSupporter, sized for numOfTables
 * tables.
 *
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY (after releasing whatever was allocated)
 *         if any of the three allocations failed
 */
static int32_t initBlockOrderSupporter(SBlockOrderSupporter* pSup, int32_t numOfTables) {
  pSup->numOfBlocksPerTable = taosMemoryCalloc(1, sizeof(int32_t) * numOfTables);
  pSup->indexPerTable = taosMemoryCalloc(1, sizeof(int32_t) * numOfTables);
  pSup->pDataBlockInfo = taosMemoryCalloc(1, POINTER_BYTES * numOfTables);

  bool allAllocated =
      (pSup->numOfBlocksPerTable != NULL) && (pSup->indexPerTable != NULL) && (pSup->pDataBlockInfo != NULL);
  if (allAllocated) {
    return TSDB_CODE_SUCCESS;
  }

  cleanupBlockOrderSupporter(pSup);
  return TSDB_CODE_OUT_OF_MEMORY;
}
H
Hongze Cheng 已提交
1320

H
Haojun Liao 已提交
1321
static int32_t fileDataBlockOrderCompar(const void* pLeft, const void* pRight, void* param) {
1322
  int32_t leftIndex = *(int32_t*)pLeft;
H
Haojun Liao 已提交
1323
  int32_t rightIndex = *(int32_t*)pRight;
H
Hongze Cheng 已提交
1324

H
Haojun Liao 已提交
1325
  SBlockOrderSupporter* pSupporter = (SBlockOrderSupporter*)param;
H
Hongze Cheng 已提交
1326

H
Haojun Liao 已提交
1327 1328
  int32_t leftTableBlockIndex = pSupporter->indexPerTable[leftIndex];
  int32_t rightTableBlockIndex = pSupporter->indexPerTable[rightIndex];
H
Hongze Cheng 已提交
1329

H
Haojun Liao 已提交
1330 1331 1332 1333 1334 1335 1336
  if (leftTableBlockIndex > pSupporter->numOfBlocksPerTable[leftIndex]) {
    /* left block is empty */
    return 1;
  } else if (rightTableBlockIndex > pSupporter->numOfBlocksPerTable[rightIndex]) {
    /* right block is empty */
    return -1;
  }
H
Hongze Cheng 已提交
1337

1338
  SBlockOrderWrapper* pLeftBlock = &pSupporter->pDataBlockInfo[leftIndex][leftTableBlockIndex];
H
Haojun Liao 已提交
1339
  SBlockOrderWrapper* pRightBlock = &pSupporter->pDataBlockInfo[rightIndex][rightTableBlockIndex];
H
Hongze Cheng 已提交
1340

1341 1342 1343
  return pLeftBlock->offset > pRightBlock->offset ? 1 : -1;
}

H
Haojun Liao 已提交
1344
/**
 * Decode the SDataBlk the iterator currently points at into pBlockIter->block.
 *
 * The owning table is looked up by uid in pBlockIter->pTableMap; the block's position inside the
 * table's block map is taken from the table's pBlockList entry at tbBlockIdx.
 *
 * @return TSDB_CODE_SUCCESS (also when the iterator has no current block), or
 *         TSDB_CODE_INVALID_PARA if the uid is not present in the table map
 */
static int32_t doSetCurrentBlock(SDataBlockIter* pBlockIter, const char* idStr) {
  SFileDataBlockInfo* pCurrent = getCurrentBlockInfo(pBlockIter);
  if (pCurrent == NULL) {
    return TSDB_CODE_SUCCESS;
  }

  STableBlockScanInfo** ppScanInfo = taosHashGet(pBlockIter->pTableMap, &pCurrent->uid, sizeof(pCurrent->uid));
  if (ppScanInfo == NULL) {
    tsdbError("failed to locate the uid:%" PRIu64 " in query table uid list, %s", pCurrent->uid, idStr);
    return TSDB_CODE_INVALID_PARA;
  }

  STableBlockScanInfo* pScanInfo = *ppScanInfo;
  SBlockIndex*         pIndex = taosArrayGet(pScanInfo->pBlockList, pCurrent->tbBlockIdx);
  tMapDataGetItemByIdx(&pScanInfo->mapData, pIndex->ordinalIndex, &pBlockIter->block, tGetDataBlk);

  return TSDB_CODE_SUCCESS;
}
H
Hongze Cheng 已提交
1363

1364
/**
 * Build the block access list of the iterator for the current file set.
 *
 * The block lists of all tables in pReader->status.pTableMap are gathered into a
 * SBlockOrderSupporter. With a single qualifying table the blocks are taken in list order;
 * otherwise the per-table lists are merged by file offset with a multiway merge tree. The
 * iterator is then positioned at the first block (ascending) or the last block (descending) and
 * the corresponding SDataBlk is decoded.
 *
 * @param pReader      reader providing order, table map and id string
 * @param pBlockIter   iterator to (re)initialise
 * @param numOfBlocks  total number of blocks recorded in the tables' block lists
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_OUT_OF_MEMORY, TSDB_CODE_INVALID_PARA, or the error of
 *         doSetCurrentBlock
 */
static int32_t initBlockIterator(STsdbReader* pReader, SDataBlockIter* pBlockIter, int32_t numOfBlocks) {
  bool asc = ASCENDING_TRAVERSE(pReader->order);

  SBlockOrderSupporter sup = {0};
  pBlockIter->numOfBlocks = numOfBlocks;
  taosArrayClear(pBlockIter->blockList);
  pBlockIter->pTableMap = pReader->status.pTableMap;

  // access data blocks according to the offset of each block in asc/desc order.
  int32_t numOfTables = (int32_t)taosHashGetSize(pReader->status.pTableMap);

  int64_t st = taosGetTimestampUs();
  int32_t code = initBlockOrderSupporter(&sup, numOfTables);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  int32_t cnt = 0;
  void*   ptr = NULL;
  while (1) {
    ptr = taosHashIterate(pReader->status.pTableMap, ptr);
    if (ptr == NULL) {
      break;
    }

    STableBlockScanInfo* pTableScanInfo = *(STableBlockScanInfo**)ptr;
    if (pTableScanInfo->pBlockList == NULL || taosArrayGetSize(pTableScanInfo->pBlockList) == 0) {
      continue;
    }

    size_t num = taosArrayGetSize(pTableScanInfo->pBlockList);
    sup.numOfBlocksPerTable[sup.numOfTables] = num;

    char* buf = taosMemoryMalloc(sizeof(SBlockOrderWrapper) * num);
    if (buf == NULL) {
      cleanupBlockOrderSupporter(&sup);
      return TSDB_CODE_OUT_OF_MEMORY;
    }

    sup.pDataBlockInfo[sup.numOfTables] = (SBlockOrderWrapper*)buf;

    for (int32_t k = 0; k < num; ++k) {
      SBlockIndex* pIndex = taosArrayGet(pTableScanInfo->pBlockList, k);
      sup.pDataBlockInfo[sup.numOfTables][k] =
          (SBlockOrderWrapper){.uid = pTableScanInfo->uid, .offset = pIndex->inFileOffset};
      cnt++;
    }

    sup.numOfTables += 1;
  }

  if (numOfBlocks != cnt && sup.numOfTables != numOfTables) {
    cleanupBlockOrderSupporter(&sup);
    return TSDB_CODE_INVALID_PARA;
  }

  // since there is only one table qualified, blocks are not sorted
  if (sup.numOfTables == 1) {
    for (int32_t i = 0; i < numOfBlocks; ++i) {
      SFileDataBlockInfo blockInfo = {.uid = sup.pDataBlockInfo[0][i].uid, .tbBlockIdx = i};
      if (taosArrayPush(pBlockIter->blockList, &blockInfo) == NULL) {
        cleanupBlockOrderSupporter(&sup);
        return TSDB_CODE_OUT_OF_MEMORY;
      }
    }

    int64_t et = taosGetTimestampUs();
    tsdbDebug("%p create blocks info struct completed for one table, %d blocks not sorted, elapsed time:%.2f ms %s",
              pReader, numOfBlocks, (et - st) / 1000.0, pReader->idStr);

    pBlockIter->index = asc ? 0 : (numOfBlocks - 1);
    cleanupBlockOrderSupporter(&sup);
    return doSetCurrentBlock(pBlockIter, pReader->idStr);
  }

  tsdbDebug("%p create data blocks info struct completed, %d blocks in %d tables %s", pReader, cnt, sup.numOfTables,
            pReader->idStr);

  SMultiwayMergeTreeInfo* pTree = NULL;

  // keep the full-width status code; a narrower type could truncate a failure code to 0
  int32_t ret = tMergeTreeCreate(&pTree, sup.numOfTables, &sup, fileDataBlockOrderCompar);
  if (ret != TSDB_CODE_SUCCESS) {
    cleanupBlockOrderSupporter(&sup);
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  int32_t numOfTotal = 0;
  while (numOfTotal < cnt) {
    int32_t pos = tMergeTreeGetChosenIndex(pTree);
    int32_t index = sup.indexPerTable[pos]++;

    SFileDataBlockInfo blockInfo = {.uid = sup.pDataBlockInfo[pos][index].uid, .tbBlockIdx = index};
    if (taosArrayPush(pBlockIter->blockList, &blockInfo) == NULL) {
      cleanupBlockOrderSupporter(&sup);
      taosMemoryFree(pTree);
      return TSDB_CODE_OUT_OF_MEMORY;
    }

    // set data block index overflow, in order to disable the offset comparator
    if (sup.indexPerTable[pos] >= sup.numOfBlocksPerTable[pos]) {
      sup.indexPerTable[pos] = sup.numOfBlocksPerTable[pos] + 1;
    }

    numOfTotal += 1;
    tMergeTreeAdjust(pTree, tMergeTreeGetAdjustIndex(pTree));
  }

  int64_t et = taosGetTimestampUs();
  tsdbDebug("%p %d data blocks access order completed, elapsed time:%.2f ms %s", pReader, numOfBlocks,
            (et - st) / 1000.0, pReader->idStr);
  cleanupBlockOrderSupporter(&sup);
  taosMemoryFree(pTree);

  pBlockIter->index = asc ? 0 : (numOfBlocks - 1);
  return doSetCurrentBlock(pBlockIter, pReader->idStr);
}
H
Hongze Cheng 已提交
1476

H
Haojun Liao 已提交
1477
// Advance the block iterator by one position in its traversal direction and make the new
// position the current block (via doSetCurrentBlock).
// Returns false, leaving the iterator untouched, when it already sits on the last block of an
// ascending scan or on the first block of a descending scan; returns true otherwise.
static bool blockIteratorNext(SDataBlockIter* pBlockIter, const char* idStr) {
  if (ASCENDING_TRAVERSE(pBlockIter->order)) {
    // ascending: nothing beyond the final block
    if (pBlockIter->index >= pBlockIter->numOfBlocks - 1) {
      return false;
    }
    pBlockIter->index += 1;
  } else {
    // descending: nothing before the first block
    if (pBlockIter->index <= 0) {
      return false;
    }
    pBlockIter->index -= 1;
  }

  doSetCurrentBlock(pBlockIter, idStr);
  return true;
}

1491 1492 1493
/**
 * This is an two rectangles overlap cases.
 */
H
Hongze Cheng 已提交
1494
static int32_t dataBlockPartiallyRequired(STimeWindow* pWindow, SVersionRange* pVerRange, SDataBlk* pBlock) {
1495 1496
  return (pWindow->ekey < pBlock->maxKey.ts && pWindow->ekey >= pBlock->minKey.ts) ||
         (pWindow->skey > pBlock->minKey.ts && pWindow->skey <= pBlock->maxKey.ts) ||
H
Hongze Cheng 已提交
1497 1498
         (pVerRange->minVer > pBlock->minVer && pVerRange->minVer <= pBlock->maxVer) ||
         (pVerRange->maxVer < pBlock->maxVer && pVerRange->maxVer >= pBlock->minVer);
H
Haojun Liao 已提交
1499
}
H
Hongze Cheng 已提交
1500

1501
// Locate the block that follows pBlockInfo, in scan order, inside the block list of the same table.
//
// pBlockInfo           block whose neighbor is requested; its tbBlockIdx is the position in pBlockList
// pTableBlockScanInfo  table scan info holding pBlockList (SArray<SBlockIndex>)
// nextIndex            [out] position of the neighbor inside pBlockList
// order                scan order; ascending looks at idx + 1, descending at idx - 1
// pBlockIndex          [out] copy of the neighbor's SBlockIndex
//
// Returns true and fills both outputs when a neighbor exists; returns false, leaving the outputs
// untouched, when the neighbor position falls outside the list.
static bool getNeighborBlockOfSameTable(SFileDataBlockInfo* pBlockInfo, STableBlockScanInfo* pTableBlockScanInfo,
                                        int32_t* nextIndex, int32_t order, SBlockIndex* pBlockIndex) {
  // The size is converted to a signed value before any arithmetic: "size - 1" on the unsigned size
  // wraps around for an empty list and lets an out-of-range index slip through the bound check.
  const int32_t numOfBlocks = (int32_t)taosArrayGetSize(pTableBlockScanInfo->pBlockList);
  const int32_t step = ASCENDING_TRAVERSE(order) ? 1 : -1;
  const int32_t neighbor = pBlockInfo->tbBlockIdx + step;

  if (neighbor < 0 || neighbor >= numOfBlocks) {
    return false;
  }

  *nextIndex = neighbor;
  *pBlockIndex = *(SBlockIndex*)taosArrayGet(pTableBlockScanInfo->pBlockList, neighbor);
  return true;
}

// Search the iterator's block list, starting at the current position and walking in scan
// direction, for the entry with the same uid and tbBlockIdx as pFBlockInfo.
// Returns the position of the match, or -1 when the walk leaves the list without a match.
static int32_t findFileBlockInfoIndex(SDataBlockIter* pBlockIter, SFileDataBlockInfo* pFBlockInfo) {
  const int32_t stride = ASCENDING_TRAVERSE(pBlockIter->order) ? 1 : -1;

  for (int32_t pos = pBlockIter->index; pos >= 0 && pos < pBlockIter->numOfBlocks; pos += stride) {
    SFileDataBlockInfo* pCandidate = taosArrayGet(pBlockIter->blockList, pos);

    if (pCandidate->uid != pFBlockInfo->uid) {
      continue;
    }

    if (pCandidate->tbBlockIdx == pFBlockInfo->tbBlockIdx) {
      return pos;
    }
  }

  return -1;
}

1535
// Move the iterator one step (by `step`) and make the block found at position `index` the block
// at the new iterator position, relocating it inside the block list when necessary.
// Returns -1 when `index` is outside [0, numOfBlocks), TSDB_CODE_SUCCESS otherwise.
static int32_t setFileBlockActiveInBlockIter(SDataBlockIter* pBlockIter, int32_t index, int32_t step) {
  const bool outOfRange = (index < 0) || (index >= pBlockIter->numOfBlocks);
  if (outOfRange) {
    return -1;
  }

  // keep a copy: the array element is removed below
  SFileDataBlockInfo moved = *(SFileDataBlockInfo*)taosArrayGet(pBlockIter->blockList, index);

  pBlockIter->index += step;

  if (pBlockIter->index != index) {
    // pull the entry out of its old slot and re-insert it at the new iterator position
    taosArrayRemove(pBlockIter->blockList, index);
    taosArrayInsert(pBlockIter->blockList, pBlockIter->index, &moved);

    SFileDataBlockInfo* pPlaced = taosArrayGet(pBlockIter->blockList, pBlockIter->index);
    ASSERT(pPlaced->uid == moved.uid && pPlaced->tbBlockIdx == moved.tbBlockIdx);
  }

  doSetCurrentBlock(pBlockIter, "");
  return TSDB_CODE_SUCCESS;
}

H
Haojun Liao 已提交
1555
// todo: this attribute could be acquired during extractin the global ordered block list.
1556
static bool overlapWithNeighborBlock(SDataBlk* pBlock, SBlockIndex* pNeighborBlockIndex, int32_t order) {
1557 1558
  // it is the last block in current file, no chance to overlap with neighbor blocks.
  if (ASCENDING_TRAVERSE(order)) {
1559
    return pBlock->maxKey.ts == pNeighborBlockIndex->window.skey;
1560
  } else {
1561
    return pBlock->minKey.ts == pNeighborBlockIndex->window.ekey;
1562
  }
H
Haojun Liao 已提交
1563
}
H
Hongze Cheng 已提交
1564

H
Hongze Cheng 已提交
1565
// Report whether the buffered key lies at or before the start of the block (ascending scan) or
// at or after the end of the block (descending scan). A key whose timestamp is
// TSKEY_INITIAL_VAL never qualifies.
static bool bufferDataInFileBlockGap(int32_t order, TSDBKEY key, SDataBlk* pBlock) {
  if (key.ts == TSKEY_INITIAL_VAL) {
    return false;
  }

  if (ASCENDING_TRAVERSE(order)) {
    return key.ts <= pBlock->minKey.ts;
  }

  return key.ts >= pBlock->maxKey.ts;
}
H
Hongze Cheng 已提交
1571

H
Hongze Cheng 已提交
1572
// Report whether the key's timestamp lies inside [minKey.ts, maxKey.ts] of the block AND the
// block's version span [minVer, maxVer] intersects the requested version range.
static bool keyOverlapFileBlock(TSDBKEY key, SDataBlk* pBlock, SVersionRange* pVerRange) {
  // timestamp outside of the block
  if (key.ts < pBlock->minKey.ts || key.ts > pBlock->maxKey.ts) {
    return false;
  }

  // block entirely older than the requested versions
  if (pBlock->maxVer < pVerRange->minVer) {
    return false;
  }

  return pBlock->minVer <= pVerRange->maxVer;
}

H
Hongze Cheng 已提交
1577 1578
// Walk the delete skyline from startIndex and report whether any skyline point affects pBlock.
//
// For every point p, in order:
//   - p->ts beyond the block's max key: stop, no overlap;
//   - p->version below the block's min version: the point is skipped;
//   - p->ts inside [minKey.ts, maxKey.ts]: overlap;
//   - p->ts before the block: overlap when the following skyline point starts at or after the
//     block's min key; when p is the final point its version is asserted to be 0.
static bool doCheckforDatablockOverlap(STableBlockScanInfo* pBlockScanInfo, const SDataBlk* pBlock,
                                       int32_t startIndex) {
  SArray* pSkyline = pBlockScanInfo->delSkyline;
  size_t  num = taosArrayGetSize(pSkyline);

  for (int32_t i = startIndex; i < num; i += 1) {
    TSDBKEY* p = taosArrayGet(pSkyline, i);

    const bool before = p->ts < pBlock->minKey.ts;
    const bool within = (!before) && (p->ts <= pBlock->maxKey.ts);

    if (!before && !within) {  // p->ts > pBlock->maxKey.ts
      return false;
    }

    if (p->version < pBlock->minVer) {
      continue;
    }

    if (within) {
      return true;
    }

    // p->ts < pBlock->minKey.ts
    if (i < num - 1) {
      TSDBKEY* pnext = taosArrayGet(pSkyline, i + 1);
      if (pnext->ts >= pBlock->minKey.ts) {
        return true;
      }
    } else {  // it must be the last point
      ASSERT(p->version == 0);
    }
  }

  return false;
}

H
Hongze Cheng 已提交
1606
// Report whether the delete skyline of the table affects the given file block.
//
// pBlockScanInfo  table scan info; delSkyline and fileDelIndex are read
// pBlock          file block to test
// order           scan order; decides where the skyline walk starts
//
// Returns false when there is no delete info (NULL or empty skyline) or when the block's time
// range lies completely outside the skyline; otherwise returns the result of
// doCheckforDatablockOverlap().
static bool overlapWithDelSkyline(STableBlockScanInfo* pBlockScanInfo, const SDataBlk* pBlock, int32_t order) {
  SArray* pSkyline = pBlockScanInfo->delSkyline;

  // An empty skyline has no first/last point to inspect; treat it like a missing one.
  if (pSkyline == NULL || taosArrayGetSize(pSkyline) == 0) {
    return false;
  }

  // the time ranges do not overlap at all
  TSDBKEY* pFirst = taosArrayGet(pSkyline, 0);
  TSDBKEY* pLast = taosArrayGetLast(pSkyline);
  if (pBlock->minKey.ts > pLast->ts || pBlock->maxKey.ts < pFirst->ts) {
    return false;
  }

  // time ranges intersect: check the individual skyline points, including their versions
  if (ASCENDING_TRAVERSE(order)) {
    return doCheckforDatablockOverlap(pBlockScanInfo, pBlock, pBlockScanInfo->fileDelIndex);
  }

  // descending: step back to the first point that is not greater than the min key of the block
  int32_t index = pBlockScanInfo->fileDelIndex;
  while (1) {
    TSDBKEY* p = taosArrayGet(pSkyline, index);
    if (p->ts > pBlock->minKey.ts && index > 0) {
      index -= 1;
    } else {
      break;
    }
  }

  return doCheckforDatablockOverlap(pBlockScanInfo, pBlock, index);
}

C
Cary Xu 已提交
1636 1637 1638 1639 1640 1641 1642 1643 1644 1645 1646 1647 1648
// Per-block flags computed by getBlockToLoadInfo(); callers zero-initialize the struct first,
// so a flag that is only assigned conditionally stays false.
typedef struct {
  bool overlapWithNeighborBlock;  // result of overlapWithNeighborBlock(), set only if a neighbor block exists
  bool hasDupTs;                  // pBlock->hasDup when the block has one sub-block, otherwise true
  bool overlapWithDelInfo;        // result of overlapWithDelSkyline()
  bool overlapWithLastBlock;      // current last-block key lies within [minKey.ts, maxKey.ts] of the block
  bool overlapWithKeyInBuf;       // result of keyOverlapFileBlock() for the key in the buffer
  bool partiallyRequired;         // result of dataBlockPartiallyRequired()
  bool moreThanCapcity;           // pBlock->nRow > pReader->capacity
} SDataBlockToLoadInfo;

// Fill pInfo with the properties of pBlock that decide whether the block has to be loaded.
// overlapWithNeighborBlock and overlapWithLastBlock are only written when a neighbor block /
// last-block data exists, so pInfo is expected to be zero-initialized by the caller.
static void getBlockToLoadInfo(SDataBlockToLoadInfo* pInfo, SFileDataBlockInfo* pBlockInfo, SDataBlk* pBlock,
                               STableBlockScanInfo* pScanInfo, TSDBKEY keyInBuf, SLastBlockReader* pLastBlockReader,
                               STsdbReader* pReader) {
  SBlockIndex neighbor = {0};
  int32_t     neighborPos = 0;

  // overlap with the neighbor block of the same table, if there is one
  if (getNeighborBlockOfSameTable(pBlockInfo, pScanInfo, &neighborPos, pReader->order, &neighbor)) {
    pInfo->overlapWithNeighborBlock = overlapWithNeighborBlock(pBlock, &neighbor, pReader->order);
  }

  // duplicated ts of different versions inside this block; blocks with more than one
  // sub-block are always treated as having duplicates
  if (pBlock->nSubBlock == 1) {
    pInfo->hasDupTs = pBlock->hasDup;
  } else {
    pInfo->hasDupTs = true;
  }

  pInfo->overlapWithDelInfo = overlapWithDelSkyline(pScanInfo, pBlock, pReader->order);

  // the current key of the last block falls into the time range of this block
  if (hasDataInLastBlock(pLastBlockReader)) {
    const int64_t sttKey = getCurrentKeyInLastBlock(pLastBlockReader);
    pInfo->overlapWithLastBlock = (pBlock->maxKey.ts >= sttKey) && (pBlock->minKey.ts <= sttKey);
  }

  pInfo->moreThanCapcity = pBlock->nRow > pReader->capacity;
  pInfo->partiallyRequired = dataBlockPartiallyRequired(&pReader->window, &pReader->verRange, pBlock);
  pInfo->overlapWithKeyInBuf = keyOverlapFileBlock(keyInBuf, pBlock, &pReader->verRange);
}

// A file block can be skipped only when none of the following holds; otherwise it must be loaded:
// 1. the block is only partially covered by the query time window / version range
// 2. the block overlaps with its neighbor block of the same table
// 3. the block overlaps with the key in the buffer or with the current key of the last block
// 4. the output buffer is too small to hold all rows of the block
// 5. delete info overlaps with the data of the block
// 6. the block contains duplicated timestamps
static bool fileBlockShouldLoad(STsdbReader* pReader, SFileDataBlockInfo* pBlockInfo, SDataBlk* pBlock,
                                STableBlockScanInfo* pScanInfo, TSDBKEY keyInBuf, SLastBlockReader* pLastBlockReader) {
  SDataBlockToLoadInfo info = {0};
  getBlockToLoadInfo(&info, pBlockInfo, pBlock, pScanInfo, keyInBuf, pLastBlockReader, pReader);

  const bool skippable = !info.overlapWithNeighborBlock && !info.hasDupTs && !info.partiallyRequired &&
                         !info.overlapWithKeyInBuf && !info.moreThanCapcity && !info.overlapWithDelInfo &&
                         !info.overlapWithLastBlock;
  if (skippable) {
    return false;
  }

  // log the reason why the data block is loaded, for profiling
  tsdbDebug("%p uid:%" PRIu64
            " need to load the datablock, overlapneighbor:%d, hasDup:%d, partiallyRequired:%d, "
            "overlapWithKey:%d, greaterThanBuf:%d, overlapWithDel:%d, overlapWithlastBlock:%d, %s",
            pReader, pBlockInfo->uid, info.overlapWithNeighborBlock, info.hasDupTs, info.partiallyRequired,
            info.overlapWithKeyInBuf, info.moreThanCapcity, info.overlapWithDelInfo, info.overlapWithLastBlock,
            pReader->idStr);

  return true;
}

C
Cary Xu 已提交
1701 1702 1703 1704 1705 1706 1707 1708 1709
// A file block is "clean" when it neither overlaps with its neighbor block, the key in the buffer,
// the delete info or the last block, nor contains duplicated timestamps. Unlike
// fileBlockShouldLoad(), the partiallyRequired and moreThanCapcity flags are not considered.
static bool isCleanFileDataBlock(STsdbReader* pReader, SFileDataBlockInfo* pBlockInfo, SDataBlk* pBlock,
                                 STableBlockScanInfo* pScanInfo, TSDBKEY keyInBuf, SLastBlockReader* pLastBlockReader) {
  SDataBlockToLoadInfo info = {0};
  getBlockToLoadInfo(&info, pBlockInfo, pBlock, pScanInfo, keyInBuf, pLastBlockReader, pReader);

  return !info.overlapWithNeighborBlock && !info.hasDupTs && !info.overlapWithKeyInBuf &&
         !info.overlapWithDelInfo && !info.overlapWithLastBlock;
}

1710
// Build the result block of the reader from the buffered (mem / imem) rows of one table.
// Returns TSDB_CODE_SUCCESS immediately when neither buffer iterator holds a value; otherwise
// returns the code of buildDataBlockFromBufImpl(). The result block's ts window and uid are
// updated, the composed-block flag is set and the elapsed time is added to the reader's cost
// statistics regardless of that code.
static int32_t buildDataBlockFromBuf(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo, int64_t endKey) {
  if (!pBlockScanInfo->iiter.hasVal && !pBlockScanInfo->iter.hasVal) {
    return TSDB_CODE_SUCCESS;
  }

  SSDataBlock* pResBlock = pReader->pResBlock;
  int64_t      startUs = taosGetTimestampUs();

  int32_t rc = buildDataBlockFromBufImpl(pBlockScanInfo, endKey, pReader->capacity, pReader);

  blockDataUpdateTsWindow(pResBlock, pReader->suppInfo.slotId[0]);
  pResBlock->info.id.uid = pBlockScanInfo->uid;
  setComposedBlockFlag(pReader, true);

  double costMs = (taosGetTimestampUs() - startUs) / 1000.0;
  tsdbDebug("%p build data block from cache completed, elapsed time:%.2f ms, numOfRows:%d, brange:%" PRId64
            " - %" PRId64 ", uid:%" PRIu64 ",  %s",
            pReader, costMs, pResBlock->info.rows, pResBlock->info.window.skey, pResBlock->info.window.ekey,
            pBlockScanInfo->uid, pReader->idStr);

  pReader->cost.buildmemBlock += costMs;
  return rc;
}

1735 1736
// Fast path: append the current file-block row to the result block without merging.
// Taken only when
//   1. the current row is not the border row of the block in scan direction, and
//   2. the directly following row carries a timestamp different from `key`.
// On success the row index is advanced by one step and true is returned; otherwise nothing is
// changed and false is returned.
static bool tryCopyDistinctRowFromFileBlock(STsdbReader* pReader, SBlockData* pBlockData, int64_t key,
                                            SFileBlockDumpInfo* pDumpInfo) {
  const bool    asc = ASCENDING_TRAVERSE(pReader->order);
  const int32_t step = asc ? 1 : -1;

  const bool hasFollower = asc ? (pDumpInfo->rowIndex < pDumpInfo->totalRows - 1) : (pDumpInfo->rowIndex > 0);
  if (!hasFollower) {
    return false;
  }

  // same timestamp right behind: the rows have to be merged
  if (pBlockData->aTSKEY[pDumpInfo->rowIndex + step] == key) {
    return false;
  }

  doAppendRowFromFileBlock(pReader->pResBlock, pReader, pBlockData, pDumpInfo->rowIndex);
  pDumpInfo->rowIndex += step;
  return true;
}

1755
static bool nextRowFromLastBlocks(SLastBlockReader* pLastBlockReader, STableBlockScanInfo* pScanInfo,
1756
                                  SVersionRange* pVerRange) {
X
Xiaoyu Wang 已提交
1757
  int32_t step = ASCENDING_TRAVERSE(pLastBlockReader->order) ? 1 : -1;
H
Haojun Liao 已提交
1758

1759 1760 1761 1762 1763 1764 1765 1766
  while (1) {
    bool hasVal = tMergeTreeNext(&pLastBlockReader->mergeTree);
    if (!hasVal) {
      return false;
    }

    TSDBROW row = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
    TSDBKEY k = TSDBROW_KEY(&row);
1767 1768 1769
    if (hasBeenDropped(pScanInfo->delSkyline, &pScanInfo->lastBlockDelIndex, &k, pLastBlockReader->order, pVerRange)) {
      pScanInfo->lastKey = k.ts;
    } else {
H
Haojun Liao 已提交
1770 1771 1772 1773 1774 1775
      // the qualifed ts may equal to k.ts, only a greater version one.
      // here we need to fallback one step.
      if (pScanInfo->lastKey == k.ts) {
        pScanInfo->lastKey -= step;
      }

1776 1777 1778 1779 1780 1781 1782
      return true;
    }
  }
}

// Fast path for a last-block (stt) row: move the last-block reader to its next valid row and,
// when that row does not carry timestamp `ts` (or no further row exists), append fRow to the
// result block without merging.
// Returns true when fRow has been appended, false when the following row has the same timestamp
// so that a merge is required. The reader has been advanced in both cases.
static bool tryCopyDistinctRowFromSttBlock(TSDBROW* fRow, SLastBlockReader* pLastBlockReader,
                                           STableBlockScanInfo* pScanInfo, int64_t ts, STsdbReader* pReader) {
  const bool hasMore = nextRowFromLastBlocks(pLastBlockReader, pScanInfo, &pReader->verRange);

  if (hasMore && getCurrentKeyInLastBlock(pLastBlockReader) == ts) {
    return false;
  }

  doAppendRowFromFileBlock(pReader->pResBlock, pReader, fRow->pBlockData, fRow->iRow);
  return true;
}

1798 1799 1800 1801 1802 1803
// Return the schema cached in pReader->pSchema, fetching it through metaGetTbTSchema() for the
// given uid when nothing is cached yet. A failed fetch is logged and NULL is returned.
static FORCE_INLINE STSchema* getLatestTableSchema(STsdbReader* pReader, uint64_t uid) {
  if (pReader->pSchema == NULL) {
    pReader->pSchema = metaGetTbTSchema(pReader->pTsdb->pVnode->pMeta, uid, -1, 1);

    if (pReader->pSchema == NULL) {
      tsdbError("failed to get table schema, uid:%" PRIu64 ", it may have been dropped, ver:-1, %s", uid,
                pReader->idStr);
    }
  }

  return pReader->pSchema;
}

H
Haojun Liao 已提交
1811 1812 1813
// Return the schema matching schema version `sversion` for a buffered row.
//
// pReader->pSchema is filled via metaGetTbTSchema() when it is still empty and returned when its
// version matches. Otherwise pReader->pMemSchema is used as a one-entry cache: it is returned
// when its version matches, else it is released and re-fetched via metaGetTbTSchemaEx().
// On failure terrno is set to the returned code and NULL is returned.
static FORCE_INLINE STSchema* doGetSchemaForTSRow(int32_t sversion, STsdbReader* pReader, uint64_t uid) {
  // always keep the newest schema version in pReader->pSchema
  if (pReader->pSchema == NULL) {
    pReader->pSchema = metaGetTbTSchema(pReader->pTsdb->pVnode->pMeta, uid, -1, 1);
  }

  if (pReader->pSchema != NULL && pReader->pSchema->version == sversion) {
    return pReader->pSchema;
  }

  const bool hadCached = (pReader->pMemSchema != NULL);
  if (hadCached) {
    if (pReader->pMemSchema->version == sversion) {
      return pReader->pMemSchema;
    }

    // cached schema is of another version, drop it before fetching the required one
    taosMemoryFreeClear(pReader->pMemSchema);
  }

  int32_t code = metaGetTbTSchemaEx(pReader->pTsdb->pVnode->pMeta, pReader->suid, uid, sversion, &pReader->pMemSchema);

  // the result pointer is verified only on the re-fetch path; the first fetch relies on the code alone
  const bool failed = (code != TSDB_CODE_SUCCESS) || (hadCached && pReader->pMemSchema == NULL);
  if (failed) {
    terrno = code;
    return NULL;
  }

  return pReader->pMemSchema;
}

1846
static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo, TSDBROW* pRow,
1847 1848
                                     SIterInfo* pIter, int64_t key, SLastBlockReader* pLastBlockReader) {
  SRowMerger          merge = {0};
H
Hongze Cheng 已提交
1849
  SRow*               pTSRow = NULL;
1850 1851 1852
  SBlockData*         pBlockData = &pReader->status.fileBlockData;
  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;

1853
  int64_t tsLast = INT64_MIN;
1854
  if (hasDataInLastBlock(pLastBlockReader)) {
1855 1856
    tsLast = getCurrentKeyInLastBlock(pLastBlockReader);
  }
1857

H
Hongze Cheng 已提交
1858 1859
  TSDBKEY k = TSDBROW_KEY(pRow);
  TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
1860

1861 1862
  int64_t minKey = 0;
  if (pReader->order == TSDB_ORDER_ASC) {
H
Hongze Cheng 已提交
1863
    minKey = INT64_MAX;  // chosen the minimum value
1864
    if (minKey > tsLast && hasDataInLastBlock(pLastBlockReader)) {
1865 1866
      minKey = tsLast;
    }
1867

1868 1869 1870
    if (minKey > k.ts) {
      minKey = k.ts;
    }
1871

1872
    if (minKey > key && hasDataInFileBlock(pBlockData, pDumpInfo)) {
1873 1874 1875 1876
      minKey = key;
    }
  } else {
    minKey = INT64_MIN;
1877
    if (minKey < tsLast && hasDataInLastBlock(pLastBlockReader)) {
1878 1879 1880 1881 1882 1883 1884
      minKey = tsLast;
    }

    if (minKey < k.ts) {
      minKey = k.ts;
    }

1885
    if (minKey < key && hasDataInFileBlock(pBlockData, pDumpInfo)) {
1886 1887
      minKey = key;
    }
1888 1889 1890 1891
  }

  bool init = false;

1892
  // ASC: file block ---> last block -----> imem -----> mem
H
Hongze Cheng 已提交
1893
  // DESC: mem -----> imem -----> last block -----> file block
1894 1895
  if (pReader->order == TSDB_ORDER_ASC) {
    if (minKey == key) {
1896
      init = true;
H
Hongze Cheng 已提交
1897
      int32_t code = tsdbRowMergerInit(&merge, &fRow, pReader->pSchema);
H
Haojun Liao 已提交
1898 1899 1900
      if (code != TSDB_CODE_SUCCESS) {
        return code;
      }
1901
      doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
1902 1903
    }

1904
    if (minKey == tsLast) {
1905
      TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
H
Haojun Liao 已提交
1906
      if (init) {
H
Hongze Cheng 已提交
1907
        tsdbRowMerge(&merge, &fRow1);
H
Haojun Liao 已提交
1908
      } else {
1909
        init = true;
H
Hongze Cheng 已提交
1910
        int32_t code = tsdbRowMergerInit(&merge, &fRow1, pReader->pSchema);
H
Haojun Liao 已提交
1911 1912 1913
        if (code != TSDB_CODE_SUCCESS) {
          return code;
        }
1914
      }
1915
      doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, &merge, &pReader->verRange);
1916
    }
1917

1918
    if (minKey == k.ts) {
K
kailixu 已提交
1919 1920 1921 1922
      STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid);
      if (pSchema == NULL) {
        return terrno;
      }
H
Haojun Liao 已提交
1923
      if (init) {
X
Xiaoyu Wang 已提交
1924
        tsdbRowMergerAdd(&merge, pRow, pSchema);
H
Haojun Liao 已提交
1925
      } else {
1926
        init = true;
X
Xiaoyu Wang 已提交
1927
        int32_t code = tsdbRowMergerInit(&merge, pRow, pSchema);
H
Haojun Liao 已提交
1928 1929 1930 1931 1932 1933 1934
        if (code != TSDB_CODE_SUCCESS) {
          return code;
        }
      }
      int32_t code = doMergeRowsInBuf(pIter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge, pReader);
      if (code != TSDB_CODE_SUCCESS) {
        return code;
1935 1936 1937 1938 1939
      }
    }
  } else {
    if (minKey == k.ts) {
      init = true;
1940
      STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid);
H
Hongze Cheng 已提交
1941
      int32_t   code = tsdbRowMergerInit(&merge, pRow, pSchema);
H
Haojun Liao 已提交
1942 1943 1944 1945 1946
      if (code != TSDB_CODE_SUCCESS) {
        return code;
      }

      code = doMergeRowsInBuf(pIter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge, pReader);
1947
      if (code != TSDB_CODE_SUCCESS || merge.pTSchema == NULL) {
H
Haojun Liao 已提交
1948 1949
        return code;
      }
1950 1951
    }

1952
    if (minKey == tsLast) {
1953
      TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
H
Haojun Liao 已提交
1954
      if (init) {
H
Hongze Cheng 已提交
1955
        tsdbRowMerge(&merge, &fRow1);
H
Haojun Liao 已提交
1956
      } else {
1957
        init = true;
H
Hongze Cheng 已提交
1958
        int32_t code = tsdbRowMergerInit(&merge, &fRow1, pReader->pSchema);
H
Haojun Liao 已提交
1959 1960 1961
        if (code != TSDB_CODE_SUCCESS) {
          return code;
        }
1962
      }
1963
      doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, &merge, &pReader->verRange);
1964 1965 1966
    }

    if (minKey == key) {
H
Haojun Liao 已提交
1967
      if (init) {
H
Hongze Cheng 已提交
1968
        tsdbRowMerge(&merge, &fRow);
H
Haojun Liao 已提交
1969
      } else {
1970
        init = true;
H
Hongze Cheng 已提交
1971
        int32_t code = tsdbRowMergerInit(&merge, &fRow, pReader->pSchema);
H
Haojun Liao 已提交
1972 1973 1974
        if (code != TSDB_CODE_SUCCESS) {
          return code;
        }
1975 1976 1977
      }
      doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
    }
1978 1979
  }

H
Hongze Cheng 已提交
1980
  int32_t code = tsdbRowMergerGetRow(&merge, &pTSRow);
1981 1982 1983 1984
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

1985
  doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo);
1986 1987

  taosMemoryFree(pTSRow);
H
Hongze Cheng 已提交
1988
  tsdbRowMergerClear(&merge);
1989 1990 1991
  return TSDB_CODE_SUCCESS;
}

1992 1993 1994
// Emit one row whose timestamp is the current key of the last (stt) block.
//
// When no block data has to be merged (mergeBlockData == false, pBlockData may then be NULL) or
// the current file-block row has another timestamp, the last-block row is either copied directly
// (no following last-block row with the same timestamp) or merged with the following last-block
// rows of that timestamp. Otherwise the last-block rows and the file-block rows of that timestamp
// are merged together.
//
// Returns TSDB_CODE_SUCCESS or the error code of the row merger.
static int32_t doMergeFileBlockAndLastBlock(SLastBlockReader* pLastBlockReader, STsdbReader* pReader,
                                            STableBlockScanInfo* pBlockScanInfo, SBlockData* pBlockData,
                                            bool mergeBlockData) {
  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
  int64_t             tsLastBlock = getCurrentKeyInLastBlock(pLastBlockReader);

  SRow*      pTSRow = NULL;
  SRowMerger merge = {0};
  TSDBROW    fRow = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
  tsdbTrace("fRow ptr:%p, %d, uid:%" PRIu64 ", %s", fRow.pBlockData, fRow.iRow, pLastBlockReader->uid, pReader->idStr);

  int32_t code = TSDB_CODE_SUCCESS;

  // pBlockData is dereferenced only if mergeBlockData is set
  const bool sttRowOnly = (!mergeBlockData) || (tsLastBlock != pBlockData->aTSKEY[pDumpInfo->rowIndex]);

  if (sttRowOnly) {
    // only the last block contributes to this timestamp
    if (tryCopyDistinctRowFromSttBlock(&fRow, pLastBlockReader, pBlockScanInfo, tsLastBlock, pReader)) {
      pBlockScanInfo->lastKey = tsLastBlock;
      return TSDB_CODE_SUCCESS;
    }

    code = tsdbRowMergerInit(&merge, &fRow, pReader->pSchema);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    // the reader has already been advanced to the following row with the same timestamp
    TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
    tsdbRowMerge(&merge, &fRow1);
    doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLastBlock, &merge, &pReader->verRange);
  } else {
    // last block and file block both hold this timestamp
    code = tsdbRowMergerInit(&merge, &fRow, pReader->pSchema);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLastBlock, &merge, &pReader->verRange);

    // merge with block data if ts == key
    if (tsLastBlock == pBlockData->aTSKEY[pDumpInfo->rowIndex]) {
      doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
    }
  }

  code = tsdbRowMergerGetRow(&merge, &pTSRow);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo);

  taosMemoryFree(pTSRow);
  tsdbRowMergerClear(&merge);

  return TSDB_CODE_SUCCESS;
}

2055 2056
// Produce the next row when only file data (data block and/or last block) is involved.
//
//   - no data in the file block:        handled by doMergeFileBlockAndLastBlock() without block data
//   - no data in the last block:        handled by mergeRowsInFileBlocks()
//   - both available, descending scan:  handled by doMergeFileBlockAndLastBlock() with block data
//   - both available, ascending scan:   key <  ts -> mergeRowsInFileBlocks()
//                                       key == ts -> rows of both sources are merged here
//                                       key >  ts -> nothing is done, TSDB_CODE_SUCCESS
// where `key` is the current file-block key and `ts` the current last-block key.
static int32_t mergeFileBlockAndLastBlock(STsdbReader* pReader, SLastBlockReader* pLastBlockReader, int64_t key,
                                          STableBlockScanInfo* pBlockScanInfo, SBlockData* pBlockData) {
  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;

  // only last block exists
  if (!hasDataInFileBlock(pBlockData, pDumpInfo)) {
    return doMergeFileBlockAndLastBlock(pLastBlockReader, pReader, pBlockScanInfo, NULL, false);
  }

  // no last block available, only data block exists
  if (!hasDataInLastBlock(pLastBlockReader)) {
    return mergeRowsInFileBlocks(pBlockData, pBlockScanInfo, key, pReader);
  }

  // current row of the file data block
  TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
  int64_t ts = getCurrentKeyInLastBlock(pLastBlockReader);
  ASSERT(ts >= key);

  if (!ASCENDING_TRAVERSE(pReader->order)) {  // desc order
    return doMergeFileBlockAndLastBlock(pLastBlockReader, pReader, pBlockScanInfo, pBlockData, true);
  }

  if (key < ts) {  // imem, mem are all empty, file blocks (data blocks and last block) exist
    return mergeRowsInFileBlocks(pBlockData, pBlockScanInfo, key, pReader);
  }

  if (key > ts) {
    return TSDB_CODE_SUCCESS;
  }

  // key == ts: merge the rows of the data block and of the last block
  SRow*      pTSRow = NULL;
  SRowMerger merge = {0};

  int32_t code = tsdbRowMergerInit(&merge, &fRow, pReader->pSchema);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);

  TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
  tsdbRowMerge(&merge, &fRow1);

  doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, ts, &merge, &pReader->verRange);

  code = tsdbRowMergerGetRow(&merge, &pTSRow);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo);

  taosMemoryFree(pTSRow);
  tsdbRowMergerClear(&merge);
  return code;
}

2110 2111
// Build one result row for the next timestamp by merging up to four sources of a single table: the mem buffer
// iterator, the imem buffer iterator, the currently loaded file data block and the last block reader.
//
// The smallest (ascending scan) or largest (descending scan) current timestamp among the sources is selected; every
// source whose current row carries that timestamp is folded into one SRowMerger, and the merged row is appended to
// pReader->pResBlock.
//
// Returns TSDB_CODE_SUCCESS or the first error reported by a callee. Once the merger has been initialized, a failure
// of doMergeRowsInBuf() or tsdbRowMergerGetRow() releases the merger before returning.
//
// NOTE(review): pRow/piRow are handed to TSDBROW_KEY without a NULL check; the caller visible in this file only
// invokes this function when both buffers returned a row -- confirm for other callers.
// NOTE(review): only the ascending imem branch checks the schema returned by doGetSchemaForTSRow() for NULL, and it
// then returns the (successful) value of `code`; the other branches pass the schema on unchecked.
static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo, SBlockData* pBlockData,
                                     SLastBlockReader* pLastBlockReader) {
  SRowMerger          merge = {0};
  SRow*               pTSRow = NULL;
  int32_t             code = TSDB_CODE_SUCCESS;
  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
  SArray*             pDelList = pBlockScanInfo->delSkyline;
  bool                asc = ASCENDING_TRAVERSE(pReader->order);

  TSDBROW* pRow = getValidMemRow(&pBlockScanInfo->iter, pDelList, pReader);
  TSDBROW* piRow = getValidMemRow(&pBlockScanInfo->iiter, pDelList, pReader);

  // current key of the last block reader; INT64_MIN when it holds no data
  bool    hasLast = hasDataInLastBlock(pLastBlockReader);
  int64_t tsLast = hasLast ? getCurrentKeyInLastBlock(pLastBlockReader) : INT64_MIN;

  // current key of the file data block; INT64_MIN when it holds no data
  bool    hasFile = hasDataInFileBlock(pBlockData, pDumpInfo);
  int64_t key = hasFile ? pBlockData->aTSKEY[pDumpInfo->rowIndex] : INT64_MIN;

  TSDBKEY k = TSDBROW_KEY(pRow);
  TSDBKEY ik = TSDBROW_KEY(piRow);

  // despite its name, minKey holds the maximum timestamp for a descending scan
  int64_t minKey = 0;
  if (asc) {
    minKey = INT64_MAX;  // find the minimum
    if (minKey > k.ts) {
      minKey = k.ts;
    }

    if (minKey > ik.ts) {
      minKey = ik.ts;
    }

    if (minKey > key && hasFile) {
      minKey = key;
    }

    if (minKey > tsLast && hasLast) {
      minKey = tsLast;
    }
  } else {
    minKey = INT64_MIN;  // find the maximum
    if (minKey < k.ts) {
      minKey = k.ts;
    }

    if (minKey < ik.ts) {
      minKey = ik.ts;
    }

    if (minKey < key && hasFile) {
      minKey = key;
    }

    if (minKey < tsLast && hasLast) {
      minKey = tsLast;
    }
  }

  // true once `merge` has been initialized by one of the sources
  bool init = false;

  // merge order
  // ASC:  file block -----> last block -----> imem -----> mem
  // DESC: mem -----> imem -----> last block -----> file block
  if (asc) {
    if (minKey == key) {
      init = true;
      TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
      code = tsdbRowMergerInit(&merge, &fRow, pReader->pSchema);
      if (code != TSDB_CODE_SUCCESS) {
        return code;
      }

      doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
    }

    if (minKey == tsLast) {
      TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
      if (init) {
        tsdbRowMerge(&merge, &fRow1);
      } else {
        init = true;
        code = tsdbRowMergerInit(&merge, &fRow1, pReader->pSchema);
        if (code != TSDB_CODE_SUCCESS) {
          return code;
        }
      }

      doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, &merge, &pReader->verRange);
    }

    if (minKey == ik.ts) {
      if (init) {
        tsdbRowMerge(&merge, piRow);
      } else {
        init = true;
        STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(piRow), pReader, pBlockScanInfo->uid);
        if (pSchema == NULL) {
          return code;
        }

        code = tsdbRowMergerInit(&merge, piRow, pSchema);
        if (code != TSDB_CODE_SUCCESS) {
          return code;
        }
      }

      code = doMergeRowsInBuf(&pBlockScanInfo->iiter, pBlockScanInfo->uid, ik.ts, pBlockScanInfo->delSkyline, &merge,
                              pReader);
      if (code != TSDB_CODE_SUCCESS) {
        goto _end;  // merger is initialized here, release it
      }
    }

    if (minKey == k.ts) {
      if (init) {
        if (merge.pTSchema == NULL) {
          return code;
        }

        tsdbRowMerge(&merge, pRow);
      } else {
        STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid);
        code = tsdbRowMergerInit(&merge, pRow, pSchema);
        if (code != TSDB_CODE_SUCCESS) {
          return code;
        }
      }

      code = doMergeRowsInBuf(&pBlockScanInfo->iter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge,
                              pReader);
      if (code != TSDB_CODE_SUCCESS) {
        goto _end;  // merger is initialized here, release it
      }
    }
  } else {
    if (minKey == k.ts) {
      init = true;
      STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid);
      code = tsdbRowMergerInit(&merge, pRow, pSchema);
      if (code != TSDB_CODE_SUCCESS) {
        return code;
      }

      code = doMergeRowsInBuf(&pBlockScanInfo->iter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge,
                              pReader);
      if (code != TSDB_CODE_SUCCESS) {
        goto _end;  // merger is initialized here, release it
      }
    }

    if (minKey == ik.ts) {
      if (init) {
        tsdbRowMerge(&merge, piRow);
      } else {
        init = true;
        STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(piRow), pReader, pBlockScanInfo->uid);
        code = tsdbRowMergerInit(&merge, piRow, pSchema);
        if (code != TSDB_CODE_SUCCESS) {
          return code;
        }
      }

      code = doMergeRowsInBuf(&pBlockScanInfo->iiter, pBlockScanInfo->uid, ik.ts, pBlockScanInfo->delSkyline, &merge,
                              pReader);
      if (code != TSDB_CODE_SUCCESS) {
        goto _end;  // merger is initialized here, release it
      }
    }

    if (minKey == tsLast) {
      TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
      if (init) {
        tsdbRowMerge(&merge, &fRow1);
      } else {
        init = true;
        code = tsdbRowMergerInit(&merge, &fRow1, pReader->pSchema);
        if (code != TSDB_CODE_SUCCESS) {
          return code;
        }
      }

      doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, &merge, &pReader->verRange);
    }

    if (minKey == key) {
      TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
      if (!init) {
        code = tsdbRowMergerInit(&merge, &fRow, pReader->pSchema);
        if (code != TSDB_CODE_SUCCESS) {
          return code;
        }
      } else {
        if (merge.pTSchema == NULL) {
          return code;
        }

        tsdbRowMerge(&merge, &fRow);
      }

      doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
    }
  }

  // no source matched the selected key, so the merger was never set up and there is nothing to release
  if (merge.pTSchema == NULL) {
    return code;
  }

  code = tsdbRowMergerGetRow(&merge, &pTSRow);
  if (code != TSDB_CODE_SUCCESS) {
    goto _end;
  }

  doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo);
  taosMemoryFree(pTSRow);

_end:
  tsdbRowMergerClear(&merge);
  return code;
}

2323 2324 2325 2326 2327 2328 2329 2330 2331
// Create the mem and imem buffer iterators of one table and build its delete skyline.
//
// The call is a no-op once pBlockScanInfo->iterInit is set. Both iterators start right after (ascending scan) or
// right before (descending scan) pBlockScanInfo->lastKey, using the lower/upper bound of the reader's version range.
//
// Returns TSDB_CODE_SUCCESS, the error of tsdbTbDataIterCreate(), or the error of initDelSkylineIterator().
static int32_t initMemDataIterator(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader) {
  if (pBlockScanInfo->iterInit) {
    return TSDB_CODE_SUCCESS;
  }

  int32_t code = TSDB_CODE_SUCCESS;
  bool    asc = ASCENDING_TRAVERSE(pReader->order);

  // resume from the last key already returned for this table instead of from the query window boundary
  TSDBKEY startKey = {0};
  if (asc) {
    startKey = (TSDBKEY){.ts = pBlockScanInfo->lastKey + 1, .version = pReader->verRange.minVer};
  } else {
    startKey = (TSDBKEY){.ts = pBlockScanInfo->lastKey - 1, .version = pReader->verRange.maxVer};
  }

  int32_t backward = !asc;

  // mem buffer
  STbData* d = NULL;
  if (pReader->pReadSnap->pMem != NULL) {
    d = tsdbGetTbDataFromMemTable(pReader->pReadSnap->pMem, pReader->suid, pBlockScanInfo->uid);
    if (d != NULL) {
      code = tsdbTbDataIterCreate(d, &startKey, backward, &pBlockScanInfo->iter.iter);
      if (code != TSDB_CODE_SUCCESS) {
        tsdbError("%p uid:%" PRIu64 ", failed to create iterator for mem, code:%s, %s", pReader, pBlockScanInfo->uid,
                  tstrerror(code), pReader->idStr);
        return code;
      }

      pBlockScanInfo->iter.hasVal = (tsdbTbDataIterGet(pBlockScanInfo->iter.iter) != NULL);

      tsdbDebug("%p uid:%" PRIu64 ", check data in mem from skey:%" PRId64 ", order:%d, ts range in buf:%" PRId64
                "-%" PRId64 " %s",
                pReader, pBlockScanInfo->uid, startKey.ts, pReader->order, d->minKey, d->maxKey, pReader->idStr);
    }
  } else {
    tsdbDebug("%p uid:%" PRIu64 ", no data in mem, %s", pReader, pBlockScanInfo->uid, pReader->idStr);
  }

  // imem buffer
  STbData* di = NULL;
  if (pReader->pReadSnap->pIMem != NULL) {
    di = tsdbGetTbDataFromMemTable(pReader->pReadSnap->pIMem, pReader->suid, pBlockScanInfo->uid);
    if (di != NULL) {
      code = tsdbTbDataIterCreate(di, &startKey, backward, &pBlockScanInfo->iiter.iter);
      if (code != TSDB_CODE_SUCCESS) {
        tsdbError("%p uid:%" PRIu64 ", failed to create iterator for imem, code:%s, %s", pReader, pBlockScanInfo->uid,
                  tstrerror(code), pReader->idStr);
        return code;
      }

      pBlockScanInfo->iiter.hasVal = (tsdbTbDataIterGet(pBlockScanInfo->iiter.iter) != NULL);

      tsdbDebug("%p uid:%" PRIu64 ", check data in imem from skey:%" PRId64 ", order:%d, ts range in buf:%" PRId64
                "-%" PRId64 " %s",
                pReader, pBlockScanInfo->uid, startKey.ts, pReader->order, di->minKey, di->maxKey, pReader->idStr);
    }
  } else {
    tsdbDebug("%p uid:%" PRIu64 ", no data in imem, %s", pReader, pBlockScanInfo->uid, pReader->idStr);
  }

  int64_t st = taosGetTimestampUs();
  code = initDelSkylineIterator(pBlockScanInfo, pReader, d, di);
  pReader->cost.initDelSkylineIterTime += (taosGetTimestampUs() - st) / 1000.0;

  // Mark as initialized even when the skyline failed: the iterators created above stay attached to the scan info,
  // and the guard at the top keeps a later call from creating them a second time.
  pBlockScanInfo->iterInit = true;
  return code;
}

dengyihao's avatar
dengyihao 已提交
2392 2393
// Check whether the row at pDumpInfo->rowIndex of the file data block may be returned for this table.
// A row is rejected when it belongs to another table (multi-table block), when its version or timestamp lies
// outside the reader's version range / time window, or when hasBeenDropped() reports it as deleted.
static bool isValidFileBlockRow(SBlockData* pBlockData, SFileBlockDumpInfo* pDumpInfo,
                                STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader) {
  // aUid is only present for a multi-table data block
  if (pBlockData->aUid != NULL && pBlockData->aUid[pDumpInfo->rowIndex] != pBlockScanInfo->uid) {
    return false;
  }

  int64_t rowVer = pBlockData->aVersion[pDumpInfo->rowIndex];
  bool    verInRange = (rowVer >= pReader->verRange.minVer) && (rowVer <= pReader->verRange.maxVer);
  if (!verInRange) {
    return false;
  }

  int64_t rowTs = pBlockData->aTSKEY[pDumpInfo->rowIndex];
  bool    tsInRange = (rowTs >= pReader->window.skey) && (rowTs <= pReader->window.ekey);
  if (!tsInRange) {
    return false;
  }

  // finally consult the delete skyline of the table
  TSDBKEY rowKey = {.ts = rowTs, .version = rowVer};
  if (hasBeenDropped(pBlockScanInfo->delSkyline, &pBlockScanInfo->fileDelIndex, &rowKey, pReader->order,
                     &pReader->verRange)) {
    return false;
  }

  return true;
}

2422
// Bind the last block reader to the table described by pScanInfo and position it on its first row.
// When the reader is already bound to this table, only report whether it still has data. Otherwise a merge tree
// opened for a previous table is closed, the table's buffer iterators are initialized, and a new merge tree is
// opened on a time window that excludes everything up to (ascending) or down to (descending) pScanInfo->lastKey.
// Returns true when a row is available, false when there is none or the merge tree could not be opened.
static bool initLastBlockReader(SLastBlockReader* pLBlockReader, STableBlockScanInfo* pScanInfo, STsdbReader* pReader) {
  bool alreadyBound = (pLBlockReader->uid == pScanInfo->uid);
  if (alreadyBound) {
    return hasDataInLastBlock(pLBlockReader);
  }

  // uid == 0 means no merge tree has been opened yet
  if (pLBlockReader->uid != 0) {
    tMergeTreeClose(&pLBlockReader->mergeTree);
  }

  initMemDataIterator(pScanInfo, pReader);
  pLBlockReader->uid = pScanInfo->uid;

  // skip the keys that were already returned for this table
  STimeWindow win = pLBlockReader->window;
  if (ASCENDING_TRAVERSE(pLBlockReader->order)) {
    win.skey = pScanInfo->lastKey + 1;
  } else {
    win.ekey = pScanInfo->lastKey - 1;
  }

  tsdbDebug("init last block reader, window:%" PRId64 "-%" PRId64 ", uid:%" PRIu64 ", %s", win.skey, win.ekey,
            pScanInfo->uid, pReader->idStr);

  int32_t ret = tMergeTreeOpen(&pLBlockReader->mergeTree, (pLBlockReader->order == TSDB_ORDER_DESC),
                               pReader->pFileReader, pReader->suid, pScanInfo->uid, &win, &pLBlockReader->verRange,
                               pLBlockReader->pInfo, false, pReader->idStr);
  if (ret == TSDB_CODE_SUCCESS) {
    return nextRowFromLastBlocks(pLBlockReader, pScanInfo, &pReader->verRange);
  }

  return false;
}

2455
// Return the timestamp of the row the last block reader's merge tree currently yields.
static int64_t getCurrentKeyInLastBlock(SLastBlockReader* pLastBlockReader) {
  TSDBROW curRow = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
  int64_t ts = TSDBROW_TS(&curRow);
  return ts;
}

H
Hongze Cheng 已提交
2460
// The last block reader has data exactly when its merge tree exposes a current iterator.
static bool hasDataInLastBlock(SLastBlockReader* pLastBlockReader) {
  if (pLastBlockReader->mergeTree.pIter == NULL) {
    return false;
  }

  return true;
}
2461

2462
bool hasDataInFileBlock(const SBlockData* pBlockData, const SFileBlockDumpInfo* pDumpInfo) {
H
Haojun Liao 已提交
2463
  if ((pBlockData->nRow > 0) && (pBlockData->nRow != pDumpInfo->totalRows)) {
2464
    return false;  // this is an invalid result.
2465
  }
2466
  return pBlockData->nRow > 0 && (!pDumpInfo->allDumped);
2467
}
2468

2469 2470
// Emit the row at the current position of the file data block into pReader->pResBlock.
//
// Fast path: tryCopyDistinctRowFromFileBlock() handles the row itself; only pBlockScanInfo->lastKey is recorded.
// Slow path: the row is merged with the following rows via doMergeRowsInFileBlocks() and the merged result is
// appended to the result block.
//
// Returns TSDB_CODE_SUCCESS, or the error of tsdbRowMergerInit() / tsdbRowMergerGetRow(). The merger is released on
// every path that follows a successful initialization.
int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pBlockScanInfo, int64_t key,
                              STsdbReader* pReader) {
  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;

  if (tryCopyDistinctRowFromFileBlock(pReader, pBlockData, key, pDumpInfo)) {
    pBlockScanInfo->lastKey = key;
    return TSDB_CODE_SUCCESS;
  }

  TSDBROW    fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
  SRow*      pTSRow = NULL;
  SRowMerger merge = {0};

  int32_t code = tsdbRowMergerInit(&merge, &fRow, pReader->pSchema);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);

  code = tsdbRowMergerGetRow(&merge, &pTSRow);
  if (code == TSDB_CODE_SUCCESS) {
    doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo);
    taosMemoryFree(pTSRow);
  }

  // release the merger whether or not the merged row could be built
  tsdbRowMergerClear(&merge);
  return code;
}

H
Haojun Liao 已提交
2500 2501
// Produce the next result row of one table by dispatching to the merge routine that matches the sources which
// currently hold data: both buffers, only imem, only mem, or files (data block + last block) alone.
// Returns the code of the selected merge routine.
static int32_t buildComposedDataBlockImpl(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo,
                                          SBlockData* pBlockData, SLastBlockReader* pLastBlockReader) {
  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;

  // current key of the file data block, INT64_MIN when the block is empty or exhausted
  int64_t key = INT64_MIN;
  if (pBlockData->nRow > 0 && (!pDumpInfo->allDumped)) {
    key = pBlockData->aTSKEY[pDumpInfo->rowIndex];
  }

  TSDBROW* pMemRow = NULL;
  if (pBlockScanInfo->iter.hasVal) {
    pMemRow = getValidMemRow(&pBlockScanInfo->iter, pBlockScanInfo->delSkyline, pReader);
  }

  TSDBROW* pIMemRow = NULL;
  if (pBlockScanInfo->iiter.hasVal) {
    pIMemRow = getValidMemRow(&pBlockScanInfo->iiter, pBlockScanInfo->delSkyline, pReader);
  }

  // mem + imem (+ file + last block)
  if (pMemRow != NULL && pIMemRow != NULL) {
    return doMergeMultiLevelRows(pReader, pBlockScanInfo, pBlockData, pLastBlockReader);
  }

  // hasVal is read again on purpose: it is evaluated after the getValidMemRow() calls above
  // imem + file + last block
  if (pBlockScanInfo->iiter.hasVal) {
    return doMergeBufAndFileRows(pReader, pBlockScanInfo, pIMemRow, &pBlockScanInfo->iiter, key, pLastBlockReader);
  }

  // mem + file + last block
  if (pBlockScanInfo->iter.hasVal) {
    return doMergeBufAndFileRows(pReader, pBlockScanInfo, pMemRow, &pBlockScanInfo->iter, key, pLastBlockReader);
  }

  // file data block + last block only
  return mergeFileBlockAndLastBlock(pReader, pLastBlockReader, key, pBlockScanInfo, pBlockData);
}

H
Haojun Liao 已提交
2533 2534 2535 2536 2537 2538 2539 2540 2541 2542 2543 2544 2545 2546 2547 2548 2549 2550 2551 2552 2553 2554 2555 2556 2557 2558 2559 2560 2561 2562 2563 2564 2565 2566 2567 2568 2569 2570 2571 2572
// If the current file block overlaps with the neighbor block of the same table in scan direction, make that
// neighbor the active block, load its data and reset the dump info.
// *loadNeighbor is set to true only when the neighbor was loaded. Returns TSDB_CODE_SUCCESS or the error of
// doLoadFileBlockData().
static int32_t loadNeighborIfOverlap(SFileDataBlockInfo* pBlockInfo, STableBlockScanInfo* pBlockScanInfo,
                                     STsdbReader* pReader, bool* loadNeighbor) {
  SReaderStatus*  pStatus = &pReader->status;
  SDataBlockIter* pBlockIter = &pStatus->blockIter;
  int32_t         direction = ASCENDING_TRAVERSE(pReader->order) ? 1 : -1;

  *loadNeighbor = false;
  SDataBlk* pCurBlock = getCurrentBlock(pBlockIter);

  int32_t     neighborTbIdx = -1;
  SBlockIndex neighborBlkIdx = {0};
  if (!getNeighborBlockOfSameTable(pBlockInfo, pBlockScanInfo, &neighborTbIdx, pReader->order, &neighborBlkIdx)) {
    return TSDB_CODE_SUCCESS;  // no neighbor, nothing to do
  }

  if (!overlapWithNeighborBlock(pCurBlock, &neighborBlkIdx, pReader->order)) {
    return TSDB_CODE_SUCCESS;  // neighbor exists but does not overlap
  }

  // 1. find the neighbor block in the scan block list
  SFileDataBlockInfo target = {.uid = pBlockInfo->uid, .tbBlockIdx = neighborTbIdx};
  int32_t            posInScanList = findFileBlockInfoIndex(pBlockIter, &target);

  // 2. remove it from the scan block list
  setFileBlockActiveInBlockIter(pBlockIter, posInScanList, direction);

  // 3. load the neighbor block, and set it to be the currently accessed file data block
  int32_t code = doLoadFileBlockData(pReader, pBlockIter, &pStatus->fileBlockData, pBlockInfo->uid);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  // 4. restart dumping from the newly loaded block
  initBlockDumpInfo(pReader, pBlockIter);
  *loadNeighbor = true;
  return code;
}

2573 2574 2575 2576 2577 2578 2579 2580 2581 2582 2583 2584 2585
// Finalize the result block after a composed block was built: record the owning table uid (0 when no scan info is
// available), mark the data as loaded, refresh the timestamp window, flag the block as composed and update the cost
// counters with the elapsed time `el`.
static void updateComposedBlockInfo(STsdbReader* pReader, double el, STableBlockScanInfo* pBlockScanInfo) {
  SSDataBlock* pBlock = pReader->pResBlock;

  if (pBlockScanInfo != NULL) {
    pBlock->info.id.uid = pBlockScanInfo->uid;
  } else {
    pBlock->info.id.uid = 0;
  }

  pBlock->info.dataLoad = 1;
  blockDataUpdateTsWindow(pBlock, pReader->suppInfo.slotId[0]);

  setComposedBlockFlag(pReader, true);

  pReader->cost.composedBlocks += 1;
  pReader->cost.buildComposedBlockTime += el;
}

2586
// Fill pReader->pResBlock with rows of the current table, composed from the file data block, the last block and the
// mem/imem buffers.
//
// A "clean" file block that fits into the result block is copied as a whole (for a descending scan only when the
// last block reader holds no data). Otherwise rows are produced one by one through buildComposedDataBlockImpl()
// until the file block is consumed, no further valid file row exists, or the result block reached its capacity.
//
// Returns TSDB_CODE_SUCCESS, TSDB_CODE_INVALID_PARA when the uid of the current block is not in the table map, or
// the first error reported by loadNeighborIfOverlap() / buildComposedDataBlockImpl(). The result block info and the
// cost counters are updated on every exit path.
static int32_t buildComposedDataBlock(STsdbReader* pReader) {
  int32_t code = TSDB_CODE_SUCCESS;

  SSDataBlock*        pResBlock = pReader->pResBlock;
  SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(&pReader->status.blockIter);
  SLastBlockReader*   pLastBlockReader = pReader->status.fileIter.pLastBlockReader;
  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
  SBlockData*         pBlockData = &pReader->status.fileBlockData;

  bool    asc = ASCENDING_TRAVERSE(pReader->order);
  int64_t st = taosGetTimestampUs();
  int32_t step = asc ? 1 : -1;
  double  el = 0;

  STableBlockScanInfo* pBlockScanInfo = NULL;
  if (pBlockInfo != NULL) {
    void* p = taosHashGet(pReader->status.pTableMap, &pBlockInfo->uid, sizeof(pBlockInfo->uid));
    if (p == NULL) {
      code = TSDB_CODE_INVALID_PARA;
      tsdbError("failed to locate the uid:%" PRIu64 " in query table uid list, total tables:%d, %s", pBlockInfo->uid,
                taosHashGetSize(pReader->status.pTableMap), pReader->idStr);
      goto _end;
    }

    pBlockScanInfo = *(STableBlockScanInfo**)p;

    SDataBlk* pBlock = getCurrentBlock(&pReader->status.blockIter);
    TSDBKEY   keyInBuf = getCurrentKeyInBuf(pBlockScanInfo, pReader);

    // it is a clean block, load it directly
    if (isCleanFileDataBlock(pReader, pBlockInfo, pBlock, pBlockScanInfo, keyInBuf, pLastBlockReader) &&
        pBlock->nRow <= pReader->capacity) {
      if (asc || (!hasDataInLastBlock(pLastBlockReader))) {
        copyBlockDataToSDataBlock(pReader);

        // record the last key value
        pBlockScanInfo->lastKey = asc ? pBlock->maxKey.ts : pBlock->minKey.ts;
        goto _end;
      }
    }
  } else {  // file blocks do not exist
    pBlockScanInfo = *pReader->status.pTableIter;
  }

  while (1) {
    bool hasBlockData = false;

    // find the first qualified row in the data block
    while (pBlockData->nRow > 0 && pBlockData->uid == pBlockScanInfo->uid) {
      if (isValidFileBlockRow(pBlockData, pDumpInfo, pBlockScanInfo, pReader)) {
        hasBlockData = true;
        break;
      }

      pDumpInfo->rowIndex += step;

      SDataBlk* pBlock = getCurrentBlock(&pReader->status.blockIter);
      if (pDumpInfo->rowIndex >= pBlock->nRow || pDumpInfo->rowIndex < 0) {
        pBlockInfo = getCurrentBlockInfo(&pReader->status.blockIter);  // NOTE: get the new block info

        // continue with the next file block if the last ts in the current block
        // overlaps with the next neighbor block
        bool loadNeighbor = false;
        code = loadNeighborIfOverlap(pBlockInfo, pBlockScanInfo, pReader, &loadNeighbor);
        if ((!loadNeighbor) || (code != 0)) {
          setBlockAllDumped(pDumpInfo, pBlock->maxKey.ts, pReader->order);
          break;
        }
      }
    }

    // no valid row left in the file data block (or loading the neighbor failed), no need to proceed
    if (!hasBlockData) {
      break;
    }

    // stop on the first merge failure instead of silently continuing with the next row
    code = buildComposedDataBlockImpl(pReader, pBlockScanInfo, pBlockData, pLastBlockReader);
    if (code != TSDB_CODE_SUCCESS) {
      goto _end;
    }

    // currently loaded file data block is consumed
    if ((pBlockData->nRow > 0) && (pDumpInfo->rowIndex >= pBlockData->nRow || pDumpInfo->rowIndex < 0)) {
      SDataBlk* pBlock = getCurrentBlock(&pReader->status.blockIter);
      setBlockAllDumped(pDumpInfo, pBlock->maxKey.ts, pReader->order);
      break;
    }

    if (pResBlock->info.rows >= pReader->capacity) {
      break;
    }
  }

_end:
  el = (taosGetTimestampUs() - st) / 1000.0;
  updateComposedBlockInfo(pReader, el, pBlockScanInfo);

  if (pResBlock->info.rows > 0) {
    // the window keys are signed timestamps, print them as such
    tsdbDebug("%p uid:%" PRIu64 ", composed data block created, brange:%" PRId64 "-%" PRId64
              " rows:%d, elapsed time:%.2f ms %s",
              pReader, pResBlock->info.id.uid, pResBlock->info.window.skey, pResBlock->info.window.ekey,
              pResBlock->info.rows, el, pReader->idStr);
  }

  return code;
}

// Record in the reader status whether the current result block is a composed data block.
void setComposedBlockFlag(STsdbReader* pReader, bool composed) {
  SReaderStatus* pStatus = &pReader->status;
  pStatus->composedDataBlock = composed;
}

2695 2696 2697 2698 2699 2700 2701 2702
// Return the index in the delete skyline at which a scan starts: 0 when there is no skyline or the scan is
// ascending, otherwise the index of the last skyline entry.
int32_t getInitialDelIndex(const SArray* pDelSkyline, int32_t order) {
  if (pDelSkyline == NULL || ASCENDING_TRAVERSE(order)) {
    return 0;
  }

  return taosArrayGetSize(pDelSkyline) - 1;
}

dengyihao's avatar
dengyihao 已提交
2703 2704
// Build the delete skyline of one table and reset all delete indexes of its scan info.
//
// Delete records are gathered from the delete file (when the reader has a delete index containing this table) and
// from the pHead lists of the mem/imem table data; either table data pointer may be NULL. When at least one record
// exists, pBlockScanInfo->delSkyline is created from them. The call is a no-op when a skyline already exists.
//
// Returns TSDB_CODE_SUCCESS, TSDB_CODE_OUT_OF_MEMORY when an array cannot be allocated or extended, or the error of
// tsdbReadDelData() / tsdbBuildDeleteSkyline().
int32_t initDelSkylineIterator(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader, STbData* pMemTbData,
                               STbData* piMemTbData) {
  if (pBlockScanInfo->delSkyline != NULL) {
    return TSDB_CODE_SUCCESS;
  }

  int32_t code = TSDB_CODE_SUCCESS;
  SArray* pDelData = taosArrayInit(4, sizeof(SDelData));
  if (pDelData == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  // 1. delete records stored in the delete file
  SDelFile* pDelFile = pReader->pReadSnap->fs.pDelFile;
  if (pDelFile && taosArrayGetSize(pReader->pDelIdx) > 0) {
    SDelIdx  idx = {.suid = pReader->suid, .uid = pBlockScanInfo->uid};
    SDelIdx* pIdx = taosArraySearch(pReader->pDelIdx, &idx, tCmprDelIdx, TD_EQ);

    if (pIdx != NULL) {
      code = tsdbReadDelData(pReader->pDelFReader, pIdx, pDelData);
      if (code != TSDB_CODE_SUCCESS) {
        goto _err;
      }
    }
  }

  // 2. delete records kept in the mem and imem table data
  STbData* aTbData[2] = {pMemTbData, piMemTbData};
  for (int32_t i = 0; i < 2; ++i) {
    if (aTbData[i] == NULL) {
      continue;
    }

    for (SDelData* p = aTbData[i]->pHead; p != NULL; p = p->pNext) {
      if (taosArrayPush(pDelData, p) == NULL) {
        code = TSDB_CODE_OUT_OF_MEMORY;
        goto _err;
      }
    }
  }

  // 3. build the skyline only when there is something to delete
  if (taosArrayGetSize(pDelData) > 0) {
    pBlockScanInfo->delSkyline = taosArrayInit(4, sizeof(TSDBKEY));
    if (pBlockScanInfo->delSkyline == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto _err;
    }

    code = tsdbBuildDeleteSkyline(pDelData, 0, (int32_t)(taosArrayGetSize(pDelData) - 1), pBlockScanInfo->delSkyline);
  }

  taosArrayDestroy(pDelData);

  // every consumer of the skyline starts from the same position
  int32_t index = getInitialDelIndex(pBlockScanInfo->delSkyline, pReader->order);

  pBlockScanInfo->iter.index = index;
  pBlockScanInfo->iiter.index = index;
  pBlockScanInfo->fileDelIndex = index;
  pBlockScanInfo->lastBlockDelIndex = index;

  return code;

_err:
  taosArrayDestroy(pDelData);
  return code;
}

C
Cary Xu 已提交
2762
// Return the key of the row that the buffers (mem and imem) would deliver next for this table.
// With rows in both buffers the smaller timestamp wins for an ascending scan and the larger one for a descending
// scan (on a tie: mem key when ascending, imem key when descending). With a row in only one buffer its key is
// returned; with no row at all the result carries TSKEY_INITIAL_VAL as timestamp.
TSDBKEY getCurrentKeyInBuf(STableBlockScanInfo* pScanInfo, STsdbReader* pReader) {
  TSDBKEY memKey = {.ts = TSKEY_INITIAL_VAL};
  TSDBKEY imemKey = {.ts = TSKEY_INITIAL_VAL};

  TSDBROW* pMemRow = getValidMemRow(&pScanInfo->iter, pScanInfo->delSkyline, pReader);
  if (pMemRow != NULL) {
    memKey = TSDBROW_KEY(pMemRow);
  }

  TSDBROW* pIMemRow = getValidMemRow(&pScanInfo->iiter, pScanInfo->delSkyline, pReader);
  if (pIMemRow != NULL) {
    imemKey = TSDBROW_KEY(pIMemRow);
  }

  // nothing in mem: imemKey is either the imem row key or still the initial value
  if (pMemRow == NULL) {
    return imemKey;
  }

  // nothing in imem
  if (pIMemRow == NULL) {
    return memKey;
  }

  // data in both mem & imem
  bool memNotLater = (memKey.ts <= imemKey.ts);
  if (ASCENDING_TRAVERSE(pReader->order)) {
    return memNotLater ? memKey : imemKey;
  }

  return memNotLater ? imemKey : memKey;
}

2798
// Advance the file set iterator to the next file set that holds data blocks or last files for the queried tables,
// and report the counts in pBlockNum (both 0 when no such file set is left).
//
// Afterwards, when the read snapshot references a delete file and no delete file reader is open yet, the delete
// file is opened and its index is loaded into pReader->pDelIdx.
//
// Returns TSDB_CODE_SUCCESS, TSDB_CODE_OUT_OF_MEMORY when an array cannot be allocated, or the first error reported
// by a callee. After a failed tsdbReadDelIdx() the destroyed pReader->pDelIdx is reset to NULL so that no dangling
// pointer is left in the reader.
static int32_t moveToNextFile(STsdbReader* pReader, SBlockNumber* pBlockNum) {
  SReaderStatus* pStatus = &pReader->status;
  int32_t        code = TSDB_CODE_SUCCESS;

  pBlockNum->numOfBlocks = 0;
  pBlockNum->numOfLastFiles = 0;

  size_t  numOfTables = taosHashGetSize(pReader->status.pTableMap);
  SArray* pIndexList = taosArrayInit(numOfTables, sizeof(SBlockIdx));
  if (pIndexList == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  while (1) {
    bool hasNext = false;
    code = filesetIteratorNext(&pStatus->fileIter, pReader, &hasNext);
    if (code != TSDB_CODE_SUCCESS || !hasNext) {  // failure, or no more data files on disk
      break;
    }

    taosArrayClear(pIndexList);
    code = doLoadBlockIndex(pReader, pReader->pFileReader, pIndexList);
    if (code != TSDB_CODE_SUCCESS) {
      break;
    }

    if (taosArrayGetSize(pIndexList) > 0 || pReader->pFileReader->pSet->nSttF > 0) {
      code = doLoadFileBlock(pReader, pIndexList, pBlockNum);
      if (code != TSDB_CODE_SUCCESS) {
        break;
      }

      if (pBlockNum->numOfBlocks + pBlockNum->numOfLastFiles > 0) {
        break;
      }
    }

    // no blocks in current file, try next files
  }

  taosArrayDestroy(pIndexList);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  if (pReader->pReadSnap == NULL) {
    return TSDB_CODE_SUCCESS;
  }

  SDelFile* pDelFile = pReader->pReadSnap->fs.pDelFile;
  if (pReader->pDelFReader != NULL || pDelFile == NULL) {
    return TSDB_CODE_SUCCESS;  // delete file reader already opened, or there is no delete file
  }

  code = tsdbDelFReaderOpen(&pReader->pDelFReader, pDelFile, pReader->pTsdb);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  pReader->pDelIdx = taosArrayInit(4, sizeof(SDelIdx));
  if (pReader->pDelIdx == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  code = tsdbReadDelIdx(pReader->pDelFReader, pReader->pDelIdx);
  if (code != TSDB_CODE_SUCCESS) {
    taosArrayDestroy(pReader->pDelIdx);
    pReader->pDelIdx = NULL;  // do not keep a pointer to the destroyed array
    return code;
  }

  return TSDB_CODE_SUCCESS;
}

X
Xiaoyu Wang 已提交
2867
// Rewind the table cursor: select the first uid of the uid list and point pTableIter at its entry in the table map.
static void resetTableListIndex(SReaderStatus* pStatus) {
  STableUidList* pUidList = &pStatus->uidList;
  pUidList->currentIndex = 0;

  uint64_t firstUid = pUidList->tableUidList[0];
  pStatus->pTableIter = taosHashGet(pStatus->pTableMap, &firstUid, sizeof(firstUid));
}

2875
// Advance the uid-list cursor by one and look the new uid up in the table map.
// Returns true when pStatus->pTableIter ends up non-NULL. When the cursor runs past the number of
// entries in the map, pTableIter is cleared and false is returned.
static bool moveToNextTable(STableUidList* pOrderedCheckInfo, SReaderStatus* pStatus) {
  ++pOrderedCheckInfo->currentIndex;

  if (pOrderedCheckInfo->currentIndex < taosHashGetSize(pStatus->pTableMap)) {
    uint64_t nextUid = pOrderedCheckInfo->tableUidList[pOrderedCheckInfo->currentIndex];
    pStatus->pTableIter = taosHashGet(pStatus->pTableMap, &nextUid, sizeof(nextUid));
  } else {
    pStatus->pTableIter = NULL;
  }

  return pStatus->pTableIter != NULL;
}

2887
// Walk the tables in uid-list order, starting at the current pTableIter, and fill pReader->pResBlock
// from the last-block reader of the first table that yields rows.
// Every visible path returns TSDB_CODE_SUCCESS; the caller tells "produced rows" from "all tables
// exhausted" by inspecting pResBlock->info.rows.
static int32_t doLoadLastBlockSequentially(STsdbReader* pReader) {
  SReaderStatus*    pStatus = &pReader->status;
  SLastBlockReader* pLastBlockReader = pStatus->fileIter.pLastBlockReader;
  STableUidList*    pUidList = &pStatus->uidList;

  if (taosHashGetSize(pStatus->pTableMap) == 0) {
    return TSDB_CODE_SUCCESS;
  }

  SSDataBlock* pOut = pReader->pResBlock;

  for (;;) {
    // pTableIter points at a map slot that holds a STableBlockScanInfo*
    STableBlockScanInfo* pTbInfo = *(STableBlockScanInfo**)pStatus->pTableIter;

    if (initLastBlockReader(pLastBlockReader, pTbInfo, pReader)) {
      int64_t startUs = taosGetTimestampUs();

      // keep composing rows until the last block is drained or the result block is full
      while (hasDataInLastBlock(pLastBlockReader)) {
        buildComposedDataBlockImpl(pReader, pTbInfo, &pReader->status.fileBlockData, pLastBlockReader);
        if (pOut->info.rows >= pReader->capacity) {
          break;
        }
      }

      double elapsedMs = (taosGetTimestampUs() - startUs) / 1000.0;
      updateComposedBlockInfo(pReader, elapsedMs, pTbInfo);

      if (pOut->info.rows > 0) {
        tsdbDebug("%p uid:%" PRIu64 ", composed data block created, brange:%" PRIu64 "-%" PRIu64
                  " rows:%d, elapsed time:%.2f ms %s",
                  pReader, pOut->info.id.uid, pOut->info.window.skey, pOut->info.window.ekey, pOut->info.rows,
                  elapsedMs, pReader->idStr);
        return TSDB_CODE_SUCCESS;
      }
    }

    // nothing usable for this table (no last-block data, or it produced no rows): try the next one
    if (!moveToNextTable(pUidList, pStatus)) {
      return TSDB_CODE_SUCCESS;
    }
  }
}

2946
// Produce the next result block for the file block the block iterator currently points at.
// Depending on the checks below, one of four things happens:
//   1. fileBlockShouldLoad() is true: the file block is loaded and a composed block is built;
//   2. bufferDataInFileBlockGap() is true: rows are taken from the memory buffers up to the block edge;
//   3. descending scan with pending last-block data: only rows from the last block are emitted;
//   4. otherwise: only the block's metadata (row count, uid, time window) is published in
//      pResBlock->info with dataLoad = 0, without loading the block's rows.
// Returns TSDB_CODE_INVALID_PARA when the table of the current block is not in the table map, otherwise
// the code of the build step that ran.
static int32_t doBuildDataBlock(STsdbReader* pReader) {
  int32_t   code = TSDB_CODE_SUCCESS;
  SDataBlk* pBlock = NULL;

  SReaderStatus*       pStatus = &pReader->status;
  SDataBlockIter*      pBlockIter = &pStatus->blockIter;
  STableBlockScanInfo* pScanInfo = NULL;
  SFileDataBlockInfo*  pBlockInfo = getCurrentBlockInfo(pBlockIter);
  SLastBlockReader*    pLastBlockReader = pReader->status.fileIter.pLastBlockReader;

  ASSERT(pBlockInfo != NULL);

  // The map slot holds a STableBlockScanInfo*. Check the slot itself before dereferencing it, otherwise a
  // missing uid crashes here instead of reaching the error path below.
  STableBlockScanInfo** ppScanInfo =
      taosHashGet(pReader->status.pTableMap, &pBlockInfo->uid, sizeof(pBlockInfo->uid));
  if (ppScanInfo == NULL || *ppScanInfo == NULL) {
    tsdbError("failed to get table scan-info, %s", pReader->idStr);
    code = TSDB_CODE_INVALID_PARA;
    return code;
  }

  pScanInfo = *ppScanInfo;
  pBlock = getCurrentBlock(pBlockIter);

  initLastBlockReader(pLastBlockReader, pScanInfo, pReader);
  TSDBKEY keyInBuf = getCurrentKeyInBuf(pScanInfo, pReader);

  if (fileBlockShouldLoad(pReader, pBlockInfo, pBlock, pScanInfo, keyInBuf, pLastBlockReader)) {
    code = doLoadFileBlockData(pReader, pBlockIter, &pStatus->fileBlockData, pScanInfo->uid);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    // build composed data block
    code = buildComposedDataBlock(pReader);
  } else if (bufferDataInFileBlockGap(pReader->order, keyInBuf, pBlock)) {
    // data in memory that are earlier than current file block
    // rows in buffer should be less than the file block in asc, greater than file block in desc
    int64_t endKey = (ASCENDING_TRAVERSE(pReader->order)) ? pBlock->minKey.ts : pBlock->maxKey.ts;
    code = buildDataBlockFromBuf(pReader, pScanInfo, endKey);
  } else {
    if (hasDataInLastBlock(pLastBlockReader) && !ASCENDING_TRAVERSE(pReader->order)) {
      // only return the rows in last block
      int64_t tsLast = getCurrentKeyInLastBlock(pLastBlockReader);
      ASSERT(tsLast >= pBlock->maxKey.ts);

      SBlockData* pBData = &pReader->status.fileBlockData;
      tBlockDataReset(pBData);

      SSDataBlock* pResBlock = pReader->pResBlock;
      tsdbDebug("load data in last block firstly, due to desc scan data, %s", pReader->idStr);

      int64_t st = taosGetTimestampUs();

      while (1) {
        bool hasBlockLData = hasDataInLastBlock(pLastBlockReader);

        // no data in last block and block, no need to proceed.
        if (hasBlockLData == false) {
          break;
        }

        buildComposedDataBlockImpl(pReader, pScanInfo, &pReader->status.fileBlockData, pLastBlockReader);
        if (pResBlock->info.rows >= pReader->capacity) {
          break;
        }
      }

      double el = (taosGetTimestampUs() - st) / 1000.0;
      updateComposedBlockInfo(pReader, el, pScanInfo);

      if (pResBlock->info.rows > 0) {
        tsdbDebug("%p uid:%" PRIu64 ", composed data block created, brange:%" PRIu64 "-%" PRIu64
                  " rows:%d, elapsed time:%.2f ms %s",
                  pReader, pResBlock->info.id.uid, pResBlock->info.window.skey, pResBlock->info.window.ekey,
                  pResBlock->info.rows, el, pReader->idStr);
      }
    } else {  // whole block is required, return it directly
      SDataBlockInfo* pInfo = &pReader->pResBlock->info;
      pInfo->rows = pBlock->nRow;
      pInfo->id.uid = pScanInfo->uid;
      pInfo->dataLoad = 0;
      pInfo->window = (STimeWindow){.skey = pBlock->minKey.ts, .ekey = pBlock->maxKey.ts};
      setComposedBlockFlag(pReader, false);
      setBlockAllDumped(&pStatus->fBlockDumpInfo, pBlock->maxKey.ts, pReader->order);

      // update the last key for the corresponding table
      pScanInfo->lastKey = ASCENDING_TRAVERSE(pReader->order) ? pInfo->window.ekey : pInfo->window.skey;
      tsdbDebug("%p uid:%" PRIu64
                " clean file block retrieved from file, global index:%d, "
                "table index:%d, rows:%d, brange:%" PRId64 "-%" PRId64 ", %s",
                pReader, pScanInfo->uid, pBlockIter->index, pBlockInfo->tbBlockIdx, pBlock->nRow, pBlock->minKey.ts,
                pBlock->maxKey.ts, pReader->idStr);
    }
  }

  return code;
}

H
Haojun Liao 已提交
3042
// Build the next result block purely from the in-memory buffers, visiting tables in uid-list order
// starting at the current pTableIter. Stops at the first table that yields rows or at the first
// error; returns TSDB_CODE_SUCCESS with an empty result block once no table is left.
static int32_t buildBlockFromBufferSequentially(STsdbReader* pReader) {
  SReaderStatus* pStatus = &pReader->status;
  STableUidList* pUidList = &pStatus->uidList;

  do {
    STableBlockScanInfo** ppScanInfo = pStatus->pTableIter;
    initMemDataIterator(*ppScanInfo, pReader);

    // no upper/lower bound on the key: drain whatever the buffers hold for this table
    int64_t endKey = (ASCENDING_TRAVERSE(pReader->order)) ? INT64_MAX : INT64_MIN;
    int32_t code = buildDataBlockFromBuf(pReader, *ppScanInfo, endKey);
    if (code != TSDB_CODE_SUCCESS || pReader->pResBlock->info.rows > 0) {
      return code;
    }

    // current table is exhausted, let's try next table
  } while (moveToNextTable(pUidList, pStatus));

  return TSDB_CODE_SUCCESS;
}

3075
// set the correct start position in case of the first/last file block, according to the query time window
3076
static void initBlockDumpInfo(STsdbReader* pReader, SDataBlockIter* pBlockIter) {
3077 3078 3079 3080 3081 3082 3083 3084
  int64_t             lastKey = ASCENDING_TRAVERSE(pReader->order) ? INT64_MIN : INT64_MAX;
  SDataBlk*           pBlock = getCurrentBlock(pBlockIter);
  SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(pBlockIter);
  if (pBlockInfo) {
    STableBlockScanInfo* pScanInfo = taosHashGet(pBlockIter->pTableMap, &pBlockInfo->uid, sizeof(pBlockInfo->uid));
    if (pScanInfo) {
      lastKey = pScanInfo->lastKey;
    }
3085
  }
3086 3087 3088
  SReaderStatus* pStatus = &pReader->status;

  SFileBlockDumpInfo* pDumpInfo = &pStatus->fBlockDumpInfo;
3089 3090 3091

  pDumpInfo->totalRows = pBlock->nRow;
  pDumpInfo->allDumped = false;
3092
  pDumpInfo->rowIndex = ASCENDING_TRAVERSE(pReader->order) ? 0 : pBlock->nRow - 1;
3093
  pDumpInfo->lastKey = lastKey;
3094 3095
}

3096
// Move to the next file set and prepare the reader state for its first block.
// When moveToNextFile() reports neither data blocks nor last files, status.loadFromFile is cleared
// and the function returns early. Otherwise the block iterator is initialised (or reset, together
// with the table cursor, when only last files exist) and the dump info is re-initialised.
static int32_t initForFirstBlockInFile(STsdbReader* pReader, SDataBlockIter* pBlockIter) {
  SBlockNumber blockNum = {0};

  int32_t code = moveToNextFile(pReader, &blockNum);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  // all data files are consumed, try data in buffer
  if (blockNum.numOfBlocks + blockNum.numOfLastFiles == 0) {
    pReader->status.loadFromFile = false;
    return code;
  }

  if (blockNum.numOfBlocks <= 0) {
    // no block data, only last block exists
    tBlockDataReset(&pReader->status.fileBlockData);
    resetDataBlockIterator(pBlockIter, pReader->order);
    resetTableListIndex(&pReader->status);
  } else {
    // initialize the block iterator for a new fileset
    code = initBlockIterator(pReader, pBlockIter, blockNum.numOfBlocks);
  }

  // set the correct start position according to the query time window
  initBlockDumpInfo(pReader, pBlockIter);
  return code;
}

3123
// True when the current file block has been started but not finished: it is not flagged as fully
// dumped and the row cursor has already left its starting position (row 0 for ascending scans,
// the last row for descending scans).
static bool fileBlockPartiallyRead(SFileBlockDumpInfo* pDumpInfo, bool asc) {
  if (pDumpInfo->allDumped) {
    return false;
  }

  if (asc) {
    return pDumpInfo->rowIndex > 0;
  }

  return pDumpInfo->rowIndex < (pDumpInfo->totalRows - 1);
}

3128
// Produce the next non-empty result block from the data files.
// The function alternates between two sources:
//   - the "_begin" section drains last-block data table by table, moving on to the next file set
//     once the table cursor is exhausted;
//   - the main loop consumes the data blocks of the current file set through the block iterator.
// It returns as soon as pResBlock holds rows, an error occurs, or initForFirstBlockInFile() has
// cleared status.loadFromFile (no file left).
static int32_t buildBlockFromFiles(STsdbReader* pReader) {
  int32_t         code = TSDB_CODE_SUCCESS;
  bool            asc = ASCENDING_TRAVERSE(pReader->order);
  SReaderStatus*  pStatus = &pReader->status;
  SDataBlockIter* pBlockIter = &pStatus->blockIter;

  if (pBlockIter->numOfBlocks == 0) {
  _begin:
    code = doLoadLastBlockSequentially(pReader);
    if (code != TSDB_CODE_SUCCESS || pReader->pResBlock->info.rows > 0) {
      return code;
    }

    // all data blocks are checked in this last block file, now let's try the next file
    if (pStatus->pTableIter == NULL) {
      code = initForFirstBlockInFile(pReader, pBlockIter);

      // error happens or all the data files are completely checked
      if (code != TSDB_CODE_SUCCESS || !pStatus->loadFromFile) {
        return code;
      }

      // the new file set has no data blocks, check its last block file if it exists
      if (pBlockIter->numOfBlocks == 0) {
        resetTableListIndex(pStatus);
        goto _begin;
      }
    }

    code = doBuildDataBlock(pReader);
    if (code != TSDB_CODE_SUCCESS || pReader->pResBlock->info.rows > 0) {
      return code;
    }
  }

  for (;;) {
    SFileBlockDumpInfo* pDumpInfo = &pStatus->fBlockDumpInfo;

    if (fileBlockPartiallyRead(pDumpInfo, asc)) {
      // file data block is partially loaded
      code = buildComposedDataBlock(pReader);
    } else {
      // current block is exhausted, try the next file block
      if (pDumpInfo->allDumped) {
        if (blockIteratorNext(pBlockIter, pReader->idStr)) {
          // next block in the block access order list
          initBlockDumpInfo(pReader, pBlockIter);
        } else if (pStatus->pCurrentFileset->nSttF > 0) {
          // data blocks of the current file are exhausted, switch over to its last block files
          SBlockData* pBlockData = &pStatus->fileBlockData;
          if (pBlockData->uid != 0) {
            tBlockDataClear(pBlockData);
          }

          tBlockDataReset(pBlockData);
          resetDataBlockIterator(pBlockIter, pReader->order);
          resetTableListIndex(pStatus);
          goto _begin;
        } else {
          code = initForFirstBlockInFile(pReader, pBlockIter);

          // error happens or all the data files are completely checked
          if (code != TSDB_CODE_SUCCESS || !pStatus->loadFromFile) {
            return code;
          }

          // this file does not have blocks, let's start check the last block file
          if (pBlockIter->numOfBlocks == 0) {
            resetTableListIndex(pStatus);
            goto _begin;
          }
        }
      }

      code = doBuildDataBlock(pReader);
    }

    if (code != TSDB_CODE_SUCCESS || pReader->pResBlock->info.rows > 0) {
      return code;
    }
  }
}
H
refact  
Hongze Cheng 已提交
3224

3225 3226
// Pick the tsdb instance that should serve a query whose window starts at winSKey.
// For a non-RSMA vnode this is always VND_TSDB(pVnode) and *pLevel is left untouched.
// For an RSMA vnode the retention levels are scanned from level 0 upwards; the first level whose
// keep span still covers (winSKey + tolerance offset) is selected, and a level with keep <= 0 falls
// back to the previous one. The selected level is stored in *pLevel.
static STsdb* getTsdbByRetentions(SVnode* pVnode, TSKEY winSKey, SRetention* retentions, const char* idStr,
                                  int8_t* pLevel) {
  if (!VND_IS_RSMA(pVnode)) {
    return VND_TSDB(pVnode);
  }

  int8_t  level = 0;
  int8_t  precision = pVnode->config.tsdbCfg.precision;
  int64_t now = taosGetTimestamp(precision);

  // scale factor applied to tsQueryRsmaTolerance, chosen by the vnode's time precision
  long scale;
  if (precision == TSDB_TIME_PRECISION_MILLI) {
    scale = 1L;
  } else if (precision == TSDB_TIME_PRECISION_MICRO) {
    scale = 1000L;
  } else {
    scale = 1000000L;
  }
  int64_t offset = tsQueryRsmaTolerance * scale;

  int8_t visited = 0;
  while (visited < TSDB_RETENTION_MAX) {
    SRetention* pRetention = &retentions[level];

    if (pRetention->keep <= 0) {
      // this level is not configured: use the previous one, if any
      if (level > 0) {
        --level;
      }
      break;
    }

    if ((now - pRetention->keep) <= (winSKey + offset)) {
      break;
    }

    ++level;
    ++visited;
  }

  const char* str = (idStr != NULL) ? idStr : "";

  if (level == TSDB_RETENTION_L0) {
    *pLevel = TSDB_RETENTION_L0;
    tsdbDebug("vgId:%d, rsma level %d is selected to query %s", TD_VID(pVnode), TSDB_RETENTION_L0, str);
    return VND_RSMA0(pVnode);
  }

  if (level == TSDB_RETENTION_L1) {
    *pLevel = TSDB_RETENTION_L1;
    tsdbDebug("vgId:%d, rsma level %d is selected to query %s", TD_VID(pVnode), TSDB_RETENTION_L1, str);
    return VND_RSMA1(pVnode);
  }

  *pLevel = TSDB_RETENTION_L2;
  tsdbDebug("vgId:%d, rsma level %d is selected to query %s", TD_VID(pVnode), TSDB_RETENTION_L2, str);
  return VND_RSMA2(pVnode);
}

H
Haojun Liao 已提交
3269
// Translate the version bounds of a query condition into a concrete [minVer, maxVer] range.
// A start version of -1 maps to 0. An end version of -1, or one beyond the vnode's applied
// version, is replaced by pVnode->state.applied. The level argument is not used here.
SVersionRange getQueryVerRange(SVnode* pVnode, SQueryTableDataCond* pCond, int8_t level) {
  int64_t minVer = pCond->startVersion;
  if (pCond->startVersion == -1) {
    minVer = 0;
  }

  int64_t maxVer = 0;
  if (pCond->endVersion == -1 || pCond->endVersion > pVnode->state.applied) {
    // end version not specified by the user, or ahead of what has been applied
    maxVer = pVnode->state.applied;
  } else {
    maxVer = pCond->endVersion;
  }

  return (SVersionRange){.minVer = minVer, .maxVer = maxVer};
}

3283
// Decide whether the row identified by pKey (timestamp + version) is covered by the delete list.
//
// pDelList  list of TSDBKEY points; consecutive points [i, i+1] are treated as one range whose
//           deleting version is the version stored at point i. May be NULL or empty.
// index     in/out cursor into pDelList; it is advanced (ascending) or moved back (descending) while
//           the list is walked, so repeated calls with ordered keys continue where they stopped.
// pKey      key of the row under test.
// order     scan order; selects the walking direction.
// pVerRange query version range; only consulted on the ascending paths.
//
// Returns true when a covering range with a version >= pKey->version is found.
// NOTE(review): the descending paths do not apply pVerRange while the ascending ones do - confirm
// whether that asymmetry is intended.
bool hasBeenDropped(const SArray* pDelList, int32_t* index, TSDBKEY* pKey, int32_t order, SVersionRange* pVerRange) {
  if (pDelList == NULL) {
    return false;
  }

  size_t num = taosArrayGetSize(pDelList);
  if (num == 0) {
    // Nothing is deleted. Without this guard "num - 1" below wraps around (size_t) and the code goes on
    // to index an empty array.
    return false;
  }

  bool    asc = ASCENDING_TRAVERSE(order);
  int32_t step = asc ? 1 : -1;

  if (asc) {
    if (*index >= num - 1) {
      TSDBKEY* last = taosArrayGetLast(pDelList);
      ASSERT(pKey->ts >= last->ts);

      if (pKey->ts > last->ts) {
        return false;
      } else if (pKey->ts == last->ts) {
        if (num < 2) {
          // a single point has no predecessor, so there is no range whose version could be checked
          return false;
        }

        TSDBKEY* prev = taosArrayGet(pDelList, num - 2);
        return (prev->version >= pKey->version && prev->version <= pVerRange->maxVer &&
                prev->version >= pVerRange->minVer);
      }
    } else {
      TSDBKEY* pCurrent = taosArrayGet(pDelList, *index);
      TSDBKEY* pNext = taosArrayGet(pDelList, (*index) + 1);

      if (pKey->ts < pCurrent->ts) {
        return false;
      }

      if (pCurrent->ts <= pKey->ts && pNext->ts >= pKey->ts && pCurrent->version >= pKey->version &&
          pVerRange->maxVer >= pCurrent->version) {
        return true;
      }

      // the key lies beyond the current range: move the cursor forward until a range reaches the key
      while (pNext->ts <= pKey->ts && (*index) < num - 1) {
        (*index) += 1;

        if ((*index) < num - 1) {
          pCurrent = taosArrayGet(pDelList, *index);
          pNext = taosArrayGet(pDelList, (*index) + 1);

          // it is not a consecutive deletion range, ignore it
          if (pCurrent->version == 0 && pNext->version > 0) {
            continue;
          }

          if (pCurrent->ts <= pKey->ts && pNext->ts >= pKey->ts && pCurrent->version >= pKey->version &&
              pVerRange->maxVer >= pCurrent->version) {
            return true;
          }
        }
      }

      return false;
    }
  } else {
    if (*index <= 0) {
      TSDBKEY* pFirst = taosArrayGet(pDelList, 0);

      if (pKey->ts < pFirst->ts) {
        return false;
      } else if (pKey->ts == pFirst->ts) {
        return pFirst->version >= pKey->version;
      } else {
        ASSERT(0);
      }
    } else {
      TSDBKEY* pCurrent = taosArrayGet(pDelList, *index);
      TSDBKEY* pPrev = taosArrayGet(pDelList, (*index) - 1);

      if (pKey->ts > pCurrent->ts) {
        return false;
      }

      if (pPrev->ts <= pKey->ts && pCurrent->ts >= pKey->ts && pPrev->version >= pKey->version) {
        return true;
      }

      // the key lies before the current range: move the cursor backwards until a range reaches the key
      while (pPrev->ts >= pKey->ts && (*index) > 1) {
        (*index) += step;

        if ((*index) >= 1) {
          pCurrent = taosArrayGet(pDelList, *index);
          pPrev = taosArrayGet(pDelList, (*index) - 1);

          // it is not a consecutive deletion range, ignore it
          if (pCurrent->version > 0 && pPrev->version == 0) {
            continue;
          }

          if (pPrev->ts <= pKey->ts && pCurrent->ts >= pKey->ts && pPrev->version >= pKey->version) {
            return true;
          }
        }
      }

      return false;
    }
  }

  return false;
}

3386
// Return the first row at or after the iterator's current position that is visible to the reader:
// its version lies inside pReader->verRange and hasBeenDropped() does not report it as deleted.
// Returns NULL when the iterator has no value, runs out of rows, or reaches a row outside the
// query time window; in the last two cases pIter->hasVal is left false.
TSDBROW* getValidMemRow(SIterInfo* pIter, const SArray* pDelList, STsdbReader* pReader) {
  if (!pIter->hasVal) {
    return NULL;
  }

  for (;;) {
    TSDBROW* pCandidate = tsdbTbDataIterGet(pIter->iter);
    TSDBKEY  rowKey = TSDBROW_KEY(pCandidate);

    if (outOfTimeWindow(rowKey.ts, &pReader->window)) {
      pIter->hasVal = false;
      return NULL;
    }

    // the delete list is only consulted for rows whose version is inside the query range
    bool inVerRange = (rowKey.version <= pReader->verRange.maxVer && rowKey.version >= pReader->verRange.minVer);
    if (inVerRange && !hasBeenDropped(pDelList, &pIter->index, &rowKey, pReader->order, &pReader->verRange)) {
      return pCandidate;
    }

    // not visible, step to the next row in the buffer
    pIter->hasVal = tsdbTbDataIterNext(pIter->iter);
    if (!pIter->hasVal) {
      return NULL;
    }
  }
}
H
Hongze Cheng 已提交
3425

3426 3427
// Starting from the row after the iterator's current position, fold every following valid buffer
// row whose timestamp equals ts into pMerger. Stops at the end of the iterator, at the first row
// that is not valid, or at the first row with a different timestamp.
// Returns terrno when the schema of a row-format row cannot be obtained, otherwise TSDB_CODE_SUCCESS.
int32_t doMergeRowsInBuf(SIterInfo* pIter, uint64_t uid, int64_t ts, SArray* pDelList, SRowMerger* pMerger,
                         STsdbReader* pReader) {
  for (;;) {
    pIter->hasVal = tsdbTbDataIterNext(pIter->iter);
    if (!pIter->hasVal) {
      return TSDB_CODE_SUCCESS;
    }

    // data exists but not valid
    TSDBROW* pDup = getValidMemRow(pIter, pDelList, pReader);
    if (pDup == NULL) {
      return TSDB_CODE_SUCCESS;
    }

    // ts is not identical, quit
    TSDBKEY dupKey = TSDBROW_KEY(pDup);
    if (dupKey.ts != ts) {
      return TSDB_CODE_SUCCESS;
    }

    if (pDup->type != TSDBROW_ROW_FMT) {
      // column format
      tsdbRowMerge(pMerger, pDup);
      continue;
    }

    // row format: the merger needs the schema that matches this row's schema version
    STSchema* pRowSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pDup), pReader, uid);
    if (pRowSchema == NULL) {
      return terrno;
    }

    tsdbRowMergerAdd(pMerger, pDup, pRowSchema);
  }
}

3461
static int32_t doMergeRowsInFileBlockImpl(SBlockData* pBlockData, int32_t rowIndex, int64_t key, SRowMerger* pMerger,
3462
                                          SVersionRange* pVerRange, int32_t step) {
3463
  while (rowIndex < pBlockData->nRow && rowIndex >= 0 && pBlockData->aTSKEY[rowIndex] == key) {
3464
    if (pBlockData->aVersion[rowIndex] > pVerRange->maxVer || pBlockData->aVersion[rowIndex] < pVerRange->minVer) {
3465
      rowIndex += step;
3466 3467 3468 3469
      continue;
    }

    TSDBROW fRow = tsdbRowFromBlockData(pBlockData, rowIndex);
H
Hongze Cheng 已提交
3470
    tsdbRowMerge(pMerger, &fRow);
3471 3472 3473 3474 3475 3476 3477 3478 3479 3480 3481
    rowIndex += step;
  }

  return rowIndex;
}

// Outcome reported by checkForNeighborFileBlock() to the probing loop in doMergeRowsInFileBlocks().
typedef enum {
  CHECK_FILEBLOCK_CONT = 0x1,  // the merge cursor ran past the rows of the loaded block, keep probing
  CHECK_FILEBLOCK_QUIT = 0x2,  // default outcome, the probing loop stops
} CHECK_FILEBLOCK_STATE;

H
Hongze Cheng 已提交
3482
static int32_t checkForNeighborFileBlock(STsdbReader* pReader, STableBlockScanInfo* pScanInfo, SDataBlk* pBlock,
3483 3484
                                         SFileDataBlockInfo* pFBlock, SRowMerger* pMerger, int64_t key,
                                         CHECK_FILEBLOCK_STATE* state) {
3485
  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
3486
  SBlockData*         pBlockData = &pReader->status.fileBlockData;
3487

3488
  *state = CHECK_FILEBLOCK_QUIT;
3489
  int32_t step = ASCENDING_TRAVERSE(pReader->order) ? 1 : -1;
3490

3491
  bool    loadNeighbor = true;
H
Haojun Liao 已提交
3492
  int32_t code = loadNeighborIfOverlap(pFBlock, pScanInfo, pReader, &loadNeighbor);
3493

H
Haojun Liao 已提交
3494
  if (loadNeighbor && (code == TSDB_CODE_SUCCESS)) {
3495 3496
    pDumpInfo->rowIndex =
        doMergeRowsInFileBlockImpl(pBlockData, pDumpInfo->rowIndex, key, pMerger, &pReader->verRange, step);
H
Haojun Liao 已提交
3497
    if (pDumpInfo->rowIndex >= pDumpInfo->totalRows) {
3498 3499 3500 3501
      *state = CHECK_FILEBLOCK_CONT;
    }
  }

H
Haojun Liao 已提交
3502
  return code;
3503 3504
}

3505 3506
// Merge all rows that share the timestamp of the row at the current dump cursor into pMerger.
// The cursor is first moved one step in scan direction, then the remaining rows of the current block
// with the same timestamp are merged. If that consumes the block completely, the neighbouring file
// blocks are probed for further rows with the same timestamp.
// Returns TSDB_CODE_SUCCESS, or the error reported while probing a neighbouring block.
int32_t doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pScanInfo, STsdbReader* pReader,
                                SRowMerger* pMerger) {
  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;

  bool    asc = ASCENDING_TRAVERSE(pReader->order);
  int64_t key = pBlockData->aTSKEY[pDumpInfo->rowIndex];
  int32_t step = asc ? 1 : -1;

  pDumpInfo->rowIndex += step;
  if ((pDumpInfo->rowIndex <= pBlockData->nRow - 1 && asc) || (pDumpInfo->rowIndex >= 0 && !asc)) {
    pDumpInfo->rowIndex =
        doMergeRowsInFileBlockImpl(pBlockData, pDumpInfo->rowIndex, key, pMerger, &pReader->verRange, step);
  }

  // all rows are consumed, let's try next file block
  if ((pDumpInfo->rowIndex >= pBlockData->nRow && asc) || (pDumpInfo->rowIndex < 0 && !asc)) {
    while (1) {
      CHECK_FILEBLOCK_STATE st = CHECK_FILEBLOCK_QUIT;

      SFileDataBlockInfo* pFileBlockInfo = getCurrentBlockInfo(&pReader->status.blockIter);
      SDataBlk*           pCurrentBlock = getCurrentBlock(&pReader->status.blockIter);
      if (pFileBlockInfo == NULL) {
        break;
      }

      // a failure while loading the neighbour must reach the caller instead of being reported as success
      int32_t code = checkForNeighborFileBlock(pReader, pScanInfo, pCurrentBlock, pFileBlockInfo, pMerger, key, &st);
      if (code != TSDB_CODE_SUCCESS) {
        return code;
      }

      if (st == CHECK_FILEBLOCK_QUIT) {
        break;
      }
    }
  }

  return TSDB_CODE_SUCCESS;
}

H
Hongze Cheng 已提交
3541
int32_t doMergeRowsInLastBlock(SLastBlockReader* pLastBlockReader, STableBlockScanInfo* pScanInfo, int64_t ts,
3542 3543
                               SRowMerger* pMerger, SVersionRange* pVerRange) {
  while (nextRowFromLastBlocks(pLastBlockReader, pScanInfo, pVerRange)) {
3544 3545
    int64_t next1 = getCurrentKeyInLastBlock(pLastBlockReader);
    if (next1 == ts) {
3546
      TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
H
Hongze Cheng 已提交
3547
      tsdbRowMerge(pMerger, &fRow1);
3548 3549 3550 3551 3552 3553 3554 3555
    } else {
      break;
    }
  }

  return TSDB_CODE_SUCCESS;
}

3556
int32_t doMergeMemTableMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter, SArray* pDelList, TSDBROW* pResRow,
3557
                                 STsdbReader* pReader, bool* freeTSRow) {
H
Haojun Liao 已提交
3558
  TSDBROW* pNextRow = NULL;
3559
  TSDBROW  current = *pRow;
3560

3561 3562
  {  // if the timestamp of the next valid row has a different ts, return current row directly
    pIter->hasVal = tsdbTbDataIterNext(pIter->iter);
3563

3564
    if (!pIter->hasVal) {
3565
      *pResRow = *pRow;
3566
      *freeTSRow = false;
3567
      return TSDB_CODE_SUCCESS;
3568
    } else {  // has next point in mem/imem
3569
      pNextRow = getValidMemRow(pIter, pDelList, pReader);
3570
      if (pNextRow == NULL) {
H
Haojun Liao 已提交
3571
        *pResRow = current;
3572
        *freeTSRow = false;
3573
        return TSDB_CODE_SUCCESS;
3574 3575
      }

H
Hongze Cheng 已提交
3576
      if (TSDBROW_TS(&current) != TSDBROW_TS(pNextRow)) {
H
Haojun Liao 已提交
3577
        *pResRow = current;
3578
        *freeTSRow = false;
3579
        return TSDB_CODE_SUCCESS;
3580
      }
3581
    }
3582 3583
  }

3584
  SRowMerger merge = {0};
H
Haojun Liao 已提交
3585
  terrno = 0;
3586
  int32_t code = 0;
H
Haojun Liao 已提交
3587

3588 3589 3590 3591 3592 3593 3594
  // start to merge duplicated rows
  if (current.type == TSDBROW_ROW_FMT) {
    // get the correct schema for data in memory
    STSchema* pTSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(&current), pReader, uid);
    if (pTSchema == NULL) {
      return terrno;
    }
H
Haojun Liao 已提交
3595

3596 3597 3598
    if (pReader->pSchema == NULL) {
      pReader->pSchema = pTSchema;
    }
H
Haojun Liao 已提交
3599

3600
    code = tsdbRowMergerInit2(&merge, pReader->pSchema, &current, pTSchema);
3601 3602 3603
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }
H
Haojun Liao 已提交
3604

3605 3606 3607 3608
    STSchema* pTSchema1 = doGetSchemaForTSRow(TSDBROW_SVERSION(pNextRow), pReader, uid);
    if (pTSchema1 == NULL) {
      return terrno;
    }
H
Haojun Liao 已提交
3609

3610
    tsdbRowMergerAdd(&merge, pNextRow, pTSchema1);
3611 3612
  } else {  // let's merge rows in file block
    code = tsdbRowMergerInit(&merge, &current, pReader->pSchema);
3613 3614 3615
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }
H
Haojun Liao 已提交
3616

3617 3618
    tsdbRowMerge(&merge, pNextRow);
  }
H
Haojun Liao 已提交
3619

wmmhello's avatar
wmmhello 已提交
3620
  code = doMergeRowsInBuf(pIter, uid, TSDBROW_TS(&current), pDelList, &merge, pReader);
H
Haojun Liao 已提交
3621 3622 3623 3624
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

3625
  code = tsdbRowMergerGetRow(&merge, &pResRow->pTSRow);
3626 3627 3628
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }
M
Minglei Jin 已提交
3629

wmmhello's avatar
wmmhello 已提交
3630
  pResRow->type = TSDBROW_ROW_FMT;
3631
  tsdbRowMergerClear(&merge);
3632
  *freeTSRow = true;
3633

3634
  return TSDB_CODE_SUCCESS;
3635 3636
}

3637
int32_t doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader,
H
Hongze Cheng 已提交
3638
                           SRow** pTSRow) {
H
Haojun Liao 已提交
3639 3640
  SRowMerger merge = {0};

3641 3642 3643
  TSDBKEY k = TSDBROW_KEY(pRow);
  TSDBKEY ik = TSDBROW_KEY(piRow);

3644
  if (ASCENDING_TRAVERSE(pReader->order)) {  // ascending order imem --> mem
H
Haojun Liao 已提交
3645
    STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid);
3646

H
Hongze Cheng 已提交
3647
    int32_t code = tsdbRowMergerInit(&merge, piRow, pSchema);
H
Haojun Liao 已提交
3648 3649 3650 3651 3652 3653 3654 3655 3656
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    code = doMergeRowsInBuf(&pBlockScanInfo->iiter, pBlockScanInfo->uid, ik.ts, pBlockScanInfo->delSkyline, &merge,
                            pReader);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }
3657

H
Hongze Cheng 已提交
3658
    tsdbRowMerge(&merge, pRow);
H
Haojun Liao 已提交
3659 3660 3661 3662 3663 3664
    code =
        doMergeRowsInBuf(&pBlockScanInfo->iter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge, pReader);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

3665
  } else {
H
Haojun Liao 已提交
3666
    STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid);
3667

H
Hongze Cheng 已提交
3668
    int32_t code = tsdbRowMergerInit(&merge, pRow, pSchema);
3669
    if (code != TSDB_CODE_SUCCESS || merge.pTSchema == NULL) {
H
Haojun Liao 已提交
3670 3671 3672 3673 3674 3675 3676 3677
      return code;
    }

    code =
        doMergeRowsInBuf(&pBlockScanInfo->iter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge, pReader);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }
3678

H
Hongze Cheng 已提交
3679
    tsdbRowMerge(&merge, piRow);
H
Haojun Liao 已提交
3680 3681 3682 3683 3684
    code = doMergeRowsInBuf(&pBlockScanInfo->iiter, pBlockScanInfo->uid, ik.ts, pBlockScanInfo->delSkyline, &merge,
                            pReader);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }
3685
  }
3686

H
Haojun Liao 已提交
3687 3688
  int32_t code = tsdbRowMergerGetRow(&merge, pTSRow);
  tsdbRowMergerClear(&merge);
3689
  return code;
3690 3691
}

3692
// Produce the next row of one table out of the mem/imem buffers.
//
// The current valid row of each buffer iterator is fetched; a row whose timestamp has reached endKey (in traverse
// direction) is ignored. When both buffers offer a row, the one that comes first in traverse order is merged from
// its own buffer, and rows with identical timestamps are merged across both buffers. pResRow is left untouched
// when no row qualifies.
int32_t tsdbGetNextRowInMem(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader, TSDBROW* pResRow, int64_t endKey,
                            bool* freeTSRow) {
  SIterInfo* pMemIter = &pBlockScanInfo->iter;
  SIterInfo* pImemIter = &pBlockScanInfo->iiter;
  SArray*    pDelList = pBlockScanInfo->delSkyline;
  uint64_t   uid = pBlockScanInfo->uid;

  TSDBROW* pRow = getValidMemRow(pMemIter, pDelList, pReader);
  TSDBROW* piRow = getValidMemRow(pImemIter, pDelList, pReader);

  // todo refactor
  const bool asc = ASCENDING_TRAVERSE(pReader->order);

  // discard candidates that already reached the end key
  if (pMemIter->hasVal) {
    TSDBKEY key = TSDBROW_KEY(pRow);
    bool    reachedEnd = asc ? (key.ts >= endKey) : (key.ts <= endKey);
    if (reachedEnd) {
      pRow = NULL;
    }
  }

  if (pImemIter->hasVal) {
    TSDBKEY key = TSDBROW_KEY(piRow);
    bool    reachedEnd = asc ? (key.ts >= endKey) : (key.ts <= endKey);
    if (reachedEnd) {
      piRow = NULL;
    }
  }

  const bool memReady = pMemIter->hasVal && (pRow != NULL);
  const bool imemReady = pImemIter->hasVal && (piRow != NULL);

  if (memReady && imemReady) {
    TSDBKEY k = TSDBROW_KEY(pRow);
    TSDBKEY ik = TSDBROW_KEY(piRow);

    if (ik.ts == k.ts) {
      // same timestamp in both buffers: the result is a newly built row
      *freeTSRow = true;
      pResRow->type = TSDBROW_ROW_FMT;
      return doMergeMemIMemRows(pRow, piRow, pBlockScanInfo, pReader, &pResRow->pTSRow);
    }

    // the timestamps differ, pick the buffer whose row comes first in traverse order
    bool imemFirst = asc ? (ik.ts < k.ts) : (ik.ts > k.ts);
    if (imemFirst) {
      return doMergeMemTableMultiRows(piRow, uid, pImemIter, pDelList, pResRow, pReader, freeTSRow);
    }

    return doMergeMemTableMultiRows(pRow, uid, pMemIter, pDelList, pResRow, pReader, freeTSRow);
  }

  if (memReady) {
    return doMergeMemTableMultiRows(pRow, uid, pMemIter, pDelList, pResRow, pReader, freeTSRow);
  }

  if (imemReady) {
    return doMergeMemTableMultiRows(piRow, uid, pImemIter, pDelList, pResRow, pReader, freeTSRow);
  }

  return TSDB_CODE_SUCCESS;
}

H
Hongze Cheng 已提交
3750
// Append one row-format row to the result block.
//
// pBlock     result block; the row is written at index pBlock->info.rows and the row count is then increased
// pReader    reader handle; pReader->suppInfo lists the requested column ids and their slots in pBlock
// pTSRow     the row to copy
// pScanInfo  scan state of the owning table; its lastKey is advanced to the timestamp of the row
//
// Requested columns that the row's schema does not contain are filled with NULL.
// Returns TSDB_CODE_SUCCESS, or an error code when the schema of the row can not be resolved.
int32_t doAppendRowFromTSRow(SSDataBlock* pBlock, STsdbReader* pReader, SRow* pTSRow, STableBlockScanInfo* pScanInfo) {
  int32_t outputRowIndex = pBlock->info.rows;
  int64_t uid = pScanInfo->uid;

  SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo;
  STSchema*           pSchema = doGetSchemaForTSRow(pTSRow->sver, pReader, uid);
  if (pSchema == NULL) {
    // the row can not be decoded without its schema
    return (terrno != TSDB_CODE_SUCCESS) ? terrno : TSDB_CODE_FAILED;
  }

  SColVal colVal = {0};
  int32_t i = 0, j = 0;

  // the timestamp is taken directly from the row
  if (pSupInfo->colId[i] == PRIMARYKEY_TIMESTAMP_COL_ID) {
    SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, pSupInfo->slotId[i]);
    ((int64_t*)pColData->pData)[outputRowIndex] = pTSRow->ts;
    i += 1;
  }

  // walk the requested column ids (i) and the schema columns (j) side by side
  while (i < pSupInfo->numOfCols && j < pSchema->numOfCols) {
    col_id_t colId = pSupInfo->colId[i];

    if (colId == pSchema->columns[j].colId) {
      SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, pSupInfo->slotId[i]);

      tRowGet(pTSRow, pSchema, j, &colVal);
      doCopyColVal(pColInfoData, outputRowIndex, i, &colVal, pSupInfo);
      i += 1;
      j += 1;
    } else if (colId < pSchema->columns[j].colId) {
      // requested column is missing from the schema
      SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, pSupInfo->slotId[i]);

      colDataAppendNULL(pColInfoData, outputRowIndex);
      i += 1;
    } else {
      // schema column that was not requested
      j += 1;
    }
  }

  // set null value since current column does not exist in the "pSchema"
  while (i < pSupInfo->numOfCols) {
    SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, pSupInfo->slotId[i]);
    colDataAppendNULL(pColInfoData, outputRowIndex);
    i += 1;
  }

  pBlock->info.dataLoad = 1;
  pBlock->info.rows += 1;
  pScanInfo->lastKey = pTSRow->ts;
  return TSDB_CODE_SUCCESS;
}

H
Hongze Cheng 已提交
3801 3802
// Copy row "rowIndex" of a column-format block into the result block.
//
// The row is written at index pResBlock->info.rows, afterwards the row count of the result block is increased.
// pReader->suppInfo lists the requested column ids together with their slots in the result block.
// Always returns TSDB_CODE_SUCCESS.
int32_t doAppendRowFromFileBlock(SSDataBlock* pResBlock, STsdbReader* pReader, SBlockData* pBlockData,
                                 int32_t rowIndex) {
  SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo;
  const int32_t       dstRow = pResBlock->info.rows;

  int32_t outIdx = 0;  // position in the requested column list
  int32_t inIdx = 0;   // position in the columns held by pBlockData

  // the timestamp comes from the key array of the block
  if (pSupInfo->colId[outIdx] == PRIMARYKEY_TIMESTAMP_COL_ID) {
    SColumnInfoData* pTsCol = taosArrayGet(pResBlock->pDataBlock, pSupInfo->slotId[outIdx]);
    ((int64_t*)pTsCol->pData)[dstRow] = pBlockData->aTSKEY[rowIndex];
    outIdx += 1;
  }

  SColVal       cv = {0};
  const int32_t numOfInputCols = pBlockData->nColData;
  const int32_t numOfOutputCols = pSupInfo->numOfCols;

  while (outIdx < numOfOutputCols && inIdx < numOfInputCols) {
    SColData* pData = tBlockDataGetColDataByIdx(pBlockData, inIdx);

    if (pData->cid >= pSupInfo->colId[outIdx]) {
      SColumnInfoData* pCol = TARRAY_GET_ELEM(pResBlock->pDataBlock, pSupInfo->slotId[outIdx]);

      if (pData->cid == pSupInfo->colId[outIdx]) {
        tColDataGetValue(pData, rowIndex, &cv);
        doCopyColVal(pCol, dstRow, outIdx, &cv, pSupInfo);
        inIdx += 1;
      } else if (pData->cid > pCol->info.colId) {
        // the specified column does not exist in file block, fill with null data
        colDataAppendNULL(pCol, dstRow);
      }

      outIdx += 1;
    } else {
      // input column that was not requested
      inIdx += 1;
    }
  }

  // requested columns beyond the last input column are all null
  for (; outIdx < numOfOutputCols; outIdx += 1) {
    SColumnInfoData* pCol = taosArrayGet(pResBlock->pDataBlock, pSupInfo->slotId[outIdx]);
    colDataAppendNULL(pCol, dstRow);
  }

  pResBlock->info.dataLoad = 1;
  pResBlock->info.rows += 1;
  return TSDB_CODE_SUCCESS;
}

3848 3849
// Fill pReader->pResBlock with rows of one table taken from the mem/imem buffers.
//
// pBlockScanInfo  scan state of the table (buffer iterators, delete skyline)
// endKey          rows whose timestamp has reached this key (in traverse direction) are not consumed
// capacity        stop as soon as the result block holds this many rows
// pReader         reader handle owning the result block
//
// Returns TSDB_CODE_SUCCESS when the block is full or the buffers are drained, otherwise the first error reported
// while fetching or appending a row.
int32_t buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, int64_t endKey, int32_t capacity,
                                  STsdbReader* pReader) {
  SSDataBlock* pBlock = pReader->pResBlock;
  int32_t      code = TSDB_CODE_SUCCESS;

  do {
    TSDBROW row = {.type = -1};
    bool    freeTSRow = false;

    code = tsdbGetNextRowInMem(pBlockScanInfo, pReader, &row, endKey, &freeTSRow);
    if (code != TSDB_CODE_SUCCESS) {
      // on failure the row may be tagged as valid without carrying any data, it must not be used
      return code;
    }

    // type is still -1: no row qualified
    if (row.type == -1) {
      break;
    }

    if (row.type == TSDBROW_ROW_FMT) {
      code = doAppendRowFromTSRow(pBlock, pReader, row.pTSRow, pBlockScanInfo);

      // a merged row was built for this call only, release it whether or not the append worked
      if (freeTSRow) {
        taosMemoryFree(row.pTSRow);
      }
    } else {
      code = doAppendRowFromFileBlock(pBlock, pReader, row.pBlockData, row.iRow);
    }

    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    // no data in buffer, return immediately
    if (!(pBlockScanInfo->iter.hasVal || pBlockScanInfo->iiter.hasVal)) {
      break;
    }

    if (pBlock->info.rows >= capacity) {
      break;
    }
  } while (1);

  return TSDB_CODE_SUCCESS;
}
H
Hongze Cheng 已提交
3883

3884 3885
// TODO refactor: with createDataBlockScanInfo
// Replace the set of tables scanned by the reader.
//
// pReader     reader handle
// pTableList  array of STableKeyInfo holding the uid of each table
// num         number of entries in pTableList; must not exceed the number of tables currently held by the reader,
//             because the existing scan-info slots and uid list are reused
//
// Returns TSDB_CODE_SUCCESS, or TSDB_CODE_INVALID_PARA (reader left untouched) when num is too large.
int32_t tsdbSetTableList(STsdbReader* pReader, const void* pTableList, int32_t num) {
  int32_t size = taosHashGetSize(pReader->status.pTableMap);

  // todo handle the case where size is less than the value of num
  ASSERT(size >= num);
  if (size < num) {
    // keeps the slot and uid-list writes below in bounds even when the assertion is compiled out
    return TSDB_CODE_INVALID_PARA;
  }

  STableBlockScanInfo** p = NULL;
  while ((p = taosHashIterate(pReader->status.pTableMap, p)) != NULL) {
    clearBlockScanInfo(*p);
  }

  taosHashClear(pReader->status.pTableMap);
  STableUidList* pUidList = &pReader->status.uidList;
  pUidList->currentIndex = 0;

  STableKeyInfo* pList = (STableKeyInfo*)pTableList;
  for (int32_t i = 0; i < num; ++i) {
    // reuse the i-th slot of the scan-info buffer for the i-th table
    STableBlockScanInfo* pInfo = getPosInBlockInfoBuf(&pReader->blockInfoBuf, i);
    pInfo->uid = pList[i].uid;
    pUidList->tableUidList[i] = pList[i].uid;

    // the map stores the pointer to the slot, keyed by uid
    taosHashPut(pReader->status.pTableMap, &pInfo->uid, sizeof(uint64_t), &pInfo, POINTER_BYTES);
  }

  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
3912 3913 3914 3915 3916 3917
// Return metaGetIdx() of the given meta handle, or NULL when no handle is supplied.
void* tsdbGetIdx(SMeta* pMeta) {
  return (pMeta != NULL) ? metaGetIdx(pMeta) : NULL;
}
dengyihao's avatar
dengyihao 已提交
3918

dengyihao's avatar
dengyihao 已提交
3919 3920 3921 3922 3923 3924
// Return metaGetIvtIdx() of the given meta handle, or NULL when no handle is supplied.
void* tsdbGetIvtIdx(SMeta* pMeta) {
  return (pMeta != NULL) ? metaGetIvtIdx(pMeta) : NULL;
}
L
Liu Jicong 已提交
3925

H
Hongze Cheng 已提交
3926
// Return the upper bound (maxVer) of the version range stored in the reader.
uint64_t getReaderMaxVersion(STsdbReader* pReader) {
  return pReader->verRange.maxVer;
}
3927

3928
// (Re)start the scan of a reader on its current read snapshot: set up the file set iterator, reset the data block
// iterator and position on the first block when there are files; otherwise the reader is switched to buffer-only
// mode. Whenever the reader does not load from file afterwards, the table list index is reset.
// Returns the result of initForFirstBlockInFile(), or TSDB_CODE_SUCCESS when there is no file at all.
static int32_t doOpenReaderImpl(STsdbReader* pReader) {
  SReaderStatus* pStatus = &pReader->status;

  initFilesetIterator(&pStatus->fileIter, pReader->pReadSnap->fs.aDFileSet, pReader);
  resetDataBlockIterator(&pStatus->blockIter, pReader->order);

  int32_t code = TSDB_CODE_SUCCESS;
  if (pStatus->fileIter.numOfFiles != 0) {
    code = initForFirstBlockInFile(pReader, &pStatus->blockIter);
  } else {
    pStatus->loadFromFile = false;
  }

  if (!pStatus->loadFromFile) {
    resetTableListIndex(pStatus);
  }

  return code;
}

H
refact  
Hongze Cheng 已提交
3949
// ====================================== EXPOSED APIs ======================================
3950
int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, void* pTableList, int32_t numOfTables,
H
Haojun Liao 已提交
3951
                       SSDataBlock* pResBlock, STsdbReader** ppReader, const char* idstr) {
3952 3953 3954 3955 3956 3957
  STimeWindow window = pCond->twindows;
  if (pCond->type == TIMEWINDOW_RANGE_EXTERNAL) {
    pCond->twindows.skey += 1;
    pCond->twindows.ekey -= 1;
  }

3958 3959 3960
  int32_t capacity = pVnode->config.tsdbCfg.maxRows;
  if (pResBlock != NULL) {
    blockDataEnsureCapacity(pResBlock, capacity);
H
Haojun Liao 已提交
3961 3962 3963
  }

  int32_t code = tsdbReaderCreate(pVnode, pCond, ppReader, capacity, pResBlock, idstr);
3964
  if (code != TSDB_CODE_SUCCESS) {
H
Haojun Liao 已提交
3965 3966
    goto _err;
  }
H
Hongze Cheng 已提交
3967

3968
  // check for query time window
H
Haojun Liao 已提交
3969
  STsdbReader* pReader = *ppReader;
3970
  if (isEmptyQueryTimeWindow(&pReader->window) && pCond->type == TIMEWINDOW_RANGE_CONTAINED) {
H
Haojun Liao 已提交
3971 3972 3973
    tsdbDebug("%p query window not overlaps with the data set, no result returned, %s", pReader, pReader->idStr);
    return TSDB_CODE_SUCCESS;
  }
H
Hongze Cheng 已提交
3974

3975 3976
  if (pCond->type == TIMEWINDOW_RANGE_EXTERNAL) {
    // update the SQueryTableDataCond to create inner reader
3977
    int32_t order = pCond->order;
3978
    if (order == TSDB_ORDER_ASC) {
3979
      pCond->twindows.ekey = window.skey;
3980 3981 3982
      pCond->twindows.skey = INT64_MIN;
      pCond->order = TSDB_ORDER_DESC;
    } else {
3983
      pCond->twindows.skey = window.ekey;
3984 3985 3986 3987
      pCond->twindows.ekey = INT64_MAX;
      pCond->order = TSDB_ORDER_ASC;
    }

3988
    // here we only need one more row, so the capacity is set to be ONE.
H
Haojun Liao 已提交
3989
    code = tsdbReaderCreate(pVnode, pCond, &pReader->innerReader[0], 1, pResBlock, idstr);
3990 3991 3992 3993 3994
    if (code != TSDB_CODE_SUCCESS) {
      goto _err;
    }

    if (order == TSDB_ORDER_ASC) {
3995
      pCond->twindows.skey = window.ekey;
3996
      pCond->twindows.ekey = INT64_MAX;
3997
    } else {
3998
      pCond->twindows.skey = INT64_MIN;
3999
      pCond->twindows.ekey = window.ekey;
4000
    }
4001 4002
    pCond->order = order;

H
Haojun Liao 已提交
4003
    code = tsdbReaderCreate(pVnode, pCond, &pReader->innerReader[1], 1, pResBlock, idstr);
4004 4005 4006 4007 4008
    if (code != TSDB_CODE_SUCCESS) {
      goto _err;
    }
  }

H
Haojun Liao 已提交
4009
  // NOTE: the endVersion in pCond is the data version not schema version, so pCond->endVersion is not correct here.
4010 4011
  //  no valid error code set in metaGetTbTSchema, so let's set the error code here.
  //  we should proceed in case of tmq processing.
4012
  if (pCond->suid != 0) {
4013
    pReader->pSchema = metaGetTbTSchema(pReader->pTsdb->pVnode->pMeta, pReader->suid, -1, 1);
H
Haojun Liao 已提交
4014
    if (pReader->pSchema == NULL) {
H
Haojun Liao 已提交
4015
      tsdbError("failed to get table schema, suid:%" PRIu64 ", ver:-1, %s", pReader->suid, pReader->idStr);
H
Haojun Liao 已提交
4016
    }
4017 4018
  } else if (numOfTables > 0) {
    STableKeyInfo* pKey = pTableList;
4019
    pReader->pSchema = metaGetTbTSchema(pReader->pTsdb->pVnode->pMeta, pKey->uid, -1, 1);
H
Haojun Liao 已提交
4020
    if (pReader->pSchema == NULL) {
H
Haojun Liao 已提交
4021
      tsdbError("failed to get table schema, uid:%" PRIu64 ", ver:-1, %s", pKey->uid, pReader->idStr);
H
Haojun Liao 已提交
4022
    }
4023 4024
  }

4025
  if (pReader->pSchema != NULL) {
H
Haojun Liao 已提交
4026 4027 4028 4029
    code = updateBlockSMAInfo(pReader->pSchema, &pReader->suppInfo);
    if (code != TSDB_CODE_SUCCESS) {
      goto _err;
    }
4030
  }
4031

4032
  STsdbReader* p = (pReader->innerReader[0] != NULL) ? pReader->innerReader[0] : pReader;
X
Xiaoyu Wang 已提交
4033 4034
  pReader->status.pTableMap =
      createDataBlockScanInfo(p, &pReader->blockInfoBuf, pTableList, &pReader->status.uidList, numOfTables);
H
Haojun Liao 已提交
4035 4036
  if (pReader->status.pTableMap == NULL) {
    *ppReader = NULL;
S
Shengliang Guan 已提交
4037
    code = TSDB_CODE_OUT_OF_MEMORY;
H
Haojun Liao 已提交
4038 4039
    goto _err;
  }
H
Hongze Cheng 已提交
4040

4041
  pReader->suspended = true;
4042

4043
  tsdbDebug("%p total numOfTable:%d in this query %s", pReader, numOfTables, pReader->idStr);
H
Hongze Cheng 已提交
4044
  return code;
H
Hongze Cheng 已提交
4045 4046

_err:
H
Haojun Liao 已提交
4047
  tsdbError("failed to create data reader, code:%s %s", tstrerror(code), idstr);
4048
  tsdbReaderClose(pReader);
X
Xiaoyu Wang 已提交
4049
  *ppReader = NULL;  // reset the pointer value.
H
Hongze Cheng 已提交
4050
  return code;
H
refact  
Hongze Cheng 已提交
4051 4052 4053
}

// Destroy a reader together with its inner readers and release every resource it holds. NULL is accepted and
// ignored. The reader must not be used afterwards.
void tsdbReaderClose(STsdbReader* pReader) {
  if (pReader == NULL) {
    return;
  }

  tsdbAcquireReader(pReader);

  // The fields reset below are set from the outer reader in tsdbReaderResume(); detach them so that closing an
  // inner reader does not release them a second time. Either inner reader may be missing when the open failed
  // half way.
  for (int32_t i = 0; i < 2; ++i) {
    STsdbReader* p = pReader->innerReader[i];
    if (p == NULL) {
      continue;
    }

    p->status.pTableMap = NULL;
    p->status.uidList.tableUidList = NULL;
    p->pReadSnap = NULL;
    p->pSchema = NULL;
    p->pMemSchema = NULL;
  }

  tsdbReaderClose(pReader->innerReader[0]);
  tsdbReaderClose(pReader->innerReader[1]);

  SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo;

  taosArrayDestroy(pSupInfo->pColAgg);
  for (int32_t i = 0; i < pSupInfo->numOfCols; ++i) {
    if (pSupInfo->buildBuf[i] != NULL) {
      taosMemoryFreeClear(pSupInfo->buildBuf[i]);
    }
  }

  // the result block is destroyed only when the freeBlock flag is set
  if (pReader->freeBlock) {
    pReader->pResBlock = blockDataDestroy(pReader->pResBlock);
  }

  taosMemoryFree(pSupInfo->colId);
  tBlockDataDestroy(&pReader->status.fileBlockData);
  cleanupDataBlockIterator(&pReader->status.blockIter);

  // remember the number of tables for the summary printed below
  size_t numOfTables = taosHashGetSize(pReader->status.pTableMap);
  if (pReader->status.pTableMap != NULL) {
    destroyAllBlockScanInfo(pReader->status.pTableMap);
    clearBlockScanInfoBuf(&pReader->blockInfoBuf);
  }

  if (pReader->pFileReader != NULL) {
    tsdbDataFReaderClose(&pReader->pFileReader);
  }

  if (pReader->pDelFReader != NULL) {
    tsdbDelFReaderClose(&pReader->pDelFReader);
  }

  if (pReader->pDelIdx != NULL) {
    taosArrayDestroy(pReader->pDelIdx);
    pReader->pDelIdx = NULL;
  }

  qTrace("tsdb/reader-close: %p, untake snapshot", pReader);
  tsdbUntakeReadSnap(pReader, pReader->pReadSnap, true);
  pReader->pReadSnap = NULL;

  tsdbReleaseReader(pReader);

  tsdbUninitReaderLock(pReader);

  taosMemoryFree(pReader->status.uidList.tableUidList);
  SIOCostSummary* pCost = &pReader->cost;

  SFilesetIter* pFilesetIter = &pReader->status.fileIter;
  if (pFilesetIter->pLastBlockReader != NULL) {
    SLastBlockReader* pLReader = pFilesetIter->pLastBlockReader;
    tMergeTreeClose(&pLReader->mergeTree);

    // collect the last-block statistics before the load info is destroyed
    getLastBlockLoadInfo(pLReader->pInfo, &pCost->lastBlockLoad, &pCost->lastBlockLoadTime);

    pLReader->pInfo = destroyLastBlockLoadInfo(pLReader->pInfo);
    taosMemoryFree(pLReader);
  }

  tsdbDebug(
      "%p :io-cost summary: head-file:%" PRIu64 ", head-file time:%.2f ms, SMA:%" PRId64
      " SMA-time:%.2f ms, fileBlocks:%" PRId64
      ", fileBlocks-load-time:%.2f ms, "
      "build in-memory-block-time:%.2f ms, lastBlocks:%" PRId64 ", lastBlocks-time:%.2f ms, composed-blocks:%" PRId64
      ", composed-blocks-time:%.2fms, STableBlockScanInfo size:%.2f Kb, createTime:%.2f ms,initDelSkylineIterTime:%.2f "
      "ms, %s",
      pReader, pCost->headFileLoad, pCost->headFileLoadTime, pCost->smaDataLoad, pCost->smaLoadTime, pCost->numOfBlocks,
      pCost->blockLoadTime, pCost->buildmemBlock, pCost->lastBlockLoad, pCost->lastBlockLoadTime, pCost->composedBlocks,
      pCost->buildComposedBlockTime, numOfTables * sizeof(STableBlockScanInfo) / 1000.0, pCost->createScanInfoList,
      pCost->initDelSkylineIterTime, pReader->idStr);

  taosMemoryFree(pReader->idStr);

  // pMemSchema may alias pSchema; compare the two pointers while both are still valid
  if (pReader->pMemSchema != pReader->pSchema) {
    taosMemoryFree(pReader->pMemSchema);
  }
  taosMemoryFree(pReader->pSchema);

  taosMemoryFreeClear(pReader);
}

4162 4163 4164 4165 4166 4167 4168 4169 4170 4171
// Suspend a reader: drop the buffer iterators and delete skylines of all tables, close the data file reader when
// reading from files, and give back the read snapshot. The lastKey of each table is kept.
//
// Returns 0 on success. TSDB_CODE_INVALID_PARA is returned, and the reader is left unsuspended, when the table of
// the current file block is not part of the reader's table map.
int32_t tsdbReaderSuspend(STsdbReader* pReader) {
  int32_t code = 0;

  // save reader's base state & reset top state to be reconstructed from base state
  SReaderStatus*       pStatus = &pReader->status;
  STableBlockScanInfo* pBlockScanInfo = NULL;

  if (pStatus->loadFromFile) {
    SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(&pReader->status.blockIter);
    if (pBlockInfo != NULL) {
      // the map stores pointers to the scan info, check the lookup result before looking through it
      STableBlockScanInfo** ppInfo = taosHashGet(pStatus->pTableMap, &pBlockInfo->uid, sizeof(pBlockInfo->uid));
      if (ppInfo == NULL || *ppInfo == NULL) {
        code = TSDB_CODE_INVALID_PARA;
        tsdbError("failed to locate the uid:%" PRIu64 " in query table uid list, total tables:%d, %s", pBlockInfo->uid,
                  taosHashGetSize(pReader->status.pTableMap), pReader->idStr);
        goto _err;
      }

      pBlockScanInfo = *ppInfo;
    } else if (pStatus->pTableIter != NULL) {
      pBlockScanInfo = *pStatus->pTableIter;
    }

    tsdbDataFReaderClose(&pReader->pFileReader);
  }

  // resetDataBlockScanInfo excluding lastKey
  STableBlockScanInfo** p = NULL;
  while ((p = taosHashIterate(pStatus->pTableMap, p)) != NULL) {
    STableBlockScanInfo* pInfo = *p;

    pInfo->iterInit = false;
    pInfo->iter.hasVal = false;
    pInfo->iiter.hasVal = false;

    if (pInfo->iter.iter != NULL) {
      pInfo->iter.iter = tsdbTbDataIterDestroy(pInfo->iter.iter);
    }

    if (pInfo->iiter.iter != NULL) {
      pInfo->iiter.iter = tsdbTbDataIterDestroy(pInfo->iiter.iter);
    }

    pInfo->delSkyline = taosArrayDestroy(pInfo->delSkyline);
  }

  if (!pStatus->loadFromFile) {
    pBlockScanInfo = pStatus->pTableIter == NULL ? NULL : *pStatus->pTableIter;
    if (pBlockScanInfo) {
      // save lastKey to restore memory iterator
      STimeWindow w = pReader->pResBlock->info.window;
      pBlockScanInfo->lastKey = ASCENDING_TRAVERSE(pReader->order) ? w.ekey : w.skey;

      // reset current table's data block scan info,
      pBlockScanInfo->iterInit = false;
      pBlockScanInfo->iter.hasVal = false;
      pBlockScanInfo->iiter.hasVal = false;
      if (pBlockScanInfo->iter.iter != NULL) {
        pBlockScanInfo->iter.iter = tsdbTbDataIterDestroy(pBlockScanInfo->iter.iter);
      }

      if (pBlockScanInfo->iiter.iter != NULL) {
        pBlockScanInfo->iiter.iter = tsdbTbDataIterDestroy(pBlockScanInfo->iiter.iter);
      }

      pBlockScanInfo->pBlockList = taosArrayDestroy(pBlockScanInfo->pBlockList);
      tMapDataClear(&pBlockScanInfo->mapData);
      // TODO: keep skyline for reuse
      pBlockScanInfo->delSkyline = taosArrayDestroy(pBlockScanInfo->delSkyline);
    }
  }

  tsdbUntakeReadSnap(pReader, pReader->pReadSnap, false);
  pReader->pReadSnap = NULL;

  pReader->suspended = true;

  tsdbDebug("reader: %p suspended uid %" PRIu64 " in this query %s", pReader, pBlockScanInfo ? pBlockScanInfo->uid : 0,
            pReader->idStr);
  return code;

_err:
  tsdbError("failed to suspend data reader, code:%s %s", tstrerror(code), pReader->idStr);
  return code;
}

static int32_t tsdbSetQueryReseek(void* pQHandle) {
  int32_t      code = 0;
  STsdbReader* pReader = pQHandle;

4273
  code = tsdbTryAcquireReader(pReader);
4274 4275
  if (code == 0) {
    if (pReader->suspended) {
4276
      tsdbReleaseReader(pReader);
4277 4278 4279 4280
      return code;
    }

    tsdbReaderSuspend(pReader);
4281

4282
    tsdbReleaseReader(pReader);
4283

4284
    return code;
4285 4286 4287
  } else if (code == EBUSY) {
    return TSDB_CODE_VND_QUERY_BUSY;
  } else {
4288 4289
    terrno = TAOS_SYSTEM_ERROR(code);
    return TSDB_CODE_FAILED;
4290 4291 4292 4293 4294 4295
  }
}

// Resume a suspended reader: take a new read snapshot and restart the scan on it. Nothing is opened when the
// reader holds no table. For readers that are not of type TIMEWINDOW_RANGE_CONTAINED both inner readers are set
// up to share the table map, uid list, schemas and snapshot of the outer reader, and the first inner reader is
// opened.
//
// Returns 0 on success, otherwise the error of the failing step; the reader stays suspended in that case.
int32_t tsdbReaderResume(STsdbReader* pReader) {
  int32_t code = 0;

  // only used for the debug message below
  STableBlockScanInfo** pBlockScanInfo = pReader->status.pTableIter;

  //  restore reader's state
  //  task snapshot
  int32_t numOfTables = taosHashGetSize(pReader->status.pTableMap);
  if (numOfTables > 0) {
    qTrace("tsdb/reader: %p, take snapshot", pReader);
    code = tsdbTakeReadSnap(pReader, tsdbSetQueryReseek, &pReader->pReadSnap);
    if (code != TSDB_CODE_SUCCESS) {
      goto _err;
    }

    if (pReader->type == TIMEWINDOW_RANGE_CONTAINED) {
      code = doOpenReaderImpl(pReader);
      if (code != TSDB_CODE_SUCCESS) {
        goto _err;
      }
    } else {
      STsdbReader* pPrevReader = pReader->innerReader[0];
      STsdbReader* pNextReader = pReader->innerReader[1];

      // we need only one row
      pPrevReader->capacity = 1;
      pPrevReader->status.pTableMap = pReader->status.pTableMap;
      pPrevReader->status.uidList = pReader->status.uidList;
      pPrevReader->pSchema = pReader->pSchema;
      pPrevReader->pMemSchema = pReader->pMemSchema;
      pPrevReader->pReadSnap = pReader->pReadSnap;

      pNextReader->capacity = 1;
      pNextReader->status.pTableMap = pReader->status.pTableMap;
      pNextReader->status.uidList = pReader->status.uidList;
      pNextReader->pSchema = pReader->pSchema;
      pNextReader->pMemSchema = pReader->pMemSchema;
      pNextReader->pReadSnap = pReader->pReadSnap;

      // only the first inner reader is opened here
      code = doOpenReaderImpl(pPrevReader);
      if (code != TSDB_CODE_SUCCESS) {
        goto _err;
      }
    }
  }

  pReader->suspended = false;

  tsdbDebug("reader: %p resumed uid %" PRIu64 ", numOfTable:%" PRId32 ", in this query %s", pReader,
            pBlockScanInfo ? (*pBlockScanInfo)->uid : 0, numOfTables, pReader->idStr);
  return code;

_err:
  tsdbError("failed to resume data reader, code:%s %s", tstrerror(code), pReader->idStr);
  return code;
}

4350
// Build the next result block of a single reader. The result block is emptied first; data files are consulted
// while the reader loads from file, and the buffers are tried once the files deliver no more rows.
// Returns true when the result block holds at least one row.
static bool doTsdbNextDataBlock(STsdbReader* pReader) {
  // cleanup the data that belongs to the previous data block
  SSDataBlock* pBlock = pReader->pResBlock;
  blockDataCleanup(pBlock);

  SReaderStatus* pStatus = &pReader->status;
  if (taosHashGetSize(pStatus->pTableMap) == 0) {
    return false;
  }

  if (pStatus->loadFromFile) {
    if (buildBlockFromFiles(pReader) != TSDB_CODE_SUCCESS) {
      return false;
    }

    if (pBlock->info.rows > 0) {
      return true;
    }

    // nothing came out of the files, start over with the first table for the buffer scan
    resetTableListIndex(pStatus);
  }

  // no (more) data in files, let's try the buffer
  buildBlockFromBufferSequentially(pReader);
  return pBlock->info.rows > 0;
}

4379
// Advance the reader to its next data block.
//
// A reader with inner readers runs through three steps: the first inner reader (EXTERNAL_ROWS_PREV), the main
// scan (EXTERNAL_ROWS_MAIN) and the second inner reader (EXTERNAL_ROWS_NEXT). A suspended reader is resumed first.
//
// Returns true when a block is available. The reader lock taken here is released before returning, except when
// true is returned for a block that is not a composed one - in that case the lock is still held on return.
// Returns false when there is no more data or a step failed; on failure terrno carries the error code.
bool tsdbNextDataBlock(STsdbReader* pReader) {
  if (isEmptyQueryTimeWindow(&pReader->window) || pReader->step == EXTERNAL_ROWS_NEXT) {
    return false;
  }

  SReaderStatus* pStatus = &pReader->status;

  int32_t code = tsdbAcquireReader(pReader);
  qTrace("tsdb/read: %p, take read mutex, code: %d", pReader, code);

  if (pReader->suspended) {
    code = tsdbReaderResume(pReader);
    if (code != TSDB_CODE_SUCCESS) {
      // without a read snapshot nothing can be read
      terrno = code;
      qTrace("tsdb/read: %p, unlock read mutex", pReader);
      tsdbReleaseReader(pReader);
      return false;
    }
  }

  if (pReader->innerReader[0] != NULL && pReader->step == 0) {
    bool ret = doTsdbNextDataBlock(pReader->innerReader[0]);
    pReader->step = EXTERNAL_ROWS_PREV;
    if (ret) {
      pStatus = &pReader->innerReader[0]->status;
      if (pStatus->composedDataBlock) {
        qTrace("tsdb/read: %p, unlock read mutex", pReader);
        tsdbReleaseReader(pReader);
      }

      return ret;
    }
  }

  if (pReader->step == EXTERNAL_ROWS_PREV) {
    // prepare for the main scan
    code = doOpenReaderImpl(pReader);
    resetAllDataBlockScanInfo(pReader->status.pTableMap, pReader->innerReader[0]->window.ekey);

    if (code != TSDB_CODE_SUCCESS) {
      // an error code must not be returned as a bool: it would read as "block available"
      terrno = code;
      qTrace("tsdb/read: %p, unlock read mutex", pReader);
      tsdbReleaseReader(pReader);
      return false;
    }

    pReader->step = EXTERNAL_ROWS_MAIN;
  }

  bool ret = doTsdbNextDataBlock(pReader);
  if (ret) {
    if (pStatus->composedDataBlock) {
      qTrace("tsdb/read: %p, unlock read mutex", pReader);
      tsdbReleaseReader(pReader);
    }

    return ret;
  }

  if (pReader->step == EXTERNAL_ROWS_MAIN && pReader->innerReader[1] != NULL) {
    // prepare for the next row scan
    code = doOpenReaderImpl(pReader->innerReader[1]);
    resetAllDataBlockScanInfo(pReader->innerReader[1]->status.pTableMap, pReader->window.ekey);
    if (code != TSDB_CODE_SUCCESS) {
      terrno = code;
      qTrace("tsdb/read: %p, unlock read mutex", pReader);
      tsdbReleaseReader(pReader);
      return false;
    }

    ret = doTsdbNextDataBlock(pReader->innerReader[1]);
    pReader->step = EXTERNAL_ROWS_NEXT;
    if (ret) {
      pStatus = &pReader->innerReader[1]->status;
      if (pStatus->composedDataBlock) {
        qTrace("tsdb/read: %p, unlock read mutex", pReader);
        tsdbReleaseReader(pReader);
      }

      return ret;
    }
  }

  qTrace("tsdb/read: %p, unlock read mutex", pReader);
  tsdbReleaseReader(pReader);

  return false;
}

4456
// Complete pSup->pColAgg so that it holds the primary timestamp aggregate
// followed by one aggregate for every requested column.
//
// pSup      - load support info; pSup->pColAgg is modified in place and
//             pSup->colId[0..numOfCols) lists the requested column ids
// numOfRows - row count of the block; used as numOfNull for synthesized entries
// numOfCols - number of entries in pSup->colId
// pTsAgg    - aggregate describing the timestamp column; inserted at index 0
//
// The merge below walks both lists in step, which only works if both are
// ordered by ascending column id.
// NOTE(review): that ordering is assumed from the merge logic - confirm with the producers.
static void doFillNullColSMA(SBlockLoadSuppInfo* pSup, int32_t numOfRows, int32_t numOfCols, SColumnDataAgg* pTsAgg) {
  int32_t i = 0, j = 0;

  taosArrayInsert(pSup->pColAgg, 0, pTsAgg);

  // The size must be sampled AFTER the timestamp entry is inserted; the stale
  // value used previously made the merge stop one element short.
  int32_t size = (int32_t)taosArrayGetSize(pSup->pColAgg);

  while (j < numOfCols && i < size) {
    SColumnDataAgg* pAgg = taosArrayGet(pSup->pColAgg, i);
    if (pAgg->colId == pSup->colId[j]) {
      i += 1;
      j += 1;
    } else if (pAgg->colId < pSup->colId[j]) {
      i += 1;
    } else {
      // requested column has no stored aggregate: synthesize an all-NULL one
      if (pSup->colId[j] != PRIMARYKEY_TIMESTAMP_COL_ID) {
        SColumnDataAgg nullColAgg = {.colId = pSup->colId[j], .numOfNull = numOfRows};
        taosArrayInsert(pSup->pColAgg, i, &nullColAgg);
        // keep the loop bound in sync with the grown array; the new entry is
        // stepped over on the next iteration by the "smaller id" branch
        size = (int32_t)taosArrayGetSize(pSup->pColAgg);
      }
      j += 1;
    }
  }

  // Requested columns that sort after every stored aggregate were silently
  // dropped before; give them an all-NULL entry at the end of the list too.
  for (; j < numOfCols; ++j) {
    if (pSup->colId[j] != PRIMARYKEY_TIMESTAMP_COL_ID) {
      SColumnDataAgg nullColAgg = {.colId = pSup->colId[j], .numOfNull = numOfRows};
      taosArrayInsert(pSup->pColAgg, taosArrayGetSize(pSup->pColAgg), &nullColAgg);
    }
  }
}

4479
// Load the pre-computed per-column aggregates (SMA) of the current file block
// and expose them through pDataBlock->pBlockAgg.
//
// pReader    - reader positioned on a file block
// pDataBlock - receives the aggregate pointer table in pBlockAgg (NULL when
//              no SMA is available)
// allHave    - set to true only when the SMA was loaded and published
//
// Returns TSDB_CODE_SUCCESS both when SMA was loaded and when it is simply not
// available (external-window reader, composed block, smaValid unset, uid
// mismatch, block without SMA). Returns the error of tsdbReadBlockSma on read
// failure and TSDB_CODE_OUT_OF_MEMORY when the pointer table cannot be
// allocated.
int32_t tsdbRetrieveDatablockSMA(STsdbReader* pReader, SSDataBlock* pDataBlock, bool* allHave) {
  SColumnDataAgg*** pBlockSMA = &pDataBlock->pBlockAgg;

  int32_t code = 0;
  *allHave = false;
  *pBlockSMA = NULL;

  if (pReader->type == TIMEWINDOW_RANGE_EXTERNAL) {
    return TSDB_CODE_SUCCESS;
  }

  // there is no statistics data for composed block
  if (pReader->status.composedDataBlock || (!pReader->suppInfo.smaValid)) {
    return TSDB_CODE_SUCCESS;
  }

  SFileDataBlockInfo* pFBlock = getCurrentBlockInfo(&pReader->status.blockIter);
  SBlockLoadSuppInfo* pSup = &pReader->suppInfo;
  SSDataBlock*        pResBlock = pReader->pResBlock;

  // the result block does not belong to the table of the current file block
  if (pResBlock->info.id.uid != pFBlock->uid) {
    return TSDB_CODE_SUCCESS;
  }

  SDataBlk* pBlock = getCurrentBlock(&pReader->status.blockIter);
  if (!tDataBlkHasSma(pBlock)) {
    // *pBlockSMA is already NULL
    return TSDB_CODE_SUCCESS;
  }

  code = tsdbReadBlockSma(pReader->pFileReader, pBlock, pSup->pColAgg);
  if (code != TSDB_CODE_SUCCESS) {
    tsdbDebug("vgId:%d, failed to load block SMA for uid %" PRIu64 ", code:%s, %s", 0, pFBlock->uid, tstrerror(code),
              pReader->idStr);
    return code;
  }

  // always load the first primary timestamp column data
  SColumnDataAgg* pTsAgg = &pSup->tsColAgg;

  pTsAgg->numOfNull = 0;
  pTsAgg->colId = PRIMARYKEY_TIMESTAMP_COL_ID;
  pTsAgg->min = pResBlock->info.window.skey;
  pTsAgg->max = pResBlock->info.window.ekey;

  int32_t numOfCols = (int32_t)pSup->numOfCols;

  // ensure capacity
  if (pDataBlock->pDataBlock) {
    size_t colsNum = taosArrayGetSize(pDataBlock->pDataBlock);
    taosArrayEnsureCap(pSup->pColAgg, colsNum);
  }

  // lazily create the slot -> aggregate pointer table of the result block
  if (pResBlock->pBlockAgg == NULL) {
    size_t num = taosArrayGetSize(pResBlock->pDataBlock);
    pResBlock->pBlockAgg = taosMemoryCalloc(num, POINTER_BYTES);
    if (pResBlock->pBlockAgg == NULL && num > 0) {
      // previously the NULL table was written through below
      return TSDB_CODE_OUT_OF_MEMORY;
    }
  }

  // do fill all null column value SMA info
  doFillNullColSMA(pSup, pBlock->nRow, numOfCols, pTsAgg);
  int32_t size = (int32_t)taosArrayGetSize(pSup->pColAgg);

  // merge the aggregate list with the requested column ids and publish each
  // match at the slot of the requested column
  int32_t i = 0, j = 0;
  while (j < numOfCols && i < size) {
    SColumnDataAgg* pAgg = taosArrayGet(pSup->pColAgg, i);
    if (pAgg->colId == pSup->colId[j]) {
      pResBlock->pBlockAgg[pSup->slotId[j]] = pAgg;
      i += 1;
      j += 1;
    } else if (pAgg->colId < pSup->colId[j]) {
      i += 1;
    } else {
      // a requested column without an aggregate is pointed at the timestamp
      // aggregate
      // NOTE(review): this branch looks intended for the timestamp column only - confirm.
      pResBlock->pBlockAgg[pSup->slotId[j]] = &pSup->tsColAgg;
      j += 1;
    }
  }

  *allHave = true;
  *pBlockSMA = pResBlock->pBlockAgg;
  pReader->cost.smaDataLoad += 1;

  tsdbDebug("vgId:%d, succeed to load block SMA for uid %" PRIu64 ", %s", 0, pFBlock->uid, pReader->idStr);
  return code;
}

H
Haojun Liao 已提交
4567
// Load the data of the current file block and copy it into the reader's
// result block.
//
// Returns pReader->pResBlock on success. Returns NULL with terrno set when the
// block's table uid is not part of the reader's table map
// (TSDB_CODE_INVALID_PARA) or when loading the block data fails (the load
// error code).
static SSDataBlock* doRetrieveDataBlock(STsdbReader* pReader) {
  SReaderStatus*      pStatus = &pReader->status;
  SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(&pStatus->blockIter);

  // taosHashGet yields a pointer to the stored value, here a pointer to
  // STableBlockScanInfo*. It must be NULL-checked BEFORE it is dereferenced,
  // otherwise an unknown uid crashes instead of reaching the error path.
  STableBlockScanInfo** ppScanInfo = taosHashGet(pStatus->pTableMap, &pBlockInfo->uid, sizeof(pBlockInfo->uid));
  if (ppScanInfo == NULL || *ppScanInfo == NULL) {
    terrno = TSDB_CODE_INVALID_PARA;
    tsdbError("failed to locate the uid:%" PRIu64 " in query table uid list, total tables:%d, %s", pBlockInfo->uid,
              taosHashGetSize(pReader->status.pTableMap), pReader->idStr);
    return NULL;
  }

  STableBlockScanInfo* pBlockScanInfo = *ppScanInfo;

  int32_t code = doLoadFileBlockData(pReader, &pStatus->blockIter, &pStatus->fileBlockData, pBlockScanInfo->uid);
  if (code != TSDB_CODE_SUCCESS) {
    tBlockDataDestroy(&pStatus->fileBlockData);
    terrno = code;
    return NULL;
  }

  copyBlockDataToSDataBlock(pReader);
  return pReader->pResBlock;
}

H
Haojun Liao 已提交
4590
// Return the data block the reader is currently positioned on.
//
// For an external-window reader (TIMEWINDOW_RANGE_EXTERNAL) the block comes
// from innerReader[0] while in step EXTERNAL_ROWS_PREV and from innerReader[1]
// while in step EXTERNAL_ROWS_NEXT; in every other case it comes from pReader
// itself. A composed block is returned directly; otherwise the file block is
// loaded and the reader is released afterwards. pIdList is not used.
SSDataBlock* tsdbRetrieveDataBlock(STsdbReader* pReader, SArray* pIdList) {
  STsdbReader* pActive = pReader;

  if (pReader->type == TIMEWINDOW_RANGE_EXTERNAL) {
    if (pReader->step == EXTERNAL_ROWS_NEXT) {
      pActive = pReader->innerReader[1];
    } else if (pReader->step == EXTERNAL_ROWS_PREV) {
      pActive = pReader->innerReader[0];
    }
  }

  // a composed block is already materialized in the result block
  if (pActive->status.composedDataBlock) {
    return pActive->pResBlock;
  }

  SSDataBlock* pLoaded = doRetrieveDataBlock(pActive);

  // the release is issued on the outer reader, not on the selected inner one
  qTrace("tsdb/read-retrieve: %p, unlock read mutex", pReader);
  tsdbReleaseReader(pReader);

  return pLoaded;
}

H
Haojun Liao 已提交
4613
// Re-arm the reader for a new scan described by pCond.
//
// The reader is acquired for the whole call (and resumed first when it is
// suspended). When the current query window is empty or no read snapshot
// exists, nothing is reset and TSDB_CODE_SUCCESS is returned. Otherwise the
// scan order, time window, file-set iterator, block iterator and per-table
// scan state are reset, and the first block of the first file is located when
// there are files to read.
//
// Returns TSDB_CODE_SUCCESS, or the error of initForFirstBlockInFile.
int32_t tsdbReaderReset(STsdbReader* pReader, SQueryTableDataCond* pCond) {
  int32_t         code = 0;
  int32_t         numOfTables = 0;
  int64_t         startTs = 0;
  SReaderStatus*  pState = &pReader->status;
  SDataBlockIter* pIter = &pState->blockIter;

  qTrace("tsdb/reader-reset: %p, take read mutex", pReader);
  tsdbAcquireReader(pReader);

  if (pReader->suspended) {
    tsdbReaderResume(pReader);
  }

  if (isEmptyQueryTimeWindow(&pReader->window) || pReader->pReadSnap == NULL) {
    tsdbDebug("tsdb reader reset return %p", pReader->pReadSnap);
    code = TSDB_CODE_SUCCESS;
    goto _end;
  }

  pReader->order = pCond->order;
  pReader->type = TIMEWINDOW_RANGE_CONTAINED;
  pState->loadFromFile = true;
  pState->pTableIter = NULL;
  pReader->window = updateQueryTimeWindow(pReader->pTsdb, &pCond->twindows);

  // start from a clean timestamp aggregate and drop the open file reader
  memset(&pReader->suppInfo.tsColAgg, 0, sizeof(SColumnDataAgg));
  pReader->suppInfo.tsColAgg.colId = PRIMARYKEY_TIMESTAMP_COL_ID;
  tsdbDataFReaderClose(&pReader->pFileReader);

  numOfTables = taosHashGetSize(pState->pTableMap);

  initFilesetIterator(&pState->fileIter, pReader->pReadSnap->fs.aDFileSet, pReader);
  resetDataBlockIterator(pIter, pReader->order);
  resetTableListIndex(&pReader->status);

  // position every table just outside the window, on the side the scan starts from
  if (ASCENDING_TRAVERSE(pReader->order)) {
    startTs = pReader->window.skey - 1;
  } else {
    startTs = pReader->window.ekey + 1;
  }
  resetAllDataBlockScanInfo(pState->pTableMap, startTs);

  if (pState->fileIter.numOfFiles != 0) {
    code = initForFirstBlockInFile(pReader, pIter);
    if (code != TSDB_CODE_SUCCESS) {
      tsdbError("%p reset reader failed, numOfTables:%d, query range:%" PRId64 " - %" PRId64 " in query %s", pReader,
                numOfTables, pReader->window.skey, pReader->window.ekey, pReader->idStr);
      goto _end;
    }
  } else {
    // no data in files, let's try buffer in memory
    pState->loadFromFile = false;
    resetTableListIndex(pState);
  }

  tsdbDebug("%p reset reader, suid:%" PRIu64 ", numOfTables:%d, skey:%" PRId64 ", query range:%" PRId64 " - %" PRId64
            " in query %s",
            pReader, pReader->suid, numOfTables, pCond->twindows.skey, pReader->window.skey, pReader->window.ekey,
            pReader->idStr);

_end:
  tsdbReleaseReader(pReader);
  return code;
}
H
Hongze Cheng 已提交
4681

4682 4683 4684
// Map a block's row count to a histogram bucket.
//
// startRow    - row count at which bucket 0 starts
// bucketRange - width of one bucket, in rows
// numOfRows   - row count of the block being classified
//
// Returns (numOfRows - startRow) / bucketRange. Blocks with fewer rows than
// startRow are placed in bucket 0 instead of producing a negative index, and a
// non-positive bucketRange yields bucket 0 instead of dividing by zero.
static int32_t getBucketIndex(int32_t startRow, int32_t bucketRange, int32_t numOfRows) {
  if (bucketRange <= 0 || numOfRows < startRow) {
    return 0;
  }

  return (numOfRows - startRow) / bucketRange;
}
H
Hongze Cheng 已提交
4685

4686 4687 4688 4689
// Walk every file block visible to the reader and accumulate block
// distribution statistics into pTableBlockInfo: total rows and size, min/max
// rows per block, number of small blocks, number of blocks/files/tables and a
// histogram of rows per block.
//
// totalSize, totalRows and numOfVgroups are (re)initialized here; the other
// counters are only added to or compared against, so the caller is expected to
// have initialized them.
//
// Returns TSDB_CODE_SUCCESS, or the error of initForFirstBlockInFile.
int32_t tsdbGetFileBlocksDistInfo(STsdbReader* pReader, STableBlockDistInfo* pTableBlockInfo) {
  int32_t code = TSDB_CODE_SUCCESS;
  pTableBlockInfo->totalSize = 0;
  pTableBlockInfo->totalRows = 0;
  pTableBlockInfo->numOfVgroups = 1;

  // find the start data block in file
  SReaderStatus* pStatus = &pReader->status;

  STsdbCfg* pc = &pReader->pTsdb->pVnode->config.tsdbCfg;
  pTableBlockInfo->defMinRows = pc->minRows;
  pTableBlockInfo->defMaxRows = pc->maxRows;

  // The [minRows, maxRows] span is split into numOfBuckets equal buckets.
  // NOTE(review): assumes blockRowsHisto has at least 20 slots - confirm against its declaration.
  const int32_t numOfBuckets = 20;

  int32_t bucketRange = ceil((pc->maxRows - pc->minRows) / (double)numOfBuckets);
  if (bucketRange < 1) {
    // maxRows <= minRows would otherwise lead to a division by zero below
    bucketRange = 1;
  }

  pTableBlockInfo->numOfFiles += 1;

  int32_t numOfTables = (int32_t)taosHashGetSize(pStatus->pTableMap);
  int     defaultRows = 4096;

  SDataBlockIter* pBlockIter = &pStatus->blockIter;
  pTableBlockInfo->numOfFiles += pStatus->fileIter.numOfFiles;

  if (pBlockIter->numOfBlocks > 0) {
    pTableBlockInfo->numOfBlocks += pBlockIter->numOfBlocks;
  }

  pTableBlockInfo->numOfTables = numOfTables;
  bool hasNext = (pBlockIter->numOfBlocks > 0);

  while (true) {
    if (hasNext) {
      SDataBlk* pBlock = getCurrentBlock(pBlockIter);

      int32_t numOfRows = pBlock->nRow;
      pTableBlockInfo->totalRows += numOfRows;

      if (numOfRows > pTableBlockInfo->maxRows) {
        pTableBlockInfo->maxRows = numOfRows;
      }

      if (numOfRows < pTableBlockInfo->minRows) {
        pTableBlockInfo->minRows = numOfRows;
      }

      if (numOfRows < defaultRows) {
        pTableBlockInfo->numOfSmallBlocks += 1;
      }

      pTableBlockInfo->totalSize += pBlock->aSubBlock[0].szBlock;

      // A block with fewer rows than defMinRows gives a negative index, and a
      // block with exactly defMaxRows rows can land on numOfBuckets; clamp
      // both so the histogram is never indexed out of range.
      int32_t bucketIndex = getBucketIndex(pTableBlockInfo->defMinRows, bucketRange, numOfRows);
      if (bucketIndex < 0) {
        bucketIndex = 0;
      } else if (bucketIndex >= numOfBuckets) {
        bucketIndex = numOfBuckets - 1;
      }
      pTableBlockInfo->blockRowsHisto[bucketIndex]++;

      hasNext = blockIteratorNext(&pStatus->blockIter, pReader->idStr);
    } else {
      // current file is exhausted: move on to the next file set, if any
      code = initForFirstBlockInFile(pReader, pBlockIter);
      if ((code != TSDB_CODE_SUCCESS) || (pStatus->loadFromFile == false)) {
        break;
      }

      pTableBlockInfo->numOfBlocks += pBlockIter->numOfBlocks;
      hasNext = (pBlockIter->numOfBlocks > 0);
    }
  }

  return code;
}
H
Hongze Cheng 已提交
4757

H
refact  
Hongze Cheng 已提交
4758
// Sum, over every table in the reader's table map, the rows that
// tsdbGetNRowsInTbData reports for the table's data in the snapshot's pMem and
// pIMem memtables.
//
// The reader is acquired for the duration of the call and resumed first when
// it is suspended. pStatus->pTableIter is used as the iteration cursor and is
// NULL when the function returns.
int64_t tsdbGetNumOfRowsInMemTable(STsdbReader* pReader) {
  SReaderStatus* pStatus = &pReader->status;
  int64_t        total = 0;

  tsdbAcquireReader(pReader);
  if (pReader->suspended) {
    tsdbReaderResume(pReader);
  }

  for (pStatus->pTableIter = taosHashIterate(pStatus->pTableMap, NULL); pStatus->pTableIter != NULL;
       pStatus->pTableIter = taosHashIterate(pStatus->pTableMap, pStatus->pTableIter)) {
    STableBlockScanInfo* pScanInfo = *(STableBlockScanInfo**)pStatus->pTableIter;

    // rows held in pMem
    if (pReader->pReadSnap->pMem != NULL) {
      STbData* pMemData = tsdbGetTbDataFromMemTable(pReader->pReadSnap->pMem, pReader->suid, pScanInfo->uid);
      if (pMemData != NULL) {
        total += tsdbGetNRowsInTbData(pMemData);
      }
    }

    // rows held in pIMem
    if (pReader->pReadSnap->pIMem != NULL) {
      STbData* pIMemData = tsdbGetTbDataFromMemTable(pReader->pReadSnap->pIMem, pReader->suid, pScanInfo->uid);
      if (pIMemData != NULL) {
        total += tsdbGetNRowsInTbData(pIMemData);
      }
    }
  }

  tsdbReleaseReader(pReader);

  return total;
}
D
dapan1121 已提交
4796

L
Liu Jicong 已提交
4797
// Look up the schema of table `uid`.
//
// pVnode  - vnode whose meta store is queried
// uid     - table to look up
// pSchema - receives the result of metaGetTbTSchema for the resolved schema
//           version
// suid    - receives the super table uid for a child table, 0 otherwise
//
// For a child table the schema version is taken from its super table entry;
// for a normal table from the table's own entry.
//
// Returns TSDB_CODE_SUCCESS once the schema version is resolved. On failure
// terrno is set and returned: TSDB_CODE_TDB_INVALID_TABLE_ID when a meta entry
// cannot be found, TSDB_CODE_INVALID_PARA for any other table type.
int32_t tsdbGetTableSchema(SVnode* pVnode, int64_t uid, STSchema** pSchema, int64_t* suid) {
  int32_t     schemaVer = 1;
  SMetaReader mr = {0};

  metaReaderInit(&mr, pVnode->pMeta, 0);

  if (metaGetTableEntryByUidCache(&mr, uid) != TSDB_CODE_SUCCESS) {
    terrno = TSDB_CODE_TDB_INVALID_TABLE_ID;
    goto _fail;
  }

  *suid = 0;

  if (mr.me.type == TSDB_NORMAL_TABLE) {
    schemaVer = mr.me.ntbEntry.schemaRow.version;
  } else if (mr.me.type == TSDB_CHILD_TABLE) {
    // switch the meta reader over to the super table entry
    tDecoderClear(&mr.coder);
    *suid = mr.me.ctbEntry.suid;

    if (metaGetTableEntryByUidCache(&mr, *suid) != TSDB_CODE_SUCCESS) {
      terrno = TSDB_CODE_TDB_INVALID_TABLE_ID;
      goto _fail;
    }

    schemaVer = mr.me.stbEntry.schemaRow.version;
  } else {
    terrno = TSDB_CODE_INVALID_PARA;
    goto _fail;
  }

  metaReaderClear(&mr);
  *pSchema = metaGetTbTSchema(pVnode->pMeta, uid, schemaVer, 1);

  return TSDB_CODE_SUCCESS;

_fail:
  metaReaderClear(&mr);
  return terrno;
}
H
Hongze Cheng 已提交
4834

H
Hongze Cheng 已提交
4835
// Build a read snapshot for pReader.
//
// Under the tsdb read lock, the active (mem) and immutable (imem) memtables
// are referenced when their version range overlaps pReader->verRange, and the
// file system state is referenced through tsdbFSRef. Each memtable reference
// is registered with a node carrying pReader and the `reseek` callback.
//
// On success *ppSnap receives the snapshot and 0 is returned. On failure
// *ppSnap is NULL, the snapshot and its nodes are freed, and
// TSDB_CODE_OUT_OF_MEMORY or the tsdbFSRef error is returned.
//
// NOTE(review): a failure after tsdbRefMemTable has run frees the node without
// a matching tsdbUnrefMemTable - verify whether the memtable still tracks it.
int32_t tsdbTakeReadSnap(STsdbReader* pReader, _query_reseek_func_t reseek, STsdbReadSnap** ppSnap) {
  STsdb*         pTsdb = pReader->pTsdb;
  SVersionRange* pVer = &pReader->verRange;
  int32_t        code = 0;

  STsdbReadSnap* pSnap = (STsdbReadSnap*)taosMemoryCalloc(1, sizeof(*pSnap));
  if (pSnap == NULL) {
    *ppSnap = NULL;
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  taosThreadRwlockRdlock(&pTsdb->rwLock);

  // single-pass block: every failure breaks out to the common unlock below
  do {
    // active memtable
    if (pTsdb->mem != NULL && pVer->minVer <= pTsdb->mem->maxVer && pVer->maxVer >= pTsdb->mem->minVer) {
      pSnap->pMem = pTsdb->mem;
      pSnap->pNode = taosMemoryMalloc(sizeof(*pSnap->pNode));
      if (pSnap->pNode == NULL) {
        code = TSDB_CODE_OUT_OF_MEMORY;
        break;
      }

      pSnap->pNode->pQHandle = pReader;
      pSnap->pNode->reseek = reseek;
      tsdbRefMemTable(pTsdb->mem, pSnap->pNode);
    }

    // immutable memtable
    if (pTsdb->imem != NULL && pVer->minVer <= pTsdb->imem->maxVer && pVer->maxVer >= pTsdb->imem->minVer) {
      pSnap->pIMem = pTsdb->imem;
      pSnap->pINode = taosMemoryMalloc(sizeof(*pSnap->pINode));
      if (pSnap->pINode == NULL) {
        code = TSDB_CODE_OUT_OF_MEMORY;
        break;
      }

      pSnap->pINode->pQHandle = pReader;
      pSnap->pINode->reseek = reseek;
      tsdbRefMemTable(pTsdb->imem, pSnap->pINode);
    }

    // file system state
    code = tsdbFSRef(pTsdb, &pSnap->fs);
  } while (0);

  taosThreadRwlockUnlock(&pTsdb->rwLock);

  if (code) {
    *ppSnap = NULL;
    if (pSnap->pNode) taosMemoryFree(pSnap->pNode);
    if (pSnap->pINode) taosMemoryFree(pSnap->pINode);
    taosMemoryFree(pSnap);
    return code;
  }

  tsdbTrace("vgId:%d, take read snapshot", TD_VID(pTsdb->pVnode));

  *ppSnap = pSnap;
  return code;
}

4905
// Release a snapshot obtained from tsdbTakeReadSnap.
//
// Drops the references on the memtables recorded in the snapshot (forwarding
// `proactive` to tsdbUnrefMemTable), drops the file system reference, then
// frees the registration nodes and the snapshot itself. A NULL pSnap is
// accepted; the trace line is emitted either way.
void tsdbUntakeReadSnap(STsdbReader* pReader, STsdbReadSnap* pSnap, bool proactive) {
  STsdb* pOwner = pReader->pTsdb;

  if (pSnap != NULL) {
    if (pSnap->pMem != NULL) {
      tsdbUnrefMemTable(pSnap->pMem, pSnap->pNode, proactive);
    }

    if (pSnap->pIMem != NULL) {
      tsdbUnrefMemTable(pSnap->pIMem, pSnap->pINode, proactive);
    }

    tsdbFSUnref(pOwner, &pSnap->fs);

    // the nodes are freed only after both memtable references are dropped
    if (pSnap->pNode != NULL) {
      taosMemoryFree(pSnap->pNode);
    }
    if (pSnap->pINode != NULL) {
      taosMemoryFree(pSnap->pINode);
    }
    taosMemoryFree(pSnap);
  }

  tsdbTrace("vgId:%d, untake read snapshot", TD_VID(pOwner->pVnode));
}