tsdbRead.c 141.6 KB
Newer Older
H
hjxilinx 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

H
Haojun Liao 已提交
16
#include "osDef.h"
H
Hongze Cheng 已提交
17
#include "tsdb.h"
18

H
Hongze Cheng 已提交
19
// Evaluates to true when the traversal order `o` equals TSDB_ORDER_ASC.
// The argument is parenthesized so that a compound expression passed as `o`
// (for example `cond ? a : b` or `x | y`) is compared as a whole.
#define ASCENDING_TRAVERSE(o) ((o) == TSDB_ORDER_ASC)
H
Hongze Cheng 已提交
20

21 22 23 24 25 26
// Tags identifying which set of rows a piece of content refers to.
// NOTE(review): presumably PREV/NEXT are the rows directly before/after the main query range
// (see the "retrieve direct prev|next rows" remark on STsdbReader.type) -- confirm against the users.
// The values are consecutive integers (1, 2, 3), not independent bit flags.
typedef enum {
  EXTERNAL_ROWS_PREV = 0x1,
  EXTERNAL_ROWS_MAIN = 0x2,
  EXTERNAL_ROWS_NEXT = 0x3,
} EContentData;

27
typedef struct {
dengyihao's avatar
dengyihao 已提交
28
  STbDataIter* iter;
29 30 31 32
  int32_t      index;
  bool         hasVal;
} SIterInfo;

33 34
typedef struct {
  int32_t numOfBlocks;
35
  int32_t numOfLastFiles;
36 37
} SBlockNumber;

38
typedef struct SBlockIndex {
39 40 41
  int32_t     ordinalIndex;
  int64_t     inFileOffset;
  STimeWindow window;
42 43
} SBlockIndex;

H
Haojun Liao 已提交
44
typedef struct STableBlockScanInfo {
dengyihao's avatar
dengyihao 已提交
45 46
  uint64_t  uid;
  TSKEY     lastKey;
H
Hongze Cheng 已提交
47
  SMapData  mapData;            // block info (compressed)
48
  SArray*   pBlockList;         // block data index list, SArray<SBlockIndex>
H
Hongze Cheng 已提交
49 50 51 52 53 54
  SIterInfo iter;               // mem buffer skip list iterator
  SIterInfo iiter;              // imem buffer skip list iterator
  SArray*   delSkyline;         // delete info for this table
  int32_t   fileDelIndex;       // file block delete index
  int32_t   lastBlockDelIndex;  // delete index for last block
  bool      iterInit;           // whether to initialize the in-memory skip list iterator or not
H
Haojun Liao 已提交
55 56 57
} STableBlockScanInfo;

typedef struct SBlockOrderWrapper {
dengyihao's avatar
dengyihao 已提交
58
  int64_t uid;
59
  int64_t offset;
H
Haojun Liao 已提交
60
} SBlockOrderWrapper;
H
Hongze Cheng 已提交
61 62

typedef struct SBlockOrderSupporter {
63 64 65 66
  SBlockOrderWrapper** pDataBlockInfo;
  int32_t*             indexPerTable;
  int32_t*             numOfBlocksPerTable;
  int32_t              numOfTables;
H
Hongze Cheng 已提交
67 68 69
} SBlockOrderSupporter;

typedef struct SIOCostSummary {
70 71 72
  int64_t numOfBlocks;
  double  blockLoadTime;
  double  buildmemBlock;
73
  int64_t headFileLoad;
74
  double  headFileLoadTime;
75
  int64_t smaDataLoad;
76
  double  smaLoadTime;
77 78
  int64_t lastBlockLoad;
  double  lastBlockLoadTime;
H
Haojun Liao 已提交
79 80
  int64_t composedBlocks;
  double  buildComposedBlockTime;
H
Haojun Liao 已提交
81
  double  createScanInfoList;
H
Hongze Cheng 已提交
82 83 84
} SIOCostSummary;

typedef struct SBlockLoadSuppInfo {
85
  SArray*          pColAgg;
86
  SColumnDataAgg   tsColAgg;
C
Cary Xu 已提交
87
  SColumnDataAgg** plist;
88
  int16_t*         colIds;  // column ids for loading file block data
89
  int32_t          numOfCols;
90
  char**           buildBuf;  // build string tmp buffer, todo remove it later after all string format being updated.
H
Hongze Cheng 已提交
91 92
} SBlockLoadSuppInfo;

93
typedef struct SLastBlockReader {
H
Hongze Cheng 已提交
94 95 96 97 98
  STimeWindow        window;
  SVersionRange      verRange;
  int32_t            order;
  uint64_t           uid;
  SMergeTree         mergeTree;
99
  SSttBlockLoadInfo* pInfo;
100 101
} SLastBlockReader;

102
typedef struct SFilesetIter {
H
Hongze Cheng 已提交
103 104 105
  int32_t           numOfFiles;  // number of total files
  int32_t           index;       // current accessed index in the list
  SArray*           pFileList;   // data file list
106
  int32_t           order;
H
Hongze Cheng 已提交
107
  SLastBlockReader* pLastBlockReader;  // last file block reader
108
} SFilesetIter;
H
Haojun Liao 已提交
109 110

typedef struct SFileDataBlockInfo {
111
  // index position in STableBlockScanInfo in order to check whether neighbor block overlaps with it
dengyihao's avatar
dengyihao 已提交
112
  uint64_t uid;
113
  int32_t  tbBlockIdx;
H
Haojun Liao 已提交
114 115 116
} SFileDataBlockInfo;

typedef struct SDataBlockIter {
117
  int32_t   numOfBlocks;
118
  int32_t   index;
H
Hongze Cheng 已提交
119
  SArray*   blockList;  // SArray<SFileDataBlockInfo>
120
  int32_t   order;
121
  SDataBlk  block;  // current SDataBlk data
122
  SHashObj* pTableMap;
H
Haojun Liao 已提交
123 124 125
} SDataBlockIter;

typedef struct SFileBlockDumpInfo {
dengyihao's avatar
dengyihao 已提交
126 127 128 129
  int32_t totalRows;
  int32_t rowIndex;
  int64_t lastKey;
  bool    allDumped;
H
Haojun Liao 已提交
130 131
} SFileBlockDumpInfo;

132
typedef struct SUidOrderCheckInfo {
133 134
  uint64_t* tableUidList;  // access table uid list in uid ascending order list
  int32_t   currentIndex;  // index in table uid list
135 136
} SUidOrderCheckInfo;

H
Haojun Liao 已提交
137
typedef struct SReaderStatus {
138
  bool                 loadFromFile;       // check file stage
139
  bool                 composedDataBlock;  // the returned data block is a composed block or not
140
  SHashObj*            pTableMap;          // SHash<STableBlockScanInfo>
141
  STableBlockScanInfo** pTableIter;         // table iterator used in building in-memory buffer data blocks.
142
  SUidOrderCheckInfo   uidCheckInfo;       // check all table in uid order
143
  SFileBlockDumpInfo   fBlockDumpInfo;
144 145 146 147
  SDFileSet*           pCurrentFileset;  // current opened file set
  SBlockData           fileBlockData;
  SFilesetIter         fileIter;
  SDataBlockIter       blockIter;
H
Haojun Liao 已提交
148 149
} SReaderStatus;

150 151 152 153 154 155
// Bucketed backing storage for STableBlockScanInfo entries, so that the entries of many tables can be
// allocated in a few large chunks instead of one allocation per table.
typedef struct SBlockInfoBuf {
  int32_t  currentIndex;  // NOTE(review): not referenced in the visible part of this file -- confirm usage
  SArray*  pData;         // array of pointers; each element points to one bucket of STableBlockScanInfo entries
  int32_t  numPerBucket;  // number of STableBlockScanInfo entries held by a full bucket
} SBlockInfoBuf;

H
Hongze Cheng 已提交
156
struct STsdbReader {
H
Haojun Liao 已提交
157 158 159 160 161 162 163
  STsdb*             pTsdb;
  uint64_t           suid;
  int16_t            order;
  STimeWindow        window;  // the primary query time window that applies to all queries
  SSDataBlock*       pResBlock;
  int32_t            capacity;
  SReaderStatus      status;
164 165
  char*              idStr;  // query info handle, for debug purpose
  int32_t            type;   // query type: 1. retrieve all data blocks, 2. retrieve direct prev|next rows
H
Hongze Cheng 已提交
166
  SBlockLoadSuppInfo suppInfo;
H
Hongze Cheng 已提交
167
  STsdbReadSnap*     pReadSnap;
168
  SIOCostSummary     cost;
169 170
  STSchema*          pSchema;     // the newest version schema
  STSchema*          pMemSchema;  // the previous schema for in-memory data, to avoid load schema too many times
171 172
  SDataFReader*      pFileReader;
  SVersionRange      verRange;
173 174 175
  SBlockInfoBuf      blockInfoBuf;
  int32_t            step;
  STsdbReader*       innerReader[2];
H
Hongze Cheng 已提交
176
};
H
Hongze Cheng 已提交
177

H
Haojun Liao 已提交
178
static SFileDataBlockInfo* getCurrentBlockInfo(SDataBlockIter* pBlockIter);
179 180
static int      buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, int64_t endKey, int32_t capacity,
                                          STsdbReader* pReader);
181
static TSDBROW* getValidMemRow(SIterInfo* pIter, const SArray* pDelList, STsdbReader* pReader);
182 183
static int32_t  doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pScanInfo, STsdbReader* pReader,
                                        SRowMerger* pMerger);
H
Hongze Cheng 已提交
184
static int32_t  doMergeRowsInLastBlock(SLastBlockReader* pLastBlockReader, STableBlockScanInfo* pScanInfo, int64_t ts,
185
                                       SRowMerger* pMerger, SVersionRange* pVerRange);
186
static int32_t  doMergeRowsInBuf(SIterInfo* pIter, uint64_t uid, int64_t ts, SArray* pDelList, SRowMerger* pMerger,
dengyihao's avatar
dengyihao 已提交
187
                                 STsdbReader* pReader);
188
static int32_t  doAppendRowFromTSRow(SSDataBlock* pBlock, STsdbReader* pReader, STSRow* pTSRow, STableBlockScanInfo* pInfo);
189
static int32_t  doAppendRowFromFileBlock(SSDataBlock* pResBlock, STsdbReader* pReader, SBlockData* pBlockData,
H
Hongze Cheng 已提交
190
                                         int32_t rowIndex);
191
static void     setComposedBlockFlag(STsdbReader* pReader, bool composed);
192
static bool hasBeenDropped(const SArray* pDelList, int32_t* index, TSDBKEY* pKey, int32_t order, SVersionRange* pVerRange);
193

H
Hongze Cheng 已提交
194 195 196 197
static int32_t doMergeMemTableMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter, SArray* pDelList,
                                        STSRow** pTSRow, STsdbReader* pReader, bool* freeTSRow);
static int32_t doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo* pBlockScanInfo,
                                  STsdbReader* pReader, STSRow** pTSRow);
198 199
static int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pBlockScanInfo, int64_t key,
                                     STsdbReader* pReader);
200

dengyihao's avatar
dengyihao 已提交
201 202 203 204
static int32_t initDelSkylineIterator(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader, STbData* pMemTbData,
                                      STbData* piMemTbData);
static STsdb*  getTsdbByRetentions(SVnode* pVnode, TSKEY winSKey, SRetention* retentions, const char* idstr,
                                   int8_t* pLevel);
205
static SVersionRange getQueryVerRange(SVnode* pVnode, SQueryTableDataCond* pCond, int8_t level);
H
Hongze Cheng 已提交
206 207 208
static int64_t       getCurrentKeyInLastBlock(SLastBlockReader* pLastBlockReader);
static bool          hasDataInLastBlock(SLastBlockReader* pLastBlockReader);
static int32_t       doBuildDataBlock(STsdbReader* pReader);
H
Haojun Liao 已提交
209
static TSDBKEY       getCurrentKeyInBuf(STableBlockScanInfo* pScanInfo, STsdbReader* pReader);
210
static bool          hasDataInFileBlock(const SBlockData* pBlockData, const SFileBlockDumpInfo* pDumpInfo);
H
Haojun Liao 已提交
211

212 213
// Returns true when `ts` lies outside the closed range [pWindow->skey, pWindow->ekey].
static bool outOfTimeWindow(int64_t ts, STimeWindow* pWindow) {
  bool withinWindow = (ts >= pWindow->skey) && (ts <= pWindow->ekey);
  return !withinWindow;
}

214 215 216
/*
 * Record the column ids of `pBlock` in pReader->suppInfo and allocate one scratch buffer for every
 * variable-length column (used when building var-length values for the result block).
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY if any allocation fails. On failure every
 * buffer allocated here is released and colIds/buildBuf/numOfCols are reset, so that a later cleanup
 * of the reader can neither free the buffers a second time nor walk a released buildBuf array.
 */
static int32_t setColumnIdSlotList(STsdbReader* pReader, SSDataBlock* pBlock) {
  SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo;

  size_t numOfCols = blockDataGetNumOfCols(pBlock);

  pSupInfo->numOfCols = numOfCols;
  pSupInfo->colIds = taosMemoryMalloc(numOfCols * sizeof(int16_t));
  pSupInfo->buildBuf = taosMemoryCalloc(numOfCols, POINTER_BYTES);
  if (pSupInfo->buildBuf == NULL || pSupInfo->colIds == NULL) {
    goto _oom;
  }

  for (int32_t i = 0; i < numOfCols; ++i) {
    SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, i);
    pSupInfo->colIds[i] = pCol->info.colId;

    if (IS_VAR_DATA_TYPE(pCol->info.type)) {
      pSupInfo->buildBuf[i] = taosMemoryMalloc(pCol->info.bytes);
      if (pSupInfo->buildBuf[i] == NULL) {
        goto _oom;
      }
    }
  }

  return TSDB_CODE_SUCCESS;

_oom:
  if (pSupInfo->buildBuf != NULL) {
    // buildBuf was zero-initialized, so slots that were never filled are NULL and safe to free
    for (int32_t i = 0; i < numOfCols; ++i) {
      taosMemoryFree(pSupInfo->buildBuf[i]);
    }
  }

  taosMemoryFree(pSupInfo->colIds);
  taosMemoryFree(pSupInfo->buildBuf);
  pSupInfo->colIds = NULL;
  pSupInfo->buildBuf = NULL;
  pSupInfo->numOfCols = 0;
  return TSDB_CODE_OUT_OF_MEMORY;
}
H
Hongze Cheng 已提交
239

240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283
/*
 * Allocate the buckets needed to hold `numOfTables` STableBlockScanInfo entries.
 *
 * Full buckets hold pBuf->numPerBucket zero-initialized entries; one extra, smaller bucket is added
 * for the remainder. pBuf->pData is created on first use and appended to otherwise.
 *
 * Returns TSDB_CODE_SUCCESS or TSDB_CODE_OUT_OF_MEMORY. Buckets already pushed into pBuf->pData stay
 * owned by the buffer on failure; a bucket that could not be pushed is released here.
 */
static int32_t initBlockScanInfoBuf(SBlockInfoBuf* pBuf, int32_t numOfTables) {
  int32_t num = numOfTables / pBuf->numPerBucket;
  int32_t remainder = numOfTables % pBuf->numPerBucket;
  if (pBuf->pData == NULL) {
    pBuf->pData = taosArrayInit(num + 1, POINTER_BYTES);
    if (pBuf->pData == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }
  }

  for (int32_t i = 0; i < num; ++i) {
    char* p = taosMemoryCalloc(pBuf->numPerBucket, sizeof(STableBlockScanInfo));
    if (p == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }

    if (taosArrayPush(pBuf->pData, &p) == NULL) {
      taosMemoryFree(p);  // not tracked by the array, release it here to avoid a leak
      return TSDB_CODE_OUT_OF_MEMORY;
    }
  }

  if (remainder > 0) {
    char* p = taosMemoryCalloc(remainder, sizeof(STableBlockScanInfo));
    if (p == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }

    if (taosArrayPush(pBuf->pData, &p) == NULL) {
      taosMemoryFree(p);
      return TSDB_CODE_OUT_OF_MEMORY;
    }
  }

  return TSDB_CODE_SUCCESS;
}

/*
 * Release every bucket held by `pBuf` and the bucket array itself.
 *
 * pBuf->pData is reset afterwards: initBlockScanInfoBuf() decides whether to allocate the array by
 * testing pData against NULL, so a dangling pointer here would be reused after free.
 */
static void clearBlockScanInfoBuf(SBlockInfoBuf* pBuf) {
  size_t num = taosArrayGetSize(pBuf->pData);
  for (int32_t i = 0; i < num; ++i) {
    char** p = taosArrayGet(pBuf->pData, i);
    taosMemoryFree(*p);
  }

  pBuf->pData = taosArrayDestroy(pBuf->pData);
}

// Returns the address of the STableBlockScanInfo slot with the given global `index`:
// the bucket is index / numPerBucket, the position inside that bucket is index % numPerBucket.
static void* getPosInBlockInfoBuf(SBlockInfoBuf* pBuf, int32_t index) {
  int32_t bucketNo = index / pBuf->numPerBucket;
  int32_t slotNo = index % pBuf->numPerBucket;

  char* pBase = *(char**)taosArrayGet(pBuf->pData, bucketNo);
  return pBase + slotNo * sizeof(STableBlockScanInfo);
}

// NOTE: speedup the whole processing by preparing the buffer for STableBlockScanInfo in batch model
284
/*
 * Build the uid -> STableBlockScanInfo* map for the `numOfTables` tables listed in `idList`.
 *
 * The scan-info entries live in pTsdbReader->blockInfoBuf (allocated in batch); the hash map only
 * stores pointers to them. Each entry's lastKey starts one tick outside the query window on the side
 * the traversal starts from, clamped so that INT64_MIN/INT64_MAX do not wrap.
 *
 * Returns the map, or NULL on failure. When the batch buffer cannot be allocated, terrno is set to
 * the error code and the map is released; buckets already allocated remain in blockInfoBuf.
 */
