tsdbRead.c 166.3 KB
Newer Older
H
hjxilinx 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

H
Haojun Liao 已提交
16
#include "osDef.h"
H
Hongze Cheng 已提交
17
#include "tsdb.h"
18
#include "tsimplehash.h"
19

H
Hongze Cheng 已提交
20
// True when the scan order `o` is ascending. The argument is parenthesized so that a compound
// expression (e.g. `a | b` or `c ? x : y`) is compared as a whole instead of being split by `==`.
#define ASCENDING_TRAVERSE(o) ((o) == TSDB_ORDER_ASC)
H
Hongze Cheng 已提交
21

22 23 24 25 26 27
// Tags the kind of rows a reader produces relative to the main query window.
// NOTE(review): presumably PREV/NEXT are the rows directly before/after the window and MAIN is the
// window itself -- confirm against the code that assigns these values.
typedef enum {
  EXTERNAL_ROWS_PREV = 1,
  EXTERNAL_ROWS_MAIN = 2,
  EXTERNAL_ROWS_NEXT = 3,
} EContentData;

28
typedef struct {
dengyihao's avatar
dengyihao 已提交
29
  STbDataIter* iter;
30 31 32 33
  int32_t      index;
  bool         hasVal;
} SIterInfo;

34 35
// Pair of counters: number of data blocks and number of "last" files.
typedef struct {
  int32_t numOfBlocks;
  int32_t numOfLastFiles;
} SBlockNumber;

39
typedef struct SBlockIndex {
40 41
  int32_t     ordinalIndex;
  int64_t     inFileOffset;
H
Haojun Liao 已提交
42
  STimeWindow window;  // todo replace it with overlap flag.
43 44
} SBlockIndex;

H
Haojun Liao 已提交
45
typedef struct STableBlockScanInfo {
dengyihao's avatar
dengyihao 已提交
46 47
  uint64_t  uid;
  TSKEY     lastKey;
48
  TSKEY     lastKeyInStt;       // last accessed key in stt
H
Hongze Cheng 已提交
49
  SMapData  mapData;            // block info (compressed)
50
  SArray*   pBlockList;         // block data index list, SArray<SBlockIndex>
H
Hongze Cheng 已提交
51 52 53 54 55 56
  SIterInfo iter;               // mem buffer skip list iterator
  SIterInfo iiter;              // imem buffer skip list iterator
  SArray*   delSkyline;         // delete info for this table
  int32_t   fileDelIndex;       // file block delete index
  int32_t   lastBlockDelIndex;  // delete index for last block
  bool      iterInit;           // whether to initialize the in-memory skip list iterator or not
H
Haojun Liao 已提交
57 58 59
} STableBlockScanInfo;

// Lightweight (table uid, file offset) pair used when ordering data blocks.
typedef struct SBlockOrderWrapper {
  int64_t uid;
  int64_t offset;
} SBlockOrderWrapper;
H
Hongze Cheng 已提交
63 64

typedef struct SBlockOrderSupporter {
65 66 67 68
  SBlockOrderWrapper** pDataBlockInfo;
  int32_t*             indexPerTable;
  int32_t*             numOfBlocksPerTable;
  int32_t              numOfTables;
H
Hongze Cheng 已提交
69 70 71
} SBlockOrderSupporter;

// Counters and elapsed-time accumulators collected while reading.
// NOTE(review): createScanInfoList is stored in milliseconds (see createDataBlockScanInfo); the unit of
// the other time fields is not visible here.
typedef struct SIOCostSummary {
  int64_t numOfBlocks;
  double  blockLoadTime;
  double  buildmemBlock;
  int64_t headFileLoad;
  double  headFileLoadTime;
  int64_t smaDataLoad;
  double  smaLoadTime;
  int64_t lastBlockLoad;
  double  lastBlockLoadTime;
  int64_t composedBlocks;
  double  buildComposedBlockTime;
  double  createScanInfoList;
  //  double  getTbFromMemTime;
  //  double  getTbFromIMemTime;
  double initDelSkylineIterTime;
} SIOCostSummary;

typedef struct SBlockLoadSuppInfo {
90 91 92 93 94 95 96
  SArray*        pColAgg;
  SColumnDataAgg tsColAgg;
  int16_t*       colId;
  int16_t*       slotId;
  int32_t        numOfCols;
  char**         buildBuf;  // build string tmp buffer, todo remove it later after all string format being updated.
  bool           smaValid;  // the sma on all queried columns are activated
H
Hongze Cheng 已提交
97 98
} SBlockLoadSuppInfo;

99
typedef struct SLastBlockReader {
H
Hongze Cheng 已提交
100 101 102 103 104
  STimeWindow        window;
  SVersionRange      verRange;
  int32_t            order;
  uint64_t           uid;
  SMergeTree         mergeTree;
105
  SSttBlockLoadInfo* pInfo;
106 107
} SLastBlockReader;

108
typedef struct SFilesetIter {
H
Hongze Cheng 已提交
109 110 111
  int32_t           numOfFiles;  // number of total files
  int32_t           index;       // current accessed index in the list
  SArray*           pFileList;   // data file list
112
  int32_t           order;
H
Hongze Cheng 已提交
113
  SLastBlockReader* pLastBlockReader;  // last file block reader
114
} SFilesetIter;
H
Haojun Liao 已提交
115 116

// Identifies one file data block by owning table and position (element type of SDataBlockIter.blockList).
typedef struct SFileDataBlockInfo {
  // index position in STableBlockScanInfo in order to check whether neighbor block overlaps with it
  uint64_t uid;
  int32_t  tbBlockIdx;
} SFileDataBlockInfo;

typedef struct SDataBlockIter {
123
  int32_t   numOfBlocks;
124
  int32_t   index;
H
Hongze Cheng 已提交
125
  SArray*   blockList;  // SArray<SFileDataBlockInfo>
126
  int32_t   order;
H
Hongze Cheng 已提交
127
  SDataBlk  block;  // current SDataBlk data
128
  SHashObj* pTableMap;
H
Haojun Liao 已提交
129 130 131
} SDataBlockIter;

// Progress of dumping the rows of the current file block.
typedef struct SFileBlockDumpInfo {
  int32_t totalRows;
  int32_t rowIndex;
  int64_t lastKey;
  bool    allDumped;
} SFileBlockDumpInfo;

138
// Table uids sorted in ascending order plus a cursor into that list.
typedef struct STableUidList {
  uint64_t* tableUidList;  // access table uid list in uid ascending order list
  int32_t   currentIndex;  // index in table uid list
} STableUidList;
142

H
Haojun Liao 已提交
143
typedef struct SReaderStatus {
H
Hongze Cheng 已提交
144 145 146
  bool                  loadFromFile;       // check file stage
  bool                  composedDataBlock;  // the returned data block is a composed block or not
  SHashObj*             pTableMap;          // SHash<STableBlockScanInfo>
147
  STableBlockScanInfo** pTableIter;         // table iterator used in building in-memory buffer data blocks.
148
  STableUidList         uidList;            // check tables in uid order, to avoid the repeatly load of blocks in STT.
H
Hongze Cheng 已提交
149 150 151 152 153
  SFileBlockDumpInfo    fBlockDumpInfo;
  SDFileSet*            pCurrentFileset;  // current opened file set
  SBlockData            fileBlockData;
  SFilesetIter          fileIter;
  SDataBlockIter        blockIter;
H
Haojun Liao 已提交
154 155
} SReaderStatus;

156
typedef struct SBlockInfoBuf {
H
Hongze Cheng 已提交
157 158 159
  int32_t currentIndex;
  SArray* pData;
  int32_t numPerBucket;
D
dapan1121 已提交
160
  int32_t numOfTables;
161 162
} SBlockInfoBuf;

H
Hongze Cheng 已提交
163
struct STsdbReader {
H
Haojun Liao 已提交
164
  STsdb*             pTsdb;
165 166 167
  SVersionRange      verRange;
  TdThreadMutex      readerMutex;
  bool               suspended;
H
Haojun Liao 已提交
168 169
  uint64_t           suid;
  int16_t            order;
H
Haojun Liao 已提交
170
  bool               freeBlock;
H
Haojun Liao 已提交
171 172 173 174
  STimeWindow        window;  // the primary query time window that applies to all queries
  SSDataBlock*       pResBlock;
  int32_t            capacity;
  SReaderStatus      status;
175 176
  char*              idStr;  // query info handle, for debug purpose
  int32_t            type;   // query type: 1. retrieve all data blocks, 2. retrieve direct prev|next rows
H
Hongze Cheng 已提交
177
  SBlockLoadSuppInfo suppInfo;
H
Hongze Cheng 已提交
178
  STsdbReadSnap*     pReadSnap;
179
  SIOCostSummary     cost;
180
  STSchema*          pSchema;      // the newest version schema
181 182
//  STSchema*          pMemSchema;   // the previous schema for in-memory data, to avoid load schema too many times
  SSHashObj*         pSchemaMap;   // keep the retrieved schema info, to avoid the overhead by repeatly load schema
183 184 185
  SDataFReader*      pFileReader;  // the file reader
  SDelFReader*       pDelFReader;  // the del file reader
  SArray*            pDelIdx;      // del file block index;
186 187 188
  SBlockInfoBuf      blockInfoBuf;
  int32_t            step;
  STsdbReader*       innerReader[2];
H
Hongze Cheng 已提交
189
};
H
Hongze Cheng 已提交
190

H
Haojun Liao 已提交
191
static SFileDataBlockInfo* getCurrentBlockInfo(SDataBlockIter* pBlockIter);
192 193
static int      buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, int64_t endKey, int32_t capacity,
                                          STsdbReader* pReader);
194
static TSDBROW* getValidMemRow(SIterInfo* pIter, const SArray* pDelList, STsdbReader* pReader);
195 196
static int32_t  doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pScanInfo, STsdbReader* pReader,
                                        SRowMerger* pMerger);
H
Hongze Cheng 已提交
197
static int32_t  doMergeRowsInLastBlock(SLastBlockReader* pLastBlockReader, STableBlockScanInfo* pScanInfo, int64_t ts,
198
                                       SRowMerger* pMerger, SVersionRange* pVerRange, const char* id);
199
static int32_t  doMergeRowsInBuf(SIterInfo* pIter, uint64_t uid, int64_t ts, SArray* pDelList, SRowMerger* pMerger,
dengyihao's avatar
dengyihao 已提交
200
                                 STsdbReader* pReader);
H
Hongze Cheng 已提交
201
static int32_t  doAppendRowFromTSRow(SSDataBlock* pBlock, STsdbReader* pReader, SRow* pTSRow,
H
Hongze Cheng 已提交
202
                                     STableBlockScanInfo* pInfo);
203
static int32_t  doAppendRowFromFileBlock(SSDataBlock* pResBlock, STsdbReader* pReader, SBlockData* pBlockData,
H
Hongze Cheng 已提交
204
                                         int32_t rowIndex);
205
static void     setComposedBlockFlag(STsdbReader* pReader, bool composed);
H
Hongze Cheng 已提交
206 207
static bool     hasBeenDropped(const SArray* pDelList, int32_t* index, TSDBKEY* pKey, int32_t order,
                               SVersionRange* pVerRange);
208

H
Hongze Cheng 已提交
209
static int32_t doMergeMemTableMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter, SArray* pDelList,
210
                                        TSDBROW* pTSRow, STsdbReader* pReader, bool* freeTSRow);
H
Hongze Cheng 已提交
211
static int32_t doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo* pBlockScanInfo,
H
Hongze Cheng 已提交
212
                                  STsdbReader* pReader, SRow** pTSRow);
213 214
static int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pBlockScanInfo, int64_t key,
                                     STsdbReader* pReader);
215

dengyihao's avatar
dengyihao 已提交
216 217 218 219
static int32_t initDelSkylineIterator(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader, STbData* pMemTbData,
                                      STbData* piMemTbData);
static STsdb*  getTsdbByRetentions(SVnode* pVnode, TSKEY winSKey, SRetention* retentions, const char* idstr,
                                   int8_t* pLevel);
220
static SVersionRange getQueryVerRange(SVnode* pVnode, SQueryTableDataCond* pCond, int8_t level);
H
Hongze Cheng 已提交
221 222 223
static int64_t       getCurrentKeyInLastBlock(SLastBlockReader* pLastBlockReader);
static bool          hasDataInLastBlock(SLastBlockReader* pLastBlockReader);
static int32_t       doBuildDataBlock(STsdbReader* pReader);
C
Cary Xu 已提交
224
static TSDBKEY       getCurrentKeyInBuf(STableBlockScanInfo* pScanInfo, STsdbReader* pReader);
225
static bool          hasDataInFileBlock(const SBlockData* pBlockData, const SFileBlockDumpInfo* pDumpInfo);
226
static void          initBlockDumpInfo(STsdbReader* pReader, SDataBlockIter* pBlockIter);
227
static int32_t       getInitialDelIndex(const SArray* pDelSkyline, int32_t order);
C
Cary Xu 已提交
228

H
Haojun Liao 已提交
229 230
static STableBlockScanInfo* getTableBlockScanInfo(SHashObj* pTableMap, uint64_t uid, const char* id);

231 232
static FORCE_INLINE STSchema* getLatestTableSchema(STsdbReader* pReader, uint64_t uid);

C
Cary Xu 已提交
233
// Returns true when ts lies outside the closed interval [pWindow->skey, pWindow->ekey].
static bool outOfTimeWindow(int64_t ts, STimeWindow* pWindow) {
  bool inside = (ts >= pWindow->skey) && (ts <= pWindow->ekey);
  return !inside;
}
H
Haojun Liao 已提交
234

