tsdbRead.c 148.6 KB
Newer Older
H
hjxilinx 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

H
Haojun Liao 已提交
16
#include "osDef.h"
H
Hongze Cheng 已提交
17
#include "tsdb.h"
18

H
Hongze Cheng 已提交
19
// True when the traverse order `o` is ascending. The argument is parenthesized so that a
// compound expression (e.g. `a & b`) is compared as a whole instead of being split by precedence.
#define ASCENDING_TRAVERSE(o) ((o) == TSDB_ORDER_ASC)
H
Hongze Cheng 已提交
20

21 22 23 24 25 26
// Content kind tags.
// NOTE(review): presumably identifies whether rows come from before, inside, or after the main
// query range — confirm against the users of this enum.
// The values are consecutive integers (0x3 == 0x1 | 0x2), so they are not independent bit flags.
typedef enum {
  EXTERNAL_ROWS_PREV = 0x1,
  EXTERNAL_ROWS_MAIN = 0x2,
  EXTERNAL_ROWS_NEXT = 0x3,
} EContentData;

27
typedef struct {
dengyihao's avatar
dengyihao 已提交
28
  STbDataIter* iter;
29 30 31 32
  int32_t      index;
  bool         hasVal;
} SIterInfo;

33 34
// Pair of counters: number of blocks and number of last files.
typedef struct {
  int32_t numOfBlocks;
  int32_t numOfLastFiles;
} SBlockNumber;

38
typedef struct SBlockIndex {
39 40
  int32_t     ordinalIndex;
  int64_t     inFileOffset;
H
Haojun Liao 已提交
41
  STimeWindow window;  // todo replace it with overlap flag.
42 43
} SBlockIndex;

H
Haojun Liao 已提交
44
typedef struct STableBlockScanInfo {
dengyihao's avatar
dengyihao 已提交
45 46
  uint64_t  uid;
  TSKEY     lastKey;
H
Hongze Cheng 已提交
47
  SMapData  mapData;            // block info (compressed)
48
  SArray*   pBlockList;         // block data index list, SArray<SBlockIndex>
H
Hongze Cheng 已提交
49 50 51 52 53 54
  SIterInfo iter;               // mem buffer skip list iterator
  SIterInfo iiter;              // imem buffer skip list iterator
  SArray*   delSkyline;         // delete info for this table
  int32_t   fileDelIndex;       // file block delete index
  int32_t   lastBlockDelIndex;  // delete index for last block
  bool      iterInit;           // whether to initialize the in-memory skip list iterator or not
H
Haojun Liao 已提交
55 56 57
} STableBlockScanInfo;

// A (table uid, offset) pair; SBlockOrderSupporter holds arrays of these per table.
typedef struct SBlockOrderWrapper {
  int64_t uid;
  int64_t offset;
} SBlockOrderWrapper;
H
Hongze Cheng 已提交
61 62

typedef struct SBlockOrderSupporter {
63 64 65 66
  SBlockOrderWrapper** pDataBlockInfo;
  int32_t*             indexPerTable;
  int32_t*             numOfBlocksPerTable;
  int32_t              numOfTables;
H
Hongze Cheng 已提交
67 68 69
} SBlockOrderSupporter;

// Counters and elapsed-time accumulators describing the I/O cost of one reader.
typedef struct SIOCostSummary {
  int64_t numOfBlocks;
  double  blockLoadTime;
  double  buildmemBlock;
  int64_t headFileLoad;      // incremented once per opened file set
  double  headFileLoadTime;  // accumulated in milliseconds
  int64_t smaDataLoad;
  double  smaLoadTime;
  int64_t lastBlockLoad;
  double  lastBlockLoadTime;
  int64_t composedBlocks;
  double  buildComposedBlockTime;
  double  createScanInfoList;  // milliseconds spent building the table scan-info map
} SIOCostSummary;

typedef struct SBlockLoadSuppInfo {
85
  SArray*          pColAgg;
86
  SColumnDataAgg   tsColAgg;
H
Haojun Liao 已提交
87 88
  int16_t*         colId;
  int16_t*         slotId;
89
  int32_t          numOfCols;
90
  char**           buildBuf;  // build string tmp buffer, todo remove it later after all string format being updated.
91
  bool             smaValid;  // the sma on all queried columns are activated
H
Hongze Cheng 已提交
92 93
} SBlockLoadSuppInfo;

94
typedef struct SLastBlockReader {
H
Hongze Cheng 已提交
95 96 97 98 99
  STimeWindow        window;
  SVersionRange      verRange;
  int32_t            order;
  uint64_t           uid;
  SMergeTree         mergeTree;
100
  SSttBlockLoadInfo* pInfo;
101 102
} SLastBlockReader;

103
typedef struct SFilesetIter {
H
Hongze Cheng 已提交
104 105 106
  int32_t           numOfFiles;  // number of total files
  int32_t           index;       // current accessed index in the list
  SArray*           pFileList;   // data file list
107
  int32_t           order;
H
Hongze Cheng 已提交
108
  SLastBlockReader* pLastBlockReader;  // last file block reader
109
} SFilesetIter;
H
Haojun Liao 已提交
110 111

// Identifies one file data block by its owning table and its position in that table's block list.
typedef struct SFileDataBlockInfo {
  uint64_t uid;
  // index position in STableBlockScanInfo in order to check whether neighbor block overlaps with it
  int32_t  tbBlockIdx;
} SFileDataBlockInfo;

typedef struct SDataBlockIter {
118
  int32_t   numOfBlocks;
119
  int32_t   index;
H
Hongze Cheng 已提交
120
  SArray*   blockList;  // SArray<SFileDataBlockInfo>
121
  int32_t   order;
122
  SDataBlk  block;  // current SDataBlk data
123
  SHashObj* pTableMap;
H
Haojun Liao 已提交
124 125 126
} SDataBlockIter;

// Progress of dumping the rows of the current file block.
typedef struct SFileBlockDumpInfo {
  int32_t totalRows;
  int32_t rowIndex;
  int64_t lastKey;
  bool    allDumped;
} SFileBlockDumpInfo;

133
// Cursor over the table uid list, which is visited in ascending uid order.
typedef struct SUidOrderCheckInfo {
  uint64_t* tableUidList;  // access table uid list in uid ascending order list
  int32_t   currentIndex;  // index in table uid list
} SUidOrderCheckInfo;

H
Haojun Liao 已提交
138
typedef struct SReaderStatus {
H
Hongze Cheng 已提交
139 140 141
  bool                  loadFromFile;       // check file stage
  bool                  composedDataBlock;  // the returned data block is a composed block or not
  SHashObj*             pTableMap;          // SHash<STableBlockScanInfo>
142
  STableBlockScanInfo** pTableIter;         // table iterator used in building in-memory buffer data blocks.
H
Hongze Cheng 已提交
143 144 145 146 147 148
  SUidOrderCheckInfo    uidCheckInfo;       // check all table in uid order
  SFileBlockDumpInfo    fBlockDumpInfo;
  SDFileSet*            pCurrentFileset;  // current opened file set
  SBlockData            fileBlockData;
  SFilesetIter          fileIter;
  SDataBlockIter        blockIter;
H
Haojun Liao 已提交
149 150
} SReaderStatus;

151
typedef struct SBlockInfoBuf {
H
Hongze Cheng 已提交
152 153 154
  int32_t currentIndex;
  SArray* pData;
  int32_t numPerBucket;
155 156
} SBlockInfoBuf;

H
Hongze Cheng 已提交
157
struct STsdbReader {
H
Haojun Liao 已提交
158 159 160
  STsdb*             pTsdb;
  uint64_t           suid;
  int16_t            order;
H
Haojun Liao 已提交
161
  bool               freeBlock;
H
Haojun Liao 已提交
162 163 164 165
  STimeWindow        window;  // the primary query time window that applies to all queries
  SSDataBlock*       pResBlock;
  int32_t            capacity;
  SReaderStatus      status;
166 167
  char*              idStr;  // query info handle, for debug purpose
  int32_t            type;   // query type: 1. retrieve all data blocks, 2. retrieve direct prev|next rows
H
Hongze Cheng 已提交
168
  SBlockLoadSuppInfo suppInfo;
H
Hongze Cheng 已提交
169
  STsdbReadSnap*     pReadSnap;
170
  SIOCostSummary     cost;
171 172
  STSchema*          pSchema;     // the newest version schema
  STSchema*          pMemSchema;  // the previous schema for in-memory data, to avoid load schema too many times
173 174 175
  SDataFReader*      pFileReader; // the file reader
  SDelFReader*       pDelFReader; // the del file reader
  SArray*            pDelIdx;     // del file block index;
176
  SVersionRange      verRange;
177 178 179
  SBlockInfoBuf      blockInfoBuf;
  int32_t            step;
  STsdbReader*       innerReader[2];
H
Hongze Cheng 已提交
180
};
H
Hongze Cheng 已提交
181

H
Haojun Liao 已提交
182
static SFileDataBlockInfo* getCurrentBlockInfo(SDataBlockIter* pBlockIter);
183 184
static int      buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, int64_t endKey, int32_t capacity,
                                          STsdbReader* pReader);
185
static TSDBROW* getValidMemRow(SIterInfo* pIter, const SArray* pDelList, STsdbReader* pReader);
186 187
static int32_t  doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pScanInfo, STsdbReader* pReader,
                                        SRowMerger* pMerger);
H
Hongze Cheng 已提交
188
static int32_t  doMergeRowsInLastBlock(SLastBlockReader* pLastBlockReader, STableBlockScanInfo* pScanInfo, int64_t ts,
189
                                       SRowMerger* pMerger, SVersionRange* pVerRange);
190
static int32_t  doMergeRowsInBuf(SIterInfo* pIter, uint64_t uid, int64_t ts, SArray* pDelList, SRowMerger* pMerger,
dengyihao's avatar
dengyihao 已提交
191
                                 STsdbReader* pReader);
H
Hongze Cheng 已提交
192 193
static int32_t  doAppendRowFromTSRow(SSDataBlock* pBlock, STsdbReader* pReader, STSRow* pTSRow,
                                     STableBlockScanInfo* pInfo);
194
static int32_t  doAppendRowFromFileBlock(SSDataBlock* pResBlock, STsdbReader* pReader, SBlockData* pBlockData,
H
Hongze Cheng 已提交
195
                                         int32_t rowIndex);
196
static void     setComposedBlockFlag(STsdbReader* pReader, bool composed);
H
Hongze Cheng 已提交
197 198
static bool     hasBeenDropped(const SArray* pDelList, int32_t* index, TSDBKEY* pKey, int32_t order,
                               SVersionRange* pVerRange);
199

H
Hongze Cheng 已提交
200 201 202 203
static int32_t doMergeMemTableMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter, SArray* pDelList,
                                        STSRow** pTSRow, STsdbReader* pReader, bool* freeTSRow);
static int32_t doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo* pBlockScanInfo,
                                  STsdbReader* pReader, STSRow** pTSRow);
204 205
static int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pBlockScanInfo, int64_t key,
                                     STsdbReader* pReader);
206

dengyihao's avatar
dengyihao 已提交
207 208 209 210
static int32_t initDelSkylineIterator(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader, STbData* pMemTbData,
                                      STbData* piMemTbData);
static STsdb*  getTsdbByRetentions(SVnode* pVnode, TSKEY winSKey, SRetention* retentions, const char* idstr,
                                   int8_t* pLevel);
211
static SVersionRange getQueryVerRange(SVnode* pVnode, SQueryTableDataCond* pCond, int8_t level);
H
Hongze Cheng 已提交
212 213 214
static int64_t       getCurrentKeyInLastBlock(SLastBlockReader* pLastBlockReader);
static bool          hasDataInLastBlock(SLastBlockReader* pLastBlockReader);
static int32_t       doBuildDataBlock(STsdbReader* pReader);
H
Haojun Liao 已提交
215
static TSDBKEY       getCurrentKeyInBuf(STableBlockScanInfo* pScanInfo, STsdbReader* pReader);
216
static bool          hasDataInFileBlock(const SBlockData* pBlockData, const SFileBlockDumpInfo* pDumpInfo);
217
static void          initBlockDumpInfo(STsdbReader* pReader, SDataBlockIter* pBlockIter);
H
Haojun Liao 已提交
218

219 220
// Returns true when ts lies outside the closed interval [pWindow->skey, pWindow->ekey].
static bool outOfTimeWindow(int64_t ts, STimeWindow* pWindow) {
  if (ts > pWindow->ekey) {
    return true;
  }

  return ts < pWindow->skey;
}

