tsdbRead.c 140.9 KB
Newer Older
H
hjxilinx 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

H
Haojun Liao 已提交
16
#include "osDef.h"
H
Hongze Cheng 已提交
17
#include "tsdb.h"
18

H
Hongze Cheng 已提交
19
#define ASCENDING_TRAVERSE(o) (o == TSDB_ORDER_ASC)
H
Hongze Cheng 已提交
20

21 22 23 24 25 26
/* Tag describing which group of externally supplied rows a block carries.
 * The values are sequential (1, 2, 3), not independent bit flags. */
typedef enum {
  EXTERNAL_ROWS_PREV = 1,
  EXTERNAL_ROWS_MAIN = 2,
  EXTERNAL_ROWS_NEXT = 3,
} EContentData;

27
typedef struct {
dengyihao's avatar
dengyihao 已提交
28
  STbDataIter* iter;
29 30 31 32
  int32_t      index;
  bool         hasVal;
} SIterInfo;

33 34
/* Counters produced while loading the block index of one file set. */
typedef struct {
  int32_t numOfBlocks;     /* data blocks that passed the time/version filters, over all tables */
  int32_t numOfLastFiles;  /* number of stt ("last") files in the current file set */
} SBlockNumber;

H
Haojun Liao 已提交
38
typedef struct STableBlockScanInfo {
dengyihao's avatar
dengyihao 已提交
39 40
  uint64_t  uid;
  TSKEY     lastKey;
H
Hongze Cheng 已提交
41 42 43 44 45 46 47 48
  SMapData  mapData;            // block info (compressed)
  SArray*   pBlockList;         // block data index list
  SIterInfo iter;               // mem buffer skip list iterator
  SIterInfo iiter;              // imem buffer skip list iterator
  SArray*   delSkyline;         // delete info for this table
  int32_t   fileDelIndex;       // file block delete index
  int32_t   lastBlockDelIndex;  // delete index for last block
  bool      iterInit;           // whether to initialize the in-memory skip list iterator or not
H
Haojun Liao 已提交
49 50 51
} STableBlockScanInfo;

typedef struct SBlockOrderWrapper {
dengyihao's avatar
dengyihao 已提交
52
  int64_t uid;
53
  int64_t offset;
H
Haojun Liao 已提交
54
} SBlockOrderWrapper;
H
Hongze Cheng 已提交
55 56

typedef struct SBlockOrderSupporter {
57 58 59 60
  SBlockOrderWrapper** pDataBlockInfo;
  int32_t*             indexPerTable;
  int32_t*             numOfBlocksPerTable;
  int32_t              numOfTables;
H
Hongze Cheng 已提交
61 62 63
} SBlockOrderSupporter;

typedef struct SIOCostSummary {
64 65 66
  int64_t numOfBlocks;
  double  blockLoadTime;
  double  buildmemBlock;
67
  int64_t headFileLoad;
68
  double  headFileLoadTime;
69
  int64_t smaDataLoad;
70
  double  smaLoadTime;
71 72
  int64_t lastBlockLoad;
  double  lastBlockLoadTime;
H
Haojun Liao 已提交
73 74
  int64_t composedBlocks;
  double  buildComposedBlockTime;
H
Haojun Liao 已提交
75
  double  createScanInfoList;
H
Hongze Cheng 已提交
76 77 78
} SIOCostSummary;

typedef struct SBlockLoadSuppInfo {
79
  SArray*          pColAgg;
80
  SColumnDataAgg   tsColAgg;
C
Cary Xu 已提交
81
  SColumnDataAgg** plist;
82
  int16_t*         colIds;  // column ids for loading file block data
83
  int32_t          numOfCols;
84
  char**           buildBuf;  // build string tmp buffer, todo remove it later after all string format being updated.
H
Hongze Cheng 已提交
85 86
} SBlockLoadSuppInfo;

87
typedef struct SLastBlockReader {
H
Hongze Cheng 已提交
88 89 90 91 92
  STimeWindow        window;
  SVersionRange      verRange;
  int32_t            order;
  uint64_t           uid;
  SMergeTree         mergeTree;
93
  SSttBlockLoadInfo* pInfo;
94 95
} SLastBlockReader;

96
typedef struct SFilesetIter {
H
Hongze Cheng 已提交
97 98 99
  int32_t           numOfFiles;  // number of total files
  int32_t           index;       // current accessed index in the list
  SArray*           pFileList;   // data file list
100
  int32_t           order;
H
Hongze Cheng 已提交
101
  SLastBlockReader* pLastBlockReader;  // last file block reader
102
} SFilesetIter;
H
Haojun Liao 已提交
103 104

typedef struct SFileDataBlockInfo {
105
  // index position in STableBlockScanInfo in order to check whether neighbor block overlaps with it
dengyihao's avatar
dengyihao 已提交
106
  uint64_t uid;
107
  int32_t  tbBlockIdx;
H
Haojun Liao 已提交
108 109 110
} SFileDataBlockInfo;

typedef struct SDataBlockIter {
111
  int32_t   numOfBlocks;
112
  int32_t   index;
H
Hongze Cheng 已提交
113
  SArray*   blockList;  // SArray<SFileDataBlockInfo>
114
  int32_t   order;
115
  SDataBlk  block;  // current SDataBlk data
116
  SHashObj* pTableMap;
H
Haojun Liao 已提交
117 118 119
} SDataBlockIter;

typedef struct SFileBlockDumpInfo {
dengyihao's avatar
dengyihao 已提交
120 121 122 123
  int32_t totalRows;
  int32_t rowIndex;
  int64_t lastKey;
  bool    allDumped;
H
Haojun Liao 已提交
124 125
} SFileBlockDumpInfo;

126
typedef struct SUidOrderCheckInfo {
127 128
  uint64_t* tableUidList;  // access table uid list in uid ascending order list
  int32_t   currentIndex;  // index in table uid list
129 130
} SUidOrderCheckInfo;

H
Haojun Liao 已提交
131
typedef struct SReaderStatus {
132
  bool                 loadFromFile;       // check file stage
133
  bool                 composedDataBlock;  // the returned data block is a composed block or not
134
  SHashObj*            pTableMap;          // SHash<STableBlockScanInfo>
135
  STableBlockScanInfo**pTableIter;         // table iterator used in building in-memory buffer data blocks.
136
  SUidOrderCheckInfo   uidCheckInfo;       // check all table in uid order
137
  SFileBlockDumpInfo   fBlockDumpInfo;
138 139 140 141
  SDFileSet*           pCurrentFileset;  // current opened file set
  SBlockData           fileBlockData;
  SFilesetIter         fileIter;
  SDataBlockIter       blockIter;
H
Haojun Liao 已提交
142 143
} SReaderStatus;

144 145 146 147 148 149
/* Bucketed storage for STableBlockScanInfo objects, allocated in batches. */
typedef struct SBlockInfoBuf {
  int32_t currentIndex;
  SArray* pData;         /* SArray<char*>: one calloc'ed bucket per element */
  int32_t numPerBucket;  /* STableBlockScanInfo slots per bucket */
} SBlockInfoBuf;

H
Hongze Cheng 已提交
150
struct STsdbReader {
H
Haojun Liao 已提交
151 152 153 154 155 156 157
  STsdb*             pTsdb;
  uint64_t           suid;
  int16_t            order;
  STimeWindow        window;  // the primary query time window that applies to all queries
  SSDataBlock*       pResBlock;
  int32_t            capacity;
  SReaderStatus      status;
158 159
  char*              idStr;  // query info handle, for debug purpose
  int32_t            type;   // query type: 1. retrieve all data blocks, 2. retrieve direct prev|next rows
H
Hongze Cheng 已提交
160
  SBlockLoadSuppInfo suppInfo;
H
Hongze Cheng 已提交
161
  STsdbReadSnap*     pReadSnap;
162
  SIOCostSummary     cost;
163 164
  STSchema*          pSchema;     // the newest version schema
  STSchema*          pMemSchema;  // the previous schema for in-memory data, to avoid load schema too many times
165 166
  SDataFReader*      pFileReader;
  SVersionRange      verRange;
167 168 169
  SBlockInfoBuf      blockInfoBuf;
  int32_t            step;
  STsdbReader*       innerReader[2];
H
Hongze Cheng 已提交
170
};
H
Hongze Cheng 已提交
171

H
Haojun Liao 已提交
172
static SFileDataBlockInfo* getCurrentBlockInfo(SDataBlockIter* pBlockIter);
173 174
static int      buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, int64_t endKey, int32_t capacity,
                                          STsdbReader* pReader);
175
static TSDBROW* getValidMemRow(SIterInfo* pIter, const SArray* pDelList, STsdbReader* pReader);
176 177
static int32_t  doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pScanInfo, STsdbReader* pReader,
                                        SRowMerger* pMerger);
H
Hongze Cheng 已提交
178
static int32_t  doMergeRowsInLastBlock(SLastBlockReader* pLastBlockReader, STableBlockScanInfo* pScanInfo, int64_t ts,
179
                                       SRowMerger* pMerger, SVersionRange* pVerRange);
180
static int32_t  doMergeRowsInBuf(SIterInfo* pIter, uint64_t uid, int64_t ts, SArray* pDelList, SRowMerger* pMerger,
dengyihao's avatar
dengyihao 已提交
181
                                 STsdbReader* pReader);
H
Haojun Liao 已提交
182
static int32_t  doAppendRowFromTSRow(SSDataBlock* pBlock, STsdbReader* pReader, STSRow* pTSRow, uint64_t uid);
183
static int32_t  doAppendRowFromFileBlock(SSDataBlock* pResBlock, STsdbReader* pReader, SBlockData* pBlockData,
H
Hongze Cheng 已提交
184
                                         int32_t rowIndex);
185
static void     setComposedBlockFlag(STsdbReader* pReader, bool composed);
186
static bool hasBeenDropped(const SArray* pDelList, int32_t* index, TSDBKEY* pKey, int32_t order, SVersionRange* pRange);
187

H
Hongze Cheng 已提交
188 189 190 191
static int32_t doMergeMemTableMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter, SArray* pDelList,
                                        STSRow** pTSRow, STsdbReader* pReader, bool* freeTSRow);
static int32_t doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo* pBlockScanInfo,
                                  STsdbReader* pReader, STSRow** pTSRow);
192 193
static int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pBlockScanInfo, int64_t key,
                                     STsdbReader* pReader);
194

dengyihao's avatar
dengyihao 已提交
195 196 197 198
static int32_t initDelSkylineIterator(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader, STbData* pMemTbData,
                                      STbData* piMemTbData);
static STsdb*  getTsdbByRetentions(SVnode* pVnode, TSKEY winSKey, SRetention* retentions, const char* idstr,
                                   int8_t* pLevel);
199
static SVersionRange getQueryVerRange(SVnode* pVnode, SQueryTableDataCond* pCond, int8_t level);
H
Hongze Cheng 已提交
200 201 202
static int64_t       getCurrentKeyInLastBlock(SLastBlockReader* pLastBlockReader);
static bool          hasDataInLastBlock(SLastBlockReader* pLastBlockReader);
static int32_t       doBuildDataBlock(STsdbReader* pReader);
H
Haojun Liao 已提交
203
static TSDBKEY       getCurrentKeyInBuf(STableBlockScanInfo* pScanInfo, STsdbReader* pReader);
204 205
static bool          hasDataInFileBlock(const SBlockData* pBlockData, const SFileBlockDumpInfo* pDumpInfo);
static bool          hasDataInLastBlock(SLastBlockReader* pLastBlockReader);
H
Haojun Liao 已提交
206

207 208
/* True when ts falls outside the closed interval [pWindow->skey, pWindow->ekey]. */
static bool outOfTimeWindow(int64_t ts, STimeWindow* pWindow) {
  bool inside = (ts >= pWindow->skey) && (ts <= pWindow->ekey);
  return !inside;
}

209 210 211
/*
 * Prepare pReader->suppInfo for the columns of the result block:
 *   - colIds[i] receives the column id of output column i;
 *   - buildBuf[i] receives a scratch buffer of info.bytes for every variable-length column (NULL otherwise),
 *     used by doCopyColVal to assemble var-data values.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY when any allocation fails. On failure every buffer
 * allocated here is released and colIds/buildBuf are reset to NULL (numOfCols to 0), so a later cleanup of the
 * reader cannot free them a second time.
 */
static int32_t setColumnIdSlotList(STsdbReader* pReader, SSDataBlock* pBlock) {
  SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo;

  int32_t numOfCols = (int32_t)blockDataGetNumOfCols(pBlock);

  pSupInfo->numOfCols = numOfCols;
  pSupInfo->colIds = taosMemoryMalloc(numOfCols * sizeof(int16_t));
  pSupInfo->buildBuf = taosMemoryCalloc(numOfCols, POINTER_BYTES);
  if (pSupInfo->buildBuf == NULL || pSupInfo->colIds == NULL) {
    goto _err;
  }

  for (int32_t i = 0; i < numOfCols; ++i) {
    SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, i);
    pSupInfo->colIds[i] = pCol->info.colId;

    if (IS_VAR_DATA_TYPE(pCol->info.type)) {
      pSupInfo->buildBuf[i] = taosMemoryMalloc(pCol->info.bytes);
      if (pSupInfo->buildBuf[i] == NULL) {
        goto _err;
      }
    }
  }

  return TSDB_CODE_SUCCESS;

_err:
  // buildBuf was calloc'ed, so slots that were never allocated are NULL and safe to free
  if (pSupInfo->buildBuf != NULL) {
    for (int32_t i = 0; i < numOfCols; ++i) {
      taosMemoryFree(pSupInfo->buildBuf[i]);
    }
  }

  taosMemoryFree(pSupInfo->buildBuf);
  taosMemoryFree(pSupInfo->colIds);
  pSupInfo->buildBuf = NULL;
  pSupInfo->colIds = NULL;
  pSupInfo->numOfCols = 0;
  return TSDB_CODE_OUT_OF_MEMORY;
}
H
Hongze Cheng 已提交
234

235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278
/*
 * Pre-allocate zeroed storage for numOfTables STableBlockScanInfo objects. The storage is split into buckets of
 * pBuf->numPerBucket objects, plus one smaller bucket for the remainder. Each bucket pointer is appended to
 * pBuf->pData, which is created on first use. pBuf->numPerBucket must be > 0.
 *
 * Returns TSDB_CODE_SUCCESS or TSDB_CODE_OUT_OF_MEMORY. Buckets pushed before a failure remain owned by
 * pBuf->pData (released by clearBlockScanInfoBuf); a bucket that could not be pushed is freed here.
 */
static int32_t initBlockScanInfoBuf(SBlockInfoBuf* pBuf, int32_t numOfTables) {
  int32_t num = numOfTables / pBuf->numPerBucket;
  int32_t remainder = numOfTables % pBuf->numPerBucket;
  if (pBuf->pData == NULL) {
    pBuf->pData = taosArrayInit(num + 1, POINTER_BYTES);
    if (pBuf->pData == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }
  }

  // buckets [0, num) are full; an extra bucket of `remainder` slots follows when needed
  int32_t numOfBuckets = (remainder > 0) ? (num + 1) : num;
  for (int32_t i = 0; i < numOfBuckets; ++i) {
    int32_t numOfItems = (i < num) ? pBuf->numPerBucket : remainder;

    char* p = taosMemoryCalloc(numOfItems, sizeof(STableBlockScanInfo));
    if (p == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }

    if (taosArrayPush(pBuf->pData, &p) == NULL) {
      taosMemoryFree(p);  // not owned by pData yet
      return TSDB_CODE_OUT_OF_MEMORY;
    }
  }

  return TSDB_CODE_SUCCESS;
}

/*
 * Free every bucket allocated by initBlockScanInfoBuf, then the bucket array itself. pBuf->pData is reset to NULL
 * so that a repeated clear is harmless and a later initBlockScanInfoBuf creates a fresh array instead of pushing
 * into freed memory.
 */
static void clearBlockScanInfoBuf(SBlockInfoBuf* pBuf) {
  size_t num = taosArrayGetSize(pBuf->pData);
  for (size_t i = 0; i < num; ++i) {
    char** p = taosArrayGet(pBuf->pData, i);
    taosMemoryFree(*p);
  }

  pBuf->pData = taosArrayDestroy(pBuf->pData);
}

/*
 * Return the address of the index-th STableBlockScanInfo slot in the bucketed buffer. The slot lives in bucket
 * index / numPerBucket, at position index % numPerBucket.
 * Returns NULL when that bucket does not exist (for example, after a failed initBlockScanInfoBuf) instead of
 * dereferencing a NULL bucket pointer.
 */
static void* getPosInBlockInfoBuf(SBlockInfoBuf* pBuf, int32_t index) {
  int32_t bucketIndex = index / pBuf->numPerBucket;
  char**  pBucket = taosArrayGet(pBuf->pData, bucketIndex);
  if (pBucket == NULL || *pBucket == NULL) {
    return NULL;
  }

  STableBlockScanInfo* pBase = (STableBlockScanInfo*)(*pBucket);
  return pBase + (index % pBuf->numPerBucket);
}