static SHashObj* createDataBlockScanInfo(STsdbReader* pTsdbReader, const STableKeyInfo* idList, int32_t numOfTables) {
  // allocate buffer in order to load data blocks from file
  // todo use simple hash instead, optimize the memory consumption
  SHashObj* pTableMap =
      taosHashInit(numOfTables, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
  if (pTableMap == NULL) {
    return NULL;
  }

  int64_t st = taosGetTimestampUs();

  // without this check a failed allocation would be dereferenced by getPosInBlockInfoBuf() below
  int32_t code = initBlockScanInfoBuf(&pTsdbReader->blockInfoBuf, numOfTables);
  if (code != TSDB_CODE_SUCCESS) {
    terrno = code;
    taosHashCleanup(pTableMap);
    return NULL;
  }

  for (int32_t j = 0; j < numOfTables; ++j) {
    STableBlockScanInfo* pScanInfo = getPosInBlockInfoBuf(&pTsdbReader->blockInfoBuf, j);
    pScanInfo->uid = idList[j].uid;
    if (ASCENDING_TRAVERSE(pTsdbReader->order)) {
      int64_t skey = pTsdbReader->window.skey;
      pScanInfo->lastKey = (skey > INT64_MIN) ? (skey - 1) : skey;
    } else {
      int64_t ekey = pTsdbReader->window.ekey;
      pScanInfo->lastKey = (ekey < INT64_MAX) ? (ekey + 1) : ekey;
    }

    // the map value is the pointer to the entry, not a copy of the entry
    taosHashPut(pTableMap, &pScanInfo->uid, sizeof(uint64_t), &pScanInfo, POINTER_BYTES);

    tsdbTrace("%p check table uid:%" PRId64 " from lastKey:%" PRId64 " %s", pTsdbReader, pScanInfo->uid, pScanInfo->lastKey,
              pTsdbReader->idStr);
  }

  pTsdbReader->cost.createScanInfoList = (taosGetTimestampUs() - st) / 1000.0;
  tsdbDebug("%p create %d tables scan-info, size:%.2f Kb, elapsed time:%.2f ms, %s", pTsdbReader, numOfTables,
            (sizeof(STableBlockScanInfo) * numOfTables) / 1024.0, pTsdbReader->cost.createScanInfoList,
            pTsdbReader->idStr);

  return pTableMap;
}
H
Hongze Cheng 已提交
333

334 335
/*
 * Reset the scan state of every table in `pTableMap` so that scanning can restart from `ts`.
 *
 * Both in-memory iterators (mem and imem) are destroyed and marked as having no value, the delete
 * skyline is dropped, and lastKey is set to `ts`. Clearing iterInit marks the iterators as
 * needing initialization again, so the imem iterator must be released here as well or it leaks.
 */
static void resetAllDataBlockScanInfo(SHashObj* pTableMap, int64_t ts) {
  STableBlockScanInfo** p = NULL;
  while ((p = taosHashIterate(pTableMap, p)) != NULL) {
    STableBlockScanInfo* pInfo = *(STableBlockScanInfo**)p;

    pInfo->iterInit = false;
    pInfo->iter.hasVal = false;
    pInfo->iiter.hasVal = false;

    if (pInfo->iter.iter != NULL) {
      pInfo->iter.iter = tsdbTbDataIterDestroy(pInfo->iter.iter);
    }

    if (pInfo->iiter.iter != NULL) {
      pInfo->iiter.iter = tsdbTbDataIterDestroy(pInfo->iiter.iter);
    }

    pInfo->delSkyline = taosArrayDestroy(pInfo->delSkyline);
    pInfo->lastKey = ts;
  }
}

350 351 352
// Release everything owned by one table scan-info entry: both in-memory iterators, the delete
// skyline, the block index list and the block map data. The entry itself is not freed.
static void clearBlockScanInfo(STableBlockScanInfo* p) {
  SIterInfo* pMemIter = &p->iter;
  SIterInfo* pIMemIter = &p->iiter;

  p->iterInit = false;
  pIMemIter->hasVal = false;

  if (pMemIter->iter != NULL) {
    pMemIter->iter = tsdbTbDataIterDestroy(pMemIter->iter);
  }

  if (pIMemIter->iter != NULL) {
    pIMemIter->iter = tsdbTbDataIterDestroy(pIMemIter->iter);
  }

  p->delSkyline = taosArrayDestroy(p->delSkyline);
  p->pBlockList = taosArrayDestroy(p->pBlockList);
  tMapDataClear(&p->mapData);
}
366

367
// Clear every scan-info entry referenced by the map, then destroy the map itself.
// The map stores STableBlockScanInfo pointers, hence the extra dereference.
static void destroyAllBlockScanInfo(SHashObj* pTableMap) {
  for (void* pIter = taosHashIterate(pTableMap, NULL); pIter != NULL; pIter = taosHashIterate(pTableMap, pIter)) {
    STableBlockScanInfo* pScanInfo = *(STableBlockScanInfo**)pIter;
    clearBlockScanInfo(pScanInfo);
  }

  taosHashCleanup(pTableMap);
}

376
// A time window is empty when its end key precedes its start key.
static bool isEmptyQueryTimeWindow(STimeWindow* pWindow) {
  ASSERT(pWindow != NULL);

  bool isEmpty = (pWindow->ekey < pWindow->skey);
  return isEmpty;
}
H
Hongze Cheng 已提交
380

381 382 383
// Clip the start of the query window to the data retention (keep2) period, so that rows which have
// already expired are not returned to the client. The end of the window is left unchanged.
// Returns the adjusted copy; `pWindow` itself is not modified.
static STimeWindow updateQueryTimeWindow(STsdb* pTsdb, STimeWindow* pWindow) {
  STsdbKeepCfg* pKeepCfg = &pTsdb->keepCfg;

  int64_t now = taosGetTimestamp(pKeepCfg->precision);
  // oldest timestamp that is still retained; needs to add one tick
  int64_t earliestTs = now - (tsTickPerMin[pKeepCfg->precision] * pKeepCfg->keep2) + 1;

  STimeWindow adjusted = *pWindow;
  adjusted.skey = (adjusted.skey < earliestTs) ? earliestTs : adjusted.skey;
  return adjusted;
}
H
Hongze Cheng 已提交
396

H
Haojun Liao 已提交
397
/*
 * Shrink `*capacity` (number of rows) so that one output SSDataBlock stays below 2MB.
 *
 * The row length is the sum of the byte widths of all requested columns. The size computation is
 * done in 64 bits: capacity * rowLen can exceed INT32_MAX for large capacities or wide rows, and a
 * wrapped 32-bit product would skip the limit.
 */
static void limitOutputBufferSize(const SQueryTableDataCond* pCond, int32_t* capacity) {
  int64_t rowLen = 0;
  for (int32_t i = 0; i < pCond->numOfCols; ++i) {
    rowLen += pCond->colList[i].bytes;
  }

  // make sure the output SSDataBlock size be less than 2MB.
  const int64_t TWOMB = 2 * 1024 * 1024;
  if ((int64_t)(*capacity) * rowLen > TWOMB) {
    (*capacity) = (int32_t)(TWOMB / rowLen);
  }
}

// init file iterator
411
/*
 * Prepare `pIter` to walk the file sets in `aDFileSet` in the reader's order.
 *
 * The index starts one position before the first file to visit (-1 for ascending order, the number
 * of files for descending order). The last-block reader and its load info are created on first use
 * and reused afterwards; the reader's window, order and version range are copied into it.
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_OUT_OF_MEMORY, or terrno when the load info cannot be created.
 */
static int32_t initFilesetIterator(SFilesetIter* pIter, SArray* aDFileSet, STsdbReader* pReader) {
  size_t fileCount = taosArrayGetSize(aDFileSet);

  pIter->order = pReader->order;
  pIter->pFileList = aDFileSet;
  pIter->numOfFiles = fileCount;
  if (ASCENDING_TRAVERSE(pReader->order)) {
    pIter->index = -1;
  } else {
    pIter->index = fileCount;
  }

  if (pIter->pLastBlockReader == NULL) {
    pIter->pLastBlockReader = taosMemoryCalloc(1, sizeof(struct SLastBlockReader));
    if (pIter->pLastBlockReader == NULL) {
      int32_t code = TSDB_CODE_OUT_OF_MEMORY;
      tsdbError("failed to prepare the last block iterator, since:%s %s", tstrerror(code), pReader->idStr);
      return code;
    }
  }

  SLastBlockReader* pLastReader = pIter->pLastBlockReader;
  pLastReader->window = pReader->window;
  pLastReader->verRange = pReader->verRange;
  pLastReader->order = pReader->order;
  pLastReader->uid = 0;

  tMergeTreeClose(&pLastReader->mergeTree);

  if (pLastReader->pInfo != NULL) {
    tsdbDebug("init fileset iterator, total files:%d %s", pIter->numOfFiles, pReader->idStr);
    return TSDB_CODE_SUCCESS;
  }

  // here we ignore the first column, which is always be the primary timestamp column
  pLastReader->pInfo =
      tCreateLastBlockLoadInfo(pReader->pSchema, &pReader->suppInfo.colIds[1], pReader->suppInfo.numOfCols - 1);
  if (pLastReader->pInfo == NULL) {
    tsdbDebug("init fileset iterator failed, code:%s %s", tstrerror(terrno), pReader->idStr);
    return terrno;
  }

  tsdbDebug("init fileset iterator, total files:%d %s", pIter->numOfFiles, pReader->idStr);
  return TSDB_CODE_SUCCESS;
}

450
/*
 * Advance `pIter` to the next file set whose key range overlaps the reader's query window and open
 * its data file reader.
 *
 * Before moving on, the load statistics of the last-block reader are collected into pReader->cost
 * and the last-block reader is reset. File sets lying entirely before the window (in traversal
 * direction) are skipped; the first one lying entirely after it ends the iteration.
 *
 * Returns true when a qualified file set has been opened; false when the files are exhausted, no
 * remaining file can overlap the window, or opening a file reader fails.
 */
static bool filesetIteratorNext(SFilesetIter* pIter, STsdbReader* pReader) {
  const bool    asc = ASCENDING_TRAVERSE(pIter->order);
  const int32_t step = asc ? 1 : -1;

  pIter->index += step;
  if ((asc && pIter->index >= pIter->numOfFiles) || ((!asc) && pIter->index < 0)) {
    return false;
  }

  SLastBlockReader* pLastReader = pIter->pLastBlockReader;
  getLastBlockLoadInfo(pLastReader->pInfo, &pReader->cost.lastBlockLoad, &pReader->cost.lastBlockLoadTime);

  pLastReader->uid = 0;
  tMergeTreeClose(&pLastReader->mergeTree);
  resetLastBlockLoadInfo(pLastReader->pInfo);

  // check file the time range of coverage
  STimeWindow win = {0};

  for (;;) {
    if (pReader->pFileReader != NULL) {
      tsdbDataFReaderClose(&pReader->pFileReader);
    }

    pReader->status.pCurrentFileset = (SDFileSet*)taosArrayGet(pIter->pFileList, pIter->index);

    int32_t code = tsdbDataFReaderOpen(&pReader->pFileReader, pReader->pTsdb, pReader->status.pCurrentFileset);
    if (code != TSDB_CODE_SUCCESS) {
      return false;
    }

    pReader->cost.headFileLoad += 1;

    int32_t fid = pReader->status.pCurrentFileset->fid;
    tsdbFidKeyRange(fid, pReader->pTsdb->keepCfg.days, pReader->pTsdb->keepCfg.precision, &win.skey, &win.ekey);

    // current file are no longer overlapped with query time window, ignore remain files
    bool pastWindow = asc ? (win.skey > pReader->window.ekey) : (win.ekey < pReader->window.skey);
    if (pastWindow) {
      tsdbDebug("%p remain files are not qualified for qrange:%" PRId64 "-%" PRId64 ", ignore, %s", pReader,
                pReader->window.skey, pReader->window.ekey, pReader->idStr);
      return false;
    }

    bool notReachedYet = asc ? (win.ekey < pReader->window.skey) : (win.skey > pReader->window.ekey);
    if (!notReachedYet) {
      tsdbDebug("%p file found fid:%d for qrange:%" PRId64 "-%" PRId64 ", %s", pReader, fid, pReader->window.skey,
                pReader->window.ekey, pReader->idStr);
      return true;
    }

    // this file ends before the window starts (in traversal direction): try the next one
    pIter->index += step;
    if ((asc && pIter->index >= pIter->numOfFiles) || ((!asc) && pIter->index < 0)) {
      return false;
    }
  }
}

510
// Put the block iterator back to its initial state for the given traversal order: no blocks,
// index before the first element, and an empty (created on first use) block list.
static void resetDataBlockIterator(SDataBlockIter* pIter, int32_t order) {
  pIter->numOfBlocks = 0;
  pIter->index = -1;
  pIter->order = order;

  if (pIter->blockList != NULL) {
    taosArrayClear(pIter->blockList);
    return;
  }

  pIter->blockList = taosArrayInit(4, sizeof(SFileDataBlockInfo));
}

L
Liu Jicong 已提交
521
// Release the block list owned by the iterator. The blockList field itself is left untouched.
static void cleanupDataBlockIterator(SDataBlockIter* pIter) {
  SArray* pList = pIter->blockList;
  taosArrayDestroy(pList);
}
H
Haojun Liao 已提交
522

H
Haojun Liao 已提交
523
// Initial reader state: start in the file-loading stage with no in-memory table iterator.
static void initReaderStatus(SReaderStatus* pStatus) {
  pStatus->loadFromFile = true;
  pStatus->pTableIter = NULL;
}

528 529 530 531 532 533 534 535
/*
 * Create the result data block with one column per entry of pCond->colList and room for
 * `capacity` rows.
 *
 * Returns the block, or NULL with terrno set on failure. On failure the block is released with
 * blockDataDestroy() rather than a plain free of the header, so that the column list appended
 * above and any column buffers already allocated are released too.
 */
static SSDataBlock* createResBlock(SQueryTableDataCond* pCond, int32_t capacity) {
  SSDataBlock* pResBlock = createDataBlock();
  if (pResBlock == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  for (int32_t i = 0; i < pCond->numOfCols; ++i) {
    SColumnInfoData colInfo = {0, {0}};
    colInfo.info = pCond->colList[i];
    blockDataAppendColInfo(pResBlock, &colInfo);
  }

  int32_t code = blockDataEnsureCapacity(pResBlock, capacity);
  if (code != TSDB_CODE_SUCCESS) {
    terrno = code;
    blockDataDestroy(pResBlock);
    return NULL;
  }

  return pResBlock;
}

550 551
/*
 * Allocate and initialize a STsdbReader for the query described by `pCond`.
 *
 * The reader selects its tsdb through the vnode's retention settings, copies order/type/suid from
 * the condition, clips the query window to the retention period, limits the row capacity so that a
 * result block stays below 2MB, and creates the file block buffer, the result block and the column
 * id list.
 *
 * On success *ppReader receives the reader and TSDB_CODE_SUCCESS (0) is returned. On failure the
 * partially built reader is handed to tsdbReaderClose(), *ppReader is set to NULL and the error code
 * is returned.
 */
static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, STsdbReader** ppReader, int32_t capacity,
                                const char* idstr) {
  int32_t      code = 0;
  int8_t       level = 0;
  STsdbReader* pReader = (STsdbReader*)taosMemoryCalloc(1, sizeof(*pReader));
  if (pReader == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _end;
  }

  if (VND_IS_TSMA(pVnode)) {
    tsdbDebug("vgId:%d, tsma is selected to query", TD_VID(pVnode));
  }

  initReaderStatus(&pReader->status);

  pReader->pTsdb = getTsdbByRetentions(pVnode, pCond->twindows.skey, pVnode->config.tsdbCfg.retentions, idstr, &level);
  pReader->suid = pCond->suid;
  pReader->order = pCond->order;
  pReader->capacity = capacity;
  pReader->idStr = (idstr != NULL) ? strdup(idstr) : NULL;
  pReader->verRange = getQueryVerRange(pVnode, pCond, level);
  pReader->type = pCond->type;
  pReader->window = updateQueryTimeWindow(pReader->pTsdb, &pCond->twindows);
  pReader->blockInfoBuf.numPerBucket = 1000;  // 1000 tables per bucket
  ASSERT(pCond->numOfCols > 0);

  limitOutputBufferSize(pCond, &pReader->capacity);

  // allocate buffer in order to load data blocks from file
  SBlockLoadSuppInfo* pSup = &pReader->suppInfo;
  pSup->pColAgg = taosArrayInit(4, sizeof(SColumnDataAgg));
  pSup->plist = taosMemoryCalloc(pCond->numOfCols, POINTER_BYTES);
  if (pSup->pColAgg == NULL || pSup->plist == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _end;
  }

  pSup->tsColAgg.colId = PRIMARYKEY_TIMESTAMP_COL_ID;

  code = tBlockDataCreate(&pReader->status.fileBlockData);
  if (code != TSDB_CODE_SUCCESS) {
    terrno = code;
    goto _end;
  }

  pReader->pResBlock = createResBlock(pCond, pReader->capacity);
  if (pReader->pResBlock == NULL) {
    code = terrno;
    goto _end;
  }

  code = setColumnIdSlotList(pReader, pReader->pResBlock);
  if (code != TSDB_CODE_SUCCESS) {
    // setColumnIdSlotList() releases colIds/buildBuf itself when it fails; drop the references so
    // that the cleanup below cannot release them a second time or walk the released buildBuf array
    pSup->colIds = NULL;
    pSup->buildBuf = NULL;
    pSup->numOfCols = 0;
    terrno = code;
    goto _end;
  }

  *ppReader = pReader;
  return code;

_end:
  tsdbReaderClose(pReader);
  *ppReader = NULL;
  return code;
}
H
Hongze Cheng 已提交
612

H
Haojun Liao 已提交
613
/*
 * Read the block index of the current head file and append to `pIndexList` every SBlockIdx that
 * belongs to the reader's super table (suid) and to a table present in pReader->status.pTableMap.
 * The block list of each matched table is created on first use.
 *
 * Returns TSDB_CODE_SUCCESS, the error of tsdbReadBlockIdx(), or TSDB_CODE_OUT_OF_MEMORY when an
 * array cannot be created or extended. The temporary index array is released on every path.
 */
static int32_t doLoadBlockIndex(STsdbReader* pReader, SDataFReader* pFileReader, SArray* pIndexList) {
  SArray* aBlockIdx = taosArrayInit(8, sizeof(SBlockIdx));
  if (aBlockIdx == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  int64_t st = taosGetTimestampUs();
  int32_t code = tsdbReadBlockIdx(pFileReader, aBlockIdx);
  if (code != TSDB_CODE_SUCCESS) {
    goto _end;
  }

  size_t num = taosArrayGetSize(aBlockIdx);
  if (num == 0) {
    goto _end;  // nothing in this file; code is still TSDB_CODE_SUCCESS
  }

  int64_t et1 = taosGetTimestampUs();

  SBlockIdx* pBlockIdx = NULL;
  for (int32_t i = 0; i < num; ++i) {
    pBlockIdx = (SBlockIdx*)taosArrayGet(aBlockIdx, i);

    // super table uid check
    if (pBlockIdx->suid != pReader->suid) {
      continue;
    }

    // this block belongs to a table that is not queried.
    void* p = taosHashGet(pReader->status.pTableMap, &pBlockIdx->uid, sizeof(uint64_t));
    if (p == NULL) {
      continue;
    }

    STableBlockScanInfo* pScanInfo = *(STableBlockScanInfo**)p;
    if (pScanInfo->pBlockList == NULL) {
      pScanInfo->pBlockList = taosArrayInit(4, sizeof(SBlockIndex));
      if (pScanInfo->pBlockList == NULL) {
        code = TSDB_CODE_OUT_OF_MEMORY;
        goto _end;
      }
    }

    if (taosArrayPush(pIndexList, pBlockIdx) == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto _end;
    }
  }

  int64_t et2 = taosGetTimestampUs();
  tsdbDebug("load block index for %d tables completed, elapsed time:%.2f ms, set blockIdx:%.2f ms, size:%.2f Kb %s",
            (int32_t)num, (et1 - st) / 1000.0, (et2 - et1) / 1000.0, num * sizeof(SBlockIdx) / 1024.0, pReader->idStr);

  pReader->cost.headFileLoadTime += (et1 - st) / 1000.0;

_end:
  taosArrayDestroy(aBlockIdx);
  return code;
}
H
Hongze Cheng 已提交
663

664
// Drop the per-file block information (block map data and block index list) of every table in the
// map; called before the block information of a new file is loaded.
static void cleanupTableScanInfo(SHashObj* pTableMap) {
  STableBlockScanInfo** ppInfo = NULL;

  while ((ppInfo = taosHashIterate(pTableMap, ppInfo)) != NULL) {
    STableBlockScanInfo* pInfo = *ppInfo;

    // reset the index in last block when handing a new file
    tMapDataClear(&pInfo->mapData);
    taosArrayClear(pInfo->pBlockList);
  }
}

678
/*
 * For every table listed in `pIndexList`, read its data block map from the current file and record
 * in the table's pBlockList the blocks that overlap both the query time window and the query
 * version range. The number of qualified blocks and the number of stt (last) files of the file set
 * are returned through `pBlockNum`; the load cost is accumulated in pReader->cost.
 *
 * Returns TSDB_CODE_SUCCESS, the error reported by tsdbReadDataBlk(), or TSDB_CODE_OUT_OF_MEMORY
 * when a block list cannot be extended.
 */
static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, SBlockNumber* pBlockNum) {
  int32_t numOfQTable = 0;
  size_t  sizeInDisk = 0;
  size_t  numOfTables = taosArrayGetSize(pIndexList);

  int64_t st = taosGetTimestampUs();
  cleanupTableScanInfo(pReader->status.pTableMap);

  for (int32_t i = 0; i < numOfTables; ++i) {
    SBlockIdx* pBlockIdx = taosArrayGet(pIndexList, i);

    STableBlockScanInfo* pScanInfo =
        *(STableBlockScanInfo**)taosHashGet(pReader->status.pTableMap, &pBlockIdx->uid, sizeof(int64_t));

    tMapDataReset(&pScanInfo->mapData);

    // a failed read must not be treated as "this table has no blocks in the file"
    int32_t code = tsdbReadDataBlk(pReader->pFileReader, pBlockIdx, &pScanInfo->mapData);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    sizeInDisk += pScanInfo->mapData.nData;
    for (int32_t j = 0; j < pScanInfo->mapData.nItem; ++j) {
      SDataBlk block = {0};
      tMapDataGetItemByIdx(&pScanInfo->mapData, j, &block, tGetDataBlk);

      // 1. time range check
      if (block.minKey.ts > pReader->window.ekey || block.maxKey.ts < pReader->window.skey) {
        continue;
      }

      // 2. version range check
      if (block.minVer > pReader->verRange.maxVer || block.maxVer < pReader->verRange.minVer) {
        continue;
      }

      SBlockIndex bIndex = {.ordinalIndex = j, .inFileOffset = block.aSubBlock->offset};
      bIndex.window = (STimeWindow){.skey = block.minKey.ts, .ekey = block.maxKey.ts};

      void* p = taosArrayPush(pScanInfo->pBlockList, &bIndex);
      if (p == NULL) {
        tMapDataClear(&pScanInfo->mapData);
        return TSDB_CODE_OUT_OF_MEMORY;
      }

      pBlockNum->numOfBlocks += 1;
    }

    if (pScanInfo->pBlockList != NULL && taosArrayGetSize(pScanInfo->pBlockList) > 0) {
      numOfQTable += 1;
    }
  }

  pBlockNum->numOfLastFiles = pReader->pFileReader->pSet->nSttF;
  int32_t total = pBlockNum->numOfLastFiles + pBlockNum->numOfBlocks;

  double el = (taosGetTimestampUs() - st) / 1000.0;
  // numOfTables is a size_t: print it through an explicit 64-bit conversion instead of "%ld"
  tsdbDebug(
      "load block of %" PRId64 " tables completed, blocks:%d in %d tables, last-files:%d, block-info-size:%.2f Kb, elapsed "
      "time:%.2f ms %s",
      (int64_t)numOfTables, pBlockNum->numOfBlocks, numOfQTable, pBlockNum->numOfLastFiles, sizeInDisk / 1000.0, el,
      pReader->idStr);

  pReader->cost.numOfBlocks += total;
  pReader->cost.headFileLoadTime += el;

  return TSDB_CODE_SUCCESS;
}
H
Hongze Cheng 已提交
742

743
/**
 * Flag the current file block as fully consumed.
 *
 * @param pDumpInfo dump state of the file block being read
 * @param maxKey    last timestamp that has been handled
 * @param order     scan order; decides on which side of maxKey the next key lies
 *
 * lastKey is moved one unit past maxKey in the traversal direction.
 */
static void setBlockAllDumped(SFileBlockDumpInfo* pDumpInfo, int64_t maxKey, int32_t order) {
  pDumpInfo->allDumped = true;

  if (ASCENDING_TRAVERSE(order)) {
    pDumpInfo->lastKey = maxKey + 1;
  } else {
    pDumpInfo->lastKey = maxKey - 1;
  }
}

749 750
/**
 * Append one column value to the output column at the given row.
 *
 * @param pColInfoData destination column
 * @param rowIndex     destination row
 * @param colIndex     index into pSup->buildBuf selecting the scratch buffer for this column
 * @param pColVal      value to append
 * @param pSup         owner of the per-column scratch buffers
 *
 * Fixed-size values are appended directly from pColVal->value. Variable-length
 * values are first packed (length header + payload) into the scratch buffer.
 */
static void doCopyColVal(SColumnInfoData* pColInfoData, int32_t rowIndex, int32_t colIndex, SColVal* pColVal,
                         SBlockLoadSuppInfo* pSup) {
  if (!IS_VAR_DATA_TYPE(pColVal->type)) {
    colDataAppend(pColInfoData, rowIndex, (const char*)&pColVal->value, !COL_VAL_IS_VALUE(pColVal));
    return;
  }

  if (!COL_VAL_IS_VALUE(pColVal)) {
    colDataAppendNULL(pColInfoData, rowIndex);
    return;
  }

  // build the var-data representation in the scratch buffer, then append it
  varDataSetLen(pSup->buildBuf[colIndex], pColVal->value.nData);
  ASSERT(pColVal->value.nData <= pColInfoData->info.bytes);
  memcpy(varDataVal(pSup->buildBuf[colIndex]), pColVal->value.pData, pColVal->value.nData);
  colDataAppend(pColInfoData, rowIndex, pSup->buildBuf[colIndex], false);
}

765
/**
 * Return the block-list entry the iterator currently points at.
 *
 * @return the entry at pBlockIter->index, or NULL when the block list is empty
 */
static SFileDataBlockInfo* getCurrentBlockInfo(SDataBlockIter* pBlockIter) {
  size_t numOfListed = taosArrayGetSize(pBlockIter->blockList);
  if (numOfListed != 0) {
    return taosArrayGet(pBlockIter->blockList, pBlockIter->index);
  }

  // an empty list must agree with the recorded block count
  ASSERT(pBlockIter->numOfBlocks == numOfListed);
  return NULL;
}

H
Hongze Cheng 已提交
775
/**
 * Return the address of the block descriptor cached inside the iterator.
 */
static SDataBlk* getCurrentBlock(SDataBlockIter* pBlockIter) {
  SDataBlk* pCached = &pBlockIter->block;
  return pCached;
}
776

H
Haojun Liao 已提交
777 778 779 780 781 782 783 784 785 786 787 788 789 790 791 792 793 794 795 796 797 798 799 800 801 802 803 804 805 806 807 808 809 810 811 812 813 814 815 816 817 818 819 820 821 822 823 824 825 826 827 828 829 830 831 832 833 834 835 836 837 838 839 840 841 842 843 844 845 846 847 848 849
/**
 * Binary search for `key` in a list of timestamps.
 *
 * @param pValue buffer interpreted as an array of TSKEY
 * @param num    number of entries in the array
 * @param key    timestamp to look for
 * @param order  TSDB_ORDER_ASC or TSDB_ORDER_DESC (asserted)
 * @return index of the located entry, or -1 when the key lies beyond the last
 *         inspected entry and no following entry exists
 *
 * NOTE(review): presumably the list is sorted ascending for TSDB_ORDER_ASC and
 * descending for TSDB_ORDER_DESC; confirm against callers.
 */
int32_t binarySearchForTs(char* pValue, int num, TSKEY key, int order) {
  ASSERT(order == TSDB_ORDER_ASC || order == TSDB_ORDER_DESC);

  const bool desc = (order == TSDB_ORDER_DESC);
  TSKEY*     tsList = (TSKEY*)pValue;
  int32_t    lo = 0;
  int32_t    hi = num - 1;

  for (;;) {
    // the key is at or before the first candidate in scan direction
    const bool hitFirst = desc ? (key >= tsList[lo]) : (key <= tsList[lo]);
    if (hitFirst) {
      return lo;
    }

    if (key == tsList[hi]) {
      return hi;
    }

    // the key is past the last candidate: answer is the following entry, if any
    const bool pastLast = desc ? (key < tsList[hi]) : (key > tsList[hi]);
    if (pastLast) {
      return (hi + 1 >= num) ? -1 : (hi + 1);
    }

    const int32_t mid = ((hi - lo + 1) >> 1) + lo;
    if (key == tsList[mid]) {
      return mid;
    }

    const bool keyAfterMid = desc ? (key < tsList[mid]) : (key > tsList[mid]);
    if (keyAfterMid) {
      lo = mid + 1;
    } else {
      hi = mid - 1;
    }
  }
}

/**
 * Binary search inside keyList, anchored at position `pos`.
 *
 * TSDB_ORDER_ASC searches [pos, num - 1] and returns -1 when key < keyList[pos];
 * otherwise it narrows to the last index whose key is not greater than `key`.
 * Any other order searches [0, pos] and returns -1 when key > keyList[pos];
 * otherwise it narrows to the first index whose key is not less than `key`.
 *
 * NOTE(review): assumes keyList is sorted in ascending order; confirm with callers.
 *
 * @param keyList timestamp array
 * @param num     number of entries, must be > 0
 * @param pos     anchor index, must satisfy 0 <= pos < num
 * @param key     timestamp searched for
 * @param order   TSDB_ORDER_ASC, anything else is handled as descending
 */
static int doBinarySearchKey(TSKEY* keyList, int num, int pos, TSKEY key, int order) {
  assert(pos >= 0 && pos < num);
  assert(num > 0);

  int from = pos;

  if (order == TSDB_ORDER_ASC) {
    int to = num - 1;
    if (key < keyList[pos]) {
      return -1;
    }

    for (;;) {
      if (key >= keyList[to]) return to;
      if (key <= keyList[from]) return from;
      if (to - from <= 1) return from;

      // upper middle of [from, to]
      int mid = from + (to - from + 1) / 2;
      if (keyList[mid] > key) {
        to = mid;
      } else if (keyList[mid] < key) {
        from = mid;
      } else {
        return mid;
      }
    }
  }

  // descending: the search range grows towards index 0
  int to = 0;
  if (key > keyList[pos]) {
    return -1;
  }

  for (;;) {
    if (key <= keyList[to]) return to;
    if (key >= keyList[from]) return from;
    if (from - to <= 1) return from;

    // lower middle of [to, from]
    int mid = from - (from - to + 1) / 2;
    if (keyList[mid] < key) {
      to = mid;
    } else if (keyList[mid] > key) {
      from = mid;
    } else {
      return mid;
    }
  }
}

/**
 * Locate the last row of the block that still belongs to the query window,
 * seen in scan direction.
 *
 * @param pReader    reader providing scan order and query window
 * @param pBlockData loaded block data (timestamp array)
 * @param pBlock     block descriptor (row count, min/max key)
 * @param pos        anchor row handed to doBinarySearchKey
 * @return row index of the end position, or whatever doBinarySearchKey reports
 *         (including -1) when the window boundary falls inside the block
 */
int32_t getEndPosInDataBlock(STsdbReader* pReader, SBlockData* pBlockData, SDataBlk* pBlock, int32_t pos) {
  if (ASCENDING_TRAVERSE(pReader->order)) {
    // the window reaches beyond the block: every remaining row qualifies
    if (pReader->window.ekey >= pBlock->maxKey.ts) {
      return pBlock->nRow - 1;
    }
    return doBinarySearchKey(pBlockData->aTSKEY, pBlock->nRow, pos, pReader->window.ekey, pReader->order);
  }

  if (pReader->window.skey <= pBlock->minKey.ts) {
    return 0;
  }
  return doBinarySearchKey(pBlockData->aTSKEY, pBlock->nRow, pos, pReader->window.skey, pReader->order);
}

911
/**
 * Copy rows of the currently loaded file block into the reader's result block.
 *
 * Steps:
 *   1. when the dump cursor sits on the first row in scan direction and the query
 *      window starts inside the block, move the cursor to the first qualifying row;
 *   2. find the end row within the window and clamp the row count to pReader->capacity;
 *   3. copy the primary timestamp column if it is the first output column, then walk
 *      output columns and block columns in parallel, filling absent columns with NULL;
 *   4. advance the dump cursor and mark the block as fully dumped when no in-window
 *      row is left.
 *
 * @param pReader        reader owning the result block, block data and dump state
 * @param pBlockScanInfo not referenced by this function
 * @return TSDB_CODE_SUCCESS
 */
static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo) {
  SDataBlockIter*     pBlockIter = &pReader->status.blockIter;
  SBlockData*         pBlockData = &pReader->status.fileBlockData;
  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
  SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo;
  SSDataBlock*        pResBlock = pReader->pResBlock;

  SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(pBlockIter);
  SDataBlk*           pBlock = getCurrentBlock(pBlockIter);
  int32_t             numOfOutputCols = blockDataGetNumOfCols(pResBlock);

  SColVal colVal = {0};
  int64_t startUs = taosGetTimestampUs();
  bool    asc = ASCENDING_TRAVERSE(pReader->order);
  int32_t step = asc ? 1 : -1;

  // 1. position the cursor on the first row inside the query window
  bool atFirstRow = asc ? (pDumpInfo->rowIndex == 0) : (pDumpInfo->rowIndex == pBlock->nRow - 1);
  if (atFirstRow) {
    bool windowCoversStart =
        asc ? (pReader->window.skey <= pBlock->minKey.ts) : (pReader->window.ekey >= pBlock->maxKey.ts);
    if (!windowCoversStart) {
      // search from the far end of the block, in the opposite direction of the scan
      if (asc) {
        pDumpInfo->rowIndex = doBinarySearchKey(pBlockData->aTSKEY, pBlock->nRow, pBlock->nRow - 1,
                                                pReader->window.skey, TSDB_ORDER_DESC);
      } else {
        pDumpInfo->rowIndex =
            doBinarySearchKey(pBlockData->aTSKEY, pBlock->nRow, 0, pReader->window.ekey, TSDB_ORDER_ASC);
      }
    }
  }

  // 2. determine how many rows are to be copied
  int32_t endIndex = getEndPosInDataBlock(pReader, pBlockData, pBlock, pDumpInfo->rowIndex);
  if (endIndex == -1) {
    setBlockAllDumped(pDumpInfo, pReader->window.ekey, pReader->order);
    return TSDB_CODE_SUCCESS;
  }

  endIndex += step;

  int32_t numToCopy = asc ? (endIndex - pDumpInfo->rowIndex) : (pDumpInfo->rowIndex - endIndex);
  if (numToCopy > pReader->capacity) {
    // never exceed the output buffer
    numToCopy = pReader->capacity;
  }

  // 3a. primary timestamp column
  int32_t dstRow = 0;
  int32_t outCol = 0;

  SColumnInfoData* pColData = taosArrayGet(pResBlock->pDataBlock, outCol);
  if (pColData->info.colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
    if (asc) {
      memcpy(pColData->pData, &pBlockData->aTSKEY[pDumpInfo->rowIndex], numToCopy * sizeof(int64_t));
    } else {
      int32_t srcRow = pDumpInfo->rowIndex;
      while (dstRow < numToCopy) {
        colDataAppendInt64(pColData, dstRow++, &pBlockData->aTSKEY[srcRow]);
        srcRow += step;
      }
    }

    outCol += 1;
  }

  // 3b. remaining columns: merge-walk output columns against the block's columns
  int32_t srcCol = 0;
  int32_t numOfSrcCols = taosArrayGetSize(pBlockData->aIdx);
  while (outCol < numOfOutputCols && srcCol < numOfSrcCols) {
    dstRow = 0;
    pColData = taosArrayGet(pResBlock->pDataBlock, outCol);

    SColData* pData = tBlockDataGetColDataByIdx(pBlockData, srcCol);

    if (pData->cid < pColData->info.colId) {
      // block column is not requested, skip it
      srcCol += 1;
      continue;
    }

    if (pData->cid != pColData->info.colId) {
      // requested column is absent from the file block, emit NULLs
      colDataAppendNNULL(pColData, 0, numToCopy);
      outCol += 1;
      continue;
    }

    bool noValueAtAll =
        pData->flag == HAS_NONE || pData->flag == HAS_NULL || pData->flag == (HAS_NULL | HAS_NONE);
    if (noValueAtAll) {
      colDataAppendNNULL(pColData, 0, numToCopy);
    } else if (IS_NUMERIC_TYPE(pColData->info.type) && asc) {
      // fixed-width values in ascending scan can be copied in one shot
      uint8_t* pSrc = pData->pData + tDataTypes[pData->type].bytes * pDumpInfo->rowIndex;
      memcpy(pColData->pData, pSrc, numToCopy * tDataTypes[pData->type].bytes);

      // the destination buffer is expected to start on an 8-byte boundary
      ASSERT((((uint64_t)pColData->pData) & (0x8 - 1)) == 0);

      // some rows carry no value: transfer the null flags row by row
      if (pData->flag != HAS_VALUE) {
        for (int32_t srcRow = pDumpInfo->rowIndex; dstRow < numToCopy; srcRow += step, dstRow++) {
          uint8_t bits = tColDataGetBitValue(pData, srcRow);
          if (bits == 0 || bits == 1) {
            colDataSetNull_f(pColData->nullbitmap, dstRow);
          }
        }
      }
    } else {
      for (int32_t srcRow = pDumpInfo->rowIndex; dstRow < numToCopy; srcRow += step) {
        tColDataGetValue(pData, srcRow, &colVal);
        doCopyColVal(pColData, dstRow++, outCol, &colVal, pSupInfo);
      }
    }

    srcCol += 1;
    outCol += 1;
  }

  // 3c. output columns left without a counterpart in the block are all NULL
  for (; outCol < numOfOutputCols; outCol += 1) {
    pColData = taosArrayGet(pResBlock->pDataBlock, outCol);
    colDataAppendNNULL(pColData, 0, numToCopy);
  }

  pResBlock->info.rows = numToCopy;
  pDumpInfo->rowIndex += step * numToCopy;

  // 4. decide whether the block has been consumed completely
  if (pDumpInfo->rowIndex >= 0 && pDumpInfo->rowIndex < pBlock->nRow) {
    int64_t nextTs = pBlockData->aTSKEY[pDumpInfo->rowIndex];
    if (outOfTimeWindow(nextTs, &pReader->window)) {
      // the rest of the block lies outside the query window
      setBlockAllDumped(pDumpInfo, nextTs, pReader->order);
    }
  } else {
    int64_t boundaryTs = asc ? pBlock->maxKey.ts : pBlock->minKey.ts;
    setBlockAllDumped(pDumpInfo, boundaryTs, pReader->order);
  }

  double elapsedTime = (taosGetTimestampUs() - startUs) / 1000.0;
  pReader->cost.blockLoadTime += elapsedTime;

  int32_t unDumpedRows = asc ? pBlock->nRow - pDumpInfo->rowIndex : pDumpInfo->rowIndex + 1;
  tsdbDebug("%p copy file block to sdatablock, global index:%d, table index:%d, brange:%" PRId64 "-%" PRId64
            ", rows:%d, remain:%d, minVer:%" PRId64 ", maxVer:%" PRId64 ", elapsed time:%.2f ms, %s",
            pReader, pBlockIter->index, pBlockInfo->tbBlockIdx, pBlock->minKey.ts, pBlock->maxKey.ts, numToCopy,
            unDumpedRows, pBlock->minVer, pBlock->maxVer, elapsedTime, pReader->idStr);

  return TSDB_CODE_SUCCESS;
}

