tsdbRead.c 141.5 KB
Newer Older
H
hjxilinx 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

H
Haojun Liao 已提交
16
#include "osDef.h"
H
Hongze Cheng 已提交
17
#include "tsdb.h"
18

H
Hongze Cheng 已提交
19
// Non-zero when the traverse order `o` is ascending. The argument is parenthesized so that
// expressions such as ASCENDING_TRAVERSE(a | b) compare the whole expression.
#define ASCENDING_TRAVERSE(o) ((o) == TSDB_ORDER_ASC)
H
Hongze Cheng 已提交
20

21 22 23 24 25 26
/* Tags for the PREV / MAIN / NEXT groups of external rows (their users are outside this chunk). */
typedef enum {
  EXTERNAL_ROWS_PREV = 1,
  EXTERNAL_ROWS_MAIN = 2,
  EXTERNAL_ROWS_NEXT = 3,
} EContentData;

27
typedef struct {
dengyihao's avatar
dengyihao 已提交
28
  STbDataIter* iter;
29 30 31 32
  int32_t      index;
  bool         hasVal;
} SIterInfo;

33 34
/* Block counters collected while loading the block index of one file set. */
typedef struct {
  int32_t numOfBlocks;     // data blocks that passed the time/version range checks
  int32_t numOfLastFiles;  // number of stt (last) files in the file set
} SBlockNumber;

38
typedef struct SBlockIndex {
39 40 41
  int32_t     ordinalIndex;
  int64_t     inFileOffset;
  STimeWindow window;
42 43
} SBlockIndex;

H
Haojun Liao 已提交
44
typedef struct STableBlockScanInfo {
dengyihao's avatar
dengyihao 已提交
45 46
  uint64_t  uid;
  TSKEY     lastKey;
H
Hongze Cheng 已提交
47
  SMapData  mapData;            // block info (compressed)
48
  SArray*   pBlockList;         // block data index list, SArray<SBlockIndex>
H
Hongze Cheng 已提交
49 50 51 52 53 54
  SIterInfo iter;               // mem buffer skip list iterator
  SIterInfo iiter;              // imem buffer skip list iterator
  SArray*   delSkyline;         // delete info for this table
  int32_t   fileDelIndex;       // file block delete index
  int32_t   lastBlockDelIndex;  // delete index for last block
  bool      iterInit;           // whether to initialize the in-memory skip list iterator or not
H
Haojun Liao 已提交
55 56 57
} STableBlockScanInfo;

/* (table uid, offset) pair used when ordering blocks across tables. */
typedef struct SBlockOrderWrapper {
  int64_t uid;
  int64_t offset;
} SBlockOrderWrapper;
H
Hongze Cheng 已提交
61 62

typedef struct SBlockOrderSupporter {
63 64 65 66
  SBlockOrderWrapper** pDataBlockInfo;
  int32_t*             indexPerTable;
  int32_t*             numOfBlocksPerTable;
  int32_t              numOfTables;
H
Hongze Cheng 已提交
67 68 69
} SBlockOrderSupporter;

/* I/O and CPU cost counters of one reader, reported for diagnostics. */
typedef struct SIOCostSummary {
  int64_t numOfBlocks;             // data blocks + stt files counted while loading block indexes
  double  blockLoadTime;
  double  buildmemBlock;
  int64_t headFileLoad;            // number of data file readers opened
  double  headFileLoadTime;        // milliseconds
  int64_t smaDataLoad;
  double  smaLoadTime;
  int64_t lastBlockLoad;
  double  lastBlockLoadTime;
  int64_t composedBlocks;
  double  buildComposedBlockTime;
  double  createScanInfoList;      // milliseconds spent building the table scan-info map
} SIOCostSummary;

typedef struct SBlockLoadSuppInfo {
85
  SArray*          pColAgg;
86
  SColumnDataAgg   tsColAgg;
C
Cary Xu 已提交
87
  SColumnDataAgg** plist;
88
  int16_t*         colIds;  // column ids for loading file block data
89
  int32_t          numOfCols;
90
  char**           buildBuf;  // build string tmp buffer, todo remove it later after all string format being updated.
H
Hongze Cheng 已提交
91 92
} SBlockLoadSuppInfo;

93
typedef struct SLastBlockReader {
H
Hongze Cheng 已提交
94 95 96 97 98
  STimeWindow        window;
  SVersionRange      verRange;
  int32_t            order;
  uint64_t           uid;
  SMergeTree         mergeTree;
99
  SSttBlockLoadInfo* pInfo;
100 101
} SLastBlockReader;

102
typedef struct SFilesetIter {
H
Hongze Cheng 已提交
103 104 105
  int32_t           numOfFiles;  // number of total files
  int32_t           index;       // current accessed index in the list
  SArray*           pFileList;   // data file list
106
  int32_t           order;
H
Hongze Cheng 已提交
107
  SLastBlockReader* pLastBlockReader;  // last file block reader
108
} SFilesetIter;
H
Haojun Liao 已提交
109 110

/* One entry of the block iterator: which table, and which of its blocks. */
typedef struct SFileDataBlockInfo {
  uint64_t uid;
  // index position in STableBlockScanInfo, used to check whether the neighbor block overlaps with it
  int32_t  tbBlockIdx;
} SFileDataBlockInfo;

typedef struct SDataBlockIter {
117
  int32_t   numOfBlocks;
118
  int32_t   index;
H
Hongze Cheng 已提交
119
  SArray*   blockList;  // SArray<SFileDataBlockInfo>
120
  int32_t   order;
121
  SDataBlk  block;  // current SDataBlk data
122
  SHashObj* pTableMap;
H
Haojun Liao 已提交
123 124 125
} SDataBlockIter;

/* Progress of copying the rows of one file block into the result block. */
typedef struct SFileBlockDumpInfo {
  int32_t totalRows;
  int32_t rowIndex;
  int64_t lastKey;    // set to maxKey +/- 1 once the whole block is dumped
  bool    allDumped;
} SFileBlockDumpInfo;

132
/* Cursor over the queried table uids, kept in ascending uid order. */
typedef struct SUidOrderCheckInfo {
  uint64_t* tableUidList;  // table uids in ascending order
  int32_t   currentIndex;  // position inside tableUidList
} SUidOrderCheckInfo;

H
Haojun Liao 已提交
137
typedef struct SReaderStatus {
138
  bool                 loadFromFile;       // check file stage
139
  bool                 composedDataBlock;  // the returned data block is a composed block or not
140
  SHashObj*            pTableMap;          // SHash<STableBlockScanInfo>
141
  STableBlockScanInfo** pTableIter;         // table iterator used in building in-memory buffer data blocks.
142
  SUidOrderCheckInfo   uidCheckInfo;       // check all table in uid order
143
  SFileBlockDumpInfo   fBlockDumpInfo;
144 145 146 147
  SDFileSet*           pCurrentFileset;  // current opened file set
  SBlockData           fileBlockData;
  SFilesetIter         fileIter;
  SDataBlockIter       blockIter;
H
Haojun Liao 已提交
148 149
} SReaderStatus;

150 151 152 153 154 155
/* Bucketed storage for STableBlockScanInfo objects (numPerBucket objects per allocated bucket). */
typedef struct SBlockInfoBuf {
  int32_t currentIndex;  // not read in the visible part of this file
  SArray* pData;         // SArray<char*>: one pointer per bucket
  int32_t numPerBucket;
} SBlockInfoBuf;

H
Hongze Cheng 已提交
156
struct STsdbReader {
H
Haojun Liao 已提交
157 158 159 160 161 162 163
  STsdb*             pTsdb;
  uint64_t           suid;
  int16_t            order;
  STimeWindow        window;  // the primary query time window that applies to all queries
  SSDataBlock*       pResBlock;
  int32_t            capacity;
  SReaderStatus      status;
164 165
  char*              idStr;  // query info handle, for debug purpose
  int32_t            type;   // query type: 1. retrieve all data blocks, 2. retrieve direct prev|next rows
H
Hongze Cheng 已提交
166
  SBlockLoadSuppInfo suppInfo;
H
Hongze Cheng 已提交
167
  STsdbReadSnap*     pReadSnap;
168
  SIOCostSummary     cost;
169 170
  STSchema*          pSchema;     // the newest version schema
  STSchema*          pMemSchema;  // the previous schema for in-memory data, to avoid load schema too many times
171 172
  SDataFReader*      pFileReader;
  SVersionRange      verRange;
173 174 175
  SBlockInfoBuf      blockInfoBuf;
  int32_t            step;
  STsdbReader*       innerReader[2];
H
Hongze Cheng 已提交
176
};
H
Hongze Cheng 已提交
177

H
Haojun Liao 已提交
178
static SFileDataBlockInfo* getCurrentBlockInfo(SDataBlockIter* pBlockIter);
179 180
static int      buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, int64_t endKey, int32_t capacity,
                                          STsdbReader* pReader);
181
static TSDBROW* getValidMemRow(SIterInfo* pIter, const SArray* pDelList, STsdbReader* pReader);
182 183
static int32_t  doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pScanInfo, STsdbReader* pReader,
                                        SRowMerger* pMerger);
H
Hongze Cheng 已提交
184
static int32_t  doMergeRowsInLastBlock(SLastBlockReader* pLastBlockReader, STableBlockScanInfo* pScanInfo, int64_t ts,
185
                                       SRowMerger* pMerger, SVersionRange* pVerRange);
186
static int32_t  doMergeRowsInBuf(SIterInfo* pIter, uint64_t uid, int64_t ts, SArray* pDelList, SRowMerger* pMerger,
dengyihao's avatar
dengyihao 已提交
187
                                 STsdbReader* pReader);
H
Haojun Liao 已提交
188
static int32_t  doAppendRowFromTSRow(SSDataBlock* pBlock, STsdbReader* pReader, STSRow* pTSRow, uint64_t uid);
189
static int32_t  doAppendRowFromFileBlock(SSDataBlock* pResBlock, STsdbReader* pReader, SBlockData* pBlockData,
H
Hongze Cheng 已提交
190
                                         int32_t rowIndex);
191
static void     setComposedBlockFlag(STsdbReader* pReader, bool composed);
192
static bool hasBeenDropped(const SArray* pDelList, int32_t* index, TSDBKEY* pKey, int32_t order, SVersionRange* pRange);
193

H
Hongze Cheng 已提交
194 195 196 197
static int32_t doMergeMemTableMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter, SArray* pDelList,
                                        STSRow** pTSRow, STsdbReader* pReader, bool* freeTSRow);
static int32_t doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo* pBlockScanInfo,
                                  STsdbReader* pReader, STSRow** pTSRow);
198 199
static int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pBlockScanInfo, int64_t key,
                                     STsdbReader* pReader);
200

dengyihao's avatar
dengyihao 已提交
201 202 203 204
static int32_t initDelSkylineIterator(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader, STbData* pMemTbData,
                                      STbData* piMemTbData);
static STsdb*  getTsdbByRetentions(SVnode* pVnode, TSKEY winSKey, SRetention* retentions, const char* idstr,
                                   int8_t* pLevel);
205
static SVersionRange getQueryVerRange(SVnode* pVnode, SQueryTableDataCond* pCond, int8_t level);
H
Hongze Cheng 已提交
206 207 208
static int64_t       getCurrentKeyInLastBlock(SLastBlockReader* pLastBlockReader);
static bool          hasDataInLastBlock(SLastBlockReader* pLastBlockReader);
static int32_t       doBuildDataBlock(STsdbReader* pReader);
H
Haojun Liao 已提交
209
static TSDBKEY       getCurrentKeyInBuf(STableBlockScanInfo* pScanInfo, STsdbReader* pReader);
210 211
static bool          hasDataInFileBlock(const SBlockData* pBlockData, const SFileBlockDumpInfo* pDumpInfo);
static bool          hasDataInLastBlock(SLastBlockReader* pLastBlockReader);
H
Haojun Liao 已提交
212

213 214
/* True when ts lies outside the closed interval [pWindow->skey, pWindow->ekey]. */
static bool outOfTimeWindow(int64_t ts, STimeWindow* pWindow) {
  bool inside = (ts >= pWindow->skey) && (ts <= pWindow->ekey);
  return !inside;
}

215 216 217
/*
 * Record the column id of every column of pBlock in pReader->suppInfo.colIds, and allocate a
 * pCol->info.bytes scratch buffer (suppInfo.buildBuf[i]) for every var-data column.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY when any allocation fails. On failure
 * every buffer allocated here is released, colIds/buildBuf are reset to NULL and numOfCols to 0,
 * so a later cleanup of the reader cannot touch freed memory.
 */
static int32_t setColumnIdSlotList(STsdbReader* pReader, SSDataBlock* pBlock) {
  SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo;

  size_t numOfCols = blockDataGetNumOfCols(pBlock);

  pSupInfo->numOfCols = (int32_t)numOfCols;
  pSupInfo->colIds = taosMemoryMalloc(numOfCols * sizeof(int16_t));
  pSupInfo->buildBuf = taosMemoryCalloc(numOfCols, POINTER_BYTES);
  if (pSupInfo->buildBuf == NULL || pSupInfo->colIds == NULL) {
    goto _err;
  }

  for (int32_t i = 0; i < (int32_t)numOfCols; ++i) {
    SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, i);
    pSupInfo->colIds[i] = pCol->info.colId;

    if (IS_VAR_DATA_TYPE(pCol->info.type)) {
      pSupInfo->buildBuf[i] = taosMemoryMalloc(pCol->info.bytes);
      if (pSupInfo->buildBuf[i] == NULL) {
        goto _err;
      }
    }
  }

  return TSDB_CODE_SUCCESS;

_err:
  // buildBuf is calloc'ed, so entries that were never allocated are NULL and safe to free
  if (pSupInfo->buildBuf != NULL) {
    for (size_t i = 0; i < numOfCols; ++i) {
      taosMemoryFree(pSupInfo->buildBuf[i]);
    }
  }

  taosMemoryFree(pSupInfo->colIds);
  taosMemoryFree(pSupInfo->buildBuf);

  // do not leave dangling pointers behind: the reader cleanup frees these fields again
  pSupInfo->colIds = NULL;
  pSupInfo->buildBuf = NULL;
  pSupInfo->numOfCols = 0;
  return TSDB_CODE_OUT_OF_MEMORY;
}
H
Hongze Cheng 已提交
240

241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284
/*
 * Allocate zero-initialized STableBlockScanInfo storage for numOfTables tables in pBuf.
 *
 * Storage is split into buckets of pBuf->numPerBucket objects (the last bucket holds the remainder),
 * and each bucket pointer is appended to pBuf->pData, which is created on first use.
 *
 * Returns TSDB_CODE_SUCCESS or TSDB_CODE_OUT_OF_MEMORY. Buckets appended before a failure remain
 * owned by pBuf->pData (released by clearBlockScanInfoBuf).
 *
 * NOTE(review): when pData already holds buckets, new buckets are appended after them while
 * getPosInBlockInfoBuf always indexes from bucket 0; confirm callers never rely on reuse.
 */
static int32_t initBlockScanInfoBuf(SBlockInfoBuf* pBuf, int32_t numOfTables) {
  ASSERT(pBuf->numPerBucket > 0);

  int32_t num = numOfTables / pBuf->numPerBucket;
  int32_t remainder = numOfTables % pBuf->numPerBucket;
  if (pBuf->pData == NULL) {
    pBuf->pData = taosArrayInit(num + 1, POINTER_BYTES);
    if (pBuf->pData == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }
  }

  int32_t numOfBuckets = num + ((remainder > 0) ? 1 : 0);
  for (int32_t i = 0; i < numOfBuckets; ++i) {
    int32_t capacity = (i < num) ? pBuf->numPerBucket : remainder;

    char* p = taosMemoryCalloc(capacity, sizeof(STableBlockScanInfo));
    if (p == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }

    if (taosArrayPush(pBuf->pData, &p) == NULL) {
      taosMemoryFree(p);  // the bucket is not owned by pData yet
      return TSDB_CODE_OUT_OF_MEMORY;
    }
  }

  return TSDB_CODE_SUCCESS;
}

/*
 * Free every bucket in pBuf->pData and the bucket array itself.
 * pData is reset to NULL, so a repeated call or a later initBlockScanInfoBuf() does not touch freed memory.
 */
static void clearBlockScanInfoBuf(SBlockInfoBuf* pBuf) {
  size_t num = taosArrayGetSize(pBuf->pData);
  for (size_t i = 0; i < num; ++i) {
    char** p = taosArrayGet(pBuf->pData, i);
    taosMemoryFree(*p);
  }

  pBuf->pData = taosArrayDestroy(pBuf->pData);
}

/*
 * Address of the index-th STableBlockScanInfo slot in pBuf:
 * bucket index / numPerBucket, slot index % numPerBucket inside that bucket.
 */
static void* getPosInBlockInfoBuf(SBlockInfoBuf* pBuf, int32_t index) {
  int32_t perBucket = pBuf->numPerBucket;
  char**  ppBucket = taosArrayGet(pBuf->pData, index / perBucket);
  int32_t slot = index % perBucket;

  return *ppBucket + (size_t)slot * sizeof(STableBlockScanInfo);
}

