tsdbRead.c 127.9 KB
Newer Older
H
hjxilinx 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

H
Haojun Liao 已提交
16
#include "osDef.h"
H
Hongze Cheng 已提交
17
#include "tsdb.h"
18

H
Hongze Cheng 已提交
19
// True when traverse order `o` is ascending. The argument is parenthesized so that compound
// expressions (e.g. `flags & mask`) are compared as a whole rather than being split by precedence.
#define ASCENDING_TRAVERSE(o) ((o) == TSDB_ORDER_ASC)
H
Hongze Cheng 已提交
20

21 22 23 24 25 26
// Tags for groups of rows (previous / main / next).
// NOTE(review): NEXT (3) equals PREV | MAIN, so these values cannot be combined as bit flags.
typedef enum {
  EXTERNAL_ROWS_PREV = 1,
  EXTERNAL_ROWS_MAIN = 2,
  EXTERNAL_ROWS_NEXT = 3,
} EContentData;

27
typedef struct {
dengyihao's avatar
dengyihao 已提交
28
  STbDataIter* iter;
29 30 31 32
  int32_t      index;
  bool         hasVal;
} SIterInfo;

33 34
// Block counts gathered while loading the block info of one data file set.
typedef struct {
  int32_t numOfBlocks;     // data blocks passing the time/version range checks
  int32_t numOfLastFiles;  // number of stt ("last") files of the file set
} SBlockNumber;

H
Haojun Liao 已提交
38
typedef struct STableBlockScanInfo {
dengyihao's avatar
dengyihao 已提交
39 40
  uint64_t  uid;
  TSKEY     lastKey;
H
Hongze Cheng 已提交
41 42 43 44 45 46 47 48
  SMapData  mapData;            // block info (compressed)
  SArray*   pBlockList;         // block data index list
  SIterInfo iter;               // mem buffer skip list iterator
  SIterInfo iiter;              // imem buffer skip list iterator
  SArray*   delSkyline;         // delete info for this table
  int32_t   fileDelIndex;       // file block delete index
  int32_t   lastBlockDelIndex;  // delete index for last block
  bool      iterInit;           // whether to initialize the in-memory skip list iterator or not
H
Haojun Liao 已提交
49 50 51
} STableBlockScanInfo;

// One entry used when ordering file blocks across tables.
typedef struct SBlockOrderWrapper {
  int64_t uid;     // owning table
  int64_t offset;
} SBlockOrderWrapper;
H
Hongze Cheng 已提交
55 56

typedef struct SBlockOrderSupporter {
57 58 59 60
  SBlockOrderWrapper** pDataBlockInfo;
  int32_t*             indexPerTable;
  int32_t*             numOfBlocksPerTable;
  int32_t              numOfTables;
H
Hongze Cheng 已提交
61 62 63
} SBlockOrderSupporter;

// I/O counters and elapsed times (milliseconds) accumulated by one reader.
typedef struct SIOCostSummary {
  // data blocks
  int64_t numOfBlocks;
  double  blockLoadTime;
  double  buildmemBlock;
  // head files (block index / block info)
  int64_t headFileLoad;
  double  headFileLoadTime;
  // SMA
  int64_t smaDataLoad;
  double  smaLoadTime;
  // last (stt) blocks
  int64_t lastBlockLoad;
  double  lastBlockLoadTime;
  // composed (merged) result blocks
  int64_t composedBlocks;
  double  buildComposedBlockTime;
} SIOCostSummary;

typedef struct SBlockLoadSuppInfo {
78
  SArray*          pColAgg;
79
  SColumnDataAgg   tsColAgg;
C
Cary Xu 已提交
80
  SColumnDataAgg** plist;
81 82
  int16_t*         colIds;    // column ids for loading file block data
  char**           buildBuf;  // build string tmp buffer, todo remove it later after all string format being updated.
H
Hongze Cheng 已提交
83 84
} SBlockLoadSuppInfo;

85 86 87
typedef struct SLastBlockReader {
  STimeWindow   window;
  SVersionRange verRange;
88
  int32_t       order;
89
  uint64_t      uid;
90
  SMergeTree    mergeTree;
91
  SSttBlockLoadInfo* pInfo;
92 93
} SLastBlockReader;

94
typedef struct SFilesetIter {
H
Hongze Cheng 已提交
95 96 97
  int32_t           numOfFiles;  // number of total files
  int32_t           index;       // current accessed index in the list
  SArray*           pFileList;   // data file list
98
  int32_t           order;
H
Hongze Cheng 已提交
99
  SLastBlockReader* pLastBlockReader;  // last file block reader
100
} SFilesetIter;
H
Haojun Liao 已提交
101 102

// Position of one file block in the scan: owning table plus its index in that table's block list,
// so that neighbouring blocks can be checked for overlap.
typedef struct SFileDataBlockInfo {
  uint64_t uid;
  int32_t  tbBlockIdx;  // index position in STableBlockScanInfo
} SFileDataBlockInfo;

typedef struct SDataBlockIter {
109
  int32_t   numOfBlocks;
110
  int32_t   index;
H
Hongze Cheng 已提交
111
  SArray*   blockList;  // SArray<SFileDataBlockInfo>
112
  int32_t   order;
H
Hongze Cheng 已提交
113
  SDataBlk  block;  // current SDataBlk data
114
  SHashObj* pTableMap;
H
Haojun Liao 已提交
115 116 117
} SDataBlockIter;

// Progress of copying rows out of the currently loaded file block.
typedef struct SFileBlockDumpInfo {
  int32_t totalRows;
  int32_t rowIndex;   // next row to copy, in traverse order
  int64_t lastKey;    // set one step past the block boundary key once the block is dumped
  bool    allDumped;  // true once no row of the block is pending
} SFileBlockDumpInfo;

124
// Cursor for visiting all queried tables in ascending uid order.
typedef struct SUidOrderCheckInfo {
  uint64_t* tableUidList;  // table uids sorted ascending
  int32_t   currentIndex;  // index in table uid list
} SUidOrderCheckInfo;

H
Haojun Liao 已提交
129
typedef struct SReaderStatus {
130
  bool                 loadFromFile;       // check file stage
131
  bool                 composedDataBlock;  // the returned data block is a composed block or not
132 133
  SHashObj*            pTableMap;          // SHash<STableBlockScanInfo>
  STableBlockScanInfo* pTableIter;         // table iterator used in building in-memory buffer data blocks.
134
  SUidOrderCheckInfo   uidCheckInfo;       // check all table in uid order
135
  SFileBlockDumpInfo   fBlockDumpInfo;
136 137 138 139
  SDFileSet*           pCurrentFileset;  // current opened file set
  SBlockData           fileBlockData;
  SFilesetIter         fileIter;
  SDataBlockIter       blockIter;
H
Haojun Liao 已提交
140 141
} SReaderStatus;

H
Hongze Cheng 已提交
142
struct STsdbReader {
H
Haojun Liao 已提交
143 144 145 146 147 148 149
  STsdb*             pTsdb;
  uint64_t           suid;
  int16_t            order;
  STimeWindow        window;  // the primary query time window that applies to all queries
  SSDataBlock*       pResBlock;
  int32_t            capacity;
  SReaderStatus      status;
150 151
  char*              idStr;  // query info handle, for debug purpose
  int32_t            type;   // query type: 1. retrieve all data blocks, 2. retrieve direct prev|next rows
H
Hongze Cheng 已提交
152
  SBlockLoadSuppInfo suppInfo;
H
Hongze Cheng 已提交
153
  STsdbReadSnap*     pReadSnap;
154
  SIOCostSummary     cost;
155 156
  STSchema*          pSchema;     // the newest version schema
  STSchema*          pMemSchema;  // the previous schema for in-memory data, to avoid load schema too many times
157 158
  SDataFReader*      pFileReader;
  SVersionRange      verRange;
159

160 161
  int32_t      step;
  STsdbReader* innerReader[2];
H
Hongze Cheng 已提交
162
};
H
Hongze Cheng 已提交
163

H
Haojun Liao 已提交
164
static SFileDataBlockInfo* getCurrentBlockInfo(SDataBlockIter* pBlockIter);
165 166
static int      buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, int64_t endKey, int32_t capacity,
                                          STsdbReader* pReader);
167
static TSDBROW* getValidMemRow(SIterInfo* pIter, const SArray* pDelList, STsdbReader* pReader);
168 169
static int32_t  doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pScanInfo, STsdbReader* pReader,
                                        SRowMerger* pMerger);
H
Hongze Cheng 已提交
170 171
static int32_t  doMergeRowsInLastBlock(SLastBlockReader* pLastBlockReader, STableBlockScanInfo* pScanInfo, int64_t ts,
                                       SRowMerger* pMerger);
172
static int32_t  doMergeRowsInBuf(SIterInfo* pIter, uint64_t uid, int64_t ts, SArray* pDelList, SRowMerger* pMerger,
dengyihao's avatar
dengyihao 已提交
173
                                 STsdbReader* pReader);
H
Haojun Liao 已提交
174
static int32_t  doAppendRowFromTSRow(SSDataBlock* pBlock, STsdbReader* pReader, STSRow* pTSRow, uint64_t uid);
175
static int32_t  doAppendRowFromFileBlock(SSDataBlock* pResBlock, STsdbReader* pReader, SBlockData* pBlockData,
H
Hongze Cheng 已提交
176
                                         int32_t rowIndex);
177
static void     setComposedBlockFlag(STsdbReader* pReader, bool composed);
178
static bool     hasBeenDropped(const SArray* pDelList, int32_t* index, TSDBKEY* pKey, int32_t order);
179

H
Hongze Cheng 已提交
180 181 182 183
static int32_t doMergeMemTableMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter, SArray* pDelList,
                                        STSRow** pTSRow, STsdbReader* pReader, bool* freeTSRow);
static int32_t doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo* pBlockScanInfo,
                                  STsdbReader* pReader, STSRow** pTSRow);
184 185
static int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pBlockScanInfo, int64_t key,
                                     STsdbReader* pReader);
186

dengyihao's avatar
dengyihao 已提交
187 188 189 190
static int32_t initDelSkylineIterator(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader, STbData* pMemTbData,
                                      STbData* piMemTbData);
static STsdb*  getTsdbByRetentions(SVnode* pVnode, TSKEY winSKey, SRetention* retentions, const char* idstr,
                                   int8_t* pLevel);
191
static SVersionRange getQueryVerRange(SVnode* pVnode, SQueryTableDataCond* pCond, int8_t level);
H
Hongze Cheng 已提交
192 193 194
static int64_t       getCurrentKeyInLastBlock(SLastBlockReader* pLastBlockReader);
static bool          hasDataInLastBlock(SLastBlockReader* pLastBlockReader);
static int32_t       doBuildDataBlock(STsdbReader* pReader);
H
Haojun Liao 已提交
195

196 197 198
/*
 * Prepare the per-output-column load helpers in pReader->suppInfo for result block pBlock:
 *  - colIds[i]   : column id of the i-th column of pBlock;
 *  - buildBuf[i] : scratch buffer of pCol->info.bytes bytes for variable-length columns, NULL otherwise.
 *
 * On failure every buffer allocated here is released and the suppInfo pointers are reset to NULL,
 * so a later cleanup of the reader cannot free them a second time.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY when an allocation fails.
 */
static int32_t setColumnIdSlotList(STsdbReader* pReader, SSDataBlock* pBlock) {
  SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo;

  size_t numOfCols = blockDataGetNumOfCols(pBlock);

  pSupInfo->colIds = taosMemoryMalloc(numOfCols * sizeof(int16_t));
  pSupInfo->buildBuf = taosMemoryCalloc(numOfCols, POINTER_BYTES);
  if (pSupInfo->buildBuf == NULL || pSupInfo->colIds == NULL) {
    // free and NULL both pointers: leaving them dangling would allow a double free on reader close
    taosMemoryFreeClear(pSupInfo->colIds);
    taosMemoryFreeClear(pSupInfo->buildBuf);
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  for (size_t i = 0; i < numOfCols; ++i) {
    SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, i);
    pSupInfo->colIds[i] = pCol->info.colId;

    if (IS_VAR_DATA_TYPE(pCol->info.type)) {
      pSupInfo->buildBuf[i] = taosMemoryMalloc(pCol->info.bytes);
      if (pSupInfo->buildBuf[i] == NULL) {
        // release the scratch buffers created so far (entries of fixed-length columns are NULL)
        for (size_t k = 0; k < i; ++k) {
          taosMemoryFreeClear(pSupInfo->buildBuf[k]);
        }
        taosMemoryFreeClear(pSupInfo->buildBuf);
        taosMemoryFreeClear(pSupInfo->colIds);
        return TSDB_CODE_OUT_OF_MEMORY;
      }
    }
  }

  return TSDB_CODE_SUCCESS;
}
H
Hongze Cheng 已提交
220

221
/*
 * Build the per-table scan-info map (uid -> STableBlockScanInfo) for the numOfTables tables in idList.
 * Each table's lastKey starts one tick outside the query window, on the side the scan begins from:
 * skey - 1 for ascending order and ekey + 1 for descending order. No step is taken when that would
 * overflow.
 *
 * Returns the new hash map, or NULL if it cannot be created.
 */
