tsdbRead.c 151.9 KB
Newer Older
H
hjxilinx 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

H
Haojun Liao 已提交
16
#include "osDef.h"
H
Hongze Cheng 已提交
17
#include "tsdb.h"
18

H
Hongze Cheng 已提交
19
#define ASCENDING_TRAVERSE(o) (o == TSDB_ORDER_ASC)
H
Hongze Cheng 已提交
20

21 22 23 24 25 26
typedef enum {
  EXTERNAL_ROWS_PREV = 0x1,
  EXTERNAL_ROWS_MAIN = 0x2,
  EXTERNAL_ROWS_NEXT = 0x3,
} EContentData;

27
typedef struct {
dengyihao's avatar
dengyihao 已提交
28
  STbDataIter* iter;
29 30 31 32
  int32_t      index;
  bool         hasVal;
} SIterInfo;

33 34
typedef struct {
  int32_t numOfBlocks;
35
  int32_t numOfLastFiles;
36 37
} SBlockNumber;

38
typedef struct SBlockIndex {
39 40
  int32_t     ordinalIndex;
  int64_t     inFileOffset;
H
Haojun Liao 已提交
41
  STimeWindow window;  // todo replace it with overlap flag.
42 43
} SBlockIndex;

H
Haojun Liao 已提交
44
typedef struct STableBlockScanInfo {
dengyihao's avatar
dengyihao 已提交
45 46
  uint64_t  uid;
  TSKEY     lastKey;
H
Hongze Cheng 已提交
47
  SMapData  mapData;            // block info (compressed)
48
  SArray*   pBlockList;         // block data index list, SArray<SBlockIndex>
H
Hongze Cheng 已提交
49 50 51 52 53 54
  SIterInfo iter;               // mem buffer skip list iterator
  SIterInfo iiter;              // imem buffer skip list iterator
  SArray*   delSkyline;         // delete info for this table
  int32_t   fileDelIndex;       // file block delete index
  int32_t   lastBlockDelIndex;  // delete index for last block
  bool      iterInit;           // whether to initialize the in-memory skip list iterator or not
H
Haojun Liao 已提交
55 56 57
} STableBlockScanInfo;

typedef struct SBlockOrderWrapper {
dengyihao's avatar
dengyihao 已提交
58
  int64_t uid;
59
  int64_t offset;
H
Haojun Liao 已提交
60
} SBlockOrderWrapper;
H
Hongze Cheng 已提交
61 62

typedef struct SBlockOrderSupporter {
63 64 65 66
  SBlockOrderWrapper** pDataBlockInfo;
  int32_t*             indexPerTable;
  int32_t*             numOfBlocksPerTable;
  int32_t              numOfTables;
H
Hongze Cheng 已提交
67 68 69
} SBlockOrderSupporter;

typedef struct SIOCostSummary {
70 71 72
  int64_t numOfBlocks;
  double  blockLoadTime;
  double  buildmemBlock;
73
  int64_t headFileLoad;
74
  double  headFileLoadTime;
75
  int64_t smaDataLoad;
76
  double  smaLoadTime;
77 78
  int64_t lastBlockLoad;
  double  lastBlockLoadTime;
H
Haojun Liao 已提交
79 80
  int64_t composedBlocks;
  double  buildComposedBlockTime;
H
Haojun Liao 已提交
81
  double  createScanInfoList;
H
Hongze Cheng 已提交
82 83 84
} SIOCostSummary;

typedef struct SBlockLoadSuppInfo {
85
  SArray*          pColAgg;
86
  SColumnDataAgg   tsColAgg;
H
Haojun Liao 已提交
87 88
  int16_t*         colId;
  int16_t*         slotId;
89
  int32_t          numOfCols;
90
  char**           buildBuf;  // build string tmp buffer, todo remove it later after all string format being updated.
91
  bool             smaValid;  // the sma on all queried columns are activated
H
Hongze Cheng 已提交
92 93
} SBlockLoadSuppInfo;

94
typedef struct SLastBlockReader {
H
Hongze Cheng 已提交
95 96 97 98 99
  STimeWindow        window;
  SVersionRange      verRange;
  int32_t            order;
  uint64_t           uid;
  SMergeTree         mergeTree;
100
  SSttBlockLoadInfo* pInfo;
101 102
} SLastBlockReader;

103
typedef struct SFilesetIter {
H
Hongze Cheng 已提交
104 105 106
  int32_t           numOfFiles;  // number of total files
  int32_t           index;       // current accessed index in the list
  SArray*           pFileList;   // data file list
107
  int32_t           order;
H
Hongze Cheng 已提交
108
  SLastBlockReader* pLastBlockReader;  // last file block reader
109
} SFilesetIter;
H
Haojun Liao 已提交
110 111

typedef struct SFileDataBlockInfo {
112
  // index position in STableBlockScanInfo in order to check whether neighbor block overlaps with it
dengyihao's avatar
dengyihao 已提交
113
  uint64_t uid;
114
  int32_t  tbBlockIdx;
H
Haojun Liao 已提交
115 116 117
} SFileDataBlockInfo;

typedef struct SDataBlockIter {
118
  int32_t   numOfBlocks;
119
  int32_t   index;
H
Hongze Cheng 已提交
120
  SArray*   blockList;  // SArray<SFileDataBlockInfo>
121
  int32_t   order;
122
  SDataBlk  block;  // current SDataBlk data
123
  SHashObj* pTableMap;
H
Haojun Liao 已提交
124 125 126
} SDataBlockIter;

typedef struct SFileBlockDumpInfo {
dengyihao's avatar
dengyihao 已提交
127 128 129 130
  int32_t totalRows;
  int32_t rowIndex;
  int64_t lastKey;
  bool    allDumped;
H
Haojun Liao 已提交
131 132
} SFileBlockDumpInfo;

133
typedef struct SUidOrderCheckInfo {
134 135
  uint64_t* tableUidList;  // access table uid list in uid ascending order list
  int32_t   currentIndex;  // index in table uid list
136 137
} SUidOrderCheckInfo;

H
Haojun Liao 已提交
138
typedef struct SReaderStatus {
H
Hongze Cheng 已提交
139 140 141
  bool                  loadFromFile;       // check file stage
  bool                  composedDataBlock;  // the returned data block is a composed block or not
  SHashObj*             pTableMap;          // SHash<STableBlockScanInfo>
142
  STableBlockScanInfo** pTableIter;         // table iterator used in building in-memory buffer data blocks.
H
Hongze Cheng 已提交
143 144 145 146 147 148
  SUidOrderCheckInfo    uidCheckInfo;       // check all table in uid order
  SFileBlockDumpInfo    fBlockDumpInfo;
  SDFileSet*            pCurrentFileset;  // current opened file set
  SBlockData            fileBlockData;
  SFilesetIter          fileIter;
  SDataBlockIter        blockIter;
H
Haojun Liao 已提交
149 150
} SReaderStatus;

151
typedef struct SBlockInfoBuf {
H
Hongze Cheng 已提交
152 153 154
  int32_t currentIndex;
  SArray* pData;
  int32_t numPerBucket;
155 156
} SBlockInfoBuf;

H
Hongze Cheng 已提交
157
struct STsdbReader {
H
Haojun Liao 已提交
158 159 160
  STsdb*             pTsdb;
  uint64_t           suid;
  int16_t            order;
H
Haojun Liao 已提交
161
  bool               freeBlock;
H
Haojun Liao 已提交
162 163 164 165
  STimeWindow        window;  // the primary query time window that applies to all queries
  SSDataBlock*       pResBlock;
  int32_t            capacity;
  SReaderStatus      status;
166 167
  char*              idStr;  // query info handle, for debug purpose
  int32_t            type;   // query type: 1. retrieve all data blocks, 2. retrieve direct prev|next rows
H
Hongze Cheng 已提交
168
  SBlockLoadSuppInfo suppInfo;
H
Hongze Cheng 已提交
169
  STsdbReadSnap*     pReadSnap;
170
  SIOCostSummary     cost;
171 172
  STSchema*          pSchema;     // the newest version schema
  STSchema*          pMemSchema;  // the previous schema for in-memory data, to avoid load schema too many times
173 174 175
  SDataFReader*      pFileReader; // the file reader
  SDelFReader*       pDelFReader; // the del file reader
  SArray*            pDelIdx;     // del file block index;
176
  SVersionRange      verRange;
177 178 179
  SBlockInfoBuf      blockInfoBuf;
  int32_t            step;
  STsdbReader*       innerReader[2];
H
Hongze Cheng 已提交
180
};
H
Hongze Cheng 已提交
181

H
Haojun Liao 已提交
182
static SFileDataBlockInfo* getCurrentBlockInfo(SDataBlockIter* pBlockIter);
183 184
static int      buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, int64_t endKey, int32_t capacity,
                                          STsdbReader* pReader);
185
static TSDBROW* getValidMemRow(SIterInfo* pIter, const SArray* pDelList, STsdbReader* pReader);
186 187
static int32_t  doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pScanInfo, STsdbReader* pReader,
                                        SRowMerger* pMerger);
H
Hongze Cheng 已提交
188
static int32_t  doMergeRowsInLastBlock(SLastBlockReader* pLastBlockReader, STableBlockScanInfo* pScanInfo, int64_t ts,
189
                                       SRowMerger* pMerger, SVersionRange* pVerRange);
190
static int32_t  doMergeRowsInBuf(SIterInfo* pIter, uint64_t uid, int64_t ts, SArray* pDelList, SRowMerger* pMerger,
dengyihao's avatar
dengyihao 已提交
191
                                 STsdbReader* pReader);
H
Hongze Cheng 已提交
192
static int32_t  doAppendRowFromTSRow(SSDataBlock* pBlock, STsdbReader* pReader, STSRow* pTSRow,
H
Haojun Liao 已提交
193
                                     STableBlockScanInfo* pScanInfo);
194
static int32_t  doAppendRowFromFileBlock(SSDataBlock* pResBlock, STsdbReader* pReader, SBlockData* pBlockData,
H
Hongze Cheng 已提交
195
                                         int32_t rowIndex);
196
static void     setComposedBlockFlag(STsdbReader* pReader, bool composed);
H
Hongze Cheng 已提交
197 198
static bool     hasBeenDropped(const SArray* pDelList, int32_t* index, TSDBKEY* pKey, int32_t order,
                               SVersionRange* pVerRange);
199

H
Hongze Cheng 已提交
200 201 202 203
static int32_t doMergeMemTableMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter, SArray* pDelList,
                                        STSRow** pTSRow, STsdbReader* pReader, bool* freeTSRow);
static int32_t doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo* pBlockScanInfo,
                                  STsdbReader* pReader, STSRow** pTSRow);
204 205
static int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pBlockScanInfo, int64_t key,
                                     STsdbReader* pReader);
206

dengyihao's avatar
dengyihao 已提交
207 208 209 210
static int32_t initDelSkylineIterator(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader, STbData* pMemTbData,
                                      STbData* piMemTbData);
static STsdb*  getTsdbByRetentions(SVnode* pVnode, TSKEY winSKey, SRetention* retentions, const char* idstr,
                                   int8_t* pLevel);
211
static SVersionRange getQueryVerRange(SVnode* pVnode, SQueryTableDataCond* pCond, int8_t level);
H
Hongze Cheng 已提交
212 213 214
static int64_t       getCurrentKeyInLastBlock(SLastBlockReader* pLastBlockReader);
static bool          hasDataInLastBlock(SLastBlockReader* pLastBlockReader);
static int32_t       doBuildDataBlock(STsdbReader* pReader);
H
Haojun Liao 已提交
215
static TSDBKEY       getCurrentKeyInBuf(STableBlockScanInfo* pScanInfo, STsdbReader* pReader);
216
static bool          hasDataInFileBlock(const SBlockData* pBlockData, const SFileBlockDumpInfo* pDumpInfo);
217
static void          initBlockDumpInfo(STsdbReader* pReader, SDataBlockIter* pBlockIter);
218
static int32_t       getInitialDelIndex(const SArray* pDelSkyline, int32_t order);
H
Haojun Liao 已提交
219

220 221
// Returns true when ts falls outside the closed interval [pWindow->skey, pWindow->ekey].
static bool outOfTimeWindow(int64_t ts, STimeWindow* pWindow) {
  bool inside = (ts >= pWindow->skey) && (ts <= pWindow->ekey);
  return !inside;
}

H
Haojun Liao 已提交
222
/*
 * Build the column-id, slot-id and string-build-buffer tables of pSupInfo for the numOfCols queried columns.
 *
 * The three tables share ONE allocation whose base pointer is pSupInfo->colId:
 *   [int16_t colId[n]] [int16_t slotId[n]] [padding up to pointer alignment] [char* buildBuf[n]]
 * so releasing colId releases all three tables. Each buildBuf[i] is a separate allocation for variable-length
 * columns and NULL for fixed-length ones.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY when an allocation fails. After a failure every buildBuf
 * slot that was not allocated is NULL, so the usual cleanup path can release whatever was created.
 */