// NOTE: speedup the whole processing by preparing the buffer for STableBlockScanInfo in batch model
285
static SHashObj* createDataBlockScanInfo(STsdbReader* pTsdbReader, const STableKeyInfo* idList, int32_t numOfTables) {
H
Haojun Liao 已提交
286
  // allocate buffer in order to load data blocks from file
287
  // todo use simple hash instead, optimize the memory consumption
288 289 290
  SHashObj* pTableMap =
      taosHashInit(numOfTables, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
  if (pTableMap == NULL) {
H
Haojun Liao 已提交
291 292 293
    return NULL;
  }

H
Haojun Liao 已提交
294
  int64_t st = taosGetTimestampUs();
295
  initBlockScanInfoBuf(&pTsdbReader->blockInfoBuf, numOfTables);
H
Haojun Liao 已提交
296

297
  for (int32_t j = 0; j < numOfTables; ++j) {
298 299 300 301 302 303 304 305 306 307 308 309 310 311
    STableBlockScanInfo* pScanInfo = getPosInBlockInfoBuf(&pTsdbReader->blockInfoBuf, j);
    pScanInfo->uid = idList[j].uid;
    if (ASCENDING_TRAVERSE(pTsdbReader->order)) {
      int64_t skey = pTsdbReader->window.skey;
      pScanInfo->lastKey = (skey > INT64_MIN) ? (skey - 1) : skey;
    } else {
      int64_t ekey = pTsdbReader->window.ekey;
      pScanInfo->lastKey = (ekey < INT64_MAX) ? (ekey + 1) : ekey;
    }

    taosHashPut(pTableMap, &pScanInfo->uid, sizeof(uint64_t), &pScanInfo, POINTER_BYTES);

#if 0
//    STableBlockScanInfo info = {.lastKey = 0, .uid = idList[j].uid};
312
    if (ASCENDING_TRAVERSE(pTsdbReader->order)) {
H
Haojun Liao 已提交
313
      int64_t skey = pTsdbReader->window.skey;
H
Hongze Cheng 已提交
314
      info.lastKey = (skey > INT64_MIN) ? (skey - 1) : skey;
wmmhello's avatar
wmmhello 已提交
315
    } else {
H
Haojun Liao 已提交
316
      int64_t ekey = pTsdbReader->window.ekey;
H
Hongze Cheng 已提交
317
      info.lastKey = (ekey < INT64_MAX) ? (ekey + 1) : ekey;
H
Haojun Liao 已提交
318
    }
wmmhello's avatar
wmmhello 已提交
319

320
    taosHashPut(pTableMap, &info.uid, sizeof(uint64_t), &info, sizeof(info));
321 322 323
#endif

    tsdbTrace("%p check table uid:%" PRId64 " from lastKey:%" PRId64 " %s", pTsdbReader, pScanInfo->uid, pScanInfo->lastKey,
324
              pTsdbReader->idStr);
H
Haojun Liao 已提交
325 326
  }

H
Haojun Liao 已提交
327 328 329 330
  pTsdbReader->cost.createScanInfoList = (taosGetTimestampUs() - st) / 1000.0;
  tsdbDebug("%p create %d tables scan-info, size:%.2f Kb, elapsed time:%.2f ms, %s", pTsdbReader, numOfTables,
            (sizeof(STableBlockScanInfo) * numOfTables) / 1024.0, pTsdbReader->cost.createScanInfoList,
            pTsdbReader->idStr);
331

332
  return pTableMap;
H
Hongze Cheng 已提交
333
}
H
Hongze Cheng 已提交
334

335 336
/*
 * Reset the in-memory scan state of every table in pTableMap so the tables can be scanned again from ts.
 *
 * Both the mem (iter) and imem (iiter) skip-list iterators are destroyed and marked empty, so the
 * next scan re-creates them (iterInit = false). Previously only iter.iter was destroyed, which leaked
 * the imem iterator, and iter.hasVal was left stale. The delete skyline is dropped and lastKey is set to ts.
 */
static void resetAllDataBlockScanInfo(SHashObj* pTableMap, int64_t ts) {
  STableBlockScanInfo** p = NULL;
  while ((p = taosHashIterate(pTableMap, p)) != NULL) {
    STableBlockScanInfo* pInfo = *(STableBlockScanInfo**)p;

    pInfo->iterInit = false;
    pInfo->iter.hasVal = false;
    pInfo->iiter.hasVal = false;

    if (pInfo->iter.iter != NULL) {
      pInfo->iter.iter = tsdbTbDataIterDestroy(pInfo->iter.iter);
    }

    if (pInfo->iiter.iter != NULL) {
      pInfo->iiter.iter = tsdbTbDataIterDestroy(pInfo->iiter.iter);
    }

    pInfo->delSkyline = taosArrayDestroy(pInfo->delSkyline);
    pInfo->lastKey = ts;
  }
}

351 352 353
/*
 * Release every resource held by one table's scan info: both in-memory iterators, the delete
 * skyline, the block index list and the block map. Both iterators are also marked empty
 * (previously only iiter.hasVal was reset).
 */
static void clearBlockScanInfo(STableBlockScanInfo* p) {
  p->iterInit = false;

  p->iter.hasVal = false;
  p->iiter.hasVal = false;

  if (p->iter.iter != NULL) {
    p->iter.iter = tsdbTbDataIterDestroy(p->iter.iter);
  }

  if (p->iiter.iter != NULL) {
    p->iiter.iter = tsdbTbDataIterDestroy(p->iiter.iter);
  }

  p->delSkyline = taosArrayDestroy(p->delSkyline);
  p->pBlockList = taosArrayDestroy(p->pBlockList);
  tMapDataClear(&p->mapData);
}
367

368
/* Clear the scan info of every table in pTableMap, then free the map itself. */
static void destroyAllBlockScanInfo(SHashObj* pTableMap) {
  for (void* pEntry = taosHashIterate(pTableMap, NULL); pEntry != NULL; pEntry = taosHashIterate(pTableMap, pEntry)) {
    STableBlockScanInfo* pInfo = *(STableBlockScanInfo**)pEntry;
    clearBlockScanInfo(pInfo);
  }

  taosHashCleanup(pTableMap);
}

377
/* A query window is empty when its start key is after its end key. */
static bool isEmptyQueryTimeWindow(STimeWindow* pWindow) {
  ASSERT(pWindow != NULL);

  bool nonEmpty = (pWindow->skey <= pWindow->ekey);
  return !nonEmpty;
}
H
Hongze Cheng 已提交
381

382 383 384
/*
 * Clamp the query window to the data retention configured for pTsdb, so that data which has
 * expired (older than keep2) is not returned to the client even if it is still on disk.
 *
 * keep2 is multiplied by tsTickPerMin[precision], i.e. it is expressed in minutes. The window's end key
 * is never modified; only a start key older than the earliest retained timestamp is raised.
 */
static STimeWindow updateQueryTimeWindow(STsdb* pTsdb, STimeWindow* pWindow) {
  STsdbKeepCfg* pKeep = &pTsdb->keepCfg;

  int64_t nowTs = taosGetTimestamp(pKeep->precision);
  int64_t retention = tsTickPerMin[pKeep->precision] * pKeep->keep2;
  int64_t earliestTs = nowTs - retention + 1;  // one tick is added on purpose

  STimeWindow result = *pWindow;
  result.skey = (result.skey < earliestTs) ? earliestTs : result.skey;
  return result;
}
H
Hongze Cheng 已提交
397

H
Haojun Liao 已提交
398
/*
 * Shrink *capacity so that one result block (capacity rows of all pCond columns) stays under 2MB.
 *
 * The size check is done in 64-bit arithmetic so that a large capacity times a wide row cannot
 * overflow int32. A single row wider than 2MB still yields a capacity of 1 instead of 0.
 */
static void limitOutputBufferSize(const SQueryTableDataCond* pCond, int32_t* capacity) {
  int64_t rowLen = 0;
  for (int32_t i = 0; i < pCond->numOfCols; ++i) {
    rowLen += pCond->colList[i].bytes;
  }

  // make sure the output SSDataBlock size is less than 2MB.
  const int64_t TWOMB = 2 * 1024 * 1024;
  if (rowLen > 0 && ((int64_t)(*capacity)) * rowLen > TWOMB) {
    int64_t rows = TWOMB / rowLen;
    (*capacity) = (rows > 0) ? (int32_t)rows : 1;
  }
}

// init file iterator
412
static int32_t initFilesetIterator(SFilesetIter* pIter, SArray* aDFileSet, STsdbReader* pReader) {
H
Hongze Cheng 已提交
413
  size_t numOfFileset = taosArrayGetSize(aDFileSet);
414

415 416
  pIter->index = ASCENDING_TRAVERSE(pReader->order) ? -1 : numOfFileset;
  pIter->order = pReader->order;
H
Hongze Cheng 已提交
417
  pIter->pFileList = aDFileSet;
418
  pIter->numOfFiles = numOfFileset;
H
Haojun Liao 已提交
419

420 421 422 423
  if (pIter->pLastBlockReader == NULL) {
    pIter->pLastBlockReader = taosMemoryCalloc(1, sizeof(struct SLastBlockReader));
    if (pIter->pLastBlockReader == NULL) {
      int32_t code = TSDB_CODE_OUT_OF_MEMORY;
424
      tsdbError("failed to prepare the last block iterator, since:%s %s", tstrerror(code), pReader->idStr);
425 426
      return code;
    }
427 428
  }

429 430 431 432 433 434 435 436
  SLastBlockReader* pLReader = pIter->pLastBlockReader;
  pLReader->order = pReader->order;
  pLReader->window = pReader->window;
  pLReader->verRange = pReader->verRange;

  pLReader->uid = 0;
  tMergeTreeClose(&pLReader->mergeTree);

437
  if (pLReader->pInfo == NULL) {
438
    // here we ignore the first column, which is always be the primary timestamp column
439 440
    pLReader->pInfo =
        tCreateLastBlockLoadInfo(pReader->pSchema, &pReader->suppInfo.colIds[1], pReader->suppInfo.numOfCols - 1);
H
Haojun Liao 已提交
441 442 443 444
    if (pLReader->pInfo == NULL) {
      tsdbDebug("init fileset iterator failed, code:%s %s", tstrerror(terrno), pReader->idStr);
      return terrno;
    }
445 446
  }

447
  tsdbDebug("init fileset iterator, total files:%d %s", pIter->numOfFiles, pReader->idStr);
H
Haojun Liao 已提交
448 449 450
  return TSDB_CODE_SUCCESS;
}

451
/*
 * Advance pIter to the next data file set, in pIter->order, whose key range overlaps pReader->window,
 * and open a data file reader for it in pReader->pFileReader.
 *
 * Before moving on, the last-block load statistics are collected into pReader->cost
 * (getLastBlockLoadInfo) and the last-block reader state is reset.
 *
 * Returns true when a qualified file set was found and opened. Returns false when the file list is
 * exhausted, when the remaining files lie completely outside the query window, or when opening the
 * data file reader fails. In the failure case terrno is set and an error is logged, instead of the
 * failure being silently reported as "no more files".
 */
static bool filesetIteratorNext(SFilesetIter* pIter, STsdbReader* pReader) {
  bool    asc = ASCENDING_TRAVERSE(pIter->order);
  int32_t step = asc ? 1 : -1;
  pIter->index += step;

  if ((asc && pIter->index >= pIter->numOfFiles) || ((!asc) && pIter->index < 0)) {
    return false;
  }

  SIOCostSummary* pSum = &pReader->cost;
  getLastBlockLoadInfo(pIter->pLastBlockReader->pInfo, &pSum->lastBlockLoad, &pSum->lastBlockLoadTime);

  pIter->pLastBlockReader->uid = 0;
  tMergeTreeClose(&pIter->pLastBlockReader->mergeTree);
  resetLastBlockLoadInfo(pIter->pLastBlockReader->pInfo);

  // check the time range covered by each file
  STimeWindow win = {0};

  while (1) {
    if (pReader->pFileReader != NULL) {
      tsdbDataFReaderClose(&pReader->pFileReader);
    }

    pReader->status.pCurrentFileset = (SDFileSet*)taosArrayGet(pIter->pFileList, pIter->index);

    int32_t code = tsdbDataFReaderOpen(&pReader->pFileReader, pReader->pTsdb, pReader->status.pCurrentFileset);
    if (code != TSDB_CODE_SUCCESS) {
      terrno = code;
      tsdbError("%p failed to open data file reader, code:%s %s", pReader, tstrerror(code), pReader->idStr);
      return false;
    }

    pReader->cost.headFileLoad += 1;

    int32_t fid = pReader->status.pCurrentFileset->fid;
    tsdbFidKeyRange(fid, pReader->pTsdb->keepCfg.days, pReader->pTsdb->keepCfg.precision, &win.skey, &win.ekey);

    // current file are no longer overlapped with query time window, ignore remain files
    if ((asc && win.skey > pReader->window.ekey) || (!asc && win.ekey < pReader->window.skey)) {
      tsdbDebug("%p remain files are not qualified for qrange:%" PRId64 "-%" PRId64 ", ignore, %s", pReader,
                pReader->window.skey, pReader->window.ekey, pReader->idStr);
      return false;
    }

    // this file lies entirely before (asc) / after (desc) the query window: try the next one
    if ((asc && (win.ekey < pReader->window.skey)) || ((!asc) && (win.skey > pReader->window.ekey))) {
      pIter->index += step;
      if ((asc && pIter->index >= pIter->numOfFiles) || ((!asc) && pIter->index < 0)) {
        return false;
      }
      continue;
    }

    tsdbDebug("%p file found fid:%d for qrange:%" PRId64 "-%" PRId64 ", %s", pReader, fid, pReader->window.skey,
              pReader->window.ekey, pReader->idStr);
    return true;
  }
}

511
/* Rewind the block iterator for a new file set: empty list, cursor before the first block. */
static void resetDataBlockIterator(SDataBlockIter* pIter, int32_t order) {
  pIter->numOfBlocks = 0;
  pIter->index = -1;
  pIter->order = order;

  if (pIter->blockList != NULL) {
    taosArrayClear(pIter->blockList);
    return;
  }

  pIter->blockList = taosArrayInit(4, sizeof(SFileDataBlockInfo));
}

L
Liu Jicong 已提交
522
/*
 * Release the block list of pIter. The pointer is reset to NULL so that a later
 * resetDataBlockIterator() allocates a fresh list instead of clearing freed memory.
 */
static void cleanupDataBlockIterator(SDataBlockIter* pIter) { pIter->blockList = taosArrayDestroy(pIter->blockList); }
H
Haojun Liao 已提交
523

H
Haojun Liao 已提交
524
/* Start a reader in the file-scanning stage with no in-memory table iterator. */
static void initReaderStatus(SReaderStatus* pStatus) {
  pStatus->loadFromFile = true;  // files are checked first
  pStatus->pTableIter = NULL;
}

529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551
/*
 * Create the result block: one column per pCond->colList entry, with room for `capacity` rows.
 *
 * Returns the block, or NULL with terrno set on failure. On failure the partially built block is
 * released with blockDataDestroy rather than taosMemoryFree. The latter frees only the SSDataBlock
 * struct and leaks the column array and any column buffers already allocated.
 */
static SSDataBlock* createResBlock(SQueryTableDataCond* pCond, int32_t capacity) {
  SSDataBlock* pResBlock = createDataBlock();
  if (pResBlock == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  int32_t code = TSDB_CODE_SUCCESS;
  for (int32_t i = 0; i < pCond->numOfCols; ++i) {
    SColumnInfoData colInfo = {{0}, 0};
    colInfo.info = pCond->colList[i];

    code = blockDataAppendColInfo(pResBlock, &colInfo);
    if (code != TSDB_CODE_SUCCESS) {
      goto _err;
    }
  }

  code = blockDataEnsureCapacity(pResBlock, capacity);
  if (code != TSDB_CODE_SUCCESS) {
    goto _err;
  }

  return pResBlock;

_err:
  blockDataDestroy(pResBlock);
  terrno = code;
  return NULL;
}

552 553
/*
 * Allocate and configure a reader for pCond on pVnode.
 *
 * On success *ppReader receives the reader and 0 is returned. On failure the partially built reader
 * is closed, *ppReader is set to NULL and an error code is returned. Failures of idStr duplication
 * and of column-slot setup are now reported instead of being ignored.
 */
static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, STsdbReader** ppReader, int32_t capacity,
                                const char* idstr) {
  int32_t      code = 0;
  int8_t       level = 0;
  STsdbReader* pReader = (STsdbReader*)taosMemoryCalloc(1, sizeof(*pReader));
  if (pReader == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _end;
  }

  if (VND_IS_TSMA(pVnode)) {
    tsdbDebug("vgId:%d, tsma is selected to query", TD_VID(pVnode));
  }

  initReaderStatus(&pReader->status);

  pReader->pTsdb = getTsdbByRetentions(pVnode, pCond->twindows.skey, pVnode->config.tsdbCfg.retentions, idstr, &level);
  pReader->suid = pCond->suid;
  pReader->order = pCond->order;
  pReader->capacity = capacity;
  pReader->idStr = (idstr != NULL) ? strdup(idstr) : NULL;
  if (idstr != NULL && pReader->idStr == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _end;
  }

  pReader->verRange = getQueryVerRange(pVnode, pCond, level);
  pReader->type = pCond->type;
  pReader->window = updateQueryTimeWindow(pReader->pTsdb, &pCond->twindows);
  pReader->blockInfoBuf.numPerBucket = 1000;  // 1000 tables per bucket
  ASSERT(pCond->numOfCols > 0);

  limitOutputBufferSize(pCond, &pReader->capacity);

  // allocate buffer in order to load data blocks from file
  SBlockLoadSuppInfo* pSup = &pReader->suppInfo;
  pSup->pColAgg = taosArrayInit(4, sizeof(SColumnDataAgg));
  pSup->plist = taosMemoryCalloc(pCond->numOfCols, POINTER_BYTES);
  if (pSup->pColAgg == NULL || pSup->plist == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _end;
  }

  pSup->tsColAgg.colId = PRIMARYKEY_TIMESTAMP_COL_ID;

  code = tBlockDataCreate(&pReader->status.fileBlockData);
  if (code != TSDB_CODE_SUCCESS) {
    terrno = code;
    goto _end;
  }

  pReader->pResBlock = createResBlock(pCond, pReader->capacity);
  if (pReader->pResBlock == NULL) {
    code = terrno;
    goto _end;
  }

  code = setColumnIdSlotList(pReader, pReader->pResBlock);
  if (code != TSDB_CODE_SUCCESS) {
    goto _end;
  }

  *ppReader = pReader;
  return code;

_end:
  tsdbReaderClose(pReader);
  *ppReader = NULL;
  return code;
}
H
Hongze Cheng 已提交
614

H
Haojun Liao 已提交
615
/*
 * Read the block index (SBlockIdx list) of the file opened by pFileReader. Every entry that belongs
 * to the reader's super table (suid) and to a queried table (present in status.pTableMap) is appended
 * to pIndexList. Each such table gets an (empty) pBlockList if it has none yet.
 *
 * Returns TSDB_CODE_SUCCESS, the error of tsdbReadBlockIdx, or TSDB_CODE_OUT_OF_MEMORY. The
 * allocation results, which used to be ignored, are now checked.
 */
static int32_t doLoadBlockIndex(STsdbReader* pReader, SDataFReader* pFileReader, SArray* pIndexList) {
  SArray* aBlockIdx = taosArrayInit(8, sizeof(SBlockIdx));
  if (aBlockIdx == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  int64_t st = taosGetTimestampUs();
  int32_t code = tsdbReadBlockIdx(pFileReader, aBlockIdx);
  if (code != TSDB_CODE_SUCCESS) {
    goto _end;
  }

  size_t num = taosArrayGetSize(aBlockIdx);
  if (num == 0) {
    taosArrayDestroy(aBlockIdx);
    return TSDB_CODE_SUCCESS;
  }

  int64_t et1 = taosGetTimestampUs();

  for (int32_t i = 0; i < num; ++i) {
    SBlockIdx* pBlockIdx = (SBlockIdx*)taosArrayGet(aBlockIdx, i);

    // uid check
    if (pBlockIdx->suid != pReader->suid) {
      continue;
    }

    // this block belongs to a table that is not queried.
    void* p = taosHashGet(pReader->status.pTableMap, &pBlockIdx->uid, sizeof(uint64_t));
    if (p == NULL) {
      continue;
    }

    STableBlockScanInfo* pScanInfo = *(STableBlockScanInfo**)p;
    if (pScanInfo->pBlockList == NULL) {
      pScanInfo->pBlockList = taosArrayInit(4, sizeof(SBlockIndex));
      if (pScanInfo->pBlockList == NULL) {
        code = TSDB_CODE_OUT_OF_MEMORY;
        goto _end;
      }
    }

    if (taosArrayPush(pIndexList, pBlockIdx) == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto _end;
    }
  }

  int64_t et2 = taosGetTimestampUs();
  tsdbDebug("load block index for %d tables completed, elapsed time:%.2f ms, set blockIdx:%.2f ms, size:%.2f Kb %s",
            (int32_t)num, (et1 - st) / 1000.0, (et2 - et1) / 1000.0, num * sizeof(SBlockIdx) / 1024.0, pReader->idStr);

  pReader->cost.headFileLoadTime += (et1 - st) / 1000.0;

_end:
  taosArrayDestroy(aBlockIdx);
  return code;
}
H
Hongze Cheng 已提交
665

666
/* Drop the per-file block information (block map and block index list) of every table before a new file is handled. */
static void cleanupTableScanInfo(SHashObj* pTableMap) {
  STableBlockScanInfo** ppInfo = taosHashIterate(pTableMap, NULL);
  while (ppInfo != NULL) {
    STableBlockScanInfo* pInfo = *ppInfo;
    tMapDataClear(&pInfo->mapData);
    taosArrayClear(pInfo->pBlockList);

    ppInfo = taosHashIterate(pTableMap, ppInfo);
  }
}

680
/*
 * For each table in pIndexList, read its data block map from the current file and record in
 * pScanInfo->pBlockList every block whose key range overlaps the query window and whose version range
 * overlaps the reader's version range. pBlockNum receives the number of qualified blocks and stt files.
 *
 * Returns TSDB_CODE_SUCCESS, the error of tsdbReadDataBlk (previously ignored), or TSDB_CODE_OUT_OF_MEMORY.
 */
static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, SBlockNumber* pBlockNum) {
  int32_t numOfQTable = 0;
  size_t  sizeInDisk = 0;
  size_t  numOfTables = taosArrayGetSize(pIndexList);

  int64_t st = taosGetTimestampUs();
  cleanupTableScanInfo(pReader->status.pTableMap);

  for (int32_t i = 0; i < numOfTables; ++i) {
    SBlockIdx* pBlockIdx = taosArrayGet(pIndexList, i);

    // assumes pIndexList was filled by doLoadBlockIndex, which only keeps uids present in pTableMap
    STableBlockScanInfo* pScanInfo =
        *(STableBlockScanInfo**)taosHashGet(pReader->status.pTableMap, &pBlockIdx->uid, sizeof(int64_t));

    tMapDataReset(&pScanInfo->mapData);
    int32_t code = tsdbReadDataBlk(pReader->pFileReader, pBlockIdx, &pScanInfo->mapData);
    if (code != TSDB_CODE_SUCCESS) {
      tMapDataClear(&pScanInfo->mapData);
      return code;
    }

    sizeInDisk += pScanInfo->mapData.nData;
    for (int32_t j = 0; j < pScanInfo->mapData.nItem; ++j) {
      SDataBlk block = {0};
      tMapDataGetItemByIdx(&pScanInfo->mapData, j, &block, tGetDataBlk);

      // 1. time range check
      if (block.minKey.ts > pReader->window.ekey || block.maxKey.ts < pReader->window.skey) {
        continue;
      }

      // 2. version range check
      if (block.minVer > pReader->verRange.maxVer || block.maxVer < pReader->verRange.minVer) {
        continue;
      }

      SBlockIndex bIndex = {.ordinalIndex = j, .inFileOffset = block.aSubBlock->offset};
      bIndex.window = (STimeWindow){.skey = block.minKey.ts, .ekey = block.maxKey.ts};

      void* p = taosArrayPush(pScanInfo->pBlockList, &bIndex);
      if (p == NULL) {
        tMapDataClear(&pScanInfo->mapData);
        return TSDB_CODE_OUT_OF_MEMORY;
      }

      pBlockNum->numOfBlocks += 1;
    }

    if (pScanInfo->pBlockList != NULL && taosArrayGetSize(pScanInfo->pBlockList) > 0) {
      numOfQTable += 1;
    }
  }

  pBlockNum->numOfLastFiles = pReader->pFileReader->pSet->nSttF;
  int32_t total = pBlockNum->numOfLastFiles + pBlockNum->numOfBlocks;

  double el = (taosGetTimestampUs() - st) / 1000.0;
  tsdbDebug(
      "load block of %zu tables completed, blocks:%d in %d tables, last-files:%d, block-info-size:%.2f Kb, elapsed "
      "time:%.2f ms %s",
      numOfTables, pBlockNum->numOfBlocks, numOfQTable, pBlockNum->numOfLastFiles, sizeInDisk / 1024.0, el,
      pReader->idStr);

  pReader->cost.numOfBlocks += total;
  pReader->cost.headFileLoadTime += el;

  return TSDB_CODE_SUCCESS;
}
H
Hongze Cheng 已提交
744

745
/*
 * Mark the current file block as fully consumed.
 * lastKey is set one step beyond maxKey in the scan direction (+1 for ascending, -1 for descending).
 */
static void setBlockAllDumped(SFileBlockDumpInfo* pDumpInfo, int64_t maxKey, int32_t order) {
  pDumpInfo->allDumped = true;
  pDumpInfo->lastKey = ASCENDING_TRAVERSE(order) ? (maxKey + 1) : (maxKey - 1);
}

751 752
/*
 * Append one column value into pColInfoData at rowIndex.
 *
 * - Fixed-length types: appended straight from pColVal->value; flagged NULL when the value is not a real value.
 * - Var-length types: a non-value is appended as NULL; otherwise the payload is staged in
 *   pSup->buildBuf[colIndex] (length header + bytes) and that buffer is appended.
 */
static void doCopyColVal(SColumnInfoData* pColInfoData, int32_t rowIndex, int32_t colIndex, SColVal* pColVal,
                         SBlockLoadSuppInfo* pSup) {
  bool isValue = COL_VAL_IS_VALUE(pColVal);

  if (!IS_VAR_DATA_TYPE(pColVal->type)) {
    colDataAppend(pColInfoData, rowIndex, (const char*)&pColVal->value, !isValue);
    return;
  }

  if (!isValue) {
    colDataAppendNULL(pColInfoData, rowIndex);
    return;
  }

  varDataSetLen(pSup->buildBuf[colIndex], pColVal->value.nData);
  ASSERT(pColVal->value.nData <= pColInfoData->info.bytes);
  memcpy(varDataVal(pSup->buildBuf[colIndex]), pColVal->value.pData, pColVal->value.nData);
  colDataAppend(pColInfoData, rowIndex, pSup->buildBuf[colIndex], false);
}

767
/*
 * Return the block-info entry at the iterator's current index, or NULL when the block list is empty.
 */
static SFileDataBlockInfo* getCurrentBlockInfo(SDataBlockIter* pBlockIter) {
  size_t numOfItems = taosArrayGetSize(pBlockIter->blockList);
  if (numOfItems > 0) {
    return taosArrayGet(pBlockIter->blockList, pBlockIter->index);
  }

  // an empty list must agree with the recorded block count
  ASSERT(pBlockIter->numOfBlocks == numOfItems);
  return NULL;
}

H
Hongze Cheng 已提交
777
/* Return the data block descriptor cached inside the iterator (filled by doSetCurrentBlock). */
static SDataBlk* getCurrentBlock(SDataBlockIter* pBlockIter) {
  SDataBlk* pCurrent = &pBlockIter->block;
  return pCurrent;
}
778

H
Haojun Liao 已提交
779 780 781 782 783 784 785 786 787 788 789 790 791 792 793 794 795 796 797 798 799 800 801 802 803 804 805 806 807 808 809 810 811 812 813 814 815 816 817 818 819 820 821 822 823 824 825 826 827 828 829 830 831 832 833 834 835 836 837 838 839 840 841 842 843 844 845 846 847 848 849 850 851
/*
 * Binary search for `key` in the timestamp array pValue[0, num).
 *
 * ASC : the probes below expect an ascending array; returns the position of the first element not smaller than
 *       key (an element equal to key is returned directly), or -1 when every element is smaller than key.
 * DESC: the probes below expect a descending array; returns the position of the first element not greater than
 *       key, or -1 when every element is greater than key.
 * An empty array (num <= 0) yields -1.
 */
int32_t binarySearchForTs(char* pValue, int num, TSKEY key, int order) {
  int32_t midPos = -1;
  int32_t numOfRows;

  ASSERT(order == TSDB_ORDER_ASC || order == TSDB_ORDER_DESC);

  // Without this guard the first probes read keyList[0] and keyList[num - 1] == keyList[-1] out of bounds.
  if (num <= 0) {
    return -1;
  }

  TSKEY*  keyList = (TSKEY*)pValue;
  int32_t firstPos = 0;
  int32_t lastPos = num - 1;

  if (order == TSDB_ORDER_DESC) {
    // find the first position whose value is not greater than the key
    while (1) {
      if (key >= keyList[firstPos]) return firstPos;
      if (key == keyList[lastPos]) return lastPos;

      if (key < keyList[lastPos]) {
        lastPos += 1;
        if (lastPos >= num) {
          return -1;
        } else {
          return lastPos;
        }
      }

      numOfRows = lastPos - firstPos + 1;
      midPos = (numOfRows >> 1) + firstPos;

      if (key < keyList[midPos]) {
        firstPos = midPos + 1;
      } else if (key > keyList[midPos]) {
        lastPos = midPos - 1;
      } else {
        break;
      }
    }

  } else {
    // find the first position whose value is not smaller than the key
    while (1) {
      if (key <= keyList[firstPos]) return firstPos;
      if (key == keyList[lastPos]) return lastPos;

      if (key > keyList[lastPos]) {
        lastPos = lastPos + 1;
        if (lastPos >= num) {
          return -1;
        } else {
          return lastPos;
        }
      }

      numOfRows = lastPos - firstPos + 1;
      midPos = (numOfRows >> 1u) + firstPos;

      if (key < keyList[midPos]) {
        lastPos = midPos - 1;
      } else if (key > keyList[midPos]) {
        firstPos = midPos + 1;
      } else {
        break;
      }
    }
  }

  return midPos;
}

/*
 * Binary search over an ascending timestamp array keyList[0, num), anchored at `pos`.
 *
 * ASC : searches [pos, num - 1]. Returns -1 if key < keyList[pos]; otherwise an index holding key, or else the
 *       largest index in the range whose value is below key.
 * DESC: searches [0, pos]. Returns -1 if key > keyList[pos]; otherwise an index holding key, or else the
 *       smallest index in the range whose value is above key.
 */
static int doBinarySearchKey(TSKEY* keyList, int num, int pos, TSKEY key, int order) {
  assert(pos >= 0 && pos < num);
  assert(num > 0);

  if (order == TSDB_ORDER_ASC) {
    int lo = pos;
    int hi = num - 1;
    if (key < keyList[lo]) {
      return -1;
    }

    for (;;) {
      if (key >= keyList[hi]) return hi;
      if (key <= keyList[lo]) return lo;
      if (hi - lo <= 1) return lo;  // keyList[lo] < key < keyList[hi]

      int mid = lo + (hi - lo + 1) / 2;
      if (keyList[mid] == key) return mid;
      if (keyList[mid] > key) {
        hi = mid;
      } else {
        lo = mid;
      }
    }
  }

  // DESC: walk the range [0, pos] from the top down
  int lo = 0;
  int hi = pos;
  if (key > keyList[hi]) {
    return -1;
  }

  for (;;) {
    if (key <= keyList[lo]) return lo;
    if (key >= keyList[hi]) return hi;
    if (hi - lo <= 1) return hi;  // keyList[lo] < key < keyList[hi]

    int mid = hi - (hi - lo + 1) / 2;
    if (keyList[mid] == key) return mid;
    if (keyList[mid] < key) {
      lo = mid;
    } else {
      hi = mid;
    }
  }
}

/*
 * Return the index of the last row, in scan order, that is still inside the query time window when dumping
 * starts at row `pos` of the block, or -1 when no row from `pos` on qualifies.
 * ASC : the largest row index >= pos whose ts <= window.ekey.
 * DESC: the smallest row index <= pos whose ts >= window.skey.
 */
int32_t getEndPosInDataBlock(STsdbReader* pReader, SBlockData* pBlockData, SDataBlk* pBlock, int32_t pos) {
  if (ASCENDING_TRAVERSE(pReader->order)) {
    if (pReader->window.ekey >= pBlock->maxKey.ts) {
      return pBlock->nRow - 1;  // the window covers the block's tail entirely
    }
    return doBinarySearchKey(pBlockData->aTSKEY, pBlock->nRow, pos, pReader->window.ekey, pReader->order);
  }

  if (pReader->window.skey <= pBlock->minKey.ts) {
    return 0;  // the window covers the block's head entirely
  }
  return doBinarySearchKey(pBlockData->aTSKEY, pBlock->nRow, pos, pReader->window.skey, pReader->order);
}

913
/*
 * Copy rows of the loaded file block (pReader->status.fileBlockData) into pReader->pResBlock.
 *
 * Rows start at pDumpInfo->rowIndex and follow the scan order. They are clipped to the query time window and
 * to pReader->capacity. Requested columns missing from the file block are filled with NULL. Afterwards
 * pDumpInfo->rowIndex points at the next row to dump, and the block is marked all-dumped when no row is left or
 * the next row is already out of the time window. pBlockScanInfo is not used here.
 */
static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo) {
  SReaderStatus*      pStatus = &pReader->status;
  SDataBlockIter*     pBlockIter = &pStatus->blockIter;
  SBlockData*         pBlockData = &pStatus->fileBlockData;
  SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(pBlockIter);
  SDataBlk*           pBlock = getCurrentBlock(pBlockIter);
  SSDataBlock*        pResBlock = pReader->pResBlock;
  int32_t             numOfOutputCols = blockDataGetNumOfCols(pResBlock);
  SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo;
  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;

  SColVal cv = {0};
  int64_t st = taosGetTimestampUs();
  bool    asc = ASCENDING_TRAVERSE(pReader->order);
  int32_t step = asc ? 1 : -1;

  // At the first row (in scan order) of a block that starts before the query window, jump to the first
  // qualifying row by searching in the opposite direction.
  bool atBlockStart = asc ? (pDumpInfo->rowIndex == 0) : (pDumpInfo->rowIndex == pBlock->nRow - 1);
  if (atBlockStart) {
    bool startsInWindow =
        asc ? (pReader->window.skey <= pBlock->minKey.ts) : (pReader->window.ekey >= pBlock->maxKey.ts);
    if (!startsInWindow) {
      int32_t searchFrom = asc ? pBlock->nRow - 1 : 0;
      int32_t searchOrder = asc ? TSDB_ORDER_DESC : TSDB_ORDER_ASC;
      int64_t boundary = asc ? pReader->window.skey : pReader->window.ekey;
      pDumpInfo->rowIndex = doBinarySearchKey(pBlockData->aTSKEY, pBlock->nRow, searchFrom, boundary, searchOrder);
    }
  }

  // clip to the query time window
  int32_t endIndex = getEndPosInDataBlock(pReader, pBlockData, pBlock, pDumpInfo->rowIndex);
  if (endIndex == -1) {
    setBlockAllDumped(pDumpInfo, pReader->window.ekey, pReader->order);
    return TSDB_CODE_SUCCESS;
  }

  endIndex += step;  // exclusive bound in scan direction
  int32_t remain = asc ? (endIndex - pDumpInfo->rowIndex) : (pDumpInfo->rowIndex - endIndex);
  if (remain > pReader->capacity) {  // never exceed the output buffer
    remain = pReader->capacity;
  }

  int32_t          outCol = 0;
  SColumnInfoData* pColData = taosArrayGet(pResBlock->pDataBlock, outCol);

  // primary timestamp column
  if (pColData->info.colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
    if (asc) {
      memcpy(pColData->pData, &pBlockData->aTSKEY[pDumpInfo->rowIndex], remain * sizeof(int64_t));
    } else {
      int32_t srcRow = pDumpInfo->rowIndex;
      for (int32_t dstRow = 0; dstRow < remain; ++dstRow, srcRow += step) {
        colDataAppendInt64(pColData, dstRow, &pBlockData->aTSKEY[srcRow]);
      }
    }
    outCol += 1;
  }

  // merge-walk output columns and file-block columns by column id
  int32_t srcCol = 0;
  int32_t numOfSrcCols = taosArrayGetSize(pBlockData->aIdx);
  while (outCol < numOfOutputCols && srcCol < numOfSrcCols) {
    pColData = taosArrayGet(pResBlock->pDataBlock, outCol);
    SColData* pData = tBlockDataGetColDataByIdx(pBlockData, srcCol);

    if (pData->cid < pColData->info.colId) {
      // present in the file block but not requested
      srcCol += 1;
      continue;
    }

    if (pData->cid > pColData->info.colId) {
      // requested but absent from the file block
      colDataAppendNNULL(pColData, 0, remain);
      outCol += 1;
      continue;
    }

    if (pData->flag == HAS_NONE || pData->flag == HAS_NULL || pData->flag == (HAS_NULL | HAS_NONE)) {
      colDataAppendNNULL(pColData, 0, remain);
    } else if (IS_NUMERIC_TYPE(pColData->info.type) && asc) {
      // fixed-width values laid out contiguously: bulk copy, then patch NULL/NONE rows one-by-one
      uint8_t* p = pData->pData + tDataTypes[pData->type].bytes * pDumpInfo->rowIndex;
      memcpy(pColData->pData, p, remain * tDataTypes[pData->type].bytes);

      if (pData->flag != HAS_VALUE) {
        int32_t srcRow = pDumpInfo->rowIndex;
        for (int32_t dstRow = 0; dstRow < remain; ++dstRow, srcRow += step) {
          uint8_t v = tColDataGetBitValue(pData, srcRow);
          if (v == 0 || v == 1) {
            colDataSetNull_f(pColData->nullbitmap, dstRow);
          }
        }
      }
    } else {
      int32_t srcRow = pDumpInfo->rowIndex;
      for (int32_t dstRow = 0; dstRow < remain; ++dstRow, srcRow += step) {
        tColDataGetValue(pData, srcRow, &cv);
        doCopyColVal(pColData, dstRow, outCol, &cv, pSupInfo);
      }
    }

    srcCol += 1;
    outCol += 1;
  }

  // any remaining requested columns are not in the file block at all
  for (; outCol < numOfOutputCols; ++outCol) {
    pColData = taosArrayGet(pResBlock->pDataBlock, outCol);
    colDataAppendNNULL(pColData, 0, remain);
  }

  pResBlock->info.rows = remain;
  pDumpInfo->rowIndex += step * remain;

  // decide whether the current block is exhausted
  if (pDumpInfo->rowIndex >= 0 && pDumpInfo->rowIndex < pBlock->nRow) {
    int64_t ts = pBlockData->aTSKEY[pDumpInfo->rowIndex];
    if (outOfTimeWindow(ts, &pReader->window)) {  // the rest of the block is outside the window
      setBlockAllDumped(pDumpInfo, ts, pReader->order);
    }
  } else {
    int64_t ts = asc ? pBlock->maxKey.ts : pBlock->minKey.ts;
    setBlockAllDumped(pDumpInfo, ts, pReader->order);
  }

  double elapsedTime = (taosGetTimestampUs() - st) / 1000.0;
  pReader->cost.blockLoadTime += elapsedTime;

  int32_t unDumpedRows = asc ? pBlock->nRow - pDumpInfo->rowIndex : pDumpInfo->rowIndex + 1;
  tsdbDebug("%p copy file block to sdatablock, global index:%d, table index:%d, brange:%" PRId64 "-%" PRId64
            ", rows:%d, remain:%d, minVer:%" PRId64 ", maxVer:%" PRId64 ", elapsed time:%.2f ms, %s",
            pReader, pBlockIter->index, pBlockInfo->tbBlockIdx, pBlock->minKey.ts, pBlock->maxKey.ts, remain,
            unDumpedRows, pBlock->minVer, pBlock->maxVer, elapsedTime, pReader->idStr);

  return TSDB_CODE_SUCCESS;
}

