tsdbRead.c 141.6 KB
Newer Older
H
hjxilinx 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

H
Haojun Liao 已提交
16
#include "osDef.h"
H
Hongze Cheng 已提交
17
#include "tsdb.h"
18

H
Hongze Cheng 已提交
19
// Evaluates to non-zero when the scan order `o` equals TSDB_ORDER_ASC.
// The argument is parenthesized so that compound expressions (ternary, bit-ops) are compared as a whole.
#define ASCENDING_TRAVERSE(o) ((o) == TSDB_ORDER_ASC)
H
Hongze Cheng 已提交
20

21 22 23 24 25 26
// Identifies which set of rows is being produced.
// NOTE(review): presumably PREV/NEXT denote the rows directly before/after the main query window
// (see the "retrieve direct prev|next rows" note on STsdbReader.type) -- confirm against the callers.
typedef enum {
  EXTERNAL_ROWS_PREV = 0x1,
  EXTERNAL_ROWS_MAIN = 0x2,
  EXTERNAL_ROWS_NEXT = 0x3,
} EContentData;

27
typedef struct {
dengyihao's avatar
dengyihao 已提交
28
  STbDataIter* iter;
29 30 31 32
  int32_t      index;
  bool         hasVal;
} SIterInfo;

33 34
// Block counters collected while loading the block info of one file set.
typedef struct {
  int32_t numOfBlocks;     // number of data blocks that passed the time/version filters
  int32_t numOfLastFiles;  // number of stt ("last") files in the file set
} SBlockNumber;

38
typedef struct SBlockIndex {
39 40 41
  int32_t     ordinalIndex;
  int64_t     inFileOffset;
  STimeWindow window;
42 43
} SBlockIndex;

H
Haojun Liao 已提交
44
typedef struct STableBlockScanInfo {
dengyihao's avatar
dengyihao 已提交
45 46
  uint64_t  uid;
  TSKEY     lastKey;
H
Hongze Cheng 已提交
47
  SMapData  mapData;            // block info (compressed)
48
  SArray*   pBlockList;         // block data index list, SArray<SBlockIndex>
H
Hongze Cheng 已提交
49 50 51 52 53 54
  SIterInfo iter;               // mem buffer skip list iterator
  SIterInfo iiter;              // imem buffer skip list iterator
  SArray*   delSkyline;         // delete info for this table
  int32_t   fileDelIndex;       // file block delete index
  int32_t   lastBlockDelIndex;  // delete index for last block
  bool      iterInit;           // whether to initialize the in-memory skip list iterator or not
H
Haojun Liao 已提交
55 56 57
} STableBlockScanInfo;

// (table uid, file offset) pair.
// NOTE(review): presumably the sort key used when ordering blocks by file offset -- confirm.
typedef struct SBlockOrderWrapper {
  int64_t uid;
  int64_t offset;
} SBlockOrderWrapper;
H
Hongze Cheng 已提交
61 62

typedef struct SBlockOrderSupporter {
63 64 65 66
  SBlockOrderWrapper** pDataBlockInfo;
  int32_t*             indexPerTable;
  int32_t*             numOfBlocksPerTable;
  int32_t              numOfTables;
H
Hongze Cheng 已提交
67 68 69
} SBlockOrderSupporter;

// I/O statistics accumulated by one reader. The visible code stores elapsed times in milliseconds.
typedef struct SIOCostSummary {
  int64_t numOfBlocks;
  double blockLoadTime;
  double buildmemBlock;
  int64_t headFileLoad;     // number of head files opened
  double headFileLoadTime;  // time spent loading block index / block info
  int64_t smaDataLoad;
  double smaLoadTime;
  int64_t lastBlockLoad;
  double lastBlockLoadTime;
  int64_t composedBlocks;
  double buildComposedBlockTime;
  double createScanInfoList;  // time spent building the per-table scan info
} SIOCostSummary;

typedef struct SBlockLoadSuppInfo {
85
  SArray*          pColAgg;
86
  SColumnDataAgg   tsColAgg;
C
Cary Xu 已提交
87
  SColumnDataAgg** plist;
88
  int16_t*         colIds;  // column ids for loading file block data
89
  int32_t          numOfCols;
90
  char**           buildBuf;  // build string tmp buffer, todo remove it later after all string format being updated.
H
Hongze Cheng 已提交
91 92
} SBlockLoadSuppInfo;

93
typedef struct SLastBlockReader {
H
Hongze Cheng 已提交
94 95 96 97 98
  STimeWindow        window;
  SVersionRange      verRange;
  int32_t            order;
  uint64_t           uid;
  SMergeTree         mergeTree;
99
  SSttBlockLoadInfo* pInfo;
100 101
} SLastBlockReader;

102
typedef struct SFilesetIter {
H
Hongze Cheng 已提交
103 104 105
  int32_t           numOfFiles;  // number of total files
  int32_t           index;       // current accessed index in the list
  SArray*           pFileList;   // data file list
106
  int32_t           order;
H
Hongze Cheng 已提交
107
  SLastBlockReader* pLastBlockReader;  // last file block reader
108
} SFilesetIter;
H
Haojun Liao 已提交
109 110

// Identifies one file data block by its owning table and its position in that table's block list.
typedef struct SFileDataBlockInfo {
  // index position in STableBlockScanInfo in order to check whether neighbor block overlaps with it
  uint64_t uid;
  int32_t tbBlockIdx;
} SFileDataBlockInfo;

typedef struct SDataBlockIter {
117
  int32_t   numOfBlocks;
118
  int32_t   index;
H
Hongze Cheng 已提交
119
  SArray*   blockList;  // SArray<SFileDataBlockInfo>
120
  int32_t   order;
121
  SDataBlk  block;  // current SDataBlk data
122
  SHashObj* pTableMap;
H
Haojun Liao 已提交
123 124 125
} SDataBlockIter;

// Progress of copying the rows of the current file block into the result block.
typedef struct SFileBlockDumpInfo {
  int32_t totalRows;
  int32_t rowIndex;
  int64_t lastKey;
  bool allDumped;  // set once the whole block has been consumed
} SFileBlockDumpInfo;

132
// Bookkeeping for visiting tables in ascending uid order.
typedef struct SUidOrderCheckInfo {
  // access table uid list in uid ascending order list
  uint64_t* tableUidList;
  // index in table uid list
  int32_t currentIndex;
} SUidOrderCheckInfo;

H
Haojun Liao 已提交
137
typedef struct SReaderStatus {
H
Hongze Cheng 已提交
138 139 140
  bool                  loadFromFile;       // check file stage
  bool                  composedDataBlock;  // the returned data block is a composed block or not
  SHashObj*             pTableMap;          // SHash<STableBlockScanInfo>
141
  STableBlockScanInfo** pTableIter;         // table iterator used in building in-memory buffer data blocks.
H
Hongze Cheng 已提交
142 143 144 145 146 147
  SUidOrderCheckInfo    uidCheckInfo;       // check all table in uid order
  SFileBlockDumpInfo    fBlockDumpInfo;
  SDFileSet*            pCurrentFileset;  // current opened file set
  SBlockData            fileBlockData;
  SFilesetIter          fileIter;
  SDataBlockIter        blockIter;
H
Haojun Liao 已提交
148 149
} SReaderStatus;

150
typedef struct SBlockInfoBuf {
H
Hongze Cheng 已提交
151 152 153
  int32_t currentIndex;
  SArray* pData;
  int32_t numPerBucket;
154 155
} SBlockInfoBuf;

H
Hongze Cheng 已提交
156
struct STsdbReader {
H
Haojun Liao 已提交
157 158 159 160 161 162 163
  STsdb*             pTsdb;
  uint64_t           suid;
  int16_t            order;
  STimeWindow        window;  // the primary query time window that applies to all queries
  SSDataBlock*       pResBlock;
  int32_t            capacity;
  SReaderStatus      status;
164 165
  char*              idStr;  // query info handle, for debug purpose
  int32_t            type;   // query type: 1. retrieve all data blocks, 2. retrieve direct prev|next rows
H
Hongze Cheng 已提交
166
  SBlockLoadSuppInfo suppInfo;
H
Hongze Cheng 已提交
167
  STsdbReadSnap*     pReadSnap;
168
  SIOCostSummary     cost;
169 170
  STSchema*          pSchema;     // the newest version schema
  STSchema*          pMemSchema;  // the previous schema for in-memory data, to avoid load schema too many times
171 172
  SDataFReader*      pFileReader;
  SVersionRange      verRange;
173 174 175
  SBlockInfoBuf      blockInfoBuf;
  int32_t            step;
  STsdbReader*       innerReader[2];
H
Hongze Cheng 已提交
176
};
H
Hongze Cheng 已提交
177

H
Haojun Liao 已提交
178
static SFileDataBlockInfo* getCurrentBlockInfo(SDataBlockIter* pBlockIter);
179 180
static int      buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, int64_t endKey, int32_t capacity,
                                          STsdbReader* pReader);
181
static TSDBROW* getValidMemRow(SIterInfo* pIter, const SArray* pDelList, STsdbReader* pReader);
182 183
static int32_t  doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pScanInfo, STsdbReader* pReader,
                                        SRowMerger* pMerger);
H
Hongze Cheng 已提交
184
static int32_t  doMergeRowsInLastBlock(SLastBlockReader* pLastBlockReader, STableBlockScanInfo* pScanInfo, int64_t ts,
185
                                       SRowMerger* pMerger, SVersionRange* pVerRange);
186
static int32_t  doMergeRowsInBuf(SIterInfo* pIter, uint64_t uid, int64_t ts, SArray* pDelList, SRowMerger* pMerger,
dengyihao's avatar
dengyihao 已提交
187
                                 STsdbReader* pReader);
H
Hongze Cheng 已提交
188 189
static int32_t  doAppendRowFromTSRow(SSDataBlock* pBlock, STsdbReader* pReader, STSRow* pTSRow,
                                     STableBlockScanInfo* pInfo);
190
static int32_t  doAppendRowFromFileBlock(SSDataBlock* pResBlock, STsdbReader* pReader, SBlockData* pBlockData,
H
Hongze Cheng 已提交
191
                                         int32_t rowIndex);
192
static void     setComposedBlockFlag(STsdbReader* pReader, bool composed);
H
Hongze Cheng 已提交
193 194
static bool     hasBeenDropped(const SArray* pDelList, int32_t* index, TSDBKEY* pKey, int32_t order,
                               SVersionRange* pVerRange);
195

H
Hongze Cheng 已提交
196 197 198 199
static int32_t doMergeMemTableMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter, SArray* pDelList,
                                        STSRow** pTSRow, STsdbReader* pReader, bool* freeTSRow);
static int32_t doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo* pBlockScanInfo,
                                  STsdbReader* pReader, STSRow** pTSRow);
200 201
static int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pBlockScanInfo, int64_t key,
                                     STsdbReader* pReader);
202

dengyihao's avatar
dengyihao 已提交
203 204 205 206
static int32_t initDelSkylineIterator(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader, STbData* pMemTbData,
                                      STbData* piMemTbData);
static STsdb*  getTsdbByRetentions(SVnode* pVnode, TSKEY winSKey, SRetention* retentions, const char* idstr,
                                   int8_t* pLevel);
207
static SVersionRange getQueryVerRange(SVnode* pVnode, SQueryTableDataCond* pCond, int8_t level);
H
Hongze Cheng 已提交
208 209 210
static int64_t       getCurrentKeyInLastBlock(SLastBlockReader* pLastBlockReader);
static bool          hasDataInLastBlock(SLastBlockReader* pLastBlockReader);
static int32_t       doBuildDataBlock(STsdbReader* pReader);
H
Haojun Liao 已提交
211
static TSDBKEY       getCurrentKeyInBuf(STableBlockScanInfo* pScanInfo, STsdbReader* pReader);
212
static bool          hasDataInFileBlock(const SBlockData* pBlockData, const SFileBlockDumpInfo* pDumpInfo);
H
Haojun Liao 已提交
213

214 215
// Returns true when `ts` lies outside the closed interval [pWindow->skey, pWindow->ekey].
static bool outOfTimeWindow(int64_t ts, STimeWindow* pWindow) {
  if (ts > pWindow->ekey) {
    return true;
  }
  return ts < pWindow->skey;
}

216 217 218
/*
 * Fill pReader->suppInfo from the column layout of pBlock:
 *   - colIds[i]   : column id of the i-th column of pBlock
 *   - buildBuf[i] : scratch buffer of `info.bytes` bytes for variable-length columns, NULL otherwise
 *   - numOfCols   : number of columns (only set on success)
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY if any allocation fails. On failure every
 * buffer allocated here is released and the fields are reset (NULL / 0), so no dangling pointer is
 * left behind for a later cleanup to free a second time.
 */
static int32_t setColumnIdSlotList(STsdbReader* pReader, SSDataBlock* pBlock) {
  SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo;
  size_t              numOfCols = blockDataGetNumOfCols(pBlock);

  pSupInfo->numOfCols = 0;
  pSupInfo->colIds = taosMemoryMalloc(numOfCols * sizeof(int16_t));
  pSupInfo->buildBuf = taosMemoryCalloc(numOfCols, POINTER_BYTES);
  if (pSupInfo->buildBuf == NULL || pSupInfo->colIds == NULL) {
    goto _oom;
  }

  for (size_t i = 0; i < numOfCols; ++i) {
    SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, i);
    pSupInfo->colIds[i] = pCol->info.colId;

    if (IS_VAR_DATA_TYPE(pCol->info.type)) {
      pSupInfo->buildBuf[i] = taosMemoryMalloc(pCol->info.bytes);
      if (pSupInfo->buildBuf[i] == NULL) {
        goto _oom;
      }
    }
  }

  pSupInfo->numOfCols = (int32_t)numOfCols;
  return TSDB_CODE_SUCCESS;

_oom:
  if (pSupInfo->buildBuf != NULL) {
    // buildBuf was calloc'ed, so slots that were never filled are NULL
    for (size_t i = 0; i < numOfCols; ++i) {
      taosMemoryFree(pSupInfo->buildBuf[i]);
    }
  }

  taosMemoryFree(pSupInfo->colIds);
  taosMemoryFree(pSupInfo->buildBuf);
  pSupInfo->colIds = NULL;
  pSupInfo->buildBuf = NULL;
  return TSDB_CODE_OUT_OF_MEMORY;
}
H
Hongze Cheng 已提交
241