H
Haojun Liao 已提交
221
// Record the queried column ids and output slot ids in pSupInfo, and allocate one scratch buffer
// for every variable-length column.
//
// colId, slotId and buildBuf share ONE allocation whose base address is pSupInfo->colId:
//   [ colId: int16_t * n ][ slotId: int16_t * n ][ padding ][ buildBuf: char* * n ]
// The padding keeps buildBuf aligned for pointer access even when n is odd.
//
// Returns TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY when an allocation fails. On a failure
// after the shared allocation succeeded, every buildBuf entry is either a valid buffer or NULL.
static int32_t setColumnIdSlotList(SBlockLoadSuppInfo* pSupInfo, SColumnInfo* pCols, const int32_t* pSlotIdList, int32_t numOfCols) {
  pSupInfo->smaValid = true;

  const size_t idBytes = sizeof(int16_t) * (size_t)numOfCols;

  // round the offset of the pointer array up to a multiple of the pointer size
  size_t       bufOffset = 2 * idBytes;
  const size_t misalign = bufOffset % sizeof(char*);
  if (misalign != 0) {
    bufOffset += sizeof(char*) - misalign;
  }

  pSupInfo->colId = taosMemoryMalloc(bufOffset + sizeof(char*) * (size_t)numOfCols);
  if (pSupInfo->colId == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  // publish the column count only once the arrays it describes exist
  pSupInfo->numOfCols = numOfCols;
  pSupInfo->slotId = (int16_t*)((char*)pSupInfo->colId + idBytes);
  pSupInfo->buildBuf = (char**)((char*)pSupInfo->colId + bufOffset);

  // fill every entry first, so that the arrays are fully initialized even if an allocation below fails
  for (int32_t i = 0; i < numOfCols; ++i) {
    pSupInfo->colId[i] = pCols[i].colId;
    pSupInfo->slotId[i] = pSlotIdList[i];
    pSupInfo->buildBuf[i] = NULL;
  }

  for (int32_t i = 0; i < numOfCols; ++i) {
    if (!IS_VAR_DATA_TYPE(pCols[i].type)) {
      continue;
    }

    pSupInfo->buildBuf[i] = taosMemoryMalloc(pCols[i].bytes);
    if (pSupInfo->buildBuf[i] == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }
  }

  return TSDB_CODE_SUCCESS;
}
H
Hongze Cheng 已提交
245

246 247 248 249 250
// Walk the schema columns and the queried column ids in parallel (both are expected to be in
// ascending column-id order) and clear pSupInfo->smaValid as soon as a queried column has its
// block-SMA switched off.
static void updateBlockSMAInfo(STSchema* pSchema, SBlockLoadSuppInfo* pSupInfo) {
  int32_t i = 0, j = 0;

  while (i < pSchema->numOfCols && j < pSupInfo->numOfCols) {
    STColumn* pTCol = &pSchema->columns[i];
    if (pTCol->colId == pSupInfo->colId[j]) {
      if (!IS_BSMA_ON(pTCol)) {
        pSupInfo->smaValid = false;
        return;
      }

      i += 1;
      j += 1;
    } else if (pTCol->colId < pSupInfo->colId[j]) {
      // schema column that is not queried, skip it
      i += 1;
    } else {
      // a queried column id that the schema does not contain: neither cursor would advance here,
      // so bail out instead of spinning forever when ASSERT is compiled out.
      ASSERT(0);
      pSupInfo->smaValid = false;
      return;
    }
  }
}

268
static int32_t initBlockScanInfoBuf(SBlockInfoBuf* pBuf, int32_t numOfTables) {
H
Hongze Cheng 已提交
269
  int32_t num = numOfTables / pBuf->numPerBucket;
270 271 272 273 274
  int32_t remainder = numOfTables % pBuf->numPerBucket;
  if (pBuf->pData == NULL) {
    pBuf->pData = taosArrayInit(num + 1, POINTER_BYTES);
  }

H
Hongze Cheng 已提交
275
  for (int32_t i = 0; i < num; ++i) {
276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296
    char* p = taosMemoryCalloc(pBuf->numPerBucket, sizeof(STableBlockScanInfo));
    if (p == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }

    taosArrayPush(pBuf->pData, &p);
  }

  if (remainder > 0) {
    char* p = taosMemoryCalloc(remainder, sizeof(STableBlockScanInfo));
    if (p == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }
    taosArrayPush(pBuf->pData, &p);
  }

  return TSDB_CODE_SUCCESS;
}

// Release every bucket held by pBuf and the bucket list itself.
// pBuf->pData is set to the value returned by taosArrayDestroy() (the same idiom this file uses
// for delSkyline/pBlockList), so a later initBlockScanInfoBuf() never sees a dangling list.
static void clearBlockScanInfoBuf(SBlockInfoBuf* pBuf) {
  size_t num = taosArrayGetSize(pBuf->pData);
  for (size_t i = 0; i < num; ++i) {
    char** p = taosArrayGet(pBuf->pData, i);
    taosMemoryFree(*p);
  }

  pBuf->pData = taosArrayDestroy(pBuf->pData);
}

// Return the address of the index-th STableBlockScanInfo slot in pBuf: the bucket is selected by
// index / numPerBucket and the slot inside the bucket by index % numPerBucket.
// The caller must pass an index that initBlockScanInfoBuf() has prepared room for; no bounds
// check is performed here.
static void* getPosInBlockInfoBuf(SBlockInfoBuf* pBuf, int32_t index) {
  int32_t bucketIndex = index / pBuf->numPerBucket;
  char**  pBucket = taosArrayGet(pBuf->pData, bucketIndex);
  return (*pBucket) + (index % pBuf->numPerBucket) * sizeof(STableBlockScanInfo);
}

// NOTE: speed up the whole process by preparing the buffers for STableBlockScanInfo in batches
H
Haojun Liao 已提交
312
// Build the uid -> STableBlockScanInfo* map for the numOfTables tables in idList.
// The scan-info entries live in pBuf (allocated in batches); the map stores pointers to them.
// Each entry's lastKey is initialized to just outside the query window on the side the scan
// starts from (skey - 1 for ascending order, ekey + 1 for descending order), without overflowing.
//
// Returns the new map, or NULL on failure. When the scan-info buffer cannot be allocated,
// terrno carries the error code; buckets already placed in pBuf stay owned by pBuf.
static SHashObj* createDataBlockScanInfo(STsdbReader* pTsdbReader, SBlockInfoBuf* pBuf, const STableKeyInfo* idList, int32_t numOfTables) {
  // allocate buffer in order to load data blocks from file
  // todo use simple hash instead, optimize the memory consumption
  SHashObj* pTableMap =
      taosHashInit(numOfTables, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
  if (pTableMap == NULL) {
    return NULL;
  }

  int64_t st = taosGetTimestampUs();

  // without the buckets, getPosInBlockInfoBuf() below would dereference memory that does not exist
  int32_t code = initBlockScanInfoBuf(pBuf, numOfTables);
  if (code != TSDB_CODE_SUCCESS) {
    taosHashCleanup(pTableMap);
    terrno = code;
    return NULL;
  }

  for (int32_t j = 0; j < numOfTables; ++j) {
    STableBlockScanInfo* pScanInfo = getPosInBlockInfoBuf(pBuf, j);
    pScanInfo->uid = idList[j].uid;
    if (ASCENDING_TRAVERSE(pTsdbReader->order)) {
      int64_t skey = pTsdbReader->window.skey;
      pScanInfo->lastKey = (skey > INT64_MIN) ? (skey - 1) : skey;
    } else {
      int64_t ekey = pTsdbReader->window.ekey;
      pScanInfo->lastKey = (ekey < INT64_MAX) ? (ekey + 1) : ekey;
    }

    taosHashPut(pTableMap, &pScanInfo->uid, sizeof(uint64_t), &pScanInfo, POINTER_BYTES);
    tsdbTrace("%p check table uid:%" PRId64 " from lastKey:%" PRId64 " %s", pTsdbReader, pScanInfo->uid,
              pScanInfo->lastKey, pTsdbReader->idStr);
  }

  pTsdbReader->cost.createScanInfoList = (taosGetTimestampUs() - st) / 1000.0;
  tsdbDebug("%p create %d tables scan-info, size:%.2f Kb, elapsed time:%.2f ms, %s", pTsdbReader, numOfTables,
            (sizeof(STableBlockScanInfo) * numOfTables) / 1024.0, pTsdbReader->cost.createScanInfoList,
            pTsdbReader->idStr);

  return pTableMap;
}
H
Hongze Cheng 已提交
347

348 349
// Reset the in-memory iteration state of every table in pTableMap and restart each table from
// ts: both buffer iterators are destroyed, the delete skyline is dropped and lastKey is set to ts.
// The file block list and block map of each table are left untouched.
static void resetAllDataBlockScanInfo(SHashObj* pTableMap, int64_t ts) {
  STableBlockScanInfo** p = NULL;
  while ((p = taosHashIterate(pTableMap, p)) != NULL) {
    STableBlockScanInfo* pInfo = *(STableBlockScanInfo**)p;

    pInfo->iterInit = false;
    pInfo->iter.hasVal = false;
    pInfo->iiter.hasVal = false;

    if (pInfo->iter.iter != NULL) {
      pInfo->iter.iter = tsdbTbDataIterDestroy(pInfo->iter.iter);
    }

    if (pInfo->iiter.iter != NULL) {
      pInfo->iiter.iter = tsdbTbDataIterDestroy(pInfo->iiter.iter);
    }

    pInfo->delSkyline = taosArrayDestroy(pInfo->delSkyline);
    pInfo->lastKey = ts;
  }
}

370 371
// Release everything owned by one table scan info: both buffer iterators, the delete skyline,
// the block index list and the block map. The STableBlockScanInfo itself is not freed here.
static void clearBlockScanInfo(STableBlockScanInfo* p) {
  p->iterInit = false;

  p->iter.hasVal = false;
  p->iiter.hasVal = false;

  if (p->iter.iter != NULL) {
    p->iter.iter = tsdbTbDataIterDestroy(p->iter.iter);
  }

  if (p->iiter.iter != NULL) {
    p->iiter.iter = tsdbTbDataIterDestroy(p->iiter.iter);
  }

  p->delSkyline = taosArrayDestroy(p->delSkyline);
  p->pBlockList = taosArrayDestroy(p->pBlockList);
  tMapDataClear(&p->mapData);
}
388

H
Haojun Liao 已提交
389
// Clear every table scan info referenced by pTableMap, then destroy the map itself.
// The map values are STableBlockScanInfo* pointers; the memory they point to is not freed here.
static void destroyAllBlockScanInfo(SHashObj* pTableMap) {
  void* p = NULL;
  while ((p = taosHashIterate(pTableMap, p)) != NULL) {
    clearBlockScanInfo(*(STableBlockScanInfo**)p);
  }

  taosHashCleanup(pTableMap);
}

398
// A query time window is empty when its start key is greater than its end key.
static bool isEmptyQueryTimeWindow(STimeWindow* pWindow) {
  ASSERT(pWindow != NULL);
  return pWindow->skey > pWindow->ekey;
}
H
Hongze Cheng 已提交
402

403 404 405
// Clamp the query time window according to the data retention (keep) configuration, so that
// expired data is never returned to the client even if it has not been removed yet.
// Returns a copy of *pWindow whose skey is raised to the earliest timestamp that is still kept;
// ekey is left unchanged, and *pWindow itself is not modified.
static STimeWindow updateQueryTimeWindow(STsdb* pTsdb, STimeWindow* pWindow) {
  STsdbKeepCfg* pCfg = &pTsdb->keepCfg;

  int64_t now = taosGetTimestamp(pCfg->precision);
  int64_t earliestTs = now - (tsTickPerMin[pCfg->precision] * pCfg->keep2) + 1;  // needs to add one tick

  STimeWindow win = *pWindow;
  if (win.skey < earliestTs) {
    win.skey = earliestTs;
  }

  return win;
}
H
Hongze Cheng 已提交
418

H
Haojun Liao 已提交
419
// Shrink *capacity (a row count) so that one output block of the queried columns stays within 2MB.
// The row length is the sum of the byte widths of all queried columns; *capacity is only ever
// reduced, never increased.
static void limitOutputBufferSize(const SQueryTableDataCond* pCond, int32_t* capacity) {
  // 64-bit arithmetic: capacity * rowLen may exceed the int32_t range for wide rows
  int64_t rowLen = 0;
  for (int32_t i = 0; i < pCond->numOfCols; ++i) {
    rowLen += pCond->colList[i].bytes;
  }

  // make sure the output SSDataBlock size be less than 2MB.
  const int64_t TWOMB = 2 * 1024 * 1024;
  if (rowLen > 0 && (int64_t)(*capacity) * rowLen > TWOMB) {
    (*capacity) = (int32_t)(TWOMB / rowLen);
  }
}

// init file iterator
433
// Initialize (or re-initialize) the file set iterator over aDFileSet in the reader's traverse order.
// The cursor starts one step before the first file set to visit: -1 for ascending order, the
// number of file sets for descending order. The last-block reader and its load info are created
// on first use and reused afterwards.
//
// Returns TSDB_CODE_SUCCESS, TSDB_CODE_OUT_OF_MEMORY, or terrno when the last-block load info
// cannot be created.
static int32_t initFilesetIterator(SFilesetIter* pIter, SArray* aDFileSet, STsdbReader* pReader) {
  int32_t numOfFileset = (int32_t)taosArrayGetSize(aDFileSet);

  pIter->index = ASCENDING_TRAVERSE(pReader->order) ? -1 : numOfFileset;
  pIter->order = pReader->order;
  pIter->pFileList = aDFileSet;
  pIter->numOfFiles = numOfFileset;

  if (pIter->pLastBlockReader == NULL) {
    pIter->pLastBlockReader = taosMemoryCalloc(1, sizeof(struct SLastBlockReader));
    if (pIter->pLastBlockReader == NULL) {
      int32_t code = TSDB_CODE_OUT_OF_MEMORY;
      tsdbError("failed to prepare the last block iterator, since:%s %s", tstrerror(code), pReader->idStr);
      return code;
    }
  }

  SLastBlockReader* pLReader = pIter->pLastBlockReader;
  pLReader->order = pReader->order;
  pLReader->window = pReader->window;
  pLReader->verRange = pReader->verRange;

  pLReader->uid = 0;
  tMergeTreeClose(&pLReader->mergeTree);

  if (pLReader->pInfo == NULL) {
    // here we ignore the first column, which is always the primary timestamp column
    pLReader->pInfo =
        tCreateLastBlockLoadInfo(pReader->pSchema, &pReader->suppInfo.colId[1], pReader->suppInfo.numOfCols - 1);
    if (pLReader->pInfo == NULL) {
      tsdbDebug("init fileset iterator failed, code:%s %s", tstrerror(terrno), pReader->idStr);
      return terrno;
    }
  }

  tsdbDebug("init fileset iterator, total files:%d %s", pIter->numOfFiles, pReader->idStr);
  return TSDB_CODE_SUCCESS;
}

472
// Advance to the next file set that overlaps the query time window and open its file reader.
// Before moving on, the load statistics of the last-block reader are folded into the reader's
// cost summary and the last-block reader is reset.
//
// Returns true when a qualifying file set has been opened (pReader->pFileReader and
// pReader->status.pCurrentFileset refer to it). Returns false when the list is exhausted, when
// the remaining file sets are all outside the query window, or when a file set cannot be opened.
// NOTE(review): the bool return cannot distinguish "no more files" from an open failure; the
// failure is logged here because it would otherwise be lost.
static bool filesetIteratorNext(SFilesetIter* pIter, STsdbReader* pReader) {
  bool    asc = ASCENDING_TRAVERSE(pIter->order);
  int32_t step = asc ? 1 : -1;
  pIter->index += step;

  if ((asc && pIter->index >= pIter->numOfFiles) || ((!asc) && pIter->index < 0)) {
    return false;
  }

  SIOCostSummary* pSum = &pReader->cost;
  getLastBlockLoadInfo(pIter->pLastBlockReader->pInfo, &pSum->lastBlockLoad, &pReader->cost.lastBlockLoadTime);

  pIter->pLastBlockReader->uid = 0;
  tMergeTreeClose(&pIter->pLastBlockReader->mergeTree);
  resetLastBlockLoadInfo(pIter->pLastBlockReader->pInfo);

  // check file the time range of coverage
  STimeWindow win = {0};

  while (1) {
    if (pReader->pFileReader != NULL) {
      tsdbDataFReaderClose(&pReader->pFileReader);
    }

    pReader->status.pCurrentFileset = (SDFileSet*)taosArrayGet(pIter->pFileList, pIter->index);

    int32_t code = tsdbDataFReaderOpen(&pReader->pFileReader, pReader->pTsdb, pReader->status.pCurrentFileset);
    if (code != TSDB_CODE_SUCCESS) {
      tsdbError("%p failed to open file set fid:%d, since:%s %s", pReader, pReader->status.pCurrentFileset->fid,
                tstrerror(code), pReader->idStr);
      return false;
    }

    pReader->cost.headFileLoad += 1;

    int32_t fid = pReader->status.pCurrentFileset->fid;
    tsdbFidKeyRange(fid, pReader->pTsdb->keepCfg.days, pReader->pTsdb->keepCfg.precision, &win.skey, &win.ekey);

    // current file are no longer overlapped with query time window, ignore remain files
    if ((asc && win.skey > pReader->window.ekey) || (!asc && win.ekey < pReader->window.skey)) {
      tsdbDebug("%p remain files are not qualified for qrange:%" PRId64 "-%" PRId64 ", ignore, %s", pReader,
                pReader->window.skey, pReader->window.ekey, pReader->idStr);
      return false;
    }

    // the file set lies entirely before the window in scan direction: skip it and try the next one
    if ((asc && (win.ekey < pReader->window.skey)) || ((!asc) && (win.skey > pReader->window.ekey))) {
      pIter->index += step;
      if ((asc && pIter->index >= pIter->numOfFiles) || ((!asc) && pIter->index < 0)) {
        return false;
      }
      continue;
    }

    tsdbDebug("%p file found fid:%d for qrange:%" PRId64 "-%" PRId64 ", %s", pReader, fid, pReader->window.skey,
              pReader->window.ekey, pReader->idStr);
    return true;
  }
}

532
// Reset the data block iterator for a new pass in the given order: the cursor goes back to -1,
// the block count to 0, and the block list is emptied (or created if it does not exist yet).
static void resetDataBlockIterator(SDataBlockIter* pIter, int32_t order) {
  pIter->order = order;
  pIter->index = -1;
  pIter->numOfBlocks = 0;
  if (pIter->blockList == NULL) {
    pIter->blockList = taosArrayInit(4, sizeof(SFileDataBlockInfo));
  } else {
    taosArrayClear(pIter->blockList);
  }
}

L
Liu Jicong 已提交
543
// Destroy the iterator's block list. The pointer is overwritten with the value returned by
// taosArrayDestroy() (the idiom used elsewhere in this file), so that a later
// resetDataBlockIterator() creates a fresh list instead of clearing a freed one.
static void cleanupDataBlockIterator(SDataBlockIter* pIter) { pIter->blockList = taosArrayDestroy(pIter->blockList); }
H
Haojun Liao 已提交
544

H
Haojun Liao 已提交
545
// Put the reader status into its initial state: no table iterator yet, and loading from file enabled.
static void initReaderStatus(SReaderStatus* pStatus) {
  pStatus->pTableIter = NULL;
  pStatus->loadFromFile = true;
}

550 551 552 553 554 555 556 557
// Create the result data block with one column per queried column and room for `capacity` rows.
// Returns the new block, or NULL with terrno set when the block or its buffers cannot be allocated.
static SSDataBlock* createResBlock(SQueryTableDataCond* pCond, int32_t capacity) {
  SSDataBlock* pResBlock = createDataBlock();
  if (pResBlock == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  for (int32_t i = 0; i < pCond->numOfCols; ++i) {
    SColumnInfoData colInfo = {0};
    colInfo.info = pCond->colList[i];
    blockDataAppendColInfo(pResBlock, &colInfo);
  }

  int32_t code = blockDataEnsureCapacity(pResBlock, capacity);
  if (code != TSDB_CODE_SUCCESS) {
    terrno = code;
    // destroy the whole block: freeing only the block shell would leak the column array appended above
    blockDataDestroy(pResBlock);
    return NULL;
  }
  return pResBlock;
}

572
static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, STsdbReader** ppReader, int32_t capacity,
H
Haojun Liao 已提交
573
                                SSDataBlock* pResBlock, const char* idstr) {
H
Haojun Liao 已提交
574
  int32_t      code = 0;
575
  int8_t       level = 0;
H
Haojun Liao 已提交
576
  STsdbReader* pReader = (STsdbReader*)taosMemoryCalloc(1, sizeof(*pReader));
H
Hongze Cheng 已提交
577 578
  if (pReader == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
H
Haojun Liao 已提交
579
    goto _end;
H
Hongze Cheng 已提交
580 581
  }

C
Cary Xu 已提交
582 583 584 585
  if (VND_IS_TSMA(pVnode)) {
    tsdbDebug("vgId:%d, tsma is selected to query", TD_VID(pVnode));
  }

H
Haojun Liao 已提交
586
  initReaderStatus(&pReader->status);
587

L
Liu Jicong 已提交
588
  pReader->pTsdb = getTsdbByRetentions(pVnode, pCond->twindows.skey, pVnode->config.tsdbCfg.retentions, idstr, &level);
dengyihao's avatar
dengyihao 已提交
589 590
  pReader->suid = pCond->suid;
  pReader->order = pCond->order;
591
  pReader->capacity = capacity;
H
Haojun Liao 已提交
592
  pReader->pResBlock = pResBlock;
dengyihao's avatar
dengyihao 已提交
593 594
  pReader->idStr = (idstr != NULL) ? strdup(idstr) : NULL;
  pReader->verRange = getQueryVerRange(pVnode, pCond, level);
595
  pReader->type = pCond->type;
596
  pReader->window = updateQueryTimeWindow(pReader->pTsdb, &pCond->twindows);
H
Hongze Cheng 已提交
597
  pReader->blockInfoBuf.numPerBucket = 1000;  // 1000 tables per bucket
598
  ASSERT(pCond->numOfCols > 0);
H
Hongze Cheng 已提交
599

H
Haojun Liao 已提交
600 601 602 603 604 605 606 607 608
  if (pReader->pResBlock == NULL) {
    pReader->freeBlock = true;
    pReader->pResBlock = createResBlock(pCond, pReader->capacity);
    if (pReader->pResBlock == NULL) {
      code = terrno;
      goto _end;
    }
  }

H
Haojun Liao 已提交
609
  // todo refactor.
610
  limitOutputBufferSize(pCond, &pReader->capacity);
611

612 613
  // allocate buffer in order to load data blocks from file
  SBlockLoadSuppInfo* pSup = &pReader->suppInfo;
614
  pSup->pColAgg = taosArrayInit(pCond->numOfCols, sizeof(SColumnDataAgg));
H
Haojun Liao 已提交
615
  if (pSup->pColAgg == NULL) {
616 617 618
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _end;
  }
H
Haojun Liao 已提交
619

620 621
  pSup->tsColAgg.colId = PRIMARYKEY_TIMESTAMP_COL_ID;

H
Hongze Cheng 已提交
622
  code = tBlockDataCreate(&pReader->status.fileBlockData);
H
Haojun Liao 已提交
623 624 625 626 627
  if (code != TSDB_CODE_SUCCESS) {
    terrno = code;
    goto _end;
  }

H
Haojun Liao 已提交
628
  setColumnIdSlotList(&pReader->suppInfo, pCond->colList, pCond->pSlotList, pCond->numOfCols);
629

H
Hongze Cheng 已提交
630 631
  *ppReader = pReader;
  return code;
H
Hongze Cheng 已提交
632

H
Haojun Liao 已提交
633
_end:
H
Haojun Liao 已提交
634
  tsdbReaderClose(pReader);
H
Hongze Cheng 已提交
635 636 637
  *ppReader = NULL;
  return code;
}
H
Hongze Cheng 已提交
638

H
Haojun Liao 已提交
639
// Read the block index of the current file set and append to pIndexList the SBlockIdx entries
// that belong to the queried super table (suid) and to a table present in the reader's table map.
// A block index list is created for each such table that does not have one yet.
//
// Returns TSDB_CODE_SUCCESS, the error reported by tsdbReadBlockIdx(), or
// TSDB_CODE_OUT_OF_MEMORY when a list cannot be created or extended.
static int32_t doLoadBlockIndex(STsdbReader* pReader, SDataFReader* pFileReader, SArray* pIndexList) {
  SArray* aBlockIdx = taosArrayInit(8, sizeof(SBlockIdx));
  if (aBlockIdx == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  int64_t st = taosGetTimestampUs();
  int32_t code = tsdbReadBlockIdx(pFileReader, aBlockIdx);
  if (code != TSDB_CODE_SUCCESS) {
    goto _end;
  }

  size_t num = taosArrayGetSize(aBlockIdx);
  if (num == 0) {
    // nothing to select from; code is TSDB_CODE_SUCCESS here
    goto _end;
  }

  int64_t et1 = taosGetTimestampUs();

  SBlockIdx* pBlockIdx = NULL;
  for (size_t i = 0; i < num; ++i) {
    pBlockIdx = (SBlockIdx*)taosArrayGet(aBlockIdx, i);

    // uid check
    if (pBlockIdx->suid != pReader->suid) {
      continue;
    }

    // this block belongs to a table that is not queried.
    void* p = taosHashGet(pReader->status.pTableMap, &pBlockIdx->uid, sizeof(uint64_t));
    if (p == NULL) {
      continue;
    }

    STableBlockScanInfo* pScanInfo = *(STableBlockScanInfo**)p;
    if (pScanInfo->pBlockList == NULL) {
      pScanInfo->pBlockList = taosArrayInit(4, sizeof(SBlockIndex));
      if (pScanInfo->pBlockList == NULL) {
        code = TSDB_CODE_OUT_OF_MEMORY;
        goto _end;
      }
    }

    if (taosArrayPush(pIndexList, pBlockIdx) == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto _end;
    }
  }

  int64_t et2 = taosGetTimestampUs();
  tsdbDebug("load block index for %d tables completed, elapsed time:%.2f ms, set blockIdx:%.2f ms, size:%.2f Kb %s",
            (int32_t)num, (et1 - st) / 1000.0, (et2 - et1) / 1000.0, num * sizeof(SBlockIdx) / 1024.0, pReader->idStr);

  pReader->cost.headFileLoadTime += (et1 - st) / 1000.0;

_end:
  taosArrayDestroy(aBlockIdx);
  return code;
}
H
Hongze Cheng 已提交
689

690
// Drop the per-file block information (block map and block index list) of every table in
// pTableMap, so that the next file set starts from a clean state. The containers are emptied,
// not destroyed.
static void cleanupTableScanInfo(SHashObj* pTableMap) {
  STableBlockScanInfo** px = NULL;
  while (1) {
    px = taosHashIterate(pTableMap, px);
    if (px == NULL) {
      break;
    }

    // reset the block info of this table when handling a new file
    tMapDataClear(&(*px)->mapData);
    taosArrayClear((*px)->pBlockList);
  }
}

704
/**
 * Read the data-block descriptors of every table in pIndexList from the current data file
 * and record, per table, the blocks that overlap the reader's time window and version range.
 *
 * @param pReader    reader providing the file reader, query window, version range and table map
 * @param pIndexList SArray<SBlockIdx>; the uid of each entry must exist in
 *                   pReader->status.pTableMap (the lookup result is dereferenced unchecked)
 * @param pBlockNum  numOfBlocks is incremented once per qualified block; numOfLastFiles is set
 *                   from pReader->pFileReader->pSet->nSttF
 * @return TSDB_CODE_SUCCESS on success, the code returned by tsdbReadDataBlk when the block
 *         descriptors cannot be read, or TSDB_CODE_OUT_OF_MEMORY when a block index cannot be
 *         appended to the table's block list
 */
static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, SBlockNumber* pBlockNum) {
  int32_t numOfQTable = 0;
  size_t  sizeInDisk = 0;
  size_t  numOfTables = taosArrayGetSize(pIndexList);

  int64_t st = taosGetTimestampUs();
  cleanupTableScanInfo(pReader->status.pTableMap);

  for (size_t i = 0; i < numOfTables; ++i) {
    SBlockIdx* pBlockIdx = taosArrayGet(pIndexList, i);

    STableBlockScanInfo* pScanInfo =
        *(STableBlockScanInfo**)taosHashGet(pReader->status.pTableMap, &pBlockIdx->uid, sizeof(int64_t));

    tMapDataReset(&pScanInfo->mapData);

    // a failed read leaves no usable descriptors in mapData, so stop instead of decoding it
    int32_t code = tsdbReadDataBlk(pReader->pFileReader, pBlockIdx, &pScanInfo->mapData);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    taosArrayEnsureCap(pScanInfo->pBlockList, pScanInfo->mapData.nItem);

    sizeInDisk += pScanInfo->mapData.nData;

    SDataBlk block = {0};
    for (int32_t j = 0; j < pScanInfo->mapData.nItem; ++j) {
      tGetDataBlk(pScanInfo->mapData.pData + pScanInfo->mapData.aOffset[j], &block);

      // 1. time range check
      if (block.minKey.ts > pReader->window.ekey || block.maxKey.ts < pReader->window.skey) {
        continue;
      }

      // 2. version range check
      if (block.minVer > pReader->verRange.maxVer || block.maxVer < pReader->verRange.minVer) {
        continue;
      }

      // remember where the block sits in the map data (ordinalIndex) and in the file (offset)
      SBlockIndex bIndex = {.ordinalIndex = j, .inFileOffset = block.aSubBlock->offset};
      bIndex.window = (STimeWindow){.skey = block.minKey.ts, .ekey = block.maxKey.ts};

      void* p1 = taosArrayPush(pScanInfo->pBlockList, &bIndex);
      if (p1 == NULL) {
        tMapDataClear(&pScanInfo->mapData);
        return TSDB_CODE_OUT_OF_MEMORY;
      }

      pBlockNum->numOfBlocks += 1;
    }

    if (pScanInfo->pBlockList != NULL && taosArrayGetSize(pScanInfo->pBlockList) > 0) {
      numOfQTable += 1;
    }
  }

  pBlockNum->numOfLastFiles = pReader->pFileReader->pSet->nSttF;
  int32_t total = pBlockNum->numOfLastFiles + pBlockNum->numOfBlocks;

  double el = (taosGetTimestampUs() - st) / 1000.0;
  // numOfTables is a size_t: cast it so that the argument matches the %d conversion
  tsdbDebug(
      "load block of %d tables completed, blocks:%d in %d tables, last-files:%d, block-info-size:%.2f Kb, elapsed "
      "time:%.2f ms %s",
      (int32_t)numOfTables, pBlockNum->numOfBlocks, numOfQTable, pBlockNum->numOfLastFiles, sizeInDisk / 1000.0, el,
      pReader->idStr);

  pReader->cost.numOfBlocks += total;
  pReader->cost.headFileLoadTime += el;

  return TSDB_CODE_SUCCESS;
}
H
Hongze Cheng 已提交
771

772
// Flag the current file block as completely consumed and move lastKey one tick past maxKey
// in scan direction (maxKey + 1 for ascending order, maxKey - 1 otherwise).
static void setBlockAllDumped(SFileBlockDumpInfo* pDumpInfo, int64_t maxKey, int32_t order) {
  pDumpInfo->allDumped = true;

  if (ASCENDING_TRAVERSE(order)) {
    pDumpInfo->lastKey = maxKey + 1;
  } else {
    pDumpInfo->lastKey = maxKey - 1;
  }
}

778 779
// Append one column value to row rowIndex of pColInfoData.
// Fixed-width values are appended straight from pColVal; variable-length values are first
// packed (length header + payload) into the scratch buffer pSup->buildBuf[colIndex].
static void doCopyColVal(SColumnInfoData* pColInfoData, int32_t rowIndex, int32_t colIndex, SColVal* pColVal,
                         SBlockLoadSuppInfo* pSup) {
  bool hasValue = COL_VAL_IS_VALUE(pColVal);

  if (!IS_VAR_DATA_TYPE(pColVal->type)) {
    colDataAppend(pColInfoData, rowIndex, (const char*)&pColVal->value, !hasValue);
    return;
  }

  if (!hasValue) {
    colDataAppendNULL(pColInfoData, rowIndex);
    return;
  }

  varDataSetLen(pSup->buildBuf[colIndex], pColVal->value.nData);
  ASSERT(pColVal->value.nData <= pColInfoData->info.bytes);

  // pData may be null when nData is 0, so only copy a non-empty payload
  if (pColVal->value.nData > 0) {
    memcpy(varDataVal(pSup->buildBuf[colIndex]), pColVal->value.pData, pColVal->value.nData);
  }

  colDataAppend(pColInfoData, rowIndex, pSup->buildBuf[colIndex], false);
}

797
// Return the block-list entry at the iterator's current index, or NULL when the list is empty.
static SFileDataBlockInfo* getCurrentBlockInfo(SDataBlockIter* pBlockIter) {
  size_t numOfEntries = taosArrayGetSize(pBlockIter->blockList);
  if (numOfEntries > 0) {
    return taosArrayGet(pBlockIter->blockList, pBlockIter->index);
  }

  // an empty list must agree with the recorded block count
  ASSERT(pBlockIter->numOfBlocks == numOfEntries);
  return NULL;
}

H
Hongze Cheng 已提交
807
// Return the address of the block descriptor embedded in the iterator.
static SDataBlk* getCurrentBlock(SDataBlockIter* pBlockIter) {
  SDataBlk* pCurrent = &pBlockIter->block;
  return pCurrent;
}
808

H
Haojun Liao 已提交
809 810 811 812 813 814 815 816 817 818 819 820 821 822 823 824 825 826 827 828 829 830 831 832 833 834 835 836 837 838 839 840 841 842 843 844 845 846 847 848 849 850 851 852 853 854 855 856 857 858 859 860 861 862 863 864 865 866 867 868 869 870 871 872 873 874 875 876 877 878 879 880 881
/**
 * Binary search for key in an array of timestamps.
 *
 * NOTE(review): the comparisons only make sense if the list is ordered according to `order`
 * (descending values for TSDB_ORDER_DESC, ascending for TSDB_ORDER_ASC) - confirm with callers.
 *
 * @param pValue  start of the timestamp array, reinterpreted as TSKEY*
 * @param num     number of elements in the array
 * @param key     timestamp to look for
 * @param order   TSDB_ORDER_ASC or TSDB_ORDER_DESC (asserted)
 * @return index of an element equal to key when one is hit; otherwise the index one past the
 *         current upper bound of the search range; -1 when that index would be >= num or
 *         when num <= 0
 */
int32_t binarySearchForTs(char* pValue, int num, TSKEY key, int order) {
  ASSERT(order == TSDB_ORDER_ASC || order == TSDB_ORDER_DESC);

  // nothing to search: bail out before keyList[0] / keyList[num - 1] are read
  if (num <= 0) {
    return -1;
  }

  TSKEY*  keyList = (TSKEY*)pValue;
  int32_t firstPos = 0;
  int32_t lastPos = num - 1;
  int32_t midPos = -1;

  if (order == TSDB_ORDER_DESC) {
    // find the first position which is smaller than the key
    for (;;) {
      if (key >= keyList[firstPos]) return firstPos;
      if (key == keyList[lastPos]) return lastPos;

      if (key < keyList[lastPos]) {
        lastPos += 1;
        return (lastPos >= num) ? -1 : lastPos;
      }

      midPos = ((lastPos - firstPos + 1) >> 1) + firstPos;

      if (key < keyList[midPos]) {
        firstPos = midPos + 1;
      } else if (key > keyList[midPos]) {
        lastPos = midPos - 1;
      } else {
        break;
      }
    }
  } else {
    // find the first position which is bigger than the key
    for (;;) {
      if (key <= keyList[firstPos]) return firstPos;
      if (key == keyList[lastPos]) return lastPos;

      if (key > keyList[lastPos]) {
        lastPos += 1;
        return (lastPos >= num) ? -1 : lastPos;
      }

      midPos = ((lastPos - firstPos + 1) >> 1) + firstPos;

      if (key < keyList[midPos]) {
        lastPos = midPos - 1;
      } else if (key > keyList[midPos]) {
        firstPos = midPos + 1;
      } else {
        break;
      }
    }
  }

  // only reached through "break": keyList[midPos] == key
  return midPos;
}

// Locate key inside keyList, starting from position pos.
//
// TSDB_ORDER_ASC : the range [pos, num - 1] is searched. Returns -1 when key < keyList[pos];
//                  otherwise the index holding key, or - when key is not hit exactly - the lower
//                  index of the two neighbours the search narrows down to (num - 1 when key is
//                  not below the last element).
// any other order: the range [0, pos] is searched mirror-wise. Returns -1 when
//                  key > keyList[pos]; otherwise the index holding key, or the higher index of
//                  the two remaining neighbours (0 when key is not above the first element).
//
// NOTE(review): the narrowing logic assumes keyList is sorted in ascending order - confirm.
static int doBinarySearchKey(TSKEY* keyList, int num, int pos, TSKEY key, int order) {
  // check
  assert(pos >= 0 && pos < num);
  assert(num > 0);

  int start = pos;

  if (order == TSDB_ORDER_ASC) {
    int end = num - 1;
    if (key < keyList[pos]) {
      return -1;
    }

    for (;;) {
      // check can return
      if (key >= keyList[end]) return end;
      if (key <= keyList[start]) return start;
      if (end - start <= 1) return start;

      // change start or end position
      int mid = start + (end - start + 1) / 2;
      if (keyList[mid] > key) {
        end = mid;
      } else if (keyList[mid] < key) {
        start = mid;
      } else {
        return mid;
      }
    }
  }

  // DESC: walk from pos down towards index 0
  int end = 0;
  if (key > keyList[pos]) {
    return -1;
  }

  for (;;) {
    // check can return
    if (key <= keyList[end]) return end;
    if (key >= keyList[start]) return start;
    if (start - end <= 1) return start;

    // change start or end position
    int mid = start - (start - end + 1) / 2;
    if (keyList[mid] < key) {
      end = mid;
    } else if (keyList[mid] > key) {
      start = mid;
    } else {
      return mid;
    }
  }
}

H
Haojun Liao 已提交
926
// Determine the row index at which the scan of the current block stops.
// If the query window covers the block's boundary in scan direction, the stop row is the
// block's last row (ascending) or row 0 (descending); otherwise the window boundary is
// searched in the block's timestamp list starting at pos.
// NOTE: reverse the order to find the end position in data block
static int32_t getEndPosInDataBlock(STsdbReader* pReader, SBlockData* pBlockData, SDataBlk* pBlock, int32_t pos) {
  bool asc = ASCENDING_TRAVERSE(pReader->order);

  if (asc) {
    if (pReader->window.ekey >= pBlock->maxKey.ts) {
      return pBlock->nRow - 1;
    }
  } else if (pReader->window.skey <= pBlock->minKey.ts) {
    return 0;
  }

  int64_t boundary = asc ? pReader->window.ekey : pReader->window.skey;
  return doBinarySearchKey(pBlockData->aTSKEY, pBlock->nRow, pos, boundary, pReader->order);
}

H
Haojun Liao 已提交
943
// Copy dumpedRows timestamps from the file block into the output column.
// Ascending: rows [rowIndex, rowIndex + dumpedRows) are copied as they are.
// Descending: rows (rowIndex - dumpedRows, rowIndex] are copied and then reversed in place,
// so the output starts with the row at rowIndex.
static void copyPrimaryTsCol(const SBlockData* pBlockData, SFileBlockDumpInfo* pDumpInfo, SColumnInfoData* pColData,
                             int32_t dumpedRows, bool asc) {
  if (asc) {
    memcpy(pColData->pData, &pBlockData->aTSKEY[pDumpInfo->rowIndex], dumpedRows * sizeof(int64_t));
    return;
  }

  int32_t startIndex = pDumpInfo->rowIndex - dumpedRows + 1;
  memcpy(pColData->pData, &pBlockData->aTSKEY[startIndex], dumpedRows * sizeof(int64_t));

  // todo: opt perf by extract the loop
  // reverse the copied values by swapping from both ends towards the middle
  int64_t* pts = (int64_t*)pColData->pData;
  for (int32_t head = 0, tail = dumpedRows - 1; head < tail; ++head, --tail) {
    int64_t tmp = pts[head];
    pts[head] = pts[tail];
    pts[tail] = tmp;
  }
}

H
Haojun Liao 已提交
963 964 965 966 967 968 969 970 971 972 973 974 975 976 977 978 979 980 981 982 983 984 985 986 987 988 989 990 991 992 993 994 995 996 997 998 999 1000 1001 1002 1003 1004 1005
// A faster version of the copy procedure for fixed-width columns:
//   1. copy dumpedRows values with a single memcpy,
//   2. for a descending scan, reverse the copied values in place,
//   3. if the source column is not made of values only, visit the rows one by one and flag
//      those whose bit value is 0 or 1 as NULL in the output bitmap.
static void copyNumericCols(const SColData* pData, SFileBlockDumpInfo* pDumpInfo, SColumnInfoData* pColData,
                            int32_t dumpedRows, bool asc) {
  // first source row of the batch: the current row (asc) or dumpedRows - 1 rows before it (desc)
  int32_t  firstRow = asc ? pDumpInfo->rowIndex : (pDumpInfo->rowIndex - dumpedRows + 1);
  uint8_t* pSrc = pData->pData + tDataTypes[pData->type].bytes * firstRow;

  // the destination buffer must start on an 8-byte boundary
  ASSERT((((uint64_t)pColData->pData) & (0x8 - 1)) == 0);

  // 1. copy data in a batch model
  memcpy(pColData->pData, pSrc, dumpedRows * tDataTypes[pData->type].bytes);

  // 2. reverse the array list in case of descending order scan data block
  if (!asc) {
    int32_t head = 0;
    int32_t tail = dumpedRows - 1;

    switch (pColData->info.type) {
      case TSDB_DATA_TYPE_TIMESTAMP:
      case TSDB_DATA_TYPE_DOUBLE:
      case TSDB_DATA_TYPE_BIGINT:
      case TSDB_DATA_TYPE_UBIGINT: {
        int64_t* pVal = (int64_t*)pColData->pData;
        for (; head < tail; ++head, --tail) {
          int64_t tmp = pVal[head];
          pVal[head] = pVal[tail];
          pVal[tail] = tmp;
        }
        break;
      }

      case TSDB_DATA_TYPE_BOOL:
      case TSDB_DATA_TYPE_TINYINT:
      case TSDB_DATA_TYPE_UTINYINT: {
        int8_t* pVal = (int8_t*)pColData->pData;
        for (; head < tail; ++head, --tail) {
          int8_t tmp = pVal[head];
          pVal[head] = pVal[tail];
          pVal[tail] = tmp;
        }
        break;
      }

      case TSDB_DATA_TYPE_SMALLINT:
      case TSDB_DATA_TYPE_USMALLINT: {
        int16_t* pVal = (int16_t*)pColData->pData;
        for (; head < tail; ++head, --tail) {
          int16_t tmp = pVal[head];
          pVal[head] = pVal[tail];
          pVal[tail] = tmp;
        }
        break;
      }

      case TSDB_DATA_TYPE_FLOAT:
      case TSDB_DATA_TYPE_INT:
      case TSDB_DATA_TYPE_UINT: {
        int32_t* pVal = (int32_t*)pColData->pData;
        for (; head < tail; ++head, --tail) {
          int32_t tmp = pVal[head];
          pVal[head] = pVal[tail];
          pVal[tail] = tmp;
        }
        break;
      }

      default:
        // other types are left in copied order
        break;
    }
  }

  // 3. if the null value exists, check items one-by-one
  if (pData->flag != HAS_VALUE) {
    int32_t delta = asc ? 1 : -1;
    int32_t srcRow = pDumpInfo->rowIndex;

    for (int32_t dstRow = 0; dstRow < dumpedRows; ++dstRow, srcRow += delta) {
      uint8_t v = tColDataGetBitValue(pData, srcRow);
      if (v == 0 || v == 1) {
        colDataSetNull_f(pColData->nullbitmap, dstRow);
        pColData->hasNull = true;
      }
    }
  }
}

1054
/**
 * Copy the next batch of rows of the loaded file block (pReader->status.fileBlockData) into
 * the reader's result block, honouring the query time window, the scan order and the output
 * capacity, and advance the dump position (fBlockDumpInfo.rowIndex) accordingly.
 *
 * @param pReader        reader holding the loaded block, the block iterator and the result block
 * @param pBlockScanInfo scan info of the table owning the block (not referenced in this function)
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_INVALID_PARA when the start row of the query window
 *         cannot be located inside the block
 */
static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo) {
  SReaderStatus*      pStatus = &pReader->status;
  SDataBlockIter*     pBlockIter = &pStatus->blockIter;
  SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo;
  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;

  SBlockData*         pBlockData = &pStatus->fileBlockData;
  SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(pBlockIter);
  SDataBlk*           pBlock = getCurrentBlock(pBlockIter);
  SSDataBlock*        pResBlock = pReader->pResBlock;
  int32_t             numOfOutputCols = pSupInfo->numOfCols;

  SColVal cv = {0};
  int64_t st = taosGetTimestampUs();
  bool    asc = ASCENDING_TRAVERSE(pReader->order);
  int32_t step = asc ? 1 : -1;

  // first visit of this block: move the start row to the query window boundary if the window
  // begins inside the block
  if ((pDumpInfo->rowIndex == 0 && asc) || (pDumpInfo->rowIndex == pBlock->nRow - 1 && (!asc))) {
    if (asc && pReader->window.skey <= pBlock->minKey.ts) {
      // the block starts inside the window, keep rowIndex at 0
    } else if (!asc && pReader->window.ekey >= pBlock->maxKey.ts) {
      // the block ends inside the window, keep rowIndex at nRow - 1
    } else {
      // search from the opposite end of the block, in the opposite direction
      int32_t pos = asc ? pBlock->nRow - 1 : 0;
      int32_t order = asc ? TSDB_ORDER_DESC : TSDB_ORDER_ASC;
      int64_t key = asc ? pReader->window.skey : pReader->window.ekey;
      pDumpInfo->rowIndex = doBinarySearchKey(pBlockData->aTSKEY, pBlock->nRow, pos, key, order);

      // -1 means the boundary lies outside the loaded timestamps; using it below as a row
      // index would read before the start of the column buffers
      if (pDumpInfo->rowIndex < 0) {
        tsdbError("%p failed to locate the start position in current block, global index:%d, brange:%" PRId64
                  "-%" PRId64 ", rows:%d %s",
                  pReader, pBlockIter->index, pBlock->minKey.ts, pBlock->maxKey.ts, pBlock->nRow, pReader->idStr);
        return TSDB_CODE_INVALID_PARA;
      }
    }
  }

  // time window check
  int32_t endIndex = getEndPosInDataBlock(pReader, pBlockData, pBlock, pDumpInfo->rowIndex);
  if (endIndex == -1) {
    setBlockAllDumped(pDumpInfo, pReader->window.ekey, pReader->order);
    return TSDB_CODE_SUCCESS;
  }

  // make endIndex exclusive in scan direction, then derive the number of rows to emit
  endIndex += step;
  int32_t dumpedRows = asc ? (endIndex - pDumpInfo->rowIndex) : (pDumpInfo->rowIndex - endIndex);
  if (dumpedRows > pReader->capacity) {  // output buffer check
    dumpedRows = pReader->capacity;
  }

  int32_t i = 0;
  int32_t rowIndex = 0;

  SColumnInfoData* pColData = taosArrayGet(pResBlock->pDataBlock, pSupInfo->slotId[i]);
  if (pSupInfo->colId[i] == PRIMARYKEY_TIMESTAMP_COL_ID) {
    copyPrimaryTsCol(pBlockData, pDumpInfo, pColData, dumpedRows, asc);
    i += 1;
  }

  // merge-walk the requested columns (index i) and the columns stored in the block (colIndex),
  // matching them by column id
  int32_t colIndex = 0;
  int32_t num = pBlockData->nColData;
  while (i < numOfOutputCols && colIndex < num) {
    rowIndex = 0;

    SColData* pData = tBlockDataGetColDataByIdx(pBlockData, colIndex);
    if (pData->cid < pSupInfo->colId[i]) {
      colIndex += 1;
    } else if (pData->cid == pSupInfo->colId[i]) {
      pColData = taosArrayGet(pResBlock->pDataBlock, pSupInfo->slotId[i]);

      if (pData->flag == HAS_NONE || pData->flag == HAS_NULL || pData->flag == (HAS_NULL | HAS_NONE)) {
        colDataAppendNNULL(pColData, 0, dumpedRows);
      } else {
        if (IS_MATHABLE_TYPE(pColData->info.type)) {
          copyNumericCols(pData, pDumpInfo, pColData, dumpedRows, asc);
        } else {  // varchar/nchar type
          for (int32_t j = pDumpInfo->rowIndex; rowIndex < dumpedRows; j += step) {
            tColDataGetValue(pData, j, &cv);
            doCopyColVal(pColData, rowIndex++, i, &cv, pSupInfo);
          }
        }
      }

      colIndex += 1;
      i += 1;
    } else {  // the specified column does not exist in file block, fill with null data
      pColData = taosArrayGet(pResBlock->pDataBlock, pSupInfo->slotId[i]);
      colDataAppendNNULL(pColData, 0, dumpedRows);
      i += 1;
    }
  }

  // fill the mis-matched columns with null value
  while (i < numOfOutputCols) {
    pColData = taosArrayGet(pResBlock->pDataBlock, pSupInfo->slotId[i]);
    colDataAppendNNULL(pColData, 0, dumpedRows);
    i += 1;
  }

  pResBlock->info.rows = dumpedRows;
  pDumpInfo->rowIndex += step * dumpedRows;

  // check if current block are all handled
  if (pDumpInfo->rowIndex >= 0 && pDumpInfo->rowIndex < pBlock->nRow) {
    int64_t ts = pBlockData->aTSKEY[pDumpInfo->rowIndex];
    if (outOfTimeWindow(ts, &pReader->window)) {  // the remain data has out of query time window, ignore current block
      setBlockAllDumped(pDumpInfo, ts, pReader->order);
    }
  } else {
    int64_t ts = asc ? pBlock->maxKey.ts : pBlock->minKey.ts;
    setBlockAllDumped(pDumpInfo, ts, pReader->order);
  }

  double elapsedTime = (taosGetTimestampUs() - st) / 1000.0;
  pReader->cost.blockLoadTime += elapsedTime;

  int32_t unDumpedRows = asc ? pBlock->nRow - pDumpInfo->rowIndex : pDumpInfo->rowIndex + 1;
  tsdbDebug("%p copy file block to sdatablock, global index:%d, table index:%d, brange:%" PRId64 "-%" PRId64
            ", rows:%d, remain:%d, minVer:%" PRId64 ", maxVer:%" PRId64 ", elapsed time:%.2f ms, %s",
            pReader, pBlockIter->index, pBlockInfo->tbBlockIdx, pBlock->minKey.ts, pBlock->maxKey.ts, dumpedRows,
            unDumpedRows, pBlock->minVer, pBlock->maxVer, elapsedTime, pReader->idStr);

  return TSDB_CODE_SUCCESS;
}

1172 1173
// Load the data of the iterator's current file block into pBlockData.
// pBlockData is reset and re-initialised for table uid with the requested columns from
// colId[1] onwards; on success the elapsed time is added to cost.blockLoadTime and the dump
// state is marked "not all dumped". Returns the code of tBlockDataInit/tsdbReadDataBlock on
// failure, TSDB_CODE_SUCCESS otherwise.
static int32_t doLoadFileBlockData(STsdbReader* pReader, SDataBlockIter* pBlockIter, SBlockData* pBlockData,
                                   uint64_t uid) {
  int64_t startUs = taosGetTimestampUs();

  tBlockDataReset(pBlockData);

  TABLEID tableId = {.suid = pReader->suid, .uid = uid};
  int32_t code = tBlockDataInit(pBlockData, &tableId, pReader->pSchema, &pReader->suppInfo.colId[1],
                                pReader->suppInfo.numOfCols - 1);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  SFileDataBlockInfo* pCurInfo = getCurrentBlockInfo(pBlockIter);
  ASSERT(pCurInfo != NULL);

  SDataBlk* pCurBlock = getCurrentBlock(pBlockIter);
  code = tsdbReadDataBlock(pReader->pFileReader, pCurBlock, pBlockData);
  if (code != TSDB_CODE_SUCCESS) {
    tsdbError("%p error occurs in loading file block, global index:%d, table index:%d, brange:%" PRId64 "-%" PRId64
              ", rows:%d, code:%s %s",
              pReader, pBlockIter->index, pCurInfo->tbBlockIdx, pCurBlock->minKey.ts, pCurBlock->maxKey.ts,
              pCurBlock->nRow, tstrerror(code), pReader->idStr);
    return code;
  }

  double elapsedTime = (taosGetTimestampUs() - startUs) / 1000.0;

  tsdbDebug("%p load file block into buffer, global index:%d, index in table block list:%d, brange:%" PRId64 "-%" PRId64
            ", rows:%d, minVer:%" PRId64 ", maxVer:%" PRId64 ", elapsed time:%.2f ms, %s",
            pReader, pBlockIter->index, pCurInfo->tbBlockIdx, pCurBlock->minKey.ts, pCurBlock->maxKey.ts,
            pCurBlock->nRow, pCurBlock->minVer, pCurBlock->maxVer, elapsedTime, pReader->idStr);

  pReader->cost.blockLoadTime += elapsedTime;

  // a freshly loaded block has not been copied out yet
  pReader->status.fBlockDumpInfo.allDumped = false;

  return TSDB_CODE_SUCCESS;
}
H
Hongze Cheng 已提交
1210

H
Haojun Liao 已提交
1211 1212 1213
// Release everything owned by the supporter: the per-table wrapper arrays for the first
// numOfTables slots, then the three top-level arrays (whose pointers are reset to NULL).
static void cleanupBlockOrderSupporter(SBlockOrderSupporter* pSup) {
  int32_t tableIdx = 0;
  while (tableIdx < pSup->numOfTables) {
    SBlockOrderWrapper* pWrappers = pSup->pDataBlockInfo[tableIdx];
    taosMemoryFreeClear(pWrappers);
    tableIdx += 1;
  }

  taosMemoryFreeClear(pSup->pDataBlockInfo);
  taosMemoryFreeClear(pSup->indexPerTable);
  taosMemoryFreeClear(pSup->numOfBlocksPerTable);
}
H
Hongze Cheng 已提交
1222

H
Haojun Liao 已提交
1223 1224
// Allocate the zero-filled per-table arrays of the supporter for numOfTables tables
// (numOfTables must be >= 1). On allocation failure whatever was allocated is released
// again and TSDB_CODE_OUT_OF_MEMORY is returned.
static int32_t initBlockOrderSupporter(SBlockOrderSupporter* pSup, int32_t numOfTables) {
  ASSERT(numOfTables >= 1);

  pSup->numOfBlocksPerTable = taosMemoryCalloc(1, sizeof(int32_t) * numOfTables);
  pSup->indexPerTable = taosMemoryCalloc(1, sizeof(int32_t) * numOfTables);
  pSup->pDataBlockInfo = taosMemoryCalloc(1, POINTER_BYTES * numOfTables);

  bool allAllocated =
      (pSup->numOfBlocksPerTable != NULL) && (pSup->indexPerTable != NULL) && (pSup->pDataBlockInfo != NULL);
  if (allAllocated) {
    return TSDB_CODE_SUCCESS;
  }

  cleanupBlockOrderSupporter(pSup);
  return TSDB_CODE_OUT_OF_MEMORY;
}
H
Hongze Cheng 已提交
1237

H
Haojun Liao 已提交
1238
static int32_t fileDataBlockOrderCompar(const void* pLeft, const void* pRight, void* param) {
1239
  int32_t leftIndex = *(int32_t*)pLeft;
H
Haojun Liao 已提交
1240
  int32_t rightIndex = *(int32_t*)pRight;
H
Hongze Cheng 已提交
1241

H
Haojun Liao 已提交
1242
  SBlockOrderSupporter* pSupporter = (SBlockOrderSupporter*)param;
H
Hongze Cheng 已提交
1243

H
Haojun Liao 已提交
1244 1245
  int32_t leftTableBlockIndex = pSupporter->indexPerTable[leftIndex];
  int32_t rightTableBlockIndex = pSupporter->indexPerTable[rightIndex];
H
Hongze Cheng 已提交
1246

H
Haojun Liao 已提交
1247 1248 1249 1250 1251 1252 1253
  if (leftTableBlockIndex > pSupporter->numOfBlocksPerTable[leftIndex]) {
    /* left block is empty */
    return 1;
  } else if (rightTableBlockIndex > pSupporter->numOfBlocksPerTable[rightIndex]) {
    /* right block is empty */
    return -1;
  }
H
Hongze Cheng 已提交
1254

1255
  SBlockOrderWrapper* pLeftBlock = &pSupporter->pDataBlockInfo[leftIndex][leftTableBlockIndex];
H
Haojun Liao 已提交
1256
  SBlockOrderWrapper* pRightBlock = &pSupporter->pDataBlockInfo[rightIndex][rightTableBlockIndex];
H
Hongze Cheng 已提交
1257

1258 1259 1260
  return pLeftBlock->offset > pRightBlock->offset ? 1 : -1;
}

H
Haojun Liao 已提交
1261
// Decode the block descriptor that belongs to the iterator's current position into
// pBlockIter->block. Does nothing (and succeeds) when there is no current block; returns
// TSDB_CODE_INVALID_PARA when the block's table uid is not present in the iterator's table map.
static int32_t doSetCurrentBlock(SDataBlockIter* pBlockIter, const char* idStr) {
  SFileDataBlockInfo* pCurInfo = getCurrentBlockInfo(pBlockIter);
  if (pCurInfo == NULL) {
    return TSDB_CODE_SUCCESS;
  }

  STableBlockScanInfo** ppScanInfo = taosHashGet(pBlockIter->pTableMap, &pCurInfo->uid, sizeof(pCurInfo->uid));
  if (ppScanInfo == NULL) {
    tsdbError("failed to locate the uid:%" PRIu64 " in query table uid list, %s", pCurInfo->uid, idStr);
    return TSDB_CODE_INVALID_PARA;
  }

  // table-local block index -> position inside the table's map data -> decoded descriptor
  STableBlockScanInfo* pInfo = *ppScanInfo;
  SBlockIndex*         pBlockIndex = taosArrayGet(pInfo->pBlockList, pCurInfo->tbBlockIdx);
  tMapDataGetItemByIdx(&pInfo->mapData, pBlockIndex->ordinalIndex, &pBlockIter->block, tGetDataBlk);

#if 0
  qDebug("check file block, table uid:%"PRIu64" index:%d offset:%"PRId64", ", pScanInfo->uid, *mapDataIndex, pBlockIter->block.aSubBlock[0].offset);
#endif

  return TSDB_CODE_SUCCESS;
}
H
Hongze Cheng 已提交
1280

1281
/**
 * Rebuild the block iterator for the current file: collect the qualified blocks of every table
 * in pReader->status.pTableMap and store them in pBlockIter->blockList ordered by their offset
 * in the file (a multiway merge over the per-table lists; skipped when only one table has
 * blocks). The iterator is then positioned on the first block (ascending scan) or the last
 * block (descending scan).
 *
 * @param pReader     reader owning the table map
 * @param pBlockIter  iterator to (re)initialise
 * @param numOfBlocks total number of qualified blocks; asserted to equal the number collected
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_OUT_OF_MEMORY on allocation failure, or the error
 *         reported by doSetCurrentBlock
 */
static int32_t initBlockIterator(STsdbReader* pReader, SDataBlockIter* pBlockIter, int32_t numOfBlocks) {
  bool asc = ASCENDING_TRAVERSE(pReader->order);

  SBlockOrderSupporter sup = {0};
  pBlockIter->numOfBlocks = numOfBlocks;
  taosArrayClear(pBlockIter->blockList);
  pBlockIter->pTableMap = pReader->status.pTableMap;

  // access data blocks according to the offset of each block in asc/desc order.
  int32_t numOfTables = (int32_t)taosHashGetSize(pReader->status.pTableMap);

  int64_t st = taosGetTimestampUs();
  int32_t code = initBlockOrderSupporter(&sup, numOfTables);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  int32_t cnt = 0;
  void*   ptr = NULL;
  while (1) {
    ptr = taosHashIterate(pReader->status.pTableMap, ptr);
    if (ptr == NULL) {
      break;
    }

    STableBlockScanInfo* pTableScanInfo = *(STableBlockScanInfo**)ptr;
    if (pTableScanInfo->pBlockList == NULL || taosArrayGetSize(pTableScanInfo->pBlockList) == 0) {
      continue;
    }

    size_t num = taosArrayGetSize(pTableScanInfo->pBlockList);
    sup.numOfBlocksPerTable[sup.numOfTables] = num;

    char* buf = taosMemoryMalloc(sizeof(SBlockOrderWrapper) * num);
    if (buf == NULL) {
      // leaving the loop early: release the entry still held by the hash iterator
      taosHashCancelIterate(pReader->status.pTableMap, ptr);
      cleanupBlockOrderSupporter(&sup);
      return TSDB_CODE_OUT_OF_MEMORY;
    }

    sup.pDataBlockInfo[sup.numOfTables] = (SBlockOrderWrapper*)buf;

    for (int32_t k = 0; k < num; ++k) {
      SBlockIndex* pIndex = taosArrayGet(pTableScanInfo->pBlockList, k);
      sup.pDataBlockInfo[sup.numOfTables][k] =
          (SBlockOrderWrapper){.uid = pTableScanInfo->uid, .offset = pIndex->inFileOffset};
      cnt++;
    }

    sup.numOfTables += 1;
  }

  ASSERT(numOfBlocks == cnt);

  // since there is only one table qualified, blocks are not sorted
  if (sup.numOfTables == 1) {
    for (int32_t i = 0; i < numOfBlocks; ++i) {
      SFileDataBlockInfo blockInfo = {.uid = sup.pDataBlockInfo[0][i].uid, .tbBlockIdx = i};
      taosArrayPush(pBlockIter->blockList, &blockInfo);
    }

    int64_t et = taosGetTimestampUs();
    tsdbDebug("%p create blocks info struct completed for one table, %d blocks not sorted, elapsed time:%.2f ms %s",
              pReader, numOfBlocks, (et - st) / 1000.0, pReader->idStr);

    pBlockIter->index = asc ? 0 : (numOfBlocks - 1);
    cleanupBlockOrderSupporter(&sup);
    return doSetCurrentBlock(pBlockIter, pReader->idStr);
  }

  tsdbDebug("%p create data blocks info struct completed, %d blocks in %d tables %s", pReader, cnt, sup.numOfTables,
            pReader->idStr);

  ASSERT(cnt <= numOfBlocks && sup.numOfTables <= numOfTables);

  // keep the full-width result code; narrowing it could turn a non-zero code into 0
  SMultiwayMergeTreeInfo* pTree = NULL;
  int32_t                 ret = tMergeTreeCreate(&pTree, sup.numOfTables, &sup, fileDataBlockOrderCompar);
  if (ret != TSDB_CODE_SUCCESS) {
    cleanupBlockOrderSupporter(&sup);
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  // repeatedly take the table whose next block has the smallest file offset
  int32_t numOfTotal = 0;
  while (numOfTotal < cnt) {
    int32_t pos = tMergeTreeGetChosenIndex(pTree);
    int32_t index = sup.indexPerTable[pos]++;

    SFileDataBlockInfo blockInfo = {.uid = sup.pDataBlockInfo[pos][index].uid, .tbBlockIdx = index};
    taosArrayPush(pBlockIter->blockList, &blockInfo);

    // set data block index overflow, in order to disable the offset comparator
    if (sup.indexPerTable[pos] >= sup.numOfBlocksPerTable[pos]) {
      sup.indexPerTable[pos] = sup.numOfBlocksPerTable[pos] + 1;
    }

    numOfTotal += 1;
    tMergeTreeAdjust(pTree, tMergeTreeGetAdjustIndex(pTree));
  }

  int64_t et = taosGetTimestampUs();
  tsdbDebug("%p %d data blocks access order completed, elapsed time:%.2f ms %s", pReader, numOfBlocks,
            (et - st) / 1000.0, pReader->idStr);
  cleanupBlockOrderSupporter(&sup);
  taosMemoryFree(pTree);

  pBlockIter->index = asc ? 0 : (numOfBlocks - 1);
  return doSetCurrentBlock(pBlockIter, pReader->idStr);
}
H
Hongze Cheng 已提交
1391

H
Haojun Liao 已提交
1392
// Advance the iterator by one block in scan direction and decode that block's descriptor.
// Returns false, leaving the iterator untouched, when it already stands on the last block
// (ascending) or the first block (descending).
static bool blockIteratorNext(SDataBlockIter* pBlockIter, const char* idStr) {
  if (ASCENDING_TRAVERSE(pBlockIter->order)) {
    if (pBlockIter->index >= pBlockIter->numOfBlocks - 1) {
      return false;
    }
    pBlockIter->index += 1;
  } else {
    if (pBlockIter->index <= 0) {
      return false;
    }
    pBlockIter->index -= 1;
  }

  doSetCurrentBlock(pBlockIter, idStr);
  return true;
}

1406 1407 1408
/**
 * This is an two rectangles overlap cases.
 */
H
Hongze Cheng 已提交
1409
static int32_t dataBlockPartiallyRequired(STimeWindow* pWindow, SVersionRange* pVerRange, SDataBlk* pBlock) {
1410 1411
  return (pWindow->ekey < pBlock->maxKey.ts && pWindow->ekey >= pBlock->minKey.ts) ||
         (pWindow->skey > pBlock->minKey.ts && pWindow->skey <= pBlock->maxKey.ts) ||
H
Hongze Cheng 已提交
1412 1413
         (pVerRange->minVer > pBlock->minVer && pVerRange->minVer <= pBlock->maxVer) ||
         (pVerRange->maxVer < pBlock->maxVer && pVerRange->maxVer >= pBlock->minVer);
H
Haojun Liao 已提交
1414
}
H
Hongze Cheng 已提交
1415

1416
/**
 * Look up the block that follows the given block, in scan order, within the block
 * list of the same table.
 *
 * @param pBlockInfo           current block; `tbBlockIdx` is its position in the table's block list
 * @param pTableBlockScanInfo  scan info whose `pBlockList` holds SBlockIndex entries
 * @param nextIndex            [out] position of the neighbor in `pBlockList` (written only on success)
 * @param order                scan order; TSDB_ORDER_ASC moves forward, anything else backward
 * @param pBlockIndex          [out] copy of the neighbor's SBlockIndex (written only on success)
 * @return true if a neighbor exists, false if the current block is the last one in scan order
 *         or the current position is outside the list
 */
static bool getNeighborBlockOfSameTable(SFileDataBlockInfo* pBlockInfo, STableBlockScanInfo* pTableBlockScanInfo,
                                        int32_t* nextIndex, int32_t order, SBlockIndex* pBlockIndex) {
  // Do all comparisons in signed arithmetic: "size - 1" on an unsigned size wraps around for an
  // empty list, and comparing a signed index against an unsigned size converts the index.
  int32_t numOfBlocks = (int32_t)taosArrayGetSize(pTableBlockScanInfo->pBlockList);
  int32_t current = pBlockInfo->tbBlockIdx;
  if (current < 0 || current >= numOfBlocks) {
    return false;
  }

  int32_t step = ASCENDING_TRAVERSE(order) ? 1 : -1;
  int32_t neighbor = current + step;
  if (neighbor < 0 || neighbor >= numOfBlocks) {
    return false;
  }

  *nextIndex = neighbor;
  *pBlockIndex = *(SBlockIndex*)taosArrayGet(pTableBlockScanInfo->pBlockList, neighbor);
  return true;
}

/**
 * Search the global block list, starting at the iterator's current position and
 * walking in scan direction, for the entry that refers to the same table (uid)
 * and the same per-table block index as pFBlockInfo.
 *
 * @return position of the matching entry in pBlockIter->blockList; the function
 *         asserts if no entry matches and then returns -1
 */
static int32_t findFileBlockInfoIndex(SDataBlockIter* pBlockIter, SFileDataBlockInfo* pFBlockInfo) {
  ASSERT(pBlockIter != NULL && pFBlockInfo != NULL);

  const int32_t delta = ASCENDING_TRAVERSE(pBlockIter->order) ? 1 : -1;

  for (int32_t pos = pBlockIter->index; pos < pBlockIter->numOfBlocks && pos >= 0; pos += delta) {
    SFileDataBlockInfo* pCandidate = taosArrayGet(pBlockIter->blockList, pos);

    if (pCandidate->uid != pFBlockInfo->uid) {
      continue;
    }

    if (pCandidate->tbBlockIdx == pFBlockInfo->tbBlockIdx) {
      return pos;
    }
  }

  // the requested block is expected to be present in the list
  ASSERT(0);
  return -1;
}

1453
/**
 * Advance the iterator by `step` and make the block stored at `index` the block at
 * the new iterator position. If the block is not already there, it is removed from
 * its old slot and re-inserted at the new iterator position.
 *
 * @param pBlockIter  block iterator; `index` and `blockList` are modified
 * @param index       position in blockList of the block to activate
 * @param step        amount added to pBlockIter->index
 * @return TSDB_CODE_SUCCESS, or -1 if `index` is outside [0, numOfBlocks)
 */
static int32_t setFileBlockActiveInBlockIter(SDataBlockIter* pBlockIter, int32_t index, int32_t step) {
  bool inRange = (index >= 0) && (index < pBlockIter->numOfBlocks);
  if (!inRange) {
    return -1;
  }

  // take a copy first: the slot is about to be removed from the array
  SFileDataBlockInfo moved = *(SFileDataBlockInfo*)taosArrayGet(pBlockIter->blockList, index);

  pBlockIter->index += step;
  if (pBlockIter->index != index) {
    taosArrayRemove(pBlockIter->blockList, index);
    taosArrayInsert(pBlockIter->blockList, pBlockIter->index, &moved);

    SFileDataBlockInfo* pPlaced = taosArrayGet(pBlockIter->blockList, pBlockIter->index);
    ASSERT(pPlaced->uid == moved.uid && pPlaced->tbBlockIdx == moved.tbBlockIdx);
  }

  doSetCurrentBlock(pBlockIter, "");
  return TSDB_CODE_SUCCESS;
}

H
Haojun Liao 已提交
1473
// todo: this attribute could be acquired during extractin the global ordered block list.
1474
static bool overlapWithNeighborBlock(SDataBlk* pBlock, SBlockIndex* pNeighborBlockIndex, int32_t order) {
1475 1476
  // it is the last block in current file, no chance to overlap with neighbor blocks.
  if (ASCENDING_TRAVERSE(order)) {
1477
    return pBlock->maxKey.ts == pNeighborBlockIndex->window.skey;
1478
  } else {
1479
    return pBlock->minKey.ts == pNeighborBlockIndex->window.ekey;
1480
  }
H
Haojun Liao 已提交
1481
}
H
Hongze Cheng 已提交
1482

H
Hongze Cheng 已提交
1483
/**
 * Check whether the given buffer key lies at or before the point where the file
 * block begins in scan order.
 *
 * @param order   scan order
 * @param key     key from the buffer; a timestamp of TSKEY_INITIAL_VAL never matches
 * @param pBlock  file data block
 * @return true when, for ascending scans, key.ts <= block minKey.ts, or for
 *         descending scans, key.ts >= block maxKey.ts
 */
static bool bufferDataInFileBlockGap(int32_t order, TSDBKEY key, SDataBlk* pBlock) {
  if (key.ts == TSKEY_INITIAL_VAL) {
    return false;
  }

  if (ASCENDING_TRAVERSE(order)) {
    return key.ts <= pBlock->minKey.ts;
  }

  return key.ts >= pBlock->maxKey.ts;
}
H
Hongze Cheng 已提交
1489

H
Hongze Cheng 已提交
1490
/**
 * Check whether a key falls inside the block's time range while the block's
 * version range intersects the requested version range.
 *
 * @param key        key to test (only the timestamp is used)
 * @param pBlock     file data block
 * @param pVerRange  requested version range
 */
static bool keyOverlapFileBlock(TSDBKEY key, SDataBlk* pBlock, SVersionRange* pVerRange) {
  bool tsInsideBlock = (key.ts >= pBlock->minKey.ts) && (key.ts <= pBlock->maxKey.ts);
  if (!tsInsideBlock) {
    return false;
  }

  if (pBlock->maxVer < pVerRange->minVer) {
    return false;
  }

  return pBlock->minVer <= pVerRange->maxVer;
}

H
Hongze Cheng 已提交
1495 1496
/**
 * Walk the delete skyline from `startIndex` and decide whether a delete point
 * affects the block.
 *
 * A point whose timestamp lies inside the block counts when its version is not
 * older than the block's minimum version. A point before the block counts when,
 * in addition, the following point reaches into the block. The walk stops at the
 * first point located after the block.
 *
 * @param pBlockScanInfo  table scan info holding the delete skyline (array of TSDBKEY)
 * @param pBlock          data block to test
 * @param startIndex      first skyline position to inspect
 * @return true if a delete point overlaps the block
 */
static bool doCheckforDatablockOverlap(STableBlockScanInfo* pBlockScanInfo, const SDataBlk* pBlock,
                                       int32_t startIndex) {
  size_t num = taosArrayGetSize(pBlockScanInfo->delSkyline);

  for (int32_t i = startIndex; i < num; i += 1) {
    TSDBKEY* pPoint = taosArrayGet(pBlockScanInfo->delSkyline, i);

    if (pPoint->ts >= pBlock->minKey.ts) {
      if (pPoint->ts > pBlock->maxKey.ts) {
        // the skyline has moved past the block, nothing further can overlap
        return false;
      }

      // the point is inside the time range of the block
      if (pPoint->version >= pBlock->minVer) {
        return true;
      }
      continue;
    }

    // the point is located before the block
    if (pPoint->version < pBlock->minVer) {
      continue;
    }

    if (i < num - 1) {
      TSDBKEY* pFollowing = taosArrayGet(pBlockScanInfo->delSkyline, i + 1);
      if (pFollowing->ts >= pBlock->minKey.ts) {
        return true;
      }
    } else {  // it must be the last point
      ASSERT(pPoint->version == 0);
    }
  }

  return false;
}

H
Hongze Cheng 已提交
1524
/**
 * Check whether the delete skyline of a table overlaps the given data block.
 *
 * @param pBlockScanInfo  table scan info; `delSkyline` (array of TSDBKEY) and
 *                        `fileDelIndex` are read
 * @param pBlock          data block to test
 * @param order           scan order
 * @return false when there is no skyline, the skyline is empty, or the time ranges
 *         are disjoint; otherwise the result of doCheckforDatablockOverlap()
 */
static bool overlapWithDelSkyline(STableBlockScanInfo* pBlockScanInfo, const SDataBlk* pBlock, int32_t order) {
  SArray* pSkyline = pBlockScanInfo->delSkyline;

  // An allocated but empty skyline has neither a first nor a last element; treat it
  // like a missing skyline instead of dereferencing those elements.
  if (pSkyline == NULL || taosArrayGetSize(pSkyline) == 0) {
    return false;
  }

  // ts is not overlap
  TSDBKEY* pFirst = taosArrayGet(pSkyline, 0);
  TSDBKEY* pLast = taosArrayGetLast(pSkyline);
  if (pBlock->minKey.ts > pLast->ts || pBlock->maxKey.ts < pFirst->ts) {
    return false;
  }

  // version is not overlap
  if (ASCENDING_TRAVERSE(order)) {
    return doCheckforDatablockOverlap(pBlockScanInfo, pBlock, pBlockScanInfo->fileDelIndex);
  }

  // Descending scan: step back from the current delete index to the first point that
  // is not larger than the minKey.ts of the block, then check forward from there.
  int32_t index = pBlockScanInfo->fileDelIndex;
  while (index > 0) {
    TSDBKEY* p = taosArrayGet(pSkyline, index);
    if (p->ts <= pBlock->minKey.ts) {
      break;
    }
    index -= 1;
  }

  return doCheckforDatablockOverlap(pBlockScanInfo, pBlock, index);
}

H
Haojun Liao 已提交
1554 1555 1556 1557 1558 1559 1560 1561 1562 1563 1564 1565 1566
// Collected reasons for which a file data block may have to be loaded and merged
// instead of being consumed as a whole. Filled in by getBlockToLoadInfo(), which
// only assigns overlapWithNeighborBlock and overlapWithLastBlock conditionally, so
// the structure must be zero-initialized by the caller.
typedef struct {
  bool overlapWithNeighborBlock;  // result of overlapWithNeighborBlock() for the next block of the same table
  bool hasDupTs;                  // pBlock->hasDup for a single sub-block, always true otherwise
  bool overlapWithDelInfo;        // result of overlapWithDelSkyline()
  bool overlapWithLastBlock;      // current key of the last block reader lies within the block's key range
  bool overlapWithKeyInBuf;       // result of keyOverlapFileBlock() for the key in the buffer
  bool partiallyRequired;         // result of dataBlockPartiallyRequired()
  bool moreThanCapcity;           // pBlock->nRow exceeds pReader->capacity
} SDataBlockToLoadInfo;

/**
 * Evaluate every condition that influences whether a file data block must be loaded.
 *
 * `overlapWithNeighborBlock` and `overlapWithLastBlock` are written only when a
 * neighbor block / last-block data exists; pInfo is expected to arrive zeroed.
 *
 * @param pInfo             [out] evaluated conditions
 * @param pBlockInfo        position of the block in the table's block list
 * @param pBlock            block meta data
 * @param pScanInfo         scan info of the table that owns the block
 * @param keyInBuf          key of the buffer row compared against the block
 * @param pLastBlockReader  reader of the last blocks
 * @param pReader           reader providing order, capacity, window and version range
 */
static void getBlockToLoadInfo(SDataBlockToLoadInfo* pInfo, SFileDataBlockInfo* pBlockInfo, SDataBlk* pBlock,
                               STableBlockScanInfo* pScanInfo, TSDBKEY keyInBuf, SLastBlockReader* pLastBlockReader,
                               STsdbReader* pReader) {
  int32_t     adjacentPos = 0;
  SBlockIndex adjacent = {0};

  // overlap with the neighbor block of the same table, if there is one
  if (getNeighborBlockOfSameTable(pBlockInfo, pScanInfo, &adjacentPos, pReader->order, &adjacent)) {
    pInfo->overlapWithNeighborBlock = overlapWithNeighborBlock(pBlock, &adjacent, pReader->order);
  }

  // has duplicated ts of different version in this block
  if (pBlock->nSubBlock == 1) {
    pInfo->hasDupTs = pBlock->hasDup;
  } else {
    pInfo->hasDupTs = true;
  }

  pInfo->overlapWithDelInfo = overlapWithDelSkyline(pScanInfo, pBlock, pReader->order);

  if (hasDataInLastBlock(pLastBlockReader)) {
    int64_t lastBlockKey = getCurrentKeyInLastBlock(pLastBlockReader);
    pInfo->overlapWithLastBlock = (pBlock->maxKey.ts >= lastBlockKey) && (pBlock->minKey.ts <= lastBlockKey);
  }

  pInfo->moreThanCapcity = pBlock->nRow > pReader->capacity;
  pInfo->partiallyRequired = dataBlockPartiallyRequired(&pReader->window, &pReader->verRange, pBlock);
  pInfo->overlapWithKeyInBuf = keyOverlapFileBlock(keyInBuf, pBlock, &pReader->verRange);
}
1590

H
Haojun Liao 已提交
1591 1592 1593 1594 1595 1596 1597 1598 1599 1600 1601 1602 1603 1604
// A file block can be used without loading its data only if all of the following hold:
// 1. the version of all rows is less than the endVersion
// 2. the current block does not overlap with the next neighbor block
// 3. the timestamps in the block do not overlap with each other
// 4. the output buffer is large enough to hold all rows of the current block
// 5. the delete info does not overlap with the current block data
// 6. the current block does not contain duplicated ts
// The function returns true as soon as one of the evaluated conditions requires loading.
static bool fileBlockShouldLoad(STsdbReader* pReader, SFileDataBlockInfo* pBlockInfo, SDataBlk* pBlock,
                                STableBlockScanInfo* pScanInfo, TSDBKEY keyInBuf, SLastBlockReader* pLastBlockReader) {
  SDataBlockToLoadInfo reasons = {0};
  getBlockToLoadInfo(&reasons, pBlockInfo, pBlock, pScanInfo, keyInBuf, pLastBlockReader, pReader);

  bool overlapsOtherData = reasons.overlapWithNeighborBlock || reasons.hasDupTs || reasons.partiallyRequired ||
                           reasons.overlapWithKeyInBuf;
  bool mustLoad = overlapsOtherData || reasons.moreThanCapcity || reasons.overlapWithDelInfo ||
                  reasons.overlapWithLastBlock;
  if (!mustLoad) {
    return false;
  }

  // log the reason why load the datablock for profile
  tsdbDebug("%p uid:%" PRIu64
            " need to load the datablock, overlapwithneighborblock:%d, hasDup:%d, partiallyRequired:%d, "
            "overlapWithKey:%d, greaterThanBuf:%d, overlapWithDel:%d, overlapWithlastBlock:%d, %s",
            pReader, pBlockInfo->uid, reasons.overlapWithNeighborBlock, reasons.hasDupTs, reasons.partiallyRequired,
            reasons.overlapWithKeyInBuf, reasons.moreThanCapcity, reasons.overlapWithDelInfo,
            reasons.overlapWithLastBlock, pReader->idStr);

  return true;
}

H
Haojun Liao 已提交
1619 1620 1621 1622 1623 1624 1625 1626 1627
/**
 * A file block is "clean" when it neither overlaps its neighbor block, the key in
 * the buffer, the delete info, nor the last block, and it carries no duplicated
 * timestamps. Capacity and partial-window conditions are not considered here.
 */
static bool isCleanFileDataBlock(STsdbReader* pReader, SFileDataBlockInfo* pBlockInfo, SDataBlk* pBlock,
                                 STableBlockScanInfo* pScanInfo, TSDBKEY keyInBuf, SLastBlockReader* pLastBlockReader) {
  SDataBlockToLoadInfo state = {0};
  getBlockToLoadInfo(&state, pBlockInfo, pBlock, pScanInfo, keyInBuf, pLastBlockReader, pReader);

  return !state.overlapWithNeighborBlock && !state.hasDupTs && !state.overlapWithKeyInBuf &&
         !state.overlapWithDelInfo && !state.overlapWithLastBlock;
}

1628
/**
 * Fill the reader's result block with rows taken from the in-memory buffers
 * (the `iter` and `iiter` iterators of the table), up to `endKey`.
 *
 * Does nothing and returns TSDB_CODE_SUCCESS when neither iterator has a value.
 * Otherwise the result block's ts window and uid are updated, the composed-block
 * flag is set, and the elapsed time is added to pReader->cost.buildmemBlock.
 *
 * @return the code returned by buildDataBlockFromBufImpl()
 */
static int32_t buildDataBlockFromBuf(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo, int64_t endKey) {
  bool hasBufferedRows = pBlockScanInfo->iiter.hasVal || pBlockScanInfo->iter.hasVal;
  if (!hasBufferedRows) {
    return TSDB_CODE_SUCCESS;
  }

  SSDataBlock* pResult = pReader->pResBlock;
  int64_t      startUs = taosGetTimestampUs();

  int32_t code = buildDataBlockFromBufImpl(pBlockScanInfo, endKey, pReader->capacity, pReader);

  blockDataUpdateTsWindow(pResult, pReader->suppInfo.slotId[0]);
  pResult->info.id.uid = pBlockScanInfo->uid;
  setComposedBlockFlag(pReader, true);

  double elapsedTime = (taosGetTimestampUs() - startUs) / 1000.0;
  tsdbDebug("%p build data block from cache completed, elapsed time:%.2f ms, numOfRows:%d, brange:%" PRId64
            " - %" PRId64 " %s",
            pReader, elapsedTime, pResult->info.rows, pResult->info.window.skey, pResult->info.window.ekey,
            pReader->idStr);

  pReader->cost.buildmemBlock += elapsedTime;
  return code;
}

1653 1654
/**
 * Fast path: copy the current file-block row straight into the result block when
 * no merge is needed, i.e. when
 *   1. the row is not the border row of the block in scan direction, and
 *   2. the directly following row does not carry the same timestamp.
 *
 * @param key  timestamp of the current row
 * @return true if the row was appended and pDumpInfo->rowIndex advanced,
 *         false if the caller has to take the merge path
 */
static bool tryCopyDistinctRowFromFileBlock(STsdbReader* pReader, SBlockData* pBlockData, int64_t key,
                                            SFileBlockDumpInfo* pDumpInfo) {
  bool forward = (pReader->order == TSDB_ORDER_ASC);

  bool hasFollowingRow = forward ? (pDumpInfo->rowIndex < pDumpInfo->totalRows - 1) : (pDumpInfo->rowIndex > 0);
  if (!hasFollowingRow) {
    return false;
  }

  int32_t delta = forward ? 1 : -1;
  if (pBlockData->aTSKEY[pDumpInfo->rowIndex + delta] == key) {
    // the following row has the same timestamp, the rows must be merged
    return false;
  }

  doAppendRowFromFileBlock(pReader->pResBlock, pReader, pBlockData, pDumpInfo->rowIndex);
  pDumpInfo->rowIndex += delta;
  return true;
}

1673 1674
/**
 * Advance the merge tree of the last-block reader until it points at a row that
 * has not been dropped according to the table's delete skyline and version range.
 *
 * @return true if such a row was found, false when the merge tree is exhausted
 */
static bool nextRowFromLastBlocks(SLastBlockReader* pLastBlockReader, STableBlockScanInfo* pBlockScanInfo,
                                  SVersionRange* pVerRange) {
  while (tMergeTreeNext(&pLastBlockReader->mergeTree)) {
    TSDBROW current = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
    TSDBKEY currentKey = TSDBROW_KEY(&current);

    bool dropped = hasBeenDropped(pBlockScanInfo->delSkyline, &pBlockScanInfo->lastBlockDelIndex, &currentKey,
                                  pLastBlockReader->order, pVerRange);
    if (!dropped) {
      return true;
    }
  }

  return false;
}

/**
 * Fast path for a last-block row: move the last-block reader to its next valid row
 * and, unless that row carries the same timestamp `ts`, append `fRow` directly to
 * the result block.
 *
 * Note that the last-block reader is advanced in either case.
 *
 * @param fRow  the row to copy (captured before the reader is advanced)
 * @param ts    timestamp of `fRow`
 * @return true if the row was appended, false if the next row has the same
 *         timestamp and the caller must merge
 */
static bool tryCopyDistinctRowFromSttBlock(TSDBROW* fRow, SLastBlockReader* pLastBlockReader,
                                           STableBlockScanInfo* pScanInfo, int64_t ts, STsdbReader* pReader) {
  bool hasFollowingRow = nextRowFromLastBlocks(pLastBlockReader, pScanInfo, &pReader->verRange);

  if (hasFollowingRow && getCurrentKeyInLastBlock(pLastBlockReader) == ts) {
    return false;
  }

  doAppendRowFromFileBlock(pReader->pResBlock, pReader, fRow->pBlockData, fRow->iRow);
  return true;
}

H
Haojun Liao 已提交
1707 1708 1709
/**
 * Return the schema matching `sversion` for the table `uid`.
 *
 * pReader->pSchema is loaded on first use and returned when its version matches.
 * Otherwise pReader->pMemSchema acts as a one-entry cache: it is returned when its
 * version matches, and replaced through metaGetTbTSchemaEx() when it does not.
 *
 * @return the schema, or NULL on failure, in which case terrno is set to the code
 *         returned by metaGetTbTSchemaEx()
 */
static FORCE_INLINE STSchema* doGetSchemaForTSRow(int32_t sversion, STsdbReader* pReader, uint64_t uid) {
  // always set the newest schema version in pReader->pSchema
  if (pReader->pSchema == NULL) {
    pReader->pSchema = metaGetTbTSchema(pReader->pTsdb->pVnode->pMeta, uid, -1, 1);
  }

  STSchema* pNewest = pReader->pSchema;
  if (pNewest && sversion == pNewest->version) {
    return pNewest;
  }

  bool hadCachedSchema = (pReader->pMemSchema != NULL);
  if (hadCachedSchema) {
    if (pReader->pMemSchema->version == sversion) {
      return pReader->pMemSchema;
    }

    // the cached schema belongs to another version, discard it before fetching
    taosMemoryFreeClear(pReader->pMemSchema);
  }

  int32_t code = metaGetTbTSchemaEx(pReader->pTsdb->pVnode->pMeta, pReader->suid, uid, sversion, &pReader->pMemSchema);

  // when a cached schema was replaced, a missing result is treated as a failure as well
  bool failed = (code != TSDB_CODE_SUCCESS) || (hadCachedSchema && pReader->pMemSchema == NULL);
  if (failed) {
    terrno = code;
    return NULL;
  }

  return pReader->pMemSchema;
}

1742
static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo, TSDBROW* pRow,
1743 1744 1745 1746 1747 1748
                                     SIterInfo* pIter, int64_t key, SLastBlockReader* pLastBlockReader) {
  SRowMerger          merge = {0};
  STSRow*             pTSRow = NULL;
  SBlockData*         pBlockData = &pReader->status.fileBlockData;
  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;

1749
  int64_t tsLast = INT64_MIN;
1750
  if (hasDataInLastBlock(pLastBlockReader)) {
1751 1752
    tsLast = getCurrentKeyInLastBlock(pLastBlockReader);
  }
1753

H
Hongze Cheng 已提交
1754 1755
  TSDBKEY k = TSDBROW_KEY(pRow);
  TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
1756

1757 1758
  int64_t minKey = 0;
  if (pReader->order == TSDB_ORDER_ASC) {
H
Hongze Cheng 已提交
1759
    minKey = INT64_MAX;  // chosen the minimum value
1760
    if (minKey > tsLast && hasDataInLastBlock(pLastBlockReader)) {
1761 1762
      minKey = tsLast;
    }
1763

1764 1765 1766
    if (minKey > k.ts) {
      minKey = k.ts;
    }
1767

1768
    if (minKey > key && hasDataInFileBlock(pBlockData, pDumpInfo)) {
1769 1770 1771 1772
      minKey = key;
    }
  } else {
    minKey = INT64_MIN;
1773
    if (minKey < tsLast && hasDataInLastBlock(pLastBlockReader)) {
1774 1775 1776 1777 1778 1779 1780
      minKey = tsLast;
    }

    if (minKey < k.ts) {
      minKey = k.ts;
    }

1781
    if (minKey < key && hasDataInFileBlock(pBlockData, pDumpInfo)) {
1782 1783
      minKey = key;
    }
1784 1785 1786 1787
  }

  bool init = false;

1788
  // ASC: file block ---> last block -----> imem -----> mem
H
Hongze Cheng 已提交
1789
  // DESC: mem -----> imem -----> last block -----> file block
1790 1791
  if (pReader->order == TSDB_ORDER_ASC) {
    if (minKey == key) {
1792
      init = true;
H
Haojun Liao 已提交
1793 1794 1795 1796
      int32_t code = tRowMergerInit(&merge, &fRow, pReader->pSchema);
      if (code != TSDB_CODE_SUCCESS) {
        return code;
      }
1797
      doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
1798 1799
    }

1800
    if (minKey == tsLast) {
1801
      TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
H
Haojun Liao 已提交
1802 1803 1804
      if (init) {
        tRowMerge(&merge, &fRow1);
      } else {
1805
        init = true;
H
Haojun Liao 已提交
1806 1807 1808 1809
        int32_t code = tRowMergerInit(&merge, &fRow1, pReader->pSchema);
        if (code != TSDB_CODE_SUCCESS) {
          return code;
        }
1810
      }
1811
      doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, &merge, &pReader->verRange);
1812
    }
1813

1814
    if (minKey == k.ts) {
H
Haojun Liao 已提交
1815 1816 1817
      if (init) {
        tRowMerge(&merge, pRow);
      } else {
1818 1819
        init = true;
        STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid);
H
Haojun Liao 已提交
1820 1821 1822 1823 1824 1825 1826 1827
        int32_t   code = tRowMergerInit(&merge, pRow, pSchema);
        if (code != TSDB_CODE_SUCCESS) {
          return code;
        }
      }
      int32_t code = doMergeRowsInBuf(pIter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge, pReader);
      if (code != TSDB_CODE_SUCCESS) {
        return code;
1828 1829 1830 1831 1832
      }
    }
  } else {
    if (minKey == k.ts) {
      init = true;
1833
      STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid);
H
Haojun Liao 已提交
1834 1835 1836 1837 1838 1839
      int32_t   code = tRowMergerInit(&merge, pRow, pSchema);
      if (code != TSDB_CODE_SUCCESS) {
        return code;
      }

      code = doMergeRowsInBuf(pIter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge, pReader);
1840
      if (code != TSDB_CODE_SUCCESS || merge.pTSchema == NULL) {
H
Haojun Liao 已提交
1841 1842
        return code;
      }
1843 1844
    }