static SHashObj* createDataBlockScanInfo(STsdbReader* pTsdbReader, const STableKeyInfo* idList, int32_t numOfTables) {
  // todo use simple hash instead, optimize the memory consumption
  SHashObj* pMap = taosHashInit(numOfTables, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
  if (pMap == NULL) {
    return NULL;
  }

  const bool asc = ASCENDING_TRAVERSE(pTsdbReader->order);
  for (int32_t idx = 0; idx < numOfTables; ++idx) {
    STableBlockScanInfo scanInfo = {.lastKey = 0, .uid = idList[idx].uid};

    if (asc) {
      const int64_t startKey = pTsdbReader->window.skey;
      scanInfo.lastKey = (startKey == INT64_MIN) ? startKey : startKey - 1;
    } else {
      const int64_t endKey = pTsdbReader->window.ekey;
      scanInfo.lastKey = (endKey == INT64_MAX) ? endKey : endKey + 1;
    }

    taosHashPut(pMap, &scanInfo.uid, sizeof(uint64_t), &scanInfo, sizeof(scanInfo));
    tsdbDebug("%p check table uid:%" PRId64 " from lastKey:%" PRId64 " %s", pTsdbReader, scanInfo.uid,
              scanInfo.lastKey, pTsdbReader->idStr);
  }

  tsdbDebug("%p create %d tables scan-info, size:%.2f Kb, %s", pTsdbReader, numOfTables,
            (sizeof(STableBlockScanInfo) * numOfTables) / 1024.0, pTsdbReader->idStr);
  return pMap;
}
H
Hongze Cheng 已提交
250

H
Haojun Liao 已提交
251
/*
 * Reset the scan state of every table in pTableMap so that a new scan can start from key `ts`.
 *  - Both in-memory iterators (mem and imem) are destroyed and flagged as having no value.
 *  - iterInit is cleared.
 *  - The delete skyline is released.
 *  - lastKey is set to `ts`.
 */
static void resetDataBlockScanInfo(SHashObj* pTableMap, int64_t ts) {
  STableBlockScanInfo* p = NULL;

  while ((p = taosHashIterate(pTableMap, p)) != NULL) {
    p->iterInit = false;
    p->iter.hasVal = false;
    p->iiter.hasVal = false;

    if (p->iter.iter != NULL) {
      p->iter.iter = tsdbTbDataIterDestroy(p->iter.iter);
    }

    // The imem iterator must be released too, matching destroyBlockScanInfo(). Otherwise it leaks
    // once the iterators are set up again (presumably when iterInit is false).
    if (p->iiter.iter != NULL) {
      p->iiter.iter = tsdbTbDataIterDestroy(p->iiter.iter);
    }

    p->delSkyline = taosArrayDestroy(p->delSkyline);
    p->lastKey = ts;
  }
}

266 267 268 269 270 271 272 273
/*
 * Release everything owned by each STableBlockScanInfo in pTableMap, then free the map itself.
 * The owned resources are the mem/imem iterators, the delete skyline, the block list and the block
 * info map data.
 */
static void destroyBlockScanInfo(SHashObj* pTableMap) {
  for (STableBlockScanInfo* pInfo = taosHashIterate(pTableMap, NULL); pInfo != NULL;
       pInfo = taosHashIterate(pTableMap, pInfo)) {
    pInfo->iterInit = false;
    pInfo->iiter.hasVal = false;

    if (pInfo->iter.iter != NULL) {
      pInfo->iter.iter = tsdbTbDataIterDestroy(pInfo->iter.iter);
    }
    if (pInfo->iiter.iter != NULL) {
      pInfo->iiter.iter = tsdbTbDataIterDestroy(pInfo->iiter.iter);
    }

    pInfo->delSkyline = taosArrayDestroy(pInfo->delSkyline);
    pInfo->pBlockList = taosArrayDestroy(pInfo->pBlockList);
    tMapDataClear(&pInfo->mapData);
  }

  taosHashCleanup(pTableMap);
}

289
/* A query window is empty when its start key lies after its end key. */
static bool isEmptyQueryTimeWindow(STimeWindow* pWindow) {
  ASSERT(pWindow != NULL);
  return !(pWindow->skey <= pWindow->ekey);
}
H
Hongze Cheng 已提交
293

294 295 296
/*
 * Return a copy of *pWindow whose start key is raised to the oldest timestamp still inside the keep2
 * retention range of pTsdb. This avoids returning expired data to the client even if it is still
 * physically present. keep2 is multiplied by ticks-per-minute, so it is presumably expressed in minutes.
 */
static STimeWindow updateQueryTimeWindow(STsdb* pTsdb, STimeWindow* pWindow) {
  STsdbKeepCfg* pKeep = &pTsdb->keepCfg;

  int64_t now = taosGetTimestamp(pKeep->precision);
  int64_t earliestTs = now - (tsTickPerMin[pKeep->precision] * pKeep->keep2) + 1;  // needs to add one tick

  STimeWindow clamped = *pWindow;
  if (earliestTs > clamped.skey) {
    clamped.skey = earliestTs;
  }
  return clamped;
}
H
Hongze Cheng 已提交
309

H
Haojun Liao 已提交
310
/*
 * Shrink *capacity (rows per result block) so that one result block, sized by the sum of the
 * queried columns' byte widths, stays within 2MB. *capacity is left unchanged when it already fits.
 *
 * The row length and the capacity * rowLen product are computed in 64 bits so that wide rows or a
 * large capacity cannot overflow a signed 32-bit integer (undefined behaviour).
 */
static void limitOutputBufferSize(const SQueryTableDataCond* pCond, int32_t* capacity) {
  int64_t rowLen = 0;
  for (int32_t i = 0; i < pCond->numOfCols; ++i) {
    rowLen += pCond->colList[i].bytes;
  }

  // make sure the output SSDataBlock size be less than 2MB.
  const int64_t TWOMB = 2 * 1024 * 1024;
  // the product exceeding TWOMB (> 0) implies rowLen != 0, so the division below is safe
  if ((int64_t)(*capacity) * rowLen > TWOMB) {
    (*capacity) = (int32_t)(TWOMB / rowLen);
  }
}

// init file iterator
324
static int32_t initFilesetIterator(SFilesetIter* pIter, SArray* aDFileSet, STsdbReader* pReader) {
H
Hongze Cheng 已提交
325
  size_t numOfFileset = taosArrayGetSize(aDFileSet);
326

327 328
  pIter->index = ASCENDING_TRAVERSE(pReader->order) ? -1 : numOfFileset;
  pIter->order = pReader->order;
H
Hongze Cheng 已提交
329
  pIter->pFileList = aDFileSet;
330
  pIter->numOfFiles = numOfFileset;
H
Haojun Liao 已提交
331

332 333 334 335
  if (pIter->pLastBlockReader == NULL) {
    pIter->pLastBlockReader = taosMemoryCalloc(1, sizeof(struct SLastBlockReader));
    if (pIter->pLastBlockReader == NULL) {
      int32_t code = TSDB_CODE_OUT_OF_MEMORY;
336
      tsdbError("failed to prepare the last block iterator, code:%d %s", tstrerror(code), pReader->idStr);
337 338
      return code;
    }
339 340
  }

341 342 343 344 345 346 347 348
  SLastBlockReader* pLReader = pIter->pLastBlockReader;
  pLReader->order = pReader->order;
  pLReader->window = pReader->window;
  pLReader->verRange = pReader->verRange;

  pLReader->uid = 0;
  tMergeTreeClose(&pLReader->mergeTree);

349
  if (pLReader->pInfo == NULL) {
H
Haojun Liao 已提交
350 351 352 353 354
    pLReader->pInfo = tCreateLastBlockLoadInfo();
    if (pLReader->pInfo == NULL) {
      tsdbDebug("init fileset iterator failed, code:%s %s", tstrerror(terrno), pReader->idStr);
      return terrno;
    }
355 356
  }

357
  tsdbDebug("init fileset iterator, total files:%d %s", pIter->numOfFiles, pReader->idStr);
H
Haojun Liao 已提交
358 359 360
  return TSDB_CODE_SUCCESS;
}

361
/*
 * Advance pIter to the next data file set, in the iterator's order, that overlaps the query time
 * window. On success a data file reader is opened for it (pReader->pFileReader,
 * pReader->status.pCurrentFileset).
 *
 * Before moving on:
 *  - the last-block load statistics of the previous file set are added to pReader->cost;
 *  - the last (stt) block reader is reset.
 *
 * While searching:
 *  - file sets lying entirely before the window (in traverse order) are skipped;
 *  - the first file set lying entirely beyond the window ends the iteration.
 *
 * Returns true when a qualifying file set was opened. Returns false when no file set is left, or when
 * the data file reader could not be opened; in that case the error is logged and stored in terrno.
 */
static bool filesetIteratorNext(SFilesetIter* pIter, STsdbReader* pReader) {
  bool    asc = ASCENDING_TRAVERSE(pIter->order);
  int32_t step = asc ? 1 : -1;
  pIter->index += step;

  if ((asc && pIter->index >= pIter->numOfFiles) || ((!asc) && pIter->index < 0)) {
    return false;
  }

  SIOCostSummary* pSum = &pReader->cost;
  getLastBlockLoadInfo(pIter->pLastBlockReader->pInfo, &pSum->lastBlockLoad, &pSum->lastBlockLoadTime);

  pIter->pLastBlockReader->uid = 0;
  tMergeTreeClose(&pIter->pLastBlockReader->mergeTree);
  resetLastBlockLoadInfo(pIter->pLastBlockReader->pInfo);

  // time range covered by the candidate file set
  STimeWindow win = {0};

  while (1) {
    if (pReader->pFileReader != NULL) {
      tsdbDataFReaderClose(&pReader->pFileReader);
    }

    pReader->status.pCurrentFileset = (SDFileSet*)taosArrayGet(pIter->pFileList, pIter->index);

    int32_t code = tsdbDataFReaderOpen(&pReader->pFileReader, pReader->pTsdb, pReader->status.pCurrentFileset);
    if (code != TSDB_CODE_SUCCESS) {
      // the bool result cannot carry the error code, so report it instead of dropping it silently
      terrno = code;
      tsdbError("%p failed to open data file reader, file index:%d, code:%s %s", pReader, pIter->index,
                tstrerror(code), pReader->idStr);
      return false;
    }

    pReader->cost.headFileLoad += 1;

    int32_t fid = pReader->status.pCurrentFileset->fid;
    tsdbFidKeyRange(fid, pReader->pTsdb->keepCfg.days, pReader->pTsdb->keepCfg.precision, &win.skey, &win.ekey);

    // current file are no longer overlapped with query time window, ignore remain files
    if ((asc && win.skey > pReader->window.ekey) || (!asc && win.ekey < pReader->window.skey)) {
      tsdbDebug("%p remain files are not qualified for qrange:%" PRId64 "-%" PRId64 ", ignore, %s", pReader,
                pReader->window.skey, pReader->window.ekey, pReader->idStr);
      return false;
    }

    // the file set lies before the query window in traverse order: skip to the next one
    if ((asc && (win.ekey < pReader->window.skey)) || ((!asc) && (win.skey > pReader->window.ekey))) {
      pIter->index += step;
      if ((asc && pIter->index >= pIter->numOfFiles) || ((!asc) && pIter->index < 0)) {
        return false;
      }
      continue;
    }

    tsdbDebug("%p file found fid:%d for qrange:%" PRId64 "-%" PRId64 ", %s", pReader, fid, pReader->window.skey,
              pReader->window.ekey, pReader->idStr);
    return true;
  }
}

421
/*
 * Rewind the block iterator to "before the first block" for traverse order `order`.
 * The existing block list is reused (emptied) when present, otherwise a new one is allocated.
 */
static void resetDataBlockIterator(SDataBlockIter* pIter, int32_t order) {
  pIter->order = order;
  pIter->index = -1;
  pIter->numOfBlocks = 0;

  if (pIter->blockList != NULL) {
    taosArrayClear(pIter->blockList);
  } else {
    pIter->blockList = taosArrayInit(4, sizeof(SFileDataBlockInfo));
  }
}

L
Liu Jicong 已提交
432
/*
 * Free the block list of the iterator. The pointer is reset (taosArrayDestroy returns NULL) so that
 * resetDataBlockIterator(), which reuses a non-NULL list, never touches freed memory afterwards.
 */
static void cleanupDataBlockIterator(SDataBlockIter* pIter) { pIter->blockList = taosArrayDestroy(pIter->blockList); }
H
Haojun Liao 已提交
433

H
Haojun Liao 已提交
434
/* Put a freshly allocated reader status into its starting state: file stage first, no table iterator yet. */
static void initReaderStatus(SReaderStatus* pStatus) {
  pStatus->loadFromFile = true;
  pStatus->pTableIter = NULL;
}

439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461
/*
 * Create the result block: one column per queried column in pCond, with room for `capacity` rows.
 *
 * Returns the block, or NULL with terrno set on failure. On failure the partially built block is
 * released with blockDataDestroy(), which also frees its column array; a plain taosMemoryFree of
 * the block struct would leak it.
 */
static SSDataBlock* createResBlock(SQueryTableDataCond* pCond, int32_t capacity) {
  SSDataBlock* pResBlock = createDataBlock();
  if (pResBlock == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  for (int32_t i = 0; i < pCond->numOfCols; ++i) {
    SColumnInfoData colInfo = {{0}, 0};
    colInfo.info = pCond->colList[i];

    int32_t code = blockDataAppendColInfo(pResBlock, &colInfo);
    if (code != TSDB_CODE_SUCCESS) {
      terrno = code;
      blockDataDestroy(pResBlock);
      return NULL;
    }
  }

  int32_t code = blockDataEnsureCapacity(pResBlock, capacity);
  if (code != TSDB_CODE_SUCCESS) {
    terrno = code;
    blockDataDestroy(pResBlock);
    return NULL;
  }

  return pResBlock;
}

462 463
/*
 * Allocate and initialize a reader for the query described by pCond on vnode pVnode.
 *
 * On success *ppReader receives the reader and TSDB_CODE_SUCCESS is returned. On any failure the
 * partially built reader is released through tsdbReaderClose(), *ppReader is set to NULL and the
 * error code is returned.
 *
 * NOTE(review): the `capacity` argument is not used; the result block capacity starts at 4096 rows
 * and is then only reduced by limitOutputBufferSize(). Confirm whether callers expect it to be honoured.
 */
static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, STsdbReader** ppReader, int32_t capacity,
                                const char* idstr) {
  int32_t      code = TSDB_CODE_SUCCESS;
  int8_t       level = 0;
  STsdbReader* pReader = (STsdbReader*)taosMemoryCalloc(1, sizeof(*pReader));
  if (pReader == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _end;
  }

  if (VND_IS_TSMA(pVnode)) {
    tsdbDebug("vgId:%d, tsma is selected to query", TD_VID(pVnode));
  }

  initReaderStatus(&pReader->status);

  pReader->pTsdb = getTsdbByRetentions(pVnode, pCond->twindows.skey, pVnode->config.tsdbCfg.retentions, idstr, &level);
  pReader->suid = pCond->suid;
  pReader->order = pCond->order;
  pReader->capacity = 4096;
  pReader->idStr = (idstr != NULL) ? strdup(idstr) : NULL;
  if (idstr != NULL && pReader->idStr == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _end;
  }
  pReader->verRange = getQueryVerRange(pVnode, pCond, level);
  pReader->type = pCond->type;
  pReader->window = updateQueryTimeWindow(pReader->pTsdb, &pCond->twindows);

  ASSERT(pCond->numOfCols > 0);

  limitOutputBufferSize(pCond, &pReader->capacity);

  // allocate buffer in order to load data blocks from file
  SBlockLoadSuppInfo* pSup = &pReader->suppInfo;
  pSup->pColAgg = taosArrayInit(4, sizeof(SColumnDataAgg));
  pSup->plist = taosMemoryCalloc(pCond->numOfCols, POINTER_BYTES);
  if (pSup->pColAgg == NULL || pSup->plist == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _end;
  }

  pSup->tsColAgg.colId = PRIMARYKEY_TIMESTAMP_COL_ID;

  code = tBlockDataCreate(&pReader->status.fileBlockData);
  if (code != TSDB_CODE_SUCCESS) {
    terrno = code;
    goto _end;
  }

  pReader->pResBlock = createResBlock(pCond, pReader->capacity);
  if (pReader->pResBlock == NULL) {
    code = terrno;
    goto _end;
  }

  // the column id / scratch buffers are required by block loading: do not continue without them
  code = setColumnIdSlotList(pReader, pReader->pResBlock);
  if (code != TSDB_CODE_SUCCESS) {
    goto _end;
  }

  *ppReader = pReader;
  return code;

_end:
  if (pReader != NULL) {
    tsdbReaderClose(pReader);
  }
  *ppReader = NULL;
  return code;
}
H
Hongze Cheng 已提交
524

H
Haojun Liao 已提交
525
/*
 * Read the block index of the current data file and append to pIndexList every SBlockIdx that
 * belongs to the queried super table (pReader->suid) and to a table present in
 * pReader->status.pTableMap. The block list of each such table is created if it does not exist yet.
 *
 * Returns TSDB_CODE_SUCCESS, the error of tsdbReadBlockIdx(), or TSDB_CODE_OUT_OF_MEMORY when an
 * allocation (index array, block list, or push into pIndexList) fails.
 */
static int32_t doLoadBlockIndex(STsdbReader* pReader, SDataFReader* pFileReader, SArray* pIndexList) {
  SArray* aBlockIdx = taosArrayInit(8, sizeof(SBlockIdx));
  if (aBlockIdx == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  int64_t st = taosGetTimestampUs();
  int32_t code = tsdbReadBlockIdx(pFileReader, aBlockIdx);
  if (code != TSDB_CODE_SUCCESS) {
    goto _end;
  }

  size_t num = taosArrayGetSize(aBlockIdx);
  if (num == 0) {
    goto _end;  // nothing in this file; code is TSDB_CODE_SUCCESS
  }

  int64_t et1 = taosGetTimestampUs();

  for (size_t i = 0; i < num; ++i) {
    SBlockIdx* pBlockIdx = (SBlockIdx*)taosArrayGet(aBlockIdx, i);

    // uid check
    if (pBlockIdx->suid != pReader->suid) {
      continue;
    }

    // this block belongs to a table that is not queried.
    STableBlockScanInfo* pScanInfo = taosHashGet(pReader->status.pTableMap, &pBlockIdx->uid, sizeof(uint64_t));
    if (pScanInfo == NULL) {
      continue;
    }

    if (pScanInfo->pBlockList == NULL) {
      pScanInfo->pBlockList = taosArrayInit(4, sizeof(int32_t));
      if (pScanInfo->pBlockList == NULL) {
        code = TSDB_CODE_OUT_OF_MEMORY;
        goto _end;
      }
    }

    if (taosArrayPush(pIndexList, pBlockIdx) == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto _end;
    }
  }

  int64_t et2 = taosGetTimestampUs();
  tsdbDebug("load block index for %d tables completed, elapsed time:%.2f ms, set blockIdx:%.2f ms, size:%.2f Kb %s",
            (int32_t)num, (et1 - st) / 1000.0, (et2 - et1) / 1000.0, num * sizeof(SBlockIdx) / 1024.0, pReader->idStr);

  pReader->cost.headFileLoadTime += (et1 - st) / 1000.0;

_end:
  taosArrayDestroy(aBlockIdx);
  return code;
}
H
Hongze Cheng 已提交
575

576
/*
 * Forget the file-specific block info of every table before a new data file is handled.
 * The block info map data is cleared and the selected-block list is emptied.
 */
static void cleanupTableScanInfo(SHashObj* pTableMap) {
  STableBlockScanInfo* pInfo = NULL;
  while ((pInfo = taosHashIterate(pTableMap, pInfo)) != NULL) {
    tMapDataClear(&pInfo->mapData);
    taosArrayClear(pInfo->pBlockList);
  }
}

590
/*
 * Load the block info (SDataBlk list) of every table in pIndexList from the current data file.
 * For each table, the indexes of the blocks that overlap both the query time window and the version
 * range are recorded in its pBlockList.
 *
 * pBlockNum->numOfBlocks is incremented per selected block (the caller must initialise it), and
 * pBlockNum->numOfLastFiles is set to the number of stt files of the file set.
 *
 * Returns TSDB_CODE_SUCCESS, the error returned by tsdbReadBlock(), or TSDB_CODE_OUT_OF_MEMORY.
 */
static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, SBlockNumber* pBlockNum) {
  int32_t numOfQTable = 0;
  size_t  sizeInDisk = 0;
  size_t  numOfTables = taosArrayGetSize(pIndexList);

  int64_t st = taosGetTimestampUs();
  cleanupTableScanInfo(pReader->status.pTableMap);

  for (size_t i = 0; i < numOfTables; ++i) {
    SBlockIdx* pBlockIdx = taosArrayGet(pIndexList, i);

    // pIndexList only holds tables present in the map (see doLoadBlockIndex)
    STableBlockScanInfo* pScanInfo = taosHashGet(pReader->status.pTableMap, &pBlockIdx->uid, sizeof(int64_t));

    tMapDataReset(&pScanInfo->mapData);
    int32_t code = tsdbReadBlock(pReader->pFileReader, pBlockIdx, &pScanInfo->mapData);
    if (code != TSDB_CODE_SUCCESS) {
      tsdbError("%p failed to load block info of table uid:%" PRId64 ", code:%s %s", pReader, pBlockIdx->uid,
                tstrerror(code), pReader->idStr);
      return code;
    }

    sizeInDisk += pScanInfo->mapData.nData;
    for (int32_t j = 0; j < pScanInfo->mapData.nItem; ++j) {
      SDataBlk block = {0};
      tMapDataGetItemByIdx(&pScanInfo->mapData, j, &block, tGetDataBlk);

      // 1. time range check
      if (block.minKey.ts > pReader->window.ekey || block.maxKey.ts < pReader->window.skey) {
        continue;
      }

      // 2. version range check
      if (block.minVer > pReader->verRange.maxVer || block.maxVer < pReader->verRange.minVer) {
        continue;
      }

      void* p = taosArrayPush(pScanInfo->pBlockList, &j);
      if (p == NULL) {
        tMapDataClear(&pScanInfo->mapData);
        return TSDB_CODE_OUT_OF_MEMORY;
      }

      pBlockNum->numOfBlocks += 1;
    }

    if (pScanInfo->pBlockList != NULL && taosArrayGetSize(pScanInfo->pBlockList) > 0) {
      numOfQTable += 1;
    }
  }

  pBlockNum->numOfLastFiles = pReader->pFileReader->pSet->nSttF;
  int32_t total = pBlockNum->numOfLastFiles + pBlockNum->numOfBlocks;

  double el = (taosGetTimestampUs() - st) / 1000.0;
  // numOfTables is a size_t: cast it for %d; sizes are reported in Kb (1024 bytes) like the other logs
  tsdbDebug(
      "load block of %d tables completed, blocks:%d in %d tables, last-files:%d, block-info-size:%.2f Kb, elapsed "
      "time:%.2f ms %s",
      (int32_t)numOfTables, pBlockNum->numOfBlocks, numOfQTable, pBlockNum->numOfLastFiles, sizeInDisk / 1024.0, el,
      pReader->idStr);

  pReader->cost.numOfBlocks += total;
  pReader->cost.headFileLoadTime += el;

  return TSDB_CODE_SUCCESS;
}
H
Hongze Cheng 已提交
650

651
/*
 * Mark the current file block as completely consumed. lastKey is set one tick past maxKey in the
 * traverse direction: maxKey + 1 when ascending, maxKey - 1 when descending.
 */
static void setBlockAllDumped(SFileBlockDumpInfo* pDumpInfo, int64_t maxKey, int32_t order) {
  pDumpInfo->allDumped = true;
  pDumpInfo->lastKey = ASCENDING_TRAVERSE(order) ? (maxKey + 1) : (maxKey - 1);
}

657 658
/*
 * Append column value pColVal as row rowIndex of output column pColInfoData.
 * Fixed-length values are appended directly. Variable-length values are first assembled, with their
 * length header, in the scratch buffer pSup->buildBuf[colIndex]. NULL and NONE values become NULL.
 */
static void doCopyColVal(SColumnInfoData* pColInfoData, int32_t rowIndex, int32_t colIndex, SColVal* pColVal,
                         SBlockLoadSuppInfo* pSup) {
  bool isNullVal = pColVal->isNull || pColVal->isNone;

  if (!IS_VAR_DATA_TYPE(pColVal->type)) {
    colDataAppend(pColInfoData, rowIndex, (const char*)&pColVal->value, isNullVal);
    return;
  }

  if (isNullVal) {
    colDataAppendNULL(pColInfoData, rowIndex);
    return;
  }

  char* pBuf = pSup->buildBuf[colIndex];
  varDataSetLen(pBuf, pColVal->value.nData);
  ASSERT(pColVal->value.nData <= pColInfoData->info.bytes);
  memcpy(varDataVal(pBuf), pColVal->value.pData, pColVal->value.nData);
  colDataAppend(pColInfoData, rowIndex, pBuf, false);
}

673
/* Return the block info at the iterator's current index, or NULL when the block list is empty. */
static SFileDataBlockInfo* getCurrentBlockInfo(SDataBlockIter* pBlockIter) {
  size_t numOfInfo = taosArrayGetSize(pBlockIter->blockList);
  if (numOfInfo > 0) {
    return taosArrayGet(pBlockIter->blockList, pBlockIter->index);
  }

  ASSERT(pBlockIter->numOfBlocks == numOfInfo);
  return NULL;
}

H
Hongze Cheng 已提交
683
/* The SDataBlk of the block the iterator currently points at (stored inline in the iterator). */
static SDataBlk* getCurrentBlock(SDataBlockIter* pBlockIter) {
  SDataBlk* pCurrent = &pBlockIter->block;
  return pCurrent;
}
684

685
/*
 * Copy rows of the file block loaded in pReader->status.fileBlockData straight into
 * pReader->pResBlock. Copying starts at pDumpInfo->rowIndex and moves in the reader's traverse order.
 *
 * - At most pReader->capacity rows are copied per call, and pDumpInfo->rowIndex is advanced past them.
 * - Output columns and the block's column data are walked in parallel and matched by column id.
 *   NOTE(review): this assumes both sequences are sorted by ascending column id.
 * - Output columns that are missing from the file block are filled with NULL.
 * - The block is flagged as fully dumped only when no row is left in the traverse direction. A block
 *   larger than the capacity therefore keeps its remaining rows pending instead of silently dropping them.
 *
 * pBlockScanInfo is not used. Always returns TSDB_CODE_SUCCESS.
 */
static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo) {
  SReaderStatus*  pStatus = &pReader->status;
  SDataBlockIter* pBlockIter = &pStatus->blockIter;

  SBlockData*         pBlockData = &pStatus->fileBlockData;
  SFileDataBlockInfo* pFBlock = getCurrentBlockInfo(pBlockIter);
  SDataBlk*           pBlock = getCurrentBlock(pBlockIter);
  SSDataBlock*        pResBlock = pReader->pResBlock;
  int32_t             numOfOutputCols = blockDataGetNumOfCols(pResBlock);

  SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo;
  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;

  SColVal cv = {0};
  int64_t st = taosGetTimestampUs();
  bool    asc = ASCENDING_TRAVERSE(pReader->order);
  int32_t step = asc ? 1 : -1;

  // rows left in the traverse direction, capped by the output capacity
  int32_t remain = asc ? (pBlockData->nRow - pDumpInfo->rowIndex) : (pDumpInfo->rowIndex + 1);
  if (remain > pReader->capacity) {
    remain = pReader->capacity;
  }

  // Every copy loop below visits exactly `remain` rows: rowIndex, rowIndex + step, ...
  // The loops are driven by the row count rather than an end index. An end-index test such as
  // `j < endIndex` never holds in descending order when only part of the block is copied.
  int32_t          rowIndex = 0;
  int32_t          i = 0;
  SColumnInfoData* pColData = taosArrayGet(pResBlock->pDataBlock, i);
  if (pColData->info.colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
    for (int32_t k = 0, j = pDumpInfo->rowIndex; k < remain; ++k, j += step) {
      colDataAppend(pColData, rowIndex++, (const char*)&pBlockData->aTSKEY[j], false);
    }
    i += 1;
  }

  int32_t colIndex = 0;
  int32_t num = taosArrayGetSize(pBlockData->aIdx);
  while (i < numOfOutputCols && colIndex < num) {
    rowIndex = 0;
    pColData = taosArrayGet(pResBlock->pDataBlock, i);

    SColData* pData = tBlockDataGetColDataByIdx(pBlockData, colIndex);
    if (pData->cid < pColData->info.colId) {
      colIndex += 1;  // column only present in the file block, not requested
    } else if (pData->cid == pColData->info.colId) {
      for (int32_t k = 0, j = pDumpInfo->rowIndex; k < remain; ++k, j += step) {
        tColDataGetValue(pData, j, &cv);
        doCopyColVal(pColData, rowIndex++, i, &cv, pSupInfo);
      }
      colIndex += 1;
      i += 1;
      ASSERT(rowIndex == remain);
    } else {  // the specified column does not exist in file block, fill with null data
      colDataAppendNNULL(pColData, 0, remain);
      i += 1;
    }
  }

  // remaining requested columns are absent from the file block
  while (i < numOfOutputCols) {
    pColData = taosArrayGet(pResBlock->pDataBlock, i);
    colDataAppendNNULL(pColData, 0, remain);
    i += 1;
  }

  pResBlock->info.rows = remain;
  pDumpInfo->rowIndex += step * remain;

  // only mark the block as consumed once the dump position has left the block
  // NOTE(review): in descending order lastKey becomes maxKey - 1 although the last row visited has key
  // minKey; confirm which key callers expect.
  if (pDumpInfo->rowIndex < 0 || pDumpInfo->rowIndex >= pBlockData->nRow) {
    setBlockAllDumped(pDumpInfo, pBlock->maxKey.ts, pReader->order);
  }

  double elapsedTime = (taosGetTimestampUs() - st) / 1000.0;
  pReader->cost.blockLoadTime += elapsedTime;

  int32_t unDumpedRows = asc ? pBlock->nRow - pDumpInfo->rowIndex : pDumpInfo->rowIndex + 1;
  tsdbDebug("%p copy file block to sdatablock, global index:%d, table index:%d, brange:%" PRId64 "-%" PRId64
            ", rows:%d, remain:%d, minVer:%" PRId64 ", maxVer:%" PRId64 ", elapsed time:%.2f ms, %s",
            pReader, pBlockIter->index, pFBlock->tbBlockIdx, pBlock->minKey.ts, pBlock->maxKey.ts, remain, unDumpedRows,
            pBlock->minVer, pBlock->maxVer, elapsedTime, pReader->idStr);

  return TSDB_CODE_SUCCESS;
}

769
/*
 * Read the data block the iterator currently points at from the data file into pBlockData.
 *
 * On failure the error is logged and the code returned by tsdbReadDataBlock is passed back unchanged.
 * On success the load time is added to pReader->cost.blockLoadTime and the dump state of the file block
 * (pReader->status.fBlockDumpInfo.allDumped) is reset to false.
 */
static int32_t doLoadFileBlockData(STsdbReader* pReader, SDataBlockIter* pBlockIter, SBlockData* pBlockData) {
  int64_t startUs = taosGetTimestampUs();

  SFileDataBlockInfo* pInfo = getCurrentBlockInfo(pBlockIter);
  ASSERT(pInfo != NULL);

  SDataBlk* pDataBlk = getCurrentBlock(pBlockIter);
  int32_t   code = tsdbReadDataBlock(pReader->pFileReader, pDataBlk, pBlockData);
  if (code != TSDB_CODE_SUCCESS) {
    tsdbError("%p error occurs in loading file block, global index:%d, table index:%d, brange:%" PRId64 "-%" PRId64
              ", rows:%d, code:%s %s",
              pReader, pBlockIter->index, pInfo->tbBlockIdx, pDataBlk->minKey.ts, pDataBlk->maxKey.ts, pDataBlk->nRow,
              tstrerror(code), pReader->idStr);
    return code;
  }

  double elapsedMs = (taosGetTimestampUs() - startUs) / 1000.0;
  tsdbDebug("%p load file block into buffer, global index:%d, index in table block list:%d, brange:%" PRId64 "-%" PRId64
            ", rows:%d, minVer:%" PRId64 ", maxVer:%" PRId64 ", elapsed time:%.2f ms, %s",
            pReader, pBlockIter->index, pInfo->tbBlockIdx, pDataBlk->minKey.ts, pDataBlk->maxKey.ts, pDataBlk->nRow,
            pDataBlk->minVer, pDataBlk->maxVer, elapsedMs, pReader->idStr);

  pReader->cost.blockLoadTime += elapsedMs;

  // the freshly loaded block has not been copied to the result block yet
  pReader->status.fBlockDumpInfo.allDumped = false;
  return TSDB_CODE_SUCCESS;
}
H
Hongze Cheng 已提交
798

H
Haojun Liao 已提交
799 800 801
/*
 * Release the buffers owned by a block order supporter.
 *
 * The two per-table bookkeeping arrays are freed and cleared first. Next come the wrapper arrays of the first
 * pSup->numOfTables tables, and finally the array that held those wrapper pointers.
 */
static void cleanupBlockOrderSupporter(SBlockOrderSupporter* pSup) {
  taosMemoryFreeClear(pSup->numOfBlocksPerTable);
  taosMemoryFreeClear(pSup->indexPerTable);

  for (int32_t t = 0; t < pSup->numOfTables; t++) {
    taosMemoryFreeClear(pSup->pDataBlockInfo[t]);
  }

  taosMemoryFreeClear(pSup->pDataBlockInfo);
}
H
Hongze Cheng 已提交
810

H
Haojun Liao 已提交
811 812
/*
 * Allocate the zero-initialized per-table arrays of a block order supporter for numOfTables tables:
 * the block count of each table, the merge cursor of each table, and the pointer to each table's wrappers.
 * Returns TSDB_CODE_OUT_OF_MEMORY (after releasing whatever was allocated) if any allocation fails.
 */