static int32_t setColumnIdSlotList(SBlockLoadSuppInfo* pSupInfo, SColumnInfo* pCols, const int32_t* pSlotIdList, int32_t numOfCols) {
  pSupInfo->smaValid = true;
  pSupInfo->numOfCols = numOfCols;

  // The two int16_t tables take 4 * numOfCols bytes, which is not pointer-aligned for odd numOfCols. Round the
  // start of the pointer table up to a multiple of the pointer size to avoid misaligned char* accesses.
  size_t idBytes = sizeof(int16_t) * 2 * (size_t)numOfCols;
  size_t bufOffset = (idBytes + POINTER_BYTES - 1) / POINTER_BYTES * POINTER_BYTES;

  // calloc: every buildBuf slot starts as NULL, which keeps partial failures safe to clean up.
  pSupInfo->colId = taosMemoryCalloc(1, bufOffset + (size_t)numOfCols * POINTER_BYTES);
  if (pSupInfo->colId == NULL) {
    pSupInfo->slotId = NULL;
    pSupInfo->buildBuf = NULL;
    pSupInfo->numOfCols = 0;  // nothing to iterate over / release
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  pSupInfo->slotId = (int16_t*)((char*)pSupInfo->colId + (sizeof(int16_t) * numOfCols));
  pSupInfo->buildBuf = (char**)((char*)pSupInfo->colId + bufOffset);

  for (int32_t i = 0; i < numOfCols; ++i) {
    pSupInfo->colId[i] = pCols[i].colId;
    pSupInfo->slotId[i] = pSlotIdList[i];

    // fixed-length columns keep the NULL from calloc
    if (IS_VAR_DATA_TYPE(pCols[i].type)) {
      pSupInfo->buildBuf[i] = taosMemoryMalloc(pCols[i].bytes);
      if (pSupInfo->buildBuf[i] == NULL) {
        return TSDB_CODE_OUT_OF_MEMORY;
      }
    }
  }

  return TSDB_CODE_SUCCESS;
}
H
Hongze Cheng 已提交
246

H
Haojun Liao 已提交
247
/*
 * Check whether block SMA is enabled for every queried column.
 *
 * The schema columns and pSupInfo->colId are walked in lock-step. The walk assumes both lists are ordered by
 * ascending column id (TODO confirm at the callers).
 *   - A schema column that is not queried is skipped.
 *   - A queried column without SMA sets pSupInfo->smaValid = false and returns TSDB_CODE_SUCCESS at once.
 *   - A queried column id smaller than the current schema column id returns TSDB_CODE_INVALID_PARA.
 */
static int32_t updateBlockSMAInfo(STSchema* pSchema, SBlockLoadSuppInfo* pSupInfo) {
  int32_t schemaPos = 0;
  int32_t queryPos = 0;

  while (schemaPos < pSchema->numOfCols && queryPos < pSupInfo->numOfCols) {
    STColumn* pCol = &pSchema->columns[schemaPos];
    int16_t   wantedId = pSupInfo->colId[queryPos];

    if (pCol->colId < wantedId) {
      // this schema column is not part of the query
      schemaPos += 1;
      continue;
    }

    if (pCol->colId > wantedId) {
      return TSDB_CODE_INVALID_PARA;
    }

    // same column id in both lists
    if (!IS_BSMA_ON(pCol)) {
      pSupInfo->smaValid = false;
      return TSDB_CODE_SUCCESS;
    }

    schemaPos += 1;
    queryPos += 1;
  }

  return TSDB_CODE_SUCCESS;
}

271
// Allocate one zeroed bucket of numOfElems STableBlockScanInfo entries and append it to pBuf->pData.
// The bucket is freed here if it cannot be appended, so it is never leaked.
static int32_t appendBlockScanInfoBucket(SBlockInfoBuf* pBuf, int32_t numOfElems) {
  char* p = taosMemoryCalloc(numOfElems, sizeof(STableBlockScanInfo));
  if (p == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  if (taosArrayPush(pBuf->pData, &p) == NULL) {
    taosMemoryFree(p);
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  return TSDB_CODE_SUCCESS;
}

/*
 * Prepare room for numOfTables STableBlockScanInfo entries in pBuf. Entries are stored in buckets of
 * pBuf->numPerBucket, plus one shorter bucket for the remainder.
 *
 * Returns TSDB_CODE_SUCCESS or TSDB_CODE_OUT_OF_MEMORY. Buckets appended before a failure stay in pBuf->pData and
 * are released by clearBlockScanInfoBuf.
 */
static int32_t initBlockScanInfoBuf(SBlockInfoBuf* pBuf, int32_t numOfTables) {
  int32_t num = numOfTables / pBuf->numPerBucket;
  int32_t remainder = numOfTables % pBuf->numPerBucket;

  if (pBuf->pData == NULL) {
    pBuf->pData = taosArrayInit(num + 1, POINTER_BYTES);
    if (pBuf->pData == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }
  }

  for (int32_t i = 0; i < num; ++i) {
    int32_t code = appendBlockScanInfoBucket(pBuf, pBuf->numPerBucket);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }
  }

  if (remainder > 0) {
    return appendBlockScanInfoBucket(pBuf, remainder);
  }

  return TSDB_CODE_SUCCESS;
}

/*
 * Free every bucket held by pBuf and the bucket array itself.
 *
 * pBuf->pData is reset to NULL afterwards. initBlockScanInfoBuf reuses pData whenever it is non-NULL, so leaving
 * a dangling pointer here would make a later re-initialization write into freed memory.
 */
static void clearBlockScanInfoBuf(SBlockInfoBuf* pBuf) {
  size_t num = taosArrayGetSize(pBuf->pData);
  for (size_t i = 0; i < num; ++i) {
    char** p = taosArrayGet(pBuf->pData, i);
    taosMemoryFree(*p);
  }

  pBuf->pData = taosArrayDestroy(pBuf->pData);
}

// Address of the index-th STableBlockScanInfo slot in pBuf. Slots are laid out bucket by bucket, with
// pBuf->numPerBucket slots per bucket.
static void* getPosInBlockInfoBuf(SBlockInfoBuf* pBuf, int32_t index) {
  int32_t perBucket = pBuf->numPerBucket;
  int32_t slotInBucket = index % perBucket;
  char**  ppBucket = taosArrayGet(pBuf->pData, index / perBucket);

  return *ppBucket + slotInBucket * sizeof(STableBlockScanInfo);
}

// NOTE: speedup the whole processing by preparing the buffer for STableBlockScanInfo in batch model
H
Haojun Liao 已提交
315
static SHashObj* createDataBlockScanInfo(STsdbReader* pTsdbReader, SBlockInfoBuf* pBuf, const STableKeyInfo* idList, int32_t numOfTables) {
H
Haojun Liao 已提交
316
  // allocate buffer in order to load data blocks from file
317
  // todo use simple hash instead, optimize the memory consumption
318 319 320
  SHashObj* pTableMap =
      taosHashInit(numOfTables, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
  if (pTableMap == NULL) {
H
Haojun Liao 已提交
321 322 323
    return NULL;
  }

H
Haojun Liao 已提交
324
  int64_t st = taosGetTimestampUs();
H
Haojun Liao 已提交
325
  initBlockScanInfoBuf(pBuf, numOfTables);
H
Haojun Liao 已提交
326

327
  for (int32_t j = 0; j < numOfTables; ++j) {
H
Haojun Liao 已提交
328
    STableBlockScanInfo* pScanInfo = getPosInBlockInfoBuf(pBuf, j);
329 330 331 332 333 334 335 336 337 338
    pScanInfo->uid = idList[j].uid;
    if (ASCENDING_TRAVERSE(pTsdbReader->order)) {
      int64_t skey = pTsdbReader->window.skey;
      pScanInfo->lastKey = (skey > INT64_MIN) ? (skey - 1) : skey;
    } else {
      int64_t ekey = pTsdbReader->window.ekey;
      pScanInfo->lastKey = (ekey < INT64_MAX) ? (ekey + 1) : ekey;
    }

    taosHashPut(pTableMap, &pScanInfo->uid, sizeof(uint64_t), &pScanInfo, POINTER_BYTES);
H
Hongze Cheng 已提交
339 340
    tsdbTrace("%p check table uid:%" PRId64 " from lastKey:%" PRId64 " %s", pTsdbReader, pScanInfo->uid,
              pScanInfo->lastKey, pTsdbReader->idStr);
H
Haojun Liao 已提交
341 342
  }

H
Haojun Liao 已提交
343 344 345 346
  pTsdbReader->cost.createScanInfoList = (taosGetTimestampUs() - st) / 1000.0;
  tsdbDebug("%p create %d tables scan-info, size:%.2f Kb, elapsed time:%.2f ms, %s", pTsdbReader, numOfTables,
            (sizeof(STableBlockScanInfo) * numOfTables) / 1024.0, pTsdbReader->cost.createScanInfoList,
            pTsdbReader->idStr);
347

348
  return pTableMap;
H
Hongze Cheng 已提交
349
}
H
Hongze Cheng 已提交
350

351 352
/*
 * Rewind every table scan-info in pTableMap to key ts. This destroys both in-memory iterators (they are recreated
 * lazily because iterInit is cleared) and the delete skyline of each table.
 */
static void resetAllDataBlockScanInfo(SHashObj* pTableMap, int64_t ts) {
  for (void* pEntry = taosHashIterate(pTableMap, NULL); pEntry != NULL; pEntry = taosHashIterate(pTableMap, pEntry)) {
    STableBlockScanInfo* pInfo = *(STableBlockScanInfo**)pEntry;

    pInfo->iterInit = false;
    pInfo->iter.hasVal = false;
    pInfo->iiter.hasVal = false;

    if (pInfo->iter.iter != NULL) {
      pInfo->iter.iter = tsdbTbDataIterDestroy(pInfo->iter.iter);
    }
    if (pInfo->iiter.iter != NULL) {
      pInfo->iiter.iter = tsdbTbDataIterDestroy(pInfo->iiter.iter);
    }

    pInfo->delSkyline = taosArrayDestroy(pInfo->delSkyline);
    pInfo->lastKey = ts;
  }
}

373 374
/*
 * Release everything a single table scan-info owns: both in-memory iterators, the delete skyline, the block index
 * list and the block map. The STableBlockScanInfo object itself is not freed.
 */
static void clearBlockScanInfo(STableBlockScanInfo* p) {
  p->iterInit = false;

  // the mem and imem iterators are handled identically
  SIterInfo* iterators[] = {&p->iter, &p->iiter};
  for (int32_t k = 0; k < 2; ++k) {
    SIterInfo* pIt = iterators[k];
    pIt->hasVal = false;
    if (pIt->iter != NULL) {
      pIt->iter = tsdbTbDataIterDestroy(pIt->iter);
    }
  }

  p->delSkyline = taosArrayDestroy(p->delSkyline);
  p->pBlockList = taosArrayDestroy(p->pBlockList);
  tMapDataClear(&p->mapData);
}
391

H
Haojun Liao 已提交
392
// Clear the resources of every table scan-info in pTableMap, then destroy the map itself.
static void destroyAllBlockScanInfo(SHashObj* pTableMap) {
  STableBlockScanInfo** ppInfo = taosHashIterate(pTableMap, NULL);
  while (ppInfo != NULL) {
    clearBlockScanInfo(*ppInfo);
    ppInfo = taosHashIterate(pTableMap, ppInfo);
  }

  taosHashCleanup(pTableMap);
}

401
// A query window is empty when its end key precedes its start key.
static bool isEmptyQueryTimeWindow(STimeWindow* pWindow) {
  bool inverted = (pWindow->ekey < pWindow->skey);
  return inverted;
}
H
Hongze Cheng 已提交
404

405 406 407
// Clip the start of the query window to the oldest timestamp still retained by the keep2 setting, so that expired
// data is never returned to the client even if it has not been physically removed yet.
// keep2 is scaled by tsTickPerMin, i.e. it is interpreted as a number of minutes.
static STimeWindow updateQueryTimeWindow(STsdb* pTsdb, STimeWindow* pWindow) {
  STsdbKeepCfg* pCfg = &pTsdb->keepCfg;

  int64_t nowTs = taosGetTimestamp(pCfg->precision);
  int64_t retentionSpan = tsTickPerMin[pCfg->precision] * pCfg->keep2;
  int64_t earliestTs = nowTs - retentionSpan + 1;  // needs to add one tick

  STimeWindow clipped = *pWindow;
  clipped.skey = (clipped.skey < earliestTs) ? earliestTs : clipped.skey;
  return clipped;
}
H
Hongze Cheng 已提交
420

H
Haojun Liao 已提交
421
/*
 * Shrink *capacity (rows per output block) so that one output SSDataBlock stays within 2MB. The row width is the
 * sum of the queried columns' byte widths.
 *
 * The arithmetic is done in 64 bits: both the summed row width and capacity * rowLen can exceed INT32_MAX for wide
 * tables, which was signed-overflow UB in 32 bits. The capacity is never reduced below one row, because a single
 * row wider than 2MB would otherwise yield a capacity of 0.
 */
static void limitOutputBufferSize(const SQueryTableDataCond* pCond, int32_t* capacity) {
  int64_t rowLen = 0;
  for (int32_t i = 0; i < pCond->numOfCols; ++i) {
    rowLen += pCond->colList[i].bytes;
  }

  // make sure the output SSDataBlock size be less than 2MB.
  const int64_t TWOMB = 2 * 1024 * 1024;
  if (rowLen > 0 && (int64_t)(*capacity) * rowLen > TWOMB) {
    int64_t rows = TWOMB / rowLen;
    (*capacity) = (rows > 0) ? (int32_t)rows : 1;
  }
}

// init file iterator
435
static int32_t initFilesetIterator(SFilesetIter* pIter, SArray* aDFileSet, STsdbReader* pReader) {
H
Hongze Cheng 已提交
436
  size_t numOfFileset = taosArrayGetSize(aDFileSet);
437

438 439
  pIter->index = ASCENDING_TRAVERSE(pReader->order) ? -1 : numOfFileset;
  pIter->order = pReader->order;
H
Hongze Cheng 已提交
440
  pIter->pFileList = aDFileSet;
441
  pIter->numOfFiles = numOfFileset;
H
Haojun Liao 已提交
442

443 444 445 446
  if (pIter->pLastBlockReader == NULL) {
    pIter->pLastBlockReader = taosMemoryCalloc(1, sizeof(struct SLastBlockReader));
    if (pIter->pLastBlockReader == NULL) {
      int32_t code = TSDB_CODE_OUT_OF_MEMORY;
447
      tsdbError("failed to prepare the last block iterator, since:%s %s", tstrerror(code), pReader->idStr);
448 449
      return code;
    }
450 451
  }

452 453 454 455 456 457 458 459
  SLastBlockReader* pLReader = pIter->pLastBlockReader;
  pLReader->order = pReader->order;
  pLReader->window = pReader->window;
  pLReader->verRange = pReader->verRange;

  pLReader->uid = 0;
  tMergeTreeClose(&pLReader->mergeTree);

460
  if (pLReader->pInfo == NULL) {
461
    // here we ignore the first column, which is always be the primary timestamp column
462
    pLReader->pInfo =
H
Haojun Liao 已提交
463
        tCreateLastBlockLoadInfo(pReader->pSchema, &pReader->suppInfo.colId[1], pReader->suppInfo.numOfCols - 1);
H
Haojun Liao 已提交
464 465 466 467
    if (pLReader->pInfo == NULL) {
      tsdbDebug("init fileset iterator failed, code:%s %s", tstrerror(terrno), pReader->idStr);
      return terrno;
    }
468 469
  }

470
  tsdbDebug("init fileset iterator, total files:%d %s", pIter->numOfFiles, pReader->idStr);
H
Haojun Liao 已提交
471 472 473
  return TSDB_CODE_SUCCESS;
}

474
/*
 * Advance pIter, in pIter->order direction, to the next data file set whose key range overlaps the query window,
 * and open a data file reader for it in pReader->pFileReader (any previously open reader is closed first).
 *
 * Returns true when such a file set was found and opened. Returns false in three cases:
 *   - the file list is exhausted;
 *   - the remaining file sets lie entirely beyond the query window;
 *   - opening a file set fails. Here the error is logged and stored in terrno, so that the failure can be told
 *     apart from a normal end of iteration instead of silently truncating the result.
 */
static bool filesetIteratorNext(SFilesetIter* pIter, STsdbReader* pReader) {
  bool    asc = ASCENDING_TRAVERSE(pIter->order);
  int32_t step = asc ? 1 : -1;
  pIter->index += step;

  if ((asc && pIter->index >= pIter->numOfFiles) || ((!asc) && pIter->index < 0)) {
    return false;
  }

  // collect the last-block load statistics of the previous file set, then reset the last-block reader
  SIOCostSummary* pSum = &pReader->cost;
  getLastBlockLoadInfo(pIter->pLastBlockReader->pInfo, &pSum->lastBlockLoad, &pSum->lastBlockLoadTime);

  pIter->pLastBlockReader->uid = 0;
  tMergeTreeClose(&pIter->pLastBlockReader->mergeTree);
  resetLastBlockLoadInfo(pIter->pLastBlockReader->pInfo);

  // check file the time range of coverage
  STimeWindow win = {0};

  while (1) {
    if (pReader->pFileReader != NULL) {
      tsdbDataFReaderClose(&pReader->pFileReader);
    }

    pReader->status.pCurrentFileset = (SDFileSet*)taosArrayGet(pIter->pFileList, pIter->index);

    int32_t code = tsdbDataFReaderOpen(&pReader->pFileReader, pReader->pTsdb, pReader->status.pCurrentFileset);
    if (code != TSDB_CODE_SUCCESS) {
      terrno = code;
      tsdbError("%p failed to open data file reader, fileset index:%d, code:%s %s", pReader, pIter->index,
                tstrerror(code), pReader->idStr);
      return false;
    }

    pReader->cost.headFileLoad += 1;

    int32_t fid = pReader->status.pCurrentFileset->fid;
    tsdbFidKeyRange(fid, pReader->pTsdb->keepCfg.days, pReader->pTsdb->keepCfg.precision, &win.skey, &win.ekey);

    // current file are no longer overlapped with query time window, ignore remain files
    if ((asc && win.skey > pReader->window.ekey) || (!asc && win.ekey < pReader->window.skey)) {
      tsdbDebug("%p remain files are not qualified for qrange:%" PRId64 "-%" PRId64 ", ignore, %s", pReader,
                pReader->window.skey, pReader->window.ekey, pReader->idStr);
      return false;
    }

    // this file set lies entirely before the query window (in traverse order): move on to the next one
    if ((asc && (win.ekey < pReader->window.skey)) || ((!asc) && (win.skey > pReader->window.ekey))) {
      pIter->index += step;
      if ((asc && pIter->index >= pIter->numOfFiles) || ((!asc) && pIter->index < 0)) {
        return false;
      }
      continue;
    }

    tsdbDebug("%p file found fid:%d for qrange:%" PRId64 "-%" PRId64 ", %s", pReader, fid, pReader->window.skey,
              pReader->window.ekey, pReader->idStr);
    return true;
  }
}

534
// Rewind the data-block iterator to "before the first block" for the given order. The block list is reused
// (emptied) when it already exists, and created otherwise.
static void resetDataBlockIterator(SDataBlockIter* pIter, int32_t order) {
  pIter->order = order;
  pIter->index = -1;
  pIter->numOfBlocks = 0;

  if (pIter->blockList != NULL) {
    taosArrayClear(pIter->blockList);
    return;
  }

  pIter->blockList = taosArrayInit(4, sizeof(SFileDataBlockInfo));
}

L
Liu Jicong 已提交
545
// Destroy the iterator's block list and reset the pointer to NULL. resetDataBlockIterator reuses a non-NULL
// blockList, so a dangling pointer here would turn a later reset into a use-after-free.
static void cleanupDataBlockIterator(SDataBlockIter* pIter) { pIter->blockList = taosArrayDestroy(pIter->blockList); }
H
Haojun Liao 已提交
546

H
Haojun Liao 已提交
547
// Put a fresh reader status into its initial state: start in the file-checking stage, with no in-memory table
// iterator positioned yet.
static void initReaderStatus(SReaderStatus* pStatus) {
  pStatus->loadFromFile = true;
  pStatus->pTableIter = NULL;
}

552 553 554 555 556 557 558 559
/*
 * Create the output block with one column per queried column in pCond, pre-sized for `capacity` rows.
 *
 * Returns the block, or NULL with terrno set. On failure the partially built block is released with
 * blockDataDestroy, so the column array and any column buffers are freed as well (a plain taosMemoryFree of the
 * block struct leaked them).
 */
static SSDataBlock* createResBlock(SQueryTableDataCond* pCond, int32_t capacity) {
  SSDataBlock* pResBlock = createDataBlock();
  if (pResBlock == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  int32_t code = TSDB_CODE_SUCCESS;
  for (int32_t i = 0; i < pCond->numOfCols; ++i) {
    SColumnInfoData colInfo = {0};
    colInfo.info = pCond->colList[i];
    code = blockDataAppendColInfo(pResBlock, &colInfo);
    if (code != TSDB_CODE_SUCCESS) {
      goto _error;
    }
  }

  code = blockDataEnsureCapacity(pResBlock, capacity);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }
  return pResBlock;

_error:
  terrno = code;
  blockDataDestroy(pResBlock);
  return NULL;
}

574
static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, STsdbReader** ppReader, int32_t capacity,
H
Haojun Liao 已提交
575
                                SSDataBlock* pResBlock, const char* idstr) {
H
Haojun Liao 已提交
576
  int32_t      code = 0;
577
  int8_t       level = 0;
H
Haojun Liao 已提交
578
  STsdbReader* pReader = (STsdbReader*)taosMemoryCalloc(1, sizeof(*pReader));
H
Hongze Cheng 已提交
579 580
  if (pReader == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
H
Haojun Liao 已提交
581
    goto _end;
H
Hongze Cheng 已提交
582 583
  }

C
Cary Xu 已提交
584
  if (VND_IS_TSMA(pVnode)) {
H
Haojun Liao 已提交
585
    tsdbDebug("vgId:%d, tsma is selected to query, %s", TD_VID(pVnode), idstr);
C
Cary Xu 已提交
586 587
  }

H
Haojun Liao 已提交
588
  initReaderStatus(&pReader->status);
589

L
Liu Jicong 已提交
590
  pReader->pTsdb = getTsdbByRetentions(pVnode, pCond->twindows.skey, pVnode->config.tsdbCfg.retentions, idstr, &level);
dengyihao's avatar
dengyihao 已提交
591 592
  pReader->suid = pCond->suid;
  pReader->order = pCond->order;
593
  pReader->capacity = capacity;
H
Haojun Liao 已提交
594
  pReader->pResBlock = pResBlock;
dengyihao's avatar
dengyihao 已提交
595 596
  pReader->idStr = (idstr != NULL) ? strdup(idstr) : NULL;
  pReader->verRange = getQueryVerRange(pVnode, pCond, level);
597
  pReader->type = pCond->type;
598
  pReader->window = updateQueryTimeWindow(pReader->pTsdb, &pCond->twindows);
H
Hongze Cheng 已提交
599
  pReader->blockInfoBuf.numPerBucket = 1000;  // 1000 tables per bucket
H
Hongze Cheng 已提交
600

H
Haojun Liao 已提交
601 602 603 604 605 606 607 608 609
  if (pReader->pResBlock == NULL) {
    pReader->freeBlock = true;
    pReader->pResBlock = createResBlock(pCond, pReader->capacity);
    if (pReader->pResBlock == NULL) {
      code = terrno;
      goto _end;
    }
  }

H
Haojun Liao 已提交
610 611 612 613 614 615
  if (pCond->numOfCols <= 0) {
    tsdbError("vgId:%d, invalid column number %d in query cond, %s", TD_VID(pVnode), pCond->numOfCols, idstr);
    code = TSDB_CODE_INVALID_PARA;
    goto _end;
  }

H
Haojun Liao 已提交
616
  // todo refactor.
617
  limitOutputBufferSize(pCond, &pReader->capacity);
618

619 620
  // allocate buffer in order to load data blocks from file
  SBlockLoadSuppInfo* pSup = &pReader->suppInfo;
621
  pSup->pColAgg = taosArrayInit(pCond->numOfCols, sizeof(SColumnDataAgg));
H
Haojun Liao 已提交
622
  if (pSup->pColAgg == NULL) {
623 624 625
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _end;
  }
H
Haojun Liao 已提交
626

627 628
  pSup->tsColAgg.colId = PRIMARYKEY_TIMESTAMP_COL_ID;

H
Hongze Cheng 已提交
629
  code = tBlockDataCreate(&pReader->status.fileBlockData);
H
Haojun Liao 已提交
630 631 632 633 634
  if (code != TSDB_CODE_SUCCESS) {
    terrno = code;
    goto _end;
  }

H
Haojun Liao 已提交
635
  setColumnIdSlotList(&pReader->suppInfo, pCond->colList, pCond->pSlotList, pCond->numOfCols);
636

H
Hongze Cheng 已提交
637 638
  *ppReader = pReader;
  return code;
H
Hongze Cheng 已提交
639

H
Haojun Liao 已提交
640
_end:
H
Haojun Liao 已提交
641
  tsdbReaderClose(pReader);
H
Hongze Cheng 已提交
642 643 644
  *ppReader = NULL;
  return code;
}
H
Hongze Cheng 已提交
645

H
Haojun Liao 已提交
646
/*
 * Read the block index (SBlockIdx list) of the current data file and append to pIndexList the entries that belong
 * to queried tables: matching pReader->suid and present in pReader->status.pTableMap. Each such table gets a
 * (possibly empty) pBlockList created on demand.
 *
 * Returns TSDB_CODE_SUCCESS, the error from tsdbReadBlockIdx, or TSDB_CODE_OUT_OF_MEMORY when an array cannot be
 * created or grown. Previously these allocation failures were ignored and led to NULL dereferences or silently
 * missing tables.
 */
static int32_t doLoadBlockIndex(STsdbReader* pReader, SDataFReader* pFileReader, SArray* pIndexList) {
  SArray* aBlockIdx = taosArrayInit(8, sizeof(SBlockIdx));
  if (aBlockIdx == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  int64_t st = taosGetTimestampUs();
  int32_t code = tsdbReadBlockIdx(pFileReader, aBlockIdx);
  if (code != TSDB_CODE_SUCCESS) {
    goto _end;
  }

  size_t num = taosArrayGetSize(aBlockIdx);
  if (num == 0) {
    goto _end;  // code is TSDB_CODE_SUCCESS here
  }

  int64_t et1 = taosGetTimestampUs();

  for (size_t i = 0; i < num; ++i) {
    SBlockIdx* pBlockIdx = (SBlockIdx*)taosArrayGet(aBlockIdx, i);

    // uid check
    if (pBlockIdx->suid != pReader->suid) {
      continue;
    }

    // this block belongs to a table that is not queried.
    void* p = taosHashGet(pReader->status.pTableMap, &pBlockIdx->uid, sizeof(uint64_t));
    if (p == NULL) {
      continue;
    }

    STableBlockScanInfo* pScanInfo = *(STableBlockScanInfo**)p;
    if (pScanInfo->pBlockList == NULL) {
      pScanInfo->pBlockList = taosArrayInit(4, sizeof(SBlockIndex));
      if (pScanInfo->pBlockList == NULL) {
        code = TSDB_CODE_OUT_OF_MEMORY;
        goto _end;
      }
    }

    if (taosArrayPush(pIndexList, pBlockIdx) == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto _end;
    }
  }

  int64_t et2 = taosGetTimestampUs();
  tsdbDebug("load block index for %d tables completed, elapsed time:%.2f ms, set blockIdx:%.2f ms, size:%.2f Kb %s",
            (int32_t)num, (et1 - st) / 1000.0, (et2 - et1) / 1000.0, num * sizeof(SBlockIdx) / 1024.0, pReader->idStr);

  pReader->cost.headFileLoadTime += (et1 - st) / 1000.0;

_end:
  taosArrayDestroy(aBlockIdx);
  return code;
}
H
Hongze Cheng 已提交
696

697
// Before handling a new data file, drop every table's per-file block map and block index list. The list objects
// themselves are kept for reuse.
static void cleanupTableScanInfo(SHashObj* pTableMap) {
  for (STableBlockScanInfo** ppInfo = taosHashIterate(pTableMap, NULL); ppInfo != NULL;
       ppInfo = taosHashIterate(pTableMap, ppInfo)) {
    STableBlockScanInfo* pInfo = *ppInfo;

    // reset the index in last block when handing a new file
    tMapDataClear(&pInfo->mapData);
    taosArrayClear(pInfo->pBlockList);
  }
}

711
/*
 * Load the data-block info (SDataBlk list) of every table listed in pIndexList from the current
 * file set. For each table, record the blocks that overlap both the query time window and the
 * query version range.
 *
 * pIndexList: SArray<SBlockIdx>, one entry per queried table that has data in this file.
 * pBlockNum:  in/out. numOfBlocks is incremented once per qualified block. numOfLastFiles is set
 *             to the number of stt files of the current file set.
 *
 * Returns TSDB_CODE_SUCCESS on success. On failure it returns one of:
 *   - the error from tsdbReadDataBlk;
 *   - TSDB_CODE_INVALID_PARA when a uid is missing from the table map;
 *   - TSDB_CODE_OUT_OF_MEMORY.
 */
static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, SBlockNumber* pBlockNum) {
  int32_t numOfQTable = 0;
  size_t  sizeInDisk = 0;
  size_t  numOfTables = taosArrayGetSize(pIndexList);

  int64_t st = taosGetTimestampUs();

  // drop the block info collected from the previous file of every queried table
  cleanupTableScanInfo(pReader->status.pTableMap);

  for (size_t i = 0; i < numOfTables; ++i) {
    SBlockIdx* pBlockIdx = taosArrayGet(pIndexList, i);

    STableBlockScanInfo** pp = taosHashGet(pReader->status.pTableMap, &pBlockIdx->uid, sizeof(int64_t));
    if (pp == NULL) {
      tsdbError("%p failed to locate the uid:%" PRIu64 " in query table uid list, %s", pReader,
                (uint64_t)pBlockIdx->uid, pReader->idStr);
      return TSDB_CODE_INVALID_PARA;
    }

    STableBlockScanInfo* pScanInfo = *pp;
    if (pScanInfo->pBlockList == NULL) {
      // the list is expected to be created when the block index is loaded; NULL presumably means
      // that allocation failed
      return TSDB_CODE_OUT_OF_MEMORY;
    }

    tMapDataReset(&pScanInfo->mapData);
    int32_t code = tsdbReadDataBlk(pReader->pFileReader, pBlockIdx, &pScanInfo->mapData);
    if (code != TSDB_CODE_SUCCESS) {
      tsdbError("%p failed to load data block info of uid:%" PRIu64 ", code:%s %s", pReader,
                (uint64_t)pBlockIdx->uid, tstrerror(code), pReader->idStr);
      return code;
    }

    taosArrayEnsureCap(pScanInfo->pBlockList, pScanInfo->mapData.nItem);
    sizeInDisk += pScanInfo->mapData.nData;

    SDataBlk block = {0};
    for (int32_t j = 0; j < pScanInfo->mapData.nItem; ++j) {
      tGetDataBlk(pScanInfo->mapData.pData + pScanInfo->mapData.aOffset[j], &block);

      // 1. time range check
      if (block.minKey.ts > pReader->window.ekey || block.maxKey.ts < pReader->window.skey) {
        continue;
      }

      // 2. version range check
      if (block.minVer > pReader->verRange.maxVer || block.maxVer < pReader->verRange.minVer) {
        continue;
      }

      SBlockIndex bIndex = {.ordinalIndex = j, .inFileOffset = block.aSubBlock->offset};
      bIndex.window = (STimeWindow){.skey = block.minKey.ts, .ekey = block.maxKey.ts};

      if (taosArrayPush(pScanInfo->pBlockList, &bIndex) == NULL) {
        tMapDataClear(&pScanInfo->mapData);
        return TSDB_CODE_OUT_OF_MEMORY;
      }

      pBlockNum->numOfBlocks += 1;
    }

    if (taosArrayGetSize(pScanInfo->pBlockList) > 0) {
      numOfQTable += 1;
    }
  }

  pBlockNum->numOfLastFiles = pReader->pFileReader->pSet->nSttF;
  int32_t total = pBlockNum->numOfLastFiles + pBlockNum->numOfBlocks;

  double el = (taosGetTimestampUs() - st) / 1000.0;
  tsdbDebug(
      "load block of %d tables completed, blocks:%d in %d tables, last-files:%d, block-info-size:%.2f Kb, elapsed "
      "time:%.2f ms %s",
      (int32_t)numOfTables, pBlockNum->numOfBlocks, numOfQTable, pBlockNum->numOfLastFiles, sizeInDisk / 1024.0, el,
      pReader->idStr);

  pReader->cost.numOfBlocks += total;
  pReader->cost.headFileLoadTime += el;

  return TSDB_CODE_SUCCESS;
}
H
Hongze Cheng 已提交
778

779
/*
 * Flag the current file block as fully consumed. lastKey is set one step beyond maxKey
 * in the scan direction: maxKey + 1 for ascending, maxKey - 1 for descending.
 */
static void setBlockAllDumped(SFileBlockDumpInfo* pDumpInfo, int64_t maxKey, int32_t order) {
  pDumpInfo->allDumped = true;
  pDumpInfo->lastKey = ASCENDING_TRAVERSE(order) ? (maxKey + 1) : (maxKey - 1);
}

785 786
/*
 * Append one column value to pColInfoData at rowIndex.
 * Fixed-length values are appended directly from the SColVal value union.
 * Variable-length values are first staged in pSup->buildBuf[colIndex] (length set via
 * varDataSetLen, payload written at varDataVal) and then appended from that buffer.
 */
static void doCopyColVal(SColumnInfoData* pColInfoData, int32_t rowIndex, int32_t colIndex, SColVal* pColVal,
                         SBlockLoadSuppInfo* pSup) {
  if (!IS_VAR_DATA_TYPE(pColVal->type)) {
    colDataAppend(pColInfoData, rowIndex, (const char*)&pColVal->value, !COL_VAL_IS_VALUE(pColVal));
    return;
  }

  if (!COL_VAL_IS_VALUE(pColVal)) {
    colDataAppendNULL(pColInfoData, rowIndex);
    return;
  }

  varDataSetLen(pSup->buildBuf[colIndex], pColVal->value.nData);
  ASSERT(pColVal->value.nData <= pColInfoData->info.bytes);

  // pData may be NULL when nData is 0, so only copy a non-empty payload
  if (pColVal->value.nData > 0) {
    memcpy(varDataVal(pSup->buildBuf[colIndex]), pColVal->value.pData, pColVal->value.nData);
  }

  colDataAppend(pColInfoData, rowIndex, pSup->buildBuf[colIndex], false);
}

804
/*
 * Return the block-list entry at the iterator's current index, or NULL when the block list is empty.
 */
static SFileDataBlockInfo* getCurrentBlockInfo(SDataBlockIter* pBlockIter) {
  size_t numOfItems = taosArrayGetSize(pBlockIter->blockList);
  if (numOfItems > 0) {
    return taosArrayGet(pBlockIter->blockList, pBlockIter->index);
  }

  // an empty list must agree with the iterator's block count
  ASSERT(pBlockIter->numOfBlocks == numOfItems);
  return NULL;
}

H
Hongze Cheng 已提交
815
// The iterator keeps the SDataBlk of its current position in `block`; hand out a pointer to it.
static SDataBlk* getCurrentBlock(SDataBlockIter* pBlockIter) {
  SDataBlk* pCurrent = &pBlockIter->block;
  return pCurrent;
}
816

H
Haojun Liao 已提交
817 818 819 820 821 822
/*
 * Binary search in keyList[0..num-1] (assumed sorted ascending), anchored at `pos`.
 *
 * order == TSDB_ORDER_ASC:
 *   Searches keyList[pos..num-1] and returns the index of the last key that is <= `key`.
 *   If keys equal to `key` exist, the index of one of them is returned.
 *   Returns -1 when key < keyList[pos].
 * otherwise (descending):
 *   Searches keyList[0..pos] and returns the index of the first key that is >= `key`.
 *   If keys equal to `key` exist, the index of one of them is returned.
 *   Returns -1 when key > keyList[pos].
 */
static int doBinarySearchKey(TSKEY* keyList, int num, int pos, TSKEY key, int order) {
  assert(pos >= 0 && pos < num);
  assert(num > 0);

  if (order == TSDB_ORDER_ASC) {
    if (key < keyList[pos]) {
      return -1;
    }

    // invariant once the loop moves: keyList[lo] < key < keyList[hi]
    int lo = pos;
    int hi = num - 1;
    for (;;) {
      if (key >= keyList[hi]) return hi;
      if (key <= keyList[lo]) return lo;
      if (hi - lo <= 1) return lo;

      int mid = lo + (hi - lo + 1) / 2;
      if (keyList[mid] == key) {
        return mid;
      }

      if (keyList[mid] > key) {
        hi = mid;
      } else {
        lo = mid;
      }
    }
  }

  if (key > keyList[pos]) {
    return -1;
  }

  // invariant once the loop moves: keyList[lo] < key < keyList[hi]
  int lo = 0;
  int hi = pos;
  for (;;) {
    if (key <= keyList[lo]) return lo;
    if (key >= keyList[hi]) return hi;
    if (hi - lo <= 1) return hi;

    int mid = hi - (hi - lo + 1) / 2;
    if (keyList[mid] == key) {
      return mid;
    }

    if (keyList[mid] < key) {
      lo = mid;
    } else {
      hi = mid;
    }
  }
}

H
Haojun Liao 已提交
867
/*
 * Return the index of the last row (in scan order) of the block that is still inside the query window,
 * searching from `pos`, or -1 when no such row exists.
 * Ascending scans bound by window.ekey; descending scans bound by window.skey.
 */
static int32_t getEndPosInDataBlock(STsdbReader* pReader, SBlockData* pBlockData, SDataBlk* pBlock, int32_t pos) {
  if (ASCENDING_TRAVERSE(pReader->order)) {
    if (pReader->window.ekey >= pBlock->maxKey.ts) {
      return pBlock->nRow - 1;  // the whole tail of the block is inside the window
    }
    return doBinarySearchKey(pBlockData->aTSKEY, pBlock->nRow, pos, pReader->window.ekey, pReader->order);
  }

  if (pReader->window.skey <= pBlock->minKey.ts) {
    return 0;  // the whole head of the block is inside the window
  }
  return doBinarySearchKey(pBlockData->aTSKEY, pBlock->nRow, pos, pReader->window.skey, pReader->order);
}

H
Haojun Liao 已提交
884
/*
 * Copy `dumpedRows` primary timestamps into pColData, starting at pDumpInfo->rowIndex in scan order.
 * For a descending scan, rowIndex is the highest row of the range. The range is copied in one memcpy
 * and then reversed in place, so output row 0 holds the timestamp at rowIndex.
 */
static void copyPrimaryTsCol(const SBlockData* pBlockData, SFileBlockDumpInfo* pDumpInfo, SColumnInfoData* pColData,
                             int32_t dumpedRows, bool asc) {
  int32_t firstRow = asc ? pDumpInfo->rowIndex : (pDumpInfo->rowIndex - dumpedRows + 1);
  memcpy(pColData->pData, &pBlockData->aTSKEY[firstRow], dumpedRows * sizeof(int64_t));

  if (asc) {
    return;
  }

  int64_t* pts = (int64_t*)pColData->pData;
  for (int32_t lo = 0, hi = dumpedRows - 1; lo < hi; ++lo, --hi) {
    int64_t tmp = pts[lo];
    pts[lo] = pts[hi];
    pts[hi] = tmp;
  }
}

H
Haojun Liao 已提交
904 905 906 907 908 909 910 911 912 913 914 915 916
// Reverse, in place, the first `num` elements of pData, each `width` bytes wide.
// Only widths of 1, 2, 4 and 8 bytes are handled; any other width leaves the buffer untouched.
static void reverseFixedWidthItems(void* pData, int32_t num, int32_t width) {
  switch (width) {
    case sizeof(int8_t): {
      int8_t* p = (int8_t*)pData;
      for (int32_t lo = 0, hi = num - 1; lo < hi; ++lo, --hi) {
        int8_t t = p[lo];
        p[lo] = p[hi];
        p[hi] = t;
      }
      break;
    }
    case sizeof(int16_t): {
      int16_t* p = (int16_t*)pData;
      for (int32_t lo = 0, hi = num - 1; lo < hi; ++lo, --hi) {
        int16_t t = p[lo];
        p[lo] = p[hi];
        p[hi] = t;
      }
      break;
    }
    case sizeof(int32_t): {
      int32_t* p = (int32_t*)pData;
      for (int32_t lo = 0, hi = num - 1; lo < hi; ++lo, --hi) {
        int32_t t = p[lo];
        p[lo] = p[hi];
        p[hi] = t;
      }
      break;
    }
    case sizeof(int64_t): {
      int64_t* p = (int64_t*)pData;
      for (int32_t lo = 0, hi = num - 1; lo < hi; ++lo, --hi) {
        int64_t t = p[lo];
        p[lo] = p[hi];
        p[hi] = t;
      }
      break;
    }
    default:
      break;
  }
}

/*
 * Fast path for fixed-width numeric columns (including bool and timestamp). It copies `dumpedRows`
 * values starting at pDumpInfo->rowIndex in scan order:
 *   1. the source range is copied in one memcpy;
 *   2. for descending scans the copy is reversed so output row 0 holds source row rowIndex;
 *   3. when the column is not HAS_VALUE-only, NONE/NULL rows are marked in the null bitmap.
 */
static void copyNumericCols(const SColData* pData, SFileBlockDumpInfo* pDumpInfo, SColumnInfoData* pColData,
                            int32_t dumpedRows, bool asc) {
  int32_t bytes = tDataTypes[pData->type].bytes;
  int32_t step = asc ? 1 : -1;

  // in a descending scan rowIndex is the highest row of the range, so copy from the lowest one
  int32_t  startIndex = asc ? pDumpInfo->rowIndex : (pDumpInfo->rowIndex - dumpedRows + 1);
  uint8_t* p = pData->pData + bytes * startIndex;

  // 1. copy data in a batch
  memcpy(pColData->pData, p, dumpedRows * bytes);

  // 2. reverse the copied values in case of a descending scan
  if (!asc) {
    int32_t width = 0;
    switch (pColData->info.type) {
      case TSDB_DATA_TYPE_TIMESTAMP:
      case TSDB_DATA_TYPE_DOUBLE:
      case TSDB_DATA_TYPE_BIGINT:
      case TSDB_DATA_TYPE_UBIGINT:
        width = sizeof(int64_t);
        break;
      case TSDB_DATA_TYPE_BOOL:
      case TSDB_DATA_TYPE_TINYINT:
      case TSDB_DATA_TYPE_UTINYINT:
        width = sizeof(int8_t);
        break;
      case TSDB_DATA_TYPE_SMALLINT:
      case TSDB_DATA_TYPE_USMALLINT:
        width = sizeof(int16_t);
        break;
      case TSDB_DATA_TYPE_FLOAT:
      case TSDB_DATA_TYPE_INT:
      case TSDB_DATA_TYPE_UINT:
        width = sizeof(int32_t);
        break;
      default:
        width = 0;  // not a fixed-width numeric type: leave the copied data as is
        break;
    }
    reverseFixedWidthItems(pColData->pData, dumpedRows, width);
  }

  // 3. if NONE/NULL values may exist, check the items one by one
  if (pData->flag != HAS_VALUE) {
    int32_t rowIndex = 0;
    for (int32_t j = pDumpInfo->rowIndex; rowIndex < dumpedRows; j += step, rowIndex++) {
      uint8_t v = tColDataGetBitValue(pData, j);
      if (v == 0 || v == 1) {  // 0/1 are treated as NONE/NULL (anything else is a value)
        colDataSetNull_f(pColData->nullbitmap, rowIndex);
        pColData->hasNull = true;
      }
    }
  }
}

995
/*
 * Copy the rows of the current file block that lie inside the query window into pReader->pResBlock.
 * Copying starts at pDumpInfo->rowIndex, moves in scan order, and stops after at most
 * pReader->capacity rows.
 * Output columns are matched to file-block columns by column id. Output columns that the block does
 * not contain are filled with NULL.
 * Afterwards pDumpInfo->rowIndex points past the copied rows. The block is flagged as fully dumped
 * when its rows are exhausted or the next row falls outside the query window.
 *
 * pBlockScanInfo is not used.
 * Returns TSDB_CODE_INVALID_PARA when the start row cannot be located, TSDB_CODE_SUCCESS otherwise.
 */
static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo) {
  SReaderStatus*      pStatus = &pReader->status;
  SDataBlockIter*     pBlockIter = &pStatus->blockIter;
  SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo;
  SFileBlockDumpInfo* pDumpInfo = &pStatus->fBlockDumpInfo;
  SBlockData*         pBlockData = &pStatus->fileBlockData;
  SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(pBlockIter);
  SDataBlk*           pBlock = getCurrentBlock(pBlockIter);
  SSDataBlock*        pResBlock = pReader->pResBlock;
  int32_t             numOfOutputCols = pSupInfo->numOfCols;

  SColVal cv = {0};
  int64_t st = taosGetTimestampUs();
  bool    asc = ASCENDING_TRAVERSE(pReader->order);
  int32_t step = asc ? 1 : -1;

  // 1. If the cursor is on the first row (in scan order) but the block begins outside the query
  //    window, move the cursor to the first row inside the window: the first key >= skey (asc)
  //    or the last key <= ekey (desc).
  bool atFirstRow = asc ? (pDumpInfo->rowIndex == 0) : (pDumpInfo->rowIndex == pBlock->nRow - 1);
  bool startsInWindow =
      asc ? (pReader->window.skey <= pBlock->minKey.ts) : (pReader->window.ekey >= pBlock->maxKey.ts);
  if (atFirstRow && !startsInWindow) {
    int32_t searchFrom = asc ? pBlock->nRow - 1 : 0;
    int32_t searchOrder = asc ? TSDB_ORDER_DESC : TSDB_ORDER_ASC;
    int64_t boundary = asc ? pReader->window.skey : pReader->window.ekey;
    pDumpInfo->rowIndex = doBinarySearchKey(pBlockData->aTSKEY, pBlock->nRow, searchFrom, boundary, searchOrder);

    if (pDumpInfo->rowIndex < 0) {
      tsdbError(
          "%p failed to locate the start position in current block, global index:%d, table index:%d, brange:%" PRId64
          "-%" PRId64 ", minVer:%" PRId64 ", maxVer:%" PRId64 " %s",
          pReader, pBlockIter->index, pBlockInfo->tbBlockIdx, pBlock->minKey.ts, pBlock->maxKey.ts, pBlock->minVer,
          pBlock->maxVer, pReader->idStr);
      return TSDB_CODE_INVALID_PARA;
    }
  }

  // 2. Find the last row (in scan order) inside the query window.
  int32_t endIndex = getEndPosInDataBlock(pReader, pBlockData, pBlock, pDumpInfo->rowIndex);
  if (endIndex == -1) {
    setBlockAllDumped(pDumpInfo, pReader->window.ekey, pReader->order);
    return TSDB_CODE_SUCCESS;
  }

  // rows from the cursor to endIndex (both inclusive), bounded by the output capacity
  int32_t dumpedRows = asc ? (endIndex - pDumpInfo->rowIndex + 1) : (pDumpInfo->rowIndex - endIndex + 1);
  if (dumpedRows > pReader->capacity) {
    dumpedRows = pReader->capacity;
  }

  // 3. Only the first output column is checked for the primary timestamp column.
  int32_t          outCol = 0;
  SColumnInfoData* pColData = taosArrayGet(pResBlock->pDataBlock, pSupInfo->slotId[outCol]);
  if (pSupInfo->colId[outCol] == PRIMARYKEY_TIMESTAMP_COL_ID) {
    copyPrimaryTsCol(pBlockData, pDumpInfo, pColData, dumpedRows, asc);
    outCol += 1;
  }

  // 4. Walk output columns and file-block columns together, matching them by column id.
  int32_t numOfFileCols = pBlockData->nColData;
  int32_t fileCol = 0;
  while (outCol < numOfOutputCols && fileCol < numOfFileCols) {
    SColData* pData = tBlockDataGetColDataByIdx(pBlockData, fileCol);

    if (pData->cid < pSupInfo->colId[outCol]) {  // this file column is not requested
      fileCol += 1;
      continue;
    }

    pColData = taosArrayGet(pResBlock->pDataBlock, pSupInfo->slotId[outCol]);

    if (pData->cid > pSupInfo->colId[outCol]) {  // requested column is absent from the block
      colDataAppendNNULL(pColData, 0, dumpedRows);
      outCol += 1;
      continue;
    }

    if (pData->flag == HAS_NONE || pData->flag == HAS_NULL || pData->flag == (HAS_NULL | HAS_NONE)) {
      colDataAppendNNULL(pColData, 0, dumpedRows);
    } else if (IS_MATHABLE_TYPE(pColData->info.type)) {
      copyNumericCols(pData, pDumpInfo, pColData, dumpedRows, asc);
    } else {  // varchar/nchar type
      int32_t srcRow = pDumpInfo->rowIndex;
      for (int32_t dstRow = 0; dstRow < dumpedRows; ++dstRow, srcRow += step) {
        tColDataGetValue(pData, srcRow, &cv);
        doCopyColVal(pColData, dstRow, outCol, &cv, pSupInfo);
      }
    }

    fileCol += 1;
    outCol += 1;
  }

  // 5. Output columns left over once the file columns run out are filled with NULL.
  for (; outCol < numOfOutputCols; ++outCol) {
    pColData = taosArrayGet(pResBlock->pDataBlock, pSupInfo->slotId[outCol]);
    colDataAppendNNULL(pColData, 0, dumpedRows);
  }

  pResBlock->info.dataLoad = 1;
  pResBlock->info.rows = dumpedRows;
  pDumpInfo->rowIndex += step * dumpedRows;

  // 6. Decide whether the current block is fully handled.
  bool cursorInBlock = pDumpInfo->rowIndex >= 0 && pDumpInfo->rowIndex < pBlock->nRow;
  if (!cursorInBlock) {
    setBlockAllDumped(pDumpInfo, asc ? pBlock->maxKey.ts : pBlock->minKey.ts, pReader->order);
  } else {
    int64_t nextTs = pBlockData->aTSKEY[pDumpInfo->rowIndex];
    if (outOfTimeWindow(nextTs, &pReader->window)) {  // the remaining rows are outside the query window
      setBlockAllDumped(pDumpInfo, nextTs, pReader->order);
    }
  }

  double elapsedTime = (taosGetTimestampUs() - st) / 1000.0;
  pReader->cost.blockLoadTime += elapsedTime;

  int32_t unDumpedRows = asc ? pBlock->nRow - pDumpInfo->rowIndex : pDumpInfo->rowIndex + 1;
  tsdbDebug("%p copy file block to sdatablock, global index:%d, table index:%d, brange:%" PRId64 "-%" PRId64
            ", rows:%d, remain:%d, minVer:%" PRId64 ", maxVer:%" PRId64 ", uid:%" PRIu64 " elapsed time:%.2f ms, %s",
            pReader, pBlockIter->index, pBlockInfo->tbBlockIdx, pBlock->minKey.ts, pBlock->maxKey.ts, dumpedRows,
            unDumpedRows, pBlock->minVer, pBlock->maxVer, pBlockInfo->uid, elapsedTime, pReader->idStr);

  return TSDB_CODE_SUCCESS;
}

1123 1124
static int32_t doLoadFileBlockData(STsdbReader* pReader, SDataBlockIter* pBlockIter, SBlockData* pBlockData,
                                   uint64_t uid) {
1125 1126
  int64_t st = taosGetTimestampUs();

1127 1128
  tBlockDataReset(pBlockData);
  TABLEID tid = {.suid = pReader->suid, .uid = uid};
1129
  int32_t code =
H
Haojun Liao 已提交
1130
      tBlockDataInit(pBlockData, &tid, pReader->pSchema, &pReader->suppInfo.colId[1], pReader->suppInfo.numOfCols - 1);
1131 1132 1133 1134
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

1135
  SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(pBlockIter);
1136
  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
1137

H
Hongze Cheng 已提交
1138
  SDataBlk* pBlock = getCurrentBlock(pBlockIter);
1139
  code = tsdbReadDataBlock(pReader->pFileReader, pBlock, pBlockData);
1140 1141
  if (code != TSDB_CODE_SUCCESS) {
    tsdbError("%p error occurs in loading file block, global index:%d, table index:%d, brange:%" PRId64 "-%" PRId64
H
Hongze Cheng 已提交
1142
              ", rows:%d, code:%s %s",
1143
              pReader, pBlockIter->index, pBlockInfo->tbBlockIdx, pBlock->minKey.ts, pBlock->maxKey.ts, pBlock->nRow,
1144 1145 1146
              tstrerror(code), pReader->idStr);
    return code;
  }
1147

1148
  double elapsedTime = (taosGetTimestampUs() - st) / 1000.0;
1149

1150
  tsdbDebug("%p load file block into buffer, global index:%d, index in table block list:%d, brange:%" PRId64 "-%" PRId64
H
Hongze Cheng 已提交
1151
            ", rows:%d, minVer:%" PRId64 ", maxVer:%" PRId64 ", elapsed time:%.2f ms, %s",
1152 1153
            pReader, pBlockIter->index, pBlockInfo->tbBlockIdx, pBlock->minKey.ts, pBlock->maxKey.ts, pBlock->nRow,
            pBlock->minVer, pBlock->maxVer, elapsedTime, pReader->idStr);
1154 1155 1156

  pReader->cost.blockLoadTime += elapsedTime;
  pDumpInfo->allDumped = false;
1157

H
Haojun Liao 已提交
1158
  return TSDB_CODE_SUCCESS;
H
Haojun Liao 已提交
1159
}
H
Hongze Cheng 已提交
1160

H
Haojun Liao 已提交
1161 1162 1163
/*
 * Release every buffer owned by pSup:
 *   - the per-table counters;
 *   - each per-table SBlockOrderWrapper array;
 *   - the array of those arrays.
 * Freed pointers are set to NULL and numOfTables is reset to 0. The function is therefore safe to
 * call again, or on a supporter whose arrays were never (or only partially) allocated.
 */
static void cleanupBlockOrderSupporter(SBlockOrderSupporter* pSup) {
  taosMemoryFreeClear(pSup->numOfBlocksPerTable);
  taosMemoryFreeClear(pSup->indexPerTable);

  if (pSup->pDataBlockInfo != NULL) {
    for (int32_t i = 0; i < pSup->numOfTables; ++i) {
      taosMemoryFreeClear(pSup->pDataBlockInfo[i]);
    }
    taosMemoryFreeClear(pSup->pDataBlockInfo);
  }

  pSup->numOfTables = 0;
}
H
Hongze Cheng 已提交
1172

H
Haojun Liao 已提交
1173 1174
/*
 * Allocate zero-filled per-table arrays for numOfTables tables:
 *   - block counters;
 *   - cursor indexes;
 *   - block-wrapper pointer slots.
 * If any allocation fails, everything is released and TSDB_CODE_OUT_OF_MEMORY is returned.
 */
static int32_t initBlockOrderSupporter(SBlockOrderSupporter* pSup, int32_t numOfTables) {
  pSup->numOfBlocksPerTable = taosMemoryCalloc(numOfTables, sizeof(int32_t));
  pSup->indexPerTable = taosMemoryCalloc(numOfTables, sizeof(int32_t));
  pSup->pDataBlockInfo = taosMemoryCalloc(numOfTables, POINTER_BYTES);

  bool allocated = (pSup->numOfBlocksPerTable != NULL) && (pSup->indexPerTable != NULL) &&
                   (pSup->pDataBlockInfo != NULL);
  if (allocated) {
    return TSDB_CODE_SUCCESS;
  }

  cleanupBlockOrderSupporter(pSup);
  return TSDB_CODE_OUT_OF_MEMORY;
}
H
Hongze Cheng 已提交
1185

H
Haojun Liao 已提交
1186
static int32_t fileDataBlockOrderCompar(const void* pLeft, const void* pRight, void* param) {
1187
  int32_t leftIndex = *(int32_t*)pLeft;
H
Haojun Liao 已提交
1188
  int32_t rightIndex = *(int32_t*)pRight;
H
Hongze Cheng 已提交
1189

H
Haojun Liao 已提交
1190
  SBlockOrderSupporter* pSupporter = (SBlockOrderSupporter*)param;
H
Hongze Cheng 已提交
1191

H
Haojun Liao 已提交
1192 1193
  int32_t leftTableBlockIndex = pSupporter->indexPerTable[leftIndex];
  int32_t rightTableBlockIndex = pSupporter->indexPerTable[rightIndex];
H
Hongze Cheng 已提交
1194

H
Haojun Liao 已提交
1195 1196 1197 1198 1199 1200 1201
  if (leftTableBlockIndex > pSupporter->numOfBlocksPerTable[leftIndex]) {
    /* left block is empty */
    return 1;
  } else if (rightTableBlockIndex > pSupporter->numOfBlocksPerTable[rightIndex]) {
    /* right block is empty */
    return -1;
  }
H
Hongze Cheng 已提交
1202

1203
  SBlockOrderWrapper* pLeftBlock = &pSupporter->pDataBlockInfo[leftIndex][leftTableBlockIndex];
H
Haojun Liao 已提交
1204
  SBlockOrderWrapper* pRightBlock = &pSupporter->pDataBlockInfo[rightIndex][rightTableBlockIndex];
H
Hongze Cheng 已提交
1205

1206 1207 1208
  return pLeftBlock->offset > pRightBlock->offset ? 1 : -1;
}

H
Haojun Liao 已提交
1209
/*
 * Decode into pBlockIter->block the SDataBlk of the block at the iterator's current position.
 * Nothing is decoded when there is no current block.
 * Returns TSDB_CODE_INVALID_PARA when the block's table or its block index entry cannot be found,
 * TSDB_CODE_SUCCESS otherwise.
 */
static int32_t doSetCurrentBlock(SDataBlockIter* pBlockIter, const char* idStr) {
  SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(pBlockIter);
  if (pBlockInfo == NULL) {
    return TSDB_CODE_SUCCESS;
  }

  STableBlockScanInfo** pScanInfo = taosHashGet(pBlockIter->pTableMap, &pBlockInfo->uid, sizeof(pBlockInfo->uid));
  if (pScanInfo == NULL) {
    tsdbError("failed to locate the uid:%" PRIu64 " in query table uid list, %s", pBlockInfo->uid, idStr);
    return TSDB_CODE_INVALID_PARA;
  }

  SBlockIndex* pIndex = taosArrayGet((*pScanInfo)->pBlockList, pBlockInfo->tbBlockIdx);
  if (pIndex == NULL) {
    tsdbError("failed to locate the block index:%d of uid:%" PRIu64 ", %s", (int32_t)pBlockInfo->tbBlockIdx,
              pBlockInfo->uid, idStr);
    return TSDB_CODE_INVALID_PARA;
  }

  tMapDataGetItemByIdx(&(*pScanInfo)->mapData, pIndex->ordinalIndex, &pBlockIter->block, tGetDataBlk);
  return TSDB_CODE_SUCCESS;
}
H
Hongze Cheng 已提交
1228

1229
/*
 * Build the global block access list of the current file.
 * All qualified blocks of all tables are merged in ascending file-offset order, so blocks are read
 * sequentially from disk. With a single table the blocks are taken in list order, unsorted.
 * The iterator is then positioned at the first (asc) or last (desc) entry, and that block is decoded.
 *
 * numOfBlocks must equal the total number of entries in the tables' pBlockList arrays.
 * Returns TSDB_CODE_INVALID_PARA on a mismatch, TSDB_CODE_OUT_OF_MEMORY on allocation failure,
 * or the error from tMergeTreeCreate/doSetCurrentBlock.
 */
static int32_t initBlockIterator(STsdbReader* pReader, SDataBlockIter* pBlockIter, int32_t numOfBlocks) {
  bool asc = ASCENDING_TRAVERSE(pReader->order);

  SBlockOrderSupporter sup = {0};
  pBlockIter->numOfBlocks = numOfBlocks;
  taosArrayClear(pBlockIter->blockList);
  pBlockIter->pTableMap = pReader->status.pTableMap;

  // access data blocks according to the offset of each block in asc/desc order.
  int32_t numOfTables = (int32_t)taosHashGetSize(pReader->status.pTableMap);

  int64_t st = taosGetTimestampUs();
  int32_t code = initBlockOrderSupporter(&sup, numOfTables);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  int32_t cnt = 0;
  void*   ptr = NULL;
  while ((ptr = taosHashIterate(pReader->status.pTableMap, ptr)) != NULL) {
    STableBlockScanInfo* pTableScanInfo = *(STableBlockScanInfo**)ptr;
    if (pTableScanInfo->pBlockList == NULL || taosArrayGetSize(pTableScanInfo->pBlockList) == 0) {
      continue;
    }

    size_t num = taosArrayGetSize(pTableScanInfo->pBlockList);
    sup.numOfBlocksPerTable[sup.numOfTables] = num;

    SBlockOrderWrapper* pWrappers = taosMemoryMalloc(sizeof(SBlockOrderWrapper) * num);
    if (pWrappers == NULL) {
      taosHashCancelIterate(pReader->status.pTableMap, ptr);  // release the iterator before leaving early
      cleanupBlockOrderSupporter(&sup);
      return TSDB_CODE_OUT_OF_MEMORY;
    }

    sup.pDataBlockInfo[sup.numOfTables] = pWrappers;
    for (int32_t k = 0; k < num; ++k) {
      SBlockIndex* pIndex = taosArrayGet(pTableScanInfo->pBlockList, k);
      pWrappers[k] = (SBlockOrderWrapper){.uid = pTableScanInfo->uid, .offset = pIndex->inFileOffset};
      cnt++;
    }

    sup.numOfTables += 1;
  }

  // every later step indexes the collected blocks with numOfBlocks, so both counts must agree
  if (numOfBlocks != cnt) {
    tsdbError("%p inconsistent number of data blocks, expected:%d, collected:%d %s", pReader, numOfBlocks, cnt,
              pReader->idStr);
    cleanupBlockOrderSupporter(&sup);
    return TSDB_CODE_INVALID_PARA;
  }

  // since there is only one table qualified, blocks are not sorted
  if (sup.numOfTables == 1) {
    for (int32_t i = 0; i < numOfBlocks; ++i) {
      SFileDataBlockInfo blockInfo = {.uid = sup.pDataBlockInfo[0][i].uid, .tbBlockIdx = i};
      if (taosArrayPush(pBlockIter->blockList, &blockInfo) == NULL) {
        cleanupBlockOrderSupporter(&sup);
        return TSDB_CODE_OUT_OF_MEMORY;
      }
    }

    int64_t et = taosGetTimestampUs();
    tsdbDebug("%p create blocks info struct completed for one table, %d blocks not sorted, elapsed time:%.2f ms %s",
              pReader, numOfBlocks, (et - st) / 1000.0, pReader->idStr);

    pBlockIter->index = asc ? 0 : (numOfBlocks - 1);
    cleanupBlockOrderSupporter(&sup);
    return doSetCurrentBlock(pBlockIter, pReader->idStr);
  }

  tsdbDebug("%p create data blocks info struct completed, %d blocks in %d tables %s", pReader, cnt, sup.numOfTables,
            pReader->idStr);

  SMultiwayMergeTreeInfo* pTree = NULL;

  // keep the full int32_t result: narrowing an error code can turn it into TSDB_CODE_SUCCESS
  code = tMergeTreeCreate(&pTree, sup.numOfTables, &sup, fileDataBlockOrderCompar);
  if (code != TSDB_CODE_SUCCESS) {
    cleanupBlockOrderSupporter(&sup);
    return code;
  }

  int32_t numOfTotal = 0;
  while (numOfTotal < cnt) {
    int32_t pos = tMergeTreeGetChosenIndex(pTree);
    int32_t index = sup.indexPerTable[pos]++;

    SFileDataBlockInfo blockInfo = {.uid = sup.pDataBlockInfo[pos][index].uid, .tbBlockIdx = index};
    if (taosArrayPush(pBlockIter->blockList, &blockInfo) == NULL) {
      cleanupBlockOrderSupporter(&sup);
      taosMemoryFree(pTree);
      return TSDB_CODE_OUT_OF_MEMORY;
    }

    // set data block index overflow, in order to disable the offset comparator
    if (sup.indexPerTable[pos] >= sup.numOfBlocksPerTable[pos]) {
      sup.indexPerTable[pos] = sup.numOfBlocksPerTable[pos] + 1;
    }

    numOfTotal += 1;
    tMergeTreeAdjust(pTree, tMergeTreeGetAdjustIndex(pTree));
  }

  int64_t et = taosGetTimestampUs();
  tsdbDebug("%p %d data blocks access order completed, elapsed time:%.2f ms %s", pReader, numOfBlocks,
            (et - st) / 1000.0, pReader->idStr);
  cleanupBlockOrderSupporter(&sup);
  taosMemoryFree(pTree);

  pBlockIter->index = asc ? 0 : (numOfBlocks - 1);
  return doSetCurrentBlock(pBlockIter, pReader->idStr);
}
H
Hongze Cheng 已提交
1341

H
Haojun Liao 已提交
1342
/*
 * Advance the iterator one block in scan order and decode the new current block.
 * Returns false, leaving the iterator unchanged, when it is already on the last block in scan order.
 */
static bool blockIteratorNext(SDataBlockIter* pBlockIter, const char* idStr) {
  bool asc = ASCENDING_TRAVERSE(pBlockIter->order);
  bool exhausted = asc ? (pBlockIter->index >= pBlockIter->numOfBlocks - 1) : (pBlockIter->index <= 0);
  if (exhausted) {
    return false;
  }

  pBlockIter->index += asc ? 1 : -1;
  doSetCurrentBlock(pBlockIter, idStr);
  return true;
}

1356 1357 1358
/**
 * This is an two rectangles overlap cases.
 */
H
Hongze Cheng 已提交
1359
static int32_t dataBlockPartiallyRequired(STimeWindow* pWindow, SVersionRange* pVerRange, SDataBlk* pBlock) {
1360 1361
  return (pWindow->ekey < pBlock->maxKey.ts && pWindow->ekey >= pBlock->minKey.ts) ||
         (pWindow->skey > pBlock->minKey.ts && pWindow->skey <= pBlock->maxKey.ts) ||
H
Hongze Cheng 已提交
1362 1363
         (pVerRange->minVer > pBlock->minVer && pVerRange->minVer <= pBlock->maxVer) ||
         (pVerRange->maxVer < pBlock->maxVer && pVerRange->maxVer >= pBlock->minVer);
H
Haojun Liao 已提交
1364
}
H
Hongze Cheng 已提交
1365

1366
/*
 * Look up the block of the same table that follows pBlockInfo in scan order.
 * On success, *nextIndex receives its position in pTableBlockScanInfo->pBlockList, *pBlockIndex
 * receives a copy of its SBlockIndex, and true is returned.
 * Returns false, leaving both outputs untouched, when there is no such block.
 */
static bool getNeighborBlockOfSameTable(SFileDataBlockInfo* pBlockInfo, STableBlockScanInfo* pTableBlockScanInfo,
                                        int32_t* nextIndex, int32_t order, SBlockIndex* pBlockIndex) {
  bool asc = ASCENDING_TRAVERSE(order);

  // use a signed count: "size - 1" on an unsigned size_t wraps around for an empty list
  int32_t numOfBlocks = (int32_t)taosArrayGetSize(pTableBlockScanInfo->pBlockList);

  if (asc && pBlockInfo->tbBlockIdx >= numOfBlocks - 1) {
    return false;
  }

  if (!asc && pBlockInfo->tbBlockIdx <= 0) {
    return false;
  }

  int32_t step = asc ? 1 : -1;
  int32_t neighbor = pBlockInfo->tbBlockIdx + step;

  SBlockIndex* pNeighbor = taosArrayGet(pTableBlockScanInfo->pBlockList, neighbor);
  if (pNeighbor == NULL) {
    return false;
  }

  *nextIndex = neighbor;
  *pBlockIndex = *pNeighbor;
  return true;
}

/*
 * Scan the global block list from the iterator's current index in scan order for the entry with the
 * same uid and tbBlockIdx as pFBlockInfo. Returns its index, or -1 when it is not found.
 */
static int32_t findFileBlockInfoIndex(SDataBlockIter* pBlockIter, SFileDataBlockInfo* pFBlockInfo) {
  int32_t step = ASCENDING_TRAVERSE(pBlockIter->order) ? 1 : -1;

  for (int32_t idx = pBlockIter->index; idx >= 0 && idx < pBlockIter->numOfBlocks; idx += step) {
    SFileDataBlockInfo* pCandidate = taosArrayGet(pBlockIter->blockList, idx);
    bool sameBlock = (pCandidate->uid == pFBlockInfo->uid) && (pCandidate->tbBlockIdx == pFBlockInfo->tbBlockIdx);
    if (sameBlock) {
      return idx;
    }
  }

  return -1;
}

1400
/*
 * Advances the block iterator by `step` and makes the block currently stored at position `index` of
 * pBlockIter->blockList the one at the new iterator position. If the two positions differ, the entry is
 * removed from `index` and re-inserted at the new position. Finally doSetCurrentBlock() is called.
 *
 * Returns -1 when `index` is outside [0, numOfBlocks) (the iterator is then left untouched),
 * TSDB_CODE_SUCCESS otherwise.
 */
static int32_t setFileBlockActiveInBlockIter(SDataBlockIter* pBlockIter, int32_t index, int32_t step) {
  bool outOfRange = (index < 0) || (index >= pBlockIter->numOfBlocks);
  if (outOfRange) {
    return -1;
  }

  // keep a value copy: the remove/insert below moves array elements around
  SFileDataBlockInfo target = *(SFileDataBlockInfo*)taosArrayGet(pBlockIter->blockList, index);

  int32_t dest = pBlockIter->index + step;
  pBlockIter->index = dest;

  if (dest != index) {
    taosArrayRemove(pBlockIter->blockList, index);
    taosArrayInsert(pBlockIter->blockList, dest, &target);

    SFileDataBlockInfo* pMoved = taosArrayGet(pBlockIter->blockList, dest);
    ASSERT(pMoved->uid == target.uid && pMoved->tbBlockIdx == target.tbBlockIdx);
  }

  doSetCurrentBlock(pBlockIter, "");
  return TSDB_CODE_SUCCESS;
}

H
Haojun Liao 已提交
1420
// todo: this attribute could be acquired during extractin the global ordered block list.
1421
static bool overlapWithNeighborBlock(SDataBlk* pBlock, SBlockIndex* pNeighborBlockIndex, int32_t order) {
1422 1423
  // it is the last block in current file, no chance to overlap with neighbor blocks.
  if (ASCENDING_TRAVERSE(order)) {
1424
    return pBlock->maxKey.ts == pNeighborBlockIndex->window.skey;
1425
  } else {
1426
    return pBlock->minKey.ts == pNeighborBlockIndex->window.ekey;
1427
  }
H
Haojun Liao 已提交
1428
}
H
Hongze Cheng 已提交
1429

H
Hongze Cheng 已提交
1430
/*
 * Tells whether the buffered key `key` must be consumed before pBlock in scan order. For ascending scans
 * this means key.ts <= pBlock->minKey.ts; for descending scans it means key.ts >= pBlock->maxKey.ts.
 * A key equal to TSKEY_INITIAL_VAL (no buffered key) never qualifies.
 */
static bool bufferDataInFileBlockGap(int32_t order, TSDBKEY key, SDataBlk* pBlock) {
  if (key.ts == TSKEY_INITIAL_VAL) {
    return false;
  }

  if (ASCENDING_TRAVERSE(order)) {
    return key.ts <= pBlock->minKey.ts;
  }

  return key.ts >= pBlock->maxKey.ts;
}
H
Hongze Cheng 已提交
1436

H
Hongze Cheng 已提交
1437
/*
 * Returns true when key.ts lies inside the block's [minKey.ts, maxKey.ts] range and the block's
 * [minVer, maxVer] range intersects the query version range pVerRange.
 */
static bool keyOverlapFileBlock(TSDBKEY key, SDataBlk* pBlock, SVersionRange* pVerRange) {
  bool tsInsideBlock = (key.ts >= pBlock->minKey.ts) && (key.ts <= pBlock->maxKey.ts);
  bool versionsIntersect = (pBlock->maxVer >= pVerRange->minVer) && (pBlock->minVer <= pVerRange->maxVer);

  return tsInsideBlock && versionsIntersect;
}

H
Hongze Cheng 已提交
1442 1443
/*
 * Walks the table's delete skyline (pBlockScanInfo->delSkyline) starting at position startIndex and
 * reports whether any deletion may affect rows of pBlock:
 *   - a point inside [minKey.ts, maxKey.ts] whose version is >= the block's minVer, or
 *   - a point before minKey.ts with version >= minVer whose following point is at or after minKey.ts.
 * The walk stops (returning false) at the first point beyond maxKey.ts.
 */
static bool doCheckforDatablockOverlap(STableBlockScanInfo* pBlockScanInfo, const SDataBlk* pBlock,
                                       int32_t startIndex) {
  SArray* pSkyline = pBlockScanInfo->delSkyline;
  size_t  numOfPoints = taosArrayGetSize(pSkyline);

  for (int32_t cur = startIndex; cur < numOfPoints; ++cur) {
    const TSDBKEY* pPoint = taosArrayGet(pSkyline, cur);
    bool           newEnough = (pPoint->version >= pBlock->minVer);

    if (pPoint->ts < pBlock->minKey.ts) {
      if (!newEnough) {
        continue;
      }

      if (cur < numOfPoints - 1) {
        // the deletion started before the block; it matters if the next point reaches into the block
        const TSDBKEY* pNext = taosArrayGet(pSkyline, cur + 1);
        if (pNext->ts >= pBlock->minKey.ts) {
          return true;
        }
      } else {  // it must be the last point
        ASSERT(pPoint->version == 0);
      }
    } else if (pPoint->ts <= pBlock->maxKey.ts) {
      if (newEnough) {
        return true;
      }
    } else {
      // past maxKey.ts of the block
      return false;
    }
  }

  return false;
}

H
Hongze Cheng 已提交
1471
/*
 * Returns true when the table's delete skyline may remove rows of pBlock.
 *
 * A skyline that is NULL or empty never overlaps. Otherwise the block's key range is first compared with
 * the whole skyline range. If they intersect, the detailed check is delegated to doCheckforDatablockOverlap():
 * starting at pBlockScanInfo->fileDelIndex for ascending scans. For descending scans the start first walks
 * back from fileDelIndex to the first point whose ts is not larger than the block's minKey.ts (or to 0).
 */
static bool overlapWithDelSkyline(STableBlockScanInfo* pBlockScanInfo, const SDataBlk* pBlock, int32_t order) {
  SArray* pSkyline = pBlockScanInfo->delSkyline;

  // An empty skyline is treated like a missing one: taosArrayGet(…, 0) and taosArrayGetLast() below
  // would have no element to return for it.
  if (pSkyline == NULL || taosArrayGetSize(pSkyline) == 0) {
    return false;
  }

  // ts is not overlap
  TSDBKEY* pFirst = taosArrayGet(pSkyline, 0);
  TSDBKEY* pLast = taosArrayGetLast(pSkyline);
  if (pBlock->minKey.ts > pLast->ts || pBlock->maxKey.ts < pFirst->ts) {
    return false;
  }

  // version is not overlap
  if (ASCENDING_TRAVERSE(order)) {
    return doCheckforDatablockOverlap(pBlockScanInfo, pBlock, pBlockScanInfo->fileDelIndex);
  }

  // find the first point that is not larger than the minKey.ts of the data block; the loop never
  // goes below position 0, so a negative position is never read
  int32_t index = pBlockScanInfo->fileDelIndex;
  while (index > 0) {
    TSDBKEY* p = taosArrayGet(pSkyline, index);
    if (p->ts <= pBlock->minKey.ts) {
      break;
    }
    index -= 1;
  }

  return doCheckforDatablockOverlap(pBlockScanInfo, pBlock, index);
}

H
Haojun Liao 已提交
1501 1502 1503 1504 1505 1506 1507 1508 1509 1510 1511 1512 1513
// Flags filled by getBlockToLoadInfo() for one file data block. fileBlockShouldLoad() loads the block when
// any of them is set; isCleanFileDataBlock() treats the block as clean when none of the overlap/dup flags is set.
typedef struct {
  bool overlapWithNeighborBlock;  // shares its boundary ts with the next block of the same table in scan order
  bool hasDupTs;                  // block hasDup flag is set, or the block has more than one sub-block
  bool overlapWithDelInfo;        // the table's delete skyline may remove rows of this block
  bool overlapWithLastBlock;      // current key of the last (stt) block reader lies in [minKey.ts, maxKey.ts]
  bool overlapWithKeyInBuf;       // current buffered key lies in the block's ts range with an overlapping version
  bool partiallyRequired;         // query time window or version range cuts through the block
  bool moreThanCapcity;           // block has more rows than pReader->capacity (name misspelt; kept as is)
} SDataBlockToLoadInfo;

/*
 * Fills *pInfo with the reasons that may force pBlock to be loaded. Fields are only ever set here.
 * overlapWithNeighborBlock and overlapWithLastBlock are written only when a neighbor block / last block row
 * exists, so callers are expected to zero-initialise *pInfo.
 */
static void getBlockToLoadInfo(SDataBlockToLoadInfo* pInfo, SFileDataBlockInfo* pBlockInfo, SDataBlk* pBlock,
                               STableBlockScanInfo* pScanInfo, TSDBKEY keyInBuf, SLastBlockReader* pLastBlockReader,
                               STsdbReader* pReader) {
  int32_t     order = pReader->order;
  int32_t     neighborPos = 0;
  SBlockIndex neighbor = {0};

  // overlap with the next block of the same table in scan order
  if (getNeighborBlockOfSameTable(pBlockInfo, pScanInfo, &neighborPos, order, &neighbor)) {
    pInfo->overlapWithNeighborBlock = overlapWithNeighborBlock(pBlock, &neighbor, order);
  }

  // a block made of several sub-blocks is always treated as holding duplicated timestamps
  pInfo->hasDupTs = (pBlock->nSubBlock != 1) || pBlock->hasDup;
  pInfo->overlapWithDelInfo = overlapWithDelSkyline(pScanInfo, pBlock, order);

  if (hasDataInLastBlock(pLastBlockReader)) {
    int64_t sttKey = getCurrentKeyInLastBlock(pLastBlockReader);
    pInfo->overlapWithLastBlock = (sttKey >= pBlock->minKey.ts) && (sttKey <= pBlock->maxKey.ts);
  }

  pInfo->moreThanCapcity = pBlock->nRow > pReader->capacity;
  pInfo->partiallyRequired = dataBlockPartiallyRequired(&pReader->window, &pReader->verRange, pBlock);
  pInfo->overlapWithKeyInBuf = keyOverlapFileBlock(keyInBuf, pBlock, &pReader->verRange);
}
1537

H
Haojun Liao 已提交
1538 1539 1540 1541 1542 1543 1544 1545 1546 1547 1548 1549 1550 1551
/*
 * Decides whether a file data block has to be loaded. The block is loaded when any of these holds:
 *   - it shares a boundary ts with its neighbor block of the same table,
 *   - it may contain duplicated timestamps,
 *   - the query window / version range covers it only partially,
 *   - the current buffered key overlaps it,
 *   - it has more rows than the output buffer capacity,
 *   - the delete skyline overlaps it,
 *   - the current last (stt) block key overlaps it.
 * When the block is loaded, the individual reasons are logged at debug level for profiling.
 */
static bool fileBlockShouldLoad(STsdbReader* pReader, SFileDataBlockInfo* pBlockInfo, SDataBlk* pBlock,
                                STableBlockScanInfo* pScanInfo, TSDBKEY keyInBuf, SLastBlockReader* pLastBlockReader) {
  SDataBlockToLoadInfo info = {0};
  getBlockToLoadInfo(&info, pBlockInfo, pBlock, pScanInfo, keyInBuf, pLastBlockReader, pReader);

  bool mustLoad = info.overlapWithNeighborBlock || info.hasDupTs || info.partiallyRequired ||
                  info.overlapWithKeyInBuf || info.moreThanCapcity || info.overlapWithDelInfo ||
                  info.overlapWithLastBlock;
  if (!mustLoad) {
    return false;
  }

  tsdbDebug("%p uid:%" PRIu64
            " need to load the datablock, overlapwithneighborblock:%d, hasDup:%d, partiallyRequired:%d, "
            "overlapWithKey:%d, greaterThanBuf:%d, overlapWithDel:%d, overlapWithlastBlock:%d, %s",
            pReader, pBlockInfo->uid, info.overlapWithNeighborBlock, info.hasDupTs, info.partiallyRequired,
            info.overlapWithKeyInBuf, info.moreThanCapcity, info.overlapWithDelInfo, info.overlapWithLastBlock,
            pReader->idStr);
  return true;
}

H
Haojun Liao 已提交
1566 1567 1568 1569 1570 1571 1572 1573 1574
/*
 * A file block is "clean" when nothing else can interleave with its rows. That means no neighbor overlap,
 * no duplicated ts, no overlap with the buffered key, the delete skyline or the last (stt) block.
 * Unlike fileBlockShouldLoad(), partial coverage and output capacity are not considered here.
 */
static bool isCleanFileDataBlock(STsdbReader* pReader, SFileDataBlockInfo* pBlockInfo, SDataBlk* pBlock,
                                 STableBlockScanInfo* pScanInfo, TSDBKEY keyInBuf, SLastBlockReader* pLastBlockReader) {
  SDataBlockToLoadInfo info = {0};
  getBlockToLoadInfo(&info, pBlockInfo, pBlock, pScanInfo, keyInBuf, pLastBlockReader, pReader);

  return !info.overlapWithNeighborBlock && !info.hasDupTs && !info.overlapWithKeyInBuf && !info.overlapWithDelInfo &&
         !info.overlapWithLastBlock;
}

1575
/*
 * Fills pReader->pResBlock with rows of this table coming from the in-memory iterators (iter/iiter), up to
 * endKey and pReader->capacity, via buildDataBlockFromBufImpl(). Afterwards the block's ts window and uid are
 * updated, the block is flagged as composed, and the elapsed time is logged and added to
 * pReader->cost.buildmemBlock.
 *
 * Returns immediately with TSDB_CODE_SUCCESS when neither iterator has a value; otherwise returns the code
 * of buildDataBlockFromBufImpl().
 */
static int32_t buildDataBlockFromBuf(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo, int64_t endKey) {
  bool bufHasRows = pBlockScanInfo->iiter.hasVal || pBlockScanInfo->iter.hasVal;
  if (!bufHasRows) {
    return TSDB_CODE_SUCCESS;
  }

  SSDataBlock* pResBlock = pReader->pResBlock;
  int64_t      startUs = taosGetTimestampUs();

  int32_t code = buildDataBlockFromBufImpl(pBlockScanInfo, endKey, pReader->capacity, pReader);

  blockDataUpdateTsWindow(pResBlock, pReader->suppInfo.slotId[0]);
  pResBlock->info.id.uid = pBlockScanInfo->uid;
  setComposedBlockFlag(pReader, true);

  double elapsedMs = (taosGetTimestampUs() - startUs) / 1000.0;
  tsdbDebug("%p build data block from cache completed, elapsed time:%.2f ms, numOfRows:%d, brange:%" PRId64
            " - %" PRId64 " %s",
            pReader, elapsedMs, pResBlock->info.rows, pResBlock->info.window.skey, pResBlock->info.window.ekey,
            pReader->idStr);

  pReader->cost.buildmemBlock += elapsedMs;
  return code;
}

1600 1601
/*
 * Fast path that avoids a row merge. When the current file block row is not the last one in scan direction,
 * and the next row in scan direction has a ts different from `key`, the current row is appended directly to
 * pReader->pResBlock and pDumpInfo->rowIndex is advanced by one step.
 *
 * Returns true when the row was copied, false when the caller has to merge it.
 */
static bool tryCopyDistinctRowFromFileBlock(STsdbReader* pReader, SBlockData* pBlockData, int64_t key,
                                            SFileBlockDumpInfo* pDumpInfo) {
  bool    asc = (pReader->order == TSDB_ORDER_ASC);
  int32_t step = asc ? 1 : -1;

  // a border row has no successor inside this block to compare with
  bool hasSuccessor = asc ? (pDumpInfo->rowIndex < pDumpInfo->totalRows - 1) : (pDumpInfo->rowIndex > 0);
  if (!hasSuccessor) {
    return false;
  }

  int64_t successorKey = pBlockData->aTSKEY[pDumpInfo->rowIndex + step];
  if (successorKey == key) {
    // duplicated timestamp, merge is needed
    return false;
  }

  doAppendRowFromFileBlock(pReader->pResBlock, pReader, pBlockData, pDumpInfo->rowIndex);
  pDumpInfo->rowIndex += step;
  return true;
}

1620 1621
/*
 * Advances the last (stt) block merge tree to the next row that has not been removed by the table's
 * delete skyline (as judged by hasBeenDropped() with pBlockScanInfo->lastBlockDelIndex and pVerRange).
 *
 * Returns true when such a row is found (the merge tree then points at it), false when the tree is exhausted.
 */
static bool nextRowFromLastBlocks(SLastBlockReader* pLastBlockReader, STableBlockScanInfo* pBlockScanInfo,
                                  SVersionRange* pVerRange) {
  for (;;) {
    if (!tMergeTreeNext(&pLastBlockReader->mergeTree)) {
      return false;
    }

    TSDBROW candidate = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
    TSDBKEY candidateKey = TSDBROW_KEY(&candidate);

    bool dropped = hasBeenDropped(pBlockScanInfo->delSkyline, &pBlockScanInfo->lastBlockDelIndex, &candidateKey,
                                  pLastBlockReader->order, pVerRange);
    if (!dropped) {
      return true;
    }
  }
}

/*
 * Fast path for the last (stt) block: advances the stt reader past the current row (fRow, with timestamp ts).
 * If the stt reader is exhausted, or its next row has a different timestamp, fRow is appended directly
 * to pReader->pResBlock and true is returned. Otherwise the next stt row shares ts with fRow, nothing is
 * appended, and false is returned so the caller merges them.
 */
static bool tryCopyDistinctRowFromSttBlock(TSDBROW* fRow, SLastBlockReader* pLastBlockReader,
                                           STableBlockScanInfo* pScanInfo, int64_t ts, STsdbReader* pReader) {
  bool hasNext = nextRowFromLastBlocks(pLastBlockReader, pScanInfo, &pReader->verRange);
  if (hasNext && getCurrentKeyInLastBlock(pLastBlockReader) == ts) {
    return false;
  }

  doAppendRowFromFileBlock(pReader->pResBlock, pReader, fRow->pBlockData, fRow->iRow);
  return true;
}

H
Haojun Liao 已提交
1654 1655 1656
/*
 * Returns the table schema matching `sversion` for rows of table `uid`.
 *
 * pReader->pSchema is lazily loaded with the newest schema (metaGetTbTSchema with version -1) and returned
 * when its version matches. Otherwise pReader->pMemSchema serves as a one-entry cache for older versions. It is
 * reused when its version matches, and otherwise freed and re-fetched with metaGetTbTSchemaEx().
 *
 * On a metaGetTbTSchemaEx() failure, terrno is set to its code and NULL is returned. When an existing cache
 * entry was replaced but the fetch produced no schema, NULL is returned as well (terrno set to the fetch code).
 */
static FORCE_INLINE STSchema* doGetSchemaForTSRow(int32_t sversion, STsdbReader* pReader, uint64_t uid) {
  // always set the newest schema version in pReader->pSchema
  if (pReader->pSchema == NULL) {
    pReader->pSchema = metaGetTbTSchema(pReader->pTsdb->pVnode->pMeta, uid, -1, 1);
  }

  if (pReader->pSchema != NULL && pReader->pSchema->version == sversion) {
    return pReader->pSchema;
  }

  bool hadCachedSchema = (pReader->pMemSchema != NULL);
  if (hadCachedSchema) {
    if (pReader->pMemSchema->version == sversion) {
      return pReader->pMemSchema;
    }
    taosMemoryFreeClear(pReader->pMemSchema);
  }

  int32_t code = metaGetTbTSchemaEx(pReader->pTsdb->pVnode->pMeta, pReader->suid, uid, sversion, &pReader->pMemSchema);
  bool    fetchFailed = (code != TSDB_CODE_SUCCESS) || (hadCachedSchema && pReader->pMemSchema == NULL);
  if (fetchFailed) {
    terrno = code;
    return NULL;
  }

  return pReader->pMemSchema;
}

1689
static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo, TSDBROW* pRow,
1690 1691 1692 1693 1694 1695
                                     SIterInfo* pIter, int64_t key, SLastBlockReader* pLastBlockReader) {
  SRowMerger          merge = {0};
  STSRow*             pTSRow = NULL;
  SBlockData*         pBlockData = &pReader->status.fileBlockData;
  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;

1696
  int64_t tsLast = INT64_MIN;
1697
  if (hasDataInLastBlock(pLastBlockReader)) {
1698 1699
    tsLast = getCurrentKeyInLastBlock(pLastBlockReader);
  }
1700

H
Hongze Cheng 已提交
1701 1702
  TSDBKEY k = TSDBROW_KEY(pRow);
  TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
1703

1704 1705
  int64_t minKey = 0;
  if (pReader->order == TSDB_ORDER_ASC) {
H
Hongze Cheng 已提交
1706
    minKey = INT64_MAX;  // chosen the minimum value
1707
    if (minKey > tsLast && hasDataInLastBlock(pLastBlockReader)) {
1708 1709
      minKey = tsLast;
    }
1710

1711 1712 1713
    if (minKey > k.ts) {
      minKey = k.ts;
    }
1714

1715
    if (minKey > key && hasDataInFileBlock(pBlockData, pDumpInfo)) {
1716 1717 1718 1719
      minKey = key;
    }
  } else {
    minKey = INT64_MIN;
1720
    if (minKey < tsLast && hasDataInLastBlock(pLastBlockReader)) {
1721 1722 1723 1724 1725 1726 1727
      minKey = tsLast;
    }

    if (minKey < k.ts) {
      minKey = k.ts;
    }

1728
    if (minKey < key && hasDataInFileBlock(pBlockData, pDumpInfo)) {
1729 1730
      minKey = key;
    }
1731 1732 1733 1734
  }

  bool init = false;

1735
  // ASC: file block ---> last block -----> imem -----> mem
H
Hongze Cheng 已提交
1736
  // DESC: mem -----> imem -----> last block -----> file block
1737 1738
  if (pReader->order == TSDB_ORDER_ASC) {
    if (minKey == key) {
1739
      init = true;
H
Haojun Liao 已提交
1740 1741 1742 1743
      int32_t code = tRowMergerInit(&merge, &fRow, pReader->pSchema);
      if (code != TSDB_CODE_SUCCESS) {
        return code;
      }
1744
      doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
1745 1746
    }

1747
    if (minKey == tsLast) {
1748
      TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
H
Haojun Liao 已提交
1749 1750 1751
      if (init) {
        tRowMerge(&merge, &fRow1);
      } else {
1752
        init = true;
H
Haojun Liao 已提交
1753 1754 1755 1756
        int32_t code = tRowMergerInit(&merge, &fRow1, pReader->pSchema);
        if (code != TSDB_CODE_SUCCESS) {
          return code;
        }
1757
      }
1758
      doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, &merge, &pReader->verRange);
1759
    }
1760

1761
    if (minKey == k.ts) {
K
kailixu 已提交
1762 1763 1764 1765
      STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid);
      if (pSchema == NULL) {
        return terrno;
      }
H
Haojun Liao 已提交
1766
      if (init) {
K
kailixu 已提交
1767
        tRowMergerAdd(&merge, pRow, pSchema);
H
Haojun Liao 已提交
1768
      } else {
1769
        init = true;
H
Haojun Liao 已提交
1770 1771 1772 1773 1774 1775 1776 1777
        int32_t   code = tRowMergerInit(&merge, pRow, pSchema);
        if (code != TSDB_CODE_SUCCESS) {
          return code;
        }
      }
      int32_t code = doMergeRowsInBuf(pIter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge, pReader);
      if (code != TSDB_CODE_SUCCESS) {
        return code;
1778 1779 1780 1781 1782
      }
    }
  } else {
    if (minKey == k.ts) {
      init = true;
1783
      STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid);
H
Haojun Liao 已提交
1784 1785 1786 1787 1788 1789
      int32_t   code = tRowMergerInit(&merge, pRow, pSchema);
      if (code != TSDB_CODE_SUCCESS) {
        return code;
      }

      code = doMergeRowsInBuf(pIter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge, pReader);
1790
      if (code != TSDB_CODE_SUCCESS || merge.pTSchema == NULL) {
H
Haojun Liao 已提交
1791 1792
        return code;
      }
1793 1794
    }