242
// Allocate one zero-initialized bucket of `numOfEntries` STableBlockScanInfo and append it to pBuf->pData.
// The bucket is released again if it cannot be appended.
static int32_t appendBlockScanInfoBucket(SBlockInfoBuf* pBuf, int32_t numOfEntries) {
  char* p = taosMemoryCalloc(numOfEntries, sizeof(STableBlockScanInfo));
  if (p == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  if (taosArrayPush(pBuf->pData, &p) == NULL) {
    taosMemoryFree(p);
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  return TSDB_CODE_SUCCESS;
}

/*
 * Prepare the bucketed buffer so that it can hold `numOfTables` scan-info entries:
 * numOfTables / numPerBucket full buckets plus one partial bucket for the remainder.
 * pBuf->numPerBucket must be positive. The bucket array is created on first use.
 *
 * Returns TSDB_CODE_SUCCESS or TSDB_CODE_OUT_OF_MEMORY. Buckets appended before a failure remain
 * owned by pBuf and are released by clearBlockScanInfoBuf().
 */
static int32_t initBlockScanInfoBuf(SBlockInfoBuf* pBuf, int32_t numOfTables) {
  ASSERT(pBuf->numPerBucket > 0);

  int32_t num = numOfTables / pBuf->numPerBucket;
  int32_t remainder = numOfTables % pBuf->numPerBucket;

  if (pBuf->pData == NULL) {
    pBuf->pData = taosArrayInit(num + 1, POINTER_BYTES);
    if (pBuf->pData == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }
  }

  for (int32_t i = 0; i < num; ++i) {
    int32_t code = appendBlockScanInfoBucket(pBuf, pBuf->numPerBucket);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }
  }

  if (remainder > 0) {
    return appendBlockScanInfoBucket(pBuf, remainder);
  }

  return TSDB_CODE_SUCCESS;
}

// Release every bucket and the bucket array itself.
// pData is reset to the value returned by taosArrayDestroy() (as done for other arrays in this file),
// so that initBlockScanInfoBuf(), which tests `pData == NULL`, never sees a dangling pointer.
static void clearBlockScanInfoBuf(SBlockInfoBuf* pBuf) {
  size_t num = taosArrayGetSize(pBuf->pData);
  for (size_t i = 0; i < num; ++i) {
    char** p = taosArrayGet(pBuf->pData, i);
    taosMemoryFree(*p);
  }

  pBuf->pData = taosArrayDestroy(pBuf->pData);
}

// Translate a flat entry index into the address of that entry:
// pick the bucket (index / numPerBucket), then the slot inside the bucket (index % numPerBucket).
static void* getPosInBlockInfoBuf(SBlockInfoBuf* pBuf, int32_t index) {
  const int32_t bucketIndex = index / pBuf->numPerBucket;
  const int32_t slot = index % pBuf->numPerBucket;

  char* pBucketStart = *(char**)taosArrayGet(pBuf->pData, bucketIndex);
  return pBucketStart + slot * sizeof(STableBlockScanInfo);
}

// NOTE: speedup the whole processing by preparing the buffer for STableBlockScanInfo in batch model
286
static SHashObj* createDataBlockScanInfo(STsdbReader* pTsdbReader, const STableKeyInfo* idList, int32_t numOfTables) {
H
Haojun Liao 已提交
287
  // allocate buffer in order to load data blocks from file
288
  // todo use simple hash instead, optimize the memory consumption
289 290 291
  SHashObj* pTableMap =
      taosHashInit(numOfTables, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
  if (pTableMap == NULL) {
H
Haojun Liao 已提交
292 293 294
    return NULL;
  }

H
Haojun Liao 已提交
295
  int64_t st = taosGetTimestampUs();
296
  initBlockScanInfoBuf(&pTsdbReader->blockInfoBuf, numOfTables);
H
Haojun Liao 已提交
297

298
  for (int32_t j = 0; j < numOfTables; ++j) {
299 300 301 302 303 304 305 306 307 308 309 310 311 312
    STableBlockScanInfo* pScanInfo = getPosInBlockInfoBuf(&pTsdbReader->blockInfoBuf, j);
    pScanInfo->uid = idList[j].uid;
    if (ASCENDING_TRAVERSE(pTsdbReader->order)) {
      int64_t skey = pTsdbReader->window.skey;
      pScanInfo->lastKey = (skey > INT64_MIN) ? (skey - 1) : skey;
    } else {
      int64_t ekey = pTsdbReader->window.ekey;
      pScanInfo->lastKey = (ekey < INT64_MAX) ? (ekey + 1) : ekey;
    }

    taosHashPut(pTableMap, &pScanInfo->uid, sizeof(uint64_t), &pScanInfo, POINTER_BYTES);

#if 0
//    STableBlockScanInfo info = {.lastKey = 0, .uid = idList[j].uid};
313
    if (ASCENDING_TRAVERSE(pTsdbReader->order)) {
H
Haojun Liao 已提交
314
      int64_t skey = pTsdbReader->window.skey;
H
Hongze Cheng 已提交
315
      info.lastKey = (skey > INT64_MIN) ? (skey - 1) : skey;
wmmhello's avatar
wmmhello 已提交
316
    } else {
H
Haojun Liao 已提交
317
      int64_t ekey = pTsdbReader->window.ekey;
H
Hongze Cheng 已提交
318
      info.lastKey = (ekey < INT64_MAX) ? (ekey + 1) : ekey;
H
Haojun Liao 已提交
319
    }
wmmhello's avatar
wmmhello 已提交
320

321
    taosHashPut(pTableMap, &info.uid, sizeof(uint64_t), &info, sizeof(info));
322 323
#endif

H
Hongze Cheng 已提交
324 325
    tsdbTrace("%p check table uid:%" PRId64 " from lastKey:%" PRId64 " %s", pTsdbReader, pScanInfo->uid,
              pScanInfo->lastKey, pTsdbReader->idStr);
H
Haojun Liao 已提交
326 327
  }

H
Haojun Liao 已提交
328 329 330 331
  pTsdbReader->cost.createScanInfoList = (taosGetTimestampUs() - st) / 1000.0;
  tsdbDebug("%p create %d tables scan-info, size:%.2f Kb, elapsed time:%.2f ms, %s", pTsdbReader, numOfTables,
            (sizeof(STableBlockScanInfo) * numOfTables) / 1024.0, pTsdbReader->cost.createScanInfoList,
            pTsdbReader->idStr);
332

333
  return pTableMap;
H
Hongze Cheng 已提交
334
}
H
Hongze Cheng 已提交
335

336 337
/*
 * Reset the scan state of every table in the map so that scanning restarts from key `ts`.
 * Both in-memory iterators (mem and imem) are destroyed, matching clearBlockScanInfo(): iterInit is
 * cleared here, so keeping the old imem iterator around looks like a leak.
 * NOTE(review): this assumes the iterators are re-created once iterInit is false -- confirm.
 */
static void resetAllDataBlockScanInfo(SHashObj* pTableMap, int64_t ts) {
  STableBlockScanInfo** p = NULL;
  while ((p = taosHashIterate(pTableMap, p)) != NULL) {
    STableBlockScanInfo* pInfo = *p;

    pInfo->iterInit = false;
    pInfo->iiter.hasVal = false;
    if (pInfo->iter.iter != NULL) {
      pInfo->iter.iter = tsdbTbDataIterDestroy(pInfo->iter.iter);
    }

    if (pInfo->iiter.iter != NULL) {
      pInfo->iiter.iter = tsdbTbDataIterDestroy(pInfo->iiter.iter);
    }

    pInfo->delSkyline = taosArrayDestroy(pInfo->delSkyline);
    pInfo->lastKey = ts;
  }
}

352 353 354
// Release everything owned by one table scan-info entry: both in-memory iterators, the delete
// skyline, the block index list and the block map. The entry itself is not freed.
static void clearBlockScanInfo(STableBlockScanInfo* p) {
  p->iterInit = false;
  p->iiter.hasVal = false;

  // mem iterator first, then imem iterator
  SIterInfo* iterators[2] = {&p->iter, &p->iiter};
  for (int32_t k = 0; k < 2; ++k) {
    SIterInfo* pIterInfo = iterators[k];
    if (pIterInfo->iter != NULL) {
      pIterInfo->iter = tsdbTbDataIterDestroy(pIterInfo->iter);
    }
  }

  p->delSkyline = taosArrayDestroy(p->delSkyline);
  p->pBlockList = taosArrayDestroy(p->pBlockList);
  tMapDataClear(&p->mapData);
}
368

369
// Destroy the table map. When clearEntry is true, the resources held by every entry are released
// first; the hash table itself is always cleaned up.
static void destroyAllBlockScanInfo(SHashObj* pTableMap, bool clearEntry) {
  if (clearEntry) {
    void* pIter = taosHashIterate(pTableMap, NULL);
    while (pIter != NULL) {
      clearBlockScanInfo(*(STableBlockScanInfo**)pIter);
      pIter = taosHashIterate(pTableMap, pIter);
    }
  }

  taosHashCleanup(pTableMap);
}

378
// A window is empty when its start key is greater than its end key.
static bool isEmptyQueryTimeWindow(STimeWindow* pWindow) {
  ASSERT(pWindow != NULL);

  bool hasRange = (pWindow->skey <= pWindow->ekey);
  return !hasRange;
}
H
Hongze Cheng 已提交
382

383 384 385
// Clamp the start of the query window to the earliest timestamp that is still within the keep
// period (now - keep2 minutes + 1 tick), so that expired data is never returned to the client.
// The end of the window is left unchanged.
static STimeWindow updateQueryTimeWindow(STsdb* pTsdb, STimeWindow* pWindow) {
  STsdbKeepCfg* pKeepCfg = &pTsdb->keepCfg;

  int64_t current = taosGetTimestamp(pKeepCfg->precision);
  // needs to add one tick
  int64_t earliestTs = current - (tsTickPerMin[pKeepCfg->precision] * pKeepCfg->keep2) + 1;

  STimeWindow result = *pWindow;
  result.skey = (result.skey < earliestTs) ? earliestTs : result.skey;
  return result;
}
H
Hongze Cheng 已提交
398

H
Haojun Liao 已提交
399
/*
 * Shrink *capacity (number of rows) so that the output SSDataBlock stays below 2MB.
 * The row length is the sum of the byte widths of all queried columns. The arithmetic is done in
 * 64 bits: capacity * rowLen can exceed INT32_MAX for wide rows, which would otherwise overflow
 * and skip the limit.
 */
static void limitOutputBufferSize(const SQueryTableDataCond* pCond, int32_t* capacity) {
  int64_t rowLen = 0;
  for (int32_t i = 0; i < pCond->numOfCols; ++i) {
    rowLen += pCond->colList[i].bytes;
  }

  // make sure the output SSDataBlock size be less than 2MB.
  const int64_t TWOMB = 2 * 1024 * 1024;
  if (rowLen > 0 && (int64_t)(*capacity) * rowLen > TWOMB) {
    (*capacity) = (int32_t)(TWOMB / rowLen);
  }
}

// init file iterator
413
static int32_t initFilesetIterator(SFilesetIter* pIter, SArray* aDFileSet, STsdbReader* pReader) {
H
Hongze Cheng 已提交
414
  size_t numOfFileset = taosArrayGetSize(aDFileSet);
415

416 417
  pIter->index = ASCENDING_TRAVERSE(pReader->order) ? -1 : numOfFileset;
  pIter->order = pReader->order;
H
Hongze Cheng 已提交
418
  pIter->pFileList = aDFileSet;
419
  pIter->numOfFiles = numOfFileset;
H
Haojun Liao 已提交
420

421 422 423 424
  if (pIter->pLastBlockReader == NULL) {
    pIter->pLastBlockReader = taosMemoryCalloc(1, sizeof(struct SLastBlockReader));
    if (pIter->pLastBlockReader == NULL) {
      int32_t code = TSDB_CODE_OUT_OF_MEMORY;
425
      tsdbError("failed to prepare the last block iterator, since:%s %s", tstrerror(code), pReader->idStr);
426 427
      return code;
    }
428 429
  }

430 431 432 433 434 435 436 437
  SLastBlockReader* pLReader = pIter->pLastBlockReader;
  pLReader->order = pReader->order;
  pLReader->window = pReader->window;
  pLReader->verRange = pReader->verRange;

  pLReader->uid = 0;
  tMergeTreeClose(&pLReader->mergeTree);

438
  if (pLReader->pInfo == NULL) {
439
    // here we ignore the first column, which is always be the primary timestamp column
440 441
    pLReader->pInfo =
        tCreateLastBlockLoadInfo(pReader->pSchema, &pReader->suppInfo.colIds[1], pReader->suppInfo.numOfCols - 1);
H
Haojun Liao 已提交
442 443 444 445
    if (pLReader->pInfo == NULL) {
      tsdbDebug("init fileset iterator failed, code:%s %s", tstrerror(terrno), pReader->idStr);
      return terrno;
    }
446 447
  }

448
  tsdbDebug("init fileset iterator, total files:%d %s", pIter->numOfFiles, pReader->idStr);
H
Haojun Liao 已提交
449 450 451
  return TSDB_CODE_SUCCESS;
}

452
/*
 * Advance to the next file set whose key range overlaps the query window and open its reader.
 *
 * Returns true when such a file set is opened (pReader->pFileReader / status.pCurrentFileset are
 * set), false when the list is exhausted, the remaining files are all beyond the query window, or
 * the file reader cannot be opened.
 */
static bool filesetIteratorNext(SFilesetIter* pIter, STsdbReader* pReader) {
  const bool    asc = ASCENDING_TRAVERSE(pIter->order);
  const int32_t step = asc ? 1 : -1;

  pIter->index += step;
  if ((asc && pIter->index >= pIter->numOfFiles) || ((!asc) && pIter->index < 0)) {
    return false;
  }

  // collect the statistics of the previous file set before the last-block reader is reset
  SLastBlockReader* pLReader = pIter->pLastBlockReader;
  SIOCostSummary*   pSum = &pReader->cost;
  getLastBlockLoadInfo(pLReader->pInfo, &pSum->lastBlockLoad, &pReader->cost.lastBlockLoadTime);

  pLReader->uid = 0;
  tMergeTreeClose(&pLReader->mergeTree);
  resetLastBlockLoadInfo(pLReader->pInfo);

  // check file the time range of coverage
  STimeWindow win = {0};

  for (;;) {
    if (pReader->pFileReader != NULL) {
      tsdbDataFReaderClose(&pReader->pFileReader);
    }

    pReader->status.pCurrentFileset = (SDFileSet*)taosArrayGet(pIter->pFileList, pIter->index);

    int32_t code = tsdbDataFReaderOpen(&pReader->pFileReader, pReader->pTsdb, pReader->status.pCurrentFileset);
    if (code != TSDB_CODE_SUCCESS) {
      return false;
    }

    pReader->cost.headFileLoad += 1;

    int32_t fid = pReader->status.pCurrentFileset->fid;
    tsdbFidKeyRange(fid, pReader->pTsdb->keepCfg.days, pReader->pTsdb->keepCfg.precision, &win.skey, &win.ekey);

    // current file are no longer overlapped with query time window, ignore remain files
    bool passedWindow = asc ? (win.skey > pReader->window.ekey) : (win.ekey < pReader->window.skey);
    if (passedWindow) {
      tsdbDebug("%p remain files are not qualified for qrange:%" PRId64 "-%" PRId64 ", ignore, %s", pReader,
                pReader->window.skey, pReader->window.ekey, pReader->idStr);
      return false;
    }

    // the file lies entirely before the window in scan direction: try the next one
    bool beforeWindow = asc ? (win.ekey < pReader->window.skey) : (win.skey > pReader->window.ekey);
    if (!beforeWindow) {
      tsdbDebug("%p file found fid:%d for qrange:%" PRId64 "-%" PRId64 ", %s", pReader, fid, pReader->window.skey,
                pReader->window.ekey, pReader->idStr);
      return true;
    }

    pIter->index += step;
    if ((asc && pIter->index >= pIter->numOfFiles) || ((!asc) && pIter->index < 0)) {
      return false;
    }
  }
}

512
// Put the block iterator back to its initial state for the given order: no blocks, cursor at -1.
// The block list is created on first use and merely emptied afterwards.
static void resetDataBlockIterator(SDataBlockIter* pIter, int32_t order) {
  pIter->numOfBlocks = 0;
  pIter->index = -1;
  pIter->order = order;

  if (pIter->blockList != NULL) {
    taosArrayClear(pIter->blockList);
    return;
  }

  pIter->blockList = taosArrayInit(4, sizeof(SFileDataBlockInfo));
}

L
Liu Jicong 已提交
523
// Release the block list of the iterator.
// blockList is reset to the value returned by taosArrayDestroy() (as done for other arrays in this
// file) so that resetDataBlockIterator(), which tests `blockList == NULL`, never touches freed memory.
static void cleanupDataBlockIterator(SDataBlockIter* pIter) {
  pIter->blockList = taosArrayDestroy(pIter->blockList);
}
H
Haojun Liao 已提交
524

H
Haojun Liao 已提交
525
// Initial reader status: start with the file stage and no table iterator.
static void initReaderStatus(SReaderStatus* pStatus) {
  pStatus->loadFromFile = true;
  pStatus->pTableIter = NULL;
}

530 531 532 533 534 535 536 537
/*
 * Create the result block with one column per queried column and room for `capacity` rows.
 *
 * Returns the block, or NULL with terrno set on failure. On failure the block is released with
 * blockDataDestroy() so that the column array attached to it is freed too (a plain
 * taosMemoryFree() of the block would leak it).
 */
static SSDataBlock* createResBlock(SQueryTableDataCond* pCond, int32_t capacity) {
  SSDataBlock* pResBlock = createDataBlock();
  if (pResBlock == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  int32_t code = TSDB_CODE_SUCCESS;
  for (int32_t i = 0; i < pCond->numOfCols; ++i) {
    SColumnInfoData colInfo = {0, {0}};
    colInfo.info = pCond->colList[i];

    code = blockDataAppendColInfo(pResBlock, &colInfo);
    if (code != TSDB_CODE_SUCCESS) {
      break;
    }
  }

  if (code == TSDB_CODE_SUCCESS) {
    code = blockDataEnsureCapacity(pResBlock, capacity);
  }

  if (code != TSDB_CODE_SUCCESS) {
    terrno = code;
    blockDataDestroy(pResBlock);
    return NULL;
  }

  return pResBlock;
}

552 553
/*
 * Allocate and initialize a reader for the given query condition.
 *
 * pVnode   : vnode that owns the data
 * pCond    : query condition (columns, order, time window, ...); must contain at least one column
 * ppReader : receives the new reader on success, NULL on failure
 * capacity : requested number of rows of the result block; reduced if the block would exceed 2MB
 * idstr    : optional query id used in log messages (copied)
 *
 * Returns 0 on success or an error code. On failure the partially built reader is released through
 * tsdbReaderClose().
 */
static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, STsdbReader** ppReader, int32_t capacity,
                                const char* idstr) {
  int32_t code = 0;
  int8_t  level = 0;

  STsdbReader* pReader = (STsdbReader*)taosMemoryCalloc(1, sizeof(*pReader));
  if (pReader == NULL) {
    // nothing has been built yet, so there is nothing to close
    *ppReader = NULL;
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  if (VND_IS_TSMA(pVnode)) {
    tsdbDebug("vgId:%d, tsma is selected to query", TD_VID(pVnode));
  }

  initReaderStatus(&pReader->status);

  pReader->pTsdb = getTsdbByRetentions(pVnode, pCond->twindows.skey, pVnode->config.tsdbCfg.retentions, idstr, &level);
  pReader->suid = pCond->suid;
  pReader->order = pCond->order;
  pReader->capacity = capacity;
  pReader->idStr = (idstr != NULL) ? strdup(idstr) : NULL;  // only used for logging
  pReader->verRange = getQueryVerRange(pVnode, pCond, level);
  pReader->type = pCond->type;
  pReader->window = updateQueryTimeWindow(pReader->pTsdb, &pCond->twindows);
  pReader->blockInfoBuf.numPerBucket = 1000;  // 1000 tables per bucket
  ASSERT(pCond->numOfCols > 0);

  limitOutputBufferSize(pCond, &pReader->capacity);

  // allocate buffer in order to load data blocks from file
  SBlockLoadSuppInfo* pSup = &pReader->suppInfo;
  pSup->pColAgg = taosArrayInit(4, sizeof(SColumnDataAgg));
  pSup->plist = taosMemoryCalloc(pCond->numOfCols, POINTER_BYTES);
  if (pSup->pColAgg == NULL || pSup->plist == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _end;
  }

  pSup->tsColAgg.colId = PRIMARYKEY_TIMESTAMP_COL_ID;

  code = tBlockDataCreate(&pReader->status.fileBlockData);
  if (code != TSDB_CODE_SUCCESS) {
    terrno = code;
    goto _end;
  }

  pReader->pResBlock = createResBlock(pCond, pReader->capacity);
  if (pReader->pResBlock == NULL) {
    code = terrno;
    goto _end;
  }

  code = setColumnIdSlotList(pReader, pReader->pResBlock);
  if (code != TSDB_CODE_SUCCESS) {
    // The helper releases its own buffers when it fails; drop the references here as well so
    // that the cleanup below cannot free them a second time.
    pSup->colIds = NULL;
    pSup->buildBuf = NULL;
    pSup->numOfCols = 0;
    goto _end;
  }

  *ppReader = pReader;
  return code;

_end:
  tsdbReaderClose(pReader);
  *ppReader = NULL;
  return code;
}
H
Hongze Cheng 已提交
614

H
Haojun Liao 已提交
615
/*
 * Load the block index of the current head file and append to pIndexList the entries that belong
 * to the queried super table and to a table present in pReader->status.pTableMap.
 * The per-table block list is created lazily for every matching table.
 *
 * Returns TSDB_CODE_SUCCESS, the error of tsdbReadBlockIdx(), or TSDB_CODE_OUT_OF_MEMORY.
 */
static int32_t doLoadBlockIndex(STsdbReader* pReader, SDataFReader* pFileReader, SArray* pIndexList) {
  SArray* aBlockIdx = taosArrayInit(8, sizeof(SBlockIdx));
  if (aBlockIdx == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  int64_t st = taosGetTimestampUs();
  int32_t code = tsdbReadBlockIdx(pFileReader, aBlockIdx);
  if (code != TSDB_CODE_SUCCESS) {
    goto _end;
  }

  size_t num = taosArrayGetSize(aBlockIdx);
  if (num == 0) {
    goto _end;  // nothing to do, code is TSDB_CODE_SUCCESS
  }

  int64_t et1 = taosGetTimestampUs();

  for (size_t i = 0; i < num; ++i) {
    SBlockIdx* pBlockIdx = (SBlockIdx*)taosArrayGet(aBlockIdx, i);

    // uid check
    if (pBlockIdx->suid != pReader->suid) {
      continue;
    }

    // this block belongs to a table that is not queried.
    void* p = taosHashGet(pReader->status.pTableMap, &pBlockIdx->uid, sizeof(uint64_t));
    if (p == NULL) {
      continue;
    }

    STableBlockScanInfo* pScanInfo = *(STableBlockScanInfo**)p;
    if (pScanInfo->pBlockList == NULL) {
      pScanInfo->pBlockList = taosArrayInit(4, sizeof(SBlockIndex));
      if (pScanInfo->pBlockList == NULL) {
        code = TSDB_CODE_OUT_OF_MEMORY;
        goto _end;
      }
    }

    if (taosArrayPush(pIndexList, pBlockIdx) == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto _end;
    }
  }

  int64_t et2 = taosGetTimestampUs();
  tsdbDebug("load block index for %d tables completed, elapsed time:%.2f ms, set blockIdx:%.2f ms, size:%.2f Kb %s",
            (int32_t)num, (et1 - st) / 1000.0, (et2 - et1) / 1000.0, num * sizeof(SBlockIdx) / 1024.0, pReader->idStr);

  pReader->cost.headFileLoadTime += (et1 - st) / 1000.0;

_end:
  taosArrayDestroy(aBlockIdx);
  return code;
}
H
Hongze Cheng 已提交
665

666
// Drop the per-file state (block map and block index list) of every table before a new file is handled.
static void cleanupTableScanInfo(SHashObj* pTableMap) {
  STableBlockScanInfo** ppInfo = NULL;

  while ((ppInfo = taosHashIterate(pTableMap, ppInfo)) != NULL) {
    STableBlockScanInfo* pInfo = *ppInfo;

    // reset the index in last block when handing a new file
    tMapDataClear(&pInfo->mapData);
    taosArrayClear(pInfo->pBlockList);
  }
}

680
/*
 * For every block index entry in pIndexList, load the table's block map from the current file and
 * record in the table's pBlockList the blocks that overlap both the query time window and the
 * query version range.
 *
 * pBlockNum receives the number of qualified blocks (accumulated) and the number of stt files.
 * Returns TSDB_CODE_SUCCESS, the error of tsdbReadDataBlk(), or TSDB_CODE_OUT_OF_MEMORY.
 */
static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, SBlockNumber* pBlockNum) {
  int32_t numOfQTable = 0;
  size_t  sizeInDisk = 0;
  size_t  numOfTables = taosArrayGetSize(pIndexList);

  int64_t st = taosGetTimestampUs();
  cleanupTableScanInfo(pReader->status.pTableMap);

  for (size_t i = 0; i < numOfTables; ++i) {
    SBlockIdx* pBlockIdx = taosArrayGet(pIndexList, i);

    void* pEntry = taosHashGet(pReader->status.pTableMap, &pBlockIdx->uid, sizeof(int64_t));
    if (pEntry == NULL) {
      // NOTE(review): pIndexList is expected to contain queried tables only; skip instead of crashing
      continue;
    }

    STableBlockScanInfo* pScanInfo = *(STableBlockScanInfo**)pEntry;

    tMapDataReset(&pScanInfo->mapData);
    int32_t code = tsdbReadDataBlk(pReader->pFileReader, pBlockIdx, &pScanInfo->mapData);
    if (code != TSDB_CODE_SUCCESS) {
      // do not silently return an incomplete result when the block info cannot be read
      return code;
    }

    sizeInDisk += pScanInfo->mapData.nData;
    for (int32_t j = 0; j < pScanInfo->mapData.nItem; ++j) {
      SDataBlk block = {0};
      tMapDataGetItemByIdx(&pScanInfo->mapData, j, &block, tGetDataBlk);

      // 1. time range check
      if (block.minKey.ts > pReader->window.ekey || block.maxKey.ts < pReader->window.skey) {
        continue;
      }

      // 2. version range check
      if (block.minVer > pReader->verRange.maxVer || block.maxVer < pReader->verRange.minVer) {
        continue;
      }

      SBlockIndex bIndex = {.ordinalIndex = j, .inFileOffset = block.aSubBlock->offset};
      bIndex.window = (STimeWindow){.skey = block.minKey.ts, .ekey = block.maxKey.ts};

      void* p = taosArrayPush(pScanInfo->pBlockList, &bIndex);
      if (p == NULL) {
        tMapDataClear(&pScanInfo->mapData);
        return TSDB_CODE_OUT_OF_MEMORY;
      }

      pBlockNum->numOfBlocks += 1;
    }

    if (pScanInfo->pBlockList != NULL && taosArrayGetSize(pScanInfo->pBlockList) > 0) {
      numOfQTable += 1;
    }
  }

  pBlockNum->numOfLastFiles = pReader->pFileReader->pSet->nSttF;
  int32_t total = pBlockNum->numOfLastFiles + pBlockNum->numOfBlocks;

  double el = (taosGetTimestampUs() - st) / 1000.0;
  // numOfTables is a size_t: print it through an explicit cast instead of a mismatching "%ld"
  tsdbDebug(
      "load block of %d tables completed, blocks:%d in %d tables, last-files:%d, block-info-size:%.2f Kb, elapsed "
      "time:%.2f ms %s",
      (int32_t)numOfTables, pBlockNum->numOfBlocks, numOfQTable, pBlockNum->numOfLastFiles, sizeInDisk / 1024.0, el,
      pReader->idStr);

  pReader->cost.numOfBlocks += total;
  pReader->cost.headFileLoadTime += el;

  return TSDB_CODE_SUCCESS;
}
H
Hongze Cheng 已提交
744

745
/*
 * Flag the dump state as fully consumed and store, as lastKey, the given key
 * shifted by one in the traversal direction (+1 ascending, -1 descending).
 */
static void setBlockAllDumped(SFileBlockDumpInfo* pDumpInfo, int64_t maxKey, int32_t order) {
  pDumpInfo->allDumped = true;

  if (ASCENDING_TRAVERSE(order)) {
    pDumpInfo->lastKey = maxKey + 1;
  } else {
    pDumpInfo->lastKey = maxKey - 1;
  }
}

751 752
/*
 * Append one column value to row `rowIndex` of the output column.
 *
 * Fixed-length types are appended directly from pColVal->value, with the null
 * flag derived from COL_VAL_IS_VALUE. Variable-length values are first staged
 * in pSup->buildBuf[colIndex] (length header followed by payload) and then
 * appended from that buffer; a variable-length non-value is appended as NULL.
 */
static void doCopyColVal(SColumnInfoData* pColInfoData, int32_t rowIndex, int32_t colIndex, SColVal* pColVal,
                         SBlockLoadSuppInfo* pSup) {
  if (!IS_VAR_DATA_TYPE(pColVal->type)) {
    colDataAppend(pColInfoData, rowIndex, (const char*)&pColVal->value, !COL_VAL_IS_VALUE(pColVal));
    return;
  }

  if (!COL_VAL_IS_VALUE(pColVal)) {
    colDataAppendNULL(pColInfoData, rowIndex);
    return;
  }

  // stage "length + payload" in the per-column scratch buffer
  varDataSetLen(pSup->buildBuf[colIndex], pColVal->value.nData);
  ASSERT(pColVal->value.nData <= pColInfoData->info.bytes);
  memcpy(varDataVal(pSup->buildBuf[colIndex]), pColVal->value.pData, pColVal->value.nData);

  colDataAppend(pColInfoData, rowIndex, pSup->buildBuf[colIndex], false);
}

767
/*
 * Return the block-list entry at the iterator's current index, or NULL when
 * the block list holds no entries.
 */
static SFileDataBlockInfo* getCurrentBlockInfo(SDataBlockIter* pBlockIter) {
  size_t numOfEntries = taosArrayGetSize(pBlockIter->blockList);
  if (numOfEntries > 0) {
    return taosArrayGet(pBlockIter->blockList, pBlockIter->index);
  }

  // an empty list must agree with the recorded block count
  ASSERT(pBlockIter->numOfBlocks == numOfEntries);
  return NULL;
}

H
Hongze Cheng 已提交
777
/* Return the address of the SDataBlk stored inside the iterator (pBlockIter->block). */
static SDataBlk* getCurrentBlock(SDataBlockIter* pBlockIter) {
  SDataBlk* pCurrent = &pBlockIter->block;
  return pCurrent;
}
778

H
Haojun Liao 已提交
779 780 781 782 783 784 785 786 787 788 789 790 791 792 793 794 795 796 797 798 799 800 801 802 803 804 805 806 807 808 809 810 811 812 813 814 815 816 817 818 819 820 821 822 823 824 825 826 827 828 829 830 831 832 833 834 835 836 837 838 839 840 841 842 843 844 845 846 847 848 849 850 851
/*
 * Binary search over a list of timestamps.
 *
 * pValue  timestamp list, reinterpreted as TSKEY[num]; the comparisons below
 *         narrow the range as if the list were sorted in `order`
 * num     number of entries in the list
 * key     timestamp to locate
 * order   TSDB_ORDER_ASC or TSDB_ORDER_DESC
 *
 * ASC:  returns an index whose entry is >= key, or -1 when the last entry is
 *       still smaller than key.
 * DESC: returns an index whose entry is <= key, or -1 when the last entry is
 *       still bigger than key.
 * An empty (or NULL) list yields -1.
 */
int32_t binarySearchForTs(char* pValue, int num, TSKEY key, int order) {
  ASSERT(order == TSDB_ORDER_ASC || order == TSDB_ORDER_DESC);

  // nothing to search: return "not found" before keyList[0] / keyList[num - 1] are read
  if (pValue == NULL || num <= 0) {
    return -1;
  }

  TSKEY*  keyList = (TSKEY*)pValue;
  int32_t firstPos = 0;
  int32_t lastPos = num - 1;
  int32_t midPos = -1;
  int32_t numOfRows;

  if (order == TSDB_ORDER_DESC) {
    // find the first position whose entry is not bigger than the key
    while (1) {
      if (key >= keyList[firstPos]) return firstPos;
      if (key == keyList[lastPos]) return lastPos;

      if (key < keyList[lastPos]) {
        // the answer, if any, is the entry right after the current range
        lastPos += 1;
        if (lastPos >= num) {
          return -1;
        } else {
          return lastPos;
        }
      }

      numOfRows = lastPos - firstPos + 1;
      midPos = (numOfRows >> 1) + firstPos;

      if (key < keyList[midPos]) {
        firstPos = midPos + 1;
      } else if (key > keyList[midPos]) {
        lastPos = midPos - 1;
      } else {
        break;
      }
    }

  } else {
    // find the first position whose entry is not smaller than the key
    while (1) {
      if (key <= keyList[firstPos]) return firstPos;
      if (key == keyList[lastPos]) return lastPos;

      if (key > keyList[lastPos]) {
        // the answer, if any, is the entry right after the current range
        lastPos = lastPos + 1;
        if (lastPos >= num) {
          return -1;
        } else {
          return lastPos;
        }
      }

      numOfRows = lastPos - firstPos + 1;
      midPos = (numOfRows >> 1u) + firstPos;

      if (key < keyList[midPos]) {
        lastPos = midPos - 1;
      } else if (key > keyList[midPos]) {
        firstPos = midPos + 1;
      } else {
        break;
      }
    }
  }

  // exact match found at midPos
  return midPos;
}

/*
 * Binary search in keyList starting from index `pos`.
 *
 * TSDB_ORDER_ASC:  searches [pos, num - 1]; returns -1 when key is smaller
 *                  than keyList[pos], otherwise an index whose entry is <= key
 *                  (for strictly increasing keys: the last such index).
 * otherwise:       searches [0, pos] downwards; returns -1 when key is bigger
 *                  than keyList[pos], otherwise an index whose entry is >= key
 *                  (for strictly increasing keys: the first such index).
 *
 * Requires num > 0 and 0 <= pos < num (asserted).
 */
static int doBinarySearchKey(TSKEY* keyList, int num, int pos, TSKEY key, int order) {
  assert(pos >= 0 && pos < num);
  assert(num > 0);

  int from = pos;  // moving end that started at pos

  if (order == TSDB_ORDER_ASC) {
    int limit = num - 1;  // far end of the search range

    if (key < keyList[pos]) {
      return -1;
    }

    for (;;) {
      // early exits
      if (key >= keyList[limit]) {
        return limit;
      }
      if (key <= keyList[from]) {
        return from;
      }
      if (limit - from <= 1) {
        return from;
      }

      // shrink the range around the middle element
      int   mid = from + (limit - from + 1) / 2;
      TSKEY midKey = keyList[mid];
      if (midKey > key) {
        limit = mid;
      } else if (midKey < key) {
        from = mid;
      } else {
        return mid;
      }
    }
  }

  // descending: walk from pos towards index 0
  int limit = 0;

  if (key > keyList[pos]) {
    return -1;
  }

  for (;;) {
    // early exits
    if (key <= keyList[limit]) {
      return limit;
    }
    if (key >= keyList[from]) {
      return from;
    }
    if (from - limit <= 1) {
      return from;
    }

    // shrink the range around the middle element
    int   mid = from - (from - limit + 1) / 2;
    TSKEY midKey = keyList[mid];
    if (midKey < key) {
      limit = mid;
    } else if (midKey > key) {
      from = mid;
    } else {
      return mid;
    }
  }
}

/*
 * Locate the last row of the block (in scan direction) that still belongs to
 * the reader's query window.
 *
 * When the window covers the block edge in scan direction the edge row is
 * returned (nRow - 1 ascending, 0 descending); otherwise the row is found by
 * binary search from `pos`, which may return -1.
 */
int32_t getEndPosInDataBlock(STsdbReader* pReader, SBlockData* pBlockData, SDataBlk* pBlock, int32_t pos) {
  bool asc = ASCENDING_TRAVERSE(pReader->order);

  if (asc) {
    if (pReader->window.ekey >= pBlock->maxKey.ts) {
      return pBlock->nRow - 1;
    }
  } else {
    if (pReader->window.skey <= pBlock->minKey.ts) {
      return 0;
    }
  }

  // NOTE: search against the window boundary that terminates the scan
  int64_t boundary = asc ? pReader->window.ekey : pReader->window.skey;
  return doBinarySearchKey(pBlockData->aTSKEY, pBlock->nRow, pos, boundary, pReader->order);
}

913
/*
 * Copy rows of the loaded file block (pReader->status.fileBlockData) into the
 * reader's result block (pReader->pResBlock), starting at the dump cursor
 * pDumpInfo->rowIndex and moving in scan direction.
 *
 * At most pReader->capacity rows are copied per call. Output columns that have
 * no matching column in the file block are filled with NULL. After copying,
 * the dump cursor is advanced and the block is flagged as fully dumped when
 * the cursor left the block or the next row is outside the query window.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_INVALID_PARA when the first row
 * inside the query window cannot be located in the block.
 */
static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo) {
  SReaderStatus*  pStatus = &pReader->status;
  SDataBlockIter* pBlockIter = &pStatus->blockIter;

  SBlockData*         pBlockData = &pStatus->fileBlockData;
  SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(pBlockIter);
  SDataBlk*           pBlock = getCurrentBlock(pBlockIter);
  SSDataBlock*        pResBlock = pReader->pResBlock;
  int32_t             numOfOutputCols = blockDataGetNumOfCols(pResBlock);

  SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo;
  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;

  SColVal cv = {0};
  int64_t st = taosGetTimestampUs();
  bool    asc = ASCENDING_TRAVERSE(pReader->order);
  int32_t step = asc ? 1 : -1;

  // the cursor still sits on the first row in scan direction: skip the rows in front of the query window
  bool atBlockHead = asc ? (pDumpInfo->rowIndex == 0) : (pDumpInfo->rowIndex == pBlock->nRow - 1);
  if (atBlockHead) {
    bool headInWindow =
        asc ? (pReader->window.skey <= pBlock->minKey.ts) : (pReader->window.ekey >= pBlock->maxKey.ts);

    if (!headInWindow) {
      // search in the reverse direction, from the far end of the block towards the head
      int32_t pos = asc ? pBlock->nRow - 1 : 0;
      int32_t order = asc ? TSDB_ORDER_DESC : TSDB_ORDER_ASC;
      int64_t key = asc ? pReader->window.skey : pReader->window.ekey;

      int32_t startIndex = doBinarySearchKey(pBlockData->aTSKEY, pBlock->nRow, pos, key, order);
      if (startIndex < 0) {
        // -1 must never be used as a row index: it would be applied to aTSKEY and to the memcpy size below
        tsdbError("%p failed to locate the start position in current block, global index:%d, brange:%" PRId64
                  "-%" PRId64 ", rows:%d, %s",
                  pReader, pBlockIter->index, pBlock->minKey.ts, pBlock->maxKey.ts, pBlock->nRow, pReader->idStr);
        return TSDB_CODE_INVALID_PARA;
      }

      pDumpInfo->rowIndex = startIndex;
    }
  }

  // time window check
  int32_t endIndex = getEndPosInDataBlock(pReader, pBlockData, pBlock, pDumpInfo->rowIndex);
  if (endIndex == -1) {
    setBlockAllDumped(pDumpInfo, pReader->window.ekey, pReader->order);
    return TSDB_CODE_SUCCESS;
  }

  // make endIndex exclusive in scan direction
  endIndex += step;
  int32_t remain = asc ? (endIndex - pDumpInfo->rowIndex) : (pDumpInfo->rowIndex - endIndex);
  if (remain > pReader->capacity) {  // output buffer check
    remain = pReader->capacity;
  }

  int32_t rowIndex = 0;

  // the timestamp column, when requested, is the first output column
  int32_t          i = 0;
  SColumnInfoData* pColData = taosArrayGet(pResBlock->pDataBlock, i);
  if (pColData->info.colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
    if (asc) {
      memcpy(pColData->pData, &pBlockData->aTSKEY[pDumpInfo->rowIndex], remain * sizeof(int64_t));
    } else {
      for (int32_t j = pDumpInfo->rowIndex; rowIndex < remain; j += step) {
        colDataAppendInt64(pColData, rowIndex++, &pBlockData->aTSKEY[j]);
      }
    }

    i += 1;
  }

  // walk output columns (i) and file-block columns (colIndex) together, matching them by column id
  int32_t colIndex = 0;
  int32_t num = pBlockData->nColData;
  while (i < numOfOutputCols && colIndex < num) {
    rowIndex = 0;
    pColData = taosArrayGet(pResBlock->pDataBlock, i);

    SColData* pData = tBlockDataGetColDataByIdx(pBlockData, colIndex);
    if (pData->cid < pColData->info.colId) {
      colIndex += 1;
    } else if (pData->cid == pColData->info.colId) {
      if (pData->flag == HAS_NONE || pData->flag == HAS_NULL || pData->flag == (HAS_NULL | HAS_NONE)) {
        // the file column holds no value at all
        colDataAppendNNULL(pColData, 0, remain);
      } else {
        if (IS_NUMERIC_TYPE(pColData->info.type) && asc) {
          // bulk copy of the fixed-width values
          uint8_t* p = pData->pData + tDataTypes[pData->type].bytes * pDumpInfo->rowIndex;
          memcpy(pColData->pData, p, remain * tDataTypes[pData->type].bytes);

          // null value exists, check one-by-one
          if (pData->flag != HAS_VALUE) {
            for (int32_t j = pDumpInfo->rowIndex; rowIndex < remain; j += step, rowIndex++) {
              uint8_t v = tColDataGetBitValue(pData, j);
              if (v == 0 || v == 1) {
                colDataSetNull_f(pColData->nullbitmap, rowIndex);
              }
            }
          }
        } else {
          for (int32_t j = pDumpInfo->rowIndex; rowIndex < remain; j += step) {
            tColDataGetValue(pData, j, &cv);
            doCopyColVal(pColData, rowIndex++, i, &cv, pSupInfo);
          }
        }
      }

      colIndex += 1;
      i += 1;
    } else {  // the specified column does not exist in file block, fill with null data
      colDataAppendNNULL(pColData, 0, remain);
      i += 1;
    }
  }

  // fill the mis-matched columns with null value
  while (i < numOfOutputCols) {
    pColData = taosArrayGet(pResBlock->pDataBlock, i);
    colDataAppendNNULL(pColData, 0, remain);
    i += 1;
  }

  pResBlock->info.rows = remain;
  pDumpInfo->rowIndex += step * remain;

  // check if current block are all handled
  if (pDumpInfo->rowIndex >= 0 && pDumpInfo->rowIndex < pBlock->nRow) {
    int64_t ts = pBlockData->aTSKEY[pDumpInfo->rowIndex];
    if (outOfTimeWindow(ts, &pReader->window)) {  // the remaining rows are outside the query window, ignore them
      setBlockAllDumped(pDumpInfo, ts, pReader->order);
    }
  } else {
    int64_t ts = asc ? pBlock->maxKey.ts : pBlock->minKey.ts;
    setBlockAllDumped(pDumpInfo, ts, pReader->order);
  }

  double elapsedTime = (taosGetTimestampUs() - st) / 1000.0;
  pReader->cost.blockLoadTime += elapsedTime;

  int32_t unDumpedRows = asc ? pBlock->nRow - pDumpInfo->rowIndex : pDumpInfo->rowIndex + 1;
  tsdbDebug("%p copy file block to sdatablock, global index:%d, table index:%d, brange:%" PRId64 "-%" PRId64
            ", rows:%d, remain:%d, minVer:%" PRId64 ", maxVer:%" PRId64 ", elapsed time:%.2f ms, %s",
            pReader, pBlockIter->index, pBlockInfo->tbBlockIdx, pBlock->minKey.ts, pBlock->maxKey.ts, remain,
            unDumpedRows, pBlock->minVer, pBlock->maxVer, elapsedTime, pReader->idStr);

  return TSDB_CODE_SUCCESS;
}

1048 1049
/*
 * Read the iterator's current file block into pBlockData.
 *
 * pBlockData is reset and re-initialised for table `uid` (columns
 * suppInfo.colIds[1..numOfCols-1]) before the read. On success the elapsed
 * time is added to the reader's block-load cost and the dump state is marked
 * as not yet dumped. Returns the error code of the failing step otherwise.
 */
static int32_t doLoadFileBlockData(STsdbReader* pReader, SDataBlockIter* pBlockIter, SBlockData* pBlockData,
                                   uint64_t uid) {
  int64_t startUs = taosGetTimestampUs();

  tBlockDataReset(pBlockData);

  TABLEID tableId = {.suid = pReader->suid, .uid = uid};
  int32_t code = tBlockDataInit(pBlockData, &tableId, pReader->pSchema, &pReader->suppInfo.colIds[1],
                                pReader->suppInfo.numOfCols - 1);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
  SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(pBlockIter);
  ASSERT(pBlockInfo != NULL);

  SDataBlk* pBlock = getCurrentBlock(pBlockIter);

  code = tsdbReadDataBlock(pReader->pFileReader, pBlock, pBlockData);
  if (code != TSDB_CODE_SUCCESS) {
    tsdbError("%p error occurs in loading file block, global index:%d, table index:%d, brange:%" PRId64 "-%" PRId64
              ", rows:%d, code:%s %s",
              pReader, pBlockIter->index, pBlockInfo->tbBlockIdx, pBlock->minKey.ts, pBlock->maxKey.ts, pBlock->nRow,
              tstrerror(code), pReader->idStr);
    return code;
  }

  double elapsedMs = (taosGetTimestampUs() - startUs) / 1000.0;

  tsdbDebug("%p load file block into buffer, global index:%d, index in table block list:%d, brange:%" PRId64 "-%" PRId64
            ", rows:%d, minVer:%" PRId64 ", maxVer:%" PRId64 ", elapsed time:%.2f ms, %s",
            pReader, pBlockIter->index, pBlockInfo->tbBlockIdx, pBlock->minKey.ts, pBlock->maxKey.ts, pBlock->nRow,
            pBlock->minVer, pBlock->maxVer, elapsedMs, pReader->idStr);

  pReader->cost.blockLoadTime += elapsedMs;
  pDumpInfo->allDumped = false;

  return TSDB_CODE_SUCCESS;
}
H
Hongze Cheng 已提交
1086

H
Haojun Liao 已提交
1087 1088 1089
/*
 * Release everything owned by the block order supporter: the two per-table
 * counter arrays, the per-table block-info buffers and the array holding them.
 *
 * Safe to call on a partially initialised supporter, e.g. from
 * initBlockOrderSupporter when one of its allocations failed.
 */
static void cleanupBlockOrderSupporter(SBlockOrderSupporter* pSup) {
  taosMemoryFreeClear(pSup->numOfBlocksPerTable);
  taosMemoryFreeClear(pSup->indexPerTable);

  // the slot array itself may be missing when its allocation failed; never index through NULL
  if (pSup->pDataBlockInfo == NULL) {
    return;
  }

  for (int32_t i = 0; i < pSup->numOfTables; ++i) {
    // release through the slot itself instead of a local copy
    taosMemoryFreeClear(pSup->pDataBlockInfo[i]);
  }

  taosMemoryFreeClear(pSup->pDataBlockInfo);
}
H
Hongze Cheng 已提交
1098

H
Haojun Liao 已提交
1099 1100
/*
 * Allocate the zero-filled per-table arrays of the block order supporter for
 * `numOfTables` tables (numOfTables >= 1 is asserted).
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY after releasing
 * whatever had been allocated.
 */
static int32_t initBlockOrderSupporter(SBlockOrderSupporter* pSup, int32_t numOfTables) {
  ASSERT(numOfTables >= 1);

  pSup->numOfBlocksPerTable = taosMemoryCalloc(1, sizeof(int32_t) * numOfTables);
  pSup->indexPerTable = taosMemoryCalloc(1, sizeof(int32_t) * numOfTables);
  pSup->pDataBlockInfo = taosMemoryCalloc(1, POINTER_BYTES * numOfTables);

  bool allAllocated =
      (pSup->numOfBlocksPerTable != NULL) && (pSup->indexPerTable != NULL) && (pSup->pDataBlockInfo != NULL);
  if (allAllocated) {
    return TSDB_CODE_SUCCESS;
  }

  cleanupBlockOrderSupporter(pSup);
  return TSDB_CODE_OUT_OF_MEMORY;
}
H
Hongze Cheng 已提交
1113

H
Haojun Liao 已提交
1114
static int32_t fileDataBlockOrderCompar(const void* pLeft, const void* pRight, void* param) {
1115
  int32_t leftIndex = *(int32_t*)pLeft;
H
Haojun Liao 已提交
1116
  int32_t rightIndex = *(int32_t*)pRight;
H
Hongze Cheng 已提交
1117

H
Haojun Liao 已提交
1118
  SBlockOrderSupporter* pSupporter = (SBlockOrderSupporter*)param;
H
Hongze Cheng 已提交
1119

H
Haojun Liao 已提交
1120 1121
  int32_t leftTableBlockIndex = pSupporter->indexPerTable[leftIndex];
  int32_t rightTableBlockIndex = pSupporter->indexPerTable[rightIndex];
H
Hongze Cheng 已提交
1122

H
Haojun Liao 已提交
1123 1124 1125 1126 1127 1128 1129
  if (leftTableBlockIndex > pSupporter->numOfBlocksPerTable[leftIndex]) {
    /* left block is empty */
    return 1;
  } else if (rightTableBlockIndex > pSupporter->numOfBlocksPerTable[rightIndex]) {
    /* right block is empty */
    return -1;
  }
H
Hongze Cheng 已提交
1130

1131
  SBlockOrderWrapper* pLeftBlock = &pSupporter->pDataBlockInfo[leftIndex][leftTableBlockIndex];
H
Haojun Liao 已提交
1132
  SBlockOrderWrapper* pRightBlock = &pSupporter->pDataBlockInfo[rightIndex][rightTableBlockIndex];
H
Hongze Cheng 已提交
1133

1134 1135 1136
  return pLeftBlock->offset > pRightBlock->offset ? 1 : -1;
}

H
Haojun Liao 已提交
1137
/*
 * Decode the SDataBlk the iterator currently points at into pBlockIter->block.
 *
 * The owning table is looked up by uid in pBlockIter->pTableMap; the block is
 * then fetched from that table's mapData using the ordinal index recorded in
 * its block list. Does nothing when the iterator has no current entry.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_INVALID_PARA when the uid is not in
 * the table map. idStr is only used for logging.
 */
static int32_t doSetCurrentBlock(SDataBlockIter* pBlockIter, const char* idStr) {
  SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(pBlockIter);
  if (pBlockInfo == NULL) {
    return TSDB_CODE_SUCCESS;
  }

  STableBlockScanInfo** ppScanInfo = taosHashGet(pBlockIter->pTableMap, &pBlockInfo->uid, sizeof(pBlockInfo->uid));
  if (ppScanInfo == NULL) {
    tsdbError("failed to locate the uid:%" PRIu64 " in query table uid list, %s", pBlockInfo->uid, idStr);
    return TSDB_CODE_INVALID_PARA;
  }

  STableBlockScanInfo* pScanInfo = *ppScanInfo;
  SBlockIndex*         pIndex = taosArrayGet(pScanInfo->pBlockList, pBlockInfo->tbBlockIdx);
  tMapDataGetItemByIdx(&pScanInfo->mapData, pIndex->ordinalIndex, &pBlockIter->block, tGetDataBlk);

  return TSDB_CODE_SUCCESS;
}
H
Hongze Cheng 已提交
1156

1157
/*
 * (Re)build the block iterator for the current file set.
 *
 * Collects the block indexes of every table in pReader->status.pTableMap that
 * has a non-empty block list and fills pBlockIter->blockList with one entry
 * per block. With a single qualifying table the blocks keep their list order;
 * otherwise the per-table lists are merged by in-file offset with a multiway
 * merge tree. Finally the iterator is positioned on the first block in scan
 * direction and that block is decoded.
 *
 * numOfBlocks is the total number of blocks expected (asserted against the
 * number actually collected).
 *
 * Returns TSDB_CODE_SUCCESS, an out-of-memory code when an allocation or an
 * array push fails, or the error reported while decoding the first block.
 */
static int32_t initBlockIterator(STsdbReader* pReader, SDataBlockIter* pBlockIter, int32_t numOfBlocks) {
  bool asc = ASCENDING_TRAVERSE(pReader->order);

  SBlockOrderSupporter sup = {0};
  pBlockIter->numOfBlocks = numOfBlocks;
  taosArrayClear(pBlockIter->blockList);
  pBlockIter->pTableMap = pReader->status.pTableMap;

  // access data blocks according to the offset of each block in asc/desc order.
  int32_t numOfTables = (int32_t)taosHashGetSize(pReader->status.pTableMap);

  int64_t st = taosGetTimestampUs();
  int32_t code = initBlockOrderSupporter(&sup, numOfTables);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  // gather (uid, offset) of every block, table by table
  int32_t cnt = 0;
  void*   ptr = NULL;
  while (1) {
    ptr = taosHashIterate(pReader->status.pTableMap, ptr);
    if (ptr == NULL) {
      break;
    }

    STableBlockScanInfo* pTableScanInfo = *(STableBlockScanInfo**)ptr;
    if (pTableScanInfo->pBlockList == NULL || taosArrayGetSize(pTableScanInfo->pBlockList) == 0) {
      continue;
    }

    size_t num = taosArrayGetSize(pTableScanInfo->pBlockList);
    sup.numOfBlocksPerTable[sup.numOfTables] = (int32_t)num;

    char* buf = taosMemoryMalloc(sizeof(SBlockOrderWrapper) * num);
    if (buf == NULL) {
      // NOTE(review): this leaves taosHashIterate half-way; confirm whether the iteration has to be cancelled
      cleanupBlockOrderSupporter(&sup);
      return TSDB_CODE_TDB_OUT_OF_MEMORY;
    }

    sup.pDataBlockInfo[sup.numOfTables] = (SBlockOrderWrapper*)buf;

    for (size_t k = 0; k < num; ++k) {
      SBlockIndex* pIndex = taosArrayGet(pTableScanInfo->pBlockList, k);
      sup.pDataBlockInfo[sup.numOfTables][k] =
          (SBlockOrderWrapper){.uid = pTableScanInfo->uid, .offset = pIndex->inFileOffset};
      cnt++;
    }

    sup.numOfTables += 1;
  }

  ASSERT(numOfBlocks == cnt);

  // since there is only one table qualified, blocks are not sorted
  if (sup.numOfTables == 1) {
    for (int32_t i = 0; i < numOfBlocks; ++i) {
      SFileDataBlockInfo blockInfo = {.uid = sup.pDataBlockInfo[0][i].uid, .tbBlockIdx = i};
      if (taosArrayPush(pBlockIter->blockList, &blockInfo) == NULL) {
        cleanupBlockOrderSupporter(&sup);
        return TSDB_CODE_TDB_OUT_OF_MEMORY;
      }
    }

    int64_t et = taosGetTimestampUs();
    tsdbDebug("%p create blocks info struct completed for one table, %d blocks not sorted, elapsed time:%.2f ms %s",
              pReader, numOfBlocks, (et - st) / 1000.0, pReader->idStr);

    pBlockIter->index = asc ? 0 : (numOfBlocks - 1);
    cleanupBlockOrderSupporter(&sup);

    // report a failure to decode the first block instead of dropping it
    return doSetCurrentBlock(pBlockIter, pReader->idStr);
  }

  tsdbDebug("%p create data blocks info struct completed, %d blocks in %d tables %s", pReader, cnt, sup.numOfTables,
            pReader->idStr);

  ASSERT(cnt <= numOfBlocks && sup.numOfTables <= numOfTables);

  SMultiwayMergeTreeInfo* pTree = NULL;

  // keep the full-width status: a narrower type could truncate a non-zero error code to 0
  int32_t ret = tMergeTreeCreate(&pTree, sup.numOfTables, &sup, fileDataBlockOrderCompar);
  if (ret != TSDB_CODE_SUCCESS) {
    cleanupBlockOrderSupporter(&sup);
    return TSDB_CODE_TDB_OUT_OF_MEMORY;
  }

  // repeatedly take the table whose current block has the smallest offset
  int32_t numOfTotal = 0;
  while (numOfTotal < cnt) {
    int32_t pos = tMergeTreeGetChosenIndex(pTree);
    int32_t index = sup.indexPerTable[pos]++;

    SFileDataBlockInfo blockInfo = {.uid = sup.pDataBlockInfo[pos][index].uid, .tbBlockIdx = index};
    if (taosArrayPush(pBlockIter->blockList, &blockInfo) == NULL) {
      cleanupBlockOrderSupporter(&sup);
      taosMemoryFree(pTree);
      return TSDB_CODE_TDB_OUT_OF_MEMORY;
    }

    // set data block index overflow, in order to disable the offset comparator
    if (sup.indexPerTable[pos] >= sup.numOfBlocksPerTable[pos]) {
      sup.indexPerTable[pos] = sup.numOfBlocksPerTable[pos] + 1;
    }

    numOfTotal += 1;
    tMergeTreeAdjust(pTree, tMergeTreeGetAdjustIndex(pTree));
  }

  int64_t et = taosGetTimestampUs();
  tsdbDebug("%p %d data blocks access order completed, elapsed time:%.2f ms %s", pReader, numOfBlocks,
            (et - st) / 1000.0, pReader->idStr);
  cleanupBlockOrderSupporter(&sup);
  taosMemoryFree(pTree);

  pBlockIter->index = asc ? 0 : (numOfBlocks - 1);

  // report a failure to decode the first block instead of dropping it
  return doSetCurrentBlock(pBlockIter, pReader->idStr);
}
H
Hongze Cheng 已提交
1267

H
Haojun Liao 已提交
1268
/*
 * Advance the iterator by one block in its scan direction and decode that
 * block. Returns false (leaving the iterator untouched) when it already sits
 * on the last block in that direction, true otherwise.
 */
static bool blockIteratorNext(SDataBlockIter* pBlockIter, const char* idStr) {
  bool asc = ASCENDING_TRAVERSE(pBlockIter->order);

  bool exhausted = asc ? (pBlockIter->index >= pBlockIter->numOfBlocks - 1) : (pBlockIter->index <= 0);
  if (exhausted) {
    return false;
  }

  pBlockIter->index += asc ? 1 : -1;
  doSetCurrentBlock(pBlockIter, idStr);

  return true;
}

1282 1283 1284
/**
 * This is an two rectangles overlap cases.
 */
H
Hongze Cheng 已提交
1285
static int32_t dataBlockPartiallyRequired(STimeWindow* pWindow, SVersionRange* pVerRange, SDataBlk* pBlock) {
1286 1287
  return (pWindow->ekey < pBlock->maxKey.ts && pWindow->ekey >= pBlock->minKey.ts) ||
         (pWindow->skey > pBlock->minKey.ts && pWindow->skey <= pBlock->maxKey.ts) ||
H
Hongze Cheng 已提交
1288 1289
         (pVerRange->minVer > pBlock->minVer && pVerRange->minVer <= pBlock->maxVer) ||
         (pVerRange->maxVer < pBlock->maxVer && pVerRange->maxVer >= pBlock->minVer);
H
Haojun Liao 已提交
1290
}
H
Hongze Cheng 已提交
1291

1292
/*
 * Fetch the block index that follows pBlockInfo, in scan direction, within
 * the block list of the same table.
 *
 * On success *nextIndex receives the neighbor's position in the table's block
 * list, *pBlockIndex receives a copy of its SBlockIndex, and true is returned.
 * Returns false, leaving both outputs untouched, when the table has no blocks,
 * when pBlockInfo is already the last block in scan direction, or when the
 * neighbor entry cannot be fetched.
 */
static bool getNeighborBlockOfSameTable(SFileDataBlockInfo* pBlockInfo, STableBlockScanInfo* pTableBlockScanInfo,
                                        int32_t* nextIndex, int32_t order, SBlockIndex* pBlockIndex) {
  bool   asc = ASCENDING_TRAVERSE(order);
  size_t numOfBlocks = taosArrayGetSize(pTableBlockScanInfo->pBlockList);

  // without this guard "numOfBlocks - 1" below wraps around for an empty list
  if (numOfBlocks == 0) {
    return false;
  }

  if (asc && pBlockInfo->tbBlockIdx >= numOfBlocks - 1) {
    return false;
  }

  if (!asc && pBlockInfo->tbBlockIdx == 0) {
    return false;
  }

  int32_t step = asc ? 1 : -1;
  int32_t neighbor = pBlockInfo->tbBlockIdx + step;

  SBlockIndex* pNeighbor = taosArrayGet(pTableBlockScanInfo->pBlockList, neighbor);
  if (pNeighbor == NULL) {
    return false;
  }

  *nextIndex = neighbor;
  *pBlockIndex = *pNeighbor;
  return true;
}

/*
 * Search the iterator's block list, from the current index onwards in scan
 * direction, for the entry with the same uid and table block index as
 * pFBlockInfo. Returns its position; a miss trips an assertion and yields -1.
 */
static int32_t findFileBlockInfoIndex(SDataBlockIter* pBlockIter, SFileDataBlockInfo* pFBlockInfo) {
  ASSERT(pBlockIter != NULL && pFBlockInfo != NULL);

  int32_t step = ASCENDING_TRAVERSE(pBlockIter->order) ? 1 : -1;

  for (int32_t i = pBlockIter->index; i < pBlockIter->numOfBlocks && i >= 0; i += step) {
    SFileDataBlockInfo* pCandidate = taosArrayGet(pBlockIter->blockList, i);
    if (pCandidate->uid != pFBlockInfo->uid) {
      continue;
    }

    if (pCandidate->tbBlockIdx == pFBlockInfo->tbBlockIdx) {
      return i;
    }
  }

  ASSERT(0);
  return -1;
}

1329
/*
 * Make the block-list entry at `index` the iterator's next block.
 *
 * The iterator index is moved by `step`; when the entry is not already at that
 * position it is removed from `index` and re-inserted there. The block at the
 * new position is then decoded. Returns -1 when `index` is outside
 * [0, numOfBlocks), TSDB_CODE_SUCCESS otherwise.
 */
static int32_t setFileBlockActiveInBlockIter(SDataBlockIter* pBlockIter, int32_t index, int32_t step) {
  bool inRange = (index >= 0) && (index < pBlockIter->numOfBlocks);
  if (!inRange) {
    return -1;
  }

  // keep a copy: the entry is about to be moved inside the list
  SFileDataBlockInfo fblock = *(SFileDataBlockInfo*)taosArrayGet(pBlockIter->blockList, index);

  int32_t target = pBlockIter->index + step;
  pBlockIter->index = target;

  if (index != target) {
    taosArrayRemove(pBlockIter->blockList, index);
    taosArrayInsert(pBlockIter->blockList, target, &fblock);

    SFileDataBlockInfo* pMoved = taosArrayGet(pBlockIter->blockList, target);
    ASSERT(pMoved->uid == fblock.uid && pMoved->tbBlockIdx == fblock.tbBlockIdx);
  }

  doSetCurrentBlock(pBlockIter, "");
  return TSDB_CODE_SUCCESS;
}

1349
/*
 * Tell whether the block shares its boundary timestamp with the neighbor
 * block: block max key == neighbor start (ascending) or block min key ==
 * neighbor end (descending).
 */
static bool overlapWithNeighborBlock(SDataBlk* pBlock, SBlockIndex* pNeighborBlockIndex, int32_t order) {
  bool asc = ASCENDING_TRAVERSE(order);

  return asc ? (pBlock->maxKey.ts == pNeighborBlockIndex->window.skey)
             : (pBlock->minKey.ts == pNeighborBlockIndex->window.ekey);
}
H
Hongze Cheng 已提交
1357

H
Hongze Cheng 已提交
1358
/*
 * Tell whether a valid buffer key (ts != TSKEY_INITIAL_VAL) does not lie past
 * the block's leading edge in scan direction: key <= block min key
 * (ascending) or key >= block max key (descending).
 */
static bool bufferDataInFileBlockGap(int32_t order, TSDBKEY key, SDataBlk* pBlock) {
  if (key.ts == TSKEY_INITIAL_VAL) {
    return false;
  }

  if (ASCENDING_TRAVERSE(order)) {
    return key.ts <= pBlock->minKey.ts;
  }

  return key.ts >= pBlock->maxKey.ts;
}
H
Hongze Cheng 已提交
1364

H
Hongze Cheng 已提交
1365
/*
 * Tell whether key.ts lies inside the block's [minKey, maxKey] range while
 * the block's version range intersects pVerRange.
 */
static bool keyOverlapFileBlock(TSDBKEY key, SDataBlk* pBlock, SVersionRange* pVerRange) {
  bool tsInside = (key.ts >= pBlock->minKey.ts) && (key.ts <= pBlock->maxKey.ts);
  bool verIntersects = (pBlock->maxVer >= pVerRange->minVer) && (pBlock->minVer <= pVerRange->maxVer);

  return tsInside && verIntersects;
}

H
Hongze Cheng 已提交
1370 1371
/*
 * Scan the table's delete skyline from startIndex and report whether a delete
 * point with version >= pBlock->minVer affects the block's time range.
 *
 * A point inside [minKey, maxKey] counts directly; a point in front of the
 * block counts when the following point is at or after the block's min key.
 * The scan stops with false at the first point behind the block's max key.
 */
static bool doCheckforDatablockOverlap(STableBlockScanInfo* pBlockScanInfo, const SDataBlk* pBlock,
                                       int32_t startIndex) {
  size_t num = taosArrayGetSize(pBlockScanInfo->delSkyline);

  for (int32_t i = startIndex; i < num; ++i) {
    TSDBKEY* pCur = taosArrayGet(pBlockScanInfo->delSkyline, i);
    bool     beforeBlock = pCur->ts < pBlock->minKey.ts;

    // the point is behind the block, so are all following ones
    if (!beforeBlock && pCur->ts > pBlock->maxKey.ts) {
      return false;
    }

    // delete versions older than the block's data do not matter
    if (pCur->version < pBlock->minVer) {
      continue;
    }

    // the point lies inside the block's time range
    if (!beforeBlock) {
      return true;
    }

    // the point lies in front of the block: look at where the next one is
    if (i < num - 1) {
      TSDBKEY* pNext = taosArrayGet(pBlockScanInfo->delSkyline, i + 1);
      if (pNext->ts >= pBlock->minKey.ts) {
        return true;
      }
    } else {  // it must be the last point
      ASSERT(pCur->version == 0);
    }
  }

  return false;
}

H
Hongze Cheng 已提交
1399
/*
 * Tell whether the table's delete skyline overlaps the given data block.
 *
 * Returns false when there is no skyline, when it is empty, or when its time
 * span does not touch the block's [minKey, maxKey]. Otherwise the skyline is
 * checked point by point: from fileDelIndex when scanning ascending, or, when
 * scanning descending, from the nearest point at or below fileDelIndex whose
 * timestamp is not bigger than the block's min key.
 */
static bool overlapWithDelSkyline(STableBlockScanInfo* pBlockScanInfo, const SDataBlk* pBlock, int32_t order) {
  // an empty skyline has no first/last point to look at
  if (pBlockScanInfo->delSkyline == NULL || taosArrayGetSize(pBlockScanInfo->delSkyline) == 0) {
    return false;
  }

  // ts is not overlap
  TSDBKEY* pFirst = taosArrayGet(pBlockScanInfo->delSkyline, 0);
  TSDBKEY* pLast = taosArrayGetLast(pBlockScanInfo->delSkyline);
  if (pBlock->minKey.ts > pLast->ts || pBlock->maxKey.ts < pFirst->ts) {
    return false;
  }

  // version is not overlap
  if (ASCENDING_TRAVERSE(order)) {
    return doCheckforDatablockOverlap(pBlockScanInfo, pBlock, pBlockScanInfo->fileDelIndex);
  }

  // descending: step back to the first point that is not bigger than the minKey.ts of the block
  int32_t index = pBlockScanInfo->fileDelIndex;
  while (index > 0) {
    TSDBKEY* p = taosArrayGet(pBlockScanInfo->delSkyline, index);
    if (p->ts <= pBlock->minKey.ts) {
      break;
    }
    index -= 1;
  }

  return doCheckforDatablockOverlap(pBlockScanInfo, pBlock, index);
}

H
Haojun Liao 已提交
1429 1430 1431 1432 1433 1434 1435 1436 1437 1438 1439 1440 1441
// Reasons why a file data block may have to be loaded; filled by getBlockToLoadInfo().
typedef struct {
  bool overlapWithNeighborBlock;  // boundary key is shared with the neighbor block of the same table
  bool hasDupTs;                  // pBlock->hasDup for single-sub-block blocks, always true otherwise
  bool overlapWithDelInfo;        // the table's delete skyline overlaps the block
  bool overlapWithLastBlock;      // current key of the last-block reader lies inside the block's key range
  bool overlapWithKeyInBuf;       // the key taken from the buffer lies inside the block (and versions intersect)
  bool partiallyRequired;         // a query time/version boundary falls inside the block
  bool moreThanCapcity;           // block has more rows than the reader's output capacity
} SDataBlockToLoadInfo;

/*
 * Evaluate every reason for which the given file block may need to be loaded
 * and record the results in pInfo.
 *
 * overlapWithNeighborBlock and overlapWithLastBlock are only written when a
 * neighbor block exists / the last-block reader has data, so the caller is
 * expected to pass a zero-initialised pInfo.
 */
static void getBlockToLoadInfo(SDataBlockToLoadInfo* pInfo, SFileDataBlockInfo* pBlockInfo, SDataBlk* pBlock,
                               STableBlockScanInfo* pScanInfo, TSDBKEY keyInBuf, SLastBlockReader* pLastBlockReader,
                               STsdbReader* pReader) {
  SBlockIndex neighbor = {0};
  int32_t     neighborPos = 0;

  // boundary shared with the neighbor block of the same table
  if (getNeighborBlockOfSameTable(pBlockInfo, pScanInfo, &neighborPos, pReader->order, &neighbor)) {
    pInfo->overlapWithNeighborBlock = overlapWithNeighborBlock(pBlock, &neighbor, pReader->order);
  }

  // has duplicated ts of different version in this block
  pInfo->hasDupTs = (pBlock->nSubBlock != 1) || pBlock->hasDup;
  pInfo->overlapWithDelInfo = overlapWithDelSkyline(pScanInfo, pBlock, pReader->order);

  // current key of the last-block reader inside [minKey, maxKey]
  if (hasDataInLastBlock(pLastBlockReader)) {
    int64_t lastBlockTs = getCurrentKeyInLastBlock(pLastBlockReader);
    pInfo->overlapWithLastBlock = (pBlock->maxKey.ts >= lastBlockTs) && (pBlock->minKey.ts <= lastBlockTs);
  }

  pInfo->moreThanCapcity = pBlock->nRow > pReader->capacity;
  pInfo->partiallyRequired = dataBlockPartiallyRequired(&pReader->window, &pReader->verRange, pBlock);
  pInfo->overlapWithKeyInBuf = keyOverlapFileBlock(keyInBuf, pBlock, &pReader->verRange);
}
1465

H
Haojun Liao 已提交
1466 1467 1468 1469 1470 1471 1472 1473 1474 1475 1476 1477 1478 1479
/*
 * Decide whether the content of a file data block has to be loaded.
 *
 * The block must be loaded as soon as one of the following holds:
 *  - it overlaps with the neighbor block of the same table
 *  - it contains duplicated timestamps
 *  - it is only partially required by the query window / version range
 *  - it overlaps with the key currently held in the buffer
 *  - it has more rows than the output buffer capacity
 *  - it overlaps with the delete info
 *  - it overlaps with the current key of the last block
 */
static bool fileBlockShouldLoad(STsdbReader* pReader, SFileDataBlockInfo* pBlockInfo, SDataBlk* pBlock,
                                STableBlockScanInfo* pScanInfo, TSDBKEY keyInBuf, SLastBlockReader* pLastBlockReader) {
  SDataBlockToLoadInfo cond = {0};
  getBlockToLoadInfo(&cond, pBlockInfo, pBlock, pScanInfo, keyInBuf, pLastBlockReader, pReader);

  bool needLoad = cond.overlapWithNeighborBlock || cond.hasDupTs || cond.partiallyRequired ||
                  cond.overlapWithKeyInBuf || cond.moreThanCapcity || cond.overlapWithDelInfo ||
                  cond.overlapWithLastBlock;
  if (!needLoad) {
    return false;
  }

  // record every reason for loading the block, for profiling
  tsdbDebug("%p uid:%" PRIu64
            " need to load the datablock, overlapwithneighborblock:%d, hasDup:%d, partiallyRequired:%d, "
            "overlapWithKey:%d, greaterThanBuf:%d, overlapWithDel:%d, overlapWithlastBlock:%d, %s",
            pReader, pBlockInfo->uid, cond.overlapWithNeighborBlock, cond.hasDupTs, cond.partiallyRequired,
            cond.overlapWithKeyInBuf, cond.moreThanCapcity, cond.overlapWithDelInfo, cond.overlapWithLastBlock,
            pReader->idStr);
  return true;
}

H
Haojun Liao 已提交
1494 1495 1496 1497 1498 1499 1500 1501 1502
/*
 * A file data block is "clean" when it neither overlaps with the neighbor block, the key in the
 * buffer, the delete info or the last block, nor contains duplicated timestamps. The capacity and
 * partially-required conditions are not taken into account here.
 */
static bool isCleanFileDataBlock(STsdbReader* pReader, SFileDataBlockInfo* pBlockInfo, SDataBlk* pBlock,
                                 STableBlockScanInfo* pScanInfo, TSDBKEY keyInBuf, SLastBlockReader* pLastBlockReader) {
  SDataBlockToLoadInfo cond = {0};
  getBlockToLoadInfo(&cond, pBlockInfo, pBlock, pScanInfo, keyInBuf, pLastBlockReader, pReader);

  return !cond.overlapWithNeighborBlock && !cond.hasDupTs && !cond.overlapWithKeyInBuf &&
         !cond.overlapWithDelInfo && !cond.overlapWithLastBlock;
}

1503
/*
 * Build the result block of pReader from the in-memory iterators (mem/imem) of one table.
 * Returns TSDB_CODE_SUCCESS right away when neither iterator has a value, otherwise the code
 * returned by buildDataBlockFromBufImpl(). The elapsed time is added to pReader->cost.buildmemBlock.
 */
static int32_t buildDataBlockFromBuf(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo, int64_t endKey) {
  bool hasBufData = pBlockScanInfo->iiter.hasVal || pBlockScanInfo->iter.hasVal;
  if (!hasBufData) {
    return TSDB_CODE_SUCCESS;
  }

  SSDataBlock* pResBlock = pReader->pResBlock;
  int64_t      startUs = taosGetTimestampUs();

  int32_t code = buildDataBlockFromBufImpl(pBlockScanInfo, endKey, pReader->capacity, pReader);

  // finalize the result block regardless of the returned code, as the log below reports its content
  blockDataUpdateTsWindow(pResBlock, 0);
  pResBlock->info.uid = pBlockScanInfo->uid;
  setComposedBlockFlag(pReader, true);

  double costMs = (taosGetTimestampUs() - startUs) / 1000.0;
  tsdbDebug("%p build data block from cache completed, elapsed time:%.2f ms, numOfRows:%d, brange:%" PRId64
            " - %" PRId64 " %s",
            pReader, costMs, pResBlock->info.rows, pResBlock->info.window.skey, pResBlock->info.window.ekey,
            pReader->idStr);

  pReader->cost.buildmemBlock += costMs;
  return code;
}

1528 1529
/*
 * Fast path: append the current file block row to the result block without going through the row
 * merger. This is only done when
 *  1. the current row is not the border row of the block in traverse direction, and
 *  2. the directly following row carries a timestamp different from key.
 * On success the row index is advanced by one step and true is returned; otherwise nothing is
 * changed and false is returned.
 */
static bool tryCopyDistinctRowFromFileBlock(STsdbReader* pReader, SBlockData* pBlockData, int64_t key,
                                            SFileBlockDumpInfo* pDumpInfo) {
  bool forward = (pReader->order == TSDB_ORDER_ASC);

  // the border row has no successor to compare with
  bool atBorder = forward ? (pDumpInfo->rowIndex >= pDumpInfo->totalRows - 1) : (pDumpInfo->rowIndex <= 0);
  if (atBorder) {
    return false;
  }

  int32_t delta = forward ? 1 : -1;
  int64_t followingKey = pBlockData->aTSKEY[pDumpInfo->rowIndex + delta];
  if (followingKey == key) {  // duplicated timestamp, merge is required
    return false;
  }

  doAppendRowFromFileBlock(pReader->pResBlock, pReader, pBlockData, pDumpInfo->rowIndex);
  pDumpInfo->rowIndex += delta;
  return true;
}

1548 1549
/*
 * Advance the merge tree of the last block reader until it points to a row that has not been
 * dropped according to the delete skyline. Returns false when the merge tree is exhausted.
 */
static bool nextRowFromLastBlocks(SLastBlockReader* pLastBlockReader, STableBlockScanInfo* pBlockScanInfo,
                                  SVersionRange* pVerRange) {
  while (tMergeTreeNext(&pLastBlockReader->mergeTree)) {
    TSDBROW cur = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
    TSDBKEY curKey = TSDBROW_KEY(&cur);

    bool dropped = hasBeenDropped(pBlockScanInfo->delSkyline, &pBlockScanInfo->lastBlockDelIndex, &curKey,
                                  pLastBlockReader->order, pVerRange);
    if (!dropped) {
      return true;
    }
  }

  return false;
}

/*
 * Fast path for a row of the last (stt) block: move the last block reader to its next valid row
 * and, if that row does not carry the timestamp ts (or no further row exists), append fRow to the
 * result block directly. Returns false when the next row has the same timestamp, i.e. a merge is
 * required. Note that the reader has been advanced in both cases.
 */
static bool tryCopyDistinctRowFromSttBlock(TSDBROW* fRow, SLastBlockReader* pLastBlockReader,
                                           STableBlockScanInfo* pScanInfo, int64_t ts, STsdbReader* pReader) {
  bool hasNext = nextRowFromLastBlocks(pLastBlockReader, pScanInfo, &pReader->verRange);
  if (hasNext && getCurrentKeyInLastBlock(pLastBlockReader) == ts) {
    return false;
  }

  doAppendRowFromFileBlock(pReader->pResBlock, pReader, fRow->pBlockData, fRow->iRow);
  return true;
}

H
Haojun Liao 已提交
1582 1583 1584
/*
 * Return the schema of the given version for table uid.
 *
 * pReader->pSchema is loaded on first use and returned when its version equals sversion. Any other
 * version is served from pReader->pMemSchema, which is (re)loaded whenever it is missing or holds a
 * different version. Returns NULL on failure, with terrno set to the code of the failed lookup.
 */
static FORCE_INLINE STSchema* doGetSchemaForTSRow(int32_t sversion, STsdbReader* pReader, uint64_t uid) {
  // always set the newest schema version in pReader->pSchema
  if (pReader->pSchema == NULL) {
    pReader->pSchema = metaGetTbTSchema(pReader->pTsdb->pVnode->pMeta, uid, -1, 1);
  }

  if (pReader->pSchema && sversion == pReader->pSchema->version) {
    return pReader->pSchema;
  }

  if (pReader->pMemSchema != NULL) {
    if (pReader->pMemSchema->version == sversion) {
      return pReader->pMemSchema;
    }

    // cached schema has another version: drop it and fetch the requested one
    taosMemoryFreeClear(pReader->pMemSchema);
    int32_t ret = metaGetTbTSchemaEx(pReader->pTsdb->pVnode->pMeta, pReader->suid, uid, sversion, &pReader->pMemSchema);
    if (ret != TSDB_CODE_SUCCESS || pReader->pMemSchema == NULL) {
      terrno = ret;
      return NULL;
    }

    return pReader->pMemSchema;
  }

  // nothing cached yet
  int32_t ret = metaGetTbTSchemaEx(pReader->pTsdb->pVnode->pMeta, pReader->suid, uid, sversion, &pReader->pMemSchema);
  if (ret != TSDB_CODE_SUCCESS) {
    terrno = ret;
    return NULL;
  }

  return pReader->pMemSchema;
}

1617
static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo, TSDBROW* pRow,
1618 1619 1620 1621 1622 1623
                                     SIterInfo* pIter, int64_t key, SLastBlockReader* pLastBlockReader) {
  SRowMerger          merge = {0};
  STSRow*             pTSRow = NULL;
  SBlockData*         pBlockData = &pReader->status.fileBlockData;
  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;

1624
  int64_t tsLast = INT64_MIN;
1625
  if (hasDataInLastBlock(pLastBlockReader)) {
1626 1627
    tsLast = getCurrentKeyInLastBlock(pLastBlockReader);
  }
1628

H
Hongze Cheng 已提交
1629 1630
  TSDBKEY k = TSDBROW_KEY(pRow);
  TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
1631

1632 1633
  int64_t minKey = 0;
  if (pReader->order == TSDB_ORDER_ASC) {
H
Hongze Cheng 已提交
1634
    minKey = INT64_MAX;  // chosen the minimum value
1635
    if (minKey > tsLast && hasDataInLastBlock(pLastBlockReader)) {
1636 1637
      minKey = tsLast;
    }
1638

1639 1640 1641
    if (minKey > k.ts) {
      minKey = k.ts;
    }
1642

1643
    if (minKey > key && hasDataInFileBlock(pBlockData, pDumpInfo)) {
1644 1645 1646 1647
      minKey = key;
    }
  } else {
    minKey = INT64_MIN;
1648
    if (minKey < tsLast && hasDataInLastBlock(pLastBlockReader)) {
1649 1650 1651 1652 1653 1654 1655
      minKey = tsLast;
    }

    if (minKey < k.ts) {
      minKey = k.ts;
    }

1656
    if (minKey < key && hasDataInFileBlock(pBlockData, pDumpInfo)) {
1657 1658
      minKey = key;
    }
1659 1660 1661 1662
  }

  bool init = false;

1663
  // ASC: file block ---> last block -----> imem -----> mem
H
Hongze Cheng 已提交
1664
  // DESC: mem -----> imem -----> last block -----> file block
1665 1666
  if (pReader->order == TSDB_ORDER_ASC) {
    if (minKey == key) {
1667
      init = true;
H
Haojun Liao 已提交
1668 1669 1670 1671
      int32_t code = tRowMergerInit(&merge, &fRow, pReader->pSchema);
      if (code != TSDB_CODE_SUCCESS) {
        return code;
      }
1672
      doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
1673 1674
    }

1675
    if (minKey == tsLast) {
1676
      TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
H
Haojun Liao 已提交
1677 1678 1679
      if (init) {
        tRowMerge(&merge, &fRow1);
      } else {
1680
        init = true;
H
Haojun Liao 已提交
1681 1682 1683 1684
        int32_t code = tRowMergerInit(&merge, &fRow1, pReader->pSchema);
        if (code != TSDB_CODE_SUCCESS) {
          return code;
        }
1685
      }
1686
      doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, &merge, &pReader->verRange);
1687
    }
1688

1689
    if (minKey == k.ts) {
H
Haojun Liao 已提交
1690 1691 1692
      if (init) {
        tRowMerge(&merge, pRow);
      } else {
1693 1694
        init = true;
        STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid);
H
Haojun Liao 已提交
1695 1696 1697 1698 1699 1700 1701 1702
        int32_t   code = tRowMergerInit(&merge, pRow, pSchema);
        if (code != TSDB_CODE_SUCCESS) {
          return code;
        }
      }
      int32_t code = doMergeRowsInBuf(pIter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge, pReader);
      if (code != TSDB_CODE_SUCCESS) {
        return code;
1703 1704 1705 1706 1707
      }
    }
  } else {
    if (minKey == k.ts) {
      init = true;
1708
      STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid);
H
Haojun Liao 已提交
1709 1710 1711 1712 1713 1714
      int32_t   code = tRowMergerInit(&merge, pRow, pSchema);
      if (code != TSDB_CODE_SUCCESS) {
        return code;
      }

      code = doMergeRowsInBuf(pIter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge, pReader);
1715
      if (code != TSDB_CODE_SUCCESS || merge.pTSchema == NULL) {
H
Haojun Liao 已提交
1716 1717
        return code;
      }
1718 1719
    }

