tsdbRead.c 146.3 KB
Newer Older
H
hjxilinx 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

H
Haojun Liao 已提交
16
#include "osDef.h"
H
Hongze Cheng 已提交
17
#include "tsdb.h"
18

H
Hongze Cheng 已提交
19
// True when the scan order `o` is ascending. The argument is parenthesized so that an expression
// argument such as ASCENDING_TRAVERSE(a | b) is compared as a whole.
#define ASCENDING_TRAVERSE(o) ((o) == TSDB_ORDER_ASC)
H
Hongze Cheng 已提交
20

21 22 23 24 25 26
// Content selector values. NOTE(review): the names suggest previous / main / next external rows.
// EXTERNAL_ROWS_NEXT (3) equals PREV | MAIN, so these values must not be combined as bit flags.
typedef enum {
  EXTERNAL_ROWS_PREV = 1,
  EXTERNAL_ROWS_MAIN = 2,
  EXTERNAL_ROWS_NEXT = 3,
} EContentData;

27
typedef struct {
dengyihao's avatar
dengyihao 已提交
28
  STbDataIter* iter;
29 30 31 32
  int32_t      index;
  bool         hasVal;
} SIterInfo;

33 34
typedef struct {
  int32_t numOfBlocks;
35
  int32_t numOfLastFiles;
36 37
} SBlockNumber;

38
typedef struct SBlockIndex {
39 40
  int32_t     ordinalIndex;
  int64_t     inFileOffset;
H
Haojun Liao 已提交
41
  STimeWindow window;  // todo replace it with overlap flag.
42 43
} SBlockIndex;

H
Haojun Liao 已提交
44
typedef struct STableBlockScanInfo {
dengyihao's avatar
dengyihao 已提交
45 46
  uint64_t  uid;
  TSKEY     lastKey;
H
Hongze Cheng 已提交
47
  SMapData  mapData;            // block info (compressed)
48
  SArray*   pBlockList;         // block data index list, SArray<SBlockIndex>
H
Hongze Cheng 已提交
49 50 51 52 53 54
  SIterInfo iter;               // mem buffer skip list iterator
  SIterInfo iiter;              // imem buffer skip list iterator
  SArray*   delSkyline;         // delete info for this table
  int32_t   fileDelIndex;       // file block delete index
  int32_t   lastBlockDelIndex;  // delete index for last block
  bool      iterInit;           // whether to initialize the in-memory skip list iterator or not
H
Haojun Liao 已提交
55 56 57
} STableBlockScanInfo;

typedef struct SBlockOrderWrapper {
dengyihao's avatar
dengyihao 已提交
58
  int64_t uid;
59
  int64_t offset;
H
Haojun Liao 已提交
60
} SBlockOrderWrapper;
H
Hongze Cheng 已提交
61 62

typedef struct SBlockOrderSupporter {
63 64 65 66
  SBlockOrderWrapper** pDataBlockInfo;
  int32_t*             indexPerTable;
  int32_t*             numOfBlocksPerTable;
  int32_t              numOfTables;
H
Hongze Cheng 已提交
67 68 69
} SBlockOrderSupporter;

typedef struct SIOCostSummary {
70 71 72
  int64_t numOfBlocks;
  double  blockLoadTime;
  double  buildmemBlock;
73
  int64_t headFileLoad;
74
  double  headFileLoadTime;
75
  int64_t smaDataLoad;
76
  double  smaLoadTime;
77 78
  int64_t lastBlockLoad;
  double  lastBlockLoadTime;
H
Haojun Liao 已提交
79 80
  int64_t composedBlocks;
  double  buildComposedBlockTime;
H
Haojun Liao 已提交
81
  double  createScanInfoList;
H
Hongze Cheng 已提交
82 83 84
} SIOCostSummary;

typedef struct SBlockLoadSuppInfo {
85
  SArray*          pColAgg;
86
  SColumnDataAgg   tsColAgg;
C
Cary Xu 已提交
87
  SColumnDataAgg** plist;
88
  int16_t*         colIds;  // column ids for loading file block data
89
  int32_t          numOfCols;
90
  char**           buildBuf;  // build string tmp buffer, todo remove it later after all string format being updated.
91
  bool             smaValid;  // the sma on all queried columns are activated
H
Hongze Cheng 已提交
92 93
} SBlockLoadSuppInfo;

94
typedef struct SLastBlockReader {
H
Hongze Cheng 已提交
95 96 97 98 99
  STimeWindow        window;
  SVersionRange      verRange;
  int32_t            order;
  uint64_t           uid;
  SMergeTree         mergeTree;
100
  SSttBlockLoadInfo* pInfo;
101 102
} SLastBlockReader;

103
typedef struct SFilesetIter {
H
Hongze Cheng 已提交
104 105 106
  int32_t           numOfFiles;  // number of total files
  int32_t           index;       // current accessed index in the list
  SArray*           pFileList;   // data file list
107
  int32_t           order;
H
Hongze Cheng 已提交
108
  SLastBlockReader* pLastBlockReader;  // last file block reader
109
} SFilesetIter;
H
Haojun Liao 已提交
110 111

typedef struct SFileDataBlockInfo {
112
  // index position in STableBlockScanInfo in order to check whether neighbor block overlaps with it
dengyihao's avatar
dengyihao 已提交
113
  uint64_t uid;
114
  int32_t  tbBlockIdx;
H
Haojun Liao 已提交
115 116 117
} SFileDataBlockInfo;

typedef struct SDataBlockIter {
118
  int32_t   numOfBlocks;
119
  int32_t   index;
H
Hongze Cheng 已提交
120
  SArray*   blockList;  // SArray<SFileDataBlockInfo>
121
  int32_t   order;
122
  SDataBlk  block;  // current SDataBlk data
123
  SHashObj* pTableMap;
H
Haojun Liao 已提交
124 125 126
} SDataBlockIter;

typedef struct SFileBlockDumpInfo {
dengyihao's avatar
dengyihao 已提交
127 128 129 130
  int32_t totalRows;
  int32_t rowIndex;
  int64_t lastKey;
  bool    allDumped;
H
Haojun Liao 已提交
131 132
} SFileBlockDumpInfo;

133
typedef struct SUidOrderCheckInfo {
134 135
  uint64_t* tableUidList;  // access table uid list in uid ascending order list
  int32_t   currentIndex;  // index in table uid list
136 137
} SUidOrderCheckInfo;

H
Haojun Liao 已提交
138
typedef struct SReaderStatus {
H
Hongze Cheng 已提交
139 140 141
  bool                  loadFromFile;       // check file stage
  bool                  composedDataBlock;  // the returned data block is a composed block or not
  SHashObj*             pTableMap;          // SHash<STableBlockScanInfo>
142
  STableBlockScanInfo** pTableIter;         // table iterator used in building in-memory buffer data blocks.
H
Hongze Cheng 已提交
143 144 145 146 147 148
  SUidOrderCheckInfo    uidCheckInfo;       // check all table in uid order
  SFileBlockDumpInfo    fBlockDumpInfo;
  SDFileSet*            pCurrentFileset;  // current opened file set
  SBlockData            fileBlockData;
  SFilesetIter          fileIter;
  SDataBlockIter        blockIter;
H
Haojun Liao 已提交
149 150
} SReaderStatus;

151
typedef struct SBlockInfoBuf {
H
Hongze Cheng 已提交
152 153 154
  int32_t currentIndex;
  SArray* pData;
  int32_t numPerBucket;
155 156
} SBlockInfoBuf;

H
Hongze Cheng 已提交
157
struct STsdbReader {
H
Haojun Liao 已提交
158 159 160 161 162 163 164
  STsdb*             pTsdb;
  uint64_t           suid;
  int16_t            order;
  STimeWindow        window;  // the primary query time window that applies to all queries
  SSDataBlock*       pResBlock;
  int32_t            capacity;
  SReaderStatus      status;
165 166
  char*              idStr;  // query info handle, for debug purpose
  int32_t            type;   // query type: 1. retrieve all data blocks, 2. retrieve direct prev|next rows
H
Hongze Cheng 已提交
167
  SBlockLoadSuppInfo suppInfo;
H
Hongze Cheng 已提交
168
  STsdbReadSnap*     pReadSnap;
169
  SIOCostSummary     cost;
170 171
  STSchema*          pSchema;     // the newest version schema
  STSchema*          pMemSchema;  // the previous schema for in-memory data, to avoid load schema too many times
172 173
  SDataFReader*      pFileReader;
  SVersionRange      verRange;
174 175 176
  SBlockInfoBuf      blockInfoBuf;
  int32_t            step;
  STsdbReader*       innerReader[2];
H
Hongze Cheng 已提交
177
};
H
Hongze Cheng 已提交
178

H
Haojun Liao 已提交
179
static SFileDataBlockInfo* getCurrentBlockInfo(SDataBlockIter* pBlockIter);
180 181
static int      buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, int64_t endKey, int32_t capacity,
                                          STsdbReader* pReader);
182
static TSDBROW* getValidMemRow(SIterInfo* pIter, const SArray* pDelList, STsdbReader* pReader);
183 184
static int32_t  doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pScanInfo, STsdbReader* pReader,
                                        SRowMerger* pMerger);
H
Hongze Cheng 已提交
185
static int32_t  doMergeRowsInLastBlock(SLastBlockReader* pLastBlockReader, STableBlockScanInfo* pScanInfo, int64_t ts,
186
                                       SRowMerger* pMerger, SVersionRange* pVerRange);
187
static int32_t  doMergeRowsInBuf(SIterInfo* pIter, uint64_t uid, int64_t ts, SArray* pDelList, SRowMerger* pMerger,
dengyihao's avatar
dengyihao 已提交
188
                                 STsdbReader* pReader);
H
Hongze Cheng 已提交
189 190
static int32_t  doAppendRowFromTSRow(SSDataBlock* pBlock, STsdbReader* pReader, STSRow* pTSRow,
                                     STableBlockScanInfo* pInfo);
191
static int32_t  doAppendRowFromFileBlock(SSDataBlock* pResBlock, STsdbReader* pReader, SBlockData* pBlockData,
H
Hongze Cheng 已提交
192
                                         int32_t rowIndex);
193
static void     setComposedBlockFlag(STsdbReader* pReader, bool composed);
H
Hongze Cheng 已提交
194 195
static bool     hasBeenDropped(const SArray* pDelList, int32_t* index, TSDBKEY* pKey, int32_t order,
                               SVersionRange* pVerRange);
196

H
Hongze Cheng 已提交
197 198 199 200
static int32_t doMergeMemTableMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter, SArray* pDelList,
                                        STSRow** pTSRow, STsdbReader* pReader, bool* freeTSRow);
static int32_t doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo* pBlockScanInfo,
                                  STsdbReader* pReader, STSRow** pTSRow);
201 202
static int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pBlockScanInfo, int64_t key,
                                     STsdbReader* pReader);
203

dengyihao's avatar
dengyihao 已提交
204 205 206 207
static int32_t initDelSkylineIterator(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader, STbData* pMemTbData,
                                      STbData* piMemTbData);
static STsdb*  getTsdbByRetentions(SVnode* pVnode, TSKEY winSKey, SRetention* retentions, const char* idstr,
                                   int8_t* pLevel);
208
static SVersionRange getQueryVerRange(SVnode* pVnode, SQueryTableDataCond* pCond, int8_t level);
H
Hongze Cheng 已提交
209 210 211
static int64_t       getCurrentKeyInLastBlock(SLastBlockReader* pLastBlockReader);
static bool          hasDataInLastBlock(SLastBlockReader* pLastBlockReader);
static int32_t       doBuildDataBlock(STsdbReader* pReader);
H
Haojun Liao 已提交
212
static TSDBKEY       getCurrentKeyInBuf(STableBlockScanInfo* pScanInfo, STsdbReader* pReader);
213
static bool          hasDataInFileBlock(const SBlockData* pBlockData, const SFileBlockDumpInfo* pDumpInfo);
H
Haojun Liao 已提交
214

215 216
// Returns true when `ts` lies outside the closed interval [pWindow->skey, pWindow->ekey].
static bool outOfTimeWindow(int64_t ts, STimeWindow* pWindow) {
  bool beforeStart = ts < pWindow->skey;
  bool afterEnd = ts > pWindow->ekey;
  return afterEnd || beforeStart;
}

217
// Prepares the per-column load info for the result block `pBlock`:
//  - colIds[i]   : column id of the i-th result column (used when loading file block data)
//  - buildBuf[i] : scratch buffer of `info.bytes` size for var-length columns, NULL otherwise
// smaValid starts as true; updateBlockSMAInfo() may clear it later.
//
// Returns TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY after releasing everything allocated here.
// On failure colIds/buildBuf are reset to NULL and numOfCols to 0, so no dangling pointers remain.
static int32_t setColumnIdSlotList(SBlockLoadSuppInfo* pSupInfo, SSDataBlock* pBlock) {
  int32_t numOfCols = (int32_t)blockDataGetNumOfCols(pBlock);

  pSupInfo->smaValid = true;
  pSupInfo->numOfCols = numOfCols;
  pSupInfo->colIds = taosMemoryMalloc(numOfCols * sizeof(int16_t));
  pSupInfo->buildBuf = taosMemoryCalloc(numOfCols, POINTER_BYTES);
  if (pSupInfo->buildBuf == NULL || pSupInfo->colIds == NULL) {
    goto _oom;
  }

  for (int32_t i = 0; i < numOfCols; ++i) {
    SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, i);
    pSupInfo->colIds[i] = pCol->info.colId;

    if (IS_VAR_DATA_TYPE(pCol->info.type)) {
      pSupInfo->buildBuf[i] = taosMemoryMalloc(pCol->info.bytes);
      if (pSupInfo->buildBuf[i] == NULL) {
        goto _oom;
      }
    }
  }

  return TSDB_CODE_SUCCESS;

_oom:
  // buildBuf came from calloc, so entries that were never allocated are NULL
  if (pSupInfo->buildBuf != NULL) {
    for (int32_t i = 0; i < numOfCols; ++i) {
      taosMemoryFree(pSupInfo->buildBuf[i]);
    }
  }
  taosMemoryFree(pSupInfo->buildBuf);
  taosMemoryFree(pSupInfo->colIds);
  pSupInfo->buildBuf = NULL;
  pSupInfo->colIds = NULL;
  pSupInfo->numOfCols = 0;
  return TSDB_CODE_OUT_OF_MEMORY;
}
H
Hongze Cheng 已提交
241

242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263
// Clears pSupInfo->smaValid when block SMA cannot be used for every queried column.
// Walks the schema columns and pSupInfo->colIds in lock-step. The walk is only correct if both are
// sorted by ascending column id and every queried column appears in the schema.
static void updateBlockSMAInfo(STSchema* pSchema, SBlockLoadSuppInfo* pSupInfo) {
  int32_t i = 0, j = 0;

  while (i < pSchema->numOfCols && j < pSupInfo->numOfCols) {
    STColumn* pTCol = &pSchema->columns[i];
    if (pTCol->colId == pSupInfo->colIds[j]) {
      if (!IS_BSMA_ON(pTCol)) {
        pSupInfo->smaValid = false;
        return;
      }

      i += 1;
      j += 1;
    } else if (pTCol->colId < pSupInfo->colIds[j]) {
      // schema column that is not queried, skip it
      i += 1;
    } else {
      // The queried column id is missing from the schema. Neither index would advance here, so if
      // ASSERT is a no-op (release build) the loop would never terminate. Without schema info the
      // column's SMA cannot be trusted: disable SMA and stop.
      ASSERT(0);
      pSupInfo->smaValid = false;
      return;
    }
  }
}

264
// Appends enough zero-filled buckets to pBuf->pData to hold `numOfTables` STableBlockScanInfo
// entries: full buckets of pBuf->numPerBucket entries plus one partial bucket for the remainder.
// Returns TSDB_CODE_OUT_OF_MEMORY on any allocation failure. Buckets pushed before the failure stay
// owned by pBuf->pData and are released by clearBlockScanInfoBuf().
static int32_t initBlockScanInfoBuf(SBlockInfoBuf* pBuf, int32_t numOfTables) {
  int32_t num = numOfTables / pBuf->numPerBucket;
  int32_t remainder = numOfTables % pBuf->numPerBucket;
  if (pBuf->pData == NULL) {
    pBuf->pData = taosArrayInit(num + 1, POINTER_BYTES);
    if (pBuf->pData == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }
  }

  int32_t numOfBuckets = num + ((remainder > 0) ? 1 : 0);
  for (int32_t i = 0; i < numOfBuckets; ++i) {
    int32_t entries = (i < num) ? pBuf->numPerBucket : remainder;
    char*   p = taosMemoryCalloc(entries, sizeof(STableBlockScanInfo));
    if (p == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }

    if (taosArrayPush(pBuf->pData, &p) == NULL) {
      taosMemoryFree(p);  // not owned by pData yet
      return TSDB_CODE_OUT_OF_MEMORY;
    }
  }

  return TSDB_CODE_SUCCESS;
}

// Frees every bucket stored in pBuf->pData and then the bucket array itself. pData is reset to NULL,
// so no dangling pointer is left and a later initBlockScanInfoBuf() allocates a fresh array.
static void clearBlockScanInfoBuf(SBlockInfoBuf* pBuf) {
  size_t num = taosArrayGetSize(pBuf->pData);
  for (size_t i = 0; i < num; ++i) {
    char** p = taosArrayGet(pBuf->pData, i);
    taosMemoryFree(*p);
  }

  pBuf->pData = taosArrayDestroy(pBuf->pData);
}

// Returns the address of the index-th STableBlockScanInfo slot. The bucket is index / numPerBucket
// and the slot within it is index % numPerBucket. No bounds checking is performed.
static void* getPosInBlockInfoBuf(SBlockInfoBuf* pBuf, int32_t index) {
  int32_t perBucket = pBuf->numPerBucket;
  char*   bucketBase = *(char**)taosArrayGet(pBuf->pData, index / perBucket);
  size_t  slot = (size_t)(index % perBucket);
  return bucketBase + slot * sizeof(STableBlockScanInfo);
}

