tsdbRead.c 141.2 KB
Newer Older
H
hjxilinx 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

H
Haojun Liao 已提交
16
#include "osDef.h"
H
Hongze Cheng 已提交
17
#include "tsdb.h"
18

H
Hongze Cheng 已提交
19
// True when the traversal order `o` is ascending. The argument is parenthesized so that a compound
// expression (for example a conditional) is compared as a whole instead of binding to `==` first.
#define ASCENDING_TRAVERSE(o) ((o) == TSDB_ORDER_ASC)
H
Hongze Cheng 已提交
20

21 22 23 24 25 26
// Row-content selector constants. The values are consecutive (1, 2, 3), not independent bit flags:
// EXTERNAL_ROWS_NEXT equals EXTERNAL_ROWS_PREV | EXTERNAL_ROWS_MAIN, so they must be compared with ==.
// NOTE(review): presumably stored in STsdbReader::type -- confirm against the code that consumes them.
typedef enum {
  EXTERNAL_ROWS_PREV = 0x1,
  EXTERNAL_ROWS_MAIN = 0x2,
  EXTERNAL_ROWS_NEXT = 0x3,
} EContentData;

27
typedef struct {
dengyihao's avatar
dengyihao 已提交
28
  STbDataIter* iter;
29 30 31 32
  int32_t      index;
  bool         hasVal;
} SIterInfo;

33 34
// Counters filled in by doLoadFileBlock() for the file set being loaded.
typedef struct {
  int32_t numOfBlocks;     // data blocks that passed the time-range and version-range checks
  int32_t numOfLastFiles;  // copied from the file set's nSttF
} SBlockNumber;

38
typedef struct SBlockIndex {
39 40 41
  int32_t     ordinalIndex;
  int64_t     inFileOffset;
  STimeWindow window;
42 43
} SBlockIndex;

H
Haojun Liao 已提交
44
typedef struct STableBlockScanInfo {
dengyihao's avatar
dengyihao 已提交
45 46
  uint64_t  uid;
  TSKEY     lastKey;
H
Hongze Cheng 已提交
47
  SMapData  mapData;            // block info (compressed)
48
  SArray*   pBlockList;         // block data index list, SArray<SBlockIndex>
H
Hongze Cheng 已提交
49 50 51 52 53 54
  SIterInfo iter;               // mem buffer skip list iterator
  SIterInfo iiter;              // imem buffer skip list iterator
  SArray*   delSkyline;         // delete info for this table
  int32_t   fileDelIndex;       // file block delete index
  int32_t   lastBlockDelIndex;  // delete index for last block
  bool      iterInit;           // whether to initialize the in-memory skip list iterator or not
H
Haojun Liao 已提交
55 56 57
} STableBlockScanInfo;

// Pairs a table uid with an offset.
// NOTE(review): presumably the sort key used when ordering data blocks by file offset -- confirm.
typedef struct SBlockOrderWrapper {
  int64_t uid;
  int64_t offset;
} SBlockOrderWrapper;
H
Hongze Cheng 已提交
61 62

typedef struct SBlockOrderSupporter {
63 64 65 66
  SBlockOrderWrapper** pDataBlockInfo;
  int32_t*             indexPerTable;
  int32_t*             numOfBlocksPerTable;
  int32_t              numOfTables;
H
Hongze Cheng 已提交
67 68 69
} SBlockOrderSupporter;

// Accumulated I/O statistics of one reader. Time fields updated in this file are in milliseconds.
typedef struct SIOCostSummary {
  int64_t numOfBlocks;  // data blocks plus last files, accumulated in doLoadFileBlock()
  double  blockLoadTime;
  double  buildmemBlock;
  int64_t headFileLoad;      // incremented once per opened file set
  double  headFileLoadTime;  // accumulated in doLoadBlockIndex() and doLoadFileBlock()
  int64_t smaDataLoad;
  double  smaLoadTime;
  int64_t lastBlockLoad;      // filled by getLastBlockLoadInfo()
  double  lastBlockLoadTime;  // filled by getLastBlockLoadInfo()
  int64_t composedBlocks;
  double  buildComposedBlockTime;
  double  createScanInfoList;  // elapsed time of createDataBlockScanInfo()
} SIOCostSummary;

typedef struct SBlockLoadSuppInfo {
85
  SArray*          pColAgg;
86
  SColumnDataAgg   tsColAgg;
C
Cary Xu 已提交
87
  SColumnDataAgg** plist;
88
  int16_t*         colIds;  // column ids for loading file block data
89
  int32_t          numOfCols;
90
  char**           buildBuf;  // build string tmp buffer, todo remove it later after all string format being updated.
H
Hongze Cheng 已提交
91 92
} SBlockLoadSuppInfo;

93
typedef struct SLastBlockReader {
H
Hongze Cheng 已提交
94 95 96 97 98
  STimeWindow        window;
  SVersionRange      verRange;
  int32_t            order;
  uint64_t           uid;
  SMergeTree         mergeTree;
99
  SSttBlockLoadInfo* pInfo;
100 101
} SLastBlockReader;

102
typedef struct SFilesetIter {
H
Hongze Cheng 已提交
103 104 105
  int32_t           numOfFiles;  // number of total files
  int32_t           index;       // current accessed index in the list
  SArray*           pFileList;   // data file list
106
  int32_t           order;
H
Hongze Cheng 已提交
107
  SLastBlockReader* pLastBlockReader;  // last file block reader
108
} SFilesetIter;
H
Haojun Liao 已提交
109 110

// Identifies one file data block by the table it belongs to and its index within that table.
typedef struct SFileDataBlockInfo {
  // index position in STableBlockScanInfo in order to check whether neighbor block overlaps with it
  uint64_t uid;
  int32_t  tbBlockIdx;
} SFileDataBlockInfo;

typedef struct SDataBlockIter {
117
  int32_t   numOfBlocks;
118
  int32_t   index;
H
Hongze Cheng 已提交
119
  SArray*   blockList;  // SArray<SFileDataBlockInfo>
120
  int32_t   order;
121
  SDataBlk  block;  // current SDataBlk data
122
  SHashObj* pTableMap;
H
Haojun Liao 已提交
123 124 125
} SDataBlockIter;

// Progress of copying the rows of the current file block into the result block.
typedef struct SFileBlockDumpInfo {
  int32_t totalRows;
  int32_t rowIndex;
  int64_t lastKey;    // set by setBlockAllDumped() to one step past the given key
  bool    allDumped;  // set to true by setBlockAllDumped()
} SFileBlockDumpInfo;

132
// Cursor over the table uids in ascending uid order.
typedef struct SUidOrderCheckInfo {
  uint64_t* tableUidList;  // access table uid list in uid ascending order list
  int32_t   currentIndex;  // index in table uid list
} SUidOrderCheckInfo;

H
Haojun Liao 已提交
137
typedef struct SReaderStatus {
138
  bool                 loadFromFile;       // check file stage
139
  bool                 composedDataBlock;  // the returned data block is a composed block or not
140
  SHashObj*            pTableMap;          // SHash<STableBlockScanInfo>
141
  STableBlockScanInfo** pTableIter;         // table iterator used in building in-memory buffer data blocks.
142
  SUidOrderCheckInfo   uidCheckInfo;       // check all table in uid order
143
  SFileBlockDumpInfo   fBlockDumpInfo;
144 145 146 147
  SDFileSet*           pCurrentFileset;  // current opened file set
  SBlockData           fileBlockData;
  SFilesetIter         fileIter;
  SDataBlockIter       blockIter;
H
Haojun Liao 已提交
148 149
} SReaderStatus;

150 151 152 153 154 155
// Bucketed storage for STableBlockScanInfo objects; the buckets are allocated by initBlockScanInfoBuf()
// and looked up by getPosInBlockInfoBuf().
typedef struct SBlockInfoBuf {
  int32_t  currentIndex;
  SArray*  pData;         // array of bucket pointers
  int32_t  numPerBucket;  // number of scan-info objects in a full bucket; set in tsdbReaderCreate()
} SBlockInfoBuf;

H
Hongze Cheng 已提交
156
struct STsdbReader {
H
Haojun Liao 已提交
157 158 159 160 161 162 163
  STsdb*             pTsdb;
  uint64_t           suid;
  int16_t            order;
  STimeWindow        window;  // the primary query time window that applies to all queries
  SSDataBlock*       pResBlock;
  int32_t            capacity;
  SReaderStatus      status;
164 165
  char*              idStr;  // query info handle, for debug purpose
  int32_t            type;   // query type: 1. retrieve all data blocks, 2. retrieve direct prev|next rows
H
Hongze Cheng 已提交
166
  SBlockLoadSuppInfo suppInfo;
H
Hongze Cheng 已提交
167
  STsdbReadSnap*     pReadSnap;
168
  SIOCostSummary     cost;
169 170
  STSchema*          pSchema;     // the newest version schema
  STSchema*          pMemSchema;  // the previous schema for in-memory data, to avoid load schema too many times
171 172
  SDataFReader*      pFileReader;
  SVersionRange      verRange;
173 174 175
  SBlockInfoBuf      blockInfoBuf;
  int32_t            step;
  STsdbReader*       innerReader[2];
H
Hongze Cheng 已提交
176
};
H
Hongze Cheng 已提交
177

H
Haojun Liao 已提交
178
static SFileDataBlockInfo* getCurrentBlockInfo(SDataBlockIter* pBlockIter);
179 180
static int      buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, int64_t endKey, int32_t capacity,
                                          STsdbReader* pReader);
181
static TSDBROW* getValidMemRow(SIterInfo* pIter, const SArray* pDelList, STsdbReader* pReader);
182 183
static int32_t  doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pScanInfo, STsdbReader* pReader,
                                        SRowMerger* pMerger);
H
Hongze Cheng 已提交
184
static int32_t  doMergeRowsInLastBlock(SLastBlockReader* pLastBlockReader, STableBlockScanInfo* pScanInfo, int64_t ts,
185
                                       SRowMerger* pMerger, SVersionRange* pVerRange);
186
static int32_t  doMergeRowsInBuf(SIterInfo* pIter, uint64_t uid, int64_t ts, SArray* pDelList, SRowMerger* pMerger,
dengyihao's avatar
dengyihao 已提交
187
                                 STsdbReader* pReader);
H
Haojun Liao 已提交
188
static int32_t  doAppendRowFromTSRow(SSDataBlock* pBlock, STsdbReader* pReader, STSRow* pTSRow, uint64_t uid);
189
static int32_t  doAppendRowFromFileBlock(SSDataBlock* pResBlock, STsdbReader* pReader, SBlockData* pBlockData,
H
Hongze Cheng 已提交
190
                                         int32_t rowIndex);
191
static void     setComposedBlockFlag(STsdbReader* pReader, bool composed);
192
static bool hasBeenDropped(const SArray* pDelList, int32_t* index, TSDBKEY* pKey, int32_t order, SVersionRange* pRange);
193

H
Hongze Cheng 已提交
194 195 196 197
static int32_t doMergeMemTableMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter, SArray* pDelList,
                                        STSRow** pTSRow, STsdbReader* pReader, bool* freeTSRow);
static int32_t doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo* pBlockScanInfo,
                                  STsdbReader* pReader, STSRow** pTSRow);
198 199
static int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pBlockScanInfo, int64_t key,
                                     STsdbReader* pReader);
200

dengyihao's avatar
dengyihao 已提交
201 202 203 204
static int32_t initDelSkylineIterator(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader, STbData* pMemTbData,
                                      STbData* piMemTbData);
static STsdb*  getTsdbByRetentions(SVnode* pVnode, TSKEY winSKey, SRetention* retentions, const char* idstr,
                                   int8_t* pLevel);
205
static SVersionRange getQueryVerRange(SVnode* pVnode, SQueryTableDataCond* pCond, int8_t level);
H
Hongze Cheng 已提交
206 207 208
static int64_t       getCurrentKeyInLastBlock(SLastBlockReader* pLastBlockReader);
static bool          hasDataInLastBlock(SLastBlockReader* pLastBlockReader);
static int32_t       doBuildDataBlock(STsdbReader* pReader);
H
Haojun Liao 已提交
209
static TSDBKEY       getCurrentKeyInBuf(STableBlockScanInfo* pScanInfo, STsdbReader* pReader);
210 211
static bool          hasDataInFileBlock(const SBlockData* pBlockData, const SFileBlockDumpInfo* pDumpInfo);
static bool          hasDataInLastBlock(SLastBlockReader* pLastBlockReader);
H
Haojun Liao 已提交
212

213 214
// Returns true when ts lies outside the closed interval [pWindow->skey, pWindow->ekey].
static bool outOfTimeWindow(int64_t ts, STimeWindow* pWindow) {
  const bool afterEnd = (ts > pWindow->ekey);
  const bool beforeStart = (ts < pWindow->skey);
  return afterEnd || beforeStart;
}

215 216 217
// Records the column id of every column of pBlock in pReader->suppInfo and allocates one scratch buffer
// (buildBuf[i]) for each variable-length column.
//
// Returns TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY when any allocation fails. On failure every buffer
// allocated here is released and colIds/buildBuf/numOfCols are reset, so that no dangling pointer is left
// behind for a later cleanup to free a second time.
static int32_t setColumnIdSlotList(STsdbReader* pReader, SSDataBlock* pBlock) {
  SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo;

  size_t numOfCols = blockDataGetNumOfCols(pBlock);

  pSupInfo->numOfCols = numOfCols;
  pSupInfo->colIds = taosMemoryMalloc(numOfCols * sizeof(int16_t));
  pSupInfo->buildBuf = taosMemoryCalloc(numOfCols, POINTER_BYTES);
  if (pSupInfo->buildBuf == NULL || pSupInfo->colIds == NULL) {
    goto _oom;
  }

  for (size_t i = 0; i < numOfCols; ++i) {
    SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, i);
    pSupInfo->colIds[i] = pCol->info.colId;

    if (IS_VAR_DATA_TYPE(pCol->info.type)) {
      pSupInfo->buildBuf[i] = taosMemoryMalloc(pCol->info.bytes);
      if (pSupInfo->buildBuf[i] == NULL) {
        goto _oom;
      }
    }
  }

  return TSDB_CODE_SUCCESS;

_oom:
  // buildBuf is zero-initialised, so slots that were never allocated are NULL and safe to free
  if (pSupInfo->buildBuf != NULL) {
    for (size_t i = 0; i < numOfCols; ++i) {
      taosMemoryFree(pSupInfo->buildBuf[i]);
    }
  }

  taosMemoryFree(pSupInfo->colIds);
  taosMemoryFree(pSupInfo->buildBuf);
  pSupInfo->colIds = NULL;
  pSupInfo->buildBuf = NULL;
  pSupInfo->numOfCols = 0;
  return TSDB_CODE_OUT_OF_MEMORY;
}
H
Hongze Cheng 已提交
240

241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284
// Allocates zero-initialised storage for numOfTables STableBlockScanInfo objects, split into buckets of
// pBuf->numPerBucket objects plus one smaller trailing bucket for the remainder. The bucket pointers are
// appended to pBuf->pData, which is created on first use.
//
// Returns TSDB_CODE_SUCCESS or TSDB_CODE_OUT_OF_MEMORY. Buckets that were already appended before a failure
// stay in pBuf->pData and are released by clearBlockScanInfoBuf().
static int32_t initBlockScanInfoBuf(SBlockInfoBuf* pBuf, int32_t numOfTables) {
  int32_t num = numOfTables / pBuf->numPerBucket;
  int32_t remainder = numOfTables % pBuf->numPerBucket;

  if (pBuf->pData == NULL) {
    pBuf->pData = taosArrayInit(num + 1, POINTER_BYTES);
    if (pBuf->pData == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }
  }

  // the full buckets come first, followed by the remainder bucket (if any)
  int32_t numOfBuckets = num + ((remainder > 0) ? 1 : 0);
  for (int32_t i = 0; i < numOfBuckets; ++i) {
    int32_t numInBucket = (i < num) ? pBuf->numPerBucket : remainder;

    char* p = taosMemoryCalloc(numInBucket, sizeof(STableBlockScanInfo));
    if (p == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }

    // the bucket is not owned by the array until the push succeeds
    if (taosArrayPush(pBuf->pData, &p) == NULL) {
      taosMemoryFree(p);
      return TSDB_CODE_OUT_OF_MEMORY;
    }
  }

  return TSDB_CODE_SUCCESS;
}

// Releases every bucket held by pBuf and the bucket array itself. pBuf->pData is reset to the value returned
// by taosArrayDestroy() so that a later initBlockScanInfoBuf(), which tests pData against NULL, does not
// reuse freed memory.
static void clearBlockScanInfoBuf(SBlockInfoBuf* pBuf) {
  size_t num = taosArrayGetSize(pBuf->pData);
  for (size_t i = 0; i < num; ++i) {
    char** p = taosArrayGet(pBuf->pData, i);
    taosMemoryFree(*p);
  }

  pBuf->pData = taosArrayDestroy(pBuf->pData);
}

// Returns the address of the index-th STableBlockScanInfo slot: the bucket is selected by
// index / numPerBucket and the slot inside the bucket by index % numPerBucket.
static void* getPosInBlockInfoBuf(SBlockInfoBuf* pBuf, int32_t index) {
  char** ppBucket = taosArrayGet(pBuf->pData, index / pBuf->numPerBucket);
  char*  pBase = *ppBucket;
  return &pBase[(index % pBuf->numPerBucket) * sizeof(STableBlockScanInfo)];
}