1049 1050
/**
 * Read the iterator's current file block into pBlockData.
 *
 * pBlockData is reset and re-initialised for table `uid` with the reader's
 * column ids starting at index 1 of suppInfo.colIds. On success the elapsed
 * time is added to pReader->cost.blockLoadTime and the dump state is reset
 * to "not all dumped".
 *
 * @return TSDB_CODE_SUCCESS, or the code reported by tBlockDataInit / tsdbReadDataBlock
 */
static int32_t doLoadFileBlockData(STsdbReader* pReader, SDataBlockIter* pBlockIter, SBlockData* pBlockData,
                                   uint64_t uid) {
  int64_t startUs = taosGetTimestampUs();

  tBlockDataReset(pBlockData);

  TABLEID tableId = {.suid = pReader->suid, .uid = uid};
  int32_t code = tBlockDataInit(pBlockData, &tableId, pReader->pSchema, &pReader->suppInfo.colIds[1],
                                pReader->suppInfo.numOfCols - 1);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(pBlockIter);
  ASSERT(pBlockInfo != NULL);

  SDataBlk* pBlock = getCurrentBlock(pBlockIter);

  code = tsdbReadDataBlock(pReader->pFileReader, pBlock, pBlockData);
  if (code != TSDB_CODE_SUCCESS) {
    tsdbError("%p error occurs in loading file block, global index:%d, table index:%d, brange:%" PRId64 "-%" PRId64
              ", rows:%d, code:%s %s",
              pReader, pBlockIter->index, pBlockInfo->tbBlockIdx, pBlock->minKey.ts, pBlock->maxKey.ts, pBlock->nRow,
              tstrerror(code), pReader->idStr);
    return code;
  }

  double elapsedTime = (taosGetTimestampUs() - startUs) / 1000.0;

  tsdbDebug("%p load file block into buffer, global index:%d, index in table block list:%d, brange:%" PRId64 "-%" PRId64
            ", rows:%d, minVer:%" PRId64 ", maxVer:%" PRId64 ", elapsed time:%.2f ms, %s",
            pReader, pBlockIter->index, pBlockInfo->tbBlockIdx, pBlock->minKey.ts, pBlock->maxKey.ts, pBlock->nRow,
            pBlock->minVer, pBlock->maxVer, elapsedTime, pReader->idStr);

  pReader->cost.blockLoadTime += elapsedTime;

  // a freshly loaded block has not been dumped yet
  pReader->status.fBlockDumpInfo.allDumped = false;

  return TSDB_CODE_SUCCESS;
}
H
Hongze Cheng 已提交
1087

H
Haojun Liao 已提交
1088 1089 1090
/**
 * Release every buffer owned by the block order supporter: the per-table
 * block wrapper arrays, the array holding them, and the two counter arrays.
 */
static void cleanupBlockOrderSupporter(SBlockOrderSupporter* pSup) {
  taosMemoryFreeClear(pSup->numOfBlocksPerTable);
  taosMemoryFreeClear(pSup->indexPerTable);

  // per-table wrapper arrays first, then the array of pointers itself
  int32_t tableIdx = 0;
  while (tableIdx < pSup->numOfTables) {
    taosMemoryFreeClear(pSup->pDataBlockInfo[tableIdx]);
    tableIdx += 1;
  }

  taosMemoryFreeClear(pSup->pDataBlockInfo);
}
H
Hongze Cheng 已提交
1099

H
Haojun Liao 已提交
1100 1101
/**
 * Allocate the zero-initialised per-table arrays of the block order supporter.
 *
 * @param pSup        supporter to initialise
 * @param numOfTables number of slots to allocate, must be >= 1
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY after releasing whatever
 *         had been allocated
 */
static int32_t initBlockOrderSupporter(SBlockOrderSupporter* pSup, int32_t numOfTables) {
  ASSERT(numOfTables >= 1);

  pSup->numOfBlocksPerTable = taosMemoryCalloc(1, sizeof(int32_t) * numOfTables);
  pSup->indexPerTable = taosMemoryCalloc(1, sizeof(int32_t) * numOfTables);
  pSup->pDataBlockInfo = taosMemoryCalloc(1, POINTER_BYTES * numOfTables);

  bool allAllocated =
      (pSup->numOfBlocksPerTable != NULL) && (pSup->indexPerTable != NULL) && (pSup->pDataBlockInfo != NULL);
  if (allAllocated) {
    return TSDB_CODE_SUCCESS;
  }

  cleanupBlockOrderSupporter(pSup);
  return TSDB_CODE_OUT_OF_MEMORY;
}
H
Hongze Cheng 已提交
1114

H
Haojun Liao 已提交
1115
static int32_t fileDataBlockOrderCompar(const void* pLeft, const void* pRight, void* param) {
1116
  int32_t leftIndex = *(int32_t*)pLeft;
H
Haojun Liao 已提交
1117
  int32_t rightIndex = *(int32_t*)pRight;
H
Hongze Cheng 已提交
1118

H
Haojun Liao 已提交
1119
  SBlockOrderSupporter* pSupporter = (SBlockOrderSupporter*)param;
H
Hongze Cheng 已提交
1120

H
Haojun Liao 已提交
1121 1122
  int32_t leftTableBlockIndex = pSupporter->indexPerTable[leftIndex];
  int32_t rightTableBlockIndex = pSupporter->indexPerTable[rightIndex];
H
Hongze Cheng 已提交
1123

H
Haojun Liao 已提交
1124 1125 1126 1127 1128 1129 1130
  if (leftTableBlockIndex > pSupporter->numOfBlocksPerTable[leftIndex]) {
    /* left block is empty */
    return 1;
  } else if (rightTableBlockIndex > pSupporter->numOfBlocksPerTable[rightIndex]) {
    /* right block is empty */
    return -1;
  }
H
Hongze Cheng 已提交
1131

1132
  SBlockOrderWrapper* pLeftBlock = &pSupporter->pDataBlockInfo[leftIndex][leftTableBlockIndex];
H
Haojun Liao 已提交
1133
  SBlockOrderWrapper* pRightBlock = &pSupporter->pDataBlockInfo[rightIndex][rightTableBlockIndex];
H
Hongze Cheng 已提交
1134

1135 1136 1137
  return pLeftBlock->offset > pRightBlock->offset ? 1 : -1;
}

H
Haojun Liao 已提交
1138
/**
 * Decode the block descriptor of the iterator's current entry into pBlockIter->block.
 *
 * @param pBlockIter block iterator
 * @param idStr      identifier appended to the error log
 * @return TSDB_CODE_SUCCESS (also when the iterator has no current entry), or
 *         TSDB_CODE_INVALID_PARA when the entry's uid is not found in the table map
 */
static int32_t doSetCurrentBlock(SDataBlockIter* pBlockIter, const char* idStr) {
  SFileDataBlockInfo* pCurrent = getCurrentBlockInfo(pBlockIter);
  if (pCurrent == NULL) {
    return TSDB_CODE_SUCCESS;
  }

  STableBlockScanInfo** ppScanInfo = taosHashGet(pBlockIter->pTableMap, &pCurrent->uid, sizeof(pCurrent->uid));
  if (ppScanInfo == NULL) {
    tsdbError("failed to locate the uid:%" PRIu64 " in query table uid list, %s", pCurrent->uid, idStr);
    return TSDB_CODE_INVALID_PARA;
  }

  // translate the per-table block position into the position inside the map data
  STableBlockScanInfo* pScanInfo = *ppScanInfo;
  SBlockIndex*         pIndex = taosArrayGet(pScanInfo->pBlockList, pCurrent->tbBlockIdx);
  tMapDataGetItemByIdx(&pScanInfo->mapData, pIndex->ordinalIndex, &pBlockIter->block, tGetDataBlk);

  return TSDB_CODE_SUCCESS;
}
H
Hongze Cheng 已提交
1157

1158
/**
 * Build the list of file blocks to visit, ordered by their offset in the data file.
 *
 * Every table in pReader->status.pTableMap that has a non-empty block list
 * contributes its blocks. With a single contributing table the blocks are taken
 * as they are; otherwise the per-table lists are merged through a multiway merge
 * tree keyed on the in-file offset. The iterator is then positioned on the first
 * block (ascending scan) or the last block (descending scan).
 *
 * @param pReader     reader providing the table map, scan order and log id
 * @param pBlockIter  iterator to (re)initialise
 * @param numOfBlocks total number of blocks over all tables
 * @return TSDB_CODE_SUCCESS; TSDB_CODE_TDB_OUT_OF_MEMORY or the code from
 *         initBlockOrderSupporter on allocation failure; the code from
 *         doSetCurrentBlock when the current block cannot be resolved
 */
static int32_t initBlockIterator(STsdbReader* pReader, SDataBlockIter* pBlockIter, int32_t numOfBlocks) {
  bool asc = ASCENDING_TRAVERSE(pReader->order);

  SBlockOrderSupporter sup = {0};
  pBlockIter->numOfBlocks = numOfBlocks;
  taosArrayClear(pBlockIter->blockList);
  pBlockIter->pTableMap = pReader->status.pTableMap;

  // access data blocks according to the offset of each block in asc/desc order.
  int32_t numOfTables = (int32_t)taosHashGetSize(pReader->status.pTableMap);

  int64_t st = taosGetTimestampUs();
  int32_t code = initBlockOrderSupporter(&sup, numOfTables);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  // collect (uid, offset) of every block, grouped per table
  int32_t cnt = 0;
  void*   ptr = NULL;
  while (1) {
    ptr = taosHashIterate(pReader->status.pTableMap, ptr);
    if (ptr == NULL) {
      break;
    }

    STableBlockScanInfo* pTableScanInfo = *(STableBlockScanInfo**)ptr;
    if (pTableScanInfo->pBlockList == NULL || taosArrayGetSize(pTableScanInfo->pBlockList) == 0) {
      continue;
    }

    size_t num = taosArrayGetSize(pTableScanInfo->pBlockList);
    sup.numOfBlocksPerTable[sup.numOfTables] = (int32_t)num;

    char* buf = taosMemoryMalloc(sizeof(SBlockOrderWrapper) * num);
    if (buf == NULL) {
      // NOTE(review): this leaves the hash iteration early; confirm whether the
      // iterator has to be cancelled explicitly before returning.
      cleanupBlockOrderSupporter(&sup);
      return TSDB_CODE_TDB_OUT_OF_MEMORY;
    }

    sup.pDataBlockInfo[sup.numOfTables] = (SBlockOrderWrapper*)buf;

    for (size_t k = 0; k < num; ++k) {
      SBlockIndex* pIndex = taosArrayGet(pTableScanInfo->pBlockList, k);
      sup.pDataBlockInfo[sup.numOfTables][k] =
          (SBlockOrderWrapper){.uid = pTableScanInfo->uid, .offset = pIndex->inFileOffset};
      cnt++;
    }

    sup.numOfTables += 1;
  }

  ASSERT(numOfBlocks == cnt);

  // since there is only one table qualified, blocks are not sorted
  if (sup.numOfTables == 1) {
    for (int32_t i = 0; i < numOfBlocks; ++i) {
      SFileDataBlockInfo blockInfo = {.uid = sup.pDataBlockInfo[0][i].uid, .tbBlockIdx = i};
      if (taosArrayPush(pBlockIter->blockList, &blockInfo) == NULL) {
        cleanupBlockOrderSupporter(&sup);
        return TSDB_CODE_TDB_OUT_OF_MEMORY;
      }
    }

    int64_t et = taosGetTimestampUs();
    tsdbDebug("%p create blocks info struct completed for one table, %d blocks not sorted, elapsed time:%.2f ms %s",
              pReader, numOfBlocks, (et - st) / 1000.0, pReader->idStr);

    pBlockIter->index = asc ? 0 : (numOfBlocks - 1);
    cleanupBlockOrderSupporter(&sup);
    return doSetCurrentBlock(pBlockIter, pReader->idStr);
  }

  tsdbDebug("%p create data blocks info struct completed, %d blocks in %d tables %s", pReader, cnt, sup.numOfTables,
            pReader->idStr);

  ASSERT(cnt <= numOfBlocks && sup.numOfTables <= numOfTables);

  // keep the full status code: narrowing it to 8 bits could turn an error into 0
  SMultiwayMergeTreeInfo* pTree = NULL;
  int32_t                 ret = tMergeTreeCreate(&pTree, sup.numOfTables, &sup, fileDataBlockOrderCompar);
  if (ret != TSDB_CODE_SUCCESS) {
    cleanupBlockOrderSupporter(&sup);
    return TSDB_CODE_TDB_OUT_OF_MEMORY;
  }

  int32_t numOfTotal = 0;
  while (numOfTotal < cnt) {
    int32_t pos = tMergeTreeGetChosenIndex(pTree);
    int32_t index = sup.indexPerTable[pos]++;

    SFileDataBlockInfo blockInfo = {.uid = sup.pDataBlockInfo[pos][index].uid, .tbBlockIdx = index};
    if (taosArrayPush(pBlockIter->blockList, &blockInfo) == NULL) {
      cleanupBlockOrderSupporter(&sup);
      taosMemoryFree(pTree);
      return TSDB_CODE_TDB_OUT_OF_MEMORY;
    }

    // set data block index overflow, in order to disable the offset comparator
    if (sup.indexPerTable[pos] >= sup.numOfBlocksPerTable[pos]) {
      sup.indexPerTable[pos] = sup.numOfBlocksPerTable[pos] + 1;
    }

    numOfTotal += 1;
    tMergeTreeAdjust(pTree, tMergeTreeGetAdjustIndex(pTree));
  }

  int64_t et = taosGetTimestampUs();
  tsdbDebug("%p %d data blocks access order completed, elapsed time:%.2f ms %s", pReader, numOfBlocks,
            (et - st) / 1000.0, pReader->idStr);
  cleanupBlockOrderSupporter(&sup);
  taosMemoryFree(pTree);

  pBlockIter->index = asc ? 0 : (numOfBlocks - 1);
  return doSetCurrentBlock(pBlockIter, pReader->idStr);
}
H
Hongze Cheng 已提交
1268

H
Haojun Liao 已提交
1269
/**
 * Move the iterator to the next block in scan direction and load its descriptor.
 *
 * @return false when the iterator already sits on the last block of the scan
 *         (nothing is changed in that case), true otherwise
 */
static bool blockIteratorNext(SDataBlockIter* pBlockIter, const char* idStr) {
  if (ASCENDING_TRAVERSE(pBlockIter->order)) {
    if (pBlockIter->index >= pBlockIter->numOfBlocks - 1) {
      return false;
    }
    pBlockIter->index += 1;
  } else {
    if (pBlockIter->index <= 0) {
      return false;
    }
    pBlockIter->index -= 1;
  }

  doSetCurrentBlock(pBlockIter, idStr);
  return true;
}

1283 1284 1285
/**
 * This is an two rectangles overlap cases.
 */
H
Hongze Cheng 已提交
1286
static int32_t dataBlockPartiallyRequired(STimeWindow* pWindow, SVersionRange* pVerRange, SDataBlk* pBlock) {
1287 1288
  return (pWindow->ekey < pBlock->maxKey.ts && pWindow->ekey >= pBlock->minKey.ts) ||
         (pWindow->skey > pBlock->minKey.ts && pWindow->skey <= pBlock->maxKey.ts) ||
H
Hongze Cheng 已提交
1289 1290
         (pVerRange->minVer > pBlock->minVer && pVerRange->minVer <= pBlock->maxVer) ||
         (pVerRange->maxVer < pBlock->maxVer && pVerRange->maxVer >= pBlock->minVer);
H
Haojun Liao 已提交
1291
}
H
Hongze Cheng 已提交
1292

1293
/**
 * Fetch the block that follows pBlockInfo in scan direction within the same table.
 *
 * @param pBlockInfo          current block (its tbBlockIdx is the position in the table's block list)
 * @param pTableBlockScanInfo scan info owning the table's block list
 * @param nextIndex           [out] position of the neighbor block in the block list
 * @param order               scan order
 * @param pBlockIndex         [out] copy of the neighbor's block index entry
 * @return true when a neighbor exists; false otherwise, in which case the
 *         output parameters are left untouched
 */
static bool getNeighborBlockOfSameTable(SFileDataBlockInfo* pBlockInfo, STableBlockScanInfo* pTableBlockScanInfo,
                                        int32_t* nextIndex, int32_t order, SBlockIndex* pBlockIndex) {
  int32_t step = ASCENDING_TRAVERSE(order) ? 1 : -1;

  // Signed arithmetic on purpose: "size - 1" on the unsigned size of an empty
  // list would wrap around and let an out-of-range index through.
  int64_t numOfBlocks = (int64_t)taosArrayGetSize(pTableBlockScanInfo->pBlockList);
  int64_t neighbor = (int64_t)pBlockInfo->tbBlockIdx + step;
  if (neighbor < 0 || neighbor >= numOfBlocks) {
    return false;
  }

  *nextIndex = pBlockInfo->tbBlockIdx + step;
  *pBlockIndex = *(SBlockIndex*)taosArrayGet(pTableBlockScanInfo->pBlockList, *nextIndex);
  return true;
}

/**
 * Search the iterator's block list, starting at the current position and moving
 * in scan direction, for the entry with the same uid and tbBlockIdx as pFBlockInfo.
 *
 * @return position of the entry; a miss is treated as a programming error
 *         (ASSERT) and yields -1
 */
static int32_t findFileBlockInfoIndex(SDataBlockIter* pBlockIter, SFileDataBlockInfo* pFBlockInfo) {
  ASSERT(pBlockIter != NULL && pFBlockInfo != NULL);

  int32_t step = ASCENDING_TRAVERSE(pBlockIter->order) ? 1 : -1;

  for (int32_t idx = pBlockIter->index; idx < pBlockIter->numOfBlocks && idx >= 0; idx += step) {
    SFileDataBlockInfo* pCandidate = taosArrayGet(pBlockIter->blockList, idx);
    if (pCandidate->uid != pFBlockInfo->uid || pCandidate->tbBlockIdx != pFBlockInfo->tbBlockIdx) {
      continue;
    }
    return idx;
  }

  ASSERT(0);
  return -1;
}

1330
/**
 * Make the block stored at `index` the iterator's current block.
 *
 * The iterator position is advanced by `step`; when the requested entry is not
 * already at that position it is removed from `index` and re-inserted there.
 * The block descriptor is then reloaded.
 *
 * @param pBlockIter block iterator
 * @param index      position of the entry to activate
 * @param step       amount added to pBlockIter->index
 * @return -1 when index is out of range; otherwise the result of doSetCurrentBlock
 *         (TSDB_CODE_SUCCESS on success) instead of unconditionally reporting success
 */
static int32_t setFileBlockActiveInBlockIter(SDataBlockIter* pBlockIter, int32_t index, int32_t step) {
  if (index < 0 || index >= pBlockIter->numOfBlocks) {
    return -1;
  }

  // copy the entry first: the array is modified below
  SFileDataBlockInfo fblock = *(SFileDataBlockInfo*)taosArrayGet(pBlockIter->blockList, index);
  pBlockIter->index += step;

  if (index != pBlockIter->index) {
    taosArrayRemove(pBlockIter->blockList, index);
    taosArrayInsert(pBlockIter->blockList, pBlockIter->index, &fblock);

    SFileDataBlockInfo* pBlockInfo = taosArrayGet(pBlockIter->blockList, pBlockIter->index);
    ASSERT(pBlockInfo->uid == fblock.uid && pBlockInfo->tbBlockIdx == fblock.tbBlockIdx);
  }

  return doSetCurrentBlock(pBlockIter, "");
}

1350
/**
 * Tell whether the block shares its boundary timestamp with the neighbor block:
 * block max key == neighbor start key for ascending scans,
 * block min key == neighbor end key for descending scans.
 */
static bool overlapWithNeighborBlock(SDataBlk* pBlock, SBlockIndex* pNeighborBlockIndex, int32_t order) {
  return ASCENDING_TRAVERSE(order) ? (pBlock->maxKey.ts == pNeighborBlockIndex->window.skey)
                                   : (pBlock->minKey.ts == pNeighborBlockIndex->window.ekey);
}
H
Hongze Cheng 已提交
1358

H
Hongze Cheng 已提交
1359
/**
 * Tell whether the buffered key lies at or before the block in scan direction:
 * key.ts <= block min key for ascending scans, key.ts >= block max key for
 * descending scans. A key equal to TSKEY_INITIAL_VAL never qualifies.
 */
static bool bufferDataInFileBlockGap(int32_t order, TSDBKEY key, SDataBlk* pBlock) {
  if (key.ts == TSKEY_INITIAL_VAL) {
    return false;
  }

  if (ASCENDING_TRAVERSE(order)) {
    return key.ts <= pBlock->minKey.ts;
  }
  return key.ts >= pBlock->maxKey.ts;
}
H
Hongze Cheng 已提交
1365

H
Hongze Cheng 已提交
1366
/**
 * Tell whether the key's timestamp falls inside the block's time range while the
 * block's version range intersects the queried version range.
 */
static bool keyOverlapFileBlock(TSDBKEY key, SDataBlk* pBlock, SVersionRange* pVerRange) {
  if (key.ts < pBlock->minKey.ts || key.ts > pBlock->maxKey.ts) {
    return false;
  }

  if (pBlock->maxVer < pVerRange->minVer) {
    return false;
  }

  return pBlock->minVer <= pVerRange->maxVer;
}

1371
/**
 * Walk the delete skyline from startIndex and report whether a delete entry
 * affects the block.
 *
 * An entry counts when its version is not older than the block's minimum version
 * and either its timestamp lies inside the block's time range, or it lies before
 * the block and the following skyline point is at or after the block's min key.
 * The walk stops at the first entry that is neither inside nor before the block.
 */
static bool doCheckforDatablockOverlap(STableBlockScanInfo* pBlockScanInfo, const SDataBlk* pBlock, int32_t startIndex) {
  size_t num = taosArrayGetSize(pBlockScanInfo->delSkyline);

  for (int32_t i = startIndex; i < num; i += 1) {
    TSDBKEY* pPoint = taosArrayGet(pBlockScanInfo->delSkyline, i);

    bool insideBlock = (pPoint->ts >= pBlock->minKey.ts && pPoint->ts <= pBlock->maxKey.ts);
    if (!insideBlock && !(pPoint->ts < pBlock->minKey.ts)) {
      // neither inside nor before the block: nothing further can match
      return false;
    }

    if (!(pPoint->version >= pBlock->minVer)) {
      continue;
    }

    if (insideBlock) {
      return true;
    }

    // the point precedes the block: it matters only if its range reaches the block
    if (i < num - 1) {
      TSDBKEY* pFollowing = taosArrayGet(pBlockScanInfo->delSkyline, i + 1);
      if (pFollowing->ts >= pBlock->minKey.ts) {
        return true;
      }
    } else {
      // the final skyline point is expected to carry version 0
      ASSERT(pPoint->version == 0);
    }
  }

  return false;
}

H
Hongze Cheng 已提交
1399
/**
 * Tell whether the table's delete skyline affects the given block.
 *
 * @param pBlockScanInfo scan info holding delSkyline and the current fileDelIndex
 * @param pBlock         block to test
 * @param order          scan order; decides where the skyline walk starts
 * @return false when there is no skyline, the skyline is empty, or the time ranges
 *         do not intersect; otherwise the result of doCheckforDatablockOverlap
 */
static bool overlapWithDelSkyline(STableBlockScanInfo* pBlockScanInfo, const SDataBlk* pBlock, int32_t order) {
  // an allocated but empty skyline has no first/last element to look at
  if (pBlockScanInfo->delSkyline == NULL || taosArrayGetSize(pBlockScanInfo->delSkyline) == 0) {
    return false;
  }

  // ts is not overlap
  TSDBKEY* pFirst = taosArrayGet(pBlockScanInfo->delSkyline, 0);
  TSDBKEY* pLast = taosArrayGetLast(pBlockScanInfo->delSkyline);
  if (pBlock->minKey.ts > pLast->ts || pBlock->maxKey.ts < pFirst->ts) {
    return false;
  }

  // version is not overlap
  if (ASCENDING_TRAVERSE(order)) {
    return doCheckforDatablockOverlap(pBlockScanInfo, pBlock, pBlockScanInfo->fileDelIndex);
  }

  // descending scan: step back to the first point that is not after the block's min key
  int32_t index = pBlockScanInfo->fileDelIndex;
  while (1) {
    TSDBKEY* p = taosArrayGet(pBlockScanInfo->delSkyline, index);
    if (p->ts > pBlock->minKey.ts && index > 0) {
      index -= 1;
    } else {
      break;
    }
  }

  return doCheckforDatablockOverlap(pBlockScanInfo, pBlock, index);
}

H
Haojun Liao 已提交
1429 1430 1431 1432 1433 1434 1435 1436 1437 1438 1439 1440 1441
// Reasons why a file block has to be loaded; filled in by getBlockToLoadInfo().
typedef struct {
  bool overlapWithNeighborBlock;  // result of overlapWithNeighborBlock(); only set when a neighbor block exists
  bool hasDupTs;                  // pBlock->hasDup when the block has exactly one sub-block, otherwise true
  bool overlapWithDelInfo;        // result of overlapWithDelSkyline() for the block
  bool overlapWithLastBlock;      // current key of the last block lies within [minKey.ts, maxKey.ts]
  bool overlapWithKeyInBuf;       // result of keyOverlapFileBlock() for the buffered key
  bool partiallyRequired;         // result of dataBlockPartiallyRequired()
  bool moreThanCapcity;           // pBlock->nRow exceeds pReader->capacity
} SDataBlockToLoadInfo;

/**
 * Evaluate every condition that can force a file block to be loaded and record
 * the results in pInfo.
 *
 * overlapWithNeighborBlock and overlapWithLastBlock are written only when a
 * neighbor block / last-block data exists, so pInfo should be zero-initialised
 * by the caller.
 */