// NOTE: speedup the whole processing by preparing the buffer for STableBlockScanInfo in batch model
H
Haojun Liao 已提交
308
static SHashObj* createDataBlockScanInfo(STsdbReader* pTsdbReader, SBlockInfoBuf* pBuf, const STableKeyInfo* idList, int32_t numOfTables) {
H
Haojun Liao 已提交
309
  // allocate buffer in order to load data blocks from file
310
  // todo use simple hash instead, optimize the memory consumption
311 312 313
  SHashObj* pTableMap =
      taosHashInit(numOfTables, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
  if (pTableMap == NULL) {
H
Haojun Liao 已提交
314 315 316
    return NULL;
  }

H
Haojun Liao 已提交
317
  int64_t st = taosGetTimestampUs();
H
Haojun Liao 已提交
318
  initBlockScanInfoBuf(pBuf, numOfTables);
H
Haojun Liao 已提交
319

320
  for (int32_t j = 0; j < numOfTables; ++j) {
H
Haojun Liao 已提交
321
    STableBlockScanInfo* pScanInfo = getPosInBlockInfoBuf(pBuf, j);
322 323 324 325 326 327 328 329 330 331
    pScanInfo->uid = idList[j].uid;
    if (ASCENDING_TRAVERSE(pTsdbReader->order)) {
      int64_t skey = pTsdbReader->window.skey;
      pScanInfo->lastKey = (skey > INT64_MIN) ? (skey - 1) : skey;
    } else {
      int64_t ekey = pTsdbReader->window.ekey;
      pScanInfo->lastKey = (ekey < INT64_MAX) ? (ekey + 1) : ekey;
    }

    taosHashPut(pTableMap, &pScanInfo->uid, sizeof(uint64_t), &pScanInfo, POINTER_BYTES);
H
Hongze Cheng 已提交
332 333
    tsdbTrace("%p check table uid:%" PRId64 " from lastKey:%" PRId64 " %s", pTsdbReader, pScanInfo->uid,
              pScanInfo->lastKey, pTsdbReader->idStr);
H
Haojun Liao 已提交
334 335
  }

H
Haojun Liao 已提交
336 337 338 339
  pTsdbReader->cost.createScanInfoList = (taosGetTimestampUs() - st) / 1000.0;
  tsdbDebug("%p create %d tables scan-info, size:%.2f Kb, elapsed time:%.2f ms, %s", pTsdbReader, numOfTables,
            (sizeof(STableBlockScanInfo) * numOfTables) / 1024.0, pTsdbReader->cost.createScanInfoList,
            pTsdbReader->idStr);
340

341
  return pTableMap;
H
Hongze Cheng 已提交
342
}
H
Hongze Cheng 已提交
343

344 345
// Resets the in-memory scan state of every table in pTableMap so the tables can be scanned again
// from `ts`. The mem/imem iterators are destroyed and flagged for re-initialization (iterInit =
// false), and the delete skyline is released. File block lists (pBlockList, mapData) are kept.
static void resetAllDataBlockScanInfo(SHashObj* pTableMap, int64_t ts) {
  for (STableBlockScanInfo** pp = taosHashIterate(pTableMap, NULL); pp != NULL; pp = taosHashIterate(pTableMap, pp)) {
    STableBlockScanInfo* pScan = *pp;

    pScan->iterInit = false;
    pScan->iter.hasVal = false;
    pScan->iiter.hasVal = false;

    if (pScan->iter.iter != NULL) {
      pScan->iter.iter = tsdbTbDataIterDestroy(pScan->iter.iter);
    }
    if (pScan->iiter.iter != NULL) {
      pScan->iiter.iter = tsdbTbDataIterDestroy(pScan->iiter.iter);
    }

    pScan->delSkyline = taosArrayDestroy(pScan->delSkyline);
    pScan->lastKey = ts;
  }
}

366 367
// Releases everything owned by one STableBlockScanInfo: mem/imem iterators, delete skyline, block
// index list and block map. It also flags the in-memory iterators for re-initialization.
// The struct itself is not freed.
static void clearBlockScanInfo(STableBlockScanInfo* p) {
  SIterInfo* memIters[] = {&p->iter, &p->iiter};

  p->iterInit = false;
  for (int32_t k = 0; k < 2; ++k) {
    SIterInfo* pIt = memIters[k];
    pIt->hasVal = false;
    if (pIt->iter != NULL) {
      pIt->iter = tsdbTbDataIterDestroy(pIt->iter);
    }
  }

  p->delSkyline = taosArrayDestroy(p->delSkyline);
  p->pBlockList = taosArrayDestroy(p->pBlockList);
  tMapDataClear(&p->mapData);
}
384

H
Haojun Liao 已提交
385
// Clears every STableBlockScanInfo referenced by pTableMap, then frees the map itself.
// The map holds only pointers, so the scan-info structs' own storage is not freed here.
static void destroyAllBlockScanInfo(SHashObj* pTableMap) {
  for (void* pIter = taosHashIterate(pTableMap, NULL); pIter != NULL; pIter = taosHashIterate(pTableMap, pIter)) {
    STableBlockScanInfo* pInfo = *(STableBlockScanInfo**)pIter;
    clearBlockScanInfo(pInfo);
  }

  taosHashCleanup(pTableMap);
}

394
// A query window is empty when its start key is after its end key.
static bool isEmptyQueryTimeWindow(STimeWindow* pWindow) {
  ASSERT(pWindow != NULL);
  bool empty = (pWindow->ekey < pWindow->skey);
  return empty;
}
H
Hongze Cheng 已提交
398

399 400 401
// Returns a copy of *pWindow whose start key is clamped to the oldest retained timestamp
// (now - keep2 * ticks-per-minute, plus one tick). Data older than the keep2 retention period is
// therefore never returned to the client. The end key is unchanged and *pWindow is not modified.
static STimeWindow updateQueryTimeWindow(STsdb* pTsdb, STimeWindow* pWindow) {
  STsdbKeepCfg* pCfg = &pTsdb->keepCfg;

  int64_t now = taosGetTimestamp(pCfg->precision);
  // the oldest retained key is one tick after the keep boundary
  int64_t earliestTs = now - (tsTickPerMin[pCfg->precision] * pCfg->keep2) + 1;

  STimeWindow win = *pWindow;
  win.skey = (win.skey < earliestTs) ? earliestTs : win.skey;
  return win;
}
H
Hongze Cheng 已提交
414

H
Haojun Liao 已提交
415
// Shrinks *capacity (rows per output SSDataBlock) so that one output block stays within 2MB,
// using the summed byte width of the queried columns as the row length.
// The product capacity * rowLen is computed in 64 bits because it can overflow int32 for wide rows.
// The capacity is never lowered below one row, so a row wider than 2MB still yields a usable block.
static void limitOutputBufferSize(const SQueryTableDataCond* pCond, int32_t* capacity) {
  int64_t rowLen = 0;
  for (int32_t i = 0; i < pCond->numOfCols; ++i) {
    rowLen += pCond->colList[i].bytes;
  }

  // make sure the output SSDataBlock size be less than 2MB.
  const int64_t TWOMB = 2 * 1024 * 1024;
  if ((int64_t)(*capacity) * rowLen > TWOMB) {
    // rowLen > 0 here, otherwise the product could not exceed TWOMB
    int64_t newCapacity = TWOMB / rowLen;
    (*capacity) = (newCapacity > 0) ? (int32_t)newCapacity : 1;
  }
}

// init file iterator
429
static int32_t initFilesetIterator(SFilesetIter* pIter, SArray* aDFileSet, STsdbReader* pReader) {
H
Hongze Cheng 已提交
430
  size_t numOfFileset = taosArrayGetSize(aDFileSet);
431

432 433
  pIter->index = ASCENDING_TRAVERSE(pReader->order) ? -1 : numOfFileset;
  pIter->order = pReader->order;
H
Hongze Cheng 已提交
434
  pIter->pFileList = aDFileSet;
435
  pIter->numOfFiles = numOfFileset;
H
Haojun Liao 已提交
436

437 438 439 440
  if (pIter->pLastBlockReader == NULL) {
    pIter->pLastBlockReader = taosMemoryCalloc(1, sizeof(struct SLastBlockReader));
    if (pIter->pLastBlockReader == NULL) {
      int32_t code = TSDB_CODE_OUT_OF_MEMORY;
441
      tsdbError("failed to prepare the last block iterator, since:%s %s", tstrerror(code), pReader->idStr);
442 443
      return code;
    }
444 445
  }

446 447 448 449 450 451 452 453
  SLastBlockReader* pLReader = pIter->pLastBlockReader;
  pLReader->order = pReader->order;
  pLReader->window = pReader->window;
  pLReader->verRange = pReader->verRange;

  pLReader->uid = 0;
  tMergeTreeClose(&pLReader->mergeTree);

454
  if (pLReader->pInfo == NULL) {
455
    // here we ignore the first column, which is always be the primary timestamp column
456 457
    pLReader->pInfo =
        tCreateLastBlockLoadInfo(pReader->pSchema, &pReader->suppInfo.colIds[1], pReader->suppInfo.numOfCols - 1);
H
Haojun Liao 已提交
458 459 460 461
    if (pLReader->pInfo == NULL) {
      tsdbDebug("init fileset iterator failed, code:%s %s", tstrerror(terrno), pReader->idStr);
      return terrno;
    }
462 463
  }

464
  tsdbDebug("init fileset iterator, total files:%d %s", pIter->numOfFiles, pReader->idStr);
H
Haojun Liao 已提交
465 466 467
  return TSDB_CODE_SUCCESS;
}

468
// Advances pIter to the next data file set, in scan order, whose key range overlaps the query window.
// It opens a data file reader on that set (pReader->pFileReader, pReader->status.pCurrentFileset).
// Returns false when no qualifying file set remains or when the file reader cannot be opened; the
// latter is logged because the bool result cannot carry the error code.
static bool filesetIteratorNext(SFilesetIter* pIter, STsdbReader* pReader) {
  bool    asc = ASCENDING_TRAVERSE(pIter->order);
  int32_t step = asc ? 1 : -1;
  pIter->index += step;

  if ((asc && pIter->index >= pIter->numOfFiles) || ((!asc) && pIter->index < 0)) {
    return false;
  }

  // accumulate the stt (last) block load statistics into the reader cost, then reset the stt reader
  SIOCostSummary* pSum = &pReader->cost;
  getLastBlockLoadInfo(pIter->pLastBlockReader->pInfo, &pSum->lastBlockLoad, &pSum->lastBlockLoadTime);

  pIter->pLastBlockReader->uid = 0;
  tMergeTreeClose(&pIter->pLastBlockReader->mergeTree);
  resetLastBlockLoadInfo(pIter->pLastBlockReader->pInfo);

  // check file the time range of coverage
  STimeWindow win = {0};

  while (1) {
    if (pReader->pFileReader != NULL) {
      tsdbDataFReaderClose(&pReader->pFileReader);
    }

    pReader->status.pCurrentFileset = (SDFileSet*)taosArrayGet(pIter->pFileList, pIter->index);

    int32_t code = tsdbDataFReaderOpen(&pReader->pFileReader, pReader->pTsdb, pReader->status.pCurrentFileset);
    if (code != TSDB_CODE_SUCCESS) {
      tsdbError("%p failed to open data file reader, fid:%d, code:%s %s", pReader,
                pReader->status.pCurrentFileset->fid, tstrerror(code), pReader->idStr);
      goto _err;
    }

    pReader->cost.headFileLoad += 1;

    int32_t fid = pReader->status.pCurrentFileset->fid;
    tsdbFidKeyRange(fid, pReader->pTsdb->keepCfg.days, pReader->pTsdb->keepCfg.precision, &win.skey, &win.ekey);

    // current file are no longer overlapped with query time window, ignore remain files
    if ((asc && win.skey > pReader->window.ekey) || (!asc && win.ekey < pReader->window.skey)) {
      tsdbDebug("%p remain files are not qualified for qrange:%" PRId64 "-%" PRId64 ", ignore, %s", pReader,
                pReader->window.skey, pReader->window.ekey, pReader->idStr);
      return false;
    }

    // this file lies entirely before the query window in scan direction: try the next one
    if ((asc && (win.ekey < pReader->window.skey)) || ((!asc) && (win.skey > pReader->window.ekey))) {
      pIter->index += step;
      if ((asc && pIter->index >= pIter->numOfFiles) || ((!asc) && pIter->index < 0)) {
        return false;
      }
      continue;
    }

    tsdbDebug("%p file found fid:%d for qrange:%" PRId64 "-%" PRId64 ", %s", pReader, fid, pReader->window.skey,
              pReader->window.ekey, pReader->idStr);
    return true;
  }

_err:
  return false;
}

528
// Rewinds the data block iterator for `order`: no current block (index -1), zero blocks, and an
// empty block list (created on first use, cleared otherwise).
static void resetDataBlockIterator(SDataBlockIter* pIter, int32_t order) {
  pIter->order = order;
  pIter->index = -1;
  pIter->numOfBlocks = 0;

  if (pIter->blockList != NULL) {
    taosArrayClear(pIter->blockList);
    return;
  }

  pIter->blockList = taosArrayInit(4, sizeof(SFileDataBlockInfo));
}

L
Liu Jicong 已提交
539
// Releases the iterator's block list. blockList is reset to NULL, so a later resetDataBlockIterator()
// re-creates it instead of clearing freed memory, and a repeated cleanup cannot double free.
static void cleanupDataBlockIterator(SDataBlockIter* pIter) { pIter->blockList = taosArrayDestroy(pIter->blockList); }
H
Haojun Liao 已提交
540

H
Haojun Liao 已提交
541
// Puts the reader status into its initial stage: the file stage is active (loadFromFile) and no
// in-memory table iterator is set yet.
static void initReaderStatus(SReaderStatus* pStatus) {
  pStatus->loadFromFile = true;
  pStatus->pTableIter = NULL;
}

546 547 548 549 550 551 552 553
// Creates the result SSDataBlock with one column per entry of pCond->colList and room for `capacity`
// rows. Returns NULL with terrno set on failure. A partially built block is destroyed together with
// the columns already appended, not just the block struct.
static SSDataBlock* createResBlock(SQueryTableDataCond* pCond, int32_t capacity) {
  SSDataBlock* pResBlock = createDataBlock();
  if (pResBlock == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  for (int32_t i = 0; i < pCond->numOfCols; ++i) {
    SColumnInfoData colInfo = {0};
    colInfo.info = pCond->colList[i];
    blockDataAppendColInfo(pResBlock, &colInfo);
  }

  int32_t code = blockDataEnsureCapacity(pResBlock, capacity);
  if (code != TSDB_CODE_SUCCESS) {
    terrno = code;
    blockDataDestroy(pResBlock);  // frees the column array and any column buffers as well
    return NULL;
  }
  return pResBlock;
}

568 569
// Allocates and initializes the reader skeleton for the vnode according to pCond. This covers the
// time window, order, version range, status, column load support info, the file block data buffer
// and the result block.
// On success *ppReader owns the new reader and TSDB_CODE_SUCCESS is returned. On failure the
// partially built reader is closed, *ppReader is set to NULL and the error code is returned.
static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, STsdbReader** ppReader, int32_t capacity,
                                const char* idstr) {
  int32_t      code = 0;
  int8_t       level = 0;
  STsdbReader* pReader = (STsdbReader*)taosMemoryCalloc(1, sizeof(*pReader));
  if (pReader == NULL) {
    // nothing has been allocated yet, so there is no reader to close
    *ppReader = NULL;
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  if (VND_IS_TSMA(pVnode)) {
    tsdbDebug("vgId:%d, tsma is selected to query", TD_VID(pVnode));
  }

  initReaderStatus(&pReader->status);

  pReader->pTsdb = getTsdbByRetentions(pVnode, pCond->twindows.skey, pVnode->config.tsdbCfg.retentions, idstr, &level);
  pReader->suid = pCond->suid;
  pReader->order = pCond->order;
  pReader->capacity = capacity;
  pReader->idStr = (idstr != NULL) ? strdup(idstr) : NULL;
  pReader->verRange = getQueryVerRange(pVnode, pCond, level);
  pReader->type = pCond->type;
  pReader->window = updateQueryTimeWindow(pReader->pTsdb, &pCond->twindows);
  pReader->blockInfoBuf.numPerBucket = 1000;  // 1000 tables per bucket
  ASSERT(pCond->numOfCols > 0);

  limitOutputBufferSize(pCond, &pReader->capacity);

  // allocate buffer in order to load data blocks from file
  SBlockLoadSuppInfo* pSup = &pReader->suppInfo;
  pSup->pColAgg = taosArrayInit(pCond->numOfCols, sizeof(SColumnDataAgg));
  pSup->plist = taosMemoryCalloc(pCond->numOfCols, POINTER_BYTES);
  if (pSup->pColAgg == NULL || pSup->plist == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _end;
  }

  pSup->tsColAgg.colId = PRIMARYKEY_TIMESTAMP_COL_ID;

  code = tBlockDataCreate(&pReader->status.fileBlockData);
  if (code != TSDB_CODE_SUCCESS) {
    terrno = code;
    goto _end;
  }

  pReader->pResBlock = createResBlock(pCond, pReader->capacity);
  if (pReader->pResBlock == NULL) {
    code = terrno;
    goto _end;
  }

  // column ids and string build buffers are needed by every later block load; fail early without them
  code = setColumnIdSlotList(&pReader->suppInfo, pReader->pResBlock);
  if (code != TSDB_CODE_SUCCESS) {
    goto _end;
  }

  *ppReader = pReader;
  return code;

_end:
  tsdbReaderClose(pReader);
  *ppReader = NULL;
  return code;
}
H
Hongze Cheng 已提交
630

H
Haojun Liao 已提交
631
// Reads the block index (SBlockIdx list) of the opened data file and appends to pIndexList the
// entries that belong to super table pReader->suid and to a table present in the table map.
// For each such table the per-table SBlockIndex list is created if it does not exist yet.
// Returns TSDB_CODE_SUCCESS, the read error, or TSDB_CODE_OUT_OF_MEMORY.
static int32_t doLoadBlockIndex(STsdbReader* pReader, SDataFReader* pFileReader, SArray* pIndexList) {
  SArray* aBlockIdx = taosArrayInit(8, sizeof(SBlockIdx));
  if (aBlockIdx == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  int64_t st = taosGetTimestampUs();
  int32_t code = tsdbReadBlockIdx(pFileReader, aBlockIdx);
  if (code != TSDB_CODE_SUCCESS) {
    goto _end;
  }

  size_t num = taosArrayGetSize(aBlockIdx);
  if (num == 0) {
    taosArrayDestroy(aBlockIdx);
    return TSDB_CODE_SUCCESS;
  }

  int64_t et1 = taosGetTimestampUs();

  for (size_t i = 0; i < num; ++i) {
    SBlockIdx* pBlockIdx = (SBlockIdx*)taosArrayGet(aBlockIdx, i);

    // skip tables that do not belong to the queried super table
    if (pBlockIdx->suid != pReader->suid) {
      continue;
    }

    // this block belongs to a table that is not queried.
    void* p = taosHashGet(pReader->status.pTableMap, &pBlockIdx->uid, sizeof(uint64_t));
    if (p == NULL) {
      continue;
    }

    STableBlockScanInfo* pScanInfo = *(STableBlockScanInfo**)p;
    if (pScanInfo->pBlockList == NULL) {
      // qualified data blocks of this table are pushed here later, so the list must exist
      pScanInfo->pBlockList = taosArrayInit(4, sizeof(SBlockIndex));
      if (pScanInfo->pBlockList == NULL) {
        code = TSDB_CODE_OUT_OF_MEMORY;
        goto _end;
      }
    }

    if (taosArrayPush(pIndexList, pBlockIdx) == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto _end;
    }
  }

  int64_t et2 = taosGetTimestampUs();
  tsdbDebug("load block index for %d tables completed, elapsed time:%.2f ms, set blockIdx:%.2f ms, size:%.2f Kb %s",
            (int32_t)num, (et1 - st) / 1000.0, (et2 - et1) / 1000.0, num * sizeof(SBlockIdx) / 1024.0, pReader->idStr);

  pReader->cost.headFileLoadTime += (et1 - st) / 1000.0;

_end:
  taosArrayDestroy(aBlockIdx);
  return code;
}
H
Hongze Cheng 已提交
681

682
// Drops the per-file block information (block map and block index list) of every table in
// pTableMap before the blocks of a new data file are loaded. The list arrays are cleared, not
// destroyed, so they can be reused.
static void cleanupTableScanInfo(SHashObj* pTableMap) {
  void* pIter = NULL;
  while ((pIter = taosHashIterate(pTableMap, pIter)) != NULL) {
    STableBlockScanInfo* pInfo = *(STableBlockScanInfo**)pIter;
    tMapDataClear(&pInfo->mapData);
    taosArrayClear(pInfo->pBlockList);
  }
}

696
// Loads the data block map (SDataBlk list) of every table in pIndexList from the current data file.
// For each table it records in pScanInfo->pBlockList the blocks that overlap both the query time
// window and the version range. pBlockNum->numOfBlocks is incremented per qualified block, and
// numOfLastFiles is set to the number of stt files in the file set.
// Returns TSDB_CODE_SUCCESS, the block-map read error, or TSDB_CODE_OUT_OF_MEMORY.
static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, SBlockNumber* pBlockNum) {
  int32_t numOfQTable = 0;
  size_t  sizeInDisk = 0;
  size_t  numOfTables = taosArrayGetSize(pIndexList);

  int64_t st = taosGetTimestampUs();
  cleanupTableScanInfo(pReader->status.pTableMap);

  for (size_t i = 0; i < numOfTables; ++i) {
    SBlockIdx* pBlockIdx = taosArrayGet(pIndexList, i);

    // NOTE: assumes every uid in pIndexList is in pTableMap (doLoadBlockIndex() only keeps such
    // tables); the lookup result is dereferenced without a NULL check.
    STableBlockScanInfo* pScanInfo =
        *(STableBlockScanInfo**)taosHashGet(pReader->status.pTableMap, &pBlockIdx->uid, sizeof(int64_t));

    tMapDataReset(&pScanInfo->mapData);
    int32_t code = tsdbReadDataBlk(pReader->pFileReader, pBlockIdx, &pScanInfo->mapData);
    if (code != TSDB_CODE_SUCCESS) {
      // never build a block list from a partially read block map
      tMapDataClear(&pScanInfo->mapData);
      return code;
    }

    taosArrayEnsureCap(pScanInfo->pBlockList, pScanInfo->mapData.nItem);

    sizeInDisk += pScanInfo->mapData.nData;

    SDataBlk block = {0};
    for (int32_t j = 0; j < pScanInfo->mapData.nItem; ++j) {
      tGetDataBlk(pScanInfo->mapData.pData + pScanInfo->mapData.aOffset[j], &block);

      // 1. time range check
      if (block.minKey.ts > pReader->window.ekey || block.maxKey.ts < pReader->window.skey) {
        continue;
      }

      // 2. version range check
      if (block.minVer > pReader->verRange.maxVer || block.maxVer < pReader->verRange.minVer) {
        continue;
      }

      SBlockIndex bIndex = {.ordinalIndex = j, .inFileOffset = block.aSubBlock->offset};
      bIndex.window = (STimeWindow){.skey = block.minKey.ts, .ekey = block.maxKey.ts};

      void* p1 = taosArrayPush(pScanInfo->pBlockList, &bIndex);
      if (p1 == NULL) {
        tMapDataClear(&pScanInfo->mapData);
        return TSDB_CODE_OUT_OF_MEMORY;
      }

      pBlockNum->numOfBlocks += 1;
    }

    if (pScanInfo->pBlockList != NULL && taosArrayGetSize(pScanInfo->pBlockList) > 0) {
      numOfQTable += 1;
    }
  }

  pBlockNum->numOfLastFiles = pReader->pFileReader->pSet->nSttF;
  int32_t total = pBlockNum->numOfLastFiles + pBlockNum->numOfBlocks;

  double el = (taosGetTimestampUs() - st) / 1000.0;
  tsdbDebug(
      "load block of %d tables completed, blocks:%d in %d tables, last-files:%d, block-info-size:%.2f Kb, elapsed "
      "time:%.2f ms %s",
      (int32_t)numOfTables, pBlockNum->numOfBlocks, numOfQTable, pBlockNum->numOfLastFiles, sizeInDisk / 1024.0, el,
      pReader->idStr);

  pReader->cost.numOfBlocks += total;
  pReader->cost.headFileLoadTime += el;

  return TSDB_CODE_SUCCESS;
}
H
Hongze Cheng 已提交
763

764
/**
 * Mark the current file block as completely consumed.
 * lastKey is set to the key that follows maxKey in scan order:
 * maxKey + 1 for ascending scans, maxKey - 1 for descending scans.
 */
static void setBlockAllDumped(SFileBlockDumpInfo* pDumpInfo, int64_t maxKey, int32_t order) {
  pDumpInfo->allDumped = true;
  pDumpInfo->lastKey = ASCENDING_TRAVERSE(order) ? (maxKey + 1) : (maxKey - 1);
}

770 771
/**
 * Append one column value at rowIndex of pColInfoData.
 *
 * Fixed-length values are appended straight from pColVal->value and flagged NULL when the
 * value is absent. Var-length values are first staged in pSup->buildBuf[colIndex]
 * (length header + payload) and then appended; an absent var-length value is appended as NULL.
 */
static void doCopyColVal(SColumnInfoData* pColInfoData, int32_t rowIndex, int32_t colIndex, SColVal* pColVal,
                         SBlockLoadSuppInfo* pSup) {
  if (!IS_VAR_DATA_TYPE(pColVal->type)) {
    colDataAppend(pColInfoData, rowIndex, (const char*)&pColVal->value, !COL_VAL_IS_VALUE(pColVal));
    return;
  }

  if (!COL_VAL_IS_VALUE(pColVal)) {
    colDataAppendNULL(pColInfoData, rowIndex);
    return;
  }

  varDataSetLen(pSup->buildBuf[colIndex], pColVal->value.nData);
  ASSERT(pColVal->value.nData <= pColInfoData->info.bytes);

  // pData may be NULL when nData is 0, so only copy a non-empty payload
  if (pColVal->value.nData > 0) {
    memcpy(varDataVal(pSup->buildBuf[colIndex]), pColVal->value.pData, pColVal->value.nData);
  }

  colDataAppend(pColInfoData, rowIndex, pSup->buildBuf[colIndex], false);
}

789
/**
 * Return the block-info entry at the iterator's current position,
 * or NULL when the iterator's block list is empty.
 */
static SFileDataBlockInfo* getCurrentBlockInfo(SDataBlockIter* pBlockIter) {
  size_t numOfItems = taosArrayGetSize(pBlockIter->blockList);
  if (numOfItems > 0) {
    return taosArrayGet(pBlockIter->blockList, pBlockIter->index);
  }

  ASSERT(pBlockIter->numOfBlocks == numOfItems);
  return NULL;
}

H
Hongze Cheng 已提交
799
/** Return the data block descriptor cached in the iterator for its current position. */
static SDataBlk* getCurrentBlock(SDataBlockIter* pBlockIter) {
  SDataBlk* pBlock = &pBlockIter->block;
  return pBlock;
}
800

H
Haojun Liao 已提交
801 802 803 804 805 806 807 808 809 810 811 812 813 814 815 816 817 818 819 820 821 822 823 824 825 826 827 828 829 830 831 832 833 834 835 836 837 838 839 840 841 842 843 844 845 846 847 848 849 850 851 852 853 854 855 856 857 858 859 860 861 862 863 864 865 866 867 868 869 870 871 872 873
/**
 * Binary search for key in a sorted TSKEY array.
 *
 * @param pValue  TSKEY array; sorted ascending for TSDB_ORDER_ASC, descending for TSDB_ORDER_DESC
 * @param num     number of items in pValue
 * @param key     key to look for
 * @param order   TSDB_ORDER_ASC or TSDB_ORDER_DESC
 * @return the position of an item equal to key when one is hit; otherwise the position of the first
 *         item that comes after key in the array's order (first item > key for ASC, first item < key
 *         for DESC); -1 when every item comes before key or when the array is empty.
 */
int32_t binarySearchForTs(char* pValue, int num, TSKEY key, int order) {
  int32_t midPos = -1;
  int32_t numOfRows;

  ASSERT(order == TSDB_ORDER_ASC || order == TSDB_ORDER_DESC);

  // without this guard keyList[0] and keyList[num - 1] (== keyList[-1]) are read on an empty array
  if (pValue == NULL || num <= 0) {
    return -1;
  }

  TSKEY*  keyList = (TSKEY*)pValue;
  int32_t firstPos = 0;
  int32_t lastPos = num - 1;

  if (order == TSDB_ORDER_DESC) {
    // find the first position which is smaller than (or equal to) the key
    while (1) {
      if (key >= keyList[firstPos]) return firstPos;
      if (key == keyList[lastPos]) return lastPos;

      if (key < keyList[lastPos]) {
        lastPos += 1;
        if (lastPos >= num) {
          return -1;
        } else {
          return lastPos;
        }
      }

      numOfRows = lastPos - firstPos + 1;
      midPos = (numOfRows >> 1) + firstPos;

      if (key < keyList[midPos]) {
        firstPos = midPos + 1;
      } else if (key > keyList[midPos]) {
        lastPos = midPos - 1;
      } else {
        break;
      }
    }

  } else {
    // find the first position which is bigger than (or equal to) the key
    while (1) {
      if (key <= keyList[firstPos]) return firstPos;
      if (key == keyList[lastPos]) return lastPos;

      if (key > keyList[lastPos]) {
        lastPos = lastPos + 1;
        if (lastPos >= num) {
          return -1;
        } else {
          return lastPos;
        }
      }

      numOfRows = lastPos - firstPos + 1;
      midPos = (numOfRows >> 1u) + firstPos;

      if (key < keyList[midPos]) {
        lastPos = midPos - 1;
      } else if (key > keyList[midPos]) {
        firstPos = midPos + 1;
      } else {
        break;
      }
    }
  }

  return midPos;
}

/**
 * Binary search in keyList starting from position pos and moving in the scan direction.
 *
 * The comparisons below assume keyList is sorted ascending; order only selects the direction:
 *  - TSDB_ORDER_ASC:  searches [pos, num - 1] and returns the position of key if hit, otherwise the last
 *                     position whose key is smaller than key; -1 when keyList[pos] > key.
 *  - otherwise (DESC): searches [0, pos] and returns the position of key if hit, otherwise the first
 *                     position whose key is greater than key; -1 when keyList[pos] < key.
 */
static int doBinarySearchKey(TSKEY* keyList, int num, int pos, TSKEY key, int order) {
  assert(pos >= 0 && pos < num);
  assert(num > 0);

  if (order == TSDB_ORDER_ASC) {
    if (keyList[pos] > key) {
      return -1;
    }

    // invariant after the checks below: keyList[lo] < key < keyList[hi]
    int lo = pos;
    int hi = num - 1;
    for (;;) {
      if (keyList[hi] <= key) return hi;
      if (keyList[lo] >= key) return lo;
      if (hi - lo <= 1) return lo;

      int mid = lo + (hi - lo + 1) / 2;
      if (keyList[mid] == key) return mid;
      if (keyList[mid] > key) {
        hi = mid;
      } else {
        lo = mid;
      }
    }
  }

  // DESC: walk down from pos toward position 0
  if (keyList[pos] < key) {
    return -1;
  }

  // invariant after the checks below: keyList[lo] < key < keyList[hi]
  int hi = pos;
  int lo = 0;
  for (;;) {
    if (keyList[lo] >= key) return lo;
    if (keyList[hi] <= key) return hi;
    if (hi - lo <= 1) return hi;

    int mid = hi - (hi - lo + 1) / 2;
    if (keyList[mid] == key) return mid;
    if (keyList[mid] < key) {
      lo = mid;
    } else {
      hi = mid;
    }
  }
}

H
Haojun Liao 已提交
918
/**
 * Find the last row, in scan order, of the block that still lies inside the query time window,
 * searching from row pos. For ascending scans the bound is window.ekey; for descending scans
 * it is window.skey. Returns -1 when no row from pos onward qualifies (see doBinarySearchKey).
 */
static int32_t getEndPosInDataBlock(STsdbReader* pReader, SBlockData* pBlockData, SDataBlk* pBlock, int32_t pos) {
  int32_t order = pReader->order;

  if (ASCENDING_TRAVERSE(order)) {
    // the whole tail of the block is inside the window
    if (pBlock->maxKey.ts <= pReader->window.ekey) {
      return pBlock->nRow - 1;
    }
    return doBinarySearchKey(pBlockData->aTSKEY, pBlock->nRow, pos, pReader->window.ekey, order);
  }

  // the whole head of the block is inside the window
  if (pBlock->minKey.ts >= pReader->window.skey) {
    return 0;
  }
  return doBinarySearchKey(pBlockData->aTSKEY, pBlock->nRow, pos, pReader->window.skey, order);
}

H
Haojun Liao 已提交
935
/**
 * Copy dumpedRows primary timestamps of the loaded file block into pColData.
 * Ascending: rows [rowIndex, rowIndex + dumpedRows).
 * Descending: rows [rowIndex - dumpedRows + 1, rowIndex], reversed in place afterwards so the
 * output follows scan order.
 */
static void copyPrimaryTsCol(const SBlockData* pBlockData, SFileBlockDumpInfo* pDumpInfo, SColumnInfoData* pColData,
                             int32_t dumpedRows, bool asc) {
  int32_t firstRow = asc ? pDumpInfo->rowIndex : (pDumpInfo->rowIndex - dumpedRows + 1);
  memcpy(pColData->pData, &pBlockData->aTSKEY[firstRow], dumpedRows * sizeof(int64_t));

  if (asc) {
    return;
  }

  // reverse the copied timestamps in place
  int64_t* pTs = (int64_t*)pColData->pData;
  for (int32_t left = 0, right = dumpedRows - 1; left < right; ++left, --right) {
    int64_t tmp = pTs[left];
    pTs[left] = pTs[right];
    pTs[right] = tmp;
  }
}

H
Haojun Liao 已提交
955 956 957 958 959 960 961 962 963 964 965 966 967 968 969 970 971 972 973 974 975 976 977 978 979 980 981 982 983 984 985 986 987 988 989 990 991 992 993 994 995 996 997
/**
 * A faster copy procedure for fixed-length (numeric/timestamp) columns.
 *
 * Copies dumpedRows values of pData into pColData with a single memcpy. For descending scans the
 * source rows are [rowIndex - dumpedRows + 1, rowIndex] and the copied values are reversed in place
 * so the output follows scan order. When the source column is not all-value (flag != HAS_VALUE),
 * each copied row is re-checked and rows whose bit value is 0 or 1 are marked NULL in the output.
 */
static void copyNumericCols(const SColData* pData, SFileBlockDumpInfo* pDumpInfo, SColumnInfoData* pColData,
                            int32_t dumpedRows, bool asc) {
  int32_t  bytes = tDataTypes[pData->type].bytes;
  int32_t  startIndex = asc ? pDumpInfo->rowIndex : (pDumpInfo->rowIndex - dumpedRows + 1);
  uint8_t* p = pData->pData + bytes * startIndex;
  int32_t  step = asc ? 1 : -1;

  // make sure the destination is 8-byte aligned, so the typed accesses below are aligned
  ASSERT((((uint64_t)pColData->pData) & (0x8 - 1)) == 0);

  // 1. copy data in a batch model
  memcpy(pColData->pData, p, dumpedRows * bytes);

  // 2. reverse the array list in case of descending order scan data block
  if (!asc) {
    int32_t mid = dumpedRows >> 1u;

    switch (pColData->info.type) {
      case TSDB_DATA_TYPE_TIMESTAMP:
      case TSDB_DATA_TYPE_DOUBLE:
      case TSDB_DATA_TYPE_BIGINT:
      case TSDB_DATA_TYPE_UBIGINT: {
        int64_t* pts = (int64_t*)pColData->pData;
        for (int32_t j = 0; j < mid; ++j) {
          int64_t t = pts[j];
          pts[j] = pts[dumpedRows - j - 1];
          pts[dumpedRows - j - 1] = t;
        }
        break;
      }

      case TSDB_DATA_TYPE_BOOL:
      case TSDB_DATA_TYPE_TINYINT:
      case TSDB_DATA_TYPE_UTINYINT: {
        int8_t* pts = (int8_t*)pColData->pData;
        for (int32_t j = 0; j < mid; ++j) {
          int8_t t = pts[j];
          pts[j] = pts[dumpedRows - j - 1];
          pts[dumpedRows - j - 1] = t;
        }
        break;
      }

      case TSDB_DATA_TYPE_SMALLINT:
      case TSDB_DATA_TYPE_USMALLINT: {
        int16_t* pts = (int16_t*)pColData->pData;
        for (int32_t j = 0; j < mid; ++j) {
          int16_t t = pts[j];
          pts[j] = pts[dumpedRows - j - 1];
          pts[dumpedRows - j - 1] = t;
        }
        break;
      }

      case TSDB_DATA_TYPE_FLOAT:
      case TSDB_DATA_TYPE_INT:
      case TSDB_DATA_TYPE_UINT: {
        int32_t* pts = (int32_t*)pColData->pData;
        for (int32_t j = 0; j < mid; ++j) {
          int32_t t = pts[j];
          pts[j] = pts[dumpedRows - j - 1];
          pts[dumpedRows - j - 1] = t;
        }
        break;
      }

      default: {
        // any other fixed-length type: swap whole items byte by byte so the output is never left unreversed
        char* pts = pColData->pData;
        for (int32_t j = 0; j < mid; ++j) {
          char* pLeft = pts + (int64_t)j * bytes;
          char* pRight = pts + (int64_t)(dumpedRows - j - 1) * bytes;
          for (int32_t k = 0; k < bytes; ++k) {
            char t = pLeft[k];
            pLeft[k] = pRight[k];
            pRight[k] = t;
          }
        }
        break;
      }
    }
  }

  // 3. if the null value exists, check items one-by-one
  if (pData->flag != HAS_VALUE) {
    int32_t rowIndex = 0;

    for (int32_t j = pDumpInfo->rowIndex; rowIndex < dumpedRows; j += step, rowIndex++) {
      uint8_t v = tColDataGetBitValue(pData, j);
      // bit values 0 and 1 are treated as "no value" and emitted as NULL
      if (v == 0 || v == 1) {
        colDataSetNull_f(pColData->nullbitmap, rowIndex);
        pColData->hasNull = true;
      }
    }
  }
}

1046
/**
 * Copy rows of the currently loaded file block that fall inside the query time window into
 * pReader->pResBlock, starting at pReader->status.fBlockDumpInfo.rowIndex and moving in scan order.
 *
 * At most pReader->capacity rows are copied per call. rowIndex is advanced past the copied rows, and the
 * block is flagged as fully dumped once it is exhausted or its remaining rows are outside the window.
 * pBlockScanInfo is currently unused.
 *
 * @return TSDB_CODE_SUCCESS (this function reports no error path)
 */
static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo) {
  SReaderStatus*  pStatus = &pReader->status;
  SDataBlockIter* pBlockIter = &pStatus->blockIter;

  SBlockData*         pBlockData = &pStatus->fileBlockData;
  SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(pBlockIter);
  SDataBlk*           pBlock = getCurrentBlock(pBlockIter);
  SSDataBlock*        pResBlock = pReader->pResBlock;
  int32_t             numOfOutputCols = blockDataGetNumOfCols(pResBlock);

  SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo;
  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;

  SColVal cv = {0};
  int64_t st = taosGetTimestampUs();
  bool    asc = ASCENDING_TRAVERSE(pReader->order);
  int32_t step = asc ? 1 : -1;

  // When dumping starts at the block boundary, skip the leading rows that lie before the window start
  // (ascending) or after the window end (descending).
  if ((pDumpInfo->rowIndex == 0 && asc) || (pDumpInfo->rowIndex == pBlock->nRow - 1 && (!asc))) {
    if (asc && pReader->window.skey <= pBlock->minKey.ts) {
      // the block starts inside the window, keep rowIndex = 0
    } else if (!asc && pReader->window.ekey >= pBlock->maxKey.ts) {
      // the block ends inside the window, keep rowIndex = nRow - 1
    } else {
      // search from the far end of the block back to the window boundary to find the first qualified row
      int32_t pos = asc ? pBlock->nRow - 1 : 0;
      int32_t order = asc ? TSDB_ORDER_DESC : TSDB_ORDER_ASC;
      int64_t key = asc ? pReader->window.skey : pReader->window.ekey;
      pDumpInfo->rowIndex = doBinarySearchKey(pBlockData->aTSKEY, pBlock->nRow, pos, key, order);
    }
  }

  // time window check
  int32_t endIndex = getEndPosInDataBlock(pReader, pBlockData, pBlock, pDumpInfo->rowIndex);
  if (endIndex == -1) {
    setBlockAllDumped(pDumpInfo, pReader->window.ekey, pReader->order);
    return TSDB_CODE_SUCCESS;
  }

  endIndex += step;
  int32_t dumpedRows = asc ? (endIndex - pDumpInfo->rowIndex) : (pDumpInfo->rowIndex - endIndex);
  if (dumpedRows > pReader->capacity) {  // output buffer check
    dumpedRows = pReader->capacity;
  }

  int32_t i = 0;
  int32_t rowIndex = 0;

  // The primary timestamp column, when requested, is the first output column and is copied directly.
  // Guard against a result block without columns: taosArrayGet would return NULL for index 0.
  SColumnInfoData* pColData = (numOfOutputCols > 0) ? taosArrayGet(pResBlock->pDataBlock, i) : NULL;
  if (pColData != NULL && pColData->info.colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
    copyPrimaryTsCol(pBlockData, pDumpInfo, pColData, dumpedRows, asc);
    i += 1;
  }

  // Merge output columns with the block's columns by column id.
  // NOTE(review): this relies on both lists being sorted by ascending column id — confirm.
  int32_t colIndex = 0;
  int32_t num = pBlockData->nColData;
  while (i < numOfOutputCols && colIndex < num) {
    rowIndex = 0;
    pColData = taosArrayGet(pResBlock->pDataBlock, i);

    SColData* pData = tBlockDataGetColDataByIdx(pBlockData, colIndex);
    if (pData->cid < pColData->info.colId) {
      colIndex += 1;
    } else if (pData->cid == pColData->info.colId) {
      if (pData->flag == HAS_NONE || pData->flag == HAS_NULL || pData->flag == (HAS_NULL | HAS_NONE)) {
        colDataAppendNNULL(pColData, 0, dumpedRows);
      } else {
        if (IS_MATHABLE_TYPE(pColData->info.type)) {
          copyNumericCols(pData, pDumpInfo, pColData, dumpedRows, asc);
        } else {  // varchar/nchar type
          for (int32_t j = pDumpInfo->rowIndex; rowIndex < dumpedRows; j += step) {
            tColDataGetValue(pData, j, &cv);
            doCopyColVal(pColData, rowIndex++, i, &cv, pSupInfo);
          }
        }
      }

      colIndex += 1;
      i += 1;
    } else {  // the specified column does not exist in file block, fill with null data
      colDataAppendNNULL(pColData, 0, dumpedRows);
      i += 1;
    }
  }

  // fill the mis-matched columns with null value
  while (i < numOfOutputCols) {
    pColData = taosArrayGet(pResBlock->pDataBlock, i);
    colDataAppendNNULL(pColData, 0, dumpedRows);
    i += 1;
  }

  pResBlock->info.rows = dumpedRows;
  pDumpInfo->rowIndex += step * dumpedRows;

  // check if current block are all handled
  if (pDumpInfo->rowIndex >= 0 && pDumpInfo->rowIndex < pBlock->nRow) {
    int64_t ts = pBlockData->aTSKEY[pDumpInfo->rowIndex];
    if (outOfTimeWindow(ts, &pReader->window)) {  // the remain data has out of query time window, ignore current block
      setBlockAllDumped(pDumpInfo, ts, pReader->order);
    }
  } else {
    int64_t ts = asc ? pBlock->maxKey.ts : pBlock->minKey.ts;
    setBlockAllDumped(pDumpInfo, ts, pReader->order);
  }

  double elapsedTime = (taosGetTimestampUs() - st) / 1000.0;
  pReader->cost.blockLoadTime += elapsedTime;

  int32_t unDumpedRows = asc ? pBlock->nRow - pDumpInfo->rowIndex : pDumpInfo->rowIndex + 1;
  tsdbDebug("%p copy file block to sdatablock, global index:%d, table index:%d, brange:%" PRId64 "-%" PRId64
            ", rows:%d, remain:%d, minVer:%" PRId64 ", maxVer:%" PRId64 ", elapsed time:%.2f ms, %s",
            pReader, pBlockIter->index, pBlockInfo->tbBlockIdx, pBlock->minKey.ts, pBlock->maxKey.ts, dumpedRows,
            unDumpedRows, pBlock->minVer, pBlock->maxVer, elapsedTime, pReader->idStr);

  return TSDB_CODE_SUCCESS;
}

1163 1164
/**
 * Read the file block at the iterator's current position into pBlockData for table uid.
 *
 * The block data is initialized for the reader's schema with pReader->suppInfo.colIds starting at
 * index 1 (index 0 is skipped — presumably the primary timestamp column; confirm).
 * On success, fBlockDumpInfo.allDumped is reset and the elapsed time is added to cost.blockLoadTime.
 *
 * @return TSDB_CODE_SUCCESS, or the error code from tBlockDataInit / tsdbReadDataBlock.
 */
static int32_t doLoadFileBlockData(STsdbReader* pReader, SDataBlockIter* pBlockIter, SBlockData* pBlockData,
                                   uint64_t uid) {
  int64_t startTs = taosGetTimestampUs();

  tBlockDataReset(pBlockData);

  TABLEID             tid = {.suid = pReader->suid, .uid = uid};
  SBlockLoadSuppInfo* pSup = &pReader->suppInfo;

  int32_t code = tBlockDataInit(pBlockData, &tid, pReader->pSchema, &pSup->colIds[1], pSup->numOfCols - 1);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(pBlockIter);
  ASSERT(pBlockInfo != NULL);

  SDataBlk* pBlock = getCurrentBlock(pBlockIter);

  code = tsdbReadDataBlock(pReader->pFileReader, pBlock, pBlockData);
  if (code != TSDB_CODE_SUCCESS) {
    tsdbError("%p error occurs in loading file block, global index:%d, table index:%d, brange:%" PRId64 "-%" PRId64
              ", rows:%d, code:%s %s",
              pReader, pBlockIter->index, pBlockInfo->tbBlockIdx, pBlock->minKey.ts, pBlock->maxKey.ts, pBlock->nRow,
              tstrerror(code), pReader->idStr);
    return code;
  }

  double elapsedTime = (taosGetTimestampUs() - startTs) / 1000.0;

  tsdbDebug("%p load file block into buffer, global index:%d, index in table block list:%d, brange:%" PRId64 "-%" PRId64
            ", rows:%d, minVer:%" PRId64 ", maxVer:%" PRId64 ", elapsed time:%.2f ms, %s",
            pReader, pBlockIter->index, pBlockInfo->tbBlockIdx, pBlock->minKey.ts, pBlock->maxKey.ts, pBlock->nRow,
            pBlock->minVer, pBlock->maxVer, elapsedTime, pReader->idStr);

  pReader->cost.blockLoadTime += elapsedTime;
  pReader->status.fBlockDumpInfo.allDumped = false;

  return TSDB_CODE_SUCCESS;
}
H
Hongze Cheng 已提交
1201

H
Haojun Liao 已提交
1202 1203 1204
/**
 * Release all memory owned by pSup: the per-table block counters, the per-table cursors and every
 * per-table SBlockOrderWrapper array, plus the array holding them.
 *
 * Tolerates a partially initialized supporter (pDataBlockInfo == NULL) and can be called more than once:
 * numOfTables is reset so a repeated call does not walk a freed table list.
 */
static void cleanupBlockOrderSupporter(SBlockOrderSupporter* pSup) {
  taosMemoryFreeClear(pSup->numOfBlocksPerTable);
  taosMemoryFreeClear(pSup->indexPerTable);

  if (pSup->pDataBlockInfo != NULL) {
    for (int32_t i = 0; i < pSup->numOfTables; ++i) {
      taosMemoryFreeClear(pSup->pDataBlockInfo[i]);
    }
    taosMemoryFreeClear(pSup->pDataBlockInfo);
  }

  pSup->numOfTables = 0;
}
H
Hongze Cheng 已提交
1213

H
Haojun Liao 已提交
1214 1215
/**
 * Allocate the zero-filled per-table arrays of pSup for numOfTables tables.
 * On any allocation failure, everything allocated so far is released and
 * TSDB_CODE_OUT_OF_MEMORY is returned.
 */
static int32_t initBlockOrderSupporter(SBlockOrderSupporter* pSup, int32_t numOfTables) {
  ASSERT(numOfTables >= 1);

  pSup->numOfBlocksPerTable = taosMemoryCalloc(1, sizeof(int32_t) * numOfTables);
  pSup->indexPerTable = taosMemoryCalloc(1, sizeof(int32_t) * numOfTables);
  pSup->pDataBlockInfo = taosMemoryCalloc(1, POINTER_BYTES * numOfTables);

  bool allAllocated =
      (pSup->numOfBlocksPerTable != NULL) && (pSup->indexPerTable != NULL) && (pSup->pDataBlockInfo != NULL);
  if (allAllocated) {
    return TSDB_CODE_SUCCESS;
  }

  cleanupBlockOrderSupporter(pSup);
  return TSDB_CODE_OUT_OF_MEMORY;
}
H
Hongze Cheng 已提交
1228

H
Haojun Liao 已提交
1229
static int32_t fileDataBlockOrderCompar(const void* pLeft, const void* pRight, void* param) {
1230
  int32_t leftIndex = *(int32_t*)pLeft;
H
Haojun Liao 已提交
1231
  int32_t rightIndex = *(int32_t*)pRight;
H
Hongze Cheng 已提交
1232

H
Haojun Liao 已提交
1233
  SBlockOrderSupporter* pSupporter = (SBlockOrderSupporter*)param;
H
Hongze Cheng 已提交
1234

H
Haojun Liao 已提交
1235 1236
  int32_t leftTableBlockIndex = pSupporter->indexPerTable[leftIndex];
  int32_t rightTableBlockIndex = pSupporter->indexPerTable[rightIndex];
H
Hongze Cheng 已提交
1237

H
Haojun Liao 已提交
1238 1239 1240 1241 1242 1243 1244
  if (leftTableBlockIndex > pSupporter->numOfBlocksPerTable[leftIndex]) {
    /* left block is empty */
    return 1;
  } else if (rightTableBlockIndex > pSupporter->numOfBlocksPerTable[rightIndex]) {
    /* right block is empty */
    return -1;
  }
H
Hongze Cheng 已提交
1245

1246
  SBlockOrderWrapper* pLeftBlock = &pSupporter->pDataBlockInfo[leftIndex][leftTableBlockIndex];
H
Haojun Liao 已提交
1247
  SBlockOrderWrapper* pRightBlock = &pSupporter->pDataBlockInfo[rightIndex][rightTableBlockIndex];
H
Hongze Cheng 已提交
1248

1249 1250 1251
  return pLeftBlock->offset > pRightBlock->offset ? 1 : -1;
}

H
Haojun Liao 已提交
1252
/**
 * Load the SDataBlk descriptor for the iterator's current position into pBlockIter->block.
 *
 * The owning table's scan info is looked up in pBlockIter->pTableMap by uid. Its pBlockList maps
 * tbBlockIdx to the ordinal index used to decode the block from the table's mapData.
 *
 * @return TSDB_CODE_SUCCESS (also when the iterator is empty, leaving pBlockIter->block unchanged), or
 *         TSDB_CODE_INVALID_PARA when the uid or the table block index cannot be resolved.
 */
static int32_t doSetCurrentBlock(SDataBlockIter* pBlockIter, const char* idStr) {
  SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(pBlockIter);
  if (pBlockInfo == NULL) {
    return TSDB_CODE_SUCCESS;
  }

  STableBlockScanInfo** pScanInfo = taosHashGet(pBlockIter->pTableMap, &pBlockInfo->uid, sizeof(pBlockInfo->uid));
  if (pScanInfo == NULL) {
    tsdbError("failed to locate the uid:%" PRIu64 " in query table uid list, %s", pBlockInfo->uid, idStr);
    return TSDB_CODE_INVALID_PARA;
  }

  // taosArrayGet returns NULL for an out-of-range index; do not dereference it blindly
  SBlockIndex* pIndex = taosArrayGet((*pScanInfo)->pBlockList, pBlockInfo->tbBlockIdx);
  if (pIndex == NULL) {
    tsdbError("failed to locate block index:%d of uid:%" PRIu64 " in table block list, %s", pBlockInfo->tbBlockIdx,
              pBlockInfo->uid, idStr);
    return TSDB_CODE_INVALID_PARA;
  }

  tMapDataGetItemByIdx(&(*pScanInfo)->mapData, pIndex->ordinalIndex, &pBlockIter->block, tGetDataBlk);
  return TSDB_CODE_SUCCESS;
}
H
Hongze Cheng 已提交
1271

1272
/**
 * Build the global access order of all qualified file blocks of the current file set.
 *
 * The blocks of every table (each STableBlockScanInfo.pBlockList) are merged by in-file offset with a
 * multiway merge tree, so the file is read in offset order; a single table needs no merge. The result is
 * stored in pBlockIter->blockList, the iterator is positioned at the first (asc) or last (desc) entry, and
 * that entry's descriptor is loaded through doSetCurrentBlock.
 *
 * @param numOfBlocks total number of qualified blocks; expected to equal the sum of all pBlockList sizes.
 * @return TSDB_CODE_SUCCESS, or an error code on allocation or block lookup failure.
 */
static int32_t initBlockIterator(STsdbReader* pReader, SDataBlockIter* pBlockIter, int32_t numOfBlocks) {
  bool asc = ASCENDING_TRAVERSE(pReader->order);

  SBlockOrderSupporter sup = {0};
  pBlockIter->numOfBlocks = numOfBlocks;
  taosArrayClear(pBlockIter->blockList);
  pBlockIter->pTableMap = pReader->status.pTableMap;

  // access data blocks according to the offset of each block in asc/desc order.
  int32_t numOfTables = (int32_t)taosHashGetSize(pReader->status.pTableMap);

  int64_t st = taosGetTimestampUs();
  int32_t code = initBlockOrderSupporter(&sup, numOfTables);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  // collect the block offsets of every table that has at least one qualified block
  int32_t cnt = 0;
  void*   ptr = NULL;
  while (1) {
    ptr = taosHashIterate(pReader->status.pTableMap, ptr);
    if (ptr == NULL) {
      break;
    }

    STableBlockScanInfo* pTableScanInfo = *(STableBlockScanInfo**)ptr;
    if (pTableScanInfo->pBlockList == NULL || taosArrayGetSize(pTableScanInfo->pBlockList) == 0) {
      continue;
    }

    size_t num = taosArrayGetSize(pTableScanInfo->pBlockList);
    sup.numOfBlocksPerTable[sup.numOfTables] = num;

    char* buf = taosMemoryMalloc(sizeof(SBlockOrderWrapper) * num);
    if (buf == NULL) {
      // NOTE(review): the hash iteration is abandoned here without being cancelled; confirm whether the
      // iterator holds a reference that needs releasing.
      cleanupBlockOrderSupporter(&sup);
      return TSDB_CODE_TDB_OUT_OF_MEMORY;
    }

    sup.pDataBlockInfo[sup.numOfTables] = (SBlockOrderWrapper*)buf;

    for (int32_t k = 0; k < num; ++k) {
      SBlockIndex* pIndex = taosArrayGet(pTableScanInfo->pBlockList, k);
      sup.pDataBlockInfo[sup.numOfTables][k] =
          (SBlockOrderWrapper){.uid = pTableScanInfo->uid, .offset = pIndex->inFileOffset};
      cnt++;
    }

    sup.numOfTables += 1;
  }

  ASSERT(numOfBlocks == cnt);

  // since there is only one table qualified, blocks are not sorted
  if (sup.numOfTables == 1) {
    for (int32_t i = 0; i < numOfBlocks; ++i) {
      SFileDataBlockInfo blockInfo = {.uid = sup.pDataBlockInfo[0][i].uid, .tbBlockIdx = i};
      if (taosArrayPush(pBlockIter->blockList, &blockInfo) == NULL) {
        cleanupBlockOrderSupporter(&sup);
        return TSDB_CODE_TDB_OUT_OF_MEMORY;
      }
    }

    int64_t et = taosGetTimestampUs();
    tsdbDebug("%p create blocks info struct completed for one table, %d blocks not sorted, elapsed time:%.2f ms %s",
              pReader, numOfBlocks, (et - st) / 1000.0, pReader->idStr);

    pBlockIter->index = asc ? 0 : (numOfBlocks - 1);
    cleanupBlockOrderSupporter(&sup);
    return doSetCurrentBlock(pBlockIter, pReader->idStr);
  }

  tsdbDebug("%p create data blocks info struct completed, %d blocks in %d tables %s", pReader, cnt, sup.numOfTables,
            pReader->idStr);

  ASSERT(cnt <= numOfBlocks && sup.numOfTables <= numOfTables);

  // merge the per-table block lists by in-file offset (see fileDataBlockOrderCompar)
  SMultiwayMergeTreeInfo* pTree = NULL;
  // keep the full int32_t result: narrowing it to uint8_t could turn a failure code into 0 (success)
  int32_t ret = tMergeTreeCreate(&pTree, sup.numOfTables, &sup, fileDataBlockOrderCompar);
  if (ret != TSDB_CODE_SUCCESS) {
    cleanupBlockOrderSupporter(&sup);
    return TSDB_CODE_TDB_OUT_OF_MEMORY;
  }

  int32_t numOfTotal = 0;
  while (numOfTotal < cnt) {
    int32_t pos = tMergeTreeGetChosenIndex(pTree);
    int32_t index = sup.indexPerTable[pos]++;

    SFileDataBlockInfo blockInfo = {.uid = sup.pDataBlockInfo[pos][index].uid, .tbBlockIdx = index};
    if (taosArrayPush(pBlockIter->blockList, &blockInfo) == NULL) {
      cleanupBlockOrderSupporter(&sup);
      taosMemoryFree(pTree);
      return TSDB_CODE_TDB_OUT_OF_MEMORY;
    }

    // set data block index overflow, in order to disable the offset comparator
    if (sup.indexPerTable[pos] >= sup.numOfBlocksPerTable[pos]) {
      sup.indexPerTable[pos] = sup.numOfBlocksPerTable[pos] + 1;
    }

    numOfTotal += 1;
    tMergeTreeAdjust(pTree, tMergeTreeGetAdjustIndex(pTree));
  }

  int64_t et = taosGetTimestampUs();
  tsdbDebug("%p %d data blocks access order completed, elapsed time:%.2f ms %s", pReader, numOfBlocks,
            (et - st) / 1000.0, pReader->idStr);
  cleanupBlockOrderSupporter(&sup);
  taosMemoryFree(pTree);

  pBlockIter->index = asc ? 0 : (numOfBlocks - 1);
  return doSetCurrentBlock(pBlockIter, pReader->idStr);
}
H
Hongze Cheng 已提交
1382

H
Haojun Liao 已提交
1383
/**
 * Move the iterator one block forward in scan order and load that block's descriptor.
 *
 * @return false, leaving the iterator untouched, when it already sits on the last block (asc)
 *         or the first block (desc); true otherwise.
 */
static bool blockIteratorNext(SDataBlockIter* pBlockIter, const char* idStr) {
  if (ASCENDING_TRAVERSE(pBlockIter->order)) {
    if (pBlockIter->index >= pBlockIter->numOfBlocks - 1) {
      return false;
    }
    pBlockIter->index += 1;
  } else {
    if (pBlockIter->index <= 0) {
      return false;
    }
    pBlockIter->index -= 1;
  }

  doSetCurrentBlock(pBlockIter, idStr);
  return true;
}

1397 1398 1399
/**
 * This is an two rectangles overlap cases.
 */
H
Hongze Cheng 已提交
1400
static int32_t dataBlockPartiallyRequired(STimeWindow* pWindow, SVersionRange* pVerRange, SDataBlk* pBlock) {
1401 1402
  return (pWindow->ekey < pBlock->maxKey.ts && pWindow->ekey >= pBlock->minKey.ts) ||
         (pWindow->skey > pBlock->minKey.ts && pWindow->skey <= pBlock->maxKey.ts) ||
H
Hongze Cheng 已提交
1403 1404
         (pVerRange->minVer > pBlock->minVer && pVerRange->minVer <= pBlock->maxVer) ||
         (pVerRange->maxVer < pBlock->maxVer && pVerRange->maxVer >= pBlock->minVer);
H
Haojun Liao 已提交
1405
}
H
Hongze Cheng 已提交
1406

1407
/**
 * Fetch the next block, in scan order, of the same table as pBlockInfo.
 *
 * @param nextIndex    [out] position of the neighbor in pTableBlockScanInfo->pBlockList (written only on success)
 * @param pBlockIndex  [out] copy of the neighbor's SBlockIndex (written only on success)
 * @return false when pBlockInfo is the table's last block (asc) / first block (desc), or when the neighbor
 *         position is otherwise outside the table's block list.
 */
static bool getNeighborBlockOfSameTable(SFileDataBlockInfo* pBlockInfo, STableBlockScanInfo* pTableBlockScanInfo,
                                        int32_t* nextIndex, int32_t order, SBlockIndex* pBlockIndex) {
  int32_t numOfBlocks = (int32_t)taosArrayGetSize(pTableBlockScanInfo->pBlockList);
  int32_t step = ASCENDING_TRAVERSE(order) ? 1 : -1;
  int32_t next = pBlockInfo->tbBlockIdx + step;

  // A signed range check: comparing against (size_t)size - 1 would wrap around for an empty list
  // and let an out-of-range taosArrayGet result be dereferenced.
  if (next < 0 || next >= numOfBlocks) {
    return false;
  }

  *nextIndex = next;
  *pBlockIndex = *(SBlockIndex*)taosArrayGet(pTableBlockScanInfo->pBlockList, next);
  return true;
}

/**
 * Locate pFBlockInfo (matched by uid and tbBlockIdx) in pBlockIter->blockList, scanning from the
 * iterator's current position in scan order. The entry is expected to exist; when it does not,
 * the function asserts and returns -1.
 */
static int32_t findFileBlockInfoIndex(SDataBlockIter* pBlockIter, SFileDataBlockInfo* pFBlockInfo) {
  ASSERT(pBlockIter != NULL && pFBlockInfo != NULL);

  int32_t step = ASCENDING_TRAVERSE(pBlockIter->order) ? 1 : -1;
  for (int32_t idx = pBlockIter->index; idx >= 0 && idx < pBlockIter->numOfBlocks; idx += step) {
    SFileDataBlockInfo* pCandidate = taosArrayGet(pBlockIter->blockList, idx);
    bool                sameTable = (pCandidate->uid == pFBlockInfo->uid);
    if (sameTable && pCandidate->tbBlockIdx == pFBlockInfo->tbBlockIdx) {
      return idx;
    }
  }

  ASSERT(0);
  return -1;
}

1444
/**
 * Make the entry at position index of pBlockIter->blockList the next block in scan order.
 *
 * The iterator advances by step. If the entry is not already at the new position, it is removed from
 * index and re-inserted at the new position. The descriptor of the new current block is then loaded.
 *
 * @return TSDB_CODE_SUCCESS, or -1 when index is outside [0, numOfBlocks).
 */
static int32_t setFileBlockActiveInBlockIter(SDataBlockIter* pBlockIter, int32_t index, int32_t step) {
  bool outOfRange = (index < 0) || (index >= pBlockIter->numOfBlocks);
  if (outOfRange) {
    return -1;
  }

  // copied by value because the array slot is about to be removed
  SFileDataBlockInfo target = *(SFileDataBlockInfo*)taosArrayGet(pBlockIter->blockList, index);

  int32_t newPos = pBlockIter->index + step;
  pBlockIter->index = newPos;

  if (index != newPos) {
    taosArrayRemove(pBlockIter->blockList, index);
    taosArrayInsert(pBlockIter->blockList, newPos, &target);

    SFileDataBlockInfo* pMoved = taosArrayGet(pBlockIter->blockList, newPos);
    ASSERT(pMoved->uid == target.uid && pMoved->tbBlockIdx == target.tbBlockIdx);
  }

  doSetCurrentBlock(pBlockIter, "");
  return TSDB_CODE_SUCCESS;
}

H
Haojun Liao 已提交
1464
// todo: this attribute could be acquired during extractin the global ordered block list.
1465
static bool overlapWithNeighborBlock(SDataBlk* pBlock, SBlockIndex* pNeighborBlockIndex, int32_t order) {
1466 1467
  // it is the last block in current file, no chance to overlap with neighbor blocks.
  if (ASCENDING_TRAVERSE(order)) {
1468
    return pBlock->maxKey.ts == pNeighborBlockIndex->window.skey;
1469
  } else {
1470
    return pBlock->minKey.ts == pNeighborBlockIndex->window.ekey;
1471
  }
H
Haojun Liao 已提交
1472
}
H
Hongze Cheng 已提交
1473

H
Hongze Cheng 已提交
1474
/**
 * Check whether key (presumably the next key of the in-memory buffer — confirm at call sites) must be
 * consumed no later than pBlock in scan order.
 *
 * The key must be valid (not TSKEY_INITIAL_VAL). It must also be <= the block's minKey for ascending
 * scans, or >= the block's maxKey for descending scans.
 */
static bool bufferDataInFileBlockGap(int32_t order, TSDBKEY key, SDataBlk* pBlock) {
  if (key.ts == TSKEY_INITIAL_VAL) {
    return false;
  }

  if (ASCENDING_TRAVERSE(order)) {
    return key.ts <= pBlock->minKey.ts;
  }
  return key.ts >= pBlock->maxKey.ts;
}
H
Hongze Cheng 已提交
1480

H
Hongze Cheng 已提交
1481
/**
 * Check whether key.ts lies inside [minKey.ts, maxKey.ts] of pBlock and the block's version range
 * [minVer, maxVer] intersects pVerRange.
 */
static bool keyOverlapFileBlock(TSDBKEY key, SDataBlk* pBlock, SVersionRange* pVerRange) {
  bool tsInsideBlock = (pBlock->minKey.ts <= key.ts) && (key.ts <= pBlock->maxKey.ts);
  bool versionsIntersect = (pVerRange->minVer <= pBlock->maxVer) && (pBlock->minVer <= pVerRange->maxVer);
  return tsInsideBlock && versionsIntersect;
}

H
Hongze Cheng 已提交
1486 1487
/*
 * Walk the table's delete skyline starting at startIndex and report whether
 * any delete segment with a version >= pBlock->minVer touches the block's
 * [minKey.ts, maxKey.ts] range.
 *
 * A point inside the range overlaps directly. A point before minKey.ts
 * overlaps when the following skyline point reaches minKey.ts. The first
 * point beyond maxKey.ts ends the scan with "no overlap".
 */
static bool doCheckforDatablockOverlap(STableBlockScanInfo* pBlockScanInfo, const SDataBlk* pBlock,
                                       int32_t startIndex) {
  SArray* pSkyline = pBlockScanInfo->delSkyline;
  size_t  total = taosArrayGetSize(pSkyline);

  for (int32_t idx = startIndex; idx < total; ++idx) {
    TSDBKEY* pKey = taosArrayGet(pSkyline, idx);

    if (pKey->ts > pBlock->maxKey.ts) {
      // past the end of the block: nothing further is examined
      return false;
    }

    // delete records older than every row in the block cannot affect it
    if (pKey->version < pBlock->minVer) {
      continue;
    }

    if (pKey->ts >= pBlock->minKey.ts) {
      return true;  // point lies inside [minKey.ts, maxKey.ts]
    }

    // pKey->ts < minKey.ts: the segment starting here overlaps if the next point reaches the block
    if (idx < total - 1) {
      TSDBKEY* pNext = taosArrayGet(pSkyline, idx + 1);
      if (pNext->ts >= pBlock->minKey.ts) {
        return true;
      }
    } else {  // it must be the last point
      ASSERT(pKey->version == 0);
    }
  }

  return false;
}

H
Hongze Cheng 已提交
1515
/*
 * Report whether the table's delete skyline may remove rows from the given
 * file block.
 *
 * A NULL or empty skyline never overlaps. An empty array previously reached
 * taosArrayGet(…, 0) / taosArrayGetLast() and dereferenced their result.
 * After a quick reject on the skyline's overall ts span, the detailed check
 * is delegated to doCheckforDatablockOverlap():
 *   - ascending : it starts at fileDelIndex;
 *   - descending: it starts from fileDelIndex walked back to the first point
 *     whose ts is not greater than the block's minKey.ts (or index 0).
 */
static bool overlapWithDelSkyline(STableBlockScanInfo* pBlockScanInfo, const SDataBlk* pBlock, int32_t order) {
  SArray* pSkyline = pBlockScanInfo->delSkyline;
  if (pSkyline == NULL || taosArrayGetSize(pSkyline) == 0) {
    return false;
  }

  // ts is not overlap: the block lies entirely outside the skyline's ts span
  TSDBKEY* pFirst = taosArrayGet(pSkyline, 0);
  TSDBKEY* pLast = taosArrayGetLast(pSkyline);
  if (pBlock->minKey.ts > pLast->ts || pBlock->maxKey.ts < pFirst->ts) {
    return false;
  }

  // version is not overlap
  if (ASCENDING_TRAVERSE(order)) {
    return doCheckforDatablockOverlap(pBlockScanInfo, pBlock, pBlockScanInfo->fileDelIndex);
  }

  // find the first point that is not greater than the minKey.ts of the data block
  int32_t index = pBlockScanInfo->fileDelIndex;
  while (index > 0) {
    TSDBKEY* p = taosArrayGet(pSkyline, index);
    if (p->ts <= pBlock->minKey.ts) {
      break;
    }
    index -= 1;
  }

  return doCheckforDatablockOverlap(pBlockScanInfo, pBlock, index);
}

H
Haojun Liao 已提交
1545 1546 1547 1548 1549 1550 1551 1552 1553 1554 1555 1556 1557
/*
 * Reasons, collected by getBlockToLoadInfo(), why a file data block may have
 * to be loaded (and possibly merged) instead of being used as-is.
 */
typedef struct {
  bool overlapWithNeighborBlock;  // boundary ts equals that of the neighbor block of the same table
  bool hasDupTs;                  // true when nSubBlock != 1, otherwise the block's hasDup flag
  bool overlapWithDelInfo;        // block may be affected by the table's delete skyline
  bool overlapWithLastBlock;      // current key of the last (stt) block lies in the block's ts range
  bool overlapWithKeyInBuf;       // buffered key lies in the block's ts range and version range
  bool partiallyRequired;         // result of dataBlockPartiallyRequired() for the query window/version range
  bool moreThanCapcity;           // block has more rows than the reader's result capacity
} SDataBlockToLoadInfo;

/*
 * Fill pInfo with every condition that could force a file data block to be
 * loaded rather than returned as-is.
 *
 * pInfo is expected to be zero-initialized by the caller. overlapWithNeighborBlock
 * is written only when a neighbor block exists, and overlapWithLastBlock only
 * when the last (stt) block still has data.
 */
static void getBlockToLoadInfo(SDataBlockToLoadInfo* pInfo, SFileDataBlockInfo* pBlockInfo, SDataBlk* pBlock,
                               STableBlockScanInfo* pScanInfo, TSDBKEY keyInBuf, SLastBlockReader* pLastBlockReader,
                               STsdbReader* pReader) {
  int32_t     order = pReader->order;
  int32_t     neighborIndex = 0;
  SBlockIndex neighbor = {0};

  // boundary overlap with the neighbor block of the same table
  if (getNeighborBlockOfSameTable(pBlockInfo, pScanInfo, &neighborIndex, order, &neighbor)) {
    pInfo->overlapWithNeighborBlock = overlapWithNeighborBlock(pBlock, &neighbor, order);
  }

  // a block made of several sub-blocks is always treated as holding duplicated ts
  pInfo->hasDupTs = (pBlock->nSubBlock != 1) || pBlock->hasDup;
  pInfo->overlapWithDelInfo = overlapWithDelSkyline(pScanInfo, pBlock, order);

  if (hasDataInLastBlock(pLastBlockReader)) {
    int64_t tsLast = getCurrentKeyInLastBlock(pLastBlockReader);
    pInfo->overlapWithLastBlock = (tsLast >= pBlock->minKey.ts) && (tsLast <= pBlock->maxKey.ts);
  }

  pInfo->moreThanCapcity = pBlock->nRow > pReader->capacity;
  pInfo->partiallyRequired = dataBlockPartiallyRequired(&pReader->window, &pReader->verRange, pBlock);
  pInfo->overlapWithKeyInBuf = keyOverlapFileBlock(keyInBuf, pBlock, &pReader->verRange);
}
1581

H
Haojun Liao 已提交
1582 1583 1584 1585 1586 1587 1588 1589 1590 1591 1592 1593 1594 1595
// A file data block can be returned without loading its rows only when all of
// the following hold:
// 1. it is not partially required by the query window / version range
//    (see dataBlockPartiallyRequired());
// 2. it does not share a boundary timestamp with its neighbor block;
// 3. it does not overlap the current key in mem/imem nor in the last (stt) block;
// 4. the output buffer is large enough to hold all of its rows;
// 5. it does not overlap the delete skyline;
// 6. it does not contain duplicated timestamps.
// Returns true when at least one of these conditions is violated.
static bool fileBlockShouldLoad(STsdbReader* pReader, SFileDataBlockInfo* pBlockInfo, SDataBlk* pBlock,
                                STableBlockScanInfo* pScanInfo, TSDBKEY keyInBuf, SLastBlockReader* pLastBlockReader) {
  SDataBlockToLoadInfo info = {0};
  getBlockToLoadInfo(&info, pBlockInfo, pBlock, pScanInfo, keyInBuf, pLastBlockReader, pReader);

  bool needLoad = info.overlapWithNeighborBlock || info.hasDupTs || info.partiallyRequired ||
                  info.overlapWithKeyInBuf || info.moreThanCapcity || info.overlapWithDelInfo ||
                  info.overlapWithLastBlock;
  if (!needLoad) {
    return false;
  }

  // record why the block has to be loaded, for profiling
  tsdbDebug("%p uid:%" PRIu64
            " need to load the datablock, overlapwithneighborblock:%d, hasDup:%d, partiallyRequired:%d, "
            "overlapWithKey:%d, greaterThanBuf:%d, overlapWithDel:%d, overlapWithlastBlock:%d, %s",
            pReader, pBlockInfo->uid, info.overlapWithNeighborBlock, info.hasDupTs, info.partiallyRequired,
            info.overlapWithKeyInBuf, info.moreThanCapcity, info.overlapWithDelInfo, info.overlapWithLastBlock,
            pReader->idStr);
  return true;
}

H
Haojun Liao 已提交
1610 1611 1612 1613 1614 1615 1616 1617 1618
/*
 * Return true when the file block has none of these conditions: a boundary
 * overlap with its neighbor block, duplicated ts, an overlap with the
 * buffered key, the delete skyline, or the last (stt) block.
 * Unlike fileBlockShouldLoad(), partiallyRequired and moreThanCapcity are not
 * considered here.
 */
static bool isCleanFileDataBlock(STsdbReader* pReader, SFileDataBlockInfo* pBlockInfo, SDataBlk* pBlock,
                                 STableBlockScanInfo* pScanInfo, TSDBKEY keyInBuf, SLastBlockReader* pLastBlockReader) {
  SDataBlockToLoadInfo info = {0};
  getBlockToLoadInfo(&info, pBlockInfo, pBlock, pScanInfo, keyInBuf, pLastBlockReader, pReader);

  if (info.overlapWithNeighborBlock || info.hasDupTs) {
    return false;
  }

  return !info.overlapWithKeyInBuf && !info.overlapWithDelInfo && !info.overlapWithLastBlock;
}

1619
/*
 * Fill the reader's result block from the table's in-memory buffers
 * (mem/imem iterators) via buildDataBlockFromBufImpl(), up to endKey.
 *
 * Does nothing when neither iterator has a value. Afterwards the block's ts
 * window and uid are updated, the block is flagged as composed, and the
 * elapsed time (ms) is added to pReader->cost.buildmemBlock.
 * Returns the code of buildDataBlockFromBufImpl().
 */
static int32_t buildDataBlockFromBuf(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo, int64_t endKey) {
  bool hasBufData = pBlockScanInfo->iiter.hasVal || pBlockScanInfo->iter.hasVal;
  if (!hasBufData) {
    return TSDB_CODE_SUCCESS;
  }

  SSDataBlock* pResBlock = pReader->pResBlock;
  int64_t      startUs = taosGetTimestampUs();

  int32_t code = buildDataBlockFromBufImpl(pBlockScanInfo, endKey, pReader->capacity, pReader);

  blockDataUpdateTsWindow(pResBlock, 0);
  pResBlock->info.id.uid = pBlockScanInfo->uid;
  setComposedBlockFlag(pReader, true);

  double elapsedMs = (taosGetTimestampUs() - startUs) / 1000.0;
  tsdbDebug("%p build data block from cache completed, elapsed time:%.2f ms, numOfRows:%d, brange:%" PRId64
            " - %" PRId64 " %s",
            pReader, elapsedMs, pResBlock->info.rows, pResBlock->info.window.skey, pResBlock->info.window.ekey,
            pReader->idStr);

  pReader->cost.buildmemBlock += elapsedMs;
  return code;
}

1644 1645
/*
 * Fast path for file block rows. Applies when the row at pDumpInfo->rowIndex
 * is not the border row in scan direction and its direct successor has a
 * different timestamp than key. In that case the row is appended to the
 * result block, rowIndex is advanced by one step, and true is returned.
 * Otherwise false is returned and the caller has to merge.
 */
static bool tryCopyDistinctRowFromFileBlock(STsdbReader* pReader, SBlockData* pBlockData, int64_t key,
                                            SFileBlockDumpInfo* pDumpInfo) {
  int32_t step = 0;
  if (pReader->order == TSDB_ORDER_ASC) {
    if (pDumpInfo->rowIndex >= pDumpInfo->totalRows - 1) {
      return false;  // border point: last row of the block
    }
    step = 1;
  } else {
    if (pDumpInfo->rowIndex <= 0) {
      return false;  // border point: first row of the block
    }
    step = -1;
  }

  int64_t nextKey = pBlockData->aTSKEY[pDumpInfo->rowIndex + step];
  if (nextKey == key) {
    return false;  // duplicated timestamp, merge is needed
  }

  doAppendRowFromFileBlock(pReader->pResBlock, pReader, pBlockData, pDumpInfo->rowIndex);
  pDumpInfo->rowIndex += step;
  return true;
}

1664 1665
/*
 * Advance the last-block merge tree to the next row that hasBeenDropped()
 * does not report as deleted (delete skyline + pVerRange). Returns false
 * once the merge tree is exhausted.
 */
static bool nextRowFromLastBlocks(SLastBlockReader* pLastBlockReader, STableBlockScanInfo* pBlockScanInfo,
                                  SVersionRange* pVerRange) {
  while (tMergeTreeNext(&pLastBlockReader->mergeTree)) {
    TSDBROW row = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
    TSDBKEY rowKey = TSDBROW_KEY(&row);

    bool dropped = hasBeenDropped(pBlockScanInfo->delSkyline, &pBlockScanInfo->lastBlockDelIndex, &rowKey,
                                  pLastBlockReader->order, pVerRange);
    if (!dropped) {
      return true;
    }
  }

  return false;
}

/*
 * fRow is the current row of the last (stt) block and ts is its timestamp.
 * Move the stt merge tree forward. If it is exhausted, or the next row has a
 * different timestamp, fRow is distinct: it is appended to the result block
 * and true is returned. If the next row shares ts, false is returned and the
 * caller must merge. Note: the merge tree has been advanced in either case.
 */
static bool tryCopyDistinctRowFromSttBlock(TSDBROW* fRow, SLastBlockReader* pLastBlockReader,
                                           STableBlockScanInfo* pScanInfo, int64_t ts, STsdbReader* pReader) {
  bool hasNext = nextRowFromLastBlocks(pLastBlockReader, pScanInfo, &pReader->verRange);
  if (hasNext && getCurrentKeyInLastBlock(pLastBlockReader) == ts) {
    return false;
  }

  doAppendRowFromFileBlock(pReader->pResBlock, pReader, fRow->pBlockData, fRow->iRow);
  return true;
}

H
Haojun Liao 已提交
1698 1699 1700
/*
 * Return the schema matching sversion for table uid.
 *
 * pReader->pSchema caches the newest schema (fetched lazily with version -1)
 * and is returned when its version matches. Any other version is cached in
 * pReader->pMemSchema, which holds a single schema and is replaced when a
 * different version is requested.
 *
 * On failure NULL is returned and terrno is set to the fetch code. A
 * successful fetch that still yields no schema also returns NULL; the first
 * fetch path previously returned such a NULL silently.
 */
static FORCE_INLINE STSchema* doGetSchemaForTSRow(int32_t sversion, STsdbReader* pReader, uint64_t uid) {
  // always set the newest schema version in pReader->pSchema
  if (pReader->pSchema == NULL) {
    pReader->pSchema = metaGetTbTSchema(pReader->pTsdb->pVnode->pMeta, uid, -1, 1);
  }

  if (pReader->pSchema != NULL && sversion == pReader->pSchema->version) {
    return pReader->pSchema;
  }

  // reuse the cached non-newest schema if it is the requested version
  if (pReader->pMemSchema != NULL && pReader->pMemSchema->version == sversion) {
    return pReader->pMemSchema;
  }

  // replace the cached schema with the requested version
  taosMemoryFreeClear(pReader->pMemSchema);
  int32_t code = metaGetTbTSchemaEx(pReader->pTsdb->pVnode->pMeta, pReader->suid, uid, sversion, &pReader->pMemSchema);
  if (code != TSDB_CODE_SUCCESS || pReader->pMemSchema == NULL) {
    terrno = code;
    return NULL;
  }

  return pReader->pMemSchema;
}

1733
static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo, TSDBROW* pRow,
1734 1735 1736 1737 1738 1739
                                     SIterInfo* pIter, int64_t key, SLastBlockReader* pLastBlockReader) {
  SRowMerger          merge = {0};
  STSRow*             pTSRow = NULL;
  SBlockData*         pBlockData = &pReader->status.fileBlockData;
  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;

1740
  int64_t tsLast = INT64_MIN;
1741
  if (hasDataInLastBlock(pLastBlockReader)) {
1742 1743
    tsLast = getCurrentKeyInLastBlock(pLastBlockReader);
  }
1744

H
Hongze Cheng 已提交
1745 1746
  TSDBKEY k = TSDBROW_KEY(pRow);
  TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
1747

1748 1749
  int64_t minKey = 0;
  if (pReader->order == TSDB_ORDER_ASC) {
H
Hongze Cheng 已提交
1750
    minKey = INT64_MAX;  // chosen the minimum value
1751
    if (minKey > tsLast && hasDataInLastBlock(pLastBlockReader)) {
1752 1753
      minKey = tsLast;
    }
1754

1755 1756 1757
    if (minKey > k.ts) {
      minKey = k.ts;
    }
1758

1759
    if (minKey > key && hasDataInFileBlock(pBlockData, pDumpInfo)) {
1760 1761 1762 1763
      minKey = key;
    }
  } else {
    minKey = INT64_MIN;
1764
    if (minKey < tsLast && hasDataInLastBlock(pLastBlockReader)) {
1765 1766 1767 1768 1769 1770 1771
      minKey = tsLast;
    }

    if (minKey < k.ts) {
      minKey = k.ts;
    }

1772
    if (minKey < key && hasDataInFileBlock(pBlockData, pDumpInfo)) {
1773 1774
      minKey = key;
    }
1775 1776 1777 1778
  }

  bool init = false;

1779
  // ASC: file block ---> last block -----> imem -----> mem
H
Hongze Cheng 已提交
1780
  // DESC: mem -----> imem -----> last block -----> file block
1781 1782
  if (pReader->order == TSDB_ORDER_ASC) {
    if (minKey == key) {
1783
      init = true;
H
Haojun Liao 已提交
1784 1785 1786 1787
      int32_t code = tRowMergerInit(&merge, &fRow, pReader->pSchema);
      if (code != TSDB_CODE_SUCCESS) {
        return code;
      }
1788
      doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
1789 1790
    }

1791
    if (minKey == tsLast) {
1792
      TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
H
Haojun Liao 已提交
1793 1794 1795
      if (init) {
        tRowMerge(&merge, &fRow1);
      } else {
1796
        init = true;
H
Haojun Liao 已提交
1797 1798 1799 1800
        int32_t code = tRowMergerInit(&merge, &fRow1, pReader->pSchema);
        if (code != TSDB_CODE_SUCCESS) {
          return code;
        }
1801
      }
1802
      doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, &merge, &pReader->verRange);
1803
    }
1804

1805
    if (minKey == k.ts) {
H
Haojun Liao 已提交
1806 1807 1808
      if (init) {
        tRowMerge(&merge, pRow);
      } else {
1809 1810
        init = true;
        STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid);
H
Haojun Liao 已提交
1811 1812 1813 1814 1815 1816 1817 1818
        int32_t   code = tRowMergerInit(&merge, pRow, pSchema);
        if (code != TSDB_CODE_SUCCESS) {
          return code;
        }
      }
      int32_t code = doMergeRowsInBuf(pIter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge, pReader);
      if (code != TSDB_CODE_SUCCESS) {
        return code;
1819 1820 1821 1822 1823
      }
    }
  } else {
    if (minKey == k.ts) {
      init = true;
1824
      STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid);
H
Haojun Liao 已提交
1825 1826 1827 1828 1829 1830
      int32_t   code = tRowMergerInit(&merge, pRow, pSchema);
      if (code != TSDB_CODE_SUCCESS) {
        return code;
      }

      code = doMergeRowsInBuf(pIter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge, pReader);
1831
      if (code != TSDB_CODE_SUCCESS || merge.pTSchema == NULL) {
H
Haojun Liao 已提交
1832 1833
        return code;
      }
1834 1835
    }

