tsdbRead.c 128.6 KB
Newer Older
H
hjxilinx 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

H
Haojun Liao 已提交
16
#include "osDef.h"
H
Hongze Cheng 已提交
17
#include "tsdb.h"
18
// True when the traversal order `o` is ascending. The argument is parenthesized so that compound
// expressions (e.g. ASCENDING_TRAVERSE(c ? a : b)) are compared as a whole.
#define ASCENDING_TRAVERSE(o) ((o) == TSDB_ORDER_ASC)
H
Hongze Cheng 已提交
19

20 21 22 23 24 25
// Selector for which rows an external-rows request refers to.
// NOTE(review): the values look like bit flags, but EXTERNAL_ROWS_NEXT (3) equals PREV | MAIN;
// confirm these are plain discriminators and are never combined with bitwise OR.
typedef enum {
  EXTERNAL_ROWS_PREV = 1,
  EXTERNAL_ROWS_MAIN = 2,
  EXTERNAL_ROWS_NEXT = 3,
} EContentData;

26
typedef struct {
dengyihao's avatar
dengyihao 已提交
27
  STbDataIter* iter;
28 29 30 31
  int32_t      index;
  bool         hasVal;
} SIterInfo;

32 33 34 35 36
// Counters of qualified blocks found while scanning one data file set.
typedef struct {
  int32_t numOfBlocks;      // qualified data blocks
  int32_t numOfLastBlocks;  // qualified last blocks
} SBlockNumber;

H
Haojun Liao 已提交
37
typedef struct STableBlockScanInfo {
dengyihao's avatar
dengyihao 已提交
38 39
  uint64_t  uid;
  TSKEY     lastKey;
40 41 42 43 44
  SMapData  mapData;      // block info (compressed)
  SArray*   pBlockList;   // block data index list
  SIterInfo iter;         // mem buffer skip list iterator
  SIterInfo iiter;        // imem buffer skip list iterator
  SArray*   delSkyline;   // delete info for this table
dengyihao's avatar
dengyihao 已提交
45
  int32_t   fileDelIndex;
46 47
  bool      iterInit;     // whether to initialize the in-memory skip list iterator or not
  int16_t   indexInBlockL;// row position in last block
H
Haojun Liao 已提交
48 49 50
} STableBlockScanInfo;

// Identifies one block of one table for ordering purposes.
typedef struct SBlockOrderWrapper {
  int64_t uid;     // owning table uid
  int64_t offset;
} SBlockOrderWrapper;
H
Hongze Cheng 已提交
54 55

typedef struct SBlockOrderSupporter {
56 57 58 59
  SBlockOrderWrapper** pDataBlockInfo;
  int32_t*             indexPerTable;
  int32_t*             numOfBlocksPerTable;
  int32_t              numOfTables;
H
Hongze Cheng 已提交
60 61 62
} SBlockOrderSupporter;

// I/O cost counters accumulated by a reader (headFileLoadTime is accumulated in milliseconds).
typedef struct SIOCostSummary {
  int64_t numOfBlocks;       // blocks selected from data files
  double  blockLoadTime;
  double  buildmemBlock;
  int64_t headFileLoad;      // number of data file readers opened
  double  headFileLoadTime;
  int64_t smaData;
  double  smaLoadTime;
} SIOCostSummary;

typedef struct SBlockLoadSuppInfo {
73
  SArray*          pColAgg;
74
  SColumnDataAgg   tsColAgg;
C
Cary Xu 已提交
75
  SColumnDataAgg** plist;
76 77
  int16_t*         colIds;    // column ids for loading file block data
  char**           buildBuf;  // build string tmp buffer, todo remove it later after all string format being updated.
H
Hongze Cheng 已提交
78 79
} SBlockLoadSuppInfo;

80 81 82 83 84 85 86 87 88 89 90
// Inclusive range of data versions visible to a query.
typedef struct SVersionRange {
  uint64_t minVer;  // lowest visible version
  uint64_t maxVer;  // highest visible version
} SVersionRange;

// State for reading rows from the last blocks of the current data file set.
typedef struct SLastBlockReader {
  SArray*       pBlockL;            // SArray<SBlockL>
  int32_t       currentBlockIndex;
  SBlockData    lastBlockData;
  STimeWindow   window;             // copy of the reader's query window
  SVersionRange verRange;           // copy of the reader's version range
  int32_t       order;
  uint64_t      uid;
  int16_t*      rowIndex;           // row index ptr, usually from the STableBlockScanInfo->indexInBlockL
} SLastBlockReader;

96
typedef struct SFilesetIter {
97 98 99
  int32_t numOfFiles;    // number of total files
  int32_t index;         // current accessed index in the list
  SArray* pFileList;     // data file list
H
Hongze Cheng 已提交
100
  int32_t order;
101
  SLastBlockReader* pLastBlockReader; // last file block reader
102
} SFilesetIter;
H
Haojun Liao 已提交
103 104

// Reference to one qualified data block of one table.
typedef struct SFileDataBlockInfo {
  uint64_t uid;         // owning table; locates its STableBlockScanInfo so neighbour blocks can be checked for overlap
  int32_t  tbBlockIdx;  // position of the block within that table's block list
} SFileDataBlockInfo;

typedef struct SDataBlockIter {
111
  int32_t   numOfBlocks;
112
  int32_t   index;
113
  SArray*   blockList;      // SArray<SFileDataBlockInfo>
114
  int32_t   order;
115
  SBlock    block;          // current SBlock data
116
  SHashObj* pTableMap;
H
Haojun Liao 已提交
117 118 119
} SDataBlockIter;

// Progress of copying rows out of the current file block.
typedef struct SFileBlockDumpInfo {
  int32_t totalRows;
  int32_t rowIndex;
  int64_t lastKey;    // key following the last dumped one, in scan direction
  bool    allDumped;  // whether every row of the block has been consumed
} SFileBlockDumpInfo;

typedef struct SReaderStatus {
dengyihao's avatar
dengyihao 已提交
127 128
  bool                 loadFromFile;  // check file stage
  SHashObj*            pTableMap;     // SHash<STableBlockScanInfo>
129
  STableBlockScanInfo* pTableIter;    // table iterator used in building in-memory buffer data blocks.
130
  SFileBlockDumpInfo   fBlockDumpInfo;
131 132 133 134 135
  SDFileSet*           pCurrentFileset;  // current opened file set
  SBlockData           fileBlockData;
  SFilesetIter         fileIter;
  SDataBlockIter       blockIter;
  bool                 composedDataBlock;  // the returned data block is a composed block or not
H
Haojun Liao 已提交
136 137
} SReaderStatus;

H
Hongze Cheng 已提交
138
struct STsdbReader {
H
Haojun Liao 已提交
139 140 141 142 143 144 145
  STsdb*             pTsdb;
  uint64_t           suid;
  int16_t            order;
  STimeWindow        window;  // the primary query time window that applies to all queries
  SSDataBlock*       pResBlock;
  int32_t            capacity;
  SReaderStatus      status;
146 147
  char*              idStr;  // query info handle, for debug purpose
  int32_t            type;   // query type: 1. retrieve all data blocks, 2. retrieve direct prev|next rows
H
Hongze Cheng 已提交
148
  SBlockLoadSuppInfo suppInfo;
H
Hongze Cheng 已提交
149
  STsdbReadSnap*     pReadSnap;
150
  SIOCostSummary     cost;
151 152
  STSchema*          pSchema;     // the newest version schema
  STSchema*          pMemSchema;  // the previous schema for in-memory data, to avoid load schema too many times
153 154
  SDataFReader*      pFileReader;
  SVersionRange      verRange;
155

156 157
  int32_t      step;
  STsdbReader* innerReader[2];
H
Hongze Cheng 已提交
158
};
H
Hongze Cheng 已提交
159

H
Haojun Liao 已提交
160
static SFileDataBlockInfo* getCurrentBlockInfo(SDataBlockIter* pBlockIter);
161 162
static int      buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, int64_t endKey, int32_t capacity,
                                          STsdbReader* pReader);
163
static TSDBROW* getValidRow(SIterInfo* pIter, const SArray* pDelList, STsdbReader* pReader);
164 165
static int32_t  doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pScanInfo, STsdbReader* pReader,
                                        SRowMerger* pMerger);
166
static int32_t  doMergeRowsInLastBlock(SLastBlockReader* pLastBlockReader, int64_t ts, SRowMerger* pMerger);
167
static int32_t  doMergeRowsInBuf(SIterInfo* pIter, uint64_t uid, int64_t ts, SArray* pDelList, SRowMerger* pMerger,
dengyihao's avatar
dengyihao 已提交
168
                                 STsdbReader* pReader);
H
Haojun Liao 已提交
169
static int32_t  doAppendRowFromTSRow(SSDataBlock* pBlock, STsdbReader* pReader, STSRow* pTSRow, uint64_t uid);
170
static int32_t  doAppendRowFromFileBlock(SSDataBlock* pResBlock, STsdbReader* pReader, SBlockData* pBlockData,
171
                                     int32_t rowIndex);
172
static void     setComposedBlockFlag(STsdbReader* pReader, bool composed);
173
static bool     hasBeenDropped(const SArray* pDelList, int32_t* index, TSDBKEY* pKey, int32_t order);
174

dengyihao's avatar
dengyihao 已提交
175
static void doMergeMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter, SArray* pDelList, STSRow** pTSRow,
176
                             STsdbReader* pReader, bool* freeTSRow);
177 178
static void doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader,
                               STSRow** pTSRow);
dengyihao's avatar
dengyihao 已提交
179 180 181 182
static int32_t initDelSkylineIterator(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader, STbData* pMemTbData,
                                      STbData* piMemTbData);
static STsdb*  getTsdbByRetentions(SVnode* pVnode, TSKEY winSKey, SRetention* retentions, const char* idstr,
                                   int8_t* pLevel);
183
static SVersionRange getQueryVerRange(SVnode* pVnode, SQueryTableDataCond* pCond, int8_t level);
184
static int64_t getCurrentKeyInLastBlock(SLastBlockReader* pLastBlockReader);
185 186
static bool    hasDataInLastBlock(SLastBlockReader* pLastBlockReader);
static int32_t doBuildDataBlock(STsdbReader* pReader);
H
Haojun Liao 已提交
187

188 189 190
/**
 * Prepare the load-support info for the result block.
 *
 * Records the id of every column of pBlock in suppInfo.colIds and allocates a scratch buffer of
 * pCol->info.bytes bytes in suppInfo.buildBuf[i] for every var-length column (NULL otherwise).
 *
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY if any allocation fails. On failure every buffer
 *         allocated here is released and colIds/buildBuf are reset to NULL, so the reader is never left
 *         holding dangling pointers that a later cleanup could free again.
 */
static int32_t setColumnIdSlotList(STsdbReader* pReader, SSDataBlock* pBlock) {
  SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo;

  size_t numOfCols = blockDataGetNumOfCols(pBlock);

  pSupInfo->colIds = taosMemoryMalloc(numOfCols * sizeof(int16_t));
  pSupInfo->buildBuf = taosMemoryCalloc(numOfCols, POINTER_BYTES);
  if (pSupInfo->buildBuf == NULL || pSupInfo->colIds == NULL) {
    goto _err;
  }

  for (size_t i = 0; i < numOfCols; ++i) {
    SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, i);
    pSupInfo->colIds[i] = pCol->info.colId;

    if (IS_VAR_DATA_TYPE(pCol->info.type)) {
      pSupInfo->buildBuf[i] = taosMemoryMalloc(pCol->info.bytes);
      // doCopyColVal() writes into this buffer unconditionally, so a failed allocation must be fatal
      if (pSupInfo->buildBuf[i] == NULL) {
        goto _err;
      }
    }
  }

  return TSDB_CODE_SUCCESS;

_err:
  if (pSupInfo->buildBuf != NULL) {
    // buildBuf was zero-initialized, so untouched slots are NULL
    for (size_t i = 0; i < numOfCols; ++i) {
      taosMemoryFree(pSupInfo->buildBuf[i]);
    }
  }

  taosMemoryFree(pSupInfo->colIds);
  taosMemoryFree(pSupInfo->buildBuf);
  pSupInfo->colIds = NULL;
  pSupInfo->buildBuf = NULL;
  return TSDB_CODE_OUT_OF_MEMORY;
}
H
Hongze Cheng 已提交
212