// NOTE: speed up the whole processing by preparing the buffers for STableBlockScanInfo in batches
285
// Builds the uid -> STableBlockScanInfo* map for the numOfTables tables listed in idList. The scan-info
// objects themselves live in pTsdbReader->blockInfoBuf; the map stores only pointers to them.
//
// Each table's lastKey starts one tick before the window start (ascending scan) or one tick after the
// window end (descending scan), unless that would overflow int64.
//
// Returns the new map, or NULL on allocation failure. When the scan-info buffer cannot be allocated,
// terrno is set to the failure code.
static SHashObj* createDataBlockScanInfo(STsdbReader* pTsdbReader, const STableKeyInfo* idList, int32_t numOfTables) {
  // todo use simple hash instead, optimize the memory consumption
  SHashObj* pTableMap =
      taosHashInit(numOfTables, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
  if (pTableMap == NULL) {
    return NULL;
  }

  int64_t st = taosGetTimestampUs();

  // without this check a failed allocation would make the loop below touch buckets that do not exist
  int32_t code = initBlockScanInfoBuf(&pTsdbReader->blockInfoBuf, numOfTables);
  if (code != TSDB_CODE_SUCCESS) {
    terrno = code;
    taosHashCleanup(pTableMap);
    return NULL;
  }

  for (int32_t j = 0; j < numOfTables; ++j) {
    STableBlockScanInfo* pScanInfo = getPosInBlockInfoBuf(&pTsdbReader->blockInfoBuf, j);
    pScanInfo->uid = idList[j].uid;
    if (ASCENDING_TRAVERSE(pTsdbReader->order)) {
      int64_t skey = pTsdbReader->window.skey;
      pScanInfo->lastKey = (skey > INT64_MIN) ? (skey - 1) : skey;
    } else {
      int64_t ekey = pTsdbReader->window.ekey;
      pScanInfo->lastKey = (ekey < INT64_MAX) ? (ekey + 1) : ekey;
    }

    // the map is created with update disabled; the result of the put is intentionally not treated as fatal
    taosHashPut(pTableMap, &pScanInfo->uid, sizeof(uint64_t), &pScanInfo, POINTER_BYTES);

    tsdbTrace("%p check table uid:%" PRId64 " from lastKey:%" PRId64 " %s", pTsdbReader, pScanInfo->uid, pScanInfo->lastKey,
              pTsdbReader->idStr);
  }

  pTsdbReader->cost.createScanInfoList = (taosGetTimestampUs() - st) / 1000.0;
  tsdbDebug("%p create %d tables scan-info, size:%.2f Kb, elapsed time:%.2f ms, %s", pTsdbReader, numOfTables,
            (sizeof(STableBlockScanInfo) * numOfTables) / 1024.0, pTsdbReader->cost.createScanInfoList,
            pTsdbReader->idStr);

  return pTableMap;
}
H
Hongze Cheng 已提交
334

335 336
// Resets the in-memory scan state of every table in pTableMap and sets each table's lastKey to ts.
// Both memory iterators (iter and iiter) are destroyed and marked as having no value, matching
// clearBlockScanInfo(); resetting iterInit without destroying iiter would leak that iterator.
static void resetAllDataBlockScanInfo(SHashObj* pTableMap, int64_t ts) {
  STableBlockScanInfo** p = NULL;
  while ((p = taosHashIterate(pTableMap, p)) != NULL) {
    STableBlockScanInfo* pInfo = *p;

    pInfo->iterInit = false;
    pInfo->iter.hasVal = false;
    pInfo->iiter.hasVal = false;

    if (pInfo->iter.iter != NULL) {
      pInfo->iter.iter = tsdbTbDataIterDestroy(pInfo->iter.iter);
    }

    if (pInfo->iiter.iter != NULL) {
      pInfo->iiter.iter = tsdbTbDataIterDestroy(pInfo->iiter.iter);
    }

    pInfo->delSkyline = taosArrayDestroy(pInfo->delSkyline);
    pInfo->lastKey = ts;
  }
}

351 352 353
// Releases everything owned by one table's scan info: both memory iterators, the delete skyline,
// the block index list and the block map data.
static void clearBlockScanInfo(STableBlockScanInfo* p) {
  p->iterInit = false;
  p->iiter.hasVal = false;

  // mem iterator first, then imem iterator
  SIterInfo* memIters[2] = {&p->iter, &p->iiter};
  for (int32_t i = 0; i < 2; ++i) {
    if (memIters[i]->iter == NULL) {
      continue;
    }
    memIters[i]->iter = tsdbTbDataIterDestroy(memIters[i]->iter);
  }

  p->delSkyline = taosArrayDestroy(p->delSkyline);
  p->pBlockList = taosArrayDestroy(p->pBlockList);
  tMapDataClear(&p->mapData);
}
367

368
// Clears the scan info of every table stored in pTableMap, then destroys the map itself.
static void destroyAllBlockScanInfo(SHashObj* pTableMap) {
  STableBlockScanInfo** ppInfo = taosHashIterate(pTableMap, NULL);
  for (; ppInfo != NULL; ppInfo = taosHashIterate(pTableMap, ppInfo)) {
    clearBlockScanInfo(*ppInfo);
  }

  taosHashCleanup(pTableMap);
}

377
// A query window is empty when its start key lies after its end key. pWindow must not be NULL.
static bool isEmptyQueryTimeWindow(STimeWindow* pWindow) {
  ASSERT(pWindow != NULL);

  const bool startAfterEnd = (pWindow->ekey < pWindow->skey);
  return startAfterEnd;
}
H
Hongze Cheng 已提交
381

382 383 384
// Update the query time window according to the data time-to-live (TTL) information, so that expired data
// is never returned to the client, even if it is queried.
// Returns a copy of pWindow whose start key is raised to the earliest timestamp still covered by the
// keep2 setting of pTsdb; the window is returned unchanged when it already starts later than that.
static STimeWindow updateQueryTimeWindow(STsdb* pTsdb, STimeWindow* pWindow) {
  STsdbKeepCfg* pKeep = &pTsdb->keepCfg;

  int64_t current = taosGetTimestamp(pKeep->precision);
  // needs to add one tick
  int64_t earliestTs = current - (tsTickPerMin[pKeep->precision] * pKeep->keep2) + 1;

  STimeWindow result = *pWindow;
  result.skey = (result.skey < earliestTs) ? earliestTs : result.skey;
  return result;
}
H
Hongze Cheng 已提交
397

H
Haojun Liao 已提交
398
// Shrinks *capacity (a row count) so that capacity * rowLen stays within 2MB, where rowLen is the sum of the
// byte widths of all queried columns. *capacity is left untouched when the limit is not exceeded.
static void limitOutputBufferSize(const SQueryTableDataCond* pCond, int32_t* capacity) {
  // accumulate in 64 bits: the row length and the product below can exceed the int32 range
  int64_t rowLen = 0;
  for (int32_t i = 0; i < pCond->numOfCols; ++i) {
    rowLen += pCond->colList[i].bytes;
  }

  // make sure the output SSDataBlock size be less than 2MB.
  const int64_t TWOMB = 2 * 1024 * 1024;
  if ((int64_t)(*capacity) * rowLen > TWOMB) {
    // rowLen cannot be 0 here, otherwise the product above would be 0
    (*capacity) = (int32_t)(TWOMB / rowLen);
  }
}

// init file iterator
412
// Prepares pIter for walking the file sets in aDFileSet in the reader's order and (re)initialises the
// last block reader attached to the iterator.
//
// Returns TSDB_CODE_SUCCESS, TSDB_CODE_OUT_OF_MEMORY when the last block reader cannot be allocated,
// or terrno when the last block load info cannot be created.
static int32_t initFilesetIterator(SFilesetIter* pIter, SArray* aDFileSet, STsdbReader* pReader) {
  size_t total = taosArrayGetSize(aDFileSet);

  // start one position before the first file (ascending) or one past the last file (descending)
  if (ASCENDING_TRAVERSE(pReader->order)) {
    pIter->index = -1;
  } else {
    pIter->index = total;
  }

  pIter->order = pReader->order;
  pIter->pFileList = aDFileSet;
  pIter->numOfFiles = total;

  SLastBlockReader* pLast = pIter->pLastBlockReader;
  if (pLast == NULL) {
    pLast = taosMemoryCalloc(1, sizeof(struct SLastBlockReader));
    pIter->pLastBlockReader = pLast;
    if (pLast == NULL) {
      int32_t code = TSDB_CODE_OUT_OF_MEMORY;
      tsdbError("failed to prepare the last block iterator, since:%s %s", tstrerror(code), pReader->idStr);
      return code;
    }
  }

  pLast->order = pReader->order;
  pLast->window = pReader->window;
  pLast->verRange = pReader->verRange;

  pLast->uid = 0;
  tMergeTreeClose(&pLast->mergeTree);

  if (pLast->pInfo != NULL) {
    tsdbDebug("init fileset iterator, total files:%d %s", pIter->numOfFiles, pReader->idStr);
    return TSDB_CODE_SUCCESS;
  }

  // here we ignore the first column, which is always be the primary timestamp column
  pLast->pInfo =
      tCreateLastBlockLoadInfo(pReader->pSchema, &pReader->suppInfo.colIds[1], pReader->suppInfo.numOfCols - 1);
  if (pLast->pInfo == NULL) {
    tsdbDebug("init fileset iterator failed, code:%s %s", tstrerror(terrno), pReader->idStr);
    return terrno;
  }

  tsdbDebug("init fileset iterator, total files:%d %s", pIter->numOfFiles, pReader->idStr);
  return TSDB_CODE_SUCCESS;
}

451
// Advances pIter to the next file set that overlaps the reader's query window and opens it as
// pReader->pFileReader.
//
// Returns true when such a file set was opened. Returns false when the list is exhausted, when the
// remaining file sets can no longer overlap the window, or when a file set cannot be opened; in the last
// case terrno is set to the failure code so that the caller can tell an I/O error from a normal end.
static bool filesetIteratorNext(SFilesetIter* pIter, STsdbReader* pReader) {
  bool    asc = ASCENDING_TRAVERSE(pIter->order);
  int32_t step = asc ? 1 : -1;
  pIter->index += step;

  if ((asc && pIter->index >= pIter->numOfFiles) || ((!asc) && pIter->index < 0)) {
    return false;
  }

  // collect the statistics of the last block reader before it is reset for the next file set
  SIOCostSummary* pSum = &pReader->cost;
  getLastBlockLoadInfo(pIter->pLastBlockReader->pInfo, &pSum->lastBlockLoad, &pReader->cost.lastBlockLoadTime);

  pIter->pLastBlockReader->uid = 0;
  tMergeTreeClose(&pIter->pLastBlockReader->mergeTree);
  resetLastBlockLoadInfo(pIter->pLastBlockReader->pInfo);

  // check file the time range of coverage
  STimeWindow win = {0};

  while (1) {
    if (pReader->pFileReader != NULL) {
      tsdbDataFReaderClose(&pReader->pFileReader);
    }

    pReader->status.pCurrentFileset = (SDFileSet*)taosArrayGet(pIter->pFileList, pIter->index);

    int32_t code = tsdbDataFReaderOpen(&pReader->pFileReader, pReader->pTsdb, pReader->status.pCurrentFileset);
    if (code != TSDB_CODE_SUCCESS) {
      terrno = code;
      tsdbError("%p failed to open data file reader, code:%s %s", pReader, tstrerror(code), pReader->idStr);
      return false;
    }

    pReader->cost.headFileLoad += 1;

    int32_t fid = pReader->status.pCurrentFileset->fid;
    tsdbFidKeyRange(fid, pReader->pTsdb->keepCfg.days, pReader->pTsdb->keepCfg.precision, &win.skey, &win.ekey);

    // current file are no longer overlapped with query time window, ignore remain files
    if ((asc && win.skey > pReader->window.ekey) || (!asc && win.ekey < pReader->window.skey)) {
      tsdbDebug("%p remain files are not qualified for qrange:%" PRId64 "-%" PRId64 ", ignore, %s", pReader,
                pReader->window.skey, pReader->window.ekey, pReader->idStr);
      return false;
    }

    // this file set lies entirely before the window in scan direction, try the next one
    if ((asc && (win.ekey < pReader->window.skey)) || ((!asc) && (win.skey > pReader->window.ekey))) {
      pIter->index += step;
      if ((asc && pIter->index >= pIter->numOfFiles) || ((!asc) && pIter->index < 0)) {
        return false;
      }
      continue;
    }

    tsdbDebug("%p file found fid:%d for qrange:%" PRId64 "-%" PRId64 ", %s", pReader, fid, pReader->window.skey,
              pReader->window.ekey, pReader->idStr);
    return true;
  }
}

511
// Puts pIter back into its initial state for the given order: no blocks, index -1, and an empty
// (but allocated) block list.
static void resetDataBlockIterator(SDataBlockIter* pIter, int32_t order) {
  pIter->order = order;
  pIter->index = -1;
  pIter->numOfBlocks = 0;

  if (pIter->blockList != NULL) {
    // keep the existing array and only drop its elements
    taosArrayClear(pIter->blockList);
    return;
  }

  pIter->blockList = taosArrayInit(4, sizeof(SFileDataBlockInfo));
}

L
Liu Jicong 已提交
522
// Destroys the block list of pIter. The pointer is reset to the value returned by taosArrayDestroy() so
// that resetDataBlockIterator(), which tests blockList against NULL, never reuses a freed array.
static void cleanupDataBlockIterator(SDataBlockIter* pIter) {
  pIter->blockList = taosArrayDestroy(pIter->blockList);
}
H
Haojun Liao 已提交
523

H
Haojun Liao 已提交
524
// Initial reader status: the reader starts in the file stage and has no table iterator yet.
static void initReaderStatus(SReaderStatus* pStatus) {
  pStatus->loadFromFile = true;
  pStatus->pTableIter = NULL;
}

529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551
// Creates the result block with one column per entry of pCond->colList and room for `capacity` rows.
//
// Returns the new block, or NULL with terrno set when the block cannot be created or its capacity cannot
// be reserved.
static SSDataBlock* createResBlock(SQueryTableDataCond* pCond, int32_t capacity) {
  SSDataBlock* pResBlock = createDataBlock();
  if (pResBlock == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  for (int32_t i = 0; i < pCond->numOfCols; ++i) {
    SColumnInfoData colInfo = {{0}, 0};
    colInfo.info = pCond->colList[i];
    blockDataAppendColInfo(pResBlock, &colInfo);
  }

  int32_t code = blockDataEnsureCapacity(pResBlock, capacity);
  if (code != TSDB_CODE_SUCCESS) {
    terrno = code;
    // the block already owns its column array; freeing only the block header would leak it
    blockDataDestroy(pResBlock);
    return NULL;
  }

  return pResBlock;
}

552 553
// Allocates and initialises a reader for the query condition pCond on pVnode.
//
// capacity is the requested row capacity of the result block; it is reduced by limitOutputBufferSize()
// when the resulting block would exceed the size limit. idstr (may be NULL) is duplicated for logging.
//
// On success *ppReader receives the reader and TSDB_CODE_SUCCESS is returned. On failure the partially
// built reader is released with tsdbReaderClose(), *ppReader is set to NULL and the error code is returned.
static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, STsdbReader** ppReader, int32_t capacity,
                                const char* idstr) {
  int32_t      code = 0;
  int8_t       level = 0;
  STsdbReader* pReader = (STsdbReader*)taosMemoryCalloc(1, sizeof(*pReader));
  if (pReader == NULL) {
    // nothing has been built yet, so there is nothing to close
    *ppReader = NULL;
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  if (VND_IS_TSMA(pVnode)) {
    tsdbDebug("vgId:%d, tsma is selected to query", TD_VID(pVnode));
  }

  initReaderStatus(&pReader->status);

  pReader->pTsdb = getTsdbByRetentions(pVnode, pCond->twindows.skey, pVnode->config.tsdbCfg.retentions, idstr, &level);
  pReader->suid = pCond->suid;
  pReader->order = pCond->order;
  pReader->capacity = capacity;
  pReader->idStr = (idstr != NULL) ? strdup(idstr) : NULL;
  pReader->verRange = getQueryVerRange(pVnode, pCond, level);
  pReader->type = pCond->type;
  pReader->window = updateQueryTimeWindow(pReader->pTsdb, &pCond->twindows);
  pReader->blockInfoBuf.numPerBucket = 1000; // 1000 tables per bucket
  ASSERT(pCond->numOfCols > 0);

  limitOutputBufferSize(pCond, &pReader->capacity);

  // allocate buffer in order to load data blocks from file
  SBlockLoadSuppInfo* pSup = &pReader->suppInfo;
  pSup->pColAgg = taosArrayInit(4, sizeof(SColumnDataAgg));
  pSup->plist = taosMemoryCalloc(pCond->numOfCols, POINTER_BYTES);
  if (pSup->pColAgg == NULL || pSup->plist == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _end;
  }

  pSup->tsColAgg.colId = PRIMARYKEY_TIMESTAMP_COL_ID;

  code = tBlockDataCreate(&pReader->status.fileBlockData);
  if (code != TSDB_CODE_SUCCESS) {
    terrno = code;
    goto _end;
  }

  pReader->pResBlock = createResBlock(pCond, pReader->capacity);
  if (pReader->pResBlock == NULL) {
    code = terrno;
    goto _end;
  }

  // the column id list and the string build buffers are required by every later load; do not continue
  // with a reader whose buffers could not be allocated
  code = setColumnIdSlotList(pReader, pReader->pResBlock);
  if (code != TSDB_CODE_SUCCESS) {
    goto _end;
  }

  *ppReader = pReader;
  return code;

_end:
  tsdbReaderClose(pReader);
  *ppReader = NULL;
  return code;
}
H
Hongze Cheng 已提交
614

H
Haojun Liao 已提交
615
// Reads the block index of the file set opened by pFileReader and appends to pIndexList every SBlockIdx
// that belongs to the queried super table and to a table present in pReader->status.pTableMap. The block
// list of each matching table is created on first use.
//
// Returns TSDB_CODE_SUCCESS, the error reported by tsdbReadBlockIdx(), or TSDB_CODE_OUT_OF_MEMORY.
static int32_t doLoadBlockIndex(STsdbReader* pReader, SDataFReader* pFileReader, SArray* pIndexList) {
  SArray* aBlockIdx = taosArrayInit(8, sizeof(SBlockIdx));
  if (aBlockIdx == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  int64_t st = taosGetTimestampUs();
  int32_t code = tsdbReadBlockIdx(pFileReader, aBlockIdx);
  if (code != TSDB_CODE_SUCCESS) {
    goto _end;
  }

  size_t num = taosArrayGetSize(aBlockIdx);
  if (num == 0) {
    goto _end;
  }

  int64_t et1 = taosGetTimestampUs();

  for (size_t i = 0; i < num; ++i) {
    SBlockIdx* pBlockIdx = (SBlockIdx*)taosArrayGet(aBlockIdx, i);

    // super table uid check
    if (pBlockIdx->suid != pReader->suid) {
      continue;
    }

    // this block belongs to a table that is not queried.
    void* p = taosHashGet(pReader->status.pTableMap, &pBlockIdx->uid, sizeof(uint64_t));
    if (p == NULL) {
      continue;
    }

    STableBlockScanInfo* pScanInfo = *(STableBlockScanInfo**)p;
    if (pScanInfo->pBlockList == NULL) {
      pScanInfo->pBlockList = taosArrayInit(4, sizeof(SBlockIndex));
      if (pScanInfo->pBlockList == NULL) {
        code = TSDB_CODE_OUT_OF_MEMORY;
        goto _end;
      }
    }

    if (taosArrayPush(pIndexList, pBlockIdx) == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto _end;
    }
  }

  int64_t et2 = taosGetTimestampUs();
  tsdbDebug("load block index for %d tables completed, elapsed time:%.2f ms, set blockIdx:%.2f ms, size:%.2f Kb %s",
            (int32_t)num, (et1 - st) / 1000.0, (et2 - et1) / 1000.0, num * sizeof(SBlockIdx) / 1024.0, pReader->idStr);

  pReader->cost.headFileLoadTime += (et1 - st) / 1000.0;

_end:
  taosArrayDestroy(aBlockIdx);
  return code;
}
H
Hongze Cheng 已提交
665

666
// Drops the per-file block information (block map data and block index list) of every table in
// pTableMap; called before the blocks of a new file set are loaded.
static void cleanupTableScanInfo(SHashObj* pTableMap) {
  STableBlockScanInfo** ppInfo = NULL;
  while ((ppInfo = taosHashIterate(pTableMap, ppInfo)) != NULL) {
    STableBlockScanInfo* pInfo = *ppInfo;

    // reset the index in last block when handing a new file
    tMapDataClear(&pInfo->mapData);
    taosArrayClear(pInfo->pBlockList);
  }
}

680
// Loads the block map of every table referenced by pIndexList from the current file set and records, in
// each table's pBlockList, the blocks that overlap both the query time window and the query version range.
//
// pBlockNum->numOfBlocks is increased by the number of recorded blocks and pBlockNum->numOfLastFiles is set
// to the number of stt files of the current file set.
//
// Returns TSDB_CODE_SUCCESS, the error reported by tsdbReadDataBlk(), or TSDB_CODE_OUT_OF_MEMORY.
static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, SBlockNumber* pBlockNum) {
  int32_t numOfQTable = 0;
  size_t  sizeInDisk = 0;
  size_t  numOfTables = taosArrayGetSize(pIndexList);

  int64_t st = taosGetTimestampUs();
  cleanupTableScanInfo(pReader->status.pTableMap);

  for (size_t i = 0; i < numOfTables; ++i) {
    SBlockIdx* pBlockIdx = taosArrayGet(pIndexList, i);

    STableBlockScanInfo* pScanInfo =
        *(STableBlockScanInfo**)taosHashGet(pReader->status.pTableMap, &pBlockIdx->uid, sizeof(int64_t));

    tMapDataReset(&pScanInfo->mapData);

    // a failed read must not be mistaken for a table without blocks
    int32_t code = tsdbReadDataBlk(pReader->pFileReader, pBlockIdx, &pScanInfo->mapData);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    sizeInDisk += pScanInfo->mapData.nData;
    for (int32_t j = 0; j < pScanInfo->mapData.nItem; ++j) {
      SDataBlk block = {0};
      tMapDataGetItemByIdx(&pScanInfo->mapData, j, &block, tGetDataBlk);

      // 1. time range check
      if (block.minKey.ts > pReader->window.ekey || block.maxKey.ts < pReader->window.skey) {
        continue;
      }

      // 2. version range check
      if (block.minVer > pReader->verRange.maxVer || block.maxVer < pReader->verRange.minVer) {
        continue;
      }

      SBlockIndex bIndex = {.ordinalIndex = j, .inFileOffset = block.aSubBlock->offset};
      bIndex.window = (STimeWindow) {.skey = block.minKey.ts, .ekey = block.maxKey.ts};

      void* p = taosArrayPush(pScanInfo->pBlockList, &bIndex);
      if (p == NULL) {
        tMapDataClear(&pScanInfo->mapData);
        return TSDB_CODE_OUT_OF_MEMORY;
      }

      pBlockNum->numOfBlocks += 1;
    }

    if (pScanInfo->pBlockList != NULL && taosArrayGetSize(pScanInfo->pBlockList) > 0) {
      numOfQTable += 1;
    }
  }

  pBlockNum->numOfLastFiles = pReader->pFileReader->pSet->nSttF;
  int32_t total = pBlockNum->numOfLastFiles + pBlockNum->numOfBlocks;

  double el = (taosGetTimestampUs() - st) / 1000.0;
  // the table count is cast to int32_t (as in doLoadBlockIndex) because "%ld" does not match size_t on
  // every platform; the size is reported in units of 1024 bytes like the other "Kb" figures in this file
  tsdbDebug(
      "load block of %d tables completed, blocks:%d in %d tables, last-files:%d, block-info-size:%.2f Kb, elapsed "
      "time:%.2f ms %s",
      (int32_t)numOfTables, pBlockNum->numOfBlocks, numOfQTable, pBlockNum->numOfLastFiles, sizeInDisk / 1024.0, el,
      pReader->idStr);

  pReader->cost.numOfBlocks += total;
  pReader->cost.headFileLoadTime += el;

  return TSDB_CODE_SUCCESS;
}
H
Hongze Cheng 已提交
744

745
/**
 * Flag the current file block as completely consumed.
 *
 * The recorded last key is placed one unit past maxKey in the scan direction:
 * maxKey + 1 for an ascending scan, maxKey - 1 otherwise.
 */
static void setBlockAllDumped(SFileBlockDumpInfo* pDumpInfo, int64_t maxKey, int32_t order) {
  pDumpInfo->allDumped = true;

  if (ASCENDING_TRAVERSE(order)) {
    pDumpInfo->lastKey = maxKey + 1;
  } else {
    pDumpInfo->lastKey = maxKey - 1;
  }
}

751 752
/**
 * Append a single column value to row rowIndex of the output column.
 *
 * Fixed-length values are appended directly, together with their null flag.
 * Variable-length values are first packed (length header + payload) into the
 * per-column scratch buffer pSup->buildBuf[colIndex] and appended from there.
 */
static void doCopyColVal(SColumnInfoData* pColInfoData, int32_t rowIndex, int32_t colIndex, SColVal* pColVal,
                         SBlockLoadSuppInfo* pSup) {
  if (!IS_VAR_DATA_TYPE(pColVal->type)) {
    colDataAppend(pColInfoData, rowIndex, (const char*)&pColVal->value, !COL_VAL_IS_VALUE(pColVal));
    return;
  }

  if (!COL_VAL_IS_VALUE(pColVal)) {
    colDataAppendNULL(pColInfoData, rowIndex);
    return;
  }

  // build the var-data representation in the scratch buffer, then hand it over
  varDataSetLen(pSup->buildBuf[colIndex], pColVal->value.nData);
  ASSERT(pColVal->value.nData <= pColInfoData->info.bytes);
  memcpy(varDataVal(pSup->buildBuf[colIndex]), pColVal->value.pData, pColVal->value.nData);
  colDataAppend(pColInfoData, rowIndex, pSup->buildBuf[colIndex], false);
}

767
/**
 * Return the block-list entry the iterator currently points at,
 * or NULL when the block list is empty.
 */
static SFileDataBlockInfo* getCurrentBlockInfo(SDataBlockIter* pBlockIter) {
  size_t numOfEntries = taosArrayGetSize(pBlockIter->blockList);
  if (numOfEntries != 0) {
    return taosArrayGet(pBlockIter->blockList, pBlockIter->index);
  }

  // an empty list must agree with the recorded block count
  ASSERT(pBlockIter->numOfBlocks == numOfEntries);
  return NULL;
}

H
Hongze Cheng 已提交
777
/** Return the address of the data block descriptor cached inside the iterator. */
static SDataBlk* getCurrentBlock(SDataBlockIter* pBlockIter) {
  SDataBlk* pCurrent = &pBlockIter->block;
  return pCurrent;
}
778

H
Haojun Liao 已提交
779 780 781 782 783 784 785 786 787 788 789 790 791 792 793 794 795 796 797 798 799 800 801 802 803 804 805 806 807 808 809 810 811 812 813 814 815 816 817 818 819 820 821 822 823 824 825 826 827 828 829 830 831 832 833 834 835 836 837 838 839 840 841 842 843 844 845 846 847 848 849 850 851
/**
 * Binary search for a timestamp in a sorted key list.
 *
 * @param pValue  key list, read as an array of TSKEY
 * @param num     number of keys in the list
 * @param key     timestamp to look for
 * @param order   TSDB_ORDER_ASC or TSDB_ORDER_DESC (asserted)
 * @return index of a key equal to `key` when one is hit; otherwise, for TSDB_ORDER_ASC the
 *         index of the first key greater than `key`, and for TSDB_ORDER_DESC the index of the
 *         first key smaller than `key`. Returns -1 when the list is empty or no such position
 *         exists.
 *
 * NOTE(review): the way the bounds are narrowed implies the list is sorted ascending for
 * TSDB_ORDER_ASC and descending for TSDB_ORDER_DESC -- confirm against the callers.
 */
int32_t binarySearchForTs(char* pValue, int num, TSKEY key, int order) {
  int32_t midPos = -1;
  int32_t numOfRows;

  ASSERT(order == TSDB_ORDER_ASC || order == TSDB_ORDER_DESC);

  // nothing to search; avoids reading keyList[0] of an empty list
  if (num <= 0) {
    return -1;
  }

  TSKEY*  keyList = (TSKEY*)pValue;
  int32_t firstPos = 0;
  int32_t lastPos = num - 1;

  if (order == TSDB_ORDER_DESC) {
    // find the first position whose key is not greater than the search key
    while (1) {
      if (key >= keyList[firstPos]) return firstPos;
      if (key == keyList[lastPos]) return lastPos;

      if (key < keyList[lastPos]) {
        // every key in [firstPos, lastPos] is greater; the answer, if any, is right after lastPos
        lastPos += 1;
        if (lastPos >= num) {
          return -1;
        } else {
          return lastPos;
        }
      }

      numOfRows = lastPos - firstPos + 1;
      midPos = (numOfRows >> 1) + firstPos;

      if (key < keyList[midPos]) {
        firstPos = midPos + 1;
      } else if (key > keyList[midPos]) {
        lastPos = midPos - 1;
      } else {
        break;
      }
    }

  } else {
    // find the first position whose key is not smaller than the search key
    while (1) {
      if (key <= keyList[firstPos]) return firstPos;
      if (key == keyList[lastPos]) return lastPos;

      if (key > keyList[lastPos]) {
        // every key in [firstPos, lastPos] is smaller; the answer, if any, is right after lastPos
        lastPos = lastPos + 1;
        if (lastPos >= num)
          return -1;
        else
          return lastPos;
      }

      numOfRows = lastPos - firstPos + 1;
      midPos = (numOfRows >> 1u) + firstPos;

      if (key < keyList[midPos]) {
        lastPos = midPos - 1;
      } else if (key > keyList[midPos]) {
        firstPos = midPos + 1;
      } else {
        break;
      }
    }
  }

  // exact match found at midPos
  return midPos;
}

/**
 * Binary search in keyList, anchored at position pos.
 *
 * TSDB_ORDER_ASC searches the range [pos, num - 1]; any other order searches [0, pos].
 * Returns -1 when key lies before keyList[pos] in the search direction, otherwise the
 * index of an exact match or of the closest entry on the anchor side of key.
 *
 * NOTE(review): the comparisons imply keyList is sorted ascending -- confirm with callers.
 */
static int doBinarySearchKey(TSKEY* keyList, int num, int pos, TSKEY key, int order) {
  // sanity check of the anchor position
  assert(pos >= 0 && pos < num);
  assert(num > 0);

  int near = pos;  // bound on the anchor side, this is what gets returned when no exact hit
  int far;         // bound on the opposite side

  if (order == TSDB_ORDER_ASC) {
    far = num - 1;
    if (key < keyList[pos]) {
      return -1;
    }

    for (;;) {
      // termination checks
      if (key >= keyList[far]) return far;
      if (key <= keyList[near]) return near;
      if (far - near <= 1) return near;

      // shrink the range towards the key
      int mid = near + (far - near + 1) / 2;
      if (keyList[mid] > key) {
        far = mid;
      } else if (keyList[mid] < key) {
        near = mid;
      } else {
        return mid;
      }
    }
  } else {  // DESC
    far = 0;
    if (key > keyList[pos]) {
      return -1;
    }

    for (;;) {
      // termination checks
      if (key <= keyList[far]) return far;
      if (key >= keyList[near]) return near;
      if (near - far <= 1) return near;

      // shrink the range towards the key
      int mid = near - (near - far + 1) / 2;
      if (keyList[mid] < key) {
        far = mid;
      } else if (keyList[mid] > key) {
        near = mid;
      } else {
        return mid;
      }
    }
  }
}

/**
 * Find the last row of the block that still belongs to the query window, scanning from pos
 * in the reader's order.
 *
 * When the window covers the block up to its far edge the edge row is returned directly
 * (nRow - 1 for ascending, 0 for descending). Otherwise the window bound is looked up with
 * doBinarySearchKey(), whose result (possibly -1) is returned unchanged.
 */
int32_t getEndPosInDataBlock(STsdbReader* pReader, SBlockData* pBlockData, SDataBlk* pBlock, int32_t pos) {
  if (ASCENDING_TRAVERSE(pReader->order)) {
    if (pReader->window.ekey >= pBlock->maxKey.ts) {
      return pBlock->nRow - 1;
    }

    return doBinarySearchKey(pBlockData->aTSKEY, pBlock->nRow, pos, pReader->window.ekey, pReader->order);
  }

  if (pReader->window.skey <= pBlock->minKey.ts) {
    return 0;
  }

  return doBinarySearchKey(pBlockData->aTSKEY, pBlock->nRow, pos, pReader->window.skey, pReader->order);
}

913
/**
 * Copy rows of the currently loaded file block into the reader's result block.
 *
 * Steps:
 *  1. when the dump cursor sits on the first row in scan order, move it to the first row
 *     inside the query window;
 *  2. determine how many rows can be emitted (limited by the window and pReader->capacity);
 *  3. fill every output column, using NULL for columns missing in the file block;
 *  4. advance the dump cursor and mark the block as fully dumped when nothing is left.
 *
 * Always returns TSDB_CODE_SUCCESS.
 */
static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo) {
  SDataBlockIter*     pBlockIter = &pReader->status.blockIter;
  SBlockData*         pBlockData = &pReader->status.fileBlockData;
  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
  SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo;

  SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(pBlockIter);
  SDataBlk*           pBlock = getCurrentBlock(pBlockIter);
  SSDataBlock*        pResBlock = pReader->pResBlock;
  int32_t             numOfOutputCols = blockDataGetNumOfCols(pResBlock);

  int64_t st = taosGetTimestampUs();
  bool    asc = ASCENDING_TRAVERSE(pReader->order);
  int32_t step = asc ? 1 : -1;

  // 1. position the cursor on the first qualified row if the block has not been touched yet
  bool atFirstRow = asc ? (pDumpInfo->rowIndex == 0) : (pDumpInfo->rowIndex == pBlock->nRow - 1);
  if (atFirstRow) {
    bool windowCoversFirstRow =
        asc ? (pReader->window.skey <= pBlock->minKey.ts) : (pReader->window.ekey >= pBlock->maxKey.ts);
    if (!windowCoversFirstRow) {
      // search against the scan direction, anchored at the opposite end of the block
      int32_t pos = asc ? pBlock->nRow - 1 : 0;
      int32_t order = asc ? TSDB_ORDER_DESC : TSDB_ORDER_ASC;
      int64_t key = asc ? pReader->window.skey : pReader->window.ekey;
      pDumpInfo->rowIndex = doBinarySearchKey(pBlockData->aTSKEY, pBlock->nRow, pos, key, order);
    }
  }

  // 2. time window check
  int32_t startRow = pDumpInfo->rowIndex;
  int32_t endIndex = getEndPosInDataBlock(pReader, pBlockData, pBlock, startRow);
  if (endIndex == -1) {
    setBlockAllDumped(pDumpInfo, pReader->window.ekey, pReader->order);
    return TSDB_CODE_SUCCESS;
  }

  endIndex += step;
  int32_t remain = asc ? (endIndex - startRow) : (startRow - endIndex);
  if (remain > pReader->capacity) {  // output buffer check
    remain = pReader->capacity;
  }

  // 3.a primary timestamp column, only if it is the first output column
  int32_t          outCol = 0;
  SColumnInfoData* pColData = taosArrayGet(pResBlock->pDataBlock, outCol);
  if (pColData->info.colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
    if (asc) {
      memcpy(pColData->pData, &pBlockData->aTSKEY[startRow], remain * sizeof(int64_t));
    } else {
      for (int32_t dst = 0, src = startRow; dst < remain; ++dst, src += step) {
        colDataAppendInt64(pColData, dst, &pBlockData->aTSKEY[src]);
      }
    }

    outCol += 1;
  }

  // 3.b remaining columns: merge-walk the output columns and the file-block columns by column id
  SColVal cv = {0};
  int32_t fileCol = 0;
  int32_t numOfFileCols = taosArrayGetSize(pBlockData->aIdx);
  while (outCol < numOfOutputCols && fileCol < numOfFileCols) {
    pColData = taosArrayGet(pResBlock->pDataBlock, outCol);
    SColData* pData = tBlockDataGetColDataByIdx(pBlockData, fileCol);

    if (pData->cid < pColData->info.colId) {
      // file column is not requested, skip it
      fileCol += 1;
      continue;
    }

    if (pData->cid != pColData->info.colId) {
      // the specified column does not exist in file block, fill with null data
      colDataAppendNNULL(pColData, 0, remain);
      outCol += 1;
      continue;
    }

    if (pData->flag == HAS_NONE || pData->flag == HAS_NULL || pData->flag == (HAS_NULL | HAS_NONE)) {
      colDataAppendNNULL(pColData, 0, remain);
    } else if (IS_NUMERIC_TYPE(pColData->info.type) && asc) {
      // bulk copy of the fixed-width values
      uint8_t* p = pData->pData + tDataTypes[pData->type].bytes * startRow;
      memcpy(pColData->pData, p, remain * tDataTypes[pData->type].bytes);

      // null value exists, check one-by-one
      if (pData->flag != HAS_VALUE) {
        for (int32_t dst = 0, src = startRow; dst < remain; ++dst, src += step) {
          uint8_t v = tColDataGetBitValue(pData, src);
          if (v == 0 || v == 1) {
            colDataSetNull_f(pColData->nullbitmap, dst);
          }
        }
      }
    } else {
      for (int32_t dst = 0, src = startRow; dst < remain; ++dst, src += step) {
        tColDataGetValue(pData, src, &cv);
        doCopyColVal(pColData, dst, outCol, &cv, pSupInfo);
      }
    }

    fileCol += 1;
    outCol += 1;
  }

  // 3.c fill the mis-matched columns with null value
  for (; outCol < numOfOutputCols; outCol += 1) {
    pColData = taosArrayGet(pResBlock->pDataBlock, outCol);
    colDataAppendNNULL(pColData, 0, remain);
  }

  // 4. advance the cursor
  pResBlock->info.rows = remain;
  pDumpInfo->rowIndex += step * remain;

  // check if current block are all handled
  if (pDumpInfo->rowIndex < 0 || pDumpInfo->rowIndex >= pBlock->nRow) {
    int64_t ts = asc ? pBlock->maxKey.ts : pBlock->minKey.ts;
    setBlockAllDumped(pDumpInfo, ts, pReader->order);
  } else {
    int64_t ts = pBlockData->aTSKEY[pDumpInfo->rowIndex];
    if (outOfTimeWindow(ts, &pReader->window)) {  // the remaining rows are outside the query window, ignore them
      setBlockAllDumped(pDumpInfo, ts, pReader->order);
    }
  }

  double elapsedTime = (taosGetTimestampUs() - st) / 1000.0;
  pReader->cost.blockLoadTime += elapsedTime;

  int32_t unDumpedRows = asc ? pBlock->nRow - pDumpInfo->rowIndex : pDumpInfo->rowIndex + 1;
  tsdbDebug("%p copy file block to sdatablock, global index:%d, table index:%d, brange:%" PRId64 "-%" PRId64
                ", rows:%d, remain:%d, minVer:%" PRId64 ", maxVer:%" PRId64 ", elapsed time:%.2f ms, %s",
            pReader, pBlockIter->index, pBlockInfo->tbBlockIdx, pBlock->minKey.ts, pBlock->maxKey.ts, remain,
            unDumpedRows, pBlock->minVer, pBlock->maxVer, elapsedTime, pReader->idStr);

  return TSDB_CODE_SUCCESS;
}

1048 1049
/**
 * Load the file block the iterator currently points at into pBlockData.
 *
 * pBlockData is reset and re-initialised for table uid before the read. On success the
 * elapsed time is added to pReader->cost.blockLoadTime and the dump state is cleared so
 * that the freshly loaded block is treated as not yet dumped.
 *
 * @return TSDB_CODE_SUCCESS, or the error code reported by tBlockDataInit()/tsdbReadDataBlock()
 */
static int32_t doLoadFileBlockData(STsdbReader* pReader, SDataBlockIter* pBlockIter, SBlockData* pBlockData,
                                   uint64_t uid) {
  int64_t st = taosGetTimestampUs();

  TABLEID tid = {.suid = pReader->suid, .uid = uid};
  tBlockDataReset(pBlockData);

  // the first entry of colIds is skipped when the block data is initialised
  int32_t code =
      tBlockDataInit(pBlockData, &tid, pReader->pSchema, &pReader->suppInfo.colIds[1], pReader->suppInfo.numOfCols - 1);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(pBlockIter);
  ASSERT(pBlockInfo != NULL);

  SDataBlk* pBlock = getCurrentBlock(pBlockIter);
  code = tsdbReadDataBlock(pReader->pFileReader, pBlock, pBlockData);
  if (code != TSDB_CODE_SUCCESS) {
    tsdbError("%p error occurs in loading file block, global index:%d, table index:%d, brange:%" PRId64 "-%" PRId64
                  ", rows:%d, code:%s %s",
              pReader, pBlockIter->index, pBlockInfo->tbBlockIdx, pBlock->minKey.ts, pBlock->maxKey.ts, pBlock->nRow,
              tstrerror(code), pReader->idStr);
    return code;
  }

  double elapsedTime = (taosGetTimestampUs() - st) / 1000.0;

  tsdbDebug("%p load file block into buffer, global index:%d, index in table block list:%d, brange:%" PRId64 "-%" PRId64
                ", rows:%d, minVer:%" PRId64 ", maxVer:%" PRId64 ", elapsed time:%.2f ms, %s",
            pReader, pBlockIter->index, pBlockInfo->tbBlockIdx, pBlock->minKey.ts, pBlock->maxKey.ts, pBlock->nRow,
            pBlock->minVer, pBlock->maxVer, elapsedTime, pReader->idStr);

  pReader->cost.blockLoadTime += elapsedTime;
  pReader->status.fBlockDumpInfo.allDumped = false;

  return TSDB_CODE_SUCCESS;
}
H
Hongze Cheng 已提交
1086

H
Haojun Liao 已提交
1087 1088 1089
/**
 * Release every buffer owned by the block-order supporter.
 *
 * Safe to call on a partially initialised supporter and safe to call more than once:
 * the per-table wrapper arrays are only visited while the table of pointers itself is
 * still allocated.
 */
static void cleanupBlockOrderSupporter(SBlockOrderSupporter* pSup) {
  taosMemoryFreeClear(pSup->numOfBlocksPerTable);
  taosMemoryFreeClear(pSup->indexPerTable);

  // pDataBlockInfo is NULL when its allocation failed or after a previous cleanup
  if (pSup->pDataBlockInfo != NULL) {
    for (int32_t i = 0; i < pSup->numOfTables; ++i) {
      taosMemoryFreeClear(pSup->pDataBlockInfo[i]);
    }
  }

  taosMemoryFreeClear(pSup->pDataBlockInfo);
}
H
Hongze Cheng 已提交
1098

H
Haojun Liao 已提交
1099 1100
/**
 * Allocate the zero-filled per-table bookkeeping arrays of the block-order supporter.
 *
 * @param numOfTables  number of slots to allocate, must be >= 1 (asserted)
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY after releasing whatever was allocated
 */
static int32_t initBlockOrderSupporter(SBlockOrderSupporter* pSup, int32_t numOfTables) {
  ASSERT(numOfTables >= 1);

  pSup->numOfBlocksPerTable = taosMemoryCalloc(1, sizeof(int32_t) * numOfTables);
  pSup->indexPerTable = taosMemoryCalloc(1, sizeof(int32_t) * numOfTables);
  pSup->pDataBlockInfo = taosMemoryCalloc(1, POINTER_BYTES * numOfTables);

  bool allAllocated = (pSup->numOfBlocksPerTable != NULL) && (pSup->indexPerTable != NULL) &&
                      (pSup->pDataBlockInfo != NULL);
  if (allAllocated) {
    return TSDB_CODE_SUCCESS;
  }

  cleanupBlockOrderSupporter(pSup);
  return TSDB_CODE_OUT_OF_MEMORY;
}
H
Hongze Cheng 已提交
1113

H
Haojun Liao 已提交
1114
static int32_t fileDataBlockOrderCompar(const void* pLeft, const void* pRight, void* param) {
1115
  int32_t leftIndex = *(int32_t*)pLeft;
H
Haojun Liao 已提交
1116
  int32_t rightIndex = *(int32_t*)pRight;
H
Hongze Cheng 已提交
1117

H
Haojun Liao 已提交
1118
  SBlockOrderSupporter* pSupporter = (SBlockOrderSupporter*)param;
H
Hongze Cheng 已提交
1119

H
Haojun Liao 已提交
1120 1121
  int32_t leftTableBlockIndex = pSupporter->indexPerTable[leftIndex];
  int32_t rightTableBlockIndex = pSupporter->indexPerTable[rightIndex];
H
Hongze Cheng 已提交
1122

H
Haojun Liao 已提交
1123 1124 1125 1126 1127 1128 1129
  if (leftTableBlockIndex > pSupporter->numOfBlocksPerTable[leftIndex]) {
    /* left block is empty */
    return 1;
  } else if (rightTableBlockIndex > pSupporter->numOfBlocksPerTable[rightIndex]) {
    /* right block is empty */
    return -1;
  }
H
Hongze Cheng 已提交
1130

1131
  SBlockOrderWrapper* pLeftBlock = &pSupporter->pDataBlockInfo[leftIndex][leftTableBlockIndex];
H
Haojun Liao 已提交
1132
  SBlockOrderWrapper* pRightBlock = &pSupporter->pDataBlockInfo[rightIndex][rightTableBlockIndex];
H
Hongze Cheng 已提交
1133

1134 1135 1136
  return pLeftBlock->offset > pRightBlock->offset ? 1 : -1;
}

H
Haojun Liao 已提交
1137
/**
 * Refresh pBlockIter->block with the descriptor of the block the iterator points at.
 *
 * Nothing is done when the block list is empty. The owning table is looked up in
 * pBlockIter->pTableMap by uid; TSDB_CODE_INVALID_PARA is returned if it is missing.
 */
static int32_t doSetCurrentBlock(SDataBlockIter* pBlockIter, const char* idStr) {
  SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(pBlockIter);
  if (pBlockInfo == NULL) {
    return TSDB_CODE_SUCCESS;
  }

  STableBlockScanInfo** pScanInfo = taosHashGet(pBlockIter->pTableMap, &pBlockInfo->uid, sizeof(pBlockInfo->uid));
  if (pScanInfo == NULL) {
    tsdbError("failed to locate the uid:%" PRIu64 " in query table uid list, %s", pBlockInfo->uid, idStr);
    return TSDB_CODE_INVALID_PARA;
  }

  // translate the position in the table's block list into the position inside the map data
  SBlockIndex* pIndex = taosArrayGet((*pScanInfo)->pBlockList, pBlockInfo->tbBlockIdx);
  tMapDataGetItemByIdx(&(*pScanInfo)->mapData, pIndex->ordinalIndex, &pBlockIter->block, tGetDataBlk);

  return TSDB_CODE_SUCCESS;
}
H
Hongze Cheng 已提交
1156

1157
/**
 * Build the global block access list of the iterator for the current file set.
 *
 * All blocks recorded in the pBlockList of every table in pReader->status.pTableMap are
 * collected. With a single qualified table the blocks are taken in list order; with several
 * tables they are merged by their offset in the data file. The iterator is then placed on
 * the first block in scan order and its current block descriptor is loaded.
 *
 * @param numOfBlocks  total number of blocks expected over all tables (asserted)
 * @return TSDB_CODE_SUCCESS, an out-of-memory code, or the error reported by doSetCurrentBlock()
 */
static int32_t initBlockIterator(STsdbReader* pReader, SDataBlockIter* pBlockIter, int32_t numOfBlocks) {
  bool asc = ASCENDING_TRAVERSE(pReader->order);

  SBlockOrderSupporter sup = {0};
  pBlockIter->numOfBlocks = numOfBlocks;
  taosArrayClear(pBlockIter->blockList);
  pBlockIter->pTableMap = pReader->status.pTableMap;

  // access data blocks according to the offset of each block in asc/desc order.
  int32_t numOfTables = (int32_t)taosHashGetSize(pReader->status.pTableMap);

  int64_t st = taosGetTimestampUs();
  int32_t code = initBlockOrderSupporter(&sup, numOfTables);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  // collect the (uid, offset) pair of every block, grouped by table
  int32_t cnt = 0;
  void*   ptr = NULL;
  while ((ptr = taosHashIterate(pReader->status.pTableMap, ptr)) != NULL) {
    STableBlockScanInfo* pTableScanInfo = *(STableBlockScanInfo**)ptr;
    if (pTableScanInfo->pBlockList == NULL || taosArrayGetSize(pTableScanInfo->pBlockList) == 0) {
      continue;
    }

    int32_t num = (int32_t)taosArrayGetSize(pTableScanInfo->pBlockList);
    sup.numOfBlocksPerTable[sup.numOfTables] = num;

    SBlockOrderWrapper* pWrappers = taosMemoryMalloc(sizeof(SBlockOrderWrapper) * num);
    if (pWrappers == NULL) {
      // leaving the loop early: the hash iteration has to be cancelled explicitly
      taosHashCancelIterate(pReader->status.pTableMap, ptr);
      cleanupBlockOrderSupporter(&sup);
      return TSDB_CODE_TDB_OUT_OF_MEMORY;
    }

    sup.pDataBlockInfo[sup.numOfTables] = pWrappers;

    for (int32_t k = 0; k < num; ++k) {
      SBlockIndex* pIndex = taosArrayGet(pTableScanInfo->pBlockList, k);
      pWrappers[k] = (SBlockOrderWrapper){.uid = pTableScanInfo->uid, .offset = pIndex->inFileOffset};
      cnt++;
    }

    sup.numOfTables += 1;
  }

  ASSERT(numOfBlocks == cnt);

  // since there is only one table qualified, blocks are not sorted
  if (sup.numOfTables == 1) {
    for (int32_t i = 0; i < numOfBlocks; ++i) {
      SFileDataBlockInfo blockInfo = {.uid = sup.pDataBlockInfo[0][i].uid, .tbBlockIdx = i};
      if (taosArrayPush(pBlockIter->blockList, &blockInfo) == NULL) {
        cleanupBlockOrderSupporter(&sup);
        return TSDB_CODE_TDB_OUT_OF_MEMORY;
      }
    }

    int64_t et = taosGetTimestampUs();
    tsdbDebug("%p create blocks info struct completed for one table, %d blocks not sorted, elapsed time:%.2f ms %s",
              pReader, numOfBlocks, (et - st) / 1000.0, pReader->idStr);

    pBlockIter->index = asc ? 0 : (numOfBlocks - 1);
    cleanupBlockOrderSupporter(&sup);
    return doSetCurrentBlock(pBlockIter, pReader->idStr);
  }

  tsdbDebug("%p create data blocks info struct completed, %d blocks in %d tables %s", pReader, cnt, sup.numOfTables,
            pReader->idStr);

  ASSERT(cnt <= numOfBlocks && sup.numOfTables <= numOfTables);

  // keep the full-width return code; a narrower type could turn an error into 0
  SMultiwayMergeTreeInfo* pTree = NULL;
  int32_t                 ret = tMergeTreeCreate(&pTree, sup.numOfTables, &sup, fileDataBlockOrderCompar);
  if (ret != TSDB_CODE_SUCCESS) {
    cleanupBlockOrderSupporter(&sup);
    return TSDB_CODE_TDB_OUT_OF_MEMORY;
  }

  // multi-way merge of the per-table lists by file offset
  int32_t numOfTotal = 0;
  while (numOfTotal < cnt) {
    int32_t pos = tMergeTreeGetChosenIndex(pTree);
    int32_t index = sup.indexPerTable[pos]++;

    SFileDataBlockInfo blockInfo = {.uid = sup.pDataBlockInfo[pos][index].uid, .tbBlockIdx = index};
    if (taosArrayPush(pBlockIter->blockList, &blockInfo) == NULL) {
      cleanupBlockOrderSupporter(&sup);
      taosMemoryFree(pTree);
      return TSDB_CODE_TDB_OUT_OF_MEMORY;
    }

    // set data block index overflow, in order to disable the offset comparator
    if (sup.indexPerTable[pos] >= sup.numOfBlocksPerTable[pos]) {
      sup.indexPerTable[pos] = sup.numOfBlocksPerTable[pos] + 1;
    }

    numOfTotal += 1;
    tMergeTreeAdjust(pTree, tMergeTreeGetAdjustIndex(pTree));
  }

  int64_t et = taosGetTimestampUs();
  tsdbDebug("%p %d data blocks access order completed, elapsed time:%.2f ms %s", pReader, numOfBlocks,
            (et - st) / 1000.0, pReader->idStr);
  cleanupBlockOrderSupporter(&sup);
  taosMemoryFree(pTree);

  pBlockIter->index = asc ? 0 : (numOfBlocks - 1);
  return doSetCurrentBlock(pBlockIter, pReader->idStr);
}
H
Hongze Cheng 已提交
1267

H
Haojun Liao 已提交
1268
/**
 * Advance the iterator by one block in its scan order and refresh the current block.
 *
 * @return false when the iterator already stands on the last block in scan order
 *         (nothing is changed in that case), true otherwise
 */
static bool blockIteratorNext(SDataBlockIter* pBlockIter, const char* idStr) {
  if (ASCENDING_TRAVERSE(pBlockIter->order)) {
    if (pBlockIter->index >= pBlockIter->numOfBlocks - 1) {
      return false;
    }

    pBlockIter->index += 1;
  } else {
    if (pBlockIter->index <= 0) {
      return false;
    }

    pBlockIter->index += -1;
  }

  doSetCurrentBlock(pBlockIter, idStr);
  return true;
}

1282 1283 1284
/**
 * This is an two rectangles overlap cases.
 */
H
Hongze Cheng 已提交
1285
static int32_t dataBlockPartiallyRequired(STimeWindow* pWindow, SVersionRange* pVerRange, SDataBlk* pBlock) {
1286 1287
  return (pWindow->ekey < pBlock->maxKey.ts && pWindow->ekey >= pBlock->minKey.ts) ||
         (pWindow->skey > pBlock->minKey.ts && pWindow->skey <= pBlock->maxKey.ts) ||
H
Hongze Cheng 已提交
1288 1289
         (pVerRange->minVer > pBlock->minVer && pVerRange->minVer <= pBlock->maxVer) ||
         (pVerRange->maxVer < pBlock->maxVer && pVerRange->maxVer >= pBlock->minVer);
H
Haojun Liao 已提交
1290
}
H
Hongze Cheng 已提交
1291

1292
/**
 * Look up the block that follows pBlockInfo, in scan order, within the same table.
 *
 * @param nextIndex    set to the position of the neighbor in the table's block list
 * @param pBlockIndex  set to a copy of the neighbor's block index entry
 * @return true when a neighbor exists; false otherwise, in which case the outputs are untouched
 */
static bool getNeighborBlockOfSameTable(SFileDataBlockInfo* pBlockInfo, STableBlockScanInfo* pTableBlockScanInfo,
                                        int32_t* nextIndex, int32_t order, SBlockIndex* pBlockIndex) {
  // work with a signed count: "size - 1" on the unsigned size wraps around for an empty list
  int32_t numOfBlocks = (int32_t)taosArrayGetSize(pTableBlockScanInfo->pBlockList);
  int32_t step = ASCENDING_TRAVERSE(order) ? 1 : -1;
  int32_t neighbor = pBlockInfo->tbBlockIdx + step;

  // no block before the first one or after the last one
  if (neighbor < 0 || neighbor >= numOfBlocks) {
    return false;
  }

  *nextIndex = neighbor;
  *pBlockIndex = *(SBlockIndex*)taosArrayGet(pTableBlockScanInfo->pBlockList, neighbor);
  return true;
}

/**
 * Find the position of pFBlockInfo in the iterator's block list.
 *
 * The search starts at the iterator's current position and walks in scan order, matching
 * on both uid and tbBlockIdx. Not finding the entry is considered a bug (asserted); -1 is
 * returned in that case.
 */
static int32_t findFileBlockInfoIndex(SDataBlockIter* pBlockIter, SFileDataBlockInfo* pFBlockInfo) {
  ASSERT(pBlockIter != NULL && pFBlockInfo != NULL);

  int32_t step = ASCENDING_TRAVERSE(pBlockIter->order) ? 1 : -1;

  for (int32_t pos = pBlockIter->index; pos >= 0 && pos < pBlockIter->numOfBlocks; pos += step) {
    SFileDataBlockInfo* pCandidate = taosArrayGet(pBlockIter->blockList, pos);

    bool sameTable = (pCandidate->uid == pFBlockInfo->uid);
    if (sameTable && pCandidate->tbBlockIdx == pFBlockInfo->tbBlockIdx) {
      return pos;
    }
  }

  ASSERT(0);
  return -1;
}

1329
/**
 * Make the block stored at position index the iterator's next current block.
 *
 * The iterator advances by step; if the requested block is not already at the new
 * position it is moved there inside the block list. The current block descriptor is
 * refreshed afterwards.
 *
 * @return -1 when index is outside [0, numOfBlocks), TSDB_CODE_SUCCESS otherwise
 */
static int32_t setFileBlockActiveInBlockIter(SDataBlockIter* pBlockIter, int32_t index, int32_t step) {
  bool inRange = (index >= 0) && (index < pBlockIter->numOfBlocks);
  if (!inRange) {
    return -1;
  }

  // keep a copy, the entry may be relocated below
  SFileDataBlockInfo moved = *(SFileDataBlockInfo*)taosArrayGet(pBlockIter->blockList, index);
  pBlockIter->index += step;

  if (pBlockIter->index != index) {
    taosArrayRemove(pBlockIter->blockList, index);
    taosArrayInsert(pBlockIter->blockList, pBlockIter->index, &moved);

    SFileDataBlockInfo* pActive = taosArrayGet(pBlockIter->blockList, pBlockIter->index);
    ASSERT(pActive->uid == moved.uid && pActive->tbBlockIdx == moved.tbBlockIdx);
  }

  doSetCurrentBlock(pBlockIter, "");
  return TSDB_CODE_SUCCESS;
}

1349
/**
 * Check whether pBlock touches its neighbor block in scan order.
 *
 * Only the shared boundary is compared: the block's max key against the neighbor's start
 * key for an ascending scan, the block's min key against the neighbor's end key otherwise.
 */
static bool overlapWithNeighborBlock(SDataBlk* pBlock, SBlockIndex* pNeighborBlockIndex, int32_t order) {
  if (!ASCENDING_TRAVERSE(order)) {
    return pBlock->minKey.ts == pNeighborBlockIndex->window.ekey;
  }

  return pBlock->maxKey.ts == pNeighborBlockIndex->window.skey;
}
H
Hongze Cheng 已提交
1357

H
Hongze Cheng 已提交
1358
/**
 * Check whether the buffered key lies at or before the start of the block in scan order:
 * key.ts <= block min key for an ascending scan, key.ts >= block max key otherwise.
 * A key equal to TSKEY_INITIAL_VAL never qualifies.
 */
static bool bufferDataInFileBlockGap(int32_t order, TSDBKEY key, SDataBlk* pBlock) {
  if (key.ts == TSKEY_INITIAL_VAL) {
    return false;
  }

  if (ASCENDING_TRAVERSE(order)) {
    return key.ts <= pBlock->minKey.ts;
  }

  return key.ts >= pBlock->maxKey.ts;
}
H
Hongze Cheng 已提交
1364

H
Hongze Cheng 已提交
1365
/**
 * Check whether key.ts falls inside the block's key range while the block's version range
 * intersects the queried version range.
 */
static bool keyOverlapFileBlock(TSDBKEY key, SDataBlk* pBlock, SVersionRange* pVerRange) {
  if (key.ts < pBlock->minKey.ts || key.ts > pBlock->maxKey.ts) {
    return false;
  }

  bool versionIntersects = (pBlock->maxVer >= pVerRange->minVer) && (pBlock->minVer <= pVerRange->maxVer);
  return versionIntersects;
}

1370
/**
 * Scan the delete skyline from startIndex and report whether a delete point may affect
 * the block.
 *
 * A point qualifies when its version is not lower than the block's minimum version and
 * either its timestamp lies inside the block's key range, or it lies before the block
 * while the following point is at or after the block's first key. The scan stops at the
 * first point located after the block.
 */
static bool doCheckforDatablockOverlap(STableBlockScanInfo* pBlockScanInfo, const SDataBlk* pBlock, int32_t startIndex) {
  size_t num = taosArrayGetSize(pBlockScanInfo->delSkyline);

  for (int32_t i = startIndex; i < num; i += 1) {
    TSDBKEY* p = taosArrayGet(pBlockScanInfo->delSkyline, i);

    bool inBlock = (p->ts >= pBlock->minKey.ts && p->ts <= pBlock->maxKey.ts);
    bool beforeBlock = !inBlock && (p->ts < pBlock->minKey.ts);

    if (!inBlock && !beforeBlock) {  // p->ts > pBlock->maxKey.ts
      return false;
    }

    if (p->version < pBlock->minVer) {
      continue;
    }

    if (inBlock) {
      return true;
    }

    // the point is before the block: it matters only if its range reaches into the block
    if (i < num - 1) {
      TSDBKEY* pnext = taosArrayGet(pBlockScanInfo->delSkyline, i + 1);
      if (pnext->ts >= pBlock->minKey.ts) {
        return true;
      }
    } else {  // it must be the last point
      ASSERT(p->version == 0);
    }
  }

  return false;
}

H
Hongze Cheng 已提交
1398
/**
 * Check whether the delete skyline of the table may affect the given block.
 *
 * @return false when there is no skyline, the skyline is empty, or its time span does not
 *         intersect the block's key range; otherwise the result of the detailed check
 *         starting from the current file delete index (moved backwards first for a
 *         descending scan).
 */
static bool overlapWithDelSkyline(STableBlockScanInfo* pBlockScanInfo, const SDataBlk* pBlock, int32_t order) {
  // an empty skyline has no first/last point to look at
  if (pBlockScanInfo->delSkyline == NULL || taosArrayGetSize(pBlockScanInfo->delSkyline) == 0) {
    return false;
  }

  // ts is not overlap
  TSDBKEY* pFirst = taosArrayGet(pBlockScanInfo->delSkyline, 0);
  TSDBKEY* pLast = taosArrayGetLast(pBlockScanInfo->delSkyline);
  if (pBlock->minKey.ts > pLast->ts || pBlock->maxKey.ts < pFirst->ts) {
    return false;
  }

  // version is not overlap
  if (ASCENDING_TRAVERSE(order)) {
    return doCheckforDatablockOverlap(pBlockScanInfo, pBlock, pBlockScanInfo->fileDelIndex);
  }

  // descending scan: step back to the first point that is not after the block's min key
  int32_t index = pBlockScanInfo->fileDelIndex;
  while (index > 0) {
    TSDBKEY* p = taosArrayGet(pBlockScanInfo->delSkyline, index);
    if (p->ts <= pBlock->minKey.ts) {
      break;
    }

    index -= 1;
  }

  return doCheckforDatablockOverlap(pBlockScanInfo, pBlock, index);
}

H
Haojun Liao 已提交
1428 1429 1430 1431 1432 1433 1434 1435 1436 1437 1438 1439 1440
// Reasons why a file block has to be loaded, as filled in by getBlockToLoadInfo().
typedef struct {
  bool overlapWithNeighborBlock;  // block boundary touches the neighbor block of the same table
  bool hasDupTs;                  // block has duplicated timestamps, or consists of more than one sub-block
  bool overlapWithDelInfo;        // delete skyline of the table affects this block
  bool overlapWithLastBlock;      // current key of the last-block reader falls in the block's key range
  bool overlapWithKeyInBuf;       // key from the buffer falls in the block's key and version range
  bool partiallyRequired;         // query window/version range covers only part of the block
  bool moreThanCapcity;           // block has more rows than the reader's output capacity
} SDataBlockToLoadInfo;

/**
 * Evaluate every condition that can force a file block to be loaded and store the results
 * in pInfo.
 *
 * overlapWithNeighborBlock and overlapWithLastBlock are written only when a neighbor block
 * or last-block data exists; they keep whatever value pInfo held before otherwise.
 */
static void getBlockToLoadInfo(SDataBlockToLoadInfo* pInfo, SFileDataBlockInfo* pBlockInfo, SDataBlk* pBlock,
                               STableBlockScanInfo* pScanInfo, TSDBKEY keyInBuf, SLastBlockReader* pLastBlockReader,
                               STsdbReader* pReader) {
  // overlap with the neighbor block of the same table
  int32_t     neighborIndex = 0;
  SBlockIndex neighbor = {0};
  if (getNeighborBlockOfSameTable(pBlockInfo, pScanInfo, &neighborIndex, pReader->order, &neighbor)) {
    pInfo->overlapWithNeighborBlock = overlapWithNeighborBlock(pBlock, &neighbor, pReader->order);
  }

  // has duplicated ts of different version in this block
  pInfo->hasDupTs = (pBlock->nSubBlock != 1) || pBlock->hasDup;
  pInfo->overlapWithDelInfo = overlapWithDelSkyline(pScanInfo, pBlock, pReader->order);

  // current key of the last block lies within [minKey, maxKey] of this block
  if (hasDataInLastBlock(pLastBlockReader)) {
    int64_t tsLast = getCurrentKeyInLastBlock(pLastBlockReader);
    pInfo->overlapWithLastBlock = (pBlock->minKey.ts <= tsLast) && (tsLast <= pBlock->maxKey.ts);
  }

  pInfo->moreThanCapcity = pBlock->nRow > pReader->capacity;
  pInfo->partiallyRequired = dataBlockPartiallyRequired(&pReader->window, &pReader->verRange, pBlock);
  pInfo->overlapWithKeyInBuf = keyOverlapFileBlock(keyInBuf, pBlock, &pReader->verRange);
}
1464

H
Haojun Liao 已提交
1465 1466 1467 1468 1469 1470 1471 1472 1473 1474 1475 1476 1477 1478
// Decide whether the file data block has to be loaded. The block must be loaded as soon as any of
// the following holds (see getBlockToLoadInfo() for how each flag is computed):
// 1. the block overlaps with the neighbor block of the same table
// 2. the block contains duplicated timestamps
// 3. the block is flagged as partially required for the reader's time window / version range
// 4. the block overlaps with the key currently in the buffer
// 5. the block holds more rows than the output buffer capacity
// 6. the delete info overlaps with the block
// 7. the block overlaps with the current key of the last block
static bool fileBlockShouldLoad(STsdbReader* pReader, SFileDataBlockInfo* pBlockInfo, SDataBlk* pBlock,
                                STableBlockScanInfo* pScanInfo, TSDBKEY keyInBuf, SLastBlockReader* pLastBlockReader) {
  SDataBlockToLoadInfo reason = {0};
  getBlockToLoadInfo(&reason, pBlockInfo, pBlock, pScanInfo, keyInBuf, pLastBlockReader, pReader);

  bool needLoad = reason.overlapWithNeighborBlock || reason.hasDupTs || reason.partiallyRequired ||
                  reason.overlapWithKeyInBuf || reason.moreThanCapcity || reason.overlapWithDelInfo ||
                  reason.overlapWithLastBlock;
  if (!needLoad) {
    return false;
  }

  // log the reason why the data block is loaded, for profiling
  tsdbDebug("%p uid:%" PRIu64
            " need to load the datablock, overlapwithneighborblock:%d, hasDup:%d, partiallyRequired:%d, "
            "overlapWithKey:%d, greaterThanBuf:%d, overlapWithDel:%d, overlapWithlastBlock:%d, %s",
            pReader, pBlockInfo->uid, reason.overlapWithNeighborBlock, reason.hasDupTs, reason.partiallyRequired,
            reason.overlapWithKeyInBuf, reason.moreThanCapcity, reason.overlapWithDelInfo,
            reason.overlapWithLastBlock, pReader->idStr);
  return true;
}

H
Haojun Liao 已提交
1493 1494 1495 1496 1497 1498 1499 1500 1501
// A file data block is "clean" when it overlaps with nothing else (neighbor block, buffer key, delete
// info, last block) and has no duplicated timestamps. The capacity and partially-required flags are
// not taken into account here.
static bool isCleanFileDataBlock(STsdbReader* pReader, SFileDataBlockInfo* pBlockInfo, SDataBlk* pBlock,
                                 STableBlockScanInfo* pScanInfo, TSDBKEY keyInBuf, SLastBlockReader* pLastBlockReader) {
  SDataBlockToLoadInfo flags = {0};
  getBlockToLoadInfo(&flags, pBlockInfo, pBlock, pScanInfo, keyInBuf, pLastBlockReader, pReader);

  if (flags.overlapWithNeighborBlock || flags.hasDupTs || flags.overlapWithKeyInBuf) {
    return false;
  }

  return !flags.overlapWithDelInfo && !flags.overlapWithLastBlock;
}

1502
// Build the result block (pReader->pResBlock) from the mem/imem iterators of one table.
// Returns TSDB_CODE_SUCCESS immediately when neither iterator has a value, otherwise the code returned
// by buildDataBlockFromBufImpl(). The elapsed time is added to pReader->cost.buildmemBlock.
static int32_t buildDataBlockFromBuf(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo, int64_t endKey) {
  // nothing left in either the imem or the mem iterator
  if (!pBlockScanInfo->iiter.hasVal && !pBlockScanInfo->iter.hasVal) {
    return TSDB_CODE_SUCCESS;
  }

  SSDataBlock* pResBlock = pReader->pResBlock;
  int64_t      startUs = taosGetTimestampUs();

  int32_t code = buildDataBlockFromBufImpl(pBlockScanInfo, endKey, pReader->capacity, pReader);

  blockDataUpdateTsWindow(pResBlock, 0);
  pResBlock->info.uid = pBlockScanInfo->uid;
  setComposedBlockFlag(pReader, true);

  double elapsedTime = (taosGetTimestampUs() - startUs) / 1000.0;  // us -> ms
  tsdbDebug("%p build data block from cache completed, elapsed time:%.2f ms, numOfRows:%d, brange:%" PRId64
            " - %" PRId64 " %s",
            pReader, elapsedTime, pResBlock->info.rows, pResBlock->info.window.skey, pResBlock->info.window.ekey,
            pReader->idStr);

  pReader->cost.buildmemBlock += elapsedTime;
  return code;
}

1527 1528
// Fast path: when the current row is not at the border of the file block (in traverse direction) and
// the directly following row has a different timestamp, no merge is needed. The row is then appended
// to the result block, the row index is advanced by one step and true is returned.
// Returns false (nothing copied, row index untouched) otherwise.
static bool tryCopyDistinctRowFromFileBlock(STsdbReader* pReader, SBlockData* pBlockData, int64_t key,
                                            SFileBlockDumpInfo* pDumpInfo) {
  bool ascending = (pReader->order == TSDB_ORDER_ASC);
  bool descending = (pReader->order == TSDB_ORDER_DESC);

  // there must be a following row inside this block
  bool hasFollowingRow = (ascending && pDumpInfo->rowIndex < pDumpInfo->totalRows - 1) ||
                         (descending && pDumpInfo->rowIndex > 0);
  if (!hasFollowingRow) {
    return false;
  }

  int32_t step = ascending ? 1 : -1;
  if (pBlockData->aTSKEY[pDumpInfo->rowIndex + step] == key) {
    return false;  // duplicated timestamp follows, a merge is required
  }

  doAppendRowFromFileBlock(pReader->pResBlock, pReader, pBlockData, pDumpInfo->rowIndex);
  pDumpInfo->rowIndex += step;
  return true;
}

1547 1548
// Advance the last-block merge tree until it points at a row for which hasBeenDropped() is false.
// Returns true when such a row is found, false when the merge tree is exhausted.
static bool nextRowFromLastBlocks(SLastBlockReader* pLastBlockReader, STableBlockScanInfo* pBlockScanInfo,
                                  SVersionRange* pVerRange) {
  while (tMergeTreeNext(&pLastBlockReader->mergeTree)) {
    TSDBROW cur = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
    TSDBKEY curKey = TSDBROW_KEY(&cur);

    bool dropped = hasBeenDropped(pBlockScanInfo->delSkyline, &pBlockScanInfo->lastBlockDelIndex, &curKey,
                                  pLastBlockReader->order, pVerRange);
    if (!dropped) {
      return true;
    }
  }

  return false;
}

// Fast path for rows of the last block: move the last-block reader to its next valid row; if there is
// no next row, or its timestamp differs from ts, the row fRow needs no merge and is appended to the
// result block directly (returns true). Returns false when the next row has the same timestamp.
// Note that the reader has been advanced in both cases.
static bool tryCopyDistinctRowFromSttBlock(TSDBROW* fRow, SLastBlockReader* pLastBlockReader,
                                           STableBlockScanInfo* pScanInfo, int64_t ts, STsdbReader* pReader) {
  bool hasNext = nextRowFromLastBlocks(pLastBlockReader, pScanInfo, &pReader->verRange);
  if (hasNext && getCurrentKeyInLastBlock(pLastBlockReader) == ts) {
    return false;  // duplicated timestamp follows, the caller has to merge
  }

  doAppendRowFromFileBlock(pReader->pResBlock, pReader, fRow->pBlockData, fRow->iRow);
  return true;
}

H
Haojun Liao 已提交
1581 1582 1583
// Return the schema with version `sversion` for table `uid`.
// pReader->pSchema is loaded lazily with metaGetTbTSchema(..., -1, 1) and returned when its version
// matches. Otherwise pReader->pMemSchema is used as a one-entry cache: it is returned when its version
// matches, else it is released and fetched again for the requested version.
// Returns NULL when the schema cannot be fetched; terrno is set to the fetch result code in that case.
static FORCE_INLINE STSchema* doGetSchemaForTSRow(int32_t sversion, STsdbReader* pReader, uint64_t uid) {
  // always set the newest schema version in pReader->pSchema
  if (pReader->pSchema == NULL) {
    pReader->pSchema = metaGetTbTSchema(pReader->pTsdb->pVnode->pMeta, uid, -1, 1);
  }

  if (pReader->pSchema != NULL && pReader->pSchema->version == sversion) {
    return pReader->pSchema;
  }

  int32_t code = TSDB_CODE_SUCCESS;

  if (pReader->pMemSchema != NULL) {
    if (pReader->pMemSchema->version == sversion) {
      return pReader->pMemSchema;
    }

    // cached schema has another version: drop it and fetch the requested one
    taosMemoryFreeClear(pReader->pMemSchema);
    code = metaGetTbTSchemaEx(pReader->pTsdb->pVnode->pMeta, pReader->suid, uid, sversion, &pReader->pMemSchema);
    if (code != TSDB_CODE_SUCCESS || pReader->pMemSchema == NULL) {
      terrno = code;
      return NULL;
    }

    return pReader->pMemSchema;
  }

  // nothing cached yet
  code = metaGetTbTSchemaEx(pReader->pTsdb->pVnode->pMeta, pReader->suid, uid, sversion, &pReader->pMemSchema);
  if (code != TSDB_CODE_SUCCESS) {
    terrno = code;
    return NULL;
  }

  return pReader->pMemSchema;
}

1616
static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo, TSDBROW* pRow,
1617 1618 1619 1620 1621 1622
                                     SIterInfo* pIter, int64_t key, SLastBlockReader* pLastBlockReader) {
  SRowMerger          merge = {0};
  STSRow*             pTSRow = NULL;
  SBlockData*         pBlockData = &pReader->status.fileBlockData;
  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;

1623
  int64_t tsLast = INT64_MIN;
1624
  if (hasDataInLastBlock(pLastBlockReader)) {
1625 1626
    tsLast = getCurrentKeyInLastBlock(pLastBlockReader);
  }
1627

H
Hongze Cheng 已提交
1628 1629
  TSDBKEY k = TSDBROW_KEY(pRow);
  TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
1630

1631 1632
  int64_t minKey = 0;
  if (pReader->order == TSDB_ORDER_ASC) {
H
Hongze Cheng 已提交
1633
    minKey = INT64_MAX;  // chosen the minimum value
1634
    if (minKey > tsLast && hasDataInLastBlock(pLastBlockReader)) {
1635 1636
      minKey = tsLast;
    }
1637

1638 1639 1640
    if (minKey > k.ts) {
      minKey = k.ts;
    }
1641

1642
    if (minKey > key && hasDataInFileBlock(pBlockData, pDumpInfo)) {
1643 1644 1645 1646
      minKey = key;
    }
  } else {
    minKey = INT64_MIN;
1647
    if (minKey < tsLast && hasDataInLastBlock(pLastBlockReader)) {
1648 1649 1650 1651 1652 1653 1654
      minKey = tsLast;
    }

    if (minKey < k.ts) {
      minKey = k.ts;
    }

1655
    if (minKey < key && hasDataInFileBlock(pBlockData, pDumpInfo)) {
1656 1657
      minKey = key;
    }
1658 1659 1660 1661
  }

  bool init = false;

1662
  // ASC: file block ---> last block -----> imem -----> mem
H
Hongze Cheng 已提交
1663
  // DESC: mem -----> imem -----> last block -----> file block
1664 1665
  if (pReader->order == TSDB_ORDER_ASC) {
    if (minKey == key) {
1666
      init = true;
H
Haojun Liao 已提交
1667 1668 1669 1670
      int32_t code = tRowMergerInit(&merge, &fRow, pReader->pSchema);
      if (code != TSDB_CODE_SUCCESS) {
        return code;
      }
1671
      doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
1672 1673
    }

1674
    if (minKey == tsLast) {
1675
      TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
H
Haojun Liao 已提交
1676 1677 1678
      if (init) {
        tRowMerge(&merge, &fRow1);
      } else {
1679
        init = true;
H
Haojun Liao 已提交
1680 1681 1682 1683
        int32_t code = tRowMergerInit(&merge, &fRow1, pReader->pSchema);
        if (code != TSDB_CODE_SUCCESS) {
          return code;
        }
1684
      }
1685
      doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, &merge, &pReader->verRange);
1686
    }
1687

1688
    if (minKey == k.ts) {
H
Haojun Liao 已提交
1689 1690 1691
      if (init) {
        tRowMerge(&merge, pRow);
      } else {
1692 1693
        init = true;
        STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid);
H
Haojun Liao 已提交
1694 1695 1696 1697 1698 1699 1700 1701
        int32_t   code = tRowMergerInit(&merge, pRow, pSchema);
        if (code != TSDB_CODE_SUCCESS) {
          return code;
        }
      }
      int32_t code = doMergeRowsInBuf(pIter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge, pReader);
      if (code != TSDB_CODE_SUCCESS) {
        return code;
1702 1703 1704 1705 1706
      }
    }
  } else {
    if (minKey == k.ts) {
      init = true;
1707
      STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid);
H
Haojun Liao 已提交
1708 1709 1710 1711 1712 1713
      int32_t   code = tRowMergerInit(&merge, pRow, pSchema);
      if (code != TSDB_CODE_SUCCESS) {
        return code;
      }

      code = doMergeRowsInBuf(pIter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge, pReader);
1714
      if (code != TSDB_CODE_SUCCESS || merge.pTSchema == NULL) {
H
Haojun Liao 已提交
1715 1716
        return code;
      }
1717 1718
    }

