tsdbRead.c 138.0 KB
Newer Older
H
hjxilinx 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

H
Haojun Liao 已提交
16
#include "osDef.h"
H
Hongze Cheng 已提交
17
#include "tsdb.h"
18

H
Hongze Cheng 已提交
19
// True when the traverse order `o` is ascending. The argument is parenthesized so that compound
// expressions (e.g. a ternary) are compared as a whole against TSDB_ORDER_ASC.
#define ASCENDING_TRAVERSE(o) ((o) == TSDB_ORDER_ASC)
H
Hongze Cheng 已提交
20

21 22 23 24 25 26
// Tags for the external-row stages (previous / main / next).
// NOTE(review): the values are plain tags, not independent bit flags (NEXT == PREV | MAIN).
typedef enum {
  EXTERNAL_ROWS_PREV = 1,
  EXTERNAL_ROWS_MAIN = 2,
  EXTERNAL_ROWS_NEXT = 3,
} EContentData;

27
typedef struct {
dengyihao's avatar
dengyihao 已提交
28
  STbDataIter* iter;
29 30 31 32
  int32_t      index;
  bool         hasVal;
} SIterInfo;

33 34
// Counters filled in while loading the block info of one file set (see doLoadFileBlock).
typedef struct {
  // data blocks that passed the time-window and version-range filters
  int32_t numOfBlocks;
  // number of stt ("last") files in the file set
  int32_t numOfLastFiles;
} SBlockNumber;

38 39 40 41 42
// Locates one qualified data block of a table.
typedef struct SBlockIndex {
  // ordinal of the block inside the table's SMapData
  int32_t ordinalIndex;
  // file offset of the block's first sub-block
  int64_t inFileoffset;
} SBlockIndex;

H
Haojun Liao 已提交
43
typedef struct STableBlockScanInfo {
dengyihao's avatar
dengyihao 已提交
44 45
  uint64_t  uid;
  TSKEY     lastKey;
H
Hongze Cheng 已提交
46
  SMapData  mapData;            // block info (compressed)
47
  SArray*   pBlockList;         // block data index list, SArray<SBlockIndex>
H
Hongze Cheng 已提交
48 49 50 51 52 53
  SIterInfo iter;               // mem buffer skip list iterator
  SIterInfo iiter;              // imem buffer skip list iterator
  SArray*   delSkyline;         // delete info for this table
  int32_t   fileDelIndex;       // file block delete index
  int32_t   lastBlockDelIndex;  // delete index for last block
  bool      iterInit;           // whether to initialize the in-memory skip list iterator or not
H
Haojun Liao 已提交
54 55 56
} STableBlockScanInfo;

// Pairs a table uid with a block offset, used when ordering data blocks.
typedef struct SBlockOrderWrapper {
  int64_t uid;     // owning table
  int64_t offset;  // block offset
} SBlockOrderWrapper;
H
Hongze Cheng 已提交
60 61

typedef struct SBlockOrderSupporter {
62 63 64 65
  SBlockOrderWrapper** pDataBlockInfo;
  int32_t*             indexPerTable;
  int32_t*             numOfBlocksPerTable;
  int32_t              numOfTables;
H
Hongze Cheng 已提交
66 67 68
} SBlockOrderSupporter;

// Cost counters accumulated by a reader over its lifetime.
typedef struct SIOCostSummary {
  // data blocks plus stt files counted by doLoadFileBlock
  int64_t numOfBlocks;
  double  blockLoadTime;
  double  buildmemBlock;
  // number of data-file readers opened
  int64_t headFileLoad;
  // milliseconds spent loading block index / block info
  double  headFileLoadTime;
  int64_t smaDataLoad;
  double  smaLoadTime;
  int64_t lastBlockLoad;
  double  lastBlockLoadTime;
  int64_t composedBlocks;
  double  buildComposedBlockTime;
} SIOCostSummary;

typedef struct SBlockLoadSuppInfo {
83
  SArray*          pColAgg;
84
  SColumnDataAgg   tsColAgg;
C
Cary Xu 已提交
85
  SColumnDataAgg** plist;
86
  int16_t*         colIds;  // column ids for loading file block data
87
  int32_t          numOfCols;
88
  char**           buildBuf;  // build string tmp buffer, todo remove it later after all string format being updated.
H
Hongze Cheng 已提交
89 90
} SBlockLoadSuppInfo;

91
typedef struct SLastBlockReader {
H
Hongze Cheng 已提交
92 93 94 95 96
  STimeWindow        window;
  SVersionRange      verRange;
  int32_t            order;
  uint64_t           uid;
  SMergeTree         mergeTree;
97
  SSttBlockLoadInfo* pInfo;
98 99
} SLastBlockReader;

100
typedef struct SFilesetIter {
H
Hongze Cheng 已提交
101 102 103
  int32_t           numOfFiles;  // number of total files
  int32_t           index;       // current accessed index in the list
  SArray*           pFileList;   // data file list
104
  int32_t           order;
H
Hongze Cheng 已提交
105
  SLastBlockReader* pLastBlockReader;  // last file block reader
106
} SFilesetIter;
H
Haojun Liao 已提交
107 108

// Identifies one data block of one table within the block iterator.
typedef struct SFileDataBlockInfo {
  // uid of the owning table
  uint64_t uid;
  // index position in STableBlockScanInfo, used to check whether a neighbor block overlaps with it
  int32_t tbBlockIdx;
} SFileDataBlockInfo;

typedef struct SDataBlockIter {
115
  int32_t   numOfBlocks;
116
  int32_t   index;
H
Hongze Cheng 已提交
117
  SArray*   blockList;  // SArray<SFileDataBlockInfo>
118
  int32_t   order;
119
  SDataBlk  block;  // current SDataBlk data
120
  SHashObj* pTableMap;
H
Haojun Liao 已提交
121 122 123
} SDataBlockIter;

// Progress of copying rows out of the current file block.
typedef struct SFileBlockDumpInfo {
  int32_t totalRows;
  int32_t rowIndex;
  // one step past the block's max key once fully dumped (see setBlockAllDumped)
  int64_t lastKey;
  bool allDumped;
} SFileBlockDumpInfo;

130
// Cursor for visiting tables in ascending uid order.
typedef struct SUidOrderCheckInfo {
  // table uids sorted in ascending order
  uint64_t* tableUidList;
  // current position in tableUidList
  int32_t currentIndex;
} SUidOrderCheckInfo;

H
Haojun Liao 已提交
135
typedef struct SReaderStatus {
136
  bool                 loadFromFile;       // check file stage
137
  bool                 composedDataBlock;  // the returned data block is a composed block or not
138 139
  SHashObj*            pTableMap;          // SHash<STableBlockScanInfo>
  STableBlockScanInfo* pTableIter;         // table iterator used in building in-memory buffer data blocks.
140
  SUidOrderCheckInfo   uidCheckInfo;       // check all table in uid order
141
  SFileBlockDumpInfo   fBlockDumpInfo;
142 143 144 145
  SDFileSet*           pCurrentFileset;  // current opened file set
  SBlockData           fileBlockData;
  SFilesetIter         fileIter;
  SDataBlockIter       blockIter;
H
Haojun Liao 已提交
146 147
} SReaderStatus;

H
Hongze Cheng 已提交
148
struct STsdbReader {
H
Haojun Liao 已提交
149 150 151 152 153 154 155
  STsdb*             pTsdb;
  uint64_t           suid;
  int16_t            order;
  STimeWindow        window;  // the primary query time window that applies to all queries
  SSDataBlock*       pResBlock;
  int32_t            capacity;
  SReaderStatus      status;
156 157
  char*              idStr;  // query info handle, for debug purpose
  int32_t            type;   // query type: 1. retrieve all data blocks, 2. retrieve direct prev|next rows
H
Hongze Cheng 已提交
158
  SBlockLoadSuppInfo suppInfo;
H
Hongze Cheng 已提交
159
  STsdbReadSnap*     pReadSnap;
160
  SIOCostSummary     cost;
161 162
  STSchema*          pSchema;     // the newest version schema
  STSchema*          pMemSchema;  // the previous schema for in-memory data, to avoid load schema too many times
163 164
  SDataFReader*      pFileReader;
  SVersionRange      verRange;
165

166 167
  int32_t      step;
  STsdbReader* innerReader[2];
H
Hongze Cheng 已提交
168
};
H
Hongze Cheng 已提交
169

H
Haojun Liao 已提交
170
static SFileDataBlockInfo* getCurrentBlockInfo(SDataBlockIter* pBlockIter);
171 172
static int      buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, int64_t endKey, int32_t capacity,
                                          STsdbReader* pReader);
173
static TSDBROW* getValidMemRow(SIterInfo* pIter, const SArray* pDelList, STsdbReader* pReader);
174 175
static int32_t  doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pScanInfo, STsdbReader* pReader,
                                        SRowMerger* pMerger);
H
Hongze Cheng 已提交
176
static int32_t  doMergeRowsInLastBlock(SLastBlockReader* pLastBlockReader, STableBlockScanInfo* pScanInfo, int64_t ts,
177
                                       SRowMerger* pMerger, SVersionRange* pVerRange);
178
static int32_t  doMergeRowsInBuf(SIterInfo* pIter, uint64_t uid, int64_t ts, SArray* pDelList, SRowMerger* pMerger,
dengyihao's avatar
dengyihao 已提交
179
                                 STsdbReader* pReader);
H
Haojun Liao 已提交
180
static int32_t  doAppendRowFromTSRow(SSDataBlock* pBlock, STsdbReader* pReader, STSRow* pTSRow, uint64_t uid);
181
static int32_t  doAppendRowFromFileBlock(SSDataBlock* pResBlock, STsdbReader* pReader, SBlockData* pBlockData,
H
Hongze Cheng 已提交
182
                                         int32_t rowIndex);
183
static void     setComposedBlockFlag(STsdbReader* pReader, bool composed);
184
static bool hasBeenDropped(const SArray* pDelList, int32_t* index, TSDBKEY* pKey, int32_t order, SVersionRange* pRange);
185

H
Hongze Cheng 已提交
186 187 188 189
static int32_t doMergeMemTableMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter, SArray* pDelList,
                                        STSRow** pTSRow, STsdbReader* pReader, bool* freeTSRow);
static int32_t doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo* pBlockScanInfo,
                                  STsdbReader* pReader, STSRow** pTSRow);
190 191
static int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pBlockScanInfo, int64_t key,
                                     STsdbReader* pReader);
192

dengyihao's avatar
dengyihao 已提交
193 194 195 196
static int32_t initDelSkylineIterator(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader, STbData* pMemTbData,
                                      STbData* piMemTbData);
static STsdb*  getTsdbByRetentions(SVnode* pVnode, TSKEY winSKey, SRetention* retentions, const char* idstr,
                                   int8_t* pLevel);
197
static SVersionRange getQueryVerRange(SVnode* pVnode, SQueryTableDataCond* pCond, int8_t level);
H
Hongze Cheng 已提交
198 199 200
static int64_t       getCurrentKeyInLastBlock(SLastBlockReader* pLastBlockReader);
static bool          hasDataInLastBlock(SLastBlockReader* pLastBlockReader);
static int32_t       doBuildDataBlock(STsdbReader* pReader);
H
Haojun Liao 已提交
201
static TSDBKEY       getCurrentKeyInBuf(STableBlockScanInfo* pScanInfo, STsdbReader* pReader);
202 203
static bool          hasDataInFileBlock(const SBlockData* pBlockData, const SFileBlockDumpInfo* pDumpInfo);
static bool          hasDataInLastBlock(SLastBlockReader* pLastBlockReader);
H
Haojun Liao 已提交
204

205 206
// Returns true when ts lies outside the closed interval [pWindow->skey, pWindow->ekey].
static bool outOfTimeWindow(int64_t ts, STimeWindow* pWindow) {
  bool insideWindow = (ts >= pWindow->skey) && (ts <= pWindow->ekey);
  return !insideWindow;
}

207 208 209
// Fills pReader->suppInfo from the columns of pBlock:
//   - colIds[i]   receives the column id of the i-th column of pBlock;
//   - buildBuf[i] receives a pCol->info.bytes scratch buffer for var-length columns (NULL otherwise).
//
// Returns TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY if an allocation fails.
// If the two arrays themselves cannot be allocated, both are freed AND reset to NULL (with
// numOfCols = 0) so that no dangling pointer is left in suppInfo for a later cleanup to free again.
// If a per-column buffer fails, the buffers allocated so far remain owned by suppInfo.
static int32_t setColumnIdSlotList(STsdbReader* pReader, SSDataBlock* pBlock) {
  SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo;

  size_t numOfCols = blockDataGetNumOfCols(pBlock);

  pSupInfo->numOfCols = numOfCols;
  pSupInfo->colIds = taosMemoryMalloc(numOfCols * sizeof(int16_t));
  pSupInfo->buildBuf = taosMemoryCalloc(numOfCols, POINTER_BYTES);
  if (pSupInfo->buildBuf == NULL || pSupInfo->colIds == NULL) {
    taosMemoryFree(pSupInfo->colIds);
    taosMemoryFree(pSupInfo->buildBuf);
    pSupInfo->colIds = NULL;
    pSupInfo->buildBuf = NULL;
    pSupInfo->numOfCols = 0;
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  for (int32_t i = 0; i < numOfCols; ++i) {
    SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, i);
    pSupInfo->colIds[i] = pCol->info.colId;

    if (IS_VAR_DATA_TYPE(pCol->info.type)) {
      pSupInfo->buildBuf[i] = taosMemoryMalloc(pCol->info.bytes);
      if (pSupInfo->buildBuf[i] == NULL) {
        // a NULL scratch buffer would later be written through when copying string values
        return TSDB_CODE_OUT_OF_MEMORY;
      }
    }
  }

  return TSDB_CODE_SUCCESS;
}
H
Hongze Cheng 已提交
232

