tsdbRead.c 131.9 KB
Newer Older
H
hjxilinx 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

H
Haojun Liao 已提交
16
#include "osDef.h"
H
Hongze Cheng 已提交
17
#include "tsdb.h"
18

19
// True when the scan order `o` is ascending. The argument is parenthesized so that a compound
// expression (for instance a ternary or a bitwise expression) is evaluated as a whole before the
// comparison.
#define ASCENDING_TRAVERSE(o) ((o) == TSDB_ORDER_ASC)
#define ALL_ROWS_CHECKED_INDEX (INT16_MIN)
#define DEFAULT_ROW_INDEX_VAL  (-1)
H
Hongze Cheng 已提交
22

23 24 25 26 27 28
// Selector for which set of rows is meant: the previous rows, the main rows, or the next rows.
// NOTE(review): the values are written in hex but are consecutive (0x3 == 0x1 | 0x2), so they
// cannot be combined as independent bit flags - confirm they are only used as plain enumerators.
typedef enum {
  EXTERNAL_ROWS_PREV = 0x1,
  EXTERNAL_ROWS_MAIN = 0x2,
  EXTERNAL_ROWS_NEXT = 0x3,
} EContentData;

29
typedef struct {
dengyihao's avatar
dengyihao 已提交
30
  STbDataIter* iter;
31 32 33 34
  int32_t      index;
  bool         hasVal;
} SIterInfo;

35 36 37 38 39
// Counters of qualified blocks found while loading one file set.
typedef struct {
  int32_t numOfBlocks;      // data blocks that passed the time range and version range checks
  int32_t numOfLastBlocks;  // last blocks that passed the same checks
} SBlockNumber;

H
Haojun Liao 已提交
40
typedef struct STableBlockScanInfo {
dengyihao's avatar
dengyihao 已提交
41 42
  uint64_t  uid;
  TSKEY     lastKey;
43 44 45 46 47
  SMapData  mapData;      // block info (compressed)
  SArray*   pBlockList;   // block data index list
  SIterInfo iter;         // mem buffer skip list iterator
  SIterInfo iiter;        // imem buffer skip list iterator
  SArray*   delSkyline;   // delete info for this table
48 49
  int32_t   fileDelIndex; // file block delete index
  int32_t   lastBlockDelIndex;// delete index for last block
50 51
  bool      iterInit;     // whether to initialize the in-memory skip list iterator or not
  int16_t   indexInBlockL;// row position in last block
H
Haojun Liao 已提交
52 53 54
} STableBlockScanInfo;

// NOTE(review): presumably describes one data block (owning table uid + offset) while the blocks
// of several tables are being ordered - confirm against the code that fills it.
typedef struct SBlockOrderWrapper {
  int64_t uid;
  int64_t offset;
} SBlockOrderWrapper;
H
Hongze Cheng 已提交
58 59

typedef struct SBlockOrderSupporter {
60 61 62 63
  SBlockOrderWrapper** pDataBlockInfo;
  int32_t*             indexPerTable;
  int32_t*             numOfBlocksPerTable;
  int32_t              numOfTables;
H
Hongze Cheng 已提交
64 65 66
} SBlockOrderSupporter;

// Accumulated I/O cost counters of one reader.
typedef struct SIOCostSummary {
  int64_t numOfBlocks;       // qualified data blocks + last blocks, summed over loaded file sets
  double  blockLoadTime;
  double  buildmemBlock;
  int64_t headFileLoad;      // incremented once for every file set that is opened
  double  headFileLoadTime;  // accumulated in milliseconds
  int64_t smaDataLoad;
  double  smaLoadTime;
  int64_t lastBlockLoad;
  double  lastBlockLoadTime;
} SIOCostSummary;

typedef struct SBlockLoadSuppInfo {
79
  SArray*          pColAgg;
80
  SColumnDataAgg   tsColAgg;
C
Cary Xu 已提交
81
  SColumnDataAgg** plist;
82 83
  int16_t*         colIds;    // column ids for loading file block data
  char**           buildBuf;  // build string tmp buffer, todo remove it later after all string format being updated.
H
Hongze Cheng 已提交
84 85
} SBlockLoadSuppInfo;

86 87 88 89 90 91 92 93 94 95 96
// Version range of a query; blocks whose [minVer, maxVer] does not intersect it are skipped.
typedef struct SVersionRange {
  uint64_t minVer;
  uint64_t maxVer;
} SVersionRange;

// Reader state for the "last" blocks of the current file set.
typedef struct SLastBlockReader {
  SArray*       pBlockL;            // SArray<SBlockL>
  int32_t       currentBlockIndex;  // initialized to -1 when the reader is created
  SBlockData    lastBlockData;
  STimeWindow   window;             // copied from the owning reader's query window
  SVersionRange verRange;           // copied from the owning reader's version range
  int32_t       order;              // copied from the owning reader's scan order
  uint64_t      uid;
  int16_t*      rowIndex;           // row index ptr, usually from the STableBlockScanInfo->indexInBlockL
} SLastBlockReader;

102
typedef struct SFilesetIter {
103 104 105 106
  int32_t           numOfFiles;    // number of total files
  int32_t           index;         // current accessed index in the list
  SArray*           pFileList;     // data file list
  int32_t           order;
107
  SLastBlockReader* pLastBlockReader; // last file block reader
108
} SFilesetIter;
H
Haojun Liao 已提交
109 110

// Identifies one file data block in the block iterator's list.
typedef struct SFileDataBlockInfo {
  // index position in STableBlockScanInfo in order to check whether neighbor block overlaps with it
  uint64_t uid;
  int32_t  tbBlockIdx;
} SFileDataBlockInfo;

typedef struct SDataBlockIter {
117
  int32_t   numOfBlocks;
118
  int32_t   index;
119
  SArray*   blockList;      // SArray<SFileDataBlockInfo>
120
  int32_t   order;
121
  SBlock    block;          // current SBlock data
122
  SHashObj* pTableMap;
H
Haojun Liao 已提交
123 124 125
} SDataBlockIter;

// Progress of dumping the rows of the current file block.
typedef struct SFileBlockDumpInfo {
  int32_t totalRows;
  int32_t rowIndex;
  int64_t lastKey;    // set to (max key +/- 1, depending on the scan order) once the block is fully dumped
  bool    allDumped;  // true when every row of the block has been handled
} SFileBlockDumpInfo;

typedef struct SReaderStatus {
dengyihao's avatar
dengyihao 已提交
133 134
  bool                 loadFromFile;  // check file stage
  SHashObj*            pTableMap;     // SHash<STableBlockScanInfo>
135
  STableBlockScanInfo* pTableIter;    // table iterator used in building in-memory buffer data blocks.
136
  SFileBlockDumpInfo   fBlockDumpInfo;
137 138 139 140 141
  SDFileSet*           pCurrentFileset;  // current opened file set
  SBlockData           fileBlockData;
  SFilesetIter         fileIter;
  SDataBlockIter       blockIter;
  bool                 composedDataBlock;  // the returned data block is a composed block or not
H
Haojun Liao 已提交
142 143
} SReaderStatus;

H
Hongze Cheng 已提交
144
struct STsdbReader {
H
Haojun Liao 已提交
145 146 147 148 149 150 151
  STsdb*             pTsdb;
  uint64_t           suid;
  int16_t            order;
  STimeWindow        window;  // the primary query time window that applies to all queries
  SSDataBlock*       pResBlock;
  int32_t            capacity;
  SReaderStatus      status;
152 153
  char*              idStr;  // query info handle, for debug purpose
  int32_t            type;   // query type: 1. retrieve all data blocks, 2. retrieve direct prev|next rows
H
Hongze Cheng 已提交
154
  SBlockLoadSuppInfo suppInfo;
H
Hongze Cheng 已提交
155
  STsdbReadSnap*     pReadSnap;
156
  SIOCostSummary     cost;
157 158
  STSchema*          pSchema;     // the newest version schema
  STSchema*          pMemSchema;  // the previous schema for in-memory data, to avoid load schema too many times
159 160
  SDataFReader*      pFileReader;
  SVersionRange      verRange;
161

162 163
  int32_t      step;
  STsdbReader* innerReader[2];
H
Hongze Cheng 已提交
164
};
H
Hongze Cheng 已提交
165

H
Haojun Liao 已提交
166
static SFileDataBlockInfo* getCurrentBlockInfo(SDataBlockIter* pBlockIter);
167 168
static int      buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, int64_t endKey, int32_t capacity,
                                          STsdbReader* pReader);
169
static TSDBROW* getValidRow(SIterInfo* pIter, const SArray* pDelList, STsdbReader* pReader);
170 171
static int32_t  doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pScanInfo, STsdbReader* pReader,
                                        SRowMerger* pMerger);
H
Haojun Liao 已提交
172
static int32_t  doMergeRowsInLastBlock(SLastBlockReader* pLastBlockReader, STableBlockScanInfo* pScanInfo, int64_t ts, SRowMerger* pMerger);
173
static int32_t  doMergeRowsInBuf(SIterInfo* pIter, uint64_t uid, int64_t ts, SArray* pDelList, SRowMerger* pMerger,
dengyihao's avatar
dengyihao 已提交
174
                                 STsdbReader* pReader);
H
Haojun Liao 已提交
175
static int32_t  doAppendRowFromTSRow(SSDataBlock* pBlock, STsdbReader* pReader, STSRow* pTSRow, uint64_t uid);
176
static int32_t  doAppendRowFromFileBlock(SSDataBlock* pResBlock, STsdbReader* pReader, SBlockData* pBlockData,
177
                                     int32_t rowIndex);
178
static void     setComposedBlockFlag(STsdbReader* pReader, bool composed);
179
static bool     hasBeenDropped(const SArray* pDelList, int32_t* index, TSDBKEY* pKey, int32_t order);
180

181
static void doMergeMemTableMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter, SArray* pDelList, STSRow** pTSRow,
182
                             STsdbReader* pReader, bool* freeTSRow);
183 184
static void doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader,
                               STSRow** pTSRow);
185 186
static int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pBlockScanInfo, int64_t key, STsdbReader* pReader);

dengyihao's avatar
dengyihao 已提交
187 188 189 190
static int32_t initDelSkylineIterator(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader, STbData* pMemTbData,
                                      STbData* piMemTbData);
static STsdb*  getTsdbByRetentions(SVnode* pVnode, TSKEY winSKey, SRetention* retentions, const char* idstr,
                                   int8_t* pLevel);
191
static SVersionRange getQueryVerRange(SVnode* pVnode, SQueryTableDataCond* pCond, int8_t level);
192
static int64_t getCurrentKeyInLastBlock(SLastBlockReader* pLastBlockReader);
193 194
static bool    hasDataInLastBlock(SLastBlockReader* pLastBlockReader);
static int32_t doBuildDataBlock(STsdbReader* pReader);
H
Haojun Liao 已提交
195

196 197 198
/**
 * Fill pReader->suppInfo from the columns of pBlock:
 *   - colIds[i]   receives the column id of the i-th column of the block;
 *   - buildBuf[i] receives a scratch buffer of pCol->info.bytes bytes for every variable-length
 *                 column; the slots of all other columns stay NULL.
 *
 * @param pReader reader whose suppInfo is filled
 * @param pBlock  block that defines the columns
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY when an allocation fails. If one of the two
 *         arrays cannot be allocated, both pointers are reset to NULL. If a per-column buffer cannot
 *         be allocated, the arrays stay attached to suppInfo (remaining slots are NULL).
 *         NOTE(review): this relies on the reader's teardown releasing colIds/buildBuf - confirm.
 */
static int32_t setColumnIdSlotList(STsdbReader* pReader, SSDataBlock* pBlock) {
  SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo;

  size_t numOfCols = blockDataGetNumOfCols(pBlock);

  pSupInfo->colIds = taosMemoryMalloc(numOfCols * sizeof(int16_t));
  pSupInfo->buildBuf = taosMemoryCalloc(numOfCols, POINTER_BYTES);
  if (pSupInfo->buildBuf == NULL || pSupInfo->colIds == NULL) {
    taosMemoryFree(pSupInfo->colIds);
    taosMemoryFree(pSupInfo->buildBuf);

    // never leave freed pointers behind: later cleanup code would free them a second time
    pSupInfo->colIds = NULL;
    pSupInfo->buildBuf = NULL;
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  for (size_t i = 0; i < numOfCols; ++i) {
    SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, i);
    pSupInfo->colIds[i] = pCol->info.colId;

    if (IS_VAR_DATA_TYPE(pCol->info.type)) {
      pSupInfo->buildBuf[i] = taosMemoryMalloc(pCol->info.bytes);
      if (pSupInfo->buildBuf[i] == NULL) {
        // a NULL scratch buffer would be written through when a var-length value is copied
        return TSDB_CODE_OUT_OF_MEMORY;
      }
    }
  }

  return TSDB_CODE_SUCCESS;
}
H
Hongze Cheng 已提交
220

221
/**
 * Build the table map of a reader: one STableBlockScanInfo per entry of idList, keyed by table uid.
 * The lastKey of every entry is initialized from the start of the reader's query window.
 *
 * @param pTsdbReader reader that provides the scan order and the query window
 * @param idList      tables to scan
 * @param numOfTables number of entries in idList
 * @return the new hash map, or NULL when the map itself cannot be created
 */
static SHashObj* createDataBlockScanInfo(STsdbReader* pTsdbReader, const STableKeyInfo* idList, int32_t numOfTables) {
  // todo use simple hash instead, optimize the memory consumption
  SHashObj* pScanInfoMap =
      taosHashInit(numOfTables, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
  if (pScanInfoMap == NULL) {
    return NULL;
  }

  const bool asc = ASCENDING_TRAVERSE(pTsdbReader->order);

  for (int32_t i = 0; i < numOfTables; ++i) {
    STableBlockScanInfo scanInfo = {.lastKey = 0, .uid = idList[i].uid, .indexInBlockL = DEFAULT_ROW_INDEX_VAL};

    if (!asc) {
      scanInfo.lastKey = pTsdbReader->window.skey;
    } else {
      if (scanInfo.lastKey == INT64_MIN || scanInfo.lastKey < pTsdbReader->window.skey) {
        scanInfo.lastKey = pTsdbReader->window.skey;
      }

      ASSERT(scanInfo.lastKey >= pTsdbReader->window.skey && scanInfo.lastKey <= pTsdbReader->window.ekey);
    }

    taosHashPut(pScanInfoMap, &scanInfo.uid, sizeof(uint64_t), &scanInfo, sizeof(scanInfo));
    tsdbDebug("%p check table uid:%" PRId64 " from lastKey:%" PRId64 " %s", pTsdbReader, scanInfo.uid,
              scanInfo.lastKey, pTsdbReader->idStr);
  }

  tsdbDebug("%p create %d tables scan-info, size:%.2f Kb, %s", pTsdbReader, numOfTables,
            (sizeof(STableBlockScanInfo) * numOfTables) / 1024.0, pTsdbReader->idStr);

  return pScanInfoMap;
}
H
Hongze Cheng 已提交
252

253 254 255
/**
 * Reset the in-memory scan state of every table in the map so that the map can be used for a new
 * scan: both buffer iterators (mem and imem) are destroyed, their hasVal flags are cleared and the
 * delete skyline is dropped. The file block information (mapData / pBlockList) is left untouched.
 *
 * The imem iterator is released here as well, the same way destroyBlockScanInfo() does it.
 * Because iterInit is cleared, the iterators are expected to be created again later.
 * NOTE(review): an iterator kept in iiter.iter would presumably be overwritten and leaked by that
 * re-initialization - confirm against the iterator init code.
 */
static void resetDataBlockScanInfo(SHashObj* pTableMap) {
  STableBlockScanInfo* p = NULL;

  while ((p = taosHashIterate(pTableMap, p)) != NULL) {
    p->iterInit = false;
    p->iter.hasVal = false;
    p->iiter.hasVal = false;

    if (p->iter.iter != NULL) {
      p->iter.iter = tsdbTbDataIterDestroy(p->iter.iter);
    }

    if (p->iiter.iter != NULL) {
      p->iiter.iter = tsdbTbDataIterDestroy(p->iiter.iter);
    }

    p->delSkyline = taosArrayDestroy(p->delSkyline);
  }
}

267 268 269 270 271 272 273 274
/**
 * Release everything owned by the entries of the table map (both buffer iterators, the delete
 * skyline, the block index list and the block map data) and then destroy the map itself.
 */
static void destroyBlockScanInfo(SHashObj* pTableMap) {
  for (STableBlockScanInfo* pInfo = taosHashIterate(pTableMap, NULL); pInfo != NULL;
       pInfo = taosHashIterate(pTableMap, pInfo)) {
    pInfo->iterInit = false;
    pInfo->iiter.hasVal = false;

    // mem buffer iterator
    if (pInfo->iter.iter != NULL) {
      pInfo->iter.iter = tsdbTbDataIterDestroy(pInfo->iter.iter);
    }

    // imem buffer iterator
    if (pInfo->iiter.iter != NULL) {
      pInfo->iiter.iter = tsdbTbDataIterDestroy(pInfo->iiter.iter);
    }

    pInfo->delSkyline = taosArrayDestroy(pInfo->delSkyline);
    pInfo->pBlockList = taosArrayDestroy(pInfo->pBlockList);
    tMapDataClear(&pInfo->mapData);
  }

  taosHashCleanup(pTableMap);
}

290
// A time window is empty when its start key lies after its end key. pWindow must not be NULL.
static bool isEmptyQueryTimeWindow(STimeWindow* pWindow) {
  ASSERT(pWindow != NULL);

  bool hasRange = (pWindow->skey <= pWindow->ekey);
  return !hasRange;
}
H
Hongze Cheng 已提交
294

295 296 297
// Clamp the start of the query time window to the oldest timestamp that is still kept according to
// the keep configuration (keep2), so that expired data is never returned to the client.
// The end of the window is not changed; the input window is not modified, a copy is returned.
static STimeWindow updateQueryTimeWindow(STsdb* pTsdb, STimeWindow* pWindow) {
  STsdbKeepCfg* pKeepCfg = &pTsdb->keepCfg;

  int64_t current = taosGetTimestamp(pKeepCfg->precision);

  // needs to add one tick
  int64_t oldestKeptTs = current - (tsTickPerMin[pKeepCfg->precision] * pKeepCfg->keep2) + 1;

  STimeWindow result = *pWindow;
  result.skey = (result.skey < oldestKeptTs) ? oldestKeptTs : result.skey;
  return result;
}
H
Hongze Cheng 已提交
310

H
Haojun Liao 已提交
311
/**
 * Reduce *capacity (a number of rows) so that capacity * rowLength does not exceed 2MB, where the
 * row length is the sum of the byte widths of all queried columns. *capacity is left unchanged
 * when the limit is not exceeded.
 *
 * The product is computed in 64 bits: with 32-bit arithmetic a large capacity or a wide row
 * overflows (undefined behavior) and the limit check could be skipped.
 *
 * @param pCond    query condition that provides the column list
 * @param capacity in/out: requested number of rows, lowered in place when needed
 */
static void limitOutputBufferSize(const SQueryTableDataCond* pCond, int32_t* capacity) {
  int64_t rowLen = 0;
  for (int32_t i = 0; i < pCond->numOfCols; ++i) {
    rowLen += pCond->colList[i].bytes;
  }

  // make sure the output SSDataBlock size be less than 2MB.
  const int64_t TWOMB = 2 * 1024 * 1024;
  if ((int64_t)(*capacity) * rowLen > TWOMB) {
    // rowLen cannot be 0 here, otherwise the product above would be 0
    (*capacity) = (int32_t)(TWOMB / rowLen);
  }
}

// init file iterator
325
static int32_t initFilesetIterator(SFilesetIter* pIter, SArray* aDFileSet, STsdbReader* pReader/*int32_t order, const char* idstr*/) {
H
Hongze Cheng 已提交
326
  size_t numOfFileset = taosArrayGetSize(aDFileSet);
327

328 329
  pIter->index = ASCENDING_TRAVERSE(pReader->order) ? -1 : numOfFileset;
  pIter->order = pReader->order;
H
Hongze Cheng 已提交
330
  pIter->pFileList = aDFileSet;
331
  pIter->numOfFiles = numOfFileset;
H
Haojun Liao 已提交
332

333 334 335 336
  if (pIter->pLastBlockReader == NULL) {
    pIter->pLastBlockReader = taosMemoryCalloc(1, sizeof(struct SLastBlockReader));
    if (pIter->pLastBlockReader == NULL) {
      int32_t code = TSDB_CODE_OUT_OF_MEMORY;
337
      tsdbError("failed to prepare the last block iterator, code:%d %s", tstrerror(code), pReader->idStr);
338 339 340
      return code;
    }

341 342 343 344 345
    SLastBlockReader* pLReader = pIter->pLastBlockReader;
    pLReader->pBlockL = taosArrayInit(4, sizeof(SBlockL));
    pLReader->order   = pReader->order;
    pLReader->window  = pReader->window;
    pLReader->verRange = pReader->verRange;
346
    pLReader->currentBlockIndex = -1;
H
Haojun Liao 已提交
347 348 349 350 351

    int32_t code = tBlockDataCreate(&pLReader->lastBlockData);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }
352 353
  }

354
  tsdbDebug("init fileset iterator, total files:%d %s", pIter->numOfFiles, pReader->idStr);
H
Haojun Liao 已提交
355 356 357
  return TSDB_CODE_SUCCESS;
}

358
/**
 * Move to the next file set (in scan order) whose key range overlaps the query window of pReader
 * and open it as pReader->pFileReader. A previously opened file reader is closed first.
 *
 * File sets that lie entirely before the window (in scan direction) are skipped; the first file
 * set that lies entirely after the window ends the iteration.
 *
 * @return true when a qualified file set has been opened; false when there is no further
 *         qualified file set or when a file set cannot be opened. In the latter case terrno is
 *         set to the error code and the failure is logged, so that it can be told apart from a
 *         normal end of iteration.
 */
static bool filesetIteratorNext(SFilesetIter* pIter, STsdbReader* pReader) {
  bool    asc = ASCENDING_TRAVERSE(pIter->order);
  int32_t step = asc ? 1 : -1;
  pIter->index += step;

  if ((asc && pIter->index >= pIter->numOfFiles) || ((!asc) && pIter->index < 0)) {
    return false;
  }

  // check file the time range of coverage
  STimeWindow win = {0};

  while (1) {
    if (pReader->pFileReader != NULL) {
      tsdbDataFReaderClose(&pReader->pFileReader);
    }

    pReader->status.pCurrentFileset = (SDFileSet*)taosArrayGet(pIter->pFileList, pIter->index);

    int32_t code = tsdbDataFReaderOpen(&pReader->pFileReader, pReader->pTsdb, pReader->status.pCurrentFileset);
    if (code != TSDB_CODE_SUCCESS) {
      terrno = code;
      tsdbError("%p failed to open fileset fid:%d, code:%s %s", pReader, pReader->status.pCurrentFileset->fid,
                tstrerror(code), pReader->idStr);
      return false;
    }

    pReader->cost.headFileLoad += 1;

    int32_t fid = pReader->status.pCurrentFileset->fid;
    tsdbFidKeyRange(fid, pReader->pTsdb->keepCfg.days, pReader->pTsdb->keepCfg.precision, &win.skey, &win.ekey);

    // current file are no longer overlapped with query time window, ignore remain files
    if ((asc && win.skey > pReader->window.ekey) || (!asc && win.ekey < pReader->window.skey)) {
      tsdbDebug("%p remain files are not qualified for qrange:%" PRId64 "-%" PRId64 ", ignore, %s", pReader,
                pReader->window.skey, pReader->window.ekey, pReader->idStr);
      return false;
    }

    // this file set ends before the window starts (in scan direction): try the next one
    if ((asc && (win.ekey < pReader->window.skey)) || ((!asc) && (win.skey > pReader->window.ekey))) {
      pIter->index += step;
      if ((asc && pIter->index >= pIter->numOfFiles) || ((!asc) && pIter->index < 0)) {
        return false;
      }
      continue;
    }

    tsdbDebug("%p file found fid:%d for qrange:%" PRId64 "-%" PRId64 ", %s", pReader, fid, pReader->window.skey,
              pReader->window.ekey, pReader->idStr);
    return true;
  }
}

411
/**
 * Put the block iterator back into its initial state for the given scan order and table map:
 * no current block (index -1), zero blocks, and an empty block list. The block list is created
 * on first use and only cleared on later calls.
 */
static void resetDataBlockIterator(SDataBlockIter* pIter, int32_t order, SHashObj* pTableMap) {
  pIter->pTableMap = pTableMap;
  pIter->order = order;
  pIter->numOfBlocks = 0;
  pIter->index = -1;

  if (pIter->blockList != NULL) {
    taosArrayClear(pIter->blockList);
    return;
  }

  pIter->blockList = taosArrayInit(4, sizeof(SFileDataBlockInfo));
}

L
Liu Jicong 已提交
423
// Destroy the block list of the iterator. The blockList member itself is not reset.
static void cleanupDataBlockIterator(SDataBlockIter* pIter) {
  taosArrayDestroy(pIter->blockList);
}
H
Haojun Liao 已提交
424

H
Haojun Liao 已提交
425
// Initial reader status: start in the "check file" stage, with no table iterator selected.
static void initReaderStatus(SReaderStatus* pStatus) {
  pStatus->loadFromFile = true;
  pStatus->pTableIter = NULL;
}

430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452
/**
 * Create the result block of a reader: one column per entry of pCond->colList, with room for
 * `capacity` rows.
 *
 * @param pCond    query condition that provides the column definitions
 * @param capacity number of rows to reserve
 * @return the new block, or NULL on failure with terrno set to the error code
 */