1048 1049
/*
 * Read the iterator's current file block of table `uid` into pBlockData.
 * pBlockData is reset and re-initialised with the reader schema and column ids colIds[1..numOfCols-1] first.
 * On success the load time is accounted and the dump state is marked as not fully dumped.
 * Returns the error code of tBlockDataInit / tsdbReadDataBlock on failure.
 */
static int32_t doLoadFileBlockData(STsdbReader* pReader, SDataBlockIter* pBlockIter, SBlockData* pBlockData,
                                   uint64_t uid) {
  int64_t startTs = taosGetTimestampUs();

  tBlockDataReset(pBlockData);
  TABLEID tid = {.suid = pReader->suid, .uid = uid};

  int32_t code =
      tBlockDataInit(pBlockData, &tid, pReader->pSchema, &pReader->suppInfo.colIds[1], pReader->suppInfo.numOfCols - 1);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(pBlockIter);
  ASSERT(pBlockInfo != NULL);

  SDataBlk* pBlock = getCurrentBlock(pBlockIter);

  code = tsdbReadDataBlock(pReader->pFileReader, pBlock, pBlockData);
  if (code != TSDB_CODE_SUCCESS) {
    tsdbError("%p error occurs in loading file block, global index:%d, table index:%d, brange:%" PRId64 "-%" PRId64
              ", rows:%d, code:%s %s",
              pReader, pBlockIter->index, pBlockInfo->tbBlockIdx, pBlock->minKey.ts, pBlock->maxKey.ts, pBlock->nRow,
              tstrerror(code), pReader->idStr);
    return code;
  }

  double elapsedTime = (taosGetTimestampUs() - startTs) / 1000.0;

  tsdbDebug("%p load file block into buffer, global index:%d, index in table block list:%d, brange:%" PRId64 "-%" PRId64
            ", rows:%d, minVer:%" PRId64 ", maxVer:%" PRId64 ", elapsed time:%.2f ms, %s",
            pReader, pBlockIter->index, pBlockInfo->tbBlockIdx, pBlock->minKey.ts, pBlock->maxKey.ts, pBlock->nRow,
            pBlock->minVer, pBlock->maxVer, elapsedTime, pReader->idStr);

  pReader->cost.blockLoadTime += elapsedTime;
  pReader->status.fBlockDumpInfo.allDumped = false;

  return TSDB_CODE_SUCCESS;
}
H
Hongze Cheng 已提交
1086

H
Haojun Liao 已提交
1087 1088 1089
/*
 * Release every buffer owned by the block-order supporter.
 * Safe on a partially initialised supporter (any pointer may be NULL) and safe to call more than once:
 * freed pointers are cleared and numOfTables is reset so a later call does not walk a freed array.
 */
static void cleanupBlockOrderSupporter(SBlockOrderSupporter* pSup) {
  taosMemoryFreeClear(pSup->numOfBlocksPerTable);
  taosMemoryFreeClear(pSup->indexPerTable);

  if (pSup->pDataBlockInfo != NULL) {
    for (int32_t i = 0; i < pSup->numOfTables; ++i) {
      taosMemoryFreeClear(pSup->pDataBlockInfo[i]);
    }
    taosMemoryFreeClear(pSup->pDataBlockInfo);
  }

  pSup->numOfTables = 0;
}
H
Hongze Cheng 已提交
1098

H
Haojun Liao 已提交
1099 1100
/*
 * Allocate the zero-initialised per-table arrays of the block-order supporter for numOfTables tables.
 * Returns TSDB_CODE_OUT_OF_MEMORY (after releasing whatever was allocated) if any allocation fails.
 */
static int32_t initBlockOrderSupporter(SBlockOrderSupporter* pSup, int32_t numOfTables) {
  ASSERT(numOfTables >= 1);

  pSup->numOfBlocksPerTable = taosMemoryCalloc(numOfTables, sizeof(int32_t));
  pSup->indexPerTable = taosMemoryCalloc(numOfTables, sizeof(int32_t));
  pSup->pDataBlockInfo = taosMemoryCalloc(numOfTables, POINTER_BYTES);

  bool allAllocated =
      (pSup->numOfBlocksPerTable != NULL) && (pSup->indexPerTable != NULL) && (pSup->pDataBlockInfo != NULL);
  if (!allAllocated) {
    cleanupBlockOrderSupporter(pSup);
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  return TSDB_CODE_SUCCESS;
}
H
Hongze Cheng 已提交
1113

H
Haojun Liao 已提交
1114
static int32_t fileDataBlockOrderCompar(const void* pLeft, const void* pRight, void* param) {
1115
  int32_t leftIndex = *(int32_t*)pLeft;
H
Haojun Liao 已提交
1116
  int32_t rightIndex = *(int32_t*)pRight;
H
Hongze Cheng 已提交
1117

H
Haojun Liao 已提交
1118
  SBlockOrderSupporter* pSupporter = (SBlockOrderSupporter*)param;
H
Hongze Cheng 已提交
1119

H
Haojun Liao 已提交
1120 1121
  int32_t leftTableBlockIndex = pSupporter->indexPerTable[leftIndex];
  int32_t rightTableBlockIndex = pSupporter->indexPerTable[rightIndex];
H
Hongze Cheng 已提交
1122

H
Haojun Liao 已提交
1123 1124 1125 1126 1127 1128 1129
  if (leftTableBlockIndex > pSupporter->numOfBlocksPerTable[leftIndex]) {
    /* left block is empty */
    return 1;
  } else if (rightTableBlockIndex > pSupporter->numOfBlocksPerTable[rightIndex]) {
    /* right block is empty */
    return -1;
  }
H
Hongze Cheng 已提交
1130

1131
  SBlockOrderWrapper* pLeftBlock = &pSupporter->pDataBlockInfo[leftIndex][leftTableBlockIndex];
H
Haojun Liao 已提交
1132
  SBlockOrderWrapper* pRightBlock = &pSupporter->pDataBlockInfo[rightIndex][rightTableBlockIndex];
H
Hongze Cheng 已提交
1133

1134 1135 1136
  return pLeftBlock->offset > pRightBlock->offset ? 1 : -1;
}

H
Haojun Liao 已提交
1137
/*
 * Load the SDataBlk descriptor for the iterator's current block into pBlockIter->block.
 * Returns TSDB_CODE_SUCCESS (also when the block list is empty), or TSDB_CODE_INVALID_PARA when the
 * block's table or its block index cannot be found.
 */
static int32_t doSetCurrentBlock(SDataBlockIter* pBlockIter, const char* idStr) {
  SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(pBlockIter);
  if (pBlockInfo == NULL) {
    return TSDB_CODE_SUCCESS;  // nothing to load
  }

  STableBlockScanInfo** pScanInfo = taosHashGet(pBlockIter->pTableMap, &pBlockInfo->uid, sizeof(pBlockInfo->uid));
  if (pScanInfo == NULL) {
    tsdbError("failed to locate the uid:%" PRIu64 " in query table uid list, %s", pBlockInfo->uid, idStr);
    return TSDB_CODE_INVALID_PARA;
  }

  SBlockIndex* pIndex = taosArrayGet((*pScanInfo)->pBlockList, pBlockInfo->tbBlockIdx);
  if (pIndex == NULL) {
    tsdbError("failed to locate block index:%d of uid:%" PRIu64 " in table block list, %s", pBlockInfo->tbBlockIdx,
              pBlockInfo->uid, idStr);
    return TSDB_CODE_INVALID_PARA;
  }

  tMapDataGetItemByIdx(&(*pScanInfo)->mapData, pIndex->ordinalIndex, &pBlockIter->block, tGetDataBlk);
  return TSDB_CODE_SUCCESS;
}
H
Hongze Cheng 已提交
1156

1157
/*
 * Build the file-block access order for all tables in pReader->status.pTableMap.
 *
 * Blocks of a single table keep their list order. With several tables, blocks are merged by in-file offset
 * using a multiway merge tree. The iterator index is positioned at the first block in scan order, and that
 * block's descriptor is loaded.
 *
 * Returns TSDB_CODE_SUCCESS, the error from initBlockOrderSupporter / doSetCurrentBlock, or
 * TSDB_CODE_TDB_OUT_OF_MEMORY on allocation / merge-tree failure.
 */
static int32_t initBlockIterator(STsdbReader* pReader, SDataBlockIter* pBlockIter, int32_t numOfBlocks) {
  bool asc = ASCENDING_TRAVERSE(pReader->order);

  SBlockOrderSupporter sup = {0};
  pBlockIter->numOfBlocks = numOfBlocks;
  taosArrayClear(pBlockIter->blockList);
  pBlockIter->pTableMap = pReader->status.pTableMap;

  // access data blocks according to the offset of each block in asc/desc order.
  int32_t numOfTables = (int32_t)taosHashGetSize(pReader->status.pTableMap);

  int64_t st = taosGetTimestampUs();
  int32_t code = initBlockOrderSupporter(&sup, numOfTables);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  int32_t cnt = 0;
  void*   ptr = NULL;
  while (1) {
    ptr = taosHashIterate(pReader->status.pTableMap, ptr);
    if (ptr == NULL) {
      break;
    }

    STableBlockScanInfo* pTableScanInfo = *(STableBlockScanInfo**)ptr;
    if (pTableScanInfo->pBlockList == NULL || taosArrayGetSize(pTableScanInfo->pBlockList) == 0) {
      continue;
    }

    size_t num = taosArrayGetSize(pTableScanInfo->pBlockList);
    sup.numOfBlocksPerTable[sup.numOfTables] = num;

    char* buf = taosMemoryMalloc(sizeof(SBlockOrderWrapper) * num);
    if (buf == NULL) {
      // leaving the hash iteration midway: the iterator must be released explicitly
      taosHashCancelIterate(pReader->status.pTableMap, ptr);
      cleanupBlockOrderSupporter(&sup);
      return TSDB_CODE_TDB_OUT_OF_MEMORY;
    }

    sup.pDataBlockInfo[sup.numOfTables] = (SBlockOrderWrapper*)buf;

    for (int32_t k = 0; k < num; ++k) {
      SBlockIndex* pIndex = taosArrayGet(pTableScanInfo->pBlockList, k);
      sup.pDataBlockInfo[sup.numOfTables][k] =
          (SBlockOrderWrapper){.uid = pTableScanInfo->uid, .offset = pIndex->inFileOffset};
      cnt++;
    }

    sup.numOfTables += 1;
  }

  ASSERT(numOfBlocks == cnt);

  // since there is only one table qualified, blocks are not sorted
  if (sup.numOfTables == 1) {
    for (int32_t i = 0; i < numOfBlocks; ++i) {
      SFileDataBlockInfo blockInfo = {.uid = sup.pDataBlockInfo[0][i].uid, .tbBlockIdx = i};
      taosArrayPush(pBlockIter->blockList, &blockInfo);
    }

    int64_t et = taosGetTimestampUs();
    tsdbDebug("%p create blocks info struct completed for one table, %d blocks not sorted, elapsed time:%.2f ms %s",
              pReader, numOfBlocks, (et - st) / 1000.0, pReader->idStr);

    pBlockIter->index = asc ? 0 : (numOfBlocks - 1);
    cleanupBlockOrderSupporter(&sup);
    return doSetCurrentBlock(pBlockIter, pReader->idStr);
  }

  tsdbDebug("%p create data blocks info struct completed, %d blocks in %d tables %s", pReader, cnt, sup.numOfTables,
            pReader->idStr);

  ASSERT(cnt <= numOfBlocks && sup.numOfTables <= numOfTables);

  // Keep the full int32_t result: narrowing an error code to uint8_t could turn a failure into 0 (success).
  SMultiwayMergeTreeInfo* pTree = NULL;
  code = tMergeTreeCreate(&pTree, sup.numOfTables, &sup, fileDataBlockOrderCompar);
  if (code != TSDB_CODE_SUCCESS) {
    cleanupBlockOrderSupporter(&sup);
    return TSDB_CODE_TDB_OUT_OF_MEMORY;
  }

  int32_t numOfTotal = 0;
  while (numOfTotal < cnt) {
    int32_t pos = tMergeTreeGetChosenIndex(pTree);
    int32_t index = sup.indexPerTable[pos]++;

    SFileDataBlockInfo blockInfo = {.uid = sup.pDataBlockInfo[pos][index].uid, .tbBlockIdx = index};
    taosArrayPush(pBlockIter->blockList, &blockInfo);

    // set data block index overflow, in order to disable the offset comparator
    if (sup.indexPerTable[pos] >= sup.numOfBlocksPerTable[pos]) {
      sup.indexPerTable[pos] = sup.numOfBlocksPerTable[pos] + 1;
    }

    numOfTotal += 1;
    tMergeTreeAdjust(pTree, tMergeTreeGetAdjustIndex(pTree));
  }

  int64_t et = taosGetTimestampUs();
  tsdbDebug("%p %d data blocks access order completed, elapsed time:%.2f ms %s", pReader, numOfBlocks,
            (et - st) / 1000.0, pReader->idStr);
  cleanupBlockOrderSupporter(&sup);
  taosMemoryFree(pTree);

  pBlockIter->index = asc ? 0 : (numOfBlocks - 1);
  return doSetCurrentBlock(pBlockIter, pReader->idStr);
}
H
Hongze Cheng 已提交
1267

H
Haojun Liao 已提交
1268
/*
 * Advance the block iterator one step in its scan direction and load the new current block.
 * Returns false, without moving, when the iterator already sits on the last block in scan order.
 */
static bool blockIteratorNext(SDataBlockIter* pBlockIter, const char* idStr) {
  bool asc = ASCENDING_TRAVERSE(pBlockIter->order);

  bool atEnd = asc ? (pBlockIter->index >= pBlockIter->numOfBlocks - 1) : (pBlockIter->index <= 0);
  if (atEnd) {
    return false;
  }

  pBlockIter->index += asc ? 1 : -1;
  doSetCurrentBlock(pBlockIter, idStr);
  return true;
}

1282 1283 1284
/**
 * This is an two rectangles overlap cases.
 */
H
Hongze Cheng 已提交
1285
static int32_t dataBlockPartiallyRequired(STimeWindow* pWindow, SVersionRange* pVerRange, SDataBlk* pBlock) {
1286 1287
  return (pWindow->ekey < pBlock->maxKey.ts && pWindow->ekey >= pBlock->minKey.ts) ||
         (pWindow->skey > pBlock->minKey.ts && pWindow->skey <= pBlock->maxKey.ts) ||
H
Hongze Cheng 已提交
1288 1289
         (pVerRange->minVer > pBlock->minVer && pVerRange->minVer <= pBlock->maxVer) ||
         (pVerRange->maxVer < pBlock->maxVer && pVerRange->maxVer >= pBlock->minVer);
H
Haojun Liao 已提交
1290
}
H
Hongze Cheng 已提交
1291

1292
/*
 * Find the next block (in scan order) of the same table as pBlockInfo.
 * On success stores its position in *nextIndex and a copy of its SBlockIndex in *pBlockIndex and returns true.
 * Returns false, leaving the outputs untouched, when pBlockInfo is the table's last block in scan order.
 */
static bool getNeighborBlockOfSameTable(SFileDataBlockInfo* pBlockInfo, STableBlockScanInfo* pTableBlockScanInfo,
                                        int32_t* nextIndex, int32_t order, SBlockIndex* pBlockIndex) {
  bool    asc = ASCENDING_TRAVERSE(order);
  int32_t numOfBlocks = (int32_t)taosArrayGetSize(pTableBlockScanInfo->pBlockList);

  // Bounds are checked in signed arithmetic: the old "tbBlockIdx >= size - 1" was an unsigned comparison that
  // wraps to SIZE_MAX for an empty list and let the lookup below dereference NULL.
  int32_t candidate = pBlockInfo->tbBlockIdx + (asc ? 1 : -1);
  if (candidate < 0 || candidate >= numOfBlocks) {
    return false;
  }

  *nextIndex = candidate;
  *pBlockIndex = *(SBlockIndex*)taosArrayGet(pTableBlockScanInfo->pBlockList, candidate);
  return true;
}

/*
 * Locate pFBlockInfo (matched by uid and tbBlockIdx) in the iterator's block list.
 * The search starts at the current index and moves in scan direction. Asserts if the block is absent and
 * returns -1 in that case.
 */
static int32_t findFileBlockInfoIndex(SDataBlockIter* pBlockIter, SFileDataBlockInfo* pFBlockInfo) {
  ASSERT(pBlockIter != NULL && pFBlockInfo != NULL);

  int32_t step = ASCENDING_TRAVERSE(pBlockIter->order) ? 1 : -1;
  for (int32_t index = pBlockIter->index; index >= 0 && index < pBlockIter->numOfBlocks; index += step) {
    SFileDataBlockInfo* pCandidate = taosArrayGet(pBlockIter->blockList, index);
    bool isSame = (pCandidate->uid == pFBlockInfo->uid) && (pCandidate->tbBlockIdx == pFBlockInfo->tbBlockIdx);
    if (isSame) {
      return index;
    }
  }

  ASSERT(0);
  return -1;
}

1329
/*
 * Move the block at `index` to the position right after the current one (current index + step).
 * Makes that position the new current block and loads its descriptor.
 * Returns -1 if `index` is out of range, TSDB_CODE_SUCCESS otherwise.
 */
static int32_t setFileBlockActiveInBlockIter(SDataBlockIter* pBlockIter, int32_t index, int32_t step) {
  bool outOfRange = (index < 0) || (index >= pBlockIter->numOfBlocks);
  if (outOfRange) {
    return -1;
  }

  // copy the entry out first: it is about to be removed from the array
  SFileDataBlockInfo target = *(SFileDataBlockInfo*)taosArrayGet(pBlockIter->blockList, index);
  pBlockIter->index += step;

  if (pBlockIter->index != index) {
    taosArrayRemove(pBlockIter->blockList, index);
    taosArrayInsert(pBlockIter->blockList, pBlockIter->index, &target);

    SFileDataBlockInfo* pMoved = taosArrayGet(pBlockIter->blockList, pBlockIter->index);
    ASSERT(pMoved->uid == target.uid && pMoved->tbBlockIdx == target.tbBlockIdx);
  }

  doSetCurrentBlock(pBlockIter, "");
  return TSDB_CODE_SUCCESS;
}

1349
/*
 * Check whether the block shares its boundary timestamp with the neighbour block that follows it in scan order.
 * Ascending scans compare maxKey with the neighbour's window.skey; descending scans compare minKey with the
 * neighbour's window.ekey.
 */
static bool overlapWithNeighborBlock(SDataBlk* pBlock, SBlockIndex* pNeighborBlockIndex, int32_t order) {
  if (!ASCENDING_TRAVERSE(order)) {
    return pBlock->minKey.ts == pNeighborBlockIndex->window.ekey;
  }

  return pBlock->maxKey.ts == pNeighborBlockIndex->window.skey;
}
H
Hongze Cheng 已提交
1357

H
Hongze Cheng 已提交
1358
/*
 * Return true when `key` is set (not TSKEY_INITIAL_VAL) and lies at or before the block's first key in scan
 * order: key <= minKey for ascending scans, key >= maxKey for descending scans.
 */
static bool bufferDataInFileBlockGap(int32_t order, TSDBKEY key, SDataBlk* pBlock) {
  if (key.ts == TSKEY_INITIAL_VAL) {
    return false;
  }

  return ASCENDING_TRAVERSE(order) ? (key.ts <= pBlock->minKey.ts) : (key.ts >= pBlock->maxKey.ts);
}
H
Hongze Cheng 已提交
1364

H
Hongze Cheng 已提交
1365
/*
 * Return true when key.ts falls inside the block's [minKey, maxKey] and the block's version range
 * intersects pVerRange.
 */
static bool keyOverlapFileBlock(TSDBKEY key, SDataBlk* pBlock, SVersionRange* pVerRange) {
  bool tsInside = (key.ts >= pBlock->minKey.ts) && (key.ts <= pBlock->maxKey.ts);
  bool verIntersects = (pBlock->maxVer >= pVerRange->minVer) && (pBlock->minVer <= pVerRange->maxVer);

  return tsInside && verIntersects;
}

1370
/*
 * Walk the delete skyline from startIndex and report whether any delete point affects the block.
 *
 * A point affects the block when both conditions hold:
 * - its version is >= the block's minVer;
 * - either it lies inside [minKey, maxKey], or it lies before minKey and the next point is at/after minKey.
 *
 * The walk stops with false at the first point beyond maxKey. The skyline is presumably sorted by ts
 * (TODO confirm against its builder).
 */
static bool doCheckforDatablockOverlap(STableBlockScanInfo* pBlockScanInfo, const SDataBlk* pBlock, int32_t startIndex) {
  size_t num = taosArrayGetSize(pBlockScanInfo->delSkyline);

  for (int32_t i = startIndex; i < num; i += 1) {
    TSDBKEY* p = taosArrayGet(pBlockScanInfo->delSkyline, i);
    bool     versionCovers = p->version >= pBlock->minVer;

    if (p->ts >= pBlock->minKey.ts && p->ts <= pBlock->maxKey.ts) {
      if (versionCovers) {
        return true;
      }
      continue;
    }

    if (p->ts >= pBlock->minKey.ts) {  // i.e. p->ts > pBlock->maxKey.ts
      return false;
    }

    // p->ts < pBlock->minKey.ts
    if (!versionCovers) {
      continue;
    }

    if (i < num - 1) {
      TSDBKEY* pnext = taosArrayGet(pBlockScanInfo->delSkyline, i + 1);
      if (pnext->ts >= pBlock->minKey.ts) {
        return true;
      }
    } else {  // it must be the last point
      ASSERT(p->version == 0);
    }
  }

  return false;
}

H
Hongze Cheng 已提交
1398
/*
 * Return true when the table's delete skyline may affect data in pBlock.
 * A missing or empty skyline never overlaps. Otherwise the time range is checked first. The detailed check then
 * starts from fileDelIndex (ascending) or from the closest point at/before the block's minKey (descending).
 */
static bool overlapWithDelSkyline(STableBlockScanInfo* pBlockScanInfo, const SDataBlk* pBlock, int32_t order) {
  // The empty-array check matters: first/last element lookups on an empty array yield NULL, which would then
  // be dereferenced below.
  if (pBlockScanInfo->delSkyline == NULL || taosArrayGetSize(pBlockScanInfo->delSkyline) == 0) {
    return false;
  }

  // ts is not overlap
  TSDBKEY* pFirst = taosArrayGet(pBlockScanInfo->delSkyline, 0);
  TSDBKEY* pLast = taosArrayGetLast(pBlockScanInfo->delSkyline);
  if (pBlock->minKey.ts > pLast->ts || pBlock->maxKey.ts < pFirst->ts) {
    return false;
  }

  // version is not overlap
  if (ASCENDING_TRAVERSE(order)) {
    return doCheckforDatablockOverlap(pBlockScanInfo, pBlock, pBlockScanInfo->fileDelIndex);
  } else {
    int32_t index = pBlockScanInfo->fileDelIndex;
    while (1) {
      TSDBKEY* p = taosArrayGet(pBlockScanInfo->delSkyline, index);
      if (p->ts > pBlock->minKey.ts && index > 0) {
        index -= 1;
      } else {  // find the first point that is smaller than the minKey.ts of dataBlock.
        break;
      }
    }

    return doCheckforDatablockOverlap(pBlockScanInfo, pBlock, index);
  }
}

H
Haojun Liao 已提交
1428 1429 1430 1431 1432 1433 1434 1435 1436 1437 1438 1439 1440
// Reasons a file block has to be loaded instead of being used as a whole (filled by getBlockToLoadInfo).
typedef struct {
  bool overlapWithNeighborBlock;  // block shares its boundary key with the next block of the same table
  bool hasDupTs;                  // block may hold duplicated timestamps (its hasDup flag, or it has sub-blocks)
  bool overlapWithDelInfo;        // block overlaps the table's delete skyline
  bool overlapWithLastBlock;      // current key of the last-block reader lies inside [minKey, maxKey]
  bool overlapWithKeyInBuf;       // keyInBuf lies inside the block and the version ranges intersect
  bool partiallyRequired;         // query time window / version range cuts through the block
  bool moreThanCapcity;           // block has more rows than the reader's output capacity
} SDataBlockToLoadInfo;

/*
 * Fill pInfo with every reason that forces pBlock to be loaded.
 * Fields not set here keep their incoming value: overlapWithNeighborBlock is only set when a neighbour exists,
 * and overlapWithLastBlock only when the last-block reader has data.
 */
static void getBlockToLoadInfo(SDataBlockToLoadInfo* pInfo, SFileDataBlockInfo* pBlockInfo, SDataBlk* pBlock,
                               STableBlockScanInfo* pScanInfo, TSDBKEY keyInBuf, SLastBlockReader* pLastBlockReader,
                               STsdbReader* pReader) {
  int32_t     neighborPos = 0;
  SBlockIndex neighbor = {0};

  if (getNeighborBlockOfSameTable(pBlockInfo, pScanInfo, &neighborPos, pReader->order, &neighbor)) {
    pInfo->overlapWithNeighborBlock = overlapWithNeighborBlock(pBlock, &neighbor, pReader->order);
  }

  // a block made of several sub-blocks is treated as possibly containing duplicated timestamps
  pInfo->hasDupTs = (pBlock->nSubBlock != 1) ? true : pBlock->hasDup;
  pInfo->overlapWithDelInfo = overlapWithDelSkyline(pScanInfo, pBlock, pReader->order);

  if (hasDataInLastBlock(pLastBlockReader)) {
    int64_t tsLast = getCurrentKeyInLastBlock(pLastBlockReader);
    pInfo->overlapWithLastBlock = (tsLast >= pBlock->minKey.ts) && (tsLast <= pBlock->maxKey.ts);
  }

  pInfo->moreThanCapcity = pBlock->nRow > pReader->capacity;
  pInfo->partiallyRequired = dataBlockPartiallyRequired(&pReader->window, &pReader->verRange, pBlock);
  pInfo->overlapWithKeyInBuf = keyOverlapFileBlock(keyInBuf, pBlock, &pReader->verRange);
}
1464

H
Haojun Liao 已提交
1465 1466 1467 1468 1469 1470 1471 1472 1473 1474 1475 1476 1477 1478
// Decide whether a file data block must be loaded (and merged row by row) instead of being returned as a whole.
// The block is loaded when any of these holds:
//   - it overlaps the neighbor block of the same table, the rows in mem/imem, or the current last (stt) block row;
//   - it may contain duplicated timestamps;
//   - it is only partially required by the query time window / version range;
//   - it holds more rows than the result block capacity;
//   - it overlaps delete records of the table.
static bool fileBlockShouldLoad(STsdbReader* pReader, SFileDataBlockInfo* pBlockInfo, SDataBlk* pBlock,
                                STableBlockScanInfo* pScanInfo, TSDBKEY keyInBuf, SLastBlockReader* pLastBlockReader) {
  SDataBlockToLoadInfo info = {0};
  getBlockToLoadInfo(&info, pBlockInfo, pBlock, pScanInfo, keyInBuf, pLastBlockReader, pReader);

  bool usableAsIs = !info.overlapWithNeighborBlock && !info.hasDupTs && !info.partiallyRequired &&
                    !info.overlapWithKeyInBuf && !info.moreThanCapcity && !info.overlapWithDelInfo &&
                    !info.overlapWithLastBlock;
  if (usableAsIs) {
    return false;
  }

  // record every reason for loading the block, for profiling
  tsdbDebug("%p uid:%" PRIu64
            " need to load the datablock, overlapwithneighborblock:%d, hasDup:%d, partiallyRequired:%d, "
            "overlapWithKey:%d, greaterThanBuf:%d, overlapWithDel:%d, overlapWithlastBlock:%d, %s",
            pReader, pBlockInfo->uid, info.overlapWithNeighborBlock, info.hasDupTs, info.partiallyRequired,
            info.overlapWithKeyInBuf, info.moreThanCapcity, info.overlapWithDelInfo, info.overlapWithLastBlock,
            pReader->idStr);
  return true;
}

H
Haojun Liao 已提交
1493 1494 1495 1496 1497 1498 1499 1500 1501
// A file block is "clean" when none of its rows need merging with other data: no overlap with the neighbor block,
// mem/imem, delete records or the last block, and no duplicated timestamps. Unlike fileBlockShouldLoad(), partial
// coverage of the query range and result-block capacity are not considered here.
static bool isCleanFileDataBlock(STsdbReader* pReader, SFileDataBlockInfo* pBlockInfo, SDataBlk* pBlock,
                                 STableBlockScanInfo* pScanInfo, TSDBKEY keyInBuf, SLastBlockReader* pLastBlockReader) {
  SDataBlockToLoadInfo info = {0};
  getBlockToLoadInfo(&info, pBlockInfo, pBlock, pScanInfo, keyInBuf, pLastBlockReader, pReader);

  if (info.overlapWithNeighborBlock || info.hasDupTs || info.overlapWithKeyInBuf || info.overlapWithDelInfo ||
      info.overlapWithLastBlock) {
    return false;
  }
  return true;
}

1502
// Fill pReader->pResBlock with rows of this table taken from mem/imem (bounded by endKey, as interpreted by
// buildDataBlockFromBufImpl), then stamp the block's time window and uid and mark it as a composed block.
// The block metadata and timing stats are updated even when the impl reports an error; that error code is returned.
static int32_t buildDataBlockFromBuf(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo, int64_t endKey) {
  // nothing buffered in either imem or mem
  if (!pBlockScanInfo->iiter.hasVal && !pBlockScanInfo->iter.hasVal) {
    return TSDB_CODE_SUCCESS;
  }

  SSDataBlock* pResBlock = pReader->pResBlock;
  int64_t      startUs = taosGetTimestampUs();

  int32_t code = buildDataBlockFromBufImpl(pBlockScanInfo, endKey, pReader->capacity, pReader);

  blockDataUpdateTsWindow(pResBlock, 0);
  pResBlock->info.uid = pBlockScanInfo->uid;
  setComposedBlockFlag(pReader, true);

  double costMs = (taosGetTimestampUs() - startUs) / 1000.0;
  tsdbDebug("%p build data block from cache completed, elapsed time:%.2f ms, numOfRows:%d, brange:%" PRId64
            " - %" PRId64 " %s",
            pReader, costMs, pResBlock->info.rows, pResBlock->info.window.skey, pResBlock->info.window.ekey,
            pReader->idStr);

  pReader->cost.buildmemBlock += costMs;
  return code;
}

1527 1528
// Fast path for file block rows: when the current row is not the last one in scan direction and the next row in scan
// direction has a different timestamp than key, the current row has no duplicates in this block. It is then
// appended to the result block directly and the dump cursor advances one step.
// Returns false, leaving everything untouched, when merging is required.
static bool tryCopyDistinctRowFromFileBlock(STsdbReader* pReader, SBlockData* pBlockData, int64_t key,
                                            SFileBlockDumpInfo* pDumpInfo) {
  int32_t step = 0;
  if (pReader->order == TSDB_ORDER_ASC) {
    if (pDumpInfo->rowIndex >= pDumpInfo->totalRows - 1) {
      return false;  // border row: no successor to compare with
    }
    step = 1;
  } else if (pReader->order == TSDB_ORDER_DESC) {
    if (pDumpInfo->rowIndex <= 0) {
      return false;  // border row: no predecessor to compare with
    }
    step = -1;
  } else {
    return false;
  }

  if (pBlockData->aTSKEY[pDumpInfo->rowIndex + step] == key) {
    return false;  // duplicated timestamp follows, caller has to merge
  }

  doAppendRowFromFileBlock(pReader->pResBlock, pReader, pBlockData, pDumpInfo->rowIndex);
  pDumpInfo->rowIndex += step;
  return true;
}

1547 1548
// Advance the last (stt) block merge tree until it rests on a row that hasBeenDropped() does not filter out via the
// table's delete skyline. Returns true when such a row exists, false once the merge tree is exhausted.
static bool nextRowFromLastBlocks(SLastBlockReader* pLastBlockReader, STableBlockScanInfo* pBlockScanInfo,
                                  SVersionRange* pVerRange) {
  while (tMergeTreeNext(&pLastBlockReader->mergeTree)) {
    TSDBROW curRow = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
    TSDBKEY curKey = TSDBROW_KEY(&curRow);

    bool dropped = hasBeenDropped(pBlockScanInfo->delSkyline, &pBlockScanInfo->lastBlockDelIndex, &curKey,
                                  pLastBlockReader->order, pVerRange);
    if (!dropped) {
      return true;
    }
  }

  return false;
}

// Move the last (stt) block reader past fRow. If the next visible row carries the same timestamp ts, fRow has
// duplicates and false is returned so the caller can merge; otherwise fRow is appended to the result block and true
// is returned. In both cases the reader has already been advanced.
static bool tryCopyDistinctRowFromSttBlock(TSDBROW* fRow, SLastBlockReader* pLastBlockReader,
                                           STableBlockScanInfo* pScanInfo, int64_t ts, STsdbReader* pReader) {
  bool hasNext = nextRowFromLastBlocks(pLastBlockReader, pScanInfo, &pReader->verRange);
  if (hasNext && getCurrentKeyInLastBlock(pLastBlockReader) == ts) {
    return false;
  }

  doAppendRowFromFileBlock(pReader->pResBlock, pReader, fRow->pBlockData, fRow->iRow);
  return true;
}

H
Haojun Liao 已提交
1581 1582 1583
// Return the table schema matching sversion.
// pReader->pSchema caches the newest schema (fetched with version -1 on first use); pReader->pMemSchema caches the
// most recently requested older version and is replaced when another version is asked for.
// Returns NULL on failure, with terrno set to the code returned by metaGetTbTSchemaEx().
static FORCE_INLINE STSchema* doGetSchemaForTSRow(int32_t sversion, STsdbReader* pReader, uint64_t uid) {
  if (NULL == pReader->pSchema) {
    pReader->pSchema = metaGetTbTSchema(pReader->pTsdb->pVnode->pMeta, uid, -1, 1);
  }

  STSchema* pNewest = pReader->pSchema;
  if (NULL != pNewest && pNewest->version == sversion) {
    return pNewest;
  }

  int32_t code = TSDB_CODE_SUCCESS;
  if (NULL != pReader->pMemSchema) {
    if (pReader->pMemSchema->version == sversion) {
      return pReader->pMemSchema;
    }

    // the cached schema belongs to another version: drop it and load the requested one
    taosMemoryFreeClear(pReader->pMemSchema);
    code = metaGetTbTSchemaEx(pReader->pTsdb->pVnode->pMeta, pReader->suid, uid, sversion, &pReader->pMemSchema);
    if (TSDB_CODE_SUCCESS == code && NULL != pReader->pMemSchema) {
      return pReader->pMemSchema;
    }

    terrno = code;
    return NULL;
  }

  code = metaGetTbTSchemaEx(pReader->pTsdb->pVnode->pMeta, pReader->suid, uid, sversion, &pReader->pMemSchema);
  if (TSDB_CODE_SUCCESS != code) {
    terrno = code;
    return NULL;
  }
  return pReader->pMemSchema;
}

1616
static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo, TSDBROW* pRow,
1617 1618 1619 1620 1621 1622
                                     SIterInfo* pIter, int64_t key, SLastBlockReader* pLastBlockReader) {
  SRowMerger          merge = {0};
  STSRow*             pTSRow = NULL;
  SBlockData*         pBlockData = &pReader->status.fileBlockData;
  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;

1623
  int64_t tsLast = INT64_MIN;
1624
  if (hasDataInLastBlock(pLastBlockReader)) {
1625 1626
    tsLast = getCurrentKeyInLastBlock(pLastBlockReader);
  }
1627

H
Hongze Cheng 已提交
1628 1629
  TSDBKEY k = TSDBROW_KEY(pRow);
  TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
1630

1631 1632
  int64_t minKey = 0;
  if (pReader->order == TSDB_ORDER_ASC) {
H
Hongze Cheng 已提交
1633
    minKey = INT64_MAX;  // chosen the minimum value
1634
    if (minKey > tsLast && hasDataInLastBlock(pLastBlockReader)) {
1635 1636
      minKey = tsLast;
    }
1637

1638 1639 1640
    if (minKey > k.ts) {
      minKey = k.ts;
    }
1641

1642
    if (minKey > key && hasDataInFileBlock(pBlockData, pDumpInfo)) {
1643 1644 1645 1646
      minKey = key;
    }
  } else {
    minKey = INT64_MIN;
1647
    if (minKey < tsLast && hasDataInLastBlock(pLastBlockReader)) {
1648 1649 1650 1651 1652 1653 1654
      minKey = tsLast;
    }

    if (minKey < k.ts) {
      minKey = k.ts;
    }

1655
    if (minKey < key && hasDataInFileBlock(pBlockData, pDumpInfo)) {
1656 1657
      minKey = key;
    }
1658 1659 1660 1661
  }

  bool init = false;

1662
  // ASC: file block ---> last block -----> imem -----> mem
H
Hongze Cheng 已提交
1663
  // DESC: mem -----> imem -----> last block -----> file block
1664 1665
  if (pReader->order == TSDB_ORDER_ASC) {
    if (minKey == key) {
1666
      init = true;
H
Haojun Liao 已提交
1667 1668 1669 1670
      int32_t code = tRowMergerInit(&merge, &fRow, pReader->pSchema);
      if (code != TSDB_CODE_SUCCESS) {
        return code;
      }
1671
      doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
1672 1673
    }

1674
    if (minKey == tsLast) {
1675
      TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
H
Haojun Liao 已提交
1676 1677 1678
      if (init) {
        tRowMerge(&merge, &fRow1);
      } else {
1679
        init = true;
H
Haojun Liao 已提交
1680 1681 1682 1683
        int32_t code = tRowMergerInit(&merge, &fRow1, pReader->pSchema);
        if (code != TSDB_CODE_SUCCESS) {
          return code;
        }
1684
      }
1685
      doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, &merge, &pReader->verRange);
1686
    }
1687

1688
    if (minKey == k.ts) {
H
Haojun Liao 已提交
1689 1690 1691
      if (init) {
        tRowMerge(&merge, pRow);
      } else {
1692 1693
        init = true;
        STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid);
H
Haojun Liao 已提交
1694 1695 1696 1697 1698 1699 1700 1701
        int32_t   code = tRowMergerInit(&merge, pRow, pSchema);
        if (code != TSDB_CODE_SUCCESS) {
          return code;
        }
      }
      int32_t code = doMergeRowsInBuf(pIter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge, pReader);
      if (code != TSDB_CODE_SUCCESS) {
        return code;
1702 1703 1704 1705 1706
      }
    }
  } else {
    if (minKey == k.ts) {
      init = true;
1707
      STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid);
H
Haojun Liao 已提交
1708 1709 1710 1711 1712 1713
      int32_t   code = tRowMergerInit(&merge, pRow, pSchema);
      if (code != TSDB_CODE_SUCCESS) {
        return code;
      }

      code = doMergeRowsInBuf(pIter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge, pReader);
1714
      if (code != TSDB_CODE_SUCCESS || merge.pTSchema == NULL) {
H
Haojun Liao 已提交
1715 1716
        return code;
      }
1717 1718
    }