1795
    if (minKey == tsLast) {
1796
      TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
H
Haojun Liao 已提交
1797 1798 1799
      if (init) {
        tRowMerge(&merge, &fRow1);
      } else {
1800
        init = true;
H
Haojun Liao 已提交
1801 1802 1803 1804
        int32_t code = tRowMergerInit(&merge, &fRow1, pReader->pSchema);
        if (code != TSDB_CODE_SUCCESS) {
          return code;
        }
1805
      }
1806
      doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, &merge, &pReader->verRange);
1807 1808 1809
    }

    if (minKey == key) {
H
Haojun Liao 已提交
1810 1811 1812
      if (init) {
        tRowMerge(&merge, &fRow);
      } else {
1813
        init = true;
H
Haojun Liao 已提交
1814 1815 1816 1817
        int32_t code = tRowMergerInit(&merge, &fRow, pReader->pSchema);
        if (code != TSDB_CODE_SUCCESS) {
          return code;
        }
1818 1819 1820
      }
      doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
    }
1821 1822
  }

1823 1824 1825 1826 1827
  int32_t code = tRowMergerGetRow(&merge, &pTSRow);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

1828
  doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo);
1829 1830 1831 1832 1833 1834

  taosMemoryFree(pTSRow);
  tRowMergerClear(&merge);
  return TSDB_CODE_SUCCESS;
}