static void getBlockToLoadInfo(SDataBlockToLoadInfo* pInfo, SFileDataBlockInfo* pBlockInfo, SDataBlk* pBlock,
                               STableBlockScanInfo* pScanInfo, TSDBKEY keyInBuf, SLastBlockReader* pLastBlockReader,
                               STsdbReader* pReader) {
  // neighbor block of the same table
  int32_t     neighborPos = 0;
  SBlockIndex neighbor = {0};
  if (getNeighborBlockOfSameTable(pBlockInfo, pScanInfo, &neighborPos, pReader->order, &neighbor)) {
    pInfo->overlapWithNeighborBlock = overlapWithNeighborBlock(pBlock, &neighbor, pReader->order);
  }

  // a block made of several sub-blocks is always treated as having duplicated timestamps
  pInfo->hasDupTs = (pBlock->nSubBlock != 1) || pBlock->hasDup;
  pInfo->overlapWithDelInfo = overlapWithDelSkyline(pScanInfo, pBlock, pReader->order);

  // current key of the last block falls into the block's time range
  if (hasDataInLastBlock(pLastBlockReader)) {
    int64_t lastBlockTs = getCurrentKeyInLastBlock(pLastBlockReader);
    pInfo->overlapWithLastBlock = (pBlock->maxKey.ts >= lastBlockTs) && (pBlock->minKey.ts <= lastBlockTs);
  }

  pInfo->moreThanCapcity = pBlock->nRow > pReader->capacity;
  pInfo->partiallyRequired = dataBlockPartiallyRequired(&pReader->window, &pReader->verRange, pBlock);
  pInfo->overlapWithKeyInBuf = keyOverlapFileBlock(keyInBuf, pBlock, &pReader->verRange);
}
1465

H
Haojun Liao 已提交
1466 1467 1468 1469 1470 1471 1472 1473 1474 1475 1476 1477 1478 1479
// A file data block has to be loaded when at least one of the flags computed by
// getBlockToLoadInfo() is set:
//   - it overlaps the neighbor block of the same table
//   - it contains duplicated timestamps
//   - only a part of it is required by the query window / version range
//   - it overlaps the key currently held in the memory buffer
//   - it holds more rows than the output buffer capacity
//   - it overlaps the delete info of the table
//   - it overlaps the current key of the last block
static bool fileBlockShouldLoad(STsdbReader* pReader, SFileDataBlockInfo* pBlockInfo, SDataBlk* pBlock,
                                STableBlockScanInfo* pScanInfo, TSDBKEY keyInBuf, SLastBlockReader* pLastBlockReader) {
  SDataBlockToLoadInfo info = {0};
  getBlockToLoadInfo(&info, pBlockInfo, pBlock, pScanInfo, keyInBuf, pLastBlockReader, pReader);

  const bool overlapsOtherData = info.overlapWithNeighborBlock || info.overlapWithKeyInBuf ||
                                 info.overlapWithDelInfo || info.overlapWithLastBlock;
  const bool needsRowLevelWork = info.hasDupTs || info.partiallyRequired || info.moreThanCapcity;

  if (!overlapsOtherData && !needsRowLevelWork) {
    return false;
  }

  // record which conditions forced the load, for profiling
  tsdbDebug("%p uid:%" PRIu64
            " need to load the datablock, overlapwithneighborblock:%d, hasDup:%d, partiallyRequired:%d, "
            "overlapWithKey:%d, greaterThanBuf:%d, overlapWithDel:%d, overlapWithlastBlock:%d, %s",
            pReader, pBlockInfo->uid, info.overlapWithNeighborBlock, info.hasDupTs, info.partiallyRequired,
            info.overlapWithKeyInBuf, info.moreThanCapcity, info.overlapWithDelInfo, info.overlapWithLastBlock,
            pReader->idStr);
  return true;
}

H
Haojun Liao 已提交
1494 1495 1496 1497 1498 1499 1500 1501 1502
// A file data block is "clean" when it overlaps nothing else (neighbor block, buffer key, delete
// info, last block) and carries no duplicated timestamps. Unlike fileBlockShouldLoad(), the
// partiallyRequired and moreThanCapcity flags are not considered here.
static bool isCleanFileDataBlock(STsdbReader* pReader, SFileDataBlockInfo* pBlockInfo, SDataBlk* pBlock,
                                 STableBlockScanInfo* pScanInfo, TSDBKEY keyInBuf, SLastBlockReader* pLastBlockReader) {
  SDataBlockToLoadInfo flags = {0};
  getBlockToLoadInfo(&flags, pBlockInfo, pBlock, pScanInfo, keyInBuf, pLastBlockReader, pReader);

  return !flags.overlapWithNeighborBlock && !flags.hasDupTs && !flags.overlapWithKeyInBuf &&
         !flags.overlapWithDelInfo && !flags.overlapWithLastBlock;
}

1503
// Fill pReader->pResBlock with rows taken from the memory buffers (mem/imem) of one table, up to
// endKey, and account the elapsed time in pReader->cost.buildmemBlock. Nothing is done when
// neither buffer iterator has a value. Returns the code of buildDataBlockFromBufImpl().
static int32_t buildDataBlockFromBuf(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo, int64_t endKey) {
  const bool hasBufData = pBlockScanInfo->iter.hasVal || pBlockScanInfo->iiter.hasVal;
  if (!hasBufData) {
    return TSDB_CODE_SUCCESS;
  }

  SSDataBlock* pResBlock = pReader->pResBlock;
  int64_t      startUs = taosGetTimestampUs();

  int32_t ret = buildDataBlockFromBufImpl(pBlockScanInfo, endKey, pReader->capacity, pReader);

  // the post-processing below runs regardless of the result code
  blockDataUpdateTsWindow(pResBlock, 0);
  pResBlock->info.uid = pBlockScanInfo->uid;
  setComposedBlockFlag(pReader, true);

  double costMs = (taosGetTimestampUs() - startUs) / 1000.0;
  tsdbDebug("%p build data block from cache completed, elapsed time:%.2f ms, numOfRows:%d, brange:%" PRId64
            " - %" PRId64 " %s",
            pReader, costMs, pResBlock->info.rows, pResBlock->info.window.skey, pResBlock->info.window.ekey,
            pReader->idStr);

  pReader->cost.buildmemBlock += costMs;
  return ret;
}

1528 1529
// Fast path: copy the current file-block row straight into the result block when no merge is
// needed, i.e. the row is not the last one in scan direction and the row that follows it carries
// a different timestamp. On success the row index is advanced and true is returned; otherwise
// nothing is changed and the caller has to merge.
static bool tryCopyDistinctRowFromFileBlock(STsdbReader* pReader, SBlockData* pBlockData, int64_t key,
                                            SFileBlockDumpInfo* pDumpInfo) {
  const bool    asc = (pReader->order == TSDB_ORDER_ASC);
  const int32_t step = asc ? 1 : -1;

  // a border row has no successor to compare against
  const bool hasSuccessor = asc ? (pDumpInfo->rowIndex < pDumpInfo->totalRows - 1) : (pDumpInfo->rowIndex > 0);
  if (!hasSuccessor) {
    return false;
  }

  int64_t successorKey = pBlockData->aTSKEY[pDumpInfo->rowIndex + step];
  if (successorKey == key) {  // duplicated timestamp, merge is required
    return false;
  }

  doAppendRowFromFileBlock(pReader->pResBlock, pReader, pBlockData, pDumpInfo->rowIndex);
  pDumpInfo->rowIndex += step;
  return true;
}

1548 1549
// Advance the merge tree of the last-block reader until it stands on a row that has not been
// dropped according to the delete skyline of the table. Returns false once the merge tree is
// exhausted.
static bool nextRowFromLastBlocks(SLastBlockReader* pLastBlockReader, STableBlockScanInfo* pBlockScanInfo,
                                  SVersionRange* pVerRange) {
  while (tMergeTreeNext(&pLastBlockReader->mergeTree)) {
    TSDBROW curRow = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
    TSDBKEY curKey = TSDBROW_KEY(&curRow);

    bool dropped = hasBeenDropped(pBlockScanInfo->delSkyline, &pBlockScanInfo->lastBlockDelIndex, &curKey,
                                  pLastBlockReader->order, pVerRange);
    if (!dropped) {
      return true;
    }
  }

  return false;
}

// Fast path for a last-block row: move the last-block reader to its next valid row and, unless
// that row carries the same timestamp ts, append fRow to the result block directly.
// Returns false (nothing appended) when the next row has the same timestamp and a merge is needed.
// Note that the reader has been advanced in either case.
static bool tryCopyDistinctRowFromSttBlock(TSDBROW* fRow, SLastBlockReader* pLastBlockReader,
                                           STableBlockScanInfo* pScanInfo, int64_t ts, STsdbReader* pReader) {
  bool hasNext = nextRowFromLastBlocks(pLastBlockReader, pScanInfo, &pReader->verRange);

  // the current key is only read when the reader still has a row
  if (hasNext && getCurrentKeyInLastBlock(pLastBlockReader) == ts) {
    return false;
  }

  doAppendRowFromFileBlock(pReader->pResBlock, pReader, fRow->pBlockData, fRow->iRow);
  return true;
}

H
Haojun Liao 已提交
1582 1583 1584
/*
 * Return the schema with version sversion for table uid.
 *
 * pReader->pSchema is loaded on first use (requested with version -1) and returned when its
 * version matches. Otherwise pReader->pMemSchema acts as a one-entry cache: it is returned when
 * its version matches, else it is released and fetched again for sversion.
 *
 * Returns NULL when no schema could be obtained. terrno is set only when the lookup reports an
 * error code; it is never overwritten with TSDB_CODE_SUCCESS.
 */
static FORCE_INLINE STSchema* doGetSchemaForTSRow(int32_t sversion, STsdbReader* pReader, uint64_t uid) {
  // always set the newest schema version in pReader->pSchema
  if (pReader->pSchema == NULL) {
    pReader->pSchema = metaGetTbTSchema(pReader->pTsdb->pVnode->pMeta, uid, -1, 1);
  }

  if (pReader->pSchema && sversion == pReader->pSchema->version) {
    return pReader->pSchema;
  }

  if (pReader->pMemSchema != NULL) {
    if (pReader->pMemSchema->version == sversion) {
      return pReader->pMemSchema;
    }

    // cached schema has another version, drop it before fetching the requested one
    taosMemoryFreeClear(pReader->pMemSchema);
  }

  int32_t code = metaGetTbTSchemaEx(pReader->pTsdb->pVnode->pMeta, pReader->suid, uid, sversion, &pReader->pMemSchema);
  if (code != TSDB_CODE_SUCCESS) {
    terrno = code;
    return NULL;
  }

  // may still be NULL if the lookup reported success without producing a schema
  return pReader->pMemSchema;
}

1617
static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo, TSDBROW* pRow,
1618 1619 1620 1621 1622 1623
                                     SIterInfo* pIter, int64_t key, SLastBlockReader* pLastBlockReader) {
  SRowMerger          merge = {0};
  STSRow*             pTSRow = NULL;
  SBlockData*         pBlockData = &pReader->status.fileBlockData;
  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;

1624
  int64_t tsLast = INT64_MIN;
1625
  if (hasDataInLastBlock(pLastBlockReader)) {
1626 1627
    tsLast = getCurrentKeyInLastBlock(pLastBlockReader);
  }
1628

H
Hongze Cheng 已提交
1629 1630
  TSDBKEY k = TSDBROW_KEY(pRow);
  TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
1631

1632 1633
  int64_t minKey = 0;
  if (pReader->order == TSDB_ORDER_ASC) {
H
Hongze Cheng 已提交
1634
    minKey = INT64_MAX;  // chosen the minimum value
1635
    if (minKey > tsLast && hasDataInLastBlock(pLastBlockReader)) {
1636 1637
      minKey = tsLast;
    }
1638

1639 1640 1641
    if (minKey > k.ts) {
      minKey = k.ts;
    }
1642

1643
    if (minKey > key && hasDataInFileBlock(pBlockData, pDumpInfo)) {
1644 1645 1646 1647
      minKey = key;
    }
  } else {
    minKey = INT64_MIN;
1648
    if (minKey < tsLast && hasDataInLastBlock(pLastBlockReader)) {
1649 1650 1651 1652 1653 1654 1655
      minKey = tsLast;
    }

    if (minKey < k.ts) {
      minKey = k.ts;
    }

1656
    if (minKey < key && hasDataInFileBlock(pBlockData, pDumpInfo)) {
1657 1658
      minKey = key;
    }
1659 1660 1661 1662
  }

  bool init = false;

1663
  // ASC: file block ---> last block -----> imem -----> mem
H
Hongze Cheng 已提交
1664
  // DESC: mem -----> imem -----> last block -----> file block
1665 1666
  if (pReader->order == TSDB_ORDER_ASC) {
    if (minKey == key) {
1667
      init = true;
H
Haojun Liao 已提交
1668 1669 1670 1671
      int32_t code = tRowMergerInit(&merge, &fRow, pReader->pSchema);
      if (code != TSDB_CODE_SUCCESS) {
        return code;
      }
1672
      doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
1673 1674
    }

1675
    if (minKey == tsLast) {
1676
      TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
H
Haojun Liao 已提交
1677 1678 1679
      if (init) {
        tRowMerge(&merge, &fRow1);
      } else {
1680
        init = true;
H
Haojun Liao 已提交
1681 1682 1683 1684
        int32_t code = tRowMergerInit(&merge, &fRow1, pReader->pSchema);
        if (code != TSDB_CODE_SUCCESS) {
          return code;
        }
1685
      }
1686
      doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, &merge, &pReader->verRange);
1687
    }
1688

1689
    if (minKey == k.ts) {
H
Haojun Liao 已提交
1690 1691 1692
      if (init) {
        tRowMerge(&merge, pRow);
      } else {
1693 1694
        init = true;
        STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid);
H
Haojun Liao 已提交
1695 1696 1697 1698 1699 1700 1701 1702
        int32_t   code = tRowMergerInit(&merge, pRow, pSchema);
        if (code != TSDB_CODE_SUCCESS) {
          return code;
        }
      }
      int32_t code = doMergeRowsInBuf(pIter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge, pReader);
      if (code != TSDB_CODE_SUCCESS) {
        return code;
1703 1704 1705 1706 1707
      }
    }
  } else {
    if (minKey == k.ts) {
      init = true;
1708
      STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid);
H
Haojun Liao 已提交
1709 1710 1711 1712 1713 1714
      int32_t   code = tRowMergerInit(&merge, pRow, pSchema);
      if (code != TSDB_CODE_SUCCESS) {
        return code;
      }

      code = doMergeRowsInBuf(pIter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge, pReader);
1715
      if (code != TSDB_CODE_SUCCESS || merge.pTSchema == NULL) {
H
Haojun Liao 已提交
1716 1717
        return code;
      }
1718 1719
    }

1720
    if (minKey == tsLast) {
1721
      TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
H
Haojun Liao 已提交
1722 1723 1724
      if (init) {
        tRowMerge(&merge, &fRow1);
      } else {
1725
        init = true;
H
Haojun Liao 已提交
1726 1727 1728 1729
        int32_t code = tRowMergerInit(&merge, &fRow1, pReader->pSchema);
        if (code != TSDB_CODE_SUCCESS) {
          return code;
        }
1730
      }
1731
      doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, &merge, &pReader->verRange);
1732 1733 1734
    }

    if (minKey == key) {
H
Haojun Liao 已提交
1735 1736 1737
      if (init) {
        tRowMerge(&merge, &fRow);
      } else {
1738
        init = true;
H
Haojun Liao 已提交
1739 1740 1741 1742
        int32_t code = tRowMergerInit(&merge, &fRow, pReader->pSchema);
        if (code != TSDB_CODE_SUCCESS) {
          return code;
        }
1743 1744 1745
      }
      doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
    }
1746 1747
  }

1748 1749 1750 1751 1752
  int32_t code = tRowMergerGetRow(&merge, &pTSRow);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

1753
  doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo);
1754 1755 1756 1757 1758 1759

  taosMemoryFree(pTSRow);
  tRowMergerClear(&merge);
  return TSDB_CODE_SUCCESS;
}

1760 1761 1762
/*
 * Emit one row whose timestamp is the current key of the last block.
 *
 * mergeBlockData  false: pBlockData is not consulted at all (callers pass NULL);
 *                 true : the current file-block row takes part when its timestamp equals the
 *                        current last-block key
 *
 * When the file block does not take part, the row is copied directly if the next last-block row
 * has another timestamp; otherwise all last-block rows of that timestamp are merged. When the
 * file block takes part, last-block rows and file-block rows of that timestamp are merged.
 *
 * Returns TSDB_CODE_SUCCESS or the error of tRowMergerInit()/tRowMergerGetRow(). The merger is
 * cleared whenever it was initialized, also when tRowMergerGetRow() fails.
 */
static int32_t doMergeFileBlockAndLastBlock(SLastBlockReader* pLastBlockReader, STsdbReader* pReader,
                                            STableBlockScanInfo* pBlockScanInfo, SBlockData* pBlockData,
                                            bool mergeBlockData) {
  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
  int64_t             tsLastBlock = getCurrentKeyInLastBlock(pLastBlockReader);

  STSRow*    pTSRow = NULL;
  SRowMerger merge = {0};
  int32_t    code = TSDB_CODE_SUCCESS;
  TSDBROW    fRow = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
  tsdbTrace("fRow ptr:%p, %d, uid:%" PRIu64 ", %s", fRow.pBlockData, fRow.iRow, pLastBlockReader->uid, pReader->idStr);

  if ((!mergeBlockData) || (tsLastBlock != pBlockData->aTSKEY[pDumpInfo->rowIndex])) {
    // the row is produced from the last block only
    if (tryCopyDistinctRowFromSttBlock(&fRow, pLastBlockReader, pBlockScanInfo, tsLastBlock, pReader)) {
      pBlockScanInfo->lastKey = tsLastBlock;
      return TSDB_CODE_SUCCESS;
    }

    code = tRowMergerInit(&merge, &fRow, pReader->pSchema);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    // the failed copy attempt above left the merge tree on the next row with the same timestamp
    TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
    tRowMerge(&merge, &fRow1);
    doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLastBlock, &merge, &pReader->verRange);
  } else {
    // last-block key equals the current file-block key: merge both sources
    code = tRowMergerInit(&merge, &fRow, pReader->pSchema);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLastBlock, &merge, &pReader->verRange);
    ASSERT(mergeBlockData);

    // merge with block data if ts == key
    if (tsLastBlock == pBlockData->aTSKEY[pDumpInfo->rowIndex]) {
      doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
    }
  }

  code = tRowMergerGetRow(&merge, &pTSRow);
  if (code == TSDB_CODE_SUCCESS) {
    doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo);
    taosMemoryFree(pTSRow);
  }

  tRowMergerClear(&merge);
  return code;
}

1824 1825
/*
 * Produce the next row from the file data block and/or the last block.
 *
 * key  timestamp of the current file-block row (meaningful only while the file block has data)
 *
 *  - file block has no data left: the row comes from the last block only
 *  - last block has no data     : the row comes from the file block only
 *  - both have data, ASC order  : key < ts -> file block row; key == ts -> merge both sources
 *  - both have data, DESC order : delegated to doMergeFileBlockAndLastBlock()
 *
 * Returns TSDB_CODE_SUCCESS or the first error code. The merger used for the key == ts case is
 * cleared whenever it was initialized, also when tRowMergerGetRow() fails.
 */
static int32_t mergeFileBlockAndLastBlock(STsdbReader* pReader, SLastBlockReader* pLastBlockReader, int64_t key,
                                          STableBlockScanInfo* pBlockScanInfo, SBlockData* pBlockData) {
  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;

  if (!hasDataInFileBlock(pBlockData, pDumpInfo)) {  // only last block exists
    return doMergeFileBlockAndLastBlock(pLastBlockReader, pReader, pBlockScanInfo, NULL, false);
  }

  // no last block available, only data block exists
  if (!hasDataInLastBlock(pLastBlockReader)) {
    return mergeRowsInFileBlocks(pBlockData, pBlockScanInfo, key, pReader);
  }

  // current row of the file data block and current key of the last block
  TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
  int64_t ts = getCurrentKeyInLastBlock(pLastBlockReader);
  ASSERT(ts >= key);

  if (!ASCENDING_TRAVERSE(pReader->order)) {  // desc order
    return doMergeFileBlockAndLastBlock(pLastBlockReader, pReader, pBlockScanInfo, pBlockData, true);
  }

  if (key < ts) {  // the file-block row comes first
    return mergeRowsInFileBlocks(pBlockData, pBlockScanInfo, key, pReader);
  }

  if (key != ts) {  // key > ts contradicts the assertion above
    ASSERT(0);
    return TSDB_CODE_SUCCESS;
  }

  // key == ts: merge the rows of this timestamp from the file block and from the last block
  STSRow*    pTSRow = NULL;
  SRowMerger merge = {0};

  int32_t code = tRowMergerInit(&merge, &fRow, pReader->pSchema);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);

  TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
  tRowMerge(&merge, &fRow1);

  doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, ts, &merge, &pReader->verRange);

  code = tRowMergerGetRow(&merge, &pTSRow);
  if (code == TSDB_CODE_SUCCESS) {
    doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo);
    taosMemoryFree(pTSRow);
  }

  tRowMergerClear(&merge);
  return code;
}

1880 1881
/*
 * Merge the rows of the next timestamp from all four sources (mem, imem, last block, file data
 * block) and append the merged row to pReader->pResBlock. Both buffer iterators must currently
 * provide a valid row (asserted).
 *
 * The next timestamp is the smallest candidate for an ascending scan and the largest one for a
 * descending scan; file block and last block only compete while they report data. Sources
 * holding that timestamp are folded into the merger in this order:
 *   ASC : file block -----> last block -----> imem -----> mem
 *   DESC: mem -----> imem -----> last block -----> file block
 *
 * Returns TSDB_CODE_SUCCESS or the first error code. A schema lookup that yields NULL returns
 * terrno, which doGetSchemaForTSRow() sets on failure. After the merger has been initialized,
 * failures of doMergeRowsInBuf()/tRowMergerGetRow() clear it before returning.
 */
static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo, SBlockData* pBlockData,
                                     SLastBlockReader* pLastBlockReader) {
  SRowMerger          merge = {0};
  STSRow*             pTSRow = NULL;
  int32_t             code = TSDB_CODE_SUCCESS;
  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
  SArray*             pDelList = pBlockScanInfo->delSkyline;

  TSDBROW* pRow = getValidMemRow(&pBlockScanInfo->iter, pDelList, pReader);
  TSDBROW* piRow = getValidMemRow(&pBlockScanInfo->iiter, pDelList, pReader);
  ASSERT(pRow != NULL && piRow != NULL);

  int64_t tsLast = INT64_MIN;
  if (hasDataInLastBlock(pLastBlockReader)) {
    tsLast = getCurrentKeyInLastBlock(pLastBlockReader);
  }

  int64_t key = hasDataInFileBlock(pBlockData, pDumpInfo) ? pBlockData->aTSKEY[pDumpInfo->rowIndex] : INT64_MIN;

  TSDBKEY k = TSDBROW_KEY(pRow);
  TSDBKEY ik = TSDBROW_KEY(piRow);

  int64_t minKey = 0;
  if (ASCENDING_TRAVERSE(pReader->order)) {
    minKey = INT64_MAX;  // let's find the minimum
    if (minKey > k.ts) {
      minKey = k.ts;
    }

    if (minKey > ik.ts) {
      minKey = ik.ts;
    }

    if (minKey > key && hasDataInFileBlock(pBlockData, pDumpInfo)) {
      minKey = key;
    }

    if (minKey > tsLast && hasDataInLastBlock(pLastBlockReader)) {
      minKey = tsLast;
    }
  } else {
    minKey = INT64_MIN;  // let's find the maximum ts value
    if (minKey < k.ts) {
      minKey = k.ts;
    }

    if (minKey < ik.ts) {
      minKey = ik.ts;
    }

    if (minKey < key && hasDataInFileBlock(pBlockData, pDumpInfo)) {
      minKey = key;
    }

    if (minKey < tsLast && hasDataInLastBlock(pLastBlockReader)) {
      minKey = tsLast;
    }
  }

  // true once the merger has been initialized successfully
  bool init = false;

  if (ASCENDING_TRAVERSE(pReader->order)) {
    if (minKey == key) {
      TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
      code = tRowMergerInit(&merge, &fRow, pReader->pSchema);
      if (code != TSDB_CODE_SUCCESS) {
        return code;
      }
      init = true;

      doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
    }

    if (minKey == tsLast) {
      TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
      if (init) {
        tRowMerge(&merge, &fRow1);
      } else {
        code = tRowMergerInit(&merge, &fRow1, pReader->pSchema);
        if (code != TSDB_CODE_SUCCESS) {
          return code;
        }
        init = true;
      }

      doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, &merge, &pReader->verRange);
    }

    if (minKey == ik.ts) {
      if (init) {
        tRowMerge(&merge, piRow);
      } else {
        STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(piRow), pReader, pBlockScanInfo->uid);
        if (pSchema == NULL) {
          // `code` is still TSDB_CODE_SUCCESS here; report the lookup failure instead
          return terrno;
        }

        code = tRowMergerInit(&merge, piRow, pSchema);
        if (code != TSDB_CODE_SUCCESS) {
          return code;
        }
        init = true;
      }

      code = doMergeRowsInBuf(&pBlockScanInfo->iiter, pBlockScanInfo->uid, ik.ts, pBlockScanInfo->delSkyline, &merge,
                              pReader);
      if (code != TSDB_CODE_SUCCESS) {
        goto _end;  // merger is initialized here, release it
      }
    }

    if (minKey == k.ts) {
      if (init) {
        if (merge.pTSchema == NULL) {
          return code;
        }

        tRowMerge(&merge, pRow);
      } else {
        STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid);
        code = tRowMergerInit(&merge, pRow, pSchema);
        if (code != TSDB_CODE_SUCCESS) {
          return code;
        }
        init = true;
      }

      code = doMergeRowsInBuf(&pBlockScanInfo->iter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge,
                              pReader);
      if (code != TSDB_CODE_SUCCESS) {
        goto _end;
      }
    }
  } else {
    if (minKey == k.ts) {
      STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid);
      code = tRowMergerInit(&merge, pRow, pSchema);
      if (code != TSDB_CODE_SUCCESS) {
        return code;
      }
      init = true;

      code = doMergeRowsInBuf(&pBlockScanInfo->iter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge,
                              pReader);
      if (code != TSDB_CODE_SUCCESS) {
        goto _end;
      }
    }

    if (minKey == ik.ts) {
      if (init) {
        tRowMerge(&merge, piRow);
      } else {
        STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(piRow), pReader, pBlockScanInfo->uid);
        code = tRowMergerInit(&merge, piRow, pSchema);
        if (code != TSDB_CODE_SUCCESS) {
          return code;
        }
        init = true;
      }

      code = doMergeRowsInBuf(&pBlockScanInfo->iiter, pBlockScanInfo->uid, ik.ts, pBlockScanInfo->delSkyline, &merge,
                              pReader);
      if (code != TSDB_CODE_SUCCESS) {
        goto _end;
      }
    }

    if (minKey == tsLast) {
      TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
      if (init) {
        tRowMerge(&merge, &fRow1);
      } else {
        code = tRowMergerInit(&merge, &fRow1, pReader->pSchema);
        if (code != TSDB_CODE_SUCCESS) {
          return code;
        }
        init = true;
      }

      doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, &merge, &pReader->verRange);
    }

    if (minKey == key) {
      TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
      if (!init) {
        code = tRowMergerInit(&merge, &fRow, pReader->pSchema);
        if (code != TSDB_CODE_SUCCESS) {
          return code;
        }
        init = true;
      } else {
        if (merge.pTSchema == NULL) {
          return code;
        }
        tRowMerge(&merge, &fRow);
      }

      doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
    }
  }

  if (merge.pTSchema == NULL) {
    return code;
  }

  code = tRowMergerGetRow(&merge, &pTSRow);
  if (code != TSDB_CODE_SUCCESS) {
    goto _end;
  }

  doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo);
  taosMemoryFree(pTSRow);

_end:
  tRowMergerClear(&merge);
  return code;
}

2094 2095 2096 2097 2098 2099 2100 2101 2102 2103 2104 2105 2106 2107 2108 2109 2110 2111 2112 2113 2114 2115 2116 2117 2118
/*
 * Create the mem and imem iterators of one table (once per scan info) and initialize its delete
 * skyline iterator.
 *
 * Iteration starts at the window start / minimum version for an ascending scan and at the window
 * end / maximum version for a descending scan. A memtable that is absent from the read snapshot,
 * or that holds no data for the table, leaves the corresponding iterator untouched.
 *
 * Returns TSDB_CODE_SUCCESS, or the error of tsdbTbDataIterCreate(); in the error case
 * pBlockScanInfo->iterInit stays false.
 */
