tsdbRead.c 173.6 KB
Newer Older
H
hjxilinx 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

H
Haojun Liao 已提交
16
#include "osDef.h"
H
Hongze Cheng 已提交
17
#include "tsdb.h"
18
#include "tsimplehash.h"
19

H
Hongze Cheng 已提交
20
// Evaluates to non-zero when the traversal order `o` is TSDB_ORDER_ASC.
// The argument is parenthesized so that an expression such as `a | b` is compared as a whole
// instead of being split by the higher precedence of `==`.
#define ASCENDING_TRAVERSE(o) ((o) == TSDB_ORDER_ASC)
H
Hongze Cheng 已提交
21

22 23 24 25 26 27
// Tag for a set of rows relative to a main result set.
// NOTE(review): the names suggest "rows before / inside / after the main range" -- confirm against the users.
// The values are consecutive integers, not independent bit flags (0x3 == 0x1 | 0x2).
typedef enum {
  EXTERNAL_ROWS_PREV = 0x1,
  EXTERNAL_ROWS_MAIN = 0x2,
  EXTERNAL_ROWS_NEXT = 0x3,
} EContentData;

D
dapan1121 已提交
28 29 30 31 32
// Read mode stored in STsdbReader::readMode.
// READ_MODE_ALL has no explicit initializer, so it takes the next value after READ_MODE_COUNT_ONLY (0x2).
typedef enum {
  READ_MODE_COUNT_ONLY = 0x1,
  READ_MODE_ALL,
} EReadMode;

33
typedef struct {
dengyihao's avatar
dengyihao 已提交
34
  STbDataIter* iter;
35 36 37 38
  int32_t      index;
  bool         hasVal;
} SIterInfo;

39 40
// Pair of counters: number of blocks and number of last files.
typedef struct {
  int32_t numOfBlocks;
  int32_t numOfLastFiles;
} SBlockNumber;

44
typedef struct SBlockIndex {
45 46
  int32_t     ordinalIndex;
  int64_t     inFileOffset;
H
Haojun Liao 已提交
47
  STimeWindow window;  // todo replace it with overlap flag.
48 49
} SBlockIndex;

H
Haojun Liao 已提交
50
typedef struct STableBlockScanInfo {
dengyihao's avatar
dengyihao 已提交
51 52
  uint64_t  uid;
  TSKEY     lastKey;
53
  TSKEY     lastKeyInStt;       // last accessed key in stt
H
Hongze Cheng 已提交
54
  SMapData  mapData;            // block info (compressed)
55
  SArray*   pBlockList;         // block data index list, SArray<SBlockIndex>
H
Hongze Cheng 已提交
56 57 58 59 60 61
  SIterInfo iter;               // mem buffer skip list iterator
  SIterInfo iiter;              // imem buffer skip list iterator
  SArray*   delSkyline;         // delete info for this table
  int32_t   fileDelIndex;       // file block delete index
  int32_t   lastBlockDelIndex;  // delete index for last block
  bool      iterInit;           // whether to initialize the in-memory skip list iterator or not
H
Haojun Liao 已提交
62 63 64
} STableBlockScanInfo;

// (uid, offset) pair; element type referenced by SBlockOrderSupporter::pDataBlockInfo.
typedef struct SBlockOrderWrapper {
  int64_t uid;
  int64_t offset;
} SBlockOrderWrapper;
H
Hongze Cheng 已提交
68 69

typedef struct SBlockOrderSupporter {
70 71 72 73
  SBlockOrderWrapper** pDataBlockInfo;
  int32_t*             indexPerTable;
  int32_t*             numOfBlocksPerTable;
  int32_t              numOfTables;
H
Hongze Cheng 已提交
74 75 76
} SBlockOrderSupporter;

// Counters and elapsed-time accumulators collected while a reader runs.
typedef struct SIOCostSummary {
  int64_t numOfBlocks;
  double  blockLoadTime;
  double  buildmemBlock;
  int64_t headFileLoad;  // incremented each time a file set is opened by filesetIteratorNext
  double  headFileLoadTime;
  int64_t smaDataLoad;
  double  smaLoadTime;
  int64_t lastBlockLoad;      // filled in from getLastBlockLoadInfo()
  double  lastBlockLoadTime;  // filled in from getLastBlockLoadInfo()
  int64_t composedBlocks;
  double  buildComposedBlockTime;
  double  createScanInfoList;  // elapsed time of createDataBlockScanInfo, in milliseconds
  //  double  getTbFromMemTime;
  //  double  getTbFromIMemTime;
  double initDelSkylineIterTime;
} SIOCostSummary;

typedef struct SBlockLoadSuppInfo {
95 96 97 98 99 100 101
  SArray*        pColAgg;
  SColumnDataAgg tsColAgg;
  int16_t*       colId;
  int16_t*       slotId;
  int32_t        numOfCols;
  char**         buildBuf;  // build string tmp buffer, todo remove it later after all string format being updated.
  bool           smaValid;  // the sma on all queried columns are activated
H
Hongze Cheng 已提交
102 103
} SBlockLoadSuppInfo;

104
typedef struct SLastBlockReader {
H
Hongze Cheng 已提交
105 106 107 108 109
  STimeWindow        window;
  SVersionRange      verRange;
  int32_t            order;
  uint64_t           uid;
  SMergeTree         mergeTree;
110
  SSttBlockLoadInfo* pInfo;
111 112
} SLastBlockReader;

113
typedef struct SFilesetIter {
H
Hongze Cheng 已提交
114 115 116
  int32_t           numOfFiles;  // number of total files
  int32_t           index;       // current accessed index in the list
  SArray*           pFileList;   // data file list
117
  int32_t           order;
H
Hongze Cheng 已提交
118
  SLastBlockReader* pLastBlockReader;  // last file block reader
119
} SFilesetIter;
H
Haojun Liao 已提交
120 121

// Element type of SDataBlockIter::blockList.
typedef struct SFileDataBlockInfo {
  // index position in STableBlockScanInfo in order to check whether neighbor block overlaps with it
  uint64_t uid;
  int32_t  tbBlockIdx;
} SFileDataBlockInfo;

typedef struct SDataBlockIter {
128
  int32_t   numOfBlocks;
129
  int32_t   index;
H
Hongze Cheng 已提交
130
  SArray*   blockList;  // SArray<SFileDataBlockInfo>
131
  int32_t   order;
H
Hongze Cheng 已提交
132
  SDataBlk  block;  // current SDataBlk data
133
  SHashObj* pTableMap;
H
Haojun Liao 已提交
134 135 136
} SDataBlockIter;

// Progress of dumping rows out of the current file block.
typedef struct SFileBlockDumpInfo {
  int32_t totalRows;
  int32_t rowIndex;
  int64_t lastKey;
  bool    allDumped;
} SFileBlockDumpInfo;

143
// Table uids sorted ascending (see createDataBlockScanInfo) plus a cursor into the list.
typedef struct STableUidList {
  uint64_t* tableUidList;  // access table uid list in uid ascending order list
  int32_t   currentIndex;  // index in table uid list
} STableUidList;
147

H
Haojun Liao 已提交
148
typedef struct SReaderStatus {
H
Hongze Cheng 已提交
149 150 151
  bool                  loadFromFile;       // check file stage
  bool                  composedDataBlock;  // the returned data block is a composed block or not
  SHashObj*             pTableMap;          // SHash<STableBlockScanInfo>
152
  STableBlockScanInfo** pTableIter;         // table iterator used in building in-memory buffer data blocks.
153
  STableUidList         uidList;            // check tables in uid order, to avoid the repeatly load of blocks in STT.
H
Hongze Cheng 已提交
154 155 156 157 158
  SFileBlockDumpInfo    fBlockDumpInfo;
  SDFileSet*            pCurrentFileset;  // current opened file set
  SBlockData            fileBlockData;
  SFilesetIter          fileIter;
  SDataBlockIter        blockIter;
H
Haojun Liao 已提交
159 160
} SReaderStatus;

161
typedef struct SBlockInfoBuf {
H
Hongze Cheng 已提交
162 163 164
  int32_t currentIndex;
  SArray* pData;
  int32_t numPerBucket;
D
dapan1121 已提交
165
  int32_t numOfTables;
166 167
} SBlockInfoBuf;

H
Hongze Cheng 已提交
168
struct STsdbReader {
H
Haojun Liao 已提交
169
  STsdb*             pTsdb;
170 171 172
  SVersionRange      verRange;
  TdThreadMutex      readerMutex;
  bool               suspended;
H
Haojun Liao 已提交
173 174
  uint64_t           suid;
  int16_t            order;
H
Haojun Liao 已提交
175
  bool               freeBlock;
D
dapan1121 已提交
176 177
  EReadMode          readMode;
  uint64_t           rowsNum;
H
Haojun Liao 已提交
178 179 180 181
  STimeWindow        window;  // the primary query time window that applies to all queries
  SSDataBlock*       pResBlock;
  int32_t            capacity;
  SReaderStatus      status;
182 183
  char*              idStr;  // query info handle, for debug purpose
  int32_t            type;   // query type: 1. retrieve all data blocks, 2. retrieve direct prev|next rows
H
Hongze Cheng 已提交
184
  SBlockLoadSuppInfo suppInfo;
H
Hongze Cheng 已提交
185
  STsdbReadSnap*     pReadSnap;
186
  SIOCostSummary     cost;
187 188 189 190 191 192 193 194 195
  STSchema*          pSchema;  // the newest version schema
  //  STSchema*          pMemSchema;   // the previous schema for in-memory data, to avoid load schema too many times
  SSHashObj*    pSchemaMap;   // keep the retrieved schema info, to avoid the overhead by repeatly load schema
  SDataFReader* pFileReader;  // the file reader
  SDelFReader*  pDelFReader;  // the del file reader
  SArray*       pDelIdx;      // del file block index;
  SBlockInfoBuf blockInfoBuf;
  int32_t       step;
  STsdbReader*  innerReader[2];
H
Hongze Cheng 已提交
196
};
H
Hongze Cheng 已提交
197

H
Haojun Liao 已提交
198
static SFileDataBlockInfo* getCurrentBlockInfo(SDataBlockIter* pBlockIter);
199 200
static int      buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, int64_t endKey, int32_t capacity,
                                          STsdbReader* pReader);
201
static TSDBROW* getValidMemRow(SIterInfo* pIter, const SArray* pDelList, STsdbReader* pReader);
202 203
static int32_t  doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pScanInfo, STsdbReader* pReader,
                                        SRowMerger* pMerger);
H
Hongze Cheng 已提交
204
static int32_t  doMergeRowsInLastBlock(SLastBlockReader* pLastBlockReader, STableBlockScanInfo* pScanInfo, int64_t ts,
205
                                       SRowMerger* pMerger, SVersionRange* pVerRange, const char* id);
206
static int32_t  doMergeRowsInBuf(SIterInfo* pIter, uint64_t uid, int64_t ts, SArray* pDelList, SRowMerger* pMerger,
dengyihao's avatar
dengyihao 已提交
207
                                 STsdbReader* pReader);
H
Hongze Cheng 已提交
208
static int32_t  doAppendRowFromTSRow(SSDataBlock* pBlock, STsdbReader* pReader, SRow* pTSRow,
H
Hongze Cheng 已提交
209
                                     STableBlockScanInfo* pInfo);
210
static int32_t  doAppendRowFromFileBlock(SSDataBlock* pResBlock, STsdbReader* pReader, SBlockData* pBlockData,
H
Hongze Cheng 已提交
211
                                         int32_t rowIndex);
212
static void     setComposedBlockFlag(STsdbReader* pReader, bool composed);
H
Hongze Cheng 已提交
213 214
static bool     hasBeenDropped(const SArray* pDelList, int32_t* index, TSDBKEY* pKey, int32_t order,
                               SVersionRange* pVerRange);
215

H
Hongze Cheng 已提交
216
static int32_t doMergeMemTableMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter, SArray* pDelList,
217
                                        TSDBROW* pTSRow, STsdbReader* pReader, bool* freeTSRow);
H
Hongze Cheng 已提交
218
static int32_t doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo* pBlockScanInfo,
H
Hongze Cheng 已提交
219
                                  STsdbReader* pReader, SRow** pTSRow);
220 221
static int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pBlockScanInfo, int64_t key,
                                     STsdbReader* pReader);
222

dengyihao's avatar
dengyihao 已提交
223 224 225 226
static int32_t initDelSkylineIterator(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader, STbData* pMemTbData,
                                      STbData* piMemTbData);
static STsdb*  getTsdbByRetentions(SVnode* pVnode, TSKEY winSKey, SRetention* retentions, const char* idstr,
                                   int8_t* pLevel);
227
static SVersionRange getQueryVerRange(SVnode* pVnode, SQueryTableDataCond* pCond, int8_t level);
H
Hongze Cheng 已提交
228 229 230
static int64_t       getCurrentKeyInLastBlock(SLastBlockReader* pLastBlockReader);
static bool          hasDataInLastBlock(SLastBlockReader* pLastBlockReader);
static int32_t       doBuildDataBlock(STsdbReader* pReader);
C
Cary Xu 已提交
231
static TSDBKEY       getCurrentKeyInBuf(STableBlockScanInfo* pScanInfo, STsdbReader* pReader);
232
static bool          hasDataInFileBlock(const SBlockData* pBlockData, const SFileBlockDumpInfo* pDumpInfo);
233
static void          initBlockDumpInfo(STsdbReader* pReader, SDataBlockIter* pBlockIter);
234
static int32_t       getInitialDelIndex(const SArray* pDelSkyline, int32_t order);
C
Cary Xu 已提交
235

H
Haojun Liao 已提交
236 237
static STableBlockScanInfo* getTableBlockScanInfo(SHashObj* pTableMap, uint64_t uid, const char* id);

238 239
static FORCE_INLINE STSchema* getLatestTableSchema(STsdbReader* pReader, uint64_t uid);

C
Cary Xu 已提交
240
// Returns true when ts lies outside the closed interval [pWindow->skey, pWindow->ekey].
static bool outOfTimeWindow(int64_t ts, STimeWindow* pWindow) {
  bool inside = (ts >= pWindow->skey) && (ts <= pWindow->ekey);
  return !inside;
}
H
Haojun Liao 已提交
241

242 243
/*
 * Record the queried column ids and slot ids in pSupInfo and allocate the per-column temporary
 * string buffers.
 *
 * colId, slotId and buildBuf share ONE allocation whose base address is pSupInfo->colId:
 *
 *   [ colId: numOfCols x int16_t ][ slotId: numOfCols x int16_t ][ padding ][ buildBuf: numOfCols x char* ]
 *
 * The padding rounds the start of buildBuf up to a multiple of POINTER_BYTES. Without it the pointer
 * array starts at byte 4 * numOfCols, which is not pointer-aligned when numOfCols is odd.
 *
 * @param pSupInfo     destination; smaValid is set to true, numOfCols/colId/slotId/buildBuf are filled in
 * @param pCols        column descriptors, numOfCols entries (colId, type and bytes are read)
 * @param pSlotIdList  slot id of each column, numOfCols entries
 * @param numOfCols    number of columns
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY when an allocation fails. On failure
 *         pSupInfo stays consistent: either numOfCols is 0 and colId is NULL, or every buildBuf
 *         entry is a valid allocation or NULL.
 */