1836
    if (minKey == tsLast) {
1837
      TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
H
Haojun Liao 已提交
1838 1839 1840
      if (init) {
        tRowMerge(&merge, &fRow1);
      } else {
1841
        init = true;
H
Haojun Liao 已提交
1842 1843 1844 1845
        int32_t code = tRowMergerInit(&merge, &fRow1, pReader->pSchema);
        if (code != TSDB_CODE_SUCCESS) {
          return code;
        }
1846
      }
1847
      doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, &merge, &pReader->verRange);
1848 1849 1850
    }

    if (minKey == key) {
H
Haojun Liao 已提交
1851 1852 1853
      if (init) {
        tRowMerge(&merge, &fRow);
      } else {
1854
        init = true;
H
Haojun Liao 已提交
1855 1856 1857 1858
        int32_t code = tRowMergerInit(&merge, &fRow, pReader->pSchema);
        if (code != TSDB_CODE_SUCCESS) {
          return code;
        }
1859 1860 1861
      }
      doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
    }
1862 1863
  }

1864 1865 1866 1867 1868
  int32_t code = tRowMergerGetRow(&merge, &pTSRow);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

1869
  doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo);
1870 1871 1872 1873 1874 1875

  taosMemoryFree(pTSRow);
  tRowMergerClear(&merge);
  return TSDB_CODE_SUCCESS;
}