static int32_t initMemDataIterator(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader) {
  if (pBlockScanInfo->iterInit) {
    return TSDB_CODE_SUCCESS;
  }

  int32_t code = TSDB_CODE_SUCCESS;

  TSDBKEY startKey = {0};
  if (ASCENDING_TRAVERSE(pReader->order)) {
    startKey = (TSDBKEY){.ts = pReader->window.skey, .version = pReader->verRange.minVer};
  } else {
    startKey = (TSDBKEY){.ts = pReader->window.ekey, .version = pReader->verRange.maxVer};
  }

  int32_t backward = (!ASCENDING_TRAVERSE(pReader->order));

  // mem
  STbData* d = NULL;
  if (pReader->pReadSnap->pMem != NULL) {
    d = tsdbGetTbDataFromMemTable(pReader->pReadSnap->pMem, pReader->suid, pBlockScanInfo->uid);
    if (d != NULL) {
      code = tsdbTbDataIterCreate(d, &startKey, backward, &pBlockScanInfo->iter.iter);
      if (code == TSDB_CODE_SUCCESS) {
        pBlockScanInfo->iter.hasVal = (tsdbTbDataIterGet(pBlockScanInfo->iter.iter) != NULL);

        tsdbDebug("%p uid:%" PRIu64 ", check data in mem from skey:%" PRId64 ", order:%d, ts range in buf:%" PRId64
                  "-%" PRId64 " %s",
                  pReader, pBlockScanInfo->uid, startKey.ts, pReader->order, d->minKey, d->maxKey, pReader->idStr);
      } else {
        tsdbError("%p uid:%" PRIu64 ", failed to create iterator for mem, code:%s, %s", pReader, pBlockScanInfo->uid,
                  tstrerror(code), pReader->idStr);
        return code;
      }
    }
  } else {
    tsdbDebug("%p uid:%" PRIu64 ", no data in mem, %s", pReader, pBlockScanInfo->uid, pReader->idStr);
  }

  // imem
  STbData* di = NULL;
  if (pReader->pReadSnap->pIMem != NULL) {
    di = tsdbGetTbDataFromMemTable(pReader->pReadSnap->pIMem, pReader->suid, pBlockScanInfo->uid);
    if (di != NULL) {
      code = tsdbTbDataIterCreate(di, &startKey, backward, &pBlockScanInfo->iiter.iter);
      if (code == TSDB_CODE_SUCCESS) {
        pBlockScanInfo->iiter.hasVal = (tsdbTbDataIterGet(pBlockScanInfo->iiter.iter) != NULL);

        tsdbDebug("%p uid:%" PRIu64 ", check data in imem from skey:%" PRId64 ", order:%d, ts range in buf:%" PRId64
                  "-%" PRId64 " %s",
                  pReader, pBlockScanInfo->uid, startKey.ts, pReader->order, di->minKey, di->maxKey, pReader->idStr);
      } else {
        tsdbError("%p uid:%" PRIu64 ", failed to create iterator for imem, code:%s, %s", pReader, pBlockScanInfo->uid,
                  tstrerror(code), pReader->idStr);
        return code;
      }
    }
  } else {
    tsdbDebug("%p uid:%" PRIu64 ", no data in imem, %s", pReader, pBlockScanInfo->uid, pReader->idStr);
  }

  initDelSkylineIterator(pBlockScanInfo, pReader, d, di);

  pBlockScanInfo->iterInit = true;
  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
2158 2159
// Check whether the current row of a loaded file block (pDumpInfo->rowIndex) belongs to the
// scanned table, lies inside the reader's version range and time window, and has not been
// dropped by the delete skyline. The delete check runs last and only for rows that passed the
// other checks.
static bool isValidFileBlockRow(SBlockData* pBlockData, SFileBlockDumpInfo* pDumpInfo,
                                STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader) {
  // a block with a uid column holds rows of several tables
  if (pBlockData->aUid != NULL && pBlockData->aUid[pDumpInfo->rowIndex] != pBlockScanInfo->uid) {
    return false;
  }

  int64_t rowVer = pBlockData->aVersion[pDumpInfo->rowIndex];
  bool    verInRange = (rowVer >= pReader->verRange.minVer) && (rowVer <= pReader->verRange.maxVer);
  if (!verInRange) {
    return false;
  }

  int64_t rowTs = pBlockData->aTSKEY[pDumpInfo->rowIndex];
  bool    tsInWindow = (rowTs >= pReader->window.skey) && (rowTs <= pReader->window.ekey);
  if (!tsInWindow) {
    return false;
  }

  TSDBKEY rowKey = {.ts = rowTs, .version = rowVer};
  return !hasBeenDropped(pBlockScanInfo->delSkyline, &pBlockScanInfo->fileDelIndex, &rowKey, pReader->order,
                         &pReader->verRange);
}

2188
// Bind the last-block reader to the table described by pScanInfo and position it on the first
// valid row. If the reader is already bound to this table, only its data state is reported.
// Otherwise a merge tree opened for another table is closed, the memory iterators of the table
// are initialized and a new merge tree is opened for the window that starts right after
// pScanInfo->lastKey in scan direction. Returns true when a row is available.
static bool initLastBlockReader(SLastBlockReader* pLBlockReader, STableBlockScanInfo* pScanInfo, STsdbReader* pReader) {
  // the last block reader has been initialized for this table.
  if (pLBlockReader->uid == pScanInfo->uid) {
    return hasDataInLastBlock(pLBlockReader);
  }

  // uid 0 means no merge tree has been opened yet
  if (pLBlockReader->uid != 0) {
    tMergeTreeClose(&pLBlockReader->mergeTree);
  }

  initMemDataIterator(pScanInfo, pReader);
  pLBlockReader->uid = pScanInfo->uid;

  // skip everything up to and including the last key already returned for this table
  const bool  asc = ASCENDING_TRAVERSE(pLBlockReader->order);
  STimeWindow win = pLBlockReader->window;
  if (asc) {
    win.skey = pScanInfo->lastKey + 1;
  } else {
    win.ekey = pScanInfo->lastKey - 1;
  }

  int32_t ret = tMergeTreeOpen(&pLBlockReader->mergeTree, (pLBlockReader->order == TSDB_ORDER_DESC),
                               pReader->pFileReader, pReader->suid, pScanInfo->uid, &win, &pLBlockReader->verRange,
                               pLBlockReader->pInfo, false, pReader->idStr);
  if (ret != TSDB_CODE_SUCCESS) {
    return false;
  }

  return nextRowFromLastBlocks(pLBlockReader, pScanInfo, &pReader->verRange);
}

2219
// Timestamp of the row the merge tree of the last-block reader currently returns.
static int64_t getCurrentKeyInLastBlock(SLastBlockReader* pLastBlockReader) {
  TSDBROW curRow = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
  int64_t curTs = TSDBROW_TS(&curRow);
  return curTs;
}

H
Hongze Cheng 已提交
2224
// Reports whether the merge tree of the last-block reader has its pIter set.
static bool hasDataInLastBlock(SLastBlockReader* pLastBlockReader) {
  return NULL != pLastBlockReader->mergeTree.pIter;
}
2225 2226 2227 2228

bool hasDataInFileBlock(const SBlockData* pBlockData, const SFileBlockDumpInfo* pDumpInfo) {
  if (pBlockData->nRow > 0) {
    ASSERT(pBlockData->nRow == pDumpInfo->totalRows);
2229 2230
  }

2231
  return pBlockData->nRow > 0 && (!pDumpInfo->allDumped);
2232
}
2233

2234 2235
// Emits the row identified by `key` from the loaded file block into pReader->pResBlock.
//  - Fast path: tryCopyDistinctRowFromFileBlock() handles the row itself; only lastKey is recorded here.
//  - Slow path: the row at the current dump position is merged through an SRowMerger
//    (doMergeRowsInFileBlocks) and the merged row is appended to the result block.
// Returns TSDB_CODE_SUCCESS, or the error code reported by tRowMergerInit / tRowMergerGetRow.
int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pBlockScanInfo, int64_t key,
                              STsdbReader* pReader) {
  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
  if (tryCopyDistinctRowFromFileBlock(pReader, pBlockData, key, pDumpInfo)) {
    pBlockScanInfo->lastKey = key;
    return TSDB_CODE_SUCCESS;
  }

  TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);

  STSRow*    pTSRow = NULL;
  SRowMerger merge = {0};

  int32_t code = tRowMergerInit(&merge, &fRow, pReader->pSchema);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);

  code = tRowMergerGetRow(&merge, &pTSRow);
  if (code != TSDB_CODE_SUCCESS) {
    // the merger was initialized successfully, so it must be released on this error path as well
    tRowMergerClear(&merge);
    return code;
  }

  doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo);

  taosMemoryFree(pTSRow);
  tRowMergerClear(&merge);
  return TSDB_CODE_SUCCESS;
}

H
Haojun Liao 已提交
2265 2266
// Produces the next merged row for the table by dispatching on which in-memory iterators still have data:
//   mem + imem           -> doMergeMultiLevelRows
//   imem only / mem only -> doMergeBufAndFileRows
//   neither              -> mergeFileBlockAndLastBlock
// `key` is the timestamp at the current dump position of the file block, or INT64_MIN when the file block
// is empty or already fully dumped.
static int32_t buildComposedDataBlockImpl(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo,
                                          SBlockData* pBlockData, SLastBlockReader* pLastBlockReader) {
  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;

  int64_t key = INT64_MIN;
  if (pBlockData->nRow > 0 && !pDumpInfo->allDumped) {
    key = pBlockData->aTSKEY[pDumpInfo->rowIndex];
  }

  SIterInfo* pMemIter = &pBlockScanInfo->iter;
  SIterInfo* pImemIter = &pBlockScanInfo->iiter;

  if (pMemIter->hasVal && pImemIter->hasVal) {
    return doMergeMultiLevelRows(pReader, pBlockScanInfo, pBlockData, pLastBlockReader);
  }

  // Fetch the current valid row of whichever iterator reports data. hasVal is re-checked after the
  // fetch below, exactly as before, in case getValidMemRow updated it.
  TSDBROW* pMemRow = NULL;
  if (pMemIter->hasVal) {
    pMemRow = getValidMemRow(pMemIter, pBlockScanInfo->delSkyline, pReader);
  }

  TSDBROW* pImemRow = NULL;
  if (pImemIter->hasVal) {
    pImemRow = getValidMemRow(pImemIter, pBlockScanInfo->delSkyline, pReader);
  }

  // imem + file + last block
  if (pImemIter->hasVal) {
    return doMergeBufAndFileRows(pReader, pBlockScanInfo, pImemRow, pImemIter, key, pLastBlockReader);
  }

  // mem + file + last block
  if (pMemIter->hasVal) {
    return doMergeBufAndFileRows(pReader, pBlockScanInfo, pMemRow, pMemIter, key, pLastBlockReader);
  }

  // file data block + last block
  return mergeFileBlockAndLastBlock(pReader, pLastBlockReader, key, pBlockScanInfo, pBlockData);
}

2297
// Fills pReader->pResBlock with a "composed" block for the current table, i.e. rows merged from the loaded
// file block, the last-block reader and the in-memory buffers.
//  - If a file block is current and isCleanFileDataBlock() accepts it, the block is copied as a whole
//    (in descending scans only when the last-block reader has no data).
//  - Otherwise rows are produced one merge step at a time until both sources are exhausted, the file
//    block is consumed, the result block reaches pReader->capacity, or a merge step fails.
// On every exit path the result block uid / time window, the composed flag and the cost counters are
// updated. Returns TSDB_CODE_SUCCESS, TSDB_CODE_INVALID_PARA when the block's uid is not in the table map,
// or the error code of a failed merge step.
static int32_t buildComposedDataBlock(STsdbReader* pReader) {
  int32_t code = TSDB_CODE_SUCCESS;

  SSDataBlock* pResBlock = pReader->pResBlock;

  SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(&pReader->status.blockIter);
  SLastBlockReader*   pLastBlockReader = pReader->status.fileIter.pLastBlockReader;

  bool    asc = ASCENDING_TRAVERSE(pReader->order);
  int64_t st = taosGetTimestampUs();
  int32_t step = asc ? 1 : -1;

  STableBlockScanInfo* pBlockScanInfo = NULL;
  if (pBlockInfo != NULL) {
    void* p = taosHashGet(pReader->status.pTableMap, &pBlockInfo->uid, sizeof(pBlockInfo->uid));
    if (p == NULL) {
      code = TSDB_CODE_INVALID_PARA;
      tsdbError("failed to locate the uid:%" PRIu64 " in query table uid list, total tables:%d, %s", pBlockInfo->uid,
                taosHashGetSize(pReader->status.pTableMap), pReader->idStr);
      goto _end;
    }

    pBlockScanInfo = *(STableBlockScanInfo**)p;

    SDataBlk* pBlock = getCurrentBlock(&pReader->status.blockIter);
    TSDBKEY   keyInBuf = getCurrentKeyInBuf(pBlockScanInfo, pReader);

    // it is a clean block, load it directly
    if (isCleanFileDataBlock(pReader, pBlockInfo, pBlock, pBlockScanInfo, keyInBuf, pLastBlockReader)) {
      if (asc || !hasDataInLastBlock(pLastBlockReader)) {
        copyBlockDataToSDataBlock(pReader, pBlockScanInfo);

        // record the last key value
        pBlockScanInfo->lastKey = asc ? pBlock->maxKey.ts : pBlock->minKey.ts;
        goto _end;
      }
    }
  } else {  // file blocks not exist
    pBlockScanInfo = *pReader->status.pTableIter;
  }

  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
  SBlockData*         pBlockData = &pReader->status.fileBlockData;

  while (1) {
    bool hasBlockData = false;

    // find the first qualified row in the file data block, moving in traversal direction
    while (pBlockData->nRow > 0) {
      if (isValidFileBlockRow(pBlockData, pDumpInfo, pBlockScanInfo, pReader)) {
        hasBlockData = true;
        break;
      }

      pDumpInfo->rowIndex += step;

      SDataBlk* pBlock = getCurrentBlock(&pReader->status.blockIter);
      if (pDumpInfo->rowIndex >= pBlock->nRow || pDumpInfo->rowIndex < 0) {
        setBlockAllDumped(pDumpInfo, pBlock->maxKey.ts, pReader->order);
        break;
      }
    }

    bool hasBlockLData = hasDataInLastBlock(pLastBlockReader);

    // no data in last block and block, no need to proceed.
    if ((hasBlockData == false) && (hasBlockLData == false)) {
      break;
    }

    // a failed merge step must not be reported as success; stop and hand the error to the caller
    code = buildComposedDataBlockImpl(pReader, pBlockScanInfo, pBlockData, pLastBlockReader);
    if (code != TSDB_CODE_SUCCESS) {
      goto _end;
    }

    // currently loaded file data block is consumed
    if ((pBlockData->nRow > 0) && (pDumpInfo->rowIndex >= pBlockData->nRow || pDumpInfo->rowIndex < 0)) {
      SDataBlk* pBlock = getCurrentBlock(&pReader->status.blockIter);
      setBlockAllDumped(pDumpInfo, pBlock->maxKey.ts, pReader->order);
      break;
    }

    if (pResBlock->info.rows >= pReader->capacity) {
      break;
    }
  }

_end:
  pResBlock->info.uid = (pBlockScanInfo != NULL) ? pBlockScanInfo->uid : 0;
  blockDataUpdateTsWindow(pResBlock, 0);

  setComposedBlockFlag(pReader, true);
  double el = (taosGetTimestampUs() - st) / 1000.0;  // elapsed time in milliseconds

  pReader->cost.composedBlocks += 1;
  pReader->cost.buildComposedBlockTime += el;

  if (pResBlock->info.rows > 0) {
    tsdbDebug("%p uid:%" PRIu64 ", composed data block created, brange:%" PRIu64 "-%" PRIu64
                  " rows:%d, elapsed time:%.2f ms %s",
              pReader, pResBlock->info.uid, pResBlock->info.window.skey, pResBlock->info.window.ekey,
              pResBlock->info.rows, el, pReader->idStr);
  }

  return code;
}

// Records in the reader status whether the current result block is a composed (merged) block.
void setComposedBlockFlag(STsdbReader* pReader, bool composed) {
  SReaderStatus* pStatus = &pReader->status;
  pStatus->composedDataBlock = composed;
}