// NOTE: speedup the whole processing by preparing the buffer for STableBlockScanInfo in batch model
279
static SHashObj* createDataBlockScanInfo(STsdbReader* pTsdbReader, const STableKeyInfo* idList, int32_t numOfTables) {
H
Haojun Liao 已提交
280
  // allocate buffer in order to load data blocks from file
281
  // todo use simple hash instead, optimize the memory consumption
282 283 284
  SHashObj* pTableMap =
      taosHashInit(numOfTables, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
  if (pTableMap == NULL) {
H
Haojun Liao 已提交
285 286 287
    return NULL;
  }

H
Haojun Liao 已提交
288
  int64_t st = taosGetTimestampUs();
289
  initBlockScanInfoBuf(&pTsdbReader->blockInfoBuf, numOfTables);
H
Haojun Liao 已提交
290

291
  for (int32_t j = 0; j < numOfTables; ++j) {
292 293 294 295 296 297 298 299 300 301 302 303 304 305
    STableBlockScanInfo* pScanInfo = getPosInBlockInfoBuf(&pTsdbReader->blockInfoBuf, j);
    pScanInfo->uid = idList[j].uid;
    if (ASCENDING_TRAVERSE(pTsdbReader->order)) {
      int64_t skey = pTsdbReader->window.skey;
      pScanInfo->lastKey = (skey > INT64_MIN) ? (skey - 1) : skey;
    } else {
      int64_t ekey = pTsdbReader->window.ekey;
      pScanInfo->lastKey = (ekey < INT64_MAX) ? (ekey + 1) : ekey;
    }

    taosHashPut(pTableMap, &pScanInfo->uid, sizeof(uint64_t), &pScanInfo, POINTER_BYTES);

#if 0
//    STableBlockScanInfo info = {.lastKey = 0, .uid = idList[j].uid};
306
    if (ASCENDING_TRAVERSE(pTsdbReader->order)) {
H
Haojun Liao 已提交
307
      int64_t skey = pTsdbReader->window.skey;
H
Hongze Cheng 已提交
308
      info.lastKey = (skey > INT64_MIN) ? (skey - 1) : skey;
wmmhello's avatar
wmmhello 已提交
309
    } else {
H
Haojun Liao 已提交
310
      int64_t ekey = pTsdbReader->window.ekey;
H
Hongze Cheng 已提交
311
      info.lastKey = (ekey < INT64_MAX) ? (ekey + 1) : ekey;
H
Haojun Liao 已提交
312
    }
wmmhello's avatar
wmmhello 已提交
313

314
    taosHashPut(pTableMap, &info.uid, sizeof(uint64_t), &info, sizeof(info));
315 316 317
#endif

    tsdbTrace("%p check table uid:%" PRId64 " from lastKey:%" PRId64 " %s", pTsdbReader, pScanInfo->uid, pScanInfo->lastKey,
318
              pTsdbReader->idStr);
H
Haojun Liao 已提交
319 320
  }

H
Haojun Liao 已提交
321 322 323 324
  pTsdbReader->cost.createScanInfoList = (taosGetTimestampUs() - st) / 1000.0;
  tsdbDebug("%p create %d tables scan-info, size:%.2f Kb, elapsed time:%.2f ms, %s", pTsdbReader, numOfTables,
            (sizeof(STableBlockScanInfo) * numOfTables) / 1024.0, pTsdbReader->cost.createScanInfoList,
            pTsdbReader->idStr);
325

326
  return pTableMap;
H
Hongze Cheng 已提交
327
}
H
Hongze Cheng 已提交
328

329 330
/*
 * Rewind every table scan info in pTableMap so the scan can restart at key ts.
 *
 * Both the mem and the imem iterators are destroyed, and both hasVal flags are cleared. Previously only the mem
 * iterator was destroyed, which leaked the imem iterator. iterInit is cleared, presumably so the iterators are
 * rebuilt on next use -- confirm. The cached delete skyline is dropped, and lastKey is set to ts.
 */
static void resetAllDataBlockScanInfo(SHashObj* pTableMap, int64_t ts) {
  STableBlockScanInfo** p = NULL;

  while ((p = taosHashIterate(pTableMap, p)) != NULL) {
    STableBlockScanInfo* pInfo = *p;

    pInfo->iterInit = false;
    pInfo->iter.hasVal = false;
    pInfo->iiter.hasVal = false;

    if (pInfo->iter.iter != NULL) {
      pInfo->iter.iter = tsdbTbDataIterDestroy(pInfo->iter.iter);
    }

    if (pInfo->iiter.iter != NULL) {
      pInfo->iiter.iter = tsdbTbDataIterDestroy(pInfo->iiter.iter);
    }

    pInfo->delSkyline = taosArrayDestroy(pInfo->delSkyline);
    pInfo->lastKey = ts;
  }
}

346 347 348
/*
 * Release every resource owned by one table scan info: the mem/imem iterators, the delete skyline, the block index
 * list and the compressed block map. The STableBlockScanInfo object itself is not freed.
 * Both hasVal flags are cleared, so neither cursor claims a row after its iterator has been destroyed.
 */
static void clearBlockScanInfo(STableBlockScanInfo* p) {
  p->iterInit = false;
  p->iter.hasVal = false;
  p->iiter.hasVal = false;

  if (p->iter.iter != NULL) {
    p->iter.iter = tsdbTbDataIterDestroy(p->iter.iter);
  }

  if (p->iiter.iter != NULL) {
    p->iiter.iter = tsdbTbDataIterDestroy(p->iiter.iter);
  }

  p->delSkyline = taosArrayDestroy(p->delSkyline);
  p->pBlockList = taosArrayDestroy(p->pBlockList);
  tMapDataClear(&p->mapData);
}
362

363
/*
 * Release the resources held by every scan info referenced from pTableMap, then destroy the map.
 * The map values are STableBlockScanInfo pointers into the reader's block-info buffer, so the iteration cursor is
 * an STableBlockScanInfo**. The scan-info objects themselves are not freed here.
 */
static void destroyAllBlockScanInfo(SHashObj* pTableMap) {
  STableBlockScanInfo** p = NULL;
  while ((p = taosHashIterate(pTableMap, p)) != NULL) {
    clearBlockScanInfo(*p);
  }

  taosHashCleanup(pTableMap);
}

372
/* A query window is empty when its end key precedes its start key. */
static bool isEmptyQueryTimeWindow(STimeWindow* pWindow) {
  ASSERT(pWindow != NULL);
  bool empty = (pWindow->ekey < pWindow->skey);
  return empty;
}
H
Hongze Cheng 已提交
376

377 378 379
/*
 * Clip the query window to the data retention (keep2) range, so that expired data is never returned to the client
 * even when it is still physically present. Only skey may change: it is raised to the oldest retained timestamp,
 * which is keep2 minutes before "now" plus one tick, in the tsdb's timestamp precision.
 */
static STimeWindow updateQueryTimeWindow(STsdb* pTsdb, STimeWindow* pWindow) {
  STsdbKeepCfg* pKeep = &pTsdb->keepCfg;

  int64_t nowTs = taosGetTimestamp(pKeep->precision);
  int64_t earliestTs = nowTs - (tsTickPerMin[pKeep->precision] * pKeep->keep2) + 1;  // needs to add one tick

  STimeWindow result = *pWindow;
  result.skey = (result.skey < earliestTs) ? earliestTs : result.skey;
  return result;
}
H
Hongze Cheng 已提交
392

H
Haojun Liao 已提交
393
/*
 * Shrink *capacity so that one output SSDataBlock (capacity rows of the summed column widths) stays within 2MB.
 *
 * The size product is computed in 64 bits: a 32-bit capacity * rowLen can overflow (undefined behaviour) and let
 * an oversized block through. The result is never below one row.
 */
static void limitOutputBufferSize(const SQueryTableDataCond* pCond, int32_t* capacity) {
  int64_t rowLen = 0;
  for (int32_t i = 0; i < pCond->numOfCols; ++i) {
    rowLen += pCond->colList[i].bytes;
  }

  // make sure the output SSDataBlock size be less than 2MB.
  const int64_t TWOMB = 2 * 1024 * 1024;
  if ((int64_t)(*capacity) * rowLen > TWOMB) {
    int64_t limited = TWOMB / rowLen;  // rowLen > 0 here, since the product exceeded TWOMB
    (*capacity) = (limited > 0) ? (int32_t)limited : 1;
  }
}

// init file iterator
407
static int32_t initFilesetIterator(SFilesetIter* pIter, SArray* aDFileSet, STsdbReader* pReader) {
H
Hongze Cheng 已提交
408
  size_t numOfFileset = taosArrayGetSize(aDFileSet);
409

410 411
  pIter->index = ASCENDING_TRAVERSE(pReader->order) ? -1 : numOfFileset;
  pIter->order = pReader->order;
H
Hongze Cheng 已提交
412
  pIter->pFileList = aDFileSet;
413
  pIter->numOfFiles = numOfFileset;
H
Haojun Liao 已提交
414

415 416 417 418
  if (pIter->pLastBlockReader == NULL) {
    pIter->pLastBlockReader = taosMemoryCalloc(1, sizeof(struct SLastBlockReader));
    if (pIter->pLastBlockReader == NULL) {
      int32_t code = TSDB_CODE_OUT_OF_MEMORY;
419
      tsdbError("failed to prepare the last block iterator, since:%s %s", tstrerror(code), pReader->idStr);
420 421
      return code;
    }
422 423
  }

424 425 426 427 428 429 430 431
  SLastBlockReader* pLReader = pIter->pLastBlockReader;
  pLReader->order = pReader->order;
  pLReader->window = pReader->window;
  pLReader->verRange = pReader->verRange;

  pLReader->uid = 0;
  tMergeTreeClose(&pLReader->mergeTree);

432
  if (pLReader->pInfo == NULL) {
433
    // here we ignore the first column, which is always be the primary timestamp column
434 435
    pLReader->pInfo =
        tCreateLastBlockLoadInfo(pReader->pSchema, &pReader->suppInfo.colIds[1], pReader->suppInfo.numOfCols - 1);
H
Haojun Liao 已提交
436 437 438 439
    if (pLReader->pInfo == NULL) {
      tsdbDebug("init fileset iterator failed, code:%s %s", tstrerror(terrno), pReader->idStr);
      return terrno;
    }
440 441
  }

442
  tsdbDebug("init fileset iterator, total files:%d %s", pIter->numOfFiles, pReader->idStr);
H
Haojun Liao 已提交
443 444 445
  return TSDB_CODE_SUCCESS;
}

446
/*
 * Advance to the next file set (in pIter->order) whose key range overlaps the query window, and open a data file
 * reader for it in pReader->pFileReader. Before moving on, it:
 *   - reads the last-block load statistics into pReader->cost;
 *   - resets the last block reader.
 *
 * Returns true when such a file set was found and opened. Returns false when:
 *   - the list is exhausted;
 *   - the remaining file sets lie entirely outside the query window;
 *   - opening a file reader fails. This error is logged and stored in terrno rather than silently dropped.
 */
static bool filesetIteratorNext(SFilesetIter* pIter, STsdbReader* pReader) {
  bool    asc = ASCENDING_TRAVERSE(pIter->order);
  int32_t step = asc ? 1 : -1;
  int32_t code = TSDB_CODE_SUCCESS;

  pIter->index += step;
  if ((asc && pIter->index >= pIter->numOfFiles) || ((!asc) && pIter->index < 0)) {
    return false;
  }

  SIOCostSummary* pSum = &pReader->cost;
  getLastBlockLoadInfo(pIter->pLastBlockReader->pInfo, &pSum->lastBlockLoad, &pSum->lastBlockLoadTime);

  pIter->pLastBlockReader->uid = 0;
  tMergeTreeClose(&pIter->pLastBlockReader->mergeTree);
  resetLastBlockLoadInfo(pIter->pLastBlockReader->pInfo);

  // check file the time range of coverage
  STimeWindow win = {0};

  while (1) {
    if (pReader->pFileReader != NULL) {
      tsdbDataFReaderClose(&pReader->pFileReader);
    }

    pReader->status.pCurrentFileset = (SDFileSet*)taosArrayGet(pIter->pFileList, pIter->index);

    code = tsdbDataFReaderOpen(&pReader->pFileReader, pReader->pTsdb, pReader->status.pCurrentFileset);
    if (code != TSDB_CODE_SUCCESS) {
      goto _err;
    }

    pReader->cost.headFileLoad += 1;

    int32_t fid = pReader->status.pCurrentFileset->fid;
    tsdbFidKeyRange(fid, pReader->pTsdb->keepCfg.days, pReader->pTsdb->keepCfg.precision, &win.skey, &win.ekey);

    // current file are no longer overlapped with query time window, ignore remain files
    if ((asc && win.skey > pReader->window.ekey) || (!asc && win.ekey < pReader->window.skey)) {
      tsdbDebug("%p remain files are not qualified for qrange:%" PRId64 "-%" PRId64 ", ignore, %s", pReader,
                pReader->window.skey, pReader->window.ekey, pReader->idStr);
      return false;
    }

    // this file ends before (asc) / starts after (desc) the query window: skip to the next one
    if ((asc && (win.ekey < pReader->window.skey)) || ((!asc) && (win.skey > pReader->window.ekey))) {
      pIter->index += step;
      if ((asc && pIter->index >= pIter->numOfFiles) || ((!asc) && pIter->index < 0)) {
        return false;
      }
      continue;
    }

    tsdbDebug("%p file found fid:%d for qrange:%" PRId64 "-%" PRId64 ", %s", pReader, fid, pReader->window.skey,
              pReader->window.ekey, pReader->idStr);
    return true;
  }

_err:
  tsdbError("%p failed to open data file reader, code:%s %s", pReader, tstrerror(code), pReader->idStr);
  terrno = code;
  return false;
}

506
/* Rewind the block iterator to "before the first block" for the given order, reusing the block list when present. */
static void resetDataBlockIterator(SDataBlockIter* pIter, int32_t order) {
  pIter->numOfBlocks = 0;
  pIter->index = -1;
  pIter->order = order;

  if (pIter->blockList != NULL) {
    taosArrayClear(pIter->blockList);
    return;
  }

  pIter->blockList = taosArrayInit(4, sizeof(SFileDataBlockInfo));
}

L
Liu Jicong 已提交
517
/* Release the block list owned by pIter. The pointer is reset to NULL so a repeated cleanup is harmless, and a
 * later resetDataBlockIterator allocates a new list instead of touching freed memory. */
static void cleanupDataBlockIterator(SDataBlockIter* pIter) { pIter->blockList = taosArrayDestroy(pIter->blockList); }
H
Haojun Liao 已提交
518

H
Haojun Liao 已提交
519
/* Start a fresh reader status in the file-scanning stage, with no in-memory table cursor. */
static void initReaderStatus(SReaderStatus* pStatus) {
  pStatus->loadFromFile = true;
  pStatus->pTableIter = NULL;
}

524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546
/*
 * Create the result block: one column per entry of pCond->colList, with room for `capacity` rows.
 *
 * Returns the block, or NULL with terrno set on failure. On failure the partially built block is released with
 * blockDataDestroy, so its column array and any column buffers already allocated are freed too. Plain
 * taosMemoryFree would leak them.
 */
static SSDataBlock* createResBlock(SQueryTableDataCond* pCond, int32_t capacity) {
  SSDataBlock* pResBlock = createDataBlock();
  if (pResBlock == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  int32_t code = TSDB_CODE_SUCCESS;
  for (int32_t i = 0; i < pCond->numOfCols; ++i) {
    SColumnInfoData colInfo = {{0}, 0};
    colInfo.info = pCond->colList[i];
    code = blockDataAppendColInfo(pResBlock, &colInfo);
    if (code != TSDB_CODE_SUCCESS) {
      goto _err;
    }
  }

  code = blockDataEnsureCapacity(pResBlock, capacity);
  if (code != TSDB_CODE_SUCCESS) {
    goto _err;
  }

  return pResBlock;

_err:
  terrno = code;
  blockDataDestroy(pResBlock);
  return NULL;
}

547 548
/*
 * Allocate and initialise a reader for the query described by pCond:
 *   - pick the tsdb instance by retention level;
 *   - clip the query window to the retention range;
 *   - bound the result-block capacity;
 *   - allocate the load buffers and the result block.
 *
 * On success *ppReader receives the reader and TSDB_CODE_SUCCESS is returned. On failure the partially built
 * reader is released with tsdbReaderClose, *ppReader is set to NULL and the error code is returned. This now
 * includes a failure while preparing the column-id / var-data buffers, which was previously ignored.
 */
static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, STsdbReader** ppReader, int32_t capacity,
                                const char* idstr) {
  int32_t      code = 0;
  int8_t       level = 0;
  STsdbReader* pReader = (STsdbReader*)taosMemoryCalloc(1, sizeof(*pReader));
  if (pReader == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _end;
  }

  if (VND_IS_TSMA(pVnode)) {
    tsdbDebug("vgId:%d, tsma is selected to query", TD_VID(pVnode));
  }

  initReaderStatus(&pReader->status);

  pReader->pTsdb = getTsdbByRetentions(pVnode, pCond->twindows.skey, pVnode->config.tsdbCfg.retentions, idstr, &level);
  pReader->suid = pCond->suid;
  pReader->order = pCond->order;
  pReader->capacity = capacity;
  pReader->idStr = (idstr != NULL) ? strdup(idstr) : NULL;
  pReader->verRange = getQueryVerRange(pVnode, pCond, level);
  pReader->type = pCond->type;
  pReader->window = updateQueryTimeWindow(pReader->pTsdb, &pCond->twindows);
  pReader->blockInfoBuf.numPerBucket = 1000;  // 1000 tables per bucket
  ASSERT(pCond->numOfCols > 0);

  limitOutputBufferSize(pCond, &pReader->capacity);

  // allocate buffer in order to load data blocks from file
  SBlockLoadSuppInfo* pSup = &pReader->suppInfo;
  pSup->pColAgg = taosArrayInit(4, sizeof(SColumnDataAgg));
  pSup->plist = taosMemoryCalloc(pCond->numOfCols, POINTER_BYTES);
  if (pSup->pColAgg == NULL || pSup->plist == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _end;
  }

  pSup->tsColAgg.colId = PRIMARYKEY_TIMESTAMP_COL_ID;

  code = tBlockDataCreate(&pReader->status.fileBlockData);
  if (code != TSDB_CODE_SUCCESS) {
    terrno = code;
    goto _end;
  }

  pReader->pResBlock = createResBlock(pCond, pReader->capacity);
  if (pReader->pResBlock == NULL) {
    code = terrno;
    goto _end;
  }

  code = setColumnIdSlotList(pReader, pReader->pResBlock);
  if (code != TSDB_CODE_SUCCESS) {
    goto _end;
  }

  *ppReader = pReader;
  return code;

_end:
  tsdbReaderClose(pReader);
  *ppReader = NULL;
  return code;
}
H
Hongze Cheng 已提交
609

H
Haojun Liao 已提交
610
/*
 * Read the block index (SBlockIdx list) of the current data file. Every entry that:
 *   - belongs to pReader->suid, and
 *   - belongs to a table present in pReader->status.pTableMap
 * is appended to pIndexList, and that table's pBlockList is created if still missing.
 *
 * The hash lookup result is checked for NULL *before* it is dereferenced. Previously a table not being queried
 * caused a NULL dereference.
 *
 * Returns TSDB_CODE_SUCCESS, the error from tsdbReadBlockIdx, or TSDB_CODE_OUT_OF_MEMORY.
 */
static int32_t doLoadBlockIndex(STsdbReader* pReader, SDataFReader* pFileReader, SArray* pIndexList) {
  SArray* aBlockIdx = taosArrayInit(8, sizeof(SBlockIdx));
  if (aBlockIdx == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  int64_t st = taosGetTimestampUs();
  int32_t code = tsdbReadBlockIdx(pFileReader, aBlockIdx);
  if (code != TSDB_CODE_SUCCESS) {
    goto _end;
  }

  size_t num = taosArrayGetSize(aBlockIdx);
  if (num == 0) {
    taosArrayDestroy(aBlockIdx);
    return TSDB_CODE_SUCCESS;
  }

  int64_t et1 = taosGetTimestampUs();

  for (int32_t i = 0; i < num; ++i) {
    SBlockIdx* pBlockIdx = (SBlockIdx*)taosArrayGet(aBlockIdx, i);

    // uid check
    if (pBlockIdx->suid != pReader->suid) {
      continue;
    }

    // this block belongs to a table that is not queried.
    STableBlockScanInfo** pp = taosHashGet(pReader->status.pTableMap, &pBlockIdx->uid, sizeof(uint64_t));
    if (pp == NULL) {
      continue;
    }

    STableBlockScanInfo* pScanInfo = *pp;
    if (pScanInfo->pBlockList == NULL) {
      pScanInfo->pBlockList = taosArrayInit(4, sizeof(int32_t));
      if (pScanInfo->pBlockList == NULL) {
        code = TSDB_CODE_OUT_OF_MEMORY;
        goto _end;
      }
    }

    if (taosArrayPush(pIndexList, pBlockIdx) == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto _end;
    }
  }

  int64_t et2 = taosGetTimestampUs();
  tsdbDebug("load block index for %d tables completed, elapsed time:%.2f ms, set blockIdx:%.2f ms, size:%.2f Kb %s",
            (int32_t)num, (et1 - st) / 1000.0, (et2 - et1) / 1000.0, num * sizeof(SBlockIdx) / 1024.0, pReader->idStr);

  pReader->cost.headFileLoadTime += (et1 - st) / 1000.0;

_end:
  taosArrayDestroy(aBlockIdx);
  return code;
}
H
Hongze Cheng 已提交
660

661
/*
 * Prepare every table scan info for a new data file: drop the compressed block map and empty the list of
 * qualified block indices. The list array itself is kept for reuse.
 */
static void cleanupTableScanInfo(SHashObj* pTableMap) {
  STableBlockScanInfo** px = NULL;
  while ((px = taosHashIterate(pTableMap, px)) != NULL) {
    STableBlockScanInfo* pInfo = *px;
    tMapDataClear(&pInfo->mapData);
    taosArrayClear(pInfo->pBlockList);
  }
}

675
/*
 * For every table in pIndexList (filled by doLoadBlockIndex), load its SDataBlk descriptors from the current file
 * into the table's mapData. The index j of every block that overlaps both the query time window and the query
 * version range is recorded in the table's pBlockList. pBlockNum->numOfBlocks is incremented per qualified block,
 * and numOfLastFiles is set to the file set's stt file count.
 *
 * pTableMap stores STableBlockScanInfo pointers, so taosHashGet yields an STableBlockScanInfo**. Previously it was
 * used directly as the struct, which wrote block data over the hash entry.
 *
 * Returns TSDB_CODE_SUCCESS, the error from tsdbReadDataBlk, or TSDB_CODE_OUT_OF_MEMORY.
 */
static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, SBlockNumber* pBlockNum) {
  int32_t numOfQTable = 0;
  size_t  sizeInDisk = 0;
  size_t  numOfTables = taosArrayGetSize(pIndexList);

  int64_t st = taosGetTimestampUs();
  cleanupTableScanInfo(pReader->status.pTableMap);

  for (int32_t i = 0; i < numOfTables; ++i) {
    SBlockIdx* pBlockIdx = taosArrayGet(pIndexList, i);

    STableBlockScanInfo** pp = taosHashGet(pReader->status.pTableMap, &pBlockIdx->uid, sizeof(int64_t));
    if (pp == NULL) {
      continue;  // not expected: doLoadBlockIndex only keeps tables present in the map
    }
    STableBlockScanInfo* pScanInfo = *pp;

    tMapDataReset(&pScanInfo->mapData);
    int32_t code = tsdbReadDataBlk(pReader->pFileReader, pBlockIdx, &pScanInfo->mapData);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    sizeInDisk += pScanInfo->mapData.nData;
    for (int32_t j = 0; j < pScanInfo->mapData.nItem; ++j) {
      SDataBlk block = {0};
      tMapDataGetItemByIdx(&pScanInfo->mapData, j, &block, tGetDataBlk);

      // 1. time range check
      if (block.minKey.ts > pReader->window.ekey || block.maxKey.ts < pReader->window.skey) {
        continue;
      }

      // 2. version range check
      if (block.minVer > pReader->verRange.maxVer || block.maxVer < pReader->verRange.minVer) {
        continue;
      }

      void* p = taosArrayPush(pScanInfo->pBlockList, &j);
      if (p == NULL) {
        tMapDataClear(&pScanInfo->mapData);
        return TSDB_CODE_OUT_OF_MEMORY;
      }

      pBlockNum->numOfBlocks += 1;
    }

    if (pScanInfo->pBlockList != NULL && taosArrayGetSize(pScanInfo->pBlockList) > 0) {
      numOfQTable += 1;
    }
  }

  pBlockNum->numOfLastFiles = pReader->pFileReader->pSet->nSttF;
  int32_t total = pBlockNum->numOfLastFiles + pBlockNum->numOfBlocks;

  double el = (taosGetTimestampUs() - st) / 1000.0;
  tsdbDebug(
      "load block of %ld tables completed, blocks:%d in %d tables, last-files:%d, block-info-size:%.2f Kb, elapsed "
      "time:%.2f ms %s",
      (long)numOfTables, pBlockNum->numOfBlocks, numOfQTable, pBlockNum->numOfLastFiles, sizeInDisk / 1024.0, el,
      pReader->idStr);

  pReader->cost.numOfBlocks += total;
  pReader->cost.headFileLoadTime += el;

  return TSDB_CODE_SUCCESS;
}
H
Hongze Cheng 已提交
735

736
static void setBlockAllDumped(SFileBlockDumpInfo* pDumpInfo, int64_t maxKey, int32_t order) {
737
  int32_t step = ASCENDING_TRAVERSE(order) ? 1 : -1;
738
  pDumpInfo->allDumped = true;
739
  pDumpInfo->lastKey = maxKey + step;
H
Haojun Liao 已提交
740 741
}

742 743
static void doCopyColVal(SColumnInfoData* pColInfoData, int32_t rowIndex, int32_t colIndex, SColVal* pColVal,
                         SBlockLoadSuppInfo* pSup) {
H
Haojun Liao 已提交
744
  if (IS_VAR_DATA_TYPE(pColVal->type)) {
H
Hongze Cheng 已提交
745
    if (!COL_VAL_IS_VALUE(pColVal)) {
H
Haojun Liao 已提交
746 747 748
      colDataAppendNULL(pColInfoData, rowIndex);
    } else {
      varDataSetLen(pSup->buildBuf[colIndex], pColVal->value.nData);
H
Haojun Liao 已提交
749
      ASSERT(pColVal->value.nData <= pColInfoData->info.bytes);
H
Haojun Liao 已提交
750 751 752 753
      memcpy(varDataVal(pSup->buildBuf[colIndex]), pColVal->value.pData, pColVal->value.nData);
      colDataAppend(pColInfoData, rowIndex, pSup->buildBuf[colIndex], false);
    }
  } else {
H
Hongze Cheng 已提交
754
    colDataAppend(pColInfoData, rowIndex, (const char*)&pColVal->value, !COL_VAL_IS_VALUE(pColVal));
H
Haojun Liao 已提交
755
  }
H
Haojun Liao 已提交
756 757
}

758
static SFileDataBlockInfo* getCurrentBlockInfo(SDataBlockIter* pBlockIter) {
759 760
  if (taosArrayGetSize(pBlockIter->blockList) == 0) {
    ASSERT(pBlockIter->numOfBlocks == taosArrayGetSize(pBlockIter->blockList));
761 762
    return NULL;
  }
763 764 765

  SFileDataBlockInfo* pBlockInfo = taosArrayGet(pBlockIter->blockList, pBlockIter->index);
  return pBlockInfo;
766 767
}

H
Hongze Cheng 已提交
768
static SDataBlk* getCurrentBlock(SDataBlockIter* pBlockIter) { return &pBlockIter->block; }
769

H
Haojun Liao 已提交
770 771 772 773 774 775 776 777 778 779 780 781 782 783 784 785 786 787 788 789 790 791 792 793 794 795 796 797 798 799 800 801 802 803 804 805 806 807 808 809 810 811 812 813 814 815 816 817 818 819 820 821 822 823 824 825 826 827 828 829 830 831 832 833 834 835 836 837 838 839 840 841 842
int32_t binarySearchForTs(char* pValue, int num, TSKEY key, int order) {
  int32_t midPos = -1;
  int32_t numOfRows;

  ASSERT(order == TSDB_ORDER_ASC || order == TSDB_ORDER_DESC);

  TSKEY*  keyList = (TSKEY*)pValue;
  int32_t firstPos = 0;
  int32_t lastPos = num - 1;

  if (order == TSDB_ORDER_DESC) {
    // find the first position which is smaller than the key
    while (1) {
      if (key >= keyList[firstPos]) return firstPos;
      if (key == keyList[lastPos]) return lastPos;

      if (key < keyList[lastPos]) {
        lastPos += 1;
        if (lastPos >= num) {
          return -1;
        } else {
          return lastPos;
        }
      }

      numOfRows = lastPos - firstPos + 1;
      midPos = (numOfRows >> 1) + firstPos;

      if (key < keyList[midPos]) {
        firstPos = midPos + 1;
      } else if (key > keyList[midPos]) {
        lastPos = midPos - 1;
      } else {
        break;
      }
    }

  } else {
    // find the first position which is bigger than the key
    while (1) {
      if (key <= keyList[firstPos]) return firstPos;
      if (key == keyList[lastPos]) return lastPos;

      if (key > keyList[lastPos]) {
        lastPos = lastPos + 1;
        if (lastPos >= num)
          return -1;
        else
          return lastPos;
      }

      numOfRows = lastPos - firstPos + 1;
      midPos = (numOfRows >> 1u) + firstPos;

      if (key < keyList[midPos]) {
        lastPos = midPos - 1;
      } else if (key > keyList[midPos]) {
        firstPos = midPos + 1;
      } else {
        break;
      }
    }
  }

  return midPos;
}

static int doBinarySearchKey(TSKEY* keyList, int num, int pos, TSKEY key, int order) {
  // start end position
  int s, e;
  s = pos;

  // check
H
Hongze Cheng 已提交
843
  assert(pos >= 0 && pos < num);
H
Haojun Liao 已提交
844 845 846 847
  assert(num > 0);

  if (order == TSDB_ORDER_ASC) {
    // find the first position which is smaller than the key
H
Hongze Cheng 已提交
848 849
    e = num - 1;
    if (key < keyList[pos]) return -1;
H
Haojun Liao 已提交
850 851
    while (1) {
      // check can return
H
Hongze Cheng 已提交
852 853 854
      if (key >= keyList[e]) return e;
      if (key <= keyList[s]) return s;
      if (e - s <= 1) return s;
H
Haojun Liao 已提交
855 856

      // change start or end position
H
Hongze Cheng 已提交
857
      int mid = s + (e - s + 1) / 2;
H
Haojun Liao 已提交
858 859
      if (keyList[mid] > key)
        e = mid;
H
Hongze Cheng 已提交
860
      else if (keyList[mid] < key)
H
Haojun Liao 已提交
861 862 863 864
        s = mid;
      else
        return mid;
    }
H
Hongze Cheng 已提交
865
  } else {  // DESC
H
Haojun Liao 已提交
866
    // find the first position which is bigger than the key
H
Hongze Cheng 已提交
867 868
    e = 0;
    if (key > keyList[pos]) return -1;
H
Haojun Liao 已提交
869 870
    while (1) {
      // check can return
H
Hongze Cheng 已提交
871 872 873
      if (key <= keyList[e]) return e;
      if (key >= keyList[s]) return s;
      if (s - e <= 1) return s;
H
Haojun Liao 已提交
874 875

      // change start or end position
H
Hongze Cheng 已提交
876
      int mid = s - (s - e + 1) / 2;
H
Haojun Liao 已提交
877 878
      if (keyList[mid] < key)
        e = mid;
H
Hongze Cheng 已提交
879
      else if (keyList[mid] > key)
H
Haojun Liao 已提交
880 881 882 883 884 885 886 887 888 889
        s = mid;
      else
        return mid;
    }
  }
}

int32_t getEndPosInDataBlock(STsdbReader* pReader, SBlockData* pBlockData, SDataBlk* pBlock, int32_t pos) {
  // NOTE: reverse the order to find the end position in data block
  int32_t endPos = -1;
H
Hongze Cheng 已提交
890
  bool    asc = ASCENDING_TRAVERSE(pReader->order);
H
Haojun Liao 已提交
891 892 893 894 895 896 897 898 899 900 901 902

  if (asc && pReader->window.ekey >= pBlock->maxKey.ts) {
    endPos = pBlock->nRow - 1;
  } else if (!asc && pReader->window.skey <= pBlock->minKey.ts) {
    endPos = 0;
  } else {
    endPos = doBinarySearchKey(pBlockData->aTSKEY, pBlock->nRow, pos, pReader->window.ekey, pReader->order);
  }

  return endPos;
}

903
static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo) {
904
  SReaderStatus*  pStatus = &pReader->status;
905
  SDataBlockIter* pBlockIter = &pStatus->blockIter;
H
Hongze Cheng 已提交
906

907
  SBlockData*         pBlockData = &pStatus->fileBlockData;
H
Haojun Liao 已提交
908
  SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(pBlockIter);
H
Hongze Cheng 已提交
909
  SDataBlk*           pBlock = getCurrentBlock(pBlockIter);
H
Haojun Liao 已提交
910
  SSDataBlock*        pResBlock = pReader->pResBlock;
911
  int32_t             numOfOutputCols = blockDataGetNumOfCols(pResBlock);
H
Haojun Liao 已提交
912

H
Haojun Liao 已提交
913
  SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo;
914
  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
H
Haojun Liao 已提交
915

H
Haojun Liao 已提交
916
  SColVal cv = {0};
917
  int64_t st = taosGetTimestampUs();
918 919
  bool    asc = ASCENDING_TRAVERSE(pReader->order);
  int32_t step = asc ? 1 : -1;
920

921 922
  if ((pDumpInfo->rowIndex == 0 && asc) || (pDumpInfo->rowIndex == pBlock->nRow - 1 && (!asc))) {
    if (asc && pReader->window.skey <= pBlock->minKey.ts) {
923 924 925
      // pDumpInfo->rowIndex = 0;
    } else if (!asc && pReader->window.ekey >= pBlock->maxKey.ts) {
      // pDumpInfo->rowIndex = pBlock->nRow - 1;
926 927 928 929 930
    } else {
      int32_t pos = asc ? pBlock->nRow - 1 : 0;
      int32_t order = (pReader->order == TSDB_ORDER_ASC) ? TSDB_ORDER_DESC : TSDB_ORDER_ASC;
      pDumpInfo->rowIndex = doBinarySearchKey(pBlockData->aTSKEY, pBlock->nRow, pos, pReader->window.skey, order);
    }
H
Haojun Liao 已提交
931 932 933
  }

  // time window check
934 935 936 937 938 939 940
  int32_t endIndex = getEndPosInDataBlock(pReader, pBlockData, pBlock, pDumpInfo->rowIndex);
  if (endIndex == -1) {
    setBlockAllDumped(pDumpInfo, pReader->window.ekey, pReader->order);
    return TSDB_CODE_SUCCESS;
  }

  endIndex += step;
H
Haojun Liao 已提交
941
  int32_t remain = asc ? (endIndex - pDumpInfo->rowIndex) : (pDumpInfo->rowIndex - endIndex);
H
Hongze Cheng 已提交
942
  if (remain > pReader->capacity) {  // output buffer check
943 944 945
    remain = pReader->capacity;
  }

H
Haojun Liao 已提交
946 947
  int32_t rowIndex = 0;

H
Hongze Cheng 已提交
948
  int32_t          i = 0;
949 950
  SColumnInfoData* pColData = taosArrayGet(pResBlock->pDataBlock, i);
  if (pColData->info.colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
H
Haojun Liao 已提交
951
    if (asc) {
H
Haojun Liao 已提交
952
      memcpy(pColData->pData, &pBlockData->aTSKEY[pDumpInfo->rowIndex], remain * sizeof(int64_t));
H
Haojun Liao 已提交
953 954 955 956
    } else {
      for (int32_t j = pDumpInfo->rowIndex; rowIndex < remain; j += step) {
        colDataAppendInt64(pColData, rowIndex++, &pBlockData->aTSKEY[j]);
      }
957
    }
H
Haojun Liao 已提交
958

959 960 961
    i += 1;
  }

962 963 964
  int32_t colIndex = 0;
  int32_t num = taosArrayGetSize(pBlockData->aIdx);
  while (i < numOfOutputCols && colIndex < num) {
965 966 967
    rowIndex = 0;
    pColData = taosArrayGet(pResBlock->pDataBlock, i);

H
Hongze Cheng 已提交
968
    SColData* pData = tBlockDataGetColDataByIdx(pBlockData, colIndex);
969 970 971
    if (pData->cid < pColData->info.colId) {
      colIndex += 1;
    } else if (pData->cid == pColData->info.colId) {
H
Hongze Cheng 已提交
972
      if (pData->flag == HAS_NONE || pData->flag == HAS_NULL || pData->flag == (HAS_NULL | HAS_NONE)) {
H
Haojun Liao 已提交
973 974 975 976
        colDataAppendNNULL(pColData, 0, remain);
      } else {
        if (IS_NUMERIC_TYPE(pColData->info.type) && asc) {
          uint8_t* p = pData->pData + tDataTypes[pData->type].bytes * pDumpInfo->rowIndex;
H
Haojun Liao 已提交
977
          memcpy(pColData->pData, p, remain * tDataTypes[pData->type].bytes);
H
Haojun Liao 已提交
978 979 980 981

          // null value exists, check one-by-one
          if (pData->flag != HAS_VALUE) {
            for (int32_t j = pDumpInfo->rowIndex; rowIndex < remain; j += step, rowIndex++) {
H
Haojun Liao 已提交
982
              uint8_t v = tColDataGetBitValue(pData, j);
H
Haojun Liao 已提交
983 984 985 986 987 988 989 990 991 992 993
              if (v == 0 || v == 1) {
                colDataSetNull_f(pColData->nullbitmap, rowIndex);
              }
            }
          }
        } else {
          for (int32_t j = pDumpInfo->rowIndex; rowIndex < remain; j += step) {
            tColDataGetValue(pData, j, &cv);
            doCopyColVal(pColData, rowIndex++, i, &cv, pSupInfo);
          }
        }
H
Haojun Liao 已提交
994
      }
H
Haojun Liao 已提交
995

996
      colIndex += 1;
997
      i += 1;
998 999
    } else {  // the specified column does not exist in file block, fill with null data
      colDataAppendNNULL(pColData, 0, remain);
1000
      i += 1;
H
Haojun Liao 已提交
1001
    }
1002 1003
  }

1004
  // fill the mis-matched columns with null value
1005
  while (i < numOfOutputCols) {
1006 1007 1008
    pColData = taosArrayGet(pResBlock->pDataBlock, i);
    colDataAppendNNULL(pColData, 0, remain);
    i += 1;
H
Haojun Liao 已提交
1009
  }
H
Haojun Liao 已提交
1010

1011
  pResBlock->info.rows = remain;
1012
  pDumpInfo->rowIndex += step * remain;
1013

1014
  // check if current block are all handled
1015
  if (pDumpInfo->rowIndex >= 0 && pDumpInfo->rowIndex < pBlock->nRow) {
1016 1017 1018 1019
    int64_t ts = pBlockData->aTSKEY[pDumpInfo->rowIndex];
    if (outOfTimeWindow(ts, &pReader->window)) {  // the remain data has out of query time window, ignore current block
      setBlockAllDumped(pDumpInfo, ts, pReader->order);
    }
1020
  } else {
1021 1022
    int64_t ts = asc ? pBlock->maxKey.ts : pBlock->minKey.ts;
    setBlockAllDumped(pDumpInfo, ts, pReader->order);
H
Haojun Liao 已提交
1023
  }
H
Haojun Liao 已提交
1024

1025
  double elapsedTime = (taosGetTimestampUs() - st) / 1000.0;
H
Haojun Liao 已提交
1026
  pReader->cost.blockLoadTime += elapsedTime;
H
Haojun Liao 已提交
1027

1028
  int32_t unDumpedRows = asc ? pBlock->nRow - pDumpInfo->rowIndex : pDumpInfo->rowIndex + 1;
H
Haojun Liao 已提交
1029
  tsdbDebug("%p copy file block to sdatablock, global index:%d, table index:%d, brange:%" PRId64 "-%" PRId64
1030
                ", rows:%d, remain:%d, minVer:%" PRId64 ", maxVer:%" PRId64 ", elapsed time:%.2f ms, %s",
H
Hongze Cheng 已提交
1031 1032
            pReader, pBlockIter->index, pBlockInfo->tbBlockIdx, pBlock->minKey.ts, pBlock->maxKey.ts, remain,
            unDumpedRows, pBlock->minVer, pBlock->maxVer, elapsedTime, pReader->idStr);
1033 1034 1035 1036

  return TSDB_CODE_SUCCESS;
}