213
// Build the per-table scan-info map for all tables in idList.
//
// The map is keyed by table uid and stores an STableBlockScanInfo by value. Every entry starts with
// indexInBlockL = -1 and lastKey = the reader's query window start key.
//
// Returns the new hash map, or NULL if the map cannot be allocated.
static SHashObj* createDataBlockScanInfo(STsdbReader* pTsdbReader, const STableKeyInfo* idList, int32_t numOfTables) {
  // allocate buffer in order to load data blocks from file
  // todo use simple hash instead, optimize the memory consumption
  SHashObj* pTableMap =
      taosHashInit(numOfTables, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
  if (pTableMap == NULL) {
    return NULL;
  }

  for (int32_t j = 0; j < numOfTables; ++j) {
    // Start every table at the window's start key. The previous code clamped a placeholder lastKey of 0
    // against window.skey, which left lastKey at 0 (possibly outside the window) whenever skey <= 0.
    // NOTE(review): descending scans also start from window.skey, exactly as before; confirm intended.
    STableBlockScanInfo info = {.lastKey = pTsdbReader->window.skey, .uid = idList[j].uid, .indexInBlockL = -1};
    if (ASCENDING_TRAVERSE(pTsdbReader->order)) {
      ASSERT(info.lastKey >= pTsdbReader->window.skey && info.lastKey <= pTsdbReader->window.ekey);
    }

    // NOTE(review): the result of taosHashPut is not checked; the map is created with update disabled, so a
    // duplicate uid is presumably rejected without replacing the first entry — confirm before making it fatal.
    taosHashPut(pTableMap, &info.uid, sizeof(uint64_t), &info, sizeof(info));
    tsdbDebug("%p check table uid:%" PRIu64 " from lastKey:%" PRId64 " %s", pTsdbReader, info.uid, info.lastKey,
              pTsdbReader->idStr);
  }

  tsdbDebug("%p create %d tables scan-info, size:%.2f Kb, %s", pTsdbReader, numOfTables,
            (sizeof(STableBlockScanInfo) * numOfTables) / 1024.0, pTsdbReader->idStr);

  return pTableMap;
}
H
Hongze Cheng 已提交
244

245 246 247
// Reset the per-table scan state so the table map can be reused for another pass.
//
// For every table, the in-memory iterators are marked uninitialized and both the mem and the imem
// iterator objects are destroyed. The delete skyline is also released.
static void resetDataBlockScanInfo(SHashObj* pTableMap) {
  STableBlockScanInfo* p = NULL;

  while ((p = taosHashIterate(pTableMap, p)) != NULL) {
    p->iterInit = false;
    p->iiter.hasVal = false;

    if (p->iter.iter != NULL) {
      p->iter.iter = tsdbTbDataIterDestroy(p->iter.iter);
    }

    // The imem iterator must be destroyed as well. Previously only iiter.hasVal was cleared and the
    // iterator object itself was never released, unlike destroyBlockScanInfo().
    if (p->iiter.iter != NULL) {
      p->iiter.iter = tsdbTbDataIterDestroy(p->iiter.iter);
    }

    p->delSkyline = taosArrayDestroy(p->delSkyline);
  }
}

259 260 261 262 263 264 265 266
// Release every per-table resource held in pTableMap, then free the map itself.
//
// Per table this covers the mem/imem iterators, the delete skyline, the qualified block list and the
// block map data.
static void destroyBlockScanInfo(SHashObj* pTableMap) {
  for (STableBlockScanInfo* pInfo = taosHashIterate(pTableMap, NULL); pInfo != NULL;
       pInfo = taosHashIterate(pTableMap, pInfo)) {
    pInfo->iterInit = false;
    pInfo->iiter.hasVal = false;

    if (pInfo->iter.iter != NULL) {
      pInfo->iter.iter = tsdbTbDataIterDestroy(pInfo->iter.iter);
    }
    if (pInfo->iiter.iter != NULL) {
      pInfo->iiter.iter = tsdbTbDataIterDestroy(pInfo->iiter.iter);
    }

    pInfo->delSkyline = taosArrayDestroy(pInfo->delSkyline);
    pInfo->pBlockList = taosArrayDestroy(pInfo->pBlockList);
    tMapDataClear(&pInfo->mapData);
  }

  taosHashCleanup(pTableMap);
}

282
// Returns true when the window contains no timestamp, i.e. its end key lies before its start key.
static bool isEmptyQueryTimeWindow(STimeWindow* pWindow) {
  ASSERT(pWindow != NULL);

  bool empty = (pWindow->ekey < pWindow->skey);
  return empty;
}
H
Hongze Cheng 已提交
286

287 288 289
// Update the query time window according to the data keep (TTL) configuration, so that expired data is never
// returned to the client, even if it has not been physically removed yet.
//
// The window start is raised to the earliest key still inside the keep range:
//   now - tsTickPerMin[precision] * keep2 + 1
// keep2 is scaled by ticks-per-minute, i.e. it is expressed in minutes; the +1 excludes the boundary tick.
// When that span cannot be represented in int64 ticks (possible with a fine precision and a long keep), no key
// can have expired, so the window is returned unchanged instead of computing an overflowed (UB) start key.
static STimeWindow updateQueryTimeWindow(STsdb* pTsdb, STimeWindow* pWindow) {
  STsdbKeepCfg* pCfg = &pTsdb->keepCfg;
  STimeWindow   win = *pWindow;

  int64_t now = taosGetTimestamp(pCfg->precision);
  int64_t tickPerMin = tsTickPerMin[pCfg->precision];
  int64_t keep = pCfg->keep2;

  if (keep > 0 && tickPerMin > 0) {
    if (tickPerMin > INT64_MAX / keep) {
      return win;  // keep span exceeds the int64 range: nothing is old enough to be expired
    }

    if (now < INT64_MIN + tickPerMin * keep) {
      return win;  // earliest kept key would lie below INT64_MIN: nothing is expired
    }
  }

  int64_t earliestTs = now - (tickPerMin * keep) + 1;  // needs to add one tick
  if (win.skey < earliestTs) {
    win.skey = earliestTs;
  }

  return win;
}
H
Hongze Cheng 已提交
302

H
Haojun Liao 已提交
303
// Shrink *capacity (rows per output block) so that one output SSDataBlock stays within 2MB.
//
// The row width is the summed byte width of all queried columns. The arithmetic is done in 64 bits because
// capacity * rowLen can exceed INT32_MAX. The capacity is never reduced below one row, so a single very wide
// row still yields a usable block instead of a zero-capacity one.
static void limitOutputBufferSize(const SQueryTableDataCond* pCond, int32_t* capacity) {
  int64_t rowLen = 0;
  for (int32_t i = 0; i < pCond->numOfCols; ++i) {
    rowLen += pCond->colList[i].bytes;
  }

  // make sure the output SSDataBlock size be less than 2MB.
  const int64_t TWOMB = 2 * 1024 * 1024;
  if ((int64_t)(*capacity) * rowLen > TWOMB) {
    int64_t rows = TWOMB / rowLen;  // rowLen > 0 here, otherwise the product could not exceed TWOMB
    (*capacity) = (rows > 0) ? (int32_t)rows : 1;
  }
}

// init file iterator
317
static int32_t initFilesetIterator(SFilesetIter* pIter, SArray* aDFileSet, STsdbReader* pReader/*int32_t order, const char* idstr*/) {
H
Hongze Cheng 已提交
318
  size_t numOfFileset = taosArrayGetSize(aDFileSet);
319

320 321
  pIter->index = ASCENDING_TRAVERSE(pReader->order) ? -1 : numOfFileset;
  pIter->order = pReader->order;
H
Hongze Cheng 已提交
322
  pIter->pFileList = aDFileSet;
323
  pIter->numOfFiles = numOfFileset;
H
Haojun Liao 已提交
324

325 326 327 328
  if (pIter->pLastBlockReader == NULL) {
    pIter->pLastBlockReader = taosMemoryCalloc(1, sizeof(struct SLastBlockReader));
    if (pIter->pLastBlockReader == NULL) {
      int32_t code = TSDB_CODE_OUT_OF_MEMORY;
329
      tsdbError("failed to prepare the last block iterator, code:%d %s", tstrerror(code), pReader->idStr);
330 331 332
      return code;
    }

333 334 335 336 337
    SLastBlockReader* pLReader = pIter->pLastBlockReader;
    pLReader->pBlockL = taosArrayInit(4, sizeof(SBlockL));
    pLReader->order   = pReader->order;
    pLReader->window  = pReader->window;
    pLReader->verRange = pReader->verRange;
338 339
  }

340
  tsdbDebug("init fileset iterator, total files:%d %s", pIter->numOfFiles, pReader->idStr);
H
Haojun Liao 已提交
341 342 343
  return TSDB_CODE_SUCCESS;
}

344
// Advance pIter to the next data file set, in pIter->order, whose key range overlaps the query window.
//
// A data file reader for that set is opened in pReader->pFileReader; any previously opened reader is
// closed first. pReader->status.pCurrentFileset is updated as well.
//
// Returns true when such a file set was found. Returns false in three cases:
//   - the file list is exhausted;
//   - the remaining file sets lie entirely beyond the query window;
//   - a data file reader could not be opened. The error is then logged and stored in terrno, because
//     the bool result alone cannot distinguish it from the end of the file list.
static bool filesetIteratorNext(SFilesetIter* pIter, STsdbReader* pReader) {
  bool    asc = ASCENDING_TRAVERSE(pIter->order);
  int32_t step = asc ? 1 : -1;
  pIter->index += step;

  if ((asc && pIter->index >= pIter->numOfFiles) || ((!asc) && pIter->index < 0)) {
    return false;
  }

  // check file the time range of coverage
  STimeWindow win = {0};

  while (1) {
    if (pReader->pFileReader != NULL) {
      tsdbDataFReaderClose(&pReader->pFileReader);
    }

    pReader->status.pCurrentFileset = (SDFileSet*)taosArrayGet(pIter->pFileList, pIter->index);

    int32_t code = tsdbDataFReaderOpen(&pReader->pFileReader, pReader->pTsdb, pReader->status.pCurrentFileset);
    if (code != TSDB_CODE_SUCCESS) {
      terrno = code;
      tsdbError("%p failed to open data file reader, fileset index:%d, code:%s %s", pReader, pIter->index,
                tstrerror(code), pReader->idStr);
      return false;
    }

    pReader->cost.headFileLoad += 1;

    int32_t fid = pReader->status.pCurrentFileset->fid;
    tsdbFidKeyRange(fid, pReader->pTsdb->keepCfg.days, pReader->pTsdb->keepCfg.precision, &win.skey, &win.ekey);

    // current file are no longer overlapped with query time window, ignore remain files
    if ((asc && win.skey > pReader->window.ekey) || (!asc && win.ekey < pReader->window.skey)) {
      tsdbDebug("%p remain files are not qualified for qrange:%" PRId64 "-%" PRId64 ", ignore, %s", pReader,
                pReader->window.skey, pReader->window.ekey, pReader->idStr);
      return false;
    }

    // the file lies before the window in scan direction: skip it and try the next one
    if ((asc && (win.ekey < pReader->window.skey)) || ((!asc) && (win.skey > pReader->window.ekey))) {
      pIter->index += step;
      if ((asc && pIter->index >= pIter->numOfFiles) || ((!asc) && pIter->index < 0)) {
        return false;
      }
      continue;
    }

    tsdbDebug("%p file found fid:%d for qrange:%" PRId64 "-%" PRId64 ", %s", pReader, fid, pReader->window.skey,
              pReader->window.ekey, pReader->idStr);
    return true;
  }
}

397
// Rewind the block iterator before the blocks of a new data file are collected.
//
// Sets the traversal order and table map, marks both the position and the block count as -1, and empties
// the block list (creating it on first use).
static void resetDataBlockIterator(SDataBlockIter* pIter, int32_t order, SHashObj* pTableMap) {
  pIter->order = order;
  pIter->pTableMap = pTableMap;
  pIter->index = -1;
  pIter->numOfBlocks = -1;

  if (pIter->blockList != NULL) {
    taosArrayClear(pIter->blockList);
  } else {
    pIter->blockList = taosArrayInit(4, sizeof(SFileDataBlockInfo));
  }
}

L
Liu Jicong 已提交
409
// Release the block list owned by the iterator. The pointer is cleared (taosArrayDestroy returns NULL) so a
// repeated cleanup, or a later resetDataBlockIterator() that lazily re-creates the list, never touches freed
// memory.
static void cleanupDataBlockIterator(SDataBlockIter* pIter) { pIter->blockList = taosArrayDestroy(pIter->blockList); }
H
Haojun Liao 已提交
410

H
Haojun Liao 已提交
411
// Put the reader status into its initial state: the file-loading stage is active and no in-memory table
// iteration is in progress yet.
static void initReaderStatus(SReaderStatus* pStatus) {
  pStatus->loadFromFile = true;
  pStatus->pTableIter = NULL;
}

416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438
// Create the reader's output block: one column per entry of pCond->colList, with room for `capacity` rows.
//
// Returns the new block, or NULL with terrno set on failure. A partially built block is fully released with
// blockDataDestroy(): freeing only the block header would leak its column array and column buffers.
static SSDataBlock* createResBlock(SQueryTableDataCond* pCond, int32_t capacity) {
  SSDataBlock* pResBlock = createDataBlock();
  if (pResBlock == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  for (int32_t i = 0; i < pCond->numOfCols; ++i) {
    SColumnInfoData colInfo = {{0}, 0};
    colInfo.info = pCond->colList[i];

    int32_t code = blockDataAppendColInfo(pResBlock, &colInfo);
    if (code != TSDB_CODE_SUCCESS) {
      terrno = code;
      blockDataDestroy(pResBlock);
      return NULL;
    }
  }

  int32_t code = blockDataEnsureCapacity(pResBlock, capacity);
  if (code != TSDB_CODE_SUCCESS) {
    terrno = code;
    blockDataDestroy(pResBlock);
    return NULL;
  }

  return pResBlock;
}

439 440
// Allocate and initialize a tsdb reader for one query condition.
//
// capacity is the requested number of rows per result block. limitOutputBufferSize() may reduce it further so
// that one block stays within 2MB; a non-positive request falls back to one row.
//
// On success *ppReader receives the reader and TSDB_CODE_SUCCESS is returned. On failure the partially built
// reader is released with tsdbReaderClose(), *ppReader is set to NULL and the error code is returned.
static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, STsdbReader** ppReader, int32_t capacity,
                                const char* idstr) {
  int32_t      code = 0;
  int8_t       level = 0;
  STsdbReader* pReader = (STsdbReader*)taosMemoryCalloc(1, sizeof(*pReader));
  if (pReader == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _end;
  }

  if (VND_IS_TSMA(pVnode)) {
    tsdbDebug("vgId:%d, tsma is selected to query", TD_VID(pVnode));
  }

  initReaderStatus(&pReader->status);

  pReader->pTsdb = getTsdbByRetentions(pVnode, pCond->twindows.skey, pVnode->config.tsdbCfg.retentions, idstr, &level);
  pReader->suid = pCond->suid;
  pReader->order = pCond->order;
  // honour the caller's requested capacity; it used to be ignored and hard-coded to one row per block
  pReader->capacity = (capacity > 0) ? capacity : 1;
  pReader->idStr = (idstr != NULL) ? strdup(idstr) : NULL;
  pReader->verRange = getQueryVerRange(pVnode, pCond, level);
  pReader->type = pCond->type;
  pReader->window = updateQueryTimeWindow(pReader->pTsdb, &pCond->twindows);

  ASSERT(pCond->numOfCols > 0);

  limitOutputBufferSize(pCond, &pReader->capacity);

  // allocate buffer in order to load data blocks from file
  SBlockLoadSuppInfo* pSup = &pReader->suppInfo;
  pSup->pColAgg = taosArrayInit(4, sizeof(SColumnDataAgg));
  pSup->plist = taosMemoryCalloc(pCond->numOfCols, POINTER_BYTES);
  if (pSup->pColAgg == NULL || pSup->plist == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _end;
  }

  pSup->tsColAgg.colId = PRIMARYKEY_TIMESTAMP_COL_ID;

  code = tBlockDataCreate(&pReader->status.fileBlockData);
  if (code != TSDB_CODE_SUCCESS) {
    terrno = code;
    goto _end;
  }

  pReader->pResBlock = createResBlock(pCond, pReader->capacity);
  if (pReader->pResBlock == NULL) {
    code = terrno;
    goto _end;
  }

  // colIds/buildBuf are used when copying column values into the result block, so a failure here is fatal
  code = setColumnIdSlotList(pReader, pReader->pResBlock);
  if (code != TSDB_CODE_SUCCESS) {
    goto _end;
  }

  *ppReader = pReader;
  return code;

_end:
  if (pReader != NULL) {
    tsdbReaderClose(pReader);
  }
  *ppReader = NULL;
  return code;
}
H
Hongze Cheng 已提交
501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533

// void tsdbResetQueryHandleForNewTable(STsdbReader* queryHandle, SQueryTableDataCond* pCond, STableListInfo* tableList,
//                                      int32_t tWinIdx) {
//   STsdbReader* pTsdbReadHandle = queryHandle;

//   pTsdbReadHandle->order = pCond->order;
//   pTsdbReadHandle->window = pCond->twindows[tWinIdx];
//   pTsdbReadHandle->type = TSDB_QUERY_TYPE_ALL;
//   pTsdbReadHandle->cur.fid = -1;
//   pTsdbReadHandle->cur.win = TSWINDOW_INITIALIZER;
//   pTsdbReadHandle->checkFiles = true;
//   pTsdbReadHandle->activeIndex = 0;  // current active table index
//   pTsdbReadHandle->locateStart = false;
//   pTsdbReadHandle->loadExternalRow = pCond->loadExternalRows;

//   if (ASCENDING_TRAVERSE(pCond->order)) {
//     assert(pTsdbReadHandle->window.skey <= pTsdbReadHandle->window.ekey);
//   } else {
//     assert(pTsdbReadHandle->window.skey >= pTsdbReadHandle->window.ekey);
//   }

//   // allocate buffer in order to load data blocks from file
//   memset(pTsdbReadHandle->suppInfo.pstatis, 0, sizeof(SColumnDataAgg));
//   memset(pTsdbReadHandle->suppInfo.plist, 0, POINTER_BYTES);

//   tsdbInitDataBlockLoadInfo(&pTsdbReadHandle->dataBlockLoadInfo);
//   tsdbInitCompBlockLoadInfo(&pTsdbReadHandle->compBlockLoadInfo);

//   SArray* pTable = NULL;
//   //  STsdbMeta* pMeta = tsdbGetMeta(pTsdbReadHandle->pTsdb);

//   //  pTsdbReadHandle->pTableCheckInfo = destroyTableCheckInfo(pTsdbReadHandle->pTableCheckInfo);

H
Haojun Liao 已提交
534
//   pTsdbReadHandle->pTableCheckInfo = NULL;  // createDataBlockScanInfo(pTsdbReadHandle, groupList, pMeta,
H
Hongze Cheng 已提交
535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557
//                                             // &pTable);
//   if (pTsdbReadHandle->pTableCheckInfo == NULL) {
//     //    tsdbReaderClose(pTsdbReadHandle);
//     terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
//   }

//   //  pTsdbReadHandle->prev = doFreeColumnInfoData(pTsdbReadHandle->prev);
//   //  pTsdbReadHandle->next = doFreeColumnInfoData(pTsdbReadHandle->next);
// }

// SArray* tsdbGetQueriedTableList(STsdbReader** pHandle) {
//   assert(pHandle != NULL);

//   STsdbReader* pTsdbReadHandle = (STsdbReader*)pHandle;

//   size_t  size = taosArrayGetSize(pTsdbReadHandle->pTableCheckInfo);
//   SArray* res = taosArrayInit(size, POINTER_BYTES);
//   return res;
// }

// static int32_t binarySearchForBlock(SBlock* pBlock, int32_t numOfBlocks, TSKEY skey, int32_t order) {
//   int32_t firstSlot = 0;
//   int32_t lastSlot = numOfBlocks - 1;
H
Hongze Cheng 已提交
558

H
Hongze Cheng 已提交
559
//   int32_t midSlot = firstSlot;
H
Hongze Cheng 已提交
560

H
Hongze Cheng 已提交
561 562 563
//   while (1) {
//     numOfBlocks = lastSlot - firstSlot + 1;
//     midSlot = (firstSlot + (numOfBlocks >> 1));
H
Hongze Cheng 已提交
564

H
Hongze Cheng 已提交
565
//     if (numOfBlocks == 1) break;
H
Hongze Cheng 已提交
566

H
Hongze Cheng 已提交
567 568 569 570 571 572 573 574 575 576 577
//     if (skey > pBlock[midSlot].maxKey.ts) {
//       if (numOfBlocks == 2) break;
//       if ((order == TSDB_ORDER_DESC) && (skey < pBlock[midSlot + 1].minKey.ts)) break;
//       firstSlot = midSlot + 1;
//     } else if (skey < pBlock[midSlot].minKey.ts) {
//       if ((order == TSDB_ORDER_ASC) && (skey > pBlock[midSlot - 1].maxKey.ts)) break;
//       lastSlot = midSlot - 1;
//     } else {
//       break;  // got the slot
//     }
//   }
H
Hongze Cheng 已提交
578

H
Hongze Cheng 已提交
579 580
//   return midSlot;
// }
H
Hongze Cheng 已提交
581

H
Haojun Liao 已提交
582
// Read the block index of the opened data file and keep only the entries that belong to this query.
//
// An entry is kept when it has the reader's suid and its uid is present in the table map. Kept entries are
// appended to pIndexList, and each matching table gets an (empty) pBlockList if it has none yet.
//
// Returns TSDB_CODE_SUCCESS, the error from tsdbReadBlockIdx(), or TSDB_CODE_OUT_OF_MEMORY.
// The temporary index array is released on every path; it previously leaked when the file had no entries.
static int32_t doLoadBlockIndex(STsdbReader* pReader, SDataFReader* pFileReader, SArray* pIndexList) {
  SArray* aBlockIdx = taosArrayInit(8, sizeof(SBlockIdx));
  if (aBlockIdx == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  int64_t st = taosGetTimestampUs();
  int32_t code = tsdbReadBlockIdx(pFileReader, aBlockIdx);
  if (code != TSDB_CODE_SUCCESS) {
    goto _end;
  }

  size_t num = taosArrayGetSize(aBlockIdx);
  if (num == 0) {
    goto _end;  // nothing to collect; code is TSDB_CODE_SUCCESS
  }

  int64_t et1 = taosGetTimestampUs();

  for (size_t i = 0; i < num; ++i) {
    SBlockIdx* pBlockIdx = (SBlockIdx*)taosArrayGet(aBlockIdx, i);

    // uid check
    if (pBlockIdx->suid != pReader->suid) {
      continue;
    }

    // this block belongs to a table that is not queried.
    STableBlockScanInfo* pScanInfo = taosHashGet(pReader->status.pTableMap, &pBlockIdx->uid, sizeof(uint64_t));
    if (pScanInfo == NULL) {
      continue;
    }

    if (pScanInfo->pBlockList == NULL) {
      pScanInfo->pBlockList = taosArrayInit(4, sizeof(int32_t));
      if (pScanInfo->pBlockList == NULL) {
        code = TSDB_CODE_OUT_OF_MEMORY;
        goto _end;
      }
    }

    if (taosArrayPush(pIndexList, pBlockIdx) == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto _end;
    }
  }

  int64_t et2 = taosGetTimestampUs();
  tsdbDebug("load block index for %d tables completed, elapsed time:%.2f ms, set blockIdx:%.2f ms, size:%.2f Kb %s",
            (int32_t)num, (et1 - st) / 1000.0, (et2 - et1) / 1000.0, num * sizeof(SBlockIdx) / 1024.0, pReader->idStr);

  pReader->cost.headFileLoadTime += (et1 - st) / 1000.0;

_end:
  taosArrayDestroy(aBlockIdx);
  return code;
}
H
Hongze Cheng 已提交
632

633
// Prepare the per-table scan state for a new data file.
//
// Each table's last-block row position is reset to -1, its block map data is cleared and its qualified
// block list is emptied.
static void cleanupTableScanInfo(SHashObj* pTableMap) {
  for (STableBlockScanInfo* pInfo = taosHashIterate(pTableMap, NULL); pInfo != NULL;
       pInfo = taosHashIterate(pTableMap, pInfo)) {
    // reset the index in last block when handing a new file
    pInfo->indexInBlockL = -1;
    tMapDataClear(&pInfo->mapData);
    taosArrayClear(pInfo->pBlockList);
  }
}

// Collect the blocks of the currently opened data file that qualify for this query.
//
// Data blocks: for every table in pIndexList, its block map is read into pScanInfo->mapData. The indices of
// blocks overlapping the query time window and version range are appended to pScanInfo->pBlockList.
//
// Last blocks: entries of pLastBlockIndex that belong to the reader's super table and fall within the version
// range are copied into pQualifiedLastBlock.
//
// pBlockNum accumulates both counts. Returns TSDB_CODE_SUCCESS, the error from reading a table's block map,
// or TSDB_CODE_OUT_OF_MEMORY.
static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, SArray* pLastBlockIndex,
                               SBlockNumber* pBlockNum, SArray* pQualifiedLastBlock) {
  int32_t numOfQTable = 0;
  size_t  sizeInDisk = 0;
  size_t  numOfTables = taosArrayGetSize(pIndexList);

  int64_t st = taosGetTimestampUs();
  cleanupTableScanInfo(pReader->status.pTableMap);

  for (size_t i = 0; i < numOfTables; ++i) {
    SBlockIdx* pBlockIdx = taosArrayGet(pIndexList, i);

    // every entry of pIndexList was checked to be in the table map when the block index was loaded
    STableBlockScanInfo* pScanInfo = taosHashGet(pReader->status.pTableMap, &pBlockIdx->uid, sizeof(int64_t));

    tMapDataReset(&pScanInfo->mapData);
    int32_t code = tsdbReadBlock(pReader->pFileReader, pBlockIdx, &pScanInfo->mapData);
    if (code != TSDB_CODE_SUCCESS) {
      tsdbError("%p failed to load block info of table uid:%" PRIu64 ", code:%s %s", pReader,
                (uint64_t)pBlockIdx->uid, tstrerror(code), pReader->idStr);
      return code;
    }

    sizeInDisk += pScanInfo->mapData.nData;
    for (int32_t j = 0; j < pScanInfo->mapData.nItem; ++j) {
      SBlock block = {0};
      tMapDataGetItemByIdx(&pScanInfo->mapData, j, &block, tGetBlock);

      // 1. time range check
      if (block.minKey.ts > pReader->window.ekey || block.maxKey.ts < pReader->window.skey) {
        continue;
      }

      // 2. version range check
      if (block.minVer > pReader->verRange.maxVer || block.maxVer < pReader->verRange.minVer) {
        continue;
      }

      if (taosArrayPush(pScanInfo->pBlockList, &j) == NULL) {
        tMapDataClear(&pScanInfo->mapData);
        return TSDB_CODE_OUT_OF_MEMORY;
      }

      pBlockNum->numOfBlocks += 1;
    }

    if (pScanInfo->pBlockList != NULL && taosArrayGetSize(pScanInfo->pBlockList) > 0) {
      numOfQTable += 1;
    }
  }

  size_t numOfLast = taosArrayGetSize(pLastBlockIndex);
  for (size_t i = 0; i < numOfLast; ++i) {
    SBlockL* pLastBlock = taosArrayGet(pLastBlockIndex, i);
    if (pLastBlock->suid != pReader->suid) {
      continue;
    }

    // 1. time range check: todo add later

    // 2. version range check
    if (pLastBlock->minVer > pReader->verRange.maxVer || pLastBlock->maxVer < pReader->verRange.minVer) {
      continue;
    }

    if (taosArrayPush(pQualifiedLastBlock, pLastBlock) == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }
    pBlockNum->numOfLastBlocks += 1;
  }

  int32_t total = pBlockNum->numOfLastBlocks + pBlockNum->numOfBlocks;

  double el = (taosGetTimestampUs() - st) / 1000.0;
  tsdbDebug(
      "load block of %d tables completed, blocks:%d in %d tables, lastBlock:%d, size:%.2f Kb, elapsed time:%.2f ms %s",
      (int32_t)numOfTables, total, numOfQTable, pBlockNum->numOfLastBlocks, sizeInDisk / 1024.0, el, pReader->idStr);

  pReader->cost.numOfBlocks += total;
  pReader->cost.headFileLoadTime += el;

  return TSDB_CODE_SUCCESS;
}
H
Hongze Cheng 已提交
729

730 731
// Mark the current file block as fully dumped.
//
// lastKey becomes the key one tick past the block's maximum key in scan direction: maxKey + 1 when ascending,
// maxKey - 1 when descending.
// NOTE(review): the descending case is also derived from maxKey rather than minKey; confirm this is intended.
// todo remove pblock parameter
static void setBlockAllDumped(SFileBlockDumpInfo* pDumpInfo, SBlock* pBlock, int32_t order) {
  pDumpInfo->allDumped = true;
  pDumpInfo->lastKey = ASCENDING_TRAVERSE(order) ? (pBlock->maxKey.ts + 1) : (pBlock->maxKey.ts - 1);
}

738 739
/**
 * Writes one column value into row rowIndex of pColInfoData.
 * Fixed-size values are appended directly, with the NULL flag set for NULL/NONE values.
 * Variable-length values are first staged in pSup->buildBuf[colIndex] (length header + payload) and appended from
 * there. A NULL/NONE variable-length value becomes a NULL cell.
 */
static void doCopyColVal(SColumnInfoData* pColInfoData, int32_t rowIndex, int32_t colIndex, SColVal* pColVal,
                         SBlockLoadSuppInfo* pSup) {
  bool isNullValue = pColVal->isNull || pColVal->isNone;

  if (!IS_VAR_DATA_TYPE(pColVal->type)) {
    colDataAppend(pColInfoData, rowIndex, (const char*)&pColVal->value, isNullValue);
    return;
  }

  if (isNullValue) {
    colDataAppendNULL(pColInfoData, rowIndex);
    return;
  }

  varDataSetLen(pSup->buildBuf[colIndex], pColVal->value.nData);
  ASSERT(pColVal->value.nData <= pColInfoData->info.bytes);
  memcpy(varDataVal(pSup->buildBuf[colIndex]), pColVal->value.pData, pColVal->value.nData);
  colDataAppend(pColInfoData, rowIndex, pSup->buildBuf[colIndex], false);
}

754
/**
 * Returns the block-list entry at the iterator's current index.
 * Returns NULL when the block list is empty.
 */
static SFileDataBlockInfo* getCurrentBlockInfo(SDataBlockIter* pBlockIter) {
  size_t numOfEntries = taosArrayGetSize(pBlockIter->blockList);
  if (numOfEntries > 0) {
    return taosArrayGet(pBlockIter->blockList, pBlockIter->index);
  }

  ASSERT(pBlockIter->numOfBlocks == numOfEntries);
  return NULL;
}

764
/** Returns the SBlock cached inside the iterator for its current position. */
static SBlock* getCurrentBlock(SDataBlockIter* pBlockIter) {
  SBlock* pCached = &pBlockIter->block;
  return pCached;
}
765

766
/**
 * Copies rows of the loaded file block (pReader->status.fileBlockData) into pReader->pResBlock.
 *
 * Copying starts at pDumpInfo->rowIndex and walks in the reader's traverse order. At most pReader->capacity
 * rows are copied.
 * - The primary timestamp column (if it is the first output column) comes from aTSKEY.
 * - Every other output column is matched by column id against the block's columns.
 * - Requested columns missing from the block are filled with NULL.
 * - Block columns that were not requested are skipped.
 *
 * Afterwards pDumpInfo->rowIndex is advanced past the copied rows and the block is marked as dumped.
 * Always returns TSDB_CODE_SUCCESS.
 */
static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo) {
  SReaderStatus*      pStatus = &pReader->status;
  SDataBlockIter*     pBlockIter = &pStatus->blockIter;
  SBlockData*         pBlockData = &pStatus->fileBlockData;
  SFileDataBlockInfo* pFBlock = getCurrentBlockInfo(pBlockIter);
  SBlock*             pBlock = getCurrentBlock(pBlockIter);
  SSDataBlock*        pResBlock = pReader->pResBlock;
  int32_t             numOfOutputCols = blockDataGetNumOfCols(pResBlock);
  SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo;
  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;

  SColVal cv = {0};
  int64_t st = taosGetTimestampUs();
  bool    asc = ASCENDING_TRAVERSE(pReader->order);
  int32_t step = asc ? 1 : -1;

  // rows left in the traverse direction, capped by the output capacity
  int32_t remain = asc ? (pBlockData->nRow - pDumpInfo->rowIndex) : (pDumpInfo->rowIndex + 1);
  if (remain > pReader->capacity) {
    remain = pReader->capacity;
  }

  // Loops below count copied rows instead of comparing against an end index. An end-index comparison
  // (j < endIndex) is wrong in descending order, where j moves downwards.
  int32_t          i = 0;
  int32_t          rowIndex = 0;
  SColumnInfoData* pColData = NULL;

  if (numOfOutputCols > 0) {
    pColData = taosArrayGet(pResBlock->pDataBlock, i);
    if (pColData->info.colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
      for (int32_t k = 0, j = pDumpInfo->rowIndex; k < remain; ++k, j += step) {
        colDataAppend(pColData, rowIndex++, (const char*)&pBlockData->aTSKEY[j], false);
      }
      i += 1;
    }
  }

  // both the output columns and the block columns are walked in ascending column-id order
  int32_t colIndex = 0;
  int32_t num = taosArrayGetSize(pBlockData->aIdx);
  while (i < numOfOutputCols && colIndex < num) {
    pColData = taosArrayGet(pResBlock->pDataBlock, i);
    SColData* pData = tBlockDataGetColDataByIdx(pBlockData, colIndex);

    if (pData->cid < pColData->info.colId) {
      // column present in the file block but not requested: skip it
      colIndex += 1;
    } else if (pData->cid == pColData->info.colId) {
      rowIndex = 0;
      for (int32_t k = 0, j = pDumpInfo->rowIndex; k < remain; ++k, j += step) {
        tColDataGetValue(pData, j, &cv);
        doCopyColVal(pColData, rowIndex++, i, &cv, pSupInfo);
      }
      ASSERT(rowIndex == remain);
      colIndex += 1;
      i += 1;
    } else {  // the specified column does not exist in file block, fill with null data
      colDataAppendNNULL(pColData, 0, remain);
      i += 1;
    }
  }

  // requested columns beyond the last column of the file block are all NULL
  while (i < numOfOutputCols) {
    pColData = taosArrayGet(pResBlock->pDataBlock, i);
    colDataAppendNNULL(pColData, 0, remain);
    i += 1;
  }

  pResBlock->info.rows = remain;
  pDumpInfo->rowIndex += step * remain;

  setBlockAllDumped(pDumpInfo, pBlock, pReader->order);

  double elapsedTime = (taosGetTimestampUs() - st) / 1000.0;
  pReader->cost.blockLoadTime += elapsedTime;

  int32_t unDumpedRows = asc ? pBlock->nRow - pDumpInfo->rowIndex : pDumpInfo->rowIndex + 1;
  tsdbDebug("%p copy file block to sdatablock, global index:%d, table index:%d, brange:%" PRId64 "-%" PRId64
            ", rows:%d, remain:%d, minVer:%" PRId64 ", maxVer:%" PRId64 ", elapsed time:%.2f ms, %s",
            pReader, pBlockIter->index, pFBlock->tbBlockIdx, pBlock->minKey.ts, pBlock->maxKey.ts, remain, unDumpedRows,
            pBlock->minVer, pBlock->maxVer, elapsedTime, pReader->idStr);

  return TSDB_CODE_SUCCESS;
}

849
/**
 * Reads the iterator's current file block into pBlockData via tsdbReadDataBlock.
 * When the iterator has no current block nothing is read; the last-block loading path is still a disabled draft.
 * On success the elapsed load time is added to pReader->cost.blockLoadTime and the dump state is reset
 * (allDumped = false).
 * Returns the tsdbReadDataBlock error code on failure, TSDB_CODE_SUCCESS otherwise.
 */
static int32_t doLoadFileBlockData(STsdbReader* pReader, SDataBlockIter* pBlockIter, SBlockData* pBlockData) {
  int64_t             st = taosGetTimestampUs();
  double              elapsedTime = 0;
  SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(pBlockIter);
  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;

  if (pBlockInfo == NULL) {
#if 0
    SLastBlockReader* pLastBlockReader = pReader->status.fileIter.pLastBlockReader;

    uint64_t uid = pBlockInfo->uid;
    SArray*  pBlocks = pLastBlockReader->pBlockL;

    pLastBlockReader->currentBlockIndex = -1;

    // find the correct SBlockL
    for(int32_t i = 0; i < taosArrayGetSize(pBlocks); ++i) {
      SBlockL* pBlock = taosArrayGet(pBlocks, i);
      if (pBlock->minUid >= uid && pBlock->maxUid <= uid) {
        pLastBlockReader->currentBlockIndex = i;
        break;
      }
    }

//    SBlockL* pBlockL = taosArrayGet(pLastBlockReader->pBlockL, *index);
    int32_t code = tsdbReadLastBlock(pReader->pFileReader, pBlockL, pBlockData);
    if (code != TSDB_CODE_SUCCESS) {
      tsdbDebug("%p error occurs in loading last block into buffer, last block index:%d, total:%d brange:%" PRId64 "-%" PRId64
                    ", rows:%d, minVer:%" PRId64 ", maxVer:%" PRId64 ", code:%s %s",
                pReader, *index, pBlockIter->numOfBlocks.numOfLastBlocks, 0, 0, pBlockL->nRow,
                pBlockL->minVer, pBlockL->maxVer, tstrerror(code), pReader->idStr);
      return code;
    }

    tsdbDebug("%p load last file block into buffer, last block index:%d, total:%d brange:%" PRId64 "-%" PRId64
                  ", rows:%d, minVer:%" PRId64 ", maxVer:%" PRId64 ", elapsed time:%.2f ms, %s",
              pReader, *index, pBlockIter->numOfBlocks.numOfLastBlocks, 0, 0, pBlockL->nRow,
              pBlockL->minVer, pBlockL->maxVer, elapsedTime, pReader->idStr);
#endif
  } else {
    SBlock* pBlock = getCurrentBlock(pBlockIter);
    int32_t code = tsdbReadDataBlock(pReader->pFileReader, pBlock, pBlockData);
    if (code != TSDB_CODE_SUCCESS) {
      tsdbError("%p error occurs in loading file block, global index:%d, table index:%d, brange:%" PRId64 "-%" PRId64
                ", rows:%d, code:%s %s",
                pReader, pBlockIter->index, pBlockInfo->tbBlockIdx, pBlock->minKey.ts, pBlock->maxKey.ts, pBlock->nRow,
                tstrerror(code), pReader->idStr);
      return code;
    }

    elapsedTime = (taosGetTimestampUs() - st) / 1000.0;
    tsdbDebug("%p load file block into buffer, global index:%d, table index:%d, brange:%" PRId64 "-%" PRId64
              ", rows:%d, minVer:%" PRId64 ", maxVer:%" PRId64 ", elapsed time:%.2f ms, %s",
              pReader, pBlockIter->index, pBlockInfo->tbBlockIdx, pBlock->minKey.ts, pBlock->maxKey.ts, pBlock->nRow,
              pBlock->minVer, pBlock->maxVer, elapsedTime, pReader->idStr);
  }

  pReader->cost.blockLoadTime += elapsedTime;
  pDumpInfo->allDumped = false;
  return TSDB_CODE_SUCCESS;
}
H
Hongze Cheng 已提交
917