dengyihao's avatar
dengyihao 已提交
2403 2404
// Builds the delete skyline of one table and resets all delete-index cursors of its scan info.
// Delete records are collected from the del file of the read snapshot (if any) and from the delete lists
// of the two optional in-memory table objects; when at least one record exists, a skyline is built into
// pBlockScanInfo->delSkyline. Does nothing if the skyline already exists.
// Returns TSDB_CODE_SUCCESS, TSDB_CODE_OUT_OF_MEMORY when an array cannot be allocated, or the error code
// reported by the del-file reader / tsdbBuildDeleteSkyline.
int32_t initDelSkylineIterator(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader, STbData* pMemTbData,
                               STbData* piMemTbData) {
  if (pBlockScanInfo->delSkyline != NULL) {
    return TSDB_CODE_SUCCESS;
  }

  int32_t code = 0;
  STsdb*  pTsdb = pReader->pTsdb;

  SArray* pDelData = taosArrayInit(4, sizeof(SDelData));
  if (pDelData == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  SDelFile* pDelFile = pReader->pReadSnap->fs.pDelFile;
  if (pDelFile) {
    SDelFReader* pDelFReader = NULL;
    code = tsdbDelFReaderOpen(&pDelFReader, pDelFile, pTsdb);
    if (code != TSDB_CODE_SUCCESS) {
      goto _err;
    }

    SArray* aDelIdx = taosArrayInit(4, sizeof(SDelIdx));
    if (aDelIdx == NULL) {
      // code still holds the success value of the open call; report the allocation failure explicitly
      code = TSDB_CODE_OUT_OF_MEMORY;
      tsdbDelFReaderClose(&pDelFReader);
      goto _err;
    }

    code = tsdbReadDelIdx(pDelFReader, aDelIdx);
    if (code != TSDB_CODE_SUCCESS) {
      taosArrayDestroy(aDelIdx);
      tsdbDelFReaderClose(&pDelFReader);
      goto _err;
    }

    // look up the delete index entry of this table
    SDelIdx  idx = {.suid = pReader->suid, .uid = pBlockScanInfo->uid};
    SDelIdx* pIdx = taosArraySearch(aDelIdx, &idx, tCmprDelIdx, TD_EQ);

    if (pIdx != NULL) {
      code = tsdbReadDelData(pDelFReader, pIdx, pDelData);
    }

    taosArrayDestroy(aDelIdx);
    tsdbDelFReaderClose(&pDelFReader);

    if (code != TSDB_CODE_SUCCESS) {
      goto _err;
    }
  }

  // append the delete records kept in the two in-memory table objects
  SDelData* p = NULL;
  if (pMemTbData != NULL) {
    p = pMemTbData->pHead;
    while (p) {
      taosArrayPush(pDelData, p);
      p = p->pNext;
    }
  }

  if (piMemTbData != NULL) {
    p = piMemTbData->pHead;
    while (p) {
      taosArrayPush(pDelData, p);
      p = p->pNext;
    }
  }

  if (taosArrayGetSize(pDelData) > 0) {
    pBlockScanInfo->delSkyline = taosArrayInit(4, sizeof(TSDBKEY));
    if (pBlockScanInfo->delSkyline == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto _err;
    }

    // NOTE(review): if this call fails, the partially built skyline stays attached to the scan info
    // (unchanged from the previous behavior) - confirm that callers treat the error as fatal.
    code = tsdbBuildDeleteSkyline(pDelData, 0, (int32_t)(taosArrayGetSize(pDelData) - 1), pBlockScanInfo->delSkyline);
  }

  taosArrayDestroy(pDelData);

  // Start at the first skyline entry for ascending scans and at the last one for descending scans.
  // The size is converted before the subtraction so that an empty skyline yields -1 without relying on
  // unsigned wrap-around.
  pBlockScanInfo->iter.index =
      ASCENDING_TRAVERSE(pReader->order) ? 0 : (int32_t)taosArrayGetSize(pBlockScanInfo->delSkyline) - 1;
  pBlockScanInfo->iiter.index = pBlockScanInfo->iter.index;
  pBlockScanInfo->fileDelIndex = pBlockScanInfo->iter.index;
  pBlockScanInfo->lastBlockDelIndex = pBlockScanInfo->iter.index;
  return code;

_err:
  taosArrayDestroy(pDelData);
  return code;
}

H
Haojun Liao 已提交
2485
// Returns the key of the next valid (not deleted) row in the in-memory buffers of a table.
//  - neither mem nor imem has a valid row: the returned key has ts == TSKEY_INITIAL_VAL
//  - only one of them has a valid row:     the key of that row
//  - both have a valid row:                the key that comes first in traversal order, i.e. the smaller
//                                          timestamp for ascending and the larger one for descending
//                                          scans; the mem key wins on equal timestamps
TSDBKEY getCurrentKeyInBuf(STableBlockScanInfo* pScanInfo, STsdbReader* pReader) {
  TSDBKEY key = {.ts = TSKEY_INITIAL_VAL};

  TSDBROW* pRow = getValidMemRow(&pScanInfo->iter, pScanInfo->delSkyline, pReader);
  bool     hasMemKey = (pRow != NULL);
  if (hasMemKey) {
    key = TSDBROW_KEY(pRow);
  }

  TSDBROW* piRow = getValidMemRow(&pScanInfo->iiter, pScanInfo->delSkyline, pReader);
  if (piRow == NULL) {
    return key;
  }

  TSDBKEY ikey = TSDBROW_KEY(piRow);
  if (!hasMemKey) {
    // Comparing against the "no key" sentinel would never select the imem key, so return it directly.
    return ikey;
  }

  if (ASCENDING_TRAVERSE(pReader->order)) {
    return (key.ts <= ikey.ts) ? key : ikey;
  }

  return (key.ts >= ikey.ts) ? key : ikey;
}

2503
// Advances the fileset iterator until a fileset is found that contributes data to this query.
// For every fileset the block index is loaded; if it is non-empty, or the fileset has stt files
// (nSttF > 0), doLoadFileBlock() fills pBlockNum. The search stops at the first fileset for which
// numOfBlocks + numOfLastFiles > 0, or when no fileset is left (both counters remain 0 then).
// Returns TSDB_CODE_SUCCESS, TSDB_CODE_OUT_OF_MEMORY when the index list cannot be allocated, or the
// error code reported while loading the block index / file blocks.
static int32_t moveToNextFile(STsdbReader* pReader, SBlockNumber* pBlockNum) {
  SReaderStatus* pStatus = &pReader->status;
  pBlockNum->numOfBlocks = 0;
  pBlockNum->numOfLastFiles = 0;

  size_t  numOfTables = taosHashGetSize(pReader->status.pTableMap);
  SArray* pIndexList = taosArrayInit(numOfTables, sizeof(SBlockIdx));
  if (pIndexList == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  while (1) {
    bool hasNext = filesetIteratorNext(&pStatus->fileIter, pReader);
    if (!hasNext) {  // no data files on disk
      break;
    }

    taosArrayClear(pIndexList);
    int32_t code = doLoadBlockIndex(pReader, pReader->pFileReader, pIndexList);
    if (code != TSDB_CODE_SUCCESS) {
      taosArrayDestroy(pIndexList);
      return code;
    }

    if (taosArrayGetSize(pIndexList) > 0 || pReader->pFileReader->pSet->nSttF > 0) {
      code = doLoadFileBlock(pReader, pIndexList, pBlockNum);
      if (code != TSDB_CODE_SUCCESS) {
        taosArrayDestroy(pIndexList);
        return code;
      }

      if (pBlockNum->numOfBlocks + pBlockNum->numOfLastFiles > 0) {
        break;
      }
    }

    // no blocks in current file, try next files
  }

  taosArrayDestroy(pIndexList);
  return TSDB_CODE_SUCCESS;
}

2543
// Comparison callback that orders two uint64_t values ascending.
// p1 / p2 point to the uint64_t values to compare; returns -1, 0 or 1.
static int32_t uidComparFunc(const void* p1, const void* p2) {
  // keep the const qualifier of the callback arguments instead of casting it away
  const uint64_t lhs = *(const uint64_t*)p1;
  const uint64_t rhs = *(const uint64_t*)p2;

  if (lhs < rhs) {
    return -1;
  }

  return (lhs > rhs) ? 1 : 0;
}
2552

2553
// Copies the uid of every entry in pStatus->pTableMap into pOrderCheckInfo->tableUidList and sorts the
// list with uidComparFunc. The list must already be large enough to hold one uid per map entry.
static void extractOrderedTableUidList(SUidOrderCheckInfo* pOrderCheckInfo, SReaderStatus* pStatus) {
  int32_t numOfTables = taosHashGetSize(pStatus->pTableMap);
  int32_t pos = 0;

  for (void* pIter = taosHashIterate(pStatus->pTableMap, NULL); pIter != NULL;
       pIter = taosHashIterate(pStatus->pTableMap, pIter)) {
    STableBlockScanInfo* pScanInfo = *(STableBlockScanInfo**)pIter;
    pOrderCheckInfo->tableUidList[pos] = pScanInfo->uid;
    pos += 1;
  }

  taosSort(pOrderCheckInfo->tableUidList, numOfTables, sizeof(uint64_t), uidComparFunc);
}

2567
// Prepares the sorted uid list and positions pStatus->pTableIter on the first table.
//  - empty table map: nothing to do
//  - no uid list yet: allocate it, fill + sort it, point the table iterator at the first uid
//  - uid list exists and the table iterator is NULL: restart from index 0; if the first uid is no longer
//    in the table map, the list is re-allocated for the current map size and rebuilt
//  - uid list exists and the table iterator is set: left untouched
// Returns TSDB_CODE_OUT_OF_MEMORY when the list cannot be (re)allocated, TSDB_CODE_SUCCESS otherwise.
static int32_t initOrderCheckInfo(SUidOrderCheckInfo* pOrderCheckInfo, SReaderStatus* pStatus) {
  int32_t numOfTables = taosHashGetSize(pStatus->pTableMap);
  if (numOfTables == 0) {
    return TSDB_CODE_SUCCESS;
  }

  if (pOrderCheckInfo->tableUidList != NULL) {
    if (pStatus->pTableIter != NULL) {
      return TSDB_CODE_SUCCESS;
    }

    // it is the last block of a new file, restart from the first table
    pOrderCheckInfo->currentIndex = 0;
    uint64_t firstUid = pOrderCheckInfo->tableUidList[pOrderCheckInfo->currentIndex];
    pStatus->pTableIter = taosHashGet(pStatus->pTableMap, &firstUid, sizeof(firstUid));
    if (pStatus->pTableIter != NULL) {
      return TSDB_CODE_SUCCESS;
    }

    // the tableMap has already updated, rebuild the uid list for its current content
    void* pNewList = taosMemoryRealloc(pOrderCheckInfo->tableUidList, numOfTables * sizeof(uint64_t));
    if (pNewList == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }

    pOrderCheckInfo->tableUidList = pNewList;
  } else {
    pOrderCheckInfo->currentIndex = 0;
    pOrderCheckInfo->tableUidList = taosMemoryMalloc(numOfTables * sizeof(uint64_t));
    if (pOrderCheckInfo->tableUidList == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }
  }

  extractOrderedTableUidList(pOrderCheckInfo, pStatus);

  uint64_t uid = pOrderCheckInfo->tableUidList[0];
  pStatus->pTableIter = taosHashGet(pStatus->pTableMap, &uid, sizeof(uid));
  return TSDB_CODE_SUCCESS;
}

2608
// Advances to the next uid of the sorted uid list and points pStatus->pTableIter at its table map entry.
// Returns false (and clears the table iterator) once the index reaches the size of the table map.
static bool moveToNextTable(SUidOrderCheckInfo* pOrderedCheckInfo, SReaderStatus* pStatus) {
  pOrderedCheckInfo->currentIndex++;

  if (pOrderedCheckInfo->currentIndex < taosHashGetSize(pStatus->pTableMap)) {
    uint64_t nextUid = pOrderedCheckInfo->tableUidList[pOrderedCheckInfo->currentIndex];
    pStatus->pTableIter = taosHashGet(pStatus->pTableMap, &nextUid, sizeof(nextUid));
    ASSERT(pStatus->pTableIter != NULL);
    return true;
  }

  pStatus->pTableIter = NULL;
  return false;
}

2621
// Walks the tables in uid order and builds a result block from the first table whose last-block reader
// (or the subsequent doBuildDataBlock call) yields rows. Returns as soon as the result block is
// non-empty, an error occurs, or every table has been visited.
static int32_t doLoadLastBlockSequentially(STsdbReader* pReader) {
  SReaderStatus*      pStatus = &pReader->status;
  SLastBlockReader*   pLastBlockReader = pStatus->fileIter.pLastBlockReader;
  SUidOrderCheckInfo* pOrderedCheckInfo = &pStatus->uidCheckInfo;

  int32_t code = initOrderCheckInfo(pOrderedCheckInfo, pStatus);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  if (taosHashGetSize(pStatus->pTableMap) == 0) {
    return code;
  }

  for (;;) {
    // load the last data block of current table
    STableBlockScanInfo* pScanInfo = *(STableBlockScanInfo**)pStatus->pTableIter;

    if (initLastBlockReader(pLastBlockReader, pScanInfo, pReader)) {
      code = doBuildDataBlock(pReader);
      if (code != TSDB_CODE_SUCCESS) {
        return code;
      }

      if (pReader->pResBlock->info.rows > 0) {
        return TSDB_CODE_SUCCESS;
      }
    }

    // current table has nothing (more) to offer, let's try next table
    if (!moveToNextTable(pOrderedCheckInfo, pStatus)) {
      return TSDB_CODE_SUCCESS;
    }
  }
}

2660
// Builds the next result block for the current position of the reader.
// The table is taken from the current file block (looked up by uid in the table map) or, when there is
// no file block, from the table iterator. Depending on how the file block relates to the buffered rows
// and the last-block reader, one of the following happens:
//   - no file block:                 build a composed block from last files / buffers
//   - fileBlockShouldLoad():         load the file block data, then build a composed block
//   - bufferDataInFileBlockGap():    emit buffered rows located before the file block in traversal order
//   - descending scan + last data:   reset the file block data and build a composed block first
//   - otherwise:                     report the whole file block through pResBlock->info without
//                                    composing it (composed flag = false)
// Returns TSDB_CODE_INVALID_PARA when no scan info can be located, otherwise the code of the step taken.
static int32_t doBuildDataBlock(STsdbReader* pReader) {
  int32_t   code = TSDB_CODE_SUCCESS;
  SDataBlk* pBlock = NULL;

  SReaderStatus*       pStatus = &pReader->status;
  SDataBlockIter*      pBlockIter = &pStatus->blockIter;
  STableBlockScanInfo* pScanInfo = NULL;
  SFileDataBlockInfo*  pBlockInfo = getCurrentBlockInfo(pBlockIter);
  SLastBlockReader*    pLastBlockReader = pReader->status.fileIter.pLastBlockReader;

  if (pBlockInfo != NULL) {
    // taosHashGet() yields NULL for an unknown uid; only dereference a successful lookup so that the
    // check below can report the failure instead of crashing
    void* p = taosHashGet(pReader->status.pTableMap, &pBlockInfo->uid, sizeof(pBlockInfo->uid));
    if (p != NULL) {
      pScanInfo = *(STableBlockScanInfo**)p;
    }
  } else if (pReader->status.pTableIter != NULL) {
    pScanInfo = *pReader->status.pTableIter;
  }

  if (pScanInfo == NULL) {
    tsdbError("failed to get table scan-info, %s", pReader->idStr);
    code = TSDB_CODE_INVALID_PARA;
    return code;
  }

  if (pBlockInfo != NULL) {
    pBlock = getCurrentBlock(pBlockIter);
  }

  // the result is intentionally unused here; availability is queried through hasDataInLastBlock() below
  initLastBlockReader(pLastBlockReader, pScanInfo, pReader);
  TSDBKEY keyInBuf = getCurrentKeyInBuf(pScanInfo, pReader);

  if (pBlockInfo == NULL) {  // build data block from last data file
    ASSERT(pBlockIter->numOfBlocks == 0);
    code = buildComposedDataBlock(pReader);
  } else if (fileBlockShouldLoad(pReader, pBlockInfo, pBlock, pScanInfo, keyInBuf, pLastBlockReader)) {
    code = doLoadFileBlockData(pReader, pBlockIter, &pStatus->fileBlockData, pScanInfo->uid);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    // build composed data block
    code = buildComposedDataBlock(pReader);
  } else if (bufferDataInFileBlockGap(pReader->order, keyInBuf, pBlock)) {
    // data in memory that are earlier than current file block
    // rows in buffer should be less than the file block in asc, greater than file block in desc
    int64_t endKey = (ASCENDING_TRAVERSE(pReader->order)) ? pBlock->minKey.ts : pBlock->maxKey.ts;
    code = buildDataBlockFromBuf(pReader, pScanInfo, endKey);
  } else {
    if (hasDataInLastBlock(pLastBlockReader) && !ASCENDING_TRAVERSE(pReader->order)) {
      // only return the rows in last block
      int64_t tsLast = getCurrentKeyInLastBlock(pLastBlockReader);
      ASSERT(tsLast >= pBlock->maxKey.ts);
      tBlockDataReset(&pReader->status.fileBlockData);

      tsdbDebug("load data in last block firstly, due to desc scan data, %s", pReader->idStr);
      code = buildComposedDataBlock(pReader);
    } else {  // whole block is required, return it directly
      SDataBlockInfo* pInfo = &pReader->pResBlock->info;
      pInfo->rows = pBlock->nRow;
      pInfo->uid = pScanInfo->uid;
      pInfo->window = (STimeWindow){.skey = pBlock->minKey.ts, .ekey = pBlock->maxKey.ts};
      setComposedBlockFlag(pReader, false);
      setBlockAllDumped(&pStatus->fBlockDumpInfo, pBlock->maxKey.ts, pReader->order);

      // update the last key for the corresponding table
      pScanInfo->lastKey = ASCENDING_TRAVERSE(pReader->order) ? pInfo->window.ekey : pInfo->window.skey;
    }
  }

  return code;
}

H
Haojun Liao 已提交
2730
// Iterates over the table map (resuming at pStatus->pTableIter when it is set) and builds a result block
// from the in-memory buffers of the first table that yields rows. Returns TSDB_CODE_SUCCESS when a block
// was produced or when all tables are exhausted, otherwise the error code of buildDataBlockFromBuf.
static int32_t buildBlockFromBufferSequentially(STsdbReader* pReader) {
  SReaderStatus* pStatus = &pReader->status;

  // start from the first table if there is no current position
  if (pStatus->pTableIter == NULL) {
    pStatus->pTableIter = taosHashIterate(pStatus->pTableMap, NULL);
    if (pStatus->pTableIter == NULL) {
      return TSDB_CODE_SUCCESS;
    }
  }

  do {
    STableBlockScanInfo** ppScanInfo = pStatus->pTableIter;
    STableBlockScanInfo*  pScanInfo = *ppScanInfo;
    initMemDataIterator(pScanInfo, pReader);

    // no upper/lower bound: consume the buffer up to its very end in traversal direction
    int64_t endKey = INT64_MIN;
    if (ASCENDING_TRAVERSE(pReader->order)) {
      endKey = INT64_MAX;
    }

    int32_t ret = buildDataBlockFromBuf(pReader, pScanInfo, endKey);
    if (ret != TSDB_CODE_SUCCESS) {
      return ret;
    }

    if (pReader->pResBlock->info.rows > 0) {
      return TSDB_CODE_SUCCESS;
    }

    // current table is exhausted, let's try the next table
    pStatus->pTableIter = taosHashIterate(pStatus->pTableMap, pStatus->pTableIter);
  } while (pStatus->pTableIter != NULL);

  return TSDB_CODE_SUCCESS;
}

2762
// set the correct start position in case of the first/last file block, according to the query time window
2763
static void initBlockDumpInfo(STsdbReader* pReader, SDataBlockIter* pBlockIter) {
H
Hongze Cheng 已提交
2764
  SDataBlk* pBlock = getCurrentBlock(pBlockIter);
2765

2766 2767 2768
  SReaderStatus* pStatus = &pReader->status;

  SFileBlockDumpInfo* pDumpInfo = &pStatus->fBlockDumpInfo;
2769 2770 2771

  pDumpInfo->totalRows = pBlock->nRow;
  pDumpInfo->allDumped = false;
2772
  pDumpInfo->rowIndex = ASCENDING_TRAVERSE(pReader->order) ? 0 : pBlock->nRow - 1;
2773 2774
}

2775
// Moves to the next fileset that has data and prepares the block iterator / dump cursor for it.
// When no fileset with data is left, pReader->status.loadFromFile is cleared so that the caller switches
// to the in-memory buffers.
static int32_t initForFirstBlockInFile(STsdbReader* pReader, SDataBlockIter* pBlockIter) {
  SBlockNumber blockNum = {0};

  int32_t code = moveToNextFile(pReader, &blockNum);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  // all data files are consumed, try data in buffer
  if (blockNum.numOfBlocks + blockNum.numOfLastFiles == 0) {
    pReader->status.loadFromFile = false;
    return code;
  }

  // initialize the block iterator for a new fileset
  if (blockNum.numOfBlocks <= 0) {
    // no block data, only last block exists
    tBlockDataReset(&pReader->status.fileBlockData);
    resetDataBlockIterator(pBlockIter, pReader->order);
  } else {
    code = initBlockIterator(pReader, pBlockIter, blockNum.numOfBlocks);
  }

  // set the correct start position according to the query time window
  initBlockDumpInfo(pReader, pBlockIter);
  return code;
}

2802
// A file block is partially read when it is not fully dumped yet and the row cursor has already left its
// starting position (row 0 for ascending, the last row for descending scans).
static bool fileBlockPartiallyRead(SFileBlockDumpInfo* pDumpInfo, bool asc) {
  if (pDumpInfo->allDumped) {
    return false;
  }

  if (asc) {
    return pDumpInfo->rowIndex > 0;
  }

  return pDumpInfo->rowIndex < (pDumpInfo->totalRows - 1);
}

2807
// Produces the next non-empty result block from the data files.
// The loop alternates between two phases:
//   - "last files" phase (scanLastFiles): tables are checked one by one through
//     doLoadLastBlockSequentially(); when all tables are done, the next fileset is opened.
//   - "file blocks" phase: the current file block is continued (composed block) or the block iterator is
//     advanced; when the blocks of a fileset are exhausted, its stt files (nSttF > 0) are scanned, or the
//     next fileset is opened.
// Returns as soon as the result block has rows, an error occurs, or no fileset is left
// (pReader->status.loadFromFile == false).
static int32_t buildBlockFromFiles(STsdbReader* pReader) {
  int32_t         code = TSDB_CODE_SUCCESS;
  bool            asc = ASCENDING_TRAVERSE(pReader->order);
  SDataBlockIter* pBlockIter = &pReader->status.blockIter;

  // set whenever the last-file phase has to run before file blocks are looked at again
  bool scanLastFiles = (pBlockIter->numOfBlocks == 0);

  for (;;) {
    if (scanLastFiles) {
      scanLastFiles = false;

      code = doLoadLastBlockSequentially(pReader);
      if (code != TSDB_CODE_SUCCESS) {
        return code;
      }

      if (pReader->pResBlock->info.rows > 0) {
        return TSDB_CODE_SUCCESS;
      }

      // all data blocks are checked in this last block file, now let's try the next file
      if (pReader->status.pTableIter == NULL) {
        code = initForFirstBlockInFile(pReader, pBlockIter);

        // error happens or all the data files are completely checked
        if ((code != TSDB_CODE_SUCCESS) || !pReader->status.loadFromFile) {
          return code;
        }

        // this file does not have data files, let's start check the last block file if exists
        if (pBlockIter->numOfBlocks == 0) {
          scanLastFiles = true;
          continue;
        }
      }

      code = doBuildDataBlock(pReader);
      if (code != TSDB_CODE_SUCCESS) {
        return code;
      }

      if (pReader->pResBlock->info.rows > 0) {
        return TSDB_CODE_SUCCESS;
      }
    }

    SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;

    if (fileBlockPartiallyRead(pDumpInfo, asc)) {  // file data block is partially loaded
      code = buildComposedDataBlock(pReader);
    } else {
      // current block are exhausted, try the next file block
      if (pDumpInfo->allDumped) {
        // try next data block in current file
        if (blockIteratorNext(&pReader->status.blockIter, pReader->idStr)) {
          // check for the next block in the block accessed order list
          initBlockDumpInfo(pReader, pBlockIter);
        } else if (pReader->status.pCurrentFileset->nSttF > 0) {
          // data blocks in current file are exhausted, let's try the next file now
          tBlockDataReset(&pReader->status.fileBlockData);
          resetDataBlockIterator(pBlockIter, pReader->order);
          scanLastFiles = true;
          continue;
        } else {
          code = initForFirstBlockInFile(pReader, pBlockIter);

          // error happens or all the data files are completely checked
          if ((code != TSDB_CODE_SUCCESS) || !pReader->status.loadFromFile) {
            return code;
          }

          // this file does not have blocks, let's start check the last block file
          if (pBlockIter->numOfBlocks == 0) {
            scanLastFiles = true;
            continue;
          }
        }
      }

      code = doBuildDataBlock(pReader);
    }

    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    if (pReader->pResBlock->info.rows > 0) {
      return TSDB_CODE_SUCCESS;
    }
  }
}
H
refact  
Hongze Cheng 已提交
2895

2896 2897
// Selects the tsdb instance that serves a query starting at winSKey.
// For a non-rsma vnode the regular tsdb is returned and *pLevel is left untouched. For an rsma vnode the
// retention levels are checked in order: the first level whose keep value satisfies
// (now - keep) <= (winSKey + tolerance offset) is chosen; a level with keep <= 0 ends the search and
// falls back to the previous level (if any). The chosen level is stored in *pLevel.
static STsdb* getTsdbByRetentions(SVnode* pVnode, TSKEY winSKey, SRetention* retentions, const char* idStr,
                                  int8_t* pLevel) {
  if (VND_IS_RSMA(pVnode)) {
    int8_t  precision = pVnode->config.tsdbCfg.precision;
    int64_t now = taosGetTimestamp(precision);

    // scale factor applied to tsQueryRsmaTolerance for the vnode's time precision
    long unit = 1000000L;
    if (precision == TSDB_TIME_PRECISION_MILLI) {
      unit = 1L;
    } else if (precision == TSDB_TIME_PRECISION_MICRO) {
      unit = 1000L;
    }
    int64_t offset = tsQueryRsmaTolerance * unit;

    int8_t level = 0;
    for (int8_t round = 0; round < TSDB_RETENTION_MAX; ++round, ++level) {
      SRetention* pRetention = &retentions[level];

      if (pRetention->keep <= 0) {
        // this level is not usable, fall back to the previous one
        if (level > 0) {
          level -= 1;
        }
        break;
      }

      if ((now - pRetention->keep) <= (winSKey + offset)) {
        break;
      }
    }

    const char* str = "";
    if (idStr != NULL) {
      str = idStr;
    }

    int8_t selected;
    STsdb* pSelected;
    if (level == TSDB_RETENTION_L0) {
      selected = TSDB_RETENTION_L0;
      pSelected = VND_RSMA0(pVnode);
    } else if (level == TSDB_RETENTION_L1) {
      selected = TSDB_RETENTION_L1;
      pSelected = VND_RSMA1(pVnode);
    } else {
      selected = TSDB_RETENTION_L2;
      pSelected = VND_RSMA2(pVnode);
    }

    *pLevel = selected;
    tsdbDebug("vgId:%d, rsma level %d is selected to query %s", TD_VID(pVnode), selected, str);
    return pSelected;
  }

  return VND_TSDB(pVnode);
}

H
Haojun Liao 已提交
2940
// Derives the version range of a query from its condition.
//  - minVer: pCond->startVersion, or 0 when it is -1
//  - maxVer: pCond->endVersion capped at the vnode's applied version; the applied version itself when
//            endVersion is -1 (not specified by the user)
// The `level` argument is not used by this function.
SVersionRange getQueryVerRange(SVnode* pVnode, SQueryTableDataCond* pCond, int8_t level) {
  int64_t minVer = pCond->startVersion;
  if (pCond->startVersion == -1) {
    minVer = 0;
  }

  // default: current maximum version of vnode
  int64_t maxVer = pVnode->state.applied;
  if (pCond->endVersion != -1 && !(pCond->endVersion > pVnode->state.applied)) {
    maxVer = pCond->endVersion;
  }

  return (SVersionRange){.minVer = minVer, .maxVer = maxVer};
}

2954
/**
 * Check whether the row identified by pKey is covered by an entry of the delete list.
 *
 * The list elements are read as TSDBKEY {ts, version}. Two adjacent entries are treated as a closed time range,
 * and a key inside that range is reported as dropped when the version stored on the range is not smaller than the
 * key's version.
 *
 * @param pDelList  delete list (array of TSDBKEY); NULL or empty means nothing has been dropped
 * @param index     in/out cursor into pDelList; moved forward (ascending) or backward (descending) while searching
 * @param pKey      key of the row to test, must not be NULL
 * @param order     traverse order, tested with ASCENDING_TRAVERSE()
 * @param pVerRange query version range; only maxVer is consulted, and only on the ascending path
 * @return          true if the key falls in a matching delete range, false otherwise
 *
 * NOTE(review): the descending path does not apply the pVerRange->maxVer filter used by the ascending path;
 * confirm whether that asymmetry is intended.
 */
bool hasBeenDropped(const SArray* pDelList, int32_t* index, TSDBKEY* pKey, int32_t order, SVersionRange* pVerRange) {
  ASSERT(pKey != NULL);
  if (pDelList == NULL) {
    return false;
  }

  size_t num = taosArrayGetSize(pDelList);
  if (num == 0) {
    // "num - 1" below is unsigned arithmetic and would wrap around for an empty list
    return false;
  }

  bool    asc = ASCENDING_TRAVERSE(order);
  int32_t step = asc ? 1 : -1;

  if (asc) {
    if (*index >= num - 1) {
      TSDBKEY* last = taosArrayGetLast(pDelList);
      ASSERT(pKey->ts >= last->ts);

      if (pKey->ts > last->ts) {
        return false;
      } else if (pKey->ts == last->ts) {
        if (num < 2) {
          // a single entry has no predecessor, hence no range that could cover the key
          return false;
        }

        TSDBKEY* prev = taosArrayGet(pDelList, num - 2);
        return (prev->version >= pKey->version);
      }
    } else {
      TSDBKEY* pCurrent = taosArrayGet(pDelList, *index);
      TSDBKEY* pNext = taosArrayGet(pDelList, (*index) + 1);

      if (pKey->ts < pCurrent->ts) {
        return false;
      }

      if (pCurrent->ts <= pKey->ts && pNext->ts >= pKey->ts && pCurrent->version >= pKey->version &&
          pVerRange->maxVer >= pCurrent->version) {
        return true;
      }

      // move the cursor forward until the range [pCurrent, pNext] reaches the key
      while (pNext->ts <= pKey->ts && (*index) < num - 1) {
        (*index) += 1;

        if ((*index) < num - 1) {
          pCurrent = taosArrayGet(pDelList, *index);
          pNext = taosArrayGet(pDelList, (*index) + 1);

          // it is not a consecutive deletion range, ignore it
          if (pCurrent->version == 0 && pNext->version > 0) {
            continue;
          }

          if (pCurrent->ts <= pKey->ts && pNext->ts >= pKey->ts && pCurrent->version >= pKey->version &&
              pVerRange->maxVer >= pCurrent->version) {
            return true;
          }
        }
      }

      return false;
    }
  } else {
    if (*index <= 0) {
      TSDBKEY* pFirst = taosArrayGet(pDelList, 0);

      if (pKey->ts < pFirst->ts) {
        return false;
      } else if (pKey->ts == pFirst->ts) {
        return pFirst->version >= pKey->version;
      } else {
        ASSERT(0);
      }
    } else {
      TSDBKEY* pCurrent = taosArrayGet(pDelList, *index);
      TSDBKEY* pPrev = taosArrayGet(pDelList, (*index) - 1);

      if (pKey->ts > pCurrent->ts) {
        return false;
      }

      if (pPrev->ts <= pKey->ts && pCurrent->ts >= pKey->ts && pPrev->version >= pKey->version) {
        return true;
      }

      // move the cursor backward until the range [pPrev, pCurrent] reaches the key
      while (pPrev->ts >= pKey->ts && (*index) > 1) {
        (*index) += step;

        if ((*index) >= 1) {
          pCurrent = taosArrayGet(pDelList, *index);
          pPrev = taosArrayGet(pDelList, (*index) - 1);

          // it is not a consecutive deletion range, ignore it
          if (pCurrent->version > 0 && pPrev->version == 0) {
            continue;
          }

          if (pPrev->ts <= pKey->ts && pCurrent->ts >= pKey->ts && pPrev->version >= pKey->version) {
            return true;
          }
        }
      }

      return false;
    }
  }

  return false;
}

3056
/**
 * Return the first row, starting at the iterator's current position, that is visible to the reader.
 *
 * A row is visible when its version lies inside pReader->verRange and hasBeenDropped() reports it as not deleted.
 * Rows that fail the test are skipped by advancing the iterator.
 *
 * @param pIter    mem/imem iterator state; hasVal is cleared when the iterator is exhausted or the current row
 *                 lies outside pReader->window, and pIter->index is used as the delete-list cursor
 * @param pDelList delete list handed to hasBeenDropped()
 * @param pReader  reader supplying the time window, version range and traverse order
 * @return         the visible row, or NULL when there is none
 */
TSDBROW* getValidMemRow(SIterInfo* pIter, const SArray* pDelList, STsdbReader* pReader) {
  if (!pIter->hasVal) {
    return NULL;
  }

  TSDBROW* pCandidate = tsdbTbDataIterGet(pIter->iter);
  TSDBKEY  key = {.ts = pCandidate->pTSRow->ts, .version = pCandidate->version};

  for (;;) {
    // once a row leaves the query window, stop scanning this iterator
    if (outOfTimeWindow(key.ts, &pReader->window)) {
      pIter->hasVal = false;
      return NULL;
    }

    // the delete check runs only for rows whose version is inside the requested range
    bool inVerRange = (key.version <= pReader->verRange.maxVer && key.version >= pReader->verRange.minVer);
    if (inVerRange && !hasBeenDropped(pDelList, &pIter->index, &key, pReader->order, &pReader->verRange)) {
      return pCandidate;
    }

    pIter->hasVal = tsdbTbDataIterNext(pIter->iter);
    if (!pIter->hasVal) {
      return NULL;
    }

    pCandidate = tsdbTbDataIterGet(pIter->iter);
    key = TSDBROW_KEY(pCandidate);
  }
}
H
Hongze Cheng 已提交
3094

3095 3096
/**
 * Feed every following row of the iterator that carries timestamp ts into the merger.
 *
 * The iterator is advanced first, so the row it currently points at is not merged by this function. Merging
 * stops at the first visible row with a different timestamp, or when the iterator has no more visible rows.
 *
 * @param pIter    mem/imem iterator to consume
 * @param uid      table uid used to look up the schema of each merged row
 * @param ts       timestamp shared by the rows to merge
 * @param pDelList delete list used to filter rows
 * @param pMerger  row merger that receives the rows
 * @param pReader  reader context
 * @return         TSDB_CODE_SUCCESS, or terrno when the schema of a row cannot be obtained
 */
int32_t doMergeRowsInBuf(SIterInfo* pIter, uint64_t uid, int64_t ts, SArray* pDelList, SRowMerger* pMerger,
                         STsdbReader* pReader) {
  for (;;) {
    pIter->hasVal = tsdbTbDataIterNext(pIter->iter);
    if (!pIter->hasVal) {
      return TSDB_CODE_SUCCESS;
    }

    // rows may exist but none of them is visible
    TSDBROW* pNext = getValidMemRow(pIter, pDelList, pReader);
    if (pNext == NULL) {
      return TSDB_CODE_SUCCESS;
    }

    // a different timestamp ends the run of duplicates
    TSDBKEY nextKey = TSDBROW_KEY(pNext);
    if (nextKey.ts != ts) {
      return TSDB_CODE_SUCCESS;
    }

    STSchema* pRowSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pNext), pReader, uid);
    if (pRowSchema == NULL) {
      return terrno;
    }

    tRowMergerAdd(pMerger, pNext, pRowSchema);
  }
}

3126
static int32_t doMergeRowsInFileBlockImpl(SBlockData* pBlockData, int32_t rowIndex, int64_t key, SRowMerger* pMerger,
3127
                                          SVersionRange* pVerRange, int32_t step) {
3128 3129
  while (pBlockData->aTSKEY[rowIndex] == key && rowIndex < pBlockData->nRow && rowIndex >= 0) {
    if (pBlockData->aVersion[rowIndex] > pVerRange->maxVer || pBlockData->aVersion[rowIndex] < pVerRange->minVer) {
3130
      rowIndex += step;
3131 3132 3133 3134 3135 3136 3137 3138 3139 3140 3141 3142 3143 3144 3145 3146
      continue;
    }

    TSDBROW fRow = tsdbRowFromBlockData(pBlockData, rowIndex);
    tRowMerge(pMerger, &fRow);
    rowIndex += step;
  }

  return rowIndex;
}

// Outcome reported by checkForNeighborFileBlock() to the loop in doMergeRowsInFileBlocks().
typedef enum {
  CHECK_FILEBLOCK_CONT = 0x1,  // the neighbor block was consumed up to its last row, check the next neighbor
  CHECK_FILEBLOCK_QUIT = 0x2,  // stop checking neighbor blocks (also the initial state)
} CHECK_FILEBLOCK_STATE;

H
Hongze Cheng 已提交
3147
static int32_t checkForNeighborFileBlock(STsdbReader* pReader, STableBlockScanInfo* pScanInfo, SDataBlk* pBlock,
3148 3149
                                         SFileDataBlockInfo* pFBlock, SRowMerger* pMerger, int64_t key,
                                         CHECK_FILEBLOCK_STATE* state) {
3150
  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
3151
  SBlockData*         pBlockData = &pReader->status.fileBlockData;
3152

3153
  *state = CHECK_FILEBLOCK_QUIT;
3154
  int32_t step = ASCENDING_TRAVERSE(pReader->order) ? 1 : -1;
3155

3156 3157 3158 3159
  int32_t     nextIndex = -1;
  SBlockIndex bIndex = {0};

  bool hasNeighbor = getNeighborBlockOfSameTable(pFBlock, pScanInfo, &nextIndex, pReader->order, &bIndex);
3160
  if (!hasNeighbor) {  // do nothing
3161 3162 3163
    return 0;
  }

3164
  bool overlap = overlapWithNeighborBlock(pBlock, &bIndex, pReader->order);
3165
  if (overlap) {  // load next block
3166
    SReaderStatus*  pStatus = &pReader->status;
3167 3168
    SDataBlockIter* pBlockIter = &pStatus->blockIter;

3169
    // 1. find the next neighbor block in the scan block list
3170
    SFileDataBlockInfo fb = {.uid = pFBlock->uid, .tbBlockIdx = nextIndex};
3171
    int32_t            neighborIndex = findFileBlockInfoIndex(pBlockIter, &fb);
3172

3173
    // 2. remove it from the scan block list
3174
    setFileBlockActiveInBlockIter(pBlockIter, neighborIndex, step);
3175

3176
    // 3. load the neighbor block, and set it to be the currently accessed file data block
3177
    int32_t code = doLoadFileBlockData(pReader, pBlockIter, &pStatus->fileBlockData, pFBlock->uid);
3178 3179 3180 3181
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

3182
    // 4. check the data values
3183 3184 3185 3186
    initBlockDumpInfo(pReader, pBlockIter);

    pDumpInfo->rowIndex =
        doMergeRowsInFileBlockImpl(pBlockData, pDumpInfo->rowIndex, key, pMerger, &pReader->verRange, step);
H
Haojun Liao 已提交
3187
    if (pDumpInfo->rowIndex >= pDumpInfo->totalRows) {
3188 3189 3190 3191 3192 3193 3194
      *state = CHECK_FILEBLOCK_CONT;
    }
  }

  return TSDB_CODE_SUCCESS;
}