1037 1038
static int32_t doLoadFileBlockData(STsdbReader* pReader, SDataBlockIter* pBlockIter, SBlockData* pBlockData,
                                   uint64_t uid) {
1039 1040
  int64_t st = taosGetTimestampUs();

1041 1042
  tBlockDataReset(pBlockData);
  TABLEID tid = {.suid = pReader->suid, .uid = uid};
1043 1044
  int32_t code =
      tBlockDataInit(pBlockData, &tid, pReader->pSchema, &pReader->suppInfo.colIds[1], pReader->suppInfo.numOfCols - 1);
1045 1046 1047 1048
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

1049
  SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(pBlockIter);
1050
  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
1051
  ASSERT(pBlockInfo != NULL);
1052

H
Hongze Cheng 已提交
1053
  SDataBlk* pBlock = getCurrentBlock(pBlockIter);
1054
  code = tsdbReadDataBlock(pReader->pFileReader, pBlock, pBlockData);
1055 1056
  if (code != TSDB_CODE_SUCCESS) {
    tsdbError("%p error occurs in loading file block, global index:%d, table index:%d, brange:%" PRId64 "-%" PRId64
1057
                  ", rows:%d, code:%s %s",
1058
              pReader, pBlockIter->index, pBlockInfo->tbBlockIdx, pBlock->minKey.ts, pBlock->maxKey.ts, pBlock->nRow,
1059 1060 1061
              tstrerror(code), pReader->idStr);
    return code;
  }
1062

1063
  double elapsedTime = (taosGetTimestampUs() - st) / 1000.0;
1064

1065
  tsdbDebug("%p load file block into buffer, global index:%d, index in table block list:%d, brange:%" PRId64 "-%" PRId64
1066
                ", rows:%d, minVer:%" PRId64 ", maxVer:%" PRId64 ", elapsed time:%.2f ms, %s",
1067 1068
            pReader, pBlockIter->index, pBlockInfo->tbBlockIdx, pBlock->minKey.ts, pBlock->maxKey.ts, pBlock->nRow,
            pBlock->minVer, pBlock->maxVer, elapsedTime, pReader->idStr);
1069 1070 1071

  pReader->cost.blockLoadTime += elapsedTime;
  pDumpInfo->allDumped = false;
1072

H
Haojun Liao 已提交
1073
  return TSDB_CODE_SUCCESS;
H
Haojun Liao 已提交
1074
}
H
Hongze Cheng 已提交
1075

H
Haojun Liao 已提交
1076 1077 1078
static void cleanupBlockOrderSupporter(SBlockOrderSupporter* pSup) {
  taosMemoryFreeClear(pSup->numOfBlocksPerTable);
  taosMemoryFreeClear(pSup->indexPerTable);
H
Hongze Cheng 已提交
1079

H
Haojun Liao 已提交
1080 1081 1082 1083
  for (int32_t i = 0; i < pSup->numOfTables; ++i) {
    SBlockOrderWrapper* pBlockInfo = pSup->pDataBlockInfo[i];
    taosMemoryFreeClear(pBlockInfo);
  }
H
Hongze Cheng 已提交
1084

H
Haojun Liao 已提交
1085 1086
  taosMemoryFreeClear(pSup->pDataBlockInfo);
}
H
Hongze Cheng 已提交
1087

H
Haojun Liao 已提交
1088 1089
static int32_t initBlockOrderSupporter(SBlockOrderSupporter* pSup, int32_t numOfTables) {
  ASSERT(numOfTables >= 1);
H
Hongze Cheng 已提交
1090

H
Haojun Liao 已提交
1091
  pSup->numOfBlocksPerTable = taosMemoryCalloc(1, sizeof(int32_t) * numOfTables);
1092 1093
  pSup->indexPerTable = taosMemoryCalloc(1, sizeof(int32_t) * numOfTables);
  pSup->pDataBlockInfo = taosMemoryCalloc(1, POINTER_BYTES * numOfTables);
H
Hongze Cheng 已提交
1094

H
Haojun Liao 已提交
1095 1096 1097 1098
  if (pSup->numOfBlocksPerTable == NULL || pSup->indexPerTable == NULL || pSup->pDataBlockInfo == NULL) {
    cleanupBlockOrderSupporter(pSup);
    return TSDB_CODE_OUT_OF_MEMORY;
  }
H
Hongze Cheng 已提交
1099

H
Haojun Liao 已提交
1100 1101
  return TSDB_CODE_SUCCESS;
}
H
Hongze Cheng 已提交
1102

H
Haojun Liao 已提交
1103
static int32_t fileDataBlockOrderCompar(const void* pLeft, const void* pRight, void* param) {
1104
  int32_t leftIndex = *(int32_t*)pLeft;
H
Haojun Liao 已提交
1105
  int32_t rightIndex = *(int32_t*)pRight;
H
Hongze Cheng 已提交
1106

H
Haojun Liao 已提交
1107
  SBlockOrderSupporter* pSupporter = (SBlockOrderSupporter*)param;
H
Hongze Cheng 已提交
1108

H
Haojun Liao 已提交
1109 1110
  int32_t leftTableBlockIndex = pSupporter->indexPerTable[leftIndex];
  int32_t rightTableBlockIndex = pSupporter->indexPerTable[rightIndex];
H
Hongze Cheng 已提交
1111

H
Haojun Liao 已提交
1112 1113 1114 1115 1116 1117 1118
  if (leftTableBlockIndex > pSupporter->numOfBlocksPerTable[leftIndex]) {
    /* left block is empty */
    return 1;
  } else if (rightTableBlockIndex > pSupporter->numOfBlocksPerTable[rightIndex]) {
    /* right block is empty */
    return -1;
  }
H
Hongze Cheng 已提交
1119

1120
  SBlockOrderWrapper* pLeftBlock = &pSupporter->pDataBlockInfo[leftIndex][leftTableBlockIndex];
H
Haojun Liao 已提交
1121
  SBlockOrderWrapper* pRightBlock = &pSupporter->pDataBlockInfo[rightIndex][rightTableBlockIndex];
H
Hongze Cheng 已提交
1122

1123 1124 1125
  return pLeftBlock->offset > pRightBlock->offset ? 1 : -1;
}

H
Haojun Liao 已提交
1126
static int32_t doSetCurrentBlock(SDataBlockIter* pBlockIter, const char* idStr) {
1127 1128 1129
  SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(pBlockIter);
  if (pBlockInfo != NULL) {
    STableBlockScanInfo* pScanInfo = taosHashGet(pBlockIter->pTableMap, &pBlockInfo->uid, sizeof(pBlockInfo->uid));
H
Haojun Liao 已提交
1130
    if (pScanInfo == NULL) {
1131
      tsdbError("failed to locate the uid:%" PRIu64 " in query table uid list, %s", pBlockInfo->uid, idStr);
H
Haojun Liao 已提交
1132 1133 1134 1135
      return TSDB_CODE_INVALID_PARA;
    }

    int32_t* mapDataIndex = taosArrayGet(pScanInfo->pBlockList, pBlockInfo->tbBlockIdx);
H
Hongze Cheng 已提交
1136
    tMapDataGetItemByIdx(&pScanInfo->mapData, *mapDataIndex, &pBlockIter->block, tGetDataBlk);
1137
  }
1138 1139 1140 1141 1142 1143

#if 0
  qDebug("check file block, table uid:%"PRIu64" index:%d offset:%"PRId64", ", pScanInfo->uid, *mapDataIndex, pBlockIter->block.aSubBlock[0].offset);
#endif

  return TSDB_CODE_SUCCESS;
H
Haojun Liao 已提交
1144
}
H
Hongze Cheng 已提交
1145

1146
static int32_t initBlockIterator(STsdbReader* pReader, SDataBlockIter* pBlockIter, int32_t numOfBlocks) {
1147
  bool asc = ASCENDING_TRAVERSE(pReader->order);
H
Haojun Liao 已提交
1148

1149
  pBlockIter->numOfBlocks = numOfBlocks;
1150
  taosArrayClear(pBlockIter->blockList);
1151
  pBlockIter->pTableMap = pReader->status.pTableMap;
1152

1153 1154
  // access data blocks according to the offset of each block in asc/desc order.
  int32_t numOfTables = (int32_t)taosHashGetSize(pReader->status.pTableMap);
H
Haojun Liao 已提交
1155

1156
  int64_t st = taosGetTimestampUs();
H
Haojun Liao 已提交
1157

1158
  SBlockOrderSupporter sup = {0};
1159
  int32_t              code = initBlockOrderSupporter(&sup, numOfTables);
1160 1161 1162
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }
H
Haojun Liao 已提交
1163

1164 1165 1166 1167 1168 1169 1170
  int32_t cnt = 0;
  void*   ptr = NULL;
  while (1) {
    ptr = taosHashIterate(pReader->status.pTableMap, ptr);
    if (ptr == NULL) {
      break;
    }
H
Haojun Liao 已提交
1171

1172
    STableBlockScanInfo* pTableScanInfo = *(STableBlockScanInfo**)ptr;
1173 1174 1175
    if (pTableScanInfo->pBlockList == NULL || taosArrayGetSize(pTableScanInfo->pBlockList) == 0) {
      continue;
    }
H
Haojun Liao 已提交
1176

1177 1178
    size_t num = taosArrayGetSize(pTableScanInfo->pBlockList);
    sup.numOfBlocksPerTable[sup.numOfTables] = num;
H
Haojun Liao 已提交
1179

1180 1181 1182 1183 1184
    char* buf = taosMemoryMalloc(sizeof(SBlockOrderWrapper) * num);
    if (buf == NULL) {
      cleanupBlockOrderSupporter(&sup);
      return TSDB_CODE_TDB_OUT_OF_MEMORY;
    }
H
Haojun Liao 已提交
1185

1186
    sup.pDataBlockInfo[sup.numOfTables] = (SBlockOrderWrapper*)buf;
H
Hongze Cheng 已提交
1187
    SDataBlk block = {0};
1188 1189
    for (int32_t k = 0; k < num; ++k) {
      SBlockOrderWrapper wrapper = {0};
1190 1191

      int32_t* mapDataIndex = taosArrayGet(pTableScanInfo->pBlockList, k);
H
Hongze Cheng 已提交
1192
      tMapDataGetItemByIdx(&pTableScanInfo->mapData, *mapDataIndex, &block, tGetDataBlk);
1193

1194
      wrapper.uid = pTableScanInfo->uid;
1195
      wrapper.offset = block.aSubBlock[0].offset;
H
Haojun Liao 已提交
1196

1197 1198 1199 1200 1201 1202
      sup.pDataBlockInfo[sup.numOfTables][k] = wrapper;
      cnt++;
    }

    sup.numOfTables += 1;
  }
H
Haojun Liao 已提交
1203

1204
  ASSERT(numOfBlocks == cnt);
H
Haojun Liao 已提交
1205

1206
  // since there is only one table qualified, blocks are not sorted
1207 1208
  if (sup.numOfTables == 1) {
    for (int32_t i = 0; i < numOfBlocks; ++i) {
1209 1210
      SFileDataBlockInfo blockInfo = {.uid = sup.pDataBlockInfo[0][i].uid, .tbBlockIdx = i};
      taosArrayPush(pBlockIter->blockList, &blockInfo);
1211
    }
1212

1213
    int64_t et = taosGetTimestampUs();
1214
    tsdbDebug("%p create blocks info struct completed for one table, %d blocks not sorted, elapsed time:%.2f ms %s",
1215
              pReader, numOfBlocks, (et - st) / 1000.0, pReader->idStr);
H
Haojun Liao 已提交
1216

1217
    pBlockIter->index = asc ? 0 : (numOfBlocks - 1);
H
Haojun Liao 已提交
1218
    cleanupBlockOrderSupporter(&sup);
H
Haojun Liao 已提交
1219
    doSetCurrentBlock(pBlockIter, pReader->idStr);
1220
    return TSDB_CODE_SUCCESS;
H
Haojun Liao 已提交
1221
  }
H
Haojun Liao 已提交
1222

1223 1224
  tsdbDebug("%p create data blocks info struct completed, %d blocks in %d tables %s", pReader, cnt, sup.numOfTables,
            pReader->idStr);
1225

1226
  ASSERT(cnt <= numOfBlocks && sup.numOfTables <= numOfTables);
H
Haojun Liao 已提交
1227

1228 1229 1230 1231 1232
  SMultiwayMergeTreeInfo* pTree = NULL;
  uint8_t                 ret = tMergeTreeCreate(&pTree, sup.numOfTables, &sup, fileDataBlockOrderCompar);
  if (ret != TSDB_CODE_SUCCESS) {
    cleanupBlockOrderSupporter(&sup);
    return TSDB_CODE_TDB_OUT_OF_MEMORY;
H
Haojun Liao 已提交
1233
  }
H
Haojun Liao 已提交
1234

1235 1236 1237 1238
  int32_t numOfTotal = 0;
  while (numOfTotal < cnt) {
    int32_t pos = tMergeTreeGetChosenIndex(pTree);
    int32_t index = sup.indexPerTable[pos]++;
H
Haojun Liao 已提交
1239

1240 1241
    SFileDataBlockInfo blockInfo = {.uid = sup.pDataBlockInfo[pos][index].uid, .tbBlockIdx = index};
    taosArrayPush(pBlockIter->blockList, &blockInfo);
H
Haojun Liao 已提交
1242

1243 1244 1245 1246
    // set data block index overflow, in order to disable the offset comparator
    if (sup.indexPerTable[pos] >= sup.numOfBlocksPerTable[pos]) {
      sup.indexPerTable[pos] = sup.numOfBlocksPerTable[pos] + 1;
    }
H
Haojun Liao 已提交
1247

1248 1249
    numOfTotal += 1;
    tMergeTreeAdjust(pTree, tMergeTreeGetAdjustIndex(pTree));
H
Haojun Liao 已提交
1250
  }
H
Haojun Liao 已提交
1251

1252
  int64_t et = taosGetTimestampUs();
H
Hongze Cheng 已提交
1253 1254
  tsdbDebug("%p %d data blocks access order completed, elapsed time:%.2f ms %s", pReader, numOfBlocks,
            (et - st) / 1000.0, pReader->idStr);
1255 1256
  cleanupBlockOrderSupporter(&sup);
  taosMemoryFree(pTree);
H
Haojun Liao 已提交
1257

1258
  pBlockIter->index = asc ? 0 : (numOfBlocks - 1);
H
Haojun Liao 已提交
1259
  doSetCurrentBlock(pBlockIter, pReader->idStr);
1260

1261
  return TSDB_CODE_SUCCESS;
H
Haojun Liao 已提交
1262
}
H
Hongze Cheng 已提交
1263

H
Haojun Liao 已提交
1264
static bool blockIteratorNext(SDataBlockIter* pBlockIter, const char* idStr) {
1265 1266
  bool asc = ASCENDING_TRAVERSE(pBlockIter->order);

1267
  int32_t step = asc ? 1 : -1;
1268
  if ((pBlockIter->index >= pBlockIter->numOfBlocks - 1 && asc) || (pBlockIter->index <= 0 && (!asc))) {
1269 1270 1271
    return false;
  }

1272
  pBlockIter->index += step;
H
Haojun Liao 已提交
1273
  doSetCurrentBlock(pBlockIter, idStr);
1274

1275 1276 1277
  return true;
}

1278 1279 1280
/**
 * This is an two rectangles overlap cases.
 */
H
Hongze Cheng 已提交
1281
static int32_t dataBlockPartiallyRequired(STimeWindow* pWindow, SVersionRange* pVerRange, SDataBlk* pBlock) {
1282 1283
  return (pWindow->ekey < pBlock->maxKey.ts && pWindow->ekey >= pBlock->minKey.ts) ||
         (pWindow->skey > pBlock->minKey.ts && pWindow->skey <= pBlock->maxKey.ts) ||
H
Hongze Cheng 已提交
1284 1285
         (pVerRange->minVer > pBlock->minVer && pVerRange->minVer <= pBlock->maxVer) ||
         (pVerRange->maxVer < pBlock->maxVer && pVerRange->maxVer >= pBlock->minVer);
H
Haojun Liao 已提交
1286
}
H
Hongze Cheng 已提交
1287

H
Haojun Liao 已提交
1288
static SDataBlk* getNeighborBlockOfSameTable(SFileDataBlockInfo* pBlockInfo, STableBlockScanInfo* pTableBlockScanInfo,
H
Hongze Cheng 已提交
1289
                                             int32_t* nextIndex, int32_t order) {
1290
  bool asc = ASCENDING_TRAVERSE(order);
H
Haojun Liao 已提交
1291
  if (asc && pBlockInfo->tbBlockIdx >= taosArrayGetSize(pTableBlockScanInfo->pBlockList) - 1) {
1292
    return NULL;
1293 1294
  }

H
Haojun Liao 已提交
1295
  if (!asc && pBlockInfo->tbBlockIdx == 0) {
1296 1297 1298
    return NULL;
  }

1299
  int32_t step = asc ? 1 : -1;
H
Haojun Liao 已提交
1300
  *nextIndex = pBlockInfo->tbBlockIdx + step;
1301

H
Hongze Cheng 已提交
1302 1303
  SDataBlk* pBlock = taosMemoryCalloc(1, sizeof(SDataBlk));
  int32_t*  indexInMapdata = taosArrayGet(pTableBlockScanInfo->pBlockList, *nextIndex);
1304

H
Hongze Cheng 已提交
1305
  tMapDataGetItemByIdx(&pTableBlockScanInfo->mapData, *indexInMapdata, pBlock, tGetDataBlk);
1306
  return pBlock;
1307 1308 1309 1310 1311
}

static int32_t findFileBlockInfoIndex(SDataBlockIter* pBlockIter, SFileDataBlockInfo* pFBlockInfo) {
  ASSERT(pBlockIter != NULL && pFBlockInfo != NULL);

1312
  int32_t step = ASCENDING_TRAVERSE(pBlockIter->order) ? 1 : -1;
1313 1314
  int32_t index = pBlockIter->index;

1315
  while (index < pBlockIter->numOfBlocks && index >= 0) {
1316 1317 1318 1319 1320 1321 1322 1323 1324 1325 1326 1327
    SFileDataBlockInfo* pFBlock = taosArrayGet(pBlockIter->blockList, index);
    if (pFBlock->uid == pFBlockInfo->uid && pFBlock->tbBlockIdx == pFBlockInfo->tbBlockIdx) {
      return index;
    }

    index += step;
  }

  ASSERT(0);
  return -1;
}

1328
static int32_t setFileBlockActiveInBlockIter(SDataBlockIter* pBlockIter, int32_t index, int32_t step) {
1329
  if (index < 0 || index >= pBlockIter->numOfBlocks) {
1330 1331 1332 1333
    return -1;
  }

  SFileDataBlockInfo fblock = *(SFileDataBlockInfo*)taosArrayGet(pBlockIter->blockList, index);
1334 1335 1336 1337 1338
  pBlockIter->index += step;

  if (index != pBlockIter->index) {
    taosArrayRemove(pBlockIter->blockList, index);
    taosArrayInsert(pBlockIter->blockList, pBlockIter->index, &fblock);
1339

1340 1341 1342
    SFileDataBlockInfo* pBlockInfo = taosArrayGet(pBlockIter->blockList, pBlockIter->index);
    ASSERT(pBlockInfo->uid == fblock.uid && pBlockInfo->tbBlockIdx == fblock.tbBlockIdx);
  }
1343

H
Haojun Liao 已提交
1344
  doSetCurrentBlock(pBlockIter, "");
1345 1346 1347
  return TSDB_CODE_SUCCESS;
}

H
Hongze Cheng 已提交
1348
static bool overlapWithNeighborBlock(SDataBlk* pBlock, SDataBlk* pNeighbor, int32_t order) {
1349 1350 1351 1352 1353 1354
  // it is the last block in current file, no chance to overlap with neighbor blocks.
  if (ASCENDING_TRAVERSE(order)) {
    return pBlock->maxKey.ts == pNeighbor->minKey.ts;
  } else {
    return pBlock->minKey.ts == pNeighbor->maxKey.ts;
  }
H
Haojun Liao 已提交
1355
}
H
Hongze Cheng 已提交
1356

H
Hongze Cheng 已提交
1357
static bool bufferDataInFileBlockGap(int32_t order, TSDBKEY key, SDataBlk* pBlock) {
H
Haojun Liao 已提交
1358
  bool ascScan = ASCENDING_TRAVERSE(order);
H
Hongze Cheng 已提交
1359

1360
  return (ascScan && (key.ts != TSKEY_INITIAL_VAL && key.ts <= pBlock->minKey.ts)) ||
1361
         (!ascScan && (key.ts != TSKEY_INITIAL_VAL && key.ts >= pBlock->maxKey.ts));
H
Haojun Liao 已提交
1362
}
H
Hongze Cheng 已提交
1363

H
Hongze Cheng 已提交
1364
static bool keyOverlapFileBlock(TSDBKEY key, SDataBlk* pBlock, SVersionRange* pVerRange) {
H
Hongze Cheng 已提交
1365 1366
  return (key.ts >= pBlock->minKey.ts && key.ts <= pBlock->maxKey.ts) && (pBlock->maxVer >= pVerRange->minVer) &&
         (pBlock->minVer <= pVerRange->maxVer);
H
Haojun Liao 已提交
1367 1368
}

H
Hongze Cheng 已提交
1369
static bool doCheckforDatablockOverlap(STableBlockScanInfo* pBlockScanInfo, const SDataBlk* pBlock) {
1370 1371 1372 1373 1374
  size_t num = taosArrayGetSize(pBlockScanInfo->delSkyline);

  for (int32_t i = pBlockScanInfo->fileDelIndex; i < num; i += 1) {
    TSDBKEY* p = taosArrayGet(pBlockScanInfo->delSkyline, i);
    if (p->ts >= pBlock->minKey.ts && p->ts <= pBlock->maxKey.ts) {
H
Hongze Cheng 已提交
1375
      if (p->version >= pBlock->minVer) {
1376 1377 1378
        return true;
      }
    } else if (p->ts < pBlock->minKey.ts) {  // p->ts < pBlock->minKey.ts
H
Hongze Cheng 已提交
1379
      if (p->version >= pBlock->minVer) {
1380 1381
        if (i < num - 1) {
          TSDBKEY* pnext = taosArrayGet(pBlockScanInfo->delSkyline, i + 1);
H
Hongze Cheng 已提交
1382 1383
          if (pnext->ts >= pBlock->minKey.ts) {
            return true;
1384 1385 1386 1387 1388 1389 1390 1391 1392 1393 1394 1395 1396
          }
        } else {  // it must be the last point
          ASSERT(p->version == 0);
        }
      }
    } else {  // (p->ts > pBlock->maxKey.ts) {
      return false;
    }
  }

  return false;
}

H
Hongze Cheng 已提交
1397
static bool overlapWithDelSkyline(STableBlockScanInfo* pBlockScanInfo, const SDataBlk* pBlock, int32_t order) {
1398 1399 1400 1401
  if (pBlockScanInfo->delSkyline == NULL) {
    return false;
  }

1402
  // ts is not overlap
1403
  TSDBKEY* pFirst = taosArrayGet(pBlockScanInfo->delSkyline, 0);
L
Liu Jicong 已提交
1404
  TSDBKEY* pLast = taosArrayGetLast(pBlockScanInfo->delSkyline);
1405 1406 1407 1408 1409
  if (pBlock->minKey.ts > pLast->ts || pBlock->maxKey.ts < pFirst->ts) {
    return false;
  }

  // version is not overlap
1410 1411 1412 1413
  if (ASCENDING_TRAVERSE(order)) {
    return doCheckforDatablockOverlap(pBlockScanInfo, pBlock);
  } else {
    int32_t index = pBlockScanInfo->fileDelIndex;
1414
    while (1) {
1415 1416 1417 1418 1419
      TSDBKEY* p = taosArrayGet(pBlockScanInfo->delSkyline, index);
      if (p->ts > pBlock->minKey.ts && index > 0) {
        index -= 1;
      } else {  // find the first point that is smaller than the minKey.ts of dataBlock.
        break;
1420 1421 1422
      }
    }

1423 1424
    return doCheckforDatablockOverlap(pBlockScanInfo, pBlock);
  }
1425 1426
}

H
Haojun Liao 已提交
1427 1428 1429 1430 1431 1432 1433 1434 1435 1436 1437 1438 1439
typedef struct {
  bool overlapWithNeighborBlock;
  bool hasDupTs;
  bool overlapWithDelInfo;
  bool overlapWithLastBlock;
  bool overlapWithKeyInBuf;
  bool partiallyRequired;
  bool moreThanCapcity;
} SDataBlockToLoadInfo;

static void getBlockToLoadInfo(SDataBlockToLoadInfo* pInfo, SFileDataBlockInfo* pBlockInfo, SDataBlk* pBlock,
                               STableBlockScanInfo* pScanInfo, TSDBKEY keyInBuf, SLastBlockReader* pLastBlockReader,
                               STsdbReader* pReader) {
H
Hongze Cheng 已提交
1440
  int32_t   neighborIndex = 0;
H
Haojun Liao 已提交
1441
  SDataBlk* pNeighbor = getNeighborBlockOfSameTable(pBlockInfo, pScanInfo, &neighborIndex, pReader->order);
1442

1443
  // overlap with neighbor
1444
  if (pNeighbor) {
H
Haojun Liao 已提交
1445
    pInfo->overlapWithNeighborBlock = overlapWithNeighborBlock(pBlock, pNeighbor, pReader->order);
1446
    taosMemoryFree(pNeighbor);
1447 1448
  }

1449
  // has duplicated ts of different version in this block
H
Haojun Liao 已提交
1450 1451
  pInfo->hasDupTs = (pBlock->nSubBlock == 1) ? pBlock->hasDup : true;
  pInfo->overlapWithDelInfo = overlapWithDelSkyline(pScanInfo, pBlock, pReader->order);
1452

H
Haojun Liao 已提交
1453 1454 1455
  if (hasDataInLastBlock(pLastBlockReader)) {
    int64_t tsLast = getCurrentKeyInLastBlock(pLastBlockReader);
    pInfo->overlapWithLastBlock = !(pBlock->maxKey.ts < tsLast || pBlock->minKey.ts > tsLast);
1456 1457
  }

H
Haojun Liao 已提交
1458 1459 1460 1461
  pInfo->moreThanCapcity = pBlock->nRow > pReader->capacity;
  pInfo->partiallyRequired = dataBlockPartiallyRequired(&pReader->window, &pReader->verRange, pBlock);
  pInfo->overlapWithKeyInBuf = keyOverlapFileBlock(keyInBuf, pBlock, &pReader->verRange);
}
1462