H
Haojun Liao 已提交
918 919 920
/**
 * Releases every buffer owned by pSup: the per-table counters, the per-table cursors, each table's wrapper array
 * and the array of those arrays.
 * Safe on a partially initialized supporter (pDataBlockInfo may be NULL). Safe to call more than once, because
 * freed pointers are cleared and numOfTables is reset to 0.
 */
static void cleanupBlockOrderSupporter(SBlockOrderSupporter* pSup) {
  taosMemoryFreeClear(pSup->numOfBlocksPerTable);
  taosMemoryFreeClear(pSup->indexPerTable);

  if (pSup->pDataBlockInfo != NULL) {
    for (int32_t i = 0; i < pSup->numOfTables; ++i) {
      taosMemoryFreeClear(pSup->pDataBlockInfo[i]);
    }
    taosMemoryFreeClear(pSup->pDataBlockInfo);
  }

  // nothing is owned any more; prevents a second call from walking a freed/NULL array
  pSup->numOfTables = 0;
}
H
Hongze Cheng 已提交
929

H
Haojun Liao 已提交
930 931
/**
 * Allocates zero-filled per-table arrays of pSup for numOfTables tables: the block counts, the merge cursors and
 * the table-to-wrapper-array pointers.
 * Returns TSDB_CODE_OUT_OF_MEMORY, after releasing whatever was allocated, if any allocation fails.
 */
static int32_t initBlockOrderSupporter(SBlockOrderSupporter* pSup, int32_t numOfTables) {
  ASSERT(numOfTables >= 1);

  pSup->numOfBlocksPerTable = taosMemoryCalloc(numOfTables, sizeof(int32_t));
  pSup->indexPerTable = taosMemoryCalloc(numOfTables, sizeof(int32_t));
  pSup->pDataBlockInfo = taosMemoryCalloc(numOfTables, POINTER_BYTES);

  bool allocated =
      (pSup->numOfBlocksPerTable != NULL) && (pSup->indexPerTable != NULL) && (pSup->pDataBlockInfo != NULL);
  if (!allocated) {
    cleanupBlockOrderSupporter(pSup);
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  return TSDB_CODE_SUCCESS;
}
H
Hongze Cheng 已提交
944

H
Haojun Liao 已提交
945
static int32_t fileDataBlockOrderCompar(const void* pLeft, const void* pRight, void* param) {
946
  int32_t leftIndex = *(int32_t*)pLeft;
H
Haojun Liao 已提交
947
  int32_t rightIndex = *(int32_t*)pRight;
H
Hongze Cheng 已提交
948

H
Haojun Liao 已提交
949
  SBlockOrderSupporter* pSupporter = (SBlockOrderSupporter*)param;
H
Hongze Cheng 已提交
950

H
Haojun Liao 已提交
951 952
  int32_t leftTableBlockIndex = pSupporter->indexPerTable[leftIndex];
  int32_t rightTableBlockIndex = pSupporter->indexPerTable[rightIndex];
H
Hongze Cheng 已提交
953

H
Haojun Liao 已提交
954 955 956 957 958 959 960
  if (leftTableBlockIndex > pSupporter->numOfBlocksPerTable[leftIndex]) {
    /* left block is empty */
    return 1;
  } else if (rightTableBlockIndex > pSupporter->numOfBlocksPerTable[rightIndex]) {
    /* right block is empty */
    return -1;
  }
H
Hongze Cheng 已提交
961

962
  SBlockOrderWrapper* pLeftBlock = &pSupporter->pDataBlockInfo[leftIndex][leftTableBlockIndex];
H
Haojun Liao 已提交
963
  SBlockOrderWrapper* pRightBlock = &pSupporter->pDataBlockInfo[rightIndex][rightTableBlockIndex];
H
Hongze Cheng 已提交
964

965 966 967 968
  return pLeftBlock->offset > pRightBlock->offset ? 1 : -1;
}

/**
 * Decodes the SBlock that the iterator currently points at into pBlockIter->block.
 * The block is read from the owning table's mapData, using the table found by uid in pBlockIter->pTableMap.
 * Does nothing when the iterator has no current block. Always returns TSDB_CODE_SUCCESS.
 * NOTE(review): the taosHashGet/taosArrayGet results are not NULL-checked; assumes every uid in blockList is
 * present in pTableMap.
 */
static int32_t doSetCurrentBlock(SDataBlockIter* pBlockIter) {
  SFileDataBlockInfo* pCurrent = getCurrentBlockInfo(pBlockIter);
  if (pCurrent == NULL) {
    return TSDB_CODE_SUCCESS;
  }

  STableBlockScanInfo* pTableInfo = taosHashGet(pBlockIter->pTableMap, &pCurrent->uid, sizeof(pCurrent->uid));
  int32_t*             pPosInMapData = taosArrayGet(pTableInfo->pBlockList, pCurrent->tbBlockIdx);
  tMapDataGetItemByIdx(&pTableInfo->mapData, *pPosInMapData, &pBlockIter->block, tGetBlock);

  return TSDB_CODE_SUCCESS;
}
H
Hongze Cheng 已提交
982

983
/**
 * Fills pBlockIter->blockList with one SFileDataBlockInfo per qualified data block of every table in
 * pReader->status.pTableMap.
 *
 * Entries are ordered by file offset, using a multiway merge across tables. With a single qualified table the
 * blocks keep their per-table order. The iterator is positioned on the first entry (asc) or the last one (desc).
 *
 * @param numOfBlocks total number of qualified blocks (asserted to equal the sum of all pBlockList sizes)
 * @return TSDB_CODE_SUCCESS, or an error code when a temporary buffer or the merge tree cannot be created
 */
static int32_t initBlockIterator(STsdbReader* pReader, SDataBlockIter* pBlockIter, int32_t numOfBlocks) {
  bool asc = ASCENDING_TRAVERSE(pReader->order);

  pBlockIter->numOfBlocks = numOfBlocks;
  taosArrayClear(pBlockIter->blockList);

  // access data blocks according to the offset of each block in asc/desc order.
  int32_t numOfTables = (int32_t)taosHashGetSize(pReader->status.pTableMap);

  int64_t st = taosGetTimestampUs();

  SBlockOrderSupporter sup = {0};
  int32_t              code = initBlockOrderSupporter(&sup, numOfTables);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  // collect the (uid, file offset) of each qualified block, grouped per table
  int32_t cnt = 0;
  void*   ptr = NULL;
  while (1) {
    ptr = taosHashIterate(pReader->status.pTableMap, ptr);
    if (ptr == NULL) {
      break;
    }

    STableBlockScanInfo* pTableScanInfo = (STableBlockScanInfo*)ptr;
    if (pTableScanInfo->pBlockList == NULL || taosArrayGetSize(pTableScanInfo->pBlockList) == 0) {
      continue;
    }

    size_t num = taosArrayGetSize(pTableScanInfo->pBlockList);
    sup.numOfBlocksPerTable[sup.numOfTables] = (int32_t)num;

    char* buf = taosMemoryMalloc(sizeof(SBlockOrderWrapper) * num);
    if (buf == NULL) {
      // leaving the iteration early: release the reference held on the current hash node
      taosHashCancelIterate(pReader->status.pTableMap, ptr);
      cleanupBlockOrderSupporter(&sup);
      return TSDB_CODE_TDB_OUT_OF_MEMORY;
    }

    sup.pDataBlockInfo[sup.numOfTables] = (SBlockOrderWrapper*)buf;
    SBlock block = {0};
    for (int32_t k = 0; k < num; ++k) {
      SBlockOrderWrapper wrapper = {0};

      int32_t* mapDataIndex = taosArrayGet(pTableScanInfo->pBlockList, k);
      tMapDataGetItemByIdx(&pTableScanInfo->mapData, *mapDataIndex, &block, tGetBlock);

      wrapper.uid = pTableScanInfo->uid;
      wrapper.offset = block.aSubBlock[0].offset;

      sup.pDataBlockInfo[sup.numOfTables][k] = wrapper;
      cnt++;
    }

    sup.numOfTables += 1;
  }

  ASSERT(numOfBlocks == cnt);

  // since there is only one table qualified, blocks are not sorted
  if (sup.numOfTables == 1) {
    for (int32_t i = 0; i < numOfBlocks; ++i) {
      SFileDataBlockInfo blockInfo = {.uid = sup.pDataBlockInfo[0][i].uid, .tbBlockIdx = i};
      taosArrayPush(pBlockIter->blockList, &blockInfo);
    }

    int64_t et = taosGetTimestampUs();
    tsdbDebug("%p create blocks info struct completed for one table, %d blocks not sorted, elapsed time:%.2f ms %s",
              pReader, numOfBlocks, (et - st) / 1000.0, pReader->idStr);

    pBlockIter->index = asc ? 0 : (numOfBlocks - 1);
    cleanupBlockOrderSupporter(&sup);
    doSetCurrentBlock(pBlockIter);
    return TSDB_CODE_SUCCESS;
  }

  tsdbDebug("%p create data blocks info struct completed, %d blocks in %d tables %s", pReader, cnt, sup.numOfTables,
            pReader->idStr);

  ASSERT(cnt <= numOfBlocks && sup.numOfTables <= numOfTables);

  // tMergeTreeCreate returns an int32_t code; a narrower type could truncate a failure code to 0 (success)
  SMultiwayMergeTreeInfo* pTree = NULL;
  code = tMergeTreeCreate(&pTree, sup.numOfTables, &sup, fileDataBlockOrderCompar);
  if (code != TSDB_CODE_SUCCESS) {
    cleanupBlockOrderSupporter(&sup);
    return TSDB_CODE_TDB_OUT_OF_MEMORY;
  }

  int32_t numOfTotal = 0;
  while (numOfTotal < cnt) {
    int32_t pos = tMergeTreeGetChosenIndex(pTree);
    int32_t index = sup.indexPerTable[pos]++;

    SFileDataBlockInfo blockInfo = {.uid = sup.pDataBlockInfo[pos][index].uid, .tbBlockIdx = index};
    taosArrayPush(pBlockIter->blockList, &blockInfo);

    // set data block index overflow, in order to disable the offset comparator
    if (sup.indexPerTable[pos] >= sup.numOfBlocksPerTable[pos]) {
      sup.indexPerTable[pos] = sup.numOfBlocksPerTable[pos] + 1;
    }

    numOfTotal += 1;
    tMergeTreeAdjust(pTree, tMergeTreeGetAdjustIndex(pTree));
  }

  int64_t et = taosGetTimestampUs();
  tsdbDebug("%p %d data blocks access order completed, elapsed time:%.2f ms %s", pReader, numOfBlocks, (et - st) / 1000.0,
            pReader->idStr);
  cleanupBlockOrderSupporter(&sup);
  taosMemoryFree(pTree);

  pBlockIter->index = asc ? 0 : (numOfBlocks - 1);
  doSetCurrentBlock(pBlockIter);

  return TSDB_CODE_SUCCESS;
}
H
Hongze Cheng 已提交
1099

H
Haojun Liao 已提交
1100
/**
 * Moves the iterator one block forward in its traverse order and refreshes the current block.
 * Returns false, leaving the iterator untouched, when it already sits on the last block for that order.
 */
static bool blockIteratorNext(SDataBlockIter* pBlockIter) {
  bool asc = ASCENDING_TRAVERSE(pBlockIter->order);

  bool atEnd = asc ? (pBlockIter->index >= pBlockIter->numOfBlocks - 1) : (pBlockIter->index <= 0);
  if (atEnd) {
    return false;
  }

  pBlockIter->index += asc ? 1 : -1;
  doSetCurrentBlock(pBlockIter);
  return true;
}

1114 1115 1116
/**
 * This is an two rectangles overlap cases.
 */
1117
static int32_t dataBlockPartiallyRequired(STimeWindow* pWindow, SVersionRange* pVerRange, SBlock* pBlock) {
1118 1119
  return (pWindow->ekey < pBlock->maxKey.ts && pWindow->ekey >= pBlock->minKey.ts) ||
         (pWindow->skey > pBlock->minKey.ts && pWindow->skey <= pBlock->maxKey.ts) ||
H
Hongze Cheng 已提交
1120 1121
         (pVerRange->minVer > pBlock->minVer && pVerRange->minVer <= pBlock->maxVer) ||
         (pVerRange->maxVer < pBlock->maxVer && pVerRange->maxVer >= pBlock->minVer);
H
Haojun Liao 已提交
1122
}
H
Hongze Cheng 已提交
1123

1124 1125
/**
 * Returns a heap copy of the block that follows pFBlockInfo in the same table, in the given traverse order.
 * The neighbor's position in pBlockList is stored in *nextIndex.
 * Returns NULL when pFBlockInfo is already the table's last block in that order.
 * The caller owns the returned block and releases it with taosMemoryFree.
 * NOTE(review): the allocation result is not checked before it is filled.
 */
static SBlock* getNeighborBlockOfSameTable(SFileDataBlockInfo* pFBlockInfo, STableBlockScanInfo* pTableBlockScanInfo,
                                           int32_t* nextIndex, int32_t order) {
  bool asc = ASCENDING_TRAVERSE(order);

  bool noNeighbor = asc ? (pFBlockInfo->tbBlockIdx >= taosArrayGetSize(pTableBlockScanInfo->pBlockList) - 1)
                        : (pFBlockInfo->tbBlockIdx == 0);
  if (noNeighbor) {
    return NULL;
  }

  *nextIndex = pFBlockInfo->tbBlockIdx + (asc ? 1 : -1);

  int32_t* pPosInMapData = taosArrayGet(pTableBlockScanInfo->pBlockList, *nextIndex);
  SBlock*  pNeighbor = taosMemoryCalloc(1, sizeof(SBlock));
  tMapDataGetItemByIdx(&pTableBlockScanInfo->mapData, *pPosInMapData, pNeighbor, tGetBlock);

  return pNeighbor;
}

/**
 * Searches blockList for the entry with the same uid and tbBlockIdx as pFBlockInfo.
 * The search starts at the iterator's current index and moves in its traverse order.
 * Returns the matching index. If nothing matches it asserts and returns -1.
 */
static int32_t findFileBlockInfoIndex(SDataBlockIter* pBlockIter, SFileDataBlockInfo* pFBlockInfo) {
  ASSERT(pBlockIter != NULL && pFBlockInfo != NULL);

  int32_t step = ASCENDING_TRAVERSE(pBlockIter->order) ? 1 : -1;

  for (int32_t pos = pBlockIter->index; pos >= 0 && pos < pBlockIter->numOfBlocks; pos += step) {
    SFileDataBlockInfo* pCandidate = taosArrayGet(pBlockIter->blockList, pos);
    bool sameBlock = (pCandidate->uid == pFBlockInfo->uid) && (pCandidate->tbBlockIdx == pFBlockInfo->tbBlockIdx);
    if (sameBlock) {
      return pos;
    }
  }

  ASSERT(0);
  return -1;
}

1164
/**
 * Advances the iterator index by step and moves the blockList entry at index to that new position (if it is not
 * already there), then refreshes the iterator's current block.
 * Returns -1 when index is outside [0, numOfBlocks), TSDB_CODE_SUCCESS otherwise.
 */
static int32_t setFileBlockActiveInBlockIter(SDataBlockIter* pBlockIter, int32_t index, int32_t step) {
  bool outOfRange = (index < 0) || (index >= pBlockIter->numOfBlocks);
  if (outOfRange) {
    return -1;
  }

  // take a copy first: the original slot is removed below
  SFileDataBlockInfo target = *(SFileDataBlockInfo*)taosArrayGet(pBlockIter->blockList, index);

  pBlockIter->index += step;
  int32_t newPos = pBlockIter->index;

  if (index != newPos) {
    taosArrayRemove(pBlockIter->blockList, index);
    taosArrayInsert(pBlockIter->blockList, newPos, &target);

    SFileDataBlockInfo* pMoved = taosArrayGet(pBlockIter->blockList, newPos);
    ASSERT(pMoved->uid == target.uid && pMoved->tbBlockIdx == target.tbBlockIdx);
  }

  doSetCurrentBlock(pBlockIter);
  return TSDB_CODE_SUCCESS;
}

/**
 * Returns true when pBlock and its neighbor (the next block of the same table in traverse order) share a boundary
 * timestamp. In ascending order that means pBlock's maxKey equals the neighbor's minKey. In descending order it
 * means pBlock's minKey equals the neighbor's maxKey. Only this boundary equality is tested.
 */
static bool overlapWithNeighborBlock(SBlock* pBlock, SBlock* pNeighbor, int32_t order) {
  bool    asc = ASCENDING_TRAVERSE(order);
  int64_t blockEdge = asc ? pBlock->maxKey.ts : pBlock->minKey.ts;
  int64_t neighborEdge = asc ? pNeighbor->minKey.ts : pNeighbor->maxKey.ts;

  return blockEdge == neighborEdge;
}
H
Hongze Cheng 已提交
1192

1193
/**
 * Returns true when key is a valid in-memory key (not TSKEY_INITIAL_VAL) positioned before the block in traverse
 * order. In ascending order that means at or before the block's minKey.ts. In descending order it means at or
 * after the block's maxKey.ts.
 */
static bool bufferDataInFileBlockGap(int32_t order, TSDBKEY key, SBlock* pBlock) {
  if (key.ts == TSKEY_INITIAL_VAL) {
    return false;
  }

  return ASCENDING_TRAVERSE(order) ? (key.ts <= pBlock->minKey.ts) : (key.ts >= pBlock->maxKey.ts);
}
H
Hongze Cheng 已提交
1199

H
Haojun Liao 已提交
1200
/**
 * Returns true when key.ts lies inside the block's time range and the block's version range intersects
 * pVerRange.
 */
static bool keyOverlapFileBlock(TSDBKEY key, SBlock* pBlock, SVersionRange* pVerRange) {
  bool tsInsideBlock = (key.ts >= pBlock->minKey.ts) && (key.ts <= pBlock->maxKey.ts);
  bool versionsIntersect = (pBlock->maxVer >= pVerRange->minVer) && (pBlock->minVer <= pVerRange->maxVer);

  return tsInsideBlock && versionsIntersect;
}

1205 1206 1207 1208 1209 1210
/**
 * Scans pBlockScanInfo->delSkyline from position startIndex upwards and returns true when a delete point may
 * affect pBlock. A point may affect the block in two ways:
 * - it lies inside [minKey.ts, maxKey.ts] with version >= pBlock->minVer;
 * - it lies before the block with version >= pBlock->minVer and its successor reaches the block (see the checks
 *   below).
 * The scan stops at the first point after pBlock->maxKey.ts.
 * A negative startIndex converts to a huge size_t in the loop comparison, so nothing is scanned.
 */
static bool doCheckforDatablockOverlapFrom(STableBlockScanInfo* pBlockScanInfo, const SBlock* pBlock,
                                           int32_t startIndex) {
  size_t num = taosArrayGetSize(pBlockScanInfo->delSkyline);

  for (int32_t i = startIndex; i < num; i += 1) {
    TSDBKEY* p = taosArrayGet(pBlockScanInfo->delSkyline, i);
    if (p->ts >= pBlock->minKey.ts && p->ts <= pBlock->maxKey.ts) {
      if (p->version >= pBlock->minVer) {
        return true;
      }
    } else if (p->ts < pBlock->minKey.ts) {  // p->ts < pBlock->minKey.ts
      if (p->version >= pBlock->minVer) {
        if (i < num - 1) {
          TSDBKEY* pnext = taosArrayGet(pBlockScanInfo->delSkyline, i + 1);
          if (i + 1 == num - 1) {  // pnext is the last point
            if (pnext->ts >= pBlock->minKey.ts) {
              return true;
            }
          } else {
            if (pnext->ts >= pBlock->minKey.ts && pnext->version >= pBlock->minVer) {
              return true;
            }
          }
        } else {  // it must be the last point
          ASSERT(p->version == 0);
        }
      }
    } else {  // (p->ts > pBlock->maxKey.ts) {
      return false;
    }
  }

  return false;
}

/** Same check as doCheckforDatablockOverlapFrom, starting at pBlockScanInfo->fileDelIndex. */
static bool doCheckforDatablockOverlap(STableBlockScanInfo* pBlockScanInfo, const SBlock* pBlock) {
  return doCheckforDatablockOverlapFrom(pBlockScanInfo, pBlock, pBlockScanInfo->fileDelIndex);
}

/**
 * Returns true when the table's delete skyline may remove rows from pBlock.
 *
 * It returns false right away when:
 * - there is no skyline, or the skyline is empty;
 * - the block's time range lies entirely outside the skyline's range.
 *
 * In descending order the scan start is first moved back from fileDelIndex to the last point whose ts is not
 * greater than pBlock->minKey.ts (or to position 0). The delete point covering the block's start is therefore
 * examined as well.
 */
static bool overlapWithDelSkyline(STableBlockScanInfo* pBlockScanInfo, const SBlock* pBlock, int32_t order) {
  if (pBlockScanInfo->delSkyline == NULL) {
    return false;
  }

  // an empty skyline has no first/last point (taosArrayGet would yield NULL)
  size_t num = taosArrayGetSize(pBlockScanInfo->delSkyline);
  if (num == 0) {
    return false;
  }

  // ts is not overlap
  TSDBKEY* pFirst = taosArrayGet(pBlockScanInfo->delSkyline, 0);
  TSDBKEY* pLast = taosArrayGetLast(pBlockScanInfo->delSkyline);
  if (pBlock->minKey.ts > pLast->ts || pBlock->maxKey.ts < pFirst->ts) {
    return false;
  }

  // version is not overlap
  if (ASCENDING_TRAVERSE(order)) {
    return doCheckforDatablockOverlapFrom(pBlockScanInfo, pBlock, pBlockScanInfo->fileDelIndex);
  }

  // keep the start position inside the skyline before walking back
  int32_t index = pBlockScanInfo->fileDelIndex;
  if (index >= (int32_t)num) {
    index = (int32_t)num - 1;
  }
  if (index < 0) {
    index = 0;
  }

  // find the last point whose ts is not greater than the minKey.ts of the data block
  while (index > 0) {
    TSDBKEY* p = taosArrayGet(pBlockScanInfo->delSkyline, index);
    if (p->ts <= pBlock->minKey.ts) {
      break;
    }
    index -= 1;
  }

  // scan from the position found above; previously this index was computed and then ignored
  return doCheckforDatablockOverlapFrom(pBlockScanInfo, pBlock, index);
}

1269 1270 1271 1272
// 1. the version of all rows should be less than the endVersion
// 2. current block should not overlap with next neighbor block
// 3. current timestamp should not be overlap with each other
// 4. output buffer should be large enough to hold all rows in current block
1273
// 5. delete info should not overlap with current block data
1274
static bool fileBlockShouldLoad(STsdbReader* pReader, SFileDataBlockInfo* pFBlock, SBlock* pBlock,
1275
                                STableBlockScanInfo* pScanInfo, TSDBKEY key, SLastBlockReader* pLastBlockReader) {
1276 1277 1278
  int32_t neighborIndex = 0;
  SBlock* pNeighbor = getNeighborBlockOfSameTable(pFBlock, pScanInfo, &neighborIndex, pReader->order);

1279
  // overlap with neighbor
1280 1281 1282
  bool overlapWithNeighbor = false;
  if (pNeighbor) {
    overlapWithNeighbor = overlapWithNeighborBlock(pBlock, pNeighbor, pReader->order);
1283
    taosMemoryFree(pNeighbor);
1284 1285
  }

1286
  // has duplicated ts of different version in this block
L
Liu Jicong 已提交
1287 1288
  bool hasDup = (pBlock->nSubBlock == 1) ? pBlock->hasDup : true;
  bool overlapWithDel = overlapWithDelSkyline(pScanInfo, pBlock, pReader->order);
1289

1290 1291
  // todo here we need to each key in the last files to identify if it is really overlapped with last block
  bool overlapWithlastBlock = false;
1292
  if (/*hasDataInLastBlock(pLastBlockReader)*/taosArrayGetSize(pLastBlockReader->pBlockL) > 0) {
1293 1294 1295 1296
    SBlockL *pBlockL = taosArrayGet(pLastBlockReader->pBlockL, pLastBlockReader->currentBlockIndex);
    overlapWithlastBlock = !(pBlock->maxKey.ts < pBlockL->minKey || pBlock->minKey.ts > pBlockL->maxKey);
  }

1297
  return (overlapWithNeighbor || hasDup || dataBlockPartiallyRequired(&pReader->window, &pReader->verRange, pBlock) ||
1298 1299
          keyOverlapFileBlock(key, pBlock, &pReader->verRange) || (pBlock->nRow > pReader->capacity) ||
          overlapWithDel || overlapWithlastBlock);
H
Haojun Liao 已提交
1300 1301
}

1302
/**
 * Builds pReader->pResBlock for one table from its in-memory iterators (imem/mem) via buildDataBlockFromBufImpl.
 * The build is bounded by endKey and pReader->capacity.
 * Afterwards it refreshes the block's time window, sets its uid, marks the result as a composed block, and adds
 * the elapsed time to pReader->cost.buildmemBlock.
 * Returns TSDB_CODE_SUCCESS immediately when neither iterator has data; otherwise returns the build result.
 */
static int32_t buildDataBlockFromBuf(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo, int64_t endKey) {
  bool hasBufferedRows = pBlockScanInfo->iiter.hasVal || pBlockScanInfo->iter.hasVal;
  if (!hasBufferedRows) {
    return TSDB_CODE_SUCCESS;
  }

  int64_t      startUs = taosGetTimestampUs();
  SSDataBlock* pRes = pReader->pResBlock;
  int32_t      code = buildDataBlockFromBufImpl(pBlockScanInfo, endKey, pReader->capacity, pReader);

  blockDataUpdateTsWindow(pRes, 0);
  pRes->info.uid = pBlockScanInfo->uid;
  setComposedBlockFlag(pReader, true);

  double elapsedTime = (taosGetTimestampUs() - startUs) / 1000.0;
  tsdbDebug("%p build data block from cache completed, elapsed time:%.2f ms, numOfRows:%d, brange:%" PRId64
            " - %" PRId64 " %s",
            pReader, elapsedTime, pRes->info.rows, pRes->info.window.skey, pRes->info.window.ekey,
            pReader->idStr);

  pReader->cost.buildmemBlock += elapsedTime;
  return code;
}

1327 1328
/**
 * Fast path for a file-block row that needs no merging.
 * The row at pDumpInfo->rowIndex qualifies when it is not the last row in traverse order and the next row has a
 * different timestamp than key. In that case the row is appended directly to the result block, rowIndex advances
 * one step, and the function returns true. Otherwise it returns false and changes nothing.
 */
static bool tryCopyDistinctRowFromFileBlock(STsdbReader* pReader, SBlockData* pBlockData, int64_t key,
                                            SFileBlockDumpInfo* pDumpInfo) {
  bool asc = (pReader->order == TSDB_ORDER_ASC);
  bool desc = (pReader->order == TSDB_ORDER_DESC);

  // 1. it is not a border point
  bool hasNextRow = (asc && pDumpInfo->rowIndex < pDumpInfo->totalRows - 1) || (desc && pDumpInfo->rowIndex > 0);
  if (!hasNextRow) {
    return false;
  }

  // 2. the direct next point is not an duplicated timestamp
  int32_t step = asc ? 1 : -1;
  int64_t nextKey = pBlockData->aTSKEY[pDumpInfo->rowIndex + step];
  if (nextKey == key) {
    return false;
  }

  doAppendRowFromFileBlock(pReader->pResBlock, pReader, pBlockData, pDumpInfo->rowIndex);
  pDumpInfo->rowIndex += step;
  return true;
}

H
Haojun Liao 已提交
1347 1348 1349 1350 1351 1352
/**
 * Returns the table schema matching sversion for the row being processed.
 *
 * pReader->pSchema caches the newest schema, loaded on first use. pReader->pMemSchema caches one other
 * version and is replaced when a different version is requested.
 *
 * Returns NULL when the metadata lookup fails. A stale cached schema is cleared before reloading, so a failed
 * reload can no longer leave pMemSchema pointing at freed memory.
 */
static FORCE_INLINE STSchema* doGetSchemaForTSRow(int32_t sversion, STsdbReader* pReader, uint64_t uid) {
  // always set the newest schema version in pReader->pSchema
  if (pReader->pSchema == NULL) {
    pReader->pSchema = metaGetTbTSchema(pReader->pTsdb->pVnode->pMeta, uid, -1);
  }

  if (pReader->pSchema && sversion == pReader->pSchema->version) {
    return pReader->pSchema;
  }

  if (pReader->pMemSchema != NULL && pReader->pMemSchema->version == sversion) {
    return pReader->pMemSchema;
  }

  // drop the cached schema of another version; clearing the pointer avoids a dangling cache on failure below
  taosMemoryFreeClear(pReader->pMemSchema);

  int32_t code = metaGetTbTSchemaEx(pReader->pTsdb->pVnode->pMeta, pReader->suid, uid, sversion, &pReader->pMemSchema);
  if (code != TSDB_CODE_SUCCESS) {
    return NULL;
  }

  return pReader->pMemSchema;
}

1372 1373 1374 1375 1376 1377 1378
/**
 * Merges the rows that share the first timestamp in traverse order (smallest for asc, largest for desc) and
 * appends the merged row to pReader->pResBlock. Up to three sources are considered:
 * - the current file-block row (timestamp key), if the file block has rows;
 * - the current last-block row, if the last block has rows;
 * - the current in-memory row pRow (iterator pIter).
 *
 * Merge order:
 * - ascending: file block -> last block -> imem/mem;
 * - descending: imem/mem -> last block -> file block.
 *
 * The first source taking part initializes the merger with its current row. Every later source first merges its
 * current row, because the doMergeRowsIn* helpers only merge the following rows with the same timestamp.
 *
 * Returns the tRowMergerGetRow error code on failure, TSDB_CODE_SUCCESS otherwise.
 */
static int32_t doMergeBufAndFileRows_Rv(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo, TSDBROW* pRow,
                                        SIterInfo* pIter, int64_t key, SLastBlockReader* pLastBlockReader) {
  SRowMerger          merge = {0};
  STSRow*             pTSRow = NULL;
  SBlockData*         pBlockData = &pReader->status.fileBlockData;
  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
  SBlockData*         pLastBlockData = &pLastBlockReader->lastBlockData;

  bool    asc = (pReader->order == TSDB_ORDER_ASC);
  bool    hasFileRow = (pBlockData->nRow > 0);
  bool    hasLastRow = (pLastBlockData->nRow > 0);
  int64_t tsLast = hasLastRow ? getCurrentKeyInLastBlock(pLastBlockReader) : INT64_MIN;

  TSDBKEY k = TSDBROW_KEY(pRow);
  TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);

  // the first key in traverse order among the available sources; the buffer row always exists
  int64_t minKey = k.ts;
  if (hasLastRow && (asc ? (tsLast < minKey) : (tsLast > minKey))) {
    minKey = tsLast;
  }
  if (hasFileRow && (asc ? (key < minKey) : (key > minKey))) {
    minKey = key;
  }

  bool init = false;

  if (asc) {
    // file block ---> last block -----> imem -----> mem
    if (hasFileRow && minKey == key) {
      init = true;
      tRowMergerInit(&merge, &fRow, pReader->pSchema);
      doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
    }

    if (hasLastRow && minKey == tsLast) {
      TSDBROW fRow1 = tsdbRowFromBlockData(pLastBlockData, *pLastBlockReader->rowIndex);
      if (init) {
        tRowMerge(&merge, &fRow1);
      } else {
        init = true;
        tRowMergerInit(&merge, &fRow1, pReader->pSchema);
      }
      doMergeRowsInLastBlock(pLastBlockReader, tsLast, &merge);
    }

    if (minKey == k.ts) {
      if (init) {
        tRowMerge(&merge, pRow);
      } else {
        init = true;
        STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid);
        tRowMergerInit(&merge, pRow, pSchema);
      }
      doMergeRowsInBuf(pIter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge, pReader);
    }
  } else {
    // mem/imem -----> last block -----> file block
    if (minKey == k.ts) {
      init = true;
      STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid);
      tRowMergerInit(&merge, pRow, pSchema);
      doMergeRowsInBuf(pIter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge, pReader);
    }

    if (hasLastRow && minKey == tsLast) {
      TSDBROW fRow1 = tsdbRowFromBlockData(pLastBlockData, *pLastBlockReader->rowIndex);
      if (init) {
        tRowMerge(&merge, &fRow1);
      } else {
        init = true;
        tRowMergerInit(&merge, &fRow1, pReader->pSchema);
      }
      doMergeRowsInLastBlock(pLastBlockReader, tsLast, &merge);
    }

    if (hasFileRow && minKey == key) {
      if (init) {
        tRowMerge(&merge, &fRow);
      } else {
        init = true;
        tRowMergerInit(&merge, &fRow, pReader->pSchema);
      }
      doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
    }
  }

  int32_t code = tRowMergerGetRow(&merge, &pTSRow);
  if (code != TSDB_CODE_SUCCESS) {
    tRowMergerClear(&merge);
    return code;
  }

  doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo->uid);

  taosMemoryFree(pTSRow);
  tRowMergerClear(&merge);
  return TSDB_CODE_SUCCESS;
}