1720
    if (minKey == tsLast) {
1721
      TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
H
Haojun Liao 已提交
1722 1723 1724
      if (init) {
        tRowMerge(&merge, &fRow1);
      } else {
1725
        init = true;
H
Haojun Liao 已提交
1726 1727 1728 1729
        int32_t code = tRowMergerInit(&merge, &fRow1, pReader->pSchema);
        if (code != TSDB_CODE_SUCCESS) {
          return code;
        }
1730
      }
1731
      doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, &merge, &pReader->verRange);
1732 1733 1734
    }

    if (minKey == key) {
H
Haojun Liao 已提交
1735 1736 1737
      if (init) {
        tRowMerge(&merge, &fRow);
      } else {
1738
        init = true;
H
Haojun Liao 已提交
1739 1740 1741 1742
        int32_t code = tRowMergerInit(&merge, &fRow, pReader->pSchema);
        if (code != TSDB_CODE_SUCCESS) {
          return code;
        }
1743 1744 1745
      }
      doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
    }
1746 1747
  }

1748 1749 1750 1751 1752
  int32_t code = tRowMergerGetRow(&merge, &pTSRow);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

1753
  doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo);
1754 1755 1756 1757 1758 1759

  taosMemoryFree(pTSRow);
  tRowMergerClear(&merge);
  return TSDB_CODE_SUCCESS;
}