233
// Creates the per-table scan-info map (keyed by uid) for the numOfTables tables in idList.
// Each entry's lastKey starts one tick outside the query window in the traverse direction
// (skey - 1 for ascending, ekey + 1 for descending), saturating at INT64_MIN / INT64_MAX.
// Returns NULL if the hash map cannot be created.
static SHashObj* createDataBlockScanInfo(STsdbReader* pTsdbReader, const STableKeyInfo* idList, int32_t numOfTables) {
  // allocate buffer in order to load data blocks from file
  // todo use simple hash instead, optimize the memory consumption
  SHashObj* pTableMap =
      taosHashInit(numOfTables, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
  if (pTableMap == NULL) {
    return NULL;
  }

  for (int32_t j = 0; j < numOfTables; ++j) {
    STableBlockScanInfo info = {.lastKey = 0, .uid = idList[j].uid};
    if (ASCENDING_TRAVERSE(pTsdbReader->order)) {
      int64_t skey = pTsdbReader->window.skey;
      info.lastKey = (skey > INT64_MIN) ? (skey - 1) : skey;
    } else {
      int64_t ekey = pTsdbReader->window.ekey;
      info.lastKey = (ekey < INT64_MAX) ? (ekey + 1) : ekey;
    }

    taosHashPut(pTableMap, &info.uid, sizeof(uint64_t), &info, sizeof(info));
    // uid is a uint64_t: print it with PRIu64 so large uids are not shown as negative numbers
    tsdbDebug("%p check table uid:%" PRIu64 " from lastKey:%" PRId64 " %s", pTsdbReader, info.uid, info.lastKey,
              pTsdbReader->idStr);
  }

  tsdbDebug("%p create %d tables scan-info, size:%.2f Kb, %s", pTsdbReader, numOfTables,
            (sizeof(STableBlockScanInfo) * numOfTables) / 1024.0, pTsdbReader->idStr);

  return pTableMap;
}
H
Hongze Cheng 已提交
262

H
Haojun Liao 已提交
263
// Resets the scan state of every table in pTableMap so a new scan can start at key ts:
// both in-memory iterators (mem and imem) and the delete skyline are released and the
// iterator flags cleared, mirroring what clearBlockScanInfo does for the iterators.
static void resetDataBlockScanInfo(SHashObj* pTableMap, int64_t ts) {
  STableBlockScanInfo* p = NULL;

  while ((p = taosHashIterate(pTableMap, p)) != NULL) {
    p->iterInit = false;
    p->iter.hasVal = false;
    p->iiter.hasVal = false;

    if (p->iter.iter != NULL) {
      p->iter.iter = tsdbTbDataIterDestroy(p->iter.iter);
    }

    // the imem iterator must be released too; otherwise a stale iterator is left behind
    // (and presumably leaked once the iterators are re-created for the next scan)
    if (p->iiter.iter != NULL) {
      p->iiter.iter = tsdbTbDataIterDestroy(p->iiter.iter);
    }

    p->delSkyline = taosArrayDestroy(p->delSkyline);
    p->lastKey = ts;
  }
}

278 279 280
// Releases everything a table scan info owns: the mem/imem iterators, the delete skyline,
// the qualified block list and the block map data. All iterator flags are cleared as well
// (iter.hasVal included, so both iterators are reset symmetrically).
static void clearBlockScanInfo(STableBlockScanInfo* p) {
  p->iterInit = false;
  p->iter.hasVal = false;
  p->iiter.hasVal = false;

  if (p->iter.iter != NULL) {
    p->iter.iter = tsdbTbDataIterDestroy(p->iter.iter);
  }

  if (p->iiter.iter != NULL) {
    p->iiter.iter = tsdbTbDataIterDestroy(p->iiter.iter);
  }

  p->delSkyline = taosArrayDestroy(p->delSkyline);
  p->pBlockList = taosArrayDestroy(p->pBlockList);
  tMapDataClear(&p->mapData);
}
294

295 296 297 298
// Clears every per-table scan info stored in pTableMap, then frees the map itself.
static void destroyBlockScanInfo(SHashObj* pTableMap) {
  for (STableBlockScanInfo* pInfo = taosHashIterate(pTableMap, NULL); pInfo != NULL;
       pInfo = taosHashIterate(pTableMap, pInfo)) {
    clearBlockScanInfo(pInfo);
  }

  taosHashCleanup(pTableMap);
}

304
// A query window is empty when its start key lies after its end key.
static bool isEmptyQueryTimeWindow(STimeWindow* pWindow) {
  ASSERT(pWindow != NULL);

  bool empty = (pWindow->ekey < pWindow->skey);
  return empty;
}
H
Hongze Cheng 已提交
308

309 310 311
// Adjusts the query time window according to the data time-to-live (keep2) setting, so that
// expired data is never returned to the client, even if it can still be read.
// Only the start key is changed: it is raised to the earliest retained timestamp when needed.
static STimeWindow updateQueryTimeWindow(STsdb* pTsdb, STimeWindow* pWindow) {
  STsdbKeepCfg* pKeepCfg = &pTsdb->keepCfg;

  int64_t nowTs = taosGetTimestamp(pKeepCfg->precision);
  // one tick is added to the computed boundary (per the original note: "needs to add one tick")
  int64_t earliestTs = nowTs - (tsTickPerMin[pKeepCfg->precision] * pKeepCfg->keep2) + 1;

  STimeWindow adjusted = *pWindow;
  adjusted.skey = (adjusted.skey < earliestTs) ? earliestTs : adjusted.skey;
  return adjusted;
}
H
Hongze Cheng 已提交
324

H
Haojun Liao 已提交
325
// Shrinks *capacity so that one output SSDataBlock (capacity rows of all queried columns) stays
// within 2MB.
//
// The row length and the capacity * rowLen product are computed in 64 bits so wide rows cannot
// overflow int32. The capacity is never reduced below 1, so a row wider than 2MB still yields a
// usable (single-row) block instead of a zero-capacity one.
static void limitOutputBufferSize(const SQueryTableDataCond* pCond, int32_t* capacity) {
  int64_t rowLen = 0;
  for (int32_t i = 0; i < pCond->numOfCols; ++i) {
    rowLen += pCond->colList[i].bytes;
  }

  // make sure the output SSDataBlock size be less than 2MB.
  const int64_t TWOMB = 2 * 1024 * 1024;
  if ((int64_t)(*capacity) * rowLen > TWOMB) {
    // rowLen > 0 here, since the product exceeded a positive bound
    int64_t limited = TWOMB / rowLen;
    (*capacity) = (limited > 0) ? (int32_t)limited : 1;
  }
}

// init file iterator
339
static int32_t initFilesetIterator(SFilesetIter* pIter, SArray* aDFileSet, STsdbReader* pReader) {
H
Hongze Cheng 已提交
340
  size_t numOfFileset = taosArrayGetSize(aDFileSet);
341

342 343
  pIter->index = ASCENDING_TRAVERSE(pReader->order) ? -1 : numOfFileset;
  pIter->order = pReader->order;
H
Hongze Cheng 已提交
344
  pIter->pFileList = aDFileSet;
345
  pIter->numOfFiles = numOfFileset;
H
Haojun Liao 已提交
346

347 348 349 350
  if (pIter->pLastBlockReader == NULL) {
    pIter->pLastBlockReader = taosMemoryCalloc(1, sizeof(struct SLastBlockReader));
    if (pIter->pLastBlockReader == NULL) {
      int32_t code = TSDB_CODE_OUT_OF_MEMORY;
351
      tsdbError("failed to prepare the last block iterator, since:%s %s", tstrerror(code), pReader->idStr);
352 353
      return code;
    }
354 355
  }

356 357 358 359 360 361 362 363
  SLastBlockReader* pLReader = pIter->pLastBlockReader;
  pLReader->order = pReader->order;
  pLReader->window = pReader->window;
  pLReader->verRange = pReader->verRange;

  pLReader->uid = 0;
  tMergeTreeClose(&pLReader->mergeTree);

364
  if (pLReader->pInfo == NULL) {
365
    // here we ignore the first column, which is always be the primary timestamp column
366 367
    pLReader->pInfo =
        tCreateLastBlockLoadInfo(pReader->pSchema, &pReader->suppInfo.colIds[1], pReader->suppInfo.numOfCols - 1);
H
Haojun Liao 已提交
368 369 370 371
    if (pLReader->pInfo == NULL) {
      tsdbDebug("init fileset iterator failed, code:%s %s", tstrerror(terrno), pReader->idStr);
      return terrno;
    }
372 373
  }

374
  tsdbDebug("init fileset iterator, total files:%d %s", pIter->numOfFiles, pReader->idStr);
H
Haojun Liao 已提交
375 376 377
  return TSDB_CODE_SUCCESS;
}

378
// Moves pIter to the next file set (in pIter->order) whose key range overlaps the reader's query
// window, and opens a data-file reader for it in pReader->pFileReader.
//
// Returns true when such a file set is found. Returns false when the file list is exhausted,
// when the remaining files lie entirely beyond the query window, or when opening a data file
// fails. An open failure is now logged instead of being silently reported as "no more files".
static bool filesetIteratorNext(SFilesetIter* pIter, STsdbReader* pReader) {
  bool    asc = ASCENDING_TRAVERSE(pIter->order);
  int32_t step = asc ? 1 : -1;
  pIter->index += step;

  if ((asc && pIter->index >= pIter->numOfFiles) || ((!asc) && pIter->index < 0)) {
    return false;
  }

  // collect last-block load statistics, then reset the last-block reader for the new file set
  SIOCostSummary* pSum = &pReader->cost;
  getLastBlockLoadInfo(pIter->pLastBlockReader->pInfo, &pSum->lastBlockLoad, &pSum->lastBlockLoadTime);

  pIter->pLastBlockReader->uid = 0;
  tMergeTreeClose(&pIter->pLastBlockReader->mergeTree);
  resetLastBlockLoadInfo(pIter->pLastBlockReader->pInfo);

  // key range covered by the candidate file
  STimeWindow win = {0};

  while (1) {
    if (pReader->pFileReader != NULL) {
      tsdbDataFReaderClose(&pReader->pFileReader);
    }

    pReader->status.pCurrentFileset = (SDFileSet*)taosArrayGet(pIter->pFileList, pIter->index);

    int32_t code = tsdbDataFReaderOpen(&pReader->pFileReader, pReader->pTsdb, pReader->status.pCurrentFileset);
    if (code != TSDB_CODE_SUCCESS) {
      tsdbError("%p failed to open data file reader, since:%s %s", pReader, tstrerror(code), pReader->idStr);
      return false;
    }

    pReader->cost.headFileLoad += 1;

    int32_t fid = pReader->status.pCurrentFileset->fid;
    tsdbFidKeyRange(fid, pReader->pTsdb->keepCfg.days, pReader->pTsdb->keepCfg.precision, &win.skey, &win.ekey);

    // current file are no longer overlapped with query time window, ignore remain files
    if ((asc && win.skey > pReader->window.ekey) || (!asc && win.ekey < pReader->window.skey)) {
      tsdbDebug("%p remain files are not qualified for qrange:%" PRId64 "-%" PRId64 ", ignore, %s", pReader,
                pReader->window.skey, pReader->window.ekey, pReader->idStr);
      return false;
    }

    // this file lies before the query window in the traverse direction: try the next one
    if ((asc && (win.ekey < pReader->window.skey)) || ((!asc) && (win.skey > pReader->window.ekey))) {
      pIter->index += step;
      if ((asc && pIter->index >= pIter->numOfFiles) || ((!asc) && pIter->index < 0)) {
        return false;
      }
      continue;
    }

    tsdbDebug("%p file found fid:%d for qrange:%" PRId64 "-%" PRId64 ", %s", pReader, fid, pReader->window.skey,
              pReader->window.ekey, pReader->idStr);
    return true;
  }
}

438
// Rewinds the block iterator for a new scan in the given order. An existing block list is
// emptied and reused; otherwise a new one is created.
static void resetDataBlockIterator(SDataBlockIter* pIter, int32_t order) {
  pIter->order = order;
  pIter->index = -1;
  pIter->numOfBlocks = 0;

  if (pIter->blockList != NULL) {
    taosArrayClear(pIter->blockList);
    return;
  }

  pIter->blockList = taosArrayInit(4, sizeof(SFileDataBlockInfo));
}

L
Liu Jicong 已提交
449
// Frees the iterator's block list and clears the pointer (taosArrayDestroy returns NULL), so that
// a later resetDataBlockIterator or a second cleanup does not touch freed memory.
static void cleanupDataBlockIterator(SDataBlockIter* pIter) { pIter->blockList = taosArrayDestroy(pIter->blockList); }
H
Haojun Liao 已提交
450

H
Haojun Liao 已提交
451
// Puts a new reader status into its starting stage: the file-checking stage (loadFromFile),
// with no in-memory table iterator selected.
static void initReaderStatus(SReaderStatus* pStatus) {
  pStatus->loadFromFile = true;
  pStatus->pTableIter = NULL;
}

456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478
// Creates the output data block with one column per entry of pCond->colList and room for
// `capacity` rows. Returns NULL and sets terrno on failure.
static SSDataBlock* createResBlock(SQueryTableDataCond* pCond, int32_t capacity) {
  SSDataBlock* pResBlock = createDataBlock();
  if (pResBlock == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  for (int32_t i = 0; i < pCond->numOfCols; ++i) {
    SColumnInfoData colInfo = {{0}, 0};
    colInfo.info = pCond->colList[i];
    blockDataAppendColInfo(pResBlock, &colInfo);
  }

  int32_t code = blockDataEnsureCapacity(pResBlock, capacity);
  if (code != TSDB_CODE_SUCCESS) {
    terrno = code;
    // release the column array and any column buffers too, not only the block shell
    blockDataDestroy(pResBlock);
    return NULL;
  }

  return pResBlock;
}

479 480
// Allocates a reader for the query condition pCond on pVnode. It resolves the target tsdb (by
// retention level), the version range and the TTL-adjusted time window, and prepares the output
// block and the column-loading support buffers.
//
// On success *ppReader receives the new reader. On failure the partially built reader is
// released through tsdbReaderClose, *ppReader is set to NULL and the error code is returned.
static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, STsdbReader** ppReader, int32_t capacity,
                                const char* idstr) {
  int8_t       retentionLevel = 0;
  int32_t      code = 0;
  STsdbReader* pNew = (STsdbReader*)taosMemoryCalloc(1, sizeof(*pNew));
  if (pNew == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _end;
  }

  if (VND_IS_TSMA(pVnode)) {
    tsdbDebug("vgId:%d, tsma is selected to query", TD_VID(pVnode));
  }

  initReaderStatus(&pNew->status);

  // resolve the tsdb instance first: the version range (via retentionLevel) and the adjusted
  // window (via pTsdb) are derived from it
  pNew->pTsdb =
      getTsdbByRetentions(pVnode, pCond->twindows.skey, pVnode->config.tsdbCfg.retentions, idstr, &retentionLevel);
  pNew->verRange = getQueryVerRange(pVnode, pCond, retentionLevel);
  pNew->window = updateQueryTimeWindow(pNew->pTsdb, &pCond->twindows);

  pNew->suid = pCond->suid;
  pNew->order = pCond->order;
  pNew->type = pCond->type;
  pNew->idStr = (idstr != NULL) ? strdup(idstr) : NULL;

  ASSERT(pCond->numOfCols > 0);

  pNew->capacity = capacity;
  limitOutputBufferSize(pCond, &pNew->capacity);

  // buffers used when loading data blocks from file
  SBlockLoadSuppInfo* pSup = &pNew->suppInfo;
  pSup->pColAgg = taosArrayInit(4, sizeof(SColumnDataAgg));
  pSup->plist = taosMemoryCalloc(pCond->numOfCols, POINTER_BYTES);
  if (pSup->pColAgg == NULL || pSup->plist == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _end;
  }
  pSup->tsColAgg.colId = PRIMARYKEY_TIMESTAMP_COL_ID;

  code = tBlockDataCreate(&pNew->status.fileBlockData);
  if (code != TSDB_CODE_SUCCESS) {
    terrno = code;
    goto _end;
  }

  pNew->pResBlock = createResBlock(pCond, pNew->capacity);
  if (pNew->pResBlock == NULL) {
    code = terrno;
    goto _end;
  }

  // NOTE(review): the return code of setColumnIdSlotList is ignored, so an allocation failure
  // inside it is not reported to the caller; checking it safely depends on how tsdbReaderClose
  // releases suppInfo — confirm before changing.
  setColumnIdSlotList(pNew, pNew->pResBlock);

  *ppReader = pNew;
  return code;

_end:
  tsdbReaderClose(pNew);
  *ppReader = NULL;
  return code;
}
H
Hongze Cheng 已提交
541

H
Haojun Liao 已提交
542
// Reads the block index (SBlockIdx list) of the opened data file and appends to pIndexList every
// entry that belongs to the reader's super table (suid) and to a queried table. Each qualifying
// table also gets an (empty) pBlockList if it does not have one yet.
//
// Returns TSDB_CODE_SUCCESS, the error from tsdbReadBlockIdx, or TSDB_CODE_OUT_OF_MEMORY when
// any of the arrays cannot be allocated or grown. Previously these allocation results were unchecked.
static int32_t doLoadBlockIndex(STsdbReader* pReader, SDataFReader* pFileReader, SArray* pIndexList) {
  SArray* aBlockIdx = taosArrayInit(8, sizeof(SBlockIdx));
  if (aBlockIdx == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  int64_t st = taosGetTimestampUs();
  int32_t code = tsdbReadBlockIdx(pFileReader, aBlockIdx);
  if (code != TSDB_CODE_SUCCESS) {
    goto _end;
  }

  size_t num = taosArrayGetSize(aBlockIdx);
  if (num == 0) {
    taosArrayDestroy(aBlockIdx);
    return TSDB_CODE_SUCCESS;
  }

  int64_t et1 = taosGetTimestampUs();

  for (int32_t i = 0; i < num; ++i) {
    SBlockIdx* pBlockIdx = (SBlockIdx*)taosArrayGet(aBlockIdx, i);

    // the table belongs to another super table
    if (pBlockIdx->suid != pReader->suid) {
      continue;
    }

    // this block belongs to a table that is not queried.
    STableBlockScanInfo* pScanInfo = taosHashGet(pReader->status.pTableMap, &pBlockIdx->uid, sizeof(uint64_t));
    if (pScanInfo == NULL) {
      continue;
    }

    if (pScanInfo->pBlockList == NULL) {
      pScanInfo->pBlockList = taosArrayInit(4, sizeof(SBlockIndex));
      if (pScanInfo->pBlockList == NULL) {
        code = TSDB_CODE_OUT_OF_MEMORY;
        goto _end;
      }
    }

    if (taosArrayPush(pIndexList, pBlockIdx) == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto _end;
    }
  }

  int64_t et2 = taosGetTimestampUs();
  tsdbDebug("load block index for %d tables completed, elapsed time:%.2f ms, set blockIdx:%.2f ms, size:%.2f Kb %s",
            (int32_t)num, (et1 - st) / 1000.0, (et2 - et1) / 1000.0, num * sizeof(SBlockIdx) / 1024.0, pReader->idStr);

  pReader->cost.headFileLoadTime += (et1 - st) / 1000.0;

_end:
  taosArrayDestroy(aBlockIdx);
  return code;
}
H
Hongze Cheng 已提交
592

593
// Drops the per-file block information (block map data and qualified block list) of every
// table, so the block info of a new file set can be loaded (called at the start of doLoadFileBlock).
static void cleanupTableScanInfo(SHashObj* pTableMap) {
  STableBlockScanInfo* pInfo = NULL;

  while ((pInfo = taosHashIterate(pTableMap, pInfo)) != NULL) {
    tMapDataClear(&pInfo->mapData);
    taosArrayClear(pInfo->pBlockList);
  }
}

607
// Loads the data-block info of every table listed in pIndexList from the opened data file, and
// records in each table's pBlockList the blocks that overlap the query window and version range.
// pBlockNum->numOfBlocks is incremented per qualified block and numOfLastFiles is set to the number
// of stt files.
//
// Returns TSDB_CODE_SUCCESS, the error from tsdbReadDataBlk (previously ignored), or
// TSDB_CODE_OUT_OF_MEMORY when a block index cannot be appended.
static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, SBlockNumber* pBlockNum) {
  int32_t numOfQTable = 0;
  size_t  sizeInDisk = 0;
  size_t  numOfTables = taosArrayGetSize(pIndexList);

  int64_t st = taosGetTimestampUs();
  cleanupTableScanInfo(pReader->status.pTableMap);

  for (int32_t i = 0; i < numOfTables; ++i) {
    SBlockIdx* pBlockIdx = taosArrayGet(pIndexList, i);

    // pIndexList only holds entries whose uid exists in the table map (see doLoadBlockIndex)
    STableBlockScanInfo* pScanInfo = taosHashGet(pReader->status.pTableMap, &pBlockIdx->uid, sizeof(int64_t));

    tMapDataReset(&pScanInfo->mapData);
    int32_t code = tsdbReadDataBlk(pReader->pFileReader, pBlockIdx, &pScanInfo->mapData);
    if (code != TSDB_CODE_SUCCESS) {
      tsdbError("%p failed to load data block info, since:%s %s", pReader, tstrerror(code), pReader->idStr);
      return code;
    }

    sizeInDisk += pScanInfo->mapData.nData;
    for (int32_t j = 0; j < pScanInfo->mapData.nItem; ++j) {
      SDataBlk block = {0};
      tMapDataGetItemByIdx(&pScanInfo->mapData, j, &block, tGetDataBlk);

      // 1. time range check
      if (block.minKey.ts > pReader->window.ekey || block.maxKey.ts < pReader->window.skey) {
        continue;
      }

      // 2. version range check
      if (block.minVer > pReader->verRange.maxVer || block.maxVer < pReader->verRange.minVer) {
        continue;
      }

      SBlockIndex bIndex = {.ordinalIndex = j, .inFileoffset = block.aSubBlock->offset};
      void*       p = taosArrayPush(pScanInfo->pBlockList, &bIndex);
      if (p == NULL) {
        tMapDataClear(&pScanInfo->mapData);
        return TSDB_CODE_OUT_OF_MEMORY;
      }

      pBlockNum->numOfBlocks += 1;
    }

    if (pScanInfo->pBlockList != NULL && taosArrayGetSize(pScanInfo->pBlockList) > 0) {
      numOfQTable += 1;
    }
  }

  pBlockNum->numOfLastFiles = pReader->pFileReader->pSet->nSttF;
  int32_t total = pBlockNum->numOfLastFiles + pBlockNum->numOfBlocks;

  double el = (taosGetTimestampUs() - st) / 1000.0;
  // size_t is printed through an int32_t cast (as in doLoadBlockIndex): "%ld" is wrong where long is 32-bit
  tsdbDebug(
      "load block of %d tables completed, blocks:%d in %d tables, last-files:%d, block-info-size:%.2f Kb, elapsed "
      "time:%.2f ms %s",
      (int32_t)numOfTables, pBlockNum->numOfBlocks, numOfQTable, pBlockNum->numOfLastFiles, sizeInDisk / 1024.0, el,
      pReader->idStr);

  pReader->cost.numOfBlocks += total;
  pReader->cost.headFileLoadTime += el;

  return TSDB_CODE_SUCCESS;
}
H
Hongze Cheng 已提交
668

669
// Marks the current file block as fully dumped. lastKey is set one step past maxKey in the
// traverse direction: maxKey + 1 for ascending order, maxKey - 1 for descending order.
static void setBlockAllDumped(SFileBlockDumpInfo* pDumpInfo, int64_t maxKey, int32_t order) {
  pDumpInfo->allDumped = true;
  pDumpInfo->lastKey = ASCENDING_TRAVERSE(order) ? (maxKey + 1) : (maxKey - 1);
}

675 676
// Appends pColVal as row rowIndex of pColInfoData.
// Var-length values are first assembled (length header + payload) in pSup->buildBuf[colIndex];
// NULL / non-value entries are appended as NULL.
static void doCopyColVal(SColumnInfoData* pColInfoData, int32_t rowIndex, int32_t colIndex, SColVal* pColVal,
                         SBlockLoadSuppInfo* pSup) {
  if (IS_VAR_DATA_TYPE(pColVal->type)) {
    if (!COL_VAL_IS_VALUE(pColVal)) {
      colDataAppendNULL(pColInfoData, rowIndex);
    } else {
      // buildBuf[colIndex] is info.bytes long (assuming it was sized from this column, see
      // setColumnIdSlotList) and receives the length header followed by the payload, so the
      // header must be accounted for in the bound.
      ASSERT(pColVal->value.nData + VARSTR_HEADER_SIZE <= pColInfoData->info.bytes);
      varDataSetLen(pSup->buildBuf[colIndex], pColVal->value.nData);
      memcpy(varDataVal(pSup->buildBuf[colIndex]), pColVal->value.pData, pColVal->value.nData);
      colDataAppend(pColInfoData, rowIndex, pSup->buildBuf[colIndex], false);
    }
  } else {
    colDataAppend(pColInfoData, rowIndex, (const char*)&pColVal->value, !COL_VAL_IS_VALUE(pColVal));
  }
}

691
// Returns the block info at the iterator's current index, or NULL when the block list is empty.
static SFileDataBlockInfo* getCurrentBlockInfo(SDataBlockIter* pBlockIter) {
  size_t numOfBlocks = taosArrayGetSize(pBlockIter->blockList);
  if (numOfBlocks > 0) {
    return taosArrayGet(pBlockIter->blockList, pBlockIter->index);
  }

  // an empty list must agree with the cached block count
  ASSERT(pBlockIter->numOfBlocks == numOfBlocks);
  return NULL;
}

H
Hongze Cheng 已提交
701
/* Return the block descriptor cached inside the iterator for its current position. */
static SDataBlk* getCurrentBlock(SDataBlockIter* pBlockIter) {
  SDataBlk* pCurrent = &pBlockIter->block;
  return pCurrent;
}
702

H
Haojun Liao 已提交
703 704 705 706 707 708 709 710 711 712 713 714 715 716 717 718 719 720 721 722 723 724 725 726 727 728 729 730 731 732 733 734 735 736 737 738 739 740 741 742 743 744 745 746 747 748 749 750 751 752 753 754 755 756 757 758 759 760 761 762 763 764 765 766 767 768 769 770 771 772 773 774 775
/**
 * Binary search for a timestamp in a key column.
 *
 * @param pValue raw buffer, reinterpreted as an array of TSKEY.
 * @param num    number of keys in the buffer. keyList[0] and keyList[num - 1] are read before any bound check,
 *               so num must be > 0 (not verified here).
 * @param key    the timestamp to look for.
 * @param order  TSDB_ORDER_ASC or TSDB_ORDER_DESC (asserted).
 * @return ASC:  the index of a key equal to `key`, otherwise the first index whose key is greater than `key`;
 *         DESC: the index of a key equal to `key`, otherwise the first index whose key is smaller than `key`;
 *         -1 when no such index exists.
 * NOTE(review): the comparisons appear to assume keyList is sorted ascending for ASC and descending for DESC —
 * TODO confirm against the callers.
 */
int32_t binarySearchForTs(char* pValue, int num, TSKEY key, int order) {
  int32_t midPos = -1;
  int32_t numOfRows;

  ASSERT(order == TSDB_ORDER_ASC || order == TSDB_ORDER_DESC);

  TSKEY*  keyList = (TSKEY*)pValue;
  int32_t firstPos = 0;
  int32_t lastPos = num - 1;

  if (order == TSDB_ORDER_DESC) {
    // find the first position whose key is not greater than `key`
    while (1) {
      if (key >= keyList[firstPos]) return firstPos;
      if (key == keyList[lastPos]) return lastPos;

      // `key` lies beyond keyList[lastPos]: the answer is the next slot, or none if lastPos is the final element
      if (key < keyList[lastPos]) {
        lastPos += 1;
        if (lastPos >= num) {
          return -1;
        } else {
          return lastPos;
        }
      }

      numOfRows = lastPos - firstPos + 1;
      midPos = (numOfRows >> 1) + firstPos;

      if (key < keyList[midPos]) {
        firstPos = midPos + 1;
      } else if (key > keyList[midPos]) {
        lastPos = midPos - 1;
      } else {
        break;  // exact match at midPos
      }
    }

  } else {
    // find the first position whose key is not smaller than `key`
    while (1) {
      if (key <= keyList[firstPos]) return firstPos;
      if (key == keyList[lastPos]) return lastPos;

      // `key` lies beyond keyList[lastPos]: the answer is the next slot, or none if lastPos is the final element
      if (key > keyList[lastPos]) {
        lastPos = lastPos + 1;
        if (lastPos >= num)
          return -1;
        else
          return lastPos;
      }

      numOfRows = lastPos - firstPos + 1;
      midPos = (numOfRows >> 1u) + firstPos;

      if (key < keyList[midPos]) {
        lastPos = midPos - 1;
      } else if (key > keyList[midPos]) {
        firstPos = midPos + 1;
      } else {
        break;  // exact match at midPos
      }
    }
  }

  return midPos;
}

/*
 * Binary search in an ascending key column, starting from a known position.
 *
 * TSDB_ORDER_ASC:  search keyList[pos .. num-1] and return the last index whose key is <= `key`
 *                  (exact when keys are distinct); -1 if keyList[pos] is already greater than `key`.
 * TSDB_ORDER_DESC: search keyList[0 .. pos] and return the first index whose key is >= `key`
 *                  (exact when keys are distinct); -1 if keyList[pos] is already smaller than `key`.
 *
 * Requires num > 0 and 0 <= pos < num (asserted).
 */
static int doBinarySearchKey(TSKEY* keyList, int num, int pos, TSKEY key, int order) {
  assert(pos >= 0 && pos < num);
  assert(num > 0);

  int lo = pos;  // the bound the search moves away from: keyList[lo] never passes `key`

  if (order == TSDB_ORDER_ASC) {
    int hi = num - 1;
    if (key < keyList[pos]) {
      return -1;
    }

    for (;;) {
      if (key >= keyList[hi]) {
        return hi;
      }
      if (key <= keyList[lo] || hi - lo <= 1) {
        return lo;
      }

      int mid = lo + (hi - lo + 1) / 2;
      if (keyList[mid] == key) {
        return mid;
      }
      if (keyList[mid] > key) {
        hi = mid;
      } else {
        lo = mid;
      }
    }
  }

  // TSDB_ORDER_DESC: walk from pos towards index 0
  int hi = 0;
  if (key > keyList[pos]) {
    return -1;
  }

  for (;;) {
    if (key <= keyList[hi]) {
      return hi;
    }
    if (key >= keyList[lo] || lo - hi <= 1) {
      return lo;
    }

    int mid = lo - (lo - hi + 1) / 2;
    if (keyList[mid] == key) {
      return mid;
    }
    if (keyList[mid] < key) {
      hi = mid;
    } else {
      lo = mid;
    }
  }
}

/*
 * Find the last row, in scan order, of the loaded file block that still lies inside the query time window.
 *
 * Ascending scans stop at the last row whose ts <= window.ekey. Descending scans stop at the first row whose
 * ts >= window.skey. When the block lies entirely inside the window on the relevant side, the block boundary
 * (nRow - 1 or 0) is returned without searching.
 *
 * @param pReader    reader providing the query window and scan order.
 * @param pBlockData decoded block data; its aTSKEY column is searched.
 * @param pBlock     block descriptor (row count and key range).
 * @param pos        current row index; the search starts there and moves in scan direction.
 * @return the end row index, or -1 when the row at pos is already outside the window.
 */
int32_t getEndPosInDataBlock(STsdbReader* pReader, SBlockData* pBlockData, SDataBlk* pBlock, int32_t pos) {
  bool asc = ASCENDING_TRAVERSE(pReader->order);

  if (asc && pReader->window.ekey >= pBlock->maxKey.ts) {
    return pBlock->nRow - 1;
  }

  if (!asc && pReader->window.skey <= pBlock->minKey.ts) {
    return 0;
  }

  // The bound that terminates the scan depends on the direction: ekey for ascending, skey for descending.
  int64_t key = asc ? pReader->window.ekey : pReader->window.skey;
  return doBinarySearchKey(pBlockData->aTSKEY, pBlock->nRow, pos, key, pReader->order);
}

836
/*
 * Copy rows of the currently loaded file block (pReader->status.fileBlockData) into pReader->pResBlock.
 *
 * On the first dump of a block, the first row inside the query window is located:
 * - for ascending scans, the first ts >= window.skey;
 * - for descending scans, the last ts <= window.ekey.
 *
 * Starting at pDumpInfo->rowIndex, at most pReader->capacity in-window rows are then copied, walking forward
 * (ascending) or backward (descending). Afterwards rowIndex points at the next row to dump. The block is marked
 * all-dumped when its rows are exhausted or the next row falls outside the window.
 *
 * @param pReader        the reader (result block, window, order, capacity, dump state).
 * @param pBlockScanInfo scan info of the owning table; not referenced by this function.
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_INVALID_PARA when no start row inside the window can be located.
 */
static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo) {
  SReaderStatus*  pStatus = &pReader->status;
  SDataBlockIter* pBlockIter = &pStatus->blockIter;

  SBlockData*         pBlockData = &pStatus->fileBlockData;
  SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(pBlockIter);
  SDataBlk*           pBlock = getCurrentBlock(pBlockIter);
  SSDataBlock*        pResBlock = pReader->pResBlock;
  int32_t             numOfOutputCols = blockDataGetNumOfCols(pResBlock);

  SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo;
  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;

  SColVal cv = {0};
  int64_t st = taosGetTimestampUs();
  bool    asc = ASCENDING_TRAVERSE(pReader->order);
  int32_t step = asc ? 1 : -1;

  // First dump of this block: skip the rows in front of the query window, if any.
  if ((pDumpInfo->rowIndex == 0 && asc) || (pDumpInfo->rowIndex == pBlock->nRow - 1 && (!asc))) {
    if (asc && pReader->window.skey <= pBlock->minKey.ts) {
      // the block starts inside the window: keep rowIndex = 0
    } else if (!asc && pReader->window.ekey >= pBlock->maxKey.ts) {
      // the block starts inside the window: keep rowIndex = pBlock->nRow - 1
    } else {
      // Search from the far end of the block in the opposite order. Ascending scans start at the first
      // ts >= skey; descending scans start at the last ts <= ekey.
      int32_t pos = asc ? pBlock->nRow - 1 : 0;
      int32_t order = asc ? TSDB_ORDER_DESC : TSDB_ORDER_ASC;
      int64_t key = asc ? pReader->window.skey : pReader->window.ekey;

      int32_t startIndex = doBinarySearchKey(pBlockData->aTSKEY, pBlock->nRow, pos, key, order);
      if (startIndex < 0) {
        // No row of this block is inside the window. Keep the dump state intact and report the inconsistency
        // instead of feeding a negative position to the end-position search.
        tsdbError("%p failed to locate the start position in current block, global index:%d, table index:%d, brange:%" PRId64
                  "-%" PRId64 ", minVer:%" PRId64 ", maxVer:%" PRId64 " %s",
                  pReader, pBlockIter->index, pBlockInfo->tbBlockIdx, pBlock->minKey.ts, pBlock->maxKey.ts,
                  pBlock->minVer, pBlock->maxVer, pReader->idStr);
        return TSDB_CODE_INVALID_PARA;
      }

      pDumpInfo->rowIndex = startIndex;
    }
  }

  // time window check
  int32_t endIndex = getEndPosInDataBlock(pReader, pBlockData, pBlock, pDumpInfo->rowIndex);
  if (endIndex == -1) {
    setBlockAllDumped(pDumpInfo, pReader->window.ekey, pReader->order);
    return TSDB_CODE_SUCCESS;
  }

  endIndex += step;
  int32_t remain = asc ? (endIndex - pDumpInfo->rowIndex) : (pDumpInfo->rowIndex - endIndex);
  if (remain > pReader->capacity) {  // output buffer check
    remain = pReader->capacity;
  }

  int32_t rowIndex = 0;

  // the first output column may be the primary timestamp column, copied straight from aTSKEY
  int32_t          i = 0;
  SColumnInfoData* pColData = taosArrayGet(pResBlock->pDataBlock, i);
  if (pColData->info.colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
    if (asc) {
      memcpy(pColData->pData, &pBlockData->aTSKEY[pDumpInfo->rowIndex], remain * sizeof(int64_t));
    } else {
      for (int32_t j = pDumpInfo->rowIndex; rowIndex < remain; j += step) {
        colDataAppendInt64(pColData, rowIndex++, &pBlockData->aTSKEY[j]);
      }
    }

    i += 1;
  }

  // merge the remaining output columns with the block's columns; both are walked by increasing column id
  int32_t colIndex = 0;
  int32_t num = taosArrayGetSize(pBlockData->aIdx);
  while (i < numOfOutputCols && colIndex < num) {
    rowIndex = 0;
    pColData = taosArrayGet(pResBlock->pDataBlock, i);

    SColData* pData = tBlockDataGetColDataByIdx(pBlockData, colIndex);
    if (pData->cid < pColData->info.colId) {
      colIndex += 1;
    } else if (pData->cid == pColData->info.colId) {
      if (pData->flag == HAS_NONE || pData->flag == HAS_NULL || pData->flag == (HAS_NULL | HAS_NONE)) {
        colDataAppendNNULL(pColData, 0, remain);
      } else {
        if (IS_NUMERIC_TYPE(pColData->info.type) && asc) {
          uint8_t* p = pData->pData + tDataTypes[pData->type].bytes * pDumpInfo->rowIndex;
          memcpy(pColData->pData, p, remain * tDataTypes[pData->type].bytes);

          // null value exists, check one-by-one
          if (pData->flag != HAS_VALUE) {
            for (int32_t j = pDumpInfo->rowIndex; rowIndex < remain; j += step, rowIndex++) {
              uint8_t v = tColDataGetBitValue(pData, j);
              if (v == 0 || v == 1) {
                colDataSetNull_f(pColData->nullbitmap, rowIndex);
              }
            }
          }
        } else {
          for (int32_t j = pDumpInfo->rowIndex; rowIndex < remain; j += step) {
            tColDataGetValue(pData, j, &cv);
            doCopyColVal(pColData, rowIndex++, i, &cv, pSupInfo);
          }
        }
      }

      colIndex += 1;
      i += 1;
    } else {  // the specified column does not exist in file block, fill with null data
      colDataAppendNNULL(pColData, 0, remain);
      i += 1;
    }
  }

  // fill the mis-matched columns with null value
  while (i < numOfOutputCols) {
    pColData = taosArrayGet(pResBlock->pDataBlock, i);
    colDataAppendNNULL(pColData, 0, remain);
    i += 1;
  }

  pResBlock->info.rows = remain;
  pDumpInfo->rowIndex += step * remain;

  // check if current block are all handled
  if (pDumpInfo->rowIndex >= 0 && pDumpInfo->rowIndex < pBlock->nRow) {
    int64_t ts = pBlockData->aTSKEY[pDumpInfo->rowIndex];
    if (outOfTimeWindow(ts, &pReader->window)) {  // the remain data has out of query time window, ignore current block
      setBlockAllDumped(pDumpInfo, ts, pReader->order);
    }
  } else {
    int64_t ts = asc ? pBlock->maxKey.ts : pBlock->minKey.ts;
    setBlockAllDumped(pDumpInfo, ts, pReader->order);
  }

  double elapsedTime = (taosGetTimestampUs() - st) / 1000.0;
  pReader->cost.blockLoadTime += elapsedTime;

  int32_t unDumpedRows = asc ? pBlock->nRow - pDumpInfo->rowIndex : pDumpInfo->rowIndex + 1;
  tsdbDebug("%p copy file block to sdatablock, global index:%d, table index:%d, brange:%" PRId64 "-%" PRId64
                ", rows:%d, remain:%d, minVer:%" PRId64 ", maxVer:%" PRId64 ", elapsed time:%.2f ms, %s",
            pReader, pBlockIter->index, pBlockInfo->tbBlockIdx, pBlock->minKey.ts, pBlock->maxKey.ts, remain,
            unDumpedRows, pBlock->minVer, pBlock->maxVer, elapsedTime, pReader->idStr);

  return TSDB_CODE_SUCCESS;
}

970 971
/*
 * Read the iterator's current file block of table `uid` into pBlockData.
 *
 * pBlockData is reset and re-initialised for the reader's schema and the requested column ids, skipping the first
 * entry of suppInfo.colIds. The block is then read through pReader->pFileReader. On success the load time is added
 * to pReader->cost.blockLoadTime and the dump state is marked as not yet dumped.
 *
 * @return TSDB_CODE_SUCCESS or the error code from tBlockDataInit / tsdbReadDataBlock.
 */