1479
static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo, TSDBROW* pRow,
1480
                                     SIterInfo* pIter, int64_t key, SLastBlockReader* pLastBlockReader) {
1481
  SRowMerger          merge = {0};
H
Haojun Liao 已提交
1482
  STSRow*             pTSRow = NULL;
1483 1484 1485
  SBlockData*         pBlockData = &pReader->status.fileBlockData;
  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;

1486 1487 1488 1489
  TSDBKEY  k = TSDBROW_KEY(pRow);
  TSDBROW  fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
  SArray*  pDelList = pBlockScanInfo->delSkyline;
  bool     freeTSRow = false;
H
Haojun Liao 已提交
1490
  uint64_t uid = pBlockScanInfo->uid;
1491

1492 1493 1494
  // ascending order traverse
  if (ASCENDING_TRAVERSE(pReader->order)) {
    if (key < k.ts) {
1495 1496 1497 1498 1499 1500 1501
      // imem & mem are all empty, only file exist
      if (tryCopyDistinctRowFromFileBlock(pReader, pBlockData, key, pDumpInfo)) {
        return TSDB_CODE_SUCCESS;
      } else {
        tRowMergerInit(&merge, &fRow, pReader->pSchema);
        doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
        tRowMergerGetRow(&merge, &pTSRow);
H
Haojun Liao 已提交
1502
        freeTSRow = true;
1503
      }
1504
    } else if (k.ts < key) {  // k.ts < key
1505
      doMergeMultiRows(pRow, pBlockScanInfo->uid, pIter, pDelList, &pTSRow, pReader, &freeTSRow);
1506 1507 1508
    } else {  // k.ts == key, ascending order: file block ----> imem rows -----> mem rows
      tRowMergerInit(&merge, &fRow, pReader->pSchema);
      doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
1509 1510

      tRowMerge(&merge, pRow);
1511
      doMergeRowsInBuf(pIter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge, pReader);
1512 1513

      tRowMergerGetRow(&merge, &pTSRow);
H
Haojun Liao 已提交
1514
      freeTSRow = true;
1515
    }
1516 1517
  } else {  // descending order scan
    if (key < k.ts) {
1518
      doMergeMultiRows(pRow, pBlockScanInfo->uid, pIter, pDelList, &pTSRow, pReader, &freeTSRow);
1519
    } else if (k.ts < key) {
1520 1521 1522 1523 1524 1525
      if (tryCopyDistinctRowFromFileBlock(pReader, pBlockData, key, pDumpInfo)) {
        return TSDB_CODE_SUCCESS;
      } else {
        tRowMergerInit(&merge, &fRow, pReader->pSchema);
        doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
        tRowMergerGetRow(&merge, &pTSRow);
H
Haojun Liao 已提交
1526
        freeTSRow = true;
1527
      }
1528
    } else {  // descending order: mem rows -----> imem rows ------> file block
H
Haojun Liao 已提交
1529
      STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid);
1530

H
Haojun Liao 已提交
1531
      tRowMergerInit(&merge, pRow, pSchema);
1532
      doMergeRowsInBuf(pIter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge, pReader);
1533 1534 1535 1536 1537

      tRowMerge(&merge, &fRow);
      doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);

      tRowMergerGetRow(&merge, &pTSRow);
H
Haojun Liao 已提交
1538
      freeTSRow = true;
1539
    }
1540 1541
  }

1542
  tRowMergerClear(&merge);
H
Haojun Liao 已提交
1543
  doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, uid);
H
Haojun Liao 已提交
1544

H
Haojun Liao 已提交
1545 1546 1547
  if (freeTSRow) {
    taosMemoryFree(pTSRow);
  }
H
Haojun Liao 已提交
1548

1549 1550 1551
  return TSDB_CODE_SUCCESS;
}

1552
// todo handle the desc order check
1553 1554 1555 1556 1557 1558 1559 1560 1561 1562 1563 1564 1565 1566 1567 1568 1569 1570 1571 1572 1573 1574 1575 1576 1577 1578 1579 1580 1581 1582 1583 1584 1585 1586 1587 1588 1589 1590 1591 1592 1593 1594 1595 1596 1597 1598 1599 1600
static int32_t doMergeThreeLevelRowsRv(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo, SBlockData* pBlockData, SLastBlockReader* pLastBlockReader) {
  SRowMerger merge = {0};
  STSRow*    pTSRow = NULL;

  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
  SArray*             pDelList = pBlockScanInfo->delSkyline;

  TSDBROW* pRow = getValidRow(&pBlockScanInfo->iter, pDelList, pReader);
  TSDBROW* piRow = getValidRow(&pBlockScanInfo->iiter, pDelList, pReader);
  ASSERT(pRow != NULL && piRow != NULL);

  SBlockData* pLastBlockData = &pLastBlockReader->lastBlockData;
  int64_t tsLast = getCurrentKeyInLastBlock(pLastBlockReader);

  int64_t key = pBlockData->aTSKEY[pDumpInfo->rowIndex];

  TSDBKEY k = TSDBROW_KEY(pRow);
  TSDBKEY ik = TSDBROW_KEY(piRow);

  int64_t minKey = INT64_MAX;
  if (minKey > k.ts) {
    minKey = k.ts;
  }

  if (minKey > ik.ts) {
    minKey = ik.ts;
  }

  if (minKey > key) {
    minKey = key;
  }

  if (minKey > tsLast) {
    minKey = tsLast;
  }

  // file block ---> last block -----> imem -----> mem
  bool init = false;
  if (minKey == key) {
    init = true;
    TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
    tRowMergerInit(&merge, &fRow, pReader->pSchema);
    doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
  }

  if (minKey == tsLast) {
    if (!init) {
      init = true;
1601
      TSDBROW fRow1 = tsdbRowFromBlockData(pLastBlockData, *pLastBlockReader->rowIndex);
1602 1603 1604 1605 1606 1607 1608 1609 1610 1611 1612 1613 1614 1615 1616 1617 1618 1619 1620 1621 1622 1623 1624 1625 1626 1627 1628 1629 1630 1631
      tRowMergerInit(&merge, &fRow1, pReader->pSchema);
    }
    doMergeRowsInLastBlock(pLastBlockReader, tsLast, &merge);
  }

  if (minKey == ik.ts) {
    if (!init) {
      init = true;
      STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(piRow), pReader, pBlockScanInfo->uid);
      tRowMergerInit(&merge, piRow, pSchema);
    }
    doMergeRowsInBuf(&pBlockScanInfo->iiter, pBlockScanInfo->uid, ik.ts, pBlockScanInfo->delSkyline, &merge, pReader);
  }

  if (minKey == k.ts) {
    if (!init) {
      STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid);
      tRowMergerInit(&merge, pRow, pSchema);
    }
    doMergeRowsInBuf(&pBlockScanInfo->iter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge, pReader);
  }

  tRowMergerGetRow(&merge, &pTSRow);
  doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo->uid);

  taosMemoryFree(pTSRow);
  tRowMergerClear(&merge);
  return TSDB_CODE_SUCCESS;
}

1632
/*
 * Merge the current rows of the mem buffer, the imem buffer and the loaded file block of one table (the last block
 * is not consulted). The row(s) whose timestamp comes first in scan order are appended to pReader->pResBlock.
 *
 * Both buffers must have a valid current row (asserted). Rows sharing a timestamp are merged in the order
 * file block -> imem -> mem for ascending scans and mem -> imem -> file block for descending scans.
 * Returns TSDB_CODE_SUCCESS. The trailing -1 is only reached if none of the enumerated key orderings matched.
 */
static int32_t doMergeThreeLevelRows(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo, SBlockData* pBlockData) {
  SRowMerger merge = {0};
  STSRow*    pTSRow = NULL;

  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
  SArray*             pDelList = pBlockScanInfo->delSkyline;

  TSDBROW* pRow = getValidRow(&pBlockScanInfo->iter, pDelList, pReader);
  TSDBROW* piRow = getValidRow(&pBlockScanInfo->iiter, pDelList, pReader);
  ASSERT(pRow != NULL && piRow != NULL);

  int64_t key = pBlockData->aTSKEY[pDumpInfo->rowIndex];
  bool    freeTSRow = false;

  uint64_t uid = pBlockScanInfo->uid;

  TSDBKEY k = TSDBROW_KEY(pRow);
  TSDBKEY ik = TSDBROW_KEY(piRow);
  if (ASCENDING_TRAVERSE(pReader->order)) {
    // [1&2] key <= [k.ts && ik.ts]: file block ----> imem rows -----> mem rows
    if (key <= k.ts && key <= ik.ts) {
      TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
      tRowMergerInit(&merge, &fRow, pReader->pSchema);

      doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);

      if (ik.ts == key) {
        tRowMerge(&merge, piRow);
        doMergeRowsInBuf(&pBlockScanInfo->iiter, uid, key, pDelList, &merge, pReader);
      }

      if (k.ts == key) {
        tRowMerge(&merge, pRow);
        doMergeRowsInBuf(&pBlockScanInfo->iter, uid, key, pDelList, &merge, pReader);
      }

      tRowMergerGetRow(&merge, &pTSRow);
      doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, uid);

      // the merged row and the merger state must be released here as on every other merge path
      taosMemoryFree(pTSRow);
      tRowMergerClear(&merge);
      return TSDB_CODE_SUCCESS;
    } else {  // key > ik.ts || key > k.ts
      // key == ik.ts is reachable here (k.ts < key == ik.ts, handled by [5]), so it must not be asserted against.

      // [3] ik.ts < key <= k.ts
      // [4] ik.ts < k.ts <= key
      if (ik.ts < k.ts) {
        doMergeMultiRows(piRow, uid, &pBlockScanInfo->iiter, pDelList, &pTSRow, pReader, &freeTSRow);
        doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, uid);
        if (freeTSRow) {
          taosMemoryFree(pTSRow);
        }
        return TSDB_CODE_SUCCESS;
      }

      // [5] k.ts < key   <= ik.ts
      // [6] k.ts < ik.ts <= key
      if (k.ts < ik.ts) {
        doMergeMultiRows(pRow, uid, &pBlockScanInfo->iter, pDelList, &pTSRow, pReader, &freeTSRow);
        doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, uid);
        if (freeTSRow) {
          taosMemoryFree(pTSRow);
        }
        return TSDB_CODE_SUCCESS;
      }

      // [7] k.ts == ik.ts < key
      if (k.ts == ik.ts) {
        ASSERT(key > ik.ts && key > k.ts);

        doMergeMemIMemRows(pRow, piRow, pBlockScanInfo, pReader, &pTSRow);
        doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, uid);
        taosMemoryFree(pTSRow);
        return TSDB_CODE_SUCCESS;
      }
    }
  } else {  // descending order scan
    // [1/2] k.ts >= ik.ts && k.ts >= key: mem rows -----> imem rows ------> file block
    if (k.ts >= ik.ts && k.ts >= key) {
      STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, uid);

      tRowMergerInit(&merge, pRow, pSchema);
      // gather the remaining versions of k.ts; the file key may be smaller than k.ts here
      doMergeRowsInBuf(&pBlockScanInfo->iter, uid, k.ts, pDelList, &merge, pReader);

      if (ik.ts == k.ts) {
        tRowMerge(&merge, piRow);
        doMergeRowsInBuf(&pBlockScanInfo->iiter, uid, k.ts, pDelList, &merge, pReader);
      }

      if (k.ts == key) {
        TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
        tRowMerge(&merge, &fRow);
        doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
      }

      tRowMergerGetRow(&merge, &pTSRow);
      doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, uid);

      taosMemoryFree(pTSRow);
      tRowMergerClear(&merge);
      return TSDB_CODE_SUCCESS;
    } else {
      // ik.ts == k.ts is reachable here (key > k.ts == ik.ts, handled by [5]/[6]), so it must not be asserted against.

      // [3] ik.ts > k.ts >= Key
      // [4] ik.ts > key >= k.ts
      if (ik.ts > key) {
        doMergeMultiRows(piRow, uid, &pBlockScanInfo->iiter, pDelList, &pTSRow, pReader, &freeTSRow);
        doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, uid);
        if (freeTSRow) {
          taosMemoryFree(pTSRow);
        }
        return TSDB_CODE_SUCCESS;
      }

      // [5] key > ik.ts > k.ts
      // [6] key > k.ts > ik.ts
      if (key > ik.ts) {
        TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
        tRowMergerInit(&merge, &fRow, pReader->pSchema);

        doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
        tRowMergerGetRow(&merge, &pTSRow);
        doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, uid);

        taosMemoryFree(pTSRow);
        tRowMergerClear(&merge);
        return TSDB_CODE_SUCCESS;
      }

      // [7] key = ik.ts > k.ts: imem rows ------> file block, merged through one initialized merger
      if (key == ik.ts) {
        STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(piRow), pReader, uid);

        tRowMergerInit(&merge, piRow, pSchema);
        doMergeRowsInBuf(&pBlockScanInfo->iiter, uid, ik.ts, pDelList, &merge, pReader);

        TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
        tRowMerge(&merge, &fRow);
        doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);

        tRowMergerGetRow(&merge, &pTSRow);
        doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, uid);

        taosMemoryFree(pTSRow);
        tRowMergerClear(&merge);
        return TSDB_CODE_SUCCESS;
      }
    }
  }

  ASSERT(0);
  return -1;
}

dengyihao's avatar
dengyihao 已提交
1775 1776
/*
 * Decide whether the file-block row at pDumpInfo->rowIndex may be returned for the table of pBlockScanInfo.
 * The row qualifies only if all of the following hold:
 * - it belongs to that table (checked only when the block carries a uid column);
 * - its version lies inside pReader->verRange;
 * - its timestamp lies inside pReader->window;
 * - hasBeenDropped() does not report it as deleted.
 */
static bool isValidFileBlockRow(SBlockData* pBlockData, SFileBlockDumpInfo* pDumpInfo,
                                STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader) {
  int32_t pos = pDumpInfo->rowIndex;

  // a uid column is present only for blocks holding rows of several tables
  bool otherTable = (pBlockData->aUid != NULL) && (pBlockData->aUid[pos] != pBlockScanInfo->uid);
  if (otherTable) {
    return false;
  }

  int64_t ver = pBlockData->aVersion[pos];
  bool    verInRange = (ver >= pReader->verRange.minVer) && (ver <= pReader->verRange.maxVer);
  if (!verInRange) {
    return false;
  }

  int64_t ts = pBlockData->aTSKEY[pos];
  bool    tsInRange = (ts >= pReader->window.skey) && (ts <= pReader->window.ekey);
  if (!tsInRange) {
    return false;
  }

  TSDBKEY rowKey = {.ts = ts, .version = ver};
  return !hasBeenDropped(pBlockScanInfo->delSkyline, &pBlockScanInfo->fileDelIndex, &rowKey, pReader->order);
}

1804
// Return true when ts falls outside the closed interval [pWindow->skey, pWindow->ekey].
static bool outOfTimeWindow(int64_t ts, STimeWindow* pWindow) {
  bool inWindow = (ts >= pWindow->skey) && (ts <= pWindow->ekey);
  return !inWindow;
}
1805

1806
/*
 * Bind pLastBlockReader to table uid and to the caller-owned row cursor startPos. The reader keeps the pointer,
 * so moving the reader's cursor updates *startPos.
 *
 * A cursor value of -1 means "not positioned yet". For a descending scan it is moved to lastBlockData.nRow, one
 * past the last row, so the first backward step lands on the last row. An ascending scan keeps -1, so the first
 * forward step lands on row 0.
 */
static void initLastBlockReader(SLastBlockReader* pLastBlockReader, uint64_t uid, int16_t* startPos) {
  pLastBlockReader->uid = uid;
  pLastBlockReader->rowIndex = startPos;

  bool unpositioned = (*startPos == -1);
  if (unpositioned && !ASCENDING_TRAVERSE(pLastBlockReader->order)) {
    *startPos = pLastBlockReader->lastBlockData.nRow;
  }
}

// Cursor value meaning "no further last-block row can be returned for the bound table".
#define ALL_ROWS_CHECKED_INDEX INT16_MIN

// Mark the last block as fully consumed by writing the sentinel into the reader's row cursor.
static void setAllRowsChecked(SLastBlockReader *pLastBlockReader) {
  int16_t* pCursor = pLastBlockReader->rowIndex;
  *pCursor = ALL_ROWS_CHECKED_INDEX;
}

/*
 * Move the last-block cursor, in scan order, to the next row that:
 * - belongs to pLastBlockReader->uid;
 * - lies inside pLastBlockReader->window;
 * - lies inside pLastBlockReader->verRange.
 *
 * Returns true with *rowIndex on that row. Returns false after marking the block as fully checked
 * (ALL_ROWS_CHECKED_INDEX) when the rows are exhausted, or when a row of the table lies beyond the window end in
 * scan order. That early stop assumes rows of one table are stored in timestamp order.
 */
static bool nextRowInLastBlock(SLastBlockReader *pLastBlockReader) {
  bool    asc = (pLastBlockReader->order == TSDB_ORDER_ASC);
  int32_t step = asc ? 1 : -1;
  if (*pLastBlockReader->rowIndex == ALL_ROWS_CHECKED_INDEX) {
    return false;
  }

  *(pLastBlockReader->rowIndex) += step;

  SBlockData* pBlockData = &pLastBlockReader->lastBlockData;
  for (int32_t i = *(pLastBlockReader->rowIndex); i < pBlockData->nRow && i >= 0; i += step) {
    if (pBlockData->aUid[i] != pLastBlockReader->uid) {
      continue;
    }

    int64_t ts = pBlockData->aTSKEY[i];

    // not yet inside the window for this scan direction: skip the row
    bool beforeWindow = asc ? (ts < pLastBlockReader->window.skey) : (ts > pLastBlockReader->window.ekey);
    if (beforeWindow) {
      continue;
    }

    // past the window for this scan direction: no later row of this table can qualify
    bool afterWindow = asc ? (ts > pLastBlockReader->window.ekey) : (ts < pLastBlockReader->window.skey);
    if (afterWindow) {
      setAllRowsChecked(pLastBlockReader);
      return false;
    }

    // versions are not ordered across timestamps, so an out-of-range version disqualifies only this row
    int64_t ver = pBlockData->aVersion[i];
    if (ver < pLastBlockReader->verRange.minVer || ver > pLastBlockReader->verRange.maxVer) {
      continue;
    }

    *(pLastBlockReader->rowIndex) = i;
    return true;
  }

  // set all data is consumed in last block
  setAllRowsChecked(pLastBlockReader);
  return false;
}

1866
// NOTE(review): disabled and stale. pLastBlockReader->rowIndex is now an int16_t* cursor (see
// initLastBlockReader), so these state helpers no longer match its type. Consider deleting them.
#if 0
1867 1868 1869 1870 1871 1872 1873
static int32_t saveCurrentState(SLastBlockReader* pLastBlockReader) {
  return pLastBlockReader->rowIndex;
}

static void restoreState(SLastBlockReader* pLastBlockReader, int32_t state) {
  pLastBlockReader->rowIndex = state;
}
1874
#endif
1875 1876 1877

// Timestamp of the last-block row under the cursor. No bounds check is done: the cursor must point at a loaded row.
static int64_t getCurrentKeyInLastBlock(SLastBlockReader* pLastBlockReader) {
  int16_t pos = *pLastBlockReader->rowIndex;
  return pLastBlockReader->lastBlockData.aTSKEY[pos];
}