H
Haojun Liao 已提交
1463 1464 1465 1466 1467 1468 1469 1470 1471 1472 1473 1474 1475 1476
// Returns true when the file block has to be loaded instead of being returned directly.
// Loading is required as soon as any of these holds:
//  - only part of the block is required by the query window / version range;
//  - the block overlaps its neighbor block, or it may contain duplicated timestamps;
//  - the block overlaps the next buffered (mem/imem) key or the current last-block key;
//  - the block holds more rows than the result buffer capacity;
//  - the block overlaps the delete skyline of the table.
static bool fileBlockShouldLoad(STsdbReader* pReader, SFileDataBlockInfo* pBlockInfo, SDataBlk* pBlock,
                                STableBlockScanInfo* pScanInfo, TSDBKEY keyInBuf, SLastBlockReader* pLastBlockReader) {
  SDataBlockToLoadInfo reason = {0};
  getBlockToLoadInfo(&reason, pBlockInfo, pBlock, pScanInfo, keyInBuf, pLastBlockReader, pReader);

  bool mustLoad = reason.overlapWithNeighborBlock || reason.hasDupTs || reason.partiallyRequired ||
                  reason.overlapWithKeyInBuf || reason.moreThanCapcity || reason.overlapWithDelInfo ||
                  reason.overlapWithLastBlock;
  if (!mustLoad) {
    return false;
  }

  // record why the block is loaded, for profiling
  tsdbDebug("%p uid:%" PRIu64
            " need to load the datablock, overlapwithneighborblock:%d, hasDup:%d, partiallyRequired:%d, "
            "overlapWithKey:%d, greaterThanBuf:%d, overlapWithDel:%d, overlapWithlastBlock:%d, %s",
            pReader, pBlockInfo->uid, reason.overlapWithNeighborBlock, reason.hasDupTs, reason.partiallyRequired,
            reason.overlapWithKeyInBuf, reason.moreThanCapcity, reason.overlapWithDelInfo, reason.overlapWithLastBlock,
            pReader->idStr);
  return true;
}

H
Haojun Liao 已提交
1491 1492 1493 1494 1495 1496 1497 1498 1499
// A file block is "clean" when it overlaps neither its neighbor block, the next buffered key, the
// delete skyline nor the current last-block key, and cannot contain duplicated timestamps.
// Window coverage and result capacity are deliberately not part of this test.
static bool isCleanFileDataBlock(STsdbReader* pReader, SFileDataBlockInfo* pBlockInfo, SDataBlk* pBlock,
                                 STableBlockScanInfo* pScanInfo, TSDBKEY keyInBuf, SLastBlockReader* pLastBlockReader) {
  SDataBlockToLoadInfo reason = {0};
  getBlockToLoadInfo(&reason, pBlockInfo, pBlock, pScanInfo, keyInBuf, pLastBlockReader, pReader);

  return !reason.overlapWithNeighborBlock && !reason.hasDupTs && !reason.overlapWithKeyInBuf &&
         !reason.overlapWithDelInfo && !reason.overlapWithLastBlock;
}

1500
// Builds the result block of one table purely from its mem/imem buffers, up to endKey.
// Returns immediately when neither buffer iterator has data. Otherwise the result block is tagged
// with the table uid, flagged as composed, and the elapsed time is added to cost.buildmemBlock.
// Returns the code of buildDataBlockFromBufImpl().
static int32_t buildDataBlockFromBuf(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo, int64_t endKey) {
  bool hasMemData = pBlockScanInfo->iter.hasVal || pBlockScanInfo->iiter.hasVal;
  if (!hasMemData) {
    return TSDB_CODE_SUCCESS;
  }

  SSDataBlock* pResBlock = pReader->pResBlock;
  int64_t      startUs = taosGetTimestampUs();

  int32_t code = buildDataBlockFromBufImpl(pBlockScanInfo, endKey, pReader->capacity, pReader);

  blockDataUpdateTsWindow(pResBlock, 0);
  pResBlock->info.uid = pBlockScanInfo->uid;
  setComposedBlockFlag(pReader, true);

  double elapsedMs = (taosGetTimestampUs() - startUs) / 1000.0;
  tsdbDebug("%p build data block from cache completed, elapsed time:%.2f ms, numOfRows:%d, brange:%" PRId64
            " - %" PRId64 " %s",
            pReader, elapsedMs, pResBlock->info.rows, pResBlock->info.window.skey, pResBlock->info.window.ekey,
            pReader->idStr);

  pReader->cost.buildmemBlock += elapsedMs;
  return code;
}

1525 1526
// Fast path for file-block rows. When the current row (timestamp `key`) is not at the scan
// boundary and the next row in scan order carries a different timestamp, no merge is needed:
// the row is appended directly and rowIndex moves one step in scan order. Returns false (nothing
// done) when the row is at the boundary or is followed by a duplicated timestamp.
static bool tryCopyDistinctRowFromFileBlock(STsdbReader* pReader, SBlockData* pBlockData, int64_t key,
                                            SFileBlockDumpInfo* pDumpInfo) {
  bool asc = (pReader->order == TSDB_ORDER_ASC);
  bool desc = (pReader->order == TSDB_ORDER_DESC);

  bool hasNextRow = (asc && pDumpInfo->rowIndex < pDumpInfo->totalRows - 1) || (desc && pDumpInfo->rowIndex > 0);
  if (!hasNextRow) {
    return false;
  }

  int32_t step = asc ? 1 : -1;
  if (pBlockData->aTSKEY[pDumpInfo->rowIndex + step] == key) {
    return false;  // the next row duplicates this timestamp, the caller has to merge
  }

  doAppendRowFromFileBlock(pReader->pResBlock, pReader, pBlockData, pDumpInfo->rowIndex);
  pDumpInfo->rowIndex += step;
  return true;
}

1545 1546
// Advances the last-block merge tree to the next row that is not removed by the table's delete
// skyline (progress is tracked in lastBlockDelIndex). Returns false once the tree is exhausted.
static bool nextRowFromLastBlocks(SLastBlockReader* pLastBlockReader, STableBlockScanInfo* pBlockScanInfo,
                                  SVersionRange* pVerRange) {
  while (tMergeTreeNext(&pLastBlockReader->mergeTree)) {
    TSDBROW row = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
    TSDBKEY rowKey = TSDBROW_KEY(&row);

    bool dropped = hasBeenDropped(pBlockScanInfo->delSkyline, &pBlockScanInfo->lastBlockDelIndex, &rowKey,
                                  pLastBlockReader->order, pVerRange);
    if (!dropped) {
      return true;
    }
  }

  return false;
}

// Fast path for a last (stt) block row. fRow is the row the merge tree currently points at, with
// timestamp ts. The tree is advanced first. If it is exhausted, or the next row has a different
// timestamp, fRow is appended directly and true is returned. Otherwise false is returned and the
// caller must merge; in that case the tree has already moved past fRow.
static bool tryCopyDistinctRowFromSttBlock(TSDBROW* fRow, SLastBlockReader* pLastBlockReader,
                                           STableBlockScanInfo* pScanInfo, int64_t ts, STsdbReader* pReader) {
  bool hasNext = nextRowFromLastBlocks(pLastBlockReader, pScanInfo, &pReader->verRange);
  if (hasNext && getCurrentKeyInLastBlock(pLastBlockReader) == ts) {
    return false;
  }

  doAppendRowFromFileBlock(pReader->pResBlock, pReader, fRow->pBlockData, fRow->iRow);
  return true;
}

H
Haojun Liao 已提交
1579 1580 1581
// Returns the schema that matches row schema version `sversion` of table `uid`.
// The newest schema is lazily loaded into pReader->pSchema. Any other version is cached in
// pReader->pMemSchema, which is replaced whenever a different version is requested.
// Returns NULL when the schema cannot be obtained; terrno then holds the code reported by
// metaGetTbTSchemaEx().
static FORCE_INLINE STSchema* doGetSchemaForTSRow(int32_t sversion, STsdbReader* pReader, uint64_t uid) {
  // always set the newest schema version in pReader->pSchema
  if (pReader->pSchema == NULL) {
    pReader->pSchema = metaGetTbTSchema(pReader->pTsdb->pVnode->pMeta, uid, -1, 1);
  }

  if (pReader->pSchema != NULL && sversion == pReader->pSchema->version) {
    return pReader->pSchema;
  }

  if (pReader->pMemSchema != NULL) {
    if (pReader->pMemSchema->version == sversion) {
      return pReader->pMemSchema;
    }

    // a different version is cached: drop it before loading the requested one
    taosMemoryFreeClear(pReader->pMemSchema);
  }

  // single load path, so that a failed or empty fetch is always reported the same way
  int32_t code = metaGetTbTSchemaEx(pReader->pTsdb->pVnode->pMeta, pReader->suid, uid, sversion, &pReader->pMemSchema);
  if (code != TSDB_CODE_SUCCESS || pReader->pMemSchema == NULL) {
    terrno = code;
    return NULL;
  }

  return pReader->pMemSchema;
}

1614
static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo, TSDBROW* pRow,
1615 1616 1617 1618 1619 1620
                                     SIterInfo* pIter, int64_t key, SLastBlockReader* pLastBlockReader) {
  SRowMerger          merge = {0};
  STSRow*             pTSRow = NULL;
  SBlockData*         pBlockData = &pReader->status.fileBlockData;
  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;

1621
  int64_t tsLast = INT64_MIN;
1622
  if (hasDataInLastBlock(pLastBlockReader)) {
1623 1624
    tsLast = getCurrentKeyInLastBlock(pLastBlockReader);
  }
1625

H
Hongze Cheng 已提交
1626 1627
  TSDBKEY k = TSDBROW_KEY(pRow);
  TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
1628

1629 1630
  int64_t minKey = 0;
  if (pReader->order == TSDB_ORDER_ASC) {
H
Hongze Cheng 已提交
1631
    minKey = INT64_MAX;  // chosen the minimum value
1632
    if (minKey > tsLast && hasDataInLastBlock(pLastBlockReader)) {
1633 1634
      minKey = tsLast;
    }
1635

1636 1637 1638
    if (minKey > k.ts) {
      minKey = k.ts;
    }
1639

1640
    if (minKey > key && hasDataInFileBlock(pBlockData, pDumpInfo)) {
1641 1642 1643 1644
      minKey = key;
    }
  } else {
    minKey = INT64_MIN;
1645
    if (minKey < tsLast && hasDataInLastBlock(pLastBlockReader)) {
1646 1647 1648 1649 1650 1651 1652
      minKey = tsLast;
    }

    if (minKey < k.ts) {
      minKey = k.ts;
    }

1653
    if (minKey < key && hasDataInFileBlock(pBlockData, pDumpInfo)) {
1654 1655
      minKey = key;
    }
1656 1657 1658 1659
  }

  bool init = false;

1660
  // ASC: file block ---> last block -----> imem -----> mem
H
Hongze Cheng 已提交
1661
  // DESC: mem -----> imem -----> last block -----> file block
1662 1663
  if (pReader->order == TSDB_ORDER_ASC) {
    if (minKey == key) {
1664
      init = true;
H
Haojun Liao 已提交
1665 1666 1667 1668
      int32_t code = tRowMergerInit(&merge, &fRow, pReader->pSchema);
      if (code != TSDB_CODE_SUCCESS) {
        return code;
      }
1669
      doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
1670 1671
    }

1672
    if (minKey == tsLast) {
1673
      TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
H
Haojun Liao 已提交
1674 1675 1676
      if (init) {
        tRowMerge(&merge, &fRow1);
      } else {
1677
        init = true;
H
Haojun Liao 已提交
1678 1679 1680 1681
        int32_t code = tRowMergerInit(&merge, &fRow1, pReader->pSchema);
        if (code != TSDB_CODE_SUCCESS) {
          return code;
        }
1682
      }
1683
      doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, &merge, &pReader->verRange);
1684
    }
1685

1686
    if (minKey == k.ts) {
H
Haojun Liao 已提交
1687 1688 1689
      if (init) {
        tRowMerge(&merge, pRow);
      } else {
1690 1691
        init = true;
        STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid);
H
Haojun Liao 已提交
1692 1693 1694 1695 1696 1697 1698 1699
        int32_t   code = tRowMergerInit(&merge, pRow, pSchema);
        if (code != TSDB_CODE_SUCCESS) {
          return code;
        }
      }
      int32_t code = doMergeRowsInBuf(pIter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge, pReader);
      if (code != TSDB_CODE_SUCCESS) {
        return code;
1700 1701 1702 1703 1704
      }
    }
  } else {
    if (minKey == k.ts) {
      init = true;
1705
      STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid);
H
Haojun Liao 已提交
1706 1707 1708 1709 1710 1711
      int32_t   code = tRowMergerInit(&merge, pRow, pSchema);
      if (code != TSDB_CODE_SUCCESS) {
        return code;
      }

      code = doMergeRowsInBuf(pIter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge, pReader);
1712
      if (code != TSDB_CODE_SUCCESS || merge.pTSchema == NULL) {
H
Haojun Liao 已提交
1713 1714
        return code;
      }
1715 1716
    }

1717
    if (minKey == tsLast) {
1718
      TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
H
Haojun Liao 已提交
1719 1720 1721
      if (init) {
        tRowMerge(&merge, &fRow1);
      } else {
1722
        init = true;
H
Haojun Liao 已提交
1723 1724 1725 1726
        int32_t code = tRowMergerInit(&merge, &fRow1, pReader->pSchema);
        if (code != TSDB_CODE_SUCCESS) {
          return code;
        }
1727
      }
1728
      doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, &merge, &pReader->verRange);
1729 1730 1731
    }

    if (minKey == key) {
H
Haojun Liao 已提交
1732 1733 1734
      if (init) {
        tRowMerge(&merge, &fRow);
      } else {
1735
        init = true;
H
Haojun Liao 已提交
1736 1737 1738 1739
        int32_t code = tRowMergerInit(&merge, &fRow, pReader->pSchema);
        if (code != TSDB_CODE_SUCCESS) {
          return code;
        }
1740 1741 1742
      }
      doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
    }
1743 1744
  }

1745 1746 1747 1748 1749
  int32_t code = tRowMergerGetRow(&merge, &pTSRow);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

1750 1751 1752 1753 1754 1755 1756
  doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo->uid);

  taosMemoryFree(pTSRow);
  tRowMergerClear(&merge);
  return TSDB_CODE_SUCCESS;
}

1757 1758 1759
// Emits the row the last (stt) block merge tree currently points at.
// - If only the last block takes part (mergeBlockData is false, or the current file-block row has
//   a different timestamp), the row is copied directly when its timestamp is unique. Otherwise it is
//   merged with the following last-block rows of the same timestamp.
// - Otherwise, the last-block rows and the file-block rows sharing that timestamp are merged into
//   one row.
// pBlockData may be NULL when mergeBlockData is false.
// Returns TSDB_CODE_SUCCESS or the first error; the row merger is released on every path.
static int32_t doMergeFileBlockAndLastBlock(SLastBlockReader* pLastBlockReader, STsdbReader* pReader,
                                            STableBlockScanInfo* pBlockScanInfo, SBlockData* pBlockData,
                                            bool mergeBlockData) {
  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
  int64_t             tsLastBlock = getCurrentKeyInLastBlock(pLastBlockReader);

  STSRow*    pTSRow = NULL;
  SRowMerger merge = {0};
  int32_t    code = TSDB_CODE_SUCCESS;
  TSDBROW    fRow = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
  tsdbTrace("fRow ptr:%p, %d, uid:%" PRIu64 ", %s", fRow.pBlockData, fRow.iRow, pLastBlockReader->uid, pReader->idStr);

  if ((!mergeBlockData) || (tsLastBlock != pBlockData->aTSKEY[pDumpInfo->rowIndex])) {
    // only last block rows take part
    if (tryCopyDistinctRowFromSttBlock(&fRow, pLastBlockReader, pBlockScanInfo, tsLastBlock, pReader)) {
      return TSDB_CODE_SUCCESS;
    }

    code = tRowMergerInit(&merge, &fRow, pReader->pSchema);
    if (code != TSDB_CODE_SUCCESS) {
      goto _end;
    }

    // the merge tree has already been advanced to the next row carrying the same timestamp
    TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
    tRowMerge(&merge, &fRow1);
    doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLastBlock, &merge, &pReader->verRange);
  } else {  // merge the last block rows with the file block rows of the same timestamp
    code = tRowMergerInit(&merge, &fRow, pReader->pSchema);
    if (code != TSDB_CODE_SUCCESS) {
      goto _end;
    }

    doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLastBlock, &merge, &pReader->verRange);

    // merge with block data if ts == key
    if (tsLastBlock == pBlockData->aTSKEY[pDumpInfo->rowIndex]) {
      doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
    }
  }

  code = tRowMergerGetRow(&merge, &pTSRow);
  if (code == TSDB_CODE_SUCCESS) {
    doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo->uid);
    taosMemoryFree(pTSRow);
  }

_end:
  tRowMergerClear(&merge);
  return code;
}

1820 1821
// Emits the next row of one table when only file sources (data block and last/stt block) are
// involved. `key` is the timestamp of the current file-block row.
// Returns TSDB_CODE_SUCCESS or the first error; the row merger is released on every path.
static int32_t mergeFileBlockAndLastBlock(STsdbReader* pReader, SLastBlockReader* pLastBlockReader, int64_t key,
                                          STableBlockScanInfo* pBlockScanInfo, SBlockData* pBlockData) {
  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;

  // only last block exists
  if (!hasDataInFileBlock(pBlockData, pDumpInfo)) {
    return doMergeFileBlockAndLastBlock(pLastBlockReader, pReader, pBlockScanInfo, NULL, false);
  }

  // no last block available, only data block exists
  if (!hasDataInLastBlock(pLastBlockReader)) {
    return mergeRowsInFileBlocks(pBlockData, pBlockScanInfo, key, pReader);
  }

  // row in last file block
  TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
  int64_t ts = getCurrentKeyInLastBlock(pLastBlockReader);
  ASSERT(ts >= key);

  if (!ASCENDING_TRAVERSE(pReader->order)) {  // desc order
    return doMergeFileBlockAndLastBlock(pLastBlockReader, pReader, pBlockScanInfo, pBlockData, true);
  }

  if (key < ts) {  // imem, mem are all empty, file blocks (data blocks and last block) exist
    return mergeRowsInFileBlocks(pBlockData, pBlockScanInfo, key, pReader);
  }

  if (key != ts) {
    ASSERT(0);
    return TSDB_CODE_SUCCESS;
  }

  // key == ts: merge the file-block rows first, then the last-block rows of the same timestamp
  STSRow*    pTSRow = NULL;
  SRowMerger merge = {0};

  int32_t code = tRowMergerInit(&merge, &fRow, pReader->pSchema);
  if (code == TSDB_CODE_SUCCESS) {
    doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);

    TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
    tRowMerge(&merge, &fRow1);
    doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, ts, &merge, &pReader->verRange);

    code = tRowMergerGetRow(&merge, &pTSRow);
    if (code == TSDB_CODE_SUCCESS) {
      doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo->uid);
      taosMemoryFree(pTSRow);
    }
  }

  tRowMergerClear(&merge);
  return code;
}

1876 1877
// Merges, for one timestamp, the rows of all four sources of a table (mem buffer, imem buffer,
// last/stt block, file block) and appends the merged row to the result block. Both buffer iterators
// are expected to hold a valid row. The selected timestamp is the smallest (ASC) / largest (DESC)
// among the available sources, and only sources positioned on it take part. Merge order:
//   ASC:  file block -> last block -> imem -> mem
//   DESC: mem -> imem -> last block -> file block
// Returns TSDB_CODE_SUCCESS or the first error; the row merger is released on every path. When a
// buffer row's schema cannot be obtained, the code doGetSchemaForTSRow() left in terrno is returned
// and no row is appended.
static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo, SBlockData* pBlockData,
                                     SLastBlockReader* pLastBlockReader) {
  SRowMerger          merge = {0};
  STSRow*             pTSRow = NULL;
  int32_t             code = TSDB_CODE_SUCCESS;
  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
  SArray*             pDelList = pBlockScanInfo->delSkyline;

  TSDBROW* pRow = getValidMemRow(&pBlockScanInfo->iter, pDelList, pReader);
  TSDBROW* piRow = getValidMemRow(&pBlockScanInfo->iiter, pDelList, pReader);
  ASSERT(pRow != NULL && piRow != NULL);

  int64_t tsLast = INT64_MIN;
  if (hasDataInLastBlock(pLastBlockReader)) {
    tsLast = getCurrentKeyInLastBlock(pLastBlockReader);
  }

  int64_t key = hasDataInFileBlock(pBlockData, pDumpInfo) ? pBlockData->aTSKEY[pDumpInfo->rowIndex] : INT64_MIN;

  TSDBKEY k = TSDBROW_KEY(pRow);
  TSDBKEY ik = TSDBROW_KEY(piRow);

  int64_t minKey = 0;
  if (ASCENDING_TRAVERSE(pReader->order)) {
    minKey = INT64_MAX;  // let's find the minimum
    if (minKey > k.ts) {
      minKey = k.ts;
    }

    if (minKey > ik.ts) {
      minKey = ik.ts;
    }

    if (minKey > key && hasDataInFileBlock(pBlockData, pDumpInfo)) {
      minKey = key;
    }

    if (minKey > tsLast && hasDataInLastBlock(pLastBlockReader)) {
      minKey = tsLast;
    }
  } else {
    minKey = INT64_MIN;  // let find the maximum ts value
    if (minKey < k.ts) {
      minKey = k.ts;
    }

    if (minKey < ik.ts) {
      minKey = ik.ts;
    }

    if (minKey < key && hasDataInFileBlock(pBlockData, pDumpInfo)) {
      minKey = key;
    }

    if (minKey < tsLast && hasDataInLastBlock(pLastBlockReader)) {
      minKey = tsLast;
    }
  }

  bool init = false;

  // ASC: file block -----> last block -----> imem -----> mem
  // DESC: mem -----> imem -----> last block -----> file block
  if (ASCENDING_TRAVERSE(pReader->order)) {
    if (minKey == key) {
      init = true;
      TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
      code = tRowMergerInit(&merge, &fRow, pReader->pSchema);
      if (code != TSDB_CODE_SUCCESS) {
        goto _end;
      }

      doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
    }

    if (minKey == tsLast) {
      TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
      if (init) {
        tRowMerge(&merge, &fRow1);
      } else {
        init = true;
        code = tRowMergerInit(&merge, &fRow1, pReader->pSchema);
        if (code != TSDB_CODE_SUCCESS) {
          goto _end;
        }
      }

      doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, &merge, &pReader->verRange);
    }

    if (minKey == ik.ts) {
      if (init) {
        tRowMerge(&merge, piRow);
      } else {
        init = true;
        STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(piRow), pReader, pBlockScanInfo->uid);
        if (pSchema == NULL) {
          code = terrno;  // propagate the lookup failure instead of reporting success
          goto _end;
        }

        code = tRowMergerInit(&merge, piRow, pSchema);
        if (code != TSDB_CODE_SUCCESS) {
          goto _end;
        }
      }

      code = doMergeRowsInBuf(&pBlockScanInfo->iiter, pBlockScanInfo->uid, ik.ts, pBlockScanInfo->delSkyline, &merge,
                              pReader);
      if (code != TSDB_CODE_SUCCESS) {
        goto _end;
      }
    }

    if (minKey == k.ts) {
      if (init) {
        if (merge.pTSchema == NULL) {
          goto _end;
        }

        tRowMerge(&merge, pRow);
      } else {
        init = true;
        STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid);
        if (pSchema == NULL) {
          code = terrno;  // do not hand a NULL schema to tRowMergerInit()
          goto _end;
        }

        code = tRowMergerInit(&merge, pRow, pSchema);
        if (code != TSDB_CODE_SUCCESS) {
          goto _end;
        }
      }

      code = doMergeRowsInBuf(&pBlockScanInfo->iter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge,
                              pReader);
      if (code != TSDB_CODE_SUCCESS) {
        goto _end;
      }
    }
  } else {
    if (minKey == k.ts) {
      init = true;
      STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid);
      if (pSchema == NULL) {
        code = terrno;  // do not hand a NULL schema to tRowMergerInit()
        goto _end;
      }

      code = tRowMergerInit(&merge, pRow, pSchema);
      if (code != TSDB_CODE_SUCCESS) {
        goto _end;
      }

      code = doMergeRowsInBuf(&pBlockScanInfo->iter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge,
                              pReader);
      if (code != TSDB_CODE_SUCCESS) {
        goto _end;
      }
    }

    if (minKey == ik.ts) {
      if (init) {
        tRowMerge(&merge, piRow);
      } else {
        init = true;
        STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(piRow), pReader, pBlockScanInfo->uid);
        if (pSchema == NULL) {
          code = terrno;  // do not hand a NULL schema to tRowMergerInit()
          goto _end;
        }

        code = tRowMergerInit(&merge, piRow, pSchema);
        if (code != TSDB_CODE_SUCCESS) {
          goto _end;
        }
      }

      code = doMergeRowsInBuf(&pBlockScanInfo->iiter, pBlockScanInfo->uid, ik.ts, pBlockScanInfo->delSkyline, &merge,
                              pReader);
      if (code != TSDB_CODE_SUCCESS) {
        goto _end;
      }
    }

    if (minKey == tsLast) {
      TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
      if (init) {
        tRowMerge(&merge, &fRow1);
      } else {
        init = true;
        code = tRowMergerInit(&merge, &fRow1, pReader->pSchema);
        if (code != TSDB_CODE_SUCCESS) {
          goto _end;
        }
      }

      doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, &merge, &pReader->verRange);
    }

    if (minKey == key) {
      TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
      if (!init) {
        code = tRowMergerInit(&merge, &fRow, pReader->pSchema);
        if (code != TSDB_CODE_SUCCESS) {
          goto _end;
        }
      } else {
        if (merge.pTSchema == NULL) {
          goto _end;
        }

        tRowMerge(&merge, &fRow);
      }

      doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
    }
  }

  if (merge.pTSchema == NULL) {
    goto _end;
  }

  code = tRowMergerGetRow(&merge, &pTSRow);
  if (code == TSDB_CODE_SUCCESS) {
    doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo->uid);
    taosMemoryFree(pTSRow);
  }

_end:
  // the merger owns an internal buffer once initialized; release it on success and error alike
  tRowMergerClear(&merge);
  return code;
}