1760 1761 1762
/*
 * Emit one row for the current timestamp of the last block reader.
 *
 * - When the file block is not involved (mergeBlockData is false, or the current file block row has
 *   a different timestamp), the row is copied directly if the last block holds no further row with
 *   the same timestamp; otherwise all last block rows of that timestamp are merged.
 * - Otherwise the last block rows and the file block rows of that timestamp are merged.
 *
 * pBlockData is only dereferenced when mergeBlockData is true. Returns TSDB_CODE_SUCCESS or the
 * error code of tRowMergerInit()/tRowMergerGetRow(); the merger is cleared whenever it had been
 * initialized successfully.
 */
static int32_t doMergeFileBlockAndLastBlock(SLastBlockReader* pLastBlockReader, STsdbReader* pReader,
                                            STableBlockScanInfo* pBlockScanInfo, SBlockData* pBlockData,
                                            bool mergeBlockData) {
  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
  int64_t             tsLastBlock = getCurrentKeyInLastBlock(pLastBlockReader);

  STSRow*    pTSRow = NULL;
  SRowMerger merge = {0};
  TSDBROW    fRow = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
  tsdbTrace("fRow ptr:%p, %d, uid:%" PRIu64 ", %s", fRow.pBlockData, fRow.iRow, pLastBlockReader->uid, pReader->idStr);

  // the file block does not take part in this row
  bool lastBlockOnly = (!mergeBlockData) || (tsLastBlock != pBlockData->aTSKEY[pDumpInfo->rowIndex]);

  if (lastBlockOnly && tryCopyDistinctRowFromSttBlock(&fRow, pLastBlockReader, pBlockScanInfo, tsLastBlock, pReader)) {
    pBlockScanInfo->lastKey = tsLastBlock;
    return TSDB_CODE_SUCCESS;
  }

  int32_t code = tRowMergerInit(&merge, &fRow, pReader->pSchema);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  if (lastBlockOnly) {
    // the failed copy attempt above already moved the reader to the next row of the same timestamp
    TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
    tRowMerge(&merge, &fRow1);
    doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLastBlock, &merge, &pReader->verRange);
  } else {
    doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLastBlock, &merge, &pReader->verRange);
    ASSERT(mergeBlockData);

    // merge with block data if ts == key
    if (tsLastBlock == pBlockData->aTSKEY[pDumpInfo->rowIndex]) {
      doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
    }
  }

  code = tRowMergerGetRow(&merge, &pTSRow);
  if (code == TSDB_CODE_SUCCESS) {
    doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo);
    taosMemoryFree(pTSRow);
  }

  // the merger was initialized above, clear it on the failure path as well
  tRowMergerClear(&merge);
  return code;
}

1824 1825
/*
 * Emit one row when only file data (file data block and/or last block) is involved.
 *
 * - no data in the file block: the row comes from the last block only
 * - no data in the last block: the row comes from the file block only
 * - both exist, ASC order: the file block row is emitted when key is smaller than the current last
 *   block timestamp; rows of both sources are merged when the timestamps are equal
 * - both exist, DESC order: delegated to doMergeFileBlockAndLastBlock()
 *
 * Returns TSDB_CODE_SUCCESS or the error code of a callee; the merger used for the equal-timestamp
 * case is cleared whenever it had been initialized successfully.
 */
static int32_t mergeFileBlockAndLastBlock(STsdbReader* pReader, SLastBlockReader* pLastBlockReader, int64_t key,
                                          STableBlockScanInfo* pBlockScanInfo, SBlockData* pBlockData) {
  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;

  if (!hasDataInFileBlock(pBlockData, pDumpInfo)) {  // only last block exists
    return doMergeFileBlockAndLastBlock(pLastBlockReader, pReader, pBlockScanInfo, NULL, false);
  }

  // no last block available, only data block exists
  if (!hasDataInLastBlock(pLastBlockReader)) {
    return mergeRowsInFileBlocks(pBlockData, pBlockScanInfo, key, pReader);
  }

  // current row of the file data block and current timestamp of the last block
  TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
  int64_t ts = getCurrentKeyInLastBlock(pLastBlockReader);
  ASSERT(ts >= key);

  if (!ASCENDING_TRAVERSE(pReader->order)) {  // desc order
    return doMergeFileBlockAndLastBlock(pLastBlockReader, pReader, pBlockScanInfo, pBlockData, true);
  }

  if (key < ts) {  // imem, mem are all empty, file blocks (data blocks and last block) exist
    return mergeRowsInFileBlocks(pBlockData, pBlockScanInfo, key, pReader);
  }

  if (key > ts) {
    ASSERT(0);
    return TSDB_CODE_SUCCESS;
  }

  // key == ts: merge the rows of both sources
  STSRow*    pTSRow = NULL;
  SRowMerger merge = {0};

  int32_t code = tRowMergerInit(&merge, &fRow, pReader->pSchema);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);

  TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
  tRowMerge(&merge, &fRow1);

  doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, ts, &merge, &pReader->verRange);

  code = tRowMergerGetRow(&merge, &pTSRow);
  if (code == TSDB_CODE_SUCCESS) {
    doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo);
    taosMemoryFree(pTSRow);
  }

  // the merger was initialized above, clear it on the failure path as well
  tRowMergerClear(&merge);
  return code;
}

1880 1881
/*
 * Merge the rows of one timestamp coming from up to four sources - file data block, last block,
 * imem and mem - and append the merged row to pReader->pResBlock. Both in-memory iterators must
 * point to a valid row (asserted).
 *
 * The timestamp to emit is the smallest (ASC) or largest (DESC) one among the sources; every source
 * positioned on that timestamp contributes to the merged row, in the order
 *   ASC:  file block -----> last block -----> imem -----> mem
 *   DESC: mem -----> imem -----> last block -----> file block
 *
 * Returns TSDB_CODE_SUCCESS or the first error code reported by a callee. When a step fails after
 * the row merger has been initialized, the merger is cleared before returning, exactly as it is on
 * the success path.
 */
static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo, SBlockData* pBlockData,
                                     SLastBlockReader* pLastBlockReader) {
  SRowMerger          merge = {0};
  STSRow*             pTSRow = NULL;
  int32_t             code = TSDB_CODE_SUCCESS;
  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
  SArray*             pDelList = pBlockScanInfo->delSkyline;

  TSDBROW* pRow = getValidMemRow(&pBlockScanInfo->iter, pDelList, pReader);
  TSDBROW* piRow = getValidMemRow(&pBlockScanInfo->iiter, pDelList, pReader);
  ASSERT(pRow != NULL && piRow != NULL);

  int64_t tsLast = INT64_MIN;
  if (hasDataInLastBlock(pLastBlockReader)) {
    tsLast = getCurrentKeyInLastBlock(pLastBlockReader);
  }

  int64_t key = hasDataInFileBlock(pBlockData, pDumpInfo) ? pBlockData->aTSKEY[pDumpInfo->rowIndex] : INT64_MIN;

  TSDBKEY k = TSDBROW_KEY(pRow);
  TSDBKEY ik = TSDBROW_KEY(piRow);

  int64_t minKey = 0;
  if (ASCENDING_TRAVERSE(pReader->order)) {
    minKey = INT64_MAX;  // let's find the minimum
    if (minKey > k.ts) {
      minKey = k.ts;
    }

    if (minKey > ik.ts) {
      minKey = ik.ts;
    }

    if (minKey > key && hasDataInFileBlock(pBlockData, pDumpInfo)) {
      minKey = key;
    }

    if (minKey > tsLast && hasDataInLastBlock(pLastBlockReader)) {
      minKey = tsLast;
    }
  } else {
    minKey = INT64_MIN;  // let's find the maximum ts value
    if (minKey < k.ts) {
      minKey = k.ts;
    }

    if (minKey < ik.ts) {
      minKey = ik.ts;
    }

    if (minKey < key && hasDataInFileBlock(pBlockData, pDumpInfo)) {
      minKey = key;
    }

    if (minKey < tsLast && hasDataInLastBlock(pLastBlockReader)) {
      minKey = tsLast;
    }
  }

  // set once tRowMergerInit() has succeeded; later sources are merged into the existing merger
  bool init = false;

  if (ASCENDING_TRAVERSE(pReader->order)) {
    if (minKey == key) {
      TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
      code = tRowMergerInit(&merge, &fRow, pReader->pSchema);
      if (code != TSDB_CODE_SUCCESS) {
        return code;
      }
      init = true;

      doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
    }

    if (minKey == tsLast) {
      TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
      if (init) {
        tRowMerge(&merge, &fRow1);
      } else {
        code = tRowMergerInit(&merge, &fRow1, pReader->pSchema);
        if (code != TSDB_CODE_SUCCESS) {
          return code;
        }
        init = true;
      }

      doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, &merge, &pReader->verRange);
    }

    if (minKey == ik.ts) {
      if (init) {
        tRowMerge(&merge, piRow);
      } else {
        STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(piRow), pReader, pBlockScanInfo->uid);
        if (pSchema == NULL) {
          return code;
        }

        code = tRowMergerInit(&merge, piRow, pSchema);
        if (code != TSDB_CODE_SUCCESS) {
          return code;
        }
        init = true;
      }

      code = doMergeRowsInBuf(&pBlockScanInfo->iiter, pBlockScanInfo->uid, ik.ts, pBlockScanInfo->delSkyline, &merge,
                              pReader);
      if (code != TSDB_CODE_SUCCESS) {
        goto _err;
      }
    }

    if (minKey == k.ts) {
      if (init) {
        if (merge.pTSchema == NULL) {
          return code;
        }

        tRowMerge(&merge, pRow);
      } else {
        STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid);
        code = tRowMergerInit(&merge, pRow, pSchema);
        if (code != TSDB_CODE_SUCCESS) {
          return code;
        }
        init = true;
      }

      code = doMergeRowsInBuf(&pBlockScanInfo->iter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge,
                              pReader);
      if (code != TSDB_CODE_SUCCESS) {
        goto _err;
      }
    }
  } else {
    if (minKey == k.ts) {
      STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid);
      code = tRowMergerInit(&merge, pRow, pSchema);
      if (code != TSDB_CODE_SUCCESS) {
        return code;
      }
      init = true;

      code = doMergeRowsInBuf(&pBlockScanInfo->iter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge,
                              pReader);
      if (code != TSDB_CODE_SUCCESS) {
        goto _err;
      }
    }

    if (minKey == ik.ts) {
      if (init) {
        tRowMerge(&merge, piRow);
      } else {
        STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(piRow), pReader, pBlockScanInfo->uid);
        code = tRowMergerInit(&merge, piRow, pSchema);
        if (code != TSDB_CODE_SUCCESS) {
          return code;
        }
        init = true;
      }

      code = doMergeRowsInBuf(&pBlockScanInfo->iiter, pBlockScanInfo->uid, ik.ts, pBlockScanInfo->delSkyline, &merge,
                              pReader);
      if (code != TSDB_CODE_SUCCESS) {
        goto _err;
      }
    }

    if (minKey == tsLast) {
      TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
      if (init) {
        tRowMerge(&merge, &fRow1);
      } else {
        code = tRowMergerInit(&merge, &fRow1, pReader->pSchema);
        if (code != TSDB_CODE_SUCCESS) {
          return code;
        }
        init = true;
      }

      doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, &merge, &pReader->verRange);
    }

    if (minKey == key) {
      TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
      if (!init) {
        code = tRowMergerInit(&merge, &fRow, pReader->pSchema);
        if (code != TSDB_CODE_SUCCESS) {
          return code;
        }
        init = true;
      } else {
        if (merge.pTSchema == NULL) {
          return code;
        }

        tRowMerge(&merge, &fRow);
      }

      doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
    }
  }

  if (merge.pTSchema == NULL) {
    return code;
  }

  code = tRowMergerGetRow(&merge, &pTSRow);
  if (code != TSDB_CODE_SUCCESS) {
    goto _err;
  }

  doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo);

  taosMemoryFree(pTSRow);
  tRowMergerClear(&merge);
  return code;

_err:
  // only reached after a successful tRowMergerInit(): do not leave the merger behind
  tRowMergerClear(&merge);
  return code;
}