static int32_t initBlockOrderSupporter(SBlockOrderSupporter* pSup, int32_t numOfTables) {
  ASSERT(numOfTables >= 1);

  pSup->numOfBlocksPerTable = taosMemoryCalloc(numOfTables, sizeof(int32_t));
  pSup->indexPerTable = taosMemoryCalloc(numOfTables, sizeof(int32_t));
  pSup->pDataBlockInfo = taosMemoryCalloc(numOfTables, POINTER_BYTES);

  bool allocated =
      (pSup->numOfBlocksPerTable != NULL) && (pSup->indexPerTable != NULL) && (pSup->pDataBlockInfo != NULL);
  if (!allocated) {
    cleanupBlockOrderSupporter(pSup);
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  return TSDB_CODE_SUCCESS;
}
H
Hongze Cheng 已提交
825

H
Haojun Liao 已提交
826
static int32_t fileDataBlockOrderCompar(const void* pLeft, const void* pRight, void* param) {
827
  int32_t leftIndex = *(int32_t*)pLeft;
H
Haojun Liao 已提交
828
  int32_t rightIndex = *(int32_t*)pRight;
H
Hongze Cheng 已提交
829

H
Haojun Liao 已提交
830
  SBlockOrderSupporter* pSupporter = (SBlockOrderSupporter*)param;
H
Hongze Cheng 已提交
831

H
Haojun Liao 已提交
832 833
  int32_t leftTableBlockIndex = pSupporter->indexPerTable[leftIndex];
  int32_t rightTableBlockIndex = pSupporter->indexPerTable[rightIndex];
H
Hongze Cheng 已提交
834

H
Haojun Liao 已提交
835 836 837 838 839 840 841
  if (leftTableBlockIndex > pSupporter->numOfBlocksPerTable[leftIndex]) {
    /* left block is empty */
    return 1;
  } else if (rightTableBlockIndex > pSupporter->numOfBlocksPerTable[rightIndex]) {
    /* right block is empty */
    return -1;
  }
H
Hongze Cheng 已提交
842

843
  SBlockOrderWrapper* pLeftBlock = &pSupporter->pDataBlockInfo[leftIndex][leftTableBlockIndex];
H
Haojun Liao 已提交
844
  SBlockOrderWrapper* pRightBlock = &pSupporter->pDataBlockInfo[rightIndex][rightTableBlockIndex];
H
Hongze Cheng 已提交
845

846 847 848 849
  return pLeftBlock->offset > pRightBlock->offset ? 1 : -1;
}

/*
 * Decode the data block the iterator currently points at into pBlockIter->block.
 *
 * The block entry is resolved through the table scan info found in pBlockIter->pTableMap by uid. Its
 * tbBlockIdx selects an index into the table's mapData, and that item is decoded with tGetDataBlk.
 * Does nothing when there is no current block. Always returns TSDB_CODE_SUCCESS.
 */
static int32_t doSetCurrentBlock(SDataBlockIter* pBlockIter) {
  SFileDataBlockInfo* pInfo = getCurrentBlockInfo(pBlockIter);
  if (pInfo == NULL) {
    return TSDB_CODE_SUCCESS;
  }

  STableBlockScanInfo* pScanInfo = taosHashGet(pBlockIter->pTableMap, &pInfo->uid, sizeof(pInfo->uid));
  int32_t*             pMapIdx = taosArrayGet(pScanInfo->pBlockList, pInfo->tbBlockIdx);
  tMapDataGetItemByIdx(&pScanInfo->mapData, *pMapIdx, &pBlockIter->block, tGetDataBlk);

  return TSDB_CODE_SUCCESS;
}
H
Hongze Cheng 已提交
863

864
/*
 * Build the access order of all qualified file blocks of the current data file and rewind the iterator.
 *
 * Every table in pReader->status.pTableMap that owns a non-empty block list contributes its blocks. When only
 * one table qualifies, its list order is used as-is. Otherwise the per-table lists are merged by the offset of
 * each block's first sub-block (via a multiway merge tree), so that the file is visited in offset order.
 * The iterator ends up at the first entry (asc) or the last entry (desc).
 *
 * numOfBlocks must equal the total number of qualified blocks over all tables.
 * Returns TSDB_CODE_SUCCESS, or an error code if a memory allocation or the merge tree creation fails.
 */
static int32_t initBlockIterator(STsdbReader* pReader, SDataBlockIter* pBlockIter, int32_t numOfBlocks) {
  bool asc = ASCENDING_TRAVERSE(pReader->order);

  pBlockIter->numOfBlocks = numOfBlocks;
  taosArrayClear(pBlockIter->blockList);
  pBlockIter->pTableMap = pReader->status.pTableMap;

  // access data blocks according to the offset of each block in asc/desc order.
  int32_t numOfTables = (int32_t)taosHashGetSize(pReader->status.pTableMap);

  int64_t st = taosGetTimestampUs();

  SBlockOrderSupporter sup = {0};
  int32_t              code = initBlockOrderSupporter(&sup, numOfTables);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  int32_t cnt = 0;
  void*   ptr = NULL;
  while ((ptr = taosHashIterate(pReader->status.pTableMap, ptr)) != NULL) {
    STableBlockScanInfo* pTableScanInfo = (STableBlockScanInfo*)ptr;

    size_t num = (pTableScanInfo->pBlockList == NULL) ? 0 : taosArrayGetSize(pTableScanInfo->pBlockList);
    if (num == 0) {
      continue;
    }

    SBlockOrderWrapper* pWrappers = taosMemoryMalloc(sizeof(SBlockOrderWrapper) * num);
    if (pWrappers == NULL) {
      // leaving the iteration early: release the iterator held by taosHashIterate
      taosHashCancelIterate(pReader->status.pTableMap, ptr);
      cleanupBlockOrderSupporter(&sup);
      return TSDB_CODE_TDB_OUT_OF_MEMORY;
    }

    sup.numOfBlocksPerTable[sup.numOfTables] = (int32_t)num;
    sup.pDataBlockInfo[sup.numOfTables] = pWrappers;

    SDataBlk block = {0};
    for (size_t k = 0; k < num; ++k) {
      int32_t* mapDataIndex = taosArrayGet(pTableScanInfo->pBlockList, k);
      tMapDataGetItemByIdx(&pTableScanInfo->mapData, *mapDataIndex, &block, tGetDataBlk);

      pWrappers[k] = (SBlockOrderWrapper){.uid = pTableScanInfo->uid, .offset = block.aSubBlock[0].offset};
      cnt++;
    }

    sup.numOfTables += 1;
  }

  ASSERT(numOfBlocks == cnt);

  // since there is only one table qualified, blocks are not sorted
  if (sup.numOfTables == 1) {
    for (int32_t i = 0; i < numOfBlocks; ++i) {
      SFileDataBlockInfo blockInfo = {.uid = sup.pDataBlockInfo[0][i].uid, .tbBlockIdx = i};
      if (taosArrayPush(pBlockIter->blockList, &blockInfo) == NULL) {
        cleanupBlockOrderSupporter(&sup);
        return TSDB_CODE_TDB_OUT_OF_MEMORY;
      }
    }

    int64_t et = taosGetTimestampUs();
    tsdbDebug("%p create blocks info struct completed for one table, %d blocks not sorted, elapsed time:%.2f ms %s",
              pReader, numOfBlocks, (et - st) / 1000.0, pReader->idStr);

    pBlockIter->index = asc ? 0 : (numOfBlocks - 1);
    cleanupBlockOrderSupporter(&sup);
    doSetCurrentBlock(pBlockIter);
    return TSDB_CODE_SUCCESS;
  }

  tsdbDebug("%p create data blocks info struct completed, %d blocks in %d tables %s", pReader, cnt, sup.numOfTables,
            pReader->idStr);

  ASSERT(cnt <= numOfBlocks && sup.numOfTables <= numOfTables);

  // keep the full int32_t error code: narrowing it could turn a failure into "success" with a NULL tree
  SMultiwayMergeTreeInfo* pTree = NULL;
  code = tMergeTreeCreate(&pTree, sup.numOfTables, &sup, fileDataBlockOrderCompar);
  if (code != TSDB_CODE_SUCCESS) {
    cleanupBlockOrderSupporter(&sup);
    return code;
  }

  int32_t numOfTotal = 0;
  while (numOfTotal < cnt) {
    int32_t pos = tMergeTreeGetChosenIndex(pTree);
    int32_t index = sup.indexPerTable[pos]++;

    SFileDataBlockInfo blockInfo = {.uid = sup.pDataBlockInfo[pos][index].uid, .tbBlockIdx = index};
    if (taosArrayPush(pBlockIter->blockList, &blockInfo) == NULL) {
      cleanupBlockOrderSupporter(&sup);
      taosMemoryFree(pTree);
      return TSDB_CODE_TDB_OUT_OF_MEMORY;
    }

    // set data block index overflow, in order to disable the offset comparator
    if (sup.indexPerTable[pos] >= sup.numOfBlocksPerTable[pos]) {
      sup.indexPerTable[pos] = sup.numOfBlocksPerTable[pos] + 1;
    }

    numOfTotal += 1;
    tMergeTreeAdjust(pTree, tMergeTreeGetAdjustIndex(pTree));
  }

  int64_t et = taosGetTimestampUs();
  tsdbDebug("%p %d data blocks access order completed, elapsed time:%.2f ms %s", pReader, numOfBlocks,
            (et - st) / 1000.0, pReader->idStr);
  cleanupBlockOrderSupporter(&sup);
  taosMemoryFree(pTree);

  pBlockIter->index = asc ? 0 : (numOfBlocks - 1);
  doSetCurrentBlock(pBlockIter);

  return TSDB_CODE_SUCCESS;
}
H
Hongze Cheng 已提交
981

H
Haojun Liao 已提交
982
/*
 * Advance the block iterator by one position in scan order and decode the new current block.
 * Returns false, leaving the iterator untouched, when it already sits on the last block in scan direction.
 */
static bool blockIteratorNext(SDataBlockIter* pBlockIter) {
  if (ASCENDING_TRAVERSE(pBlockIter->order)) {
    if (pBlockIter->index >= pBlockIter->numOfBlocks - 1) {
      return false;
    }
    pBlockIter->index += 1;
  } else {
    if (pBlockIter->index <= 0) {
      return false;
    }
    pBlockIter->index -= 1;
  }

  doSetCurrentBlock(pBlockIter);
  return true;
}

996 997 998
/**
 * This is an two rectangles overlap cases.
 */
H
Hongze Cheng 已提交
999
static int32_t dataBlockPartiallyRequired(STimeWindow* pWindow, SVersionRange* pVerRange, SDataBlk* pBlock) {
1000 1001
  return (pWindow->ekey < pBlock->maxKey.ts && pWindow->ekey >= pBlock->minKey.ts) ||
         (pWindow->skey > pBlock->minKey.ts && pWindow->skey <= pBlock->maxKey.ts) ||
H
Hongze Cheng 已提交
1002 1003
         (pVerRange->minVer > pBlock->minVer && pVerRange->minVer <= pBlock->maxVer) ||
         (pVerRange->maxVer < pBlock->maxVer && pVerRange->maxVer >= pBlock->minVer);
H
Haojun Liao 已提交
1004
}
H
Hongze Cheng 已提交
1005

H
Hongze Cheng 已提交
1006 1007
/*
 * Fetch the block of the same table that follows pFBlockInfo in scan order: the next block in asc order,
 * the previous block in desc order.
 *
 * On success the neighbor's position in pTableBlockScanInfo->pBlockList is stored in *nextIndex. A heap-allocated
 * copy of the block is returned, and the caller must release it with taosMemoryFree.
 * Returns NULL when there is no neighbor in scan direction. Also returns NULL when the copy cannot be allocated;
 * that failure is logged, because callers treat NULL as "no neighbor".
 */
static SDataBlk* getNeighborBlockOfSameTable(SFileDataBlockInfo* pFBlockInfo, STableBlockScanInfo* pTableBlockScanInfo,
                                             int32_t* nextIndex, int32_t order) {
  bool asc = ASCENDING_TRAVERSE(order);

  // signed arithmetic: an empty list must not wrap "size - 1" around to SIZE_MAX
  int32_t numOfBlocks = (int32_t)taosArrayGetSize(pTableBlockScanInfo->pBlockList);
  if (asc && pFBlockInfo->tbBlockIdx >= numOfBlocks - 1) {
    return NULL;
  }

  if (!asc && pFBlockInfo->tbBlockIdx <= 0) {
    return NULL;
  }

  int32_t step = asc ? 1 : -1;
  *nextIndex = pFBlockInfo->tbBlockIdx + step;

  SDataBlk* pBlock = taosMemoryCalloc(1, sizeof(SDataBlk));
  if (pBlock == NULL) {
    tsdbError("failed to allocate memory for neighbor block, uid:%" PRIu64, pFBlockInfo->uid);
    return NULL;
  }

  int32_t* indexInMapdata = taosArrayGet(pTableBlockScanInfo->pBlockList, *nextIndex);
  tMapDataGetItemByIdx(&pTableBlockScanInfo->mapData, *indexInMapdata, pBlock, tGetDataBlk);
  return pBlock;
}

/*
 * Locate the entry matching pFBlockInfo (same uid and tbBlockIdx) in the iterator's block list.
 * The search starts at the current iterator position and walks in scan direction.
 * Returns the entry's position; the entry is expected to exist (asserts and returns -1 otherwise).
 */
static int32_t findFileBlockInfoIndex(SDataBlockIter* pBlockIter, SFileDataBlockInfo* pFBlockInfo) {
  ASSERT(pBlockIter != NULL && pFBlockInfo != NULL);

  int32_t step = ASCENDING_TRAVERSE(pBlockIter->order) ? 1 : -1;
  for (int32_t pos = pBlockIter->index; pos >= 0 && pos < pBlockIter->numOfBlocks; pos += step) {
    SFileDataBlockInfo* pCandidate = taosArrayGet(pBlockIter->blockList, pos);
    bool sameBlock = (pCandidate->uid == pFBlockInfo->uid) && (pCandidate->tbBlockIdx == pFBlockInfo->tbBlockIdx);
    if (sameBlock) {
      return pos;
    }
  }

  ASSERT(0);
  return -1;
}

1046
/*
 * Make the block entry stored at position `index` the next block in scan order.
 *
 * The iterator moves by `step`. If the entry is not already at the new iterator position, it is removed from
 * its old slot and inserted there. The current block is then decoded again.
 * Returns -1 when `index` is outside [0, numOfBlocks), TSDB_CODE_SUCCESS otherwise.
 */
static int32_t setFileBlockActiveInBlockIter(SDataBlockIter* pBlockIter, int32_t index, int32_t step) {
  bool outOfRange = (index < 0) || (index >= pBlockIter->numOfBlocks);
  if (outOfRange) {
    return -1;
  }

  // copy by value: the slot may be removed from the array below
  SFileDataBlockInfo target = *(SFileDataBlockInfo*)taosArrayGet(pBlockIter->blockList, index);

  pBlockIter->index += step;
  int32_t dst = pBlockIter->index;

  if (dst != index) {
    taosArrayRemove(pBlockIter->blockList, index);
    taosArrayInsert(pBlockIter->blockList, dst, &target);

    SFileDataBlockInfo* pMoved = taosArrayGet(pBlockIter->blockList, dst);
    ASSERT(pMoved->uid == target.uid && pMoved->tbBlockIdx == target.tbBlockIdx);
  }

  doSetCurrentBlock(pBlockIter);
  return TSDB_CODE_SUCCESS;
}

H
Hongze Cheng 已提交
1066
/*
 * Two consecutive blocks of the same table overlap when they share the boundary timestamp in scan direction.
 * In asc order that is pBlock's max key against the neighbor's min key; in desc order it is the reverse.
 */
static bool overlapWithNeighborBlock(SDataBlk* pBlock, SDataBlk* pNeighbor, int32_t order) {
  return ASCENDING_TRAVERSE(order) ? (pBlock->maxKey.ts == pNeighbor->minKey.ts)
                                   : (pBlock->minKey.ts == pNeighbor->maxKey.ts);
}
H
Hongze Cheng 已提交
1074

H
Hongze Cheng 已提交
1075
/*
 * Check whether a buffered (in-memory) key must be emitted before the file block.
 * That is the case when the key is set (not TSKEY_INITIAL_VAL) and lies at or before the block's min key (asc),
 * or at or after the block's max key (desc).
 */
static bool bufferDataInFileBlockGap(int32_t order, TSDBKEY key, SDataBlk* pBlock) {
  if (key.ts == TSKEY_INITIAL_VAL) {
    return false;
  }

  if (ASCENDING_TRAVERSE(order)) {
    return key.ts <= pBlock->minKey.ts;
  }
  return key.ts >= pBlock->maxKey.ts;
}
H
Hongze Cheng 已提交
1081

H
Hongze Cheng 已提交
1082
/*
 * A key overlaps a file block when its timestamp lies within [minKey.ts, maxKey.ts] of the block
 * and the block's version range intersects the query version range.
 */
static bool keyOverlapFileBlock(TSDBKEY key, SDataBlk* pBlock, SVersionRange* pVerRange) {
  bool tsInBlock = (key.ts >= pBlock->minKey.ts) && (key.ts <= pBlock->maxKey.ts);
  bool versionsIntersect = (pBlock->maxVer >= pVerRange->minVer) && (pBlock->minVer <= pVerRange->maxVer);
  return tsInBlock && versionsIntersect;
}

H
Hongze Cheng 已提交
1087
/*
 * Walk the table's delete skyline forward from startIndex and report whether a delete record may remove rows
 * of pBlock. That is the case when:
 *   - a skyline point inside the block's time range has version >= pBlock->minVer, or
 *   - a point before the block with version >= pBlock->minVer is followed by a point reaching into the block.
 * The walk stops at the first point beyond the block's max key.
 * A negative startIndex checks nothing (returns false).
 */
static bool doCheckforDatablockOverlapFrom(STableBlockScanInfo* pBlockScanInfo, const SDataBlk* pBlock,
                                           int32_t startIndex) {
  if (startIndex < 0) {
    return false;
  }

  int32_t num = (int32_t)taosArrayGetSize(pBlockScanInfo->delSkyline);

  for (int32_t i = startIndex; i < num; i += 1) {
    TSDBKEY* p = taosArrayGet(pBlockScanInfo->delSkyline, i);
    if (p->ts >= pBlock->minKey.ts && p->ts <= pBlock->maxKey.ts) {
      if (p->version >= pBlock->minVer) {
        return true;
      }
    } else if (p->ts < pBlock->minKey.ts) {  // p->ts < pBlock->minKey.ts
      if (p->version >= pBlock->minVer) {
        if (i < num - 1) {
          TSDBKEY* pnext = taosArrayGet(pBlockScanInfo->delSkyline, i + 1);
          if (i + 1 == num - 1) {  // pnext is the last point
            if (pnext->ts >= pBlock->minKey.ts) {
              return true;
            }
          } else {
            if (pnext->ts >= pBlock->minKey.ts && pnext->version >= pBlock->minVer) {
              return true;
            }
          }
        } else {  // it must be the last point
          ASSERT(p->version == 0);
        }
      }
    } else {  // (p->ts > pBlock->maxKey.ts): no later point can overlap the block
      return false;
    }
  }

  return false;
}

/* Delete-skyline overlap check starting at the table's current file delete index. */
static bool doCheckforDatablockOverlap(STableBlockScanInfo* pBlockScanInfo, const SDataBlk* pBlock) {
  return doCheckforDatablockOverlapFrom(pBlockScanInfo, pBlock, pBlockScanInfo->fileDelIndex);
}

/*
 * Report whether the table's delete skyline may remove any row of pBlock.
 * Returns false when there is no (or an empty) skyline, or when the skyline's time span misses the block entirely.
 */
static bool overlapWithDelSkyline(STableBlockScanInfo* pBlockScanInfo, const SDataBlk* pBlock, int32_t order) {
  if (pBlockScanInfo->delSkyline == NULL) {
    return false;
  }

  int32_t num = (int32_t)taosArrayGetSize(pBlockScanInfo->delSkyline);
  if (num == 0) {
    return false;
  }

  // ts is not overlap
  TSDBKEY* pFirst = taosArrayGet(pBlockScanInfo->delSkyline, 0);
  TSDBKEY* pLast = taosArrayGetLast(pBlockScanInfo->delSkyline);
  if (pBlock->minKey.ts > pLast->ts || pBlock->maxKey.ts < pFirst->ts) {
    return false;
  }

  // version is not overlap
  if (ASCENDING_TRAVERSE(order)) {
    return doCheckforDatablockOverlap(pBlockScanInfo, pBlock);
  }

  // desc: step back from the current delete index to the first point that is not after the block's minKey.
  // The forward check must start there; starting at fileDelIndex could skip delete ranges that begin before
  // the block but reach into it.
  int32_t index = pBlockScanInfo->fileDelIndex;
  if (index >= num) {
    index = num - 1;
  }
  if (index < 0) {
    index = 0;
  }

  while (index > 0) {
    TSDBKEY* p = taosArrayGet(pBlockScanInfo->delSkyline, index);
    if (p->ts <= pBlock->minKey.ts) {
      break;
    }
    index -= 1;
  }

  return doCheckforDatablockOverlapFrom(pBlockScanInfo, pBlock, index);
}

1151 1152 1153 1154
// 1. the version of all rows should be less than the endVersion
// 2. current block should not overlap with next neighbor block
// 3. current timestamp should not be overlap with each other
// 4. output buffer should be large enough to hold all rows in current block
1155
// 5. delete info should not overlap with current block data
H
Hongze Cheng 已提交
1156
static bool fileBlockShouldLoad(STsdbReader* pReader, SFileDataBlockInfo* pFBlock, SDataBlk* pBlock,
1157
                                STableBlockScanInfo* pScanInfo, TSDBKEY key, SLastBlockReader* pLastBlockReader) {
H
Hongze Cheng 已提交
1158 1159
  int32_t   neighborIndex = 0;
  SDataBlk* pNeighbor = getNeighborBlockOfSameTable(pFBlock, pScanInfo, &neighborIndex, pReader->order);
1160

1161
  // overlap with neighbor
1162 1163 1164
  bool overlapWithNeighbor = false;
  if (pNeighbor) {
    overlapWithNeighbor = overlapWithNeighborBlock(pBlock, pNeighbor, pReader->order);
1165
    taosMemoryFree(pNeighbor);
1166 1167
  }

1168
  // has duplicated ts of different version in this block
L
Liu Jicong 已提交
1169 1170
  bool hasDup = (pBlock->nSubBlock == 1) ? pBlock->hasDup : true;
  bool overlapWithDel = overlapWithDelSkyline(pScanInfo, pBlock, pReader->order);
1171

1172
  // todo here we need to each key in the last files to identify if it is really overlapped with last block
1173
  // todo
1174
  bool overlapWithlastBlock = false;
1175
#if 0
H
Hongze Cheng 已提交
1176
  if (taosArrayGetSize(pLastBlockReader->pSstBlk) > 0 && (pLastBlockReader->currentBlockIndex != -1)) {
H
Hongze Cheng 已提交
1177
    SSttBlk* pSstBlk = taosArrayGet(pLastBlockReader->pSstBlk, pLastBlockReader->currentBlockIndex);
H
Hongze Cheng 已提交
1178
    overlapWithlastBlock = !(pBlock->maxKey.ts < pSstBlk->minKey || pBlock->minKey.ts > pSstBlk->maxKey);
1179
  }
1180
#endif
1181

1182 1183 1184 1185 1186 1187 1188 1189 1190 1191
  bool moreThanOutputCapacity = pBlock->nRow > pReader->capacity;
  bool partiallyRequired = dataBlockPartiallyRequired(&pReader->window, &pReader->verRange, pBlock);
  bool overlapWithKey = keyOverlapFileBlock(key, pBlock, &pReader->verRange);

  bool loadDataBlock = (overlapWithNeighbor || hasDup || partiallyRequired || overlapWithKey ||
                        moreThanOutputCapacity || overlapWithDel || overlapWithlastBlock);

  // log the reason why load the datablock for profile
  if (loadDataBlock) {
    tsdbDebug("%p uid:%" PRIu64
1192
              " need to load the datablock, overlapwithneighborblock:%d, hasDup:%d, partiallyRequired:%d, "
1193 1194 1195 1196 1197 1198
              "overlapWithKey:%d, greaterThanBuf:%d, overlapWithDel:%d, overlapWithlastBlock:%d, %s",
              pReader, pFBlock->uid, overlapWithNeighbor, hasDup, partiallyRequired, overlapWithKey,
              moreThanOutputCapacity, overlapWithDel, overlapWithlastBlock, pReader->idStr);
  }

  return loadDataBlock;
H
Haojun Liao 已提交
1199 1200
}

1201
/*
 * Fill pReader->pResBlock with rows from the mem/imem iterators of one table, up to endKey and the reader capacity.
 *
 * Does nothing when neither iterator has a value. Otherwise the result block's time window and uid are updated
 * and the block is flagged as composed, even if buildDataBlockFromBufImpl reports an error. Its code is returned.
 */
static int32_t buildDataBlockFromBuf(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo, int64_t endKey) {
  bool hasBufferedRows = pBlockScanInfo->iiter.hasVal || pBlockScanInfo->iter.hasVal;
  if (!hasBufferedRows) {
    return TSDB_CODE_SUCCESS;
  }

  SSDataBlock* pResBlock = pReader->pResBlock;
  int64_t      startUs = taosGetTimestampUs();

  int32_t code = buildDataBlockFromBufImpl(pBlockScanInfo, endKey, pReader->capacity, pReader);

  blockDataUpdateTsWindow(pResBlock, 0);
  pResBlock->info.uid = pBlockScanInfo->uid;
  setComposedBlockFlag(pReader, true);

  double elapsedMs = (taosGetTimestampUs() - startUs) / 1000.0;
  tsdbDebug("%p build data block from cache completed, elapsed time:%.2f ms, numOfRows:%d, brange:%" PRId64
            " - %" PRId64 " %s",
            pReader, elapsedMs, pResBlock->info.rows, pResBlock->info.window.skey, pResBlock->info.window.ekey,
            pReader->idStr);

  pReader->cost.buildmemBlock += elapsedMs;
  return code;
}

1226 1227
/*
 * Fast path: copy the current file-block row straight into the result block when no merge can be needed.
 *
 * That requires the row not to be the last one in scan direction, and the next row in scan direction to have a
 * timestamp different from `key`. On success the dump cursor advances by one step and true is returned.
 * Otherwise nothing is changed and the caller has to merge.
 */
static bool tryCopyDistinctRowFromFileBlock(STsdbReader* pReader, SBlockData* pBlockData, int64_t key,
                                            SFileBlockDumpInfo* pDumpInfo) {
  bool notLastInAsc = (pReader->order == TSDB_ORDER_ASC) && (pDumpInfo->rowIndex < pDumpInfo->totalRows - 1);
  bool notLastInDesc = (pReader->order == TSDB_ORDER_DESC) && (pDumpInfo->rowIndex > 0);
  if (!notLastInAsc && !notLastInDesc) {
    return false;  // border row: a duplicate may live in the next block
  }

  int32_t step = (pReader->order == TSDB_ORDER_ASC) ? 1 : -1;
  if (pBlockData->aTSKEY[pDumpInfo->rowIndex + step] == key) {
    return false;  // duplicated timestamp follows: merge is needed
  }

  doAppendRowFromFileBlock(pReader->pResBlock, pReader, pBlockData, pDumpInfo->rowIndex);
  pDumpInfo->rowIndex += step;
  return true;
}

1246 1247 1248 1249 1250 1251 1252 1253 1254 1255 1256 1257 1258 1259 1260 1261 1262 1263 1264 1265 1266 1267 1268 1269 1270 1271 1272 1273 1274 1275 1276 1277
/*
 * Advance the last-block merge tree to the next row that has not been removed by the table's delete skyline.
 * Returns false once the merge tree is exhausted.
 */
static bool nextRowFromLastBlocks(SLastBlockReader* pLastBlockReader, STableBlockScanInfo* pBlockScanInfo) {
  while (tMergeTreeNext(&pLastBlockReader->mergeTree)) {
    TSDBROW candidate = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
    TSDBKEY rowKey = TSDBROW_KEY(&candidate);

    bool dropped =
        hasBeenDropped(pBlockScanInfo->delSkyline, &pBlockScanInfo->lastBlockDelIndex, &rowKey, pLastBlockReader->order);
    if (!dropped) {
      return true;
    }
  }

  return false;
}

/*
 * Advance the last-block reader past fRow and copy fRow into the result block when no other last-block row
 * shares its timestamp `ts`. That means the reader is exhausted, or the next row has a different timestamp.
 * Returns false when the next row has the same timestamp. The reader has then already moved to that row,
 * and the caller has to merge.
 */
static bool tryCopyDistinctRowFromSttBlock(TSDBROW* fRow, SLastBlockReader* pLastBlockReader,
                                           STableBlockScanInfo* pScanInfo, int64_t ts, STsdbReader* pReader) {
  bool isDistinct =
      !nextRowFromLastBlocks(pLastBlockReader, pScanInfo) || (getCurrentKeyInLastBlock(pLastBlockReader) != ts);
  if (!isDistinct) {
    return false;
  }

  doAppendRowFromFileBlock(pReader->pResBlock, pReader, fRow->pBlockData, fRow->iRow);
  return true;
}

H
Haojun Liao 已提交
1278 1279 1280 1281 1282 1283
/*
 * Return the table schema matching `sversion` for rows of table `uid`.
 *
 * pReader->pSchema caches the newest schema. It is fetched lazily with version -1 and returned when its version
 * matches. pReader->pMemSchema caches one other version, and it is replaced when a different version is requested.
 * Returns NULL (after logging) if the requested schema version cannot be fetched.
 */
static FORCE_INLINE STSchema* doGetSchemaForTSRow(int32_t sversion, STsdbReader* pReader, uint64_t uid) {
  // always set the newest schema version in pReader->pSchema
  if (pReader->pSchema == NULL) {
    pReader->pSchema = metaGetTbTSchema(pReader->pTsdb->pVnode->pMeta, uid, -1);
  }

  if (pReader->pSchema != NULL && sversion == pReader->pSchema->version) {
    return pReader->pSchema;
  }

  if (pReader->pMemSchema != NULL && pReader->pMemSchema->version == sversion) {
    return pReader->pMemSchema;
  }

  // drop the cached schema of another version; clear the pointer so that a failed fetch cannot leave it dangling
  taosMemoryFreeClear(pReader->pMemSchema);

  int32_t code = metaGetTbTSchemaEx(pReader->pTsdb->pVnode->pMeta, pReader->suid, uid, sversion, &pReader->pMemSchema);
  if (code != TSDB_CODE_SUCCESS) {
    tsdbError("%p failed to get table schema, uid:%" PRIu64 ", sversion:%d, code:%s %s", pReader, uid, sversion,
              tstrerror(code), pReader->idStr);
    return NULL;
  }

  return pReader->pMemSchema;
}

1303
static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo, TSDBROW* pRow,
1304 1305 1306 1307 1308 1309
                                     SIterInfo* pIter, int64_t key, SLastBlockReader* pLastBlockReader) {
  SRowMerger          merge = {0};
  STSRow*             pTSRow = NULL;
  SBlockData*         pBlockData = &pReader->status.fileBlockData;
  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;

1310
  int64_t tsLast = INT64_MIN;
1311
  if (hasDataInLastBlock(pLastBlockReader)) {
1312 1313
    tsLast = getCurrentKeyInLastBlock(pLastBlockReader);
  }
1314

H
Hongze Cheng 已提交
1315 1316
  TSDBKEY k = TSDBROW_KEY(pRow);
  TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
1317

1318 1319
  int64_t minKey = 0;
  if (pReader->order == TSDB_ORDER_ASC) {
H
Hongze Cheng 已提交
1320
    minKey = INT64_MAX;  // chosen the minimum value
1321
    if (minKey > tsLast && hasDataInLastBlock(pLastBlockReader)) {
1322 1323
      minKey = tsLast;
    }
1324

1325 1326 1327
    if (minKey > k.ts) {
      minKey = k.ts;
    }
1328

1329 1330 1331 1332 1333
    if (minKey > key && pBlockData->nRow > 0) {
      minKey = key;
    }
  } else {
    minKey = INT64_MIN;
1334
    if (minKey < tsLast && hasDataInLastBlock(pLastBlockReader)) {
1335 1336 1337 1338 1339 1340 1341 1342 1343 1344
      minKey = tsLast;
    }

    if (minKey < k.ts) {
      minKey = k.ts;
    }

    if (minKey < key && pBlockData->nRow > 0) {
      minKey = key;
    }
1345 1346 1347 1348
  }

  bool init = false;

1349
  // ASC: file block ---> last block -----> imem -----> mem
H
Hongze Cheng 已提交
1350
  // DESC: mem -----> imem -----> last block -----> file block
1351 1352
  if (pReader->order == TSDB_ORDER_ASC) {
    if (minKey == key) {
1353
      init = true;
1354 1355
      tRowMergerInit(&merge, &fRow, pReader->pSchema);
      doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
1356 1357
    }

1358
    if (minKey == tsLast) {
1359
      TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
H
Haojun Liao 已提交
1360 1361 1362
      if (init) {
        tRowMerge(&merge, &fRow1);
      } else {
1363 1364 1365
        init = true;
        tRowMergerInit(&merge, &fRow1, pReader->pSchema);
      }
H
Haojun Liao 已提交
1366
      doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, &merge);
1367
    }
1368

1369
    if (minKey == k.ts) {
H
Haojun Liao 已提交
1370 1371 1372
      if (init) {
        tRowMerge(&merge, pRow);
      } else {
1373 1374 1375 1376 1377 1378 1379 1380 1381
        init = true;
        STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid);
        tRowMergerInit(&merge, pRow, pSchema);
      }
      doMergeRowsInBuf(pIter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge, pReader);
    }
  } else {
    if (minKey == k.ts) {
      init = true;
1382 1383
      STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid);
      tRowMergerInit(&merge, pRow, pSchema);
1384
      doMergeRowsInBuf(pIter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge, pReader);
1385 1386
    }