// True until setAllRowsChecked() has stored ALL_ROWS_CHECKED_INDEX in the cursor.
static bool hasDataInLastBlock(SLastBlockReader* pLastBlockReader) {
  return *pLastBlockReader->rowIndex != ALL_ROWS_CHECKED_INDEX;
}

1888
// todo refactor
1889
static int32_t buildComposedDataBlockImpl(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo, SBlockData* pBlockData, SLastBlockReader *pLastBlockReader) {
1890 1891
  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;

1892
  int64_t key = (pBlockData->nRow > 0)? pBlockData->aTSKEY[pDumpInfo->rowIndex]:INT64_MIN;
1893 1894
  TSDBROW* pRow = getValidRow(&pBlockScanInfo->iter, pBlockScanInfo->delSkyline, pReader);
  TSDBROW* piRow = getValidRow(&pBlockScanInfo->iiter, pBlockScanInfo->delSkyline, pReader);
1895

1896
  if (pBlockScanInfo->iter.hasVal && pBlockScanInfo->iiter.hasVal) {
1897
    return doMergeThreeLevelRowsRv(pReader, pBlockScanInfo, pBlockData, pLastBlockReader);
1898
  } else {
1899
    // imem + file + last block
1900
    if (pBlockScanInfo->iiter.hasVal) {
1901
      return doMergeBufAndFileRows_Rv(pReader, pBlockScanInfo, piRow, &pBlockScanInfo->iiter, key, pLastBlockReader);
1902 1903
    }

1904
    // mem + file
1905
    if (pBlockScanInfo->iter.hasVal) {
1906
      return doMergeBufAndFileRows_Rv(pReader, pBlockScanInfo, pRow, &pBlockScanInfo->iter, key, pLastBlockReader);
H
Haojun Liao 已提交
1907
    }
1908

1909
    if (pBlockData->nRow > 0) {
1910
      TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
1911

1912 1913 1914 1915 1916 1917 1918 1919 1920 1921 1922 1923 1924 1925 1926 1927 1928 1929 1930
      // no last block
      if (pLastBlockReader->lastBlockData.nRow == 0) {
        if (tryCopyDistinctRowFromFileBlock(pReader, pBlockData, key, pDumpInfo)) {
          return TSDB_CODE_SUCCESS;
        } else {
          STSRow*    pTSRow = NULL;
          SRowMerger merge = {0};

          tRowMergerInit(&merge, &fRow, pReader->pSchema);
          doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
          tRowMergerGetRow(&merge, &pTSRow);
          doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo->uid);

          taosMemoryFree(pTSRow);
          tRowMergerClear(&merge);
          return TSDB_CODE_SUCCESS;
        }
      }

1931 1932 1933 1934 1935 1936 1937 1938
      // row in last file block
      int64_t ts = getCurrentKeyInLastBlock(pLastBlockReader);
      if (ts < key) {  // save rows in last block
        SBlockData* pLastBlockData = &pLastBlockReader->lastBlockData;

        STSRow*    pTSRow = NULL;
        SRowMerger merge = {0};

1939
        TSDBROW fRow1 = tsdbRowFromBlockData(pLastBlockData, *pLastBlockReader->rowIndex);
1940 1941

        tRowMergerInit(&merge, &fRow1, pReader->pSchema);
1942
        doMergeRowsInLastBlock(pLastBlockReader, ts, &merge);
1943 1944 1945 1946 1947 1948 1949 1950 1951 1952 1953 1954 1955 1956 1957 1958 1959 1960 1961 1962 1963 1964 1965 1966 1967 1968 1969 1970 1971 1972 1973 1974 1975 1976 1977 1978 1979 1980 1981 1982 1983 1984 1985
        tRowMergerGetRow(&merge, &pTSRow);

        doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo->uid);

        taosMemoryFree(pTSRow);
        tRowMergerClear(&merge);
        return TSDB_CODE_SUCCESS;
      } else if (ts == key) {
        STSRow*    pTSRow = NULL;
        SRowMerger merge = {0};

        tRowMergerInit(&merge, &fRow, pReader->pSchema);
        doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
        doMergeRowsInLastBlock(pLastBlockReader, ts, &merge);

        tRowMergerGetRow(&merge, &pTSRow);
        doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo->uid);

        taosMemoryFree(pTSRow);
        tRowMergerClear(&merge);
        return TSDB_CODE_SUCCESS;
      } else {  // ts > key, asc; todo handle desc
        // imem & mem are all empty, only file exist
        if (tryCopyDistinctRowFromFileBlock(pReader, pBlockData, key, pDumpInfo)) {
          return TSDB_CODE_SUCCESS;
        } else {
          STSRow*    pTSRow = NULL;
          SRowMerger merge = {0};

          tRowMergerInit(&merge, &fRow, pReader->pSchema);
          doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
          tRowMergerGetRow(&merge, &pTSRow);
          doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo->uid);

          taosMemoryFree(pTSRow);
          tRowMergerClear(&merge);
          return TSDB_CODE_SUCCESS;
        }
      }
    } else {  // only last block exists
      SBlockData* pLastBlockData = &pLastBlockReader->lastBlockData;
      int64_t     tsLastBlock = getCurrentKeyInLastBlock(pLastBlockReader);

1986 1987
      STSRow*    pTSRow = NULL;
      SRowMerger merge = {0};
H
Haojun Liao 已提交
1988

1989
      TSDBROW fRow = tsdbRowFromBlockData(pLastBlockData, *pLastBlockReader->rowIndex);
1990

1991
      tRowMergerInit(&merge, &fRow, pReader->pSchema);
1992
      doMergeRowsInLastBlock(pLastBlockReader, tsLastBlock, &merge);
1993
      tRowMergerGetRow(&merge, &pTSRow);
1994

H
Haojun Liao 已提交
1995
      doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo->uid);
1996

1997 1998 1999 2000
      taosMemoryFree(pTSRow);
      tRowMergerClear(&merge);
      return TSDB_CODE_SUCCESS;
    }
2001 2002 2003
  }
}

2004
/*
 * Fill pReader->pResBlock row by row by repeatedly calling buildComposedDataBlockImpl() for the table that owns the
 * current file block. The loop stops when any of the following happens:
 * - the file block is fully consumed (it is then marked as dumped);
 * - the last block has no more data;
 * - the result block reaches pReader->capacity.
 *
 * Afterwards the result block's uid and time window are set and the composed flag is raised.
 * Always returns TSDB_CODE_SUCCESS.
 */
static int32_t buildComposedDataBlock(STsdbReader* pReader) {
  SReaderStatus*      pStatus = &pReader->status;
  SSDataBlock*        pResBlock = pReader->pResBlock;
  SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(&pStatus->blockIter);

  // without a current file block, fall back to the reader's current table iterator
  STableBlockScanInfo* pBlockScanInfo =
      (pBlockInfo != NULL) ? taosHashGet(pStatus->pTableMap, &pBlockInfo->uid, sizeof(pBlockInfo->uid))
                           : pStatus->pTableIter;

  SLastBlockReader* pLastBlockReader = pStatus->fileIter.pLastBlockReader;
  initLastBlockReader(pLastBlockReader, pBlockScanInfo->uid, &pBlockScanInfo->indexInBlockL);
  // todo: position the cursor with nextRowInLastBlock() and handle a last block without data

  SFileBlockDumpInfo* pDumpInfo = &pStatus->fBlockDumpInfo;
  SBlockData*         pBlockData = &pStatus->fileBlockData;
  int32_t             step = ASCENDING_TRAVERSE(pReader->order) ? 1 : -1;

  int64_t st = taosGetTimestampUs();

  for (;;) {
    bool hasFileRows = (pBlockData->nRow > 0);

    // skip file-block rows that must not be returned for this table
    if (hasFileRows && !isValidFileBlockRow(pBlockData, pDumpInfo, pBlockScanInfo, pReader)) {
      pDumpInfo->rowIndex += step;

      SBlock* pBlock = getCurrentBlock(&pStatus->blockIter);
      bool    outOfBlock = (pDumpInfo->rowIndex >= pBlock->nRow) || (pDumpInfo->rowIndex < 0);
      if (outOfBlock) {
        setBlockAllDumped(pDumpInfo, pBlock, pReader->order);
        break;
      }
      continue;
    }

    if (!hasDataInLastBlock(pLastBlockReader)) {
      break;
    }

    buildComposedDataBlockImpl(pReader, pBlockScanInfo, pBlockData, pLastBlockReader);

    // the loaded file data block has been consumed
    bool fileBlockDone =
        (pBlockData->nRow > 0) && (pDumpInfo->rowIndex >= pBlockData->nRow || pDumpInfo->rowIndex < 0);
    if (fileBlockDone) {
      setBlockAllDumped(pDumpInfo, getCurrentBlock(&pStatus->blockIter), pReader->order);
      break;
    }

    if (pResBlock->info.rows >= pReader->capacity) {
      break;
    }
  }

  pResBlock->info.uid = pBlockScanInfo->uid;
  blockDataUpdateTsWindow(pResBlock, 0);
  setComposedBlockFlag(pReader, true);

  int64_t et = taosGetTimestampUs();
  tsdbDebug("%p uid:%" PRIu64 ", composed data block created, brange:%" PRIu64 "-%" PRIu64
            " rows:%d, elapsed time:%.2f ms %s",
            pReader, pBlockScanInfo->uid, pResBlock->info.window.skey, pResBlock->info.window.ekey,
            pResBlock->info.rows, (et - st) / 1000.0, pReader->idStr);

  return TSDB_CODE_SUCCESS;
}

// Store the reader's composedDataBlock flag (buildComposedDataBlock() sets it to true after filling pResBlock).
void setComposedBlockFlag(STsdbReader* pReader, bool composed) {
  SReaderStatus* pStatus = &pReader->status;
  pStatus->composedDataBlock = composed;
}

2076
/*
 * Create the mem and imem skip-list iterators of one table and build the table's delete skyline.
 *
 * The iterators start at window.skey / verRange.minVer for ascending scans. For descending scans they start at
 * window.ekey / verRange.maxVer and iterate backward. The work runs once per table; pBlockScanInfo->iterInit is
 * set afterwards.
 *
 * Returns TSDB_CODE_SUCCESS, or the error code of tsdbTbDataIterCreate() when an iterator cannot be created.
 */