1835 1836 1837
/*
 * Emits the current last (stt) block row into pReader->pResBlock, merging it when needed.
 *
 * If mergeBlockData is false, or the current file block row has a different timestamp, only stt rows are
 * involved. A distinct stt row is copied directly (and pBlockScanInfo->lastKey updated); otherwise all
 * stt rows with that timestamp are merged. If the file block row shares the timestamp, the stt rows and
 * the file block rows with that timestamp are merged together. pBlockData may be NULL when mergeBlockData
 * is false.
 *
 * Returns TSDB_CODE_SUCCESS or the first error; the row merger and the merged row are released on every path.
 */
static int32_t doMergeFileBlockAndLastBlock(SLastBlockReader* pLastBlockReader, STsdbReader* pReader,
                                            STableBlockScanInfo* pBlockScanInfo, SBlockData* pBlockData,
                                            bool mergeBlockData) {
  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
  int64_t             tsLastBlock = getCurrentKeyInLastBlock(pLastBlockReader);

  STSRow*    pTSRow = NULL;
  SRowMerger merge = {0};
  int32_t    code = TSDB_CODE_SUCCESS;
  TSDBROW    fRow = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
  tsdbTrace("fRow ptr:%p, %d, uid:%" PRIu64 ", %s", fRow.pBlockData, fRow.iRow, pLastBlockReader->uid, pReader->idStr);

  if ((!mergeBlockData) || (tsLastBlock != pBlockData->aTSKEY[pDumpInfo->rowIndex])) {
    // only the last block participates for this timestamp
    if (tryCopyDistinctRowFromSttBlock(&fRow, pLastBlockReader, pBlockScanInfo, tsLastBlock, pReader)) {
      pBlockScanInfo->lastKey = tsLastBlock;
      return TSDB_CODE_SUCCESS;
    }

    code = tRowMergerInit(&merge, &fRow, pReader->pSchema);
    if (code != TSDB_CODE_SUCCESS) {
      goto _end;
    }

    // the stt reader has already moved to the next row with the same timestamp
    TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
    tRowMerge(&merge, &fRow1);
    doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLastBlock, &merge, &pReader->verRange);
  } else {  // last block row and file block row share the timestamp: merge both
    code = tRowMergerInit(&merge, &fRow, pReader->pSchema);
    if (code != TSDB_CODE_SUCCESS) {
      goto _end;
    }

    doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLastBlock, &merge, &pReader->verRange);

    // merge with block data if ts == key
    if (tsLastBlock == pBlockData->aTSKEY[pDumpInfo->rowIndex]) {
      doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
    }
  }

  code = tRowMergerGetRow(&merge, &pTSRow);
  if (code != TSDB_CODE_SUCCESS) {
    goto _end;
  }

  doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo);

_end:
  taosMemoryFree(pTSRow);
  tRowMergerClear(&merge);
  return code;
}

1898 1899
/*
 * Produces the next output row when only file-level data (data block and/or last (stt) block) remains.
 * `key` is compared against the current stt key and is expected to be the ts of the current file block row.
 *
 *  - no file block row left: the stt row is handled by doMergeFileBlockAndLastBlock();
 *  - no stt row left: the file block rows are handled by mergeRowsInFileBlocks();
 *  - descending scans: delegated to doMergeFileBlockAndLastBlock() with block-data merging enabled;
 *  - ascending scans: a smaller file key is emitted from the file block alone, and an equal key merges the
 *    file block rows with the stt rows of that timestamp.
 *
 * Returns TSDB_CODE_SUCCESS or the first error; the row merger and merged row are released on every path.
 */
static int32_t mergeFileBlockAndLastBlock(STsdbReader* pReader, SLastBlockReader* pLastBlockReader, int64_t key,
                                          STableBlockScanInfo* pBlockScanInfo, SBlockData* pBlockData) {
  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;

  if (!hasDataInFileBlock(pBlockData, pDumpInfo)) {  // only last block exists
    return doMergeFileBlockAndLastBlock(pLastBlockReader, pReader, pBlockScanInfo, NULL, false);
  }

  // no last block available, only data block exists
  if (!hasDataInLastBlock(pLastBlockReader)) {
    return mergeRowsInFileBlocks(pBlockData, pBlockScanInfo, key, pReader);
  }

  // row in last file block
  TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
  int64_t ts = getCurrentKeyInLastBlock(pLastBlockReader);
  ASSERT(ts >= key);

  if (!ASCENDING_TRAVERSE(pReader->order)) {
    return doMergeFileBlockAndLastBlock(pLastBlockReader, pReader, pBlockScanInfo, pBlockData, true);
  }

  if (key < ts) {  // imem, mem are all empty, file blocks (data blocks and last block) exist
    return mergeRowsInFileBlocks(pBlockData, pBlockScanInfo, key, pReader);
  }

  if (key != ts) {
    return TSDB_CODE_SUCCESS;
  }

  // key == ts: merge the file block rows and the stt rows of this timestamp
  STSRow*    pTSRow = NULL;
  SRowMerger merge = {0};

  int32_t code = tRowMergerInit(&merge, &fRow, pReader->pSchema);
  if (code == TSDB_CODE_SUCCESS) {
    doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);

    TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
    tRowMerge(&merge, &fRow1);
    doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, ts, &merge, &pReader->verRange);

    code = tRowMergerGetRow(&merge, &pTSRow);
    if (code == TSDB_CODE_SUCCESS) {
      doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo);
    }
  }

  // released on success and failure alike (previously leaked on the error paths)
  taosMemoryFree(pTSRow);
  tRowMergerClear(&merge);
  return code;
}

1953 1954
/*
 * Merges all rows that share the extreme timestamp (smallest for ascending scans, largest for descending
 * scans) across every level: mem (iter), imem (iiter), last (stt) block and file block. The merged row is
 * appended to pReader->pResBlock.
 *
 * Merge order:
 *   ASC : file block ---> last block ---> imem ---> mem
 *   DESC: mem ---> imem ---> last block ---> file block
 *
 * Returns TSDB_CODE_SUCCESS or the first error. When the schema of a buffer row cannot be obtained, terrno
 * is returned, as doMergeBufAndFileRows() does. When the merger ends up without a schema, nothing is
 * appended. The row merger and the merged row are released on every path.
 */
static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo, SBlockData* pBlockData,
                                     SLastBlockReader* pLastBlockReader) {
  SRowMerger          merge = {0};
  STSRow*             pTSRow = NULL;
  int32_t             code = TSDB_CODE_SUCCESS;
  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
  SArray*             pDelList = pBlockScanInfo->delSkyline;

  TSDBROW* pRow = getValidMemRow(&pBlockScanInfo->iter, pDelList, pReader);
  TSDBROW* piRow = getValidMemRow(&pBlockScanInfo->iiter, pDelList, pReader);

  int64_t tsLast = INT64_MIN;
  if (hasDataInLastBlock(pLastBlockReader)) {
    tsLast = getCurrentKeyInLastBlock(pLastBlockReader);
  }

  int64_t key = hasDataInFileBlock(pBlockData, pDumpInfo) ? pBlockData->aTSKEY[pDumpInfo->rowIndex] : INT64_MIN;

  TSDBKEY k = TSDBROW_KEY(pRow);
  TSDBKEY ik = TSDBROW_KEY(piRow);

  int64_t minKey = 0;
  if (ASCENDING_TRAVERSE(pReader->order)) {
    minKey = INT64_MAX;  // let's find the minimum
    if (minKey > k.ts) {
      minKey = k.ts;
    }

    if (minKey > ik.ts) {
      minKey = ik.ts;
    }

    if (minKey > key && hasDataInFileBlock(pBlockData, pDumpInfo)) {
      minKey = key;
    }

    if (minKey > tsLast && hasDataInLastBlock(pLastBlockReader)) {
      minKey = tsLast;
    }
  } else {
    minKey = INT64_MIN;  // let find the maximum ts value
    if (minKey < k.ts) {
      minKey = k.ts;
    }

    if (minKey < ik.ts) {
      minKey = ik.ts;
    }

    if (minKey < key && hasDataInFileBlock(pBlockData, pDumpInfo)) {
      minKey = key;
    }

    if (minKey < tsLast && hasDataInLastBlock(pLastBlockReader)) {
      minKey = tsLast;
    }
  }

  bool init = false;

  // ASC: file block -----> last block -----> imem -----> mem
  // DESC: mem -----> imem -----> last block -----> file block
  if (ASCENDING_TRAVERSE(pReader->order)) {
    if (minKey == key) {
      init = true;
      TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
      code = tRowMergerInit(&merge, &fRow, pReader->pSchema);
      if (code != TSDB_CODE_SUCCESS) {
        goto _end;
      }

      doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
    }

    if (minKey == tsLast) {
      TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
      if (init) {
        tRowMerge(&merge, &fRow1);
      } else {
        init = true;
        code = tRowMergerInit(&merge, &fRow1, pReader->pSchema);
        if (code != TSDB_CODE_SUCCESS) {
          goto _end;
        }
      }

      doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, &merge, &pReader->verRange);
    }

    if (minKey == ik.ts) {
      if (init) {
        tRowMerge(&merge, piRow);
      } else {
        init = true;
        STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(piRow), pReader, pBlockScanInfo->uid);
        if (pSchema == NULL) {
          // previously returned `code` (still TSDB_CODE_SUCCESS here), silently dropping the row
          code = terrno;
          goto _end;
        }

        code = tRowMergerInit(&merge, piRow, pSchema);
        if (code != TSDB_CODE_SUCCESS) {
          goto _end;
        }
      }

      code = doMergeRowsInBuf(&pBlockScanInfo->iiter, pBlockScanInfo->uid, ik.ts, pBlockScanInfo->delSkyline, &merge,
                              pReader);
      if (code != TSDB_CODE_SUCCESS) {
        goto _end;
      }
    }

    if (minKey == k.ts) {
      if (init) {
        if (merge.pTSchema == NULL) {
          goto _end;
        }

        tRowMerge(&merge, pRow);
      } else {
        STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid);
        if (pSchema == NULL) {
          code = terrno;
          goto _end;
        }

        code = tRowMergerInit(&merge, pRow, pSchema);
        if (code != TSDB_CODE_SUCCESS) {
          goto _end;
        }
      }

      code = doMergeRowsInBuf(&pBlockScanInfo->iter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge,
                              pReader);
      if (code != TSDB_CODE_SUCCESS) {
        goto _end;
      }
    }
  } else {
    if (minKey == k.ts) {
      init = true;
      STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid);
      if (pSchema == NULL) {
        code = terrno;
        goto _end;
      }

      code = tRowMergerInit(&merge, pRow, pSchema);
      if (code != TSDB_CODE_SUCCESS) {
        goto _end;
      }

      code = doMergeRowsInBuf(&pBlockScanInfo->iter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge,
                              pReader);
      if (code != TSDB_CODE_SUCCESS) {
        goto _end;
      }
    }

    if (minKey == ik.ts) {
      if (init) {
        tRowMerge(&merge, piRow);
      } else {
        init = true;
        STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(piRow), pReader, pBlockScanInfo->uid);
        if (pSchema == NULL) {
          code = terrno;
          goto _end;
        }

        code = tRowMergerInit(&merge, piRow, pSchema);
        if (code != TSDB_CODE_SUCCESS) {
          goto _end;
        }
      }

      code = doMergeRowsInBuf(&pBlockScanInfo->iiter, pBlockScanInfo->uid, ik.ts, pBlockScanInfo->delSkyline, &merge,
                              pReader);
      if (code != TSDB_CODE_SUCCESS) {
        goto _end;
      }
    }

    if (minKey == tsLast) {
      TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
      if (init) {
        tRowMerge(&merge, &fRow1);
      } else {
        init = true;
        code = tRowMergerInit(&merge, &fRow1, pReader->pSchema);
        if (code != TSDB_CODE_SUCCESS) {
          goto _end;
        }
      }

      doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, &merge, &pReader->verRange);
    }

    if (minKey == key) {
      TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
      if (!init) {
        code = tRowMergerInit(&merge, &fRow, pReader->pSchema);
        if (code != TSDB_CODE_SUCCESS) {
          goto _end;
        }
      } else {
        if (merge.pTSchema == NULL) {
          goto _end;
        }
        tRowMerge(&merge, &fRow);
      }

      doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
    }
  }

  // nothing has been merged, so there is no row to emit
  if (merge.pTSchema == NULL) {
    goto _end;
  }

  code = tRowMergerGetRow(&merge, &pTSRow);
  if (code != TSDB_CODE_SUCCESS) {
    goto _end;
  }

  doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo);

_end:
  // single cleanup point: the merger used to leak on every early return
  taosMemoryFree(pTSRow);
  tRowMergerClear(&merge);
  return code;
}

2166 2167 2168 2169 2170 2171 2172 2173 2174 2175 2176 2177 2178 2179 2180 2181 2182 2183 2184 2185 2186 2187 2188 2189
/*
 * Create the iterators over the table's rows in the mem and imem buffers and set up its delete
 * skyline. Does nothing once pBlockScanInfo->iterInit is set.
 *
 * Both iterators start at the query window boundary in scan order (window.skey/verRange.minVer for
 * ascending scans, window.ekey/verRange.maxVer for descending ones) and move backward for
 * descending scans.
 *
 * Returns TSDB_CODE_SUCCESS, the error from tsdbTbDataIterCreate() (iterInit stays false), or the
 * error from initDelSkylineIterator() (iterInit is still set so a later call does not recreate the
 * iterators created here).
 */
static int32_t initMemDataIterator(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader) {
  if (pBlockScanInfo->iterInit) {
    return TSDB_CODE_SUCCESS;
  }

  int32_t code = TSDB_CODE_SUCCESS;

  TSDBKEY startKey = {0};
  if (ASCENDING_TRAVERSE(pReader->order)) {
    startKey = (TSDBKEY){.ts = pReader->window.skey, .version = pReader->verRange.minVer};
  } else {
    startKey = (TSDBKEY){.ts = pReader->window.ekey, .version = pReader->verRange.maxVer};
  }

  int32_t backward = (!ASCENDING_TRAVERSE(pReader->order));

  STbData* d = NULL;
  if (pReader->pReadSnap->pMem != NULL) {
    d = tsdbGetTbDataFromMemTable(pReader->pReadSnap->pMem, pReader->suid, pBlockScanInfo->uid);
    if (d != NULL) {
      code = tsdbTbDataIterCreate(d, &startKey, backward, &pBlockScanInfo->iter.iter);
      if (code == TSDB_CODE_SUCCESS) {
        pBlockScanInfo->iter.hasVal = (tsdbTbDataIterGet(pBlockScanInfo->iter.iter) != NULL);

        tsdbDebug("%p uid:%" PRIu64 ", check data in mem from skey:%" PRId64 ", order:%d, ts range in buf:%" PRId64
                  "-%" PRId64 " %s",
                  pReader, pBlockScanInfo->uid, startKey.ts, pReader->order, d->minKey, d->maxKey, pReader->idStr);
      } else {
        // this is the mem (not imem) iterator
        tsdbError("%p uid:%" PRIu64 ", failed to create iterator for mem, code:%s, %s", pReader, pBlockScanInfo->uid,
                  tstrerror(code), pReader->idStr);
        return code;
      }
    }
  } else {
    tsdbDebug("%p uid:%" PRIu64 ", no data in mem, %s", pReader, pBlockScanInfo->uid, pReader->idStr);
  }

  STbData* di = NULL;
  if (pReader->pReadSnap->pIMem != NULL) {
    di = tsdbGetTbDataFromMemTable(pReader->pReadSnap->pIMem, pReader->suid, pBlockScanInfo->uid);
    if (di != NULL) {
      code = tsdbTbDataIterCreate(di, &startKey, backward, &pBlockScanInfo->iiter.iter);
      if (code == TSDB_CODE_SUCCESS) {
        pBlockScanInfo->iiter.hasVal = (tsdbTbDataIterGet(pBlockScanInfo->iiter.iter) != NULL);

        tsdbDebug("%p uid:%" PRIu64 ", check data in imem from skey:%" PRId64 ", order:%d, ts range in buf:%" PRId64
                  "-%" PRId64 " %s",
                  pReader, pBlockScanInfo->uid, startKey.ts, pReader->order, di->minKey, di->maxKey, pReader->idStr);
      } else {
        // this is the imem iterator
        tsdbError("%p uid:%" PRIu64 ", failed to create iterator for imem, code:%s, %s", pReader, pBlockScanInfo->uid,
                  tstrerror(code), pReader->idStr);
        return code;
      }
    }
  } else {
    tsdbDebug("%p uid:%" PRIu64 ", no data in imem, %s", pReader, pBlockScanInfo->uid, pReader->idStr);
  }

  // The skyline also seeds the mem/imem/file/last-block delete cursors. Report a failure to the
  // caller, but keep the iterators created above marked as initialized so they are not recreated.
  code = initDelSkylineIterator(pBlockScanInfo, pReader, d, di);
  pBlockScanInfo->iterInit = true;
  return code;
}