1387
    if (minKey == tsLast) {
1388
      TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
H
Haojun Liao 已提交
1389 1390 1391
      if (init) {
        tRowMerge(&merge, &fRow1);
      } else {
1392 1393 1394
        init = true;
        tRowMergerInit(&merge, &fRow1, pReader->pSchema);
      }
H
Haojun Liao 已提交
1395
      doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, &merge);
1396 1397 1398
    }

    if (minKey == key) {
H
Haojun Liao 已提交
1399 1400 1401
      if (init) {
        tRowMerge(&merge, &fRow);
      } else {
1402 1403 1404 1405 1406
        init = true;
        tRowMergerInit(&merge, &fRow, pReader->pSchema);
      }
      doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
    }
1407 1408
  }

1409 1410 1411 1412 1413
  int32_t code = tRowMergerGetRow(&merge, &pTSRow);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

1414 1415 1416 1417 1418 1419 1420
  doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo->uid);

  taosMemoryFree(pTSRow);
  tRowMergerClear(&merge);
  return TSDB_CODE_SUCCESS;
}

1421 1422 1423
/*
 * Emit one row for the current timestamp of the last (stt) blocks.
 *
 * Last blocks only: this applies when mergeBlockData is false, or when the file block's current timestamp differs
 * from the last blocks' timestamp. The current last-block row is copied directly if the next last-block row has
 * another timestamp. Otherwise all last-block rows with that timestamp are merged into one row.
 *
 * Both sources: when mergeBlockData is true and the two timestamps are equal, the last-block rows and the
 * file-block rows with that timestamp are merged into one row.
 *
 * Returns TSDB_CODE_SUCCESS, or the error reported while building the merged row.
 */
static int32_t doMergeFileBlockAndLastBlock(SLastBlockReader* pLastBlockReader, STsdbReader* pReader,
                                            STableBlockScanInfo* pBlockScanInfo, SBlockData* pBlockData,
                                            bool mergeBlockData) {
  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
  int64_t             tsLastBlock = getCurrentKeyInLastBlock(pLastBlockReader);

  STSRow*    pTSRow = NULL;
  SRowMerger merge = {0};
  TSDBROW    fRow = tMergeTreeGetRow(&pLastBlockReader->mergeTree);

  bool sameTsInFileBlock = mergeBlockData && (tsLastBlock == pBlockData->aTSKEY[pDumpInfo->rowIndex]);

  if (!sameTsInFileBlock) {
    // only the last blocks hold this timestamp
    if (tryCopyDistinctRowFromSttBlock(&fRow, pLastBlockReader, pBlockScanInfo, tsLastBlock, pReader)) {
      return TSDB_CODE_SUCCESS;
    }

    // the reader has already moved to the next last-block row, which shares the timestamp
    tRowMergerInit(&merge, &fRow, pReader->pSchema);

    TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
    tRowMerge(&merge, &fRow1);
    doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLastBlock, &merge);
  } else {
    // both the last blocks and the file block hold this timestamp: merge them into one row
    tRowMergerInit(&merge, &fRow, pReader->pSchema);
    doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLastBlock, &merge);

    if (tsLastBlock == pBlockData->aTSKEY[pDumpInfo->rowIndex]) {
      doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
    }
  }

  int32_t code = tRowMergerGetRow(&merge, &pTSRow);
  if (code != TSDB_CODE_SUCCESS) {
    tRowMergerClear(&merge);  // do not leak the merger on failure
    return code;
  }

  doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo->uid);

  taosMemoryFree(pTSRow);
  tRowMergerClear(&merge);
  return TSDB_CODE_SUCCESS;
}

1476 1477
/*
 * Produce the next row of a table when neither mem nor imem holds a candidate row, i.e. only the loaded file data
 * block and/or the last (stt) block reader can supply data.
 *
 * pReader          - reader; the produced row is appended to pReader->pResBlock
 * pLastBlockReader - cursor over the last (stt) blocks of the current table
 * key              - timestamp of the current row of pBlockData (meaningful only when pBlockData->nRow > 0)
 * pBlockScanInfo   - scan state of the table
 * pBlockData       - loaded file data block; nRow == 0 means only the last block has data
 *
 * Returns TSDB_CODE_SUCCESS or the error reported while building the merged row.
 */
static int32_t mergeFileBlockAndLastBlock(STsdbReader* pReader, SLastBlockReader* pLastBlockReader, int64_t key,
                                          STableBlockScanInfo* pBlockScanInfo, SBlockData* pBlockData) {
  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;

  if (pBlockData->nRow > 0) {
    // no last block available, only data block exists
    if (!hasDataInLastBlock(pLastBlockReader)) {
      return mergeRowsInFileBlocks(pBlockData, pBlockScanInfo, key, pReader);
    }

    // row in last file block
    TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
    int64_t ts = getCurrentKeyInLastBlock(pLastBlockReader);
    ASSERT(ts >= key);

    if (ASCENDING_TRAVERSE(pReader->order)) {
      if (key < ts) {  // imem, mem are all empty, file blocks (data blocks and last block) exist
        return mergeRowsInFileBlocks(pBlockData, pBlockScanInfo, key, pReader);
      } else if (key == ts) {
        STSRow*    pTSRow = NULL;
        SRowMerger merge = {0};

        // same timestamp in both sources: data-block rows are merged first, then the last-block rows
        tRowMergerInit(&merge, &fRow, pReader->pSchema);
        doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);

        TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
        tRowMerge(&merge, &fRow1);
        doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, ts, &merge);

        int32_t code = tRowMergerGetRow(&merge, &pTSRow);
        if (code != TSDB_CODE_SUCCESS) {
          // clear the initialized merger on the error path as well, mirroring the success path below
          tRowMergerClear(&merge);
          return code;
        }

        doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo->uid);

        taosMemoryFree(pTSRow);
        tRowMergerClear(&merge);
        return code;
      } else {
        ASSERT(0);
        return TSDB_CODE_SUCCESS;
      }
    } else {  // desc order
      return doMergeFileBlockAndLastBlock(pLastBlockReader, pReader, pBlockScanInfo, pBlockData, true);
    }
  } else {  // only last block exists
    return doMergeFileBlockAndLastBlock(pLastBlockReader, pReader, pBlockScanInfo, NULL, false);
  }
}

1528 1529
/*
 * Merge the rows that share the next key (in scan order) across all four row sources of one table:
 * mem, imem, the loaded file data block and the last (stt) block reader. mem and imem are both expected to hold
 * a valid row (asserted below); the file data block and the last block may already be exhausted.
 *
 * Sources holding the chosen key are fed to the row merger in a fixed order:
 *   ASC:  file block -> last block -> imem -> mem
 *   DESC: mem -> imem -> last block -> file block
 *
 * On success exactly one merged row is appended to pReader->pResBlock.
 * Returns TSDB_CODE_SUCCESS or the error reported while building the merged row.
 */
static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo, SBlockData* pBlockData,
                                     SLastBlockReader* pLastBlockReader) {
  SRowMerger merge = {0};
  STSRow*    pTSRow = NULL;

  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
  SArray*             pDelList = pBlockScanInfo->delSkyline;

  TSDBROW* pRow = getValidMemRow(&pBlockScanInfo->iter, pDelList, pReader);
  TSDBROW* piRow = getValidMemRow(&pBlockScanInfo->iiter, pDelList, pReader);
  ASSERT(pRow != NULL && piRow != NULL);

  // The file data block and the last block may be exhausted. Their keys are only read, and they only take part in
  // the merge, when they still hold a row: reading aTSKEY of an empty data block is an out-of-bounds access.
  bool    hasFileRow = (pBlockData->nRow > 0);
  bool    hasLastRow = hasDataInLastBlock(pLastBlockReader);
  int64_t key = hasFileRow ? pBlockData->aTSKEY[pDumpInfo->rowIndex] : INT64_MIN;
  int64_t tsLast = hasLastRow ? getCurrentKeyInLastBlock(pLastBlockReader) : INT64_MIN;

  TSDBKEY k = TSDBROW_KEY(pRow);
  TSDBKEY ik = TSDBROW_KEY(piRow);

  // minKey is the next key in scan order: the smallest key for ASC, the largest for DESC
  int64_t minKey = 0;
  if (ASCENDING_TRAVERSE(pReader->order)) {
    minKey = INT64_MAX;
    if (minKey > k.ts) {
      minKey = k.ts;
    }

    if (minKey > ik.ts) {
      minKey = ik.ts;
    }

    if (hasFileRow && minKey > key) {
      minKey = key;
    }

    if (hasLastRow && minKey > tsLast) {
      minKey = tsLast;
    }
  } else {
    minKey = INT64_MIN;
    if (minKey < k.ts) {
      minKey = k.ts;
    }

    if (minKey < ik.ts) {
      minKey = ik.ts;
    }

    if (hasFileRow && minKey < key) {
      minKey = key;
    }

    if (hasLastRow && minKey < tsLast) {
      minKey = tsLast;
    }
  }

  // init tells whether the merger already holds a row; the first contributing source initializes it
  bool init = false;

  // ASC: file block -----> last block -----> imem -----> mem
  // DESC: mem -----> imem -----> last block -----> file block
  if (ASCENDING_TRAVERSE(pReader->order)) {
    if (hasFileRow && minKey == key) {
      init = true;
      TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
      tRowMergerInit(&merge, &fRow, pReader->pSchema);
      doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
    }

    if (hasLastRow && minKey == tsLast) {
      TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
      if (init) {
        tRowMerge(&merge, &fRow1);
      } else {
        init = true;
        tRowMergerInit(&merge, &fRow1, pReader->pSchema);
      }
      doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, &merge);
    }

    if (minKey == ik.ts) {
      if (init) {
        tRowMerge(&merge, piRow);
      } else {
        init = true;
        STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(piRow), pReader, pBlockScanInfo->uid);
        tRowMergerInit(&merge, piRow, pSchema);
      }
      doMergeRowsInBuf(&pBlockScanInfo->iiter, pBlockScanInfo->uid, ik.ts, pBlockScanInfo->delSkyline, &merge, pReader);
    }

    if (minKey == k.ts) {
      if (init) {
        tRowMerge(&merge, pRow);
      } else {
        init = true;
        STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid);
        tRowMergerInit(&merge, pRow, pSchema);
      }
      doMergeRowsInBuf(&pBlockScanInfo->iter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge, pReader);
    }
  } else {
    if (minKey == k.ts) {
      init = true;
      STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid);
      tRowMergerInit(&merge, pRow, pSchema);
      doMergeRowsInBuf(&pBlockScanInfo->iter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge, pReader);
    }

    if (minKey == ik.ts) {
      if (init) {
        tRowMerge(&merge, piRow);
      } else {
        init = true;
        STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(piRow), pReader, pBlockScanInfo->uid);
        tRowMergerInit(&merge, piRow, pSchema);
      }
      doMergeRowsInBuf(&pBlockScanInfo->iiter, pBlockScanInfo->uid, ik.ts, pBlockScanInfo->delSkyline, &merge, pReader);
    }

    if (hasLastRow && minKey == tsLast) {
      TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
      if (init) {
        tRowMerge(&merge, &fRow1);
      } else {
        init = true;
        tRowMergerInit(&merge, &fRow1, pReader->pSchema);
      }
      doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, &merge);
    }

    if (hasFileRow && minKey == key) {
      TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
      if (init) {
        tRowMerge(&merge, &fRow);
      } else {
        init = true;
        tRowMergerInit(&merge, &fRow, pReader->pSchema);
      }
      doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
    }
  }

  int32_t code = tRowMergerGetRow(&merge, &pTSRow);
  if (code != TSDB_CODE_SUCCESS) {
    // clear the initialized merger on the error path as well, mirroring the success path below
    tRowMergerClear(&merge);
    return code;
  }

  doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo->uid);

  taosMemoryFree(pTSRow);
  tRowMergerClear(&merge);
  return code;
}

1683
// A disabled (#if 0) doMergeThreeLevelRows used to live here. It was never compiled, its job (merging mem, imem
// and file-block rows) is covered by doMergeMultiLevelRows, and it carried latent defects (e.g. its
// "key == ik.ts" descending branch merged into an SRowMerger that was never initialized). It was removed; see the
// version-control history if the old logic is needed.
1827

1828 1829 1830 1831 1832 1833 1834 1835 1836 1837 1838 1839 1840 1841 1842 1843 1844 1845 1846 1847 1848 1849 1850 1851 1852
/*
 * Lazily create the mem/imem skip-list iterators and the delete skyline of one table.
 * The iterators start at the query-window boundary that matches the scan order:
 * (window.skey, verRange.minVer) for ASC, (window.ekey, verRange.maxVer) for DESC.
 * Runs at most once per table; pBlockScanInfo->iterInit guards repeated calls.
 *
 * Returns TSDB_CODE_SUCCESS, the error from creating a mem/imem iterator, or the error from building the
 * delete skyline.
 */
static int32_t initMemDataIterator(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader) {
  if (pBlockScanInfo->iterInit) {
    return TSDB_CODE_SUCCESS;
  }

  int32_t code = TSDB_CODE_SUCCESS;

  TSDBKEY startKey = {0};
  if (ASCENDING_TRAVERSE(pReader->order)) {
    startKey = (TSDBKEY){.ts = pReader->window.skey, .version = pReader->verRange.minVer};
  } else {
    startKey = (TSDBKEY){.ts = pReader->window.ekey, .version = pReader->verRange.maxVer};
  }

  int32_t backward = (!ASCENDING_TRAVERSE(pReader->order));

  STbData* d = NULL;
  if (pReader->pReadSnap->pMem != NULL) {
    d = tsdbGetTbDataFromMemTable(pReader->pReadSnap->pMem, pReader->suid, pBlockScanInfo->uid);
    if (d != NULL) {
      code = tsdbTbDataIterCreate(d, &startKey, backward, &pBlockScanInfo->iter.iter);
      if (code == TSDB_CODE_SUCCESS) {
        pBlockScanInfo->iter.hasVal = (tsdbTbDataIterGet(pBlockScanInfo->iter.iter) != NULL);

        tsdbDebug("%p uid:%" PRIu64 ", check data in mem from skey:%" PRId64 ", order:%d, ts range in buf:%" PRId64
                  "-%" PRId64 " %s",
                  pReader, pBlockScanInfo->uid, startKey.ts, pReader->order, d->minKey, d->maxKey, pReader->idStr);
      } else {
        tsdbError("%p uid:%" PRIu64 ", failed to create iterator for mem, code:%s, %s", pReader, pBlockScanInfo->uid,
                  tstrerror(code), pReader->idStr);
        return code;
      }
    }
  } else {
    tsdbDebug("%p uid:%" PRIu64 ", no data in mem, %s", pReader, pBlockScanInfo->uid, pReader->idStr);
  }

  STbData* di = NULL;
  if (pReader->pReadSnap->pIMem != NULL) {
    di = tsdbGetTbDataFromMemTable(pReader->pReadSnap->pIMem, pReader->suid, pBlockScanInfo->uid);
    if (di != NULL) {
      code = tsdbTbDataIterCreate(di, &startKey, backward, &pBlockScanInfo->iiter.iter);
      if (code == TSDB_CODE_SUCCESS) {
        pBlockScanInfo->iiter.hasVal = (tsdbTbDataIterGet(pBlockScanInfo->iiter.iter) != NULL);

        tsdbDebug("%p uid:%" PRIu64 ", check data in imem from skey:%" PRId64 ", order:%d, ts range in buf:%" PRId64
                  "-%" PRId64 " %s",
                  pReader, pBlockScanInfo->uid, startKey.ts, pReader->order, di->minKey, di->maxKey, pReader->idStr);
      } else {
        tsdbError("%p uid:%" PRIu64 ", failed to create iterator for imem, code:%s, %s", pReader, pBlockScanInfo->uid,
                  tstrerror(code), pReader->idStr);
        return code;
      }
    }
  } else {
    tsdbDebug("%p uid:%" PRIu64 ", no data in imem, %s", pReader, pBlockScanInfo->uid, pReader->idStr);
  }

  code = initDelSkylineIterator(pBlockScanInfo, pReader, d, di);

  // The iterators above now exist and must not be created again, so mark the table initialized even when the
  // delete skyline could not be built; the error is still reported so the caller can stop the query instead of
  // silently returning rows that should have been deleted.
  pBlockScanInfo->iterInit = true;

  if (code != TSDB_CODE_SUCCESS) {
    tsdbError("%p uid:%" PRIu64 ", failed to init delete skyline, code:%s, %s", pReader, pBlockScanInfo->uid,
              tstrerror(code), pReader->idStr);
  }

  return code;
}

dengyihao's avatar
dengyihao 已提交
1892 1893
/*
 * Check whether the file-block row at pDumpInfo->rowIndex should be returned for the table in pBlockScanInfo.
 * The row must:
 *   - belong to this table (checked only for multi-table blocks, where aUid is present),
 *   - have a version inside [verRange.minVer, verRange.maxVer],
 *   - have a timestamp inside [window.skey, window.ekey],
 *   - not be covered by the table's delete skyline.
 */
static bool isValidFileBlockRow(SBlockData* pBlockData, SFileBlockDumpInfo* pDumpInfo,
                                STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader) {
  // a non-NULL aUid means the block mixes rows of several tables
  if (pBlockData->aUid != NULL) {
    uint64_t rowUid = pBlockData->aUid[pDumpInfo->rowIndex];
    if (rowUid != pBlockScanInfo->uid) {
      return false;
    }
  }

  int64_t rowVer = pBlockData->aVersion[pDumpInfo->rowIndex];
  bool    verInRange = (rowVer >= pReader->verRange.minVer) && (rowVer <= pReader->verRange.maxVer);
  if (!verInRange) {
    return false;
  }

  int64_t rowTs = pBlockData->aTSKEY[pDumpInfo->rowIndex];
  bool    tsInRange = (rowTs >= pReader->window.skey) && (rowTs <= pReader->window.ekey);
  if (!tsInRange) {
    return false;
  }

  TSDBKEY rowKey = {.ts = rowTs, .version = rowVer};
  bool    dropped = hasBeenDropped(pBlockScanInfo->delSkyline, &pBlockScanInfo->fileDelIndex, &rowKey, pReader->order);
  return !dropped;
}

1921
// True when ts falls outside the closed interval [pWindow->skey, pWindow->ekey].
static bool outOfTimeWindow(int64_t ts, STimeWindow* pWindow) {
  bool inside = (ts >= pWindow->skey) && (ts <= pWindow->ekey);
  return !inside;
}
1922

1923
/*
 * Position the last (stt) block reader on the table described by pScanInfo.
 *
 * If the reader already serves this table, nothing happens. Otherwise the following steps run:
 *   - the merge tree of the previously served table (if any) is closed;
 *   - this table's in-memory iterators are initialized;
 *   - a merge tree is opened over the reader window, with the start (ASC) or end (DESC) moved one step past
 *     pScanInfo->lastKey in scan direction.
 *
 * Returns false if the merge tree cannot be opened; otherwise returns the result of nextRowFromLastBlocks.
 */