static int32_t setColumnIdSlotList(SBlockLoadSuppInfo* pSupInfo, SColumnInfo* pCols, const int32_t* pSlotIdList,
                                   int32_t numOfCols) {
  size_t idBytes = sizeof(int16_t) * 2 * (size_t)numOfCols;
  size_t bufOffset = ((idBytes + POINTER_BYTES - 1) / POINTER_BYTES) * POINTER_BYTES;

  pSupInfo->smaValid = true;
  pSupInfo->numOfCols = 0;  // only advertise the columns once the arrays exist
  pSupInfo->colId = taosMemoryMalloc(bufOffset + (size_t)numOfCols * POINTER_BYTES);
  if (pSupInfo->colId == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  pSupInfo->slotId = (int16_t*)((char*)pSupInfo->colId + (sizeof(int16_t) * numOfCols));
  pSupInfo->buildBuf = (char**)((char*)pSupInfo->colId + bufOffset);
  pSupInfo->numOfCols = numOfCols;

  // make every slot well defined up front, so an early return never leaves garbage pointers behind
  for (int32_t i = 0; i < numOfCols; ++i) {
    pSupInfo->buildBuf[i] = NULL;
  }

  for (int32_t i = 0; i < numOfCols; ++i) {
    pSupInfo->colId[i] = pCols[i].colId;
    pSupInfo->slotId[i] = pSlotIdList[i];

    if (IS_VAR_DATA_TYPE(pCols[i].type)) {
      pSupInfo->buildBuf[i] = taosMemoryMalloc(pCols[i].bytes);
      if (pSupInfo->buildBuf[i] == NULL && pCols[i].bytes > 0) {
        return TSDB_CODE_OUT_OF_MEMORY;
      }
    }
  }

  return TSDB_CODE_SUCCESS;
}
H
Hongze Cheng 已提交
267

H
Haojun Liao 已提交
268
/*
 * Walk the schema columns and the queried column ids in parallel (both are consumed in increasing
 * index order) and clear pSupInfo->smaValid as soon as one queried column has no block SMA.
 *
 * @return TSDB_CODE_SUCCESS normally (smaValid is left untouched if every matched column has BSMA on);
 *         TSDB_CODE_INVALID_PARA when a schema column id is greater than the queried id at the
 *         current position, i.e. the queried id was not found at that point of the walk.
 */
static int32_t updateBlockSMAInfo(STSchema* pSchema, SBlockLoadSuppInfo* pSupInfo) {
  int32_t i = 0, j = 0;

  while (i < pSchema->numOfCols && j < pSupInfo->numOfCols) {
    STColumn* pTCol = &pSchema->columns[i];
    if (pTCol->colId == pSupInfo->colId[j]) {
      if (!IS_BSMA_ON(pTCol)) {
        pSupInfo->smaValid = false;
        return TSDB_CODE_SUCCESS;
      }

      i += 1;
      j += 1;
    } else if (pTCol->colId < pSupInfo->colId[j]) {
      // schema column that is not queried: skip it
      i += 1;
    } else {
      return TSDB_CODE_INVALID_PARA;
    }
  }

  return TSDB_CODE_SUCCESS;
}

292
/*
 * Allocate zero-initialized buckets for numOfTables STableBlockScanInfo entries:
 * numOfTables / numPerBucket full buckets plus one partial bucket for the remainder.
 * pBuf->numPerBucket must be non-zero. pBuf->pData is created on first use.
 *
 * @return TSDB_CODE_SUCCESS and pBuf->numOfTables = numOfTables, or TSDB_CODE_OUT_OF_MEMORY.
 *         Buckets already pushed before a failure stay in pBuf->pData and are released by
 *         clearBlockScanInfoBuf.
 */
static int32_t initBlockScanInfoBuf(SBlockInfoBuf* pBuf, int32_t numOfTables) {
  int32_t num = numOfTables / pBuf->numPerBucket;
  int32_t remainder = numOfTables % pBuf->numPerBucket;
  if (pBuf->pData == NULL) {
    pBuf->pData = taosArrayInit(num + 1, POINTER_BYTES);
    if (pBuf->pData == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }
  }

  for (int32_t i = 0; i < num; ++i) {
    char* p = taosMemoryCalloc(pBuf->numPerBucket, sizeof(STableBlockScanInfo));
    if (p == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }

    if (taosArrayPush(pBuf->pData, &p) == NULL) {
      taosMemoryFree(p);  // not owned by the array, release it here
      return TSDB_CODE_OUT_OF_MEMORY;
    }
  }

  if (remainder > 0) {
    char* p = taosMemoryCalloc(remainder, sizeof(STableBlockScanInfo));
    if (p == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }

    if (taosArrayPush(pBuf->pData, &p) == NULL) {
      taosMemoryFree(p);
      return TSDB_CODE_OUT_OF_MEMORY;
    }
  }

  pBuf->numOfTables = numOfTables;

  return TSDB_CODE_SUCCESS;
}

/*
 * Grow the bucket pool so that it provides at least numOfTables entries. Does nothing when the pool
 * is already large enough.
 *
 * The last existing bucket (which may be a partial one) is released and re-created, so entries that
 * lived in it are no longer valid after this call.
 *
 * After the pop, numOfTables is recomputed as (remaining buckets) * numPerBucket. The previous code
 * divided numOfTables by numPerBucket, which yields a bucket count rather than a table count; the
 * follow-up arithmetic could then create a last bucket that is too small for the indices mapped to
 * it by getPosInBlockInfoBuf (for example growing 1000 -> 1500 entries with 1000 entries per bucket).
 *
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY when an allocation fails.
 */
static int32_t ensureBlockScanInfoBuf(SBlockInfoBuf* pBuf, int32_t numOfTables) {
  if (numOfTables <= pBuf->numOfTables) {
    return TSDB_CODE_SUCCESS;
  }

  if (pBuf->numOfTables > 0) {
    STableBlockScanInfo** p = (STableBlockScanInfo**)taosArrayPop(pBuf->pData);
    taosMemoryFree(*p);

    // buckets before the pop = ceil(numOfTables / numPerBucket); all remaining ones are full buckets
    int32_t numOfBuckets = (pBuf->numOfTables + pBuf->numPerBucket - 1) / pBuf->numPerBucket;
    pBuf->numOfTables = (numOfBuckets - 1) * pBuf->numPerBucket;
  }

  int32_t num = (numOfTables - pBuf->numOfTables) / pBuf->numPerBucket;
  int32_t remainder = (numOfTables - pBuf->numOfTables) % pBuf->numPerBucket;
  if (pBuf->pData == NULL) {
    pBuf->pData = taosArrayInit(num + 1, POINTER_BYTES);
    if (pBuf->pData == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }
  }

  for (int32_t i = 0; i < num; ++i) {
    char* p = taosMemoryCalloc(pBuf->numPerBucket, sizeof(STableBlockScanInfo));
    if (p == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }

    if (taosArrayPush(pBuf->pData, &p) == NULL) {
      taosMemoryFree(p);  // not owned by the array, release it here
      return TSDB_CODE_OUT_OF_MEMORY;
    }
  }

  if (remainder > 0) {
    char* p = taosMemoryCalloc(remainder, sizeof(STableBlockScanInfo));
    if (p == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }

    if (taosArrayPush(pBuf->pData, &p) == NULL) {
      taosMemoryFree(p);
      return TSDB_CODE_OUT_OF_MEMORY;
    }
  }

  pBuf->numOfTables = numOfTables;

  return TSDB_CODE_SUCCESS;
}
H
Hongze Cheng 已提交
359

360 361
/*
 * Release every bucket and the bucket array itself.
 * pData and numOfTables are reset afterwards so that a later initBlockScanInfoBuf /
 * ensureBlockScanInfoBuf call starts from an empty pool instead of touching freed memory.
 */
static void clearBlockScanInfoBuf(SBlockInfoBuf* pBuf) {
  size_t num = taosArrayGetSize(pBuf->pData);
  for (int32_t i = 0; i < (int32_t)num; ++i) {
    char** p = taosArrayGet(pBuf->pData, i);
    taosMemoryFree(*p);
  }

  taosArrayDestroy(pBuf->pData);
  pBuf->pData = NULL;
  pBuf->numOfTables = 0;
}

/*
 * Return the address of entry `index` in the bucket pool: bucket index / numPerBucket, slot
 * index % numPerBucket. No bounds check is performed; the caller must have sized the pool for
 * `index` beforehand.
 */
static void* getPosInBlockInfoBuf(SBlockInfoBuf* pBuf, int32_t index) {
  int32_t bucketIndex = index / pBuf->numPerBucket;
  char**  pBucket = taosArrayGet(pBuf->pData, bucketIndex);
  return (*pBucket) + (index % pBuf->numPerBucket) * sizeof(STableBlockScanInfo);
}

H
Haojun Liao 已提交
376 377 378 379 380 381 382 383 384 385
// Three-way comparison of two uint64_t values passed by address: -1, 0 or 1.
static int32_t uidComparFunc(const void* p1, const void* p2) {
  const uint64_t lhs = *(uint64_t*)p1;
  const uint64_t rhs = *(uint64_t*)p2;

  if (lhs < rhs) {
    return -1;
  }

  return (lhs > rhs) ? 1 : 0;
}

386
// NOTE: speed up the whole process by preparing the buffer for STableBlockScanInfo in batches
H
Hongze Cheng 已提交
387
static SHashObj* createDataBlockScanInfo(STsdbReader* pTsdbReader, SBlockInfoBuf* pBuf, const STableKeyInfo* idList,
X
Xiaoyu Wang 已提交
388
                                         STableUidList* pUidList, int32_t numOfTables) {
H
Haojun Liao 已提交
389
  // allocate buffer in order to load data blocks from file
390
  // todo use simple hash instead, optimize the memory consumption
391 392 393
  SHashObj* pTableMap =
      taosHashInit(numOfTables, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
  if (pTableMap == NULL) {
H
Haojun Liao 已提交
394 395 396
    return NULL;
  }

H
Haojun Liao 已提交
397
  int64_t st = taosGetTimestampUs();
H
Haojun Liao 已提交
398
  initBlockScanInfoBuf(pBuf, numOfTables);
H
Haojun Liao 已提交
399

H
Haojun Liao 已提交
400 401
  pUidList->tableUidList = taosMemoryMalloc(numOfTables * sizeof(uint64_t));
  if (pUidList->tableUidList == NULL) {
H
Haojun Liao 已提交
402
    taosHashCleanup(pTableMap);
H
Haojun Liao 已提交
403 404
    return NULL;
  }
H
Haojun Liao 已提交
405

H
Haojun Liao 已提交
406
  pUidList->currentIndex = 0;
H
Haojun Liao 已提交
407

408
  for (int32_t j = 0; j < numOfTables; ++j) {
H
Haojun Liao 已提交
409
    STableBlockScanInfo* pScanInfo = getPosInBlockInfoBuf(pBuf, j);
H
Haojun Liao 已提交
410

411
    pScanInfo->uid = idList[j].uid;
H
Haojun Liao 已提交
412
    pUidList->tableUidList[j] = idList[j].uid;
H
Haojun Liao 已提交
413

414
    if (ASCENDING_TRAVERSE(pTsdbReader->order)) {
H
Haojun Liao 已提交
415
      int64_t skey = pTsdbReader->window.skey;
416
      pScanInfo->lastKey = (skey > INT64_MIN) ? (skey - 1) : skey;
H
Haojun Liao 已提交
417
      pScanInfo->lastKeyInStt = skey;
wmmhello's avatar
wmmhello 已提交
418
    } else {
H
Haojun Liao 已提交
419
      int64_t ekey = pTsdbReader->window.ekey;
420
      pScanInfo->lastKey = (ekey < INT64_MAX) ? (ekey + 1) : ekey;
H
Haojun Liao 已提交
421
      pScanInfo->lastKeyInStt = ekey;
H
Haojun Liao 已提交
422
    }
wmmhello's avatar
wmmhello 已提交
423

424
    taosHashPut(pTableMap, &pScanInfo->uid, sizeof(uint64_t), &pScanInfo, POINTER_BYTES);
H
Hongze Cheng 已提交
425 426
    tsdbTrace("%p check table uid:%" PRId64 " from lastKey:%" PRId64 " %s", pTsdbReader, pScanInfo->uid,
              pScanInfo->lastKey, pTsdbReader->idStr);
H
Haojun Liao 已提交
427 428
  }

H
Haojun Liao 已提交
429
  taosSort(pUidList->tableUidList, numOfTables, sizeof(uint64_t), uidComparFunc);
H
Haojun Liao 已提交
430

H
Haojun Liao 已提交
431 432 433 434
  pTsdbReader->cost.createScanInfoList = (taosGetTimestampUs() - st) / 1000.0;
  tsdbDebug("%p create %d tables scan-info, size:%.2f Kb, elapsed time:%.2f ms, %s", pTsdbReader, numOfTables,
            (sizeof(STableBlockScanInfo) * numOfTables) / 1024.0, pTsdbReader->cost.createScanInfoList,
            pTsdbReader->idStr);
435

436
  return pTableMap;
H
Hongze Cheng 已提交
437
}
H
Hongze Cheng 已提交
438

439
/*
 * Reset the in-memory scan state of every table in pTableMap: drop both memory iterators and the
 * delete skyline, then restart from lastKey = ts and lastKeyInStt = ts + step.
 * pBlockList and mapData are left untouched (clearBlockScanInfo is the helper that releases those).
 */
static void resetAllDataBlockScanInfo(SHashObj* pTableMap, int64_t ts, int32_t step) {
  STableBlockScanInfo** p = NULL;
  while ((p = taosHashIterate(pTableMap, p)) != NULL) {
    STableBlockScanInfo* pInfo = *(STableBlockScanInfo**)p;

    pInfo->iterInit = false;
    pInfo->iter.hasVal = false;
    pInfo->iiter.hasVal = false;

    if (pInfo->iter.iter != NULL) {
      pInfo->iter.iter = tsdbTbDataIterDestroy(pInfo->iter.iter);
    }

    if (pInfo->iiter.iter != NULL) {
      pInfo->iiter.iter = tsdbTbDataIterDestroy(pInfo->iiter.iter);
    }

    pInfo->delSkyline = taosArrayDestroy(pInfo->delSkyline);
    pInfo->lastKey = ts;
    pInfo->lastKeyInStt = ts + step;
  }
}

462 463
/*
 * Release everything one scan info owns: both memory iterators, the delete skyline, the block
 * index list and the compressed block map. The STableBlockScanInfo object itself is not freed.
 */
static void clearBlockScanInfo(STableBlockScanInfo* p) {
  p->iterInit = false;

  p->iter.hasVal = false;
  p->iiter.hasVal = false;

  if (p->iter.iter != NULL) {
    p->iter.iter = tsdbTbDataIterDestroy(p->iter.iter);
  }

  if (p->iiter.iter != NULL) {
    p->iiter.iter = tsdbTbDataIterDestroy(p->iiter.iter);
  }

  p->delSkyline = taosArrayDestroy(p->delSkyline);
  p->pBlockList = taosArrayDestroy(p->pBlockList);
  tMapDataClear(&p->mapData);
}
480

H
Haojun Liao 已提交
481
/*
 * Clear every scan info referenced by pTableMap and then destroy the map.
 * Only the resources owned by each scan info are released; the scan-info objects themselves are
 * not freed here.
 */
static void destroyAllBlockScanInfo(SHashObj* pTableMap) {
  void* p = NULL;
  while ((p = taosHashIterate(pTableMap, p)) != NULL) {
    clearBlockScanInfo(*(STableBlockScanInfo**)p);
  }

  taosHashCleanup(pTableMap);
}

490
// A window is empty when its start key lies after its end key.
static bool isEmptyQueryTimeWindow(STimeWindow* pWindow) {
  if (pWindow->skey > pWindow->ekey) {
    return true;
  }

  return false;
}
H
Hongze Cheng 已提交
491

492 493 494
// Clamp the start of the query time window to the oldest timestamp that is still retained, so that
// expired data is never returned to the client even if it has not been removed yet.
// The bound is computed as now - keep2 (converted to ticks via tsTickPerMin) + 1 tick.
// Returns the adjusted copy; *pWindow is not modified.
static STimeWindow updateQueryTimeWindow(STsdb* pTsdb, STimeWindow* pWindow) {
  STsdbKeepCfg* pCfg = &pTsdb->keepCfg;

  int64_t now = taosGetTimestamp(pCfg->precision);
  int64_t earliestTs = now - (tsTickPerMin[pCfg->precision] * pCfg->keep2) + 1;  // needs to add one tick

  STimeWindow win = *pWindow;
  if (win.skey < earliestTs) {
    win.skey = earliestTs;
  }

  return win;
}
H
Hongze Cheng 已提交
507

H
Haojun Liao 已提交
508
// init file iterator
509
static int32_t initFilesetIterator(SFilesetIter* pIter, SArray* aDFileSet, STsdbReader* pReader) {
H
Hongze Cheng 已提交
510
  size_t numOfFileset = taosArrayGetSize(aDFileSet);
511

512 513
  pIter->index = ASCENDING_TRAVERSE(pReader->order) ? -1 : numOfFileset;
  pIter->order = pReader->order;
H
Hongze Cheng 已提交
514
  pIter->pFileList = aDFileSet;
515
  pIter->numOfFiles = numOfFileset;
H
Haojun Liao 已提交
516

517 518 519 520
  if (pIter->pLastBlockReader == NULL) {
    pIter->pLastBlockReader = taosMemoryCalloc(1, sizeof(struct SLastBlockReader));
    if (pIter->pLastBlockReader == NULL) {
      int32_t code = TSDB_CODE_OUT_OF_MEMORY;
521
      tsdbError("failed to prepare the last block iterator, since:%s %s", tstrerror(code), pReader->idStr);
522 523
      return code;
    }
524 525
  }

526 527 528 529 530 531 532 533
  SLastBlockReader* pLReader = pIter->pLastBlockReader;
  pLReader->order = pReader->order;
  pLReader->window = pReader->window;
  pLReader->verRange = pReader->verRange;

  pLReader->uid = 0;
  tMergeTreeClose(&pLReader->mergeTree);

534
  if (pLReader->pInfo == NULL) {
535
    // here we ignore the first column, which is always be the primary timestamp column
536 537 538
    SBlockLoadSuppInfo* pInfo = &pReader->suppInfo;

    int32_t numOfStt = pReader->pTsdb->pVnode->config.sttTrigger;
X
Xiaoyu Wang 已提交
539
    pLReader->pInfo = tCreateLastBlockLoadInfo(pReader->pSchema, &pInfo->colId[1], pInfo->numOfCols - 1, numOfStt);
H
Haojun Liao 已提交
540 541 542 543
    if (pLReader->pInfo == NULL) {
      tsdbDebug("init fileset iterator failed, code:%s %s", tstrerror(terrno), pReader->idStr);
      return terrno;
    }
544 545
  }

546
  tsdbDebug("init fileset iterator, total files:%d %s", pIter->numOfFiles, pReader->idStr);
H
Haojun Liao 已提交
547 548 549
  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
550
/*
 * Advance the iterator to the next file set whose key range overlaps the query window and open a
 * file reader on it.
 *
 * @param hasNext  out: true when a qualifying file set was opened; false when the list is exhausted,
 *                 when the remaining file sets lie beyond the query window, or on error
 * @return TSDB_CODE_SUCCESS, or the error code of tsdbDataFReaderOpen
 *
 * Before moving on, the last-block load statistics are folded into pReader->cost and the
 * last-block reader is reset. Any previously opened file reader is closed before the next file set
 * is opened.
 */
static int32_t filesetIteratorNext(SFilesetIter* pIter, STsdbReader* pReader, bool* hasNext) {
  bool    asc = ASCENDING_TRAVERSE(pIter->order);
  int32_t step = asc ? 1 : -1;
  pIter->index += step;
  int32_t code = 0;

  if ((asc && pIter->index >= pIter->numOfFiles) || ((!asc) && pIter->index < 0)) {
    *hasNext = false;
    return TSDB_CODE_SUCCESS;
  }

  SIOCostSummary* pSum = &pReader->cost;
  getLastBlockLoadInfo(pIter->pLastBlockReader->pInfo, &pSum->lastBlockLoad, &pReader->cost.lastBlockLoadTime);

  pIter->pLastBlockReader->uid = 0;
  tMergeTreeClose(&pIter->pLastBlockReader->mergeTree);
  resetLastBlockLoadInfo(pIter->pLastBlockReader->pInfo);

  // key range covered by the file set under inspection
  STimeWindow win = {0};

  while (1) {
    if (pReader->pFileReader != NULL) {
      tsdbDataFReaderClose(&pReader->pFileReader);
    }

    pReader->status.pCurrentFileset = (SDFileSet*)taosArrayGet(pIter->pFileList, pIter->index);

    code = tsdbDataFReaderOpen(&pReader->pFileReader, pReader->pTsdb, pReader->status.pCurrentFileset);
    if (code != TSDB_CODE_SUCCESS) {
      goto _err;
    }

    pReader->cost.headFileLoad += 1;

    int32_t fid = pReader->status.pCurrentFileset->fid;
    tsdbFidKeyRange(fid, pReader->pTsdb->keepCfg.days, pReader->pTsdb->keepCfg.precision, &win.skey, &win.ekey);

    // current file are no longer overlapped with query time window, ignore remain files
    if ((asc && win.skey > pReader->window.ekey) || (!asc && win.ekey < pReader->window.skey)) {
      tsdbDebug("%p remain files are not qualified for qrange:%" PRId64 "-%" PRId64 ", ignore, %s", pReader,
                pReader->window.skey, pReader->window.ekey, pReader->idStr);
      *hasNext = false;
      return TSDB_CODE_SUCCESS;
    }

    // this file set ends before the window starts (in traversal direction): skip to the next one
    if ((asc && (win.ekey < pReader->window.skey)) || ((!asc) && (win.skey > pReader->window.ekey))) {
      pIter->index += step;
      if ((asc && pIter->index >= pIter->numOfFiles) || ((!asc) && pIter->index < 0)) {
        *hasNext = false;
        return TSDB_CODE_SUCCESS;
      }
      continue;
    }

    tsdbDebug("%p file found fid:%d for qrange:%" PRId64 "-%" PRId64 ", %s", pReader, fid, pReader->window.skey,
              pReader->window.ekey, pReader->idStr);
    *hasNext = true;
    return TSDB_CODE_SUCCESS;
  }

_err:
  *hasNext = false;
  return code;
}

616
// Put the block iterator back to its initial state for the given order: index -1, no blocks, and an
// empty block list (created on first use, cleared and reused afterwards).
static void resetDataBlockIterator(SDataBlockIter* pIter, int32_t order) {
  pIter->order = order;
  pIter->index = -1;
  pIter->numOfBlocks = 0;
  if (pIter->blockList == NULL) {
    pIter->blockList = taosArrayInit(4, sizeof(SFileDataBlockInfo));
  } else {
    taosArrayClear(pIter->blockList);
  }
}

L
Liu Jicong 已提交
627
// Destroy the iterator's block list. The pointer stored in pIter is not reset.
static void cleanupDataBlockIterator(SDataBlockIter* pIter) {
  taosArrayDestroy(pIter->blockList);
}
H
Haojun Liao 已提交
628

H
Haojun Liao 已提交
629
// Initial reader status: no table iterator yet, and reading starts from the file stage.
static void initReaderStatus(SReaderStatus* pStatus) {
  pStatus->pTableIter = NULL;
  pStatus->loadFromFile = true;
}

634 635 636 637 638 639 640 641
/*
 * Create a result block with one column per entry of pCond->colList and room for `capacity` rows.
 *
 * @return the new block, or NULL with terrno set when the block cannot be created, a column cannot
 *         be appended, or the capacity cannot be reserved. On failure the partially built block is
 *         released with blockDataDestroy(), so the column array it owns is freed together with the
 *         block instead of being leaked by a bare free of the block struct.
 */
static SSDataBlock* createResBlock(SQueryTableDataCond* pCond, int32_t capacity) {
  SSDataBlock* pResBlock = createDataBlock();
  if (pResBlock == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  int32_t code = TSDB_CODE_SUCCESS;
  for (int32_t i = 0; i < pCond->numOfCols; ++i) {
    SColumnInfoData colInfo = {0};
    colInfo.info = pCond->colList[i];

    code = blockDataAppendColInfo(pResBlock, &colInfo);
    if (code != TSDB_CODE_SUCCESS) {
      break;
    }
  }

  if (code == TSDB_CODE_SUCCESS) {
    code = blockDataEnsureCapacity(pResBlock, capacity);
  }

  if (code != TSDB_CODE_SUCCESS) {
    terrno = code;
    blockDataDestroy(pResBlock);
    return NULL;
  }

  return pResBlock;
}

656 657 658 659 660 661 662 663 664 665 666 667 668 669 670 671 672 673 674 675 676 677 678 679 680 681 682 683 684 685 686 687 688 689 690 691 692 693 694 695 696 697 698 699 700 701 702 703 704 705 706 707 708 709 710
// Initialize the reader mutex, tracing before and after; returns the result of taosThreadMutexInit().
static int32_t tsdbInitReaderLock(STsdbReader* pReader) {
  int32_t ret = -1;  // value shown by the "pre" trace, before the call has produced a result

  qTrace("tsdb/read: %p, pre-init read mutex: %p, code: %d", pReader, &pReader->readerMutex, ret);
  ret = taosThreadMutexInit(&pReader->readerMutex, NULL);
  qTrace("tsdb/read: %p, post-init read mutex: %p, code: %d", pReader, &pReader->readerMutex, ret);

  return ret;
}

// Destroy the reader mutex, tracing before and after; returns the result of taosThreadMutexDestroy().
static int32_t tsdbUninitReaderLock(STsdbReader* pReader) {
  int32_t ret = -1;  // value shown by the "pre" trace, before the call has produced a result

  qTrace("tsdb/read: %p, pre-uninit read mutex: %p, code: %d", pReader, &pReader->readerMutex, ret);
  ret = taosThreadMutexDestroy(&pReader->readerMutex);
  qTrace("tsdb/read: %p, post-uninit read mutex: %p, code: %d", pReader, &pReader->readerMutex, ret);

  return ret;
}

// Lock the reader mutex, tracing before and after; returns the result of taosThreadMutexLock().
static int32_t tsdbAcquireReader(STsdbReader* pReader) {
  int32_t ret = -1;  // value shown by the "pre" trace, before the call has produced a result

  qTrace("tsdb/read: %p, pre-take read mutex: %p, code: %d", pReader, &pReader->readerMutex, ret);
  ret = taosThreadMutexLock(&pReader->readerMutex);
  qTrace("tsdb/read: %p, post-take read mutex: %p, code: %d", pReader, &pReader->readerMutex, ret);

  return ret;
}

// Try to lock the reader mutex, tracing before and after; returns the result of taosThreadMutexTryLock().
static int32_t tsdbTryAcquireReader(STsdbReader* pReader) {
  int32_t ret = -1;  // value shown by the "pre" trace, before the call has produced a result

  qTrace("tsdb/read: %p, pre-trytake read mutex: %p, code: %d", pReader, &pReader->readerMutex, ret);
  ret = taosThreadMutexTryLock(&pReader->readerMutex);
  qTrace("tsdb/read: %p, post-trytake read mutex: %p, code: %d", pReader, &pReader->readerMutex, ret);

  return ret;
}

// Unlock the reader mutex, tracing before and after; returns the result of taosThreadMutexUnlock().
static int32_t tsdbReleaseReader(STsdbReader* pReader) {
  int32_t ret = -1;  // value shown by the "pre" trace, before the call has produced a result

  qTrace("tsdb/read: %p, pre-untake read mutex: %p, code: %d", pReader, &pReader->readerMutex, ret);
  ret = taosThreadMutexUnlock(&pReader->readerMutex);
  qTrace("tsdb/read: %p, post-untake read mutex: %p, code: %d", pReader, &pReader->readerMutex, ret);

  return ret;
}

711 712 713 714 715 716
// Release the reader mutex unless the block last returned was a composed block.
void tsdbReleaseDataBlock(STsdbReader* pReader) {
  if (pReader->status.composedDataBlock) {
    return;
  }

  tsdbReleaseReader(pReader);
}
717

718
static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, STsdbReader** ppReader, int32_t capacity,
H
Haojun Liao 已提交
719
                                SSDataBlock* pResBlock, const char* idstr) {
H
Haojun Liao 已提交
720
  int32_t      code = 0;
721
  int8_t       level = 0;
H
Haojun Liao 已提交
722
  STsdbReader* pReader = (STsdbReader*)taosMemoryCalloc(1, sizeof(*pReader));
H
Hongze Cheng 已提交
723 724
  if (pReader == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
H
Haojun Liao 已提交
725
    goto _end;
H
Hongze Cheng 已提交
726 727
  }

C
Cary Xu 已提交
728
  if (VND_IS_TSMA(pVnode)) {
H
Haojun Liao 已提交
729
    tsdbDebug("vgId:%d, tsma is selected to query, %s", TD_VID(pVnode), idstr);
C
Cary Xu 已提交
730 731
  }

H
Haojun Liao 已提交
732
  initReaderStatus(&pReader->status);
733

L
Liu Jicong 已提交
734
  pReader->pTsdb = getTsdbByRetentions(pVnode, pCond->twindows.skey, pVnode->config.tsdbCfg.retentions, idstr, &level);
dengyihao's avatar
dengyihao 已提交
735 736
  pReader->suid = pCond->suid;
  pReader->order = pCond->order;
737
  pReader->capacity = capacity;
H
Haojun Liao 已提交
738
  pReader->pResBlock = pResBlock;
739
  pReader->idStr = (idstr != NULL) ? taosStrdup(idstr) : NULL;
dengyihao's avatar
dengyihao 已提交
740
  pReader->verRange = getQueryVerRange(pVnode, pCond, level);
741
  pReader->type = pCond->type;
742
  pReader->window = updateQueryTimeWindow(pReader->pTsdb, &pCond->twindows);
H
Hongze Cheng 已提交
743
  pReader->blockInfoBuf.numPerBucket = 1000;  // 1000 tables per bucket
H
Hongze Cheng 已提交
744

H
Haojun Liao 已提交
745 746 747 748 749 750 751 752
  if (pReader->pResBlock == NULL) {
    pReader->freeBlock = true;
    pReader->pResBlock = createResBlock(pCond, pReader->capacity);
    if (pReader->pResBlock == NULL) {
      code = terrno;
      goto _end;
    }
  }
753

H
Haojun Liao 已提交
754 755 756 757 758
  if (pCond->numOfCols <= 0) {
    tsdbError("vgId:%d, invalid column number %d in query cond, %s", TD_VID(pVnode), pCond->numOfCols, idstr);
    code = TSDB_CODE_INVALID_PARA;
    goto _end;
  }
H
Hongze Cheng 已提交
759

760 761
  // allocate buffer in order to load data blocks from file
  SBlockLoadSuppInfo* pSup = &pReader->suppInfo;
762
  pSup->pColAgg = taosArrayInit(pCond->numOfCols, sizeof(SColumnDataAgg));
H
Haojun Liao 已提交
763
  if (pSup->pColAgg == NULL) {
764 765 766
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _end;
  }
H
Haojun Liao 已提交
767

768 769
  pSup->tsColAgg.colId = PRIMARYKEY_TIMESTAMP_COL_ID;

H
Hongze Cheng 已提交
770
  code = tBlockDataCreate(&pReader->status.fileBlockData);
H
Haojun Liao 已提交
771 772 773 774 775
  if (code != TSDB_CODE_SUCCESS) {
    terrno = code;
    goto _end;
  }

776
  setColumnIdSlotList(pSup, pCond->colList, pCond->pSlotList, pCond->numOfCols);
777

778
  tsdbInitReaderLock(pReader);
779

H
Hongze Cheng 已提交
780 781
  *ppReader = pReader;
  return code;
H
Hongze Cheng 已提交
782

H
Haojun Liao 已提交
783 784
_end:
  tsdbReaderClose(pReader);
H
Hongze Cheng 已提交
785 786 787
  *ppReader = NULL;
  return code;
}
H
Hongze Cheng 已提交
788

H
Haojun Liao 已提交
789
/*
 * Fetch the block index of the file behind pFileReader from the block-index cache and append
 * to pIndexList every SBlockIdx entry whose suid equals pReader->suid and whose uid appears in
 * pReader->status.uidList. For each matched table the scan info's pBlockList is created if it
 * does not exist yet.
 *
 * The matching walks both lists with two cursors that only move forward, so it presumably
 * relies on the cached index and the uid list being sorted by uid -- verify against the callers.
 *
 * Returns TSDB_CODE_SUCCESS (also when the index is empty), the error from
 * tsdbCacheGetBlockIdx(), terrno when a table's scan info cannot be found, or
 * TSDB_CODE_OUT_OF_MEMORY when an array cannot be created or grown. tsdbBICacheRelease() is
 * called on the handle on every exit path.
 */
static int32_t doLoadBlockIndex(STsdbReader* pReader, SDataFReader* pFileReader, SArray* pIndexList) {
  int64_t    st = taosGetTimestampUs();
  LRUHandle* handle = NULL;
  int32_t    code = tsdbCacheGetBlockIdx(pFileReader->pTsdb->biCache, pFileReader, &handle);
  if (code != TSDB_CODE_SUCCESS || handle == NULL) {
    goto _end;
  }

  int32_t numOfTables = taosHashGetSize(pReader->status.pTableMap);

  SArray* aBlockIdx = (SArray*)taosLRUCacheValue(pFileReader->pTsdb->biCache, handle);
  size_t  num = taosArrayGetSize(aBlockIdx);
  if (num == 0) {
    goto _end;  // nothing to match; code is TSDB_CODE_SUCCESS here
  }

  // todo binary search to the start position
  int64_t et1 = taosGetTimestampUs();

  STableUidList* pList = &pReader->status.uidList;

  int32_t i = 0, j = 0;
  while (i < num && j < numOfTables) {
    SBlockIdx* pBlockIdx = (SBlockIdx*)taosArrayGet(aBlockIdx, i);

    // entry of another super table, or of a table that is not queried: skip the entry
    if (pBlockIdx->suid != pReader->suid || pBlockIdx->uid < pList->tableUidList[j]) {
      i += 1;
      continue;
    }

    // the queried table has no entry in this file: move on to the next queried table
    if (pBlockIdx->uid > pList->tableUidList[j]) {
      j += 1;
      continue;
    }

    // the entry belongs to the queried table tableUidList[j]
    STableBlockScanInfo* pScanInfo = getTableBlockScanInfo(pReader->status.pTableMap, pBlockIdx->uid, pReader->idStr);
    if (pScanInfo == NULL) {
      code = terrno;
      goto _end;  // the cache handle must still be released
    }

    if (pScanInfo->pBlockList == NULL) {
      pScanInfo->pBlockList = taosArrayInit(4, sizeof(SBlockIndex));
      if (pScanInfo->pBlockList == NULL) {
        code = TSDB_CODE_OUT_OF_MEMORY;
        goto _end;
      }
    }

    if (taosArrayPush(pIndexList, pBlockIdx) == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto _end;
    }

    i += 1;
    j += 1;
  }

  int64_t et2 = taosGetTimestampUs();
  tsdbDebug("load block index for %d/%d tables completed, elapsed time:%.2f ms, set blockIdx:%.2f ms, size:%.2f Kb %s",
            numOfTables, (int32_t)num, (et1 - st) / 1000.0, (et2 - et1) / 1000.0, num * sizeof(SBlockIdx) / 1024.0,
            pReader->idStr);

  pReader->cost.headFileLoadTime += (et1 - st) / 1000.0;

_end:
  tsdbBICacheRelease(pFileReader->pTsdb->biCache, handle);
  return code;
}
H
Hongze Cheng 已提交
859

860
/*
 * For every table entry in pTableMap, clear its block map data and its block index list.
 * doLoadFileBlock() calls this before loading the block info of a file.
 */
static void cleanupTableScanInfo(SHashObj* pTableMap) {
  STableBlockScanInfo** ppInfo = NULL;

  while ((ppInfo = taosHashIterate(pTableMap, ppInfo)) != NULL) {
    STableBlockScanInfo* pInfo = *ppInfo;

    // reset the index in last block when handling a new file
    tMapDataClear(&pInfo->mapData);
    taosArrayClear(pInfo->pBlockList);
  }
}

874
/*
 * For every table listed in pIndexList (SBlockIdx entries), read the table's data-block map
 * from the current file and record, in the table's pBlockList, an SBlockIndex for each block
 * that overlaps both the remaining time window of that table and the reader's version range.
 *
 * pBlockNum->numOfBlocks is incremented once per recorded block and pBlockNum->numOfLastFiles
 * is set to the number of stt files of the current file set.
 *
 * Returns TSDB_CODE_SUCCESS, terrno when a table's scan info cannot be found, the error from
 * tsdbReadDataBlk(), or TSDB_CODE_OUT_OF_MEMORY when a block index cannot be appended.
 */
static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, SBlockNumber* pBlockNum) {
  int32_t numOfQTable = 0;
  size_t  sizeInDisk = 0;
  size_t  numOfTables = taosArrayGetSize(pIndexList);

  int64_t st = taosGetTimestampUs();
  cleanupTableScanInfo(pReader->status.pTableMap);

  // the scan direction does not change while the file is processed
  bool    asc = ASCENDING_TRAVERSE(pReader->order);
  int32_t step = asc ? 1 : -1;

  for (int32_t i = 0; i < numOfTables; ++i) {
    SBlockIdx*           pBlockIdx = taosArrayGet(pIndexList, i);
    STableBlockScanInfo* pScanInfo = getTableBlockScanInfo(pReader->status.pTableMap, pBlockIdx->uid, pReader->idStr);
    if (pScanInfo == NULL) {
      return terrno;
    }

    tMapDataReset(&pScanInfo->mapData);

    // a failed read must not be mistaken for "this table has no blocks in the file"
    int32_t code = tsdbReadDataBlk(pReader->pFileReader, pBlockIdx, &pScanInfo->mapData);
    if (code != TSDB_CODE_SUCCESS) {
      tsdbError("%p failed to read the data block map of table uid:%" PRIu64 ", code:%s %s", pReader,
                (uint64_t)pBlockIdx->uid, tstrerror(code), pReader->idStr);
      return code;
    }

    taosArrayEnsureCap(pScanInfo->pBlockList, pScanInfo->mapData.nItem);

    sizeInDisk += pScanInfo->mapData.nData;

    // only the part of the query window beyond the table's last returned key is still of interest
    STimeWindow w = pReader->window;
    if (asc) {
      w.skey = pScanInfo->lastKey + step;
    } else {
      w.ekey = pScanInfo->lastKey + step;
    }

    if (isEmptyQueryTimeWindow(&w)) {
      continue;
    }

    SDataBlk block = {0};
    for (int32_t j = 0; j < pScanInfo->mapData.nItem; ++j) {
      tGetDataBlk(pScanInfo->mapData.pData + pScanInfo->mapData.aOffset[j], &block);

      // 1. time range check
      if (block.minKey.ts > w.ekey || block.maxKey.ts < w.skey) {
        continue;
      }

      // 2. version range check
      if (block.minVer > pReader->verRange.maxVer || block.maxVer < pReader->verRange.minVer) {
        continue;
      }

      SBlockIndex bIndex = {.ordinalIndex = j, .inFileOffset = block.aSubBlock->offset};
      bIndex.window = (STimeWindow){.skey = block.minKey.ts, .ekey = block.maxKey.ts};

      void* p1 = taosArrayPush(pScanInfo->pBlockList, &bIndex);
      if (p1 == NULL) {
        tMapDataClear(&pScanInfo->mapData);
        return TSDB_CODE_OUT_OF_MEMORY;
      }

      pBlockNum->numOfBlocks += 1;
    }

    if (taosArrayGetSize(pScanInfo->pBlockList) > 0) {
      numOfQTable += 1;
    }
  }

  pBlockNum->numOfLastFiles = pReader->pFileReader->pSet->nSttF;
  int32_t total = pBlockNum->numOfLastFiles + pBlockNum->numOfBlocks;

  double el = (taosGetTimestampUs() - st) / 1000.0;
  // numOfTables is a size_t; cast it so the argument matches the %d conversion
  tsdbDebug(
      "load block of %d tables completed, blocks:%d in %d tables, last-files:%d, block-info-size:%.2f Kb, elapsed "
      "time:%.2f ms %s",
      (int32_t)numOfTables, pBlockNum->numOfBlocks, numOfQTable, pBlockNum->numOfLastFiles, sizeInDisk / 1000.0, el,
      pReader->idStr);

  pReader->cost.numOfBlocks += total;
  pReader->cost.headFileLoadTime += el;

  return TSDB_CODE_SUCCESS;
}
H
Hongze Cheng 已提交
954

955
/*
 * Flag the current file block as completely dumped and store, as the dump info's lastKey,
 * the key one step past maxKey in scan direction (maxKey + 1 ascending, maxKey - 1 descending).
 */
static void setBlockAllDumped(SFileBlockDumpInfo* pDumpInfo, int64_t maxKey, int32_t order) {
  pDumpInfo->allDumped = true;
  pDumpInfo->lastKey = ASCENDING_TRAVERSE(order) ? (maxKey + 1) : (maxKey - 1);
}

D
dapan1121 已提交
961
static int32_t doCopyColVal(SColumnInfoData* pColInfoData, int32_t rowIndex, int32_t colIndex, SColVal* pColVal,
962
                         SBlockLoadSuppInfo* pSup) {
H
Haojun Liao 已提交
963
  if (IS_VAR_DATA_TYPE(pColVal->type)) {
H
Hongze Cheng 已提交
964
    if (!COL_VAL_IS_VALUE(pColVal)) {
965
      colDataSetNULL(pColInfoData, rowIndex);
H
Haojun Liao 已提交
966 967
    } else {
      varDataSetLen(pSup->buildBuf[colIndex], pColVal->value.nData);
D
dapan1121 已提交
968 969 970 971
      if (pColVal->value.nData > pColInfoData->info.bytes) {
        tsdbWarn("column cid:%d actual data len %d is bigger than schema len %d", pColVal->cid, pColVal->value.nData, pColInfoData->info.bytes);
        return TSDB_CODE_TDB_INVALID_TABLE_SCHEMA_VER;
      }
972 973 974 975
      if (pColVal->value.nData > 0) {  // pData may be null, if nData is 0
        memcpy(varDataVal(pSup->buildBuf[colIndex]), pColVal->value.pData, pColVal->value.nData);
      }

976
      colDataSetVal(pColInfoData, rowIndex, pSup->buildBuf[colIndex], false);
H
Haojun Liao 已提交
977 978
    }
  } else {
979
    colDataSetVal(pColInfoData, rowIndex, (const char*)&pColVal->value, !COL_VAL_IS_VALUE(pColVal));
H
Haojun Liao 已提交
980
  }
D
dapan1121 已提交
981 982

  return TSDB_CODE_SUCCESS;
H
Haojun Liao 已提交
983 984
}

985
/*
 * Return the block-info element at the iterator's current index, or NULL when the
 * iterator's block list is empty.
 */
static SFileDataBlockInfo* getCurrentBlockInfo(SDataBlockIter* pBlockIter) {
  size_t total = taosArrayGetSize(pBlockIter->blockList);
  if (total > 0) {
    return (SFileDataBlockInfo*)taosArrayGet(pBlockIter->blockList, pBlockIter->index);
  }

  // an empty list must agree with the recorded number of blocks
  ASSERT(pBlockIter->numOfBlocks == total);
  return NULL;
}

H
Hongze Cheng 已提交
996
// Return the address of the SDataBlk stored inside the iterator.
static SDataBlk* getCurrentBlock(SDataBlockIter* pBlockIter) {
  return &pBlockIter->block;
}
997

C
Cary Xu 已提交
998 999 1000 1001 1002 1003
/*
 * Binary search in keyList (num entries), starting from index pos. The comparisons assume
 * keyList is sorted in ascending order.
 *
 * order == TSDB_ORDER_ASC : search [pos, num - 1] and return the last index whose key is
 *                           <= key; -1 when keyList[pos] is already bigger than key.
 * any other order (DESC)  : search [0, pos] and return the first index whose key is
 *                           >= key; -1 when keyList[pos] is already smaller than key.
 */
static int doBinarySearchKey(TSKEY* keyList, int num, int pos, TSKEY key, int order) {
  ASSERT(pos >= 0 && pos < num && num > 0);

  // `from` is the end of the range at the start position, `to` is the opposite end
  int from = pos;

  if (order == TSDB_ORDER_ASC) {
    int to = num - 1;
    if (key < keyList[pos]) {
      return -1;
    }

    for (;;) {
      // check can return
      if (key >= keyList[to]) return to;
      if (key <= keyList[from]) return from;
      if (to - from <= 1) return from;

      // narrow the range around the middle element
      int mid = from + (to - from + 1) / 2;
      if (keyList[mid] == key) {
        return mid;
      }

      if (keyList[mid] > key) {
        to = mid;
      } else {
        from = mid;
      }
    }
  }

  // DESC
  int to = 0;
  if (key > keyList[pos]) {
    return -1;
  }

  for (;;) {
    // check can return
    if (key <= keyList[to]) return to;
    if (key >= keyList[from]) return from;
    if (from - to <= 1) return from;

    // narrow the range around the middle element
    int mid = from - (from - to + 1) / 2;
    if (keyList[mid] == key) {
      return mid;
    }

    if (keyList[mid] < key) {
      to = mid;
    } else {
      from = mid;
    }
  }
}

H
Haojun Liao 已提交
1046
/*
 * Find the index of the last row of the block that still lies inside the query window when
 * scanning from pos in the reader's order.
 *
 * When the window covers the block's far end, that end (nRow - 1 ascending, 0 descending) is
 * returned directly; otherwise the position is located with doBinarySearchKey() on the block's
 * timestamp array, whose result (possibly -1) is returned unchanged.
 */
static int32_t getEndPosInDataBlock(STsdbReader* pReader, SBlockData* pBlockData, SDataBlk* pBlock, int32_t pos) {
  if (ASCENDING_TRAVERSE(pReader->order)) {
    if (pReader->window.ekey >= pBlock->maxKey.ts) {
      return pBlock->nRow - 1;
    }

    return doBinarySearchKey(pBlockData->aTSKEY, pBlock->nRow, pos, pReader->window.ekey, pReader->order);
  }

  if (pReader->window.skey <= pBlock->minKey.ts) {
    return 0;
  }

  return doBinarySearchKey(pBlockData->aTSKEY, pBlock->nRow, pos, pReader->window.skey, pReader->order);
}

H
Haojun Liao 已提交
1063
/*
 * Copy dumpedRows timestamps from the file block into the output column.
 *
 * Ascending: rows [rowIndex, rowIndex + dumpedRows) are copied as they are.
 * Descending: rows (rowIndex - dumpedRows, rowIndex] are copied and then reversed in place,
 * so the output starts with the timestamp at rowIndex.
 */
static void copyPrimaryTsCol(const SBlockData* pBlockData, SFileBlockDumpInfo* pDumpInfo, SColumnInfoData* pColData,
                             int32_t dumpedRows, bool asc) {
  int32_t firstRow = asc ? pDumpInfo->rowIndex : (pDumpInfo->rowIndex - dumpedRows + 1);
  memcpy(pColData->pData, &pBlockData->aTSKEY[firstRow], dumpedRows * sizeof(int64_t));

  if (asc) {
    return;
  }

  // todo: opt perf by extract the loop
  // reverse the copied range by swapping from both ends towards the middle
  int64_t* pTs = (int64_t*)pColData->pData;
  for (int32_t head = 0, tail = dumpedRows - 1; head < tail; ++head, --tail) {
    int64_t tmp = pTs[head];
    pTs[head] = pTs[tail];
    pTs[tail] = tmp;
  }
}

H
Haojun Liao 已提交
1083 1084
/*
 * A faster version of the copy procedure for fixed-width columns.
 *
 * 1. dumpedRows values are copied from the file column into the output column in one memcpy;
 *    the element width is taken from tDataTypes[] for the file column's type.
 * 2. For a descending scan the copied values are reversed in place; the element width used
 *    for the reversal is chosen by the output column's type. Types not listed in the switch
 *    are left in copied order.
 * 3. When the file column's flag is not HAS_VALUE, every copied row is checked with
 *    tColDataGetBitValue() and rows reporting 0 or 1 are marked null in the output bitmap
 *    (presumably those values stand for NONE/NULL -- confirm against tColDataGetBitValue).
 */
static void copyNumericCols(const SColData* pData, SFileBlockDumpInfo* pDumpInfo, SColumnInfoData* pColData,
                            int32_t dumpedRows, bool asc) {
  // the copied range always starts at its lowest row index in the file column
  int32_t  firstRow = asc ? pDumpInfo->rowIndex : (pDumpInfo->rowIndex - dumpedRows + 1);
  uint8_t* pSrc = pData->pData + tDataTypes[pData->type].bytes * firstRow;

  // make sure it is aligned to 8bit, the allocated memory address is aligned to 256bit
  //  ASSERT((((uint64_t)pColData->pData) & (0x8 - 1)) == 0);

  // 1. copy data in a batch model
  memcpy(pColData->pData, pSrc, dumpedRows * tDataTypes[pData->type].bytes);

  // 2. reverse the array list in case of descending order scan data block
  if (!asc) {
    int32_t head = 0;
    int32_t tail = dumpedRows - 1;

    switch (pColData->info.type) {
      case TSDB_DATA_TYPE_TIMESTAMP:
      case TSDB_DATA_TYPE_DOUBLE:
      case TSDB_DATA_TYPE_BIGINT:
      case TSDB_DATA_TYPE_UBIGINT: {
        int64_t* pVals = (int64_t*)pColData->pData;
        for (; head < tail; ++head, --tail) {
          int64_t tmp = pVals[head];
          pVals[head] = pVals[tail];
          pVals[tail] = tmp;
        }
        break;
      }

      case TSDB_DATA_TYPE_BOOL:
      case TSDB_DATA_TYPE_TINYINT:
      case TSDB_DATA_TYPE_UTINYINT: {
        int8_t* pVals = (int8_t*)pColData->pData;
        for (; head < tail; ++head, --tail) {
          int8_t tmp = pVals[head];
          pVals[head] = pVals[tail];
          pVals[tail] = tmp;
        }
        break;
      }

      case TSDB_DATA_TYPE_SMALLINT:
      case TSDB_DATA_TYPE_USMALLINT: {
        int16_t* pVals = (int16_t*)pColData->pData;
        for (; head < tail; ++head, --tail) {
          int16_t tmp = pVals[head];
          pVals[head] = pVals[tail];
          pVals[tail] = tmp;
        }
        break;
      }

      case TSDB_DATA_TYPE_FLOAT:
      case TSDB_DATA_TYPE_INT:
      case TSDB_DATA_TYPE_UINT: {
        int32_t* pVals = (int32_t*)pColData->pData;
        for (; head < tail; ++head, --tail) {
          int32_t tmp = pVals[head];
          pVals[head] = pVals[tail];
          pVals[tail] = tmp;
        }
        break;
      }

      default:
        break;
    }
  }

  // 3. if the null value exists, check items one-by-one
  if (pData->flag != HAS_VALUE) {
    int32_t step = asc ? 1 : -1;

    for (int32_t outRow = 0; outRow < dumpedRows; ++outRow) {
      int32_t srcRow = pDumpInfo->rowIndex + outRow * step;
      uint8_t v = tColDataGetBitValue(pData, srcRow);
      if (v == 0 || v == 1) {
        colDataSetNull_f(pColData->nullbitmap, outRow);
        pColData->hasNull = true;
      }
    }
  }
}

1173
/*
 * Copy rows of the currently loaded file block (pReader->status.fileBlockData) into the
 * reader's result block, at most pReader->capacity rows per call.
 *
 * Steps:
 *   1. when dumping starts at the block's first row in scan direction and the query window
 *      does not cover that row, locate the first row inside the window by binary search;
 *   2. find the last row inside the window and derive the number of rows to dump;
 *   3. copy the primary timestamp column, then the requested columns; requested columns that
 *      are missing from the file block are filled with NULL;
 *   4. advance the dump position and flag the block as fully dumped when no usable row is left.
 *
 * Returns TSDB_CODE_SUCCESS (also when there is nothing to copy), TSDB_CODE_INVALID_PARA when
 * the start position cannot be located, or the error reported by doCopyColVal().
 */
static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader) {
  SReaderStatus*      pStatus = &pReader->status;
  SDataBlockIter*     pIter = &pStatus->blockIter;
  SFileBlockDumpInfo* pDump = &pStatus->fBlockDumpInfo;
  SBlockData*         pFileData = &pStatus->fileBlockData;
  SBlockLoadSuppInfo* pSupp = &pReader->suppInfo;
  SSDataBlock*        pOut = pReader->pResBlock;
  int32_t             numOfOutputCols = pSupp->numOfCols;

  SFileDataBlockInfo* pInfo = getCurrentBlockInfo(pIter);
  SDataBlk*           pBlk = getCurrentBlock(pIter);

  int64_t startUs = taosGetTimestampUs();
  bool    asc = ASCENDING_TRAVERSE(pReader->order);
  int32_t step = asc ? 1 : -1;

  // no data exists, return directly.
  if (pFileData->nRow == 0 || pFileData->aTSKEY == 0) {
    tsdbWarn("%p no need to copy since no data in blockData, table uid:%" PRIu64 " has been dropped, %s", pReader,
             pInfo->uid, pReader->idStr);
    pOut->info.rows = 0;
    return 0;
  }

  // 1. locate the start row, only needed when nothing of this block has been dumped yet and
  //    the query window begins somewhere inside the block
  bool atFirstRow = asc ? (pDump->rowIndex == 0) : (pDump->rowIndex == pBlk->nRow - 1);
  bool firstRowInWindow =
      asc ? (pReader->window.skey <= pBlk->minKey.ts) : (pReader->window.ekey >= pBlk->maxKey.ts);

  if (atFirstRow && !firstRowInWindow) {
    // search against the scan direction, starting from the far end of the block
    int32_t searchFrom = asc ? pBlk->nRow - 1 : 0;
    int32_t searchOrder = asc ? TSDB_ORDER_DESC : TSDB_ORDER_ASC;
    int64_t boundary = asc ? pReader->window.skey : pReader->window.ekey;

    pDump->rowIndex = doBinarySearchKey(pFileData->aTSKEY, pBlk->nRow, searchFrom, boundary, searchOrder);
    if (pDump->rowIndex < 0) {
      tsdbError(
          "%p failed to locate the start position in current block, global index:%d, table index:%d, brange:%" PRId64
          "-%" PRId64 ", minVer:%" PRId64 ", maxVer:%" PRId64 " %s",
          pReader, pIter->index, pInfo->tbBlockIdx, pBlk->minKey.ts, pBlk->maxKey.ts, pBlk->minVer, pBlk->maxVer,
          pReader->idStr);
      return TSDB_CODE_INVALID_PARA;
    }
  }

  // 2. time window check: find the last row to dump
  int32_t endIndex = getEndPosInDataBlock(pReader, pFileData, pBlk, pDump->rowIndex);
  if (endIndex == -1) {
    setBlockAllDumped(pDump, pReader->window.ekey, pReader->order);
    return TSDB_CODE_SUCCESS;
  }

  endIndex += step;  // one past the last row, in scan direction

  int32_t dumpedRows = asc ? (endIndex - pDump->rowIndex) : (pDump->rowIndex - endIndex);
  if (dumpedRows > pReader->capacity) {  // output buffer check
    dumpedRows = pReader->capacity;
  }

  // 3. copy the columns; outIdx walks the requested columns, fileIdx the columns of the file block
  int32_t outIdx = 0;

  SColumnInfoData* pDst = taosArrayGet(pOut->pDataBlock, pSupp->slotId[outIdx]);
  if (pSupp->colId[outIdx] == PRIMARYKEY_TIMESTAMP_COL_ID) {
    copyPrimaryTsCol(pFileData, pDump, pDst, dumpedRows, asc);
    outIdx += 1;
  }

  SColVal cv = {0};
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t fileIdx = 0;
  int32_t numOfFileCols = pFileData->nColData;

  while (outIdx < numOfOutputCols && fileIdx < numOfFileCols) {
    SColData* pSrc = tBlockDataGetColDataByIdx(pFileData, fileIdx);

    // file column that was not requested
    if (pSrc->cid < pSupp->colId[outIdx]) {
      fileIdx += 1;
      continue;
    }

    pDst = taosArrayGet(pOut->pDataBlock, pSupp->slotId[outIdx]);

    // the specified column does not exist in file block, fill with null data
    if (pSrc->cid > pSupp->colId[outIdx]) {
      colDataSetNNULL(pDst, 0, dumpedRows);
      outIdx += 1;
      continue;
    }

    // column ids match
    bool noValues = (pSrc->flag == HAS_NONE) || (pSrc->flag == HAS_NULL) || (pSrc->flag == (HAS_NULL | HAS_NONE));
    if (noValues) {
      colDataSetNNULL(pDst, 0, dumpedRows);
    } else if (IS_MATHABLE_TYPE(pDst->info.type)) {
      copyNumericCols(pSrc, pDump, pDst, dumpedRows, asc);
    } else {  // varchar/nchar type
      for (int32_t outRow = 0; outRow < dumpedRows; ++outRow) {
        tColDataGetValue(pSrc, pDump->rowIndex + outRow * step, &cv);
        code = doCopyColVal(pDst, outRow, outIdx, &cv, pSupp);
        if (code) {
          return code;
        }
      }
    }

    fileIdx += 1;
    outIdx += 1;
  }

  // fill the mis-matched columns with null value
  for (; outIdx < numOfOutputCols; ++outIdx) {
    pDst = taosArrayGet(pOut->pDataBlock, pSupp->slotId[outIdx]);
    colDataSetNNULL(pDst, 0, dumpedRows);
  }

  // 4. publish the result and advance the dump position
  pOut->info.dataLoad = 1;
  pOut->info.rows = dumpedRows;
  pDump->rowIndex += step * dumpedRows;

  // check if current block are all handled
  bool rowsLeft = (pDump->rowIndex >= 0) && (pDump->rowIndex < pBlk->nRow);
  if (!rowsLeft) {
    setBlockAllDumped(pDump, asc ? pBlk->maxKey.ts : pBlk->minKey.ts, pReader->order);
  } else {
    int64_t nextTs = pFileData->aTSKEY[pDump->rowIndex];
    if (outOfTimeWindow(nextTs, &pReader->window)) {
      // the remaining rows are out of the query time window, ignore the rest of the block
      setBlockAllDumped(pDump, nextTs, pReader->order);
    }
  }

  double elapsedTime = (taosGetTimestampUs() - startUs) / 1000.0;
  pReader->cost.blockLoadTime += elapsedTime;

  int32_t unDumpedRows = asc ? pBlk->nRow - pDump->rowIndex : pDump->rowIndex + 1;
  tsdbDebug("%p copy file block to sdatablock, global index:%d, table index:%d, brange:%" PRId64 "-%" PRId64
            ", rows:%d, remain:%d, minVer:%" PRId64 ", maxVer:%" PRId64 ", uid:%" PRIu64 " elapsed time:%.2f ms, %s",
            pReader, pIter->index, pInfo->tbBlockIdx, pBlk->minKey.ts, pBlk->maxKey.ts, dumpedRows, unDumpedRows,
            pBlk->minVer, pBlk->maxVer, pInfo->uid, elapsedTime, pReader->idStr);

  return TSDB_CODE_SUCCESS;
}

1313 1314
/*
 * Load the file block the iterator currently points at into pBlockData.
 *
 * pBlockData is reset, re-initialised for table uid with the table's latest schema and the
 * reader's column ids except the first one, and then filled by tsdbReadDataBlock(). On
 * success the elapsed time is added to the reader's cost and the dump info's allDumped flag
 * is cleared.
 *
 * Returns 0 without loading anything when no schema can be found for uid, otherwise
 * TSDB_CODE_SUCCESS or the error from tBlockDataInit() / tsdbReadDataBlock().
 */
static int32_t doLoadFileBlockData(STsdbReader* pReader, SDataBlockIter* pBlockIter, SBlockData* pBlockData,
                                   uint64_t uid) {
  int64_t startUs = taosGetTimestampUs();

  tBlockDataReset(pBlockData);

  STSchema* pSchema = getLatestTableSchema(pReader, uid);
  if (pSchema == NULL) {
    tsdbDebug("%p table uid:%" PRIu64 " has been dropped, no data existed, %s", pReader, uid, pReader->idStr);
    return 0;
  }

  SBlockLoadSuppInfo* pSupp = &pReader->suppInfo;
  TABLEID             tid = {.suid = pReader->suid, .uid = uid};

  // colId[0] is skipped here; only the remaining column ids are handed to the block data
  int32_t code = tBlockDataInit(pBlockData, &tid, pSchema, &pSupp->colId[1], pSupp->numOfCols - 1);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  SFileDataBlockInfo* pInfo = getCurrentBlockInfo(pBlockIter);
  SDataBlk*           pBlk = getCurrentBlock(pBlockIter);

  code = tsdbReadDataBlock(pReader->pFileReader, pBlk, pBlockData);
  if (code != TSDB_CODE_SUCCESS) {
    tsdbError("%p error occurs in loading file block, global index:%d, table index:%d, brange:%" PRId64 "-%" PRId64
              ", rows:%d, code:%s %s",
              pReader, pBlockIter->index, pInfo->tbBlockIdx, pBlk->minKey.ts, pBlk->maxKey.ts, pBlk->nRow,
              tstrerror(code), pReader->idStr);
    return code;
  }

  double elapsedTime = (taosGetTimestampUs() - startUs) / 1000.0;

  tsdbDebug("%p load file block into buffer, global index:%d, index in table block list:%d, brange:%" PRId64 "-%" PRId64
            ", rows:%d, minVer:%" PRId64 ", maxVer:%" PRId64 ", elapsed time:%.2f ms, %s",
            pReader, pBlockIter->index, pInfo->tbBlockIdx, pBlk->minKey.ts, pBlk->maxKey.ts, pBlk->nRow, pBlk->minVer,
            pBlk->maxVer, elapsedTime, pReader->idStr);

  pReader->cost.blockLoadTime += elapsedTime;
  pReader->status.fBlockDumpInfo.allDumped = false;

  return TSDB_CODE_SUCCESS;
}
H
Hongze Cheng 已提交
1357

H
Haojun Liao 已提交
1358 1359 1360
/*
 * Free everything owned by the block order supporter: the per-table block buffers
 * (the first pSup->numOfTables slots of pDataBlockInfo) and the three arrays themselves.
 */
static void cleanupBlockOrderSupporter(SBlockOrderSupporter* pSup) {
  // per-table buffers first, while the pointer array is still alive
  for (int32_t t = 0; t < pSup->numOfTables; ++t) {
    SBlockOrderWrapper* pTableBlocks = pSup->pDataBlockInfo[t];
    taosMemoryFreeClear(pTableBlocks);
  }

  taosMemoryFreeClear(pSup->pDataBlockInfo);
  taosMemoryFreeClear(pSup->indexPerTable);
  taosMemoryFreeClear(pSup->numOfBlocksPerTable);
}
H
Hongze Cheng 已提交
1369

H
Haojun Liao 已提交
1370 1371
/*
 * Allocate the three zero-filled per-table arrays of the block order supporter, each sized
 * for numOfTables entries. pSup->numOfTables itself is not modified here.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY after releasing whatever was
 * allocated when any of the allocations fails.
 */
static int32_t initBlockOrderSupporter(SBlockOrderSupporter* pSup, int32_t numOfTables) {
  pSup->numOfBlocksPerTable = taosMemoryCalloc(1, sizeof(int32_t) * numOfTables);
  pSup->indexPerTable = taosMemoryCalloc(1, sizeof(int32_t) * numOfTables);
  pSup->pDataBlockInfo = taosMemoryCalloc(1, POINTER_BYTES * numOfTables);

  bool allAllocated =
      (pSup->numOfBlocksPerTable != NULL) && (pSup->indexPerTable != NULL) && (pSup->pDataBlockInfo != NULL);
  if (allAllocated) {
    return TSDB_CODE_SUCCESS;
  }

  cleanupBlockOrderSupporter(pSup);
  return TSDB_CODE_OUT_OF_MEMORY;
}
H
Hongze Cheng 已提交
1382

H
Haojun Liao 已提交
1383
static int32_t fileDataBlockOrderCompar(const void* pLeft, const void* pRight, void* param) {
1384
  int32_t leftIndex = *(int32_t*)pLeft;
H
Haojun Liao 已提交
1385
  int32_t rightIndex = *(int32_t*)pRight;
H
Hongze Cheng 已提交
1386

H
Haojun Liao 已提交
1387
  SBlockOrderSupporter* pSupporter = (SBlockOrderSupporter*)param;
H
Hongze Cheng 已提交
1388

H
Haojun Liao 已提交
1389 1390
  int32_t leftTableBlockIndex = pSupporter->indexPerTable[leftIndex];
  int32_t rightTableBlockIndex = pSupporter->indexPerTable[rightIndex];
H
Hongze Cheng 已提交
1391

H
Haojun Liao 已提交
1392 1393 1394 1395 1396 1397 1398
  if (leftTableBlockIndex > pSupporter->numOfBlocksPerTable[leftIndex]) {
    /* left block is empty */
    return 1;
  } else if (rightTableBlockIndex > pSupporter->numOfBlocksPerTable[rightIndex]) {
    /* right block is empty */
    return -1;
  }
H
Hongze Cheng 已提交
1399

1400
  SBlockOrderWrapper* pLeftBlock = &pSupporter->pDataBlockInfo[leftIndex][leftTableBlockIndex];
H
Haojun Liao 已提交
1401
  SBlockOrderWrapper* pRightBlock = &pSupporter->pDataBlockInfo[rightIndex][rightTableBlockIndex];
H
Hongze Cheng 已提交
1402

1403 1404 1405
  return pLeftBlock->offset > pRightBlock->offset ? 1 : -1;
}

H
Haojun Liao 已提交
1406
/*
 * Decode the SDataBlk of the iterator's current position into pBlockIter->block.
 *
 * The current block info gives the table uid and the position in that table's block index
 * list; the ordinal index stored there selects the item to decode from the table's block map.
 * Nothing is done when the iterator has no current block.
 *
 * Returns TSDB_CODE_SUCCESS, or terrno when the table's scan info cannot be found.
 */
static int32_t doSetCurrentBlock(SDataBlockIter* pBlockIter, const char* idStr) {
  SFileDataBlockInfo* pInfo = getCurrentBlockInfo(pBlockIter);
  if (pInfo == NULL) {
    return TSDB_CODE_SUCCESS;
  }

  STableBlockScanInfo* pScanInfo = getTableBlockScanInfo(pBlockIter->pTableMap, pInfo->uid, idStr);
  if (pScanInfo == NULL) {
    return terrno;
  }

  SBlockIndex* pIndex = taosArrayGet(pScanInfo->pBlockList, pInfo->tbBlockIdx);
  tMapDataGetItemByIdx(&pScanInfo->mapData, pIndex->ordinalIndex, &pBlockIter->block, tGetDataBlk);

#if 0
  qDebug("check file block, table uid:%"PRIu64" index:%d offset:%"PRId64", ", pScanInfo->uid, *mapDataIndex, pBlockIter->block.aSubBlock[0].offset);
#endif

  return TSDB_CODE_SUCCESS;
}
H
Hongze Cheng 已提交
1424

1425
/**
 * (Re)build the block list of the iterator: collect the data blocks of every table in pReader->status.pTableMap
 * and order them by their offset in the data file, then position the iterator on the first block to visit.
 *
 * @param pReader      reader that owns the table map and the scan order
 * @param pBlockIter   iterator to be filled, its blockList is cleared first
 * @param numOfBlocks  total number of blocks expected in all tables
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_OUT_OF_MEMORY, TSDB_CODE_INVALID_PARA, or the error reported when the
 *         first block cannot be set as the current block
 */
static int32_t initBlockIterator(STsdbReader* pReader, SDataBlockIter* pBlockIter, int32_t numOfBlocks) {
  bool asc = ASCENDING_TRAVERSE(pReader->order);

  SBlockOrderSupporter sup = {0};
  pBlockIter->numOfBlocks = numOfBlocks;
  taosArrayClear(pBlockIter->blockList);
  pBlockIter->pTableMap = pReader->status.pTableMap;

  // access data blocks according to the offset of each block in asc/desc order.
  int32_t numOfTables = (int32_t)taosHashGetSize(pReader->status.pTableMap);

  int64_t st = taosGetTimestampUs();
  int32_t code = initBlockOrderSupporter(&sup, numOfTables);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  int32_t cnt = 0;
  void*   ptr = NULL;
  while (1) {
    ptr = taosHashIterate(pReader->status.pTableMap, ptr);
    if (ptr == NULL) {
      break;
    }

    STableBlockScanInfo* pTableScanInfo = *(STableBlockScanInfo**)ptr;
    if (pTableScanInfo->pBlockList == NULL || taosArrayGetSize(pTableScanInfo->pBlockList) == 0) {
      continue;
    }

    size_t num = taosArrayGetSize(pTableScanInfo->pBlockList);
    sup.numOfBlocksPerTable[sup.numOfTables] = num;

    char* buf = taosMemoryMalloc(sizeof(SBlockOrderWrapper) * num);
    if (buf == NULL) {
      // the iteration is abandoned before it reaches the end, so it has to be cancelled explicitly
      taosHashCancelIterate(pReader->status.pTableMap, ptr);
      cleanupBlockOrderSupporter(&sup);
      return TSDB_CODE_OUT_OF_MEMORY;
    }

    sup.pDataBlockInfo[sup.numOfTables] = (SBlockOrderWrapper*)buf;

    for (int32_t k = 0; k < num; ++k) {
      SBlockIndex* pIndex = taosArrayGet(pTableScanInfo->pBlockList, k);
      sup.pDataBlockInfo[sup.numOfTables][k] =
          (SBlockOrderWrapper){.uid = pTableScanInfo->uid, .offset = pIndex->inFileOffset};
      cnt++;
    }

    sup.numOfTables += 1;
  }

  if (numOfBlocks != cnt && sup.numOfTables != numOfTables) {
    cleanupBlockOrderSupporter(&sup);
    return TSDB_CODE_INVALID_PARA;
  }

  // since there is only one table qualified, blocks are not sorted
  if (sup.numOfTables == 1) {
    for (int32_t i = 0; i < numOfBlocks; ++i) {
      SFileDataBlockInfo blockInfo = {.uid = sup.pDataBlockInfo[0][i].uid, .tbBlockIdx = i};
      taosArrayPush(pBlockIter->blockList, &blockInfo);
    }

    int64_t et = taosGetTimestampUs();
    tsdbDebug("%p create blocks info struct completed for one table, %d blocks not sorted, elapsed time:%.2f ms %s",
              pReader, numOfBlocks, (et - st) / 1000.0, pReader->idStr);

    pBlockIter->index = asc ? 0 : (numOfBlocks - 1);
    cleanupBlockOrderSupporter(&sup);
    return doSetCurrentBlock(pBlockIter, pReader->idStr);
  }

  tsdbDebug("%p create data blocks info struct completed, %d blocks in %d tables %s", pReader, cnt, sup.numOfTables,
            pReader->idStr);

  SMultiwayMergeTreeInfo* pTree = NULL;

  // keep the full int32_t result; a narrower type may truncate an error code to 0
  int32_t ret = tMergeTreeCreate(&pTree, sup.numOfTables, &sup, fileDataBlockOrderCompar);
  if (ret != TSDB_CODE_SUCCESS) {
    cleanupBlockOrderSupporter(&sup);
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  int32_t numOfTotal = 0;
  while (numOfTotal < cnt) {
    int32_t pos = tMergeTreeGetChosenIndex(pTree);
    int32_t index = sup.indexPerTable[pos]++;

    SFileDataBlockInfo blockInfo = {.uid = sup.pDataBlockInfo[pos][index].uid, .tbBlockIdx = index};
    taosArrayPush(pBlockIter->blockList, &blockInfo);

    // set data block index overflow, in order to disable the offset comparator
    if (sup.indexPerTable[pos] >= sup.numOfBlocksPerTable[pos]) {
      sup.indexPerTable[pos] = sup.numOfBlocksPerTable[pos] + 1;
    }

    numOfTotal += 1;
    tMergeTreeAdjust(pTree, tMergeTreeGetAdjustIndex(pTree));
  }

  int64_t et = taosGetTimestampUs();
  tsdbDebug("%p %d data blocks access order completed, elapsed time:%.2f ms %s", pReader, numOfBlocks,
            (et - st) / 1000.0, pReader->idStr);
  cleanupBlockOrderSupporter(&sup);
  taosMemoryFree(pTree);

  pBlockIter->index = asc ? 0 : (numOfBlocks - 1);
  return doSetCurrentBlock(pBlockIter, pReader->idStr);
}
H
Hongze Cheng 已提交
1537

H
Haojun Liao 已提交
1538
/**
 * Move the iterator one block forward in scan direction and decode the new current block.
 *
 * @param pBlockIter  block iterator to advance
 * @param idStr       id string forwarded to doSetCurrentBlock
 * @return false if the iterator already stands on the last block of the scan direction, true otherwise
 */
static bool blockIteratorNext(SDataBlockIter* pBlockIter, const char* idStr) {
  if (ASCENDING_TRAVERSE(pBlockIter->order)) {
    if (pBlockIter->index >= pBlockIter->numOfBlocks - 1) {
      return false;
    }

    pBlockIter->index += 1;
  } else {
    if (pBlockIter->index <= 0) {
      return false;
    }

    pBlockIter->index += -1;
  }

  doSetCurrentBlock(pBlockIter, idStr);
  return true;
}

1552 1553 1554
/**
 * This is an two rectangles overlap cases.
 */
H
Hongze Cheng 已提交
1555
static int32_t dataBlockPartiallyRequired(STimeWindow* pWindow, SVersionRange* pVerRange, SDataBlk* pBlock) {
1556 1557
  return (pWindow->ekey < pBlock->maxKey.ts && pWindow->ekey >= pBlock->minKey.ts) ||
         (pWindow->skey > pBlock->minKey.ts && pWindow->skey <= pBlock->maxKey.ts) ||
H
Hongze Cheng 已提交
1558 1559
         (pVerRange->minVer > pBlock->minVer && pVerRange->minVer <= pBlock->maxVer) ||
         (pVerRange->maxVer < pBlock->maxVer && pVerRange->maxVer >= pBlock->minVer);
H
Haojun Liao 已提交
1560
}
H
Hongze Cheng 已提交
1561

1562
/**
 * Find the block of the same table that follows the given block in scan direction.
 *
 * @param pBlockInfo           current block; tbBlockIdx is its position in the block list of the table
 * @param pTableBlockScanInfo  scan info of the table that owns the block list
 * @param nextIndex            [out] position of the neighbor block in the block list
 * @param order                scan order
 * @param pBlockIndex          [out] copy of the block index entry of the neighbor block
 * @return true if a neighbor block exists and both output parameters are set, false otherwise
 */
static bool getNeighborBlockOfSameTable(SFileDataBlockInfo* pBlockInfo, STableBlockScanInfo* pTableBlockScanInfo,
                                        int32_t* nextIndex, int32_t order, SBlockIndex* pBlockIndex) {
  int32_t step = ASCENDING_TRAVERSE(order) ? 1 : -1;

  // use signed arithmetic: "size - 1" on the unsigned size of an empty list wraps around
  int32_t numOfBlocks = (int32_t)taosArrayGetSize(pTableBlockScanInfo->pBlockList);
  int32_t neighbor = pBlockInfo->tbBlockIdx + step;

  if (pBlockInfo->tbBlockIdx < 0 || neighbor < 0 || neighbor >= numOfBlocks) {
    return false;
  }

  SBlockIndex* pNeighbor = taosArrayGet(pTableBlockScanInfo->pBlockList, neighbor);
  if (pNeighbor == NULL) {
    return false;
  }

  *nextIndex = neighbor;
  *pBlockIndex = *pNeighbor;
  return true;
}

/**
 * Search the block list of the iterator, starting at the current position and moving in scan direction, for the
 * entry that has the same uid and tbBlockIdx as pFBlockInfo.
 *
 * @return position of the matching entry in the block list, or -1 if there is no such entry
 */
static int32_t findFileBlockInfoIndex(SDataBlockIter* pBlockIter, SFileDataBlockInfo* pFBlockInfo) {
  int32_t delta = -1;
  if (ASCENDING_TRAVERSE(pBlockIter->order)) {
    delta = 1;
  }

  for (int32_t pos = pBlockIter->index; pos >= 0 && pos < pBlockIter->numOfBlocks; pos += delta) {
    SFileDataBlockInfo* pCandidate = taosArrayGet(pBlockIter->blockList, pos);

    bool sameTable = (pCandidate->uid == pFBlockInfo->uid);
    if (sameTable && pCandidate->tbBlockIdx == pFBlockInfo->tbBlockIdx) {
      return pos;
    }
  }

  return -1;
}

1596
/**
 * Advance the iterator by step and make the block stored at position index the current block. If the block is
 * not already located at the new iterator position, it is moved there inside the block list.
 *
 * @param pBlockIter  block iterator
 * @param index       position in the block list of the block to activate
 * @param step        offset added to the current position of the iterator
 * @return TSDB_CODE_SUCCESS, or -1 if index is out of range
 */
static int32_t setFileBlockActiveInBlockIter(SDataBlockIter* pBlockIter, int32_t index, int32_t step) {
  bool validIndex = (index >= 0) && (index < pBlockIter->numOfBlocks);
  if (!validIndex) {
    return -1;
  }

  // keep a copy, since the entry is removed from the list below
  SFileDataBlockInfo target = *(SFileDataBlockInfo*)taosArrayGet(pBlockIter->blockList, index);

  pBlockIter->index += step;

  if (pBlockIter->index != index) {
    taosArrayRemove(pBlockIter->blockList, index);
    taosArrayInsert(pBlockIter->blockList, pBlockIter->index, &target);

    SFileDataBlockInfo* pPlaced = taosArrayGet(pBlockIter->blockList, pBlockIter->index);
    ASSERT(pPlaced->uid == target.uid && pPlaced->tbBlockIdx == target.tbBlockIdx);
  }

  doSetCurrentBlock(pBlockIter, "");
  return TSDB_CODE_SUCCESS;
}

H
Haojun Liao 已提交
1616
// todo: this attribute could be acquired during extractin the global ordered block list.
1617
static bool overlapWithNeighborBlock(SDataBlk* pBlock, SBlockIndex* pNeighborBlockIndex, int32_t order) {
1618 1619
  // it is the last block in current file, no chance to overlap with neighbor blocks.
  if (ASCENDING_TRAVERSE(order)) {
1620
    return pBlock->maxKey.ts == pNeighborBlockIndex->window.skey;
1621
  } else {
1622
    return pBlock->minKey.ts == pNeighborBlockIndex->window.ekey;
1623
  }
H
Haojun Liao 已提交
1624
}
H
Hongze Cheng 已提交
1625

H
Hongze Cheng 已提交
1626
/**
 * Check whether the given key is located before the data block in scan direction (boundary included).
 *
 * @param order   scan order
 * @param key     key to check; a timestamp of TSKEY_INITIAL_VAL never qualifies
 * @param pBlock  data block
 * @return true if key.ts <= minKey.ts for an ascending scan, or key.ts >= maxKey.ts for a descending scan
 */
static bool bufferDataInFileBlockGap(int32_t order, TSDBKEY key, SDataBlk* pBlock) {
  if (key.ts == TSKEY_INITIAL_VAL) {
    return false;
  }

  if (ASCENDING_TRAVERSE(order)) {
    return key.ts <= pBlock->minKey.ts;
  }

  return key.ts >= pBlock->maxKey.ts;
}
H
Hongze Cheng 已提交
1632

H
Hongze Cheng 已提交
1633
/**
 * Check whether the timestamp of the key falls into the key range of the block while the version range of the
 * block intersects the queried version range.
 */
static bool keyOverlapFileBlock(TSDBKEY key, SDataBlk* pBlock, SVersionRange* pVerRange) {
  bool tsInBlock = (key.ts >= pBlock->minKey.ts) && (key.ts <= pBlock->maxKey.ts);
  if (!tsInBlock) {
    return false;
  }

  if (pBlock->maxVer < pVerRange->minVer) {
    return false;
  }

  return pBlock->minVer <= pVerRange->maxVer;
}

H
Hongze Cheng 已提交
1638 1639
/**
 * Walk through the delete skyline, starting at startIndex, and check whether a skyline point affects the block.
 *
 * @param pBlockScanInfo  scan info that owns the delete skyline (list of TSDBKEY)
 * @param pBlock          data block to check
 * @param startIndex      position of the first skyline point to inspect
 * @return true if a point with version >= pBlock->minVer lies inside the key range of the block, or lies before
 *         the block while the following point is not before the block; false otherwise
 */
static bool doCheckforDatablockOverlap(STableBlockScanInfo* pBlockScanInfo, const SDataBlk* pBlock,
                                       int32_t startIndex) {
  SArray* pSkyline = pBlockScanInfo->delSkyline;
  size_t  numOfPoints = taosArrayGetSize(pSkyline);

  for (int32_t idx = startIndex; idx < numOfPoints; ++idx) {
    TSDBKEY* pCur = taosArrayGet(pSkyline, idx);

    bool insideBlock = (pCur->ts >= pBlock->minKey.ts) && (pCur->ts <= pBlock->maxKey.ts);
    bool beforeBlock = (!insideBlock) && (pCur->ts < pBlock->minKey.ts);

    // the point is behind the block, none of the remaining points needs to be inspected
    if (!insideBlock && !beforeBlock) {
      return false;
    }

    if (pCur->version < pBlock->minVer) {
      continue;
    }

    if (insideBlock) {
      return true;
    }

    // the point is located before the block, check whether the range up to the next point reaches the block
    if (idx < numOfPoints - 1) {
      TSDBKEY* pFollowing = taosArrayGet(pSkyline, idx + 1);
      if (pFollowing->ts >= pBlock->minKey.ts) {
        return true;
      }
    } else {  // it must be the last point
      ASSERT(pCur->version == 0);
    }
  }

  return false;
}

H
Hongze Cheng 已提交
1667
/**
 * Check whether the data block is affected by the delete skyline of the table.
 *
 * @param pBlockScanInfo  scan info of the table; delSkyline may be NULL or empty if nothing is deleted
 * @param pBlock          data block to check
 * @param order           scan order
 * @return true if the block overlaps with the delete skyline, false otherwise
 */
static bool overlapWithDelSkyline(STableBlockScanInfo* pBlockScanInfo, const SDataBlk* pBlock, int32_t order) {
  // an empty skyline has neither a first nor a last point to compare with
  if (pBlockScanInfo->delSkyline == NULL || taosArrayGetSize(pBlockScanInfo->delSkyline) == 0) {
    return false;
  }

  // ts is not overlap
  TSDBKEY* pFirst = taosArrayGet(pBlockScanInfo->delSkyline, 0);
  TSDBKEY* pLast = taosArrayGetLast(pBlockScanInfo->delSkyline);
  if (pBlock->minKey.ts > pLast->ts || pBlock->maxKey.ts < pFirst->ts) {
    return false;
  }

  // version is not overlap
  if (ASCENDING_TRAVERSE(order)) {
    return doCheckforDatablockOverlap(pBlockScanInfo, pBlock, pBlockScanInfo->fileDelIndex);
  }

  // descending scan: move back to the first point that is not greater than the minKey.ts of the data block
  int32_t index = pBlockScanInfo->fileDelIndex;
  while (1) {
    TSDBKEY* p = taosArrayGet(pBlockScanInfo->delSkyline, index);
    if (p->ts > pBlock->minKey.ts && index > 0) {
      index -= 1;
    } else {
      break;
    }
  }

  return doCheckforDatablockOverlap(pBlockScanInfo, pBlock, index);
}

C
Cary Xu 已提交
1697 1698 1699 1700 1701 1702 1703 1704 1705 1706 1707 1708 1709
// Flags describing one file data block; they are filled by getBlockToLoadInfo and used to decide whether the
// block has to be loaded and whether it is a clean block.
typedef struct {
  bool overlapWithNeighborBlock;  // boundary ts equals the boundary ts of the neighbor block of the same table
  bool hasDupTs;                  // pBlock->hasDup for a block with one sub block, always true otherwise
  bool overlapWithDelInfo;        // block overlaps with the delete skyline
  bool overlapWithLastBlock;      // current key of the last block reader is within [minKey.ts, maxKey.ts]
  bool overlapWithKeyInBuf;       // key in buffer is within the key range and the version ranges intersect
  bool partiallyRequired;         // a boundary of the query window/version range falls inside the block
  bool moreThanCapcity;           // pBlock->nRow is greater than the capacity of the reader
} SDataBlockToLoadInfo;

/**
 * Collect the attributes of a file data block that decide whether the block needs to be loaded.
 *
 * overlapWithNeighborBlock and overlapWithLastBlock are only assigned when a neighbor block or data in the last
 * block exists, so the caller is expected to pass in a zero-initialized pInfo.
 *
 * @param pInfo             [out] attributes of the block
 * @param pBlockInfo        position of the block in the block list of its table
 * @param pBlock            data block
 * @param pScanInfo         scan info of the table that owns the block
 * @param keyInBuf          key to be checked against the block via keyOverlapFileBlock
 * @param pLastBlockReader  reader of the last blocks
 * @param pReader           reader, provides scan order, capacity, query window and version range
 */
static void getBlockToLoadInfo(SDataBlockToLoadInfo* pInfo, SFileDataBlockInfo* pBlockInfo, SDataBlk* pBlock,
                               STableBlockScanInfo* pScanInfo, TSDBKEY keyInBuf, SLastBlockReader* pLastBlockReader,
                               STsdbReader* pReader) {
  SBlockIndex neighborIdx = {0};
  int32_t     neighborPos = 0;

  // overlap with neighbor
  if (getNeighborBlockOfSameTable(pBlockInfo, pScanInfo, &neighborPos, pReader->order, &neighborIdx)) {
    pInfo->overlapWithNeighborBlock = overlapWithNeighborBlock(pBlock, &neighborIdx, pReader->order);
  }

  // has duplicated ts of different version in this block
  if (pBlock->nSubBlock == 1) {
    pInfo->hasDupTs = pBlock->hasDup;
  } else {
    pInfo->hasDupTs = true;
  }

  pInfo->overlapWithDelInfo = overlapWithDelSkyline(pScanInfo, pBlock, pReader->order);

  if (hasDataInLastBlock(pLastBlockReader)) {
    int64_t lastBlockKey = getCurrentKeyInLastBlock(pLastBlockReader);
    pInfo->overlapWithLastBlock = (pBlock->maxKey.ts >= lastBlockKey) && (pBlock->minKey.ts <= lastBlockKey);
  }

  pInfo->moreThanCapcity = pBlock->nRow > pReader->capacity;
  pInfo->partiallyRequired = dataBlockPartiallyRequired(&pReader->window, &pReader->verRange, pBlock);
  pInfo->overlapWithKeyInBuf = keyOverlapFileBlock(keyInBuf, pBlock, &pReader->verRange);
}

// A file data block has to be loaded if at least one of the following conditions holds:
// 1. it overlaps with the neighbor block of the same table
// 2. it contains duplicated timestamps
// 3. it is only partially covered by the query time window/version range
// 4. it overlaps with the key in buffer
// 5. it holds more rows than the capacity of the reader
// 6. it overlaps with the delete info
// 7. it overlaps with the current key of the last block
static bool fileBlockShouldLoad(STsdbReader* pReader, SFileDataBlockInfo* pBlockInfo, SDataBlk* pBlock,
                                STableBlockScanInfo* pScanInfo, TSDBKEY keyInBuf, SLastBlockReader* pLastBlockReader) {
  SDataBlockToLoadInfo loadInfo = {0};
  getBlockToLoadInfo(&loadInfo, pBlockInfo, pBlock, pScanInfo, keyInBuf, pLastBlockReader, pReader);

  bool overlapped = loadInfo.overlapWithNeighborBlock || loadInfo.overlapWithKeyInBuf ||
                    loadInfo.overlapWithDelInfo || loadInfo.overlapWithLastBlock;
  bool needLoad = overlapped || loadInfo.hasDupTs || loadInfo.partiallyRequired || loadInfo.moreThanCapcity;
  if (!needLoad) {
    return false;
  }

  // log the reason why load the datablock for profile
  tsdbDebug("%p uid:%" PRIu64
            " need to load the datablock, overlapneighbor:%d, hasDup:%d, partiallyRequired:%d, "
            "overlapWithKey:%d, greaterThanBuf:%d, overlapWithDel:%d, overlapWithlastBlock:%d, %s",
            pReader, pBlockInfo->uid, loadInfo.overlapWithNeighborBlock, loadInfo.hasDupTs,
            loadInfo.partiallyRequired, loadInfo.overlapWithKeyInBuf, loadInfo.moreThanCapcity,
            loadInfo.overlapWithDelInfo, loadInfo.overlapWithLastBlock, pReader->idStr);

  return true;
}

C
Cary Xu 已提交
1762 1763 1764 1765 1766 1767 1768 1769 1770
/**
 * A file data block is clean if it has no duplicated timestamps and overlaps neither with its neighbor block,
 * the key in buffer, the delete info, nor the current key of the last block.
 */
static bool isCleanFileDataBlock(STsdbReader* pReader, SFileDataBlockInfo* pBlockInfo, SDataBlk* pBlock,
                                 STableBlockScanInfo* pScanInfo, TSDBKEY keyInBuf, SLastBlockReader* pLastBlockReader) {
  SDataBlockToLoadInfo loadInfo = {0};
  getBlockToLoadInfo(&loadInfo, pBlockInfo, pBlock, pScanInfo, keyInBuf, pLastBlockReader, pReader);

  return !loadInfo.overlapWithNeighborBlock && !loadInfo.hasDupTs && !loadInfo.overlapWithKeyInBuf &&
         !loadInfo.overlapWithDelInfo && !loadInfo.overlapWithLastBlock;
}

1771
/**
 * Fill the result block of the reader with the rows of the buffer iterators of the given table.
 *
 * @param pReader         reader that owns the result block
 * @param pBlockScanInfo  scan info of the table; nothing is done if neither iter nor iiter has a value
 * @param endKey          end key forwarded to buildDataBlockFromBufImpl
 * @return result code of buildDataBlockFromBufImpl, TSDB_CODE_SUCCESS if there is nothing to read
 */
static int32_t buildDataBlockFromBuf(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo, int64_t endKey) {
  bool hasBufData = pBlockScanInfo->iiter.hasVal || pBlockScanInfo->iter.hasVal;
  if (!hasBufData) {
    return TSDB_CODE_SUCCESS;
  }

  SSDataBlock* pResBlock = pReader->pResBlock;
  int64_t      startUs = taosGetTimestampUs();

  int32_t code = buildDataBlockFromBufImpl(pBlockScanInfo, endKey, pReader->capacity, pReader);

  // the block attributes are updated regardless of the result code
  blockDataUpdateTsWindow(pResBlock, pReader->suppInfo.slotId[0]);
  pResBlock->info.id.uid = pBlockScanInfo->uid;
  setComposedBlockFlag(pReader, true);

  double elapsedTime = (taosGetTimestampUs() - startUs) / 1000.0;
  tsdbDebug("%p build data block from cache completed, elapsed time:%.2f ms, numOfRows:%" PRId64 ", brange:%" PRId64
            " - %" PRId64 ", uid:%" PRIu64 ",  %s",
            pReader, elapsedTime, pResBlock->info.rows, pResBlock->info.window.skey, pResBlock->info.window.ekey,
            pBlockScanInfo->uid, pReader->idStr);

  pReader->cost.buildmemBlock += elapsedTime;
  return code;
}

1796
static bool tryCopyDistinctRowFromFileBlock(STsdbReader* pReader, SBlockData* pBlockData, int64_t key,
D
dapan1121 已提交
1797
                                            SFileBlockDumpInfo* pDumpInfo, bool *copied) {
1798 1799 1800
  // opt version
  // 1. it is not a border point
  // 2. the direct next point is not an duplicated timestamp
D
dapan1121 已提交
1801 1802 1803
  int32_t code = TSDB_CODE_SUCCESS;

  *copied = false;
1804 1805
  bool asc = (pReader->order == TSDB_ORDER_ASC);
  if ((pDumpInfo->rowIndex < pDumpInfo->totalRows - 1 && asc) || (pDumpInfo->rowIndex > 0 && (!asc))) {
1806
    int32_t step = pReader->order == TSDB_ORDER_ASC ? 1 : -1;
1807 1808

    int64_t nextKey = pBlockData->aTSKEY[pDumpInfo->rowIndex + step];
1809
    if (nextKey != key) {  // merge is not needed
D
dapan1121 已提交
1810 1811 1812 1813
      code = doAppendRowFromFileBlock(pReader->pResBlock, pReader, pBlockData, pDumpInfo->rowIndex);
      if (code) {
        return code;
      }
1814
      pDumpInfo->rowIndex += step;
D
dapan1121 已提交
1815
      *copied = true;
1816 1817 1818
    }
  }

D
dapan1121 已提交
1819
  return code;
1820 1821
}

1822
static bool nextRowFromLastBlocks(SLastBlockReader* pLastBlockReader, STableBlockScanInfo* pScanInfo,
1823
                                  SVersionRange* pVerRange) {
X
Xiaoyu Wang 已提交
1824
  int32_t step = ASCENDING_TRAVERSE(pLastBlockReader->order) ? 1 : -1;
H
Haojun Liao 已提交
1825

1826 1827
  while (1) {
    bool hasVal = tMergeTreeNext(&pLastBlockReader->mergeTree);
1828
    if (!hasVal) {  // the next value will be the accessed key in stt
1829
      pScanInfo->lastKeyInStt += step;
1830 1831 1832 1833 1834
      return false;
    }

    TSDBROW row = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
    TSDBKEY k = TSDBROW_KEY(&row);
1835 1836 1837
    pScanInfo->lastKeyInStt = k.ts;

    if (!hasBeenDropped(pScanInfo->delSkyline, &pScanInfo->lastBlockDelIndex, &k, pLastBlockReader->order, pVerRange)) {
H
Haojun Liao 已提交
1838 1839
      // the qualifed ts may equal to k.ts, only a greater version one.
      // here we need to fallback one step.
1840 1841 1842 1843 1844 1845
      return true;
    }
  }
}

static bool tryCopyDistinctRowFromSttBlock(TSDBROW* fRow, SLastBlockReader* pLastBlockReader,
D
dapan1121 已提交
1846 1847 1848 1849 1850
                                           STableBlockScanInfo* pScanInfo, int64_t ts, STsdbReader* pReader, bool *copied) {
  int32_t code = TSDB_CODE_SUCCESS;

  *copied = false;

1851
  bool hasVal = nextRowFromLastBlocks(pLastBlockReader, pScanInfo, &pReader->verRange);
1852 1853 1854
  if (hasVal) {
    int64_t next1 = getCurrentKeyInLastBlock(pLastBlockReader);
    if (next1 != ts) {
D
dapan1121 已提交
1855 1856 1857 1858 1859 1860 1861
      code = doAppendRowFromFileBlock(pReader->pResBlock, pReader, fRow->pBlockData, fRow->iRow);
      if (code) {
        return code;
      }
      
      *copied = true;
      return code;
1862 1863
    }
  } else {
D
dapan1121 已提交
1864 1865 1866 1867 1868 1869 1870
    code = doAppendRowFromFileBlock(pReader->pResBlock, pReader, fRow->pBlockData, fRow->iRow);
    if (code) {
      return code;
    }
      
    *copied = true;
    return code;
1871 1872
  }

D
dapan1121 已提交
1873
  return code;
1874 1875
}

1876 1877 1878 1879 1880 1881
/**
 * Return the schema cached in pReader->pSchema; on first use it is fetched from meta with version -1.
 *
 * @return the schema, or NULL if it cannot be retrieved (an error is logged)
 */
static FORCE_INLINE STSchema* getLatestTableSchema(STsdbReader* pReader, uint64_t uid) {
  if (pReader->pSchema == NULL) {
    pReader->pSchema = metaGetTbTSchema(pReader->pTsdb->pVnode->pMeta, uid, -1, 1);
    if (pReader->pSchema == NULL) {
      tsdbError("failed to get table schema, uid:%" PRIu64 ", it may have been dropped, ver:-1, %s", uid, pReader->idStr);
    }
  }

  return pReader->pSchema;
}

H
Haojun Liao 已提交
1889 1890 1891
/**
 * Get the schema of the requested version for a row.
 *
 * Lookup order: pReader->pSchema (fetched with version -1 on first use), the schema map of the reader keyed by
 * version, and finally meta; a schema loaded from meta is stored in the schema map.
 *
 * @param sversion  schema version of the row
 * @param pReader   reader that caches the schemas
 * @param uid       table uid
 * @return the schema, or NULL with terrno set if it can neither be loaded nor be cached
 */
static FORCE_INLINE STSchema* doGetSchemaForTSRow(int32_t sversion, STsdbReader* pReader, uint64_t uid) {
  // always set the newest schema version in pReader->pSchema
  if (pReader->pSchema == NULL) {
    pReader->pSchema = metaGetTbTSchema(pReader->pTsdb->pVnode->pMeta, uid, -1, 1);
  }

  STSchema* pLatest = pReader->pSchema;
  if (pLatest != NULL && pLatest->version == sversion) {
    return pLatest;
  }

  void** ppCached = tSimpleHashGet(pReader->pSchemaMap, &sversion, sizeof(sversion));
  if (ppCached != NULL) {
    return *(STSchema**)ppCached;
  }

  STSchema* pLoaded = NULL;
  int32_t   code = metaGetTbTSchemaEx(pReader->pTsdb->pVnode->pMeta, pReader->suid, uid, sversion, &pLoaded);
  if (code == TSDB_CODE_SUCCESS) {
    code = tSimpleHashPut(pReader->pSchemaMap, &sversion, sizeof(sversion), &pLoaded, POINTER_BYTES);
    if (code == TSDB_CODE_SUCCESS) {
      return pLoaded;
    }
  }

  terrno = code;
  return NULL;
}

1919
static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo, TSDBROW* pRow,
1920 1921
                                     SIterInfo* pIter, int64_t key, SLastBlockReader* pLastBlockReader) {
  SRowMerger          merge = {0};
H
Hongze Cheng 已提交
1922
  SRow*               pTSRow = NULL;
1923 1924 1925
  SBlockData*         pBlockData = &pReader->status.fileBlockData;
  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;

1926
  int64_t tsLast = INT64_MIN;
1927
  if (hasDataInLastBlock(pLastBlockReader)) {
1928 1929
    tsLast = getCurrentKeyInLastBlock(pLastBlockReader);
  }
1930

H
Hongze Cheng 已提交
1931 1932
  TSDBKEY k = TSDBROW_KEY(pRow);
  TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
1933

1934 1935
  int64_t minKey = 0;
  if (pReader->order == TSDB_ORDER_ASC) {
H
Hongze Cheng 已提交
1936
    minKey = INT64_MAX;  // chosen the minimum value
1937
    if (minKey > tsLast && hasDataInLastBlock(pLastBlockReader)) {
1938 1939
      minKey = tsLast;
    }
1940

1941 1942 1943
    if (minKey > k.ts) {
      minKey = k.ts;
    }
1944

1945
    if (minKey > key && hasDataInFileBlock(pBlockData, pDumpInfo)) {
1946 1947 1948 1949
      minKey = key;
    }
  } else {
    minKey = INT64_MIN;
1950
    if (minKey < tsLast && hasDataInLastBlock(pLastBlockReader)) {
1951 1952 1953 1954 1955 1956 1957
      minKey = tsLast;
    }

    if (minKey < k.ts) {
      minKey = k.ts;
    }

1958
    if (minKey < key && hasDataInFileBlock(pBlockData, pDumpInfo)) {
1959 1960
      minKey = key;
    }
1961 1962 1963 1964
  }

  bool init = false;

1965
  // ASC: file block ---> last block -----> imem -----> mem
H
Hongze Cheng 已提交
1966
  // DESC: mem -----> imem -----> last block -----> file block
1967 1968
  if (pReader->order == TSDB_ORDER_ASC) {
    if (minKey == key) {
1969
      init = true;
H
Hongze Cheng 已提交
1970
      int32_t code = tsdbRowMergerInit(&merge, &fRow, pReader->pSchema);
H
Haojun Liao 已提交
1971 1972 1973
      if (code != TSDB_CODE_SUCCESS) {
        return code;
      }
1974
      doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
1975 1976
    }

1977
    if (minKey == tsLast) {
1978
      TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
H
Haojun Liao 已提交
1979
      if (init) {
H
Hongze Cheng 已提交
1980
        tsdbRowMerge(&merge, &fRow1);
H
Haojun Liao 已提交
1981
      } else {
1982
        init = true;
H
Hongze Cheng 已提交
1983
        int32_t code = tsdbRowMergerInit(&merge, &fRow1, pReader->pSchema);
H
Haojun Liao 已提交
1984 1985 1986
        if (code != TSDB_CODE_SUCCESS) {
          return code;
        }
1987
      }
1988
      doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, &merge, &pReader->verRange, pReader->idStr);
1989
    }
1990

1991
    if (minKey == k.ts) {
K
kailixu 已提交
1992 1993 1994 1995
      STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid);
      if (pSchema == NULL) {
        return terrno;
      }
H
Haojun Liao 已提交
1996
      if (init) {
X
Xiaoyu Wang 已提交
1997
        tsdbRowMergerAdd(&merge, pRow, pSchema);
H
Haojun Liao 已提交
1998
      } else {
1999
        init = true;
X
Xiaoyu Wang 已提交
2000
        int32_t code = tsdbRowMergerInit(&merge, pRow, pSchema);
H
Haojun Liao 已提交
2001 2002 2003 2004 2005 2006 2007
        if (code != TSDB_CODE_SUCCESS) {
          return code;
        }
      }
      int32_t code = doMergeRowsInBuf(pIter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge, pReader);
      if (code != TSDB_CODE_SUCCESS) {
        return code;
2008 2009 2010 2011 2012
      }
    }
  } else {
    if (minKey == k.ts) {
      init = true;
2013
      STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid);
H
Hongze Cheng 已提交
2014
      int32_t   code = tsdbRowMergerInit(&merge, pRow, pSchema);
H
Haojun Liao 已提交
2015 2016 2017 2018 2019
      if (code != TSDB_CODE_SUCCESS) {
        return code;
      }

      code = doMergeRowsInBuf(pIter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge, pReader);
2020
      if (code != TSDB_CODE_SUCCESS || merge.pTSchema == NULL) {
H
Haojun Liao 已提交
2021 2022
        return code;
      }
2023 2024
    }