static int32_t initMemDataIterator(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader) {
  if (pBlockScanInfo->iterInit) {
    return TSDB_CODE_SUCCESS;
  }

  int32_t code = TSDB_CODE_SUCCESS;

  TSDBKEY startKey = {0};
  if (ASCENDING_TRAVERSE(pReader->order)) {
    startKey = (TSDBKEY){.ts = pReader->window.skey, .version = pReader->verRange.minVer};
  } else {
    startKey = (TSDBKEY){.ts = pReader->window.ekey, .version = pReader->verRange.maxVer};
  }

  int32_t backward = (!ASCENDING_TRAVERSE(pReader->order));

  STbData* d = NULL;
  if (pReader->pReadSnap->pMem != NULL) {
    d = tsdbGetTbDataFromMemTable(pReader->pReadSnap->pMem, pReader->suid, pBlockScanInfo->uid);
    if (d != NULL) {
      code = tsdbTbDataIterCreate(d, &startKey, backward, &pBlockScanInfo->iter.iter);
      if (code == TSDB_CODE_SUCCESS) {
        pBlockScanInfo->iter.hasVal = (tsdbTbDataIterGet(pBlockScanInfo->iter.iter) != NULL);

        tsdbDebug("%p uid:%" PRId64 ", check data in mem from skey:%" PRId64 ", order:%d, ts range in buf:%" PRId64
                  "-%" PRId64 " %s",
                  pReader, pBlockScanInfo->uid, startKey.ts, pReader->order, d->minKey, d->maxKey, pReader->idStr);
      } else {
        // this branch creates the mem iterator, so the error must name "mem"
        tsdbError("%p uid:%" PRId64 ", failed to create iterator for mem, code:%s, %s", pReader, pBlockScanInfo->uid,
                  tstrerror(code), pReader->idStr);
        return code;
      }
    }
  } else {
    tsdbDebug("%p uid:%" PRId64 ", no data in mem, %s", pReader, pBlockScanInfo->uid, pReader->idStr);
  }

  STbData* di = NULL;
  if (pReader->pReadSnap->pIMem != NULL) {
    di = tsdbGetTbDataFromMemTable(pReader->pReadSnap->pIMem, pReader->suid, pBlockScanInfo->uid);
    if (di != NULL) {
      code = tsdbTbDataIterCreate(di, &startKey, backward, &pBlockScanInfo->iiter.iter);
      if (code == TSDB_CODE_SUCCESS) {
        pBlockScanInfo->iiter.hasVal = (tsdbTbDataIterGet(pBlockScanInfo->iiter.iter) != NULL);

        tsdbDebug("%p uid:%" PRId64 ", check data in imem from skey:%" PRId64 ", order:%d, ts range in buf:%" PRId64
                  "-%" PRId64 " %s",
                  pReader, pBlockScanInfo->uid, startKey.ts, pReader->order, di->minKey, di->maxKey, pReader->idStr);
      } else {
        // this branch creates the imem iterator, so the error must name "imem"
        tsdbError("%p uid:%" PRId64 ", failed to create iterator for imem, code:%s, %s", pReader, pBlockScanInfo->uid,
                  tstrerror(code), pReader->idStr);
        return code;
      }
    }
  } else {
    tsdbDebug("%p uid:%" PRId64 ", no data in imem, %s", pReader, pBlockScanInfo->uid, pReader->idStr);
  }

  initDelSkylineIterator(pBlockScanInfo, pReader, d, di);

  pBlockScanInfo->iterInit = true;
  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
2140 2141
/*
 * Build pBlockScanInfo->delSkyline for one table from three sources:
 * - delete records in the read snapshot's delete file;
 * - the delete list of the table's mem data (pMemTbData);
 * - the delete list of the table's imem data (piMemTbData).
 * Either mem pointer may be NULL. When no delete record exists, delSkyline stays NULL.
 *
 * The skyline cursors (iter.index, iiter.index, fileDelIndex) are reset to the first entry for ascending scans and
 * to the last entry for descending scans. Nothing is done if the skyline has already been built.
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_OUT_OF_MEMORY on allocation failure, or the error code of the delete-file
 * readers / tsdbBuildDeleteSkyline().
 */
int32_t initDelSkylineIterator(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader, STbData* pMemTbData,
                               STbData* piMemTbData) {
  if (pBlockScanInfo->delSkyline != NULL) {
    return TSDB_CODE_SUCCESS;
  }

  int32_t code = 0;
  STsdb*  pTsdb = pReader->pTsdb;

  SArray* pDelData = taosArrayInit(4, sizeof(SDelData));
  if (pDelData == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  SDelFile* pDelFile = pReader->pReadSnap->fs.pDelFile;
  if (pDelFile) {
    SDelFReader* pDelFReader = NULL;
    code = tsdbDelFReaderOpen(&pDelFReader, pDelFile, pTsdb);
    if (code != TSDB_CODE_SUCCESS) {
      goto _err;
    }

    SArray* aDelIdx = taosArrayInit(4, sizeof(SDelIdx));
    if (aDelIdx == NULL) {
      // report the failure; jumping to _err with code == 0 would look like success to the caller
      code = TSDB_CODE_OUT_OF_MEMORY;
      tsdbDelFReaderClose(&pDelFReader);
      goto _err;
    }

    code = tsdbReadDelIdx(pDelFReader, aDelIdx);
    if (code != TSDB_CODE_SUCCESS) {
      taosArrayDestroy(aDelIdx);
      tsdbDelFReaderClose(&pDelFReader);
      goto _err;
    }

    SDelIdx  idx = {.suid = pReader->suid, .uid = pBlockScanInfo->uid};
    SDelIdx* pIdx = taosArraySearch(aDelIdx, &idx, tCmprDelIdx, TD_EQ);

    if (pIdx != NULL) {
      code = tsdbReadDelData(pDelFReader, pIdx, pDelData);
    }

    taosArrayDestroy(aDelIdx);
    tsdbDelFReaderClose(&pDelFReader);

    if (code != TSDB_CODE_SUCCESS) {
      goto _err;
    }
  }

  // append the delete records kept in the mem and imem buffers
  SDelData* p = NULL;
  if (pMemTbData != NULL) {
    p = pMemTbData->pHead;
    while (p) {
      taosArrayPush(pDelData, p);
      p = p->pNext;
    }
  }

  if (piMemTbData != NULL) {
    p = piMemTbData->pHead;
    while (p) {
      taosArrayPush(pDelData, p);
      p = p->pNext;
    }
  }

  if (taosArrayGetSize(pDelData) > 0) {
    pBlockScanInfo->delSkyline = taosArrayInit(4, sizeof(TSDBKEY));
    if (pBlockScanInfo->delSkyline == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto _err;
    }
    code = tsdbBuildDeleteSkyline(pDelData, 0, (int32_t)(taosArrayGetSize(pDelData) - 1), pBlockScanInfo->delSkyline);
  }

  taosArrayDestroy(pDelData);

  pBlockScanInfo->iter.index =
      ASCENDING_TRAVERSE(pReader->order) ? 0 : taosArrayGetSize(pBlockScanInfo->delSkyline) - 1;
  pBlockScanInfo->iiter.index = pBlockScanInfo->iter.index;
  pBlockScanInfo->fileDelIndex = pBlockScanInfo->iter.index;
  return code;

_err:
  taosArrayDestroy(pDelData);
  return code;
}

2221 2222 2223
/*
 * Return the smaller key of the current valid rows in the mem and imem buffers. The buffers belong to the table
 * that owns the current file block, and the in-memory iterators are initialized on first use.
 *
 * When neither buffer has a valid row, the returned key has ts == TSKEY_INITIAL_VAL.
 */
static TSDBKEY getCurrentKeyInBuf(SDataBlockIter* pBlockIter, STsdbReader* pReader) {
  TSDBKEY key = {.ts = TSKEY_INITIAL_VAL};
  bool    hasKey = false;

  SFileDataBlockInfo*  pFBlock = getCurrentBlockInfo(pBlockIter);
  STableBlockScanInfo* pScanInfo = taosHashGet(pReader->status.pTableMap, &pFBlock->uid, sizeof(pFBlock->uid));

  initMemDataIterator(pScanInfo, pReader);

  TSDBROW* pRow = getValidRow(&pScanInfo->iter, pScanInfo->delSkyline, pReader);
  if (pRow != NULL) {
    key = TSDBROW_KEY(pRow);
    hasKey = true;
  }

  pRow = getValidRow(&pScanInfo->iiter, pScanInfo->delSkyline, pReader);
  if (pRow != NULL) {
    TSDBKEY k = TSDBROW_KEY(pRow);
    // The placeholder must not take part in the comparison when mem has no row. Otherwise an imem-only key
    // could be discarded in favour of TSKEY_INITIAL_VAL.
    if (!hasKey || key.ts > k.ts) {
      key = k;
    }
  }

  return key;
}

2244
/*
 * Advance the fileset iterator until a fileset with qualified blocks for this reader is found.
 *
 * For every fileset, the block index and the last-block list (pLastBlockReader->pBlockL) are loaded and handed to
 * doLoadFileBlock, which fills pBlockNum and collects the qualified last blocks into a temporary list. The first
 * fileset with numOfBlocks + numOfLastBlocks > 0 stops the scan, and pBlockL is replaced by the qualified last blocks.
 * When the filesets are exhausted, TSDB_CODE_SUCCESS is returned and pBlockNum keeps whatever the last
 * doLoadFileBlock call stored (it is untouched if no call was made).
 */
static int32_t moveToNextFile(STsdbReader* pReader, SBlockNumber* pBlockNum) {
  SReaderStatus* pStatus = &pReader->status;

  size_t  numOfTables = taosHashGetSize(pReader->status.pTableMap);
  SArray* pIndexList = taosArrayInit(numOfTables, sizeof(SBlockIdx));

  while (1) {
    bool hasNext = filesetIteratorNext(&pStatus->fileIter, pReader);
    if (!hasNext) {  // no data files on disk
      break;
    }

    taosArrayClear(pIndexList);
    int32_t code = doLoadBlockIndex(pReader, pReader->pFileReader, pIndexList);
    if (code != TSDB_CODE_SUCCESS) {
      taosArrayDestroy(pIndexList);
      return code;
    }

    SArray* pLastBlocks = pStatus->fileIter.pLastBlockReader->pBlockL;
    code = tsdbReadBlockL(pReader->pFileReader, pLastBlocks);
    if (code != TSDB_CODE_SUCCESS) {
      taosArrayDestroy(pIndexList);
      return code;
    }

    if (taosArrayGetSize(pIndexList) > 0 || taosArrayGetSize(pLastBlocks) > 0) {
      SArray* pQLastBlock = taosArrayInit(4, sizeof(SBlockL));

      code = doLoadFileBlock(pReader, pIndexList, pLastBlocks, pBlockNum, pQLastBlock);
      if (code != TSDB_CODE_SUCCESS) {
        taosArrayDestroy(pIndexList);
        taosArrayDestroy(pQLastBlock);
        return code;
      }

      bool found = (pBlockNum->numOfBlocks + pBlockNum->numOfLastBlocks > 0);
      if (found) {
        ASSERT(taosArrayGetSize(pQLastBlock) == pBlockNum->numOfLastBlocks);
        taosArrayClear(pLastBlocks);
        taosArrayAddAll(pLastBlocks, pQLastBlock);
      }

      // the temporary list is released whether or not this fileset has qualified blocks
      taosArrayDestroy(pQLastBlock);

      if (found) {
        break;
      }
    }

    // no blocks in current file, try next files
  }

  taosArrayDestroy(pIndexList);
  return TSDB_CODE_SUCCESS;
}

2297 2298 2299 2300 2301 2302 2303 2304 2305 2306 2307 2308 2309 2310 2311 2312 2313 2314 2315 2316 2317 2318 2319 2320 2321 2322 2323 2324 2325 2326 2327 2328 2329 2330 2331 2332 2333 2334 2335 2336 2337 2338 2339
/*
 * Load into pLastBlockReader->lastBlockData the last-file block (SBlockL) whose [minUid, maxUid] range contains uid.
 *
 * pLastBlockReader->currentBlockIndex is set to the index of that block in pBlockL. It is -1 when no block covers
 * uid; in that case nothing is loaded and TSDB_CODE_SUCCESS is returned. Failures while creating, initializing or
 * reading the block data are returned to the caller. After a read failure currentBlockIndex is reset to -1, so the
 * block is not reported as loaded.
 */
static int32_t doLoadRelatedLastBlock(SLastBlockReader* pLastBlockReader, uint64_t uid, STsdbReader* pReader) {
  SArray*  pBlocks = pLastBlockReader->pBlockL;
  SBlockL* pBlock = NULL;

  pLastBlockReader->currentBlockIndex = -1;

  // find the SBlockL whose uid range covers the requested table
  int32_t numOfBlocks = (int32_t)taosArrayGetSize(pBlocks);
  for (int32_t i = 0; i < numOfBlocks; ++i) {
    SBlockL* p = taosArrayGet(pBlocks, i);
    if (p->minUid <= uid && p->maxUid >= uid) {
      pLastBlockReader->currentBlockIndex = i;
      pBlock = p;
      break;
    }
  }

  if (pLastBlockReader->currentBlockIndex == -1) {
    return TSDB_CODE_SUCCESS;
  }

  int32_t code = tBlockDataCreate(&pLastBlockReader->lastBlockData);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  // a super-table reader (suid != 0) loads rows of all child tables, so no single uid is passed
  code = tBlockDataInit(&pLastBlockReader->lastBlockData, pReader->suid, pReader->suid ? 0 : uid, pReader->pSchema);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  code = tsdbReadLastBlock(pReader->pFileReader, pBlock, &pLastBlockReader->lastBlockData);
  if (code != TSDB_CODE_SUCCESS) {
    // the buffer does not hold this block's rows, so do not leave it marked as the current one
    pLastBlockReader->currentBlockIndex = -1;
    return code;
  }

  return TSDB_CODE_SUCCESS;
}

/*
 * Walk the tables of pStatus->pTableMap, resuming at pStatus->pTableIter (starting a new iteration when it is NULL).
 * For each table, load its related last-file block and try to build a result block from it. Return as soon as
 * pResBlock holds rows or an error occurs.
 *
 * Returns TSDB_CODE_SUCCESS with zero rows once the table iteration is exhausted (pTableIter is NULL then).
 */
static int32_t doLoadLastBlockSequentially(STsdbReader* pReader) {
  SReaderStatus*    pStatus = &pReader->status;
  SLastBlockReader* pLastReader = pStatus->fileIter.pLastBlockReader;

  if (pStatus->pTableIter == NULL) {
    pStatus->pTableIter = taosHashIterate(pStatus->pTableMap, NULL);
  }

  while (pStatus->pTableIter != NULL) {
    STableBlockScanInfo* pTableInfo = pStatus->pTableIter;

    // load the last data block of current table
    // todo opt perf by avoiding load last block repeatly
    int32_t code = doLoadRelatedLastBlock(pLastReader, pTableInfo->uid, pReader);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    initLastBlockReader(pLastReader, pTableInfo->uid, &pTableInfo->indexInBlockL);

    // the cursor must be moved first when it is unset (-1) or sits at the end of the loaded block
    bool needAdvance = (pTableInfo->indexInBlockL == -1) ||
                       (pTableInfo->indexInBlockL == pLastReader->lastBlockData.nRow);
    bool hasRows = needAdvance ? nextRowInLastBlock(pLastReader) : true;

    if (hasRows) {
      code = doBuildDataBlock(pReader);
      if (code != TSDB_CODE_SUCCESS) {
        return code;
      }

      if (pReader->pResBlock->info.rows > 0) {
        return TSDB_CODE_SUCCESS;
      }
    }

    // current table has no (more) rows in the last block, try the next table
    pStatus->pTableIter = taosHashIterate(pStatus->pTableMap, pStatus->pTableIter);
  }

  return TSDB_CODE_SUCCESS;
}

2388 2389 2390
/*
 * Build the next result block for the current position of the file block iterator.
 *
 * - No current file block: rows come from the last block file only (buildComposedDataBlock).
 * - fileBlockShouldLoad() says the block must be loaded: load it into pStatus->fileBlockData and merge it with the
 *   other sources (buildComposedDataBlock).
 * - Buffered rows fall into the gap before the block (bufferDataInFileBlockGap): emit those rows up to the block's
 *   boundary key.
 * - Otherwise the whole file block is returned as is and marked as fully dumped.
 */
static int32_t doBuildDataBlock(STsdbReader* pReader) {
  SReaderStatus*      pStatus = &pReader->status;
  SDataBlockIter*     pBlockIter = &pStatus->blockIter;
  SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(pBlockIter);
  SLastBlockReader*   pLastBlockReader = pStatus->fileIter.pLastBlockReader;

  if (pBlockInfo == NULL) {  // build data block from last data file
    ASSERT(pBlockIter->numOfBlocks == 0);
    return buildComposedDataBlock(pReader);
  }

  // NOTE(review): the taosHashGet result is not checked for NULL before use
  STableBlockScanInfo* pScanInfo = taosHashGet(pStatus->pTableMap, &pBlockInfo->uid, sizeof(pBlockInfo->uid));
  SBlock*              pBlock = getCurrentBlock(pBlockIter);
  TSDBKEY              bufKey = getCurrentKeyInBuf(pBlockIter, pReader);

  // load the last data block of current table
  int32_t code = doLoadRelatedLastBlock(pLastBlockReader, pScanInfo->uid, pReader);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  initLastBlockReader(pLastBlockReader, pBlockInfo->uid, &pScanInfo->indexInBlockL);

  if (fileBlockShouldLoad(pReader, pBlockInfo, pBlock, pScanInfo, bufKey, pLastBlockReader)) {
    tBlockDataReset(&pStatus->fileBlockData);

    code = tBlockDataInit(&pStatus->fileBlockData, pReader->suid, pScanInfo->uid, pReader->pSchema);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    code = doLoadFileBlockData(pReader, pBlockIter, &pStatus->fileBlockData);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    // build composed data block
    return buildComposedDataBlock(pReader);
  }

  if (bufferDataInFileBlockGap(pReader->order, bufKey, pBlock)) {
    // data in memory that are earlier than current file block
    // todo rows in buffer should be less than the file block in asc, greater than file block in desc
    int64_t boundary = ASCENDING_TRAVERSE(pReader->order) ? pBlock->minKey.ts : pBlock->maxKey.ts;
    return buildDataBlockFromBuf(pReader, pScanInfo, boundary);
  }

  // whole block is required, return it directly
  SDataBlockInfo* pResInfo = &pReader->pResBlock->info;
  pResInfo->rows = pBlock->nRow;
  pResInfo->uid = pScanInfo->uid;
  pResInfo->window = (STimeWindow){.skey = pBlock->minKey.ts, .ekey = pBlock->maxKey.ts};

  setComposedBlockFlag(pReader, false);
  setBlockAllDumped(&pStatus->fBlockDumpInfo, pBlock, pReader->order);

  return TSDB_CODE_SUCCESS;
}

H
Haojun Liao 已提交
2448
/*
 * Build the next result block purely from in-memory data by visiting the tables of pStatus->pTableMap in turn,
 * resuming at pStatus->pTableIter (a new iteration is started when it is NULL).
 *
 * Each table is scanned with an unbounded end key (INT64_MAX ascending, INT64_MIN descending). Returns as soon as a
 * table produced rows, on error, or with zero rows once every table is exhausted.
 */
static int32_t buildBlockFromBufferSequentially(STsdbReader* pReader) {
  SReaderStatus* pStatus = &pReader->status;
  int64_t        unboundedEndKey = ASCENDING_TRAVERSE(pReader->order) ? INT64_MAX : INT64_MIN;

  if (pStatus->pTableIter == NULL) {
    pStatus->pTableIter = taosHashIterate(pStatus->pTableMap, NULL);
  }

  while (pStatus->pTableIter != NULL) {
    STableBlockScanInfo* pTableInfo = pStatus->pTableIter;
    initMemDataIterator(pTableInfo, pReader);

    int32_t code = buildDataBlockFromBuf(pReader, pTableInfo, unboundedEndKey);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    if (pReader->pResBlock->info.rows > 0) {
      return TSDB_CODE_SUCCESS;
    }

    // current table is exhausted, let's try the next table
    pStatus->pTableIter = taosHashIterate(pStatus->pTableMap, pStatus->pTableIter);
  }

  return TSDB_CODE_SUCCESS;
}

2480
// set the correct start position in case of the first/last file block, according to the query time window
2481
/*
 * Reset the file block dump info for the iterator's current block. All rows of the block are pending; the cursor
 * starts at row 0 in ascending order and at the last row in descending order.
 */
static void initBlockDumpInfo(STsdbReader* pReader, SDataBlockIter* pBlockIter) {
  SBlock*             pCurBlock = getCurrentBlock(pBlockIter);
  SFileBlockDumpInfo* pDump = &pReader->status.fBlockDumpInfo;
  bool                ascOrder = ASCENDING_TRAVERSE(pReader->order);

  pDump->totalRows = pCurBlock->nRow;
  pDump->allDumped = false;
  pDump->rowIndex = ascOrder ? 0 : pCurBlock->nRow - 1;
}

2493
/*
 * Move to the next fileset that has data for this reader and prepare the block iterator for it.
 *
 * When no fileset is left, pReader->status.loadFromFile is set to false and the current code is returned. Otherwise
 * the block iterator is built over the fileset's file data blocks, or left empty (numOfBlocks == 0) when the fileset
 * only has last blocks. Then the dump info is positioned for the scan order. Errors from moveToNextFile or
 * initBlockIterator are returned immediately.
 */
static int32_t initForFirstBlockInFile(STsdbReader* pReader, SDataBlockIter* pBlockIter) {
  SBlockNumber num = {0};

  int32_t code = moveToNextFile(pReader, &num);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  // all data files are consumed, try data in buffer
  if (num.numOfBlocks + num.numOfLastBlocks == 0) {
    pReader->status.loadFromFile = false;
    return code;
  }

  // initialize the block iterator for a new fileset
  if (num.numOfBlocks > 0) {
    code = initBlockIterator(pReader, pBlockIter, num.numOfBlocks);
    if (code != TSDB_CODE_SUCCESS) {
      // do not position the dump info on an iterator that failed to initialize
      return code;
    }
  } else {
    // only last blocks exist in this fileset
    pBlockIter->numOfBlocks = 0;
  }

  // set the correct start position according to the query time window
  // NOTE(review): with numOfBlocks == 0 this still reads getCurrentBlock(); confirm that block is meaningful here
  initBlockDumpInfo(pReader, pBlockIter);
  return code;
}

2519
/*
 * Tell whether the current file block has been started but not finished: it is not fully dumped and the cursor has
 * moved away from its starting row (row 0 ascending, the last row descending).
 */
static bool fileBlockPartiallyRead(SFileBlockDumpInfo* pDumpInfo, bool asc) {
  if (pDumpInfo->allDumped) {
    return false;
  }

  if (asc) {
    return pDumpInfo->rowIndex > 0;
  }

  return pDumpInfo->rowIndex < (pDumpInfo->totalRows - 1);
}

2524
/*
 * Produce the next result block from the data files.
 *
 * The reader alternates between two phases:
 *  - last-block phase (scanLastBlocks == true): rows are taken table by table from the last block file. When the
 *    tables are exhausted, the next fileset is opened; if that fileset has no file data blocks, the phase repeats.
 *  - file-block phase: the file data blocks of the current fileset are consumed in iterator order. A partially read
 *    block is continued; a fully dumped one moves the iterator forward. When the blocks run out, the reader switches
 *    to the last-block phase if the fileset has last blocks, or otherwise opens the next fileset.
 *
 * Returns as soon as pResBlock holds rows, on error, or when initForFirstBlockInFile reports that no more filesets
 * need to be read (loadFromFile == false).
 */
static int32_t buildBlockFromFiles(STsdbReader* pReader) {
  SReaderStatus*  pStatus = &pReader->status;
  SDataBlockIter* pBlockIter = &pStatus->blockIter;
  bool            asc = ASCENDING_TRAVERSE(pReader->order);
  int32_t         code = TSDB_CODE_SUCCESS;

  // rows come from the last block file while there are no file data blocks to walk
  bool scanLastBlocks = (pBlockIter->numOfBlocks == 0);

  while (1) {
    if (scanLastBlocks) {
      code = doLoadLastBlockSequentially(pReader);
      if (code != TSDB_CODE_SUCCESS) {
        return code;
      }

      if (pReader->pResBlock->info.rows > 0) {
        return TSDB_CODE_SUCCESS;
      }

      // all data blocks are checked in this last block file, now let's try the next file
      if (pStatus->pTableIter == NULL) {
        code = initForFirstBlockInFile(pReader, pBlockIter);

        // error happens or all the data files are completely checked
        if ((code != TSDB_CODE_SUCCESS) || !pStatus->loadFromFile) {
          return code;
        }

        // this file does not have data files, let's start check the last block file if exists
        if (pBlockIter->numOfBlocks == 0) {
          continue;
        }
      }

      scanLastBlocks = false;
      code = doBuildDataBlock(pReader);
    } else {
      SFileBlockDumpInfo* pDumpInfo = &pStatus->fBlockDumpInfo;

      if (fileBlockPartiallyRead(pDumpInfo, asc)) {  // file data block is partially loaded
        code = buildComposedDataBlock(pReader);
      } else {
        // current block are exhausted, try the next file block
        if (pDumpInfo->allDumped) {
          if (blockIteratorNext(&pStatus->blockIter)) {
            // check for the next block in the block accessed order list
            initBlockDumpInfo(pReader, pBlockIter);
          } else if (taosArrayGetSize(pStatus->fileIter.pLastBlockReader->pBlockL) > 0) {
            // data blocks in current file are exhausted, dump the last blocks of this file next
            pBlockIter->numOfBlocks = 0;
            taosArrayClear(pBlockIter->blockList);
            tBlockDataReset(&pStatus->fileBlockData);
            scanLastBlocks = true;
            continue;
          } else {
            code = initForFirstBlockInFile(pReader, pBlockIter);

            // error happens or all the data files are completely checked
            if ((code != TSDB_CODE_SUCCESS) || !pStatus->loadFromFile) {
              return code;
            }

            // this file does not have blocks, let's start check the last block file
            if (pBlockIter->numOfBlocks == 0) {
              scanLastBlocks = true;
              continue;
            }
          }
        }

        code = doBuildDataBlock(pReader);
      }
    }

    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    if (pReader->pResBlock->info.rows > 0) {
      return TSDB_CODE_SUCCESS;
    }
  }
}
H
refact  
Hongze Cheng 已提交
2611

2612 2613
/*
 * Choose the tsdb instance to query.
 *
 * For a non-rsma vnode this is always VND_TSDB(pVnode). For an rsma vnode the retention levels are walked from
 * level 0 until one whose keep window (relative to the current time) reaches back to winSKey. A level with
 * keep <= 0 ends the list and falls back one level. The chosen level is written to *pLevel; any level beyond L1
 * maps to RSMA2.
 */
static STsdb* getTsdbByRetentions(SVnode* pVnode, TSKEY winSKey, SRetention* retentions, const char* idStr,
                                  int8_t* pLevel) {
  if (!VND_IS_RSMA(pVnode)) {
    return VND_TSDB(pVnode);
  }

  int8_t  level = 0;
  int64_t now = taosGetTimestamp(pVnode->config.tsdbCfg.precision);

  while (level < TSDB_RETENTION_MAX) {
    SRetention* pRetention = &retentions[level];

    if (pRetention->keep <= 0) {
      // an unset retention ends the list: use the previous level
      if (level > 0) {
        --level;
      }
      break;
    }

    if ((now - pRetention->keep) <= winSKey) {
      break;  // the query window starts inside this level's keep range
    }

    ++level;
  }

  const char* str = (idStr != NULL) ? idStr : "";
  int8_t      selected;
  STsdb*      pTsdb;

  if (level == TSDB_RETENTION_L0) {
    selected = TSDB_RETENTION_L0;
    pTsdb = VND_RSMA0(pVnode);
  } else if (level == TSDB_RETENTION_L1) {
    selected = TSDB_RETENTION_L1;
    pTsdb = VND_RSMA1(pVnode);
  } else {
    selected = TSDB_RETENTION_L2;
    pTsdb = VND_RSMA2(pVnode);
  }

  *pLevel = selected;
  tsdbDebug("vgId:%d, rsma level %d is selected to query %s", TD_VID(pVnode), selected, str);
  return pTsdb;
}

H
Haojun Liao 已提交
2652
/*
 * Compute the data-version range a query may see.
 *
 * minVer is pCond->startVersion, or 0 when it is -1. maxVer is pCond->endVersion capped at the vnode's applied
 * version; an endVersion of -1 (not specified by the user) means the applied version. The level argument is unused.
 */
SVersionRange getQueryVerRange(SVnode* pVnode, SQueryTableDataCond* pCond, int8_t level) {
  SVersionRange range = {0};

  range.minVer = pCond->startVersion;
  if (pCond->startVersion == -1) {
    range.minVer = 0;
  }

  bool useApplied = (pCond->endVersion == -1) || (pCond->endVersion > pVnode->state.applied);
  range.maxVer = useApplied ? pVnode->state.applied : pCond->endVersion;

  return range;
}

H
Hongze Cheng 已提交
2666 2667 2668 2669
// // todo not unref yet, since it is not support multi-group interpolation query
// static UNUSED_FUNC void changeQueryHandleForInterpQuery(STsdbReader* pHandle) {
//   // filter the queried time stamp in the first place
//   STsdbReader* pTsdbReadHandle = (STsdbReader*)pHandle;
H
refact  
Hongze Cheng 已提交
2670

H
Hongze Cheng 已提交
2671 2672
//   // starts from the buffer in case of descending timestamp order check data blocks
//   size_t numOfTables = taosArrayGetSize(pTsdbReadHandle->pTableCheckInfo);
H
refact  
Hongze Cheng 已提交
2673

H
Hongze Cheng 已提交
2674 2675
//   int32_t i = 0;
//   while (i < numOfTables) {
H
Haojun Liao 已提交
2676
//     STableBlockScanInfo* pCheckInfo = taosArrayGet(pTsdbReadHandle->pTableCheckInfo, i);
H
refact  
Hongze Cheng 已提交
2677

H
Hongze Cheng 已提交
2678 2679 2680 2681 2682 2683 2684 2685 2686 2687 2688 2689 2690 2691
//     // the first qualified table for interpolation query
//     //    if ((pTsdbReadHandle->window.skey <= pCheckInfo->pTableObj->lastKey) &&
//     //        (pCheckInfo->pTableObj->lastKey != TSKEY_INITIAL_VAL)) {
//     //      break;
//     //    }

//     i++;
//   }

//   // there are no data in all the tables
//   if (i == numOfTables) {
//     return;
//   }

H
Haojun Liao 已提交
2692
//   STableBlockScanInfo info = *(STableBlockScanInfo*)taosArrayGet(pTsdbReadHandle->pTableCheckInfo, i);
H
Hongze Cheng 已提交
2693 2694 2695 2696 2697 2698
//   taosArrayClear(pTsdbReadHandle->pTableCheckInfo);

//   info.lastKey = pTsdbReadHandle->window.skey;
//   taosArrayPush(pTsdbReadHandle->pTableCheckInfo, &info);
// }

2699
/*
 * Check whether the row identified by pKey is removed by the delete skyline pDelList.
 *
 * The skyline is a list of TSDBKEY points; the scan below relies on them being ordered by ts. A key counts as dropped
 * when it lies between two consecutive points p and q (p->ts <= key.ts <= q->ts) and p->version >= key.version.
 * *index is the caller's cursor into the skyline. It moves forward in ascending order and backward in descending
 * order, so successive calls with advancing keys resume where the previous one stopped.
 *
 * Returns false when pDelList is NULL or empty.
 */
bool hasBeenDropped(const SArray* pDelList, int32_t* index, TSDBKEY* pKey, int32_t order) {
  ASSERT(pKey != NULL);
  if (pDelList == NULL) {
    return false;
  }

  // signed count: keeps every comparison against *index in signed arithmetic
  int32_t num = (int32_t)taosArrayGetSize(pDelList);
  if (num == 0) {
    return false;
  }

  bool    asc = ASCENDING_TRAVERSE(order);
  int32_t step = asc ? 1 : -1;

  if (asc) {
    // a cursor outside [0, num - 1) is treated as being at the tail of the skyline
    if (*index < 0 || *index >= num - 1) {
      TSDBKEY* last = taosArrayGetLast(pDelList);
      ASSERT(pKey->ts >= last->ts);

      if (pKey->ts > last->ts) {
        return false;
      } else if (pKey->ts == last->ts) {
        if (num < 2) {  // a single point does not delimit a deletion range
          return false;
        }

        TSDBKEY* prev = taosArrayGet(pDelList, num - 2);
        return (prev->version >= pKey->version);
      }
    } else {
      TSDBKEY* pCurrent = taosArrayGet(pDelList, *index);
      TSDBKEY* pNext = taosArrayGet(pDelList, (*index) + 1);

      if (pKey->ts < pCurrent->ts) {
        return false;
      }

      if (pCurrent->ts <= pKey->ts && pNext->ts >= pKey->ts && pCurrent->version >= pKey->version) {
        return true;
      }

      while (pNext->ts <= pKey->ts && (*index) < num - 1) {
        (*index) += 1;

        if ((*index) < num - 1) {
          pCurrent = taosArrayGet(pDelList, *index);
          pNext = taosArrayGet(pDelList, (*index) + 1);

          // it is not a consecutive deletion range, ignore it
          if (pCurrent->version == 0 && pNext->version > 0) {
            continue;
          }

          if (pCurrent->ts <= pKey->ts && pNext->ts >= pKey->ts && pCurrent->version >= pKey->version) {
            return true;
          }
        }
      }

      return false;
    }
  } else {
    if (*index <= 0) {
      TSDBKEY* pFirst = taosArrayGet(pDelList, 0);

      if (pKey->ts < pFirst->ts) {
        return false;
      } else if (pKey->ts == pFirst->ts) {
        return pFirst->version >= pKey->version;
      } else {
        ASSERT(0);
      }
    } else {
      TSDBKEY* pCurrent = taosArrayGet(pDelList, *index);
      TSDBKEY* pPrev = taosArrayGet(pDelList, (*index) - 1);

      if (pKey->ts > pCurrent->ts) {
        return false;
      }

      if (pPrev->ts <= pKey->ts && pCurrent->ts >= pKey->ts && pPrev->version >= pKey->version) {
        return true;
      }

      while (pPrev->ts >= pKey->ts && (*index) > 1) {
        (*index) += step;

        if ((*index) >= 1) {
          pCurrent = taosArrayGet(pDelList, *index);
          pPrev = taosArrayGet(pDelList, (*index) - 1);

          // it is not a consecutive deletion range, ignore it
          if (pCurrent->version > 0 && pPrev->version == 0) {
            continue;
          }

          if (pPrev->ts <= pKey->ts && pCurrent->ts >= pKey->ts && pPrev->version >= pKey->version) {
            return true;
          }
        }
      }

      return false;
    }
  }

  return false;
}

/*
 * Return the row at the iterator's current position or later that lies inside the query window, has a version
 * within pReader->verRange and is not removed by pDelList.
 *
 * Returns NULL when no such row exists; pIter->hasVal is false in every case where NULL is returned. Leaving the
 * query window stops the scan (hasVal is cleared) without looking at further rows.
 */
TSDBROW* getValidRow(SIterInfo* pIter, const SArray* pDelList, STsdbReader* pReader) {
  if (!pIter->hasVal) {
    return NULL;
  }

  TSDBROW* pCandidate = tsdbTbDataIterGet(pIter->iter);
  TSDBKEY  candKey = {.ts = pCandidate->pTSRow->ts, .version = pCandidate->version};

  for (;;) {
    if (outOfTimeWindow(candKey.ts, &pReader->window)) {
      pIter->hasVal = false;
      return NULL;
    }

    // it is a valid data version; the delete list is only consulted for versions inside the range
    bool versionOk = (candKey.version <= pReader->verRange.maxVer) && (candKey.version >= pReader->verRange.minVer);
    if (versionOk && !hasBeenDropped(pDelList, &pIter->index, &candKey, pReader->order)) {
      return pCandidate;
    }

    pIter->hasVal = tsdbTbDataIterNext(pIter->iter);
    if (!pIter->hasVal) {
      return NULL;
    }

    pCandidate = tsdbTbDataIterGet(pIter->iter);
    candKey = TSDBROW_KEY(pCandidate);
  }
}
H
Hongze Cheng 已提交
2837

2838 2839
/*
 * Advance pIter and add every following valid in-memory row whose timestamp equals ts to pMerger, each with the
 * schema of its own schema version.
 *
 * Stops at the first position where the iterator is exhausted, getValidRow finds nothing, or the timestamp differs.
 * Always returns TSDB_CODE_SUCCESS.
 */
int32_t doMergeRowsInBuf(SIterInfo* pIter, uint64_t uid, int64_t ts, SArray* pDelList, SRowMerger* pMerger,
                         STsdbReader* pReader) {
  for (;;) {
    pIter->hasVal = tsdbTbDataIterNext(pIter->iter);
    if (!pIter->hasVal) {
      return TSDB_CODE_SUCCESS;
    }

    // data exists but may not be valid: move on to the next valid row, if any
    TSDBROW* pCandidate = getValidRow(pIter, pDelList, pReader);
    if (pCandidate == NULL) {
      return TSDB_CODE_SUCCESS;
    }

    TSDBKEY candKey = TSDBROW_KEY(pCandidate);
    if (candKey.ts != ts) {  // ts is not identical, quit
      return TSDB_CODE_SUCCESS;
    }

    STSchema* pRowSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pCandidate), pReader, uid);
    tRowMergerAdd(pMerger, pCandidate, pRowSchema);
  }
}

2865
static int32_t doMergeRowsInFileBlockImpl(SBlockData* pBlockData, int32_t rowIndex, int64_t key, SRowMerger* pMerger,
2866
                                          SVersionRange* pVerRange, int32_t step) {
2867 2868
  while (pBlockData->aTSKEY[rowIndex] == key && rowIndex < pBlockData->nRow && rowIndex >= 0) {
    if (pBlockData->aVersion[rowIndex] > pVerRange->maxVer || pBlockData->aVersion[rowIndex] < pVerRange->minVer) {
2869
      rowIndex += step;
2870 2871 2872 2873 2874 2875 2876 2877 2878 2879 2880 2881 2882 2883 2884 2885 2886
      continue;
    }

    TSDBROW fRow = tsdbRowFromBlockData(pBlockData, rowIndex);
    tRowMerge(pMerger, &fRow);
    rowIndex += step;
  }

  return rowIndex;
}

/* Outcome of checkForNeighborFileBlock(): whether the caller should look at one more neighbor file block. */
typedef enum {
  CHECK_FILEBLOCK_CONT = 1,  // the merge ran to the end of the neighbor block, check the next neighbor
  CHECK_FILEBLOCK_QUIT = 2,  // no overlapping neighbor, or the merge stopped inside the loaded block
} CHECK_FILEBLOCK_STATE;

static int32_t checkForNeighborFileBlock(STsdbReader* pReader, STableBlockScanInfo* pScanInfo, SBlock* pBlock,
2887 2888
                                         SFileDataBlockInfo* pFBlock, SRowMerger* pMerger, int64_t key,
                                         CHECK_FILEBLOCK_STATE* state) {
2889
  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
2890
  SBlockData*         pBlockData = &pReader->status.fileBlockData;
2891

2892
  *state = CHECK_FILEBLOCK_QUIT;
2893
  int32_t step = ASCENDING_TRAVERSE(pReader->order) ? 1 : -1;
2894 2895 2896

  int32_t nextIndex = -1;
  SBlock* pNeighborBlock = getNeighborBlockOfSameTable(pFBlock, pScanInfo, &nextIndex, pReader->order);
2897
  if (pNeighborBlock == NULL) {  // do nothing
2898 2899 2900 2901
    return 0;
  }

  bool overlap = overlapWithNeighborBlock(pBlock, pNeighborBlock, pReader->order);
2902 2903
  taosMemoryFree(pNeighborBlock);

2904
  if (overlap) {  // load next block
2905
    SReaderStatus*  pStatus = &pReader->status;
2906 2907
    SDataBlockIter* pBlockIter = &pStatus->blockIter;

2908
    // 1. find the next neighbor block in the scan block list
2909
    SFileDataBlockInfo fb = {.uid = pFBlock->uid, .tbBlockIdx = nextIndex};
2910
    int32_t            neighborIndex = findFileBlockInfoIndex(pBlockIter, &fb);
2911

2912
    // 2. remove it from the scan block list
2913
    setFileBlockActiveInBlockIter(pBlockIter, neighborIndex, step);
2914

2915
    // 3. load the neighbor block, and set it to be the currently accessed file data block
H
Haojun Liao 已提交
2916
    tBlockDataReset(&pStatus->fileBlockData);
2917
    int32_t code = doLoadFileBlockData(pReader, pBlockIter, &pStatus->fileBlockData);
2918 2919 2920 2921
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

2922
    // 4. check the data values
2923 2924 2925 2926
    initBlockDumpInfo(pReader, pBlockIter);

    pDumpInfo->rowIndex =
        doMergeRowsInFileBlockImpl(pBlockData, pDumpInfo->rowIndex, key, pMerger, &pReader->verRange, step);
H
Haojun Liao 已提交
2927
    if (pDumpInfo->rowIndex >= pDumpInfo->totalRows) {
2928 2929 2930 2931 2932 2933 2934
      *state = CHECK_FILEBLOCK_CONT;
    }
  }

  return TSDB_CODE_SUCCESS;
}