1719
    if (minKey == tsLast) {
1720
      TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
H
Haojun Liao 已提交
1721 1722 1723
      if (init) {
        tRowMerge(&merge, &fRow1);
      } else {
1724
        init = true;
H
Haojun Liao 已提交
1725 1726 1727 1728
        int32_t code = tRowMergerInit(&merge, &fRow1, pReader->pSchema);
        if (code != TSDB_CODE_SUCCESS) {
          return code;
        }
1729
      }
1730
      doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, &merge, &pReader->verRange);
1731 1732 1733
    }

    if (minKey == key) {
H
Haojun Liao 已提交
1734 1735 1736
      if (init) {
        tRowMerge(&merge, &fRow);
      } else {
1737
        init = true;
H
Haojun Liao 已提交
1738 1739 1740 1741
        int32_t code = tRowMergerInit(&merge, &fRow, pReader->pSchema);
        if (code != TSDB_CODE_SUCCESS) {
          return code;
        }
1742 1743 1744
      }
      doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
    }
1745 1746
  }

1747 1748 1749 1750 1751
  int32_t code = tRowMergerGetRow(&merge, &pTSRow);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

1752 1753 1754 1755 1756 1757 1758
  doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo->uid);

  taosMemoryFree(pTSRow);
  tRowMergerClear(&merge);
  return TSDB_CODE_SUCCESS;
}

1759 1760 1761
// Output the row the last (stt) block reader currently points to, merged with every other last-block row sharing its
// timestamp. When mergeBlockData is true and the current file block row has that same timestamp, the file block rows
// with that timestamp are merged in as well.
//
// pBlockData may be NULL when mergeBlockData is false; it is only dereferenced when mergeBlockData is true.
// Returns TSDB_CODE_SUCCESS or the first error from the row merger; the merger is always released once initialized.
static int32_t doMergeFileBlockAndLastBlock(SLastBlockReader* pLastBlockReader, STsdbReader* pReader,
                                            STableBlockScanInfo* pBlockScanInfo, SBlockData* pBlockData,
                                            bool mergeBlockData) {
  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
  int64_t             tsLastBlock = getCurrentKeyInLastBlock(pLastBlockReader);
  int32_t             code = TSDB_CODE_SUCCESS;

  STSRow*    pTSRow = NULL;
  SRowMerger merge = {0};
  TSDBROW    fRow = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
  tsdbTrace("fRow ptr:%p, %d, uid:%" PRIu64 ", %s", fRow.pBlockData, fRow.iRow, pLastBlockReader->uid, pReader->idStr);

  // the short-circuit keeps pBlockData from being dereferenced when mergeBlockData is false
  bool sameTsInFileBlock = mergeBlockData && (tsLastBlock == pBlockData->aTSKEY[pDumpInfo->rowIndex]);

  if (!sameTsInFileBlock) {
    // only last block rows carry this timestamp
    if (tryCopyDistinctRowFromSttBlock(&fRow, pLastBlockReader, pBlockScanInfo, tsLastBlock, pReader)) {
      return TSDB_CODE_SUCCESS;
    }

    code = tRowMergerInit(&merge, &fRow, pReader->pSchema);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    // tryCopyDistinctRowFromSttBlock() already advanced the reader onto the duplicated row
    TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
    tRowMerge(&merge, &fRow1);
    doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLastBlock, &merge, &pReader->verRange);
  } else {
    // the file block row shares the timestamp: merge last block rows first, then the file block rows
    code = tRowMergerInit(&merge, &fRow, pReader->pSchema);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLastBlock, &merge, &pReader->verRange);
    ASSERT(mergeBlockData);

    // merge with block data if ts == key
    if (tsLastBlock == pBlockData->aTSKEY[pDumpInfo->rowIndex]) {
      doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
    }
  }

  code = tRowMergerGetRow(&merge, &pTSRow);
  if (code == TSDB_CODE_SUCCESS) {
    doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo->uid);
    taosMemoryFree(pTSRow);
  }

  // release the merger on the success and the failure path alike
  tRowMergerClear(&merge);
  return code;
}

1822 1823
// Produce the next output row from file data only: the current row of the file data block and/or the current row of
// the last (stt) block.
//
// key: timestamp of the current file block row (only meaningful while the file block still has rows).
// The current last-block key is expected not to precede key (ASSERTed). Descending order and the "only last block"
// case are delegated to doMergeFileBlockAndLastBlock().
// Returns TSDB_CODE_SUCCESS or the first error from the row merger; the merger is always released once initialized.
static int32_t mergeFileBlockAndLastBlock(STsdbReader* pReader, SLastBlockReader* pLastBlockReader, int64_t key,
                                          STableBlockScanInfo* pBlockScanInfo, SBlockData* pBlockData) {
  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;

  if (!hasDataInFileBlock(pBlockData, pDumpInfo)) {  // only last block exists
    return doMergeFileBlockAndLastBlock(pLastBlockReader, pReader, pBlockScanInfo, NULL, false);
  }

  // no last block available, only data block exists
  if (!hasDataInLastBlock(pLastBlockReader)) {
    return mergeRowsInFileBlocks(pBlockData, pBlockScanInfo, key, pReader);
  }

  int64_t ts = getCurrentKeyInLastBlock(pLastBlockReader);
  ASSERT(ts >= key);

  if (!ASCENDING_TRAVERSE(pReader->order)) {  // desc order
    return doMergeFileBlockAndLastBlock(pLastBlockReader, pReader, pBlockScanInfo, pBlockData, true);
  }

  if (key < ts) {  // imem, mem are all empty, file blocks (data blocks and last block) exist
    return mergeRowsInFileBlocks(pBlockData, pBlockScanInfo, key, pReader);
  }

  if (key != ts) {
    ASSERT(0);
    return TSDB_CODE_SUCCESS;
  }

  // key == ts: merge the file block rows first, then the last block rows with the same timestamp
  TSDBROW    fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
  STSRow*    pTSRow = NULL;
  SRowMerger merge = {0};

  int32_t code = tRowMergerInit(&merge, &fRow, pReader->pSchema);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);

  TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
  tRowMerge(&merge, &fRow1);
  doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, ts, &merge, &pReader->verRange);

  code = tRowMergerGetRow(&merge, &pTSRow);
  if (code == TSDB_CODE_SUCCESS) {
    doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo->uid);
    taosMemoryFree(pTSRow);
  }

  // release the merger on the success and the failure path alike
  tRowMergerClear(&merge);
  return code;
}

1878 1879
// Merge the rows sharing the smallest (ASC) / largest (DESC) timestamp among mem (pBlockScanInfo->iter), imem
// (pBlockScanInfo->iiter), the current file block row and the current last (stt) block row, then append the merged
// row to the result block. Both mem and imem must have a valid row when this is called (ASSERTed).
//
// Returns TSDB_CODE_SUCCESS or the first error from the schema lookup (reported via terrno) / row merger. Once
// tRowMergerInit() has succeeded, the merger is released on every exit path.
static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo, SBlockData* pBlockData,
                                     SLastBlockReader* pLastBlockReader) {
  SRowMerger          merge = {0};
  STSRow*             pTSRow = NULL;
  int32_t             code = TSDB_CODE_SUCCESS;
  bool                init = false;  // true once tRowMergerInit() succeeded and merge must be cleared
  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
  SArray*             pDelList = pBlockScanInfo->delSkyline;

  TSDBROW* pRow = getValidMemRow(&pBlockScanInfo->iter, pDelList, pReader);
  TSDBROW* piRow = getValidMemRow(&pBlockScanInfo->iiter, pDelList, pReader);
  ASSERT(pRow != NULL && piRow != NULL);

  int64_t tsLast = INT64_MIN;
  if (hasDataInLastBlock(pLastBlockReader)) {
    tsLast = getCurrentKeyInLastBlock(pLastBlockReader);
  }

  int64_t key = hasDataInFileBlock(pBlockData, pDumpInfo) ? pBlockData->aTSKEY[pDumpInfo->rowIndex] : INT64_MIN;

  TSDBKEY k = TSDBROW_KEY(pRow);
  TSDBKEY ik = TSDBROW_KEY(piRow);

  int64_t minKey = 0;
  if (ASCENDING_TRAVERSE(pReader->order)) {
    minKey = INT64_MAX;  // let's find the minimum
    if (minKey > k.ts) {
      minKey = k.ts;
    }

    if (minKey > ik.ts) {
      minKey = ik.ts;
    }

    if (minKey > key && hasDataInFileBlock(pBlockData, pDumpInfo)) {
      minKey = key;
    }

    if (minKey > tsLast && hasDataInLastBlock(pLastBlockReader)) {
      minKey = tsLast;
    }
  } else {
    minKey = INT64_MIN;  // let find the maximum ts value
    if (minKey < k.ts) {
      minKey = k.ts;
    }

    if (minKey < ik.ts) {
      minKey = ik.ts;
    }

    if (minKey < key && hasDataInFileBlock(pBlockData, pDumpInfo)) {
      minKey = key;
    }

    if (minKey < tsLast && hasDataInLastBlock(pLastBlockReader)) {
      minKey = tsLast;
    }
  }

  // ASC: file block -----> last block -----> imem -----> mem
  // DESC: mem -----> imem -----> last block -----> file block
  if (ASCENDING_TRAVERSE(pReader->order)) {
    if (minKey == key) {
      TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
      code = tRowMergerInit(&merge, &fRow, pReader->pSchema);
      if (code != TSDB_CODE_SUCCESS) {
        goto _end;
      }
      init = true;
      doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
    }

    if (minKey == tsLast) {
      TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
      if (init) {
        tRowMerge(&merge, &fRow1);
      } else {
        code = tRowMergerInit(&merge, &fRow1, pReader->pSchema);
        if (code != TSDB_CODE_SUCCESS) {
          goto _end;
        }
        init = true;
      }
      doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, &merge, &pReader->verRange);
    }

    if (minKey == ik.ts) {
      if (init) {
        tRowMerge(&merge, piRow);
      } else {
        STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(piRow), pReader, pBlockScanInfo->uid);
        if (pSchema == NULL) {  // propagate the lookup failure instead of silently reporting success
          code = terrno;
          goto _end;
        }

        code = tRowMergerInit(&merge, piRow, pSchema);
        if (code != TSDB_CODE_SUCCESS) {
          goto _end;
        }
        init = true;
      }

      code = doMergeRowsInBuf(&pBlockScanInfo->iiter, pBlockScanInfo->uid, ik.ts, pBlockScanInfo->delSkyline, &merge,
                              pReader);
      if (code != TSDB_CODE_SUCCESS) {
        goto _end;
      }
    }

    if (minKey == k.ts) {
      if (init) {
        if (merge.pTSchema == NULL) {
          goto _end;
        }
        tRowMerge(&merge, pRow);
      } else {
        STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid);
        if (pSchema == NULL) {  // tRowMergerInit() must not receive a NULL schema
          code = terrno;
          goto _end;
        }

        code = tRowMergerInit(&merge, pRow, pSchema);
        if (code != TSDB_CODE_SUCCESS) {
          goto _end;
        }
        init = true;
      }

      code = doMergeRowsInBuf(&pBlockScanInfo->iter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge,
                              pReader);
      if (code != TSDB_CODE_SUCCESS) {
        goto _end;
      }
    }
  } else {
    if (minKey == k.ts) {
      STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid);
      if (pSchema == NULL) {  // tRowMergerInit() must not receive a NULL schema
        code = terrno;
        goto _end;
      }

      code = tRowMergerInit(&merge, pRow, pSchema);
      if (code != TSDB_CODE_SUCCESS) {
        goto _end;
      }
      init = true;

      code = doMergeRowsInBuf(&pBlockScanInfo->iter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge,
                              pReader);
      if (code != TSDB_CODE_SUCCESS) {
        goto _end;
      }
    }

    if (minKey == ik.ts) {
      if (init) {
        tRowMerge(&merge, piRow);
      } else {
        STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(piRow), pReader, pBlockScanInfo->uid);
        if (pSchema == NULL) {  // tRowMergerInit() must not receive a NULL schema
          code = terrno;
          goto _end;
        }

        code = tRowMergerInit(&merge, piRow, pSchema);
        if (code != TSDB_CODE_SUCCESS) {
          goto _end;
        }
        init = true;
      }

      code = doMergeRowsInBuf(&pBlockScanInfo->iiter, pBlockScanInfo->uid, ik.ts, pBlockScanInfo->delSkyline, &merge,
                              pReader);
      if (code != TSDB_CODE_SUCCESS) {
        goto _end;
      }
    }

    if (minKey == tsLast) {
      TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
      if (init) {
        tRowMerge(&merge, &fRow1);
      } else {
        code = tRowMergerInit(&merge, &fRow1, pReader->pSchema);
        if (code != TSDB_CODE_SUCCESS) {
          goto _end;
        }
        init = true;
      }
      doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, &merge, &pReader->verRange);
    }

    if (minKey == key) {
      TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
      if (!init) {
        code = tRowMergerInit(&merge, &fRow, pReader->pSchema);
        if (code != TSDB_CODE_SUCCESS) {
          goto _end;
        }
        init = true;
      } else {
        if (merge.pTSchema == NULL) {
          goto _end;
        }
        tRowMerge(&merge, &fRow);
      }
      doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
    }
  }

  if (merge.pTSchema == NULL) {
    goto _end;
  }

  code = tRowMergerGetRow(&merge, &pTSRow);
  if (code != TSDB_CODE_SUCCESS) {
    goto _end;
  }

  doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo->uid);
  taosMemoryFree(pTSRow);

_end:
  if (init) {
    tRowMergerClear(&merge);
  }
  return code;
}