static bool initLastBlockReader(SLastBlockReader* pLBlockReader, STableBlockScanInfo* pScanInfo, STsdbReader* pReader) {
  bool sameTable = (pLBlockReader->uid == pScanInfo->uid);
  if (sameTable) {
    return true;
  }

  // a non-zero uid means a merge tree for another table is still open
  if (pLBlockReader->uid != 0) {
    tMergeTreeClose(&pLBlockReader->mergeTree);
  }

  initMemDataIterator(pScanInfo, pReader);
  pLBlockReader->uid = pScanInfo->uid;

  STimeWindow range = pLBlockReader->window;
  if (!ASCENDING_TRAVERSE(pLBlockReader->order)) {
    range.ekey = pScanInfo->lastKey - 1;
  } else {
    range.skey = pScanInfo->lastKey + 1;
  }

  bool    backward = (pLBlockReader->order == TSDB_ORDER_DESC);
  int32_t code = tMergeTreeOpen(&pLBlockReader->mergeTree, backward, pReader->pFileReader, pReader->suid,
                                pScanInfo->uid, &range, &pLBlockReader->verRange, pLBlockReader->pInfo, pReader->idStr);
  if (code == TSDB_CODE_SUCCESS) {
    return nextRowFromLastBlocks(pLBlockReader, pScanInfo);
  }

  return false;
}

1954
// Timestamp of the row the last-block merge tree currently points at.
static int64_t getCurrentKeyInLastBlock(SLastBlockReader* pLastBlockReader) {
  TSDBROW current = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
  int64_t ts = TSDBROW_TS(&current);
  return ts;
}

H
Hongze Cheng 已提交
1959
// The last-block merge tree has data as long as its current iterator (mergeTree.pIter) is set.
static bool hasDataInLastBlock(SLastBlockReader* pLastBlockReader) {
  bool hasIter = (pLastBlockReader->mergeTree.pIter != NULL);
  return hasIter;
}
1960

1961 1962
/*
 * Emit the file data block row at pDumpInfo->rowIndex for the case where only the file data block supplies a row
 * with timestamp key.
 *
 * tryCopyDistinctRowFromFileBlock is tried first; when it returns true, nothing else is done. Otherwise a row merger
 * is seeded with the current row, doMergeRowsInFileBlocks feeds it, and the merged row is appended to
 * pReader->pResBlock.
 *
 * Returns TSDB_CODE_SUCCESS or the error from building the merged row.
 */
int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pBlockScanInfo, int64_t key,
                              STsdbReader* pReader) {
  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;

  // captured before the copy attempt; the merge below is seeded with this row
  TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);

  if (tryCopyDistinctRowFromFileBlock(pReader, pBlockData, key, pDumpInfo)) {
    return TSDB_CODE_SUCCESS;
  }

  STSRow*    pTSRow = NULL;
  SRowMerger merge = {0};

  tRowMergerInit(&merge, &fRow, pReader->pSchema);
  doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);

  int32_t code = tRowMergerGetRow(&merge, &pTSRow);
  if (code != TSDB_CODE_SUCCESS) {
    // clear the initialized merger on the error path as well, mirroring the success path below
    tRowMergerClear(&merge);
    return code;
  }

  doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo->uid);

  taosMemoryFree(pTSRow);
  tRowMergerClear(&merge);
  return TSDB_CODE_SUCCESS;
}

H
Haojun Liao 已提交
1988 1989
/*
 * Produce one merged row for the table by dispatching on which in-memory buffers currently hold a row:
 *   - both mem and imem -> doMergeMultiLevelRows (also considers the file block and the last block)
 *   - only imem         -> doMergeBufAndFileRows over imem
 *   - only mem          -> doMergeBufAndFileRows over mem
 *   - neither           -> mergeFileBlockAndLastBlock
 * The file-block key passed on is INT64_MIN when the loaded data block has no rows.
 */
static int32_t buildComposedDataBlockImpl(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo,
                                          SBlockData* pBlockData, SLastBlockReader* pLastBlockReader) {
  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;

  int64_t fileKey = INT64_MIN;
  if (pBlockData->nRow > 0) {
    fileKey = pBlockData->aTSKEY[pDumpInfo->rowIndex];
  }

  // both lookups happen before the hasVal flags are inspected, as before;
  // NOTE(review): getValidMemRow presumably refreshes hasVal — confirm
  TSDBROW* pMemRow = getValidMemRow(&pBlockScanInfo->iter, pBlockScanInfo->delSkyline, pReader);
  TSDBROW* pIMemRow = getValidMemRow(&pBlockScanInfo->iiter, pBlockScanInfo->delSkyline, pReader);

  bool memHasRow = pBlockScanInfo->iter.hasVal;
  bool imemHasRow = pBlockScanInfo->iiter.hasVal;

  if (memHasRow && imemHasRow) {
    return doMergeMultiLevelRows(pReader, pBlockScanInfo, pBlockData, pLastBlockReader);
  }

  if (imemHasRow) {  // imem + file block + last block
    return doMergeBufAndFileRows(pReader, pBlockScanInfo, pIMemRow, &pBlockScanInfo->iiter, fileKey, pLastBlockReader);
  }

  if (memHasRow) {  // mem + file block + last block
    return doMergeBufAndFileRows(pReader, pBlockScanInfo, pMemRow, &pBlockScanInfo->iter, fileKey, pLastBlockReader);
  }

  // file data block + last block only
  return mergeFileBlockAndLastBlock(pReader, pLastBlockReader, fileKey, pBlockScanInfo, pBlockData);
}

2014
/*
 * Fill pReader->pResBlock with merged rows of a single table, combining the current file data block, the last (stt)
 * blocks and the mem/imem buffers.
 *
 * The table is taken from the current data block when there is one, otherwise from pReader->status.pTableIter.
 * Rows are produced one at a time until one of these happens:
 *   - neither the file block nor the last block has data;
 *   - the loaded file data block is fully consumed;
 *   - the result block reaches pReader->capacity.
 *
 * Returns TSDB_CODE_SUCCESS, or the first error reported while producing a row.
 */
static int32_t buildComposedDataBlock(STsdbReader* pReader) {
  SSDataBlock* pResBlock = pReader->pResBlock;

  SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(&pReader->status.blockIter);

  STableBlockScanInfo* pBlockScanInfo = NULL;
  if (pBlockInfo != NULL) {
    pBlockScanInfo = taosHashGet(pReader->status.pTableMap, &pBlockInfo->uid, sizeof(pBlockInfo->uid));
  } else {
    pBlockScanInfo = pReader->status.pTableIter;
  }

  SLastBlockReader*   pLastBlockReader = pReader->status.fileIter.pLastBlockReader;
  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
  SBlockData*         pBlockData = &pReader->status.fileBlockData;
  int32_t             step = ASCENDING_TRAVERSE(pReader->order) ? 1 : -1;

  int64_t st = taosGetTimestampUs();

  while (1) {
    // skip file-block rows that are not valid for this table (other table, out of range, deleted)
    bool hasBlockData = false;
    while (pBlockData->nRow > 0) {
      if (isValidFileBlockRow(pBlockData, pDumpInfo, pBlockScanInfo, pReader)) {
        hasBlockData = true;
        break;
      }

      pDumpInfo->rowIndex += step;

      SDataBlk* pBlock = getCurrentBlock(&pReader->status.blockIter);
      if (pDumpInfo->rowIndex >= pBlock->nRow || pDumpInfo->rowIndex < 0) {
        setBlockAllDumped(pDumpInfo, pBlock->maxKey.ts, pReader->order);
        break;
      }
    }

    bool hasBlockLData = hasDataInLastBlock(pLastBlockReader);

    // no data in last block and block, no need to proceed.
    if ((hasBlockData == false) && (hasBlockLData == false)) {
      break;
    }

    // NOTE(review): if the loop above ran off the end of the data block without finding a valid row, rowIndex is
    // out of range here, yet buildComposedDataBlockImpl still reads the file block because nRow > 0 — confirm.
    int32_t code = buildComposedDataBlockImpl(pReader, pBlockScanInfo, pBlockData, pLastBlockReader);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    // currently loaded file data block is consumed
    if ((pBlockData->nRow > 0) && (pDumpInfo->rowIndex >= pBlockData->nRow || pDumpInfo->rowIndex < 0)) {
      SDataBlk* pBlock = getCurrentBlock(&pReader->status.blockIter);
      setBlockAllDumped(pDumpInfo, pBlock->maxKey.ts, pReader->order);
      break;
    }

    if (pResBlock->info.rows >= pReader->capacity) {
      break;
    }
  }

  pResBlock->info.uid = pBlockScanInfo->uid;
  blockDataUpdateTsWindow(pResBlock, 0);

  setComposedBlockFlag(pReader, true);
  double el = (taosGetTimestampUs() - st) / 1000.0;

  pReader->cost.composedBlocks += 1;
  pReader->cost.buildComposedBlockTime += el;

  if (pResBlock->info.rows > 0) {
    // the window keys are signed 64-bit timestamps, hence PRId64
    tsdbDebug("%p uid:%" PRIu64 ", composed data block created, brange:%" PRId64 "-%" PRId64
              " rows:%d, elapsed time:%.2f ms %s",
              pReader, pBlockScanInfo->uid, pResBlock->info.window.skey, pResBlock->info.window.ekey,
              pResBlock->info.rows, el, pReader->idStr);
  }

  return TSDB_CODE_SUCCESS;
}

// Record in pReader->status.composedDataBlock whether the current result block was composed by merging.
void setComposedBlockFlag(STsdbReader* pReader, bool composed) {
  SReaderStatus* pStatus = &pReader->status;
  pStatus->composedDataBlock = composed;
}

dengyihao's avatar
dengyihao 已提交
2095 2096
/*
 * Build the delete skyline of one table.
 *
 * Delete records are collected from two places:
 *   - the delete file of the read snapshot (when present);
 *   - the delete lists of the mem and imem table data.
 * They are then reduced with tsdbBuildDeleteSkyline into pBlockScanInfo->delSkyline. Afterwards the delete cursors
 * of mem, imem, file blocks and last blocks are set to the first skyline entry (ASC) or the last one (DESC; -1 when
 * the skyline is empty).
 *
 * Does nothing if the skyline already exists. delSkyline stays NULL when the table has no delete records.
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_OUT_OF_MEMORY, or the error raised while reading the delete file or building
 * the skyline.
 */