1845
    if (minKey == tsLast) {
1846
      TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
H
Haojun Liao 已提交
1847 1848 1849
      if (init) {
        tRowMerge(&merge, &fRow1);
      } else {
1850
        init = true;
H
Haojun Liao 已提交
1851 1852 1853 1854
        int32_t code = tRowMergerInit(&merge, &fRow1, pReader->pSchema);
        if (code != TSDB_CODE_SUCCESS) {
          return code;
        }
1855
      }
1856
      doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, &merge, &pReader->verRange);
1857 1858 1859
    }

    if (minKey == key) {
H
Haojun Liao 已提交
1860 1861 1862
      if (init) {
        tRowMerge(&merge, &fRow);
      } else {
1863
        init = true;
H
Haojun Liao 已提交
1864 1865 1866 1867
        int32_t code = tRowMergerInit(&merge, &fRow, pReader->pSchema);
        if (code != TSDB_CODE_SUCCESS) {
          return code;
        }
1868 1869 1870
      }
      doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
    }
1871 1872
  }

1873 1874 1875 1876 1877
  int32_t code = tRowMergerGetRow(&merge, &pTSRow);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

1878
  doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo);
1879 1880 1881 1882 1883 1884

  taosMemoryFree(pTSRow);
  tRowMergerClear(&merge);
  return TSDB_CODE_SUCCESS;
}