static int32_t doLoadFileBlockData(STsdbReader* pReader, SDataBlockIter* pBlockIter, SBlockData* pBlockData,
                                   uint64_t uid) {
  int64_t startUs = taosGetTimestampUs();

  tBlockDataReset(pBlockData);

  TABLEID             tid = {.suid = pReader->suid, .uid = uid};
  SBlockLoadSuppInfo* pSup = &pReader->suppInfo;

  int32_t code = tBlockDataInit(pBlockData, &tid, pReader->pSchema, &pSup->colIds[1], pSup->numOfCols - 1);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(pBlockIter);
  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
  ASSERT(pBlockInfo != NULL);

  SDataBlk* pBlock = getCurrentBlock(pBlockIter);

  code = tsdbReadDataBlock(pReader->pFileReader, pBlock, pBlockData);
  if (code != TSDB_CODE_SUCCESS) {
    tsdbError("%p error occurs in loading file block, global index:%d, table index:%d, brange:%" PRId64 "-%" PRId64
                  ", rows:%d, code:%s %s",
              pReader, pBlockIter->index, pBlockInfo->tbBlockIdx, pBlock->minKey.ts, pBlock->maxKey.ts, pBlock->nRow,
              tstrerror(code), pReader->idStr);
    return code;
  }

  double elapsedTime = (taosGetTimestampUs() - startUs) / 1000.0;
  tsdbDebug("%p load file block into buffer, global index:%d, index in table block list:%d, brange:%" PRId64 "-%" PRId64
                ", rows:%d, minVer:%" PRId64 ", maxVer:%" PRId64 ", elapsed time:%.2f ms, %s",
            pReader, pBlockIter->index, pBlockInfo->tbBlockIdx, pBlock->minKey.ts, pBlock->maxKey.ts, pBlock->nRow,
            pBlock->minVer, pBlock->maxVer, elapsedTime, pReader->idStr);

  pReader->cost.blockLoadTime += elapsedTime;
  pDumpInfo->allDumped = false;

  return TSDB_CODE_SUCCESS;
}
H
Hongze Cheng 已提交
1008

H
Haojun Liao 已提交
1009 1010 1011
/*
 * Release every buffer owned by the block-order supporter: the per-table counters, the per-table wrapper
 * arrays (for the first numOfTables tables) and the array holding them. Freed pointers are reset to NULL.
 */
static void cleanupBlockOrderSupporter(SBlockOrderSupporter* pSup) {
  taosMemoryFreeClear(pSup->numOfBlocksPerTable);
  taosMemoryFreeClear(pSup->indexPerTable);

  int32_t t = 0;
  while (t < pSup->numOfTables) {
    taosMemoryFreeClear(pSup->pDataBlockInfo[t]);
    t += 1;
  }

  taosMemoryFreeClear(pSup->pDataBlockInfo);
}
H
Hongze Cheng 已提交
1020

H
Haojun Liao 已提交
1021 1022
/*
 * Allocate the zero-initialised per-table arrays of the block-order supporter for numOfTables tables.
 * On any allocation failure everything allocated so far is released and TSDB_CODE_OUT_OF_MEMORY is returned.
 */
static int32_t initBlockOrderSupporter(SBlockOrderSupporter* pSup, int32_t numOfTables) {
  ASSERT(numOfTables >= 1);

  pSup->numOfBlocksPerTable = taosMemoryCalloc(numOfTables, sizeof(int32_t));
  pSup->indexPerTable = taosMemoryCalloc(numOfTables, sizeof(int32_t));
  pSup->pDataBlockInfo = taosMemoryCalloc(numOfTables, POINTER_BYTES);

  bool allocated = (pSup->numOfBlocksPerTable != NULL) && (pSup->indexPerTable != NULL) &&
                   (pSup->pDataBlockInfo != NULL);
  if (!allocated) {
    cleanupBlockOrderSupporter(pSup);
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  return TSDB_CODE_SUCCESS;
}
H
Hongze Cheng 已提交
1035

H
Haojun Liao 已提交
1036
static int32_t fileDataBlockOrderCompar(const void* pLeft, const void* pRight, void* param) {
1037
  int32_t leftIndex = *(int32_t*)pLeft;
H
Haojun Liao 已提交
1038
  int32_t rightIndex = *(int32_t*)pRight;
H
Hongze Cheng 已提交
1039

H
Haojun Liao 已提交
1040
  SBlockOrderSupporter* pSupporter = (SBlockOrderSupporter*)param;
H
Hongze Cheng 已提交
1041

H
Haojun Liao 已提交
1042 1043
  int32_t leftTableBlockIndex = pSupporter->indexPerTable[leftIndex];
  int32_t rightTableBlockIndex = pSupporter->indexPerTable[rightIndex];
H
Hongze Cheng 已提交
1044

H
Haojun Liao 已提交
1045 1046 1047 1048 1049 1050 1051
  if (leftTableBlockIndex > pSupporter->numOfBlocksPerTable[leftIndex]) {
    /* left block is empty */
    return 1;
  } else if (rightTableBlockIndex > pSupporter->numOfBlocksPerTable[rightIndex]) {
    /* right block is empty */
    return -1;
  }
H
Hongze Cheng 已提交
1052

1053
  SBlockOrderWrapper* pLeftBlock = &pSupporter->pDataBlockInfo[leftIndex][leftTableBlockIndex];
H
Haojun Liao 已提交
1054
  SBlockOrderWrapper* pRightBlock = &pSupporter->pDataBlockInfo[rightIndex][rightTableBlockIndex];
H
Hongze Cheng 已提交
1055

1056 1057 1058
  return pLeftBlock->offset > pRightBlock->offset ? 1 : -1;
}

H
Haojun Liao 已提交
1059
/*
 * Load the SDataBlk descriptor of the iterator's current block into pBlockIter->block.
 *
 * The owning table's scan info is looked up in pBlockIter->pTableMap by uid. The block's ordinal index is then
 * taken from the table's pBlockList and used to decode the descriptor from the table's mapData.
 * An empty iterator is not an error.
 *
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_INVALID_PARA if the table or its block index cannot be found.
 */
static int32_t doSetCurrentBlock(SDataBlockIter* pBlockIter, const char* idStr) {
  SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(pBlockIter);
  if (pBlockInfo == NULL) {
    return TSDB_CODE_SUCCESS;
  }

  STableBlockScanInfo* pScanInfo = taosHashGet(pBlockIter->pTableMap, &pBlockInfo->uid, sizeof(pBlockInfo->uid));
  if (pScanInfo == NULL) {
    tsdbError("failed to locate the uid:%" PRIu64 " in query table uid list, %s", pBlockInfo->uid, idStr);
    return TSDB_CODE_INVALID_PARA;
  }

  // guard against a block index that is not present in the table's block list instead of dereferencing NULL
  SBlockIndex* pIndex = taosArrayGet(pScanInfo->pBlockList, pBlockInfo->tbBlockIdx);
  if (pIndex == NULL) {
    tsdbError("failed to locate block index:%d of uid:%" PRIu64 " in table block list, %s", pBlockInfo->tbBlockIdx,
              pBlockInfo->uid, idStr);
    return TSDB_CODE_INVALID_PARA;
  }

  tMapDataGetItemByIdx(&pScanInfo->mapData, pIndex->ordinalIndex, &pBlockIter->block, tGetDataBlk);
  return TSDB_CODE_SUCCESS;
}
H
Hongze Cheng 已提交
1078

1079
/*
 * Build the access order of all file blocks of the current data file and position the iterator on the first one.
 *
 * Every table in pReader->status.pTableMap contributes the blocks in its pBlockList. With a single contributing
 * table the per-table order is kept as is. Otherwise the blocks of all tables are merged by their in-file offset
 * (SBlockIndex.inFileoffset) with a multiway merge tree. The iterator then starts at index 0 for ascending scans
 * and at numOfBlocks - 1 for descending scans.
 *
 * @param pReader     reader owning the table map and the scan order.
 * @param pBlockIter  iterator to (re)initialise; its block list is cleared first.
 * @param numOfBlocks total number of blocks over all tables (asserted to match the collected count).
 * @return TSDB_CODE_SUCCESS, an out-of-memory code, or the error raised while positioning the iterator.
 */
static int32_t initBlockIterator(STsdbReader* pReader, SDataBlockIter* pBlockIter, int32_t numOfBlocks) {
  bool asc = ASCENDING_TRAVERSE(pReader->order);

  SBlockOrderSupporter sup = {0};
  pBlockIter->numOfBlocks = numOfBlocks;
  taosArrayClear(pBlockIter->blockList);
  pBlockIter->pTableMap = pReader->status.pTableMap;

  // access data blocks according to the offset of each block in asc/desc order.
  int32_t numOfTables = (int32_t)taosHashGetSize(pReader->status.pTableMap);

  int64_t st = taosGetTimestampUs();
  int32_t code = initBlockOrderSupporter(&sup, numOfTables);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  // collect (uid, in-file offset) of every block, grouped per table
  int32_t cnt = 0;
  void*   ptr = NULL;
  while ((ptr = taosHashIterate(pReader->status.pTableMap, ptr)) != NULL) {
    STableBlockScanInfo* pTableScanInfo = (STableBlockScanInfo*)ptr;
    if (pTableScanInfo->pBlockList == NULL || taosArrayGetSize(pTableScanInfo->pBlockList) == 0) {
      continue;
    }

    size_t num = taosArrayGetSize(pTableScanInfo->pBlockList);
    sup.numOfBlocksPerTable[sup.numOfTables] = num;

    SBlockOrderWrapper* pWrappers = taosMemoryMalloc(sizeof(SBlockOrderWrapper) * num);
    if (pWrappers == NULL) {
      cleanupBlockOrderSupporter(&sup);
      return TSDB_CODE_TDB_OUT_OF_MEMORY;
    }

    sup.pDataBlockInfo[sup.numOfTables] = pWrappers;

    for (int32_t k = 0; k < num; ++k) {
      SBlockIndex* pIndex = taosArrayGet(pTableScanInfo->pBlockList, k);
      pWrappers[k] = (SBlockOrderWrapper){.uid = pTableScanInfo->uid, .offset = pIndex->inFileoffset};
      cnt++;
    }

    sup.numOfTables += 1;
  }

  ASSERT(numOfBlocks == cnt);

  // since there is only one table qualified, blocks are not sorted
  if (sup.numOfTables == 1) {
    for (int32_t i = 0; i < numOfBlocks; ++i) {
      SFileDataBlockInfo blockInfo = {.uid = sup.pDataBlockInfo[0][i].uid, .tbBlockIdx = i};
      if (taosArrayPush(pBlockIter->blockList, &blockInfo) == NULL) {
        cleanupBlockOrderSupporter(&sup);
        return TSDB_CODE_TDB_OUT_OF_MEMORY;
      }
    }

    int64_t et = taosGetTimestampUs();
    tsdbDebug("%p create blocks info struct completed for one table, %d blocks not sorted, elapsed time:%.2f ms %s",
              pReader, numOfBlocks, (et - st) / 1000.0, pReader->idStr);

    pBlockIter->index = asc ? 0 : (numOfBlocks - 1);
    cleanupBlockOrderSupporter(&sup);
    return doSetCurrentBlock(pBlockIter, pReader->idStr);
  }

  tsdbDebug("%p create data blocks info struct completed, %d blocks in %d tables %s", pReader, cnt, sup.numOfTables,
            pReader->idStr);

  ASSERT(cnt <= numOfBlocks && sup.numOfTables <= numOfTables);

  // Keep the full int32_t result: narrowing an error code to uint8_t can turn a failure into "success".
  SMultiwayMergeTreeInfo* pTree = NULL;
  code = tMergeTreeCreate(&pTree, sup.numOfTables, &sup, fileDataBlockOrderCompar);
  if (code != TSDB_CODE_SUCCESS) {
    cleanupBlockOrderSupporter(&sup);
    return TSDB_CODE_TDB_OUT_OF_MEMORY;
  }

  int32_t numOfTotal = 0;
  while (numOfTotal < cnt) {
    int32_t pos = tMergeTreeGetChosenIndex(pTree);
    int32_t index = sup.indexPerTable[pos]++;

    SFileDataBlockInfo blockInfo = {.uid = sup.pDataBlockInfo[pos][index].uid, .tbBlockIdx = index};
    if (taosArrayPush(pBlockIter->blockList, &blockInfo) == NULL) {
      cleanupBlockOrderSupporter(&sup);
      taosMemoryFree(pTree);
      return TSDB_CODE_TDB_OUT_OF_MEMORY;
    }

    // set data block index overflow, in order to disable the offset comparator
    if (sup.indexPerTable[pos] >= sup.numOfBlocksPerTable[pos]) {
      sup.indexPerTable[pos] = sup.numOfBlocksPerTable[pos] + 1;
    }

    numOfTotal += 1;
    tMergeTreeAdjust(pTree, tMergeTreeGetAdjustIndex(pTree));
  }

  int64_t et = taosGetTimestampUs();
  tsdbDebug("%p %d data blocks access order completed, elapsed time:%.2f ms %s", pReader, numOfBlocks,
            (et - st) / 1000.0, pReader->idStr);
  cleanupBlockOrderSupporter(&sup);
  taosMemoryFree(pTree);

  pBlockIter->index = asc ? 0 : (numOfBlocks - 1);
  return doSetCurrentBlock(pBlockIter, pReader->idStr);
}
H
Hongze Cheng 已提交
1189

H
Haojun Liao 已提交
1190
/*
 * Advance the iterator by one block in scan direction and load the new current block descriptor.
 * Returns false (without moving) when the iterator already sits on the last block in scan order.
 */
static bool blockIteratorNext(SDataBlockIter* pBlockIter, const char* idStr) {
  bool asc = ASCENDING_TRAVERSE(pBlockIter->order);

  bool atLast = asc ? (pBlockIter->index >= pBlockIter->numOfBlocks - 1) : (pBlockIter->index <= 0);
  if (atLast) {
    return false;
  }

  pBlockIter->index += asc ? 1 : -1;
  doSetCurrentBlock(pBlockIter, idStr);
  return true;
}

1204 1205 1206
/**
 * This is an two rectangles overlap cases.
 */
H
Hongze Cheng 已提交
1207
static int32_t dataBlockPartiallyRequired(STimeWindow* pWindow, SVersionRange* pVerRange, SDataBlk* pBlock) {
1208 1209
  return (pWindow->ekey < pBlock->maxKey.ts && pWindow->ekey >= pBlock->minKey.ts) ||
         (pWindow->skey > pBlock->minKey.ts && pWindow->skey <= pBlock->maxKey.ts) ||
H
Hongze Cheng 已提交
1210 1211
         (pVerRange->minVer > pBlock->minVer && pVerRange->minVer <= pBlock->maxVer) ||
         (pVerRange->maxVer < pBlock->maxVer && pVerRange->maxVer >= pBlock->minVer);
H
Haojun Liao 已提交
1212
}
H
Hongze Cheng 已提交
1213

1214 1215
/*
 * Fetch the next block, in scan order, of the same table as pBlockInfo.
 *
 * On success *nextIndex receives its index in the table's block list, pBlock receives its decoded descriptor,
 * and true is returned. Returns false when pBlockInfo is already the table's last block in scan order.
 */
static bool getNeighborBlockOfSameTable(SFileDataBlockInfo* pBlockInfo, STableBlockScanInfo* pTableBlockScanInfo,
                                        int32_t* nextIndex, int32_t order, SDataBlk* pBlock) {
  bool asc = ASCENDING_TRAVERSE(order);

  bool noNeighbor = asc ? (pBlockInfo->tbBlockIdx >= taosArrayGetSize(pTableBlockScanInfo->pBlockList) - 1)
                        : (pBlockInfo->tbBlockIdx == 0);
  if (noNeighbor) {
    return false;
  }

  *nextIndex = pBlockInfo->tbBlockIdx + (asc ? 1 : -1);

  SBlockIndex* pNeighbor = taosArrayGet(pTableBlockScanInfo->pBlockList, *nextIndex);
  tMapDataGetItemByIdx(&pTableBlockScanInfo->mapData, pNeighbor->ordinalIndex, pBlock, tGetDataBlk);
  return true;
}

/*
 * Search the iterator's block list for the entry matching pFBlockInfo (same uid and table block index).
 * The scan starts at the iterator's current position and walks in scan direction.
 * Returns the entry's position; a miss trips the assertion and returns -1.
 */
static int32_t findFileBlockInfoIndex(SDataBlockIter* pBlockIter, SFileDataBlockInfo* pFBlockInfo) {
  ASSERT(pBlockIter != NULL && pFBlockInfo != NULL);

  int32_t step = ASCENDING_TRAVERSE(pBlockIter->order) ? 1 : -1;

  for (int32_t pos = pBlockIter->index; pos >= 0 && pos < pBlockIter->numOfBlocks; pos += step) {
    SFileDataBlockInfo* pCandidate = taosArrayGet(pBlockIter->blockList, pos);
    bool sameTable = (pCandidate->uid == pFBlockInfo->uid);
    bool sameBlock = (pCandidate->tbBlockIdx == pFBlockInfo->tbBlockIdx);
    if (sameTable && sameBlock) {
      return pos;
    }
  }

  ASSERT(0);
  return -1;
}

1252
/*
 * Move the block at position `index` so that it becomes the next block (current position + step) of the
 * iterator, then advance the iterator onto it and load its descriptor.
 * Returns -1 if `index` is outside [0, numOfBlocks), TSDB_CODE_SUCCESS otherwise.
 */
static int32_t setFileBlockActiveInBlockIter(SDataBlockIter* pBlockIter, int32_t index, int32_t step) {
  if (index < 0 || index >= pBlockIter->numOfBlocks) {
    return -1;
  }

  SFileDataBlockInfo target = *(SFileDataBlockInfo*)taosArrayGet(pBlockIter->blockList, index);

  int32_t newPos = pBlockIter->index + step;
  pBlockIter->index = newPos;

  if (index != newPos) {
    // relocate the entry to the new current position
    taosArrayRemove(pBlockIter->blockList, index);
    taosArrayInsert(pBlockIter->blockList, newPos, &target);

    SFileDataBlockInfo* pMoved = taosArrayGet(pBlockIter->blockList, newPos);
    ASSERT(pMoved->uid == target.uid && pMoved->tbBlockIdx == target.tbBlockIdx);
  }

  doSetCurrentBlock(pBlockIter, "");
  return TSDB_CODE_SUCCESS;
}

H
Hongze Cheng 已提交
1272
/*
 * Report whether pBlock and its neighbouring block in scan order share the boundary timestamp.
 * Ascending: pBlock's maxKey equals the neighbour's minKey. Descending: pBlock's minKey equals the
 * neighbour's maxKey.
 */
static bool overlapWithNeighborBlock(SDataBlk* pBlock, SDataBlk* pNeighbor, int32_t order) {
  if (!ASCENDING_TRAVERSE(order)) {
    return pBlock->minKey.ts == pNeighbor->maxKey.ts;
  }

  return pBlock->maxKey.ts == pNeighbor->minKey.ts;
}
H
Hongze Cheng 已提交
1280

H
Hongze Cheng 已提交
1281
/*
 * Report whether a valid buffered key comes at or before the file block in scan order.
 * Ascending: key.ts <= block minKey. Descending: key.ts >= block maxKey.
 * A key equal to TSKEY_INITIAL_VAL means "no key" and never qualifies.
 */
static bool bufferDataInFileBlockGap(int32_t order, TSDBKEY key, SDataBlk* pBlock) {
  if (key.ts == TSKEY_INITIAL_VAL) {
    return false;
  }

  return ASCENDING_TRAVERSE(order) ? (key.ts <= pBlock->minKey.ts) : (key.ts >= pBlock->maxKey.ts);
}
H
Hongze Cheng 已提交
1287

H
Hongze Cheng 已提交
1288
/*
 * Report whether key.ts lies inside the block's key range and the block's versions intersect the requested
 * version range.
 */
static bool keyOverlapFileBlock(TSDBKEY key, SDataBlk* pBlock, SVersionRange* pVerRange) {
  bool tsInside = (key.ts >= pBlock->minKey.ts) && (key.ts <= pBlock->maxKey.ts);
  bool verOverlap = (pBlock->maxVer >= pVerRange->minVer) && (pBlock->minVer <= pVerRange->maxVer);
  return tsInside && verOverlap;
}

H
Hongze Cheng 已提交
1293
/*
 * Walk the table's delete skyline from fileDelIndex and report whether any point may delete rows of pBlock.
 *
 * A point whose version is below the block's minVer is ignored. The block is affected when a remaining point:
 * - lies inside the block's key range, or
 * - lies before the block while the following point reaches at least the block's minKey.
 *
 * The walk stops with false at the first point beyond the block's maxKey.
 */
static bool doCheckforDatablockOverlap(STableBlockScanInfo* pBlockScanInfo, const SDataBlk* pBlock) {
  SArray* pSkyline = pBlockScanInfo->delSkyline;
  size_t  numOfPoints = taosArrayGetSize(pSkyline);

  for (int32_t k = pBlockScanInfo->fileDelIndex; k < numOfPoints; k += 1) {
    TSDBKEY* pPoint = taosArrayGet(pSkyline, k);

    if (pPoint->ts > pBlock->maxKey.ts) {
      return false;
    }

    if (pPoint->version < pBlock->minVer) {
      continue;
    }

    if (pPoint->ts >= pBlock->minKey.ts) {
      return true;  // point inside [minKey, maxKey] with a relevant version
    }

    // point precedes the block: its range reaches into the block if the following point is at/after minKey
    if (k < numOfPoints - 1) {
      TSDBKEY* pNext = taosArrayGet(pSkyline, k + 1);
      if (pNext->ts >= pBlock->minKey.ts) {
        return true;
      }
    } else {  // it must be the last point
      ASSERT(pPoint->version == 0);
    }
  }

  return false;
}

H
Hongze Cheng 已提交
1321
/*
 * Same test as doCheckforDatablockOverlap(), but the skyline walk starts at startIndex instead of
 * pBlockScanInfo->fileDelIndex.
 */
static bool checkDelSkylineOverlapFromIndex(STableBlockScanInfo* pBlockScanInfo, const SDataBlk* pBlock,
                                            int32_t startIndex) {
  size_t num = taosArrayGetSize(pBlockScanInfo->delSkyline);

  for (int32_t i = startIndex; i < num; i += 1) {
    TSDBKEY* p = taosArrayGet(pBlockScanInfo->delSkyline, i);
    if (p->ts >= pBlock->minKey.ts && p->ts <= pBlock->maxKey.ts) {
      if (p->version >= pBlock->minVer) {
        return true;
      }
    } else if (p->ts < pBlock->minKey.ts) {
      if (p->version >= pBlock->minVer) {
        if (i < num - 1) {
          TSDBKEY* pnext = taosArrayGet(pBlockScanInfo->delSkyline, i + 1);
          if (pnext->ts >= pBlock->minKey.ts) {
            return true;
          }
        } else {  // it must be the last point
          ASSERT(p->version == 0);
        }
      }
    } else {  // p->ts > pBlock->maxKey.ts
      return false;
    }
  }

  return false;
}

/*
 * Report whether the table's delete skyline may delete rows of pBlock.
 *
 * Returns false immediately when there is no skyline or the skyline's key span does not intersect the block.
 * Ascending scans check from fileDelIndex. Descending scans first step back from fileDelIndex to the last
 * skyline point whose ts is not greater than the block's minKey (or index 0). That way a delete range starting
 * before the block, and covering its first rows, is taken into account.
 */
static bool overlapWithDelSkyline(STableBlockScanInfo* pBlockScanInfo, const SDataBlk* pBlock, int32_t order) {
  if (pBlockScanInfo->delSkyline == NULL) {
    return false;
  }

  // ts is not overlap
  TSDBKEY* pFirst = taosArrayGet(pBlockScanInfo->delSkyline, 0);
  TSDBKEY* pLast = taosArrayGetLast(pBlockScanInfo->delSkyline);
  if (pBlock->minKey.ts > pLast->ts || pBlock->maxKey.ts < pFirst->ts) {
    return false;
  }

  // version is not overlap
  if (ASCENDING_TRAVERSE(order)) {
    return doCheckforDatablockOverlap(pBlockScanInfo, pBlock);
  }

  int32_t index = pBlockScanInfo->fileDelIndex;
  while (1) {
    TSDBKEY* p = taosArrayGet(pBlockScanInfo->delSkyline, index);
    if (p->ts > pBlock->minKey.ts && index > 0) {
      index -= 1;
    } else {  // find the first point that is smaller than the minKey.ts of dataBlock.
      break;
    }
  }

  // start from the back-tracked position, not from fileDelIndex, otherwise the step-back above has no effect
  return checkDelSkylineOverlapFromIndex(pBlockScanInfo, pBlock, index);
}

H
Haojun Liao 已提交
1351 1352 1353 1354 1355 1356 1357 1358 1359 1360 1361 1362 1363
// Flags filled by getBlockToLoadInfo() describing a file data block; each one is a reason the block may need to be
// loaded (see fileBlockShouldLoad() and isCleanFileDataBlock()).
typedef struct {
  bool overlapWithNeighborBlock;  // shares its boundary timestamp with the neighbouring block of the same table
  bool hasDupTs;                  // block's hasDup flag is set, or the block has more than one sub-block
  bool overlapWithDelInfo;        // the table's delete skyline may affect rows of this block
  bool overlapWithLastBlock;      // current key of the last-block reader lies inside the block's key range
  bool overlapWithKeyInBuf;       // keyInBuf lies inside the block's key range with overlapping versions
  bool partiallyRequired;         // a query window / version range boundary falls inside the block
  bool moreThanCapcity;           // block has more rows than the reader's output capacity
} SDataBlockToLoadInfo;

/*
 * Fill pInfo with every condition that may force pBlock (the block described by pBlockInfo) to be loaded:
 * - overlap with the neighbouring block of the same table;
 * - duplicated timestamps;
 * - overlap with the delete skyline or with the last-block reader's current key;
 * - more rows than the output capacity;
 * - partial coverage by the query window / version range;
 * - overlap with keyInBuf.
 * Flags that do not apply keep the value already in pInfo.
 */
static void getBlockToLoadInfo(SDataBlockToLoadInfo* pInfo, SFileDataBlockInfo* pBlockInfo, SDataBlk* pBlock,
                               STableBlockScanInfo* pScanInfo, TSDBKEY keyInBuf, SLastBlockReader* pLastBlockReader,
                               STsdbReader* pReader) {
  int32_t order = pReader->order;

  SDataBlk neighbor = {0};
  int32_t  neighborIndex = 0;
  if (getNeighborBlockOfSameTable(pBlockInfo, pScanInfo, &neighborIndex, order, &neighbor)) {
    pInfo->overlapWithNeighborBlock = overlapWithNeighborBlock(pBlock, &neighbor, order);
  }

  // a block made of several sub-blocks is treated as containing duplicated timestamps
  pInfo->hasDupTs = (pBlock->nSubBlock == 1) ? pBlock->hasDup : true;
  pInfo->overlapWithDelInfo = overlapWithDelSkyline(pScanInfo, pBlock, order);

  if (hasDataInLastBlock(pLastBlockReader)) {
    int64_t tsLast = getCurrentKeyInLastBlock(pLastBlockReader);
    pInfo->overlapWithLastBlock = (tsLast >= pBlock->minKey.ts) && (tsLast <= pBlock->maxKey.ts);
  }

  pInfo->moreThanCapcity = pBlock->nRow > pReader->capacity;
  pInfo->partiallyRequired = dataBlockPartiallyRequired(&pReader->window, &pReader->verRange, pBlock);
  pInfo->overlapWithKeyInBuf = keyOverlapFileBlock(keyInBuf, pBlock, &pReader->verRange);
}
1387

H
Haojun Liao 已提交
1388 1389 1390 1391 1392 1393 1394 1395 1396 1397 1398 1399 1400 1401
// A file block can be used without loading its data only if none of the following holds:
// 1. it overlaps its neighbouring block of the same table;
// 2. it may contain duplicated timestamps;
// 3. it is only partially covered by the query window / version range;
// 4. the buffered key (keyInBuf) falls inside it;
// 5. it has more rows than the output buffer can hold;
// 6. the delete skyline overlaps it, or the last-block reader's current key falls inside it.
static bool fileBlockShouldLoad(STsdbReader* pReader, SFileDataBlockInfo* pBlockInfo, SDataBlk* pBlock,
                                STableBlockScanInfo* pScanInfo, TSDBKEY keyInBuf, SLastBlockReader* pLastBlockReader) {
  SDataBlockToLoadInfo info = {0};
  getBlockToLoadInfo(&info, pBlockInfo, pBlock, pScanInfo, keyInBuf, pLastBlockReader, pReader);

  bool noReason = !info.overlapWithNeighborBlock && !info.hasDupTs && !info.partiallyRequired &&
                  !info.overlapWithKeyInBuf && !info.moreThanCapcity && !info.overlapWithDelInfo &&
                  !info.overlapWithLastBlock;
  if (noReason) {
    return false;
  }

  // record why the block has to be loaded, for profiling
  tsdbDebug("%p uid:%" PRIu64
            " need to load the datablock, overlapwithneighborblock:%d, hasDup:%d, partiallyRequired:%d, "
            "overlapWithKey:%d, greaterThanBuf:%d, overlapWithDel:%d, overlapWithlastBlock:%d, %s",
            pReader, pBlockInfo->uid, info.overlapWithNeighborBlock, info.hasDupTs, info.partiallyRequired,
            info.overlapWithKeyInBuf, info.moreThanCapcity, info.overlapWithDelInfo, info.overlapWithLastBlock,
            pReader->idStr);
  return true;
}

H
Haojun Liao 已提交
1416 1417 1418 1419 1420 1421 1422 1423 1424
/*
 * A file block is "clean" when none of the following is set:
 *   - overlap with the neighbor block
 *   - duplicated timestamps
 *   - overlap with the in-buffer key
 *   - overlap with delete info
 *   - overlap with the last block
 * Unlike fileBlockShouldLoad(), the partiallyRequired and moreThanCapcity flags are not considered here.
 */
static bool isCleanFileDataBlock(STsdbReader* pReader, SFileDataBlockInfo* pBlockInfo, SDataBlk* pBlock,
                                 STableBlockScanInfo* pScanInfo, TSDBKEY keyInBuf, SLastBlockReader* pLastBlockReader) {
  SDataBlockToLoadInfo loadInfo = {0};
  getBlockToLoadInfo(&loadInfo, pBlockInfo, pBlock, pScanInfo, keyInBuf, pLastBlockReader, pReader);

  return !loadInfo.overlapWithNeighborBlock && !loadInfo.hasDupTs && !loadInfo.overlapWithKeyInBuf &&
         !loadInfo.overlapWithDelInfo && !loadInfo.overlapWithLastBlock;
}

1425
/*
 * Fill pReader->pResBlock with rows taken from the in-memory buffers (mem/imem) of one table, up to endKey.
 *
 * Returns TSDB_CODE_SUCCESS immediately when neither buffer iterator has a value. Otherwise it returns the code
 * produced by buildDataBlockFromBufImpl(). The block's ts window, uid and composed flag are updated in both cases.
 */
static int32_t buildDataBlockFromBuf(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo, int64_t endKey) {
  bool imemHasRow = pBlockScanInfo->iiter.hasVal;
  bool memHasRow = pBlockScanInfo->iter.hasVal;
  if (!imemHasRow && !memHasRow) {
    return TSDB_CODE_SUCCESS;
  }

  SSDataBlock* pResult = pReader->pResBlock;
  int64_t      startUs = taosGetTimestampUs();

  int32_t code = buildDataBlockFromBufImpl(pBlockScanInfo, endKey, pReader->capacity, pReader);

  blockDataUpdateTsWindow(pResult, 0);
  pResult->info.uid = pBlockScanInfo->uid;
  setComposedBlockFlag(pReader, true);

  double costMs = (taosGetTimestampUs() - startUs) / 1000.0;
  tsdbDebug("%p build data block from cache completed, elapsed time:%.2f ms, numOfRows:%d, brange:%" PRId64
            " - %" PRId64 " %s",
            pReader, costMs, pResult->info.rows, pResult->info.window.skey, pResult->info.window.ekey,
            pReader->idStr);

  pReader->cost.buildmemBlock += costMs;
  return code;
}

1450 1451
/*
 * Fast path for copying the current file-block row without going through the row merger.
 *
 * The row is copied only when it is not at the border of the block (in scan order) and its direct neighbor in
 * scan order has a different timestamp. On success the row is appended to pReader->pResBlock, rowIndex moves one
 * step in scan order, and true is returned. Otherwise nothing is changed and false is returned.
 */
static bool tryCopyDistinctRowFromFileBlock(STsdbReader* pReader, SBlockData* pBlockData, int64_t key,
                                            SFileBlockDumpInfo* pDumpInfo) {
  int32_t step = 0;
  if (pReader->order == TSDB_ORDER_ASC && pDumpInfo->rowIndex < pDumpInfo->totalRows - 1) {
    step = 1;
  } else if (pReader->order == TSDB_ORDER_DESC && pDumpInfo->rowIndex > 0) {
    step = -1;
  } else {
    return false;  // border row: there is no in-block neighbor to compare against
  }

  if (pBlockData->aTSKEY[pDumpInfo->rowIndex + step] == key) {
    return false;  // duplicated timestamp: the caller has to merge
  }

  doAppendRowFromFileBlock(pReader->pResBlock, pReader, pBlockData, pDumpInfo->rowIndex);
  pDumpInfo->rowIndex += step;
  return true;
}

1470 1471
/*
 * Advance the last-block merge tree to the next row that has not been removed by the table's delete skyline.
 *
 * The delete cursor used is pBlockScanInfo->lastBlockDelIndex. Returns false once the merge tree is exhausted.
 */
static bool nextRowFromLastBlocks(SLastBlockReader* pLastBlockReader, STableBlockScanInfo* pBlockScanInfo,
                                  SVersionRange* pVerRange) {
  while (tMergeTreeNext(&pLastBlockReader->mergeTree)) {
    TSDBROW candidate = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
    TSDBKEY rowKey = TSDBROW_KEY(&candidate);

    bool dropped = hasBeenDropped(pBlockScanInfo->delSkyline, &pBlockScanInfo->lastBlockDelIndex, &rowKey,
                                  pLastBlockReader->order, pVerRange);
    if (!dropped) {
      return true;
    }
  }

  return false;
}

/*
 * Move the last-block merge tree to its next live row and decide whether fRow can be emitted on its own.
 *
 * fRow is presumably the row the merge tree pointed at before this call; the callers pass tMergeTreeGetRow()'s
 * result. It is appended to pReader->pResBlock, and true is returned, when no row follows or when the following
 * row carries a different timestamp than ts. Otherwise nothing is appended and false is returned, so the caller
 * has to merge the duplicates.
 */
static bool tryCopyDistinctRowFromSttBlock(TSDBROW* fRow, SLastBlockReader* pLastBlockReader,
                                           STableBlockScanInfo* pScanInfo, int64_t ts, STsdbReader* pReader) {
  bool hasNextRow = nextRowFromLastBlocks(pLastBlockReader, pScanInfo, &pReader->verRange);
  if (hasNextRow && getCurrentKeyInLastBlock(pLastBlockReader) == ts) {
    return false;
  }

  doAppendRowFromFileBlock(pReader->pResBlock, pReader, fRow->pBlockData, fRow->iRow);
  return true;
}

H
Haojun Liao 已提交
1504 1505 1506
/*
 * Return the table schema that matches sversion, the schema version stored in a row.
 *
 * Two schemas are cached on the reader:
 *   - pReader->pSchema: fetched lazily with version -1 and never replaced here.
 *   - pReader->pMemSchema: holds the most recently requested other version, and is replaced when a different
 *     version is asked for.
 *
 * Returns NULL and sets terrno to the meta error code when the lookup fails. When the cached pMemSchema had to be
 * replaced and the lookup yields no schema, terrno is also set (to the returned code).
 */
static FORCE_INLINE STSchema* doGetSchemaForTSRow(int32_t sversion, STsdbReader* pReader, uint64_t uid) {
  if (pReader->pSchema == NULL) {
    pReader->pSchema = metaGetTbTSchema(pReader->pTsdb->pVnode->pMeta, uid, -1, 1);
  }

  if (pReader->pSchema != NULL && pReader->pSchema->version == sversion) {
    return pReader->pSchema;
  }

  bool hadCachedMemSchema = (pReader->pMemSchema != NULL);
  if (hadCachedMemSchema) {
    if (pReader->pMemSchema->version == sversion) {
      return pReader->pMemSchema;
    }
    taosMemoryFreeClear(pReader->pMemSchema);  // stale version, fetch a fresh one below
  }

  int32_t code = metaGetTbTSchemaEx(pReader->pTsdb->pVnode->pMeta, pReader->suid, uid, sversion, &pReader->pMemSchema);
  if (code != TSDB_CODE_SUCCESS || (hadCachedMemSchema && pReader->pMemSchema == NULL)) {
    terrno = code;
    return NULL;
  }

  return pReader->pMemSchema;
}

1539
static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo, TSDBROW* pRow,
1540 1541 1542 1543 1544 1545
                                     SIterInfo* pIter, int64_t key, SLastBlockReader* pLastBlockReader) {
  SRowMerger          merge = {0};
  STSRow*             pTSRow = NULL;
  SBlockData*         pBlockData = &pReader->status.fileBlockData;
  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;

1546
  int64_t tsLast = INT64_MIN;
1547
  if (hasDataInLastBlock(pLastBlockReader)) {
1548 1549
    tsLast = getCurrentKeyInLastBlock(pLastBlockReader);
  }
1550

H
Hongze Cheng 已提交
1551 1552
  TSDBKEY k = TSDBROW_KEY(pRow);
  TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
1553

1554 1555
  int64_t minKey = 0;
  if (pReader->order == TSDB_ORDER_ASC) {
H
Hongze Cheng 已提交
1556
    minKey = INT64_MAX;  // chosen the minimum value
1557
    if (minKey > tsLast && hasDataInLastBlock(pLastBlockReader)) {
1558 1559
      minKey = tsLast;
    }
1560

1561 1562 1563
    if (minKey > k.ts) {
      minKey = k.ts;
    }
1564

1565
    if (minKey > key && hasDataInFileBlock(pBlockData, pDumpInfo)) {
1566 1567 1568 1569
      minKey = key;
    }
  } else {
    minKey = INT64_MIN;
1570
    if (minKey < tsLast && hasDataInLastBlock(pLastBlockReader)) {
1571 1572 1573 1574 1575 1576 1577
      minKey = tsLast;
    }

    if (minKey < k.ts) {
      minKey = k.ts;
    }

1578
    if (minKey < key && hasDataInFileBlock(pBlockData, pDumpInfo)) {
1579 1580
      minKey = key;
    }
1581 1582 1583 1584
  }

  bool init = false;

1585
  // ASC: file block ---> last block -----> imem -----> mem
H
Hongze Cheng 已提交
1586
  // DESC: mem -----> imem -----> last block -----> file block
1587 1588
  if (pReader->order == TSDB_ORDER_ASC) {
    if (minKey == key) {
1589
      init = true;
H
Haojun Liao 已提交
1590 1591 1592 1593
      int32_t code = tRowMergerInit(&merge, &fRow, pReader->pSchema);
      if (code != TSDB_CODE_SUCCESS) {
        return code;
      }
1594
      doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
1595 1596
    }

1597
    if (minKey == tsLast) {
1598
      TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
H
Haojun Liao 已提交
1599 1600 1601
      if (init) {
        tRowMerge(&merge, &fRow1);
      } else {
1602
        init = true;
H
Haojun Liao 已提交
1603 1604 1605 1606
        int32_t code = tRowMergerInit(&merge, &fRow1, pReader->pSchema);
        if (code != TSDB_CODE_SUCCESS) {
          return code;
        }
1607
      }
1608
      doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, &merge, &pReader->verRange);
1609
    }
1610

1611
    if (minKey == k.ts) {
H
Haojun Liao 已提交
1612 1613 1614
      if (init) {
        tRowMerge(&merge, pRow);
      } else {
1615 1616
        init = true;
        STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid);
H
Haojun Liao 已提交
1617 1618 1619 1620 1621 1622 1623 1624
        int32_t   code = tRowMergerInit(&merge, pRow, pSchema);
        if (code != TSDB_CODE_SUCCESS) {
          return code;
        }
      }
      int32_t code = doMergeRowsInBuf(pIter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge, pReader);
      if (code != TSDB_CODE_SUCCESS) {
        return code;
1625 1626 1627 1628 1629
      }
    }
  } else {
    if (minKey == k.ts) {
      init = true;
1630
      STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid);
H
Haojun Liao 已提交
1631 1632 1633 1634 1635 1636
      int32_t   code = tRowMergerInit(&merge, pRow, pSchema);
      if (code != TSDB_CODE_SUCCESS) {
        return code;
      }

      code = doMergeRowsInBuf(pIter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge, pReader);
1637
      if (code != TSDB_CODE_SUCCESS || merge.pTSchema == NULL) {
H
Haojun Liao 已提交
1638 1639
        return code;
      }
1640 1641
    }