1719
    if (minKey == tsLast) {
1720
      TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
H
Haojun Liao 已提交
1721 1722 1723
      if (init) {
        tRowMerge(&merge, &fRow1);
      } else {
1724
        init = true;
H
Haojun Liao 已提交
1725 1726 1727 1728
        int32_t code = tRowMergerInit(&merge, &fRow1, pReader->pSchema);
        if (code != TSDB_CODE_SUCCESS) {
          return code;
        }
1729
      }
1730
      doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, &merge, &pReader->verRange);
1731 1732 1733
    }

    if (minKey == key) {
H
Haojun Liao 已提交
1734 1735 1736
      if (init) {
        tRowMerge(&merge, &fRow);
      } else {
1737
        init = true;
H
Haojun Liao 已提交
1738 1739 1740 1741
        int32_t code = tRowMergerInit(&merge, &fRow, pReader->pSchema);
        if (code != TSDB_CODE_SUCCESS) {
          return code;
        }
1742 1743 1744
      }
      doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
    }
1745 1746
  }

1747 1748 1749 1750 1751
  int32_t code = tRowMergerGetRow(&merge, &pTSRow);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

1752 1753 1754 1755 1756 1757 1758
  doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo->uid);

  taosMemoryFree(pTSRow);
  tRowMergerClear(&merge);
  return TSDB_CODE_SUCCESS;
}