2025
    if (minKey == tsLast) {
2026
      TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
H
Haojun Liao 已提交
2027
      if (init) {
H
Hongze Cheng 已提交
2028
        tsdbRowMerge(&merge, &fRow1);
H
Haojun Liao 已提交
2029
      } else {
2030
        init = true;
H
Hongze Cheng 已提交
2031
        int32_t code = tsdbRowMergerInit(&merge, &fRow1, pReader->pSchema);
H
Haojun Liao 已提交
2032 2033 2034
        if (code != TSDB_CODE_SUCCESS) {
          return code;
        }
2035
      }
2036
      doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, &merge, &pReader->verRange, pReader->idStr);
2037 2038 2039
    }

    if (minKey == key) {
H
Haojun Liao 已提交
2040
      if (init) {
H
Hongze Cheng 已提交
2041
        tsdbRowMerge(&merge, &fRow);
H
Haojun Liao 已提交
2042
      } else {
2043
        init = true;
H
Hongze Cheng 已提交
2044
        int32_t code = tsdbRowMergerInit(&merge, &fRow, pReader->pSchema);
H
Haojun Liao 已提交
2045 2046 2047
        if (code != TSDB_CODE_SUCCESS) {
          return code;
        }
2048 2049 2050
      }
      doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
    }
2051 2052
  }