1642
    if (minKey == tsLast) {
1643
      TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
H
Haojun Liao 已提交
1644 1645 1646
      if (init) {
        tRowMerge(&merge, &fRow1);
      } else {
1647
        init = true;
H
Haojun Liao 已提交
1648 1649 1650 1651
        int32_t code = tRowMergerInit(&merge, &fRow1, pReader->pSchema);
        if (code != TSDB_CODE_SUCCESS) {
          return code;
        }
1652
      }
1653
      doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, &merge, &pReader->verRange);
1654 1655 1656
    }

    if (minKey == key) {
H
Haojun Liao 已提交
1657 1658 1659
      if (init) {
        tRowMerge(&merge, &fRow);
      } else {
1660
        init = true;
H
Haojun Liao 已提交
1661 1662 1663 1664
        int32_t code = tRowMergerInit(&merge, &fRow, pReader->pSchema);
        if (code != TSDB_CODE_SUCCESS) {
          return code;
        }
1665 1666 1667
      }
      doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
    }
1668 1669
  }

1670 1671 1672 1673 1674
  int32_t code = tRowMergerGetRow(&merge, &pTSRow);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

1675 1676 1677 1678 1679 1680 1681
  doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo->uid);

  taosMemoryFree(pTSRow);
  tRowMergerClear(&merge);
  return TSDB_CODE_SUCCESS;
}

1682 1683 1684
/*
 * Emit the current last-block (stt) row, merging it with its duplicates.
 *
 * When mergeBlockData is true and the last-block timestamp equals the current file-block row's timestamp, the
 * file-block rows with that timestamp are merged as well. pBlockData may be NULL when mergeBlockData is false.
 *
 * Returns TSDB_CODE_SUCCESS or the row-merger error. The merger is released on every path.
 */
static int32_t doMergeFileBlockAndLastBlock(SLastBlockReader* pLastBlockReader, STsdbReader* pReader,
                                            STableBlockScanInfo* pBlockScanInfo, SBlockData* pBlockData,
                                            bool mergeBlockData) {
  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
  int64_t             tsLastBlock = getCurrentKeyInLastBlock(pLastBlockReader);

  STSRow*    pTSRow = NULL;
  SRowMerger merge = {0};
  int32_t    code = TSDB_CODE_SUCCESS;
  TSDBROW    fRow = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
  tsdbTrace("fRow ptr:%p, %d, uid:%" PRIu64 ", %s", fRow.pBlockData, fRow.iRow, pLastBlockReader->uid, pReader->idStr);

  if ((!mergeBlockData) || (tsLastBlock != pBlockData->aTSKEY[pDumpInfo->rowIndex])) {
    // only the last block contributes rows with this timestamp
    if (tryCopyDistinctRowFromSttBlock(&fRow, pLastBlockReader, pBlockScanInfo, tsLastBlock, pReader)) {
      return TSDB_CODE_SUCCESS;
    }

    code = tRowMergerInit(&merge, &fRow, pReader->pSchema);
    if (code == TSDB_CODE_SUCCESS) {
      // the merge tree already moved past fRow; fold in the duplicate it now points at, then the rest
      TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
      tRowMerge(&merge, &fRow1);
      doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLastBlock, &merge, &pReader->verRange);
    }
  } else {
    // mergeBlockData is set and both sources hold rows with timestamp tsLastBlock: merge them all
    code = tRowMergerInit(&merge, &fRow, pReader->pSchema);
    if (code == TSDB_CODE_SUCCESS) {
      doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLastBlock, &merge, &pReader->verRange);
      doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
    }
  }

  if (code == TSDB_CODE_SUCCESS) {
    code = tRowMergerGetRow(&merge, &pTSRow);
    if (code == TSDB_CODE_SUCCESS) {
      doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo->uid);
      taosMemoryFree(pTSRow);
    }
  }

  tRowMergerClear(&merge);
  return code;
}

1745 1746
/*
 * Produce the next output row when only file data (data blocks and last blocks) remains, i.e. mem/imem have
 * nothing to contribute.
 *
 * key is the timestamp of the current file-block row. Returns TSDB_CODE_SUCCESS or the first merge error. The row
 * merger used for the equal-timestamp case is released on every path.
 */
static int32_t mergeFileBlockAndLastBlock(STsdbReader* pReader, SLastBlockReader* pLastBlockReader, int64_t key,
                                          STableBlockScanInfo* pBlockScanInfo, SBlockData* pBlockData) {
  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;

  if (!hasDataInFileBlock(pBlockData, pDumpInfo)) {  // only last block exists
    return doMergeFileBlockAndLastBlock(pLastBlockReader, pReader, pBlockScanInfo, NULL, false);
  }

  // no last block available, only data block exists
  if (!hasDataInLastBlock(pLastBlockReader)) {
    return mergeRowsInFileBlocks(pBlockData, pBlockScanInfo, key, pReader);
  }

  // row in last file block
  TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
  int64_t ts = getCurrentKeyInLastBlock(pLastBlockReader);
  ASSERT(ts >= key);

  if (!ASCENDING_TRAVERSE(pReader->order)) {  // desc order
    return doMergeFileBlockAndLastBlock(pLastBlockReader, pReader, pBlockScanInfo, pBlockData, true);
  }

  if (key < ts) {  // imem, mem are all empty, file blocks (data blocks and last block) exist
    return mergeRowsInFileBlocks(pBlockData, pBlockScanInfo, key, pReader);
  }

  if (key != ts) {
    ASSERT(0);
    return TSDB_CODE_SUCCESS;
  }

  // key == ts: merge the data-block rows and the last-block rows with this timestamp into one output row
  STSRow*    pTSRow = NULL;
  SRowMerger merge = {0};

  int32_t code = tRowMergerInit(&merge, &fRow, pReader->pSchema);
  if (code == TSDB_CODE_SUCCESS) {
    doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);

    TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
    tRowMerge(&merge, &fRow1);
    doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, ts, &merge, &pReader->verRange);

    code = tRowMergerGetRow(&merge, &pTSRow);
    if (code == TSDB_CODE_SUCCESS) {
      doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo->uid);
      taosMemoryFree(pTSRow);
    }
  }

  tRowMergerClear(&merge);  // also on the error paths, so a partially built merger is not leaked
  return code;
}

1801 1802
/*
 * Merge all rows that share the next timestamp, in scan order, across four sources, and append the merged row to
 * pReader->pResBlock. The sources are:
 *   - the file data block
 *   - the last (stt) blocks
 *   - imem
 *   - mem
 * Both mem and imem must currently hold a valid row (asserted).
 *
 * Returns TSDB_CODE_SUCCESS or the first merge error. The row merger is released on every path.
 * When no schema is available for a buffer row, or the merger ends up without a schema, nothing is appended and
 * the current code is returned. That matches the pre-existing checks in this function; three of the four
 * schema lookups previously passed NULL straight into tRowMergerInit().
 */
static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo, SBlockData* pBlockData,
                                     SLastBlockReader* pLastBlockReader) {
  SRowMerger          merge = {0};
  STSRow*             pTSRow = NULL;
  int32_t             code = TSDB_CODE_SUCCESS;
  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
  SArray*             pDelList = pBlockScanInfo->delSkyline;

  TSDBROW* pRow = getValidMemRow(&pBlockScanInfo->iter, pDelList, pReader);
  TSDBROW* piRow = getValidMemRow(&pBlockScanInfo->iiter, pDelList, pReader);
  ASSERT(pRow != NULL && piRow != NULL);

  int64_t tsLast = INT64_MIN;
  if (hasDataInLastBlock(pLastBlockReader)) {
    tsLast = getCurrentKeyInLastBlock(pLastBlockReader);
  }

  int64_t key = hasDataInFileBlock(pBlockData, pDumpInfo) ? pBlockData->aTSKEY[pDumpInfo->rowIndex] : INT64_MIN;

  TSDBKEY k = TSDBROW_KEY(pRow);
  TSDBKEY ik = TSDBROW_KEY(piRow);

  int64_t minKey = 0;
  if (ASCENDING_TRAVERSE(pReader->order)) {
    minKey = INT64_MAX;  // let's find the minimum
    if (minKey > k.ts) {
      minKey = k.ts;
    }

    if (minKey > ik.ts) {
      minKey = ik.ts;
    }

    if (minKey > key && hasDataInFileBlock(pBlockData, pDumpInfo)) {
      minKey = key;
    }

    if (minKey > tsLast && hasDataInLastBlock(pLastBlockReader)) {
      minKey = tsLast;
    }
  } else {
    minKey = INT64_MIN;  // let find the maximum ts value
    if (minKey < k.ts) {
      minKey = k.ts;
    }

    if (minKey < ik.ts) {
      minKey = ik.ts;
    }

    if (minKey < key && hasDataInFileBlock(pBlockData, pDumpInfo)) {
      minKey = key;
    }

    if (minKey < tsLast && hasDataInLastBlock(pLastBlockReader)) {
      minKey = tsLast;
    }
  }

  bool init = false;

  // ASC: file block -----> last block -----> imem -----> mem
  // DESC: mem -----> imem -----> last block -----> file block
  if (ASCENDING_TRAVERSE(pReader->order)) {
    if (minKey == key) {
      init = true;
      TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
      code = tRowMergerInit(&merge, &fRow, pReader->pSchema);
      if (code != TSDB_CODE_SUCCESS) {
        goto _end;
      }

      doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
    }

    if (minKey == tsLast) {
      TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
      if (init) {
        tRowMerge(&merge, &fRow1);
      } else {
        init = true;
        code = tRowMergerInit(&merge, &fRow1, pReader->pSchema);
        if (code != TSDB_CODE_SUCCESS) {
          goto _end;
        }
      }

      doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, &merge, &pReader->verRange);
    }

    if (minKey == ik.ts) {
      if (init) {
        tRowMerge(&merge, piRow);
      } else {
        init = true;
        STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(piRow), pReader, pBlockScanInfo->uid);
        if (pSchema == NULL) {
          goto _end;
        }

        code = tRowMergerInit(&merge, piRow, pSchema);
        if (code != TSDB_CODE_SUCCESS) {
          goto _end;
        }
      }

      code = doMergeRowsInBuf(&pBlockScanInfo->iiter, pBlockScanInfo->uid, ik.ts, pBlockScanInfo->delSkyline, &merge,
                              pReader);
      if (code != TSDB_CODE_SUCCESS) {
        goto _end;
      }
    }

    if (minKey == k.ts) {
      if (init) {
        if (merge.pTSchema == NULL) {
          goto _end;
        }

        tRowMerge(&merge, pRow);
      } else {
        STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid);
        if (pSchema == NULL) {
          goto _end;
        }

        code = tRowMergerInit(&merge, pRow, pSchema);
        if (code != TSDB_CODE_SUCCESS) {
          goto _end;
        }
      }

      code = doMergeRowsInBuf(&pBlockScanInfo->iter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge,
                              pReader);
      if (code != TSDB_CODE_SUCCESS) {
        goto _end;
      }
    }
  } else {
    if (minKey == k.ts) {
      init = true;
      STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid);
      if (pSchema == NULL) {
        goto _end;
      }

      code = tRowMergerInit(&merge, pRow, pSchema);
      if (code != TSDB_CODE_SUCCESS) {
        goto _end;
      }

      code = doMergeRowsInBuf(&pBlockScanInfo->iter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge,
                              pReader);
      if (code != TSDB_CODE_SUCCESS) {
        goto _end;
      }
    }

    if (minKey == ik.ts) {
      if (init) {
        tRowMerge(&merge, piRow);
      } else {
        init = true;
        STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(piRow), pReader, pBlockScanInfo->uid);
        if (pSchema == NULL) {
          goto _end;
        }

        code = tRowMergerInit(&merge, piRow, pSchema);
        if (code != TSDB_CODE_SUCCESS) {
          goto _end;
        }
      }

      code = doMergeRowsInBuf(&pBlockScanInfo->iiter, pBlockScanInfo->uid, ik.ts, pBlockScanInfo->delSkyline, &merge,
                              pReader);
      if (code != TSDB_CODE_SUCCESS) {
        goto _end;
      }
    }

    if (minKey == tsLast) {
      TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
      if (init) {
        tRowMerge(&merge, &fRow1);
      } else {
        init = true;
        code = tRowMergerInit(&merge, &fRow1, pReader->pSchema);
        if (code != TSDB_CODE_SUCCESS) {
          goto _end;
        }
      }

      doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, &merge, &pReader->verRange);
    }

    if (minKey == key) {
      TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
      if (!init) {
        code = tRowMergerInit(&merge, &fRow, pReader->pSchema);
        if (code != TSDB_CODE_SUCCESS) {
          goto _end;
        }
      } else {
        if (merge.pTSchema == NULL) {
          goto _end;
        }

        tRowMerge(&merge, &fRow);
      }

      doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
    }
  }

  if (merge.pTSchema == NULL) {
    goto _end;
  }

  code = tRowMergerGetRow(&merge, &pTSRow);
  if (code != TSDB_CODE_SUCCESS) {
    goto _end;
  }

  doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo->uid);
  taosMemoryFree(pTSRow);

_end:
  // single exit: the merger is released on success and on every early-exit path
  tRowMergerClear(&merge);
  return code;
}