1876 1877 1878
/*
 * Emit the next row taken from the last (stt) block.
 *
 * When the file block is not to be merged (mergeBlockData == false), or its
 * current row has a different ts, the stt row is copied directly if it is
 * distinct. Otherwise all stt rows with that ts are merged. When the file
 * block's current row has the same ts, those file rows are merged in as well.
 *
 * Returns TSDB_CODE_SUCCESS or the first error. The row merger and the merged
 * row are released on every exit path; they previously leaked when
 * tRowMergerGetRow() failed.
 */
static int32_t doMergeFileBlockAndLastBlock(SLastBlockReader* pLastBlockReader, STsdbReader* pReader,
                                            STableBlockScanInfo* pBlockScanInfo, SBlockData* pBlockData,
                                            bool mergeBlockData) {
  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
  int64_t             tsLastBlock = getCurrentKeyInLastBlock(pLastBlockReader);

  STSRow*    pTSRow = NULL;
  SRowMerger merge = {0};
  int32_t    code = TSDB_CODE_SUCCESS;
  TSDBROW    fRow = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
  tsdbTrace("fRow ptr:%p, %d, uid:%" PRIu64 ", %s", fRow.pBlockData, fRow.iRow, pLastBlockReader->uid, pReader->idStr);

  // short-circuit keeps pBlockData (possibly NULL) untouched when mergeBlockData is false
  bool sameTsInFileBlock = mergeBlockData && (tsLastBlock == pBlockData->aTSKEY[pDumpInfo->rowIndex]);

  if (!sameTsInFileBlock) {
    // only last block exists for this ts
    if (tryCopyDistinctRowFromSttBlock(&fRow, pLastBlockReader, pBlockScanInfo, tsLastBlock, pReader)) {
      pBlockScanInfo->lastKey = tsLastBlock;
      return TSDB_CODE_SUCCESS;
    }

    // the merge tree now points at another stt row with the same ts: merge them all
    code = tRowMergerInit(&merge, &fRow, pReader->pSchema);
    if (code != TSDB_CODE_SUCCESS) {
      goto _end;
    }

    TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
    tRowMerge(&merge, &fRow1);
    doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLastBlock, &merge, &pReader->verRange);
  } else {
    code = tRowMergerInit(&merge, &fRow, pReader->pSchema);
    if (code != TSDB_CODE_SUCCESS) {
      goto _end;
    }

    doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLastBlock, &merge, &pReader->verRange);

    // merge with block data if ts == key (re-checked defensively after the stt merge)
    if (tsLastBlock == pBlockData->aTSKEY[pDumpInfo->rowIndex]) {
      doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
    }
  }

  code = tRowMergerGetRow(&merge, &pTSRow);
  if (code != TSDB_CODE_SUCCESS) {
    goto _end;
  }

  doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo);

_end:
  taosMemoryFree(pTSRow);
  tRowMergerClear(&merge);
  return code;
}

1940 1941
/*
 * Produce the next row when only file-level sources (data block and/or last
 * block) are involved.
 *   - no file block data          : delegate to the last-block-only path;
 *   - no last block data          : merge rows of the file block only;
 *   - descending scan             : delegate to doMergeFileBlockAndLastBlock();
 *   - ascending, key < last ts    : merge rows of the file block only;
 *   - ascending, key == last ts   : merge file block rows, then stt rows, and append.
 *
 * Returns TSDB_CODE_SUCCESS or the first error. In the key == ts path the row
 * merger is now released even when tRowMergerGetRow() fails.
 */
static int32_t mergeFileBlockAndLastBlock(STsdbReader* pReader, SLastBlockReader* pLastBlockReader, int64_t key,
                                          STableBlockScanInfo* pBlockScanInfo, SBlockData* pBlockData) {
  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;

  if (!hasDataInFileBlock(pBlockData, pDumpInfo)) {
    // only last block exists
    return doMergeFileBlockAndLastBlock(pLastBlockReader, pReader, pBlockScanInfo, NULL, false);
  }

  // no last block available, only data block exists
  if (!hasDataInLastBlock(pLastBlockReader)) {
    return mergeRowsInFileBlocks(pBlockData, pBlockScanInfo, key, pReader);
  }

  // row in last file block
  TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
  int64_t ts = getCurrentKeyInLastBlock(pLastBlockReader);
  ASSERT(ts >= key);

  if (!ASCENDING_TRAVERSE(pReader->order)) {
    return doMergeFileBlockAndLastBlock(pLastBlockReader, pReader, pBlockScanInfo, pBlockData, true);
  }

  if (key < ts) {  // imem, mem are all empty, file blocks (data blocks and last block) exist
    return mergeRowsInFileBlocks(pBlockData, pBlockScanInfo, key, pReader);
  }

  if (key != ts) {
    ASSERT(0);
    return TSDB_CODE_SUCCESS;
  }

  // key == ts: merge the file block rows with the stt rows of the same timestamp
  STSRow*    pTSRow = NULL;
  SRowMerger merge = {0};

  int32_t code = tRowMergerInit(&merge, &fRow, pReader->pSchema);
  if (code == TSDB_CODE_SUCCESS) {
    doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);

    TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
    tRowMerge(&merge, &fRow1);
    doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, ts, &merge, &pReader->verRange);

    code = tRowMergerGetRow(&merge, &pTSRow);
    if (code == TSDB_CODE_SUCCESS) {
      doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo);
    }
  }

  taosMemoryFree(pTSRow);
  tRowMergerClear(&merge);
  return code;
}

1996 1997
/*
 * Merge the rows that share the first timestamp (in scan order) among all four
 * sources of one table: the file data block, the last (stt) block, imem and mem.
 * The merged row is appended to the result block.
 *
 * Merge order: ASC : file block -> last block -> imem -> mem
 *              DESC: mem -> imem -> last block -> file block
 * Both mem and imem are expected to hold a valid row (asserted).
 *
 * Returns TSDB_CODE_SUCCESS or the first error.
 *   - If the schema of a buffered row cannot be obtained, terrno is returned.
 *     Previously one path returned TSDB_CODE_SUCCESS and silently dropped the
 *     row, and the other paths passed a NULL schema on.
 *   - The row merger and the merged row are released on every exit path.
 */
static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo, SBlockData* pBlockData,
                                     SLastBlockReader* pLastBlockReader) {
  SRowMerger          merge = {0};
  STSRow*             pTSRow = NULL;
  int32_t             code = TSDB_CODE_SUCCESS;
  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
  SArray*             pDelList = pBlockScanInfo->delSkyline;

  TSDBROW* pRow = getValidMemRow(&pBlockScanInfo->iter, pDelList, pReader);
  TSDBROW* piRow = getValidMemRow(&pBlockScanInfo->iiter, pDelList, pReader);
  ASSERT(pRow != NULL && piRow != NULL);

  int64_t tsLast = INT64_MIN;
  if (hasDataInLastBlock(pLastBlockReader)) {
    tsLast = getCurrentKeyInLastBlock(pLastBlockReader);
  }

  int64_t key = hasDataInFileBlock(pBlockData, pDumpInfo) ? pBlockData->aTSKEY[pDumpInfo->rowIndex] : INT64_MIN;

  TSDBKEY k = TSDBROW_KEY(pRow);
  TSDBKEY ik = TSDBROW_KEY(piRow);

  int64_t minKey = 0;
  if (ASCENDING_TRAVERSE(pReader->order)) {
    minKey = INT64_MAX;  // let's find the minimum
    if (minKey > k.ts) {
      minKey = k.ts;
    }
    if (minKey > ik.ts) {
      minKey = ik.ts;
    }
    if (minKey > key && hasDataInFileBlock(pBlockData, pDumpInfo)) {
      minKey = key;
    }
    if (minKey > tsLast && hasDataInLastBlock(pLastBlockReader)) {
      minKey = tsLast;
    }
  } else {
    minKey = INT64_MIN;  // let's find the maximum ts value
    if (minKey < k.ts) {
      minKey = k.ts;
    }
    if (minKey < ik.ts) {
      minKey = ik.ts;
    }
    if (minKey < key && hasDataInFileBlock(pBlockData, pDumpInfo)) {
      minKey = key;
    }
    if (minKey < tsLast && hasDataInLastBlock(pLastBlockReader)) {
      minKey = tsLast;
    }
  }

  bool init = false;

  // ASC: file block -----> last block -----> imem -----> mem
  // DESC: mem -----> imem -----> last block -----> file block
  if (ASCENDING_TRAVERSE(pReader->order)) {
    if (minKey == key) {
      init = true;
      TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
      code = tRowMergerInit(&merge, &fRow, pReader->pSchema);
      if (code != TSDB_CODE_SUCCESS) {
        goto _end;
      }
      doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
    }

    if (minKey == tsLast) {
      TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
      if (init) {
        tRowMerge(&merge, &fRow1);
      } else {
        init = true;
        code = tRowMergerInit(&merge, &fRow1, pReader->pSchema);
        if (code != TSDB_CODE_SUCCESS) {
          goto _end;
        }
      }
      doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, &merge, &pReader->verRange);
    }

    if (minKey == ik.ts) {
      if (init) {
        tRowMerge(&merge, piRow);
      } else {
        init = true;
        STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(piRow), pReader, pBlockScanInfo->uid);
        if (pSchema == NULL) {
          code = terrno;
          goto _end;
        }
        code = tRowMergerInit(&merge, piRow, pSchema);
        if (code != TSDB_CODE_SUCCESS) {
          goto _end;
        }
      }

      code = doMergeRowsInBuf(&pBlockScanInfo->iiter, pBlockScanInfo->uid, ik.ts, pBlockScanInfo->delSkyline, &merge,
                              pReader);
      if (code != TSDB_CODE_SUCCESS) {
        goto _end;
      }
    }

    if (minKey == k.ts) {
      if (init) {
        if (merge.pTSchema == NULL) {
          goto _end;
        }
        tRowMerge(&merge, pRow);
      } else {
        STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid);
        if (pSchema == NULL) {
          code = terrno;
          goto _end;
        }
        code = tRowMergerInit(&merge, pRow, pSchema);
        if (code != TSDB_CODE_SUCCESS) {
          goto _end;
        }
      }

      code = doMergeRowsInBuf(&pBlockScanInfo->iter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge,
                              pReader);
      if (code != TSDB_CODE_SUCCESS) {
        goto _end;
      }
    }
  } else {
    if (minKey == k.ts) {
      init = true;
      STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid);
      if (pSchema == NULL) {
        code = terrno;
        goto _end;
      }
      code = tRowMergerInit(&merge, pRow, pSchema);
      if (code != TSDB_CODE_SUCCESS) {
        goto _end;
      }

      code = doMergeRowsInBuf(&pBlockScanInfo->iter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge,
                              pReader);
      if (code != TSDB_CODE_SUCCESS) {
        goto _end;
      }
    }

    if (minKey == ik.ts) {
      if (init) {
        tRowMerge(&merge, piRow);
      } else {
        init = true;
        STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(piRow), pReader, pBlockScanInfo->uid);
        if (pSchema == NULL) {
          code = terrno;
          goto _end;
        }
        code = tRowMergerInit(&merge, piRow, pSchema);
        if (code != TSDB_CODE_SUCCESS) {
          goto _end;
        }
      }

      code = doMergeRowsInBuf(&pBlockScanInfo->iiter, pBlockScanInfo->uid, ik.ts, pBlockScanInfo->delSkyline, &merge,
                              pReader);
      if (code != TSDB_CODE_SUCCESS) {
        goto _end;
      }
    }

    if (minKey == tsLast) {
      TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
      if (init) {
        tRowMerge(&merge, &fRow1);
      } else {
        init = true;
        code = tRowMergerInit(&merge, &fRow1, pReader->pSchema);
        if (code != TSDB_CODE_SUCCESS) {
          goto _end;
        }
      }
      doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, &merge, &pReader->verRange);
    }

    if (minKey == key) {
      TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
      if (!init) {
        code = tRowMergerInit(&merge, &fRow, pReader->pSchema);
        if (code != TSDB_CODE_SUCCESS) {
          goto _end;
        }
      } else {
        if (merge.pTSchema == NULL) {
          goto _end;
        }
        tRowMerge(&merge, &fRow);
      }
      doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
    }
  }

  // a merger without schema yields no row: return the current code without appending
  if (merge.pTSchema == NULL) {
    goto _end;
  }

  code = tRowMergerGetRow(&merge, &pTSRow);
  if (code != TSDB_CODE_SUCCESS) {
    goto _end;
  }

  doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo);