static SSDataBlock* createResBlock(SQueryTableDataCond* pCond, int32_t capacity) {
  SSDataBlock* pResBlock = createDataBlock();
  if (pResBlock == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  for (int32_t i = 0; i < pCond->numOfCols; ++i) {
    SColumnInfoData colInfo = {{0}, 0};
    colInfo.info = pCond->colList[i];
    blockDataAppendColInfo(pResBlock, &colInfo);
  }

  int32_t code = blockDataEnsureCapacity(pResBlock, capacity);
  if (code != TSDB_CODE_SUCCESS) {
    terrno = code;
    // the block already owns the appended column infos (and possibly some column buffers), so it
    // has to be destroyed as a block; freeing only the struct would leak them
    blockDataDestroy(pResBlock);
    return NULL;
  }

  return pResBlock;
}

453 454
/**
 * Allocate and initialize a reader for the given query condition.
 *
 * The tsdb instance is selected by getTsdbByRetentions(), the query window is clamped by
 * updateQueryTimeWindow(), and the result block plus the block loading helper buffers are
 * allocated.
 *
 * @param pVnode   vnode to read from
 * @param pCond    query condition; pCond->numOfCols must be > 0
 * @param ppReader out: the new reader on success, NULL on failure
 * @param capacity NOTE(review): currently not used - the row capacity starts at a fixed 4096 and
 *                 is then lowered by limitOutputBufferSize(); confirm whether this is intended
 * @param idstr    optional id string used in log messages; it is duplicated
 * @return TSDB_CODE_SUCCESS or an error code; on error everything created so far is released
 *         through tsdbReaderClose()
 */
static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, STsdbReader** ppReader, int32_t capacity,
                                const char* idstr) {
  int32_t      code = 0;
  int8_t       level = 0;
  STsdbReader* pReader = (STsdbReader*)taosMemoryCalloc(1, sizeof(*pReader));
  if (pReader == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _end;
  }

  if (VND_IS_TSMA(pVnode)) {
    tsdbDebug("vgId:%d, tsma is selected to query", TD_VID(pVnode));
  }

  initReaderStatus(&pReader->status);

  pReader->pTsdb = getTsdbByRetentions(pVnode, pCond->twindows.skey, pVnode->config.tsdbCfg.retentions, idstr, &level);
  pReader->suid = pCond->suid;
  pReader->order = pCond->order;
  pReader->capacity = 4096;
  pReader->idStr = (idstr != NULL) ? strdup(idstr) : NULL;
  pReader->verRange = getQueryVerRange(pVnode, pCond, level);
  pReader->type = pCond->type;
  pReader->window = updateQueryTimeWindow(pReader->pTsdb, &pCond->twindows);

  ASSERT(pCond->numOfCols > 0);

  limitOutputBufferSize(pCond, &pReader->capacity);

  // allocate buffer in order to load data blocks from file
  SBlockLoadSuppInfo* pSup = &pReader->suppInfo;
  pSup->pColAgg = taosArrayInit(4, sizeof(SColumnDataAgg));
  pSup->plist = taosMemoryCalloc(pCond->numOfCols, POINTER_BYTES);
  if (pSup->pColAgg == NULL || pSup->plist == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _end;
  }

  pSup->tsColAgg.colId = PRIMARYKEY_TIMESTAMP_COL_ID;

  code = tBlockDataCreate(&pReader->status.fileBlockData);
  if (code != TSDB_CODE_SUCCESS) {
    terrno = code;
    goto _end;
  }

  pReader->pResBlock = createResBlock(pCond, pReader->capacity);
  if (pReader->pResBlock == NULL) {
    code = terrno;
    goto _end;
  }

  // a failure here leaves the column id / build buffers unusable, so it must not be ignored
  code = setColumnIdSlotList(pReader, pReader->pResBlock);
  if (code != TSDB_CODE_SUCCESS) {
    terrno = code;
    goto _end;
  }

  *ppReader = pReader;
  return code;

_end:
  // pReader is NULL when the very first allocation failed
  if (pReader != NULL) {
    tsdbReaderClose(pReader);
  }
  *ppReader = NULL;
  return code;
}
H
Hongze Cheng 已提交
515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547

// void tsdbResetQueryHandleForNewTable(STsdbReader* queryHandle, SQueryTableDataCond* pCond, STableListInfo* tableList,
//                                      int32_t tWinIdx) {
//   STsdbReader* pTsdbReadHandle = queryHandle;

//   pTsdbReadHandle->order = pCond->order;
//   pTsdbReadHandle->window = pCond->twindows[tWinIdx];
//   pTsdbReadHandle->type = TSDB_QUERY_TYPE_ALL;
//   pTsdbReadHandle->cur.fid = -1;
//   pTsdbReadHandle->cur.win = TSWINDOW_INITIALIZER;
//   pTsdbReadHandle->checkFiles = true;
//   pTsdbReadHandle->activeIndex = 0;  // current active table index
//   pTsdbReadHandle->locateStart = false;
//   pTsdbReadHandle->loadExternalRow = pCond->loadExternalRows;

//   if (ASCENDING_TRAVERSE(pCond->order)) {
//     assert(pTsdbReadHandle->window.skey <= pTsdbReadHandle->window.ekey);
//   } else {
//     assert(pTsdbReadHandle->window.skey >= pTsdbReadHandle->window.ekey);
//   }

//   // allocate buffer in order to load data blocks from file
//   memset(pTsdbReadHandle->suppInfo.pstatis, 0, sizeof(SColumnDataAgg));
//   memset(pTsdbReadHandle->suppInfo.plist, 0, POINTER_BYTES);

//   tsdbInitDataBlockLoadInfo(&pTsdbReadHandle->dataBlockLoadInfo);
//   tsdbInitCompBlockLoadInfo(&pTsdbReadHandle->compBlockLoadInfo);

//   SArray* pTable = NULL;
//   //  STsdbMeta* pMeta = tsdbGetMeta(pTsdbReadHandle->pTsdb);

//   //  pTsdbReadHandle->pTableCheckInfo = destroyTableCheckInfo(pTsdbReadHandle->pTableCheckInfo);

H
Haojun Liao 已提交
548
//   pTsdbReadHandle->pTableCheckInfo = NULL;  // createDataBlockScanInfo(pTsdbReadHandle, groupList, pMeta,
H
Hongze Cheng 已提交
549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571
//                                             // &pTable);
//   if (pTsdbReadHandle->pTableCheckInfo == NULL) {
//     //    tsdbReaderClose(pTsdbReadHandle);
//     terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
//   }

//   //  pTsdbReadHandle->prev = doFreeColumnInfoData(pTsdbReadHandle->prev);
//   //  pTsdbReadHandle->next = doFreeColumnInfoData(pTsdbReadHandle->next);
// }

// SArray* tsdbGetQueriedTableList(STsdbReader** pHandle) {
//   assert(pHandle != NULL);

//   STsdbReader* pTsdbReadHandle = (STsdbReader*)pHandle;

//   size_t  size = taosArrayGetSize(pTsdbReadHandle->pTableCheckInfo);
//   SArray* res = taosArrayInit(size, POINTER_BYTES);
//   return res;
// }

// static int32_t binarySearchForBlock(SBlock* pBlock, int32_t numOfBlocks, TSKEY skey, int32_t order) {
//   int32_t firstSlot = 0;
//   int32_t lastSlot = numOfBlocks - 1;
H
Hongze Cheng 已提交
572

H
Hongze Cheng 已提交
573
//   int32_t midSlot = firstSlot;
H
Hongze Cheng 已提交
574

H
Hongze Cheng 已提交
575 576 577
//   while (1) {
//     numOfBlocks = lastSlot - firstSlot + 1;
//     midSlot = (firstSlot + (numOfBlocks >> 1));
H
Hongze Cheng 已提交
578

H
Hongze Cheng 已提交
579
//     if (numOfBlocks == 1) break;
H
Hongze Cheng 已提交
580

H
Hongze Cheng 已提交
581 582 583 584 585 586 587 588 589 590 591
//     if (skey > pBlock[midSlot].maxKey.ts) {
//       if (numOfBlocks == 2) break;
//       if ((order == TSDB_ORDER_DESC) && (skey < pBlock[midSlot + 1].minKey.ts)) break;
//       firstSlot = midSlot + 1;
//     } else if (skey < pBlock[midSlot].minKey.ts) {
//       if ((order == TSDB_ORDER_ASC) && (skey > pBlock[midSlot - 1].maxKey.ts)) break;
//       lastSlot = midSlot - 1;
//     } else {
//       break;  // got the slot
//     }
//   }
H
Hongze Cheng 已提交
592

H
Hongze Cheng 已提交
593 594
//   return midSlot;
// }
H
Hongze Cheng 已提交
595

H
Haojun Liao 已提交
596
/**
 * Read the block index of the file set opened by pFileReader and append to pIndexList every
 * entry that belongs to a queried table, i.e. whose suid equals pReader->suid and whose uid is
 * present in the reader's table map. The block list of each such table is created on first use.
 *
 * @param pReader     reader that owns the table map
 * @param pFileReader reader of the current file set
 * @param pIndexList  out: qualified SBlockIdx entries are appended (copied) here
 * @return TSDB_CODE_SUCCESS (also when the index is empty), the error code of tsdbReadBlockIdx(),
 *         or TSDB_CODE_OUT_OF_MEMORY when an array cannot be allocated or extended
 */
static int32_t doLoadBlockIndex(STsdbReader* pReader, SDataFReader* pFileReader, SArray* pIndexList) {
  SArray* aBlockIdx = taosArrayInit(8, sizeof(SBlockIdx));
  if (aBlockIdx == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  int64_t st = taosGetTimestampUs();
  int32_t code = tsdbReadBlockIdx(pFileReader, aBlockIdx);
  if (code != TSDB_CODE_SUCCESS) {
    goto _end;
  }

  size_t num = taosArrayGetSize(aBlockIdx);
  if (num == 0) {
    goto _end;  // nothing to do, code is TSDB_CODE_SUCCESS
  }

  int64_t et1 = taosGetTimestampUs();

  for (size_t i = 0; i < num; ++i) {
    SBlockIdx* pBlockIdx = (SBlockIdx*)taosArrayGet(aBlockIdx, i);

    // uid check
    if (pBlockIdx->suid != pReader->suid) {
      continue;
    }

    // this block belongs to a table that is not queried.
    STableBlockScanInfo* pScanInfo = taosHashGet(pReader->status.pTableMap, &pBlockIdx->uid, sizeof(uint64_t));
    if (pScanInfo == NULL) {
      continue;
    }

    if (pScanInfo->pBlockList == NULL) {
      pScanInfo->pBlockList = taosArrayInit(4, sizeof(int32_t));
      if (pScanInfo->pBlockList == NULL) {
        code = TSDB_CODE_OUT_OF_MEMORY;
        goto _end;
      }
    }

    if (taosArrayPush(pIndexList, pBlockIdx) == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto _end;
    }
  }

  int64_t et2 = taosGetTimestampUs();
  tsdbDebug("load block index for %d tables completed, elapsed time:%.2f ms, set blockIdx:%.2f ms, size:%.2f Kb %s",
            (int32_t)num, (et1 - st) / 1000.0, (et2 - et1) / 1000.0, num * sizeof(SBlockIdx) / 1024.0, pReader->idStr);

  pReader->cost.headFileLoadTime += (et1 - st) / 1000.0;

_end:
  taosArrayDestroy(aBlockIdx);
  return code;
}
H
Hongze Cheng 已提交
646

647
/**
 * Prepare every table of the map for a new file set: the row position in the last block is set
 * back to -1, the block map data is cleared and the block index list is emptied (the list itself
 * is kept).
 */
static void cleanupTableScanInfo(SHashObj* pTableMap) {
  STableBlockScanInfo* pInfo = NULL;

  while ((pInfo = taosHashIterate(pTableMap, pInfo)) != NULL) {
    // reset the index in last block when handling a new file
    pInfo->indexInBlockL = -1;

    tMapDataClear(&pInfo->mapData);
    taosArrayClear(pInfo->pBlockList);
  }
}

/**
 * Load the block information of the current file set for all tables in pIndexList and select the
 * blocks that qualify for the query.
 *
 * For every table the block map is read into its scan info, and the index of each block that
 * overlaps both the query time window and the version range is appended to the table's
 * pBlockList. Last blocks of the queried super table that pass the same two checks are copied
 * into pQualifiedLastBlock. The counters in pBlockNum are incremented, they are not reset here.
 *
 * @param pReader             reader (table map, file reader, window, version range, cost)
 * @param pIndexList          SBlockIdx entries of the queried tables, see doLoadBlockIndex()
 * @param pLastBlockIndex     SBlockL entries of the current file set
 * @param pBlockNum           in/out: number of qualified data blocks and last blocks
 * @param pQualifiedLastBlock out: qualified SBlockL entries are appended here
 * @return TSDB_CODE_SUCCESS, the error code of tsdbReadBlock(), or TSDB_CODE_OUT_OF_MEMORY
 */
static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, SArray* pLastBlockIndex,
                               SBlockNumber* pBlockNum, SArray* pQualifiedLastBlock) {
  int32_t numOfQTable = 0;
  size_t  sizeInDisk = 0;
  size_t  numOfTables = taosArrayGetSize(pIndexList);

  int64_t st = taosGetTimestampUs();
  cleanupTableScanInfo(pReader->status.pTableMap);

  for (size_t i = 0; i < numOfTables; ++i) {
    SBlockIdx* pBlockIdx = taosArrayGet(pIndexList, i);

    STableBlockScanInfo* pScanInfo = taosHashGet(pReader->status.pTableMap, &pBlockIdx->uid, sizeof(int64_t));

    tMapDataReset(&pScanInfo->mapData);
    int32_t code = tsdbReadBlock(pReader->pFileReader, pBlockIdx, &pScanInfo->mapData);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    sizeInDisk += pScanInfo->mapData.nData;
    for (int32_t j = 0; j < pScanInfo->mapData.nItem; ++j) {
      SBlock block = {0};
      tMapDataGetItemByIdx(&pScanInfo->mapData, j, &block, tGetBlock);

      // 1. time range check
      if (block.minKey.ts > pReader->window.ekey || block.maxKey.ts < pReader->window.skey) {
        continue;
      }

      // 2. version range check
      if (block.minVer > pReader->verRange.maxVer || block.maxVer < pReader->verRange.minVer) {
        continue;
      }

      void* p = taosArrayPush(pScanInfo->pBlockList, &j);
      if (p == NULL) {
        tMapDataClear(&pScanInfo->mapData);
        return TSDB_CODE_OUT_OF_MEMORY;
      }

      pBlockNum->numOfBlocks += 1;
    }

    if (pScanInfo->pBlockList != NULL && taosArrayGetSize(pScanInfo->pBlockList) > 0) {
      numOfQTable += 1;
    }
  }

  size_t numOfLast = taosArrayGetSize(pLastBlockIndex);
  for (size_t i = 0; i < numOfLast; ++i) {
    SBlockL* pLastBlock = taosArrayGet(pLastBlockIndex, i);
    if (pLastBlock->suid != pReader->suid) {
      continue;
    }

    // 1. time range check
    if (pLastBlock->minKey > pReader->window.ekey || pLastBlock->maxKey < pReader->window.skey) {
      continue;
    }

    // 2. version range check
    if (pLastBlock->minVer > pReader->verRange.maxVer || pLastBlock->maxVer < pReader->verRange.minVer) {
      continue;
    }

    if (taosArrayPush(pQualifiedLastBlock, pLastBlock) == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }

    pBlockNum->numOfLastBlocks += 1;
  }

  int32_t total = pBlockNum->numOfLastBlocks + pBlockNum->numOfBlocks;

  double el = (taosGetTimestampUs() - st) / 1000.0;
  // numOfTables is a size_t and must be converted for %d; the size is reported in Kb (1024 bytes)
  tsdbDebug("load block of %d tables completed, blocks:%d in %d tables, lastBlock:%d, size:%.2f Kb, elapsed time:%.2f ms %s",
            (int32_t)numOfTables, pBlockNum->numOfBlocks, numOfQTable, pBlockNum->numOfLastBlocks,
            sizeInDisk / 1024.0, el, pReader->idStr);

  pReader->cost.numOfBlocks += total;
  pReader->cost.headFileLoadTime += el;

  return TSDB_CODE_SUCCESS;
}
H
Hongze Cheng 已提交
743

744
// Flag the current file block as completely dumped and store, in lastKey, the key one step
// beyond maxKey in the traversal direction (maxKey + 1 for ascending, maxKey - 1 otherwise).
static void setBlockAllDumped(SFileBlockDumpInfo* pDumpInfo, int64_t maxKey, int32_t order) {
  pDumpInfo->allDumped = true;

  if (ASCENDING_TRAVERSE(order)) {
    pDumpInfo->lastKey = maxKey + 1;
  } else {
    pDumpInfo->lastKey = maxKey - 1;
  }
}

750 751
// Append one column value to row `rowIndex` of the output column.
//   pColInfoData: destination column of the result block
//   rowIndex:     destination row
//   colIndex:     selects the scratch buffer pSup->buildBuf[colIndex] used for var-length values
//   pColVal:      source value; isNull/isNone both result in a NULL cell
static void doCopyColVal(SColumnInfoData* pColInfoData, int32_t rowIndex, int32_t colIndex, SColVal* pColVal,
                         SBlockLoadSuppInfo* pSup) {
  bool noValue = pColVal->isNull || pColVal->isNone;

  // fixed-length types are appended straight from the value storage
  if (!IS_VAR_DATA_TYPE(pColVal->type)) {
    colDataAppend(pColInfoData, rowIndex, (const char*)&pColVal->value, noValue);
    return;
  }

  if (noValue) {
    colDataAppendNULL(pColInfoData, rowIndex);
    return;
  }

  // var-length types: assemble the length header plus payload in the scratch buffer first
  varDataSetLen(pSup->buildBuf[colIndex], pColVal->value.nData);
  ASSERT(pColVal->value.nData <= pColInfoData->info.bytes);
  memcpy(varDataVal(pSup->buildBuf[colIndex]), pColVal->value.pData, pColVal->value.nData);
  colDataAppend(pColInfoData, rowIndex, pSup->buildBuf[colIndex], false);
}

766
// Return the entry of blockList at the iterator's current index, or NULL when the list is empty.
static SFileDataBlockInfo* getCurrentBlockInfo(SDataBlockIter* pBlockIter) {
  size_t numInList = taosArrayGetSize(pBlockIter->blockList);
  if (numInList != 0) {
    return (SFileDataBlockInfo*)taosArrayGet(pBlockIter->blockList, pBlockIter->index);
  }

  // an empty list must agree with the recorded block count
  ASSERT(pBlockIter->numOfBlocks == numInList);
  return NULL;
}

776
// Return the address of the SBlock embedded in the iterator (filled in by doSetCurrentBlock).
static SBlock* getCurrentBlock(SDataBlockIter* pBlockIter) {
  SBlock* pCurrent = &pBlockIter->block;
  return pCurrent;
}
777

778
// Copy rows of the loaded file block (pReader->status.fileBlockData) into pReader->pResBlock,
// starting at fBlockDumpInfo.rowIndex and walking in the reader's scan direction. At most
// pReader->capacity rows are copied. Output columns that have no counterpart in the file block
// are filled with NULL. Always returns TSDB_CODE_SUCCESS.
//
// The copy loops are bounded by the number of rows to emit (plus the valid index range of the
// block) instead of a fixed end index: with a fixed end index the descending scan copied nothing
// when the block held more rows than the output capacity.
static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo) {
  SReaderStatus*  pStatus = &pReader->status;
  SDataBlockIter* pBlockIter = &pStatus->blockIter;

  SBlockData*         pBlockData = &pStatus->fileBlockData;
  SFileDataBlockInfo* pFBlock = getCurrentBlockInfo(pBlockIter);
  SBlock*             pBlock = getCurrentBlock(pBlockIter);
  SSDataBlock*        pResBlock = pReader->pResBlock;
  int32_t             numOfOutputCols = blockDataGetNumOfCols(pResBlock);

  SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo;
  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;

  SColVal cv = {0};
  int64_t st = taosGetTimestampUs();
  bool    asc = ASCENDING_TRAVERSE(pReader->order);
  int32_t step = asc ? 1 : -1;

  // rows left in the block along the scan direction, limited by the output capacity
  int32_t remain = asc ? (pBlockData->nRow - pDumpInfo->rowIndex) : (pDumpInfo->rowIndex + 1);
  if (remain > pReader->capacity) {
    remain = pReader->capacity;
  }

  int32_t rowIndex = 0;

  // the primary timestamp column, when requested, is the first output column
  int32_t          i = 0;
  SColumnInfoData* pColData = taosArrayGet(pResBlock->pDataBlock, i);
  if (pColData->info.colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
    for (int32_t j = pDumpInfo->rowIndex; rowIndex < remain && j >= 0 && j < pBlockData->nRow; j += step) {
      colDataAppend(pColData, rowIndex++, (const char*)&pBlockData->aTSKEY[j], false);
    }
    i += 1;
  }

  // merge-walk the output columns and the columns stored in the file block by column id
  int32_t colIndex = 0;
  int32_t num = taosArrayGetSize(pBlockData->aIdx);
  while (i < numOfOutputCols && colIndex < num) {
    rowIndex = 0;
    pColData = taosArrayGet(pResBlock->pDataBlock, i);

    SColData* pData = tBlockDataGetColDataByIdx(pBlockData, colIndex);
    if (pData->cid < pColData->info.colId) {
      colIndex += 1;
    } else if (pData->cid == pColData->info.colId) {
      for (int32_t j = pDumpInfo->rowIndex; rowIndex < remain && j >= 0 && j < pBlockData->nRow; j += step) {
        tColDataGetValue(pData, j, &cv);
        doCopyColVal(pColData, rowIndex++, i, &cv, pSupInfo);
      }
      colIndex += 1;
      i += 1;
      ASSERT(rowIndex == remain);
    } else {  // the specified column does not exist in file block, fill with null data
      colDataAppendNNULL(pColData, 0, remain);
      i += 1;
    }
  }

  // remaining output columns have no data in this file block
  while (i < numOfOutputCols) {
    pColData = taosArrayGet(pResBlock->pDataBlock, i);
    colDataAppendNNULL(pColData, 0, remain);
    i += 1;
  }

  pResBlock->info.rows = remain;
  pDumpInfo->rowIndex += step * remain;

  // NOTE(review): the block is marked as fully dumped even when only `capacity` rows were copied
  // above; confirm that callers never reach this function with more rows left than the capacity.
  setBlockAllDumped(pDumpInfo, pBlock->maxKey.ts, pReader->order);

  double elapsedTime = (taosGetTimestampUs() - st) / 1000.0;
  pReader->cost.blockLoadTime += elapsedTime;

  int32_t unDumpedRows = asc ? pBlock->nRow - pDumpInfo->rowIndex : pDumpInfo->rowIndex + 1;
  tsdbDebug("%p copy file block to sdatablock, global index:%d, table index:%d, brange:%" PRId64 "-%" PRId64
            ", rows:%d, remain:%d, minVer:%" PRId64 ", maxVer:%" PRId64 ", elapsed time:%.2f ms, %s",
            pReader, pBlockIter->index, pFBlock->tbBlockIdx, pBlock->minKey.ts, pBlock->maxKey.ts, remain, unDumpedRows,
            pBlock->minVer, pBlock->maxVer, elapsedTime, pReader->idStr);

  return TSDB_CODE_SUCCESS;
}

862
// Read the data of the iterator's current file block into pBlockData.
// When the iterator has no current block nothing is read. In both cases the elapsed time is added
// to cost.blockLoadTime and fBlockDumpInfo.allDumped is reset to false.
// Returns TSDB_CODE_SUCCESS, or the code reported by tsdbReadDataBlock on failure.
static int32_t doLoadFileBlockData(STsdbReader* pReader, SDataBlockIter* pBlockIter, SBlockData* pBlockData) {
  int64_t startUs = taosGetTimestampUs();
  double  costMs = 0;

  SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(pBlockIter);
  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;

  if (pBlockInfo != NULL) {
    SBlock* pBlock = getCurrentBlock(pBlockIter);

    int32_t code = tsdbReadDataBlock(pReader->pFileReader, pBlock, pBlockData);
    if (code != TSDB_CODE_SUCCESS) {
      tsdbError("%p error occurs in loading file block, global index:%d, table index:%d, brange:%" PRId64 "-%" PRId64
                ", rows:%d, code:%s %s",
                pReader, pBlockIter->index, pBlockInfo->tbBlockIdx, pBlock->minKey.ts, pBlock->maxKey.ts, pBlock->nRow,
                tstrerror(code), pReader->idStr);
      return code;
    }

    costMs = (taosGetTimestampUs() - startUs) / 1000.0;

    tsdbDebug("%p load file block into buffer, global index:%d, table index:%d, brange:%" PRId64 "-%" PRId64
                  ", rows:%d, minVer:%" PRId64 ", maxVer:%" PRId64 ", elapsed time:%.2f ms, %s",
              pReader, pBlockIter->index, pBlockInfo->tbBlockIdx, pBlock->minKey.ts, pBlock->maxKey.ts, pBlock->nRow,
              pBlock->minVer, pBlock->maxVer, costMs, pReader->idStr);
  }

  pReader->cost.blockLoadTime += costMs;
  pDumpInfo->allDumped = false;

  return TSDB_CODE_SUCCESS;
}
H
Hongze Cheng 已提交
930

H
Haojun Liao 已提交
931 932 933
// Release every buffer owned by the supporter: the per-table wrapper arrays
// (the first numOfTables slots of pDataBlockInfo) and the three top-level arrays.
static void cleanupBlockOrderSupporter(SBlockOrderSupporter* pSup) {
  taosMemoryFreeClear(pSup->numOfBlocksPerTable);
  taosMemoryFreeClear(pSup->indexPerTable);

  int32_t tableIdx = 0;
  while (tableIdx < pSup->numOfTables) {
    SBlockOrderWrapper* pWrappers = pSup->pDataBlockInfo[tableIdx];
    taosMemoryFreeClear(pWrappers);
    tableIdx += 1;
  }

  taosMemoryFreeClear(pSup->pDataBlockInfo);
}
H
Hongze Cheng 已提交
942

H
Haojun Liao 已提交
943 944
// Allocate the zero-initialized per-table arrays of the supporter for `numOfTables` tables.
// Returns TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY after releasing whatever was allocated.
static int32_t initBlockOrderSupporter(SBlockOrderSupporter* pSup, int32_t numOfTables) {
  ASSERT(numOfTables >= 1);

  pSup->numOfBlocksPerTable = taosMemoryCalloc(1, sizeof(int32_t) * numOfTables);
  pSup->indexPerTable = taosMemoryCalloc(1, sizeof(int32_t) * numOfTables);
  pSup->pDataBlockInfo = taosMemoryCalloc(1, POINTER_BYTES * numOfTables);

  bool allAllocated =
      (pSup->numOfBlocksPerTable != NULL) && (pSup->indexPerTable != NULL) && (pSup->pDataBlockInfo != NULL);
  if (allAllocated) {
    return TSDB_CODE_SUCCESS;
  }

  cleanupBlockOrderSupporter(pSup);
  return TSDB_CODE_OUT_OF_MEMORY;
}
H
Hongze Cheng 已提交
957