1759 1760 1761
// Emit one row whose timestamp is the current key of the last block.
//
// mergeBlockData == false: pBlockData is not accessed; only rows of the last block are merged.
// mergeBlockData == true : when the current file-block row has the same timestamp as the last block,
//                          the file-block rows of that timestamp are merged in as well.
//
// Returns TSDB_CODE_SUCCESS or the error code of tRowMergerInit()/tRowMergerGetRow(). The merger is
// released with tRowMergerClear() on every path where it was initialized.
static int32_t doMergeFileBlockAndLastBlock(SLastBlockReader* pLastBlockReader, STsdbReader* pReader,
                                            STableBlockScanInfo* pBlockScanInfo, SBlockData* pBlockData,
                                            bool mergeBlockData) {
  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
  int64_t             tsLastBlock = getCurrentKeyInLastBlock(pLastBlockReader);

  STSRow*    pTSRow = NULL;
  SRowMerger merge = {0};
  TSDBROW    fRow = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
  tsdbTrace("fRow ptr:%p, %d, uid:%" PRIu64 ", %s", fRow.pBlockData, fRow.iRow, pLastBlockReader->uid, pReader->idStr);

  // the file block does not take part: it is either not given or its current row has another timestamp
  bool lastBlockOnly = (!mergeBlockData) || (tsLastBlock != pBlockData->aTSKEY[pDumpInfo->rowIndex]);

  // fast path: the row of the last block has no duplicated timestamp and was copied as-is
  if (lastBlockOnly && tryCopyDistinctRowFromSttBlock(&fRow, pLastBlockReader, pBlockScanInfo, tsLastBlock, pReader)) {
    return TSDB_CODE_SUCCESS;
  }

  int32_t code = tRowMergerInit(&merge, &fRow, pReader->pSchema);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  if (lastBlockOnly) {
    // tryCopyDistinctRowFromSttBlock() already moved the reader to the next row with the same timestamp
    TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
    tRowMerge(&merge, &fRow1);
    doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLastBlock, &merge, &pReader->verRange);
  } else {
    doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLastBlock, &merge, &pReader->verRange);
    ASSERT(mergeBlockData);

    // merge with block data if ts == key
    if (tsLastBlock == pBlockData->aTSKEY[pDumpInfo->rowIndex]) {
      doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
    }
  }

  code = tRowMergerGetRow(&merge, &pTSRow);
  if (code == TSDB_CODE_SUCCESS) {
    doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo->uid);
    taosMemoryFree(pTSRow);
  }

  // the merger was initialized above, release it on success and on failure alike
  tRowMergerClear(&merge);
  return code;
}

1822 1823
// Emit one row from the file data block and/or the last block (no buffer rows involved here).
//   - file block has no data          : handled by doMergeFileBlockAndLastBlock() with last block only
//   - last block has no data          : handled by mergeRowsInFileBlocks()
//   - both have data, descending order: handled by doMergeFileBlockAndLastBlock() with block data
//   - both have data, ascending order : key < ts  -> only the file block row is emitted
//                                       key == ts -> rows of both sources are merged
// Returns TSDB_CODE_SUCCESS or the first error code met. The merger used for the key == ts case is
// released with tRowMergerClear() even when tRowMergerGetRow() fails.
static int32_t mergeFileBlockAndLastBlock(STsdbReader* pReader, SLastBlockReader* pLastBlockReader, int64_t key,
                                          STableBlockScanInfo* pBlockScanInfo, SBlockData* pBlockData) {
  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;

  // only last block exists
  if (!hasDataInFileBlock(pBlockData, pDumpInfo)) {
    return doMergeFileBlockAndLastBlock(pLastBlockReader, pReader, pBlockScanInfo, NULL, false);
  }

  // no last block available, only data block exists
  if (!hasDataInLastBlock(pLastBlockReader)) {
    return mergeRowsInFileBlocks(pBlockData, pBlockScanInfo, key, pReader);
  }

  // current row of the file data block, and current timestamp of the last block
  TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
  int64_t ts = getCurrentKeyInLastBlock(pLastBlockReader);
  ASSERT(ts >= key);

  if (!ASCENDING_TRAVERSE(pReader->order)) {  // desc order
    return doMergeFileBlockAndLastBlock(pLastBlockReader, pReader, pBlockScanInfo, pBlockData, true);
  }

  if (key < ts) {  // imem, mem are all empty, file blocks (data blocks and last block) exist
    return mergeRowsInFileBlocks(pBlockData, pBlockScanInfo, key, pReader);
  }

  if (key > ts) {  // not expected, see the assertion above
    ASSERT(0);
    return TSDB_CODE_SUCCESS;
  }

  // key == ts: merge the rows of this timestamp from the file block first, then from the last block
  STSRow*    pTSRow = NULL;
  SRowMerger merge = {0};

  int32_t code = tRowMergerInit(&merge, &fRow, pReader->pSchema);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);

  TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
  tRowMerge(&merge, &fRow1);

  doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, ts, &merge, &pReader->verRange);

  code = tRowMergerGetRow(&merge, &pTSRow);
  if (code == TSDB_CODE_SUCCESS) {
    doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo->uid);
    taosMemoryFree(pTSRow);
  }

  // the merger was initialized above, release it on success and on failure alike
  tRowMergerClear(&merge);
  return code;
}

1878 1879
// Merge the rows of one timestamp from all four sources (file data block, last block, imem, mem) and
// append the merged row to pReader->pResBlock. Both the mem and the imem iterator must have a valid row.
//
// The timestamp to emit (minKey) is the smallest (ASC) or largest (DESC) of: mem ts, imem ts, file block
// ts (only if the file block has data) and last block ts (only if the last block has data). Every source
// whose current ts equals minKey takes part in the merge, in this order:
//   ASC : file block -----> last block -----> imem -----> mem
//   DESC: mem -----> imem -----> last block -----> file block
//
// Returns TSDB_CODE_SUCCESS or the first error code met. Once the merger has been initialized it is
// released with tRowMergerClear() on every exit path, including the early exits.
static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo, SBlockData* pBlockData,
                                     SLastBlockReader* pLastBlockReader) {
  SRowMerger          merge = {0};
  STSRow*             pTSRow = NULL;
  int32_t             code = TSDB_CODE_SUCCESS;
  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
  SArray*             pDelList = pBlockScanInfo->delSkyline;

  TSDBROW* pRow = getValidMemRow(&pBlockScanInfo->iter, pDelList, pReader);
  TSDBROW* piRow = getValidMemRow(&pBlockScanInfo->iiter, pDelList, pReader);
  ASSERT(pRow != NULL && piRow != NULL);

  int64_t tsLast = INT64_MIN;
  if (hasDataInLastBlock(pLastBlockReader)) {
    tsLast = getCurrentKeyInLastBlock(pLastBlockReader);
  }

  int64_t key = hasDataInFileBlock(pBlockData, pDumpInfo) ? pBlockData->aTSKEY[pDumpInfo->rowIndex] : INT64_MIN;

  TSDBKEY k = TSDBROW_KEY(pRow);
  TSDBKEY ik = TSDBROW_KEY(piRow);

  int64_t minKey = 0;
  if (ASCENDING_TRAVERSE(pReader->order)) {
    minKey = INT64_MAX;  // let's find the minimum
    if (minKey > k.ts) {
      minKey = k.ts;
    }

    if (minKey > ik.ts) {
      minKey = ik.ts;
    }

    if (minKey > key && hasDataInFileBlock(pBlockData, pDumpInfo)) {
      minKey = key;
    }

    if (minKey > tsLast && hasDataInLastBlock(pLastBlockReader)) {
      minKey = tsLast;
    }
  } else {
    minKey = INT64_MIN;  // let's find the maximum ts value
    if (minKey < k.ts) {
      minKey = k.ts;
    }

    if (minKey < ik.ts) {
      minKey = ik.ts;
    }

    if (minKey < key && hasDataInFileBlock(pBlockData, pDumpInfo)) {
      minKey = key;
    }

    if (minKey < tsLast && hasDataInLastBlock(pLastBlockReader)) {
      minKey = tsLast;
    }
  }

  // set only after tRowMergerInit() succeeded, so that the exit path knows whether `merge` must be cleared
  bool init = false;

  if (ASCENDING_TRAVERSE(pReader->order)) {
    if (minKey == key) {
      TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
      code = tRowMergerInit(&merge, &fRow, pReader->pSchema);
      if (code != TSDB_CODE_SUCCESS) {
        goto _err;
      }
      init = true;

      doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
    }

    if (minKey == tsLast) {
      TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
      if (init) {
        tRowMerge(&merge, &fRow1);
      } else {
        code = tRowMergerInit(&merge, &fRow1, pReader->pSchema);
        if (code != TSDB_CODE_SUCCESS) {
          goto _err;
        }
        init = true;
      }

      doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, &merge, &pReader->verRange);
    }

    if (minKey == ik.ts) {
      if (init) {
        tRowMerge(&merge, piRow);
      } else {
        STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(piRow), pReader, pBlockScanInfo->uid);
        if (pSchema == NULL) {
          // NOTE(review): `code` is still TSDB_CODE_SUCCESS here, so a failed schema lookup is reported
          // as success (behavior kept from the original) - confirm whether terrno should be returned.
          goto _err;
        }

        code = tRowMergerInit(&merge, piRow, pSchema);
        if (code != TSDB_CODE_SUCCESS) {
          goto _err;
        }
        init = true;
      }

      code = doMergeRowsInBuf(&pBlockScanInfo->iiter, pBlockScanInfo->uid, ik.ts, pBlockScanInfo->delSkyline, &merge,
                              pReader);
      if (code != TSDB_CODE_SUCCESS) {
        goto _err;
      }
    }

    if (minKey == k.ts) {
      if (init) {
        if (merge.pTSchema == NULL) {
          goto _err;
        }

        tRowMerge(&merge, pRow);
      } else {
        STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid);
        code = tRowMergerInit(&merge, pRow, pSchema);
        if (code != TSDB_CODE_SUCCESS) {
          goto _err;
        }
        init = true;
      }

      code = doMergeRowsInBuf(&pBlockScanInfo->iter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge,
                              pReader);
      if (code != TSDB_CODE_SUCCESS) {
        goto _err;
      }
    }
  } else {
    if (minKey == k.ts) {
      STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid);
      code = tRowMergerInit(&merge, pRow, pSchema);
      if (code != TSDB_CODE_SUCCESS) {
        goto _err;
      }
      init = true;

      code = doMergeRowsInBuf(&pBlockScanInfo->iter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge,
                              pReader);
      if (code != TSDB_CODE_SUCCESS) {
        goto _err;
      }
    }

    if (minKey == ik.ts) {
      if (init) {
        tRowMerge(&merge, piRow);
      } else {
        STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(piRow), pReader, pBlockScanInfo->uid);
        code = tRowMergerInit(&merge, piRow, pSchema);
        if (code != TSDB_CODE_SUCCESS) {
          goto _err;
        }
        init = true;
      }

      code = doMergeRowsInBuf(&pBlockScanInfo->iiter, pBlockScanInfo->uid, ik.ts, pBlockScanInfo->delSkyline, &merge,
                              pReader);
      if (code != TSDB_CODE_SUCCESS) {
        goto _err;
      }
    }

    if (minKey == tsLast) {
      TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
      if (init) {
        tRowMerge(&merge, &fRow1);
      } else {
        code = tRowMergerInit(&merge, &fRow1, pReader->pSchema);
        if (code != TSDB_CODE_SUCCESS) {
          goto _err;
        }
        init = true;
      }

      doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, &merge, &pReader->verRange);
    }

    if (minKey == key) {
      TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
      if (!init) {
        code = tRowMergerInit(&merge, &fRow, pReader->pSchema);
        if (code != TSDB_CODE_SUCCESS) {
          goto _err;
        }
        init = true;
      } else {
        if (merge.pTSchema == NULL) {
          goto _err;
        }

        tRowMerge(&merge, &fRow);
      }

      doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
    }
  }

  if (merge.pTSchema == NULL) {
    goto _err;
  }

  code = tRowMergerGetRow(&merge, &pTSRow);
  if (code != TSDB_CODE_SUCCESS) {
    goto _err;
  }

  doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo->uid);

  taosMemoryFree(pTSRow);
  tRowMergerClear(&merge);
  return code;

_err:
  // early exit: `code` is returned unchanged; release the merger if it was initialized
  if (init) {
    tRowMergerClear(&merge);
  }
  return code;
}