1885 1886 1887
/**
 * Emit the row at the current key of the last-block reader, merging it with rows
 * of the same timestamp from the last blocks and, if requested, from the file block.
 *
 * @param pLastBlockReader  reader of the last blocks; must currently point at a row
 * @param pReader           reader; schema, version range and result block are used
 * @param pBlockScanInfo    scan info of the table; `lastKey` is updated on the fast path
 * @param pBlockData        file block data, only dereferenced when mergeBlockData is true
 * @param mergeBlockData    whether the file block takes part in the merge
 * @return TSDB_CODE_SUCCESS or the error code of the failing merge step; the merger
 *         is released on the error path of tRowMergerGetRow() as well
 */
static int32_t doMergeFileBlockAndLastBlock(SLastBlockReader* pLastBlockReader, STsdbReader* pReader,
                                            STableBlockScanInfo* pBlockScanInfo, SBlockData* pBlockData,
                                            bool mergeBlockData) {
  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
  int64_t             tsLastBlock = getCurrentKeyInLastBlock(pLastBlockReader);

  STSRow*    pTSRow = NULL;
  SRowMerger merge = {0};
  int32_t    code = TSDB_CODE_SUCCESS;
  TSDBROW    fRow = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
  tsdbTrace("fRow ptr:%p, %d, uid:%" PRIu64 ", %s", fRow.pBlockData, fRow.iRow, pLastBlockReader->uid, pReader->idStr);

  // the file block does not contribute: it is excluded, or its current row has another timestamp
  bool lastBlockOnly = (!mergeBlockData) || (tsLastBlock != pBlockData->aTSKEY[pDumpInfo->rowIndex]);

  if (lastBlockOnly) {
    // fast path: the row is unique in the last blocks and can be copied as is
    if (tryCopyDistinctRowFromSttBlock(&fRow, pLastBlockReader, pBlockScanInfo, tsLastBlock, pReader)) {
      pBlockScanInfo->lastKey = tsLastBlock;
      return TSDB_CODE_SUCCESS;
    }

    code = tRowMergerInit(&merge, &fRow, pReader->pSchema);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    // the reader has been advanced by the fast-path attempt; merge the row it now points at
    TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
    tRowMerge(&merge, &fRow1);
    doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLastBlock, &merge, &pReader->verRange);
  } else {
    // the last block and the file block share the timestamp: merge both
    code = tRowMergerInit(&merge, &fRow, pReader->pSchema);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLastBlock, &merge, &pReader->verRange);
    ASSERT(mergeBlockData);

    // merge with block data if ts == key
    if (tsLastBlock == pBlockData->aTSKEY[pDumpInfo->rowIndex]) {
      doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
    }
  }

  code = tRowMergerGetRow(&merge, &pTSRow);
  if (code == TSDB_CODE_SUCCESS) {
    doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo);
    taosMemoryFree(pTSRow);
  }

  // the merger was initialized successfully above, release it whether or not the row was built
  tRowMergerClear(&merge);
  return code;
}

1949 1950
/**
 * Produce the next row when only file data is involved (file block and/or last
 * blocks; the ascending branch is taken when imem and mem are both empty).
 *
 * Dispatch:
 *   - no data in the file block              -> last blocks only
 *   - no data in the last blocks             -> file block only
 *   - descending scan                        -> doMergeFileBlockAndLastBlock()
 *   - ascending scan, key <  last-block key  -> file block only
 *   - ascending scan, key == last-block key  -> merge both sources here
 *
 * @param key  timestamp of the current row in the file block
 * @return TSDB_CODE_SUCCESS or the error code of the failing step; the merger is
 *         released on the error path of tRowMergerGetRow() as well
 */
static int32_t mergeFileBlockAndLastBlock(STsdbReader* pReader, SLastBlockReader* pLastBlockReader, int64_t key,
                                          STableBlockScanInfo* pBlockScanInfo, SBlockData* pBlockData) {
  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;

  // only last block exists
  if (!hasDataInFileBlock(pBlockData, pDumpInfo)) {
    return doMergeFileBlockAndLastBlock(pLastBlockReader, pReader, pBlockScanInfo, NULL, false);
  }

  // no last block available, only data block exists
  if (!hasDataInLastBlock(pLastBlockReader)) {
    return mergeRowsInFileBlocks(pBlockData, pBlockScanInfo, key, pReader);
  }

  // current row of the file block and current key of the last blocks
  TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
  int64_t ts = getCurrentKeyInLastBlock(pLastBlockReader);
  ASSERT(ts >= key);

  if (!ASCENDING_TRAVERSE(pReader->order)) {  // desc order
    return doMergeFileBlockAndLastBlock(pLastBlockReader, pReader, pBlockScanInfo, pBlockData, true);
  }

  if (key < ts) {  // imem, mem are all empty, file blocks (data blocks and last block) exist
    return mergeRowsInFileBlocks(pBlockData, pBlockScanInfo, key, pReader);
  }

  if (key > ts) {
    ASSERT(0);
    return TSDB_CODE_SUCCESS;
  }

  // key == ts: the file block and the last blocks hold rows with the same timestamp
  STSRow*    pTSRow = NULL;
  SRowMerger merge = {0};

  int32_t code = tRowMergerInit(&merge, &fRow, pReader->pSchema);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);

  TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
  tRowMerge(&merge, &fRow1);

  doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, ts, &merge, &pReader->verRange);

  code = tRowMergerGetRow(&merge, &pTSRow);
  if (code == TSDB_CODE_SUCCESS) {
    doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo);
    taosMemoryFree(pTSRow);
  }

  // the merger was initialized successfully above, release it whether or not the row was built
  tRowMergerClear(&merge);
  return code;
}

2005 2006
/**
 * Merge the rows that share the next timestamp from all four sources - mem, imem,
 * last blocks and the current file block - and append the merged row to the
 * result block.
 *
 * Both memory iterators must currently yield a valid row (asserted). The next
 * timestamp is the smallest (ascending scan) or largest (descending scan) key among
 * the sources that hold data. Rows are fed into the merger in this order:
 *   ASC:  file block -----> last block -----> imem -----> mem
 *   DESC: mem -----> imem -----> last block -----> file block
 *
 * @param pReader           reader; order, schema, version range and result block are used
 * @param pBlockScanInfo    scan info of the table; its `iter`/`iiter` iterators are consumed
 * @param pBlockData        data of the current file block
 * @param pLastBlockReader  reader of the last blocks
 * @return TSDB_CODE_SUCCESS or the first error code reported by a merge step; a schema
 *         lookup failure is reported through the terrno set by doGetSchemaForTSRow().
 *         Once the merger has been initialized it is released on every exit path.
 */
static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo, SBlockData* pBlockData,
                                     SLastBlockReader* pLastBlockReader) {
  SRowMerger          merge = {0};
  STSRow*             pTSRow = NULL;
  int32_t             code = TSDB_CODE_SUCCESS;
  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
  SArray*             pDelList = pBlockScanInfo->delSkyline;
  bool                asc = ASCENDING_TRAVERSE(pReader->order);

  TSDBROW* pRow = getValidMemRow(&pBlockScanInfo->iter, pDelList, pReader);
  TSDBROW* piRow = getValidMemRow(&pBlockScanInfo->iiter, pDelList, pReader);
  ASSERT(pRow != NULL && piRow != NULL);

  int64_t tsLast = INT64_MIN;
  if (hasDataInLastBlock(pLastBlockReader)) {
    tsLast = getCurrentKeyInLastBlock(pLastBlockReader);
  }

  int64_t key = hasDataInFileBlock(pBlockData, pDumpInfo) ? pBlockData->aTSKEY[pDumpInfo->rowIndex] : INT64_MIN;

  TSDBKEY k = TSDBROW_KEY(pRow);
  TSDBKEY ik = TSDBROW_KEY(piRow);

  // pick the key to emit next: the minimum for ascending, the maximum for descending scans
  int64_t minKey = 0;
  if (asc) {
    minKey = INT64_MAX;  // let's find the minimum
    if (minKey > k.ts) {
      minKey = k.ts;
    }

    if (minKey > ik.ts) {
      minKey = ik.ts;
    }

    if (minKey > key && hasDataInFileBlock(pBlockData, pDumpInfo)) {
      minKey = key;
    }

    if (minKey > tsLast && hasDataInLastBlock(pLastBlockReader)) {
      minKey = tsLast;
    }
  } else {
    minKey = INT64_MIN;  // let find the maximum ts value
    if (minKey < k.ts) {
      minKey = k.ts;
    }

    if (minKey < ik.ts) {
      minKey = ik.ts;
    }

    if (minKey < key && hasDataInFileBlock(pBlockData, pDumpInfo)) {
      minKey = key;
    }

    if (minKey < tsLast && hasDataInLastBlock(pLastBlockReader)) {
      minKey = tsLast;
    }
  }

  // set once tRowMergerInit() has succeeded; from then on every exit goes through _end
  bool init = false;

  if (asc) {
    if (minKey == key) {
      TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
      code = tRowMergerInit(&merge, &fRow, pReader->pSchema);
      if (code != TSDB_CODE_SUCCESS) {
        return code;
      }
      init = true;

      doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
    }

    if (minKey == tsLast) {
      TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
      if (init) {
        tRowMerge(&merge, &fRow1);
      } else {
        code = tRowMergerInit(&merge, &fRow1, pReader->pSchema);
        if (code != TSDB_CODE_SUCCESS) {
          return code;
        }
        init = true;
      }

      doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, &merge, &pReader->verRange);
    }

    if (minKey == ik.ts) {
      if (init) {
        tRowMerge(&merge, piRow);
      } else {
        STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(piRow), pReader, pBlockScanInfo->uid);
        if (pSchema == NULL) {
          // doGetSchemaForTSRow() stores the failure reason in terrno; `code` still holds
          // TSDB_CODE_SUCCESS at this point and would hide the failure from the caller
          return terrno;
        }

        code = tRowMergerInit(&merge, piRow, pSchema);
        if (code != TSDB_CODE_SUCCESS) {
          return code;
        }
        init = true;
      }

      code = doMergeRowsInBuf(&pBlockScanInfo->iiter, pBlockScanInfo->uid, ik.ts, pBlockScanInfo->delSkyline, &merge,
                              pReader);
      if (code != TSDB_CODE_SUCCESS) {
        goto _end;
      }
    }

    if (minKey == k.ts) {
      if (init) {
        if (merge.pTSchema == NULL) {
          goto _end;
        }

        tRowMerge(&merge, pRow);
      } else {
        STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid);
        code = tRowMergerInit(&merge, pRow, pSchema);
        if (code != TSDB_CODE_SUCCESS) {
          return code;
        }
        init = true;
      }

      code = doMergeRowsInBuf(&pBlockScanInfo->iter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge,
                              pReader);
      if (code != TSDB_CODE_SUCCESS) {
        goto _end;
      }
    }
  } else {
    if (minKey == k.ts) {
      STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid);
      code = tRowMergerInit(&merge, pRow, pSchema);
      if (code != TSDB_CODE_SUCCESS) {
        return code;
      }
      init = true;

      code = doMergeRowsInBuf(&pBlockScanInfo->iter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge,
                              pReader);
      if (code != TSDB_CODE_SUCCESS) {
        goto _end;
      }
    }

    if (minKey == ik.ts) {
      if (init) {
        tRowMerge(&merge, piRow);
      } else {
        STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(piRow), pReader, pBlockScanInfo->uid);
        code = tRowMergerInit(&merge, piRow, pSchema);
        if (code != TSDB_CODE_SUCCESS) {
          return code;
        }
        init = true;
      }

      code = doMergeRowsInBuf(&pBlockScanInfo->iiter, pBlockScanInfo->uid, ik.ts, pBlockScanInfo->delSkyline, &merge,
                              pReader);
      if (code != TSDB_CODE_SUCCESS) {
        goto _end;
      }
    }

    if (minKey == tsLast) {
      TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
      if (init) {
        tRowMerge(&merge, &fRow1);
      } else {
        code = tRowMergerInit(&merge, &fRow1, pReader->pSchema);
        if (code != TSDB_CODE_SUCCESS) {
          return code;
        }
        init = true;
      }

      doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, &merge, &pReader->verRange);
    }

    if (minKey == key) {
      TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
      if (!init) {
        code = tRowMergerInit(&merge, &fRow, pReader->pSchema);
        if (code != TSDB_CODE_SUCCESS) {
          return code;
        }
        init = true;
      } else {
        if (merge.pTSchema == NULL) {
          goto _end;
        }
        tRowMerge(&merge, &fRow);
      }

      doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
    }
  }

  if (merge.pTSchema == NULL) {
    goto _end;
  }

  code = tRowMergerGetRow(&merge, &pTSRow);
  if (code != TSDB_CODE_SUCCESS) {
    goto _end;
  }

  doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo);
  taosMemoryFree(pTSRow);