2092 2093 2094 2095 2096 2097 2098 2099 2100 2101 2102 2103 2104 2105 2106 2107 2108 2109 2110 2111 2112 2113 2114 2115 2116
// Lazily create the iterators over this table's rows in mem (pBlockScanInfo->iter) and imem (pBlockScanInfo->iiter),
// starting from the query window start (ASC) or end (DESC) key, then set up the delete-skyline iterator.
// Does nothing when iterInit is already set.
// Returns TSDB_CODE_SUCCESS, or the error from tsdbTbDataIterCreate(); iterInit stays false on error.
static int32_t initMemDataIterator(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader) {
  if (pBlockScanInfo->iterInit) {
    return TSDB_CODE_SUCCESS;
  }

  int32_t code = TSDB_CODE_SUCCESS;

  TSDBKEY startKey = {0};
  if (ASCENDING_TRAVERSE(pReader->order)) {
    startKey = (TSDBKEY){.ts = pReader->window.skey, .version = pReader->verRange.minVer};
  } else {
    startKey = (TSDBKEY){.ts = pReader->window.ekey, .version = pReader->verRange.maxVer};
  }

  int32_t backward = (!ASCENDING_TRAVERSE(pReader->order));

  STbData* d = NULL;
  if (pReader->pReadSnap->pMem != NULL) {
    d = tsdbGetTbDataFromMemTable(pReader->pReadSnap->pMem, pReader->suid, pBlockScanInfo->uid);
    if (d != NULL) {
      code = tsdbTbDataIterCreate(d, &startKey, backward, &pBlockScanInfo->iter.iter);
      if (code == TSDB_CODE_SUCCESS) {
        pBlockScanInfo->iter.hasVal = (tsdbTbDataIterGet(pBlockScanInfo->iter.iter) != NULL);

        tsdbDebug("%p uid:%" PRIu64 ", check data in mem from skey:%" PRId64 ", order:%d, ts range in buf:%" PRId64
                  "-%" PRId64 " %s",
                  pReader, pBlockScanInfo->uid, startKey.ts, pReader->order, d->minKey, d->maxKey, pReader->idStr);
      } else {
        // this is the mem iterator (the message previously said "imem")
        tsdbError("%p uid:%" PRIu64 ", failed to create iterator for mem, code:%s, %s", pReader, pBlockScanInfo->uid,
                  tstrerror(code), pReader->idStr);
        return code;
      }
    }
  } else {
    tsdbDebug("%p uid:%" PRIu64 ", no data in mem, %s", pReader, pBlockScanInfo->uid, pReader->idStr);
  }

  STbData* di = NULL;
  if (pReader->pReadSnap->pIMem != NULL) {
    di = tsdbGetTbDataFromMemTable(pReader->pReadSnap->pIMem, pReader->suid, pBlockScanInfo->uid);
    if (di != NULL) {
      code = tsdbTbDataIterCreate(di, &startKey, backward, &pBlockScanInfo->iiter.iter);
      if (code == TSDB_CODE_SUCCESS) {
        pBlockScanInfo->iiter.hasVal = (tsdbTbDataIterGet(pBlockScanInfo->iiter.iter) != NULL);

        tsdbDebug("%p uid:%" PRIu64 ", check data in imem from skey:%" PRId64 ", order:%d, ts range in buf:%" PRId64
                  "-%" PRId64 " %s",
                  pReader, pBlockScanInfo->uid, startKey.ts, pReader->order, di->minKey, di->maxKey, pReader->idStr);
      } else {
        // this is the imem iterator (the message previously said "mem")
        tsdbError("%p uid:%" PRIu64 ", failed to create iterator for imem, code:%s, %s", pReader, pBlockScanInfo->uid,
                  tstrerror(code), pReader->idStr);
        return code;
      }
    }
  } else {
    tsdbDebug("%p uid:%" PRIu64 ", no data in imem, %s", pReader, pBlockScanInfo->uid, pReader->idStr);
  }

  initDelSkylineIterator(pBlockScanInfo, pReader, d, di);

  pBlockScanInfo->iterInit = true;
  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
2156 2157
// Check whether the file block row at pDumpInfo->rowIndex should be returned for this table:
// it must belong to the table (when the block carries a per-row uid column), have a version within the reader's
// version range and a timestamp within the query window, and must not be covered by the table's delete skyline.
static bool isValidFileBlockRow(SBlockData* pBlockData, SFileBlockDumpInfo* pDumpInfo,
                                STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader) {
  // multi-table data block: skip rows that belong to another table
  if (pBlockData->aUid != NULL && (uint64_t)pBlockData->aUid[pDumpInfo->rowIndex] != pBlockScanInfo->uid) {
    return false;
  }

  int64_t ver = pBlockData->aVersion[pDumpInfo->rowIndex];
  bool    verInRange = (ver >= pReader->verRange.minVer) && (ver <= pReader->verRange.maxVer);
  if (!verInRange) {
    return false;
  }

  int64_t ts = pBlockData->aTSKEY[pDumpInfo->rowIndex];
  bool    tsInWindow = (ts >= pReader->window.skey) && (ts <= pReader->window.ekey);
  if (!tsInWindow) {
    return false;
  }

  TSDBKEY rowKey = {.ts = ts, .version = ver};
  return !hasBeenDropped(pBlockScanInfo->delSkyline, &pBlockScanInfo->fileDelIndex, &rowKey, pReader->order,
                         &pReader->verRange);
}

2186
// Position the last (stt) block reader on the first visible row of table pScanInfo->uid.
// When the reader already serves this table, only report whether rows remain. Otherwise close the previous table's
// merge tree, then open a new one whose window starts one step past pScanInfo->lastKey in scan direction.
// Returns true when a visible row is available.
static bool initLastBlockReader(SLastBlockReader* pLBlockReader, STableBlockScanInfo* pScanInfo, STsdbReader* pReader) {
  if (pScanInfo->uid == pLBlockReader->uid) {
    return hasDataInLastBlock(pLBlockReader);
  }

  // a non-zero uid means a merge tree was opened for the previous table
  if (0 != pLBlockReader->uid) {
    tMergeTreeClose(&pLBlockReader->mergeTree);
  }

  initMemDataIterator(pScanInfo, pReader);
  pLBlockReader->uid = pScanInfo->uid;

  STimeWindow win = pLBlockReader->window;
  if (ASCENDING_TRAVERSE(pLBlockReader->order)) {
    win.skey = pScanInfo->lastKey + 1;
  } else {
    win.ekey = pScanInfo->lastKey - 1;
  }

  int32_t code = tMergeTreeOpen(&pLBlockReader->mergeTree, (pLBlockReader->order == TSDB_ORDER_DESC),
                                pReader->pFileReader, pReader->suid, pScanInfo->uid, &win, &pLBlockReader->verRange,
                                pLBlockReader->pInfo, false, pReader->idStr);
  if (TSDB_CODE_SUCCESS != code) {
    return false;
  }

  return nextRowFromLastBlocks(pLBlockReader, pScanInfo, &pReader->verRange);
}

2217
// Timestamp of the row the last (stt) block merge tree is currently positioned on.
static int64_t getCurrentKeyInLastBlock(SLastBlockReader* pLastBlockReader) {
  TSDBROW curRow = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
  int64_t curTs = TSDBROW_TS(&curRow);
  return curTs;
}

H
Hongze Cheng 已提交
2222
// The last (stt) block reader is treated as having rows while its merge tree holds a non-NULL iterator.
static bool hasDataInLastBlock(SLastBlockReader* pLastBlockReader) {
  bool hasIter = (NULL != pLastBlockReader->mergeTree.pIter);
  return hasIter;
}
2223 2224 2225 2226

bool hasDataInFileBlock(const SBlockData* pBlockData, const SFileBlockDumpInfo* pDumpInfo) {
  if (pBlockData->nRow > 0) {
    ASSERT(pBlockData->nRow == pDumpInfo->totalRows);
2227 2228
  }

2229
  return pBlockData->nRow > 0 && (!pDumpInfo->allDumped);
2230
}
2231

2232 2233
/*
 * Emit the file-block row at the current dump position (timestamp `key`) into pReader->pResBlock.
 *
 * If tryCopyDistinctRowFromFileBlock() reports the row as handled (returns true), nothing else is done. Otherwise a
 * row merger is seeded with the current row, doMergeRowsInFileBlocks() feeds it further file-block rows (presumably
 * those sharing the same key), and the merged row is appended to the result block.
 *
 * Returns TSDB_CODE_SUCCESS, or the error code from tRowMergerInit()/tRowMergerGetRow().
 */
int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pBlockScanInfo, int64_t key,
                              STsdbReader* pReader) {
  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
  if (tryCopyDistinctRowFromFileBlock(pReader, pBlockData, key, pDumpInfo)) {
    return TSDB_CODE_SUCCESS;
  }

  TSDBROW    fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
  STSRow*    pTSRow = NULL;
  SRowMerger merge = {0};

  int32_t code = tRowMergerInit(&merge, &fRow, pReader->pSchema);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);

  code = tRowMergerGetRow(&merge, &pTSRow);
  if (code != TSDB_CODE_SUCCESS) {
    // the merger was initialized successfully above, so release it on this error path as well
    tRowMergerClear(&merge);
    return code;
  }

  doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo->uid);

  taosMemoryFree(pTSRow);
  tRowMergerClear(&merge);
  return TSDB_CODE_SUCCESS;
}

H
Haojun Liao 已提交
2262 2263
/*
 * Produce the next output row for one table by merging the current file data block, the last (stt) block reader
 * and the in-memory buffers (mem / imem).
 *
 * Dispatch: if both mem and imem iterators have values, doMergeMultiLevelRows() handles everything. Otherwise the
 * pending imem row is merged with file/last-block data, else the pending mem row, else only the file block and the
 * last block are merged.
 */
static int32_t buildComposedDataBlockImpl(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo,
                                          SBlockData* pBlockData, SLastBlockReader* pLastBlockReader) {
  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
  SIterInfo*          pMemIter = &pBlockScanInfo->iter;
  SIterInfo*          pIMemIter = &pBlockScanInfo->iiter;

  // key of the current file-block row, or INT64_MIN when the file block has nothing left to dump
  int64_t fileKey = INT64_MIN;
  if (pBlockData->nRow > 0 && !pDumpInfo->allDumped) {
    fileKey = pBlockData->aTSKEY[pDumpInfo->rowIndex];
  }

  if (pMemIter->hasVal && pIMemIter->hasVal) {
    return doMergeMultiLevelRows(pReader, pBlockScanInfo, pBlockData, pLastBlockReader);
  }

  // At most one buffer is pending. getValidMemRow() receives the iterator and may move it, so hasVal is tested
  // again after the lookups below.
  TSDBROW* pMemRow = pMemIter->hasVal ? getValidMemRow(pMemIter, pBlockScanInfo->delSkyline, pReader) : NULL;
  TSDBROW* pIMemRow = pIMemIter->hasVal ? getValidMemRow(pIMemIter, pBlockScanInfo->delSkyline, pReader) : NULL;

  if (pIMemIter->hasVal) {  // imem + file + last block
    return doMergeBufAndFileRows(pReader, pBlockScanInfo, pIMemRow, pIMemIter, fileKey, pLastBlockReader);
  }

  if (pMemIter->hasVal) {  // mem + file + last block
    return doMergeBufAndFileRows(pReader, pBlockScanInfo, pMemRow, pMemIter, fileKey, pLastBlockReader);
  }

  // file data block + last block only
  return mergeFileBlockAndLastBlock(pReader, pLastBlockReader, fileKey, pBlockScanInfo, pBlockData);
}

2294
/*
 * Fill pReader->pResBlock by merging rows of the current file data block, the last (stt) block reader and the
 * in-memory buffers of a single table.
 *
 * With a current file block (getCurrentBlockInfo() != NULL), the owning table is looked up by uid. If the block is
 * clean (isCleanFileDataBlock()) and, in desc order, the last block holds no data, the whole block is copied
 * directly. Without a file block, the table at pReader->status.pTableIter is used. Otherwise rows are merged one at a
 * time until:
 *  - the file block is consumed,
 *  - neither the file block nor the last block has data, or
 *  - the result block reaches pReader->capacity.
 *
 * Returns:
 *  - TSDB_CODE_SUCCESS on success;
 *  - TSDB_CODE_INVALID_PARA when the block's uid is not in the table map;
 *  - the error code reported while merging a row.
 */
static int32_t buildComposedDataBlock(STsdbReader* pReader) {
  int32_t code = TSDB_CODE_SUCCESS;

  SSDataBlock*        pResBlock = pReader->pResBlock;
  SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(&pReader->status.blockIter);
  SLastBlockReader*   pLastBlockReader = pReader->status.fileIter.pLastBlockReader;

  int64_t st = taosGetTimestampUs();

  STableBlockScanInfo* pBlockScanInfo = NULL;
  if (pBlockInfo != NULL) {
    // taosHashGet() yields NULL for a uid that is not in the map: test the slot before dereferencing it
    STableBlockScanInfo** ppScanInfo =
        taosHashGet(pReader->status.pTableMap, &pBlockInfo->uid, sizeof(pBlockInfo->uid));
    pBlockScanInfo = (ppScanInfo != NULL) ? *ppScanInfo : NULL;
    if (pBlockScanInfo == NULL) {
      code = TSDB_CODE_INVALID_PARA;
      tsdbError("failed to locate the uid:%" PRIu64 " in query table uid list, total tables:%d, %s", pBlockInfo->uid,
                taosHashGetSize(pReader->status.pTableMap), pReader->idStr);
      goto _end;
    }

    SDataBlk* pBlock = getCurrentBlock(&pReader->status.blockIter);
    TSDBKEY   keyInBuf = getCurrentKeyInBuf(pBlockScanInfo, pReader);

    // it is a clean block, load it directly
    if (isCleanFileDataBlock(pReader, pBlockInfo, pBlock, pBlockScanInfo, keyInBuf, pLastBlockReader)) {
      if (pReader->order == TSDB_ORDER_ASC ||
          (pReader->order == TSDB_ORDER_DESC && (!hasDataInLastBlock(pLastBlockReader)))) {
        copyBlockDataToSDataBlock(pReader, pBlockScanInfo);

        // record the last key value
        pBlockScanInfo->lastKey = ASCENDING_TRAVERSE(pReader->order) ? pBlock->maxKey.ts : pBlock->minKey.ts;
        goto _end;
      }
    }
  } else {  // file blocks not exist
    pBlockScanInfo = *pReader->status.pTableIter;
  }

  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
  SBlockData*         pBlockData = &pReader->status.fileBlockData;
  int32_t             step = ASCENDING_TRAVERSE(pReader->order) ? 1 : -1;

  while (1) {
    bool hasBlockData = false;

    // find the first qualified row in the data block, marking the block dumped if the cursor runs off its end
    while (pBlockData->nRow > 0) {
      if (isValidFileBlockRow(pBlockData, pDumpInfo, pBlockScanInfo, pReader)) {
        hasBlockData = true;
        break;
      }

      pDumpInfo->rowIndex += step;

      SDataBlk* pBlock = getCurrentBlock(&pReader->status.blockIter);
      if (pDumpInfo->rowIndex >= pBlock->nRow || pDumpInfo->rowIndex < 0) {
        setBlockAllDumped(pDumpInfo, pBlock->maxKey.ts, pReader->order);
        break;
      }
    }

    bool hasBlockLData = hasDataInLastBlock(pLastBlockReader);

    // no data in last block and block, no need to proceed.
    if ((hasBlockData == false) && (hasBlockLData == false)) {
      break;
    }

    code = buildComposedDataBlockImpl(pReader, pBlockScanInfo, pBlockData, pLastBlockReader);
    if (code != TSDB_CODE_SUCCESS) {
      break;  // surface the merge error to the caller instead of continuing with a half-merged row
    }

    // currently loaded file data block is consumed
    if ((pBlockData->nRow > 0) && (pDumpInfo->rowIndex >= pBlockData->nRow || pDumpInfo->rowIndex < 0)) {
      SDataBlk* pBlock = getCurrentBlock(&pReader->status.blockIter);
      setBlockAllDumped(pDumpInfo, pBlock->maxKey.ts, pReader->order);
      break;
    }

    if (pResBlock->info.rows >= pReader->capacity) {
      break;
    }
  }

_end:
  pResBlock->info.uid = (pBlockScanInfo != NULL) ? pBlockScanInfo->uid : 0;
  blockDataUpdateTsWindow(pResBlock, 0);

  setComposedBlockFlag(pReader, true);
  double el = (taosGetTimestampUs() - st) / 1000.0;

  pReader->cost.composedBlocks += 1;
  pReader->cost.buildComposedBlockTime += el;

  if (pResBlock->info.rows > 0) {
    tsdbDebug("%p uid:%" PRIu64 ", composed data block created, brange:%" PRIu64 "-%" PRIu64
              " rows:%d, elapsed time:%.2f ms %s",
              pReader, pResBlock->info.uid, pResBlock->info.window.skey, pResBlock->info.window.ekey,
              pResBlock->info.rows, el, pReader->idStr);
  }

  return code;
}

/*
 * Record whether the current result block was composed row by row (true) or describes a whole file block returned
 * as-is (false).
 */
void setComposedBlockFlag(STsdbReader* pReader, bool composed) {
  SReaderStatus* pStatus = &pReader->status;
  pStatus->composedDataBlock = composed;
}

dengyihao's avatar
dengyihao 已提交
2398 2399
/*
 * Build the delete skyline of one table (pBlockScanInfo->delSkyline) and reset the skyline cursors of its mem, imem,
 * file and last-block iterators.
 *
 * Delete records are collected from the delete file of the read snapshot (if any), then from pMemTbData and
 * piMemTbData (either may be NULL). If the skyline has already been built, nothing is done.
 *
 * Returns:
 *  - TSDB_CODE_SUCCESS on success;
 *  - TSDB_CODE_OUT_OF_MEMORY when an array cannot be allocated or grown;
 *  - the error code of the delete-file reader or tsdbBuildDeleteSkyline().
 */