dengyihao's avatar
dengyihao 已提交
2230 2231
/*
 * Decide whether the row at pDumpInfo->rowIndex of the loaded file block belongs in the result.
 * The row must:
 *  - belong to pBlockScanInfo's table (checked only when the block carries a uid column),
 *  - have a version inside pReader->verRange and a timestamp inside pReader->window,
 *  - not be covered by the table's delete skyline.
 * The skyline check may advance pBlockScanInfo->fileDelIndex, so it is evaluated last.
 */
static bool isValidFileBlockRow(SBlockData* pBlockData, SFileBlockDumpInfo* pDumpInfo,
                                STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader) {
  int32_t idx = pDumpInfo->rowIndex;

  // a non-NULL uid column means the block mixes rows of several tables
  if (pBlockData->aUid != NULL) {
    uint64_t rowUid = pBlockData->aUid[idx];
    if (rowUid != pBlockScanInfo->uid) {
      return false;
    }
  }

  int64_t rowVer = pBlockData->aVersion[idx];
  bool    verInRange = (rowVer >= pReader->verRange.minVer) && (rowVer <= pReader->verRange.maxVer);
  if (!verInRange) {
    return false;
  }

  int64_t rowTs = pBlockData->aTSKEY[idx];
  bool    tsInWindow = (rowTs >= pReader->window.skey) && (rowTs <= pReader->window.ekey);
  if (!tsInWindow) {
    return false;
  }

  TSDBKEY rowKey = {.ts = rowTs, .version = rowVer};
  return !hasBeenDropped(pBlockScanInfo->delSkyline, &pBlockScanInfo->fileDelIndex, &rowKey, pReader->order,
                         &pReader->verRange);
}

2260
/*
 * Bind the last-block (stt) merge tree to table pScanInfo and position it on its first qualified
 * row. When the reader is already bound to this table, only reports whether rows remain.
 *
 * The merge tree of a previously bound table is closed first. The time window is narrowed so that it
 * starts one step past pScanInfo->lastKey in scan direction.
 *
 * Returns true when the merge tree has a row available. Returns false otherwise, including when
 * tMergeTreeOpen() fails.
 */
static bool initLastBlockReader(SLastBlockReader* pLBlockReader, STableBlockScanInfo* pScanInfo, STsdbReader* pReader) {
  // already bound to this table: nothing to reopen
  if (pLBlockReader->uid == pScanInfo->uid) {
    return hasDataInLastBlock(pLBlockReader);
  }

  // release the merge tree of the previously bound table, if any
  if (pLBlockReader->uid != 0) {
    tMergeTreeClose(&pLBlockReader->mergeTree);
  }

  initMemDataIterator(pScanInfo, pReader);
  pLBlockReader->uid = pScanInfo->uid;

  STimeWindow w = pLBlockReader->window;
  if (ASCENDING_TRAVERSE(pLBlockReader->order)) {
    w.skey = pScanInfo->lastKey + 1;
  } else {
    w.ekey = pScanInfo->lastKey - 1;
  }

  bool    desc = (pLBlockReader->order == TSDB_ORDER_DESC);
  int32_t code = tMergeTreeOpen(&pLBlockReader->mergeTree, desc, pReader->pFileReader, pReader->suid, pScanInfo->uid, &w,
                                &pLBlockReader->verRange, pLBlockReader->pInfo, false, pReader->idStr);
  if (code != TSDB_CODE_SUCCESS) {
    return false;
  }

  return nextRowFromLastBlocks(pLBlockReader, pScanInfo, &pReader->verRange);
}

2291
/* Timestamp of the row the last-block merge tree is currently positioned on. */
static int64_t getCurrentKeyInLastBlock(SLastBlockReader* pLastBlockReader) {
  TSDBROW current = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
  int64_t ts = TSDBROW_TS(&current);
  return ts;
}

H
Hongze Cheng 已提交
2296
/* True while the last-block merge tree still has an active iterator. */
static bool hasDataInLastBlock(SLastBlockReader* pLastBlockReader) {
  const bool hasIter = (pLastBlockReader->mergeTree.pIter != NULL);
  return hasIter;
}
2297 2298

bool hasDataInFileBlock(const SBlockData* pBlockData, const SFileBlockDumpInfo* pDumpInfo) {
H
Haojun Liao 已提交
2299 2300
  if ((pBlockData->nRow > 0) && (pBlockData->nRow != pDumpInfo->totalRows)) {
    return false; // this is an invalid result.
2301
  }
2302
  return pBlockData->nRow > 0 && (!pDumpInfo->allDumped);
2303
}
2304

2305 2306
/*
 * Emit the file-block row(s) with timestamp key into the result block.
 *
 * When tryCopyDistinctRowFromFileBlock() succeeds, only pBlockScanInfo->lastKey is updated here.
 * Otherwise the row at pDumpInfo->rowIndex seeds a row merger. doMergeRowsInFileBlocks() then
 * merges further file-block rows into it, and the merged row is appended to pReader->pResBlock.
 *
 * Returns TSDB_CODE_SUCCESS or the error code reported by the row merger.
 */
int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pBlockScanInfo, int64_t key,
                              STsdbReader* pReader) {
  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
  if (tryCopyDistinctRowFromFileBlock(pReader, pBlockData, key, pDumpInfo)) {
    pBlockScanInfo->lastKey = key;
    return TSDB_CODE_SUCCESS;
  }

  TSDBROW    fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
  STSRow*    pTSRow = NULL;
  SRowMerger merge = {0};

  int32_t code = tRowMergerInit(&merge, &fRow, pReader->pSchema);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);

  code = tRowMergerGetRow(&merge, &pTSRow);
  if (code != TSDB_CODE_SUCCESS) {
    // the success path below clears the merger; do the same here so it is not leaked on failure
    tRowMergerClear(&merge);
    return code;
  }

  doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo);

  taosMemoryFree(pTSRow);
  tRowMergerClear(&merge);
  return TSDB_CODE_SUCCESS;
}

H
Haojun Liao 已提交
2336 2337
/*
 * Produce the next merged row of one table. Up to four sources are considered: the mem buffer, the
 * imem buffer, the current file-block row (at pDumpInfo->rowIndex) and the last-block merge tree.
 * The call is dispatched to the merge routine matching the sources that currently hold rows.
 *
 * The file-block key is INT64_MIN when the loaded block is empty or fully dumped.
 */
static int32_t buildComposedDataBlockImpl(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo,
                                          SBlockData* pBlockData, SLastBlockReader* pLastBlockReader) {
  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;

  int64_t key = INT64_MIN;
  if (pBlockData->nRow > 0 && !pDumpInfo->allDumped) {
    key = pBlockData->aTSKEY[pDumpInfo->rowIndex];
  }

  SIterInfo* pMemIter = &pBlockScanInfo->iter;
  SIterInfo* pIMemIter = &pBlockScanInfo->iiter;

  // getValidMemRow() is only consulted for buffers that report a value; mem first, then imem
  TSDBROW* pRow = pMemIter->hasVal ? getValidMemRow(pMemIter, pBlockScanInfo->delSkyline, pReader) : NULL;
  TSDBROW* piRow = pIMemIter->hasVal ? getValidMemRow(pIMemIter, pBlockScanInfo->delSkyline, pReader) : NULL;

  if (pRow != NULL && piRow != NULL) {  // both buffer levels hold valid rows
    return doMergeMultiLevelRows(pReader, pBlockScanInfo, pBlockData, pLastBlockReader);
  } else if (pIMemIter->hasVal) {  // imem + file block + last block
    return doMergeBufAndFileRows(pReader, pBlockScanInfo, piRow, pIMemIter, key, pLastBlockReader);
  } else if (pMemIter->hasVal) {  // mem + file block + last block
    return doMergeBufAndFileRows(pReader, pBlockScanInfo, pRow, pMemIter, key, pLastBlockReader);
  } else {  // file block + last block only
    return mergeFileBlockAndLastBlock(pReader, pLastBlockReader, key, pBlockScanInfo, pBlockData);
  }
}

H
Haojun Liao 已提交
2369 2370 2371 2372 2373 2374 2375 2376 2377 2378 2379 2380 2381 2382 2383 2384 2385 2386 2387 2388 2389 2390 2391 2392 2393 2394 2395 2396 2397 2398 2399 2400 2401 2402 2403 2404 2405 2406 2407 2408
/*
 * If the current file block overlaps the next block of the same table in scan order, make that
 * neighbor the active block of the block iterator, load its data and reset the dump cursor onto
 * its first row in scan order.
 *
 * *loadNeighbor is set to true only when the neighbor block was loaded.
 * Returns TSDB_CODE_SUCCESS or the error from doLoadFileBlockData().
 */
static int32_t loadNeighborIfOverlap(SFileDataBlockInfo* pBlockInfo, STableBlockScanInfo* pBlockScanInfo,
                                     STsdbReader* pReader, bool* loadNeighbor) {
  *loadNeighbor = false;

  SReaderStatus*  pStatus = &pReader->status;
  SDataBlockIter* pBlockIter = &pStatus->blockIter;
  SDataBlk*       pBlock = getCurrentBlock(pBlockIter);

  int32_t     nextIndex = -1;
  SBlockIndex nxtBIndex = {0};
  if (!getNeighborBlockOfSameTable(pBlockInfo, pBlockScanInfo, &nextIndex, pReader->order, &nxtBIndex)) {
    return TSDB_CODE_SUCCESS;  // this table has no neighbor block
  }

  if (!overlapWithNeighborBlock(pBlock, &nxtBIndex, pReader->order)) {
    return TSDB_CODE_SUCCESS;  // no overlap: nothing to load now
  }

  // locate the neighbor in the scan block list and make it the active block
  SFileDataBlockInfo target = {.uid = pBlockInfo->uid, .tbBlockIdx = nextIndex};
  int32_t            neighborIndex = findFileBlockInfoIndex(pBlockIter, &target);
  int32_t            step = ASCENDING_TRAVERSE(pReader->order) ? 1 : -1;
  setFileBlockActiveInBlockIter(pBlockIter, neighborIndex, step);

  // load the neighbor block as the currently accessed file data block
  int32_t code = doLoadFileBlockData(pReader, pBlockIter, &pStatus->fileBlockData, pBlockInfo->uid);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  initBlockDumpInfo(pReader, pBlockIter);
  *loadNeighbor = true;
  return TSDB_CODE_SUCCESS;
}

2409 2410 2411 2412 2413 2414 2415 2416 2417 2418 2419 2420 2421
/*
 * Finalize a composed result block:
 *  - tag it with the table uid (0 when pBlockScanInfo is NULL) and mark its data as loaded;
 *  - refresh its time window from the column at suppInfo.slotId[0] (presumably the primary
 *    timestamp column);
 *  - flag the reader's block as composed;
 *  - add the elapsed time el (milliseconds, as computed by the callers) to the cost counters.
 */
static void updateComposedBlockInfo(STsdbReader* pReader, double el, STableBlockScanInfo* pBlockScanInfo) {
  SSDataBlock* pResBlock = pReader->pResBlock;

  if (pBlockScanInfo != NULL) {
    pResBlock->info.id.uid = pBlockScanInfo->uid;
  } else {
    pResBlock->info.id.uid = 0;
  }
  pResBlock->info.dataLoad = 1;
  blockDataUpdateTsWindow(pResBlock, pReader->suppInfo.slotId[0]);

  setComposedBlockFlag(pReader, true);

  pReader->cost.composedBlocks++;
  pReader->cost.buildComposedBlockTime += el;
}

2422
/*
 * Build a composed result block for the table that owns the current file data block. File-block
 * rows are merged with buffered (mem/imem) rows and last-block rows.
 *
 * A clean file block (as judged by isCleanFileDataBlock()) that fits in the result block is copied
 * as a whole. The exception is a descending scan whose last block still holds rows.
 *
 * Otherwise rows are merged one at a time. This continues until the file block (plus any
 * overlapping neighbor blocks loaded on the way) is consumed, or the result block reaches
 * pReader->capacity. When there is no current file block, the table is taken from
 * pReader->status.pTableIter.
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_INVALID_PARA when the block's table is not in the table
 * map, or the first error reported while loading a neighbor block or merging rows.
 */
static int32_t buildComposedDataBlock(STsdbReader* pReader) {
  int32_t code = TSDB_CODE_SUCCESS;

  SSDataBlock*        pResBlock = pReader->pResBlock;
  SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(&pReader->status.blockIter);
  SLastBlockReader*   pLastBlockReader = pReader->status.fileIter.pLastBlockReader;

  bool    asc = ASCENDING_TRAVERSE(pReader->order);
  int64_t st = taosGetTimestampUs();
  int32_t step = asc ? 1 : -1;
  double  el = 0;

  STableBlockScanInfo* pBlockScanInfo = NULL;
  if (pBlockInfo != NULL) {
    void* p = taosHashGet(pReader->status.pTableMap, &pBlockInfo->uid, sizeof(pBlockInfo->uid));
    if (p == NULL) {
      code = TSDB_CODE_INVALID_PARA;
      tsdbError("failed to locate the uid:%" PRIu64 " in query table uid list, total tables:%d, %s", pBlockInfo->uid,
                taosHashGetSize(pReader->status.pTableMap), pReader->idStr);
      goto _end;
    }

    pBlockScanInfo = *(STableBlockScanInfo**)p;

    SDataBlk* pBlock = getCurrentBlock(&pReader->status.blockIter);
    TSDBKEY   keyInBuf = getCurrentKeyInBuf(pBlockScanInfo, pReader);

    // it is a clean block, load it directly
    if (isCleanFileDataBlock(pReader, pBlockInfo, pBlock, pBlockScanInfo, keyInBuf, pLastBlockReader) &&
        pBlock->nRow <= pReader->capacity) {
      // copy the whole block unless this is a descending scan with rows pending in the last block
      if (asc || !hasDataInLastBlock(pLastBlockReader)) {
        copyBlockDataToSDataBlock(pReader, pBlockScanInfo);

        // record the last key value
        pBlockScanInfo->lastKey = asc ? pBlock->maxKey.ts : pBlock->minKey.ts;
        goto _end;
      }
    }
  } else {  // file blocks not exist
    pBlockScanInfo = *pReader->status.pTableIter;
  }

  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
  SBlockData*         pBlockData = &pReader->status.fileBlockData;

  while (1) {
    bool hasBlockData = false;

    // find the first qualified row in the data block; when the current block is exhausted, move on
    // to the next block of this table if it overlaps the current one
    while (pBlockData->nRow > 0) {
      if (isValidFileBlockRow(pBlockData, pDumpInfo, pBlockScanInfo, pReader)) {
        hasBlockData = true;
        break;
      }

      pDumpInfo->rowIndex += step;

      SDataBlk* pBlock = getCurrentBlock(&pReader->status.blockIter);
      if (pDumpInfo->rowIndex >= pBlock->nRow || pDumpInfo->rowIndex < 0) {
        pBlockInfo = getCurrentBlockInfo(&pReader->status.blockIter);  // NOTE: get the new block info

        // continue check for the next file block if the last ts in the current block
        // is overlapped with the next neighbor block
        bool loadNeighbor = false;
        code = loadNeighborIfOverlap(pBlockInfo, pBlockScanInfo, pReader, &loadNeighbor);
        if ((!loadNeighbor) || (code != 0)) {
          setBlockAllDumped(pDumpInfo, pBlock->maxKey.ts, pReader->order);
          break;
        }
      }
    }

    // no data in last block and block, no need to proceed.
    if (hasBlockData == false) {
      break;
    }

    // stop at the first merge error instead of silently continuing with the next row
    code = buildComposedDataBlockImpl(pReader, pBlockScanInfo, pBlockData, pLastBlockReader);
    if (code != TSDB_CODE_SUCCESS) {
      goto _end;
    }

    // currently loaded file data block is consumed
    if ((pBlockData->nRow > 0) && (pDumpInfo->rowIndex >= pBlockData->nRow || pDumpInfo->rowIndex < 0)) {
      SDataBlk* pBlock = getCurrentBlock(&pReader->status.blockIter);
      setBlockAllDumped(pDumpInfo, pBlock->maxKey.ts, pReader->order);
      break;
    }

    if (pResBlock->info.rows >= pReader->capacity) {
      break;
    }
  }

_end:
  el = (taosGetTimestampUs() - st) / 1000.0;
  updateComposedBlockInfo(pReader, el, pBlockScanInfo);

  if (pResBlock->info.rows > 0) {
    tsdbDebug("%p uid:%" PRIu64 ", composed data block created, brange:%" PRIu64 "-%" PRIu64
              " rows:%d, elapsed time:%.2f ms %s",
              pReader, pResBlock->info.id.uid, pResBlock->info.window.skey, pResBlock->info.window.ekey,
              pResBlock->info.rows, el, pReader->idStr);
  }

  return code;
}

/*
 * Record whether the reader's current result block was composed row by row (true) or describes a
 * whole file block taken as-is (false).
 */
void setComposedBlockFlag(STsdbReader* pReader, bool composed) {
  SReaderStatus* pStatus = &pReader->status;
  pStatus->composedDataBlock = composed;
}

2530 2531 2532 2533 2534 2535 2536 2537
/*
 * Starting cursor position in a delete skyline for the given scan order. Returns:
 *  - 0 when there is no skyline;
 *  - the first entry (0) for ascending scans;
 *  - the last entry for descending scans.
 */
int32_t getInitialDelIndex(const SArray* pDelSkyline, int32_t order) {
  if (pDelSkyline != NULL && !ASCENDING_TRAVERSE(order)) {
    return taosArrayGetSize(pDelSkyline) - 1;
  }

  return 0;
}

dengyihao's avatar
dengyihao 已提交
2538 2539
/*
 * Build the delete skyline of one table (once) and reset its delete cursors.
 *
 * Delete records are collected from three places:
 *  - the del file, when the table has an entry in pReader->pDelIdx;
 *  - the mem buffer (pMemTbData->pHead list);
 *  - the imem buffer (piMemTbData->pHead list).
 * They are then folded into pBlockScanInfo->delSkyline by tsdbBuildDeleteSkyline(). Finally, the
 * mem, imem, file and last-block delete cursors are set to the skyline start for the scan order.
 *
 * Does nothing when the skyline already exists. On failure the skyline is left NULL, so a
 * partially built skyline is never reused by a later call.
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_OUT_OF_MEMORY, or the error from reading the del file or
 * building the skyline.
 */