_end:
  // release the merger on the early-exit and error paths as well, not only after a successful merge
  if (init) {
    tRowMergerClear(&merge);
  }
  return code;
}

2219 2220 2221 2222 2223 2224 2225 2226 2227 2228 2229 2230 2231 2232 2233 2234 2235 2236 2237 2238
// Create the mem and imem iterators of one table and build its delete skyline.
//
// The work is done at most once per table: when pBlockScanInfo->iterInit is already set, the call
// returns immediately. The iterators start at (window.skey, minVer) for an ascending scan and at
// (window.ekey, maxVer) for a descending scan.
//
// Returns TSDB_CODE_SUCCESS, or the error code reported by tsdbTbDataIterCreate(). In the error
// case iterInit is left unset.
static int32_t initMemDataIterator(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader) {
  if (pBlockScanInfo->iterInit) {
    return TSDB_CODE_SUCCESS;
  }

  int32_t code = TSDB_CODE_SUCCESS;

  TSDBKEY startKey = {0};
  if (ASCENDING_TRAVERSE(pReader->order)) {
    startKey = (TSDBKEY){.ts = pReader->window.skey, .version = pReader->verRange.minVer};
  } else {
    startKey = (TSDBKEY){.ts = pReader->window.ekey, .version = pReader->verRange.maxVer};
  }

  int32_t backward = (!ASCENDING_TRAVERSE(pReader->order));

  // iterator over the mem table of the snapshot
  STbData* d = NULL;
  if (pReader->pReadSnap->pMem != NULL) {
    d = tsdbGetTbDataFromMemTable(pReader->pReadSnap->pMem, pReader->suid, pBlockScanInfo->uid);
    if (d != NULL) {
      ASSERT(pBlockScanInfo->iter.iter == NULL);
      code = tsdbTbDataIterCreate(d, &startKey, backward, &pBlockScanInfo->iter.iter);
      if (code == TSDB_CODE_SUCCESS) {
        pBlockScanInfo->iter.hasVal = (tsdbTbDataIterGet(pBlockScanInfo->iter.iter) != NULL);

        tsdbDebug("%p uid:%" PRId64 ", check data in mem from skey:%" PRId64 ", order:%d, ts range in buf:%" PRId64
                  "-%" PRId64 " %s",
                  pReader, pBlockScanInfo->uid, startKey.ts, pReader->order, d->minKey, d->maxKey, pReader->idStr);
      } else {
        // this branch handles the mem table, so the message must say "mem"
        tsdbError("%p uid:%" PRId64 ", failed to create iterator for mem, code:%s, %s", pReader, pBlockScanInfo->uid,
                  tstrerror(code), pReader->idStr);
        return code;
      }
    }
  } else {
    tsdbDebug("%p uid:%" PRId64 ", no data in mem, %s", pReader, pBlockScanInfo->uid, pReader->idStr);
  }

  // iterator over the imem table of the snapshot
  STbData* di = NULL;
  if (pReader->pReadSnap->pIMem != NULL) {
    di = tsdbGetTbDataFromMemTable(pReader->pReadSnap->pIMem, pReader->suid, pBlockScanInfo->uid);
    if (di != NULL) {
      code = tsdbTbDataIterCreate(di, &startKey, backward, &pBlockScanInfo->iiter.iter);
      if (code == TSDB_CODE_SUCCESS) {
        pBlockScanInfo->iiter.hasVal = (tsdbTbDataIterGet(pBlockScanInfo->iiter.iter) != NULL);

        tsdbDebug("%p uid:%" PRId64 ", check data in imem from skey:%" PRId64 ", order:%d, ts range in buf:%" PRId64
                  "-%" PRId64 " %s",
                  pReader, pBlockScanInfo->uid, startKey.ts, pReader->order, di->minKey, di->maxKey, pReader->idStr);
      } else {
        // this branch handles the imem table, so the message must say "imem"
        tsdbError("%p uid:%" PRId64 ", failed to create iterator for imem, code:%s, %s", pReader, pBlockScanInfo->uid,
                  tstrerror(code), pReader->idStr);
        return code;
      }
    }
  } else {
    tsdbDebug("%p uid:%" PRId64 ", no data in imem, %s", pReader, pBlockScanInfo->uid, pReader->idStr);
  }

  // A skyline failure is reported in the log but, as before, does not fail the iterator setup.
  int32_t delCode = initDelSkylineIterator(pBlockScanInfo, pReader, d, di);
  if (delCode != TSDB_CODE_SUCCESS) {
    tsdbError("%p uid:%" PRId64 ", failed to build delete skyline, code:%s, %s", pReader, pBlockScanInfo->uid,
              tstrerror(delCode), pReader->idStr);
  }

  pBlockScanInfo->iterInit = true;
  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
2284 2285
// Decide whether the file-block row at pDumpInfo->rowIndex qualifies for the current scan.
//
// The row is rejected when
//   - the block carries a uid column and the row's uid differs from pBlockScanInfo->uid,
//   - its version lies outside pReader->verRange,
//   - its timestamp lies outside pReader->window, or
//   - hasBeenDropped() reports it as dropped for the table's delete skyline.
// hasBeenDropped() receives &pBlockScanInfo->fileDelIndex and is only reached by rows that passed
// all earlier checks.
static bool isValidFileBlockRow(SBlockData* pBlockData, SFileBlockDumpInfo* pDumpInfo,
                                STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader) {
  // a block with a uid column holds rows of several tables
  if (pBlockData->aUid != NULL) {
    uint64_t rowUid = pBlockData->aUid[pDumpInfo->rowIndex];
    if (rowUid != pBlockScanInfo->uid) {
      return false;
    }
  }

  int64_t rowVer = pBlockData->aVersion[pDumpInfo->rowIndex];
  bool    verInRange = (rowVer >= pReader->verRange.minVer) && (rowVer <= pReader->verRange.maxVer);
  if (!verInRange) {
    return false;
  }

  int64_t rowTs = pBlockData->aTSKEY[pDumpInfo->rowIndex];
  bool    tsInRange = (rowTs >= pReader->window.skey) && (rowTs <= pReader->window.ekey);
  if (!tsInRange) {
    return false;
  }

  TSDBKEY rowKey = {.ts = rowTs, .version = rowVer};
  bool    dropped = hasBeenDropped(pBlockScanInfo->delSkyline, &pBlockScanInfo->fileDelIndex, &rowKey, pReader->order,
                                   &pReader->verRange);
  return !dropped;
}

2314
// Bind the last-block reader to the table described by pScanInfo.
//
// When the reader is already bound to this uid, only the "has data" state is reported. Otherwise
// a merge tree left over from a previous table (uid != 0) is closed, the memory iterators of the
// table are initialized, and a new merge tree is opened on a window that starts right after
// (ascending) or ends right before (descending) pScanInfo->lastKey.
//
// Returns the result of hasDataInLastBlock()/nextRowFromLastBlocks(), or false when
// tMergeTreeOpen() fails.
static bool initLastBlockReader(SLastBlockReader* pLBlockReader, STableBlockScanInfo* pScanInfo, STsdbReader* pReader) {
  const bool sameTable = (pLBlockReader->uid == pScanInfo->uid);
  if (sameTable) {
    // the last block reader has been initialized for this table
    return hasDataInLastBlock(pLBlockReader);
  }

  if (pLBlockReader->uid != 0) {
    tMergeTreeClose(&pLBlockReader->mergeTree);
  }

  initMemDataIterator(pScanInfo, pReader);
  pLBlockReader->uid = pScanInfo->uid;

  // exclude the key that has been returned already
  STimeWindow range = pLBlockReader->window;
  if (ASCENDING_TRAVERSE(pLBlockReader->order)) {
    range.skey = pScanInfo->lastKey + 1;
  } else {
    range.ekey = pScanInfo->lastKey - 1;
  }

  int32_t openCode = tMergeTreeOpen(&pLBlockReader->mergeTree, (pLBlockReader->order == TSDB_ORDER_DESC),
                                    pReader->pFileReader, pReader->suid, pScanInfo->uid, &range,
                                    &pLBlockReader->verRange, pLBlockReader->pInfo, false, pReader->idStr);
  if (openCode != TSDB_CODE_SUCCESS) {
    return false;
  }

  return nextRowFromLastBlocks(pLBlockReader, pScanInfo, &pReader->verRange);
}

2345
// Return the timestamp of the row the merge tree of the last-block reader currently yields.
static int64_t getCurrentKeyInLastBlock(SLastBlockReader* pLastBlockReader) {
  TSDBROW curRow = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
  int64_t curTs = TSDBROW_TS(&curRow);
  return curTs;
}

H
Hongze Cheng 已提交
2350
// The last-block reader has data as long as its merge tree holds a current iterator.
static bool hasDataInLastBlock(SLastBlockReader* pLastBlockReader) {
  if (pLastBlockReader->mergeTree.pIter == NULL) {
    return false;
  }

  return true;
}
2351 2352 2353 2354

bool hasDataInFileBlock(const SBlockData* pBlockData, const SFileBlockDumpInfo* pDumpInfo) {
  if (pBlockData->nRow > 0) {
    ASSERT(pBlockData->nRow == pDumpInfo->totalRows);
2355 2356
  }

2357
  return pBlockData->nRow > 0 && (!pDumpInfo->allDumped);
2358
}
2359

2360 2361
// Emit the row with timestamp `key` of the current file block into the result block.
//
// If tryCopyDistinctRowFromFileBlock() handles the row, only pBlockScanInfo->lastKey is updated.
// Otherwise the row at pDumpInfo->rowIndex seeds a row merger, doMergeRowsInFileBlocks() folds the
// remaining file-block rows into it, and the merged row is appended to pReader->pResBlock.
//
// Returns TSDB_CODE_SUCCESS or the error code of tRowMergerInit()/tRowMergerGetRow().
int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pBlockScanInfo, int64_t key,
                              STsdbReader* pReader) {
  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;

  if (tryCopyDistinctRowFromFileBlock(pReader, pBlockData, key, pDumpInfo)) {
    pBlockScanInfo->lastKey = key;
    return TSDB_CODE_SUCCESS;
  }

  TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);

  STSRow*    pTSRow = NULL;
  SRowMerger merge = {0};

  int32_t code = tRowMergerInit(&merge, &fRow, pReader->pSchema);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);

  code = tRowMergerGetRow(&merge, &pTSRow);
  if (code != TSDB_CODE_SUCCESS) {
    // the merger was initialized successfully, so it must be released on this path as well
    tRowMergerClear(&merge);
    return code;
  }

  doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo);

  taosMemoryFree(pTSRow);
  tRowMergerClear(&merge);
  return TSDB_CODE_SUCCESS;
}

H
Haojun Liao 已提交
2391 2392
// Produce the next merged row for one table by dispatching on which sources currently have data.
//
//   mem and imem both have rows -> doMergeMultiLevelRows()
//   only imem has a row         -> doMergeBufAndFileRows() with the imem iterator
//   only mem has a row          -> doMergeBufAndFileRows() with the mem iterator
//   neither                     -> mergeFileBlockAndLastBlock()
//
// The hasVal flags are re-read after getValidMemRow() has been called on the iterator.
// NOTE(review): presumably because getValidMemRow() can update the flag -- confirm.
// The file key is the timestamp at pDumpInfo->rowIndex, or INT64_MIN when the block is empty or
// fully dumped.
static int32_t buildComposedDataBlockImpl(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo,
                                          SBlockData* pBlockData, SLastBlockReader* pLastBlockReader) {
  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;

  int64_t key = INT64_MIN;
  if (pBlockData->nRow > 0 && (!pDumpInfo->allDumped)) {
    key = pBlockData->aTSKEY[pDumpInfo->rowIndex];
  }

  SIterInfo* pMemIter = &pBlockScanInfo->iter;
  SIterInfo* pImemIter = &pBlockScanInfo->iiter;

  // mem + imem + file + last block
  if (pMemIter->hasVal && pImemIter->hasVal) {
    return doMergeMultiLevelRows(pReader, pBlockScanInfo, pBlockData, pLastBlockReader);
  }

  TSDBROW* pMemRow = NULL;
  TSDBROW* pImemRow = NULL;

  if (pMemIter->hasVal) {
    pMemRow = getValidMemRow(pMemIter, pBlockScanInfo->delSkyline, pReader);
  }

  if (pImemIter->hasVal) {
    pImemRow = getValidMemRow(pImemIter, pBlockScanInfo->delSkyline, pReader);
  }

  // imem + file + last block
  if (pImemIter->hasVal) {
    return doMergeBufAndFileRows(pReader, pBlockScanInfo, pImemRow, pImemIter, key, pLastBlockReader);
  }

  // mem + file + last block
  if (pMemIter->hasVal) {
    return doMergeBufAndFileRows(pReader, pBlockScanInfo, pMemRow, pMemIter, key, pLastBlockReader);
  }

  // files data blocks + last block
  return mergeFileBlockAndLastBlock(pReader, pLastBlockReader, key, pBlockScanInfo, pBlockData);
}

2423
// Fill pReader->pResBlock with rows merged from the current file block, the last block and the
// memory buffers of one table.
//
// When a current file block exists and isCleanFileDataBlock() accepts it (and it fits into the
// reader capacity), the block is copied as a whole. Otherwise rows are produced one by one through
// buildComposedDataBlockImpl() until the file block is consumed, no source has data left, or the
// result block reaches pReader->capacity. While skipping unqualified file rows, a neighbor block
// of the same table is loaded when overlapWithNeighborBlock() reports an overlap.
//
// Returns TSDB_CODE_SUCCESS, TSDB_CODE_INVALID_PARA when the uid of the current block is not in
// the table map, or the first error reported while loading a block or merging a row.
static int32_t buildComposedDataBlock(STsdbReader* pReader) {
  int32_t code = TSDB_CODE_SUCCESS;

  SSDataBlock* pResBlock = pReader->pResBlock;

  SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(&pReader->status.blockIter);
  SLastBlockReader*   pLastBlockReader = pReader->status.fileIter.pLastBlockReader;

  bool    asc = ASCENDING_TRAVERSE(pReader->order);
  int64_t st = taosGetTimestampUs();
  int32_t step = asc ? 1 : -1;

  STableBlockScanInfo* pBlockScanInfo = NULL;
  if (pBlockInfo != NULL) {
    void* p = taosHashGet(pReader->status.pTableMap, &pBlockInfo->uid, sizeof(pBlockInfo->uid));
    if (p == NULL) {
      code = TSDB_CODE_INVALID_PARA;
      tsdbError("failed to locate the uid:%" PRIu64 " in query table uid list, total tables:%d, %s", pBlockInfo->uid,
                taosHashGetSize(pReader->status.pTableMap), pReader->idStr);
      goto _end;
    }

    pBlockScanInfo = *(STableBlockScanInfo**)p;

    SDataBlk* pBlock = getCurrentBlock(&pReader->status.blockIter);
    TSDBKEY   keyInBuf = getCurrentKeyInBuf(pBlockScanInfo, pReader);

    // it is a clean block, load it directly
    if (isCleanFileDataBlock(pReader, pBlockInfo, pBlock, pBlockScanInfo, keyInBuf, pLastBlockReader) &&
        pBlock->nRow <= pReader->capacity) {
      if (asc || ((!asc) && (!hasDataInLastBlock(pLastBlockReader)))) {
        copyBlockDataToSDataBlock(pReader, pBlockScanInfo);

        // record the last key value
        pBlockScanInfo->lastKey = asc ? pBlock->maxKey.ts : pBlock->minKey.ts;
        goto _end;
      }
    }
  } else {  // file blocks not exist
    pBlockScanInfo = *pReader->status.pTableIter;
  }

  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
  SBlockData*         pBlockData = &pReader->status.fileBlockData;

  while (1) {
    bool hasBlockData = false;

    while (pBlockData->nRow > 0) {  // find the first qualified row in data block
      if (isValidFileBlockRow(pBlockData, pDumpInfo, pBlockScanInfo, pReader)) {
        hasBlockData = true;
        break;
      }

      pDumpInfo->rowIndex += step;

      SDataBlk* pBlock = getCurrentBlock(&pReader->status.blockIter);
      if (pDumpInfo->rowIndex >= pBlock->nRow || pDumpInfo->rowIndex < 0) {
        int32_t     nextIndex = -1;
        SBlockIndex bIndex = {0};
        bool hasNeighbor = getNeighborBlockOfSameTable(pBlockInfo, pBlockScanInfo, &nextIndex, pReader->order, &bIndex);
        if (!hasNeighbor) {  // do nothing
          setBlockAllDumped(pDumpInfo, pBlock->maxKey.ts, pReader->order);
          break;
        }

        if (overlapWithNeighborBlock(pBlock, &bIndex, pReader->order)) {  // load next block
          SReaderStatus*  pStatus = &pReader->status;
          SDataBlockIter* pBlockIter = &pStatus->blockIter;

          // 1. find the next neighbor block in the scan block list
          SFileDataBlockInfo fb = {.uid = pBlockInfo->uid, .tbBlockIdx = nextIndex};
          int32_t            neighborIndex = findFileBlockInfoIndex(pBlockIter, &fb);

          // 2. remove it from the scan block list
          setFileBlockActiveInBlockIter(pBlockIter, neighborIndex, step);

          // 3. load the neighbor block, and set it to be the currently accessed file data block
          code = doLoadFileBlockData(pReader, pBlockIter, &pStatus->fileBlockData, pBlockInfo->uid);
          if (code != TSDB_CODE_SUCCESS) {
            setBlockAllDumped(pDumpInfo, pBlock->maxKey.ts, pReader->order);
            break;
          }

          // 4. check the data values
          initBlockDumpInfo(pReader, pBlockIter);
        } else {
          setBlockAllDumped(pDumpInfo, pBlock->maxKey.ts, pReader->order);
          break;
        }
      }
    }

    bool hasBlockLData = hasDataInLastBlock(pLastBlockReader);

    // no data in last block and block, no need to proceed.
    if ((hasBlockData == false) && (hasBlockLData == false)) {
      break;
    }

    // A failed merge must stop the loop and reach the caller. A separate variable is used so that
    // a successful merge does not overwrite an earlier block-load error kept in `code`.
    int32_t mergeCode = buildComposedDataBlockImpl(pReader, pBlockScanInfo, pBlockData, pLastBlockReader);
    if (mergeCode != TSDB_CODE_SUCCESS) {
      code = mergeCode;
      break;
    }

    // currently loaded file data block is consumed
    if ((pBlockData->nRow > 0) && (pDumpInfo->rowIndex >= pBlockData->nRow || pDumpInfo->rowIndex < 0)) {
      SDataBlk* pBlock = getCurrentBlock(&pReader->status.blockIter);
      setBlockAllDumped(pDumpInfo, pBlock->maxKey.ts, pReader->order);
      break;
    }

    if (pResBlock->info.rows >= pReader->capacity) {
      break;
    }
  }

_end:
  pResBlock->info.id.uid = (pBlockScanInfo != NULL) ? pBlockScanInfo->uid : 0;
  blockDataUpdateTsWindow(pResBlock, pReader->suppInfo.slotId[0]);

  setComposedBlockFlag(pReader, true);
  double el = (taosGetTimestampUs() - st) / 1000.0;

  pReader->cost.composedBlocks += 1;
  pReader->cost.buildComposedBlockTime += el;

  if (pResBlock->info.rows > 0) {
    tsdbDebug("%p uid:%" PRIu64 ", composed data block created, brange:%" PRIu64 "-%" PRIu64
              " rows:%d, elapsed time:%.2f ms %s",
              pReader, pResBlock->info.id.uid, pResBlock->info.window.skey, pResBlock->info.window.ekey,
              pResBlock->info.rows, el, pReader->idStr);
  }

  return code;
}

// Record in the reader status whether the current result block is a composed data block.
void setComposedBlockFlag(STsdbReader* pReader, bool composed) {
  SReaderStatus* pStatus = &pReader->status;
  pStatus->composedDataBlock = composed;
}