2015 2016 2017 2018 2019 2020 2021 2022 2023 2024 2025 2026 2027 2028 2029 2030 2031 2032 2033 2034 2035 2036 2037 2038 2039
/*
 * Create the mem and imem skip-list iterators for one table, once per table (guarded by iterInit).
 *
 * The iterators start at:
 *   - ASC:  (window.skey, verRange.minVer)
 *   - DESC: (window.ekey, verRange.maxVer), iterating backward
 * The delete-skyline iterator is also initialized.
 *
 * Returns the iterator-creation error, if any. In that case iterInit stays false.
 */
static int32_t initMemDataIterator(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader) {
  if (pBlockScanInfo->iterInit) {
    return TSDB_CODE_SUCCESS;
  }

  int32_t code = TSDB_CODE_SUCCESS;

  TSDBKEY startKey = {0};
  if (ASCENDING_TRAVERSE(pReader->order)) {
    startKey = (TSDBKEY){.ts = pReader->window.skey, .version = pReader->verRange.minVer};
  } else {
    startKey = (TSDBKEY){.ts = pReader->window.ekey, .version = pReader->verRange.maxVer};
  }

  int32_t backward = (!ASCENDING_TRAVERSE(pReader->order));

  STbData* d = NULL;
  if (pReader->pReadSnap->pMem != NULL) {
    d = tsdbGetTbDataFromMemTable(pReader->pReadSnap->pMem, pReader->suid, pBlockScanInfo->uid);
    if (d != NULL) {
      code = tsdbTbDataIterCreate(d, &startKey, backward, &pBlockScanInfo->iter.iter);
      if (code == TSDB_CODE_SUCCESS) {
        pBlockScanInfo->iter.hasVal = (tsdbTbDataIterGet(pBlockScanInfo->iter.iter) != NULL);

        tsdbDebug("%p uid:%" PRIu64 ", check data in mem from skey:%" PRId64 ", order:%d, ts range in buf:%" PRId64
                  "-%" PRId64 " %s",
                  pReader, pBlockScanInfo->uid, startKey.ts, pReader->order, d->minKey, d->maxKey, pReader->idStr);
      } else {
        // this branch handles the mem table, so the message must say "mem" (it previously said "imem")
        tsdbError("%p uid:%" PRIu64 ", failed to create iterator for mem, code:%s, %s", pReader, pBlockScanInfo->uid,
                  tstrerror(code), pReader->idStr);
        return code;
      }
    }
  } else {
    tsdbDebug("%p uid:%" PRIu64 ", no data in mem, %s", pReader, pBlockScanInfo->uid, pReader->idStr);
  }

  STbData* di = NULL;
  if (pReader->pReadSnap->pIMem != NULL) {
    di = tsdbGetTbDataFromMemTable(pReader->pReadSnap->pIMem, pReader->suid, pBlockScanInfo->uid);
    if (di != NULL) {
      code = tsdbTbDataIterCreate(di, &startKey, backward, &pBlockScanInfo->iiter.iter);
      if (code == TSDB_CODE_SUCCESS) {
        pBlockScanInfo->iiter.hasVal = (tsdbTbDataIterGet(pBlockScanInfo->iiter.iter) != NULL);

        tsdbDebug("%p uid:%" PRIu64 ", check data in imem from skey:%" PRId64 ", order:%d, ts range in buf:%" PRId64
                  "-%" PRId64 " %s",
                  pReader, pBlockScanInfo->uid, startKey.ts, pReader->order, di->minKey, di->maxKey, pReader->idStr);
      } else {
        // this branch handles the imem table, so the message must say "imem" (it previously said "mem")
        tsdbError("%p uid:%" PRIu64 ", failed to create iterator for imem, code:%s, %s", pReader, pBlockScanInfo->uid,
                  tstrerror(code), pReader->idStr);
        return code;
      }
    }
  } else {
    tsdbDebug("%p uid:%" PRIu64 ", no data in imem, %s", pReader, pBlockScanInfo->uid, pReader->idStr);
  }

  initDelSkylineIterator(pBlockScanInfo, pReader, d, di);

  pBlockScanInfo->iterInit = true;
  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
2079 2080
static bool isValidFileBlockRow(SBlockData* pBlockData, SFileBlockDumpInfo* pDumpInfo,
                                STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader) {
2081 2082 2083 2084 2085 2086 2087 2088
  // it is an multi-table data block
  if (pBlockData->aUid != NULL) {
    uint64_t uid = pBlockData->aUid[pDumpInfo->rowIndex];
    if (uid != pBlockScanInfo->uid) {  // move to next row
      return false;
    }
  }

2089 2090 2091 2092 2093 2094 2095 2096 2097 2098 2099
  // check for version and time range
  int64_t ver = pBlockData->aVersion[pDumpInfo->rowIndex];
  if (ver > pReader->verRange.maxVer || ver < pReader->verRange.minVer) {
    return false;
  }

  int64_t ts = pBlockData->aTSKEY[pDumpInfo->rowIndex];
  if (ts > pReader->window.ekey || ts < pReader->window.skey) {
    return false;
  }

2100
  TSDBKEY k = {.ts = ts, .version = ver};
2101 2102
  if (hasBeenDropped(pBlockScanInfo->delSkyline, &pBlockScanInfo->fileDelIndex, &k, pReader->order,
                     &pReader->verRange)) {
2103 2104 2105
    return false;
  }

2106 2107 2108
  return true;
}

2109
/*
 * Position the last-block (stt) reader on the table described by pScanInfo.
 *
 * If the reader already serves this uid, nothing happens and true is returned. Otherwise:
 *   - the previous table's merge tree is closed;
 *   - the table's mem iterators are initialized (their return code is not checked here);
 *   - a merge tree is opened over a window whose start (ASC) or end (DESC) is moved one step past
 *     pScanInfo->lastKey — presumably the last timestamp already consumed for this table; confirm with callers;
 *   - the merge tree is advanced to its first non-deleted row.
 *
 * Returns false when the merge tree cannot be opened or has no live row.
 */
static bool initLastBlockReader(SLastBlockReader* pLBlockReader, STableBlockScanInfo* pScanInfo, STsdbReader* pReader) {
  if (pLBlockReader->uid == pScanInfo->uid) {
    return true;
  }

  if (pLBlockReader->uid != 0) {
    tMergeTreeClose(&pLBlockReader->mergeTree);  // belongs to the previously scanned table
  }

  initMemDataIterator(pScanInfo, pReader);
  pLBlockReader->uid = pScanInfo->uid;

  STimeWindow w = pLBlockReader->window;
  if (ASCENDING_TRAVERSE(pLBlockReader->order)) {
    w.skey = pScanInfo->lastKey + 1;
  } else {
    w.ekey = pScanInfo->lastKey - 1;
  }

  bool    descending = (pLBlockReader->order == TSDB_ORDER_DESC);
  int32_t code = tMergeTreeOpen(&pLBlockReader->mergeTree, descending, pReader->pFileReader, pReader->suid,
                                pScanInfo->uid, &w, &pLBlockReader->verRange, pLBlockReader->pInfo, false,
                                pReader->idStr);
  if (code != TSDB_CODE_SUCCESS) {
    return false;
  }

  return nextRowFromLastBlocks(pLBlockReader, pScanInfo, &pReader->verRange);
}

2140
/* Timestamp of the row the last-block merge tree currently points at. */
static int64_t getCurrentKeyInLastBlock(SLastBlockReader* pLastBlockReader) {
  TSDBROW current = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
  int64_t ts = TSDBROW_TS(&current);
  return ts;
}

H
Hongze Cheng 已提交
2145
/* True while the last-block merge tree has an active current iterator (mergeTree.pIter is non-NULL). */
static bool hasDataInLastBlock(SLastBlockReader* pLastBlockReader) {
  bool hasIter = (pLastBlockReader->mergeTree.pIter != NULL);
  return hasIter;
}
2146 2147 2148 2149

bool hasDataInFileBlock(const SBlockData* pBlockData, const SFileBlockDumpInfo* pDumpInfo) {
  if (pBlockData->nRow > 0) {
    ASSERT(pBlockData->nRow == pDumpInfo->totalRows);
2150 2151
  }

2152
  return pBlockData->nRow > 0 && (!pDumpInfo->allDumped);
2153
}
2154

2155 2156
/*
 * Emit the current file-block row of the table.
 *
 * If the row's timestamp is not duplicated by its in-block neighbor, it is copied directly. Otherwise all rows
 * sharing that timestamp are merged into one row.
 *
 * Returns TSDB_CODE_SUCCESS or the row-merger error. The merger is now released on the error paths as well;
 * previously it leaked when tRowMergerInit or tRowMergerGetRow failed.
 */
int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pBlockScanInfo, int64_t key,
                              STsdbReader* pReader) {
  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
  if (tryCopyDistinctRowFromFileBlock(pReader, pBlockData, key, pDumpInfo)) {
    return TSDB_CODE_SUCCESS;
  }

  TSDBROW    fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
  STSRow*    pTSRow = NULL;
  SRowMerger merge = {0};

  int32_t code = tRowMergerInit(&merge, &fRow, pReader->pSchema);
  if (code == TSDB_CODE_SUCCESS) {
    doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);

    code = tRowMergerGetRow(&merge, &pTSRow);
    if (code == TSDB_CODE_SUCCESS) {
      doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo->uid);
      taosMemoryFree(pTSRow);
    }
  }

  tRowMergerClear(&merge);
  return code;
}

H
Haojun Liao 已提交
2185 2186
// Merge one output row for pBlockScanInfo's table from whichever sources currently hold data:
// mem + imem (+ file + last block), imem + file + last block, mem + file + last block, or file + last block only.
// fileKey is the timestamp at the file block's dump cursor, or INT64_MIN if the block has no undumped rows.
static int32_t buildComposedDataBlockImpl(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo,
                                          SBlockData* pBlockData, SLastBlockReader* pLastBlockReader) {
  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;

  bool    fileRowPending = (pBlockData->nRow > 0 && (!pDumpInfo->allDumped));
  int64_t fileKey = fileRowPending ? pBlockData->aTSKEY[pDumpInfo->rowIndex] : INT64_MIN;

  SIterInfo* pMemIter = &pBlockScanInfo->iter;
  SIterInfo* pIMemIter = &pBlockScanInfo->iiter;

  if (pMemIter->hasVal && pIMemIter->hasVal) {
    return doMergeMultiLevelRows(pReader, pBlockScanInfo, pBlockData, pLastBlockReader);
  }

  TSDBROW* pMemRow = NULL;
  TSDBROW* pIMemRow = NULL;
  if (pMemIter->hasVal) {
    pMemRow = getValidMemRow(pMemIter, pBlockScanInfo->delSkyline, pReader);
  }
  if (pIMemIter->hasVal) {
    pIMemRow = getValidMemRow(pIMemIter, pBlockScanInfo->delSkyline, pReader);
  }

  // hasVal is re-read on purpose: getValidMemRow may have cleared it while skipping invalid rows
  if (pIMemIter->hasVal) {  // imem + file + last block
    return doMergeBufAndFileRows(pReader, pBlockScanInfo, pIMemRow, pIMemIter, fileKey, pLastBlockReader);
  }
  if (pMemIter->hasVal) {  // mem + file + last block
    return doMergeBufAndFileRows(pReader, pBlockScanInfo, pMemRow, pMemIter, fileKey, pLastBlockReader);
  }

  // file data blocks + last block
  return mergeFileBlockAndLastBlock(pReader, pLastBlockReader, fileKey, pBlockScanInfo, pBlockData);
}

2217
// Fill pReader->pResBlock for one table by merging rows from the current file data block, the last (stt) block
// reader and the mem/imem buffers. When isCleanFileDataBlock accepts the current block, it is copied directly.
// In descending order that direct copy only happens when the last block has no remaining rows.
// Without a current file block, the table is taken from status.pTableIter.
// The result block's uid/time window and the composed-block statistics are always updated before returning.
// Returns TSDB_CODE_INVALID_PARA if the block's uid is missing from the table map, otherwise the merge error code.
static int32_t buildComposedDataBlock(STsdbReader* pReader) {
  int32_t code = TSDB_CODE_SUCCESS;

  SSDataBlock*        pResBlock = pReader->pResBlock;
  SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(&pReader->status.blockIter);
  SLastBlockReader*   pLastBlockReader = pReader->status.fileIter.pLastBlockReader;

  int64_t st = taosGetTimestampUs();

  STableBlockScanInfo* pBlockScanInfo = NULL;
  if (pBlockInfo != NULL) {
    pBlockScanInfo = taosHashGet(pReader->status.pTableMap, &pBlockInfo->uid, sizeof(pBlockInfo->uid));
    if (pBlockScanInfo == NULL) {
      code = TSDB_CODE_INVALID_PARA;
      tsdbError("failed to locate the uid:%" PRIu64 " in query table uid list, total tables:%d, %s", pBlockInfo->uid,
                taosHashGetSize(pReader->status.pTableMap), pReader->idStr);
      goto _end;
    }

    SDataBlk* pBlock = getCurrentBlock(&pReader->status.blockIter);
    TSDBKEY   keyInBuf = getCurrentKeyInBuf(pBlockScanInfo, pReader);

    // it is a clean block, load it directly
    if (isCleanFileDataBlock(pReader, pBlockInfo, pBlock, pBlockScanInfo, keyInBuf, pLastBlockReader)) {
      if (pReader->order == TSDB_ORDER_ASC ||
          (pReader->order == TSDB_ORDER_DESC && (!hasDataInLastBlock(pLastBlockReader)))) {
        copyBlockDataToSDataBlock(pReader, pBlockScanInfo);
        goto _end;
      }
    }
  } else {  // file blocks not exist
    pBlockScanInfo = pReader->status.pTableIter;
  }

  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
  SBlockData*         pBlockData = &pReader->status.fileBlockData;
  int32_t             step = ASCENDING_TRAVERSE(pReader->order) ? 1 : -1;

  while (1) {
    bool hasBlockData = false;

    // find the first qualified row in data block
    while (pBlockData->nRow > 0) {
      if (isValidFileBlockRow(pBlockData, pDumpInfo, pBlockScanInfo, pReader)) {
        hasBlockData = true;
        break;
      }

      pDumpInfo->rowIndex += step;

      SDataBlk* pCurBlock = getCurrentBlock(&pReader->status.blockIter);
      if (pDumpInfo->rowIndex >= pCurBlock->nRow || pDumpInfo->rowIndex < 0) {
        setBlockAllDumped(pDumpInfo, pCurBlock->maxKey.ts, pReader->order);
        break;
      }
    }

    bool hasBlockLData = hasDataInLastBlock(pLastBlockReader);

    // no data in last block and block, no need to proceed.
    if ((hasBlockData == false) && (hasBlockLData == false)) {
      break;
    }

    // propagate merge failures instead of silently continuing with a partially built block
    code = buildComposedDataBlockImpl(pReader, pBlockScanInfo, pBlockData, pLastBlockReader);
    if (code != TSDB_CODE_SUCCESS) {
      break;
    }

    // currently loaded file data block is consumed
    if ((pBlockData->nRow > 0) && (pDumpInfo->rowIndex >= pBlockData->nRow || pDumpInfo->rowIndex < 0)) {
      SDataBlk* pCurBlock = getCurrentBlock(&pReader->status.blockIter);
      setBlockAllDumped(pDumpInfo, pCurBlock->maxKey.ts, pReader->order);
      break;
    }

    if (pResBlock->info.rows >= pReader->capacity) {
      break;
    }
  }

_end:
  pResBlock->info.uid = (pBlockScanInfo != NULL) ? pBlockScanInfo->uid : 0;
  blockDataUpdateTsWindow(pResBlock, 0);

  setComposedBlockFlag(pReader, true);
  double el = (taosGetTimestampUs() - st) / 1000.0;

  pReader->cost.composedBlocks += 1;
  pReader->cost.buildComposedBlockTime += el;

  if (pResBlock->info.rows > 0) {
    // window bounds are signed timestamps, so they are printed with PRId64
    tsdbDebug("%p uid:%" PRIu64 ", composed data block created, brange:%" PRId64 "-%" PRId64
              " rows:%d, elapsed time:%.2f ms %s",
              pReader, pResBlock->info.uid, pResBlock->info.window.skey, pResBlock->info.window.ekey,
              pResBlock->info.rows, el, pReader->idStr);
  }

  return code;
}

// Record whether the current result block was built by merging rows (true) or taken as a whole file block (false).
void setComposedBlockFlag(STsdbReader* pReader, bool composed) {
  SReaderStatus* pStatus = &pReader->status;
  pStatus->composedDataBlock = composed;
}