_end:
  taosMemoryFree(pTSRow);
  tRowMergerClear(&merge);
  return code;
}

2210 2211 2212 2213 2214 2215 2216 2217 2218 2219 2220 2221 2222 2223 2224 2225 2226 2227 2228 2229
/*
 * Create (once per table, guarded by iterInit) the skip-list iterators over
 * the table's data in mem and imem, then initialize the delete skyline
 * iterator.
 *
 * Iterators start at the query window boundary for the scan order:
 *   - ASC : (window.skey, verRange.minVer), iterating forward;
 *   - DESC: (window.ekey, verRange.maxVer), iterating backward.
 * Returns the error code if an iterator cannot be created.
 *
 * Fixes:
 *   - the mem/imem failure messages were swapped;
 *   - the uint64_t uid is now printed with PRIu64.
 */
static int32_t initMemDataIterator(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader) {
  if (pBlockScanInfo->iterInit) {
    return TSDB_CODE_SUCCESS;
  }

  int32_t code = TSDB_CODE_SUCCESS;

  TSDBKEY startKey = {0};
  if (ASCENDING_TRAVERSE(pReader->order)) {
    startKey = (TSDBKEY){.ts = pReader->window.skey, .version = pReader->verRange.minVer};
  } else {
    startKey = (TSDBKEY){.ts = pReader->window.ekey, .version = pReader->verRange.maxVer};
  }

  int32_t backward = (!ASCENDING_TRAVERSE(pReader->order));

  STbData* d = NULL;
  if (pReader->pReadSnap->pMem != NULL) {
    d = tsdbGetTbDataFromMemTable(pReader->pReadSnap->pMem, pReader->suid, pBlockScanInfo->uid);
    if (d != NULL) {
      ASSERT(pBlockScanInfo->iter.iter == NULL);
      code = tsdbTbDataIterCreate(d, &startKey, backward, &pBlockScanInfo->iter.iter);
      if (code == TSDB_CODE_SUCCESS) {
        pBlockScanInfo->iter.hasVal = (tsdbTbDataIterGet(pBlockScanInfo->iter.iter) != NULL);

        tsdbDebug("%p uid:%" PRIu64 ", check data in mem from skey:%" PRId64 ", order:%d, ts range in buf:%" PRId64
                  "-%" PRId64 " %s",
                  pReader, pBlockScanInfo->uid, startKey.ts, pReader->order, d->minKey, d->maxKey, pReader->idStr);
      } else {
        tsdbError("%p uid:%" PRIu64 ", failed to create iterator for mem, code:%s, %s", pReader, pBlockScanInfo->uid,
                  tstrerror(code), pReader->idStr);
        return code;
      }
    }
  } else {
    tsdbDebug("%p uid:%" PRIu64 ", no data in mem, %s", pReader, pBlockScanInfo->uid, pReader->idStr);
  }

  STbData* di = NULL;
  if (pReader->pReadSnap->pIMem != NULL) {
    di = tsdbGetTbDataFromMemTable(pReader->pReadSnap->pIMem, pReader->suid, pBlockScanInfo->uid);
    if (di != NULL) {
      code = tsdbTbDataIterCreate(di, &startKey, backward, &pBlockScanInfo->iiter.iter);
      if (code == TSDB_CODE_SUCCESS) {
        pBlockScanInfo->iiter.hasVal = (tsdbTbDataIterGet(pBlockScanInfo->iiter.iter) != NULL);

        tsdbDebug("%p uid:%" PRIu64 ", check data in imem from skey:%" PRId64 ", order:%d, ts range in buf:%" PRId64
                  "-%" PRId64 " %s",
                  pReader, pBlockScanInfo->uid, startKey.ts, pReader->order, di->minKey, di->maxKey, pReader->idStr);
      } else {
        tsdbError("%p uid:%" PRIu64 ", failed to create iterator for imem, code:%s, %s", pReader, pBlockScanInfo->uid,
                  tstrerror(code), pReader->idStr);
        return code;
      }
    }
  } else {
    tsdbDebug("%p uid:%" PRIu64 ", no data in imem, %s", pReader, pBlockScanInfo->uid, pReader->idStr);
  }

  initDelSkylineIterator(pBlockScanInfo, pReader, d, di);

  pBlockScanInfo->iterInit = true;
  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
2275 2276
static bool isValidFileBlockRow(SBlockData* pBlockData, SFileBlockDumpInfo* pDumpInfo,
                                STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader) {
  /*
   * Decide whether the row at pDumpInfo->rowIndex of the loaded file block may be emitted for the
   * table described by pBlockScanInfo. The row is rejected when:
   *   - the block carries rows of several tables (aUid != NULL) and this row belongs to another uid,
   *   - its version lies outside pReader->verRange,
   *   - its timestamp lies outside pReader->window,
   *   - hasBeenDropped() reports it as covered by the table's delete skyline.
   */
  bool foreignRow =
      (pBlockData->aUid != NULL) && (pBlockData->aUid[pDumpInfo->rowIndex] != pBlockScanInfo->uid);
  if (foreignRow) {
    return false;
  }

  int64_t ver = pBlockData->aVersion[pDumpInfo->rowIndex];
  bool    verInRange = (ver >= pReader->verRange.minVer) && (ver <= pReader->verRange.maxVer);
  if (!verInRange) {
    return false;
  }

  int64_t ts = pBlockData->aTSKEY[pDumpInfo->rowIndex];
  bool    tsInWindow = (ts >= pReader->window.skey) && (ts <= pReader->window.ekey);
  if (!tsInWindow) {
    return false;
  }

  // the delete check also advances pBlockScanInfo->fileDelIndex
  TSDBKEY rowKey = {.ts = ts, .version = ver};
  return !hasBeenDropped(pBlockScanInfo->delSkyline, &pBlockScanInfo->fileDelIndex, &rowKey, pReader->order,
                         &pReader->verRange);
}

2305
static bool initLastBlockReader(SLastBlockReader* pLBlockReader, STableBlockScanInfo* pScanInfo, STsdbReader* pReader) {
  /*
   * Prepare the last-block (stt) merge tree for the table pScanInfo->uid and report whether it has a row.
   * If the reader is already bound to this table, only the current state is reported.
   */
  if (pLBlockReader->uid == pScanInfo->uid) {
    return hasDataInLastBlock(pLBlockReader);
  }

  // a non-zero uid means a merge tree opened for a previous table is still around
  if (pLBlockReader->uid != 0) {
    tMergeTreeClose(&pLBlockReader->mergeTree);
  }

  initMemDataIterator(pScanInfo, pReader);
  pLBlockReader->uid = pScanInfo->uid;

  // resume right after the last key already produced for this table, in the scan direction
  STimeWindow w = pLBlockReader->window;
  if (ASCENDING_TRAVERSE(pLBlockReader->order)) {
    w.skey = pScanInfo->lastKey + 1;
  } else {
    w.ekey = pScanInfo->lastKey - 1;
  }

  bool    backward = (pLBlockReader->order == TSDB_ORDER_DESC);
  int32_t code = tMergeTreeOpen(&pLBlockReader->mergeTree, backward, pReader->pFileReader, pReader->suid,
                                pScanInfo->uid, &w, &pLBlockReader->verRange, pLBlockReader->pInfo, false,
                                pReader->idStr);
  if (code != TSDB_CODE_SUCCESS) {
    return false;
  }

  return nextRowFromLastBlocks(pLBlockReader, pScanInfo, &pReader->verRange);
}

2336
static int64_t getCurrentKeyInLastBlock(SLastBlockReader* pLastBlockReader) {
  // timestamp of the row the last-block merge tree currently points at
  TSDBROW current = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
  int64_t ts = TSDBROW_TS(&current);
  return ts;
}

H
Hongze Cheng 已提交
2341
static bool hasDataInLastBlock(SLastBlockReader* pLastBlockReader) {
  // a NULL merge-tree iterator is treated as "no last-block rows available"
  bool noIter = (pLastBlockReader->mergeTree.pIter == NULL);
  return !noIter;
}
2342 2343 2344 2345

bool hasDataInFileBlock(const SBlockData* pBlockData, const SFileBlockDumpInfo* pDumpInfo) {
  if (pBlockData->nRow > 0) {
    ASSERT(pBlockData->nRow == pDumpInfo->totalRows);
2346 2347
  }

2348
  return pBlockData->nRow > 0 && (!pDumpInfo->allDumped);
2349
}
2350

2351 2352
/*
 * Emit the file-block row at the current dump position (with timestamp `key`) into the result block.
 *
 * If tryCopyDistinctRowFromFileBlock() handles the row by itself, only lastKey is updated. Otherwise the
 * row and the following rows merged by doMergeRowsInFileBlocks() are combined through an SRowMerger and
 * the merged row is appended.
 *
 * Returns TSDB_CODE_SUCCESS or the error code of the row merger.
 */
int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pBlockScanInfo, int64_t key,
                              STsdbReader* pReader) {
  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;

  if (tryCopyDistinctRowFromFileBlock(pReader, pBlockData, key, pDumpInfo)) {
    pBlockScanInfo->lastKey = key;
    return TSDB_CODE_SUCCESS;
  }

  TSDBROW    fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
  STSRow*    pTSRow = NULL;
  SRowMerger merge = {0};

  int32_t code = tRowMergerInit(&merge, &fRow, pReader->pSchema);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);

  code = tRowMergerGetRow(&merge, &pTSRow);
  if (code != TSDB_CODE_SUCCESS) {
    // the merger was initialized successfully, so it must be released on this exit path too
    tRowMergerClear(&merge);
    return code;
  }

  doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo);

  taosMemoryFree(pTSRow);
  tRowMergerClear(&merge);
  return TSDB_CODE_SUCCESS;
}

H
Haojun Liao 已提交
2382 2383
static int32_t buildComposedDataBlockImpl(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo,
                                          SBlockData* pBlockData, SLastBlockReader* pLastBlockReader) {
  /*
   * Produce the next row(s) for one table by merging whichever sources currently hold data:
   * mem, imem, the loaded file block and the last (stt) block.
   */
  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
  SIterInfo*          pMem = &pBlockScanInfo->iter;
  SIterInfo*          pIMem = &pBlockScanInfo->iiter;

  // key of the current file-block row; INT64_MIN when no unconsumed file row exists
  int64_t key = INT64_MIN;
  if (pBlockData->nRow > 0 && !pDumpInfo->allDumped) {
    key = pBlockData->aTSKEY[pDumpInfo->rowIndex];
  }

  // mem and imem both have rows: merge all levels together
  if (pMem->hasVal && pIMem->hasVal) {
    return doMergeMultiLevelRows(pReader, pBlockScanInfo, pBlockData, pLastBlockReader);
  }

  // fetch candidate rows in the same order as before: mem first, then imem
  TSDBROW* pMemRow = pMem->hasVal ? getValidMemRow(pMem, pBlockScanInfo->delSkyline, pReader) : NULL;
  TSDBROW* pIMemRow = pIMem->hasVal ? getValidMemRow(pIMem, pBlockScanInfo->delSkyline, pReader) : NULL;

  // imem + file + last block
  if (pIMem->hasVal) {
    return doMergeBufAndFileRows(pReader, pBlockScanInfo, pIMemRow, pIMem, key, pLastBlockReader);
  }

  // mem + file + last block
  if (pMem->hasVal) {
    return doMergeBufAndFileRows(pReader, pBlockScanInfo, pMemRow, pMem, key, pLastBlockReader);
  }

  // file data block + last block only
  return mergeFileBlockAndLastBlock(pReader, pLastBlockReader, key, pBlockScanInfo, pBlockData);
}

2414
/*
 * Fill pReader->pResBlock with rows of one table built by merging the current file block,
 * the last (stt) block and in-memory data.
 *
 * A clean file block that fits into the result block is copied as a whole. It is not copied that way in a
 * descending scan that still has last-block rows. Otherwise rows are produced one by one until the file
 * block and last block are both exhausted, the loaded file block is consumed, or the result block
 * reaches pReader->capacity.
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_INVALID_PARA when the block's uid is not in the table map, or the
 * error code returned by buildComposedDataBlockImpl().
 */
static int32_t buildComposedDataBlock(STsdbReader* pReader) {
  int32_t code = TSDB_CODE_SUCCESS;

  SSDataBlock*        pResBlock = pReader->pResBlock;
  SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(&pReader->status.blockIter);
  SLastBlockReader*   pLastBlockReader = pReader->status.fileIter.pLastBlockReader;

  bool    asc = ASCENDING_TRAVERSE(pReader->order);
  int64_t st = taosGetTimestampUs();
  int32_t step = asc ? 1 : -1;

  STableBlockScanInfo* pBlockScanInfo = NULL;
  if (pBlockInfo != NULL) {
    void* p = taosHashGet(pReader->status.pTableMap, &pBlockInfo->uid, sizeof(pBlockInfo->uid));
    if (p == NULL) {
      code = TSDB_CODE_INVALID_PARA;
      tsdbError("failed to locate the uid:%" PRIu64 " in query table uid list, total tables:%d, %s", pBlockInfo->uid,
                taosHashGetSize(pReader->status.pTableMap), pReader->idStr);
      goto _end;
    }

    pBlockScanInfo = *(STableBlockScanInfo**)p;

    SDataBlk* pBlock = getCurrentBlock(&pReader->status.blockIter);
    TSDBKEY   keyInBuf = getCurrentKeyInBuf(pBlockScanInfo, pReader);

    // it is a clean block, load it directly
    if (isCleanFileDataBlock(pReader, pBlockInfo, pBlock, pBlockScanInfo, keyInBuf, pLastBlockReader) &&
        pBlock->nRow <= pReader->capacity) {
      // a descending scan must first return the (later) rows pending in the last block
      if (asc || !hasDataInLastBlock(pLastBlockReader)) {
        copyBlockDataToSDataBlock(pReader, pBlockScanInfo);

        // record the last key value
        pBlockScanInfo->lastKey = asc ? pBlock->maxKey.ts : pBlock->minKey.ts;
        goto _end;
      }
    }
  } else {  // file blocks not exist
    pBlockScanInfo = *pReader->status.pTableIter;
  }

  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
  SBlockData*         pBlockData = &pReader->status.fileBlockData;

  while (1) {
    bool hasBlockData = false;

    // find the first qualified row in data block
    while (pBlockData->nRow > 0) {
      if (isValidFileBlockRow(pBlockData, pDumpInfo, pBlockScanInfo, pReader)) {
        hasBlockData = true;
        break;
      }

      pDumpInfo->rowIndex += step;

      SDataBlk* pBlock = getCurrentBlock(&pReader->status.blockIter);
      if (pDumpInfo->rowIndex >= pBlock->nRow || pDumpInfo->rowIndex < 0) {
        setBlockAllDumped(pDumpInfo, pBlock->maxKey.ts, pReader->order);
        break;
      }
    }

    bool hasBlockLData = hasDataInLastBlock(pLastBlockReader);

    // no data in last block and block, no need to proceed.
    if ((hasBlockData == false) && (hasBlockLData == false)) {
      break;
    }

    // propagate merge failures instead of continuing with a partially built block
    code = buildComposedDataBlockImpl(pReader, pBlockScanInfo, pBlockData, pLastBlockReader);
    if (code != TSDB_CODE_SUCCESS) {
      break;
    }

    // currently loaded file data block is consumed
    if ((pBlockData->nRow > 0) && (pDumpInfo->rowIndex >= pBlockData->nRow || pDumpInfo->rowIndex < 0)) {
      SDataBlk* pBlock = getCurrentBlock(&pReader->status.blockIter);
      setBlockAllDumped(pDumpInfo, pBlock->maxKey.ts, pReader->order);
      break;
    }

    if (pResBlock->info.rows >= pReader->capacity) {
      break;
    }
  }

_end:
  pResBlock->info.id.uid = (pBlockScanInfo != NULL) ? pBlockScanInfo->uid : 0;
  blockDataUpdateTsWindow(pResBlock, 0);

  setComposedBlockFlag(pReader, true);
  double el = (taosGetTimestampUs() - st) / 1000.0;

  pReader->cost.composedBlocks += 1;
  pReader->cost.buildComposedBlockTime += el;

  if (pResBlock->info.rows > 0) {
    tsdbDebug("%p uid:%" PRIu64 ", composed data block created, brange:%" PRIu64 "-%" PRIu64
              " rows:%d, elapsed time:%.2f ms %s",
              pReader, pResBlock->info.id.uid, pResBlock->info.window.skey, pResBlock->info.window.ekey,
              pResBlock->info.rows, el, pReader->idStr);
  }

  return code;
}

void setComposedBlockFlag(STsdbReader* pReader, bool composed) {
  // store the flag on the reader status; `composed` is passed through unchanged
  SReaderStatus* pStatus = &pReader->status;
  pStatus->composedDataBlock = composed;
}

dengyihao's avatar
dengyihao 已提交
2521 2522
int32_t initDelSkylineIterator(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader, STbData* pMemTbData,
                               STbData* piMemTbData) {
2523 2524 2525
  if (pBlockScanInfo->delSkyline != NULL) {
    return TSDB_CODE_SUCCESS;
  }
2526

2527 2528 2529
  int32_t code = 0;
  STsdb*  pTsdb = pReader->pTsdb;

2530 2531
  SArray* pDelData = taosArrayInit(4, sizeof(SDelData));

H
Hongze Cheng 已提交
2532
  SDelFile* pDelFile = pReader->pReadSnap->fs.pDelFile;
2533 2534
  if (pDelFile) {
    SDelFReader* pDelFReader = NULL;
H
more  
Hongze Cheng 已提交
2535
    code = tsdbDelFReaderOpen(&pDelFReader, pDelFile, pTsdb);
2536
    if (code != TSDB_CODE_SUCCESS) {
2537 2538 2539 2540 2541
      goto _err;
    }

    SArray* aDelIdx = taosArrayInit(4, sizeof(SDelIdx));
    if (aDelIdx == NULL) {
2542
      tsdbDelFReaderClose(&pDelFReader);
2543 2544 2545
      goto _err;
    }

H
Haojun Liao 已提交
2546
    // TODO: opt the perf of read del index
H
Hongze Cheng 已提交
2547
    code = tsdbReadDelIdx(pDelFReader, aDelIdx);
2548 2549 2550
    if (code != TSDB_CODE_SUCCESS) {
      taosArrayDestroy(aDelIdx);
      tsdbDelFReaderClose(&pDelFReader);
2551 2552
      goto _err;
    }
2553

2554 2555 2556
    SDelIdx  idx = {.suid = pReader->suid, .uid = pBlockScanInfo->uid};
    SDelIdx* pIdx = taosArraySearch(aDelIdx, &idx, tCmprDelIdx, TD_EQ);

H
Haojun Liao 已提交
2557
    if (pIdx != NULL) {
H
Hongze Cheng 已提交
2558
      code = tsdbReadDelData(pDelFReader, pIdx, pDelData);
2559 2560 2561 2562 2563 2564 2565
    }

    taosArrayDestroy(aDelIdx);
    tsdbDelFReaderClose(&pDelFReader);

    if (code != TSDB_CODE_SUCCESS) {
      goto _err;
2566
    }
2567
  }
2568

2569 2570 2571 2572 2573 2574 2575
  SDelData* p = NULL;
  if (pMemTbData != NULL) {
    p = pMemTbData->pHead;
    while (p) {
      taosArrayPush(pDelData, p);
      p = p->pNext;
    }
2576 2577
  }

2578 2579 2580 2581 2582 2583 2584 2585 2586 2587 2588 2589 2590 2591
  if (piMemTbData != NULL) {
    p = piMemTbData->pHead;
    while (p) {
      taosArrayPush(pDelData, p);
      p = p->pNext;
    }
  }

  if (taosArrayGetSize(pDelData) > 0) {
    pBlockScanInfo->delSkyline = taosArrayInit(4, sizeof(TSDBKEY));
    code = tsdbBuildDeleteSkyline(pDelData, 0, (int32_t)(taosArrayGetSize(pDelData) - 1), pBlockScanInfo->delSkyline);
  }

  taosArrayDestroy(pDelData);
dengyihao's avatar
dengyihao 已提交
2592
  pBlockScanInfo->iter.index =
H
Haojun Liao 已提交
2593
      ASCENDING_TRAVERSE(pReader->order) ? 0 : taosArrayGetSize(pBlockScanInfo->delSkyline) - 1;
2594 2595
  pBlockScanInfo->iiter.index = pBlockScanInfo->iter.index;
  pBlockScanInfo->fileDelIndex = pBlockScanInfo->iter.index;
2596
  pBlockScanInfo->lastBlockDelIndex = pBlockScanInfo->iter.index;
2597 2598
  return code;

H
Haojun Liao 已提交
2599
_err:
2600 2601
  taosArrayDestroy(pDelData);
  return code;
2602 2603
}

H
Haojun Liao 已提交
2604
/*
 * Return the key of the next valid in-memory row of the table, looking at both mem (iter) and imem (iiter).
 *
 * - Only one buffer has a valid row: that buffer's key is returned. The old code lost the imem key when mem
 *   was empty, because it only replaced the key when key.ts > k.ts.
 * - Both buffers have a valid row: the one reached first in the scan order is returned. That is the
 *   smaller ts for an ascending scan and the larger ts for a descending scan. On a tie the mem key wins.
 * - Neither buffer has a valid row: a key with ts == TSKEY_INITIAL_VAL is returned.
 */
TSDBKEY getCurrentKeyInBuf(STableBlockScanInfo* pScanInfo, STsdbReader* pReader) {
  bool    asc = ASCENDING_TRAVERSE(pReader->order);
  TSDBKEY key = {.ts = TSKEY_INITIAL_VAL};
  TSDBKEY ikey = {.ts = TSKEY_INITIAL_VAL};
  bool    hasKey = false;
  bool    hasIKey = false;

  TSDBROW* pRow = getValidMemRow(&pScanInfo->iter, pScanInfo->delSkyline, pReader);
  if (pRow != NULL) {
    hasKey = true;
    key = TSDBROW_KEY(pRow);
  }

  pRow = getValidMemRow(&pScanInfo->iiter, pScanInfo->delSkyline, pReader);
  if (pRow != NULL) {
    hasIKey = true;
    ikey = TSDBROW_KEY(pRow);
  }

  if (!hasKey) {
    // only imem has data, or neither has (ikey then still holds TSKEY_INITIAL_VAL)
    return ikey;
  }

  if (!hasIKey) {
    return key;
  }

  if (asc) {
    return (key.ts <= ikey.ts) ? key : ikey;
  }
  return (key.ts >= ikey.ts) ? key : ikey;
}

2622
/*
 * Advance the file-set iterator until a file set with at least one data block or last (stt) file is found.
 *
 * On return, *pBlockNum holds the counts reported by doLoadFileBlock() for that file set. Both counts are
 * 0 when no more file sets exist.
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_OUT_OF_MEMORY, or an error from doLoadBlockIndex()/doLoadFileBlock().
 */