H
Haojun Liao 已提交
958
static int32_t fileDataBlockOrderCompar(const void* pLeft, const void* pRight, void* param) {
959
  int32_t leftIndex = *(int32_t*)pLeft;
H
Haojun Liao 已提交
960
  int32_t rightIndex = *(int32_t*)pRight;
H
Hongze Cheng 已提交
961

H
Haojun Liao 已提交
962
  SBlockOrderSupporter* pSupporter = (SBlockOrderSupporter*)param;
H
Hongze Cheng 已提交
963

H
Haojun Liao 已提交
964 965
  int32_t leftTableBlockIndex = pSupporter->indexPerTable[leftIndex];
  int32_t rightTableBlockIndex = pSupporter->indexPerTable[rightIndex];
H
Hongze Cheng 已提交
966

H
Haojun Liao 已提交
967 968 969 970 971 972 973
  if (leftTableBlockIndex > pSupporter->numOfBlocksPerTable[leftIndex]) {
    /* left block is empty */
    return 1;
  } else if (rightTableBlockIndex > pSupporter->numOfBlocksPerTable[rightIndex]) {
    /* right block is empty */
    return -1;
  }
H
Hongze Cheng 已提交
974

975
  SBlockOrderWrapper* pLeftBlock = &pSupporter->pDataBlockInfo[leftIndex][leftTableBlockIndex];
H
Haojun Liao 已提交
976
  SBlockOrderWrapper* pRightBlock = &pSupporter->pDataBlockInfo[rightIndex][rightTableBlockIndex];
H
Hongze Cheng 已提交
977

978 979 980 981
  return pLeftBlock->offset > pRightBlock->offset ? 1 : -1;
}

// Decode the SBlock that the iterator currently points at into pBlockIter->block.
// The owning table is looked up by uid in pTableMap; the table's pBlockList maps the per-table
// block index to an index into its mapData. Nothing is decoded when the iterator has no current
// block. Always returns TSDB_CODE_SUCCESS.
static int32_t doSetCurrentBlock(SDataBlockIter* pBlockIter) {
  SFileDataBlockInfo* pCurrent = getCurrentBlockInfo(pBlockIter);
  if (pCurrent == NULL) {
    return TSDB_CODE_SUCCESS;
  }

  STableBlockScanInfo* pTableInfo = taosHashGet(pBlockIter->pTableMap, &pCurrent->uid, sizeof(pCurrent->uid));
  int32_t*             pMapIdx = taosArrayGet(pTableInfo->pBlockList, pCurrent->tbBlockIdx);
  tMapDataGetItemByIdx(&pTableInfo->mapData, *pMapIdx, &pBlockIter->block, tGetBlock);

  return TSDB_CODE_SUCCESS;
}
H
Hongze Cheng 已提交
995

996
// Build the access order of all qualified data blocks of the current file.
//   pReader:     reader whose status.pTableMap holds the per-table block lists
//   pBlockIter:  iterator to (re)initialize; its blockList is cleared and refilled
//   numOfBlocks: total number of qualified blocks over all tables
// With a single qualified table the blocks keep their per-table order; otherwise the blocks of
// all tables are merged by file offset through a multiway merge tree. The iterator is positioned
// on the first block (ascending scan) or the last block (descending scan).
// Returns TSDB_CODE_SUCCESS, the code of initBlockOrderSupporter, or TSDB_CODE_TDB_OUT_OF_MEMORY.
static int32_t initBlockIterator(STsdbReader* pReader, SDataBlockIter* pBlockIter, int32_t numOfBlocks) {
  bool asc = ASCENDING_TRAVERSE(pReader->order);

  pBlockIter->numOfBlocks = numOfBlocks;
  taosArrayClear(pBlockIter->blockList);

  // access data blocks according to the offset of each block in asc/desc order.
  int32_t numOfTables = (int32_t)taosHashGetSize(pReader->status.pTableMap);

  int64_t st = taosGetTimestampUs();

  SBlockOrderSupporter sup = {0};
  int32_t              code = initBlockOrderSupporter(&sup, numOfTables);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  int32_t cnt = 0;
  void*   ptr = NULL;
  while (1) {
    ptr = taosHashIterate(pReader->status.pTableMap, ptr);
    if (ptr == NULL) {
      break;
    }

    STableBlockScanInfo* pTableScanInfo = (STableBlockScanInfo*)ptr;
    if (pTableScanInfo->pBlockList == NULL || taosArrayGetSize(pTableScanInfo->pBlockList) == 0) {
      continue;
    }

    int32_t num = (int32_t)taosArrayGetSize(pTableScanInfo->pBlockList);
    sup.numOfBlocksPerTable[sup.numOfTables] = num;

    char* buf = taosMemoryMalloc(sizeof(SBlockOrderWrapper) * num);
    if (buf == NULL) {
      // NOTE(review): the hash iteration is abandoned here without being cancelled; confirm
      // whether the hash API requires an explicit cancel when leaving the loop early.
      cleanupBlockOrderSupporter(&sup);
      return TSDB_CODE_TDB_OUT_OF_MEMORY;
    }

    sup.pDataBlockInfo[sup.numOfTables] = (SBlockOrderWrapper*)buf;
    SBlock block = {0};
    for (int32_t k = 0; k < num; ++k) {
      SBlockOrderWrapper wrapper = {0};

      int32_t* mapDataIndex = taosArrayGet(pTableScanInfo->pBlockList, k);
      tMapDataGetItemByIdx(&pTableScanInfo->mapData, *mapDataIndex, &block, tGetBlock);

      wrapper.uid = pTableScanInfo->uid;
      wrapper.offset = block.aSubBlock[0].offset;

      sup.pDataBlockInfo[sup.numOfTables][k] = wrapper;
      cnt++;
    }

    sup.numOfTables += 1;
  }

  ASSERT(numOfBlocks == cnt);

  // since there is only one table qualified, blocks are not sorted
  if (sup.numOfTables == 1) {
    for (int32_t i = 0; i < numOfBlocks; ++i) {
      SFileDataBlockInfo blockInfo = {.uid = sup.pDataBlockInfo[0][i].uid, .tbBlockIdx = i};
      if (taosArrayPush(pBlockIter->blockList, &blockInfo) == NULL) {
        cleanupBlockOrderSupporter(&sup);
        return TSDB_CODE_TDB_OUT_OF_MEMORY;
      }
    }

    int64_t et = taosGetTimestampUs();
    tsdbDebug("%p create blocks info struct completed for one table, %d blocks not sorted, elapsed time:%.2f ms %s",
              pReader, numOfBlocks, (et - st) / 1000.0, pReader->idStr);

    pBlockIter->index = asc ? 0 : (numOfBlocks - 1);
    cleanupBlockOrderSupporter(&sup);
    doSetCurrentBlock(pBlockIter);
    return TSDB_CODE_SUCCESS;
  }

  tsdbDebug("%p create data blocks info struct completed, %d blocks in %d tables %s", pReader, cnt, sup.numOfTables,
            pReader->idStr);

  ASSERT(cnt <= numOfBlocks && sup.numOfTables <= numOfTables);

  // keep the full 32-bit result: narrowing it could turn an error code into 0
  SMultiwayMergeTreeInfo* pTree = NULL;
  int32_t                 ret = tMergeTreeCreate(&pTree, sup.numOfTables, &sup, fileDataBlockOrderCompar);
  if (ret != TSDB_CODE_SUCCESS) {
    cleanupBlockOrderSupporter(&sup);
    return TSDB_CODE_TDB_OUT_OF_MEMORY;
  }

  int32_t numOfTotal = 0;
  while (numOfTotal < cnt) {
    int32_t pos = tMergeTreeGetChosenIndex(pTree);
    int32_t index = sup.indexPerTable[pos]++;

    SFileDataBlockInfo blockInfo = {.uid = sup.pDataBlockInfo[pos][index].uid, .tbBlockIdx = index};
    if (taosArrayPush(pBlockIter->blockList, &blockInfo) == NULL) {
      cleanupBlockOrderSupporter(&sup);
      taosMemoryFree(pTree);
      return TSDB_CODE_TDB_OUT_OF_MEMORY;
    }

    // set data block index overflow, in order to disable the offset comparator
    if (sup.indexPerTable[pos] >= sup.numOfBlocksPerTable[pos]) {
      sup.indexPerTable[pos] = sup.numOfBlocksPerTable[pos] + 1;
    }

    numOfTotal += 1;
    tMergeTreeAdjust(pTree, tMergeTreeGetAdjustIndex(pTree));
  }

  int64_t et = taosGetTimestampUs();
  tsdbDebug("%p %d data blocks access order completed, elapsed time:%.2f ms %s", pReader, numOfBlocks, (et - st) / 1000.0,
            pReader->idStr);
  cleanupBlockOrderSupporter(&sup);
  taosMemoryFree(pTree);

  pBlockIter->index = asc ? 0 : (numOfBlocks - 1);
  doSetCurrentBlock(pBlockIter);

  return TSDB_CODE_SUCCESS;
}
H
Hongze Cheng 已提交
1112

H
Haojun Liao 已提交
1113
// Advance the iterator by one block in its traversal order and decode the new current block.
// Returns false, leaving the iterator untouched, when it already sits on the final block.
static bool blockIteratorNext(SDataBlockIter* pBlockIter) {
  if (ASCENDING_TRAVERSE(pBlockIter->order)) {
    if (pBlockIter->index >= pBlockIter->numOfBlocks - 1) {
      return false;
    }
    pBlockIter->index += 1;
  } else {
    if (pBlockIter->index <= 0) {
      return false;
    }
    pBlockIter->index -= 1;
  }

  doSetCurrentBlock(pBlockIter);
  return true;
}

1127 1128 1129
/**
 * This is an two rectangles overlap cases.
 */
1130
static int32_t dataBlockPartiallyRequired(STimeWindow* pWindow, SVersionRange* pVerRange, SBlock* pBlock) {
1131 1132
  return (pWindow->ekey < pBlock->maxKey.ts && pWindow->ekey >= pBlock->minKey.ts) ||
         (pWindow->skey > pBlock->minKey.ts && pWindow->skey <= pBlock->maxKey.ts) ||
H
Hongze Cheng 已提交
1133 1134
         (pVerRange->minVer > pBlock->minVer && pVerRange->minVer <= pBlock->maxVer) ||
         (pVerRange->maxVer < pBlock->maxVer && pVerRange->maxVer >= pBlock->minVer);
H
Haojun Liao 已提交
1135
}
H
Hongze Cheng 已提交
1136

1137 1138
// Decode the block that follows pFBlockInfo, in scan order, within the same table.
//   nextIndex: receives the per-table index of the neighbor when one exists
// Returns a heap-allocated SBlock that the caller must release with taosMemoryFree, or NULL when
// the current block is the last one in scan order, the block index is out of range, or the
// allocation fails.
static SBlock* getNeighborBlockOfSameTable(SFileDataBlockInfo* pFBlockInfo, STableBlockScanInfo* pTableBlockScanInfo,
                                           int32_t* nextIndex, int32_t order) {
  // signed arithmetic on purpose: `size - 1` on the unsigned size wraps around for an empty list
  int32_t numOfBlocks = (int32_t)taosArrayGetSize(pTableBlockScanInfo->pBlockList);
  int32_t step = ASCENDING_TRAVERSE(order) ? 1 : -1;
  int32_t current = pFBlockInfo->tbBlockIdx;
  int32_t next = current + step;

  if (current < 0 || next < 0 || next >= numOfBlocks) {
    return NULL;
  }

  *nextIndex = next;

  SBlock* pBlock = taosMemoryCalloc(1, sizeof(SBlock));
  if (pBlock == NULL) {
    return NULL;
  }

  int32_t* indexInMapdata = taosArrayGet(pTableBlockScanInfo->pBlockList, next);
  tMapDataGetItemByIdx(&pTableBlockScanInfo->mapData, *indexInMapdata, pBlock, tGetBlock);
  return pBlock;
}

// Search blockList, from the iterator's current position onwards in traversal order, for the
// entry with the same uid and per-table block index as pFBlockInfo.
// Returns its position; asserts (and returns -1) when no such entry exists.
static int32_t findFileBlockInfoIndex(SDataBlockIter* pBlockIter, SFileDataBlockInfo* pFBlockInfo) {
  ASSERT(pBlockIter != NULL && pFBlockInfo != NULL);

  int32_t step = ASCENDING_TRAVERSE(pBlockIter->order) ? 1 : -1;

  for (int32_t pos = pBlockIter->index; pos >= 0 && pos < pBlockIter->numOfBlocks; pos += step) {
    SFileDataBlockInfo* pCandidate = taosArrayGet(pBlockIter->blockList, pos);

    bool sameTable = (pCandidate->uid == pFBlockInfo->uid);
    if (sameTable && pCandidate->tbBlockIdx == pFBlockInfo->tbBlockIdx) {
      return pos;
    }
  }

  ASSERT(0);
  return -1;
}

1177
// Make the block stored at position `index` the next current block of the iterator: the iterator
// moves by `step`, and when the requested block is not already at the new position it is removed
// from its old slot and re-inserted there. The new current block is then decoded.
// Returns -1 when `index` is out of range, TSDB_CODE_SUCCESS otherwise.
static int32_t setFileBlockActiveInBlockIter(SDataBlockIter* pBlockIter, int32_t index, int32_t step) {
  bool inRange = (index >= 0) && (index < pBlockIter->numOfBlocks);
  if (!inRange) {
    return -1;
  }

  // copy by value: the slot is about to be removed from the list
  SFileDataBlockInfo picked = *(SFileDataBlockInfo*)taosArrayGet(pBlockIter->blockList, index);
  pBlockIter->index += step;

  if (pBlockIter->index != index) {
    taosArrayRemove(pBlockIter->blockList, index);
    taosArrayInsert(pBlockIter->blockList, pBlockIter->index, &picked);

    SFileDataBlockInfo* pMoved = taosArrayGet(pBlockIter->blockList, pBlockIter->index);
    ASSERT(pMoved->uid == picked.uid && pMoved->tbBlockIdx == picked.tbBlockIdx);
  }

  doSetCurrentBlock(pBlockIter);
  return TSDB_CODE_SUCCESS;
}

// Report whether pBlock and its neighbor in scan order share the boundary timestamp:
// ascending compares this block's max key with the neighbor's min key, descending compares
// this block's min key with the neighbor's max key.
static bool overlapWithNeighborBlock(SBlock* pBlock, SBlock* pNeighbor, int32_t order) {
  return ASCENDING_TRAVERSE(order) ? (pBlock->maxKey.ts == pNeighbor->minKey.ts)
                                   : (pBlock->minKey.ts == pNeighbor->maxKey.ts);
}
H
Hongze Cheng 已提交
1205

1206
// Report whether the buffered key lies at or before the block start (ascending scan) or at or
// after the block end (descending scan). A key equal to TSKEY_INITIAL_VAL never qualifies.
static bool bufferDataInFileBlockGap(int32_t order, TSDBKEY key, SBlock* pBlock) {
  if (key.ts == TSKEY_INITIAL_VAL) {
    return false;
  }

  if (ASCENDING_TRAVERSE(order)) {
    return key.ts <= pBlock->minKey.ts;
  }

  return key.ts >= pBlock->maxKey.ts;
}
H
Hongze Cheng 已提交
1212

H
Haojun Liao 已提交
1213
// Report whether key.ts falls inside the block's key range while the block's version range
// intersects the queried version range.
static bool keyOverlapFileBlock(TSDBKEY key, SBlock* pBlock, SVersionRange* pVerRange) {
  if (key.ts < pBlock->minKey.ts || key.ts > pBlock->maxKey.ts) {
    return false;
  }

  if (pBlock->maxVer < pVerRange->minVer) {
    return false;
  }

  return pBlock->minVer <= pVerRange->maxVer;
}

1218 1219 1220 1221 1222 1223
// Walk the delete skyline from `startIndex` towards larger timestamps and report whether a delete
// marker affects the rows of pBlock:
//   - a point inside the block's key range counts when its version is >= the block's minVer;
//   - a point before the block counts when its version is >= minVer and the following point
//     reaches into the block (the following point's version is ignored when it is the last one);
//   - the first point beyond the block's max key ends the search.
static bool doCheckDatablockOverlapFrom(STableBlockScanInfo* pBlockScanInfo, const SBlock* pBlock,
                                        int32_t startIndex) {
  size_t num = taosArrayGetSize(pBlockScanInfo->delSkyline);

  for (int32_t i = startIndex; i < num; i += 1) {
    TSDBKEY* p = taosArrayGet(pBlockScanInfo->delSkyline, i);
    if (p->ts >= pBlock->minKey.ts && p->ts <= pBlock->maxKey.ts) {
      if (p->version >= pBlock->minVer) {
        return true;
      }
    } else if (p->ts < pBlock->minKey.ts) {  // p->ts < pBlock->minKey.ts
      if (p->version >= pBlock->minVer) {
        if (i < num - 1) {
          TSDBKEY* pnext = taosArrayGet(pBlockScanInfo->delSkyline, i + 1);
          if (i + 1 == num - 1) {  // pnext is the last point
            if (pnext->ts >= pBlock->minKey.ts) {
              return true;
            }
          } else {
            if (pnext->ts >= pBlock->minKey.ts && pnext->version >= pBlock->minVer) {
              return true;
            }
          }
        } else {  // it must be the last point
          ASSERT(p->version == 0);
        }
      }
    } else {  // (p->ts > pBlock->maxKey.ts) {
      return false;
    }
  }

  return false;
}

// Same check, starting at the table's current file delete index.
static bool doCheckforDatablockOverlap(STableBlockScanInfo* pBlockScanInfo, const SBlock* pBlock) {
  return doCheckDatablockOverlapFrom(pBlockScanInfo, pBlock, pBlockScanInfo->fileDelIndex);
}

// Report whether the delete skyline of the table affects pBlock.
// Returns false when there is no skyline, the skyline is empty, or the key ranges do not meet.
// For a descending scan the search start is first moved back from fileDelIndex to the first
// skyline point that is not after the block's min key, and the check starts from that point.
static bool overlapWithDelSkyline(STableBlockScanInfo* pBlockScanInfo, const SBlock* pBlock, int32_t order) {
  if (pBlockScanInfo->delSkyline == NULL) {
    return false;
  }

  int32_t numOfPoints = (int32_t)taosArrayGetSize(pBlockScanInfo->delSkyline);
  if (numOfPoints == 0) {
    return false;
  }

  // ts is not overlap
  TSDBKEY* pFirst = taosArrayGet(pBlockScanInfo->delSkyline, 0);
  TSDBKEY* pLast = taosArrayGetLast(pBlockScanInfo->delSkyline);
  if (pBlock->minKey.ts > pLast->ts || pBlock->maxKey.ts < pFirst->ts) {
    return false;
  }

  if (ASCENDING_TRAVERSE(order)) {
    return doCheckforDatablockOverlap(pBlockScanInfo, pBlock);
  }

  // keep the starting position inside the skyline before dereferencing it
  int32_t index = pBlockScanInfo->fileDelIndex;
  if (index >= numOfPoints) {
    index = numOfPoints - 1;
  }
  if (index < 0) {
    index = 0;
  }

  while (1) {
    TSDBKEY* p = taosArrayGet(pBlockScanInfo->delSkyline, index);
    if (p->ts > pBlock->minKey.ts && index > 0) {
      index -= 1;
    } else {  // find the first point that is smaller than the minKey.ts of dataBlock.
      break;
    }
  }

  return doCheckDatablockOverlapFrom(pBlockScanInfo, pBlock, index);
}

1282 1283 1284 1285
// 1. the version of all rows should be less than the endVersion
// 2. current block should not overlap with next neighbor block
// 3. current timestamp should not be overlap with each other
// 4. output buffer should be large enough to hold all rows in current block
1286
// 5. delete info should not overlap with current block data
1287
static bool fileBlockShouldLoad(STsdbReader* pReader, SFileDataBlockInfo* pFBlock, SBlock* pBlock,
1288
                                STableBlockScanInfo* pScanInfo, TSDBKEY key, SLastBlockReader* pLastBlockReader) {
1289 1290 1291
  int32_t neighborIndex = 0;
  SBlock* pNeighbor = getNeighborBlockOfSameTable(pFBlock, pScanInfo, &neighborIndex, pReader->order);

1292
  // overlap with neighbor
1293 1294 1295
  bool overlapWithNeighbor = false;
  if (pNeighbor) {
    overlapWithNeighbor = overlapWithNeighborBlock(pBlock, pNeighbor, pReader->order);
1296
    taosMemoryFree(pNeighbor);
1297 1298
  }

1299
  // has duplicated ts of different version in this block
L
Liu Jicong 已提交
1300 1301
  bool hasDup = (pBlock->nSubBlock == 1) ? pBlock->hasDup : true;
  bool overlapWithDel = overlapWithDelSkyline(pScanInfo, pBlock, pReader->order);
1302

1303 1304
  // todo here we need to each key in the last files to identify if it is really overlapped with last block
  bool overlapWithlastBlock = false;
1305
  if (taosArrayGetSize(pLastBlockReader->pBlockL) > 0 && (pLastBlockReader->currentBlockIndex != -1)) {
1306 1307 1308 1309
    SBlockL *pBlockL = taosArrayGet(pLastBlockReader->pBlockL, pLastBlockReader->currentBlockIndex);
    overlapWithlastBlock = !(pBlock->maxKey.ts < pBlockL->minKey || pBlock->minKey.ts > pBlockL->maxKey);
  }

1310 1311 1312 1313 1314 1315 1316 1317 1318 1319
  bool moreThanOutputCapacity = pBlock->nRow > pReader->capacity;
  bool partiallyRequired = dataBlockPartiallyRequired(&pReader->window, &pReader->verRange, pBlock);
  bool overlapWithKey = keyOverlapFileBlock(key, pBlock, &pReader->verRange);

  bool loadDataBlock = (overlapWithNeighbor || hasDup || partiallyRequired || overlapWithKey ||
                        moreThanOutputCapacity || overlapWithDel || overlapWithlastBlock);

  // log the reason why load the datablock for profile
  if (loadDataBlock) {
    tsdbDebug("%p uid:%" PRIu64
1320
              " need to load the datablock, overlapwithneighborblock:%d, hasDup:%d, partiallyRequired:%d, "
1321 1322 1323 1324 1325 1326
              "overlapWithKey:%d, greaterThanBuf:%d, overlapWithDel:%d, overlapWithlastBlock:%d, %s",
              pReader, pFBlock->uid, overlapWithNeighbor, hasDup, partiallyRequired, overlapWithKey,
              moreThanOutputCapacity, overlapWithDel, overlapWithlastBlock, pReader->idStr);
  }

  return loadDataBlock;
H
Haojun Liao 已提交
1327 1328
}

1329
// Fill pReader->pResBlock from the in-memory buffers of one table, up to `endKey` and at most
// pReader->capacity rows (both are handed to buildDataBlockFromBufImpl).
// Returns TSDB_CODE_SUCCESS right away when neither memory iterator holds a value; otherwise the
// result block's time window and uid are set, the block is flagged as composed, the elapsed time
// is added to cost.buildmemBlock and the code of buildDataBlockFromBufImpl is returned.
static int32_t buildDataBlockFromBuf(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo, int64_t endKey) {
  if (!pBlockScanInfo->iiter.hasVal && !pBlockScanInfo->iter.hasVal) {
    return TSDB_CODE_SUCCESS;
  }

  SSDataBlock* pResBlock = pReader->pResBlock;
  int64_t      startUs = taosGetTimestampUs();

  int32_t code = buildDataBlockFromBufImpl(pBlockScanInfo, endKey, pReader->capacity, pReader);

  blockDataUpdateTsWindow(pResBlock, 0);
  pResBlock->info.uid = pBlockScanInfo->uid;

  setComposedBlockFlag(pReader, true);

  double costMs = (taosGetTimestampUs() - startUs) / 1000.0;
  tsdbDebug("%p build data block from cache completed, elapsed time:%.2f ms, numOfRows:%d, brange:%" PRId64
            " - %" PRId64 " %s",
            pReader, costMs, pResBlock->info.rows, pResBlock->info.window.skey, pResBlock->info.window.ekey,
            pReader->idStr);

  pReader->cost.buildmemBlock += costMs;
  return code;
}

1354 1355
// Fast path: copy the current file-block row without merging when that is provably safe, i.e.
//   1. the row is not the final row of the block in scan order, and
//   2. the row that follows in scan order carries a different timestamp than `key`.
// On success the row is appended to pReader->pResBlock, the dump position advances by one row and
// true is returned; otherwise nothing is changed and false is returned.
static bool tryCopyDistinctRowFromFileBlock(STsdbReader* pReader, SBlockData* pBlockData, int64_t key,
                                            SFileBlockDumpInfo* pDumpInfo) {
  bool asc = (pReader->order == TSDB_ORDER_ASC);
  bool desc = (pReader->order == TSDB_ORDER_DESC);

  bool hasFollower = (asc && pDumpInfo->rowIndex < pDumpInfo->totalRows - 1) || (desc && pDumpInfo->rowIndex > 0);
  if (!hasFollower) {
    return false;
  }

  int32_t step = asc ? 1 : -1;
  if (pBlockData->aTSKEY[pDumpInfo->rowIndex + step] == key) {
    return false;  // duplicated timestamp ahead, a merge is required
  }

  doAppendRowFromFileBlock(pReader->pResBlock, pReader, pBlockData, pDumpInfo->rowIndex);
  pDumpInfo->rowIndex += step;
  return true;
}