H
Hongze Cheng 已提交
2053
  int32_t code = tsdbRowMergerGetRow(&merge, &pTSRow);
2054 2055 2056 2057
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

D
dapan1121 已提交
2058
  code = doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo);
2059 2060

  taosMemoryFree(pTSRow);
H
Hongze Cheng 已提交
2061
  tsdbRowMergerClear(&merge);
D
dapan1121 已提交
2062 2063

  return code;
2064 2065
}

2066 2067 2068
/**
 * Emit the current row of the last block into the result block, merged with the rows of the same timestamp in
 * the last block and, if requested and the timestamps match, with the current row of the file block.
 *
 * @param pLastBlockReader  reader of the last blocks, positioned on a valid row
 * @param pReader           reader that owns the dump state and the result block
 * @param pBlockScanInfo    scan info of the table
 * @param pBlockData        file block data; only accessed if mergeBlockData is true
 * @param mergeBlockData    whether the current row of the file block takes part in the merge
 * @return TSDB_CODE_SUCCESS or the error code of the failed copy/merge/append step
 */
static int32_t doMergeFileBlockAndLastBlock(SLastBlockReader* pLastBlockReader, STsdbReader* pReader,
                                            STableBlockScanInfo* pBlockScanInfo, SBlockData* pBlockData,
                                            bool mergeBlockData) {
  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
  int64_t             tsLastBlock = getCurrentKeyInLastBlock(pLastBlockReader);
  bool                copied = false;
  int32_t             code = TSDB_CODE_SUCCESS;
  SRow*               pTSRow = NULL;
  SRowMerger          merge = {0};
  TSDBROW             fRow = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
  tsdbTrace("fRow ptr:%p, %d, uid:%" PRIu64 ", %s", fRow.pBlockData, fRow.iRow, pLastBlockReader->uid, pReader->idStr);

  // the file block only takes part if requested and its current row has the same timestamp
  bool mergeWithFileRow = mergeBlockData && (tsLastBlock == pBlockData->aTSKEY[pDumpInfo->rowIndex]);

  if (!mergeWithFileRow) {
    // only last block exists: try to copy the row directly if its timestamp is distinct
    code = tryCopyDistinctRowFromSttBlock(&fRow, pLastBlockReader, pBlockScanInfo, tsLastBlock, pReader, &copied);
    if (code) {
      return code;
    }

    if (copied) {
      pBlockScanInfo->lastKey = tsLastBlock;
      return TSDB_CODE_SUCCESS;
    }
  }

  code = tsdbRowMergerInit(&merge, &fRow, pReader->pSchema);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  if (!mergeWithFileRow) {
    // the reader has been advanced by the copy attempt, its current row has the same timestamp
    TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
    tsdbRowMerge(&merge, &fRow1);
  }

  doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLastBlock, &merge, &pReader->verRange, pReader->idStr);

  // merge with block data if ts == key
  if (mergeWithFileRow && tsLastBlock == pBlockData->aTSKEY[pDumpInfo->rowIndex]) {
    doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
  }

  code = tsdbRowMergerGetRow(&merge, &pTSRow);
  if (code != TSDB_CODE_SUCCESS) {
    // the merger has been initialized successfully, release it before bailing out
    tsdbRowMergerClear(&merge);
    return code;
  }

  code = doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo);

  taosMemoryFree(pTSRow);
  tsdbRowMergerClear(&merge);

  return code;
}

2144 2145
/*
 * Build the next result row when no valid in-memory row takes part, i.e. only
 * the current file data block and/or the last (stt) block can supply rows.
 *
 * pReader          reader whose pResBlock receives the row
 * pLastBlockReader reader positioned on the last-block merge tree
 * key              timestamp of the current row in the file data block
 * pBlockScanInfo   per-table scan state
 * pBlockData       currently loaded file data block
 *
 * Returns TSDB_CODE_SUCCESS or the first error code reported by a callee.
 */
static int32_t mergeFileBlockAndLastBlock(STsdbReader* pReader, SLastBlockReader* pLastBlockReader, int64_t key,
                                          STableBlockScanInfo* pBlockScanInfo, SBlockData* pBlockData) {
  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;

  // only last block exists
  if (!hasDataInFileBlock(pBlockData, pDumpInfo)) {
    return doMergeFileBlockAndLastBlock(pLastBlockReader, pReader, pBlockScanInfo, NULL, false);
  }

  // no last block available, only data block exists
  if (!hasDataInLastBlock(pLastBlockReader)) {
    return mergeRowsInFileBlocks(pBlockData, pBlockScanInfo, key, pReader);
  }

  // both the file data block and the last block have a current row
  TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
  int64_t ts = getCurrentKeyInLastBlock(pLastBlockReader);
  ASSERT(ts >= key);

  if (!ASCENDING_TRAVERSE(pReader->order)) {  // desc order
    return doMergeFileBlockAndLastBlock(pLastBlockReader, pReader, pBlockScanInfo, pBlockData, true);
  }

  if (key < ts) {  // imem, mem are all empty, file blocks (data blocks and last block) exist
    return mergeRowsInFileBlocks(pBlockData, pBlockScanInfo, key, pReader);
  }

  if (key > ts) {
    return TSDB_CODE_SUCCESS;
  }

  // key == ts: the same timestamp appears in both sources, merge them into one row
  SRow*      pTSRow = NULL;
  SRowMerger merge = {0};

  int32_t code = tsdbRowMergerInit(&merge, &fRow, pReader->pSchema);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);

  TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
  tsdbRowMerge(&merge, &fRow1);

  doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, ts, &merge, &pReader->verRange, pReader->idStr);

  code = tsdbRowMergerGetRow(&merge, &pTSRow);
  if (code == TSDB_CODE_SUCCESS) {
    code = doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo);
    taosMemoryFree(pTSRow);
  }

  // the merger was initialized successfully, so release it on the failure path as well
  tsdbRowMergerClear(&merge);
  return code;
}

2199 2200
/*
 * Merge the rows sharing the next timestamp across all four sources (mem,
 * imem, last block, file data block) and append the merged row to
 * pReader->pResBlock.
 *
 * Both mem-table iterators are dereferenced through TSDBROW_KEY() without a
 * NULL check, so the caller must only invoke this when both of them yield a
 * valid row.
 *
 * Returns TSDB_CODE_SUCCESS or the first error code reported by a callee.
 *
 * NOTE(review): when doGetSchemaForTSRow() returns NULL in the ascending imem
 * branch this returns `code`, which is still TSDB_CODE_SUCCESS at that point;
 * the other branches pass the schema on unchecked. Confirm the intended error
 * handling.
 */
static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo, SBlockData* pBlockData,
                                     SLastBlockReader* pLastBlockReader) {
  SRowMerger          merge = {0};
  SRow*               pTSRow = NULL;
  int32_t             code = TSDB_CODE_SUCCESS;
  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
  SArray*             pDelList = pBlockScanInfo->delSkyline;
  bool                asc = ASCENDING_TRAVERSE(pReader->order);

  TSDBROW* pRow = getValidMemRow(&pBlockScanInfo->iter, pDelList, pReader);
  TSDBROW* piRow = getValidMemRow(&pBlockScanInfo->iiter, pDelList, pReader);

  // INT64_MIN is only a placeholder when the corresponding source has no data
  bool    hasLastData = hasDataInLastBlock(pLastBlockReader);
  int64_t tsLast = hasLastData ? getCurrentKeyInLastBlock(pLastBlockReader) : INT64_MIN;

  bool    hasFileData = hasDataInFileBlock(pBlockData, pDumpInfo);
  int64_t key = hasFileData ? pBlockData->aTSKEY[pDumpInfo->rowIndex] : INT64_MIN;

  TSDBKEY k = TSDBROW_KEY(pRow);
  TSDBKEY ik = TSDBROW_KEY(piRow);

  // the timestamp to emit next: the minimum for ascending scan, the maximum for descending scan
  int64_t minKey = 0;
  if (asc) {
    minKey = INT64_MAX;  // let's find the minimum
    if (minKey > k.ts) {
      minKey = k.ts;
    }

    if (minKey > ik.ts) {
      minKey = ik.ts;
    }

    if (hasFileData && minKey > key) {
      minKey = key;
    }

    if (hasLastData && minKey > tsLast) {
      minKey = tsLast;
    }
  } else {
    minKey = INT64_MIN;  // let's find the maximum ts value
    if (minKey < k.ts) {
      minKey = k.ts;
    }

    if (minKey < ik.ts) {
      minKey = ik.ts;
    }

    if (hasFileData && minKey < key) {
      minKey = key;
    }

    if (hasLastData && minKey < tsLast) {
      minKey = tsLast;
    }
  }

  // true once tsdbRowMergerInit() has succeeded for `merge`
  bool init = false;

  // ASC: file block -----> last block -----> imem -----> mem
  // DESC: mem -----> imem -----> last block -----> file block
  //
  // The hasFileData/hasLastData guards keep the INT64_MIN placeholders from
  // matching minKey when the corresponding source has nothing to read.
  if (asc) {
    if (hasFileData && minKey == key) {
      TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
      code = tsdbRowMergerInit(&merge, &fRow, pReader->pSchema);
      if (code != TSDB_CODE_SUCCESS) {
        return code;
      }

      init = true;
      doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
    }

    if (hasLastData && minKey == tsLast) {
      TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
      if (init) {
        tsdbRowMerge(&merge, &fRow1);
      } else {
        code = tsdbRowMergerInit(&merge, &fRow1, pReader->pSchema);
        if (code != TSDB_CODE_SUCCESS) {
          return code;
        }
        init = true;
      }

      doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, &merge, &pReader->verRange, pReader->idStr);
    }

    if (minKey == ik.ts) {
      if (init) {
        tsdbRowMerge(&merge, piRow);
      } else {
        STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(piRow), pReader, pBlockScanInfo->uid);
        if (pSchema == NULL) {
          return code;
        }

        code = tsdbRowMergerInit(&merge, piRow, pSchema);
        if (code != TSDB_CODE_SUCCESS) {
          return code;
        }
        init = true;
      }

      code = doMergeRowsInBuf(&pBlockScanInfo->iiter, pBlockScanInfo->uid, ik.ts, pBlockScanInfo->delSkyline, &merge,
                              pReader);
      if (code != TSDB_CODE_SUCCESS) {
        goto _clear;
      }
    }

    if (minKey == k.ts) {
      if (init) {
        if (merge.pTSchema == NULL) {
          return code;
        }

        tsdbRowMerge(&merge, pRow);
      } else {
        STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid);
        code = tsdbRowMergerInit(&merge, pRow, pSchema);
        if (code != TSDB_CODE_SUCCESS) {
          return code;
        }
        init = true;
      }

      code = doMergeRowsInBuf(&pBlockScanInfo->iter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge,
                              pReader);
      if (code != TSDB_CODE_SUCCESS) {
        goto _clear;
      }
    }
  } else {
    if (minKey == k.ts) {
      STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid);
      code = tsdbRowMergerInit(&merge, pRow, pSchema);
      if (code != TSDB_CODE_SUCCESS) {
        return code;
      }
      init = true;

      code = doMergeRowsInBuf(&pBlockScanInfo->iter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge,
                              pReader);
      if (code != TSDB_CODE_SUCCESS) {
        goto _clear;
      }
    }

    if (minKey == ik.ts) {
      if (init) {
        tsdbRowMerge(&merge, piRow);
      } else {
        STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(piRow), pReader, pBlockScanInfo->uid);
        code = tsdbRowMergerInit(&merge, piRow, pSchema);
        if (code != TSDB_CODE_SUCCESS) {
          return code;
        }
        init = true;
      }

      code = doMergeRowsInBuf(&pBlockScanInfo->iiter, pBlockScanInfo->uid, ik.ts, pBlockScanInfo->delSkyline, &merge,
                              pReader);
      if (code != TSDB_CODE_SUCCESS) {
        goto _clear;
      }
    }

    if (hasLastData && minKey == tsLast) {
      TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
      if (init) {
        tsdbRowMerge(&merge, &fRow1);
      } else {
        code = tsdbRowMergerInit(&merge, &fRow1, pReader->pSchema);
        if (code != TSDB_CODE_SUCCESS) {
          return code;
        }
        init = true;
      }

      doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, &merge, &pReader->verRange, pReader->idStr);
    }

    if (hasFileData && minKey == key) {
      TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
      if (!init) {
        code = tsdbRowMergerInit(&merge, &fRow, pReader->pSchema);
        if (code != TSDB_CODE_SUCCESS) {
          return code;
        }
        init = true;
      } else {
        if (merge.pTSchema == NULL) {
          return code;
        }
        tsdbRowMerge(&merge, &fRow);
      }

      doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
    }
  }

  // nothing was merged, so there is no merger to finalize or release
  if (merge.pTSchema == NULL) {
    return code;
  }

  code = tsdbRowMergerGetRow(&merge, &pTSRow);
  if (code == TSDB_CODE_SUCCESS) {
    code = doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo);
    taosMemoryFree(pTSRow);
  }

_clear:
  // reached only after a successful tsdbRowMergerInit(); releases the merger on error paths as well
  tsdbRowMergerClear(&merge);
  return code;
}

2412 2413 2414 2415 2416 2417 2418 2419 2420
/*
 * Create the mem and imem iterators of one table (once per scan info) and then
 * build its delete skyline.
 *
 * The iterators start right after pBlockScanInfo->lastKey for an ascending
 * scan and right before it for a descending scan.
 *
 * Returns TSDB_CODE_SUCCESS, or the error code of tsdbTbDataIterCreate().
 *
 * NOTE(review): the return value of initDelSkylineIterator() is ignored here;
 * confirm that a failure to build the skyline may really be dropped.
 */
static int32_t initMemDataIterator(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader) {
  if (pBlockScanInfo->iterInit) {
    return TSDB_CODE_SUCCESS;
  }

  int32_t code = TSDB_CODE_SUCCESS;

  TSDBKEY startKey = {0};
  if (ASCENDING_TRAVERSE(pReader->order)) {
    startKey = (TSDBKEY){.ts = pBlockScanInfo->lastKey + 1, .version = pReader->verRange.minVer};
  } else {
    startKey = (TSDBKEY){.ts = pBlockScanInfo->lastKey - 1, .version = pReader->verRange.maxVer};
  }

  int32_t backward = (!ASCENDING_TRAVERSE(pReader->order));

  // mem
  STbData* d = NULL;
  if (pReader->pReadSnap->pMem != NULL) {
    d = tsdbGetTbDataFromMemTable(pReader->pReadSnap->pMem, pReader->suid, pBlockScanInfo->uid);
    if (d != NULL) {
      code = tsdbTbDataIterCreate(d, &startKey, backward, &pBlockScanInfo->iter.iter);
      if (code == TSDB_CODE_SUCCESS) {
        pBlockScanInfo->iter.hasVal = (tsdbTbDataIterGet(pBlockScanInfo->iter.iter) != NULL);

        tsdbDebug("%p uid:%" PRIu64 ", check data in mem from skey:%" PRId64 ", order:%d, ts range in buf:%" PRId64
                  "-%" PRId64 " %s",
                  pReader, pBlockScanInfo->uid, startKey.ts, pReader->order, d->minKey, d->maxKey, pReader->idStr);
      } else {
        // this iterator reads pMem, so report it as "mem"
        tsdbError("%p uid:%" PRIu64 ", failed to create iterator for mem, code:%s, %s", pReader, pBlockScanInfo->uid,
                  tstrerror(code), pReader->idStr);
        return code;
      }
    }
  } else {
    tsdbDebug("%p uid:%" PRIu64 ", no data in mem, %s", pReader, pBlockScanInfo->uid, pReader->idStr);
  }

  // imem
  STbData* di = NULL;
  if (pReader->pReadSnap->pIMem != NULL) {
    di = tsdbGetTbDataFromMemTable(pReader->pReadSnap->pIMem, pReader->suid, pBlockScanInfo->uid);
    if (di != NULL) {
      code = tsdbTbDataIterCreate(di, &startKey, backward, &pBlockScanInfo->iiter.iter);
      if (code == TSDB_CODE_SUCCESS) {
        pBlockScanInfo->iiter.hasVal = (tsdbTbDataIterGet(pBlockScanInfo->iiter.iter) != NULL);

        tsdbDebug("%p uid:%" PRIu64 ", check data in imem from skey:%" PRId64 ", order:%d, ts range in buf:%" PRId64
                  "-%" PRId64 " %s",
                  pReader, pBlockScanInfo->uid, startKey.ts, pReader->order, di->minKey, di->maxKey, pReader->idStr);
      } else {
        // this iterator reads pIMem, so report it as "imem"
        tsdbError("%p uid:%" PRIu64 ", failed to create iterator for imem, code:%s, %s", pReader, pBlockScanInfo->uid,
                  tstrerror(code), pReader->idStr);
        return code;
      }
    }
  } else {
    tsdbDebug("%p uid:%" PRIu64 ", no data in imem, %s", pReader, pBlockScanInfo->uid, pReader->idStr);
  }

  // build the delete skyline and account the elapsed time (us -> ms)
  int64_t st = taosGetTimestampUs();
  initDelSkylineIterator(pBlockScanInfo, pReader, d, di);
  pReader->cost.initDelSkylineIterTime += (taosGetTimestampUs() - st) / 1000.0;

  pBlockScanInfo->iterInit = true;
  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
2481 2482
/*
 * Check whether the row at pDumpInfo->rowIndex of the file data block should
 * be returned for the table described by pBlockScanInfo.
 *
 * The row is rejected when it belongs to another table (multi-table block),
 * when its version or timestamp is outside the reader's ranges, or when
 * hasBeenDropped() reports it as deleted.
 */
static bool isValidFileBlockRow(SBlockData* pBlockData, SFileBlockDumpInfo* pDumpInfo,
                                STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader) {
  // aUid is only present for a multi-table data block
  if (pBlockData->aUid != NULL) {
    uint64_t rowUid = pBlockData->aUid[pDumpInfo->rowIndex];
    if (rowUid != pBlockScanInfo->uid) {
      return false;  // row of another table
    }
  }

  // version range
  int64_t rowVer = pBlockData->aVersion[pDumpInfo->rowIndex];
  bool    verInRange = (rowVer >= pReader->verRange.minVer) && (rowVer <= pReader->verRange.maxVer);
  if (!verInRange) {
    return false;
  }

  // query time window
  int64_t rowTs = pBlockData->aTSKEY[pDumpInfo->rowIndex];
  bool    tsInRange = (rowTs >= pReader->window.skey) && (rowTs <= pReader->window.ekey);
  if (!tsInRange) {
    return false;
  }

  // delete skyline
  TSDBKEY rowKey = {.ts = rowTs, .version = rowVer};
  bool    dropped = hasBeenDropped(pBlockScanInfo->delSkyline, &pBlockScanInfo->fileDelIndex, &rowKey, pReader->order,
                                   &pReader->verRange);
  return !dropped;
}

2511
/*
 * Point the last-block reader at the table in pScanInfo and report whether it
 * has a row available.
 *
 * If the reader is already bound to this table only its current state is
 * reported. Otherwise a previously opened merge tree is closed, the table's
 * mem iterators are initialized and a new merge tree is opened over the window
 * that starts (asc) or ends (desc) at pScanInfo->lastKeyInStt.
 *
 * Returns false when the merge tree cannot be opened.
 */
static bool initLastBlockReader(SLastBlockReader* pLBlockReader, STableBlockScanInfo* pScanInfo, STsdbReader* pReader) {
  // already initialized for this table
  if (pLBlockReader->uid == pScanInfo->uid) {
    return hasDataInLastBlock(pLBlockReader);
  }

  // bound to another table: release its merge tree first
  if (pLBlockReader->uid != 0) {
    tMergeTreeClose(&pLBlockReader->mergeTree);
  }

  initMemDataIterator(pScanInfo, pReader);
  pLBlockReader->uid = pScanInfo->uid;

  // resume from the last key already accessed in the stt files
  STimeWindow win = pLBlockReader->window;
  if (!ASCENDING_TRAVERSE(pLBlockReader->order)) {
    win.ekey = pScanInfo->lastKeyInStt;
  } else {
    win.skey = pScanInfo->lastKeyInStt;
  }

  tsdbDebug("init last block reader, window:%" PRId64 "-%" PRId64 ", uid:%" PRIu64 ", %s", win.skey, win.ekey,
            pScanInfo->uid, pReader->idStr);

  int32_t ret = tMergeTreeOpen(&pLBlockReader->mergeTree, (pLBlockReader->order == TSDB_ORDER_DESC),
                               pReader->pFileReader, pReader->suid, pScanInfo->uid, &win, &pLBlockReader->verRange,
                               pLBlockReader->pInfo, false, pReader->idStr, false);

  // move to the first row only when the tree was opened
  return (ret == TSDB_CODE_SUCCESS) && nextRowFromLastBlocks(pLBlockReader, pScanInfo, &pReader->verRange);
}

2543
// Return the timestamp of the row the last-block merge tree currently yields.
static int64_t getCurrentKeyInLastBlock(SLastBlockReader* pLastBlockReader) {
  TSDBROW curRow = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
  int64_t curTs = TSDBROW_TS(&curRow);
  return curTs;
}

H
Hongze Cheng 已提交
2548
// The last-block reader has data as long as its merge tree holds an iterator.
static bool hasDataInLastBlock(SLastBlockReader* pLastBlockReader) {
  if (pLastBlockReader->mergeTree.pIter == NULL) {
    return false;
  }
  return true;
}
2549

2550
bool hasDataInFileBlock(const SBlockData* pBlockData, const SFileBlockDumpInfo* pDumpInfo) {
H
Haojun Liao 已提交
2551
  if ((pBlockData->nRow > 0) && (pBlockData->nRow != pDumpInfo->totalRows)) {
2552
    return false;  // this is an invalid result.
2553
  }
2554
  return pBlockData->nRow > 0 && (!pDumpInfo->allDumped);
2555
}
2556

2557 2558
/*
 * Emit the next row when only the file data block contributes.
 *
 * tryCopyDistinctRowFromFileBlock() is tried first; when it copied the row,
 * lastKey is updated and nothing else is done. Otherwise the current row is
 * merged through a row merger (doMergeRowsInFileBlocks) and the result is
 * appended to pReader->pResBlock.
 *
 * Returns TSDB_CODE_SUCCESS or the first error code reported by a callee.
 */
int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pBlockScanInfo, int64_t key,
                              STsdbReader* pReader) {
  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;

  bool    copied = false;
  int32_t code = tryCopyDistinctRowFromFileBlock(pReader, pBlockData, key, pDumpInfo, &copied);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  if (copied) {
    pBlockScanInfo->lastKey = key;
    return TSDB_CODE_SUCCESS;
  }

  TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);

  SRow*      pTSRow = NULL;
  SRowMerger merge = {0};

  // reuse the outer `code` instead of shadowing it with a second declaration
  code = tsdbRowMergerInit(&merge, &fRow, pReader->pSchema);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);

  code = tsdbRowMergerGetRow(&merge, &pTSRow);
  if (code == TSDB_CODE_SUCCESS) {
    code = doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo);
    taosMemoryFree(pTSRow);
  }

  // the merger was initialized successfully, so release it on the failure path as well
  tsdbRowMergerClear(&merge);
  return code;
}