2092 2093 2094 2095 2096 2097 2098 2099 2100 2101 2102 2103 2104 2105 2106 2107 2108 2109 2110 2111 2112 2113 2114 2115 2116
// Create the mem and imem iterators of one table (once per table, guarded by iterInit) and initialize
// the delete skyline iterator.
// The start key is (window.skey, verRange.minVer) for ascending order and (window.ekey, verRange.maxVer)
// for descending order. Returns the error code of tsdbTbDataIterCreate() on failure; iterInit is left
// false in that case.
static int32_t initMemDataIterator(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader) {
  if (pBlockScanInfo->iterInit) {
    return TSDB_CODE_SUCCESS;
  }

  int32_t code = TSDB_CODE_SUCCESS;

  TSDBKEY startKey = {0};
  if (ASCENDING_TRAVERSE(pReader->order)) {
    startKey = (TSDBKEY){.ts = pReader->window.skey, .version = pReader->verRange.minVer};
  } else {
    startKey = (TSDBKEY){.ts = pReader->window.ekey, .version = pReader->verRange.maxVer};
  }

  int32_t backward = (!ASCENDING_TRAVERSE(pReader->order));

  // iterator over the mem table
  STbData* d = NULL;
  if (pReader->pReadSnap->pMem != NULL) {
    d = tsdbGetTbDataFromMemTable(pReader->pReadSnap->pMem, pReader->suid, pBlockScanInfo->uid);
    if (d != NULL) {
      code = tsdbTbDataIterCreate(d, &startKey, backward, &pBlockScanInfo->iter.iter);
      if (code == TSDB_CODE_SUCCESS) {
        pBlockScanInfo->iter.hasVal = (tsdbTbDataIterGet(pBlockScanInfo->iter.iter) != NULL);

        tsdbDebug("%p uid:%" PRIu64 ", check data in mem from skey:%" PRId64 ", order:%d, ts range in buf:%" PRId64
                  "-%" PRId64 " %s",
                  pReader, pBlockScanInfo->uid, startKey.ts, pReader->order, d->minKey, d->maxKey, pReader->idStr);
      } else {
        tsdbError("%p uid:%" PRIu64 ", failed to create iterator for mem, code:%s, %s", pReader, pBlockScanInfo->uid,
                  tstrerror(code), pReader->idStr);
        return code;
      }
    }
  } else {
    tsdbDebug("%p uid:%" PRIu64 ", no data in mem, %s", pReader, pBlockScanInfo->uid, pReader->idStr);
  }

  // iterator over the imem table
  STbData* di = NULL;
  if (pReader->pReadSnap->pIMem != NULL) {
    di = tsdbGetTbDataFromMemTable(pReader->pReadSnap->pIMem, pReader->suid, pBlockScanInfo->uid);
    if (di != NULL) {
      code = tsdbTbDataIterCreate(di, &startKey, backward, &pBlockScanInfo->iiter.iter);
      if (code == TSDB_CODE_SUCCESS) {
        pBlockScanInfo->iiter.hasVal = (tsdbTbDataIterGet(pBlockScanInfo->iiter.iter) != NULL);

        tsdbDebug("%p uid:%" PRIu64 ", check data in imem from skey:%" PRId64 ", order:%d, ts range in buf:%" PRId64
                  "-%" PRId64 " %s",
                  pReader, pBlockScanInfo->uid, startKey.ts, pReader->order, di->minKey, di->maxKey, pReader->idStr);
      } else {
        tsdbError("%p uid:%" PRIu64 ", failed to create iterator for imem, code:%s, %s", pReader, pBlockScanInfo->uid,
                  tstrerror(code), pReader->idStr);
        return code;
      }
    }
  } else {
    tsdbDebug("%p uid:%" PRIu64 ", no data in imem, %s", pReader, pBlockScanInfo->uid, pReader->idStr);
  }

  initDelSkylineIterator(pBlockScanInfo, pReader, d, di);

  pBlockScanInfo->iterInit = true;
  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
2156 2157
// Check whether the row at pDumpInfo->rowIndex of the file block can be returned for this table:
// it must belong to the table (for blocks carrying a uid column), lie inside the reader's version
// range and time window, and must not be reported as dropped by hasBeenDropped().
static bool isValidFileBlockRow(SBlockData* pBlockData, SFileBlockDumpInfo* pDumpInfo,
                                STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader) {
  // it is a multi-table data block: rows of other tables are skipped
  if (pBlockData->aUid != NULL && pBlockData->aUid[pDumpInfo->rowIndex] != pBlockScanInfo->uid) {
    return false;
  }

  // check for version range
  int64_t rowVer = pBlockData->aVersion[pDumpInfo->rowIndex];
  if (rowVer < pReader->verRange.minVer || rowVer > pReader->verRange.maxVer) {
    return false;
  }

  // check for time range
  int64_t rowTs = pBlockData->aTSKEY[pDumpInfo->rowIndex];
  if (rowTs < pReader->window.skey || rowTs > pReader->window.ekey) {
    return false;
  }

  TSDBKEY rowKey = {.ts = rowTs, .version = rowVer};
  return !hasBeenDropped(pBlockScanInfo->delSkyline, &pBlockScanInfo->fileDelIndex, &rowKey, pReader->order,
                         &pReader->verRange);
}

2186
// Bind the last-block reader to the table of pScanInfo and position it on the first valid row.
// Returns true when the reader is already bound to this table, or when a valid row was found after
// (re)opening the merge tree; returns false when the merge tree cannot be opened or holds no valid row.
static bool initLastBlockReader(SLastBlockReader* pLBlockReader, STableBlockScanInfo* pScanInfo, STsdbReader* pReader) {
  // the last block reader has been initialized for this table.
  if (pLBlockReader->uid == pScanInfo->uid) {
    return true;
  }

  // the reader was bound to another table before: close that merge tree first
  if (pLBlockReader->uid != 0) {
    tMergeTreeClose(&pLBlockReader->mergeTree);
  }

  initMemDataIterator(pScanInfo, pReader);
  pLBlockReader->uid = pScanInfo->uid;

  // start right after (ASC) or right before (DESC) the last key already handled for this table
  STimeWindow range = pLBlockReader->window;
  if (ASCENDING_TRAVERSE(pLBlockReader->order)) {
    range.skey = pScanInfo->lastKey + 1;
  } else {
    range.ekey = pScanInfo->lastKey - 1;
  }

  if (tMergeTreeOpen(&pLBlockReader->mergeTree, (pLBlockReader->order == TSDB_ORDER_DESC), pReader->pFileReader,
                     pReader->suid, pScanInfo->uid, &range, &pLBlockReader->verRange, pLBlockReader->pInfo, false,
                     pReader->idStr) != TSDB_CODE_SUCCESS) {
    return false;
  }

  return nextRowFromLastBlocks(pLBlockReader, pScanInfo, &pReader->verRange);
}

2217
// Timestamp of the row the last-block merge tree currently points at.
static int64_t getCurrentKeyInLastBlock(SLastBlockReader* pLastBlockReader) {
  TSDBROW curRow = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
  int64_t ts = TSDBROW_TS(&curRow);
  return ts;
}

H
Hongze Cheng 已提交
2222
// The last block has data as long as the merge tree holds a current iterator.
static bool hasDataInLastBlock(SLastBlockReader* pLastBlockReader) {
  if (pLastBlockReader->mergeTree.pIter == NULL) {
    return false;
  }

  return true;
}
2223 2224 2225 2226

bool hasDataInFileBlock(const SBlockData* pBlockData, const SFileBlockDumpInfo* pDumpInfo) {
  if (pBlockData->nRow > 0) {
    ASSERT(pBlockData->nRow == pDumpInfo->totalRows);
2227 2228
  }

2229
  return pBlockData->nRow > 0 && (!pDumpInfo->allDumped);
2230
}
2231

2232 2233
// Append the current file-block row (identified by the reader's dump info) to the result block.
//
// First tryCopyDistinctRowFromFileBlock() is given the chance to handle the row; when it reports success
// nothing else is done. Otherwise the row is fed into a row merger together with the rows collected by
// doMergeRowsInFileBlocks(), and the merged row is appended to pReader->pResBlock.
//
// Returns TSDB_CODE_SUCCESS, or the error code reported by the row merger.
int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pBlockScanInfo, int64_t key,
                              STsdbReader* pReader) {
  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
  if (tryCopyDistinctRowFromFileBlock(pReader, pBlockData, key, pDumpInfo)) {
    return TSDB_CODE_SUCCESS;
  }

  TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);

  STSRow*    pTSRow = NULL;
  SRowMerger merge = {0};

  int32_t code = tRowMergerInit(&merge, &fRow, pReader->pSchema);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
  code = tRowMergerGetRow(&merge, &pTSRow);
  if (code != TSDB_CODE_SUCCESS) {
    // the merger was initialized successfully above, release it on the failure path as well
    tRowMergerClear(&merge);
    return code;
  }

  doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo->uid);

  taosMemoryFree(pTSRow);
  tRowMergerClear(&merge);
  return TSDB_CODE_SUCCESS;
}

H
Haojun Liao 已提交
2262 2263
// Produce the next merged row(s) for one table by dispatching on which sources currently hold data:
//   - mem and imem both have a value  -> doMergeMultiLevelRows()
//   - only imem has a valid row       -> doMergeBufAndFileRows() with the imem iterator
//   - only mem has a valid row        -> doMergeBufAndFileRows() with the mem iterator
//   - neither                         -> mergeFileBlockAndLastBlock()
// The file-block key handed to the helpers is the timestamp at the current dump position, or INT64_MIN
// when the file block is empty or already fully dumped.
static int32_t buildComposedDataBlockImpl(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo,
                                          SBlockData* pBlockData, SLastBlockReader* pLastBlockReader) {
  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;

  int64_t key = INT64_MIN;
  if (pBlockData->nRow > 0 && (!pDumpInfo->allDumped)) {
    key = pBlockData->aTSKEY[pDumpInfo->rowIndex];
  }

  if (pBlockScanInfo->iter.hasVal && pBlockScanInfo->iiter.hasVal) {
    return doMergeMultiLevelRows(pReader, pBlockScanInfo, pBlockData, pLastBlockReader);
  }

  TSDBROW* pMemRow = NULL;
  TSDBROW* pImemRow = NULL;

  if (pBlockScanInfo->iter.hasVal) {
    pMemRow = getValidMemRow(&pBlockScanInfo->iter, pBlockScanInfo->delSkyline, pReader);
  }

  if (pBlockScanInfo->iiter.hasVal) {
    pImemRow = getValidMemRow(&pBlockScanInfo->iiter, pBlockScanInfo->delSkyline, pReader);
  }

  // hasVal is tested again on purpose: it is re-read after the getValidMemRow() calls above

  // imem + file + last block
  if (pBlockScanInfo->iiter.hasVal) {
    return doMergeBufAndFileRows(pReader, pBlockScanInfo, pImemRow, &pBlockScanInfo->iiter, key, pLastBlockReader);
  }

  // mem + file + last block
  if (pBlockScanInfo->iter.hasVal) {
    return doMergeBufAndFileRows(pReader, pBlockScanInfo, pMemRow, &pBlockScanInfo->iter, key, pLastBlockReader);
  }

  // files data blocks + last block
  return mergeFileBlockAndLastBlock(pReader, pLastBlockReader, key, pBlockScanInfo, pBlockData);
}

2294
// Fill pReader->pResBlock with rows merged from the current file block, the last block and the buffers.
//
// When a current file block exists, its table scan info is looked up by uid; an unknown uid is reported
// as TSDB_CODE_INVALID_PARA. A clean file block is copied to the result directly (in descending order only
// when the last block has no data). Otherwise rows are merged one step at a time until the sources are
// exhausted, the loaded file block is consumed, or the result block reaches pReader->capacity.
//
// On every path the result block uid/time window, the composed-block flag and the cost counters are updated.
// Returns TSDB_CODE_SUCCESS or the first error encountered.
static int32_t buildComposedDataBlock(STsdbReader* pReader) {
  int32_t code = TSDB_CODE_SUCCESS;

  SSDataBlock* pResBlock = pReader->pResBlock;

  SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(&pReader->status.blockIter);
  SLastBlockReader*   pLastBlockReader = pReader->status.fileIter.pLastBlockReader;

  int64_t st = taosGetTimestampUs();

  STableBlockScanInfo* pBlockScanInfo = NULL;
  if (pBlockInfo != NULL) {
    // the hash lookup yields NULL for an unknown uid, so the slot must be checked before it is dereferenced
    STableBlockScanInfo** ppScanInfo =
        taosHashGet(pReader->status.pTableMap, &pBlockInfo->uid, sizeof(pBlockInfo->uid));
    pBlockScanInfo = (ppScanInfo != NULL) ? *ppScanInfo : NULL;
    if (pBlockScanInfo == NULL) {
      code = TSDB_CODE_INVALID_PARA;
      tsdbError("failed to locate the uid:%" PRIu64 " in query table uid list, total tables:%d, %s", pBlockInfo->uid,
                taosHashGetSize(pReader->status.pTableMap), pReader->idStr);
      goto _end;
    }

    SDataBlk* pBlock = getCurrentBlock(&pReader->status.blockIter);
    TSDBKEY   keyInBuf = getCurrentKeyInBuf(pBlockScanInfo, pReader);

    // it is a clean block, load it directly
    if (isCleanFileDataBlock(pReader, pBlockInfo, pBlock, pBlockScanInfo, keyInBuf, pLastBlockReader)) {
      if (pReader->order == TSDB_ORDER_ASC ||
          (pReader->order == TSDB_ORDER_DESC && (!hasDataInLastBlock(pLastBlockReader)))) {
        copyBlockDataToSDataBlock(pReader, pBlockScanInfo);
        goto _end;
      }
    }
  } else {  // file blocks not exist
    pBlockScanInfo = *pReader->status.pTableIter;
  }

  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
  SBlockData*         pBlockData = &pReader->status.fileBlockData;
  int32_t             step = ASCENDING_TRAVERSE(pReader->order) ? 1 : -1;

  while (1) {
    bool hasBlockData = false;
    {
      while (pBlockData->nRow > 0) {  // find the first qualified row in data block
        if (isValidFileBlockRow(pBlockData, pDumpInfo, pBlockScanInfo, pReader)) {
          hasBlockData = true;
          break;
        }

        pDumpInfo->rowIndex += step;

        SDataBlk* pBlock = getCurrentBlock(&pReader->status.blockIter);
        if (pDumpInfo->rowIndex >= pBlock->nRow || pDumpInfo->rowIndex < 0) {
          setBlockAllDumped(pDumpInfo, pBlock->maxKey.ts, pReader->order);
          break;
        }
      }
    }

    bool hasBlockLData = hasDataInLastBlock(pLastBlockReader);

    // no data in last block and block, no need to proceed.
    if ((hasBlockData == false) && (hasBlockLData == false)) {
      break;
    }

    // stop on the first merge failure instead of silently dropping the error code
    code = buildComposedDataBlockImpl(pReader, pBlockScanInfo, pBlockData, pLastBlockReader);
    if (code != TSDB_CODE_SUCCESS) {
      break;
    }

    // currently loaded file data block is consumed
    if ((pBlockData->nRow > 0) && (pDumpInfo->rowIndex >= pBlockData->nRow || pDumpInfo->rowIndex < 0)) {
      SDataBlk* pBlock = getCurrentBlock(&pReader->status.blockIter);
      setBlockAllDumped(pDumpInfo, pBlock->maxKey.ts, pReader->order);
      break;
    }

    if (pResBlock->info.rows >= pReader->capacity) {
      break;
    }
  }

_end:
  pResBlock->info.uid = (pBlockScanInfo != NULL) ? pBlockScanInfo->uid : 0;
  blockDataUpdateTsWindow(pResBlock, 0);

  setComposedBlockFlag(pReader, true);
  double el = (taosGetTimestampUs() - st) / 1000.0;

  pReader->cost.composedBlocks += 1;
  pReader->cost.buildComposedBlockTime += el;

  if (pResBlock->info.rows > 0) {
    tsdbDebug("%p uid:%" PRIu64 ", composed data block created, brange:%" PRIu64 "-%" PRIu64
                  " rows:%d, elapsed time:%.2f ms %s",
              pReader, pResBlock->info.uid, pResBlock->info.window.skey, pResBlock->info.window.ekey,
              pResBlock->info.rows, el, pReader->idStr);
  }

  return code;
}

// Record in the reader status whether the block currently being returned is a composed data block.
void setComposedBlockFlag(STsdbReader* pReader, bool composed) {
  SReaderStatus* pStatus = &pReader->status;
  pStatus->composedDataBlock = composed;
}