235 236
// Records the queried column ids and slot ids in pSupInfo and prepares per-column scratch buffers.
//
// colId, slotId and buildBuf share ONE allocation whose base address is pSupInfo->colId; every
// variable-length column additionally owns a separate buildBuf[i] of pCols[i].bytes bytes, all other
// entries are NULL.
//
// Returns TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY. On failure everything allocated here is
// released and pSupInfo is left with numOfCols == 0 and NULL arrays, so a later cleanup pass can
// neither walk a half-built buildBuf nor free memory twice.
//
// NOTE(review): buildBuf starts 4 * numOfCols bytes past the allocation base, so for an odd numOfCols
// the pointer array is not pointer-aligned -- worth confirming this is acceptable on all targets.
static int32_t setColumnIdSlotList(SBlockLoadSuppInfo* pSupInfo, SColumnInfo* pCols, const int32_t* pSlotIdList,
                                   int32_t numOfCols) {
  pSupInfo->smaValid = true;
  pSupInfo->numOfCols = numOfCols;
  pSupInfo->colId = taosMemoryMalloc(numOfCols * (sizeof(int16_t) * 2 + POINTER_BYTES));
  if (pSupInfo->colId == NULL) {
    pSupInfo->slotId = NULL;
    pSupInfo->buildBuf = NULL;
    pSupInfo->numOfCols = 0;
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  pSupInfo->slotId = (int16_t*)((char*)pSupInfo->colId + (sizeof(int16_t) * numOfCols));
  pSupInfo->buildBuf = (char**)((char*)pSupInfo->slotId + (sizeof(int16_t) * numOfCols));
  for (int32_t i = 0; i < numOfCols; ++i) {
    pSupInfo->colId[i] = pCols[i].colId;
    pSupInfo->slotId[i] = pSlotIdList[i];
    pSupInfo->buildBuf[i] = NULL;

    if (!IS_VAR_DATA_TYPE(pCols[i].type)) {
      continue;
    }

    pSupInfo->buildBuf[i] = taosMemoryMalloc(pCols[i].bytes);
    if (pSupInfo->buildBuf[i] == NULL) {
      // roll back the scratch buffers created so far, then the shared allocation itself
      for (int32_t k = 0; k < i; ++k) {
        taosMemoryFree(pSupInfo->buildBuf[k]);
      }
      taosMemoryFree(pSupInfo->colId);
      pSupInfo->colId = NULL;
      pSupInfo->slotId = NULL;
      pSupInfo->buildBuf = NULL;
      pSupInfo->numOfCols = 0;
      return TSDB_CODE_OUT_OF_MEMORY;
    }
  }

  return TSDB_CODE_SUCCESS;
}
H
Hongze Cheng 已提交
260

H
Haojun Liao 已提交
261
// Walks the schema columns and the queried column ids in lock-step (both are expected in ascending
// colId order) and clears pSupInfo->smaValid as soon as one queried column has block-SMA turned off.
//
// Returns TSDB_CODE_INVALID_PARA when a queried column id is smaller than the current schema column
// id, i.e. it cannot be matched any more; TSDB_CODE_SUCCESS otherwise.
static int32_t updateBlockSMAInfo(STSchema* pSchema, SBlockLoadSuppInfo* pSupInfo) {
  int32_t schemaIdx = 0;
  int32_t queryIdx = 0;

  while (schemaIdx < pSchema->numOfCols && queryIdx < pSupInfo->numOfCols) {
    STColumn* pTCol = &pSchema->columns[schemaIdx];

    if (pTCol->colId > pSupInfo->colId[queryIdx]) {
      return TSDB_CODE_INVALID_PARA;
    }

    if (pTCol->colId < pSupInfo->colId[queryIdx]) {
      // schema column is not queried, skip it
      schemaIdx += 1;
      continue;
    }

    // ids match: the column is queried
    if (!IS_BSMA_ON(pTCol)) {
      pSupInfo->smaValid = false;
      return TSDB_CODE_SUCCESS;
    }

    schemaIdx += 1;
    queryIdx += 1;
  }

  return TSDB_CODE_SUCCESS;
}

285
// Allocates zeroed buckets able to hold numOfTables STableBlockScanInfo entries: as many full buckets
// of pBuf->numPerBucket entries as fit, plus one smaller bucket for the remainder. Bucket pointers
// are appended to pBuf->pData, which is created on first use.
//
// Returns TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY. On failure the buckets already appended stay
// in pBuf->pData (clearBlockScanInfoBuf releases them) and pBuf->numOfTables is not updated.
static int32_t initBlockScanInfoBuf(SBlockInfoBuf* pBuf, int32_t numOfTables) {
  int32_t num = numOfTables / pBuf->numPerBucket;
  int32_t remainder = numOfTables % pBuf->numPerBucket;
  if (pBuf->pData == NULL) {
    pBuf->pData = taosArrayInit(num + 1, POINTER_BYTES);
    if (pBuf->pData == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }
  }

  for (int32_t i = 0; i <= num; ++i) {
    // the first `num` buckets are full, the optional trailing one holds the remainder
    int32_t numInBucket = (i < num) ? pBuf->numPerBucket : remainder;
    if (numInBucket == 0) {
      break;
    }

    char* p = taosMemoryCalloc(numInBucket, sizeof(STableBlockScanInfo));
    if (p == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }

    if (taosArrayPush(pBuf->pData, &p) == NULL) {
      taosMemoryFree(p);  // not owned by the array, release it here
      return TSDB_CODE_OUT_OF_MEMORY;
    }
  }

  pBuf->numOfTables = numOfTables;

  return TSDB_CODE_SUCCESS;
}

// Grows the bucket arena so that it provides at least numOfTables entries; does nothing when it is
// already large enough.
//
// The last existing bucket is dropped first, and the missing capacity is then appended as full
// buckets plus one remainder bucket. After the drop, the capacity is recomputed from the number of
// buckets that are left (each counted as numPerBucket entries); dividing the old table count by
// numPerBucket would yield a bucket count, not a table count, and corrupt the size arithmetic below.
//
// Returns TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY. On failure pBuf->numOfTables reflects only
// the buckets known to be complete.
static int32_t ensureBlockScanInfoBuf(SBlockInfoBuf* pBuf, int32_t numOfTables) {
  if (numOfTables <= pBuf->numOfTables) {
    return TSDB_CODE_SUCCESS;
  }

  if (pBuf->numOfTables > 0) {
    STableBlockScanInfo** p = (STableBlockScanInfo**)taosArrayPop(pBuf->pData);
    if (p != NULL) {
      taosMemoryFree(*p);
    }
    pBuf->numOfTables = (int32_t)taosArrayGetSize(pBuf->pData) * pBuf->numPerBucket;
  }

  int32_t num = (numOfTables - pBuf->numOfTables) / pBuf->numPerBucket;
  int32_t remainder = (numOfTables - pBuf->numOfTables) % pBuf->numPerBucket;
  if (pBuf->pData == NULL) {
    pBuf->pData = taosArrayInit(num + 1, POINTER_BYTES);
    if (pBuf->pData == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }
  }

  for (int32_t i = 0; i <= num; ++i) {
    // the first `num` buckets are full, the optional trailing one holds the remainder
    int32_t numInBucket = (i < num) ? pBuf->numPerBucket : remainder;
    if (numInBucket == 0) {
      break;
    }

    char* p = taosMemoryCalloc(numInBucket, sizeof(STableBlockScanInfo));
    if (p == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }

    if (taosArrayPush(pBuf->pData, &p) == NULL) {
      taosMemoryFree(p);  // not owned by the array, release it here
      return TSDB_CODE_OUT_OF_MEMORY;
    }
  }

  pBuf->numOfTables = numOfTables;

  return TSDB_CODE_SUCCESS;
}
H
Hongze Cheng 已提交
352

353 354
// Frees every bucket and the bucket array itself, and resets the buffer to its empty state so that a
// later initBlockScanInfoBuf/ensureBlockScanInfoBuf call starts from a NULL pData instead of a
// dangling pointer. numPerBucket is kept.
static void clearBlockScanInfoBuf(SBlockInfoBuf* pBuf) {
  size_t num = taosArrayGetSize(pBuf->pData);
  for (size_t i = 0; i < num; ++i) {
    char** p = taosArrayGet(pBuf->pData, i);
    taosMemoryFree(*p);
  }

  pBuf->pData = taosArrayDestroy(pBuf->pData);
  pBuf->numOfTables = 0;
}

// Returns the address of the index-th STableBlockScanInfo slot: bucket index / numPerBucket, slot
// index % numPerBucket inside it. No bounds check is performed; the caller must have sized the buffer
// for `index` beforehand.
static void* getPosInBlockInfoBuf(SBlockInfoBuf* pBuf, int32_t index) {
  int32_t bucketIndex = index / pBuf->numPerBucket;
  int32_t slotInBucket = index % pBuf->numPerBucket;

  char** pBucket = taosArrayGet(pBuf->pData, bucketIndex);
  return (*pBucket) + slotInBucket * sizeof(STableBlockScanInfo);
}

H
Haojun Liao 已提交
369 370 371 372 373 374 375 376 377 378
// Three-way comparison of two uint64_t values passed by address (sort callback).
// Returns -1, 0 or 1 when *p1 is smaller than, equal to or greater than *p2.
static int32_t uidComparFunc(const void* p1, const void* p2) {
  const uint64_t lhs = *(const uint64_t*)p1;
  const uint64_t rhs = *(const uint64_t*)p2;

  if (lhs < rhs) {
    return -1;
  }

  return (lhs > rhs) ? 1 : 0;
}

379
// NOTE: speedup the whole processing by preparing the buffer for STableBlockScanInfo in batch model
H
Hongze Cheng 已提交
380
static SHashObj* createDataBlockScanInfo(STsdbReader* pTsdbReader, SBlockInfoBuf* pBuf, const STableKeyInfo* idList,
X
Xiaoyu Wang 已提交
381
                                         STableUidList* pUidList, int32_t numOfTables) {
H
Haojun Liao 已提交
382
  // allocate buffer in order to load data blocks from file
383
  // todo use simple hash instead, optimize the memory consumption
384 385 386
  SHashObj* pTableMap =
      taosHashInit(numOfTables, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
  if (pTableMap == NULL) {
H
Haojun Liao 已提交
387 388 389
    return NULL;
  }

H
Haojun Liao 已提交
390
  int64_t st = taosGetTimestampUs();
H
Haojun Liao 已提交
391
  initBlockScanInfoBuf(pBuf, numOfTables);
H
Haojun Liao 已提交
392

H
Haojun Liao 已提交
393 394
  pUidList->tableUidList = taosMemoryMalloc(numOfTables * sizeof(uint64_t));
  if (pUidList->tableUidList == NULL) {
H
Haojun Liao 已提交
395
    taosHashCleanup(pTableMap);
H
Haojun Liao 已提交
396 397
    return NULL;
  }
H
Haojun Liao 已提交
398

H
Haojun Liao 已提交
399
  pUidList->currentIndex = 0;
H
Haojun Liao 已提交
400

401
  for (int32_t j = 0; j < numOfTables; ++j) {
H
Haojun Liao 已提交
402
    STableBlockScanInfo* pScanInfo = getPosInBlockInfoBuf(pBuf, j);
H
Haojun Liao 已提交
403

404
    pScanInfo->uid = idList[j].uid;
H
Haojun Liao 已提交
405
    pUidList->tableUidList[j] = idList[j].uid;
H
Haojun Liao 已提交
406

407
    if (ASCENDING_TRAVERSE(pTsdbReader->order)) {
H
Haojun Liao 已提交
408
      int64_t skey = pTsdbReader->window.skey;
409
      pScanInfo->lastKey = (skey > INT64_MIN) ? (skey - 1) : skey;
H
Haojun Liao 已提交
410
      pScanInfo->lastKeyInStt = skey;
wmmhello's avatar
wmmhello 已提交
411
    } else {
H
Haojun Liao 已提交
412
      int64_t ekey = pTsdbReader->window.ekey;
413
      pScanInfo->lastKey = (ekey < INT64_MAX) ? (ekey + 1) : ekey;
H
Haojun Liao 已提交
414
      pScanInfo->lastKeyInStt = ekey;
H
Haojun Liao 已提交
415
    }
wmmhello's avatar
wmmhello 已提交
416

417
    taosHashPut(pTableMap, &pScanInfo->uid, sizeof(uint64_t), &pScanInfo, POINTER_BYTES);
H
Hongze Cheng 已提交
418 419
    tsdbTrace("%p check table uid:%" PRId64 " from lastKey:%" PRId64 " %s", pTsdbReader, pScanInfo->uid,
              pScanInfo->lastKey, pTsdbReader->idStr);
H
Haojun Liao 已提交
420 421
  }

H
Haojun Liao 已提交
422
  taosSort(pUidList->tableUidList, numOfTables, sizeof(uint64_t), uidComparFunc);
H
Haojun Liao 已提交
423

H
Haojun Liao 已提交
424 425 426 427
  pTsdbReader->cost.createScanInfoList = (taosGetTimestampUs() - st) / 1000.0;
  tsdbDebug("%p create %d tables scan-info, size:%.2f Kb, elapsed time:%.2f ms, %s", pTsdbReader, numOfTables,
            (sizeof(STableBlockScanInfo) * numOfTables) / 1024.0, pTsdbReader->cost.createScanInfoList,
            pTsdbReader->idStr);
428

429
  return pTableMap;
H
Hongze Cheng 已提交
430
}
H
Hongze Cheng 已提交
431

432
// Rewinds every table's scan state in pTableMap: destroys the mem/imem iterators and the delete
// skyline, then restarts from lastKey = ts and lastKeyInStt = ts + step.
static void resetAllDataBlockScanInfo(SHashObj* pTableMap, int64_t ts, int32_t step) {
  void* pEntry = taosHashIterate(pTableMap, NULL);

  while (pEntry != NULL) {
    // map values are pointers to the scan info objects
    STableBlockScanInfo* pInfo = *(STableBlockScanInfo**)pEntry;

    pInfo->iterInit = false;

    pInfo->iter.hasVal = false;
    if (pInfo->iter.iter != NULL) {
      pInfo->iter.iter = tsdbTbDataIterDestroy(pInfo->iter.iter);
    }

    pInfo->iiter.hasVal = false;
    if (pInfo->iiter.iter != NULL) {
      pInfo->iiter.iter = tsdbTbDataIterDestroy(pInfo->iiter.iter);
    }

    pInfo->delSkyline = taosArrayDestroy(pInfo->delSkyline);
    pInfo->lastKey = ts;
    pInfo->lastKeyInStt = ts + step;

    pEntry = taosHashIterate(pTableMap, pEntry);
  }
}

455 456
// Releases everything one scan info owns: both in-memory iterators, the delete skyline, the block
// index list and the compressed block map. The object itself is not freed.
static void clearBlockScanInfo(STableBlockScanInfo* p) {
  p->iterInit = false;

  p->iter.hasVal = false;
  if (p->iter.iter != NULL) {
    p->iter.iter = tsdbTbDataIterDestroy(p->iter.iter);
  }

  p->iiter.hasVal = false;
  if (p->iiter.iter != NULL) {
    p->iiter.iter = tsdbTbDataIterDestroy(p->iiter.iter);
  }

  p->delSkyline = taosArrayDestroy(p->delSkyline);
  p->pBlockList = taosArrayDestroy(p->pBlockList);
  tMapDataClear(&p->mapData);
}
473

H
Haojun Liao 已提交
474
// Clears every scan info referenced by the map, then destroys the map itself.
static void destroyAllBlockScanInfo(SHashObj* pTableMap) {
  for (void* pEntry = taosHashIterate(pTableMap, NULL); pEntry != NULL;
       pEntry = taosHashIterate(pTableMap, pEntry)) {
    STableBlockScanInfo* pInfo = *(STableBlockScanInfo**)pEntry;
    clearBlockScanInfo(pInfo);
  }

  taosHashCleanup(pTableMap);
}

483
// A window is empty when its start key lies after its end key.
static bool isEmptyQueryTimeWindow(STimeWindow* pWindow) {
  bool nonEmpty = (pWindow->skey <= pWindow->ekey);
  return !nonEmpty;
}
H
Hongze Cheng 已提交
484

485 486 487
// Clamp the start of the query time window according to the data retention (keep2) setting, so that
// expired data is never returned to the client even if it is still present on disk.
// Returns the adjusted copy; *pWindow itself is not modified.
static STimeWindow updateQueryTimeWindow(STsdb* pTsdb, STimeWindow* pWindow) {
  STsdbKeepCfg* pCfg = &pTsdb->keepCfg;

  int64_t now = taosGetTimestamp(pCfg->precision);
  int64_t earliestTs = now - (tsTickPerMin[pCfg->precision] * pCfg->keep2) + 1;  // needs to add one tick

  STimeWindow win = *pWindow;
  win.skey = (win.skey < earliestTs) ? earliestTs : win.skey;

  return win;
}
H
Hongze Cheng 已提交
500

H
Haojun Liao 已提交
501
// init file iterator
502
static int32_t initFilesetIterator(SFilesetIter* pIter, SArray* aDFileSet, STsdbReader* pReader) {
H
Hongze Cheng 已提交
503
  size_t numOfFileset = taosArrayGetSize(aDFileSet);
504

505 506
  pIter->index = ASCENDING_TRAVERSE(pReader->order) ? -1 : numOfFileset;
  pIter->order = pReader->order;
H
Hongze Cheng 已提交
507
  pIter->pFileList = aDFileSet;
508
  pIter->numOfFiles = numOfFileset;
H
Haojun Liao 已提交
509

510 511 512 513
  if (pIter->pLastBlockReader == NULL) {
    pIter->pLastBlockReader = taosMemoryCalloc(1, sizeof(struct SLastBlockReader));
    if (pIter->pLastBlockReader == NULL) {
      int32_t code = TSDB_CODE_OUT_OF_MEMORY;
514
      tsdbError("failed to prepare the last block iterator, since:%s %s", tstrerror(code), pReader->idStr);
515 516
      return code;
    }
517 518
  }

519 520 521 522 523 524 525 526
  SLastBlockReader* pLReader = pIter->pLastBlockReader;
  pLReader->order = pReader->order;
  pLReader->window = pReader->window;
  pLReader->verRange = pReader->verRange;

  pLReader->uid = 0;
  tMergeTreeClose(&pLReader->mergeTree);

527
  if (pLReader->pInfo == NULL) {
528
    // here we ignore the first column, which is always be the primary timestamp column
529 530 531
    SBlockLoadSuppInfo* pInfo = &pReader->suppInfo;

    int32_t numOfStt = pReader->pTsdb->pVnode->config.sttTrigger;
X
Xiaoyu Wang 已提交
532
    pLReader->pInfo = tCreateLastBlockLoadInfo(pReader->pSchema, &pInfo->colId[1], pInfo->numOfCols - 1, numOfStt);
H
Haojun Liao 已提交
533 534 535 536
    if (pLReader->pInfo == NULL) {
      tsdbDebug("init fileset iterator failed, code:%s %s", tstrerror(terrno), pReader->idStr);
      return terrno;
    }
537 538
  }

539
  tsdbDebug("init fileset iterator, total files:%d %s", pIter->numOfFiles, pReader->idStr);
H
Haojun Liao 已提交
540 541 542
  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
543
// Advances the iterator to the next file set whose key range overlaps the reader's query window and
// opens it as pReader->pFileReader.
//
// *hasNext is true when such a file set was opened, and false when the list is exhausted, when the
// remaining files lie entirely past the window in scan direction, or when opening a file failed.
// Before moving on, the load statistics of the last-block reader are folded into pReader->cost and
// its merge tree / load info are reset.
//
// Returns TSDB_CODE_SUCCESS, or the error code of tsdbDataFReaderOpen.
static int32_t filesetIteratorNext(SFilesetIter* pIter, STsdbReader* pReader, bool* hasNext) {
  bool    asc = ASCENDING_TRAVERSE(pIter->order);
  int32_t step = asc ? 1 : -1;
  int32_t code = 0;

  pIter->index += step;
  if ((asc && pIter->index >= pIter->numOfFiles) || ((!asc) && pIter->index < 0)) {
    *hasNext = false;
    return TSDB_CODE_SUCCESS;
  }

  SIOCostSummary* pSum = &pReader->cost;
  getLastBlockLoadInfo(pIter->pLastBlockReader->pInfo, &pSum->lastBlockLoad, &pReader->cost.lastBlockLoadTime);

  pIter->pLastBlockReader->uid = 0;
  tMergeTreeClose(&pIter->pLastBlockReader->mergeTree);
  resetLastBlockLoadInfo(pIter->pLastBlockReader->pInfo);

  // check file the time range of coverage
  STimeWindow win = {0};

  for (;;) {
    // drop the reader of the previously visited file set, if any
    if (pReader->pFileReader != NULL) {
      tsdbDataFReaderClose(&pReader->pFileReader);
    }

    pReader->status.pCurrentFileset = (SDFileSet*)taosArrayGet(pIter->pFileList, pIter->index);

    code = tsdbDataFReaderOpen(&pReader->pFileReader, pReader->pTsdb, pReader->status.pCurrentFileset);
    if (code != TSDB_CODE_SUCCESS) {
      *hasNext = false;
      return code;
    }

    pReader->cost.headFileLoad += 1;

    int32_t fid = pReader->status.pCurrentFileset->fid;
    tsdbFidKeyRange(fid, pReader->pTsdb->keepCfg.days, pReader->pTsdb->keepCfg.precision, &win.skey, &win.ekey);

    // current file are no longer overlapped with query time window, ignore remain files
    if ((asc && win.skey > pReader->window.ekey) || (!asc && win.ekey < pReader->window.skey)) {
      tsdbDebug("%p remain files are not qualified for qrange:%" PRId64 "-%" PRId64 ", ignore, %s", pReader,
                pReader->window.skey, pReader->window.ekey, pReader->idStr);
      *hasNext = false;
      return TSDB_CODE_SUCCESS;
    }

    // the file ends before the window begins (in scan direction): try the next one
    if ((asc && (win.ekey < pReader->window.skey)) || ((!asc) && (win.skey > pReader->window.ekey))) {
      pIter->index += step;
      if ((asc && pIter->index >= pIter->numOfFiles) || ((!asc) && pIter->index < 0)) {
        *hasNext = false;
        return TSDB_CODE_SUCCESS;
      }
      continue;
    }

    tsdbDebug("%p file found fid:%d for qrange:%" PRId64 "-%" PRId64 ", %s", pReader, fid, pReader->window.skey,
              pReader->window.ekey, pReader->idStr);
    *hasNext = true;
    return TSDB_CODE_SUCCESS;
  }
}

609
// Puts the block iterator back to its "before first block" state for the given order. The block list
// is created on first use (initial capacity 4) and merely emptied afterwards.
static void resetDataBlockIterator(SDataBlockIter* pIter, int32_t order) {
  pIter->order = order;
  pIter->index = -1;
  pIter->numOfBlocks = 0;

  if (pIter->blockList != NULL) {
    taosArrayClear(pIter->blockList);
    return;
  }

  pIter->blockList = taosArrayInit(4, sizeof(SFileDataBlockInfo));
}

L
Liu Jicong 已提交
620
// Destroys the iterator's block list. The blockList pointer itself is left unchanged.
static void cleanupDataBlockIterator(SDataBlockIter* pIter) {
  SArray* pList = pIter->blockList;
  taosArrayDestroy(pList);
}
H
Haojun Liao 已提交
621

H
Haojun Liao 已提交
622
// Initial reader status: start in the file-loading stage with no table iterator.
static void initReaderStatus(SReaderStatus* pStatus) {
  pStatus->loadFromFile = true;
  pStatus->pTableIter = NULL;
}

627 628 629 630 631 632 633 634
// Creates a result block with one column per entry of pCond->colList and room for `capacity` rows.
//
// Returns the block, or NULL with terrno set (TSDB_CODE_OUT_OF_MEMORY when the block itself cannot be
// created, otherwise the error reported by blockDataEnsureCapacity).
static SSDataBlock* createResBlock(SQueryTableDataCond* pCond, int32_t capacity) {
  SSDataBlock* pResBlock = createDataBlock();
  if (pResBlock == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  for (int32_t i = 0; i < pCond->numOfCols; ++i) {
    SColumnInfoData colInfo = {0};
    colInfo.info = pCond->colList[i];
    blockDataAppendColInfo(pResBlock, &colInfo);
  }

  int32_t code = blockDataEnsureCapacity(pResBlock, capacity);
  if (code != TSDB_CODE_SUCCESS) {
    terrno = code;
    // the block already owns its column array (and possibly column buffers); freeing only the
    // struct would leak them
    blockDataDestroy(pResBlock);
    return NULL;
  }
  return pResBlock;
}

649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664 665 666 667 668 669 670 671 672 673 674 675 676 677 678 679 680 681 682 683 684 685 686 687 688 689 690 691 692 693 694 695 696 697 698 699 700 701 702 703
// Initializes the reader mutex, tracing before and after; returns the result of taosThreadMutexInit.
static int32_t tsdbInitReaderLock(STsdbReader* pReader) {
  int32_t code = -1;  // traced as the "not yet attempted" value
  qTrace("tsdb/read: %p, pre-init read mutex: %p, code: %d", pReader, &pReader->readerMutex, code);

  code = taosThreadMutexInit(&pReader->readerMutex, NULL);
  qTrace("tsdb/read: %p, post-init read mutex: %p, code: %d", pReader, &pReader->readerMutex, code);
  return code;
}

// Destroys the reader mutex, tracing before and after; returns the result of taosThreadMutexDestroy.
static int32_t tsdbUninitReaderLock(STsdbReader* pReader) {
  int32_t code = -1;  // traced as the "not yet attempted" value
  qTrace("tsdb/read: %p, pre-uninit read mutex: %p, code: %d", pReader, &pReader->readerMutex, code);

  code = taosThreadMutexDestroy(&pReader->readerMutex);
  qTrace("tsdb/read: %p, post-uninit read mutex: %p, code: %d", pReader, &pReader->readerMutex, code);
  return code;
}

// Locks the reader mutex, tracing before and after; returns the result of taosThreadMutexLock.
static int32_t tsdbAcquireReader(STsdbReader* pReader) {
  int32_t code = -1;  // traced as the "not yet attempted" value
  qTrace("tsdb/read: %p, pre-take read mutex: %p, code: %d", pReader, &pReader->readerMutex, code);

  code = taosThreadMutexLock(&pReader->readerMutex);
  qTrace("tsdb/read: %p, post-take read mutex: %p, code: %d", pReader, &pReader->readerMutex, code);
  return code;
}

// Tries to lock the reader mutex, tracing before and after; returns the result of
// taosThreadMutexTryLock.
static int32_t tsdbTryAcquireReader(STsdbReader* pReader) {
  int32_t code = -1;  // traced as the "not yet attempted" value
  qTrace("tsdb/read: %p, pre-trytake read mutex: %p, code: %d", pReader, &pReader->readerMutex, code);

  code = taosThreadMutexTryLock(&pReader->readerMutex);
  qTrace("tsdb/read: %p, post-trytake read mutex: %p, code: %d", pReader, &pReader->readerMutex, code);
  return code;
}

// Unlocks the reader mutex, tracing before and after; returns the result of taosThreadMutexUnlock.
static int32_t tsdbReleaseReader(STsdbReader* pReader) {
  int32_t code = -1;  // traced as the "not yet attempted" value
  qTrace("tsdb/read: %p, pre-untake read mutex: %p, code: %d", pReader, &pReader->readerMutex, code);

  code = taosThreadMutexUnlock(&pReader->readerMutex);
  qTrace("tsdb/read: %p, post-untake read mutex: %p, code: %d", pReader, &pReader->readerMutex, code);
  return code;
}

704 705 706 707 708 709
// Releases the reader mutex unless the current result is a composed data block.
// NOTE(review): presumably the mutex is only held while a non-composed block is handed out --
// confirm against the code that acquires it.
void tsdbReleaseDataBlock(STsdbReader* pReader) {
  if (pReader->status.composedDataBlock) {
    return;
  }

  tsdbReleaseReader(pReader);
}
710

711
static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, STsdbReader** ppReader, int32_t capacity,
H
Haojun Liao 已提交
712
                                SSDataBlock* pResBlock, const char* idstr) {
H
Haojun Liao 已提交
713
  int32_t      code = 0;
714
  int8_t       level = 0;
H
Haojun Liao 已提交
715
  STsdbReader* pReader = (STsdbReader*)taosMemoryCalloc(1, sizeof(*pReader));
H
Hongze Cheng 已提交
716 717
  if (pReader == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
H
Haojun Liao 已提交
718
    goto _end;
H
Hongze Cheng 已提交
719 720
  }

C
Cary Xu 已提交
721
  if (VND_IS_TSMA(pVnode)) {
H
Haojun Liao 已提交
722
    tsdbDebug("vgId:%d, tsma is selected to query, %s", TD_VID(pVnode), idstr);
C
Cary Xu 已提交
723 724
  }

H
Haojun Liao 已提交
725
  initReaderStatus(&pReader->status);
726

L
Liu Jicong 已提交
727
  pReader->pTsdb = getTsdbByRetentions(pVnode, pCond->twindows.skey, pVnode->config.tsdbCfg.retentions, idstr, &level);
dengyihao's avatar
dengyihao 已提交
728 729
  pReader->suid = pCond->suid;
  pReader->order = pCond->order;
730
  pReader->capacity = capacity;
H
Haojun Liao 已提交
731
  pReader->pResBlock = pResBlock;
732
  pReader->idStr = (idstr != NULL) ? taosStrdup(idstr) : NULL;
dengyihao's avatar
dengyihao 已提交
733
  pReader->verRange = getQueryVerRange(pVnode, pCond, level);
734
  pReader->type = pCond->type;
735
  pReader->window = updateQueryTimeWindow(pReader->pTsdb, &pCond->twindows);
H
Hongze Cheng 已提交
736
  pReader->blockInfoBuf.numPerBucket = 1000;  // 1000 tables per bucket
H
Hongze Cheng 已提交
737

H
Haojun Liao 已提交
738 739 740 741 742 743 744 745
  if (pReader->pResBlock == NULL) {
    pReader->freeBlock = true;
    pReader->pResBlock = createResBlock(pCond, pReader->capacity);
    if (pReader->pResBlock == NULL) {
      code = terrno;
      goto _end;
    }
  }
746

H
Haojun Liao 已提交
747 748 749 750 751
  if (pCond->numOfCols <= 0) {
    tsdbError("vgId:%d, invalid column number %d in query cond, %s", TD_VID(pVnode), pCond->numOfCols, idstr);
    code = TSDB_CODE_INVALID_PARA;
    goto _end;
  }
H
Hongze Cheng 已提交
752

753 754
  // allocate buffer in order to load data blocks from file
  SBlockLoadSuppInfo* pSup = &pReader->suppInfo;
755
  pSup->pColAgg = taosArrayInit(pCond->numOfCols, sizeof(SColumnDataAgg));
H
Haojun Liao 已提交
756
  if (pSup->pColAgg == NULL) {
757 758 759
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _end;
  }
H
Haojun Liao 已提交
760

761 762
  pSup->tsColAgg.colId = PRIMARYKEY_TIMESTAMP_COL_ID;

H
Hongze Cheng 已提交
763
  code = tBlockDataCreate(&pReader->status.fileBlockData);
H
Haojun Liao 已提交
764 765 766 767 768
  if (code != TSDB_CODE_SUCCESS) {
    terrno = code;
    goto _end;
  }

769
  setColumnIdSlotList(pSup, pCond->colList, pCond->pSlotList, pCond->numOfCols);
770

771
  tsdbInitReaderLock(pReader);
772

H
Hongze Cheng 已提交
773 774
  *ppReader = pReader;
  return code;
H
Hongze Cheng 已提交
775

H
Haojun Liao 已提交
776 777
_end:
  tsdbReaderClose(pReader);
H
Hongze Cheng 已提交
778 779 780
  *ppReader = NULL;
  return code;
}
H
Hongze Cheng 已提交
781

H
Haojun Liao 已提交
782
// Collect the SBlockIdx entries of the current file that belong to queried tables.
//
// The block index array is fetched through the block-index cache (biCache). It is then
// walked in parallel with pReader->status.uidList: entries whose suid differs from the
// reader's suid, or whose uid is smaller than the current queried uid, are skipped; a
// queried uid smaller than the current entry is skipped as well; on equal uids the entry
// is appended to pIndexList and the table's pBlockList is created if it does not exist.
// NOTE(review): the walk presumably relies on both lists being sorted by uid - confirm.
//
// Returns TSDB_CODE_SUCCESS, the error from tsdbCacheGetBlockIdx, terrno when a table's
// scan info cannot be found, or TSDB_CODE_OUT_OF_MEMORY when an array cannot grow.
// The cache handle is released on every path.
static int32_t doLoadBlockIndex(STsdbReader* pReader, SDataFReader* pFileReader, SArray* pIndexList) {
  int64_t    st = taosGetTimestampUs();
  LRUHandle* handle = NULL;
  int32_t    code = tsdbCacheGetBlockIdx(pFileReader->pTsdb->biCache, pFileReader, &handle);
  if (code != TSDB_CODE_SUCCESS || handle == NULL) {
    goto _end;
  }

  int32_t numOfTables = taosHashGetSize(pReader->status.pTableMap);

  SArray* aBlockIdx = (SArray*)taosLRUCacheValue(pFileReader->pTsdb->biCache, handle);
  size_t  num = taosArrayGetSize(aBlockIdx);
  if (num == 0) {
    goto _end;  // nothing to do; code is TSDB_CODE_SUCCESS here
  }

  // todo binary search to the start position
  int64_t et1 = taosGetTimestampUs();

  STableUidList* pList = &pReader->status.uidList;

  int32_t i = 0, j = 0;
  while (i < num && j < numOfTables) {
    SBlockIdx* pBlockIdx = (SBlockIdx*)taosArrayGet(aBlockIdx, i);

    // entry of another super table, or of a table that sorts before the queried one
    if (pBlockIdx->suid != pReader->suid || pBlockIdx->uid < pList->tableUidList[j]) {
      i += 1;
      continue;
    }

    // the queried table has no entry in this file
    if (pBlockIdx->uid > pList->tableUidList[j]) {
      j += 1;
      continue;
    }

    // uids are equal: this entry belongs to a queried table
    STableBlockScanInfo* pScanInfo = getTableBlockScanInfo(pReader->status.pTableMap, pBlockIdx->uid, pReader->idStr);
    if (pScanInfo == NULL) {
      code = terrno;
      goto _end;  // do not leak the cache handle
    }

    if (pScanInfo->pBlockList == NULL) {
      pScanInfo->pBlockList = taosArrayInit(4, sizeof(SBlockIndex));
      if (pScanInfo->pBlockList == NULL) {
        code = TSDB_CODE_OUT_OF_MEMORY;
        goto _end;
      }
    }

    if (taosArrayPush(pIndexList, pBlockIdx) == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto _end;
    }

    i += 1;
    j += 1;
  }

  int64_t et2 = taosGetTimestampUs();
  tsdbDebug("load block index for %d/%d tables completed, elapsed time:%.2f ms, set blockIdx:%.2f ms, size:%.2f Kb %s",
            numOfTables, (int32_t)num, (et1 - st) / 1000.0, (et2 - et1) / 1000.0, num * sizeof(SBlockIdx) / 1024.0,
            pReader->idStr);

  pReader->cost.headFileLoadTime += (et1 - st) / 1000.0;

_end:
  tsdbBICacheRelease(pFileReader->pTsdb->biCache, handle);
  return code;
}
H
Hongze Cheng 已提交
852

853
// For every entry of pTableMap, clear the per-file state kept in the scan info:
// the block map data (tMapDataClear) and the block index list (taosArrayClear).
static void cleanupTableScanInfo(SHashObj* pTableMap) {
  for (STableBlockScanInfo** ppInfo = taosHashIterate(pTableMap, NULL); ppInfo != NULL;
       ppInfo = taosHashIterate(pTableMap, ppInfo)) {
    STableBlockScanInfo* pInfo = *ppInfo;

    tMapDataClear(&pInfo->mapData);
    taosArrayClear(pInfo->pBlockList);
  }
}

867
// Load the block descriptors of every table listed in pIndexList and record, per table,
// the blocks that may contain rows for this query.
//
// For each SBlockIdx the table's map data is read with tsdbReadDataBlk. The time window
// is narrowed so that it starts (ascending) or ends (descending) right after the table's
// lastKey. Each block that overlaps that window and the reader's version range is
// appended to the table's pBlockList as a SBlockIndex.
//
//   pBlockNum - numOfBlocks is incremented for every accepted block; numOfLastFiles is
//               set from the current file set's nSttF
//
// Returns TSDB_CODE_SUCCESS, terrno when a table's scan info is missing, the error of
// tsdbReadDataBlk, or TSDB_CODE_OUT_OF_MEMORY when a block index cannot be appended.
static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, SBlockNumber* pBlockNum) {
  int32_t numOfQTable = 0;
  size_t  sizeInDisk = 0;
  size_t  numOfTables = taosArrayGetSize(pIndexList);

  int64_t st = taosGetTimestampUs();
  cleanupTableScanInfo(pReader->status.pTableMap);

  bool    asc = ASCENDING_TRAVERSE(pReader->order);
  int32_t step = asc ? 1 : -1;

  for (int32_t i = 0; (size_t)i < numOfTables; ++i) {
    SBlockIdx*           pBlockIdx = taosArrayGet(pIndexList, i);
    STableBlockScanInfo* pScanInfo = getTableBlockScanInfo(pReader->status.pTableMap, pBlockIdx->uid, pReader->idStr);
    if (pScanInfo == NULL) {
      return terrno;
    }

    tMapDataReset(&pScanInfo->mapData);

    // a failed read must not be followed by decoding whatever is in mapData
    int32_t code = tsdbReadDataBlk(pReader->pFileReader, pBlockIdx, &pScanInfo->mapData);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    taosArrayEnsureCap(pScanInfo->pBlockList, pScanInfo->mapData.nItem);

    sizeInDisk += pScanInfo->mapData.nData;

    // only rows beyond the last key already returned for this table are of interest
    STimeWindow w = pReader->window;
    if (asc) {
      w.skey = pScanInfo->lastKey + step;
    } else {
      w.ekey = pScanInfo->lastKey + step;
    }

    if (isEmptyQueryTimeWindow(&w)) {
      continue;
    }

    SDataBlk block = {0};
    for (int32_t j = 0; j < pScanInfo->mapData.nItem; ++j) {
      tGetDataBlk(pScanInfo->mapData.pData + pScanInfo->mapData.aOffset[j], &block);

      // 1. time range check
      if (block.minKey.ts > w.ekey || block.maxKey.ts < w.skey) {
        continue;
      }

      // 2. version range check
      if (block.minVer > pReader->verRange.maxVer || block.maxVer < pReader->verRange.minVer) {
        continue;
      }

      SBlockIndex bIndex = {.ordinalIndex = j, .inFileOffset = block.aSubBlock->offset};
      bIndex.window = (STimeWindow){.skey = block.minKey.ts, .ekey = block.maxKey.ts};

      void* p1 = taosArrayPush(pScanInfo->pBlockList, &bIndex);
      if (p1 == NULL) {
        tMapDataClear(&pScanInfo->mapData);
        return TSDB_CODE_OUT_OF_MEMORY;
      }

      pBlockNum->numOfBlocks += 1;
    }

    if (taosArrayGetSize(pScanInfo->pBlockList) > 0) {
      numOfQTable += 1;
    }
  }

  pBlockNum->numOfLastFiles = pReader->pFileReader->pSet->nSttF;
  int32_t total = pBlockNum->numOfLastFiles + pBlockNum->numOfBlocks;

  double el = (taosGetTimestampUs() - st) / 1000.0;
  // numOfTables is a size_t: print it through a fixed-width cast instead of "%ld"
  tsdbDebug(
      "load block of %" PRId64 " tables completed, blocks:%d in %d tables, last-files:%d, block-info-size:%.2f Kb, "
      "elapsed time:%.2f ms %s",
      (int64_t)numOfTables, pBlockNum->numOfBlocks, numOfQTable, pBlockNum->numOfLastFiles, sizeInDisk / 1024.0, el,
      pReader->idStr);

  pReader->cost.numOfBlocks += total;
  pReader->cost.headFileLoadTime += el;

  return TSDB_CODE_SUCCESS;
}
H
Hongze Cheng 已提交
947

948
// Mark the current file block as completely consumed and store the key that follows
// maxKey in scan direction: maxKey + 1 for ascending order, maxKey - 1 otherwise.
static void setBlockAllDumped(SFileBlockDumpInfo* pDumpInfo, int64_t maxKey, int32_t order) {
  pDumpInfo->allDumped = true;

  if (ASCENDING_TRAVERSE(order)) {
    pDumpInfo->lastKey = maxKey + 1;
  } else {
    pDumpInfo->lastKey = maxKey - 1;
  }
}

954 955
// Store one column value into row rowIndex of pColInfoData.
//
// Fixed-length types are written straight from pColVal->value, flagged as NULL when the
// value is not a real value. Variable-length types are either set to NULL or first packed
// (length header + payload) into the per-column scratch buffer pSup->buildBuf[colIndex]
// and then written from there.
static void doCopyColVal(SColumnInfoData* pColInfoData, int32_t rowIndex, int32_t colIndex, SColVal* pColVal,
                         SBlockLoadSuppInfo* pSup) {
  if (!IS_VAR_DATA_TYPE(pColVal->type)) {
    colDataSetVal(pColInfoData, rowIndex, (const char*)&pColVal->value, !COL_VAL_IS_VALUE(pColVal));
    return;
  }

  if (!COL_VAL_IS_VALUE(pColVal)) {
    colDataSetNULL(pColInfoData, rowIndex);
    return;
  }

  varDataSetLen(pSup->buildBuf[colIndex], pColVal->value.nData);
  ASSERT(pColVal->value.nData <= pColInfoData->info.bytes);

  // pData may be null when nData is 0, so the copy is skipped for empty payloads
  if (pColVal->value.nData > 0) {
    memcpy(varDataVal(pSup->buildBuf[colIndex]), pColVal->value.pData, pColVal->value.nData);
  }

  colDataSetVal(pColInfoData, rowIndex, pSup->buildBuf[colIndex], false);
}

973
// Return the entry of pBlockIter->blockList at pBlockIter->index, or NULL when the list
// is empty (in which case numOfBlocks is asserted to be zero as well).
static SFileDataBlockInfo* getCurrentBlockInfo(SDataBlockIter* pBlockIter) {
  size_t total = taosArrayGetSize(pBlockIter->blockList);
  if (total > 0) {
    return taosArrayGet(pBlockIter->blockList, pBlockIter->index);
  }

  ASSERT(pBlockIter->numOfBlocks == total);
  return NULL;
}

H
Hongze Cheng 已提交
984
// Return the address of the block descriptor embedded in the iterator.
static SDataBlk* getCurrentBlock(SDataBlockIter* pBlockIter) {
  SDataBlk* pCurrent = &pBlockIter->block;
  return pCurrent;
}
985

C
Cary Xu 已提交
986 987 988 989 990 991
// Binary search in keyList, which is assumed to be sorted in ascending order.
//
//   order == TSDB_ORDER_ASC : searches [pos, num - 1] and returns the largest index whose
//                             key is <= key; returns -1 when key < keyList[pos].
//   otherwise (DESC)        : searches [0, pos] and returns the smallest index whose key
//                             is >= key; returns -1 when key > keyList[pos].
//
// pos must satisfy 0 <= pos < num (asserted).
static int doBinarySearchKey(TSKEY* keyList, int num, int pos, TSKEY key, int order) {
  ASSERT(pos >= 0 && pos < num && num > 0);

  int cur = pos;  // the end of the range that starts at pos and moves towards `bound`
  int bound;      // the opposite end of the range

  if (order == TSDB_ORDER_ASC) {
    bound = num - 1;
    if (key < keyList[pos]) {
      return -1;
    }

    for (;;) {
      // the answer sits at one of the two ends
      if (key >= keyList[bound]) {
        return bound;
      }
      if (key <= keyList[cur]) {
        return cur;
      }
      if (bound - cur <= 1) {
        return cur;
      }

      // halve the range
      const int mid = cur + (bound - cur + 1) / 2;
      if (keyList[mid] == key) {
        return mid;
      }
      if (keyList[mid] > key) {
        bound = mid;
      } else {
        cur = mid;
      }
    }
  }

  // DESC
  bound = 0;
  if (key > keyList[pos]) {
    return -1;
  }

  for (;;) {
    // the answer sits at one of the two ends
    if (key <= keyList[bound]) {
      return bound;
    }
    if (key >= keyList[cur]) {
      return cur;
    }
    if (cur - bound <= 1) {
      return cur;
    }

    // halve the range
    const int mid = cur - (cur - bound + 1) / 2;
    if (keyList[mid] == key) {
      return mid;
    }
    if (keyList[mid] < key) {
      bound = mid;
    } else {
      cur = mid;
    }
  }
}

H
Haojun Liao 已提交
1034
// Find the row index in the loaded block at which dumping has to stop.
//
// If the query window reaches past the block in scan direction, the block's last row in
// that direction is returned (nRow - 1 ascending, 0 descending). Otherwise the window
// boundary (ekey ascending, skey descending) is looked up with doBinarySearchKey starting
// at pos, whose result - possibly -1 - is returned unchanged.
static int32_t getEndPosInDataBlock(STsdbReader* pReader, SBlockData* pBlockData, SDataBlk* pBlock, int32_t pos) {
  if (ASCENDING_TRAVERSE(pReader->order)) {
    if (pReader->window.ekey >= pBlock->maxKey.ts) {
      return pBlock->nRow - 1;
    }
    return doBinarySearchKey(pBlockData->aTSKEY, pBlock->nRow, pos, pReader->window.ekey, pReader->order);
  }

  if (pReader->window.skey <= pBlock->minKey.ts) {
    return 0;
  }
  return doBinarySearchKey(pBlockData->aTSKEY, pBlock->nRow, pos, pReader->window.skey, pReader->order);
}

H
Haojun Liao 已提交
1051
// Copy dumpedRows timestamps from the file block into pColData->pData.
//
// Ascending: rows [rowIndex, rowIndex + dumpedRows) are copied as they are.
// Descending: rows [rowIndex - dumpedRows + 1, rowIndex] are copied and then reversed in
// place, so the output starts with the row at rowIndex.
static void copyPrimaryTsCol(const SBlockData* pBlockData, SFileBlockDumpInfo* pDumpInfo, SColumnInfoData* pColData,
                             int32_t dumpedRows, bool asc) {
  int32_t firstRow = asc ? pDumpInfo->rowIndex : (pDumpInfo->rowIndex - dumpedRows + 1);
  memcpy(pColData->pData, &pBlockData->aTSKEY[firstRow], dumpedRows * sizeof(int64_t));

  if (asc) {
    return;
  }

  // todo: opt perf by extract the loop
  int64_t* pTs = (int64_t*)pColData->pData;
  for (int32_t head = 0, tail = dumpedRows - 1; head < tail; ++head, --tail) {
    int64_t tmp = pTs[head];
    pTs[head] = pTs[tail];
    pTs[tail] = tmp;
  }
}

H
Haojun Liao 已提交
1071 1072
// Batch copy of dumpedRows fixed-width values from a file column into an output column.
//
//   1. the affected rows are copied with a single memcpy (element width taken from
//      tDataTypes[pData->type].bytes);
//   2. for a descending scan the copied values are reversed in place, using the element
//      width selected by pColData->info.type;
//   3. if the source column is not HAS_VALUE only, every copied row is checked and rows
//      whose bit value is 0 or 1 are marked NULL in the output bitmap.
//
// NOTE(review): step 1 sizes elements by pData->type while step 2 switches on
// pColData->info.type; this presumably relies on both being the same type - confirm.
static void copyNumericCols(const SColData* pData, SFileBlockDumpInfo* pDumpInfo, SColumnInfoData* pColData,
                            int32_t dumpedRows, bool asc) {
  int32_t step = asc ? 1 : -1;

  // first source row of the contiguous range that is copied
  int32_t  startIndex = asc ? pDumpInfo->rowIndex : (pDumpInfo->rowIndex - dumpedRows + 1);
  uint8_t* p = pData->pData + tDataTypes[pData->type].bytes * startIndex;

  // 1. copy data in a batch model
  memcpy(pColData->pData, p, dumpedRows * tDataTypes[pData->type].bytes);

  // 2. reverse the array list in case of descending order scan data block
  if (!asc) {
    int32_t mid = dumpedRows >> 1u;

    switch (pColData->info.type) {
      case TSDB_DATA_TYPE_TIMESTAMP:
      case TSDB_DATA_TYPE_DOUBLE:
      case TSDB_DATA_TYPE_BIGINT:
      case TSDB_DATA_TYPE_UBIGINT: {
        int64_t* pts = (int64_t*)pColData->pData;
        for (int32_t j = 0; j < mid; ++j) {
          int64_t t = pts[j];
          pts[j] = pts[dumpedRows - j - 1];
          pts[dumpedRows - j - 1] = t;
        }
        break;
      }

      case TSDB_DATA_TYPE_BOOL:
      case TSDB_DATA_TYPE_TINYINT:
      case TSDB_DATA_TYPE_UTINYINT: {
        int8_t* pts = (int8_t*)pColData->pData;
        for (int32_t j = 0; j < mid; ++j) {
          int8_t t = pts[j];
          pts[j] = pts[dumpedRows - j - 1];
          pts[dumpedRows - j - 1] = t;
        }
        break;
      }

      case TSDB_DATA_TYPE_SMALLINT:
      case TSDB_DATA_TYPE_USMALLINT: {
        int16_t* pts = (int16_t*)pColData->pData;
        for (int32_t j = 0; j < mid; ++j) {
          int16_t t = pts[j];  // temporary matches the element width
          pts[j] = pts[dumpedRows - j - 1];
          pts[dumpedRows - j - 1] = t;
        }
        break;
      }

      case TSDB_DATA_TYPE_FLOAT:
      case TSDB_DATA_TYPE_INT:
      case TSDB_DATA_TYPE_UINT: {
        int32_t* pts = (int32_t*)pColData->pData;
        for (int32_t j = 0; j < mid; ++j) {
          int32_t t = pts[j];
          pts[j] = pts[dumpedRows - j - 1];
          pts[dumpedRows - j - 1] = t;
        }
        break;
      }

      default:
        // other types are left in copy order, as before
        break;
    }
  }

  // 3. if the null value exists, check items one-by-one
  if (pData->flag != HAS_VALUE) {
    int32_t rowIndex = 0;

    for (int32_t j = pDumpInfo->rowIndex; rowIndex < dumpedRows; j += step, rowIndex++) {
      uint8_t v = tColDataGetBitValue(pData, j);
      if (v == 0 || v == 1) {
        colDataSetNull_f(pColData->nullbitmap, rowIndex);
        pColData->hasNull = true;
      }
    }
  }
}

1161
// Dump rows of the currently loaded file block (status.fileBlockData) into the reader's
// result block.
//
// Steps:
//   - return with zero rows when the loaded block data holds nothing;
//   - when dumping starts at the block's first row in scan direction and the query window
//     does not cover that end of the block, locate the first qualifying row with
//     doBinarySearchKey;
//   - determine the last row to dump (getEndPosInDataBlock) and limit the row count to
//     pReader->capacity;
//   - copy the timestamp column if it is the first requested column, then walk the
//     requested columns and the file columns together by column id, filling columns that
//     are missing in the file with NULLs;
//   - advance fBlockDumpInfo.rowIndex and mark the block as fully dumped when no row
//     inside the query window remains.
//
// Returns TSDB_CODE_SUCCESS (0 for the empty case) or TSDB_CODE_INVALID_PARA when the
// start position cannot be located.
static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader) {
  SReaderStatus*      pStatus = &pReader->status;
  SDataBlockIter*     pIter = &pStatus->blockIter;
  SBlockLoadSuppInfo* pSupp = &pReader->suppInfo;
  SFileBlockDumpInfo* pDump = &pReader->status.fBlockDumpInfo;

  SBlockData*         pFileData = &pStatus->fileBlockData;
  SFileDataBlockInfo* pInfo = getCurrentBlockInfo(pIter);
  SDataBlk*           pBlk = getCurrentBlock(pIter);
  SSDataBlock*        pOut = pReader->pResBlock;
  int32_t             numOfOutputCols = pSupp->numOfCols;

  SColVal cv = {0};
  int64_t st = taosGetTimestampUs();
  bool    asc = ASCENDING_TRAVERSE(pReader->order);
  int32_t step = asc ? 1 : -1;

  // no data exists, return directly.
  if (pFileData->nRow == 0 || pFileData->aTSKEY == NULL) {
    tsdbWarn("%p no need to copy since no data in blockData, table uid:%" PRIu64 " has been dropped, %s", pReader,
             pInfo->uid, pReader->idStr);
    pOut->info.rows = 0;
    return 0;
  }

  // dumping starts at the first row of the block in scan direction
  bool atBlockStart = asc ? (pDump->rowIndex == 0) : (pDump->rowIndex == pBlk->nRow - 1);
  if (atBlockStart) {
    bool windowCoversStart =
        asc ? (pReader->window.skey <= pBlk->minKey.ts) : (pReader->window.ekey >= pBlk->maxKey.ts);

    if (!windowCoversStart) {
      // find the appropriate start position in current block, and set it to be the current rowIndex;
      // the search runs against the scan direction, starting from the far end of the block
      int32_t pos = asc ? pBlk->nRow - 1 : 0;
      int32_t order = asc ? TSDB_ORDER_DESC : TSDB_ORDER_ASC;
      int64_t key = asc ? pReader->window.skey : pReader->window.ekey;
      pDump->rowIndex = doBinarySearchKey(pFileData->aTSKEY, pBlk->nRow, pos, key, order);

      if (pDump->rowIndex < 0) {
        tsdbError(
            "%p failed to locate the start position in current block, global index:%d, table index:%d, brange:%" PRId64
            "-%" PRId64 ", minVer:%" PRId64 ", maxVer:%" PRId64 " %s",
            pReader, pIter->index, pInfo->tbBlockIdx, pBlk->minKey.ts, pBlk->maxKey.ts, pBlk->minVer, pBlk->maxVer,
            pReader->idStr);
        return TSDB_CODE_INVALID_PARA;
      }
    }
  }

  // time window check
  int32_t endIndex = getEndPosInDataBlock(pReader, pFileData, pBlk, pDump->rowIndex);
  if (endIndex == -1) {
    setBlockAllDumped(pDump, pReader->window.ekey, pReader->order);
    return TSDB_CODE_SUCCESS;
  }

  endIndex += step;
  int32_t dumpedRows = asc ? (endIndex - pDump->rowIndex) : (pDump->rowIndex - endIndex);
  if (dumpedRows > pReader->capacity) {  // output buffer check
    dumpedRows = pReader->capacity;
  }

  // the primary timestamp column, when requested first, is copied separately
  int32_t          outCol = 0;
  SColumnInfoData* pDst = taosArrayGet(pOut->pDataBlock, pSupp->slotId[outCol]);
  if (pSupp->colId[outCol] == PRIMARYKEY_TIMESTAMP_COL_ID) {
    copyPrimaryTsCol(pFileData, pDump, pDst, dumpedRows, asc);
    outCol += 1;
  }

  // walk requested columns and file columns together by column id
  int32_t fileCol = 0;
  int32_t numOfFileCols = pFileData->nColData;
  while (outCol < numOfOutputCols && fileCol < numOfFileCols) {
    SColData* pSrc = tBlockDataGetColDataByIdx(pFileData, fileCol);

    // file column that was not requested
    if (pSrc->cid < pSupp->colId[outCol]) {
      fileCol += 1;
      continue;
    }

    pDst = taosArrayGet(pOut->pDataBlock, pSupp->slotId[outCol]);

    // the specified column does not exist in file block, fill with null data
    if (pSrc->cid > pSupp->colId[outCol]) {
      colDataSetNNULL(pDst, 0, dumpedRows);
      outCol += 1;
      continue;
    }

    // column ids match
    if (pSrc->flag == HAS_NONE || pSrc->flag == HAS_NULL || pSrc->flag == (HAS_NULL | HAS_NONE)) {
      colDataSetNNULL(pDst, 0, dumpedRows);
    } else if (IS_MATHABLE_TYPE(pDst->info.type)) {
      copyNumericCols(pSrc, pDump, pDst, dumpedRows, asc);
    } else {  // varchar/nchar type
      int32_t srcRow = pDump->rowIndex;
      for (int32_t outRow = 0; outRow < dumpedRows; ++outRow, srcRow += step) {
        tColDataGetValue(pSrc, srcRow, &cv);
        doCopyColVal(pDst, outRow, outCol, &cv, pSupp);
      }
    }

    fileCol += 1;
    outCol += 1;
  }

  // fill the mis-matched columns with null value
  for (; outCol < numOfOutputCols; ++outCol) {
    pDst = taosArrayGet(pOut->pDataBlock, pSupp->slotId[outCol]);
    colDataSetNNULL(pDst, 0, dumpedRows);
  }

  pOut->info.dataLoad = 1;
  pOut->info.rows = dumpedRows;
  pDump->rowIndex += step * dumpedRows;

  // check if current block are all handled
  if (pDump->rowIndex >= 0 && pDump->rowIndex < pBlk->nRow) {
    int64_t ts = pFileData->aTSKEY[pDump->rowIndex];
    if (outOfTimeWindow(ts, &pReader->window)) {  // the remain data has out of query time window, ignore current block
      setBlockAllDumped(pDump, ts, pReader->order);
    }
  } else {
    int64_t ts = asc ? pBlk->maxKey.ts : pBlk->minKey.ts;
    setBlockAllDumped(pDump, ts, pReader->order);
  }

  double elapsedTime = (taosGetTimestampUs() - st) / 1000.0;
  pReader->cost.blockLoadTime += elapsedTime;

  int32_t unDumpedRows = asc ? pBlk->nRow - pDump->rowIndex : pDump->rowIndex + 1;
  tsdbDebug("%p copy file block to sdatablock, global index:%d, table index:%d, brange:%" PRId64 "-%" PRId64
            ", rows:%d, remain:%d, minVer:%" PRId64 ", maxVer:%" PRId64 ", uid:%" PRIu64 " elapsed time:%.2f ms, %s",
            pReader, pIter->index, pInfo->tbBlockIdx, pBlk->minKey.ts, pBlk->maxKey.ts, dumpedRows, unDumpedRows,
            pBlk->minVer, pBlk->maxVer, pInfo->uid, elapsedTime, pReader->idStr);

  return TSDB_CODE_SUCCESS;
}

1297 1298
// Read the data of the iterator's current block from the data file into pBlockData.
//
// pBlockData is reset first. If no schema can be obtained for uid the function returns 0
// without loading anything. Otherwise pBlockData is initialised for the requested columns
// (all entries of suppInfo.colId except the first) and filled by tsdbReadDataBlock.
// On success the elapsed time is added to cost.blockLoadTime and fBlockDumpInfo.allDumped
// is cleared.
//
// Returns TSDB_CODE_SUCCESS / 0, or the error of tBlockDataInit or tsdbReadDataBlock.
static int32_t doLoadFileBlockData(STsdbReader* pReader, SDataBlockIter* pBlockIter, SBlockData* pBlockData,
                                   uint64_t uid) {
  int64_t beginUs = taosGetTimestampUs();

  tBlockDataReset(pBlockData);

  STSchema* pSchema = getLatestTableSchema(pReader, uid);
  if (pSchema == NULL) {
    tsdbDebug("%p table uid:%" PRIu64 " has been dropped, no data existed, %s", pReader, uid, pReader->idStr);
    return 0;
  }

  TABLEID tid = {.suid = pReader->suid, .uid = uid};
  int32_t code =
      tBlockDataInit(pBlockData, &tid, pSchema, &pReader->suppInfo.colId[1], pReader->suppInfo.numOfCols - 1);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  SFileDataBlockInfo* pInfo = getCurrentBlockInfo(pBlockIter);
  SDataBlk*           pBlk = getCurrentBlock(pBlockIter);

  code = tsdbReadDataBlock(pReader->pFileReader, pBlk, pBlockData);
  if (code != TSDB_CODE_SUCCESS) {
    tsdbError("%p error occurs in loading file block, global index:%d, table index:%d, brange:%" PRId64 "-%" PRId64
              ", rows:%d, code:%s %s",
              pReader, pBlockIter->index, pInfo->tbBlockIdx, pBlk->minKey.ts, pBlk->maxKey.ts, pBlk->nRow,
              tstrerror(code), pReader->idStr);
    return code;
  }

  double elapsedTime = (taosGetTimestampUs() - beginUs) / 1000.0;

  tsdbDebug("%p load file block into buffer, global index:%d, index in table block list:%d, brange:%" PRId64 "-%" PRId64
            ", rows:%d, minVer:%" PRId64 ", maxVer:%" PRId64 ", elapsed time:%.2f ms, %s",
            pReader, pBlockIter->index, pInfo->tbBlockIdx, pBlk->minKey.ts, pBlk->maxKey.ts, pBlk->nRow, pBlk->minVer,
            pBlk->maxVer, elapsedTime, pReader->idStr);

  pReader->cost.blockLoadTime += elapsedTime;
  pReader->status.fBlockDumpInfo.allDumped = false;

  return TSDB_CODE_SUCCESS;
}
H
Hongze Cheng 已提交
1341

H
Haojun Liao 已提交
1342 1343 1344
// Free everything owned by the supporter: the two per-table int32 arrays, each per-table
// block wrapper array, and finally the array of wrapper pointers itself.
// Safe to call on a partially initialised supporter whose pDataBlockInfo is still NULL,
// which is the state initBlockOrderSupporter may pass in after a failed allocation.
static void cleanupBlockOrderSupporter(SBlockOrderSupporter* pSup) {
  taosMemoryFreeClear(pSup->numOfBlocksPerTable);
  taosMemoryFreeClear(pSup->indexPerTable);

  // the per-table entries can only be visited when the pointer array exists
  if (pSup->pDataBlockInfo != NULL) {
    for (int32_t i = 0; i < pSup->numOfTables; ++i) {
      SBlockOrderWrapper* pBlockInfo = pSup->pDataBlockInfo[i];
      taosMemoryFreeClear(pBlockInfo);
    }
  }

  taosMemoryFreeClear(pSup->pDataBlockInfo);
}
H
Hongze Cheng 已提交
1353

H
Haojun Liao 已提交
1354 1355
// Allocate the three zero-initialised per-table arrays of the supporter, each sized for
// numOfTables entries. If any allocation fails, everything is released through
// cleanupBlockOrderSupporter and TSDB_CODE_OUT_OF_MEMORY is returned.
static int32_t initBlockOrderSupporter(SBlockOrderSupporter* pSup, int32_t numOfTables) {
  pSup->numOfBlocksPerTable = taosMemoryCalloc(1, sizeof(int32_t) * numOfTables);
  pSup->indexPerTable = taosMemoryCalloc(1, sizeof(int32_t) * numOfTables);
  pSup->pDataBlockInfo = taosMemoryCalloc(1, POINTER_BYTES * numOfTables);

  bool allocated =
      (pSup->numOfBlocksPerTable != NULL) && (pSup->indexPerTable != NULL) && (pSup->pDataBlockInfo != NULL);
  if (allocated) {
    return TSDB_CODE_SUCCESS;
  }

  cleanupBlockOrderSupporter(pSup);
  return TSDB_CODE_OUT_OF_MEMORY;
}
H
Hongze Cheng 已提交
1366

H
Haojun Liao 已提交
1367
static int32_t fileDataBlockOrderCompar(const void* pLeft, const void* pRight, void* param) {
1368
  int32_t leftIndex = *(int32_t*)pLeft;
H
Haojun Liao 已提交
1369
  int32_t rightIndex = *(int32_t*)pRight;
H
Hongze Cheng 已提交
1370

H
Haojun Liao 已提交
1371
  SBlockOrderSupporter* pSupporter = (SBlockOrderSupporter*)param;
H
Hongze Cheng 已提交
1372

H
Haojun Liao 已提交
1373 1374
  int32_t leftTableBlockIndex = pSupporter->indexPerTable[leftIndex];
  int32_t rightTableBlockIndex = pSupporter->indexPerTable[rightIndex];
H
Hongze Cheng 已提交
1375

H
Haojun Liao 已提交
1376 1377 1378 1379 1380 1381 1382
  if (leftTableBlockIndex > pSupporter->numOfBlocksPerTable[leftIndex]) {
    /* left block is empty */
    return 1;
  } else if (rightTableBlockIndex > pSupporter->numOfBlocksPerTable[rightIndex]) {
    /* right block is empty */
    return -1;
  }
H
Hongze Cheng 已提交
1383

1384
  SBlockOrderWrapper* pLeftBlock = &pSupporter->pDataBlockInfo[leftIndex][leftTableBlockIndex];
H
Haojun Liao 已提交
1385
  SBlockOrderWrapper* pRightBlock = &pSupporter->pDataBlockInfo[rightIndex][rightTableBlockIndex];
H
Hongze Cheng 已提交
1386

1387 1388 1389
  return pLeftBlock->offset > pRightBlock->offset ? 1 : -1;
}

H
Haojun Liao 已提交
1390
// Decode the block descriptor that the iterator currently points at into pBlockIter->block.
//
// Nothing is done (and TSDB_CODE_SUCCESS returned) when the iterator has no current block.
// Otherwise the owning table's scan info is looked up by uid, the block's SBlockIndex is
// taken from the table's pBlockList, and the descriptor with that ordinal index is
// extracted from the table's map data. Returns terrno when the scan info is not found.
static int32_t doSetCurrentBlock(SDataBlockIter* pBlockIter, const char* idStr) {
  SFileDataBlockInfo* pInfo = getCurrentBlockInfo(pBlockIter);
  if (pInfo == NULL) {
    return TSDB_CODE_SUCCESS;
  }

  STableBlockScanInfo* pScan = getTableBlockScanInfo(pBlockIter->pTableMap, pInfo->uid, idStr);
  if (pScan == NULL) {
    return terrno;
  }

  SBlockIndex* pIdx = taosArrayGet(pScan->pBlockList, pInfo->tbBlockIdx);
  tMapDataGetItemByIdx(&pScan->mapData, pIdx->ordinalIndex, &pBlockIter->block, tGetDataBlk);

  return TSDB_CODE_SUCCESS;
}
H
Hongze Cheng 已提交
1408

1409
/**
 * Build the global access order of all file data blocks of the tables in pReader->status.pTableMap.
 *
 * Blocks of every table are collected into an SBlockOrderSupporter; when more than one table owns
 * blocks, a multiway merge tree driven by fileDataBlockOrderCompar produces the final order.
 * The result is stored in pBlockIter->blockList and the iterator is positioned on the first block
 * (ascending scan) or the last block (descending scan).
 *
 * @param pReader      reader that owns the table map, scan order and id string
 * @param pBlockIter   iterator to (re)initialize
 * @param numOfBlocks  expected total number of blocks
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_OUT_OF_MEMORY, TSDB_CODE_INVALID_PARA, or the error reported
 *         by initBlockOrderSupporter / doSetCurrentBlock.
 */
static int32_t initBlockIterator(STsdbReader* pReader, SDataBlockIter* pBlockIter, int32_t numOfBlocks) {
  bool asc = ASCENDING_TRAVERSE(pReader->order);

  SBlockOrderSupporter sup = {0};
  pBlockIter->numOfBlocks = numOfBlocks;
  taosArrayClear(pBlockIter->blockList);
  pBlockIter->pTableMap = pReader->status.pTableMap;

  // access data blocks according to the offset of each block in asc/desc order.
  int32_t numOfTables = (int32_t)taosHashGetSize(pReader->status.pTableMap);

  int64_t st = taosGetTimestampUs();
  int32_t code = initBlockOrderSupporter(&sup, numOfTables);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  int32_t cnt = 0;
  void*   ptr = NULL;
  while ((ptr = taosHashIterate(pReader->status.pTableMap, ptr)) != NULL) {
    STableBlockScanInfo* pTableScanInfo = *(STableBlockScanInfo**)ptr;
    if (pTableScanInfo->pBlockList == NULL || taosArrayGetSize(pTableScanInfo->pBlockList) == 0) {
      continue;  // tables without file blocks do not take part in the ordering
    }

    size_t num = taosArrayGetSize(pTableScanInfo->pBlockList);
    sup.numOfBlocksPerTable[sup.numOfTables] = num;

    char* buf = taosMemoryMalloc(sizeof(SBlockOrderWrapper) * num);
    if (buf == NULL) {
      // the loop is left before the iteration finished: release the hash iterator first
      taosHashCancelIterate(pReader->status.pTableMap, ptr);
      cleanupBlockOrderSupporter(&sup);
      return TSDB_CODE_OUT_OF_MEMORY;
    }

    sup.pDataBlockInfo[sup.numOfTables] = (SBlockOrderWrapper*)buf;

    for (int32_t k = 0; k < num; ++k) {
      SBlockIndex* pIndex = taosArrayGet(pTableScanInfo->pBlockList, k);
      sup.pDataBlockInfo[sup.numOfTables][k] =
          (SBlockOrderWrapper){.uid = pTableScanInfo->uid, .offset = pIndex->inFileOffset};
      cnt++;
    }

    sup.numOfTables += 1;
  }

  // NOTE(review): the check only fires when both counters disagree; confirm whether a block-count
  // mismatch alone should already be rejected.
  if (numOfBlocks != cnt && sup.numOfTables != numOfTables) {
    cleanupBlockOrderSupporter(&sup);
    return TSDB_CODE_INVALID_PARA;
  }

  // since there is only one table qualified, blocks are not sorted
  if (sup.numOfTables == 1) {
    for (int32_t i = 0; i < numOfBlocks; ++i) {
      SFileDataBlockInfo blockInfo = {.uid = sup.pDataBlockInfo[0][i].uid, .tbBlockIdx = i};
      if (taosArrayPush(pBlockIter->blockList, &blockInfo) == NULL) {
        cleanupBlockOrderSupporter(&sup);
        return TSDB_CODE_OUT_OF_MEMORY;
      }
    }

    int64_t et = taosGetTimestampUs();
    tsdbDebug("%p create blocks info struct completed for one table, %d blocks not sorted, elapsed time:%.2f ms %s",
              pReader, numOfBlocks, (et - st) / 1000.0, pReader->idStr);

    pBlockIter->index = asc ? 0 : (numOfBlocks - 1);
    cleanupBlockOrderSupporter(&sup);
    return doSetCurrentBlock(pBlockIter, pReader->idStr);
  }

  tsdbDebug("%p create data blocks info struct completed, %d blocks in %d tables %s", pReader, cnt, sup.numOfTables,
            pReader->idStr);

  SMultiwayMergeTreeInfo* pTree = NULL;

  // keep the full-width status code; narrowing it could hide a failure
  int32_t ret = tMergeTreeCreate(&pTree, sup.numOfTables, &sup, fileDataBlockOrderCompar);
  if (ret != TSDB_CODE_SUCCESS) {
    cleanupBlockOrderSupporter(&sup);
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  int32_t numOfTotal = 0;
  while (numOfTotal < cnt) {
    int32_t pos = tMergeTreeGetChosenIndex(pTree);
    int32_t index = sup.indexPerTable[pos]++;

    SFileDataBlockInfo blockInfo = {.uid = sup.pDataBlockInfo[pos][index].uid, .tbBlockIdx = index};
    if (taosArrayPush(pBlockIter->blockList, &blockInfo) == NULL) {
      cleanupBlockOrderSupporter(&sup);
      taosMemoryFree(pTree);
      return TSDB_CODE_OUT_OF_MEMORY;
    }

    // set data block index overflow, in order to disable the offset comparator
    if (sup.indexPerTable[pos] >= sup.numOfBlocksPerTable[pos]) {
      sup.indexPerTable[pos] = sup.numOfBlocksPerTable[pos] + 1;
    }

    numOfTotal += 1;
    tMergeTreeAdjust(pTree, tMergeTreeGetAdjustIndex(pTree));
  }

  int64_t et = taosGetTimestampUs();
  tsdbDebug("%p %d data blocks access order completed, elapsed time:%.2f ms %s", pReader, numOfBlocks,
            (et - st) / 1000.0, pReader->idStr);
  cleanupBlockOrderSupporter(&sup);
  taosMemoryFree(pTree);

  pBlockIter->index = asc ? 0 : (numOfBlocks - 1);
  return doSetCurrentBlock(pBlockIter, pReader->idStr);
}
H
Hongze Cheng 已提交
1521

H
Haojun Liao 已提交
1522
/**
 * Advance the block iterator by one position in its scan direction and refresh the current block.
 *
 * @return false when the iterator already stands on the last block of its direction, true otherwise.
 */
static bool blockIteratorNext(SDataBlockIter* pBlockIter, const char* idStr) {
  if (ASCENDING_TRAVERSE(pBlockIter->order)) {
    if (pBlockIter->index >= pBlockIter->numOfBlocks - 1) {
      return false;
    }
    pBlockIter->index += 1;
  } else {
    if (pBlockIter->index <= 0) {
      return false;
    }
    pBlockIter->index -= 1;
  }

  doSetCurrentBlock(pBlockIter, idStr);
  return true;
}

1536 1537 1538
/**
 * This is an two rectangles overlap cases.
 */
H
Hongze Cheng 已提交
1539
static int32_t dataBlockPartiallyRequired(STimeWindow* pWindow, SVersionRange* pVerRange, SDataBlk* pBlock) {
1540 1541
  return (pWindow->ekey < pBlock->maxKey.ts && pWindow->ekey >= pBlock->minKey.ts) ||
         (pWindow->skey > pBlock->minKey.ts && pWindow->skey <= pBlock->maxKey.ts) ||
H
Hongze Cheng 已提交
1542 1543
         (pVerRange->minVer > pBlock->minVer && pVerRange->minVer <= pBlock->maxVer) ||
         (pVerRange->maxVer < pBlock->maxVer && pVerRange->maxVer >= pBlock->minVer);
H
Haojun Liao 已提交
1544
}
H
Hongze Cheng 已提交
1545

1546
/**
 * Look up the block that follows the current one, in scan direction, within the same table.
 *
 * @param pBlockInfo           current block; tbBlockIdx is its position in the table's block list
 * @param pTableBlockScanInfo  scan info that owns pBlockList
 * @param nextIndex            [out] index of the neighbor block in pBlockList
 * @param order                scan order
 * @param pBlockIndex          [out] copy of the neighbor's SBlockIndex
 * @return true when a neighbor exists and the out parameters were filled; false otherwise
 *         (the out parameters are left untouched in that case).
 */
static bool getNeighborBlockOfSameTable(SFileDataBlockInfo* pBlockInfo, STableBlockScanInfo* pTableBlockScanInfo,
                                        int32_t* nextIndex, int32_t order, SBlockIndex* pBlockIndex) {
  // Signed arithmetic on purpose: with size_t, "size - 1" wraps around for an empty list.
  int32_t numOfBlocks = (int32_t)taosArrayGetSize(pTableBlockScanInfo->pBlockList);
  int32_t curIndex = pBlockInfo->tbBlockIdx;
  if (curIndex < 0) {
    return false;
  }

  int32_t step = ASCENDING_TRAVERSE(order) ? 1 : -1;
  int32_t neighbor = curIndex + step;
  if (neighbor < 0 || neighbor >= numOfBlocks) {
    return false;  // the current block is the last one in scan direction
  }

  *nextIndex = neighbor;
  *pBlockIndex = *(SBlockIndex*)taosArrayGet(pTableBlockScanInfo->pBlockList, neighbor);
  return true;
}

/**
 * Search the ordered block list, starting at the iterator's current position and moving in scan
 * direction, for the entry with the same table uid and per-table block index as pFBlockInfo.
 *
 * @return position of the entry in pBlockIter->blockList, or -1 when it is not found.
 */
static int32_t findFileBlockInfoIndex(SDataBlockIter* pBlockIter, SFileDataBlockInfo* pFBlockInfo) {
  const int32_t delta = ASCENDING_TRAVERSE(pBlockIter->order) ? 1 : -1;

  for (int32_t pos = pBlockIter->index; pos >= 0 && pos < pBlockIter->numOfBlocks; pos += delta) {
    SFileDataBlockInfo* pCandidate = taosArrayGet(pBlockIter->blockList, pos);

    bool sameTable = (pCandidate->uid == pFBlockInfo->uid);
    bool sameBlock = (pCandidate->tbBlockIdx == pFBlockInfo->tbBlockIdx);
    if (sameTable && sameBlock) {
      return pos;
    }
  }

  return -1;
}

1580
/**
 * Make the block stored at position `index` of the ordered block list the next active block.
 *
 * The iterator position is advanced by `step`; if the requested block is not already located at
 * the new position it is moved there, then the current block descriptor is refreshed.
 *
 * @return -1 when index is out of range, otherwise the result of doSetCurrentBlock
 *         (TSDB_CODE_SUCCESS on success).
 */
static int32_t setFileBlockActiveInBlockIter(SDataBlockIter* pBlockIter, int32_t index, int32_t step) {
  if (index < 0 || index >= pBlockIter->numOfBlocks) {
    return -1;
  }

  // copy the entry first: the remove/insert below invalidates pointers into the array
  SFileDataBlockInfo fblock = *(SFileDataBlockInfo*)taosArrayGet(pBlockIter->blockList, index);
  pBlockIter->index += step;

  if (index != pBlockIter->index) {
    taosArrayRemove(pBlockIter->blockList, index);
    taosArrayInsert(pBlockIter->blockList, pBlockIter->index, &fblock);

    SFileDataBlockInfo* pBlockInfo = taosArrayGet(pBlockIter->blockList, pBlockIter->index);
    ASSERT(pBlockInfo->uid == fblock.uid && pBlockInfo->tbBlockIdx == fblock.tbBlockIdx);
  }

  // report a failed refresh instead of silently claiming success
  return doSetCurrentBlock(pBlockIter, "");
}

H
Haojun Liao 已提交
1600
// todo: this attribute could be acquired during extractin the global ordered block list.
1601
static bool overlapWithNeighborBlock(SDataBlk* pBlock, SBlockIndex* pNeighborBlockIndex, int32_t order) {
1602 1603
  // it is the last block in current file, no chance to overlap with neighbor blocks.
  if (ASCENDING_TRAVERSE(order)) {
1604
    return pBlock->maxKey.ts == pNeighborBlockIndex->window.skey;
1605
  } else {
1606
    return pBlock->minKey.ts == pNeighborBlockIndex->window.ekey;
1607
  }
H
Haojun Liao 已提交
1608
}
H
Hongze Cheng 已提交
1609

H
Hongze Cheng 已提交
1610
/**
 * Tell whether the buffer key lies at or before the start of the block in scan direction:
 * key.ts <= minKey.ts for ascending scans, key.ts >= maxKey.ts for descending scans.
 * A key equal to TSKEY_INITIAL_VAL never qualifies.
 */
static bool bufferDataInFileBlockGap(int32_t order, TSDBKEY key, SDataBlk* pBlock) {
  if (key.ts == TSKEY_INITIAL_VAL) {
    return false;
  }

  if (ASCENDING_TRAVERSE(order)) {
    return key.ts <= pBlock->minKey.ts;
  }

  return key.ts >= pBlock->maxKey.ts;
}
H
Hongze Cheng 已提交
1616

H
Hongze Cheng 已提交
1617
/**
 * Tell whether key.ts falls inside the block's [minKey.ts, maxKey.ts] range while the block's
 * version range intersects the queried version range.
 */
static bool keyOverlapFileBlock(TSDBKEY key, SDataBlk* pBlock, SVersionRange* pVerRange) {
  if (key.ts < pBlock->minKey.ts || key.ts > pBlock->maxKey.ts) {
    return false;
  }

  if (pBlock->maxVer < pVerRange->minVer) {
    return false;
  }

  return pBlock->minVer <= pVerRange->maxVer;
}

H
Hongze Cheng 已提交
1622 1623
/**
 * Walk the delete skyline from startIndex and decide whether a skyline point with a version not
 * older than the block's minVer affects the block's key range.
 *
 * A point inside [minKey.ts, maxKey.ts] counts directly. A point before minKey.ts counts when the
 * following skyline point is at or after minKey.ts. The walk stops at the first point beyond
 * maxKey.ts.
 */
static bool doCheckforDatablockOverlap(STableBlockScanInfo* pBlockScanInfo, const SDataBlk* pBlock,
                                       int32_t startIndex) {
  size_t numOfPoints = taosArrayGetSize(pBlockScanInfo->delSkyline);

  for (int32_t idx = startIndex; idx < numOfPoints; ++idx) {
    TSDBKEY* pDel = taosArrayGet(pBlockScanInfo->delSkyline, idx);

    bool insideBlock = (pDel->ts >= pBlock->minKey.ts) && (pDel->ts <= pBlock->maxKey.ts);
    bool beforeBlock = (!insideBlock) && (pDel->ts < pBlock->minKey.ts);

    if (!insideBlock && !beforeBlock) {  // pDel->ts > pBlock->maxKey.ts
      return false;
    }

    if (pDel->version < pBlock->minVer) {
      continue;  // this skyline point is older than every row of the block
    }

    if (insideBlock) {
      return true;
    }

    // the point is located before the block, check where the next point is
    if (idx < numOfPoints - 1) {
      TSDBKEY* pFollowing = taosArrayGet(pBlockScanInfo->delSkyline, idx + 1);
      if (pFollowing->ts >= pBlock->minKey.ts) {
        return true;
      }
    } else {  // it must be the last point
      ASSERT(pDel->version == 0);
    }
  }

  return false;
}

H
Hongze Cheng 已提交
1651
/**
 * Tell whether the delete skyline of the table affects the given file block.
 *
 * @param pBlockScanInfo  table scan info; delSkyline and fileDelIndex are read
 * @param pBlock          file block descriptor
 * @param order           scan order; for descending scans the start point is moved backwards from
 *                        fileDelIndex to the first skyline point that is not after minKey.ts
 * @return false when there is no skyline, the skyline is empty, or the key ranges are disjoint;
 *         otherwise the result of doCheckforDatablockOverlap.
 */
static bool overlapWithDelSkyline(STableBlockScanInfo* pBlockScanInfo, const SDataBlk* pBlock, int32_t order) {
  // an empty skyline has no first/last point to inspect and cannot delete anything
  if (pBlockScanInfo->delSkyline == NULL || taosArrayGetSize(pBlockScanInfo->delSkyline) == 0) {
    return false;
  }

  // ts is not overlap
  TSDBKEY* pFirst = taosArrayGet(pBlockScanInfo->delSkyline, 0);
  TSDBKEY* pLast = taosArrayGetLast(pBlockScanInfo->delSkyline);
  if (pBlock->minKey.ts > pLast->ts || pBlock->maxKey.ts < pFirst->ts) {
    return false;
  }

  // version is not overlap
  if (ASCENDING_TRAVERSE(order)) {
    return doCheckforDatablockOverlap(pBlockScanInfo, pBlock, pBlockScanInfo->fileDelIndex);
  }

  int32_t index = pBlockScanInfo->fileDelIndex;
  while (1) {
    TSDBKEY* p = taosArrayGet(pBlockScanInfo->delSkyline, index);
    if (p->ts > pBlock->minKey.ts && index > 0) {
      index -= 1;
    } else {  // find the first point that is smaller than the minKey.ts of dataBlock.
      break;
    }
  }

  return doCheckforDatablockOverlap(pBlockScanInfo, pBlock, index);
}

C
Cary Xu 已提交
1681 1682 1683 1684 1685 1686 1687 1688 1689 1690 1691 1692 1693
// Reasons collected by getBlockToLoadInfo that decide whether a file data block has to be loaded.
typedef struct {
  bool overlapWithNeighborBlock;  // boundary ts equals the boundary of the neighbor block of the same table
  bool hasDupTs;                  // pBlock->hasDup for a single sub-block, always true otherwise
  bool overlapWithDelInfo;        // result of overlapWithDelSkyline for this block
  bool overlapWithLastBlock;      // current key of the last-block reader lies within [minKey.ts, maxKey.ts]
  bool overlapWithKeyInBuf;       // result of keyOverlapFileBlock for the key in the buffer
  bool partiallyRequired;         // result of dataBlockPartiallyRequired for the query window/version range
  bool moreThanCapcity;           // pBlock->nRow exceeds pReader->capacity
} SDataBlockToLoadInfo;

/**
 * Evaluate every reason that may force a file data block to be loaded and record them in pInfo.
 *
 * overlapWithNeighborBlock and overlapWithLastBlock are only written when a neighbor block, or
 * data in the last-block reader, exists; the caller is expected to pass a zero-initialized pInfo.
 */
static void getBlockToLoadInfo(SDataBlockToLoadInfo* pInfo, SFileDataBlockInfo* pBlockInfo, SDataBlk* pBlock,
                               STableBlockScanInfo* pScanInfo, TSDBKEY keyInBuf, SLastBlockReader* pLastBlockReader,
                               STsdbReader* pReader) {
  int32_t     nextIdx = 0;
  SBlockIndex neighbor = {0};

  // overlap with the neighbor block of the same table
  if (getNeighborBlockOfSameTable(pBlockInfo, pScanInfo, &nextIdx, pReader->order, &neighbor)) {
    pInfo->overlapWithNeighborBlock = overlapWithNeighborBlock(pBlock, &neighbor, pReader->order);
  }

  // has duplicated ts of different version in this block
  if (pBlock->nSubBlock == 1) {
    pInfo->hasDupTs = pBlock->hasDup;
  } else {
    pInfo->hasDupTs = true;
  }

  pInfo->overlapWithDelInfo = overlapWithDelSkyline(pScanInfo, pBlock, pReader->order);

  if (hasDataInLastBlock(pLastBlockReader)) {
    int64_t sttKey = getCurrentKeyInLastBlock(pLastBlockReader);
    pInfo->overlapWithLastBlock = (pBlock->maxKey.ts >= sttKey) && (pBlock->minKey.ts <= sttKey);
  }

  pInfo->moreThanCapcity = pBlock->nRow > pReader->capacity;
  pInfo->partiallyRequired = dataBlockPartiallyRequired(&pReader->window, &pReader->verRange, pBlock);
  pInfo->overlapWithKeyInBuf = keyOverlapFileBlock(keyInBuf, pBlock, &pReader->verRange);
}

// A file block can be used without loading its data only when:
// 1. the version of all rows is less than the endVersion
// 2. the current block does not overlap with the next neighbor block
// 3. the timestamps in the block do not overlap with the buffer or the last block
// 4. the output buffer is large enough to hold all rows of the current block
// 5. the delete info does not overlap with the current block data
// 6. the current block does not contain duplicated ts
// Returns true when at least one of the load reasons gathered by getBlockToLoadInfo applies.
static bool fileBlockShouldLoad(STsdbReader* pReader, SFileDataBlockInfo* pBlockInfo, SDataBlk* pBlock,
                                STableBlockScanInfo* pScanInfo, TSDBKEY keyInBuf, SLastBlockReader* pLastBlockReader) {
  SDataBlockToLoadInfo reasons = {0};
  getBlockToLoadInfo(&reasons, pBlockInfo, pBlock, pScanInfo, keyInBuf, pLastBlockReader, pReader);

  bool mustLoad = reasons.overlapWithNeighborBlock || reasons.hasDupTs || reasons.partiallyRequired ||
                  reasons.overlapWithKeyInBuf || reasons.moreThanCapcity || reasons.overlapWithDelInfo ||
                  reasons.overlapWithLastBlock;
  if (!mustLoad) {
    return false;
  }

  // log the reason why load the datablock for profile
  tsdbDebug("%p uid:%" PRIu64
            " need to load the datablock, overlapneighbor:%d, hasDup:%d, partiallyRequired:%d, "
            "overlapWithKey:%d, greaterThanBuf:%d, overlapWithDel:%d, overlapWithlastBlock:%d, %s",
            pReader, pBlockInfo->uid, reasons.overlapWithNeighborBlock, reasons.hasDupTs, reasons.partiallyRequired,
            reasons.overlapWithKeyInBuf, reasons.moreThanCapcity, reasons.overlapWithDelInfo,
            reasons.overlapWithLastBlock, pReader->idStr);

  return true;
}

C
Cary Xu 已提交
1746 1747 1748 1749 1750 1751 1752 1753 1754
/**
 * A file block is "clean" when it neither overlaps with its neighbor block, the key in the buffer,
 * the delete info or the last block, nor contains duplicated timestamps.
 * Capacity and partial-window requirements are not taken into account here.
 */
static bool isCleanFileDataBlock(STsdbReader* pReader, SFileDataBlockInfo* pBlockInfo, SDataBlk* pBlock,
                                 STableBlockScanInfo* pScanInfo, TSDBKEY keyInBuf, SLastBlockReader* pLastBlockReader) {
  SDataBlockToLoadInfo reasons = {0};
  getBlockToLoadInfo(&reasons, pBlockInfo, pBlock, pScanInfo, keyInBuf, pLastBlockReader, pReader);

  return !reasons.overlapWithNeighborBlock && !reasons.hasDupTs && !reasons.overlapWithKeyInBuf &&
         !reasons.overlapWithDelInfo && !reasons.overlapWithLastBlock;
}

1755
/**
 * Fill pReader->pResBlock with rows taken from the mem/imem iterators of one table, up to endKey.
 *
 * Does nothing and returns TSDB_CODE_SUCCESS when neither iterator has a value. Otherwise the
 * result block's ts window and uid are updated, the block is flagged as composed, and the elapsed
 * time is added to pReader->cost.buildmemBlock.
 *
 * @return the code reported by buildDataBlockFromBufImpl.
 */
static int32_t buildDataBlockFromBuf(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo, int64_t endKey) {
  if (!pBlockScanInfo->iiter.hasVal && !pBlockScanInfo->iter.hasVal) {
    return TSDB_CODE_SUCCESS;
  }

  SSDataBlock* pResBlock = pReader->pResBlock;

  int64_t startUs = taosGetTimestampUs();
  int32_t code = buildDataBlockFromBufImpl(pBlockScanInfo, endKey, pReader->capacity, pReader);

  blockDataUpdateTsWindow(pResBlock, pReader->suppInfo.slotId[0]);
  pResBlock->info.id.uid = pBlockScanInfo->uid;

  setComposedBlockFlag(pReader, true);

  double elapsedTime = (taosGetTimestampUs() - startUs) / 1000.0;
  tsdbDebug("%p build data block from cache completed, elapsed time:%.2f ms, numOfRows:%d, brange:%" PRId64
            " - %" PRId64 ", uid:%" PRIu64 ",  %s",
            pReader, elapsedTime, pResBlock->info.rows, pResBlock->info.window.skey, pResBlock->info.window.ekey,
            pBlockScanInfo->uid, pReader->idStr);

  pReader->cost.buildmemBlock += elapsedTime;
  return code;
}

1780 1781
/**
 * Fast path: append the current file-block row to the result block without merging.
 *
 * It applies only when the current row is not the border row in scan direction and the directly
 * following row carries a different timestamp than key. On success the row index is advanced by
 * one step.
 *
 * @return true when the row was appended, false when a regular merge is required.
 */
static bool tryCopyDistinctRowFromFileBlock(STsdbReader* pReader, SBlockData* pBlockData, int64_t key,
                                            SFileBlockDumpInfo* pDumpInfo) {
  int32_t step = 0;
  bool    hasFollowingRow = false;

  if (pReader->order == TSDB_ORDER_ASC) {
    step = 1;
    hasFollowingRow = pDumpInfo->rowIndex < pDumpInfo->totalRows - 1;
  } else {
    step = -1;
    hasFollowingRow = pDumpInfo->rowIndex > 0;
  }

  if (!hasFollowingRow) {
    return false;  // border point, cannot look ahead
  }

  if (pBlockData->aTSKEY[pDumpInfo->rowIndex + step] == key) {
    return false;  // duplicated timestamp follows, merge is needed
  }

  doAppendRowFromFileBlock(pReader->pResBlock, pReader, pBlockData, pDumpInfo->rowIndex);
  pDumpInfo->rowIndex += step;
  return true;
}

1800
static bool nextRowFromLastBlocks(SLastBlockReader* pLastBlockReader, STableBlockScanInfo* pScanInfo,
1801
                                  SVersionRange* pVerRange) {
X
Xiaoyu Wang 已提交
1802
  int32_t step = ASCENDING_TRAVERSE(pLastBlockReader->order) ? 1 : -1;
H
Haojun Liao 已提交
1803

1804 1805
  while (1) {
    bool hasVal = tMergeTreeNext(&pLastBlockReader->mergeTree);
1806
    if (!hasVal) {  // the next value will be the accessed key in stt
1807
      pScanInfo->lastKeyInStt += step;
1808 1809 1810 1811 1812
      return false;
    }

    TSDBROW row = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
    TSDBKEY k = TSDBROW_KEY(&row);
1813 1814 1815
    pScanInfo->lastKeyInStt = k.ts;

    if (!hasBeenDropped(pScanInfo->delSkyline, &pScanInfo->lastBlockDelIndex, &k, pLastBlockReader->order, pVerRange)) {
H
Haojun Liao 已提交
1816 1817
      // the qualifed ts may equal to k.ts, only a greater version one.
      // here we need to fallback one step.
1818 1819 1820 1821 1822 1823 1824
      return true;
    }
  }
}

/**
 * Fast path for last-block rows: advance the last-block reader and, when the following qualified
 * row has a timestamp different from ts (or no row follows), append fRow to the result block
 * without merging.
 *
 * @return true when fRow was appended, false when the following row shares the timestamp ts.
 */
static bool tryCopyDistinctRowFromSttBlock(TSDBROW* fRow, SLastBlockReader* pLastBlockReader,
                                           STableBlockScanInfo* pScanInfo, int64_t ts, STsdbReader* pReader) {
  bool hasFollowing = nextRowFromLastBlocks(pLastBlockReader, pScanInfo, &pReader->verRange);
  if (hasFollowing && getCurrentKeyInLastBlock(pLastBlockReader) == ts) {
    return false;
  }

  doAppendRowFromFileBlock(pReader->pResBlock, pReader, fRow->pBlockData, fRow->iRow);
  return true;
}

1840 1841 1842 1843 1844 1845
/**
 * Return the schema cached in pReader->pSchema, fetching it through metaGetTbTSchema on first use.
 * A failed fetch is logged and NULL is returned.
 */
static FORCE_INLINE STSchema* getLatestTableSchema(STsdbReader* pReader, uint64_t uid) {
  if (pReader->pSchema == NULL) {
    pReader->pSchema = metaGetTbTSchema(pReader->pTsdb->pVnode->pMeta, uid, -1, 1);
    if (pReader->pSchema == NULL) {
      tsdbError("failed to get table schema, uid:%" PRIu64 ", it may have been dropped, ver:-1, %s", uid,
                pReader->idStr);
    }
  }

  return pReader->pSchema;
}

H
Haojun Liao 已提交
1853 1854 1855
/**
 * Resolve the schema that matches the schema version of a row.
 *
 * Lookup order: pReader->pSchema (fetched on first use), then the pReader->pSchemaMap cache keyed
 * by sversion, then metaGetTbTSchemaEx; a schema obtained from the meta is inserted into the cache.
 *
 * @return the schema, or NULL with terrno set when it cannot be fetched or cached.
 */
static FORCE_INLINE STSchema* doGetSchemaForTSRow(int32_t sversion, STsdbReader* pReader, uint64_t uid) {
  // always set the newest schema version in pReader->pSchema
  if (pReader->pSchema == NULL) {
    pReader->pSchema = metaGetTbTSchema(pReader->pTsdb->pVnode->pMeta, uid, -1, 1);
  }

  if (pReader->pSchema != NULL && pReader->pSchema->version == sversion) {
    return pReader->pSchema;
  }

  void** ppCached = tSimpleHashGet(pReader->pSchemaMap, &sversion, sizeof(sversion));
  if (ppCached != NULL) {
    return *(STSchema**)ppCached;
  }

  STSchema* pFetched = NULL;
  int32_t   code = metaGetTbTSchemaEx(pReader->pTsdb->pVnode->pMeta, pReader->suid, uid, sversion, &pFetched);
  if (code != TSDB_CODE_SUCCESS) {
    terrno = code;
    return NULL;
  }

  code = tSimpleHashPut(pReader->pSchemaMap, &sversion, sizeof(sversion), &pFetched, POINTER_BYTES);
  if (code != TSDB_CODE_SUCCESS) {
    terrno = code;
    return NULL;
  }

  return pFetched;
}

1883
static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo, TSDBROW* pRow,
1884 1885
                                     SIterInfo* pIter, int64_t key, SLastBlockReader* pLastBlockReader) {
  SRowMerger          merge = {0};
H
Hongze Cheng 已提交
1886
  SRow*               pTSRow = NULL;
1887 1888 1889
  SBlockData*         pBlockData = &pReader->status.fileBlockData;
  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;

1890
  int64_t tsLast = INT64_MIN;
1891
  if (hasDataInLastBlock(pLastBlockReader)) {
1892 1893
    tsLast = getCurrentKeyInLastBlock(pLastBlockReader);
  }
1894

H
Hongze Cheng 已提交
1895 1896
  TSDBKEY k = TSDBROW_KEY(pRow);
  TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
1897

1898 1899
  int64_t minKey = 0;
  if (pReader->order == TSDB_ORDER_ASC) {
H
Hongze Cheng 已提交
1900
    minKey = INT64_MAX;  // chosen the minimum value
1901
    if (minKey > tsLast && hasDataInLastBlock(pLastBlockReader)) {
1902 1903
      minKey = tsLast;
    }
1904

1905 1906 1907
    if (minKey > k.ts) {
      minKey = k.ts;
    }
1908

1909
    if (minKey > key && hasDataInFileBlock(pBlockData, pDumpInfo)) {
1910 1911 1912 1913
      minKey = key;
    }
  } else {
    minKey = INT64_MIN;
1914
    if (minKey < tsLast && hasDataInLastBlock(pLastBlockReader)) {
1915 1916 1917 1918 1919 1920 1921
      minKey = tsLast;
    }

    if (minKey < k.ts) {
      minKey = k.ts;
    }

1922
    if (minKey < key && hasDataInFileBlock(pBlockData, pDumpInfo)) {
1923 1924
      minKey = key;
    }
1925 1926 1927 1928
  }

  bool init = false;

1929
  // ASC: file block ---> last block -----> imem -----> mem
H
Hongze Cheng 已提交
1930
  // DESC: mem -----> imem -----> last block -----> file block
1931 1932
  if (pReader->order == TSDB_ORDER_ASC) {
    if (minKey == key) {
1933
      init = true;
H
Hongze Cheng 已提交
1934
      int32_t code = tsdbRowMergerInit(&merge, &fRow, pReader->pSchema);
H
Haojun Liao 已提交
1935 1936 1937
      if (code != TSDB_CODE_SUCCESS) {
        return code;
      }
1938
      doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
1939 1940
    }

1941
    if (minKey == tsLast) {
1942
      TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
H
Haojun Liao 已提交
1943
      if (init) {
H
Hongze Cheng 已提交
1944
        tsdbRowMerge(&merge, &fRow1);
H
Haojun Liao 已提交
1945
      } else {
1946
        init = true;
H
Hongze Cheng 已提交
1947
        int32_t code = tsdbRowMergerInit(&merge, &fRow1, pReader->pSchema);
H
Haojun Liao 已提交
1948 1949 1950
        if (code != TSDB_CODE_SUCCESS) {
          return code;
        }
1951
      }
1952
      doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, &merge, &pReader->verRange, pReader->idStr);
1953
    }
1954

1955
    if (minKey == k.ts) {
K
kailixu 已提交
1956 1957 1958 1959
      STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid);
      if (pSchema == NULL) {
        return terrno;
      }
H
Haojun Liao 已提交
1960
      if (init) {
X
Xiaoyu Wang 已提交
1961
        tsdbRowMergerAdd(&merge, pRow, pSchema);
H
Haojun Liao 已提交
1962
      } else {
1963
        init = true;
X
Xiaoyu Wang 已提交
1964
        int32_t code = tsdbRowMergerInit(&merge, pRow, pSchema);
H
Haojun Liao 已提交
1965 1966 1967 1968 1969 1970 1971
        if (code != TSDB_CODE_SUCCESS) {
          return code;
        }
      }
      int32_t code = doMergeRowsInBuf(pIter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge, pReader);
      if (code != TSDB_CODE_SUCCESS) {
        return code;
1972 1973 1974 1975 1976
      }
    }
  } else {
    if (minKey == k.ts) {
      init = true;
1977
      STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid);
H
Hongze Cheng 已提交
1978
      int32_t   code = tsdbRowMergerInit(&merge, pRow, pSchema);
H
Haojun Liao 已提交
1979 1980 1981 1982 1983
      if (code != TSDB_CODE_SUCCESS) {
        return code;
      }

      code = doMergeRowsInBuf(pIter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge, pReader);
1984
      if (code != TSDB_CODE_SUCCESS || merge.pTSchema == NULL) {
H
Haojun Liao 已提交
1985 1986
        return code;
      }
1987 1988
    }