H
Haojun Liao 已提交
1374 1375 1376 1377 1378 1379
// Return the schema with version `sversion` for table `uid`.
// pReader->pSchema caches the schema fetched with version -1 (loaded on first use);
// pReader->pMemSchema caches one other version and is replaced when a different one is requested.
// The returned schema is owned by the reader. A failed load is logged and whatever pMemSchema
// holds afterwards is returned.
static FORCE_INLINE STSchema* doGetSchemaForTSRow(int32_t sversion, STsdbReader* pReader, uint64_t uid) {
  // always set the newest schema version in pReader->pSchema
  if (pReader->pSchema == NULL) {
    pReader->pSchema = metaGetTbTSchema(pReader->pTsdb->pVnode->pMeta, uid, -1);
  }

  if (pReader->pSchema && sversion == pReader->pSchema->version) {
    return pReader->pSchema;
  }

  if (pReader->pMemSchema != NULL) {
    if (pReader->pMemSchema->version == sversion) {
      return pReader->pMemSchema;
    }

    // clear the pointer together with the free so that a failed reload below cannot leave a
    // dangling schema behind
    taosMemoryFreeClear(pReader->pMemSchema);
  }

  int32_t code = metaGetTbTSchemaEx(pReader->pTsdb->pVnode->pMeta, pReader->suid, uid, sversion, &pReader->pMemSchema);
  if (code != TSDB_CODE_SUCCESS) {
    // presumably the output pointer is left untouched on failure, so NULL is returned - TODO confirm
    tsdbError("%p failed to load schema, uid:%" PRIu64 ", sversion:%d, code:%s %s", pReader, uid, sversion,
              tstrerror(code), pReader->idStr);
  }

  return pReader->pMemSchema;
}

1399 1400 1401 1402 1403 1404 1405
// Emit one result row by merging the current row of a single in-memory iterator (mem or imem)
// with the rows of the same timestamp in the file data block and in the last block.
//
// pReader          reader state; supplies the file block, dump cursor, schema and result block
// pBlockScanInfo   per-table scan state (uid, delete skyline)
// pRow             current row of the in-memory iterator
// pIter            the iterator pRow was taken from; handed to doMergeRowsInBuf
// key              timestamp of the current file-block row; ignored when the file block is empty
// pLastBlockReader cursor over the last block
//
// The timestamp to emit is the smallest (ascending scan) or largest (descending scan) of the
// available candidates. Sources are merged in this order:
//   ASC:  file block -> last block -> buffer
//   DESC: buffer -> last block -> file block
//
// A source only takes part when it really has a current row. The placeholder value INT64_MIN
// used for an empty file block or an exhausted last block is never allowed to win the key
// selection or to match it, which would otherwise read a row at an invalid index.
//
// Always returns TSDB_CODE_SUCCESS.
static int32_t doMergeBufAndFileRows_Rv(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo, TSDBROW* pRow,
                                     SIterInfo* pIter, int64_t key, SLastBlockReader* pLastBlockReader) {
  SRowMerger          merge = {0};
  STSRow*             pTSRow = NULL;
  SBlockData*         pBlockData = &pReader->status.fileBlockData;
  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
  SBlockData*         pLastBlockData = &pLastBlockReader->lastBlockData;
  bool                asc = (pReader->order == TSDB_ORDER_ASC);

  bool hasFileRow = (pBlockData->nRow > 0);
  // nRow is tested first: hasDataInLastBlock() asserts on an empty last block
  bool hasLastRow = (pLastBlockData->nRow > 0) && hasDataInLastBlock(pLastBlockReader);

  int64_t tsLast = hasLastRow ? getCurrentKeyInLastBlock(pLastBlockReader) : INT64_MIN;
  TSDBKEY k = TSDBROW_KEY(pRow);
  TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);

  // the buffer row always exists, start from it and let the other sources compete
  int64_t minKey = k.ts;
  if (asc) {
    if (hasLastRow && tsLast < minKey) {
      minKey = tsLast;
    }
    if (hasFileRow && key < minKey) {
      minKey = key;
    }
  } else {
    if (hasLastRow && tsLast > minKey) {
      minKey = tsLast;
    }
    if (hasFileRow && key > minKey) {
      minKey = key;
    }
  }

  bool fromFile = hasFileRow && (minKey == key);
  bool fromLast = hasLastRow && (minKey == tsLast);
  bool fromBuf = (minKey == k.ts);
  bool init = false;

  if (asc) {
    if (fromFile) {
      init = true;
      tRowMergerInit(&merge, &fRow, pReader->pSchema);
      doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
    }

    if (fromLast) {
      TSDBROW fRow1 = tsdbRowFromBlockData(pLastBlockData, *pLastBlockReader->rowIndex);
      if (init) {
        tRowMerge(&merge, &fRow1);
      } else {
        init = true;
        tRowMergerInit(&merge, &fRow1, pReader->pSchema);
      }
      doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, &merge);
    }

    if (fromBuf) {
      if (init) {
        tRowMerge(&merge, pRow);
      } else {
        init = true;
        STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid);
        tRowMergerInit(&merge, pRow, pSchema);
      }
      doMergeRowsInBuf(pIter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge, pReader);
    }
  } else {
    if (fromBuf) {
      init = true;
      STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid);
      tRowMergerInit(&merge, pRow, pSchema);
      doMergeRowsInBuf(pIter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge, pReader);
    }

    if (fromLast) {
      TSDBROW fRow1 = tsdbRowFromBlockData(pLastBlockData, *pLastBlockReader->rowIndex);
      if (init) {
        tRowMerge(&merge, &fRow1);
      } else {
        init = true;
        tRowMergerInit(&merge, &fRow1, pReader->pSchema);
      }
      doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, &merge);
    }

    if (fromFile) {
      if (init) {
        tRowMerge(&merge, &fRow);
      } else {
        init = true;
        tRowMergerInit(&merge, &fRow, pReader->pSchema);
      }
      doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
    }
  }

  tRowMergerGetRow(&merge, &pTSRow);
  doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo->uid);

  taosMemoryFree(pTSRow);
  tRowMergerClear(&merge);
  return TSDB_CODE_SUCCESS;
}

1515 1516
// Emit one result row when neither mem nor imem has data, merging the file data block and the
// last block.
//
// pReader          reader state (scan order, schema, result block, dump cursor)
// pLastBlockReader cursor over the last block
// key              timestamp of the current file-block row; used only when pBlockData has rows
// pBlockScanInfo   per-table scan state
// pBlockData       the loaded file data block (may be empty)
//
// Cases:
//   - file block only:      delegated to mergeRowsInFileBlocks()
//   - both, ascending:      key < ts -> file block row only; key == ts -> merge both
//   - both, descending:     last block row first, file block row merged in when ts == key
//   - last block only:      merge the rows of the last block
//
// When both sources hold the same timestamp, the current row of the second source is passed to
// tRowMerge() before its remaining duplicates are folded in, matching how
// doMergeBufAndFileRows_Rv() and doMergeMultiLevelRowsRv() feed an already initialised merger.
//
// Always returns TSDB_CODE_SUCCESS.
static int32_t mergeFileBlockAndLastBlock(STsdbReader* pReader, SLastBlockReader* pLastBlockReader, int64_t key,
                                          STableBlockScanInfo* pBlockScanInfo, SBlockData* pBlockData) {
  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
  SBlockData*         pLastBlockData = &pLastBlockReader->lastBlockData;

  STSRow*    pTSRow = NULL;
  SRowMerger merge = {0};

  if (pBlockData->nRow > 0) {
    // no last block available, only data block exists
    if (pLastBlockData->nRow == 0 || (!hasDataInLastBlock(pLastBlockReader))) {
      return mergeRowsInFileBlocks(pBlockData, pBlockScanInfo, key, pReader);
    }

    TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
    TSDBROW fRow1 = tsdbRowFromBlockData(pLastBlockData, *pLastBlockReader->rowIndex);
    int64_t ts = getCurrentKeyInLastBlock(pLastBlockReader);
    ASSERT(ts >= key);

    if (ASCENDING_TRAVERSE(pReader->order)) {
      if (key < ts) {  // imem, mem are all empty, file blocks (data blocks and last block) exist
        return mergeRowsInFileBlocks(pBlockData, pBlockScanInfo, key, pReader);
      }

      if (key > ts) {  // excluded by the assertion above
        ASSERT(0);
        return TSDB_CODE_SUCCESS;
      }

      // key == ts: file block first, then the last block
      tRowMergerInit(&merge, &fRow, pReader->pSchema);
      doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);

      tRowMerge(&merge, &fRow1);
      doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, ts, &merge);
    } else {  // desc order: last block first, then the file block
      tRowMergerInit(&merge, &fRow1, pReader->pSchema);
      doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, ts, &merge);

      if (ts == key) {
        tRowMerge(&merge, &fRow);
        doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
      }
    }
  } else {  // only last block exists
    int64_t tsLastBlock = getCurrentKeyInLastBlock(pLastBlockReader);
    TSDBROW fRow = tsdbRowFromBlockData(pLastBlockData, *pLastBlockReader->rowIndex);

    tRowMergerInit(&merge, &fRow, pReader->pSchema);
    doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLastBlock, &merge);
  }

  tRowMergerGetRow(&merge, &pTSRow);
  doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo->uid);

  taosMemoryFree(pTSRow);
  tRowMergerClear(&merge);
  return TSDB_CODE_SUCCESS;
}

1592
// Emit one result row when both mem and imem hold a valid row, merging them with the file data
// block and the last block where those carry the same timestamp.
//
// pReader          reader state (scan order, schema, result block, dump cursor)
// pBlockScanInfo   per-table scan state; its iter/iiter must both yield a row (asserted)
// pBlockData       the loaded file data block (may be empty)
// pLastBlockReader cursor over the last block
//
// The timestamp to emit is the smallest (ascending scan) or largest (descending scan) of the
// available candidates. Sources are merged in this order:
//   ASC:  file block -> last block -> imem -> mem
//   DESC: mem -> imem -> last block -> file block
//
// The file-block key is only read when the block has rows, and a source without a current row
// can neither win nor match the key selection; its INT64_MIN placeholder is never compared.
//
// Always returns TSDB_CODE_SUCCESS.
static int32_t doMergeMultiLevelRowsRv(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo, SBlockData* pBlockData, SLastBlockReader* pLastBlockReader) {
  SRowMerger merge = {0};
  STSRow*    pTSRow = NULL;

  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
  SArray*             pDelList = pBlockScanInfo->delSkyline;
  bool                asc = ASCENDING_TRAVERSE(pReader->order);

  TSDBROW* pRow = getValidRow(&pBlockScanInfo->iter, pDelList, pReader);
  TSDBROW* piRow = getValidRow(&pBlockScanInfo->iiter, pDelList, pReader);
  ASSERT(pRow != NULL && piRow != NULL);

  SBlockData* pLastBlockData = &pLastBlockReader->lastBlockData;

  bool hasFileRow = (pBlockData->nRow > 0);
  bool hasLastRow = hasDataInLastBlock(pLastBlockReader);

  int64_t tsLast = hasLastRow ? getCurrentKeyInLastBlock(pLastBlockReader) : INT64_MIN;
  // aTSKEY must not be indexed for an empty block
  int64_t key = hasFileRow ? pBlockData->aTSKEY[pDumpInfo->rowIndex] : INT64_MIN;

  TSDBKEY k = TSDBROW_KEY(pRow);
  TSDBKEY ik = TSDBROW_KEY(piRow);

  // mem always has a row here, start from it and let the other sources compete
  int64_t minKey = k.ts;
  if (asc) {  // let's find the minimum
    if (ik.ts < minKey) {
      minKey = ik.ts;
    }
    if (hasFileRow && key < minKey) {
      minKey = key;
    }
    if (hasLastRow && tsLast < minKey) {
      minKey = tsLast;
    }
  } else {  // let's find the maximum ts value
    if (ik.ts > minKey) {
      minKey = ik.ts;
    }
    if (hasFileRow && key > minKey) {
      minKey = key;
    }
    if (hasLastRow && tsLast > minKey) {
      minKey = tsLast;
    }
  }

  bool fromFile = hasFileRow && (minKey == key);
  bool fromLast = hasLastRow && (minKey == tsLast);
  bool fromIMem = (minKey == ik.ts);
  bool fromMem = (minKey == k.ts);
  bool init = false;

  if (asc) {
    if (fromFile) {
      init = true;
      TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
      tRowMergerInit(&merge, &fRow, pReader->pSchema);
      doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
    }

    if (fromLast) {
      TSDBROW fRow1 = tsdbRowFromBlockData(pLastBlockData, *pLastBlockReader->rowIndex);
      if (init) {
        tRowMerge(&merge, &fRow1);
      } else {
        init = true;
        tRowMergerInit(&merge, &fRow1, pReader->pSchema);
      }
      doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, &merge);
    }

    if (fromIMem) {
      if (init) {
        tRowMerge(&merge, piRow);
      } else {
        init = true;
        STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(piRow), pReader, pBlockScanInfo->uid);
        tRowMergerInit(&merge, piRow, pSchema);
      }
      doMergeRowsInBuf(&pBlockScanInfo->iiter, pBlockScanInfo->uid, ik.ts, pBlockScanInfo->delSkyline, &merge, pReader);
    }

    if (fromMem) {
      if (init) {
        tRowMerge(&merge, pRow);
      } else {
        init = true;
        STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid);
        tRowMergerInit(&merge, pRow, pSchema);
      }
      doMergeRowsInBuf(&pBlockScanInfo->iter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge, pReader);
    }
  } else {
    if (fromMem) {
      init = true;
      STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid);
      tRowMergerInit(&merge, pRow, pSchema);
      doMergeRowsInBuf(&pBlockScanInfo->iter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge, pReader);
    }

    if (fromIMem) {
      if (init) {
        tRowMerge(&merge, piRow);
      } else {
        init = true;
        STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(piRow), pReader, pBlockScanInfo->uid);
        tRowMergerInit(&merge, piRow, pSchema);
      }
      doMergeRowsInBuf(&pBlockScanInfo->iiter, pBlockScanInfo->uid, ik.ts, pBlockScanInfo->delSkyline, &merge, pReader);
    }

    if (fromLast) {
      TSDBROW fRow1 = tsdbRowFromBlockData(pLastBlockData, *pLastBlockReader->rowIndex);
      if (init) {
        tRowMerge(&merge, &fRow1);
      } else {
        init = true;
        tRowMergerInit(&merge, &fRow1, pReader->pSchema);
      }
      doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, &merge);
    }

    if (fromFile) {
      TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
      if (init) {
        tRowMerge(&merge, &fRow);
      } else {
        init = true;
        tRowMergerInit(&merge, &fRow, pReader->pSchema);
      }
      doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
    }
  }

  tRowMergerGetRow(&merge, &pTSRow);
  doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo->uid);

  taosMemoryFree(pTSRow);
  tRowMergerClear(&merge);
  return TSDB_CODE_SUCCESS;
}

1743
// NOTE(review): everything between this "#if 0" and its matching "#endif" is excluded from the
// build. It holds doMergeThreeLevelRows(), a mem + imem + file-block merge routine that takes no
// last-block reader; doMergeMultiLevelRowsRv() appears to be its replacement - confirm before
// deleting the disabled code.
#if 0
1744
static int32_t doMergeThreeLevelRows(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo, SBlockData* pBlockData) {
1745 1746 1747
  SRowMerger merge = {0};
  STSRow*    pTSRow = NULL;

1748
  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
dengyihao's avatar
dengyihao 已提交
1749
  SArray*             pDelList = pBlockScanInfo->delSkyline;
1750

1751 1752
  TSDBROW* pRow = getValidRow(&pBlockScanInfo->iter, pDelList, pReader);
  TSDBROW* piRow = getValidRow(&pBlockScanInfo->iiter, pDelList, pReader);
1753
  ASSERT(pRow != NULL && piRow != NULL);
H
Haojun Liao 已提交
1754

1755
  int64_t key = pBlockData->aTSKEY[pDumpInfo->rowIndex];
1756
  bool    freeTSRow = false;
H
Haojun Liao 已提交
1757

1758
  uint64_t uid = pBlockScanInfo->uid;
H
Haojun Liao 已提交
1759

1760 1761 1762
  TSDBKEY k = TSDBROW_KEY(pRow);
  TSDBKEY ik = TSDBROW_KEY(piRow);
  if (ASCENDING_TRAVERSE(pReader->order)) {
1763 1764
    // [1&2] key <= [k.ts && ik.ts]
    if (key <= k.ts && key <= ik.ts) {
1765 1766 1767
      TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
      tRowMergerInit(&merge, &fRow, pReader->pSchema);

1768
      doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
H
Haojun Liao 已提交
1769

1770 1771
      if (ik.ts == key) {
        tRowMerge(&merge, piRow);
1772
        doMergeRowsInBuf(&pBlockScanInfo->iiter, uid, key, pBlockScanInfo->delSkyline, &merge, pReader);
1773 1774
      }

1775 1776
      if (k.ts == key) {
        tRowMerge(&merge, pRow);
1777
        doMergeRowsInBuf(&pBlockScanInfo->iter, uid, key, pBlockScanInfo->delSkyline, &merge, pReader);
1778 1779 1780
      }

      tRowMergerGetRow(&merge, &pTSRow);
H
Haojun Liao 已提交
1781
      doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, uid);
1782
      return TSDB_CODE_SUCCESS;
1783
    } else {  // key > ik.ts || key > k.ts
1784 1785
      ASSERT(key != ik.ts);

1786
      // [3] ik.ts < key <= k.ts
1787
      // [4] ik.ts < k.ts <= key
1788
      if (ik.ts < k.ts) {
1789
        doMergeMemTableMultiRows(piRow, uid, &pBlockScanInfo->iiter, pDelList, &pTSRow, pReader, &freeTSRow);
H
Haojun Liao 已提交
1790
        doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, uid);
1791 1792 1793
        if (freeTSRow) {
          taosMemoryFree(pTSRow);
        }
1794 1795 1796
        return TSDB_CODE_SUCCESS;
      }

1797 1798
      // [5] k.ts < key   <= ik.ts
      // [6] k.ts < ik.ts <= key
1799
      if (k.ts < ik.ts) {
1800
        doMergeMemTableMultiRows(pRow, uid, &pBlockScanInfo->iter, pDelList, &pTSRow, pReader, &freeTSRow);
H
Haojun Liao 已提交
1801
        doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, uid);
1802 1803 1804
        if (freeTSRow) {
          taosMemoryFree(pTSRow);
        }
1805 1806 1807
        return TSDB_CODE_SUCCESS;
      }

1808
      // [7] k.ts == ik.ts < key
1809
      if (k.ts == ik.ts) {
1810 1811
        ASSERT(key > ik.ts && key > k.ts);

1812
        doMergeMemIMemRows(pRow, piRow, pBlockScanInfo, pReader, &pTSRow);
H
Haojun Liao 已提交
1813
        doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, uid);
1814
        taosMemoryFree(pTSRow);
1815 1816 1817
        return TSDB_CODE_SUCCESS;
      }
    }
1818 1819 1820
  } else {  // descending order scan
    // [1/2] k.ts >= ik.ts && k.ts >= key
    if (k.ts >= ik.ts && k.ts >= key) {
H
Haojun Liao 已提交
1821
      STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid);
1822

H
Haojun Liao 已提交
1823
      tRowMergerInit(&merge, pRow, pSchema);
1824
      doMergeRowsInBuf(&pBlockScanInfo->iter, uid, key, pBlockScanInfo->delSkyline, &merge, pReader);
1825 1826 1827

      if (ik.ts == k.ts) {
        tRowMerge(&merge, piRow);
1828
        doMergeRowsInBuf(&pBlockScanInfo->iiter, uid, key, pBlockScanInfo->delSkyline, &merge, pReader);
1829 1830 1831 1832 1833 1834 1835 1836 1837
      }

      if (k.ts == key) {
        TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
        tRowMerge(&merge, &fRow);
        doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
      }

      tRowMergerGetRow(&merge, &pTSRow);
H
Haojun Liao 已提交
1838
      doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, uid);
1839 1840
      return TSDB_CODE_SUCCESS;
    } else {
1841
      ASSERT(ik.ts != k.ts);  // this case has been included in the previous if branch
1842 1843 1844 1845

      // [3] ik.ts > k.ts >= Key
      // [4] ik.ts > key >= k.ts
      if (ik.ts > key) {
1846
        doMergeMemTableMultiRows(piRow, uid, &pBlockScanInfo->iiter, pDelList, &pTSRow, pReader, &freeTSRow);
H
Haojun Liao 已提交
1847
        doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, uid);
1848 1849 1850
        if (freeTSRow) {
          taosMemoryFree(pTSRow);
        }
1851 1852 1853 1854 1855 1856 1857 1858 1859 1860 1861
        return TSDB_CODE_SUCCESS;
      }

      // [5] key > ik.ts > k.ts
      // [6] key > k.ts > ik.ts
      if (key > ik.ts) {
        TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
        tRowMergerInit(&merge, &fRow, pReader->pSchema);

        doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
        tRowMergerGetRow(&merge, &pTSRow);
H
Haojun Liao 已提交
1862
        doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, uid);
1863
        taosMemoryFree(pTSRow);
1864 1865 1866 1867 1868
        return TSDB_CODE_SUCCESS;
      }

      //[7] key = ik.ts > k.ts
      if (key == ik.ts) {
1869
        doMergeMemTableMultiRows(piRow, uid, &pBlockScanInfo->iiter, pDelList, &pTSRow, pReader, &freeTSRow);
1870 1871 1872 1873 1874

        TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
        tRowMerge(&merge, &fRow);
        doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
        tRowMergerGetRow(&merge, &pTSRow);
H
Haojun Liao 已提交
1875
        doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, uid);
1876 1877

        taosMemoryFree(pTSRow);
1878 1879 1880 1881 1882 1883
        return TSDB_CODE_SUCCESS;
      }
    }
  }

  ASSERT(0);
S
Shengliang Guan 已提交
1884
  return -1;
1885
}
1886
#endif
1887

dengyihao's avatar
dengyihao 已提交
1888 1889
// Decide whether the file-block row at pDumpInfo->rowIndex qualifies for the current scan.
//
// A row qualifies when it belongs to the scanned table (checked only if the block carries a uid
// column), its version and timestamp fall inside the reader's ranges, and hasBeenDropped() does
// not report it as covered by the table's delete skyline.
//
// Returns true for a qualifying row, false otherwise.
static bool isValidFileBlockRow(SBlockData* pBlockData, SFileBlockDumpInfo* pDumpInfo,
                                STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader) {
  // a block with a uid column is a multi-table data block
  bool multiTable = (pBlockData->aUid != NULL);
  if (multiTable && pBlockData->aUid[pDumpInfo->rowIndex] != pBlockScanInfo->uid) {
    return false;  // row of another table
  }

  int64_t ver = pBlockData->aVersion[pDumpInfo->rowIndex];
  bool    verInRange = (ver >= pReader->verRange.minVer) && (ver <= pReader->verRange.maxVer);
  if (!verInRange) {
    return false;
  }

  int64_t ts = pBlockData->aTSKEY[pDumpInfo->rowIndex];
  bool    tsInRange = (ts >= pReader->window.skey) && (ts <= pReader->window.ekey);
  if (!tsInRange) {
    return false;
  }

  // the delete check runs last, and only for rows that passed every range check
  TSDBKEY k = {.ts = ts, .version = ver};
  return !hasBeenDropped(pBlockScanInfo->delSkyline, &pBlockScanInfo->fileDelIndex, &k, pReader->order);
}

1917
// Returns true when ts lies outside the closed interval [pWindow->skey, pWindow->ekey].
static bool outOfTimeWindow(int64_t ts, STimeWindow* pWindow) {
  bool inside = (ts >= pWindow->skey) && (ts <= pWindow->ekey);
  return !inside;
}
1918

1919
// Bind the last-block reader to table `uid` and to the caller-owned row cursor `startPos`.
//
// The reader keeps the pointer itself, so later cursor moves are written through to *startPos.
// A cursor still holding -1 is left untouched for an ascending scan and is set to the number of
// rows in the last block for a descending scan.
static void initLastBlockReader(SLastBlockReader* pLastBlockReader, uint64_t uid, int16_t* startPos) {
  pLastBlockReader->rowIndex = startPos;
  pLastBlockReader->uid = uid;

  bool unset = (*startPos == -1);
  if (unset && !ASCENDING_TRAVERSE(pLastBlockReader->order)) {
    *startPos = pLastBlockReader->lastBlockData.nRow;
  }
}

// Mark the last block as fully consumed by storing ALL_ROWS_CHECKED_INDEX in the row cursor.
static void setAllRowsChecked(SLastBlockReader* pLastBlockReader) {
  int16_t* pCursor = pLastBlockReader->rowIndex;
  *pCursor = ALL_ROWS_CHECKED_INDEX;
}

H
Haojun Liao 已提交
1936
// Move the last-block cursor to the next qualifying row in scan direction.
//
// A row qualifies when it belongs to the reader's table (checked only if the block carries a uid
// column), its timestamp and version lie inside the reader's window and version range, and
// hasBeenDropped() does not report it as deleted.
//
// Returns true with the cursor on the qualifying row. Returns false when the cursor was already
// marked as exhausted, or when no further row qualifies; in the latter case the cursor is set to
// ALL_ROWS_CHECKED_INDEX.
static bool nextRowInLastBlock(SLastBlockReader *pLastBlockReader, STableBlockScanInfo* pBlockScanInfo) {
  if (*pLastBlockReader->rowIndex == ALL_ROWS_CHECKED_INDEX) {
    return false;
  }

  int32_t     step = (pLastBlockReader->order == TSDB_ORDER_ASC) ? 1 : -1;
  SBlockData* pBlockData = &pLastBlockReader->lastBlockData;

  // leave the current row before starting the search
  *(pLastBlockReader->rowIndex) += step;

  int32_t i = *(pLastBlockReader->rowIndex);
  while (i >= 0 && i < pBlockData->nRow) {
    bool sameTable = (pBlockData->aUid == NULL) || (pBlockData->aUid[i] == pLastBlockReader->uid);
    if (sameTable) {
      int64_t ts = pBlockData->aTSKEY[i];
      int64_t ver = pBlockData->aVersion[i];

      // todo opt handle desc case
      bool inWindow = (ts >= pLastBlockReader->window.skey) && (ts <= pLastBlockReader->window.ekey);
      bool inVerRange = (ver >= pLastBlockReader->verRange.minVer) && (ver <= pLastBlockReader->verRange.maxVer);

      if (inWindow && inVerRange) {
        // the delete check is reached only by rows that passed every range check
        TSDBKEY k = {.ts = ts, .version = ver};
        if (!hasBeenDropped(pBlockScanInfo->delSkyline, &pBlockScanInfo->lastBlockDelIndex, &k,
                            pLastBlockReader->order)) {
          *(pLastBlockReader->rowIndex) = i;
          return true;
        }
      }
    }

    i += step;
  }

  // all data in the last block is consumed
  *pLastBlockReader->rowIndex = ALL_ROWS_CHECKED_INDEX;
  return false;
}