2094 2095 2096 2097 2098 2099 2100 2101 2102 2103 2104 2105 2106 2107 2108 2109 2110 2111 2112 2113 2114 2115 2116 2117 2118
/*
 * Create the mem and imem iterators of one table and initialize its delete skyline iterator.
 * Does nothing when pBlockScanInfo->iterInit is already set.
 *
 * The iterators start at (window.skey, verRange.minVer) for ascending traversal and at
 * (window.ekey, verRange.maxVer) for descending traversal. An iterator is only created when the
 * corresponding memtable exists in the read snapshot and holds data of this table.
 *
 * Returns TSDB_CODE_SUCCESS, or the error code of tsdbTbDataIterCreate(); in the latter case
 * iterInit stays unset.
 */
static int32_t initMemDataIterator(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader) {
  if (pBlockScanInfo->iterInit) {
    return TSDB_CODE_SUCCESS;
  }

  int32_t code = TSDB_CODE_SUCCESS;

  TSDBKEY startKey = {0};
  if (ASCENDING_TRAVERSE(pReader->order)) {
    startKey = (TSDBKEY){.ts = pReader->window.skey, .version = pReader->verRange.minVer};
  } else {
    startKey = (TSDBKEY){.ts = pReader->window.ekey, .version = pReader->verRange.maxVer};
  }

  int32_t backward = (!ASCENDING_TRAVERSE(pReader->order));

  // mem
  STbData* d = NULL;
  if (pReader->pReadSnap->pMem != NULL) {
    d = tsdbGetTbDataFromMemTable(pReader->pReadSnap->pMem, pReader->suid, pBlockScanInfo->uid);
    if (d != NULL) {
      code = tsdbTbDataIterCreate(d, &startKey, backward, &pBlockScanInfo->iter.iter);
      if (code == TSDB_CODE_SUCCESS) {
        pBlockScanInfo->iter.hasVal = (tsdbTbDataIterGet(pBlockScanInfo->iter.iter) != NULL);

        tsdbDebug("%p uid:%" PRIu64 ", check data in mem from skey:%" PRId64 ", order:%d, ts range in buf:%" PRId64
                  "-%" PRId64 " %s",
                  pReader, pBlockScanInfo->uid, startKey.ts, pReader->order, d->minKey, d->maxKey, pReader->idStr);
      } else {
        tsdbError("%p uid:%" PRIu64 ", failed to create iterator for mem, code:%s, %s", pReader, pBlockScanInfo->uid,
                  tstrerror(code), pReader->idStr);
        return code;
      }
    }
  } else {
    tsdbDebug("%p uid:%" PRIu64 ", no data in mem, %s", pReader, pBlockScanInfo->uid, pReader->idStr);
  }

  // imem
  STbData* di = NULL;
  if (pReader->pReadSnap->pIMem != NULL) {
    di = tsdbGetTbDataFromMemTable(pReader->pReadSnap->pIMem, pReader->suid, pBlockScanInfo->uid);
    if (di != NULL) {
      code = tsdbTbDataIterCreate(di, &startKey, backward, &pBlockScanInfo->iiter.iter);
      if (code == TSDB_CODE_SUCCESS) {
        pBlockScanInfo->iiter.hasVal = (tsdbTbDataIterGet(pBlockScanInfo->iiter.iter) != NULL);

        tsdbDebug("%p uid:%" PRIu64 ", check data in imem from skey:%" PRId64 ", order:%d, ts range in buf:%" PRId64
                  "-%" PRId64 " %s",
                  pReader, pBlockScanInfo->uid, startKey.ts, pReader->order, di->minKey, di->maxKey, pReader->idStr);
      } else {
        tsdbError("%p uid:%" PRIu64 ", failed to create iterator for imem, code:%s, %s", pReader, pBlockScanInfo->uid,
                  tstrerror(code), pReader->idStr);
        return code;
      }
    }
  } else {
    tsdbDebug("%p uid:%" PRIu64 ", no data in imem, %s", pReader, pBlockScanInfo->uid, pReader->idStr);
  }

  initDelSkylineIterator(pBlockScanInfo, pReader, d, di);

  pBlockScanInfo->iterInit = true;
  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
2158 2159
/*
 * Check whether the current row (pDumpInfo->rowIndex) of a file data block has to be returned:
 * it must belong to the scanned table, lie within the version range and time window of the
 * reader, and must not have been dropped according to the delete skyline.
 */
static bool isValidFileBlockRow(SBlockData* pBlockData, SFileBlockDumpInfo* pDumpInfo,
                                STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader) {
  // aUid is only present in a multi-table data block
  if (pBlockData->aUid != NULL) {
    uint64_t rowUid = pBlockData->aUid[pDumpInfo->rowIndex];
    if (rowUid != pBlockScanInfo->uid) {  // row of another table
      return false;
    }
  }

  // version range
  int64_t rowVer = pBlockData->aVersion[pDumpInfo->rowIndex];
  if (rowVer < pReader->verRange.minVer || rowVer > pReader->verRange.maxVer) {
    return false;
  }

  // time window
  int64_t rowTs = pBlockData->aTSKEY[pDumpInfo->rowIndex];
  if (rowTs < pReader->window.skey || rowTs > pReader->window.ekey) {
    return false;
  }

  // delete info
  TSDBKEY rowKey = {.ts = rowTs, .version = rowVer};
  return !hasBeenDropped(pBlockScanInfo->delSkyline, &pBlockScanInfo->fileDelIndex, &rowKey, pReader->order,
                         &pReader->verRange);
}

2188
/*
 * Position the last block reader on the first valid row of the given table.
 *
 * When the reader is already bound to this table, only the presence of data is reported.
 * Otherwise the merge tree of the previously bound table (if any) is closed, the in-memory
 * iterators of the table are initialized and a new merge tree is opened for the part of the
 * reader window that lies beyond pScanInfo->lastKey in traverse direction.
 *
 * Returns true when a valid row is available; false when there is none or tMergeTreeOpen() failed.
 */
static bool initLastBlockReader(SLastBlockReader* pLBlockReader, STableBlockScanInfo* pScanInfo, STsdbReader* pReader) {
  // the last block reader has been initialized for this table.
  if (pLBlockReader->uid == pScanInfo->uid) {
    return hasDataInLastBlock(pLBlockReader);
  }

  // release the merge tree that belongs to the previously bound table
  if (pLBlockReader->uid != 0) {
    tMergeTreeClose(&pLBlockReader->mergeTree);
  }

  initMemDataIterator(pScanInfo, pReader);
  pLBlockReader->uid = pScanInfo->uid;

  // continue right behind the last key that has been handled for this table
  STimeWindow range = pLBlockReader->window;
  if (ASCENDING_TRAVERSE(pLBlockReader->order)) {
    range.skey = pScanInfo->lastKey + 1;
  } else {
    range.ekey = pScanInfo->lastKey - 1;
  }

  int32_t ret = tMergeTreeOpen(&pLBlockReader->mergeTree, (pLBlockReader->order == TSDB_ORDER_DESC),
                               pReader->pFileReader, pReader->suid, pScanInfo->uid, &range, &pLBlockReader->verRange,
                               pLBlockReader->pInfo, false, pReader->idStr);
  if (ret != TSDB_CODE_SUCCESS) {
    return false;
  }

  return nextRowFromLastBlocks(pLBlockReader, pScanInfo, &pReader->verRange);
}

2219
// Timestamp of the row the merge tree of the last block reader currently points to.
static int64_t getCurrentKeyInLastBlock(SLastBlockReader* pLastBlockReader) {
  TSDBROW curRow = tMergeTreeGetRow(&pLastBlockReader->mergeTree);

  return TSDBROW_TS(&curRow);
}

H
Hongze Cheng 已提交
2224
// The last block reader has data as long as its merge tree holds an iterator.
static bool hasDataInLastBlock(SLastBlockReader* pLastBlockReader) {
  if (pLastBlockReader->mergeTree.pIter == NULL) {
    return false;
  }

  return true;
}
2225 2226 2227 2228

bool hasDataInFileBlock(const SBlockData* pBlockData, const SFileBlockDumpInfo* pDumpInfo) {
  if (pBlockData->nRow > 0) {
    ASSERT(pBlockData->nRow == pDumpInfo->totalRows);
2229 2230
  }

2231
  return pBlockData->nRow > 0 && (!pDumpInfo->allDumped);
2232
}
2233

2234 2235
// Emit the row identified by `key` from the current file block into the result block.
// Fast path: when tryCopyDistinctRowFromFileBlock() succeeds, only lastKey needs to be updated.
// Slow path: the rows of the file block are merged through a SRowMerger and the merged row is
// appended to the result block.
// Returns TSDB_CODE_SUCCESS, or the error reported by the row merger.
int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pBlockScanInfo, int64_t key,
                              STsdbReader* pReader) {
  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;

  if (tryCopyDistinctRowFromFileBlock(pReader, pBlockData, key, pDumpInfo)) {
    pBlockScanInfo->lastKey = key;
    return TSDB_CODE_SUCCESS;
  }

  TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);

  STSRow*    pTSRow = NULL;
  SRowMerger merge = {0};

  int32_t code = tRowMergerInit(&merge, &fRow, pReader->pSchema);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);

  code = tRowMergerGetRow(&merge, &pTSRow);
  if (code != TSDB_CODE_SUCCESS) {
    // the merger was initialized successfully, so it must be released on this error path as well
    tRowMergerClear(&merge);
    return code;
  }

  doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo);

  taosMemoryFree(pTSRow);
  tRowMergerClear(&merge);
  return TSDB_CODE_SUCCESS;
}

H
Haojun Liao 已提交
2265 2266
// Produce the next merged row(s) for one table by dispatching to the proper merge routine,
// depending on which in-memory iterators (mem: iter, imem: iiter) still have values.
// `key` is the timestamp of the current file-block row, or INT64_MIN when the file block has
// no pending row.
static int32_t buildComposedDataBlockImpl(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo,
                                          SBlockData* pBlockData, SLastBlockReader* pLastBlockReader) {
  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
  SIterInfo*          pMemIter = &pBlockScanInfo->iter;
  SIterInfo*          pImemIter = &pBlockScanInfo->iiter;

  int64_t key = INT64_MIN;
  if (pBlockData->nRow > 0 && !pDumpInfo->allDumped) {
    key = pBlockData->aTSKEY[pDumpInfo->rowIndex];
  }

  // mem + imem + file + last block
  if (pMemIter->hasVal && pImemIter->hasVal) {
    return doMergeMultiLevelRows(pReader, pBlockScanInfo, pBlockData, pLastBlockReader);
  }

  TSDBROW* pRow = NULL;
  TSDBROW* piRow = NULL;

  if (pMemIter->hasVal) {
    pRow = getValidMemRow(pMemIter, pBlockScanInfo->delSkyline, pReader);
  }

  if (pImemIter->hasVal) {
    piRow = getValidMemRow(pImemIter, pBlockScanInfo->delSkyline, pReader);
  }

  // hasVal is re-read on purpose: it is checked again after the getValidMemRow() calls above

  // imem + file + last block
  if (pImemIter->hasVal) {
    return doMergeBufAndFileRows(pReader, pBlockScanInfo, piRow, pImemIter, key, pLastBlockReader);
  }

  // mem + file + last block
  if (pMemIter->hasVal) {
    return doMergeBufAndFileRows(pReader, pBlockScanInfo, pRow, pMemIter, key, pLastBlockReader);
  }

  // file data block + last block only
  return mergeFileBlockAndLastBlock(pReader, pLastBlockReader, key, pBlockScanInfo, pBlockData);
}

2297
// Fill pReader->pResBlock with rows merged from the current file block, the last blocks and the
// in-memory buffers of one table.
//  - When a file block is current and it is "clean" (isCleanFileDataBlock), it is copied as a
//    whole and lastKey is moved to the block boundary.
//  - Otherwise rows are produced one merge step at a time until the file block is consumed, no
//    more data is available, or the result block reaches pReader->capacity.
// The composed-block flag and the cost counters are updated on every exit path.
// Returns TSDB_CODE_SUCCESS, TSDB_CODE_INVALID_PARA when the block's uid is not in the table map,
// or the error reported by buildComposedDataBlockImpl().
static int32_t buildComposedDataBlock(STsdbReader* pReader) {
  int32_t code = TSDB_CODE_SUCCESS;

  SSDataBlock* pResBlock = pReader->pResBlock;

  SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(&pReader->status.blockIter);
  SLastBlockReader*   pLastBlockReader = pReader->status.fileIter.pLastBlockReader;

  bool    asc = ASCENDING_TRAVERSE(pReader->order);
  int64_t st = taosGetTimestampUs();
  int32_t step = asc ? 1 : -1;

  STableBlockScanInfo* pBlockScanInfo = NULL;
  if (pBlockInfo != NULL) {
    void* p = taosHashGet(pReader->status.pTableMap, &pBlockInfo->uid, sizeof(pBlockInfo->uid));
    if (p == NULL) {
      code = TSDB_CODE_INVALID_PARA;
      tsdbError("failed to locate the uid:%" PRIu64 " in query table uid list, total tables:%d, %s", pBlockInfo->uid,
                taosHashGetSize(pReader->status.pTableMap), pReader->idStr);
      goto _end;
    }

    pBlockScanInfo = *(STableBlockScanInfo**)p;

    SDataBlk* pBlock = getCurrentBlock(&pReader->status.blockIter);
    TSDBKEY   keyInBuf = getCurrentKeyInBuf(pBlockScanInfo, pReader);

    // it is a clean block, load it directly
    if (isCleanFileDataBlock(pReader, pBlockInfo, pBlock, pBlockScanInfo, keyInBuf, pLastBlockReader)) {
      // in descending order the whole block may only be returned when the last blocks are empty
      if (asc || !hasDataInLastBlock(pLastBlockReader)) {
        copyBlockDataToSDataBlock(pReader, pBlockScanInfo);

        // record the last key value
        pBlockScanInfo->lastKey = asc ? pBlock->maxKey.ts : pBlock->minKey.ts;
        goto _end;
      }
    }
  } else {  // file blocks not exist
    pBlockScanInfo = *pReader->status.pTableIter;
  }

  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
  SBlockData*         pBlockData = &pReader->status.fileBlockData;

  while (1) {
    bool hasBlockData = false;

    // find the first qualified row in data block
    while (pBlockData->nRow > 0) {
      if (isValidFileBlockRow(pBlockData, pDumpInfo, pBlockScanInfo, pReader)) {
        hasBlockData = true;
        break;
      }

      pDumpInfo->rowIndex += step;

      SDataBlk* pBlock = getCurrentBlock(&pReader->status.blockIter);
      if (pDumpInfo->rowIndex >= pBlock->nRow || pDumpInfo->rowIndex < 0) {
        setBlockAllDumped(pDumpInfo, pBlock->maxKey.ts, pReader->order);
        break;
      }
    }

    bool hasBlockLData = hasDataInLastBlock(pLastBlockReader);

    // no data in last block and block, no need to proceed.
    if (!hasBlockData && !hasBlockLData) {
      break;
    }

    // propagate merge failures instead of silently dropping them
    code = buildComposedDataBlockImpl(pReader, pBlockScanInfo, pBlockData, pLastBlockReader);
    if (code != TSDB_CODE_SUCCESS) {
      goto _end;
    }

    // currently loaded file data block is consumed
    if ((pBlockData->nRow > 0) && (pDumpInfo->rowIndex >= pBlockData->nRow || pDumpInfo->rowIndex < 0)) {
      SDataBlk* pBlock = getCurrentBlock(&pReader->status.blockIter);
      setBlockAllDumped(pDumpInfo, pBlock->maxKey.ts, pReader->order);
      break;
    }

    if (pResBlock->info.rows >= pReader->capacity) {
      break;
    }
  }

_end:
  pResBlock->info.uid = (pBlockScanInfo != NULL) ? pBlockScanInfo->uid : 0;
  blockDataUpdateTsWindow(pResBlock, 0);

  setComposedBlockFlag(pReader, true);
  double el = (taosGetTimestampUs() - st) / 1000.0;

  pReader->cost.composedBlocks += 1;
  pReader->cost.buildComposedBlockTime += el;

  if (pResBlock->info.rows > 0) {
    // the window keys are signed timestamps, hence PRId64
    tsdbDebug("%p uid:%" PRIu64 ", composed data block created, brange:%" PRId64 "-%" PRId64
              " rows:%d, elapsed time:%.2f ms %s",
              pReader, pResBlock->info.uid, pResBlock->info.window.skey, pResBlock->info.window.ekey,
              pResBlock->info.rows, el, pReader->idStr);
  }

  return code;
}

// Record in the reader status whether the current result block is a composed block.
void setComposedBlockFlag(STsdbReader* pReader, bool composed) {
  pReader->status.composedDataBlock = composed;
}