1989
    if (minKey == tsLast) {
1990
      TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
H
Haojun Liao 已提交
1991
      if (init) {
H
Hongze Cheng 已提交
1992
        tsdbRowMerge(&merge, &fRow1);
H
Haojun Liao 已提交
1993
      } else {
1994
        init = true;
H
Hongze Cheng 已提交
1995
        int32_t code = tsdbRowMergerInit(&merge, &fRow1, pReader->pSchema);
H
Haojun Liao 已提交
1996 1997 1998
        if (code != TSDB_CODE_SUCCESS) {
          return code;
        }
1999
      }
2000
      doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, &merge, &pReader->verRange, pReader->idStr);
2001 2002 2003
    }

    if (minKey == key) {
H
Haojun Liao 已提交
2004
      if (init) {
H
Hongze Cheng 已提交
2005
        tsdbRowMerge(&merge, &fRow);
H
Haojun Liao 已提交
2006
      } else {
2007
        init = true;
H
Hongze Cheng 已提交
2008
        int32_t code = tsdbRowMergerInit(&merge, &fRow, pReader->pSchema);
H
Haojun Liao 已提交
2009 2010 2011
        if (code != TSDB_CODE_SUCCESS) {
          return code;
        }
2012 2013 2014
      }
      doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
    }
2015 2016
  }