// Returns the timestamp of the last-block row the cursor points at. The cursor is used as an
// index without validation, so it must reference an existing row.
static int64_t getCurrentKeyInLastBlock(SLastBlockReader* pLastBlockReader) {
  int16_t row = *pLastBlockReader->rowIndex;
  return pLastBlockReader->lastBlockData.aTSKEY[row];
}

// Returns false once the last-block cursor has been marked ALL_ROWS_CHECKED_INDEX, true
// otherwise. In the true case the last block is asserted to hold at least one row.
static bool hasDataInLastBlock(SLastBlockReader* pLastBlockReader) {
  bool exhausted = (*pLastBlockReader->rowIndex == ALL_ROWS_CHECKED_INDEX);
  if (!exhausted) {
    ASSERT(pLastBlockReader->lastBlockData.nRow > 0);
  }

  return !exhausted;
}

1998 1999 2000 2001 2002 2003 2004 2005 2006 2007 2008 2009 2010 2011 2012 2013 2014 2015 2016 2017 2018 2019 2020 2021
// Emit one result row from the file data block only.
//
// pBlockData      the loaded file data block
// pBlockScanInfo  per-table scan state
// key             timestamp of the current file-block row
// pReader         reader state (dump cursor, schema, result block)
//
// The row is copied directly when tryCopyDistinctRowFromFileBlock() succeeds; otherwise the
// rows sharing the timestamp are merged through an SRowMerger and the result is appended.
//
// Always returns TSDB_CODE_SUCCESS.
int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pBlockScanInfo, int64_t key, STsdbReader* pReader) {
  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;

  if (tryCopyDistinctRowFromFileBlock(pReader, pBlockData, key, pDumpInfo)) {
    return TSDB_CODE_SUCCESS;
  }

  // the fast path left the cursor untouched, so this still addresses the current row
  TSDBROW    fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
  SRowMerger merge = {0};
  STSRow*    pTSRow = NULL;

  tRowMergerInit(&merge, &fRow, pReader->pSchema);
  doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
  tRowMergerGetRow(&merge, &pTSRow);
  doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo->uid);

  taosMemoryFree(pTSRow);
  tRowMergerClear(&merge);
  return TSDB_CODE_SUCCESS;
}

H
Haojun Liao 已提交
2022 2023
// Produce one result row by dispatching to the merge routine that fits the sources currently
// holding data:
//   mem and imem both valid -> doMergeMultiLevelRowsRv()
//   imem only               -> doMergeBufAndFileRows_Rv() with the imem row
//   mem only                -> doMergeBufAndFileRows_Rv() with the mem row
//   neither                 -> mergeFileBlockAndLastBlock()
//
// The file-block key handed on is INT64_MIN when the file data block has no rows.
// Returns the status of the routine that was called.
static int32_t buildComposedDataBlockImpl(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo,
                                          SBlockData* pBlockData, SLastBlockReader* pLastBlockReader) {
  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;

  int64_t key = INT64_MIN;
  if (pBlockData->nRow > 0) {
    key = pBlockData->aTSKEY[pDumpInfo->rowIndex];
  }

  // hasVal is read only after both iterators went through getValidRow()
  TSDBROW* pRow = getValidRow(&pBlockScanInfo->iter, pBlockScanInfo->delSkyline, pReader);
  TSDBROW* piRow = getValidRow(&pBlockScanInfo->iiter, pBlockScanInfo->delSkyline, pReader);

  bool hasMem = pBlockScanInfo->iter.hasVal;
  bool hasIMem = pBlockScanInfo->iiter.hasVal;

  if (hasMem && hasIMem) {
    return doMergeMultiLevelRowsRv(pReader, pBlockScanInfo, pBlockData, pLastBlockReader);
  }

  // imem + file + last block
  if (hasIMem) {
    return doMergeBufAndFileRows_Rv(pReader, pBlockScanInfo, piRow, &pBlockScanInfo->iiter, key, pLastBlockReader);
  }

  // mem + file + last block
  if (hasMem) {
    return doMergeBufAndFileRows_Rv(pReader, pBlockScanInfo, pRow, &pBlockScanInfo->iter, key, pLastBlockReader);
  }

  // files data blocks + last block
  return mergeFileBlockAndLastBlock(pReader, pLastBlockReader, key, pBlockScanInfo, pBlockData);
}

2048
// Fill pReader->pResBlock with rows composed from the file data block, the last block and the
// in-memory buffers of one table.
//
// The table is the one owning the current file block when the block iterator has a current
// block, otherwise pReader->status.pTableIter. Rows are produced one at a time by
// buildComposedDataBlockImpl() until
//   - neither the file block nor the last block has a row left,
//   - the loaded file block is consumed, or
//   - the result block reached pReader->capacity rows.
// Afterwards the result block gets its uid and time window, and the reader is flagged as holding
// a composed block.
//
// Always returns TSDB_CODE_SUCCESS.
static int32_t buildComposedDataBlock(STsdbReader* pReader) {
  SSDataBlock*        pResBlock = pReader->pResBlock;
  SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(&pReader->status.blockIter);

  STableBlockScanInfo* pBlockScanInfo = pReader->status.pTableIter;
  if (pBlockInfo != NULL) {
    pBlockScanInfo = taosHashGet(pReader->status.pTableMap, &pBlockInfo->uid, sizeof(pBlockInfo->uid));
  }

  SLastBlockReader*   pLastBlockReader = pReader->status.fileIter.pLastBlockReader;
  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
  SBlockData*         pBlockData = &pReader->status.fileBlockData;
  int32_t             step = ASCENDING_TRAVERSE(pReader->order) ? 1 : -1;

  int64_t st = taosGetTimestampUs();

  for (;;) {
    // todo check the validate of row in file block
    bool hasBlockData = false;

    // find the first qualified row in data block
    while (pBlockData->nRow > 0) {
      if (isValidFileBlockRow(pBlockData, pDumpInfo, pBlockScanInfo, pReader)) {
        hasBlockData = true;
        break;
      }

      pDumpInfo->rowIndex += step;

      SBlock* pBlock = getCurrentBlock(&pReader->status.blockIter);
      if (pDumpInfo->rowIndex < 0 || pDumpInfo->rowIndex >= pBlock->nRow) {
        setBlockAllDumped(pDumpInfo, pBlock->maxKey.ts, pReader->order);
        break;
      }
    }

    bool hasBlockLData = hasDataInLastBlock(pLastBlockReader);

    // no data in last block and block, no need to proceed.
    if (!hasBlockData && !hasBlockLData) {
      break;
    }

    // NOTE(review): if the scan above ran off the file block while the last block still has
    // rows, the call below is entered with pBlockData->nRow > 0 and the cursor possibly outside
    // the block - confirm that setBlockAllDumped() leaves the cursor in a state the callee
    // tolerates.
    buildComposedDataBlockImpl(pReader, pBlockScanInfo, pBlockData, pLastBlockReader);

    // currently loaded file data block is consumed
    bool cursorOutside = (pDumpInfo->rowIndex >= pBlockData->nRow) || (pDumpInfo->rowIndex < 0);
    if ((pBlockData->nRow > 0) && cursorOutside) {
      SBlock* pBlock = getCurrentBlock(&pReader->status.blockIter);
      setBlockAllDumped(pDumpInfo, pBlock->maxKey.ts, pReader->order);
      break;
    }

    if (pResBlock->info.rows >= pReader->capacity) {
      break;
    }
  }

  pResBlock->info.uid = pBlockScanInfo->uid;
  blockDataUpdateTsWindow(pResBlock, 0);

  setComposedBlockFlag(pReader, true);
  int64_t et = taosGetTimestampUs();

  tsdbDebug("%p uid:%" PRIu64 ", composed data block created, brange:%" PRIu64 "-%" PRIu64 " rows:%d, elapsed time:%.2f ms %s",
            pReader, pBlockScanInfo->uid, pResBlock->info.window.skey, pResBlock->info.window.ekey,
            pResBlock->info.rows, (et - st) / 1000.0, pReader->idStr);

  return TSDB_CODE_SUCCESS;
}

// Record in the reader status whether the current result block is a composed data block.
void setComposedBlockFlag(STsdbReader* pReader, bool composed) {
  pReader->status.composedDataBlock = composed;
}

2123
/*
 * Lazily create the iterators over the mem and imem data of one table and build the
 * table's delete skyline. Does nothing when pBlockScanInfo->iterInit is already set.
 *
 * Ascending scans start at (window.skey, verRange.minVer); descending scans start at
 * (window.ekey, verRange.maxVer) and iterate backward.
 *
 * Returns TSDB_CODE_SUCCESS, the error reported by tsdbTbDataIterCreate(), or the error
 * reported by initDelSkylineIterator().
 */
static int32_t initMemDataIterator(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader) {
  if (pBlockScanInfo->iterInit) {
    return TSDB_CODE_SUCCESS;
  }

  int32_t code = TSDB_CODE_SUCCESS;

  TSDBKEY startKey = {0};
  if (ASCENDING_TRAVERSE(pReader->order)) {
    startKey = (TSDBKEY){.ts = pReader->window.skey, .version = pReader->verRange.minVer};
  } else {
    startKey = (TSDBKEY){.ts = pReader->window.ekey, .version = pReader->verRange.maxVer};
  }

  int32_t backward = (!ASCENDING_TRAVERSE(pReader->order));

  // iterator over the mem table of the read snapshot
  STbData* d = NULL;
  if (pReader->pReadSnap->pMem != NULL) {
    d = tsdbGetTbDataFromMemTable(pReader->pReadSnap->pMem, pReader->suid, pBlockScanInfo->uid);
    if (d != NULL) {
      code = tsdbTbDataIterCreate(d, &startKey, backward, &pBlockScanInfo->iter.iter);
      if (code == TSDB_CODE_SUCCESS) {
        pBlockScanInfo->iter.hasVal = (tsdbTbDataIterGet(pBlockScanInfo->iter.iter) != NULL);

        tsdbDebug("%p uid:%" PRId64 ", check data in mem from skey:%" PRId64 ", order:%d, ts range in buf:%" PRId64
                  "-%" PRId64 " %s",
                  pReader, pBlockScanInfo->uid, startKey.ts, pReader->order, d->minKey, d->maxKey, pReader->idStr);
      } else {
        // this branch handles the mem iterator, so the message must say "mem"
        tsdbError("%p uid:%" PRId64 ", failed to create iterator for mem, code:%s, %s", pReader, pBlockScanInfo->uid,
                  tstrerror(code), pReader->idStr);
        return code;
      }
    }
  } else {
    tsdbDebug("%p uid:%" PRId64 ", no data in mem, %s", pReader, pBlockScanInfo->uid, pReader->idStr);
  }

  // iterator over the imem table of the read snapshot
  STbData* di = NULL;
  if (pReader->pReadSnap->pIMem != NULL) {
    di = tsdbGetTbDataFromMemTable(pReader->pReadSnap->pIMem, pReader->suid, pBlockScanInfo->uid);
    if (di != NULL) {
      code = tsdbTbDataIterCreate(di, &startKey, backward, &pBlockScanInfo->iiter.iter);
      if (code == TSDB_CODE_SUCCESS) {
        pBlockScanInfo->iiter.hasVal = (tsdbTbDataIterGet(pBlockScanInfo->iiter.iter) != NULL);

        tsdbDebug("%p uid:%" PRId64 ", check data in imem from skey:%" PRId64 ", order:%d, ts range in buf:%" PRId64
                  "-%" PRId64 " %s",
                  pReader, pBlockScanInfo->uid, startKey.ts, pReader->order, di->minKey, di->maxKey, pReader->idStr);
      } else {
        // this branch handles the imem iterator, so the message must say "imem"
        tsdbError("%p uid:%" PRId64 ", failed to create iterator for imem, code:%s, %s", pReader, pBlockScanInfo->uid,
                  tstrerror(code), pReader->idStr);
        return code;
      }
    }
  } else {
    tsdbDebug("%p uid:%" PRId64 ", no data in imem, %s", pReader, pBlockScanInfo->uid, pReader->idStr);
  }

  code = initDelSkylineIterator(pBlockScanInfo, pReader, d, di);

  // The iterators exist at this point; mark them initialized even when the skyline could
  // not be built so that a later call does not create (and overwrite) them a second time.
  pBlockScanInfo->iterInit = true;

  if (code != TSDB_CODE_SUCCESS) {
    tsdbError("%p uid:%" PRId64 ", failed to build delete skyline, code:%s, %s", pReader, pBlockScanInfo->uid,
              tstrerror(code), pReader->idStr);
  }

  return code;
}

dengyihao's avatar
dengyihao 已提交
2187 2188
/*
 * Build the delete skyline of one table and reset every delete-index cursor of the scan
 * info. Returns immediately when pBlockScanInfo->delSkyline already exists.
 *
 * Delete records are collected from three places:
 *   1. the delete file of the read snapshot (if there is one),
 *   2. the delete list of the mem table data (pMemTbData, may be NULL),
 *   3. the delete list of the imem table data (piMemTbData, may be NULL).
 *
 * When at least one record was collected, pBlockScanInfo->delSkyline is allocated and
 * filled by tsdbBuildDeleteSkyline(); otherwise it stays NULL.
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_OUT_OF_MEMORY when an array cannot be allocated or
 * grown, or the error reported by the delete-file reader / skyline builder.
 */