dengyihao's avatar
dengyihao 已提交
2403 2404
// Build the delete skyline of one table (once) and reset all delete indexes of the scan info.
// Delete records are collected from the delete file of the read snapshot (if any) and from the
// pHead lists of the given mem / imem table data, then folded by tsdbBuildDeleteSkyline().
// Nothing is done when the skyline already exists.
// Returns TSDB_CODE_SUCCESS, TSDB_CODE_OUT_OF_MEMORY when an array cannot be allocated, or the
// error reported while reading the delete file / building the skyline.
int32_t initDelSkylineIterator(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader, STbData* pMemTbData,
                               STbData* piMemTbData) {
  if (pBlockScanInfo->delSkyline != NULL) {
    return TSDB_CODE_SUCCESS;
  }

  int32_t code = TSDB_CODE_SUCCESS;
  STsdb*  pTsdb = pReader->pTsdb;

  SArray* pDelData = taosArrayInit(4, sizeof(SDelData));
  if (pDelData == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  SDelFile* pDelFile = pReader->pReadSnap->fs.pDelFile;
  if (pDelFile) {
    SDelFReader* pDelFReader = NULL;
    code = tsdbDelFReaderOpen(&pDelFReader, pDelFile, pTsdb);
    if (code != TSDB_CODE_SUCCESS) {
      goto _err;
    }

    SArray* aDelIdx = taosArrayInit(4, sizeof(SDelIdx));
    if (aDelIdx == NULL) {
      // report the allocation failure; previously this path returned success
      code = TSDB_CODE_OUT_OF_MEMORY;
      tsdbDelFReaderClose(&pDelFReader);
      goto _err;
    }

    code = tsdbReadDelIdx(pDelFReader, aDelIdx);
    if (code != TSDB_CODE_SUCCESS) {
      taosArrayDestroy(aDelIdx);
      tsdbDelFReaderClose(&pDelFReader);
      goto _err;
    }

    // locate the delete index entry of this table
    SDelIdx  idx = {.suid = pReader->suid, .uid = pBlockScanInfo->uid};
    SDelIdx* pIdx = taosArraySearch(aDelIdx, &idx, tCmprDelIdx, TD_EQ);

    if (pIdx != NULL) {
      code = tsdbReadDelData(pDelFReader, pIdx, pDelData);
    }

    taosArrayDestroy(aDelIdx);
    tsdbDelFReaderClose(&pDelFReader);

    if (code != TSDB_CODE_SUCCESS) {
      goto _err;
    }
  }

  // append the delete records kept with the mem table data
  SDelData* p = NULL;
  if (pMemTbData != NULL) {
    p = pMemTbData->pHead;
    while (p) {
      taosArrayPush(pDelData, p);
      p = p->pNext;
    }
  }

  // append the delete records kept with the imem table data
  if (piMemTbData != NULL) {
    p = piMemTbData->pHead;
    while (p) {
      taosArrayPush(pDelData, p);
      p = p->pNext;
    }
  }

  if (taosArrayGetSize(pDelData) > 0) {
    pBlockScanInfo->delSkyline = taosArrayInit(4, sizeof(TSDBKEY));
    if (pBlockScanInfo->delSkyline == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto _err;
    }

    code = tsdbBuildDeleteSkyline(pDelData, 0, (int32_t)(taosArrayGetSize(pDelData) - 1), pBlockScanInfo->delSkyline);
  }

  taosArrayDestroy(pDelData);

  // every delete index starts at the first skyline entry (asc) or the last one (desc)
  pBlockScanInfo->iter.index =
      ASCENDING_TRAVERSE(pReader->order) ? 0 : taosArrayGetSize(pBlockScanInfo->delSkyline) - 1;
  pBlockScanInfo->iiter.index = pBlockScanInfo->iter.index;
  pBlockScanInfo->fileDelIndex = pBlockScanInfo->iter.index;
  pBlockScanInfo->lastBlockDelIndex = pBlockScanInfo->iter.index;
  return code;

_err:
  taosArrayDestroy(pDelData);
  return code;
}

H
Haojun Liao 已提交
2485
// Return the key of the row that the in-memory buffers would deliver next for this table.
// Both the mem iterator (iter) and the imem iterator (iiter) are probed through getValidMemRow().
//  - no valid row in either buffer: the returned key has ts == TSKEY_INITIAL_VAL
//  - a valid row in only one buffer: the key of that row
//  - valid rows in both buffers: the smaller key for an ascending scan, the larger key for a
//    descending scan
// The sentinel is never compared against a real timestamp, so the result does not depend on the
// numeric value of TSKEY_INITIAL_VAL.
TSDBKEY getCurrentKeyInBuf(STableBlockScanInfo* pScanInfo, STsdbReader* pReader) {
  TSDBKEY key = {.ts = TSKEY_INITIAL_VAL};
  bool    hasKey = false;

  TSDBROW* pRow = getValidMemRow(&pScanInfo->iter, pScanInfo->delSkyline, pReader);
  if (pRow != NULL) {
    key = TSDBROW_KEY(pRow);
    hasKey = true;
  }

  pRow = getValidMemRow(&pScanInfo->iiter, pScanInfo->delSkyline, pReader);
  if (pRow != NULL) {
    TSDBKEY k = TSDBROW_KEY(pRow);

    if (!hasKey) {
      // only the imem buffer has a row
      key = k;
    } else if (ASCENDING_TRAVERSE(pReader->order)) {
      if (k.ts < key.ts) {
        key = k;
      }
    } else {
      if (k.ts > key.ts) {
        key = k;
      }
    }
  }

  return key;
}

2503
// Advance the fileset iterator until a fileset is found that contributes data blocks or last
// files for the queried tables, or until the filesets are exhausted.
// On return pBlockNum holds the counts reported by doLoadFileBlock() for the selected fileset
// (both are zero when nothing was found).
// Returns TSDB_CODE_SUCCESS, TSDB_CODE_OUT_OF_MEMORY when the index list cannot be allocated, or
// the error reported while loading the block index / file blocks.
static int32_t moveToNextFile(STsdbReader* pReader, SBlockNumber* pBlockNum) {
  SReaderStatus* pStatus = &pReader->status;
  int32_t        code = TSDB_CODE_SUCCESS;

  pBlockNum->numOfBlocks = 0;
  pBlockNum->numOfLastFiles = 0;

  size_t  numOfTables = taosHashGetSize(pStatus->pTableMap);
  SArray* pIndexList = taosArrayInit(numOfTables, sizeof(SBlockIdx));
  if (pIndexList == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  // the loop ends when there are no more data files on disk
  while (filesetIteratorNext(&pStatus->fileIter, pReader)) {
    taosArrayClear(pIndexList);

    code = doLoadBlockIndex(pReader, pReader->pFileReader, pIndexList);
    if (code != TSDB_CODE_SUCCESS) {
      break;
    }

    if (taosArrayGetSize(pIndexList) > 0 || pReader->pFileReader->pSet->nSttF > 0) {
      code = doLoadFileBlock(pReader, pIndexList, pBlockNum);
      if (code != TSDB_CODE_SUCCESS) {
        break;
      }

      if (pBlockNum->numOfBlocks + pBlockNum->numOfLastFiles > 0) {
        break;
      }
    }

    // no blocks in current file, try next files
  }

  taosArrayDestroy(pIndexList);
  return code;
}

2543
// Three-way comparison of two uint64_t values referenced by p1 and p2.
// Returns -1, 0 or 1 when *p1 is less than, equal to or greater than *p2.
static int32_t uidComparFunc(const void* p1, const void* p2) {
  const uint64_t lhs = *(uint64_t*)p1;
  const uint64_t rhs = *(uint64_t*)p2;

  if (lhs < rhs) {
    return -1;
  }

  return (lhs > rhs) ? 1 : 0;
}
2552

2553
// Copy the uid of every entry in the table map into tableUidList and sort the list with
// uidComparFunc. The list must be large enough to hold one uid per table in the map.
static void extractOrderedTableUidList(SUidOrderCheckInfo* pOrderCheckInfo, SReaderStatus* pStatus) {
  int32_t total = taosHashGetSize(pStatus->pTableMap);
  int32_t count = 0;

  for (void* pIter = taosHashIterate(pStatus->pTableMap, NULL); pIter != NULL;
       pIter = taosHashIterate(pStatus->pTableMap, pIter)) {
    STableBlockScanInfo* pScanInfo = *(STableBlockScanInfo**)pIter;
    pOrderCheckInfo->tableUidList[count] = pScanInfo->uid;
    count += 1;
  }

  taosSort(pOrderCheckInfo->tableUidList, total, sizeof(uint64_t), uidComparFunc);
}

2567
// Make sure the ordered uid list exists and that pStatus->pTableIter points at a table.
//  - empty table map: nothing to do
//  - no uid list yet: allocate it, fill it in uid order and start from the first uid
//  - uid list exists but there is no current table: restart from the first uid; when that uid is
//    no longer in the table map, the list is rebuilt from the current map first
// Returns TSDB_CODE_SUCCESS or TSDB_CODE_OUT_OF_MEMORY.
static int32_t initOrderCheckInfo(SUidOrderCheckInfo* pOrderCheckInfo, SReaderStatus* pStatus) {
  int32_t total = taosHashGetSize(pStatus->pTableMap);
  if (total == 0) {
    return TSDB_CODE_SUCCESS;
  }

  // first use: build the ordered uid list
  if (pOrderCheckInfo->tableUidList == NULL) {
    pOrderCheckInfo->currentIndex = 0;
    pOrderCheckInfo->tableUidList = taosMemoryMalloc(total * sizeof(uint64_t));
    if (pOrderCheckInfo->tableUidList == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }

    extractOrderedTableUidList(pOrderCheckInfo, pStatus);

    uint64_t firstUid = pOrderCheckInfo->tableUidList[0];
    pStatus->pTableIter = taosHashGet(pStatus->pTableMap, &firstUid, sizeof(firstUid));
    return TSDB_CODE_SUCCESS;
  }

  // a table is already selected, keep it
  if (pStatus->pTableIter != NULL) {
    return TSDB_CODE_SUCCESS;
  }

  // it is the last block of a new file: restart from the first table
  pOrderCheckInfo->currentIndex = 0;
  uint64_t uid = pOrderCheckInfo->tableUidList[pOrderCheckInfo->currentIndex];
  pStatus->pTableIter = taosHashGet(pStatus->pTableMap, &uid, sizeof(uid));
  if (pStatus->pTableIter != NULL) {
    return TSDB_CODE_SUCCESS;
  }

  // the tableMap has already been updated, rebuild the uid list from it
  void* pNewList = taosMemoryRealloc(pOrderCheckInfo->tableUidList, total * sizeof(uint64_t));
  if (pNewList == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  pOrderCheckInfo->tableUidList = pNewList;
  extractOrderedTableUidList(pOrderCheckInfo, pStatus);

  uid = pOrderCheckInfo->tableUidList[0];
  pStatus->pTableIter = taosHashGet(pStatus->pTableMap, &uid, sizeof(uid));

  return TSDB_CODE_SUCCESS;
}

2608
// Step to the next uid of the ordered list and point pStatus->pTableIter at its table entry.
// Returns false (and clears pTableIter) once the index reaches the number of tables in the map.
static bool moveToNextTable(SUidOrderCheckInfo* pOrderedCheckInfo, SReaderStatus* pStatus) {
  pOrderedCheckInfo->currentIndex += 1;

  bool exhausted = (pOrderedCheckInfo->currentIndex >= taosHashGetSize(pStatus->pTableMap));
  if (exhausted) {
    pStatus->pTableIter = NULL;
    return false;
  }

  uint64_t nextUid = pOrderedCheckInfo->tableUidList[pOrderedCheckInfo->currentIndex];
  pStatus->pTableIter = taosHashGet(pStatus->pTableMap, &nextUid, sizeof(nextUid));
  ASSERT(pStatus->pTableIter != NULL);
  return true;
}

2621
// Walk the tables in uid order and build a result block from the last blocks of the first table
// that yields rows. Tables without data in the last blocks are skipped.
// Returns as soon as the result block has rows, an error occurs, or all tables were visited.
static int32_t doLoadLastBlockSequentially(STsdbReader* pReader) {
  SReaderStatus*      pStatus = &pReader->status;
  SLastBlockReader*   pLastBlockReader = pStatus->fileIter.pLastBlockReader;
  SUidOrderCheckInfo* pOrderedCheckInfo = &pStatus->uidCheckInfo;

  int32_t code = initOrderCheckInfo(pOrderedCheckInfo, pStatus);
  if (code != TSDB_CODE_SUCCESS || (taosHashGetSize(pStatus->pTableMap) == 0)) {
    return code;
  }

  do {
    // load the last data block of current table
    STableBlockScanInfo* pScanInfo = *(STableBlockScanInfo**)pStatus->pTableIter;

    if (initLastBlockReader(pLastBlockReader, pScanInfo, pReader)) {
      code = doBuildDataBlock(pReader);
      if (code != TSDB_CODE_SUCCESS) {
        return code;
      }

      if (pReader->pResBlock->info.rows > 0) {
        return TSDB_CODE_SUCCESS;
      }
    }

    // current table has nothing (more) to offer, let's try next table
  } while (moveToNextTable(pOrderedCheckInfo, pStatus));

  return TSDB_CODE_SUCCESS;
}

2660
// Build the next result block for the current file block (or, when there is no file block, for
// the table selected by pTableIter). Depending on how the file block relates to the buffered and
// last-block data, the block is
//  - composed from last blocks only (no file block),
//  - loaded and merged (fileBlockShouldLoad),
//  - preceded by buffered rows that fall into the gap before it (bufferDataInFileBlockGap),
//  - preceded by last-block rows in a descending scan, or
//  - returned as a whole without loading its data.
// Returns TSDB_CODE_SUCCESS, TSDB_CODE_INVALID_PARA when no scan info can be found for the
// table, or the error reported by the called builders.
static int32_t doBuildDataBlock(STsdbReader* pReader) {
  int32_t   code = TSDB_CODE_SUCCESS;
  SDataBlk* pBlock = NULL;

  SReaderStatus*       pStatus = &pReader->status;
  SDataBlockIter*      pBlockIter = &pStatus->blockIter;
  STableBlockScanInfo* pScanInfo = NULL;
  SFileDataBlockInfo*  pBlockInfo = getCurrentBlockInfo(pBlockIter);
  SLastBlockReader*    pLastBlockReader = pReader->status.fileIter.pLastBlockReader;

  // resolve the scan info without dereferencing a missing hash entry / table iterator
  STableBlockScanInfo** ppScanInfo = NULL;
  if (pBlockInfo != NULL) {
    ppScanInfo =
        (STableBlockScanInfo**)taosHashGet(pReader->status.pTableMap, &pBlockInfo->uid, sizeof(pBlockInfo->uid));
  } else {
    ppScanInfo = pReader->status.pTableIter;
  }

  if (ppScanInfo != NULL) {
    pScanInfo = *ppScanInfo;
  }

  if (pScanInfo == NULL) {
    tsdbError("failed to get table scan-info, %s", pReader->idStr);
    code = TSDB_CODE_INVALID_PARA;
    return code;
  }

  if (pBlockInfo != NULL) {
    pBlock = getCurrentBlock(pBlockIter);
  }

  initLastBlockReader(pLastBlockReader, pScanInfo, pReader);
  TSDBKEY keyInBuf = getCurrentKeyInBuf(pScanInfo, pReader);

  if (pBlockInfo == NULL) {  // build data block from last data file
    ASSERT(pBlockIter->numOfBlocks == 0);
    code = buildComposedDataBlock(pReader);
  } else if (fileBlockShouldLoad(pReader, pBlockInfo, pBlock, pScanInfo, keyInBuf, pLastBlockReader)) {
    code = doLoadFileBlockData(pReader, pBlockIter, &pStatus->fileBlockData, pScanInfo->uid);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    // build composed data block
    code = buildComposedDataBlock(pReader);
  } else if (bufferDataInFileBlockGap(pReader->order, keyInBuf, pBlock)) {
    // data in memory that are earlier than current file block
    // rows in buffer should be less than the file block in asc, greater than file block in desc
    int64_t endKey = (ASCENDING_TRAVERSE(pReader->order)) ? pBlock->minKey.ts : pBlock->maxKey.ts;
    code = buildDataBlockFromBuf(pReader, pScanInfo, endKey);
  } else {
    if (hasDataInLastBlock(pLastBlockReader) && !ASCENDING_TRAVERSE(pReader->order)) {
      // only return the rows in last block
      int64_t tsLast = getCurrentKeyInLastBlock(pLastBlockReader);
      ASSERT(tsLast >= pBlock->maxKey.ts);
      tBlockDataReset(&pReader->status.fileBlockData);

      tsdbDebug("load data in last block firstly, due to desc scan data, %s", pReader->idStr);
      code = buildComposedDataBlock(pReader);
    } else {  // whole block is required, return it directly
      SDataBlockInfo* pInfo = &pReader->pResBlock->info;
      pInfo->rows = pBlock->nRow;
      pInfo->uid = pScanInfo->uid;
      pInfo->window = (STimeWindow){.skey = pBlock->minKey.ts, .ekey = pBlock->maxKey.ts};
      setComposedBlockFlag(pReader, false);
      setBlockAllDumped(&pStatus->fBlockDumpInfo, pBlock->maxKey.ts, pReader->order);

      // update the last key for the corresponding table
      pScanInfo->lastKey = ASCENDING_TRAVERSE(pReader->order) ? pInfo->window.ekey : pInfo->window.skey;
    }
  }

  return code;
}

H
Haojun Liao 已提交
2731
// Iterate over the table map and build a result block from the in-memory buffers of the first
// table that yields rows. Iteration resumes from pStatus->pTableIter when it is already set.
// Returns TSDB_CODE_SUCCESS when a block with rows was built or all tables were visited,
// otherwise the error reported by buildDataBlockFromBuf().
static int32_t buildBlockFromBufferSequentially(STsdbReader* pReader) {
  SReaderStatus* pStatus = &pReader->status;

  // no key limit: read to the end of the buffer in scan direction
  const int64_t endKey = (ASCENDING_TRAVERSE(pReader->order)) ? INT64_MAX : INT64_MIN;

  if (pStatus->pTableIter == NULL) {
    pStatus->pTableIter = taosHashIterate(pStatus->pTableMap, NULL);
  }

  while (pStatus->pTableIter != NULL) {
    STableBlockScanInfo** ppScanInfo = pStatus->pTableIter;
    initMemDataIterator(*ppScanInfo, pReader);

    int32_t code = buildDataBlockFromBuf(pReader, *ppScanInfo, endKey);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    if (pReader->pResBlock->info.rows > 0) {
      return TSDB_CODE_SUCCESS;
    }

    // current table is exhausted, let's try the next table
    pStatus->pTableIter = taosHashIterate(pStatus->pTableMap, pStatus->pTableIter);
  }

  return TSDB_CODE_SUCCESS;
}

2763
// set the correct start position in case of the first/last file block, according to the query time window
2764
static void initBlockDumpInfo(STsdbReader* pReader, SDataBlockIter* pBlockIter) {
H
Hongze Cheng 已提交
2765
  SDataBlk* pBlock = getCurrentBlock(pBlockIter);
2766

2767 2768 2769
  SReaderStatus* pStatus = &pReader->status;

  SFileBlockDumpInfo* pDumpInfo = &pStatus->fBlockDumpInfo;
2770 2771 2772

  pDumpInfo->totalRows = pBlock->nRow;
  pDumpInfo->allDumped = false;
2773
  pDumpInfo->rowIndex = ASCENDING_TRAVERSE(pReader->order) ? 0 : pBlock->nRow - 1;
2774 2775
}

2776
// Move to the next fileset that has data and prepare the block iterator and dump info for it.
// When no fileset with data is left, status.loadFromFile is cleared so the caller can switch to
// the in-memory buffers.
static int32_t initForFirstBlockInFile(STsdbReader* pReader, SDataBlockIter* pBlockIter) {
  SBlockNumber blockNum = {0};

  int32_t code = moveToNextFile(pReader, &blockNum);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  // all data files are consumed, try data in buffer
  if (blockNum.numOfBlocks + blockNum.numOfLastFiles == 0) {
    pReader->status.loadFromFile = false;
    return code;
  }

  if (blockNum.numOfBlocks <= 0) {
    // no block data, only last block exists
    tBlockDataReset(&pReader->status.fileBlockData);
    resetDataBlockIterator(pBlockIter, pReader->order);
  } else {
    // initialize the block iterator for a new fileset
    code = initBlockIterator(pReader, pBlockIter, blockNum.numOfBlocks);
  }

  // set the correct start position according to the query time window
  initBlockDumpInfo(pReader, pBlockIter);
  return code;
}

2803
// A file block is partially read when it is not fully dumped and the row cursor has already left
// its start position: index 0 for an ascending scan, index totalRows - 1 for a descending scan.
static bool fileBlockPartiallyRead(SFileBlockDumpInfo* pDumpInfo, bool asc) {
  if (pDumpInfo->allDumped) {
    return false;
  }

  if (asc) {
    return pDumpInfo->rowIndex > 0;
  }

  return pDumpInfo->rowIndex < (pDumpInfo->totalRows - 1);
}

2808
// Produce the next non-empty result block from the data files.
// The `_begin` section handles filesets whose remaining data lives in last blocks only; the main
// loop handles regular file blocks and jumps back to `_begin` whenever the data blocks of a
// fileset are exhausted. Returns when a block with rows was built, an error occurred, or all
// files were consumed (status.loadFromFile is false then).
static int32_t buildBlockFromFiles(STsdbReader* pReader) {
  int32_t        code = TSDB_CODE_SUCCESS;
  bool           asc = ASCENDING_TRAVERSE(pReader->order);
  SReaderStatus* pStatus = &pReader->status;

  SDataBlockIter* pBlockIter = &pStatus->blockIter;

  if (pBlockIter->numOfBlocks == 0) {
  _begin:
    code = doLoadLastBlockSequentially(pReader);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    if (pReader->pResBlock->info.rows > 0) {
      return TSDB_CODE_SUCCESS;
    }

    // every table was checked against the last blocks of this fileset, move on to the next one
    if (pStatus->pTableIter == NULL) {
      code = initForFirstBlockInFile(pReader, pBlockIter);

      // error happens or all the data files are completely checked
      if ((code != TSDB_CODE_SUCCESS) || !pStatus->loadFromFile) {
        return code;
      }

      // the new fileset has no data blocks, check its last blocks instead
      if (pBlockIter->numOfBlocks == 0) {
        goto _begin;
      }
    }

    code = doBuildDataBlock(pReader);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    if (pReader->pResBlock->info.rows > 0) {
      return TSDB_CODE_SUCCESS;
    }
  }

  for (;;) {
    SFileBlockDumpInfo* pDumpInfo = &pStatus->fBlockDumpInfo;

    if (fileBlockPartiallyRead(pDumpInfo, asc)) {
      // file data block is partially loaded, continue with it
      code = buildComposedDataBlock(pReader);
    } else {
      // current block is exhausted, try the next file block
      if (pDumpInfo->allDumped) {
        if (blockIteratorNext(&pStatus->blockIter, pReader->idStr)) {
          // there is a next block in the block accessed order list
          initBlockDumpInfo(pReader, pBlockIter);
        } else if (pStatus->pCurrentFileset->nSttF > 0) {
          // data blocks in current file are exhausted, continue with its last blocks
          tBlockDataReset(&pStatus->fileBlockData);
          resetDataBlockIterator(pBlockIter, pReader->order);
          goto _begin;
        } else {
          code = initForFirstBlockInFile(pReader, pBlockIter);

          // error happens or all the data files are completely checked
          if ((code != TSDB_CODE_SUCCESS) || !pStatus->loadFromFile) {
            return code;
          }

          // this file does not have blocks, let's start check the last block file
          if (pBlockIter->numOfBlocks == 0) {
            goto _begin;
          }
        }
      }

      code = doBuildDataBlock(pReader);
    }

    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    if (pReader->pResBlock->info.rows > 0) {
      return TSDB_CODE_SUCCESS;
    }
  }
}
H
refact  
Hongze Cheng 已提交
2896

2897 2898
// Select the tsdb instance to query. For a non-rsma vnode this is VND_TSDB(pVnode).
// For an rsma vnode the retention levels are scanned in order until a level is found whose keep
// range (now - keep) reaches back to winSKey plus the configured tolerance, or a level with
// keep <= 0 is hit (then the previous level, if any, is used). The chosen level is stored in
// *pLevel; pLevel is only written for rsma vnodes.
static STsdb* getTsdbByRetentions(SVnode* pVnode, TSKEY winSKey, SRetention* retentions, const char* idStr,
                                  int8_t* pLevel) {
  if (!VND_IS_RSMA(pVnode)) {
    return VND_TSDB(pVnode);
  }

  int8_t  precision = pVnode->config.tsdbCfg.precision;
  int64_t now = taosGetTimestamp(precision);

  // scale the tolerance to the time precision of the vnode
  long scale = 1000000L;
  if (precision == TSDB_TIME_PRECISION_MILLI) {
    scale = 1L;
  } else if (precision == TSDB_TIME_PRECISION_MICRO) {
    scale = 1000L;
  }
  int64_t offset = tsQueryRsmaTolerance * scale;

  int8_t level = 0;
  for (; level < TSDB_RETENTION_MAX; ++level) {
    SRetention* pRetention = retentions + level;
    if (pRetention->keep <= 0) {
      if (level > 0) {
        --level;
      }
      break;
    }

    if ((now - pRetention->keep) <= (winSKey + offset)) {
      break;
    }
  }

  const char* str = (idStr != NULL) ? idStr : "";

  // anything beyond L1 is served by L2
  int8_t chosen = TSDB_RETENTION_L2;
  if (level == TSDB_RETENTION_L0) {
    chosen = TSDB_RETENTION_L0;
  } else if (level == TSDB_RETENTION_L1) {
    chosen = TSDB_RETENTION_L1;
  }

  *pLevel = chosen;
  tsdbDebug("vgId:%d, rsma level %d is selected to query %s", TD_VID(pVnode), chosen, str);

  if (chosen == TSDB_RETENTION_L0) {
    return VND_RSMA0(pVnode);
  }

  if (chosen == TSDB_RETENTION_L1) {
    return VND_RSMA1(pVnode);
  }

  return VND_RSMA2(pVnode);
}

H
Haojun Liao 已提交
2941
// Derive the version range of a query from its condition.
//  - minVer: 0 when startVersion is -1, otherwise startVersion
//  - maxVer: the applied version of the vnode when endVersion is -1 (not specified by the user),
//            otherwise endVersion capped at the applied version
// The `level` argument is not used here.
SVersionRange getQueryVerRange(SVnode* pVnode, SQueryTableDataCond* pCond, int8_t level) {
  int64_t startVer = pCond->startVersion;
  if (pCond->startVersion == -1) {
    startVer = 0;
  }

  int64_t endVer = 0;
  if (pCond->endVersion == -1 || pCond->endVersion > pVnode->state.applied) {
    // unspecified or beyond what has been applied: use the current maximum version of the vnode
    endVer = pVnode->state.applied;
  } else {
    endVer = pCond->endVersion;
  }

  return (SVersionRange){.minVer = startVer, .maxVer = endVer};
}

2955
/*
 * Check whether the row identified by pKey (timestamp + version) is covered by
 * a delete range in pDelList.
 *
 * pDelList   list of TSDBKEY points; consecutive points (cur, next) are treated
 *            as a range [cur.ts, next.ts] that was deleted at version cur.version.
 *            May be NULL or empty, in which case nothing is dropped.
 * index      in/out cursor into pDelList. It is advanced (ascending) or moved
 *            back (descending) as the scan proceeds so repeated calls do not
 *            rescan the list from the start.
 * pKey       key of the row under test; must not be NULL.
 * order      traversal order (TSDB_ORDER_ASC or descending).
 * pVerRange  query version range; only the ascending path compares the delete
 *            version against pVerRange->maxVer.
 *
 * Returns true if the row must be hidden from the query, false otherwise.
 *
 * NOTE(review): the descending path does not apply the pVerRange->maxVer filter
 * that the ascending path applies -- confirm whether that asymmetry is intended.
 */
bool hasBeenDropped(const SArray* pDelList, int32_t* index, TSDBKEY* pKey, int32_t order, SVersionRange* pVerRange) {
  ASSERT(pKey != NULL);
  if (pDelList == NULL) {
    return false;
  }

  size_t num = taosArrayGetSize(pDelList);
  if (num == 0) {
    // An empty list deletes nothing. Without this guard `num - 1` below wraps
    // around and element 0 of an empty array would be read.
    return false;
  }

  bool    asc = ASCENDING_TRAVERSE(order);
  int32_t step = asc ? 1 : -1;

  if (asc) {
    if (*index >= num - 1) {
      // the cursor already sits on (or beyond) the last point of the list
      TSDBKEY* last = taosArrayGetLast(pDelList);
      ASSERT(pKey->ts >= last->ts);

      if (pKey->ts > last->ts) {
        return false;
      } else if (pKey->ts == last->ts) {
        if (num < 2) {
          // No preceding point exists, so there is no range ending here.
          // Previously this read index (size_t)-1.
          // NOTE(review): a single-point list is presumably malformed; "not
          // dropped" is the conservative answer -- confirm.
          return false;
        }

        // the range ending at `last` carries the version of the previous point
        TSDBKEY* prev = taosArrayGet(pDelList, num - 2);
        return (prev->version >= pKey->version);
      }
    } else {
      TSDBKEY* pCurrent = taosArrayGet(pDelList, *index);
      TSDBKEY* pNext = taosArrayGet(pDelList, (*index) + 1);

      // the key lies before the current range: not deleted
      if (pKey->ts < pCurrent->ts) {
        return false;
      }

      if (pCurrent->ts <= pKey->ts && pNext->ts >= pKey->ts && pCurrent->version >= pKey->version &&
          pVerRange->maxVer >= pCurrent->version) {
        return true;
      }

      // move the cursor forward until the range that may contain the key
      while (pNext->ts <= pKey->ts && (*index) < num - 1) {
        (*index) += 1;

        if ((*index) < num - 1) {
          pCurrent = taosArrayGet(pDelList, *index);
          pNext = taosArrayGet(pDelList, (*index) + 1);

          // it is not a consecutive deletion range, ignore it
          if (pCurrent->version == 0 && pNext->version > 0) {
            continue;
          }

          if (pCurrent->ts <= pKey->ts && pNext->ts >= pKey->ts && pCurrent->version >= pKey->version &&
              pVerRange->maxVer >= pCurrent->version) {
            return true;
          }
        }
      }

      return false;
    }
  } else {
    if (*index <= 0) {
      // the cursor already sits on the first point of the list
      TSDBKEY* pFirst = taosArrayGet(pDelList, 0);

      if (pKey->ts < pFirst->ts) {
        return false;
      } else if (pKey->ts == pFirst->ts) {
        return pFirst->version >= pKey->version;
      } else {
        ASSERT(0);
      }
    } else {
      TSDBKEY* pCurrent = taosArrayGet(pDelList, *index);
      TSDBKEY* pPrev = taosArrayGet(pDelList, (*index) - 1);

      // the key lies after the current range: not deleted
      if (pKey->ts > pCurrent->ts) {
        return false;
      }

      if (pPrev->ts <= pKey->ts && pCurrent->ts >= pKey->ts && pPrev->version >= pKey->version) {
        return true;
      }

      // move the cursor backward (step == -1 here) until the candidate range
      while (pPrev->ts >= pKey->ts && (*index) > 1) {
        (*index) += step;

        if ((*index) >= 1) {
          pCurrent = taosArrayGet(pDelList, *index);
          pPrev = taosArrayGet(pDelList, (*index) - 1);

          // it is not a consecutive deletion range, ignore it
          if (pCurrent->version > 0 && pPrev->version == 0) {
            continue;
          }

          if (pPrev->ts <= pKey->ts && pCurrent->ts >= pKey->ts && pPrev->version >= pKey->version) {
            return true;
          }
        }
      }

      return false;
    }
  }

  return false;
}

3057
/*
 * Return the first row at or after the iterator's current position that is
 * visible to the query, advancing the iterator past invisible rows.
 *
 * A row is visible when its version lies inside pReader->verRange and it is
 * not covered by a delete range in pDelList.
 *
 * Returns NULL, with pIter->hasVal set to false, when the iterator is
 * exhausted or the next row falls outside the query time window.
 */
TSDBROW* getValidMemRow(SIterInfo* pIter, const SArray* pDelList, STsdbReader* pReader) {
  if (!pIter->hasVal) {
    return NULL;
  }

  TSDBROW* pRow = tsdbTbDataIterGet(pIter->iter);
  TSDBKEY  key = {.ts = pRow->pTSRow->ts, .version = pRow->version};

  for (;;) {
    // a row outside the query window terminates the scan of this buffer
    if (outOfTimeWindow(key.ts, &pReader->window)) {
      pIter->hasVal = false;
      return NULL;
    }

    // it is a valid data version, and it has not been deleted
    bool inVerRange = (key.version <= pReader->verRange.maxVer) && (key.version >= pReader->verRange.minVer);
    if (inVerRange && !hasBeenDropped(pDelList, &pIter->index, &key, pReader->order, &pReader->verRange)) {
      return pRow;
    }

    // current row is not visible, try the next one
    pIter->hasVal = tsdbTbDataIterNext(pIter->iter);
    if (!pIter->hasVal) {
      return NULL;
    }

    pRow = tsdbTbDataIterGet(pIter->iter);
    key = TSDBROW_KEY(pRow);
  }
}
H
Hongze Cheng 已提交
3095

3096 3097
/*
 * Fold every following visible buffer row whose timestamp equals `ts` into
 * pMerger. The iterator is advanced first, so the row it currently points at
 * is assumed to have been handled by the caller.
 *
 * Stops at the first of: iterator exhausted, no further visible row, or a row
 * with a different timestamp (that row is left as the iterator's position).
 *
 * Returns TSDB_CODE_SUCCESS, or terrno when the schema of a row cannot be
 * resolved.
 */
int32_t doMergeRowsInBuf(SIterInfo* pIter, uint64_t uid, int64_t ts, SArray* pDelList, SRowMerger* pMerger,
                         STsdbReader* pReader) {
  for (;;) {
    pIter->hasVal = tsdbTbDataIterNext(pIter->iter);
    if (!pIter->hasVal) {
      return TSDB_CODE_SUCCESS;
    }

    // data exists but not valid
    TSDBROW* pNext = getValidMemRow(pIter, pDelList, pReader);
    if (pNext == NULL) {
      return TSDB_CODE_SUCCESS;
    }

    // ts is not identical, quit
    TSDBKEY nextKey = TSDBROW_KEY(pNext);
    if (nextKey.ts != ts) {
      return TSDB_CODE_SUCCESS;
    }

    STSchema* pRowSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pNext), pReader, uid);
    if (pRowSchema == NULL) {
      return terrno;
    }

    tRowMergerAdd(pMerger, pNext, pRowSchema);
  }
}

3127
static int32_t doMergeRowsInFileBlockImpl(SBlockData* pBlockData, int32_t rowIndex, int64_t key, SRowMerger* pMerger,
3128
                                          SVersionRange* pVerRange, int32_t step) {
3129 3130
  while (pBlockData->aTSKEY[rowIndex] == key && rowIndex < pBlockData->nRow && rowIndex >= 0) {
    if (pBlockData->aVersion[rowIndex] > pVerRange->maxVer || pBlockData->aVersion[rowIndex] < pVerRange->minVer) {
3131
      rowIndex += step;
3132 3133 3134 3135 3136 3137 3138 3139 3140 3141 3142 3143 3144 3145 3146 3147
      continue;
    }

    TSDBROW fRow = tsdbRowFromBlockData(pBlockData, rowIndex);
    tRowMerge(pMerger, &fRow);
    rowIndex += step;
  }

  return rowIndex;
}

// Outcome of checkForNeighborFileBlock(), consumed by the loop in
// doMergeRowsInFileBlocks().
typedef enum {
  // the neighbor block was loaded and all of its rows were consumed by the merge;
  // the caller keeps looking at the next neighbor block
  CHECK_FILEBLOCK_CONT = 0x1,
  // default result: stop checking neighbor blocks
  CHECK_FILEBLOCK_QUIT = 0x2,
} CHECK_FILEBLOCK_STATE;

H
Hongze Cheng 已提交
3148
static int32_t checkForNeighborFileBlock(STsdbReader* pReader, STableBlockScanInfo* pScanInfo, SDataBlk* pBlock,
3149 3150
                                         SFileDataBlockInfo* pFBlock, SRowMerger* pMerger, int64_t key,
                                         CHECK_FILEBLOCK_STATE* state) {
3151
  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
3152
  SBlockData*         pBlockData = &pReader->status.fileBlockData;
3153

3154
  *state = CHECK_FILEBLOCK_QUIT;
3155
  int32_t step = ASCENDING_TRAVERSE(pReader->order) ? 1 : -1;
3156

3157 3158 3159 3160
  int32_t     nextIndex = -1;
  SBlockIndex bIndex = {0};

  bool hasNeighbor = getNeighborBlockOfSameTable(pFBlock, pScanInfo, &nextIndex, pReader->order, &bIndex);
3161
  if (!hasNeighbor) {  // do nothing
3162 3163 3164
    return 0;
  }

3165
  bool overlap = overlapWithNeighborBlock(pBlock, &bIndex, pReader->order);
3166
  if (overlap) {  // load next block
3167
    SReaderStatus*  pStatus = &pReader->status;
3168 3169
    SDataBlockIter* pBlockIter = &pStatus->blockIter;

3170
    // 1. find the next neighbor block in the scan block list
3171
    SFileDataBlockInfo fb = {.uid = pFBlock->uid, .tbBlockIdx = nextIndex};
3172
    int32_t            neighborIndex = findFileBlockInfoIndex(pBlockIter, &fb);
3173

3174
    // 2. remove it from the scan block list
3175
    setFileBlockActiveInBlockIter(pBlockIter, neighborIndex, step);
3176

3177
    // 3. load the neighbor block, and set it to be the currently accessed file data block
3178
    int32_t code = doLoadFileBlockData(pReader, pBlockIter, &pStatus->fileBlockData, pFBlock->uid);
3179 3180 3181 3182
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

3183
    // 4. check the data values
3184 3185 3186 3187
    initBlockDumpInfo(pReader, pBlockIter);

    pDumpInfo->rowIndex =
        doMergeRowsInFileBlockImpl(pBlockData, pDumpInfo->rowIndex, key, pMerger, &pReader->verRange, step);
H
Haojun Liao 已提交
3188
    if (pDumpInfo->rowIndex >= pDumpInfo->totalRows) {
3189 3190 3191 3192 3193 3194 3195
      *state = CHECK_FILEBLOCK_CONT;
    }
  }

  return TSDB_CODE_SUCCESS;
}

3196 3197
/*
 * Merge into pMerger every file-block row that shares the timestamp of the row
 * currently referenced by the dump cursor (pReader->status.fBlockDumpInfo).
 *
 * The cursor is first moved one row in traversal direction, then rows with the
 * same timestamp are merged from the current block. When the current block is
 * exhausted, overlapping neighbor blocks of the same table are loaded and
 * merged as well.
 *
 * Returns TSDB_CODE_SUCCESS, or the error reported while loading a neighbor
 * block. That error used to be discarded, which made a failed load look like a
 * successful merge.
 */
int32_t doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pScanInfo, STsdbReader* pReader,
                                SRowMerger* pMerger) {
  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;

  bool    asc = ASCENDING_TRAVERSE(pReader->order);
  int64_t key = pBlockData->aTSKEY[pDumpInfo->rowIndex];
  int32_t step = asc ? 1 : -1;

  // skip the row the caller has already taken, then merge its duplicates
  pDumpInfo->rowIndex += step;
  if ((pDumpInfo->rowIndex <= pBlockData->nRow - 1 && asc) || (pDumpInfo->rowIndex >= 0 && !asc)) {
    pDumpInfo->rowIndex =
        doMergeRowsInFileBlockImpl(pBlockData, pDumpInfo->rowIndex, key, pMerger, &pReader->verRange, step);
  }

  // all rows are consumed, let's try next file block
  if ((pDumpInfo->rowIndex >= pBlockData->nRow && asc) || (pDumpInfo->rowIndex < 0 && !asc)) {
    while (1) {
      CHECK_FILEBLOCK_STATE st = CHECK_FILEBLOCK_QUIT;

      SFileDataBlockInfo* pFileBlockInfo = getCurrentBlockInfo(&pReader->status.blockIter);
      SDataBlk*           pCurrentBlock = getCurrentBlock(&pReader->status.blockIter);

      int32_t code = checkForNeighborFileBlock(pReader, pScanInfo, pCurrentBlock, pFileBlockInfo, pMerger, key, &st);
      if (code != TSDB_CODE_SUCCESS) {
        return code;
      }

      if (st == CHECK_FILEBLOCK_QUIT) {
        break;
      }
    }
  }

  return TSDB_CODE_SUCCESS;
}