H
Hongze Cheng 已提交
2017
  int32_t code = tsdbRowMergerGetRow(&merge, &pTSRow);
2018 2019 2020 2021
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

2022
  doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo);
2023 2024

  taosMemoryFree(pTSRow);
H
Hongze Cheng 已提交
2025
  tsdbRowMergerClear(&merge);
2026 2027 2028
  return TSDB_CODE_SUCCESS;
}

2029 2030 2031
/**
 * Produce one result row for the current key of the last-block reader.
 *
 * When mergeBlockData is false, or the current file-block row has a different timestamp, only the
 * last blocks contribute: the row is copied directly if the following last-block row has another
 * timestamp (pBlockScanInfo->lastKey is updated in that case), otherwise the rows with that
 * timestamp are merged. When the file-block row shares the timestamp, last-block rows and
 * file-block rows are merged together.
 *
 * @return TSDB_CODE_SUCCESS or the error code of the failing merger step.
 */
static int32_t doMergeFileBlockAndLastBlock(SLastBlockReader* pLastBlockReader, STsdbReader* pReader,
                                            STableBlockScanInfo* pBlockScanInfo, SBlockData* pBlockData,
                                            bool mergeBlockData) {
  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
  int64_t             tsLastBlock = getCurrentKeyInLastBlock(pLastBlockReader);

  SRow*      pMerged = NULL;
  SRowMerger merge = {0};
  TSDBROW    fRow = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
  tsdbTrace("fRow ptr:%p, %d, uid:%" PRIu64 ", %s", fRow.pBlockData, fRow.iRow, pLastBlockReader->uid, pReader->idStr);

  int32_t code = TSDB_CODE_SUCCESS;

  // the file block is only inspected when the caller asked for it to be merged
  bool sameTsInFileBlock = mergeBlockData && (tsLastBlock == pBlockData->aTSKEY[pDumpInfo->rowIndex]);

  if (!sameTsInFileBlock) {
    // only the last blocks contribute to this timestamp
    if (tryCopyDistinctRowFromSttBlock(&fRow, pLastBlockReader, pBlockScanInfo, tsLastBlock, pReader)) {
      pBlockScanInfo->lastKey = tsLastBlock;
      return TSDB_CODE_SUCCESS;
    }

    code = tsdbRowMergerInit(&merge, &fRow, pReader->pSchema);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
    tsdbRowMerge(&merge, &fRow1);
    doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLastBlock, &merge, &pReader->verRange, pReader->idStr);
  } else {
    // the file block holds a row with the same timestamp
    code = tsdbRowMergerInit(&merge, &fRow, pReader->pSchema);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLastBlock, &merge, &pReader->verRange, pReader->idStr);

    // merge with block data if ts == key
    if (tsLastBlock == pBlockData->aTSKEY[pDumpInfo->rowIndex]) {
      doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
    }
  }

  code = tsdbRowMergerGetRow(&merge, &pMerged);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  doAppendRowFromTSRow(pReader->pResBlock, pReader, pMerged, pBlockScanInfo);

  taosMemoryFree(pMerged);
  tsdbRowMergerClear(&merge);

  return TSDB_CODE_SUCCESS;
}

2092 2093
// Produce the next result row when only on-disk data is involved: the currently loaded file data block
// and/or the last block reader.
//
// pReader          : reader whose result block (pResBlock) receives the row
// pLastBlockReader : reader over the last blocks; its current row is taken from its merge tree
// key              : timestamp of the current row in the file data block (supplied by the caller)
// pBlockScanInfo   : scan state of the table being read
// pBlockData       : currently loaded file data block
//
// Returns TSDB_CODE_SUCCESS, or the error code reported by the row merger / delegated helpers.
static int32_t mergeFileBlockAndLastBlock(STsdbReader* pReader, SLastBlockReader* pLastBlockReader, int64_t key,
                                          STableBlockScanInfo* pBlockScanInfo, SBlockData* pBlockData) {
  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;

  // only last block exists
  if (!hasDataInFileBlock(pBlockData, pDumpInfo)) {
    return doMergeFileBlockAndLastBlock(pLastBlockReader, pReader, pBlockScanInfo, NULL, false);
  }

  // no last block available, only data block exists
  if (!hasDataInLastBlock(pLastBlockReader)) {
    return mergeRowsInFileBlocks(pBlockData, pBlockScanInfo, key, pReader);
  }

  // both sources have a current row
  TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
  int64_t ts = getCurrentKeyInLastBlock(pLastBlockReader);
  ASSERT(ts >= key);

  // desc order
  if (!ASCENDING_TRAVERSE(pReader->order)) {
    return doMergeFileBlockAndLastBlock(pLastBlockReader, pReader, pBlockScanInfo, pBlockData, true);
  }

  if (key < ts) {  // imem, mem are all empty, file blocks (data blocks and last block) exist
    return mergeRowsInFileBlocks(pBlockData, pBlockScanInfo, key, pReader);
  }

  if (key > ts) {  // excluded by the assertion above; nothing is emitted
    return TSDB_CODE_SUCCESS;
  }

  // key == ts: the file block row and the last block row carry the same timestamp and are merged into one row
  SRow*      pTSRow = NULL;
  SRowMerger merge = {0};

  int32_t code = tsdbRowMergerInit(&merge, &fRow, pReader->pSchema);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);

  TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
  tsdbRowMerge(&merge, &fRow1);

  doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, ts, &merge, &pReader->verRange, pReader->idStr);

  code = tsdbRowMergerGetRow(&merge, &pTSRow);
  if (code == TSDB_CODE_SUCCESS) {
    doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo);
    taosMemoryFree(pTSRow);
  }

  // the merger was initialized successfully above, so it is released on the failure path as well
  tsdbRowMergerClear(&merge);
  return code;
}

2147 2148
// Merge the rows that carry the next timestamp from up to four sources -- file data block, last block,
// imem and mem -- into one row and append it to pReader->pResBlock.
//
// The next timestamp ("minKey") is the smallest current key of all sources for an ascending scan and the
// largest one for a descending scan. Every source whose current key equals minKey is fed into the row merger
// in a fixed order:
//   ASC : file block -> last block -> imem -> mem
//   DESC: mem -> imem -> last block -> file block
// The first contributing source initializes the merger; the following ones are merged into it.
//
// pRow and piRow are dereferenced unconditionally (TSDBROW_KEY), so both mem and imem are expected to hold
// a valid row when this function is called.
//
// Returns TSDB_CODE_SUCCESS or the first error code reported by a helper. Once the merger has been
// initialized, every exit path releases it.
static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo, SBlockData* pBlockData,
                                     SLastBlockReader* pLastBlockReader) {
  SRowMerger          merge = {0};
  SRow*               pTSRow = NULL;
  int32_t             code = TSDB_CODE_SUCCESS;
  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
  SArray*             pDelList = pBlockScanInfo->delSkyline;

  TSDBROW* pRow = getValidMemRow(&pBlockScanInfo->iter, pDelList, pReader);
  TSDBROW* piRow = getValidMemRow(&pBlockScanInfo->iiter, pDelList, pReader);

  int64_t tsLast = INT64_MIN;
  if (hasDataInLastBlock(pLastBlockReader)) {
    tsLast = getCurrentKeyInLastBlock(pLastBlockReader);
  }

  int64_t key = hasDataInFileBlock(pBlockData, pDumpInfo) ? pBlockData->aTSKEY[pDumpInfo->rowIndex] : INT64_MIN;

  TSDBKEY k = TSDBROW_KEY(pRow);
  TSDBKEY ik = TSDBROW_KEY(piRow);

  // despite its name, minKey holds the maximum timestamp for a descending scan
  int64_t minKey = 0;
  if (ASCENDING_TRAVERSE(pReader->order)) {
    minKey = INT64_MAX;  // let's find the minimum
    if (minKey > k.ts) {
      minKey = k.ts;
    }

    if (minKey > ik.ts) {
      minKey = ik.ts;
    }

    if (minKey > key && hasDataInFileBlock(pBlockData, pDumpInfo)) {
      minKey = key;
    }

    if (minKey > tsLast && hasDataInLastBlock(pLastBlockReader)) {
      minKey = tsLast;
    }
  } else {
    minKey = INT64_MIN;  // let's find the maximum ts value
    if (minKey < k.ts) {
      minKey = k.ts;
    }

    if (minKey < ik.ts) {
      minKey = ik.ts;
    }

    if (minKey < key && hasDataInFileBlock(pBlockData, pDumpInfo)) {
      minKey = key;
    }

    if (minKey < tsLast && hasDataInLastBlock(pLastBlockReader)) {
      minKey = tsLast;
    }
  }

  // true once the merger has been successfully initialized by one of the sources
  bool init = false;

  // ASC: file block -----> last block -----> imem -----> mem
  // DESC: mem -----> imem -----> last block -----> file block
  if (ASCENDING_TRAVERSE(pReader->order)) {
    if (minKey == key) {
      TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
      code = tsdbRowMergerInit(&merge, &fRow, pReader->pSchema);
      if (code != TSDB_CODE_SUCCESS) {
        return code;
      }

      init = true;
      doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
    }

    if (minKey == tsLast) {
      TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
      if (init) {
        tsdbRowMerge(&merge, &fRow1);
      } else {
        code = tsdbRowMergerInit(&merge, &fRow1, pReader->pSchema);
        if (code != TSDB_CODE_SUCCESS) {
          return code;
        }
        init = true;
      }

      doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, &merge, &pReader->verRange, pReader->idStr);
    }

    if (minKey == ik.ts) {
      if (init) {
        tsdbRowMerge(&merge, piRow);
      } else {
        STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(piRow), pReader, pBlockScanInfo->uid);
        if (pSchema == NULL) {
          return code;  // no merger exists yet; code is still TSDB_CODE_SUCCESS here, as in the original logic
        }

        code = tsdbRowMergerInit(&merge, piRow, pSchema);
        if (code != TSDB_CODE_SUCCESS) {
          return code;
        }
        init = true;
      }

      code = doMergeRowsInBuf(&pBlockScanInfo->iiter, pBlockScanInfo->uid, ik.ts, pBlockScanInfo->delSkyline, &merge,
                              pReader);
      if (code != TSDB_CODE_SUCCESS) {
        goto _cleanup;
      }
    }

    if (minKey == k.ts) {
      if (init) {
        if (merge.pTSchema == NULL) {
          goto _cleanup;
        }

        tsdbRowMerge(&merge, pRow);
      } else {
        STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid);
        code = tsdbRowMergerInit(&merge, pRow, pSchema);
        if (code != TSDB_CODE_SUCCESS) {
          return code;
        }
        init = true;
      }

      code = doMergeRowsInBuf(&pBlockScanInfo->iter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge,
                              pReader);
      if (code != TSDB_CODE_SUCCESS) {
        goto _cleanup;
      }
    }
  } else {
    if (minKey == k.ts) {
      STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid);
      code = tsdbRowMergerInit(&merge, pRow, pSchema);
      if (code != TSDB_CODE_SUCCESS) {
        return code;
      }
      init = true;

      code = doMergeRowsInBuf(&pBlockScanInfo->iter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge,
                              pReader);
      if (code != TSDB_CODE_SUCCESS) {
        goto _cleanup;
      }
    }

    if (minKey == ik.ts) {
      if (init) {
        tsdbRowMerge(&merge, piRow);
      } else {
        STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(piRow), pReader, pBlockScanInfo->uid);
        code = tsdbRowMergerInit(&merge, piRow, pSchema);
        if (code != TSDB_CODE_SUCCESS) {
          return code;
        }
        init = true;
      }

      code = doMergeRowsInBuf(&pBlockScanInfo->iiter, pBlockScanInfo->uid, ik.ts, pBlockScanInfo->delSkyline, &merge,
                              pReader);
      if (code != TSDB_CODE_SUCCESS) {
        goto _cleanup;
      }
    }

    if (minKey == tsLast) {
      TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
      if (init) {
        tsdbRowMerge(&merge, &fRow1);
      } else {
        code = tsdbRowMergerInit(&merge, &fRow1, pReader->pSchema);
        if (code != TSDB_CODE_SUCCESS) {
          return code;
        }
        init = true;
      }

      doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, &merge, &pReader->verRange, pReader->idStr);
    }

    if (minKey == key) {
      TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
      if (!init) {
        code = tsdbRowMergerInit(&merge, &fRow, pReader->pSchema);
        if (code != TSDB_CODE_SUCCESS) {
          return code;
        }
        init = true;
      } else {
        if (merge.pTSchema == NULL) {
          goto _cleanup;
        }
        tsdbRowMerge(&merge, &fRow);
      }

      doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
    }
  }

  // no source contributed a row, nothing to emit
  if (merge.pTSchema == NULL) {
    return code;
  }

  code = tsdbRowMergerGetRow(&merge, &pTSRow);
  if (code != TSDB_CODE_SUCCESS) {
    goto _cleanup;
  }

  doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo);
  taosMemoryFree(pTSRow);

_cleanup:
  // reached only after the merger has been initialized successfully
  tsdbRowMergerClear(&merge);
  return code;
}