int32_t initDelSkylineIterator(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader, STbData* pMemTbData,
                               STbData* piMemTbData) {
  if (pBlockScanInfo->delSkyline != NULL) {
    return TSDB_CODE_SUCCESS;
  }

  int32_t code = TSDB_CODE_SUCCESS;
  STsdb*  pTsdb = pReader->pTsdb;

  SArray* pDelData = taosArrayInit(4, sizeof(SDelData));
  if (pDelData == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  SDelFile* pDelFile = pReader->pReadSnap->fs.pDelFile;
  if (pDelFile) {
    SDelFReader* pDelFReader = NULL;
    code = tsdbDelFReaderOpen(&pDelFReader, pDelFile, pTsdb);
    if (code != TSDB_CODE_SUCCESS) {
      goto _err;
    }

    SArray* aDelIdx = taosArrayInit(4, sizeof(SDelIdx));
    if (aDelIdx == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;  // without this the function reported success after failing here
      tsdbDelFReaderClose(&pDelFReader);
      goto _err;
    }

    code = tsdbReadDelIdx(pDelFReader, aDelIdx);
    if (code != TSDB_CODE_SUCCESS) {
      taosArrayDestroy(aDelIdx);
      tsdbDelFReaderClose(&pDelFReader);
      goto _err;
    }

    SDelIdx  idx = {.suid = pReader->suid, .uid = pBlockScanInfo->uid};
    SDelIdx* pIdx = taosArraySearch(aDelIdx, &idx, tCmprDelIdx, TD_EQ);

    if (pIdx != NULL) {
      code = tsdbReadDelData(pDelFReader, pIdx, pDelData);
    }

    taosArrayDestroy(aDelIdx);
    tsdbDelFReaderClose(&pDelFReader);

    if (code != TSDB_CODE_SUCCESS) {
      goto _err;
    }
  }

  // delete records still held in memory: mem first, then imem
  if (pMemTbData != NULL) {
    for (SDelData* p = pMemTbData->pHead; p != NULL; p = p->pNext) {
      if (taosArrayPush(pDelData, p) == NULL) {
        code = TSDB_CODE_OUT_OF_MEMORY;
        goto _err;
      }
    }
  }

  if (piMemTbData != NULL) {
    for (SDelData* p = piMemTbData->pHead; p != NULL; p = p->pNext) {
      if (taosArrayPush(pDelData, p) == NULL) {
        code = TSDB_CODE_OUT_OF_MEMORY;
        goto _err;
      }
    }
  }

  size_t numOfDel = taosArrayGetSize(pDelData);
  if (numOfDel > 0) {
    pBlockScanInfo->delSkyline = taosArrayInit(4, sizeof(TSDBKEY));
    if (pBlockScanInfo->delSkyline == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto _err;
    }

    code = tsdbBuildDeleteSkyline(pDelData, 0, (int32_t)(numOfDel - 1), pBlockScanInfo->delSkyline);
    if (code != TSDB_CODE_SUCCESS) {
      // drop the partial skyline: a non-NULL delSkyline makes later calls return early as if it were complete
      taosArrayDestroy(pBlockScanInfo->delSkyline);
      pBlockScanInfo->delSkyline = NULL;
      goto _err;
    }
  }

  taosArrayDestroy(pDelData);

  // first entry for ASC, last entry for DESC (-1 when the skyline is empty)
  int32_t numOfKeys = (int32_t)taosArrayGetSize(pBlockScanInfo->delSkyline);
  pBlockScanInfo->iter.index = ASCENDING_TRAVERSE(pReader->order) ? 0 : numOfKeys - 1;
  pBlockScanInfo->iiter.index = pBlockScanInfo->iter.index;
  pBlockScanInfo->fileDelIndex = pBlockScanInfo->iter.index;
  pBlockScanInfo->lastBlockDelIndex = pBlockScanInfo->iter.index;
  return TSDB_CODE_SUCCESS;

_err:
  taosArrayDestroy(pDelData);
  return code;
}

2177
/*
 * Smaller of the keys of the current valid rows in mem and imem. Returns a key with ts == TSKEY_INITIAL_VAL when
 * neither buffer has a valid row.
 * NOTE(review): the minimum is taken regardless of scan order — confirm callers only rely on it for ASC scans.
 */
static TSDBKEY getCurrentKeyInBuf(STableBlockScanInfo* pScanInfo, STsdbReader* pReader) {
  TSDBKEY key = {.ts = TSKEY_INITIAL_VAL};
  bool    hasKey = false;

  TSDBROW* pRow = getValidMemRow(&pScanInfo->iter, pScanInfo->delSkyline, pReader);
  if (pRow != NULL) {
    key = TSDBROW_KEY(pRow);
    hasKey = true;
  }

  pRow = getValidMemRow(&pScanInfo->iiter, pScanInfo->delSkyline, pReader);
  if (pRow != NULL) {
    TSDBKEY k = TSDBROW_KEY(pRow);
    // without hasKey, a missing mem row left the TSKEY_INITIAL_VAL placeholder in place, and it beat every imem key
    // that was not smaller than the placeholder
    if (!hasKey || key.ts > k.ts) {
      key = k;
    }
  }

  return key;
}

2195
/*
 * Advance the file-set iterator until a file set yields data blocks or last (stt) files to scan.
 * For each file set, the block index is loaded with doLoadBlockIndex. If there are index entries or stt files,
 * doLoadFileBlock fills pBlockNum; the search stops as soon as pBlockNum reports at least one block or last file.
 *
 * pBlockNum is reset first. When no further file set has data, its counters sum to 0.
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_OUT_OF_MEMORY, or the error from loading the block index or the blocks.
 */
static int32_t moveToNextFile(STsdbReader* pReader, SBlockNumber* pBlockNum) {
  SReaderStatus* pStatus = &pReader->status;
  pBlockNum->numOfBlocks = 0;
  pBlockNum->numOfLastFiles = 0;

  size_t  numOfTables = taosHashGetSize(pReader->status.pTableMap);
  SArray* pIndexList = taosArrayInit(numOfTables, sizeof(SBlockIdx));
  if (pIndexList == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  while (1) {
    bool hasNext = filesetIteratorNext(&pStatus->fileIter, pReader);
    if (!hasNext) {  // no data files on disk
      break;
    }

    taosArrayClear(pIndexList);
    int32_t code = doLoadBlockIndex(pReader, pReader->pFileReader, pIndexList);
    if (code != TSDB_CODE_SUCCESS) {
      taosArrayDestroy(pIndexList);
      return code;
    }

    // the file set is worth loading if it has block index entries or stt (last) files
    if (taosArrayGetSize(pIndexList) > 0 || pReader->pFileReader->pSet->nSttF > 0) {
      code = doLoadFileBlock(pReader, pIndexList, pBlockNum);
      if (code != TSDB_CODE_SUCCESS) {
        taosArrayDestroy(pIndexList);
        return code;
      }

      if (pBlockNum->numOfBlocks + pBlockNum->numOfLastFiles > 0) {
        break;
      }
    }

    // no blocks in current file, try next files
  }

  taosArrayDestroy(pIndexList);
  return TSDB_CODE_SUCCESS;
}

2235
// taosSort comparator: orders uint64_t table uids ascending, returning -1, 0 or 1.
static int32_t uidComparFunc(const void* p1, const void* p2) {
  const uint64_t lhs = *(const uint64_t*)p1;
  const uint64_t rhs = *(const uint64_t*)p2;
  return (int32_t)((lhs > rhs) - (lhs < rhs));
}
2244

2245
/*
 * Copy the uid of every table in pStatus->pTableMap into pOrderCheckInfo->tableUidList and sort the list ascending.
 * The caller must size tableUidList for taosHashGetSize(pStatus->pTableMap) entries.
 */
static void extractOrderedTableUidList(SUidOrderCheckInfo* pOrderCheckInfo, SReaderStatus* pStatus) {
  int32_t numOfTables = taosHashGetSize(pStatus->pTableMap);
  int32_t slot = 0;

  for (void* pEntry = taosHashIterate(pStatus->pTableMap, NULL); pEntry != NULL;
       pEntry = taosHashIterate(pStatus->pTableMap, pEntry)) {
    STableBlockScanInfo* pScanInfo = pEntry;
    pOrderCheckInfo->tableUidList[slot] = pScanInfo->uid;
    slot += 1;
  }

  taosSort(pOrderCheckInfo->tableUidList, numOfTables, sizeof(uint64_t), uidComparFunc);
}

2259
static int32_t initOrderCheckInfo(SUidOrderCheckInfo* pOrderCheckInfo, SReaderStatus* pStatus) {
2260 2261 2262 2263
  int32_t total = taosHashGetSize(pStatus->pTableMap);
  if (total == 0) {
    return TSDB_CODE_SUCCESS;
  }
2264

2265
  if (pOrderCheckInfo->tableUidList == NULL) {
2266 2267 2268 2269 2270 2271
    pOrderCheckInfo->currentIndex = 0;
    pOrderCheckInfo->tableUidList = taosMemoryMalloc(total * sizeof(uint64_t));
    if (pOrderCheckInfo->tableUidList == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }

2272
    extractOrderedTableUidList(pOrderCheckInfo, pStatus);
2273 2274 2275
    uint64_t uid = pOrderCheckInfo->tableUidList[0];
    pStatus->pTableIter = taosHashGet(pStatus->pTableMap, &uid, sizeof(uid));
  } else {
2276 2277
    if (pStatus->pTableIter == NULL) {  // it is the last block of a new file
      pOrderCheckInfo->currentIndex = 0;
2278 2279
      uint64_t uid = pOrderCheckInfo->tableUidList[pOrderCheckInfo->currentIndex];
      pStatus->pTableIter = taosHashGet(pStatus->pTableMap, &uid, sizeof(uid));
2280 2281

      // the tableMap has already updated
2282
      if (pStatus->pTableIter == NULL) {
2283
        void* p = taosMemoryRealloc(pOrderCheckInfo->tableUidList, total * sizeof(uint64_t));
2284 2285 2286 2287 2288 2289 2290 2291 2292
        if (p == NULL) {
          return TSDB_CODE_OUT_OF_MEMORY;
        }

        pOrderCheckInfo->tableUidList = p;
        extractOrderedTableUidList(pOrderCheckInfo, pStatus);

        uid = pOrderCheckInfo->tableUidList[0];
        pStatus->pTableIter = taosHashGet(pStatus->pTableMap, &uid, sizeof(uid));
2293
      }
2294
    }
2295
  }
2296

2297 2298 2299
  return TSDB_CODE_SUCCESS;
}

2300
static bool moveToNextTable(SUidOrderCheckInfo* pOrderedCheckInfo, SReaderStatus* pStatus) {
2301 2302 2303 2304 2305 2306 2307 2308 2309 2310 2311 2312
  pOrderedCheckInfo->currentIndex += 1;
  if (pOrderedCheckInfo->currentIndex >= taosHashGetSize(pStatus->pTableMap)) {
    pStatus->pTableIter = NULL;
    return false;
  }

  uint64_t uid = pOrderedCheckInfo->tableUidList[pOrderedCheckInfo->currentIndex];
  pStatus->pTableIter = taosHashGet(pStatus->pTableMap, &uid, sizeof(uid));
  ASSERT(pStatus->pTableIter != NULL);
  return true;
}

2313
static int32_t doLoadLastBlockSequentially(STsdbReader* pReader) {
2314
  SReaderStatus*    pStatus = &pReader->status;
2315 2316
  SLastBlockReader* pLastBlockReader = pStatus->fileIter.pLastBlockReader;

2317 2318
  SUidOrderCheckInfo* pOrderedCheckInfo = &pStatus->uidCheckInfo;
  int32_t             code = initOrderCheckInfo(pOrderedCheckInfo, pStatus);
2319
  if (code != TSDB_CODE_SUCCESS || (taosHashGetSize(pStatus->pTableMap) == 0)) {
2320 2321
    return code;
  }
2322

2323
  while (1) {
2324 2325
    // load the last data block of current table
    STableBlockScanInfo* pScanInfo = pStatus->pTableIter;
H
Hongze Cheng 已提交
2326
    bool                 hasVal = initLastBlockReader(pLastBlockReader, pScanInfo, pReader);
2327
    if (!hasVal) {
2328 2329
      bool hasNexTable = moveToNextTable(pOrderedCheckInfo, pStatus);
      if (!hasNexTable) {
2330 2331 2332
        return TSDB_CODE_SUCCESS;
      }
      continue;
2333 2334
    }

2335 2336 2337 2338 2339 2340 2341 2342
    code = doBuildDataBlock(pReader);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    if (pReader->pResBlock->info.rows > 0) {
      return TSDB_CODE_SUCCESS;
    }
2343

2344
    // current table is exhausted, let's try next table
2345 2346
    bool hasNexTable = moveToNextTable(pOrderedCheckInfo, pStatus);
    if (!hasNexTable) {
2347 2348
      return TSDB_CODE_SUCCESS;
    }
2349 2350 2351
  }
}

2352
static int32_t doBuildDataBlock(STsdbReader* pReader) {
H
Hongze Cheng 已提交
2353 2354
  int32_t   code = TSDB_CODE_SUCCESS;
  SDataBlk* pBlock = NULL;
2355 2356 2357

  SReaderStatus*       pStatus = &pReader->status;
  SDataBlockIter*      pBlockIter = &pStatus->blockIter;
2358 2359 2360
  STableBlockScanInfo* pScanInfo = NULL;
  SFileDataBlockInfo*  pBlockInfo = getCurrentBlockInfo(pBlockIter);
  SLastBlockReader*    pLastBlockReader = pReader->status.fileIter.pLastBlockReader;
2361

2362
  if (pBlockInfo != NULL) {
2363 2364 2365 2366 2367 2368
    pScanInfo = taosHashGet(pReader->status.pTableMap, &pBlockInfo->uid, sizeof(pBlockInfo->uid));
  } else {
    pScanInfo = pReader->status.pTableIter;
  }

  if (pBlockInfo != NULL) {
2369
    pBlock = getCurrentBlock(pBlockIter);
2370 2371
  }

2372
  initLastBlockReader(pLastBlockReader, pScanInfo, pReader);
2373
  TSDBKEY key = getCurrentKeyInBuf(pScanInfo, pReader);
2374

2375 2376 2377 2378 2379 2380 2381
  if (pBlockInfo == NULL) {  // build data block from last data file
    ASSERT(pBlockIter->numOfBlocks == 0);
    code = buildComposedDataBlock(pReader);
  } else if (fileBlockShouldLoad(pReader, pBlockInfo, pBlock, pScanInfo, key, pLastBlockReader)) {
    tBlockDataReset(&pStatus->fileBlockData);
    code = tBlockDataInit(&pStatus->fileBlockData, pReader->suid, pScanInfo->uid, pReader->pSchema);
    if (code != TSDB_CODE_SUCCESS) {
2382
      return code;
2383
    }
2384

2385 2386 2387
    code = doLoadFileBlockData(pReader, pBlockIter, &pStatus->fileBlockData);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
2388 2389 2390
    }

    // build composed data block
2391
    code = buildComposedDataBlock(pReader);
2392 2393
  } else if (bufferDataInFileBlockGap(pReader->order, key, pBlock)) {
    // data in memory that are earlier than current file block
2394
    // todo rows in buffer should be less than the file block in asc, greater than file block in desc
2395
    int64_t endKey = (ASCENDING_TRAVERSE(pReader->order)) ? pBlock->minKey.ts : pBlock->maxKey.ts;
2396
    code = buildDataBlockFromBuf(pReader, pScanInfo, endKey);
2397 2398 2399 2400
  } else {
    if (hasDataInLastBlock(pLastBlockReader) && !ASCENDING_TRAVERSE(pReader->order)) {
      // only return the rows in last block
      int64_t tsLast = getCurrentKeyInLastBlock(pLastBlockReader);
H
Hongze Cheng 已提交
2401
      ASSERT(tsLast >= pBlock->maxKey.ts);
2402 2403 2404
      tBlockDataReset(&pReader->status.fileBlockData);

      code = buildComposedDataBlock(pReader);
H
Hongze Cheng 已提交
2405
    } else {  // whole block is required, return it directly
2406 2407 2408 2409 2410 2411 2412
      SDataBlockInfo* pInfo = &pReader->pResBlock->info;
      pInfo->rows = pBlock->nRow;
      pInfo->uid = pScanInfo->uid;
      pInfo->window = (STimeWindow){.skey = pBlock->minKey.ts, .ekey = pBlock->maxKey.ts};
      setComposedBlockFlag(pReader, false);
      setBlockAllDumped(&pStatus->fBlockDumpInfo, pBlock->maxKey.ts, pReader->order);
    }
2413 2414 2415 2416 2417
  }

  return code;
}

H
Haojun Liao 已提交
2418
static int32_t buildBlockFromBufferSequentially(STsdbReader* pReader) {
2419 2420
  SReaderStatus* pStatus = &pReader->status;

2421
  while (1) {
2422 2423 2424
    if (pStatus->pTableIter == NULL) {
      pStatus->pTableIter = taosHashIterate(pStatus->pTableMap, NULL);
      if (pStatus->pTableIter == NULL) {
H
Haojun Liao 已提交
2425
        return TSDB_CODE_SUCCESS;
2426 2427 2428 2429
      }
    }

    STableBlockScanInfo* pBlockScanInfo = pStatus->pTableIter;
2430
    initMemDataIterator(pBlockScanInfo, pReader);
2431

2432
    int64_t endKey = (ASCENDING_TRAVERSE(pReader->order)) ? INT64_MAX : INT64_MIN;
2433
    int32_t code = buildDataBlockFromBuf(pReader, pBlockScanInfo, endKey);
H
Haojun Liao 已提交
2434 2435 2436 2437
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

2438
    if (pReader->pResBlock->info.rows > 0) {
H
Haojun Liao 已提交
2439
      return TSDB_CODE_SUCCESS;
2440 2441 2442 2443 2444
    }

    // current table is exhausted, let's try the next table
    pStatus->pTableIter = taosHashIterate(pStatus->pTableMap, pStatus->pTableIter);
    if (pStatus->pTableIter == NULL) {
H
Haojun Liao 已提交
2445
      return TSDB_CODE_SUCCESS;
2446 2447 2448 2449
    }
  }
}

2450
// set the correct start position in case of the first/last file block, according to the query time window
2451
static void initBlockDumpInfo(STsdbReader* pReader, SDataBlockIter* pBlockIter) {
H
Hongze Cheng 已提交
2452
  SDataBlk* pBlock = getCurrentBlock(pBlockIter);
2453

2454 2455 2456
  SReaderStatus* pStatus = &pReader->status;

  SFileBlockDumpInfo* pDumpInfo = &pStatus->fBlockDumpInfo;
2457 2458 2459

  pDumpInfo->totalRows = pBlock->nRow;
  pDumpInfo->allDumped = false;
2460
  pDumpInfo->rowIndex = ASCENDING_TRAVERSE(pReader->order) ? 0 : pBlock->nRow - 1;
2461 2462
}

2463
static int32_t initForFirstBlockInFile(STsdbReader* pReader, SDataBlockIter* pBlockIter) {
2464 2465
  SBlockNumber num = {0};

2466
  int32_t code = moveToNextFile(pReader, &num);
2467 2468 2469 2470 2471
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  // all data files are consumed, try data in buffer
2472
  if (num.numOfBlocks + num.numOfLastFiles == 0) {
2473 2474 2475 2476 2477
    pReader->status.loadFromFile = false;
    return code;
  }

  // initialize the block iterator for a new fileset
2478 2479
  if (num.numOfBlocks > 0) {
    code = initBlockIterator(pReader, pBlockIter, num.numOfBlocks);
H
Hongze Cheng 已提交
2480
  } else {  // no block data, only last block exists
2481
    tBlockDataReset(&pReader->status.fileBlockData);
2482
    resetDataBlockIterator(pBlockIter, pReader->order);
2483
  }
2484 2485

  // set the correct start position according to the query time window
2486
  initBlockDumpInfo(pReader, pBlockIter);
2487 2488 2489
  return code;
}

2490
static bool fileBlockPartiallyRead(SFileBlockDumpInfo* pDumpInfo, bool asc) {
2491 2492
  return (!pDumpInfo->allDumped) &&
         ((pDumpInfo->rowIndex > 0 && asc) || (pDumpInfo->rowIndex < (pDumpInfo->totalRows - 1) && (!asc)));
2493 2494
}

2495
static int32_t buildBlockFromFiles(STsdbReader* pReader) {
H
Haojun Liao 已提交
2496
  int32_t code = TSDB_CODE_SUCCESS;
2497 2498
  bool    asc = ASCENDING_TRAVERSE(pReader->order);

2499 2500
  SDataBlockIter* pBlockIter = &pReader->status.blockIter;

2501
  if (pBlockIter->numOfBlocks == 0) {
H
Hongze Cheng 已提交
2502
  _begin:
2503 2504 2505 2506 2507
    code = doLoadLastBlockSequentially(pReader);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

2508 2509 2510 2511
    if (pReader->pResBlock->info.rows > 0) {
      return TSDB_CODE_SUCCESS;
    }

2512
    // all data blocks are checked in this last block file, now let's try the next file
2513 2514 2515 2516 2517 2518 2519 2520
    if (pReader->status.pTableIter == NULL) {
      code = initForFirstBlockInFile(pReader, pBlockIter);

      // error happens or all the data files are completely checked
      if ((code != TSDB_CODE_SUCCESS) || (pReader->status.loadFromFile == false)) {
        return code;
      }

2521
      // this file does not have data files, let's start check the last block file if exists
2522 2523 2524 2525 2526 2527 2528 2529 2530 2531 2532 2533 2534 2535 2536
      if (pBlockIter->numOfBlocks == 0) {
        goto _begin;
      }
    }

    code = doBuildDataBlock(pReader);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    if (pReader->pResBlock->info.rows > 0) {
      return TSDB_CODE_SUCCESS;
    }
  }

2537
  while (1) {
2538 2539
    SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;

2540
    if (fileBlockPartiallyRead(pDumpInfo, asc)) {  // file data block is partially loaded
2541
      code = buildComposedDataBlock(pReader);
2542 2543 2544 2545 2546 2547 2548
    } else {
      // current block are exhausted, try the next file block
      if (pDumpInfo->allDumped) {
        // try next data block in current file
        bool hasNext = blockIteratorNext(&pReader->status.blockIter);
        if (hasNext) {  // check for the next block in the block accessed order list
          initBlockDumpInfo(pReader, pBlockIter);
2549
        } else {
H
Haojun Liao 已提交
2550
          if (pReader->status.pCurrentFileset->nSttF > 0) {
2551 2552 2553 2554 2555 2556
            // data blocks in current file are exhausted, let's try the next file now
            tBlockDataReset(&pReader->status.fileBlockData);
            resetDataBlockIterator(pBlockIter, pReader->order);
            goto _begin;
          } else {
            code = initForFirstBlockInFile(pReader, pBlockIter);
2557

2558 2559 2560 2561
            // error happens or all the data files are completely checked
            if ((code != TSDB_CODE_SUCCESS) || (pReader->status.loadFromFile == false)) {
              return code;
            }
2562

2563 2564 2565 2566
            // this file does not have blocks, let's start check the last block file
            if (pBlockIter->numOfBlocks == 0) {
              goto _begin;
            }
2567
          }
2568
        }
H
Haojun Liao 已提交
2569
      }
2570 2571

      code = doBuildDataBlock(pReader);
2572 2573
    }

2574 2575 2576 2577 2578 2579 2580 2581
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    if (pReader->pResBlock->info.rows > 0) {
      return TSDB_CODE_SUCCESS;
    }
  }
2582
}
H
refact  
Hongze Cheng 已提交
2583

2584 2585
static STsdb* getTsdbByRetentions(SVnode* pVnode, TSKEY winSKey, SRetention* retentions, const char* idStr,
                                  int8_t* pLevel) {
2586
  if (VND_IS_RSMA(pVnode)) {
2587
    int8_t  level = 0;
2588 2589
    int64_t now = taosGetTimestamp(pVnode->config.tsdbCfg.precision);

2590
    for (int8_t i = 0; i < TSDB_RETENTION_MAX; ++i) {
2591 2592 2593 2594 2595 2596 2597 2598 2599 2600 2601 2602 2603
      SRetention* pRetention = retentions + level;
      if (pRetention->keep <= 0) {
        if (level > 0) {
          --level;
        }
        break;
      }
      if ((now - pRetention->keep) <= winSKey) {
        break;
      }
      ++level;
    }

2604
    const char* str = (idStr != NULL) ? idStr : "";
2605 2606

    if (level == TSDB_RETENTION_L0) {
2607
      *pLevel = TSDB_RETENTION_L0;
C
Cary Xu 已提交
2608
      tsdbDebug("vgId:%d, rsma level %d is selected to query %s", TD_VID(pVnode), TSDB_RETENTION_L0, str);
2609 2610
      return VND_RSMA0(pVnode);
    } else if (level == TSDB_RETENTION_L1) {
2611
      *pLevel = TSDB_RETENTION_L1;
C
Cary Xu 已提交
2612
      tsdbDebug("vgId:%d, rsma level %d is selected to query %s", TD_VID(pVnode), TSDB_RETENTION_L1, str);
2613 2614
      return VND_RSMA1(pVnode);
    } else {
2615
      *pLevel = TSDB_RETENTION_L2;
C
Cary Xu 已提交
2616
      tsdbDebug("vgId:%d, rsma level %d is selected to query %s", TD_VID(pVnode), TSDB_RETENTION_L2, str);
2617 2618 2619 2620 2621 2622 2623
      return VND_RSMA2(pVnode);
    }
  }

  return VND_TSDB(pVnode);
}

H
Haojun Liao 已提交
2624
SVersionRange getQueryVerRange(SVnode* pVnode, SQueryTableDataCond* pCond, int8_t level) {
L
Liu Jicong 已提交
2625
  int64_t startVer = (pCond->startVersion == -1) ? 0 : pCond->startVersion;
H
Haojun Liao 已提交
2626 2627

  int64_t endVer = 0;
L
Liu Jicong 已提交
2628 2629
  if (pCond->endVersion ==
      -1) {  // user not specified end version, set current maximum version of vnode as the endVersion
H
Haojun Liao 已提交
2630 2631
    endVer = pVnode->state.applied;
  } else {
L
Liu Jicong 已提交
2632
    endVer = (pCond->endVersion > pVnode->state.applied) ? pVnode->state.applied : pCond->endVersion;
2633 2634
  }

H
Haojun Liao 已提交
2635
  return (SVersionRange){.minVer = startVer, .maxVer = endVer};
2636 2637
}

2638
bool hasBeenDropped(const SArray* pDelList, int32_t* index, TSDBKEY* pKey, int32_t order) {
2639 2640 2641 2642
  ASSERT(pKey != NULL);
  if (pDelList == NULL) {
    return false;
  }
L
Liu Jicong 已提交
2643 2644 2645
  size_t  num = taosArrayGetSize(pDelList);
  bool    asc = ASCENDING_TRAVERSE(order);
  int32_t step = asc ? 1 : -1;
2646

2647 2648 2649 2650 2651 2652
  if (asc) {
    if (*index >= num - 1) {
      TSDBKEY* last = taosArrayGetLast(pDelList);
      ASSERT(pKey->ts >= last->ts);

      if (pKey->ts > last->ts) {
2653
        return false;
2654 2655 2656
      } else if (pKey->ts == last->ts) {
        TSDBKEY* prev = taosArrayGet(pDelList, num - 2);
        return (prev->version >= pKey->version);
2657 2658
      }
    } else {
2659 2660 2661 2662 2663 2664 2665 2666 2667 2668 2669 2670 2671 2672 2673 2674 2675 2676 2677 2678 2679 2680 2681 2682 2683 2684 2685 2686 2687 2688
      TSDBKEY* pCurrent = taosArrayGet(pDelList, *index);
      TSDBKEY* pNext = taosArrayGet(pDelList, (*index) + 1);

      if (pKey->ts < pCurrent->ts) {
        return false;
      }

      if (pCurrent->ts <= pKey->ts && pNext->ts >= pKey->ts && pCurrent->version >= pKey->version) {
        return true;
      }

      while (pNext->ts <= pKey->ts && (*index) < num - 1) {
        (*index) += 1;

        if ((*index) < num - 1) {
          pCurrent = taosArrayGet(pDelList, *index);
          pNext = taosArrayGet(pDelList, (*index) + 1);

          // it is not a consecutive deletion range, ignore it
          if (pCurrent->version == 0 && pNext->version > 0) {
            continue;
          }

          if (pCurrent->ts <= pKey->ts && pNext->ts >= pKey->ts && pCurrent->version >= pKey->version) {
            return true;
          }
        }
      }

      return false;
2689 2690
    }
  } else {
2691 2692
    if (*index <= 0) {
      TSDBKEY* pFirst = taosArrayGet(pDelList, 0);
2693

2694 2695 2696 2697 2698 2699 2700
      if (pKey->ts < pFirst->ts) {
        return false;
      } else if (pKey->ts == pFirst->ts) {
        return pFirst->version >= pKey->version;
      } else {
        ASSERT(0);
      }
2701
    } else {
2702 2703 2704 2705 2706 2707 2708 2709 2710 2711 2712 2713 2714 2715 2716 2717 2718 2719 2720 2721 2722 2723 2724 2725 2726 2727 2728
      TSDBKEY* pCurrent = taosArrayGet(pDelList, *index);
      TSDBKEY* pPrev = taosArrayGet(pDelList, (*index) - 1);

      if (pKey->ts > pCurrent->ts) {
        return false;
      }

      if (pPrev->ts <= pKey->ts && pCurrent->ts >= pKey->ts && pPrev->version >= pKey->version) {
        return true;
      }

      while (pPrev->ts >= pKey->ts && (*index) > 1) {
        (*index) += step;

        if ((*index) >= 1) {
          pCurrent = taosArrayGet(pDelList, *index);
          pPrev = taosArrayGet(pDelList, (*index) - 1);

          // it is not a consecutive deletion range, ignore it
          if (pCurrent->version > 0 && pPrev->version == 0) {
            continue;
          }

          if (pPrev->ts <= pKey->ts && pCurrent->ts >= pKey->ts && pPrev->version >= pKey->version) {
            return true;
          }
        }
2729 2730 2731 2732 2733
      }

      return false;
    }
  }
2734 2735

  return false;
2736 2737
}

2738
TSDBROW* getValidMemRow(SIterInfo* pIter, const SArray* pDelList, STsdbReader* pReader) {
2739
  if (!pIter->hasVal) {
H
Haojun Liao 已提交
2740 2741
    return NULL;
  }
H
Hongze Cheng 已提交
2742

2743
  TSDBROW* pRow = tsdbTbDataIterGet(pIter->iter);
2744
  TSDBKEY  key = {.ts = pRow->pTSRow->ts, .version = pRow->version};
2745
  if (outOfTimeWindow(key.ts, &pReader->window)) {
2746
    pIter->hasVal = false;
H
Haojun Liao 已提交
2747 2748
    return NULL;
  }
H
Hongze Cheng 已提交
2749

2750
  // it is a valid data version
dengyihao's avatar
dengyihao 已提交
2751
  if ((key.version <= pReader->verRange.maxVer && key.version >= pReader->verRange.minVer) &&
2752
      (!hasBeenDropped(pDelList, &pIter->index, &key, pReader->order))) {
H
Haojun Liao 已提交
2753 2754
    return pRow;
  }
H
Hongze Cheng 已提交
2755

2756
  while (1) {
2757 2758
    pIter->hasVal = tsdbTbDataIterNext(pIter->iter);
    if (!pIter->hasVal) {
H
Haojun Liao 已提交
2759 2760
      return NULL;
    }
H
Hongze Cheng 已提交
2761

2762
    pRow = tsdbTbDataIterGet(pIter->iter);
H
Hongze Cheng 已提交
2763

H
Haojun Liao 已提交
2764
    key = TSDBROW_KEY(pRow);
2765
    if (outOfTimeWindow(key.ts, &pReader->window)) {
2766
      pIter->hasVal = false;
H
Haojun Liao 已提交
2767 2768
      return NULL;
    }
H
Hongze Cheng 已提交
2769

dengyihao's avatar
dengyihao 已提交
2770
    if (key.version <= pReader->verRange.maxVer && key.version >= pReader->verRange.minVer &&
2771
        (!hasBeenDropped(pDelList, &pIter->index, &key, pReader->order))) {
H
Haojun Liao 已提交
2772 2773 2774 2775
      return pRow;
    }
  }
}
H
Hongze Cheng 已提交
2776

2777 2778
int32_t doMergeRowsInBuf(SIterInfo* pIter, uint64_t uid, int64_t ts, SArray* pDelList, SRowMerger* pMerger,
                         STsdbReader* pReader) {
H
Haojun Liao 已提交
2779
  while (1) {
2780 2781
    pIter->hasVal = tsdbTbDataIterNext(pIter->iter);
    if (!pIter->hasVal) {
H
Haojun Liao 已提交
2782 2783
      break;
    }
H
Hongze Cheng 已提交
2784

2785
    // data exists but not valid
2786
    TSDBROW* pRow = getValidMemRow(pIter, pDelList, pReader);
2787 2788 2789 2790 2791
    if (pRow == NULL) {
      break;
    }

    // ts is not identical, quit
H
Haojun Liao 已提交
2792
    TSDBKEY k = TSDBROW_KEY(pRow);
2793
    if (k.ts != ts) {
H
Haojun Liao 已提交
2794 2795 2796
      break;
    }

H
Haojun Liao 已提交
2797
    STSchema* pTSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, uid);
2798
    tRowMergerAdd(pMerger, pRow, pTSchema);
H
Haojun Liao 已提交
2799 2800 2801 2802 2803
  }

  return TSDB_CODE_SUCCESS;
}

2804
static int32_t doMergeRowsInFileBlockImpl(SBlockData* pBlockData, int32_t rowIndex, int64_t key, SRowMerger* pMerger,
2805
                                          SVersionRange* pVerRange, int32_t step) {
2806 2807
  while (pBlockData->aTSKEY[rowIndex] == key && rowIndex < pBlockData->nRow && rowIndex >= 0) {
    if (pBlockData->aVersion[rowIndex] > pVerRange->maxVer || pBlockData->aVersion[rowIndex] < pVerRange->minVer) {
2808
      rowIndex += step;
2809 2810 2811 2812 2813 2814 2815 2816 2817 2818 2819 2820 2821 2822 2823 2824
      continue;
    }

    TSDBROW fRow = tsdbRowFromBlockData(pBlockData, rowIndex);
    tRowMerge(pMerger, &fRow);
    rowIndex += step;
  }

  return rowIndex;
}

typedef enum {
  CHECK_FILEBLOCK_CONT = 0x1,
  CHECK_FILEBLOCK_QUIT = 0x2,
} CHECK_FILEBLOCK_STATE;

H
Hongze Cheng 已提交
2825
static int32_t checkForNeighborFileBlock(STsdbReader* pReader, STableBlockScanInfo* pScanInfo, SDataBlk* pBlock,
2826 2827
                                         SFileDataBlockInfo* pFBlock, SRowMerger* pMerger, int64_t key,
                                         CHECK_FILEBLOCK_STATE* state) {
2828
  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
2829
  SBlockData*         pBlockData = &pReader->status.fileBlockData;
2830

2831
  *state = CHECK_FILEBLOCK_QUIT;
2832
  int32_t step = ASCENDING_TRAVERSE(pReader->order) ? 1 : -1;
2833

H
Hongze Cheng 已提交
2834 2835
  int32_t   nextIndex = -1;
  SDataBlk* pNeighborBlock = getNeighborBlockOfSameTable(pFBlock, pScanInfo, &nextIndex, pReader->order);
2836
  if (pNeighborBlock == NULL) {  // do nothing
2837 2838 2839 2840
    return 0;
  }

  bool overlap = overlapWithNeighborBlock(pBlock, pNeighborBlock, pReader->order);
2841 2842
  taosMemoryFree(pNeighborBlock);

2843
  if (overlap) {  // load next block
2844
    SReaderStatus*  pStatus = &pReader->status;
2845 2846
    SDataBlockIter* pBlockIter = &pStatus->blockIter;

2847
    // 1. find the next neighbor block in the scan block list
2848
    SFileDataBlockInfo fb = {.uid = pFBlock->uid, .tbBlockIdx = nextIndex};
2849
    int32_t            neighborIndex = findFileBlockInfoIndex(pBlockIter, &fb);
2850

2851
    // 2. remove it from the scan block list
2852
    setFileBlockActiveInBlockIter(pBlockIter, neighborIndex, step);
2853

2854
    // 3. load the neighbor block, and set it to be the currently accessed file data block
H
Haojun Liao 已提交
2855
    tBlockDataReset(&pStatus->fileBlockData);
2856 2857 2858 2859 2860 2861
    int32_t code = tBlockDataInit(&pStatus->fileBlockData, pReader->suid, pFBlock->uid, pReader->pSchema);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    code = doLoadFileBlockData(pReader, pBlockIter, &pStatus->fileBlockData);
2862 2863 2864 2865
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

2866
    // 4. check the data values
2867 2868 2869 2870
    initBlockDumpInfo(pReader, pBlockIter);

    pDumpInfo->rowIndex =
        doMergeRowsInFileBlockImpl(pBlockData, pDumpInfo->rowIndex, key, pMerger, &pReader->verRange, step);
H
Haojun Liao 已提交
2871
    if (pDumpInfo->rowIndex >= pDumpInfo->totalRows) {
2872 2873 2874 2875 2876 2877 2878
      *state = CHECK_FILEBLOCK_CONT;
    }
  }

  return TSDB_CODE_SUCCESS;
}

2879 2880
int32_t doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pScanInfo, STsdbReader* pReader,
                                SRowMerger* pMerger) {
2881 2882
  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;

2883
  bool    asc = ASCENDING_TRAVERSE(pReader->order);
2884
  int64_t key = pBlockData->aTSKEY[pDumpInfo->rowIndex];
2885
  int32_t step = asc ? 1 : -1;
2886

2887
  pDumpInfo->rowIndex += step;
2888
  if ((pDumpInfo->rowIndex <= pBlockData->nRow - 1 && asc) || (pDumpInfo->rowIndex >= 0 && !asc)) {
2889 2890 2891
    pDumpInfo->rowIndex =
        doMergeRowsInFileBlockImpl(pBlockData, pDumpInfo->rowIndex, key, pMerger, &pReader->verRange, step);
  }
2892

2893 2894 2895 2896
  // all rows are consumed, let's try next file block
  if ((pDumpInfo->rowIndex >= pBlockData->nRow && asc) || (pDumpInfo->rowIndex < 0 && !asc)) {
    while (1) {
      CHECK_FILEBLOCK_STATE st;
2897

2898
      SFileDataBlockInfo* pFileBlockInfo = getCurrentBlockInfo(&pReader->status.blockIter);
H
Hongze Cheng 已提交
2899
      SDataBlk*           pCurrentBlock = getCurrentBlock(&pReader->status.blockIter);
2900 2901 2902
      checkForNeighborFileBlock(pReader, pScanInfo, pCurrentBlock, pFileBlockInfo, pMerger, key, &st);
      if (st == CHECK_FILEBLOCK_QUIT) {
        break;
2903
      }
2904
    }
H
Haojun Liao 已提交
2905
  }
2906

H
Haojun Liao 已提交
2907 2908 2909
  return TSDB_CODE_SUCCESS;
}

H
Hongze Cheng 已提交
2910 2911
int32_t doMergeRowsInLastBlock(SLastBlockReader* pLastBlockReader, STableBlockScanInfo* pScanInfo, int64_t ts,
                               SRowMerger* pMerger) {
H
Haojun Liao 已提交
2912
  pScanInfo->lastKey = ts;
2913
  while (nextRowFromLastBlocks(pLastBlockReader, pScanInfo)) {
2914 2915
    int64_t next1 = getCurrentKeyInLastBlock(pLastBlockReader);
    if (next1 == ts) {
2916
      TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
2917 2918 2919 2920 2921 2922 2923 2924 2925
      tRowMerge(pMerger, &fRow1);
    } else {
      break;
    }
  }

  return TSDB_CODE_SUCCESS;
}