int32_t initDelSkylineIterator(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader, STbData* pMemTbData,
                               STbData* piMemTbData) {
  if (pBlockScanInfo->delSkyline != NULL) {
    return TSDB_CODE_SUCCESS;
  }

  int32_t code = TSDB_CODE_SUCCESS;
  STsdb*  pTsdb = pReader->pTsdb;

  SArray* pDelData = taosArrayInit(4, sizeof(SDelData));
  if (pDelData == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  SDelFile* pDelFile = pReader->pReadSnap->fs.pDelFile;
  if (pDelFile) {
    SDelFReader* pDelFReader = NULL;
    code = tsdbDelFReaderOpen(&pDelFReader, pDelFile, pTsdb);
    if (code != TSDB_CODE_SUCCESS) {
      goto _err;
    }

    SArray* aDelIdx = taosArrayInit(4, sizeof(SDelIdx));
    if (aDelIdx == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;  // report the allocation failure instead of returning success
      tsdbDelFReaderClose(&pDelFReader);
      goto _err;
    }

    code = tsdbReadDelIdx(pDelFReader, aDelIdx);
    if (code != TSDB_CODE_SUCCESS) {
      taosArrayDestroy(aDelIdx);
      tsdbDelFReaderClose(&pDelFReader);
      goto _err;
    }

    SDelIdx  idx = {.suid = pReader->suid, .uid = pBlockScanInfo->uid};
    SDelIdx* pIdx = taosArraySearch(aDelIdx, &idx, tCmprDelIdx, TD_EQ);

    if (pIdx != NULL) {
      code = tsdbReadDelData(pDelFReader, pIdx, pDelData);
    }

    taosArrayDestroy(aDelIdx);
    tsdbDelFReaderClose(&pDelFReader);

    if (code != TSDB_CODE_SUCCESS) {
      goto _err;
    }
  }

  // append the delete records kept in mem, then those kept in imem
  for (SDelData* p = (pMemTbData != NULL) ? pMemTbData->pHead : NULL; p != NULL; p = p->pNext) {
    if (taosArrayPush(pDelData, p) == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto _err;
    }
  }

  for (SDelData* p = (piMemTbData != NULL) ? piMemTbData->pHead : NULL; p != NULL; p = p->pNext) {
    if (taosArrayPush(pDelData, p) == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto _err;
    }
  }

  if (taosArrayGetSize(pDelData) > 0) {
    pBlockScanInfo->delSkyline = taosArrayInit(4, sizeof(TSDBKEY));
    if (pBlockScanInfo->delSkyline == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto _err;
    }

    code = tsdbBuildDeleteSkyline(pDelData, 0, (int32_t)(taosArrayGetSize(pDelData) - 1), pBlockScanInfo->delSkyline);
  }

  taosArrayDestroy(pDelData);

  // Cursors start on the first skyline entry in asc order and on the last one in desc order. The size is cast to
  // int32_t before subtracting, so an empty skyline yields -1 without going through a size_t underflow
  // (assumes taosArrayGetSize(NULL) == 0 when no delete record exists — as the previous code also relied on).
  int32_t startIndex =
      ASCENDING_TRAVERSE(pReader->order) ? 0 : ((int32_t)taosArrayGetSize(pBlockScanInfo->delSkyline) - 1);

  pBlockScanInfo->iter.index = startIndex;
  pBlockScanInfo->iiter.index = startIndex;
  pBlockScanInfo->fileDelIndex = startIndex;
  pBlockScanInfo->lastBlockDelIndex = startIndex;
  return code;

_err:
  taosArrayDestroy(pDelData);
  return code;
}

H
Haojun Liao 已提交
2480
/*
 * Return the key of the next valid (non-deleted) row in the in-memory buffers of a table: whichever of the current
 * mem and imem rows comes first in the scan direction.
 *
 * In asc order that is the smaller timestamp; in desc order it is the larger one. On equal timestamps the mem key is
 * returned. If only one buffer has a valid row, its key is returned. If neither has one, the result has
 * ts == TSKEY_INITIAL_VAL.
 */
TSDBKEY getCurrentKeyInBuf(STableBlockScanInfo* pScanInfo, STsdbReader* pReader) {
  TSDBKEY key = {.ts = TSKEY_INITIAL_VAL};
  TSDBKEY ikey = {.ts = TSKEY_INITIAL_VAL};

  TSDBROW* pRow = getValidMemRow(&pScanInfo->iter, pScanInfo->delSkyline, pReader);
  bool     hasKey = (pRow != NULL);
  if (hasKey) {
    key = TSDBROW_KEY(pRow);
  }

  TSDBROW* piRow = getValidMemRow(&pScanInfo->iiter, pScanInfo->delSkyline, pReader);
  bool     hasIKey = (piRow != NULL);
  if (hasIKey) {
    ikey = TSDBROW_KEY(piRow);
  }

  if (!hasKey) {
    // only imem has data (or neither does, in which case ikey still holds TSKEY_INITIAL_VAL)
    return ikey;
  }

  if (!hasIKey) {
    return key;
  }

  // both buffers have data: pick the one that is reached first in the scan direction
  if (ASCENDING_TRAVERSE(pReader->order)) {
    return (key.ts <= ikey.ts) ? key : ikey;
  }

  return (key.ts >= ikey.ts) ? key : ikey;
}

2498
/*
 * Advance the file-set iterator until a file set with at least one data block or last (stt) file for the queried
 * tables is found. The counts are written to *pBlockNum, and both are 0 when no file set is left.
 *
 * Returns:
 *  - TSDB_CODE_SUCCESS on success;
 *  - TSDB_CODE_OUT_OF_MEMORY if the block-index list cannot be allocated;
 *  - the error code of doLoadBlockIndex()/doLoadFileBlock().
 */
static int32_t moveToNextFile(STsdbReader* pReader, SBlockNumber* pBlockNum) {
  SReaderStatus* pStatus = &pReader->status;
  pBlockNum->numOfBlocks = 0;
  pBlockNum->numOfLastFiles = 0;

  size_t  numOfTables = taosHashGetSize(pReader->status.pTableMap);
  SArray* pIndexList = taosArrayInit(numOfTables, sizeof(SBlockIdx));
  if (pIndexList == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  int32_t code = TSDB_CODE_SUCCESS;

  // stops when no data files are left on disk, on error, or when a file set with blocks is found
  while (filesetIteratorNext(&pStatus->fileIter, pReader)) {
    taosArrayClear(pIndexList);

    code = doLoadBlockIndex(pReader, pReader->pFileReader, pIndexList);
    if (code != TSDB_CODE_SUCCESS) {
      break;
    }

    if (taosArrayGetSize(pIndexList) > 0 || pReader->pFileReader->pSet->nSttF > 0) {
      code = doLoadFileBlock(pReader, pIndexList, pBlockNum);
      if (code != TSDB_CODE_SUCCESS) {
        break;
      }

      if (pBlockNum->numOfBlocks + pBlockNum->numOfLastFiles > 0) {
        break;
      }
    }

    // no blocks in current file, try next files
  }

  taosArrayDestroy(pIndexList);
  return code;
}

2538
/*
 * Sort comparator for uint64_t uids: returns -1, 0 or 1 as *p1 is less than, equal to or greater than *p2.
 * Explicit comparisons are used instead of subtraction, which would wrap for unsigned 64-bit values.
 */
static int32_t uidComparFunc(const void* p1, const void* p2) {
  const uint64_t lhs = *(const uint64_t*)p1;
  const uint64_t rhs = *(const uint64_t*)p2;

  if (lhs < rhs) {
    return -1;
  }

  return (lhs > rhs) ? 1 : 0;
}
2547

2548
/*
 * Fill pOrderCheckInfo->tableUidList with the uid of every table in pStatus->pTableMap, then sort it in ascending
 * order. The list must already have room for taosHashGetSize(pStatus->pTableMap) entries.
 */
static void extractOrderedTableUidList(SUidOrderCheckInfo* pOrderCheckInfo, SReaderStatus* pStatus) {
  int32_t numOfTables = taosHashGetSize(pStatus->pTableMap);
  int32_t slot = 0;

  for (void* pIter = taosHashIterate(pStatus->pTableMap, NULL); pIter != NULL;
       pIter = taosHashIterate(pStatus->pTableMap, pIter)) {
    STableBlockScanInfo* pScanInfo = *(STableBlockScanInfo**)pIter;
    pOrderCheckInfo->tableUidList[slot++] = pScanInfo->uid;
  }

  taosSort(pOrderCheckInfo->tableUidList, numOfTables, sizeof(uint64_t), uidComparFunc);
}

2562
/*
 * Prepare the uid-ordered traversal of pStatus->pTableMap used when reading last blocks.
 *
 * On first use, the ascending uid list is allocated and filled. When a previous traversal has finished
 * (pStatus->pTableIter is NULL), it is restarted from the head of the list. If the head uid is no longer in the
 * table map, the list is reallocated to the current table count and rebuilt. In these cases currentIndex is reset to
 * 0 and pStatus->pTableIter points at the table of the first uid. A traversal still in progress is left untouched.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY if the uid list cannot be (re)allocated.
 */
static int32_t initOrderCheckInfo(SUidOrderCheckInfo* pOrderCheckInfo, SReaderStatus* pStatus) {
  int32_t numOfTables = taosHashGetSize(pStatus->pTableMap);
  if (numOfTables == 0) {
    return TSDB_CODE_SUCCESS;
  }

  bool firstUse = (pOrderCheckInfo->tableUidList == NULL);
  if (!firstUse && pStatus->pTableIter != NULL) {
    // a traversal is still in progress
    return TSDB_CODE_SUCCESS;
  }

  pOrderCheckInfo->currentIndex = 0;

  if (firstUse) {
    pOrderCheckInfo->tableUidList = taosMemoryMalloc(numOfTables * sizeof(uint64_t));
    if (pOrderCheckInfo->tableUidList == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }
  } else {
    // it is the last block of a new file: restart from the head of the existing list
    uint64_t headUid = pOrderCheckInfo->tableUidList[0];
    pStatus->pTableIter = taosHashGet(pStatus->pTableMap, &headUid, sizeof(headUid));
    if (pStatus->pTableIter != NULL) {
      return TSDB_CODE_SUCCESS;
    }

    // the tableMap has already been updated: resize the list and rebuild it
    void* pNewList = taosMemoryRealloc(pOrderCheckInfo->tableUidList, numOfTables * sizeof(uint64_t));
    if (pNewList == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }
    pOrderCheckInfo->tableUidList = pNewList;
  }

  extractOrderedTableUidList(pOrderCheckInfo, pStatus);

  uint64_t firstUid = pOrderCheckInfo->tableUidList[0];
  pStatus->pTableIter = taosHashGet(pStatus->pTableMap, &firstUid, sizeof(firstUid));
  return TSDB_CODE_SUCCESS;
}

2603
/*
 * Step the uid-ordered traversal to the next table.
 *
 * On success pStatus->pTableIter points at that table's entry and true is returned. When the list is exhausted,
 * pStatus->pTableIter is set to NULL and false is returned.
 */
static bool moveToNextTable(SUidOrderCheckInfo* pOrderedCheckInfo, SReaderStatus* pStatus) {
  pOrderedCheckInfo->currentIndex += 1;

  bool hasMore = pOrderedCheckInfo->currentIndex < taosHashGetSize(pStatus->pTableMap);
  if (hasMore) {
    uint64_t nextUid = pOrderedCheckInfo->tableUidList[pOrderedCheckInfo->currentIndex];
    pStatus->pTableIter = taosHashGet(pStatus->pTableMap, &nextUid, sizeof(nextUid));
    ASSERT(pStatus->pTableIter != NULL);
  } else {
    pStatus->pTableIter = NULL;
  }

  return hasMore;
}

2616
/*
 * Build a result block from the last (stt) blocks, visiting tables in ascending uid order (set up by
 * initOrderCheckInfo()).
 *
 * Tables without last-block rows are skipped. Returns as soon as a non-empty result block is built or an error
 * occurs. When every table has been visited, moveToNextTable() leaves pStatus->pTableIter NULL and
 * TSDB_CODE_SUCCESS is returned.
 */
static int32_t doLoadLastBlockSequentially(STsdbReader* pReader) {
  SReaderStatus*      pStatus = &pReader->status;
  SLastBlockReader*   pLastBlockReader = pStatus->fileIter.pLastBlockReader;
  SUidOrderCheckInfo* pOrderedCheckInfo = &pStatus->uidCheckInfo;

  int32_t code = initOrderCheckInfo(pOrderedCheckInfo, pStatus);
  if (code != TSDB_CODE_SUCCESS || (taosHashGetSize(pStatus->pTableMap) == 0)) {
    return code;
  }

  while (1) {
    // load the last data block of current table
    STableBlockScanInfo* pScanInfo = *(STableBlockScanInfo**)pStatus->pTableIter;

    if (initLastBlockReader(pLastBlockReader, pScanInfo, pReader)) {
      code = doBuildDataBlock(pReader);
      if (code != TSDB_CODE_SUCCESS) {
        return code;
      }

      if (pReader->pResBlock->info.rows > 0) {
        return TSDB_CODE_SUCCESS;
      }
    }

    // current table has no last-block rows or is exhausted, let's try next table
    if (!moveToNextTable(pOrderedCheckInfo, pStatus)) {
      return TSDB_CODE_SUCCESS;
    }
  }
}

2655
/*
 * Build the next result block for the table that owns the current file block, or, when there is no file block, for
 * the table at pReader->status.pTableIter.
 *
 * Depending on how the file block relates to the buffered (mem/imem) rows and the last-block rows, this:
 *  - composes rows from the last blocks and buffers only (no file block);
 *  - loads the file block and composes rows from every source (fileBlockShouldLoad());
 *  - emits the buffered rows that precede the file block in scan order (bufferDataInFileBlockGap());
 *  - in desc order, composes the last-block rows first while the last blocks still have data;
 *  - otherwise describes the whole file block as the result without composing it: only pResBlock->info is filled
 *    in and the composed flag is cleared.
 *
 * Returns:
 *  - TSDB_CODE_SUCCESS on success;
 *  - TSDB_CODE_INVALID_PARA when the table's scan info cannot be located;
 *  - the error code from loading or building the block.
 */
static int32_t doBuildDataBlock(STsdbReader* pReader) {
  int32_t   code = TSDB_CODE_SUCCESS;
  SDataBlk* pBlock = NULL;

  SReaderStatus*       pStatus = &pReader->status;
  SDataBlockIter*      pBlockIter = &pStatus->blockIter;
  STableBlockScanInfo* pScanInfo = NULL;
  SFileDataBlockInfo*  pBlockInfo = getCurrentBlockInfo(pBlockIter);
  SLastBlockReader*    pLastBlockReader = pReader->status.fileIter.pLastBlockReader;

  if (pBlockInfo != NULL) {
    // taosHashGet() yields NULL for a uid that is not in the map: test the slot before dereferencing it
    STableBlockScanInfo** ppScanInfo =
        taosHashGet(pReader->status.pTableMap, &pBlockInfo->uid, sizeof(pBlockInfo->uid));
    pScanInfo = (ppScanInfo != NULL) ? *ppScanInfo : NULL;
  } else {
    pScanInfo = *pReader->status.pTableIter;
  }

  if (pScanInfo == NULL) {
    tsdbError("failed to get table scan-info, %s", pReader->idStr);
    code = TSDB_CODE_INVALID_PARA;
    return code;
  }

  if (pBlockInfo != NULL) {
    pBlock = getCurrentBlock(pBlockIter);
  }

  initLastBlockReader(pLastBlockReader, pScanInfo, pReader);
  TSDBKEY keyInBuf = getCurrentKeyInBuf(pScanInfo, pReader);

  if (pBlockInfo == NULL) {  // build data block from last data file
    ASSERT(pBlockIter->numOfBlocks == 0);
    code = buildComposedDataBlock(pReader);
  } else if (fileBlockShouldLoad(pReader, pBlockInfo, pBlock, pScanInfo, keyInBuf, pLastBlockReader)) {
    code = doLoadFileBlockData(pReader, pBlockIter, &pStatus->fileBlockData, pScanInfo->uid);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    // build composed data block
    code = buildComposedDataBlock(pReader);
  } else if (bufferDataInFileBlockGap(pReader->order, keyInBuf, pBlock)) {
    // data in memory that are earlier than current file block
    // rows in buffer should be less than the file block in asc, greater than file block in desc
    int64_t endKey = (ASCENDING_TRAVERSE(pReader->order)) ? pBlock->minKey.ts : pBlock->maxKey.ts;
    code = buildDataBlockFromBuf(pReader, pScanInfo, endKey);
  } else {
    if (hasDataInLastBlock(pLastBlockReader) && !ASCENDING_TRAVERSE(pReader->order)) {
      // only return the rows in last block
      int64_t tsLast = getCurrentKeyInLastBlock(pLastBlockReader);
      ASSERT(tsLast >= pBlock->maxKey.ts);
      tBlockDataReset(&pReader->status.fileBlockData);

      tsdbDebug("load data in last block firstly, due to desc scan data, %s", pReader->idStr);
      code = buildComposedDataBlock(pReader);
    } else {  // whole block is required, return it directly
      SDataBlockInfo* pInfo = &pReader->pResBlock->info;
      pInfo->rows = pBlock->nRow;
      pInfo->uid = pScanInfo->uid;
      pInfo->window = (STimeWindow){.skey = pBlock->minKey.ts, .ekey = pBlock->maxKey.ts};
      setComposedBlockFlag(pReader, false);
      setBlockAllDumped(&pStatus->fBlockDumpInfo, pBlock->maxKey.ts, pReader->order);

      // update the last key for the corresponding table
      pScanInfo->lastKey = ASCENDING_TRAVERSE(pReader->order) ? pInfo->window.ekey : pInfo->window.skey;
    }
  }

  return code;
}

H
Haojun Liao 已提交
2725
/*
 * Fill pReader->pResBlock from the in-memory buffers only. Tables of pStatus->pTableMap are visited in hash
 * iteration order, resuming at pStatus->pTableIter, or at the first table when it is NULL.
 *
 * Returns:
 *  - when a table yields rows, with pTableIter left on that table;
 *  - on error;
 *  - when every table has been visited, with pTableIter ending as NULL.
 */
static int32_t buildBlockFromBufferSequentially(STsdbReader* pReader) {
  SReaderStatus* pStatus = &pReader->status;

  if (pStatus->pTableIter == NULL) {
    pStatus->pTableIter = taosHashIterate(pStatus->pTableMap, NULL);
  }

  for (; pStatus->pTableIter != NULL; pStatus->pTableIter = taosHashIterate(pStatus->pTableMap, pStatus->pTableIter)) {
    STableBlockScanInfo* pScanInfo = *(STableBlockScanInfo**)pStatus->pTableIter;
    initMemDataIterator(pScanInfo, pReader);

    // no upper bound: consume the table's buffered rows to the end in the scan direction
    int64_t endKey = ASCENDING_TRAVERSE(pReader->order) ? INT64_MAX : INT64_MIN;
    int32_t code = buildDataBlockFromBuf(pReader, pScanInfo, endKey);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    if (pReader->pResBlock->info.rows > 0) {
      return TSDB_CODE_SUCCESS;
    }
  }

  return TSDB_CODE_SUCCESS;
}

2757
// set the correct start position in case of the first/last file block, according to the query time window
2758
static void initBlockDumpInfo(STsdbReader* pReader, SDataBlockIter* pBlockIter) {
H
Hongze Cheng 已提交
2759
  SDataBlk* pBlock = getCurrentBlock(pBlockIter);
2760

2761 2762 2763
  SReaderStatus* pStatus = &pReader->status;

  SFileBlockDumpInfo* pDumpInfo = &pStatus->fBlockDumpInfo;
2764 2765 2766

  pDumpInfo->totalRows = pBlock->nRow;
  pDumpInfo->allDumped = false;
2767
  pDumpInfo->rowIndex = ASCENDING_TRAVERSE(pReader->order) ? 0 : pBlock->nRow - 1;
2768 2769
}

2770
/*
 * Move to the next file set that has data and prepare the block iterator and dump info for it.
 *
 * If no file set is left, pReader->status.loadFromFile is cleared so reading continues from the buffers. A file set
 * without data blocks (only last/stt files) gets a reset block iterator and an emptied file block.
 *
 * Returns the error code of moveToNextFile()/initBlockIterator(), or TSDB_CODE_SUCCESS.
 */
static int32_t initForFirstBlockInFile(STsdbReader* pReader, SDataBlockIter* pBlockIter) {
  SBlockNumber blockNum = {0};

  int32_t code = moveToNextFile(pReader, &blockNum);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  bool noMoreFiles = (blockNum.numOfBlocks + blockNum.numOfLastFiles == 0);
  if (noMoreFiles) {
    // all data files are consumed, try data in buffer
    pReader->status.loadFromFile = false;
    return code;
  }

  if (blockNum.numOfBlocks > 0) {
    // initialize the block iterator for a new fileset
    code = initBlockIterator(pReader, pBlockIter, blockNum.numOfBlocks);
  } else {
    // no block data, only last block exists
    tBlockDataReset(&pReader->status.fileBlockData);
    resetDataBlockIterator(pBlockIter, pReader->order);
  }

  initBlockDumpInfo(pReader, pBlockIter);
  return code;
}

2797
/*
 * A file block is partially read when it is not fully dumped and the dump cursor has left its starting row
 * (row 0 in asc order, row totalRows - 1 in desc order).
 */
static bool fileBlockPartiallyRead(SFileBlockDumpInfo* pDumpInfo, bool asc) {
  if (pDumpInfo->allDumped) {
    return false;
  }

  return asc ? (pDumpInfo->rowIndex > 0) : (pDumpInfo->rowIndex < (pDumpInfo->totalRows - 1));
}

2802
/*
 * Fill pReader->pResBlock with the next batch of rows coming from data files (data blocks and last/stt blocks).
 *
 * Returns as soon as a non-empty result block is built, an error occurs, or every file set has been consumed (in
 * which case initForFirstBlockInFile() clears pReader->status.loadFromFile).
 *
 * Two phases share the `_begin` label:
 *  - last-block phase: used while the current file set has no data blocks; rows are taken table by table from the
 *    last blocks;
 *  - data-block phase: data blocks are visited through the block iterator. Once they are exhausted in a file set
 *    that has stt files, control jumps back to the last-block phase.
 */
static int32_t buildBlockFromFiles(STsdbReader* pReader) {
  int32_t         code = TSDB_CODE_SUCCESS;
  bool            asc = ASCENDING_TRAVERSE(pReader->order);
  SDataBlockIter* pBlockIter = &pReader->status.blockIter;

  if (pBlockIter->numOfBlocks == 0) {
  _begin:
    code = doLoadLastBlockSequentially(pReader);
    if (code != TSDB_CODE_SUCCESS || pReader->pResBlock->info.rows > 0) {
      return code;
    }

    // every table of this last block file has been checked, now let's try the next file
    if (pReader->status.pTableIter == NULL) {
      code = initForFirstBlockInFile(pReader, pBlockIter);

      // error happens or all the data files are completely checked
      if ((code != TSDB_CODE_SUCCESS) || (pReader->status.loadFromFile == false)) {
        return code;
      }

      // the new file set has no data blocks: stay in the last-block phase
      if (pBlockIter->numOfBlocks == 0) {
        goto _begin;
      }
    }

    code = doBuildDataBlock(pReader);
    if (code != TSDB_CODE_SUCCESS || pReader->pResBlock->info.rows > 0) {
      return code;
    }
  }

  while (1) {
    SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;

    if (fileBlockPartiallyRead(pDumpInfo, asc)) {
      // resume the file data block that was partially loaded
      code = buildComposedDataBlock(pReader);
    } else {
      if (pDumpInfo->allDumped) {
        // current block is exhausted: try the next data block in the block accessed order list
        bool hasNext = blockIteratorNext(pBlockIter, pReader->idStr);
        if (hasNext) {
          initBlockDumpInfo(pReader, pBlockIter);
        } else if (pReader->status.pCurrentFileset->nSttF > 0) {
          // data blocks in current file are exhausted, continue with its last blocks
          tBlockDataReset(&pReader->status.fileBlockData);
          resetDataBlockIterator(pBlockIter, pReader->order);
          goto _begin;
        } else {
          code = initForFirstBlockInFile(pReader, pBlockIter);

          // error happens or all the data files are completely checked
          if ((code != TSDB_CODE_SUCCESS) || (pReader->status.loadFromFile == false)) {
            return code;
          }

          // this file does not have blocks, let's start check the last block file
          if (pBlockIter->numOfBlocks == 0) {
            goto _begin;
          }
        }
      }

      code = doBuildDataBlock(pReader);
    }

    if (code != TSDB_CODE_SUCCESS || pReader->pResBlock->info.rows > 0) {
      return code;
    }
  }
}
H
refact  
Hongze Cheng 已提交
2890

2891 2892
/*
 * Pick the TSDB instance that a query whose window starts at winSKey should read.
 *
 * For a non-rsma vnode this is always VND_TSDB(pVnode), and *pLevel is not written.
 *
 * For an rsma vnode, the retention levels are walked in order and the first level whose keep range still reaches
 * winSKey is selected, i.e. now - keep <= winSKey + tolerance. A level with keep <= 0 ends the walk and falls back
 * to the previous level. The selected level is stored in *pLevel and the matching rsma TSDB is returned.
 */
static STsdb* getTsdbByRetentions(SVnode* pVnode, TSKEY winSKey, SRetention* retentions, const char* idStr,
                                  int8_t* pLevel) {
  if (!VND_IS_RSMA(pVnode)) {
    return VND_TSDB(pVnode);
  }

  int8_t  precision = pVnode->config.tsdbCfg.precision;
  int64_t now = taosGetTimestamp(precision);

  // tsQueryRsmaTolerance is scaled from milliseconds to the vnode precision in 64-bit arithmetic, so a large
  // tolerance cannot overflow a 32-bit `long` (as found on LLP64 platforms).
  int64_t unitsPerMs = 1000000;  // nanosecond (and any other) precision
  if (precision == TSDB_TIME_PRECISION_MILLI) {
    unitsPerMs = 1;
  } else if (precision == TSDB_TIME_PRECISION_MICRO) {
    unitsPerMs = 1000;
  }
  int64_t offset = (int64_t)tsQueryRsmaTolerance * unitsPerMs;

  int8_t level = 0;
  for (int8_t i = 0; i < TSDB_RETENTION_MAX; ++i) {
    SRetention* pRetention = retentions + level;
    if (pRetention->keep <= 0) {
      if (level > 0) {
        --level;
      }
      break;
    }

    if ((now - pRetention->keep) <= (winSKey + offset)) {
      break;
    }
    ++level;
  }

  const char* str = (idStr != NULL) ? idStr : "";
  STsdb*      pTsdb = NULL;

  if (level == TSDB_RETENTION_L0) {
    *pLevel = TSDB_RETENTION_L0;
    pTsdb = VND_RSMA0(pVnode);
  } else if (level == TSDB_RETENTION_L1) {
    *pLevel = TSDB_RETENTION_L1;
    pTsdb = VND_RSMA1(pVnode);
  } else {
    *pLevel = TSDB_RETENTION_L2;
    pTsdb = VND_RSMA2(pVnode);
  }

  tsdbDebug("vgId:%d, rsma level %d is selected to query %s", TD_VID(pVnode), (int32_t)(*pLevel), str);
  return pTsdb;
}

H
Haojun Liao 已提交
2935
/*
 * Resolve the version range a query reads.
 *
 * minVer is pCond->startVersion, or 0 when that is -1. maxVer is the vnode's applied version, lowered to
 * pCond->endVersion when the caller specified one (endVersion != -1) below it. `level` is not used here.
 */
SVersionRange getQueryVerRange(SVnode* pVnode, SQueryTableDataCond* pCond, int8_t level) {
  int64_t minVer = pCond->startVersion;
  if (minVer == -1) {
    minVer = 0;
  }

  int64_t applied = pVnode->state.applied;
  int64_t maxVer = applied;
  if (pCond->endVersion != -1 && pCond->endVersion < applied) {
    maxVer = pCond->endVersion;
  }

  return (SVersionRange){.minVer = minVer, .maxVer = maxVer};
}

2949
/*
 * Check whether pKey is covered by the delete skyline pDelList.
 *
 * pDelList is an array of TSDBKEY points. The code treats a pair of neighbouring points (a, b) as a deleted range
 * [a.ts, b.ts] carrying a's version (asc) / the lower point's version (desc). A key is dropped when it falls in such
 * a range, its version is not newer than the deletion's version, and the deletion itself is visible to the query
 * (deletion version <= pVerRange->maxVer).
 *
 * *index is a cursor into pDelList that is advanced while scanning, so successive calls with monotonically
 * ordered keys do not restart from the beginning.
 *
 * Returns false when pDelList is NULL or empty.
 */
bool hasBeenDropped(const SArray* pDelList, int32_t* index, TSDBKEY* pKey, int32_t order, SVersionRange* pVerRange) {
  ASSERT(pKey != NULL);
  if (pDelList == NULL) {
    return false;
  }

  size_t num = taosArrayGetSize(pDelList);
  if (num == 0) {
    // nothing deleted; also prevents `num - 1` / `num - 2` below from wrapping around
    return false;
  }

  bool    asc = ASCENDING_TRAVERSE(order);
  int32_t step = asc ? 1 : -1;

  if (asc) {
    if (*index >= num - 1) {
      TSDBKEY* last = taosArrayGetLast(pDelList);
      ASSERT(pKey->ts >= last->ts);

      if (pKey->ts > last->ts) {
        return false;
      } else if (pKey->ts == last->ts) {
        if (num < 2) {
          // a single skyline point has no preceding point to form a range with
          return false;
        }

        TSDBKEY* prev = taosArrayGet(pDelList, num - 2);
        return (prev->version >= pKey->version && pVerRange->maxVer >= prev->version);
      }
    } else {
      TSDBKEY* pCurrent = taosArrayGet(pDelList, *index);
      TSDBKEY* pNext = taosArrayGet(pDelList, (*index) + 1);

      if (pKey->ts < pCurrent->ts) {
        return false;
      }

      if (pCurrent->ts <= pKey->ts && pNext->ts >= pKey->ts && pCurrent->version >= pKey->version &&
          pVerRange->maxVer >= pCurrent->version) {
        return true;
      }

      while (pNext->ts <= pKey->ts && (*index) < num - 1) {
        (*index) += 1;

        if ((*index) < num - 1) {
          pCurrent = taosArrayGet(pDelList, *index);
          pNext = taosArrayGet(pDelList, (*index) + 1);

          // it is not a consecutive deletion range, ignore it
          if (pCurrent->version == 0 && pNext->version > 0) {
            continue;
          }

          if (pCurrent->ts <= pKey->ts && pNext->ts >= pKey->ts && pCurrent->version >= pKey->version &&
              pVerRange->maxVer >= pCurrent->version) {
            return true;
          }
        }
      }

      return false;
    }
  } else {
    if (*index <= 0) {
      TSDBKEY* pFirst = taosArrayGet(pDelList, 0);

      if (pKey->ts < pFirst->ts) {
        return false;
      } else if (pKey->ts == pFirst->ts) {
        return pFirst->version >= pKey->version && pVerRange->maxVer >= pFirst->version;
      } else {
        ASSERT(0);
      }
    } else {
      TSDBKEY* pCurrent = taosArrayGet(pDelList, *index);
      TSDBKEY* pPrev = taosArrayGet(pDelList, (*index) - 1);

      if (pKey->ts > pCurrent->ts) {
        return false;
      }

      // same visibility rule as the ascending branch: deletions newer than the query's maxVer are ignored
      if (pPrev->ts <= pKey->ts && pCurrent->ts >= pKey->ts && pPrev->version >= pKey->version &&
          pVerRange->maxVer >= pPrev->version) {
        return true;
      }

      while (pPrev->ts >= pKey->ts && (*index) > 1) {
        (*index) += step;

        if ((*index) >= 1) {
          pCurrent = taosArrayGet(pDelList, *index);
          pPrev = taosArrayGet(pDelList, (*index) - 1);

          // it is not a consecutive deletion range, ignore it
          if (pCurrent->version > 0 && pPrev->version == 0) {
            continue;
          }

          if (pPrev->ts <= pKey->ts && pCurrent->ts >= pKey->ts && pPrev->version >= pKey->version &&
              pVerRange->maxVer >= pPrev->version) {
            return true;
          }
        }
      }

      return false;
    }
  }

  return false;
}

3051
/*
 * Starting at the iterator's current row, return the first row that
 *   - lies inside pReader->window,
 *   - has a version within pReader->verRange, and
 *   - is not covered by the delete skyline pDelList.
 * Rows failing the version or delete check are skipped by advancing the iterator.
 * Returns NULL, with pIter->hasVal == false, when the iterator is exhausted or a row outside the time window is
 * reached. On success the iterator stays positioned on the returned row.
 */
TSDBROW* getValidMemRow(SIterInfo* pIter, const SArray* pDelList, STsdbReader* pReader) {
  while (pIter->hasVal) {
    TSDBROW* pCandidate = tsdbTbDataIterGet(pIter->iter);
    TSDBKEY  key = TSDBROW_KEY(pCandidate);

    // a row outside the query window terminates the scan of this iterator
    if (outOfTimeWindow(key.ts, &pReader->window)) {
      pIter->hasVal = false;
      return NULL;
    }

    bool inVerRange = key.version <= pReader->verRange.maxVer && key.version >= pReader->verRange.minVer;
    if (inVerRange && !hasBeenDropped(pDelList, &pIter->index, &key, pReader->order, &pReader->verRange)) {
      return pCandidate;
    }

    pIter->hasVal = tsdbTbDataIterNext(pIter->iter);
  }

  return NULL;
}
H
Hongze Cheng 已提交
3089

3090 3091
/*
 * Advance pIter and fold every following valid in-memory row whose timestamp equals `ts` into pMerger.
 * Stops at the first valid row with a different timestamp, or when no valid row is left.
 * Returns terrno if the schema of a row to merge cannot be resolved, TSDB_CODE_SUCCESS otherwise.
 */
int32_t doMergeRowsInBuf(SIterInfo* pIter, uint64_t uid, int64_t ts, SArray* pDelList, SRowMerger* pMerger,
                         STsdbReader* pReader) {
  for (;;) {
    pIter->hasVal = tsdbTbDataIterNext(pIter->iter);
    if (!pIter->hasVal) {
      return TSDB_CODE_SUCCESS;
    }

    // data may exist but be invalid (version / deletion / window); getValidMemRow skips such rows
    TSDBROW* pNext = getValidMemRow(pIter, pDelList, pReader);
    if (pNext == NULL) {
      return TSDB_CODE_SUCCESS;
    }

    TSDBKEY nextKey = TSDBROW_KEY(pNext);
    if (nextKey.ts != ts) {
      return TSDB_CODE_SUCCESS;  // a different timestamp: nothing more to merge
    }

    STSchema* pRowSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pNext), pReader, uid);
    if (pRowSchema == NULL) {
      return terrno;
    }

    tRowMergerAdd(pMerger, pNext, pRowSchema);
  }
}

3121
static int32_t doMergeRowsInFileBlockImpl(SBlockData* pBlockData, int32_t rowIndex, int64_t key, SRowMerger* pMerger,
3122
                                          SVersionRange* pVerRange, int32_t step) {
3123 3124
  while (pBlockData->aTSKEY[rowIndex] == key && rowIndex < pBlockData->nRow && rowIndex >= 0) {
    if (pBlockData->aVersion[rowIndex] > pVerRange->maxVer || pBlockData->aVersion[rowIndex] < pVerRange->minVer) {
3125
      rowIndex += step;
3126 3127 3128 3129 3130 3131 3132 3133 3134 3135 3136 3137 3138 3139 3140 3141
      continue;
    }

    TSDBROW fRow = tsdbRowFromBlockData(pBlockData, rowIndex);
    tRowMerge(pMerger, &fRow);
    rowIndex += step;
  }

  return rowIndex;
}

/* Result reported by checkForNeighborFileBlock() through its `state` out-parameter. */
typedef enum {
  CHECK_FILEBLOCK_CONT = 1,  // the neighbour block was merged up to its end; its own neighbour should be checked
  CHECK_FILEBLOCK_QUIT = 2,  // stop checking neighbour blocks
} CHECK_FILEBLOCK_STATE;

H
Hongze Cheng 已提交
3142
static int32_t checkForNeighborFileBlock(STsdbReader* pReader, STableBlockScanInfo* pScanInfo, SDataBlk* pBlock,
3143 3144
                                         SFileDataBlockInfo* pFBlock, SRowMerger* pMerger, int64_t key,
                                         CHECK_FILEBLOCK_STATE* state) {
3145
  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
3146
  SBlockData*         pBlockData = &pReader->status.fileBlockData;
3147

3148
  *state = CHECK_FILEBLOCK_QUIT;
3149
  int32_t step = ASCENDING_TRAVERSE(pReader->order) ? 1 : -1;
3150

3151 3152 3153 3154
  int32_t     nextIndex = -1;
  SBlockIndex bIndex = {0};

  bool hasNeighbor = getNeighborBlockOfSameTable(pFBlock, pScanInfo, &nextIndex, pReader->order, &bIndex);
3155
  if (!hasNeighbor) {  // do nothing
3156 3157 3158
    return 0;
  }

3159
  bool overlap = overlapWithNeighborBlock(pBlock, &bIndex, pReader->order);
3160
  if (overlap) {  // load next block
3161
    SReaderStatus*  pStatus = &pReader->status;
3162 3163
    SDataBlockIter* pBlockIter = &pStatus->blockIter;

3164
    // 1. find the next neighbor block in the scan block list
3165
    SFileDataBlockInfo fb = {.uid = pFBlock->uid, .tbBlockIdx = nextIndex};
3166
    int32_t            neighborIndex = findFileBlockInfoIndex(pBlockIter, &fb);
3167

3168
    // 2. remove it from the scan block list
3169
    setFileBlockActiveInBlockIter(pBlockIter, neighborIndex, step);
3170

3171
    // 3. load the neighbor block, and set it to be the currently accessed file data block
3172
    int32_t code = doLoadFileBlockData(pReader, pBlockIter, &pStatus->fileBlockData, pFBlock->uid);
3173 3174 3175 3176
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

3177
    // 4. check the data values
3178 3179 3180 3181
    initBlockDumpInfo(pReader, pBlockIter);

    pDumpInfo->rowIndex =
        doMergeRowsInFileBlockImpl(pBlockData, pDumpInfo->rowIndex, key, pMerger, &pReader->verRange, step);
H
Haojun Liao 已提交
3182
    if (pDumpInfo->rowIndex >= pDumpInfo->totalRows) {
3183 3184 3185 3186 3187 3188 3189
      *state = CHECK_FILEBLOCK_CONT;
    }
  }

  return TSDB_CODE_SUCCESS;
}

3190 3191
/*
 * Merge into pMerger the rows that follow the current row of pBlockData (in scan order) and share its timestamp.
 * The current row itself is not merged here; presumably the caller already added it (TODO confirm).
 * When the current block is exhausted, overlapping neighbour blocks of the same table are loaded and merged
 * until checkForNeighborFileBlock() reports CHECK_FILEBLOCK_QUIT.
 *
 * Returns the first error raised while loading a neighbour block, TSDB_CODE_SUCCESS otherwise.
 */