2360 2361 2362 2363 2364 2365 2366 2367 2368
// Create the mem and imem iterators of one table and build its delete skyline. The work is done once per
// table scan info; later calls return immediately because iterInit is set.
//
// The iterators start right after the last returned key (lastKey + 1) for an ascending scan and right
// before it (lastKey - 1) for a descending scan.
//
// Returns TSDB_CODE_SUCCESS, or the error code of tsdbTbDataIterCreate when an iterator cannot be created.
// The return value of initDelSkylineIterator is not checked here.
static int32_t initMemDataIterator(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader) {
  if (pBlockScanInfo->iterInit) {
    return TSDB_CODE_SUCCESS;
  }

  int32_t code = TSDB_CODE_SUCCESS;

  TSDBKEY startKey = {0};
  if (ASCENDING_TRAVERSE(pReader->order)) {
    startKey = (TSDBKEY){.ts = pBlockScanInfo->lastKey + 1, .version = pReader->verRange.minVer};
  } else {
    startKey = (TSDBKEY){.ts = pBlockScanInfo->lastKey - 1, .version = pReader->verRange.maxVer};
  }

  int32_t backward = (!ASCENDING_TRAVERSE(pReader->order));

  // mem
  STbData* d = NULL;
  if (pReader->pReadSnap->pMem != NULL) {
    d = tsdbGetTbDataFromMemTable(pReader->pReadSnap->pMem, pReader->suid, pBlockScanInfo->uid);
    if (d != NULL) {
      code = tsdbTbDataIterCreate(d, &startKey, backward, &pBlockScanInfo->iter.iter);
      if (code != TSDB_CODE_SUCCESS) {
        tsdbError("%p uid:%" PRIu64 ", failed to create iterator for mem, code:%s, %s", pReader, pBlockScanInfo->uid,
                  tstrerror(code), pReader->idStr);
        return code;
      }

      pBlockScanInfo->iter.hasVal = (tsdbTbDataIterGet(pBlockScanInfo->iter.iter) != NULL);

      tsdbDebug("%p uid:%" PRIu64 ", check data in mem from skey:%" PRId64 ", order:%d, ts range in buf:%" PRId64
                "-%" PRId64 " %s",
                pReader, pBlockScanInfo->uid, startKey.ts, pReader->order, d->minKey, d->maxKey, pReader->idStr);
    }
  } else {
    tsdbDebug("%p uid:%" PRIu64 ", no data in mem, %s", pReader, pBlockScanInfo->uid, pReader->idStr);
  }

  // imem
  STbData* di = NULL;
  if (pReader->pReadSnap->pIMem != NULL) {
    di = tsdbGetTbDataFromMemTable(pReader->pReadSnap->pIMem, pReader->suid, pBlockScanInfo->uid);
    if (di != NULL) {
      code = tsdbTbDataIterCreate(di, &startKey, backward, &pBlockScanInfo->iiter.iter);
      if (code != TSDB_CODE_SUCCESS) {
        tsdbError("%p uid:%" PRIu64 ", failed to create iterator for imem, code:%s, %s", pReader, pBlockScanInfo->uid,
                  tstrerror(code), pReader->idStr);
        return code;
      }

      pBlockScanInfo->iiter.hasVal = (tsdbTbDataIterGet(pBlockScanInfo->iiter.iter) != NULL);

      tsdbDebug("%p uid:%" PRIu64 ", check data in imem from skey:%" PRId64 ", order:%d, ts range in buf:%" PRId64
                "-%" PRId64 " %s",
                pReader, pBlockScanInfo->uid, startKey.ts, pReader->order, di->minKey, di->maxKey, pReader->idStr);
    }
  } else {
    tsdbDebug("%p uid:%" PRIu64 ", no data in imem, %s", pReader, pBlockScanInfo->uid, pReader->idStr);
  }

  // build the delete skyline and account the elapsed time (us -> ms) in the reader cost summary
  int64_t st = taosGetTimestampUs();
  initDelSkylineIterator(pBlockScanInfo, pReader, d, di);
  pReader->cost.initDelSkylineIterTime += (taosGetTimestampUs() - st) / 1000.0;

  pBlockScanInfo->iterInit = true;
  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
2429 2430
// Check whether the row at pDumpInfo->rowIndex of the file data block may be returned for the scanned table.
// A row qualifies only if
//   - it belongs to pBlockScanInfo->uid (checked only when the block carries a per-row uid array),
//   - its version lies inside the reader's version range,
//   - its timestamp lies inside the reader's time window, and
//   - hasBeenDropped() does not report it as deleted by the delete skyline.
static bool isValidFileBlockRow(SBlockData* pBlockData, SFileBlockDumpInfo* pDumpInfo,
                                STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader) {
  // a block with a uid array holds rows of several tables
  if (pBlockData->aUid != NULL) {
    uint64_t rowUid = pBlockData->aUid[pDumpInfo->rowIndex];
    if (rowUid != pBlockScanInfo->uid) {
      return false;  // row of another table, the caller moves on to the next row
    }
  }

  // version range
  int64_t rowVer = pBlockData->aVersion[pDumpInfo->rowIndex];
  if (rowVer < pReader->verRange.minVer || rowVer > pReader->verRange.maxVer) {
    return false;
  }

  // time window
  int64_t rowTs = pBlockData->aTSKEY[pDumpInfo->rowIndex];
  if (rowTs < pReader->window.skey || rowTs > pReader->window.ekey) {
    return false;
  }

  // delete skyline
  TSDBKEY rowKey = {.ts = rowTs, .version = rowVer};
  bool    dropped = hasBeenDropped(pBlockScanInfo->delSkyline, &pBlockScanInfo->fileDelIndex, &rowKey, pReader->order,
                                   &pReader->verRange);
  return !dropped;
}

2459
// Bind the last block reader to the table described by pScanInfo and position it on its first row.
//
// If the reader is already bound to this table, only the presence of data is reported. Otherwise a merge
// tree opened for another table is closed, the mem/imem iterators of the table are initialized (the result
// of initMemDataIterator is not checked), and a new merge tree is opened whose window starts (ascending)
// or ends (descending) at pScanInfo->lastKeyInStt.
//
// Returns true when a row is available, false when the merge tree cannot be opened or holds no row.
static bool initLastBlockReader(SLastBlockReader* pLBlockReader, STableBlockScanInfo* pScanInfo, STsdbReader* pReader) {
  // already initialized for this table
  if (pLBlockReader->uid == pScanInfo->uid) {
    return hasDataInLastBlock(pLBlockReader);
  }

  // release the merge tree that was opened for the previous table
  if (pLBlockReader->uid != 0) {
    tMergeTreeClose(&pLBlockReader->mergeTree);
  }

  initMemDataIterator(pScanInfo, pReader);
  pLBlockReader->uid = pScanInfo->uid;

  // resume from the last key accessed in the stt files
  STimeWindow win = pLBlockReader->window;
  if (ASCENDING_TRAVERSE(pLBlockReader->order)) {
    win.skey = pScanInfo->lastKeyInStt;
  } else {
    win.ekey = pScanInfo->lastKeyInStt;
  }

  tsdbDebug("init last block reader, window:%" PRId64 "-%" PRId64 ", uid:%" PRIu64 ", %s", win.skey, win.ekey,
            pScanInfo->uid, pReader->idStr);

  bool    backward = (pLBlockReader->order == TSDB_ORDER_DESC);
  int32_t openCode = tMergeTreeOpen(&pLBlockReader->mergeTree, backward, pReader->pFileReader, pReader->suid,
                                    pScanInfo->uid, &win, &pLBlockReader->verRange, pLBlockReader->pInfo, false,
                                    pReader->idStr, false);
  if (openCode != TSDB_CODE_SUCCESS) {
    return false;
  }

  return nextRowFromLastBlocks(pLBlockReader, pScanInfo, &pReader->verRange);
}

2491
// Return the timestamp of the row the last block reader's merge tree currently points at.
static int64_t getCurrentKeyInLastBlock(SLastBlockReader* pLastBlockReader) {
  TSDBROW curRow = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
  int64_t curTs = TSDBROW_TS(&curRow);
  return curTs;
}

H
Hongze Cheng 已提交
2496
// The last block reader has data as long as its merge tree holds a current iterator.
static bool hasDataInLastBlock(SLastBlockReader* pLastBlockReader) {
  if (pLastBlockReader->mergeTree.pIter == NULL) {
    return false;
  }
  return true;
}
2497

2498
bool hasDataInFileBlock(const SBlockData* pBlockData, const SFileBlockDumpInfo* pDumpInfo) {
H
Haojun Liao 已提交
2499
  if ((pBlockData->nRow > 0) && (pBlockData->nRow != pDumpInfo->totalRows)) {
2500
    return false;  // this is an invalid result.
2501
  }
2502
  return pBlockData->nRow > 0 && (!pDumpInfo->allDumped);
2503
}
2504

2505 2506
// Emit the next row that comes from file data blocks only.
//
// If tryCopyDistinctRowFromFileBlock() succeeds, the table's lastKey is set to key and nothing else is done.
// Otherwise the current row is run through the row merger together with doMergeRowsInFileBlocks(), and the
// merged row is appended to pReader->pResBlock.
//
// Returns TSDB_CODE_SUCCESS or the error code reported by the row merger.
int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pBlockScanInfo, int64_t key,
                              STsdbReader* pReader) {
  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;

  if (tryCopyDistinctRowFromFileBlock(pReader, pBlockData, key, pDumpInfo)) {
    pBlockScanInfo->lastKey = key;
    return TSDB_CODE_SUCCESS;
  }

  TSDBROW    fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
  SRow*      pTSRow = NULL;
  SRowMerger merge = {0};

  int32_t code = tsdbRowMergerInit(&merge, &fRow, pReader->pSchema);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);

  code = tsdbRowMergerGetRow(&merge, &pTSRow);
  if (code == TSDB_CODE_SUCCESS) {
    doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo);
    taosMemoryFree(pTSRow);
  }

  // the merger was initialized successfully above, so it is released on the failure path as well
  tsdbRowMergerClear(&merge);
  return code;
}

H
Haojun Liao 已提交
2536 2537
// Produce one result row by dispatching to the merge routine that matches the sources currently holding
// data for the table:
//   mem and imem both have a valid row -> doMergeMultiLevelRows
//   imem still has a value             -> doMergeBufAndFileRows on the imem iterator
//   mem still has a value              -> doMergeBufAndFileRows on the mem iterator
//   neither                            -> mergeFileBlockAndLastBlock
// Returns the code of the routine that was invoked.
static int32_t buildComposedDataBlockImpl(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo,
                                          SBlockData* pBlockData, SLastBlockReader* pLastBlockReader) {
  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
  SIterInfo*          pMemIter = &pBlockScanInfo->iter;
  SIterInfo*          pImemIter = &pBlockScanInfo->iiter;

  // timestamp of the current file block row, INT64_MIN when the block is empty or fully dumped
  int64_t key = INT64_MIN;
  if (pBlockData->nRow > 0 && (!pDumpInfo->allDumped)) {
    key = pBlockData->aTSKEY[pDumpInfo->rowIndex];
  }

  TSDBROW* pRow = NULL;
  if (pMemIter->hasVal) {
    pRow = getValidMemRow(pMemIter, pBlockScanInfo->delSkyline, pReader);
  }

  TSDBROW* piRow = NULL;
  if (pImemIter->hasVal) {
    piRow = getValidMemRow(pImemIter, pBlockScanInfo->delSkyline, pReader);
  }

  // both levels of the mem-table contain valid rows
  if (pRow != NULL && piRow != NULL) {
    return doMergeMultiLevelRows(pReader, pBlockScanInfo, pBlockData, pLastBlockReader);
  }

  // imem + file + last block
  if (pImemIter->hasVal) {
    return doMergeBufAndFileRows(pReader, pBlockScanInfo, piRow, pImemIter, key, pLastBlockReader);
  }

  // mem + file + last block
  if (pMemIter->hasVal) {
    return doMergeBufAndFileRows(pReader, pBlockScanInfo, pRow, pMemIter, key, pLastBlockReader);
  }

  // file data blocks + last block
  return mergeFileBlockAndLastBlock(pReader, pLastBlockReader, key, pBlockScanInfo, pBlockData);
}

H
Haojun Liao 已提交
2569 2570 2571 2572 2573 2574 2575 2576 2577 2578 2579 2580 2581 2582 2583 2584 2585 2586 2587 2588 2589 2590 2591 2592 2593 2594 2595 2596 2597 2598 2599 2600 2601 2602 2603 2604 2605 2606 2607 2608
// If the current file block overlaps with the neighbor block of the same table (in scan direction), make the
// neighbor the active block of the block iterator, load its data and reset the dump info.
//
// *loadNeighbor is set to true only when the neighbor block has been loaded successfully; it is false when
// there is no neighbor, no overlap, or the load fails.
// Returns TSDB_CODE_SUCCESS or the error code of doLoadFileBlockData.
static int32_t loadNeighborIfOverlap(SFileDataBlockInfo* pBlockInfo, STableBlockScanInfo* pBlockScanInfo,
                                     STsdbReader* pReader, bool* loadNeighbor) {
  SReaderStatus*  pStatus = &pReader->status;
  SDataBlockIter* pBlockIter = &pStatus->blockIter;

  int32_t     direction = ASCENDING_TRAVERSE(pReader->order) ? 1 : -1;
  int32_t     neighborTbIdx = -1;
  SBlockIndex neighborBIndex = {0};

  *loadNeighbor = false;
  SDataBlk* pCurBlock = getCurrentBlock(&pReader->status.blockIter);

  if (!getNeighborBlockOfSameTable(pBlockInfo, pBlockScanInfo, &neighborTbIdx, pReader->order, &neighborBIndex)) {
    return TSDB_CODE_SUCCESS;  // no neighbor block, do nothing
  }

  if (!overlapWithNeighborBlock(pCurBlock, &neighborBIndex, pReader->order)) {
    return TSDB_CODE_SUCCESS;  // no overlap, keep the current block
  }

  // 1. find the next neighbor block in the scan block list
  SFileDataBlockInfo target = {.uid = pBlockInfo->uid, .tbBlockIdx = neighborTbIdx};
  int32_t            posInScanList = findFileBlockInfoIndex(pBlockIter, &target);

  // 2. remove it from the scan block list
  setFileBlockActiveInBlockIter(pBlockIter, posInScanList, direction);

  // 3. load the neighbor block, and set it to be the currently accessed file data block
  int32_t code = doLoadFileBlockData(pReader, pBlockIter, &pStatus->fileBlockData, pBlockInfo->uid);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  // 4. check the data values
  initBlockDumpInfo(pReader, pBlockIter);
  *loadNeighbor = true;

  return code;
}

2609 2610 2611 2612 2613 2614 2615 2616 2617 2618 2619 2620 2621
// Finalize the result block after a composed block has been built: set its table uid (0 when no scan info
// is given), mark the data as loaded, refresh its timestamp window, flag the reader status as "composed"
// and add el to the accumulated build time in the reader cost summary.
static void updateComposedBlockInfo(STsdbReader* pReader, double el, STableBlockScanInfo* pBlockScanInfo) {
  SSDataBlock* pBlock = pReader->pResBlock;

  if (pBlockScanInfo != NULL) {
    pBlock->info.id.uid = pBlockScanInfo->uid;
  } else {
    pBlock->info.id.uid = 0;
  }

  pBlock->info.dataLoad = 1;
  blockDataUpdateTsWindow(pBlock, pReader->suppInfo.slotId[0]);

  setComposedBlockFlag(pReader, true);

  // cost statistics
  pReader->cost.composedBlocks += 1;
  pReader->cost.buildComposedBlockTime += el;
}

2622
// Fill pReader->pResBlock with rows composed from the current file data block, the last block reader and
// the mem/imem buffers of one table.
//
// Fast path: when the current file block is reported clean by isCleanFileDataBlock(), fits into the result
// block capacity, and the scan is ascending (or the last block reader holds no data), the block is copied
// as a whole.
// Otherwise rows are produced one at a time by buildComposedDataBlockImpl() until the loaded file block is
// consumed, no qualified file block row is left, the result block is full, or an error occurs.
//
// Returns TSDB_CODE_SUCCESS or the first error code reported while loading a neighbor block or merging rows.
static int32_t buildComposedDataBlock(STsdbReader* pReader) {
  int32_t code = TSDB_CODE_SUCCESS;

  SSDataBlock* pResBlock = pReader->pResBlock;

  SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(&pReader->status.blockIter);
  SLastBlockReader*   pLastBlockReader = pReader->status.fileIter.pLastBlockReader;

  bool    asc = ASCENDING_TRAVERSE(pReader->order);
  int64_t st = taosGetTimestampUs();
  int32_t step = asc ? 1 : -1;
  double  el = 0;

  STableBlockScanInfo* pBlockScanInfo = NULL;
  if (pBlockInfo != NULL) {
    pBlockScanInfo = getTableBlockScanInfo(pReader->status.pTableMap, pBlockInfo->uid, pReader->idStr);
    if (pBlockScanInfo == NULL) {
      goto _end;
    }

    SDataBlk* pBlock = getCurrentBlock(&pReader->status.blockIter);
    TSDBKEY   keyInBuf = getCurrentKeyInBuf(pBlockScanInfo, pReader);

    // it is a clean block, load it directly
    if (isCleanFileDataBlock(pReader, pBlockInfo, pBlock, pBlockScanInfo, keyInBuf, pLastBlockReader) &&
        pBlock->nRow <= pReader->capacity) {
      if (asc || (!hasDataInLastBlock(pLastBlockReader))) {
        copyBlockDataToSDataBlock(pReader);

        // record the last key value
        pBlockScanInfo->lastKey = asc ? pBlock->maxKey.ts : pBlock->minKey.ts;
        goto _end;
      }
    }
  } else {  // file blocks not exist
    pBlockScanInfo = *pReader->status.pTableIter;
  }

  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
  SBlockData*         pBlockData = &pReader->status.fileBlockData;

  while (1) {
    bool hasBlockData = false;

    // find the first qualified row in the data block
    while (pBlockData->nRow > 0 && pBlockData->uid == pBlockScanInfo->uid) {
      if (isValidFileBlockRow(pBlockData, pDumpInfo, pBlockScanInfo, pReader)) {
        hasBlockData = true;
        break;
      }

      pDumpInfo->rowIndex += step;

      SDataBlk* pBlock = getCurrentBlock(&pReader->status.blockIter);
      if (pDumpInfo->rowIndex >= pBlock->nRow || pDumpInfo->rowIndex < 0) {
        pBlockInfo = getCurrentBlockInfo(&pReader->status.blockIter);  // NOTE: get the new block info

        // continue check for the next file block if the last ts in the current block
        // is overlapped with the next neighbor block
        bool loadNeighbor = false;
        code = loadNeighborIfOverlap(pBlockInfo, pBlockScanInfo, pReader, &loadNeighbor);
        if ((!loadNeighbor) || (code != 0)) {
          setBlockAllDumped(pDumpInfo, pBlock->maxKey.ts, pReader->order);
          break;
        }
      }
    }

    // no qualified row is left in the file block, no need to proceed
    if (!hasBlockData) {
      break;
    }

    // stop on the first merge failure and report it to the caller instead of dropping it
    code = buildComposedDataBlockImpl(pReader, pBlockScanInfo, pBlockData, pLastBlockReader);
    if (code != TSDB_CODE_SUCCESS) {
      break;
    }

    // currently loaded file data block is consumed
    if ((pBlockData->nRow > 0) && (pDumpInfo->rowIndex >= pBlockData->nRow || pDumpInfo->rowIndex < 0)) {
      SDataBlk* pBlock = getCurrentBlock(&pReader->status.blockIter);
      setBlockAllDumped(pDumpInfo, pBlock->maxKey.ts, pReader->order);
      break;
    }

    // the result block is full
    if (pResBlock->info.rows >= pReader->capacity) {
      break;
    }
  }

_end:
  el = (taosGetTimestampUs() - st) / 1000.0;  // us -> ms
  updateComposedBlockInfo(pReader, el, pBlockScanInfo);

  if (pResBlock->info.rows > 0) {
    tsdbDebug("%p uid:%" PRIu64 ", composed data block created, brange:%" PRIu64 "-%" PRIu64
              " rows:%d, elapsed time:%.2f ms %s",
              pReader, pResBlock->info.id.uid, pResBlock->info.window.skey, pResBlock->info.window.ekey,
              pResBlock->info.rows, el, pReader->idStr);
  }

  return code;
}

// Record in the reader status whether the current result block is a composed data block.
void setComposedBlockFlag(STsdbReader* pReader, bool composed) {
  SReaderStatus* pStatus = &pReader->status;
  pStatus->composedDataBlock = composed;
}

2726 2727 2728 2729 2730 2731 2732 2733
// Return the index at which a traversal of the delete skyline starts: 0 when there is no skyline or the
// scan is ascending, otherwise the index of the last skyline entry.
int32_t getInitialDelIndex(const SArray* pDelSkyline, int32_t order) {
  if (pDelSkyline == NULL) {
    return 0;
  }

  if (ASCENDING_TRAVERSE(order)) {
    return 0;
  }

  return taosArrayGetSize(pDelSkyline) - 1;
}

dengyihao's avatar
dengyihao 已提交
2734 2735
// Build the delete skyline of one table, unless it already exists, and reset all delete-index cursors of
// the scan info to the start position for the scan order.
//
// Delete records are collected from three places: the delete file (only when the snapshot has one and the
// reader's delete index is not empty), the mem table data and the imem table data (either may be NULL).
// When at least one record was collected, the skyline is built from them with tsdbBuildDeleteSkyline().
//
// Returns the code of tsdbReadDelData() on a read failure (the cursors are left untouched in that case),
// otherwise the code of the skyline construction, or 0 when there is nothing to build.
int32_t initDelSkylineIterator(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader, STbData* pMemTbData,
                               STbData* piMemTbData) {
  if (pBlockScanInfo->delSkyline != NULL) {
    return TSDB_CODE_SUCCESS;
  }

  int32_t code = 0;
  SArray* pDelData = taosArrayInit(4, sizeof(SDelData));

  // 1. delete records stored in the delete file
  SDelFile* pDelFile = pReader->pReadSnap->fs.pDelFile;
  if (pDelFile && taosArrayGetSize(pReader->pDelIdx) > 0) {
    SDelIdx  lookup = {.suid = pReader->suid, .uid = pBlockScanInfo->uid};
    SDelIdx* pFound = taosArraySearch(pReader->pDelIdx, &lookup, tCmprDelIdx, TD_EQ);

    if (pFound != NULL) {
      code = tsdbReadDelData(pReader->pDelFReader, pFound, pDelData);
    }

    if (code != TSDB_CODE_SUCCESS) {
      taosArrayDestroy(pDelData);
      return code;
    }
  }

  // 2. delete records kept in mem, then in imem
  STbData* memSources[2] = {pMemTbData, piMemTbData};
  for (int32_t i = 0; i < 2; ++i) {
    if (memSources[i] == NULL) {
      continue;
    }

    for (SDelData* pDel = memSources[i]->pHead; pDel != NULL; pDel = pDel->pNext) {
      taosArrayPush(pDelData, pDel);
    }
  }

  // 3. build the skyline from everything collected
  if (taosArrayGetSize(pDelData) > 0) {
    pBlockScanInfo->delSkyline = taosArrayInit(4, sizeof(TSDBKEY));
    code = tsdbBuildDeleteSkyline(pDelData, 0, (int32_t)(taosArrayGetSize(pDelData) - 1), pBlockScanInfo->delSkyline);
  }

  taosArrayDestroy(pDelData);

  // 4. every cursor over the skyline starts at the same position
  int32_t startIdx = getInitialDelIndex(pBlockScanInfo->delSkyline, pReader->order);
  pBlockScanInfo->iter.index = startIdx;
  pBlockScanInfo->iiter.index = startIdx;
  pBlockScanInfo->fileDelIndex = startIdx;
  pBlockScanInfo->lastBlockDelIndex = startIdx;

  return code;
}