dengyihao's avatar
dengyihao 已提交
2395 2396
// Build the delete skyline of one table and reset the delete indexes that walk it.
//
// Delete records are collected from the delete file of the read snapshot (if any) and from the pHead lists
// of the mem/imem table data (either may be NULL). When at least one record exists, the skyline is built
// into pBlockScanInfo->delSkyline. The mem, imem, file-block and last-block delete indexes are then set
// to the first skyline entry for ascending scans, or the last one for descending scans.
//
// Does nothing and returns TSDB_CODE_SUCCESS when the skyline already exists.
// Returns TSDB_CODE_OUT_OF_MEMORY on allocation failure, or the error code of the failing tsdb call.
int32_t initDelSkylineIterator(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader, STbData* pMemTbData,
                               STbData* piMemTbData) {
  if (pBlockScanInfo->delSkyline != NULL) {
    return TSDB_CODE_SUCCESS;
  }

  int32_t code = 0;
  STsdb*  pTsdb = pReader->pTsdb;

  SArray* pDelData = taosArrayInit(4, sizeof(SDelData));
  if (pDelData == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  SDelFile* pDelFile = pReader->pReadSnap->fs.pDelFile;
  if (pDelFile) {
    SDelFReader* pDelFReader = NULL;
    code = tsdbDelFReaderOpen(&pDelFReader, pDelFile, pTsdb);
    if (code != TSDB_CODE_SUCCESS) {
      goto _err;
    }

    SArray* aDelIdx = taosArrayInit(4, sizeof(SDelIdx));
    if (aDelIdx == NULL) {
      // report the failure; leaving code at 0 would make the caller believe the skyline is ready
      code = TSDB_CODE_OUT_OF_MEMORY;
      tsdbDelFReaderClose(&pDelFReader);
      goto _err;
    }

    code = tsdbReadDelIdx(pDelFReader, aDelIdx);
    if (code != TSDB_CODE_SUCCESS) {
      taosArrayDestroy(aDelIdx);
      tsdbDelFReaderClose(&pDelFReader);
      goto _err;
    }

    SDelIdx  idx = {.suid = pReader->suid, .uid = pBlockScanInfo->uid};
    SDelIdx* pIdx = taosArraySearch(aDelIdx, &idx, tCmprDelIdx, TD_EQ);

    if (pIdx != NULL) {
      code = tsdbReadDelData(pDelFReader, pIdx, pDelData);
    }

    taosArrayDestroy(aDelIdx);
    tsdbDelFReaderClose(&pDelFReader);

    if (code != TSDB_CODE_SUCCESS) {
      goto _err;
    }
  }

  // append the delete records kept in the mem and imem table data
  SDelData* p = NULL;
  if (pMemTbData != NULL) {
    p = pMemTbData->pHead;
    while (p) {
      taosArrayPush(pDelData, p);
      p = p->pNext;
    }
  }

  if (piMemTbData != NULL) {
    p = piMemTbData->pHead;
    while (p) {
      taosArrayPush(pDelData, p);
      p = p->pNext;
    }
  }

  if (taosArrayGetSize(pDelData) > 0) {
    pBlockScanInfo->delSkyline = taosArrayInit(4, sizeof(TSDBKEY));
    if (pBlockScanInfo->delSkyline == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto _err;
    }

    code = tsdbBuildDeleteSkyline(pDelData, 0, (int32_t)(taosArrayGetSize(pDelData) - 1), pBlockScanInfo->delSkyline);
  }

  taosArrayDestroy(pDelData);

  // start from the first skyline entry in ascending order and from the last one in descending order;
  // the size is narrowed before the subtraction so that an empty skyline yields -1 explicitly
  pBlockScanInfo->iter.index =
      ASCENDING_TRAVERSE(pReader->order) ? 0 : (int32_t)taosArrayGetSize(pBlockScanInfo->delSkyline) - 1;
  pBlockScanInfo->iiter.index = pBlockScanInfo->iter.index;
  pBlockScanInfo->fileDelIndex = pBlockScanInfo->iter.index;
  pBlockScanInfo->lastBlockDelIndex = pBlockScanInfo->iter.index;
  return code;

_err:
  taosArrayDestroy(pDelData);
  return code;
}

H
Haojun Liao 已提交
2477
// Return the key of the next valid buffered row (mem or imem) of a table in scan order.
//
// When both buffers have a valid row, the smaller timestamp wins in ascending scans and the larger one in
// descending scans; on equal timestamps the mem key is kept. When only one buffer has a valid row, its key
// is returned. When neither has one, the returned key has ts == TSKEY_INITIAL_VAL.
TSDBKEY getCurrentKeyInBuf(STableBlockScanInfo* pScanInfo, STsdbReader* pReader) {
  TSDBKEY key = {.ts = TSKEY_INITIAL_VAL};
  bool    hasKey = false;

  TSDBROW* pRow = getValidMemRow(&pScanInfo->iter, pScanInfo->delSkyline, pReader);
  if (pRow != NULL) {
    key = TSDBROW_KEY(pRow);
    hasKey = true;
  }

  pRow = getValidMemRow(&pScanInfo->iiter, pScanInfo->delSkyline, pReader);
  if (pRow != NULL) {
    TSDBKEY ikey = TSDBROW_KEY(pRow);

    if (!hasKey) {
      // only imem holds data: comparing against the initial sentinel would never select this key
      key = ikey;
    } else if (ASCENDING_TRAVERSE(pReader->order) ? (ikey.ts < key.ts) : (ikey.ts > key.ts)) {
      key = ikey;
    }
  }

  return key;
}

2495
// Advance the fileset iterator until a fileset that contributes data blocks or last files is found.
//
// pBlockNum is reset to zero first and is filled in by doLoadFileBlock() for each candidate fileset.
// The loop ends when the iterator is exhausted, when a fileset yields at least one block/last file,
// or when loading fails.
//
// Returns TSDB_CODE_SUCCESS (also when no more filesets exist), TSDB_CODE_OUT_OF_MEMORY when the block
// index list cannot be allocated, or the error code of the failing load call.
static int32_t moveToNextFile(STsdbReader* pReader, SBlockNumber* pBlockNum) {
  SReaderStatus* pStatus = &pReader->status;
  pBlockNum->numOfBlocks = 0;
  pBlockNum->numOfLastFiles = 0;

  size_t  numOfTables = taosHashGetSize(pReader->status.pTableMap);
  SArray* pIndexList = taosArrayInit(numOfTables, sizeof(SBlockIdx));
  if (pIndexList == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  int32_t code = TSDB_CODE_SUCCESS;
  while (1) {
    bool hasNext = filesetIteratorNext(&pStatus->fileIter, pReader);
    if (!hasNext) {  // no data files on disk
      break;
    }

    taosArrayClear(pIndexList);
    code = doLoadBlockIndex(pReader, pReader->pFileReader, pIndexList);
    if (code != TSDB_CODE_SUCCESS) {
      break;
    }

    if (taosArrayGetSize(pIndexList) > 0 || pReader->pFileReader->pSet->nSttF > 0) {
      code = doLoadFileBlock(pReader, pIndexList, pBlockNum);
      if (code != TSDB_CODE_SUCCESS) {
        break;
      }

      if (pBlockNum->numOfBlocks + pBlockNum->numOfLastFiles > 0) {
        break;
      }
    }

    // no blocks in current file, try next files
  }

  // single exit: the index list is released exactly once on every path
  taosArrayDestroy(pIndexList);
  return code;
}

2535
// Three-way comparator for two uint64_t table uids (qsort-style signature).
// Returns 0 when the values are equal, -1 when *p1 < *p2 and 1 otherwise.
static int32_t uidComparFunc(const void* p1, const void* p2) {
  // keep the const qualifier of the arguments instead of casting it away
  const uint64_t lhs = *(const uint64_t*)p1;
  const uint64_t rhs = *(const uint64_t*)p2;

  if (lhs == rhs) {
    return 0;
  }

  return (lhs < rhs) ? -1 : 1;
}
2544

2545
// Copy the uid of every table in pStatus->pTableMap into pOrderCheckInfo->tableUidList and sort the list
// with uidComparFunc. The caller provides a list large enough for all tables in the map.
static void extractOrderedTableUidList(SUidOrderCheckInfo* pOrderCheckInfo, SReaderStatus* pStatus) {
  int32_t numOfTables = taosHashGetSize(pStatus->pTableMap);
  int32_t pos = 0;

  for (void* pIter = taosHashIterate(pStatus->pTableMap, NULL); pIter != NULL;
       pIter = taosHashIterate(pStatus->pTableMap, pIter)) {
    STableBlockScanInfo* pInfo = *(STableBlockScanInfo**)pIter;
    pOrderCheckInfo->tableUidList[pos] = pInfo->uid;
    pos += 1;
  }

  taosSort(pOrderCheckInfo->tableUidList, numOfTables, sizeof(uint64_t), uidComparFunc);
}

2559
// Make sure the ordered uid list exists and pStatus->pTableIter points at a table to start from.
//
//  - empty table map: nothing to do;
//  - no uid list yet: allocate it, fill and sort it, and position pTableIter on the first uid;
//  - uid list exists and pTableIter is set: nothing to do;
//  - uid list exists and pTableIter is NULL: restart from the first uid; if that uid is no longer in the
//    table map, the list is resized to the current table count, rebuilt, and the first uid is used.
//
// Returns TSDB_CODE_OUT_OF_MEMORY when the list cannot be (re)allocated, TSDB_CODE_SUCCESS otherwise.
static int32_t initOrderCheckInfo(SUidOrderCheckInfo* pOrderCheckInfo, SReaderStatus* pStatus) {
  int32_t numOfTables = taosHashGetSize(pStatus->pTableMap);
  if (numOfTables == 0) {
    return TSDB_CODE_SUCCESS;
  }

  uint64_t uid = 0;

  if (pOrderCheckInfo->tableUidList == NULL) {
    pOrderCheckInfo->currentIndex = 0;
    pOrderCheckInfo->tableUidList = taosMemoryMalloc(numOfTables * sizeof(uint64_t));
    if (pOrderCheckInfo->tableUidList == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }

    extractOrderedTableUidList(pOrderCheckInfo, pStatus);
    uid = pOrderCheckInfo->tableUidList[0];
    pStatus->pTableIter = taosHashGet(pStatus->pTableMap, &uid, sizeof(uid));
    return TSDB_CODE_SUCCESS;
  }

  if (pStatus->pTableIter != NULL) {
    return TSDB_CODE_SUCCESS;
  }

  // it is the last block of a new file
  pOrderCheckInfo->currentIndex = 0;
  uid = pOrderCheckInfo->tableUidList[pOrderCheckInfo->currentIndex];
  pStatus->pTableIter = taosHashGet(pStatus->pTableMap, &uid, sizeof(uid));
  if (pStatus->pTableIter != NULL) {
    return TSDB_CODE_SUCCESS;
  }

  // the tableMap has already updated
  void* pNewList = taosMemoryRealloc(pOrderCheckInfo->tableUidList, numOfTables * sizeof(uint64_t));
  if (pNewList == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  pOrderCheckInfo->tableUidList = pNewList;
  extractOrderedTableUidList(pOrderCheckInfo, pStatus);

  uid = pOrderCheckInfo->tableUidList[0];
  pStatus->pTableIter = taosHashGet(pStatus->pTableMap, &uid, sizeof(uid));
  return TSDB_CODE_SUCCESS;
}

2600
// Step to the next uid of the ordered uid list and point pStatus->pTableIter at its table map entry.
// Returns false, with pTableIter cleared, once the index reaches the number of tables in the map.
static bool moveToNextTable(SUidOrderCheckInfo* pOrderedCheckInfo, SReaderStatus* pStatus) {
  ++pOrderedCheckInfo->currentIndex;

  if (pOrderedCheckInfo->currentIndex < taosHashGetSize(pStatus->pTableMap)) {
    uint64_t nextUid = pOrderedCheckInfo->tableUidList[pOrderedCheckInfo->currentIndex];
    pStatus->pTableIter = taosHashGet(pStatus->pTableMap, &nextUid, sizeof(nextUid));
    ASSERT(pStatus->pTableIter != NULL);
    return true;
  }

  pStatus->pTableIter = NULL;
  return false;
}

2613
// Walk the tables in uid order and build a result block from the last-block data of the first table
// that yields rows.
//
// Returns TSDB_CODE_SUCCESS when a non-empty result block was produced or when all tables were visited
// (the result block is then empty); returns the error code of initOrderCheckInfo()/doBuildDataBlock()
// on failure. An empty table map returns immediately.
static int32_t doLoadLastBlockSequentially(STsdbReader* pReader) {
  SReaderStatus*      pStatus = &pReader->status;
  SLastBlockReader*   pLastBlockReader = pStatus->fileIter.pLastBlockReader;
  SUidOrderCheckInfo* pOrderedCheckInfo = &pStatus->uidCheckInfo;

  int32_t code = initOrderCheckInfo(pOrderedCheckInfo, pStatus);
  if (code != TSDB_CODE_SUCCESS || (taosHashGetSize(pStatus->pTableMap) == 0)) {
    return code;
  }

  for (;;) {
    // load the last data block of current table
    STableBlockScanInfo* pScanInfo = *(STableBlockScanInfo**)pStatus->pTableIter;

    if (initLastBlockReader(pLastBlockReader, pScanInfo, pReader)) {
      code = doBuildDataBlock(pReader);
      if (code != TSDB_CODE_SUCCESS) {
        return code;
      }

      if (pReader->pResBlock->info.rows > 0) {
        return TSDB_CODE_SUCCESS;
      }
    }

    // current table has nothing (more) to return, let's try next table
    if (!moveToNextTable(pOrderedCheckInfo, pStatus)) {
      return TSDB_CODE_SUCCESS;
    }
  }
}

2652
// Build the next result block for the current position of the reader.
//
// The table scan info is taken from the table map (by the uid of the current file block) or, when there is
// no current file block, from pReader->status.pTableIter. If neither yields a scan info,
// TSDB_CODE_INVALID_PARA is returned. Then one of the following is done:
//   - no file block: build a composed block from the last files;
//   - fileBlockShouldLoad(): load the file block data and build a composed block;
//   - bufferDataInFileBlockGap(): build the block from the buffer up to the file block boundary;
//   - descending scan with data in the last block: build a composed block from the last block only;
//   - otherwise: hand out the whole file block by filling only the result block info.
static int32_t doBuildDataBlock(STsdbReader* pReader) {
  int32_t   code = TSDB_CODE_SUCCESS;
  SDataBlk* pBlock = NULL;

  SReaderStatus*       pStatus = &pReader->status;
  SDataBlockIter*      pBlockIter = &pStatus->blockIter;
  STableBlockScanInfo* pScanInfo = NULL;
  SFileDataBlockInfo*  pBlockInfo = getCurrentBlockInfo(pBlockIter);
  SLastBlockReader*    pLastBlockReader = pReader->status.fileIter.pLastBlockReader;

  if (pBlockInfo != NULL) {
    // the hash lookup yields NULL for an unknown uid, so the slot must be checked before it is dereferenced
    STableBlockScanInfo** ppScanInfo =
        taosHashGet(pReader->status.pTableMap, &pBlockInfo->uid, sizeof(pBlockInfo->uid));
    if (ppScanInfo != NULL) {
      pScanInfo = *ppScanInfo;
    }
  } else if (pReader->status.pTableIter != NULL) {
    pScanInfo = *pReader->status.pTableIter;
  }

  if (pScanInfo == NULL) {
    tsdbError("failed to get table scan-info, %s", pReader->idStr);
    code = TSDB_CODE_INVALID_PARA;
    return code;
  }

  if (pBlockInfo != NULL) {
    pBlock = getCurrentBlock(pBlockIter);
  }

  initLastBlockReader(pLastBlockReader, pScanInfo, pReader);
  TSDBKEY keyInBuf = getCurrentKeyInBuf(pScanInfo, pReader);

  if (pBlockInfo == NULL) {  // build data block from last data file
    ASSERT(pBlockIter->numOfBlocks == 0);
    code = buildComposedDataBlock(pReader);
  } else if (fileBlockShouldLoad(pReader, pBlockInfo, pBlock, pScanInfo, keyInBuf, pLastBlockReader)) {
    code = doLoadFileBlockData(pReader, pBlockIter, &pStatus->fileBlockData, pScanInfo->uid);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    // build composed data block
    code = buildComposedDataBlock(pReader);
  } else if (bufferDataInFileBlockGap(pReader->order, keyInBuf, pBlock)) {
    // data in memory that are earlier than current file block
    // rows in buffer should be less than the file block in asc, greater than file block in desc
    int64_t endKey = (ASCENDING_TRAVERSE(pReader->order)) ? pBlock->minKey.ts : pBlock->maxKey.ts;
    code = buildDataBlockFromBuf(pReader, pScanInfo, endKey);
  } else {
    if (hasDataInLastBlock(pLastBlockReader) && !ASCENDING_TRAVERSE(pReader->order)) {
      // only return the rows in last block
      int64_t tsLast = getCurrentKeyInLastBlock(pLastBlockReader);
      ASSERT(tsLast >= pBlock->maxKey.ts);
      tBlockDataReset(&pReader->status.fileBlockData);

      tsdbDebug("load data in last block firstly, due to desc scan data, %s", pReader->idStr);
      code = buildComposedDataBlock(pReader);
    } else {  // whole block is required, return it directly
      SDataBlockInfo* pInfo = &pReader->pResBlock->info;
      pInfo->rows = pBlock->nRow;
      pInfo->uid = pScanInfo->uid;
      pInfo->window = (STimeWindow){.skey = pBlock->minKey.ts, .ekey = pBlock->maxKey.ts};
      setComposedBlockFlag(pReader, false);
      setBlockAllDumped(&pStatus->fBlockDumpInfo, pBlock->maxKey.ts, pReader->order);
    }
  }

  return code;
}

H
Haojun Liao 已提交
2719
// Iterate over the table map and build a result block from the in-memory buffers of the first table
// that yields rows. Iteration resumes from pStatus->pTableIter when it is set, otherwise it starts at
// the beginning of the map.
//
// Returns TSDB_CODE_SUCCESS when rows were produced or when the map is exhausted (result block empty),
// or the error code of buildDataBlockFromBuf().
static int32_t buildBlockFromBufferSequentially(STsdbReader* pReader) {
  SReaderStatus* pStatus = &pReader->status;

  // no file block bounds the buffer scan here: read to the far end of the key range in scan direction
  int64_t endKey = (ASCENDING_TRAVERSE(pReader->order)) ? INT64_MAX : INT64_MIN;

  if (pStatus->pTableIter == NULL) {
    pStatus->pTableIter = taosHashIterate(pStatus->pTableMap, NULL);
  }

  while (pStatus->pTableIter != NULL) {
    STableBlockScanInfo** ppScanInfo = pStatus->pTableIter;
    initMemDataIterator(*ppScanInfo, pReader);

    int32_t code = buildDataBlockFromBuf(pReader, *ppScanInfo, endKey);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    if (pReader->pResBlock->info.rows > 0) {
      return TSDB_CODE_SUCCESS;
    }

    // current table is exhausted, let's try the next table
    pStatus->pTableIter = taosHashIterate(pStatus->pTableMap, pStatus->pTableIter);
  }

  return TSDB_CODE_SUCCESS;
}

2751
// set the correct start position in case of the first/last file block, according to the query time window
2752
static void initBlockDumpInfo(STsdbReader* pReader, SDataBlockIter* pBlockIter) {
H
Hongze Cheng 已提交
2753
  SDataBlk* pBlock = getCurrentBlock(pBlockIter);
2754

2755 2756 2757
  SReaderStatus* pStatus = &pReader->status;

  SFileBlockDumpInfo* pDumpInfo = &pStatus->fBlockDumpInfo;
2758 2759 2760

  pDumpInfo->totalRows = pBlock->nRow;
  pDumpInfo->allDumped = false;
2761
  pDumpInfo->rowIndex = ASCENDING_TRAVERSE(pReader->order) ? 0 : pBlock->nRow - 1;
2762 2763
}

2764
// Move to the next fileset that has data and prepare the block iterator and dump info for it.
//
// When no fileset with data blocks or last files is left, pReader->status.loadFromFile is cleared and
// the function returns. When the fileset has data blocks, the block iterator is initialized for them;
// when it only has last files, the loaded file block data and the block iterator are reset instead.
// Returns the error code of moveToNextFile()/initBlockIterator(), or TSDB_CODE_SUCCESS.
static int32_t initForFirstBlockInFile(STsdbReader* pReader, SDataBlockIter* pBlockIter) {
  SBlockNumber blockNum = {0};

  int32_t code = moveToNextFile(pReader, &blockNum);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  // all data files are consumed, try data in buffer
  if (blockNum.numOfBlocks + blockNum.numOfLastFiles == 0) {
    pReader->status.loadFromFile = false;
    return TSDB_CODE_SUCCESS;
  }

  // initialize the block iterator for a new fileset
  if (blockNum.numOfBlocks <= 0) {  // no block data, only last block exists
    tBlockDataReset(&pReader->status.fileBlockData);
    resetDataBlockIterator(pBlockIter, pReader->order);
  } else {
    code = initBlockIterator(pReader, pBlockIter, blockNum.numOfBlocks);
  }

  // position the dump cursor on the current block, even if the iterator setup reported an error
  initBlockDumpInfo(pReader, pBlockIter);
  return code;
}

2791
// True when the current file block has been started but not finished: it is not marked as fully dumped
// and the row cursor has moved off its starting row (row 0 when asc, row totalRows - 1 otherwise).
static bool fileBlockPartiallyRead(SFileBlockDumpInfo* pDumpInfo, bool asc) {
  if (pDumpInfo->allDumped) {
    return false;
  }

  if (asc) {
    return pDumpInfo->rowIndex > 0;
  }

  return pDumpInfo->rowIndex < (pDumpInfo->totalRows - 1);
}

2796
// Produce the next non-empty result block from the data files.
//
// Two phases alternate until rows are produced, an error occurs, or all files are consumed
// (pReader->status.loadFromFile becomes false):
//   - "_begin": the current fileset has no data blocks (left), so the last files are read table by table;
//     once every table was visited the reader moves on to the next fileset;
//   - main loop: the data blocks of the current fileset are read one after another; a partially read block
//     is continued with buildComposedDataBlock(), a finished one advances the block iterator, and an
//     exhausted iterator leads either to the last files of the same fileset or to the next fileset.
static int32_t buildBlockFromFiles(STsdbReader* pReader) {
  int32_t code = TSDB_CODE_SUCCESS;
  bool    asc = ASCENDING_TRAVERSE(pReader->order);

  SDataBlockIter*     pBlockIter = &pReader->status.blockIter;
  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;

  if (pBlockIter->numOfBlocks == 0) {
  _begin:
    code = doLoadLastBlockSequentially(pReader);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    if (pReader->pResBlock->info.rows > 0) {
      return TSDB_CODE_SUCCESS;
    }

    // all data blocks are checked in this last block file, now let's try the next file
    if (pReader->status.pTableIter == NULL) {
      code = initForFirstBlockInFile(pReader, pBlockIter);

      // error happens or all the data files are completely checked
      if ((code != TSDB_CODE_SUCCESS) || (!pReader->status.loadFromFile)) {
        return code;
      }

      // this file does not have data files, let's start check the last block file if exists
      if (pBlockIter->numOfBlocks == 0) {
        goto _begin;
      }
    }

    code = doBuildDataBlock(pReader);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    if (pReader->pResBlock->info.rows > 0) {
      return TSDB_CODE_SUCCESS;
    }
  }

  for (;;) {
    if (fileBlockPartiallyRead(pDumpInfo, asc)) {  // file data block is partially loaded
      code = buildComposedDataBlock(pReader);
    } else {
      // current block are exhausted, try the next file block
      if (pDumpInfo->allDumped) {
        // try next data block in current file
        if (blockIteratorNext(pBlockIter, pReader->idStr)) {
          // check for the next block in the block accessed order list
          initBlockDumpInfo(pReader, pBlockIter);
        } else if (pReader->status.pCurrentFileset->nSttF > 0) {
          // data blocks in current file are exhausted, let's try the next file now
          tBlockDataReset(&pReader->status.fileBlockData);
          resetDataBlockIterator(pBlockIter, pReader->order);
          goto _begin;
        } else {
          code = initForFirstBlockInFile(pReader, pBlockIter);

          // error happens or all the data files are completely checked
          if ((code != TSDB_CODE_SUCCESS) || (!pReader->status.loadFromFile)) {
            return code;
          }

          // this file does not have blocks, let's start check the last block file
          if (pBlockIter->numOfBlocks == 0) {
            goto _begin;
          }
        }
      }

      code = doBuildDataBlock(pReader);
    }

    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    if (pReader->pResBlock->info.rows > 0) {
      return TSDB_CODE_SUCCESS;
    }
  }
}
H
refact  
Hongze Cheng 已提交
2884

2885 2886
// Select the tsdb instance that serves a query starting at winSKey.
//
// For a vnode that is not an rsma vnode, VND_TSDB(pVnode) is returned and *pLevel is left untouched.
// For an rsma vnode, the retention levels are examined in order: the first level whose keep period still
// covers winSKey (widened by tsQueryRsmaTolerance, scaled to the vnode's time precision) is chosen; a level
// with keep <= 0 ends the search and falls back to the previous level. The chosen level is stored in
// *pLevel and the matching rsma tsdb is returned.
//
// idStr is only used for logging and may be NULL.
static STsdb* getTsdbByRetentions(SVnode* pVnode, TSKEY winSKey, SRetention* retentions, const char* idStr,
                                  int8_t* pLevel) {
  if (!VND_IS_RSMA(pVnode)) {
    return VND_TSDB(pVnode);
  }

  int8_t  level = 0;
  int8_t  precision = pVnode->config.tsdbCfg.precision;
  int64_t now = taosGetTimestamp(precision);

  // scale factor for the tolerance: 1 for milli, 1000 for micro, 1000000 for any other precision
  int64_t scale = 1000000;
  if (precision == TSDB_TIME_PRECISION_MILLI) {
    scale = 1;
  } else if (precision == TSDB_TIME_PRECISION_MICRO) {
    scale = 1000;
  }

  // multiply in 64 bits: `long` is only 32 bits wide on LLP64 targets and the product could overflow
  int64_t offset = (int64_t)tsQueryRsmaTolerance * scale;

  for (int8_t i = 0; i < TSDB_RETENTION_MAX; ++i) {
    SRetention* pRetention = retentions + level;
    if (pRetention->keep <= 0) {
      if (level > 0) {
        --level;
      }
      break;
    }
    if ((now - pRetention->keep) <= (winSKey + offset)) {
      break;
    }
    ++level;
  }

  const char* str = (idStr != NULL) ? idStr : "";

  if (level == TSDB_RETENTION_L0) {
    *pLevel = TSDB_RETENTION_L0;
    tsdbDebug("vgId:%d, rsma level %d is selected to query %s", TD_VID(pVnode), TSDB_RETENTION_L0, str);
    return VND_RSMA0(pVnode);
  } else if (level == TSDB_RETENTION_L1) {
    *pLevel = TSDB_RETENTION_L1;
    tsdbDebug("vgId:%d, rsma level %d is selected to query %s", TD_VID(pVnode), TSDB_RETENTION_L1, str);
    return VND_RSMA1(pVnode);
  } else {
    *pLevel = TSDB_RETENTION_L2;
    tsdbDebug("vgId:%d, rsma level %d is selected to query %s", TD_VID(pVnode), TSDB_RETENTION_L2, str);
    return VND_RSMA2(pVnode);
  }
}

H
Haojun Liao 已提交
2929
// Compute the version range a query may read.
//   minVer: pCond->startVersion, or 0 when it is -1;
//   maxVer: pCond->endVersion capped at the vnode's applied version, or the applied version itself when
//           endVersion is -1.
// The level argument is not used by this function.
SVersionRange getQueryVerRange(SVnode* pVnode, SQueryTableDataCond* pCond, int8_t level) {
  int64_t minVer = pCond->startVersion;
  if (pCond->startVersion == -1) {
    minVer = 0;
  }

  int64_t maxVer = 0;
  if (pCond->endVersion == -1) {
    // user not specified end version, set current maximum version of vnode as the endVersion
    maxVer = pVnode->state.applied;
  } else if (pCond->endVersion > pVnode->state.applied) {
    maxVer = pVnode->state.applied;
  } else {
    maxVer = pCond->endVersion;
  }

  SVersionRange range = {.minVer = minVer, .maxVer = maxVer};
  return range;
}

2943
/*
 * Decide whether the row identified by pKey is covered by the delete list.
 *
 * pDelList   array of TSDBKEY entries (NULL means nothing is deleted). Two adjacent entries are treated as the ends
 *            of a range; the version stored in the lower-index entry is the one compared with the row version.
 *            NOTE(review): the list is presumably ordered by ts - confirm against the code that builds it.
 * index      in/out cursor into pDelList; it is moved forward (ascending) or backward (descending) while searching
 * pKey       timestamp/version of the row being checked, must not be NULL
 * order      traverse order, interpreted through ASCENDING_TRAVERSE()
 * pVerRange  query version range; only maxVer is read, and only in the ascending branch
 *
 * Returns true when the row is considered deleted.
 */
bool hasBeenDropped(const SArray* pDelList, int32_t* index, TSDBKEY* pKey, int32_t order, SVersionRange* pVerRange) {
  ASSERT(pKey != NULL);
  if (pDelList == NULL) {
    return false;
  }

  size_t num = taosArrayGetSize(pDelList);

  if (ASCENDING_TRAVERSE(order)) {
    // the cursor already points at (or beyond) the last entry
    if (*index >= num - 1) {
      TSDBKEY* last = taosArrayGetLast(pDelList);
      ASSERT(pKey->ts >= last->ts);

      if (pKey->ts == last->ts) {
        TSDBKEY* prev = taosArrayGet(pDelList, num - 2);
        return (prev->version >= pKey->version);
      }

      return false;
    }

    TSDBKEY* pLower = taosArrayGet(pDelList, *index);
    TSDBKEY* pUpper = taosArrayGet(pDelList, (*index) + 1);

    // the row is located before the current range
    if (pKey->ts < pLower->ts) {
      return false;
    }

    if (pLower->ts <= pKey->ts && pUpper->ts >= pKey->ts && pLower->version >= pKey->version &&
        pVerRange->maxVer >= pLower->version) {
      return true;
    }

    // move the cursor forward until a range that may contain the row is found
    while (pUpper->ts <= pKey->ts && (*index) < num - 1) {
      (*index) += 1;

      // the cursor reached the last entry, there is no range left to check
      if ((*index) >= num - 1) {
        break;
      }

      pLower = taosArrayGet(pDelList, *index);
      pUpper = taosArrayGet(pDelList, (*index) + 1);

      // it is not a consecutive deletion range, ignore it
      if (pLower->version == 0 && pUpper->version > 0) {
        continue;
      }

      if (pLower->ts <= pKey->ts && pUpper->ts >= pKey->ts && pLower->version >= pKey->version &&
          pVerRange->maxVer >= pLower->version) {
        return true;
      }
    }

    return false;
  }

  // descending traverse
  if (*index <= 0) {
    TSDBKEY* pFirst = taosArrayGet(pDelList, 0);

    if (pKey->ts == pFirst->ts) {
      return pFirst->version >= pKey->version;
    }

    // a row located after the first entry is not expected once the cursor is at the head
    if (pKey->ts > pFirst->ts) {
      ASSERT(0);
    }

    return false;
  }

  TSDBKEY* pUpper = taosArrayGet(pDelList, *index);
  TSDBKEY* pLower = taosArrayGet(pDelList, (*index) - 1);

  // the row is located after the current range
  if (pKey->ts > pUpper->ts) {
    return false;
  }

  if (pLower->ts <= pKey->ts && pUpper->ts >= pKey->ts && pLower->version >= pKey->version) {
    return true;
  }

  // move the cursor backward; it stays >= 1 because the loop only runs while it is > 1
  while (pLower->ts >= pKey->ts && (*index) > 1) {
    (*index) -= 1;

    pUpper = taosArrayGet(pDelList, *index);
    pLower = taosArrayGet(pDelList, (*index) - 1);

    // it is not a consecutive deletion range, ignore it
    if (pUpper->version > 0 && pLower->version == 0) {
      continue;
    }

    if (pLower->ts <= pKey->ts && pUpper->ts >= pKey->ts && pLower->version >= pKey->version) {
      return true;
    }
  }

  return false;
}

3045
/*
 * Return the first row at or after the iterator's current position that the reader may consume.
 *
 * A row qualifies when its version lies inside pReader->verRange and hasBeenDropped() reports it as not deleted.
 * Rows that do not qualify are skipped by advancing the iterator.
 *
 * Returns NULL when the iterator has no value, runs out of rows, or reaches a row outside pReader->window; in the
 * latter two cases pIter->hasVal is left false.
 */
TSDBROW* getValidMemRow(SIterInfo* pIter, const SArray* pDelList, STsdbReader* pReader) {
  if (!pIter->hasVal) {
    return NULL;
  }

  TSDBROW* pRow = tsdbTbDataIterGet(pIter->iter);
  TSDBKEY  key = {.ts = pRow->pTSRow->ts, .version = pRow->version};

  for (;;) {
    // once a row falls out of the query window, the iterator is treated as exhausted
    if (outOfTimeWindow(key.ts, &pReader->window)) {
      pIter->hasVal = false;
      return NULL;
    }

    // it is a valid data version and the row has not been deleted
    bool inVerRange = (key.version <= pReader->verRange.maxVer && key.version >= pReader->verRange.minVer);
    if (inVerRange && !hasBeenDropped(pDelList, &pIter->index, &key, pReader->order, &pReader->verRange)) {
      return pRow;
    }

    // the current row is not usable, try the next one
    pIter->hasVal = tsdbTbDataIterNext(pIter->iter);
    if (!pIter->hasVal) {
      return NULL;
    }

    pRow = tsdbTbDataIterGet(pIter->iter);
    key = TSDBROW_KEY(pRow);
  }
}
H
Hongze Cheng 已提交
3083

3084 3085
/*
 * Merge every following valid row of the buffer iterator that carries timestamp "ts" into pMerger.
 *
 * The iterator is advanced first, so the row it currently points at is NOT merged by this function. The loop stops
 * at the end of the iterator, at the first row getValidMemRow() rejects, or at the first row whose ts differs.
 *
 * Returns TSDB_CODE_SUCCESS, or terrno when the schema of a row cannot be resolved.
 */
int32_t doMergeRowsInBuf(SIterInfo* pIter, uint64_t uid, int64_t ts, SArray* pDelList, SRowMerger* pMerger,
                         STsdbReader* pReader) {
  while ((pIter->hasVal = tsdbTbDataIterNext(pIter->iter))) {
    // data exists but may not be valid
    TSDBROW* pNext = getValidMemRow(pIter, pDelList, pReader);
    if (pNext == NULL) {
      break;
    }

    // ts is not identical, quit
    TSDBKEY nextKey = TSDBROW_KEY(pNext);
    if (nextKey.ts != ts) {
      break;
    }

    STSchema* pRowSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pNext), pReader, uid);
    if (pRowSchema == NULL) {
      return terrno;
    }

    tRowMergerAdd(pMerger, pNext, pRowSchema);
  }

  return TSDB_CODE_SUCCESS;
}

3115
static int32_t doMergeRowsInFileBlockImpl(SBlockData* pBlockData, int32_t rowIndex, int64_t key, SRowMerger* pMerger,
3116
                                          SVersionRange* pVerRange, int32_t step) {
3117 3118
  while (pBlockData->aTSKEY[rowIndex] == key && rowIndex < pBlockData->nRow && rowIndex >= 0) {
    if (pBlockData->aVersion[rowIndex] > pVerRange->maxVer || pBlockData->aVersion[rowIndex] < pVerRange->minVer) {
3119
      rowIndex += step;
3120 3121 3122 3123 3124 3125 3126 3127 3128 3129 3130 3131 3132 3133 3134 3135
      continue;
    }

    TSDBROW fRow = tsdbRowFromBlockData(pBlockData, rowIndex);
    tRowMerge(pMerger, &fRow);
    rowIndex += step;
  }

  return rowIndex;
}

// Result reported by checkForNeighborFileBlock() to the loop in doMergeRowsInFileBlocks().
typedef enum {
  CHECK_FILEBLOCK_CONT = 0x1,  // the neighbor block was consumed up to its totalRows, keep checking the next one
  CHECK_FILEBLOCK_QUIT = 0x2,  // default state: stop checking neighbor blocks
} CHECK_FILEBLOCK_STATE;

H
Hongze Cheng 已提交
3136
static int32_t checkForNeighborFileBlock(STsdbReader* pReader, STableBlockScanInfo* pScanInfo, SDataBlk* pBlock,
3137 3138
                                         SFileDataBlockInfo* pFBlock, SRowMerger* pMerger, int64_t key,
                                         CHECK_FILEBLOCK_STATE* state) {
3139
  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
3140
  SBlockData*         pBlockData = &pReader->status.fileBlockData;
3141

3142
  *state = CHECK_FILEBLOCK_QUIT;
3143
  int32_t step = ASCENDING_TRAVERSE(pReader->order) ? 1 : -1;
3144

3145 3146 3147 3148
  int32_t     nextIndex = -1;
  SBlockIndex bIndex = {0};

  bool hasNeighbor = getNeighborBlockOfSameTable(pFBlock, pScanInfo, &nextIndex, pReader->order, &bIndex);
3149
  if (!hasNeighbor) {  // do nothing
3150 3151 3152
    return 0;
  }

3153
  bool overlap = overlapWithNeighborBlock(pBlock, &bIndex, pReader->order);
3154
  if (overlap) {  // load next block
3155
    SReaderStatus*  pStatus = &pReader->status;
3156 3157
    SDataBlockIter* pBlockIter = &pStatus->blockIter;

3158
    // 1. find the next neighbor block in the scan block list
3159
    SFileDataBlockInfo fb = {.uid = pFBlock->uid, .tbBlockIdx = nextIndex};
3160
    int32_t            neighborIndex = findFileBlockInfoIndex(pBlockIter, &fb);
3161

3162
    // 2. remove it from the scan block list
3163
    setFileBlockActiveInBlockIter(pBlockIter, neighborIndex, step);
3164

3165
    // 3. load the neighbor block, and set it to be the currently accessed file data block
3166
    int32_t code = doLoadFileBlockData(pReader, pBlockIter, &pStatus->fileBlockData, pFBlock->uid);
3167 3168 3169 3170
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

3171
    // 4. check the data values
3172 3173 3174 3175
    initBlockDumpInfo(pReader, pBlockIter);

    pDumpInfo->rowIndex =
        doMergeRowsInFileBlockImpl(pBlockData, pDumpInfo->rowIndex, key, pMerger, &pReader->verRange, step);
H
Haojun Liao 已提交
3176
    if (pDumpInfo->rowIndex >= pDumpInfo->totalRows) {
3177 3178 3179 3180 3181 3182 3183
      *state = CHECK_FILEBLOCK_CONT;
    }
  }

  return TSDB_CODE_SUCCESS;
}

3184 3185
/*
 * Merge all file-block rows that share the timestamp of the current row into pMerger.
 *
 * The timestamp is taken from pBlockData->aTSKEY at pDumpInfo->rowIndex. The row index is then advanced by one step
 * and the following rows of the same block with that timestamp are merged. If the block is exhausted, overlapping
 * neighbor blocks of the same table are loaded and scanned as well.
 *
 * Returns TSDB_CODE_SUCCESS, or the error reported while loading a neighbor block.
 */
int32_t doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pScanInfo, STsdbReader* pReader,
                                SRowMerger* pMerger) {
  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;

  bool    asc = ASCENDING_TRAVERSE(pReader->order);
  int64_t key = pBlockData->aTSKEY[pDumpInfo->rowIndex];
  int32_t step = asc ? 1 : -1;

  pDumpInfo->rowIndex += step;
  if ((pDumpInfo->rowIndex <= pBlockData->nRow - 1 && asc) || (pDumpInfo->rowIndex >= 0 && !asc)) {
    pDumpInfo->rowIndex =
        doMergeRowsInFileBlockImpl(pBlockData, pDumpInfo->rowIndex, key, pMerger, &pReader->verRange, step);
  }

  // all rows are consumed, let's try next file block
  if ((pDumpInfo->rowIndex >= pBlockData->nRow && asc) || (pDumpInfo->rowIndex < 0 && !asc)) {
    while (1) {
      CHECK_FILEBLOCK_STATE st = CHECK_FILEBLOCK_QUIT;

      SFileDataBlockInfo* pFileBlockInfo = getCurrentBlockInfo(&pReader->status.blockIter);
      SDataBlk*           pCurrentBlock = getCurrentBlock(&pReader->status.blockIter);

      // a failure to load the neighbor block must be reported to the caller instead of being dropped
      int32_t code = checkForNeighborFileBlock(pReader, pScanInfo, pCurrentBlock, pFileBlockInfo, pMerger, key, &st);
      if (code != TSDB_CODE_SUCCESS) {
        return code;
      }

      if (st == CHECK_FILEBLOCK_QUIT) {
        break;
      }
    }
  }

  return TSDB_CODE_SUCCESS;
}