int32_t initDelSkylineIterator(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader, STbData* pMemTbData,
                               STbData* piMemTbData) {
  if (pBlockScanInfo->delSkyline != NULL) {
    return TSDB_CODE_SUCCESS;
  }

  int32_t  code = TSDB_CODE_SUCCESS;
  STsdb*   pTsdb = pReader->pTsdb;
  STbData* aTbData[2] = {pMemTbData, piMemTbData};

  SArray* pDelData = taosArrayInit(4, sizeof(SDelData));
  if (pDelData == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  // 1. delete records stored in the delete file
  SDelFile* pDelFile = pReader->pReadSnap->fs.pDelFile;
  if (pDelFile) {
    SDelFReader* pDelFReader = NULL;
    code = tsdbDelFReaderOpen(&pDelFReader, pDelFile, pTsdb);
    if (code != TSDB_CODE_SUCCESS) {
      goto _err;
    }

    SArray* aDelIdx = taosArrayInit(4, sizeof(SDelIdx));
    if (aDelIdx == NULL) {
      // previously `code` was still 0 here, so the failure was reported as success
      code = TSDB_CODE_OUT_OF_MEMORY;
      tsdbDelFReaderClose(&pDelFReader);
      goto _err;
    }

    code = tsdbReadDelIdx(pDelFReader, aDelIdx);
    if (code == TSDB_CODE_SUCCESS) {
      SDelIdx  idx = {.suid = pReader->suid, .uid = pBlockScanInfo->uid};
      SDelIdx* pIdx = taosArraySearch(aDelIdx, &idx, tCmprDelIdx, TD_EQ);

      if (pIdx != NULL) {
        code = tsdbReadDelData(pDelFReader, pIdx, pDelData);
      }
    }

    taosArrayDestroy(aDelIdx);
    tsdbDelFReaderClose(&pDelFReader);

    if (code != TSDB_CODE_SUCCESS) {
      goto _err;
    }
  }

  // 2. + 3. delete records kept in the mem and imem table data
  for (int32_t i = 0; i < 2; ++i) {
    if (aTbData[i] == NULL) {
      continue;
    }

    for (SDelData* p = aTbData[i]->pHead; p != NULL; p = p->pNext) {
      if (taosArrayPush(pDelData, p) == NULL) {
        code = TSDB_CODE_OUT_OF_MEMORY;
        goto _err;
      }
    }
  }

  if (taosArrayGetSize(pDelData) > 0) {
    pBlockScanInfo->delSkyline = taosArrayInit(4, sizeof(TSDBKEY));
    if (pBlockScanInfo->delSkyline == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto _err;
    }

    code = tsdbBuildDeleteSkyline(pDelData, 0, (int32_t)(taosArrayGetSize(pDelData) - 1), pBlockScanInfo->delSkyline);
  }

  taosArrayDestroy(pDelData);

  // Ascending scans start at the first skyline entry, descending scans at the last one.
  // With no skyline the descending start index is -1, exactly as before.
  pBlockScanInfo->iter.index =
      ASCENDING_TRAVERSE(pReader->order) ? 0 : (int32_t)taosArrayGetSize(pBlockScanInfo->delSkyline) - 1;
  pBlockScanInfo->iiter.index = pBlockScanInfo->iter.index;
  pBlockScanInfo->fileDelIndex = pBlockScanInfo->iter.index;
  pBlockScanInfo->lastBlockDelIndex = pBlockScanInfo->iter.index;
  return code;

_err:
  taosArrayDestroy(pDelData);
  return code;
}

2269
/*
 * Return the key of the next valid row held in the in-memory buffers (mem and imem) of a
 * table, following the scan order of the reader:
 *   - ascending scan : the smaller of the two current keys,
 *   - descending scan: the larger of the two current keys.
 * When only one buffer has a valid row its key is returned; when neither has one the
 * returned key has ts == TSKEY_INITIAL_VAL.
 *
 * The memory iterators are initialized on demand. The result of that initialization is
 * not reported because this function can only return a key.
 */
static TSDBKEY getCurrentKeyInBuf(STableBlockScanInfo* pScanInfo, STsdbReader* pReader) {
  TSDBKEY key = {.ts = TSKEY_INITIAL_VAL};
  bool    hasKey = false;

  initMemDataIterator(pScanInfo, pReader);

  TSDBROW* pRow = getValidRow(&pScanInfo->iter, pScanInfo->delSkyline, pReader);
  if (pRow != NULL) {
    key = TSDBROW_KEY(pRow);
    hasKey = true;
  }

  pRow = getValidRow(&pScanInfo->iiter, pScanInfo->delSkyline, pReader);
  if (pRow != NULL) {
    TSDBKEY k = TSDBROW_KEY(pRow);

    // Do not compare against the initial value: if mem has no row, the imem key wins.
    bool imemFirst = ASCENDING_TRAVERSE(pReader->order) ? (k.ts < key.ts) : (k.ts > key.ts);
    if (!hasKey || imemFirst) {
      key = k;
    }
  }

  return key;
}

2289
/*
 * Advance the file set iterator until a file set is found that holds at least one data
 * block or last block for the tables being queried.
 *
 * On return pBlockNum holds the number of qualified data blocks / last blocks of that
 * file set (both 0 when every file set has been visited), and the last-block list of the
 * last block reader has been replaced by the qualified last blocks.
 *
 * Returns TSDB_CODE_SUCCESS or the first error reported while loading block indexes,
 * last blocks or file blocks.
 */
static int32_t moveToNextFile(STsdbReader* pReader, SBlockNumber* pBlockNum) {
  SReaderStatus* pStatus = &pReader->status;
  int32_t        code = TSDB_CODE_SUCCESS;

  pBlockNum->numOfBlocks = 0;
  pBlockNum->numOfLastBlocks = 0;

  size_t  numOfTables = taosHashGetSize(pReader->status.pTableMap);
  SArray* pIndexList = taosArrayInit(numOfTables, sizeof(SBlockIdx));
  SArray* pLastBlocks = pStatus->fileIter.pLastBlockReader->pBlockL;
  taosArrayClear(pLastBlocks);

  // the loop ends when there are no more data files on disk, on error, or on the first hit
  while (filesetIteratorNext(&pStatus->fileIter, pReader)) {
    taosArrayClear(pIndexList);

    code = doLoadBlockIndex(pReader, pReader->pFileReader, pIndexList);
    if (code != TSDB_CODE_SUCCESS) {
      break;
    }

    code = tsdbReadBlockL(pReader->pFileReader, pLastBlocks);
    if (code != TSDB_CODE_SUCCESS) {
      break;
    }

    // no blocks in current file, try next files
    if (taosArrayGetSize(pIndexList) == 0 && taosArrayGetSize(pLastBlocks) == 0) {
      continue;
    }

    SArray* pQLastBlock = taosArrayInit(4, sizeof(SBlockL));
    bool    found = false;

    code = doLoadFileBlock(pReader, pIndexList, pLastBlocks, pBlockNum, pQLastBlock);
    if (code == TSDB_CODE_SUCCESS && (pBlockNum->numOfBlocks + pBlockNum->numOfLastBlocks > 0)) {
      ASSERT(taosArrayGetSize(pQLastBlock) == pBlockNum->numOfLastBlocks);
      taosArrayClear(pLastBlocks);
      taosArrayAddAll(pLastBlocks, pQLastBlock);
      found = true;
    }

    taosArrayDestroy(pQLastBlock);

    if (code != TSDB_CODE_SUCCESS || found) {
      break;
    }
  }

  taosArrayDestroy(pIndexList);
  return code;
}

2347
/*
 * Make sure the last block that may contain rows of the table described by
 * pBlockScanInfo is loaded into pLastBlockReader->lastBlockData.
 *
 * - The memory iterators (and with them the delete skyline) of the table are initialized
 *   on demand first.
 * - The first last block whose [minUid, maxUid] range contains the table uid is selected.
 * - If no block qualifies, currentBlockIndex is set to -1 and lastBlockData is reset.
 * - If the selected block is the one already loaded, nothing is read.
 *
 * Returns TSDB_CODE_SUCCESS, or the error reported by initMemDataIterator(),
 * tBlockDataInit() or tsdbReadLastBlock(). After a failed read the reader is marked as
 * holding no block (currentBlockIndex == -1).
 */
static int32_t doLoadRelatedLastBlock(SLastBlockReader* pLastBlockReader, STableBlockScanInfo *pBlockScanInfo, STsdbReader* pReader) {
  SArray*  pBlocks = pLastBlockReader->pBlockL;
  SBlockL* pBlock = NULL;

  uint64_t uid = pBlockScanInfo->uid;
  int32_t  totalLastBlocks = (int32_t)taosArrayGetSize(pBlocks);

  int32_t code = initMemDataIterator(pBlockScanInfo, pReader);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  // find the correct SBlockL. todo binary search
  int32_t index = -1;
  for (int32_t i = 0; i < totalLastBlocks; ++i) {
    SBlockL* p = taosArrayGet(pBlocks, i);
    if (p->minUid <= uid && p->maxUid >= uid) {
      index = i;
      pBlock = p;
      break;
    }
  }

  if (index == -1) {
    pLastBlockReader->currentBlockIndex = index;
    tBlockDataReset(&pLastBlockReader->lastBlockData);
    return TSDB_CODE_SUCCESS;
  }

  // the required last datablock has already loaded
  if (index == pLastBlockReader->currentBlockIndex) {
    return TSDB_CODE_SUCCESS;
  }

  int64_t st = taosGetTimestampUs();
  code = tBlockDataInit(&pLastBlockReader->lastBlockData, pReader->suid, pReader->suid ? 0 : uid, pReader->pSchema);
  if (code != TSDB_CODE_SUCCESS) {
    tsdbError("%p init block data failed, code:%s %s", pReader, tstrerror(code), pReader->idStr);
    return code;
  }

  code = tsdbReadLastBlock(pReader->pFileReader, pBlock, &pLastBlockReader->lastBlockData);

  double el = (taosGetTimestampUs() - st) / 1000.0;
  if (code != TSDB_CODE_SUCCESS) {
    // report the index that failed to load, not the previously loaded one
    tsdbError("%p error occurs in loading last block into buffer, last block index:%d, total:%d code:%s %s", pReader,
              index, totalLastBlocks, tstrerror(code), pReader->idStr);

    // lastBlockData was re-initialized above, so the previously loaded block is gone
    pLastBlockReader->currentBlockIndex = -1;
    return code;
  }

  // NOTE(review): minVer/maxVer are printed with %d; confirm their declared width.
  tsdbDebug("%p load last block completed, uid:%" PRIu64
            " last block index:%d, total:%d rows:%d, minVer:%d, maxVer:%d, brange:%" PRId64 "-%" PRId64
            " elapsed time:%.2f ms, %s",
            pReader, uid, index, totalLastBlocks, pBlock->nRow, pBlock->minVer, pBlock->maxVer, pBlock->minKey,
            pBlock->maxKey, el, pReader->idStr);

  pLastBlockReader->currentBlockIndex = index;
  pReader->cost.lastBlockLoad += 1;
  pReader->cost.lastBlockLoadTime += el;

  return TSDB_CODE_SUCCESS;
}

/*
 * Walk the table map, starting at pStatus->pTableIter (or at the first table when it is
 * NULL), and build a result block from the first table that still has rows in the last
 * blocks of the current file set.
 *
 * Returns TSDB_CODE_SUCCESS either with rows in pReader->pResBlock, or with
 * pStatus->pTableIter == NULL once every table has been visited. Errors from loading the
 * last block or building the data block are returned unchanged.
 */
static int32_t doLoadLastBlockSequentially(STsdbReader* pReader) {
  SReaderStatus*    pStatus = &pReader->status;
  SLastBlockReader* pLastBlockReader = pStatus->fileIter.pLastBlockReader;

  for (;;) {
    if (pStatus->pTableIter == NULL) {
      pStatus->pTableIter = taosHashIterate(pStatus->pTableMap, NULL);
      if (pStatus->pTableIter == NULL) {
        return TSDB_CODE_SUCCESS;
      }
    }

    // load the last data block of current table
    // todo opt perf by avoiding load last block repeatedly
    STableBlockScanInfo* pScanInfo = pStatus->pTableIter;
    int32_t              code = doLoadRelatedLastBlock(pLastBlockReader, pScanInfo, pReader);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    // does the current table have a row to read in the loaded last block?
    bool hasRows = false;
    if (pLastBlockReader->currentBlockIndex != -1) {
      initLastBlockReader(pLastBlockReader, pScanInfo->uid, &pScanInfo->indexInBlockL);

      int32_t index = pScanInfo->indexInBlockL;
      if (index == DEFAULT_ROW_INDEX_VAL || index == pLastBlockReader->lastBlockData.nRow) {
        // the row cursor is not positioned on a row yet, move it to the next one
        hasRows = nextRowInLastBlock(pLastBlockReader, pScanInfo);
      } else {
        hasRows = true;
      }
    }

    if (hasRows) {
      code = doBuildDataBlock(pReader);
      if (code != TSDB_CODE_SUCCESS) {
        return code;
      }

      if (pReader->pResBlock->info.rows > 0) {
        return TSDB_CODE_SUCCESS;
      }
    }

    // nothing (more) to return for the current table, let's try next table
    pStatus->pTableIter = taosHashIterate(pStatus->pTableMap, pStatus->pTableIter);
    if (pStatus->pTableIter == NULL) {
      return TSDB_CODE_SUCCESS;
    }
  }
}

2464
/*
 * Build the next result block for the table the reader is positioned on.
 *
 * The table is the owner of the current file block when the block iterator has one,
 * otherwise the table referenced by status.pTableIter. After the related last block has
 * been loaded and its row cursor positioned, one of these paths is taken:
 *   1. no current file block           -> composed block (last block / buffer data),
 *   2. fileBlockShouldLoad() is true   -> load the file block, then composed block,
 *   3. buffer key lies in the gap before the file block -> block built from the buffer,
 *   4. descending scan with last-block data -> composed block without file block data,
 *   5. otherwise -> only the block info is filled in and the block is marked as dumped.
 */
static int32_t doBuildDataBlock(STsdbReader* pReader) {
  SReaderStatus*      pStatus = &pReader->status;
  SDataBlockIter*     pBlockIter = &pStatus->blockIter;
  SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(pBlockIter);
  SLastBlockReader*   pLastBlockReader = pReader->status.fileIter.pLastBlockReader;

  SBlock*              pBlock = NULL;
  STableBlockScanInfo* pScanInfo = NULL;

  if (pBlockInfo == NULL) {
    pScanInfo = pReader->status.pTableIter;
  } else {
    pScanInfo = taosHashGet(pReader->status.pTableMap, &pBlockInfo->uid, sizeof(pBlockInfo->uid));
    pBlock = getCurrentBlock(pBlockIter);
  }

  TSDBKEY key = getCurrentKeyInBuf(pScanInfo, pReader);

  // load the last data block of current table
  int32_t code = doLoadRelatedLastBlock(pLastBlockReader, pScanInfo, pReader);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  // note: the lastblock may be null here
  initLastBlockReader(pLastBlockReader, pScanInfo->uid, &pScanInfo->indexInBlockL);
  if (pScanInfo->indexInBlockL == DEFAULT_ROW_INDEX_VAL ||
      pScanInfo->indexInBlockL == pLastBlockReader->lastBlockData.nRow) {
    // only the cursor movement matters here, the result is not needed
    (void)nextRowInLastBlock(pLastBlockReader, pScanInfo);
  }

  // 1. build data block from last data file
  if (pBlockInfo == NULL) {
    ASSERT(pBlockIter->numOfBlocks == 0);
    return buildComposedDataBlock(pReader);
  }

  // 2. the file block has to be loaded and merged
  if (fileBlockShouldLoad(pReader, pBlockInfo, pBlock, pScanInfo, key, pLastBlockReader)) {
    tBlockDataReset(&pStatus->fileBlockData);

    code = tBlockDataInit(&pStatus->fileBlockData, pReader->suid, pScanInfo->uid, pReader->pSchema);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    code = doLoadFileBlockData(pReader, pBlockIter, &pStatus->fileBlockData);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    return buildComposedDataBlock(pReader);
  }

  // 3. data in memory that are earlier than current file block
  if (bufferDataInFileBlockGap(pReader->order, key, pBlock)) {
    // todo rows in buffer should be less than the file block in asc, greater than file block in desc
    int64_t endKey = (ASCENDING_TRAVERSE(pReader->order)) ? pBlock->minKey.ts : pBlock->maxKey.ts;
    return buildDataBlockFromBuf(pReader, pScanInfo, endKey);
  }

  // 4. only return the rows in last block
  if (hasDataInLastBlock(pLastBlockReader) && !ASCENDING_TRAVERSE(pReader->order)) {
    int64_t tsLast = getCurrentKeyInLastBlock(pLastBlockReader);
    ASSERT (tsLast >= pBlock->maxKey.ts);
    tBlockDataReset(&pReader->status.fileBlockData);

    return buildComposedDataBlock(pReader);
  }

  // 5. whole block is required, return it directly
  SDataBlockInfo* pInfo = &pReader->pResBlock->info;
  pInfo->rows = pBlock->nRow;
  pInfo->uid = pScanInfo->uid;
  pInfo->window = (STimeWindow){.skey = pBlock->minKey.ts, .ekey = pBlock->maxKey.ts};
  setComposedBlockFlag(pReader, false);
  setBlockAllDumped(&pStatus->fBlockDumpInfo, pBlock->maxKey.ts, pReader->order);

  return TSDB_CODE_SUCCESS;
}

H
Haojun Liao 已提交
2544
/*
 * Build the next result block from the in-memory buffers only, visiting the tables of
 * the table map one after another starting at pStatus->pTableIter.
 *
 * Returns TSDB_CODE_SUCCESS either with rows in pReader->pResBlock, or with
 * pStatus->pTableIter == NULL once every table has been exhausted. A failure to
 * initialize the memory iterators or to build the block is returned to the caller.
 */
static int32_t buildBlockFromBufferSequentially(STsdbReader* pReader) {
  SReaderStatus* pStatus = &pReader->status;

  while (1) {
    if (pStatus->pTableIter == NULL) {
      pStatus->pTableIter = taosHashIterate(pStatus->pTableMap, NULL);
      if (pStatus->pTableIter == NULL) {
        return TSDB_CODE_SUCCESS;
      }
    }

    STableBlockScanInfo* pBlockScanInfo = pStatus->pTableIter;

    // do not read from iterators that could not be created
    int32_t code = initMemDataIterator(pBlockScanInfo, pReader);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    // no upper/lower bound: drain everything the buffers hold for this table
    int64_t endKey = (ASCENDING_TRAVERSE(pReader->order)) ? INT64_MAX : INT64_MIN;
    code = buildDataBlockFromBuf(pReader, pBlockScanInfo, endKey);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    if (pReader->pResBlock->info.rows > 0) {
      return TSDB_CODE_SUCCESS;
    }

    // current table is exhausted, let's try the next table
    pStatus->pTableIter = taosHashIterate(pStatus->pTableMap, pStatus->pTableIter);
    if (pStatus->pTableIter == NULL) {
      return TSDB_CODE_SUCCESS;
    }
  }
}

2576
// set the correct start position in case of the first/last file block, according to the query time window
2577
static void initBlockDumpInfo(STsdbReader* pReader, SDataBlockIter* pBlockIter) {
2578
  SBlock* pBlock = getCurrentBlock(pBlockIter);
2579

2580 2581 2582
  SReaderStatus* pStatus = &pReader->status;

  SFileBlockDumpInfo* pDumpInfo = &pStatus->fBlockDumpInfo;
2583 2584 2585

  pDumpInfo->totalRows = pBlock->nRow;
  pDumpInfo->allDumped = false;
2586
  pDumpInfo->rowIndex = ASCENDING_TRAVERSE(pReader->order) ? 0 : pBlock->nRow - 1;
2587 2588
}

2589
/*
 * Move to the next file set that has data for the query and prepare the reader for its
 * first block.
 *
 * - When no file set is left, status.loadFromFile is cleared and the function returns.
 * - When the file set has data blocks, the block iterator is initialized for them.
 * - When it only has last blocks, the file block buffer and the block iterator are reset.
 * In the latter two cases the last block reader is marked as having no block loaded and
 * the file block dump state is reset.
 *
 * Returns TSDB_CODE_SUCCESS, or the error from moveToNextFile() / initBlockIterator().
 */
static int32_t initForFirstBlockInFile(STsdbReader* pReader, SDataBlockIter* pBlockIter) {
  SBlockNumber num = {0};

  int32_t code = moveToNextFile(pReader, &num);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  // all data files are consumed, try data in buffer
  if (num.numOfBlocks + num.numOfLastBlocks == 0) {
    pReader->status.loadFromFile = false;
    return code;
  }

  // initialize the block iterator for a new fileset
  if (num.numOfBlocks > 0) {
    code = initBlockIterator(pReader, pBlockIter, num.numOfBlocks);
    if (code != TSDB_CODE_SUCCESS) {
      // do not touch an iterator that failed to initialize
      return code;
    }
  } else {  // no block data, only last block exists
    tBlockDataReset(&pReader->status.fileBlockData);
    resetDataBlockIterator(pBlockIter, pReader->order, pReader->status.pTableMap);
  }

  SLastBlockReader* pLReader = pReader->status.fileIter.pLastBlockReader;
  pLReader->currentBlockIndex = -1;

  // position the row cursor on the first row to read of the current block
  initBlockDumpInfo(pReader, pBlockIter);
  return code;
}

2619
// True when the current file block has been started but not finished: it is not marked
// as fully dumped and the row cursor has already left its start position (row 0 for an
// ascending scan, row totalRows - 1 for a descending scan).
static bool fileBlockPartiallyRead(SFileBlockDumpInfo* pDumpInfo, bool asc) {
  if (pDumpInfo->allDumped) {
    return false;
  }

  if (asc) {
    return pDumpInfo->rowIndex > 0;
  }

  return pDumpInfo->rowIndex < (pDumpInfo->totalRows - 1);
}

2624
/*
 * Produce the next non-empty result block from the data files.
 *
 * Two phases alternate until rows are produced, an error occurs, or all files are done
 * (status.loadFromFile becomes false):
 *   - "last block" phase: scan the last blocks of the current file set table by table;
 *     when they are exhausted, move on to the next file set.
 *   - "file block" phase: continue a partially read file block, or step to the next file
 *     block / next file set and build a block from it.
 * The phase flag below replaces jumping back to the start of the last-block phase.
 */
static int32_t buildBlockFromFiles(STsdbReader* pReader) {
  int32_t code = TSDB_CODE_SUCCESS;
  bool    asc = ASCENDING_TRAVERSE(pReader->order);

  SDataBlockIter*     pBlockIter = &pReader->status.blockIter;
  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;

  // start with the last blocks when the current file set has no data blocks to iterate
  bool scanLastBlocks = (pBlockIter->numOfBlocks == 0);

  for (;;) {
    if (scanLastBlocks) {
      scanLastBlocks = false;

      code = doLoadLastBlockSequentially(pReader);
      if (code != TSDB_CODE_SUCCESS) {
        return code;
      }

      if (pReader->pResBlock->info.rows > 0) {
        return TSDB_CODE_SUCCESS;
      }

      // all data blocks are checked in this last block file, now let's try the next file
      if (pReader->status.pTableIter == NULL) {
        code = initForFirstBlockInFile(pReader, pBlockIter);

        // error happens or all the data files are completely checked
        if ((code != TSDB_CODE_SUCCESS) || (pReader->status.loadFromFile == false)) {
          return code;
        }

        // this file does not have data files, let's start check the last block file if exists
        if (pBlockIter->numOfBlocks == 0) {
          scanLastBlocks = true;
          continue;
        }
      }

      code = doBuildDataBlock(pReader);
      if (code != TSDB_CODE_SUCCESS) {
        return code;
      }

      if (pReader->pResBlock->info.rows > 0) {
        return TSDB_CODE_SUCCESS;
      }
    }

    if (fileBlockPartiallyRead(pDumpInfo, asc)) {  // file data block is partially loaded
      code = buildComposedDataBlock(pReader);
    } else {
      // current block are exhausted, try the next file block
      if (pDumpInfo->allDumped) {
        // try next data block in current file
        if (blockIteratorNext(&pReader->status.blockIter)) {
          // check for the next block in the block accessed order list
          initBlockDumpInfo(pReader, pBlockIter);
        } else if (taosArrayGetSize(pReader->status.fileIter.pLastBlockReader->pBlockL) > 0) {
          // data blocks in current file are exhausted, let's check its last blocks now
          tBlockDataReset(&pReader->status.fileBlockData);
          resetDataBlockIterator(pBlockIter, pReader->order, pReader->status.pTableMap);
          scanLastBlocks = true;
          continue;
        } else {
          code = initForFirstBlockInFile(pReader, pBlockIter);

          // error happens or all the data files are completely checked
          if ((code != TSDB_CODE_SUCCESS) || (pReader->status.loadFromFile == false)) {
            return code;
          }

          // this file does not have blocks, let's start check the last block file
          if (pBlockIter->numOfBlocks == 0) {
            scanLastBlocks = true;
            continue;
          }
        }
      }

      code = doBuildDataBlock(pReader);
    }

    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    if (pReader->pResBlock->info.rows > 0) {
      return TSDB_CODE_SUCCESS;
    }
  }
}
H
refact  
Hongze Cheng 已提交
2709

2710 2711
/*
 * Pick the tsdb instance a query has to read from.
 *
 * For a vnode that is not an rsma vnode the default tsdb is returned and *pLevel is left
 * untouched. For an rsma vnode the retention levels are examined in order and the first
 * level whose keep range still covers winSKey (now - keep <= winSKey) is chosen; a level
 * with keep <= 0 ends the search and falls back to the previous level. The chosen level
 * is stored in *pLevel; anything beyond L1 is reported as L2.
 */
static STsdb* getTsdbByRetentions(SVnode* pVnode, TSKEY winSKey, SRetention* retentions, const char* idStr,
                                  int8_t* pLevel) {
  if (!VND_IS_RSMA(pVnode)) {
    return VND_TSDB(pVnode);
  }

  int64_t now = taosGetTimestamp(pVnode->config.tsdbCfg.precision);

  int8_t level = 0;
  for (; level < TSDB_RETENTION_MAX; ++level) {
    SRetention* pRetention = retentions + level;

    if (pRetention->keep <= 0) {
      // this level is not configured, use the previous one if there is any
      if (level > 0) {
        --level;
      }
      break;
    }

    if ((now - pRetention->keep) <= winSKey) {
      break;
    }
  }

  const char* str = (idStr != NULL) ? idStr : "";

  STsdb* pTsdb = NULL;
  int8_t selected = 0;
  if (level == TSDB_RETENTION_L0) {
    selected = TSDB_RETENTION_L0;
    pTsdb = VND_RSMA0(pVnode);
  } else if (level == TSDB_RETENTION_L1) {
    selected = TSDB_RETENTION_L1;
    pTsdb = VND_RSMA1(pVnode);
  } else {
    selected = TSDB_RETENTION_L2;
    pTsdb = VND_RSMA2(pVnode);
  }

  *pLevel = selected;
  tsdbDebug("vgId:%d, rsma level %d is selected to query %s", TD_VID(pVnode), selected, str);
  return pTsdb;
}

H
Haojun Liao 已提交
2750
/*
 * Compute the version range a query is allowed to see.
 *   minVer: pCond->startVersion, or 0 when it is -1 (not specified).
 *   maxVer: pCond->endVersion capped at the applied version of the vnode; the applied
 *           version itself when endVersion is -1 (not specified).
 * The `level` argument is not used.
 */
SVersionRange getQueryVerRange(SVnode* pVnode, SQueryTableDataCond* pCond, int8_t level) {
  int64_t startVer = pCond->startVersion;
  if (startVer == -1) {
    startVer = 0;
  }

  // default to the current maximum version of the vnode, lower it only when the user
  // asked for an earlier end version
  int64_t endVer = pVnode->state.applied;
  if (pCond->endVersion != -1 && pCond->endVersion <= endVer) {
    endVer = pCond->endVersion;
  }

  return (SVersionRange){.minVer = startVer, .maxVer = endVer};
}

2764
bool hasBeenDropped(const SArray* pDelList, int32_t* index, TSDBKEY* pKey, int32_t order) {
2765 2766 2767 2768
  ASSERT(pKey != NULL);
  if (pDelList == NULL) {
    return false;
  }
L
Liu Jicong 已提交
2769 2770 2771
  size_t  num = taosArrayGetSize(pDelList);
  bool    asc = ASCENDING_TRAVERSE(order);
  int32_t step = asc ? 1 : -1;
2772

2773 2774 2775 2776 2777 2778
  if (asc) {
    if (*index >= num - 1) {
      TSDBKEY* last = taosArrayGetLast(pDelList);
      ASSERT(pKey->ts >= last->ts);

      if (pKey->ts > last->ts) {
2779
        return false;
2780 2781 2782
      } else if (pKey->ts == last->ts) {
        TSDBKEY* prev = taosArrayGet(pDelList, num - 2);
        return (prev->version >= pKey->version);
2783 2784
      }
    } else {
2785 2786 2787 2788 2789 2790 2791 2792 2793 2794 2795 2796 2797 2798 2799 2800 2801 2802 2803 2804 2805 2806 2807 2808 2809 2810 2811 2812 2813 2814
      TSDBKEY* pCurrent = taosArrayGet(pDelList, *index);
      TSDBKEY* pNext = taosArrayGet(pDelList, (*index) + 1);

      if (pKey->ts < pCurrent->ts) {
        return false;
      }

      if (pCurrent->ts <= pKey->ts && pNext->ts >= pKey->ts && pCurrent->version >= pKey->version) {
        return true;
      }

      while (pNext->ts <= pKey->ts && (*index) < num - 1) {
        (*index) += 1;

        if ((*index) < num - 1) {
          pCurrent = taosArrayGet(pDelList, *index);
          pNext = taosArrayGet(pDelList, (*index) + 1);

          // it is not a consecutive deletion range, ignore it
          if (pCurrent->version == 0 && pNext->version > 0) {
            continue;
          }

          if (pCurrent->ts <= pKey->ts && pNext->ts >= pKey->ts && pCurrent->version >= pKey->version) {
            return true;
          }
        }
      }

      return false;
2815 2816
    }
  } else {
2817 2818
    if (*index <= 0) {
      TSDBKEY* pFirst = taosArrayGet(pDelList, 0);
2819

2820 2821 2822 2823 2824 2825 2826
      if (pKey->ts < pFirst->ts) {
        return false;
      } else if (pKey->ts == pFirst->ts) {
        return pFirst->version >= pKey->version;
      } else {
        ASSERT(0);
      }
2827
    } else {
2828 2829 2830 2831 2832 2833 2834 2835 2836 2837 2838 2839 2840 2841 2842 2843 2844 2845 2846 2847 2848 2849 2850 2851 2852 2853 2854
      TSDBKEY* pCurrent = taosArrayGet(pDelList, *index);
      TSDBKEY* pPrev = taosArrayGet(pDelList, (*index) - 1);

      if (pKey->ts > pCurrent->ts) {
        return false;
      }

      if (pPrev->ts <= pKey->ts && pCurrent->ts >= pKey->ts && pPrev->version >= pKey->version) {
        return true;
      }

      while (pPrev->ts >= pKey->ts && (*index) > 1) {
        (*index) += step;

        if ((*index) >= 1) {
          pCurrent = taosArrayGet(pDelList, *index);
          pPrev = taosArrayGet(pDelList, (*index) - 1);

          // it is not a consecutive deletion range, ignore it
          if (pCurrent->version > 0 && pPrev->version == 0) {
            continue;
          }

          if (pPrev->ts <= pKey->ts && pCurrent->ts >= pKey->ts && pPrev->version >= pKey->version) {
            return true;
          }
        }
2855 2856 2857 2858 2859
      }

      return false;
    }
  }
2860 2861

  return false;
2862 2863 2864 2865
}

/*
 * Return the first row at or after the current iterator position that the reader may
 * see, or NULL when there is none.
 *
 * A row is skipped when its version is outside pReader->verRange or when it is covered by
 * the delete skyline pDelList. The search stops, and pIter->hasVal is cleared, as soon as
 * a row falls outside the query time window or the iterator runs out of rows.
 */
TSDBROW* getValidRow(SIterInfo* pIter, const SArray* pDelList, STsdbReader* pReader) {
  if (!pIter->hasVal) {
    return NULL;
  }

  TSDBROW* pRow = tsdbTbDataIterGet(pIter->iter);
  TSDBKEY  key = {.ts = pRow->pTSRow->ts, .version = pRow->version};

  for (;;) {
    if (outOfTimeWindow(key.ts, &pReader->window)) {
      pIter->hasVal = false;
      return NULL;
    }

    // it is a valid data version
    bool versionOk = (key.version <= pReader->verRange.maxVer && key.version >= pReader->verRange.minVer);
    if (versionOk && (!hasBeenDropped(pDelList, &pIter->index, &key, pReader->order))) {
      return pRow;
    }

    // not visible, look at the next row
    pIter->hasVal = tsdbTbDataIterNext(pIter->iter);
    if (!pIter->hasVal) {
      return NULL;
    }

    pRow = tsdbTbDataIterGet(pIter->iter);
    key = TSDBROW_KEY(pRow);
  }
}
H
Hongze Cheng 已提交
2902

2903 2904
/*
 * Merge every following buffer row that carries the timestamp `ts` into pMerger.
 *
 * The iterator is advanced first, so the row it currently points at is expected to have
 * been handled by the caller. Merging stops when the iterator is exhausted, when no
 * further valid row exists, or when the next valid row has a different timestamp.
 * Always returns TSDB_CODE_SUCCESS.
 */
int32_t doMergeRowsInBuf(SIterInfo* pIter, uint64_t uid, int64_t ts, SArray* pDelList, SRowMerger* pMerger,
                         STsdbReader* pReader) {
  while ((pIter->hasVal = tsdbTbDataIterNext(pIter->iter))) {
    // data exists but not valid
    TSDBROW* pRow = getValidRow(pIter, pDelList, pReader);
    if (pRow == NULL) {
      break;
    }

    // ts is not identical, quit
    TSDBKEY rowKey = TSDBROW_KEY(pRow);
    if (rowKey.ts != ts) {
      break;
    }

    STSchema* pTSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, uid);
    tRowMergerAdd(pMerger, pRow, pTSchema);
  }

  return TSDB_CODE_SUCCESS;
}

2930
static int32_t doMergeRowsInFileBlockImpl(SBlockData* pBlockData, int32_t rowIndex, int64_t key, SRowMerger* pMerger,
2931
                                          SVersionRange* pVerRange, int32_t step) {
2932 2933
  while (pBlockData->aTSKEY[rowIndex] == key && rowIndex < pBlockData->nRow && rowIndex >= 0) {
    if (pBlockData->aVersion[rowIndex] > pVerRange->maxVer || pBlockData->aVersion[rowIndex] < pVerRange->minVer) {
2934
      rowIndex += step;
2935 2936 2937 2938 2939 2940 2941 2942 2943 2944 2945 2946 2947 2948 2949 2950 2951
      continue;
    }

    TSDBROW fRow = tsdbRowFromBlockData(pBlockData, rowIndex);
    tRowMerge(pMerger, &fRow);
    rowIndex += step;
  }

  return rowIndex;
}

// Outcome reported by checkForNeighborFileBlock() through its `state` out-parameter.
typedef enum {
  CHECK_FILEBLOCK_CONT = 0x1,  // the neighbor block was consumed up to its row count; check the next neighbor
  CHECK_FILEBLOCK_QUIT = 0x2,  // default outcome: stop looking at further neighbor blocks
} CHECK_FILEBLOCK_STATE;

static int32_t checkForNeighborFileBlock(STsdbReader* pReader, STableBlockScanInfo* pScanInfo, SBlock* pBlock,
2952 2953
                                         SFileDataBlockInfo* pFBlock, SRowMerger* pMerger, int64_t key,
                                         CHECK_FILEBLOCK_STATE* state) {
2954
  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
2955
  SBlockData*         pBlockData = &pReader->status.fileBlockData;
2956

2957
  *state = CHECK_FILEBLOCK_QUIT;
2958
  int32_t step = ASCENDING_TRAVERSE(pReader->order) ? 1 : -1;
2959 2960 2961

  int32_t nextIndex = -1;
  SBlock* pNeighborBlock = getNeighborBlockOfSameTable(pFBlock, pScanInfo, &nextIndex, pReader->order);
2962
  if (pNeighborBlock == NULL) {  // do nothing
2963 2964 2965 2966
    return 0;
  }

  bool overlap = overlapWithNeighborBlock(pBlock, pNeighborBlock, pReader->order);
2967 2968
  taosMemoryFree(pNeighborBlock);

2969
  if (overlap) {  // load next block
2970
    SReaderStatus*  pStatus = &pReader->status;
2971 2972
    SDataBlockIter* pBlockIter = &pStatus->blockIter;

2973
    // 1. find the next neighbor block in the scan block list
2974
    SFileDataBlockInfo fb = {.uid = pFBlock->uid, .tbBlockIdx = nextIndex};
2975
    int32_t            neighborIndex = findFileBlockInfoIndex(pBlockIter, &fb);
2976

2977
    // 2. remove it from the scan block list
2978
    setFileBlockActiveInBlockIter(pBlockIter, neighborIndex, step);
2979

2980
    // 3. load the neighbor block, and set it to be the currently accessed file data block
H
Haojun Liao 已提交
2981
    tBlockDataReset(&pStatus->fileBlockData);
2982 2983 2984 2985 2986 2987
    int32_t code = tBlockDataInit(&pStatus->fileBlockData, pReader->suid, pFBlock->uid, pReader->pSchema);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    code = doLoadFileBlockData(pReader, pBlockIter, &pStatus->fileBlockData);
2988 2989 2990 2991
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

2992
    // 4. check the data values
2993 2994 2995 2996
    initBlockDumpInfo(pReader, pBlockIter);

    pDumpInfo->rowIndex =
        doMergeRowsInFileBlockImpl(pBlockData, pDumpInfo->rowIndex, key, pMerger, &pReader->verRange, step);
H
Haojun Liao 已提交
2997
    if (pDumpInfo->rowIndex >= pDumpInfo->totalRows) {
2998 2999 3000 3001 3002 3003 3004
      *state = CHECK_FILEBLOCK_CONT;
    }
  }

  return TSDB_CODE_SUCCESS;
}