C
Cary Xu 已提交
2793
// Return the key of the next row held in the memory buffers (mem and imem) of a table.
//   - both buffers have a valid row: the key with the smaller timestamp for an ascending scan, the one with
//     the larger timestamp for a descending scan (on equal timestamps: mem for ascending, imem for descending)
//   - only one buffer has a valid row: the key of that row
//   - neither has one: a key whose ts is TSKEY_INITIAL_VAL
TSDBKEY getCurrentKeyInBuf(STableBlockScanInfo* pScanInfo, STsdbReader* pReader) {
  bool asc = ASCENDING_TRAVERSE(pReader->order);

  TSDBKEY memKey = {.ts = TSKEY_INITIAL_VAL};
  TSDBKEY imemKey = {.ts = TSKEY_INITIAL_VAL};

  TSDBROW* pMemRow = getValidMemRow(&pScanInfo->iter, pScanInfo->delSkyline, pReader);
  if (pMemRow != NULL) {
    memKey = TSDBROW_KEY(pMemRow);
  }

  TSDBROW* pImemRow = getValidMemRow(&pScanInfo->iiter, pScanInfo->delSkyline, pReader);
  if (pImemRow != NULL) {
    imemKey = TSDBROW_KEY(pImemRow);
  }

  // no data in mem: imemKey is either the real imem key or still the initial value
  if (pMemRow == NULL) {
    return imemKey;
  }

  // no data in imem
  if (pImemRow == NULL) {
    return memKey;
  }

  // data in both mem & imem
  bool memNotAfter = (memKey.ts <= imemKey.ts);
  return (asc == memNotAfter) ? memKey : imemKey;
}
2829
// Advances the fileset iterator until a fileset is found for which doLoadFileBlock() reports at least
// one data block or one last (stt) file in pBlockNum, or until no fileset is left. Both counters in
// pBlockNum are zeroed on entry.
//
// After the loop, if the reader holds a read snapshot with a delete file and no delete-file reader has
// been opened yet, the delete-file reader is opened and the delete index is loaded into pReader->pDelIdx.
//
// Returns TSDB_CODE_SUCCESS, TSDB_CODE_OUT_OF_MEMORY when an array cannot be allocated, or the error
// code of the failing helper.
static int32_t moveToNextFile(STsdbReader* pReader, SBlockNumber* pBlockNum) {
  SReaderStatus* pStatus = &pReader->status;
  int32_t        code = TSDB_CODE_SUCCESS;

  pBlockNum->numOfBlocks = 0;
  pBlockNum->numOfLastFiles = 0;

  size_t  numOfTables = taosHashGetSize(pReader->status.pTableMap);
  SArray* pIndexList = taosArrayInit(numOfTables, sizeof(SBlockIdx));
  if (pIndexList == NULL) {
    // the list is cleared and filled below; never continue without it
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  while (1) {
    bool hasNext = false;
    code = filesetIteratorNext(&pStatus->fileIter, pReader, &hasNext);
    if (code != TSDB_CODE_SUCCESS || !hasNext) {  // error, or no data files on disk
      break;
    }

    taosArrayClear(pIndexList);
    code = doLoadBlockIndex(pReader, pReader->pFileReader, pIndexList);
    if (code != TSDB_CODE_SUCCESS) {
      break;
    }

    if (taosArrayGetSize(pIndexList) > 0 || pReader->pFileReader->pSet->nSttF > 0) {
      code = doLoadFileBlock(pReader, pIndexList, pBlockNum);
      if (code != TSDB_CODE_SUCCESS) {
        break;
      }

      if (pBlockNum->numOfBlocks + pBlockNum->numOfLastFiles > 0) {
        break;
      }
    }

    // no blocks in current file, try next files
  }

  // single release point for the temporary index list, on success and on failure
  taosArrayDestroy(pIndexList);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  if (pReader->pReadSnap != NULL) {
    SDelFile* pDelFile = pReader->pReadSnap->fs.pDelFile;
    if (pReader->pDelFReader == NULL && pDelFile != NULL) {
      code = tsdbDelFReaderOpen(&pReader->pDelFReader, pDelFile, pReader->pTsdb);
      if (code != TSDB_CODE_SUCCESS) {
        return code;
      }

      pReader->pDelIdx = taosArrayInit(4, sizeof(SDelIdx));
      if (pReader->pDelIdx == NULL) {
        return TSDB_CODE_OUT_OF_MEMORY;
      }

      code = tsdbReadDelIdx(pReader->pDelFReader, pReader->pDelIdx);
      if (code != TSDB_CODE_SUCCESS) {
        taosArrayDestroy(pReader->pDelIdx);
        // do not leave a dangling pointer behind in the reader: it outlives this call
        pReader->pDelIdx = NULL;
        return code;
      }
    }
  }

  return TSDB_CODE_SUCCESS;
}

X
Xiaoyu Wang 已提交
2898
// Rewinds the ordered uid list to its first entry and points pStatus->pTableIter at the table-map
// entry of that uid (the result of taosHashGet() is stored as is).
static void resetTableListIndex(SReaderStatus* pStatus) {
  pStatus->uidList.currentIndex = 0;

  uint64_t firstUid = pStatus->uidList.tableUidList[0];
  pStatus->pTableIter = taosHashGet(pStatus->pTableMap, &firstUid, sizeof(firstUid));
}

2906
// Steps the ordered uid list to its next entry and updates pStatus->pTableIter accordingly.
// Returns false, with pTableIter set to NULL, once the index reaches the number of entries in the table
// map; otherwise returns whether the next uid could be found in the table map.
static bool moveToNextTable(STableUidList* pOrderedCheckInfo, SReaderStatus* pStatus) {
  ++pOrderedCheckInfo->currentIndex;

  if (pOrderedCheckInfo->currentIndex < taosHashGetSize(pStatus->pTableMap)) {
    uint64_t nextUid = pOrderedCheckInfo->tableUidList[pOrderedCheckInfo->currentIndex];
    pStatus->pTableIter = taosHashGet(pStatus->pTableMap, &nextUid, sizeof(nextUid));
    return (pStatus->pTableIter != NULL);
  }

  // ran past the last table
  pStatus->pTableIter = NULL;
  return false;
}

2918
// Builds one result block from the last-block reader, walking the tables in uid-list order starting at
// the table pStatus->pTableIter currently refers to.
//
// For each table the last-block reader is initialized; if it has data, rows are composed into
// pReader->pResBlock until the last-block data runs out or the result block reaches pReader->capacity.
// The function returns as soon as the result block holds rows; otherwise it moves on to the next table.
//
// Always returns TSDB_CODE_SUCCESS. When no table is left, pResBlock->info.rows is 0 and
// moveToNextTable() has reset pStatus->pTableIter to NULL.
static int32_t doLoadLastBlockSequentially(STsdbReader* pReader) {
  SReaderStatus*    pStatus = &pReader->status;
  SLastBlockReader* pLastBlockReader = pStatus->fileIter.pLastBlockReader;
  STableUidList*    pUidList = &pStatus->uidList;

  if (taosHashGetSize(pStatus->pTableMap) == 0) {
    return TSDB_CODE_SUCCESS;
  }

  SSDataBlock* pResBlock = pReader->pResBlock;

  while (1) {
    // load the last data block of current table; table-map entries hold STableBlockScanInfo pointers
    STableBlockScanInfo* pScanInfo = *(STableBlockScanInfo**)pStatus->pTableIter;

    if (initLastBlockReader(pLastBlockReader, pScanInfo, pReader)) {
      int64_t st = taosGetTimestampUs();

      // stop when the last block has no more data or the result block is full
      while (hasDataInLastBlock(pLastBlockReader)) {
        buildComposedDataBlockImpl(pReader, pScanInfo, &pReader->status.fileBlockData, pLastBlockReader);
        if (pResBlock->info.rows >= pReader->capacity) {
          break;
        }
      }

      double el = (taosGetTimestampUs() - st) / 1000.0;
      updateComposedBlockInfo(pReader, el, pScanInfo);

      if (pResBlock->info.rows > 0) {
        // window keys are signed timestamps, hence PRId64 (matches the other block-range logs)
        tsdbDebug("%p uid:%" PRIu64 ", composed data block created, brange:%" PRId64 "-%" PRId64
                  " rows:%d, elapsed time:%.2f ms %s",
                  pReader, pResBlock->info.id.uid, pResBlock->info.window.skey, pResBlock->info.window.ekey,
                  pResBlock->info.rows, el, pReader->idStr);
        return TSDB_CODE_SUCCESS;
      }
    }

    // current table has nothing (more) to offer, let's try next table
    if (!moveToNextTable(pUidList, pStatus)) {
      return TSDB_CODE_SUCCESS;
    }
  }
}

2977
// Produces the next result block for the file block the block iterator currently points at.
// One of four paths is taken:
//  1. fileBlockShouldLoad() says the block must be read: load it and build a composed block;
//  2. buffered rows fall into the gap before the file block: build the block from the buffer only, up to
//     the block's min key (ascending) or max key (descending);
//  3. descending scan and the last-block reader still has data: return rows from the last block only;
//  4. otherwise the whole file block is reported as is: only the block info (rows, uid, window) is
//     filled in, with dataLoad left at 0.
//
// Returns terrno when the table scan info cannot be found, otherwise the code of the path taken.
static int32_t doBuildDataBlock(STsdbReader* pReader) {
  SReaderStatus*      pStatus = &pReader->status;
  SDataBlockIter*     pBlockIter = &pStatus->blockIter;
  SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(pBlockIter);
  SLastBlockReader*   pLastBlockReader = pReader->status.fileIter.pLastBlockReader;

  STableBlockScanInfo* pScanInfo = getTableBlockScanInfo(pReader->status.pTableMap, pBlockInfo->uid, pReader->idStr);
  if (pScanInfo == NULL) {
    return terrno;
  }

  SDataBlk* pBlock = getCurrentBlock(pBlockIter);

  initLastBlockReader(pLastBlockReader, pScanInfo, pReader);
  TSDBKEY keyInBuf = getCurrentKeyInBuf(pScanInfo, pReader);

  if (fileBlockShouldLoad(pReader, pBlockInfo, pBlock, pScanInfo, keyInBuf, pLastBlockReader)) {
    int32_t code = doLoadFileBlockData(pReader, pBlockIter, &pStatus->fileBlockData, pScanInfo->uid);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    // build composed data block
    return buildComposedDataBlock(pReader);
  }

  if (bufferDataInFileBlockGap(pReader->order, keyInBuf, pBlock)) {
    // data in memory that are earlier than current file block
    // rows in buffer should be less than the file block in asc, greater than file block in desc
    int64_t endKey = (ASCENDING_TRAVERSE(pReader->order)) ? pBlock->minKey.ts : pBlock->maxKey.ts;
    return buildDataBlockFromBuf(pReader, pScanInfo, endKey);
  }

  if (hasDataInLastBlock(pLastBlockReader) && !ASCENDING_TRAVERSE(pReader->order)) {
    // only return the rows in last block
    int64_t tsLast = getCurrentKeyInLastBlock(pLastBlockReader);
    ASSERT(tsLast >= pBlock->maxKey.ts);

    SBlockData* pBData = &pReader->status.fileBlockData;
    tBlockDataReset(pBData);

    SSDataBlock* pResBlock = pReader->pResBlock;
    tsdbDebug("load data in last block firstly, due to desc scan data, %s", pReader->idStr);

    int64_t st = taosGetTimestampUs();

    // stop when the last block has no more data or the result block is full
    while (hasDataInLastBlock(pLastBlockReader)) {
      buildComposedDataBlockImpl(pReader, pScanInfo, &pReader->status.fileBlockData, pLastBlockReader);
      if (pResBlock->info.rows >= pReader->capacity) {
        break;
      }
    }

    double el = (taosGetTimestampUs() - st) / 1000.0;
    updateComposedBlockInfo(pReader, el, pScanInfo);

    if (pResBlock->info.rows > 0) {
      // window keys are signed timestamps, hence PRId64 (matches the clean-block log below)
      tsdbDebug("%p uid:%" PRIu64 ", composed data block created, brange:%" PRId64 "-%" PRId64
                " rows:%d, elapsed time:%.2f ms %s",
                pReader, pResBlock->info.id.uid, pResBlock->info.window.skey, pResBlock->info.window.ekey,
                pResBlock->info.rows, el, pReader->idStr);
    }

    return TSDB_CODE_SUCCESS;
  }

  // whole block is required, return it directly
  SDataBlockInfo* pInfo = &pReader->pResBlock->info;
  pInfo->rows = pBlock->nRow;
  pInfo->id.uid = pScanInfo->uid;
  pInfo->dataLoad = 0;
  pInfo->window = (STimeWindow){.skey = pBlock->minKey.ts, .ekey = pBlock->maxKey.ts};
  setComposedBlockFlag(pReader, false);
  setBlockAllDumped(&pStatus->fBlockDumpInfo, pBlock->maxKey.ts, pReader->order);

  // update the last key for the corresponding table
  pScanInfo->lastKey = ASCENDING_TRAVERSE(pReader->order) ? pInfo->window.ekey : pInfo->window.skey;
  tsdbDebug("%p uid:%" PRIu64
            " clean file block retrieved from file, global index:%d, "
            "table index:%d, rows:%d, brange:%" PRId64 "-%" PRId64 ", %s",
            pReader, pScanInfo->uid, pBlockIter->index, pBlockInfo->tbBlockIdx, pBlock->nRow, pBlock->minKey.ts,
            pBlock->maxKey.ts, pReader->idStr);

  return TSDB_CODE_SUCCESS;
}

H
Haojun Liao 已提交
3069
// Builds one result block purely from buffered (mem/imem) rows, walking the tables in uid-list order
// starting at the table pStatus->pTableIter currently refers to. Returns as soon as the result block
// holds rows, when no table is left, or when buildDataBlockFromBuf() fails (its code is returned).
static int32_t buildBlockFromBufferSequentially(STsdbReader* pReader) {
  SReaderStatus* pStatus = &pReader->status;

  for (;;) {
    // table-map entries hold STableBlockScanInfo pointers
    STableBlockScanInfo** ppScanInfo = pStatus->pTableIter;
    STableBlockScanInfo*  pScanInfo = *ppScanInfo;
    initMemDataIterator(pScanInfo, pReader);

    // no upper/lower bound: consume everything the buffer has for this table
    int64_t endKey = (ASCENDING_TRAVERSE(pReader->order)) ? INT64_MAX : INT64_MIN;
    int32_t code = buildDataBlockFromBuf(pReader, pScanInfo, endKey);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    if (pReader->pResBlock->info.rows > 0) {
      return TSDB_CODE_SUCCESS;
    }

    // current table is exhausted, let's try next table
    if (!moveToNextTable(&pStatus->uidList, pStatus)) {
      return TSDB_CODE_SUCCESS;
    }
  }
}

3102
// set the correct start position in case of the first/last file block, according to the query time window
3103
static void initBlockDumpInfo(STsdbReader* pReader, SDataBlockIter* pBlockIter) {
3104 3105 3106 3107 3108 3109 3110 3111
  int64_t             lastKey = ASCENDING_TRAVERSE(pReader->order) ? INT64_MIN : INT64_MAX;
  SDataBlk*           pBlock = getCurrentBlock(pBlockIter);
  SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(pBlockIter);
  if (pBlockInfo) {
    STableBlockScanInfo* pScanInfo = taosHashGet(pBlockIter->pTableMap, &pBlockInfo->uid, sizeof(pBlockInfo->uid));
    if (pScanInfo) {
      lastKey = pScanInfo->lastKey;
    }
3112
  }
3113 3114 3115
  SReaderStatus* pStatus = &pReader->status;

  SFileBlockDumpInfo* pDumpInfo = &pStatus->fBlockDumpInfo;
3116 3117 3118

  pDumpInfo->totalRows = pBlock->nRow;
  pDumpInfo->allDumped = false;
3119
  pDumpInfo->rowIndex = ASCENDING_TRAVERSE(pReader->order) ? 0 : pBlock->nRow - 1;
3120
  pDumpInfo->lastKey = lastKey;
3121 3122
}

3123
// Moves to the next fileset that has something to read and prepares the block iterator for it.
//  - moveToNextFile() fails: its code is returned;
//  - no fileset with data blocks or last files is left: pReader->status.loadFromFile is set to false;
//  - the fileset has data blocks: the block iterator is initialized with them;
//  - the fileset has last files only: file block data, block iterator and table list index are reset.
// In the last two cases the block dump info is (re)initialized before returning.
static int32_t initForFirstBlockInFile(STsdbReader* pReader, SDataBlockIter* pBlockIter) {
  SBlockNumber blockNum = {0};

  int32_t code = moveToNextFile(pReader, &blockNum);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  if (blockNum.numOfBlocks + blockNum.numOfLastFiles == 0) {
    // all data files are consumed, try data in buffer
    pReader->status.loadFromFile = false;
    return code;
  }

  if (blockNum.numOfBlocks <= 0) {
    // no block data, only last block exists
    tBlockDataReset(&pReader->status.fileBlockData);
    resetDataBlockIterator(pBlockIter, pReader->order);
    resetTableListIndex(&pReader->status);
  } else {
    // initialize the block iterator for a new fileset
    code = initBlockIterator(pReader, pBlockIter, blockNum.numOfBlocks);
  }

  // set the correct start position according to the query time window
  initBlockDumpInfo(pReader, pBlockIter);
  return code;
}

3150
// Tells whether the current file block has been started but not finished: it is not marked as fully
// dumped and the row cursor has already left its start position (row 0 for ascending, the last row for
// descending).
static bool fileBlockPartiallyRead(SFileBlockDumpInfo* pDumpInfo, bool asc) {
  if (pDumpInfo->allDumped) {
    return false;
  }

  if (asc) {
    return pDumpInfo->rowIndex > 0;
  }

  return pDumpInfo->rowIndex < (pDumpInfo->totalRows - 1);
}

3155
// Produces the next non-empty result block from on-disk data (file blocks and last/stt files).
//
// Two phases, connected through the `_begin` label:
//  - last-block phase (entered when the block iterator has no blocks, or via `goto _begin`): rows are
//    read table by table from the last-block reader; once all tables are done (pTableIter == NULL) the
//    next fileset is opened;
//  - file-block phase (the loop): continues a partially read block, or advances to the next block of the
//    fileset; when the fileset's blocks are exhausted it either switches to the last-block phase (the
//    fileset has stt files) or opens the next fileset.
//
// Returns on the first error, when the result block holds rows, or when no fileset is left
// (pReader->status.loadFromFile becomes false).
static int32_t buildBlockFromFiles(STsdbReader* pReader) {
  int32_t         code = TSDB_CODE_SUCCESS;
  const bool      isAsc = ASCENDING_TRAVERSE(pReader->order);
  SReaderStatus*  pStatus = &pReader->status;
  SDataBlockIter* pBlockIter = &pStatus->blockIter;

  if (pBlockIter->numOfBlocks == 0) {
  _begin:
    code = doLoadLastBlockSequentially(pReader);
    if (code != TSDB_CODE_SUCCESS || pReader->pResBlock->info.rows > 0) {
      return code;
    }

    // all data blocks are checked in this last block file, now let's try the next file
    if (pStatus->pTableIter == NULL) {
      code = initForFirstBlockInFile(pReader, pBlockIter);

      // error happens or all the data files are completely checked
      if (code != TSDB_CODE_SUCCESS || !pStatus->loadFromFile) {
        return code;
      }

      // this file does not have data files, let's start check the last block file if exists
      if (pBlockIter->numOfBlocks == 0) {
        resetTableListIndex(pStatus);
        goto _begin;
      }
    }

    code = doBuildDataBlock(pReader);
    if (code != TSDB_CODE_SUCCESS || pReader->pResBlock->info.rows > 0) {
      return code;
    }
  }

  for (;;) {
    SFileBlockDumpInfo* pDumpInfo = &pStatus->fBlockDumpInfo;

    if (fileBlockPartiallyRead(pDumpInfo, isAsc)) {
      // file data block is partially loaded
      code = buildComposedDataBlock(pReader);
    } else {
      // current block are exhausted, try the next file block
      if (pDumpInfo->allDumped) {
        if (blockIteratorNext(&pStatus->blockIter, pReader->idStr)) {
          // check for the next block in the block accessed order list
          initBlockDumpInfo(pReader, pBlockIter);
        } else if (pStatus->pCurrentFileset->nSttF > 0) {
          // data blocks in current file are exhausted, continue with the last files of this fileset
          SBlockData* pBlockData = &pStatus->fileBlockData;
          if (pBlockData->uid != 0) {
            tBlockDataClear(pBlockData);
          }

          tBlockDataReset(pBlockData);
          resetDataBlockIterator(pBlockIter, pReader->order);
          resetTableListIndex(pStatus);
          goto _begin;
        } else {
          code = initForFirstBlockInFile(pReader, pBlockIter);

          // error happens or all the data files are completely checked
          if (code != TSDB_CODE_SUCCESS || !pStatus->loadFromFile) {
            return code;
          }

          // this file does not have blocks, let's start check the last block file
          if (pBlockIter->numOfBlocks == 0) {
            resetTableListIndex(pStatus);
            goto _begin;
          }
        }
      }

      code = doBuildDataBlock(pReader);
    }

    if (code != TSDB_CODE_SUCCESS || pReader->pResBlock->info.rows > 0) {
      return code;
    }
  }
}
H
refact  
Hongze Cheng 已提交
3251

3252 3253
// Selects the tsdb instance to query. For a non-rsma vnode this is VND_TSDB(pVnode) and *pLevel is not
// written. For an rsma vnode the retention levels are walked starting at level 0: the walk stops at the
// first level whose keep range still covers winSKey (with a tolerance of tsQueryRsmaTolerance scaled to
// the vnode's time precision), or steps back one level when a level with keep <= 0 is met. The chosen
// level is stored in *pLevel and the matching rsma tsdb is returned; any level beyond L1 maps to L2.
static STsdb* getTsdbByRetentions(SVnode* pVnode, TSKEY winSKey, SRetention* retentions, const char* idStr,
                                  int8_t* pLevel) {
  if (!VND_IS_RSMA(pVnode)) {
    return VND_TSDB(pVnode);
  }

  int8_t  precision = pVnode->config.tsdbCfg.precision;
  int64_t now = taosGetTimestamp(precision);

  // scale factor applied to the tolerance for the vnode's precision
  long scale;
  if (precision == TSDB_TIME_PRECISION_MILLI) {
    scale = 1L;
  } else if (precision == TSDB_TIME_PRECISION_MICRO) {
    scale = 1000L;
  } else {
    scale = 1000000L;
  }
  int64_t offset = tsQueryRsmaTolerance * scale;

  int8_t level = 0;
  for (int8_t round = 0; round < TSDB_RETENTION_MAX; ++round, ++level) {
    SRetention* pRetention = retentions + level;

    if (pRetention->keep <= 0) {
      // this level is not configured: fall back to the previous one, if any
      if (level > 0) {
        --level;
      }
      break;
    }

    if ((now - pRetention->keep) <= (winSKey + offset)) {
      break;
    }
  }

  const char* str = (idStr != NULL) ? idStr : "";

  switch (level) {
    case TSDB_RETENTION_L0:
      *pLevel = TSDB_RETENTION_L0;
      tsdbDebug("vgId:%d, rsma level %d is selected to query %s", TD_VID(pVnode), TSDB_RETENTION_L0, str);
      return VND_RSMA0(pVnode);

    case TSDB_RETENTION_L1:
      *pLevel = TSDB_RETENTION_L1;
      tsdbDebug("vgId:%d, rsma level %d is selected to query %s", TD_VID(pVnode), TSDB_RETENTION_L1, str);
      return VND_RSMA1(pVnode);

    default:
      *pLevel = TSDB_RETENTION_L2;
      tsdbDebug("vgId:%d, rsma level %d is selected to query %s", TD_VID(pVnode), TSDB_RETENTION_L2, str);
      return VND_RSMA2(pVnode);
  }
}

H
Haojun Liao 已提交
3296
// Computes the version range a query may read.
//  - minVer: pCond->startVersion, or 0 when it is -1 (not specified);
//  - maxVer: pCond->endVersion capped at the vnode's applied version; the applied version itself when
//    endVersion is -1 (not specified).
// The `level` parameter is not used by this function.
SVersionRange getQueryVerRange(SVnode* pVnode, SQueryTableDataCond* pCond, int8_t level) {
  int64_t startVer = 0;
  if (pCond->startVersion != -1) {
    startVer = pCond->startVersion;
  }

  // default: the current maximum version of the vnode; a user-specified end version only narrows it
  int64_t endVer = pVnode->state.applied;
  if (pCond->endVersion != -1 && pCond->endVersion <= pVnode->state.applied) {
    endVer = pCond->endVersion;
  }

  return (SVersionRange){.minVer = startVer, .maxVer = endVer};
}

3310
// Checks whether the row identified by pKey (timestamp + version) is covered by a deletion recorded in
// pDelList, an array of TSDBKEY points. *index is the caller's cursor into that array; it is advanced
// (ascending order) or moved backwards (descending order) while searching, so that consecutive calls
// with monotonically moving keys continue where the previous call stopped.
//
// A row counts as dropped when its timestamp lies inside the range formed by two neighbouring points
// and the version of the range's left point is not smaller than the row's version. The ascending paths
// additionally compare that version with pVerRange (see the individual conditions below).
//
// Returns false when pDelList is NULL.
bool hasBeenDropped(const SArray* pDelList, int32_t* index, TSDBKEY* pKey, int32_t order, SVersionRange* pVerRange) {
  if (pDelList == NULL) {
    return false;
  }

  size_t num = taosArrayGetSize(pDelList);

  if (ASCENDING_TRAVERSE(order)) {
    if (*index >= num - 1) {
      // the cursor already sits on the last point
      TSDBKEY* pLast = taosArrayGetLast(pDelList);
      ASSERT(pKey->ts >= pLast->ts);

      if (pKey->ts > pLast->ts) {
        return false;
      }

      if (pKey->ts == pLast->ts) {
        TSDBKEY* pBeforeLast = taosArrayGet(pDelList, num - 2);
        return (pBeforeLast->version >= pKey->version && pBeforeLast->version <= pVerRange->maxVer &&
                pBeforeLast->version >= pVerRange->minVer);
      }

      return false;
    }

    TSDBKEY* pCurrent = taosArrayGet(pDelList, *index);
    TSDBKEY* pNext = taosArrayGet(pDelList, (*index) + 1);

    if (pKey->ts < pCurrent->ts) {
      return false;
    }

    if (pCurrent->ts <= pKey->ts && pNext->ts >= pKey->ts && pCurrent->version >= pKey->version &&
        pVerRange->maxVer >= pCurrent->version) {
      return true;
    }

    // the key lies beyond the current range: move the cursor forward
    while (pNext->ts <= pKey->ts && (*index) < num - 1) {
      (*index) += 1;

      if (!((*index) < num - 1)) {
        break;  // reached the last point, there is no further range to test
      }

      pCurrent = taosArrayGet(pDelList, *index);
      pNext = taosArrayGet(pDelList, (*index) + 1);

      // it is not a consecutive deletion range, ignore it
      if (pCurrent->version == 0 && pNext->version > 0) {
        continue;
      }

      if (pCurrent->ts <= pKey->ts && pNext->ts >= pKey->ts && pCurrent->version >= pKey->version &&
          pVerRange->maxVer >= pCurrent->version) {
        return true;
      }
    }

    return false;
  }

  // descending order
  if (*index <= 0) {
    // the cursor already sits on the first point
    TSDBKEY* pFirst = taosArrayGet(pDelList, 0);

    if (pKey->ts < pFirst->ts) {
      return false;
    }

    if (pKey->ts == pFirst->ts) {
      return pFirst->version >= pKey->version;
    }

    ASSERT(0);
    return false;
  }

  TSDBKEY* pCurrent = taosArrayGet(pDelList, *index);
  TSDBKEY* pPrev = taosArrayGet(pDelList, (*index) - 1);

  if (pKey->ts > pCurrent->ts) {
    return false;
  }

  if (pPrev->ts <= pKey->ts && pCurrent->ts >= pKey->ts && pPrev->version >= pKey->version) {
    return true;
  }

  // the key lies before the current range: move the cursor backwards
  while (pPrev->ts >= pKey->ts && (*index) > 1) {
    (*index) -= 1;

    if (!((*index) >= 1)) {
      break;
    }

    pCurrent = taosArrayGet(pDelList, *index);
    pPrev = taosArrayGet(pDelList, (*index) - 1);

    // it is not a consecutive deletion range, ignore it
    if (pCurrent->version > 0 && pPrev->version == 0) {
      continue;
    }

    if (pPrev->ts <= pKey->ts && pCurrent->ts >= pKey->ts && pPrev->version >= pKey->version) {
      return true;
    }
  }

  return false;
}

3413
// Returns the first row, starting at the iterator's current position, that the reader is allowed to
// see: its version lies inside pReader->verRange and it is not covered by a deletion in pDelList.
// Rows failing that test are skipped by advancing the iterator.
//
// Returns NULL when the iterator has no value, runs out of rows, or reaches a row outside the reader's
// time window; in the latter two cases pIter->hasVal ends up false.
TSDBROW* getValidMemRow(SIterInfo* pIter, const SArray* pDelList, STsdbReader* pReader) {
  if (!pIter->hasVal) {
    return NULL;
  }

  TSDBROW* pRow = tsdbTbDataIterGet(pIter->iter);

  for (;;) {
    TSDBKEY key = TSDBROW_KEY(pRow);

    if (outOfTimeWindow(key.ts, &pReader->window)) {
      pIter->hasVal = false;
      return NULL;
    }

    // it is a valid data version and the row has not been deleted
    bool versionInRange = (key.version <= pReader->verRange.maxVer && key.version >= pReader->verRange.minVer);
    if (versionInRange && !hasBeenDropped(pDelList, &pIter->index, &key, pReader->order, &pReader->verRange)) {
      return pRow;
    }

    // not visible to this reader, try the next row
    pIter->hasVal = tsdbTbDataIterNext(pIter->iter);
    if (!pIter->hasVal) {
      return NULL;
    }

    pRow = tsdbTbDataIterGet(pIter->iter);
  }
}
H
Hongze Cheng 已提交
3452

3453 3454
// Merges into pMerger every following valid buffered row that carries the timestamp `ts`.
// The iterator is advanced first, so the row it points at on entry is not merged here. Merging stops
// when the iterator is exhausted, no valid row is left, or a row with a different timestamp is met.
//
// Returns TSDB_CODE_SUCCESS, or terrno when the schema of a row-format row cannot be obtained.
int32_t doMergeRowsInBuf(SIterInfo* pIter, uint64_t uid, int64_t ts, SArray* pDelList, SRowMerger* pMerger,
                         STsdbReader* pReader) {
  for (;;) {
    pIter->hasVal = tsdbTbDataIterNext(pIter->iter);
    if (!pIter->hasVal) {
      return TSDB_CODE_SUCCESS;
    }

    // data exists but not valid
    TSDBROW* pRow = getValidMemRow(pIter, pDelList, pReader);
    if (pRow == NULL) {
      return TSDB_CODE_SUCCESS;
    }

    // ts is not identical, quit
    TSDBKEY rowKey = TSDBROW_KEY(pRow);
    if (rowKey.ts != ts) {
      return TSDB_CODE_SUCCESS;
    }

    if (pRow->type != TSDBROW_ROW_FMT) {
      // column format
      tsdbRowMerge(pMerger, pRow);
      continue;
    }

    // row format: the schema of the row's version is required for merging
    STSchema* pTSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, uid);
    if (pTSchema == NULL) {
      return terrno;
    }

    tsdbRowMergerAdd(pMerger, pRow, pTSchema);
  }
}

3488
static int32_t doMergeRowsInFileBlockImpl(SBlockData* pBlockData, int32_t rowIndex, int64_t key, SRowMerger* pMerger,
3489
                                          SVersionRange* pVerRange, int32_t step) {
3490
  while (rowIndex < pBlockData->nRow && rowIndex >= 0 && pBlockData->aTSKEY[rowIndex] == key) {
3491
    if (pBlockData->aVersion[rowIndex] > pVerRange->maxVer || pBlockData->aVersion[rowIndex] < pVerRange->minVer) {
3492
      rowIndex += step;
3493 3494 3495 3496
      continue;
    }

    TSDBROW fRow = tsdbRowFromBlockData(pBlockData, rowIndex);
H
Hongze Cheng 已提交
3497
    tsdbRowMerge(pMerger, &fRow);
3498 3499 3500 3501 3502 3503 3504 3505 3506 3507 3508
    rowIndex += step;
  }

  return rowIndex;
}

// Result reported by checkForNeighborFileBlock() through its `state` out-parameter.
typedef enum {
  // set when merging rows of the neighbor block moved the row index past the end of that block
  // (>= totalRows in ascending order, < 0 in descending order)
  CHECK_FILEBLOCK_CONT = 0x1,
  // default result: the neighbor block was not loaded, loading failed, or the row index is still
  // inside the block
  CHECK_FILEBLOCK_QUIT = 0x2,
} CHECK_FILEBLOCK_STATE;

H
Hongze Cheng 已提交
3509
static int32_t checkForNeighborFileBlock(STsdbReader* pReader, STableBlockScanInfo* pScanInfo, SDataBlk* pBlock,
3510 3511
                                         SFileDataBlockInfo* pFBlock, SRowMerger* pMerger, int64_t key,
                                         CHECK_FILEBLOCK_STATE* state) {
3512
  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
3513
  SBlockData*         pBlockData = &pReader->status.fileBlockData;
3514
  bool                asc = ASCENDING_TRAVERSE(pReader->order);
3515

3516
  *state = CHECK_FILEBLOCK_QUIT;
3517
  int32_t step = ASCENDING_TRAVERSE(pReader->order) ? 1 : -1;
3518

3519
  bool    loadNeighbor = true;
H
Haojun Liao 已提交
3520
  int32_t code = loadNeighborIfOverlap(pFBlock, pScanInfo, pReader, &loadNeighbor);
3521

H
Haojun Liao 已提交
3522
  if (loadNeighbor && (code == TSDB_CODE_SUCCESS)) {
3523 3524
    pDumpInfo->rowIndex =
        doMergeRowsInFileBlockImpl(pBlockData, pDumpInfo->rowIndex, key, pMerger, &pReader->verRange, step);
3525
    if ((pDumpInfo->rowIndex >= pDumpInfo->totalRows && asc) || (pDumpInfo->rowIndex < 0 && !asc)) {
3526 3527 3528 3529
      *state = CHECK_FILEBLOCK_CONT;
    }
  }

H
Haojun Liao 已提交
3530
  return code;
3531 3532
}

3533 3534
/*
 * Merge all rows of the current file block that share the timestamp of the row
 * at pDumpInfo->rowIndex, then keep probing neighboring file blocks for the
 * same timestamp while checkForNeighborFileBlock() reports
 * CHECK_FILEBLOCK_CONT.
 *
 * The row at the current index itself is not merged here; the index is first
 * moved one step in scan direction.
 *
 * Returns TSDB_CODE_SUCCESS, or the error reported by
 * checkForNeighborFileBlock(). (The original discarded that code and always
 * returned success.)
 */
int32_t doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pScanInfo, STsdbReader* pReader,
                                SRowMerger* pMerger) {
  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;

  bool    asc = ASCENDING_TRAVERSE(pReader->order);
  int64_t key = pBlockData->aTSKEY[pDumpInfo->rowIndex];
  int32_t step = asc ? 1 : -1;

  pDumpInfo->rowIndex += step;
  if ((pDumpInfo->rowIndex <= pBlockData->nRow - 1 && asc) || (pDumpInfo->rowIndex >= 0 && !asc)) {
    pDumpInfo->rowIndex =
        doMergeRowsInFileBlockImpl(pBlockData, pDumpInfo->rowIndex, key, pMerger, &pReader->verRange, step);
  }

  // all rows are consumed, let's try next file block
  if ((pDumpInfo->rowIndex >= pBlockData->nRow && asc) || (pDumpInfo->rowIndex < 0 && !asc)) {
    while (1) {
      CHECK_FILEBLOCK_STATE st = CHECK_FILEBLOCK_QUIT;

      SFileDataBlockInfo* pFileBlockInfo = getCurrentBlockInfo(&pReader->status.blockIter);
      SDataBlk*           pCurrentBlock = getCurrentBlock(&pReader->status.blockIter);
      if (pFileBlockInfo == NULL) {
        break;
      }

      // a failure while loading the neighbor must reach the caller instead of being reported as success
      int32_t code = checkForNeighborFileBlock(pReader, pScanInfo, pCurrentBlock, pFileBlockInfo, pMerger, key, &st);
      if (code != TSDB_CODE_SUCCESS) {
        return code;
      }

      if (st == CHECK_FILEBLOCK_QUIT) {
        break;
      }
    }
  }

  return TSDB_CODE_SUCCESS;
}

H
Hongze Cheng 已提交
3569
int32_t doMergeRowsInLastBlock(SLastBlockReader* pLastBlockReader, STableBlockScanInfo* pScanInfo, int64_t ts,
3570
                               SRowMerger* pMerger, SVersionRange* pVerRange, const char* idStr) {
3571
  while (nextRowFromLastBlocks(pLastBlockReader, pScanInfo, pVerRange)) {
3572 3573
    int64_t next1 = getCurrentKeyInLastBlock(pLastBlockReader);
    if (next1 == ts) {
3574
      TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
H
Hongze Cheng 已提交
3575
      tsdbRowMerge(pMerger, &fRow1);
3576
    } else {
3577 3578 3579
      tsdbTrace("uid:%" PRIu64 " last del index:%d, del range:%d, lastKeyInStt:%" PRId64 ", %s", pScanInfo->uid,
                pScanInfo->lastBlockDelIndex, (int32_t)taosArrayGetSize(pScanInfo->delSkyline), pScanInfo->lastKeyInStt,
                idStr);
3580 3581 3582 3583 3584 3585 3586
      break;
    }
  }

  return TSDB_CODE_SUCCESS;
}

3587
int32_t doMergeMemTableMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter, SArray* pDelList, TSDBROW* pResRow,
3588
                                 STsdbReader* pReader, bool* freeTSRow) {
H
Haojun Liao 已提交
3589
  TSDBROW* pNextRow = NULL;
3590
  TSDBROW  current = *pRow;
3591

3592 3593
  {  // if the timestamp of the next valid row has a different ts, return current row directly
    pIter->hasVal = tsdbTbDataIterNext(pIter->iter);
3594

3595
    if (!pIter->hasVal) {
3596
      *pResRow = *pRow;
3597
      *freeTSRow = false;
3598
      return TSDB_CODE_SUCCESS;
3599
    } else {  // has next point in mem/imem
3600
      pNextRow = getValidMemRow(pIter, pDelList, pReader);
3601
      if (pNextRow == NULL) {
H
Haojun Liao 已提交
3602
        *pResRow = current;
3603
        *freeTSRow = false;
3604
        return TSDB_CODE_SUCCESS;
3605 3606
      }

H
Hongze Cheng 已提交
3607
      if (TSDBROW_TS(&current) != TSDBROW_TS(pNextRow)) {
H
Haojun Liao 已提交
3608
        *pResRow = current;
3609
        *freeTSRow = false;
3610
        return TSDB_CODE_SUCCESS;
3611
      }
3612
    }
3613 3614
  }

3615
  SRowMerger merge = {0};
H
Haojun Liao 已提交
3616
  terrno = 0;
3617
  int32_t code = 0;
H
Haojun Liao 已提交
3618

3619 3620 3621 3622 3623 3624 3625
  // start to merge duplicated rows
  if (current.type == TSDBROW_ROW_FMT) {
    // get the correct schema for data in memory
    STSchema* pTSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(&current), pReader, uid);
    if (pTSchema == NULL) {
      return terrno;
    }
H
Haojun Liao 已提交
3626

3627 3628 3629
    if (pReader->pSchema == NULL) {
      pReader->pSchema = pTSchema;
    }
H
Haojun Liao 已提交
3630

3631
    code = tsdbRowMergerInit2(&merge, pReader->pSchema, &current, pTSchema);
3632 3633 3634
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }
H
Haojun Liao 已提交
3635

3636 3637 3638 3639
    STSchema* pTSchema1 = doGetSchemaForTSRow(TSDBROW_SVERSION(pNextRow), pReader, uid);
    if (pTSchema1 == NULL) {
      return terrno;
    }
H
Haojun Liao 已提交
3640

3641
    tsdbRowMergerAdd(&merge, pNextRow, pTSchema1);
3642 3643
  } else {  // let's merge rows in file block
    code = tsdbRowMergerInit(&merge, &current, pReader->pSchema);
3644 3645 3646
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }
H
Haojun Liao 已提交
3647

3648 3649
    tsdbRowMerge(&merge, pNextRow);
  }
H
Haojun Liao 已提交
3650

wmmhello's avatar
wmmhello 已提交
3651
  code = doMergeRowsInBuf(pIter, uid, TSDBROW_TS(&current), pDelList, &merge, pReader);
H
Haojun Liao 已提交
3652 3653 3654 3655
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

3656
  code = tsdbRowMergerGetRow(&merge, &pResRow->pTSRow);
3657 3658 3659
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }
M
Minglei Jin 已提交
3660

wmmhello's avatar
wmmhello 已提交
3661
  pResRow->type = TSDBROW_ROW_FMT;
3662
  tsdbRowMergerClear(&merge);
3663
  *freeTSRow = true;
3664

3665
  return TSDB_CODE_SUCCESS;
3666 3667
}

3668
int32_t doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader,
H
Hongze Cheng 已提交
3669
                           SRow** pTSRow) {
H
Haojun Liao 已提交
3670 3671
  SRowMerger merge = {0};

3672 3673 3674
  TSDBKEY k = TSDBROW_KEY(pRow);
  TSDBKEY ik = TSDBROW_KEY(piRow);

3675
  if (ASCENDING_TRAVERSE(pReader->order)) {  // ascending order imem --> mem
H
Haojun Liao 已提交
3676
    STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid);
3677

H
Hongze Cheng 已提交
3678
    int32_t code = tsdbRowMergerInit(&merge, piRow, pSchema);
H
Haojun Liao 已提交
3679 3680 3681 3682 3683 3684 3685 3686 3687
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    code = doMergeRowsInBuf(&pBlockScanInfo->iiter, pBlockScanInfo->uid, ik.ts, pBlockScanInfo->delSkyline, &merge,
                            pReader);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }
3688

H
Hongze Cheng 已提交
3689
    tsdbRowMerge(&merge, pRow);
H
Haojun Liao 已提交
3690 3691 3692 3693 3694 3695
    code =
        doMergeRowsInBuf(&pBlockScanInfo->iter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge, pReader);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

3696
  } else {
H
Haojun Liao 已提交
3697
    STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid);
3698

H
Hongze Cheng 已提交
3699
    int32_t code = tsdbRowMergerInit(&merge, pRow, pSchema);
3700
    if (code != TSDB_CODE_SUCCESS || merge.pTSchema == NULL) {
H
Haojun Liao 已提交
3701 3702 3703 3704 3705 3706 3707 3708
      return code;
    }

    code =
        doMergeRowsInBuf(&pBlockScanInfo->iter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge, pReader);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }
3709

H
Hongze Cheng 已提交
3710
    tsdbRowMerge(&merge, piRow);
H
Haojun Liao 已提交
3711 3712 3713 3714 3715
    code = doMergeRowsInBuf(&pBlockScanInfo->iiter, pBlockScanInfo->uid, ik.ts, pBlockScanInfo->delSkyline, &merge,
                            pReader);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }
3716
  }
3717

H
Haojun Liao 已提交
3718 3719
  int32_t code = tsdbRowMergerGetRow(&merge, pTSRow);
  tsdbRowMergerClear(&merge);
3720
  return code;
3721 3722
}

3723
/*
 * Fetch the next row from the memory buffers (mem and imem) of one table.
 *
 * A candidate row at or beyond `endKey` in scan direction (ts >= endKey when
 * ascending, ts <= endKey when descending) is ignored. If both buffers still
 * have a candidate, the one that comes first in scan order is returned; when
 * both carry the same timestamp they are merged into one row.
 *
 * On return pResRow is left untouched when no row is available. *freeTSRow is
 * set to true when pResRow->pTSRow was built by a merge.
 *
 * Compared with the original: candidate rows are NULL-checked before their key
 * is read, and on the equal-timestamp path pResRow / *freeTSRow are only
 * written after the merge has actually produced a row.
 */