int32_t initDelSkylineIterator(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader, STbData* pMemTbData,
                               STbData* piMemTbData) {
  if (pBlockScanInfo->delSkyline != NULL) {
    return TSDB_CODE_SUCCESS;
  }

  int32_t code = TSDB_CODE_SUCCESS;
  SArray* pDelData = taosArrayInit(4, sizeof(SDelData));
  if (pDelData == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  SDelFile* pDelFile = pReader->pReadSnap->fs.pDelFile;
  if (pDelFile && taosArrayGetSize(pReader->pDelIdx) > 0) {
    SDelIdx  idx = {.suid = pReader->suid, .uid = pBlockScanInfo->uid};
    SDelIdx* pIdx = taosArraySearch(pReader->pDelIdx, &idx, tCmprDelIdx, TD_EQ);

    if (pIdx != NULL) {
      code = tsdbReadDelData(pReader->pDelFReader, pIdx, pDelData);
      if (code != TSDB_CODE_SUCCESS) {
        goto _err;
      }
    }
  }

  // delete records still held in the mem buffer
  SDelData* p = (pMemTbData != NULL) ? pMemTbData->pHead : NULL;
  for (; p != NULL; p = p->pNext) {
    if (taosArrayPush(pDelData, p) == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto _err;
    }
  }

  // delete records still held in the imem buffer
  p = (piMemTbData != NULL) ? piMemTbData->pHead : NULL;
  for (; p != NULL; p = p->pNext) {
    if (taosArrayPush(pDelData, p) == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto _err;
    }
  }

  if (taosArrayGetSize(pDelData) > 0) {
    pBlockScanInfo->delSkyline = taosArrayInit(4, sizeof(TSDBKEY));
    if (pBlockScanInfo->delSkyline == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto _err;
    }

    code = tsdbBuildDeleteSkyline(pDelData, 0, (int32_t)(taosArrayGetSize(pDelData) - 1), pBlockScanInfo->delSkyline);
    if (code != TSDB_CODE_SUCCESS) {
      // drop the partial skyline: the early return above would otherwise trust it on the next call
      taosArrayDestroy(pBlockScanInfo->delSkyline);
      pBlockScanInfo->delSkyline = NULL;
      goto _err;
    }
  }

  taosArrayDestroy(pDelData);

  int32_t index = getInitialDelIndex(pBlockScanInfo->delSkyline, pReader->order);

  pBlockScanInfo->iter.index = index;
  pBlockScanInfo->iiter.index = index;
  pBlockScanInfo->fileDelIndex = index;
  pBlockScanInfo->lastBlockDelIndex = index;

  return TSDB_CODE_SUCCESS;

_err:
  taosArrayDestroy(pDelData);
  return code;
}

H
Haojun Liao 已提交
2597
/*
 * Key of the next row to be consumed from the table's in-memory buffers (mem and imem). Rows
 * covered by the delete skyline are skipped (via getValidMemRow()).
 *
 * When both buffers have a row, the one that comes first in scan order is returned: the smaller key
 * for ascending scans, the larger key for descending ones; on a tie the mem key wins. When only one
 * buffer has a row, its key is returned. When neither does, the result has ts == TSKEY_INITIAL_VAL.
 */
TSDBKEY getCurrentKeyInBuf(STableBlockScanInfo* pScanInfo, STsdbReader* pReader) {
  TSDBKEY key = {.ts = TSKEY_INITIAL_VAL};
  TSDBKEY ikey = {.ts = TSKEY_INITIAL_VAL};

  TSDBROW* pRow = getValidMemRow(&pScanInfo->iter, pScanInfo->delSkyline, pReader);
  bool     hasKey = (pRow != NULL);
  if (hasKey) {
    key = TSDBROW_KEY(pRow);
  }

  TSDBROW* pIRow = getValidMemRow(&pScanInfo->iiter, pScanInfo->delSkyline, pReader);
  bool     hasIKey = (pIRow != NULL);
  if (hasIKey) {
    ikey = TSDBROW_KEY(pIRow);
  }

  // Track presence explicitly: comparing against the TSKEY_INITIAL_VAL placeholder cannot tell
  // "no mem row" apart from a real key, so an imem-only row could be lost.
  if (!hasIKey) {
    return key;  // mem only, or no buffered rows at all
  }
  if (!hasKey) {
    return ikey;  // imem only
  }

  if (ASCENDING_TRAVERSE(pReader->order)) {
    return (key.ts <= ikey.ts) ? key : ikey;
  }
  return (key.ts >= ikey.ts) ? key : ikey;
}

2615
/*
 * Advance the file-set iterator to the next file set that contains data blocks or last (stt) files.
 * Its block counts are collected into pBlockNum via doLoadFileBlock().
 *
 * pBlockNum is zeroed first and stays zero when no file set is left. Afterwards, if the read
 * snapshot has a delete file, the delete-file reader and its index (pReader->pDelFReader /
 * pReader->pDelIdx) are opened once.
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_OUT_OF_MEMORY, or the error from loading the block index,
 * the file blocks, or the delete file.
 */
static int32_t moveToNextFile(STsdbReader* pReader, SBlockNumber* pBlockNum) {
  SReaderStatus* pStatus = &pReader->status;
  pBlockNum->numOfBlocks = 0;
  pBlockNum->numOfLastFiles = 0;

  size_t  numOfTables = taosHashGetSize(pReader->status.pTableMap);
  SArray* pIndexList = taosArrayInit(numOfTables, sizeof(SBlockIdx));
  if (pIndexList == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  while (1) {
    bool hasNext = filesetIteratorNext(&pStatus->fileIter, pReader);
    if (!hasNext) {  // no data files on disk
      break;
    }

    taosArrayClear(pIndexList);
    int32_t code = doLoadBlockIndex(pReader, pReader->pFileReader, pIndexList);
    if (code != TSDB_CODE_SUCCESS) {
      taosArrayDestroy(pIndexList);
      return code;
    }

    if (taosArrayGetSize(pIndexList) > 0 || pReader->pFileReader->pSet->nSttF > 0) {
      code = doLoadFileBlock(pReader, pIndexList, pBlockNum);
      if (code != TSDB_CODE_SUCCESS) {
        taosArrayDestroy(pIndexList);
        return code;
      }

      if (pBlockNum->numOfBlocks + pBlockNum->numOfLastFiles > 0) {
        break;
      }
    }

    // no blocks in current file, try next files
  }

  taosArrayDestroy(pIndexList);

  if (pReader->pReadSnap != NULL) {
    SDelFile* pDelFile = pReader->pReadSnap->fs.pDelFile;
    if (pReader->pDelFReader == NULL && pDelFile != NULL) {
      int32_t code = tsdbDelFReaderOpen(&pReader->pDelFReader, pDelFile, pReader->pTsdb);
      if (code != TSDB_CODE_SUCCESS) {
        return code;
      }

      pReader->pDelIdx = taosArrayInit(4, sizeof(SDelIdx));
      if (pReader->pDelIdx == NULL) {
        return TSDB_CODE_OUT_OF_MEMORY;
      }

      code = tsdbReadDelIdx(pReader->pDelFReader, pReader->pDelIdx);
      if (code != TSDB_CODE_SUCCESS) {
        // clear the field too, otherwise later code would read or free the destroyed array again
        taosArrayDestroy(pReader->pDelIdx);
        pReader->pDelIdx = NULL;
        return code;
      }
    }
  }

  return TSDB_CODE_SUCCESS;
}

2679
/*
 * Comparator for sorting uint64_t table uids in ascending order.
 * Returns -1, 0 or 1 when *p1 is less than, equal to or greater than *p2.
 */
static int32_t uidComparFunc(const void* p1, const void* p2) {
  const uint64_t lhs = *(const uint64_t*)p1;
  const uint64_t rhs = *(const uint64_t*)p2;

  if (lhs < rhs) {
    return -1;
  }
  return (lhs > rhs) ? 1 : 0;
}
2688

2689
/*
 * Fill pOrderCheckInfo->tableUidList with the uid of every table in pStatus->pTableMap and sort
 * the list in ascending uid order. The list must have room for taosHashGetSize(pStatus->pTableMap)
 * entries. The order argument is currently unused.
 */
static void extractOrderedTableUidList(SUidOrderCheckInfo* pOrderCheckInfo, SReaderStatus* pStatus, int32_t order) {
  int32_t numOfTables = taosHashGetSize(pStatus->pTableMap);

  int32_t n = 0;
  for (void* p = taosHashIterate(pStatus->pTableMap, NULL); p != NULL; p = taosHashIterate(pStatus->pTableMap, p)) {
    STableBlockScanInfo* pScanInfo = *(STableBlockScanInfo**)p;
    pOrderCheckInfo->tableUidList[n++] = pScanInfo->uid;
  }

  taosSort(pOrderCheckInfo->tableUidList, numOfTables, sizeof(uint64_t), uidComparFunc);
}

2703 2704 2705 2706 2707 2708 2709 2710 2711 2712 2713 2714
/*
 * Rewind the last-block delete cursor (lastBlockDelIndex) of every table in pStatus->pTableMap to
 * the start of that table's delete skyline for the given scan order.
 */
static void resetScanBlockLastBlockDelIndex(SReaderStatus* pStatus, int32_t order) {
  for (void* p = taosHashIterate(pStatus->pTableMap, NULL); p != NULL; p = taosHashIterate(pStatus->pTableMap, p)) {
    STableBlockScanInfo* pScanInfo = *(STableBlockScanInfo**)p;
    pScanInfo->lastBlockDelIndex = getInitialDelIndex(pScanInfo->delSkyline, order);
  }
}

2715 2716 2717
/*
 * Prepare the uid-ordered table cursor used when last blocks are scanned table by table.
 *
 * On first use, the sorted uid list is allocated and filled, and pStatus->pTableIter is pointed at
 * the smallest uid. Later calls with pStatus->pTableIter == NULL (the last blocks of a new file are
 * about to be scanned) rewind the cursor to the first uid. If that uid is no longer in the table
 * map, the list is rebuilt from the map's current contents.
 *
 * Returns TSDB_CODE_SUCCESS (also for an empty table map) or TSDB_CODE_OUT_OF_MEMORY.
 */
static int32_t initOrderCheckInfo(SUidOrderCheckInfo* pOrderCheckInfo, STsdbReader* pReader) {
  SReaderStatus* pStatus = &pReader->status;

  int32_t total = taosHashGetSize(pStatus->pTableMap);
  if (total == 0) {
    return TSDB_CODE_SUCCESS;
  }

  if (pOrderCheckInfo->tableUidList == NULL) {
    // first use: build the sorted uid list from scratch
    pOrderCheckInfo->currentIndex = 0;
    pOrderCheckInfo->tableUidList = taosMemoryMalloc(total * sizeof(uint64_t));
    if (pOrderCheckInfo->tableUidList == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }

    extractOrderedTableUidList(pOrderCheckInfo, pStatus, pReader->order);

    uint64_t firstUid = pOrderCheckInfo->tableUidList[0];
    pStatus->pTableIter = taosHashGet(pStatus->pTableMap, &firstUid, sizeof(firstUid));
    return TSDB_CODE_SUCCESS;
  }

  if (pStatus->pTableIter != NULL) {
    return TSDB_CODE_SUCCESS;  // cursor already positioned: keep it
  }

  // it is the last block of a new file: rewind to the first uid
  pOrderCheckInfo->currentIndex = 0;
  uint64_t uid = pOrderCheckInfo->tableUidList[0];
  pStatus->pTableIter = taosHashGet(pStatus->pTableMap, &uid, sizeof(uid));
  if (pStatus->pTableIter != NULL) {
    return TSDB_CODE_SUCCESS;
  }

  // the tableMap has already updated: rebuild the list from its current contents
  void* pNewList = taosMemoryRealloc(pOrderCheckInfo->tableUidList, total * sizeof(uint64_t));
  if (pNewList == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  pOrderCheckInfo->tableUidList = pNewList;
  extractOrderedTableUidList(pOrderCheckInfo, pStatus, pReader->order);

  uid = pOrderCheckInfo->tableUidList[0];
  pStatus->pTableIter = taosHashGet(pStatus->pTableMap, &uid, sizeof(uid));
  return TSDB_CODE_SUCCESS;
}

2758
/*
 * Advance the uid-ordered cursor to the next table. Returns false, leaving pStatus->pTableIter
 * NULL, when the list is exhausted or the next uid is not found in the table map.
 */
static bool moveToNextTable(SUidOrderCheckInfo* pOrderedCheckInfo, SReaderStatus* pStatus) {
  pOrderedCheckInfo->currentIndex++;

  bool exhausted = (pOrderedCheckInfo->currentIndex >= taosHashGetSize(pStatus->pTableMap));
  if (exhausted) {
    pStatus->pTableIter = NULL;
  } else {
    uint64_t uid = pOrderedCheckInfo->tableUidList[pOrderedCheckInfo->currentIndex];
    pStatus->pTableIter = taosHashGet(pStatus->pTableMap, &uid, sizeof(uid));
  }

  return pStatus->pTableIter != NULL;
}

2770
/*
 * Build the next result block from last-block (stt) rows, merged with buffered rows, visiting
 * tables in ascending uid order.
 *
 * Returns as soon as a table yields a non-empty result block. Returns TSDB_CODE_SUCCESS with an
 * empty result once every table is exhausted. Returns the first error from initOrderCheckInfo() or
 * from merging rows.
 */
static int32_t doLoadLastBlockSequentially(STsdbReader* pReader) {
  SReaderStatus*    pStatus = &pReader->status;
  SLastBlockReader* pLastBlockReader = pStatus->fileIter.pLastBlockReader;

  SUidOrderCheckInfo* pOrderedCheckInfo = &pStatus->uidCheckInfo;
  int32_t             code = initOrderCheckInfo(pOrderedCheckInfo, pReader);
  if (code != TSDB_CODE_SUCCESS || (taosHashGetSize(pStatus->pTableMap) == 0)) {
    return code;
  }

  SSDataBlock* pResBlock = pReader->pResBlock;

  while (1) {
    // the ordered cursor may fail to resolve a uid in the table map; never dereference NULL
    if (pStatus->pTableIter == NULL) {
      return TSDB_CODE_SUCCESS;
    }

    // load the last data block of current table
    STableBlockScanInfo* pScanInfo = *(STableBlockScanInfo**)pStatus->pTableIter;

    bool hasVal = initLastBlockReader(pLastBlockReader, pScanInfo, pReader);
    if (!hasVal) {
      bool hasNexTable = moveToNextTable(pOrderedCheckInfo, pStatus);
      if (!hasNexTable) {
        return TSDB_CODE_SUCCESS;
      }

      continue;
    }

    int64_t st = taosGetTimestampUs();
    while (1) {
      bool hasBlockLData = hasDataInLastBlock(pLastBlockReader);

      // no data in last block and block, no need to proceed.
      if (hasBlockLData == false) {
        break;
      }

      // a failed merge may not advance the last-block reader, so report it instead of looping
      code = buildComposedDataBlockImpl(pReader, pScanInfo, &pReader->status.fileBlockData, pLastBlockReader);
      if (code != TSDB_CODE_SUCCESS) {
        return code;
      }

      if (pResBlock->info.rows >= pReader->capacity) {
        break;
      }
    }

    double el = (taosGetTimestampUs() - st) / 1000.0;
    updateComposedBlockInfo(pReader, el, pScanInfo);

    if (pResBlock->info.rows > 0) {
      tsdbDebug("%p uid:%" PRIu64 ", composed data block created, brange:%" PRIu64 "-%" PRIu64
                " rows:%d, elapsed time:%.2f ms %s",
                pReader, pResBlock->info.id.uid, pResBlock->info.window.skey, pResBlock->info.window.ekey,
                pResBlock->info.rows, el, pReader->idStr);
      return TSDB_CODE_SUCCESS;
    }

    // current table is exhausted, let's try next table
    bool hasNexTable = moveToNextTable(pOrderedCheckInfo, pStatus);
    if (!hasNexTable) {
      return TSDB_CODE_SUCCESS;
    }
  }
}

2830
/*
 * Build the next result block for the current file data block of the block iterator. Depending on
 * how the file block relates to the table's buffered rows and last-block rows, one of:
 *  - the file block is loaded and merged row by row (buildComposedDataBlock());
 *  - buffered rows that precede the file block in scan order are returned first
 *    (buildDataBlockFromBuf());
 *  - in a descending scan with pending last-block rows, only last-block rows are returned;
 *  - otherwise the result block is described as the whole file block (row count, uid, window)
 *    without loading its data here, and the block is marked as dumped.
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_INVALID_PARA when the block's table is unknown, or the first
 * error reported while loading or merging data.
 */
static int32_t doBuildDataBlock(STsdbReader* pReader) {
  int32_t   code = TSDB_CODE_SUCCESS;
  SDataBlk* pBlock = NULL;

  SReaderStatus*       pStatus = &pReader->status;
  SDataBlockIter*      pBlockIter = &pStatus->blockIter;
  STableBlockScanInfo* pScanInfo = NULL;
  SFileDataBlockInfo*  pBlockInfo = getCurrentBlockInfo(pBlockIter);
  SLastBlockReader*    pLastBlockReader = pReader->status.fileIter.pLastBlockReader;

  ASSERT(pBlockInfo != NULL);

  if (pBlockInfo != NULL) {
    // taosHashGet() returns NULL for an unknown uid: test the result before dereferencing it
    STableBlockScanInfo** p = taosHashGet(pReader->status.pTableMap, &pBlockInfo->uid, sizeof(pBlockInfo->uid));
    pScanInfo = (p != NULL) ? *p : NULL;
  } else {
    pScanInfo = *pReader->status.pTableIter;
  }

  if (pScanInfo == NULL) {
    tsdbError("failed to get table scan-info, %s", pReader->idStr);
    code = TSDB_CODE_INVALID_PARA;
    return code;
  }

  if (pBlockInfo != NULL) {
    pBlock = getCurrentBlock(pBlockIter);
  }

  initLastBlockReader(pLastBlockReader, pScanInfo, pReader);
  TSDBKEY keyInBuf = getCurrentKeyInBuf(pScanInfo, pReader);

  if (fileBlockShouldLoad(pReader, pBlockInfo, pBlock, pScanInfo, keyInBuf, pLastBlockReader)) {
    code = doLoadFileBlockData(pReader, pBlockIter, &pStatus->fileBlockData, pScanInfo->uid);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    // build composed data block
    code = buildComposedDataBlock(pReader);
  } else if (bufferDataInFileBlockGap(pReader->order, keyInBuf, pBlock)) {
    // data in memory that are earlier than current file block
    // rows in buffer should be less than the file block in asc, greater than file block in desc
    int64_t endKey = (ASCENDING_TRAVERSE(pReader->order)) ? pBlock->minKey.ts : pBlock->maxKey.ts;
    code = buildDataBlockFromBuf(pReader, pScanInfo, endKey);
  } else {
    if (hasDataInLastBlock(pLastBlockReader) && !ASCENDING_TRAVERSE(pReader->order)) {
      // only return the rows in last block
      int64_t tsLast = getCurrentKeyInLastBlock(pLastBlockReader);
      ASSERT(tsLast >= pBlock->maxKey.ts);

      SBlockData* pBData = &pReader->status.fileBlockData;
      tBlockDataReset(pBData);

      SSDataBlock* pResBlock = pReader->pResBlock;
      tsdbDebug("load data in last block firstly, due to desc scan data, %s", pReader->idStr);

      int64_t st = taosGetTimestampUs();

      while (1) {
        bool hasBlockLData = hasDataInLastBlock(pLastBlockReader);

        // no data in last block and block, no need to proceed.
        if (hasBlockLData == false) {
          break;
        }

        // a failed merge may not advance the last-block reader, so report it instead of looping
        code = buildComposedDataBlockImpl(pReader, pScanInfo, &pReader->status.fileBlockData, pLastBlockReader);
        if (code != TSDB_CODE_SUCCESS) {
          return code;
        }

        if (pResBlock->info.rows >= pReader->capacity) {
          break;
        }
      }

      double el = (taosGetTimestampUs() - st) / 1000.0;
      updateComposedBlockInfo(pReader, el, pScanInfo);

      if (pResBlock->info.rows > 0) {
        tsdbDebug("%p uid:%" PRIu64 ", composed data block created, brange:%" PRIu64 "-%" PRIu64
                  " rows:%d, elapsed time:%.2f ms %s",
                  pReader, pResBlock->info.id.uid, pResBlock->info.window.skey, pResBlock->info.window.ekey,
                  pResBlock->info.rows, el, pReader->idStr);
      }
    } else {  // whole block is required, return it directly
      SDataBlockInfo* pInfo = &pReader->pResBlock->info;
      pInfo->rows = pBlock->nRow;
      pInfo->id.uid = pScanInfo->uid;
      pInfo->window = (STimeWindow){.skey = pBlock->minKey.ts, .ekey = pBlock->maxKey.ts};
      setComposedBlockFlag(pReader, false);
      setBlockAllDumped(&pStatus->fBlockDumpInfo, pBlock->maxKey.ts, pReader->order);

      // update the last key for the corresponding table
      pScanInfo->lastKey = ASCENDING_TRAVERSE(pReader->order) ? pInfo->window.ekey : pInfo->window.skey;
    }
  }

  return code;
}

H
Haojun Liao 已提交
2960
/*
 * Fill pReader->pResBlock using only the in-memory buffers (mem/imem), visiting the tables
 * stored in pStatus->pTableMap one after another.
 *
 * Returns as soon as a table contributes at least one row, when every table has been
 * visited, or when building the block fails (that error code is returned).
 */
static int32_t buildBlockFromBufferSequentially(STsdbReader* pReader) {
  SReaderStatus* pStatus = &pReader->status;

  for (;;) {
    // start the table iteration lazily on the first call
    if (pStatus->pTableIter == NULL) {
      pStatus->pTableIter = taosHashIterate(pStatus->pTableMap, NULL);
      if (pStatus->pTableIter == NULL) {
        return TSDB_CODE_SUCCESS;
      }
    }

    STableBlockScanInfo** ppScanInfo = pStatus->pTableIter;
    initMemDataIterator(*ppScanInfo, pReader);

    // no upper bound: take whatever is left in the buffers for this table
    int64_t endKey = (ASCENDING_TRAVERSE(pReader->order)) ? INT64_MAX : INT64_MIN;
    int32_t code = buildDataBlockFromBuf(pReader, *ppScanInfo, endKey);
    if (code != TSDB_CODE_SUCCESS || pReader->pResBlock->info.rows > 0) {
      return code;
    }

    // nothing came out of this table, move on to the next one
    pStatus->pTableIter = taosHashIterate(pStatus->pTableMap, pStatus->pTableIter);
    if (pStatus->pTableIter == NULL) {
      return TSDB_CODE_SUCCESS;
    }
  }
}

2992
// set the correct start position in case of the first/last file block, according to the query time window
2993
void initBlockDumpInfo(STsdbReader* pReader, SDataBlockIter* pBlockIter) {
H
Hongze Cheng 已提交
2994
  SDataBlk* pBlock = getCurrentBlock(pBlockIter);
2995

2996 2997 2998
  SReaderStatus* pStatus = &pReader->status;

  SFileBlockDumpInfo* pDumpInfo = &pStatus->fBlockDumpInfo;
2999 3000 3001

  pDumpInfo->totalRows = pBlock->nRow;
  pDumpInfo->allDumped = false;
3002
  pDumpInfo->rowIndex = ASCENDING_TRAVERSE(pReader->order) ? 0 : pBlock->nRow - 1;
3003 3004
}

3005
/*
 * Advance to the next data file set and prepare the block iterator for it.
 *
 * If no file set with blocks or last (stt) files remains, pReader->status.loadFromFile is
 * cleared so that the caller falls back to the in-memory buffers. Otherwise the block
 * iterator is initialized over the file's data blocks, or reset when only last files exist,
 * and the dump cursor is positioned on the current block.
 *
 * Returns TSDB_CODE_SUCCESS or the error reported by moveToNextFile/initBlockIterator.
 */
static int32_t initForFirstBlockInFile(STsdbReader* pReader, SDataBlockIter* pBlockIter) {
  SBlockNumber num = {0};

  int32_t code = moveToNextFile(pReader, &num);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  // all data files are consumed, try data in buffer
  if (num.numOfBlocks + num.numOfLastFiles == 0) {
    pReader->status.loadFromFile = false;
    return code;
  }

  // initialize the block iterator for a new fileset
  if (num.numOfBlocks > 0) {
    code = initBlockIterator(pReader, pBlockIter, num.numOfBlocks);
    if (code != TSDB_CODE_SUCCESS) {
      // the iterator is not usable; do not read its current block below
      return code;
    }
  } else {  // no block data, only last block exists
    tBlockDataReset(&pReader->status.fileBlockData);
    resetDataBlockIterator(pBlockIter, pReader->order);
  }

  // position the dump cursor on the first row to visit in the current block
  initBlockDumpInfo(pReader, pBlockIter);
  return code;
}

3032
// A file block counts as partially read when it is not fully dumped and the cursor has
// already left the block's starting row (row 0 when ascending, the last row when descending).
static bool fileBlockPartiallyRead(SFileBlockDumpInfo* pDumpInfo, bool asc) {
  if (pDumpInfo->allDumped) {
    return false;
  }

  return asc ? (pDumpInfo->rowIndex > 0) : (pDumpInfo->rowIndex < (pDumpInfo->totalRows - 1));
}

3037
/*
 * Produce the next result block from data files.
 *
 * The reader alternates between two phases:
 *  - last-block phase: rows are pulled from the last (stt) files via
 *    doLoadLastBlockSequentially; once those are exhausted for the current file set, the next
 *    file set is opened with initForFirstBlockInFile;
 *  - data-block phase: the current data block is either finished (buildComposedDataBlock when
 *    it is partially read) or the iterator advances to the next data block, falling back to
 *    the last-block phase when the file set runs out of data blocks.
 *
 * Returns as soon as pReader->pResBlock holds rows, when every file set has been consumed
 * (loadFromFile becomes false), or on the first error.
 */
static int32_t buildBlockFromFiles(STsdbReader* pReader) {
  SReaderStatus*  pStatus = &pReader->status;
  SDataBlockIter* pBlockIter = &pStatus->blockIter;
  bool            asc = ASCENDING_TRAVERSE(pReader->order);
  bool            scanLastBlocks = (pBlockIter->numOfBlocks == 0);

  while (1) {
    int32_t code = TSDB_CODE_SUCCESS;

    if (scanLastBlocks) {
      code = doLoadLastBlockSequentially(pReader);
      if (code != TSDB_CODE_SUCCESS || pReader->pResBlock->info.rows > 0) {
        return code;
      }

      // all data blocks are checked in this last block file, now let's try the next file
      if (pStatus->pTableIter == NULL) {
        code = initForFirstBlockInFile(pReader, pBlockIter);

        // error happens or all the data files are completely checked
        if ((code != TSDB_CODE_SUCCESS) || (pStatus->loadFromFile == false)) {
          return code;
        }

        // this file set has no data blocks: stay in the last-block phase
        if (pBlockIter->numOfBlocks == 0) {
          resetScanBlockLastBlockDelIndex(pStatus, pReader->order);
          continue;
        }
      }

      scanLastBlocks = false;
      code = doBuildDataBlock(pReader);
    } else {
      SFileBlockDumpInfo* pDumpInfo = &pStatus->fBlockDumpInfo;

      if (fileBlockPartiallyRead(pDumpInfo, asc)) {
        code = buildComposedDataBlock(pReader);
      } else {
        // current block is exhausted, pick the next block to work on
        if (pDumpInfo->allDumped) {
          if (blockIteratorNext(&pStatus->blockIter, pReader->idStr)) {
            initBlockDumpInfo(pReader, pBlockIter);
          } else if (pStatus->pCurrentFileset->nSttF > 0) {
            // data blocks of this file set are exhausted: scan its last files next
            tBlockDataReset(&pStatus->fileBlockData);
            resetDataBlockIterator(pBlockIter, pReader->order);
            resetScanBlockLastBlockDelIndex(pStatus, pReader->order);
            scanLastBlocks = true;
            continue;
          } else {
            code = initForFirstBlockInFile(pReader, pBlockIter);

            // error happens or all the data files are completely checked
            if ((code != TSDB_CODE_SUCCESS) || (pStatus->loadFromFile == false)) {
              return code;
            }

            // this file does not have blocks, let's start check the last block file
            if (pBlockIter->numOfBlocks == 0) {
              resetScanBlockLastBlockDelIndex(pStatus, pReader->order);
              scanLastBlocks = true;
              continue;
            }
          }
        }

        code = doBuildDataBlock(pReader);
      }
    }

    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    if (pReader->pResBlock->info.rows > 0) {
      return TSDB_CODE_SUCCESS;
    }
  }
}
H
refact  
Hongze Cheng 已提交
3128

3129 3130
/*
 * Select the tsdb instance that serves a query whose window starts at winSKey.
 *
 * For an rsma vnode the retention levels are probed from level 0 upward. The first level
 * whose keep window, widened by tsQueryRsmaTolerance (milliseconds, converted to the vnode's
 * precision), still reaches winSKey is chosen. A level with keep <= 0 stops the probe, and
 * the level before it is used. The chosen level is stored in *pLevel and logged.
 *
 * A non-rsma vnode always gets VND_TSDB(pVnode), and *pLevel is left untouched.
 */
static STsdb* getTsdbByRetentions(SVnode* pVnode, TSKEY winSKey, SRetention* retentions, const char* idStr,
                                  int8_t* pLevel) {
  if (!VND_IS_RSMA(pVnode)) {
    return VND_TSDB(pVnode);
  }

  int8_t  level = 0;
  int8_t  precision = pVnode->config.tsdbCfg.precision;
  int64_t now = taosGetTimestamp(precision);

  // Scale in 64-bit arithmetic. With a 32-bit `long` (e.g. on Windows), `tolerance * 1000000L`
  // can overflow for nanosecond precision.
  int64_t unitsPerMs = (precision == TSDB_TIME_PRECISION_MILLI)   ? 1LL
                       : (precision == TSDB_TIME_PRECISION_MICRO) ? 1000LL
                                                                  : 1000000LL;
  int64_t offset = (int64_t)tsQueryRsmaTolerance * unitsPerMs;

  for (int8_t i = 0; i < TSDB_RETENTION_MAX; ++i) {
    SRetention* pRetention = retentions + level;
    if (pRetention->keep <= 0) {
      // unconfigured level: fall back to the previous one
      if (level > 0) {
        --level;
      }
      break;
    }
    if ((now - pRetention->keep) <= (winSKey + offset)) {
      break;
    }
    ++level;
  }

  const char* str = (idStr != NULL) ? idStr : "";
  STsdb*      pTsdb = NULL;

  if (level == TSDB_RETENTION_L0) {
    *pLevel = TSDB_RETENTION_L0;
    pTsdb = VND_RSMA0(pVnode);
  } else if (level == TSDB_RETENTION_L1) {
    *pLevel = TSDB_RETENTION_L1;
    pTsdb = VND_RSMA1(pVnode);
  } else {
    *pLevel = TSDB_RETENTION_L2;
    pTsdb = VND_RSMA2(pVnode);
  }

  tsdbDebug("vgId:%d, rsma level %d is selected to query %s", TD_VID(pVnode), (int)(*pLevel), str);
  return pTsdb;
}

H
Haojun Liao 已提交
3173
// Version range visible to a query. A start version of -1 means "from version 0". An end
// version of -1 means "up to the vnode's applied version". Any explicit end version is also
// capped at the applied version. `level` is not used.
SVersionRange getQueryVerRange(SVnode* pVnode, SQueryTableDataCond* pCond, int8_t level) {
  int64_t startVer = pCond->startVersion;
  if (startVer == -1) {
    startVer = 0;
  }

  int64_t endVer = pVnode->state.applied;
  if (pCond->endVersion != -1 && !(pCond->endVersion > pVnode->state.applied)) {
    endVer = pCond->endVersion;
  }

  SVersionRange range = {.minVer = startVer, .maxVer = endVer};
  return range;
}

3187
/*
 * Decide whether the row identified by pKey (timestamp + version) is covered by the delete
 * list pDelList, scanning in the direction given by `order`.
 *
 * *index is a cursor into pDelList that is advanced (ascending) or moved back (descending)
 * across calls so that consecutive keys do not rescan the list from the start.
 *
 * NOTE(review): pDelList appears to be a "skyline" of TSDBKEY points, where a range starts at
 * a point with a non-zero version and a point with version 0 closes it. This is inferred from
 * the "not a consecutive deletion range" checks below; confirm against the skyline builder.
 * NOTE(review): the ascending tail case reads element num - 2, which assumes a non-empty list
 * has at least two points.
 */
bool hasBeenDropped(const SArray* pDelList, int32_t* index, TSDBKEY* pKey, int32_t order, SVersionRange* pVerRange) {
  // An absent or empty delete list cannot cover anything. The code below indexes the list
  // unconditionally, and with num == 0 `num - 1` wraps around, so an empty list would be read
  // out of bounds.
  if (pDelList == NULL || taosArrayGetSize(pDelList) == 0) {
    return false;
  }

  size_t  num = taosArrayGetSize(pDelList);
  bool    asc = ASCENDING_TRAVERSE(order);
  int32_t step = asc ? 1 : -1;

  if (asc) {
    if (*index >= num - 1) {
      TSDBKEY* last = taosArrayGetLast(pDelList);
      ASSERT(pKey->ts >= last->ts);

      if (pKey->ts > last->ts) {
        return false;
      } else if (pKey->ts == last->ts) {
        TSDBKEY* prev = taosArrayGet(pDelList, num - 2);
        return (prev->version >= pKey->version && prev->version <= pVerRange->maxVer &&
                prev->version >= pVerRange->minVer);
      }
    } else {
      TSDBKEY* pCurrent = taosArrayGet(pDelList, *index);
      TSDBKEY* pNext = taosArrayGet(pDelList, (*index) + 1);

      if (pKey->ts < pCurrent->ts) {
        return false;
      }

      if (pCurrent->ts <= pKey->ts && pNext->ts >= pKey->ts && pCurrent->version >= pKey->version &&
          pVerRange->maxVer >= pCurrent->version) {
        return true;
      }

      while (pNext->ts <= pKey->ts && (*index) < num - 1) {
        (*index) += 1;

        if ((*index) < num - 1) {
          pCurrent = taosArrayGet(pDelList, *index);
          pNext = taosArrayGet(pDelList, (*index) + 1);

          // it is not a consecutive deletion range, ignore it
          if (pCurrent->version == 0 && pNext->version > 0) {
            continue;
          }

          if (pCurrent->ts <= pKey->ts && pNext->ts >= pKey->ts && pCurrent->version >= pKey->version &&
              pVerRange->maxVer >= pCurrent->version) {
            return true;
          }
        }
      }

      return false;
    }
  } else {
    if (*index <= 0) {
      TSDBKEY* pFirst = taosArrayGet(pDelList, 0);

      if (pKey->ts < pFirst->ts) {
        return false;
      } else if (pKey->ts == pFirst->ts) {
        return pFirst->version >= pKey->version;
      } else {
        ASSERT(0);
      }
    } else {
      TSDBKEY* pCurrent = taosArrayGet(pDelList, *index);
      TSDBKEY* pPrev = taosArrayGet(pDelList, (*index) - 1);

      if (pKey->ts > pCurrent->ts) {
        return false;
      }

      if (pPrev->ts <= pKey->ts && pCurrent->ts >= pKey->ts && pPrev->version >= pKey->version) {
        return true;
      }

      while (pPrev->ts >= pKey->ts && (*index) > 1) {
        (*index) += step;

        if ((*index) >= 1) {
          pCurrent = taosArrayGet(pDelList, *index);
          pPrev = taosArrayGet(pDelList, (*index) - 1);

          // it is not a consecutive deletion range, ignore it
          if (pCurrent->version > 0 && pPrev->version == 0) {
            continue;
          }

          if (pPrev->ts <= pKey->ts && pCurrent->ts >= pKey->ts && pPrev->version >= pKey->version) {
            return true;
          }
        }
      }

      return false;
    }
  }

  return false;
}

3289
/*
 * Return the first row at or after the iterator's current position that is visible to the
 * reader: inside the reader's time window, inside its version range, and not covered by
 * pDelList. Rows that fail the version/delete checks are skipped. Reaching a row outside the
 * time window, or exhausting the iterator, clears pIter->hasVal and yields NULL.
 */
TSDBROW* getValidMemRow(SIterInfo* pIter, const SArray* pDelList, STsdbReader* pReader) {
  if (!pIter->hasVal) {
    return NULL;
  }

  TSDBROW* pRow = tsdbTbDataIterGet(pIter->iter);
  TSDBKEY  key = {.ts = pRow->pTSRow->ts, .version = pRow->version};

  for (;;) {
    if (outOfTimeWindow(key.ts, &pReader->window)) {
      pIter->hasVal = false;
      return NULL;
    }

    // it is a valid data version
    bool inVerRange = (key.version >= pReader->verRange.minVer) && (key.version <= pReader->verRange.maxVer);
    if (inVerRange && !hasBeenDropped(pDelList, &pIter->index, &key, pReader->order, &pReader->verRange)) {
      return pRow;
    }

    pIter->hasVal = tsdbTbDataIterNext(pIter->iter);
    if (!pIter->hasVal) {
      return NULL;
    }

    pRow = tsdbTbDataIterGet(pIter->iter);
    key = TSDBROW_KEY(pRow);
  }
}
H
Hongze Cheng 已提交
3327

3328 3329
/*
 * Advance the buffer iterator and merge into pMerger every following visible row whose
 * timestamp equals `ts`. Visibility is decided by getValidMemRow. The scan stops at the first
 * visible row with a different timestamp, which is left as the iterator's current row, or
 * when no visible row remains.
 *
 * Returns TSDB_CODE_SUCCESS, or terrno when a row's schema cannot be resolved.
 */
int32_t doMergeRowsInBuf(SIterInfo* pIter, uint64_t uid, int64_t ts, SArray* pDelList, SRowMerger* pMerger,
                         STsdbReader* pReader) {
  for (;;) {
    pIter->hasVal = tsdbTbDataIterNext(pIter->iter);
    if (!pIter->hasVal) {
      return TSDB_CODE_SUCCESS;
    }

    TSDBROW* pRow = getValidMemRow(pIter, pDelList, pReader);
    if (pRow == NULL) {
      return TSDB_CODE_SUCCESS;
    }

    TSDBKEY rowKey = TSDBROW_KEY(pRow);
    if (rowKey.ts != ts) {
      return TSDB_CODE_SUCCESS;
    }

    STSchema* pRowSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, uid);
    if (pRowSchema == NULL) {
      return terrno;
    }

    tRowMergerAdd(pMerger, pRow, pRowSchema);
  }
}

3359
static int32_t doMergeRowsInFileBlockImpl(SBlockData* pBlockData, int32_t rowIndex, int64_t key, SRowMerger* pMerger,
3360
                                          SVersionRange* pVerRange, int32_t step) {
3361
  while (rowIndex < pBlockData->nRow && rowIndex >= 0 && pBlockData->aTSKEY[rowIndex] == key) {
3362
    if (pBlockData->aVersion[rowIndex] > pVerRange->maxVer || pBlockData->aVersion[rowIndex] < pVerRange->minVer) {
3363
      rowIndex += step;
3364 3365 3366 3367 3368 3369 3370 3371 3372 3373 3374 3375 3376 3377 3378 3379
      continue;
    }

    TSDBROW fRow = tsdbRowFromBlockData(pBlockData, rowIndex);
    tRowMerge(pMerger, &fRow);
    rowIndex += step;
  }

  return rowIndex;
}

// Outcome of checking the neighbouring file block while merging rows that share one key.
typedef enum {
  CHECK_FILEBLOCK_CONT = 1,  // neighbour consumed to its boundary: check the next neighbour too
  CHECK_FILEBLOCK_QUIT = 2,  // stop looking at neighbouring blocks
} CHECK_FILEBLOCK_STATE;

H
Hongze Cheng 已提交
3380
static int32_t checkForNeighborFileBlock(STsdbReader* pReader, STableBlockScanInfo* pScanInfo, SDataBlk* pBlock,
3381 3382
                                         SFileDataBlockInfo* pFBlock, SRowMerger* pMerger, int64_t key,
                                         CHECK_FILEBLOCK_STATE* state) {
3383
  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
3384
  SBlockData*         pBlockData = &pReader->status.fileBlockData;
3385

3386
  *state = CHECK_FILEBLOCK_QUIT;
3387
  int32_t step = ASCENDING_TRAVERSE(pReader->order) ? 1 : -1;
3388

H
Haojun Liao 已提交
3389 3390
  bool loadNeighbor = true;
  int32_t code = loadNeighborIfOverlap(pFBlock, pScanInfo, pReader, &loadNeighbor);
3391

H
Haojun Liao 已提交
3392
  if (loadNeighbor && (code == TSDB_CODE_SUCCESS)) {
3393 3394
    pDumpInfo->rowIndex =
        doMergeRowsInFileBlockImpl(pBlockData, pDumpInfo->rowIndex, key, pMerger, &pReader->verRange, step);
H
Haojun Liao 已提交
3395
    if (pDumpInfo->rowIndex >= pDumpInfo->totalRows) {
3396 3397 3398 3399
      *state = CHECK_FILEBLOCK_CONT;
    }
  }

H
Haojun Liao 已提交
3400
  return code;
3401 3402
}

3403 3404
/*
 * Merge into pMerger all file-block rows that share the timestamp of the row at the current
 * dump position. The dump position is first advanced past that row, then same-key rows are
 * merged within the current block. If the block boundary is reached, overlapping neighbour
 * blocks are checked until checkForNeighborFileBlock reports CHECK_FILEBLOCK_QUIT.
 *
 * Returns TSDB_CODE_SUCCESS, or the first error reported while loading a neighbour block.
 */
int32_t doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pScanInfo, STsdbReader* pReader,
                                SRowMerger* pMerger) {
  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;

  bool    asc = ASCENDING_TRAVERSE(pReader->order);
  int64_t key = pBlockData->aTSKEY[pDumpInfo->rowIndex];
  int32_t step = asc ? 1 : -1;

  pDumpInfo->rowIndex += step;
  if ((pDumpInfo->rowIndex <= pBlockData->nRow - 1 && asc) || (pDumpInfo->rowIndex >= 0 && !asc)) {
    pDumpInfo->rowIndex =
        doMergeRowsInFileBlockImpl(pBlockData, pDumpInfo->rowIndex, key, pMerger, &pReader->verRange, step);
  }

  // all rows are consumed, let's try next file block
  if ((pDumpInfo->rowIndex >= pBlockData->nRow && asc) || (pDumpInfo->rowIndex < 0 && !asc)) {
    while (1) {
      CHECK_FILEBLOCK_STATE st;

      SFileDataBlockInfo* pFileBlockInfo = getCurrentBlockInfo(&pReader->status.blockIter);
      SDataBlk*           pCurrentBlock = getCurrentBlock(&pReader->status.blockIter);

      int32_t code = checkForNeighborFileBlock(pReader, pScanInfo, pCurrentBlock, pFileBlockInfo, pMerger, key, &st);
      if (code != TSDB_CODE_SUCCESS) {
        // a neighbour block could not be loaded: report it instead of silently stopping
        return code;
      }

      if (st == CHECK_FILEBLOCK_QUIT) {
        break;
      }
    }
  }

  return TSDB_CODE_SUCCESS;
}

H
Hongze Cheng 已提交
3434
int32_t doMergeRowsInLastBlock(SLastBlockReader* pLastBlockReader, STableBlockScanInfo* pScanInfo, int64_t ts,
3435 3436
                               SRowMerger* pMerger, SVersionRange* pVerRange) {
  while (nextRowFromLastBlocks(pLastBlockReader, pScanInfo, pVerRange)) {
3437 3438
    int64_t next1 = getCurrentKeyInLastBlock(pLastBlockReader);
    if (next1 == ts) {
3439
      TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
3440 3441 3442 3443 3444 3445 3446 3447 3448
      tRowMerge(pMerger, &fRow1);
    } else {
      break;
    }
  }

  return TSDB_CODE_SUCCESS;
}

3449 3450
/*
 * Produce the output row for `pRow`, the current row of one buffer iterator (mem or imem).
 *
 * If the next visible row of the same iterator has a different timestamp, or there is none,
 * *pTSRow is pRow's own row and *freeTSRow is false. Otherwise every visible row sharing the
 * timestamp is merged into a newly built row returned in *pTSRow, and *freeTSRow is set to
 * true so the caller releases it.
 *
 * If pReader->pSchema is not set yet, it is initialized with the schema of pRow.
 * Returns TSDB_CODE_SUCCESS, terrno on schema lookup failure, or the merger's error code.
 */
int32_t doMergeMemTableMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter, SArray* pDelList, STSRow** pTSRow,
                                 STsdbReader* pReader, bool* freeTSRow) {
  TSDBROW* pNextRow = NULL;
  TSDBROW  current = *pRow;

  // if the next valid row has a different timestamp (or there is none), return current row directly
  pIter->hasVal = tsdbTbDataIterNext(pIter->iter);
  if (pIter->hasVal) {
    pNextRow = getValidMemRow(pIter, pDelList, pReader);
  }

  if (pNextRow == NULL || current.pTSRow->ts != pNextRow->pTSRow->ts) {
    *pTSRow = current.pTSRow;
    *freeTSRow = false;
    return TSDB_CODE_SUCCESS;
  }

  SRowMerger merge = {0};

  // get the correct schema for data in memory
  terrno = 0;
  STSchema* pTSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(&current), pReader, uid);
  if (pTSchema == NULL) {
    return terrno;
  }

  if (pReader->pSchema == NULL) {
    pReader->pSchema = pTSchema;
  }

  int32_t code = tRowMergerInit2(&merge, pReader->pSchema, &current, pTSchema);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  // from here on the merger owns resources: every exit goes through _end
  STSchema* pTSchema1 = doGetSchemaForTSRow(TSDBROW_SVERSION(pNextRow), pReader, uid);
  if (pTSchema1 == NULL) {
    code = terrno;
    goto _end;
  }

  tRowMergerAdd(&merge, pNextRow, pTSchema1);

  code = doMergeRowsInBuf(pIter, uid, current.pTSRow->ts, pDelList, &merge, pReader);
  if (code != TSDB_CODE_SUCCESS) {
    goto _end;
  }

  code = tRowMergerGetRow(&merge, pTSRow);
  if (code == TSDB_CODE_SUCCESS) {
    *freeTSRow = true;
  }