H
Hongze Cheng 已提交
3215
int32_t doMergeRowsInLastBlock(SLastBlockReader* pLastBlockReader, STableBlockScanInfo* pScanInfo, int64_t ts,
3216
                               SRowMerger* pMerger, SVersionRange* pVerRange) {
H
Haojun Liao 已提交
3217
  pScanInfo->lastKey = ts;
3218
  while (nextRowFromLastBlocks(pLastBlockReader, pScanInfo, pVerRange)) {
3219 3220
    int64_t next1 = getCurrentKeyInLastBlock(pLastBlockReader);
    if (next1 == ts) {
3221
      TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
3222 3223 3224 3225 3226 3227 3228 3229 3230
      tRowMerge(pMerger, &fRow1);
    } else {
      break;
    }
  }

  return TSDB_CODE_SUCCESS;
}

3231 3232
/*
 * Produce one output row from a mem/imem iterator, merging all valid rows that share the timestamp of *pRow.
 *
 * pRow       row the iterator currently points at; it is copied before the iterator is advanced
 * uid        table uid, used to resolve row schemas
 * pIter      buffer iterator; advanced past every row that is merged
 * pDelList   delete list forwarded to getValidMemRow()
 * pTSRow     out: the resulting row
 * pReader    reader context; pReader->pSchema is set to the row schema if it was NULL
 * freeTSRow  out: false when *pTSRow points at the row owned by the buffer, true when *pTSRow was built by the
 *            merger and has to be released by the caller
 *
 * Returns TSDB_CODE_SUCCESS or an error code. *freeTSRow is only set to true on success.
 */
int32_t doMergeMemTableMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter, SArray* pDelList, STSRow** pTSRow,
                                 STsdbReader* pReader, bool* freeTSRow) {
  TSDBROW current = *pRow;

  // if there is no next valid row, or it has a different ts, return the current row directly
  pIter->hasVal = tsdbTbDataIterNext(pIter->iter);

  TSDBROW* pNextRow = pIter->hasVal ? getValidMemRow(pIter, pDelList, pReader) : NULL;
  if (pNextRow == NULL || current.pTSRow->ts != pNextRow->pTSRow->ts) {
    *pTSRow = current.pTSRow;
    *freeTSRow = false;
    return TSDB_CODE_SUCCESS;
  }

  // get the correct schema for data in memory
  terrno = 0;
  STSchema* pTSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(&current), pReader, uid);
  if (pTSchema == NULL) {
    return terrno;
  }

  if (pReader->pSchema == NULL) {
    pReader->pSchema = pTSchema;
  }

  SRowMerger merge = {0};
  int32_t    code = tRowMergerInit2(&merge, pReader->pSchema, &current, pTSchema);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  // from here on the merger is initialized, so every exit has to go through tRowMergerClear()
  STSchema* pTSchema1 = doGetSchemaForTSRow(TSDBROW_SVERSION(pNextRow), pReader, uid);
  if (pTSchema1 == NULL) {
    code = terrno;
    goto _end;
  }

  tRowMergerAdd(&merge, pNextRow, pTSchema1);

  code = doMergeRowsInBuf(pIter, uid, current.pTSRow->ts, pDelList, &merge, pReader);
  if (code != TSDB_CODE_SUCCESS) {
    goto _end;
  }

  code = tRowMergerGetRow(&merge, pTSRow);
  if (code != TSDB_CODE_SUCCESS) {
    goto _end;
  }

  *freeTSRow = true;

_end:
  tRowMergerClear(&merge);
  return code;
}

3299
int32_t doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader,
H
Hongze Cheng 已提交
3300
                           STSRow** pTSRow) {
H
Haojun Liao 已提交
3301 3302
  SRowMerger merge = {0};

3303 3304 3305
  TSDBKEY k = TSDBROW_KEY(pRow);
  TSDBKEY ik = TSDBROW_KEY(piRow);

3306
  if (ASCENDING_TRAVERSE(pReader->order)) {  // ascending order imem --> mem
H
Haojun Liao 已提交
3307
    STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid);
3308

H
Haojun Liao 已提交
3309 3310 3311 3312 3313 3314 3315 3316 3317 3318
    int32_t code = tRowMergerInit(&merge, piRow, pSchema);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    code = doMergeRowsInBuf(&pBlockScanInfo->iiter, pBlockScanInfo->uid, ik.ts, pBlockScanInfo->delSkyline, &merge,
                            pReader);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }
3319

3320
    tRowMerge(&merge, pRow);
H
Haojun Liao 已提交
3321 3322 3323 3324 3325 3326
    code =
        doMergeRowsInBuf(&pBlockScanInfo->iter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge, pReader);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

3327
  } else {
H
Haojun Liao 已提交
3328
    STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid);
3329

H
Haojun Liao 已提交
3330
    int32_t code = tRowMergerInit(&merge, pRow, pSchema);
3331
    if (code != TSDB_CODE_SUCCESS || merge.pTSchema == NULL) {
H
Haojun Liao 已提交
3332 3333 3334 3335 3336 3337 3338 3339
      return code;
    }

    code =
        doMergeRowsInBuf(&pBlockScanInfo->iter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge, pReader);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }
3340 3341

    tRowMerge(&merge, piRow);
H
Haojun Liao 已提交
3342 3343 3344 3345 3346
    code = doMergeRowsInBuf(&pBlockScanInfo->iiter, pBlockScanInfo->uid, ik.ts, pBlockScanInfo->delSkyline, &merge,
                            pReader);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }
3347
  }
3348

3349 3350
  int32_t code = tRowMergerGetRow(&merge, pTSRow);
  return code;
3351 3352
}

3353 3354
/*
 * Fetch the next output row from the mem and imem buffers of one table.
 *
 * The current valid row of each buffer is looked up first; a row whose ts has reached endKey (>= endKey for
 * ascending, <= endKey for descending traverse) is ignored. If both buffers provide a row, the one that comes
 * first in traverse order is emitted, or both are merged when their timestamps are equal.
 *
 * pTSRow     out: the resulting row; left untouched when neither buffer provides a row
 * freeTSRow  out: whether the caller has to release *pTSRow
 *
 * Returns TSDB_CODE_SUCCESS or the error code of the merge helpers.
 */
int32_t tsdbGetNextRowInMem(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader, STSRow** pTSRow, int64_t endKey,
                            bool* freeTSRow) {
  TSDBROW* pRow = getValidMemRow(&pBlockScanInfo->iter, pBlockScanInfo->delSkyline, pReader);
  TSDBROW* piRow = getValidMemRow(&pBlockScanInfo->iiter, pBlockScanInfo->delSkyline, pReader);
  SArray*  pDelList = pBlockScanInfo->delSkyline;
  uint64_t uid = pBlockScanInfo->uid;

  // todo refactor
  bool asc = ASCENDING_TRAVERSE(pReader->order);

  // drop the candidate of the mem buffer if it has reached the end key
  if (pBlockScanInfo->iter.hasVal) {
    TSDBKEY memKey = TSDBROW_KEY(pRow);
    if (asc ? (memKey.ts >= endKey) : (memKey.ts <= endKey)) {
      pRow = NULL;
    }
  }

  // drop the candidate of the imem buffer if it has reached the end key
  if (pBlockScanInfo->iiter.hasVal) {
    TSDBKEY imemKey = TSDBROW_KEY(piRow);
    if (asc ? (imemKey.ts >= endKey) : (imemKey.ts <= endKey)) {
      piRow = NULL;
    }
  }

  bool useMem = pBlockScanInfo->iter.hasVal && (pRow != NULL);
  bool useIMem = pBlockScanInfo->iiter.hasVal && (piRow != NULL);

  if (useMem && useIMem) {
    TSDBKEY k = TSDBROW_KEY(pRow);
    TSDBKEY ik = TSDBROW_KEY(piRow);

    if (ik.ts == k.ts) {
      *freeTSRow = true;
      return doMergeMemIMemRows(pRow, piRow, pBlockScanInfo, pReader, pTSRow);
    }

    // the timestamps differ: emit the row that comes first in the traverse order
    bool imemFirst = asc ? (ik.ts < k.ts) : (ik.ts > k.ts);
    if (imemFirst) {
      return doMergeMemTableMultiRows(piRow, uid, &pBlockScanInfo->iiter, pDelList, pTSRow, pReader, freeTSRow);
    }

    return doMergeMemTableMultiRows(pRow, uid, &pBlockScanInfo->iter, pDelList, pTSRow, pReader, freeTSRow);
  }

  if (useMem) {
    return doMergeMemTableMultiRows(pRow, pBlockScanInfo->uid, &pBlockScanInfo->iter, pDelList, pTSRow, pReader,
                                    freeTSRow);
  }

  if (useIMem) {
    return doMergeMemTableMultiRows(piRow, uid, &pBlockScanInfo->iiter, pDelList, pTSRow, pReader, freeTSRow);
  }

  return TSDB_CODE_SUCCESS;
}

H
Haojun Liao 已提交
3410
/*
 * Append one STSRow to the result block as row number pBlock->info.rows.
 *
 * The output columns of pBlock and the columns of the row schema are walked in parallel by column id. An output
 * column that is missing from the schema is filled with NULL. If the first output column is the primary timestamp
 * column, it receives pTSRow->ts.
 *
 * Returns TSDB_CODE_SUCCESS after incrementing pBlock->info.rows, or terrno when the schema of the row cannot be
 * resolved (the block is left untouched in that case).
 */
int32_t doAppendRowFromTSRow(SSDataBlock* pBlock, STsdbReader* pReader, STSRow* pTSRow, uint64_t uid) {
  int32_t numOfRows = pBlock->info.rows;
  int32_t numOfCols = (int32_t)taosArrayGetSize(pBlock->pDataBlock);

  SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo;
  STSchema*           pSchema = doGetSchemaForTSRow(pTSRow->sver, pReader, uid);
  if (pSchema == NULL) {
    // same handling as the other callers of doGetSchemaForTSRow(): report the failure instead of dereferencing NULL
    return terrno;
  }

  SColVal colVal = {0};
  int32_t i = 0, j = 0;

  SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
  if (pColInfoData->info.colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
    colDataAppend(pColInfoData, numOfRows, (const char*)&pTSRow->ts, false);
    i += 1;
  }

  while (i < numOfCols && j < pSchema->numOfCols) {
    pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
    col_id_t colId = pColInfoData->info.colId;

    if (colId == pSchema->columns[j].colId) {
      tTSRowGetVal(pTSRow, pSchema, j, &colVal);
      doCopyColVal(pColInfoData, numOfRows, i, &colVal, pSupInfo);
      i += 1;
      j += 1;
    } else if (colId < pSchema->columns[j].colId) {
      // the output column does not exist in the schema of this row
      colDataAppendNULL(pColInfoData, numOfRows);
      i += 1;
    } else {
      // the schema column is not requested, skip it
      j += 1;
    }
  }

  // set null value since current column does not exist in the "pSchema"
  while (i < numOfCols) {
    pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
    colDataAppendNULL(pColInfoData, numOfRows);
    i += 1;
  }

  pBlock->info.rows += 1;
  return TSDB_CODE_SUCCESS;
}

H
Hongze Cheng 已提交
3454 3455
/*
 * Copy row "rowIndex" of a file block into the result block as row number pResBlock->info.rows.
 *
 * Output columns and file-block columns are matched by column id while walking both lists in parallel. An output
 * column without a matching file-block column is filled with NULL. If the first output column is the primary
 * timestamp column, it receives pBlockData->aTSKEY[rowIndex].
 *
 * Always returns TSDB_CODE_SUCCESS after incrementing pResBlock->info.rows.
 */
int32_t doAppendRowFromFileBlock(SSDataBlock* pResBlock, STsdbReader* pReader, SBlockData* pBlockData,
                                 int32_t rowIndex) {
  SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo;

  int32_t dstRow = pResBlock->info.rows;
  int32_t outIdx = 0;
  int32_t inIdx = 0;

  SColumnInfoData* pTsCol = taosArrayGet(pResBlock->pDataBlock, outIdx);
  if (pTsCol->info.colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
    colDataAppendInt64(pTsCol, dstRow, &pBlockData->aTSKEY[rowIndex]);
    outIdx += 1;
  }

  SColVal value = {0};
  int32_t numOfInputCols = pBlockData->aIdx->size;
  int32_t numOfOutputCols = pResBlock->pDataBlock->size;

  while (outIdx < numOfOutputCols && inIdx < numOfInputCols) {
    SColumnInfoData* pDst = TARRAY_GET_ELEM(pResBlock->pDataBlock, outIdx);
    SColData*        pSrc = tBlockDataGetColDataByIdx(pBlockData, inIdx);

    if (pSrc->cid < pDst->info.colId) {
      // the file block column is not requested, skip it
      inIdx += 1;
    } else if (pSrc->cid == pDst->info.colId) {
      tColDataGetValue(pSrc, rowIndex, &value);
      doCopyColVal(pDst, dstRow, outIdx, &value, pSupInfo);
      inIdx += 1;
      outIdx += 1;
    } else {
      // the specified column does not exist in file block, fill with null data
      colDataAppendNULL(pDst, dstRow);
      outIdx += 1;
    }
  }

  // the remaining output columns have no counterpart in the file block
  for (; outIdx < numOfOutputCols; outIdx += 1) {
    SColumnInfoData* pDst = taosArrayGet(pResBlock->pDataBlock, outIdx);
    colDataAppendNULL(pDst, dstRow);
  }

  pResBlock->info.rows += 1;
  return TSDB_CODE_SUCCESS;
}

3502 3503
/*
 * Fill pReader->pResBlock with rows taken from the mem/imem buffers of one table.
 *
 * Rows are fetched one by one with tsdbGetNextRowInMem() until no row is returned, both buffer iterators are
 * exhausted, or the block holds "capacity" rows.
 *
 * Returns TSDB_CODE_SUCCESS, or the first error reported while fetching or appending a row.
 */
int32_t buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, int64_t endKey, int32_t capacity,
                                  STsdbReader* pReader) {
  SSDataBlock* pBlock = pReader->pResBlock;

  do {
    STSRow* pTSRow = NULL;
    bool    freeTSRow = false;

    int32_t code = tsdbGetNextRowInMem(pBlockScanInfo, pReader, &pTSRow, endKey, &freeTSRow);
    if (code != TSDB_CODE_SUCCESS) {
      // do not report a failed fetch as "no more data"
      return code;
    }

    if (pTSRow == NULL) {
      break;
    }

    code = doAppendRowFromTSRow(pBlock, pReader, pTSRow, pBlockScanInfo->uid);
    if (freeTSRow) {
      taosMemoryFree(pTSRow);
    }

    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    // no data in buffer, return immediately
    if (!(pBlockScanInfo->iter.hasVal || pBlockScanInfo->iiter.hasVal)) {
      break;
    }

    if (pBlock->info.rows >= capacity) {
      break;
    }
  } while (1);

  ASSERT(pBlock->info.rows <= capacity);
  return TSDB_CODE_SUCCESS;
}
H
Hongze Cheng 已提交
3532

3533 3534
// TODO refactor: with createDataBlockScanInfo
/*
 * Replace the set of tables scanned by the reader.
 *
 * Every scan info currently stored in pReader->status.pTableMap is cleared, the map is emptied, and the first
 * "num" slots of pReader->blockInfoBuf are re-registered under the uids taken from pTableList.
 *
 * pTableList  array of STableKeyInfo with at least "num" elements; only the uid of each element is read
 * num         number of tables; must not exceed the number of tables registered before the call (asserted)
 *
 * Returns TSDB_CODE_SUCCESS.
 */
int32_t tsdbSetTableList(STsdbReader* pReader, const void* pTableList, int32_t num) {
  ASSERT(pReader != NULL);
  int32_t size = taosHashGetSize(pReader->status.pTableMap);

  STableBlockScanInfo** p = NULL;
  while ((p = taosHashIterate(pReader->status.pTableMap, p)) != NULL) {
    clearBlockScanInfo(*p);
  }

  // todo handle the case where size is less than the value of num
  ASSERT(size >= num);

  taosHashClear(pReader->status.pTableMap);

  // the list is only read, so the const qualifier of the parameter is kept
  const STableKeyInfo* pList = (const STableKeyInfo*)pTableList;
  for (int32_t i = 0; i < num; ++i) {
    STableBlockScanInfo* pInfo = getPosInBlockInfoBuf(&pReader->blockInfoBuf, i);
    pInfo->uid = pList[i].uid;
    taosHashPut(pReader->status.pTableMap, &pInfo->uid, sizeof(uint64_t), &pInfo, POINTER_BYTES);
  }

  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
3558 3559 3560 3561 3562 3563
// Return the result of metaGetIdx() for pMeta, or NULL when pMeta is NULL.
void* tsdbGetIdx(SMeta* pMeta) {
  return (pMeta != NULL) ? metaGetIdx(pMeta) : NULL;
}
dengyihao's avatar
dengyihao 已提交
3564

dengyihao's avatar
dengyihao 已提交
3565 3566 3567 3568 3569 3570
// Return the result of metaGetIvtIdx() for pMeta, or NULL when pMeta is NULL.
void* tsdbGetIvtIdx(SMeta* pMeta) {
  return (pMeta != NULL) ? metaGetIvtIdx(pMeta) : NULL;
}
L
Liu Jicong 已提交
3571

H
Hongze Cheng 已提交
3572
// Return the upper bound (verRange.maxVer) of the version range the reader was created with.
uint64_t getReaderMaxVersion(STsdbReader* pReader) {
  return pReader->verRange.maxVer;
}
3573

3574 3575 3576 3577 3578 3579 3580 3581 3582 3583 3584 3585 3586 3587 3588
/*
 * Prepare the file-set iterator and the data block iterator of a reader.
 *
 * When the read snapshot contains no data file, the reader is switched to buffer-only mode
 * (status.loadFromFile = false); otherwise the first block of the first file is prepared.
 *
 * Returns TSDB_CODE_SUCCESS or the result of initForFirstBlockInFile().
 */
static int32_t doOpenReaderImpl(STsdbReader* pReader) {
  SDataBlockIter* pBlockIter = &pReader->status.blockIter;

  initFilesetIterator(&pReader->status.fileIter, pReader->pReadSnap->fs.aDFileSet, pReader);
  resetDataBlockIterator(pBlockIter, pReader->order);

  if (pReader->status.fileIter.numOfFiles != 0) {
    return initForFirstBlockInFile(pReader, pBlockIter);
  }

  // no data in files, let's try buffer in memory
  pReader->status.loadFromFile = false;
  return TSDB_CODE_SUCCESS;
}

H
refact  
Hongze Cheng 已提交
3589
// ====================================== EXPOSED APIs ======================================
3590 3591
/*
 * Create and open a tsdb reader for the given tables.
 *
 * pVnode       vnode to read from
 * pCond        query condition. For TIMEWINDOW_RANGE_EXTERNAL its twindows/order fields are modified in place while
 *              the inner readers are created; order is restored afterwards, twindows is not.
 * pTableList   array of STableKeyInfo with numOfTables elements
 * numOfTables  number of tables to scan
 * ppReader     out: the created reader; reset to NULL when the table scan info cannot be created
 * idstr        identifier used in log messages
 *
 * For TIMEWINDOW_RANGE_EXTERNAL two inner readers with a capacity of one row are created in addition to the main
 * reader; they share the table map, schemas and read snapshot of the main reader.
 *
 * Returns TSDB_CODE_SUCCESS or an error code; every failure is logged through the _err path.
 */
int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, void* pTableList, int32_t numOfTables,
                       STsdbReader** ppReader, const char* idstr) {
  STsdbReader* pReader = NULL;

  STimeWindow window = pCond->twindows;
  if (pCond->type == TIMEWINDOW_RANGE_EXTERNAL) {
    pCond->twindows.skey += 1;
    pCond->twindows.ekey -= 1;
  }

  int32_t code = tsdbReaderCreate(pVnode, pCond, ppReader, 4096, idstr);
  if (code != TSDB_CODE_SUCCESS) {
    goto _err;
  }

  // check for query time window
  pReader = *ppReader;
  if (isEmptyQueryTimeWindow(&pReader->window) && pCond->type == TIMEWINDOW_RANGE_CONTAINED) {
    tsdbDebug("%p query window not overlaps with the data set, no result returned, %s", pReader, pReader->idStr);
    return TSDB_CODE_SUCCESS;
  }

  if (pCond->type == TIMEWINDOW_RANGE_EXTERNAL) {
    // update the SQueryTableDataCond to create inner reader
    int32_t order = pCond->order;
    if (order == TSDB_ORDER_ASC) {
      pCond->twindows.ekey = window.skey;
      pCond->twindows.skey = INT64_MIN;
      pCond->order = TSDB_ORDER_DESC;
    } else {
      pCond->twindows.skey = window.ekey;
      pCond->twindows.ekey = INT64_MAX;
      pCond->order = TSDB_ORDER_ASC;
    }

    // here we only need one more row, so the capacity is set to be ONE.
    code = tsdbReaderCreate(pVnode, pCond, &pReader->innerReader[0], 1, idstr);
    if (code != TSDB_CODE_SUCCESS) {
      goto _err;
    }

    if (order == TSDB_ORDER_ASC) {
      pCond->twindows.skey = window.ekey;
      pCond->twindows.ekey = INT64_MAX;
    } else {
      // NOTE(review): this window ends at window.ekey, while the ascending counterpart of the first inner reader
      // ends at window.skey - confirm that window.ekey is intended here.
      pCond->twindows.skey = INT64_MIN;
      pCond->twindows.ekey = window.ekey;
    }
    pCond->order = order;

    code = tsdbReaderCreate(pVnode, pCond, &pReader->innerReader[1], 1, idstr);
    if (code != TSDB_CODE_SUCCESS) {
      goto _err;
    }
  }

  // NOTE: the endVersion in pCond is the data version not schema version, so pCond->endVersion is not correct here.
  if (pCond->suid != 0) {
    pReader->pSchema = metaGetTbTSchema(pReader->pTsdb->pVnode->pMeta, pReader->suid, -1, 1);
    if (pReader->pSchema == NULL) {
      tsdbError("failed to get table schema, suid:%" PRIu64 ", ver:-1, %s", pReader->suid, pReader->idStr);
    }
  } else if (numOfTables > 0) {
    STableKeyInfo* pKey = pTableList;
    pReader->pSchema = metaGetTbTSchema(pReader->pTsdb->pVnode->pMeta, pKey->uid, -1, 1);
    if (pReader->pSchema == NULL) {
      tsdbError("failed to get table schema, uid:%" PRIu64 ", ver:-1, %s", pKey->uid, pReader->idStr);
    }
  }

  STsdbReader* p = pReader->innerReader[0] != NULL ? pReader->innerReader[0] : pReader;

  pReader->status.pTableMap = createDataBlockScanInfo(p, pTableList, numOfTables);
  if (pReader->status.pTableMap == NULL) {
    tsdbReaderClose(pReader);
    *ppReader = NULL;

    code = TSDB_CODE_TDB_OUT_OF_MEMORY;
    goto _err;
  }

  if (numOfTables > 0) {
    code = tsdbTakeReadSnap(pReader->pTsdb, &pReader->pReadSnap, pReader->idStr);
    if (code != TSDB_CODE_SUCCESS) {
      goto _err;
    }

    if (pReader->type == TIMEWINDOW_RANGE_CONTAINED) {
      code = doOpenReaderImpl(pReader);
      if (code != TSDB_CODE_SUCCESS) {
        // go through the common error path so that this failure is logged like all the others
        goto _err;
      }
    } else {
      STsdbReader* pPrevReader = pReader->innerReader[0];
      STsdbReader* pNextReader = pReader->innerReader[1];

      // we need only one row
      pPrevReader->capacity = 1;
      pPrevReader->status.pTableMap = pReader->status.pTableMap;
      pPrevReader->pSchema = pReader->pSchema;
      pPrevReader->pMemSchema = pReader->pMemSchema;
      pPrevReader->pReadSnap = pReader->pReadSnap;

      pNextReader->capacity = 1;
      pNextReader->status.pTableMap = pReader->status.pTableMap;
      pNextReader->pSchema = pReader->pSchema;
      pNextReader->pMemSchema = pReader->pMemSchema;
      pNextReader->pReadSnap = pReader->pReadSnap;

      code = doOpenReaderImpl(pPrevReader);
      if (code != TSDB_CODE_SUCCESS) {
        goto _err;
      }
    }
  }

  tsdbDebug("%p total numOfTable:%d in this query %s", pReader, numOfTables, pReader->idStr);
  return code;

_err:
  tsdbError("failed to create data reader, code:%s %s", tstrerror(code), idstr);
  return code;
}

/*
 * Release every resource owned by a reader and free the reader itself.
 *
 * A NULL reader is accepted and ignored. When the reader carries inner
 * (prev/next) readers they are closed first; the fields they borrow from the
 * outer reader (table map, read snapshot, schemas - see the assignments made
 * when the reader is opened) are detached beforehand so they are released
 * exactly once, by the outer reader.
 */
void tsdbReaderClose(STsdbReader* pReader) {
  if (pReader == NULL) {
    return;
  }

  // Each inner reader is checked on its own: the original code dereferenced
  // innerReader[1] after testing only innerReader[0].
  for (int32_t k = 0; k < 2; ++k) {
    STsdbReader* pInner = pReader->innerReader[k];
    if (pInner == NULL) {
      continue;
    }

    // borrowed from the outer reader, must not be freed by the inner one
    pInner->status.pTableMap = NULL;
    pInner->pReadSnap = NULL;
    pInner->pSchema = NULL;
    pInner->pMemSchema = NULL;

    tsdbReaderClose(pInner);
    pReader->innerReader[k] = NULL;
  }

  SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo;

  taosMemoryFreeClear(pSupInfo->plist);
  taosMemoryFree(pSupInfo->colIds);

  taosArrayDestroy(pSupInfo->pColAgg);

  // buildBuf may be absent on a partially constructed reader
  if (pSupInfo->buildBuf != NULL) {
    for (int32_t i = 0; i < blockDataGetNumOfCols(pReader->pResBlock); ++i) {
      if (pSupInfo->buildBuf[i] != NULL) {
        taosMemoryFreeClear(pSupInfo->buildBuf[i]);
      }
    }
  }

  taosMemoryFree(pSupInfo->buildBuf);
  tBlockDataDestroy(&pReader->status.fileBlockData, true);

  cleanupDataBlockIterator(&pReader->status.blockIter);

  // the table count is only needed for the cost summary below; read it before the map is destroyed
  size_t numOfTables = taosHashGetSize(pReader->status.pTableMap);
  destroyAllBlockScanInfo(pReader->status.pTableMap);
  blockDataDestroy(pReader->pResBlock);
  clearBlockScanInfoBuf(&pReader->blockInfoBuf);

  if (pReader->pFileReader != NULL) {
    tsdbDataFReaderClose(&pReader->pFileReader);
  }

  tsdbUntakeReadSnap(pReader->pTsdb, pReader->pReadSnap, pReader->idStr);

  taosMemoryFree(pReader->status.uidCheckInfo.tableUidList);
  SIOCostSummary* pCost = &pReader->cost;

  SFilesetIter* pFilesetIter = &pReader->status.fileIter;
  if (pFilesetIter->pLastBlockReader != NULL) {
    SLastBlockReader* pLReader = pFilesetIter->pLastBlockReader;
    tMergeTreeClose(&pLReader->mergeTree);

    // collect the last-block load statistics before the load info is destroyed
    getLastBlockLoadInfo(pLReader->pInfo, &pCost->lastBlockLoad, &pCost->lastBlockLoadTime);

    pLReader->pInfo = destroyLastBlockLoadInfo(pLReader->pInfo);
    taosMemoryFree(pLReader);
  }

  tsdbDebug("%p :io-cost summary: head-file:%" PRIu64 ", head-file time:%.2f ms, SMA:%" PRId64
            " SMA-time:%.2f ms, fileBlocks:%" PRId64
            ", fileBlocks-load-time:%.2f ms, "
            "build in-memory-block-time:%.2f ms, lastBlocks:%" PRId64
            ", lastBlocks-time:%.2f ms, composed-blocks:%" PRId64
            ", composed-blocks-time:%.2fms, STableBlockScanInfo size:%.2f Kb, creatTime:%.2f ms, %s",
            pReader, pCost->headFileLoad, pCost->headFileLoadTime, pCost->smaDataLoad, pCost->smaLoadTime,
            pCost->numOfBlocks, pCost->blockLoadTime, pCost->buildmemBlock, pCost->lastBlockLoad,
            pCost->lastBlockLoadTime, pCost->composedBlocks, pCost->buildComposedBlockTime,
            numOfTables * sizeof(STableBlockScanInfo) / 1000.0, pCost->createScanInfoList, pReader->idStr);

  taosMemoryFree(pReader->idStr);

  // Decide whether the two schema pointers alias each other BEFORE freeing
  // either of them; inspecting a pointer value after it was freed is not valid C.
  bool schemaShared = (pReader->pMemSchema == pReader->pSchema);
  taosMemoryFree(pReader->pSchema);
  if (!schemaShared) {
    taosMemoryFree(pReader->pMemSchema);
  }
  taosMemoryFreeClear(pReader);
}