2935 2936
/*
 * Merge into pMerger the rows of the file data that share the timestamp of the current dump row.
 *
 * The cursor first moves past the current row and consumes the following rows with the same timestamp in
 * pBlockData. If that run reaches the end of the block in the scan direction, overlapping neighbor blocks of the same
 * table are loaded and merged one after another until checkForNeighborFileBlock reports CHECK_FILEBLOCK_QUIT.
 * Errors from loading a neighbor block are returned; otherwise TSDB_CODE_SUCCESS.
 */
int32_t doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pScanInfo, STsdbReader* pReader,
                                SRowMerger* pMerger) {
  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;

  bool    asc = ASCENDING_TRAVERSE(pReader->order);
  int64_t key = pBlockData->aTSKEY[pDumpInfo->rowIndex];
  int32_t step = asc ? 1 : -1;

  pDumpInfo->rowIndex += step;
  if ((pDumpInfo->rowIndex <= pBlockData->nRow - 1 && asc) || (pDumpInfo->rowIndex >= 0 && !asc)) {
    pDumpInfo->rowIndex =
        doMergeRowsInFileBlockImpl(pBlockData, pDumpInfo->rowIndex, key, pMerger, &pReader->verRange, step);
  }

  // all rows are consumed, let's try next file block
  if ((pDumpInfo->rowIndex >= pBlockData->nRow && asc) || (pDumpInfo->rowIndex < 0 && !asc)) {
    while (1) {
      CHECK_FILEBLOCK_STATE st = CHECK_FILEBLOCK_QUIT;

      SFileDataBlockInfo* pFileBlockInfo = getCurrentBlockInfo(&pReader->status.blockIter);
      SBlock*             pCurrentBlock = getCurrentBlock(&pReader->status.blockIter);

      int32_t code = checkForNeighborFileBlock(pReader, pScanInfo, pCurrentBlock, pFileBlockInfo, pMerger, key, &st);
      if (code != TSDB_CODE_SUCCESS) {
        return code;
      }

      if (st == CHECK_FILEBLOCK_QUIT) {
        break;
      }
    }
  }

  return TSDB_CODE_SUCCESS;
}

2966
// todo check if the rows are dropped or not
2967 2968 2969 2970
/*
 * Advance the last-block cursor and merge every following row whose timestamp equals ts into pMerger. Stops at the
 * first row with a different timestamp or when the last block has no more rows. Rows are not checked against any
 * delete list here. Always returns TSDB_CODE_SUCCESS.
 */
int32_t doMergeRowsInLastBlock(SLastBlockReader* pLastBlockReader, int64_t ts, SRowMerger* pMerger) {
  while (nextRowInLastBlock(pLastBlockReader)) {
    if (getCurrentKeyInLastBlock(pLastBlockReader) != ts) {
      break;
    }

    TSDBROW row = tsdbRowFromBlockData(&pLastBlockReader->lastBlockData, *pLastBlockReader->rowIndex);
    tRowMerge(pMerger, &row);
  }

  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
2981
void doMergeMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter, SArray* pDelList, STSRow** pTSRow,
2982
                      STsdbReader* pReader, bool* freeTSRow) {
H
Haojun Liao 已提交
2983
  TSDBROW* pNextRow = NULL;
2984
  TSDBROW  current = *pRow;
2985

2986 2987
  {  // if the timestamp of the next valid row has a different ts, return current row directly
    pIter->hasVal = tsdbTbDataIterNext(pIter->iter);
2988

2989 2990 2991 2992 2993
    if (!pIter->hasVal) {
      *pTSRow = current.pTSRow;
      *freeTSRow = false;
      return;
    } else {  // has next point in mem/imem
H
Haojun Liao 已提交
2994
      pNextRow = getValidRow(pIter, pDelList, pReader);
2995 2996 2997 2998 2999 3000
      if (pNextRow == NULL) {
        *pTSRow = current.pTSRow;
        *freeTSRow = false;
        return;
      }

H
Haojun Liao 已提交
3001
      if (current.pTSRow->ts != pNextRow->pTSRow->ts) {
3002 3003 3004 3005
        *pTSRow = current.pTSRow;
        *freeTSRow = false;
        return;
      }
3006
    }
3007 3008
  }

3009 3010
  SRowMerger merge = {0};

3011
  // get the correct schema for data in memory
H
Haojun Liao 已提交
3012
  STSchema* pTSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(&current), pReader, uid);
H
Haojun Liao 已提交
3013

3014 3015
  if (pReader->pSchema == NULL) {
    pReader->pSchema = pTSchema;
3016
  }
H
Haojun Liao 已提交
3017

H
Haojun Liao 已提交
3018 3019 3020 3021 3022 3023
  tRowMergerInit2(&merge, pReader->pSchema, &current, pTSchema);

  STSchema* pTSchema1 = doGetSchemaForTSRow(TSDBROW_SVERSION(pNextRow), pReader, uid);
  tRowMergerAdd(&merge, pNextRow, pTSchema1);

  doMergeRowsInBuf(pIter, uid, current.pTSRow->ts, pDelList, &merge, pReader);
3024
  tRowMergerGetRow(&merge, pTSRow);
3025
  tRowMergerClear(&merge);
M
Minglei Jin 已提交
3026

3027
  *freeTSRow = true;
3028 3029
}

3030 3031
/*
 * Merge the mem row pRow and the imem row piRow, which share the same timestamp, together with every following
 * row of that timestamp in both iterators, into one newly built row returned through *pTSRow.
 *
 * Ascending order folds imem first and mem on top of it; descending order does the reverse.
 * The caller owns *pTSRow afterwards (tsdbGetNextRowInMem marks it with freeTSRow = true).
 */
void doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader,
                        STSRow** pTSRow) {
  SRowMerger merge = {0};

  TSDBKEY k = TSDBROW_KEY(pRow);
  TSDBKEY ik = TSDBROW_KEY(piRow);

  // NOTE(review): both branches take the schema from the mem row's sversion, even when the merger is
  // initialized with the imem row; confirm this is intended.
  STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid);

  if (ASCENDING_TRAVERSE(pReader->order)) {  // ascending order imem --> mem
    tRowMergerInit(&merge, piRow, pSchema);
    doMergeRowsInBuf(&pBlockScanInfo->iiter, pBlockScanInfo->uid, ik.ts, pBlockScanInfo->delSkyline, &merge, pReader);

    tRowMerge(&merge, pRow);
    doMergeRowsInBuf(&pBlockScanInfo->iter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge, pReader);
  } else {  // descending order mem --> imem
    tRowMergerInit(&merge, pRow, pSchema);
    doMergeRowsInBuf(&pBlockScanInfo->iter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge, pReader);

    tRowMerge(&merge, piRow);
    // the imem iterator is advanced with its own key (equal to k.ts here, but stated explicitly)
    doMergeRowsInBuf(&pBlockScanInfo->iiter, pBlockScanInfo->uid, ik.ts, pBlockScanInfo->delSkyline, &merge, pReader);
  }

  tRowMergerGetRow(&merge, pTSRow);

  // release the merger's working state, as doMergeMultiRows does after building its row;
  // without this every equal-key mem/imem merge leaked the merger buffers
  tRowMergerClear(&merge);
}

3058 3059
/*
 * Fetch the next in-memory row of one table, taking rows from the mem and imem skip lists in traverse order
 * and stopping short of endKey (rows at or beyond endKey in the traverse direction are not returned).
 *
 * On return *pTSRow is the row, or stays untouched (the caller passes NULL) when no row qualifies.
 * *freeTSRow tells the caller whether *pTSRow was newly built by a merge and must be freed.
 * Always returns TSDB_CODE_SUCCESS.
 */
int32_t tsdbGetNextRowInMem(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader, STSRow** pTSRow, int64_t endKey,
                            bool* freeTSRow) {
  TSDBROW* pRow = getValidRow(&pBlockScanInfo->iter, pBlockScanInfo->delSkyline, pReader);
  TSDBROW* piRow = getValidRow(&pBlockScanInfo->iiter, pBlockScanInfo->delSkyline, pReader);
  SArray*  pDelList = pBlockScanInfo->delSkyline;
  uint64_t uid = pBlockScanInfo->uid;
  bool     asc = ASCENDING_TRAVERSE(pReader->order);

  // discard candidates at or beyond endKey in the traverse direction (NULL-checked before taking the key)
  if (pBlockScanInfo->iter.hasVal && pRow != NULL) {
    TSDBKEY k = TSDBROW_KEY(pRow);
    if ((k.ts >= endKey && asc) || (k.ts <= endKey && !asc)) {
      pRow = NULL;
    }
  }

  if (pBlockScanInfo->iiter.hasVal && piRow != NULL) {
    TSDBKEY ik = TSDBROW_KEY(piRow);
    if ((ik.ts >= endKey && asc) || (ik.ts <= endKey && !asc)) {
      piRow = NULL;
    }
  }

  bool hasMemRow = pBlockScanInfo->iter.hasVal && pRow != NULL;
  bool hasIMemRow = pBlockScanInfo->iiter.hasVal && piRow != NULL;

  if (hasMemRow && hasIMemRow) {
    TSDBKEY k = TSDBROW_KEY(pRow);
    TSDBKEY ik = TSDBROW_KEY(piRow);

    if (ik.ts == k.ts) {
      doMergeMemIMemRows(pRow, piRow, pBlockScanInfo, pReader, pTSRow);
      *freeTSRow = true;
    } else {
      // the row that comes first in the traverse order wins: the smaller key when ascending, the larger when
      // descending (previously the smaller key was always chosen, which broke descending scans)
      bool imemFirst = asc ? (ik.ts < k.ts) : (ik.ts > k.ts);
      if (imemFirst) {
        doMergeMultiRows(piRow, uid, &pBlockScanInfo->iiter, pDelList, pTSRow, pReader, freeTSRow);
      } else {
        doMergeMultiRows(pRow, uid, &pBlockScanInfo->iter, pDelList, pTSRow, pReader, freeTSRow);
      }
    }

    return TSDB_CODE_SUCCESS;
  }

  if (hasMemRow) {
    doMergeMultiRows(pRow, uid, &pBlockScanInfo->iter, pDelList, pTSRow, pReader, freeTSRow);
    return TSDB_CODE_SUCCESS;
  }

  if (hasIMemRow) {
    doMergeMultiRows(piRow, uid, &pBlockScanInfo->iiter, pDelList, pTSRow, pReader, freeTSRow);
    return TSDB_CODE_SUCCESS;
  }

  return TSDB_CODE_SUCCESS;
}

H
Haojun Liao 已提交
3109
/*
 * Append one in-memory row (pTSRow) as a new row of pBlock.
 *
 * The output columns and the row's schema columns are walked together by column id. Output columns missing
 * from the schema are filled with NULL; schema columns that are not requested are skipped.
 * Always returns TSDB_CODE_SUCCESS.
 */
int32_t doAppendRowFromTSRow(SSDataBlock* pBlock, STsdbReader* pReader, STSRow* pTSRow, uint64_t uid) {
  int32_t             rowIndex = pBlock->info.rows;
  int32_t             numOfOutputCols = (int32_t)taosArrayGetSize(pBlock->pDataBlock);
  SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo;
  STSchema*           pSchema = doGetSchemaForTSRow(pTSRow->sver, pReader, uid);
  SColVal             colVal = {0};

  int32_t outIdx = 0;
  int32_t schemaIdx = 0;

  // a requested primary timestamp is the first output column and comes straight from the row header
  SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, outIdx);
  if (pCol->info.colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
    colDataAppend(pCol, rowIndex, (const char*)&pTSRow->ts, false);
    outIdx++;
  }

  while (outIdx < numOfOutputCols && schemaIdx < pSchema->numOfCols) {
    pCol = taosArrayGet(pBlock->pDataBlock, outIdx);
    col_id_t outColId = pCol->info.colId;

    if (outColId > pSchema->columns[schemaIdx].colId) {
      // schema column not requested by the query
      schemaIdx++;
      continue;
    }

    if (outColId < pSchema->columns[schemaIdx].colId) {
      // requested column absent from this schema version
      colDataAppendNULL(pCol, rowIndex);
    } else {
      tTSRowGetVal(pTSRow, pSchema, schemaIdx, &colVal);
      doCopyColVal(pCol, rowIndex, outIdx, &colVal, pSupInfo);
      schemaIdx++;
    }
    outIdx++;
  }

  // remaining output columns do not exist in pSchema: pad them with NULL
  for (; outIdx < numOfOutputCols; ++outIdx) {
    colDataAppendNULL(taosArrayGet(pBlock->pDataBlock, outIdx), rowIndex);
  }

  pBlock->info.rows += 1;
  return TSDB_CODE_SUCCESS;
}

3153
/*
 * Append row rowIndex of the file block pBlockData as a new row of pResBlock.
 *
 * The output columns and the file block's columns are merged by column id, mirroring doAppendRowFromTSRow:
 *  - ids equal:               copy the value;
 *  - file column id smaller:  the file holds a column the query did not request, so skip it;
 *  - output column id smaller: the column is absent from the file block, so fill it with NULL.
 * Previously any mismatch filled the output column with NULL without skipping the file column. An extra file
 * column therefore caused real data in later columns to be reported as NULL.
 * Both column lists are assumed to be sorted by ascending column id (TODO confirm for pBlockData->aIdx).
 * Always returns TSDB_CODE_SUCCESS.
 */
int32_t doAppendRowFromFileBlock(SSDataBlock* pResBlock, STsdbReader* pReader, SBlockData* pBlockData, int32_t rowIndex) {
  int32_t i = 0, j = 0;
  int32_t outputRowIndex = pResBlock->info.rows;

  SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo;

  // a requested primary timestamp is the first output column and comes from the block's timestamp array
  SColumnInfoData* pColData = taosArrayGet(pResBlock->pDataBlock, i);
  if (pColData->info.colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
    colDataAppendInt64(pColData, outputRowIndex, &pBlockData->aTSKEY[rowIndex]);
    i += 1;
  }

  SColVal cv = {0};
  int32_t numOfInputCols = taosArrayGetSize(pBlockData->aIdx);
  int32_t numOfOutputCols = blockDataGetNumOfCols(pResBlock);

  while (i < numOfOutputCols && j < numOfInputCols) {
    SColumnInfoData* pCol = taosArrayGet(pResBlock->pDataBlock, i);
    SColData*        pData = tBlockDataGetColDataByIdx(pBlockData, j);

    if (pData->cid < pCol->info.colId) {
      // column stored in the file block but not requested: advance the input side only
      j += 1;
      continue;
    }

    if (pData->cid == pCol->info.colId) {
      tColDataGetValue(pData, rowIndex, &cv);
      doCopyColVal(pCol, outputRowIndex, i, &cv, pSupInfo);
      j += 1;
    } else {  // the specified column does not exist in file block, fill with null data
      colDataAppendNULL(pCol, outputRowIndex);
    }

    i += 1;
  }

  // input exhausted: the remaining requested columns are not in the file block
  while (i < numOfOutputCols) {
    SColumnInfoData* pCol = taosArrayGet(pResBlock->pDataBlock, i);
    colDataAppendNULL(pCol, outputRowIndex);
    i += 1;
  }

  pResBlock->info.rows += 1;
  return TSDB_CODE_SUCCESS;
}

3194 3195
/*
 * Fill pReader->pResBlock with rows of one table taken from the in-memory buffers (mem and imem).
 *
 * Rows are appended until no row before endKey is left, both iterators are drained, or the block holds
 * capacity rows. Merged rows are freed here once they have been copied into the block.
 * Always returns TSDB_CODE_SUCCESS.
 */
int32_t buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, int64_t endKey, int32_t capacity,
                                  STsdbReader* pReader) {
  SSDataBlock* pBlock = pReader->pResBlock;

  for (;;) {
    STSRow* pNextRow = NULL;
    bool    ownsRow = false;

    tsdbGetNextRowInMem(pBlockScanInfo, pReader, &pNextRow, endKey, &ownsRow);
    if (pNextRow == NULL) {
      break;  // nothing left before endKey
    }

    doAppendRowFromTSRow(pBlock, pReader, pNextRow, pBlockScanInfo->uid);
    if (ownsRow) {
      taosMemoryFree(pNextRow);
    }

    // stop once both buffers are drained or the block is full
    bool buffersDrained = !pBlockScanInfo->iter.hasVal && !pBlockScanInfo->iiter.hasVal;
    if (buffersDrained || pBlock->info.rows >= capacity) {
      break;
    }
  }

  ASSERT(pBlock->info.rows <= capacity);
  return TSDB_CODE_SUCCESS;
}
H
Hongze Cheng 已提交
3224

3225
// todo refactor, use arraylist instead
H
Hongze Cheng 已提交
3226
/*
 * Point the reader at a single table: the table map is emptied and a fresh scan info for uid is inserted.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY when taosHashPut fails. A duplicate key is impossible
 * right after the clear, so a failure is presumably an allocation failure.
 * Uses TSDB_CODE_SUCCESS like the rest of this reader, instead of the tdb module's TDB_CODE_SUCCESS.
 */
int32_t tsdbSetTableId(STsdbReader* pReader, int64_t uid) {
  ASSERT(pReader != NULL);
  taosHashClear(pReader->status.pTableMap);

  STableBlockScanInfo info = {.lastKey = 0, .uid = uid};
  if (taosHashPut(pReader->status.pTableMap, &info.uid, sizeof(uint64_t), &info, sizeof(info)) != 0) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
3235 3236 3237 3238 3239 3240
/*
 * Return the meta index handle of pMeta (via metaGetIdx), or NULL when pMeta is NULL.
 */
void* tsdbGetIdx(SMeta* pMeta) {
  void* pIdx = NULL;
  if (pMeta != NULL) {
    pIdx = metaGetIdx(pMeta);
  }
  return pIdx;
}
dengyihao's avatar
dengyihao 已提交
3241

dengyihao's avatar
dengyihao 已提交
3242 3243 3244 3245 3246 3247
/*
 * Return the inverted index handle of pMeta (via metaGetIvtIdx), or NULL when pMeta is NULL.
 */
void* tsdbGetIvtIdx(SMeta* pMeta) {
  void* pIvtIdx = NULL;
  if (pMeta != NULL) {
    pIvtIdx = metaGetIvtIdx(pMeta);
  }
  return pIvtIdx;
}
L
Liu Jicong 已提交
3248

H
Hongze Cheng 已提交
3249
/* Upper bound of the version range this reader was configured with (pReader->verRange.maxVer). */
uint64_t getReaderMaxVersion(STsdbReader* pReader) {
  uint64_t maxVer = pReader->verRange.maxVer;
  return maxVer;
}
3250

C
Cary Xu 已提交
3251

H
refact  
Hongze Cheng 已提交
3252
// ====================================== EXPOSED APIs ======================================
3253 3254
/*
 * Create the two one-row inner readers used by a TIMEWINDOW_RANGE_EXTERNAL query:
 *  - innerReader[0] scans away from the window start, against the query order (consumed as EXTERNAL_ROWS_PREV);
 *  - innerReader[1] scans from the window end in the query order (consumed as EXTERNAL_ROWS_NEXT).
 * pCond is rewritten temporarily for this. Both its window and its order are restored before returning, so the
 * caller's condition is left unchanged. Previously the order was not restored even for innerReader[1], which was
 * therefore created with the flipped order.
 * A slot whose creation fails is reset to NULL so that error cleanup never touches an invalid pointer.
 */
static int32_t doCreateExternalRangeReaders(SVnode* pVnode, SQueryTableDataCond* pCond, STsdbReader* pReader,
                                            const char* idstr) {
  STimeWindow w = pCond->twindows;
  int32_t     order = pCond->order;

  if (order == TSDB_ORDER_ASC) {
    pCond->twindows.ekey = w.skey;
    pCond->twindows.skey = INT64_MIN;
    pCond->order = TSDB_ORDER_DESC;
  } else {
    pCond->twindows.skey = w.ekey;
    pCond->twindows.ekey = INT64_MAX;
    pCond->order = TSDB_ORDER_ASC;
  }

  // here we only need one more row, so the capacity is set to be ONE.
  int32_t code = tsdbReaderCreate(pVnode, pCond, &pReader->innerReader[0], 1, idstr);
  if (code != TSDB_CODE_SUCCESS) {
    pReader->innerReader[0] = NULL;
  } else {
    if (order == TSDB_ORDER_ASC) {
      pCond->twindows.skey = w.ekey;
      pCond->twindows.ekey = INT64_MAX;
    } else {
      // NOTE(review): by symmetry with the ascending case this bound looks like it should be w.skey; confirm.
      pCond->twindows.skey = INT64_MIN;
      pCond->twindows.ekey = w.ekey;
    }
    pCond->order = order;

    code = tsdbReaderCreate(pVnode, pCond, &pReader->innerReader[1], 1, idstr);
    if (code != TSDB_CODE_SUCCESS) {
      pReader->innerReader[1] = NULL;
    }
  }

  pCond->twindows = w;
  pCond->order = order;
  return code;
}

/*
 * Create a reader over the tables in pTableList for the query condition pCond, and position it on its first
 * data file block (or on the in-memory buffers when there are no data files).
 *
 * On success *ppReader receives the reader. When the query window is empty, the reader is returned immediately
 * without a table map or read snapshot. On failure every resource acquired here is released (including the
 * inner readers), *ppReader is set to NULL and the error code is returned.
 * The error path logs with idstr: pReader may never have been created at that point, and the old code read
 * pReader->idStr both uninitialized and after closing the reader.
 */
int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, SArray* pTableList, STsdbReader** ppReader,
                       const char* idstr) {
  STsdbReader* pReader = NULL;

  int32_t code = tsdbReaderCreate(pVnode, pCond, ppReader, 4096, idstr);
  if (code != TSDB_CODE_SUCCESS) {
    goto _err;
  }

  // check for query time window
  pReader = *ppReader;
  if (isEmptyQueryTimeWindow(&pReader->window)) {
    tsdbDebug("%p query window not overlaps with the data set, no result returned, %s", pReader, pReader->idStr);
    return TSDB_CODE_SUCCESS;
  }

  if (pCond->type == TIMEWINDOW_RANGE_EXTERNAL) {
    code = doCreateExternalRangeReaders(pVnode, pCond, pReader, idstr);
    if (code != TSDB_CODE_SUCCESS) {
      goto _err;
    }
  }

  // reader schema: from pReader->suid when a super table id is given, otherwise from the first listed table
  if (pCond->suid != 0) {
    pReader->pSchema = metaGetTbTSchema(pReader->pTsdb->pVnode->pMeta, pReader->suid, -1);
  } else if (taosArrayGetSize(pTableList) > 0) {
    STableKeyInfo* pKey = taosArrayGet(pTableList, 0);
    pReader->pSchema = metaGetTbTSchema(pReader->pTsdb->pVnode->pMeta, pKey->uid, -1);
  }

  int32_t numOfTables = taosArrayGetSize(pTableList);
  pReader->status.pTableMap = createDataBlockScanInfo(pReader, pTableList->pData, numOfTables);
  if (pReader->status.pTableMap == NULL) {
    code = TSDB_CODE_TDB_OUT_OF_MEMORY;
    goto _err;
  }

  code = tsdbTakeReadSnap(pReader->pTsdb, &pReader->pReadSnap);
  if (code != TSDB_CODE_SUCCESS) {
    goto _err;
  }

  if (pReader->type == TIMEWINDOW_RANGE_CONTAINED) {
    SDataBlockIter* pBlockIter = &pReader->status.blockIter;

    initFilesetIterator(&pReader->status.fileIter, pReader->pReadSnap->fs.aDFileSet, pReader);
    resetDataBlockIterator(&pReader->status.blockIter, pReader->order, pReader->status.pTableMap);

    // no data in files, let's try buffer in memory
    if (pReader->status.fileIter.numOfFiles == 0) {
      pReader->status.loadFromFile = false;
    } else {
      code = initForFirstBlockInFile(pReader, pBlockIter);
      if (code != TSDB_CODE_SUCCESS) {
        goto _err;
      }
    }
  } else {
    // external range: scanning starts with innerReader[0], which iterates over this reader's table map
    STsdbReader*    pPrevReader = pReader->innerReader[0];
    SDataBlockIter* pBlockIter = &pPrevReader->status.blockIter;

    code = tsdbTakeReadSnap(pPrevReader->pTsdb, &pPrevReader->pReadSnap);
    if (code != TSDB_CODE_SUCCESS) {
      goto _err;
    }

    initFilesetIterator(&pPrevReader->status.fileIter, pPrevReader->pReadSnap->fs.aDFileSet, pPrevReader);
    resetDataBlockIterator(&pPrevReader->status.blockIter, pPrevReader->order, pReader->status.pTableMap);

    // no data in files, let's try buffer in memory
    if (pPrevReader->status.fileIter.numOfFiles == 0) {
      pPrevReader->status.loadFromFile = false;
    } else {
      code = initForFirstBlockInFile(pPrevReader, pBlockIter);
      if (code != TSDB_CODE_SUCCESS) {
        goto _err;
      }
    }
  }

  tsdbDebug("%p total numOfTable:%d in this query %s", pReader, numOfTables, pReader->idStr);
  return code;

_err:
  tsdbError("failed to create data reader, code:%s %s", tstrerror(code), idstr);
  if (pReader != NULL) {
    // release the inner readers explicitly, then the reader itself; nulling the slots prevents a second close
    tsdbReaderClose(pReader->innerReader[0]);
    pReader->innerReader[0] = NULL;
    tsdbReaderClose(pReader->innerReader[1]);
    pReader->innerReader[1] = NULL;
    tsdbReaderClose(pReader);
  }
  *ppReader = NULL;
  return code;
}

/*
 * Release a reader and everything it owns: any still-open inner readers, the read snapshot, the SMA/build
 * buffers, the file block data, the block iterator, the table map, the result block, the data file reader, the
 * id string and the schemas. Logs the reader's IO cost summary. Accepts NULL.
 */