_end:
  tRowMergerClear(&merge);
  return code;
}

3517
int32_t doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader,
H
Hongze Cheng 已提交
3518
                           STSRow** pTSRow) {
H
Haojun Liao 已提交
3519 3520
  SRowMerger merge = {0};

3521 3522 3523
  TSDBKEY k = TSDBROW_KEY(pRow);
  TSDBKEY ik = TSDBROW_KEY(piRow);

3524
  if (ASCENDING_TRAVERSE(pReader->order)) {  // ascending order imem --> mem
H
Haojun Liao 已提交
3525
    STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid);
3526

H
Haojun Liao 已提交
3527 3528 3529 3530 3531 3532 3533 3534 3535 3536
    int32_t code = tRowMergerInit(&merge, piRow, pSchema);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    code = doMergeRowsInBuf(&pBlockScanInfo->iiter, pBlockScanInfo->uid, ik.ts, pBlockScanInfo->delSkyline, &merge,
                            pReader);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }
3537

3538
    tRowMerge(&merge, pRow);
H
Haojun Liao 已提交
3539 3540 3541 3542 3543 3544
    code =
        doMergeRowsInBuf(&pBlockScanInfo->iter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge, pReader);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

3545
  } else {
H
Haojun Liao 已提交
3546
    STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid);
3547

H
Haojun Liao 已提交
3548
    int32_t code = tRowMergerInit(&merge, pRow, pSchema);
3549
    if (code != TSDB_CODE_SUCCESS || merge.pTSchema == NULL) {
H
Haojun Liao 已提交
3550 3551 3552 3553 3554 3555 3556 3557
      return code;
    }

    code =
        doMergeRowsInBuf(&pBlockScanInfo->iter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge, pReader);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }
3558 3559

    tRowMerge(&merge, piRow);
H
Haojun Liao 已提交
3560 3561 3562 3563 3564
    code = doMergeRowsInBuf(&pBlockScanInfo->iiter, pBlockScanInfo->uid, ik.ts, pBlockScanInfo->delSkyline, &merge,
                            pReader);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }
3565
  }
3566

3567
  int32_t code = tRowMergerGetRow(&merge, pTSRow);
H
Haojun Liao 已提交
3568 3569
  tRowMergerClear(&merge);

3570
  return code;
3571 3572
}

3573 3574
/*
 * Fetch the next output row for a table from its mem and imem buffers.
 *
 * Candidates at or beyond endKey in the scan direction are ignored. When both buffers offer a
 * row, the one that comes first in scan order is emitted, and equal timestamps are merged
 * across mem and imem. *pTSRow stays untouched when neither buffer has a candidate.
 * *freeTSRow tells the caller whether *pTSRow was newly allocated.
 */
int32_t tsdbGetNextRowInMem(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader, STSRow** pTSRow, int64_t endKey,
                            bool* freeTSRow) {
  TSDBROW* pRow = getValidMemRow(&pBlockScanInfo->iter, pBlockScanInfo->delSkyline, pReader);
  TSDBROW* piRow = getValidMemRow(&pBlockScanInfo->iiter, pBlockScanInfo->delSkyline, pReader);
  SArray*  pDelList = pBlockScanInfo->delSkyline;
  uint64_t uid = pBlockScanInfo->uid;
  bool     asc = ASCENDING_TRAVERSE(pReader->order);

  // drop candidates that fall at or past endKey in the scan direction
  if (pBlockScanInfo->iter.hasVal) {
    TSDBKEY memKey = TSDBROW_KEY(pRow);
    if (asc ? (memKey.ts >= endKey) : (memKey.ts <= endKey)) {
      pRow = NULL;
    }
  }

  if (pBlockScanInfo->iiter.hasVal) {
    TSDBKEY imemKey = TSDBROW_KEY(piRow);
    if (asc ? (imemKey.ts >= endKey) : (imemKey.ts <= endKey)) {
      piRow = NULL;
    }
  }

  bool hasMem = pBlockScanInfo->iter.hasVal && pRow != NULL;
  bool hasIMem = pBlockScanInfo->iiter.hasVal && piRow != NULL;

  if (hasMem && hasIMem) {
    TSDBKEY k = TSDBROW_KEY(pRow);
    TSDBKEY ik = TSDBROW_KEY(piRow);

    if (ik.ts == k.ts) {
      *freeTSRow = true;
      return doMergeMemIMemRows(pRow, piRow, pBlockScanInfo, pReader, pTSRow);
    }

    bool imemComesFirst = asc ? (ik.ts < k.ts) : (ik.ts > k.ts);
    if (imemComesFirst) {
      return doMergeMemTableMultiRows(piRow, uid, &pBlockScanInfo->iiter, pDelList, pTSRow, pReader, freeTSRow);
    }
    return doMergeMemTableMultiRows(pRow, uid, &pBlockScanInfo->iter, pDelList, pTSRow, pReader, freeTSRow);
  }

  if (hasMem) {
    return doMergeMemTableMultiRows(pRow, pBlockScanInfo->uid, &pBlockScanInfo->iter, pDelList, pTSRow, pReader,
                                    freeTSRow);
  }

  if (hasIMem) {
    return doMergeMemTableMultiRows(piRow, uid, &pBlockScanInfo->iiter, pDelList, pTSRow, pReader, freeTSRow);
  }

  return TSDB_CODE_SUCCESS;
}

H
Hongze Cheng 已提交
3630 3631
/*
 * Append one in-memory row (pTSRow) to the end of pBlock.
 *
 * Requested columns (pReader->suppInfo, walked in column-id order) are matched against the
 * row's schema. Columns missing from that schema are filled with NULL. The primary timestamp
 * column, when requested first, is copied straight from pTSRow->ts. On success the block's
 * row count grows by one and pScanInfo->lastKey becomes the row's timestamp.
 *
 * Returns TSDB_CODE_SUCCESS, or terrno when the row's schema cannot be resolved. In that case
 * nothing is written to pBlock.
 */
int32_t doAppendRowFromTSRow(SSDataBlock* pBlock, STsdbReader* pReader, STSRow* pTSRow,
                             STableBlockScanInfo* pScanInfo) {
  int32_t outputRowIndex = pBlock->info.rows;
  int64_t uid = pScanInfo->uid;

  SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo;
  STSchema*           pSchema = doGetSchemaForTSRow(pTSRow->sver, pReader, uid);
  if (pSchema == NULL) {
    // the schema is dereferenced below; bail out before touching the block
    return terrno;
  }

  SColVal colVal = {0};
  int32_t i = 0, j = 0;

  if (pSupInfo->colId[i] == PRIMARYKEY_TIMESTAMP_COL_ID) {
    SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, pSupInfo->slotId[i]);
    ((int64_t*)pColData->pData)[outputRowIndex] = pTSRow->ts;
    i += 1;
  }

  while (i < pSupInfo->numOfCols && j < pSchema->numOfCols) {
    col_id_t colId = pSupInfo->colId[i];

    if (colId == pSchema->columns[j].colId) {
      SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, pSupInfo->slotId[i]);

      tTSRowGetVal(pTSRow, pSchema, j, &colVal);
      doCopyColVal(pColInfoData, outputRowIndex, i, &colVal, pSupInfo);
      i += 1;
      j += 1;
    } else if (colId < pSchema->columns[j].colId) {
      // requested column is absent from this row's schema
      SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, pSupInfo->slotId[i]);

      colDataAppendNULL(pColInfoData, outputRowIndex);
      i += 1;
    } else if (colId > pSchema->columns[j].colId) {
      j += 1;
    }
  }

  // set null value since current column does not exist in the "pSchema"
  while (i < pSupInfo->numOfCols) {
    SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, pSupInfo->slotId[i]);
    colDataAppendNULL(pColInfoData, outputRowIndex);
    i += 1;
  }

  pBlock->info.dataLoad = 1;
  pBlock->info.rows += 1;
  pScanInfo->lastKey = pTSRow->ts;
  return TSDB_CODE_SUCCESS;
}

H
Hongze Cheng 已提交
3680 3681
/*
 * Append row `rowIndex` of a file data block to the end of pResBlock.
 *
 * Requested output columns and the block's stored columns are both walked in column-id order.
 * Stored columns that are not requested are skipped. Requested columns missing from the block
 * get NULL. The primary timestamp column, when requested first, is copied from the block's key
 * array. Always returns TSDB_CODE_SUCCESS.
 */
int32_t doAppendRowFromFileBlock(SSDataBlock* pResBlock, STsdbReader* pReader, SBlockData* pBlockData,
                                 int32_t rowIndex) {
  SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo;
  int32_t             outputRowIndex = pResBlock->info.rows;
  int32_t             outIdx = 0;
  int32_t             inIdx = 0;

  if (pSupInfo->colId[outIdx] == PRIMARYKEY_TIMESTAMP_COL_ID) {
    SColumnInfoData* pTsCol = taosArrayGet(pResBlock->pDataBlock, pSupInfo->slotId[outIdx]);
    ((int64_t*)pTsCol->pData)[outputRowIndex] = pBlockData->aTSKEY[rowIndex];
    outIdx++;
  }

  SColVal cv = {0};
  int32_t numOfInputCols = pBlockData->nColData;
  int32_t numOfOutputCols = pSupInfo->numOfCols;

  while (outIdx < numOfOutputCols && inIdx < numOfInputCols) {
    SColData* pData = tBlockDataGetColDataByIdx(pBlockData, inIdx);

    // stored column that was not requested
    if (pData->cid < pSupInfo->colId[outIdx]) {
      inIdx++;
      continue;
    }

    SColumnInfoData* pCol = TARRAY_GET_ELEM(pResBlock->pDataBlock, pSupInfo->slotId[outIdx]);
    if (pData->cid == pSupInfo->colId[outIdx]) {
      tColDataGetValue(pData, rowIndex, &cv);
      doCopyColVal(pCol, outputRowIndex, outIdx, &cv, pSupInfo);
      inIdx++;
    } else if (pData->cid > pCol->info.colId) {
      // the specified column does not exist in file block, fill with null data
      colDataAppendNULL(pCol, outputRowIndex);
    }

    outIdx++;
  }

  // requested columns past the last stored column are absent from this block
  for (; outIdx < numOfOutputCols; ++outIdx) {
    SColumnInfoData* pCol = taosArrayGet(pResBlock->pDataBlock, pSupInfo->slotId[outIdx]);
    colDataAppendNULL(pCol, outputRowIndex);
  }

  pResBlock->info.dataLoad = 1;
  pResBlock->info.rows += 1;
  return TSDB_CODE_SUCCESS;
}

3727 3728
/*
 * Append rows for one table from its mem/imem buffers to pReader->pResBlock until the buffers
 * have no row before endKey, both iterators are exhausted, or the block holds `capacity` rows.
 *
 * Merged rows handed out by tsdbGetNextRowInMem (freeTSRow == true) are released here.
 * Returns TSDB_CODE_SUCCESS, or the first error raised while fetching or appending a row.
 */
int32_t buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, int64_t endKey, int32_t capacity,
                                  STsdbReader* pReader) {
  SSDataBlock* pBlock = pReader->pResBlock;

  while (1) {
    STSRow* pTSRow = NULL;
    bool    freeTSRow = false;

    int32_t code = tsdbGetNextRowInMem(pBlockScanInfo, pReader, &pTSRow, endKey, &freeTSRow);
    if (code != TSDB_CODE_SUCCESS) {
      if (freeTSRow) {
        taosMemoryFree(pTSRow);
      }
      return code;
    }

    if (pTSRow == NULL) {
      break;
    }

    code = doAppendRowFromTSRow(pBlock, pReader, pTSRow, pBlockScanInfo);

    if (freeTSRow) {
      taosMemoryFree(pTSRow);
    }

    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    // no data in buffer, return immediately
    if (!(pBlockScanInfo->iter.hasVal || pBlockScanInfo->iiter.hasVal)) {
      break;
    }

    if (pBlock->info.rows >= capacity) {
      break;
    }
  }

  return TSDB_CODE_SUCCESS;
}
H
Hongze Cheng 已提交
3757

3758 3759
// TODO refactor: with createDataBlockScanInfo
/*
 * Replace the set of tables scanned by pReader with the first `num` entries of pTableList.
 *
 * pTableList is read as an array of STableKeyInfo; only the uid of each entry is used. Every existing
 * STableBlockScanInfo in the table map is cleared (clearBlockScanInfo) and the map is emptied. Then slot i of
 * pReader->blockInfoBuf is reused for table i and registered in the map under its uid.
 *
 * NOTE(review): only `uid` is assigned to a reused slot. The remaining fields keep whatever clearBlockScanInfo()
 * left in them; confirm that is a valid initial state for a new table.
 * NOTE(review): the code requires num <= the previous number of tables (see the ASSERT). If ASSERT is compiled
 * out, larger `num` values reach getPosInBlockInfoBuf() with indexes it may not have storage for.
 *
 * Returns TSDB_CODE_SUCCESS.
 */
int32_t tsdbSetTableList(STsdbReader* pReader, const void* pTableList, int32_t num) {
  int32_t size = taosHashGetSize(pReader->status.pTableMap);

  STableBlockScanInfo** p = NULL;
  while ((p = taosHashIterate(pReader->status.pTableMap, p)) != NULL) {
    clearBlockScanInfo(*p);
  }

  // todo handle the case where size is less than the value of num
  ASSERT(size >= num);

  taosHashClear(pReader->status.pTableMap);

  // the list is only read here: keep it const instead of casting the qualifier away
  const STableKeyInfo* pList = (const STableKeyInfo*)pTableList;
  for (int32_t i = 0; i < num; ++i) {
    STableBlockScanInfo* pInfo = getPosInBlockInfoBuf(&pReader->blockInfoBuf, i);
    pInfo->uid = pList[i].uid;
    taosHashPut(pReader->status.pTableMap, &pInfo->uid, sizeof(uint64_t), &pInfo, POINTER_BYTES);
  }

  // use the tsdb module's success code, like every other function in this file
  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
3782 3783 3784 3785 3786 3787
/* Return the index handle exposed by the meta layer (metaGetIdx), or NULL when pMeta is NULL. */
void* tsdbGetIdx(SMeta* pMeta) { return (pMeta == NULL) ? NULL : metaGetIdx(pMeta); }
dengyihao's avatar
dengyihao 已提交
3788

dengyihao's avatar
dengyihao 已提交
3789 3790 3791 3792 3793 3794
/* Return the inverted-index handle exposed by the meta layer (metaGetIvtIdx), or NULL when pMeta is NULL. */
void* tsdbGetIvtIdx(SMeta* pMeta) { return (pMeta == NULL) ? NULL : metaGetIvtIdx(pMeta); }
L
Liu Jicong 已提交
3795

H
Hongze Cheng 已提交
3796
/* Upper bound of the data-version range (verRange.maxVer) the reader was created with. */
uint64_t getReaderMaxVersion(STsdbReader* pReader) {
  uint64_t maxVer = pReader->verRange.maxVer;
  return maxVer;
}
3797

3798 3799 3800 3801 3802 3803 3804 3805 3806 3807 3808 3809 3810 3811 3812
/*
 * Position pReader at the start of its scan.
 *
 * The file-set iterator is initialized from the read snapshot, and the block iterator is reset for the reader's
 * order. When the snapshot contains no data files, the reader is switched to in-memory-only scanning and
 * TSDB_CODE_SUCCESS is returned. Otherwise the result of initForFirstBlockInFile() is returned.
 */
static int32_t doOpenReaderImpl(STsdbReader* pReader) {
  SReaderStatus* pStatus = &pReader->status;

  initFilesetIterator(&pStatus->fileIter, pReader->pReadSnap->fs.aDFileSet, pReader);
  resetDataBlockIterator(&pStatus->blockIter, pReader->order);

  if (pStatus->fileIter.numOfFiles != 0) {
    return initForFirstBlockInFile(pReader, &pStatus->blockIter);
  }

  // nothing on disk: only the in-memory buffers are left to scan
  pStatus->loadFromFile = false;
  return TSDB_CODE_SUCCESS;
}

H
refact  
Hongze Cheng 已提交
3813
// ====================================== EXPOSED APIs ======================================
3814
int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, void* pTableList, int32_t numOfTables,
H
Haojun Liao 已提交
3815
                       SSDataBlock* pResBlock, STsdbReader** ppReader, const char* idstr) {
3816 3817 3818 3819 3820 3821
  STimeWindow window = pCond->twindows;
  if (pCond->type == TIMEWINDOW_RANGE_EXTERNAL) {
    pCond->twindows.skey += 1;
    pCond->twindows.ekey -= 1;
  }

H
Haojun Liao 已提交
3822 3823 3824 3825 3826 3827 3828 3829
  int32_t capacity = 0;
  if (pResBlock == NULL) {
    capacity = 4096;
  } else {
    capacity = pResBlock->info.capacity;
  }

  int32_t code = tsdbReaderCreate(pVnode, pCond, ppReader, capacity, pResBlock, idstr);
3830
  if (code != TSDB_CODE_SUCCESS) {
H
Haojun Liao 已提交
3831 3832
    goto _err;
  }
H
Hongze Cheng 已提交
3833

3834
  // check for query time window
H
Haojun Liao 已提交
3835
  STsdbReader* pReader = *ppReader;
3836
  if (isEmptyQueryTimeWindow(&pReader->window) && pCond->type == TIMEWINDOW_RANGE_CONTAINED) {
H
Haojun Liao 已提交
3837 3838 3839
    tsdbDebug("%p query window not overlaps with the data set, no result returned, %s", pReader, pReader->idStr);
    return TSDB_CODE_SUCCESS;
  }
H
Hongze Cheng 已提交
3840

3841 3842
  if (pCond->type == TIMEWINDOW_RANGE_EXTERNAL) {
    // update the SQueryTableDataCond to create inner reader
3843
    int32_t order = pCond->order;
3844
    if (order == TSDB_ORDER_ASC) {
3845
      pCond->twindows.ekey = window.skey;
3846 3847 3848
      pCond->twindows.skey = INT64_MIN;
      pCond->order = TSDB_ORDER_DESC;
    } else {
3849
      pCond->twindows.skey = window.ekey;
3850 3851 3852 3853
      pCond->twindows.ekey = INT64_MAX;
      pCond->order = TSDB_ORDER_ASC;
    }

3854
    // here we only need one more row, so the capacity is set to be ONE.
H
Haojun Liao 已提交
3855
    code = tsdbReaderCreate(pVnode, pCond, &pReader->innerReader[0], 1, pResBlock, idstr);
3856 3857 3858 3859 3860
    if (code != TSDB_CODE_SUCCESS) {
      goto _err;
    }

    if (order == TSDB_ORDER_ASC) {
3861
      pCond->twindows.skey = window.ekey;
3862
      pCond->twindows.ekey = INT64_MAX;
3863
    } else {
3864
      pCond->twindows.skey = INT64_MIN;
3865
      pCond->twindows.ekey = window.ekey;
3866
    }
3867 3868
    pCond->order = order;

H
Haojun Liao 已提交
3869
    code = tsdbReaderCreate(pVnode, pCond, &pReader->innerReader[1], 1, pResBlock, idstr);
3870 3871 3872 3873 3874
    if (code != TSDB_CODE_SUCCESS) {
      goto _err;
    }
  }

H
Haojun Liao 已提交
3875
  // NOTE: the endVersion in pCond is the data version not schema version, so pCond->endVersion is not correct here.
3876 3877
  //  no valid error code set in metaGetTbTSchema, so let's set the error code here.
  //  we should proceed in case of tmq processing.
3878
  if (pCond->suid != 0) {
3879
    pReader->pSchema = metaGetTbTSchema(pReader->pTsdb->pVnode->pMeta, pReader->suid, -1, 1);
H
Haojun Liao 已提交
3880
    if (pReader->pSchema == NULL) {
H
Haojun Liao 已提交
3881
      tsdbError("failed to get table schema, suid:%" PRIu64 ", ver:-1, %s", pReader->suid, pReader->idStr);
H
Haojun Liao 已提交
3882
    }
3883 3884
  } else if (numOfTables > 0) {
    STableKeyInfo* pKey = pTableList;
3885
    pReader->pSchema = metaGetTbTSchema(pReader->pTsdb->pVnode->pMeta, pKey->uid, -1, 1);
H
Haojun Liao 已提交
3886
    if (pReader->pSchema == NULL) {
H
Haojun Liao 已提交
3887
      tsdbError("failed to get table schema, uid:%" PRIu64 ", ver:-1, %s", pKey->uid, pReader->idStr);
H
Haojun Liao 已提交
3888
    }
3889 3890
  }

3891
  if (pReader->pSchema != NULL) {
H
Haojun Liao 已提交
3892 3893 3894 3895
    code = updateBlockSMAInfo(pReader->pSchema, &pReader->suppInfo);
    if (code != TSDB_CODE_SUCCESS) {
      goto _err;
    }
3896
  }
3897

3898
  STsdbReader* p = (pReader->innerReader[0] != NULL) ? pReader->innerReader[0] : pReader;
H
Haojun Liao 已提交
3899
  pReader->status.pTableMap = createDataBlockScanInfo(p, &pReader->blockInfoBuf, pTableList, numOfTables);
H
Haojun Liao 已提交
3900 3901
  if (pReader->status.pTableMap == NULL) {
    *ppReader = NULL;
S
Shengliang Guan 已提交
3902
    code = TSDB_CODE_OUT_OF_MEMORY;
H
Haojun Liao 已提交
3903 3904
    goto _err;
  }
H
Hongze Cheng 已提交
3905

3906
  if (numOfTables > 0) {
H
Haojun Liao 已提交
3907
    code = tsdbTakeReadSnap(pReader->pTsdb, &pReader->pReadSnap, pReader->idStr);
3908 3909 3910
    if (code != TSDB_CODE_SUCCESS) {
      goto _err;
    }
H
Hongze Cheng 已提交
3911

3912
    if (pReader->type == TIMEWINDOW_RANGE_CONTAINED) {
H
Haojun Liao 已提交
3913 3914
      code = doOpenReaderImpl(pReader);
      if (code != TSDB_CODE_SUCCESS) {
K
kailixu 已提交
3915
        goto _err;
3916
      }
3917
    } else {
H
Haojun Liao 已提交
3918 3919
      STsdbReader* pPrevReader = pReader->innerReader[0];
      STsdbReader* pNextReader = pReader->innerReader[1];
3920

H
Haojun Liao 已提交
3921 3922 3923 3924 3925 3926 3927 3928 3929 3930 3931 3932
      // we need only one row
      pPrevReader->capacity = 1;
      pPrevReader->status.pTableMap = pReader->status.pTableMap;
      pPrevReader->pSchema = pReader->pSchema;
      pPrevReader->pMemSchema = pReader->pMemSchema;
      pPrevReader->pReadSnap = pReader->pReadSnap;

      pNextReader->capacity = 1;
      pNextReader->status.pTableMap = pReader->status.pTableMap;
      pNextReader->pSchema = pReader->pSchema;
      pNextReader->pMemSchema = pReader->pMemSchema;
      pNextReader->pReadSnap = pReader->pReadSnap;
3933

H
Haojun Liao 已提交
3934
      code = doOpenReaderImpl(pPrevReader);
3935
      if (code != TSDB_CODE_SUCCESS) {
K
kailixu 已提交
3936
        goto _err;
3937
      }
3938 3939 3940
    }
  }

3941
  tsdbDebug("%p total numOfTable:%d in this query %s", pReader, numOfTables, pReader->idStr);
H
Hongze Cheng 已提交
3942
  return code;
H
Hongze Cheng 已提交
3943

3944
_err:
H
Haojun Liao 已提交
3945
  tsdbError("failed to create data reader, code:%s %s", tstrerror(code), idstr);
3946
  tsdbReaderClose(pReader);
H
Hongze Cheng 已提交
3947
  return code;
3948
}
H
refact  
Hongze Cheng 已提交
3949 3950

/*
 * Release pReader and everything it owns: inner readers, SMA buffers, result block (when owned), table scan infos,
 * file readers, delete index, read snapshot, last-block reader and schemas. It also logs the reader's I/O cost
 * summary. NULL is accepted and ignored.
 */
void tsdbReaderClose(STsdbReader* pReader) {
  if (pReader == NULL) {
    return;
  }

  // The inner readers of an external-range reader borrow the table map, read snapshot and schemas of pReader
  // (assigned in tsdbReaderOpen). Detach them first so these are released only once, below. Each inner reader is
  // checked on its own: when tsdbReaderOpen fails half-way, innerReader[0] can exist while innerReader[1] is NULL.
  STsdbReader* pPrev = pReader->innerReader[0];
  STsdbReader* pNext = pReader->innerReader[1];

  if (pPrev != NULL) {
    pPrev->status.pTableMap = NULL;
    pPrev->pReadSnap = NULL;
    pPrev->pSchema = NULL;
    pPrev->pMemSchema = NULL;
  }

  if (pNext != NULL) {
    pNext->status.pTableMap = NULL;
    pNext->pReadSnap = NULL;
    pNext->pSchema = NULL;
    pNext->pMemSchema = NULL;
  }

  tsdbReaderClose(pPrev);  // NULL-safe
  tsdbReaderClose(pNext);

  SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo;

  taosArrayDestroy(pSupInfo->pColAgg);
  for (int32_t i = 0; i < pSupInfo->numOfCols; ++i) {
    if (pSupInfo->buildBuf[i] != NULL) {
      taosMemoryFreeClear(pSupInfo->buildBuf[i]);
    }
  }

  if (pReader->freeBlock) {
    pReader->pResBlock = blockDataDestroy(pReader->pResBlock);
  }

  taosMemoryFree(pSupInfo->colId);
  tBlockDataDestroy(&pReader->status.fileBlockData, true);
  cleanupDataBlockIterator(&pReader->status.blockIter);

  size_t numOfTables = taosHashGetSize(pReader->status.pTableMap);
  if (pReader->status.pTableMap != NULL) {
    destroyAllBlockScanInfo(pReader->status.pTableMap);
    clearBlockScanInfoBuf(&pReader->blockInfoBuf);
  }

  if (pReader->pFileReader != NULL) {
    tsdbDataFReaderClose(&pReader->pFileReader);
  }

  if (pReader->pDelFReader != NULL) {
    tsdbDelFReaderClose(&pReader->pDelFReader);
  }

  if (pReader->pDelIdx != NULL) {
    taosArrayDestroy(pReader->pDelIdx);
    pReader->pDelIdx = NULL;
  }

  tsdbUntakeReadSnap(pReader->pTsdb, pReader->pReadSnap, pReader->idStr);

  taosMemoryFree(pReader->status.uidCheckInfo.tableUidList);
  SIOCostSummary* pCost = &pReader->cost;

  SFilesetIter* pFilesetIter = &pReader->status.fileIter;
  if (pFilesetIter->pLastBlockReader != NULL) {
    SLastBlockReader* pLReader = pFilesetIter->pLastBlockReader;
    tMergeTreeClose(&pLReader->mergeTree);

    // collect the last-block load statistics before the load info is destroyed
    getLastBlockLoadInfo(pLReader->pInfo, &pCost->lastBlockLoad, &pCost->lastBlockLoadTime);

    pLReader->pInfo = destroyLastBlockLoadInfo(pLReader->pInfo);
    taosMemoryFree(pLReader);
  }

  tsdbDebug("%p :io-cost summary: head-file:%" PRIu64 ", head-file time:%.2f ms, SMA:%" PRId64
            " SMA-time:%.2f ms, fileBlocks:%" PRId64
            ", fileBlocks-load-time:%.2f ms, "
            "build in-memory-block-time:%.2f ms, lastBlocks:%" PRId64
            ", lastBlocks-time:%.2f ms, composed-blocks:%" PRId64
            ", composed-blocks-time:%.2fms, STableBlockScanInfo size:%.2f Kb, creatTime:%.2f ms, %s",
            pReader, pCost->headFileLoad, pCost->headFileLoadTime, pCost->smaDataLoad, pCost->smaLoadTime,
            pCost->numOfBlocks, pCost->blockLoadTime, pCost->buildmemBlock, pCost->lastBlockLoad,
            pCost->lastBlockLoadTime, pCost->composedBlocks, pCost->buildComposedBlockTime,
            numOfTables * sizeof(STableBlockScanInfo) / 1000.0, pCost->createScanInfoList, pReader->idStr);

  taosMemoryFree(pReader->idStr);

  // compare the two schema pointers before either is freed: reading a freed pointer's value is undefined
  if (pReader->pMemSchema != pReader->pSchema) {
    taosMemoryFree(pReader->pMemSchema);
  }
  taosMemoryFree(pReader->pSchema);

  taosMemoryFreeClear(pReader);
}