H
Haojun Liao 已提交
2594 2595
/*
 * Produce one result row by dispatching to the merge routine that matches the
 * sources currently holding data: both mem-tables, imem only, mem only, or
 * just the on-disk blocks.
 */
static int32_t buildComposedDataBlockImpl(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo,
                                          SBlockData* pBlockData, SLastBlockReader* pLastBlockReader) {
  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;

  // current timestamp of the file data block; INT64_MIN when it has nothing left
  int64_t key = INT64_MIN;
  if (pBlockData->nRow > 0 && (!pDumpInfo->allDumped)) {
    key = pBlockData->aTSKEY[pDumpInfo->rowIndex];
  }

  TSDBROW* pMemRow = NULL;
  if (pBlockScanInfo->iter.hasVal) {
    pMemRow = getValidMemRow(&pBlockScanInfo->iter, pBlockScanInfo->delSkyline, pReader);
  }

  TSDBROW* pIMemRow = NULL;
  if (pBlockScanInfo->iiter.hasVal) {
    pIMemRow = getValidMemRow(&pBlockScanInfo->iiter, pBlockScanInfo->delSkyline, pReader);
  }

  if (pMemRow != NULL && pIMemRow != NULL) {
    // two levels of mem-table do contain valid rows
    return doMergeMultiLevelRows(pReader, pBlockScanInfo, pBlockData, pLastBlockReader);
  } else if (pBlockScanInfo->iiter.hasVal) {
    // imem + file + last block
    return doMergeBufAndFileRows(pReader, pBlockScanInfo, pIMemRow, &pBlockScanInfo->iiter, key, pLastBlockReader);
  } else if (pBlockScanInfo->iter.hasVal) {
    // mem + file + last block
    return doMergeBufAndFileRows(pReader, pBlockScanInfo, pMemRow, &pBlockScanInfo->iter, key, pLastBlockReader);
  } else {
    // files data blocks + last block
    return mergeFileBlockAndLastBlock(pReader, pLastBlockReader, key, pBlockScanInfo, pBlockData);
  }
}

H
Haojun Liao 已提交
2627 2628 2629 2630 2631 2632 2633 2634 2635 2636 2637 2638 2639 2640 2641 2642 2643 2644 2645 2646 2647 2648 2649 2650 2651 2652 2653 2654 2655 2656 2657 2658 2659 2660 2661 2662 2663 2664 2665 2666
/*
 * If the current file block overlaps its neighbor block of the same table (in
 * scan direction), make that neighbor the active block, load its data and
 * reset the dump info.
 *
 * *loadNeighbor is set to true only when the neighbor was loaded. Returns the
 * error code of doLoadFileBlockData(), otherwise TSDB_CODE_SUCCESS.
 */
static int32_t loadNeighborIfOverlap(SFileDataBlockInfo* pBlockInfo, STableBlockScanInfo* pBlockScanInfo,
                                     STsdbReader* pReader, bool* loadNeighbor) {
  int32_t     direction = ASCENDING_TRAVERSE(pReader->order) ? 1 : -1;
  int32_t     nextIndex = -1;
  SBlockIndex nxtBIndex = {0};

  *loadNeighbor = false;
  SDataBlk* pCurBlock = getCurrentBlock(&pReader->status.blockIter);

  // no neighbor of the same table: nothing to do
  if (!getNeighborBlockOfSameTable(pBlockInfo, pBlockScanInfo, &nextIndex, pReader->order, &nxtBIndex)) {
    return TSDB_CODE_SUCCESS;
  }

  // neighbor exists but does not overlap the current block
  if (!overlapWithNeighborBlock(pCurBlock, &nxtBIndex, pReader->order)) {
    return TSDB_CODE_SUCCESS;
  }

  SReaderStatus*  pStatus = &pReader->status;
  SDataBlockIter* pBlockIter = &pStatus->blockIter;

  // 1. find the next neighbor block in the scan block list
  SFileDataBlockInfo target = {.uid = pBlockInfo->uid, .tbBlockIdx = nextIndex};
  int32_t            neighborIndex = findFileBlockInfoIndex(pBlockIter, &target);

  // 2. remove it from the scan block list
  setFileBlockActiveInBlockIter(pBlockIter, neighborIndex, direction);

  // 3. load the neighbor block, and set it to be the currently accessed file data block
  int32_t code = doLoadFileBlockData(pReader, pBlockIter, &pStatus->fileBlockData, pBlockInfo->uid);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  // 4. check the data values
  initBlockDumpInfo(pReader, pBlockIter);
  *loadNeighbor = true;
  return code;
}

2667 2668 2669 2670 2671 2672 2673 2674 2675 2676 2677 2678 2679
/*
 * Finalize the result block of a composed read: set its table uid (0 when no
 * scan info is given), mark the data as loaded, refresh its time window, flag
 * the reader as holding a composed block and account the build cost `el`.
 */
static void updateComposedBlockInfo(STsdbReader* pReader, double el, STableBlockScanInfo* pBlockScanInfo) {
  SSDataBlock* pBlock = pReader->pResBlock;

  if (pBlockScanInfo == NULL) {
    pBlock->info.id.uid = 0;
  } else {
    pBlock->info.id.uid = pBlockScanInfo->uid;
  }

  pBlock->info.dataLoad = 1;
  blockDataUpdateTsWindow(pBlock, pReader->suppInfo.slotId[0]);

  setComposedBlockFlag(pReader, true);

  // statistics
  pReader->cost.composedBlocks += 1;
  pReader->cost.buildComposedBlockTime += el;
}

2680
/*
 * Fill pReader->pResBlock with rows composed from the current file data block,
 * the last block and the mem-tables of the current table.
 *
 * A clean file block that fits into the result capacity is copied as a whole.
 * Otherwise rows are produced one at a time by buildComposedDataBlockImpl()
 * until the file block is consumed, no valid file-block row remains, or the
 * result block is full.
 *
 * Returns TSDB_CODE_SUCCESS or the first error code reported by a callee.
 */
static int32_t buildComposedDataBlock(STsdbReader* pReader) {
  int32_t code = TSDB_CODE_SUCCESS;

  SSDataBlock*        pResBlock = pReader->pResBlock;
  SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(&pReader->status.blockIter);
  SLastBlockReader*   pLastBlockReader = pReader->status.fileIter.pLastBlockReader;

  bool    asc = ASCENDING_TRAVERSE(pReader->order);
  int64_t st = taosGetTimestampUs();
  int32_t step = asc ? 1 : -1;
  double  el = 0;

  STableBlockScanInfo* pBlockScanInfo = NULL;
  if (pBlockInfo == NULL) {  // file blocks not exist
    pBlockScanInfo = *pReader->status.pTableIter;
  } else {
    pBlockScanInfo = getTableBlockScanInfo(pReader->status.pTableMap, pBlockInfo->uid, pReader->idStr);
    if (pBlockScanInfo == NULL) {
      goto _end;
    }

    SDataBlk* pBlock = getCurrentBlock(&pReader->status.blockIter);
    TSDBKEY   keyInBuf = getCurrentKeyInBuf(pBlockScanInfo, pReader);

    // it is a clean block, load it directly
    if (isCleanFileDataBlock(pReader, pBlockInfo, pBlock, pBlockScanInfo, keyInBuf, pLastBlockReader) &&
        pBlock->nRow <= pReader->capacity && (asc || (!hasDataInLastBlock(pLastBlockReader)))) {
      code = copyBlockDataToSDataBlock(pReader);
      if (code == TSDB_CODE_SUCCESS) {
        // record the last key value
        pBlockScanInfo->lastKey = asc ? pBlock->maxKey.ts : pBlock->minKey.ts;
      }
      goto _end;
    }
  }

  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
  SBlockData*         pBlockData = &pReader->status.fileBlockData;

  for (;;) {
    bool foundRow = false;

    // find the first qualified row in data block
    while (pBlockData->nRow > 0 && pBlockData->uid == pBlockScanInfo->uid) {
      if (isValidFileBlockRow(pBlockData, pDumpInfo, pBlockScanInfo, pReader)) {
        foundRow = true;
        break;
      }

      pDumpInfo->rowIndex += step;

      SDataBlk* pCurBlock = getCurrentBlock(&pReader->status.blockIter);
      bool      inBlock = (pDumpInfo->rowIndex < pCurBlock->nRow) && (pDumpInfo->rowIndex >= 0);
      if (inBlock) {
        continue;
      }

      pBlockInfo = getCurrentBlockInfo(&pReader->status.blockIter);  // NOTE: get the new block info

      // continue check for the next file block if the last ts in the current block
      // is overlapped with the next neighbor block
      bool loadNeighbor = false;
      code = loadNeighborIfOverlap(pBlockInfo, pBlockScanInfo, pReader, &loadNeighbor);
      if ((code != 0) || (!loadNeighbor)) {
        setBlockAllDumped(pDumpInfo, pCurBlock->maxKey.ts, pReader->order);
        break;
      }
    }

    // no data in last block and block, no need to proceed.
    if (!foundRow) {
      break;
    }

    code = buildComposedDataBlockImpl(pReader, pBlockScanInfo, pBlockData, pLastBlockReader);
    if (code) {
      goto _end;
    }

    // currently loaded file data block is consumed
    bool outOfBlock = (pDumpInfo->rowIndex >= pBlockData->nRow) || (pDumpInfo->rowIndex < 0);
    if ((pBlockData->nRow > 0) && outOfBlock) {
      SDataBlk* pCurBlock = getCurrentBlock(&pReader->status.blockIter);
      setBlockAllDumped(pDumpInfo, pCurBlock->maxKey.ts, pReader->order);
      break;
    }

    // result block is full
    if (pResBlock->info.rows >= pReader->capacity) {
      break;
    }
  }

_end:
  // elapsed time in ms
  el = (taosGetTimestampUs() - st) / 1000.0;
  updateComposedBlockInfo(pReader, el, pBlockScanInfo);

  if (pResBlock->info.rows > 0) {
    tsdbDebug("%p uid:%" PRIu64 ", composed data block created, brange:%" PRIu64 "-%" PRIu64
              " rows:%" PRId64 ", elapsed time:%.2f ms %s",
              pReader, pResBlock->info.id.uid, pResBlock->info.window.skey, pResBlock->info.window.ekey,
              pResBlock->info.rows, el, pReader->idStr);
  }

  return code;
}

// Record in the reader status whether the current result block is a composed one.
void setComposedBlockFlag(STsdbReader* pReader, bool composed) {
  pReader->status.composedDataBlock = composed;
}

2790 2791 2792 2793 2794 2795 2796 2797
/*
 * Starting index into the delete skyline for the given scan order: the first
 * entry for an ascending scan, the last entry for a descending scan, and 0
 * when there is no skyline.
 */
int32_t getInitialDelIndex(const SArray* pDelSkyline, int32_t order) {
  if (pDelSkyline == NULL) {
    return 0;
  }

  if (ASCENDING_TRAVERSE(order)) {
    return 0;
  }

  return taosArrayGetSize(pDelSkyline) - 1;
}

dengyihao's avatar
dengyihao 已提交
2798 2799
/*
 * Build the delete skyline of one table (if not built yet) and reset the
 * skyline cursors kept in the scan info.
 *
 * Delete records are collected from the delete file (when an index entry for
 * the table exists) and from the delete lists of the mem and imem table data,
 * then passed to tsdbBuildDeleteSkyline().
 *
 * Returns the error code of tsdbReadDelData() or tsdbBuildDeleteSkyline();
 * when reading the delete file fails the cursors are left untouched.
 */
int32_t initDelSkylineIterator(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader, STbData* pMemTbData,
                               STbData* piMemTbData) {
  if (pBlockScanInfo->delSkyline != NULL) {
    return TSDB_CODE_SUCCESS;
  }

  int32_t code = 0;
  SArray* pDelData = taosArrayInit(4, sizeof(SDelData));

  // 1. delete records stored in the delete file
  SDelFile* pDelFile = pReader->pReadSnap->fs.pDelFile;
  if (pDelFile && taosArrayGetSize(pReader->pDelIdx) > 0) {
    SDelIdx  target = {.suid = pReader->suid, .uid = pBlockScanInfo->uid};
    SDelIdx* pIdx = taosArraySearch(pReader->pDelIdx, &target, tCmprDelIdx, TD_EQ);

    if (pIdx != NULL) {
      code = tsdbReadDelData(pReader->pDelFReader, pIdx, pDelData);
      if (code != TSDB_CODE_SUCCESS) {
        taosArrayDestroy(pDelData);
        return code;
      }
    }
  }

  // 2. delete records kept in mem
  if (pMemTbData != NULL) {
    for (SDelData* pDel = pMemTbData->pHead; pDel != NULL; pDel = pDel->pNext) {
      taosArrayPush(pDelData, pDel);
    }
  }

  // 3. delete records kept in imem
  if (piMemTbData != NULL) {
    for (SDelData* pDel = piMemTbData->pHead; pDel != NULL; pDel = pDel->pNext) {
      taosArrayPush(pDelData, pDel);
    }
  }

  // 4. build the skyline only when there is something to delete
  if (taosArrayGetSize(pDelData) > 0) {
    pBlockScanInfo->delSkyline = taosArrayInit(4, sizeof(TSDBKEY));
    code = tsdbBuildDeleteSkyline(pDelData, 0, (int32_t)(taosArrayGetSize(pDelData) - 1), pBlockScanInfo->delSkyline);
  }

  taosArrayDestroy(pDelData);

  // 5. every cursor starts from the same position, which depends on the scan order
  int32_t startIdx = getInitialDelIndex(pBlockScanInfo->delSkyline, pReader->order);
  pBlockScanInfo->iter.index = startIdx;
  pBlockScanInfo->iiter.index = startIdx;
  pBlockScanInfo->fileDelIndex = startIdx;
  pBlockScanInfo->lastBlockDelIndex = startIdx;

  return code;
}

C
Cary Xu 已提交
2857
/*
 * Return the key of the next valid buffered row of the table, looking at both
 * mem and imem.
 *
 * When both hold a row, the one with the smaller timestamp is returned for an
 * ascending scan (mem wins ties) and the one with the larger timestamp for a
 * descending scan (imem wins ties). When neither holds a row, a key whose ts
 * is TSKEY_INITIAL_VAL is returned.
 */
TSDBKEY getCurrentKeyInBuf(STableBlockScanInfo* pScanInfo, STsdbReader* pReader) {
  bool asc = ASCENDING_TRAVERSE(pReader->order);

  TSDBKEY memKey = {.ts = TSKEY_INITIAL_VAL};
  TSDBKEY imemKey = {.ts = TSKEY_INITIAL_VAL};

  TSDBROW* pMemRow = getValidMemRow(&pScanInfo->iter, pScanInfo->delSkyline, pReader);
  if (pMemRow != NULL) {
    memKey = TSDBROW_KEY(pMemRow);
  }

  TSDBROW* pIMemRow = getValidMemRow(&pScanInfo->iiter, pScanInfo->delSkyline, pReader);
  if (pIMemRow != NULL) {
    imemKey = TSDBROW_KEY(pIMemRow);
  }

  // no data in mem: either only imem has data, or both are empty and the initial value is returned
  if (pMemRow == NULL) {
    return imemKey;
  }

  // no data in imem
  if (pIMemRow == NULL) {
    return memKey;
  }

  // has data in mem & imem
  bool memNotAfter = (memKey.ts <= imemKey.ts);
  if (asc) {
    return memNotAfter ? memKey : imemKey;
  }
  return memNotAfter ? imemKey : memKey;
}