3799
/*
 * Produce the next result block for one reader.
 *
 * The result block is cleaned first. When the reader is still loading from
 * files, a block is built from the files; if that yields no rows (or the
 * reader was not loading from files to begin with) the in-memory buffers are
 * tried. Returns true when the result block holds at least one row, false
 * otherwise or when building from files fails.
 */
static bool doTsdbNextDataBlock(STsdbReader* pReader) {
  SSDataBlock* pResBlock = pReader->pResBlock;

  // discard whatever the previous call left in the result block
  blockDataCleanup(pResBlock);

  if (pReader->status.loadFromFile) {
    if (buildBlockFromFiles(pReader) != TSDB_CODE_SUCCESS) {
      return false;
    }

    if (pResBlock->info.rows > 0) {
      return true;
    }
  }

  // nothing (more) from the files, fall back to the buffers
  buildBlockFromBufferSequentially(pReader);
  return pResBlock->info.rows > 0;
}

3826 3827 3828 3829 3830
/*
 * Advance the reader to its next data block.
 *
 * For a reader with inner readers the scan runs in three steps: the "prev"
 * inner reader, the main reader, then the "next" inner reader; pReader->step
 * records which one produced the current block.
 *
 * Returns true when a block is available, false when the scan is exhausted
 * or a sub-reader could not be opened. In the failure case the error code is
 * stored in terrno. (The original code returned the non-zero error code
 * through the bool return type, which reported a failure as "block
 * available".)
 */
bool tsdbNextDataBlock(STsdbReader* pReader) {
  if (isEmptyQueryTimeWindow(&pReader->window)) {
    return false;
  }

  if (pReader->innerReader[0] != NULL && pReader->step == 0) {
    bool ret = doTsdbNextDataBlock(pReader->innerReader[0]);
    pReader->step = EXTERNAL_ROWS_PREV;
    if (ret) {
      return ret;
    }
  }

  if (pReader->step == EXTERNAL_ROWS_PREV) {
    // prepare for the main scan
    int32_t code = doOpenReaderImpl(pReader);
    resetAllDataBlockScanInfo(pReader->status.pTableMap, pReader->innerReader[0]->window.ekey);

    if (code != TSDB_CODE_SUCCESS) {
      terrno = code;
      return false;
    }

    pReader->step = EXTERNAL_ROWS_MAIN;
  }

  bool ret = doTsdbNextDataBlock(pReader);
  if (ret) {
    return ret;
  }

  if (pReader->innerReader[1] != NULL && pReader->step == EXTERNAL_ROWS_MAIN) {
    // prepare for the next row scan
    int32_t code = doOpenReaderImpl(pReader->innerReader[1]);
    resetAllDataBlockScanInfo(pReader->innerReader[1]->status.pTableMap, pReader->window.ekey);
    if (code != TSDB_CODE_SUCCESS) {
      terrno = code;
      return false;
    }

    bool ret1 = doTsdbNextDataBlock(pReader->innerReader[1]);
    pReader->step = EXTERNAL_ROWS_NEXT;
    if (ret1) {
      return ret1;
    }
  }

  return false;
}

3874
/*
 * Report whether the given table uid is present in the reader's table map.
 *
 * Returns false when the uid has no entry (or the entry is empty), true
 * otherwise. The lookup result is checked before it is dereferenced; the
 * original code dereferenced it first, so the NULL check ran too late for a
 * uid that is not in the map.
 */
bool tsdbTableNextDataBlock(STsdbReader* pReader, uint64_t uid) {
  STableBlockScanInfo** ppBlockScanInfo =
      (STableBlockScanInfo**)taosHashGet(pReader->status.pTableMap, &uid, sizeof(uid));
  if (ppBlockScanInfo == NULL || *ppBlockScanInfo == NULL) {  // no data block for the table of given uid
    return false;
  }

  return true;
}

H
Haojun Liao 已提交
3883 3884 3885 3886 3887
/*
 * Copy the row count, table uid and time window of the reader's current
 * result block into the caller-provided output locations.
 */
static void setBlockInfo(const STsdbReader* pReader, int32_t* rows, uint64_t* uid, STimeWindow* pWindow) {
  ASSERT(pReader != NULL);

  const SSDataBlock* pRes = pReader->pResBlock;

  *rows = pRes->info.rows;
  *uid = pRes->info.uid;
  *pWindow = pRes->info.window;
}

H
Haojun Liao 已提交
3890
/*
 * Fetch rows/uid/window of the block most recently produced by the reader.
 *
 * For a TIMEWINDOW_RANGE_EXTERNAL reader the block belongs to whichever
 * reader produced it: the reader itself in the main step, innerReader[0] in
 * the prev step, and innerReader[1] in any other step. All other reader
 * types report their own result block.
 */
void tsdbRetrieveDataBlockInfo(const STsdbReader* pReader, int32_t* rows, uint64_t* uid, STimeWindow* pWindow) {
  const STsdbReader* pSource = pReader;

  if (pReader->type == TIMEWINDOW_RANGE_EXTERNAL && pReader->step != EXTERNAL_ROWS_MAIN) {
    pSource = (pReader->step == EXTERNAL_ROWS_PREV) ? pReader->innerReader[0] : pReader->innerReader[1];
  }

  setBlockInfo(pSource, rows, uid, pWindow);
}

3904
/*
 * Load the pre-computed column statistics (SMA) of the current file block.
 *
 * *pBlockStatis is set to NULL and TSDB_CODE_SUCCESS is returned when no
 * statistics apply: external-window readers, composed blocks, and blocks
 * without SMA. Otherwise the SMA is read from the file, and the aggregate
 * list is populated for the requested columns. *allHave ends up true only
 * when the SMA was loaded and no matched column had its BSMA flag off.
 *
 * Returns the error code of tsdbReadBlockSma on read failure (in that case
 * *pBlockStatis is left untouched).
 */
int32_t tsdbRetrieveDatablockSMA(STsdbReader* pReader, SColumnDataAgg*** pBlockStatis, bool* allHave) {
  *allHave = false;

  // no statistics for external-window readers, nor for composed blocks
  if (pReader->type == TIMEWINDOW_RANGE_EXTERNAL || pReader->status.composedDataBlock) {
    *pBlockStatis = NULL;
    return TSDB_CODE_SUCCESS;
  }

  SFileDataBlockInfo* pFBlock = getCurrentBlockInfo(&pReader->status.blockIter);
  SDataBlk*           pBlock = getCurrentBlock(&pReader->status.blockIter);
  SBlockLoadSuppInfo* pSup = &pReader->suppInfo;

  if (!tDataBlkHasSma(pBlock)) {
    *pBlockStatis = NULL;
    return TSDB_CODE_SUCCESS;
  }

  int32_t code = tsdbReadBlockSma(pReader->pFileReader, pBlock, pSup->pColAgg);
  if (code != TSDB_CODE_SUCCESS) {
    tsdbDebug("vgId:%d, failed to load block SMA for uid %" PRIu64 ", code:%s, %s", 0, pFBlock->uid, tstrerror(code),
              pReader->idStr);
    return code;
  }

  *allHave = true;

  // slot 0 always carries the primary timestamp column, derived from the block window
  SColumnDataAgg* pTsAgg = &pSup->tsColAgg;
  pTsAgg->numOfNull = 0;
  pTsAgg->colId = PRIMARYKEY_TIMESTAMP_COL_ID;
  pTsAgg->min = pReader->pResBlock->info.window.skey;
  pTsAgg->max = pReader->pResBlock->info.window.ekey;
  pSup->plist[0] = pTsAgg;

  // walk the loaded aggregates and the requested column ids in lock step,
  // advancing whichever side currently holds the smaller column id
  size_t  numOfCols = blockDataGetNumOfCols(pReader->pResBlock);
  size_t  numOfAggs = taosArrayGetSize(pSup->pColAgg);
  int32_t aggIdx = 0;
  int32_t colIdx = 0;

  while (colIdx < numOfCols && aggIdx < numOfAggs) {
    SColumnDataAgg* pAgg = taosArrayGet(pSup->pColAgg, aggIdx);

    if (pAgg->colId < pSup->colIds[colIdx]) {
      aggIdx += 1;
      continue;
    }

    if (pSup->colIds[colIdx] < pAgg->colId) {
      colIdx += 1;
      continue;
    }

    // same column id on both sides
    if (IS_BSMA_ON(&(pReader->pSchema->columns[aggIdx]))) {
      pSup->plist[colIdx] = pAgg;
    } else {
      *allHave = false;
    }

    aggIdx += 1;
    colIdx += 1;
  }

  pReader->cost.smaDataLoad += 1;
  *pBlockStatis = pSup->plist;

  tsdbDebug("vgId:%d, succeed to load block SMA for uid %" PRIu64 ", %s", 0, pFBlock->uid, pReader->idStr);

  return code;
}

3980
/*
 * Return the column array of the reader's current result block.
 *
 * A composed block is already materialised in pResBlock and is returned as
 * is. Otherwise the current file block is loaded and copied into pResBlock.
 *
 * Returns NULL and sets terrno when the block's table uid is not in the
 * table map (TSDB_CODE_INVALID_PARA) or when loading the file block fails.
 * The hash lookup result is checked before it is dereferenced; the original
 * code dereferenced it first, so the "failed to locate the uid" path could
 * only be reached after a NULL dereference.
 */
static SArray* doRetrieveDataBlock(STsdbReader* pReader) {
  SReaderStatus* pStatus = &pReader->status;

  if (pStatus->composedDataBlock) {
    return pReader->pResBlock->pDataBlock;
  }

  SFileDataBlockInfo*   pBlockInfo = getCurrentBlockInfo(&pStatus->blockIter);
  STableBlockScanInfo** ppBlockScanInfo =
      (STableBlockScanInfo**)taosHashGet(pStatus->pTableMap, &pBlockInfo->uid, sizeof(pBlockInfo->uid));
  STableBlockScanInfo* pBlockScanInfo = (ppBlockScanInfo != NULL) ? *ppBlockScanInfo : NULL;
  if (pBlockScanInfo == NULL) {
    terrno = TSDB_CODE_INVALID_PARA;
    tsdbError("failed to locate the uid:%" PRIu64 " in query table uid list, total tables:%d, %s", pBlockInfo->uid,
              taosHashGetSize(pReader->status.pTableMap), pReader->idStr);
    return NULL;
  }

  int32_t code = doLoadFileBlockData(pReader, &pStatus->blockIter, &pStatus->fileBlockData, pBlockScanInfo->uid);
  if (code != TSDB_CODE_SUCCESS) {
    tBlockDataDestroy(&pStatus->fileBlockData, 1);
    terrno = code;
    return NULL;
  }

  copyBlockDataToSDataBlock(pReader, pBlockScanInfo);
  return pReader->pResBlock->pDataBlock;
}

4007 4008 4009 4010 4011 4012 4013 4014 4015 4016 4017 4018
/*
 * Retrieve the column data of the current block.
 *
 * For a TIMEWINDOW_RANGE_EXTERNAL reader in the prev or next step the block
 * comes from innerReader[0] or innerReader[1] respectively; in every other
 * case it comes from the reader itself. pIdList is not used here.
 */
SArray* tsdbRetrieveDataBlock(STsdbReader* pReader, SArray* pIdList) {
  STsdbReader* pTarget = pReader;

  if (pReader->type == TIMEWINDOW_RANGE_EXTERNAL) {
    if (pReader->step == EXTERNAL_ROWS_PREV) {
      pTarget = pReader->innerReader[0];
    } else if (pReader->step == EXTERNAL_ROWS_NEXT) {
      pTarget = pReader->innerReader[1];
    }
  }

  return doRetrieveDataBlock(pTarget);
}

H
Haojun Liao 已提交
4019
/*
 * Rewind an open reader so it can be scanned again with a new condition.
 *
 * Nothing is done (and success is returned) when the reader's current time
 * window is empty or no read snapshot is held. Otherwise the scan order and
 * time window are taken from pCond, the reader type is set to
 * TIMEWINDOW_RANGE_CONTAINED, the file reader is closed, and the file-set
 * iterator, block iterator and per-table scan info are reset. When data
 * files exist the first block of the first file is prepared; its error code
 * is returned on failure.
 */
int32_t tsdbReaderReset(STsdbReader* pReader, SQueryTableDataCond* pCond) {
  if (isEmptyQueryTimeWindow(&pReader->window)) {
    return TSDB_CODE_SUCCESS;
  }

  if (pReader->pReadSnap == NULL) {
    return TSDB_CODE_SUCCESS;
  }

  SReaderStatus*  pStatus = &pReader->status;
  SDataBlockIter* pIter = &pStatus->blockIter;

  pReader->order = pCond->order;
  pReader->type = TIMEWINDOW_RANGE_CONTAINED;
  pStatus->loadFromFile = true;
  pStatus->pTableIter = NULL;
  pReader->window = updateQueryTimeWindow(pReader->pTsdb, &pCond->twindows);

  // clear the timestamp aggregate and the first slot of the aggregate list
  memset(&pReader->suppInfo.tsColAgg, 0, sizeof(SColumnDataAgg));
  memset(pReader->suppInfo.plist, 0, POINTER_BYTES);

  pReader->suppInfo.tsColAgg.colId = PRIMARYKEY_TIMESTAMP_COL_ID;
  tsdbDataFReaderClose(&pReader->pFileReader);

  int32_t numOfTables = taosHashGetSize(pStatus->pTableMap);

  initFilesetIterator(&pStatus->fileIter, pReader->pReadSnap->fs.aDFileSet, pReader);
  resetDataBlockIterator(pIter, pReader->order);

  // start just outside the window on the side the scan begins from
  int64_t startTs;
  if (ASCENDING_TRAVERSE(pReader->order)) {
    startTs = pReader->window.skey - 1;
  } else {
    startTs = pReader->window.ekey + 1;
  }
  resetAllDataBlockScanInfo(pStatus->pTableMap, startTs);

  int32_t code = 0;

  if (pStatus->fileIter.numOfFiles != 0) {
    code = initForFirstBlockInFile(pReader, pIter);
    if (code != TSDB_CODE_SUCCESS) {
      tsdbError("%p reset reader failed, numOfTables:%d, query range:%" PRId64 " - %" PRId64 " in query %s", pReader,
                numOfTables, pReader->window.skey, pReader->window.ekey, pReader->idStr);
      return code;
    }
  } else {
    // no data in files, let's try buffer in memory
    pStatus->loadFromFile = false;
  }

  tsdbDebug("%p reset reader, suid:%" PRIu64 ", numOfTables:%d, skey:%" PRId64 ", query range:%" PRId64 " - %" PRId64
            " in query %s",
            pReader, pReader->suid, numOfTables, pCond->twindows.skey, pReader->window.skey, pReader->window.ekey,
            pReader->idStr);

  return code;
}
H
Hongze Cheng 已提交
4068

4069 4070 4071
/*
 * Map a block's row count to a histogram bucket.
 *
 * startRow    - row count at which bucket 0 begins
 * bucketRange - number of rows covered by one bucket
 * numOfRows   - row count of the block being classified
 *
 * Returns (numOfRows - startRow) / bucketRange, with two guards:
 *   - a non-positive bucketRange yields bucket 0 instead of a division by zero;
 *   - a block with no more rows than startRow yields bucket 0 instead of a
 *     negative index.
 * The upper bound is not clamped here because the number of buckets is not
 * known to this helper.
 */
static int32_t getBucketIndex(int32_t startRow, int32_t bucketRange, int32_t numOfRows) {
  if (bucketRange <= 0 || numOfRows <= startRow) {
    return 0;
  }

  return (numOfRows - startRow) / bucketRange;
}
H
Hongze Cheng 已提交
4072

4073 4074 4075 4076
/*
 * Accumulate block distribution statistics over every file block visible to
 * the reader: total rows, min/max rows per block, number of small blocks,
 * number of files/blocks/tables and a row-count histogram.
 *
 * totalSize and totalRows are reset on entry; the other counters are added
 * to whatever pTableBlockInfo already holds. Returns the error code of
 * initForFirstBlockInFile when moving to the next file fails, otherwise
 * TSDB_CODE_SUCCESS.
 */
int32_t tsdbGetFileBlocksDistInfo(STsdbReader* pReader, STableBlockDistInfo* pTableBlockInfo) {
  int32_t code = TSDB_CODE_SUCCESS;
  pTableBlockInfo->totalSize = 0;
  pTableBlockInfo->totalRows = 0;

  // find the start data block in file
  SReaderStatus* pStatus = &pReader->status;

  STsdbCfg* pc = &pReader->pTsdb->pVnode->config.tsdbCfg;
  pTableBlockInfo->defMinRows = pc->minRows;
  pTableBlockInfo->defMaxRows = pc->maxRows;

  // The [minRows, maxRows] span is split into 20 buckets.
  // NOTE(review): assumes blockRowsHisto holds at least 20 entries - confirm against STableBlockDistInfo.
  const int32_t numOfBuckets = 20;

  int32_t bucketRange = ceil((pc->maxRows - pc->minRows) / (double)numOfBuckets);
  if (bucketRange < 1) {
    bucketRange = 1;  // maxRows <= minRows would otherwise lead to a division by zero
  }

  pTableBlockInfo->numOfFiles += 1;

  int32_t numOfTables = (int32_t)taosHashGetSize(pStatus->pTableMap);
  int     defaultRows = 4096;

  SDataBlockIter* pBlockIter = &pStatus->blockIter;
  pTableBlockInfo->numOfFiles += pStatus->fileIter.numOfFiles;

  if (pBlockIter->numOfBlocks > 0) {
    pTableBlockInfo->numOfBlocks += pBlockIter->numOfBlocks;
  }

  pTableBlockInfo->numOfTables = numOfTables;
  bool hasNext = (pBlockIter->numOfBlocks > 0);

  while (true) {
    if (hasNext) {
      SDataBlk* pBlock = getCurrentBlock(pBlockIter);

      int32_t numOfRows = pBlock->nRow;
      pTableBlockInfo->totalRows += numOfRows;

      if (numOfRows > pTableBlockInfo->maxRows) {
        pTableBlockInfo->maxRows = numOfRows;
      }

      if (numOfRows < pTableBlockInfo->minRows) {
        pTableBlockInfo->minRows = numOfRows;
      }

      if (numOfRows < defaultRows) {
        pTableBlockInfo->numOfSmallBlocks += 1;
      }

      // Clamp the bucket index: blocks smaller than minRows would give a
      // negative index, and a block with exactly maxRows rows lands on index
      // 20 whenever (maxRows - minRows) is a multiple of 20.
      int32_t bucketIndex = getBucketIndex(pTableBlockInfo->defMinRows, bucketRange, numOfRows);
      if (bucketIndex < 0) {
        bucketIndex = 0;
      } else if (bucketIndex >= numOfBuckets) {
        bucketIndex = numOfBuckets - 1;
      }
      pTableBlockInfo->blockRowsHisto[bucketIndex]++;

      hasNext = blockIteratorNext(&pStatus->blockIter, pReader->idStr);
    } else {
      // current file is exhausted, move on to the next one
      code = initForFirstBlockInFile(pReader, pBlockIter);
      if ((code != TSDB_CODE_SUCCESS) || (pReader->status.loadFromFile == false)) {
        break;
      }

      pTableBlockInfo->numOfBlocks += pBlockIter->numOfBlocks;
      hasNext = (pBlockIter->numOfBlocks > 0);
    }
  }

  return code;
}
H
Hongze Cheng 已提交
4141

H
refact  
Hongze Cheng 已提交
4142
/*
 * Count the rows held in the memtables of the reader's snapshot (pMem and
 * pIMem) for every table in the reader's table map.
 *
 * The presence checks are made on the snapshot's own memtable pointers. The
 * original code tested the live pTsdb->mem / pTsdb->imem instead, which can
 * differ from what the snapshot captured and could hand a NULL memtable to
 * tsdbGetTbDataFromMemTable.
 *
 * The reader's pTableIter is used as the iteration cursor and is NULL on
 * return.
 */
int64_t tsdbGetNumOfRowsInMemTable(STsdbReader* pReader) {
  int64_t rows = 0;

  SReaderStatus* pStatus = &pReader->status;
  STsdbReadSnap* pSnap = pReader->pReadSnap;

  pStatus->pTableIter = taosHashIterate(pStatus->pTableMap, NULL);

  while (pStatus->pTableIter != NULL) {
    STableBlockScanInfo* pBlockScanInfo = *(STableBlockScanInfo**)pStatus->pTableIter;

    if (pSnap != NULL && pSnap->pMem != NULL) {
      STbData* d = tsdbGetTbDataFromMemTable(pSnap->pMem, pReader->suid, pBlockScanInfo->uid);
      if (d != NULL) {
        rows += tsdbGetNRowsInTbData(d);
      }
    }

    if (pSnap != NULL && pSnap->pIMem != NULL) {
      STbData* di = tsdbGetTbDataFromMemTable(pSnap->pIMem, pReader->suid, pBlockScanInfo->uid);
      if (di != NULL) {
        rows += tsdbGetNRowsInTbData(di);
      }
    }

    // current table is exhausted, let's try the next table
    pStatus->pTableIter = taosHashIterate(pStatus->pTableMap, pStatus->pTableIter);
  }

  return rows;
}
D
dapan1121 已提交
4173

L
Liu Jicong 已提交
4174
/*
 * Look up the schema of the table identified by uid.
 *
 * On success *pSchema receives the result of metaGetTbTSchema for the
 * table's schema version, and *suid receives the super table uid for a child
 * table or 0 for a normal table.
 *
 * Returns TSDB_CODE_TDB_INVALID_TABLE_ID when the table (or the super table
 * of a child table) cannot be found, and TSDB_CODE_INVALID_PARA when the uid
 * refers to an entry that is neither a child table nor a normal table. The
 * original code only ASSERTed the latter and would read the normal-table
 * entry of an unrelated table type when assertions are disabled. terrno is
 * set to the returned error code.
 */
int32_t tsdbGetTableSchema(SVnode* pVnode, int64_t uid, STSchema** pSchema, int64_t* suid) {
  int32_t sversion = 1;

  SMetaReader mr = {0};
  metaReaderInit(&mr, pVnode->pMeta, 0);
  int32_t code = metaGetTableEntryByUid(&mr, uid);
  if (code != TSDB_CODE_SUCCESS) {
    terrno = TSDB_CODE_TDB_INVALID_TABLE_ID;
    metaReaderClear(&mr);
    return terrno;
  }

  *suid = 0;

  if (mr.me.type == TSDB_CHILD_TABLE) {
    // the schema version of a child table lives in its super table entry
    tDecoderClear(&mr.coder);
    *suid = mr.me.ctbEntry.suid;
    code = metaGetTableEntryByUid(&mr, *suid);
    if (code != TSDB_CODE_SUCCESS) {
      terrno = TSDB_CODE_TDB_INVALID_TABLE_ID;
      metaReaderClear(&mr);
      return terrno;
    }
    sversion = mr.me.stbEntry.schemaRow.version;
  } else if (mr.me.type == TSDB_NORMAL_TABLE) {
    sversion = mr.me.ntbEntry.schemaRow.version;
  } else {
    terrno = TSDB_CODE_INVALID_PARA;
    metaReaderClear(&mr);
    return terrno;
  }

  metaReaderClear(&mr);
  *pSchema = metaGetTbTSchema(pVnode->pMeta, uid, sversion, 1);

  return TSDB_CODE_SUCCESS;
}
H
Hongze Cheng 已提交
4208

H
Haojun Liao 已提交
4209
/*
 * Take a read snapshot of the tsdb: the current mem/imem tables (each
 * referenced if present) and a reference on the file system state, captured
 * under the tsdb read lock.
 *
 * On success *ppSnap owns the snapshot and must be released with
 * tsdbUntakeReadSnap. On failure every reference taken so far is dropped,
 * the snapshot is freed and *ppSnap is set to NULL. The original code left a
 * half-initialised snapshot behind and leaked the memtable references.
 *
 * Returns 0 on success, TSDB_CODE_OUT_OF_MEMORY when the allocation fails, a
 * system error code when the lock cannot be taken or released, or the error
 * returned by tsdbFSRef.
 */
int32_t tsdbTakeReadSnap(STsdb* pTsdb, STsdbReadSnap** ppSnap, const char* idStr) {
  int32_t code = 0;

  // alloc
  STsdbReadSnap* pSnap = (STsdbReadSnap*)taosMemoryCalloc(1, sizeof(STsdbReadSnap));
  *ppSnap = pSnap;
  if (pSnap == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _exit;
  }

  // lock
  code = taosThreadRwlockRdlock(&pTsdb->rwLock);
  if (code) {
    code = TAOS_SYSTEM_ERROR(code);
    goto _free_snap;
  }

  // take snapshot
  pSnap->pMem = pTsdb->mem;
  pSnap->pIMem = pTsdb->imem;

  if (pSnap->pMem) {
    tsdbRefMemTable(pSnap->pMem);
  }

  if (pSnap->pIMem) {
    tsdbRefMemTable(pSnap->pIMem);
  }

  // fs
  code = tsdbFSRef(pTsdb, &pSnap->fs);
  if (code) {
    taosThreadRwlockUnlock(&pTsdb->rwLock);
    goto _unref_mem;
  }

  // unlock
  code = taosThreadRwlockUnlock(&pTsdb->rwLock);
  if (code) {
    code = TAOS_SYSTEM_ERROR(code);
    tsdbFSUnref(pTsdb, &pSnap->fs);
    goto _unref_mem;
  }

  tsdbTrace("vgId:%d, take read snapshot, %s", TD_VID(pTsdb->pVnode), idStr);
  return code;

_unref_mem:
  if (pSnap->pMem) {
    tsdbUnrefMemTable(pSnap->pMem);
  }

  if (pSnap->pIMem) {
    tsdbUnrefMemTable(pSnap->pIMem);
  }

_free_snap:
  taosMemoryFree(pSnap);
  *ppSnap = NULL;

_exit:
  return code;
}

H
Haojun Liao 已提交
4257
/*
 * Drop everything a read snapshot holds: the references on its mem/imem
 * tables (when present), its file system reference, and the snapshot itself.
 */
static void doReleaseReadSnap(STsdb* pTsdb, STsdbReadSnap* pSnap) {
  if (pSnap->pMem) {
    tsdbUnrefMemTable(pSnap->pMem);
  }

  if (pSnap->pIMem) {
    tsdbUnrefMemTable(pSnap->pIMem);
  }

  tsdbFSUnref(pTsdb, &pSnap->fs);
  taosMemoryFree(pSnap);
}

/*
 * Release a read snapshot. A NULL snapshot is accepted and only produces
 * the trace line.
 */
void tsdbUntakeReadSnap(STsdb* pTsdb, STsdbReadSnap* pSnap, const char* idStr) {
  if (pSnap != NULL) {
    doReleaseReadSnap(pTsdb, pSnap);
  }

  tsdbTrace("vgId:%d, untake read snapshot, %s", TD_VID(pTsdb->pVnode), idStr);
}