dengyihao's avatar
dengyihao 已提交
2561 2562
// Build the delete skyline of one table and reset the delete indexes of its scan state.
//
// Delete records are gathered from the delete file (when pReader->pDelIdx has an entry for the
// table) and from the pHead lists of the mem and imem table data, then handed to
// tsdbBuildDeleteSkyline(). Nothing is done when pBlockScanInfo->delSkyline already exists.
//
// pMemTbData / piMemTbData may be NULL.
//
// Returns TSDB_CODE_SUCCESS, TSDB_CODE_OUT_OF_MEMORY when an array cannot be allocated or grown,
// or the error code of tsdbReadDelData()/tsdbBuildDeleteSkyline().
int32_t initDelSkylineIterator(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader, STbData* pMemTbData,
                               STbData* piMemTbData) {
  if (pBlockScanInfo->delSkyline != NULL) {
    return TSDB_CODE_SUCCESS;
  }

  int32_t code = 0;
  SArray* pDelData = taosArrayInit(4, sizeof(SDelData));
  if (pDelData == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  ASSERT(pReader->pReadSnap != NULL);

  // 1. delete records persisted in the delete file
  SDelFile* pDelFile = pReader->pReadSnap->fs.pDelFile;
  if (pDelFile && taosArrayGetSize(pReader->pDelIdx) > 0) {
    SDelIdx  idx = {.suid = pReader->suid, .uid = pBlockScanInfo->uid};
    SDelIdx* pIdx = taosArraySearch(pReader->pDelIdx, &idx, tCmprDelIdx, TD_EQ);

    if (pIdx != NULL) {
      code = tsdbReadDelData(pReader->pDelFReader, pIdx, pDelData);
      if (code != TSDB_CODE_SUCCESS) {
        goto _err;
      }
    }
  }

  // 2. delete records kept in mem, then in imem; a record that cannot be stored must not be
  //    silently skipped, since the skyline would then be built from incomplete data
  STbData* aTbData[2] = {pMemTbData, piMemTbData};
  for (int32_t i = 0; i < 2; ++i) {
    if (aTbData[i] == NULL) {
      continue;
    }

    for (SDelData* p = aTbData[i]->pHead; p != NULL; p = p->pNext) {
      if (taosArrayPush(pDelData, p) == NULL) {
        code = TSDB_CODE_OUT_OF_MEMORY;
        goto _err;
      }
    }
  }

  // 3. build the skyline
  if (taosArrayGetSize(pDelData) > 0) {
    pBlockScanInfo->delSkyline = taosArrayInit(4, sizeof(TSDBKEY));
    if (pBlockScanInfo->delSkyline == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
    } else {
      code =
          tsdbBuildDeleteSkyline(pDelData, 0, (int32_t)(taosArrayGetSize(pDelData) - 1), pBlockScanInfo->delSkyline);
    }
  }

  taosArrayDestroy(pDelData);

  // every consumer of the skyline starts at its first entry (asc) or its last entry (desc)
  pBlockScanInfo->iter.index =
      ASCENDING_TRAVERSE(pReader->order) ? 0 : taosArrayGetSize(pBlockScanInfo->delSkyline) - 1;
  pBlockScanInfo->iiter.index = pBlockScanInfo->iter.index;
  pBlockScanInfo->fileDelIndex = pBlockScanInfo->iter.index;
  pBlockScanInfo->lastBlockDelIndex = pBlockScanInfo->iter.index;
  return code;

_err:
  taosArrayDestroy(pDelData);
  return code;
}

H
Haojun Liao 已提交
2620
// Return the key of the next row the memory buffers (mem and imem) will yield for this table.
//
// When both buffers have a valid row, the key that comes first in scan order is returned: the
// smaller timestamp for an ascending scan, the larger one for a descending scan. When only one
// buffer has a valid row, its key is returned. When neither has one, the result carries
// ts == TSKEY_INITIAL_VAL.
TSDBKEY getCurrentKeyInBuf(STableBlockScanInfo* pScanInfo, STsdbReader* pReader) {
  TSDBKEY key = {.ts = TSKEY_INITIAL_VAL};
  bool    hasKey = false;
  bool    asc = ASCENDING_TRAVERSE(pReader->order);

  TSDBROW* pRow = getValidMemRow(&pScanInfo->iter, pScanInfo->delSkyline, pReader);
  if (pRow != NULL) {
    key = TSDBROW_KEY(pRow);
    hasKey = true;
  }

  pRow = getValidMemRow(&pScanInfo->iiter, pScanInfo->delSkyline, pReader);
  if (pRow != NULL) {
    TSDBKEY k = TSDBROW_KEY(pRow);

    // Comparing against the initial sentinel would never pick the imem key, so an explicit flag
    // tracks whether mem contributed a key.
    if (!hasKey) {
      key = k;
    } else if (asc ? (k.ts < key.ts) : (k.ts > key.ts)) {
      key = k;
    }
  }

  return key;
}

2638
// Advance the file-set iterator to the next file set that has data for the queried tables.
//
// For every file set the block index is loaded; the search stops at the first file set for which
// doLoadFileBlock() reports at least one data block or last file, or when no file set is left.
// The counters are returned through pBlockNum (both zero when nothing was found).
//
// Afterwards, if the snapshot has a delete file and no delete-file reader is open yet, the reader
// is opened and the delete index is loaded into pReader->pDelIdx.
//
// Returns TSDB_CODE_SUCCESS or the first error encountered.
static int32_t moveToNextFile(STsdbReader* pReader, SBlockNumber* pBlockNum) {
  SReaderStatus* pStatus = &pReader->status;
  pBlockNum->numOfBlocks = 0;
  pBlockNum->numOfLastFiles = 0;

  size_t  numOfTables = taosHashGetSize(pReader->status.pTableMap);
  SArray* pIndexList = taosArrayInit(numOfTables, sizeof(SBlockIdx));
  if (pIndexList == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  while (1) {
    bool hasNext = filesetIteratorNext(&pStatus->fileIter, pReader);
    if (!hasNext) {  // no data files on disk
      break;
    }

    taosArrayClear(pIndexList);
    int32_t code = doLoadBlockIndex(pReader, pReader->pFileReader, pIndexList);
    if (code != TSDB_CODE_SUCCESS) {
      taosArrayDestroy(pIndexList);
      return code;
    }

    if (taosArrayGetSize(pIndexList) > 0 || pReader->pFileReader->pSet->nSttF > 0) {
      code = doLoadFileBlock(pReader, pIndexList, pBlockNum);
      if (code != TSDB_CODE_SUCCESS) {
        taosArrayDestroy(pIndexList);
        return code;
      }

      if (pBlockNum->numOfBlocks + pBlockNum->numOfLastFiles > 0) {
        break;
      }
    }

    // no blocks in current file, try next files
  }

  taosArrayDestroy(pIndexList);

  if (pReader->pReadSnap != NULL) {
    SDelFile* pDelFile = pReader->pReadSnap->fs.pDelFile;
    if (pReader->pDelFReader == NULL && pDelFile != NULL) {
      int32_t code = tsdbDelFReaderOpen(&pReader->pDelFReader, pDelFile, pReader->pTsdb);
      if (code != TSDB_CODE_SUCCESS) {
        return code;
      }

      pReader->pDelIdx = taosArrayInit(4, sizeof(SDelIdx));
      if (pReader->pDelIdx == NULL) {
        code = TSDB_CODE_OUT_OF_MEMORY;
        return code;
      }

      code = tsdbReadDelIdx(pReader->pDelFReader, pReader->pDelIdx);
      if (code != TSDB_CODE_SUCCESS) {
        taosArrayDestroy(pReader->pDelIdx);
        // do not leave a dangling pointer in the reader; it is read again by later code
        pReader->pDelIdx = NULL;
        return code;
      }
    }
  }

  return TSDB_CODE_SUCCESS;
}

2702
// Three-way comparator for two uint64_t table uids, usable with a qsort-style sort routine.
// Returns -1, 0 or 1 when *p1 is less than, equal to or greater than *p2.
static int32_t uidComparFunc(const void* p1, const void* p2) {
  // keep the const qualifier of the arguments when reading through them
  const uint64_t lhs = *(const uint64_t*)p1;
  const uint64_t rhs = *(const uint64_t*)p2;

  if (lhs == rhs) {
    return 0;
  }

  return (lhs < rhs) ? -1 : 1;
}
2711

2712
// Copy the uid of every table in pStatus->pTableMap into pOrderCheckInfo->tableUidList and sort
// the list in ascending uid order. The caller provides a list large enough for all tables.
static void extractOrderedTableUidList(SUidOrderCheckInfo* pOrderCheckInfo, SReaderStatus* pStatus) {
  int32_t numOfUids = 0;
  int32_t total = taosHashGetSize(pStatus->pTableMap);

  for (void* pIter = taosHashIterate(pStatus->pTableMap, NULL); pIter != NULL;
       pIter = taosHashIterate(pStatus->pTableMap, pIter)) {
    STableBlockScanInfo* pScanInfo = *(STableBlockScanInfo**)pIter;
    pOrderCheckInfo->tableUidList[numOfUids] = pScanInfo->uid;
    numOfUids += 1;
  }

  taosSort(pOrderCheckInfo->tableUidList, total, sizeof(uint64_t), uidComparFunc);
}

2726
// Prepare the uid-ordered table traversal used when reading last blocks.
//
// First call (no uid list yet): allocate and fill the sorted uid list and position
// pStatus->pTableIter on its first table.
// Later calls: when pTableIter is NULL, restart from the first uid; if that uid is no longer in
// the table map, the list is rebuilt from the current map before restarting.
// With an empty table map nothing is done.
//
// Returns TSDB_CODE_SUCCESS or TSDB_CODE_OUT_OF_MEMORY.
static int32_t initOrderCheckInfo(SUidOrderCheckInfo* pOrderCheckInfo, SReaderStatus* pStatus) {
  int32_t total = taosHashGetSize(pStatus->pTableMap);
  if (total == 0) {
    return TSDB_CODE_SUCCESS;
  }

  // first use: build the ordered uid list
  if (pOrderCheckInfo->tableUidList == NULL) {
    pOrderCheckInfo->currentIndex = 0;
    pOrderCheckInfo->tableUidList = taosMemoryMalloc(total * sizeof(uint64_t));
    if (pOrderCheckInfo->tableUidList == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }

    extractOrderedTableUidList(pOrderCheckInfo, pStatus);

    uint64_t firstUid = pOrderCheckInfo->tableUidList[0];
    pStatus->pTableIter = taosHashGet(pStatus->pTableMap, &firstUid, sizeof(firstUid));
    return TSDB_CODE_SUCCESS;
  }

  // a traversal is in progress, keep the current position
  if (pStatus->pTableIter != NULL) {
    return TSDB_CODE_SUCCESS;
  }

  // it is the last block of a new file, restart from the first table
  pOrderCheckInfo->currentIndex = 0;
  uint64_t uid = pOrderCheckInfo->tableUidList[pOrderCheckInfo->currentIndex];
  pStatus->pTableIter = taosHashGet(pStatus->pTableMap, &uid, sizeof(uid));
  if (pStatus->pTableIter != NULL) {
    return TSDB_CODE_SUCCESS;
  }

  // the tableMap has already updated, rebuild the uid list
  void* pNewList = taosMemoryRealloc(pOrderCheckInfo->tableUidList, total * sizeof(uint64_t));
  if (pNewList == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  pOrderCheckInfo->tableUidList = pNewList;
  extractOrderedTableUidList(pOrderCheckInfo, pStatus);

  uid = pOrderCheckInfo->tableUidList[0];
  pStatus->pTableIter = taosHashGet(pStatus->pTableMap, &uid, sizeof(uid));
  return TSDB_CODE_SUCCESS;
}

2767
// Step to the next uid of the ordered uid list and point pStatus->pTableIter at its entry in the
// table map. Returns false, with pTableIter reset to NULL, once the index reaches the number of
// tables in the map.
static bool moveToNextTable(SUidOrderCheckInfo* pOrderedCheckInfo, SReaderStatus* pStatus) {
  pOrderedCheckInfo->currentIndex += 1;

  if (pOrderedCheckInfo->currentIndex < taosHashGetSize(pStatus->pTableMap)) {
    uint64_t nextUid = pOrderedCheckInfo->tableUidList[pOrderedCheckInfo->currentIndex];
    pStatus->pTableIter = taosHashGet(pStatus->pTableMap, &nextUid, sizeof(nextUid));
    ASSERT(pStatus->pTableIter != NULL);
    return true;
  }

  pStatus->pTableIter = NULL;
  return false;
}

2780
// Walk the tables in uid order and build a result block from the last blocks of the first table
// that yields rows.
//
// Returns as soon as pReader->pResBlock holds rows, when the table list is exhausted (pTableIter
// is then NULL), or on the first error of initOrderCheckInfo()/doBuildDataBlock().
static int32_t doLoadLastBlockSequentially(STsdbReader* pReader) {
  SReaderStatus*      pStatus = &pReader->status;
  SLastBlockReader*   pLastBlockReader = pStatus->fileIter.pLastBlockReader;
  SUidOrderCheckInfo* pOrderedCheckInfo = &pStatus->uidCheckInfo;

  int32_t code = initOrderCheckInfo(pOrderedCheckInfo, pStatus);
  if (code != TSDB_CODE_SUCCESS || (taosHashGetSize(pStatus->pTableMap) == 0)) {
    return code;
  }

  for (;;) {
    // load the last data block of current table
    STableBlockScanInfo* pScanInfo = *(STableBlockScanInfo**)pStatus->pTableIter;

    if (initLastBlockReader(pLastBlockReader, pScanInfo, pReader)) {
      code = doBuildDataBlock(pReader);
      if (code != TSDB_CODE_SUCCESS) {
        return code;
      }

      if (pReader->pResBlock->info.rows > 0) {
        return TSDB_CODE_SUCCESS;
      }
    }

    // current table has nothing (more) to offer, let's try next table
    if (!moveToNextTable(pOrderedCheckInfo, pStatus)) {
      return TSDB_CODE_SUCCESS;
    }
  }
}

2819
// Build the next result block for the table of the current file block, or, when no file block is
// current, for the table pReader->status.pTableIter points at.
//
// Depending on how the file block relates to the buffered data and the last block, the result is
//   - a composed block (buildComposedDataBlock), after loading the file block when required,
//   - a block built from the memory buffers only (buildDataBlockFromBuf), or
//   - the description of the whole file block (rows/uid/window), leaving the composed flag unset.
//
// Returns TSDB_CODE_SUCCESS, TSDB_CODE_INVALID_PARA when the table scan info cannot be found, or
// the error code of the invoked builder.
static int32_t doBuildDataBlock(STsdbReader* pReader) {
  int32_t   code = TSDB_CODE_SUCCESS;
  SDataBlk* pBlock = NULL;

  SReaderStatus*       pStatus = &pReader->status;
  SDataBlockIter*      pBlockIter = &pStatus->blockIter;
  STableBlockScanInfo* pScanInfo = NULL;
  SFileDataBlockInfo*  pBlockInfo = getCurrentBlockInfo(pBlockIter);
  SLastBlockReader*    pLastBlockReader = pReader->status.fileIter.pLastBlockReader;

  if (pBlockInfo != NULL) {
    // check the lookup result before dereferencing it, so a missing uid is reported below
    void* p = taosHashGet(pReader->status.pTableMap, &pBlockInfo->uid, sizeof(pBlockInfo->uid));
    if (p != NULL) {
      pScanInfo = *(STableBlockScanInfo**)p;
    }
  } else {
    pScanInfo = *pReader->status.pTableIter;
  }

  if (pScanInfo == NULL) {
    tsdbError("failed to get table scan-info, %s", pReader->idStr);
    code = TSDB_CODE_INVALID_PARA;
    return code;
  }

  if (pBlockInfo != NULL) {
    pBlock = getCurrentBlock(pBlockIter);
  }

  initLastBlockReader(pLastBlockReader, pScanInfo, pReader);
  TSDBKEY keyInBuf = getCurrentKeyInBuf(pScanInfo, pReader);

  if (pBlockInfo == NULL) {  // build data block from last data file
    ASSERT(pBlockIter->numOfBlocks == 0);
    code = buildComposedDataBlock(pReader);
  } else if (fileBlockShouldLoad(pReader, pBlockInfo, pBlock, pScanInfo, keyInBuf, pLastBlockReader)) {
    code = doLoadFileBlockData(pReader, pBlockIter, &pStatus->fileBlockData, pScanInfo->uid);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    // build composed data block
    code = buildComposedDataBlock(pReader);
  } else if (bufferDataInFileBlockGap(pReader->order, keyInBuf, pBlock)) {
    // data in memory that are earlier than current file block
    // rows in buffer should be less than the file block in asc, greater than file block in desc
    int64_t endKey = (ASCENDING_TRAVERSE(pReader->order)) ? pBlock->minKey.ts : pBlock->maxKey.ts;
    code = buildDataBlockFromBuf(pReader, pScanInfo, endKey);
  } else {
    if (hasDataInLastBlock(pLastBlockReader) && !ASCENDING_TRAVERSE(pReader->order)) {
      // only return the rows in last block
      int64_t tsLast = getCurrentKeyInLastBlock(pLastBlockReader);
      ASSERT(tsLast >= pBlock->maxKey.ts);
      tBlockDataReset(&pReader->status.fileBlockData);

      tsdbDebug("load data in last block firstly, due to desc scan data, %s", pReader->idStr);
      code = buildComposedDataBlock(pReader);
    } else {  // whole block is required, return it directly
      SDataBlockInfo* pInfo = &pReader->pResBlock->info;
      pInfo->rows = pBlock->nRow;
      pInfo->id.uid = pScanInfo->uid;
      pInfo->window = (STimeWindow){.skey = pBlock->minKey.ts, .ekey = pBlock->maxKey.ts};
      setComposedBlockFlag(pReader, false);
      setBlockAllDumped(&pStatus->fBlockDumpInfo, pBlock->maxKey.ts, pReader->order);

      // update the last key for the corresponding table
      pScanInfo->lastKey = ASCENDING_TRAVERSE(pReader->order) ? pInfo->window.ekey : pInfo->window.skey;
    }
  }

  return code;
}

H
Haojun Liao 已提交
2890
// Build a result block from the memory buffers only, visiting the tables of the table map one
// after another starting at pStatus->pTableIter (or at the first table when it is NULL).
//
// Returns TSDB_CODE_SUCCESS when pReader->pResBlock holds rows or when every table has been
// visited (pTableIter is then NULL); otherwise the error code of initMemDataIterator() or
// buildDataBlockFromBuf().
static int32_t buildBlockFromBufferSequentially(STsdbReader* pReader) {
  SReaderStatus* pStatus = &pReader->status;

  while (1) {
    if (pStatus->pTableIter == NULL) {
      pStatus->pTableIter = taosHashIterate(pStatus->pTableMap, NULL);
      if (pStatus->pTableIter == NULL) {
        return TSDB_CODE_SUCCESS;
      }
    }

    STableBlockScanInfo** pBlockScanInfo = pStatus->pTableIter;

    // Without the iterators the buffered rows of this table would be skipped silently, so a
    // failure to create them is reported to the caller.
    int32_t code = initMemDataIterator(*pBlockScanInfo, pReader);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    int64_t endKey = (ASCENDING_TRAVERSE(pReader->order)) ? INT64_MAX : INT64_MIN;
    code = buildDataBlockFromBuf(pReader, *pBlockScanInfo, endKey);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    if (pReader->pResBlock->info.rows > 0) {
      return TSDB_CODE_SUCCESS;
    }

    // current table is exhausted, let's try the next table
    pStatus->pTableIter = taosHashIterate(pStatus->pTableMap, pStatus->pTableIter);
    if (pStatus->pTableIter == NULL) {
      return TSDB_CODE_SUCCESS;
    }
  }
}

2922
// set the correct start position in case of the first/last file block, according to the query time window
2923
void initBlockDumpInfo(STsdbReader* pReader, SDataBlockIter* pBlockIter) {
H
Hongze Cheng 已提交
2924
  SDataBlk* pBlock = getCurrentBlock(pBlockIter);
2925

2926 2927 2928
  SReaderStatus* pStatus = &pReader->status;

  SFileBlockDumpInfo* pDumpInfo = &pStatus->fBlockDumpInfo;
2929 2930 2931

  pDumpInfo->totalRows = pBlock->nRow;
  pDumpInfo->allDumped = false;
2932
  pDumpInfo->rowIndex = ASCENDING_TRAVERSE(pReader->order) ? 0 : pBlock->nRow - 1;
2933 2934
}

2935
// Move to the next file set and prepare the block iterator and dump state for its first block.
//
// When no file set with data is left, pReader->status.loadFromFile is cleared and the function
// returns. When the file set has only last files, the file block data and the block iterator are
// reset instead of initialized.
//
// Returns the error code of moveToNextFile()/initBlockIterator(), or TSDB_CODE_SUCCESS.
static int32_t initForFirstBlockInFile(STsdbReader* pReader, SDataBlockIter* pBlockIter) {
  SBlockNumber blockNum = {0};

  int32_t code = moveToNextFile(pReader, &blockNum);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  // all data files are consumed, try data in buffer
  if (blockNum.numOfBlocks + blockNum.numOfLastFiles == 0) {
    pReader->status.loadFromFile = false;
    return code;
  }

  if (blockNum.numOfBlocks <= 0) {
    // no block data, only last block exists
    tBlockDataReset(&pReader->status.fileBlockData);
    resetDataBlockIterator(pBlockIter, pReader->order);
  } else {
    // initialize the block iterator for a new fileset
    code = initBlockIterator(pReader, pBlockIter, blockNum.numOfBlocks);
  }

  // set the correct start position according to the query time window
  initBlockDumpInfo(pReader, pBlockIter);
  return code;
}

2962
// A file block counts as partially read when it is not flagged as fully dumped and the row cursor
// has already left its starting position: past row 0 for an ascending scan, before the last row
// for a descending scan.
static bool fileBlockPartiallyRead(SFileBlockDumpInfo* pDumpInfo, bool asc) {
  if (pDumpInfo->allDumped) {
    return false;
  }

  if (asc) {
    return pDumpInfo->rowIndex > 0;
  }

  return pDumpInfo->rowIndex < (pDumpInfo->totalRows - 1);
}

2967
// Produce the next non-empty result block from the data files.
//
// The function alternates between two phases until pReader->pResBlock holds rows, an error occurs, or all files
// are consumed (pReader->status.loadFromFile becomes false):
//   * "last file" phase: rows are loaded through doLoadLastBlockSequentially();
//   * "file block" phase: rows are built from the block the block iterator points at.
// The flag scanLastFiles selects the phase for the next loop round.
static int32_t buildBlockFromFiles(STsdbReader* pReader) {
  int32_t code = TSDB_CODE_SUCCESS;
  bool    asc = ASCENDING_TRAVERSE(pReader->order);

  SReaderStatus*      pStatus = &pReader->status;
  SDataBlockIter*     pBlockIter = &pStatus->blockIter;
  SFileBlockDumpInfo* pDumpInfo = &pStatus->fBlockDumpInfo;

  // without any data block in the iterator, start from the last block files
  bool scanLastFiles = (pBlockIter->numOfBlocks == 0);

  for (;;) {
    if (scanLastFiles) {
      scanLastFiles = false;

      code = doLoadLastBlockSequentially(pReader);
      if (code != TSDB_CODE_SUCCESS) {
        return code;
      }

      if (pReader->pResBlock->info.rows > 0) {
        return TSDB_CODE_SUCCESS;
      }

      // all data blocks are checked in this last block file, now let's try the next file
      if (pStatus->pTableIter == NULL) {
        code = initForFirstBlockInFile(pReader, pBlockIter);

        // error happens or all the data files are completely checked
        if ((code != TSDB_CODE_SUCCESS) || (pStatus->loadFromFile == false)) {
          return code;
        }

        // the new file set has no data blocks, check its last block files instead
        if (pBlockIter->numOfBlocks == 0) {
          scanLastFiles = true;
          continue;
        }
      }

      code = doBuildDataBlock(pReader);
      if (code != TSDB_CODE_SUCCESS) {
        return code;
      }

      if (pReader->pResBlock->info.rows > 0) {
        return TSDB_CODE_SUCCESS;
      }
    }

    if (fileBlockPartiallyRead(pDumpInfo, asc)) {
      // the file data block is partially loaded, continue with it
      code = buildComposedDataBlock(pReader);
    } else {
      // the current block is exhausted, move on
      if (pDumpInfo->allDumped) {
        if (blockIteratorNext(pBlockIter, pReader->idStr)) {
          // there is another block in the block accessed order list
          initBlockDumpInfo(pReader, pBlockIter);
        } else if (pStatus->pCurrentFileset->nSttF > 0) {
          // data blocks in current file are exhausted, switch to the last block files
          tBlockDataReset(&pStatus->fileBlockData);
          resetDataBlockIterator(pBlockIter, pReader->order);
          scanLastFiles = true;
          continue;
        } else {
          code = initForFirstBlockInFile(pReader, pBlockIter);

          // error happens or all the data files are completely checked
          if ((code != TSDB_CODE_SUCCESS) || (pStatus->loadFromFile == false)) {
            return code;
          }

          // this file does not have blocks, let's start check the last block file
          if (pBlockIter->numOfBlocks == 0) {
            scanLastFiles = true;
            continue;
          }
        }
      }

      code = doBuildDataBlock(pReader);
    }

    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    if (pReader->pResBlock->info.rows > 0) {
      return TSDB_CODE_SUCCESS;
    }
  }
}
H
refact  
Hongze Cheng 已提交
3055

3056 3057
// Choose the STsdb instance to read from.
//
// For a non-rsma vnode the plain tsdb is returned and *pLevel is left untouched. For an rsma vnode the retention
// levels are walked from level 0 upwards; the walk stops at the first level whose keep duration still covers
// winSKey (with the tsQueryRsmaTolerance offset applied) or at a level with a non-positive keep, in which case
// the previous level is used. The selected level is stored in *pLevel and logged together with idStr.
static STsdb* getTsdbByRetentions(SVnode* pVnode, TSKEY winSKey, SRetention* retentions, const char* idStr,
                                  int8_t* pLevel) {
  if (!VND_IS_RSMA(pVnode)) {
    return VND_TSDB(pVnode);
  }

  int8_t  precision = pVnode->config.tsdbCfg.precision;
  int64_t now = taosGetTimestamp(precision);

  // scale the tolerance to the time precision of the database
  long unit = 1000000L;
  if (precision == TSDB_TIME_PRECISION_MILLI) {
    unit = 1L;
  } else if (precision == TSDB_TIME_PRECISION_MICRO) {
    unit = 1000L;
  }
  int64_t offset = tsQueryRsmaTolerance * unit;

  int8_t level = 0;
  for (level = 0; level < TSDB_RETENTION_MAX; ++level) {
    SRetention* pRetention = retentions + level;

    if (pRetention->keep <= 0) {
      // this level is not configured, fall back to the previous one if there is any
      if (level > 0) {
        --level;
      }
      break;
    }

    if ((now - pRetention->keep) <= (winSKey + offset)) {
      break;
    }
  }

  const char* str = idStr;
  if (str == NULL) {
    str = "";
  }

  if (level == TSDB_RETENTION_L0) {
    *pLevel = TSDB_RETENTION_L0;
    tsdbDebug("vgId:%d, rsma level %d is selected to query %s", TD_VID(pVnode), TSDB_RETENTION_L0, str);
    return VND_RSMA0(pVnode);
  }

  if (level == TSDB_RETENTION_L1) {
    *pLevel = TSDB_RETENTION_L1;
    tsdbDebug("vgId:%d, rsma level %d is selected to query %s", TD_VID(pVnode), TSDB_RETENTION_L1, str);
    return VND_RSMA1(pVnode);
  }

  *pLevel = TSDB_RETENTION_L2;
  tsdbDebug("vgId:%d, rsma level %d is selected to query %s", TD_VID(pVnode), TSDB_RETENTION_L2, str);
  return VND_RSMA2(pVnode);
}

H
Haojun Liao 已提交
3100
// Build the version range of a query from its condition.
//   minVer: pCond->startVersion, or 0 when the start version is -1 (not specified).
//   maxVer: pCond->endVersion capped at the applied version of the vnode; the applied version itself when the end
//           version is -1 (not specified).
// The level argument is not used by this function.
SVersionRange getQueryVerRange(SVnode* pVnode, SQueryTableDataCond* pCond, int8_t level) {
  int64_t startVer = pCond->startVersion;
  if (pCond->startVersion == -1) {
    startVer = 0;
  }

  // default to the current maximum version of the vnode
  int64_t endVer = pVnode->state.applied;
  if (pCond->endVersion != -1 && !(pCond->endVersion > pVnode->state.applied)) {
    endVer = pCond->endVersion;
  }

  SVersionRange range = {.minVer = startVer, .maxVer = endVer};
  return range;
}

3114
// Check whether the row identified by pKey (ts + version) is covered by a delete range in pDelList.
//
// pDelList  list of TSDBKEY points describing the delete ranges, NULL or empty means nothing is deleted
// index     in/out cursor into pDelList; it is moved forward (ascending) or backward (descending) while searching
// pKey      key of the row to check, must not be NULL
// order     scan order, decides in which direction the cursor moves
// pVerRange version range of the query; only its maxVer is consulted, and only in the ascending search
//           NOTE(review): the descending search does not apply the maxVer filter - confirm this is intended
//
// Returns true when the row is deleted, false otherwise.
bool hasBeenDropped(const SArray* pDelList, int32_t* index, TSDBKEY* pKey, int32_t order, SVersionRange* pVerRange) {
  ASSERT(pKey != NULL);
  if (pDelList == NULL) {
    return false;
  }

  size_t num = taosArrayGetSize(pDelList);
  if (num == 0) {
    // nothing is deleted; also keeps the unsigned "num - 1" below from wrapping around
    return false;
  }

  bool    asc = ASCENDING_TRAVERSE(order);
  int32_t step = asc ? 1 : -1;

  if (asc) {
    if (*index >= num - 1) {
      // the cursor is at the last point of the list
      TSDBKEY* last = taosArrayGetLast(pDelList);
      ASSERT(pKey->ts >= last->ts);

      if (pKey->ts > last->ts) {
        return false;
      } else if (pKey->ts == last->ts) {
        if (num < 2) {
          // a single point has no predecessor, "num - 2" would be an out-of-range index
          return false;
        }

        TSDBKEY* prev = taosArrayGet(pDelList, num - 2);
        return (prev->version >= pKey->version);
      }
    } else {
      TSDBKEY* pCurrent = taosArrayGet(pDelList, *index);
      TSDBKEY* pNext = taosArrayGet(pDelList, (*index) + 1);

      // the row is located before the current point, it can not be deleted
      if (pKey->ts < pCurrent->ts) {
        return false;
      }

      if (pCurrent->ts <= pKey->ts && pNext->ts >= pKey->ts && pCurrent->version >= pKey->version &&
          pVerRange->maxVer >= pCurrent->version) {
        return true;
      }

      // move the cursor forward until the range [pCurrent, pNext] reaches the row
      while (pNext->ts <= pKey->ts && (*index) < num - 1) {
        (*index) += 1;

        if ((*index) < num - 1) {
          pCurrent = taosArrayGet(pDelList, *index);
          pNext = taosArrayGet(pDelList, (*index) + 1);

          // it is not a consecutive deletion range, ignore it
          if (pCurrent->version == 0 && pNext->version > 0) {
            continue;
          }

          if (pCurrent->ts <= pKey->ts && pNext->ts >= pKey->ts && pCurrent->version >= pKey->version &&
              pVerRange->maxVer >= pCurrent->version) {
            return true;
          }
        }
      }

      return false;
    }
  } else {
    if (*index <= 0) {
      // the cursor is at the first point of the list
      TSDBKEY* pFirst = taosArrayGet(pDelList, 0);

      if (pKey->ts < pFirst->ts) {
        return false;
      } else if (pKey->ts == pFirst->ts) {
        return pFirst->version >= pKey->version;
      } else {
        ASSERT(0);
      }
    } else {
      TSDBKEY* pCurrent = taosArrayGet(pDelList, *index);
      TSDBKEY* pPrev = taosArrayGet(pDelList, (*index) - 1);

      // the row is located after the current point, it can not be deleted
      if (pKey->ts > pCurrent->ts) {
        return false;
      }

      if (pPrev->ts <= pKey->ts && pCurrent->ts >= pKey->ts && pPrev->version >= pKey->version) {
        return true;
      }

      // move the cursor backward until the range [pPrev, pCurrent] reaches the row
      while (pPrev->ts >= pKey->ts && (*index) > 1) {
        (*index) += step;

        if ((*index) >= 1) {
          pCurrent = taosArrayGet(pDelList, *index);
          pPrev = taosArrayGet(pDelList, (*index) - 1);

          // it is not a consecutive deletion range, ignore it
          if (pCurrent->version > 0 && pPrev->version == 0) {
            continue;
          }

          if (pPrev->ts <= pKey->ts && pCurrent->ts >= pKey->ts && pPrev->version >= pKey->version) {
            return true;
          }
        }
      }

      return false;
    }
  }

  return false;
}

3216
// Return the first row, starting at the current iterator position, that is visible to the query: its version lies
// inside pReader->verRange and it is not covered by a delete range of pDelList.
//
// NULL is returned, and pIter->hasVal is false afterwards, when the iterator has no value, runs out of rows, or
// reaches a row outside of the query time window.
TSDBROW* getValidMemRow(SIterInfo* pIter, const SArray* pDelList, STsdbReader* pReader) {
  if (!pIter->hasVal) {
    return NULL;
  }

  TSDBROW* pCandidate = tsdbTbDataIterGet(pIter->iter);
  TSDBKEY  rowKey = {.ts = pCandidate->pTSRow->ts, .version = pCandidate->version};

  for (;;) {
    // stop the iteration once the rows leave the query time window
    if (outOfTimeWindow(rowKey.ts, &pReader->window)) {
      pIter->hasVal = false;
      return NULL;
    }

    // it is a valid data version
    bool inVerRange = (rowKey.version <= pReader->verRange.maxVer && rowKey.version >= pReader->verRange.minVer);
    if (inVerRange && !hasBeenDropped(pDelList, &pIter->index, &rowKey, pReader->order, &pReader->verRange)) {
      return pCandidate;
    }

    // the candidate is filtered out, try the next row
    pIter->hasVal = tsdbTbDataIterNext(pIter->iter);
    if (!pIter->hasVal) {
      return NULL;
    }

    pCandidate = tsdbTbDataIterGet(pIter->iter);
    rowKey = TSDBROW_KEY(pCandidate);
  }
}
H
Hongze Cheng 已提交
3254

3255 3256
// Advance pIter and feed every following valid row that carries the timestamp ts into pMerger.
//
// The walk ends at the first of: iterator exhausted, no further valid row, or a row with another timestamp.
// Returns TSDB_CODE_SUCCESS, or terrno when the schema of a row can not be obtained.
int32_t doMergeRowsInBuf(SIterInfo* pIter, uint64_t uid, int64_t ts, SArray* pDelList, SRowMerger* pMerger,
                         STsdbReader* pReader) {
  while ((pIter->hasVal = tsdbTbDataIterNext(pIter->iter))) {
    // data exists but not valid
    TSDBROW* pNext = getValidMemRow(pIter, pDelList, pReader);
    if (pNext == NULL) {
      return TSDB_CODE_SUCCESS;
    }

    // ts is not identical, quit
    TSDBKEY nextKey = TSDBROW_KEY(pNext);
    if (nextKey.ts != ts) {
      return TSDB_CODE_SUCCESS;
    }

    STSchema* pRowSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pNext), pReader, uid);
    if (pRowSchema == NULL) {
      return terrno;
    }

    tRowMergerAdd(pMerger, pNext, pRowSchema);
  }

  return TSDB_CODE_SUCCESS;
}

3286
static int32_t doMergeRowsInFileBlockImpl(SBlockData* pBlockData, int32_t rowIndex, int64_t key, SRowMerger* pMerger,
3287
                                          SVersionRange* pVerRange, int32_t step) {
3288
  while (rowIndex < pBlockData->nRow && rowIndex >= 0 && pBlockData->aTSKEY[rowIndex] == key) {
3289
    if (pBlockData->aVersion[rowIndex] > pVerRange->maxVer || pBlockData->aVersion[rowIndex] < pVerRange->minVer) {
3290
      rowIndex += step;
3291 3292 3293 3294 3295 3296 3297 3298 3299 3300 3301 3302 3303 3304 3305 3306
      continue;
    }

    TSDBROW fRow = tsdbRowFromBlockData(pBlockData, rowIndex);
    tRowMerge(pMerger, &fRow);
    rowIndex += step;
  }

  return rowIndex;
}

// Result of checking a neighbor file block while merging rows with an identical timestamp.
typedef enum {
  CHECK_FILEBLOCK_CONT = 0x1,  // the checked block is consumed completely, keep checking the next neighbor
  CHECK_FILEBLOCK_QUIT = 0x2,  // stop checking neighbor blocks
} CHECK_FILEBLOCK_STATE;

H
Hongze Cheng 已提交
3307
static int32_t checkForNeighborFileBlock(STsdbReader* pReader, STableBlockScanInfo* pScanInfo, SDataBlk* pBlock,
3308 3309
                                         SFileDataBlockInfo* pFBlock, SRowMerger* pMerger, int64_t key,
                                         CHECK_FILEBLOCK_STATE* state) {
3310
  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
3311
  SBlockData*         pBlockData = &pReader->status.fileBlockData;
3312

3313
  *state = CHECK_FILEBLOCK_QUIT;
3314
  int32_t step = ASCENDING_TRAVERSE(pReader->order) ? 1 : -1;
3315

3316 3317 3318 3319
  int32_t     nextIndex = -1;
  SBlockIndex bIndex = {0};

  bool hasNeighbor = getNeighborBlockOfSameTable(pFBlock, pScanInfo, &nextIndex, pReader->order, &bIndex);
3320
  if (!hasNeighbor) {  // do nothing
3321 3322 3323
    return 0;
  }

3324
  bool overlap = overlapWithNeighborBlock(pBlock, &bIndex, pReader->order);
3325
  if (overlap) {  // load next block
3326
    SReaderStatus*  pStatus = &pReader->status;
3327 3328
    SDataBlockIter* pBlockIter = &pStatus->blockIter;

3329
    // 1. find the next neighbor block in the scan block list
3330
    SFileDataBlockInfo fb = {.uid = pFBlock->uid, .tbBlockIdx = nextIndex};
3331
    int32_t            neighborIndex = findFileBlockInfoIndex(pBlockIter, &fb);
3332

3333
    // 2. remove it from the scan block list
3334
    setFileBlockActiveInBlockIter(pBlockIter, neighborIndex, step);
3335

3336
    // 3. load the neighbor block, and set it to be the currently accessed file data block
3337
    int32_t code = doLoadFileBlockData(pReader, pBlockIter, &pStatus->fileBlockData, pFBlock->uid);
3338 3339 3340 3341
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

3342
    // 4. check the data values
3343 3344 3345 3346
    initBlockDumpInfo(pReader, pBlockIter);

    pDumpInfo->rowIndex =
        doMergeRowsInFileBlockImpl(pBlockData, pDumpInfo->rowIndex, key, pMerger, &pReader->verRange, step);
H
Haojun Liao 已提交
3347
    if (pDumpInfo->rowIndex >= pDumpInfo->totalRows) {
3348 3349 3350 3351 3352 3353 3354
      *state = CHECK_FILEBLOCK_CONT;
    }
  }

  return TSDB_CODE_SUCCESS;
}

3355 3356
// Merge all rows that share the timestamp of the current row of the file block into pMerger.
//
// The row cursor in pReader->status.fBlockDumpInfo is moved past the current row first, then over the following
// rows with the same timestamp. When the block is exhausted by that, the neighbor blocks of the same table are
// checked as long as each one is consumed completely.
//
// Returns TSDB_CODE_SUCCESS, or the error reported while loading a neighbor block.
int32_t doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pScanInfo, STsdbReader* pReader,
                                SRowMerger* pMerger) {
  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;

  bool    asc = ASCENDING_TRAVERSE(pReader->order);
  int64_t key = pBlockData->aTSKEY[pDumpInfo->rowIndex];
  int32_t step = asc ? 1 : -1;

  pDumpInfo->rowIndex += step;
  if ((pDumpInfo->rowIndex <= pBlockData->nRow - 1 && asc) || (pDumpInfo->rowIndex >= 0 && !asc)) {
    pDumpInfo->rowIndex =
        doMergeRowsInFileBlockImpl(pBlockData, pDumpInfo->rowIndex, key, pMerger, &pReader->verRange, step);
  }

  // all rows are consumed, let's try next file block
  if ((pDumpInfo->rowIndex >= pBlockData->nRow && asc) || (pDumpInfo->rowIndex < 0 && !asc)) {
    while (1) {
      CHECK_FILEBLOCK_STATE st = CHECK_FILEBLOCK_QUIT;

      SFileDataBlockInfo* pFileBlockInfo = getCurrentBlockInfo(&pReader->status.blockIter);
      SDataBlk*           pCurrentBlock = getCurrentBlock(&pReader->status.blockIter);

      int32_t code = checkForNeighborFileBlock(pReader, pScanInfo, pCurrentBlock, pFileBlockInfo, pMerger, key, &st);
      if (code != TSDB_CODE_SUCCESS) {
        // a neighbor block could not be loaded, do not report a partially merged row as success
        return code;
      }

      if (st == CHECK_FILEBLOCK_QUIT) {
        break;
      }
    }
  }

  return TSDB_CODE_SUCCESS;
}