void tsdbReaderClose(STsdbReader* pReader) {
  if (pReader == NULL) {
    return;
  }

  // Inner readers of TIMEWINDOW_RANGE_EXTERNAL queries are closed by tsdbNextDataBlock only after they are
  // exhausted, so close any still open here. Do it first: innerReader[0]'s block iterator was set up with
  // this reader's table map, which is destroyed below.
  tsdbReaderClose(pReader->innerReader[0]);
  pReader->innerReader[0] = NULL;
  tsdbReaderClose(pReader->innerReader[1]);
  pReader->innerReader[1] = NULL;

  SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo;
  tsdbUntakeReadSnap(pReader->pTsdb, pReader->pReadSnap);

  taosMemoryFreeClear(pSupInfo->plist);
  taosMemoryFree(pSupInfo->colIds);

  taosArrayDestroy(pSupInfo->pColAgg);

  // buildBuf may be missing on a partially initialized reader; free its per-column buffers only when present
  if (pSupInfo->buildBuf != NULL) {
    int32_t numOfCols = blockDataGetNumOfCols(pReader->pResBlock);
    for (int32_t i = 0; i < numOfCols; ++i) {
      taosMemoryFreeClear(pSupInfo->buildBuf[i]);
    }
    taosMemoryFree(pSupInfo->buildBuf);
  }

  tBlockDataDestroy(&pReader->status.fileBlockData, true);

  cleanupDataBlockIterator(&pReader->status.blockIter);

  size_t numOfTables = taosHashGetSize(pReader->status.pTableMap);
  destroyBlockScanInfo(pReader->status.pTableMap);
  blockDataDestroy(pReader->pResBlock);

  if (pReader->pFileReader != NULL) {
    tsdbDataFReaderClose(&pReader->pFileReader);
  }

  SIOCostSummary* pCost = &pReader->cost;

  tsdbDebug("%p :io-cost summary: head-file:%" PRIu64 ", head-file time:%.2f ms, SMA:%" PRId64
            " SMA-time:%.2f ms, fileBlocks:%" PRId64
            ", fileBlocks-time:%.2f ms, "
            "build in-memory-block-time:%.2f ms, STableBlockScanInfo size:%.2f Kb %s",
            pReader, pCost->headFileLoad, pCost->headFileLoadTime, pCost->smaData, pCost->smaLoadTime,
            pCost->numOfBlocks, pCost->blockLoadTime, pCost->buildmemBlock,
            numOfTables * sizeof(STableBlockScanInfo) / 1000.0, pReader->idStr);

  taosMemoryFree(pReader->idStr);

  // pMemSchema may alias pSchema: compare the two pointers before either is freed
  if (pReader->pMemSchema != pReader->pSchema) {
    taosMemoryFree(pReader->pMemSchema);
  }
  taosMemoryFree(pReader->pSchema);

  taosMemoryFreeClear(pReader);
}

3416
/*
 * Produce the next result block of pReader in pReader->pResBlock.
 *
 * Data files are tried first while the reader still loads from file. When they produce no rows, or file
 * loading is off, the in-memory buffers are scanned. Returns true if the block holds rows, and false when
 * exhausted or when building from files fails.
 */
static bool doTsdbNextDataBlock(STsdbReader* pReader) {
  // cleanup the data that belongs to the previous data block
  SSDataBlock* pBlock = pReader->pResBlock;
  blockDataCleanup(pBlock);

  if (pReader->status.loadFromFile) {
    if (buildBlockFromFiles(pReader) != TSDB_CODE_SUCCESS) {
      return false;
    }

    if (pBlock->info.rows > 0) {
      return true;
    }
  }

  // nothing (more) from files: try the buffer
  buildBlockFromBufferSequentially(pReader);
  return pBlock->info.rows > 0;
}

3443 3444 3445 3446 3447 3448 3449 3450 3451 3452 3453 3454 3455 3456 3457 3458 3459 3460 3461 3462 3463 3464 3465 3466 3467 3468 3469 3470 3471 3472 3473 3474 3475 3476 3477 3478 3479
/*
 * Advance the reader to its next result block, recording in pReader->step which reader produced it.
 *
 * Sources are consumed in order: innerReader[0] (EXTERNAL_ROWS_PREV), the reader itself (EXTERNAL_ROWS_MAIN),
 * then innerReader[1] (EXTERNAL_ROWS_NEXT). An inner reader is closed and its slot cleared once it yields no
 * block. Returns false when every source is exhausted or the query window is empty.
 */
bool tsdbNextDataBlock(STsdbReader* pReader) {
  if (isEmptyQueryTimeWindow(&pReader->window)) {
    return false;
  }

  STsdbReader* pPrev = pReader->innerReader[0];
  if (pPrev != NULL) {
    if (doTsdbNextDataBlock(pPrev)) {
      pReader->step = EXTERNAL_ROWS_PREV;
      return true;
    }

    tsdbReaderClose(pPrev);
    pReader->innerReader[0] = NULL;
  }

  pReader->step = EXTERNAL_ROWS_MAIN;
  if (doTsdbNextDataBlock(pReader)) {
    return true;
  }

  STsdbReader* pNext = pReader->innerReader[1];
  if (pNext != NULL) {
    if (doTsdbNextDataBlock(pNext)) {
      pReader->step = EXTERNAL_ROWS_NEXT;
      return true;
    }

    tsdbReaderClose(pNext);
    pReader->innerReader[1] = NULL;
  }

  return false;
}

/* Copy the row count, table uid and time window of pReader's current result block into pDataBlockInfo. */
static void setBlockInfo(STsdbReader* pReader, SDataBlockInfo* pDataBlockInfo) {
  ASSERT(pDataBlockInfo != NULL && pReader != NULL);

  SSDataBlock* pRes = pReader->pResBlock;
  pDataBlockInfo->rows = pRes->info.rows;
  pDataBlockInfo->uid = pRes->info.uid;
  pDataBlockInfo->window = pRes->info.window;
}

3486 3487
/*
 * Describe the block produced by the last tsdbNextDataBlock call.
 *
 * For external-range queries the description comes from the reader that produced the block (pReader->step):
 * innerReader[0] for PREV, the reader itself for MAIN, and innerReader[1] otherwise.
 */
void tsdbRetrieveDataBlockInfo(STsdbReader* pReader, SDataBlockInfo* pDataBlockInfo) {
  STsdbReader* pSource = pReader;

  if (pReader->type == TIMEWINDOW_RANGE_EXTERNAL) {
    if (pReader->step == EXTERNAL_ROWS_PREV) {
      pSource = pReader->innerReader[0];
    } else if (pReader->step != EXTERNAL_ROWS_MAIN) {
      pSource = pReader->innerReader[1];
    }
  }

  setBlockInfo(pSource, pDataBlockInfo);
}

3500
/*
 * Expose the SMA (pre-computed per-column aggregates) of the current file data block.
 *
 * *pBlockStatis is set to NULL, and TSDB_CODE_SUCCESS returned, for external-range queries, composed blocks
 * and blocks without SMA. Otherwise it points at pReader->suppInfo.plist, whose entry 0 is the primary
 * timestamp aggregate derived from the result block's time window. *allHave ends up false when some matched
 * column does not have block SMA enabled. Returns the tsdbReadBlockSma error code if loading fails.
 */
int32_t tsdbRetrieveDatablockSMA(STsdbReader* pReader, SColumnDataAgg*** pBlockStatis, bool* allHave) {
  *allHave = false;

  // no statistics for external-range queries, nor for a composed block
  if (pReader->type == TIMEWINDOW_RANGE_EXTERNAL || pReader->status.composedDataBlock) {
    *pBlockStatis = NULL;
    return TSDB_CODE_SUCCESS;
  }

  SFileDataBlockInfo* pFBlock = getCurrentBlockInfo(&pReader->status.blockIter);
  SBlock*             pBlock = getCurrentBlock(&pReader->status.blockIter);
  int64_t             stime = taosGetTimestampUs();
  SBlockLoadSuppInfo* pSup = &pReader->suppInfo;

  if (!tBlockHasSma(pBlock)) {
    *pBlockStatis = NULL;
    return TSDB_CODE_SUCCESS;
  }

  int32_t code = tsdbReadBlockSma(pReader->pFileReader, pBlock, pSup->pColAgg);
  if (code != TSDB_CODE_SUCCESS) {
    tsdbDebug("vgId:%d, failed to load block SMA for uid %" PRIu64 ", code:%s, %s", 0, pFBlock->uid, tstrerror(code),
              pReader->idStr);
    return code;
  }

  *allHave = true;

  // the primary timestamp aggregate is filled in from the result block's time window
  SColumnDataAgg* pTsAgg = &pSup->tsColAgg;
  pTsAgg->numOfNull = 0;
  pTsAgg->colId = PRIMARYKEY_TIMESTAMP_COL_ID;
  pTsAgg->min = pReader->pResBlock->info.window.skey;
  pTsAgg->max = pReader->pResBlock->info.window.ekey;
  pSup->plist[0] = pTsAgg;

  // walk the aggregates and the requested column ids together (both presumably id-ordered), linking each
  // requested column to its aggregate
  size_t  numOfCols = blockDataGetNumOfCols(pReader->pResBlock);
  size_t  numOfAggs = taosArrayGetSize(pSup->pColAgg);
  int32_t aggIdx = 0;
  int32_t colIdx = 0;

  while (colIdx < numOfCols && aggIdx < numOfAggs) {
    SColumnDataAgg* pAgg = taosArrayGet(pSup->pColAgg, aggIdx);

    if (pAgg->colId < pSup->colIds[colIdx]) {
      aggIdx += 1;
    } else if (pSup->colIds[colIdx] < pAgg->colId) {
      colIdx += 1;
    } else {
      // NOTE(review): the schema column is chosen by the aggregate's array index, not by column id;
      // confirm pColAgg and pSchema->columns are aligned.
      if (IS_BSMA_ON(&(pReader->pSchema->columns[aggIdx]))) {
        pSup->plist[colIdx] = pAgg;
      } else {
        *allHave = false;
      }
      aggIdx += 1;
      colIdx += 1;
    }
  }

  double elapsed = (taosGetTimestampUs() - stime) / 1000.0;
  pReader->cost.smaLoadTime += elapsed;
  pReader->cost.smaData += 1;

  *pBlockStatis = pSup->plist;

  tsdbDebug("vgId:%d, succeed to load block SMA for uid %" PRIu64 ", elapsed time:%.2f ms, %s", 0, pFBlock->uid,
            elapsed, pReader->idStr);

  return code;
}

3578
/*
 * Return the column arrays of pReader's current result block.
 *
 * A composed block is already assembled in pResBlock. For a plain file block, the block data is first
 * (re)initialized and loaded, then copied into pResBlock. On failure terrno is set and NULL is returned; the file
 * block data is destroyed if the load step failed.
 */
static SArray* doRetrieveDataBlock(STsdbReader* pReader) {
  SReaderStatus* pStatus = &pReader->status;

  if (pStatus->composedDataBlock) {
    return pReader->pResBlock->pDataBlock;
  }

  SFileDataBlockInfo*  pFBlock = getCurrentBlockInfo(&pStatus->blockIter);
  STableBlockScanInfo* pScanInfo = taosHashGet(pStatus->pTableMap, &pFBlock->uid, sizeof(pFBlock->uid));

  tBlockDataReset(&pStatus->fileBlockData);

  int32_t code = tBlockDataInit(&pStatus->fileBlockData, pReader->suid, pScanInfo->uid, pReader->pSchema);
  if (code == TSDB_CODE_SUCCESS) {
    code = doLoadFileBlockData(pReader, &pStatus->blockIter, &pStatus->fileBlockData);
    if (code != TSDB_CODE_SUCCESS) {
      tBlockDataDestroy(&pStatus->fileBlockData, 1);
    }
  }

  if (code != TSDB_CODE_SUCCESS) {
    terrno = code;
    return NULL;
  }

  copyBlockDataToSDataBlock(pReader, pScanInfo);
  return pReader->pResBlock->pDataBlock;
}

3606 3607 3608 3609 3610 3611 3612 3613 3614 3615 3616 3617
/*
 * Return the column data of the block produced by the last tsdbNextDataBlock call.
 *
 * For external-range queries the reader named by pReader->step supplies the data; otherwise pReader does.
 * pIdList is currently unused.
 */
SArray* tsdbRetrieveDataBlock(STsdbReader* pReader, SArray* pIdList) {
  STsdbReader* pSource = pReader;

  if (pReader->type == TIMEWINDOW_RANGE_EXTERNAL) {
    switch (pReader->step) {
      case EXTERNAL_ROWS_PREV:
        pSource = pReader->innerReader[0];
        break;
      case EXTERNAL_ROWS_NEXT:
        pSource = pReader->innerReader[1];
        break;
      default:
        break;
    }
  }

  return doRetrieveDataBlock(pSource);
}

H
Haojun Liao 已提交
3618
/*
 * Restart the reader's scan with pCond's order and time window, as a TIMEWINDOW_RANGE_CONTAINED query.
 *
 * The SMA scratch state is cleared, the previous data file reader is closed, and the file-set and block
 * iterators plus the per-table scan info are reset. The reader is then positioned on its first file block,
 * or on the in-memory buffers when there are no data files. Returns the initForFirstBlockInFile error on
 * failure. The data file reader is closed once; the old code closed it twice in a row.
 */
int32_t tsdbReaderReset(STsdbReader* pReader, SQueryTableDataCond* pCond) {
  if (isEmptyQueryTimeWindow(&pReader->window)) {
    return TSDB_CODE_SUCCESS;
  }

  pReader->order = pCond->order;
  pReader->type = TIMEWINDOW_RANGE_CONTAINED;
  pReader->status.loadFromFile = true;
  pReader->status.pTableIter = NULL;
  pReader->window = updateQueryTimeWindow(pReader->pTsdb, &pCond->twindows);

  // reset the SMA scratch state used for blocks loaded from file
  memset(&pReader->suppInfo.tsColAgg, 0, sizeof(SColumnDataAgg));
  // NOTE(review): only the first plist slot is cleared; confirm stale entries in later slots are harmless.
  memset(pReader->suppInfo.plist, 0, POINTER_BYTES);

  pReader->suppInfo.tsColAgg.colId = PRIMARYKEY_TIMESTAMP_COL_ID;

  // drop the data file reader of the previous scan
  tsdbDataFReaderClose(&pReader->pFileReader);

  int32_t numOfTables = taosHashGetSize(pReader->status.pTableMap);

  initFilesetIterator(&pReader->status.fileIter, pReader->pReadSnap->fs.aDFileSet, pReader);
  resetDataBlockIterator(&pReader->status.blockIter, pReader->order, pReader->status.pTableMap);
  resetDataBlockScanInfo(pReader->status.pTableMap);

  int32_t         code = 0;
  SDataBlockIter* pBlockIter = &pReader->status.blockIter;

  // no data in files, let's try buffer in memory
  if (pReader->status.fileIter.numOfFiles == 0) {
    pReader->status.loadFromFile = false;
  } else {
    code = initForFirstBlockInFile(pReader, pBlockIter);
    if (code != TSDB_CODE_SUCCESS) {
      tsdbError("%p reset reader failed, numOfTables:%d, query range:%" PRId64 " - %" PRId64 " in query %s", pReader,
                numOfTables, pReader->window.skey, pReader->window.ekey, pReader->idStr);
      return code;
    }
  }

  tsdbDebug("%p reset reader, suid:%" PRIu64 ", numOfTables:%d, query range:%" PRId64 " - %" PRId64 " in query %s",
            pReader, pReader->suid, numOfTables, pReader->window.skey, pReader->window.ekey, pReader->idStr);

  return code;
}
H
Hongze Cheng 已提交
3663

3664 3665 3666
/*
 * Map a block's row count to its bucket in the block-rows histogram. Buckets are bucketRange rows wide and
 * start at startRow.
 *
 * Returns 0 when bucketRange is not positive. This happens when maxRows == minRows, and the old code divided
 * by zero there. Also returns 0 when numOfRows is below startRow, where the old code produced a negative
 * index. The upper end is not clamped; callers must keep the result within their histogram.
 */
static int32_t getBucketIndex(int32_t startRow, int32_t bucketRange, int32_t numOfRows) {
  if (bucketRange <= 0 || numOfRows <= startRow) {
    return 0;
  }

  return (numOfRows - startRow) / bucketRange;
}
H
Hongze Cheng 已提交
3667

3668 3669 3670 3671
/*
 * Accumulate block-distribution statistics into pTableBlockInfo for every data file block the reader visits:
 * total/min/max rows, number of small blocks, and a row-count histogram over [minRows, maxRows].
 *
 * Iteration stops at the first initForFirstBlockInFile error (which is returned) or once the reader no longer
 * loads from file.
 */
int32_t tsdbGetFileBlocksDistInfo(STsdbReader* pReader, STableBlockDistInfo* pTableBlockInfo) {
  SReaderStatus*  pStatus = &pReader->status;
  SDataBlockIter* pBlockIter = &pStatus->blockIter;
  STsdbCfg*       pCfg = &pReader->pTsdb->pVnode->config.tsdbCfg;

  pTableBlockInfo->totalSize = 0;
  pTableBlockInfo->totalRows = 0;
  pTableBlockInfo->defMinRows = pCfg->minRows;
  pTableBlockInfo->defMaxRows = pCfg->maxRows;

  // the histogram splits [minRows, maxRows] into 20 buckets of this width
  int32_t bucketRange = ceil((pCfg->maxRows - pCfg->minRows) / 20.0);
  int32_t smallBlockRows = 4096;  // blocks with fewer rows are counted as small

  // NOTE(review): one file is counted unconditionally before the iterator's file count is added; confirm.
  pTableBlockInfo->numOfFiles += 1;
  pTableBlockInfo->numOfFiles += pStatus->fileIter.numOfFiles;

  if (pBlockIter->numOfBlocks > 0) {
    pTableBlockInfo->numOfBlocks += pBlockIter->numOfBlocks;
  }

  pTableBlockInfo->numOfTables = (int32_t)taosHashGetSize(pStatus->pTableMap);

  int32_t code = TSDB_CODE_SUCCESS;
  bool    hasNext = (pBlockIter->numOfBlocks > 0);

  for (;;) {
    if (!hasNext) {
      // no block left in the current iterator: position on the first block of the next file
      code = initForFirstBlockInFile(pReader, pBlockIter);
      if (code != TSDB_CODE_SUCCESS || !pReader->status.loadFromFile) {
        break;
      }

      pTableBlockInfo->numOfBlocks += pBlockIter->numOfBlocks;
      hasNext = (pBlockIter->numOfBlocks > 0);
      continue;
    }

    SBlock* pBlock = getCurrentBlock(pBlockIter);
    int32_t rows = pBlock->nRow;

    pTableBlockInfo->totalRows += rows;
    if (rows > pTableBlockInfo->maxRows) {
      pTableBlockInfo->maxRows = rows;
    }
    if (rows < pTableBlockInfo->minRows) {
      pTableBlockInfo->minRows = rows;
    }
    if (rows < smallBlockRows) {
      pTableBlockInfo->numOfSmallBlocks += 1;
    }

    // NOTE(review): the bucket index is not clamped; when (maxRows - minRows) is a multiple of 20 a block
    // with maxRows rows maps to index 20; confirm the histogram length.
    int32_t bucket = getBucketIndex(pTableBlockInfo->defMinRows, bucketRange, rows);
    pTableBlockInfo->blockRowsHisto[bucket]++;

    hasNext = blockIteratorNext(&pStatus->blockIter);
  }

  return code;
}
H
Hongze Cheng 已提交
3736

H
refact  
Hongze Cheng 已提交
3737
/*
 * Count the rows held in the reader's read snapshot memtables (mem and imem) for every table in the reader's
 * table map.
 *
 * The snapshot's own memtables are tested, and they are also the ones read. The old code tested the live
 * pTsdb->mem/imem without the lock, so it could pass a NULL snapshot memtable to tsdbGetTbDataFromMemTable or
 * skip rows the snapshot holds. The snapshot itself may be absent: tsdbReaderOpen returns before taking it for
 * an empty query window. Uses pReader->status.pTableIter as the cursor, which is NULL again on return.
 */
int64_t tsdbGetNumOfRowsInMemTable(STsdbReader* pReader) {
  int64_t        rows = 0;
  SReaderStatus* pStatus = &pReader->status;
  STsdbReadSnap* pSnap = pReader->pReadSnap;

  pStatus->pTableIter = taosHashIterate(pStatus->pTableMap, NULL);

  while (pStatus->pTableIter != NULL) {
    STableBlockScanInfo* pBlockScanInfo = pStatus->pTableIter;

    if (pSnap != NULL && pSnap->pMem != NULL) {
      STbData* d = tsdbGetTbDataFromMemTable(pSnap->pMem, pReader->suid, pBlockScanInfo->uid);
      if (d != NULL) {
        rows += tsdbGetNRowsInTbData(d);
      }
    }

    if (pSnap != NULL && pSnap->pIMem != NULL) {
      STbData* di = tsdbGetTbDataFromMemTable(pSnap->pIMem, pReader->suid, pBlockScanInfo->uid);
      if (di != NULL) {
        rows += tsdbGetNRowsInTbData(di);
      }
    }

    // current table is exhausted, let's try the next table
    pStatus->pTableIter = taosHashIterate(pStatus->pTableMap, pStatus->pTableIter);
  }

  return rows;
}
D
dapan1121 已提交
3768

L
Liu Jicong 已提交
3769
/*
 * Look up the current schema of table uid.
 *
 * For a child table the schema version is read from its super table entry, whose uid is stored in *suid.
 * For a normal table *suid is 0 and the version comes from the table's own entry. *pSchema receives
 * metaGetTbTSchema(uid, version). Returns TSDB_CODE_SUCCESS, or TSDB_CODE_TDB_INVALID_TABLE_ID (also stored
 * in terrno) when an entry lookup fails.
 */
int32_t tsdbGetTableSchema(SVnode* pVnode, int64_t uid, STSchema** pSchema, int64_t* suid) {
  SMetaReader mr = {0};
  metaReaderInit(&mr, pVnode->pMeta, 0);

  if (metaGetTableEntryByUid(&mr, uid) != TSDB_CODE_SUCCESS) {
    goto _invalid_table;
  }

  *suid = 0;

  int32_t sversion = 1;
  if (mr.me.type != TSDB_CHILD_TABLE) {
    ASSERT(mr.me.type == TSDB_NORMAL_TABLE);
    sversion = mr.me.ntbEntry.schemaRow.version;
  } else {
    // reuse the reader for the super table entry
    tDecoderClear(&mr.coder);

    *suid = mr.me.ctbEntry.suid;
    if (metaGetTableEntryByUid(&mr, *suid) != TSDB_CODE_SUCCESS) {
      goto _invalid_table;
    }
    sversion = mr.me.stbEntry.schemaRow.version;
  }

  metaReaderClear(&mr);
  *pSchema = metaGetTbTSchema(pVnode->pMeta, uid, sversion);
  return TSDB_CODE_SUCCESS;

_invalid_table:
  terrno = TSDB_CODE_TDB_INVALID_TABLE_ID;
  metaReaderClear(&mr);
  return terrno;
}
H
Hongze Cheng 已提交
3803 3804 3805 3806 3807 3808 3809 3810 3811 3812 3813 3814 3815 3816 3817 3818 3819 3820 3821 3822 3823 3824 3825 3826 3827 3828 3829 3830 3831 3832

/*
 * Take a read snapshot of pTsdb: under the tsdb read lock, reference the current mem and imem tables (when
 * present) and the file-system state.
 *
 * On success *ppSnap receives the snapshot, to be released with tsdbUntakeReadSnap. On failure every reference
 * taken here is dropped, the snapshot is freed, *ppSnap is NULL and the error code is returned. The old code
 * leaked the snapshot and any memtable references and left *ppSnap pointing at the half-built snapshot.
 */
int32_t tsdbTakeReadSnap(STsdb* pTsdb, STsdbReadSnap** ppSnap) {
  int32_t code = 0;

  *ppSnap = NULL;

  // alloc
  STsdbReadSnap* pSnap = taosMemoryCalloc(1, sizeof(STsdbReadSnap));
  if (pSnap == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _exit;
  }

  // lock
  code = taosThreadRwlockRdlock(&pTsdb->rwLock);
  if (code) {
    code = TAOS_SYSTEM_ERROR(code);
    taosMemoryFree(pSnap);  // nothing referenced yet
    goto _exit;
  }

  // take snapshot
  pSnap->pMem = pTsdb->mem;
  pSnap->pIMem = pTsdb->imem;

  if (pSnap->pMem) {
    tsdbRefMemTable(pSnap->pMem);
  }

  if (pSnap->pIMem) {
    tsdbRefMemTable(pSnap->pIMem);
  }

  // fs
  code = tsdbFSRef(pTsdb, &pSnap->fs);
  if (code) {
    taosThreadRwlockUnlock(&pTsdb->rwLock);

    // the file-system state was not referenced: drop only the memtable references
    if (pSnap->pMem) {
      tsdbUnrefMemTable(pSnap->pMem);
    }
    if (pSnap->pIMem) {
      tsdbUnrefMemTable(pSnap->pIMem);
    }
    taosMemoryFree(pSnap);
    goto _exit;
  }

  // unlock
  code = taosThreadRwlockUnlock(&pTsdb->rwLock);
  if (code) {
    code = TAOS_SYSTEM_ERROR(code);
    // everything was referenced: release it all through the regular path
    tsdbUntakeReadSnap(pTsdb, pSnap);
    goto _exit;
  }

  *ppSnap = pSnap;
  tsdbTrace("vgId:%d, take read snapshot", TD_VID(pTsdb->pVnode));

_exit:
  return code;
}

/*
 * Release a snapshot taken by tsdbTakeReadSnap: drop the memtable references (when present) and the
 * file-system reference, then free the snapshot. A NULL snapshot is accepted; the trace line is written
 * either way.
 */
void tsdbUntakeReadSnap(STsdb* pTsdb, STsdbReadSnap* pSnap) {
  if (pSnap != NULL) {
    // released in the order tsdbTakeReadSnap acquired them: mem, imem, then the file-system state
    if (pSnap->pMem != NULL) {
      tsdbUnrefMemTable(pSnap->pMem);
    }
    if (pSnap->pIMem != NULL) {
      tsdbUnrefMemTable(pSnap->pIMem);
    }

    tsdbFSUnref(pTsdb, &pSnap->fs);
    taosMemoryFree(pSnap);
  }

  tsdbTrace("vgId:%d, untake read snapshot", TD_VID(pTsdb->pVnode));
}