2926 2927
int32_t doMergeMemTableMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter, SArray* pDelList, STSRow** pTSRow,
                                 STsdbReader* pReader, bool* freeTSRow) {
H
Haojun Liao 已提交
2928
  TSDBROW* pNextRow = NULL;
2929
  TSDBROW  current = *pRow;
2930

2931 2932
  {  // if the timestamp of the next valid row has a different ts, return current row directly
    pIter->hasVal = tsdbTbDataIterNext(pIter->iter);
2933

2934 2935 2936
    if (!pIter->hasVal) {
      *pTSRow = current.pTSRow;
      *freeTSRow = false;
2937
      return TSDB_CODE_SUCCESS;
2938
    } else {  // has next point in mem/imem
2939
      pNextRow = getValidMemRow(pIter, pDelList, pReader);
2940 2941 2942
      if (pNextRow == NULL) {
        *pTSRow = current.pTSRow;
        *freeTSRow = false;
2943
        return TSDB_CODE_SUCCESS;
2944 2945
      }

H
Haojun Liao 已提交
2946
      if (current.pTSRow->ts != pNextRow->pTSRow->ts) {
2947 2948
        *pTSRow = current.pTSRow;
        *freeTSRow = false;
2949
        return TSDB_CODE_SUCCESS;
2950
      }
2951
    }
2952 2953
  }

2954 2955
  SRowMerger merge = {0};

2956
  // get the correct schema for data in memory
H
Haojun Liao 已提交
2957
  STSchema* pTSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(&current), pReader, uid);
H
Haojun Liao 已提交
2958

2959 2960
  if (pReader->pSchema == NULL) {
    pReader->pSchema = pTSchema;
2961
  }
H
Haojun Liao 已提交
2962

H
Haojun Liao 已提交
2963 2964 2965 2966 2967 2968
  tRowMergerInit2(&merge, pReader->pSchema, &current, pTSchema);

  STSchema* pTSchema1 = doGetSchemaForTSRow(TSDBROW_SVERSION(pNextRow), pReader, uid);
  tRowMergerAdd(&merge, pNextRow, pTSchema1);

  doMergeRowsInBuf(pIter, uid, current.pTSRow->ts, pDelList, &merge, pReader);
2969 2970 2971 2972
  int32_t code = tRowMergerGetRow(&merge, pTSRow);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }
M
Minglei Jin 已提交
2973

2974
  tRowMergerClear(&merge);
2975
  *freeTSRow = true;
2976
  return TSDB_CODE_SUCCESS;
2977 2978
}

2979
int32_t doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader,
H
Hongze Cheng 已提交
2980
                           STSRow** pTSRow) {
H
Haojun Liao 已提交
2981 2982
  SRowMerger merge = {0};

2983 2984 2985
  TSDBKEY k = TSDBROW_KEY(pRow);
  TSDBKEY ik = TSDBROW_KEY(piRow);

2986
  if (ASCENDING_TRAVERSE(pReader->order)) {  // ascending order imem --> mem
H
Haojun Liao 已提交
2987
    STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid);
2988

H
Haojun Liao 已提交
2989
    tRowMergerInit(&merge, piRow, pSchema);
2990
    doMergeRowsInBuf(&pBlockScanInfo->iiter, pBlockScanInfo->uid, ik.ts, pBlockScanInfo->delSkyline, &merge, pReader);
2991

2992
    tRowMerge(&merge, pRow);
2993
    doMergeRowsInBuf(&pBlockScanInfo->iter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge, pReader);
2994
  } else {
H
Haojun Liao 已提交
2995
    STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid);
2996

H
Haojun Liao 已提交
2997
    tRowMergerInit(&merge, pRow, pSchema);
2998
    doMergeRowsInBuf(&pBlockScanInfo->iter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge, pReader);
2999 3000

    tRowMerge(&merge, piRow);
3001
    doMergeRowsInBuf(&pBlockScanInfo->iiter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge, pReader);
3002
  }
3003

3004 3005
  int32_t code = tRowMergerGetRow(&merge, pTSRow);
  return code;
3006 3007
}

3008 3009
/*
 * Produce the next row of one table from its in-memory buffers (the mem and imem skip-list iterators),
 * ignoring rows at or beyond endKey in the scan direction.
 *
 * - Both buffers have a row with the same timestamp: the two are merged by doMergeMemIMemRows() and
 *   *freeTSRow is set to true (the merged row belongs to the caller).
 * - Otherwise: the row that comes first in the scan order (smaller ts for ascending, larger for
 *   descending) is taken through doMergeMemTableMultiRows(), which receives freeTSRow.
 * - No candidate row: *pTSRow is not written. Callers should initialize it to NULL and test it afterwards.
 *
 * Returns TSDB_CODE_SUCCESS or the error code of the merge helper.
 */
int32_t tsdbGetNextRowInMem(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader, STSRow** pTSRow, int64_t endKey,
                            bool* freeTSRow) {
  TSDBROW* pRow = getValidMemRow(&pBlockScanInfo->iter, pBlockScanInfo->delSkyline, pReader);
  TSDBROW* piRow = getValidMemRow(&pBlockScanInfo->iiter, pBlockScanInfo->delSkyline, pReader);

  SArray*  pDelList = pBlockScanInfo->delSkyline;
  uint64_t uid = pBlockScanInfo->uid;
  bool     asc = ASCENDING_TRAVERSE(pReader->order);

  // Drop candidates that reached endKey. The pointer itself is checked before TSDBROW_KEY
  // dereferences it, rather than relying on hasVal alone.
  if (pBlockScanInfo->iter.hasVal && pRow != NULL) {
    TSDBKEY k = TSDBROW_KEY(pRow);
    if ((k.ts >= endKey && asc) || (k.ts <= endKey && !asc)) {
      pRow = NULL;
    }
  }

  if (pBlockScanInfo->iiter.hasVal && piRow != NULL) {
    TSDBKEY k = TSDBROW_KEY(piRow);
    if ((k.ts >= endKey && asc) || (k.ts <= endKey && !asc)) {
      piRow = NULL;
    }
  }

  if (pBlockScanInfo->iter.hasVal && pBlockScanInfo->iiter.hasVal && pRow != NULL && piRow != NULL) {
    TSDBKEY k = TSDBROW_KEY(pRow);
    TSDBKEY ik = TSDBROW_KEY(piRow);

    if (ik.ts == k.ts) {
      // the same timestamp exists in both buffers: build one merged row owned by the caller
      *freeTSRow = true;
      return doMergeMemIMemRows(pRow, piRow, pBlockScanInfo, pReader, pTSRow);
    }

    // timestamps differ: take the one that comes first in the scan direction
    bool imemFirst = asc ? (ik.ts < k.ts) : (ik.ts > k.ts);
    if (imemFirst) {
      return doMergeMemTableMultiRows(piRow, uid, &pBlockScanInfo->iiter, pDelList, pTSRow, pReader, freeTSRow);
    }
    return doMergeMemTableMultiRows(pRow, uid, &pBlockScanInfo->iter, pDelList, pTSRow, pReader, freeTSRow);
  }

  if (pBlockScanInfo->iter.hasVal && pRow != NULL) {
    return doMergeMemTableMultiRows(pRow, uid, &pBlockScanInfo->iter, pDelList, pTSRow, pReader, freeTSRow);
  }

  if (pBlockScanInfo->iiter.hasVal && piRow != NULL) {
    return doMergeMemTableMultiRows(piRow, uid, &pBlockScanInfo->iiter, pDelList, pTSRow, pReader, freeTSRow);
  }

  return TSDB_CODE_SUCCESS;
}

H
Haojun Liao 已提交
3065
/*
 * Append one in-memory row (pTSRow) to pBlock as a new row.
 *
 * If the first output column is the primary timestamp, it is filled from pTSRow->ts. The remaining output
 * columns are then matched by column id against the schema version of the row (walking both lists in
 * step). Output columns with no counterpart in that schema receive NULL. pBlock->info.rows grows by one.
 * Always returns TSDB_CODE_SUCCESS.
 */
int32_t doAppendRowFromTSRow(SSDataBlock* pBlock, STsdbReader* pReader, STSRow* pTSRow, uint64_t uid) {
  const int32_t       outputRow = pBlock->info.rows;
  const int32_t       numOfOutputCols = (int32_t)taosArrayGetSize(pBlock->pDataBlock);
  SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo;
  STSchema*           pSchema = doGetSchemaForTSRow(pTSRow->sver, pReader, uid);

  int32_t outIdx = 0;     // position in the output columns of pBlock
  int32_t schemaIdx = 0;  // position in pSchema->columns
  SColVal colVal = {0};

  // the primary timestamp, when requested, is the leading output column and comes from the row header
  SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, outIdx);
  if (pCol->info.colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
    colDataAppend(pCol, outputRow, (const char*)&pTSRow->ts, false);
    ++outIdx;
  }

  while (outIdx < numOfOutputCols && schemaIdx < pSchema->numOfCols) {
    pCol = taosArrayGet(pBlock->pDataBlock, outIdx);
    col_id_t outColId = pCol->info.colId;

    if (outColId > pSchema->columns[schemaIdx].colId) {
      // schema column that is not part of the output
      ++schemaIdx;
    } else if (outColId < pSchema->columns[schemaIdx].colId) {
      // requested column missing from this schema version
      colDataAppendNULL(pCol, outputRow);
      ++outIdx;
    } else {
      tTSRowGetVal(pTSRow, pSchema, schemaIdx, &colVal);
      doCopyColVal(pCol, outputRow, outIdx, &colVal, pSupInfo);
      ++outIdx;
      ++schemaIdx;
    }
  }

  // schema exhausted: every remaining output column gets NULL
  for (; outIdx < numOfOutputCols; ++outIdx) {
    colDataAppendNULL(taosArrayGet(pBlock->pDataBlock, outIdx), outputRow);
  }

  pBlock->info.rows += 1;
  return TSDB_CODE_SUCCESS;
}

H
Hongze Cheng 已提交
3109 3110
/*
 * Append row rowIndex of a loaded file block (pBlockData) to pResBlock as a new row.
 *
 * If the first output column is the primary timestamp, it is filled from pBlockData->aTSKEY. The other
 * output columns are paired with the block's column data by column id, walking both lists in step. Output
 * columns absent from the file block are filled with NULL. pResBlock->info.rows grows by one.
 * Always returns TSDB_CODE_SUCCESS.
 */
int32_t doAppendRowFromFileBlock(SSDataBlock* pResBlock, STsdbReader* pReader, SBlockData* pBlockData,
                                 int32_t rowIndex) {
  SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo;
  const int32_t       dstRow = pResBlock->info.rows;

  int32_t outIdx = 0;  // position in the output columns
  int32_t inIdx = 0;   // position in the file block's column data

  SColumnInfoData* pTsCol = taosArrayGet(pResBlock->pDataBlock, outIdx);
  if (pTsCol->info.colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
    colDataAppendInt64(pTsCol, dstRow, &pBlockData->aTSKEY[rowIndex]);
    ++outIdx;
  }

  SColVal       cv = {0};
  const int32_t numOfInputCols = pBlockData->aIdx->size;
  const int32_t numOfOutputCols = pResBlock->pDataBlock->size;

  while (outIdx < numOfOutputCols && inIdx < numOfInputCols) {
    SColumnInfoData* pCol = TARRAY_GET_ELEM(pResBlock->pDataBlock, outIdx);
    SColData*        pData = tBlockDataGetColDataByIdx(pBlockData, inIdx);

    if (pData->cid < pCol->info.colId) {
      // file column that is not part of the output
      ++inIdx;
    } else if (pData->cid == pCol->info.colId) {
      tColDataGetValue(pData, rowIndex, &cv);
      doCopyColVal(pCol, dstRow, outIdx, &cv, pSupInfo);
      ++inIdx;
      ++outIdx;
    } else {
      // the specified column does not exist in file block, fill with null data
      colDataAppendNULL(pCol, dstRow);
      ++outIdx;
    }
  }

  for (; outIdx < numOfOutputCols; ++outIdx) {
    colDataAppendNULL(taosArrayGet(pResBlock->pDataBlock, outIdx), dstRow);
  }

  pResBlock->info.rows += 1;
  return TSDB_CODE_SUCCESS;
}

3156 3157
/*
 * Fill pReader->pResBlock with rows of one table taken from its in-memory buffers.
 *
 * Stops when no further row before endKey is available, when both buffer iterators are exhausted, or
 * when the block holds `capacity` rows.
 *
 * Returns TSDB_CODE_SUCCESS, or the first error reported while fetching or appending a row. Earlier
 * versions dropped these codes, so a failure was indistinguishable from "no more rows".
 */
int32_t buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, int64_t endKey, int32_t capacity,
                                  STsdbReader* pReader) {
  SSDataBlock* pBlock = pReader->pResBlock;

  while (1) {
    STSRow* pTSRow = NULL;
    bool    freeTSRow = false;

    int32_t code = tsdbGetNextRowInMem(pBlockScanInfo, pReader, &pTSRow, endKey, &freeTSRow);
    if (code != TSDB_CODE_SUCCESS) {
      // release whatever row ownership was handed over before the failure (pTSRow starts as NULL)
      if (freeTSRow) {
        taosMemoryFree(pTSRow);
      }
      return code;
    }

    if (pTSRow == NULL) {
      break;
    }

    code = doAppendRowFromTSRow(pBlock, pReader, pTSRow, pBlockScanInfo->uid);
    if (freeTSRow) {
      taosMemoryFree(pTSRow);
    }
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    // no data in buffer, return immediately
    if (!(pBlockScanInfo->iter.hasVal || pBlockScanInfo->iiter.hasVal)) {
      break;
    }

    if (pBlock->info.rows >= capacity) {
      break;
    }
  }

  ASSERT(pBlock->info.rows <= capacity);
  return TSDB_CODE_SUCCESS;
}
H
Hongze Cheng 已提交
3186

3187
// todo refactor, use arraylist instead
H
Hongze Cheng 已提交
3188
/*
 * Restrict the reader to the single table `uid`: the table map is cleared and one fresh scan-info entry
 * (lastKey = 0) is inserted for it.
 *
 * Returns TSDB_CODE_SUCCESS, or terrno if the entry cannot be inserted.
 * NOTE(review): assumes taosHashPut sets terrno on failure — confirm.
 */
int32_t tsdbSetTableId(STsdbReader* pReader, int64_t uid) {
  ASSERT(pReader != NULL);
  taosHashClear(pReader->status.pTableMap);

  STableBlockScanInfo info = {.lastKey = 0, .uid = uid};
  if (taosHashPut(pReader->status.pTableMap, &info.uid, sizeof(uint64_t), &info, sizeof(info)) != 0) {
    return terrno;
  }

  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
3197 3198 3199 3200 3201 3202
/* Return the meta layer's index handle (metaGetIdx), or NULL when no meta object is given. */
void* tsdbGetIdx(SMeta* pMeta) {
  return (pMeta != NULL) ? metaGetIdx(pMeta) : NULL;
}
dengyihao's avatar
dengyihao 已提交
3203

dengyihao's avatar
dengyihao 已提交
3204 3205 3206 3207 3208 3209
/* Return the meta layer's inverted-index handle (metaGetIvtIdx), or NULL when no meta object is given. */
void* tsdbGetIvtIdx(SMeta* pMeta) {
  return (pMeta != NULL) ? metaGetIvtIdx(pMeta) : NULL;
}
L
Liu Jicong 已提交
3210

H
Hongze Cheng 已提交
3211
/* Upper bound of the data-version range this reader was created with. */
uint64_t getReaderMaxVersion(STsdbReader* pReader) {
  const uint64_t maxVer = pReader->verRange.maxVer;
  return maxVer;
}
3212

H
refact  
Hongze Cheng 已提交
3213
// ====================================== EXPOSED APIs ======================================
3214 3215
/*
 * Create a reader over the tables in pTableList for the query condition pCond and return it via *ppReader.
 *
 * For TIMEWINDOW_RANGE_EXTERNAL conditions, two single-row inner readers are also created:
 * - innerReader[0] ("prev") scans away from the window start in the reverse order;
 * - innerReader[1] ("next") scans from the window end in the query's own order.
 * pCond is modified temporarily to build them and is restored before this function returns.
 *
 * Returns TSDB_CODE_SUCCESS or an error code. idstr identifies the query in log messages.
 */
int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, SArray* pTableList, STsdbReader** ppReader,
                       const char* idstr) {
  // Initialized up front. The first goto below used to jump over the initialization of pReader, and the
  // error path then read an indeterminate pointer.
  STsdbReader* pReader = NULL;

  int32_t code = tsdbReaderCreate(pVnode, pCond, ppReader, 4096, idstr);
  if (code != TSDB_CODE_SUCCESS) {
    goto _err;
  }

  // check for query time window
  pReader = *ppReader;
  if (isEmptyQueryTimeWindow(&pReader->window)) {
    tsdbDebug("%p query window not overlaps with the data set, no result returned, %s", pReader, pReader->idStr);
    return TSDB_CODE_SUCCESS;
  }

  if (pCond->type == TIMEWINDOW_RANGE_EXTERNAL) {
    // update the SQueryTableDataCond to create inner reader
    STimeWindow w = pCond->twindows;
    int32_t     order = pCond->order;
    if (order == TSDB_ORDER_ASC) {
      pCond->twindows.ekey = pCond->twindows.skey;
      pCond->twindows.skey = INT64_MIN;
      pCond->order = TSDB_ORDER_DESC;
    } else {
      pCond->twindows.skey = pCond->twindows.ekey;
      pCond->twindows.ekey = INT64_MAX;
      pCond->order = TSDB_ORDER_ASC;
    }

    // here we only need one more row, so the capacity is set to be ONE.
    code = tsdbReaderCreate(pVnode, pCond, &pReader->innerReader[0], 1, idstr);
    if (code != TSDB_CODE_SUCCESS) {
      pCond->twindows = w;
      pCond->order = order;
      goto _err;
    }

    // NOTE(review): for DESC the upper bound below is w.ekey, as before — confirm it should not be w.skey.
    if (order == TSDB_ORDER_ASC) {
      pCond->twindows.skey = w.ekey;
      pCond->twindows.ekey = INT64_MAX;
    } else {
      pCond->twindows.skey = INT64_MIN;
      pCond->twindows.ekey = w.ekey;
    }

    // The "next" reader scans in the query's own order. The order was reversed above for innerReader[0];
    // previously that reversal leaked into this reader, so it returned the farthest row instead of the nearest.
    pCond->order = order;
    code = tsdbReaderCreate(pVnode, pCond, &pReader->innerReader[1], 1, idstr);

    // hand the caller's condition back unchanged
    pCond->twindows = w;
    pCond->order = order;
    if (code != TSDB_CODE_SUCCESS) {
      goto _err;
    }
  }

  // NOTE: the endVersion in pCond is the data version not schema version, so pCond->endVersion is not correct here.
  // The version is passed as a real int64_t because PRId64 consumes an int64_t from the variadic list.
  if (pCond->suid != 0) {
    pReader->pSchema = metaGetTbTSchema(pReader->pTsdb->pVnode->pMeta, pReader->suid, /*pCond->endVersion*/ -1);
    if (pReader->pSchema == NULL) {
      tsdbError("failed to get table schema, suid:%" PRIu64 ", ver:%" PRId64 " , %s", pReader->suid, (int64_t)-1,
                pReader->idStr);
    }
  } else if (taosArrayGetSize(pTableList) > 0) {
    STableKeyInfo* pKey = taosArrayGet(pTableList, 0);
    pReader->pSchema = metaGetTbTSchema(pReader->pTsdb->pVnode->pMeta, pKey->uid, /*pCond->endVersion*/ -1);
    if (pReader->pSchema == NULL) {
      tsdbError("failed to get table schema, uid:%" PRIu64 ", ver:%" PRId64 " , %s", pKey->uid, (int64_t)-1,
                pReader->idStr);
    }
  }

  int32_t numOfTables = taosArrayGetSize(pTableList);
  pReader->status.pTableMap = createDataBlockScanInfo(pReader, pTableList->pData, numOfTables);
  if (pReader->status.pTableMap == NULL) {
    tsdbReaderClose(pReader);
    *ppReader = NULL;
    pReader = NULL;  // freed above; the error path must not touch it

    code = TSDB_CODE_TDB_OUT_OF_MEMORY;
    goto _err;
  }

  code = tsdbTakeReadSnap(pReader->pTsdb, &pReader->pReadSnap);
  if (code != TSDB_CODE_SUCCESS) {
    goto _err;
  }

  if (pReader->type == TIMEWINDOW_RANGE_CONTAINED) {
    SDataBlockIter* pBlockIter = &pReader->status.blockIter;

    initFilesetIterator(&pReader->status.fileIter, pReader->pReadSnap->fs.aDFileSet, pReader);
    resetDataBlockIterator(&pReader->status.blockIter, pReader->order);

    // no data in files, let's try buffer in memory
    if (pReader->status.fileIter.numOfFiles == 0) {
      pReader->status.loadFromFile = false;
    } else {
      code = initForFirstBlockInFile(pReader, pBlockIter);
      if (code != TSDB_CODE_SUCCESS) {
        goto _err;
      }
    }
  } else {
    STsdbReader*    pPrevReader = pReader->innerReader[0];
    SDataBlockIter* pBlockIter = &pPrevReader->status.blockIter;

    code = tsdbTakeReadSnap(pPrevReader->pTsdb, &pPrevReader->pReadSnap);
    if (code != TSDB_CODE_SUCCESS) {
      goto _err;
    }

    initFilesetIterator(&pPrevReader->status.fileIter, pPrevReader->pReadSnap->fs.aDFileSet, pPrevReader);
    resetDataBlockIterator(&pPrevReader->status.blockIter, pPrevReader->order);

    // no data in files, let's try buffer in memory
    if (pPrevReader->status.fileIter.numOfFiles == 0) {
      pPrevReader->status.loadFromFile = false;
    } else {
      code = initForFirstBlockInFile(pPrevReader, pBlockIter);
      if (code != TSDB_CODE_SUCCESS) {
        goto _err;
      }
    }
  }

  tsdbDebug("%p total numOfTable:%d in this query %s", pReader, numOfTables, pReader->idStr);
  return code;

_err:
  // Log with the caller's id string. pReader may be NULL or already freed at this point.
  tsdbError("failed to create data reader, code:%s %s", tstrerror(code), (idstr != NULL) ? idstr : "");
  return code;
}

/*
 * Release a reader and everything it holds:
 * - the inner readers of an external-range query;
 * - the SMA/column scratch buffers;
 * - the file block data and block iterator;
 * - the per-table scan info and the result block;
 * - the open data-file reader and the read snapshot;
 * - the last-block reader.
 * A NULL reader is ignored. The I/O cost summary is logged before the reader's id string is freed.
 */
void tsdbReaderClose(STsdbReader* pReader) {
  if (pReader == NULL) {
    return;
  }

  // tsdbNextDataBlock() closes an inner reader only once it is exhausted. Without this, a caller that stops
  // iterating early (or an open that fails half-way) would leak them.
  tsdbReaderClose(pReader->innerReader[0]);
  pReader->innerReader[0] = NULL;
  tsdbReaderClose(pReader->innerReader[1]);
  pReader->innerReader[1] = NULL;

  SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo;

  taosMemoryFreeClear(pSupInfo->plist);
  taosMemoryFree(pSupInfo->colIds);

  taosArrayDestroy(pSupInfo->pColAgg);
  for (int32_t i = 0; i < blockDataGetNumOfCols(pReader->pResBlock); ++i) {
    if (pSupInfo->buildBuf[i] != NULL) {
      taosMemoryFreeClear(pSupInfo->buildBuf[i]);
    }
  }

  taosMemoryFree(pSupInfo->buildBuf);
  tBlockDataDestroy(&pReader->status.fileBlockData, true);

  cleanupDataBlockIterator(&pReader->status.blockIter);

  size_t numOfTables = taosHashGetSize(pReader->status.pTableMap);
  destroyBlockScanInfo(pReader->status.pTableMap);
  blockDataDestroy(pReader->pResBlock);

  if (pReader->pFileReader != NULL) {
    tsdbDataFReaderClose(&pReader->pFileReader);
  }

  tsdbUntakeReadSnap(pReader->pTsdb, pReader->pReadSnap);

  taosMemoryFree(pReader->status.uidCheckInfo.tableUidList);
  SIOCostSummary* pCost = &pReader->cost;

  SFilesetIter* pFilesetIter = &pReader->status.fileIter;
  if (pFilesetIter->pLastBlockReader != NULL) {
    SLastBlockReader* pLReader = pFilesetIter->pLastBlockReader;
    tMergeTreeClose(&pLReader->mergeTree);

    getLastBlockLoadInfo(pLReader->pInfo, &pCost->lastBlockLoad, &pCost->lastBlockLoadTime);

    pLReader->pInfo = destroyLastBlockLoadInfo(pLReader->pInfo);
    taosMemoryFree(pLReader);
  }

  tsdbDebug(
      "%p :io-cost summary: head-file:%" PRIu64 ", head-file time:%.2f ms, SMA:%" PRId64
      " SMA-time:%.2f ms, fileBlocks:%" PRId64 ", fileBlocks-load-time:%.2f ms, "
      "build in-memory-block-time:%.2f ms, lastBlocks:%" PRId64 ", lastBlocks-time:%.2f ms, composed-blocks:%" PRId64
      ", composed-blocks-time:%.2fms, STableBlockScanInfo size:%.2f Kb %s",
      pReader, pCost->headFileLoad, pCost->headFileLoadTime, pCost->smaDataLoad, pCost->smaLoadTime, pCost->numOfBlocks,
      pCost->blockLoadTime, pCost->buildmemBlock, pCost->lastBlockLoad, pCost->lastBlockLoadTime, pCost->composedBlocks,
      pCost->buildComposedBlockTime, numOfTables * sizeof(STableBlockScanInfo) / 1000.0, pReader->idStr);

  taosMemoryFree(pReader->idStr);

  // pMemSchema may alias pSchema. Compare the two pointers before either is freed, because inspecting a
  // freed pointer value is undefined.
  if (pReader->pMemSchema != pReader->pSchema) {
    taosMemoryFree(pReader->pMemSchema);
  }
  taosMemoryFree(pReader->pSchema);

  taosMemoryFreeClear(pReader);
}