static int32_t moveToNextFile(STsdbReader* pReader, SBlockNumber* pBlockNum) {
  SReaderStatus* pStatus = &pReader->status;
  pBlockNum->numOfBlocks = 0;
  pBlockNum->numOfLastFiles = 0;

  size_t  numOfTables = taosHashGetSize(pReader->status.pTableMap);
  SArray* pIndexList = taosArrayInit(numOfTables, sizeof(SBlockIdx));
  if (pIndexList == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  while (1) {
    bool hasNext = filesetIteratorNext(&pStatus->fileIter, pReader);
    if (!hasNext) {  // no data files on disk
      break;
    }

    // the index list is reused across file sets
    taosArrayClear(pIndexList);
    int32_t code = doLoadBlockIndex(pReader, pReader->pFileReader, pIndexList);
    if (code != TSDB_CODE_SUCCESS) {
      taosArrayDestroy(pIndexList);
      return code;
    }

    if (taosArrayGetSize(pIndexList) > 0 || pReader->pFileReader->pSet->nSttF > 0) {
      code = doLoadFileBlock(pReader, pIndexList, pBlockNum);
      if (code != TSDB_CODE_SUCCESS) {
        taosArrayDestroy(pIndexList);
        return code;
      }

      if (pBlockNum->numOfBlocks + pBlockNum->numOfLastFiles > 0) {
        break;
      }
    }

    // no blocks in current file, try next files
  }

  taosArrayDestroy(pIndexList);
  return TSDB_CODE_SUCCESS;
}

2662
/* Three-way comparison of two uint64_t uids (ascending), suitable as a sort comparator. */
static int32_t uidComparFunc(const void* p1, const void* p2) {
  const uint64_t lhs = *(const uint64_t*)p1;
  const uint64_t rhs = *(const uint64_t*)p2;

  // compare explicitly instead of subtracting: unsigned differences do not fit in int32_t
  if (lhs < rhs) {
    return -1;
  }
  return (lhs > rhs) ? 1 : 0;
}
2671

2672
static void extractOrderedTableUidList(SUidOrderCheckInfo* pOrderCheckInfo, SReaderStatus* pStatus) {
  /*
   * Copy the uid of every table in pStatus->pTableMap into pOrderCheckInfo->tableUidList and sort it
   * ascending with uidComparFunc.
   * NOTE(review): the caller must have sized tableUidList for taosHashGetSize(pTableMap) entries.
   */
  int32_t total = taosHashGetSize(pStatus->pTableMap);
  int32_t slot = 0;

  for (void* pEntry = taosHashIterate(pStatus->pTableMap, NULL); pEntry != NULL;
       pEntry = taosHashIterate(pStatus->pTableMap, pEntry)) {
    STableBlockScanInfo* pScanInfo = *(STableBlockScanInfo**)pEntry;
    pOrderCheckInfo->tableUidList[slot] = pScanInfo->uid;
    slot += 1;
  }

  taosSort(pOrderCheckInfo->tableUidList, total, sizeof(uint64_t), uidComparFunc);
}

2686
static int32_t initOrderCheckInfo(SUidOrderCheckInfo* pOrderCheckInfo, SReaderStatus* pStatus) {
  /*
   * Make sure pStatus->pTableIter points at a table, visiting tables in ascending uid order.
   *
   * - First call (no uid list yet): allocate the list, fill and sort it, then position on the first uid.
   * - pTableIter already set: nothing to do.
   * - pTableIter cleared (e.g. the last block of a new file): restart from the first uid. If that uid is
   *   no longer in the map, the map changed, so the list is reallocated, rebuilt and positioned again.
   *
   * Returns TSDB_CODE_SUCCESS or TSDB_CODE_OUT_OF_MEMORY.
   */
  int32_t total = taosHashGetSize(pStatus->pTableMap);
  if (total == 0) {
    return TSDB_CODE_SUCCESS;
  }

  if (pOrderCheckInfo->tableUidList != NULL && pStatus->pTableIter != NULL) {
    return TSDB_CODE_SUCCESS;
  }

  bool needRebuild = (pOrderCheckInfo->tableUidList == NULL);
  pOrderCheckInfo->currentIndex = 0;

  if (!needRebuild) {
    uint64_t firstUid = pOrderCheckInfo->tableUidList[pOrderCheckInfo->currentIndex];
    pStatus->pTableIter = taosHashGet(pStatus->pTableMap, &firstUid, sizeof(firstUid));

    // the tableMap has already updated
    needRebuild = (pStatus->pTableIter == NULL);
  }

  if (needRebuild) {
    void* pList = (pOrderCheckInfo->tableUidList == NULL)
                      ? taosMemoryMalloc(total * sizeof(uint64_t))
                      : taosMemoryRealloc(pOrderCheckInfo->tableUidList, total * sizeof(uint64_t));
    if (pList == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }

    pOrderCheckInfo->tableUidList = pList;
    extractOrderedTableUidList(pOrderCheckInfo, pStatus);

    uint64_t firstUid = pOrderCheckInfo->tableUidList[0];
    pStatus->pTableIter = taosHashGet(pStatus->pTableMap, &firstUid, sizeof(firstUid));
  }

  return TSDB_CODE_SUCCESS;
}

2727
static bool moveToNextTable(SUidOrderCheckInfo* pOrderedCheckInfo, SReaderStatus* pStatus) {
  /*
   * Step to the next uid of the sorted uid list and point pStatus->pTableIter at its entry.
   * Once every table has been visited, pTableIter is cleared and false is returned.
   */
  pOrderedCheckInfo->currentIndex += 1;

  bool exhausted = (pOrderedCheckInfo->currentIndex >= taosHashGetSize(pStatus->pTableMap));
  if (exhausted) {
    pStatus->pTableIter = NULL;
    return false;
  }

  uint64_t nextUid = pOrderedCheckInfo->tableUidList[pOrderedCheckInfo->currentIndex];
  pStatus->pTableIter = taosHashGet(pStatus->pTableMap, &nextUid, sizeof(nextUid));
  ASSERT(pStatus->pTableIter != NULL);
  return true;
}

2740
static int32_t doLoadLastBlockSequentially(STsdbReader* pReader) {
  /*
   * Build a result block from the last (stt) blocks of the current file set. Tables are visited in
   * ascending uid order, and the walk stops at the first table that yields rows.
   * Returns TSDB_CODE_SUCCESS (possibly with an empty result block) or an error code.
   */
  SReaderStatus*      pStatus = &pReader->status;
  SLastBlockReader*   pLastBlockReader = pStatus->fileIter.pLastBlockReader;
  SUidOrderCheckInfo* pOrderedCheckInfo = &pStatus->uidCheckInfo;

  int32_t code = initOrderCheckInfo(pOrderedCheckInfo, pStatus);
  if (code != TSDB_CODE_SUCCESS || (taosHashGetSize(pStatus->pTableMap) == 0)) {
    return code;
  }

  do {
    STableBlockScanInfo* pScanInfo = *(STableBlockScanInfo**)pStatus->pTableIter;

    // only build when the last block has rows for this table
    if (initLastBlockReader(pLastBlockReader, pScanInfo, pReader)) {
      code = doBuildDataBlock(pReader);
      if (code != TSDB_CODE_SUCCESS) {
        return code;
      }

      if (pReader->pResBlock->info.rows > 0) {
        return TSDB_CODE_SUCCESS;
      }
    }

    // current table is exhausted, let's try next table
  } while (moveToNextTable(pOrderedCheckInfo, pStatus));

  return TSDB_CODE_SUCCESS;
}

2779
/*
 * Build the next result block for the table owning the current file block. When no file block exists,
 * the table under pStatus->pTableIter is used (last-block-only file sets).
 *
 * Depending on the state of the block, the buffers and the last block, this:
 *   - composes rows from the last block and buffers (no file block),
 *   - loads the file block and composes rows (fileBlockShouldLoad),
 *   - emits buffered rows that fall before the block (bufferDataInFileBlockGap),
 *   - composes last-block rows first in a descending scan, or
 *   - returns the whole file block untouched.
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_INVALID_PARA when the table's scan info cannot be found, or an
 * error from the loaders/builders.
 */
static int32_t doBuildDataBlock(STsdbReader* pReader) {
  int32_t   code = TSDB_CODE_SUCCESS;
  SDataBlk* pBlock = NULL;

  SReaderStatus*       pStatus = &pReader->status;
  SDataBlockIter*      pBlockIter = &pStatus->blockIter;
  STableBlockScanInfo* pScanInfo = NULL;
  SFileDataBlockInfo*  pBlockInfo = getCurrentBlockInfo(pBlockIter);
  SLastBlockReader*    pLastBlockReader = pReader->status.fileIter.pLastBlockReader;

  // look the slot up first and dereference it only when it exists; a missing uid must be reported as
  // TSDB_CODE_INVALID_PARA instead of crashing on a NULL dereference
  STableBlockScanInfo** ppScanInfo = NULL;
  if (pBlockInfo != NULL) {
    ppScanInfo = taosHashGet(pReader->status.pTableMap, &pBlockInfo->uid, sizeof(pBlockInfo->uid));
  } else {
    ppScanInfo = pReader->status.pTableIter;
  }

  if (ppScanInfo != NULL) {
    pScanInfo = *ppScanInfo;
  }

  if (pScanInfo == NULL) {
    tsdbError("failed to get table scan-info, %s", pReader->idStr);
    code = TSDB_CODE_INVALID_PARA;
    return code;
  }

  if (pBlockInfo != NULL) {
    pBlock = getCurrentBlock(pBlockIter);
  }

  initLastBlockReader(pLastBlockReader, pScanInfo, pReader);
  TSDBKEY keyInBuf = getCurrentKeyInBuf(pScanInfo, pReader);

  if (pBlockInfo == NULL) {  // build data block from last data file
    ASSERT(pBlockIter->numOfBlocks == 0);
    code = buildComposedDataBlock(pReader);
  } else if (fileBlockShouldLoad(pReader, pBlockInfo, pBlock, pScanInfo, keyInBuf, pLastBlockReader)) {
    code = doLoadFileBlockData(pReader, pBlockIter, &pStatus->fileBlockData, pScanInfo->uid);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    // build composed data block
    code = buildComposedDataBlock(pReader);
  } else if (bufferDataInFileBlockGap(pReader->order, keyInBuf, pBlock)) {
    // data in memory that are earlier than current file block
    // rows in buffer should be less than the file block in asc, greater than file block in desc
    int64_t endKey = (ASCENDING_TRAVERSE(pReader->order)) ? pBlock->minKey.ts : pBlock->maxKey.ts;
    code = buildDataBlockFromBuf(pReader, pScanInfo, endKey);
  } else {
    if (hasDataInLastBlock(pLastBlockReader) && !ASCENDING_TRAVERSE(pReader->order)) {
      // only return the rows in last block
      int64_t tsLast = getCurrentKeyInLastBlock(pLastBlockReader);
      ASSERT(tsLast >= pBlock->maxKey.ts);
      tBlockDataReset(&pReader->status.fileBlockData);

      tsdbDebug("load data in last block firstly, due to desc scan data, %s", pReader->idStr);
      code = buildComposedDataBlock(pReader);
    } else {  // whole block is required, return it directly
      SDataBlockInfo* pInfo = &pReader->pResBlock->info;
      pInfo->rows = pBlock->nRow;
      pInfo->id.uid = pScanInfo->uid;
      pInfo->window = (STimeWindow){.skey = pBlock->minKey.ts, .ekey = pBlock->maxKey.ts};
      setComposedBlockFlag(pReader, false);
      setBlockAllDumped(&pStatus->fBlockDumpInfo, pBlock->maxKey.ts, pReader->order);

      // update the last key for the corresponding table
      pScanInfo->lastKey = ASCENDING_TRAVERSE(pReader->order) ? pInfo->window.ekey : pInfo->window.skey;
    }
  }

  return code;
}

H
Haojun Liao 已提交
2850
static int32_t buildBlockFromBufferSequentially(STsdbReader* pReader) {
  /*
   * Build a result block from in-memory data only. Tables are walked in taosHashIterate() order, resuming
   * at pStatus->pTableIter when set, and the walk stops at the first table that yields rows.
   * Returns TSDB_CODE_SUCCESS (possibly with an empty result block) or an error code.
   */
  SReaderStatus* pStatus = &pReader->status;

  if (pStatus->pTableIter == NULL) {
    pStatus->pTableIter = taosHashIterate(pStatus->pTableMap, NULL);
  }

  for (; pStatus->pTableIter != NULL; pStatus->pTableIter = taosHashIterate(pStatus->pTableMap, pStatus->pTableIter)) {
    STableBlockScanInfo** ppScanInfo = pStatus->pTableIter;
    initMemDataIterator(*ppScanInfo, pReader);

    // no upper/lower bound: consume everything buffered for this table in scan order
    int64_t endKey = (ASCENDING_TRAVERSE(pReader->order)) ? INT64_MAX : INT64_MIN;
    int32_t code = buildDataBlockFromBuf(pReader, *ppScanInfo, endKey);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    if (pReader->pResBlock->info.rows > 0) {
      return TSDB_CODE_SUCCESS;
    }
  }

  return TSDB_CODE_SUCCESS;
}

2882
// reset the dump cursor of the current file block: first row for ascending scans, last row for descending scans
2883
static void initBlockDumpInfo(STsdbReader* pReader, SDataBlockIter* pBlockIter) {
  // mark the current block as not yet dumped and place the cursor at its scan-order starting row
  SDataBlk*           pBlock = getCurrentBlock(pBlockIter);
  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;

  pDumpInfo->totalRows = pBlock->nRow;
  pDumpInfo->allDumped = false;

  if (ASCENDING_TRAVERSE(pReader->order)) {
    pDumpInfo->rowIndex = 0;
  } else {
    pDumpInfo->rowIndex = pBlock->nRow - 1;
  }
}

2895
/*
 * Move to the next file set that has data and prepare the block iterator and dump info for it.
 *
 * When every file set is consumed, pReader->status.loadFromFile is set to false. When the file set has no
 * data blocks (only last/stt files), the block data and iterator are reset instead.
 * Returns TSDB_CODE_SUCCESS or the error from moveToNextFile()/initBlockIterator().
 */
static int32_t initForFirstBlockInFile(STsdbReader* pReader, SDataBlockIter* pBlockIter) {
  SBlockNumber num = {0};

  int32_t code = moveToNextFile(pReader, &num);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  // all data files are consumed, try data in buffer
  if (num.numOfBlocks + num.numOfLastFiles == 0) {
    pReader->status.loadFromFile = false;
    return code;
  }

  // initialize the block iterator for a new fileset
  if (num.numOfBlocks > 0) {
    code = initBlockIterator(pReader, pBlockIter, num.numOfBlocks);
    if (code != TSDB_CODE_SUCCESS) {
      // the iterator is not usable, so do not position the dump info on its current block
      return code;
    }
  } else {  // no block data, only last block exists
    tBlockDataReset(&pReader->status.fileBlockData);
    resetDataBlockIterator(pBlockIter, pReader->order);
  }

  // set the correct start position according to the query time window
  initBlockDumpInfo(pReader, pBlockIter);
  return code;
}

2922
static bool fileBlockPartiallyRead(SFileBlockDumpInfo* pDumpInfo, bool asc) {
  // partially read: not fully dumped, and the cursor has moved away from the scan-order starting row
  if (pDumpInfo->allDumped) {
    return false;
  }

  if (asc) {
    return pDumpInfo->rowIndex > 0;
  }
  return pDumpInfo->rowIndex < (pDumpInfo->totalRows - 1);
}

2927
static int32_t buildBlockFromFiles(STsdbReader* pReader) {
  /*
   * Produce the next non-empty result block from data files, alternating between two phases:
   *   - last-block phase: read last (stt) blocks table by table; when they are exhausted, move to the
   *     next file set;
   *   - data-block phase: continue a partially read block, or advance the block iterator. When a file set
   *     runs out of blocks, switch to its last blocks (if it has stt files) or to the next file set.
   * Returns TSDB_CODE_SUCCESS with rows, TSDB_CODE_SUCCESS with loadFromFile == false when all files are
   * consumed, or an error code.
   */
  int32_t         code = TSDB_CODE_SUCCESS;
  bool            asc = ASCENDING_TRAVERSE(pReader->order);
  SDataBlockIter* pBlockIter = &pReader->status.blockIter;

  bool scanLastBlocks = (pBlockIter->numOfBlocks == 0);

  while (1) {
    if (scanLastBlocks) {
      scanLastBlocks = false;

      code = doLoadLastBlockSequentially(pReader);
      if (code != TSDB_CODE_SUCCESS) {
        return code;
      }

      if (pReader->pResBlock->info.rows > 0) {
        return TSDB_CODE_SUCCESS;
      }

      // all data blocks are checked in this last block file, now let's try the next file
      if (pReader->status.pTableIter == NULL) {
        code = initForFirstBlockInFile(pReader, pBlockIter);

        // error happens or all the data files are completely checked
        if ((code != TSDB_CODE_SUCCESS) || (pReader->status.loadFromFile == false)) {
          return code;
        }

        // this file does not have data files, let's start check the last block file if exists
        if (pBlockIter->numOfBlocks == 0) {
          scanLastBlocks = true;
          continue;
        }
      }

      code = doBuildDataBlock(pReader);
    } else {
      SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;

      if (fileBlockPartiallyRead(pDumpInfo, asc)) {  // file data block is partially loaded
        code = buildComposedDataBlock(pReader);
      } else {
        // current block are exhausted, try the next file block
        if (pDumpInfo->allDumped) {
          bool hasNext = blockIteratorNext(&pReader->status.blockIter, pReader->idStr);
          if (hasNext) {  // check for the next block in the block accessed order list
            initBlockDumpInfo(pReader, pBlockIter);
          } else if (pReader->status.pCurrentFileset->nSttF > 0) {
            // data blocks in current file are exhausted, let's try the last blocks of this file now
            tBlockDataReset(&pReader->status.fileBlockData);
            resetDataBlockIterator(pBlockIter, pReader->order);
            scanLastBlocks = true;
            continue;
          } else {
            code = initForFirstBlockInFile(pReader, pBlockIter);

            // error happens or all the data files are completely checked
            if ((code != TSDB_CODE_SUCCESS) || (pReader->status.loadFromFile == false)) {
              return code;
            }

            // this file does not have blocks, let's start check the last block file
            if (pBlockIter->numOfBlocks == 0) {
              scanLastBlocks = true;
              continue;
            }
          }
        }

        code = doBuildDataBlock(pReader);
      }
    }

    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    if (pReader->pResBlock->info.rows > 0) {
      return TSDB_CODE_SUCCESS;
    }
  }
}
H
refact  
Hongze Cheng 已提交
3015

3016 3017
static STsdb* getTsdbByRetentions(SVnode* pVnode, TSKEY winSKey, SRetention* retentions, const char* idStr,
                                  int8_t* pLevel) {
  /*
   * Pick the tsdb instance to query. Non-rsma vnodes always use VND_TSDB and leave *pLevel untouched.
   * For rsma vnodes, the first retention level whose window (now - keep) starts no later than
   * winSKey + tolerance is chosen. A level with keep <= 0 ends the search and steps back one level if
   * possible. The chosen level is written to *pLevel and logged.
   */
  if (!(VND_IS_RSMA(pVnode))) {
    return VND_TSDB(pVnode);
  }

  int8_t  precision = pVnode->config.tsdbCfg.precision;
  int64_t now = taosGetTimestamp(precision);

  // scale the query tolerance into the vnode's timestamp precision
  int64_t scale = 1000000L;
  if (precision == TSDB_TIME_PRECISION_MILLI) {
    scale = 1L;
  } else if (precision == TSDB_TIME_PRECISION_MICRO) {
    scale = 1000L;
  }
  int64_t offset = tsQueryRsmaTolerance * scale;

  int8_t level = 0;
  for (; level < TSDB_RETENTION_MAX; ++level) {
    SRetention* pRetention = retentions + level;
    if (pRetention->keep <= 0) {
      if (level > 0) {
        --level;
      }
      break;
    }

    if ((now - pRetention->keep) <= (winSKey + offset)) {
      break;
    }
  }

  const char* str = (idStr != NULL) ? idStr : "";

  int8_t selected = TSDB_RETENTION_L2;
  if (level == TSDB_RETENTION_L0) {
    selected = TSDB_RETENTION_L0;
  } else if (level == TSDB_RETENTION_L1) {
    selected = TSDB_RETENTION_L1;
  }

  *pLevel = selected;
  tsdbInfo("vgId:%d, rsma level %d is selected to query %s", TD_VID(pVnode), selected, str);

  if (selected == TSDB_RETENTION_L0) {
    return VND_RSMA0(pVnode);
  }
  if (selected == TSDB_RETENTION_L1) {
    return VND_RSMA1(pVnode);
  }
  return VND_RSMA2(pVnode);
}

H
Haojun Liao 已提交
3060
SVersionRange getQueryVerRange(SVnode* pVnode, SQueryTableDataCond* pCond, int8_t level) {
  /*
   * Resolve the version range of a query:
   *   minVer: pCond->startVersion, or 0 when it is -1;
   *   maxVer: pVnode->state.applied when pCond->endVersion is -1 (user not specified), otherwise
   *           pCond->endVersion capped at pVnode->state.applied.
   * `level` is not used.
   */
  int64_t startVer = 0;
  if (pCond->startVersion != -1) {
    startVer = pCond->startVersion;
  }

  int64_t endVer = pVnode->state.applied;
  if (pCond->endVersion != -1 && pCond->endVersion <= pVnode->state.applied) {
    endVer = pCond->endVersion;
  }

  return (SVersionRange){.minVer = startVer, .maxVer = endVer};
}

3074
/*
 * Check whether the row identified by pKey has been removed by a delete operation.
 *
 * pDelList is the table's delete skyline: TSDBKEY points ordered by timestamp. The code below pairs adjacent
 * points into a range and compares the version stored on the range's left point with the row version.
 * NOTE(review): that version is presumably the version of the delete covering the range; confirm against the
 * skyline builder.
 *
 * *index is a cursor into the skyline. It is advanced in the scan direction given by order, so successive calls
 * with monotonically moving keys continue from where the previous call stopped.
 *
 * A row counts as dropped only when the covering delete has a version >= the row's version and that delete is
 * visible to the query (its version does not exceed pVerRange->maxVer). This rule is applied on every branch.
 *
 * Returns true if the row has been deleted, false otherwise (including when there is no delete info).
 */
bool hasBeenDropped(const SArray* pDelList, int32_t* index, TSDBKEY* pKey, int32_t order, SVersionRange* pVerRange) {
  ASSERT(pKey != NULL);
  if (pDelList == NULL) {
    return false;
  }

  size_t num = taosArrayGetSize(pDelList);
  if (num == 0) {
    // nothing has been deleted; this also keeps "num - 1" below from wrapping around (num is unsigned)
    return false;
  }

  bool    asc = ASCENDING_TRAVERSE(order);
  int32_t step = asc ? 1 : -1;

  if (asc) {
    if (*index >= num - 1) {
      // the cursor has reached the last skyline point
      TSDBKEY* last = taosArrayGetLast(pDelList);
      ASSERT(pKey->ts >= last->ts);

      if (pKey->ts > last->ts) {
        return false;
      } else if (pKey->ts == last->ts) {
        TSDBKEY* prev = taosArrayGet(pDelList, num - 2);
        return (prev->version >= pKey->version) && (pVerRange->maxVer >= prev->version);
      }
    } else {
      TSDBKEY* pCurrent = taosArrayGet(pDelList, *index);
      TSDBKEY* pNext = taosArrayGet(pDelList, (*index) + 1);

      if (pKey->ts < pCurrent->ts) {
        return false;
      }

      if (pCurrent->ts <= pKey->ts && pNext->ts >= pKey->ts && pCurrent->version >= pKey->version &&
          pVerRange->maxVer >= pCurrent->version) {
        return true;
      }

      while (pNext->ts <= pKey->ts && (*index) < num - 1) {
        (*index) += 1;

        if ((*index) < num - 1) {
          pCurrent = taosArrayGet(pDelList, *index);
          pNext = taosArrayGet(pDelList, (*index) + 1);

          // it is not a consecutive deletion range, ignore it
          if (pCurrent->version == 0 && pNext->version > 0) {
            continue;
          }

          if (pCurrent->ts <= pKey->ts && pNext->ts >= pKey->ts && pCurrent->version >= pKey->version &&
              pVerRange->maxVer >= pCurrent->version) {
            return true;
          }
        }
      }

      return false;
    }
  } else {
    if (*index <= 0) {
      // the cursor has reached the first skyline point
      TSDBKEY* pFirst = taosArrayGet(pDelList, 0);

      if (pKey->ts < pFirst->ts) {
        return false;
      } else if (pKey->ts == pFirst->ts) {
        return (pFirst->version >= pKey->version) && (pVerRange->maxVer >= pFirst->version);
      } else {
        ASSERT(0);
      }
    } else {
      TSDBKEY* pCurrent = taosArrayGet(pDelList, *index);
      TSDBKEY* pPrev = taosArrayGet(pDelList, (*index) - 1);

      if (pKey->ts > pCurrent->ts) {
        return false;
      }

      if (pPrev->ts <= pKey->ts && pCurrent->ts >= pKey->ts && pPrev->version >= pKey->version &&
          pVerRange->maxVer >= pPrev->version) {
        return true;
      }

      while (pPrev->ts >= pKey->ts && (*index) > 1) {
        (*index) += step;

        if ((*index) >= 1) {
          pCurrent = taosArrayGet(pDelList, *index);
          pPrev = taosArrayGet(pDelList, (*index) - 1);

          // it is not a consecutive deletion range, ignore it
          if (pCurrent->version > 0 && pPrev->version == 0) {
            continue;
          }

          if (pPrev->ts <= pKey->ts && pCurrent->ts >= pKey->ts && pPrev->version >= pKey->version &&
              pVerRange->maxVer >= pPrev->version) {
            return true;
          }
        }
      }

      return false;
    }
  }

  return false;
}

3176
/*
 * Return the first row, at or after the current position of the mem/imem iterator, that is visible to this
 * reader.
 *
 * A row is visible when its version lies inside pReader->verRange and hasBeenDropped() reports that it was not
 * deleted. Invisible rows are skipped by advancing the iterator.
 *
 * Returns NULL when the iterator is exhausted or reaches a row outside the query time window. In both cases
 * pIter->hasVal is false on return.
 */
TSDBROW* getValidMemRow(SIterInfo* pIter, const SArray* pDelList, STsdbReader* pReader) {
  bool moveFirst = false;  // the row currently under the iterator is examined before advancing

  for (;;) {
    if (moveFirst) {
      pIter->hasVal = tsdbTbDataIterNext(pIter->iter);
    }
    moveFirst = true;

    if (!pIter->hasVal) {
      return NULL;
    }

    TSDBROW* pCandidate = tsdbTbDataIterGet(pIter->iter);
    TSDBKEY  candKey = TSDBROW_KEY(pCandidate);

    // the scan of this buffer ends at the first row outside the query time window
    if (outOfTimeWindow(candKey.ts, &pReader->window)) {
      pIter->hasVal = false;
      return NULL;
    }

    bool inVerRange = (candKey.version >= pReader->verRange.minVer) && (candKey.version <= pReader->verRange.maxVer);
    if (inVerRange && !hasBeenDropped(pDelList, &pIter->index, &candKey, pReader->order, &pReader->verRange)) {
      return pCandidate;
    }
  }
}
H
Hongze Cheng 已提交
3214

3215 3216
/*
 * Merge into pMerger every following visible row of the mem/imem iterator that carries timestamp ts.
 *
 * The iterator is first moved past the row the caller has already consumed. Invisible rows (version out of
 * range or deleted) are skipped by getValidMemRow(). Merging stops at the first visible row with another
 * timestamp, which is left under the iterator, or when the iterator is exhausted.
 *
 * Returns TSDB_CODE_SUCCESS, or terrno when the schema of a row to merge cannot be resolved.
 */
int32_t doMergeRowsInBuf(SIterInfo* pIter, uint64_t uid, int64_t ts, SArray* pDelList, SRowMerger* pMerger,
                         STsdbReader* pReader) {
  for (;;) {
    pIter->hasVal = tsdbTbDataIterNext(pIter->iter);
    if (!pIter->hasVal) {
      return TSDB_CODE_SUCCESS;
    }

    // data exists but may not be valid for this reader
    TSDBROW* pNext = getValidMemRow(pIter, pDelList, pReader);
    if (pNext == NULL) {
      return TSDB_CODE_SUCCESS;
    }

    TSDBKEY nextKey = TSDBROW_KEY(pNext);
    if (nextKey.ts != ts) {
      return TSDB_CODE_SUCCESS;  // a different timestamp: nothing more to merge
    }

    STSchema* pRowSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pNext), pReader, uid);
    if (pRowSchema == NULL) {
      return terrno;
    }

    tRowMergerAdd(pMerger, pNext, pRowSchema);
  }
}

3246
static int32_t doMergeRowsInFileBlockImpl(SBlockData* pBlockData, int32_t rowIndex, int64_t key, SRowMerger* pMerger,
3247
                                          SVersionRange* pVerRange, int32_t step) {
3248
  while (rowIndex < pBlockData->nRow && rowIndex >= 0 && pBlockData->aTSKEY[rowIndex] == key) {
3249
    if (pBlockData->aVersion[rowIndex] > pVerRange->maxVer || pBlockData->aVersion[rowIndex] < pVerRange->minVer) {
3250
      rowIndex += step;
3251 3252 3253 3254 3255 3256 3257 3258 3259 3260 3261 3262 3263 3264 3265 3266
      continue;
    }

    TSDBROW fRow = tsdbRowFromBlockData(pBlockData, rowIndex);
    tRowMerge(pMerger, &fRow);
    rowIndex += step;
  }

  return rowIndex;
}

/* Outcome reported by checkForNeighborFileBlock() to its caller. */
typedef enum {
  CHECK_FILEBLOCK_CONT = 1,  // the loaded neighbor block was consumed to its end: examine the next neighbor too
  CHECK_FILEBLOCK_QUIT = 2,  // stop examining neighbor blocks
} CHECK_FILEBLOCK_STATE;

H
Hongze Cheng 已提交
3267
static int32_t checkForNeighborFileBlock(STsdbReader* pReader, STableBlockScanInfo* pScanInfo, SDataBlk* pBlock,
3268 3269
                                         SFileDataBlockInfo* pFBlock, SRowMerger* pMerger, int64_t key,
                                         CHECK_FILEBLOCK_STATE* state) {
3270
  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
3271
  SBlockData*         pBlockData = &pReader->status.fileBlockData;
3272

3273
  *state = CHECK_FILEBLOCK_QUIT;
3274
  int32_t step = ASCENDING_TRAVERSE(pReader->order) ? 1 : -1;
3275

3276 3277 3278 3279
  int32_t     nextIndex = -1;
  SBlockIndex bIndex = {0};

  bool hasNeighbor = getNeighborBlockOfSameTable(pFBlock, pScanInfo, &nextIndex, pReader->order, &bIndex);
3280
  if (!hasNeighbor) {  // do nothing
3281 3282 3283
    return 0;
  }

3284
  bool overlap = overlapWithNeighborBlock(pBlock, &bIndex, pReader->order);
3285
  if (overlap) {  // load next block
3286
    SReaderStatus*  pStatus = &pReader->status;
3287 3288
    SDataBlockIter* pBlockIter = &pStatus->blockIter;

3289
    // 1. find the next neighbor block in the scan block list
3290
    SFileDataBlockInfo fb = {.uid = pFBlock->uid, .tbBlockIdx = nextIndex};
3291
    int32_t            neighborIndex = findFileBlockInfoIndex(pBlockIter, &fb);
3292

3293
    // 2. remove it from the scan block list
3294
    setFileBlockActiveInBlockIter(pBlockIter, neighborIndex, step);
3295

3296
    // 3. load the neighbor block, and set it to be the currently accessed file data block
3297
    int32_t code = doLoadFileBlockData(pReader, pBlockIter, &pStatus->fileBlockData, pFBlock->uid);
3298 3299 3300 3301
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

3302
    // 4. check the data values
3303 3304 3305 3306
    initBlockDumpInfo(pReader, pBlockIter);

    pDumpInfo->rowIndex =
        doMergeRowsInFileBlockImpl(pBlockData, pDumpInfo->rowIndex, key, pMerger, &pReader->verRange, step);
H
Haojun Liao 已提交
3307
    if (pDumpInfo->rowIndex >= pDumpInfo->totalRows) {
3308 3309 3310 3311 3312 3313 3314
      *state = CHECK_FILEBLOCK_CONT;
    }
  }

  return TSDB_CODE_SUCCESS;
}