4047
/*
 * Produce the next result block of pReader into pReader->pResBlock.
 *
 * The result block is cleaned first. With an empty table map there is nothing to read. While the reader is still
 * loading from files, a file-based block is built first. If that fails, false is returned. If it yields no rows,
 * the in-memory buffers are tried instead.
 *
 * Returns true when pResBlock holds at least one row.
 */
static bool doTsdbNextDataBlock(STsdbReader* pReader) {
  // discard the rows of the previously returned block
  SSDataBlock* pResBlock = pReader->pResBlock;
  blockDataCleanup(pResBlock);

  SReaderStatus* pStatus = &pReader->status;
  if (taosHashGetSize(pStatus->pTableMap) == 0) {
    return false;
  }

  if (pStatus->loadFromFile) {
    if (buildBlockFromFiles(pReader) != TSDB_CODE_SUCCESS) {
      return false;
    }
    if (pResBlock->info.rows > 0) {
      return true;
    }
  }

  // files produced nothing (or there are none): fall back to the in-memory buffers
  buildBlockFromBufferSequentially(pReader);
  return pResBlock->info.rows > 0;
}

4075 4076 4077 4078 4079
/*
 * Advance the reader to its next result block.
 *
 * For an external-range reader, the scan runs in three steps tracked by pReader->step:
 *   1. innerReader[0] (step becomes EXTERNAL_ROWS_PREV);
 *   2. the main reader (EXTERNAL_ROWS_MAIN);
 *   3. innerReader[1] (EXTERNAL_ROWS_NEXT).
 * A plain reader only runs the main scan.
 *
 * Returns true when a block with rows is available. Returns false when the scan is exhausted or an error occurred;
 * on error, terrno holds the error code.
 */
bool tsdbNextDataBlock(STsdbReader* pReader) {
  if (isEmptyQueryTimeWindow(&pReader->window)) {
    return false;
  }

  if (pReader->innerReader[0] != NULL && pReader->step == 0) {
    bool ret = doTsdbNextDataBlock(pReader->innerReader[0]);
    pReader->step = EXTERNAL_ROWS_PREV;
    if (ret) {
      return ret;
    }
  }

  if (pReader->step == EXTERNAL_ROWS_PREV) {
    // prepare for the main scan
    int32_t code = doOpenReaderImpl(pReader);
    resetAllDataBlockScanInfo(pReader->status.pTableMap, pReader->innerReader[0]->window.ekey);

    if (code != TSDB_CODE_SUCCESS) {
      // the return type is bool: returning `code` would turn any error into "block available"
      terrno = code;
      return false;
    }

    pReader->step = EXTERNAL_ROWS_MAIN;
  }

  bool ret = doTsdbNextDataBlock(pReader);
  if (ret) {
    return ret;
  }

  if (pReader->innerReader[1] != NULL && pReader->step == EXTERNAL_ROWS_MAIN) {
    // prepare for the next row scan
    int32_t code = doOpenReaderImpl(pReader->innerReader[1]);
    resetAllDataBlockScanInfo(pReader->innerReader[1]->status.pTableMap, pReader->window.ekey);
    if (code != TSDB_CODE_SUCCESS) {
      terrno = code;
      return false;
    }

    bool ret1 = doTsdbNextDataBlock(pReader->innerReader[1]);
    pReader->step = EXTERNAL_ROWS_NEXT;
    if (ret1) {
      return ret1;
    }
  }

  return false;
}

H
Haojun Liao 已提交
4123 4124
/* Copy the row count, table uid and time window of pReader's current result block into the out-parameters. */
static void setBlockInfo(const STsdbReader* pReader, int32_t* rows, uint64_t* uid, STimeWindow* pWindow) {
  const SSDataBlock* pResBlock = pReader->pResBlock;

  *pWindow = pResBlock->info.window;
  *uid = pResBlock->info.id.uid;
  *rows = pResBlock->info.rows;
}

H
Haojun Liao 已提交
4129
/*
 * Report rows / uid / time window of the current result block.
 *
 * For an external-range reader, the block comes from the reader selected by pReader->step:
 *   - EXTERNAL_ROWS_MAIN: the main reader;
 *   - EXTERNAL_ROWS_PREV: innerReader[0];
 *   - any other step: innerReader[1].
 */
void tsdbRetrieveDataBlockInfo(const STsdbReader* pReader, int32_t* rows, uint64_t* uid, STimeWindow* pWindow) {
  const STsdbReader* pSource = pReader;

  if (pReader->type == TIMEWINDOW_RANGE_EXTERNAL) {
    switch (pReader->step) {
      case EXTERNAL_ROWS_MAIN:
        pSource = pReader;
        break;
      case EXTERNAL_ROWS_PREV:
        pSource = pReader->innerReader[0];
        break;
      default:
        pSource = pReader->innerReader[1];
        break;
    }
  }

  setBlockInfo(pSource, rows, uid, pWindow);
}

4143

4144 4145
/*
 * Prepend the primary-timestamp aggregate pTsAgg to pSup->pColAgg. Then, for requested columns (pSup->colId) with
 * no aggregate of their own, insert an all-NULL aggregate (numOfNull = numOfRows). The primary timestamp column
 * never gets one.
 *
 * The walk is a merge of pSup->colId[0..numOfCols) with the aggregate array, which presumes both are ordered by
 * colId (TODO confirm the ordering guarantee).
 *
 * NOTE(review): the scan bound `numOfAggs` is the array size sampled BEFORE pTsAgg is inserted, and it is not
 * refreshed when NULL aggregates are inserted. The walk can therefore stop early. Later requested columns then get
 * no NULL aggregate, and the caller may map them to tsColAgg or leave their slot untouched. This looks
 * unintended; behavior is intentionally kept unchanged here pending confirmation.
 */
static void doFillNullColSMA(SBlockLoadSuppInfo* pSup, int32_t numOfRows, int32_t numOfCols,
                             SColumnDataAgg* pTsAgg) {
  int32_t numOfAggs = (int32_t)taosArrayGetSize(pSup->pColAgg);
  taosArrayInsert(pSup->pColAgg, 0, pTsAgg);

  int32_t aggIdx = 0;
  int32_t colIdx = 0;
  while (colIdx < numOfCols && aggIdx < numOfAggs) {
    const SColumnDataAgg* pAgg = taosArrayGet(pSup->pColAgg, aggIdx);
    int32_t               wantedColId = pSup->colId[colIdx];

    if (pAgg->colId == wantedColId) {
      // requested column has an aggregate: advance both sides
      aggIdx += 1;
      colIdx += 1;
    } else if (pAgg->colId < wantedColId) {
      // aggregate of a column that was not requested: skip it
      aggIdx += 1;
    } else {
      // requested column missing from the aggregates: record it as all-NULL (except the timestamp column)
      if (wantedColId != PRIMARYKEY_TIMESTAMP_COL_ID) {
        SColumnDataAgg nullColAgg = {.colId = pSup->colId[colIdx], .numOfNull = numOfRows};
        taosArrayInsert(pSup->pColAgg, aggIdx, &nullColAgg);
      }
      colIdx += 1;
    }
  }
}

4168
/*
 * Expose the SMA (block-level min/max/null-count statistics) of the current file block through
 * pDataBlock->pBlockAgg.
 *
 * *allHave is set to true only when SMA was loaded and mapped to the result slots. No SMA is produced in these
 * cases:
 *   - external-range readers;
 *   - composed blocks, or when suppInfo.smaValid is false;
 *   - the result block belongs to another table;
 *   - the file block carries no SMA.
 *
 * Returns TSDB_CODE_SUCCESS, the error of tsdbReadBlockSma(), or TSDB_CODE_OUT_OF_MEMORY.
 *
 * NOTE(review): if callers pass pReader->pResBlock as pDataBlock, the `*pBlockSMA = NULL` below also resets
 * pResBlock->pBlockAgg, so the slot array is re-allocated on every call. Confirm who owns and frees it.
 */
int32_t tsdbRetrieveDatablockSMA(STsdbReader* pReader, SSDataBlock* pDataBlock, bool* allHave) {
  SColumnDataAgg*** pBlockSMA = &pDataBlock->pBlockAgg;

  int32_t code = 0;
  *allHave = false;
  *pBlockSMA = NULL;

  if (pReader->type == TIMEWINDOW_RANGE_EXTERNAL) {
    return TSDB_CODE_SUCCESS;
  }

  // there is no statistics data for composed block
  if (pReader->status.composedDataBlock || (!pReader->suppInfo.smaValid)) {
    return TSDB_CODE_SUCCESS;
  }

  SFileDataBlockInfo* pFBlock = getCurrentBlockInfo(&pReader->status.blockIter);
  SBlockLoadSuppInfo* pSup = &pReader->suppInfo;

  if (pReader->pResBlock->info.id.uid != pFBlock->uid) {
    return TSDB_CODE_SUCCESS;
  }

  SDataBlk* pBlock = getCurrentBlock(&pReader->status.blockIter);
  if (!tDataBlkHasSma(pBlock)) {
    return TSDB_CODE_SUCCESS;
  }

  code = tsdbReadBlockSma(pReader->pFileReader, pBlock, pSup->pColAgg);
  if (code != TSDB_CODE_SUCCESS) {
    tsdbDebug("vgId:%d, failed to load block SMA for uid %" PRIu64 ", code:%s, %s", 0, pFBlock->uid, tstrerror(code),
              pReader->idStr);
    return code;
  }

  *allHave = true;

  // always load the first primary timestamp column data
  SColumnDataAgg* pTsAgg = &pSup->tsColAgg;

  pTsAgg->numOfNull = 0;
  pTsAgg->colId = PRIMARYKEY_TIMESTAMP_COL_ID;
  pTsAgg->min = pReader->pResBlock->info.window.skey;
  pTsAgg->max = pReader->pResBlock->info.window.ekey;

  // update the number of NULL data rows
  size_t numOfCols = pSup->numOfCols;

  // ensure capacity
  if (pDataBlock->pDataBlock) {
    size_t colsNum = taosArrayGetSize(pDataBlock->pDataBlock);
    taosArrayEnsureCap(pSup->pColAgg, colsNum);
  }

  SSDataBlock* pResBlock = pReader->pResBlock;
  if (pResBlock->pBlockAgg == NULL) {
    size_t num = taosArrayGetSize(pResBlock->pDataBlock);
    pResBlock->pBlockAgg = taosMemoryCalloc(num, sizeof(SColumnDataAgg));
    if (pResBlock->pBlockAgg == NULL) {
      // the slot array is indexed below: never continue with a failed allocation
      *allHave = false;
      return TSDB_CODE_OUT_OF_MEMORY;
    }
  }

  // do fill all null column value SMA info
  doFillNullColSMA(pSup, pBlock->nRow, numOfCols, pTsAgg);
  size_t size = taosArrayGetSize(pSup->pColAgg);

  // map every requested column (by slot) to its aggregate
  int32_t i = 0, j = 0;
  while (j < numOfCols && i < size) {
    SColumnDataAgg* pAgg = taosArrayGet(pSup->pColAgg, i);
    if (pAgg->colId == pSup->colId[j]) {
      pResBlock->pBlockAgg[pSup->slotId[j]] = pAgg;
      i += 1;
      j += 1;
    } else if (pAgg->colId < pSup->colId[j]) {
      i += 1;
    } else if (pSup->colId[j] < pAgg->colId) {
      // ASSERT(pSup->colId[j] == PRIMARYKEY_TIMESTAMP_COL_ID);
      pResBlock->pBlockAgg[pSup->slotId[j]] = &pSup->tsColAgg;
      j += 1;
    }
  }

  *pBlockSMA = pResBlock->pBlockAgg;
  pReader->cost.smaDataLoad += 1;

  tsdbDebug("vgId:%d, succeed to load block SMA for uid %" PRIu64 ", %s", 0, pFBlock->uid, pReader->idStr);
  return code;
}

H
Haojun Liao 已提交
4256
/*
 * Materialize the current block of pReader into pReader->pResBlock and return it.
 *
 * A composed block is already in pResBlock. Otherwise the current file block is loaded and copied.
 * Returns NULL with terrno set when the block's table is unknown or the load fails.
 */
static SSDataBlock* doRetrieveDataBlock(STsdbReader* pReader) {
  SReaderStatus* pStatus = &pReader->status;

  if (pStatus->composedDataBlock) {
    return pReader->pResBlock;
  }

  SFileDataBlockInfo*   pBlockInfo = getCurrentBlockInfo(&pStatus->blockIter);
  STableBlockScanInfo** ppScanInfo = taosHashGet(pStatus->pTableMap, &pBlockInfo->uid, sizeof(pBlockInfo->uid));

  // check the lookup result BEFORE dereferencing it: a missing uid makes taosHashGet return NULL
  if (ppScanInfo == NULL || *ppScanInfo == NULL) {
    terrno = TSDB_CODE_INVALID_PARA;
    tsdbError("failed to locate the uid:%" PRIu64 " in query table uid list, total tables:%d, %s", pBlockInfo->uid,
              taosHashGetSize(pReader->status.pTableMap), pReader->idStr);
    return NULL;
  }
  STableBlockScanInfo* pBlockScanInfo = *ppScanInfo;

  int32_t code = doLoadFileBlockData(pReader, &pStatus->blockIter, &pStatus->fileBlockData, pBlockScanInfo->uid);
  if (code != TSDB_CODE_SUCCESS) {
    tBlockDataDestroy(&pStatus->fileBlockData, 1);
    terrno = code;
    return NULL;
  }

  copyBlockDataToSDataBlock(pReader, pBlockScanInfo);
  return pReader->pResBlock;
}

H
Haojun Liao 已提交
4284
/*
 * Return the current result block. For an external-range reader, the block comes from:
 *   - innerReader[0] while step is EXTERNAL_ROWS_PREV;
 *   - innerReader[1] while step is EXTERNAL_ROWS_NEXT;
 *   - the main reader otherwise.
 * pIdList is not used.
 */
SSDataBlock* tsdbRetrieveDataBlock(STsdbReader* pReader, SArray* pIdList) {
  STsdbReader* pSource = pReader;

  if (pReader->type == TIMEWINDOW_RANGE_EXTERNAL) {
    switch (pReader->step) {
      case EXTERNAL_ROWS_PREV:
        pSource = pReader->innerReader[0];
        break;
      case EXTERNAL_ROWS_NEXT:
        pSource = pReader->innerReader[1];
        break;
      default:
        break;
    }
  }

  return doRetrieveDataBlock(pSource);
}

H
Haojun Liao 已提交
4296
/*
 * Restart the scan of pReader with the order and time window taken from pCond.
 *
 * Nothing is done when the reader's current window is empty or no read snapshot was taken. Otherwise:
 *   - the reader is set to TIMEWINDOW_RANGE_CONTAINED;
 *   - the open data-file reader is closed;
 *   - the file-set and block iterators are re-initialized;
 *   - every table scan info is reset to just before the new window (in scan order);
 *   - the first file block is located.
 *
 * Returns TSDB_CODE_SUCCESS or the error of initForFirstBlockInFile().
 */
int32_t tsdbReaderReset(STsdbReader* pReader, SQueryTableDataCond* pCond) {
  if (isEmptyQueryTimeWindow(&pReader->window) || pReader->pReadSnap == NULL) {
    return TSDB_CODE_SUCCESS;
  }

  SReaderStatus*  pStatus = &pReader->status;
  SDataBlockIter* pIter = &pStatus->blockIter;

  // adopt the new scan parameters
  pReader->order = pCond->order;
  pReader->type = TIMEWINDOW_RANGE_CONTAINED;
  pStatus->loadFromFile = true;
  pStatus->pTableIter = NULL;
  pReader->window = updateQueryTimeWindow(pReader->pTsdb, &pCond->twindows);

  // reset the cached aggregate of the primary timestamp column
  memset(&pReader->suppInfo.tsColAgg, 0, sizeof(SColumnDataAgg));
  pReader->suppInfo.tsColAgg.colId = PRIMARYKEY_TIMESTAMP_COL_ID;
  tsdbDataFReaderClose(&pReader->pFileReader);

  int32_t numOfTables = taosHashGetSize(pStatus->pTableMap);

  initFilesetIterator(&pStatus->fileIter, pReader->pReadSnap->fs.aDFileSet, pReader);
  resetDataBlockIterator(pIter, pReader->order);

  // position every table just before the first key of the new window, in scan order
  int64_t startKey = ASCENDING_TRAVERSE(pReader->order) ? (pReader->window.skey - 1) : (pReader->window.ekey + 1);
  resetAllDataBlockScanInfo(pStatus->pTableMap, startKey);

  int32_t code = TSDB_CODE_SUCCESS;
  if (pStatus->fileIter.numOfFiles == 0) {
    // no data in files: only the in-memory buffers will be scanned
    pStatus->loadFromFile = false;
  } else {
    code = initForFirstBlockInFile(pReader, pIter);
    if (code != TSDB_CODE_SUCCESS) {
      tsdbError("%p reset reader failed, numOfTables:%d, query range:%" PRId64 " - %" PRId64 " in query %s", pReader,
                numOfTables, pReader->window.skey, pReader->window.ekey, pReader->idStr);
      return code;
    }
  }

  tsdbDebug("%p reset reader, suid:%" PRIu64 ", numOfTables:%d, skey:%" PRId64 ", query range:%" PRId64 " - %" PRId64
            " in query %s",
            pReader, pReader->suid, numOfTables, pCond->twindows.skey, pReader->window.skey, pReader->window.ekey,
            pReader->idStr);

  return code;
}
H
Hongze Cheng 已提交
4344

4345 4346 4347
/*
 * Histogram bucket for a block with numOfRows rows, where buckets are bucketRange rows wide starting at startRow.
 *
 * Returns 0 instead of dividing by zero (bucketRange <= 0) or producing a negative index (numOfRows < startRow).
 * No upper bound is applied; callers must clamp to their histogram size.
 */
static int32_t getBucketIndex(int32_t startRow, int32_t bucketRange, int32_t numOfRows) {
  if (bucketRange <= 0 || numOfRows <= startRow) {
    return 0;
  }
  return (numOfRows - startRow) / bucketRange;
}
H
Hongze Cheng 已提交
4348

4349 4350 4351 4352
/*
 * Walk every data block of the reader's files and accumulate block-distribution statistics into
 * pTableBlockInfo. The statistics are: total rows/size, min/max rows per block, small-block count, number of
 * files/blocks/tables, and a histogram of rows per block.
 *
 * Returns TSDB_CODE_SUCCESS or the error of initForFirstBlockInFile().
 */
int32_t tsdbGetFileBlocksDistInfo(STsdbReader* pReader, STableBlockDistInfo* pTableBlockInfo) {
  int32_t code = TSDB_CODE_SUCCESS;
  pTableBlockInfo->totalSize = 0;
  pTableBlockInfo->totalRows = 0;

  // find the start data block in file
  SReaderStatus* pStatus = &pReader->status;

  STsdbCfg* pc = &pReader->pTsdb->pVnode->config.tsdbCfg;
  pTableBlockInfo->defMinRows = pc->minRows;
  pTableBlockInfo->defMaxRows = pc->maxRows;

  // width of one histogram bucket over [minRows, maxRows]; it is used as a divisor, so never let it be 0
  int32_t bucketRange = ceil((pc->maxRows - pc->minRows) / 20.0);
  if (bucketRange <= 0) {
    bucketRange = 1;
  }

  // number of histogram slots; every computed bucket index is clamped into [0, numOfBuckets)
  const int32_t numOfBuckets =
      (int32_t)(sizeof(pTableBlockInfo->blockRowsHisto) / sizeof(pTableBlockInfo->blockRowsHisto[0]));

  pTableBlockInfo->numOfFiles += 1;

  int32_t numOfTables = (int32_t)taosHashGetSize(pStatus->pTableMap);
  int     defaultRows = 4096;

  SDataBlockIter* pBlockIter = &pStatus->blockIter;
  pTableBlockInfo->numOfFiles += pStatus->fileIter.numOfFiles;

  if (pBlockIter->numOfBlocks > 0) {
    pTableBlockInfo->numOfBlocks += pBlockIter->numOfBlocks;
  }

  pTableBlockInfo->numOfTables = numOfTables;
  bool hasNext = (pBlockIter->numOfBlocks > 0);

  while (true) {
    if (hasNext) {
      SDataBlk* pBlock = getCurrentBlock(pBlockIter);

      int32_t numOfRows = pBlock->nRow;
      pTableBlockInfo->totalRows += numOfRows;

      if (numOfRows > pTableBlockInfo->maxRows) {
        pTableBlockInfo->maxRows = numOfRows;
      }

      if (numOfRows < pTableBlockInfo->minRows) {
        pTableBlockInfo->minRows = numOfRows;
      }

      if (numOfRows < defaultRows) {
        pTableBlockInfo->numOfSmallBlocks += 1;
      }

      pTableBlockInfo->totalSize += pBlock->aSubBlock[0].szBlock;

      // A block smaller than minRows would give a negative index. A block of maxRows rows can give index
      // numOfBuckets when (maxRows - minRows) is a multiple of 20. Keep both inside the histogram.
      int32_t bucketIndex = getBucketIndex(pTableBlockInfo->defMinRows, bucketRange, numOfRows);
      if (bucketIndex < 0) {
        bucketIndex = 0;
      } else if (bucketIndex >= numOfBuckets) {
        bucketIndex = numOfBuckets - 1;
      }
      pTableBlockInfo->blockRowsHisto[bucketIndex]++;

      hasNext = blockIteratorNext(&pStatus->blockIter, pReader->idStr);
    } else {
      code = initForFirstBlockInFile(pReader, pBlockIter);
      if ((code != TSDB_CODE_SUCCESS) || (pReader->status.loadFromFile == false)) {
        break;
      }

      pTableBlockInfo->numOfBlocks += pBlockIter->numOfBlocks;
      hasNext = (pBlockIter->numOfBlocks > 0);
    }
  }

  return code;
}
H
Hongze Cheng 已提交
4419

H
refact  
Hongze Cheng 已提交
4420
/*
 * Count the rows held in memory (mem and imem) for every table of pReader.
 *
 * Rows are counted from the memtables captured in the reader's read snapshot. Those are the memtables this reader
 * would scan. The snapshot can be absent (tsdbReaderOpen takes none when opened without tables), and either of its
 * memtables can be NULL even while pTsdb->mem / pTsdb->imem are set. Both cases are checked before the lookup,
 * instead of gating on the live memtables as before.
 *
 * Side effect: pReader->status.pTableIter is used as the iteration cursor and is NULL on return.
 */
int64_t tsdbGetNumOfRowsInMemTable(STsdbReader* pReader) {
  int64_t rows = 0;

  SReaderStatus* pStatus = &pReader->status;
  STsdbReadSnap* pSnap = pReader->pReadSnap;

  pStatus->pTableIter = taosHashIterate(pStatus->pTableMap, NULL);
  while (pStatus->pTableIter != NULL) {
    STableBlockScanInfo* pBlockScanInfo = *(STableBlockScanInfo**)pStatus->pTableIter;

    if (pSnap != NULL && pSnap->pMem != NULL) {
      STbData* d = tsdbGetTbDataFromMemTable(pSnap->pMem, pReader->suid, pBlockScanInfo->uid);
      if (d != NULL) {
        rows += tsdbGetNRowsInTbData(d);
      }
    }

    if (pSnap != NULL && pSnap->pIMem != NULL) {
      STbData* di = tsdbGetTbDataFromMemTable(pSnap->pIMem, pReader->suid, pBlockScanInfo->uid);
      if (di != NULL) {
        rows += tsdbGetNRowsInTbData(di);
      }
    }

    // current table is exhausted, let's try the next table
    pStatus->pTableIter = taosHashIterate(pStatus->pTableMap, pStatus->pTableIter);
  }

  return rows;
}
D
dapan1121 已提交
4451

L
Liu Jicong 已提交
4452
/*
 * Look up the schema of table `uid`.
 *
 * For a child table, *suid receives its super-table uid and the super table's schema version is used. For a
 * normal table, *suid is 0 and the table's own schema version is used. *pSchema is then obtained from
 * metaGetTbTSchema() for `uid` at that version, and may be NULL: it is not checked here.
 *
 * Errors (terrno is set and returned):
 *   - TSDB_CODE_TDB_INVALID_TABLE_ID when the table (or its super table) cannot be found;
 *   - TSDB_CODE_INVALID_PARA for any other table type.
 */
int32_t tsdbGetTableSchema(SVnode* pVnode, int64_t uid, STSchema** pSchema, int64_t* suid) {
  int32_t     sversion = 1;
  SMetaReader mr = {0};

  metaReaderInit(&mr, pVnode->pMeta, 0);

  if (metaGetTableEntryByUidCache(&mr, uid) != TSDB_CODE_SUCCESS) {
    terrno = TSDB_CODE_TDB_INVALID_TABLE_ID;
    goto _fail;
  }

  *suid = 0;

  if (mr.me.type == TSDB_CHILD_TABLE) {
    // re-use the reader for the super table entry, which carries the schema version
    tDecoderClear(&mr.coder);
    *suid = mr.me.ctbEntry.suid;
    if (metaGetTableEntryByUidCache(&mr, *suid) != TSDB_CODE_SUCCESS) {
      terrno = TSDB_CODE_TDB_INVALID_TABLE_ID;
      goto _fail;
    }
    sversion = mr.me.stbEntry.schemaRow.version;
  } else if (mr.me.type == TSDB_NORMAL_TABLE) {
    sversion = mr.me.ntbEntry.schemaRow.version;
  } else {
    terrno = TSDB_CODE_INVALID_PARA;
    goto _fail;
  }

  metaReaderClear(&mr);
  *pSchema = metaGetTbTSchema(pVnode->pMeta, uid, sversion, 1);
  return TSDB_CODE_SUCCESS;

_fail:
  metaReaderClear(&mr);
  return terrno;
}
H
Hongze Cheng 已提交
4489

H
Haojun Liao 已提交
4490
/*
 * Take a read snapshot of pTsdb. Under its read lock, the snapshot references the current mem and imem tables and
 * the file set.
 *
 * On success, returns 0 and stores the snapshot in *ppSnap; release it with tsdbUntakeReadSnap(). On failure,
 * returns an error code and sets *ppSnap to NULL. Every reference taken so far is dropped and the snapshot is
 * freed, so nothing leaks and callers that later untake *ppSnap see NULL instead of a half-built snapshot.
 *
 * NOTE(review): a failing tsdbFSRef() is assumed to leave no file-set references behind — confirm.
 */
int32_t tsdbTakeReadSnap(STsdb* pTsdb, STsdbReadSnap** ppSnap, const char* idStr) {
  int32_t code = 0;

  // publish nothing to the caller until the snapshot is complete
  *ppSnap = NULL;

  // alloc
  STsdbReadSnap* pSnap = (STsdbReadSnap*)taosMemoryCalloc(1, sizeof(STsdbReadSnap));
  if (pSnap == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _exit;
  }

  // lock
  code = taosThreadRwlockRdlock(&pTsdb->rwLock);
  if (code) {
    code = TAOS_SYSTEM_ERROR(code);
    taosMemoryFree(pSnap);
    goto _exit;
  }

  // take snapshot
  pSnap->pMem = pTsdb->mem;
  pSnap->pIMem = pTsdb->imem;

  if (pSnap->pMem) {
    tsdbRefMemTable(pSnap->pMem);
  }

  if (pSnap->pIMem) {
    tsdbRefMemTable(pSnap->pIMem);
  }

  // fs
  code = tsdbFSRef(pTsdb, &pSnap->fs);
  if (code) {
    taosThreadRwlockUnlock(&pTsdb->rwLock);

    // roll back the memtable references taken above
    if (pSnap->pMem) {
      tsdbUnrefMemTable(pSnap->pMem);
    }
    if (pSnap->pIMem) {
      tsdbUnrefMemTable(pSnap->pIMem);
    }
    taosMemoryFree(pSnap);
    goto _exit;
  }

  // unlock
  code = taosThreadRwlockUnlock(&pTsdb->rwLock);
  if (code) {
    code = TAOS_SYSTEM_ERROR(code);
    // every reference was taken: release them (and the snapshot) through the regular path
    tsdbUntakeReadSnap(pTsdb, pSnap, idStr);
    goto _exit;
  }

  *ppSnap = pSnap;
  tsdbTrace("vgId:%d, take read snapshot, %s", TD_VID(pTsdb->pVnode), idStr);

_exit:
  return code;
}

H
Haojun Liao 已提交
4538
/*
 * Release a snapshot obtained from tsdbTakeReadSnap(). It drops the references held on the mem / imem tables
 * (when present) and on the file set, then frees the snapshot. pSnap may be NULL, in which case only the trace
 * line is emitted.
 */
void tsdbUntakeReadSnap(STsdb* pTsdb, STsdbReadSnap* pSnap, const char* idStr) {
  if (pSnap != NULL) {
    // memtable references first, then the file set, then the snapshot object itself
    if (pSnap->pMem != NULL) {
      tsdbUnrefMemTable(pSnap->pMem);
    }
    if (pSnap->pIMem != NULL) {
      tsdbUnrefMemTable(pSnap->pIMem);
    }

    tsdbFSUnref(pTsdb, &pSnap->fs);
    taosMemoryFree(pSnap);
  }

  tsdbTrace("vgId:%d, untake read snapshot, %s", TD_VID(pTsdb->pVnode), idStr);
}