2090 2091 2092 2093 2094 2095 2096 2097 2098 2099 2100 2101 2102 2103 2104 2105 2106 2107 2108 2109 2110 2111 2112 2113 2114
// Lazily creates the mem and imem skip-list iterators of one table, starting at
// (window.skey, verRange.minVer) for ASC or (window.ekey, verRange.maxVer) for DESC scans, and then
// initializes the delete-skyline iterator. Does nothing once iterInit is set.
// Returns the error of tsdbTbDataIterCreate() if either iterator cannot be created.
static int32_t initMemDataIterator(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader) {
  if (pBlockScanInfo->iterInit) {
    return TSDB_CODE_SUCCESS;
  }

  int32_t code = TSDB_CODE_SUCCESS;

  TSDBKEY startKey = {0};
  if (ASCENDING_TRAVERSE(pReader->order)) {
    startKey = (TSDBKEY){.ts = pReader->window.skey, .version = pReader->verRange.minVer};
  } else {
    startKey = (TSDBKEY){.ts = pReader->window.ekey, .version = pReader->verRange.maxVer};
  }

  int32_t backward = (!ASCENDING_TRAVERSE(pReader->order));

  STbData* d = NULL;
  if (pReader->pReadSnap->pMem != NULL) {
    d = tsdbGetTbDataFromMemTable(pReader->pReadSnap->pMem, pReader->suid, pBlockScanInfo->uid);
    if (d != NULL) {
      code = tsdbTbDataIterCreate(d, &startKey, backward, &pBlockScanInfo->iter.iter);
      if (code == TSDB_CODE_SUCCESS) {
        pBlockScanInfo->iter.hasVal = (tsdbTbDataIterGet(pBlockScanInfo->iter.iter) != NULL);

        tsdbDebug("%p uid:%" PRIu64 ", check data in mem from skey:%" PRId64 ", order:%d, ts range in buf:%" PRId64
                  "-%" PRId64 " %s",
                  pReader, pBlockScanInfo->uid, startKey.ts, pReader->order, d->minKey, d->maxKey, pReader->idStr);
      } else {
        // this branch handles the mem table (the message used to say "imem")
        tsdbError("%p uid:%" PRIu64 ", failed to create iterator for mem, code:%s, %s", pReader, pBlockScanInfo->uid,
                  tstrerror(code), pReader->idStr);
        return code;
      }
    }
  } else {
    tsdbDebug("%p uid:%" PRIu64 ", no data in mem, %s", pReader, pBlockScanInfo->uid, pReader->idStr);
  }

  STbData* di = NULL;
  if (pReader->pReadSnap->pIMem != NULL) {
    di = tsdbGetTbDataFromMemTable(pReader->pReadSnap->pIMem, pReader->suid, pBlockScanInfo->uid);
    if (di != NULL) {
      code = tsdbTbDataIterCreate(di, &startKey, backward, &pBlockScanInfo->iiter.iter);
      if (code == TSDB_CODE_SUCCESS) {
        pBlockScanInfo->iiter.hasVal = (tsdbTbDataIterGet(pBlockScanInfo->iiter.iter) != NULL);

        tsdbDebug("%p uid:%" PRIu64 ", check data in imem from skey:%" PRId64 ", order:%d, ts range in buf:%" PRId64
                  "-%" PRId64 " %s",
                  pReader, pBlockScanInfo->uid, startKey.ts, pReader->order, di->minKey, di->maxKey, pReader->idStr);
      } else {
        // this branch handles the imem table (the message used to say "mem")
        tsdbError("%p uid:%" PRIu64 ", failed to create iterator for imem, code:%s, %s", pReader, pBlockScanInfo->uid,
                  tstrerror(code), pReader->idStr);
        return code;
      }
    }
  } else {
    tsdbDebug("%p uid:%" PRIu64 ", no data in imem, %s", pReader, pBlockScanInfo->uid, pReader->idStr);
  }

  initDelSkylineIterator(pBlockScanInfo, pReader, d, di);

  pBlockScanInfo->iterInit = true;
  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
2154 2155
// Checks whether the current file-block row (pDumpInfo->rowIndex):
//  - belongs to the scanned table (only checked for multi-table blocks, i.e. when aUid is set),
//  - has a version within the query version range,
//  - has a timestamp within the query window, and
//  - has not been removed by the table's delete skyline (tracked via fileDelIndex).
static bool isValidFileBlockRow(SBlockData* pBlockData, SFileBlockDumpInfo* pDumpInfo,
                                STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader) {
  if (pBlockData->aUid != NULL) {
    uint64_t rowUid = pBlockData->aUid[pDumpInfo->rowIndex];
    if (rowUid != pBlockScanInfo->uid) {
      return false;  // row of another table: move to next row
    }
  }

  int64_t rowVer = pBlockData->aVersion[pDumpInfo->rowIndex];
  bool    verInRange = (rowVer >= pReader->verRange.minVer) && (rowVer <= pReader->verRange.maxVer);
  if (!verInRange) {
    return false;
  }

  int64_t rowTs = pBlockData->aTSKEY[pDumpInfo->rowIndex];
  bool    tsInWindow = (rowTs >= pReader->window.skey) && (rowTs <= pReader->window.ekey);
  if (!tsInWindow) {
    return false;
  }

  TSDBKEY rowKey = {.ts = rowTs, .version = rowVer};
  return !hasBeenDropped(pBlockScanInfo->delSkyline, &pBlockScanInfo->fileDelIndex, &rowKey, pReader->order,
                         &pReader->verRange);
}

2184
// Binds the last (stt) block reader to table pScanInfo. It opens a merge tree over the last blocks,
// restricted to rows after pScanInfo->lastKey in scan order, and moves it to the first row that is
// not deleted. Returns true when such a row exists. If the reader is already bound to this table,
// it is reused and true is returned. Returns false if the merge tree cannot be opened.
static bool initLastBlockReader(SLastBlockReader* pLBlockReader, STableBlockScanInfo* pScanInfo, STsdbReader* pReader) {
  // the last block reader has been initialized for this table.
  if (pLBlockReader->uid == pScanInfo->uid) {
    return true;
  }

  // release the merge tree of the previously bound table
  if (pLBlockReader->uid != 0) {
    tMergeTreeClose(&pLBlockReader->mergeTree);
    pLBlockReader->uid = 0;
  }

  // NOTE(review): the return code of initMemDataIterator() is ignored here, as before.
  initMemDataIterator(pScanInfo, pReader);

  int32_t     step = ASCENDING_TRAVERSE(pLBlockReader->order) ? 1 : -1;
  STimeWindow w = pLBlockReader->window;
  if (ASCENDING_TRAVERSE(pLBlockReader->order)) {
    w.skey = pScanInfo->lastKey + step;
  } else {
    w.ekey = pScanInfo->lastKey + step;
  }

  int32_t code = tMergeTreeOpen(&pLBlockReader->mergeTree, (pLBlockReader->order == TSDB_ORDER_DESC),
                                pReader->pFileReader, pReader->suid, pScanInfo->uid, &w, &pLBlockReader->verRange,
                                pLBlockReader->pInfo, false, pReader->idStr);
  if (code != TSDB_CODE_SUCCESS) {
    // leave uid unset: a failed open must not be mistaken for an initialized reader on the next call
    return false;
  }

  // record the owner only once the merge tree is really open
  pLBlockReader->uid = pScanInfo->uid;
  return nextRowFromLastBlocks(pLBlockReader, pScanInfo, &pReader->verRange);
}

2215
// Timestamp of the row the last-block merge tree is currently positioned on.
static int64_t getCurrentKeyInLastBlock(SLastBlockReader* pLastBlockReader) {
  TSDBROW current = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
  int64_t ts = TSDBROW_TS(&current);
  return ts;
}

H
Hongze Cheng 已提交
2220
// The last-block merge tree has a current row for as long as its iterator is set.
static bool hasDataInLastBlock(SLastBlockReader* pLastBlockReader) {
  return (pLastBlockReader->mergeTree.pIter != NULL);
}
2221 2222 2223 2224

bool hasDataInFileBlock(const SBlockData* pBlockData, const SFileBlockDumpInfo* pDumpInfo) {
  if (pBlockData->nRow > 0) {
    ASSERT(pBlockData->nRow == pDumpInfo->totalRows);
2225 2226
  }

2227
  return pBlockData->nRow > 0 && (!pDumpInfo->allDumped);
2228
}
2229

2230 2231
/*
 * Emit the file-block row at the current dump position into pReader->pResBlock.
 *
 * When tryCopyDistinctRowFromFileBlock() handles the row, nothing else is done.
 * Otherwise:
 *  - a row merger is seeded with that row;
 *  - doMergeRowsInFileBlocks() merges the following rows into it;
 *  - the merged row is appended to the result block.
 *
 * Returns TSDB_CODE_SUCCESS, or the error code reported by the row merger.
 */
int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pBlockScanInfo, int64_t key,
                              STsdbReader* pReader) {
  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
  if (tryCopyDistinctRowFromFileBlock(pReader, pBlockData, key, pDumpInfo)) {
    return TSDB_CODE_SUCCESS;
  }

  TSDBROW    fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
  STSRow*    pTSRow = NULL;
  SRowMerger merge = {0};

  int32_t code = tRowMergerInit(&merge, &fRow, pReader->pSchema);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);

  code = tRowMergerGetRow(&merge, &pTSRow);
  if (code != TSDB_CODE_SUCCESS) {
    // the merger already holds the rows merged above; release them on the error path too
    tRowMergerClear(&merge);
    return code;
  }

  doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo->uid);

  taosMemoryFree(pTSRow);
  tRowMergerClear(&merge);
  return TSDB_CODE_SUCCESS;
}

H
Haojun Liao 已提交
2260 2261
/*
 * Produce output for one table by merging the file-block row at the current dump
 * position with the in-memory (mem / imem) rows and the last-block rows.
 *
 * The file key passed on to the merge helpers is the timestamp of the current file
 * row, or INT64_MIN when the loaded block has nothing left to dump.
 */
static int32_t buildComposedDataBlockImpl(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo,
                                          SBlockData* pBlockData, SLastBlockReader* pLastBlockReader) {
  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
  SIterInfo*          pMemIter = &pBlockScanInfo->iter;
  SIterInfo*          pIMemIter = &pBlockScanInfo->iiter;

  bool    fileRowPending = (pBlockData->nRow > 0) && !pDumpInfo->allDumped;
  int64_t key = fileRowPending ? pBlockData->aTSKEY[pDumpInfo->rowIndex] : INT64_MIN;

  // both buffers report rows: merge every level together
  if (pMemIter->hasVal && pIMemIter->hasVal) {
    return doMergeMultiLevelRows(pReader, pBlockScanInfo, pBlockData, pLastBlockReader);
  }

  // look up the valid row of each buffer that reports data (mem first, then imem)
  TSDBROW* pMemRow = pMemIter->hasVal ? getValidMemRow(pMemIter, pBlockScanInfo->delSkyline, pReader) : NULL;
  TSDBROW* pIMemRow = pIMemIter->hasVal ? getValidMemRow(pIMemIter, pBlockScanInfo->delSkyline, pReader) : NULL;

  if (pIMemIter->hasVal) {  // imem + file + last block
    return doMergeBufAndFileRows(pReader, pBlockScanInfo, pIMemRow, pIMemIter, key, pLastBlockReader);
  }

  if (pMemIter->hasVal) {  // mem + file + last block
    return doMergeBufAndFileRows(pReader, pBlockScanInfo, pMemRow, pMemIter, key, pLastBlockReader);
  }

  // only the file block and the last block remain
  return mergeFileBlockAndLastBlock(pReader, pLastBlockReader, key, pBlockScanInfo, pBlockData);
}

2292
/*
 * Build a result block for one table by merging, row by row, the current file data
 * block (if any), the last (stt) block rows and the in-memory rows.
 *
 * When a file block is current and isCleanFileDataBlock() reports it clean, it is copied
 * straight into pReader->pResBlock. This happens for ascending scans, and for descending
 * scans with no pending last-block rows.
 *
 * Otherwise rows are produced by buildComposedDataBlockImpl() until one of these holds:
 *  - neither the file block nor the last block has a qualified row left;
 *  - the loaded file block is fully consumed;
 *  - the result block reaches pReader->capacity rows.
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_INVALID_PARA when the current block's table is
 * missing from the table map, or the error code reported while merging rows.
 */
static int32_t buildComposedDataBlock(STsdbReader* pReader) {
  int32_t code = TSDB_CODE_SUCCESS;

  SSDataBlock*        pResBlock = pReader->pResBlock;
  SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(&pReader->status.blockIter);
  SLastBlockReader*   pLastBlockReader = pReader->status.fileIter.pLastBlockReader;

  int64_t st = taosGetTimestampUs();

  STableBlockScanInfo* pBlockScanInfo = NULL;
  if (pBlockInfo != NULL) {
    // the hash lookup yields NULL for an unknown uid: check it before dereferencing
    STableBlockScanInfo** ppScanInfo =
        (STableBlockScanInfo**)taosHashGet(pReader->status.pTableMap, &pBlockInfo->uid, sizeof(pBlockInfo->uid));
    pBlockScanInfo = (ppScanInfo != NULL) ? *ppScanInfo : NULL;
    if (pBlockScanInfo == NULL) {
      code = TSDB_CODE_INVALID_PARA;
      tsdbError("failed to locate the uid:%" PRIu64 " in query table uid list, total tables:%d, %s", pBlockInfo->uid,
                taosHashGetSize(pReader->status.pTableMap), pReader->idStr);
      goto _end;
    }

    SDataBlk* pBlock = getCurrentBlock(&pReader->status.blockIter);
    TSDBKEY   keyInBuf = getCurrentKeyInBuf(pBlockScanInfo, pReader);

    // it is a clean block, load it directly
    if (isCleanFileDataBlock(pReader, pBlockInfo, pBlock, pBlockScanInfo, keyInBuf, pLastBlockReader)) {
      if (pReader->order == TSDB_ORDER_ASC ||
          (pReader->order == TSDB_ORDER_DESC && (!hasDataInLastBlock(pLastBlockReader)))) {
        copyBlockDataToSDataBlock(pReader, pBlockScanInfo);
        goto _end;
      }
    }
  } else {  // file blocks not exist
    pBlockScanInfo = *pReader->status.pTableIter;
  }

  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
  SBlockData*         pBlockData = &pReader->status.fileBlockData;
  int32_t             step = ASCENDING_TRAVERSE(pReader->order) ? 1 : -1;

  while (1) {
    bool hasBlockData = false;

    // find the first qualified row in the loaded file block, if any
    while (pBlockData->nRow > 0) {
      if (isValidFileBlockRow(pBlockData, pDumpInfo, pBlockScanInfo, pReader)) {
        hasBlockData = true;
        break;
      }

      pDumpInfo->rowIndex += step;

      SDataBlk* pBlock = getCurrentBlock(&pReader->status.blockIter);
      if (pDumpInfo->rowIndex >= pBlock->nRow || pDumpInfo->rowIndex < 0) {
        setBlockAllDumped(pDumpInfo, pBlock->maxKey.ts, pReader->order);
        break;
      }
    }

    bool hasBlockLData = hasDataInLastBlock(pLastBlockReader);

    // no data in last block and block, no need to proceed.
    if ((hasBlockData == false) && (hasBlockLData == false)) {
      break;
    }

    // stop on the first merge failure instead of silently continuing with a bad state
    code = buildComposedDataBlockImpl(pReader, pBlockScanInfo, pBlockData, pLastBlockReader);
    if (code != TSDB_CODE_SUCCESS) {
      break;
    }

    // currently loaded file data block is consumed
    if ((pBlockData->nRow > 0) && (pDumpInfo->rowIndex >= pBlockData->nRow || pDumpInfo->rowIndex < 0)) {
      SDataBlk* pBlock = getCurrentBlock(&pReader->status.blockIter);
      setBlockAllDumped(pDumpInfo, pBlock->maxKey.ts, pReader->order);
      break;
    }

    if (pResBlock->info.rows >= pReader->capacity) {
      break;
    }
  }

_end:
  pResBlock->info.uid = (pBlockScanInfo != NULL) ? pBlockScanInfo->uid : 0;
  blockDataUpdateTsWindow(pResBlock, 0);

  setComposedBlockFlag(pReader, true);
  double el = (taosGetTimestampUs() - st) / 1000.0;

  pReader->cost.composedBlocks += 1;
  pReader->cost.buildComposedBlockTime += el;

  if (pResBlock->info.rows > 0) {
    // window bounds are signed timestamps, so print them with PRId64
    tsdbDebug("%p uid:%" PRIu64 ", composed data block created, brange:%" PRId64 "-%" PRId64
              " rows:%d, elapsed time:%.2f ms %s",
              pReader, pResBlock->info.uid, pResBlock->info.window.skey, pResBlock->info.window.ekey,
              pResBlock->info.rows, el, pReader->idStr);
  }

  return code;
}

/* Store in the reader status whether the current result block was built by row-level composition. */
void setComposedBlockFlag(STsdbReader* pReader, bool composed) {
  SReaderStatus* pStatus = &pReader->status;
  pStatus->composedDataBlock = composed;
}

dengyihao's avatar
dengyihao 已提交
2393 2394
/*
 * Build the delete skyline of one table (only once) and reset its delete-list cursors.
 *
 * Deletion records are gathered from:
 *  - the on-disk del file (if the read snapshot has one);
 *  - the pHead lists of the mem and imem table data.
 * They are condensed by tsdbBuildDeleteSkyline() into pBlockScanInfo->delSkyline.
 *
 * The mem, imem, file-block and last-block delete indexes all start at index 0 for
 * ascending scans, and at the last skyline entry for descending scans.
 *
 * Returns TSDB_CODE_SUCCESS when the skyline already exists or was built. Otherwise it
 * returns the failing step's error code; allocation failures report TSDB_CODE_OUT_OF_MEMORY.
 */