3005 3006
// Merge all file-block rows that share the timestamp of the row at the current dump position.
//
// The current row itself is expected to be in pMerger already: the row index is advanced by one step
// first, then the remaining rows of the block with the same timestamp are merged. When the block is
// exhausted by that walk, overlapping neighbor blocks of the same table are loaded and merged as
// long as checkForNeighborFileBlock() reports CHECK_FILEBLOCK_CONT.
//
// Returns TSDB_CODE_SUCCESS, or the error reported while loading a neighbor block.
int32_t doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pScanInfo, STsdbReader* pReader,
                                SRowMerger* pMerger) {
  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;

  bool    asc = ASCENDING_TRAVERSE(pReader->order);
  int64_t key = pBlockData->aTSKEY[pDumpInfo->rowIndex];
  int32_t step = asc ? 1 : -1;

  pDumpInfo->rowIndex += step;
  if ((pDumpInfo->rowIndex <= pBlockData->nRow - 1 && asc) || (pDumpInfo->rowIndex >= 0 && !asc)) {
    pDumpInfo->rowIndex =
        doMergeRowsInFileBlockImpl(pBlockData, pDumpInfo->rowIndex, key, pMerger, &pReader->verRange, step);
  }

  // all rows are consumed, let's try next file block
  if ((pDumpInfo->rowIndex >= pBlockData->nRow && asc) || (pDumpInfo->rowIndex < 0 && !asc)) {
    while (1) {
      CHECK_FILEBLOCK_STATE st = CHECK_FILEBLOCK_QUIT;

      SFileDataBlockInfo* pFileBlockInfo = getCurrentBlockInfo(&pReader->status.blockIter);
      SBlock*             pCurrentBlock = getCurrentBlock(&pReader->status.blockIter);

      // a failure to load the neighbor block must reach the caller instead of looking like
      // "no more neighbors"
      int32_t code = checkForNeighborFileBlock(pReader, pScanInfo, pCurrentBlock, pFileBlockInfo, pMerger, key, &st);
      if (code != TSDB_CODE_SUCCESS) {
        return code;
      }

      if (st == CHECK_FILEBLOCK_QUIT) {
        break;
      }
    }
  }

  return TSDB_CODE_SUCCESS;
}

3036
// todo check if the rows are dropped or not
H
Haojun Liao 已提交
3037 3038
// Merge the following rows of the last block into pMerger for as long as their key equals `ts`.
// The reader is advanced with nextRowInLastBlock() before each comparison; the first row with a
// different key ends the loop. Always returns TSDB_CODE_SUCCESS.
int32_t doMergeRowsInLastBlock(SLastBlockReader* pLastBlockReader, STableBlockScanInfo* pScanInfo, int64_t ts, SRowMerger* pMerger) {
  while (nextRowInLastBlock(pLastBlockReader, pScanInfo)) {
    int64_t currentKey = getCurrentKeyInLastBlock(pLastBlockReader);
    if (currentKey != ts) {
      break;
    }

    TSDBROW duplicate = tsdbRowFromBlockData(&pLastBlockReader->lastBlockData, *pLastBlockReader->rowIndex);
    tRowMerge(pMerger, &duplicate);
  }

  return TSDB_CODE_SUCCESS;
}

3051
void doMergeMemTableMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter, SArray* pDelList, STSRow** pTSRow,
3052
                      STsdbReader* pReader, bool* freeTSRow) {
H
Haojun Liao 已提交
3053
  TSDBROW* pNextRow = NULL;
3054
  TSDBROW  current = *pRow;
3055

3056 3057
  {  // if the timestamp of the next valid row has a different ts, return current row directly
    pIter->hasVal = tsdbTbDataIterNext(pIter->iter);
3058

3059 3060 3061 3062 3063
    if (!pIter->hasVal) {
      *pTSRow = current.pTSRow;
      *freeTSRow = false;
      return;
    } else {  // has next point in mem/imem
H
Haojun Liao 已提交
3064
      pNextRow = getValidRow(pIter, pDelList, pReader);
3065 3066 3067 3068 3069 3070
      if (pNextRow == NULL) {
        *pTSRow = current.pTSRow;
        *freeTSRow = false;
        return;
      }

H
Haojun Liao 已提交
3071
      if (current.pTSRow->ts != pNextRow->pTSRow->ts) {
3072 3073 3074 3075
        *pTSRow = current.pTSRow;
        *freeTSRow = false;
        return;
      }
3076
    }
3077 3078
  }

3079 3080
  SRowMerger merge = {0};

3081
  // get the correct schema for data in memory
H
Haojun Liao 已提交
3082
  STSchema* pTSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(&current), pReader, uid);
H
Haojun Liao 已提交
3083

3084 3085
  if (pReader->pSchema == NULL) {
    pReader->pSchema = pTSchema;
3086
  }
H
Haojun Liao 已提交
3087

H
Haojun Liao 已提交
3088 3089 3090 3091 3092 3093
  tRowMergerInit2(&merge, pReader->pSchema, &current, pTSchema);

  STSchema* pTSchema1 = doGetSchemaForTSRow(TSDBROW_SVERSION(pNextRow), pReader, uid);
  tRowMergerAdd(&merge, pNextRow, pTSchema1);

  doMergeRowsInBuf(pIter, uid, current.pTSRow->ts, pDelList, &merge, pReader);
3094
  tRowMergerGetRow(&merge, pTSRow);
3095
  tRowMergerClear(&merge);
M
Minglei Jin 已提交
3096

3097
  *freeTSRow = true;
3098 3099
}

3100 3101
// Merge the rows of the mem (pRow) and imem (piRow) buffers that share one timestamp into a single
// row stored in *pTSRow.
//
// In ascending order the imem rows are merged first and the mem rows afterwards; in descending
// order the sequence is reversed. The merged row is produced by tRowMergerGetRow(); the caller in
// this file marks it as to-be-freed.
//
// NOTE(review): in the ascending branch the schema is looked up with the version of pRow although
// the merger is seeded with piRow -- confirm whether piRow's schema version should be used.
void doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader,
                        STSRow** pTSRow) {
  SRowMerger merge = {0};

  TSDBKEY k = TSDBROW_KEY(pRow);
  TSDBKEY ik = TSDBROW_KEY(piRow);

  if (ASCENDING_TRAVERSE(pReader->order)) {  // ascending order imem --> mem
    STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid);

    tRowMergerInit(&merge, piRow, pSchema);
    doMergeRowsInBuf(&pBlockScanInfo->iiter, pBlockScanInfo->uid, ik.ts, pBlockScanInfo->delSkyline, &merge, pReader);

    tRowMerge(&merge, pRow);
    doMergeRowsInBuf(&pBlockScanInfo->iter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge, pReader);
  } else {  // descending order mem --> imem
    STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid);

    tRowMergerInit(&merge, pRow, pSchema);
    doMergeRowsInBuf(&pBlockScanInfo->iter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge, pReader);

    tRowMerge(&merge, piRow);
    doMergeRowsInBuf(&pBlockScanInfo->iiter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge, pReader);
  }

  tRowMergerGetRow(&merge, pTSRow);

  // release the merger's internal buffers, as doMergeMemTableMultiRows() does after building its row
  tRowMergerClear(&merge);
}

3128 3129
// Fetch the next row, in traversal order, from the mem and imem buffers of one table.
//
// Rows at or beyond endKey (>= endKey ascending, <= endKey descending) are ignored. When both
// buffers offer a row, the one that comes first in traversal order is returned; rows with equal
// timestamps are merged across both buffers. *pTSRow is left untouched when no row qualifies.
// *freeTSRow tells the caller whether the returned row must be released. Always returns
// TSDB_CODE_SUCCESS.
int32_t tsdbGetNextRowInMem(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader, STSRow** pTSRow, int64_t endKey,
                            bool* freeTSRow) {
  SIterInfo* pMemIter = &pBlockScanInfo->iter;
  SIterInfo* pImemIter = &pBlockScanInfo->iiter;

  TSDBROW* pMemRow = getValidRow(pMemIter, pBlockScanInfo->delSkyline, pReader);
  TSDBROW* pImemRow = getValidRow(pImemIter, pBlockScanInfo->delSkyline, pReader);
  SArray*  pSkyline = pBlockScanInfo->delSkyline;
  uint64_t tableUid = pBlockScanInfo->uid;

  bool ascending = ASCENDING_TRAVERSE(pReader->order);

  // drop candidates that already passed the end key
  if (pMemIter->hasVal) {
    TSDBKEY memKey = TSDBROW_KEY(pMemRow);
    if (ascending ? (memKey.ts >= endKey) : (memKey.ts <= endKey)) {
      pMemRow = NULL;
    }
  }

  if (pImemIter->hasVal) {
    TSDBKEY imemKey = TSDBROW_KEY(pImemRow);
    if (ascending ? (imemKey.ts >= endKey) : (imemKey.ts <= endKey)) {
      pImemRow = NULL;
    }
  }

  bool memUsable = pMemIter->hasVal && (pMemRow != NULL);
  bool imemUsable = pImemIter->hasVal && (pImemRow != NULL);

  if (memUsable && imemUsable) {
    TSDBKEY memKey = TSDBROW_KEY(pMemRow);
    TSDBKEY imemKey = TSDBROW_KEY(pImemRow);

    if (imemKey.ts == memKey.ts) {
      // same timestamp in both buffers: merge across them
      doMergeMemIMemRows(pMemRow, pImemRow, pBlockScanInfo, pReader, pTSRow);
      *freeTSRow = true;
    } else if (ascending ? (imemKey.ts < memKey.ts) : (imemKey.ts > memKey.ts)) {
      // the imem row comes first in traversal order
      doMergeMemTableMultiRows(pImemRow, tableUid, pImemIter, pSkyline, pTSRow, pReader, freeTSRow);
    } else {
      // the mem row comes first in traversal order
      doMergeMemTableMultiRows(pMemRow, tableUid, pMemIter, pSkyline, pTSRow, pReader, freeTSRow);
    }
  } else if (memUsable) {
    doMergeMemTableMultiRows(pMemRow, pBlockScanInfo->uid, pMemIter, pSkyline, pTSRow, pReader, freeTSRow);
  } else if (imemUsable) {
    doMergeMemTableMultiRows(pImemRow, tableUid, pImemIter, pSkyline, pTSRow, pReader, freeTSRow);
  }

  return TSDB_CODE_SUCCESS;
}

H
Haojun Liao 已提交
3182
// Append one in-memory row (pTSRow) as a new row at the end of the result block.
//
// Output columns and schema columns are walked in parallel by column id: matching columns are
// copied, output columns that are missing from the row's schema are filled with NULL. The row
// count of pBlock is incremented by one. Always returns TSDB_CODE_SUCCESS.
int32_t doAppendRowFromTSRow(SSDataBlock* pBlock, STsdbReader* pReader, STSRow* pTSRow, uint64_t uid) {
  int32_t rowPos = pBlock->info.rows;
  int32_t numOfOutCols = (int32_t)taosArrayGetSize(pBlock->pDataBlock);

  SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo;
  STSchema*           pSchema = doGetSchemaForTSRow(pTSRow->sver, pReader, uid);

  SColVal colVal = {0};
  int32_t outIdx = 0;
  int32_t schemaIdx = 0;

  // the primary timestamp, if requested as the first output column, comes straight from the row
  SColumnInfoData* pOutCol = taosArrayGet(pBlock->pDataBlock, outIdx);
  if (pOutCol->info.colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
    colDataAppend(pOutCol, rowPos, (const char*)&pTSRow->ts, false);
    outIdx += 1;
  }

  while (outIdx < numOfOutCols && schemaIdx < pSchema->numOfCols) {
    pOutCol = taosArrayGet(pBlock->pDataBlock, outIdx);
    col_id_t wantedId = pOutCol->info.colId;

    if (wantedId == pSchema->columns[schemaIdx].colId) {
      tTSRowGetVal(pTSRow, pSchema, schemaIdx, &colVal);
      doCopyColVal(pOutCol, rowPos, outIdx, &colVal, pSupInfo);
      outIdx += 1;
      schemaIdx += 1;
    } else if (wantedId < pSchema->columns[schemaIdx].colId) {
      // requested column is not part of this schema version
      colDataAppendNULL(pOutCol, rowPos);
      outIdx += 1;
    } else {
      // schema column was not requested, skip it
      schemaIdx += 1;
    }
  }

  // remaining output columns do not exist in "pSchema": set them to NULL
  for (; outIdx < numOfOutCols; outIdx += 1) {
    pOutCol = taosArrayGet(pBlock->pDataBlock, outIdx);
    colDataAppendNULL(pOutCol, rowPos);
  }

  pBlock->info.rows += 1;
  return TSDB_CODE_SUCCESS;
}

3226
// Append row `rowIndex` of a loaded file block as a new row at the end of the result block.
//
// Output columns whose id equals the id of the current input column are copied; any other output
// column is filled with NULL. The row count of pResBlock is incremented by one.
// Always returns TSDB_CODE_SUCCESS.
int32_t doAppendRowFromFileBlock(SSDataBlock* pResBlock, STsdbReader* pReader, SBlockData* pBlockData, int32_t rowIndex) {
  int32_t outCol = 0;
  int32_t inCol = 0;
  int32_t dstRow = pResBlock->info.rows;

  SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo;

  // the primary timestamp is taken from the key array of the block
  SColumnInfoData* pFirstCol = taosArrayGet(pResBlock->pDataBlock, outCol);
  if (pFirstCol->info.colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
    colDataAppendInt64(pFirstCol, dstRow, &pBlockData->aTSKEY[rowIndex]);
    outCol += 1;
  }

  SColVal value = {0};
  int32_t numOfInputCols = taosArrayGetSize(pBlockData->aIdx);
  int32_t numOfOutputCols = blockDataGetNumOfCols(pResBlock);

  for (; outCol < numOfOutputCols && inCol < numOfInputCols; outCol += 1) {
    SColumnInfoData* pDst = taosArrayGet(pResBlock->pDataBlock, outCol);
    SColData*        pSrc = tBlockDataGetColDataByIdx(pBlockData, inCol);

    if (pSrc->cid != pDst->info.colId) {
      // the specified column does not exist in file block, fill with null data
      colDataAppendNULL(pDst, dstRow);
      continue;
    }

    tColDataGetValue(pSrc, rowIndex, &value);
    doCopyColVal(pDst, dstRow, outCol, &value, pSupInfo);
    inCol += 1;
  }

  // input columns are used up: the rest of the output columns get NULL
  for (; outCol < numOfOutputCols; outCol += 1) {
    SColumnInfoData* pDst = taosArrayGet(pResBlock->pDataBlock, outCol);
    colDataAppendNULL(pDst, dstRow);
  }

  pResBlock->info.rows += 1;
  return TSDB_CODE_SUCCESS;
}

3267 3268
// Fill the reader's result block with rows taken from the in-memory buffers of one table.
//
// Rows are appended until no further row is available before endKey, both buffer iterators are
// exhausted, or the block holds `capacity` rows. Always returns TSDB_CODE_SUCCESS.
int32_t buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, int64_t endKey, int32_t capacity,
                                  STsdbReader* pReader) {
  SSDataBlock* pBlock = pReader->pResBlock;

  for (;;) {
    STSRow* pNextRow = NULL;
    bool    ownsRow = false;

    tsdbGetNextRowInMem(pBlockScanInfo, pReader, &pNextRow, endKey, &ownsRow);
    if (pNextRow == NULL) {
      break;
    }

    doAppendRowFromTSRow(pBlock, pReader, pNextRow, pBlockScanInfo->uid);

    // merged rows are allocated for us and must be released after being copied
    if (ownsRow) {
      taosMemoryFree(pNextRow);
    }

    // no data left in buffer, or the block is full: stop
    bool bufferDrained = !pBlockScanInfo->iter.hasVal && !pBlockScanInfo->iiter.hasVal;
    if (bufferDrained || pBlock->info.rows >= capacity) {
      break;
    }
  }

  ASSERT(pBlock->info.rows <= capacity);
  return TSDB_CODE_SUCCESS;
}
H
Hongze Cheng 已提交
3297

3298
// todo refactor, use arraylist instead
H
Hongze Cheng 已提交
3299
// Make the reader scan exactly one table: the table map is cleared and a fresh scan-info entry for
// `uid` is inserted.
//
// Returns TSDB_CODE_SUCCESS, or TSDB_CODE_TDB_OUT_OF_MEMORY when the entry cannot be inserted.
int32_t tsdbSetTableId(STsdbReader* pReader, int64_t uid) {
  ASSERT(pReader != NULL);
  taosHashClear(pReader->status.pTableMap);

  STableBlockScanInfo info = {.lastKey = 0, .uid = uid, .indexInBlockL = DEFAULT_ROW_INDEX_VAL};

  // the map was just cleared, so a non-zero result cannot be a duplicate key
  // (assumes taosHashPut returns 0 on success -- see thash.h)
  if (taosHashPut(pReader->status.pTableMap, &info.uid, sizeof(uint64_t), &info, sizeof(info)) != 0) {
    return TSDB_CODE_TDB_OUT_OF_MEMORY;
  }

  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
3308 3309 3310 3311 3312 3313
// Return metaGetIdx(pMeta), or NULL when pMeta itself is NULL.
void* tsdbGetIdx(SMeta* pMeta) {
  return (pMeta != NULL) ? metaGetIdx(pMeta) : NULL;
}
dengyihao's avatar
dengyihao 已提交
3314

dengyihao's avatar
dengyihao 已提交
3315 3316 3317 3318 3319 3320
// Return metaGetIvtIdx(pMeta), or NULL when pMeta itself is NULL.
void* tsdbGetIvtIdx(SMeta* pMeta) {
  return (pMeta != NULL) ? metaGetIvtIdx(pMeta) : NULL;
}
L
Liu Jicong 已提交
3321

H
Hongze Cheng 已提交
3322
// Upper bound of the version range this reader is allowed to see.
uint64_t getReaderMaxVersion(STsdbReader* pReader) {
  uint64_t maxVersion = pReader->verRange.maxVer;
  return maxVersion;
}
3323

C
Cary Xu 已提交
3324

H
refact  
Hongze Cheng 已提交
3325
// ====================================== EXPOSED APIs ======================================
3326 3327
// Create a reader for the tables in pTableList and position it on the first data block.
//
// pVnode     vnode that owns the data
// pCond      query condition; for TIMEWINDOW_RANGE_EXTERNAL its twindows/order fields are rewritten
//            in place while the two inner readers are created
// pTableList list of STableKeyInfo describing the tables to scan
// ppReader   receives the new reader; set to NULL when the table map cannot be built
// idstr      identifier used in log messages
//
// Returns TSDB_CODE_SUCCESS or an error code. The error path logs with `idstr` rather than through
// the reader object, because the reader may not exist yet or may already have been closed there.
//
// NOTE(review): for TIMEWINDOW_RANGE_EXTERNAL pCond->order is flipped for innerReader[0] and is not
// restored before innerReader[1] is created or before returning -- confirm this is intended.
int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, SArray* pTableList, STsdbReader** ppReader,
                       const char* idstr) {
  STsdbReader* pReader = NULL;

  int32_t code = tsdbReaderCreate(pVnode, pCond, ppReader, 4096, idstr);
  if (code != TSDB_CODE_SUCCESS) {
    goto _err;
  }

  // check for query time window
  pReader = *ppReader;
  if (isEmptyQueryTimeWindow(&pReader->window)) {
    tsdbDebug("%p query window not overlaps with the data set, no result returned, %s", pReader, pReader->idStr);
    return TSDB_CODE_SUCCESS;
  }

  if (pCond->type == TIMEWINDOW_RANGE_EXTERNAL) {
    // update the SQueryTableDataCond to create inner reader
    STimeWindow w = pCond->twindows;
    int32_t     order = pCond->order;
    if (order == TSDB_ORDER_ASC) {
      pCond->twindows.ekey = pCond->twindows.skey;
      pCond->twindows.skey = INT64_MIN;
      pCond->order = TSDB_ORDER_DESC;
    } else {
      pCond->twindows.skey = pCond->twindows.ekey;
      pCond->twindows.ekey = INT64_MAX;
      pCond->order = TSDB_ORDER_ASC;
    }

    // here we only need one more row, so the capacity is set to be ONE.
    code = tsdbReaderCreate(pVnode, pCond, &pReader->innerReader[0], 1, idstr);
    if (code != TSDB_CODE_SUCCESS) {
      goto _err;
    }

    if (order == TSDB_ORDER_ASC) {
      pCond->twindows.skey = w.ekey;
      pCond->twindows.ekey = INT64_MAX;
    } else {
      pCond->twindows.skey = INT64_MIN;
      pCond->twindows.ekey = w.ekey;
    }
    code = tsdbReaderCreate(pVnode, pCond, &pReader->innerReader[1], 1, idstr);
    if (code != TSDB_CODE_SUCCESS) {
      goto _err;
    }
  }

  // pick the schema of the super table, or of the first table in the list
  if (pCond->suid != 0) {
    pReader->pSchema = metaGetTbTSchema(pReader->pTsdb->pVnode->pMeta, pReader->suid, -1);
  } else if (taosArrayGetSize(pTableList) > 0) {
    STableKeyInfo* pKey = taosArrayGet(pTableList, 0);
    pReader->pSchema = metaGetTbTSchema(pReader->pTsdb->pVnode->pMeta, pKey->uid, -1);
  }

  int32_t numOfTables = taosArrayGetSize(pTableList);
  pReader->status.pTableMap = createDataBlockScanInfo(pReader, pTableList->pData, numOfTables);
  if (pReader->status.pTableMap == NULL) {
    tsdbReaderClose(pReader);
    pReader = NULL;  // released above, must not be touched any more
    *ppReader = NULL;

    code = TSDB_CODE_TDB_OUT_OF_MEMORY;
    goto _err;
  }

  code = tsdbTakeReadSnap(pReader->pTsdb, &pReader->pReadSnap);
  if (code != TSDB_CODE_SUCCESS) {
    goto _err;
  }

  if (pReader->type == TIMEWINDOW_RANGE_CONTAINED) {
    SDataBlockIter* pBlockIter = &pReader->status.blockIter;

    initFilesetIterator(&pReader->status.fileIter, pReader->pReadSnap->fs.aDFileSet, pReader);
    resetDataBlockIterator(&pReader->status.blockIter, pReader->order, pReader->status.pTableMap);

    // no data in files, let's try buffer in memory
    if (pReader->status.fileIter.numOfFiles == 0) {
      pReader->status.loadFromFile = false;
    } else {
      code = initForFirstBlockInFile(pReader, pBlockIter);
      if (code != TSDB_CODE_SUCCESS) {
        return code;
      }
    }
  } else {
    // external window: the first inner reader is consumed first, so prepare it here
    STsdbReader*    pPrevReader = pReader->innerReader[0];
    SDataBlockIter* pBlockIter = &pPrevReader->status.blockIter;

    code = tsdbTakeReadSnap(pPrevReader->pTsdb, &pPrevReader->pReadSnap);
    if (code != TSDB_CODE_SUCCESS) {
      goto _err;
    }

    initFilesetIterator(&pPrevReader->status.fileIter, pPrevReader->pReadSnap->fs.aDFileSet, pPrevReader);
    resetDataBlockIterator(&pPrevReader->status.blockIter, pPrevReader->order, pReader->status.pTableMap);

    // no data in files, let's try buffer in memory
    if (pPrevReader->status.fileIter.numOfFiles == 0) {
      pPrevReader->status.loadFromFile = false;
    } else {
      code = initForFirstBlockInFile(pPrevReader, pBlockIter);
      if (code != TSDB_CODE_SUCCESS) {
        return code;
      }
    }
  }

  tsdbDebug("%p total numOfTable:%d in this query %s", pReader, numOfTables, pReader->idStr);
  return code;

_err:
  // pReader may be unset (create failed) or already released here, so only the caller-supplied
  // id string is safe to print
  tsdbError("failed to create data reader, code:%s %s", tstrerror(code), (idstr != NULL) ? idstr : "");
  return code;
}

// Release a reader and everything it owns: the read snapshot, column/aggregate buffers, the file
// block data, the block iterator, the table map, the result block, the file reader, the last-block
// reader, the id string and the schemas. A NULL reader is accepted and ignored.
//
// NOTE(review): innerReader[0]/innerReader[1] are not released here; tsdbNextDataBlock() closes them
// only once they are exhausted -- confirm whether closing a reader early leaks them.
void tsdbReaderClose(STsdbReader* pReader) {
  if (pReader == NULL) {
    return;
  }

  SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo;
  tsdbUntakeReadSnap(pReader->pTsdb, pReader->pReadSnap);

  taosMemoryFreeClear(pSupInfo->plist);
  taosMemoryFree(pSupInfo->colIds);

  taosArrayDestroy(pSupInfo->pColAgg);
  for (int32_t i = 0; i < blockDataGetNumOfCols(pReader->pResBlock); ++i) {
    taosMemoryFreeClear(pSupInfo->buildBuf[i]);
  }
  taosMemoryFree(pSupInfo->buildBuf);
  tBlockDataDestroy(&pReader->status.fileBlockData, true);

  cleanupDataBlockIterator(&pReader->status.blockIter);

  // remember the table count for the summary below before the map is destroyed
  size_t numOfTables = taosHashGetSize(pReader->status.pTableMap);
  destroyBlockScanInfo(pReader->status.pTableMap);
  blockDataDestroy(pReader->pResBlock);

  if (pReader->pFileReader != NULL) {
    tsdbDataFReaderClose(&pReader->pFileReader);
  }

  SFilesetIter* pFilesetIter = &pReader->status.fileIter;
  if (pFilesetIter->pLastBlockReader != NULL) {
    tBlockDataDestroy(&pFilesetIter->pLastBlockReader->lastBlockData, true);
    taosArrayDestroy(pFilesetIter->pLastBlockReader->pBlockL);
    taosMemoryFree(pFilesetIter->pLastBlockReader);
  }

  SIOCostSummary* pCost = &pReader->cost;

  tsdbDebug("%p :io-cost summary: head-file:%" PRIu64 ", head-file time:%.2f ms, SMA:%" PRId64
            " SMA-time:%.2f ms, fileBlocks:%" PRId64
            ", fileBlocks-time:%.2f ms, "
            "build in-memory-block-time:%.2f ms, lastBlocks:%" PRId64
            ", lastBlocks-time:%.2f ms, STableBlockScanInfo size:%.2f Kb %s",
            pReader, pCost->headFileLoad, pCost->headFileLoadTime, pCost->smaDataLoad, pCost->smaLoadTime,
            pCost->numOfBlocks, pCost->blockLoadTime, pCost->buildmemBlock, pCost->lastBlockLoad,
            pCost->lastBlockLoadTime, numOfTables * sizeof(STableBlockScanInfo) / 1000.0, pReader->idStr);

  taosMemoryFree(pReader->idStr);

  // pMemSchema may alias pSchema; decide that while both pointers are still valid, because the
  // value of a pointer is indeterminate once the object it refers to has been freed
  if (pReader->pMemSchema != pReader->pSchema) {
    taosMemoryFree(pReader->pMemSchema);
  }
  taosMemoryFree(pReader->pSchema);

  taosMemoryFreeClear(pReader);
}