H
Hongze Cheng 已提交
3386
int32_t doMergeRowsInLastBlock(SLastBlockReader* pLastBlockReader, STableBlockScanInfo* pScanInfo, int64_t ts,
3387 3388
                               SRowMerger* pMerger, SVersionRange* pVerRange) {
  while (nextRowFromLastBlocks(pLastBlockReader, pScanInfo, pVerRange)) {
3389 3390
    int64_t next1 = getCurrentKeyInLastBlock(pLastBlockReader);
    if (next1 == ts) {
3391
      TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
3392 3393 3394 3395 3396 3397 3398 3399 3400
      tRowMerge(pMerger, &fRow1);
    } else {
      break;
    }
  }

  return TSDB_CODE_SUCCESS;
}

3401 3402
// Build the output row for the timestamp of pRow from one in-memory table (mem or imem).
//
// pIter is advanced past pRow. When the next valid row has another timestamp (or there is none), the row of pRow is
// handed out directly: *pTSRow points at it and *freeTSRow is false. Otherwise all rows with the same timestamp are
// merged into a newly built row: *pTSRow points at the merged row and *freeTSRow is true, so the caller has to
// release it.
//
// Returns TSDB_CODE_SUCCESS, terrno when a schema can not be obtained, or the error code of the row merger.
// *freeTSRow is only written on success.
int32_t doMergeMemTableMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter, SArray* pDelList, STSRow** pTSRow,
                                 STsdbReader* pReader, bool* freeTSRow) {
  // keep a copy, the iterator is moved forward below
  TSDBROW current = *pRow;

  // if the timestamp of the next valid row has a different ts, return current row directly
  pIter->hasVal = tsdbTbDataIterNext(pIter->iter);

  TSDBROW* pNextRow = NULL;
  if (pIter->hasVal) {  // has next point in mem/imem
    pNextRow = getValidMemRow(pIter, pDelList, pReader);
  }

  if (pNextRow == NULL || current.pTSRow->ts != pNextRow->pTSRow->ts) {
    *pTSRow = current.pTSRow;
    *freeTSRow = false;
    return TSDB_CODE_SUCCESS;
  }

  SRowMerger merge = {0};

  // get the correct schema for data in memory
  terrno = 0;
  STSchema* pTSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(&current), pReader, uid);
  if (pTSchema == NULL) {
    return terrno;
  }

  if (pReader->pSchema == NULL) {
    pReader->pSchema = pTSchema;
  }

  int32_t code = tRowMergerInit2(&merge, pReader->pSchema, &current, pTSchema);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  // from here on the merger owns resources, every exit has to pass through _end to release them
  STSchema* pTSchema1 = doGetSchemaForTSRow(TSDBROW_SVERSION(pNextRow), pReader, uid);
  if (pTSchema1 == NULL) {
    code = terrno;
    goto _end;
  }

  tRowMergerAdd(&merge, pNextRow, pTSchema1);

  code = doMergeRowsInBuf(pIter, uid, current.pTSRow->ts, pDelList, &merge, pReader);
  if (code != TSDB_CODE_SUCCESS) {
    goto _end;
  }

  code = tRowMergerGetRow(&merge, pTSRow);
  if (code != TSDB_CODE_SUCCESS) {
    goto _end;
  }

  *freeTSRow = true;

_end:
  tRowMergerClear(&merge);
  return code;
}

3469
int32_t doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader,
H
Hongze Cheng 已提交
3470
                           STSRow** pTSRow) {
H
Haojun Liao 已提交
3471 3472
  SRowMerger merge = {0};

3473 3474 3475
  TSDBKEY k = TSDBROW_KEY(pRow);
  TSDBKEY ik = TSDBROW_KEY(piRow);

3476
  if (ASCENDING_TRAVERSE(pReader->order)) {  // ascending order imem --> mem
H
Haojun Liao 已提交
3477
    STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid);
3478

H
Haojun Liao 已提交
3479 3480 3481 3482 3483 3484 3485 3486 3487 3488
    int32_t code = tRowMergerInit(&merge, piRow, pSchema);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    code = doMergeRowsInBuf(&pBlockScanInfo->iiter, pBlockScanInfo->uid, ik.ts, pBlockScanInfo->delSkyline, &merge,
                            pReader);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }
3489

3490
    tRowMerge(&merge, pRow);
H
Haojun Liao 已提交
3491 3492 3493 3494 3495 3496
    code =
        doMergeRowsInBuf(&pBlockScanInfo->iter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge, pReader);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

3497
  } else {
H
Haojun Liao 已提交
3498
    STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid);
3499

H
Haojun Liao 已提交
3500
    int32_t code = tRowMergerInit(&merge, pRow, pSchema);
3501
    if (code != TSDB_CODE_SUCCESS || merge.pTSchema == NULL) {
H
Haojun Liao 已提交
3502 3503 3504 3505 3506 3507 3508 3509
      return code;
    }

    code =
        doMergeRowsInBuf(&pBlockScanInfo->iter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge, pReader);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }
3510 3511

    tRowMerge(&merge, piRow);
H
Haojun Liao 已提交
3512 3513 3514 3515 3516
    code = doMergeRowsInBuf(&pBlockScanInfo->iiter, pBlockScanInfo->uid, ik.ts, pBlockScanInfo->delSkyline, &merge,
                            pReader);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }
3517
  }
3518

3519 3520
  int32_t code = tRowMergerGetRow(&merge, pTSRow);
  return code;
3521 3522
}

3523 3524
// Fetch the next output row from the in-memory buffers (mem and imem) of one table.
//
// Rows at or beyond endKey in scan direction are not returned. When both buffers have a candidate, the one that
// comes first in scan order is used; candidates with an identical timestamp are merged. *pTSRow is left untouched
// when there is no row to return. *freeTSRow tells the caller whether *pTSRow has to be released.
int32_t tsdbGetNextRowInMem(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader, STSRow** pTSRow, int64_t endKey,
                            bool* freeTSRow) {
  SIterInfo* pMemIter = &pBlockScanInfo->iter;
  SIterInfo* pImemIter = &pBlockScanInfo->iiter;

  TSDBROW* pRow = getValidMemRow(pMemIter, pBlockScanInfo->delSkyline, pReader);
  TSDBROW* piRow = getValidMemRow(pImemIter, pBlockScanInfo->delSkyline, pReader);

  SArray*  pDelList = pBlockScanInfo->delSkyline;
  uint64_t uid = pBlockScanInfo->uid;
  bool     asc = ASCENDING_TRAVERSE(pReader->order);

  // drop the candidates that reach the end key
  if (pMemIter->hasVal) {
    TSDBKEY memKey = TSDBROW_KEY(pRow);
    if (asc ? (memKey.ts >= endKey) : (memKey.ts <= endKey)) {
      pRow = NULL;
    }
  }

  if (pImemIter->hasVal) {
    TSDBKEY imemKey = TSDBROW_KEY(piRow);
    if (asc ? (imemKey.ts >= endKey) : (imemKey.ts <= endKey)) {
      piRow = NULL;
    }
  }

  bool useMem = pMemIter->hasVal && (pRow != NULL);
  bool useImem = pImemIter->hasVal && (piRow != NULL);

  if (useMem && useImem) {
    TSDBKEY k = TSDBROW_KEY(pRow);
    TSDBKEY ik = TSDBROW_KEY(piRow);

    if (ik.ts == k.ts) {
      // identical timestamp in both buffers, the merged row is always a new one
      *freeTSRow = true;
      return doMergeMemIMemRows(pRow, piRow, pBlockScanInfo, pReader, pTSRow);
    }

    bool imemFirst = asc ? (ik.ts < k.ts) : (ik.ts > k.ts);
    if (imemFirst) {
      return doMergeMemTableMultiRows(piRow, uid, pImemIter, pDelList, pTSRow, pReader, freeTSRow);
    }

    return doMergeMemTableMultiRows(pRow, uid, pMemIter, pDelList, pTSRow, pReader, freeTSRow);
  }

  if (useMem) {
    return doMergeMemTableMultiRows(pRow, pBlockScanInfo->uid, pMemIter, pDelList, pTSRow, pReader, freeTSRow);
  }

  if (useImem) {
    return doMergeMemTableMultiRows(piRow, uid, pImemIter, pDelList, pTSRow, pReader, freeTSRow);
  }

  return TSDB_CODE_SUCCESS;
}

H
Hongze Cheng 已提交
3580 3581
// Append the row pTSRow to pBlock as a new output row.
//
// The requested columns (pReader->suppInfo) and the columns of the row schema are walked in parallel by column id;
// a requested column that the schema does not contain is filled with NULL. On success the row count of pBlock is
// increased by one and pScanInfo->lastKey is set to the timestamp of the row.
//
// Returns TSDB_CODE_SUCCESS, or terrno when the schema of the row can not be obtained; nothing is appended then.
int32_t doAppendRowFromTSRow(SSDataBlock* pBlock, STsdbReader* pReader, STSRow* pTSRow,
                             STableBlockScanInfo* pScanInfo) {
  int32_t outputRowIndex = pBlock->info.rows;

  SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo;
  STSchema*           pSchema = doGetSchemaForTSRow(pTSRow->sver, pReader, pScanInfo->uid);
  if (pSchema == NULL) {
    // the schema lookup failed, the row can not be decoded
    return terrno;
  }

  SColVal colVal = {0};
  int32_t i = 0, j = 0;

  // the primary timestamp column is copied directly from the row
  if (pSupInfo->colId[i] == PRIMARYKEY_TIMESTAMP_COL_ID) {
    SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, pSupInfo->slotId[i]);
    ((int64_t*)pColData->pData)[outputRowIndex] = pTSRow->ts;
    i += 1;
  }

  while (i < pSupInfo->numOfCols && j < pSchema->numOfCols) {
    col_id_t colId = pSupInfo->colId[i];

    if (colId == pSchema->columns[j].colId) {
      SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, pSupInfo->slotId[i]);

      tTSRowGetVal(pTSRow, pSchema, j, &colVal);
      doCopyColVal(pColInfoData, outputRowIndex, i, &colVal, pSupInfo);
      i += 1;
      j += 1;
    } else if (colId < pSchema->columns[j].colId) {
      // the requested column is not part of the schema
      SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, pSupInfo->slotId[i]);

      colDataAppendNULL(pColInfoData, outputRowIndex);
      i += 1;
    } else {
      // the schema column is not requested, skip it
      j += 1;
    }
  }

  // set null value since current column does not exist in the "pSchema"
  while (i < pSupInfo->numOfCols) {
    SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, pSupInfo->slotId[i]);
    colDataAppendNULL(pColInfoData, outputRowIndex);
    i += 1;
  }

  pBlock->info.rows += 1;
  pScanInfo->lastKey = pTSRow->ts;
  return TSDB_CODE_SUCCESS;
}

H
Hongze Cheng 已提交
3629 3630
// Append row rowIndex of the file block pBlockData to pResBlock as a new output row.
//
// The requested columns (pReader->suppInfo) and the columns of the file block are walked in parallel by column id.
// Requested columns left over after the file block columns are exhausted are filled with NULL. The row count of
// pResBlock is increased by one. Always returns TSDB_CODE_SUCCESS.
int32_t doAppendRowFromFileBlock(SSDataBlock* pResBlock, STsdbReader* pReader, SBlockData* pBlockData,
                                 int32_t rowIndex) {
  SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo;

  int32_t dstRow = pResBlock->info.rows;
  int32_t outIdx = 0;
  int32_t inIdx = 0;

  // the primary timestamp column is copied directly from the key array
  if (pSupInfo->colId[outIdx] == PRIMARYKEY_TIMESTAMP_COL_ID) {
    SColumnInfoData* pTsCol = taosArrayGet(pResBlock->pDataBlock, pSupInfo->slotId[outIdx]);
    ((int64_t*)pTsCol->pData)[dstRow] = pBlockData->aTSKEY[rowIndex];
    outIdx += 1;
  }

  SColVal cv = {0};
  int32_t numOfInputCols = pBlockData->nColData;
  int32_t numOfOutputCols = pSupInfo->numOfCols;

  while (outIdx < numOfOutputCols && inIdx < numOfInputCols) {
    SColData* pSrc = tBlockDataGetColDataByIdx(pBlockData, inIdx);

    if (pSrc->cid >= pSupInfo->colId[outIdx]) {
      SColumnInfoData* pDst = TARRAY_GET_ELEM(pResBlock->pDataBlock, pSupInfo->slotId[outIdx]);

      if (pSrc->cid == pSupInfo->colId[outIdx]) {
        tColDataGetValue(pSrc, rowIndex, &cv);
        doCopyColVal(pDst, dstRow, outIdx, &cv, pSupInfo);
        inIdx += 1;
      } else if (pSrc->cid > pDst->info.colId) {
        // the specified column does not exist in file block, fill with null data
        colDataAppendNULL(pDst, dstRow);
      }

      outIdx += 1;
    } else {
      // the file block column is not requested, skip it
      inIdx += 1;
    }
  }

  // the file block has no more columns, the remaining output columns are NULL
  for (; outIdx < numOfOutputCols; outIdx += 1) {
    SColumnInfoData* pDst = taosArrayGet(pResBlock->pDataBlock, pSupInfo->slotId[outIdx]);
    colDataAppendNULL(pDst, dstRow);
  }

  pResBlock->info.rows += 1;
  return TSDB_CODE_SUCCESS;
}

3675 3676
// Fill pReader->pResBlock with rows from the in-memory buffers of one table.
//
// Rows are appended until the buffers deliver no further row before endKey, both buffer iterators are exhausted,
// or the result block holds capacity rows.
//
// Returns TSDB_CODE_SUCCESS, or the first error reported while fetching or appending a row.
int32_t buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, int64_t endKey, int32_t capacity,
                                  STsdbReader* pReader) {
  SSDataBlock* pBlock = pReader->pResBlock;

  do {
    STSRow* pTSRow = NULL;
    bool    freeTSRow = false;

    int32_t code = tsdbGetNextRowInMem(pBlockScanInfo, pReader, &pTSRow, endKey, &freeTSRow);
    if (code != TSDB_CODE_SUCCESS) {
      // release a row that was built before the failure
      if (freeTSRow && pTSRow != NULL) {
        taosMemoryFree(pTSRow);
      }
      return code;
    }

    if (pTSRow == NULL) {
      break;
    }

    code = doAppendRowFromTSRow(pBlock, pReader, pTSRow, pBlockScanInfo);

    if (freeTSRow) {
      taosMemoryFree(pTSRow);
    }

    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    // no data in buffer, return immediately
    if (!(pBlockScanInfo->iter.hasVal || pBlockScanInfo->iiter.hasVal)) {
      break;
    }

    if (pBlock->info.rows >= capacity) {
      break;
    }
  } while (1);

  ASSERT(pBlock->info.rows <= capacity);
  return TSDB_CODE_SUCCESS;
}
H
Hongze Cheng 已提交
3706

3707 3708
// TODO refactor: with createDataBlockScanInfo
// Replace the set of tables scanned by the reader: the scan info of every table currently in the table map is
// cleared, the map is emptied, and one entry per uid of pTableList (num entries of STableKeyInfo) is inserted,
// backed by the slots of pReader->blockInfoBuf.
int32_t tsdbSetTableList(STsdbReader* pReader, const void* pTableList, int32_t num) {
  ASSERT(pReader != NULL);
  int32_t numOfExisting = taosHashGetSize(pReader->status.pTableMap);

  for (STableBlockScanInfo** ppInfo = taosHashIterate(pReader->status.pTableMap, NULL); ppInfo != NULL;
       ppInfo = taosHashIterate(pReader->status.pTableMap, ppInfo)) {
    clearBlockScanInfo(*ppInfo);
  }

  // todo handle the case where size is less than the value of num
  ASSERT(numOfExisting >= num);

  taosHashClear(pReader->status.pTableMap);

  const STableKeyInfo* pKeys = (const STableKeyInfo*)pTableList;
  for (int32_t idx = 0; idx < num; ++idx) {
    STableBlockScanInfo* pInfo = getPosInBlockInfoBuf(&pReader->blockInfoBuf, idx);
    pInfo->uid = pKeys[idx].uid;
    taosHashPut(pReader->status.pTableMap, &pInfo->uid, sizeof(uint64_t), &pInfo, POINTER_BYTES);
  }

  return TDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
3732 3733 3734 3735 3736 3737
// Return the result of metaGetIdx() for pMeta, or NULL when pMeta is NULL.
void* tsdbGetIdx(SMeta* pMeta) {
  return (pMeta != NULL) ? metaGetIdx(pMeta) : NULL;
}
dengyihao's avatar
dengyihao 已提交
3738

dengyihao's avatar
dengyihao 已提交
3739 3740 3741 3742 3743 3744
// Return the result of metaGetIvtIdx() for pMeta, or NULL when pMeta is NULL.
void* tsdbGetIvtIdx(SMeta* pMeta) {
  return (pMeta != NULL) ? metaGetIvtIdx(pMeta) : NULL;
}
L
Liu Jicong 已提交
3745

H
Hongze Cheng 已提交
3746
// Return the upper bound (maxVer) of the version range the reader is restricted to.
uint64_t getReaderMaxVersion(STsdbReader* pReader) {
  return pReader->verRange.maxVer;
}
3747

3748 3749 3750 3751 3752 3753 3754 3755 3756 3757 3758 3759 3760 3761 3762
// Set up the file set iterator and the block iterator of a reader. Without any data file the reader is switched
// to the in-memory buffers (loadFromFile = false); otherwise the first file set is opened.
static int32_t doOpenReaderImpl(STsdbReader* pReader) {
  SReaderStatus*  pStatus = &pReader->status;
  SDataBlockIter* pBlockIter = &pStatus->blockIter;

  initFilesetIterator(&pStatus->fileIter, pReader->pReadSnap->fs.aDFileSet, pReader);
  resetDataBlockIterator(pBlockIter, pReader->order);

  if (pStatus->fileIter.numOfFiles != 0) {
    return initForFirstBlockInFile(pReader, pBlockIter);
  }

  // no data in files, let's try buffer in memory
  pStatus->loadFromFile = false;
  return TSDB_CODE_SUCCESS;
}

H
refact  
Hongze Cheng 已提交
3763
// ====================================== EXPOSED APIs ======================================
3764
int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, void* pTableList, int32_t numOfTables,
H
Haojun Liao 已提交
3765
                       SSDataBlock* pResBlock, STsdbReader** ppReader, const char* idstr) {
3766 3767 3768 3769 3770 3771
  STimeWindow window = pCond->twindows;
  if (pCond->type == TIMEWINDOW_RANGE_EXTERNAL) {
    pCond->twindows.skey += 1;
    pCond->twindows.ekey -= 1;
  }

H
Haojun Liao 已提交
3772 3773 3774 3775 3776 3777 3778 3779
  int32_t capacity = 0;
  if (pResBlock == NULL) {
    capacity = 4096;
  } else {
    capacity = pResBlock->info.capacity;
  }

  int32_t code = tsdbReaderCreate(pVnode, pCond, ppReader, capacity, pResBlock, idstr);
3780
  if (code != TSDB_CODE_SUCCESS) {
H
Haojun Liao 已提交
3781 3782
    goto _err;
  }
H
Hongze Cheng 已提交
3783

3784
  // check for query time window
H
Haojun Liao 已提交
3785
  STsdbReader* pReader = *ppReader;
3786
  if (isEmptyQueryTimeWindow(&pReader->window) && pCond->type == TIMEWINDOW_RANGE_CONTAINED) {
H
Haojun Liao 已提交
3787 3788 3789
    tsdbDebug("%p query window not overlaps with the data set, no result returned, %s", pReader, pReader->idStr);
    return TSDB_CODE_SUCCESS;
  }
H
Hongze Cheng 已提交
3790

3791 3792
  if (pCond->type == TIMEWINDOW_RANGE_EXTERNAL) {
    // update the SQueryTableDataCond to create inner reader
3793
    int32_t order = pCond->order;
3794
    if (order == TSDB_ORDER_ASC) {
3795
      pCond->twindows.ekey = window.skey;
3796 3797 3798
      pCond->twindows.skey = INT64_MIN;
      pCond->order = TSDB_ORDER_DESC;
    } else {
3799
      pCond->twindows.skey = window.ekey;
3800 3801 3802 3803
      pCond->twindows.ekey = INT64_MAX;
      pCond->order = TSDB_ORDER_ASC;
    }

3804
    // here we only need one more row, so the capacity is set to be ONE.
H
Haojun Liao 已提交
3805
    code = tsdbReaderCreate(pVnode, pCond, &pReader->innerReader[0], 1, pResBlock, idstr);
3806 3807 3808 3809 3810
    if (code != TSDB_CODE_SUCCESS) {
      goto _err;
    }

    if (order == TSDB_ORDER_ASC) {
3811
      pCond->twindows.skey = window.ekey;
3812
      pCond->twindows.ekey = INT64_MAX;
3813
    } else {
3814
      pCond->twindows.skey = INT64_MIN;
3815
      pCond->twindows.ekey = window.ekey;
3816
    }
3817 3818
    pCond->order = order;

H
Haojun Liao 已提交
3819
    code = tsdbReaderCreate(pVnode, pCond, &pReader->innerReader[1], 1, pResBlock, idstr);
3820 3821 3822 3823 3824
    if (code != TSDB_CODE_SUCCESS) {
      goto _err;
    }
  }

H
Haojun Liao 已提交
3825
  // NOTE: the endVersion in pCond is the data version not schema version, so pCond->endVersion is not correct here.
3826
  if (pCond->suid != 0) {
3827
    pReader->pSchema = metaGetTbTSchema(pReader->pTsdb->pVnode->pMeta, pReader->suid, -1, 1);
H
Haojun Liao 已提交
3828
    if (pReader->pSchema == NULL) {
H
Haojun Liao 已提交
3829
      tsdbError("failed to get table schema, suid:%" PRIu64 ", ver:-1, %s", pReader->suid, pReader->idStr);
H
Haojun Liao 已提交
3830
    }
3831 3832
  } else if (numOfTables > 0) {
    STableKeyInfo* pKey = pTableList;
3833
    pReader->pSchema = metaGetTbTSchema(pReader->pTsdb->pVnode->pMeta, pKey->uid, -1, 1);
H
Haojun Liao 已提交
3834
    if (pReader->pSchema == NULL) {
H
Haojun Liao 已提交
3835
      tsdbError("failed to get table schema, uid:%" PRIu64 ", ver:-1, %s", pKey->uid, pReader->idStr);
H
Haojun Liao 已提交
3836
    }
3837 3838
  }

3839 3840 3841
  if (pReader->pSchema != NULL) {
    updateBlockSMAInfo(pReader->pSchema, &pReader->suppInfo);
  }
3842

3843
  STsdbReader* p = (pReader->innerReader[0] != NULL)? pReader->innerReader[0]:pReader;
H
Haojun Liao 已提交
3844
  pReader->status.pTableMap = createDataBlockScanInfo(p, &pReader->blockInfoBuf, pTableList, numOfTables);
H
Haojun Liao 已提交
3845
  if (pReader->status.pTableMap == NULL) {
H
Haojun Liao 已提交
3846
    tsdbReaderClose(p);
H
Haojun Liao 已提交
3847
    *ppReader = NULL;
H
Haojun Liao 已提交
3848

S
Shengliang Guan 已提交
3849
    code = TSDB_CODE_OUT_OF_MEMORY;
H
Haojun Liao 已提交
3850 3851
    goto _err;
  }
H
Hongze Cheng 已提交
3852

3853
  if (numOfTables > 0) {
H
Haojun Liao 已提交
3854
    code = tsdbTakeReadSnap(pReader->pTsdb, &pReader->pReadSnap, pReader->idStr);
3855 3856 3857
    if (code != TSDB_CODE_SUCCESS) {
      goto _err;
    }
H
Hongze Cheng 已提交
3858

3859
    if (pReader->type == TIMEWINDOW_RANGE_CONTAINED) {
H
Haojun Liao 已提交
3860 3861 3862
      code = doOpenReaderImpl(pReader);
      if (code != TSDB_CODE_SUCCESS) {
        return code;
3863
      }
3864
    } else {
H
Haojun Liao 已提交
3865 3866
      STsdbReader* pPrevReader = pReader->innerReader[0];
      STsdbReader* pNextReader = pReader->innerReader[1];
3867

H
Haojun Liao 已提交
3868 3869 3870 3871 3872 3873 3874 3875 3876 3877 3878 3879
      // we need only one row
      pPrevReader->capacity = 1;
      pPrevReader->status.pTableMap = pReader->status.pTableMap;
      pPrevReader->pSchema = pReader->pSchema;
      pPrevReader->pMemSchema = pReader->pMemSchema;
      pPrevReader->pReadSnap = pReader->pReadSnap;

      pNextReader->capacity = 1;
      pNextReader->status.pTableMap = pReader->status.pTableMap;
      pNextReader->pSchema = pReader->pSchema;
      pNextReader->pMemSchema = pReader->pMemSchema;
      pNextReader->pReadSnap = pReader->pReadSnap;
3880

H
Haojun Liao 已提交
3881
      code = doOpenReaderImpl(pPrevReader);
3882
      if (code != TSDB_CODE_SUCCESS) {
H
Haojun Liao 已提交
3883
        return code;
3884
      }
3885 3886 3887
    }
  }

3888
  tsdbDebug("%p total numOfTable:%d in this query %s", pReader, numOfTables, pReader->idStr);
H
Hongze Cheng 已提交
3889
  return code;
H
Hongze Cheng 已提交
3890

H
Haojun Liao 已提交
3891
  _err:
H
Haojun Liao 已提交
3892
  tsdbError("failed to create data reader, code:%s %s", tstrerror(code), idstr);
H
Hongze Cheng 已提交
3893
  return code;
H
Haojun Liao 已提交
3894
  }
H
refact  
Hongze Cheng 已提交
3895 3896

/*
 * Release a reader and everything it owns. A NULL reader is accepted and ignored.
 *
 * Inner readers (external-range queries) borrow the table map, read snapshot and schemas
 * from the outer reader, so those fields are detached from each inner reader before it is
 * closed; the shared objects are then released exactly once by the outer reader below.
 */