H
Hongze Cheng 已提交
3227
int32_t doMergeRowsInLastBlock(SLastBlockReader* pLastBlockReader, STableBlockScanInfo* pScanInfo, int64_t ts,
3228 3229
                               SRowMerger* pMerger, SVersionRange* pVerRange) {
  while (nextRowFromLastBlocks(pLastBlockReader, pScanInfo, pVerRange)) {
3230 3231
    int64_t next1 = getCurrentKeyInLastBlock(pLastBlockReader);
    if (next1 == ts) {
3232
      TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
3233 3234 3235 3236 3237 3238 3239 3240 3241
      tRowMerge(pMerger, &fRow1);
    } else {
      break;
    }
  }

  return TSDB_CODE_SUCCESS;
}

3242 3243
/*
 * Produce the output row for the buffer row *pRow, merging in every following
 * visible row of the same iterator that carries the same timestamp.
 *
 * pRow       row the iterator currently points at; it is copied before the
 *            iterator is advanced.
 * pIter      mem or imem iterator that yielded pRow; it is advanced past all
 *            rows that were merged.
 * pTSRow     out: the resulting row.
 * freeTSRow  out: false when *pTSRow aliases the buffer row (no duplicate
 *            timestamp found), true when *pTSRow is a newly built merged row
 *            that the caller must free. Only written on success.
 *
 * Returns TSDB_CODE_SUCCESS, terrno when a schema lookup fails, or the error of
 * the merger. The row merger is released on every exit taken after it has been
 * initialised; the error paths used to leak it.
 */
int32_t doMergeMemTableMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter, SArray* pDelList, STSRow** pTSRow,
                                 STsdbReader* pReader, bool* freeTSRow) {
  TSDBROW* pNextRow = NULL;
  TSDBROW  current = *pRow;

  // look at the next valid row in mem/imem
  pIter->hasVal = tsdbTbDataIterNext(pIter->iter);
  if (pIter->hasVal) {
    pNextRow = getValidMemRow(pIter, pDelList, pReader);
  }

  // if there is no next valid row, or it has a different ts, return current row directly
  if (pNextRow == NULL || current.pTSRow->ts != pNextRow->pTSRow->ts) {
    *pTSRow = current.pTSRow;
    *freeTSRow = false;
    return TSDB_CODE_SUCCESS;
  }

  SRowMerger merge = {0};

  // get the correct schema for data in memory
  terrno = 0;
  STSchema* pTSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(&current), pReader, uid);
  if (pTSchema == NULL) {
    return terrno;
  }

  if (pReader->pSchema == NULL) {
    pReader->pSchema = pTSchema;
  }

  int32_t code = tRowMergerInit2(&merge, pReader->pSchema, &current, pTSchema);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  // from here on the merger is initialised and must be cleared before returning
  STSchema* pTSchema1 = doGetSchemaForTSRow(TSDBROW_SVERSION(pNextRow), pReader, uid);
  if (pTSchema1 == NULL) {
    code = terrno;
    goto _end;
  }

  tRowMergerAdd(&merge, pNextRow, pTSchema1);

  code = doMergeRowsInBuf(pIter, uid, current.pTSRow->ts, pDelList, &merge, pReader);
  if (code != TSDB_CODE_SUCCESS) {
    goto _end;
  }

  code = tRowMergerGetRow(&merge, pTSRow);
  if (code != TSDB_CODE_SUCCESS) {
    goto _end;
  }

  *freeTSRow = true;

_end:
  tRowMergerClear(&merge);
  return code;
}

3310
int32_t doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader,
H
Hongze Cheng 已提交
3311
                           STSRow** pTSRow) {
H
Haojun Liao 已提交
3312 3313
  SRowMerger merge = {0};

3314 3315 3316
  TSDBKEY k = TSDBROW_KEY(pRow);
  TSDBKEY ik = TSDBROW_KEY(piRow);

3317
  if (ASCENDING_TRAVERSE(pReader->order)) {  // ascending order imem --> mem
H
Haojun Liao 已提交
3318
    STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid);
3319

H
Haojun Liao 已提交
3320 3321 3322 3323 3324 3325 3326 3327 3328 3329
    int32_t code = tRowMergerInit(&merge, piRow, pSchema);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    code = doMergeRowsInBuf(&pBlockScanInfo->iiter, pBlockScanInfo->uid, ik.ts, pBlockScanInfo->delSkyline, &merge,
                            pReader);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }
3330

3331
    tRowMerge(&merge, pRow);
H
Haojun Liao 已提交
3332 3333 3334 3335 3336 3337
    code =
        doMergeRowsInBuf(&pBlockScanInfo->iter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge, pReader);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

3338
  } else {
H
Haojun Liao 已提交
3339
    STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid);
3340

H
Haojun Liao 已提交
3341
    int32_t code = tRowMergerInit(&merge, pRow, pSchema);
3342
    if (code != TSDB_CODE_SUCCESS || merge.pTSchema == NULL) {
H
Haojun Liao 已提交
3343 3344 3345 3346 3347 3348 3349 3350
      return code;
    }

    code =
        doMergeRowsInBuf(&pBlockScanInfo->iter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge, pReader);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }
3351 3352

    tRowMerge(&merge, piRow);
H
Haojun Liao 已提交
3353 3354 3355 3356 3357
    code = doMergeRowsInBuf(&pBlockScanInfo->iiter, pBlockScanInfo->uid, ik.ts, pBlockScanInfo->delSkyline, &merge,
                            pReader);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }
3358
  }
3359

3360 3361
  int32_t code = tRowMergerGetRow(&merge, pTSRow);
  return code;
3362 3363
}

3364 3365
/*
 * Fetch the next output row of one table from the in-memory buffers (mem and
 * imem), in the reader's traversal order.
 *
 * Rows at or beyond endKey (ts >= endKey ascending, ts <= endKey descending)
 * are not returned. When both buffers have a candidate, the one that comes
 * first in traversal order wins; when both carry the same timestamp they are
 * merged into a single row.
 *
 * pTSRow     out: the row; left untouched when no row is available.
 * freeTSRow  out: whether the caller must free *pTSRow.
 *
 * Returns TSDB_CODE_SUCCESS or the error of the merge helpers.
 */
int32_t tsdbGetNextRowInMem(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader, STSRow** pTSRow, int64_t endKey,
                            bool* freeTSRow) {
  SIterInfo* pMemIter = &pBlockScanInfo->iter;
  SIterInfo* pImemIter = &pBlockScanInfo->iiter;

  TSDBROW* pRow = getValidMemRow(pMemIter, pBlockScanInfo->delSkyline, pReader);
  TSDBROW* piRow = getValidMemRow(pImemIter, pBlockScanInfo->delSkyline, pReader);
  SArray*  pDelList = pBlockScanInfo->delSkyline;
  uint64_t uid = pBlockScanInfo->uid;

  // todo refactor
  bool asc = ASCENDING_TRAVERSE(pReader->order);

  // discard candidates that reached the end key of this round
  if (pMemIter->hasVal) {
    TSDBKEY memKey = TSDBROW_KEY(pRow);
    if (asc ? (memKey.ts >= endKey) : (memKey.ts <= endKey)) {
      pRow = NULL;
    }
  }

  if (pImemIter->hasVal) {
    TSDBKEY imemKey = TSDBROW_KEY(piRow);
    if (asc ? (imemKey.ts >= endKey) : (imemKey.ts <= endKey)) {
      piRow = NULL;
    }
  }

  bool memReady = pMemIter->hasVal && (pRow != NULL);
  bool imemReady = pImemIter->hasVal && (piRow != NULL);

  if (memReady && imemReady) {
    TSDBKEY k = TSDBROW_KEY(pRow);
    TSDBKEY ik = TSDBROW_KEY(piRow);

    if (ik.ts == k.ts) {
      // same timestamp in both buffers: merge them into a new row
      *freeTSRow = true;
      return doMergeMemIMemRows(pRow, piRow, pBlockScanInfo, pReader, pTSRow);
    }

    // different timestamps: emit whichever comes first in traversal order
    bool imemFirst = asc ? (ik.ts < k.ts) : (ik.ts > k.ts);
    if (imemFirst) {
      return doMergeMemTableMultiRows(piRow, uid, pImemIter, pDelList, pTSRow, pReader, freeTSRow);
    } else {
      return doMergeMemTableMultiRows(pRow, uid, pMemIter, pDelList, pTSRow, pReader, freeTSRow);
    }
  }

  if (memReady) {
    return doMergeMemTableMultiRows(pRow, pBlockScanInfo->uid, pMemIter, pDelList, pTSRow, pReader, freeTSRow);
  }

  if (imemReady) {
    return doMergeMemTableMultiRows(piRow, uid, pImemIter, pDelList, pTSRow, pReader, freeTSRow);
  }

  return TSDB_CODE_SUCCESS;
}

H
Hongze Cheng 已提交
3421 3422
/*
 * Append one row (pTSRow) to the result block pBlock, column by column.
 *
 * The output columns and the row's schema columns are walked in parallel by
 * column id. An output column whose id is absent from the schema receives
 * NULL. On success pBlock->info.rows is incremented and pScanInfo->lastKey is
 * set to the row's timestamp.
 *
 * Returns TSDB_CODE_SUCCESS, or terrno when the schema for the row's schema
 * version cannot be resolved. In that case nothing is appended; the schema
 * pointer used to be dereferenced without a check.
 */
int32_t doAppendRowFromTSRow(SSDataBlock* pBlock, STsdbReader* pReader, STSRow* pTSRow,
                             STableBlockScanInfo* pScanInfo) {
  int32_t numOfRows = pBlock->info.rows;
  int32_t numOfCols = (int32_t)taosArrayGetSize(pBlock->pDataBlock);
  int64_t uid = pScanInfo->uid;

  SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo;
  STSchema*           pSchema = doGetSchemaForTSRow(pTSRow->sver, pReader, uid);
  if (pSchema == NULL) {
    tsdbError("failed to get table schema, uid:%" PRId64 ", %s", uid, pReader->idStr);
    return terrno;
  }

  SColVal colVal = {0};
  int32_t i = 0, j = 0;

  // the primary timestamp column, if requested, is always the first output column
  SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
  if (pColInfoData->info.colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
    colDataAppend(pColInfoData, numOfRows, (const char*)&pTSRow->ts, false);
    i += 1;
  }

  // i walks the output columns, j walks the schema columns
  while (i < numOfCols && j < pSchema->numOfCols) {
    pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
    col_id_t colId = pColInfoData->info.colId;

    if (colId == pSchema->columns[j].colId) {
      tTSRowGetVal(pTSRow, pSchema, j, &colVal);
      doCopyColVal(pColInfoData, numOfRows, i, &colVal, pSupInfo);
      i += 1;
      j += 1;
    } else if (colId < pSchema->columns[j].colId) {
      // requested column does not exist in this schema version
      colDataAppendNULL(pColInfoData, numOfRows);
      i += 1;
    } else {
      // schema column was not requested
      j += 1;
    }
  }

  // set null value since current column does not exist in the "pSchema"
  while (i < numOfCols) {
    pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
    colDataAppendNULL(pColInfoData, numOfRows);
    i += 1;
  }

  pBlock->info.rows += 1;
  pScanInfo->lastKey = pTSRow->ts;
  return TSDB_CODE_SUCCESS;
}

H
Hongze Cheng 已提交
3468 3469
/*
 * Append row `rowIndex` of a file block (pBlockData) to the result block.
 *
 * Output columns and file-block columns are walked in parallel by column id.
 * Output columns without a matching file-block column are filled with NULL.
 * pResBlock->info.rows is incremented by one.
 *
 * Always returns TSDB_CODE_SUCCESS.
 */
int32_t doAppendRowFromFileBlock(SSDataBlock* pResBlock, STsdbReader* pReader, SBlockData* pBlockData,
                                 int32_t rowIndex) {
  SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo;

  int32_t dstRow = pResBlock->info.rows;
  int32_t outIdx = 0;  // position in the output column list
  int32_t inIdx = 0;   // position in the file block column list

  // the primary timestamp column, if requested, is the first output column
  SColumnInfoData* pTsCol = taosArrayGet(pResBlock->pDataBlock, outIdx);
  if (pTsCol->info.colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
    colDataAppendInt64(pTsCol, dstRow, &pBlockData->aTSKEY[rowIndex]);
    outIdx += 1;
  }

  SColVal cv = {0};
  int32_t numOfInputCols = pBlockData->nColData;
  int32_t numOfOutputCols = pResBlock->pDataBlock->size;

  while (outIdx < numOfOutputCols && inIdx < numOfInputCols) {
    SColumnInfoData* pDst = TARRAY_GET_ELEM(pResBlock->pDataBlock, outIdx);
    SColData*        pSrc = tBlockDataGetColDataByIdx(pBlockData, inIdx);

    if (pSrc->cid < pDst->info.colId) {
      // file block column is not requested, skip it
      inIdx += 1;
    } else {
      if (pSrc->cid == pDst->info.colId) {
        tColDataGetValue(pSrc, rowIndex, &cv);
        doCopyColVal(pDst, dstRow, outIdx, &cv, pSupInfo);
        inIdx += 1;
      } else {
        // the specified column does not exist in file block, fill with null data
        colDataAppendNULL(pDst, dstRow);
      }

      outIdx += 1;
    }
  }

  // remaining output columns have no counterpart in the file block
  for (; outIdx < numOfOutputCols; outIdx += 1) {
    SColumnInfoData* pDst = taosArrayGet(pResBlock->pDataBlock, outIdx);
    colDataAppendNULL(pDst, dstRow);
  }

  pResBlock->info.rows += 1;
  return TSDB_CODE_SUCCESS;
}

3516 3517
/*
 * Fill pReader->pResBlock with rows of one table taken from the in-memory
 * buffers, until the buffers are exhausted, endKey is reached, or the block
 * holds `capacity` rows.
 *
 * Returns TSDB_CODE_SUCCESS, or the first error reported while fetching or
 * appending a row. These errors used to be discarded, so a failed merge or
 * schema lookup was reported as success.
 */
int32_t buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, int64_t endKey, int32_t capacity,
                                  STsdbReader* pReader) {
  SSDataBlock* pBlock = pReader->pResBlock;
  int32_t      code = TSDB_CODE_SUCCESS;

  do {
    STSRow* pTSRow = NULL;
    bool    freeTSRow = false;

    code = tsdbGetNextRowInMem(pBlockScanInfo, pReader, &pTSRow, endKey, &freeTSRow);
    if (code != TSDB_CODE_SUCCESS) {
      // release a row that was handed over before the failure
      if (freeTSRow && pTSRow != NULL) {
        taosMemoryFree(pTSRow);
      }
      return code;
    }

    if (pTSRow == NULL) {
      break;
    }

    code = doAppendRowFromTSRow(pBlock, pReader, pTSRow, pBlockScanInfo);

    if (freeTSRow) {
      taosMemoryFree(pTSRow);
    }

    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    // no data in buffer, return immediately
    if (!(pBlockScanInfo->iter.hasVal || pBlockScanInfo->iiter.hasVal)) {
      break;
    }

    if (pBlock->info.rows >= capacity) {
      break;
    }
  } while (1);

  ASSERT(pBlock->info.rows <= capacity);
  return TSDB_CODE_SUCCESS;
}
H
Hongze Cheng 已提交
3547

3548 3549
// TODO refactor: with createDataBlockScanInfo
/*
 * Replace the set of tables scanned by pReader with the `num` entries of
 * pTableList (an array of STableKeyInfo).
 *
 * Every scan info currently registered in the table map is cleared, the map is
 * emptied, and one entry per table is re-registered using the slots of
 * pReader->blockInfoBuf. The new list must not be larger than the previous
 * one (asserted).
 *
 * Returns TDB_CODE_SUCCESS.
 * NOTE(review): other functions in this file return TSDB_CODE_SUCCESS --
 * confirm which constant is intended here.
 */
int32_t tsdbSetTableList(STsdbReader* pReader, const void* pTableList, int32_t num) {
  ASSERT(pReader != NULL);
  int32_t size = taosHashGetSize(pReader->status.pTableMap);

  // release the per-table scan state of the previous table list
  for (STableBlockScanInfo** ppOld = taosHashIterate(pReader->status.pTableMap, NULL); ppOld != NULL;
       ppOld = taosHashIterate(pReader->status.pTableMap, ppOld)) {
    clearBlockScanInfo(*ppOld);
  }

  // todo handle the case where size is less than the value of num
  ASSERT(size >= num);

  taosHashClear(pReader->status.pTableMap);

  STableKeyInfo* pList = (STableKeyInfo*)pTableList;
  int32_t        idx = 0;
  while (idx < num) {
    STableBlockScanInfo* pInfo = getPosInBlockInfoBuf(&pReader->blockInfoBuf, idx);
    pInfo->uid = pList[idx].uid;

    // the map stores the pointer to the scan info, keyed by table uid
    taosHashPut(pReader->status.pTableMap, &pInfo->uid, sizeof(uint64_t), &pInfo, POINTER_BYTES);
    ++idx;
  }

  return TDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
3573 3574 3575 3576 3577 3578
// Return the result of metaGetIdx() for pMeta, or NULL when pMeta is NULL.
void* tsdbGetIdx(SMeta* pMeta) {
  return (pMeta != NULL) ? metaGetIdx(pMeta) : NULL;
}
dengyihao's avatar
dengyihao 已提交
3579

dengyihao's avatar
dengyihao 已提交
3580 3581 3582 3583 3584 3585
// Return the result of metaGetIvtIdx() for pMeta, or NULL when pMeta is NULL.
void* tsdbGetIvtIdx(SMeta* pMeta) {
  return (pMeta != NULL) ? metaGetIvtIdx(pMeta) : NULL;
}
L
Liu Jicong 已提交
3586

H
Hongze Cheng 已提交
3587
// Return the upper bound (maxVer) of the version range this reader may see.
uint64_t getReaderMaxVersion(STsdbReader* pReader) {
  uint64_t maxVer = pReader->verRange.maxVer;
  return maxVer;
}
3588

3589 3590 3591 3592 3593 3594 3595 3596 3597 3598 3599 3600 3601 3602 3603
/*
 * Prepare pReader for scanning: set up the file-set iterator over the read
 * snapshot and reset the data block iterator.
 *
 * When the snapshot holds no files, the reader is switched to buffer-only mode
 * (status.loadFromFile = false); otherwise the first block of the first file
 * is positioned via initForFirstBlockInFile().
 */
static int32_t doOpenReaderImpl(STsdbReader* pReader) {
  SReaderStatus*  pStatus = &pReader->status;
  SDataBlockIter* pBlockIter = &pStatus->blockIter;

  initFilesetIterator(&pStatus->fileIter, pReader->pReadSnap->fs.aDFileSet, pReader);
  resetDataBlockIterator(pBlockIter, pReader->order);

  if (pStatus->fileIter.numOfFiles != 0) {
    return initForFirstBlockInFile(pReader, pBlockIter);
  }

  // no data in files, let's try buffer in memory
  pStatus->loadFromFile = false;
  return TSDB_CODE_SUCCESS;
}

H
refact  
Hongze Cheng 已提交
3604
// ====================================== EXPOSED APIs ======================================
3605 3606
/*
 * Create and open a reader for the tables in pTableList.
 *
 * pCond        query condition. For TIMEWINDOW_RANGE_EXTERNAL its twindows and
 *              order fields are rewritten while the inner readers are created.
 * pTableList   array of STableKeyInfo with numOfTables entries.
 * ppReader     out: the created reader.
 * idstr        identifier used in log messages.
 *
 * For TIMEWINDOW_RANGE_EXTERNAL two extra single-row readers (innerReader[0]
 * and innerReader[1]) are created for the ranges outside the query window;
 * they share the table map, schemas and read snapshot of the main reader.
 *
 * Returns TSDB_CODE_SUCCESS or an error code.
 *
 * NOTE(review): several error exits leave the already created reader(s) in
 * *ppReader without closing them -- confirm that callers release them.
 * NOTE(review): for descending order the second inner window is
 * [INT64_MIN, window.ekey], where window.skey might be expected -- confirm.
 */
int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, void* pTableList, int32_t numOfTables,
                       STsdbReader** ppReader, const char* idstr) {
  // remember the window requested by the caller before it is adjusted below
  STimeWindow origWindow = pCond->twindows;
  if (pCond->type == TIMEWINDOW_RANGE_EXTERNAL) {
    pCond->twindows.skey += 1;
    pCond->twindows.ekey -= 1;
  }

  int32_t code = tsdbReaderCreate(pVnode, pCond, ppReader, 4096, idstr);
  if (code != TSDB_CODE_SUCCESS) {
    goto _err;
  }

  // check for query time window
  STsdbReader* pReader = *ppReader;
  if (pCond->type == TIMEWINDOW_RANGE_CONTAINED && isEmptyQueryTimeWindow(&pReader->window)) {
    tsdbDebug("%p query window not overlaps with the data set, no result returned, %s", pReader, pReader->idStr);
    return TSDB_CODE_SUCCESS;
  }

  if (pCond->type == TIMEWINDOW_RANGE_EXTERNAL) {
    // update the SQueryTableDataCond to create inner reader
    const int32_t origOrder = pCond->order;
    const bool    ascQuery = (origOrder == TSDB_ORDER_ASC);

    // first inner reader: scans in the opposite direction of the query
    if (ascQuery) {
      pCond->twindows.skey = INT64_MIN;
      pCond->twindows.ekey = origWindow.skey;
      pCond->order = TSDB_ORDER_DESC;
    } else {
      pCond->twindows.skey = origWindow.ekey;
      pCond->twindows.ekey = INT64_MAX;
      pCond->order = TSDB_ORDER_ASC;
    }

    // here we only need one more row, so the capacity is set to be ONE.
    code = tsdbReaderCreate(pVnode, pCond, &pReader->innerReader[0], 1, idstr);
    if (code != TSDB_CODE_SUCCESS) {
      goto _err;
    }

    // second inner reader: scans in the direction of the query
    if (ascQuery) {
      pCond->twindows.skey = origWindow.ekey;
      pCond->twindows.ekey = INT64_MAX;
    } else {
      pCond->twindows.skey = INT64_MIN;
      pCond->twindows.ekey = origWindow.ekey;
    }
    pCond->order = origOrder;

    code = tsdbReaderCreate(pVnode, pCond, &pReader->innerReader[1], 1, idstr);
    if (code != TSDB_CODE_SUCCESS) {
      goto _err;
    }
  }

  // NOTE: the endVersion in pCond is the data version not schema version, so pCond->endVersion is not correct here.
  if (pCond->suid != 0) {
    pReader->pSchema = metaGetTbTSchema(pReader->pTsdb->pVnode->pMeta, pReader->suid, -1, 1);
    if (pReader->pSchema == NULL) {
      tsdbError("failed to get table schema, suid:%" PRIu64 ", ver:-1, %s", pReader->suid, pReader->idStr);
    }
  } else if (numOfTables > 0) {
    // no super table: use the schema of the first table in the list
    STableKeyInfo* pFirstTable = pTableList;
    pReader->pSchema = metaGetTbTSchema(pReader->pTsdb->pVnode->pMeta, pFirstTable->uid, -1, 1);
    if (pReader->pSchema == NULL) {
      tsdbError("failed to get table schema, uid:%" PRIu64 ", ver:-1, %s", pFirstTable->uid, pReader->idStr);
    }
  }

  // the scan info is built against the first inner reader when there is one
  STsdbReader* pScanOwner = pReader;
  if (pReader->innerReader[0] != NULL) {
    pScanOwner = pReader->innerReader[0];
  }

  pReader->status.pTableMap = createDataBlockScanInfo(pScanOwner, pTableList, numOfTables);
  if (pReader->status.pTableMap == NULL) {
    tsdbReaderClose(pReader);
    *ppReader = NULL;

    code = TSDB_CODE_TDB_OUT_OF_MEMORY;
    goto _err;
  }

  if (numOfTables > 0) {
    code = tsdbTakeReadSnap(pReader->pTsdb, &pReader->pReadSnap, pReader->idStr);
    if (code != TSDB_CODE_SUCCESS) {
      goto _err;
    }

    STsdbReader* pFirstToOpen = pReader;

    if (pReader->type != TIMEWINDOW_RANGE_CONTAINED) {
      // both inner readers need only one row and borrow the shared state of the main reader
      for (int32_t i = 0; i < 2; ++i) {
        STsdbReader* pInner = pReader->innerReader[i];

        pInner->capacity = 1;
        pInner->status.pTableMap = pReader->status.pTableMap;
        pInner->pSchema = pReader->pSchema;
        pInner->pMemSchema = pReader->pMemSchema;
        pInner->pReadSnap = pReader->pReadSnap;
      }

      // only the first inner reader is opened here
      pFirstToOpen = pReader->innerReader[0];
    }

    code = doOpenReaderImpl(pFirstToOpen);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }
  }

  tsdbDebug("%p total numOfTable:%d in this query %s", pReader, numOfTables, pReader->idStr);
  return code;

_err:
  tsdbError("failed to create data reader, code:%s %s", tstrerror(code), idstr);
  return code;
}