dengyihao's avatar
dengyihao 已提交
2318 2319
// Build pBlockScanInfo->delSkyline once per table.
// Deletion records are collected from the del file (if any), then from the mem and imem buffers, and folded with
// tsdbBuildDeleteSkyline. The skyline cursors for iter/iiter/file/last block are then reset to the start (asc)
// or end (desc) of the skyline.
// Returns TSDB_CODE_SUCCESS immediately if the skyline already exists, TSDB_CODE_OUT_OF_MEMORY on allocation
// failure, or the error code from the del-file reader / skyline builder.
int32_t initDelSkylineIterator(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader, STbData* pMemTbData,
                               STbData* piMemTbData) {
  if (pBlockScanInfo->delSkyline != NULL) {
    return TSDB_CODE_SUCCESS;
  }

  int32_t code = 0;
  STsdb*  pTsdb = pReader->pTsdb;

  SArray* pDelData = taosArrayInit(4, sizeof(SDelData));
  if (pDelData == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  SDelFile* pDelFile = pReader->pReadSnap->fs.pDelFile;
  if (pDelFile) {
    SDelFReader* pDelFReader = NULL;
    code = tsdbDelFReaderOpen(&pDelFReader, pDelFile, pTsdb);
    if (code != TSDB_CODE_SUCCESS) {
      goto _err;
    }

    SArray* aDelIdx = taosArrayInit(4, sizeof(SDelIdx));
    if (aDelIdx == NULL) {
      // previously code stayed 0 here, so the failure was reported as success
      code = TSDB_CODE_OUT_OF_MEMORY;
      tsdbDelFReaderClose(&pDelFReader);
      goto _err;
    }

    code = tsdbReadDelIdx(pDelFReader, aDelIdx);
    if (code != TSDB_CODE_SUCCESS) {
      taosArrayDestroy(aDelIdx);
      tsdbDelFReaderClose(&pDelFReader);
      goto _err;
    }

    SDelIdx  idx = {.suid = pReader->suid, .uid = pBlockScanInfo->uid};
    SDelIdx* pIdx = taosArraySearch(aDelIdx, &idx, tCmprDelIdx, TD_EQ);

    if (pIdx != NULL) {
      code = tsdbReadDelData(pDelFReader, pIdx, pDelData);
    }

    taosArrayDestroy(aDelIdx);
    tsdbDelFReaderClose(&pDelFReader);

    if (code != TSDB_CODE_SUCCESS) {
      goto _err;
    }
  }

  // append deletion records kept in the mem buffer
  SDelData* p = NULL;
  if (pMemTbData != NULL) {
    p = pMemTbData->pHead;
    while (p) {
      if (taosArrayPush(pDelData, p) == NULL) {
        code = TSDB_CODE_OUT_OF_MEMORY;
        goto _err;
      }
      p = p->pNext;
    }
  }

  // append deletion records kept in the imem buffer
  if (piMemTbData != NULL) {
    p = piMemTbData->pHead;
    while (p) {
      if (taosArrayPush(pDelData, p) == NULL) {
        code = TSDB_CODE_OUT_OF_MEMORY;
        goto _err;
      }
      p = p->pNext;
    }
  }

  if (taosArrayGetSize(pDelData) > 0) {
    pBlockScanInfo->delSkyline = taosArrayInit(4, sizeof(TSDBKEY));
    if (pBlockScanInfo->delSkyline == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto _err;
    }
    code = tsdbBuildDeleteSkyline(pDelData, 0, (int32_t)(taosArrayGetSize(pDelData) - 1), pBlockScanInfo->delSkyline);
  }

  taosArrayDestroy(pDelData);

  pBlockScanInfo->iter.index =
      ASCENDING_TRAVERSE(pReader->order) ? 0 : taosArrayGetSize(pBlockScanInfo->delSkyline) - 1;
  pBlockScanInfo->iiter.index = pBlockScanInfo->iter.index;
  pBlockScanInfo->fileDelIndex = pBlockScanInfo->iter.index;
  pBlockScanInfo->lastBlockDelIndex = pBlockScanInfo->iter.index;
  return code;

_err:
  taosArrayDestroy(pDelData);
  return code;
}

H
Haojun Liao 已提交
2400
// Return the key of the next valid buffered row for this table, taking both mem (iter) and imem (iiter)
// into account. When both have rows, the one visited first in the scan order wins:
//   - ascending: the smaller timestamp
//   - descending: the larger timestamp
// When only one of them has a row, that row's key is returned.
// When neither does, the key carries ts == TSKEY_INITIAL_VAL.
// Note: getValidMemRow may advance either iterator past invalid/deleted rows.
TSDBKEY getCurrentKeyInBuf(STableBlockScanInfo* pScanInfo, STsdbReader* pReader) {
  bool    asc = ASCENDING_TRAVERSE(pReader->order);
  TSDBKEY key = {.ts = TSKEY_INITIAL_VAL};
  TSDBKEY ikey = {.ts = TSKEY_INITIAL_VAL};

  TSDBROW* pRow = getValidMemRow(&pScanInfo->iter, pScanInfo->delSkyline, pReader);
  bool     hasKey = (pRow != NULL);
  if (hasKey) {
    key = TSDBROW_KEY(pRow);
  }

  TSDBROW* pIRow = getValidMemRow(&pScanInfo->iiter, pScanInfo->delSkyline, pReader);
  bool     hasIKey = (pIRow != NULL);
  if (hasIKey) {
    ikey = TSDBROW_KEY(pIRow);
  }

  if (hasKey && hasIKey) {
    if (asc) {
      return (key.ts <= ikey.ts) ? key : ikey;
    }
    return (key.ts >= ikey.ts) ? key : ikey;
  }

  // previously an imem-only key was dropped because TSKEY_INITIAL_VAL never compares greater than it
  return hasKey ? key : ikey;
}

2418
// Advance the fileset iterator until doLoadFileBlock reports at least one data block or last file for the
// current fileset, or until the filesets are exhausted (then both counters in *pBlockNum stay 0).
// Returns TSDB_CODE_OUT_OF_MEMORY if the index list cannot be allocated, or the error from loading the
// block index / file blocks.
static int32_t moveToNextFile(STsdbReader* pReader, SBlockNumber* pBlockNum) {
  SReaderStatus* pStatus = &pReader->status;
  pBlockNum->numOfBlocks = 0;
  pBlockNum->numOfLastFiles = 0;

  size_t  numOfTables = taosHashGetSize(pReader->status.pTableMap);
  SArray* pIndexList = taosArrayInit(numOfTables, sizeof(SBlockIdx));
  if (pIndexList == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  int32_t code = TSDB_CODE_SUCCESS;
  while (1) {
    bool hasNext = filesetIteratorNext(&pStatus->fileIter, pReader);
    if (!hasNext) {  // no data files on disk
      break;
    }

    taosArrayClear(pIndexList);
    code = doLoadBlockIndex(pReader, pReader->pFileReader, pIndexList);
    if (code != TSDB_CODE_SUCCESS) {
      break;
    }

    if (taosArrayGetSize(pIndexList) > 0 || pReader->pFileReader->pSet->nSttF > 0) {
      code = doLoadFileBlock(pReader, pIndexList, pBlockNum);
      if (code != TSDB_CODE_SUCCESS) {
        break;
      }

      if (pBlockNum->numOfBlocks + pBlockNum->numOfLastFiles > 0) {
        break;
      }
    }

    // no blocks in current file, try next files
  }

  taosArrayDestroy(pIndexList);
  return code;
}

2458
// qsort-style comparator for uint64_t uids: negative, zero or positive for less, equal or greater.
static int32_t uidComparFunc(const void* p1, const void* p2) {
  const uint64_t lhs = *(const uint64_t*)p1;
  const uint64_t rhs = *(const uint64_t*)p2;
  return (int32_t)(lhs > rhs) - (int32_t)(lhs < rhs);
}
2467

2468
// Copy the uid of every table in pStatus->pTableMap into pOrderCheckInfo->tableUidList and sort it ascending.
// The caller must have sized tableUidList for at least taosHashGetSize(pTableMap) entries.
static void extractOrderedTableUidList(SUidOrderCheckInfo* pOrderCheckInfo, SReaderStatus* pStatus) {
  int32_t numOfTables = taosHashGetSize(pStatus->pTableMap);
  int32_t pos = 0;

  for (void* pIter = taosHashIterate(pStatus->pTableMap, NULL); pIter != NULL;
       pIter = taosHashIterate(pStatus->pTableMap, pIter)) {
    STableBlockScanInfo* pScanInfo = pIter;
    pOrderCheckInfo->tableUidList[pos++] = pScanInfo->uid;
  }

  taosSort(pOrderCheckInfo->tableUidList, numOfTables, sizeof(uint64_t), uidComparFunc);
}

2482
// Prepare the uid-ordered table traversal used when scanning last blocks:
//   - On first use, allocate and fill the sorted uid list and point pStatus->pTableIter at its first table.
//   - On later calls with pTableIter == NULL, rewind to the first uid. If that uid is no longer in the table map,
//     the list is reallocated and rebuilt.
// Returns TSDB_CODE_OUT_OF_MEMORY on allocation failure, TSDB_CODE_SUCCESS otherwise (including an empty map).
static int32_t initOrderCheckInfo(SUidOrderCheckInfo* pOrderCheckInfo, SReaderStatus* pStatus) {
  int32_t numOfTables = taosHashGetSize(pStatus->pTableMap);
  if (numOfTables == 0) {
    return TSDB_CODE_SUCCESS;
  }

  if (pOrderCheckInfo->tableUidList != NULL) {
    // a traversal is still in progress: keep it
    if (pStatus->pTableIter != NULL) {
      return TSDB_CODE_SUCCESS;
    }

    // it is the last block of a new file: rewind to the first table
    pOrderCheckInfo->currentIndex = 0;
    uint64_t firstUid = pOrderCheckInfo->tableUidList[pOrderCheckInfo->currentIndex];
    pStatus->pTableIter = taosHashGet(pStatus->pTableMap, &firstUid, sizeof(firstUid));
    if (pStatus->pTableIter != NULL) {
      return TSDB_CODE_SUCCESS;
    }

    // the tableMap has already been updated: rebuild the ordered uid list
    void* pNewList = taosMemoryRealloc(pOrderCheckInfo->tableUidList, numOfTables * sizeof(uint64_t));
    if (pNewList == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }
    pOrderCheckInfo->tableUidList = pNewList;
  } else {
    pOrderCheckInfo->currentIndex = 0;
    pOrderCheckInfo->tableUidList = taosMemoryMalloc(numOfTables * sizeof(uint64_t));
    if (pOrderCheckInfo->tableUidList == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }
  }

  extractOrderedTableUidList(pOrderCheckInfo, pStatus);

  uint64_t uid = pOrderCheckInfo->tableUidList[0];
  pStatus->pTableIter = taosHashGet(pStatus->pTableMap, &uid, sizeof(uid));
  return TSDB_CODE_SUCCESS;
}

2523
// Step the ordered traversal to the next uid and point pStatus->pTableIter at its scan info.
// Returns false (and clears pTableIter) once every table in the map has been visited.
static bool moveToNextTable(SUidOrderCheckInfo* pOrderedCheckInfo, SReaderStatus* pStatus) {
  ++pOrderedCheckInfo->currentIndex;

  if (pOrderedCheckInfo->currentIndex < taosHashGetSize(pStatus->pTableMap)) {
    uint64_t nextUid = pOrderedCheckInfo->tableUidList[pOrderedCheckInfo->currentIndex];
    pStatus->pTableIter = taosHashGet(pStatus->pTableMap, &nextUid, sizeof(nextUid));
    ASSERT(pStatus->pTableIter != NULL);
    return true;
  }

  pStatus->pTableIter = NULL;
  return false;
}

2536
// Walk the tables in uid order and build a result block from the last (stt) block data of the first table that
// produces rows. Tables for which initLastBlockReader reports no data are skipped.
// Returns TSDB_CODE_SUCCESS when a non-empty block is produced or all tables are exhausted, else the error code.
static int32_t doLoadLastBlockSequentially(STsdbReader* pReader) {
  SReaderStatus*      pStatus = &pReader->status;
  SLastBlockReader*   pLastBlockReader = pStatus->fileIter.pLastBlockReader;
  SUidOrderCheckInfo* pOrderedCheckInfo = &pStatus->uidCheckInfo;

  int32_t code = initOrderCheckInfo(pOrderedCheckInfo, pStatus);
  if (code != TSDB_CODE_SUCCESS || (taosHashGetSize(pStatus->pTableMap) == 0)) {
    return code;
  }

  do {
    STableBlockScanInfo* pScanInfo = pStatus->pTableIter;
    if (initLastBlockReader(pLastBlockReader, pScanInfo, pReader)) {
      code = doBuildDataBlock(pReader);
      if (code != TSDB_CODE_SUCCESS) {
        return code;
      }
      if (pReader->pResBlock->info.rows > 0) {
        return TSDB_CODE_SUCCESS;
      }
    }
    // current table has no last-block data or is exhausted: try the next one
  } while (moveToNextTable(pOrderedCheckInfo, pStatus));

  return TSDB_CODE_SUCCESS;
}

2575
// Produce the next result block for the table of the current file block. Without a file block, the table is the
// one at status.pTableIter. Depending on the file block, the buffer key and the last block, the result is:
//   - merged rows from buildComposedDataBlock, or
//   - buffer-only rows bounded by the file block (buildDataBlockFromBuf), or
//   - the whole file block reported as-is without loading it.
// Returns TSDB_CODE_INVALID_PARA if no scan info is found, otherwise the code of the chosen path.
static int32_t doBuildDataBlock(STsdbReader* pReader) {
  SReaderStatus*      pStatus = &pReader->status;
  SDataBlockIter*     pBlockIter = &pStatus->blockIter;
  SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(pBlockIter);
  SLastBlockReader*   pLastBlockReader = pStatus->fileIter.pLastBlockReader;
  bool                hasFileBlock = (pBlockInfo != NULL);

  STableBlockScanInfo* pScanInfo = NULL;
  if (hasFileBlock) {
    pScanInfo = taosHashGet(pStatus->pTableMap, &pBlockInfo->uid, sizeof(pBlockInfo->uid));
  } else {
    pScanInfo = pStatus->pTableIter;
  }

  if (pScanInfo == NULL) {
    tsdbError("failed to get table scan-info, %s", pReader->idStr);
    return TSDB_CODE_INVALID_PARA;
  }

  SDataBlk* pBlock = hasFileBlock ? getCurrentBlock(pBlockIter) : NULL;

  // both calls run before any branch is chosen; getCurrentKeyInBuf may advance the mem iterators
  initLastBlockReader(pLastBlockReader, pScanInfo, pReader);
  TSDBKEY keyInBuf = getCurrentKeyInBuf(pScanInfo, pReader);

  if (!hasFileBlock) {  // build data block from last data file
    ASSERT(pBlockIter->numOfBlocks == 0);
    return buildComposedDataBlock(pReader);
  }

  if (fileBlockShouldLoad(pReader, pBlockInfo, pBlock, pScanInfo, keyInBuf, pLastBlockReader)) {
    int32_t code = doLoadFileBlockData(pReader, pBlockIter, &pStatus->fileBlockData, pScanInfo->uid);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }
    // build composed data block
    return buildComposedDataBlock(pReader);
  }

  if (bufferDataInFileBlockGap(pReader->order, keyInBuf, pBlock)) {
    // rows in buffer should be less than the file block in asc, greater than file block in desc
    int64_t endKey = ASCENDING_TRAVERSE(pReader->order) ? pBlock->minKey.ts : pBlock->maxKey.ts;
    return buildDataBlockFromBuf(pReader, pScanInfo, endKey);
  }

  if (hasDataInLastBlock(pLastBlockReader) && !ASCENDING_TRAVERSE(pReader->order)) {
    // only return the rows in last block
    int64_t tsLast = getCurrentKeyInLastBlock(pLastBlockReader);
    ASSERT(tsLast >= pBlock->maxKey.ts);
    tBlockDataReset(&pStatus->fileBlockData);

    tsdbDebug("load data in last block firstly, due to desc scan data, %s", pReader->idStr);
    return buildComposedDataBlock(pReader);
  }

  // whole block is required, return it directly
  SDataBlockInfo* pInfo = &pReader->pResBlock->info;
  pInfo->rows = pBlock->nRow;
  pInfo->uid = pScanInfo->uid;
  pInfo->window = (STimeWindow){.skey = pBlock->minKey.ts, .ekey = pBlock->maxKey.ts};
  setComposedBlockFlag(pReader, false);
  setBlockAllDumped(&pStatus->fBlockDumpInfo, pBlock->maxKey.ts, pReader->order);
  return TSDB_CODE_SUCCESS;
}

H
Haojun Liao 已提交
2642
// Build a result block from in-memory buffers only, one table at a time in hash-map iteration order.
// The traversal resumes from pStatus->pTableIter, or restarts if it is NULL. The first table that yields
// rows stops the traversal. Returns TSDB_CODE_SUCCESS, or the error of buildDataBlockFromBuf.
static int32_t buildBlockFromBufferSequentially(STsdbReader* pReader) {
  SReaderStatus* pStatus = &pReader->status;

  if (pStatus->pTableIter == NULL) {
    pStatus->pTableIter = taosHashIterate(pStatus->pTableMap, NULL);
  }

  for (; pStatus->pTableIter != NULL; pStatus->pTableIter = taosHashIterate(pStatus->pTableMap, pStatus->pTableIter)) {
    STableBlockScanInfo* pBlockScanInfo = pStatus->pTableIter;
    initMemDataIterator(pBlockScanInfo, pReader);

    // no file block bounds the buffer scan, so use the extreme key for the scan direction
    int64_t endKey = (ASCENDING_TRAVERSE(pReader->order)) ? INT64_MAX : INT64_MIN;
    int32_t code = buildDataBlockFromBuf(pReader, pBlockScanInfo, endKey);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    if (pReader->pResBlock->info.rows > 0) {
      return TSDB_CODE_SUCCESS;
    }
    // current table is exhausted, let's try the next table
  }

  return TSDB_CODE_SUCCESS;
}

2674
// set the correct start position in case of the first/last file block, according to the query time window
2675
static void initBlockDumpInfo(STsdbReader* pReader, SDataBlockIter* pBlockIter) {
H
Hongze Cheng 已提交
2676
  SDataBlk* pBlock = getCurrentBlock(pBlockIter);
2677

2678 2679 2680
  SReaderStatus* pStatus = &pReader->status;

  SFileBlockDumpInfo* pDumpInfo = &pStatus->fBlockDumpInfo;
2681 2682 2683

  pDumpInfo->totalRows = pBlock->nRow;
  pDumpInfo->allDumped = false;
2684
  pDumpInfo->rowIndex = ASCENDING_TRAVERSE(pReader->order) ? 0 : pBlock->nRow - 1;
2685 2686
}

2687
// Move to the next fileset that has data blocks or last files and prepare the block iterator for it.
// If no such fileset remains, status.loadFromFile is set to false.
// With data blocks, the block iterator is initialized over them. Without data blocks (last files only), the
// loaded block data and the iterator are reset. The dump cursor is then positioned.
// Returns the error from moveToNextFile / initBlockIterator, or TSDB_CODE_SUCCESS.
static int32_t initForFirstBlockInFile(STsdbReader* pReader, SDataBlockIter* pBlockIter) {
  SBlockNumber num = {0};

  int32_t code = moveToNextFile(pReader, &num);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  // all data files are consumed, try data in buffer
  if (num.numOfBlocks + num.numOfLastFiles == 0) {
    pReader->status.loadFromFile = false;
    return code;
  }

  // initialize the block iterator for a new fileset
  if (num.numOfBlocks > 0) {
    code = initBlockIterator(pReader, pBlockIter, num.numOfBlocks);
    if (code != TSDB_CODE_SUCCESS) {
      // do not touch a block iterator that failed to initialize
      return code;
    }
  } else {  // no block data, only last block exists
    tBlockDataReset(&pReader->status.fileBlockData);
    resetDataBlockIterator(pBlockIter, pReader->order);
  }

  // set the correct start position according to the scan order
  initBlockDumpInfo(pReader, pBlockIter);
  return code;
}

2714
// True when the current file block is not fully dumped and the cursor has already moved off its starting row
// (first row for ascending scans, last row for descending scans).
static bool fileBlockPartiallyRead(SFileBlockDumpInfo* pDumpInfo, bool asc) {
  if (pDumpInfo->allDumped) {
    return false;
  }

  if (asc) {
    return pDumpInfo->rowIndex > 0;
  }
  return pDumpInfo->rowIndex < (pDumpInfo->totalRows - 1);
}

2719
// Produce the next non-empty result block from on-disk data, alternating between two phases:
//   - data-block phase: consume the file data blocks of the current fileset;
//   - last-block phase: scan the last (stt) blocks table by table.
// The last-block phase runs when there are no data blocks, or when a fileset's data blocks are exhausted and it
// has stt files. Returns TSDB_CODE_SUCCESS once a block has rows or all files are done (loadFromFile == false),
// otherwise the first error encountered.
static int32_t buildBlockFromFiles(STsdbReader* pReader) {
  int32_t         code = TSDB_CODE_SUCCESS;
  bool            asc = ASCENDING_TRAVERSE(pReader->order);
  SDataBlockIter* pBlockIter = &pReader->status.blockIter;

  bool scanLastBlocks = (pBlockIter->numOfBlocks == 0);

  while (1) {
    if (scanLastBlocks) {
      scanLastBlocks = false;

      code = doLoadLastBlockSequentially(pReader);
      if (code != TSDB_CODE_SUCCESS) {
        return code;
      }
      if (pReader->pResBlock->info.rows > 0) {
        return TSDB_CODE_SUCCESS;
      }

      // all data blocks are checked in this last block file, now let's try the next file
      if (pReader->status.pTableIter == NULL) {
        code = initForFirstBlockInFile(pReader, pBlockIter);

        // error happens or all the data files are completely checked
        if ((code != TSDB_CODE_SUCCESS) || (pReader->status.loadFromFile == false)) {
          return code;
        }

        // this file does not have data files, let's start check the last block file if exists
        if (pBlockIter->numOfBlocks == 0) {
          scanLastBlocks = true;
          continue;
        }
      }

      code = doBuildDataBlock(pReader);
    } else {
      SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;

      if (fileBlockPartiallyRead(pDumpInfo, asc)) {  // file data block is partially loaded
        code = buildComposedDataBlock(pReader);
      } else {
        // current block are exhausted, try the next file block
        if (pDumpInfo->allDumped) {
          bool hasNext = blockIteratorNext(&pReader->status.blockIter, pReader->idStr);
          if (hasNext) {  // check for the next block in the block accessed order list
            initBlockDumpInfo(pReader, pBlockIter);
          } else if (pReader->status.pCurrentFileset->nSttF > 0) {
            // data blocks in current file are exhausted, scan the last blocks of this fileset now
            tBlockDataReset(&pReader->status.fileBlockData);
            resetDataBlockIterator(pBlockIter, pReader->order);
            scanLastBlocks = true;
            continue;
          } else {
            code = initForFirstBlockInFile(pReader, pBlockIter);

            // error happens or all the data files are completely checked
            if ((code != TSDB_CODE_SUCCESS) || (pReader->status.loadFromFile == false)) {
              return code;
            }

            // this file does not have blocks, let's start check the last block file
            if (pBlockIter->numOfBlocks == 0) {
              scanLastBlocks = true;
              continue;
            }
          }
        }

        code = doBuildDataBlock(pReader);
      }
    }

    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    if (pReader->pResBlock->info.rows > 0) {
      return TSDB_CODE_SUCCESS;
    }
  }
}
H
refact  
Hongze Cheng 已提交
2807

2808 2809
// Pick the tsdb instance to query for a vnode.
// Non-RSMA vnodes always use VND_TSDB. For RSMA vnodes, the retention levels are walked from L0 upward. The walk
// stops at the first level whose keep window (relaxed by tsQueryRsmaTolerance) still covers winSKey, or steps back
// one level when an unconfigured level (keep <= 0) is hit. The chosen level is stored in *pLevel.
static STsdb* getTsdbByRetentions(SVnode* pVnode, TSKEY winSKey, SRetention* retentions, const char* idStr,
                                  int8_t* pLevel) {
  if (!VND_IS_RSMA(pVnode)) {
    return VND_TSDB(pVnode);
  }

  int8_t  precision = pVnode->config.tsdbCfg.precision;
  int64_t now = taosGetTimestamp(precision);

  // scale the tolerance by the vnode precision: x1 for milli, x1000 for micro, x1000000 otherwise
  int64_t offset = 0;
  if (precision == TSDB_TIME_PRECISION_MILLI) {
    offset = tsQueryRsmaTolerance * 1L;
  } else if (precision == TSDB_TIME_PRECISION_MICRO) {
    offset = tsQueryRsmaTolerance * 1000L;
  } else {
    offset = tsQueryRsmaTolerance * 1000000L;
  }

  int8_t level = 0;
  for (; level < TSDB_RETENTION_MAX; ++level) {
    SRetention* pRetention = &retentions[level];
    if (pRetention->keep <= 0) {
      if (level > 0) {
        --level;
      }
      break;
    }
    if ((now - pRetention->keep) <= (winSKey + offset)) {
      break;
    }
  }

  const char* str = (idStr != NULL) ? idStr : "";

  switch (level) {
    case TSDB_RETENTION_L0:
      *pLevel = TSDB_RETENTION_L0;
      tsdbDebug("vgId:%d, rsma level %d is selected to query %s", TD_VID(pVnode), TSDB_RETENTION_L0, str);
      return VND_RSMA0(pVnode);
    case TSDB_RETENTION_L1:
      *pLevel = TSDB_RETENTION_L1;
      tsdbDebug("vgId:%d, rsma level %d is selected to query %s", TD_VID(pVnode), TSDB_RETENTION_L1, str);
      return VND_RSMA1(pVnode);
    default:
      *pLevel = TSDB_RETENTION_L2;
      tsdbDebug("vgId:%d, rsma level %d is selected to query %s", TD_VID(pVnode), TSDB_RETENTION_L2, str);
      return VND_RSMA2(pVnode);
  }
}

H
Haojun Liao 已提交
2852
// Compute the data-version window for a query:
//   - minVer is pCond->startVersion, or 0 when it is -1 (unspecified);
//   - maxVer is pCond->endVersion clamped to the vnode's applied version, or the applied version when -1.
// The level argument is not used.
SVersionRange getQueryVerRange(SVnode* pVnode, SQueryTableDataCond* pCond, int8_t level) {
  SVersionRange range = {.minVer = 0, .maxVer = pVnode->state.applied};

  if (pCond->startVersion != -1) {
    range.minVer = pCond->startVersion;
  }

  // user specified an end version that does not exceed what the vnode has applied
  if (pCond->endVersion != -1 && pCond->endVersion < pVnode->state.applied) {
    range.maxVer = pCond->endVersion;
  }

  return range;
}

2866
// Decide whether the row identified by pKey is covered by a deletion in pDelList.
// pDelList is a sequence of TSDBKEY points. A key falls in the range between two adjacent points, and is treated
// as dropped when the range's starting point has version >= the row's version.
// *index is a cursor into pDelList that is advanced in the scan direction (order) and reused across calls.
// pVerRange->maxVer bounds the deletion versions considered in the ascending interior search.
// Returns false for a NULL or empty list.
bool hasBeenDropped(const SArray* pDelList, int32_t* index, TSDBKEY* pKey, int32_t order, SVersionRange* pVerRange) {
  ASSERT(pKey != NULL);
  if (pDelList == NULL) {
    return false;
  }

  size_t num = taosArrayGetSize(pDelList);
  if (num == 0) {
    // an empty list would otherwise dereference NULL from taosArrayGetLast/taosArrayGet below
    return false;
  }

  bool    asc = ASCENDING_TRAVERSE(order);
  int32_t step = asc ? 1 : -1;

  if (asc) {
    if (*index >= num - 1) {
      TSDBKEY* last = taosArrayGetLast(pDelList);
      ASSERT(pKey->ts >= last->ts);

      if (pKey->ts > last->ts) {
        return false;
      } else if (pKey->ts == last->ts) {
        TSDBKEY* prev = taosArrayGet(pDelList, num - 2);
        return (prev->version >= pKey->version);
      }
    } else {
      TSDBKEY* pCurrent = taosArrayGet(pDelList, *index);
      TSDBKEY* pNext = taosArrayGet(pDelList, (*index) + 1);

      if (pKey->ts < pCurrent->ts) {
        return false;
      }

      if (pCurrent->ts <= pKey->ts && pNext->ts >= pKey->ts && pCurrent->version >= pKey->version &&
          pVerRange->maxVer >= pCurrent->version) {
        return true;
      }

      while (pNext->ts <= pKey->ts && (*index) < num - 1) {
        (*index) += 1;

        if ((*index) < num - 1) {
          pCurrent = taosArrayGet(pDelList, *index);
          pNext = taosArrayGet(pDelList, (*index) + 1);

          // it is not a consecutive deletion range, ignore it
          if (pCurrent->version == 0 && pNext->version > 0) {
            continue;
          }

          if (pCurrent->ts <= pKey->ts && pNext->ts >= pKey->ts && pCurrent->version >= pKey->version &&
              pVerRange->maxVer >= pCurrent->version) {
            return true;
          }
        }
      }

      return false;
    }
  } else {
    if (*index <= 0) {
      TSDBKEY* pFirst = taosArrayGet(pDelList, 0);

      if (pKey->ts < pFirst->ts) {
        return false;
      } else if (pKey->ts == pFirst->ts) {
        return pFirst->version >= pKey->version;
      } else {
        ASSERT(0);
      }
    } else {
      TSDBKEY* pCurrent = taosArrayGet(pDelList, *index);
      TSDBKEY* pPrev = taosArrayGet(pDelList, (*index) - 1);

      if (pKey->ts > pCurrent->ts) {
        return false;
      }

      if (pPrev->ts <= pKey->ts && pCurrent->ts >= pKey->ts && pPrev->version >= pKey->version) {
        return true;
      }

      while (pPrev->ts >= pKey->ts && (*index) > 1) {
        (*index) += step;

        if ((*index) >= 1) {
          pCurrent = taosArrayGet(pDelList, *index);
          pPrev = taosArrayGet(pDelList, (*index) - 1);

          // it is not a consecutive deletion range, ignore it
          if (pCurrent->version > 0 && pPrev->version == 0) {
            continue;
          }

          if (pPrev->ts <= pKey->ts && pCurrent->ts >= pKey->ts && pPrev->version >= pKey->version) {
            return true;
          }
        }
      }

      return false;
    }
  }

  return false;
}

2968
// Return the first row at or after the iterator's current position that is:
//   - inside the reader's time window,
//   - inside its version range, and
//   - not deleted according to pDelList.
// The iterator is advanced past rows that fail the checks. pIter->hasVal is cleared and NULL returned when
// iteration ends or a row falls outside the time window.
TSDBROW* getValidMemRow(SIterInfo* pIter, const SArray* pDelList, STsdbReader* pReader) {
  if (!pIter->hasVal) {
    return NULL;
  }

  TSDBROW* pRow = tsdbTbDataIterGet(pIter->iter);
  TSDBKEY  key = {.ts = pRow->pTSRow->ts, .version = pRow->version};

  for (;;) {
    if (outOfTimeWindow(key.ts, &pReader->window)) {
      pIter->hasVal = false;
      return NULL;
    }

    // it is a valid data version; hasBeenDropped is consulted only for rows inside the version range
    bool inVerRange = (key.version <= pReader->verRange.maxVer && key.version >= pReader->verRange.minVer);
    if (inVerRange && (!hasBeenDropped(pDelList, &pIter->index, &key, pReader->order, &pReader->verRange))) {
      return pRow;
    }

    pIter->hasVal = tsdbTbDataIterNext(pIter->iter);
    if (!pIter->hasVal) {
      return NULL;
    }

    pRow = tsdbTbDataIterGet(pIter->iter);
    key = TSDBROW_KEY(pRow);
  }
}
H
Hongze Cheng 已提交
3006

3007 3008
// Advance pIter and feed every following valid buffered row whose timestamp equals ts into pMerger.
// Stops at the first row with a different timestamp or when no valid row remains.
// Returns terrno if the schema for a row's version cannot be obtained, TSDB_CODE_SUCCESS otherwise.
int32_t doMergeRowsInBuf(SIterInfo* pIter, uint64_t uid, int64_t ts, SArray* pDelList, SRowMerger* pMerger,
                         STsdbReader* pReader) {
  for (;;) {
    pIter->hasVal = tsdbTbDataIterNext(pIter->iter);
    if (!pIter->hasVal) {
      return TSDB_CODE_SUCCESS;
    }

    // data exists but not valid
    TSDBROW* pRow = getValidMemRow(pIter, pDelList, pReader);
    if (pRow == NULL) {
      return TSDB_CODE_SUCCESS;
    }

    // ts is not identical, quit
    TSDBKEY rowKey = TSDBROW_KEY(pRow);
    if (rowKey.ts != ts) {
      return TSDB_CODE_SUCCESS;
    }

    STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, uid);
    if (pSchema == NULL) {
      return terrno;
    }

    tRowMergerAdd(pMerger, pRow, pSchema);
  }
}

3038
static int32_t doMergeRowsInFileBlockImpl(SBlockData* pBlockData, int32_t rowIndex, int64_t key, SRowMerger* pMerger,
3039
                                          SVersionRange* pVerRange, int32_t step) {
3040 3041
  while (pBlockData->aTSKEY[rowIndex] == key && rowIndex < pBlockData->nRow && rowIndex >= 0) {
    if (pBlockData->aVersion[rowIndex] > pVerRange->maxVer || pBlockData->aVersion[rowIndex] < pVerRange->minVer) {
3042
      rowIndex += step;
3043 3044 3045 3046 3047 3048 3049 3050 3051 3052 3053 3054 3055 3056 3057 3058
      continue;
    }

    TSDBROW fRow = tsdbRowFromBlockData(pBlockData, rowIndex);
    tRowMerge(pMerger, &fRow);
    rowIndex += step;
  }

  return rowIndex;
}

// Result reported by checkForNeighborFileBlock() to its caller.
typedef enum {
  CHECK_FILEBLOCK_CONT = 1,  // the loaded neighbor block was consumed to its edge; keep checking further neighbors
  CHECK_FILEBLOCK_QUIT = 2,  // stop: no (overlapping) neighbor block, or rows with the key remain in the block
} CHECK_FILEBLOCK_STATE;

H
Hongze Cheng 已提交
3059
static int32_t checkForNeighborFileBlock(STsdbReader* pReader, STableBlockScanInfo* pScanInfo, SDataBlk* pBlock,
3060 3061
                                         SFileDataBlockInfo* pFBlock, SRowMerger* pMerger, int64_t key,
                                         CHECK_FILEBLOCK_STATE* state) {
3062
  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
3063
  SBlockData*         pBlockData = &pReader->status.fileBlockData;
3064

3065
  *state = CHECK_FILEBLOCK_QUIT;
3066
  int32_t step = ASCENDING_TRAVERSE(pReader->order) ? 1 : -1;
3067

3068 3069 3070 3071
  int32_t  nextIndex = -1;
  SDataBlk block = {0};
  bool hasNeighbor = getNeighborBlockOfSameTable(pFBlock, pScanInfo, &nextIndex, pReader->order, &block);
  if (!hasNeighbor) {  // do nothing
3072 3073 3074
    return 0;
  }

3075
  bool overlap = overlapWithNeighborBlock(pBlock, &block, pReader->order);
3076
  if (overlap) {  // load next block
3077
    SReaderStatus*  pStatus = &pReader->status;
3078 3079
    SDataBlockIter* pBlockIter = &pStatus->blockIter;

3080
    // 1. find the next neighbor block in the scan block list
3081
    SFileDataBlockInfo fb = {.uid = pFBlock->uid, .tbBlockIdx = nextIndex};
3082
    int32_t            neighborIndex = findFileBlockInfoIndex(pBlockIter, &fb);
3083

3084
    // 2. remove it from the scan block list
3085
    setFileBlockActiveInBlockIter(pBlockIter, neighborIndex, step);
3086

3087
    // 3. load the neighbor block, and set it to be the currently accessed file data block
3088
    int32_t code = doLoadFileBlockData(pReader, pBlockIter, &pStatus->fileBlockData, pFBlock->uid);
3089 3090 3091 3092
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

3093
    // 4. check the data values
3094 3095 3096 3097
    initBlockDumpInfo(pReader, pBlockIter);

    pDumpInfo->rowIndex =
        doMergeRowsInFileBlockImpl(pBlockData, pDumpInfo->rowIndex, key, pMerger, &pReader->verRange, step);
H
Haojun Liao 已提交
3098
    if (pDumpInfo->rowIndex >= pDumpInfo->totalRows) {
3099 3100 3101 3102 3103 3104 3105
      *state = CHECK_FILEBLOCK_CONT;
    }
  }

  return TSDB_CODE_SUCCESS;
}

3106 3107
/*
 * Merge into pMerger every row that shares the timestamp of the current row (pDumpInfo->rowIndex) of the loaded file
 * block. When the current block runs out, overlapping neighbor blocks of the same table are loaded and checked too.
 *
 * Returns the first error raised while loading a neighbor block, TSDB_CODE_SUCCESS otherwise.
 */
int32_t doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pScanInfo, STsdbReader* pReader,
                                SRowMerger* pMerger) {
  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;

  bool    asc = ASCENDING_TRAVERSE(pReader->order);
  int64_t key = pBlockData->aTSKEY[pDumpInfo->rowIndex];
  int32_t step = asc ? 1 : -1;

  // the current row has already been handed to the merger by the caller; continue from the following one
  pDumpInfo->rowIndex += step;
  if ((pDumpInfo->rowIndex <= pBlockData->nRow - 1 && asc) || (pDumpInfo->rowIndex >= 0 && !asc)) {
    pDumpInfo->rowIndex =
        doMergeRowsInFileBlockImpl(pBlockData, pDumpInfo->rowIndex, key, pMerger, &pReader->verRange, step);
  }

  // all rows are consumed, let's try next file block
  if ((pDumpInfo->rowIndex >= pBlockData->nRow && asc) || (pDumpInfo->rowIndex < 0 && !asc)) {
    CHECK_FILEBLOCK_STATE st = CHECK_FILEBLOCK_CONT;
    while (st == CHECK_FILEBLOCK_CONT) {
      SFileDataBlockInfo* pFileBlockInfo = getCurrentBlockInfo(&pReader->status.blockIter);
      SDataBlk*           pCurrentBlock = getCurrentBlock(&pReader->status.blockIter);

      // checkForNeighborFileBlock() always assigns st before returning
      int32_t code = checkForNeighborFileBlock(pReader, pScanInfo, pCurrentBlock, pFileBlockInfo, pMerger, key, &st);
      if (code != TSDB_CODE_SUCCESS) {
        return code;
      }
    }
  }

  return TSDB_CODE_SUCCESS;
}