3315 3316
/*
 * Merge into pMerger every row that shares the timestamp of the current row of the current file block
 * (pDumpInfo->rowIndex).
 *
 * Rows are first taken from the rest of the current block. If that block is exhausted in scan order, rows are
 * then taken from overlapping neighbor blocks of the same table.
 *
 * Returns TSDB_CODE_SUCCESS, or the error raised while loading a neighbor block.
 */
int32_t doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pScanInfo, STsdbReader* pReader,
                                SRowMerger* pMerger) {
  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;

  bool    asc = ASCENDING_TRAVERSE(pReader->order);
  int64_t key = pBlockData->aTSKEY[pDumpInfo->rowIndex];
  int32_t step = asc ? 1 : -1;

  // the current row is already in the merger; continue with the next one in scan order
  pDumpInfo->rowIndex += step;
  if ((pDumpInfo->rowIndex <= pBlockData->nRow - 1 && asc) || (pDumpInfo->rowIndex >= 0 && !asc)) {
    pDumpInfo->rowIndex =
        doMergeRowsInFileBlockImpl(pBlockData, pDumpInfo->rowIndex, key, pMerger, &pReader->verRange, step);
  }

  // all rows are consumed, let's try next file block
  if ((pDumpInfo->rowIndex >= pBlockData->nRow && asc) || (pDumpInfo->rowIndex < 0 && !asc)) {
    while (1) {
      CHECK_FILEBLOCK_STATE st;

      SFileDataBlockInfo* pFileBlockInfo = getCurrentBlockInfo(&pReader->status.blockIter);
      SDataBlk*           pCurrentBlock = getCurrentBlock(&pReader->status.blockIter);

      int32_t code = checkForNeighborFileBlock(pReader, pScanInfo, pCurrentBlock, pFileBlockInfo, pMerger, key, &st);
      if (code != TSDB_CODE_SUCCESS) {
        return code;  // e.g. the neighbor block failed to load; do not hide it behind a partial merge
      }

      if (st == CHECK_FILEBLOCK_QUIT) {
        break;
      }
    }
  }

  return TSDB_CODE_SUCCESS;
}

H
Hongze Cheng 已提交
3346
int32_t doMergeRowsInLastBlock(SLastBlockReader* pLastBlockReader, STableBlockScanInfo* pScanInfo, int64_t ts,
3347 3348
                               SRowMerger* pMerger, SVersionRange* pVerRange) {
  while (nextRowFromLastBlocks(pLastBlockReader, pScanInfo, pVerRange)) {
3349 3350
    int64_t next1 = getCurrentKeyInLastBlock(pLastBlockReader);
    if (next1 == ts) {
3351
      TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
3352 3353 3354 3355 3356 3357 3358 3359 3360
      tRowMerge(pMerger, &fRow1);
    } else {
      break;
    }
  }

  return TSDB_CODE_SUCCESS;
}

3361 3362
/*
 * Produce the output row for the mem (or imem) row pRow, merging in all following visible rows of the same
 * iterator that share its timestamp.
 *
 * If the next visible row has a different timestamp, or there is none, *pTSRow is set to pRow's own row and
 * *freeTSRow to false. Otherwise the rows are merged into a newly built row: it is returned in *pTSRow,
 * *freeTSRow is set to true, and the caller must free it.
 *
 * Returns TSDB_CODE_SUCCESS or an error code. The row merger is released on every path once it has been
 * initialised.
 */
int32_t doMergeMemTableMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter, SArray* pDelList, STSRow** pTSRow,
                                 STsdbReader* pReader, bool* freeTSRow) {
  TSDBROW* pNextRow = NULL;
  TSDBROW  current = *pRow;  // copied: advancing the iterator may reuse the storage pRow points to

  // if the timestamp of the next valid row differs (or there is none), return the current row directly
  pIter->hasVal = tsdbTbDataIterNext(pIter->iter);
  if (pIter->hasVal) {
    pNextRow = getValidMemRow(pIter, pDelList, pReader);
  }

  if (pNextRow == NULL || current.pTSRow->ts != pNextRow->pTSRow->ts) {
    *pTSRow = current.pTSRow;
    *freeTSRow = false;
    return TSDB_CODE_SUCCESS;
  }

  // get the correct schema for data in memory
  terrno = 0;
  STSchema* pTSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(&current), pReader, uid);
  if (pTSchema == NULL) {
    return terrno;
  }

  if (pReader->pSchema == NULL) {
    pReader->pSchema = pTSchema;
  }

  SRowMerger merge = {0};
  int32_t    code = tRowMergerInit2(&merge, pReader->pSchema, &current, pTSchema);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  STSchema* pTSchema1 = doGetSchemaForTSRow(TSDBROW_SVERSION(pNextRow), pReader, uid);
  if (pTSchema1 == NULL) {
    code = terrno;
    goto _end;
  }

  tRowMergerAdd(&merge, pNextRow, pTSchema1);

  code = doMergeRowsInBuf(pIter, uid, current.pTSRow->ts, pDelList, &merge, pReader);
  if (code != TSDB_CODE_SUCCESS) {
    goto _end;
  }

  code = tRowMergerGetRow(&merge, pTSRow);
  if (code == TSDB_CODE_SUCCESS) {
    *freeTSRow = true;
  }

_end:
  tRowMergerClear(&merge);
  return code;
}

3429
int32_t doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader,
H
Hongze Cheng 已提交
3430
                           STSRow** pTSRow) {
H
Haojun Liao 已提交
3431 3432
  SRowMerger merge = {0};

3433 3434 3435
  TSDBKEY k = TSDBROW_KEY(pRow);
  TSDBKEY ik = TSDBROW_KEY(piRow);

3436
  if (ASCENDING_TRAVERSE(pReader->order)) {  // ascending order imem --> mem
H
Haojun Liao 已提交
3437
    STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid);
3438

H
Haojun Liao 已提交
3439 3440 3441 3442 3443 3444 3445 3446 3447 3448
    int32_t code = tRowMergerInit(&merge, piRow, pSchema);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    code = doMergeRowsInBuf(&pBlockScanInfo->iiter, pBlockScanInfo->uid, ik.ts, pBlockScanInfo->delSkyline, &merge,
                            pReader);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }
3449

3450
    tRowMerge(&merge, pRow);
H
Haojun Liao 已提交
3451 3452 3453 3454 3455 3456
    code =
        doMergeRowsInBuf(&pBlockScanInfo->iter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge, pReader);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

3457
  } else {
H
Haojun Liao 已提交
3458
    STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid);
3459

H
Haojun Liao 已提交
3460
    int32_t code = tRowMergerInit(&merge, pRow, pSchema);
3461
    if (code != TSDB_CODE_SUCCESS || merge.pTSchema == NULL) {
H
Haojun Liao 已提交
3462 3463 3464 3465 3466 3467 3468 3469
      return code;
    }

    code =
        doMergeRowsInBuf(&pBlockScanInfo->iter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge, pReader);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }
3470 3471

    tRowMerge(&merge, piRow);
H
Haojun Liao 已提交
3472 3473 3474 3475 3476
    code = doMergeRowsInBuf(&pBlockScanInfo->iiter, pBlockScanInfo->uid, ik.ts, pBlockScanInfo->delSkyline, &merge,
                            pReader);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }
3477
  }
3478

3479 3480
  int32_t code = tRowMergerGetRow(&merge, pTSRow);
  return code;
3481 3482
}

3483 3484
/*
 * Fetch the next visible row from the table's mem and imem buffers, ignoring rows that are not before endKey in
 * scan order.
 *
 * When both buffers have a candidate, the one that comes first in scan order is used. If both candidates share
 * a timestamp, they are merged. *pTSRow is left untouched when neither buffer has a candidate.
 * *freeTSRow tells the caller whether *pTSRow was newly built and must be freed.
 */
int32_t tsdbGetNextRowInMem(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader, STSRow** pTSRow, int64_t endKey,
                            bool* freeTSRow) {
  SIterInfo* pMemIter = &pBlockScanInfo->iter;
  SIterInfo* pIMemIter = &pBlockScanInfo->iiter;

  TSDBROW* pRow = getValidMemRow(pMemIter, pBlockScanInfo->delSkyline, pReader);
  TSDBROW* piRow = getValidMemRow(pIMemIter, pBlockScanInfo->delSkyline, pReader);

  SArray*  pDelList = pBlockScanInfo->delSkyline;
  uint64_t uid = pBlockScanInfo->uid;
  bool     asc = ASCENDING_TRAVERSE(pReader->order);

  // drop candidates that have reached endKey in scan order
  if (pMemIter->hasVal) {
    TSDBKEY memKey = TSDBROW_KEY(pRow);
    if (asc ? (memKey.ts >= endKey) : (memKey.ts <= endKey)) {
      pRow = NULL;
    }
  }

  if (pIMemIter->hasVal) {
    TSDBKEY imemKey = TSDBROW_KEY(piRow);
    if (asc ? (imemKey.ts >= endKey) : (imemKey.ts <= endKey)) {
      piRow = NULL;
    }
  }

  bool hasMem = pMemIter->hasVal && pRow != NULL;
  bool hasIMem = pIMemIter->hasVal && piRow != NULL;

  if (hasMem && hasIMem) {
    TSDBKEY memKey = TSDBROW_KEY(pRow);
    TSDBKEY imemKey = TSDBROW_KEY(piRow);

    if (imemKey.ts == memKey.ts) {
      *freeTSRow = true;
      return doMergeMemIMemRows(pRow, piRow, pBlockScanInfo, pReader, pTSRow);
    }

    bool imemComesFirst = asc ? (imemKey.ts < memKey.ts) : (imemKey.ts > memKey.ts);
    if (imemComesFirst) {
      return doMergeMemTableMultiRows(piRow, uid, pIMemIter, pDelList, pTSRow, pReader, freeTSRow);
    }
    return doMergeMemTableMultiRows(pRow, uid, pMemIter, pDelList, pTSRow, pReader, freeTSRow);
  }

  if (hasMem) {
    return doMergeMemTableMultiRows(pRow, uid, pMemIter, pDelList, pTSRow, pReader, freeTSRow);
  }

  if (hasIMem) {
    return doMergeMemTableMultiRows(piRow, uid, pIMemIter, pDelList, pTSRow, pReader, freeTSRow);
  }

  return TSDB_CODE_SUCCESS;
}

H
Hongze Cheng 已提交
3540 3541
/*
 * Append pTSRow as a new row at the end of pBlock and record its timestamp as pScanInfo->lastKey.
 *
 * Output columns are matched to the row's schema columns by column id, walking both lists in step.
 * NOTE(review): this assumes both lists are sorted by ascending column id; confirm.
 * Output columns that are absent from the row's schema get NULL.
 *
 * Returns TSDB_CODE_SUCCESS, or terrno when the row's schema cannot be resolved. In that case nothing is
 * written to pBlock.
 */
int32_t doAppendRowFromTSRow(SSDataBlock* pBlock, STsdbReader* pReader, STSRow* pTSRow,
                             STableBlockScanInfo* pScanInfo) {
  int32_t numOfRows = pBlock->info.rows;
  int32_t numOfCols = (int32_t)taosArrayGetSize(pBlock->pDataBlock);
  int64_t uid = pScanInfo->uid;

  SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo;
  STSchema*           pSchema = doGetSchemaForTSRow(pTSRow->sver, pReader, uid);
  if (pSchema == NULL) {
    // bail out before touching any column so the block stays consistent
    return terrno;
  }

  SColVal colVal = {0};
  int32_t i = 0, j = 0;

  // only the first output column is checked for the primary timestamp
  SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
  if (pColInfoData->info.colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
    colDataAppend(pColInfoData, numOfRows, (const char*)&pTSRow->ts, false);
    i += 1;
  }

  while (i < numOfCols && j < pSchema->numOfCols) {
    pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
    col_id_t colId = pColInfoData->info.colId;

    if (colId == pSchema->columns[j].colId) {
      tTSRowGetVal(pTSRow, pSchema, j, &colVal);
      doCopyColVal(pColInfoData, numOfRows, i, &colVal, pSupInfo);
      i += 1;
      j += 1;
    } else if (colId < pSchema->columns[j].colId) {
      colDataAppendNULL(pColInfoData, numOfRows);
      i += 1;
    } else if (colId > pSchema->columns[j].colId) {
      j += 1;
    }
  }

  // set null value since current column does not exist in the "pSchema"
  while (i < numOfCols) {
    pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
    colDataAppendNULL(pColInfoData, numOfRows);
    i += 1;
  }

  pBlock->info.rows += 1;
  pScanInfo->lastKey = pTSRow->ts;
  return TSDB_CODE_SUCCESS;
}

H
Hongze Cheng 已提交
3587 3588
/*
 * Append row rowIndex of the loaded file block pBlockData as a new row at the end of pResBlock.
 *
 * Output columns are matched to the block's column data by column id, walking both lists in step.
 * NOTE(review): this assumes both lists are sorted by ascending column id; confirm.
 * Output columns missing from the file block are filled with NULL. File-block columns that were not requested
 * are skipped.
 */
int32_t doAppendRowFromFileBlock(SSDataBlock* pResBlock, STsdbReader* pReader, SBlockData* pBlockData,
                                 int32_t rowIndex) {
  SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo;
  int32_t             dstRow = pResBlock->info.rows;
  int32_t             outIdx = 0;
  int32_t             inIdx = 0;

  // only the first output column is checked for the primary timestamp
  SColumnInfoData* pTsCol = taosArrayGet(pResBlock->pDataBlock, outIdx);
  if (pTsCol->info.colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
    colDataAppendInt64(pTsCol, dstRow, &pBlockData->aTSKEY[rowIndex]);
    outIdx++;
  }

  SColVal cv = {0};
  int32_t numOfInputCols = pBlockData->nColData;
  int32_t numOfOutputCols = pResBlock->pDataBlock->size;

  while (outIdx < numOfOutputCols && inIdx < numOfInputCols) {
    SColumnInfoData* pCol = TARRAY_GET_ELEM(pResBlock->pDataBlock, outIdx);
    SColData*        pData = tBlockDataGetColDataByIdx(pBlockData, inIdx);

    if (pData->cid < pCol->info.colId) {
      inIdx++;  // this file-block column was not requested
    } else if (pData->cid == pCol->info.colId) {
      tColDataGetValue(pData, rowIndex, &cv);
      doCopyColVal(pCol, dstRow, outIdx, &cv, pSupInfo);
      inIdx++;
      outIdx++;
    } else {
      // the specified column does not exist in file block, fill with null data
      colDataAppendNULL(pCol, dstRow);
      outIdx++;
    }
  }

  for (; outIdx < numOfOutputCols; outIdx++) {
    SColumnInfoData* pCol = taosArrayGet(pResBlock->pDataBlock, outIdx);
    colDataAppendNULL(pCol, dstRow);
  }

  pResBlock->info.rows += 1;
  return TSDB_CODE_SUCCESS;
}

3635 3636
/*
 * Fill pReader->pResBlock with rows from the table's mem/imem buffers that come before endKey in scan order.
 *
 * Stops when the buffers have no more candidate rows, when both iterators are exhausted, or when the block
 * holds capacity rows. Rows built by merging are freed after they have been copied into the block.
 *
 * Returns TSDB_CODE_SUCCESS, or the first error raised while fetching or appending a row.
 */
int32_t buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, int64_t endKey, int32_t capacity,
                                  STsdbReader* pReader) {
  SSDataBlock* pBlock = pReader->pResBlock;

  while (1) {
    STSRow* pTSRow = NULL;
    bool    freeTSRow = false;

    int32_t code = tsdbGetNextRowInMem(pBlockScanInfo, pReader, &pTSRow, endKey, &freeTSRow);
    if (code != TSDB_CODE_SUCCESS) {
      if (freeTSRow) {
        taosMemoryFree(pTSRow);
      }
      return code;
    }

    if (pTSRow == NULL) {
      break;
    }

    code = doAppendRowFromTSRow(pBlock, pReader, pTSRow, pBlockScanInfo);

    if (freeTSRow) {
      taosMemoryFree(pTSRow);
    }

    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    // no data in buffer, return immediately
    if (!(pBlockScanInfo->iter.hasVal || pBlockScanInfo->iiter.hasVal)) {
      break;
    }

    if (pBlock->info.rows >= capacity) {
      break;
    }
  }

  ASSERT(pBlock->info.rows <= capacity);
  return TSDB_CODE_SUCCESS;
}
H
Hongze Cheng 已提交
3666

3667 3668
// TODO refactor: with createDataBlockScanInfo
/*
 * Replace the set of tables scanned by the reader with the num entries of pTableList (an array of
 * STableKeyInfo).
 *
 * The scan info of the previous tables is cleared and the table map is rebuilt. Entry i of the new list uses
 * the i-th slot of pReader->blockInfoBuf.
 */
int32_t tsdbSetTableList(STsdbReader* pReader, const void* pTableList, int32_t num) {
  ASSERT(pReader != NULL);
  int32_t size = taosHashGetSize(pReader->status.pTableMap);

  // release the per-table state of the previous table list
  for (STableBlockScanInfo** pp = taosHashIterate(pReader->status.pTableMap, NULL); pp != NULL;
       pp = taosHashIterate(pReader->status.pTableMap, pp)) {
    clearBlockScanInfo(*pp);
  }

  // todo handle the case where size is less than the value of num
  ASSERT(size >= num);

  taosHashClear(pReader->status.pTableMap);

  const STableKeyInfo* pKeyList = pTableList;
  for (int32_t idx = 0; idx < num; ++idx) {
    STableBlockScanInfo* pSlot = getPosInBlockInfoBuf(&pReader->blockInfoBuf, idx);
    pSlot->uid = pKeyList[idx].uid;
    taosHashPut(pReader->status.pTableMap, &pSlot->uid, sizeof(uint64_t), &pSlot, POINTER_BYTES);
  }

  return TDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
3692 3693 3694 3695 3696 3697
/* Return the handle produced by metaGetIdx() for pMeta, or NULL when pMeta is NULL. */
void* tsdbGetIdx(SMeta* pMeta) {
  return (pMeta != NULL) ? metaGetIdx(pMeta) : NULL;
}
dengyihao's avatar
dengyihao 已提交
3698

dengyihao's avatar
dengyihao 已提交
3699 3700 3701 3702 3703 3704
/* Return the handle produced by metaGetIvtIdx() for pMeta, or NULL when pMeta is NULL. */
void* tsdbGetIvtIdx(SMeta* pMeta) {
  return (pMeta != NULL) ? metaGetIvtIdx(pMeta) : NULL;
}
L
Liu Jicong 已提交
3705

H
Hongze Cheng 已提交
3706
/* Upper bound of the data-version range visible to this reader. */
uint64_t getReaderMaxVersion(STsdbReader* pReader) {
  const SVersionRange* pRange = &pReader->verRange;
  return pRange->maxVer;
}
3707

3708 3709 3710 3711 3712 3713 3714 3715 3716 3717 3718 3719 3720 3721 3722
/*
 * Prepare the reader's file-set iterator and data-block iterator over the snapshot's data files.
 *
 * When the snapshot has no data files, the reader is switched to the in-memory buffers only. Otherwise it is
 * positioned on the first data block in the files and the result of that step is returned.
 */
static int32_t doOpenReaderImpl(STsdbReader* pReader) {
  SReaderStatus* pStatus = &pReader->status;

  initFilesetIterator(&pStatus->fileIter, pReader->pReadSnap->fs.aDFileSet, pReader);
  resetDataBlockIterator(&pStatus->blockIter, pReader->order);

  if (pStatus->fileIter.numOfFiles != 0) {
    return initForFirstBlockInFile(pReader, &pStatus->blockIter);
  }

  // no data in files, let's try buffer in memory
  pStatus->loadFromFile = false;
  return TSDB_CODE_SUCCESS;
}

H
refact  
Hongze Cheng 已提交
3723
// ====================================== EXPOSED APIs ======================================
3724 3725
/*
 * Create a tsdb reader over the numOfTables tables in pTableList (an array of STableKeyInfo) according to pCond.
 *
 * For TIMEWINDOW_RANGE_EXTERNAL queries the main reader scans strictly inside (skey, ekey). Two inner readers
 * with a capacity of one row are also set up:
 *   - innerReader[0] looks for the nearest row at or before the start of the window in scan order;
 *   - innerReader[1] looks for the nearest row at or beyond its end in scan order.
 * NOTE: pCond is modified in place while the inner readers are created and is not restored afterwards.
 *
 * On success *ppReader holds the new reader. Returns TSDB_CODE_SUCCESS or an error code.
 */
int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, void* pTableList, int32_t numOfTables,
                       STsdbReader** ppReader, const char* idstr) {
  STimeWindow window = pCond->twindows;
  if (pCond->type == TIMEWINDOW_RANGE_EXTERNAL) {
    // the boundary timestamps are covered by the inner readers, the main reader scans strictly inside
    pCond->twindows.skey += 1;
    pCond->twindows.ekey -= 1;
  }

  int32_t code = tsdbReaderCreate(pVnode, pCond, ppReader, 4096, idstr);
  if (code != TSDB_CODE_SUCCESS) {
    goto _err;
  }

  // check for query time window
  STsdbReader* pReader = *ppReader;
  if (isEmptyQueryTimeWindow(&pReader->window) && pCond->type == TIMEWINDOW_RANGE_CONTAINED) {
    tsdbDebug("%p query window not overlaps with the data set, no result returned, %s", pReader, pReader->idStr);
    return TSDB_CODE_SUCCESS;
  }

  if (pCond->type == TIMEWINDOW_RANGE_EXTERNAL) {
    // update the SQueryTableDataCond to create inner reader
    int32_t order = pCond->order;

    // the "prev" reader walks away from the start of the window, against the query order
    if (order == TSDB_ORDER_ASC) {
      pCond->twindows.ekey = window.skey;
      pCond->twindows.skey = INT64_MIN;
      pCond->order = TSDB_ORDER_DESC;
    } else {
      pCond->twindows.skey = window.ekey;
      pCond->twindows.ekey = INT64_MAX;
      pCond->order = TSDB_ORDER_ASC;
    }

    // here we only need one more row, so the capacity is set to be ONE.
    code = tsdbReaderCreate(pVnode, pCond, &pReader->innerReader[0], 1, idstr);
    if (code != TSDB_CODE_SUCCESS) {
      goto _err;
    }

    // the "next" reader continues in the query order beyond the far end of the window:
    // rows at or after ekey for ascending scans, rows at or before skey for descending scans
    if (order == TSDB_ORDER_ASC) {
      pCond->twindows.skey = window.ekey;
      pCond->twindows.ekey = INT64_MAX;
    } else {
      pCond->twindows.skey = INT64_MIN;
      pCond->twindows.ekey = window.skey;
    }
    pCond->order = order;

    code = tsdbReaderCreate(pVnode, pCond, &pReader->innerReader[1], 1, idstr);
    if (code != TSDB_CODE_SUCCESS) {
      goto _err;
    }
  }

  // NOTE: the endVersion in pCond is the data version not schema version, so pCond->endVersion is not correct here.
  if (pCond->suid != 0) {
    pReader->pSchema = metaGetTbTSchema(pReader->pTsdb->pVnode->pMeta, pReader->suid, -1, 1);
    if (pReader->pSchema == NULL) {
      tsdbError("failed to get table schema, suid:%" PRIu64 ", ver:-1, %s", pReader->suid, pReader->idStr);
    }
  } else if (numOfTables > 0) {
    STableKeyInfo* pKey = pTableList;
    pReader->pSchema = metaGetTbTSchema(pReader->pTsdb->pVnode->pMeta, pKey->uid, -1, 1);
    if (pReader->pSchema == NULL) {
      tsdbError("failed to get table schema, uid:%" PRIu64 ", ver:-1, %s", pKey->uid, pReader->idStr);
    }
  }

  if (pReader->pSchema != NULL) {
    updateBlockSMAInfo(pReader->pSchema, &pReader->suppInfo);
  }

  STsdbReader* p = (pReader->innerReader[0] != NULL) ? pReader->innerReader[0] : pReader;
  pReader->status.pTableMap = createDataBlockScanInfo(p, &pReader->blockInfoBuf, pTableList, numOfTables);
  if (pReader->status.pTableMap == NULL) {
    // close the outer reader: it owns the inner readers too, so closing only "p" would leak the rest and leave
    // pReader->innerReader[0] dangling
    tsdbReaderClose(pReader);
    *ppReader = NULL;

    code = TSDB_CODE_TDB_OUT_OF_MEMORY;
    goto _err;
  }

  if (numOfTables > 0) {
    code = tsdbTakeReadSnap(pReader->pTsdb, &pReader->pReadSnap, pReader->idStr);
    if (code != TSDB_CODE_SUCCESS) {
      goto _err;
    }

    if (pReader->type == TIMEWINDOW_RANGE_CONTAINED) {
      code = doOpenReaderImpl(pReader);
      if (code != TSDB_CODE_SUCCESS) {
        return code;
      }
    } else {
      STsdbReader* pPrevReader = pReader->innerReader[0];
      STsdbReader* pNextReader = pReader->innerReader[1];

      // we need only one row; the inner readers borrow the outer reader's table map, schemas and snapshot
      pPrevReader->capacity = 1;
      pPrevReader->status.pTableMap = pReader->status.pTableMap;
      pPrevReader->pSchema = pReader->pSchema;
      pPrevReader->pMemSchema = pReader->pMemSchema;
      pPrevReader->pReadSnap = pReader->pReadSnap;

      pNextReader->capacity = 1;
      pNextReader->status.pTableMap = pReader->status.pTableMap;
      pNextReader->pSchema = pReader->pSchema;
      pNextReader->pMemSchema = pReader->pMemSchema;
      pNextReader->pReadSnap = pReader->pReadSnap;

      code = doOpenReaderImpl(pPrevReader);
      if (code != TSDB_CODE_SUCCESS) {
        return code;
      }
    }
  }

  tsdbDebug("%p total numOfTable:%d in this query %s", pReader, numOfTables, pReader->idStr);
  return code;

_err:
  tsdbError("failed to create data reader, code:%s %s", tstrerror(code), idstr);
  return code;
}