int32_t tsdbGetNextRowInMem(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader, TSDBROW* pResRow, int64_t endKey,
                            bool* freeTSRow) {
  TSDBROW* pRow = getValidMemRow(&pBlockScanInfo->iter, pBlockScanInfo->delSkyline, pReader);
  TSDBROW* piRow = getValidMemRow(&pBlockScanInfo->iiter, pBlockScanInfo->delSkyline, pReader);
  SArray*  pDelList = pBlockScanInfo->delSkyline;
  uint64_t uid = pBlockScanInfo->uid;

  // todo refactor
  bool asc = ASCENDING_TRAVERSE(pReader->order);

  // drop candidates that lie outside the requested key bound
  if (pBlockScanInfo->iter.hasVal && pRow != NULL) {
    TSDBKEY k = TSDBROW_KEY(pRow);
    if ((k.ts >= endKey && asc) || (k.ts <= endKey && !asc)) {
      pRow = NULL;
    }
  }

  if (pBlockScanInfo->iiter.hasVal && piRow != NULL) {
    TSDBKEY k = TSDBROW_KEY(piRow);
    if ((k.ts >= endKey && asc) || (k.ts <= endKey && !asc)) {
      piRow = NULL;
    }
  }

  bool hasMemRow = pBlockScanInfo->iter.hasVal && (pRow != NULL);
  bool hasIMemRow = pBlockScanInfo->iiter.hasVal && (piRow != NULL);

  if (hasMemRow && hasIMemRow) {
    TSDBKEY k = TSDBROW_KEY(pRow);
    TSDBKEY ik = TSDBROW_KEY(piRow);

    if (ik.ts == k.ts) {
      SRow*   pMerged = NULL;
      int32_t code = doMergeMemIMemRows(pRow, piRow, pBlockScanInfo, pReader, &pMerged);
      if (code != TSDB_CODE_SUCCESS) {
        return code;
      }

      // publish the row only once it really exists, so the caller never sees a half-filled result
      if (pMerged != NULL) {
        pResRow->type = TSDBROW_ROW_FMT;
        pResRow->pTSRow = pMerged;
        *freeTSRow = true;
      }

      return code;
    }

    // timestamps differ: take the buffer whose row comes first in scan order
    bool imemFirst = asc ? (ik.ts < k.ts) : (ik.ts > k.ts);
    if (imemFirst) {
      return doMergeMemTableMultiRows(piRow, uid, &pBlockScanInfo->iiter, pDelList, pResRow, pReader, freeTSRow);
    }

    return doMergeMemTableMultiRows(pRow, uid, &pBlockScanInfo->iter, pDelList, pResRow, pReader, freeTSRow);
  }

  if (hasMemRow) {
    return doMergeMemTableMultiRows(pRow, uid, &pBlockScanInfo->iter, pDelList, pResRow, pReader, freeTSRow);
  }

  if (hasIMemRow) {
    return doMergeMemTableMultiRows(piRow, uid, &pBlockScanInfo->iiter, pDelList, pResRow, pReader, freeTSRow);
  }

  return TSDB_CODE_SUCCESS;
}

H
Hongze Cheng 已提交
3781
/*
 * Append one row-format row to the output block.
 *
 * The requested columns (pReader->suppInfo.colId[]) and the schema columns are
 * walked in parallel by column id: a requested column present in the schema is
 * copied, a requested column missing from the schema is set to NULL. The row
 * count of `pBlock` is incremented and pScanInfo->lastKey is set to the row
 * timestamp.
 *
 * Returns TSDB_CODE_SUCCESS, or terrno when the schema for pTSRow->sver cannot
 * be resolved; in that case nothing is appended. (The original dereferenced
 * the schema without checking it.)
 */
int32_t doAppendRowFromTSRow(SSDataBlock* pBlock, STsdbReader* pReader, SRow* pTSRow, STableBlockScanInfo* pScanInfo) {
  int32_t outputRowIndex = pBlock->info.rows;
  int64_t uid = pScanInfo->uid;

  SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo;
  STSchema*           pSchema = doGetSchemaForTSRow(pTSRow->sver, pReader, uid);
  if (pSchema == NULL) {
    return terrno;
  }

  SColVal colVal = {0};
  int32_t i = 0, j = 0;

  // the timestamp column, when requested first, is taken straight from the row header
  if (pSupInfo->colId[i] == PRIMARYKEY_TIMESTAMP_COL_ID) {
    SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, pSupInfo->slotId[i]);
    ((int64_t*)pColData->pData)[outputRowIndex] = pTSRow->ts;
    i += 1;
  }

  while (i < pSupInfo->numOfCols && j < pSchema->numOfCols) {
    col_id_t colId = pSupInfo->colId[i];

    if (colId == pSchema->columns[j].colId) {
      SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, pSupInfo->slotId[i]);

      tRowGet(pTSRow, pSchema, j, &colVal);
      doCopyColVal(pColInfoData, outputRowIndex, i, &colVal, pSupInfo);
      i += 1;
      j += 1;
    } else if (colId < pSchema->columns[j].colId) {
      // requested column is not part of this schema
      SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, pSupInfo->slotId[i]);

      colDataSetNULL(pColInfoData, outputRowIndex);
      i += 1;
    } else {
      // schema column was not requested
      j += 1;
    }
  }

  // set null value since current column does not exist in the "pSchema"
  while (i < pSupInfo->numOfCols) {
    SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, pSupInfo->slotId[i]);
    colDataSetNULL(pColInfoData, outputRowIndex);
    i += 1;
  }

  pBlock->info.dataLoad = 1;
  pBlock->info.rows += 1;
  pScanInfo->lastKey = pTSRow->ts;
  return TSDB_CODE_SUCCESS;
}

H
Hongze Cheng 已提交
3832 3833
/*
 * Append row `rowIndex` of a column-format file block to the result block.
 *
 * Requested columns (pReader->suppInfo.colId[]) and the columns stored in
 * `pBlockData` are walked in parallel by column id. Requested columns left
 * over once the block columns run out are set to NULL. The row count of
 * `pResBlock` is incremented.
 *
 * Always returns TSDB_CODE_SUCCESS.
 */
int32_t doAppendRowFromFileBlock(SSDataBlock* pResBlock, STsdbReader* pReader, SBlockData* pBlockData,
                                 int32_t rowIndex) {
  SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo;

  int32_t outIdx = 0;  // position in the requested column list
  int32_t inIdx = 0;   // position in the block's column list
  int32_t dstRow = pResBlock->info.rows;

  if (pSupInfo->colId[outIdx] == PRIMARYKEY_TIMESTAMP_COL_ID) {
    SColumnInfoData* pTsCol = taosArrayGet(pResBlock->pDataBlock, pSupInfo->slotId[outIdx]);
    ((int64_t*)pTsCol->pData)[dstRow] = pBlockData->aTSKEY[rowIndex];
    outIdx += 1;
  }

  SColVal cv = {0};
  int32_t numOfInputCols = pBlockData->nColData;
  int32_t numOfOutputCols = pSupInfo->numOfCols;

  while (outIdx < numOfOutputCols && inIdx < numOfInputCols) {
    SColData* pData = tBlockDataGetColDataByIdx(pBlockData, inIdx);

    if (pData->cid >= pSupInfo->colId[outIdx]) {
      SColumnInfoData* pCol = TARRAY_GET_ELEM(pResBlock->pDataBlock, pSupInfo->slotId[outIdx]);

      if (pData->cid == pSupInfo->colId[outIdx]) {
        tColDataGetValue(pData, rowIndex, &cv);
        doCopyColVal(pCol, dstRow, outIdx, &cv, pSupInfo);
        inIdx += 1;
      } else if (pData->cid > pCol->info.colId) {
        // the specified column does not exist in file block, fill with null data
        colDataSetNULL(pCol, dstRow);
      }

      outIdx += 1;
    } else {
      // block column precedes the requested one: skip it
      inIdx += 1;
    }
  }

  for (; outIdx < numOfOutputCols; outIdx += 1) {
    SColumnInfoData* pCol = taosArrayGet(pResBlock->pDataBlock, pSupInfo->slotId[outIdx]);
    colDataSetNULL(pCol, dstRow);
  }

  pResBlock->info.dataLoad = 1;
  pResBlock->info.rows += 1;
  return TSDB_CODE_SUCCESS;
}

3879 3880
/*
 * Fill pReader->pResBlock with rows taken from the memory buffers of one
 * table until no row is returned, both buffer iterators are drained, or the
 * block holds `capacity` rows.
 *
 * Returns TSDB_CODE_SUCCESS, or the first error reported by
 * tsdbGetNextRowInMem() or by the append helpers. (The original ignored all
 * of those return codes.)
 */
int32_t buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, int64_t endKey, int32_t capacity,
                                  STsdbReader* pReader) {
  SSDataBlock* pBlock = pReader->pResBlock;
  int32_t      code = TSDB_CODE_SUCCESS;

  do {
    TSDBROW row = {.type = -1};  // -1 marks "no row produced"
    bool    freeTSRow = false;

    code = tsdbGetNextRowInMem(pBlockScanInfo, pReader, &row, endKey, &freeTSRow);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    if (row.type == -1) {
      break;
    }

    if (row.type == TSDBROW_ROW_FMT) {
      code = doAppendRowFromTSRow(pBlock, pReader, row.pTSRow, pBlockScanInfo);

      // a merged row is owned by this function whether or not the append succeeded
      if (freeTSRow) {
        taosMemoryFree(row.pTSRow);
      }
    } else {
      code = doAppendRowFromFileBlock(pBlock, pReader, row.pBlockData, row.iRow);
    }

    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    // no data in buffer, return immediately
    if (!(pBlockScanInfo->iter.hasVal || pBlockScanInfo->iiter.hasVal)) {
      break;
    }

    if (pBlock->info.rows >= capacity) {
      break;
    }
  } while (1);

  return TSDB_CODE_SUCCESS;
}
H
Hongze Cheng 已提交
3914

3915 3916
// TODO refactor: with createDataBlockScanInfo
/*
 * Replace the set of tables scanned by `pReader` with the `num` entries of
 * `pTableList` (read as an array of STableKeyInfo).
 *
 * Existing scan infos are cleared and the table map is emptied. When the new
 * list is larger than the number of tables currently in the map, the scan-info
 * buffer and the uid list are grown first. Each new entry gets its
 * lastKey / lastKeyInStt seeded from the reader's query window according to
 * the scan order.
 *
 * Returns TSDB_CODE_SUCCESS, the error of ensureBlockScanInfoBuf(), or
 * TSDB_CODE_OUT_OF_MEMORY when the uid list cannot be grown. The success code
 * now uses TSDB_CODE_SUCCESS like every other routine in this file.
 */
int32_t tsdbSetTableList(STsdbReader* pReader, const void* pTableList, int32_t num) {
  int32_t size = taosHashGetSize(pReader->status.pTableMap);

  STableBlockScanInfo** p = NULL;
  while ((p = taosHashIterate(pReader->status.pTableMap, p)) != NULL) {
    clearBlockScanInfo(*p);
  }

  if (size < num) {
    int32_t code = ensureBlockScanInfoBuf(&pReader->blockInfoBuf, num);
    if (code) {
      return code;
    }

    // keep the old list reachable if the reallocation fails
    uint64_t* pNewUidList = taosMemoryRealloc(pReader->status.uidList.tableUidList, sizeof(uint64_t) * num);
    if (pNewUidList == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }

    pReader->status.uidList.tableUidList = pNewUidList;
  }

  taosHashClear(pReader->status.pTableMap);
  STableUidList* pUidList = &pReader->status.uidList;
  pUidList->currentIndex = 0;

  STableKeyInfo* pList = (STableKeyInfo*)pTableList;
  for (int32_t i = 0; i < num; ++i) {
    STableBlockScanInfo* pInfo = getPosInBlockInfoBuf(&pReader->blockInfoBuf, i);
    pInfo->uid = pList[i].uid;
    pUidList->tableUidList[i] = pList[i].uid;

    // todo extract method
    // lastKey starts just outside the query window (without wrapping at the int64 limits)
    if (ASCENDING_TRAVERSE(pReader->order)) {
      int64_t skey = pReader->window.skey;
      pInfo->lastKey = (skey > INT64_MIN) ? (skey - 1) : skey;
      pInfo->lastKeyInStt = skey;
    } else {
      int64_t ekey = pReader->window.ekey;
      pInfo->lastKey = (ekey < INT64_MAX) ? (ekey + 1) : ekey;
      pInfo->lastKeyInStt = ekey;
    }

    taosHashPut(pReader->status.pTableMap, &pInfo->uid, sizeof(uint64_t), &pInfo, POINTER_BYTES);
  }

  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
3965 3966 3967 3968 3969 3970
// Returns metaGetIdx(pMeta), or NULL when no meta handle is given.
void* tsdbGetIdx(SMeta* pMeta) {
  return (pMeta != NULL) ? metaGetIdx(pMeta) : NULL;
}
dengyihao's avatar
dengyihao 已提交
3971

dengyihao's avatar
dengyihao 已提交
3972 3973 3974 3975 3976 3977
// Returns metaGetIvtIdx(pMeta), or NULL when no meta handle is given.
void* tsdbGetIvtIdx(SMeta* pMeta) {
  return (pMeta != NULL) ? metaGetIvtIdx(pMeta) : NULL;
}
L
Liu Jicong 已提交
3978

H
Hongze Cheng 已提交
3979
// Returns the upper bound (maxVer) of the version range held by the reader.
uint64_t getReaderMaxVersion(STsdbReader* pReader) { return pReader->verRange.maxVer; }
3980

3981
/*
 * Set up the fileset iterator and the data block iterator of a reader from
 * its read snapshot. With no files in the snapshot the reader is switched to
 * "not loading from file"; otherwise the first block in the files is
 * initialized. Whenever the reader ends up not loading from file, the table
 * list index is reset.
 *
 * Returns TSDB_CODE_SUCCESS or the code of initForFirstBlockInFile().
 */
static int32_t doOpenReaderImpl(STsdbReader* pReader) {
  SReaderStatus* pStatus = &pReader->status;
  int32_t        code = TSDB_CODE_SUCCESS;

  initFilesetIterator(&pStatus->fileIter, pReader->pReadSnap->fs.aDFileSet, pReader);
  resetDataBlockIterator(&pStatus->blockIter, pReader->order);

  if (pStatus->fileIter.numOfFiles != 0) {
    code = initForFirstBlockInFile(pReader, &pStatus->blockIter);
  } else {
    pStatus->loadFromFile = false;
  }

  if (!pStatus->loadFromFile) {
    resetTableListIndex(pStatus);
  }

  return code;
}

4002 4003 4004 4005 4006
// Free callback for the schema map: `param` points at a stored pointer, and that pointer is released.
static void freeSchemaFunc(void* param) {
  void** ppStored = (void**)param;
  taosMemoryFree(*ppStored);
}

H
refact  
Hongze Cheng 已提交
4007
// ====================================== EXPOSED APIs ======================================
4008
int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, void* pTableList, int32_t numOfTables,
H
Haojun Liao 已提交
4009
                       SSDataBlock* pResBlock, STsdbReader** ppReader, const char* idstr) {
4010 4011 4012 4013 4014 4015
  STimeWindow window = pCond->twindows;
  if (pCond->type == TIMEWINDOW_RANGE_EXTERNAL) {
    pCond->twindows.skey += 1;
    pCond->twindows.ekey -= 1;
  }

4016 4017 4018
  int32_t capacity = pVnode->config.tsdbCfg.maxRows;
  if (pResBlock != NULL) {
    blockDataEnsureCapacity(pResBlock, capacity);
H
Haojun Liao 已提交
4019 4020 4021
  }

  int32_t code = tsdbReaderCreate(pVnode, pCond, ppReader, capacity, pResBlock, idstr);
4022
  if (code != TSDB_CODE_SUCCESS) {
H
Haojun Liao 已提交
4023 4024
    goto _err;
  }
H
Hongze Cheng 已提交
4025

4026
  // check for query time window
H
Haojun Liao 已提交
4027
  STsdbReader* pReader = *ppReader;
4028
  if (isEmptyQueryTimeWindow(&pReader->window) && pCond->type == TIMEWINDOW_RANGE_CONTAINED) {
H
Haojun Liao 已提交
4029 4030 4031
    tsdbDebug("%p query window not overlaps with the data set, no result returned, %s", pReader, pReader->idStr);
    return TSDB_CODE_SUCCESS;
  }
H
Hongze Cheng 已提交
4032

4033 4034
  if (pCond->type == TIMEWINDOW_RANGE_EXTERNAL) {
    // update the SQueryTableDataCond to create inner reader
4035
    int32_t order = pCond->order;
4036
    if (order == TSDB_ORDER_ASC) {
4037
      pCond->twindows.ekey = window.skey;
4038 4039 4040
      pCond->twindows.skey = INT64_MIN;
      pCond->order = TSDB_ORDER_DESC;
    } else {
4041
      pCond->twindows.skey = window.ekey;
4042 4043 4044 4045
      pCond->twindows.ekey = INT64_MAX;
      pCond->order = TSDB_ORDER_ASC;
    }

4046
    // here we only need one more row, so the capacity is set to be ONE.
H
Haojun Liao 已提交
4047
    code = tsdbReaderCreate(pVnode, pCond, &pReader->innerReader[0], 1, pResBlock, idstr);
4048 4049 4050 4051 4052
    if (code != TSDB_CODE_SUCCESS) {
      goto _err;
    }

    if (order == TSDB_ORDER_ASC) {
4053
      pCond->twindows.skey = window.ekey;
4054
      pCond->twindows.ekey = INT64_MAX;
4055
    } else {
4056
      pCond->twindows.skey = INT64_MIN;
4057
      pCond->twindows.ekey = window.ekey;
4058
    }
4059 4060
    pCond->order = order;

H
Haojun Liao 已提交
4061
    code = tsdbReaderCreate(pVnode, pCond, &pReader->innerReader[1], 1, pResBlock, idstr);
4062 4063 4064 4065 4066
    if (code != TSDB_CODE_SUCCESS) {
      goto _err;
    }
  }

H
Haojun Liao 已提交
4067
  // NOTE: the endVersion in pCond is the data version not schema version, so pCond->endVersion is not correct here.
4068 4069
  //  no valid error code set in metaGetTbTSchema, so let's set the error code here.
  //  we should proceed in case of tmq processing.
4070
  if (pCond->suid != 0) {
4071
    pReader->pSchema = metaGetTbTSchema(pReader->pTsdb->pVnode->pMeta, pReader->suid, -1, 1);
H
Haojun Liao 已提交
4072
    if (pReader->pSchema == NULL) {
H
Haojun Liao 已提交
4073
      tsdbError("failed to get table schema, suid:%" PRIu64 ", ver:-1, %s", pReader->suid, pReader->idStr);
H
Haojun Liao 已提交
4074
    }
4075 4076
  } else if (numOfTables > 0) {
    STableKeyInfo* pKey = pTableList;
4077
    pReader->pSchema = metaGetTbTSchema(pReader->pTsdb->pVnode->pMeta, pKey->uid, -1, 1);
H
Haojun Liao 已提交
4078
    if (pReader->pSchema == NULL) {
H
Haojun Liao 已提交
4079
      tsdbError("failed to get table schema, uid:%" PRIu64 ", ver:-1, %s", pKey->uid, pReader->idStr);
H
Haojun Liao 已提交
4080
    }
4081 4082
  }

4083 4084 4085 4086 4087 4088 4089 4090
  pReader->pSchemaMap = tSimpleHashInit(8, taosFastHash);
  if (pReader->pSchemaMap == NULL) {
    tsdbError("failed init schema hash for reader", pReader->idStr);
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _err;
  }

  tSimpleHashSetFreeFp(pReader->pSchemaMap, freeSchemaFunc);
4091
  if (pReader->pSchema != NULL) {
H
Haojun Liao 已提交
4092 4093 4094 4095
    code = updateBlockSMAInfo(pReader->pSchema, &pReader->suppInfo);
    if (code != TSDB_CODE_SUCCESS) {
      goto _err;
    }
4096
  }
4097

4098
  STsdbReader* p = (pReader->innerReader[0] != NULL) ? pReader->innerReader[0] : pReader;
X
Xiaoyu Wang 已提交
4099 4100
  pReader->status.pTableMap =
      createDataBlockScanInfo(p, &pReader->blockInfoBuf, pTableList, &pReader->status.uidList, numOfTables);
H
Haojun Liao 已提交
4101 4102
  if (pReader->status.pTableMap == NULL) {
    *ppReader = NULL;
S
Shengliang Guan 已提交
4103
    code = TSDB_CODE_OUT_OF_MEMORY;
H
Haojun Liao 已提交
4104 4105
    goto _err;
  }
H
Hongze Cheng 已提交
4106

4107
  pReader->suspended = true;
4108

4109
  tsdbDebug("%p total numOfTable:%d in this query %s", pReader, numOfTables, pReader->idStr);
H
Hongze Cheng 已提交
4110
  return code;
H
Hongze Cheng 已提交
4111 4112

_err:
H
Haojun Liao 已提交
4113
  tsdbError("failed to create data reader, code:%s %s", tstrerror(code), idstr);
4114
  tsdbReaderClose(pReader);
X
Xiaoyu Wang 已提交
4115
  *ppReader = NULL;  // reset the pointer value.
H
Hongze Cheng 已提交
4116
  return code;
H
refact  
Hongze Cheng 已提交
4117 4118 4119
}

/*
 * Release a reader and everything it owns. A NULL reader is ignored.
 *
 * Inner readers have the table map, uid list, read snapshot, schema and
 * schema map pointers reset to NULL before they are closed recursively, so
 * that closing them does not release those objects a second time; the outer
 * reader releases them below.
 *
 * Compared with the original: each inner reader is checked individually. The
 * original entered the branch when either inner reader existed and then
 * dereferenced both, which is a NULL dereference when only one was created.
 */
void tsdbReaderClose(STsdbReader* pReader) {
  if (pReader == NULL) {
    return;
  }

  tsdbAcquireReader(pReader);
  for (int32_t i = 0; i < 2; ++i) {
    STsdbReader* p = pReader->innerReader[i];
    if (p == NULL) {
      continue;
    }

    // detach the pointers that the outer reader releases itself
    p->status.pTableMap = NULL;
    p->status.uidList.tableUidList = NULL;
    p->pReadSnap = NULL;
    p->pSchema = NULL;
    p->pSchemaMap = NULL;

    tsdbReaderClose(p);
  }

  SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo;

  taosArrayDestroy(pSupInfo->pColAgg);
  for (int32_t i = 0; i < pSupInfo->numOfCols; ++i) {
    if (pSupInfo->buildBuf[i] != NULL) {
      taosMemoryFreeClear(pSupInfo->buildBuf[i]);
    }
  }

  if (pReader->freeBlock) {
    pReader->pResBlock = blockDataDestroy(pReader->pResBlock);
  }

  taosMemoryFree(pSupInfo->colId);
  tBlockDataDestroy(&pReader->status.fileBlockData);
  cleanupDataBlockIterator(&pReader->status.blockIter);

  // captured before the map is destroyed; only used for the cost summary below
  size_t numOfTables = taosHashGetSize(pReader->status.pTableMap);
  if (pReader->status.pTableMap != NULL) {
    destroyAllBlockScanInfo(pReader->status.pTableMap);
    clearBlockScanInfoBuf(&pReader->blockInfoBuf);
  }

  if (pReader->pFileReader != NULL) {
    tsdbDataFReaderClose(&pReader->pFileReader);
  }

  if (pReader->pDelFReader != NULL) {
    tsdbDelFReaderClose(&pReader->pDelFReader);
  }

  if (pReader->pDelIdx != NULL) {
    taosArrayDestroy(pReader->pDelIdx);
    pReader->pDelIdx = NULL;
  }

  qTrace("tsdb/reader-close: %p, untake snapshot", pReader);
  tsdbUntakeReadSnap(pReader, pReader->pReadSnap, true);
  pReader->pReadSnap = NULL;

  tsdbReleaseReader(pReader);

  tsdbUninitReaderLock(pReader);

  taosMemoryFree(pReader->status.uidList.tableUidList);
  SIOCostSummary* pCost = &pReader->cost;

  SFilesetIter* pFilesetIter = &pReader->status.fileIter;
  if (pFilesetIter->pLastBlockReader != NULL) {
    SLastBlockReader* pLReader = pFilesetIter->pLastBlockReader;
    tMergeTreeClose(&pLReader->mergeTree);

    // collect the last-block load statistics before the load info is destroyed
    getLastBlockLoadInfo(pLReader->pInfo, &pCost->lastBlockLoad, &pCost->lastBlockLoadTime);

    pLReader->pInfo = destroyLastBlockLoadInfo(pLReader->pInfo);
    taosMemoryFree(pLReader);
  }

  tsdbDebug(
      "%p :io-cost summary: head-file:%" PRIu64 ", head-file time:%.2f ms, SMA:%" PRId64
      " SMA-time:%.2f ms, fileBlocks:%" PRId64
      ", fileBlocks-load-time:%.2f ms, "
      "build in-memory-block-time:%.2f ms, lastBlocks:%" PRId64 ", lastBlocks-time:%.2f ms, composed-blocks:%" PRId64
      ", composed-blocks-time:%.2fms, STableBlockScanInfo size:%.2f Kb, createTime:%.2f ms,initDelSkylineIterTime:%.2f "
      "ms, %s",
      pReader, pCost->headFileLoad, pCost->headFileLoadTime, pCost->smaDataLoad, pCost->smaLoadTime, pCost->numOfBlocks,
      pCost->blockLoadTime, pCost->buildmemBlock, pCost->lastBlockLoad, pCost->lastBlockLoadTime, pCost->composedBlocks,
      pCost->buildComposedBlockTime, numOfTables * sizeof(STableBlockScanInfo) / 1000.0, pCost->createScanInfoList,
      pCost->initDelSkylineIterTime, pReader->idStr);

  taosMemoryFree(pReader->idStr);
  taosMemoryFree(pReader->pSchema);

  tSimpleHashCleanup(pReader->pSchemaMap);
  taosMemoryFreeClear(pReader);
}

4225 4226 4227 4228 4229 4230 4231 4232 4233 4234
int32_t tsdbReaderSuspend(STsdbReader* pReader) {
  int32_t code = 0;

  // save reader's base state & reset top state to be reconstructed from base state
  SReaderStatus*       pStatus = &pReader->status;
  STableBlockScanInfo* pBlockScanInfo = NULL;

  if (pStatus->loadFromFile) {
    SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(&pReader->status.blockIter);
    if (pBlockInfo != NULL) {
H
Haojun Liao 已提交
4235
      pBlockScanInfo = getTableBlockScanInfo(pStatus->pTableMap, pBlockInfo->uid, pReader->idStr);
4236 4237 4238 4239
      if (pBlockScanInfo == NULL) {
        goto _err;
      }
    } else {
4240
      pBlockScanInfo = *pStatus->pTableIter;
4241 4242 4243 4244 4245
    }

    tsdbDataFReaderClose(&pReader->pFileReader);

    // resetDataBlockScanInfo excluding lastKey
4246
    STableBlockScanInfo** p = NULL;
4247 4248

    while ((p = taosHashIterate(pStatus->pTableMap, p)) != NULL) {
4249 4250 4251 4252 4253 4254 4255 4256
      STableBlockScanInfo* pInfo = *(STableBlockScanInfo**)p;

      pInfo->iterInit = false;
      pInfo->iter.hasVal = false;
      pInfo->iiter.hasVal = false;

      if (pInfo->iter.iter != NULL) {
        pInfo->iter.iter = tsdbTbDataIterDestroy(pInfo->iter.iter);
4257 4258
      }

4259 4260 4261 4262 4263 4264
      if (pInfo->iiter.iter != NULL) {
        pInfo->iiter.iter = tsdbTbDataIterDestroy(pInfo->iiter.iter);
      }

      pInfo->delSkyline = taosArrayDestroy(pInfo->delSkyline);
      // pInfo->lastKey = ts;
4265 4266
    }
  } else {
4267 4268 4269 4270 4271 4272 4273 4274 4275 4276 4277 4278 4279 4280 4281 4282 4283 4284 4285 4286 4287 4288
    // resetDataBlockScanInfo excluding lastKey
    STableBlockScanInfo** p = NULL;

    while ((p = taosHashIterate(pStatus->pTableMap, p)) != NULL) {
      STableBlockScanInfo* pInfo = *(STableBlockScanInfo**)p;

      pInfo->iterInit = false;
      pInfo->iter.hasVal = false;
      pInfo->iiter.hasVal = false;

      if (pInfo->iter.iter != NULL) {
        pInfo->iter.iter = tsdbTbDataIterDestroy(pInfo->iter.iter);
      }

      if (pInfo->iiter.iter != NULL) {
        pInfo->iiter.iter = tsdbTbDataIterDestroy(pInfo->iiter.iter);
      }

      pInfo->delSkyline = taosArrayDestroy(pInfo->delSkyline);
      // pInfo->lastKey = ts;
    }

4289
    pBlockScanInfo = pStatus->pTableIter == NULL ? NULL : *pStatus->pTableIter;
4290 4291 4292 4293 4294 4295 4296
    if (pBlockScanInfo) {
      // save lastKey to restore memory iterator
      STimeWindow w = pReader->pResBlock->info.window;
      pBlockScanInfo->lastKey = ASCENDING_TRAVERSE(pReader->order) ? w.ekey : w.skey;

      // reset current current table's data block scan info,
      pBlockScanInfo->iterInit = false;
4297 4298
      pBlockScanInfo->iter.hasVal = false;
      pBlockScanInfo->iiter.hasVal = false;
4299 4300 4301 4302 4303 4304 4305 4306 4307 4308 4309 4310 4311 4312 4313
      if (pBlockScanInfo->iter.iter != NULL) {
        pBlockScanInfo->iter.iter = tsdbTbDataIterDestroy(pBlockScanInfo->iter.iter);
      }

      if (pBlockScanInfo->iiter.iter != NULL) {
        pBlockScanInfo->iiter.iter = tsdbTbDataIterDestroy(pBlockScanInfo->iiter.iter);
      }

      pBlockScanInfo->pBlockList = taosArrayDestroy(pBlockScanInfo->pBlockList);
      tMapDataClear(&pBlockScanInfo->mapData);
      // TODO: keep skyline for reuse
      pBlockScanInfo->delSkyline = taosArrayDestroy(pBlockScanInfo->delSkyline);
    }
  }

4314
  tsdbUntakeReadSnap(pReader, pReader->pReadSnap, false);
4315
  pReader->pReadSnap = NULL;
4316 4317 4318

  pReader->suspended = true;

4319 4320
  tsdbDebug("reader: %p suspended uid %" PRIu64 " in this query %s", pReader, pBlockScanInfo ? pBlockScanInfo->uid : 0,
            pReader->idStr);
4321 4322 4323 4324 4325 4326 4327 4328 4329 4330 4331
  return code;

_err:
  tsdbError("failed to suspend data reader, code:%s %s", tstrerror(code), pReader->idStr);
  return code;
}

static int32_t tsdbSetQueryReseek(void* pQHandle) {
  int32_t      code = 0;
  STsdbReader* pReader = pQHandle;

4332
  code = tsdbTryAcquireReader(pReader);
4333 4334
  if (code == 0) {
    if (pReader->suspended) {
4335
      tsdbReleaseReader(pReader);
4336 4337 4338 4339
      return code;
    }

    tsdbReaderSuspend(pReader);
4340

4341
    tsdbReleaseReader(pReader);
4342

4343
    return code;
4344 4345 4346
  } else if (code == EBUSY) {
    return TSDB_CODE_VND_QUERY_BUSY;
  } else {
4347 4348
    terrno = TAOS_SYSTEM_ERROR(code);
    return TSDB_CODE_FAILED;
4349 4350 4351 4352 4353 4354
  }
}

/*
 * Resume a suspended reader.
 *
 * When the reader has at least one table, a new read snapshot is taken and the
 * reader is re-opened:
 *   - TIMEWINDOW_RANGE_CONTAINED: the reader itself is opened;
 *   - otherwise: both inner readers are re-wired to share the table map, uid
 *     list, schema, schema map and snapshot of the outer reader, and the
 *     "prev" inner reader is opened.
 *
 * On success the reader is marked as not suspended and 0 is returned.
 * Every failure goes through the `_err` label so that it is logged; the error
 * code is returned and the reader stays marked as suspended.
 */
int32_t tsdbReaderResume(STsdbReader* pReader) {
  int32_t code = 0;

  // remembered only for the debug message at the end
  STableBlockScanInfo** pBlockScanInfo = pReader->status.pTableIter;

  //  restore reader's state
  //  task snapshot
  int32_t numOfTables = taosHashGetSize(pReader->status.pTableMap);
  if (numOfTables > 0) {
    qTrace("tsdb/reader: %p, take snapshot", pReader);
    code = tsdbTakeReadSnap(pReader, tsdbSetQueryReseek, &pReader->pReadSnap);
    if (code != TSDB_CODE_SUCCESS) {
      goto _err;
    }

    if (pReader->type == TIMEWINDOW_RANGE_CONTAINED) {
      code = doOpenReaderImpl(pReader);
      if (code != TSDB_CODE_SUCCESS) {
        goto _err;
      }
    } else {
      STsdbReader* pPrevReader = pReader->innerReader[0];
      STsdbReader* pNextReader = pReader->innerReader[1];

      // we need only one row
      pPrevReader->capacity = 1;
      pPrevReader->status.pTableMap = pReader->status.pTableMap;
      pPrevReader->status.uidList = pReader->status.uidList;
      pPrevReader->pSchema = pReader->pSchema;
      pPrevReader->pSchemaMap = pReader->pSchemaMap;
      pPrevReader->pReadSnap = pReader->pReadSnap;

      pNextReader->capacity = 1;
      pNextReader->status.pTableMap = pReader->status.pTableMap;
      pNextReader->status.uidList = pReader->status.uidList;
      pNextReader->pSchema = pReader->pSchema;
      pNextReader->pSchemaMap = pReader->pSchemaMap;
      pNextReader->pReadSnap = pReader->pReadSnap;

      code = doOpenReaderImpl(pPrevReader);
      if (code != TSDB_CODE_SUCCESS) {
        goto _err;
      }
    }
  }

  pReader->suspended = false;

  tsdbDebug("reader: %p resumed uid %" PRIu64 ", numOfTable:%" PRId32 ", in this query %s", pReader,
            pBlockScanInfo ? (*pBlockScanInfo)->uid : 0, numOfTables, pReader->idStr);
  return code;

_err:
  tsdbError("failed to resume data reader, code:%s %s", tstrerror(code), pReader->idStr);
  return code;
}