2893
// Advance the fileset iterator until a fileset is found that contributes at least one
// data block or stt ("last") file for the queried tables, or until no fileset is left.
//
// pBlockNum is reset to zero on entry and filled in by doLoadFileBlock() for the fileset
// that was selected; both counters stay zero when the iterator is exhausted.
//
// After the fileset loop, the delete file reader and the delete index are lazily created
// (once per reader) when the read snapshot carries a delete file.
//
// Returns TSDB_CODE_SUCCESS, or the first error code reported by a callee.
static int32_t moveToNextFile(STsdbReader* pReader, SBlockNumber* pBlockNum) {
  SReaderStatus* pStatus = &pReader->status;
  pBlockNum->numOfBlocks = 0;
  pBlockNum->numOfLastFiles = 0;

  size_t  numOfTables = taosHashGetSize(pReader->status.pTableMap);
  SArray* pIndexList = taosArrayInit(numOfTables, sizeof(SBlockIdx));
  if (pIndexList == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  while (1) {
    bool    hasNext = false;
    int32_t code = filesetIteratorNext(&pStatus->fileIter, pReader, &hasNext);
    if (code) {
      taosArrayDestroy(pIndexList);
      return code;
    }

    if (!hasNext) {  // no data files on disk
      break;
    }

    taosArrayClear(pIndexList);
    code = doLoadBlockIndex(pReader, pReader->pFileReader, pIndexList);
    if (code != TSDB_CODE_SUCCESS) {
      taosArrayDestroy(pIndexList);
      return code;
    }

    if (taosArrayGetSize(pIndexList) > 0 || pReader->pFileReader->pSet->nSttF > 0) {
      code = doLoadFileBlock(pReader, pIndexList, pBlockNum);
      if (code != TSDB_CODE_SUCCESS) {
        taosArrayDestroy(pIndexList);
        return code;
      }

      if (pBlockNum->numOfBlocks + pBlockNum->numOfLastFiles > 0) {
        break;
      }
    }

    // no blocks in current file, try next files
  }

  taosArrayDestroy(pIndexList);

  if (pReader->pReadSnap != NULL) {
    SDelFile* pDelFile = pReader->pReadSnap->fs.pDelFile;
    if (pReader->pDelFReader == NULL && pDelFile != NULL) {
      int32_t code = tsdbDelFReaderOpen(&pReader->pDelFReader, pDelFile, pReader->pTsdb);
      if (code != TSDB_CODE_SUCCESS) {
        return code;
      }

      pReader->pDelIdx = taosArrayInit(4, sizeof(SDelIdx));
      if (pReader->pDelIdx == NULL) {
        code = TSDB_CODE_OUT_OF_MEMORY;
        return code;
      }

      code = tsdbReadDelIdx(pReader->pDelFReader, pReader->pDelIdx);
      if (code != TSDB_CODE_SUCCESS) {
        taosArrayDestroy(pReader->pDelIdx);
        // do not leave a dangling pointer behind: the reader still owns this field and
        // anything that inspects or releases it later must see NULL
        pReader->pDelIdx = NULL;
        return code;
      }
    }
  }

  return TSDB_CODE_SUCCESS;
}

X
Xiaoyu Wang 已提交
2962
// Rewind the ordered uid list to its first entry and point the table iterator at the
// hash entry of that uid (the iterator becomes NULL if the uid is not in the table map).
static void resetTableListIndex(SReaderStatus* pStatus) {
  pStatus->uidList.currentIndex = 0;

  uint64_t firstUid = pStatus->uidList.tableUidList[0];
  pStatus->pTableIter = taosHashGet(pStatus->pTableMap, &firstUid, sizeof(firstUid));
}

2970
// Step to the next uid of the ordered uid list and re-position pStatus->pTableIter on it.
// Returns false, with pTableIter set to NULL, once the index runs past the number of
// tables in the table map; also returns false if the next uid has no entry in the map.
static bool moveToNextTable(STableUidList* pOrderedCheckInfo, SReaderStatus* pStatus) {
  ++pOrderedCheckInfo->currentIndex;

  bool exhausted = (pOrderedCheckInfo->currentIndex >= taosHashGetSize(pStatus->pTableMap));
  if (exhausted) {
    pStatus->pTableIter = NULL;
  } else {
    uint64_t nextUid = pOrderedCheckInfo->tableUidList[pOrderedCheckInfo->currentIndex];
    pStatus->pTableIter = taosHashGet(pStatus->pTableMap, &nextUid, sizeof(nextUid));
  }

  return (pStatus->pTableIter != NULL);
}

2982
// Walk the tables in uid-list order, starting from the table pStatus->pTableIter points
// at, and fill pReader->pResBlock from the last (stt) block reader of the first table
// that yields rows.
//
// Returns TSDB_CODE_SUCCESS when a non-empty result block was produced, when the table
// map is empty, or when every remaining table is exhausted (pStatus->pTableIter is then
// NULL, set by moveToNextTable). Returns the callee's error code if composing rows fails.
static int32_t doLoadLastBlockSequentially(STsdbReader* pReader) {
  SReaderStatus*    pStatus = &pReader->status;
  SLastBlockReader* pLastBlockReader = pStatus->fileIter.pLastBlockReader;
  STableUidList*    pUidList = &pStatus->uidList;
  int32_t           code = TSDB_CODE_SUCCESS;

  if (taosHashGetSize(pStatus->pTableMap) == 0) {
    return TSDB_CODE_SUCCESS;
  }

  SSDataBlock* pResBlock = pReader->pResBlock;

  while (1) {
    // load the last data block of current table
    STableBlockScanInfo* pScanInfo = *(STableBlockScanInfo**)pStatus->pTableIter;

    bool hasVal = initLastBlockReader(pLastBlockReader, pScanInfo, pReader);
    if (!hasVal) {
      bool hasNexTable = moveToNextTable(pUidList, pStatus);
      if (!hasNexTable) {
        return TSDB_CODE_SUCCESS;
      }

      continue;
    }

    int64_t st = taosGetTimestampUs();
    while (1) {
      bool hasBlockLData = hasDataInLastBlock(pLastBlockReader);

      // no data in last block and block, no need to proceed.
      if (hasBlockLData == false) {
        break;
      }

      code = buildComposedDataBlockImpl(pReader, pScanInfo, &pReader->status.fileBlockData, pLastBlockReader);
      if (code) {
        return code;
      }

      // result block is full, hand it over to the caller
      if (pResBlock->info.rows >= pReader->capacity) {
        break;
      }
    }

    double el = (taosGetTimestampUs() - st) / 1000.0;
    updateComposedBlockInfo(pReader, el, pScanInfo);

    if (pResBlock->info.rows > 0) {
      // window keys are signed timestamps, so they are printed with PRId64 (same as the
      // other brange logs of this reader); PRIu64 would garble negative timestamps
      tsdbDebug("%p uid:%" PRIu64 ", composed data block created, brange:%" PRId64 "-%" PRId64
                " rows:%" PRId64 ", elapsed time:%.2f ms %s",
                pReader, pResBlock->info.id.uid, pResBlock->info.window.skey, pResBlock->info.window.ekey,
                pResBlock->info.rows, el, pReader->idStr);
      return TSDB_CODE_SUCCESS;
    }

    // current table is exhausted, let's try next table
    bool hasNexTable = moveToNextTable(pUidList, pStatus);
    if (!hasNexTable) {
      return TSDB_CODE_SUCCESS;
    }
  }
}

3046
// Produce the next result block for the file block the block iterator currently points at.
// Three cases are handled:
//  1. the file block has to be loaded (fileBlockShouldLoad): load it and build a composed block
//  2. buffered rows lie in the gap before the file block: emit those buffered rows first
//  3. otherwise: in a descending scan with pending rows in the last block reader, emit the
//     last-block rows; else expose the whole file block through the result block info only
//     (rows/uid/window are set, dataLoad stays 0)
//
// Returns TSDB_CODE_SUCCESS, terrno when the table scan info cannot be found, or the error
// code of a failing callee.
static int32_t doBuildDataBlock(STsdbReader* pReader) {
  int32_t   code = TSDB_CODE_SUCCESS;
  SDataBlk* pBlock = NULL;

  SReaderStatus*       pStatus = &pReader->status;
  SDataBlockIter*      pBlockIter = &pStatus->blockIter;
  STableBlockScanInfo* pScanInfo = NULL;
  SFileDataBlockInfo*  pBlockInfo = getCurrentBlockInfo(pBlockIter);
  SLastBlockReader*    pLastBlockReader = pReader->status.fileIter.pLastBlockReader;

  pScanInfo = getTableBlockScanInfo(pReader->status.pTableMap, pBlockInfo->uid, pReader->idStr);
  if (pScanInfo == NULL) {
    return terrno;
  }

  pBlock = getCurrentBlock(pBlockIter);

  initLastBlockReader(pLastBlockReader, pScanInfo, pReader);
  TSDBKEY keyInBuf = getCurrentKeyInBuf(pScanInfo, pReader);

  if (fileBlockShouldLoad(pReader, pBlockInfo, pBlock, pScanInfo, keyInBuf, pLastBlockReader)) {
    code = doLoadFileBlockData(pReader, pBlockIter, &pStatus->fileBlockData, pScanInfo->uid);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    // build composed data block
    code = buildComposedDataBlock(pReader);
  } else if (bufferDataInFileBlockGap(pReader->order, keyInBuf, pBlock)) {
    // data in memory that are earlier than current file block
    // rows in buffer should be less than the file block in asc, greater than file block in desc
    int64_t endKey = (ASCENDING_TRAVERSE(pReader->order)) ? pBlock->minKey.ts : pBlock->maxKey.ts;
    code = buildDataBlockFromBuf(pReader, pScanInfo, endKey);
  } else {
    if (hasDataInLastBlock(pLastBlockReader) && !ASCENDING_TRAVERSE(pReader->order)) {
      // only return the rows in last block
      int64_t tsLast = getCurrentKeyInLastBlock(pLastBlockReader);
      ASSERT(tsLast >= pBlock->maxKey.ts);

      SBlockData* pBData = &pReader->status.fileBlockData;
      tBlockDataReset(pBData);

      SSDataBlock* pResBlock = pReader->pResBlock;
      tsdbDebug("load data in last block firstly, due to desc scan data, %s", pReader->idStr);

      int64_t st = taosGetTimestampUs();

      while (1) {
        bool hasBlockLData = hasDataInLastBlock(pLastBlockReader);

        // no data in last block and block, no need to proceed.
        if (hasBlockLData == false) {
          break;
        }

        code = buildComposedDataBlockImpl(pReader, pScanInfo, &pReader->status.fileBlockData, pLastBlockReader);
        if (code) {
          return code;
        }

        // result block is full, hand it over to the caller
        if (pResBlock->info.rows >= pReader->capacity) {
          break;
        }
      }

      double el = (taosGetTimestampUs() - st) / 1000.0;
      updateComposedBlockInfo(pReader, el, pScanInfo);

      if (pResBlock->info.rows > 0) {
        // window keys are signed timestamps: print them with PRId64, consistent with the
        // "clean file block" log below; PRIu64 would garble negative timestamps
        tsdbDebug("%p uid:%" PRIu64 ", composed data block created, brange:%" PRId64 "-%" PRId64
                  " rows:%" PRId64 ", elapsed time:%.2f ms %s",
                  pReader, pResBlock->info.id.uid, pResBlock->info.window.skey, pResBlock->info.window.ekey,
                  pResBlock->info.rows, el, pReader->idStr);
      }
    } else {  // whole block is required, return it directly
      SDataBlockInfo* pInfo = &pReader->pResBlock->info;
      pInfo->rows = pBlock->nRow;
      pInfo->id.uid = pScanInfo->uid;
      pInfo->dataLoad = 0;
      pInfo->window = (STimeWindow){.skey = pBlock->minKey.ts, .ekey = pBlock->maxKey.ts};
      setComposedBlockFlag(pReader, false);
      setBlockAllDumped(&pStatus->fBlockDumpInfo, pBlock->maxKey.ts, pReader->order);

      // update the last key for the corresponding table
      pScanInfo->lastKey = ASCENDING_TRAVERSE(pReader->order) ? pInfo->window.ekey : pInfo->window.skey;
      tsdbDebug("%p uid:%" PRIu64
                " clean file block retrieved from file, global index:%d, "
                "table index:%d, rows:%d, brange:%" PRId64 "-%" PRId64 ", %s",
                pReader, pScanInfo->uid, pBlockIter->index, pBlockInfo->tbBlockIdx, pBlock->nRow, pBlock->minKey.ts,
                pBlock->maxKey.ts, pReader->idStr);
    }
  }

  return code;
}

D
dapan1121 已提交
3142

D
dapan1121 已提交
3143
// Add the row counts of all data blocks of the current fileset that belong to the queried
// tables to pReader->rowsNum.
//
// The block index list is fetched through the block-index cache of pFileReader's tsdb.
// Only index entries whose suid matches pReader->suid and whose uid is present in the
// reader's table map are considered. For each of them the block map is read into the
// table's scan info and the nRow of every block is accumulated.
//
// Returns TSDB_CODE_SUCCESS or the error code of the cache lookup / block map read.
// The cache handle is released on every path.
static int32_t doSumFileBlockRows(STsdbReader* pReader, SDataFReader* pFileReader) {
  LRUHandle* handle = NULL;
  int32_t    code = tsdbCacheGetBlockIdx(pFileReader->pTsdb->biCache, pFileReader, &handle);
  if (code != TSDB_CODE_SUCCESS || handle == NULL) {
    goto _end;
  }

  SArray* aBlockIdx = (SArray*)taosLRUCacheValue(pFileReader->pTsdb->biCache, handle);
  size_t  num = taosArrayGetSize(aBlockIdx);

  // an empty index list simply skips the loop and falls through to the common cleanup
  for (size_t i = 0; i < num; ++i) {
    SBlockIdx* pBlockIdx = (SBlockIdx*)taosArrayGet(aBlockIdx, i);
    if (pBlockIdx->suid != pReader->suid) {
      continue;
    }

    STableBlockScanInfo** p = taosHashGet(pReader->status.pTableMap, &pBlockIdx->uid, sizeof(pBlockIdx->uid));
    if (p == NULL) {  // table is not part of this query
      continue;
    }

    STableBlockScanInfo* pScanInfo = *p;
    tMapDataReset(&pScanInfo->mapData);

    // a failed read must not be silently counted: the block map would be empty or stale
    code = tsdbReadDataBlk(pReader->pFileReader, pBlockIdx, &pScanInfo->mapData);
    if (code != TSDB_CODE_SUCCESS) {
      goto _end;
    }

    SDataBlk block = {0};
    for (int32_t j = 0; j < pScanInfo->mapData.nItem; ++j) {
      tGetDataBlk(pScanInfo->mapData.pData + pScanInfo->mapData.aOffset[j], &block);
      pReader->rowsNum += block.nRow;
    }
  }

_end:
  tsdbBICacheRelease(pFileReader->pTsdb->biCache, handle);
  return code;
}


D
dapan1121 已提交
3190
// Add the row counts of the stt blocks of the current fileset that belong to the queried
// super table (pReader->suid) to pReader->rowsNum.
//
// For every stt file the stt block list is read into the matching load info. If the first
// and the last block carry the same suid, the list is either cleared (foreign suid) or
// summed as a whole; otherwise the list is scanned, counting blocks with a matching suid
// and stopping at the first block with a larger suid.
//
// Returns the error code of tsdbReadSttBlk on failure, TSDB_CODE_SUCCESS otherwise.
static int32_t doSumSttBlockRows(STsdbReader* pReader) {
  int32_t           code = TSDB_CODE_SUCCESS;
  SLastBlockReader* pLastBlockReader = pReader->status.fileIter.pLastBlockReader;

  for (int32_t iStt = 0; iStt < pReader->pFileReader->pSet->nSttF; ++iStt) {  // open all last file
    SSttBlockLoadInfo* pLoadInfo = &pLastBlockReader->pInfo[iStt];

    code = tsdbReadSttBlk(pReader->pFileReader, iStt, pLoadInfo->aSttBlk);
    if (code) {
      return code;
    }

    size_t numOfBlks = taosArrayGetSize(pLoadInfo->aSttBlk);
    if (numOfBlks < 1) {
      continue;
    }

    SSttBlk* pFirst = taosArrayGet(pLoadInfo->aSttBlk, 0);
    SSttBlk* pLast = taosArrayGet(pLoadInfo->aSttBlk, numOfBlks - 1);

    if (pFirst->suid == pLast->suid) {
      // first and last block share one suid: the list is handled as a whole
      if (pFirst->suid != pReader->suid) {
        // no qualified stt block existed
        taosArrayClear(pLoadInfo->aSttBlk);
        continue;
      }

      for (size_t k = 0; k < numOfBlks; ++k) {
        SSttBlk* pBlk = taosArrayGet(pLoadInfo->aSttBlk, k);
        pReader->rowsNum += pBlk->nRow;
      }

      continue;
    }

    // several suids in this file: skip smaller ones, count matches, stop at the first larger one
    for (size_t k = 0; k < numOfBlks; ++k) {
      SSttBlk* pBlk = taosArrayGet(pLoadInfo->aSttBlk, k);
      uint64_t blkSuid = pBlk->suid;

      if (blkSuid > pReader->suid) {
        break;
      }

      if (blkSuid == pReader->suid) {
        pReader->rowsNum += pBlk->nRow;
      }
    }
  }

  return code;
}

D
dapan1121 已提交
3240 3241 3242 3243 3244 3245 3246 3247 3248 3249 3250 3251 3252 3253 3254 3255 3256 3257 3258 3259 3260 3261 3262 3263 3264 3265 3266 3267 3268 3269
// Iterate over all remaining filesets and accumulate, into pReader->rowsNum, the rows of
// their data blocks (doSumFileBlockRows) and stt blocks (doSumSttBlockRows).
// Once the fileset iterator is exhausted, loadFromFile is cleared.
// Returns the first error reported by a callee, TSDB_CODE_SUCCESS otherwise.
static int32_t readRowsCountFromFiles(STsdbReader* pReader) {
  for (;;) {
    bool    hasNext = false;
    int32_t code = filesetIteratorNext(&pReader->status.fileIter, pReader, &hasNext);
    if (code) {
      return code;
    }

    if (!hasNext) {  // no data files on disk
      break;
    }

    code = doSumFileBlockRows(pReader, pReader->pFileReader);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    code = doSumSttBlockRows(pReader);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }
  }

  pReader->status.loadFromFile = false;
  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
3270 3271 3272 3273 3274 3275 3276 3277 3278 3279 3280 3281 3282 3283 3284 3285
// Add the number of rows reported by tsdbMemTableCountRows for the mem table and the
// immutable mem table of the read snapshot (each only if present) to pReader->rowsNum.
// Always returns TSDB_CODE_SUCCESS.
static int32_t readRowsCountFromMem(STsdbReader* pReader) {
  int64_t rowsInMem = 0;
  int64_t rowsInImem = 0;

  if (pReader->pReadSnap->pMem != NULL) {
    tsdbMemTableCountRows(pReader->pReadSnap->pMem, pReader->status.pTableMap, &rowsInMem);
  }

  if (pReader->pReadSnap->pIMem != NULL) {
    tsdbMemTableCountRows(pReader->pReadSnap->pIMem, pReader->status.pTableMap, &rowsInImem);
  }

  pReader->rowsNum += rowsInMem + rowsInImem;
  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
3286

H
Haojun Liao 已提交
3287
// Build a result block purely from buffered (mem/imem) rows, table by table, starting
// with the table pStatus->pTableIter currently points at.
// Returns as soon as a table yields rows, when all tables are exhausted, or on error.
static int32_t buildBlockFromBufferSequentially(STsdbReader* pReader) {
  SReaderStatus* pStatus = &pReader->status;

  do {
    STableBlockScanInfo* pScanInfo = *(STableBlockScanInfo**)pStatus->pTableIter;
    initMemDataIterator(pScanInfo, pReader);

    // no file block limits the buffer scan here, so use the extreme key of the scan direction
    int64_t endKey = (ASCENDING_TRAVERSE(pReader->order)) ? INT64_MAX : INT64_MIN;
    int32_t code = buildDataBlockFromBuf(pReader, pScanInfo, endKey);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    if (pReader->pResBlock->info.rows > 0) {
      return TSDB_CODE_SUCCESS;
    }

    // current table is exhausted, let's try next table
  } while (moveToNextTable(&pStatus->uidList, pStatus));

  return TSDB_CODE_SUCCESS;
}

3320
// set the correct start position in case of the first/last file block, according to the query time window
3321
static void initBlockDumpInfo(STsdbReader* pReader, SDataBlockIter* pBlockIter) {
  bool asc = ASCENDING_TRAVERSE(pReader->order);

  // default when no scan info is available: the extreme key of the scan direction
  int64_t             lastKey = asc ? INT64_MIN : INT64_MAX;
  SDataBlk*           pBlock = getCurrentBlock(pBlockIter);
  SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(pBlockIter);
  if (pBlockInfo != NULL) {
    // The other table-map lookups in this file treat the stored value as a pointer to the
    // scan info (STableBlockScanInfo*), so taosHashGet() yields a pointer to that pointer.
    // It must be dereferenced once before the scan info fields can be read.
    // NOTE(review): assumes pBlockIter->pTableMap has the same layout as
    // pReader->status.pTableMap; confirm where the iterator is initialized.
    STableBlockScanInfo** p = taosHashGet(pBlockIter->pTableMap, &pBlockInfo->uid, sizeof(pBlockInfo->uid));
    if (p != NULL && *p != NULL) {
      lastKey = (*p)->lastKey;
    }
  }

  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;

  pDumpInfo->totalRows = pBlock->nRow;
  pDumpInfo->allDumped = false;
  // start from the first row in an ascending scan, from the last row in a descending scan
  pDumpInfo->rowIndex = asc ? 0 : pBlock->nRow - 1;
  pDumpInfo->lastKey = lastKey;
}

3341
// Move to the next fileset that has data and prepare the reader for its first block.
//  - when moveToNextFile fails, its error code is returned
//  - when no fileset with data is left, loadFromFile is cleared and success is returned
//  - otherwise the block iterator is (re)initialized, or reset together with the table
//    list index if the fileset only carries stt files, and the block dump info is set up
static int32_t initForFirstBlockInFile(STsdbReader* pReader, SDataBlockIter* pBlockIter) {
  SBlockNumber blockNum = {0};

  int32_t code = moveToNextFile(pReader, &blockNum);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  // all data files are consumed, try data in buffer
  if (blockNum.numOfBlocks + blockNum.numOfLastFiles == 0) {
    pReader->status.loadFromFile = false;
    return code;
  }

  if (blockNum.numOfBlocks <= 0) {
    // no block data, only last block exists
    tBlockDataReset(&pReader->status.fileBlockData);
    resetDataBlockIterator(pBlockIter, pReader->order);
    resetTableListIndex(&pReader->status);
  } else {
    // initialize the block iterator for a new fileset
    code = initBlockIterator(pReader, pBlockIter, blockNum.numOfBlocks);
  }

  // set the correct start position according to the query time window
  initBlockDumpInfo(pReader, pBlockIter);
  return code;
}

3368
// True when the current file block has been consumed only in part: it is not fully
// dumped and the row cursor has already left its start position (row 0 for an ascending
// scan, row totalRows - 1 for a descending scan).
static bool fileBlockPartiallyRead(SFileBlockDumpInfo* pDumpInfo, bool asc) {
  if (pDumpInfo->allDumped) {
    return false;
  }

  if (asc) {
    return pDumpInfo->rowIndex > 0;
  }

  return pDumpInfo->rowIndex < (pDumpInfo->totalRows - 1);
}

3373
// Produce the next non-empty result block from the data files.
//
// Two phases alternate:
//  - "_begin": the current fileset has no (more) data blocks to iterate, so rows are taken
//    from the last (stt) blocks table by table; when all tables are done the next fileset
//    is opened
//  - main loop: iterate the data blocks of the current fileset; when they are exhausted,
//    either jump back to "_begin" to drain the stt files of this fileset or open the next
//    fileset
//
// Returns TSDB_CODE_SUCCESS when pReader->pResBlock holds rows or when all files have been
// consumed (pReader->status.loadFromFile is then false); otherwise the callee's error code.
static int32_t buildBlockFromFiles(STsdbReader* pReader) {
  int32_t         code = TSDB_CODE_SUCCESS;
  bool            ascending = ASCENDING_TRAVERSE(pReader->order);
  SReaderStatus*  pStatus = &pReader->status;
  SDataBlockIter* pBlockIter = &pStatus->blockIter;

  if (pBlockIter->numOfBlocks == 0) {
  _begin:
    code = doLoadLastBlockSequentially(pReader);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    if (pReader->pResBlock->info.rows > 0) {
      return TSDB_CODE_SUCCESS;
    }

    // all data blocks are checked in this last block file, now let's try the next file
    if (pStatus->pTableIter == NULL) {
      code = initForFirstBlockInFile(pReader, pBlockIter);

      // error happens or all the data files are completely checked
      if ((code != TSDB_CODE_SUCCESS) || (pStatus->loadFromFile == false)) {
        return code;
      }

      // this file does not have data files, let's start check the last block file if exists
      if (pBlockIter->numOfBlocks == 0) {
        resetTableListIndex(pStatus);
        goto _begin;
      }
    }

    code = doBuildDataBlock(pReader);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    if (pReader->pResBlock->info.rows > 0) {
      return TSDB_CODE_SUCCESS;
    }
  }

  for (;;) {
    SFileBlockDumpInfo* pDumpInfo = &pStatus->fBlockDumpInfo;

    if (fileBlockPartiallyRead(pDumpInfo, ascending)) {  // file data block is partially loaded
      code = buildComposedDataBlock(pReader);
    } else {
      // current block are exhausted, try the next file block
      if (pDumpInfo->allDumped) {
        // try next data block in current file
        bool hasNext = blockIteratorNext(pBlockIter, pReader->idStr);
        if (hasNext) {  // check for the next block in the block accessed order list
          initBlockDumpInfo(pReader, pBlockIter);
        } else if (pStatus->pCurrentFileset->nSttF > 0) {
          // data blocks in current file are exhausted, drain the stt files of this fileset
          SBlockData* pBlockData = &pStatus->fileBlockData;
          if (pBlockData->uid != 0) {
            tBlockDataClear(pBlockData);
          }

          tBlockDataReset(pBlockData);
          resetDataBlockIterator(pBlockIter, pReader->order);
          resetTableListIndex(pStatus);
          goto _begin;
        } else {
          // no stt file in this fileset, let's try the next file now
          code = initForFirstBlockInFile(pReader, pBlockIter);

          // error happens or all the data files are completely checked
          if ((code != TSDB_CODE_SUCCESS) || (pStatus->loadFromFile == false)) {
            return code;
          }

          // this file does not have blocks, let's start check the last block file
          if (pBlockIter->numOfBlocks == 0) {
            resetTableListIndex(pStatus);
            goto _begin;
          }
        }
      }

      code = doBuildDataBlock(pReader);
    }

    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    if (pReader->pResBlock->info.rows > 0) {
      return TSDB_CODE_SUCCESS;
    }
  }
}
H
refact  
Hongze Cheng 已提交
3469

3470 3471
// Select the tsdb instance that serves a query whose time window starts at winSKey.
//
// For a non-rsma vnode the regular tsdb is returned and *pLevel is left untouched.
// For an rsma vnode the retention levels are walked from level 0 upwards: the walk stops
// at the first level whose keep period still covers winSKey (plus the query tolerance),
// or steps back one level when a level with keep <= 0 is met. The chosen level is stored
// in *pLevel (anything beyond L1 maps to L2) and the matching rsma tsdb is returned.
static STsdb* getTsdbByRetentions(SVnode* pVnode, TSKEY winSKey, SRetention* retentions, const char* idStr,
                                  int8_t* pLevel) {
  if (!VND_IS_RSMA(pVnode)) {
    return VND_TSDB(pVnode);
  }

  int8_t  precision = pVnode->config.tsdbCfg.precision;
  int64_t now = taosGetTimestamp(precision);

  // scale the tolerance to the precision of the vnode
  long factor = 1000000L;
  if (precision == TSDB_TIME_PRECISION_MILLI) {
    factor = 1L;
  } else if (precision == TSDB_TIME_PRECISION_MICRO) {
    factor = 1000L;
  }
  int64_t offset = tsQueryRsmaTolerance * factor;

  int8_t level = 0;
  for (int8_t round = 0; round < TSDB_RETENTION_MAX; ++round, ++level) {
    SRetention* pRetention = &retentions[level];
    if (pRetention->keep <= 0) {
      // this level is not configured, fall back to the previous one if there is any
      if (level > 0) {
        --level;
      }
      break;
    }

    if ((now - pRetention->keep) <= (winSKey + offset)) {
      break;
    }
  }

  const char* str = (idStr != NULL) ? idStr : "";

  int8_t selected = TSDB_RETENTION_L2;
  if (level == TSDB_RETENTION_L0) {
    selected = TSDB_RETENTION_L0;
  } else if (level == TSDB_RETENTION_L1) {
    selected = TSDB_RETENTION_L1;
  }

  *pLevel = selected;
  tsdbDebug("vgId:%d, rsma level %d is selected to query %s", TD_VID(pVnode), selected, str);

  if (selected == TSDB_RETENTION_L0) {
    return VND_RSMA0(pVnode);
  }

  if (selected == TSDB_RETENTION_L1) {
    return VND_RSMA1(pVnode);
  }

  return VND_RSMA2(pVnode);
}

H
Haojun Liao 已提交
3514
// Compute the version range a query is allowed to see.
//  - minVer: pCond->startVersion, or 0 when the start version is -1 (not specified)
//  - maxVer: pCond->endVersion capped at the applied version of the vnode; the applied
//    version itself when the end version is -1 (not specified)
// The level argument is not used by this function.
SVersionRange getQueryVerRange(SVnode* pVnode, SQueryTableDataCond* pCond, int8_t level) {
  int64_t startVer = 0;
  if (pCond->startVersion != -1) {
    startVer = pCond->startVersion;
  }

  // start from the current maximum version of the vnode and only lower it when the user
  // specified an end version that does not exceed it
  int64_t endVer = pVnode->state.applied;
  if (pCond->endVersion != -1 && pCond->endVersion <= pVnode->state.applied) {
    endVer = pCond->endVersion;
  }

  return (SVersionRange){.minVer = startVer, .maxVer = endVer};
}

3528
bool hasBeenDropped(const SArray* pDelList, int32_t* index, TSDBKEY* pKey, int32_t order, SVersionRange* pVerRange) {
3529 3530 3531
  if (pDelList == NULL) {
    return false;
  }
H
Haojun Liao 已提交
3532

L
Liu Jicong 已提交
3533 3534 3535
  size_t  num = taosArrayGetSize(pDelList);
  bool    asc = ASCENDING_TRAVERSE(order);
  int32_t step = asc ? 1 : -1;
3536

3537 3538 3539 3540 3541 3542
  if (asc) {
    if (*index >= num - 1) {
      TSDBKEY* last = taosArrayGetLast(pDelList);
      ASSERT(pKey->ts >= last->ts);

      if (pKey->ts > last->ts) {
3543
        return false;
3544 3545
      } else if (pKey->ts == last->ts) {
        TSDBKEY* prev = taosArrayGet(pDelList, num - 2);
3546 3547
        return (prev->version >= pKey->version && prev->version <= pVerRange->maxVer &&
                prev->version >= pVerRange->minVer);
3548 3549
      }
    } else {
3550 3551 3552 3553 3554 3555 3556
      TSDBKEY* pCurrent = taosArrayGet(pDelList, *index);
      TSDBKEY* pNext = taosArrayGet(pDelList, (*index) + 1);

      if (pKey->ts < pCurrent->ts) {
        return false;
      }

3557 3558
      if (pCurrent->ts <= pKey->ts && pNext->ts >= pKey->ts && pCurrent->version >= pKey->version &&
          pVerRange->maxVer >= pCurrent->version) {
3559 3560 3561 3562 3563 3564 3565 3566 3567 3568 3569 3570 3571 3572 3573
        return true;
      }

      while (pNext->ts <= pKey->ts && (*index) < num - 1) {
        (*index) += 1;

        if ((*index) < num - 1) {
          pCurrent = taosArrayGet(pDelList, *index);
          pNext = taosArrayGet(pDelList, (*index) + 1);

          // it is not a consecutive deletion range, ignore it
          if (pCurrent->version == 0 && pNext->version > 0) {
            continue;
          }

3574 3575
          if (pCurrent->ts <= pKey->ts && pNext->ts >= pKey->ts && pCurrent->version >= pKey->version &&
              pVerRange->maxVer >= pCurrent->version) {
3576 3577 3578 3579 3580 3581
            return true;
          }
        }
      }

      return false;
3582 3583
    }
  } else {
3584 3585
    if (*index <= 0) {
      TSDBKEY* pFirst = taosArrayGet(pDelList, 0);
3586

3587 3588 3589 3590 3591 3592 3593
      if (pKey->ts < pFirst->ts) {
        return false;
      } else if (pKey->ts == pFirst->ts) {
        return pFirst->version >= pKey->version;
      } else {
        ASSERT(0);
      }
3594
    } else {
3595 3596 3597 3598 3599 3600 3601 3602 3603 3604 3605 3606 3607 3608 3609 3610 3611 3612 3613 3614 3615 3616 3617 3618 3619 3620 3621
      TSDBKEY* pCurrent = taosArrayGet(pDelList, *index);
      TSDBKEY* pPrev = taosArrayGet(pDelList, (*index) - 1);

      if (pKey->ts > pCurrent->ts) {
        return false;
      }

      if (pPrev->ts <= pKey->ts && pCurrent->ts >= pKey->ts && pPrev->version >= pKey->version) {
        return true;
      }

      while (pPrev->ts >= pKey->ts && (*index) > 1) {
        (*index) += step;

        if ((*index) >= 1) {
          pCurrent = taosArrayGet(pDelList, *index);
          pPrev = taosArrayGet(pDelList, (*index) - 1);

          // it is not a consecutive deletion range, ignore it
          if (pCurrent->version > 0 && pPrev->version == 0) {
            continue;
          }

          if (pPrev->ts <= pKey->ts && pCurrent->ts >= pKey->ts && pPrev->version >= pKey->version) {
            return true;
          }
        }
3622 3623 3624 3625 3626
      }

      return false;
    }
  }
3627 3628

  return false;
3629 3630
}

3631
// Return the row the buffer iterator points at if it is visible to the reader, skipping
// forward over rows that are not.
//
// A row is visible when its version lies inside the reader's version range and it is not
// covered by the delete skyline pDelList. The search stops with NULL, and pIter->hasVal
// set to false, as soon as a row outside the reader's time window is met or the iterator
// runs out of rows. Returns NULL right away when the iterator has no value.
TSDBROW* getValidMemRow(SIterInfo* pIter, const SArray* pDelList, STsdbReader* pReader) {
  if (!pIter->hasVal) {
    return NULL;
  }

  TSDBROW* pRow = tsdbTbDataIterGet(pIter->iter);

  for (;;) {
    TSDBKEY key = TSDBROW_KEY(pRow);

    if (outOfTimeWindow(key.ts, &pReader->window)) {
      pIter->hasVal = false;
      return NULL;
    }

    // it is a valid data version
    if ((key.version <= pReader->verRange.maxVer && key.version >= pReader->verRange.minVer) &&
        (!hasBeenDropped(pDelList, &pIter->index, &key, pReader->order, &pReader->verRange))) {
      return pRow;
    }

    // current row is filtered out, inspect the next one
    pIter->hasVal = tsdbTbDataIterNext(pIter->iter);
    if (!pIter->hasVal) {
      return NULL;
    }

    pRow = tsdbTbDataIterGet(pIter->iter);
  }
}
H
Hongze Cheng 已提交
3670

3671 3672
// Merge all following buffered rows that share the timestamp ts into pMerger.
//
// The iterator is advanced first, so the row it pointed at on entry is not merged here.
// Merging stops when the iterator is exhausted, when no further valid row exists, or when
// the next valid row has a different timestamp.
// Returns terrno if the schema of a row-format row cannot be obtained, otherwise
// TSDB_CODE_SUCCESS.
int32_t doMergeRowsInBuf(SIterInfo* pIter, uint64_t uid, int64_t ts, SArray* pDelList, SRowMerger* pMerger,
                         STsdbReader* pReader) {
  while ((pIter->hasVal = tsdbTbDataIterNext(pIter->iter))) {
    // data exists but not valid
    TSDBROW* pRow = getValidMemRow(pIter, pDelList, pReader);
    if (pRow == NULL) {
      return TSDB_CODE_SUCCESS;
    }

    // ts is not identical, quit
    TSDBKEY rowKey = TSDBROW_KEY(pRow);
    if (rowKey.ts != ts) {
      return TSDB_CODE_SUCCESS;
    }

    if (pRow->type != TSDBROW_ROW_FMT) {  // column format
      tsdbRowMerge(pMerger, pRow);
      continue;
    }

    // row format: the merger needs the schema matching the row's schema version
    STSchema* pTSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, uid);
    if (pTSchema == NULL) {
      return terrno;
    }

    tsdbRowMergerAdd(pMerger, pRow, pTSchema);
  }

  return TSDB_CODE_SUCCESS;
}

3706
static int32_t doMergeRowsInFileBlockImpl(SBlockData* pBlockData, int32_t rowIndex, int64_t key, SRowMerger* pMerger,
3707
                                          SVersionRange* pVerRange, int32_t step) {
3708
  while (rowIndex < pBlockData->nRow && rowIndex >= 0 && pBlockData->aTSKEY[rowIndex] == key) {
3709
    if (pBlockData->aVersion[rowIndex] > pVerRange->maxVer || pBlockData->aVersion[rowIndex] < pVerRange->minVer) {
3710
      rowIndex += step;
3711 3712 3713 3714
      continue;
    }

    TSDBROW fRow = tsdbRowFromBlockData(pBlockData, rowIndex);
H
Hongze Cheng 已提交
3715
    tsdbRowMerge(pMerger, &fRow);
3716 3717 3718 3719 3720 3721 3722 3723 3724 3725 3726
    rowIndex += step;
  }

  return rowIndex;
}

// Outcome reported by checkForNeighborFileBlock() through its "state" output parameter.
typedef enum {
  CHECK_FILEBLOCK_CONT = 0x1,  // rows of the block were consumed up to its boundary, the caller keeps looping
  CHECK_FILEBLOCK_QUIT = 0x2,  // default outcome, the caller leaves its loop
} CHECK_FILEBLOCK_STATE;

H
Hongze Cheng 已提交
3727
static int32_t checkForNeighborFileBlock(STsdbReader* pReader, STableBlockScanInfo* pScanInfo, SDataBlk* pBlock,
3728 3729
                                         SFileDataBlockInfo* pFBlock, SRowMerger* pMerger, int64_t key,
                                         CHECK_FILEBLOCK_STATE* state) {
3730
  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
3731
  SBlockData*         pBlockData = &pReader->status.fileBlockData;
3732
  bool                asc = ASCENDING_TRAVERSE(pReader->order);
3733

3734
  *state = CHECK_FILEBLOCK_QUIT;
3735
  int32_t step = ASCENDING_TRAVERSE(pReader->order) ? 1 : -1;
3736

3737
  bool    loadNeighbor = true;
H
Haojun Liao 已提交
3738
  int32_t code = loadNeighborIfOverlap(pFBlock, pScanInfo, pReader, &loadNeighbor);
3739

H
Haojun Liao 已提交
3740
  if (loadNeighbor && (code == TSDB_CODE_SUCCESS)) {
3741 3742
    pDumpInfo->rowIndex =
        doMergeRowsInFileBlockImpl(pBlockData, pDumpInfo->rowIndex, key, pMerger, &pReader->verRange, step);
3743
    if ((pDumpInfo->rowIndex >= pDumpInfo->totalRows && asc) || (pDumpInfo->rowIndex < 0 && !asc)) {
3744 3745 3746 3747
      *state = CHECK_FILEBLOCK_CONT;
    }
  }

H
Haojun Liao 已提交
3748
  return code;
3749 3750
}

3751 3752
// Merge all rows that share the timestamp of the current row (pReader->status.fBlockDumpInfo.rowIndex) of
// "pBlockData" into "pMerger". The current row itself is skipped: the row index is advanced by one step
// before merging starts. When the rows of the block are used up, neighbor file blocks are checked through
// checkForNeighborFileBlock() until it reports CHECK_FILEBLOCK_QUIT or no current block info is available.
// Returns TSDB_CODE_SUCCESS, or the error code reported while checking a neighbor block.
int32_t doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pScanInfo, STsdbReader* pReader,
                                SRowMerger* pMerger) {
  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;

  bool    asc = ASCENDING_TRAVERSE(pReader->order);
  int64_t key = pBlockData->aTSKEY[pDumpInfo->rowIndex];
  int32_t step = asc ? 1 : -1;

  pDumpInfo->rowIndex += step;
  if ((pDumpInfo->rowIndex <= pBlockData->nRow - 1 && asc) || (pDumpInfo->rowIndex >= 0 && !asc)) {
    pDumpInfo->rowIndex =
        doMergeRowsInFileBlockImpl(pBlockData, pDumpInfo->rowIndex, key, pMerger, &pReader->verRange, step);
  }

  // all rows are consumed, let's try next file block
  if ((pDumpInfo->rowIndex >= pBlockData->nRow && asc) || (pDumpInfo->rowIndex < 0 && !asc)) {
    while (1) {
      CHECK_FILEBLOCK_STATE st = CHECK_FILEBLOCK_QUIT;

      SFileDataBlockInfo* pFileBlockInfo = getCurrentBlockInfo(&pReader->status.blockIter);
      SDataBlk*           pCurrentBlock = getCurrentBlock(&pReader->status.blockIter);
      if (pFileBlockInfo == NULL) {
        break;
      }

      // a failure here used to be dropped silently, which made a partial merge look like a success
      int32_t code = checkForNeighborFileBlock(pReader, pScanInfo, pCurrentBlock, pFileBlockInfo, pMerger, key, &st);
      if (code != TSDB_CODE_SUCCESS) {
        return code;
      }

      if (st == CHECK_FILEBLOCK_QUIT) {
        break;
      }
    }
  }

  return TSDB_CODE_SUCCESS;
}

H
Hongze Cheng 已提交
3787
int32_t doMergeRowsInLastBlock(SLastBlockReader* pLastBlockReader, STableBlockScanInfo* pScanInfo, int64_t ts,
3788
                               SRowMerger* pMerger, SVersionRange* pVerRange, const char* idStr) {
3789
  while (nextRowFromLastBlocks(pLastBlockReader, pScanInfo, pVerRange)) {
3790 3791
    int64_t next1 = getCurrentKeyInLastBlock(pLastBlockReader);
    if (next1 == ts) {
3792
      TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
H
Hongze Cheng 已提交
3793
      tsdbRowMerge(pMerger, &fRow1);
3794
    } else {
3795 3796 3797
      tsdbTrace("uid:%" PRIu64 " last del index:%d, del range:%d, lastKeyInStt:%" PRId64 ", %s", pScanInfo->uid,
                pScanInfo->lastBlockDelIndex, (int32_t)taosArrayGetSize(pScanInfo->delSkyline), pScanInfo->lastKeyInStt,
                idStr);
3798 3799 3800 3801 3802 3803 3804
      break;
    }
  }

  return TSDB_CODE_SUCCESS;
}

3805
int32_t doMergeMemTableMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter, SArray* pDelList, TSDBROW* pResRow,
3806
                                 STsdbReader* pReader, bool* freeTSRow) {
H
Haojun Liao 已提交
3807
  TSDBROW* pNextRow = NULL;
3808
  TSDBROW  current = *pRow;
3809

3810 3811
  {  // if the timestamp of the next valid row has a different ts, return current row directly
    pIter->hasVal = tsdbTbDataIterNext(pIter->iter);
3812

3813
    if (!pIter->hasVal) {
3814
      *pResRow = *pRow;
3815
      *freeTSRow = false;
3816
      return TSDB_CODE_SUCCESS;
3817
    } else {  // has next point in mem/imem
3818
      pNextRow = getValidMemRow(pIter, pDelList, pReader);
3819
      if (pNextRow == NULL) {
H
Haojun Liao 已提交
3820
        *pResRow = current;
3821
        *freeTSRow = false;
3822
        return TSDB_CODE_SUCCESS;
3823 3824
      }

H
Hongze Cheng 已提交
3825
      if (TSDBROW_TS(&current) != TSDBROW_TS(pNextRow)) {
H
Haojun Liao 已提交
3826
        *pResRow = current;
3827
        *freeTSRow = false;
3828
        return TSDB_CODE_SUCCESS;
3829
      }
3830
    }
3831 3832
  }

3833
  SRowMerger merge = {0};
H
Haojun Liao 已提交
3834
  terrno = 0;
3835
  int32_t code = 0;
H
Haojun Liao 已提交
3836

3837 3838 3839 3840 3841 3842 3843
  // start to merge duplicated rows
  if (current.type == TSDBROW_ROW_FMT) {
    // get the correct schema for data in memory
    STSchema* pTSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(&current), pReader, uid);
    if (pTSchema == NULL) {
      return terrno;
    }
H
Haojun Liao 已提交
3844

3845 3846 3847
    if (pReader->pSchema == NULL) {
      pReader->pSchema = pTSchema;
    }
H
Haojun Liao 已提交
3848

3849
    code = tsdbRowMergerInit2(&merge, pReader->pSchema, &current, pTSchema);
3850 3851 3852
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }
H
Haojun Liao 已提交
3853

3854 3855 3856 3857
    STSchema* pTSchema1 = doGetSchemaForTSRow(TSDBROW_SVERSION(pNextRow), pReader, uid);
    if (pTSchema1 == NULL) {
      return terrno;
    }
H
Haojun Liao 已提交
3858

3859
    tsdbRowMergerAdd(&merge, pNextRow, pTSchema1);
3860 3861
  } else {  // let's merge rows in file block
    code = tsdbRowMergerInit(&merge, &current, pReader->pSchema);
3862 3863 3864
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }
H
Haojun Liao 已提交
3865

3866 3867
    tsdbRowMerge(&merge, pNextRow);
  }
H
Haojun Liao 已提交
3868

wmmhello's avatar
wmmhello 已提交
3869
  code = doMergeRowsInBuf(pIter, uid, TSDBROW_TS(&current), pDelList, &merge, pReader);
H
Haojun Liao 已提交
3870 3871 3872 3873
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

3874
  code = tsdbRowMergerGetRow(&merge, &pResRow->pTSRow);
3875 3876 3877
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }
M
Minglei Jin 已提交
3878

wmmhello's avatar
wmmhello 已提交
3879
  pResRow->type = TSDBROW_ROW_FMT;
3880
  tsdbRowMergerClear(&merge);
3881
  *freeTSRow = true;
3882

3883
  return TSDB_CODE_SUCCESS;
3884 3885
}

3886
int32_t doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader,
H
Hongze Cheng 已提交
3887
                           SRow** pTSRow) {
H
Haojun Liao 已提交
3888 3889
  SRowMerger merge = {0};

3890 3891 3892
  TSDBKEY k = TSDBROW_KEY(pRow);
  TSDBKEY ik = TSDBROW_KEY(piRow);

3893
  if (ASCENDING_TRAVERSE(pReader->order)) {  // ascending order imem --> mem
H
Haojun Liao 已提交
3894
    STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid);
3895
    STSchema* piSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(piRow), pReader, pBlockScanInfo->uid);
3896

3897
    int32_t code = tsdbRowMergerInit2(&merge, pSchema, piRow, piSchema);
H
Haojun Liao 已提交
3898 3899 3900 3901 3902 3903 3904 3905 3906
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    code = doMergeRowsInBuf(&pBlockScanInfo->iiter, pBlockScanInfo->uid, ik.ts, pBlockScanInfo->delSkyline, &merge,
                            pReader);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }
3907

H
Haojun Liao 已提交
3908 3909
    pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid);
    tsdbRowMergerAdd(&merge, pRow, pSchema);
H
Haojun Liao 已提交
3910 3911 3912 3913 3914 3915
    code =
        doMergeRowsInBuf(&pBlockScanInfo->iter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge, pReader);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

3916
  } else {
H
Haojun Liao 已提交
3917
    STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid);
3918

H
Hongze Cheng 已提交
3919
    int32_t code = tsdbRowMergerInit(&merge, pRow, pSchema);
3920
    if (code != TSDB_CODE_SUCCESS || merge.pTSchema == NULL) {
H
Haojun Liao 已提交
3921 3922 3923 3924 3925 3926 3927 3928
      return code;
    }

    code =
        doMergeRowsInBuf(&pBlockScanInfo->iter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge, pReader);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }
3929

H
Hongze Cheng 已提交
3930
    tsdbRowMerge(&merge, piRow);
H
Haojun Liao 已提交
3931 3932 3933 3934 3935
    code = doMergeRowsInBuf(&pBlockScanInfo->iiter, pBlockScanInfo->uid, ik.ts, pBlockScanInfo->delSkyline, &merge,
                            pReader);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }
3936
  }
3937

H
Haojun Liao 已提交
3938 3939
  int32_t code = tsdbRowMergerGetRow(&merge, pTSRow);
  tsdbRowMergerClear(&merge);
3940
  return code;
3941 3942
}