// Close a reader and release every resource it owns.
//
// pReader: reader to destroy; NULL is accepted and ignored.
//
// When the reader has inner readers (innerReader[0]/[1]), the table map, read snapshot and
// schemas they point at are the same objects as the outer reader's. Those pointers are cleared
// on the inner readers before the recursive close, so the shared objects are released exactly
// once, by the outer reader.
void tsdbReaderClose(STsdbReader* pReader) {
  if (pReader == NULL) {
    return;
  }

  // Remember this before the inner readers are closed: the flag is needed again below and the
  // pointer value must not be inspected after the object it refers to has been freed.
  const bool hasInnerReaders = (pReader->innerReader[0] != NULL);

  if (hasInnerReaders) {
    for (int32_t k = 0; k < 2; ++k) {
      STsdbReader* p = pReader->innerReader[k];
      if (p == NULL) {
        continue;
      }

      p->status.pTableMap = NULL;
      p->pReadSnap = NULL;
      p->pSchema = NULL;
      p->pMemSchema = NULL;
    }

    tsdbReaderClose(pReader->innerReader[0]);
    tsdbReaderClose(pReader->innerReader[1]);
  }

  SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo;

  taosMemoryFreeClear(pSupInfo->plist);
  taosMemoryFree(pSupInfo->colIds);

  taosArrayDestroy(pSupInfo->pColAgg);
  if (pSupInfo->buildBuf != NULL) {
    for (int32_t i = 0; i < blockDataGetNumOfCols(pReader->pResBlock); ++i) {
      if (pSupInfo->buildBuf[i] != NULL) {
        taosMemoryFreeClear(pSupInfo->buildBuf[i]);
      }
    }
  }

  taosMemoryFree(pSupInfo->buildBuf);
  tBlockDataDestroy(&pReader->status.fileBlockData, true);

  cleanupDataBlockIterator(&pReader->status.blockIter);

  // the table count is only needed for the cost summary printed below
  size_t numOfTables = taosHashGetSize(pReader->status.pTableMap);
  destroyAllBlockScanInfo(pReader->status.pTableMap, hasInnerReaders ? false : true);
  blockDataDestroy(pReader->pResBlock);
  clearBlockScanInfoBuf(&pReader->blockInfoBuf);

  if (pReader->pFileReader != NULL) {
    tsdbDataFReaderClose(&pReader->pFileReader);
  }

  tsdbUntakeReadSnap(pReader->pTsdb, pReader->pReadSnap, pReader->idStr);

  taosMemoryFree(pReader->status.uidCheckInfo.tableUidList);
  SIOCostSummary* pCost = &pReader->cost;

  SFilesetIter* pFilesetIter = &pReader->status.fileIter;
  if (pFilesetIter->pLastBlockReader != NULL) {
    SLastBlockReader* pLReader = pFilesetIter->pLastBlockReader;
    tMergeTreeClose(&pLReader->mergeTree);

    // fold the last-block load statistics into the cost summary before they are destroyed
    getLastBlockLoadInfo(pLReader->pInfo, &pCost->lastBlockLoad, &pCost->lastBlockLoadTime);

    pLReader->pInfo = destroyLastBlockLoadInfo(pLReader->pInfo);
    taosMemoryFree(pLReader);
  }

  tsdbDebug("%p :io-cost summary: head-file:%" PRIu64 ", head-file time:%.2f ms, SMA:%" PRId64
            " SMA-time:%.2f ms, fileBlocks:%" PRId64
            ", fileBlocks-load-time:%.2f ms, "
            "build in-memory-block-time:%.2f ms, lastBlocks:%" PRId64
            ", lastBlocks-time:%.2f ms, composed-blocks:%" PRId64
            ", composed-blocks-time:%.2fms, STableBlockScanInfo size:%.2f Kb, creatTime:%.2f ms, %s",
            pReader, pCost->headFileLoad, pCost->headFileLoadTime, pCost->smaDataLoad, pCost->smaLoadTime,
            pCost->numOfBlocks, pCost->blockLoadTime, pCost->buildmemBlock, pCost->lastBlockLoad,
            pCost->lastBlockLoadTime, pCost->composedBlocks, pCost->buildComposedBlockTime,
            numOfTables * sizeof(STableBlockScanInfo) / 1000.0, pCost->createScanInfoList, pReader->idStr);

  taosMemoryFree(pReader->idStr);

  // pMemSchema may alias pSchema; compare the two while both are still valid, then free
  if (pReader->pMemSchema != pReader->pSchema) {
    taosMemoryFree(pReader->pMemSchema);
  }
  taosMemoryFree(pReader->pSchema);
  taosMemoryFreeClear(pReader);
}

3814
// Produce the next result block for a single reader.
//
// The result block is emptied first. While the reader is still loading from files, a block is
// built from the files; if that yields no rows, or the reader is not loading from files, the
// in-memory buffer is tried instead.
//
// Returns true when the result block holds at least one row, false otherwise (including when
// building the block from files reports an error).
static bool doTsdbNextDataBlock(STsdbReader* pReader) {
  SSDataBlock* pResBlock = pReader->pResBlock;

  // drop whatever the previous call left in the result block
  blockDataCleanup(pResBlock);

  if (pReader->status.loadFromFile) {
    if (buildBlockFromFiles(pReader) != TSDB_CODE_SUCCESS) {
      return false;
    }

    if (pResBlock->info.rows > 0) {
      return true;
    }
  }

  // nothing (more) from the files, let's try the buffer
  buildBlockFromBufferSequentially(pReader);
  return pResBlock->info.rows > 0;
}

3841 3842 3843 3844 3845
// Advance the reader to its next data block.
//
// For a reader with inner readers the scan runs in up to three phases, tracked in
// pReader->step: the previous-row reader (innerReader[0]), the main scan on pReader itself,
// and finally the next-row reader (innerReader[1]).
//
// Returns true when a block is available, false when the scan is exhausted or when preparing
// one of the phases failed. In the failure case the error code is stored in terrno; it is never
// returned through the bool result, where a non-zero code would read as "block available".
bool tsdbNextDataBlock(STsdbReader* pReader) {
  if (isEmptyQueryTimeWindow(&pReader->window)) {
    return false;
  }

  if (pReader->innerReader[0] != NULL && pReader->step == 0) {
    bool ret = doTsdbNextDataBlock(pReader->innerReader[0]);
    pReader->step = EXTERNAL_ROWS_PREV;
    if (ret) {
      return ret;
    }
  }

  if (pReader->step == EXTERNAL_ROWS_PREV) {
    // prepare for the main scan
    int32_t code = doOpenReaderImpl(pReader);
    resetAllDataBlockScanInfo(pReader->status.pTableMap, pReader->innerReader[0]->window.ekey);

    if (code != TSDB_CODE_SUCCESS) {
      terrno = code;
      return false;
    }

    pReader->step = EXTERNAL_ROWS_MAIN;
  }

  bool ret = doTsdbNextDataBlock(pReader);
  if (ret) {
    return ret;
  }

  if (pReader->innerReader[1] != NULL && pReader->step == EXTERNAL_ROWS_MAIN) {
    // prepare for the next row scan
    int32_t code = doOpenReaderImpl(pReader->innerReader[1]);
    resetAllDataBlockScanInfo(pReader->innerReader[1]->status.pTableMap, pReader->window.ekey);
    if (code != TSDB_CODE_SUCCESS) {
      terrno = code;
      return false;
    }

    bool ret1 = doTsdbNextDataBlock(pReader->innerReader[1]);
    pReader->step = EXTERNAL_ROWS_NEXT;
    if (ret1) {
      return ret1;
    }
  }

  return false;
}

3889
// Report whether the table identified by uid is part of this reader's table map.
//
// Returns false when the uid has no entry in the map (or the entry is empty), true otherwise.
bool tsdbTableNextDataBlock(STsdbReader* pReader, uint64_t uid) {
  // The map stores pointers to scan info objects; the lookup result must be checked before it
  // is dereferenced, otherwise an unknown uid dereferences a NULL pointer.
  STableBlockScanInfo** ppBlockScanInfo =
      (STableBlockScanInfo**)taosHashGet(pReader->status.pTableMap, &uid, sizeof(uid));
  if (ppBlockScanInfo == NULL || *ppBlockScanInfo == NULL) {  // no data block for the table of given uid
    return false;
  }

  return true;
}

H
Haojun Liao 已提交
3899 3900 3901 3902 3903
// Copy the row count, table uid and time window of the reader's current result block into the
// caller-provided output locations.
static void setBlockInfo(const STsdbReader* pReader, int32_t* rows, uint64_t* uid, STimeWindow* pWindow) {
  ASSERT(pReader != NULL);

  const SSDataBlock* pResBlock = pReader->pResBlock;

  *rows = pResBlock->info.rows;
  *uid = pResBlock->info.uid;
  *pWindow = pResBlock->info.window;
}

H
Haojun Liao 已提交
3906
// Fetch rows/uid/window of the block most recently produced by the reader.
//
// For an external-range reader the block may have been produced by one of the inner readers;
// pReader->step selects which one: the main reader itself, innerReader[0] for the previous-row
// phase, innerReader[1] for any other step value.
void tsdbRetrieveDataBlockInfo(const STsdbReader* pReader, int32_t* rows, uint64_t* uid, STimeWindow* pWindow) {
  const STsdbReader* pSource = pReader;

  if (pReader->type == TIMEWINDOW_RANGE_EXTERNAL && pReader->step != EXTERNAL_ROWS_MAIN) {
    pSource = (pReader->step == EXTERNAL_ROWS_PREV) ? pReader->innerReader[0] : pReader->innerReader[1];
  }

  setBlockInfo(pSource, rows, uid, pWindow);
}

3920
// Load the pre-computed column aggregates (SMA) of the current file block.
//
// pBlockStatis: receives the per-column aggregate list, or NULL when no SMA is served
//               (external-range reader, composed block, or a block without SMA).
// allHave:      set to true only when SMA was loaded and every matched column has SMA enabled.
//
// Returns TSDB_CODE_SUCCESS, or the error reported by tsdbReadBlockSma.
int32_t tsdbRetrieveDatablockSMA(STsdbReader* pReader, SColumnDataAgg*** pBlockStatis, bool* allHave) {
  *allHave = false;

  // no statistics for external-range readers, and none for composed blocks either
  if (pReader->type == TIMEWINDOW_RANGE_EXTERNAL || pReader->status.composedDataBlock) {
    *pBlockStatis = NULL;
    return TSDB_CODE_SUCCESS;
  }

  SFileDataBlockInfo* pFBlock = getCurrentBlockInfo(&pReader->status.blockIter);
  SDataBlk*           pDataBlk = getCurrentBlock(&pReader->status.blockIter);
  SBlockLoadSuppInfo* pSupp = &pReader->suppInfo;

  if (!tDataBlkHasSma(pDataBlk)) {
    *pBlockStatis = NULL;
    return TSDB_CODE_SUCCESS;
  }

  int32_t code = tsdbReadBlockSma(pReader->pFileReader, pDataBlk, pSupp->pColAgg);
  if (code != TSDB_CODE_SUCCESS) {
    tsdbDebug("vgId:%d, failed to load block SMA for uid %" PRIu64 ", code:%s, %s", 0, pFBlock->uid, tstrerror(code),
              pReader->idStr);
    return code;
  }

  *allHave = true;

  // slot 0 always carries the primary timestamp column, filled from the result block's window
  SColumnDataAgg* pTsAgg = &pSupp->tsColAgg;

  pTsAgg->numOfNull = 0;
  pTsAgg->colId = PRIMARYKEY_TIMESTAMP_COL_ID;
  pTsAgg->min = pReader->pResBlock->info.window.skey;
  pTsAgg->max = pReader->pResBlock->info.window.ekey;
  pSupp->plist[0] = pTsAgg;

  // merge-walk the loaded aggregates against the requested column ids
  size_t numOfCols = blockDataGetNumOfCols(pReader->pResBlock);
  size_t numOfAggs = taosArrayGetSize(pSupp->pColAgg);

  int32_t aggIdx = 0;
  int32_t colIdx = 0;

  while (colIdx < numOfCols && aggIdx < numOfAggs) {
    SColumnDataAgg* pAgg = taosArrayGet(pSupp->pColAgg, aggIdx);

    if (pAgg->colId < pSupp->colIds[colIdx]) {
      aggIdx += 1;
      continue;
    }

    if (pSupp->colIds[colIdx] < pAgg->colId) {
      colIdx += 1;
      continue;
    }

    // same column id on both sides
    // NOTE(review): the schema column is looked up with the aggregate index, not a schema
    // index - confirm that both arrays are laid out identically.
    if (IS_BSMA_ON(&(pReader->pSchema->columns[aggIdx]))) {
      pSupp->plist[colIdx] = pAgg;
    } else {
      *allHave = false;
    }

    aggIdx += 1;
    colIdx += 1;
  }

  pReader->cost.smaDataLoad += 1;
  *pBlockStatis = pSupp->plist;

  tsdbDebug("vgId:%d, succeed to load block SMA for uid %" PRIu64 ", %s", 0, pFBlock->uid, pReader->idStr);

  return code;
}

3996
// Return the column data of the reader's current block.
//
// A composed block is already materialized in pResBlock and is returned as is. Otherwise the
// current file block is loaded and copied into pResBlock first.
//
// Returns the column array of pResBlock, or NULL on failure with terrno set
// (TSDB_CODE_INVALID_PARA when the block's uid is not in the table map, or the load error).
static SArray* doRetrieveDataBlock(STsdbReader* pReader) {
  SReaderStatus* pStatus = &pReader->status;

  if (pStatus->composedDataBlock) {
    return pReader->pResBlock->pDataBlock;
  }

  SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(&pStatus->blockIter);

  // The map stores pointers to scan info objects; check the lookup result itself before
  // dereferencing it, so an unknown uid reaches the error branch instead of crashing.
  STableBlockScanInfo** ppBlockScanInfo =
      (STableBlockScanInfo**)taosHashGet(pStatus->pTableMap, &pBlockInfo->uid, sizeof(pBlockInfo->uid));
  if (ppBlockScanInfo == NULL || *ppBlockScanInfo == NULL) {
    terrno = TSDB_CODE_INVALID_PARA;
    tsdbError("failed to locate the uid:%" PRIu64 " in query table uid list, total tables:%d, %s", pBlockInfo->uid,
              taosHashGetSize(pReader->status.pTableMap), pReader->idStr);
    return NULL;
  }

  STableBlockScanInfo* pBlockScanInfo = *ppBlockScanInfo;

  int32_t code = doLoadFileBlockData(pReader, &pStatus->blockIter, &pStatus->fileBlockData, pBlockScanInfo->uid);
  if (code != TSDB_CODE_SUCCESS) {
    tBlockDataDestroy(&pStatus->fileBlockData, 1);
    terrno = code;
    return NULL;
  }

  copyBlockDataToSDataBlock(pReader, pBlockScanInfo);
  return pReader->pResBlock->pDataBlock;
}

4024 4025 4026 4027 4028 4029 4030 4031 4032 4033 4034 4035
// Return the column data of the current block, taken from whichever reader produced it.
//
// For an external-range reader in the previous-row or next-row phase the matching inner reader
// is used; in every other case the reader itself is used. pIdList is not used here.
SArray* tsdbRetrieveDataBlock(STsdbReader* pReader, SArray* pIdList) {
  STsdbReader* pTarget = pReader;

  if (pReader->type == TIMEWINDOW_RANGE_EXTERNAL) {
    if (pReader->step == EXTERNAL_ROWS_PREV) {
      pTarget = pReader->innerReader[0];
    } else if (pReader->step == EXTERNAL_ROWS_NEXT) {
      pTarget = pReader->innerReader[1];
    }
  }

  return doRetrieveDataBlock(pTarget);
}

H
Haojun Liao 已提交
4036
// Rewind an open reader so that it can be scanned again with the order and time window of pCond.
//
// Nothing is done (and success is returned) when the reader's current window is empty or no
// read snapshot has been taken. Otherwise the scan state, the fileset iterator, the block
// iterator and every table's scan info are reset, and the first file block is positioned.
//
// Returns TSDB_CODE_SUCCESS, or the error reported by initForFirstBlockInFile.
int32_t tsdbReaderReset(STsdbReader* pReader, SQueryTableDataCond* pCond) {
  if (isEmptyQueryTimeWindow(&pReader->window) || pReader->pReadSnap == NULL) {
    return TSDB_CODE_SUCCESS;
  }

  SReaderStatus*  pStatus = &pReader->status;
  SDataBlockIter* pIter = &pStatus->blockIter;

  pReader->order = pCond->order;
  pReader->type = TIMEWINDOW_RANGE_CONTAINED;
  pStatus->loadFromFile = true;
  pStatus->pTableIter = NULL;
  pReader->window = updateQueryTimeWindow(pReader->pTsdb, &pCond->twindows);

  // clear the timestamp aggregate and the first slot of the aggregate list
  memset(&pReader->suppInfo.tsColAgg, 0, sizeof(SColumnDataAgg));
  memset(pReader->suppInfo.plist, 0, POINTER_BYTES);

  pReader->suppInfo.tsColAgg.colId = PRIMARYKEY_TIMESTAMP_COL_ID;
  tsdbDataFReaderClose(&pReader->pFileReader);

  int32_t numOfTables = taosHashGetSize(pStatus->pTableMap);

  initFilesetIterator(&pStatus->fileIter, pReader->pReadSnap->fs.aDFileSet, pReader);
  resetDataBlockIterator(pIter, pReader->order);

  // every table restarts just outside the window, on the side the scan begins from
  int64_t startKey;
  if (ASCENDING_TRAVERSE(pReader->order)) {
    startKey = pReader->window.skey - 1;
  } else {
    startKey = pReader->window.ekey + 1;
  }
  resetAllDataBlockScanInfo(pStatus->pTableMap, startKey);

  int32_t code = 0;

  if (pStatus->fileIter.numOfFiles == 0) {
    // no data in files, let's try buffer in memory
    pStatus->loadFromFile = false;
  } else {
    code = initForFirstBlockInFile(pReader, pIter);
    if (code != TSDB_CODE_SUCCESS) {
      tsdbError("%p reset reader failed, numOfTables:%d, query range:%" PRId64 " - %" PRId64 " in query %s", pReader,
                numOfTables, pReader->window.skey, pReader->window.ekey, pReader->idStr);
      return code;
    }
  }

  tsdbDebug("%p reset reader, suid:%" PRIu64 ", numOfTables:%d, skey:%" PRId64 ", query range:%" PRId64 " - %" PRId64
            " in query %s",
            pReader, pReader->suid, numOfTables, pCond->twindows.skey, pReader->window.skey, pReader->window.ekey,
            pReader->idStr);

  return code;
}
H
Hongze Cheng 已提交
4085

4086 4087 4088
// Map a row count to the index of its histogram bucket.
//
// startRow:    row count at which bucket 0 begins
// bucketRange: width of one bucket, in rows
// numOfRows:   row count to classify
//
// Returns (numOfRows - startRow) / bucketRange. Row counts at or below startRow, as well as a
// non-positive bucketRange, map to bucket 0, so the result is never negative and the division
// never has a zero divisor. The upper bound is not limited here.
static int32_t getBucketIndex(int32_t startRow, int32_t bucketRange, int32_t numOfRows) {
  if (bucketRange <= 0 || numOfRows <= startRow) {
    return 0;
  }

  return (numOfRows - startRow) / bucketRange;
}
H
Hongze Cheng 已提交
4089

4090 4091 4092 4093
// Walk every file block visible to the reader and accumulate block distribution statistics
// (row totals, min/max rows per block, small-block count, row-count histogram) into
// pTableBlockInfo.
//
// totalSize and totalRows are reset here; the other counters are accumulated on top of the
// values already present in pTableBlockInfo.
//
// Returns TSDB_CODE_SUCCESS, or the error reported by initForFirstBlockInFile.
int32_t tsdbGetFileBlocksDistInfo(STsdbReader* pReader, STableBlockDistInfo* pTableBlockInfo) {
  int32_t code = TSDB_CODE_SUCCESS;
  pTableBlockInfo->totalSize = 0;
  pTableBlockInfo->totalRows = 0;

  // find the start data block in file
  SReaderStatus* pStatus = &pReader->status;

  STsdbCfg* pc = &pReader->pTsdb->pVnode->config.tsdbCfg;
  pTableBlockInfo->defMinRows = pc->minRows;
  pTableBlockInfo->defMaxRows = pc->maxRows;

  // The [minRows, maxRows] span is split into numOfBuckets histogram buckets.
  // NOTE(review): assumes blockRowsHisto has numOfBuckets (20) slots, matching the 20-way
  // split below - confirm against the STableBlockDistInfo declaration.
  const int32_t numOfBuckets = 20;

  int32_t bucketRange = ceil((pc->maxRows - pc->minRows) / (double)numOfBuckets);
  if (bucketRange < 1) {
    // maxRows <= minRows: keep the divisor valid
    bucketRange = 1;
  }

  pTableBlockInfo->numOfFiles += 1;

  int32_t numOfTables = (int32_t)taosHashGetSize(pStatus->pTableMap);
  int     defaultRows = 4096;

  SDataBlockIter* pBlockIter = &pStatus->blockIter;
  pTableBlockInfo->numOfFiles += pStatus->fileIter.numOfFiles;

  if (pBlockIter->numOfBlocks > 0) {
    pTableBlockInfo->numOfBlocks += pBlockIter->numOfBlocks;
  }

  pTableBlockInfo->numOfTables = numOfTables;
  bool hasNext = (pBlockIter->numOfBlocks > 0);

  while (true) {
    if (hasNext) {
      SDataBlk* pBlock = getCurrentBlock(pBlockIter);

      int32_t numOfRows = pBlock->nRow;
      pTableBlockInfo->totalRows += numOfRows;

      if (numOfRows > pTableBlockInfo->maxRows) {
        pTableBlockInfo->maxRows = numOfRows;
      }

      if (numOfRows < pTableBlockInfo->minRows) {
        pTableBlockInfo->minRows = numOfRows;
      }

      if (numOfRows < defaultRows) {
        pTableBlockInfo->numOfSmallBlocks += 1;
      }

      // A block may hold fewer rows than minRows or as many as maxRows (or more); clamp the
      // index so those blocks land in the first/last bucket instead of outside the histogram.
      int32_t bucketIndex = getBucketIndex(pTableBlockInfo->defMinRows, bucketRange, numOfRows);
      if (bucketIndex < 0) {
        bucketIndex = 0;
      } else if (bucketIndex >= numOfBuckets) {
        bucketIndex = numOfBuckets - 1;
      }
      pTableBlockInfo->blockRowsHisto[bucketIndex]++;

      hasNext = blockIteratorNext(&pStatus->blockIter, pReader->idStr);
    } else {
      // current file set is exhausted, move on to the next one
      code = initForFirstBlockInFile(pReader, pBlockIter);
      if ((code != TSDB_CODE_SUCCESS) || (pReader->status.loadFromFile == false)) {
        break;
      }

      pTableBlockInfo->numOfBlocks += pBlockIter->numOfBlocks;
      hasNext = (pBlockIter->numOfBlocks > 0);
    }
  }

  return code;
}
H
Hongze Cheng 已提交
4158

H
refact  
Hongze Cheng 已提交
4159
// Count the rows that the tables of this reader hold in the mem and imem tables of the
// reader's read snapshot.
//
// The reader's table iterator (status.pTableIter) is used as the cursor and is left at the end
// of the table map when the function returns.
//
// Returns the total number of rows over all tables in the table map.
int64_t tsdbGetNumOfRowsInMemTable(STsdbReader* pReader) {
  int64_t rows = 0;

  SReaderStatus* pStatus = &pReader->status;
  STsdbReadSnap* pSnap = pReader->pReadSnap;

  pStatus->pTableIter = taosHashIterate(pStatus->pTableMap, NULL);

  while (pStatus->pTableIter != NULL) {
    STableBlockScanInfo* pBlockScanInfo = *(STableBlockScanInfo**)pStatus->pTableIter;

    // The rows are read from the snapshot's memtables, so the snapshot's own pointers decide
    // whether there is anything to read; the live pTsdb->mem/imem may differ from them.
    if (pSnap != NULL && pSnap->pMem != NULL) {
      STbData* d = tsdbGetTbDataFromMemTable(pSnap->pMem, pReader->suid, pBlockScanInfo->uid);
      if (d != NULL) {
        rows += tsdbGetNRowsInTbData(d);
      }
    }

    if (pSnap != NULL && pSnap->pIMem != NULL) {
      STbData* di = tsdbGetTbDataFromMemTable(pSnap->pIMem, pReader->suid, pBlockScanInfo->uid);
      if (di != NULL) {
        rows += tsdbGetNRowsInTbData(di);
      }
    }

    // current table is exhausted, let's try the next table
    pStatus->pTableIter = taosHashIterate(pStatus->pTableMap, pStatus->pTableIter);
  }

  return rows;
}
D
dapan1121 已提交
4190

L
Liu Jicong 已提交
4191
// Look up the schema of the table identified by uid.
//
// pSchema: receives the result of metaGetTbTSchema for the table's current schema version.
// suid:    receives the super table uid when the table is a child table, 0 otherwise. It is
//          left untouched when the first meta lookup fails.
//
// Returns TSDB_CODE_SUCCESS, or terrno (set to TSDB_CODE_TDB_INVALID_TABLE_ID) when the table
// or its super table cannot be found in the meta store.
int32_t tsdbGetTableSchema(SVnode* pVnode, int64_t uid, STSchema** pSchema, int64_t* suid) {
  int32_t     schemaVer = 1;
  SMetaReader reader = {0};

  metaReaderInit(&reader, pVnode->pMeta, 0);

  if (metaGetTableEntryByUid(&reader, uid) != TSDB_CODE_SUCCESS) {
    goto _invalid_table;
  }

  *suid = 0;

  if (reader.me.type != TSDB_CHILD_TABLE) {
    ASSERT(reader.me.type == TSDB_NORMAL_TABLE);
    schemaVer = reader.me.ntbEntry.schemaRow.version;
  } else {
    // a child table uses the schema of its super table: look that entry up next
    tDecoderClear(&reader.coder);
    *suid = reader.me.ctbEntry.suid;

    if (metaGetTableEntryByUid(&reader, *suid) != TSDB_CODE_SUCCESS) {
      goto _invalid_table;
    }

    schemaVer = reader.me.stbEntry.schemaRow.version;
  }

  metaReaderClear(&reader);
  *pSchema = metaGetTbTSchema(pVnode->pMeta, uid, schemaVer, 1);

  return TSDB_CODE_SUCCESS;

_invalid_table:
  terrno = TSDB_CODE_TDB_INVALID_TABLE_ID;
  metaReaderClear(&reader);
  return terrno;
}
H
Hongze Cheng 已提交
4225

H
Haojun Liao 已提交
4226
// Take a read snapshot of the tsdb: reference the current mem/imem tables and the file system
// state while holding the tsdb read lock.
//
// ppSnap: receives the new snapshot on success; the caller releases it with
//         tsdbUntakeReadSnap. On failure it is set to NULL and nothing is left for the caller
//         to release.
// idStr:  identifier used in the trace output.
//
// Returns 0 on success, TSDB_CODE_OUT_OF_MEMORY when the snapshot cannot be allocated, a system
// error code when locking/unlocking fails, or the error reported by tsdbFSRef.
int32_t tsdbTakeReadSnap(STsdb* pTsdb, STsdbReadSnap** ppSnap, const char* idStr) {
  int32_t code = 0;

  // alloc
  STsdbReadSnap* pSnap = (STsdbReadSnap*)taosMemoryCalloc(1, sizeof(STsdbReadSnap));
  if (pSnap == NULL) {
    *ppSnap = NULL;
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  // lock
  code = taosThreadRwlockRdlock(&pTsdb->rwLock);
  if (code) {
    code = TAOS_SYSTEM_ERROR(code);
    goto _err;
  }

  // take snapshot
  pSnap->pMem = pTsdb->mem;
  pSnap->pIMem = pTsdb->imem;

  if (pSnap->pMem) {
    tsdbRefMemTable(pSnap->pMem);
  }

  if (pSnap->pIMem) {
    tsdbRefMemTable(pSnap->pIMem);
  }

  // fs
  code = tsdbFSRef(pTsdb, &pSnap->fs);
  if (code) {
    taosThreadRwlockUnlock(&pTsdb->rwLock);
    goto _err;
  }

  // unlock
  code = taosThreadRwlockUnlock(&pTsdb->rwLock);
  if (code) {
    code = TAOS_SYSTEM_ERROR(code);
    // the file system reference was taken successfully, give it back as well
    tsdbFSUnref(pTsdb, &pSnap->fs);
    goto _err;
  }

  *ppSnap = pSnap;
  tsdbTrace("vgId:%d, take read snapshot, %s", TD_VID(pTsdb->pVnode), idStr);
  return code;

_err:
  // Undo whatever was acquired so far. The memtable pointers are still NULL (zeroed
  // allocation) when the failure happened before they were referenced.
  if (pSnap->pMem) {
    tsdbUnrefMemTable(pSnap->pMem);
  }

  if (pSnap->pIMem) {
    tsdbUnrefMemTable(pSnap->pIMem);
  }

  taosMemoryFree(pSnap);
  *ppSnap = NULL;
  return code;
}

H
Haojun Liao 已提交
4274
// Release a read snapshot: drop the references on its mem/imem tables and on the file system
// state, then free the snapshot object. A NULL snapshot is accepted; the trace line is written
// in either case.
void tsdbUntakeReadSnap(STsdb* pTsdb, STsdbReadSnap* pSnap, const char* idStr) {
  if (pSnap != NULL) {
    // memtables are optional parts of a snapshot
    if (pSnap->pMem != NULL) {
      tsdbUnrefMemTable(pSnap->pMem);
    }
    if (pSnap->pIMem != NULL) {
      tsdbUnrefMemTable(pSnap->pIMem);
    }

    tsdbFSUnref(pTsdb, &pSnap->fs);
    taosMemoryFree(pSnap);
  }

  tsdbTrace("vgId:%d, untake read snapshot, %s", TD_VID(pTsdb->pVnode), idStr);
}