4409
/*
 * Produce the next result block for a single reader (no inner-reader logic).
 *
 * The result block is cleaned first. Data is taken from files while
 * `loadFromFile` is set; when the files yield no rows the table index is reset
 * and the in-memory buffer is scanned instead.
 *
 * Returns true when the result block holds at least one row.
 */
static bool doTsdbNextDataBlock(STsdbReader* pReader) {
  SSDataBlock*   pResBlock = pReader->pResBlock;
  SReaderStatus* pStatus = &pReader->status;

  // drop whatever the previous call left in the result block
  blockDataCleanup(pResBlock);

  if (taosHashGetSize(pStatus->pTableMap) == 0) {
    return false;
  }

  if (pStatus->loadFromFile) {
    if (buildBlockFromFiles(pReader) != TSDB_CODE_SUCCESS) {
      return false;
    }

    if (pResBlock->info.rows > 0) {
      return true;
    }

    // files are exhausted for now: restart the table list before reading the buffer
    resetTableListIndex(&pReader->status);
  }

  // either there is no data in files, or the files produced nothing
  buildBlockFromBufferSequentially(pReader);
  return pResBlock->info.rows > 0;
}

4438
/*
 * Move the reader to its next result block.
 *
 * For a reader with inner readers the scan runs in three steps: the "prev"
 * inner reader (step 0 -> EXTERNAL_ROWS_PREV), the main scan
 * (EXTERNAL_ROWS_MAIN) and the "next" inner reader (EXTERNAL_ROWS_NEXT).
 *
 * Locking: the reader mutex is taken here. When a block is found and it is a
 * composed block, the mutex is released before returning; when the block is
 * not composed the mutex stays held and is released by tsdbRetrieveDataBlock().
 * When no block is found, or on any error, the mutex is released here.
 *
 * Returns true when a block is available, false otherwise. On error false is
 * returned and terrno carries the error code (the function returns bool, so an
 * error code must never be returned directly).
 */
bool tsdbNextDataBlock(STsdbReader* pReader) {
  if (isEmptyQueryTimeWindow(&pReader->window) || pReader->step == EXTERNAL_ROWS_NEXT) {
    return false;
  }

  bool           ret = false;
  SReaderStatus* pStatus = &pReader->status;

  int32_t code = tsdbAcquireReader(pReader);
  qTrace("tsdb/read: %p, take read mutex, code: %d", pReader, code);

  if (pReader->suspended) {
    code = tsdbReaderResume(pReader);
    if (code != TSDB_CODE_SUCCESS) {
      goto _err;
    }
  }

  if (pReader->innerReader[0] != NULL && pReader->step == 0) {
    ret = doTsdbNextDataBlock(pReader->innerReader[0]);
    pReader->step = EXTERNAL_ROWS_PREV;
    if (ret) {
      pStatus = &pReader->innerReader[0]->status;
      if (pStatus->composedDataBlock) {
        qTrace("tsdb/read: %p, unlock read mutex", pReader);
        tsdbReleaseReader(pReader);
      }

      return ret;
    }
  }

  if (pReader->step == EXTERNAL_ROWS_PREV) {
    // prepare for the main scan
    code = doOpenReaderImpl(pReader);
    int32_t step = 1;
    resetAllDataBlockScanInfo(pReader->status.pTableMap, pReader->innerReader[0]->window.ekey, step);

    if (code != TSDB_CODE_SUCCESS) {
      goto _err;
    }

    pReader->step = EXTERNAL_ROWS_MAIN;
  }

  ret = doTsdbNextDataBlock(pReader);
  if (ret) {
    if (pStatus->composedDataBlock) {
      qTrace("tsdb/read: %p, unlock read mutex", pReader);
      tsdbReleaseReader(pReader);
    }

    return ret;
  }

  if (pReader->step == EXTERNAL_ROWS_MAIN && pReader->innerReader[1] != NULL) {
    // prepare for the next row scan
    int32_t step = -1;
    code = doOpenReaderImpl(pReader->innerReader[1]);
    resetAllDataBlockScanInfo(pReader->innerReader[1]->status.pTableMap, pReader->window.ekey, step);
    if (code != TSDB_CODE_SUCCESS) {
      goto _err;
    }

    ret = doTsdbNextDataBlock(pReader->innerReader[1]);
    pReader->step = EXTERNAL_ROWS_NEXT;
    if (ret) {
      pStatus = &pReader->innerReader[1]->status;
      if (pStatus->composedDataBlock) {
        qTrace("tsdb/read: %p, unlock read mutex", pReader);
        tsdbReleaseReader(pReader);
      }

      return ret;
    }
  }

  qTrace("tsdb/read: %p, unlock read mutex", pReader);
  tsdbReleaseReader(pReader);

  return false;

_err:
  // report the failure through terrno and make sure the mutex is not leaked
  terrno = code;
  qTrace("tsdb/read: %p, unlock read mutex", pReader);
  tsdbReleaseReader(pReader);
  return false;
}

4517
/*
 * Complete the SMA list in pSup->pColAgg so that every requested column has an
 * entry.
 *
 * The timestamp aggregate `pTsAgg` is inserted at position 0. The list is then
 * walked in parallel with the requested column ids pSup->colId[0..numOfCols):
 * a requested column that has no entry (and is not the primary timestamp
 * column) gets an "all NULL" entry whose numOfNull equals `numOfRows`.
 */
static void doFillNullColSMA(SBlockLoadSuppInfo* pSup, int32_t numOfRows, int32_t numOfCols, SColumnDataAgg* pTsAgg) {
  int32_t aggIdx = 0;
  int32_t colIdx = 0;

  // the timestamp entry always goes first
  int32_t numOfAgg = (int32_t)taosArrayGetSize(pSup->pColAgg);
  taosArrayInsert(pSup->pColAgg, 0, pTsAgg);
  numOfAgg += 1;

  while (colIdx < numOfCols && aggIdx < numOfAgg) {
    SColumnDataAgg* pAgg = taosArrayGet(pSup->pColAgg, aggIdx);

    if (pAgg->colId < pSup->colId[colIdx]) {
      // SMA entry of a column that was not requested
      aggIdx++;
      continue;
    }

    if (pAgg->colId == pSup->colId[colIdx]) {
      aggIdx++;
      colIdx++;
      continue;
    }

    // the requested column id is smaller than the current entry: it has no SMA
    if (pSup->colId[colIdx] != PRIMARYKEY_TIMESTAMP_COL_ID) {
      SColumnDataAgg nullColAgg = {.colId = pSup->colId[colIdx], .numOfNull = numOfRows};
      taosArrayInsert(pSup->pColAgg, aggIdx, &nullColAgg);
      aggIdx++;
      numOfAgg++;
    }
    colIdx++;
  }

  // requested columns left over after the SMA list is exhausted
  for (; colIdx < numOfCols; colIdx++) {
    if (pSup->colId[colIdx] == PRIMARYKEY_TIMESTAMP_COL_ID) {
      continue;
    }

    SColumnDataAgg nullColAgg = {.colId = pSup->colId[colIdx], .numOfNull = numOfRows};
    taosArrayInsert(pSup->pColAgg, aggIdx, &nullColAgg);
    aggIdx++;
  }
}

4552
/*
 * Load the pre-computed column statistics (SMA) of the current file block.
 *
 * pDataBlock->pBlockAgg is always reset to NULL first and *allHave to false.
 * Nothing is loaded (TSDB_CODE_SUCCESS is returned with those values) when:
 *   - the reader type is TIMEWINDOW_RANGE_EXTERNAL,
 *   - the current block is a composed block or SMA is not valid for the query,
 *   - the result block belongs to another table uid than the current file block,
 *   - the file block carries no SMA.
 *
 * Otherwise the SMA is read, the timestamp column entry is synthesised from the
 * result block window, missing columns are filled in by doFillNullColSMA(), and
 * pDataBlock->pBlockAgg points at the per-slot array owned by the result block.
 * *allHave is true only when every requested column got an entry.
 *
 * Returns TSDB_CODE_SUCCESS, the error of tsdbReadBlockSma(), or
 * TSDB_CODE_OUT_OF_MEMORY when the per-slot array cannot be allocated.
 */
int32_t tsdbRetrieveDatablockSMA(STsdbReader* pReader, SSDataBlock* pDataBlock, bool* allHave) {
  SColumnDataAgg*** pBlockSMA = &pDataBlock->pBlockAgg;

  int32_t code = 0;
  *allHave = false;
  *pBlockSMA = NULL;

  if (pReader->type == TIMEWINDOW_RANGE_EXTERNAL) {
    return TSDB_CODE_SUCCESS;
  }

  // there is no statistics data for composed block
  if (pReader->status.composedDataBlock || (!pReader->suppInfo.smaValid)) {
    return TSDB_CODE_SUCCESS;
  }

  SFileDataBlockInfo* pFBlock = getCurrentBlockInfo(&pReader->status.blockIter);
  SBlockLoadSuppInfo* pSup = &pReader->suppInfo;

  if (pReader->pResBlock->info.id.uid != pFBlock->uid) {
    return TSDB_CODE_SUCCESS;
  }

  int64_t st = taosGetTimestampUs();

  SDataBlk* pBlock = getCurrentBlock(&pReader->status.blockIter);
  if (!tDataBlkHasSma(pBlock)) {
    *pBlockSMA = NULL;
    return TSDB_CODE_SUCCESS;
  }

  code = tsdbReadBlockSma(pReader->pFileReader, pBlock, pSup->pColAgg);
  if (code != TSDB_CODE_SUCCESS) {
    tsdbDebug("vgId:%d, failed to load block SMA for uid %" PRIu64 ", code:%s, %s", 0, pFBlock->uid, tstrerror(code),
              pReader->idStr);
    return code;
  }

  *allHave = true;

  // always load the first primary timestamp column data
  SColumnDataAgg* pTsAgg = &pSup->tsColAgg;

  pTsAgg->numOfNull = 0;
  pTsAgg->colId = PRIMARYKEY_TIMESTAMP_COL_ID;
  pTsAgg->min = pReader->pResBlock->info.window.skey;
  pTsAgg->max = pReader->pResBlock->info.window.ekey;

  // update the number of NULL data rows
  size_t numOfCols = pSup->numOfCols;

  // ensure capacity
  if (pDataBlock->pDataBlock) {
    size_t colsNum = taosArrayGetSize(pDataBlock->pDataBlock);
    taosArrayEnsureCap(pSup->pColAgg, colsNum);
  }

  SSDataBlock* pResBlock = pReader->pResBlock;
  if (pResBlock->pBlockAgg == NULL) {
    size_t num = taosArrayGetSize(pResBlock->pDataBlock);
    pResBlock->pBlockAgg = taosMemoryCalloc(num, POINTER_BYTES);
    if (pResBlock->pBlockAgg == NULL && num > 0) {
      // the array is indexed by slot id below; never continue without it
      *allHave = false;
      return TSDB_CODE_OUT_OF_MEMORY;
    }
  }

  // do fill all null column value SMA info
  doFillNullColSMA(pSup, pBlock->nRow, (int32_t)numOfCols, pTsAgg);
  size_t size = taosArrayGetSize(pSup->pColAgg);

  // merge-walk the (sorted) SMA list against the requested column ids
  size_t i = 0, j = 0;
  while (j < numOfCols && i < size) {
    SColumnDataAgg* pAgg = taosArrayGet(pSup->pColAgg, i);
    if (pAgg->colId == pSup->colId[j]) {
      pResBlock->pBlockAgg[pSup->slotId[j]] = pAgg;
      i += 1;
      j += 1;
    } else if (pAgg->colId < pSup->colId[j]) {
      i += 1;
    } else if (pSup->colId[j] < pAgg->colId) {
      pResBlock->pBlockAgg[pSup->slotId[j]] = NULL;
      *allHave = false;
      j += 1;
    }
  }

  *pBlockSMA = pResBlock->pBlockAgg;
  pReader->cost.smaDataLoad += 1;

  double elapsedTime = (taosGetTimestampUs() - st) / 1000.0;
  pReader->cost.smaLoadTime += elapsedTime;

  tsdbDebug("vgId:%d, succeed to load block SMA for uid %" PRIu64 ", %s", 0, pFBlock->uid, pReader->idStr);
  return code;
}

H
Haojun Liao 已提交
4645 4646 4647 4648 4649 4650 4651 4652 4653 4654 4655 4656
/*
 * Look up the scan info of table `uid` in `pTableMap`.
 *
 * Returns the stored STableBlockScanInfo pointer. When the uid is not in the
 * map (or maps to a NULL entry), terrno is set to TSDB_CODE_INVALID_PARA, an
 * error tagged with `id` is logged, and NULL is returned.
 */
STableBlockScanInfo* getTableBlockScanInfo(SHashObj* pTableMap, uint64_t uid, const char* id) {
  STableBlockScanInfo** ppInfo = taosHashGet(pTableMap, &uid, sizeof(uid));
  if (ppInfo != NULL && *ppInfo != NULL) {
    return *ppInfo;
  }

  terrno = TSDB_CODE_INVALID_PARA;
  int32_t numOfTables = taosHashGetSize(pTableMap);
  tsdbError("failed to locate the uid:%" PRIu64 " in query table uid list, total tables:%d, %s", uid, numOfTables, id);
  return NULL;
}

H
Haojun Liao 已提交
4657
/*
 * Load the data of the current file block and copy it into the result block.
 *
 * Returns the reader's result block, or NULL when the table of the current
 * block is unknown or the block data cannot be loaded (terrno holds the load
 * error in the latter case and the file block buffer is destroyed).
 */
static SSDataBlock* doRetrieveDataBlock(STsdbReader* pReader) {
  SReaderStatus*  pStatus = &pReader->status;
  SDataBlockIter* pIter = &pStatus->blockIter;

  SFileDataBlockInfo*  pCurBlock = getCurrentBlockInfo(pIter);
  STableBlockScanInfo* pScanInfo = getTableBlockScanInfo(pStatus->pTableMap, pCurBlock->uid, pReader->idStr);
  if (pScanInfo == NULL) {
    return NULL;
  }

  int32_t code = doLoadFileBlockData(pReader, pIter, &pStatus->fileBlockData, pScanInfo->uid);
  if (code == TSDB_CODE_SUCCESS) {
    copyBlockDataToSDataBlock(pReader);
    return pReader->pResBlock;
  }

  // loading failed: release the partially filled buffer and report via terrno
  tBlockDataDestroy(&pStatus->fileBlockData);
  terrno = code;
  return NULL;
}

H
Haojun Liao 已提交
4676
/*
 * Return the block announced by the last successful tsdbNextDataBlock() call.
 *
 * For a TIMEWINDOW_RANGE_EXTERNAL reader the block comes from the inner reader
 * that matches the current step (prev / next); otherwise from the reader
 * itself. A composed block is returned as is. For a file block the data is
 * loaded first and the reader mutex of `pReader` is released afterwards.
 *
 * `pIdList` is not used by this implementation.
 */
SSDataBlock* tsdbRetrieveDataBlock(STsdbReader* pReader, SArray* pIdList) {
  STsdbReader* pActive = pReader;

  if (pReader->type == TIMEWINDOW_RANGE_EXTERNAL) {
    switch (pReader->step) {
      case EXTERNAL_ROWS_PREV:
        pActive = pReader->innerReader[0];
        break;
      case EXTERNAL_ROWS_NEXT:
        pActive = pReader->innerReader[1];
        break;
      default:
        break;
    }
  }

  if (pActive->status.composedDataBlock) {
    return pActive->pResBlock;
  }

  SSDataBlock* pRes = doRetrieveDataBlock(pActive);

  qTrace("tsdb/read-retrieve: %p, unlock read mutex", pReader);
  tsdbReleaseReader(pReader);

  return pRes;
}

H
Haojun Liao 已提交
4699
/*
 * Reset the reader so that it can be reused with the query condition `pCond`.
 *
 * Runs under the reader mutex and resumes a suspended reader first. When the
 * current query window is empty or no read snapshot exists, nothing is reset
 * and TSDB_CODE_SUCCESS is returned. Otherwise order, type, window, the
 * timestamp SMA entry, the file reader, the file-set / block iterators, the
 * table list index and all per-table scan infos are re-initialised, and the
 * first block of the first file is located (or the reader is switched to the
 * in-memory buffer when there are no files).
 *
 * Returns TSDB_CODE_SUCCESS or the error of initForFirstBlockInFile().
 */
int32_t tsdbReaderReset(STsdbReader* pReader, SQueryTableDataCond* pCond) {
  qTrace("tsdb/reader-reset: %p, take read mutex", pReader);
  tsdbAcquireReader(pReader);

  if (pReader->suspended) {
    tsdbReaderResume(pReader);
  }

  if (isEmptyQueryTimeWindow(&pReader->window) || pReader->pReadSnap == NULL) {
    tsdbDebug("tsdb reader reset return %p, %s", pReader->pReadSnap, pReader->idStr);
    tsdbReleaseReader(pReader);
    return TSDB_CODE_SUCCESS;
  }

  SReaderStatus*  pStatus = &pReader->status;
  SDataBlockIter* pBlockIter = &pStatus->blockIter;

  // query-level settings
  pReader->order = pCond->order;
  pReader->type = TIMEWINDOW_RANGE_CONTAINED;
  pStatus->loadFromFile = true;
  pStatus->pTableIter = NULL;
  pReader->window = updateQueryTimeWindow(pReader->pTsdb, &pCond->twindows);

  // start from a clean timestamp SMA entry and a closed file reader
  memset(&pReader->suppInfo.tsColAgg, 0, sizeof(SColumnDataAgg));
  pReader->suppInfo.tsColAgg.colId = PRIMARYKEY_TIMESTAMP_COL_ID;
  tsdbDataFReaderClose(&pReader->pFileReader);

  int32_t numOfTables = taosHashGetSize(pStatus->pTableMap);

  initFilesetIterator(&pStatus->fileIter, pReader->pReadSnap->fs.aDFileSet, pReader);
  resetDataBlockIterator(pBlockIter, pReader->order);
  resetTableListIndex(&pReader->status);

  // the scan restarts just outside the window, on the side the traversal begins
  int32_t step;
  int64_t ts;
  if (ASCENDING_TRAVERSE(pReader->order)) {
    step = 1;
    ts = pReader->window.skey - 1;
  } else {
    step = -1;
    ts = pReader->window.ekey + 1;
  }
  resetAllDataBlockScanInfo(pStatus->pTableMap, ts, step);

  int32_t code = 0;

  if (pStatus->fileIter.numOfFiles == 0) {
    // no data in files, let's try buffer in memory
    pStatus->loadFromFile = false;
    resetTableListIndex(pStatus);
  } else {
    code = initForFirstBlockInFile(pReader, pBlockIter);
    if (code != TSDB_CODE_SUCCESS) {
      tsdbError("%p reset reader failed, numOfTables:%d, query range:%" PRId64 " - %" PRId64 " in query %s", pReader,
                numOfTables, pReader->window.skey, pReader->window.ekey, pReader->idStr);

      tsdbReleaseReader(pReader);
      return code;
    }
  }

  tsdbDebug("%p reset reader, suid:%" PRIu64 ", numOfTables:%d, skey:%" PRId64 ", query range:%" PRId64 " - %" PRId64
            " in query %s",
            pReader, pReader->suid, numOfTables, pCond->twindows.skey, pReader->window.skey, pReader->window.ekey,
            pReader->idStr);

  tsdbReleaseReader(pReader);

  return code;
}
H
Hongze Cheng 已提交
4765

4766 4767 4768 4769 4770 4771
/*
 * Map a block's row count to a histogram bucket.
 *
 * Buckets are `bucketRange` rows wide and start at `startRow`; there are
 * `numOfBucket` buckets, indexed 0 .. numOfBucket-1.
 *
 * The result is always a valid bucket index:
 *   - row counts at or below `startRow` go to bucket 0,
 *   - row counts beyond the last bucket go to the last bucket,
 *   - a non-positive `bucketRange` (e.g. maxRows == minRows) or a non-positive
 *     `numOfBucket` yields 0 instead of dividing by zero.
 */
static int32_t getBucketIndex(int32_t startRow, int32_t bucketRange, int32_t numOfRows, int32_t numOfBucket) {
  if (numOfBucket <= 0 || bucketRange <= 0 || numOfRows <= startRow) {
    return 0;
  }

  int32_t bucketIndex = (numOfRows - startRow) / bucketRange;
  if (bucketIndex >= numOfBucket) {
    bucketIndex = numOfBucket - 1;
  }
  return bucketIndex;
}
H
Hongze Cheng 已提交
4773

4774 4775 4776 4777
/*
 * Collect block distribution statistics for the tables of this reader.
 *
 * Runs under the reader mutex (resuming a suspended reader first). Every file
 * block reachable through the block iterator contributes to the row total,
 * min/max rows, small-block count, total size and the row-count histogram of
 * `pTableBlockInfo`. totalSize, totalRows and numOfVgroups are (re)initialised
 * here; the remaining counters are accumulated onto the values passed in.
 *
 * Returns TSDB_CODE_SUCCESS or the error of initForFirstBlockInFile().
 */
int32_t tsdbGetFileBlocksDistInfo(STsdbReader* pReader, STableBlockDistInfo* pTableBlockInfo) {
  const int32_t numOfBucket = 20;
  const int     smallBlockRows = 4096;  // blocks with fewer rows count as "small"
  int32_t       code = TSDB_CODE_SUCCESS;

  pTableBlockInfo->totalSize = 0;
  pTableBlockInfo->totalRows = 0;
  pTableBlockInfo->numOfVgroups = 1;

  // find the start data block in file
  tsdbAcquireReader(pReader);
  if (pReader->suspended) {
    tsdbReaderResume(pReader);
  }

  SReaderStatus*  pStatus = &pReader->status;
  SDataBlockIter* pBlockIter = &pStatus->blockIter;

  STsdbCfg* pCfg = &pReader->pTsdb->pVnode->config.tsdbCfg;
  pTableBlockInfo->defMinRows = pCfg->minRows;
  pTableBlockInfo->defMaxRows = pCfg->maxRows;

  int32_t bucketRange = ceil(((double)(pCfg->maxRows - pCfg->minRows)) / numOfBucket);

  pTableBlockInfo->numOfFiles += 1;
  pTableBlockInfo->numOfFiles += pStatus->fileIter.numOfFiles;

  if (pBlockIter->numOfBlocks > 0) {
    pTableBlockInfo->numOfBlocks += pBlockIter->numOfBlocks;
  }

  pTableBlockInfo->numOfTables = (int32_t)taosHashGetSize(pStatus->pTableMap);

  bool hasNext = (pBlockIter->numOfBlocks > 0);
  for (;;) {
    if (!hasNext) {
      // current file set is exhausted: move on to the next one that has blocks
      code = initForFirstBlockInFile(pReader, pBlockIter);
      if ((code != TSDB_CODE_SUCCESS) || (pStatus->loadFromFile == false)) {
        break;
      }

      pTableBlockInfo->numOfBlocks += pBlockIter->numOfBlocks;
      hasNext = (pBlockIter->numOfBlocks > 0);
      continue;
    }

    SDataBlk* pBlock = getCurrentBlock(pBlockIter);
    int32_t   nRows = pBlock->nRow;

    pTableBlockInfo->totalRows += nRows;

    if (nRows > pTableBlockInfo->maxRows) {
      pTableBlockInfo->maxRows = nRows;
    }

    if (nRows < pTableBlockInfo->minRows) {
      pTableBlockInfo->minRows = nRows;
    }

    if (nRows < smallBlockRows) {
      pTableBlockInfo->numOfSmallBlocks += 1;
    }

    pTableBlockInfo->totalSize += pBlock->aSubBlock[0].szBlock;

    int32_t bucketIndex = getBucketIndex(pTableBlockInfo->defMinRows, bucketRange, nRows, numOfBucket);
    pTableBlockInfo->blockRowsHisto[bucketIndex]++;

    hasNext = blockIteratorNext(&pStatus->blockIter, pReader->idStr);
  }

  tsdbReleaseReader(pReader);
  return code;
}
H
Hongze Cheng 已提交
4851

H
refact  
Hongze Cheng 已提交
4852
/*
 * Count the rows that the tables of this reader hold in the snapshot's
 * mem table and imem table.
 *
 * Runs under the reader mutex and resumes a suspended reader first. The
 * reader's table iterator (status.pTableIter) is used for the walk and is
 * NULL when the function returns.
 */
int64_t tsdbGetNumOfRowsInMemTable(STsdbReader* pReader) {
  int64_t        total = 0;
  SReaderStatus* pStatus = &pReader->status;

  tsdbAcquireReader(pReader);
  if (pReader->suspended) {
    tsdbReaderResume(pReader);
  }

  for (pStatus->pTableIter = taosHashIterate(pStatus->pTableMap, NULL); pStatus->pTableIter != NULL;
       pStatus->pTableIter = taosHashIterate(pStatus->pTableMap, pStatus->pTableIter)) {
    STableBlockScanInfo* pScanInfo = *(STableBlockScanInfo**)pStatus->pTableIter;

    // rows buffered in the mem table
    if (pReader->pReadSnap->pMem != NULL) {
      STbData* pMemData = tsdbGetTbDataFromMemTable(pReader->pReadSnap->pMem, pReader->suid, pScanInfo->uid);
      if (pMemData != NULL) {
        total += tsdbGetNRowsInTbData(pMemData);
      }
    }

    // rows buffered in the imem table
    if (pReader->pReadSnap->pIMem != NULL) {
      STbData* pIMemData = tsdbGetTbDataFromMemTable(pReader->pReadSnap->pIMem, pReader->suid, pScanInfo->uid);
      if (pIMemData != NULL) {
        total += tsdbGetNRowsInTbData(pIMemData);
      }
    }
  }

  tsdbReleaseReader(pReader);

  return total;
}
D
dapan1121 已提交
4890

L
Liu Jicong 已提交
4891
/*
 * Fetch the schema of table `uid`.
 *
 * For a child table, *suid receives the uid of its super table and the schema
 * version is taken from the super table entry; for a normal table *suid is 0
 * and the table's own schema version is used. *pSchema receives the result of
 * metaGetTbTSchema() for that version.
 *
 * Returns TSDB_CODE_SUCCESS, or (with terrno set to the same value)
 * TSDB_CODE_TDB_INVALID_TABLE_ID when a meta entry cannot be found and
 * TSDB_CODE_INVALID_PARA for any other table type.
 */
int32_t tsdbGetTableSchema(SVnode* pVnode, int64_t uid, STSchema** pSchema, int64_t* suid) {
  int32_t     sversion = 1;
  SMetaReader mr = {0};

  metaReaderInit(&mr, pVnode->pMeta, 0);
  if (metaGetTableEntryByUidCache(&mr, uid) != TSDB_CODE_SUCCESS) {
    terrno = TSDB_CODE_TDB_INVALID_TABLE_ID;
    goto _fail;
  }

  *suid = 0;

  switch (mr.me.type) {
    case TSDB_CHILD_TABLE:
      tDecoderClear(&mr.coder);
      *suid = mr.me.ctbEntry.suid;
      if (metaGetTableEntryByUidCache(&mr, *suid) != TSDB_CODE_SUCCESS) {
        terrno = TSDB_CODE_TDB_INVALID_TABLE_ID;
        goto _fail;
      }
      sversion = mr.me.stbEntry.schemaRow.version;
      break;

    case TSDB_NORMAL_TABLE:
      sversion = mr.me.ntbEntry.schemaRow.version;
      break;

    default:
      terrno = TSDB_CODE_INVALID_PARA;
      goto _fail;
  }

  metaReaderClear(&mr);
  *pSchema = metaGetTbTSchema(pVnode->pMeta, uid, sversion, 1);

  return TSDB_CODE_SUCCESS;

_fail:
  metaReaderClear(&mr);
  return terrno;
}
H
Hongze Cheng 已提交
4928

H
Hongze Cheng 已提交
4929
/*
 * Take a read snapshot of the tsdb for `pReader`.
 *
 * Under the tsdb read lock, the mem / imem tables whose version range overlaps
 * the reader's version range are referenced (each with a query node carrying
 * `pReader` and the `reseek` callback), and the file system state is
 * referenced through tsdbFSRef().
 *
 * On success *ppSnap receives the new snapshot and 0 is returned. On failure
 * *ppSnap is NULL, the snapshot and its nodes are freed, and the error code is
 * returned.
 *
 * NOTE(review): on the failure path the nodes are freed without a matching
 * tsdbUnrefMemTable() for tables already passed to tsdbRefMemTable() - confirm
 * whether that reference has to be dropped here.
 */
int32_t tsdbTakeReadSnap(STsdbReader* pReader, _query_reseek_func_t reseek, STsdbReadSnap** ppSnap) {
  int32_t        code = 0;
  STsdb*         pTsdb = pReader->pTsdb;
  SVersionRange* pRange = &pReader->verRange;

  // alloc
  STsdbReadSnap* pSnap = (STsdbReadSnap*)taosMemoryCalloc(1, sizeof(*pSnap));
  if (pSnap == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _exit;
  }

  // lock
  taosThreadRwlockRdlock(&pTsdb->rwLock);

  // mem table, only when its versions overlap the requested range
  if (pTsdb->mem && (pRange->minVer <= pTsdb->mem->maxVer && pRange->maxVer >= pTsdb->mem->minVer)) {
    pSnap->pMem = pTsdb->mem;
    pSnap->pNode = taosMemoryMalloc(sizeof(*pSnap->pNode));
    if (pSnap->pNode == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto _unlock;
    }

    pSnap->pNode->pQHandle = pReader;
    pSnap->pNode->reseek = reseek;
    tsdbRefMemTable(pTsdb->mem, pSnap->pNode);
  }

  // imem table, same overlap rule
  if (pTsdb->imem && (pRange->minVer <= pTsdb->imem->maxVer && pRange->maxVer >= pTsdb->imem->minVer)) {
    pSnap->pIMem = pTsdb->imem;
    pSnap->pINode = taosMemoryMalloc(sizeof(*pSnap->pINode));
    if (pSnap->pINode == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto _unlock;
    }

    pSnap->pINode->pQHandle = pReader;
    pSnap->pINode->reseek = reseek;
    tsdbRefMemTable(pTsdb->imem, pSnap->pINode);
  }

  // fs
  code = tsdbFSRef(pTsdb, &pSnap->fs);

_unlock:
  taosThreadRwlockUnlock(&pTsdb->rwLock);
  if (code == 0) {
    tsdbTrace("vgId:%d, take read snapshot", TD_VID(pTsdb->pVnode));
  }

_exit:
  if (code == 0) {
    *ppSnap = pSnap;
    return code;
  }

  *ppSnap = NULL;
  if (pSnap) {
    if (pSnap->pNode) taosMemoryFree(pSnap->pNode);
    if (pSnap->pINode) taosMemoryFree(pSnap->pINode);
    taosMemoryFree(pSnap);
  }
  return code;
}

4999
/*
 * Give back a snapshot obtained from tsdbTakeReadSnap().
 *
 * Drops the references on the mem / imem tables (forwarding `proactive` to
 * tsdbUnrefMemTable) and on the file system state, then frees the query nodes
 * and the snapshot itself. A NULL `pSnap` is accepted; only the trace line is
 * emitted in that case.
 */
void tsdbUntakeReadSnap(STsdbReader* pReader, STsdbReadSnap* pSnap, bool proactive) {
  STsdb* pTsdb = pReader->pTsdb;

  if (pSnap != NULL) {
    // in-memory tables first
    if (pSnap->pMem != NULL) {
      tsdbUnrefMemTable(pSnap->pMem, pSnap->pNode, proactive);
    }
    if (pSnap->pIMem != NULL) {
      tsdbUnrefMemTable(pSnap->pIMem, pSnap->pINode, proactive);
    }

    // then the file system state
    tsdbFSUnref(pTsdb, &pSnap->fs);

    // finally the memory owned by the snapshot
    if (pSnap->pNode != NULL) {
      taosMemoryFree(pSnap->pNode);
    }
    if (pSnap->pINode != NULL) {
      taosMemoryFree(pSnap->pINode);
    }
    taosMemoryFree(pSnap);
  }

  tsdbTrace("vgId:%d, untake read snapshot", TD_VID(pTsdb->pVnode));
}
5018 5019 5020 5021 5022

/*
 * Replace the reader's id string with a copy of `idstr`.
 *
 * If failed, do nothing: when `idstr` is NULL or the copy cannot be allocated,
 * the current id string is kept untouched. The old string is released only
 * after the new copy exists.
 */
void tsdbReaderSetId(STsdbReader* pReader, const char* idstr) {
  if (idstr == NULL) {
    return;
  }

  char* pNewId = taosStrdup(idstr);
  if (pNewId == NULL) {
    return;
  }

  taosMemoryFreeClear(pReader->idStr);
  pReader->idStr = pNewId;
}