H
Hongze Cheng 已提交
3137
int32_t doMergeRowsInLastBlock(SLastBlockReader* pLastBlockReader, STableBlockScanInfo* pScanInfo, int64_t ts,
3138
                               SRowMerger* pMerger, SVersionRange* pVerRange) {
H
Haojun Liao 已提交
3139
  pScanInfo->lastKey = ts;
3140
  while (nextRowFromLastBlocks(pLastBlockReader, pScanInfo, pVerRange)) {
3141 3142
    int64_t next1 = getCurrentKeyInLastBlock(pLastBlockReader);
    if (next1 == ts) {
3143
      TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
3144 3145 3146 3147 3148 3149 3150 3151 3152
      tRowMerge(pMerger, &fRow1);
    } else {
      break;
    }
  }

  return TSDB_CODE_SUCCESS;
}

3153 3154
/*
 * Produce the output row for pRow, the current row of one in-memory buffer (mem or imem) iterated by pIter.
 *
 * If the next valid row in pIter has a different timestamp, pRow's own STSRow is returned as-is and *freeTSRow is set
 * to false (the caller does not own it). Otherwise all valid rows sharing the timestamp are merged into a newly built
 * row, and *freeTSRow is set to true (the caller must free *pTSRow).
 *
 * Returns terrno when a row schema cannot be obtained, or the error of the merge helpers. The row merger is released
 * on every path after it has been initialized.
 */
int32_t doMergeMemTableMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter, SArray* pDelList, STSRow** pTSRow,
                                 STsdbReader* pReader, bool* freeTSRow) {
  TSDBROW current = *pRow;

  // if the timestamp of the next valid row differs (or there is none), return the current row directly
  pIter->hasVal = tsdbTbDataIterNext(pIter->iter);
  TSDBROW* pNextRow = pIter->hasVal ? getValidMemRow(pIter, pDelList, pReader) : NULL;
  if (pNextRow == NULL || current.pTSRow->ts != pNextRow->pTSRow->ts) {
    *pTSRow = current.pTSRow;
    *freeTSRow = false;
    return TSDB_CODE_SUCCESS;
  }

  // get the correct schema for data in memory
  terrno = 0;
  STSchema* pTSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(&current), pReader, uid);
  if (pTSchema == NULL) {
    return terrno;
  }

  if (pReader->pSchema == NULL) {
    pReader->pSchema = pTSchema;
  }

  SRowMerger merge = {0};
  int32_t    code = tRowMergerInit2(&merge, pReader->pSchema, &current, pTSchema);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  STSchema* pTSchema1 = doGetSchemaForTSRow(TSDBROW_SVERSION(pNextRow), pReader, uid);
  if (pTSchema1 == NULL) {
    code = terrno;
    goto _end;
  }

  tRowMergerAdd(&merge, pNextRow, pTSchema1);

  code = doMergeRowsInBuf(pIter, uid, current.pTSRow->ts, pDelList, &merge, pReader);
  if (code != TSDB_CODE_SUCCESS) {
    goto _end;
  }

  code = tRowMergerGetRow(&merge, pTSRow);
  if (code == TSDB_CODE_SUCCESS) {
    *freeTSRow = true;
  }

_end:
  // release the merger on success and on every failure after initialization (it previously leaked on errors)
  tRowMergerClear(&merge);
  return code;
}

3221
int32_t doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader,
H
Hongze Cheng 已提交
3222
                           STSRow** pTSRow) {
H
Haojun Liao 已提交
3223 3224
  SRowMerger merge = {0};

3225 3226 3227
  TSDBKEY k = TSDBROW_KEY(pRow);
  TSDBKEY ik = TSDBROW_KEY(piRow);

3228
  if (ASCENDING_TRAVERSE(pReader->order)) {  // ascending order imem --> mem
H
Haojun Liao 已提交
3229
    STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid);
3230

H
Haojun Liao 已提交
3231 3232 3233 3234 3235 3236 3237 3238 3239 3240
    int32_t code = tRowMergerInit(&merge, piRow, pSchema);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    code = doMergeRowsInBuf(&pBlockScanInfo->iiter, pBlockScanInfo->uid, ik.ts, pBlockScanInfo->delSkyline, &merge,
                            pReader);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }
3241

3242
    tRowMerge(&merge, pRow);
H
Haojun Liao 已提交
3243 3244 3245 3246 3247 3248
    code =
        doMergeRowsInBuf(&pBlockScanInfo->iter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge, pReader);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

3249
  } else {
H
Haojun Liao 已提交
3250
    STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid);
3251

H
Haojun Liao 已提交
3252
    int32_t code = tRowMergerInit(&merge, pRow, pSchema);
3253
    if (code != TSDB_CODE_SUCCESS || merge.pTSchema == NULL) {
H
Haojun Liao 已提交
3254 3255 3256 3257 3258 3259 3260 3261
      return code;
    }

    code =
        doMergeRowsInBuf(&pBlockScanInfo->iter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge, pReader);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }
3262 3263

    tRowMerge(&merge, piRow);
H
Haojun Liao 已提交
3264 3265 3266 3267 3268
    code = doMergeRowsInBuf(&pBlockScanInfo->iiter, pBlockScanInfo->uid, ik.ts, pBlockScanInfo->delSkyline, &merge,
                            pReader);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }
3269
  }
3270

3271 3272
  int32_t code = tRowMergerGetRow(&merge, pTSRow);
  return code;
3273 3274
}

3275 3276
/*
 * Fetch the next output row of a table from its in-memory buffers (mem via `iter`, imem via `iiter`).
 *
 * Rows at or beyond endKey in the scan direction are ignored. When both buffers hold a candidate, the one that comes
 * first in scan order wins; equal timestamps are merged across mem and imem. On return *pTSRow is the row (left
 * untouched when nothing qualifies), and *freeTSRow tells whether the caller owns it and must free it.
 */
int32_t tsdbGetNextRowInMem(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader, STSRow** pTSRow, int64_t endKey,
                            bool* freeTSRow) {
  TSDBROW* pRow = getValidMemRow(&pBlockScanInfo->iter, pBlockScanInfo->delSkyline, pReader);
  TSDBROW* piRow = getValidMemRow(&pBlockScanInfo->iiter, pBlockScanInfo->delSkyline, pReader);
  SArray*  pDelList = pBlockScanInfo->delSkyline;
  uint64_t uid = pBlockScanInfo->uid;
  bool     asc = ASCENDING_TRAVERSE(pReader->order);

  // drop candidates that already reached endKey in the scan direction
  if (pBlockScanInfo->iter.hasVal) {
    TSDBKEY memKey = TSDBROW_KEY(pRow);
    bool    pastEnd = asc ? (memKey.ts >= endKey) : (memKey.ts <= endKey);
    if (pastEnd) {
      pRow = NULL;
    }
  }

  if (pBlockScanInfo->iiter.hasVal) {
    TSDBKEY imemKey = TSDBROW_KEY(piRow);
    bool    pastEnd = asc ? (imemKey.ts >= endKey) : (imemKey.ts <= endKey);
    if (pastEnd) {
      piRow = NULL;
    }
  }

  bool useMem = pBlockScanInfo->iter.hasVal && pRow != NULL;
  bool useIMem = pBlockScanInfo->iiter.hasVal && piRow != NULL;

  if (useMem && useIMem) {
    TSDBKEY k = TSDBROW_KEY(pRow);
    TSDBKEY ik = TSDBROW_KEY(piRow);

    if (ik.ts == k.ts) {
      *freeTSRow = true;
      return doMergeMemIMemRows(pRow, piRow, pBlockScanInfo, pReader, pTSRow);
    }

    // emit whichever buffer's row comes first in scan order
    bool imemFirst = asc ? (ik.ts < k.ts) : (ik.ts > k.ts);
    if (imemFirst) {
      return doMergeMemTableMultiRows(piRow, uid, &pBlockScanInfo->iiter, pDelList, pTSRow, pReader, freeTSRow);
    }
    return doMergeMemTableMultiRows(pRow, uid, &pBlockScanInfo->iter, pDelList, pTSRow, pReader, freeTSRow);
  }

  if (useMem) {
    return doMergeMemTableMultiRows(pRow, uid, &pBlockScanInfo->iter, pDelList, pTSRow, pReader, freeTSRow);
  }

  if (useIMem) {
    return doMergeMemTableMultiRows(piRow, uid, &pBlockScanInfo->iiter, pDelList, pTSRow, pReader, freeTSRow);
  }

  return TSDB_CODE_SUCCESS;
}

H
Haojun Liao 已提交
3332
/*
 * Append pTSRow as one new row of pBlock.
 *
 * The output columns and the row's schema columns are walked together by column id: matching columns are copied,
 * output columns missing from the schema get NULL, and schema columns not requested are skipped. The primary
 * timestamp column, when it is the first output column, is filled from pTSRow->ts.
 *
 * Returns terrno, without touching pBlock, when the schema for the row's version cannot be obtained.
 */
int32_t doAppendRowFromTSRow(SSDataBlock* pBlock, STsdbReader* pReader, STSRow* pTSRow, uint64_t uid) {
  int32_t numOfRows = pBlock->info.rows;
  int32_t numOfCols = (int32_t)taosArrayGetSize(pBlock->pDataBlock);

  SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo;
  STSchema*           pSchema = doGetSchemaForTSRow(pTSRow->sver, pReader, uid);
  if (pSchema == NULL) {
    // without a schema the row cannot be decoded; nothing has been appended yet, so pBlock stays consistent
    return terrno;
  }

  SColVal colVal = {0};
  int32_t i = 0, j = 0;

  SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
  if (pColInfoData->info.colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
    colDataAppend(pColInfoData, numOfRows, (const char*)&pTSRow->ts, false);
    i += 1;
  }

  while (i < numOfCols && j < pSchema->numOfCols) {
    pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
    col_id_t colId = pColInfoData->info.colId;

    if (colId == pSchema->columns[j].colId) {
      tTSRowGetVal(pTSRow, pSchema, j, &colVal);
      doCopyColVal(pColInfoData, numOfRows, i, &colVal, pSupInfo);
      i += 1;
      j += 1;
    } else if (colId < pSchema->columns[j].colId) {
      // the output column does not exist in this schema version
      colDataAppendNULL(pColInfoData, numOfRows);
      i += 1;
    } else {
      // schema column not requested by the output block
      j += 1;
    }
  }

  // set null value since current column does not exist in the "pSchema"
  while (i < numOfCols) {
    pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
    colDataAppendNULL(pColInfoData, numOfRows);
    i += 1;
  }

  pBlock->info.rows += 1;
  return TSDB_CODE_SUCCESS;
}

H
Hongze Cheng 已提交
3376 3377
/*
 * Append row `rowIndex` of the loaded file block pBlockData as one new row of pResBlock.
 *
 * The primary timestamp column, when it is the first output column, comes from aTSKEY. Output and input columns are
 * then matched by column id; the walk assumes both are ordered by ascending id. Output columns with no input column
 * are filled with NULL. Always returns TSDB_CODE_SUCCESS.
 */
int32_t doAppendRowFromFileBlock(SSDataBlock* pResBlock, STsdbReader* pReader, SBlockData* pBlockData,
                                 int32_t rowIndex) {
  SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo;
  int32_t             dstRow = pResBlock->info.rows;
  int32_t             outCol = 0;
  int32_t             inCol = 0;

  SColumnInfoData* pTsCol = taosArrayGet(pResBlock->pDataBlock, outCol);
  if (pTsCol->info.colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
    colDataAppendInt64(pTsCol, dstRow, &pBlockData->aTSKEY[rowIndex]);
    outCol++;
  }

  SColVal cv = {0};
  int32_t nInputCols = pBlockData->aIdx->size;
  int32_t nOutputCols = pResBlock->pDataBlock->size;

  while (outCol < nOutputCols && inCol < nInputCols) {
    SColumnInfoData* pOut = TARRAY_GET_ELEM(pResBlock->pDataBlock, outCol);
    SColData*        pIn = tBlockDataGetColDataByIdx(pBlockData, inCol);

    if (pIn->cid < pOut->info.colId) {
      // input column not requested by the output block
      inCol++;
      continue;
    }

    if (pIn->cid == pOut->info.colId) {
      tColDataGetValue(pIn, rowIndex, &cv);
      doCopyColVal(pOut, dstRow, outCol, &cv, pSupInfo);
      inCol++;
    } else {
      // the specified column does not exist in file block, fill with null data
      colDataAppendNULL(pOut, dstRow);
    }

    outCol++;
  }

  for (; outCol < nOutputCols; ++outCol) {
    SColumnInfoData* pOut = taosArrayGet(pResBlock->pDataBlock, outCol);
    colDataAppendNULL(pOut, dstRow);
  }

  pResBlock->info.rows += 1;
  return TSDB_CODE_SUCCESS;
}

3424 3425
/*
 * Fill pReader->pResBlock with rows of one table taken from the in-memory buffers, stopping at endKey, when both
 * buffers are exhausted, or when the block holds `capacity` rows.
 *
 * Returns the first error raised while fetching or appending a row (previously these errors were silently ignored),
 * TSDB_CODE_SUCCESS otherwise.
 */
int32_t buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, int64_t endKey, int32_t capacity,
                                  STsdbReader* pReader) {
  SSDataBlock* pBlock = pReader->pResBlock;

  do {
    STSRow* pTSRow = NULL;
    bool    freeTSRow = false;

    int32_t code = tsdbGetNextRowInMem(pBlockScanInfo, pReader, &pTSRow, endKey, &freeTSRow);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    if (pTSRow == NULL) {
      break;
    }

    code = doAppendRowFromTSRow(pBlock, pReader, pTSRow, pBlockScanInfo->uid);
    // a merged row is owned by this function and must be released whether or not the append succeeded
    if (freeTSRow) {
      taosMemoryFree(pTSRow);
    }

    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    // no data in buffer, return immediately
    if (!(pBlockScanInfo->iter.hasVal || pBlockScanInfo->iiter.hasVal)) {
      break;
    }

    if (pBlock->info.rows >= capacity) {
      break;
    }
  } while (1);

  ASSERT(pBlock->info.rows <= capacity);
  return TSDB_CODE_SUCCESS;
}
H
Hongze Cheng 已提交
3454

3455 3456
// TODO refactor: with createDataBlockScanInfo
/*
 * Replace the set of tables scanned by pReader with the `num` entries of pTableList (an array of STableKeyInfo).
 *
 * Every existing per-table scan info is cleared and the table map is emptied. The map is then refilled with one fresh
 * STableBlockScanInfo per uid, with lastKey = 0. Returns TSDB_CODE_SUCCESS.
 */
int32_t tsdbSetTableList(STsdbReader* pReader, const void* pTableList, int32_t num) {
  ASSERT(pReader != NULL);

  STableBlockScanInfo* p = NULL;
  while ((p = taosHashIterate(pReader->status.pTableMap, p)) != NULL) {
    clearBlockScanInfo(p);
  }

  taosHashClear(pReader->status.pTableMap);

  const STableKeyInfo* pList = (const STableKeyInfo*)pTableList;
  for (int32_t i = 0; i < num; ++i) {
    STableBlockScanInfo info = {.lastKey = 0, .uid = pList[i].uid};
    // NOTE(review): the taosHashPut result is ignored (e.g. duplicate uids in the list) -- confirm this is intended
    taosHashPut(pReader->status.pTableMap, &info.uid, sizeof(uint64_t), &info, sizeof(info));
  }

  // use the tsdb module's own success code like the rest of this file (was TDB_CODE_SUCCESS)
  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
3475 3476 3477 3478 3479 3480
// Return the index handle of pMeta (via metaGetIdx), or NULL when no meta is given.
void* tsdbGetIdx(SMeta* pMeta) {
  return (pMeta != NULL) ? metaGetIdx(pMeta) : NULL;
}
dengyihao's avatar
dengyihao 已提交
3481

dengyihao's avatar
dengyihao 已提交
3482 3483 3484 3485 3486 3487
// Return the inverted index handle of pMeta (via metaGetIvtIdx), or NULL when no meta is given.
void* tsdbGetIvtIdx(SMeta* pMeta) {
  return (pMeta != NULL) ? metaGetIvtIdx(pMeta) : NULL;
}
L
Liu Jicong 已提交
3488

H
Hongze Cheng 已提交
3489
// Upper bound of the data version range visible to pReader.
uint64_t getReaderMaxVersion(STsdbReader* pReader) {
  return pReader->verRange.maxVer;
}
3490

3491 3492 3493 3494 3495 3496 3497 3498 3499 3500 3501 3502 3503 3504 3505
/*
 * Prepare pReader to scan: initialize the file-set iterator from the read snapshot and reset the block iterator.
 * If there are no data files, switch the reader to memory-only mode; otherwise position it on the first file block.
 */
static int32_t doOpenReaderImpl(STsdbReader* pReader) {
  SReaderStatus*  pStatus = &pReader->status;
  SDataBlockIter* pBlockIter = &pStatus->blockIter;

  initFilesetIterator(&pStatus->fileIter, pReader->pReadSnap->fs.aDFileSet, pReader);
  resetDataBlockIterator(pBlockIter, pReader->order);

  if (pStatus->fileIter.numOfFiles != 0) {
    return initForFirstBlockInFile(pReader, pBlockIter);
  }

  // no data in files, let's try buffer in memory
  pStatus->loadFromFile = false;
  return TSDB_CODE_SUCCESS;
}

H
refact  
Hongze Cheng 已提交
3506
// ====================================== EXPOSED APIs ======================================
3507 3508
/*
 * Create a reader for the tables in pTableList (numOfTables STableKeyInfo entries) according to pCond.
 *
 * For TIMEWINDOW_RANGE_EXTERNAL queries, two single-row inner readers are also created: one for the row preceding
 * the window and one for the row following it. NOTE: this path modifies pCond->twindows (and temporarily
 * pCond->order).
 *
 * On success *ppReader holds the reader. On any failure after the reader has been created, it is released and
 * *ppReader is set to NULL (previously only the table-map failure did this, and the other paths leaked the reader).
 */
int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, void* pTableList, int32_t numOfTables,
                       STsdbReader** ppReader, const char* idstr) {
  STsdbReader* pReader = NULL;
  STimeWindow  window = pCond->twindows;

  // the main scan of an external-range query excludes both window boundaries
  if (pCond->type == TIMEWINDOW_RANGE_EXTERNAL) {
    pCond->twindows.skey += 1;
    pCond->twindows.ekey -= 1;
  }

  int32_t code = tsdbReaderCreate(pVnode, pCond, ppReader, 4096, idstr);
  if (code != TSDB_CODE_SUCCESS) {
    goto _err;
  }

  // from here on the reader exists and is released by the error path
  pReader = *ppReader;

  // check for query time window
  if (isEmptyQueryTimeWindow(&pReader->window) && pCond->type == TIMEWINDOW_RANGE_CONTAINED) {
    tsdbDebug("%p query window not overlaps with the data set, no result returned, %s", pReader, pReader->idStr);
    return TSDB_CODE_SUCCESS;
  }

  if (pCond->type == TIMEWINDOW_RANGE_EXTERNAL) {
    // update the SQueryTableDataCond to create inner reader
    int32_t order = pCond->order;
    if (order == TSDB_ORDER_ASC) {
      pCond->twindows.ekey = window.skey;
      pCond->twindows.skey = INT64_MIN;
      pCond->order = TSDB_ORDER_DESC;
    } else {
      pCond->twindows.skey = window.ekey;
      pCond->twindows.ekey = INT64_MAX;
      pCond->order = TSDB_ORDER_ASC;
    }

    // here we only need one more row, so the capacity is set to be ONE.
    code = tsdbReaderCreate(pVnode, pCond, &pReader->innerReader[0], 1, idstr);
    if (code != TSDB_CODE_SUCCESS) {
      pReader->innerReader[0] = NULL;
      goto _err;
    }

    if (order == TSDB_ORDER_ASC) {
      pCond->twindows.skey = window.ekey;
      pCond->twindows.ekey = INT64_MAX;
    } else {
      pCond->twindows.skey = INT64_MIN;
      pCond->twindows.ekey = window.ekey;
    }
    pCond->order = order;

    code = tsdbReaderCreate(pVnode, pCond, &pReader->innerReader[1], 1, idstr);
    if (code != TSDB_CODE_SUCCESS) {
      // keep the inner readers all-or-nothing so the reader can be closed safely: nothing is shared with the
      // main reader yet, so the first inner reader is released on its own here
      tsdbReaderClose(pReader->innerReader[0]);
      pReader->innerReader[0] = NULL;
      pReader->innerReader[1] = NULL;
      goto _err;
    }
  }

  // NOTE: the endVersion in pCond is the data version not schema version, so pCond->endVersion is not correct here.
  if (pCond->suid != 0) {
    pReader->pSchema = metaGetTbTSchema(pReader->pTsdb->pVnode->pMeta, pReader->suid, -1, 1);
    if (pReader->pSchema == NULL) {
      tsdbError("failed to get table schema, suid:%" PRIu64 ", ver:-1, %s", pReader->suid, pReader->idStr);
    }
  } else if (numOfTables > 0) {
    STableKeyInfo* pKey = pTableList;
    pReader->pSchema = metaGetTbTSchema(pReader->pTsdb->pVnode->pMeta, pKey->uid, -1, 1);
    if (pReader->pSchema == NULL) {
      tsdbError("failed to get table schema, uid:%" PRIu64 ", ver:-1, %s", pKey->uid, pReader->idStr);
    }
  }

  STsdbReader* p = pReader->innerReader[0] != NULL ? pReader->innerReader[0] : pReader;

  pReader->status.pTableMap = createDataBlockScanInfo(p, pTableList, numOfTables);
  if (pReader->status.pTableMap == NULL) {
    code = TSDB_CODE_TDB_OUT_OF_MEMORY;
    goto _err;
  }

  if (numOfTables > 0) {
    code = tsdbTakeReadSnap(pReader->pTsdb, &pReader->pReadSnap, pReader->idStr);
    if (code != TSDB_CODE_SUCCESS) {
      // NOTE(review): the state of a snapshot whose acquisition failed is not visible here; do not hand it to
      // tsdbUntakeReadSnap() during close (at worst this leaks a partially built snapshot)
      pReader->pReadSnap = NULL;
      goto _err;
    }

    if (pReader->type == TIMEWINDOW_RANGE_CONTAINED) {
      code = doOpenReaderImpl(pReader);
      if (code != TSDB_CODE_SUCCESS) {
        goto _err;
      }
    } else {
      STsdbReader* pPrevReader = pReader->innerReader[0];
      STsdbReader* pNextReader = pReader->innerReader[1];

      // we need only one row
      pPrevReader->capacity = 1;
      pPrevReader->status.pTableMap = pReader->status.pTableMap;
      pPrevReader->pSchema = pReader->pSchema;
      pPrevReader->pMemSchema = pReader->pMemSchema;
      pPrevReader->pReadSnap = pReader->pReadSnap;

      pNextReader->capacity = 1;
      pNextReader->status.pTableMap = pReader->status.pTableMap;
      pNextReader->pSchema = pReader->pSchema;
      pNextReader->pMemSchema = pReader->pMemSchema;
      pNextReader->pReadSnap = pReader->pReadSnap;

      code = doOpenReaderImpl(pPrevReader);
      if (code != TSDB_CODE_SUCCESS) {
        goto _err;
      }
    }
  }

  tsdbDebug("%p total numOfTable:%d in this query %s", pReader, numOfTables, pReader->idStr);
  return code;

_err:
  tsdbError("failed to create data reader, code:%s %s", tstrerror(code), idstr);
  if (pReader != NULL) {
    tsdbReaderClose(pReader);
    *ppReader = NULL;
  }
  return code;
}

/*
 * Release pReader and everything it owns, including its inner readers, and log its I/O cost summary.
 * Passing NULL is a no-op.
 */
void tsdbReaderClose(STsdbReader* pReader) {
  if (pReader == NULL) {
    return;
  }

  // Inner readers (external-range queries) borrow the table map, read snapshot and schemas of this reader. Detach
  // those before closing them so the shared resources are released only once, below. Each inner reader is checked on
  // its own; previously innerReader[1] was dereferenced whenever innerReader[0] was set.
  if (pReader->innerReader[0] != NULL) {
    STsdbReader* p = pReader->innerReader[0];
    p->status.pTableMap = NULL;
    p->pReadSnap = NULL;
    p->pSchema = NULL;
    p->pMemSchema = NULL;
    tsdbReaderClose(p);
    pReader->innerReader[0] = NULL;
  }

  if (pReader->innerReader[1] != NULL) {
    STsdbReader* p = pReader->innerReader[1];
    p->status.pTableMap = NULL;
    p->pReadSnap = NULL;
    p->pSchema = NULL;
    p->pMemSchema = NULL;
    tsdbReaderClose(p);
    pReader->innerReader[1] = NULL;
  }

  SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo;

  taosMemoryFreeClear(pSupInfo->plist);
  taosMemoryFree(pSupInfo->colIds);

  taosArrayDestroy(pSupInfo->pColAgg);
  if (pSupInfo->buildBuf != NULL) {
    for (int32_t i = 0; i < blockDataGetNumOfCols(pReader->pResBlock); ++i) {
      taosMemoryFreeClear(pSupInfo->buildBuf[i]);
    }
  }
  taosMemoryFree(pSupInfo->buildBuf);

  tBlockDataDestroy(&pReader->status.fileBlockData, true);

  cleanupDataBlockIterator(&pReader->status.blockIter);

  size_t numOfTables = taosHashGetSize(pReader->status.pTableMap);
  destroyBlockScanInfo(pReader->status.pTableMap);
  blockDataDestroy(pReader->pResBlock);

  if (pReader->pFileReader != NULL) {
    tsdbDataFReaderClose(&pReader->pFileReader);
  }

  tsdbUntakeReadSnap(pReader->pTsdb, pReader->pReadSnap, pReader->idStr);

  taosMemoryFree(pReader->status.uidCheckInfo.tableUidList);
  SIOCostSummary* pCost = &pReader->cost;

  SFilesetIter* pFilesetIter = &pReader->status.fileIter;
  if (pFilesetIter->pLastBlockReader != NULL) {
    SLastBlockReader* pLReader = pFilesetIter->pLastBlockReader;
    tMergeTreeClose(&pLReader->mergeTree);

    getLastBlockLoadInfo(pLReader->pInfo, &pCost->lastBlockLoad, &pCost->lastBlockLoadTime);

    pLReader->pInfo = destroyLastBlockLoadInfo(pLReader->pInfo);
    taosMemoryFree(pLReader);
  }

  tsdbDebug(
      "%p :io-cost summary: head-file:%" PRIu64 ", head-file time:%.2f ms, SMA:%" PRId64
      " SMA-time:%.2f ms, fileBlocks:%" PRId64
      ", fileBlocks-load-time:%.2f ms, "
      "build in-memory-block-time:%.2f ms, lastBlocks:%" PRId64 ", lastBlocks-time:%.2f ms, composed-blocks:%" PRId64
      ", composed-blocks-time:%.2fms, STableBlockScanInfo size:%.2f Kb %s",
      pReader, pCost->headFileLoad, pCost->headFileLoadTime, pCost->smaDataLoad, pCost->smaLoadTime, pCost->numOfBlocks,
      pCost->blockLoadTime, pCost->buildmemBlock, pCost->lastBlockLoad, pCost->lastBlockLoadTime, pCost->composedBlocks,
      pCost->buildComposedBlockTime, numOfTables * sizeof(STableBlockScanInfo) / 1000.0, pReader->idStr);

  taosMemoryFree(pReader->idStr);
  taosMemoryFree(pReader->pSchema);
  // pMemSchema may alias pSchema; free it only when it is a separate allocation
  if (pReader->pMemSchema != pReader->pSchema) {
    taosMemoryFree(pReader->pMemSchema);
  }
  taosMemoryFreeClear(pReader);
}