3399
/*
 * Build the next result block of a single reader into pReader->pResBlock. The block is cleaned first.
 * While loadFromFile is set, data files come first; when they yield no rows, or file loading is off,
 * the in-memory buffer is tried.
 * Returns true when the block holds at least one row, false when nothing is left or file loading fails.
 */
static bool doTsdbNextDataBlock(STsdbReader* pReader) {
  // cleanup the data that belongs to the previous data block
  SSDataBlock* pBlock = pReader->pResBlock;
  blockDataCleanup(pBlock);

  SReaderStatus* pStatus = &pReader->status;
  if (pStatus->loadFromFile) {
    if (buildBlockFromFiles(pReader) != TSDB_CODE_SUCCESS) {
      return false;
    }

    if (pBlock->info.rows > 0) {
      return true;
    }
  }

  // nothing came from the files (or there are none): fall back to the buffer
  buildBlockFromBufferSequentially(pReader);
  return pBlock->info.rows > 0;
}

3426 3427 3428 3429 3430 3431 3432 3433 3434 3435 3436 3437 3438 3439 3440 3441 3442 3443 3444 3445 3446 3447 3448 3449 3450 3451 3452 3453 3454 3455 3456 3457 3458 3459 3460 3461 3462
/*
 * Advance to the next result block. Sources are consumed in this order:
 * 1. innerReader[0], if present (step = EXTERNAL_ROWS_PREV);
 * 2. the reader itself (step = EXTERNAL_ROWS_MAIN);
 * 3. innerReader[1], if present (step = EXTERNAL_ROWS_NEXT).
 * An inner reader is closed and detached as soon as it yields nothing more.
 * Returns false once every source is exhausted or the query window is empty.
 */
bool tsdbNextDataBlock(STsdbReader* pReader) {
  if (isEmptyQueryTimeWindow(&pReader->window)) {
    return false;
  }

  STsdbReader* pPrev = pReader->innerReader[0];
  if (pPrev != NULL) {
    if (doTsdbNextDataBlock(pPrev)) {
      pReader->step = EXTERNAL_ROWS_PREV;
      return true;
    }

    tsdbReaderClose(pPrev);
    pReader->innerReader[0] = NULL;
  }

  pReader->step = EXTERNAL_ROWS_MAIN;
  if (doTsdbNextDataBlock(pReader)) {
    return true;
  }

  STsdbReader* pNext = pReader->innerReader[1];
  if (pNext == NULL) {
    return false;
  }

  if (doTsdbNextDataBlock(pNext)) {
    pReader->step = EXTERNAL_ROWS_NEXT;
    return true;
  }

  tsdbReaderClose(pNext);
  pReader->innerReader[1] = NULL;
  return false;
}

/* Copy row count, table uid and time window of the reader's current result block into pDataBlockInfo. */
static void setBlockInfo(STsdbReader* pReader, SDataBlockInfo* pDataBlockInfo) {
  ASSERT(pDataBlockInfo != NULL && pReader != NULL);

  SSDataBlock* pRes = pReader->pResBlock;
  pDataBlockInfo->rows = pRes->info.rows;
  pDataBlockInfo->uid = pRes->info.uid;
  pDataBlockInfo->window = pRes->info.window;
}

3469 3470
/*
 * Describe the block produced by the last tsdbNextDataBlock() call. For external-range readers the
 * description comes from whichever reader produced it, chosen by pReader->step.
 */
void tsdbRetrieveDataBlockInfo(STsdbReader* pReader, SDataBlockInfo* pDataBlockInfo) {
  STsdbReader* pSource = pReader;

  if (pReader->type == TIMEWINDOW_RANGE_EXTERNAL) {
    switch (pReader->step) {
      case EXTERNAL_ROWS_MAIN:
        pSource = pReader;
        break;
      case EXTERNAL_ROWS_PREV:
        pSource = pReader->innerReader[0];
        break;
      default:
        pSource = pReader->innerReader[1];
        break;
    }
  }

  setBlockInfo(pSource, pDataBlockInfo);
}

3483
/*
 * Load the per-column SMA (aggregate) data of the current file block.
 *
 * *pBlockStatis is set to NULL when no SMA is available:
 * - external-range readers;
 * - composed blocks;
 * - blocks without SMA.
 *
 * Otherwise *pBlockStatis points at pSup->plist: slot 0 is the timestamp aggregate built from the result
 * block's window, and other slots hold the loaded aggregates matched by column id.
 * *allHave ends up true only if every matched column has SMA enabled.
 *
 * Returns TSDB_CODE_SUCCESS or the error from tsdbReadBlockSma().
 */
int32_t tsdbRetrieveDatablockSMA(STsdbReader* pReader, SColumnDataAgg*** pBlockStatis, bool* allHave) {
  *allHave = false;

  // there is no statistics data for external-range readers or for composed blocks
  if (pReader->type == TIMEWINDOW_RANGE_EXTERNAL || pReader->status.composedDataBlock) {
    *pBlockStatis = NULL;
    return TSDB_CODE_SUCCESS;
  }

  SFileDataBlockInfo* pFBlock = getCurrentBlockInfo(&pReader->status.blockIter);
  SDataBlk*           pBlock = getCurrentBlock(&pReader->status.blockIter);
  int64_t             stime = taosGetTimestampUs();
  SBlockLoadSuppInfo* pSup = &pReader->suppInfo;

  if (!tDataBlkHasSma(pBlock)) {
    *pBlockStatis = NULL;
    return TSDB_CODE_SUCCESS;
  }

  int32_t code = tsdbReadBlockSma(pReader->pFileReader, pBlock, pSup->pColAgg);
  if (code != TSDB_CODE_SUCCESS) {
    tsdbDebug("vgId:%d, failed to load block SMA for uid %" PRIu64 ", code:%s, %s", 0, pFBlock->uid, tstrerror(code),
              pReader->idStr);
    return code;
  }

  *allHave = true;

  // always load the first primary timestamp column data
  SColumnDataAgg* pTsAgg = &pSup->tsColAgg;
  pTsAgg->numOfNull = 0;
  pTsAgg->colId = PRIMARYKEY_TIMESTAMP_COL_ID;
  pTsAgg->min = pReader->pResBlock->info.window.skey;
  pTsAgg->max = pReader->pResBlock->info.window.ekey;
  pSup->plist[0] = pTsAgg;

  // pair the loaded aggregates with the requested output columns by column id
  const size_t numOfCols = blockDataGetNumOfCols(pReader->pResBlock);
  const size_t numOfAggs = taosArrayGetSize(pSup->pColAgg);
  int32_t      aggIdx = 0;  // position in pSup->pColAgg
  int32_t      colIdx = 0;  // position in pSup->colIds / pSup->plist

  while (colIdx < numOfCols && aggIdx < numOfAggs) {
    SColumnDataAgg* pAgg = taosArrayGet(pSup->pColAgg, aggIdx);

    if (pAgg->colId < pSup->colIds[colIdx]) {
      aggIdx += 1;
    } else if (pAgg->colId > pSup->colIds[colIdx]) {
      colIdx += 1;
    } else {
      // NOTE(review): pSchema->columns is indexed by the aggregate position, not by the schema position of
      // pAgg->colId. Confirm that these line up.
      if (IS_BSMA_ON(&(pReader->pSchema->columns[aggIdx]))) {
        pSup->plist[colIdx] = pAgg;
      } else {
        *allHave = false;
      }
      aggIdx += 1;
      colIdx += 1;
    }
  }

  double elapsed = (taosGetTimestampUs() - stime) / 1000.0;
  pReader->cost.smaLoadTime += elapsed;
  pReader->cost.smaDataLoad += 1;

  *pBlockStatis = pSup->plist;

  tsdbDebug("vgId:%d, succeed to load block SMA for uid %" PRIu64 ", elapsed time:%.2f ms, %s", 0, pFBlock->uid,
            elapsed, pReader->idStr);

  return code;
}

3561
/*
 * Return the column arrays of the current result block.
 *
 * A composed block is already materialized in pResBlock. Otherwise the current file block is loaded into
 * status.fileBlockData and copied into pResBlock.
 *
 * On failure terrno is set and NULL is returned. A load failure also destroys fileBlockData.
 */
static SArray* doRetrieveDataBlock(STsdbReader* pReader) {
  SReaderStatus* pStatus = &pReader->status;

  if (pStatus->composedDataBlock) {
    return pReader->pResBlock->pDataBlock;
  }

  SFileDataBlockInfo*  pFBlock = getCurrentBlockInfo(&pStatus->blockIter);
  STableBlockScanInfo* pBlockScanInfo = taosHashGet(pStatus->pTableMap, &pFBlock->uid, sizeof(pFBlock->uid));
  SBlockData*          pBlockData = &pStatus->fileBlockData;

  tBlockDataReset(pBlockData);

  int32_t code = tBlockDataInit(pBlockData, pReader->suid, pBlockScanInfo->uid, pReader->pSchema);
  if (code == TSDB_CODE_SUCCESS) {
    code = doLoadFileBlockData(pReader, &pStatus->blockIter, pBlockData);
    if (code != TSDB_CODE_SUCCESS) {
      tBlockDataDestroy(pBlockData, 1);
    }
  }

  if (code != TSDB_CODE_SUCCESS) {
    terrno = code;
    return NULL;
  }

  copyBlockDataToSDataBlock(pReader, pBlockScanInfo);
  return pReader->pResBlock->pDataBlock;
}

3589 3590 3591 3592 3593 3594 3595 3596 3597 3598 3599 3600
/*
 * Return the column arrays of the block produced by the last tsdbNextDataBlock() call.
 * For external-range readers the source reader is chosen by pReader->step.
 * pIdList is not used.
 */
SArray* tsdbRetrieveDataBlock(STsdbReader* pReader, SArray* pIdList) {
  STsdbReader* pTarget = pReader;

  if (pReader->type == TIMEWINDOW_RANGE_EXTERNAL) {
    switch (pReader->step) {
      case EXTERNAL_ROWS_PREV:
        pTarget = pReader->innerReader[0];
        break;
      case EXTERNAL_ROWS_NEXT:
        pTarget = pReader->innerReader[1];
        break;
      default:
        break;
    }
  }

  return doRetrieveDataBlock(pTarget);
}

H
Haojun Liao 已提交
3601
/*
 * Rewind the reader for a new pass using the order and time window of pCond. The reader type becomes
 * TIMEWINDOW_RANGE_CONTAINED, the file reader is closed, and the file-set and block iterators restart.
 * An empty query window turns this into a no-op.
 * Returns TSDB_CODE_SUCCESS or the error from initForFirstBlockInFile().
 */
int32_t tsdbReaderReset(STsdbReader* pReader, SQueryTableDataCond* pCond) {
  if (isEmptyQueryTimeWindow(&pReader->window)) {
    return TSDB_CODE_SUCCESS;
  }

  SReaderStatus*      pStatus = &pReader->status;
  SBlockLoadSuppInfo* pSup = &pReader->suppInfo;

  pReader->order = pCond->order;
  pReader->type = TIMEWINDOW_RANGE_CONTAINED;
  pStatus->loadFromFile = true;
  pStatus->pTableIter = NULL;
  pReader->window = updateQueryTimeWindow(pReader->pTsdb, &pCond->twindows);

  // reset the SMA scratch state: a zeroed timestamp aggregate and an empty first plist slot
  memset(&pSup->tsColAgg, 0, sizeof(SColumnDataAgg));
  memset(pSup->plist, 0, POINTER_BYTES);
  pSup->tsColAgg.colId = PRIMARYKEY_TIMESTAMP_COL_ID;

  tsdbDataFReaderClose(&pReader->pFileReader);

  int32_t numOfTables = taosHashGetSize(pStatus->pTableMap);

  initFilesetIterator(&pStatus->fileIter, pReader->pReadSnap->fs.aDFileSet, pReader);
  resetDataBlockIterator(&pStatus->blockIter, pReader->order);

  // key handed to every table's scan info: one before the window start (asc) or one after its end (desc)
  int64_t ts = ASCENDING_TRAVERSE(pReader->order) ? pReader->window.skey - 1 : pReader->window.ekey + 1;
  resetDataBlockScanInfo(pStatus->pTableMap, ts);

  int32_t code = 0;
  if (pStatus->fileIter.numOfFiles == 0) {
    // no data in files, let's try buffer in memory
    pStatus->loadFromFile = false;
  } else {
    code = initForFirstBlockInFile(pReader, &pStatus->blockIter);
    if (code != TSDB_CODE_SUCCESS) {
      tsdbError("%p reset reader failed, numOfTables:%d, query range:%" PRId64 " - %" PRId64 " in query %s", pReader,
                numOfTables, pReader->window.skey, pReader->window.ekey, pReader->idStr);
      return code;
    }
  }

  tsdbDebug("%p reset reader, suid:%" PRIu64 ", numOfTables:%d, query range:%" PRId64 " - %" PRId64 " in query %s",
            pReader, pReader->suid, numOfTables, pReader->window.skey, pReader->window.ekey, pReader->idStr);

  return code;
}
H
Hongze Cheng 已提交
3647

3648 3649 3650
/*
 * Map a block's row count to a histogram bucket index.
 *
 * startRow is the lower edge of bucket 0 (the configured minRows), bucketRange is the width of one bucket,
 * and numOfRows is the block's row count.
 *
 * Blocks with at most startRow rows (e.g. small or last blocks) go to bucket 0 instead of producing a
 * negative index. A non-positive bucketRange (minRows == maxRows) also yields 0 instead of a division by zero.
 * The upper bound of the index is still the caller's responsibility.
 */
static int32_t getBucketIndex(int32_t startRow, int32_t bucketRange, int32_t numOfRows) {
  if (bucketRange <= 0 || numOfRows <= startRow) {
    return 0;
  }
  return (numOfRows - startRow) / bucketRange;
}
H
Hongze Cheng 已提交
3651

3652 3653 3654 3655
/*
 * Walk every data block the reader would scan in the data files and accumulate distribution statistics in
 * pTableBlockInfo:
 * - total and min/max rows;
 * - number of blocks and small blocks;
 * - a row-count histogram over [minRows, maxRows].
 * Returns TSDB_CODE_SUCCESS, or the error from initForFirstBlockInFile().
 */
int32_t tsdbGetFileBlocksDistInfo(STsdbReader* pReader, STableBlockDistInfo* pTableBlockInfo) {
  // The row range [minRows, maxRows] is split into this many histogram buckets.
  // NOTE(review): assumed to equal the length of pTableBlockInfo->blockRowsHisto — confirm.
  const int32_t numOfBuckets = 20;

  int32_t code = TSDB_CODE_SUCCESS;
  pTableBlockInfo->totalSize = 0;
  pTableBlockInfo->totalRows = 0;

  // find the start data block in file
  SReaderStatus* pStatus = &pReader->status;

  STsdbCfg* pc = &pReader->pTsdb->pVnode->config.tsdbCfg;
  pTableBlockInfo->defMinRows = pc->minRows;
  pTableBlockInfo->defMaxRows = pc->maxRows;

  int32_t bucketRange = ceil((pc->maxRows - pc->minRows) / (double)numOfBuckets);
  if (bucketRange <= 0) {
    // minRows == maxRows would give zero-width buckets (division by zero in getBucketIndex)
    bucketRange = 1;
  }

  pTableBlockInfo->numOfFiles += 1;

  int32_t numOfTables = (int32_t)taosHashGetSize(pStatus->pTableMap);
  int     defaultRows = 4096;

  SDataBlockIter* pBlockIter = &pStatus->blockIter;
  pTableBlockInfo->numOfFiles += pStatus->fileIter.numOfFiles;

  if (pBlockIter->numOfBlocks > 0) {
    pTableBlockInfo->numOfBlocks += pBlockIter->numOfBlocks;
  }

  pTableBlockInfo->numOfTables = numOfTables;
  bool hasNext = (pBlockIter->numOfBlocks > 0);

  while (true) {
    if (hasNext) {
      SDataBlk* pBlock = getCurrentBlock(pBlockIter);

      int32_t numOfRows = pBlock->nRow;
      pTableBlockInfo->totalRows += numOfRows;

      if (numOfRows > pTableBlockInfo->maxRows) {
        pTableBlockInfo->maxRows = numOfRows;
      }

      if (numOfRows < pTableBlockInfo->minRows) {
        pTableBlockInfo->minRows = numOfRows;
      }

      if (numOfRows < defaultRows) {
        pTableBlockInfo->numOfSmallBlocks += 1;
      }

      // Blocks below minRows would map below bucket 0. A block of exactly maxRows rows can map one past the
      // last bucket. Clamp both cases so the histogram write stays in bounds.
      int32_t bucketIndex = getBucketIndex(pTableBlockInfo->defMinRows, bucketRange, numOfRows);
      if (bucketIndex < 0) {
        bucketIndex = 0;
      } else if (bucketIndex >= numOfBuckets) {
        bucketIndex = numOfBuckets - 1;
      }
      pTableBlockInfo->blockRowsHisto[bucketIndex]++;

      hasNext = blockIteratorNext(&pStatus->blockIter);
    } else {
      code = initForFirstBlockInFile(pReader, pBlockIter);
      if ((code != TSDB_CODE_SUCCESS) || (pReader->status.loadFromFile == false)) {
        break;
      }

      pTableBlockInfo->numOfBlocks += pBlockIter->numOfBlocks;
      hasNext = (pBlockIter->numOfBlocks > 0);
    }
  }

  return code;
}
H
Hongze Cheng 已提交
3720

H
refact  
Hongze Cheng 已提交
3721
/*
 * Count the rows held in the reader's memtable snapshot (mem and imem) for every table in the reader's
 * table map. Leaves status.pTableIter at NULL.
 *
 * The memtables are read from pReader->pReadSnap, so presence is checked on the snapshot too. The live
 * pTsdb->mem/imem pointers can differ from the snapshot after a memtable switch. A reader opened with an
 * empty time window has no snapshot at all, and that case must not be dereferenced.
 */
int64_t tsdbGetNumOfRowsInMemTable(STsdbReader* pReader) {
  int64_t rows = 0;

  SReaderStatus* pStatus = &pReader->status;
  STsdbReadSnap* pSnap = pReader->pReadSnap;
  pStatus->pTableIter = taosHashIterate(pStatus->pTableMap, NULL);

  while (pStatus->pTableIter != NULL) {
    STableBlockScanInfo* pBlockScanInfo = pStatus->pTableIter;

    if (pSnap != NULL && pSnap->pMem != NULL) {
      STbData* d = tsdbGetTbDataFromMemTable(pSnap->pMem, pReader->suid, pBlockScanInfo->uid);
      if (d != NULL) {
        rows += tsdbGetNRowsInTbData(d);
      }
    }

    if (pSnap != NULL && pSnap->pIMem != NULL) {
      STbData* di = tsdbGetTbDataFromMemTable(pSnap->pIMem, pReader->suid, pBlockScanInfo->uid);
      if (di != NULL) {
        rows += tsdbGetNRowsInTbData(di);
      }
    }

    // current table is exhausted, let's try the next table
    pStatus->pTableIter = taosHashIterate(pStatus->pTableMap, pStatus->pTableIter);
  }

  return rows;
}
D
dapan1121 已提交
3752

L
Liu Jicong 已提交
3753
/*
 * Fetch the schema of table `uid` via metaGetTbTSchema().
 *
 * For a child table, *suid receives its super table uid and the schema version is taken from the super
 * table's entry. For a normal table, *suid is 0 and its own schema version is used.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_TDB_INVALID_TABLE_ID (also stored in terrno) when a table
 * entry cannot be read.
 */
int32_t tsdbGetTableSchema(SVnode* pVnode, int64_t uid, STSchema** pSchema, int64_t* suid) {
  int32_t     sversion = 1;
  SMetaReader mr = {0};

  metaReaderInit(&mr, pVnode->pMeta, 0);
  if (metaGetTableEntryByUid(&mr, uid) != TSDB_CODE_SUCCESS) {
    goto _invalid_table;
  }

  *suid = 0;

  if (mr.me.type != TSDB_CHILD_TABLE) {
    ASSERT(mr.me.type == TSDB_NORMAL_TABLE);
    sversion = mr.me.ntbEntry.schemaRow.version;
  } else {
    // the schema of a child table lives in its super table
    tDecoderClear(&mr.coder);
    *suid = mr.me.ctbEntry.suid;
    if (metaGetTableEntryByUid(&mr, *suid) != TSDB_CODE_SUCCESS) {
      goto _invalid_table;
    }
    sversion = mr.me.stbEntry.schemaRow.version;
  }

  metaReaderClear(&mr);
  *pSchema = metaGetTbTSchema(pVnode->pMeta, uid, sversion);
  return TSDB_CODE_SUCCESS;

_invalid_table:
  terrno = TSDB_CODE_TDB_INVALID_TABLE_ID;
  metaReaderClear(&mr);
  return terrno;
}
H
Hongze Cheng 已提交
3787 3788 3789 3790 3791 3792 3793 3794 3795 3796 3797 3798 3799 3800 3801 3802 3803 3804 3805 3806 3807 3808 3809 3810 3811 3812 3813 3814 3815 3816

/*
 * Take a read snapshot of pTsdb. Under the read lock, reference the current mem/imem tables and the file set.
 * On success *ppSnap owns those references and must be released with tsdbUntakeReadSnap().
 *
 * On failure *ppSnap is NULL and the memtable references taken here are dropped again. Earlier versions
 * left a half-built snapshot (and its memtable references) behind on the lock and FS-ref error paths.
 */
int32_t tsdbTakeReadSnap(STsdb* pTsdb, STsdbReadSnap** ppSnap) {
  int32_t code = 0;

  *ppSnap = NULL;

  // alloc
  STsdbReadSnap* pSnap = (STsdbReadSnap*)taosMemoryCalloc(1, sizeof(STsdbReadSnap));
  if (pSnap == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  // lock
  code = taosThreadRwlockRdlock(&pTsdb->rwLock);
  if (code) {
    taosMemoryFree(pSnap);
    return TAOS_SYSTEM_ERROR(code);
  }

  // take snapshot
  pSnap->pMem = pTsdb->mem;
  pSnap->pIMem = pTsdb->imem;

  if (pSnap->pMem) {
    tsdbRefMemTable(pSnap->pMem);
  }

  if (pSnap->pIMem) {
    tsdbRefMemTable(pSnap->pIMem);
  }

  // fs
  code = tsdbFSRef(pTsdb, &pSnap->fs);
  if (code) {
    taosThreadRwlockUnlock(&pTsdb->rwLock);

    // Drop the memtable references taken above.
    // NOTE(review): any references tsdbFSRef obtained before failing are not released here — confirm its
    // failure contract.
    if (pSnap->pMem) {
      tsdbUnrefMemTable(pSnap->pMem);
    }
    if (pSnap->pIMem) {
      tsdbUnrefMemTable(pSnap->pIMem);
    }
    taosMemoryFree(pSnap);
    return code;
  }

  // unlock
  code = taosThreadRwlockUnlock(&pTsdb->rwLock);
  if (code) {
    // the snapshot is complete; release it as a whole
    tsdbUntakeReadSnap(pTsdb, pSnap);
    return TAOS_SYSTEM_ERROR(code);
  }

  *ppSnap = pSnap;
  tsdbTrace("vgId:%d, take read snapshot", TD_VID(pTsdb->pVnode));
  return code;
}

/*
 * Release a snapshot obtained from tsdbTakeReadSnap(): drop the mem/imem references and the file-set
 * reference, then free the snapshot. A NULL snapshot is accepted. The trace line is emitted in both cases.
 */
void tsdbUntakeReadSnap(STsdb* pTsdb, STsdbReadSnap* pSnap) {
  if (pSnap != NULL) {
    if (pSnap->pMem != NULL) {
      tsdbUnrefMemTable(pSnap->pMem);
    }
    if (pSnap->pIMem != NULL) {
      tsdbUnrefMemTable(pSnap->pIMem);
    }

    tsdbFSUnref(pTsdb, &pSnap->fs);
    taosMemoryFree(pSnap);
  }

  tsdbTrace("vgId:%d, untake read snapshot", TD_VID(pTsdb->pVnode));
}