void tsdbReaderClose(STsdbReader* pReader) {
  if (pReader == NULL) {
    return;
  }

  // Handle each inner reader on its own: after a partially failed open only one of
  // them may exist, and dereferencing the missing one would crash.
  for (int32_t k = 0; k < 2; ++k) {
    STsdbReader* pInner = pReader->innerReader[k];
    if (pInner == NULL) {
      continue;
    }

    pInner->status.pTableMap = NULL;
    pInner->pReadSnap = NULL;
    pInner->pSchema = NULL;
    pInner->pMemSchema = NULL;

    tsdbReaderClose(pInner);
  }

  SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo;

  taosArrayDestroy(pSupInfo->pColAgg);
  for (int32_t i = 0; i < pSupInfo->numOfCols; ++i) {
    if (pSupInfo->buildBuf[i] != NULL) {
      taosMemoryFreeClear(pSupInfo->buildBuf[i]);
    }
  }

  if (pReader->freeBlock) {
    pReader->pResBlock = blockDataDestroy(pReader->pResBlock);
  }

  taosMemoryFree(pSupInfo->colId);
  tBlockDataDestroy(&pReader->status.fileBlockData, true);
  cleanupDataBlockIterator(&pReader->status.blockIter);

  // remember the table count for the cost summary before the map is destroyed
  size_t numOfTables = taosHashGetSize(pReader->status.pTableMap);
  if (pReader->status.pTableMap != NULL) {
    destroyAllBlockScanInfo(pReader->status.pTableMap);
    clearBlockScanInfoBuf(&pReader->blockInfoBuf);
  }

  if (pReader->pFileReader != NULL) {
    tsdbDataFReaderClose(&pReader->pFileReader);
  }

  if (pReader->pDelFReader != NULL) {
    tsdbDelFReaderClose(&pReader->pDelFReader);
  }

  if (pReader->pDelIdx != NULL) {
    taosArrayDestroy(pReader->pDelIdx);
    pReader->pDelIdx = NULL;
  }

  tsdbUntakeReadSnap(pReader->pTsdb, pReader->pReadSnap, pReader->idStr);

  taosMemoryFree(pReader->status.uidCheckInfo.tableUidList);
  SIOCostSummary* pCost = &pReader->cost;

  SFilesetIter* pFilesetIter = &pReader->status.fileIter;
  if (pFilesetIter->pLastBlockReader != NULL) {
    SLastBlockReader* pLReader = pFilesetIter->pLastBlockReader;
    tMergeTreeClose(&pLReader->mergeTree);

    // collect the last-block load statistics before the load info is destroyed
    getLastBlockLoadInfo(pLReader->pInfo, &pCost->lastBlockLoad, &pCost->lastBlockLoadTime);

    pLReader->pInfo = destroyLastBlockLoadInfo(pLReader->pInfo);
    taosMemoryFree(pLReader);
  }

  tsdbDebug("%p :io-cost summary: head-file:%" PRIu64 ", head-file time:%.2f ms, SMA:%" PRId64
            " SMA-time:%.2f ms, fileBlocks:%" PRId64
            ", fileBlocks-load-time:%.2f ms, "
            "build in-memory-block-time:%.2f ms, lastBlocks:%" PRId64
            ", lastBlocks-time:%.2f ms, composed-blocks:%" PRId64
            ", composed-blocks-time:%.2fms, STableBlockScanInfo size:%.2f Kb, creatTime:%.2f ms, %s",
            pReader, pCost->headFileLoad, pCost->headFileLoadTime, pCost->smaDataLoad, pCost->smaLoadTime,
            pCost->numOfBlocks, pCost->blockLoadTime, pCost->buildmemBlock, pCost->lastBlockLoad,
            pCost->lastBlockLoadTime, pCost->composedBlocks, pCost->buildComposedBlockTime,
            numOfTables * sizeof(STableBlockScanInfo) / 1000.0, pCost->createScanInfoList, pReader->idStr);

  taosMemoryFree(pReader->idStr);

  // pMemSchema may alias pSchema; compare before either is released to avoid a double free
  if (pReader->pMemSchema != pReader->pSchema) {
    taosMemoryFree(pReader->pMemSchema);
  }
  taosMemoryFree(pReader->pSchema);

  taosMemoryFreeClear(pReader);
}

3993
// Produce the next result block for one reader: file data is tried first (while
// loadFromFile is set), then the in-memory buffers. Returns true when the result
// block holds at least one row; a failure while building from files yields false.
static bool doTsdbNextDataBlock(STsdbReader* pReader) {
  // drop whatever the previous call left in the result block
  SSDataBlock* pResBlock = pReader->pResBlock;
  blockDataCleanup(pResBlock);

  SReaderStatus* pStatus = &pReader->status;
  if (taosHashGetSize(pStatus->pTableMap) == 0) {
    return false;
  }

  if (pStatus->loadFromFile) {
    if (buildBlockFromFiles(pReader) != TSDB_CODE_SUCCESS) {
      return false;
    }

    if (pResBlock->info.rows > 0) {
      return true;
    }
  }

  // nothing (more) from the files, fall back to the buffer
  buildBlockFromBufferSequentially(pReader);
  return pResBlock->info.rows > 0;
}

4021 4022 4023 4024 4025
/*
 * Advance the reader to its next data block.
 *
 * For external-range readers the scan runs in three steps, tracked in pReader->step:
 * the "previous row" inner reader, then the main reader, then the "next row" inner reader.
 *
 * Returns true when a block is available, false when the data is exhausted or when
 * opening one of the readers failed; in the failure case terrno carries the error code.
 */
bool tsdbNextDataBlock(STsdbReader* pReader) {
  if (isEmptyQueryTimeWindow(&pReader->window)) {
    return false;
  }

  if (pReader->innerReader[0] != NULL && pReader->step == 0) {
    bool ret = doTsdbNextDataBlock(pReader->innerReader[0]);
    pReader->step = EXTERNAL_ROWS_PREV;
    if (ret) {
      return ret;
    }
  }

  if (pReader->step == EXTERNAL_ROWS_PREV) {
    // prepare for the main scan
    int32_t code = doOpenReaderImpl(pReader);
    resetAllDataBlockScanInfo(pReader->status.pTableMap, pReader->innerReader[0]->window.ekey);

    if (code != TSDB_CODE_SUCCESS) {
      // a non-zero error code must not be returned as a bool: it would read as "block available"
      terrno = code;
      return false;
    }

    pReader->step = EXTERNAL_ROWS_MAIN;
  }

  bool ret = doTsdbNextDataBlock(pReader);
  if (ret) {
    return ret;
  }

  if (pReader->innerReader[1] != NULL && pReader->step == EXTERNAL_ROWS_MAIN) {
    // prepare for the next row scan
    int32_t code = doOpenReaderImpl(pReader->innerReader[1]);
    resetAllDataBlockScanInfo(pReader->innerReader[1]->status.pTableMap, pReader->window.ekey);
    if (code != TSDB_CODE_SUCCESS) {
      terrno = code;
      return false;
    }

    bool ret1 = doTsdbNextDataBlock(pReader->innerReader[1]);
    pReader->step = EXTERNAL_ROWS_NEXT;
    if (ret1) {
      return ret1;
    }
  }

  return false;
}

H
Haojun Liao 已提交
4069 4070 4071
// Copy the row count, table uid and time window of the reader's result block into the outputs.
static void setBlockInfo(const STsdbReader* pReader, int32_t* rows, uint64_t* uid, STimeWindow* pWindow) {
  ASSERT(pReader != NULL);

  const SSDataBlock* pResBlock = pReader->pResBlock;

  *rows = pResBlock->info.rows;
  *uid = pResBlock->info.id.uid;
  *pWindow = pResBlock->info.window;
}

H
Haojun Liao 已提交
4076
// Report rows/uid/window of the current block. For an external-range reader the block is
// taken from whichever reader the current step points at: the main reader for
// EXTERNAL_ROWS_MAIN, innerReader[0] for EXTERNAL_ROWS_PREV, innerReader[1] otherwise.
void tsdbRetrieveDataBlockInfo(const STsdbReader* pReader, int32_t* rows, uint64_t* uid, STimeWindow* pWindow) {
  const STsdbReader* pSource = pReader;

  if (pReader->type == TIMEWINDOW_RANGE_EXTERNAL && pReader->step != EXTERNAL_ROWS_MAIN) {
    if (pReader->step == EXTERNAL_ROWS_PREV) {
      pSource = pReader->innerReader[0];
    } else {
      pSource = pReader->innerReader[1];
    }
  }

  setBlockInfo(pSource, rows, uid, pWindow);
}

4090

4091 4092
/*
 * Complete the aggregate list pSup->pColAgg so that it covers every requested column.
 *
 * The timestamp aggregate pTsAgg is inserted at the front. Then the list is merged against
 * pSup->colId[0..numOfCols): for each requested column that has no stored aggregate (and is
 * not the primary timestamp column) an entry with numOfNull == numOfRows is inserted at its
 * position in the walk.
 *
 * NOTE: the merge assumes both pSup->colId and pSup->pColAgg are ordered by ascending
 * column id -- confirm against the producers of these two lists.
 */
static void doFillNullColSMA(SBlockLoadSuppInfo* pSup, int32_t numOfRows, int32_t numOfCols,
    SColumnDataAgg* pTsAgg) {
  int32_t i = 0, j = 0;
  int32_t size = (int32_t) taosArrayGetSize(pSup->pColAgg);

  taosArrayInsert(pSup->pColAgg, 0, pTsAgg);
  size += 1;  // keep the bound in sync with the array, otherwise its tail is never visited

  while (j < numOfCols && i < size) {
    SColumnDataAgg* pAgg = taosArrayGet(pSup->pColAgg, i);
    if (pAgg->colId == pSup->colId[j]) {
      i += 1;
      j += 1;
    } else if (pAgg->colId < pSup->colId[j]) {
      i += 1;
    } else if (pSup->colId[j] < pAgg->colId) {
      if (pSup->colId[j] != PRIMARYKEY_TIMESTAMP_COL_ID) {
        SColumnDataAgg nullColAgg = {.colId = pSup->colId[j], .numOfNull = numOfRows};
        taosArrayInsert(pSup->pColAgg, i, &nullColAgg);
        // step over the entry that was just inserted and account for the growth
        i += 1;
        size += 1;
      }
      j += 1;
    }
  }

  // requested columns beyond the last stored aggregate: every value is NULL as well
  while (j < numOfCols) {
    if (pSup->colId[j] != PRIMARYKEY_TIMESTAMP_COL_ID) {
      SColumnDataAgg nullColAgg = {.colId = pSup->colId[j], .numOfNull = numOfRows};
      taosArrayInsert(pSup->pColAgg, i, &nullColAgg);
      i += 1;
    }
    j += 1;
  }
}

H
Haojun Liao 已提交
4115
/*
 * Load the pre-computed per-column aggregates (SMA) of the current file block.
 *
 * Outputs:
 *   *pBlockSMA - per-slot array of aggregate pointers (pResBlock->pBlockAgg), or NULL when
 *                no SMA is available (external-range reader, composed block, SMA not valid
 *                for the query, or the block carries no SMA).
 *   *allHave   - set to true only when the SMA has been loaded.
 *
 * Returns TSDB_CODE_SUCCESS, the error reported by tsdbReadBlockSma(), or
 * TSDB_CODE_OUT_OF_MEMORY when the per-slot array cannot be allocated.
 */
int32_t tsdbRetrieveDatablockSMA(STsdbReader* pReader, SColumnDataAgg ***pBlockSMA, bool* allHave) {
  int32_t code = 0;
  *allHave = false;

  if (pReader->type == TIMEWINDOW_RANGE_EXTERNAL) {
    *pBlockSMA = NULL;
    return TSDB_CODE_SUCCESS;
  }

  // there is no statistics data for composed block
  if (pReader->status.composedDataBlock || (!pReader->suppInfo.smaValid)) {
    *pBlockSMA = NULL;
    return TSDB_CODE_SUCCESS;
  }

  SFileDataBlockInfo* pFBlock = getCurrentBlockInfo(&pReader->status.blockIter);
  SBlockLoadSuppInfo* pSup = &pReader->suppInfo;

  ASSERT(pReader->pResBlock->info.id.uid == pFBlock->uid);

  SDataBlk* pBlock = getCurrentBlock(&pReader->status.blockIter);
  if (!tDataBlkHasSma(pBlock)) {
    *pBlockSMA = NULL;
    return TSDB_CODE_SUCCESS;
  }

  code = tsdbReadBlockSma(pReader->pFileReader, pBlock, pSup->pColAgg);
  if (code != TSDB_CODE_SUCCESS) {
    tsdbDebug("vgId:%d, failed to load block SMA for uid %" PRIu64 ", code:%s, %s", 0, pFBlock->uid, tstrerror(code),
              pReader->idStr);
    return code;
  }

  // lazily allocate the per-slot pointer array of the result block
  SSDataBlock* pResBlock = pReader->pResBlock;
  if (pResBlock->pBlockAgg == NULL) {
    size_t num = taosArrayGetSize(pResBlock->pDataBlock);
    pResBlock->pBlockAgg = taosMemoryCalloc(num, sizeof(SColumnDataAgg));
    if (pResBlock->pBlockAgg == NULL && num > 0) {
      *pBlockSMA = NULL;
      return TSDB_CODE_OUT_OF_MEMORY;
    }
  }

  *allHave = true;

  // always load the first primary timestamp column data
  SColumnDataAgg* pTsAgg = &pSup->tsColAgg;

  pTsAgg->numOfNull = 0;
  pTsAgg->colId = PRIMARYKEY_TIMESTAMP_COL_ID;
  pTsAgg->min = pReader->pResBlock->info.window.skey;
  pTsAgg->max = pReader->pResBlock->info.window.ekey;

  // do fill all null column value SMA info
  size_t numOfCols = pSup->numOfCols;
  doFillNullColSMA(pSup, pBlock->nRow, numOfCols, pTsAgg);

  // The fill step above inserts entries, so the element count must be read afterwards;
  // a count taken earlier would leave the tail of the list unbound to any slot.
  size_t  size = taosArrayGetSize(pSup->pColAgg);
  int32_t i = 0, j = 0;

  // bind every requested column (by slot id) to its aggregate
  while (j < numOfCols && i < size) {
    SColumnDataAgg* pAgg = taosArrayGet(pSup->pColAgg, i);
    if (pAgg->colId == pSup->colId[j]) {
      pResBlock->pBlockAgg[pSup->slotId[j]] = pAgg;
      i += 1;
      j += 1;
    } else if (pAgg->colId < pSup->colId[j]) {
      i += 1;
    } else if (pSup->colId[j] < pAgg->colId) {
      ASSERT(pSup->colId[j] == PRIMARYKEY_TIMESTAMP_COL_ID);
      pResBlock->pBlockAgg[pSup->slotId[j]] = &pSup->tsColAgg;
      j += 1;
    }
  }

  *pBlockSMA = pResBlock->pBlockAgg;
  pReader->cost.smaDataLoad += 1;

  tsdbDebug("vgId:%d, succeed to load block SMA for uid %" PRIu64 ", %s", 0, pFBlock->uid, pReader->idStr);
  return code;
}

H
Haojun Liao 已提交
4196
/*
 * Return the result block for the reader's current position.
 *
 * A composed block is already materialised in pResBlock and is returned as is. Otherwise
 * the current file block is loaded and copied into pResBlock.
 *
 * Returns NULL with terrno set when the block's table is not part of the query table map
 * or when loading the file block fails.
 */
static SSDataBlock* doRetrieveDataBlock(STsdbReader* pReader) {
  SReaderStatus* pStatus = &pReader->status;

  if (pStatus->composedDataBlock) {
    return pReader->pResBlock;
  }

  SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(&pStatus->blockIter);

  // taosHashGet() yields a pointer to the stored value; check it before dereferencing,
  // a uid that is missing from the map must reach the error path instead of crashing.
  STableBlockScanInfo** ppScanInfo =
      (STableBlockScanInfo**)taosHashGet(pStatus->pTableMap, &pBlockInfo->uid, sizeof(pBlockInfo->uid));
  if (ppScanInfo == NULL || *ppScanInfo == NULL) {
    terrno = TSDB_CODE_INVALID_PARA;
    tsdbError("failed to locate the uid:%" PRIu64 " in query table uid list, total tables:%d, %s", pBlockInfo->uid,
              taosHashGetSize(pReader->status.pTableMap), pReader->idStr);
    return NULL;
  }

  STableBlockScanInfo* pBlockScanInfo = *ppScanInfo;

  int32_t code = doLoadFileBlockData(pReader, &pStatus->blockIter, &pStatus->fileBlockData, pBlockScanInfo->uid);
  if (code != TSDB_CODE_SUCCESS) {
    tBlockDataDestroy(&pStatus->fileBlockData, 1);
    terrno = code;
    return NULL;
  }

  copyBlockDataToSDataBlock(pReader, pBlockScanInfo);
  return pReader->pResBlock;
}

H
Haojun Liao 已提交
4224
// Retrieve the current block from the reader selected by the scan step: for an
// external-range reader the PREV/NEXT steps read from innerReader[0]/innerReader[1],
// every other case reads from the reader itself. pIdList is not used here.
SSDataBlock* tsdbRetrieveDataBlock(STsdbReader* pReader, SArray* pIdList) {
  STsdbReader* pTarget = pReader;

  if (pReader->type == TIMEWINDOW_RANGE_EXTERNAL) {
    switch (pReader->step) {
      case EXTERNAL_ROWS_PREV:
        pTarget = pReader->innerReader[0];
        break;
      case EXTERNAL_ROWS_NEXT:
        pTarget = pReader->innerReader[1];
        break;
      default:
        break;
    }
  }

  return doRetrieveDataBlock(pTarget);
}

H
Haojun Liao 已提交
4236
// Rewind an opened reader so that it can be scanned again with the order and time window
// taken from pCond. Nothing is done (success is returned) when the reader's window is
// empty or no read snapshot is held. The reader type is forced to TIMEWINDOW_RANGE_CONTAINED.
int32_t tsdbReaderReset(STsdbReader* pReader, SQueryTableDataCond* pCond) {
  if (isEmptyQueryTimeWindow(&pReader->window) || pReader->pReadSnap == NULL) {
    return TSDB_CODE_SUCCESS;
  }

  SReaderStatus*  pStatus = &pReader->status;
  SDataBlockIter* pIter = &pStatus->blockIter;

  pReader->order = pCond->order;
  pReader->type = TIMEWINDOW_RANGE_CONTAINED;
  pStatus->loadFromFile = true;
  pStatus->pTableIter = NULL;
  pReader->window = updateQueryTimeWindow(pReader->pTsdb, &pCond->twindows);

  // start over with a clean timestamp-column aggregate
  memset(&pReader->suppInfo.tsColAgg, 0, sizeof(SColumnDataAgg));
  pReader->suppInfo.tsColAgg.colId = PRIMARYKEY_TIMESTAMP_COL_ID;

  tsdbDataFReaderClose(&pReader->pFileReader);

  int32_t tableCount = taosHashGetSize(pStatus->pTableMap);

  initFilesetIterator(&pStatus->fileIter, pReader->pReadSnap->fs.aDFileSet, pReader);
  resetDataBlockIterator(pIter, pReader->order);

  // the per-table key handed to the reset lies just outside the window on the side the scan starts from
  int64_t startTs;
  if (ASCENDING_TRAVERSE(pReader->order)) {
    startTs = pReader->window.skey - 1;
  } else {
    startTs = pReader->window.ekey + 1;
  }
  resetAllDataBlockScanInfo(pStatus->pTableMap, startTs);

  int32_t code = 0;

  if (pStatus->fileIter.numOfFiles != 0) {
    code = initForFirstBlockInFile(pReader, pIter);
    if (code != TSDB_CODE_SUCCESS) {
      tsdbError("%p reset reader failed, numOfTables:%d, query range:%" PRId64 " - %" PRId64 " in query %s", pReader,
                tableCount, pReader->window.skey, pReader->window.ekey, pReader->idStr);
      return code;
    }
  } else {
    // no data in files, let's try buffer in memory
    pStatus->loadFromFile = false;
  }

  tsdbDebug("%p reset reader, suid:%" PRIu64 ", numOfTables:%d, skey:%" PRId64 ", query range:%" PRId64 " - %" PRId64
            " in query %s",
            pReader, pReader->suid, tableCount, pCond->twindows.skey, pReader->window.skey, pReader->window.ekey,
            pReader->idStr);

  return code;
}
H
Hongze Cheng 已提交
4284

4285 4286 4287
/*
 * Map a block's row count onto a histogram bucket.
 *
 *   startRow    - row count at which bucket 0 begins
 *   bucketRange - number of rows covered by one bucket
 *   numOfRows   - row count of the block being classified
 *
 * Returns (numOfRows - startRow) / bucketRange. Blocks smaller than startRow are counted
 * in bucket 0 so that the result is never negative, and a non-positive bucketRange (which
 * would otherwise divide by zero) also yields bucket 0. No upper bound is applied here.
 */
static int32_t getBucketIndex(int32_t startRow, int32_t bucketRange, int32_t numOfRows) {
  if (bucketRange <= 0 || numOfRows < startRow) {
    return 0;
  }

  return (numOfRows - startRow) / bucketRange;
}
H
Hongze Cheng 已提交
4288

4289 4290 4291 4292
// Walk every file block reachable from the reader and accumulate distribution statistics
// (rows, min/max rows per block, small-block count, row-count histogram, block/file/table
// counts) into pTableBlockInfo. totalSize and totalRows are reset first; the other counters
// are added to whatever the caller passed in. Returns the code of the last attempt to
// position on the next file.
int32_t tsdbGetFileBlocksDistInfo(STsdbReader* pReader, STableBlockDistInfo* pTableBlockInfo) {
  int32_t code = TSDB_CODE_SUCCESS;

  pTableBlockInfo->totalSize = 0;
  pTableBlockInfo->totalRows = 0;

  // find the start data block in file
  SReaderStatus* pStatus = &pReader->status;

  STsdbCfg* pCfg = &pReader->pTsdb->pVnode->config.tsdbCfg;
  pTableBlockInfo->defMinRows = pCfg->minRows;
  pTableBlockInfo->defMaxRows = pCfg->maxRows;

  // the [minRows, maxRows] span is cut into 20 histogram buckets
  int32_t bucketRange = ceil((pCfg->maxRows - pCfg->minRows) / 20.0);

  pTableBlockInfo->numOfFiles += 1;

  int32_t tableCount = (int32_t)taosHashGetSize(pStatus->pTableMap);
  int     smallBlockRows = 4096;

  SDataBlockIter* pBlockIter = &pStatus->blockIter;
  pTableBlockInfo->numOfFiles += pStatus->fileIter.numOfFiles;

  if (pBlockIter->numOfBlocks > 0) {
    pTableBlockInfo->numOfBlocks += pBlockIter->numOfBlocks;
  }

  pTableBlockInfo->numOfTables = tableCount;

  bool hasNext = (pBlockIter->numOfBlocks > 0);
  for (;;) {
    if (!hasNext) {
      // current file is exhausted: move on to the next one, stop when there is none
      code = initForFirstBlockInFile(pReader, pBlockIter);
      if ((code != TSDB_CODE_SUCCESS) || (pReader->status.loadFromFile == false)) {
        break;
      }

      pTableBlockInfo->numOfBlocks += pBlockIter->numOfBlocks;
      hasNext = (pBlockIter->numOfBlocks > 0);
      continue;
    }

    SDataBlk* pBlock = getCurrentBlock(pBlockIter);
    int32_t   rowsInBlock = pBlock->nRow;

    pTableBlockInfo->totalRows += rowsInBlock;

    if (rowsInBlock > pTableBlockInfo->maxRows) {
      pTableBlockInfo->maxRows = rowsInBlock;
    }

    if (rowsInBlock < pTableBlockInfo->minRows) {
      pTableBlockInfo->minRows = rowsInBlock;
    }

    if (rowsInBlock < smallBlockRows) {
      pTableBlockInfo->numOfSmallBlocks += 1;
    }

    int32_t bucketIndex = getBucketIndex(pTableBlockInfo->defMinRows, bucketRange, rowsInBlock);
    pTableBlockInfo->blockRowsHisto[bucketIndex]++;

    hasNext = blockIteratorNext(&pStatus->blockIter, pReader->idStr);
  }

  return code;
}
H
Hongze Cheng 已提交
4357

H
refact  
Hongze Cheng 已提交
4358
/*
 * Count the rows held in the memory tables of the reader's snapshot (pMem and pIMem)
 * for all tables in the reader's table map.
 *
 * The memtables consulted are the ones captured in pReader->pReadSnap, so their presence
 * is checked on the snapshot itself rather than on the live pTsdb->mem / pTsdb->imem,
 * which may have been swapped since the snapshot was taken.
 */
int64_t tsdbGetNumOfRowsInMemTable(STsdbReader* pReader) {
  int64_t rows = 0;

  SReaderStatus* pStatus = &pReader->status;
  STsdbReadSnap* pSnap = pReader->pReadSnap;

  pStatus->pTableIter = taosHashIterate(pStatus->pTableMap, NULL);

  while (pStatus->pTableIter != NULL) {
    STableBlockScanInfo* pBlockScanInfo = *(STableBlockScanInfo**)pStatus->pTableIter;

    if (pSnap != NULL && pSnap->pMem != NULL) {
      STbData* d = tsdbGetTbDataFromMemTable(pSnap->pMem, pReader->suid, pBlockScanInfo->uid);
      if (d != NULL) {
        rows += tsdbGetNRowsInTbData(d);
      }
    }

    if (pSnap != NULL && pSnap->pIMem != NULL) {
      STbData* di = tsdbGetTbDataFromMemTable(pSnap->pIMem, pReader->suid, pBlockScanInfo->uid);
      if (di != NULL) {
        rows += tsdbGetNRowsInTbData(di);
      }
    }

    // current table is exhausted, let's try the next table
    pStatus->pTableIter = taosHashIterate(pStatus->pTableMap, pStatus->pTableIter);
  }

  return rows;
}
D
dapan1121 已提交
4389

L
Liu Jicong 已提交
4390
// Look up the table identified by uid and return its schema through *pSchema.
// For a child table *suid receives the super table uid and the schema version is read from
// the super table entry; for a normal table *suid is 0. Returns
// TSDB_CODE_TDB_INVALID_TABLE_ID (also stored in terrno) when a meta lookup fails.
int32_t tsdbGetTableSchema(SVnode* pVnode, int64_t uid, STSchema** pSchema, int64_t* suid) {
  int32_t     sversion = 1;
  SMetaReader mr = {0};

  metaReaderInit(&mr, pVnode->pMeta, 0);

  if (metaGetTableEntryByUidCache(&mr, uid) != TSDB_CODE_SUCCESS) {
    goto _invalid_table;
  }

  *suid = 0;

  if (mr.me.type != TSDB_CHILD_TABLE) {
    ASSERT(mr.me.type == TSDB_NORMAL_TABLE);
    sversion = mr.me.ntbEntry.schemaRow.version;
  } else {
    tDecoderClear(&mr.coder);
    *suid = mr.me.ctbEntry.suid;

    // the schema version of a child table lives in its super table entry
    if (metaGetTableEntryByUidCache(&mr, *suid) != TSDB_CODE_SUCCESS) {
      goto _invalid_table;
    }
    sversion = mr.me.stbEntry.schemaRow.version;
  }

  metaReaderClear(&mr);
  *pSchema = metaGetTbTSchema(pVnode->pMeta, uid, sversion, 1);

  return TSDB_CODE_SUCCESS;

_invalid_table:
  terrno = TSDB_CODE_TDB_INVALID_TABLE_ID;
  metaReaderClear(&mr);
  return terrno;
}
H
Hongze Cheng 已提交
4424

H
Haojun Liao 已提交
4425
/*
 * Take a read snapshot of the tsdb: under the read lock, capture the current mem / imem
 * tables (adding a reference to each one that exists) and reference the file system state.
 *
 * On success *ppSnap points to the new snapshot and 0 is returned; the snapshot must be
 * released with tsdbUntakeReadSnap().
 * On failure an error code is returned, every reference taken so far is dropped, the
 * snapshot memory is freed and *ppSnap is set to NULL, so callers never observe a
 * half-initialised snapshot.
 */
int32_t tsdbTakeReadSnap(STsdb* pTsdb, STsdbReadSnap** ppSnap, const char* idStr) {
  int32_t code = 0;

  // alloc
  STsdbReadSnap* pSnap = (STsdbReadSnap*)taosMemoryCalloc(1, sizeof(STsdbReadSnap));
  *ppSnap = pSnap;
  if (pSnap == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  // lock
  code = taosThreadRwlockRdlock(&pTsdb->rwLock);
  if (code) {
    code = TAOS_SYSTEM_ERROR(code);
    goto _err;
  }

  // take snapshot
  pSnap->pMem = pTsdb->mem;
  pSnap->pIMem = pTsdb->imem;

  if (pSnap->pMem) {
    tsdbRefMemTable(pSnap->pMem);
  }

  if (pSnap->pIMem) {
    tsdbRefMemTable(pSnap->pIMem);
  }

  // fs
  code = tsdbFSRef(pTsdb, &pSnap->fs);
  if (code) {
    taosThreadRwlockUnlock(&pTsdb->rwLock);
    goto _err;
  }

  // unlock
  code = taosThreadRwlockUnlock(&pTsdb->rwLock);
  if (code) {
    code = TAOS_SYSTEM_ERROR(code);
    tsdbFSUnref(pTsdb, &pSnap->fs);
    goto _err;
  }

  tsdbTrace("vgId:%d, take read snapshot, %s", TD_VID(pTsdb->pVnode), idStr);
  return code;

_err:
  // pMem / pIMem are still NULL (zero-initialised) when the failure happened before the capture
  if (pSnap->pMem) {
    tsdbUnrefMemTable(pSnap->pMem);
  }

  if (pSnap->pIMem) {
    tsdbUnrefMemTable(pSnap->pIMem);
  }

  taosMemoryFree(pSnap);
  *ppSnap = NULL;
  return code;
}

H
Haojun Liao 已提交
4473
// Release a snapshot obtained from tsdbTakeReadSnap(): drop the references on the captured
// memtables and on the file system state, then free the snapshot. A NULL snapshot is
// accepted; the trace line is emitted in either case.
void tsdbUntakeReadSnap(STsdb* pTsdb, STsdbReadSnap* pSnap, const char* idStr) {
  if (pSnap != NULL) {
    if (pSnap->pMem != NULL) {
      tsdbUnrefMemTable(pSnap->pMem);
    }

    if (pSnap->pIMem != NULL) {
      tsdbUnrefMemTable(pSnap->pIMem);
    }

    tsdbFSUnref(pTsdb, &pSnap->fs);
    taosMemoryFree(pSnap);
  }

  tsdbTrace("vgId:%d, untake read snapshot, %s", TD_VID(pTsdb->pVnode), idStr);
}