// Close a reader and release everything it owns. A NULL reader is ignored.
//
// The inner readers (innerReader[0] / innerReader[1]) are given the outer reader's table map, read snapshot and
// schemas in tsdbReaderOpen. Those borrowed pointers are detached from each inner reader before it is closed, so
// they are released exactly once, by the outer reader.
void tsdbReaderClose(STsdbReader* pReader) {
  if (pReader == NULL) {
    return;
  }

  // Each slot is checked independently, so a reader with only one inner reader set never dereferences NULL.
  for (int32_t i = 0; i < 2; ++i) {
    STsdbReader* p = pReader->innerReader[i];
    if (p == NULL) {
      continue;
    }

    p->status.pTableMap = NULL;
    p->pReadSnap = NULL;
    p->pSchema = NULL;
    p->pMemSchema = NULL;

    tsdbReaderClose(p);
    pReader->innerReader[i] = NULL;
  }

  SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo;

  taosMemoryFreeClear(pSupInfo->plist);
  taosMemoryFree(pSupInfo->colIds);

  taosArrayDestroy(pSupInfo->pColAgg);

  // Guard against a NULL buildBuf (e.g. a reader whose setup did not complete) before indexing into it.
  if (pSupInfo->buildBuf != NULL) {
    size_t numOfCols = blockDataGetNumOfCols(pReader->pResBlock);
    for (size_t i = 0; i < numOfCols; ++i) {
      taosMemoryFreeClear(pSupInfo->buildBuf[i]);
    }
  }

  taosMemoryFree(pSupInfo->buildBuf);
  tBlockDataDestroy(&pReader->status.fileBlockData, true);

  cleanupDataBlockIterator(&pReader->status.blockIter);

  size_t numOfTables = taosHashGetSize(pReader->status.pTableMap);
  if (pReader->status.pTableMap != NULL) {
    destroyAllBlockScanInfo(pReader->status.pTableMap);
    clearBlockScanInfoBuf(&pReader->blockInfoBuf);
  }

  blockDataDestroy(pReader->pResBlock);

  if (pReader->pFileReader != NULL) {
    tsdbDataFReaderClose(&pReader->pFileReader);
  }

  tsdbUntakeReadSnap(pReader->pTsdb, pReader->pReadSnap, pReader->idStr);

  taosMemoryFree(pReader->status.uidCheckInfo.tableUidList);
  SIOCostSummary* pCost = &pReader->cost;

  SFilesetIter* pFilesetIter = &pReader->status.fileIter;
  if (pFilesetIter->pLastBlockReader != NULL) {
    SLastBlockReader* pLReader = pFilesetIter->pLastBlockReader;
    tMergeTreeClose(&pLReader->mergeTree);

    // collect last-block load statistics before the load info is destroyed
    getLastBlockLoadInfo(pLReader->pInfo, &pCost->lastBlockLoad, &pCost->lastBlockLoadTime);

    pLReader->pInfo = destroyLastBlockLoadInfo(pLReader->pInfo);
    taosMemoryFree(pLReader);
  }

  tsdbDebug("%p :io-cost summary: head-file:%" PRIu64 ", head-file time:%.2f ms, SMA:%" PRId64
            " SMA-time:%.2f ms, fileBlocks:%" PRId64
            ", fileBlocks-load-time:%.2f ms, "
            "build in-memory-block-time:%.2f ms, lastBlocks:%" PRId64
            ", lastBlocks-time:%.2f ms, composed-blocks:%" PRId64
            ", composed-blocks-time:%.2fms, STableBlockScanInfo size:%.2f Kb, creatTime:%.2f ms, %s",
            pReader, pCost->headFileLoad, pCost->headFileLoadTime, pCost->smaDataLoad, pCost->smaLoadTime,
            pCost->numOfBlocks, pCost->blockLoadTime, pCost->buildmemBlock, pCost->lastBlockLoad,
            pCost->lastBlockLoadTime, pCost->composedBlocks, pCost->buildComposedBlockTime,
            numOfTables * sizeof(STableBlockScanInfo) / 1000.0, pCost->createScanInfoList, pReader->idStr);

  taosMemoryFree(pReader->idStr);

  // Compare the two schema pointers while both are still valid; reading a pointer value after it has been
  // freed is indeterminate in C.
  if (pReader->pMemSchema != pReader->pSchema) {
    taosMemoryFree(pReader->pMemSchema);
  }
  taosMemoryFree(pReader->pSchema);

  taosMemoryFreeClear(pReader);
}

3939
// Fill pReader->pResBlock with the next block of a single reader.
//
// While loadFromFile is set, data files are tried first. If they produce no rows, or file loading is already
// finished, the in-memory buffer is scanned instead. Returns true when the result block holds at least one row.
// Returns false when no rows remain or buildBlockFromFiles reported an error.
static bool doTsdbNextDataBlock(STsdbReader* pReader) {
  SSDataBlock*   pRes = pReader->pResBlock;
  SReaderStatus* pStatus = &pReader->status;

  // discard whatever the previous call left in the result block
  blockDataCleanup(pRes);

  if (pStatus->loadFromFile) {
    int32_t err = buildBlockFromFiles(pReader);
    if (err != TSDB_CODE_SUCCESS) {
      return false;
    }

    if (pRes->info.rows > 0) {
      return true;
    }
  }

  // nothing came from the files: fall back to the memory buffer
  buildBlockFromBufferSequentially(pReader);
  return pRes->info.rows > 0;
}

3966 3967 3968 3969 3970
// Advance the reader to its next result block.
//
// When inner readers exist (external-range queries), the scan runs in phases tracked by pReader->step:
//   1. innerReader[0]      -> step becomes EXTERNAL_ROWS_PREV
//   2. the reader itself   -> step becomes EXTERNAL_ROWS_MAIN
//   3. innerReader[1]      -> step becomes EXTERNAL_ROWS_NEXT
// Presumably the inner readers fetch the single row just before and just after the query window; the open code
// sets their capacity to 1. Without inner readers, only the main scan runs.
//
// Returns true when a block with rows is available. Returns false when data is exhausted or a phase could not be
// opened; in that case terrno holds the error code. Returning the int32_t error code from this bool function
// would turn every failure into "true".
bool tsdbNextDataBlock(STsdbReader* pReader) {
  if (isEmptyQueryTimeWindow(&pReader->window)) {
    return false;
  }

  // phase 1: preceding rows
  if (pReader->innerReader[0] != NULL && pReader->step == 0) {
    bool ret = doTsdbNextDataBlock(pReader->innerReader[0]);
    pReader->step = EXTERNAL_ROWS_PREV;
    if (ret) {
      return ret;
    }
  }

  if (pReader->step == EXTERNAL_ROWS_PREV) {
    // prepare for the main scan
    int32_t code = doOpenReaderImpl(pReader);
    if (code != TSDB_CODE_SUCCESS) {
      terrno = code;
      return false;
    }

    resetAllDataBlockScanInfo(pReader->status.pTableMap, pReader->innerReader[0]->window.ekey);
    pReader->step = EXTERNAL_ROWS_MAIN;
  }

  // phase 2: main scan
  bool ret = doTsdbNextDataBlock(pReader);
  if (ret) {
    return ret;
  }

  // phase 3: following rows
  if (pReader->innerReader[1] != NULL && pReader->step == EXTERNAL_ROWS_MAIN) {
    // prepare for the next row scan
    int32_t code = doOpenReaderImpl(pReader->innerReader[1]);
    if (code != TSDB_CODE_SUCCESS) {
      terrno = code;
      return false;
    }

    resetAllDataBlockScanInfo(pReader->innerReader[1]->status.pTableMap, pReader->window.ekey);

    bool ret1 = doTsdbNextDataBlock(pReader->innerReader[1]);
    pReader->step = EXTERNAL_ROWS_NEXT;
    if (ret1) {
      return ret1;
    }
  }

  return false;
}

4014
// Report whether the reader's table map holds scan info for the table with the given uid.
// The slot returned by taosHashGet is checked before it is dereferenced. The lookup yields NULL for a uid that is
// not in the map (doRetrieveDataBlock treats it the same way).
bool tsdbTableNextDataBlock(STsdbReader* pReader, uint64_t uid) {
  STableBlockScanInfo** ppScanInfo = taosHashGet(pReader->status.pTableMap, &uid, sizeof(uid));
  if (ppScanInfo == NULL || *ppScanInfo == NULL) {  // no data block for the table of given uid
    return false;
  }

  return true;
}

H
Haojun Liao 已提交
4024 4025 4026
// Copy the row count, table uid and time window of pReader's current result block into the out-parameters.
static void setBlockInfo(const STsdbReader* pReader, int32_t* rows, uint64_t* uid, STimeWindow* pWindow) {
  ASSERT(pReader != NULL);

  *pWindow = pReader->pResBlock->info.window;
  *uid = pReader->pResBlock->info.id.uid;
  *rows = pReader->pResBlock->info.rows;
}

H
Haojun Liao 已提交
4031
// Describe the block produced by the last tsdbNextDataBlock call.
// For external-range queries, the block belongs to whichever reader the current step points at:
// innerReader[0] for EXTERNAL_ROWS_PREV, the reader itself for EXTERNAL_ROWS_MAIN, innerReader[1] otherwise.
void tsdbRetrieveDataBlockInfo(const STsdbReader* pReader, int32_t* rows, uint64_t* uid, STimeWindow* pWindow) {
  const STsdbReader* pSource = pReader;

  if (pReader->type == TIMEWINDOW_RANGE_EXTERNAL) {
    switch (pReader->step) {
      case EXTERNAL_ROWS_MAIN:
        break;
      case EXTERNAL_ROWS_PREV:
        pSource = pReader->innerReader[0];
        break;
      default:
        pSource = pReader->innerReader[1];
        break;
    }
  }

  setBlockInfo(pSource, rows, uid, pWindow);
}

4045
// Load the per-column SMA (aggregate statistics) of the current file block.
//
// On return, *pBlockStatis is either NULL (no statistics usable for this block) or points at pReader->suppInfo.plist.
// In the second case plist holds one aggregate per requested column that could be matched, and *allHave is true.
// No statistics are produced for external-range readers, composed blocks, or blocks without stored SMA.
// Returns TSDB_CODE_SUCCESS, the error from tsdbReadBlockSma, or TSDB_CODE_OUT_OF_MEMORY.
int32_t tsdbRetrieveDatablockSMA(STsdbReader* pReader, SColumnDataAgg*** pBlockStatis, bool* allHave) {
  int32_t code = 0;
  *allHave = false;

  if (pReader->type == TIMEWINDOW_RANGE_EXTERNAL) {
    *pBlockStatis = NULL;
    return TSDB_CODE_SUCCESS;
  }

  // there is no statistics data for composed block
  if (pReader->status.composedDataBlock || (!pReader->suppInfo.smaValid)) {
    *pBlockStatis = NULL;
    return TSDB_CODE_SUCCESS;
  }

  SFileDataBlockInfo* pFBlock = getCurrentBlockInfo(&pReader->status.blockIter);
  SDataBlk*           pBlock = getCurrentBlock(&pReader->status.blockIter);
  SBlockLoadSuppInfo* pSup = &pReader->suppInfo;

  if (!tDataBlkHasSma(pBlock)) {
    *pBlockStatis = NULL;
    return TSDB_CODE_SUCCESS;
  }

  code = tsdbReadBlockSma(pReader->pFileReader, pBlock, pSup->pColAgg);
  if (code != TSDB_CODE_SUCCESS) {
    tsdbDebug("vgId:%d, failed to load block SMA for uid %" PRIu64 ", code:%s, %s", 0, pFBlock->uid, tstrerror(code),
              pReader->idStr);
    return code;
  }

  // the primary timestamp column's aggregate is built from the result block's time window
  SColumnDataAgg* pTsAgg = &pSup->tsColAgg;
  pTsAgg->numOfNull = 0;
  pTsAgg->colId = PRIMARYKEY_TIMESTAMP_COL_ID;
  pTsAgg->min = pReader->pResBlock->info.window.skey;
  pTsAgg->max = pReader->pResBlock->info.window.ekey;
  pSup->plist[0] = pTsAgg;

  size_t numOfCols = blockDataGetNumOfCols(pReader->pResBlock);
  size_t size = taosArrayGetSize(pSup->pColAgg);

  SArray* pNewAggList = taosArrayInit(numOfCols, sizeof(SColumnDataAgg));
  if (pNewAggList == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  // Merge the loaded aggregates (ordered by colId) with the requested column ids. A requested column without a
  // stored aggregate gets the timestamp aggregate (primary key) or an "all rows NULL" aggregate.
  // NOTE(review): requested columns after the last stored aggregate get no entry; confirm callers never read
  // plist beyond taosArrayGetSize(pSup->pColAgg).
  size_t i = 0, j = 0;
  while (j < numOfCols && i < size) {
    SColumnDataAgg* pAgg = taosArrayGet(pSup->pColAgg, i);
    if (pAgg->colId == pSup->colIds[j]) {
      taosArrayPush(pNewAggList, pAgg);
      i += 1;
      j += 1;
    } else if (pAgg->colId < pSup->colIds[j]) {
      i += 1;
    } else {
      if (pSup->colIds[j] == PRIMARYKEY_TIMESTAMP_COL_ID) {
        taosArrayPush(pNewAggList, &pSup->tsColAgg);
      } else {
        // all data in this block are null
        SColumnDataAgg nullColAgg = {.colId = pSup->colIds[j], .numOfNull = pBlock->nRow};
        taosArrayPush(pNewAggList, &nullColAgg);
      }
      j += 1;
    }
  }

  taosArrayClear(pSup->pColAgg);
  taosArrayAddAll(pSup->pColAgg, pNewAggList);
  taosArrayDestroy(pNewAggList);

  size_t num = taosArrayGetSize(pSup->pColAgg);
  for (size_t k = 0; k < num; ++k) {
    pSup->plist[k] = taosArrayGet(pSup->pColAgg, k);
  }

  *allHave = true;
  pReader->cost.smaDataLoad += 1;
  *pBlockStatis = pSup->plist;

  tsdbDebug("vgId:%d, succeed to load block SMA for uid %" PRIu64 ", %s", 0, pFBlock->uid, pReader->idStr);
  return code;
}

4157
// Return the column data of the current block of a single reader.
// A composed block is already materialized in pResBlock. Otherwise the current file block is loaded and copied into
// pResBlock. Returns NULL with terrno set when the block's table is unknown or the file block fails to load.
static SArray* doRetrieveDataBlock(STsdbReader* pReader) {
  SReaderStatus* pStatus = &pReader->status;

  if (pStatus->composedDataBlock) {
    return pReader->pResBlock->pDataBlock;
  }

  SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(&pStatus->blockIter);

  // check the hash slot before dereferencing it: the lookup yields NULL for a uid that is not in the table map
  STableBlockScanInfo** ppScanInfo = taosHashGet(pStatus->pTableMap, &pBlockInfo->uid, sizeof(pBlockInfo->uid));
  if (ppScanInfo == NULL || *ppScanInfo == NULL) {
    terrno = TSDB_CODE_INVALID_PARA;
    tsdbError("failed to locate the uid:%" PRIu64 " in query table uid list, total tables:%d, %s", pBlockInfo->uid,
              taosHashGetSize(pReader->status.pTableMap), pReader->idStr);
    return NULL;
  }

  STableBlockScanInfo* pBlockScanInfo = *ppScanInfo;

  int32_t code = doLoadFileBlockData(pReader, &pStatus->blockIter, &pStatus->fileBlockData, pBlockScanInfo->uid);
  if (code != TSDB_CODE_SUCCESS) {
    tBlockDataDestroy(&pStatus->fileBlockData, 1);
    terrno = code;
    return NULL;
  }

  copyBlockDataToSDataBlock(pReader, pBlockScanInfo);
  return pReader->pResBlock->pDataBlock;
}

4185 4186 4187 4188 4189 4190 4191 4192 4193 4194 4195 4196
// Return the column data of the block produced by the last tsdbNextDataBlock call.
// For external-range queries, the preceding/following phases read from innerReader[0]/innerReader[1].
// Every other case reads from the reader itself. pIdList is not consulted.
SArray* tsdbRetrieveDataBlock(STsdbReader* pReader, SArray* pIdList) {
  STsdbReader* pTarget = pReader;

  if (pReader->type == TIMEWINDOW_RANGE_EXTERNAL) {
    if (pReader->step == EXTERNAL_ROWS_PREV) {
      pTarget = pReader->innerReader[0];
    } else if (pReader->step == EXTERNAL_ROWS_NEXT) {
      pTarget = pReader->innerReader[1];
    }
  }

  return doRetrieveDataBlock(pTarget);
}

H
Haojun Liao 已提交
4197
// Rewind the reader so it can scan again with the order and time window taken from pCond.
// The reader is switched to a plain contained-range scan. File and block iterators are re-initialized and every
// table's scan position is moved to just outside the new window in scan order.
// Returns TSDB_CODE_SUCCESS (also when the window is empty or no snapshot exists) or the initForFirstBlockInFile error.
int32_t tsdbReaderReset(STsdbReader* pReader, SQueryTableDataCond* pCond) {
  if (isEmptyQueryTimeWindow(&pReader->window) || pReader->pReadSnap == NULL) {
    return TSDB_CODE_SUCCESS;
  }

  SDataBlockIter* pIter = &pReader->status.blockIter;

  pReader->order = pCond->order;
  pReader->type = TIMEWINDOW_RANGE_CONTAINED;
  pReader->status.loadFromFile = true;
  pReader->status.pTableIter = NULL;
  pReader->window = updateQueryTimeWindow(pReader->pTsdb, &pCond->twindows);

  // clear the timestamp aggregate and the first aggregate slot; the timestamp aggregate keeps its column id
  memset(&pReader->suppInfo.tsColAgg, 0, sizeof(SColumnDataAgg));
  memset(pReader->suppInfo.plist, 0, POINTER_BYTES);
  pReader->suppInfo.tsColAgg.colId = PRIMARYKEY_TIMESTAMP_COL_ID;

  tsdbDataFReaderClose(&pReader->pFileReader);

  int32_t numOfTables = taosHashGetSize(pReader->status.pTableMap);

  initFilesetIterator(&pReader->status.fileIter, pReader->pReadSnap->fs.aDFileSet, pReader);
  resetDataBlockIterator(pIter, pReader->order);

  // position every table one key before the first key to be read in the new scan order
  int64_t startKey = ASCENDING_TRAVERSE(pReader->order) ? pReader->window.skey - 1 : pReader->window.ekey + 1;
  resetAllDataBlockScanInfo(pReader->status.pTableMap, startKey);

  int32_t code = TSDB_CODE_SUCCESS;

  if (pReader->status.fileIter.numOfFiles != 0) {
    code = initForFirstBlockInFile(pReader, pIter);
    if (code != TSDB_CODE_SUCCESS) {
      tsdbError("%p reset reader failed, numOfTables:%d, query range:%" PRId64 " - %" PRId64 " in query %s", pReader,
                numOfTables, pReader->window.skey, pReader->window.ekey, pReader->idStr);
      return code;
    }
  } else {
    // no data in files, let's try buffer in memory
    pReader->status.loadFromFile = false;
  }

  tsdbDebug("%p reset reader, suid:%" PRIu64 ", numOfTables:%d, skey:%" PRId64 ", query range:%" PRId64 " - %" PRId64
            " in query %s",
            pReader, pReader->suid, numOfTables, pCond->twindows.skey, pReader->window.skey, pReader->window.ekey,
            pReader->idStr);

  return code;
}
H
Hongze Cheng 已提交
4246

4247 4248 4249
// Map a block's row count to a histogram bucket.
// Buckets start at startRow and are bucketRange rows wide. Row counts below startRow land in bucket 0 instead of
// producing a negative index. A non-positive bucketRange (it is 0 when maxRows == minRows) also yields bucket 0
// instead of a division by zero. The caller still clamps the upper end to its histogram size.
static int32_t getBucketIndex(int32_t startRow, int32_t bucketRange, int32_t numOfRows) {
  if (bucketRange <= 0 || numOfRows < startRow) {
    return 0;
  }

  return (numOfRows - startRow) / bucketRange;
}
H
Hongze Cheng 已提交
4250

4251 4252 4253 4254
// Walk every data block in the reader's files and accumulate block distribution statistics into pTableBlockInfo:
// row totals, min/max rows per block, small-block count, and a row-count histogram.
// Returns TSDB_CODE_SUCCESS or the error from initForFirstBlockInFile.
int32_t tsdbGetFileBlocksDistInfo(STsdbReader* pReader, STableBlockDistInfo* pTableBlockInfo) {
  int32_t code = TSDB_CODE_SUCCESS;
  pTableBlockInfo->totalSize = 0;
  pTableBlockInfo->totalRows = 0;

  // find the start data block in file
  SReaderStatus* pStatus = &pReader->status;

  STsdbCfg* pc = &pReader->pTsdb->pVnode->config.tsdbCfg;
  pTableBlockInfo->defMinRows = pc->minRows;
  pTableBlockInfo->defMaxRows = pc->maxRows;

  // [minRows, maxRows] is split into 20 equal-width buckets. When maxRows == minRows the width computes to 0;
  // use 1 so the bucket computation never divides by zero.
  int32_t bucketRange = ceil((pc->maxRows - pc->minRows) / 20.0);
  if (bucketRange <= 0) {
    bucketRange = 1;
  }

  // NOTE(review): assumes blockRowsHisto is a fixed-size array member of STableBlockDistInfo, so sizeof yields its
  // element count; confirm against the struct definition.
  const int32_t numOfBuckets =
      (int32_t)(sizeof(pTableBlockInfo->blockRowsHisto) / sizeof(pTableBlockInfo->blockRowsHisto[0]));

  pTableBlockInfo->numOfFiles += 1;

  int32_t numOfTables = (int32_t)taosHashGetSize(pStatus->pTableMap);
  int     defaultRows = 4096;

  SDataBlockIter* pBlockIter = &pStatus->blockIter;
  pTableBlockInfo->numOfFiles += pStatus->fileIter.numOfFiles;

  if (pBlockIter->numOfBlocks > 0) {
    pTableBlockInfo->numOfBlocks += pBlockIter->numOfBlocks;
  }

  pTableBlockInfo->numOfTables = numOfTables;
  bool hasNext = (pBlockIter->numOfBlocks > 0);

  while (true) {
    if (hasNext) {
      SDataBlk* pBlock = getCurrentBlock(pBlockIter);

      int32_t numOfRows = pBlock->nRow;
      pTableBlockInfo->totalRows += numOfRows;

      if (numOfRows > pTableBlockInfo->maxRows) {
        pTableBlockInfo->maxRows = numOfRows;
      }

      if (numOfRows < pTableBlockInfo->minRows) {
        pTableBlockInfo->minRows = numOfRows;
      }

      if (numOfRows < defaultRows) {
        pTableBlockInfo->numOfSmallBlocks += 1;
      }

      // Blocks with fewer than minRows rows, or with maxRows rows when the range divides evenly (or more), would
      // otherwise index outside the histogram.
      int32_t bucketIndex = getBucketIndex(pTableBlockInfo->defMinRows, bucketRange, numOfRows);
      if (bucketIndex < 0) {
        bucketIndex = 0;
      } else if (bucketIndex >= numOfBuckets) {
        bucketIndex = numOfBuckets - 1;
      }
      pTableBlockInfo->blockRowsHisto[bucketIndex]++;

      hasNext = blockIteratorNext(&pStatus->blockIter, pReader->idStr);
    } else {
      // current file exhausted: move on to the blocks of the next file, if any
      code = initForFirstBlockInFile(pReader, pBlockIter);
      if ((code != TSDB_CODE_SUCCESS) || (pReader->status.loadFromFile == false)) {
        break;
      }

      pTableBlockInfo->numOfBlocks += pBlockIter->numOfBlocks;
      hasNext = (pBlockIter->numOfBlocks > 0);
    }
  }

  return code;
}
H
Hongze Cheng 已提交
4319

H
refact  
Hongze Cheng 已提交
4320
// Count the rows that the reader's tables hold in the memtables pinned by its read snapshot (pMem and pIMem).
//
// The checks use the snapshot's own pMem/pIMem, which are the pointers actually dereferenced. The live
// pTsdb->mem/imem are copied into the snapshot only when it is taken, so they can differ from it later; in
// particular the snapshot's pointer may be NULL while the live one is not.
int64_t tsdbGetNumOfRowsInMemTable(STsdbReader* pReader) {
  int64_t rows = 0;

  SReaderStatus* pStatus = &pReader->status;
  pStatus->pTableIter = taosHashIterate(pStatus->pTableMap, NULL);

  while (pStatus->pTableIter != NULL) {
    STableBlockScanInfo* pBlockScanInfo = *(STableBlockScanInfo**)pStatus->pTableIter;

    if (pReader->pReadSnap != NULL && pReader->pReadSnap->pMem != NULL) {
      STbData* d = tsdbGetTbDataFromMemTable(pReader->pReadSnap->pMem, pReader->suid, pBlockScanInfo->uid);
      if (d != NULL) {
        rows += tsdbGetNRowsInTbData(d);
      }
    }

    if (pReader->pReadSnap != NULL && pReader->pReadSnap->pIMem != NULL) {
      STbData* di = tsdbGetTbDataFromMemTable(pReader->pReadSnap->pIMem, pReader->suid, pBlockScanInfo->uid);
      if (di != NULL) {
        rows += tsdbGetNRowsInTbData(di);
      }
    }

    // current table is exhausted, let's try the next table
    pStatus->pTableIter = taosHashIterate(pStatus->pTableMap, pStatus->pTableIter);
  }

  return rows;
}
D
dapan1121 已提交
4351

L
Liu Jicong 已提交
4352
// Look up the schema of table `uid`.
//
// For a child table, *suid receives its super table uid and the schema version is read from the super table entry.
// For a normal table, *suid is 0 and the version comes from the table's own entry. *pSchema receives the result of
// metaGetTbTSchema for that version. When a meta entry cannot be read, terrno is set to
// TSDB_CODE_TDB_INVALID_TABLE_ID and that code is returned.
int32_t tsdbGetTableSchema(SVnode* pVnode, int64_t uid, STSchema** pSchema, int64_t* suid) {
  SMetaReader mr = {0};
  int32_t     sversion = 1;

  metaReaderInit(&mr, pVnode->pMeta, 0);

  if (metaGetTableEntryByUidCache(&mr, uid) != TSDB_CODE_SUCCESS) {
    goto _invalid_table;
  }

  *suid = 0;

  if (mr.me.type != TSDB_CHILD_TABLE) {
    ASSERT(mr.me.type == TSDB_NORMAL_TABLE);
    sversion = mr.me.ntbEntry.schemaRow.version;
  } else {
    // re-use the reader for the super table entry, which carries the schema version
    tDecoderClear(&mr.coder);
    *suid = mr.me.ctbEntry.suid;
    if (metaGetTableEntryByUidCache(&mr, *suid) != TSDB_CODE_SUCCESS) {
      goto _invalid_table;
    }
    sversion = mr.me.stbEntry.schemaRow.version;
  }

  metaReaderClear(&mr);
  *pSchema = metaGetTbTSchema(pVnode->pMeta, uid, sversion, 1);

  return TSDB_CODE_SUCCESS;

_invalid_table:
  terrno = TSDB_CODE_TDB_INVALID_TABLE_ID;
  metaReaderClear(&mr);
  return terrno;
}
H
Hongze Cheng 已提交
4386

H
Haojun Liao 已提交
4387
// Take a read snapshot of the tsdb.
// Under the tsdb read lock, the current mem/imem memtables are referenced and the file set is referenced via
// tsdbFSRef. On success *ppSnap owns those references and must be released with tsdbUntakeReadSnap.
// On failure every reference taken so far is dropped, the snapshot is freed, and *ppSnap is left NULL, so a later
// tsdbUntakeReadSnap on it is a no-op. Returns 0 or an error code.
int32_t tsdbTakeReadSnap(STsdb* pTsdb, STsdbReadSnap** ppSnap, const char* idStr) {
  int32_t        code = 0;
  STsdbReadSnap* pSnap = taosMemoryCalloc(1, sizeof(STsdbReadSnap));

  *ppSnap = NULL;
  if (pSnap == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  // lock
  code = taosThreadRwlockRdlock(&pTsdb->rwLock);
  if (code) {
    taosMemoryFree(pSnap);
    return TAOS_SYSTEM_ERROR(code);
  }

  // take snapshot
  pSnap->pMem = pTsdb->mem;
  pSnap->pIMem = pTsdb->imem;

  if (pSnap->pMem) {
    tsdbRefMemTable(pSnap->pMem);
  }

  if (pSnap->pIMem) {
    tsdbRefMemTable(pSnap->pIMem);
  }

  // fs
  code = tsdbFSRef(pTsdb, &pSnap->fs);
  if (code) {
    taosThreadRwlockUnlock(&pTsdb->rwLock);
    goto _release_mem;
  }

  // unlock
  code = taosThreadRwlockUnlock(&pTsdb->rwLock);
  if (code) {
    code = TAOS_SYSTEM_ERROR(code);
    tsdbFSUnref(pTsdb, &pSnap->fs);
    goto _release_mem;
  }

  *ppSnap = pSnap;
  tsdbTrace("vgId:%d, take read snapshot, %s", TD_VID(pTsdb->pVnode), idStr);
  return code;

_release_mem:
  // drop the memtable references taken above and discard the half-built snapshot
  if (pSnap->pMem) {
    tsdbUnrefMemTable(pSnap->pMem);
  }

  if (pSnap->pIMem) {
    tsdbUnrefMemTable(pSnap->pIMem);
  }

  taosMemoryFree(pSnap);
  return code;
}

H
Haojun Liao 已提交
4435
// Release a snapshot obtained from tsdbTakeReadSnap.
// Drops the memtable references and the file-set reference, then frees the snapshot. A NULL snapshot is accepted;
// the trace line is emitted in both cases.
void tsdbUntakeReadSnap(STsdb* pTsdb, STsdbReadSnap* pSnap, const char* idStr) {
  if (pSnap == NULL) {
    tsdbTrace("vgId:%d, untake read snapshot, %s", TD_VID(pTsdb->pVnode), idStr);
    return;
  }

  if (pSnap->pMem != NULL) {
    tsdbUnrefMemTable(pSnap->pMem);
  }
  if (pSnap->pIMem != NULL) {
    tsdbUnrefMemTable(pSnap->pIMem);
  }

  tsdbFSUnref(pTsdb, &pSnap->fs);
  taosMemoryFree(pSnap);

  tsdbTrace("vgId:%d, untake read snapshot, %s", TD_VID(pTsdb->pVnode), idStr);
}