3714
/*
 * Build the next result block of pReader into pReader->pResBlock.
 * Data files are tried first (when loadFromFile is set); if they yield no rows, the in-memory buffers are used.
 * Returns true when the block holds at least one row, false otherwise or when reading the files fails.
 */
static bool doTsdbNextDataBlock(STsdbReader* pReader) {
  // cleanup the data that belongs to the previous data block
  SSDataBlock* pBlock = pReader->pResBlock;
  blockDataCleanup(pBlock);

  if (pReader->status.loadFromFile) {
    if (buildBlockFromFiles(pReader) != TSDB_CODE_SUCCESS) {
      return false;
    }

    if (pBlock->info.rows > 0) {
      return true;
    }
  }

  // no (more) data in files, let's try the buffer
  buildBlockFromBufferSequentially(pReader);
  return pBlock->info.rows > 0;
}

3741 3742 3743 3744 3745
/*
 * Advance pReader to its next non-empty result block. Returns true when a block is available.
 *
 * For external-range queries, the stages run in order: the row before the window (innerReader[0]), the main scan,
 * and the row after the window (innerReader[1]). pReader->step tracks the current stage.
 *
 * On failure this returns false and stores the error code in terrno. Previously the int32_t error code was returned
 * directly, and any non-zero code converted to `true`, reporting a block that did not exist.
 */
bool tsdbNextDataBlock(STsdbReader* pReader) {
  if (isEmptyQueryTimeWindow(&pReader->window)) {
    return false;
  }

  if (pReader->innerReader[0] != NULL && pReader->step == 0) {
    bool ret = doTsdbNextDataBlock(pReader->innerReader[0]);
    pReader->step = EXTERNAL_ROWS_PREV;
    if (ret) {
      return ret;
    }
  }

  if (pReader->step == EXTERNAL_ROWS_PREV) {
    // prepare for the main scan
    int32_t code = doOpenReaderImpl(pReader);
    resetDataBlockScanInfo(pReader->status.pTableMap, pReader->innerReader[0]->window.ekey);

    if (code != TSDB_CODE_SUCCESS) {
      terrno = code;
      return false;
    }

    pReader->step = EXTERNAL_ROWS_MAIN;
  }

  bool ret = doTsdbNextDataBlock(pReader);
  if (ret) {
    return ret;
  }

  if (pReader->innerReader[1] != NULL && pReader->step == EXTERNAL_ROWS_MAIN) {
    // prepare for the next row scan
    int32_t code = doOpenReaderImpl(pReader->innerReader[1]);
    resetDataBlockScanInfo(pReader->innerReader[1]->status.pTableMap, pReader->window.ekey);
    if (code != TSDB_CODE_SUCCESS) {
      terrno = code;
      return false;
    }

    bool ret1 = doTsdbNextDataBlock(pReader->innerReader[1]);
    pReader->step = EXTERNAL_ROWS_NEXT;
    if (ret1) {
      return ret1;
    }
  }

  return false;
}

3789
// Currently only reports whether the table `uid` is part of pReader's table map; no block is loaded.
bool tsdbTableNextDataBlock(STsdbReader* pReader, uint64_t uid) {
  STableBlockScanInfo* pInfo = taosHashGet(pReader->status.pTableMap, &uid, sizeof(uid));
  return pInfo != NULL;
}

H
Haojun Liao 已提交
3798 3799 3800 3801 3802
// Copy the row count, table uid and time window of pReader's current result block to the output arguments.
static void setBlockInfo(const STsdbReader* pReader, int32_t* rows, uint64_t* uid, STimeWindow* pWindow) {
  ASSERT(pReader != NULL);

  const SSDataBlock* pRes = pReader->pResBlock;
  *pWindow = pRes->info.window;
  *uid = pRes->info.uid;
  *rows = pRes->info.rows;
}

H
Haojun Liao 已提交
3805
/*
 * Report rows/uid/time window of the block most recently produced by the reader.
 *
 * For an external-range reader the block may come from one of the inner readers:
 * the PREV step reads from innerReader[0], the MAIN step from the reader itself,
 * and any other step from innerReader[1]. Non-external readers always report their own block.
 */
void tsdbRetrieveDataBlockInfo(const STsdbReader* pReader, int32_t* rows, uint64_t* uid, STimeWindow* pWindow) {
  const STsdbReader* pSource = pReader;

  if (pReader->type == TIMEWINDOW_RANGE_EXTERNAL) {
    switch (pReader->step) {
      case EXTERNAL_ROWS_MAIN:
        break;
      case EXTERNAL_ROWS_PREV:
        pSource = pReader->innerReader[0];
        break;
      default:
        pSource = pReader->innerReader[1];
        break;
    }
  }

  setBlockInfo(pSource, rows, uid, pWindow);
}

3819
/*
 * Retrieve the pre-computed block SMA (per-column aggregates) of the current file data block.
 *
 * pBlockStatis: out. Set to pReader->suppInfo.plist when the SMA was loaded; NULL on every other path,
 *               including the error return.
 * allHave:      out. true only when the SMA was loaded and every matched column passed IS_BSMA_ON.
 *
 * No SMA is produced for external-range readers, for composed blocks, or for blocks without SMA.
 * Returns TSDB_CODE_SUCCESS, or the error code returned by tsdbReadBlockSma.
 */
int32_t tsdbRetrieveDatablockSMA(STsdbReader* pReader, SColumnDataAgg*** pBlockStatis, bool* allHave) {
  int32_t code = 0;
  *allHave = false;
  // define the out-parameter up front so callers never observe an unassigned pointer
  *pBlockStatis = NULL;

  if (pReader->type == TIMEWINDOW_RANGE_EXTERNAL) {
    return TSDB_CODE_SUCCESS;
  }

  // there is no statistics data for composed block
  if (pReader->status.composedDataBlock) {
    return TSDB_CODE_SUCCESS;
  }

  SFileDataBlockInfo* pFBlock = getCurrentBlockInfo(&pReader->status.blockIter);
  SDataBlk*           pBlock = getCurrentBlock(&pReader->status.blockIter);
  int64_t             stime = taosGetTimestampUs();
  SBlockLoadSuppInfo* pSup = &pReader->suppInfo;

  if (!tDataBlkHasSma(pBlock)) {
    return TSDB_CODE_SUCCESS;
  }

  code = tsdbReadBlockSma(pReader->pFileReader, pBlock, pSup->pColAgg);
  if (code != TSDB_CODE_SUCCESS) {
    // a failed SMA load is an error condition, so report it at error level rather than debug
    tsdbError("vgId:%d, failed to load block SMA for uid %" PRIu64 ", code:%s, %s", 0, pFBlock->uid, tstrerror(code),
              pReader->idStr);
    return code;
  }

  *allHave = true;

  // the primary timestamp column's aggregate is synthesized from the result block's time window
  SColumnDataAgg* pTsAgg = &pSup->tsColAgg;
  pTsAgg->numOfNull = 0;
  pTsAgg->colId = PRIMARYKEY_TIMESTAMP_COL_ID;
  pTsAgg->min = pReader->pResBlock->info.window.skey;
  pTsAgg->max = pReader->pResBlock->info.window.ekey;
  pSup->plist[0] = pTsAgg;

  // match loaded aggregates against requested column ids; the two-pointer walk below
  // presumably relies on both sequences being sorted by column id — TODO confirm
  size_t numOfCols = blockDataGetNumOfCols(pReader->pResBlock);
  size_t numOfAggs = taosArrayGetSize(pSup->pColAgg);

  int32_t i = 0, j = 0;
  while (j < numOfCols && i < numOfAggs) {
    SColumnDataAgg* pAgg = taosArrayGet(pSup->pColAgg, i);
    if (pAgg->colId == pSup->colIds[j]) {
      // NOTE(review): the schema column is indexed by i (position in pColAgg), not by j;
      // confirm that pColAgg is position-aligned with pSchema->columns
      if (IS_BSMA_ON(&(pReader->pSchema->columns[i]))) {
        pSup->plist[j] = pAgg;
      } else {
        *allHave = false;
      }
      i += 1;
      j += 1;
    } else if (pAgg->colId < pSup->colIds[j]) {
      i += 1;
    } else {
      j += 1;
    }
  }

  double elapsed = (taosGetTimestampUs() - stime) / 1000.0;
  pReader->cost.smaLoadTime += elapsed;
  pReader->cost.smaDataLoad += 1;

  *pBlockStatis = pSup->plist;

  tsdbDebug("vgId:%d, succeed to load block SMA for uid %" PRIu64 ", elapsed time:%.2f ms, %s", 0, pFBlock->uid,
            elapsed, pReader->idStr);

  return TSDB_CODE_SUCCESS;
}

3897
/*
 * Materialize the reader's current block into pReader->pResBlock and return its column array.
 *
 * A composed block is already in pResBlock and is returned as-is. Otherwise the current file
 * block is loaded and copied into pResBlock. Returns NULL and sets terrno on failure.
 */
static SArray* doRetrieveDataBlock(STsdbReader* pReader) {
  SReaderStatus* pSt = &pReader->status;
  if (pSt->composedDataBlock) {
    return pReader->pResBlock->pDataBlock;
  }

  SFileDataBlockInfo*  pCurBlk = getCurrentBlockInfo(&pSt->blockIter);
  STableBlockScanInfo* pScanInfo = taosHashGet(pSt->pTableMap, &pCurBlk->uid, sizeof(pCurBlk->uid));
  if (pScanInfo == NULL) {
    terrno = TSDB_CODE_INVALID_PARA;
    tsdbError("failed to locate the uid:%" PRIu64 " in query table uid list, total tables:%d, %s", pCurBlk->uid,
              taosHashGetSize(pReader->status.pTableMap), pReader->idStr);
    return NULL;
  }

  int32_t ret = doLoadFileBlockData(pReader, &pSt->blockIter, &pSt->fileBlockData, pScanInfo->uid);
  if (ret == TSDB_CODE_SUCCESS) {
    copyBlockDataToSDataBlock(pReader, pScanInfo);
    return pReader->pResBlock->pDataBlock;
  }

  // load failed: release the file block data buffer and report the error through terrno
  tBlockDataDestroy(&pSt->fileBlockData, 1);
  terrno = ret;
  return NULL;
}

3924 3925 3926 3927 3928 3929 3930 3931 3932 3933 3934 3935
/*
 * Return the column array of the block most recently produced by the reader.
 *
 * For an external-range reader in the PREV step the block comes from innerReader[0],
 * in the NEXT step from innerReader[1]; in every other case from the reader itself.
 * pIdList is not used by this implementation.
 */
SArray* tsdbRetrieveDataBlock(STsdbReader* pReader, SArray* pIdList) {
  STsdbReader* pSource = pReader;

  if (pReader->type == TIMEWINDOW_RANGE_EXTERNAL) {
    switch (pReader->step) {
      case EXTERNAL_ROWS_PREV:
        pSource = pReader->innerReader[0];
        break;
      case EXTERNAL_ROWS_NEXT:
        pSource = pReader->innerReader[1];
        break;
      default:
        break;
    }
  }

  return doRetrieveDataBlock(pSource);
}

H
Haojun Liao 已提交
3936
/*
 * Rewind the reader so that it can scan again with the order and time window from `pCond`.
 *
 * Does nothing (returns TSDB_CODE_SUCCESS) when the current query window is empty or no read
 * snapshot is attached. Otherwise closes the current file reader, restarts the file-set and
 * block iterators, and resets per-table scan state. Returns the error code of
 * initForFirstBlockInFile if positioning on the first file block fails.
 */
int32_t tsdbReaderReset(STsdbReader* pReader, SQueryTableDataCond* pCond) {
  if (isEmptyQueryTimeWindow(&pReader->window) || pReader->pReadSnap == NULL) {
    return TSDB_CODE_SUCCESS;
  }

  SReaderStatus*      pStatus = &pReader->status;
  SBlockLoadSuppInfo* pSup = &pReader->suppInfo;

  pReader->order = pCond->order;
  pReader->type = TIMEWINDOW_RANGE_CONTAINED;
  pStatus->loadFromFile = true;
  pStatus->pTableIter = NULL;
  pReader->window = updateQueryTimeWindow(pReader->pTsdb, &pCond->twindows);

  // clear the synthesized timestamp aggregate and the first aggregate output slot
  memset(&pSup->tsColAgg, 0, sizeof(SColumnDataAgg));
  memset(pSup->plist, 0, POINTER_BYTES);
  pSup->tsColAgg.colId = PRIMARYKEY_TIMESTAMP_COL_ID;

  tsdbDataFReaderClose(&pReader->pFileReader);

  int32_t numOfTables = taosHashGetSize(pStatus->pTableMap);

  initFilesetIterator(&pStatus->fileIter, pReader->pReadSnap->fs.aDFileSet, pReader);
  resetDataBlockIterator(&pStatus->blockIter, pReader->order);

  // reset per-table scan state with the key just outside the window in scan direction
  int64_t edgeKey = ASCENDING_TRAVERSE(pReader->order) ? pReader->window.skey - 1 : pReader->window.ekey + 1;
  resetDataBlockScanInfo(pStatus->pTableMap, edgeKey);

  if (pStatus->fileIter.numOfFiles == 0) {
    // no data in files, let's try buffer in memory
    pStatus->loadFromFile = false;
  } else {
    int32_t code = initForFirstBlockInFile(pReader, &pStatus->blockIter);
    if (code != TSDB_CODE_SUCCESS) {
      tsdbError("%p reset reader failed, numOfTables:%d, query range:%" PRId64 " - %" PRId64 " in query %s", pReader,
                numOfTables, pReader->window.skey, pReader->window.ekey, pReader->idStr);
      return code;
    }
  }

  tsdbDebug("%p reset reader, suid:%" PRIu64 ", numOfTables:%d, skey:%" PRId64 ", query range:%" PRId64 " - %" PRId64
            " in query %s",
            pReader, pReader->suid, numOfTables, pCond->twindows.skey, pReader->window.skey, pReader->window.ekey,
            pReader->idStr);

  return TSDB_CODE_SUCCESS;
}
H
Hongze Cheng 已提交
3984

3985 3986 3987
/*
 * Map a block's row count to a row-count histogram bucket.
 *
 * Buckets are `bucketRange` rows wide and begin at `startRow`. Row counts below `startRow`
 * go to bucket 0 instead of producing a negative index, and a non-positive `bucketRange`
 * yields 0 instead of dividing by zero. The result is NOT clamped from above; the caller
 * must bound it by the histogram length.
 */
static int32_t getBucketIndex(int32_t startRow, int32_t bucketRange, int32_t numOfRows) {
  if (bucketRange <= 0 || numOfRows < startRow) {
    return 0;
  }
  return (numOfRows - startRow) / bucketRange;
}
H
Hongze Cheng 已提交
3988

3989 3990 3991 3992
/*
 * Collect block distribution statistics over all file blocks visible to the reader.
 *
 * Fills totals, min/max rows per block, the small-block count and the row-count histogram
 * in `pTableBlockInfo`. Iteration continues file by file until initForFirstBlockInFile fails
 * or the reader stops loading from files. Returns TSDB_CODE_SUCCESS or the error code from
 * initForFirstBlockInFile.
 */
int32_t tsdbGetFileBlocksDistInfo(STsdbReader* pReader, STableBlockDistInfo* pTableBlockInfo) {
  // number of buckets in pTableBlockInfo->blockRowsHisto; must match that array's length
  const int32_t numOfBuckets = 20;
  // blocks with fewer rows than this are counted as small blocks
  const int32_t defaultRows = 4096;

  int32_t code = TSDB_CODE_SUCCESS;
  pTableBlockInfo->totalSize = 0;
  pTableBlockInfo->totalRows = 0;

  // find the start data block in file
  SReaderStatus* pStatus = &pReader->status;

  STsdbCfg* pc = &pReader->pTsdb->pVnode->config.tsdbCfg;
  pTableBlockInfo->defMinRows = pc->minRows;
  pTableBlockInfo->defMaxRows = pc->maxRows;

  int32_t bucketRange = ceil((pc->maxRows - pc->minRows) / (double)numOfBuckets);
  if (bucketRange <= 0) {
    // minRows == maxRows would otherwise make every bucket computation divide by zero
    bucketRange = 1;
  }

  pTableBlockInfo->numOfFiles += 1;

  int32_t numOfTables = (int32_t)taosHashGetSize(pStatus->pTableMap);

  SDataBlockIter* pBlockIter = &pStatus->blockIter;
  pTableBlockInfo->numOfFiles += pStatus->fileIter.numOfFiles;

  if (pBlockIter->numOfBlocks > 0) {
    pTableBlockInfo->numOfBlocks += pBlockIter->numOfBlocks;
  }

  pTableBlockInfo->numOfTables = numOfTables;
  bool hasNext = (pBlockIter->numOfBlocks > 0);

  while (true) {
    if (hasNext) {
      SDataBlk* pBlock = getCurrentBlock(pBlockIter);

      int32_t numOfRows = pBlock->nRow;
      pTableBlockInfo->totalRows += numOfRows;

      if (numOfRows > pTableBlockInfo->maxRows) {
        pTableBlockInfo->maxRows = numOfRows;
      }

      if (numOfRows < pTableBlockInfo->minRows) {
        pTableBlockInfo->minRows = numOfRows;
      }

      if (numOfRows < defaultRows) {
        pTableBlockInfo->numOfSmallBlocks += 1;
      }

      // clamp into [0, numOfBuckets): a block with nRow == maxRows can map to index numOfBuckets
      // when (maxRows - minRows) is a multiple of numOfBuckets, and rows below minRows map below 0
      int32_t bucketIndex = getBucketIndex(pTableBlockInfo->defMinRows, bucketRange, numOfRows);
      if (bucketIndex < 0) {
        bucketIndex = 0;
      } else if (bucketIndex >= numOfBuckets) {
        bucketIndex = numOfBuckets - 1;
      }
      pTableBlockInfo->blockRowsHisto[bucketIndex]++;

      hasNext = blockIteratorNext(&pStatus->blockIter, pReader->idStr);
    } else {
      code = initForFirstBlockInFile(pReader, pBlockIter);
      if ((code != TSDB_CODE_SUCCESS) || (pReader->status.loadFromFile == false)) {
        break;
      }

      pTableBlockInfo->numOfBlocks += pBlockIter->numOfBlocks;
      hasNext = (pBlockIter->numOfBlocks > 0);
    }
  }

  return code;
}
H
Hongze Cheng 已提交
4057

H
refact  
Hongze Cheng 已提交
4058
/*
 * Count rows held in memory (mem + imem) for every table tracked by the reader.
 *
 * Rows are counted from the memtables captured in the reader's read snapshot. The NULL checks
 * are made on those same snapshot pointers; the live pTsdb->mem/imem pointers are not
 * necessarily the ones the snapshot references. Returns 0 when no read snapshot is attached.
 * Leaves pReader->status.pTableIter at NULL when done.
 */
int64_t tsdbGetNumOfRowsInMemTable(STsdbReader* pReader) {
  int64_t rows = 0;

  if (pReader->pReadSnap == NULL) {
    return rows;
  }

  SReaderStatus* pStatus = &pReader->status;
  pStatus->pTableIter = taosHashIterate(pStatus->pTableMap, NULL);

  while (pStatus->pTableIter != NULL) {
    STableBlockScanInfo* pBlockScanInfo = pStatus->pTableIter;

    if (pReader->pReadSnap->pMem != NULL) {
      STbData* d = tsdbGetTbDataFromMemTable(pReader->pReadSnap->pMem, pReader->suid, pBlockScanInfo->uid);
      if (d != NULL) {
        rows += tsdbGetNRowsInTbData(d);
      }
    }

    if (pReader->pReadSnap->pIMem != NULL) {
      STbData* di = tsdbGetTbDataFromMemTable(pReader->pReadSnap->pIMem, pReader->suid, pBlockScanInfo->uid);
      if (di != NULL) {
        rows += tsdbGetNRowsInTbData(di);
      }
    }

    // current table is exhausted, let's try the next table
    pStatus->pTableIter = taosHashIterate(pStatus->pTableMap, pStatus->pTableIter);
  }

  return rows;
}
D
dapan1121 已提交
4089

L
Liu Jicong 已提交
4090
/*
 * Look up the schema of a child or normal table.
 *
 * uid:     uid of the table to look up.
 * pSchema: out, result of metaGetTbTSchema(pVnode->pMeta, uid, sversion, 1); this function does not
 *          check it for NULL.
 * suid:    out, the super table uid for a child table, 0 for a normal table.
 *
 * Returns TSDB_CODE_SUCCESS; TSDB_CODE_TDB_INVALID_TABLE_ID when the table (or, for a child table,
 * its super table) cannot be read; TSDB_CODE_INVALID_PARA for any other table type. terrno is set
 * on failure.
 */
int32_t tsdbGetTableSchema(SVnode* pVnode, int64_t uid, STSchema** pSchema, int64_t* suid) {
  int32_t sversion = 1;

  SMetaReader mr = {0};
  metaReaderInit(&mr, pVnode->pMeta, 0);
  int32_t code = metaGetTableEntryByUid(&mr, uid);
  if (code != TSDB_CODE_SUCCESS) {
    terrno = TSDB_CODE_TDB_INVALID_TABLE_ID;
    metaReaderClear(&mr);
    return terrno;
  }

  *suid = 0;

  if (mr.me.type == TSDB_CHILD_TABLE) {
    // for a child table the schema version is read from its super table entry
    tDecoderClear(&mr.coder);
    *suid = mr.me.ctbEntry.suid;
    code = metaGetTableEntryByUid(&mr, *suid);
    if (code != TSDB_CODE_SUCCESS) {
      terrno = TSDB_CODE_TDB_INVALID_TABLE_ID;
      metaReaderClear(&mr);
      return terrno;
    }
    sversion = mr.me.stbEntry.schemaRow.version;
  } else if (mr.me.type == TSDB_NORMAL_TABLE) {
    sversion = mr.me.ntbEntry.schemaRow.version;
  } else {
    // only child and normal tables are supported; reject others (e.g. a super table uid)
    // explicitly instead of reading ntbEntry from an entry of a different type
    terrno = TSDB_CODE_INVALID_PARA;
    metaReaderClear(&mr);
    return terrno;
  }

  metaReaderClear(&mr);
  *pSchema = metaGetTbTSchema(pVnode->pMeta, uid, sversion, 1);

  return TSDB_CODE_SUCCESS;
}
H
Hongze Cheng 已提交
4124

H
Haojun Liao 已提交
4125
/*
 * Capture a read snapshot of the tsdb: the current mem/imem memtables plus a file-system reference.
 *
 * All references are taken while holding pTsdb->rwLock for reading. On success *ppSnap owns the
 * references and must be released with tsdbUntakeReadSnap. On failure every reference taken so far
 * is dropped, the snapshot is freed, and *ppSnap is set to NULL.
 * Returns 0, TSDB_CODE_OUT_OF_MEMORY, a TAOS_SYSTEM_ERROR-wrapped lock error, or the tsdbFSRef error.
 */
int32_t tsdbTakeReadSnap(STsdb* pTsdb, STsdbReadSnap** ppSnap, const char* idStr) {
  int32_t code = 0;
  bool    fsRefed = false;

  // alloc
  STsdbReadSnap* pSnap = (STsdbReadSnap*)taosMemoryCalloc(1, sizeof(STsdbReadSnap));
  if (pSnap == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _exit;
  }

  // lock
  code = taosThreadRwlockRdlock(&pTsdb->rwLock);
  if (code) {
    code = TAOS_SYSTEM_ERROR(code);
    goto _exit;
  }

  // take snapshot
  pSnap->pMem = pTsdb->mem;
  pSnap->pIMem = pTsdb->imem;

  if (pSnap->pMem) {
    tsdbRefMemTable(pSnap->pMem);
  }

  if (pSnap->pIMem) {
    tsdbRefMemTable(pSnap->pIMem);
  }

  // fs; assumes tsdbFSRef holds no reference when it fails — TODO confirm
  code = tsdbFSRef(pTsdb, &pSnap->fs);
  if (code) {
    taosThreadRwlockUnlock(&pTsdb->rwLock);
    goto _exit;
  }
  fsRefed = true;

  // unlock
  code = taosThreadRwlockUnlock(&pTsdb->rwLock);
  if (code) {
    code = TAOS_SYSTEM_ERROR(code);
    goto _exit;
  }

  tsdbTrace("vgId:%d, take read snapshot, %s", TD_VID(pTsdb->pVnode), idStr);

_exit:
  if (code != 0 && pSnap != NULL) {
    // roll back whatever was acquired so a failed call leaks neither memory nor references
    if (pSnap->pMem) {
      tsdbUnrefMemTable(pSnap->pMem);
    }
    if (pSnap->pIMem) {
      tsdbUnrefMemTable(pSnap->pIMem);
    }
    if (fsRefed) {
      tsdbFSUnref(pTsdb, &pSnap->fs);
    }
    taosMemoryFree(pSnap);
    pSnap = NULL;
  }

  *ppSnap = pSnap;
  return code;
}

H
Haojun Liao 已提交
4173
/*
 * Release a snapshot produced by tsdbTakeReadSnap.
 *
 * Drops the mem/imem memtable references and the file-system reference held by `pSnap`,
 * then frees it. A NULL `pSnap` is accepted; the trace line is emitted in either case.
 */
void tsdbUntakeReadSnap(STsdb* pTsdb, STsdbReadSnap* pSnap, const char* idStr) {
  if (pSnap != NULL) {
    if (pSnap->pMem != NULL) {
      tsdbUnrefMemTable(pSnap->pMem);
    }
    if (pSnap->pIMem != NULL) {
      tsdbUnrefMemTable(pSnap->pIMem);
    }

    tsdbFSUnref(pTsdb, &pSnap->fs);
    taosMemoryFree(pSnap);
  }

  tsdbTrace("vgId:%d, untake read snapshot, %s", TD_VID(pTsdb->pVnode), idStr);
}