3497
// Build the next result block of one reader.
//
// The previous content of the result block is discarded first. While file loading is enabled the
// block is built from files; when that yields no rows (or file loading is disabled) the in-memory
// buffers are used instead. Returns true when the result block holds at least one row, false
// otherwise or when building from files failed.
static bool doTsdbNextDataBlock(STsdbReader* pReader) {
  // drop the rows that belong to the previous data block
  SSDataBlock* pResBlock = pReader->pResBlock;
  blockDataCleanup(pResBlock);

  if (pReader->status.loadFromFile) {
    if (buildBlockFromFiles(pReader) != TSDB_CODE_SUCCESS) {
      return false;
    }

    if (pResBlock->info.rows > 0) {
      return true;
    }
  }

  // nothing (more) from files, let's try the buffer
  buildBlockFromBufferSequentially(pReader);
  return pResBlock->info.rows > 0;
}

3524 3525 3526 3527 3528 3529 3530 3531 3532 3533 3534 3535 3536 3537 3538 3539 3540 3541 3542 3543 3544 3545 3546 3547 3548 3549 3550 3551 3552 3553 3554 3555 3556 3557 3558 3559 3560
// Advance to the next data block and record in pReader->step which reader produced it.
//
// The readers are consulted in a fixed sequence: innerReader[0] (EXTERNAL_ROWS_PREV), the reader
// itself (EXTERNAL_ROWS_MAIN), then innerReader[1] (EXTERNAL_ROWS_NEXT). An inner reader that has
// no more blocks is closed and its slot reset to NULL. Returns false when the query window is empty
// or no reader has a block left.
bool tsdbNextDataBlock(STsdbReader* pReader) {
  if (isEmptyQueryTimeWindow(&pReader->window)) {
    return false;
  }

  STsdbReader* pPrev = pReader->innerReader[0];
  if (pPrev != NULL) {
    if (doTsdbNextDataBlock(pPrev)) {
      pReader->step = EXTERNAL_ROWS_PREV;
      return true;
    }

    // exhausted: release it so it is skipped from now on
    tsdbReaderClose(pPrev);
    pReader->innerReader[0] = NULL;
  }

  pReader->step = EXTERNAL_ROWS_MAIN;
  if (doTsdbNextDataBlock(pReader)) {
    return true;
  }

  STsdbReader* pNext = pReader->innerReader[1];
  if (pNext != NULL) {
    if (doTsdbNextDataBlock(pNext)) {
      pReader->step = EXTERNAL_ROWS_NEXT;
      return true;
    }

    tsdbReaderClose(pNext);
    pReader->innerReader[1] = NULL;
  }

  return false;
}

// Copy row count, table uid and time window of the reader's result block into *pDataBlockInfo.
static void setBlockInfo(STsdbReader* pReader, SDataBlockInfo* pDataBlockInfo) {
  ASSERT(pDataBlockInfo != NULL && pReader != NULL);

  SSDataBlock* pResult = pReader->pResBlock;
  pDataBlockInfo->rows = pResult->info.rows;
  pDataBlockInfo->uid = pResult->info.uid;
  pDataBlockInfo->window = pResult->info.window;
}

3567 3568
// Report the info of the block produced by the most recent tsdbNextDataBlock() call.
// For an external-window reader the info comes from whichever reader pReader->step designates:
// innerReader[0] for EXTERNAL_ROWS_PREV, the reader itself for EXTERNAL_ROWS_MAIN, innerReader[1]
// for any other step.
void tsdbRetrieveDataBlockInfo(STsdbReader* pReader, SDataBlockInfo* pDataBlockInfo) {
  STsdbReader* pSource = pReader;

  if (pReader->type == TIMEWINDOW_RANGE_EXTERNAL && pReader->step != EXTERNAL_ROWS_MAIN) {
    pSource = (pReader->step == EXTERNAL_ROWS_PREV) ? pReader->innerReader[0] : pReader->innerReader[1];
  }

  setBlockInfo(pSource, pDataBlockInfo);
}

3581
// Load the pre-computed column aggregates (SMA) of the current file block.
//
// *pBlockStatis is set to NULL and TSDB_CODE_SUCCESS is returned when no statistics apply: for an
// external-window reader, for a composed data block, or when the block carries no SMA. Otherwise
// the aggregates are read, the timestamp-column aggregate is synthesized from the result block's
// time window, and *pBlockStatis points to the per-column aggregate list. *allHave ends up true
// only if SMA was loaded and no matched column has block SMA switched off.
// Returns the error code of tsdbReadBlockSma() on failure.
int32_t tsdbRetrieveDatablockSMA(STsdbReader* pReader, SColumnDataAgg*** pBlockStatis, bool* allHave) {
  int32_t code = 0;
  *allHave = false;

  // no statistics for external-window readers, nor for composed blocks
  if (pReader->type == TIMEWINDOW_RANGE_EXTERNAL || pReader->status.composedDataBlock) {
    *pBlockStatis = NULL;
    return TSDB_CODE_SUCCESS;
  }

  SFileDataBlockInfo* pFBlock = getCurrentBlockInfo(&pReader->status.blockIter);
  SBlock*             pBlock = getCurrentBlock(&pReader->status.blockIter);
  int64_t             startUs = taosGetTimestampUs();
  SBlockLoadSuppInfo* pSup = &pReader->suppInfo;

  if (tBlockHasSma(pBlock)) {
    code = tsdbReadBlockSma(pReader->pFileReader, pBlock, pSup->pColAgg);
    if (code != TSDB_CODE_SUCCESS) {
      tsdbDebug("vgId:%d, failed to load block SMA for uid %" PRIu64 ", code:%s, %s", 0, pFBlock->uid, tstrerror(code),
                pReader->idStr);
      return code;
    }
  } else {
    *pBlockStatis = NULL;
    return TSDB_CODE_SUCCESS;
  }

  *allHave = true;

  // the primary timestamp column is always reported; its range is the block's time window
  SColumnDataAgg* pTsAgg = &pSup->tsColAgg;
  pTsAgg->numOfNull = 0;
  pTsAgg->colId = PRIMARYKEY_TIMESTAMP_COL_ID;
  pTsAgg->min = pReader->pResBlock->info.window.skey;
  pTsAgg->max = pReader->pResBlock->info.window.ekey;
  pSup->plist[0] = pTsAgg;

  // walk the loaded aggregates (i) and the requested column ids (j) in parallel by column id
  size_t  numOfCols = blockDataGetNumOfCols(pReader->pResBlock);
  int32_t i = 0;
  int32_t j = 0;
  while (j < numOfCols && i < taosArrayGetSize(pSup->pColAgg)) {
    SColumnDataAgg* pAgg = taosArrayGet(pSup->pColAgg, i);

    if (pAgg->colId < pSup->colIds[j]) {
      i += 1;
    } else if (pSup->colIds[j] < pAgg->colId) {
      j += 1;
    } else {
      if (IS_BSMA_ON(&(pReader->pSchema->columns[i]))) {
        pSup->plist[j] = pAgg;
      } else {
        *allHave = false;
      }
      i += 1;
      j += 1;
    }
  }

  double elapsed = (taosGetTimestampUs() - startUs) / 1000.0;
  pReader->cost.smaLoadTime += elapsed;
  pReader->cost.smaDataLoad += 1;

  *pBlockStatis = pSup->plist;

  tsdbDebug("vgId:%d, succeed to load block SMA for uid %" PRIu64 ", elapsed time:%.2f ms, %s", 0, pFBlock->uid,
            elapsed, pReader->idStr);

  return code;
}

3659
// Return the column array of the reader's result block, loading the current file block first when
// the result is not a composed block.
//
// On failure terrno is set to the error code and NULL is returned; when loading the block data
// fails the file block data is destroyed as well.
static SArray* doRetrieveDataBlock(STsdbReader* pReader) {
  SReaderStatus* pStatus = &pReader->status;

  // a composed block is already sitting in the result block
  if (pStatus->composedDataBlock) {
    return pReader->pResBlock->pDataBlock;
  }

  SBlockData*          pFileData = &pStatus->fileBlockData;
  SFileDataBlockInfo*  pFBlock = getCurrentBlockInfo(&pStatus->blockIter);
  STableBlockScanInfo* pScanInfo = taosHashGet(pStatus->pTableMap, &pFBlock->uid, sizeof(pFBlock->uid));

  tBlockDataReset(pFileData);
  int32_t code = tBlockDataInit(pFileData, pReader->suid, pScanInfo->uid, pReader->pSchema);
  if (code == TSDB_CODE_SUCCESS) {
    code = doLoadFileBlockData(pReader, &pStatus->blockIter, pFileData);
    if (code != TSDB_CODE_SUCCESS) {
      tBlockDataDestroy(pFileData, 1);
    }
  }

  if (code != TSDB_CODE_SUCCESS) {
    terrno = code;
    return NULL;
  }

  copyBlockDataToSDataBlock(pReader, pScanInfo);
  return pReader->pResBlock->pDataBlock;
}

3687 3688 3689 3690 3691 3692 3693 3694 3695 3696 3697 3698
// Return the column data of the block produced by the most recent tsdbNextDataBlock() call.
// For an external-window reader the block is taken from innerReader[0] or innerReader[1] when
// pReader->step is EXTERNAL_ROWS_PREV or EXTERNAL_ROWS_NEXT; otherwise from the reader itself.
// pIdList is not used by this function.
SArray* tsdbRetrieveDataBlock(STsdbReader* pReader, SArray* pIdList) {
  STsdbReader* pActive = pReader;

  if (pReader->type == TIMEWINDOW_RANGE_EXTERNAL) {
    switch (pReader->step) {
      case EXTERNAL_ROWS_PREV:
        pActive = pReader->innerReader[0];
        break;
      case EXTERNAL_ROWS_NEXT:
        pActive = pReader->innerReader[1];
        break;
      default:
        break;
    }
  }

  return doRetrieveDataBlock(pActive);
}

H
Haojun Liao 已提交
3699
// Re-arm an existing reader for a new scan described by pCond (order and time window).
//
// The reader type is forced to TIMEWINDOW_RANGE_CONTAINED, the open file reader is closed, the
// fileset/block iterators and the per-table scan info are reset, and the reader is positioned on
// the first block in file (or switched to buffer-only mode when there are no files).
// Does nothing and returns TSDB_CODE_SUCCESS when the reader's current time window is empty.
// Returns the error code of initForFirstBlockInFile() on failure.
int32_t tsdbReaderReset(STsdbReader* pReader, SQueryTableDataCond* pCond) {
  if (isEmptyQueryTimeWindow(&pReader->window)) {
    return TSDB_CODE_SUCCESS;
  }

  pReader->order = pCond->order;
  pReader->type = TIMEWINDOW_RANGE_CONTAINED;
  pReader->status.loadFromFile = true;
  pReader->status.pTableIter = NULL;
  pReader->window = updateQueryTimeWindow(pReader->pTsdb, &pCond->twindows);

  // allocate buffer in order to load data blocks from file
  memset(&pReader->suppInfo.tsColAgg, 0, sizeof(SColumnDataAgg));
  memset(pReader->suppInfo.plist, 0, POINTER_BYTES);

  pReader->suppInfo.tsColAgg.colId = PRIMARYKEY_TIMESTAMP_COL_ID;

  // close the file reader once, and only if one is open (same guard as tsdbReaderClose)
  if (pReader->pFileReader != NULL) {
    tsdbDataFReaderClose(&pReader->pFileReader);
  }

  int32_t numOfTables = taosHashGetSize(pReader->status.pTableMap);

  initFilesetIterator(&pReader->status.fileIter, pReader->pReadSnap->fs.aDFileSet, pReader);
  resetDataBlockIterator(&pReader->status.blockIter, pReader->order, pReader->status.pTableMap);
  resetDataBlockScanInfo(pReader->status.pTableMap);

  int32_t         code = 0;
  SDataBlockIter* pBlockIter = &pReader->status.blockIter;

  // no data in files, let's try buffer in memory
  if (pReader->status.fileIter.numOfFiles == 0) {
    pReader->status.loadFromFile = false;
  } else {
    code = initForFirstBlockInFile(pReader, pBlockIter);
    if (code != TSDB_CODE_SUCCESS) {
      tsdbError("%p reset reader failed, numOfTables:%d, query range:%" PRId64 " - %" PRId64 " in query %s", pReader,
                numOfTables, pReader->window.skey, pReader->window.ekey, pReader->idStr);
      return code;
    }
  }

  tsdbDebug("%p reset reader, suid:%" PRIu64 ", numOfTables:%d, query range:%" PRId64 " - %" PRId64 " in query %s",
            pReader, pReader->suid, numOfTables, pReader->window.skey, pReader->window.ekey, pReader->idStr);

  return code;
}
H
Hongze Cheng 已提交
3744

3745 3746 3747
/**
 * Map a block's row count onto a zero-based histogram bucket.
 *
 * @param startRow    row count at which bucket 0 begins
 * @param bucketRange width of one bucket, in rows
 * @param numOfRows   row count of the block being classified
 * @return (numOfRows - startRow) / bucketRange, or 0 when numOfRows is below
 *         startRow or bucketRange is not positive. The upper bound is not
 *         limited here; the caller must clamp against its bucket count.
 */
static int32_t getBucketIndex(int32_t startRow, int32_t bucketRange, int32_t numOfRows) {
  // a block smaller than the lower bound would otherwise yield a negative index
  if (numOfRows < startRow) {
    return 0;
  }

  // a degenerate bucket width would otherwise divide by zero
  if (bucketRange <= 0) {
    return 0;
  }

  return (numOfRows - startRow) / bucketRange;
}
H
Hongze Cheng 已提交
3748

3749 3750 3751 3752
/**
 * Walk every data block reachable through the reader's block iterator and
 * accumulate block-distribution statistics into pTableBlockInfo.
 *
 * totalSize and totalRows are reset here; numOfFiles, numOfBlocks,
 * numOfSmallBlocks, maxRows, minRows and blockRowsHisto are only updated
 * relative to the values already stored, so the caller is expected to have
 * initialized them.
 *
 * @param pReader         reader whose file/block iterators are consumed
 * @param pTableBlockInfo in/out statistics accumulator
 * @return TSDB_CODE_SUCCESS, or the error returned by initForFirstBlockInFile
 */
int32_t tsdbGetFileBlocksDistInfo(STsdbReader* pReader, STableBlockDistInfo* pTableBlockInfo) {
  int32_t code = TSDB_CODE_SUCCESS;
  pTableBlockInfo->totalSize = 0;
  pTableBlockInfo->totalRows = 0;

  // find the start data block in file
  SReaderStatus* pStatus = &pReader->status;

  STsdbCfg* pc = &pReader->pTsdb->pVnode->config.tsdbCfg;
  pTableBlockInfo->defMinRows = pc->minRows;
  pTableBlockInfo->defMaxRows = pc->maxRows;

  // NOTE(review): assumes blockRowsHisto has exactly 20 slots, matching the
  // divisor used for the bucket width -- confirm against STableBlockDistInfo.
  const int32_t numOfBuckets = 20;

  int32_t bucketRange = ceil((pc->maxRows - pc->minRows) / (double)numOfBuckets);
  if (bucketRange < 1) {
    // maxRows <= minRows would give a zero/negative width and a division by zero below
    bucketRange = 1;
  }

  // NOTE(review): numOfFiles is bumped by 1 here and again by fileIter.numOfFiles
  // below -- confirm the extra 1 is intended.
  pTableBlockInfo->numOfFiles += 1;

  int32_t numOfTables = (int32_t)taosHashGetSize(pStatus->pTableMap);
  int     defaultRows = 4096;

  SDataBlockIter* pBlockIter = &pStatus->blockIter;
  pTableBlockInfo->numOfFiles += pStatus->fileIter.numOfFiles;

  if (pBlockIter->numOfBlocks > 0) {
    pTableBlockInfo->numOfBlocks += pBlockIter->numOfBlocks;
  }

  pTableBlockInfo->numOfTables = numOfTables;
  bool hasNext = (pBlockIter->numOfBlocks > 0);

  while (true) {
    if (hasNext) {
      SBlock* pBlock = getCurrentBlock(pBlockIter);

      int32_t numOfRows = pBlock->nRow;
      pTableBlockInfo->totalRows += numOfRows;

      if (numOfRows > pTableBlockInfo->maxRows) {
        pTableBlockInfo->maxRows = numOfRows;
      }

      if (numOfRows < pTableBlockInfo->minRows) {
        pTableBlockInfo->minRows = numOfRows;
      }

      if (numOfRows < defaultRows) {
        pTableBlockInfo->numOfSmallBlocks += 1;
      }

      int32_t bucketIndex = getBucketIndex(pTableBlockInfo->defMinRows, bucketRange, numOfRows);

      // keep the histogram write in bounds: blocks below defMinRows map to the
      // first bucket, blocks at/above defMaxRows map to the last one
      if (bucketIndex < 0) {
        bucketIndex = 0;
      } else if (bucketIndex >= numOfBuckets) {
        bucketIndex = numOfBuckets - 1;
      }
      pTableBlockInfo->blockRowsHisto[bucketIndex]++;

      hasNext = blockIteratorNext(&pStatus->blockIter);
    } else {
      // current file exhausted: move on to the next file set, stop on error or
      // when the reader reports there is nothing left to load from files
      code = initForFirstBlockInFile(pReader, pBlockIter);
      if ((code != TSDB_CODE_SUCCESS) || (pReader->status.loadFromFile == false)) {
        break;
      }

      pTableBlockInfo->numOfBlocks += pBlockIter->numOfBlocks;
      hasNext = (pBlockIter->numOfBlocks > 0);
    }
  }

  return code;
}
H
Hongze Cheng 已提交
3817

H
refact  
Hongze Cheng 已提交
3818
/**
 * Sum the number of rows held in the snapshot's mem and imem tables for every
 * table registered in the reader's table map.
 *
 * The reader's pTableIter cursor is used for the iteration and is left NULL
 * when the function returns.
 *
 * @param pReader reader holding the table map and the read snapshot
 * @return total row count reported by tsdbGetNRowsInTbData over all tables
 */
int64_t tsdbGetNumOfRowsInMemTable(STsdbReader* pReader) {
  int64_t rows = 0;

  SReaderStatus* pStatus = &pReader->status;
  pStatus->pTableIter = taosHashIterate(pStatus->pTableMap, NULL);

  while (pStatus->pTableIter != NULL) {
    STableBlockScanInfo* pBlockScanInfo = pStatus->pTableIter;

    // Test the snapshot pointer that is actually dereferenced, not the live
    // pTsdb->mem, which may differ from what was captured in the snapshot.
    STbData* d = NULL;
    if (pReader->pReadSnap->pMem != NULL) {
      d = tsdbGetTbDataFromMemTable(pReader->pReadSnap->pMem, pReader->suid, pBlockScanInfo->uid);
      if (d != NULL) {
        rows += tsdbGetNRowsInTbData(d);
      }
    }

    STbData* di = NULL;
    if (pReader->pReadSnap->pIMem != NULL) {
      di = tsdbGetTbDataFromMemTable(pReader->pReadSnap->pIMem, pReader->suid, pBlockScanInfo->uid);
      if (di != NULL) {
        rows += tsdbGetNRowsInTbData(di);
      }
    }

    // current table is exhausted, let's try the next table
    pStatus->pTableIter = taosHashIterate(pStatus->pTableMap, pStatus->pTableIter);
  }

  return rows;
}
D
dapan1121 已提交
3849

L
Liu Jicong 已提交
3850
/**
 * Look up the schema of the table identified by uid.
 *
 * The table entry is read through a meta reader. For a child table the super
 * table entry (keyed by ctbEntry.suid) is read as well and its schema version
 * is used; otherwise the entry is asserted to be a normal table and its own
 * schema version is used. The schema itself is then fetched with
 * metaGetTbTSchema using the original uid.
 *
 * @param pVnode  vnode whose meta store is queried
 * @param uid     table uid to look up
 * @param pSchema out: result of metaGetTbTSchema (not checked here)
 * @param suid    out: super table uid for a child table, 0 otherwise; left
 *                untouched when the first lookup fails
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_TDB_INVALID_TABLE_ID (also stored in
 *         terrno) when either meta lookup fails
 */
int32_t tsdbGetTableSchema(SVnode* pVnode, int64_t uid, STSchema** pSchema, int64_t* suid) {
  SMetaReader mr = {0};
  int32_t     sversion = 1;

  metaReaderInit(&mr, pVnode->pMeta, 0);

  if (metaGetTableEntryByUid(&mr, uid) != TSDB_CODE_SUCCESS) {
    terrno = TSDB_CODE_TDB_INVALID_TABLE_ID;
    metaReaderClear(&mr);
    return terrno;
  }

  *suid = 0;

  if (mr.me.type != TSDB_CHILD_TABLE) {
    ASSERT(mr.me.type == TSDB_NORMAL_TABLE);
    sversion = mr.me.ntbEntry.schemaRow.version;
  } else {
    // release the decoder before reusing the reader for the super table entry
    tDecoderClear(&mr.coder);
    *suid = mr.me.ctbEntry.suid;

    if (metaGetTableEntryByUid(&mr, *suid) != TSDB_CODE_SUCCESS) {
      terrno = TSDB_CODE_TDB_INVALID_TABLE_ID;
      metaReaderClear(&mr);
      return terrno;
    }

    sversion = mr.me.stbEntry.schemaRow.version;
  }

  metaReaderClear(&mr);

  *pSchema = metaGetTbTSchema(pVnode->pMeta, uid, sversion);
  return TSDB_CODE_SUCCESS;
}
H
Hongze Cheng 已提交
3884 3885 3886 3887 3888 3889 3890 3891 3892 3893 3894 3895 3896 3897 3898 3899 3900 3901 3902 3903 3904 3905 3906 3907 3908 3909 3910 3911 3912 3913

/**
 * Capture a read snapshot of the tsdb: the current mem and imem tables (each
 * referenced when non-NULL) and a reference on the file system state, all
 * taken while holding pTsdb->rwLock for reading.
 *
 * On success *ppSnap receives the newly allocated snapshot, which must be
 * released with tsdbUntakeReadSnap. On any failure every reference acquired so
 * far is dropped, the allocation is freed and *ppSnap is set to NULL.
 *
 * @param pTsdb  tsdb instance to snapshot
 * @param ppSnap out: the snapshot, or NULL on failure
 * @return 0 on success; TSDB_CODE_OUT_OF_MEMORY, a TAOS_SYSTEM_ERROR-wrapped
 *         lock error, or the code returned by tsdbFSRef otherwise
 */
int32_t tsdbTakeReadSnap(STsdb* pTsdb, STsdbReadSnap** ppSnap) {
  int32_t        code = 0;
  bool           fsRefed = false;
  STsdbReadSnap* pSnap = NULL;

  // never expose a half-built snapshot to the caller
  *ppSnap = NULL;

  // alloc
  pSnap = (STsdbReadSnap*)taosMemoryCalloc(1, sizeof(STsdbReadSnap));
  if (pSnap == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _exit;
  }

  // lock
  code = taosThreadRwlockRdlock(&pTsdb->rwLock);
  if (code) {
    code = TAOS_SYSTEM_ERROR(code);
    goto _err;
  }

  // take snapshot
  pSnap->pMem = pTsdb->mem;
  pSnap->pIMem = pTsdb->imem;

  if (pSnap->pMem) {
    tsdbRefMemTable(pSnap->pMem);
  }

  if (pSnap->pIMem) {
    tsdbRefMemTable(pSnap->pIMem);
  }

  // fs
  code = tsdbFSRef(pTsdb, &pSnap->fs);
  if (code) {
    taosThreadRwlockUnlock(&pTsdb->rwLock);
    goto _err;
  }
  fsRefed = true;

  // unlock
  code = taosThreadRwlockUnlock(&pTsdb->rwLock);
  if (code) {
    code = TAOS_SYSTEM_ERROR(code);
    goto _err;
  }

  *ppSnap = pSnap;
  tsdbTrace("vgId:%d, take read snapshot", TD_VID(pTsdb->pVnode));
  goto _exit;

_err:
  // undo whatever was acquired; pMem/pIMem are NULL (calloc) if the lock was never taken
  if (pSnap->pMem) {
    tsdbUnrefMemTable(pSnap->pMem);
  }

  if (pSnap->pIMem) {
    tsdbUnrefMemTable(pSnap->pIMem);
  }

  if (fsRefed) {
    tsdbFSUnref(pTsdb, &pSnap->fs);
  }

  taosMemoryFree(pSnap);

_exit:
  return code;
}

/**
 * Release a read snapshot: drop the references held on its mem and imem
 * tables (when present), drop the file system reference and free the
 * snapshot itself. A NULL pSnap releases nothing; the trace line is emitted
 * in either case.
 *
 * @param pTsdb tsdb instance the snapshot belongs to
 * @param pSnap snapshot to release, may be NULL
 */
void tsdbUntakeReadSnap(STsdb* pTsdb, STsdbReadSnap* pSnap) {
  if (pSnap != NULL) {
    // memtable references
    if (pSnap->pMem != NULL) {
      tsdbUnrefMemTable(pSnap->pMem);
    }
    if (pSnap->pIMem != NULL) {
      tsdbUnrefMemTable(pSnap->pIMem);
    }

    // file system reference, then the snapshot object itself
    tsdbFSUnref(pTsdb, &pSnap->fs);
    taosMemoryFree(pSnap);
  }

  tsdbTrace("vgId:%d, untake read snapshot", TD_VID(pTsdb->pVnode));
}