3195 3196
/**
 * Merge all file-block rows that share the timestamp of the row currently pointed at by the dump info.
 *
 * The current row itself is skipped (the dump-info row index is advanced by one step first). The following rows
 * of the block with the same timestamp are merged, and when the block is exhausted the neighbor blocks of the
 * same table are checked as well.
 *
 * @param pBlockData file block data currently being dumped
 * @param pScanInfo  scan info of the table being read
 * @param pReader    reader context; pReader->status.fBlockDumpInfo.rowIndex is updated
 * @param pMerger    row merger that receives the rows
 * @return           TSDB_CODE_SUCCESS, or the error code reported while checking/loading a neighbor block
 */
int32_t doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pScanInfo, STsdbReader* pReader,
                                SRowMerger* pMerger) {
  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;

  bool    asc = ASCENDING_TRAVERSE(pReader->order);
  int64_t key = pBlockData->aTSKEY[pDumpInfo->rowIndex];
  int32_t step = asc ? 1 : -1;

  pDumpInfo->rowIndex += step;
  if ((pDumpInfo->rowIndex <= pBlockData->nRow - 1 && asc) || (pDumpInfo->rowIndex >= 0 && !asc)) {
    pDumpInfo->rowIndex =
        doMergeRowsInFileBlockImpl(pBlockData, pDumpInfo->rowIndex, key, pMerger, &pReader->verRange, step);
  }

  // all rows are consumed, let's try next file block
  if ((pDumpInfo->rowIndex >= pBlockData->nRow && asc) || (pDumpInfo->rowIndex < 0 && !asc)) {
    while (1) {
      CHECK_FILEBLOCK_STATE st = CHECK_FILEBLOCK_QUIT;

      SFileDataBlockInfo* pFileBlockInfo = getCurrentBlockInfo(&pReader->status.blockIter);
      SDataBlk*           pCurrentBlock = getCurrentBlock(&pReader->status.blockIter);

      // do not swallow a failure to load the neighbor block
      int32_t code = checkForNeighborFileBlock(pReader, pScanInfo, pCurrentBlock, pFileBlockInfo, pMerger, key, &st);
      if (code != TSDB_CODE_SUCCESS) {
        return code;
      }

      if (st == CHECK_FILEBLOCK_QUIT) {
        break;
      }
    }
  }

  return TSDB_CODE_SUCCESS;
}

H
Hongze Cheng 已提交
3226
int32_t doMergeRowsInLastBlock(SLastBlockReader* pLastBlockReader, STableBlockScanInfo* pScanInfo, int64_t ts,
3227 3228
                               SRowMerger* pMerger, SVersionRange* pVerRange) {
  while (nextRowFromLastBlocks(pLastBlockReader, pScanInfo, pVerRange)) {
3229 3230
    int64_t next1 = getCurrentKeyInLastBlock(pLastBlockReader);
    if (next1 == ts) {
3231
      TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
3232 3233 3234 3235 3236 3237 3238 3239 3240
      tRowMerge(pMerger, &fRow1);
    } else {
      break;
    }
  }

  return TSDB_CODE_SUCCESS;
}

3241 3242
/**
 * Produce the next output row from one in-memory iterator, merging rows that share the same timestamp.
 *
 * When the next visible row has a different timestamp (or there is none), the given row is returned as is and
 * *freeTSRow is set to false. Otherwise all rows with that timestamp are merged into a newly built row and
 * *freeTSRow is set to true, so the caller has to release *pTSRow.
 *
 * @param pRow      current row of the iterator
 * @param uid       table uid used for schema look-ups
 * @param pIter     mem/imem iterator; it is advanced by this function
 * @param pDelList  delete list used to filter rows
 * @param pTSRow    out: resulting row
 * @param pReader   reader context; pReader->pSchema is initialized from the row schema when it is still NULL
 * @param freeTSRow out: whether *pTSRow must be freed by the caller
 * @return          TSDB_CODE_SUCCESS or an error code
 */
int32_t doMergeMemTableMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter, SArray* pDelList, STSRow** pTSRow,
                                 STsdbReader* pReader, bool* freeTSRow) {
  TSDBROW  current = *pRow;
  TSDBROW* pNextRow = NULL;

  // look ahead: is there another visible row in mem/imem?
  pIter->hasVal = tsdbTbDataIterNext(pIter->iter);
  if (pIter->hasVal) {
    pNextRow = getValidMemRow(pIter, pDelList, pReader);
  }

  // no following row, or it has a different timestamp: return the current row directly
  if (pNextRow == NULL || current.pTSRow->ts != pNextRow->pTSRow->ts) {
    *pTSRow = current.pTSRow;
    *freeTSRow = false;
    return TSDB_CODE_SUCCESS;
  }

  SRowMerger merge = {0};

  // get the correct schema for data in memory
  terrno = 0;
  STSchema* pTSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(&current), pReader, uid);
  if (pTSchema == NULL) {
    return terrno;
  }

  if (pReader->pSchema == NULL) {
    pReader->pSchema = pTSchema;
  }

  int32_t code = tRowMergerInit2(&merge, pReader->pSchema, &current, pTSchema);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  // from here on the merger holds resources, every exit goes through _end to release them
  STSchema* pTSchema1 = doGetSchemaForTSRow(TSDBROW_SVERSION(pNextRow), pReader, uid);
  if (pTSchema1 == NULL) {
    code = terrno;
    goto _end;
  }

  tRowMergerAdd(&merge, pNextRow, pTSchema1);

  code = doMergeRowsInBuf(pIter, uid, current.pTSRow->ts, pDelList, &merge, pReader);
  if (code != TSDB_CODE_SUCCESS) {
    goto _end;
  }

  code = tRowMergerGetRow(&merge, pTSRow);
  if (code != TSDB_CODE_SUCCESS) {
    goto _end;
  }

  *freeTSRow = true;

_end:
  tRowMergerClear(&merge);
  return code;
}

3309
int32_t doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader,
H
Hongze Cheng 已提交
3310
                           STSRow** pTSRow) {
H
Haojun Liao 已提交
3311 3312
  SRowMerger merge = {0};

3313 3314 3315
  TSDBKEY k = TSDBROW_KEY(pRow);
  TSDBKEY ik = TSDBROW_KEY(piRow);

3316
  if (ASCENDING_TRAVERSE(pReader->order)) {  // ascending order imem --> mem
H
Haojun Liao 已提交
3317
    STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid);
3318

H
Haojun Liao 已提交
3319 3320 3321 3322 3323 3324 3325 3326 3327 3328
    int32_t code = tRowMergerInit(&merge, piRow, pSchema);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    code = doMergeRowsInBuf(&pBlockScanInfo->iiter, pBlockScanInfo->uid, ik.ts, pBlockScanInfo->delSkyline, &merge,
                            pReader);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }
3329

3330
    tRowMerge(&merge, pRow);
H
Haojun Liao 已提交
3331 3332 3333 3334 3335 3336
    code =
        doMergeRowsInBuf(&pBlockScanInfo->iter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge, pReader);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

3337
  } else {
H
Haojun Liao 已提交
3338
    STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid);
3339

H
Haojun Liao 已提交
3340
    int32_t code = tRowMergerInit(&merge, pRow, pSchema);
3341
    if (code != TSDB_CODE_SUCCESS || merge.pTSchema == NULL) {
H
Haojun Liao 已提交
3342 3343 3344 3345 3346 3347 3348 3349
      return code;
    }

    code =
        doMergeRowsInBuf(&pBlockScanInfo->iter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge, pReader);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }
3350 3351

    tRowMerge(&merge, piRow);
H
Haojun Liao 已提交
3352 3353 3354 3355 3356
    code = doMergeRowsInBuf(&pBlockScanInfo->iiter, pBlockScanInfo->uid, ik.ts, pBlockScanInfo->delSkyline, &merge,
                            pReader);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }
3357
  }
3358

3359 3360
  int32_t code = tRowMergerGetRow(&merge, pTSRow);
  return code;
3361 3362
}

3363 3364
/**
 * Fetch the next row, in traverse order, from the mem and imem buffers of a table.
 *
 * Rows at or beyond endKey (>= endKey ascending, <= endKey descending) are not returned. When both buffers hold a
 * candidate, the one that comes first in traverse order is taken; equal timestamps are merged into a single row.
 *
 * @param pBlockScanInfo scan info providing both iterators, the table uid and the delete list
 * @param pReader        reader context
 * @param pTSRow         out: resulting row; left untouched when there is no row to return
 * @param endKey         exclusive boundary of the scan in traverse direction
 * @param freeTSRow      out: whether *pTSRow must be freed by the caller
 * @return               TSDB_CODE_SUCCESS or an error code from the merge helpers
 */
int32_t tsdbGetNextRowInMem(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader, STSRow** pTSRow, int64_t endKey,
                            bool* freeTSRow) {
  SIterInfo* pMemIter = &pBlockScanInfo->iter;
  SIterInfo* pIMemIter = &pBlockScanInfo->iiter;

  TSDBROW* pRow = getValidMemRow(pMemIter, pBlockScanInfo->delSkyline, pReader);
  TSDBROW* piRow = getValidMemRow(pIMemIter, pBlockScanInfo->delSkyline, pReader);
  SArray*  pDelList = pBlockScanInfo->delSkyline;
  uint64_t uid = pBlockScanInfo->uid;

  // todo refactor
  bool asc = ASCENDING_TRAVERSE(pReader->order);

  // discard candidates that have reached the end key
  if (pMemIter->hasVal) {
    TSDBKEY memKey = TSDBROW_KEY(pRow);
    if (asc ? (memKey.ts >= endKey) : (memKey.ts <= endKey)) {
      pRow = NULL;
    }
  }

  if (pIMemIter->hasVal) {
    TSDBKEY imemKey = TSDBROW_KEY(piRow);
    if (asc ? (imemKey.ts >= endKey) : (imemKey.ts <= endKey)) {
      piRow = NULL;
    }
  }

  bool hasMemRow = pMemIter->hasVal && (pRow != NULL);
  bool hasIMemRow = pIMemIter->hasVal && (piRow != NULL);

  if (hasMemRow && hasIMemRow) {
    TSDBKEY k = TSDBROW_KEY(pRow);
    TSDBKEY ik = TSDBROW_KEY(piRow);

    if (ik.ts == k.ts) {
      *freeTSRow = true;
      return doMergeMemIMemRows(pRow, piRow, pBlockScanInfo, pReader, pTSRow);
    }

    // timestamps differ: take the row that comes first in traverse order
    bool imemFirst = asc ? (ik.ts < k.ts) : (ik.ts > k.ts);
    if (imemFirst) {
      return doMergeMemTableMultiRows(piRow, uid, pIMemIter, pDelList, pTSRow, pReader, freeTSRow);
    }

    return doMergeMemTableMultiRows(pRow, uid, pMemIter, pDelList, pTSRow, pReader, freeTSRow);
  }

  if (hasMemRow) {
    return doMergeMemTableMultiRows(pRow, pBlockScanInfo->uid, pMemIter, pDelList, pTSRow, pReader, freeTSRow);
  }

  if (hasIMemRow) {
    return doMergeMemTableMultiRows(piRow, uid, pIMemIter, pDelList, pTSRow, pReader, freeTSRow);
  }

  return TSDB_CODE_SUCCESS;
}

3420
/**
 * Append one STSRow to the result block as a new row.
 *
 * The columns of the result block and of the row schema are walked in parallel by column id. Result columns that
 * do not exist in the row schema are filled with NULL. On success pBlock->info.rows is incremented and
 * pScanInfo->lastKey is set to the row timestamp.
 *
 * @param pBlock    result block to append to
 * @param pReader   reader context (schema look-up, load support info)
 * @param pTSRow    row to append
 * @param pScanInfo scan info of the table the row belongs to
 * @return          TSDB_CODE_SUCCESS, or terrno when the schema of the row cannot be obtained (nothing is
 *                  appended in that case)
 */
int32_t doAppendRowFromTSRow(SSDataBlock* pBlock, STsdbReader* pReader, STSRow* pTSRow, STableBlockScanInfo* pScanInfo) {
  int32_t numOfRows = pBlock->info.rows;
  int32_t numOfCols = (int32_t)taosArrayGetSize(pBlock->pDataBlock);
  int64_t uid = pScanInfo->uid;

  SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo;
  STSchema*           pSchema = doGetSchemaForTSRow(pTSRow->sver, pReader, uid);
  if (pSchema == NULL) {
    // same convention as the other schema look-ups in this file: report terrno instead of dereferencing NULL
    return terrno;
  }

  SColVal colVal = {0};
  int32_t i = 0, j = 0;

  SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
  if (pColInfoData->info.colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
    colDataAppend(pColInfoData, numOfRows, (const char*)&pTSRow->ts, false);
    i += 1;
  }

  while (i < numOfCols && j < pSchema->numOfCols) {
    pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
    col_id_t colId = pColInfoData->info.colId;

    if (colId == pSchema->columns[j].colId) {
      tTSRowGetVal(pTSRow, pSchema, j, &colVal);
      doCopyColVal(pColInfoData, numOfRows, i, &colVal, pSupInfo);
      i += 1;
      j += 1;
    } else if (colId < pSchema->columns[j].colId) {
      // the requested column is missing in the row schema
      colDataAppendNULL(pColInfoData, numOfRows);
      i += 1;
    } else {
      // the schema column is not requested, skip it
      j += 1;
    }
  }

  // set null value since current column does not exist in the "pSchema"
  while (i < numOfCols) {
    pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
    colDataAppendNULL(pColInfoData, numOfRows);
    i += 1;
  }

  pBlock->info.rows += 1;
  pScanInfo->lastKey = pTSRow->ts;
  return TSDB_CODE_SUCCESS;
}

H
Hongze Cheng 已提交
3466 3467
/**
 * Append row `rowIndex` of a file block to the result block as a new row.
 *
 * Output columns and file-block columns are matched by column id; output columns without a counterpart in the
 * file block are filled with NULL. pResBlock->info.rows is incremented by one.
 *
 * @param pResBlock  result block to append to
 * @param pReader    reader context (load support info)
 * @param pBlockData file block data to read from
 * @param rowIndex   index of the row inside pBlockData
 * @return           always TSDB_CODE_SUCCESS
 */
int32_t doAppendRowFromFileBlock(SSDataBlock* pResBlock, STsdbReader* pReader, SBlockData* pBlockData,
                                 int32_t rowIndex) {
  int32_t outCol = 0;
  int32_t inCol = 0;
  int32_t outRow = pResBlock->info.rows;

  SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo;

  // the timestamp column, when requested, comes first in the output
  SColumnInfoData* pTsCol = taosArrayGet(pResBlock->pDataBlock, outCol);
  if (pTsCol->info.colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
    colDataAppendInt64(pTsCol, outRow, &pBlockData->aTSKEY[rowIndex]);
    outCol += 1;
  }

  SColVal cv = {0};
  int32_t numOfInputCols = pBlockData->aIdx->size;
  int32_t numOfOutputCols = pResBlock->pDataBlock->size;

  while (outCol < numOfOutputCols && inCol < numOfInputCols) {
    SColumnInfoData* pCol = TARRAY_GET_ELEM(pResBlock->pDataBlock, outCol);
    SColData*        pData = tBlockDataGetColDataByIdx(pBlockData, inCol);

    if (pData->cid < pCol->info.colId) {
      // the file column is not requested, skip it
      inCol += 1;
    } else if (pData->cid == pCol->info.colId) {
      tColDataGetValue(pData, rowIndex, &cv);
      doCopyColVal(pCol, outRow, outCol, &cv, pSupInfo);
      inCol += 1;
      outCol += 1;
    } else {
      // the specified column does not exist in file block, fill with null data
      colDataAppendNULL(pCol, outRow);
      outCol += 1;
    }
  }

  // remaining output columns have no data in the file block
  for (; outCol < numOfOutputCols; outCol += 1) {
    SColumnInfoData* pCol = taosArrayGet(pResBlock->pDataBlock, outCol);
    colDataAppendNULL(pCol, outRow);
  }

  pResBlock->info.rows += 1;
  return TSDB_CODE_SUCCESS;
}

3514 3515
/**
 * Fill pReader->pResBlock with rows taken from the mem/imem buffers of one table.
 *
 * Rows are appended until the buffers have no row before endKey, both iterators are exhausted, or the block holds
 * `capacity` rows.
 *
 * @param pBlockScanInfo scan info of the table to read
 * @param endKey         boundary passed on to tsdbGetNextRowInMem()
 * @param capacity       maximum number of rows in the result block
 * @param pReader        reader context owning the result block
 * @return               TSDB_CODE_SUCCESS, or the first error reported while fetching or appending a row
 */
int32_t buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, int64_t endKey, int32_t capacity,
                                  STsdbReader* pReader) {
  SSDataBlock* pBlock = pReader->pResBlock;

  do {
    STSRow* pTSRow = NULL;
    bool    freeTSRow = false;

    int32_t code = tsdbGetNextRowInMem(pBlockScanInfo, pReader, &pTSRow, endKey, &freeTSRow);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    if (pTSRow == NULL) {
      break;
    }

    code = doAppendRowFromTSRow(pBlock, pReader, pTSRow, pBlockScanInfo);

    // a merged row was allocated for us, release it whether or not the append succeeded
    if (freeTSRow) {
      taosMemoryFree(pTSRow);
    }

    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    // no data in buffer, return immediately
    if (!(pBlockScanInfo->iter.hasVal || pBlockScanInfo->iiter.hasVal)) {
      break;
    }

    if (pBlock->info.rows >= capacity) {
      break;
    }
  } while (1);

  ASSERT(pBlock->info.rows <= capacity);
  return TSDB_CODE_SUCCESS;
}
H
Hongze Cheng 已提交
3545

3546 3547
// TODO refactor: with createDataBlockScanInfo
/**
 * Replace the set of tables scanned by an existing reader.
 *
 * Every scan info currently registered in pReader->status.pTableMap is cleared, the map is emptied, and the first
 * `num` slots of pReader->blockInfoBuf are re-registered under the uids found in pTableList.
 *
 * @param pReader    reader to update, must not be NULL
 * @param pTableList array of STableKeyInfo with `num` entries
 * @param num        number of tables; must not exceed the number of tables the reader was holding before
 *                   (asserted, see the todo below)
 * @return           always TSDB_CODE_SUCCESS
 */
int32_t tsdbSetTableList(STsdbReader* pReader, const void* pTableList, int32_t num) {
  ASSERT(pReader != NULL);
  int32_t size = taosHashGetSize(pReader->status.pTableMap);

  STableBlockScanInfo** p = NULL;
  while ((p = taosHashIterate(pReader->status.pTableMap, p)) != NULL) {
    clearBlockScanInfo(*p);
  }

  // todo handle the case where size is less than the value of num
  ASSERT(size >= num);

  taosHashClear(pReader->status.pTableMap);

  // the list is only read here, keep it const
  const STableKeyInfo* pList = pTableList;
  for (int32_t i = 0; i < num; ++i) {
    STableBlockScanInfo* pInfo = getPosInBlockInfoBuf(&pReader->blockInfoBuf, i);
    pInfo->uid = pList[i].uid;
    taosHashPut(pReader->status.pTableMap, &pInfo->uid, sizeof(uint64_t), &pInfo, POINTER_BYTES);
  }

  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
3571 3572 3573 3574 3575 3576
// Return metaGetIdx(pMeta), or NULL when no meta handle is given.
void* tsdbGetIdx(SMeta* pMeta) {
  if (pMeta != NULL) {
    return metaGetIdx(pMeta);
  }

  return NULL;
}
dengyihao's avatar
dengyihao 已提交
3577

dengyihao's avatar
dengyihao 已提交
3578 3579 3580 3581 3582 3583
// Return metaGetIvtIdx(pMeta), or NULL when no meta handle is given.
void* tsdbGetIvtIdx(SMeta* pMeta) {
  if (pMeta != NULL) {
    return metaGetIvtIdx(pMeta);
  }

  return NULL;
}
L
Liu Jicong 已提交
3584

H
Hongze Cheng 已提交
3585
// Return the upper bound (verRange.maxVer) of the version range this reader is allowed to see.
uint64_t getReaderMaxVersion(STsdbReader* pReader) {
  return pReader->verRange.maxVer;
}
3586

3587 3588 3589 3590 3591 3592 3593 3594 3595 3596 3597 3598 3599 3600 3601
/**
 * Prepare the file-set iterator and the data-block iterator of a reader and position it on the first block.
 *
 * @param pReader reader whose read snapshot (pReadSnap) has already been taken
 * @return        TSDB_CODE_SUCCESS when the snapshot has no files (loadFromFile is then cleared so only the
 *                in-memory buffers are read), otherwise the result of initForFirstBlockInFile()
 */
static int32_t doOpenReaderImpl(STsdbReader* pReader) {
  SReaderStatus* pStatus = &pReader->status;

  initFilesetIterator(&pStatus->fileIter, pReader->pReadSnap->fs.aDFileSet, pReader);
  resetDataBlockIterator(&pStatus->blockIter, pReader->order);

  if (pStatus->fileIter.numOfFiles != 0) {
    return initForFirstBlockInFile(pReader, &pStatus->blockIter);
  }

  // no data in files, let's try buffer in memory
  pStatus->loadFromFile = false;
  return TSDB_CODE_SUCCESS;
}

H
refact  
Hongze Cheng 已提交
3602
// ====================================== EXPOSED APIs ======================================
3603 3604
// Let an inner reader use the table map, schemas and read snapshot of the outer reader. The inner reader only
// has to deliver a single row, so its capacity is set to one.
static void shareOuterReaderState(STsdbReader* pInner, STsdbReader* pOuter) {
  pInner->capacity = 1;
  pInner->status.pTableMap = pOuter->status.pTableMap;
  pInner->pSchema = pOuter->pSchema;
  pInner->pMemSchema = pOuter->pMemSchema;
  pInner->pReadSnap = pOuter->pReadSnap;
}

/**
 * Create and open a tsdb reader for the given tables.
 *
 * For TIMEWINDOW_RANGE_EXTERNAL conditions two additional inner readers (capacity 1) are created for the data
 * before and after the query window; pCond->twindows is modified while doing so and pCond->order is restored.
 *
 * @param pVnode      vnode to read from
 * @param pCond       query condition; NOTE: its twindows field is modified for external-range queries
 * @param pTableList  array of STableKeyInfo
 * @param numOfTables number of entries in pTableList
 * @param ppReader    out: the created reader; set to NULL when the table map cannot be created
 * @param idstr       identifier used in log messages
 * @return            TSDB_CODE_SUCCESS or an error code
 *
 * NOTE(review): several failure paths leave the already created reader(s) in *ppReader without closing them, and
 * the doOpenReaderImpl() failures return without the error log; confirm who is expected to clean up.
 */
int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, void* pTableList, int32_t numOfTables,
                       STsdbReader** ppReader, const char* idstr) {
  STsdbReader* pReader = NULL;
  STimeWindow  window = pCond->twindows;  // the window as requested by the caller

  if (pCond->type == TIMEWINDOW_RANGE_EXTERNAL) {
    pCond->twindows.skey += 1;
    pCond->twindows.ekey -= 1;
  }

  int32_t code = tsdbReaderCreate(pVnode, pCond, ppReader, 4096, idstr);
  if (code != TSDB_CODE_SUCCESS) {
    goto _err;
  }

  // check for query time window
  pReader = *ppReader;
  if (isEmptyQueryTimeWindow(&pReader->window) && pCond->type == TIMEWINDOW_RANGE_CONTAINED) {
    tsdbDebug("%p query window not overlaps with the data set, no result returned, %s", pReader, pReader->idStr);
    return TSDB_CODE_SUCCESS;
  }

  if (pCond->type == TIMEWINDOW_RANGE_EXTERNAL) {
    // update the SQueryTableDataCond to create inner reader
    const int32_t order = pCond->order;
    const bool    ascQuery = (order == TSDB_ORDER_ASC);

    // first inner reader: scans in the opposite direction of the query
    if (ascQuery) {
      pCond->twindows.ekey = window.skey;
      pCond->twindows.skey = INT64_MIN;
      pCond->order = TSDB_ORDER_DESC;
    } else {
      pCond->twindows.skey = window.ekey;
      pCond->twindows.ekey = INT64_MAX;
      pCond->order = TSDB_ORDER_ASC;
    }

    // here we only need one more row, so the capacity is set to be ONE.
    code = tsdbReaderCreate(pVnode, pCond, &pReader->innerReader[0], 1, idstr);
    if (code != TSDB_CODE_SUCCESS) {
      goto _err;
    }

    // second inner reader: scans in the direction of the query
    // NOTE(review): on the descending path the window ends at window.ekey, not window.skey; confirm.
    if (ascQuery) {
      pCond->twindows.skey = window.ekey;
      pCond->twindows.ekey = INT64_MAX;
    } else {
      pCond->twindows.skey = INT64_MIN;
      pCond->twindows.ekey = window.ekey;
    }
    pCond->order = order;

    code = tsdbReaderCreate(pVnode, pCond, &pReader->innerReader[1], 1, idstr);
    if (code != TSDB_CODE_SUCCESS) {
      goto _err;
    }
  }

  // NOTE: the endVersion in pCond is the data version not schema version, so pCond->endVersion is not correct here.
  if (pCond->suid != 0) {
    pReader->pSchema = metaGetTbTSchema(pReader->pTsdb->pVnode->pMeta, pReader->suid, -1, 1);
    if (pReader->pSchema == NULL) {
      tsdbError("failed to get table schema, suid:%" PRIu64 ", ver:-1, %s", pReader->suid, pReader->idStr);
    }
  } else if (numOfTables > 0) {
    STableKeyInfo* pFirstTable = pTableList;
    pReader->pSchema = metaGetTbTSchema(pReader->pTsdb->pVnode->pMeta, pFirstTable->uid, -1, 1);
    if (pReader->pSchema == NULL) {
      tsdbError("failed to get table schema, uid:%" PRIu64 ", ver:-1, %s", pFirstTable->uid, pReader->idStr);
    }
  }

  // the scan info is built for the first inner reader when there is one, otherwise for the reader itself
  STsdbReader* pScanOwner = (pReader->innerReader[0] != NULL) ? pReader->innerReader[0] : pReader;

  pReader->status.pTableMap = createDataBlockScanInfo(pScanOwner, pTableList, numOfTables);
  if (pReader->status.pTableMap == NULL) {
    tsdbReaderClose(pReader);
    *ppReader = NULL;

    code = TSDB_CODE_TDB_OUT_OF_MEMORY;
    goto _err;
  }

  if (numOfTables > 0) {
    code = tsdbTakeReadSnap(pReader->pTsdb, &pReader->pReadSnap, pReader->idStr);
    if (code != TSDB_CODE_SUCCESS) {
      goto _err;
    }

    STsdbReader* pToOpen = pReader;
    if (pReader->type != TIMEWINDOW_RANGE_CONTAINED) {
      STsdbReader* pPrevReader = pReader->innerReader[0];
      STsdbReader* pNextReader = pReader->innerReader[1];

      // we need only one row
      shareOuterReaderState(pPrevReader, pReader);
      shareOuterReaderState(pNextReader, pReader);

      // only the first inner reader is opened here
      pToOpen = pPrevReader;
    }

    code = doOpenReaderImpl(pToOpen);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }
  }

  tsdbDebug("%p total numOfTable:%d in this query %s", pReader, numOfTables, pReader->idStr);
  return code;

_err:
  tsdbError("failed to create data reader, code:%s %s", tstrerror(code), idstr);
  return code;
}