int32_t initDelSkylineIterator(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader, STbData* pMemTbData,
                               STbData* piMemTbData) {
  if (pBlockScanInfo->delSkyline != NULL) {
    return TSDB_CODE_SUCCESS;
  }

  int32_t code = 0;
  STsdb*  pTsdb = pReader->pTsdb;

  SArray* pDelData = taosArrayInit(4, sizeof(SDelData));
  if (pDelData == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  SDelFile* pDelFile = pReader->pReadSnap->fs.pDelFile;
  if (pDelFile) {
    SDelFReader* pDelFReader = NULL;
    code = tsdbDelFReaderOpen(&pDelFReader, pDelFile, pTsdb);
    if (code != TSDB_CODE_SUCCESS) {
      goto _err;
    }

    SArray* aDelIdx = taosArrayInit(4, sizeof(SDelIdx));
    if (aDelIdx == NULL) {
      // code is still TSDB_CODE_SUCCESS here; report the allocation failure explicitly
      code = TSDB_CODE_OUT_OF_MEMORY;
      tsdbDelFReaderClose(&pDelFReader);
      goto _err;
    }

    code = tsdbReadDelIdx(pDelFReader, aDelIdx);
    if (code != TSDB_CODE_SUCCESS) {
      taosArrayDestroy(aDelIdx);
      tsdbDelFReaderClose(&pDelFReader);
      goto _err;
    }

    SDelIdx  idx = {.suid = pReader->suid, .uid = pBlockScanInfo->uid};
    SDelIdx* pIdx = taosArraySearch(aDelIdx, &idx, tCmprDelIdx, TD_EQ);

    if (pIdx != NULL) {
      code = tsdbReadDelData(pDelFReader, pIdx, pDelData);
    }

    taosArrayDestroy(aDelIdx);
    tsdbDelFReaderClose(&pDelFReader);

    if (code != TSDB_CODE_SUCCESS) {
      goto _err;
    }
  }

  // append the deletion records linked from the mem and imem table data
  SDelData* p = NULL;
  if (pMemTbData != NULL) {
    p = pMemTbData->pHead;
    while (p) {
      taosArrayPush(pDelData, p);
      p = p->pNext;
    }
  }

  if (piMemTbData != NULL) {
    p = piMemTbData->pHead;
    while (p) {
      taosArrayPush(pDelData, p);
      p = p->pNext;
    }
  }

  if (taosArrayGetSize(pDelData) > 0) {
    pBlockScanInfo->delSkyline = taosArrayInit(4, sizeof(TSDBKEY));
    if (pBlockScanInfo->delSkyline == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto _err;
    }
    code = tsdbBuildDeleteSkyline(pDelData, 0, (int32_t)(taosArrayGetSize(pDelData) - 1), pBlockScanInfo->delSkyline);
  }

  taosArrayDestroy(pDelData);

  // Signed arithmetic: an empty skyline gives a descending start index of -1 without
  // relying on unsigned wraparound.
  // NOTE(review): delSkyline may still be NULL here; this relies on taosArrayGetSize(NULL)
  // returning 0 — confirm.
  int32_t startIndex =
      ASCENDING_TRAVERSE(pReader->order) ? 0 : (int32_t)taosArrayGetSize(pBlockScanInfo->delSkyline) - 1;

  pBlockScanInfo->iter.index = startIndex;
  pBlockScanInfo->iiter.index = startIndex;
  pBlockScanInfo->fileDelIndex = startIndex;
  pBlockScanInfo->lastBlockDelIndex = startIndex;
  return code;

_err:
  taosArrayDestroy(pDelData);
  return code;
}

H
Haojun Liao 已提交
2475
/*
 * Key of the next buffered (mem / imem) row in the current scan order.
 *
 * The current valid row of each buffer is looked up, mem first and then imem:
 *  - only one buffer has a row: that row's key is returned;
 *  - both have a row: the smaller key is returned for ascending scans and the larger
 *    for descending scans (ties keep the mem key);
 *  - neither has a row: the result has ts == TSKEY_INITIAL_VAL.
 */
TSDBKEY getCurrentKeyInBuf(STableBlockScanInfo* pScanInfo, STsdbReader* pReader) {
  TSDBKEY key = {.ts = TSKEY_INITIAL_VAL};
  TSDBKEY ikey = {.ts = TSKEY_INITIAL_VAL};

  TSDBROW* pRow = getValidMemRow(&pScanInfo->iter, pScanInfo->delSkyline, pReader);
  bool     hasKey = (pRow != NULL);
  if (hasKey) {
    key = TSDBROW_KEY(pRow);
  }

  TSDBROW* pIRow = getValidMemRow(&pScanInfo->iiter, pScanInfo->delSkyline, pReader);
  bool     hasIKey = (pIRow != NULL);
  if (hasIKey) {
    ikey = TSDBROW_KEY(pIRow);
  }

  // only imem has data (or neither does): the imem key, or the initial value, is the answer
  if (!hasKey) {
    return ikey;
  }

  if (!hasIKey) {
    return key;
  }

  // both buffers have data: the next row is the smaller key in asc and the larger in desc
  if (ASCENDING_TRAVERSE(pReader->order)) {
    return (key.ts <= ikey.ts) ? key : ikey;
  }

  return (key.ts >= ikey.ts) ? key : ikey;
}

2493
/*
 * Advance the file-set iterator to the next file set that holds data for the queried tables.
 *
 * For each file set the block index is loaded. If it has indexed blocks or stt (last)
 * files, doLoadFileBlock() is run on it. The search stops at the first file set that
 * reports a non-zero number of data blocks or last files, or when no file set is left.
 * pBlockNum is reset to zero first and then receives the counts from doLoadFileBlock().
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_OUT_OF_MEMORY, or the error code from loading the
 * block index / file blocks.
 */
static int32_t moveToNextFile(STsdbReader* pReader, SBlockNumber* pBlockNum) {
  SReaderStatus* pStatus = &pReader->status;
  pBlockNum->numOfBlocks = 0;
  pBlockNum->numOfLastFiles = 0;

  size_t  numOfTables = taosHashGetSize(pReader->status.pTableMap);
  SArray* pIndexList = taosArrayInit(numOfTables, sizeof(SBlockIdx));
  if (pIndexList == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  while (1) {
    bool hasNext = filesetIteratorNext(&pStatus->fileIter, pReader);
    if (!hasNext) {  // no data files on disk
      break;
    }

    taosArrayClear(pIndexList);
    int32_t code = doLoadBlockIndex(pReader, pReader->pFileReader, pIndexList);
    if (code != TSDB_CODE_SUCCESS) {
      taosArrayDestroy(pIndexList);
      return code;
    }

    if (taosArrayGetSize(pIndexList) > 0 || pReader->pFileReader->pSet->nSttF > 0) {
      code = doLoadFileBlock(pReader, pIndexList, pBlockNum);
      if (code != TSDB_CODE_SUCCESS) {
        taosArrayDestroy(pIndexList);
        return code;
      }

      if (pBlockNum->numOfBlocks + pBlockNum->numOfLastFiles > 0) {
        break;
      }
    }

    // no blocks in current file, try next files
  }

  taosArrayDestroy(pIndexList);
  return TSDB_CODE_SUCCESS;
}

2533
/* Sort comparator for uint64_t table uids: -1, 0 or 1 for less, equal or greater. */
static int32_t uidComparFunc(const void* p1, const void* p2) {
  const uint64_t lhs = *(const uint64_t*)p1;
  const uint64_t rhs = *(const uint64_t*)p2;

  if (lhs < rhs) {
    return -1;
  }
  if (lhs > rhs) {
    return 1;
  }
  return 0;
}
2542

2543
/*
 * Fill pOrderCheckInfo->tableUidList with the uid of every table in pStatus->pTableMap,
 * then sort the list in ascending uid order. The caller must size the list for every
 * entry of the map: entries are written without a bound check.
 */
static void extractOrderedTableUidList(SUidOrderCheckInfo* pOrderCheckInfo, SReaderStatus* pStatus) {
  int32_t numOfTables = taosHashGetSize(pStatus->pTableMap);
  int32_t n = 0;

  for (void* pIter = taosHashIterate(pStatus->pTableMap, NULL); pIter != NULL;
       pIter = taosHashIterate(pStatus->pTableMap, pIter)) {
    STableBlockScanInfo* pInfo = *(STableBlockScanInfo**)pIter;
    pOrderCheckInfo->tableUidList[n++] = pInfo->uid;
  }

  taosSort(pOrderCheckInfo->tableUidList, numOfTables, sizeof(uint64_t), uidComparFunc);
}

2557
/*
 * Make sure pStatus->pTableIter points at a table, following ascending uid order.
 *
 * Behavior by case:
 *  - empty table map: nothing is done;
 *  - first call (no uid list yet): the sorted uid list is allocated and built, and the
 *    iterator points at its first table;
 *  - later call with pTableIter == NULL: the iterator is reset to the first uid of the
 *    existing list. If that uid is no longer in the table map, the list is reallocated
 *    to the map's size and rebuilt first.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY when the list cannot be (re)allocated.
 */
static int32_t initOrderCheckInfo(SUidOrderCheckInfo* pOrderCheckInfo, SReaderStatus* pStatus) {
  int32_t total = taosHashGetSize(pStatus->pTableMap);
  if (total == 0) {
    return TSDB_CODE_SUCCESS;
  }

  // list built and a table is already selected: nothing to reposition
  if (pOrderCheckInfo->tableUidList != NULL && pStatus->pTableIter != NULL) {
    return TSDB_CODE_SUCCESS;
  }

  pOrderCheckInfo->currentIndex = 0;

  if (pOrderCheckInfo->tableUidList == NULL) {
    pOrderCheckInfo->tableUidList = taosMemoryMalloc(total * sizeof(uint64_t));
    if (pOrderCheckInfo->tableUidList == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }
  } else {
    // it is the last block of a new file: restart from the first uid of the existing list
    uint64_t headUid = pOrderCheckInfo->tableUidList[pOrderCheckInfo->currentIndex];
    pStatus->pTableIter = taosHashGet(pStatus->pTableMap, &headUid, sizeof(headUid));
    if (pStatus->pTableIter != NULL) {
      return TSDB_CODE_SUCCESS;
    }

    // the tableMap has already updated: resize the list and rebuild it below
    void* p = taosMemoryRealloc(pOrderCheckInfo->tableUidList, total * sizeof(uint64_t));
    if (p == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }
    pOrderCheckInfo->tableUidList = p;
  }

  extractOrderedTableUidList(pOrderCheckInfo, pStatus);

  uint64_t firstUid = pOrderCheckInfo->tableUidList[0];
  pStatus->pTableIter = taosHashGet(pStatus->pTableMap, &firstUid, sizeof(firstUid));
  return TSDB_CODE_SUCCESS;
}

2598
/*
 * Step to the next uid in the ordered uid list and point pStatus->pTableIter at its table.
 * Returns false, with pTableIter cleared, once the index passes the table map's size.
 */
static bool moveToNextTable(SUidOrderCheckInfo* pOrderedCheckInfo, SReaderStatus* pStatus) {
  pOrderedCheckInfo->currentIndex += 1;

  if (pOrderedCheckInfo->currentIndex < taosHashGetSize(pStatus->pTableMap)) {
    uint64_t nextUid = pOrderedCheckInfo->tableUidList[pOrderedCheckInfo->currentIndex];
    pStatus->pTableIter = taosHashGet(pStatus->pTableMap, &nextUid, sizeof(nextUid));
    ASSERT(pStatus->pTableIter != NULL);
    return true;
  }

  pStatus->pTableIter = NULL;
  return false;
}

2611
/*
 * Build a result block from last (stt) block data, visiting tables in ascending uid order.
 *
 * Starting from the table chosen by initOrderCheckInfo(), each table whose last-block
 * reader initializes successfully is handed to doBuildDataBlock(). The first table that
 * produces rows ends the call. Tables that produce nothing are skipped via
 * moveToNextTable(), which clears pStatus->pTableIter after the last table.
 */
static int32_t doLoadLastBlockSequentially(STsdbReader* pReader) {
  SReaderStatus*      pStatus = &pReader->status;
  SLastBlockReader*   pLastBlockReader = pStatus->fileIter.pLastBlockReader;
  SUidOrderCheckInfo* pOrderedCheckInfo = &pStatus->uidCheckInfo;

  int32_t code = initOrderCheckInfo(pOrderedCheckInfo, pStatus);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }
  if (taosHashGetSize(pStatus->pTableMap) == 0) {
    return code;
  }

  for (;;) {
    // load the last data block of current table
    STableBlockScanInfo* pScanInfo = *(STableBlockScanInfo**)pStatus->pTableIter;

    if (initLastBlockReader(pLastBlockReader, pScanInfo, pReader)) {
      code = doBuildDataBlock(pReader);
      if (code != TSDB_CODE_SUCCESS) {
        return code;
      }

      if (pReader->pResBlock->info.rows > 0) {
        return TSDB_CODE_SUCCESS;
      }
    }

    // nothing produced for this table: continue with the next one, if any
    if (!moveToNextTable(pOrderedCheckInfo, pStatus)) {
      return TSDB_CODE_SUCCESS;
    }
  }
}

2650
/*
 * Build the next result block for the current table of the current file set.
 *
 * The table is taken from the current file block when there is one. Otherwise it comes
 * from pReader->status.pTableIter (last-block-only scan). The result is:
 *  - composed by buildComposedDataBlock() when there is no file block, or when
 *    fileBlockShouldLoad() asks for the block to be loaded;
 *  - built from buffered rows only, up to the block's first key in scan order, when
 *    bufferDataInFileBlockGap() finds buffered rows ahead of the block;
 *  - composed from last-block rows first for descending scans that still have them;
 *  - otherwise the whole file block, described in pResBlock->info and marked as dumped.
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_INVALID_PARA when the table's scan info cannot be
 * found, or the error code of the build step that failed.
 */
static int32_t doBuildDataBlock(STsdbReader* pReader) {
  int32_t   code = TSDB_CODE_SUCCESS;
  SDataBlk* pBlock = NULL;

  SReaderStatus*       pStatus = &pReader->status;
  SDataBlockIter*      pBlockIter = &pStatus->blockIter;
  STableBlockScanInfo* pScanInfo = NULL;
  SFileDataBlockInfo*  pBlockInfo = getCurrentBlockInfo(pBlockIter);
  SLastBlockReader*    pLastBlockReader = pReader->status.fileIter.pLastBlockReader;

  if (pBlockInfo != NULL) {
    // the hash lookup yields NULL for an unknown uid: check it before dereferencing
    STableBlockScanInfo** ppScanInfo =
        (STableBlockScanInfo**)taosHashGet(pReader->status.pTableMap, &pBlockInfo->uid, sizeof(pBlockInfo->uid));
    pScanInfo = (ppScanInfo != NULL) ? *ppScanInfo : NULL;
  } else {
    pScanInfo = *pReader->status.pTableIter;
  }

  if (pScanInfo == NULL) {
    tsdbError("failed to get table scan-info, %s", pReader->idStr);
    code = TSDB_CODE_INVALID_PARA;
    return code;
  }

  if (pBlockInfo != NULL) {
    pBlock = getCurrentBlock(pBlockIter);
  }

  initLastBlockReader(pLastBlockReader, pScanInfo, pReader);
  TSDBKEY keyInBuf = getCurrentKeyInBuf(pScanInfo, pReader);

  if (pBlockInfo == NULL) {  // build data block from last data file
    ASSERT(pBlockIter->numOfBlocks == 0);
    code = buildComposedDataBlock(pReader);
  } else if (fileBlockShouldLoad(pReader, pBlockInfo, pBlock, pScanInfo, keyInBuf, pLastBlockReader)) {
    code = doLoadFileBlockData(pReader, pBlockIter, &pStatus->fileBlockData, pScanInfo->uid);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    // build composed data block
    code = buildComposedDataBlock(pReader);
  } else if (bufferDataInFileBlockGap(pReader->order, keyInBuf, pBlock)) {
    // data in memory that are earlier than current file block
    // rows in buffer should be less than the file block in asc, greater than file block in desc
    int64_t endKey = (ASCENDING_TRAVERSE(pReader->order)) ? pBlock->minKey.ts : pBlock->maxKey.ts;
    code = buildDataBlockFromBuf(pReader, pScanInfo, endKey);
  } else {
    if (hasDataInLastBlock(pLastBlockReader) && !ASCENDING_TRAVERSE(pReader->order)) {
      // only return the rows in last block
      int64_t tsLast = getCurrentKeyInLastBlock(pLastBlockReader);
      ASSERT(tsLast >= pBlock->maxKey.ts);
      tBlockDataReset(&pReader->status.fileBlockData);

      tsdbDebug("load data in last block firstly, due to desc scan data, %s", pReader->idStr);
      code = buildComposedDataBlock(pReader);
    } else {  // whole block is required, return it directly
      SDataBlockInfo* pInfo = &pReader->pResBlock->info;
      pInfo->rows = pBlock->nRow;
      pInfo->uid = pScanInfo->uid;
      pInfo->window = (STimeWindow){.skey = pBlock->minKey.ts, .ekey = pBlock->maxKey.ts};
      setComposedBlockFlag(pReader, false);
      setBlockAllDumped(&pStatus->fBlockDumpInfo, pBlock->maxKey.ts, pReader->order);
    }
  }

  return code;
}

H
Haojun Liao 已提交
2717
/*
 * Build a result block purely from in-memory (mem / imem) data, one table at a time.
 *
 * Iteration resumes at pStatus->pTableIter. When that is NULL, it restarts from the
 * first entry of the table map. Each table's buffered rows are read without an upper
 * time bound. The first table producing rows ends the call. When the map is exhausted,
 * pTableIter is left NULL and TSDB_CODE_SUCCESS is returned with no rows.
 */
static int32_t buildBlockFromBufferSequentially(STsdbReader* pReader) {
  SReaderStatus* pStatus = &pReader->status;

  if (pStatus->pTableIter == NULL) {
    pStatus->pTableIter = taosHashIterate(pStatus->pTableMap, NULL);
  }

  while (pStatus->pTableIter != NULL) {
    STableBlockScanInfo* pScanInfo = *(STableBlockScanInfo**)pStatus->pTableIter;
    initMemDataIterator(pScanInfo, pReader);

    // unbounded in the scan direction: take every buffered row of this table
    int64_t endKey = ASCENDING_TRAVERSE(pReader->order) ? INT64_MAX : INT64_MIN;
    int32_t code = buildDataBlockFromBuf(pReader, pScanInfo, endKey);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    if (pReader->pResBlock->info.rows > 0) {
      return TSDB_CODE_SUCCESS;
    }

    // current table is exhausted, let's try the next table
    pStatus->pTableIter = taosHashIterate(pStatus->pTableMap, pStatus->pTableIter);
  }

  return TSDB_CODE_SUCCESS;
}

2749
// set the correct start position in case of the first/last file block, according to the query time window
2750
static void initBlockDumpInfo(STsdbReader* pReader, SDataBlockIter* pBlockIter) {
H
Hongze Cheng 已提交
2751
  SDataBlk* pBlock = getCurrentBlock(pBlockIter);
2752

2753 2754 2755
  SReaderStatus* pStatus = &pReader->status;

  SFileBlockDumpInfo* pDumpInfo = &pStatus->fBlockDumpInfo;
2756 2757 2758

  pDumpInfo->totalRows = pBlock->nRow;
  pDumpInfo->allDumped = false;
2759
  pDumpInfo->rowIndex = ASCENDING_TRAVERSE(pReader->order) ? 0 : pBlock->nRow - 1;
2760 2761
}

2762
/*
 * Move to the next file set with data and prepare the block iterator for it.
 *
 * When no file set with data remains, pReader->status.loadFromFile is cleared. Otherwise:
 *  - if the file set has data blocks, the block iterator is initialized over them;
 *  - if it only has last (stt) files, the file block data and block iterator are reset.
 * In both cases the dump cursor is then positioned via initBlockDumpInfo().
 *
 * Returns TSDB_CODE_SUCCESS or the error code from moveToNextFile() / initBlockIterator().
 */
static int32_t initForFirstBlockInFile(STsdbReader* pReader, SDataBlockIter* pBlockIter) {
  SBlockNumber num = {0};

  int32_t code = moveToNextFile(pReader, &num);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  // all data files are consumed, try data in buffer
  if (num.numOfBlocks + num.numOfLastFiles == 0) {
    pReader->status.loadFromFile = false;
    return code;
  }

  // initialize the block iterator for a new fileset
  if (num.numOfBlocks > 0) {
    code = initBlockIterator(pReader, pBlockIter, num.numOfBlocks);
    if (code != TSDB_CODE_SUCCESS) {
      // do not position the dump cursor on an iterator that failed to initialize
      return code;
    }
  } else {  // no block data, only last block exists
    tBlockDataReset(&pReader->status.fileBlockData);
    resetDataBlockIterator(pBlockIter, pReader->order);
  }

  // set the start position according to the scan order
  initBlockDumpInfo(pReader, pBlockIter);
  return code;
}

2789
/*
 * True when the current file block has been read part-way: not fully dumped, and the cursor
 * has moved off the first row (ascending) or off the last row (descending).
 */
static bool fileBlockPartiallyRead(SFileBlockDumpInfo* pDumpInfo, bool asc) {
  if (pDumpInfo->allDumped) {
    return false;
  }

  if (asc) {
    return pDumpInfo->rowIndex > 0;
  }

  return pDumpInfo->rowIndex < (pDumpInfo->totalRows - 1);
}

2794
/*
 * Produce the next result block from the data files.
 *
 * The reader alternates between two modes:
 *  - last-block mode, entered when the block iterator has no data blocks: stt rows are
 *    read table by table and, once a file's last blocks are exhausted, the next file set
 *    is opened;
 *  - data-block mode: the current file block is continued or the next one is taken.
 *    When the data blocks of a file set run out, the reader either switches to that
 *    set's last blocks (when it has stt files) or opens the next file set.
 *
 * Returns as soon as pReader->pResBlock has rows, an error occurs, or every file set has
 * been consumed (pReader->status.loadFromFile becomes false).
 */
static int32_t buildBlockFromFiles(STsdbReader* pReader) {
  bool            asc = ASCENDING_TRAVERSE(pReader->order);
  SDataBlockIter* pBlockIter = &pReader->status.blockIter;

  // true while the reader is working through last (stt) blocks instead of data blocks
  bool inLastBlockMode = (pBlockIter->numOfBlocks == 0);

  for (;;) {
    int32_t code = TSDB_CODE_SUCCESS;

    if (inLastBlockMode) {
      code = doLoadLastBlockSequentially(pReader);
      if (code != TSDB_CODE_SUCCESS) {
        return code;
      }

      if (pReader->pResBlock->info.rows > 0) {
        return TSDB_CODE_SUCCESS;
      }

      // every table of this last block file has been checked: move on to the next file
      if (pReader->status.pTableIter == NULL) {
        code = initForFirstBlockInFile(pReader, pBlockIter);

        // error happens or all the data files are completely checked
        if ((code != TSDB_CODE_SUCCESS) || (pReader->status.loadFromFile == false)) {
          return code;
        }

        // the new file has no data blocks either: stay in last-block mode
        if (pBlockIter->numOfBlocks == 0) {
          continue;
        }
      }

      code = doBuildDataBlock(pReader);
      if (code != TSDB_CODE_SUCCESS) {
        return code;
      }

      if (pReader->pResBlock->info.rows > 0) {
        return TSDB_CODE_SUCCESS;
      }

      inLastBlockMode = false;
      continue;
    }

    SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;

    if (fileBlockPartiallyRead(pDumpInfo, asc)) {  // file data block is partially loaded
      code = buildComposedDataBlock(pReader);
    } else {
      if (pDumpInfo->allDumped) {
        // current block is exhausted: try the next data block of the current file
        if (blockIteratorNext(&pReader->status.blockIter, pReader->idStr)) {
          initBlockDumpInfo(pReader, pBlockIter);
        } else if (pReader->status.pCurrentFileset->nSttF > 0) {
          // data blocks are exhausted but this file set still has last blocks
          tBlockDataReset(&pReader->status.fileBlockData);
          resetDataBlockIterator(pBlockIter, pReader->order);
          inLastBlockMode = true;
          continue;
        } else {
          code = initForFirstBlockInFile(pReader, pBlockIter);

          // error happens or all the data files are completely checked
          if ((code != TSDB_CODE_SUCCESS) || (pReader->status.loadFromFile == false)) {
            return code;
          }

          // this file does not have blocks, let's start check the last block file
          if (pBlockIter->numOfBlocks == 0) {
            inLastBlockMode = true;
            continue;
          }
        }
      }

      code = doBuildDataBlock(pReader);
    }

    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    if (pReader->pResBlock->info.rows > 0) {
      return TSDB_CODE_SUCCESS;
    }
  }
}
H
refact  
Hongze Cheng 已提交
2882

2883 2884
/*
 * Select the tsdb instance to query for a time window starting at winSKey.
 *
 * A non-RSMA vnode (VND_IS_RSMA false) always uses VND_TSDB(pVnode), and *pLevel is left
 * untouched.
 *
 * For an RSMA vnode the retention levels are walked upward from L0:
 *  - the walk stops at the first level whose keep duration, measured back from now,
 *    reaches winSKey with tsQueryRsmaTolerance of slack;
 *  - a level with keep <= 0 stops the walk at the previous level;
 *  - levels past L1 map to the L2 tsdb.
 * The tolerance is scaled by 1 / 1000 / 1000000 for milli / micro / other precisions,
 * i.e. it is treated as milliseconds. The chosen level is stored in *pLevel.
 */
static STsdb* getTsdbByRetentions(SVnode* pVnode, TSKEY winSKey, SRetention* retentions, const char* idStr,
                                  int8_t* pLevel) {
  if (VND_IS_RSMA(pVnode)) {
    int8_t  level = 0;
    int8_t  precision = pVnode->config.tsdbCfg.precision;
    int64_t now = taosGetTimestamp(precision);

    // Use a 64-bit unit so the product is computed in 64 bits; with `long` constants it
    // can overflow on LLP64 platforms (e.g. Windows), where long is only 32 bits wide.
    int64_t unit = (precision == TSDB_TIME_PRECISION_MILLI)   ? 1LL
                   : (precision == TSDB_TIME_PRECISION_MICRO) ? 1000LL
                                                              : 1000000LL;
    int64_t offset = tsQueryRsmaTolerance * unit;

    for (int8_t i = 0; i < TSDB_RETENTION_MAX; ++i) {
      SRetention* pRetention = retentions + level;
      if (pRetention->keep <= 0) {
        if (level > 0) {
          --level;
        }
        break;
      }
      if ((now - pRetention->keep) <= (winSKey + offset)) {
        break;
      }
      ++level;
    }

    const char* str = (idStr != NULL) ? idStr : "";

    if (level == TSDB_RETENTION_L0) {
      *pLevel = TSDB_RETENTION_L0;
      tsdbDebug("vgId:%d, rsma level %d is selected to query %s", TD_VID(pVnode), TSDB_RETENTION_L0, str);
      return VND_RSMA0(pVnode);
    } else if (level == TSDB_RETENTION_L1) {
      *pLevel = TSDB_RETENTION_L1;
      tsdbDebug("vgId:%d, rsma level %d is selected to query %s", TD_VID(pVnode), TSDB_RETENTION_L1, str);
      return VND_RSMA1(pVnode);
    } else {
      *pLevel = TSDB_RETENTION_L2;
      tsdbDebug("vgId:%d, rsma level %d is selected to query %s", TD_VID(pVnode), TSDB_RETENTION_L2, str);
      return VND_RSMA2(pVnode);
    }
  }

  return VND_TSDB(pVnode);
}

H
Haojun Liao 已提交
2927
/*
 * Version range a query may read.
 *
 * A start version of -1 means "from 0". An end version of -1 means "not specified" and
 * yields the vnode's applied version. An explicit end version can only narrow the range,
 * never extend it past the applied version. `level` is not used.
 */
SVersionRange getQueryVerRange(SVnode* pVnode, SQueryTableDataCond* pCond, int8_t level) {
  SVersionRange range = {.minVer = (pCond->startVersion == -1) ? 0 : pCond->startVersion,
                         .maxVer = pVnode->state.applied};

  if (pCond->endVersion != -1 && pCond->endVersion < pVnode->state.applied) {
    range.maxVer = pCond->endVersion;
  }

  return range;
}

2941
/*
 * Check whether the row identified by pKey is covered by the delete skyline pDelList.
 *
 * pDelList is an array of TSDBKEY points and *index is a cursor into it. The cursor is moved forward (ascending order)
 * or backward (descending order) while searching, so later calls for keys in scan order continue from where the
 * previous call stopped. A key is reported as dropped when it lies between two neighboring points and the version of
 * the lower-index point of that pair is not older than the key's version.
 *
 * Returns true if the key is dropped, false otherwise (including when pDelList is NULL or empty).
 */
bool hasBeenDropped(const SArray* pDelList, int32_t* index, TSDBKEY* pKey, int32_t order, SVersionRange* pVerRange) {
  ASSERT(pKey != NULL);
  if (pDelList == NULL) {
    return false;
  }

  size_t  num = taosArrayGetSize(pDelList);
  bool    asc = ASCENDING_TRAVERSE(order);
  int32_t step = asc ? 1 : -1;

  // An empty skyline drops nothing. Without this check "num - 1" wraps around and the lookups below would dereference
  // the NULL returned for an out-of-range element.
  if (num == 0) {
    return false;
  }

  if (asc) {
    if (*index >= num - 1) {
      // the cursor already reached the last point of the skyline
      TSDBKEY* last = taosArrayGetLast(pDelList);
      ASSERT(pKey->ts >= last->ts);

      if (pKey->ts > last->ts) {
        return false;
      } else if (pKey->ts == last->ts) {
        // NOTE(review): with a single-point skyline "num - 2" wraps around here; confirm a skyline always has >= 2 points
        TSDBKEY* prev = taosArrayGet(pDelList, num - 2);
        return (prev->version >= pKey->version);
      }
    } else {
      TSDBKEY* pCurrent = taosArrayGet(pDelList, *index);
      TSDBKEY* pNext = taosArrayGet(pDelList, (*index) + 1);

      if (pKey->ts < pCurrent->ts) {
        return false;
      }

      if (pCurrent->ts <= pKey->ts && pNext->ts >= pKey->ts && pCurrent->version >= pKey->version &&
          pVerRange->maxVer >= pCurrent->version) {
        return true;
      }

      // advance the cursor to the range that may contain the key
      while (pNext->ts <= pKey->ts && (*index) < num - 1) {
        (*index) += 1;

        if ((*index) < num - 1) {
          pCurrent = taosArrayGet(pDelList, *index);
          pNext = taosArrayGet(pDelList, (*index) + 1);

          // it is not a consecutive deletion range, ignore it
          if (pCurrent->version == 0 && pNext->version > 0) {
            continue;
          }

          if (pCurrent->ts <= pKey->ts && pNext->ts >= pKey->ts && pCurrent->version >= pKey->version &&
              pVerRange->maxVer >= pCurrent->version) {
            return true;
          }
        }
      }

      return false;
    }
  } else {
    // NOTE(review): unlike the general ascending case, the descending checks below never compare the deletion version
    // with pVerRange->maxVer; confirm whether this asymmetry is intended.
    if (*index <= 0) {
      TSDBKEY* pFirst = taosArrayGet(pDelList, 0);

      if (pKey->ts < pFirst->ts) {
        return false;
      } else if (pKey->ts == pFirst->ts) {
        return pFirst->version >= pKey->version;
      } else {
        ASSERT(0);
      }
    } else {
      TSDBKEY* pCurrent = taosArrayGet(pDelList, *index);
      TSDBKEY* pPrev = taosArrayGet(pDelList, (*index) - 1);

      if (pKey->ts > pCurrent->ts) {
        return false;
      }

      if (pPrev->ts <= pKey->ts && pCurrent->ts >= pKey->ts && pPrev->version >= pKey->version) {
        return true;
      }

      // move the cursor back to the range that may contain the key
      while (pPrev->ts >= pKey->ts && (*index) > 1) {
        (*index) += step;

        if ((*index) >= 1) {
          pCurrent = taosArrayGet(pDelList, *index);
          pPrev = taosArrayGet(pDelList, (*index) - 1);

          // it is not a consecutive deletion range, ignore it
          if (pCurrent->version > 0 && pPrev->version == 0) {
            continue;
          }

          if (pPrev->ts <= pKey->ts && pCurrent->ts >= pKey->ts && pPrev->version >= pKey->version) {
            return true;
          }
        }
      }

      return false;
    }
  }

  return false;
}

3043
/*
 * Return the first row, starting at the iterator's current position, that is visible to the reader: its version
 * lies within pReader->verRange and it is not covered by the delete skyline pDelList.
 *
 * Returns NULL when the iterator is exhausted or the next row falls outside pReader->window; in the latter case
 * pIter->hasVal is cleared as well. Rows skipped on the way are consumed from the iterator.
 */
TSDBROW* getValidMemRow(SIterInfo* pIter, const SArray* pDelList, STsdbReader* pReader) {
  if (!pIter->hasVal) {
    return NULL;
  }

  for (;;) {
    TSDBROW* pCandidate = tsdbTbDataIterGet(pIter->iter);
    TSDBKEY  candKey = TSDBROW_KEY(pCandidate);

    if (outOfTimeWindow(candKey.ts, &pReader->window)) {
      pIter->hasVal = false;
      return NULL;
    }

    // it is a valid data version
    bool inVerRange = candKey.version <= pReader->verRange.maxVer && candKey.version >= pReader->verRange.minVer;
    if (inVerRange && !hasBeenDropped(pDelList, &pIter->index, &candKey, pReader->order, &pReader->verRange)) {
      return pCandidate;
    }

    pIter->hasVal = tsdbTbDataIterNext(pIter->iter);
    if (!pIter->hasVal) {
      return NULL;
    }
  }
}
H
Hongze Cheng 已提交
3081

3082 3083
/*
 * Advance pIter and feed every following visible row whose timestamp equals `ts` into pMerger.
 *
 * Stops at the first visible row with another timestamp, or when no visible row remains. Returns terrno if the schema
 * of a row cannot be resolved, TSDB_CODE_SUCCESS otherwise.
 */
int32_t doMergeRowsInBuf(SIterInfo* pIter, uint64_t uid, int64_t ts, SArray* pDelList, SRowMerger* pMerger,
                         STsdbReader* pReader) {
  for (;;) {
    pIter->hasVal = tsdbTbDataIterNext(pIter->iter);

    // data exists but not valid, or nothing left at all
    TSDBROW* pNext = pIter->hasVal ? getValidMemRow(pIter, pDelList, pReader) : NULL;
    if (pNext == NULL || TSDBROW_KEY(pNext).ts != ts) {
      return TSDB_CODE_SUCCESS;
    }

    STSchema* pRowSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pNext), pReader, uid);
    if (pRowSchema == NULL) {
      return terrno;
    }

    tRowMergerAdd(pMerger, pNext, pRowSchema);
  }
}

3113
static int32_t doMergeRowsInFileBlockImpl(SBlockData* pBlockData, int32_t rowIndex, int64_t key, SRowMerger* pMerger,
3114
                                          SVersionRange* pVerRange, int32_t step) {
3115 3116
  while (pBlockData->aTSKEY[rowIndex] == key && rowIndex < pBlockData->nRow && rowIndex >= 0) {
    if (pBlockData->aVersion[rowIndex] > pVerRange->maxVer || pBlockData->aVersion[rowIndex] < pVerRange->minVer) {
3117
      rowIndex += step;
3118 3119 3120 3121 3122 3123 3124 3125 3126 3127 3128 3129 3130 3131 3132 3133
      continue;
    }

    TSDBROW fRow = tsdbRowFromBlockData(pBlockData, rowIndex);
    tRowMerge(pMerger, &fRow);
    rowIndex += step;
  }

  return rowIndex;
}

/* Outcome of checking the neighbor file block while merging rows that share one timestamp. */
typedef enum {
  CHECK_FILEBLOCK_CONT = 1,  // the neighbor block was fully consumed; check the following neighbor too
  CHECK_FILEBLOCK_QUIT = 2,  // stop looking at further neighbor blocks
} CHECK_FILEBLOCK_STATE;

H
Hongze Cheng 已提交
3134
static int32_t checkForNeighborFileBlock(STsdbReader* pReader, STableBlockScanInfo* pScanInfo, SDataBlk* pBlock,
3135 3136
                                         SFileDataBlockInfo* pFBlock, SRowMerger* pMerger, int64_t key,
                                         CHECK_FILEBLOCK_STATE* state) {
3137
  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
3138
  SBlockData*         pBlockData = &pReader->status.fileBlockData;
3139

3140
  *state = CHECK_FILEBLOCK_QUIT;
3141
  int32_t step = ASCENDING_TRAVERSE(pReader->order) ? 1 : -1;
3142

H
Hongze Cheng 已提交
3143 3144
  int32_t   nextIndex = -1;
  SDataBlk* pNeighborBlock = getNeighborBlockOfSameTable(pFBlock, pScanInfo, &nextIndex, pReader->order);
3145
  if (pNeighborBlock == NULL) {  // do nothing
3146 3147 3148 3149
    return 0;
  }

  bool overlap = overlapWithNeighborBlock(pBlock, pNeighborBlock, pReader->order);
3150 3151
  taosMemoryFree(pNeighborBlock);

3152
  if (overlap) {  // load next block
3153
    SReaderStatus*  pStatus = &pReader->status;
3154 3155
    SDataBlockIter* pBlockIter = &pStatus->blockIter;

3156
    // 1. find the next neighbor block in the scan block list
3157
    SFileDataBlockInfo fb = {.uid = pFBlock->uid, .tbBlockIdx = nextIndex};
3158
    int32_t            neighborIndex = findFileBlockInfoIndex(pBlockIter, &fb);
3159

3160
    // 2. remove it from the scan block list
3161
    setFileBlockActiveInBlockIter(pBlockIter, neighborIndex, step);
3162

3163
    // 3. load the neighbor block, and set it to be the currently accessed file data block
3164
    int32_t code = doLoadFileBlockData(pReader, pBlockIter, &pStatus->fileBlockData, pFBlock->uid);
3165 3166 3167 3168
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

3169
    // 4. check the data values
3170 3171 3172 3173
    initBlockDumpInfo(pReader, pBlockIter);

    pDumpInfo->rowIndex =
        doMergeRowsInFileBlockImpl(pBlockData, pDumpInfo->rowIndex, key, pMerger, &pReader->verRange, step);
H
Haojun Liao 已提交
3174
    if (pDumpInfo->rowIndex >= pDumpInfo->totalRows) {
3175 3176 3177 3178 3179 3180 3181
      *state = CHECK_FILEBLOCK_CONT;
    }
  }

  return TSDB_CODE_SUCCESS;
}

3182 3183
/*
 * Merge into pMerger the file rows that share the timestamp found at the current dump position of pBlockData.
 *
 * The row at the current position itself is skipped (presumably already handed to the merger by the caller). When the
 * current block is exhausted in scan order, equal-timestamp rows are also pulled from overlapping neighbor blocks of
 * the same table via checkForNeighborFileBlock.
 *
 * Returns the error reported while loading a neighbor block, TSDB_CODE_SUCCESS otherwise.
 */
int32_t doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pScanInfo, STsdbReader* pReader,
                                SRowMerger* pMerger) {
  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;

  bool    asc = ASCENDING_TRAVERSE(pReader->order);
  int64_t key = pBlockData->aTSKEY[pDumpInfo->rowIndex];
  int32_t step = asc ? 1 : -1;

  pDumpInfo->rowIndex += step;
  if ((pDumpInfo->rowIndex <= pBlockData->nRow - 1 && asc) || (pDumpInfo->rowIndex >= 0 && !asc)) {
    pDumpInfo->rowIndex =
        doMergeRowsInFileBlockImpl(pBlockData, pDumpInfo->rowIndex, key, pMerger, &pReader->verRange, step);
  }

  // all rows are consumed, let's try next file block
  if ((pDumpInfo->rowIndex >= pBlockData->nRow && asc) || (pDumpInfo->rowIndex < 0 && !asc)) {
    while (1) {
      CHECK_FILEBLOCK_STATE st;

      SFileDataBlockInfo* pFileBlockInfo = getCurrentBlockInfo(&pReader->status.blockIter);
      SDataBlk*           pCurrentBlock = getCurrentBlock(&pReader->status.blockIter);

      int32_t code = checkForNeighborFileBlock(pReader, pScanInfo, pCurrentBlock, pFileBlockInfo, pMerger, key, &st);
      if (code != TSDB_CODE_SUCCESS) {
        // loading the neighbor failed; report it instead of pretending the merge completed
        return code;
      }

      if (st == CHECK_FILEBLOCK_QUIT) {
        break;
      }
    }
  }

  return TSDB_CODE_SUCCESS;
}