3943
// Fetch the next row from the mem/imem iterators of "pBlockScanInfo" that lies before "endKey" in scan
// direction (ts < endKey for ascending, ts > endKey for descending scans).
//  - both iterators offer a row with the same timestamp: the rows are merged by doMergeMemIMemRows(), the
//    result is a row-format row in pResRow->pTSRow and "*freeTSRow" is set to true.
//  - otherwise the row that comes first in scan direction is handed to doMergeMemTableMultiRows().
//  - no row qualifies: "*pResRow" and "*freeTSRow" are left untouched.
// Returns TSDB_CODE_SUCCESS or the code of the failing merge function.
int32_t tsdbGetNextRowInMem(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader, TSDBROW* pResRow, int64_t endKey,
                            bool* freeTSRow) {
  SIterInfo* pMemIter = &pBlockScanInfo->iter;
  SIterInfo* pImemIter = &pBlockScanInfo->iiter;

  TSDBROW* pRow = getValidMemRow(pMemIter, pBlockScanInfo->delSkyline, pReader);
  TSDBROW* piRow = getValidMemRow(pImemIter, pBlockScanInfo->delSkyline, pReader);

  SArray*    pSkyline = pBlockScanInfo->delSkyline;
  uint64_t   tableUid = pBlockScanInfo->uid;
  const bool ascending = ASCENDING_TRAVERSE(pReader->order);

  // drop the candidates that already reached the end key
  if (pMemIter->hasVal) {
    TSDBKEY memKey = TSDBROW_KEY(pRow);
    bool    beyondEnd = ascending ? (memKey.ts >= endKey) : (memKey.ts <= endKey);
    if (beyondEnd) {
      pRow = NULL;
    }
  }

  if (pImemIter->hasVal) {
    TSDBKEY imemKey = TSDBROW_KEY(piRow);
    bool    beyondEnd = ascending ? (imemKey.ts >= endKey) : (imemKey.ts <= endKey);
    if (beyondEnd) {
      piRow = NULL;
    }
  }

  const bool memUsable = pMemIter->hasVal && (pRow != NULL);
  const bool imemUsable = pImemIter->hasVal && (piRow != NULL);

  if (memUsable && imemUsable) {
    TSDBKEY k = TSDBROW_KEY(pRow);
    TSDBKEY ik = TSDBROW_KEY(piRow);

    if (ik.ts == k.ts) {
      // identical timestamp in both tables, the result is always a newly built row
      *freeTSRow = true;
      pResRow->type = TSDBROW_ROW_FMT;
      return doMergeMemIMemRows(pRow, piRow, pBlockScanInfo, pReader, &pResRow->pTSRow);
    }

    // timestamps differ: take the one that comes first in scan direction
    bool imemComesFirst = ascending ? (ik.ts < k.ts) : (ik.ts > k.ts);
    if (imemComesFirst) {
      return doMergeMemTableMultiRows(piRow, tableUid, pImemIter, pSkyline, pResRow, pReader, freeTSRow);
    }

    return doMergeMemTableMultiRows(pRow, tableUid, pMemIter, pSkyline, pResRow, pReader, freeTSRow);
  }

  if (memUsable) {
    return doMergeMemTableMultiRows(pRow, tableUid, pMemIter, pSkyline, pResRow, pReader, freeTSRow);
  }

  if (imemUsable) {
    return doMergeMemTableMultiRows(piRow, tableUid, pImemIter, pSkyline, pResRow, pReader, freeTSRow);
  }

  return TSDB_CODE_SUCCESS;
}

H
Hongze Cheng 已提交
4001
// Append the row-format row "pTSRow" as a new row to the result block "pBlock".
// The requested columns (pReader->suppInfo) and the columns of the row's schema are walked in parallel by
// column id; requested columns that are missing from the schema are set to NULL.
// On success pBlock->info.rows is increased by one and pScanInfo->lastKey is set to the row timestamp.
// Returns TSDB_CODE_SUCCESS, terrno when the schema of the row cannot be obtained, or the code of a failing
// doCopyColVal() call.
int32_t doAppendRowFromTSRow(SSDataBlock* pBlock, STsdbReader* pReader, SRow* pTSRow, STableBlockScanInfo* pScanInfo) {
  int32_t outputRowIndex = pBlock->info.rows;
  int64_t uid = pScanInfo->uid;
  int32_t code = TSDB_CODE_SUCCESS;

  SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo;
  STSchema*           pSchema = doGetSchemaForTSRow(pTSRow->sver, pReader, uid);
  if (pSchema == NULL) {
    // the schema is dereferenced below, bail out like the other users of doGetSchemaForTSRow do
    return terrno;
  }

  SColVal colVal = {0};
  int32_t i = 0, j = 0;

  if (pSupInfo->colId[i] == PRIMARYKEY_TIMESTAMP_COL_ID) {
    SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, pSupInfo->slotId[i]);
    ((int64_t*)pColData->pData)[outputRowIndex] = pTSRow->ts;
    i += 1;
  }

  while (i < pSupInfo->numOfCols && j < pSchema->numOfCols) {
    col_id_t colId = pSupInfo->colId[i];

    if (colId == pSchema->columns[j].colId) {
      SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, pSupInfo->slotId[i]);

      tRowGet(pTSRow, pSchema, j, &colVal);
      code = doCopyColVal(pColInfoData, outputRowIndex, i, &colVal, pSupInfo);
      if (code) {
        return code;
      }
      i += 1;
      j += 1;
    } else if (colId < pSchema->columns[j].colId) {
      // requested column is not part of this schema version
      SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, pSupInfo->slotId[i]);

      colDataSetNULL(pColInfoData, outputRowIndex);
      i += 1;
    } else if (colId > pSchema->columns[j].colId) {
      // schema column is not requested
      j += 1;
    }
  }

  // set null value since current column does not exist in the "pSchema"
  while (i < pSupInfo->numOfCols) {
    SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, pSupInfo->slotId[i]);
    colDataSetNULL(pColInfoData, outputRowIndex);
    i += 1;
  }

  pBlock->info.dataLoad = 1;
  pBlock->info.rows += 1;
  pScanInfo->lastKey = pTSRow->ts;
  return TSDB_CODE_SUCCESS;
}

H
Hongze Cheng 已提交
4056 4057
// Append row "rowIndex" of the column-format block "pBlockData" as a new row to "pResBlock".
// Requested columns (pReader->suppInfo) and block columns are matched by column id; requested columns that
// are left over once the block columns are used up are set to NULL.
// Returns TSDB_CODE_SUCCESS or the code of a failing doCopyColVal() call.
int32_t doAppendRowFromFileBlock(SSDataBlock* pResBlock, STsdbReader* pReader, SBlockData* pBlockData,
                                 int32_t rowIndex) {
  SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo;

  int32_t code = TSDB_CODE_SUCCESS;
  int32_t outputRowIndex = pResBlock->info.rows;
  int32_t i = 0;  // position in the requested column list
  int32_t j = 0;  // position in the block column list

  // the timestamp column is served from the key array of the block
  if (pSupInfo->colId[i] == PRIMARYKEY_TIMESTAMP_COL_ID) {
    SColumnInfoData* pTsCol = taosArrayGet(pResBlock->pDataBlock, pSupInfo->slotId[i]);
    ((int64_t*)pTsCol->pData)[outputRowIndex] = pBlockData->aTSKEY[rowIndex];
    i += 1;
  }

  SColVal cv = {0};
  int32_t numOfInputCols = pBlockData->nColData;
  int32_t numOfOutputCols = pSupInfo->numOfCols;

  while (i < numOfOutputCols && j < numOfInputCols) {
    SColData* pSrc = tBlockDataGetColDataByIdx(pBlockData, j);

    if (pSrc->cid < pSupInfo->colId[i]) {
      // block column is not requested, look at the next one
      j += 1;
    } else {
      SColumnInfoData* pDst = TARRAY_GET_ELEM(pResBlock->pDataBlock, pSupInfo->slotId[i]);

      if (pSrc->cid == pSupInfo->colId[i]) {
        tColDataGetValue(pSrc, rowIndex, &cv);
        code = doCopyColVal(pDst, outputRowIndex, i, &cv, pSupInfo);
        if (code) {
          return code;
        }
        j += 1;
      } else if (pSrc->cid > pDst->info.colId) {
        // the specified column does not exist in file block, fill with null data
        // NOTE(review): this compares against pDst->info.colId while the checks above use
        // pSupInfo->colId[i] - presumably both hold the same id, confirm.
        colDataSetNULL(pDst, outputRowIndex);
      }

      i += 1;
    }
  }

  // requested columns beyond the last block column
  for (; i < numOfOutputCols; i += 1) {
    SColumnInfoData* pDst = taosArrayGet(pResBlock->pDataBlock, pSupInfo->slotId[i]);
    colDataSetNULL(pDst, outputRowIndex);
  }

  pResBlock->info.dataLoad = 1;
  pResBlock->info.rows += 1;
  return TSDB_CODE_SUCCESS;
}

4107 4108
// Fill pReader->pResBlock with rows taken from the mem/imem iterators of "pBlockScanInfo".
// Rows are fetched one by one through tsdbGetNextRowInMem() until no row before "endKey" is left, both
// iterators are exhausted, or the block holds "capacity" rows.
// Returns TSDB_CODE_SUCCESS, or the code of the first failing step.
int32_t buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, int64_t endKey, int32_t capacity,
                                  STsdbReader* pReader) {
  SSDataBlock* pBlock = pReader->pResBlock;
  int32_t      code = TSDB_CODE_SUCCESS;

  do {
    // type -1 marks "no row produced"
    TSDBROW row = {.type = -1};
    bool    freeTSRow = false;

    // a failed fetch may leave "row" tagged as row format without a usable pTSRow, so it must not be
    // appended; stop here and report the failure instead
    code = tsdbGetNextRowInMem(pBlockScanInfo, pReader, &row, endKey, &freeTSRow);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    if (row.type == -1) {
      break;
    }

    if (row.type == TSDBROW_ROW_FMT) {
      code = doAppendRowFromTSRow(pBlock, pReader, row.pTSRow, pBlockScanInfo);

      // a merged row is owned by us, release it regardless of the append result
      if (freeTSRow) {
        taosMemoryFree(row.pTSRow);
      }

      if (code) {
        return code;
      }
    } else {
      code = doAppendRowFromFileBlock(pBlock, pReader, row.pBlockData, row.iRow);
      if (code) {
        break;
      }
    }

    // no data in buffer, return immediately
    if (!(pBlockScanInfo->iter.hasVal || pBlockScanInfo->iiter.hasVal)) {
      break;
    }

    if (pBlock->info.rows >= capacity) {
      break;
    }
  } while (1);

  return code;
}
H
Hongze Cheng 已提交
4150

4151 4152
// TODO refactor: with createDataBlockScanInfo
// Replace the set of tables scanned by "pReader" with the "num" entries of "pTableList" (read as an array of
// STableKeyInfo).
// The scan info of all current tables is cleared, the scan-info buffer and the uid list are enlarged when the
// new list is longer than the current table map, and the table map is rebuilt with one entry per table.
// Returns TSDB_CODE_SUCCESS, the code of ensureBlockScanInfoBuf(), or TSDB_CODE_OUT_OF_MEMORY when the uid list
// cannot be enlarged.
int32_t tsdbSetTableList(STsdbReader* pReader, const void* pTableList, int32_t num) {
  int32_t size = taosHashGetSize(pReader->status.pTableMap);

  STableBlockScanInfo** p = NULL;
  while ((p = taosHashIterate(pReader->status.pTableMap, p)) != NULL) {
    clearBlockScanInfo(*p);
  }

  if (size < num) {
    int32_t code = ensureBlockScanInfoBuf(&pReader->blockInfoBuf, num);
    if (code) {
      return code;
    }

    // keep the old list until the reallocation is known to have succeeded
    uint64_t* p1 = taosMemoryRealloc(pReader->status.uidList.tableUidList, sizeof(uint64_t) * num);
    if (p1 == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }

    pReader->status.uidList.tableUidList = p1;
  }

  taosHashClear(pReader->status.pTableMap);
  STableUidList* pUidList = &pReader->status.uidList;
  pUidList->currentIndex = 0;

  STableKeyInfo* pList = (STableKeyInfo*)pTableList;
  for (int32_t i = 0; i < num; ++i) {
    STableBlockScanInfo* pInfo = getPosInBlockInfoBuf(&pReader->blockInfoBuf, i);
    pInfo->uid = pList[i].uid;
    pUidList->tableUidList[i] = pList[i].uid;

    // todo extract method
    // lastKey starts one step outside of the query window (unless that would overflow)
    if (ASCENDING_TRAVERSE(pReader->order)) {
      int64_t skey = pReader->window.skey;
      pInfo->lastKey = (skey > INT64_MIN) ? (skey - 1) : skey;
      pInfo->lastKeyInStt = skey;
    } else {
      int64_t ekey = pReader->window.ekey;
      pInfo->lastKey = (ekey < INT64_MAX) ? (ekey + 1) : ekey;
      pInfo->lastKeyInStt = ekey;
    }

    taosHashPut(pReader->status.pTableMap, &pInfo->uid, sizeof(uint64_t), &pInfo, POINTER_BYTES);
  }

  // report success with the code family used by the rest of this file (was TDB_CODE_SUCCESS)
  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
4201 4202 4203 4204 4205 4206
// Return metaGetIdx(pMeta), or NULL when no meta object is given.
void* tsdbGetIdx(SMeta* pMeta) {
  return (pMeta != NULL) ? metaGetIdx(pMeta) : NULL;
}
dengyihao's avatar
dengyihao 已提交
4207

dengyihao's avatar
dengyihao 已提交
4208 4209 4210 4211 4212 4213
// Return metaGetIvtIdx(pMeta), or NULL when no meta object is given.
void* tsdbGetIvtIdx(SMeta* pMeta) {
  return (pMeta != NULL) ? metaGetIvtIdx(pMeta) : NULL;
}
L
Liu Jicong 已提交
4214

H
Hongze Cheng 已提交
4215
// Return the upper bound (verRange.maxVer) of the version range of "pReader".
uint64_t getReaderMaxVersion(STsdbReader* pReader) {
  return pReader->verRange.maxVer;
}
4216

4217
// Prepare the file-set iterator and the data-block iterator of "pReader" from its read snapshot.
//  - no file set at all: loading from file is switched off.
//  - count-only read mode: nothing else is prepared.
//  - otherwise: initForFirstBlockInFile() is called and its code is returned.
// When loading from file is off afterwards, the table list index is reset.
static int32_t doOpenReaderImpl(STsdbReader* pReader) {
  SReaderStatus* pStatus = &pReader->status;
  int32_t        code = TSDB_CODE_SUCCESS;

  initFilesetIterator(&pStatus->fileIter, pReader->pReadSnap->fs.aDFileSet, pReader);
  resetDataBlockIterator(&pStatus->blockIter, pReader->order);

  if (pStatus->fileIter.numOfFiles == 0) {
    pStatus->loadFromFile = false;
  } else if (pReader->readMode != READ_MODE_COUNT_ONLY) {
    code = initForFirstBlockInFile(pReader, &pStatus->blockIter);
  }

  if (!pStatus->loadFromFile) {
    resetTableListIndex(pStatus);
  }

  return code;
}

4240
// Free callback registered for the schema hash map: "param" points at a stored pointer, and that stored
// pointer is what gets released.
static void freeSchemaFunc(void* param) {
  void** ppStored = (void**)param;
  taosMemoryFree(*ppStored);
}

H
refact  
Hongze Cheng 已提交
4245
// ====================================== EXPOSED APIs ======================================
4246
int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, void* pTableList, int32_t numOfTables,
D
dapan1121 已提交
4247
                       SSDataBlock* pResBlock, STsdbReader** ppReader, const char* idstr, bool countOnly) {
4248 4249
  STimeWindow window = pCond->twindows;

4250 4251 4252
  int32_t capacity = pVnode->config.tsdbCfg.maxRows;
  if (pResBlock != NULL) {
    blockDataEnsureCapacity(pResBlock, capacity);
H
Haojun Liao 已提交
4253 4254 4255
  }

  int32_t code = tsdbReaderCreate(pVnode, pCond, ppReader, capacity, pResBlock, idstr);
4256
  if (code != TSDB_CODE_SUCCESS) {
H
Haojun Liao 已提交
4257 4258
    goto _err;
  }
H
Hongze Cheng 已提交
4259

4260
  // check for query time window
H
Haojun Liao 已提交
4261
  STsdbReader* pReader = *ppReader;
4262
  if (isEmptyQueryTimeWindow(&pReader->window) && pCond->type == TIMEWINDOW_RANGE_CONTAINED) {
H
Haojun Liao 已提交
4263 4264 4265
    tsdbDebug("%p query window not overlaps with the data set, no result returned, %s", pReader, pReader->idStr);
    return TSDB_CODE_SUCCESS;
  }
H
Hongze Cheng 已提交
4266

4267 4268
  if (pCond->type == TIMEWINDOW_RANGE_EXTERNAL) {
    // update the SQueryTableDataCond to create inner reader
4269
    int32_t order = pCond->order;
4270
    if (order == TSDB_ORDER_ASC) {
4271
      pCond->twindows.ekey = window.skey - 1;
4272 4273 4274
      pCond->twindows.skey = INT64_MIN;
      pCond->order = TSDB_ORDER_DESC;
    } else {
4275
      pCond->twindows.skey = window.ekey + 1;
4276 4277 4278 4279
      pCond->twindows.ekey = INT64_MAX;
      pCond->order = TSDB_ORDER_ASC;
    }

4280
    // here we only need one more row, so the capacity is set to be ONE.
H
Haojun Liao 已提交
4281
    code = tsdbReaderCreate(pVnode, pCond, &pReader->innerReader[0], 1, pResBlock, idstr);
4282 4283 4284 4285 4286
    if (code != TSDB_CODE_SUCCESS) {
      goto _err;
    }

    if (order == TSDB_ORDER_ASC) {
4287
      pCond->twindows.skey = window.ekey + 1;
4288
      pCond->twindows.ekey = INT64_MAX;
4289
    } else {
4290
      pCond->twindows.skey = INT64_MIN;
4291
      pCond->twindows.ekey = window.ekey - 1;
4292
    }
4293 4294
    pCond->order = order;

H
Haojun Liao 已提交
4295
    code = tsdbReaderCreate(pVnode, pCond, &pReader->innerReader[1], 1, pResBlock, idstr);
4296 4297 4298 4299 4300
    if (code != TSDB_CODE_SUCCESS) {
      goto _err;
    }
  }

H
Haojun Liao 已提交
4301
  // NOTE: the endVersion in pCond is the data version not schema version, so pCond->endVersion is not correct here.
4302 4303
  //  no valid error code set in metaGetTbTSchema, so let's set the error code here.
  //  we should proceed in case of tmq processing.
4304
  if (pCond->suid != 0) {
4305
    pReader->pSchema = metaGetTbTSchema(pReader->pTsdb->pVnode->pMeta, pReader->suid, -1, 1);
H
Haojun Liao 已提交
4306
    if (pReader->pSchema == NULL) {
H
Haojun Liao 已提交
4307
      tsdbError("failed to get table schema, suid:%" PRIu64 ", ver:-1, %s", pReader->suid, pReader->idStr);
H
Haojun Liao 已提交
4308
    }
4309 4310
  } else if (numOfTables > 0) {
    STableKeyInfo* pKey = pTableList;
4311
    pReader->pSchema = metaGetTbTSchema(pReader->pTsdb->pVnode->pMeta, pKey->uid, -1, 1);
H
Haojun Liao 已提交
4312
    if (pReader->pSchema == NULL) {
H
Haojun Liao 已提交
4313
      tsdbError("failed to get table schema, uid:%" PRIu64 ", ver:-1, %s", pKey->uid, pReader->idStr);
H
Haojun Liao 已提交
4314
    }
4315 4316
  }

4317 4318
  pReader->pSchemaMap = tSimpleHashInit(8, taosFastHash);
  if (pReader->pSchemaMap == NULL) {
4319
    tsdbError("failed init schema hash for reader %s", pReader->idStr);
4320 4321 4322 4323 4324
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _err;
  }

  tSimpleHashSetFreeFp(pReader->pSchemaMap, freeSchemaFunc);
4325
  if (pReader->pSchema != NULL) {
H
Haojun Liao 已提交
4326 4327 4328 4329
    code = updateBlockSMAInfo(pReader->pSchema, &pReader->suppInfo);
    if (code != TSDB_CODE_SUCCESS) {
      goto _err;
    }
4330
  }
4331

4332
  STsdbReader* p = (pReader->innerReader[0] != NULL) ? pReader->innerReader[0] : pReader;
X
Xiaoyu Wang 已提交
4333 4334
  pReader->status.pTableMap =
      createDataBlockScanInfo(p, &pReader->blockInfoBuf, pTableList, &pReader->status.uidList, numOfTables);
H
Haojun Liao 已提交
4335 4336
  if (pReader->status.pTableMap == NULL) {
    *ppReader = NULL;
S
Shengliang Guan 已提交
4337
    code = TSDB_CODE_OUT_OF_MEMORY;
H
Haojun Liao 已提交
4338 4339
    goto _err;
  }
H
Hongze Cheng 已提交
4340

4341
  pReader->suspended = true;
4342

D
dapan1121 已提交
4343 4344 4345
  if (countOnly) {
    pReader->readMode = READ_MODE_COUNT_ONLY;
  }
D
dapan1121 已提交
4346
  
4347
  tsdbDebug("%p total numOfTable:%d in this query %s", pReader, numOfTables, pReader->idStr);
H
Hongze Cheng 已提交
4348
  return code;
H
Hongze Cheng 已提交
4349 4350

_err:
H
Haojun Liao 已提交
4351
  tsdbError("failed to create data reader, code:%s %s", tstrerror(code), idstr);
4352
  tsdbReaderClose(pReader);
X
Xiaoyu Wang 已提交
4353
  *ppReader = NULL;  // reset the pointer value.
H
Hongze Cheng 已提交
4354
  return code;
H
refact  
Hongze Cheng 已提交
4355 4356 4357
}

// Release a reader and everything it owns. Passing NULL is allowed and does nothing.
// Inner readers, if any, are closed first; a reader with only one of the two inner readers set (as can be
// left behind by a failed tsdbReaderOpen()) is handled as well.
// The i/o cost summary of the reader is written to the debug log before the reader itself is freed.
void tsdbReaderClose(STsdbReader* pReader) {
  if (pReader == NULL) {
    return;
  }

  tsdbAcquireReader(pReader);
  {
    // Each inner reader is checked on its own. The former combined check
    // (innerReader[0] != NULL || innerReader[1] != NULL) dereferenced both of them.
    for (int32_t i = 0; i < 2; ++i) {
      STsdbReader* p = pReader->innerReader[i];
      if (p == NULL) {
        continue;
      }

      // detach these members before closing the inner reader, so that closing it does not release them;
      // this reader releases its own below
      p->status.pTableMap = NULL;
      p->status.uidList.tableUidList = NULL;
      p->pReadSnap = NULL;
      p->pSchema = NULL;
      p->pSchemaMap = NULL;

      tsdbReaderClose(p);
    }
  }

  SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo;

  taosArrayDestroy(pSupInfo->pColAgg);
  for (int32_t i = 0; i < pSupInfo->numOfCols; ++i) {
    if (pSupInfo->buildBuf[i] != NULL) {
      taosMemoryFreeClear(pSupInfo->buildBuf[i]);
    }
  }

  if (pReader->freeBlock) {
    pReader->pResBlock = blockDataDestroy(pReader->pResBlock);
  }

  taosMemoryFree(pSupInfo->colId);
  tBlockDataDestroy(&pReader->status.fileBlockData);
  cleanupDataBlockIterator(&pReader->status.blockIter);

  // remembered for the cost summary below, the table map itself is destroyed right away
  size_t numOfTables = taosHashGetSize(pReader->status.pTableMap);
  if (pReader->status.pTableMap != NULL) {
    destroyAllBlockScanInfo(pReader->status.pTableMap);
    clearBlockScanInfoBuf(&pReader->blockInfoBuf);
  }

  if (pReader->pFileReader != NULL) {
    tsdbDataFReaderClose(&pReader->pFileReader);
  }

  if (pReader->pDelFReader != NULL) {
    tsdbDelFReaderClose(&pReader->pDelFReader);
  }

  if (pReader->pDelIdx != NULL) {
    taosArrayDestroy(pReader->pDelIdx);
    pReader->pDelIdx = NULL;
  }

  qTrace("tsdb/reader-close: %p, untake snapshot", pReader);
  tsdbUntakeReadSnap(pReader, pReader->pReadSnap, true);
  pReader->pReadSnap = NULL;

  tsdbReleaseReader(pReader);

  tsdbUninitReaderLock(pReader);

  taosMemoryFree(pReader->status.uidList.tableUidList);
  SIOCostSummary* pCost = &pReader->cost;

  SFilesetIter* pFilesetIter = &pReader->status.fileIter;
  if (pFilesetIter->pLastBlockReader != NULL) {
    SLastBlockReader* pLReader = pFilesetIter->pLastBlockReader;
    tMergeTreeClose(&pLReader->mergeTree);

    // collect the load statistics before the load info is destroyed
    getLastBlockLoadInfo(pLReader->pInfo, &pCost->lastBlockLoad, &pCost->lastBlockLoadTime);

    pLReader->pInfo = destroyLastBlockLoadInfo(pLReader->pInfo);
    taosMemoryFree(pLReader);
  }

  tsdbDebug(
      "%p :io-cost summary: head-file:%" PRIu64 ", head-file time:%.2f ms, SMA:%" PRId64
      " SMA-time:%.2f ms, fileBlocks:%" PRId64
      ", fileBlocks-load-time:%.2f ms, "
      "build in-memory-block-time:%.2f ms, lastBlocks:%" PRId64 ", lastBlocks-time:%.2f ms, composed-blocks:%" PRId64
      ", composed-blocks-time:%.2fms, STableBlockScanInfo size:%.2f Kb, createTime:%.2f ms,initDelSkylineIterTime:%.2f "
      "ms, %s",
      pReader, pCost->headFileLoad, pCost->headFileLoadTime, pCost->smaDataLoad, pCost->smaLoadTime, pCost->numOfBlocks,
      pCost->blockLoadTime, pCost->buildmemBlock, pCost->lastBlockLoad, pCost->lastBlockLoadTime, pCost->composedBlocks,
      pCost->buildComposedBlockTime, numOfTables * sizeof(STableBlockScanInfo) / 1000.0, pCost->createScanInfoList,
      pCost->initDelSkylineIterTime, pReader->idStr);

  taosMemoryFree(pReader->idStr);
  taosMemoryFree(pReader->pSchema);

  tSimpleHashCleanup(pReader->pSchemaMap);
  taosMemoryFreeClear(pReader);
}

4463 4464 4465 4466 4467 4468 4469 4470 4471 4472
int32_t tsdbReaderSuspend(STsdbReader* pReader) {
  int32_t code = 0;

  // save reader's base state & reset top state to be reconstructed from base state
  SReaderStatus*       pStatus = &pReader->status;
  STableBlockScanInfo* pBlockScanInfo = NULL;

  if (pStatus->loadFromFile) {
    SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(&pReader->status.blockIter);
    if (pBlockInfo != NULL) {
H
Haojun Liao 已提交
4473
      pBlockScanInfo = getTableBlockScanInfo(pStatus->pTableMap, pBlockInfo->uid, pReader->idStr);
4474 4475 4476 4477
      if (pBlockScanInfo == NULL) {
        goto _err;
      }
    } else {
4478
      pBlockScanInfo = *pStatus->pTableIter;
4479 4480 4481 4482 4483
    }

    tsdbDataFReaderClose(&pReader->pFileReader);

    // resetDataBlockScanInfo excluding lastKey
4484
    STableBlockScanInfo** p = NULL;
4485 4486

    while ((p = taosHashIterate(pStatus->pTableMap, p)) != NULL) {
4487 4488 4489 4490 4491 4492 4493 4494
      STableBlockScanInfo* pInfo = *(STableBlockScanInfo**)p;

      pInfo->iterInit = false;
      pInfo->iter.hasVal = false;
      pInfo->iiter.hasVal = false;

      if (pInfo->iter.iter != NULL) {
        pInfo->iter.iter = tsdbTbDataIterDestroy(pInfo->iter.iter);
4495 4496
      }

4497 4498 4499 4500 4501 4502
      if (pInfo->iiter.iter != NULL) {
        pInfo->iiter.iter = tsdbTbDataIterDestroy(pInfo->iiter.iter);
      }

      pInfo->delSkyline = taosArrayDestroy(pInfo->delSkyline);
      // pInfo->lastKey = ts;
4503 4504
    }
  } else {
4505 4506 4507 4508 4509 4510 4511 4512 4513 4514 4515 4516 4517 4518 4519 4520 4521 4522 4523 4524 4525 4526
    // resetDataBlockScanInfo excluding lastKey
    STableBlockScanInfo** p = NULL;

    while ((p = taosHashIterate(pStatus->pTableMap, p)) != NULL) {
      STableBlockScanInfo* pInfo = *(STableBlockScanInfo**)p;

      pInfo->iterInit = false;
      pInfo->iter.hasVal = false;
      pInfo->iiter.hasVal = false;

      if (pInfo->iter.iter != NULL) {
        pInfo->iter.iter = tsdbTbDataIterDestroy(pInfo->iter.iter);
      }

      if (pInfo->iiter.iter != NULL) {
        pInfo->iiter.iter = tsdbTbDataIterDestroy(pInfo->iiter.iter);
      }

      pInfo->delSkyline = taosArrayDestroy(pInfo->delSkyline);
      // pInfo->lastKey = ts;
    }

4527
    pBlockScanInfo = pStatus->pTableIter == NULL ? NULL : *pStatus->pTableIter;
4528 4529 4530 4531 4532 4533 4534
    if (pBlockScanInfo) {
      // save lastKey to restore memory iterator
      STimeWindow w = pReader->pResBlock->info.window;
      pBlockScanInfo->lastKey = ASCENDING_TRAVERSE(pReader->order) ? w.ekey : w.skey;

      // reset current current table's data block scan info,
      pBlockScanInfo->iterInit = false;
4535 4536
      pBlockScanInfo->iter.hasVal = false;
      pBlockScanInfo->iiter.hasVal = false;
4537 4538 4539 4540 4541 4542 4543 4544 4545 4546 4547 4548 4549 4550 4551
      if (pBlockScanInfo->iter.iter != NULL) {
        pBlockScanInfo->iter.iter = tsdbTbDataIterDestroy(pBlockScanInfo->iter.iter);
      }

      if (pBlockScanInfo->iiter.iter != NULL) {
        pBlockScanInfo->iiter.iter = tsdbTbDataIterDestroy(pBlockScanInfo->iiter.iter);
      }

      pBlockScanInfo->pBlockList = taosArrayDestroy(pBlockScanInfo->pBlockList);
      tMapDataClear(&pBlockScanInfo->mapData);
      // TODO: keep skyline for reuse
      pBlockScanInfo->delSkyline = taosArrayDestroy(pBlockScanInfo->delSkyline);
    }
  }

4552
  tsdbUntakeReadSnap(pReader, pReader->pReadSnap, false);
4553
  pReader->pReadSnap = NULL;
4554 4555 4556

  pReader->suspended = true;

4557 4558
  tsdbDebug("reader: %p suspended uid %" PRIu64 " in this query %s", pReader, pBlockScanInfo ? pBlockScanInfo->uid : 0,
            pReader->idStr);
4559 4560 4561 4562 4563 4564 4565 4566 4567 4568 4569
  return code;

_err:
  tsdbError("failed to suspend data reader, code:%s %s", tstrerror(code), pReader->idStr);
  return code;
}

static int32_t tsdbSetQueryReseek(void* pQHandle) {
  int32_t      code = 0;
  STsdbReader* pReader = pQHandle;

4570
  code = tsdbTryAcquireReader(pReader);
4571 4572
  if (code == 0) {
    if (pReader->suspended) {
4573
      tsdbReleaseReader(pReader);
4574 4575 4576 4577
      return code;
    }

    tsdbReaderSuspend(pReader);
4578

4579
    tsdbReleaseReader(pReader);
4580

4581
    return code;
4582 4583 4584
  } else if (code == EBUSY) {
    return TSDB_CODE_VND_QUERY_BUSY;
  } else {
4585 4586
    terrno = TAOS_SYSTEM_ERROR(code);
    return TSDB_CODE_FAILED;
4587 4588 4589 4590 4591 4592
  }
}

// Re-arms a suspended reader: takes a fresh read snapshot (only when the reader
// has at least one table) and re-opens the reader, then clears the suspended flag.
//
// For a TIMEWINDOW_RANGE_CONTAINED reader the reader itself is re-opened. For
// any other type the two inner readers are re-wired to share the outer reader's
// table map, uid list, schema, schema map and snapshot, and only the "prev"
// inner reader is opened here.
//
// Returns TSDB_CODE_SUCCESS, or the failing step's error code. Every failure is
// routed through _err so it is logged consistently; the suspended flag is left
// untouched in that case.
int32_t tsdbReaderResume(STsdbReader* pReader) {
  int32_t code = 0;

  // remembered only for the debug message at the end
  STableBlockScanInfo** pBlockScanInfo = pReader->status.pTableIter;

  //  restore reader's state
  //  task snapshot
  int32_t numOfTables = taosHashGetSize(pReader->status.pTableMap);
  if (numOfTables > 0) {
    qTrace("tsdb/reader: %p, take snapshot", pReader);
    code = tsdbTakeReadSnap(pReader, tsdbSetQueryReseek, &pReader->pReadSnap);
    if (code != TSDB_CODE_SUCCESS) {
      goto _err;
    }

    if (pReader->type == TIMEWINDOW_RANGE_CONTAINED) {
      code = doOpenReaderImpl(pReader);
      if (code != TSDB_CODE_SUCCESS) {
        goto _err;
      }
    } else {
      STsdbReader* pPrevReader = pReader->innerReader[0];
      STsdbReader* pNextReader = pReader->innerReader[1];

      // we need only one row
      pPrevReader->capacity = 1;
      pPrevReader->status.pTableMap = pReader->status.pTableMap;
      pPrevReader->status.uidList = pReader->status.uidList;
      pPrevReader->pSchema = pReader->pSchema;
      pPrevReader->pSchemaMap = pReader->pSchemaMap;
      pPrevReader->pReadSnap = pReader->pReadSnap;

      pNextReader->capacity = 1;
      pNextReader->status.pTableMap = pReader->status.pTableMap;
      pNextReader->status.uidList = pReader->status.uidList;
      pNextReader->pSchema = pReader->pSchema;
      pNextReader->pSchemaMap = pReader->pSchemaMap;
      pNextReader->pReadSnap = pReader->pReadSnap;

      code = doOpenReaderImpl(pPrevReader);
      if (code != TSDB_CODE_SUCCESS) {
        goto _err;
      }
    }
  }

  pReader->suspended = false;

  tsdbDebug("reader: %p resumed uid %" PRIu64 ", numOfTable:%" PRId32 ", in this query %s", pReader,
            pBlockScanInfo ? (*pBlockScanInfo)->uid : 0, numOfTables, pReader->idStr);
  return code;

_err:
  tsdbError("failed to resume data reader, code:%s %s", tstrerror(code), pReader->idStr);
  return code;
}

D
dapan1121 已提交
4647 4648 4649 4650
// Count-only read: gathers the row count from files and then from memory and
// publishes it through the result block's info (rows), without loading data.
//
// Returns true when the published row count is positive. Returns false when
// status.loadFromFile is already false, or when either counting step fails
// (the failing step's error code is not propagated).
static bool tsdbReadRowsCountOnly(STsdbReader* pReader) {
  SSDataBlock* pResBlock = pReader->pResBlock;

  if (!pReader->status.loadFromFile) {
    return false;
  }

  if (readRowsCountFromFiles(pReader) != TSDB_CODE_SUCCESS) {
    return false;
  }

  if (readRowsCountFromMem(pReader) != TSDB_CODE_SUCCESS) {
    return false;
  }

  // hand the accumulated count to the caller via the block info
  pResBlock->info.rows = pReader->rowsNum;
  pResBlock->info.id.uid = 0;
  pResBlock->info.dataLoad = 0;

  // the accumulator is consumed once it has been reported
  pReader->rowsNum = 0;

  return pResBlock->info.rows > 0;
}