// Release a reader together with every resource it owns. Passing NULL is a no-op.
//
// The two inner readers (previous-row / next-row) share the table map, the read snapshot and
// both schemas with the outer reader, so those pointers are cleared on the inner readers
// before they are closed recursively; the outer reader releases them exactly once below.
void tsdbReaderClose(STsdbReader* pReader) {
  if (pReader == NULL) {
    return;
  }

  // Handle each inner reader on its own: a partially constructed reader may own only one of them.
  for (int32_t i = 0; i < 2; ++i) {
    STsdbReader* pInner = pReader->innerReader[i];
    if (pInner == NULL) {
      continue;
    }

    pInner->status.pTableMap = NULL;
    pInner->pReadSnap = NULL;
    pInner->pSchema = NULL;
    pInner->pMemSchema = NULL;

    tsdbReaderClose(pInner);
    pReader->innerReader[i] = NULL;
  }

  SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo;

  taosMemoryFreeClear(pSupInfo->plist);
  taosMemoryFree(pSupInfo->colIds);

  taosArrayDestroy(pSupInfo->pColAgg);

  // buildBuf holds one optional buffer per result column; skip the walk if it was never allocated
  if (pSupInfo->buildBuf != NULL) {
    for (int32_t i = 0; i < blockDataGetNumOfCols(pReader->pResBlock); ++i) {
      if (pSupInfo->buildBuf[i] != NULL) {
        taosMemoryFreeClear(pSupInfo->buildBuf[i]);
      }
    }
  }

  taosMemoryFree(pSupInfo->buildBuf);
  tBlockDataDestroy(&pReader->status.fileBlockData, true);

  cleanupDataBlockIterator(&pReader->status.blockIter);

  // remember the table count for the cost summary before the map is destroyed
  size_t numOfTables = taosHashGetSize(pReader->status.pTableMap);
  destroyAllBlockScanInfo(pReader->status.pTableMap);
  blockDataDestroy(pReader->pResBlock);
  clearBlockScanInfoBuf(&pReader->blockInfoBuf);

  if (pReader->pFileReader != NULL) {
    tsdbDataFReaderClose(&pReader->pFileReader);
  }

  tsdbUntakeReadSnap(pReader->pTsdb, pReader->pReadSnap, pReader->idStr);

  taosMemoryFree(pReader->status.uidCheckInfo.tableUidList);
  SIOCostSummary* pCost = &pReader->cost;

  SFilesetIter* pFilesetIter = &pReader->status.fileIter;
  if (pFilesetIter->pLastBlockReader != NULL) {
    SLastBlockReader* pLReader = pFilesetIter->pLastBlockReader;
    tMergeTreeClose(&pLReader->mergeTree);

    // collect the last-block load statistics before the load info is destroyed
    getLastBlockLoadInfo(pLReader->pInfo, &pCost->lastBlockLoad, &pCost->lastBlockLoadTime);

    pLReader->pInfo = destroyLastBlockLoadInfo(pLReader->pInfo);
    taosMemoryFree(pLReader);
  }

  tsdbDebug("%p :io-cost summary: head-file:%" PRIu64 ", head-file time:%.2f ms, SMA:%" PRId64
            " SMA-time:%.2f ms, fileBlocks:%" PRId64
            ", fileBlocks-load-time:%.2f ms, "
            "build in-memory-block-time:%.2f ms, lastBlocks:%" PRId64
            ", lastBlocks-time:%.2f ms, composed-blocks:%" PRId64
            ", composed-blocks-time:%.2fms, STableBlockScanInfo size:%.2f Kb, creatTime:%.2f ms, %s",
            pReader, pCost->headFileLoad, pCost->headFileLoadTime, pCost->smaDataLoad, pCost->smaLoadTime,
            pCost->numOfBlocks, pCost->blockLoadTime, pCost->buildmemBlock, pCost->lastBlockLoad,
            pCost->lastBlockLoadTime, pCost->composedBlocks, pCost->buildComposedBlockTime,
            numOfTables * sizeof(STableBlockScanInfo) / 1000.0, pCost->createScanInfoList, pReader->idStr);

  taosMemoryFree(pReader->idStr);

  // pMemSchema may alias pSchema; decide whether it needs its own free while both pointers are still valid
  if (pReader->pMemSchema != pReader->pSchema) {
    taosMemoryFree(pReader->pMemSchema);
  }
  taosMemoryFree(pReader->pSchema);

  taosMemoryFreeClear(pReader);
}

3812
// Produce the next result block for a single reader.
// Returns true when pReader->pResBlock holds at least one row after the call; returns false
// when nothing is left or when building the block from files failed.
static bool doTsdbNextDataBlock(STsdbReader* pReader) {
  SSDataBlock* pResBlock = pReader->pResBlock;

  // drop whatever the previous call left in the result block
  blockDataCleanup(pResBlock);

  if (pReader->status.loadFromFile) {
    if (buildBlockFromFiles(pReader) != TSDB_CODE_SUCCESS) {
      return false;
    }

    if (pResBlock->info.rows > 0) {
      return true;
    }
  }

  // nothing (more) came from the files, let's try the buffer
  buildBlockFromBufferSequentially(pReader);
  return pResBlock->info.rows > 0;
}

3839 3840 3841 3842 3843
// Advance the reader to its next data block.
//
// For a reader with inner readers the scan runs in three steps tracked by pReader->step:
// the previous-row reader (innerReader[0]), the main reader, then the next-row reader
// (innerReader[1]).
//
// Returns true when a block with rows is available, false when the scan is exhausted or an
// error occurred. On error the failing code is stored in terrno; it must not be returned
// directly, because any non-zero code converts to `true` in this bool-returning function.
bool tsdbNextDataBlock(STsdbReader* pReader) {
  if (isEmptyQueryTimeWindow(&pReader->window)) {
    return false;
  }

  if (pReader->innerReader[0] != NULL && pReader->step == 0) {
    bool ret = doTsdbNextDataBlock(pReader->innerReader[0]);
    pReader->step = EXTERNAL_ROWS_PREV;
    if (ret) {
      return ret;
    }
  }

  if (pReader->step == EXTERNAL_ROWS_PREV) {
    // prepare for the main scan
    int32_t code = doOpenReaderImpl(pReader);
    resetAllDataBlockScanInfo(pReader->status.pTableMap, pReader->innerReader[0]->window.ekey);

    if (code != TSDB_CODE_SUCCESS) {
      terrno = code;
      return false;
    }

    pReader->step = EXTERNAL_ROWS_MAIN;
  }

  bool ret = doTsdbNextDataBlock(pReader);
  if (ret) {
    return ret;
  }

  if (pReader->innerReader[1] != NULL && pReader->step == EXTERNAL_ROWS_MAIN) {
    // prepare for the next row scan
    int32_t code = doOpenReaderImpl(pReader->innerReader[1]);
    resetAllDataBlockScanInfo(pReader->innerReader[1]->status.pTableMap, pReader->window.ekey);
    if (code != TSDB_CODE_SUCCESS) {
      terrno = code;
      return false;
    }

    bool ret1 = doTsdbNextDataBlock(pReader->innerReader[1]);
    pReader->step = EXTERNAL_ROWS_NEXT;
    if (ret1) {
      return ret1;
    }
  }

  return false;
}

3887
// Report whether the table identified by uid is part of this reader's table map.
// Returns false when the uid has no entry (or the entry is empty), true otherwise.
bool tsdbTableNextDataBlock(STsdbReader* pReader, uint64_t uid) {
  // The map stores STableBlockScanInfo* values, so the lookup yields a pointer to that stored
  // pointer. Check the lookup result itself before dereferencing it.
  STableBlockScanInfo** ppScanInfo =
      (STableBlockScanInfo**)taosHashGet(pReader->status.pTableMap, &uid, sizeof(uid));
  if (ppScanInfo == NULL || *ppScanInfo == NULL) {  // no data block for the table of given uid
    return false;
  }

  return true;
}

H
Haojun Liao 已提交
3896 3897 3898 3899 3900
// Copy the row count, table uid and time window of the reader's current result block
// into the caller-provided output slots.
static void setBlockInfo(const STsdbReader* pReader, int32_t* rows, uint64_t* uid, STimeWindow* pWindow) {
  ASSERT(pReader != NULL);

  const SSDataBlock* pResBlock = pReader->pResBlock;

  *rows = pResBlock->info.rows;
  *uid = pResBlock->info.uid;
  *pWindow = pResBlock->info.window;
}

H
Haojun Liao 已提交
3903
// Fetch rows/uid/time-window of the block that the last tsdbNextDataBlock() call produced.
// For an external-range reader the block lives in whichever reader is active for the current
// step: the main reader, the previous-row reader (innerReader[0]) or, for any other step,
// the next-row reader (innerReader[1]).
void tsdbRetrieveDataBlockInfo(const STsdbReader* pReader, int32_t* rows, uint64_t* uid, STimeWindow* pWindow) {
  const STsdbReader* pSource = pReader;

  if (pReader->type == TIMEWINDOW_RANGE_EXTERNAL && pReader->step != EXTERNAL_ROWS_MAIN) {
    pSource = (pReader->step == EXTERNAL_ROWS_PREV) ? pReader->innerReader[0] : pReader->innerReader[1];
  }

  setBlockInfo(pSource, rows, uid, pWindow);
}

3917
// Load the pre-computed per-column aggregates (SMA) of the current file block.
//
// On success *pBlockStatis is either NULL (no statistics are available for this block) or
// points at the reader's plist array, and *allHave tells whether every matched column
// contributed an aggregate. Returns TSDB_CODE_SUCCESS, or the error from tsdbReadBlockSma().
int32_t tsdbRetrieveDatablockSMA(STsdbReader* pReader, SColumnDataAgg*** pBlockStatis, bool* allHave) {
  int32_t code = 0;
  *allHave = false;

  // no statistics for external-range readers, and none for composed blocks
  if (pReader->type == TIMEWINDOW_RANGE_EXTERNAL || pReader->status.composedDataBlock) {
    *pBlockStatis = NULL;
    return TSDB_CODE_SUCCESS;
  }

  SFileDataBlockInfo* pFBlock = getCurrentBlockInfo(&pReader->status.blockIter);
  SDataBlk*           pBlock = getCurrentBlock(&pReader->status.blockIter);
  SBlockLoadSuppInfo* pSup = &pReader->suppInfo;

  if (!tDataBlkHasSma(pBlock)) {
    *pBlockStatis = NULL;
    return TSDB_CODE_SUCCESS;
  }

  code = tsdbReadBlockSma(pReader->pFileReader, pBlock, pSup->pColAgg);
  if (code != TSDB_CODE_SUCCESS) {
    tsdbDebug("vgId:%d, failed to load block SMA for uid %" PRIu64 ", code:%s, %s", 0, pFBlock->uid, tstrerror(code),
              pReader->idStr);
    return code;
  }

  *allHave = true;

  // slot 0 always carries the primary timestamp column, derived from the block's time window
  SColumnDataAgg* pTsAgg = &pSup->tsColAgg;
  pTsAgg->numOfNull = 0;
  pTsAgg->colId = PRIMARYKEY_TIMESTAMP_COL_ID;
  pTsAgg->min = pReader->pResBlock->info.window.skey;
  pTsAgg->max = pReader->pResBlock->info.window.ekey;
  pSup->plist[0] = pTsAgg;

  // Merge-walk the loaded aggregates (index i) against the requested column ids (index j);
  // the walk only advances forward, so it relies on both sequences being ordered by column id.
  size_t  numOfCols = blockDataGetNumOfCols(pReader->pResBlock);
  size_t  numOfAggs = taosArrayGetSize(pSup->pColAgg);
  int32_t i = 0;
  int32_t j = 0;

  while (j < numOfCols && i < numOfAggs) {
    SColumnDataAgg* pAgg = taosArrayGet(pSup->pColAgg, i);

    if (pAgg->colId < pSup->colIds[j]) {
      i += 1;  // aggregate of a column that was not requested
    } else if (pSup->colIds[j] < pAgg->colId) {
      j += 1;  // requested column without an aggregate
    } else {
      // NOTE(review): the schema column is picked with i, the aggregate index, not a schema
      // position -- confirm this is the intended column.
      if (IS_BSMA_ON(&(pReader->pSchema->columns[i]))) {
        pSup->plist[j] = pAgg;
      } else {
        *allHave = false;
      }
      i += 1;
      j += 1;
    }
  }

  pReader->cost.smaDataLoad += 1;
  *pBlockStatis = pSup->plist;

  tsdbDebug("vgId:%d, succeed to load block SMA for uid %" PRIu64 ", %s", 0, pFBlock->uid, pReader->idStr);

  return code;
}

3993
// Materialize the column data of the current block for one reader.
//
// A composed block is already sitting in pResBlock and is returned as is. Otherwise the
// current file block is loaded and copied into pResBlock.
// Returns the column array of pResBlock, or NULL with terrno set on failure.
static SArray* doRetrieveDataBlock(STsdbReader* pReader) {
  SReaderStatus* pStatus = &pReader->status;

  if (pStatus->composedDataBlock) {
    return pReader->pResBlock->pDataBlock;
  }

  SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(&pStatus->blockIter);

  // The map stores STableBlockScanInfo* values, so the lookup yields a pointer to the stored
  // pointer. Validate the lookup result itself before dereferencing it; otherwise the
  // "uid not found" branch below can never be reached.
  STableBlockScanInfo** ppScanInfo =
      (STableBlockScanInfo**)taosHashGet(pStatus->pTableMap, &pBlockInfo->uid, sizeof(pBlockInfo->uid));
  if (ppScanInfo == NULL || *ppScanInfo == NULL) {
    terrno = TSDB_CODE_INVALID_PARA;
    tsdbError("failed to locate the uid:%" PRIu64 " in query table uid list, total tables:%d, %s", pBlockInfo->uid,
              (int32_t)taosHashGetSize(pReader->status.pTableMap), pReader->idStr);
    return NULL;
  }

  STableBlockScanInfo* pBlockScanInfo = *ppScanInfo;

  int32_t code = doLoadFileBlockData(pReader, &pStatus->blockIter, &pStatus->fileBlockData, pBlockScanInfo->uid);
  if (code != TSDB_CODE_SUCCESS) {
    tBlockDataDestroy(&pStatus->fileBlockData, 1);
    terrno = code;
    return NULL;
  }

  copyBlockDataToSDataBlock(pReader, pBlockScanInfo);
  return pReader->pResBlock->pDataBlock;
}

4020 4021 4022 4023 4024 4025 4026 4027 4028 4029 4030 4031
// Return the column data of the current block, taken from the reader that is active for the
// current step of an external-range scan (previous-row, main or next-row reader).
// pIdList is not used by this implementation.
SArray* tsdbRetrieveDataBlock(STsdbReader* pReader, SArray* pIdList) {
  STsdbReader* pTarget = pReader;

  if (pReader->type == TIMEWINDOW_RANGE_EXTERNAL) {
    switch (pReader->step) {
      case EXTERNAL_ROWS_PREV:
        pTarget = pReader->innerReader[0];
        break;
      case EXTERNAL_ROWS_NEXT:
        pTarget = pReader->innerReader[1];
        break;
      default:  // main scan: keep the outer reader
        break;
    }
  }

  return doRetrieveDataBlock(pTarget);
}

H
Haojun Liao 已提交
4032
// Rewind an opened reader so it can be scanned again with the order and time window of pCond.
// A reader with an empty query window, or one that never took a read snapshot, is left
// untouched and TSDB_CODE_SUCCESS is returned.
// Returns TSDB_CODE_SUCCESS, or the error raised while positioning on the first file block.
int32_t tsdbReaderReset(STsdbReader* pReader, SQueryTableDataCond* pCond) {
  if (isEmptyQueryTimeWindow(&pReader->window) || pReader->pReadSnap == NULL) {
    return TSDB_CODE_SUCCESS;
  }

  SReaderStatus*      pStatus = &pReader->status;
  SBlockLoadSuppInfo* pSupp = &pReader->suppInfo;
  SDataBlockIter*     pBlockIter = &pStatus->blockIter;

  pReader->order = pCond->order;
  pReader->type = TIMEWINDOW_RANGE_CONTAINED;
  pStatus->loadFromFile = true;
  pStatus->pTableIter = NULL;
  pReader->window = updateQueryTimeWindow(pReader->pTsdb, &pCond->twindows);

  // wipe the timestamp aggregate and the first plist slot
  memset(&pSupp->tsColAgg, 0, sizeof(SColumnDataAgg));
  memset(pSupp->plist, 0, POINTER_BYTES);

  pSupp->tsColAgg.colId = PRIMARYKEY_TIMESTAMP_COL_ID;
  tsdbDataFReaderClose(&pReader->pFileReader);

  int32_t numOfTables = taosHashGetSize(pStatus->pTableMap);

  initFilesetIterator(&pStatus->fileIter, pReader->pReadSnap->fs.aDFileSet, pReader);
  resetDataBlockIterator(pBlockIter, pReader->order);

  // start one tick outside of the window on the side the traversal begins from
  int64_t ts;
  if (ASCENDING_TRAVERSE(pReader->order)) {
    ts = pReader->window.skey - 1;
  } else {
    ts = pReader->window.ekey + 1;
  }
  resetAllDataBlockScanInfo(pStatus->pTableMap, ts);

  int32_t code = 0;

  if (pStatus->fileIter.numOfFiles != 0) {
    code = initForFirstBlockInFile(pReader, pBlockIter);
    if (code != TSDB_CODE_SUCCESS) {
      tsdbError("%p reset reader failed, numOfTables:%d, query range:%" PRId64 " - %" PRId64 " in query %s", pReader,
                numOfTables, pReader->window.skey, pReader->window.ekey, pReader->idStr);
      return code;
    }
  } else {
    // no data in files, let's try buffer in memory
    pStatus->loadFromFile = false;
  }

  tsdbDebug("%p reset reader, suid:%" PRIu64 ", numOfTables:%d, skey:%" PRId64 ", query range:%" PRId64 " - %" PRId64
            " in query %s",
            pReader, pReader->suid, numOfTables, pCond->twindows.skey, pReader->window.skey, pReader->window.ekey,
            pReader->idStr);

  return code;
}
H
Hongze Cheng 已提交
4081

4082 4083 4084
// Map a block's row count onto a histogram bucket.
//
// startRow    - row count at which bucket 0 begins
// bucketRange - number of row counts covered by one bucket
// numOfRows   - row count of the block being classified
//
// Returns (numOfRows - startRow) / bucketRange. Blocks at or below startRow land in bucket 0,
// and a non-positive bucketRange also yields bucket 0, so the result is never negative and
// the function never divides by zero. The upper bound is not clamped here.
static int32_t getBucketIndex(int32_t startRow, int32_t bucketRange, int32_t numOfRows) {
  if (bucketRange <= 0 || numOfRows <= startRow) {
    return 0;
  }

  return (numOfRows - startRow) / bucketRange;
}
H
Hongze Cheng 已提交
4085

4086 4087 4088 4089
// Walk every file block reachable by the reader and accumulate distribution statistics
// (row totals, min/max rows per block, small-block count, rows-per-block histogram) into
// pTableBlockInfo. totalSize and totalRows are reset here; the other counters are added to.
// Returns TSDB_CODE_SUCCESS, or the error raised while moving on to the next file.
int32_t tsdbGetFileBlocksDistInfo(STsdbReader* pReader, STableBlockDistInfo* pTableBlockInfo) {
  int32_t code = TSDB_CODE_SUCCESS;

  pTableBlockInfo->totalSize = 0;
  pTableBlockInfo->totalRows = 0;

  SReaderStatus* pStatus = &pReader->status;
  STsdbCfg*      pCfg = &pReader->pTsdb->pVnode->config.tsdbCfg;

  pTableBlockInfo->defMinRows = pCfg->minRows;
  pTableBlockInfo->defMaxRows = pCfg->maxRows;

  // the [minRows, maxRows] span is cut into 20 histogram buckets
  int32_t bucketRange = ceil((pCfg->maxRows - pCfg->minRows) / 20.0);

  pTableBlockInfo->numOfFiles += 1;

  int32_t numOfTables = (int32_t)taosHashGetSize(pStatus->pTableMap);
  int     smallBlockRows = 4096;  // blocks with fewer rows are counted as "small"

  SDataBlockIter* pBlockIter = &pStatus->blockIter;
  pTableBlockInfo->numOfFiles += pStatus->fileIter.numOfFiles;

  if (pBlockIter->numOfBlocks > 0) {
    pTableBlockInfo->numOfBlocks += pBlockIter->numOfBlocks;
  }

  pTableBlockInfo->numOfTables = numOfTables;

  bool hasNext = (pBlockIter->numOfBlocks > 0);

  for (;;) {
    if (!hasNext) {
      // current file is exhausted: move on to the next one, or stop when no file is left
      code = initForFirstBlockInFile(pReader, pBlockIter);
      if ((code != TSDB_CODE_SUCCESS) || (pReader->status.loadFromFile == false)) {
        break;
      }

      pTableBlockInfo->numOfBlocks += pBlockIter->numOfBlocks;
      hasNext = (pBlockIter->numOfBlocks > 0);
      continue;
    }

    SDataBlk* pBlock = getCurrentBlock(pBlockIter);
    int32_t   numOfRows = pBlock->nRow;

    pTableBlockInfo->totalRows += numOfRows;

    if (numOfRows > pTableBlockInfo->maxRows) {
      pTableBlockInfo->maxRows = numOfRows;
    }

    if (numOfRows < pTableBlockInfo->minRows) {
      pTableBlockInfo->minRows = numOfRows;
    }

    if (numOfRows < smallBlockRows) {
      pTableBlockInfo->numOfSmallBlocks += 1;
    }

    // NOTE(review): the index is used unchecked; a block holding exactly maxRows rows can map
    // to bucket 20 -- confirm blockRowsHisto is large enough for that.
    int32_t bucketIndex = getBucketIndex(pTableBlockInfo->defMinRows, bucketRange, numOfRows);
    pTableBlockInfo->blockRowsHisto[bucketIndex]++;

    hasNext = blockIteratorNext(&pStatus->blockIter, pReader->idStr);
  }

  return code;
}
H
Hongze Cheng 已提交
4154

H
refact  
Hongze Cheng 已提交
4155
// Count the rows that the reader's tables hold in the mem-table and the immutable mem-table
// captured by the reader's read snapshot.
// Returns the summed row count over every table in the reader's table map.
int64_t tsdbGetNumOfRowsInMemTable(STsdbReader* pReader) {
  int64_t rows = 0;

  SReaderStatus* pStatus = &pReader->status;
  STsdbReadSnap* pSnap = pReader->pReadSnap;

  pStatus->pTableIter = taosHashIterate(pStatus->pTableMap, NULL);

  while (pStatus->pTableIter != NULL) {
    STableBlockScanInfo* pBlockScanInfo = *(STableBlockScanInfo**)pStatus->pTableIter;

    // Test the snapshot's own pointers rather than the live pTsdb->mem / pTsdb->imem: the
    // snapshot is what gets dereferenced below, and the live pointers may differ from it.
    if (pSnap != NULL && pSnap->pMem != NULL) {
      STbData* d = tsdbGetTbDataFromMemTable(pSnap->pMem, pReader->suid, pBlockScanInfo->uid);
      if (d != NULL) {
        rows += tsdbGetNRowsInTbData(d);
      }
    }

    if (pSnap != NULL && pSnap->pIMem != NULL) {
      STbData* di = tsdbGetTbDataFromMemTable(pSnap->pIMem, pReader->suid, pBlockScanInfo->uid);
      if (di != NULL) {
        rows += tsdbGetNRowsInTbData(di);
      }
    }

    // current table is exhausted, let's try the next table
    pStatus->pTableIter = taosHashIterate(pStatus->pTableMap, pStatus->pTableIter);
  }

  return rows;
}
D
dapan1121 已提交
4186

L
Liu Jicong 已提交
4187
// Look up the schema of the table identified by uid.
//
// For a child table the schema version is taken from its super table and *suid receives the
// super table uid; for a normal table *suid is 0. *pSchema receives whatever
// metaGetTbTSchema() returns for that version.
// Returns TSDB_CODE_SUCCESS, or TSDB_CODE_TDB_INVALID_TABLE_ID (also stored in terrno) when a
// table entry cannot be read.
int32_t tsdbGetTableSchema(SVnode* pVnode, int64_t uid, STSchema** pSchema, int64_t* suid) {
  SMetaReader reader = {0};
  int32_t     schemaVer = 1;

  metaReaderInit(&reader, pVnode->pMeta, 0);
  if (metaGetTableEntryByUid(&reader, uid) != TSDB_CODE_SUCCESS) {
    goto _invalid_table;
  }

  *suid = 0;

  if (reader.me.type != TSDB_CHILD_TABLE) {
    ASSERT(reader.me.type == TSDB_NORMAL_TABLE);
    schemaVer = reader.me.ntbEntry.schemaRow.version;
  } else {
    // re-read with the super table uid; the decoder is cleared before the reader is reused
    tDecoderClear(&reader.coder);
    *suid = reader.me.ctbEntry.suid;
    if (metaGetTableEntryByUid(&reader, *suid) != TSDB_CODE_SUCCESS) {
      goto _invalid_table;
    }
    schemaVer = reader.me.stbEntry.schemaRow.version;
  }

  metaReaderClear(&reader);
  *pSchema = metaGetTbTSchema(pVnode->pMeta, uid, schemaVer, 1);

  return TSDB_CODE_SUCCESS;

_invalid_table:
  terrno = TSDB_CODE_TDB_INVALID_TABLE_ID;
  metaReaderClear(&reader);
  return terrno;
}
H
Hongze Cheng 已提交
4221

H
Haojun Liao 已提交
4222
// Take a consistent read snapshot of the tsdb: the current mem-table, the immutable mem-table
// and the file-system state, each referenced so they stay alive until tsdbUntakeReadSnap().
//
// pTsdb  - tsdb instance to snapshot
// ppSnap - receives the snapshot on success; set to NULL on failure
// idStr  - query id, used for tracing only
//
// Returns 0 on success or an error code. On failure every reference taken so far is dropped
// and the snapshot is freed, so the caller never sees a half-initialized snapshot.
int32_t tsdbTakeReadSnap(STsdb* pTsdb, STsdbReadSnap** ppSnap, const char* idStr) {
  int32_t code = 0;
  bool    fsReferenced = false;

  // alloc
  STsdbReadSnap* pSnap = (STsdbReadSnap*)taosMemoryCalloc(1, sizeof(STsdbReadSnap));
  if (pSnap == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _exit;
  }

  // lock
  code = taosThreadRwlockRdlock(&pTsdb->rwLock);
  if (code) {
    code = TAOS_SYSTEM_ERROR(code);
    goto _exit;
  }

  // take snapshot
  pSnap->pMem = pTsdb->mem;
  pSnap->pIMem = pTsdb->imem;

  if (pSnap->pMem) {
    tsdbRefMemTable(pSnap->pMem);
  }

  if (pSnap->pIMem) {
    tsdbRefMemTable(pSnap->pIMem);
  }

  // fs
  code = tsdbFSRef(pTsdb, &pSnap->fs);
  if (code) {
    taosThreadRwlockUnlock(&pTsdb->rwLock);
    goto _exit;
  }
  fsReferenced = true;

  // unlock
  code = taosThreadRwlockUnlock(&pTsdb->rwLock);
  if (code) {
    code = TAOS_SYSTEM_ERROR(code);
    goto _exit;
  }

  tsdbTrace("vgId:%d, take read snapshot, %s", TD_VID(pTsdb->pVnode), idStr);

_exit:
  if (code != 0 && pSnap != NULL) {
    // roll back: pMem/pIMem are still NULL (calloc) if the lock was never acquired
    if (pSnap->pMem) {
      tsdbUnrefMemTable(pSnap->pMem);
    }

    if (pSnap->pIMem) {
      tsdbUnrefMemTable(pSnap->pIMem);
    }

    if (fsReferenced) {
      tsdbFSUnref(pTsdb, &pSnap->fs);
    }

    taosMemoryFree(pSnap);
    pSnap = NULL;
  }

  *ppSnap = pSnap;
  return code;
}

H
Haojun Liao 已提交
4270
// Give back a read snapshot: drop the references held on the mem-table, the immutable
// mem-table and the file-system state, then free the snapshot itself.
// A NULL snapshot is accepted; the trace line is emitted in either case.
void tsdbUntakeReadSnap(STsdb* pTsdb, STsdbReadSnap* pSnap, const char* idStr) {
  if (pSnap != NULL) {
    if (pSnap->pMem != NULL) {
      tsdbUnrefMemTable(pSnap->pMem);
    }

    if (pSnap->pIMem != NULL) {
      tsdbUnrefMemTable(pSnap->pIMem);
    }

    tsdbFSUnref(pTsdb, &pSnap->fs);
    taosMemoryFree(pSnap);
  }

  tsdbTrace("vgId:%d, untake read snapshot, %s", TD_VID(pTsdb->pVnode), idStr);
}