H
Hongze Cheng 已提交
3213
int32_t doMergeRowsInLastBlock(SLastBlockReader* pLastBlockReader, STableBlockScanInfo* pScanInfo, int64_t ts,
3214
                               SRowMerger* pMerger, SVersionRange* pVerRange) {
H
Haojun Liao 已提交
3215
  pScanInfo->lastKey = ts;
3216
  while (nextRowFromLastBlocks(pLastBlockReader, pScanInfo, pVerRange)) {
3217 3218
    int64_t next1 = getCurrentKeyInLastBlock(pLastBlockReader);
    if (next1 == ts) {
3219
      TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
3220 3221 3222 3223 3224 3225 3226 3227 3228
      tRowMerge(pMerger, &fRow1);
    } else {
      break;
    }
  }

  return TSDB_CODE_SUCCESS;
}

3229 3230
/*
 * Produce in *pTSRow the row pRow (of table uid, read from pIter), merged with the following visible rows of the same
 * iterator that share its timestamp.
 *
 * When the next visible row has a different timestamp, or there is none, *pTSRow points at pRow's own data and
 * *freeTSRow is false. Otherwise a merged row is built and *freeTSRow is set to true, telling the caller to release
 * *pTSRow. The iterator is advanced past every row consumed.
 */
int32_t doMergeMemTableMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter, SArray* pDelList, STSRow** pTSRow,
                                 STsdbReader* pReader, bool* freeTSRow) {
  TSDBROW* pNextRow = NULL;
  TSDBROW  current = *pRow;  // copy taken before the iterator is advanced

  // if the timestamp of the next valid row has a different ts, return current row directly
  pIter->hasVal = tsdbTbDataIterNext(pIter->iter);
  if (pIter->hasVal) {  // has next point in mem/imem
    pNextRow = getValidMemRow(pIter, pDelList, pReader);
  }

  if (pNextRow == NULL || current.pTSRow->ts != pNextRow->pTSRow->ts) {
    *pTSRow = current.pTSRow;
    *freeTSRow = false;
    return TSDB_CODE_SUCCESS;
  }

  // get the correct schema for data in memory
  terrno = 0;
  STSchema* pTSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(&current), pReader, uid);
  if (pTSchema == NULL) {
    return terrno;
  }

  if (pReader->pSchema == NULL) {
    pReader->pSchema = pTSchema;
  }

  SRowMerger merge = {0};
  int32_t    code = tRowMergerInit2(&merge, pReader->pSchema, &current, pTSchema);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  // from here on every exit goes through _end so that tRowMergerClear releases the merger, also on failure
  STSchema* pTSchema1 = doGetSchemaForTSRow(TSDBROW_SVERSION(pNextRow), pReader, uid);
  if (pTSchema1 == NULL) {
    code = terrno;
    goto _end;
  }

  tRowMergerAdd(&merge, pNextRow, pTSchema1);

  code = doMergeRowsInBuf(pIter, uid, current.pTSRow->ts, pDelList, &merge, pReader);
  if (code != TSDB_CODE_SUCCESS) {
    goto _end;
  }

  code = tRowMergerGetRow(&merge, pTSRow);
  if (code == TSDB_CODE_SUCCESS) {
    *freeTSRow = true;
  }

_end:
  tRowMergerClear(&merge);
  return code;
}

3297
int32_t doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader,
H
Hongze Cheng 已提交
3298
                           STSRow** pTSRow) {
H
Haojun Liao 已提交
3299 3300
  SRowMerger merge = {0};

3301 3302 3303
  TSDBKEY k = TSDBROW_KEY(pRow);
  TSDBKEY ik = TSDBROW_KEY(piRow);

3304
  if (ASCENDING_TRAVERSE(pReader->order)) {  // ascending order imem --> mem
H
Haojun Liao 已提交
3305
    STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid);
3306

H
Haojun Liao 已提交
3307 3308 3309 3310 3311 3312 3313 3314 3315 3316
    int32_t code = tRowMergerInit(&merge, piRow, pSchema);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    code = doMergeRowsInBuf(&pBlockScanInfo->iiter, pBlockScanInfo->uid, ik.ts, pBlockScanInfo->delSkyline, &merge,
                            pReader);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }
3317

3318
    tRowMerge(&merge, pRow);
H
Haojun Liao 已提交
3319 3320 3321 3322 3323 3324
    code =
        doMergeRowsInBuf(&pBlockScanInfo->iter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge, pReader);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

3325
  } else {
H
Haojun Liao 已提交
3326
    STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid);
3327

H
Haojun Liao 已提交
3328
    int32_t code = tRowMergerInit(&merge, pRow, pSchema);
3329
    if (code != TSDB_CODE_SUCCESS || merge.pTSchema == NULL) {
H
Haojun Liao 已提交
3330 3331 3332 3333 3334 3335 3336 3337
      return code;
    }

    code =
        doMergeRowsInBuf(&pBlockScanInfo->iter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge, pReader);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }
3338 3339

    tRowMerge(&merge, piRow);
H
Haojun Liao 已提交
3340 3341 3342 3343 3344
    code = doMergeRowsInBuf(&pBlockScanInfo->iiter, pBlockScanInfo->uid, ik.ts, pBlockScanInfo->delSkyline, &merge,
                            pReader);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }
3345
  }
3346

3347 3348
  int32_t code = tRowMergerGetRow(&merge, pTSRow);
  return code;
3349 3350
}

3351 3352
/*
 * Produce the next row of one table from the in-memory buffers: the mem iterator (pBlockScanInfo->iter) and the
 * imem iterator (pBlockScanInfo->iiter).
 *
 * Candidates at or beyond endKey in the scan direction are ignored. If both buffers offer a row, the one that comes
 * first in scan order is returned; if both carry the same timestamp they are merged. *pTSRow is left untouched when no
 * candidate remains, and *freeTSRow tells the caller whether *pTSRow must be released.
 */
int32_t tsdbGetNextRowInMem(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader, STSRow** pTSRow, int64_t endKey,
                            bool* freeTSRow) {
  SIterInfo* pMemIter = &pBlockScanInfo->iter;
  SIterInfo* pIMemIter = &pBlockScanInfo->iiter;

  TSDBROW* pMemRow = getValidMemRow(pMemIter, pBlockScanInfo->delSkyline, pReader);
  TSDBROW* pIMemRow = getValidMemRow(pIMemIter, pBlockScanInfo->delSkyline, pReader);

  SArray*  pDelList = pBlockScanInfo->delSkyline;
  uint64_t uid = pBlockScanInfo->uid;
  bool     asc = ASCENDING_TRAVERSE(pReader->order);

  // discard candidates that lie at or beyond endKey in the scan direction
  if (pMemIter->hasVal) {
    int64_t memTs = TSDBROW_KEY(pMemRow).ts;
    if (asc ? (memTs >= endKey) : (memTs <= endKey)) {
      pMemRow = NULL;
    }
  }

  if (pIMemIter->hasVal) {
    int64_t imemTs = TSDBROW_KEY(pIMemRow).ts;
    if (asc ? (imemTs >= endKey) : (imemTs <= endKey)) {
      pIMemRow = NULL;
    }
  }

  bool memReady = pMemIter->hasVal && (pMemRow != NULL);
  bool imemReady = pIMemIter->hasVal && (pIMemRow != NULL);

  if (memReady && imemReady) {
    int64_t memTs = TSDBROW_KEY(pMemRow).ts;
    int64_t imemTs = TSDBROW_KEY(pIMemRow).ts;

    if (memTs == imemTs) {
      *freeTSRow = true;
      return doMergeMemIMemRows(pMemRow, pIMemRow, pBlockScanInfo, pReader, pTSRow);
    }

    bool imemFirst = asc ? (imemTs < memTs) : (imemTs > memTs);
    if (imemFirst) {
      return doMergeMemTableMultiRows(pIMemRow, uid, pIMemIter, pDelList, pTSRow, pReader, freeTSRow);
    }
    return doMergeMemTableMultiRows(pMemRow, uid, pMemIter, pDelList, pTSRow, pReader, freeTSRow);
  }

  if (memReady) {
    return doMergeMemTableMultiRows(pMemRow, uid, pMemIter, pDelList, pTSRow, pReader, freeTSRow);
  }

  if (imemReady) {
    return doMergeMemTableMultiRows(pIMemRow, uid, pIMemIter, pDelList, pTSRow, pReader, freeTSRow);
  }

  return TSDB_CODE_SUCCESS;
}

H
Haojun Liao 已提交
3408
/*
 * Append the in-memory row pTSRow of table uid as a new row at the end of pBlock.
 *
 * If the first output column is the primary timestamp column it receives pTSRow->ts. The remaining output columns are
 * matched with the row's schema by column id; the matching walk assumes both sides are sorted by increasing id.
 * Output columns that the schema lacks are filled with NULL.
 *
 * Returns terrno when the schema for the row's version cannot be resolved; nothing is appended in that case.
 */
int32_t doAppendRowFromTSRow(SSDataBlock* pBlock, STsdbReader* pReader, STSRow* pTSRow, uint64_t uid) {
  int32_t numOfRows = pBlock->info.rows;
  int32_t numOfCols = (int32_t)taosArrayGetSize(pBlock->pDataBlock);

  SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo;
  STSchema*           pSchema = doGetSchemaForTSRow(pTSRow->sver, pReader, uid);
  if (pSchema == NULL) {
    // without the schema no column of the row can be decoded
    return terrno;
  }

  SColVal colVal = {0};
  int32_t i = 0, j = 0;

  if (numOfCols > 0) {
    SColumnInfoData* pTsCol = taosArrayGet(pBlock->pDataBlock, 0);
    if (pTsCol->info.colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
      colDataAppend(pTsCol, numOfRows, (const char*)&pTSRow->ts, false);
      i += 1;
    }
  }

  while (i < numOfCols && j < pSchema->numOfCols) {
    SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
    col_id_t         colId = pColInfoData->info.colId;

    if (colId == pSchema->columns[j].colId) {
      tTSRowGetVal(pTSRow, pSchema, j, &colVal);
      doCopyColVal(pColInfoData, numOfRows, i, &colVal, pSupInfo);
      i += 1;
      j += 1;
    } else if (colId < pSchema->columns[j].colId) {
      // requested column is not part of this row's schema
      colDataAppendNULL(pColInfoData, numOfRows);
      i += 1;
    } else {
      // schema column is not requested by the query
      j += 1;
    }
  }

  // set null value since current column does not exist in the "pSchema"
  for (; i < numOfCols; ++i) {
    colDataAppendNULL(taosArrayGet(pBlock->pDataBlock, i), numOfRows);
  }

  pBlock->info.rows += 1;
  return TSDB_CODE_SUCCESS;
}

H
Hongze Cheng 已提交
3452 3453
/*
 * Append row `rowIndex` of the file block pBlockData as a new row at the end of pResBlock.
 *
 * If the first output column is the primary timestamp column it receives aTSKEY[rowIndex]. The other output columns
 * are matched with the block's columns by column id. Output columns without a counterpart in the block are filled
 * with NULL; block columns that are not requested are skipped.
 */
int32_t doAppendRowFromFileBlock(SSDataBlock* pResBlock, STsdbReader* pReader, SBlockData* pBlockData,
                                 int32_t rowIndex) {
  SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo;
  int32_t             dstRow = pResBlock->info.rows;
  int32_t             dst = 0;  // index into the output columns
  int32_t             src = 0;  // index into the file block columns

  SColumnInfoData* pTsCol = taosArrayGet(pResBlock->pDataBlock, dst);
  if (pTsCol->info.colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
    colDataAppendInt64(pTsCol, dstRow, &pBlockData->aTSKEY[rowIndex]);
    dst++;
  }

  SColVal cv = {0};
  int32_t numOfSrcCols = pBlockData->aIdx->size;
  int32_t numOfDstCols = pResBlock->pDataBlock->size;

  while (dst < numOfDstCols && src < numOfSrcCols) {
    SColumnInfoData* pDstCol = TARRAY_GET_ELEM(pResBlock->pDataBlock, dst);
    SColData*        pSrcCol = tBlockDataGetColDataByIdx(pBlockData, src);

    if (pSrcCol->cid < pDstCol->info.colId) {
      // block column is not requested by the query
      src++;
      continue;
    }

    if (pSrcCol->cid == pDstCol->info.colId) {
      tColDataGetValue(pSrcCol, rowIndex, &cv);
      doCopyColVal(pDstCol, dstRow, dst, &cv, pSupInfo);
      src++;
    } else {
      // the specified column does not exist in file block, fill with null data
      colDataAppendNULL(pDstCol, dstRow);
    }

    dst++;
  }

  for (; dst < numOfDstCols; ++dst) {
    colDataAppendNULL(taosArrayGet(pResBlock->pDataBlock, dst), dstRow);
  }

  pResBlock->info.rows += 1;
  return TSDB_CODE_SUCCESS;
}

3500 3501
/*
 * Fill pReader->pResBlock with rows of one table taken from the in-memory buffers.
 *
 * Stops when no row before endKey remains, when both buffer iterators are exhausted, or when the block holds
 * `capacity` rows. Returns the first error reported while producing or appending a row; rows appended before the
 * error stay in the block.
 */
int32_t buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, int64_t endKey, int32_t capacity,
                                  STsdbReader* pReader) {
  SSDataBlock* pBlock = pReader->pResBlock;

  do {
    STSRow* pTSRow = NULL;
    bool    freeTSRow = false;

    int32_t code = tsdbGetNextRowInMem(pBlockScanInfo, pReader, &pTSRow, endKey, &freeTSRow);
    if (code != TSDB_CODE_SUCCESS) {
      if (freeTSRow && pTSRow != NULL) {
        taosMemoryFree(pTSRow);
      }
      return code;
    }

    if (pTSRow == NULL) {
      break;
    }

    code = doAppendRowFromTSRow(pBlock, pReader, pTSRow, pBlockScanInfo->uid);
    if (freeTSRow) {
      taosMemoryFree(pTSRow);
    }

    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    // no data in buffer, return immediately
    if (!(pBlockScanInfo->iter.hasVal || pBlockScanInfo->iiter.hasVal)) {
      break;
    }

    if (pBlock->info.rows >= capacity) {
      break;
    }
  } while (1);

  ASSERT(pBlock->info.rows <= capacity);
  return TSDB_CODE_SUCCESS;
}
H
Hongze Cheng 已提交
3530

3531 3532
// TODO refactor: with createDataBlockScanInfo
/*
 * Replace the set of tables scanned by pReader with the `num` entries of pTableList (an array of STableKeyInfo).
 *
 * The values of status.pTableMap are STableBlockScanInfo pointers; that is how its entries are iterated below. The
 * scan-info objects already referenced by the map are cleared, reset and reused for the new tables, and pointers to
 * them are stored back. Previously whole structs were inserted by value, which the pointer-based iteration then
 * misread. Consequently at most as many tables as the map currently holds can be set.
 *
 * Returns TSDB_CODE_INVALID_PARA if num exceeds that count, or TSDB_CODE_TDB_OUT_OF_MEMORY if the temporary pointer
 * list cannot be built.
 */
int32_t tsdbSetTableList(STsdbReader* pReader, const void* pTableList, int32_t num) {
  ASSERT(pReader != NULL);

  int32_t size = (int32_t)taosHashGetSize(pReader->status.pTableMap);
  if (num > size) {
    // not enough existing scan-info objects to reuse
    return TSDB_CODE_INVALID_PARA;
  }

  SArray* pInfoList = taosArrayInit(size, sizeof(STableBlockScanInfo*));
  if (pInfoList == NULL) {
    return TSDB_CODE_TDB_OUT_OF_MEMORY;
  }

  STableBlockScanInfo** p = NULL;
  while ((p = taosHashIterate(pReader->status.pTableMap, p)) != NULL) {
    clearBlockScanInfo(*p);
    taosArrayPush(pInfoList, p);
  }

  taosHashClear(pReader->status.pTableMap);

  if ((int32_t)taosArrayGetSize(pInfoList) < num) {
    taosArrayDestroy(pInfoList);
    return TSDB_CODE_TDB_OUT_OF_MEMORY;
  }

  const STableKeyInfo* pList = (const STableKeyInfo*)pTableList;
  for (int32_t i = 0; i < num; ++i) {
    STableBlockScanInfo* pInfo = *(STableBlockScanInfo**)taosArrayGet(pInfoList, i);

    // start from a fresh state for the new table, as the previous by-value entry did
    *pInfo = (STableBlockScanInfo){.lastKey = 0, .uid = pList[i].uid};
    taosHashPut(pReader->status.pTableMap, &pInfo->uid, sizeof(uint64_t), &pInfo, sizeof(pInfo));
  }

  taosArrayDestroy(pInfoList);
  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
3551 3552 3553 3554 3555 3556
/* Return the meta index handle of pMeta via metaGetIdx, or NULL when pMeta is NULL. */
void* tsdbGetIdx(SMeta* pMeta) { return (pMeta != NULL) ? metaGetIdx(pMeta) : NULL; }
dengyihao's avatar
dengyihao 已提交
3557

dengyihao's avatar
dengyihao 已提交
3558 3559 3560 3561 3562 3563
/* Return the inverted index handle of pMeta via metaGetIvtIdx, or NULL when pMeta is NULL. */
void* tsdbGetIvtIdx(SMeta* pMeta) { return (pMeta != NULL) ? metaGetIvtIdx(pMeta) : NULL; }
L
Liu Jicong 已提交
3564

H
Hongze Cheng 已提交
3565
/* Upper bound of the data-version range this reader observes. */
uint64_t getReaderMaxVersion(STsdbReader* pReader) {
  const SVersionRange* pRange = &pReader->verRange;
  return pRange->maxVer;
}
3566

3567 3568 3569 3570 3571 3572 3573 3574 3575 3576 3577 3578 3579 3580 3581
/*
 * Prepare pReader for scanning: set up the file-set iterator over the read snapshot and reset the block iterator.
 *
 * When the snapshot has no data files, the reader is switched to in-memory data only (status.loadFromFile = false).
 * Otherwise the first file block is prepared via initForFirstBlockInFile, whose result is returned.
 */
static int32_t doOpenReaderImpl(STsdbReader* pReader) {
  SReaderStatus* pStatus = &pReader->status;

  initFilesetIterator(&pStatus->fileIter, pReader->pReadSnap->fs.aDFileSet, pReader);
  resetDataBlockIterator(&pStatus->blockIter, pReader->order);

  if (pStatus->fileIter.numOfFiles != 0) {
    return initForFirstBlockInFile(pReader, &pStatus->blockIter);
  }

  // no data in files, let's try buffer in memory
  pStatus->loadFromFile = false;
  return TSDB_CODE_SUCCESS;
}

H
refact  
Hongze Cheng 已提交
3582
// ====================================== EXPOSED APIs ======================================
3583 3584
/*
 * Open a reader for numOfTables tables (pTableList is an array of STableKeyInfo) under the condition pCond, and
 * store it in *ppReader.
 *
 * For TIMEWINDOW_RANGE_EXTERNAL, the main reader excludes both window boundaries, and two single-row inner readers
 * are created that scan outside the window. pCond is rewritten temporarily to create them and is handed back with the
 * caller's original window and order.
 *
 * Returns TSDB_CODE_SUCCESS or the first error encountered; errors are logged.
 */
int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, void* pTableList, int32_t numOfTables,
                       STsdbReader** ppReader, const char* idstr) {
  // the caller's values, restored once pCond is no longer needed for creating readers
  STimeWindow window = pCond->twindows;
  int32_t     origOrder = pCond->order;

  if (pCond->type == TIMEWINDOW_RANGE_EXTERNAL) {
    pCond->twindows.skey += 1;
    pCond->twindows.ekey -= 1;
  }

  int32_t code = tsdbReaderCreate(pVnode, pCond, ppReader, 4096, idstr);
  if (code != TSDB_CODE_SUCCESS) {
    goto _err;
  }

  // check for query time window
  STsdbReader* pReader = *ppReader;
  if (isEmptyQueryTimeWindow(&pReader->window) && pCond->type == TIMEWINDOW_RANGE_CONTAINED) {
    tsdbDebug("%p query window not overlaps with the data set, no result returned, %s", pReader, pReader->idStr);
    return TSDB_CODE_SUCCESS;
  }

  if (pCond->type == TIMEWINDOW_RANGE_EXTERNAL) {
    // update the SQueryTableDataCond to create inner reader
    int32_t order = pCond->order;
    if (order == TSDB_ORDER_ASC) {
      pCond->twindows.ekey = window.skey;
      pCond->twindows.skey = INT64_MIN;
      pCond->order = TSDB_ORDER_DESC;
    } else {
      pCond->twindows.skey = window.ekey;
      pCond->twindows.ekey = INT64_MAX;
      pCond->order = TSDB_ORDER_ASC;
    }

    // here we only need one more row, so the capacity is set to be ONE.
    code = tsdbReaderCreate(pVnode, pCond, &pReader->innerReader[0], 1, idstr);
    if (code != TSDB_CODE_SUCCESS) {
      goto _err;
    }

    if (order == TSDB_ORDER_ASC) {
      pCond->twindows.skey = window.ekey;
      pCond->twindows.ekey = INT64_MAX;
    } else {
      // NOTE(review): by symmetry with the ascending case (inner[0] ends at window.skey) this upper bound looks like it
      // should be window.skey rather than window.ekey; confirm the intended descending semantics.
      pCond->twindows.skey = INT64_MIN;
      pCond->twindows.ekey = window.ekey;
    }
    pCond->order = order;

    code = tsdbReaderCreate(pVnode, pCond, &pReader->innerReader[1], 1, idstr);
    if (code != TSDB_CODE_SUCCESS) {
      goto _err;
    }

    // all readers exist now; hand the caller its original time window back
    pCond->twindows = window;
  }

  // NOTE: the endVersion in pCond is the data version not schema version, so pCond->endVersion is not correct here.
  if (pCond->suid != 0) {
    pReader->pSchema = metaGetTbTSchema(pReader->pTsdb->pVnode->pMeta, pReader->suid, -1, 1);
    if (pReader->pSchema == NULL) {
      tsdbError("failed to get table schema, suid:%" PRIu64 ", ver:-1, %s", pReader->suid, pReader->idStr);
    }
  } else if (numOfTables > 0) {
    STableKeyInfo* pKey = pTableList;
    pReader->pSchema = metaGetTbTSchema(pReader->pTsdb->pVnode->pMeta, pKey->uid, -1, 1);
    if (pReader->pSchema == NULL) {
      tsdbError("failed to get table schema, uid:%" PRIu64 ", ver:-1, %s", pKey->uid, pReader->idStr);
    }
  }

  STsdbReader* p = pReader->innerReader[0] != NULL ? pReader->innerReader[0] : pReader;

  pReader->status.pTableMap = createDataBlockScanInfo(p, pTableList, numOfTables);
  if (pReader->status.pTableMap == NULL) {
    tsdbReaderClose(pReader);
    *ppReader = NULL;

    code = TSDB_CODE_TDB_OUT_OF_MEMORY;
    goto _err;
  }

  if (numOfTables > 0) {
    code = tsdbTakeReadSnap(pReader->pTsdb, &pReader->pReadSnap, pReader->idStr);
    if (code != TSDB_CODE_SUCCESS) {
      goto _err;
    }

    if (pReader->type == TIMEWINDOW_RANGE_CONTAINED) {
      code = doOpenReaderImpl(pReader);
      if (code != TSDB_CODE_SUCCESS) {
        goto _err;
      }
    } else {
      STsdbReader* pPrevReader = pReader->innerReader[0];
      STsdbReader* pNextReader = pReader->innerReader[1];

      // we need only one row; the inner readers share the main reader's table map, schemas and snapshot
      pPrevReader->capacity = 1;
      pPrevReader->status.pTableMap = pReader->status.pTableMap;
      pPrevReader->pSchema = pReader->pSchema;
      pPrevReader->pMemSchema = pReader->pMemSchema;
      pPrevReader->pReadSnap = pReader->pReadSnap;

      pNextReader->capacity = 1;
      pNextReader->status.pTableMap = pReader->status.pTableMap;
      pNextReader->pSchema = pReader->pSchema;
      pNextReader->pMemSchema = pReader->pMemSchema;
      pNextReader->pReadSnap = pReader->pReadSnap;

      code = doOpenReaderImpl(pPrevReader);
      if (code != TSDB_CODE_SUCCESS) {
        goto _err;
      }
    }
  }

  tsdbDebug("%p total numOfTable:%d in this query %s", pReader, numOfTables, pReader->idStr);
  return code;

_err:
  // NOTE(review): a reader created before the failure is not released here (only the table-map failure path closes
  // it); confirm the caller's cleanup contract.
  if (pCond->type == TIMEWINDOW_RANGE_EXTERNAL) {
    pCond->twindows = window;
    pCond->order = origOrder;
  }
  tsdbError("failed to create data reader, code:%s %s", tstrerror(code), idstr);
  return code;
}

/*
 * Destroy a reader and every resource it owns. A NULL reader is ignored.
 *
 * The inner readers (innerReader[0] / innerReader[1]) share the table map, the schemas and the read
 * snapshot with this reader. The open path copies those pointers into them. They are therefore
 * detached from each inner reader before it is closed, so they are released exactly once, here.
 */
void tsdbReaderClose(STsdbReader* pReader) {
  if (pReader == NULL) {
    return;
  }

  // check each inner reader on its own instead of assuming innerReader[1] exists whenever
  // innerReader[0] does
  for (int32_t i = 0; i < 2; ++i) {
    STsdbReader* p = pReader->innerReader[i];
    if (p == NULL) {
      continue;
    }

    p->status.pTableMap = NULL;
    p->pReadSnap = NULL;
    p->pSchema = NULL;
    p->pMemSchema = NULL;

    tsdbReaderClose(p);
  }

  SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo;

  taosMemoryFreeClear(pSupInfo->plist);
  taosMemoryFree(pSupInfo->colIds);

  taosArrayDestroy(pSupInfo->pColAgg);
  for (int32_t i = 0; i < blockDataGetNumOfCols(pReader->pResBlock); ++i) {
    if (pSupInfo->buildBuf[i] != NULL) {
      taosMemoryFreeClear(pSupInfo->buildBuf[i]);
    }
  }

  taosMemoryFree(pSupInfo->buildBuf);
  tBlockDataDestroy(&pReader->status.fileBlockData, true);

  cleanupDataBlockIterator(&pReader->status.blockIter);

  // capture the table count for the cost summary before the map is destroyed
  size_t numOfTables = taosHashGetSize(pReader->status.pTableMap);
  destroyAllBlockScanInfo(pReader->status.pTableMap);
  blockDataDestroy(pReader->pResBlock);
  clearBlockScanInfoBuf(&pReader->blockInfoBuf);

  if (pReader->pFileReader != NULL) {
    tsdbDataFReaderClose(&pReader->pFileReader);
  }

  tsdbUntakeReadSnap(pReader->pTsdb, pReader->pReadSnap, pReader->idStr);

  taosMemoryFree(pReader->status.uidCheckInfo.tableUidList);
  SIOCostSummary* pCost = &pReader->cost;

  SFilesetIter* pFilesetIter = &pReader->status.fileIter;
  if (pFilesetIter->pLastBlockReader != NULL) {
    SLastBlockReader* pLReader = pFilesetIter->pLastBlockReader;
    tMergeTreeClose(&pLReader->mergeTree);

    // fold the last-block load statistics into the cost summary before the info is destroyed
    getLastBlockLoadInfo(pLReader->pInfo, &pCost->lastBlockLoad, &pCost->lastBlockLoadTime);

    pLReader->pInfo = destroyLastBlockLoadInfo(pLReader->pInfo);
    taosMemoryFree(pLReader);
  }

  tsdbDebug("%p :io-cost summary: head-file:%" PRIu64 ", head-file time:%.2f ms, SMA:%" PRId64
            " SMA-time:%.2f ms, fileBlocks:%" PRId64
            ", fileBlocks-load-time:%.2f ms, "
            "build in-memory-block-time:%.2f ms, lastBlocks:%" PRId64
            ", lastBlocks-time:%.2f ms, composed-blocks:%" PRId64
            ", composed-blocks-time:%.2fms, STableBlockScanInfo size:%.2f Kb, creatTime:%.2f ms, %s",
            pReader, pCost->headFileLoad, pCost->headFileLoadTime, pCost->smaDataLoad, pCost->smaLoadTime,
            pCost->numOfBlocks, pCost->blockLoadTime, pCost->buildmemBlock, pCost->lastBlockLoad,
            pCost->lastBlockLoadTime, pCost->composedBlocks, pCost->buildComposedBlockTime,
            numOfTables * sizeof(STableBlockScanInfo) / 1000.0, pCost->createScanInfoList, pReader->idStr);

  taosMemoryFree(pReader->idStr);

  // compare the two schema pointers while both are still valid; pMemSchema may alias pSchema
  if (pReader->pMemSchema != pReader->pSchema) {
    taosMemoryFree(pReader->pMemSchema);
  }
  taosMemoryFree(pReader->pSchema);

  taosMemoryFreeClear(pReader);
}