D
dapan1121 已提交
4674 4675 4676
// Produces the next result block for one reader.
//
// pReader: reader to advance; its result block is cleaned up first.
// hasNext: out, set to true when the result block holds at least one row.
//
// Returns TSDB_CODE_SUCCESS or the error code of the failing build step.
// With no tables in the table map nothing is read and *hasNext stays false.
static int32_t doTsdbNextDataBlock(STsdbReader* pReader, bool *hasNext) {
  int32_t code = TSDB_CODE_SUCCESS;

  // cleanup the data that belongs to the previous data block
  SSDataBlock* pBlock = pReader->pResBlock;
  blockDataCleanup(pBlock);

  *hasNext = false;

  SReaderStatus* pStatus = &pReader->status;
  if (taosHashGetSize(pStatus->pTableMap) == 0) {
    return code;
  }

  if (READ_MODE_COUNT_ONLY == pReader->readMode) {
    // tsdbReadRowsCountOnly() answers "are there rows?" as a bool; it must feed
    // *hasNext and never be returned as the status code (true would read as error 1).
    *hasNext = tsdbReadRowsCountOnly(pReader);
    return code;
  }

  if (pStatus->loadFromFile) {
    code = buildBlockFromFiles(pReader);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    // files produced nothing for this call: restart the table list and read the buffer
    if (pBlock->info.rows <= 0) {
      resetTableListIndex(&pReader->status);
      code = buildBlockFromBufferSequentially(pReader);
    }
  } else {  // no data in files, let's try the buffer
    code = buildBlockFromBufferSequentially(pReader);
  }

  *hasNext = pBlock->info.rows > 0;

  return code;
}

D
dapan1121 已提交
4711 4712 4713 4714 4715
// Advances the reader to its next data block, driving the optional
// prev-row / main / next-row phases tracked in pReader->step.
//
// pReader: reader to advance. It is resumed first if it is suspended.
// hasNext: out, true when a block is available.
//
// Returns TSDB_CODE_SUCCESS or the error code of the failing step.
//
// Locking: the reader is acquired here. It is released before returning in
// every case except one: when a block is available and it is NOT a composed
// block, the lock stays held (tsdbRetrieveDataBlock releases it after loading
// the block data). All error paths release the lock.
int32_t tsdbNextDataBlock(STsdbReader* pReader, bool *hasNext) {
  int32_t code = TSDB_CODE_SUCCESS;

  *hasNext = false;

  if (isEmptyQueryTimeWindow(&pReader->window) || pReader->step == EXTERNAL_ROWS_NEXT) {
    return code;
  }

  SReaderStatus* pStatus = &pReader->status;

  code = tsdbAcquireReader(pReader);
  qTrace("tsdb/read: %p, take read mutex, code: %d", pReader, code);

  if (pReader->suspended) {
    // a reader that failed to resume must not be read from
    code = tsdbReaderResume(pReader);
    if (code != TSDB_CODE_SUCCESS) {
      tsdbReleaseReader(pReader);
      return code;
    }
  }

  if (pReader->innerReader[0] != NULL && pReader->step == 0) {
    code = doTsdbNextDataBlock(pReader->innerReader[0], hasNext);
    if (code) {
      tsdbReleaseReader(pReader);
      return code;
    }

    pReader->step = EXTERNAL_ROWS_PREV;
    if (*hasNext) {
      pStatus = &pReader->innerReader[0]->status;
      if (pStatus->composedDataBlock) {
        qTrace("tsdb/read: %p, unlock read mutex", pReader);
        tsdbReleaseReader(pReader);
      }

      return code;
    }
  }

  if (pReader->step == EXTERNAL_ROWS_PREV) {
    // prepare for the main scan
    code = doOpenReaderImpl(pReader);
    int32_t step = 1;
    resetAllDataBlockScanInfo(pReader->status.pTableMap, pReader->innerReader[0]->window.ekey, step);

    if (code != TSDB_CODE_SUCCESS) {
      tsdbReleaseReader(pReader);  // do not leak the lock on failure
      return code;
    }

    pReader->step = EXTERNAL_ROWS_MAIN;
  }

  code = doTsdbNextDataBlock(pReader, hasNext);
  if (code != TSDB_CODE_SUCCESS) {
    tsdbReleaseReader(pReader);
    return code;
  }

  if (*hasNext) {
    if (pStatus->composedDataBlock) {
      qTrace("tsdb/read: %p, unlock read mutex", pReader);
      tsdbReleaseReader(pReader);
    }

    return code;
  }

  if (pReader->step == EXTERNAL_ROWS_MAIN && pReader->innerReader[1] != NULL) {
    // prepare for the next row scan
    int32_t step = -1;
    code = doOpenReaderImpl(pReader->innerReader[1]);
    resetAllDataBlockScanInfo(pReader->innerReader[1]->status.pTableMap, pReader->window.ekey, step);
    if (code != TSDB_CODE_SUCCESS) {
      tsdbReleaseReader(pReader);  // do not leak the lock on failure
      return code;
    }

    code = doTsdbNextDataBlock(pReader->innerReader[1], hasNext);
    if (code != TSDB_CODE_SUCCESS) {
      tsdbReleaseReader(pReader);
      return code;
    }

    pReader->step = EXTERNAL_ROWS_NEXT;
    if (*hasNext) {
      pStatus = &pReader->innerReader[1]->status;
      if (pStatus->composedDataBlock) {
        qTrace("tsdb/read: %p, unlock read mutex", pReader);
        tsdbReleaseReader(pReader);
      }

      return code;
    }
  }

  qTrace("tsdb/read: %p, unlock read mutex", pReader);
  tsdbReleaseReader(pReader);

  return code;
}

4809
// Completes pSup->pColAgg so that it lines up with the requested columns:
// pTsAgg is inserted at the front, and every requested column id (other than
// the primary timestamp) that has no entry gets an "all NULL" entry, i.e.
// numOfNull == numOfRows, inserted at the position reached by the merge walk.
//
// pSup:      load support info holding the requested colId[] and pColAgg.
// numOfRows: row count of the block, used as the NULL count of filled entries.
// numOfCols: number of entries in pSup->colId.
// pTsAgg:    statistics entry of the timestamp column.
static void doFillNullColSMA(SBlockLoadSuppInfo* pSup, int32_t numOfRows, int32_t numOfCols, SColumnDataAgg* pTsAgg) {
  int32_t numOfAgg = (int32_t)taosArrayGetSize(pSup->pColAgg);
  taosArrayInsert(pSup->pColAgg, 0, pTsAgg);
  numOfAgg += 1;

  int32_t aggIdx = 0;
  int32_t colIdx = 0;

  // merge-walk the statistics list against the requested column ids
  while (colIdx < numOfCols && aggIdx < numOfAgg) {
    SColumnDataAgg* pAgg = taosArrayGet(pSup->pColAgg, aggIdx);

    if (pAgg->colId == pSup->colId[colIdx]) {
      aggIdx++;
      colIdx++;
      continue;
    }

    if (pAgg->colId < pSup->colId[colIdx]) {
      // statistics of a column that was not requested
      aggIdx++;
      continue;
    }

    // the requested column has no statistics entry
    if (pSup->colId[colIdx] != PRIMARYKEY_TIMESTAMP_COL_ID) {
      SColumnDataAgg nullColAgg = {.colId = pSup->colId[colIdx], .numOfNull = numOfRows};
      taosArrayInsert(pSup->pColAgg, aggIdx, &nullColAgg);
      aggIdx++;
      numOfAgg++;
    }
    colIdx++;
  }

  // requested columns left over once the walk above has stopped
  for (; colIdx < numOfCols; colIdx++) {
    if (pSup->colId[colIdx] != PRIMARYKEY_TIMESTAMP_COL_ID) {
      SColumnDataAgg nullColAgg = {.colId = pSup->colId[colIdx], .numOfNull = numOfRows};
      taosArrayInsert(pSup->pColAgg, aggIdx, &nullColAgg);
      aggIdx++;
    }
  }
}

4844
// Loads the pre-computed per-column statistics (SMA) of the current file block
// and exposes them through pDataBlock->pBlockAgg.
//
// pReader:    reader positioned on a file block.
// pDataBlock: block whose pBlockAgg is set; NULL when no statistics apply.
// allHave:    out, true only when every requested column got an entry.
//
// Statistics are not provided (success, pBlockAgg == NULL, *allHave == false)
// for TIMEWINDOW_RANGE_EXTERNAL readers, composed blocks, readers whose
// suppInfo.smaValid is false, a result block whose uid differs from the current
// file block, and file blocks without SMA.
//
// Returns TSDB_CODE_SUCCESS, the error of tsdbReadBlockSma, or
// TSDB_CODE_OUT_OF_MEMORY when the per-slot pointer table cannot be allocated.
int32_t tsdbRetrieveDatablockSMA(STsdbReader* pReader, SSDataBlock* pDataBlock, bool* allHave) {
  SColumnDataAgg*** pBlockSMA = &pDataBlock->pBlockAgg;

  int32_t code = 0;
  *allHave = false;
  *pBlockSMA = NULL;

  if (pReader->type == TIMEWINDOW_RANGE_EXTERNAL) {
    return TSDB_CODE_SUCCESS;
  }

  // there is no statistics data for composed block
  if (pReader->status.composedDataBlock || (!pReader->suppInfo.smaValid)) {
    return TSDB_CODE_SUCCESS;
  }

  SFileDataBlockInfo* pFBlock = getCurrentBlockInfo(&pReader->status.blockIter);
  SBlockLoadSuppInfo* pSup = &pReader->suppInfo;

  if (pReader->pResBlock->info.id.uid != pFBlock->uid) {
    return TSDB_CODE_SUCCESS;
  }

  int64_t st = taosGetTimestampUs();

  SDataBlk* pBlock = getCurrentBlock(&pReader->status.blockIter);
  if (tDataBlkHasSma(pBlock)) {
    code = tsdbReadBlockSma(pReader->pFileReader, pBlock, pSup->pColAgg);
    if (code != TSDB_CODE_SUCCESS) {
      tsdbDebug("vgId:%d, failed to load block SMA for uid %" PRIu64 ", code:%s, %s", 0, pFBlock->uid, tstrerror(code),
                pReader->idStr);
      return code;
    }
  } else {
    *pBlockSMA = NULL;
    return TSDB_CODE_SUCCESS;
  }

  *allHave = true;

  // always load the first primary timestamp column data
  SColumnDataAgg* pTsAgg = &pSup->tsColAgg;

  pTsAgg->numOfNull = 0;
  pTsAgg->colId = PRIMARYKEY_TIMESTAMP_COL_ID;
  pTsAgg->min = pReader->pResBlock->info.window.skey;
  pTsAgg->max = pReader->pResBlock->info.window.ekey;

  // update the number of NULL data rows
  size_t numOfCols = pSup->numOfCols;

  // ensure capacity
  if (pDataBlock->pDataBlock) {
    size_t colsNum = taosArrayGetSize(pDataBlock->pDataBlock);
    taosArrayEnsureCap(pSup->pColAgg, colsNum);
  }

  // lazily create the slot -> statistics pointer table of the result block
  SSDataBlock* pResBlock = pReader->pResBlock;
  if (pResBlock->pBlockAgg == NULL) {
    size_t num = taosArrayGetSize(pResBlock->pDataBlock);
    pResBlock->pBlockAgg = taosMemoryCalloc(num, POINTER_BYTES);
    if (pResBlock->pBlockAgg == NULL && num > 0) {
      // the table is indexed below; do not continue without it
      *allHave = false;
      return TSDB_CODE_OUT_OF_MEMORY;
    }
  }

  // do fill all null column value SMA info
  doFillNullColSMA(pSup, pBlock->nRow, numOfCols, pTsAgg);
  size_t size = taosArrayGetSize(pSup->pColAgg);

  // map every requested column to its statistics entry (or NULL when absent)
  int32_t i = 0, j = 0;
  while (j < numOfCols && i < size) {
    SColumnDataAgg* pAgg = taosArrayGet(pSup->pColAgg, i);
    if (pAgg->colId == pSup->colId[j]) {
      pResBlock->pBlockAgg[pSup->slotId[j]] = pAgg;
      i += 1;
      j += 1;
    } else if (pAgg->colId < pSup->colId[j]) {
      i += 1;
    } else if (pSup->colId[j] < pAgg->colId) {
      pResBlock->pBlockAgg[pSup->slotId[j]] = NULL;
      *allHave = false;
      j += 1;
    }
  }

  *pBlockSMA = pResBlock->pBlockAgg;
  pReader->cost.smaDataLoad += 1;

  double elapsedTime = (taosGetTimestampUs() - st) / 1000.0;
  pReader->cost.smaLoadTime += elapsedTime;

  tsdbDebug("vgId:%d, succeed to load block SMA for uid %" PRIu64 ", %s", 0, pFBlock->uid, pReader->idStr);
  return code;
}

H
Haojun Liao 已提交
4937 4938 4939 4940 4941 4942 4943 4944 4945 4946 4947 4948
// Looks up the scan info registered for table `uid` in pTableMap.
//
// pTableMap: hash map keyed by uid whose values are STableBlockScanInfo pointers.
// uid:       table uid to look up.
// id:        query id string, used only in the error log.
//
// Returns the scan info. When the uid is not present (or its slot holds NULL),
// returns NULL with terrno set to TSDB_CODE_INVALID_PARA and logs an error.
STableBlockScanInfo* getTableBlockScanInfo(SHashObj* pTableMap, uint64_t uid, const char* id) {
  STableBlockScanInfo** ppInfo = taosHashGet(pTableMap, &uid, sizeof(uid));
  if (ppInfo != NULL && *ppInfo != NULL) {
    return *ppInfo;
  }

  terrno = TSDB_CODE_INVALID_PARA;
  int32_t numOfTables = taosHashGetSize(pTableMap);
  tsdbError("failed to locate the uid:%" PRIu64 " in query table uid list, total tables:%d, %s", uid, numOfTables, id);
  return NULL;
}

H
Haojun Liao 已提交
4949
// Loads the data of the current file block and copies it into the reader's
// result block.
//
// Returns pReader->pResBlock on success. Returns NULL when the block's table is
// not in the table map, or when loading/copying fails; in the latter case the
// file block buffer is destroyed and terrno carries the error code.
static SSDataBlock* doRetrieveDataBlock(STsdbReader* pReader) {
  SReaderStatus* pStatus = &pReader->status;

  SFileDataBlockInfo*  pBlockInfo = getCurrentBlockInfo(&pStatus->blockIter);
  STableBlockScanInfo* pScanInfo = getTableBlockScanInfo(pStatus->pTableMap, pBlockInfo->uid, pReader->idStr);
  if (pScanInfo == NULL) {
    return NULL;
  }

  // step 1: read the block from file; step 2: copy it into the result block
  int32_t code = doLoadFileBlockData(pReader, &pStatus->blockIter, &pStatus->fileBlockData, pScanInfo->uid);
  if (code == TSDB_CODE_SUCCESS) {
    code = copyBlockDataToSDataBlock(pReader);
  }

  if (code != TSDB_CODE_SUCCESS) {
    tBlockDataDestroy(&pStatus->fileBlockData);
    terrno = code;
    return NULL;
  }

  return pReader->pResBlock;
}

H
Haojun Liao 已提交
4975
// Returns the data of the block selected by the preceding tsdbNextDataBlock().
//
// For a TIMEWINDOW_RANGE_EXTERNAL reader the block is taken from the inner
// reader matching the current step (prev -> innerReader[0], next ->
// innerReader[1]); otherwise from pReader itself.
//
// A composed block is already materialised and is returned as is. Otherwise the
// file block is loaded and the reader lock of pReader is released afterwards.
// pIdList is not used by this function.
// Returns NULL when loading the file block fails.
SSDataBlock* tsdbRetrieveDataBlock(STsdbReader* pReader, SArray* pIdList) {
  STsdbReader* pActive = pReader;

  if (pReader->type == TIMEWINDOW_RANGE_EXTERNAL) {
    if (pReader->step == EXTERNAL_ROWS_PREV) {
      pActive = pReader->innerReader[0];
    } else if (pReader->step == EXTERNAL_ROWS_NEXT) {
      pActive = pReader->innerReader[1];
    }
  }

  if (pActive->status.composedDataBlock) {
    return pActive->pResBlock;
  }

  SSDataBlock* pRes = doRetrieveDataBlock(pActive);

  // the lock is always the one of the outer reader
  qTrace("tsdb/read-retrieve: %p, unlock read mutex", pReader);
  tsdbReleaseReader(pReader);

  return pRes;
}

H
Haojun Liao 已提交
4998
// Rewinds the reader so that it can be scanned again with the order and time
// window given in pCond. The reader type is forced to TIMEWINDOW_RANGE_CONTAINED.
//
// pReader: reader to reset; it is resumed first if it is suspended.
// pCond:   supplies the new scan order and time window.
//
// Returns TSDB_CODE_SUCCESS, the error of a failed resume, or the error of
// positioning on the first file block. Nothing is reset (and success is
// returned) when the current query window is empty or the reader holds no
// read snapshot. The reader lock is taken on entry and released on every path.
int32_t tsdbReaderReset(STsdbReader* pReader, SQueryTableDataCond* pCond) {
  int32_t code = 0;

  qTrace("tsdb/reader-reset: %p, take read mutex", pReader);
  tsdbAcquireReader(pReader);

  if (pReader->suspended) {
    // report a failed resume instead of silently returning success below
    code = tsdbReaderResume(pReader);
    if (code != TSDB_CODE_SUCCESS) {
      tsdbReleaseReader(pReader);
      return code;
    }
  }

  if (isEmptyQueryTimeWindow(&pReader->window) || pReader->pReadSnap == NULL) {
    tsdbDebug("tsdb reader reset return %p, %s", pReader->pReadSnap, pReader->idStr);
    tsdbReleaseReader(pReader);
    return TSDB_CODE_SUCCESS;
  }

  SReaderStatus*  pStatus = &pReader->status;
  SDataBlockIter* pBlockIter = &pStatus->blockIter;

  pReader->order = pCond->order;
  pReader->type = TIMEWINDOW_RANGE_CONTAINED;
  pStatus->loadFromFile = true;
  pStatus->pTableIter = NULL;
  pReader->window = updateQueryTimeWindow(pReader->pTsdb, &pCond->twindows);

  // start over with clean timestamp-column statistics and no open data file
  memset(&pReader->suppInfo.tsColAgg, 0, sizeof(SColumnDataAgg));

  pReader->suppInfo.tsColAgg.colId = PRIMARYKEY_TIMESTAMP_COL_ID;
  tsdbDataFReaderClose(&pReader->pFileReader);

  int32_t numOfTables = taosHashGetSize(pStatus->pTableMap);

  initFilesetIterator(&pStatus->fileIter, pReader->pReadSnap->fs.aDFileSet, pReader);
  resetDataBlockIterator(pBlockIter, pReader->order);
  resetTableListIndex(&pReader->status);

  // place every table's last key just outside the window, on the side the scan starts from
  bool    asc = ASCENDING_TRAVERSE(pReader->order);
  int32_t step = asc ? 1 : -1;
  int64_t ts = asc ? pReader->window.skey - 1 : pReader->window.ekey + 1;
  resetAllDataBlockScanInfo(pStatus->pTableMap, ts, step);

  // no data in files, let's try buffer in memory
  if (pStatus->fileIter.numOfFiles == 0) {
    pStatus->loadFromFile = false;
    resetTableListIndex(pStatus);
  } else {
    code = initForFirstBlockInFile(pReader, pBlockIter);
    if (code != TSDB_CODE_SUCCESS) {
      tsdbError("%p reset reader failed, numOfTables:%d, query range:%" PRId64 " - %" PRId64 " in query %s", pReader,
                numOfTables, pReader->window.skey, pReader->window.ekey, pReader->idStr);

      tsdbReleaseReader(pReader);
      return code;
    }
  }

  tsdbDebug("%p reset reader, suid:%" PRIu64 ", numOfTables:%d, skey:%" PRId64 ", query range:%" PRId64 " - %" PRId64
            " in query %s",
            pReader, pReader->suid, numOfTables, pCond->twindows.skey, pReader->window.skey, pReader->window.ekey,
            pReader->idStr);

  tsdbReleaseReader(pReader);

  return code;
}
H
Hongze Cheng 已提交
5064

5065 5066 5067 5068 5069 5070
// Maps a block's row count to a histogram bucket.
//
// startRow:    row count at which bucket 0 starts.
// bucketRange: width of one bucket, in rows.
// numOfRows:   row count of the block being classified.
// numOfBucket: number of buckets in the histogram.
//
// Returns (numOfRows - startRow) / bucketRange, clamped to [0, numOfBucket - 1]
// so that the result is always usable as an index: row counts at or below
// startRow (and a non-positive bucketRange, which would otherwise divide by
// zero) land in bucket 0, row counts at or above the upper edge land in the
// last bucket.
static int32_t getBucketIndex(int32_t startRow, int32_t bucketRange, int32_t numOfRows, int32_t numOfBucket) {
  if (bucketRange <= 0 || numOfRows <= startRow) {
    return 0;
  }

  int32_t bucketIndex = (numOfRows - startRow) / bucketRange;
  if (bucketIndex >= numOfBucket) {
    bucketIndex = numOfBucket - 1;
  }

  return (bucketIndex < 0) ? 0 : bucketIndex;
}
H
Hongze Cheng 已提交
5072

5073 5074 5075 5076
// Walks every file block visible to the reader and accumulates block
// distribution statistics into pTableBlockInfo: total rows and size, min/max
// rows per block, number of small blocks, file/block/table counts and a
// histogram of rows per block.
//
// pReader:         reader to scan; it is resumed first if it is suspended.
// pTableBlockInfo: accumulator. totalSize, totalRows and numOfVgroups are
//                  (re)initialised here; the other counters are only added to.
//
// Returns TSDB_CODE_SUCCESS, the error of a failed resume, or the error that
// stopped the file iteration. The reader lock is released on every path.
int32_t tsdbGetFileBlocksDistInfo(STsdbReader* pReader, STableBlockDistInfo* pTableBlockInfo) {
  int32_t code = TSDB_CODE_SUCCESS;
  pTableBlockInfo->totalSize = 0;
  pTableBlockInfo->totalRows = 0;
  pTableBlockInfo->numOfVgroups = 1;

  const int32_t numOfBucket = 20;

  // find the start data block in file
  tsdbAcquireReader(pReader);
  if (pReader->suspended) {
    code = tsdbReaderResume(pReader);
    if (code != TSDB_CODE_SUCCESS) {
      tsdbReleaseReader(pReader);
      return code;
    }
  }
  SReaderStatus* pStatus = &pReader->status;

  STsdbCfg* pc = &pReader->pTsdb->pVnode->config.tsdbCfg;
  pTableBlockInfo->defMinRows = pc->minRows;
  pTableBlockInfo->defMaxRows = pc->maxRows;

  int32_t bucketRange = ceil(((double)(pc->maxRows - pc->minRows)) / numOfBucket);
  if (bucketRange <= 0) {
    // maxRows <= minRows would make the bucket width zero and the division below invalid
    bucketRange = 1;
  }

  pTableBlockInfo->numOfFiles += 1;

  int32_t numOfTables = (int32_t)taosHashGetSize(pStatus->pTableMap);
  int     defaultRows = 4096;

  SDataBlockIter* pBlockIter = &pStatus->blockIter;
  pTableBlockInfo->numOfFiles += pStatus->fileIter.numOfFiles;

  if (pBlockIter->numOfBlocks > 0) {
    pTableBlockInfo->numOfBlocks += pBlockIter->numOfBlocks;
  }

  pTableBlockInfo->numOfTables = numOfTables;
  bool hasNext = (pBlockIter->numOfBlocks > 0);

  while (true) {
    if (hasNext) {
      SDataBlk* pBlock = getCurrentBlock(pBlockIter);

      int32_t numOfRows = pBlock->nRow;
      pTableBlockInfo->totalRows += numOfRows;

      if (numOfRows > pTableBlockInfo->maxRows) {
        pTableBlockInfo->maxRows = numOfRows;
      }

      if (numOfRows < pTableBlockInfo->minRows) {
        pTableBlockInfo->minRows = numOfRows;
      }

      if (numOfRows < defaultRows) {
        pTableBlockInfo->numOfSmallBlocks += 1;
      }

      pTableBlockInfo->totalSize += pBlock->aSubBlock[0].szBlock;

      int32_t bucketIndex = getBucketIndex(pTableBlockInfo->defMinRows, bucketRange, numOfRows, numOfBucket);
      // blocks smaller than minRows or larger than maxRows must not index outside the histogram
      // NOTE(review): assumes blockRowsHisto has at least numOfBucket entries - confirm against its declaration
      if (bucketIndex < 0) {
        bucketIndex = 0;
      } else if (bucketIndex >= numOfBucket) {
        bucketIndex = numOfBucket - 1;
      }
      pTableBlockInfo->blockRowsHisto[bucketIndex]++;

      hasNext = blockIteratorNext(&pStatus->blockIter, pReader->idStr);
    } else {
      // current file is exhausted, move on to the next one
      code = initForFirstBlockInFile(pReader, pBlockIter);
      if ((code != TSDB_CODE_SUCCESS) || (pStatus->loadFromFile == false)) {
        break;
      }

      pTableBlockInfo->numOfBlocks += pBlockIter->numOfBlocks;
      hasNext = (pBlockIter->numOfBlocks > 0);
    }
  }
  tsdbReleaseReader(pReader);
  return code;
}
H
Hongze Cheng 已提交
5150

H
refact  
Hongze Cheng 已提交
5151
// Sums the number of rows held for the reader's tables in the snapshot's
// memtable (pMem) and immutable memtable (pIMem).
//
// pReader: reader whose table map is iterated; it is resumed first if it is
//          suspended. status.pTableIter is used as the iteration cursor.
//
// Returns the row total. If the reader cannot be resumed there is no snapshot
// to count from: 0 is returned and terrno carries the resume error.
int64_t tsdbGetNumOfRowsInMemTable(STsdbReader* pReader) {
  int64_t rows = 0;

  SReaderStatus* pStatus = &pReader->status;
  tsdbAcquireReader(pReader);
  if (pReader->suspended) {
    int32_t code = tsdbReaderResume(pReader);
    if (code != TSDB_CODE_SUCCESS) {
      // pReadSnap would be dereferenced below; bail out instead
      terrno = code;
      tsdbReleaseReader(pReader);
      return rows;
    }
  }

  pStatus->pTableIter = taosHashIterate(pStatus->pTableMap, NULL);

  while (pStatus->pTableIter != NULL) {
    STableBlockScanInfo* pBlockScanInfo = *(STableBlockScanInfo**)pStatus->pTableIter;

    if (pReader->pReadSnap->pMem != NULL) {
      STbData* d = tsdbGetTbDataFromMemTable(pReader->pReadSnap->pMem, pReader->suid, pBlockScanInfo->uid);
      if (d != NULL) {
        rows += tsdbGetNRowsInTbData(d);
      }
    }

    if (pReader->pReadSnap->pIMem != NULL) {
      STbData* di = tsdbGetTbDataFromMemTable(pReader->pReadSnap->pIMem, pReader->suid, pBlockScanInfo->uid);
      if (di != NULL) {
        rows += tsdbGetNRowsInTbData(di);
      }
    }

    // current table is exhausted, let's try the next table
    pStatus->pTableIter = taosHashIterate(pStatus->pTableMap, pStatus->pTableIter);
  }

  tsdbReleaseReader(pReader);

  return rows;
}
D
dapan1121 已提交
5189

L
Liu Jicong 已提交
5190
// Resolves the schema of table `uid`.
//
// pVnode:  vnode whose meta store is queried.
// uid:     table to look up.
// pSchema: out, result of metaGetTbTSchema() for the resolved schema version.
// suid:    out, uid of the super table for a child table, otherwise 0.
//
// For a child table the schema version is read from its super table entry; for
// a normal table from the table's own entry.
// Returns TSDB_CODE_SUCCESS, or the value of terrno after a failure:
// TSDB_CODE_TDB_INVALID_TABLE_ID is stored when a meta lookup fails,
// TSDB_CODE_INVALID_PARA when the table is neither a child nor a normal table.
int32_t tsdbGetTableSchema(SVnode* pVnode, int64_t uid, STSchema** pSchema, int64_t* suid) {
  int32_t     sversion = 1;
  SMetaReader mr = {0};

  metaReaderInit(&mr, pVnode->pMeta, 0);

  if (metaGetTableEntryByUidCache(&mr, uid) != TSDB_CODE_SUCCESS) {
    terrno = TSDB_CODE_TDB_INVALID_TABLE_ID;
    goto _fail;
  }

  *suid = 0;

  if (mr.me.type == TSDB_CHILD_TABLE) {
    // re-use the same meta reader to load the super table entry
    tDecoderClear(&mr.coder);
    *suid = mr.me.ctbEntry.suid;

    if (metaGetTableEntryByUidCache(&mr, *suid) != TSDB_CODE_SUCCESS) {
      terrno = TSDB_CODE_TDB_INVALID_TABLE_ID;
      goto _fail;
    }

    sversion = mr.me.stbEntry.schemaRow.version;
  } else if (mr.me.type == TSDB_NORMAL_TABLE) {
    sversion = mr.me.ntbEntry.schemaRow.version;
  } else {
    terrno = TSDB_CODE_INVALID_PARA;
    goto _fail;
  }

  metaReaderClear(&mr);
  *pSchema = metaGetTbTSchema(pVnode->pMeta, uid, sversion, 1);

  return TSDB_CODE_SUCCESS;

_fail:
  metaReaderClear(&mr);
  return terrno;
}
H
Hongze Cheng 已提交
5227

H
Hongze Cheng 已提交
5228
// Builds a read snapshot for pReader: references the memtable and immutable
// memtable whose version range overlaps the reader's, and references the
// current file system state.
//
// pReader: reader taking the snapshot; registered as the query handle of the
//          memtable query nodes.
// reseek:  callback stored in the query nodes.
// ppSnap:  out, the new snapshot, or NULL on failure.
//
// Returns 0 on success, TSDB_CODE_OUT_OF_MEMORY on allocation failure, or the
// error of tsdbFSRef().
//
// Every step that can fail (node allocation, tsdbFSRef) runs before any
// memtable is referenced. A failure therefore never leaves a freed query node
// registered with a memtable, and cleanup is a plain free.
int32_t tsdbTakeReadSnap(STsdbReader* pReader, _query_reseek_func_t reseek, STsdbReadSnap** ppSnap) {
  int32_t        code = 0;
  STsdb*         pTsdb = pReader->pTsdb;
  SVersionRange* pRange = &pReader->verRange;
  bool           useMem = false;
  bool           useIMem = false;

  // alloc
  STsdbReadSnap* pSnap = (STsdbReadSnap*)taosMemoryCalloc(1, sizeof(*pSnap));
  if (pSnap == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _exit;
  }

  // lock
  taosThreadRwlockRdlock(&pTsdb->rwLock);

  // a memtable takes part only when its version range overlaps the reader's
  useMem = pTsdb->mem && (pRange->minVer <= pTsdb->mem->maxVer && pRange->maxVer >= pTsdb->mem->minVer);
  useIMem = pTsdb->imem && (pRange->minVer <= pTsdb->imem->maxVer && pRange->maxVer >= pTsdb->imem->minVer);

  if (useMem) {
    pSnap->pNode = taosMemoryMalloc(sizeof(*pSnap->pNode));
    if (pSnap->pNode == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto _unlock;
    }
  }

  if (useIMem) {
    pSnap->pINode = taosMemoryMalloc(sizeof(*pSnap->pINode));
    if (pSnap->pINode == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto _unlock;
    }
  }

  // fs
  code = tsdbFSRef(pTsdb, &pSnap->fs);
  if (code) {
    goto _unlock;
  }

  // nothing below can fail: now it is safe to register with the memtables
  if (useMem) {
    pSnap->pMem = pTsdb->mem;
    pSnap->pNode->pQHandle = pReader;
    pSnap->pNode->reseek = reseek;

    tsdbRefMemTable(pTsdb->mem, pSnap->pNode);
  }

  if (useIMem) {
    pSnap->pIMem = pTsdb->imem;
    pSnap->pINode->pQHandle = pReader;
    pSnap->pINode->reseek = reseek;

    tsdbRefMemTable(pTsdb->imem, pSnap->pINode);
  }

_unlock:
  // unlock
  taosThreadRwlockUnlock(&pTsdb->rwLock);

  if (code == 0) {
    tsdbTrace("vgId:%d, take read snapshot", TD_VID(pTsdb->pVnode));
  }

_exit:
  if (code) {
    *ppSnap = NULL;
    if (pSnap) {
      if (pSnap->pNode) taosMemoryFree(pSnap->pNode);
      if (pSnap->pINode) taosMemoryFree(pSnap->pINode);
      taosMemoryFree(pSnap);
    }
  } else {
    *ppSnap = pSnap;
  }
  return code;
}

5298
// Gives back a snapshot obtained from tsdbTakeReadSnap(): drops the memtable
// references it holds, drops the file system reference and frees the snapshot
// together with its query nodes. A NULL pSnap only produces the trace message.
//
// pReader:   reader that owns the snapshot (supplies the tsdb instance).
// pSnap:     snapshot to release; may be NULL.
// proactive: forwarded unchanged to tsdbUnrefMemTable().
void tsdbUntakeReadSnap(STsdbReader* pReader, STsdbReadSnap* pSnap, bool proactive) {
  STsdb* pTsdb = pReader->pTsdb;

  if (pSnap != NULL) {
    // memtable references first ...
    if (pSnap->pMem != NULL) {
      tsdbUnrefMemTable(pSnap->pMem, pSnap->pNode, proactive);
    }
    if (pSnap->pIMem != NULL) {
      tsdbUnrefMemTable(pSnap->pIMem, pSnap->pINode, proactive);
    }

    // ... then the file system reference
    tsdbFSUnref(pTsdb, &pSnap->fs);

    // finally the memory owned by the snapshot
    if (pSnap->pNode != NULL) {
      taosMemoryFree(pSnap->pNode);
    }
    if (pSnap->pINode != NULL) {
      taosMemoryFree(pSnap->pINode);
    }
    taosMemoryFree(pSnap);
  }

  tsdbTrace("vgId:%d, untake read snapshot", TD_VID(pTsdb->pVnode));
}
5317 5318 5319 5320 5321

// Replaces the reader's id string with a private copy of idstr.
// If idstr is NULL or the copy cannot be made, nothing happens and the current
// id is kept. The copy is taken before the old string is freed, so passing the
// reader's current id string is safe.
void tsdbReaderSetId(STsdbReader* pReader, const char* idstr) {
  if (idstr == NULL) {
    return;
  }

  char* newId = taosStrdup(idstr);
  if (newId == NULL) {
    return;
  }

  taosMemoryFreeClear(pReader->idStr);
  pReader->idStr = newId;
}