int32_t doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pScanInfo, STsdbReader* pReader,
                                SRowMerger* pMerger) {
  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;

  bool    asc = ASCENDING_TRAVERSE(pReader->order);
  int64_t key = pBlockData->aTSKEY[pDumpInfo->rowIndex];
  int32_t step = asc ? 1 : -1;

  pDumpInfo->rowIndex += step;
  if ((pDumpInfo->rowIndex <= pBlockData->nRow - 1 && asc) || (pDumpInfo->rowIndex >= 0 && !asc)) {
    pDumpInfo->rowIndex =
        doMergeRowsInFileBlockImpl(pBlockData, pDumpInfo->rowIndex, key, pMerger, &pReader->verRange, step);
  }

  // all rows are consumed, let's try next file block
  if ((pDumpInfo->rowIndex >= pBlockData->nRow && asc) || (pDumpInfo->rowIndex < 0 && !asc)) {
    while (1) {
      CHECK_FILEBLOCK_STATE st = CHECK_FILEBLOCK_QUIT;

      SFileDataBlockInfo* pFileBlockInfo = getCurrentBlockInfo(&pReader->status.blockIter);
      SDataBlk*           pCurrentBlock = getCurrentBlock(&pReader->status.blockIter);

      // a failure to load the neighbour block must not be swallowed: the merged row would be incomplete
      int32_t code = checkForNeighborFileBlock(pReader, pScanInfo, pCurrentBlock, pFileBlockInfo, pMerger, key, &st);
      if (code != TSDB_CODE_SUCCESS) {
        return code;
      }

      if (st == CHECK_FILEBLOCK_QUIT) {
        break;
      }
    }
  }

  return TSDB_CODE_SUCCESS;
}

H
Hongze Cheng 已提交
3221
int32_t doMergeRowsInLastBlock(SLastBlockReader* pLastBlockReader, STableBlockScanInfo* pScanInfo, int64_t ts,
3222
                               SRowMerger* pMerger, SVersionRange* pVerRange) {
H
Haojun Liao 已提交
3223
  pScanInfo->lastKey = ts;
3224
  while (nextRowFromLastBlocks(pLastBlockReader, pScanInfo, pVerRange)) {
3225 3226
    int64_t next1 = getCurrentKeyInLastBlock(pLastBlockReader);
    if (next1 == ts) {
3227
      TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
3228 3229 3230 3231 3232 3233 3234 3235 3236
      tRowMerge(pMerger, &fRow1);
    } else {
      break;
    }
  }

  return TSDB_CODE_SUCCESS;
}

3237 3238
/*
 * Produce the output row for *pRow from one in-memory iterator (mem or imem).
 *
 * If the next valid row in pIter has a different timestamp (or does not exist), *pTSRow is set to pRow's own
 * STSRow and *freeTSRow to false: the caller must not free it.
 * Otherwise all valid rows with the same timestamp are merged into a newly built row. *pTSRow receives that row and
 * *freeTSRow is set to true: the caller owns it.
 *
 * Returns terrno on schema lookup failure, or the error from the merge helpers. The row merger is always released
 * once it has been initialised.
 */
int32_t doMergeMemTableMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter, SArray* pDelList, STSRow** pTSRow,
                                 STsdbReader* pReader, bool* freeTSRow) {
  TSDBROW  current = *pRow;
  TSDBROW* pNextRow = NULL;

  // if the timestamp of the next valid row differs (or there is none), return the current row directly
  pIter->hasVal = tsdbTbDataIterNext(pIter->iter);
  if (pIter->hasVal) {  // has next point in mem/imem
    pNextRow = getValidMemRow(pIter, pDelList, pReader);
  }

  if (pNextRow == NULL || current.pTSRow->ts != pNextRow->pTSRow->ts) {
    *pTSRow = current.pTSRow;
    *freeTSRow = false;
    return TSDB_CODE_SUCCESS;
  }

  SRowMerger merge = {0};

  // get the correct schema for data in memory
  terrno = 0;
  STSchema* pTSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(&current), pReader, uid);
  if (pTSchema == NULL) {
    return terrno;
  }

  if (pReader->pSchema == NULL) {
    pReader->pSchema = pTSchema;
  }

  int32_t code = tRowMergerInit2(&merge, pReader->pSchema, &current, pTSchema);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  // from here on the merger owns resources: every exit goes through _end so they are released
  STSchema* pTSchema1 = doGetSchemaForTSRow(TSDBROW_SVERSION(pNextRow), pReader, uid);
  if (pTSchema1 == NULL) {
    code = terrno;
    goto _end;
  }

  tRowMergerAdd(&merge, pNextRow, pTSchema1);

  code = doMergeRowsInBuf(pIter, uid, current.pTSRow->ts, pDelList, &merge, pReader);
  if (code != TSDB_CODE_SUCCESS) {
    goto _end;
  }

  code = tRowMergerGetRow(&merge, pTSRow);
  if (code == TSDB_CODE_SUCCESS) {
    *freeTSRow = true;
  }

_end:
  tRowMergerClear(&merge);
  return code;
}

3305
int32_t doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader,
H
Hongze Cheng 已提交
3306
                           STSRow** pTSRow) {
H
Haojun Liao 已提交
3307 3308
  SRowMerger merge = {0};

3309 3310 3311
  TSDBKEY k = TSDBROW_KEY(pRow);
  TSDBKEY ik = TSDBROW_KEY(piRow);

3312
  if (ASCENDING_TRAVERSE(pReader->order)) {  // ascending order imem --> mem
H
Haojun Liao 已提交
3313
    STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid);
3314

H
Haojun Liao 已提交
3315 3316 3317 3318 3319 3320 3321 3322 3323 3324
    int32_t code = tRowMergerInit(&merge, piRow, pSchema);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    code = doMergeRowsInBuf(&pBlockScanInfo->iiter, pBlockScanInfo->uid, ik.ts, pBlockScanInfo->delSkyline, &merge,
                            pReader);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }
3325

3326
    tRowMerge(&merge, pRow);
H
Haojun Liao 已提交
3327 3328 3329 3330 3331 3332
    code =
        doMergeRowsInBuf(&pBlockScanInfo->iter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge, pReader);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

3333
  } else {
H
Haojun Liao 已提交
3334
    STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid);
3335

H
Haojun Liao 已提交
3336
    int32_t code = tRowMergerInit(&merge, pRow, pSchema);
3337
    if (code != TSDB_CODE_SUCCESS || merge.pTSchema == NULL) {
H
Haojun Liao 已提交
3338 3339 3340 3341 3342 3343 3344 3345
      return code;
    }

    code =
        doMergeRowsInBuf(&pBlockScanInfo->iter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge, pReader);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }
3346 3347

    tRowMerge(&merge, piRow);
H
Haojun Liao 已提交
3348 3349 3350 3351 3352
    code = doMergeRowsInBuf(&pBlockScanInfo->iiter, pBlockScanInfo->uid, ik.ts, pBlockScanInfo->delSkyline, &merge,
                            pReader);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }
3353
  }
3354

3355 3356
  int32_t code = tRowMergerGetRow(&merge, pTSRow);
  return code;
3357 3358
}

3359 3360
/*
 * Fetch the next output row for a table from its mem (iter) and imem (iiter) buffers.
 * Candidate rows at or beyond endKey in scan direction are ignored. When both buffers have a candidate, the one
 * first in scan order wins. Equal timestamps are merged across both buffers, and *freeTSRow is set to true.
 * If neither buffer has a candidate, *pTSRow is left untouched and TSDB_CODE_SUCCESS is returned.
 */
int32_t tsdbGetNextRowInMem(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader, STSRow** pTSRow, int64_t endKey,
                            bool* freeTSRow) {
  SIterInfo* pMemIter = &pBlockScanInfo->iter;
  SIterInfo* pIMemIter = &pBlockScanInfo->iiter;
  SArray*    pDelList = pBlockScanInfo->delSkyline;
  uint64_t   uid = pBlockScanInfo->uid;
  bool       asc = ASCENDING_TRAVERSE(pReader->order);

  TSDBROW* pRow = getValidMemRow(pMemIter, pDelList, pReader);
  TSDBROW* piRow = getValidMemRow(pIMemIter, pDelList, pReader);

  // rows reaching endKey (in scan direction) are not part of this request
  if (pMemIter->hasVal) {
    TSDBKEY memKey = TSDBROW_KEY(pRow);
    if (asc ? (memKey.ts >= endKey) : (memKey.ts <= endKey)) {
      pRow = NULL;
    }
  }

  if (pIMemIter->hasVal) {
    TSDBKEY imemKey = TSDBROW_KEY(piRow);
    if (asc ? (imemKey.ts >= endKey) : (imemKey.ts <= endKey)) {
      piRow = NULL;
    }
  }

  bool hasMem = pMemIter->hasVal && pRow != NULL;
  bool hasIMem = pIMemIter->hasVal && piRow != NULL;

  if (hasMem && hasIMem) {
    TSDBKEY k = TSDBROW_KEY(pRow);
    TSDBKEY ik = TSDBROW_KEY(piRow);

    if (ik.ts == k.ts) {
      *freeTSRow = true;
      return doMergeMemIMemRows(pRow, piRow, pBlockScanInfo, pReader, pTSRow);
    }

    bool imemComesFirst = asc ? (ik.ts < k.ts) : (ik.ts > k.ts);
    if (imemComesFirst) {
      return doMergeMemTableMultiRows(piRow, uid, pIMemIter, pDelList, pTSRow, pReader, freeTSRow);
    }
    return doMergeMemTableMultiRows(pRow, uid, pMemIter, pDelList, pTSRow, pReader, freeTSRow);
  }

  if (hasMem) {
    return doMergeMemTableMultiRows(pRow, uid, pMemIter, pDelList, pTSRow, pReader, freeTSRow);
  }

  if (hasIMem) {
    return doMergeMemTableMultiRows(piRow, uid, pIMemIter, pDelList, pTSRow, pReader, freeTSRow);
  }

  return TSDB_CODE_SUCCESS;
}

H
Haojun Liao 已提交
3416
/*
 * Append pTSRow as a new row at the end of pBlock.
 * A leading primary-timestamp output column receives pTSRow->ts. The remaining output columns are matched against
 * the row's schema by column id, both assumed sorted by colId; presumably guaranteed by callers, TODO confirm.
 * Output columns missing from the schema are filled with NULL.
 *
 * Returns terrno, with pBlock left unchanged, when the row's schema cannot be resolved.
 */
int32_t doAppendRowFromTSRow(SSDataBlock* pBlock, STsdbReader* pReader, STSRow* pTSRow, uint64_t uid) {
  int32_t numOfRows = pBlock->info.rows;
  int32_t numOfCols = (int32_t)taosArrayGetSize(pBlock->pDataBlock);

  SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo;
  STSchema*           pSchema = doGetSchemaForTSRow(pTSRow->sver, pReader, uid);
  if (pSchema == NULL) {
    // without the row's schema its columns cannot be decoded
    return terrno;
  }

  SColVal colVal = {0};
  int32_t i = 0, j = 0;

  SColumnInfoData* pColInfoData = (numOfCols > 0) ? taosArrayGet(pBlock->pDataBlock, i) : NULL;
  if (pColInfoData != NULL && pColInfoData->info.colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
    colDataAppend(pColInfoData, numOfRows, (const char*)&pTSRow->ts, false);
    i += 1;
  }

  while (i < numOfCols && j < pSchema->numOfCols) {
    pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
    col_id_t colId = pColInfoData->info.colId;

    if (colId == pSchema->columns[j].colId) {
      tTSRowGetVal(pTSRow, pSchema, j, &colVal);
      doCopyColVal(pColInfoData, numOfRows, i, &colVal, pSupInfo);
      i += 1;
      j += 1;
    } else if (colId < pSchema->columns[j].colId) {
      colDataAppendNULL(pColInfoData, numOfRows);
      i += 1;
    } else if (colId > pSchema->columns[j].colId) {
      j += 1;
    }
  }

  // set null value since current column does not exist in the "pSchema"
  while (i < numOfCols) {
    pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
    colDataAppendNULL(pColInfoData, numOfRows);
    i += 1;
  }

  pBlock->info.rows += 1;
  return TSDB_CODE_SUCCESS;
}

H
Hongze Cheng 已提交
3460 3461
/*
 * Append row `rowIndex` of the file block pBlockData to the end of pResBlock.
 * A leading primary-timestamp output column is taken from aTSKEY. The other output columns are paired with the
 * block's columns by id, walking both lists in step. Output columns absent from the block are filled with NULL.
 */
int32_t doAppendRowFromFileBlock(SSDataBlock* pResBlock, STsdbReader* pReader, SBlockData* pBlockData,
                                 int32_t rowIndex) {
  SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo;
  int32_t             dstRow = pResBlock->info.rows;
  int32_t             outCol = 0;
  int32_t             inCol = 0;

  SColumnInfoData* pTsCol = taosArrayGet(pResBlock->pDataBlock, outCol);
  if (pTsCol->info.colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
    colDataAppendInt64(pTsCol, dstRow, &pBlockData->aTSKEY[rowIndex]);
    outCol++;
  }

  SColVal cv = {0};
  int32_t numOfInputCols = pBlockData->aIdx->size;
  int32_t numOfOutputCols = pResBlock->pDataBlock->size;

  while (outCol < numOfOutputCols && inCol < numOfInputCols) {
    SColumnInfoData* pDst = TARRAY_GET_ELEM(pResBlock->pDataBlock, outCol);
    SColData*        pSrc = tBlockDataGetColDataByIdx(pBlockData, inCol);

    if (pSrc->cid < pDst->info.colId) {
      inCol++;  // block column not requested by the output
    } else if (pSrc->cid == pDst->info.colId) {
      tColDataGetValue(pSrc, rowIndex, &cv);
      doCopyColVal(pDst, dstRow, outCol, &cv, pSupInfo);
      inCol++;
      outCol++;
    } else {
      // the specified column does not exist in file block, fill with null data
      colDataAppendNULL(pDst, dstRow);
      outCol++;
    }
  }

  for (; outCol < numOfOutputCols; ++outCol) {
    SColumnInfoData* pDst = taosArrayGet(pResBlock->pDataBlock, outCol);
    colDataAppendNULL(pDst, dstRow);
  }

  pResBlock->info.rows += 1;
  return TSDB_CODE_SUCCESS;
}

3508 3509
/*
 * Fill pReader->pResBlock with rows taken from the table's mem/imem buffers, stopping at endKey, when both
 * buffers are exhausted, or when the block holds `capacity` rows.
 * Returns the first error reported while producing or appending a row, TSDB_CODE_SUCCESS otherwise.
 */
int32_t buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, int64_t endKey, int32_t capacity,
                                  STsdbReader* pReader) {
  SSDataBlock* pBlock = pReader->pResBlock;

  do {
    STSRow* pTSRow = NULL;
    bool    freeTSRow = false;

    int32_t code = tsdbGetNextRowInMem(pBlockScanInfo, pReader, &pTSRow, endKey, &freeTSRow);
    if (code != TSDB_CODE_SUCCESS) {
      if (freeTSRow && pTSRow != NULL) {
        taosMemoryFree(pTSRow);
      }
      return code;
    }

    if (pTSRow == NULL) {
      break;
    }

    code = doAppendRowFromTSRow(pBlock, pReader, pTSRow, pBlockScanInfo->uid);
    if (freeTSRow) {
      taosMemoryFree(pTSRow);  // a merged row is owned by us, whether or not it was appended
    }
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    // no data in buffer, return immediately
    if (!(pBlockScanInfo->iter.hasVal || pBlockScanInfo->iiter.hasVal)) {
      break;
    }

    if (pBlock->info.rows >= capacity) {
      break;
    }
  } while (1);

  ASSERT(pBlock->info.rows <= capacity);
  return TSDB_CODE_SUCCESS;
}
H
Hongze Cheng 已提交
3538

3539 3540
// TODO refactor: with createDataBlockScanInfo
/*
 * Replace the reader's table set with the `num` tables in pTableList (an array of STableKeyInfo).
 * Existing scan infos are cleared, then the first `num` slots of the reader's block-info buffer are reused and
 * re-registered in pTableMap keyed by uid. The new list may not be larger than the current one (asserted).
 */
int32_t tsdbSetTableList(STsdbReader* pReader, const void* pTableList, int32_t num) {
  ASSERT(pReader != NULL);
  int32_t size = taosHashGetSize(pReader->status.pTableMap);

  for (STableBlockScanInfo** pp = taosHashIterate(pReader->status.pTableMap, NULL); pp != NULL;
       pp = taosHashIterate(pReader->status.pTableMap, pp)) {
    clearBlockScanInfo(*pp);
  }

  // todo handle the case where size is less than the value of num
  ASSERT(size >= num);

  taosHashClear(pReader->status.pTableMap);

  const STableKeyInfo* pKeys = pTableList;
  for (int32_t idx = 0; idx < num; ++idx) {
    STableBlockScanInfo* pInfo = getPosInBlockInfoBuf(&pReader->blockInfoBuf, idx);
    pInfo->uid = pKeys[idx].uid;
    taosHashPut(pReader->status.pTableMap, &pInfo->uid, sizeof(uint64_t), &pInfo, POINTER_BYTES);
  }

  return TDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
3564 3565 3566 3567 3568 3569
/* Return metaGetIdx(pMeta), or NULL when no meta handle is given. */
void* tsdbGetIdx(SMeta* pMeta) {
  return (pMeta != NULL) ? metaGetIdx(pMeta) : NULL;
}
dengyihao's avatar
dengyihao 已提交
3570

dengyihao's avatar
dengyihao 已提交
3571 3572 3573 3574 3575 3576
/* Return metaGetIvtIdx(pMeta), or NULL when no meta handle is given. */
void* tsdbGetIvtIdx(SMeta* pMeta) {
  return (pMeta != NULL) ? metaGetIvtIdx(pMeta) : NULL;
}
L
Liu Jicong 已提交
3577

H
Hongze Cheng 已提交
3578
/* Upper bound of the data versions visible to this reader. */
uint64_t getReaderMaxVersion(STsdbReader* pReader) {
  const SVersionRange* pRange = &pReader->verRange;
  return pRange->maxVer;
}
3579

3580 3581 3582 3583 3584 3585 3586 3587 3588 3589 3590 3591 3592 3593 3594
/*
 * Initialise the file-set and data-block iterators from the reader's snapshot. If the snapshot has no data file,
 * loading from files is disabled (in-memory buffers only). Otherwise the first file block is prepared.
 */
static int32_t doOpenReaderImpl(STsdbReader* pReader) {
  SReaderStatus*  pStatus = &pReader->status;
  SDataBlockIter* pBlockIter = &pStatus->blockIter;

  initFilesetIterator(&pStatus->fileIter, pReader->pReadSnap->fs.aDFileSet, pReader);
  resetDataBlockIterator(pBlockIter, pReader->order);

  if (pStatus->fileIter.numOfFiles != 0) {
    return initForFirstBlockInFile(pReader, pBlockIter);
  }

  // no data in files, let's try buffer in memory
  pStatus->loadFromFile = false;
  return TSDB_CODE_SUCCESS;
}

H
refact  
Hongze Cheng 已提交
3595
// ====================================== EXPOSED APIs ======================================
3596 3597
/*
 * Create a reader for `numOfTables` tables (pTableList is an array of STableKeyInfo) under query condition pCond.
 *
 * For TIMEWINDOW_RANGE_EXTERNAL the main reader scans the open interval (skey, ekey). Two one-row inner readers
 * are also created:
 *   innerReader[0] - the closest row preceding the window in scan order (scanned in the opposite order);
 *   innerReader[1] - the closest row following the window in scan order (scanned in the query order).
 * NOTE: pCond is modified in place while building these readers.
 *
 * On success *ppReader holds the reader. On failure, everything created here is released, *ppReader is set to
 * NULL, and the error code is returned.
 */
int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, void* pTableList, int32_t numOfTables,
                       STsdbReader** ppReader, const char* idstr) {
  STsdbReader* pReader = NULL;
  STimeWindow  window = pCond->twindows;

  // for an external range, both window boundaries are excluded from the main reader
  if (pCond->type == TIMEWINDOW_RANGE_EXTERNAL) {
    pCond->twindows.skey += 1;
    pCond->twindows.ekey -= 1;
  }

  int32_t code = tsdbReaderCreate(pVnode, pCond, ppReader, 4096, idstr);
  if (code != TSDB_CODE_SUCCESS) {
    goto _err;
  }

  // check for query time window
  pReader = *ppReader;
  if (isEmptyQueryTimeWindow(&pReader->window) && pCond->type == TIMEWINDOW_RANGE_CONTAINED) {
    tsdbDebug("%p query window not overlaps with the data set, no result returned, %s", pReader, pReader->idStr);
    return TSDB_CODE_SUCCESS;
  }

  if (pCond->type == TIMEWINDOW_RANGE_EXTERNAL) {
    // update the SQueryTableDataCond to create inner reader
    int32_t order = pCond->order;

    // innerReader[0]: the side of the window that precedes it in scan order, scanned in the opposite order
    if (order == TSDB_ORDER_ASC) {
      pCond->twindows.ekey = window.skey;
      pCond->twindows.skey = INT64_MIN;
      pCond->order = TSDB_ORDER_DESC;
    } else {
      pCond->twindows.skey = window.ekey;
      pCond->twindows.ekey = INT64_MAX;
      pCond->order = TSDB_ORDER_ASC;
    }

    // here we only need one more row, so the capacity is set to be ONE.
    code = tsdbReaderCreate(pVnode, pCond, &pReader->innerReader[0], 1, idstr);
    if (code != TSDB_CODE_SUCCESS) {
      goto _err;
    }

    // innerReader[1]: the side of the window that follows it in scan order, scanned in the query order
    if (order == TSDB_ORDER_ASC) {
      pCond->twindows.skey = window.ekey;
      pCond->twindows.ekey = INT64_MAX;
    } else {
      // mirror of the ascending case: descending scans continue below the window's lower bound
      pCond->twindows.skey = INT64_MIN;
      pCond->twindows.ekey = window.skey;
    }
    pCond->order = order;

    code = tsdbReaderCreate(pVnode, pCond, &pReader->innerReader[1], 1, idstr);
    if (code != TSDB_CODE_SUCCESS) {
      goto _err;
    }
  }

  // NOTE: the endVersion in pCond is the data version not schema version, so pCond->endVersion is not correct here.
  if (pCond->suid != 0) {
    pReader->pSchema = metaGetTbTSchema(pReader->pTsdb->pVnode->pMeta, pReader->suid, -1, 1);
    if (pReader->pSchema == NULL) {
      tsdbError("failed to get table schema, suid:%" PRIu64 ", ver:-1, %s", pReader->suid, pReader->idStr);
    }
  } else if (numOfTables > 0) {
    STableKeyInfo* pKey = pTableList;
    pReader->pSchema = metaGetTbTSchema(pReader->pTsdb->pVnode->pMeta, pKey->uid, -1, 1);
    if (pReader->pSchema == NULL) {
      tsdbError("failed to get table schema, uid:%" PRIu64 ", ver:-1, %s", pKey->uid, pReader->idStr);
    }
  }

  STsdbReader* p = pReader->innerReader[0] != NULL ? pReader->innerReader[0] : pReader;

  pReader->status.pTableMap = createDataBlockScanInfo(p, pTableList, numOfTables);
  if (pReader->status.pTableMap == NULL) {
    code = TSDB_CODE_TDB_OUT_OF_MEMORY;
    goto _err;
  }

  if (numOfTables > 0) {
    code = tsdbTakeReadSnap(pReader->pTsdb, &pReader->pReadSnap, pReader->idStr);
    if (code != TSDB_CODE_SUCCESS) {
      goto _err;
    }

    if (pReader->type == TIMEWINDOW_RANGE_CONTAINED) {
      code = doOpenReaderImpl(pReader);
      if (code != TSDB_CODE_SUCCESS) {
        goto _err;
      }
    } else {
      STsdbReader* pPrevReader = pReader->innerReader[0];
      STsdbReader* pNextReader = pReader->innerReader[1];

      // we need only one row; the inner readers borrow the outer reader's table map, schemas and snapshot
      pPrevReader->capacity = 1;
      pPrevReader->status.pTableMap = pReader->status.pTableMap;
      pPrevReader->pSchema = pReader->pSchema;
      pPrevReader->pMemSchema = pReader->pMemSchema;
      pPrevReader->pReadSnap = pReader->pReadSnap;

      pNextReader->capacity = 1;
      pNextReader->status.pTableMap = pReader->status.pTableMap;
      pNextReader->pSchema = pReader->pSchema;
      pNextReader->pMemSchema = pReader->pMemSchema;
      pNextReader->pReadSnap = pReader->pReadSnap;

      code = doOpenReaderImpl(pPrevReader);
      if (code != TSDB_CODE_SUCCESS) {
        goto _err;
      }
    }
  }

  tsdbDebug("%p total numOfTable:%d in this query %s", pReader, numOfTables, pReader->idStr);
  return code;

_err:
  tsdbError("failed to create data reader, code:%s %s", tstrerror(code), idstr);
  if (pReader != NULL) {
    // tsdbReaderClose() touches both inner readers as soon as the first exists; release a lone first one here
    if (pReader->innerReader[0] != NULL && pReader->innerReader[1] == NULL) {
      tsdbReaderClose(pReader->innerReader[0]);
      pReader->innerReader[0] = NULL;
    }
    tsdbReaderClose(pReader);
  }
  *ppReader = NULL;
  return code;
}

/*
 * Release a reader and everything it owns: the inner prev/next readers, the column-load support buffers, the
 * file block data, the block iterator, the per-table scan info map, the result block, the data-file reader, the
 * read snapshot, the last-block reader and the schemas. A NULL reader is ignored.
 *
 * Before freeing anything, the accumulated I/O cost counters are written to the debug log.
 */
void tsdbReaderClose(STsdbReader* pReader) {
  if (pReader == NULL) {
    return;
  }

  // The inner readers (configured in tsdbReaderOpen when the window is not TIMEWINDOW_RANGE_CONTAINED) share the
  // table map, the read snapshot and both schemas with this reader. Detach those members before closing an inner
  // reader so they are released only once, by this reader further below. Each slot is checked separately, so a
  // missing inner reader is never dereferenced and a lone innerReader[1] is still closed.
  for (int32_t k = 0; k < 2; ++k) {
    STsdbReader* pInner = pReader->innerReader[k];
    if (pInner == NULL) {
      continue;
    }

    pInner->status.pTableMap = NULL;
    pInner->pReadSnap = NULL;
    pInner->pSchema = NULL;
    pInner->pMemSchema = NULL;

    tsdbReaderClose(pInner);
    pReader->innerReader[k] = NULL;
  }

  SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo;

  taosMemoryFreeClear(pSupInfo->plist);
  taosMemoryFree(pSupInfo->colIds);

  taosArrayDestroy(pSupInfo->pColAgg);

  // buildBuf holds one buffer per result-block column; guard against a reader whose buffer array was never built
  if (pSupInfo->buildBuf != NULL) {
    for (int32_t i = 0; i < blockDataGetNumOfCols(pReader->pResBlock); ++i) {
      taosMemoryFreeClear(pSupInfo->buildBuf[i]);
    }
  }

  taosMemoryFree(pSupInfo->buildBuf);
  tBlockDataDestroy(&pReader->status.fileBlockData, true);

  cleanupDataBlockIterator(&pReader->status.blockIter);

  // sample the table count before the map is destroyed; it is only used for the cost summary below
  size_t numOfTables = taosHashGetSize(pReader->status.pTableMap);
  destroyAllBlockScanInfo(pReader->status.pTableMap);
  blockDataDestroy(pReader->pResBlock);
  clearBlockScanInfoBuf(&pReader->blockInfoBuf);

  if (pReader->pFileReader != NULL) {
    tsdbDataFReaderClose(&pReader->pFileReader);
  }

  tsdbUntakeReadSnap(pReader->pTsdb, pReader->pReadSnap, pReader->idStr);

  taosMemoryFree(pReader->status.uidCheckInfo.tableUidList);
  SIOCostSummary* pCost = &pReader->cost;

  SFilesetIter* pFilesetIter = &pReader->status.fileIter;
  if (pFilesetIter->pLastBlockReader != NULL) {
    SLastBlockReader* pLReader = pFilesetIter->pLastBlockReader;
    tMergeTreeClose(&pLReader->mergeTree);

    // harvest the last-block load statistics into the cost summary before the load info is destroyed
    getLastBlockLoadInfo(pLReader->pInfo, &pCost->lastBlockLoad, &pCost->lastBlockLoadTime);

    pLReader->pInfo = destroyLastBlockLoadInfo(pLReader->pInfo);
    taosMemoryFree(pLReader);
  }

  tsdbDebug("%p :io-cost summary: head-file:%" PRIu64 ", head-file time:%.2f ms, SMA:%" PRId64
            " SMA-time:%.2f ms, fileBlocks:%" PRId64
            ", fileBlocks-load-time:%.2f ms, "
            "build in-memory-block-time:%.2f ms, lastBlocks:%" PRId64
            ", lastBlocks-time:%.2f ms, composed-blocks:%" PRId64
            ", composed-blocks-time:%.2fms, STableBlockScanInfo size:%.2f Kb, creatTime:%.2f ms, %s",
            pReader, pCost->headFileLoad, pCost->headFileLoadTime, pCost->smaDataLoad, pCost->smaLoadTime,
            pCost->numOfBlocks, pCost->blockLoadTime, pCost->buildmemBlock, pCost->lastBlockLoad,
            pCost->lastBlockLoadTime, pCost->composedBlocks, pCost->buildComposedBlockTime,
            numOfTables * sizeof(STableBlockScanInfo) / 1000.0, pCost->createScanInfoList, pReader->idStr);

  taosMemoryFree(pReader->idStr);
  taosMemoryFree(pReader->pSchema);
  // the memory schema may alias the file schema; free it only when it is a distinct object
  if (pReader->pMemSchema != pReader->pSchema) {
    taosMemoryFree(pReader->pMemSchema);
  }
  taosMemoryFreeClear(pReader);
}