3792
/*
 * Fill pReader->pResBlock with the next batch of rows for this single reader.
 *
 * File blocks are tried first while status.loadFromFile is set. If they yield no rows, the in-memory
 * buffers are scanned instead. Returns true when the result block holds at least one row. Returns
 * false when nothing is left or when building the block from files failed.
 */
static bool doTsdbNextDataBlock(STsdbReader* pReader) {
  SSDataBlock*   pRes = pReader->pResBlock;
  SReaderStatus* pStatus = &pReader->status;

  // discard the rows that belong to the previous data block
  blockDataCleanup(pRes);

  if (pStatus->loadFromFile) {
    if (buildBlockFromFiles(pReader) != TSDB_CODE_SUCCESS) {
      return false;
    }

    if (pRes->info.rows > 0) {
      return true;
    }
  }

  // files produced nothing (or were never in play): fall back to the in-memory buffer
  buildBlockFromBufferSequentially(pReader);
  return pRes->info.rows > 0;
}

3819 3820 3821 3822 3823
/*
 * Advance the reader to the next non-empty result block.
 *
 * Returns true when a block is available. Returns false when the scan is exhausted or an error
 * occurred; on error the error code is stored in terrno. (Returning the int32_t code from this bool
 * function used to turn every failure into "true".)
 *
 * The scan is split into phases tracked by pReader->step:
 *   0                  -> innerReader[0] (the "prev" reader) is scanned first, if present;
 *   EXTERNAL_ROWS_PREV -> the main reader is opened and then scanned (step becomes EXTERNAL_ROWS_MAIN);
 *   EXTERNAL_ROWS_MAIN -> once the main scan is exhausted, innerReader[1] (the "next" reader) is opened
 *                         and scanned, if present (step becomes EXTERNAL_ROWS_NEXT).
 */
bool tsdbNextDataBlock(STsdbReader* pReader) {
  if (isEmptyQueryTimeWindow(&pReader->window)) {
    return false;
  }

  if (pReader->innerReader[0] != NULL && pReader->step == 0) {
    bool ret = doTsdbNextDataBlock(pReader->innerReader[0]);
    pReader->step = EXTERNAL_ROWS_PREV;
    if (ret) {
      return ret;
    }
  }

  if (pReader->step == EXTERNAL_ROWS_PREV) {
    // prepare for the main scan
    int32_t code = doOpenReaderImpl(pReader);
    if (code != TSDB_CODE_SUCCESS) {
      terrno = code;
      return false;
    }

    resetAllDataBlockScanInfo(pReader->status.pTableMap, pReader->innerReader[0]->window.ekey);
    pReader->step = EXTERNAL_ROWS_MAIN;
  }

  bool ret = doTsdbNextDataBlock(pReader);
  if (ret) {
    return ret;
  }

  if (pReader->innerReader[1] != NULL && pReader->step == EXTERNAL_ROWS_MAIN) {
    // prepare for the next row scan
    int32_t code = doOpenReaderImpl(pReader->innerReader[1]);
    if (code != TSDB_CODE_SUCCESS) {
      terrno = code;
      return false;
    }

    resetAllDataBlockScanInfo(pReader->innerReader[1]->status.pTableMap, pReader->window.ekey);

    bool ret1 = doTsdbNextDataBlock(pReader->innerReader[1]);
    pReader->step = EXTERNAL_ROWS_NEXT;
    if (ret1) {
      return ret1;
    }
  }

  return false;
}

3867
/*
 * Report whether the reader tracks scan info for the table with the given uid.
 *
 * taosHashGet() returns NULL for an unknown key. The previous version dereferenced that result
 * before checking it and crashed on an unknown uid.
 */
bool tsdbTableNextDataBlock(STsdbReader* pReader, uint64_t uid) {
  STableBlockScanInfo** ppScanInfo = taosHashGet(pReader->status.pTableMap, &uid, sizeof(uid));
  if (ppScanInfo == NULL || *ppScanInfo == NULL) {  // no data block for the table of given uid
    return false;
  }

  return true;
}

3876
/*
 * Copy the row count, table uid and time window of the reader's current result block into
 * pDataBlockInfo. Both arguments must be non-NULL.
 */
static void setBlockInfo(STsdbReader* pReader, SDataBlockInfo* pDataBlockInfo) {
  ASSERT(pDataBlockInfo != NULL && pReader != NULL);

  SSDataBlock* pRes = pReader->pResBlock;
  pDataBlockInfo->rows = pRes->info.rows;
  pDataBlockInfo->uid = pRes->info.uid;
  pDataBlockInfo->window = pRes->info.window;
}

3883 3884
/*
 * Describe the block produced by the last successful tsdbNextDataBlock() call.
 *
 * For a TIMEWINDOW_RANGE_EXTERNAL reader the block may come from an inner reader, chosen by
 * pReader->step:
 *   - EXTERNAL_ROWS_MAIN: the reader itself;
 *   - EXTERNAL_ROWS_PREV: innerReader[0];
 *   - any other step: innerReader[1].
 * Other reader types always describe their own block.
 */
void tsdbRetrieveDataBlockInfo(STsdbReader* pReader, SDataBlockInfo* pDataBlockInfo) {
  STsdbReader* pSource = pReader;

  if (pReader->type == TIMEWINDOW_RANGE_EXTERNAL) {
    switch (pReader->step) {
      case EXTERNAL_ROWS_MAIN:
        pSource = pReader;
        break;
      case EXTERNAL_ROWS_PREV:
        pSource = pReader->innerReader[0];
        break;
      default:
        pSource = pReader->innerReader[1];
        break;
    }
  }

  setBlockInfo(pSource, pDataBlockInfo);
}

3897
/*
 * Load the block-level SMA (pre-aggregated statistics) of the current file data block.
 *
 * On success *pBlockStatis points at pReader->suppInfo.plist. Entry 0 is the synthesized aggregate
 * of the primary timestamp column. Entry j (j > 0) is the SMA of the j-th requested column, or NULL
 * when that column has no usable SMA for this block. *allHave is true only if every requested column
 * got an entry; otherwise the caller must load the block data itself.
 *
 * *pBlockStatis is NULL (and *allHave false) for external-range readers, for composed blocks and for
 * blocks that carry no SMA. Returns the error code of tsdbReadBlockSma() on failure.
 */
int32_t tsdbRetrieveDatablockSMA(STsdbReader* pReader, SColumnDataAgg*** pBlockStatis, bool* allHave) {
  int32_t code = 0;
  *allHave = false;

  if (pReader->type == TIMEWINDOW_RANGE_EXTERNAL) {
    *pBlockStatis = NULL;
    return TSDB_CODE_SUCCESS;
  }

  // there is no statistics data for composed block
  if (pReader->status.composedDataBlock) {
    *pBlockStatis = NULL;
    return TSDB_CODE_SUCCESS;
  }

  SFileDataBlockInfo* pFBlock = getCurrentBlockInfo(&pReader->status.blockIter);
  SDataBlk*           pBlock = getCurrentBlock(&pReader->status.blockIter);
  int64_t             stime = taosGetTimestampUs();
  SBlockLoadSuppInfo* pSup = &pReader->suppInfo;

  if (!tDataBlkHasSma(pBlock)) {
    *pBlockStatis = NULL;
    return TSDB_CODE_SUCCESS;
  }

  code = tsdbReadBlockSma(pReader->pFileReader, pBlock, pSup->pColAgg);
  if (code != TSDB_CODE_SUCCESS) {
    tsdbDebug("vgId:%d, failed to load block SMA for uid %" PRIu64 ", code:%s, %s",
              TD_VID(pReader->pTsdb->pVnode), pFBlock->uid, tstrerror(code), pReader->idStr);
    return code;
  }

  *allHave = true;

  // always load the first primary timestamp column data
  SColumnDataAgg* pTsAgg = &pSup->tsColAgg;

  pTsAgg->numOfNull = 0;
  pTsAgg->colId = PRIMARYKEY_TIMESTAMP_COL_ID;
  pTsAgg->min = pReader->pResBlock->info.window.skey;
  pTsAgg->max = pReader->pResBlock->info.window.ekey;
  pSup->plist[0] = pTsAgg;

  size_t numOfCols = blockDataGetNumOfCols(pReader->pResBlock);

  // forget the pointers left over from the previous block; they would otherwise be handed out again
  // for columns that have no SMA in this block
  for (size_t k = 1; k < numOfCols; ++k) {
    pSup->plist[k] = NULL;
  }

  // merge-walk the SMA entries and the requested column ids; both are walked in ascending colId order
  size_t  numOfAggs = taosArrayGetSize(pSup->pColAgg);
  int32_t i = 0, j = 0;
  while (j < numOfCols && i < numOfAggs) {
    SColumnDataAgg* pAgg = taosArrayGet(pSup->pColAgg, i);
    if (pAgg->colId == pSup->colIds[j]) {
      // look the column up by id: the SMA array index i does not correspond to a schema column index
      bool smaOn = false;
      for (int32_t k = 0; k < pReader->pSchema->numOfCols; ++k) {
        if (pReader->pSchema->columns[k].colId == pAgg->colId) {
          smaOn = IS_BSMA_ON(&(pReader->pSchema->columns[k]));
          break;
        }
      }

      if (smaOn) {
        pSup->plist[j] = pAgg;
      } else {
        *allHave = false;
      }

      i += 1;
      j += 1;
    } else if (pAgg->colId < pSup->colIds[j]) {
      i += 1;
    } else if (pSup->colIds[j] < pAgg->colId) {
      j += 1;
    }
  }

  // a requested column without an SMA entry cannot be answered from statistics alone
  for (size_t k = 1; k < numOfCols; ++k) {
    if (pSup->plist[k] == NULL) {
      *allHave = false;
      break;
    }
  }

  double elapsed = (taosGetTimestampUs() - stime) / 1000.0;
  pReader->cost.smaLoadTime += elapsed;
  pReader->cost.smaDataLoad += 1;

  *pBlockStatis = pSup->plist;

  tsdbDebug("vgId:%d, succeed to load block SMA for uid %" PRIu64 ", elapsed time:%.2f ms, %s",
            TD_VID(pReader->pTsdb->pVnode), pFBlock->uid, elapsed, pReader->idStr);

  return code;
}

3975
/*
 * Materialize the current block of a single reader into pReader->pResBlock and return its column
 * array.
 *
 * A composed block is already in the result block and is returned as-is. Otherwise the current file
 * block is loaded and copied in. Returns NULL with terrno set when the block's table is not tracked
 * by the reader or the block data cannot be loaded.
 */
static SArray* doRetrieveDataBlock(STsdbReader* pReader) {
  SReaderStatus* pStatus = &pReader->status;

  if (pStatus->composedDataBlock) {
    return pReader->pResBlock->pDataBlock;
  }

  SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(&pStatus->blockIter);

  // taosHashGet() returns NULL for an unknown uid; check before dereferencing
  STableBlockScanInfo** ppScanInfo = taosHashGet(pStatus->pTableMap, &pBlockInfo->uid, sizeof(pBlockInfo->uid));
  if (ppScanInfo == NULL || *ppScanInfo == NULL) {
    terrno = TSDB_CODE_INVALID_PARA;
    tsdbError("failed to locate the uid:%" PRIu64 " in query table uid list, total tables:%d, %s", pBlockInfo->uid,
              taosHashGetSize(pReader->status.pTableMap), pReader->idStr);
    return NULL;
  }

  STableBlockScanInfo* pBlockScanInfo = *ppScanInfo;

  int32_t code = doLoadFileBlockData(pReader, &pStatus->blockIter, &pStatus->fileBlockData, pBlockScanInfo->uid);
  if (code != TSDB_CODE_SUCCESS) {
    tBlockDataDestroy(&pStatus->fileBlockData, 1);
    terrno = code;
    return NULL;
  }

  copyBlockDataToSDataBlock(pReader, pBlockScanInfo);
  return pReader->pResBlock->pDataBlock;
}

4002 4003 4004 4005 4006 4007 4008 4009 4010 4011 4012 4013
/*
 * Return the column data of the block produced by the last tsdbNextDataBlock() call.
 *
 * For a TIMEWINDOW_RANGE_EXTERNAL reader in step EXTERNAL_ROWS_PREV or EXTERNAL_ROWS_NEXT, the data
 * comes from innerReader[0] or innerReader[1] respectively. In every other case it comes from the
 * reader itself. pIdList is not used.
 */
SArray* tsdbRetrieveDataBlock(STsdbReader* pReader, SArray* pIdList) {
  STsdbReader* pSource = pReader;

  if (pReader->type == TIMEWINDOW_RANGE_EXTERNAL) {
    if (pReader->step == EXTERNAL_ROWS_PREV) {
      pSource = pReader->innerReader[0];
    } else if (pReader->step == EXTERNAL_ROWS_NEXT) {
      pSource = pReader->innerReader[1];
    }
  }

  return doRetrieveDataBlock(pSource);
}

H
Haojun Liao 已提交
4014
/*
 * Rewind the reader so it can scan again with the order and time window taken from pCond.
 *
 * The reader is switched to TIMEWINDOW_RANGE_CONTAINED. The file reader is closed, and the fileset
 * iterator, block iterator and per-table scan positions are re-initialized against the existing read
 * snapshot. It does nothing when the current query window is empty or no read snapshot was taken.
 * Returns the error of initForFirstBlockInFile() on failure.
 *
 * NOTE(review): pReader->step is not reset here.
 */
int32_t tsdbReaderReset(STsdbReader* pReader, SQueryTableDataCond* pCond) {
  if (isEmptyQueryTimeWindow(&pReader->window) || pReader->pReadSnap == NULL) {
    return TSDB_CODE_SUCCESS;
  }

  SReaderStatus*      pStatus = &pReader->status;
  SBlockLoadSuppInfo* pSup = &pReader->suppInfo;

  pReader->order = pCond->order;
  pReader->type = TIMEWINDOW_RANGE_CONTAINED;
  pStatus->loadFromFile = true;
  pStatus->pTableIter = NULL;
  pReader->window = updateQueryTimeWindow(pReader->pTsdb, &pCond->twindows);

  // wipe the cached timestamp-column aggregate and the first SMA pointer
  memset(&pSup->tsColAgg, 0, sizeof(SColumnDataAgg));
  memset(pSup->plist, 0, POINTER_BYTES);

  pSup->tsColAgg.colId = PRIMARYKEY_TIMESTAMP_COL_ID;
  tsdbDataFReaderClose(&pReader->pFileReader);

  int32_t numOfTables = taosHashGetSize(pStatus->pTableMap);

  initFilesetIterator(&pStatus->fileIter, pReader->pReadSnap->fs.aDFileSet, pReader);
  resetDataBlockIterator(&pStatus->blockIter, pReader->order);

  // position every table just outside the window on the side the scan starts from
  int64_t startKey = ASCENDING_TRAVERSE(pReader->order) ? pReader->window.skey - 1 : pReader->window.ekey + 1;
  resetAllDataBlockScanInfo(pStatus->pTableMap, startKey);

  int32_t code = TSDB_CODE_SUCCESS;

  if (pStatus->fileIter.numOfFiles == 0) {
    // no data in files, let's try buffer in memory
    pStatus->loadFromFile = false;
  } else {
    code = initForFirstBlockInFile(pReader, &pStatus->blockIter);
    if (code != TSDB_CODE_SUCCESS) {
      tsdbError("%p reset reader failed, numOfTables:%d, query range:%" PRId64 " - %" PRId64 " in query %s", pReader,
                numOfTables, pReader->window.skey, pReader->window.ekey, pReader->idStr);
      return code;
    }
  }

  tsdbDebug("%p reset reader, suid:%" PRIu64 ", numOfTables:%d, skey:%" PRId64 ", query range:%" PRId64 " - %" PRId64
            " in query %s",
            pReader, pReader->suid, numOfTables, pCond->twindows.skey, pReader->window.skey, pReader->window.ekey,
            pReader->idStr);

  return code;
}
H
Hongze Cheng 已提交
4062

4063 4064 4065
/*
 * Map a block's row count onto a histogram bucket.
 *
 * Buckets start at startRow and are bucketRange rows wide. Row counts at or below startRow map to
 * bucket 0. A non-positive bucketRange also yields 0 instead of dividing by zero. The result is not
 * capped from above, so callers must clamp it to the size of their histogram.
 */
static int32_t getBucketIndex(int32_t startRow, int32_t bucketRange, int32_t numOfRows) {
  if (bucketRange <= 0 || numOfRows <= startRow) {
    return 0;
  }

  return (numOfRows - startRow) / bucketRange;
}
H
Hongze Cheng 已提交
4066

4067 4068 4069 4070
/*
 * Walk every file data block visible to the reader and accumulate block-distribution statistics into
 * pTableBlockInfo:
 *   - file/block/table counts;
 *   - total, min and max rows per block;
 *   - the number of small blocks;
 *   - a histogram of rows per block over [minRows, maxRows].
 *
 * Returns the first error from initForFirstBlockInFile(), otherwise TSDB_CODE_SUCCESS.
 */
int32_t tsdbGetFileBlocksDistInfo(STsdbReader* pReader, STableBlockDistInfo* pTableBlockInfo) {
  int32_t code = TSDB_CODE_SUCCESS;
  pTableBlockInfo->totalSize = 0;
  pTableBlockInfo->totalRows = 0;

  // find the start data block in file
  SReaderStatus* pStatus = &pReader->status;

  STsdbCfg* pc = &pReader->pTsdb->pVnode->config.tsdbCfg;
  pTableBlockInfo->defMinRows = pc->minRows;
  pTableBlockInfo->defMaxRows = pc->maxRows;

  // The histogram splits [minRows, maxRows] into buckets of equal width. Keep the width at least one
  // row so that minRows == maxRows cannot lead to a division by zero.
  const int32_t numOfBuckets =
      (int32_t)(sizeof(pTableBlockInfo->blockRowsHisto) / sizeof(pTableBlockInfo->blockRowsHisto[0]));
  int32_t bucketRange = ceil((pc->maxRows - pc->minRows) / 20.0);
  if (bucketRange <= 0) {
    bucketRange = 1;
  }

  pTableBlockInfo->numOfFiles += 1;

  int32_t numOfTables = (int32_t)taosHashGetSize(pStatus->pTableMap);
  int     defaultRows = 4096;  // blocks with fewer rows than this are counted as small blocks

  SDataBlockIter* pBlockIter = &pStatus->blockIter;
  pTableBlockInfo->numOfFiles += pStatus->fileIter.numOfFiles;

  if (pBlockIter->numOfBlocks > 0) {
    pTableBlockInfo->numOfBlocks += pBlockIter->numOfBlocks;
  }

  pTableBlockInfo->numOfTables = numOfTables;
  bool hasNext = (pBlockIter->numOfBlocks > 0);

  while (true) {
    if (hasNext) {
      SDataBlk* pBlock = getCurrentBlock(pBlockIter);

      int32_t numOfRows = pBlock->nRow;
      pTableBlockInfo->totalRows += numOfRows;

      if (numOfRows > pTableBlockInfo->maxRows) {
        pTableBlockInfo->maxRows = numOfRows;
      }

      if (numOfRows < pTableBlockInfo->minRows) {
        pTableBlockInfo->minRows = numOfRows;
      }

      if (numOfRows < defaultRows) {
        pTableBlockInfo->numOfSmallBlocks += 1;
      }

      // Blocks smaller than minRows, or with exactly maxRows when (maxRows - minRows) is divisible by
      // the bucket count, would otherwise index outside the histogram.
      int32_t bucketIndex = getBucketIndex(pTableBlockInfo->defMinRows, bucketRange, numOfRows);
      if (bucketIndex < 0) {
        bucketIndex = 0;
      } else if (bucketIndex >= numOfBuckets) {
        bucketIndex = numOfBuckets - 1;
      }
      pTableBlockInfo->blockRowsHisto[bucketIndex]++;

      hasNext = blockIteratorNext(&pStatus->blockIter, pReader->idStr);
    } else {
      // current file exhausted: move on to the first block of the next file, if any
      code = initForFirstBlockInFile(pReader, pBlockIter);
      if ((code != TSDB_CODE_SUCCESS) || (pReader->status.loadFromFile == false)) {
        break;
      }

      pTableBlockInfo->numOfBlocks += pBlockIter->numOfBlocks;
      hasNext = (pBlockIter->numOfBlocks > 0);
    }
  }

  return code;
}
H
Hongze Cheng 已提交
4135

H
refact  
Hongze Cheng 已提交
4136
/*
 * Count the rows that the reader's tables hold in the memtable and the immutable memtable captured
 * by the reader's read snapshot.
 *
 * The snapshot's own pMem/pIMem decide what is counted. The previous version gated on the live
 * pTsdb->mem/imem, which can differ from the snapshot and then either skipped rows or passed a NULL
 * memtable on. The iteration uses status.pTableIter, which is NULL again when this returns.
 */
int64_t tsdbGetNumOfRowsInMemTable(STsdbReader* pReader) {
  int64_t        rows = 0;
  SReaderStatus* pStatus = &pReader->status;
  STsdbReadSnap* pSnap = pReader->pReadSnap;

  pStatus->pTableIter = taosHashIterate(pStatus->pTableMap, NULL);

  while (pStatus->pTableIter != NULL) {
    STableBlockScanInfo* pBlockScanInfo = *(STableBlockScanInfo**)pStatus->pTableIter;

    if (pSnap != NULL && pSnap->pMem != NULL) {
      STbData* d = tsdbGetTbDataFromMemTable(pSnap->pMem, pReader->suid, pBlockScanInfo->uid);
      if (d != NULL) {
        rows += tsdbGetNRowsInTbData(d);
      }
    }

    if (pSnap != NULL && pSnap->pIMem != NULL) {
      STbData* di = tsdbGetTbDataFromMemTable(pSnap->pIMem, pReader->suid, pBlockScanInfo->uid);
      if (di != NULL) {
        rows += tsdbGetNRowsInTbData(di);
      }
    }

    // current table is exhausted, let's try the next table
    pStatus->pTableIter = taosHashIterate(pStatus->pTableMap, pStatus->pTableIter);
  }

  return rows;
}
D
dapan1121 已提交
4167

L
Liu Jicong 已提交
4168
/*
 * Fetch the current schema of table `uid` into *pSchema and its super table uid into *suid.
 *
 * *suid is 0 for a normal table. For a child table, the schema version is taken from its super
 * table's entry. Returns TSDB_CODE_TDB_INVALID_TABLE_ID (also stored in terrno) when either meta
 * entry cannot be read or no schema is produced.
 *
 * NOTE(review): assumes metaGetTbTSchema() returns NULL when it cannot build the schema.
 */
int32_t tsdbGetTableSchema(SVnode* pVnode, int64_t uid, STSchema** pSchema, int64_t* suid) {
  int32_t sversion = 1;

  SMetaReader mr = {0};
  metaReaderInit(&mr, pVnode->pMeta, 0);
  int32_t code = metaGetTableEntryByUid(&mr, uid);
  if (code != TSDB_CODE_SUCCESS) {
    terrno = TSDB_CODE_TDB_INVALID_TABLE_ID;
    metaReaderClear(&mr);
    return terrno;
  }

  *suid = 0;

  if (mr.me.type == TSDB_CHILD_TABLE) {
    // re-use the meta reader to look up the super table, which carries the schema version
    tDecoderClear(&mr.coder);
    *suid = mr.me.ctbEntry.suid;
    code = metaGetTableEntryByUid(&mr, *suid);
    if (code != TSDB_CODE_SUCCESS) {
      terrno = TSDB_CODE_TDB_INVALID_TABLE_ID;
      metaReaderClear(&mr);
      return terrno;
    }
    sversion = mr.me.stbEntry.schemaRow.version;
  } else {
    ASSERT(mr.me.type == TSDB_NORMAL_TABLE);
    sversion = mr.me.ntbEntry.schemaRow.version;
  }

  metaReaderClear(&mr);

  *pSchema = metaGetTbTSchema(pVnode->pMeta, uid, sversion, 1);
  if (*pSchema == NULL) {
    // do not report success with a NULL schema; callers use it right away
    terrno = TSDB_CODE_TDB_INVALID_TABLE_ID;
    return terrno;
  }

  return TSDB_CODE_SUCCESS;
}
H
Hongze Cheng 已提交
4202

H
Haojun Liao 已提交
4203
/*
 * Take a consistent read snapshot of the tsdb.
 *
 * Under the tsdb read lock, the snapshot records the current memtable and immutable memtable
 * (taking a reference on each present one) and takes a reference on the file set.
 *
 * On success *ppSnap owns those references and must be released with tsdbUntakeReadSnap(). On
 * failure every reference taken so far is dropped, the allocation is freed and *ppSnap is set to
 * NULL; the previous version leaked all of these and left a half-built snapshot in *ppSnap.
 */
int32_t tsdbTakeReadSnap(STsdb* pTsdb, STsdbReadSnap** ppSnap, const char* idStr) {
  int32_t        code = 0;
  bool           fsRefed = false;
  STsdbReadSnap* pSnap = (STsdbReadSnap*)taosMemoryCalloc(1, sizeof(STsdbReadSnap));

  if (pSnap == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _exit;
  }

  // lock
  code = taosThreadRwlockRdlock(&pTsdb->rwLock);
  if (code) {
    code = TAOS_SYSTEM_ERROR(code);
    goto _exit;
  }

  // take snapshot
  pSnap->pMem = pTsdb->mem;
  pSnap->pIMem = pTsdb->imem;

  if (pSnap->pMem) {
    tsdbRefMemTable(pSnap->pMem);
  }

  if (pSnap->pIMem) {
    tsdbRefMemTable(pSnap->pIMem);
  }

  // fs
  code = tsdbFSRef(pTsdb, &pSnap->fs);
  if (code) {
    taosThreadRwlockUnlock(&pTsdb->rwLock);
    goto _exit;
  }
  fsRefed = true;

  // unlock
  code = taosThreadRwlockUnlock(&pTsdb->rwLock);
  if (code) {
    code = TAOS_SYSTEM_ERROR(code);
    goto _exit;
  }

  tsdbTrace("vgId:%d, take read snapshot, %s", TD_VID(pTsdb->pVnode), idStr);

_exit:
  if (code != 0 && pSnap != NULL) {
    // roll back whatever was acquired before the failure
    if (pSnap->pMem) {
      tsdbUnrefMemTable(pSnap->pMem);
    }
    if (pSnap->pIMem) {
      tsdbUnrefMemTable(pSnap->pIMem);
    }
    if (fsRefed) {
      tsdbFSUnref(pTsdb, &pSnap->fs);
    }
    taosMemoryFree(pSnap);
    pSnap = NULL;
  }

  *ppSnap = pSnap;
  return code;
}

H
Haojun Liao 已提交
4251
/*
 * Release a snapshot obtained from tsdbTakeReadSnap().
 *
 * Drops the references held on the memtable and the immutable memtable (whichever are present) and
 * on the file set, then frees the snapshot. A NULL snapshot is accepted. The trace line is written
 * either way.
 */
void tsdbUntakeReadSnap(STsdb* pTsdb, STsdbReadSnap* pSnap, const char* idStr) {
  if (pSnap != NULL) {
    if (pSnap->pMem != NULL) {
      tsdbUnrefMemTable(pSnap->pMem);
    }
    if (pSnap->pIMem != NULL) {
      tsdbUnrefMemTable(pSnap->pIMem);
    }

    tsdbFSUnref(pTsdb, &pSnap->fs);
    taosMemoryFree(pSnap);
  }

  tsdbTrace("vgId:%d, untake read snapshot, %s", TD_VID(pTsdb->pVnode), idStr);
}