3805
/*
 * Fill pReader->pResBlock with the next batch of rows.
 *
 * File blocks are tried first (while status.loadFromFile is set). If they produce no rows, or file loading is not
 * enabled, the in-memory buffer is consulted instead. Returns true when the result block ends up holding at least
 * one row, and false when no rows were produced or when building a block from files failed.
 */
static bool doTsdbNextDataBlock(STsdbReader* pReader) {
  SSDataBlock*   pResult = pReader->pResBlock;
  SReaderStatus* pStatus = &pReader->status;

  // discard whatever the previous call left in the result block
  blockDataCleanup(pResult);

  if (pStatus->loadFromFile) {
    if (buildBlockFromFiles(pReader) != TSDB_CODE_SUCCESS) {
      return false;
    }

    if (pResult->info.rows > 0) {
      return true;
    }
  }

  // nothing came from the files: fall back to the memory buffer
  buildBlockFromBufferSequentially(pReader);
  return pResult->info.rows > 0;
}

3832 3833 3834 3835 3836
/*
 * Advance the reader to its next data block. Returns true when a block is available for retrieval, and false when
 * the scan is exhausted or an error occurred (in the error case terrno holds the error code).
 *
 * When inner readers exist, the scan runs in phases tracked by pReader->step:
 *   step 0                -> read from innerReader[0] (the "prev" reader, capacity 1), then step = EXTERNAL_ROWS_PREV
 *   EXTERNAL_ROWS_PREV    -> open the main scan, then step = EXTERNAL_ROWS_MAIN
 *   main scan exhausted   -> open innerReader[1] (the "next" reader), then step = EXTERNAL_ROWS_NEXT
 */
bool tsdbNextDataBlock(STsdbReader* pReader) {
  if (isEmptyQueryTimeWindow(&pReader->window)) {
    return false;
  }

  if (pReader->innerReader[0] != NULL && pReader->step == 0) {
    bool hasPrevRow = doTsdbNextDataBlock(pReader->innerReader[0]);
    pReader->step = EXTERNAL_ROWS_PREV;
    if (hasPrevRow) {
      return true;
    }
  }

  if (pReader->step == EXTERNAL_ROWS_PREV) {
    // prepare for the main scan
    int32_t code = doOpenReaderImpl(pReader);
    if (code != TSDB_CODE_SUCCESS) {
      // the function returns bool: returning the (non-zero) code would be read as "a block is ready"
      terrno = code;
      return false;
    }

    resetAllDataBlockScanInfo(pReader->status.pTableMap, pReader->innerReader[0]->window.ekey);
    pReader->step = EXTERNAL_ROWS_MAIN;
  }

  if (doTsdbNextDataBlock(pReader)) {
    return true;
  }

  if (pReader->innerReader[1] != NULL && pReader->step == EXTERNAL_ROWS_MAIN) {
    // prepare for the next row scan
    int32_t code = doOpenReaderImpl(pReader->innerReader[1]);
    if (code != TSDB_CODE_SUCCESS) {
      terrno = code;
      return false;
    }

    resetAllDataBlockScanInfo(pReader->innerReader[1]->status.pTableMap, pReader->window.ekey);

    bool hasNextRow = doTsdbNextDataBlock(pReader->innerReader[1]);
    pReader->step = EXTERNAL_ROWS_NEXT;
    return hasNextRow;
  }

  return false;
}

3880
/*
 * Report whether the reader's table map holds scan info for the table with the given uid.
 * Returns false when the uid is not part of this query.
 */
bool tsdbTableNextDataBlock(STsdbReader* pReader, uint64_t uid) {
  // taosHashGet returns NULL for an unknown key; check it before dereferencing
  STableBlockScanInfo** ppInfo = (STableBlockScanInfo**)taosHashGet(pReader->status.pTableMap, &uid, sizeof(uid));
  if (ppInfo == NULL || *ppInfo == NULL) {  // no data block for the table of given uid
    return false;
  }

  return true;
}

H
Haojun Liao 已提交
3889 3890 3891 3892 3893
/*
 * Copy the row count, table uid and time window of pReader's current result block into the output parameters.
 */
static void setBlockInfo(const STsdbReader* pReader, int32_t* rows, uint64_t* uid, STimeWindow* pWindow) {
  ASSERT(pReader != NULL);

  SSDataBlock* pResult = pReader->pResBlock;
  *rows = pResult->info.rows;
  *uid = pResult->info.uid;
  *pWindow = pResult->info.window;
}

H
Haojun Liao 已提交
3896
/*
 * Describe the block produced by the last tsdbNextDataBlock call: row count, table uid and time window.
 *
 * For TIMEWINDOW_RANGE_EXTERNAL readers, the block may belong to an inner reader. It comes from innerReader[0]
 * during the EXTERNAL_ROWS_PREV step, from the reader itself during EXTERNAL_ROWS_MAIN, and from innerReader[1]
 * otherwise.
 */
void tsdbRetrieveDataBlockInfo(const STsdbReader* pReader, int32_t* rows, uint64_t* uid, STimeWindow* pWindow) {
  const STsdbReader* pSource = pReader;

  if (pReader->type == TIMEWINDOW_RANGE_EXTERNAL && pReader->step != EXTERNAL_ROWS_MAIN) {
    pSource = (pReader->step == EXTERNAL_ROWS_PREV) ? pReader->innerReader[0] : pReader->innerReader[1];
  }

  setBlockInfo(pSource, rows, uid, pWindow);
}

3910
/*
 * Load the SMA (per-column aggregates) of the file block the reader is currently positioned on.
 *
 * *pBlockStatis is set to NULL and TSDB_CODE_SUCCESS is returned when no SMA is used: for
 * TIMEWINDOW_RANGE_EXTERNAL readers, for composed blocks, and for blocks without SMA.
 *
 * Otherwise *pBlockStatis points at pReader->suppInfo.plist:
 *   - slot 0 receives the primary-timestamp aggregate, whose min/max are taken from the result block's window;
 *   - slot j receives the loaded aggregate whose colId equals suppInfo.colIds[j].
 *
 * *allHave becomes true once the SMA is loaded. It is cleared again if a matched column fails the IS_BSMA_ON check.
 * Returns the tsdbReadBlockSma error code if loading fails.
 */
int32_t tsdbRetrieveDatablockSMA(STsdbReader* pReader, SColumnDataAgg*** pBlockStatis, bool* allHave) {
  int32_t code = 0;
  *allHave = false;

  if (pReader->type == TIMEWINDOW_RANGE_EXTERNAL) {
    *pBlockStatis = NULL;
    return TSDB_CODE_SUCCESS;
  }

  // there is no statistics data for composed block
  if (pReader->status.composedDataBlock) {
    *pBlockStatis = NULL;
    return TSDB_CODE_SUCCESS;
  }

  SFileDataBlockInfo* pFBlock = getCurrentBlockInfo(&pReader->status.blockIter);
  SDataBlk*           pBlock = getCurrentBlock(&pReader->status.blockIter);
  SBlockLoadSuppInfo* pSup = &pReader->suppInfo;

  if (!tDataBlkHasSma(pBlock)) {
    *pBlockStatis = NULL;
    return TSDB_CODE_SUCCESS;
  }

  code = tsdbReadBlockSma(pReader->pFileReader, pBlock, pSup->pColAgg);
  if (code != TSDB_CODE_SUCCESS) {
    tsdbError("vgId:%d, failed to load block SMA for uid %" PRIu64 ", code:%s, %s", TD_VID(pReader->pTsdb->pVnode),
              pFBlock->uid, tstrerror(code), pReader->idStr);
    return code;
  }

  *allHave = true;

  // always load the first primary timestamp column data
  SColumnDataAgg* pTsAgg = &pSup->tsColAgg;

  pTsAgg->numOfNull = 0;
  pTsAgg->colId = PRIMARYKEY_TIMESTAMP_COL_ID;
  pTsAgg->min = pReader->pResBlock->info.window.skey;
  pTsAgg->max = pReader->pResBlock->info.window.ekey;
  pSup->plist[0] = pTsAgg;

  // update the number of NULL data rows
  size_t numOfCols = blockDataGetNumOfCols(pReader->pResBlock);

  int32_t i = 0, j = 0;
  size_t  size = taosArrayGetSize(pSup->pColAgg);

  // merge-walk the loaded aggregates (index i) against the requested column ids (index j), advancing the side with
  // the smaller colId; this relies on both sequences being ordered by colId
  while (j < numOfCols && i < size) {
    SColumnDataAgg* pAgg = taosArrayGet(pSup->pColAgg, i);
    if (pAgg->colId == pSup->colIds[j]) {
      // NOTE(review): `i` indexes pColAgg, not the schema; using it for pSchema->columns is only correct if the two
      // arrays are aligned by position — confirm.
      if (IS_BSMA_ON(&(pReader->pSchema->columns[i]))) {
        pSup->plist[j] = pAgg;
      } else {
        *allHave = false;
      }

      i += 1;
      j += 1;
    } else if (pAgg->colId < pSup->colIds[j]) {
      i += 1;
    } else if (pSup->colIds[j] < pAgg->colId) {
      j += 1;
    }
  }

  pReader->cost.smaDataLoad += 1;
  *pBlockStatis = pSup->plist;

  tsdbDebug("vgId:%d, succeed to load block SMA for uid %" PRIu64 ", %s", TD_VID(pReader->pTsdb->pVnode),
            pFBlock->uid, pReader->idStr);

  return code;
}

3986
/*
 * Return the column arrays of pReader's current result block.
 *
 * A composed block is already materialized in pResBlock and is returned as is. Otherwise the current file block is
 * loaded and copied into pResBlock first.
 *
 * Returns NULL and sets terrno when the block's table uid is unknown or when loading the file block fails.
 */
static SArray* doRetrieveDataBlock(STsdbReader* pReader) {
  SReaderStatus* pStatus = &pReader->status;

  if (pStatus->composedDataBlock) {
    return pReader->pResBlock->pDataBlock;
  }

  SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(&pStatus->blockIter);

  // taosHashGet returns NULL for an unknown uid: check the slot before dereferencing it
  STableBlockScanInfo** ppScanInfo =
      (STableBlockScanInfo**)taosHashGet(pStatus->pTableMap, &pBlockInfo->uid, sizeof(pBlockInfo->uid));
  if (ppScanInfo == NULL || *ppScanInfo == NULL) {
    terrno = TSDB_CODE_INVALID_PARA;
    tsdbError("failed to locate the uid:%" PRIu64 " in query table uid list, total tables:%d, %s", pBlockInfo->uid,
              (int32_t)taosHashGetSize(pReader->status.pTableMap), pReader->idStr);
    return NULL;
  }

  STableBlockScanInfo* pBlockScanInfo = *ppScanInfo;

  int32_t code = doLoadFileBlockData(pReader, &pStatus->blockIter, &pStatus->fileBlockData, pBlockScanInfo->uid);
  if (code != TSDB_CODE_SUCCESS) {
    tBlockDataDestroy(&pStatus->fileBlockData, 1);
    terrno = code;
    return NULL;
  }

  copyBlockDataToSDataBlock(pReader, pBlockScanInfo);
  return pReader->pResBlock->pDataBlock;
}

4013 4014 4015 4016 4017 4018 4019 4020 4021 4022 4023 4024
/*
 * Retrieve the column data of the block produced by the last tsdbNextDataBlock call.
 *
 * For TIMEWINDOW_RANGE_EXTERNAL readers, the EXTERNAL_ROWS_PREV and EXTERNAL_ROWS_NEXT steps read from
 * innerReader[0] and innerReader[1] respectively. Every other case reads from the reader itself.
 * pIdList is not used.
 */
SArray* tsdbRetrieveDataBlock(STsdbReader* pReader, SArray* pIdList) {
  STsdbReader* pTarget = pReader;

  if (pReader->type == TIMEWINDOW_RANGE_EXTERNAL) {
    switch (pReader->step) {
      case EXTERNAL_ROWS_PREV:
        pTarget = pReader->innerReader[0];
        break;
      case EXTERNAL_ROWS_NEXT:
        pTarget = pReader->innerReader[1];
        break;
      default:
        break;
    }
  }

  return doRetrieveDataBlock(pTarget);
}

H
Haojun Liao 已提交
4025
/*
 * Rewind the reader so that it scans again with the order and time window taken from pCond.
 *
 * The reader is switched to TIMEWINDOW_RANGE_CONTAINED, its file reader is closed, and the fileset iterator, the
 * block iterator and every table's scan info are reset. When the snapshot has data files, the first file block is
 * prepared; otherwise the reader falls back to memory only.
 *
 * A reader with an empty window or without a read snapshot is left untouched. Returns the
 * initForFirstBlockInFile error on failure.
 */
int32_t tsdbReaderReset(STsdbReader* pReader, SQueryTableDataCond* pCond) {
  if (isEmptyQueryTimeWindow(&pReader->window) || pReader->pReadSnap == NULL) {
    return TSDB_CODE_SUCCESS;
  }

  SReaderStatus*      pStatus = &pReader->status;
  SDataBlockIter*     pIter = &pStatus->blockIter;
  SBlockLoadSuppInfo* pSup = &pReader->suppInfo;

  pReader->order = pCond->order;
  pReader->type = TIMEWINDOW_RANGE_CONTAINED;
  pStatus->loadFromFile = true;
  pStatus->pTableIter = NULL;
  pReader->window = updateQueryTimeWindow(pReader->pTsdb, &pCond->twindows);

  // reset the timestamp aggregate and only the first plist slot, which is where that aggregate is published
  memset(&pSup->tsColAgg, 0, sizeof(SColumnDataAgg));
  memset(pSup->plist, 0, POINTER_BYTES);
  pSup->tsColAgg.colId = PRIMARYKEY_TIMESTAMP_COL_ID;

  tsdbDataFReaderClose(&pReader->pFileReader);

  int32_t numOfTables = taosHashGetSize(pStatus->pTableMap);

  initFilesetIterator(&pStatus->fileIter, pReader->pReadSnap->fs.aDFileSet, pReader);
  resetDataBlockIterator(pIter, pReader->order);

  // position every table just outside the window so the next scan starts at its first key in the traversal order
  int64_t startKey = ASCENDING_TRAVERSE(pReader->order) ? pReader->window.skey - 1 : pReader->window.ekey + 1;
  resetAllDataBlockScanInfo(pStatus->pTableMap, startKey);

  int32_t code = TSDB_CODE_SUCCESS;

  if (pStatus->fileIter.numOfFiles != 0) {
    code = initForFirstBlockInFile(pReader, pIter);
    if (code != TSDB_CODE_SUCCESS) {
      tsdbError("%p reset reader failed, numOfTables:%d, query range:%" PRId64 " - %" PRId64 " in query %s", pReader,
                numOfTables, pReader->window.skey, pReader->window.ekey, pReader->idStr);
      return code;
    }
  } else {
    // no data in files, let's try buffer in memory
    pStatus->loadFromFile = false;
  }

  tsdbDebug("%p reset reader, suid:%" PRIu64 ", numOfTables:%d, skey:%" PRId64 ", query range:%" PRId64 " - %" PRId64
            " in query %s",
            pReader, pReader->suid, numOfTables, pCond->twindows.skey, pReader->window.skey, pReader->window.ekey,
            pReader->idStr);

  return code;
}
H
Hongze Cheng 已提交
4074

4075 4076 4077
/*
 * Map a block's row count to a histogram bucket index: (numOfRows - startRow) / bucketRange.
 *
 * Returns 0 when bucketRange is not positive (which would otherwise divide by zero) and when numOfRows does not
 * exceed startRow (which would otherwise yield a negative index). The result is not capped above; callers must clamp
 * it to their histogram size.
 */
static int32_t getBucketIndex(int32_t startRow, int32_t bucketRange, int32_t numOfRows) {
  if (bucketRange <= 0 || numOfRows <= startRow) {
    return 0;
  }

  return (numOfRows - startRow) / bucketRange;
}
H
Hongze Cheng 已提交
4078

4079 4080 4081 4082
/*
 * Walk every data block of every file in the reader's snapshot and accumulate block-distribution statistics into
 * pTableBlockInfo: total rows, min/max rows per block, number of small blocks, number of blocks/files/tables, and a
 * histogram of rows per block.
 *
 * Returns the first initForFirstBlockInFile error, or TSDB_CODE_SUCCESS once all files have been visited.
 */
int32_t tsdbGetFileBlocksDistInfo(STsdbReader* pReader, STableBlockDistInfo* pTableBlockInfo) {
  int32_t code = TSDB_CODE_SUCCESS;
  pTableBlockInfo->totalSize = 0;
  pTableBlockInfo->totalRows = 0;

  // find the start data block in file
  SReaderStatus* pStatus = &pReader->status;

  STsdbCfg* pc = &pReader->pTsdb->pVnode->config.tsdbCfg;
  pTableBlockInfo->defMinRows = pc->minRows;
  pTableBlockInfo->defMaxRows = pc->maxRows;

  // the [minRows, maxRows] range is cut into buckets of width ceil((max - min) / 20); keep the width at least 1 so
  // that maxRows == minRows cannot lead to a division by zero
  int32_t bucketRange = ceil((pc->maxRows - pc->minRows) / 20.0);
  if (bucketRange < 1) {
    bucketRange = 1;
  }

  const int32_t numOfBuckets =
      (int32_t)(sizeof(pTableBlockInfo->blockRowsHisto) / sizeof(pTableBlockInfo->blockRowsHisto[0]));

  // NOTE(review): numOfFiles is incremented here and then again by fileIter.numOfFiles below — confirm the extra +1
  // is intended.
  pTableBlockInfo->numOfFiles += 1;

  int32_t numOfTables = (int32_t)taosHashGetSize(pStatus->pTableMap);
  int     defaultRows = 4096;

  SDataBlockIter* pBlockIter = &pStatus->blockIter;
  pTableBlockInfo->numOfFiles += pStatus->fileIter.numOfFiles;

  if (pBlockIter->numOfBlocks > 0) {
    pTableBlockInfo->numOfBlocks += pBlockIter->numOfBlocks;
  }

  pTableBlockInfo->numOfTables = numOfTables;
  bool hasNext = (pBlockIter->numOfBlocks > 0);

  while (true) {
    if (hasNext) {
      SDataBlk* pBlock = getCurrentBlock(pBlockIter);

      int32_t numOfRows = pBlock->nRow;
      pTableBlockInfo->totalRows += numOfRows;

      if (numOfRows > pTableBlockInfo->maxRows) {
        pTableBlockInfo->maxRows = numOfRows;
      }

      if (numOfRows < pTableBlockInfo->minRows) {
        pTableBlockInfo->minRows = numOfRows;
      }

      if (numOfRows < defaultRows) {
        pTableBlockInfo->numOfSmallBlocks += 1;
      }

      // a full block can map one past the last bucket (e.g. index 20 when maxRows - minRows is a multiple of 20);
      // clamp the index so the histogram is never written out of bounds
      int32_t bucketIndex = getBucketIndex(pTableBlockInfo->defMinRows, bucketRange, numOfRows);
      if (bucketIndex < 0) {
        bucketIndex = 0;
      } else if (bucketIndex >= numOfBuckets) {
        bucketIndex = numOfBuckets - 1;
      }
      pTableBlockInfo->blockRowsHisto[bucketIndex]++;

      hasNext = blockIteratorNext(&pStatus->blockIter, pReader->idStr);
    } else {
      // the current file is exhausted: move to the next file, stopping on error or when no file remains
      code = initForFirstBlockInFile(pReader, pBlockIter);
      if ((code != TSDB_CODE_SUCCESS) || (pReader->status.loadFromFile == false)) {
        break;
      }

      pTableBlockInfo->numOfBlocks += pBlockIter->numOfBlocks;
      hasNext = (pBlockIter->numOfBlocks > 0);
    }
  }

  return code;
}
H
Hongze Cheng 已提交
4147

H
refact  
Hongze Cheng 已提交
4148
/*
 * Count the rows held in memory (memtable plus immutable memtable) for every table in the reader's table map.
 *
 * The count is taken from the memtables referenced by the reader's read snapshot. The reader's status.pTableIter is
 * used as the iteration cursor and is NULL when the function returns.
 */
int64_t tsdbGetNumOfRowsInMemTable(STsdbReader* pReader) {
  int64_t        rows = 0;
  SReaderStatus* pStatus = &pReader->status;
  STsdbReadSnap* pSnap = pReader->pReadSnap;

  pStatus->pTableIter = taosHashIterate(pStatus->pTableMap, NULL);

  while (pStatus->pTableIter != NULL) {
    STableBlockScanInfo* pBlockScanInfo = *(STableBlockScanInfo**)pStatus->pTableIter;

    // Test the snapshot's own pointers, which are the ones dereferenced here. pTsdb->mem/imem can differ from them
    // if the tsdb's memtables changed after the snapshot was taken, and then NULL could be passed on.
    if (pSnap != NULL && pSnap->pMem != NULL) {
      STbData* d = tsdbGetTbDataFromMemTable(pSnap->pMem, pReader->suid, pBlockScanInfo->uid);
      if (d != NULL) {
        rows += tsdbGetNRowsInTbData(d);
      }
    }

    if (pSnap != NULL && pSnap->pIMem != NULL) {
      STbData* di = tsdbGetTbDataFromMemTable(pSnap->pIMem, pReader->suid, pBlockScanInfo->uid);
      if (di != NULL) {
        rows += tsdbGetNRowsInTbData(di);
      }
    }

    // current table is exhausted, let's try the next table
    pStatus->pTableIter = taosHashIterate(pStatus->pTableMap, pStatus->pTableIter);
  }

  return rows;
}
D
dapan1121 已提交
4179

L
Liu Jicong 已提交
4180
/*
 * Look up the schema of table `uid` in the vnode's meta store.
 *
 * On success:
 *   - *suid is the super-table uid for a child table, and 0 otherwise;
 *   - *pSchema is the result of metaGetTbTSchema for the table's schema version. For a child table, that version is
 *     read from its super table's entry.
 *
 * Returns TSDB_CODE_SUCCESS, or sets terrno to TSDB_CODE_TDB_INVALID_TABLE_ID and returns it when a meta entry
 * cannot be found.
 */
int32_t tsdbGetTableSchema(SVnode* pVnode, int64_t uid, STSchema** pSchema, int64_t* suid) {
  int32_t     sversion = 1;
  SMetaReader mr = {0};

  metaReaderInit(&mr, pVnode->pMeta, 0);
  if (metaGetTableEntryByUid(&mr, uid) != TSDB_CODE_SUCCESS) {
    goto _invalid_table;
  }

  *suid = 0;

  if (mr.me.type != TSDB_CHILD_TABLE) {
    ASSERT(mr.me.type == TSDB_NORMAL_TABLE);
    sversion = mr.me.ntbEntry.schemaRow.version;
  } else {
    // re-use the reader for the super table entry, which carries the schema version
    tDecoderClear(&mr.coder);
    *suid = mr.me.ctbEntry.suid;
    if (metaGetTableEntryByUid(&mr, *suid) != TSDB_CODE_SUCCESS) {
      goto _invalid_table;
    }
    sversion = mr.me.stbEntry.schemaRow.version;
  }

  metaReaderClear(&mr);
  *pSchema = metaGetTbTSchema(pVnode->pMeta, uid, sversion, 1);
  return TSDB_CODE_SUCCESS;

_invalid_table:
  terrno = TSDB_CODE_TDB_INVALID_TABLE_ID;
  metaReaderClear(&mr);
  return terrno;
}
H
Hongze Cheng 已提交
4214

H
Haojun Liao 已提交
4215
/*
 * Take a read snapshot of the tsdb: under a read lock on pTsdb->rwLock, record the current memtable and immutable
 * memtable (adding a reference to each one that exists) and take a reference on the file set via tsdbFSRef.
 *
 * On success *ppSnap owns those references; release them with tsdbUntakeReadSnap.
 * On any failure, every reference taken so far is dropped, the snapshot is freed, *ppSnap is NULL and the error
 * code is returned. A half-built snapshot is never published: releasing one later would tsdbFSUnref a file set
 * that was never referenced.
 */
int32_t tsdbTakeReadSnap(STsdb* pTsdb, STsdbReadSnap** ppSnap, const char* idStr) {
  int32_t code = 0;

  *ppSnap = NULL;

  // alloc
  STsdbReadSnap* pSnap = (STsdbReadSnap*)taosMemoryCalloc(1, sizeof(STsdbReadSnap));
  if (pSnap == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  // lock
  code = taosThreadRwlockRdlock(&pTsdb->rwLock);
  if (code) {
    code = TAOS_SYSTEM_ERROR(code);
    goto _err_free;
  }

  // take snapshot
  pSnap->pMem = pTsdb->mem;
  pSnap->pIMem = pTsdb->imem;

  if (pSnap->pMem) {
    tsdbRefMemTable(pSnap->pMem);
  }

  if (pSnap->pIMem) {
    tsdbRefMemTable(pSnap->pIMem);
  }

  // fs
  code = tsdbFSRef(pTsdb, &pSnap->fs);
  if (code) {
    taosThreadRwlockUnlock(&pTsdb->rwLock);
    goto _err_unref_mem;
  }

  // unlock
  code = taosThreadRwlockUnlock(&pTsdb->rwLock);
  if (code) {
    code = TAOS_SYSTEM_ERROR(code);
    tsdbFSUnref(pTsdb, &pSnap->fs);
    goto _err_unref_mem;
  }

  *ppSnap = pSnap;
  tsdbTrace("vgId:%d, take read snapshot, %s", TD_VID(pTsdb->pVnode), idStr);
  return code;

_err_unref_mem:
  if (pSnap->pMem) {
    tsdbUnrefMemTable(pSnap->pMem);
  }

  if (pSnap->pIMem) {
    tsdbUnrefMemTable(pSnap->pIMem);
  }

_err_free:
  taosMemoryFree(pSnap);
  return code;
}

H
Haojun Liao 已提交
4263
/*
 * Release a read snapshot: drop the memtable and immutable-memtable references it holds, drop its file-set
 * reference via tsdbFSUnref, and free it. A NULL snapshot is accepted.
 * The trace message is emitted in both cases.
 */
void tsdbUntakeReadSnap(STsdb* pTsdb, STsdbReadSnap* pSnap, const char* idStr) {
  if (pSnap != NULL) {
    if (pSnap->pMem != NULL) {
      tsdbUnrefMemTable(pSnap->pMem);
    }
    if (pSnap->pIMem != NULL) {
      tsdbUnrefMemTable(pSnap->pIMem);
    }

    // the file-set reference is released only after both memtable references have been dropped
    tsdbFSUnref(pTsdb, &pSnap->fs);
    taosMemoryFree(pSnap);
  }

  tsdbTrace("vgId:%d, untake read snapshot, %s", TD_VID(pTsdb->pVnode), idStr);
}