tsdbRead.c 138.0 KB
Newer Older
H
hjxilinx 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

H
Haojun Liao 已提交
16
#include "osDef.h"
H
Hongze Cheng 已提交
17
#include "tsdb.h"
18

H
Hongze Cheng 已提交
19
// Evaluates to true when the scan order `o` is ascending. The argument is parenthesized so that expressions such as
// `ASCENDING_TRAVERSE(c ? a : b)` compare the whole expression instead of binding `==` to its last operand.
#define ASCENDING_TRAVERSE(o) ((o) == TSDB_ORDER_ASC)
H
Hongze Cheng 已提交
20

21 22 23 24 25 26
// Kinds of "external rows" content.
// NOTE(review): the values are 1, 2 and 3, so NEXT (3) equals PREV | MAIN; they cannot be combined as bit flags.
// Confirm they are only ever compared for equality.
typedef enum {
  EXTERNAL_ROWS_PREV = 1,
  EXTERNAL_ROWS_MAIN = 2,
  EXTERNAL_ROWS_NEXT = 3,
} EContentData;

27
typedef struct {
dengyihao's avatar
dengyihao 已提交
28
  STbDataIter* iter;
29 30 31 32
  int32_t      index;
  bool         hasVal;
} SIterInfo;

33 34
// Block counters gathered while loading the block index of one file set.
typedef struct {
  int32_t numOfBlocks;     // number of data blocks that passed the time/version range checks
  int32_t numOfLastFiles;  // number of last (stt) files in the file set
} SBlockNumber;

38
typedef struct SBlockIndex {
39 40 41
  int32_t     ordinalIndex;
  int64_t     inFileOffset;
  STimeWindow window;
42 43
} SBlockIndex;

H
Haojun Liao 已提交
44
typedef struct STableBlockScanInfo {
dengyihao's avatar
dengyihao 已提交
45 46
  uint64_t  uid;
  TSKEY     lastKey;
H
Hongze Cheng 已提交
47
  SMapData  mapData;            // block info (compressed)
48
  SArray*   pBlockList;         // block data index list, SArray<SBlockIndex>
H
Hongze Cheng 已提交
49 50 51 52 53 54
  SIterInfo iter;               // mem buffer skip list iterator
  SIterInfo iiter;              // imem buffer skip list iterator
  SArray*   delSkyline;         // delete info for this table
  int32_t   fileDelIndex;       // file block delete index
  int32_t   lastBlockDelIndex;  // delete index for last block
  bool      iterInit;           // whether to initialize the in-memory skip list iterator or not
H
Haojun Liao 已提交
55 56 57
} STableBlockScanInfo;

typedef struct SBlockOrderWrapper {
dengyihao's avatar
dengyihao 已提交
58
  int64_t uid;
59
  int64_t offset;
H
Haojun Liao 已提交
60
} SBlockOrderWrapper;
H
Hongze Cheng 已提交
61 62

typedef struct SBlockOrderSupporter {
63 64 65 66
  SBlockOrderWrapper** pDataBlockInfo;
  int32_t*             indexPerTable;
  int32_t*             numOfBlocksPerTable;
  int32_t              numOfTables;
H
Hongze Cheng 已提交
67 68 69
} SBlockOrderSupporter;

typedef struct SIOCostSummary {
70 71 72
  int64_t numOfBlocks;
  double  blockLoadTime;
  double  buildmemBlock;
73
  int64_t headFileLoad;
74
  double  headFileLoadTime;
75
  int64_t smaDataLoad;
76
  double  smaLoadTime;
77 78
  int64_t lastBlockLoad;
  double  lastBlockLoadTime;
H
Haojun Liao 已提交
79 80
  int64_t composedBlocks;
  double  buildComposedBlockTime;
H
Hongze Cheng 已提交
81 82 83
} SIOCostSummary;

typedef struct SBlockLoadSuppInfo {
84
  SArray*          pColAgg;
85
  SColumnDataAgg   tsColAgg;
C
Cary Xu 已提交
86
  SColumnDataAgg** plist;
87
  int16_t*         colIds;  // column ids for loading file block data
88
  int32_t          numOfCols;
89
  char**           buildBuf;  // build string tmp buffer, todo remove it later after all string format being updated.
H
Hongze Cheng 已提交
90 91
} SBlockLoadSuppInfo;

92
typedef struct SLastBlockReader {
H
Hongze Cheng 已提交
93 94 95 96 97
  STimeWindow        window;
  SVersionRange      verRange;
  int32_t            order;
  uint64_t           uid;
  SMergeTree         mergeTree;
98
  SSttBlockLoadInfo* pInfo;
99 100
} SLastBlockReader;

101
typedef struct SFilesetIter {
H
Hongze Cheng 已提交
102 103 104
  int32_t           numOfFiles;  // number of total files
  int32_t           index;       // current accessed index in the list
  SArray*           pFileList;   // data file list
105
  int32_t           order;
H
Hongze Cheng 已提交
106
  SLastBlockReader* pLastBlockReader;  // last file block reader
107
} SFilesetIter;
H
Haojun Liao 已提交
108 109

typedef struct SFileDataBlockInfo {
110
  // index position in STableBlockScanInfo in order to check whether neighbor block overlaps with it
dengyihao's avatar
dengyihao 已提交
111
  uint64_t uid;
112
  int32_t  tbBlockIdx;
H
Haojun Liao 已提交
113 114 115
} SFileDataBlockInfo;

typedef struct SDataBlockIter {
116
  int32_t   numOfBlocks;
117
  int32_t   index;
H
Hongze Cheng 已提交
118
  SArray*   blockList;  // SArray<SFileDataBlockInfo>
119
  int32_t   order;
120
  SDataBlk  block;  // current SDataBlk data
121
  SHashObj* pTableMap;
H
Haojun Liao 已提交
122 123 124
} SDataBlockIter;

typedef struct SFileBlockDumpInfo {
dengyihao's avatar
dengyihao 已提交
125 126 127 128
  int32_t totalRows;
  int32_t rowIndex;
  int64_t lastKey;
  bool    allDumped;
H
Haojun Liao 已提交
129 130
} SFileBlockDumpInfo;

131
typedef struct SUidOrderCheckInfo {
132 133
  uint64_t* tableUidList;  // access table uid list in uid ascending order list
  int32_t   currentIndex;  // index in table uid list
134 135
} SUidOrderCheckInfo;

H
Haojun Liao 已提交
136
typedef struct SReaderStatus {
137
  bool                 loadFromFile;       // check file stage
138
  bool                 composedDataBlock;  // the returned data block is a composed block or not
139 140
  SHashObj*            pTableMap;          // SHash<STableBlockScanInfo>
  STableBlockScanInfo* pTableIter;         // table iterator used in building in-memory buffer data blocks.
141
  SUidOrderCheckInfo   uidCheckInfo;       // check all table in uid order
142
  SFileBlockDumpInfo   fBlockDumpInfo;
143 144 145 146
  SDFileSet*           pCurrentFileset;  // current opened file set
  SBlockData           fileBlockData;
  SFilesetIter         fileIter;
  SDataBlockIter       blockIter;
H
Haojun Liao 已提交
147 148
} SReaderStatus;

H
Hongze Cheng 已提交
149
struct STsdbReader {
H
Haojun Liao 已提交
150 151 152 153 154 155 156
  STsdb*             pTsdb;
  uint64_t           suid;
  int16_t            order;
  STimeWindow        window;  // the primary query time window that applies to all queries
  SSDataBlock*       pResBlock;
  int32_t            capacity;
  SReaderStatus      status;
157 158
  char*              idStr;  // query info handle, for debug purpose
  int32_t            type;   // query type: 1. retrieve all data blocks, 2. retrieve direct prev|next rows
H
Hongze Cheng 已提交
159
  SBlockLoadSuppInfo suppInfo;
H
Hongze Cheng 已提交
160
  STsdbReadSnap*     pReadSnap;
161
  SIOCostSummary     cost;
162 163
  STSchema*          pSchema;     // the newest version schema
  STSchema*          pMemSchema;  // the previous schema for in-memory data, to avoid load schema too many times
164 165
  SDataFReader*      pFileReader;
  SVersionRange      verRange;
166

167 168
  int32_t      step;
  STsdbReader* innerReader[2];
H
Hongze Cheng 已提交
169
};
H
Hongze Cheng 已提交
170

H
Haojun Liao 已提交
171
static SFileDataBlockInfo* getCurrentBlockInfo(SDataBlockIter* pBlockIter);
172 173
static int      buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, int64_t endKey, int32_t capacity,
                                          STsdbReader* pReader);
174
static TSDBROW* getValidMemRow(SIterInfo* pIter, const SArray* pDelList, STsdbReader* pReader);
175 176
static int32_t  doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pScanInfo, STsdbReader* pReader,
                                        SRowMerger* pMerger);
H
Hongze Cheng 已提交
177
static int32_t  doMergeRowsInLastBlock(SLastBlockReader* pLastBlockReader, STableBlockScanInfo* pScanInfo, int64_t ts,
178
                                       SRowMerger* pMerger, SVersionRange* pVerRange);
179
static int32_t  doMergeRowsInBuf(SIterInfo* pIter, uint64_t uid, int64_t ts, SArray* pDelList, SRowMerger* pMerger,
dengyihao's avatar
dengyihao 已提交
180
                                 STsdbReader* pReader);
H
Haojun Liao 已提交
181
static int32_t  doAppendRowFromTSRow(SSDataBlock* pBlock, STsdbReader* pReader, STSRow* pTSRow, uint64_t uid);
182
static int32_t  doAppendRowFromFileBlock(SSDataBlock* pResBlock, STsdbReader* pReader, SBlockData* pBlockData,
H
Hongze Cheng 已提交
183
                                         int32_t rowIndex);
184
static void     setComposedBlockFlag(STsdbReader* pReader, bool composed);
185
static bool hasBeenDropped(const SArray* pDelList, int32_t* index, TSDBKEY* pKey, int32_t order, SVersionRange* pRange);
186

H
Hongze Cheng 已提交
187 188 189 190
static int32_t doMergeMemTableMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter, SArray* pDelList,
                                        STSRow** pTSRow, STsdbReader* pReader, bool* freeTSRow);
static int32_t doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo* pBlockScanInfo,
                                  STsdbReader* pReader, STSRow** pTSRow);
191 192
static int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pBlockScanInfo, int64_t key,
                                     STsdbReader* pReader);
193

dengyihao's avatar
dengyihao 已提交
194 195 196 197
static int32_t initDelSkylineIterator(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader, STbData* pMemTbData,
                                      STbData* piMemTbData);
static STsdb*  getTsdbByRetentions(SVnode* pVnode, TSKEY winSKey, SRetention* retentions, const char* idstr,
                                   int8_t* pLevel);
198
static SVersionRange getQueryVerRange(SVnode* pVnode, SQueryTableDataCond* pCond, int8_t level);
H
Hongze Cheng 已提交
199 200 201
static int64_t       getCurrentKeyInLastBlock(SLastBlockReader* pLastBlockReader);
static bool          hasDataInLastBlock(SLastBlockReader* pLastBlockReader);
static int32_t       doBuildDataBlock(STsdbReader* pReader);
H
Haojun Liao 已提交
202
static TSDBKEY       getCurrentKeyInBuf(STableBlockScanInfo* pScanInfo, STsdbReader* pReader);
203 204
static bool          hasDataInFileBlock(const SBlockData* pBlockData, const SFileBlockDumpInfo* pDumpInfo);
static bool          hasDataInLastBlock(SLastBlockReader* pLastBlockReader);
H
Haojun Liao 已提交
205

206 207
// Returns true when `ts` lies outside the closed interval [pWindow->skey, pWindow->ekey].
static bool outOfTimeWindow(int64_t ts, STimeWindow* pWindow) {
  bool beforeStart = ts < pWindow->skey;
  bool afterEnd = ts > pWindow->ekey;
  return afterEnd || beforeStart;
}

208 209 210
// Prepare the per-column load-support info of pReader for the result block `pBlock`:
//   colIds[i]   - column id of the i-th result column (used when loading file block data)
//   buildBuf[i] - scratch buffer of pCol->info.bytes bytes for variable-length columns, NULL otherwise
//
// Returns TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY when any allocation fails. On failure every buffer allocated
// here is released, the pointers are reset to NULL and numOfCols is reset to 0. This avoids leaving dangling pointers
// behind for a later cleanup of the reader to free a second time.
static int32_t setColumnIdSlotList(STsdbReader* pReader, SSDataBlock* pBlock) {
  SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo;

  size_t numOfCols = blockDataGetNumOfCols(pBlock);

  pSupInfo->numOfCols = numOfCols;
  pSupInfo->colIds = taosMemoryMalloc(numOfCols * sizeof(int16_t));
  pSupInfo->buildBuf = taosMemoryCalloc(numOfCols, POINTER_BYTES);
  if (pSupInfo->buildBuf == NULL || pSupInfo->colIds == NULL) {
    goto _error;
  }

  for (int32_t i = 0; i < numOfCols; ++i) {
    SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, i);
    pSupInfo->colIds[i] = pCol->info.colId;

    if (IS_VAR_DATA_TYPE(pCol->info.type)) {
      pSupInfo->buildBuf[i] = taosMemoryMalloc(pCol->info.bytes);
      if (pSupInfo->buildBuf[i] == NULL) {
        goto _error;
      }
    }
  }

  return TSDB_CODE_SUCCESS;

_error:
  if (pSupInfo->buildBuf != NULL) {
    // buildBuf was calloc'ed, so entries that were never allocated are NULL and safe to free
    for (size_t i = 0; i < numOfCols; ++i) {
      taosMemoryFree(pSupInfo->buildBuf[i]);
    }
  }

  taosMemoryFree(pSupInfo->buildBuf);
  taosMemoryFree(pSupInfo->colIds);
  pSupInfo->buildBuf = NULL;
  pSupInfo->colIds = NULL;
  pSupInfo->numOfCols = 0;
  return TSDB_CODE_OUT_OF_MEMORY;
}
H
Hongze Cheng 已提交
233

234
// Build the table map (uid -> STableBlockScanInfo) for the `numOfTables` tables in idList.
// Each entry's lastKey starts one tick outside the query window on the side the scan starts from: skey - 1 when
// ascending, ekey + 1 when descending. The window bound itself is used when stepping would overflow int64.
// Returns NULL when the hash table cannot be created.
static SHashObj* createDataBlockScanInfo(STsdbReader* pTsdbReader, const STableKeyInfo* idList, int32_t numOfTables) {
  // todo use simple hash instead, optimize the memory consumption
  SHashObj* pTableMap =
      taosHashInit(numOfTables, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
  if (pTableMap == NULL) {
    return NULL;
  }

  bool asc = ASCENDING_TRAVERSE(pTsdbReader->order);

  for (int32_t j = 0; j < numOfTables; ++j) {
    STableBlockScanInfo info = {.lastKey = 0, .uid = idList[j].uid};

    if (asc) {
      int64_t startKey = pTsdbReader->window.skey;
      info.lastKey = (startKey == INT64_MIN) ? startKey : (startKey - 1);
    } else {
      int64_t endKey = pTsdbReader->window.ekey;
      info.lastKey = (endKey == INT64_MAX) ? endKey : (endKey + 1);
    }

    // NOTE(review): the result of taosHashPut is ignored; a failed insert silently drops the table from the scan.
    taosHashPut(pTableMap, &info.uid, sizeof(uint64_t), &info, sizeof(info));
    tsdbDebug("%p check table uid:%" PRId64 " from lastKey:%" PRId64 " %s", pTsdbReader, info.uid, info.lastKey,
              pTsdbReader->idStr);
  }

  double sizeInKb = (sizeof(STableBlockScanInfo) * numOfTables) / 1024.0;
  tsdbDebug("%p create %d tables scan-info, size:%.2f Kb, %s", pTsdbReader, numOfTables, sizeInKb, pTsdbReader->idStr);

  return pTableMap;
}
H
Hongze Cheng 已提交
263

H
Haojun Liao 已提交
264
// Reset the in-memory scan state of every table in pTableMap so that the scan can restart from key `ts`.
// Both the mem (iter) and the imem (iiter) iterators are destroyed and their hasVal flags cleared; previously only
// the mem iterator was released, leaving the imem iterator alive across resets. iterInit is cleared, the delete
// skyline is dropped, and lastKey is set to `ts`.
static void resetDataBlockScanInfo(SHashObj* pTableMap, int64_t ts) {
  STableBlockScanInfo* p = NULL;

  while ((p = taosHashIterate(pTableMap, p)) != NULL) {
    p->iterInit = false;
    p->iter.hasVal = false;
    p->iiter.hasVal = false;

    if (p->iter.iter != NULL) {
      p->iter.iter = tsdbTbDataIterDestroy(p->iter.iter);
    }

    if (p->iiter.iter != NULL) {
      p->iiter.iter = tsdbTbDataIterDestroy(p->iiter.iter);
    }

    p->delSkyline = taosArrayDestroy(p->delSkyline);
    p->lastKey = ts;
  }
}

279 280 281
// Release every resource owned by one table's scan info: both in-memory iterators, the delete skyline, the block
// index list and the block map data. Both hasVal flags are cleared together; previously only iiter.hasVal was reset.
static void clearBlockScanInfo(STableBlockScanInfo* p) {
  p->iterInit = false;
  p->iter.hasVal = false;
  p->iiter.hasVal = false;

  if (p->iter.iter != NULL) {
    p->iter.iter = tsdbTbDataIterDestroy(p->iter.iter);
  }

  if (p->iiter.iter != NULL) {
    p->iiter.iter = tsdbTbDataIterDestroy(p->iiter.iter);
  }

  p->delSkyline = taosArrayDestroy(p->delSkyline);
  p->pBlockList = taosArrayDestroy(p->pBlockList);
  tMapDataClear(&p->mapData);
}
295

296 297 298 299
// Clear the resources of every table scan info in pTableMap, then free the map itself.
static void destroyBlockScanInfo(SHashObj* pTableMap) {
  for (STableBlockScanInfo* pInfo = taosHashIterate(pTableMap, NULL); pInfo != NULL;
       pInfo = taosHashIterate(pTableMap, pInfo)) {
    clearBlockScanInfo(pInfo);
  }

  taosHashCleanup(pTableMap);
}

305
// A query window is empty when its start key is after its end key.
static bool isEmptyQueryTimeWindow(STimeWindow* pWindow) {
  ASSERT(pWindow != NULL);
  bool isEmpty = (pWindow->ekey < pWindow->skey);
  return isEmpty;
}
H
Hongze Cheng 已提交
309

310 311 312
// Clip the start of the query window to the oldest timestamp that is still within the keep2 retention of pTsdb.
// This keeps expired data from being returned to the client, even though it has already been queried.
// The end key is returned unchanged.
static STimeWindow updateQueryTimeWindow(STsdb* pTsdb, STimeWindow* pWindow) {
  STsdbKeepCfg* pKeepCfg = &pTsdb->keepCfg;

  int64_t nowTs = taosGetTimestamp(pKeepCfg->precision);
  int64_t keepSpan = tsTickPerMin[pKeepCfg->precision] * pKeepCfg->keep2;  // keep2 is scaled by ticks per minute
  int64_t earliestTs = nowTs - keepSpan + 1;                               // needs to add one tick

  STimeWindow clipped = *pWindow;
  clipped.skey = (clipped.skey < earliestTs) ? earliestTs : clipped.skey;
  return clipped;
}
H
Hongze Cheng 已提交
325

H
Haojun Liao 已提交
326
// Shrink *capacity so that a result block of the requested columns stays within 2MB.
// The row length and the capacity * rowLen product are computed in 64 bits; in int32 they could overflow, which is
// undefined behavior and skips the limit. The capacity never drops below 1 row, so a block holding a single row
// wider than 2MB is still usable.
static void limitOutputBufferSize(const SQueryTableDataCond* pCond, int32_t* capacity) {
  int64_t rowLen = 0;
  for (int32_t i = 0; i < pCond->numOfCols; ++i) {
    rowLen += pCond->colList[i].bytes;
  }

  // make sure the output SSDataBlock size be less than 2MB.
  const int64_t TWOMB = 2 * 1024 * 1024;
  if ((int64_t)(*capacity) * rowLen > TWOMB) {
    // rowLen > 0 here, because the product exceeded TWOMB
    int64_t rows = TWOMB / rowLen;
    (*capacity) = (rows > 0) ? (int32_t)rows : 1;
  }
}

// init file iterator
340
static int32_t initFilesetIterator(SFilesetIter* pIter, SArray* aDFileSet, STsdbReader* pReader) {
H
Hongze Cheng 已提交
341
  size_t numOfFileset = taosArrayGetSize(aDFileSet);
342

343 344
  pIter->index = ASCENDING_TRAVERSE(pReader->order) ? -1 : numOfFileset;
  pIter->order = pReader->order;
H
Hongze Cheng 已提交
345
  pIter->pFileList = aDFileSet;
346
  pIter->numOfFiles = numOfFileset;
H
Haojun Liao 已提交
347

348 349 350 351
  if (pIter->pLastBlockReader == NULL) {
    pIter->pLastBlockReader = taosMemoryCalloc(1, sizeof(struct SLastBlockReader));
    if (pIter->pLastBlockReader == NULL) {
      int32_t code = TSDB_CODE_OUT_OF_MEMORY;
352
      tsdbError("failed to prepare the last block iterator, since:%s %s", tstrerror(code), pReader->idStr);
353 354
      return code;
    }
355 356
  }

357 358 359 360 361 362 363 364
  SLastBlockReader* pLReader = pIter->pLastBlockReader;
  pLReader->order = pReader->order;
  pLReader->window = pReader->window;
  pLReader->verRange = pReader->verRange;

  pLReader->uid = 0;
  tMergeTreeClose(&pLReader->mergeTree);

365
  if (pLReader->pInfo == NULL) {
366
    // here we ignore the first column, which is always be the primary timestamp column
367 368
    pLReader->pInfo =
        tCreateLastBlockLoadInfo(pReader->pSchema, &pReader->suppInfo.colIds[1], pReader->suppInfo.numOfCols - 1);
H
Haojun Liao 已提交
369 370 371 372
    if (pLReader->pInfo == NULL) {
      tsdbDebug("init fileset iterator failed, code:%s %s", tstrerror(terrno), pReader->idStr);
      return terrno;
    }
373 374
  }

375
  tsdbDebug("init fileset iterator, total files:%d %s", pIter->numOfFiles, pReader->idStr);
H
Haojun Liao 已提交
376 377 378
  return TSDB_CODE_SUCCESS;
}

379
// Advance pIter to the next data file set, in pIter->order, whose time range overlaps the query window, and open a
// data file reader on it (pReader->pFileReader, pReader->status.pCurrentFileset).
// Before moving, the last-block load statistics are accumulated into pReader->cost and the last block reader state is
// reset.
//
// Returns true when a qualified file set has been opened, and false when no qualified file set remains. It also
// returns false when the data file reader cannot be opened; in that case the error is logged and stored in terrno
// instead of being silently reported as "no more files".
static bool filesetIteratorNext(SFilesetIter* pIter, STsdbReader* pReader) {
  bool    asc = ASCENDING_TRAVERSE(pIter->order);
  int32_t step = asc ? 1 : -1;
  pIter->index += step;

  if ((asc && pIter->index >= pIter->numOfFiles) || ((!asc) && pIter->index < 0)) {
    return false;
  }

  SIOCostSummary* pSum = &pReader->cost;
  getLastBlockLoadInfo(pIter->pLastBlockReader->pInfo, &pSum->lastBlockLoad, &pSum->lastBlockLoadTime);

  pIter->pLastBlockReader->uid = 0;
  tMergeTreeClose(&pIter->pLastBlockReader->mergeTree);
  resetLastBlockLoadInfo(pIter->pLastBlockReader->pInfo);

  // check file the time range of coverage
  STimeWindow win = {0};

  while (1) {
    if (pReader->pFileReader != NULL) {
      tsdbDataFReaderClose(&pReader->pFileReader);
    }

    pReader->status.pCurrentFileset = (SDFileSet*)taosArrayGet(pIter->pFileList, pIter->index);

    int32_t code = tsdbDataFReaderOpen(&pReader->pFileReader, pReader->pTsdb, pReader->status.pCurrentFileset);
    if (code != TSDB_CODE_SUCCESS) {
      // the bool return value cannot carry the error, so publish it through terrno
      terrno = code;
      tsdbError("%p failed to open data file reader, fileset index:%d, code:%s %s", pReader, pIter->index,
                tstrerror(code), pReader->idStr);
      return false;
    }

    pReader->cost.headFileLoad += 1;

    int32_t fid = pReader->status.pCurrentFileset->fid;
    tsdbFidKeyRange(fid, pReader->pTsdb->keepCfg.days, pReader->pTsdb->keepCfg.precision, &win.skey, &win.ekey);

    // current file are no longer overlapped with query time window, ignore remain files
    if ((asc && win.skey > pReader->window.ekey) || (!asc && win.ekey < pReader->window.skey)) {
      tsdbDebug("%p remain files are not qualified for qrange:%" PRId64 "-%" PRId64 ", ignore, %s", pReader,
                pReader->window.skey, pReader->window.ekey, pReader->idStr);
      return false;
    }

    // this file lies entirely before the query window in scan direction: skip to the next one
    if ((asc && (win.ekey < pReader->window.skey)) || ((!asc) && (win.skey > pReader->window.ekey))) {
      pIter->index += step;
      if ((asc && pIter->index >= pIter->numOfFiles) || ((!asc) && pIter->index < 0)) {
        return false;
      }
      continue;
    }

    tsdbDebug("%p file found fid:%d for qrange:%" PRId64 "-%" PRId64 ", %s", pReader, fid, pReader->window.skey,
              pReader->window.ekey, pReader->idStr);
    return true;
  }
}

439
// Rewind the data block iterator (index -1, no blocks) for a new scan in `order`, reusing the block list if present.
static void resetDataBlockIterator(SDataBlockIter* pIter, int32_t order) {
  pIter->order = order;
  pIter->index = -1;
  pIter->numOfBlocks = 0;

  if (pIter->blockList != NULL) {
    taosArrayClear(pIter->blockList);
    return;
  }

  pIter->blockList = taosArrayInit(4, sizeof(SFileDataBlockInfo));
}

L
Liu Jicong 已提交
450
// Free the block list owned by the data block iterator.
static void cleanupDataBlockIterator(SDataBlockIter* pIter) {
  SArray* pList = pIter->blockList;
  taosArrayDestroy(pList);
}
H
Haojun Liao 已提交
451

H
Haojun Liao 已提交
452
// Put a fresh reader status into the file-checking stage with no current in-memory table iterator.
static void initReaderStatus(SReaderStatus* pStatus) {
  pStatus->loadFromFile = true;
  pStatus->pTableIter = NULL;
}

457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479
// Create the result block with one column per pCond->colList entry, able to hold `capacity` rows.
// Returns NULL with terrno set on failure. A partially built block is released with blockDataDestroy, which also frees
// its column array and column buffers; a plain free of the block struct would leak them.
static SSDataBlock* createResBlock(SQueryTableDataCond* pCond, int32_t capacity) {
  SSDataBlock* pResBlock = createDataBlock();
  if (pResBlock == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  int32_t code = TSDB_CODE_SUCCESS;
  for (int32_t i = 0; i < pCond->numOfCols; ++i) {
    SColumnInfoData colInfo = {{0}, 0};
    colInfo.info = pCond->colList[i];
    code = blockDataAppendColInfo(pResBlock, &colInfo);
    if (code != TSDB_CODE_SUCCESS) {
      goto _error;
    }
  }

  code = blockDataEnsureCapacity(pResBlock, capacity);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  return pResBlock;

_error:
  terrno = code;
  blockDataDestroy(pResBlock);
  return NULL;
}

480 481
// Create a reader for the query condition pCond on pVnode.
// On success *ppReader receives the reader and TSDB_CODE_SUCCESS is returned. On failure the partially built reader
// is closed, *ppReader is set to NULL and the error code is returned. Failures of the idStr copy and of the
// column-id/slot setup are now reported; previously they were ignored.
static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, STsdbReader** ppReader, int32_t capacity,
                                const char* idstr) {
  int32_t      code = 0;
  int8_t       level = 0;
  STsdbReader* pReader = (STsdbReader*)taosMemoryCalloc(1, sizeof(*pReader));
  if (pReader == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _end;
  }

  if (VND_IS_TSMA(pVnode)) {
    tsdbDebug("vgId:%d, tsma is selected to query", TD_VID(pVnode));
  }

  initReaderStatus(&pReader->status);

  pReader->pTsdb = getTsdbByRetentions(pVnode, pCond->twindows.skey, pVnode->config.tsdbCfg.retentions, idstr, &level);
  pReader->suid = pCond->suid;
  pReader->order = pCond->order;
  pReader->capacity = capacity;
  pReader->idStr = (idstr != NULL) ? strdup(idstr) : NULL;
  if (idstr != NULL && pReader->idStr == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _end;
  }

  pReader->verRange = getQueryVerRange(pVnode, pCond, level);
  pReader->type = pCond->type;
  pReader->window = updateQueryTimeWindow(pReader->pTsdb, &pCond->twindows);

  ASSERT(pCond->numOfCols > 0);

  limitOutputBufferSize(pCond, &pReader->capacity);

  // allocate buffer in order to load data blocks from file
  SBlockLoadSuppInfo* pSup = &pReader->suppInfo;
  pSup->pColAgg = taosArrayInit(4, sizeof(SColumnDataAgg));
  pSup->plist = taosMemoryCalloc(pCond->numOfCols, POINTER_BYTES);
  if (pSup->pColAgg == NULL || pSup->plist == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _end;
  }

  pSup->tsColAgg.colId = PRIMARYKEY_TIMESTAMP_COL_ID;

  code = tBlockDataCreate(&pReader->status.fileBlockData);
  if (code != TSDB_CODE_SUCCESS) {
    terrno = code;
    goto _end;
  }

  pReader->pResBlock = createResBlock(pCond, pReader->capacity);
  if (pReader->pResBlock == NULL) {
    code = terrno;
    goto _end;
  }

  code = setColumnIdSlotList(pReader, pReader->pResBlock);
  if (code != TSDB_CODE_SUCCESS) {
    goto _end;
  }

  *ppReader = pReader;
  return code;

_end:
  tsdbReaderClose(pReader);
  *ppReader = NULL;
  return code;
}
H
Hongze Cheng 已提交
542

H
Haojun Liao 已提交
543
// Read the block index (SBlockIdx list) of the opened data file. Append to pIndexList the entries whose suid matches
// the reader and whose uid is in the reader's table map, and make sure each of those tables has a pBlockList.
// Returns TSDB_CODE_SUCCESS, the error from tsdbReadBlockIdx, or TSDB_CODE_OUT_OF_MEMORY. The allocation results are
// now checked, so a NULL pBlockList is never handed to later stages.
static int32_t doLoadBlockIndex(STsdbReader* pReader, SDataFReader* pFileReader, SArray* pIndexList) {
  SArray* aBlockIdx = taosArrayInit(8, sizeof(SBlockIdx));
  if (aBlockIdx == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  int64_t st = taosGetTimestampUs();
  int32_t code = tsdbReadBlockIdx(pFileReader, aBlockIdx);
  if (code != TSDB_CODE_SUCCESS) {
    goto _end;
  }

  size_t num = taosArrayGetSize(aBlockIdx);
  if (num == 0) {
    goto _end;  // nothing to do, code is TSDB_CODE_SUCCESS
  }

  int64_t et1 = taosGetTimestampUs();

  for (int32_t i = 0; i < num; ++i) {
    SBlockIdx* pBlockIdx = (SBlockIdx*)taosArrayGet(aBlockIdx, i);

    // uid check
    if (pBlockIdx->suid != pReader->suid) {
      continue;
    }

    // this block belongs to a table that is not queried.
    STableBlockScanInfo* pScanInfo = taosHashGet(pReader->status.pTableMap, &pBlockIdx->uid, sizeof(uint64_t));
    if (pScanInfo == NULL) {
      continue;
    }

    if (pScanInfo->pBlockList == NULL) {
      pScanInfo->pBlockList = taosArrayInit(4, sizeof(SBlockIndex));
      if (pScanInfo->pBlockList == NULL) {
        code = TSDB_CODE_OUT_OF_MEMORY;
        goto _end;
      }
    }

    if (taosArrayPush(pIndexList, pBlockIdx) == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto _end;
    }
  }

  int64_t et2 = taosGetTimestampUs();
  tsdbDebug("load block index for %d tables completed, elapsed time:%.2f ms, set blockIdx:%.2f ms, size:%.2f Kb %s",
            (int32_t)num, (et1 - st) / 1000.0, (et2 - et1) / 1000.0, num * sizeof(SBlockIdx) / 1024.0, pReader->idStr);

  pReader->cost.headFileLoadTime += (et1 - st) / 1000.0;

_end:
  taosArrayDestroy(aBlockIdx);
  return code;
}
H
Hongze Cheng 已提交
593

594
// Drop per-file block information of every table before a new file set is processed: the block map data is cleared
// and the block index list is emptied; the list itself is kept for reuse.
static void cleanupTableScanInfo(SHashObj* pTableMap) {
  for (STableBlockScanInfo* pInfo = taosHashIterate(pTableMap, NULL); pInfo != NULL;
       pInfo = taosHashIterate(pTableMap, pInfo)) {
    tMapDataClear(&pInfo->mapData);
    taosArrayClear(pInfo->pBlockList);
  }
}

608
// For every table in pIndexList (produced by the block index load), read its block map data from the open data file.
// Record in pScanInfo->pBlockList the blocks whose key range overlaps the query window and whose version range
// overlaps the reader's version range. Totals are accumulated into pBlockNum and pReader->cost.
//
// Returns TSDB_CODE_SUCCESS, the error from tsdbReadDataBlk (no longer ignored), or TSDB_CODE_OUT_OF_MEMORY.
static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, SBlockNumber* pBlockNum) {
  int32_t numOfQTable = 0;
  size_t  sizeInDisk = 0;
  size_t  numOfTables = taosArrayGetSize(pIndexList);

  int64_t st = taosGetTimestampUs();
  cleanupTableScanInfo(pReader->status.pTableMap);

  for (size_t i = 0; i < numOfTables; ++i) {
    SBlockIdx* pBlockIdx = taosArrayGet(pIndexList, i);

    STableBlockScanInfo* pScanInfo = taosHashGet(pReader->status.pTableMap, &pBlockIdx->uid, sizeof(int64_t));

    tMapDataReset(&pScanInfo->mapData);
    int32_t code = tsdbReadDataBlk(pReader->pFileReader, pBlockIdx, &pScanInfo->mapData);
    if (code != TSDB_CODE_SUCCESS) {
      tMapDataClear(&pScanInfo->mapData);
      return code;
    }

    sizeInDisk += pScanInfo->mapData.nData;
    for (int32_t j = 0; j < pScanInfo->mapData.nItem; ++j) {
      SDataBlk block = {0};
      tMapDataGetItemByIdx(&pScanInfo->mapData, j, &block, tGetDataBlk);

      // 1. time range check
      if (block.minKey.ts > pReader->window.ekey || block.maxKey.ts < pReader->window.skey) {
        continue;
      }

      // 2. version range check
      if (block.minVer > pReader->verRange.maxVer || block.maxVer < pReader->verRange.minVer) {
        continue;
      }

      SBlockIndex bIndex = {.ordinalIndex = j, .inFileOffset = block.aSubBlock->offset};
      bIndex.window = (STimeWindow){.skey = block.minKey.ts, .ekey = block.maxKey.ts};

      void* p = taosArrayPush(pScanInfo->pBlockList, &bIndex);
      if (p == NULL) {
        tMapDataClear(&pScanInfo->mapData);
        return TSDB_CODE_OUT_OF_MEMORY;
      }

      pBlockNum->numOfBlocks += 1;
    }

    if (pScanInfo->pBlockList != NULL && taosArrayGetSize(pScanInfo->pBlockList) > 0) {
      numOfQTable += 1;
    }
  }

  pBlockNum->numOfLastFiles = pReader->pFileReader->pSet->nSttF;
  int32_t total = pBlockNum->numOfLastFiles + pBlockNum->numOfBlocks;

  double el = (taosGetTimestampUs() - st) / 1000.0;
  tsdbDebug(
      "load block of %zu tables completed, blocks:%d in %d tables, last-files:%d, block-info-size:%.2f Kb, elapsed "
      "time:%.2f ms %s",
      numOfTables, pBlockNum->numOfBlocks, numOfQTable, pBlockNum->numOfLastFiles, sizeInDisk / 1024.0, el,
      pReader->idStr);

  pReader->cost.numOfBlocks += total;
  pReader->cost.headFileLoadTime += el;

  return TSDB_CODE_SUCCESS;
}
H
Hongze Cheng 已提交
671

672
// Mark the current file block as fully dumped and move lastKey one step past maxKey in the scan direction:
// maxKey + 1 when ascending, maxKey - 1 when descending.
static void setBlockAllDumped(SFileBlockDumpInfo* pDumpInfo, int64_t maxKey, int32_t order) {
  pDumpInfo->allDumped = true;
  pDumpInfo->lastKey = ASCENDING_TRAVERSE(order) ? (maxKey + 1) : (maxKey - 1);
}

678 679
// Append one column value to row `rowIndex` of pColInfoData.
// Fixed-length values are appended directly, with the null flag derived from the value. Variable-length values are
// first packed into the column's scratch buffer pSup->buildBuf[colIndex] (length header followed by payload), and a
// non-value is appended as NULL.
static void doCopyColVal(SColumnInfoData* pColInfoData, int32_t rowIndex, int32_t colIndex, SColVal* pColVal,
                         SBlockLoadSuppInfo* pSup) {
  if (!IS_VAR_DATA_TYPE(pColVal->type)) {
    colDataAppend(pColInfoData, rowIndex, (const char*)&pColVal->value, !COL_VAL_IS_VALUE(pColVal));
    return;
  }

  if (!COL_VAL_IS_VALUE(pColVal)) {
    colDataAppendNULL(pColInfoData, rowIndex);
    return;
  }

  char* pScratch = pSup->buildBuf[colIndex];
  varDataSetLen(pScratch, pColVal->value.nData);
  // NOTE(review): the scratch buffer holds info.bytes bytes and receives a length header plus nData bytes, so this
  // bound looks one header too loose -- confirm whether info.bytes already includes the header.
  ASSERT(pColVal->value.nData <= pColInfoData->info.bytes);
  memcpy(varDataVal(pScratch), pColVal->value.pData, pColVal->value.nData);
  colDataAppend(pColInfoData, rowIndex, pScratch, false);
}

694
// Return the block list entry at the iterator's current index, or NULL when the block list is empty.
static SFileDataBlockInfo* getCurrentBlockInfo(SDataBlockIter* pBlockIter) {
  size_t numOfItems = taosArrayGetSize(pBlockIter->blockList);
  if (numOfItems > 0) {
    return taosArrayGet(pBlockIter->blockList, pBlockIter->index);
  }

  ASSERT(pBlockIter->numOfBlocks == numOfItems);
  return NULL;
}

H
Hongze Cheng 已提交
704
/* Return the decoded data block descriptor cached inside the iterator (never NULL). */
static SDataBlk* getCurrentBlock(SDataBlockIter* pBlockIter) {
  SDataBlk* pCached = &pBlockIter->block;
  return pCached;
}
705

H
Haojun Liao 已提交
706 707 708 709 710 711 712 713 714 715 716 717 718 719 720 721 722 723 724 725 726 727 728 729 730 731 732 733 734 735 736 737 738 739 740 741 742 743 744 745 746 747 748 749 750 751 752 753 754 755 756 757 758 759 760 761 762 763 764 765 766 767 768 769 770 771 772 773 774 775 776 777 778
/*
 * Binary search for `key` in the timestamp array pValue holding `num` TSKEY values.
 *
 * TSDB_ORDER_ASC:  keys are expected in ascending order. Returns the index of `key` when present,
 *                  otherwise the index of the first key greater than `key`, or -1 when every key
 *                  is smaller than `key`.
 * TSDB_ORDER_DESC: keys are expected in descending order. Returns the index of `key` when present,
 *                  otherwise the index of the first key smaller than `key`, or -1 when every key
 *                  is greater than `key`.
 *
 * An empty array (num <= 0) yields -1.
 */
int32_t binarySearchForTs(char* pValue, int num, TSKEY key, int order) {
  int32_t midPos = -1;
  int32_t numOfRows;

  ASSERT(order == TSDB_ORDER_ASC || order == TSDB_ORDER_DESC);

  // nothing to search: without this guard keyList[0] and keyList[num - 1] would be read out of bounds
  if (num <= 0) {
    return -1;
  }

  TSKEY*  keyList = (TSKEY*)pValue;
  int32_t firstPos = 0;
  int32_t lastPos = num - 1;

  if (order == TSDB_ORDER_DESC) {
    // find the first position which is smaller than the key
    while (1) {
      if (key >= keyList[firstPos]) return firstPos;
      if (key == keyList[lastPos]) return lastPos;

      if (key < keyList[lastPos]) {
        lastPos += 1;
        if (lastPos >= num) {
          return -1;
        } else {
          return lastPos;
        }
      }

      numOfRows = lastPos - firstPos + 1;
      midPos = (numOfRows >> 1) + firstPos;

      if (key < keyList[midPos]) {
        firstPos = midPos + 1;
      } else if (key > keyList[midPos]) {
        lastPos = midPos - 1;
      } else {
        break;
      }
    }

  } else {
    // find the first position which is bigger than the key
    while (1) {
      if (key <= keyList[firstPos]) return firstPos;
      if (key == keyList[lastPos]) return lastPos;

      if (key > keyList[lastPos]) {
        lastPos = lastPos + 1;
        if (lastPos >= num) {
          return -1;
        } else {
          return lastPos;
        }
      }

      numOfRows = lastPos - firstPos + 1;
      midPos = (numOfRows >> 1u) + firstPos;

      if (key < keyList[midPos]) {
        lastPos = midPos - 1;
      } else if (key > keyList[midPos]) {
        firstPos = midPos + 1;
      } else {
        break;
      }
    }
  }

  return midPos;
}

/*
 * Binary search in the timestamp array keyList[0, num), which the search logic assumes is sorted in
 * ascending order, starting from position `pos`.
 *
 * TSDB_ORDER_ASC:  searches [pos, num - 1] and returns the largest index whose timestamp is <= key,
 *                  or -1 when key < keyList[pos].
 * TSDB_ORDER_DESC: searches [0, pos] and returns the smallest index whose timestamp is >= key,
 *                  or -1 when key > keyList[pos].
 *
 * `pos` must be a valid index and num > 0 (both asserted). The results above assume distinct
 * timestamps; with duplicates any index holding an equal key may be returned.
 */
static int doBinarySearchKey(TSKEY* keyList, int num, int pos, TSKEY key, int order) {
  // start end position
  int s, e;
  s = pos;

  // check
  assert(pos >= 0 && pos < num);
  assert(num > 0);

  if (order == TSDB_ORDER_ASC) {
    // find the last position whose timestamp is not greater than the key
    e = num - 1;
    if (key < keyList[pos]) return -1;
    while (1) {
      // invariant: keyList[s] <= key; stop as soon as the answer is pinned down
      if (key >= keyList[e]) return e;
      if (key <= keyList[s]) return s;
      if (e - s <= 1) return s;

      // narrow the range [s, e]
      int mid = s + (e - s + 1) / 2;
      if (keyList[mid] > key) {
        e = mid;
      } else if (keyList[mid] < key) {
        s = mid;
      } else {
        return mid;
      }
    }
  } else {  // DESC
    // find the first position (walking down from pos) whose timestamp is not smaller than the key
    e = 0;
    if (key > keyList[pos]) return -1;
    while (1) {
      // invariant: keyList[s] >= key; stop as soon as the answer is pinned down
      if (key <= keyList[e]) return e;
      if (key >= keyList[s]) return s;
      if (s - e <= 1) return s;

      // narrow the range [e, s]
      int mid = s - (s - e + 1) / 2;
      if (keyList[mid] < key) {
        e = mid;
      } else if (keyList[mid] > key) {
        s = mid;
      } else {
        return mid;
      }
    }
  }
}

/*
 * Locate the last row (in scan order) of pBlock, starting from row `pos`, that still lies inside the
 * query time window.
 *
 * Ascending scans are bounded by window.ekey; descending scans are bounded by window.skey. Returns
 * the row index, or -1 when row `pos` itself is already outside the window.
 */
int32_t getEndPosInDataBlock(STsdbReader* pReader, SBlockData* pBlockData, SDataBlk* pBlock, int32_t pos) {
  bool asc = ASCENDING_TRAVERSE(pReader->order);

  // the rest of the block lies completely inside the window in the scan direction
  if (asc && pReader->window.ekey >= pBlock->maxKey.ts) {
    return pBlock->nRow - 1;
  }
  if (!asc && pReader->window.skey <= pBlock->minKey.ts) {
    return 0;
  }

  // a descending scan ends at the lower bound of the window (skey), not at its upper bound
  int64_t key = asc ? pReader->window.ekey : pReader->window.skey;
  return doBinarySearchKey(pBlockData->aTSKEY, pBlock->nRow, pos, key, pReader->order);
}

839
/*
 * Copy rows of the currently loaded file block (pReader->status.fileBlockData) into
 * pReader->pResBlock.
 *
 * On the first visit of a block (rowIndex at the block start for the scan direction), the start row
 * is moved to the first row inside the query time window: window.skey for ascending scans and
 * window.ekey for descending scans. Between that row and the end row (see getEndPosInDataBlock), at
 * most pReader->capacity rows are copied per call. pDumpInfo->rowIndex is advanced past the copied
 * rows. The block is marked as fully dumped once its rows are exhausted, once the next row falls
 * outside the query window, or when no row of it lies inside the window.
 *
 * pBlockScanInfo is currently unused. Always returns TSDB_CODE_SUCCESS.
 */
static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo) {
  SReaderStatus*      pStatus = &pReader->status;
  SDataBlockIter*     pBlockIter = &pStatus->blockIter;
  SBlockData*         pBlockData = &pStatus->fileBlockData;
  SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(pBlockIter);
  SDataBlk*           pBlock = getCurrentBlock(pBlockIter);
  SSDataBlock*        pResBlock = pReader->pResBlock;
  int32_t             numOfOutputCols = blockDataGetNumOfCols(pResBlock);
  SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo;
  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;

  SColVal cv = {0};
  int64_t st = taosGetTimestampUs();
  bool    asc = ASCENDING_TRAVERSE(pReader->order);
  int32_t step = asc ? 1 : -1;

  // first visit of this block: skip the leading rows (in scan order) that are outside the time window
  if ((pDumpInfo->rowIndex == 0 && asc) || (pDumpInfo->rowIndex == pBlock->nRow - 1 && (!asc))) {
    bool startsInWindow =
        asc ? (pReader->window.skey <= pBlock->minKey.ts) : (pReader->window.ekey >= pBlock->maxKey.ts);
    if (!startsInWindow) {
      // search from the far end of the block towards the scan start, i.e. in the reverse order.
      // An ascending scan starts at window.skey, a descending scan at window.ekey.
      int32_t pos = asc ? pBlock->nRow - 1 : 0;
      int32_t order = asc ? TSDB_ORDER_DESC : TSDB_ORDER_ASC;
      int64_t key = asc ? pReader->window.skey : pReader->window.ekey;
      int32_t startIndex = doBinarySearchKey(pBlockData->aTSKEY, pBlock->nRow, pos, key, order);
      if (startIndex < 0) {  // no row of this block lies inside the query time window
        setBlockAllDumped(pDumpInfo, asc ? pBlock->maxKey.ts : pBlock->minKey.ts, pReader->order);
        return TSDB_CODE_SUCCESS;
      }

      pDumpInfo->rowIndex = startIndex;
    }
  }

  // time window check
  int32_t endIndex = getEndPosInDataBlock(pReader, pBlockData, pBlock, pDumpInfo->rowIndex);
  if (endIndex == -1) {
    setBlockAllDumped(pDumpInfo, pReader->window.ekey, pReader->order);
    return TSDB_CODE_SUCCESS;
  }

  endIndex += step;
  int32_t remain = asc ? (endIndex - pDumpInfo->rowIndex) : (pDumpInfo->rowIndex - endIndex);
  if (remain > pReader->capacity) {  // output buffer check
    remain = pReader->capacity;
  }

  int32_t i = 0;
  int32_t rowIndex = 0;

  // the primary timestamp column, when requested, is the first output column
  SColumnInfoData* pColData = taosArrayGet(pResBlock->pDataBlock, i);
  if (pColData->info.colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
    if (asc) {
      memcpy(pColData->pData, &pBlockData->aTSKEY[pDumpInfo->rowIndex], remain * sizeof(int64_t));
    } else {
      for (int32_t j = pDumpInfo->rowIndex; rowIndex < remain; j += step) {
        colDataAppendInt64(pColData, rowIndex++, &pBlockData->aTSKEY[j]);
      }
    }

    i += 1;
  }

  // walk output columns and block columns side by side (the walk assumes both are ordered by column id)
  int32_t colIndex = 0;
  int32_t num = taosArrayGetSize(pBlockData->aIdx);
  while (i < numOfOutputCols && colIndex < num) {
    rowIndex = 0;
    pColData = taosArrayGet(pResBlock->pDataBlock, i);

    SColData* pData = tBlockDataGetColDataByIdx(pBlockData, colIndex);
    if (pData->cid < pColData->info.colId) {
      colIndex += 1;
    } else if (pData->cid == pColData->info.colId) {
      if (pData->flag == HAS_NONE || pData->flag == HAS_NULL || pData->flag == (HAS_NULL | HAS_NONE)) {
        colDataAppendNNULL(pColData, 0, remain);
      } else {
        if (IS_NUMERIC_TYPE(pColData->info.type) && asc) {
          uint8_t* p = pData->pData + tDataTypes[pData->type].bytes * pDumpInfo->rowIndex;
          memcpy(pColData->pData, p, remain * tDataTypes[pData->type].bytes);

          // null value exists, check one-by-one (bit values 0 and 1 are treated as NULL)
          if (pData->flag != HAS_VALUE) {
            for (int32_t j = pDumpInfo->rowIndex; rowIndex < remain; j += step, rowIndex++) {
              uint8_t v = tColDataGetBitValue(pData, j);
              if (v == 0 || v == 1) {
                colDataSetNull_f(pColData->nullbitmap, rowIndex);
              }
            }
          }
        } else {
          for (int32_t j = pDumpInfo->rowIndex; rowIndex < remain; j += step) {
            tColDataGetValue(pData, j, &cv);
            doCopyColVal(pColData, rowIndex++, i, &cv, pSupInfo);
          }
        }
      }

      colIndex += 1;
      i += 1;
    } else {  // the specified column does not exist in file block, fill with null data
      colDataAppendNNULL(pColData, 0, remain);
      i += 1;
    }
  }

  // fill the mis-matched columns with null value
  while (i < numOfOutputCols) {
    pColData = taosArrayGet(pResBlock->pDataBlock, i);
    colDataAppendNNULL(pColData, 0, remain);
    i += 1;
  }

  pResBlock->info.rows = remain;
  pDumpInfo->rowIndex += step * remain;

  // check if current block are all handled
  if (pDumpInfo->rowIndex >= 0 && pDumpInfo->rowIndex < pBlock->nRow) {
    int64_t ts = pBlockData->aTSKEY[pDumpInfo->rowIndex];
    if (outOfTimeWindow(ts, &pReader->window)) {  // the remain data has out of query time window, ignore current block
      setBlockAllDumped(pDumpInfo, ts, pReader->order);
    }
  } else {
    int64_t ts = asc ? pBlock->maxKey.ts : pBlock->minKey.ts;
    setBlockAllDumped(pDumpInfo, ts, pReader->order);
  }

  double elapsedTime = (taosGetTimestampUs() - st) / 1000.0;
  pReader->cost.blockLoadTime += elapsedTime;

  int32_t unDumpedRows = asc ? pBlock->nRow - pDumpInfo->rowIndex : pDumpInfo->rowIndex + 1;
  tsdbDebug("%p copy file block to sdatablock, global index:%d, table index:%d, brange:%" PRId64 "-%" PRId64
                ", rows:%d, remain:%d, minVer:%" PRId64 ", maxVer:%" PRId64 ", elapsed time:%.2f ms, %s",
            pReader, pBlockIter->index, pBlockInfo->tbBlockIdx, pBlock->minKey.ts, pBlock->maxKey.ts, remain,
            unDumpedRows, pBlock->minVer, pBlock->maxVer, elapsedTime, pReader->idStr);

  return TSDB_CODE_SUCCESS;
}

973 974
/*
 * Read the data of the iterator's current file block of table `uid` into pBlockData.
 *
 * pBlockData is reset and initialised for the reader's schema with the requested column ids,
 * skipping suppInfo.colIds[0]. On success, the dump state of the reader is re-armed
 * (allDumped = false) and the load time is added to pReader->cost.blockLoadTime.
 * Returns the failing code of tBlockDataInit or tsdbReadDataBlock otherwise.
 */
static int32_t doLoadFileBlockData(STsdbReader* pReader, SDataBlockIter* pBlockIter, SBlockData* pBlockData,
                                   uint64_t uid) {
  int64_t startUs = taosGetTimestampUs();

  tBlockDataReset(pBlockData);

  TABLEID tid = {.suid = pReader->suid, .uid = uid};
  int32_t ret = tBlockDataInit(pBlockData, &tid, pReader->pSchema, &pReader->suppInfo.colIds[1],
                               pReader->suppInfo.numOfCols - 1);
  if (ret != TSDB_CODE_SUCCESS) {
    return ret;
  }

  SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(pBlockIter);
  ASSERT(pBlockInfo != NULL);

  SDataBlk* pBlock = getCurrentBlock(pBlockIter);
  ret = tsdbReadDataBlock(pReader->pFileReader, pBlock, pBlockData);
  if (ret != TSDB_CODE_SUCCESS) {
    tsdbError("%p error occurs in loading file block, global index:%d, table index:%d, brange:%" PRId64 "-%" PRId64
                  ", rows:%d, code:%s %s",
              pReader, pBlockIter->index, pBlockInfo->tbBlockIdx, pBlock->minKey.ts, pBlock->maxKey.ts, pBlock->nRow,
              tstrerror(ret), pReader->idStr);
    return ret;
  }

  double elapsedTime = (taosGetTimestampUs() - startUs) / 1000.0;
  tsdbDebug("%p load file block into buffer, global index:%d, index in table block list:%d, brange:%" PRId64 "-%" PRId64
                ", rows:%d, minVer:%" PRId64 ", maxVer:%" PRId64 ", elapsed time:%.2f ms, %s",
            pReader, pBlockIter->index, pBlockInfo->tbBlockIdx, pBlock->minKey.ts, pBlock->maxKey.ts, pBlock->nRow,
            pBlock->minVer, pBlock->maxVer, elapsedTime, pReader->idStr);

  pReader->cost.blockLoadTime += elapsedTime;
  pReader->status.fBlockDumpInfo.allDumped = false;
  return TSDB_CODE_SUCCESS;
}
H
Hongze Cheng 已提交
1011

H
Haojun Liao 已提交
1012 1013 1014
/*
 * Release every buffer owned by the block order supporter: the per-table counters, the per-table
 * block-wrapper arrays of the first numOfTables tables, and the pointer array itself.
 */
static void cleanupBlockOrderSupporter(SBlockOrderSupporter* pSup) {
  taosMemoryFreeClear(pSup->numOfBlocksPerTable);
  taosMemoryFreeClear(pSup->indexPerTable);

  int32_t t = 0;
  while (t < pSup->numOfTables) {
    SBlockOrderWrapper* pWrappers = pSup->pDataBlockInfo[t];
    taosMemoryFreeClear(pWrappers);
    t += 1;
  }

  taosMemoryFreeClear(pSup->pDataBlockInfo);
}
H
Hongze Cheng 已提交
1023

H
Haojun Liao 已提交
1024 1025
/*
 * Allocate the zero-filled per-table arrays (block counts, per-table cursors and block-wrapper
 * pointers) used to order file blocks across numOfTables tables.
 * Returns TSDB_CODE_OUT_OF_MEMORY, after releasing partial allocations, if any allocation fails.
 */
static int32_t initBlockOrderSupporter(SBlockOrderSupporter* pSup, int32_t numOfTables) {
  ASSERT(numOfTables >= 1);

  size_t counterBytes = sizeof(int32_t) * numOfTables;
  pSup->numOfBlocksPerTable = taosMemoryCalloc(1, counterBytes);
  pSup->indexPerTable = taosMemoryCalloc(1, counterBytes);
  pSup->pDataBlockInfo = taosMemoryCalloc(1, POINTER_BYTES * numOfTables);

  if (pSup->numOfBlocksPerTable != NULL && pSup->indexPerTable != NULL && pSup->pDataBlockInfo != NULL) {
    return TSDB_CODE_SUCCESS;
  }

  cleanupBlockOrderSupporter(pSup);
  return TSDB_CODE_OUT_OF_MEMORY;
}
H
Hongze Cheng 已提交
1038

H
Haojun Liao 已提交
1039
static int32_t fileDataBlockOrderCompar(const void* pLeft, const void* pRight, void* param) {
1040
  int32_t leftIndex = *(int32_t*)pLeft;
H
Haojun Liao 已提交
1041
  int32_t rightIndex = *(int32_t*)pRight;
H
Hongze Cheng 已提交
1042

H
Haojun Liao 已提交
1043
  SBlockOrderSupporter* pSupporter = (SBlockOrderSupporter*)param;
H
Hongze Cheng 已提交
1044

H
Haojun Liao 已提交
1045 1046
  int32_t leftTableBlockIndex = pSupporter->indexPerTable[leftIndex];
  int32_t rightTableBlockIndex = pSupporter->indexPerTable[rightIndex];
H
Hongze Cheng 已提交
1047

H
Haojun Liao 已提交
1048 1049 1050 1051 1052 1053 1054
  if (leftTableBlockIndex > pSupporter->numOfBlocksPerTable[leftIndex]) {
    /* left block is empty */
    return 1;
  } else if (rightTableBlockIndex > pSupporter->numOfBlocksPerTable[rightIndex]) {
    /* right block is empty */
    return -1;
  }
H
Hongze Cheng 已提交
1055

1056
  SBlockOrderWrapper* pLeftBlock = &pSupporter->pDataBlockInfo[leftIndex][leftTableBlockIndex];
H
Haojun Liao 已提交
1057
  SBlockOrderWrapper* pRightBlock = &pSupporter->pDataBlockInfo[rightIndex][rightTableBlockIndex];
H
Hongze Cheng 已提交
1058

1059 1060 1061
  return pLeftBlock->offset > pRightBlock->offset ? 1 : -1;
}

H
Haojun Liao 已提交
1062
/*
 * Decode the SDataBlk at the iterator's current position into pBlockIter->block.
 *
 * Does nothing and succeeds when the iterator is empty. Returns TSDB_CODE_INVALID_PARA, after
 * logging, when the block's table uid is not present in pBlockIter->pTableMap.
 */
static int32_t doSetCurrentBlock(SDataBlockIter* pBlockIter, const char* idStr) {
  SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(pBlockIter);
  if (pBlockInfo == NULL) {
    return TSDB_CODE_SUCCESS;
  }

  STableBlockScanInfo* pScanInfo = taosHashGet(pBlockIter->pTableMap, &pBlockInfo->uid, sizeof(pBlockInfo->uid));
  if (pScanInfo == NULL) {
    tsdbError("failed to locate the uid:%" PRIu64 " in query table uid list, %s", pBlockInfo->uid, idStr);
    return TSDB_CODE_INVALID_PARA;
  }

  // map the table-local block position to the ordinal of the block in the compressed block map
  SBlockIndex* pIndex = taosArrayGet(pScanInfo->pBlockList, pBlockInfo->tbBlockIdx);
  tMapDataGetItemByIdx(&pScanInfo->mapData, pIndex->ordinalIndex, &pBlockIter->block, tGetDataBlk);
  return TSDB_CODE_SUCCESS;
}
H
Hongze Cheng 已提交
1081

1082
/*
 * Build the block access order for the current file.
 *
 * Every table in pReader->status.pTableMap contributes the blocks listed in its pBlockList. With a
 * single qualified table, the blocks keep their per-table order. Otherwise the blocks of all tables
 * are merged by in-file offset through a multiway merge tree. The iterator is then positioned on the
 * first block in scan order: index 0 for ascending, numOfBlocks - 1 for descending.
 *
 * Returns TSDB_CODE_SUCCESS. On failure it returns an error code: allocation failure, or the code of
 * doSetCurrentBlock when the first block cannot be located.
 */
static int32_t initBlockIterator(STsdbReader* pReader, SDataBlockIter* pBlockIter, int32_t numOfBlocks) {
  bool asc = ASCENDING_TRAVERSE(pReader->order);

  SBlockOrderSupporter sup = {0};
  pBlockIter->numOfBlocks = numOfBlocks;
  taosArrayClear(pBlockIter->blockList);
  pBlockIter->pTableMap = pReader->status.pTableMap;

  // access data blocks according to the offset of each block in asc/desc order.
  int32_t numOfTables = (int32_t)taosHashGetSize(pReader->status.pTableMap);

  int64_t st = taosGetTimestampUs();
  int32_t code = initBlockOrderSupporter(&sup, numOfTables);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  int32_t cnt = 0;
  void*   ptr = NULL;
  while ((ptr = taosHashIterate(pReader->status.pTableMap, ptr)) != NULL) {
    STableBlockScanInfo* pTableScanInfo = (STableBlockScanInfo*)ptr;
    if (pTableScanInfo->pBlockList == NULL || taosArrayGetSize(pTableScanInfo->pBlockList) == 0) {
      continue;
    }

    size_t num = taosArrayGetSize(pTableScanInfo->pBlockList);
    sup.numOfBlocksPerTable[sup.numOfTables] = num;

    SBlockOrderWrapper* pWrappers = taosMemoryMalloc(sizeof(SBlockOrderWrapper) * num);
    if (pWrappers == NULL) {
      // leaving the hash iteration early: the iterator must be released explicitly
      taosHashCancelIterate(pReader->status.pTableMap, ptr);
      cleanupBlockOrderSupporter(&sup);
      return TSDB_CODE_TDB_OUT_OF_MEMORY;
    }

    sup.pDataBlockInfo[sup.numOfTables] = pWrappers;
    for (int32_t k = 0; k < num; ++k) {
      SBlockIndex* pIndex = taosArrayGet(pTableScanInfo->pBlockList, k);
      pWrappers[k] = (SBlockOrderWrapper){.uid = pTableScanInfo->uid, .offset = pIndex->inFileOffset};
      cnt++;
    }

    sup.numOfTables += 1;
  }

  ASSERT(numOfBlocks == cnt);

  // since there is only one table qualified, blocks are not sorted
  if (sup.numOfTables == 1) {
    for (int32_t i = 0; i < numOfBlocks; ++i) {
      SFileDataBlockInfo blockInfo = {.uid = sup.pDataBlockInfo[0][i].uid, .tbBlockIdx = i};
      taosArrayPush(pBlockIter->blockList, &blockInfo);
    }

    int64_t et = taosGetTimestampUs();
    tsdbDebug("%p create blocks info struct completed for one table, %d blocks not sorted, elapsed time:%.2f ms %s",
              pReader, numOfBlocks, (et - st) / 1000.0, pReader->idStr);

    pBlockIter->index = asc ? 0 : (numOfBlocks - 1);
    cleanupBlockOrderSupporter(&sup);
    return doSetCurrentBlock(pBlockIter, pReader->idStr);
  }

  tsdbDebug("%p create data blocks info struct completed, %d blocks in %d tables %s", pReader, cnt, sup.numOfTables,
            pReader->idStr);

  ASSERT(cnt <= numOfBlocks && sup.numOfTables <= numOfTables);

  // keep the full int32_t error code: narrowing it (e.g. to uint8_t) may turn a failure into 0
  SMultiwayMergeTreeInfo* pTree = NULL;
  code = tMergeTreeCreate(&pTree, sup.numOfTables, &sup, fileDataBlockOrderCompar);
  if (code != TSDB_CODE_SUCCESS) {
    cleanupBlockOrderSupporter(&sup);
    return TSDB_CODE_TDB_OUT_OF_MEMORY;
  }

  int32_t numOfTotal = 0;
  while (numOfTotal < cnt) {
    int32_t pos = tMergeTreeGetChosenIndex(pTree);
    int32_t index = sup.indexPerTable[pos]++;

    SFileDataBlockInfo blockInfo = {.uid = sup.pDataBlockInfo[pos][index].uid, .tbBlockIdx = index};
    taosArrayPush(pBlockIter->blockList, &blockInfo);

    // set data block index overflow, in order to disable the offset comparator
    if (sup.indexPerTable[pos] >= sup.numOfBlocksPerTable[pos]) {
      sup.indexPerTable[pos] = sup.numOfBlocksPerTable[pos] + 1;
    }

    numOfTotal += 1;
    tMergeTreeAdjust(pTree, tMergeTreeGetAdjustIndex(pTree));
  }

  int64_t et = taosGetTimestampUs();
  tsdbDebug("%p %d data blocks access order completed, elapsed time:%.2f ms %s", pReader, numOfBlocks,
            (et - st) / 1000.0, pReader->idStr);
  cleanupBlockOrderSupporter(&sup);
  taosMemoryFree(pTree);

  pBlockIter->index = asc ? 0 : (numOfBlocks - 1);
  return doSetCurrentBlock(pBlockIter, pReader->idStr);
}
H
Hongze Cheng 已提交
1192

H
Haojun Liao 已提交
1193
/*
 * Advance the iterator by one block in scan order and decode the new current block.
 * Returns false, leaving the iterator unchanged, when it already sits on the last block in scan order.
 */
static bool blockIteratorNext(SDataBlockIter* pBlockIter, const char* idStr) {
  if (ASCENDING_TRAVERSE(pBlockIter->order)) {
    if (pBlockIter->index >= pBlockIter->numOfBlocks - 1) {
      return false;
    }
    pBlockIter->index += 1;
  } else {
    if (pBlockIter->index <= 0) {
      return false;
    }
    pBlockIter->index -= 1;
  }

  doSetCurrentBlock(pBlockIter, idStr);
  return true;
}

1207 1208 1209
/**
 * This is an two rectangles overlap cases.
 */
H
Hongze Cheng 已提交
1210
static int32_t dataBlockPartiallyRequired(STimeWindow* pWindow, SVersionRange* pVerRange, SDataBlk* pBlock) {
1211 1212
  return (pWindow->ekey < pBlock->maxKey.ts && pWindow->ekey >= pBlock->minKey.ts) ||
         (pWindow->skey > pBlock->minKey.ts && pWindow->skey <= pBlock->maxKey.ts) ||
H
Hongze Cheng 已提交
1213 1214
         (pVerRange->minVer > pBlock->minVer && pVerRange->minVer <= pBlock->maxVer) ||
         (pVerRange->maxVer < pBlock->maxVer && pVerRange->maxVer >= pBlock->minVer);
H
Haojun Liao 已提交
1215
}
H
Hongze Cheng 已提交
1216

1217
/*
 * Fetch the index entry of the block that follows pBlockInfo, in scan order, within the same table.
 * On success, stores its table-local position in *nextIndex and a copy of the entry in *pBlockIndex,
 * then returns true. Returns false when pBlockInfo is already the table's last block in scan order.
 */
static bool getNeighborBlockOfSameTable(SFileDataBlockInfo* pBlockInfo, STableBlockScanInfo* pTableBlockScanInfo,
                                        int32_t* nextIndex, int32_t order, SBlockIndex* pBlockIndex) {
  int32_t neighbor;

  if (ASCENDING_TRAVERSE(order)) {
    if (pBlockInfo->tbBlockIdx >= taosArrayGetSize(pTableBlockScanInfo->pBlockList) - 1) {
      return false;
    }
    neighbor = pBlockInfo->tbBlockIdx + 1;
  } else {
    if (pBlockInfo->tbBlockIdx == 0) {
      return false;
    }
    neighbor = pBlockInfo->tbBlockIdx - 1;
  }

  *nextIndex = neighbor;
  *pBlockIndex = *(SBlockIndex*)taosArrayGet(pTableBlockScanInfo->pBlockList, neighbor);
  return true;
}

/*
 * Search blockList for the entry describing the same block as pFBlockInfo (same uid and
 * tbBlockIdx). The search starts at the iterator's current index and moves in scan order. Returns its
 * position; asserts and returns -1 when the block is not found.
 */
static int32_t findFileBlockInfoIndex(SDataBlockIter* pBlockIter, SFileDataBlockInfo* pFBlockInfo) {
  ASSERT(pBlockIter != NULL && pFBlockInfo != NULL);

  int32_t step = ASCENDING_TRAVERSE(pBlockIter->order) ? 1 : -1;
  for (int32_t pos = pBlockIter->index; pos >= 0 && pos < pBlockIter->numOfBlocks; pos += step) {
    SFileDataBlockInfo* pCandidate = taosArrayGet(pBlockIter->blockList, pos);
    bool sameBlock = (pCandidate->uid == pFBlockInfo->uid) && (pCandidate->tbBlockIdx == pFBlockInfo->tbBlockIdx);
    if (sameBlock) {
      return pos;
    }
  }

  ASSERT(0);
  return -1;
}

1254
/*
 * Make the block at position `index` the next current block. The iterator advances by `step`, and
 * when the block is not already at the new position it is moved there: it is removed from its old
 * slot and inserted at the new index. The new current block is then decoded.
 * Returns -1 for an out-of-range index, TSDB_CODE_SUCCESS otherwise.
 */
static int32_t setFileBlockActiveInBlockIter(SDataBlockIter* pBlockIter, int32_t index, int32_t step) {
  bool outOfRange = (index < 0) || (index >= pBlockIter->numOfBlocks);
  if (outOfRange) {
    return -1;
  }

  SFileDataBlockInfo target = *(SFileDataBlockInfo*)taosArrayGet(pBlockIter->blockList, index);
  pBlockIter->index += step;

  int32_t newPos = pBlockIter->index;
  if (newPos != index) {
    taosArrayRemove(pBlockIter->blockList, index);
    taosArrayInsert(pBlockIter->blockList, newPos, &target);

    SFileDataBlockInfo* pMoved = taosArrayGet(pBlockIter->blockList, newPos);
    ASSERT(pMoved->uid == target.uid && pMoved->tbBlockIdx == target.tbBlockIdx);
  }

  doSetCurrentBlock(pBlockIter, "");
  return TSDB_CODE_SUCCESS;
}

1274
/*
 * A block and the next block of the same table (in scan order) overlap when they share their
 * boundary timestamp. For ascending scans, this block's maxKey equals the neighbor's window.skey;
 * for descending scans, this block's minKey equals the neighbor's window.ekey.
 */
static bool overlapWithNeighborBlock(SDataBlk* pBlock, SBlockIndex* pNeighborBlockIndex, int32_t order) {
  bool    asc = ASCENDING_TRAVERSE(order);
  int64_t ownEdge = asc ? pBlock->maxKey.ts : pBlock->minKey.ts;
  int64_t neighborEdge = asc ? pNeighborBlockIndex->window.skey : pNeighborBlockIndex->window.ekey;

  return ownEdge == neighborEdge;
}
H
Hongze Cheng 已提交
1282

H
Hongze Cheng 已提交
1283
/*
 * Whether the in-memory key comes before the file block in scan order: at or before minKey for
 * ascending scans, at or after maxKey for descending scans. A key equal to TSKEY_INITIAL_VAL never
 * qualifies.
 */
static bool bufferDataInFileBlockGap(int32_t order, TSDBKEY key, SDataBlk* pBlock) {
  if (key.ts == TSKEY_INITIAL_VAL) {
    return false;
  }

  if (ASCENDING_TRAVERSE(order)) {
    return key.ts <= pBlock->minKey.ts;
  }
  return key.ts >= pBlock->maxKey.ts;
}
H
Hongze Cheng 已提交
1289

H
Hongze Cheng 已提交
1290
/*
 * Whether `key` falls inside the block's [minKey.ts, maxKey.ts] time range while the block's
 * [minVer, maxVer] intersects the query version range.
 */
static bool keyOverlapFileBlock(TSDBKEY key, SDataBlk* pBlock, SVersionRange* pVerRange) {
  bool inTimeRange = (key.ts >= pBlock->minKey.ts) && (key.ts <= pBlock->maxKey.ts);
  if (!inTimeRange) {
    return false;
  }

  return (pBlock->maxVer >= pVerRange->minVer) && (pBlock->minVer <= pVerRange->maxVer);
}

H
Hongze Cheng 已提交
1295
/*
 * Scan the delete skyline of the table, starting at `startIndex`, and report whether a delete record
 * with version >= pBlock->minVer covers part of the block. That is the case for:
 *  - a skyline point inside [minKey.ts, maxKey.ts] with such a version, or
 *  - a point before minKey.ts with such a version whose successor reaches into the block.
 * The scan stops at the first point beyond maxKey.ts. The skyline is assumed sorted by ts.
 */
static bool doCheckforDatablockOverlapImpl(STableBlockScanInfo* pBlockScanInfo, const SDataBlk* pBlock,
                                           int32_t startIndex) {
  size_t num = taosArrayGetSize(pBlockScanInfo->delSkyline);

  for (int32_t i = startIndex; i < num; i += 1) {
    TSDBKEY* p = taosArrayGet(pBlockScanInfo->delSkyline, i);
    if (p->ts >= pBlock->minKey.ts && p->ts <= pBlock->maxKey.ts) {
      if (p->version >= pBlock->minVer) {
        return true;
      }
    } else if (p->ts < pBlock->minKey.ts) {  // p->ts < pBlock->minKey.ts
      if (p->version >= pBlock->minVer) {
        if (i < num - 1) {
          TSDBKEY* pnext = taosArrayGet(pBlockScanInfo->delSkyline, i + 1);
          if (pnext->ts >= pBlock->minKey.ts) {
            return true;
          }
        } else {  // it must be the last point
          ASSERT(p->version == 0);
        }
      }
    } else {  // (p->ts > pBlock->maxKey.ts) {
      return false;
    }
  }

  return false;
}

/* Delete-skyline overlap check starting from the table's current file delete index. */
static bool doCheckforDatablockOverlap(STableBlockScanInfo* pBlockScanInfo, const SDataBlk* pBlock) {
  return doCheckforDatablockOverlapImpl(pBlockScanInfo, pBlock, pBlockScanInfo->fileDelIndex);
}

/*
 * Whether the delete records of the table may hide rows of pBlock.
 *
 * Ascending scans check from the table's fileDelIndex onwards. Descending scans first walk back from
 * fileDelIndex to the first skyline point whose ts is not greater than the block's minKey.ts, and
 * check from there, so that delete records preceding fileDelIndex are taken into account.
 */
static bool overlapWithDelSkyline(STableBlockScanInfo* pBlockScanInfo, const SDataBlk* pBlock, int32_t order) {
  if (pBlockScanInfo->delSkyline == NULL) {
    return false;
  }

  // an empty skyline has neither a first nor a last point
  int32_t num = (int32_t)taosArrayGetSize(pBlockScanInfo->delSkyline);
  if (num == 0) {
    return false;
  }

  // ts is not overlap
  TSDBKEY* pFirst = taosArrayGet(pBlockScanInfo->delSkyline, 0);
  TSDBKEY* pLast = taosArrayGetLast(pBlockScanInfo->delSkyline);
  if (pBlock->minKey.ts > pLast->ts || pBlock->maxKey.ts < pFirst->ts) {
    return false;
  }

  // version is not overlap
  if (ASCENDING_TRAVERSE(order)) {
    return doCheckforDatablockOverlap(pBlockScanInfo, pBlock);
  }

  // keep the walk inside the skyline even if fileDelIndex is out of range
  int32_t index = pBlockScanInfo->fileDelIndex;
  if (index < 0) {
    index = 0;
  } else if (index >= num) {
    index = num - 1;
  }

  while (1) {
    TSDBKEY* p = taosArrayGet(pBlockScanInfo->delSkyline, index);
    if (p->ts > pBlock->minKey.ts && index > 0) {
      index -= 1;
    } else {  // find the first point that is smaller than the minKey.ts of dataBlock.
      break;
    }
  }

  // start the check from the point found above, not from fileDelIndex
  return doCheckforDatablockOverlapImpl(pBlockScanInfo, pBlock, index);
}

H
Haojun Liao 已提交
1353 1354 1355 1356 1357 1358 1359 1360 1361 1362 1363 1364 1365
// Reasons, filled in by getBlockToLoadInfo(), why a file block may need to be loaded and decoded.
typedef struct {
  bool overlapWithNeighborBlock;  // shares its boundary timestamp with the next block of the same table (scan order)
  bool hasDupTs;                  // block is flagged hasDup, or is made of more than one sub-block
  bool overlapWithDelInfo;        // overlaps the table's delete skyline
  bool overlapWithLastBlock;      // time range covers the current key of the last-block reader
  bool overlapWithKeyInBuf;       // time range covers the current in-memory key within the version range
  bool partiallyRequired;         // query time window or version range cuts into the block
  bool moreThanCapcity;           // block has more rows than the reader's output capacity
} SDataBlockToLoadInfo;

/*
 * Fill *pInfo with the reasons that would force pBlock to be loaded: overlap with the next block of
 * the same table, possibly duplicated timestamps, overlap with delete records, with the last-block
 * reader's current key, or with the in-memory key, partial coverage by the query window/version
 * range, and exceeding the output capacity.
 * overlapWithNeighborBlock and overlapWithLastBlock are only written when they can apply; the
 * visible callers pass a zero-initialised struct.
 */
static void getBlockToLoadInfo(SDataBlockToLoadInfo* pInfo, SFileDataBlockInfo* pBlockInfo, SDataBlk* pBlock,
                               STableBlockScanInfo* pScanInfo, TSDBKEY keyInBuf, SLastBlockReader* pLastBlockReader,
                               STsdbReader* pReader) {
  SBlockIndex neighbor = {0};
  int32_t     neighborPos = 0;
  if (getNeighborBlockOfSameTable(pBlockInfo, pScanInfo, &neighborPos, pReader->order, &neighbor)) {
    pInfo->overlapWithNeighborBlock = overlapWithNeighborBlock(pBlock, &neighbor, pReader->order);
  }

  // a block made of several sub-blocks is always treated as possibly holding duplicated timestamps
  pInfo->hasDupTs = (pBlock->nSubBlock != 1) ? true : pBlock->hasDup;
  pInfo->overlapWithDelInfo = overlapWithDelSkyline(pScanInfo, pBlock, pReader->order);

  if (hasDataInLastBlock(pLastBlockReader)) {
    int64_t lastBlockKey = getCurrentKeyInLastBlock(pLastBlockReader);
    pInfo->overlapWithLastBlock = (lastBlockKey >= pBlock->minKey.ts) && (lastBlockKey <= pBlock->maxKey.ts);
  }

  pInfo->moreThanCapcity = pBlock->nRow > pReader->capacity;
  pInfo->partiallyRequired = dataBlockPartiallyRequired(&pReader->window, &pReader->verRange, pBlock);
  pInfo->overlapWithKeyInBuf = keyOverlapFileBlock(keyInBuf, pBlock, &pReader->verRange);
}
1389

H
Haojun Liao 已提交
1390 1391 1392 1393 1394 1395 1396 1397 1398 1399 1400 1401 1402 1403
/*
 * Decide whether the file block has to be loaded and decoded. Loading is required when any of these
 * holds:
 *  - it overlaps the next block of the same table (in scan order);
 *  - it may contain duplicated timestamps;
 *  - the query time window or version range only covers part of it;
 *  - the current in-memory key falls inside its time range;
 *  - it holds more rows than the output capacity;
 *  - it overlaps delete records;
 *  - it overlaps the current key of the last-block reader.
 * When loading is required, the individual reasons are written to the debug log.
 */
static bool fileBlockShouldLoad(STsdbReader* pReader, SFileDataBlockInfo* pBlockInfo, SDataBlk* pBlock,
                                STableBlockScanInfo* pScanInfo, TSDBKEY keyInBuf, SLastBlockReader* pLastBlockReader) {
  SDataBlockToLoadInfo reason = {0};
  getBlockToLoadInfo(&reason, pBlockInfo, pBlock, pScanInfo, keyInBuf, pLastBlockReader, pReader);

  bool noReason = !reason.overlapWithNeighborBlock && !reason.hasDupTs && !reason.partiallyRequired &&
                  !reason.overlapWithKeyInBuf && !reason.moreThanCapcity && !reason.overlapWithDelInfo &&
                  !reason.overlapWithLastBlock;
  if (noReason) {
    return false;
  }

  // record why the block is loaded, for profiling
  tsdbDebug("%p uid:%" PRIu64
            " need to load the datablock, overlapwithneighborblock:%d, hasDup:%d, partiallyRequired:%d, "
            "overlapWithKey:%d, greaterThanBuf:%d, overlapWithDel:%d, overlapWithlastBlock:%d, %s",
            pReader, pBlockInfo->uid, reason.overlapWithNeighborBlock, reason.hasDupTs, reason.partiallyRequired,
            reason.overlapWithKeyInBuf, reason.moreThanCapcity, reason.overlapWithDelInfo,
            reason.overlapWithLastBlock, pReader->idStr);
  return true;
}

H
Haojun Liao 已提交
1418 1419 1420 1421 1422 1423 1424 1425 1426
// A file block is "clean" when it neither overlaps a neighbour block, the buffered key, the delete info or
// the last (stt) blocks, nor contains duplicated timestamps. Unlike fileBlockShouldLoad(), partial window
// coverage and output capacity are not considered here.
static bool isCleanFileDataBlock(STsdbReader* pReader, SFileDataBlockInfo* pBlockInfo, SDataBlk* pBlock,
                                 STableBlockScanInfo* pScanInfo, TSDBKEY keyInBuf, SLastBlockReader* pLastBlockReader) {
  SDataBlockToLoadInfo loadInfo = {0};
  getBlockToLoadInfo(&loadInfo, pBlockInfo, pBlock, pScanInfo, keyInBuf, pLastBlockReader, pReader);

  if (loadInfo.overlapWithNeighborBlock || loadInfo.hasDupTs || loadInfo.overlapWithKeyInBuf) {
    return false;
  }

  return !(loadInfo.overlapWithDelInfo || loadInfo.overlapWithLastBlock);
}

1427
// Fill pReader->pResBlock with rows of one table taken from the in-memory buffers (mem / imem), up to endKey.
// Nothing is done when neither buffer iterator has a value. The result block's time window, uid and the
// composed-block flag are updated regardless of the code returned by buildDataBlockFromBufImpl(), which is
// passed through to the caller. Elapsed time is accumulated into pReader->cost.buildmemBlock.
static int32_t buildDataBlockFromBuf(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo, int64_t endKey) {
  bool hasBufferedRows = pBlockScanInfo->iiter.hasVal || pBlockScanInfo->iter.hasVal;
  if (!hasBufferedRows) {
    return TSDB_CODE_SUCCESS;
  }

  SSDataBlock* pResBlock = pReader->pResBlock;
  int64_t      startUs = taosGetTimestampUs();

  int32_t code = buildDataBlockFromBufImpl(pBlockScanInfo, endKey, pReader->capacity, pReader);

  blockDataUpdateTsWindow(pResBlock, 0);
  pResBlock->info.uid = pBlockScanInfo->uid;
  setComposedBlockFlag(pReader, true);

  double elapsedMs = (taosGetTimestampUs() - startUs) / 1000.0;
  tsdbDebug("%p build data block from cache completed, elapsed time:%.2f ms, numOfRows:%d, brange:%" PRId64
            " - %" PRId64 " %s",
            pReader, elapsedMs, pResBlock->info.rows, pResBlock->info.window.skey, pResBlock->info.window.ekey,
            pReader->idStr);

  pReader->cost.buildmemBlock += elapsedMs;
  return code;
}

1452 1453
// Fast path for emitting the current file-block row without running the row merger.
// The row is appended (and rowIndex advanced one step in scan order) only when it is not the last row of the
// block in scan order and the next row in scan order carries a different timestamp than key.
// Returns true if the row was appended, false if the caller has to merge duplicated timestamps.
static bool tryCopyDistinctRowFromFileBlock(STsdbReader* pReader, SBlockData* pBlockData, int64_t key,
                                            SFileBlockDumpInfo* pDumpInfo) {
  int32_t step = 0;
  if (pReader->order == TSDB_ORDER_ASC && pDumpInfo->rowIndex < pDumpInfo->totalRows - 1) {
    step = 1;
  } else if (pReader->order == TSDB_ORDER_DESC && pDumpInfo->rowIndex > 0) {
    step = -1;
  } else {
    // border row: its neighbour is outside this block, so a duplicate cannot be ruled out here
    return false;
  }

  if (pBlockData->aTSKEY[pDumpInfo->rowIndex + step] == key) {
    return false;  // duplicated timestamp, merge is required
  }

  doAppendRowFromFileBlock(pReader->pResBlock, pReader, pBlockData, pDumpInfo->rowIndex);
  pDumpInfo->rowIndex += step;
  return true;
}

1472 1473
// Advance the last-block (stt) merge tree to the next row that is not removed by the table's delete skyline
// (checked with hasBeenDropped() against pVerRange). Returns false once the merge tree is exhausted.
static bool nextRowFromLastBlocks(SLastBlockReader* pLastBlockReader, STableBlockScanInfo* pBlockScanInfo,
                                  SVersionRange* pVerRange) {
  while (tMergeTreeNext(&pLastBlockReader->mergeTree)) {
    TSDBROW candidate = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
    TSDBKEY candidateKey = TSDBROW_KEY(&candidate);

    bool dropped = hasBeenDropped(pBlockScanInfo->delSkyline, &pBlockScanInfo->lastBlockDelIndex, &candidateKey,
                                  pLastBlockReader->order, pVerRange);
    if (!dropped) {
      return true;
    }
  }

  return false;
}

// Advance the last-block reader past fRow. If no further row exists, or the next row has a timestamp other
// than ts, fRow is appended to the result block directly and true is returned. Returns false when the next
// row shares ts, so the caller must merge; note the merge tree has already been advanced in that case.
static bool tryCopyDistinctRowFromSttBlock(TSDBROW* fRow, SLastBlockReader* pLastBlockReader,
                                           STableBlockScanInfo* pScanInfo, int64_t ts, STsdbReader* pReader) {
  bool hasNext = nextRowFromLastBlocks(pLastBlockReader, pScanInfo, &pReader->verRange);
  if (hasNext && getCurrentKeyInLastBlock(pLastBlockReader) == ts) {
    return false;
  }

  doAppendRowFromFileBlock(pReader->pResBlock, pReader, fRow->pBlockData, fRow->iRow);
  return true;
}

H
Haojun Liao 已提交
1506 1507 1508
// Return the table schema that matches the schema version (sversion) of a buffered row.
//
// The newest schema is lazily loaded into pReader->pSchema (metaGetTbTSchema with version -1). Rows written
// with another version are served from pReader->pMemSchema, a one-entry cache that is freed and re-fetched
// whenever a different version is requested.
//
// Returns NULL on failure. terrno is set only when metaGetTbTSchemaEx() reports an error; a lookup that
// succeeds but yields no schema no longer resets terrno to TSDB_CODE_SUCCESS.
static FORCE_INLINE STSchema* doGetSchemaForTSRow(int32_t sversion, STsdbReader* pReader, uint64_t uid) {
  // always set the newest schema version in pReader->pSchema
  if (pReader->pSchema == NULL) {
    pReader->pSchema = metaGetTbTSchema(pReader->pTsdb->pVnode->pMeta, uid, -1, 1);
  }

  if (pReader->pSchema != NULL && sversion == pReader->pSchema->version) {
    return pReader->pSchema;
  }

  // one-entry cache for a non-newest schema version
  if (pReader->pMemSchema != NULL) {
    if (pReader->pMemSchema->version == sversion) {
      return pReader->pMemSchema;
    }
    taosMemoryFreeClear(pReader->pMemSchema);
  }

  int32_t code = metaGetTbTSchemaEx(pReader->pTsdb->pVnode->pMeta, pReader->suid, uid, sversion, &pReader->pMemSchema);
  if (code != TSDB_CODE_SUCCESS) {
    terrno = code;
    return NULL;
  }

  return pReader->pMemSchema;
}

1541
static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo, TSDBROW* pRow,
1542 1543 1544 1545 1546 1547
                                     SIterInfo* pIter, int64_t key, SLastBlockReader* pLastBlockReader) {
  SRowMerger          merge = {0};
  STSRow*             pTSRow = NULL;
  SBlockData*         pBlockData = &pReader->status.fileBlockData;
  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;

1548
  int64_t tsLast = INT64_MIN;
1549
  if (hasDataInLastBlock(pLastBlockReader)) {
1550 1551
    tsLast = getCurrentKeyInLastBlock(pLastBlockReader);
  }
1552

H
Hongze Cheng 已提交
1553 1554
  TSDBKEY k = TSDBROW_KEY(pRow);
  TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
1555

1556 1557
  int64_t minKey = 0;
  if (pReader->order == TSDB_ORDER_ASC) {
H
Hongze Cheng 已提交
1558
    minKey = INT64_MAX;  // chosen the minimum value
1559
    if (minKey > tsLast && hasDataInLastBlock(pLastBlockReader)) {
1560 1561
      minKey = tsLast;
    }
1562

1563 1564 1565
    if (minKey > k.ts) {
      minKey = k.ts;
    }
1566

1567
    if (minKey > key && hasDataInFileBlock(pBlockData, pDumpInfo)) {
1568 1569 1570 1571
      minKey = key;
    }
  } else {
    minKey = INT64_MIN;
1572
    if (minKey < tsLast && hasDataInLastBlock(pLastBlockReader)) {
1573 1574 1575 1576 1577 1578 1579
      minKey = tsLast;
    }

    if (minKey < k.ts) {
      minKey = k.ts;
    }

1580
    if (minKey < key && hasDataInFileBlock(pBlockData, pDumpInfo)) {
1581 1582
      minKey = key;
    }
1583 1584 1585 1586
  }

  bool init = false;

1587
  // ASC: file block ---> last block -----> imem -----> mem
H
Hongze Cheng 已提交
1588
  // DESC: mem -----> imem -----> last block -----> file block
1589 1590
  if (pReader->order == TSDB_ORDER_ASC) {
    if (minKey == key) {
1591
      init = true;
H
Haojun Liao 已提交
1592 1593 1594 1595
      int32_t code = tRowMergerInit(&merge, &fRow, pReader->pSchema);
      if (code != TSDB_CODE_SUCCESS) {
        return code;
      }
1596
      doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
1597 1598
    }

1599
    if (minKey == tsLast) {
1600
      TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
H
Haojun Liao 已提交
1601 1602 1603
      if (init) {
        tRowMerge(&merge, &fRow1);
      } else {
1604
        init = true;
H
Haojun Liao 已提交
1605 1606 1607 1608
        int32_t code = tRowMergerInit(&merge, &fRow1, pReader->pSchema);
        if (code != TSDB_CODE_SUCCESS) {
          return code;
        }
1609
      }
1610
      doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, &merge, &pReader->verRange);
1611
    }
1612

1613
    if (minKey == k.ts) {
H
Haojun Liao 已提交
1614 1615 1616
      if (init) {
        tRowMerge(&merge, pRow);
      } else {
1617 1618
        init = true;
        STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid);
H
Haojun Liao 已提交
1619 1620 1621 1622 1623 1624 1625 1626
        int32_t   code = tRowMergerInit(&merge, pRow, pSchema);
        if (code != TSDB_CODE_SUCCESS) {
          return code;
        }
      }
      int32_t code = doMergeRowsInBuf(pIter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge, pReader);
      if (code != TSDB_CODE_SUCCESS) {
        return code;
1627 1628 1629 1630 1631
      }
    }
  } else {
    if (minKey == k.ts) {
      init = true;
1632
      STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid);
H
Haojun Liao 已提交
1633 1634 1635 1636 1637 1638
      int32_t   code = tRowMergerInit(&merge, pRow, pSchema);
      if (code != TSDB_CODE_SUCCESS) {
        return code;
      }

      code = doMergeRowsInBuf(pIter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge, pReader);
1639
      if (code != TSDB_CODE_SUCCESS || merge.pTSchema == NULL) {
H
Haojun Liao 已提交
1640 1641
        return code;
      }
1642 1643
    }

1644
    if (minKey == tsLast) {
1645
      TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
H
Haojun Liao 已提交
1646 1647 1648
      if (init) {
        tRowMerge(&merge, &fRow1);
      } else {
1649
        init = true;
H
Haojun Liao 已提交
1650 1651 1652 1653
        int32_t code = tRowMergerInit(&merge, &fRow1, pReader->pSchema);
        if (code != TSDB_CODE_SUCCESS) {
          return code;
        }
1654
      }
1655
      doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, &merge, &pReader->verRange);
1656 1657 1658
    }

    if (minKey == key) {
H
Haojun Liao 已提交
1659 1660 1661
      if (init) {
        tRowMerge(&merge, &fRow);
      } else {
1662
        init = true;
H
Haojun Liao 已提交
1663 1664 1665 1666
        int32_t code = tRowMergerInit(&merge, &fRow, pReader->pSchema);
        if (code != TSDB_CODE_SUCCESS) {
          return code;
        }
1667 1668 1669
      }
      doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
    }
1670 1671
  }

1672 1673 1674 1675 1676
  int32_t code = tRowMergerGetRow(&merge, &pTSRow);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

1677 1678 1679 1680 1681 1682 1683
  doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo->uid);

  taosMemoryFree(pTSRow);
  tRowMergerClear(&merge);
  return TSDB_CODE_SUCCESS;
}

1684 1685 1686
// Emit the row at the head of the last-block (stt) reader, merged with all following last-block rows that
// share its timestamp. When mergeBlockData is true and the current file-block row has the same timestamp,
// the duplicated file-block rows are merged in as well. pBlockData may be NULL when mergeBlockData is false.
// The merger is released on every path after a successful tRowMergerInit() (previously leaked when
// tRowMergerGetRow() failed).
static int32_t doMergeFileBlockAndLastBlock(SLastBlockReader* pLastBlockReader, STsdbReader* pReader,
                                            STableBlockScanInfo* pBlockScanInfo, SBlockData* pBlockData,
                                            bool mergeBlockData) {
  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
  int64_t             tsLastBlock = getCurrentKeyInLastBlock(pLastBlockReader);

  STSRow*    pTSRow = NULL;
  SRowMerger merge = {0};
  TSDBROW    fRow = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
  tsdbTrace("fRow ptr:%p, %d, uid:%" PRIu64 ", %s", fRow.pBlockData, fRow.iRow, pLastBlockReader->uid, pReader->idStr);

  int32_t code = TSDB_CODE_SUCCESS;

  if ((!mergeBlockData) || (tsLastBlock != pBlockData->aTSKEY[pDumpInfo->rowIndex])) {
    // the file block does not take part: only the last-block rows with this timestamp are merged
    if (tryCopyDistinctRowFromSttBlock(&fRow, pLastBlockReader, pBlockScanInfo, tsLastBlock, pReader)) {
      return TSDB_CODE_SUCCESS;
    }

    code = tRowMergerInit(&merge, &fRow, pReader->pSchema);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    // tryCopyDistinctRowFromSttBlock() already advanced the reader onto the duplicated row
    TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
    tRowMerge(&merge, &fRow1);
    doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLastBlock, &merge, &pReader->verRange);
  } else {  // the current file-block row shares the timestamp: merge both sources
    code = tRowMergerInit(&merge, &fRow, pReader->pSchema);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLastBlock, &merge, &pReader->verRange);

    // merge with block data if ts == key
    if (tsLastBlock == pBlockData->aTSKEY[pDumpInfo->rowIndex]) {
      doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
    }
  }

  code = tRowMergerGetRow(&merge, &pTSRow);
  if (code == TSDB_CODE_SUCCESS) {
    doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo->uid);
    taosMemoryFree(pTSRow);
  }

  tRowMergerClear(&merge);
  return code;
}

1747 1748
// Produce the next row when the in-memory buffers have no data, i.e. only the file block and/or the last
// (stt) blocks remain. key is the timestamp of the current file-block row.
//   - only last-block data:  emit from the last blocks
//   - only file-block data:  emit from the file block
//   - both, ASC:             key < ts -> file block; key == ts -> merge both sources
//   - both, DESC:            delegate to doMergeFileBlockAndLastBlock() with block-data merging enabled
// The merger of the key == ts path is now also released when tRowMergerGetRow() fails.
static int32_t mergeFileBlockAndLastBlock(STsdbReader* pReader, SLastBlockReader* pLastBlockReader, int64_t key,
                                          STableBlockScanInfo* pBlockScanInfo, SBlockData* pBlockData) {
  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;

  // only last block exists
  if (!hasDataInFileBlock(pBlockData, pDumpInfo)) {
    return doMergeFileBlockAndLastBlock(pLastBlockReader, pReader, pBlockScanInfo, NULL, false);
  }

  // no last block available, only data block exists
  if (!hasDataInLastBlock(pLastBlockReader)) {
    return mergeRowsInFileBlocks(pBlockData, pBlockScanInfo, key, pReader);
  }

  // row in last file block
  int64_t ts = getCurrentKeyInLastBlock(pLastBlockReader);
  ASSERT(ts >= key);

  if (!ASCENDING_TRAVERSE(pReader->order)) {  // desc order
    return doMergeFileBlockAndLastBlock(pLastBlockReader, pReader, pBlockScanInfo, pBlockData, true);
  }

  if (key < ts) {  // imem, mem are all empty, file blocks (data blocks and last block) exist
    return mergeRowsInFileBlocks(pBlockData, pBlockScanInfo, key, pReader);
  }

  if (key != ts) {
    ASSERT(0);
    return TSDB_CODE_SUCCESS;
  }

  // key == ts: merge the file-block duplicates first, then the last-block ones (ASC merge order)
  TSDBROW    fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
  STSRow*    pTSRow = NULL;
  SRowMerger merge = {0};

  int32_t code = tRowMergerInit(&merge, &fRow, pReader->pSchema);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);

  TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
  tRowMerge(&merge, &fRow1);
  doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, ts, &merge, &pReader->verRange);

  code = tRowMergerGetRow(&merge, &pTSRow);
  if (code == TSDB_CODE_SUCCESS) {
    doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo->uid);
    taosMemoryFree(pTSRow);
  }

  tRowMergerClear(&merge);
  return code;
}

1803 1804
// Merge the rows sharing the selected timestamp from all four sources of one table and append the merged
// row to the result block: file block, last (stt) blocks, imem (piRow) and mem (pRow). The selected
// timestamp is the minimum (ASC) or maximum (DESC) over the sources that currently have data.
//
// Merge order, which decides how duplicated timestamps are combined:
//   ASC : file block -----> last block -----> imem -----> mem
//   DESC: mem -----> imem -----> last block -----> file block
//
// The caller invokes this when both buffer iterators report hasVal. getValidMemRow() may still return NULL
// for one or both of them (the caller's single-buffer path re-checks hasVal after calling it). Instead of
// dereferencing NULL, this falls back to the same single-buffer / file-only paths the caller uses.
//
// When no schema can be resolved for a buffered row, returns without appending (code is
// TSDB_CODE_SUCCESS at those points). After a successful tRowMergerInit(), every exit path releases the
// merger.
static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo, SBlockData* pBlockData,
                                     SLastBlockReader* pLastBlockReader) {
  SRowMerger          merge = {0};
  STSRow*             pTSRow = NULL;
  int32_t             code = TSDB_CODE_SUCCESS;
  bool                init = false;  // true once the merger owns resources that must be cleared
  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
  SArray*             pDelList = pBlockScanInfo->delSkyline;

  TSDBROW* pRow = getValidMemRow(&pBlockScanInfo->iter, pDelList, pReader);
  TSDBROW* piRow = getValidMemRow(&pBlockScanInfo->iiter, pDelList, pReader);

  int64_t key = hasDataInFileBlock(pBlockData, pDumpInfo) ? pBlockData->aTSKEY[pDumpInfo->rowIndex] : INT64_MIN;

  if (pRow == NULL || piRow == NULL) {
    // at most one buffer still has a visible row: use the single-buffer paths (imem is checked first)
    if (piRow != NULL) {
      return doMergeBufAndFileRows(pReader, pBlockScanInfo, piRow, &pBlockScanInfo->iiter, key, pLastBlockReader);
    }
    if (pRow != NULL) {
      return doMergeBufAndFileRows(pReader, pBlockScanInfo, pRow, &pBlockScanInfo->iter, key, pLastBlockReader);
    }
    return mergeFileBlockAndLastBlock(pReader, pLastBlockReader, key, pBlockScanInfo, pBlockData);
  }

  int64_t tsLast = INT64_MIN;
  if (hasDataInLastBlock(pLastBlockReader)) {
    tsLast = getCurrentKeyInLastBlock(pLastBlockReader);
  }

  TSDBKEY k = TSDBROW_KEY(pRow);
  TSDBKEY ik = TSDBROW_KEY(piRow);

  int64_t minKey = 0;
  if (ASCENDING_TRAVERSE(pReader->order)) {
    minKey = INT64_MAX;  // let's find the minimum
    if (minKey > k.ts) {
      minKey = k.ts;
    }

    if (minKey > ik.ts) {
      minKey = ik.ts;
    }

    if (minKey > key && hasDataInFileBlock(pBlockData, pDumpInfo)) {
      minKey = key;
    }

    if (minKey > tsLast && hasDataInLastBlock(pLastBlockReader)) {
      minKey = tsLast;
    }
  } else {
    minKey = INT64_MIN;  // let's find the maximum ts value
    if (minKey < k.ts) {
      minKey = k.ts;
    }

    if (minKey < ik.ts) {
      minKey = ik.ts;
    }

    if (minKey < key && hasDataInFileBlock(pBlockData, pDumpInfo)) {
      minKey = key;
    }

    if (minKey < tsLast && hasDataInLastBlock(pLastBlockReader)) {
      minKey = tsLast;
    }
  }

  if (ASCENDING_TRAVERSE(pReader->order)) {
    if (minKey == key) {
      TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
      code = tRowMergerInit(&merge, &fRow, pReader->pSchema);
      if (code != TSDB_CODE_SUCCESS) {
        return code;
      }
      init = true;
      doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
    }

    if (minKey == tsLast) {
      TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
      if (init) {
        tRowMerge(&merge, &fRow1);
      } else {
        code = tRowMergerInit(&merge, &fRow1, pReader->pSchema);
        if (code != TSDB_CODE_SUCCESS) {
          return code;
        }
        init = true;
      }
      doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, &merge, &pReader->verRange);
    }

    if (minKey == ik.ts) {
      if (init) {
        tRowMerge(&merge, piRow);
      } else {
        STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(piRow), pReader, pBlockScanInfo->uid);
        if (pSchema == NULL) {
          return code;  // nothing initialized yet, nothing to release
        }

        code = tRowMergerInit(&merge, piRow, pSchema);
        if (code != TSDB_CODE_SUCCESS) {
          return code;
        }
        init = true;
      }

      code = doMergeRowsInBuf(&pBlockScanInfo->iiter, pBlockScanInfo->uid, ik.ts, pBlockScanInfo->delSkyline, &merge,
                              pReader);
      if (code != TSDB_CODE_SUCCESS) {
        goto _end;
      }
    }

    if (minKey == k.ts) {
      if (init) {
        if (merge.pTSchema == NULL) {
          goto _end;
        }
        tRowMerge(&merge, pRow);
      } else {
        STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid);
        code = tRowMergerInit(&merge, pRow, pSchema);
        if (code != TSDB_CODE_SUCCESS) {
          return code;
        }
        init = true;
      }

      code = doMergeRowsInBuf(&pBlockScanInfo->iter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge,
                              pReader);
      if (code != TSDB_CODE_SUCCESS) {
        goto _end;
      }
    }
  } else {
    if (minKey == k.ts) {
      STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid);
      code = tRowMergerInit(&merge, pRow, pSchema);
      if (code != TSDB_CODE_SUCCESS) {
        return code;
      }
      init = true;

      code = doMergeRowsInBuf(&pBlockScanInfo->iter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge,
                              pReader);
      if (code != TSDB_CODE_SUCCESS) {
        goto _end;
      }
    }

    if (minKey == ik.ts) {
      if (init) {
        tRowMerge(&merge, piRow);
      } else {
        STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(piRow), pReader, pBlockScanInfo->uid);
        code = tRowMergerInit(&merge, piRow, pSchema);
        if (code != TSDB_CODE_SUCCESS) {
          return code;
        }
        init = true;
      }

      code = doMergeRowsInBuf(&pBlockScanInfo->iiter, pBlockScanInfo->uid, ik.ts, pBlockScanInfo->delSkyline, &merge,
                              pReader);
      if (code != TSDB_CODE_SUCCESS) {
        goto _end;
      }
    }

    if (minKey == tsLast) {
      TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
      if (init) {
        tRowMerge(&merge, &fRow1);
      } else {
        code = tRowMergerInit(&merge, &fRow1, pReader->pSchema);
        if (code != TSDB_CODE_SUCCESS) {
          return code;
        }
        init = true;
      }
      doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, &merge, &pReader->verRange);
    }

    if (minKey == key) {
      TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
      if (!init) {
        code = tRowMergerInit(&merge, &fRow, pReader->pSchema);
        if (code != TSDB_CODE_SUCCESS) {
          return code;
        }
        init = true;
      } else {
        if (merge.pTSchema == NULL) {
          goto _end;
        }
        tRowMerge(&merge, &fRow);
      }
      doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
    }
  }

  // no schema could be resolved: skip the row (code is TSDB_CODE_SUCCESS here)
  if (merge.pTSchema == NULL) {
    goto _end;
  }

  code = tRowMergerGetRow(&merge, &pTSRow);
  if (code != TSDB_CODE_SUCCESS) {
    goto _end;
  }

  doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo->uid);

  taosMemoryFree(pTSRow);
  tRowMergerClear(&merge);
  return code;

_end:
  if (init) {
    tRowMergerClear(&merge);
  }
  return code;
}

2017 2018 2019 2020 2021 2022 2023 2024 2025 2026 2027 2028 2029 2030 2031 2032 2033 2034 2035 2036 2037 2038 2039 2040 2041
// Lazily create the mem and imem skip-list iterators of one table and position them at the start of the
// query window in scan order:
//   ASC  -> (window.skey, verRange.minVer), forward
//   DESC -> (window.ekey, verRange.maxVer), backward
// Afterwards the delete-skyline iterator is initialized and iterInit is set, so later calls return
// immediately. If creating either iterator fails, the error is logged and returned.
// (Fix: the two failure messages were swapped: the mem failure said "imem" and vice versa.)
static int32_t initMemDataIterator(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader) {
  if (pBlockScanInfo->iterInit) {
    return TSDB_CODE_SUCCESS;
  }

  int32_t code = TSDB_CODE_SUCCESS;

  TSDBKEY startKey = {0};
  if (ASCENDING_TRAVERSE(pReader->order)) {
    startKey = (TSDBKEY){.ts = pReader->window.skey, .version = pReader->verRange.minVer};
  } else {
    startKey = (TSDBKEY){.ts = pReader->window.ekey, .version = pReader->verRange.maxVer};
  }

  int32_t backward = (!ASCENDING_TRAVERSE(pReader->order));

  STbData* d = NULL;
  if (pReader->pReadSnap->pMem != NULL) {
    d = tsdbGetTbDataFromMemTable(pReader->pReadSnap->pMem, pReader->suid, pBlockScanInfo->uid);
    if (d != NULL) {
      code = tsdbTbDataIterCreate(d, &startKey, backward, &pBlockScanInfo->iter.iter);
      if (code == TSDB_CODE_SUCCESS) {
        pBlockScanInfo->iter.hasVal = (tsdbTbDataIterGet(pBlockScanInfo->iter.iter) != NULL);

        tsdbDebug("%p uid:%" PRId64 ", check data in mem from skey:%" PRId64 ", order:%d, ts range in buf:%" PRId64
                  "-%" PRId64 " %s",
                  pReader, pBlockScanInfo->uid, startKey.ts, pReader->order, d->minKey, d->maxKey, pReader->idStr);
      } else {
        tsdbError("%p uid:%" PRId64 ", failed to create iterator for mem, code:%s, %s", pReader, pBlockScanInfo->uid,
                  tstrerror(code), pReader->idStr);
        return code;
      }
    }
  } else {
    tsdbDebug("%p uid:%" PRId64 ", no data in mem, %s", pReader, pBlockScanInfo->uid, pReader->idStr);
  }

  STbData* di = NULL;
  if (pReader->pReadSnap->pIMem != NULL) {
    di = tsdbGetTbDataFromMemTable(pReader->pReadSnap->pIMem, pReader->suid, pBlockScanInfo->uid);
    if (di != NULL) {
      code = tsdbTbDataIterCreate(di, &startKey, backward, &pBlockScanInfo->iiter.iter);
      if (code == TSDB_CODE_SUCCESS) {
        pBlockScanInfo->iiter.hasVal = (tsdbTbDataIterGet(pBlockScanInfo->iiter.iter) != NULL);

        tsdbDebug("%p uid:%" PRId64 ", check data in imem from skey:%" PRId64 ", order:%d, ts range in buf:%" PRId64
                  "-%" PRId64 " %s",
                  pReader, pBlockScanInfo->uid, startKey.ts, pReader->order, di->minKey, di->maxKey, pReader->idStr);
      } else {
        tsdbError("%p uid:%" PRId64 ", failed to create iterator for imem, code:%s, %s", pReader, pBlockScanInfo->uid,
                  tstrerror(code), pReader->idStr);
        return code;
      }
    }
  } else {
    tsdbDebug("%p uid:%" PRId64 ", no data in imem, %s", pReader, pBlockScanInfo->uid, pReader->idStr);
  }

  initDelSkylineIterator(pBlockScanInfo, pReader, d, di);

  pBlockScanInfo->iterInit = true;
  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
2081 2082
// Check whether the current row (pDumpInfo->rowIndex) of a file block should be returned for this table.
// A row qualifies when all of these hold:
//   - it belongs to this table (checked only for multi-table blocks, where aUid is present),
//   - its version lies within pReader->verRange,
//   - its timestamp lies within pReader->window,
//   - hasBeenDropped() reports it was not removed by the delete skyline.
static bool isValidFileBlockRow(SBlockData* pBlockData, SFileBlockDumpInfo* pDumpInfo,
                                STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader) {
  // multi-table data blocks carry the owner uid of every row
  if (pBlockData->aUid != NULL && pBlockData->aUid[pDumpInfo->rowIndex] != pBlockScanInfo->uid) {
    return false;
  }

  TSDBKEY rowKey = {.ts = pBlockData->aTSKEY[pDumpInfo->rowIndex], .version = pBlockData->aVersion[pDumpInfo->rowIndex]};

  bool versionInRange = (rowKey.version >= pReader->verRange.minVer) && (rowKey.version <= pReader->verRange.maxVer);
  if (!versionInRange) {
    return false;
  }

  bool tsInWindow = (rowKey.ts >= pReader->window.skey) && (rowKey.ts <= pReader->window.ekey);
  if (!tsInWindow) {
    return false;
  }

  return !hasBeenDropped(pBlockScanInfo->delSkyline, &pBlockScanInfo->fileDelIndex, &rowKey, pReader->order,
                         &pReader->verRange);
}

2111
// Prepare the last-block (stt) merge tree for the table described by pScanInfo.
// If the reader is already bound to this table, returns true without doing anything. Otherwise:
//   - closes the merge tree of the previously bound table (if any),
//   - makes sure the table's mem/imem iterators exist (its return code is ignored),
//   - opens a merge tree over the reader's window, narrowed to start just after pScanInfo->lastKey in scan
//     order,
//   - positions it on the first row not removed by the delete skyline.
// Returns whether such a row exists, or false if the merge tree could not be opened.
static bool initLastBlockReader(SLastBlockReader* pLBlockReader, STableBlockScanInfo* pScanInfo, STsdbReader* pReader) {
  // already initialized for this table
  if (pLBlockReader->uid == pScanInfo->uid) {
    return true;
  }

  if (pLBlockReader->uid != 0) {
    tMergeTreeClose(&pLBlockReader->mergeTree);
  }

  initMemDataIterator(pScanInfo, pReader);
  pLBlockReader->uid = pScanInfo->uid;

  // resume right after the last key already handled for this table
  STimeWindow w = pLBlockReader->window;
  if (ASCENDING_TRAVERSE(pLBlockReader->order)) {
    w.skey = pScanInfo->lastKey + 1;
  } else {
    w.ekey = pScanInfo->lastKey - 1;
  }

  int32_t code = tMergeTreeOpen(&pLBlockReader->mergeTree, (pLBlockReader->order == TSDB_ORDER_DESC),
                                pReader->pFileReader, pReader->suid, pScanInfo->uid, &w, &pLBlockReader->verRange,
                                pLBlockReader->pInfo, false, pReader->idStr);
  return (code == TSDB_CODE_SUCCESS) && nextRowFromLastBlocks(pLBlockReader, pScanInfo, &pReader->verRange);
}

2142
// Timestamp of the row the last-block merge tree is currently positioned on.
static int64_t getCurrentKeyInLastBlock(SLastBlockReader* pLastBlockReader) {
  TSDBROW current = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
  int64_t ts = TSDBROW_TS(&current);
  return ts;
}

H
Hongze Cheng 已提交
2147
// The last-block reader has data for as long as its merge-tree iterator is set.
static bool hasDataInLastBlock(SLastBlockReader* pLastBlockReader) {
  return (pLastBlockReader->mergeTree.pIter != NULL);
}
2148 2149 2150 2151

bool hasDataInFileBlock(const SBlockData* pBlockData, const SFileBlockDumpInfo* pDumpInfo) {
  if (pBlockData->nRow > 0) {
    ASSERT(pBlockData->nRow == pDumpInfo->totalRows);
2152 2153
  }

2154
  return pBlockData->nRow > 0 && (!pDumpInfo->allDumped);
2155
}
2156

2157 2158
// Emit the current file-block row. If the next row in scan order has a different timestamp, the row is
// copied directly (tryCopyDistinctRowFromFileBlock). Otherwise all file-block rows sharing the timestamp are
// merged and the merged row is appended to the result block.
// Once tRowMergerInit() has succeeded, the merger is released on every path (previously leaked when
// tRowMergerGetRow() failed).
int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pBlockScanInfo, int64_t key,
                              STsdbReader* pReader) {
  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
  if (tryCopyDistinctRowFromFileBlock(pReader, pBlockData, key, pDumpInfo)) {
    return TSDB_CODE_SUCCESS;
  }

  TSDBROW    fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
  STSRow*    pTSRow = NULL;
  SRowMerger merge = {0};

  int32_t code = tRowMergerInit(&merge, &fRow, pReader->pSchema);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);

  code = tRowMergerGetRow(&merge, &pTSRow);
  if (code == TSDB_CODE_SUCCESS) {
    doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo->uid);
    taosMemoryFree(pTSRow);
  }

  tRowMergerClear(&merge);
  return code;
}

H
Haojun Liao 已提交
2187 2188
// Produce the next merged row for one table by choosing which sources must be combined:
//  - both mem and imem iterators have rows  -> doMergeMultiLevelRows
//  - only imem has rows                     -> doMergeBufAndFileRows with the imem row
//  - only mem has rows                      -> doMergeBufAndFileRows with the mem row
//  - neither                                -> mergeFileBlockAndLastBlock
// The file key is the timestamp at the dump cursor, or INT64_MIN when no undumped file row is loaded.
static int32_t buildComposedDataBlockImpl(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo,
                                          SBlockData* pBlockData, SLastBlockReader* pLastBlockReader) {
  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;

  bool    fileRowPending = (pBlockData->nRow > 0) && !pDumpInfo->allDumped;
  int64_t fileKey = fileRowPending ? pBlockData->aTSKEY[pDumpInfo->rowIndex] : INT64_MIN;

  SIterInfo* pMemIter = &pBlockScanInfo->iter;
  SIterInfo* pIMemIter = &pBlockScanInfo->iiter;

  if (pMemIter->hasVal && pIMemIter->hasVal) {
    return doMergeMultiLevelRows(pReader, pBlockScanInfo, pBlockData, pLastBlockReader);
  }

  // getValidMemRow may skip invalid rows and clear hasVal, so hasVal is re-checked after these calls.
  TSDBROW* pMemRow = NULL;
  if (pMemIter->hasVal) {
    pMemRow = getValidMemRow(pMemIter, pBlockScanInfo->delSkyline, pReader);
  }

  TSDBROW* pIMemRow = NULL;
  if (pIMemIter->hasVal) {
    pIMemRow = getValidMemRow(pIMemIter, pBlockScanInfo->delSkyline, pReader);
  }

  if (pIMemIter->hasVal) {  // imem + file + last block
    return doMergeBufAndFileRows(pReader, pBlockScanInfo, pIMemRow, pIMemIter, fileKey, pLastBlockReader);
  }

  if (pMemIter->hasVal) {  // mem + file + last block
    return doMergeBufAndFileRows(pReader, pBlockScanInfo, pMemRow, pMemIter, fileKey, pLastBlockReader);
  }

  // file data blocks + last block only
  return mergeFileBlockAndLastBlock(pReader, pLastBlockReader, fileKey, pBlockScanInfo, pBlockData);
}

2219
// Build a result block for the current table by merging rows from the loaded file data block, the last-block
// reader and the in-memory buffers.
//
// When the current file block is judged clean by isCleanFileDataBlock, it is copied directly. In ascending order
// this always applies; in descending order it applies only if the last block holds no data. Otherwise rows are
// merged one at a time until one of these happens:
//  - the file block is consumed;
//  - neither the file block nor the last block has data left;
//  - the result block reaches pReader->capacity;
//  - a per-row merge fails.
//
// Returns TSDB_CODE_SUCCESS, TSDB_CODE_INVALID_PARA when the block's uid is missing from the table map, or the
// error returned by buildComposedDataBlockImpl (previously that error was silently discarded).
static int32_t buildComposedDataBlock(STsdbReader* pReader) {
  int32_t code = TSDB_CODE_SUCCESS;

  SSDataBlock*        pResBlock = pReader->pResBlock;
  SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(&pReader->status.blockIter);
  SLastBlockReader*   pLastBlockReader = pReader->status.fileIter.pLastBlockReader;

  int64_t st = taosGetTimestampUs();

  STableBlockScanInfo* pBlockScanInfo = NULL;
  if (pBlockInfo != NULL) {
    pBlockScanInfo = taosHashGet(pReader->status.pTableMap, &pBlockInfo->uid, sizeof(pBlockInfo->uid));
    if (pBlockScanInfo == NULL) {
      code = TSDB_CODE_INVALID_PARA;
      tsdbError("failed to locate the uid:%" PRIu64 " in query table uid list, total tables:%d, %s", pBlockInfo->uid,
                taosHashGetSize(pReader->status.pTableMap), pReader->idStr);
      goto _end;
    }

    SDataBlk* pBlock = getCurrentBlock(&pReader->status.blockIter);
    TSDBKEY   keyInBuf = getCurrentKeyInBuf(pBlockScanInfo, pReader);

    // it is a clean block, load it directly
    if (isCleanFileDataBlock(pReader, pBlockInfo, pBlock, pBlockScanInfo, keyInBuf, pLastBlockReader)) {
      if (pReader->order == TSDB_ORDER_ASC ||
          (pReader->order == TSDB_ORDER_DESC && (!hasDataInLastBlock(pLastBlockReader)))) {
        copyBlockDataToSDataBlock(pReader, pBlockScanInfo);
        goto _end;
      }
    }
  } else {  // file blocks not exist
    pBlockScanInfo = pReader->status.pTableIter;
  }

  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
  SBlockData*         pBlockData = &pReader->status.fileBlockData;
  int32_t             step = ASCENDING_TRAVERSE(pReader->order) ? 1 : -1;

  while (1) {
    bool hasBlockData = false;
    {
      while (pBlockData->nRow > 0) {  // find the first qualified row in data block
        if (isValidFileBlockRow(pBlockData, pDumpInfo, pBlockScanInfo, pReader)) {
          hasBlockData = true;
          break;
        }

        pDumpInfo->rowIndex += step;

        SDataBlk* pBlock = getCurrentBlock(&pReader->status.blockIter);
        if (pDumpInfo->rowIndex >= pBlock->nRow || pDumpInfo->rowIndex < 0) {
          setBlockAllDumped(pDumpInfo, pBlock->maxKey.ts, pReader->order);
          break;
        }
      }
    }

    bool hasBlockLData = hasDataInLastBlock(pLastBlockReader);

    // no data in last block and block, no need to proceed.
    if ((hasBlockData == false) && (hasBlockLData == false)) {
      break;
    }

    // propagate merge failures (e.g. schema lookup / row merger errors) instead of dropping them
    code = buildComposedDataBlockImpl(pReader, pBlockScanInfo, pBlockData, pLastBlockReader);
    if (code != TSDB_CODE_SUCCESS) {
      goto _end;
    }

    // currently loaded file data block is consumed
    if ((pBlockData->nRow > 0) && (pDumpInfo->rowIndex >= pBlockData->nRow || pDumpInfo->rowIndex < 0)) {
      SDataBlk* pBlock = getCurrentBlock(&pReader->status.blockIter);
      setBlockAllDumped(pDumpInfo, pBlock->maxKey.ts, pReader->order);
      break;
    }

    if (pResBlock->info.rows >= pReader->capacity) {
      break;
    }
  }

_end:
  pResBlock->info.uid = (pBlockScanInfo != NULL) ? pBlockScanInfo->uid : 0;
  blockDataUpdateTsWindow(pResBlock, 0);

  setComposedBlockFlag(pReader, true);
  double el = (taosGetTimestampUs() - st) / 1000.0;

  pReader->cost.composedBlocks += 1;
  pReader->cost.buildComposedBlockTime += el;

  if (pResBlock->info.rows > 0) {
    tsdbDebug("%p uid:%" PRIu64 ", composed data block created, brange:%" PRIu64 "-%" PRIu64
                  " rows:%d, elapsed time:%.2f ms %s",
              pReader, pResBlock->info.uid, pResBlock->info.window.skey, pResBlock->info.window.ekey,
              pResBlock->info.rows, el, pReader->idStr);
  }

  return code;
}

// Record whether the current result block was assembled by merging (true) or handed out as a whole file block.
void setComposedBlockFlag(STsdbReader* pReader, bool composed) {
  SReaderStatus* pStatus = &pReader->status;
  pStatus->composedDataBlock = composed;
}

dengyihao's avatar
dengyihao 已提交
2320 2321
// Lazily build the delete skyline for one table and reset its delete cursors.
//
// Delete records are gathered from three places:
//  - the del file of the read snapshot, looked up by this reader's suid and the table uid;
//  - the delete list of the mem table (pMemTbData);
//  - the delete list of the imem table (piMemTbData).
// If any records exist, they are folded into pBlockScanInfo->delSkyline by tsdbBuildDeleteSkyline. The iter,
// iiter, file and last-block delete cursors are then set to the first (ascending) or last (descending) skyline
// entry.
//
// Does nothing if the skyline has already been built.
// Returns TSDB_CODE_SUCCESS, TSDB_CODE_OUT_OF_MEMORY on allocation failure, or the error from the del-file reader /
// tsdbBuildDeleteSkyline.
int32_t initDelSkylineIterator(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader, STbData* pMemTbData,
                               STbData* piMemTbData) {
  if (pBlockScanInfo->delSkyline != NULL) {
    return TSDB_CODE_SUCCESS;
  }

  int32_t code = 0;
  STsdb*  pTsdb = pReader->pTsdb;

  SArray* pDelData = taosArrayInit(4, sizeof(SDelData));
  if (pDelData == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  SDelFile* pDelFile = pReader->pReadSnap->fs.pDelFile;
  if (pDelFile) {
    SDelFReader* pDelFReader = NULL;
    code = tsdbDelFReaderOpen(&pDelFReader, pDelFile, pTsdb);
    if (code != TSDB_CODE_SUCCESS) {
      goto _err;
    }

    SArray* aDelIdx = taosArrayInit(4, sizeof(SDelIdx));
    if (aDelIdx == NULL) {
      // code was still 0 here, so an allocation failure used to be reported as success
      code = TSDB_CODE_OUT_OF_MEMORY;
      tsdbDelFReaderClose(&pDelFReader);
      goto _err;
    }

    code = tsdbReadDelIdx(pDelFReader, aDelIdx);
    if (code != TSDB_CODE_SUCCESS) {
      taosArrayDestroy(aDelIdx);
      tsdbDelFReaderClose(&pDelFReader);
      goto _err;
    }

    SDelIdx  idx = {.suid = pReader->suid, .uid = pBlockScanInfo->uid};
    SDelIdx* pIdx = taosArraySearch(aDelIdx, &idx, tCmprDelIdx, TD_EQ);

    if (pIdx != NULL) {
      code = tsdbReadDelData(pDelFReader, pIdx, pDelData);
    }

    taosArrayDestroy(aDelIdx);
    tsdbDelFReaderClose(&pDelFReader);

    if (code != TSDB_CODE_SUCCESS) {
      goto _err;
    }
  }

  // append the delete records kept in the mem and imem tables
  SDelData* p = NULL;
  if (pMemTbData != NULL) {
    p = pMemTbData->pHead;
    while (p) {
      taosArrayPush(pDelData, p);
      p = p->pNext;
    }
  }

  if (piMemTbData != NULL) {
    p = piMemTbData->pHead;
    while (p) {
      taosArrayPush(pDelData, p);
      p = p->pNext;
    }
  }

  if (taosArrayGetSize(pDelData) > 0) {
    pBlockScanInfo->delSkyline = taosArrayInit(4, sizeof(TSDBKEY));
    if (pBlockScanInfo->delSkyline == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto _err;
    }

    code = tsdbBuildDeleteSkyline(pDelData, 0, (int32_t)(taosArrayGetSize(pDelData) - 1), pBlockScanInfo->delSkyline);
  }

  taosArrayDestroy(pDelData);

  // position every delete cursor at the first (asc) or last (desc) skyline entry
  pBlockScanInfo->iter.index =
      ASCENDING_TRAVERSE(pReader->order) ? 0 : taosArrayGetSize(pBlockScanInfo->delSkyline) - 1;
  pBlockScanInfo->iiter.index = pBlockScanInfo->iter.index;
  pBlockScanInfo->fileDelIndex = pBlockScanInfo->iter.index;
  pBlockScanInfo->lastBlockDelIndex = pBlockScanInfo->iter.index;
  return code;

_err:
  taosArrayDestroy(pDelData);
  return code;
}

H
Haojun Liao 已提交
2402
// Return the key of the next valid buffered row of a table, looking at both the mem iterator (iter) and the imem
// iterator (iiter).
//
// - Only one buffer has a valid row: that row's key is returned.
// - Both have one: the key that comes first in scan order wins. That is the smaller ts for ascending scans and
//   the larger ts for descending scans; mem wins ties.
// - Neither has one: {.ts = TSKEY_INITIAL_VAL} is returned as the "no data" marker.
//
// The previous version always compared against the TSKEY_INITIAL_VAL placeholder. That caused two problems:
// - Assuming TSKEY_INITIAL_VAL is the minimum timestamp (confirm), an imem-only row could never replace the
//   placeholder.
// - Descending scans picked the smaller key.
TSDBKEY getCurrentKeyInBuf(STableBlockScanInfo* pScanInfo, STsdbReader* pReader) {
  TSDBKEY key = {.ts = TSKEY_INITIAL_VAL};
  bool    hasKey = false;

  TSDBROW* pRow = getValidMemRow(&pScanInfo->iter, pScanInfo->delSkyline, pReader);
  if (pRow != NULL) {
    key = TSDBROW_KEY(pRow);
    hasKey = true;
  }

  TSDBROW* piRow = getValidMemRow(&pScanInfo->iiter, pScanInfo->delSkyline, pReader);
  if (piRow != NULL) {
    TSDBKEY ikey = TSDBROW_KEY(piRow);
    if (!hasKey) {
      key = ikey;
    } else if (ASCENDING_TRAVERSE(pReader->order) ? (ikey.ts < key.ts) : (ikey.ts > key.ts)) {
      key = ikey;
    }
  }

  return key;
}

2420
// Advance the file-set iterator until a file set with something to read is found.
//
// For each file set the block index list is loaded with doLoadBlockIndex. If that list is non-empty, or the file
// set reports nSttF > 0, doLoadFileBlock fills pBlockNum. The loop stops in two cases:
// - at the first file set where numOfBlocks + numOfLastFiles > 0;
// - when filesetIteratorNext reports no further file set. pBlockNum is then left at zero.
//
// Returns TSDB_CODE_SUCCESS, TSDB_CODE_OUT_OF_MEMORY if the index list cannot be allocated, or the error from
// doLoadBlockIndex / doLoadFileBlock.
static int32_t moveToNextFile(STsdbReader* pReader, SBlockNumber* pBlockNum) {
  SReaderStatus* pStatus = &pReader->status;
  pBlockNum->numOfBlocks = 0;
  pBlockNum->numOfLastFiles = 0;

  size_t  numOfTables = taosHashGetSize(pReader->status.pTableMap);
  SArray* pIndexList = taosArrayInit(numOfTables, sizeof(SBlockIdx));
  if (pIndexList == NULL) {
    // the list is written by doLoadBlockIndex below; fail early instead of handing it a NULL array
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  while (1) {
    bool hasNext = filesetIteratorNext(&pStatus->fileIter, pReader);
    if (!hasNext) {  // no data files on disk
      break;
    }

    taosArrayClear(pIndexList);
    int32_t code = doLoadBlockIndex(pReader, pReader->pFileReader, pIndexList);
    if (code != TSDB_CODE_SUCCESS) {
      taosArrayDestroy(pIndexList);
      return code;
    }

    if (taosArrayGetSize(pIndexList) > 0 || pReader->pFileReader->pSet->nSttF > 0) {
      code = doLoadFileBlock(pReader, pIndexList, pBlockNum);
      if (code != TSDB_CODE_SUCCESS) {
        taosArrayDestroy(pIndexList);
        return code;
      }

      if (pBlockNum->numOfBlocks + pBlockNum->numOfLastFiles > 0) {
        break;
      }
    }

    // no blocks in current file, try next files
  }

  taosArrayDestroy(pIndexList);
  return TSDB_CODE_SUCCESS;
}

2460
// qsort-style comparator over uint64_t table uids: negative, zero or positive for lhs <, ==, > rhs.
static int32_t uidComparFunc(const void* p1, const void* p2) {
  const uint64_t lhs = *(const uint64_t*)p1;
  const uint64_t rhs = *(const uint64_t*)p2;

  if (lhs < rhs) {
    return -1;
  }
  if (lhs > rhs) {
    return 1;
  }
  return 0;
}
2469

2470
// Fill pOrderCheckInfo->tableUidList with the uid of every table in pStatus->pTableMap, sorted ascending.
// The caller must have sized tableUidList for taosHashGetSize(pTableMap) entries.
static void extractOrderedTableUidList(SUidOrderCheckInfo* pOrderCheckInfo, SReaderStatus* pStatus) {
  int32_t numOfTables = taosHashGetSize(pStatus->pTableMap);
  int32_t n = 0;

  for (void* pEntry = taosHashIterate(pStatus->pTableMap, NULL); pEntry != NULL;
       pEntry = taosHashIterate(pStatus->pTableMap, pEntry)) {
    const STableBlockScanInfo* pInfo = pEntry;
    pOrderCheckInfo->tableUidList[n++] = pInfo->uid;
  }

  taosSort(pOrderCheckInfo->tableUidList, numOfTables, sizeof(uint64_t), uidComparFunc);
}

2484
// Prepare the uid-ordered table traversal used when reading last-block data table by table.
//
// - First call (no uid list yet): build and sort the list, then point pTableIter at the smallest uid.
// - Later calls with pTableIter == NULL: restart from the first uid.
// - If that uid is no longer present in pTableMap, the list is reallocated and rebuilt from the current map.
// An empty table map is a no-op.
//
// Returns TSDB_CODE_OUT_OF_MEMORY if the list cannot be (re)allocated, TSDB_CODE_SUCCESS otherwise.
static int32_t initOrderCheckInfo(SUidOrderCheckInfo* pOrderCheckInfo, SReaderStatus* pStatus) {
  int32_t numOfTables = taosHashGetSize(pStatus->pTableMap);
  if (numOfTables == 0) {
    return TSDB_CODE_SUCCESS;
  }

  if (pOrderCheckInfo->tableUidList == NULL) {
    pOrderCheckInfo->currentIndex = 0;
    pOrderCheckInfo->tableUidList = taosMemoryMalloc(numOfTables * sizeof(uint64_t));
    if (pOrderCheckInfo->tableUidList == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }

    extractOrderedTableUidList(pOrderCheckInfo, pStatus);

    uint64_t firstUid = pOrderCheckInfo->tableUidList[0];
    pStatus->pTableIter = taosHashGet(pStatus->pTableMap, &firstUid, sizeof(firstUid));
    return TSDB_CODE_SUCCESS;
  }

  // traversal still in progress, nothing to do
  if (pStatus->pTableIter != NULL) {
    return TSDB_CODE_SUCCESS;
  }

  // it is the last block of a new file: restart from the first uid
  pOrderCheckInfo->currentIndex = 0;
  uint64_t uid = pOrderCheckInfo->tableUidList[pOrderCheckInfo->currentIndex];
  pStatus->pTableIter = taosHashGet(pStatus->pTableMap, &uid, sizeof(uid));
  if (pStatus->pTableIter != NULL) {
    return TSDB_CODE_SUCCESS;
  }

  // the tableMap has already updated: rebuild the ordered uid list
  void* pResized = taosMemoryRealloc(pOrderCheckInfo->tableUidList, numOfTables * sizeof(uint64_t));
  if (pResized == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  pOrderCheckInfo->tableUidList = pResized;
  extractOrderedTableUidList(pOrderCheckInfo, pStatus);

  uid = pOrderCheckInfo->tableUidList[0];
  pStatus->pTableIter = taosHashGet(pStatus->pTableMap, &uid, sizeof(uid));
  return TSDB_CODE_SUCCESS;
}

2525
// Step the uid-ordered traversal to the next table.
// Returns false (and clears pTableIter) when every table has been visited; otherwise pTableIter is pointed at the
// next table's scan info and true is returned.
static bool moveToNextTable(SUidOrderCheckInfo* pOrderedCheckInfo, SReaderStatus* pStatus) {
  pOrderedCheckInfo->currentIndex++;

  bool exhausted = (pOrderedCheckInfo->currentIndex >= taosHashGetSize(pStatus->pTableMap));
  if (exhausted) {
    pStatus->pTableIter = NULL;
    return false;
  }

  uint64_t nextUid = pOrderedCheckInfo->tableUidList[pOrderedCheckInfo->currentIndex];
  pStatus->pTableIter = taosHashGet(pStatus->pTableMap, &nextUid, sizeof(nextUid));
  ASSERT(pStatus->pTableIter != NULL);
  return true;
}

2538
// Read last-block data table by table, in uid order, until a non-empty result block is produced or every table
// has been visited. A table is skipped when initLastBlockReader reports no data for it.
// Returns TSDB_CODE_SUCCESS (possibly with an empty result block) or the first error encountered.
static int32_t doLoadLastBlockSequentially(STsdbReader* pReader) {
  SReaderStatus*      pStatus = &pReader->status;
  SLastBlockReader*   pLastBlockReader = pStatus->fileIter.pLastBlockReader;
  SUidOrderCheckInfo* pOrderedCheckInfo = &pStatus->uidCheckInfo;

  int32_t code = initOrderCheckInfo(pOrderedCheckInfo, pStatus);
  if (code != TSDB_CODE_SUCCESS || (taosHashGetSize(pStatus->pTableMap) == 0)) {
    return code;
  }

  for (;;) {
    STableBlockScanInfo* pScanInfo = pStatus->pTableIter;

    if (initLastBlockReader(pLastBlockReader, pScanInfo, pReader)) {
      code = doBuildDataBlock(pReader);
      if (code != TSDB_CODE_SUCCESS) {
        return code;
      }

      if (pReader->pResBlock->info.rows > 0) {
        return TSDB_CODE_SUCCESS;
      }
    }

    // either no last-block data or the current table is exhausted: move on
    if (!moveToNextTable(pOrderedCheckInfo, pStatus)) {
      return TSDB_CODE_SUCCESS;
    }
  }
}

2577
// Produce the next result block for the table the block iterator (or, with no file block, pTableIter) points at.
//
// - No current file block: the rows come from the last block and buffers -> buildComposedDataBlock.
// - File block must be loaded (fileBlockShouldLoad): load it, then buildComposedDataBlock.
// - Buffered rows fall before the file block in scan order: emit them up to the block boundary.
// - Descending scan with last-block data: reset the file block data and compose from the last block first.
// - Otherwise the whole file block is returned as-is (only its info is filled in) and marked as dumped.
//
// Returns TSDB_CODE_INVALID_PARA when no scan info exists for the table, or the error from the chosen path.
static int32_t doBuildDataBlock(STsdbReader* pReader) {
  SReaderStatus*      pSt = &pReader->status;
  SDataBlockIter*     pIt = &pSt->blockIter;
  SFileDataBlockInfo* pFileBlkInfo = getCurrentBlockInfo(pIt);
  SLastBlockReader*   pLastReader = pSt->fileIter.pLastBlockReader;

  STableBlockScanInfo* pTableInfo = NULL;
  if (pFileBlkInfo == NULL) {
    pTableInfo = pSt->pTableIter;
  } else {
    pTableInfo = taosHashGet(pSt->pTableMap, &pFileBlkInfo->uid, sizeof(pFileBlkInfo->uid));
  }

  if (pTableInfo == NULL) {
    tsdbError("failed to get table scan-info, %s", pReader->idStr);
    return TSDB_CODE_INVALID_PARA;
  }

  SDataBlk* pDataBlk = NULL;
  if (pFileBlkInfo != NULL) {
    pDataBlk = getCurrentBlock(pIt);
  }

  initLastBlockReader(pLastReader, pTableInfo, pReader);
  TSDBKEY bufKey = getCurrentKeyInBuf(pTableInfo, pReader);

  if (pFileBlkInfo == NULL) {  // build data block from last data file
    ASSERT(pIt->numOfBlocks == 0);
    return buildComposedDataBlock(pReader);
  }

  if (fileBlockShouldLoad(pReader, pFileBlkInfo, pDataBlk, pTableInfo, bufKey, pLastReader)) {
    int32_t loadCode = doLoadFileBlockData(pReader, pIt, &pSt->fileBlockData, pTableInfo->uid);
    if (loadCode != TSDB_CODE_SUCCESS) {
      return loadCode;
    }
    return buildComposedDataBlock(pReader);
  }

  bool ascOrder = ASCENDING_TRAVERSE(pReader->order);

  if (bufferDataInFileBlockGap(pReader->order, bufKey, pDataBlk)) {
    // rows in buffer should be less than the file block in asc, greater than file block in desc
    int64_t boundary = ascOrder ? pDataBlk->minKey.ts : pDataBlk->maxKey.ts;
    return buildDataBlockFromBuf(pReader, pTableInfo, boundary);
  }

  if (!ascOrder && hasDataInLastBlock(pLastReader)) {
    // only return the rows in last block
    int64_t tsLast = getCurrentKeyInLastBlock(pLastReader);
    ASSERT(tsLast >= pDataBlk->maxKey.ts);
    tBlockDataReset(&pSt->fileBlockData);

    tsdbDebug("load data in last block firstly, due to desc scan data, %s", pReader->idStr);
    return buildComposedDataBlock(pReader);
  }

  // whole block is required, return it directly
  SDataBlockInfo* pInfo = &pReader->pResBlock->info;
  pInfo->rows = pDataBlk->nRow;
  pInfo->uid = pTableInfo->uid;
  pInfo->window = (STimeWindow){.skey = pDataBlk->minKey.ts, .ekey = pDataBlk->maxKey.ts};
  setComposedBlockFlag(pReader, false);
  setBlockAllDumped(&pSt->fBlockDumpInfo, pDataBlk->maxKey.ts, pReader->order);
  return TSDB_CODE_SUCCESS;
}

H
Haojun Liao 已提交
2644
// Walk the table map (continuing from pTableIter, or from the start if it is NULL) and build a result block
// purely from in-memory data. Stops at the first table that yields rows, or when the map is exhausted.
static int32_t buildBlockFromBufferSequentially(STsdbReader* pReader) {
  SReaderStatus* pStatus = &pReader->status;

  for (;;) {
    if (pStatus->pTableIter == NULL) {
      pStatus->pTableIter = taosHashIterate(pStatus->pTableMap, NULL);
      if (pStatus->pTableIter == NULL) {
        return TSDB_CODE_SUCCESS;
      }
    }

    STableBlockScanInfo* pTable = pStatus->pTableIter;
    initMemDataIterator(pTable, pReader);

    // no upper bound: take everything the buffers hold in scan direction
    int64_t limitKey = ASCENDING_TRAVERSE(pReader->order) ? INT64_MAX : INT64_MIN;

    int32_t rc = buildDataBlockFromBuf(pReader, pTable, limitKey);
    if (rc != TSDB_CODE_SUCCESS) {
      return rc;
    }

    if (pReader->pResBlock->info.rows > 0) {
      return TSDB_CODE_SUCCESS;
    }

    // current table is exhausted, let's try the next table
    pStatus->pTableIter = taosHashIterate(pStatus->pTableMap, pStatus->pTableIter);
    if (pStatus->pTableIter == NULL) {
      return TSDB_CODE_SUCCESS;
    }
  }
}

2676
// set the correct start position in case of the first/last file block, according to the query time window
2677
static void initBlockDumpInfo(STsdbReader* pReader, SDataBlockIter* pBlockIter) {
H
Hongze Cheng 已提交
2678
  SDataBlk* pBlock = getCurrentBlock(pBlockIter);
2679

2680 2681 2682
  SReaderStatus* pStatus = &pReader->status;

  SFileBlockDumpInfo* pDumpInfo = &pStatus->fBlockDumpInfo;
2683 2684 2685

  pDumpInfo->totalRows = pBlock->nRow;
  pDumpInfo->allDumped = false;
2686
  pDumpInfo->rowIndex = ASCENDING_TRAVERSE(pReader->order) ? 0 : pBlock->nRow - 1;
2687 2688
}

2689
// Move to the next file set that has data and prepare the block iterator for it.
// When no file set is left, status.loadFromFile is cleared so the caller falls back to buffered data.
// When the file set has only last-block data, the file block data and the block iterator are reset instead.
static int32_t initForFirstBlockInFile(STsdbReader* pReader, SDataBlockIter* pBlockIter) {
  SBlockNumber counts = {0};

  int32_t rc = moveToNextFile(pReader, &counts);
  if (rc != TSDB_CODE_SUCCESS) {
    return rc;
  }

  // all data files are consumed, try data in buffer
  if (counts.numOfBlocks + counts.numOfLastFiles == 0) {
    pReader->status.loadFromFile = false;
    return rc;
  }

  bool onlyLastBlocks = (counts.numOfBlocks <= 0);
  if (onlyLastBlocks) {
    tBlockDataReset(&pReader->status.fileBlockData);
    resetDataBlockIterator(pBlockIter, pReader->order);
  } else {
    rc = initBlockIterator(pReader, pBlockIter, counts.numOfBlocks);
  }

  // set the correct start position according to the query time window
  initBlockDumpInfo(pReader, pBlockIter);
  return rc;
}

2716
// A file block is partially read when it is not fully dumped and the cursor has already moved away from its
// starting row (row 0 for ascending scans, the last row for descending scans).
static bool fileBlockPartiallyRead(SFileBlockDumpInfo* pDumpInfo, bool asc) {
  if (pDumpInfo->allDumped) {
    return false;
  }

  return asc ? (pDumpInfo->rowIndex > 0) : (pDumpInfo->rowIndex < (pDumpInfo->totalRows - 1));
}

2721
// Produce the next non-empty result block from data files.
//
// Data blocks of the current file set are consumed in block-iterator order. Last-block data is read table by table
// (doLoadLastBlockSequentially) when there are no data blocks, or once the data blocks of a file set with nSttF > 0
// are exhausted. After that, the next file set is opened.
//
// Returns on the first non-empty block, on error, or when status.loadFromFile becomes false (all files done).
static int32_t buildBlockFromFiles(STsdbReader* pReader) {
  SDataBlockIter* pIter = &pReader->status.blockIter;
  bool            ascOrder = ASCENDING_TRAVERSE(pReader->order);
  int32_t         ret = TSDB_CODE_SUCCESS;

  if (pIter->numOfBlocks == 0) {
  _begin:
    ret = doLoadLastBlockSequentially(pReader);
    if (ret != TSDB_CODE_SUCCESS) {
      return ret;
    }

    if (pReader->pResBlock->info.rows > 0) {
      return TSDB_CODE_SUCCESS;
    }

    // all data blocks are checked in this last block file, now let's try the next file
    if (pReader->status.pTableIter == NULL) {
      ret = initForFirstBlockInFile(pReader, pIter);

      // error happens or all the data files are completely checked
      if (ret != TSDB_CODE_SUCCESS || !pReader->status.loadFromFile) {
        return ret;
      }

      // this file does not have data files, let's start check the last block file if exists
      if (pIter->numOfBlocks == 0) {
        goto _begin;
      }
    }

    ret = doBuildDataBlock(pReader);
    if (ret != TSDB_CODE_SUCCESS) {
      return ret;
    }

    if (pReader->pResBlock->info.rows > 0) {
      return TSDB_CODE_SUCCESS;
    }
  }

  for (;;) {
    SFileBlockDumpInfo* pDump = &pReader->status.fBlockDumpInfo;

    if (fileBlockPartiallyRead(pDump, ascOrder)) {  // file data block is partially loaded
      ret = buildComposedDataBlock(pReader);
    } else {
      // current block are exhausted, try the next file block
      if (pDump->allDumped) {
        if (blockIteratorNext(&pReader->status.blockIter, pReader->idStr)) {
          // check for the next block in the block accessed order list
          initBlockDumpInfo(pReader, pIter);
        } else if (pReader->status.pCurrentFileset->nSttF > 0) {
          // data blocks in current file are exhausted, let's try the next file now
          tBlockDataReset(&pReader->status.fileBlockData);
          resetDataBlockIterator(pIter, pReader->order);
          goto _begin;
        } else {
          ret = initForFirstBlockInFile(pReader, pIter);

          // error happens or all the data files are completely checked
          if (ret != TSDB_CODE_SUCCESS || !pReader->status.loadFromFile) {
            return ret;
          }

          // this file does not have blocks, let's start check the last block file
          if (pIter->numOfBlocks == 0) {
            goto _begin;
          }
        }
      }

      ret = doBuildDataBlock(pReader);
    }

    if (ret != TSDB_CODE_SUCCESS) {
      return ret;
    }

    if (pReader->pResBlock->info.rows > 0) {
      return TSDB_CODE_SUCCESS;
    }
  }
}
H
refact  
Hongze Cheng 已提交
2809

2810 2811
// Choose which tsdb instance serves a query whose window starts at winSKey.
//
// For a vnode where VND_IS_RSMA is false, the regular tsdb is returned and *pLevel is left untouched. Otherwise
// the retention levels are walked from L0 upward. The first level whose `keep` window, widened by
// tsQueryRsmaTolerance, still reaches winSKey is selected. If the next level is unconfigured (keep <= 0), the
// previous level is used instead. *pLevel receives the chosen level.
static STsdb* getTsdbByRetentions(SVnode* pVnode, TSKEY winSKey, SRetention* retentions, const char* idStr,
                                  int8_t* pLevel) {
  if (VND_IS_RSMA(pVnode)) {
    int8_t  level = 0;
    int8_t  precision = pVnode->config.tsdbCfg.precision;
    int64_t now = taosGetTimestamp(precision);

    // Scale tsQueryRsmaTolerance to the database precision. The factor of 1 for millisecond precision implies
    // the tolerance is in milliseconds. The product is computed in int64_t: with `long` literals, a 32-bit
    // tsQueryRsmaTolerance times 1000000L could overflow where long is 32 bits wide (LLP64, e.g. Windows).
    int64_t factor = 1000000LL;
    if (precision == TSDB_TIME_PRECISION_MILLI) {
      factor = 1LL;
    } else if (precision == TSDB_TIME_PRECISION_MICRO) {
      factor = 1000LL;
    }
    int64_t offset = (int64_t)tsQueryRsmaTolerance * factor;

    for (int8_t i = 0; i < TSDB_RETENTION_MAX; ++i) {
      SRetention* pRetention = retentions + level;
      if (pRetention->keep <= 0) {
        // this level is not configured: fall back to the previous one
        if (level > 0) {
          --level;
        }
        break;
      }
      if ((now - pRetention->keep) <= (winSKey + offset)) {
        break;
      }
      ++level;
    }

    const char* str = (idStr != NULL) ? idStr : "";

    if (level == TSDB_RETENTION_L0) {
      *pLevel = TSDB_RETENTION_L0;
      tsdbDebug("vgId:%d, rsma level %d is selected to query %s", TD_VID(pVnode), TSDB_RETENTION_L0, str);
      return VND_RSMA0(pVnode);
    } else if (level == TSDB_RETENTION_L1) {
      *pLevel = TSDB_RETENTION_L1;
      tsdbDebug("vgId:%d, rsma level %d is selected to query %s", TD_VID(pVnode), TSDB_RETENTION_L1, str);
      return VND_RSMA1(pVnode);
    } else {
      *pLevel = TSDB_RETENTION_L2;
      tsdbDebug("vgId:%d, rsma level %d is selected to query %s", TD_VID(pVnode), TSDB_RETENTION_L2, str);
      return VND_RSMA2(pVnode);
    }
  }

  return VND_TSDB(pVnode);
}

H
Haojun Liao 已提交
2854
// Compute the data-version range visible to a query.
// A start version of -1 means "from the beginning" (0). An end version of -1 means "up to the latest applied
// version of the vnode"; any explicit end version is capped at that applied version.
// `level` is not used here.
SVersionRange getQueryVerRange(SVnode* pVnode, SQueryTableDataCond* pCond, int8_t level) {
  SVersionRange range = {0};

  range.minVer = (pCond->startVersion == -1) ? 0 : pCond->startVersion;

  bool useApplied = (pCond->endVersion == -1) || (pCond->endVersion > pVnode->state.applied);
  if (useApplied) {
    range.maxVer = pVnode->state.applied;
  } else {
    range.maxVer = pCond->endVersion;
  }

  return range;
}

2868
// Check whether the row identified by pKey is covered by a delete range in pDelList (the delete skyline).
//
// The comparisons below treat pDelList as an ordered array of TSDBKEY points. Two neighbouring entries [i, i+1]
// form a range [entry[i].ts, entry[i+1].ts] deleted at entry[i].version, the lower-index entry.
// NOTE(review): this layout is inferred from the code here; confirm against tsdbBuildDeleteSkyline.
//
// *index is a cursor into pDelList. It is advanced as a side effect so that later calls (for later rows in scan
// order) resume from where this one stopped. Returns true if the row is deleted, false otherwise.
//
// NOTE(review): the descending branch never checks pVerRange->maxVer. The ascending branch only honours a delete
// entry whose version is <= pVerRange->maxVer. Confirm whether this asymmetry is intended.
// NOTE(review): `*index` (int32_t) is compared with `num - 1` (size_t). A negative *index converts to a huge
// unsigned value and is treated as "at the end". `num - 2` also assumes the list holds at least two entries.
bool hasBeenDropped(const SArray* pDelList, int32_t* index, TSDBKEY* pKey, int32_t order, SVersionRange* pVerRange) {
2869 2870 2871 2872
  ASSERT(pKey != NULL);
  // no delete records for this table
  if (pDelList == NULL) {
    return false;
  }
L
Liu Jicong 已提交
2873 2874 2875
  size_t  num = taosArrayGetSize(pDelList);
  bool    asc = ASCENDING_TRAVERSE(order);
  int32_t step = asc ? 1 : -1;
2876

2877 2878 2879 2880 2881 2882
  // ascending scan: *index marks the start of the range the cursor is in; search forward
  if (asc) {
    // cursor is at (or past) the last skyline point: only an exact hit on that point can still be deleted
    if (*index >= num - 1) {
      TSDBKEY* last = taosArrayGetLast(pDelList);
      ASSERT(pKey->ts >= last->ts);

      if (pKey->ts > last->ts) {
2883
        return false;
2884 2885 2886
        // exact hit: the closing point belongs to the range started by the previous entry
      } else if (pKey->ts == last->ts) {
        TSDBKEY* prev = taosArrayGet(pDelList, num - 2);
        return (prev->version >= pKey->version);
2887 2888
      }
      // pKey->ts < last->ts is only reachable with ASSERT disabled; it falls through to the final `return false`
    } else {
2889 2890 2891 2892 2893 2894 2895
      TSDBKEY* pCurrent = taosArrayGet(pDelList, *index);
      TSDBKEY* pNext = taosArrayGet(pDelList, (*index) + 1);

      // row lies before the current range
      if (pKey->ts < pCurrent->ts) {
        return false;
      }

2896 2897
      // row inside the current range, deleted at a version that is both newer than the row and visible
      if (pCurrent->ts <= pKey->ts && pNext->ts >= pKey->ts && pCurrent->version >= pKey->version &&
          pVerRange->maxVer >= pCurrent->version) {
2898 2899 2900 2901 2902 2903 2904 2905 2906 2907 2908 2909 2910 2911 2912
        return true;
      }

      // advance the cursor while the row lies beyond the current range's end
      while (pNext->ts <= pKey->ts && (*index) < num - 1) {
        (*index) += 1;

        if ((*index) < num - 1) {
          pCurrent = taosArrayGet(pDelList, *index);
          pNext = taosArrayGet(pDelList, (*index) + 1);

          // it is not a consecutive deletion range, ignore it
          if (pCurrent->version == 0 && pNext->version > 0) {
            continue;
          }

2913 2914
          if (pCurrent->ts <= pKey->ts && pNext->ts >= pKey->ts && pCurrent->version >= pKey->version &&
              pVerRange->maxVer >= pCurrent->version) {
2915 2916 2917 2918 2919 2920
            return true;
          }
        }
      }

      return false;
2921 2922
    }
  } else {
2923 2924
    // descending scan: *index marks the end of the range the cursor is in; search backward
    if (*index <= 0) {
      TSDBKEY* pFirst = taosArrayGet(pDelList, 0);
2925

2926 2927 2928 2929 2930 2931 2932
      if (pKey->ts < pFirst->ts) {
        return false;
      } else if (pKey->ts == pFirst->ts) {
        return pFirst->version >= pKey->version;
      } else {
        // unexpected: the cursor should not be at the first point while the row is still after it.
        // With ASSERT disabled this falls through to the final `return false`.
        ASSERT(0);
      }
2933
    } else {
2934 2935 2936 2937 2938 2939 2940 2941 2942 2943 2944 2945 2946 2947 2948 2949 2950 2951 2952 2953 2954 2955 2956 2957 2958 2959 2960
      TSDBKEY* pCurrent = taosArrayGet(pDelList, *index);
      TSDBKEY* pPrev = taosArrayGet(pDelList, (*index) - 1);

      // row lies after the current range
      if (pKey->ts > pCurrent->ts) {
        return false;
      }

      if (pPrev->ts <= pKey->ts && pCurrent->ts >= pKey->ts && pPrev->version >= pKey->version) {
        return true;
      }

      // move the cursor backward (step == -1 here) while the row lies before the current range's start
      while (pPrev->ts >= pKey->ts && (*index) > 1) {
        (*index) += step;

        if ((*index) >= 1) {
          pCurrent = taosArrayGet(pDelList, *index);
          pPrev = taosArrayGet(pDelList, (*index) - 1);

          // it is not a consecutive deletion range, ignore it
          if (pCurrent->version > 0 && pPrev->version == 0) {
            continue;
          }

          if (pPrev->ts <= pKey->ts && pCurrent->ts >= pKey->ts && pPrev->version >= pKey->version) {
            return true;
          }
        }
2961 2962 2963 2964 2965
      }

      return false;
    }
  }
2966 2967

  return false;
2968 2969
}

2970
// Return the buffered row under pIter if it qualifies, otherwise advance the iterator until one does.
//
// A row qualifies when all of the following hold:
// - it is inside pReader->window;
// - its version lies in pReader->verRange;
// - hasBeenDropped does not report it deleted according to pDelList.
//
// NULL is returned, with pIter->hasVal false, in three cases: the iterator was already exhausted, it runs out
// while advancing, or it reaches a row outside the query window (which ends the iteration for this table).
TSDBROW* getValidMemRow(SIterInfo* pIter, const SArray* pDelList, STsdbReader* pReader) {
  if (!pIter->hasVal) {
    return NULL;
  }

  TSDBROW* pCandidate = tsdbTbDataIterGet(pIter->iter);
  TSDBKEY  candKey = {.ts = pCandidate->pTSRow->ts, .version = pCandidate->version};

  for (;;) {
    if (outOfTimeWindow(candKey.ts, &pReader->window)) {
      pIter->hasVal = false;
      return NULL;
    }

    // it is a valid data version; the delete check runs only for in-range versions because it moves the cursor
    bool versionVisible = (candKey.version >= pReader->verRange.minVer) && (candKey.version <= pReader->verRange.maxVer);
    if (versionVisible && !hasBeenDropped(pDelList, &pIter->index, &candKey, pReader->order, &pReader->verRange)) {
      return pCandidate;
    }

    pIter->hasVal = tsdbTbDataIterNext(pIter->iter);
    if (!pIter->hasVal) {
      return NULL;
    }

    pCandidate = tsdbTbDataIterGet(pIter->iter);
    candKey = TSDBROW_KEY(pCandidate);
  }
}
H
Hongze Cheng 已提交
3008

3009 3010
// Advance pIter past the current row and add every following valid buffered row with the same timestamp `ts`
// into pMerger, each using the schema that matches its own schema version.
// Stops at the first row with a different timestamp, or when no valid row is left.
// Returns terrno if a row's schema cannot be obtained, TSDB_CODE_SUCCESS otherwise.
int32_t doMergeRowsInBuf(SIterInfo* pIter, uint64_t uid, int64_t ts, SArray* pDelList, SRowMerger* pMerger,
                         STsdbReader* pReader) {
  while ((pIter->hasVal = tsdbTbDataIterNext(pIter->iter))) {
    // data exists but not valid
    TSDBROW* pNextRow = getValidMemRow(pIter, pDelList, pReader);
    if (pNextRow == NULL) {
      break;
    }

    // ts is not identical, quit
    TSDBKEY nextKey = TSDBROW_KEY(pNextRow);
    if (nextKey.ts != ts) {
      break;
    }

    STSchema* pRowSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pNextRow), pReader, uid);
    if (pRowSchema == NULL) {
      return terrno;
    }

    tRowMergerAdd(pMerger, pNextRow, pRowSchema);
  }

  return TSDB_CODE_SUCCESS;
}

3040
static int32_t doMergeRowsInFileBlockImpl(SBlockData* pBlockData, int32_t rowIndex, int64_t key, SRowMerger* pMerger,
3041
                                          SVersionRange* pVerRange, int32_t step) {
3042 3043
  while (pBlockData->aTSKEY[rowIndex] == key && rowIndex < pBlockData->nRow && rowIndex >= 0) {
    if (pBlockData->aVersion[rowIndex] > pVerRange->maxVer || pBlockData->aVersion[rowIndex] < pVerRange->minVer) {
3044
      rowIndex += step;
3045 3046 3047 3048 3049 3050 3051 3052 3053 3054 3055 3056 3057 3058 3059 3060
      continue;
    }

    TSDBROW fRow = tsdbRowFromBlockData(pBlockData, rowIndex);
    tRowMerge(pMerger, &fRow);
    rowIndex += step;
  }

  return rowIndex;
}

/* Outcome of checkForNeighborFileBlock: whether merging must continue into the next neighbor file block. */
typedef enum {
  CHECK_FILEBLOCK_CONT = 1, /* the neighbor block was exhausted by the merge; examine the next one */
  CHECK_FILEBLOCK_QUIT = 2, /* no (overlapping) neighbor, or the duplicate run ended inside it */
} CHECK_FILEBLOCK_STATE;

H
Hongze Cheng 已提交
3061
static int32_t checkForNeighborFileBlock(STsdbReader* pReader, STableBlockScanInfo* pScanInfo, SDataBlk* pBlock,
3062 3063
                                         SFileDataBlockInfo* pFBlock, SRowMerger* pMerger, int64_t key,
                                         CHECK_FILEBLOCK_STATE* state) {
3064
  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
3065
  SBlockData*         pBlockData = &pReader->status.fileBlockData;
3066

3067
  *state = CHECK_FILEBLOCK_QUIT;
3068
  int32_t step = ASCENDING_TRAVERSE(pReader->order) ? 1 : -1;
3069

3070 3071 3072 3073
  int32_t     nextIndex = -1;
  SBlockIndex bIndex = {0};

  bool hasNeighbor = getNeighborBlockOfSameTable(pFBlock, pScanInfo, &nextIndex, pReader->order, &bIndex);
3074
  if (!hasNeighbor) {  // do nothing
3075 3076 3077
    return 0;
  }

3078
  bool overlap = overlapWithNeighborBlock(pBlock, &bIndex, pReader->order);
3079
  if (overlap) {  // load next block
3080
    SReaderStatus*  pStatus = &pReader->status;
3081 3082
    SDataBlockIter* pBlockIter = &pStatus->blockIter;

3083
    // 1. find the next neighbor block in the scan block list
3084
    SFileDataBlockInfo fb = {.uid = pFBlock->uid, .tbBlockIdx = nextIndex};
3085
    int32_t            neighborIndex = findFileBlockInfoIndex(pBlockIter, &fb);
3086

3087
    // 2. remove it from the scan block list
3088
    setFileBlockActiveInBlockIter(pBlockIter, neighborIndex, step);
3089

3090
    // 3. load the neighbor block, and set it to be the currently accessed file data block
3091
    int32_t code = doLoadFileBlockData(pReader, pBlockIter, &pStatus->fileBlockData, pFBlock->uid);
3092 3093 3094 3095
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

3096
    // 4. check the data values
3097 3098 3099 3100
    initBlockDumpInfo(pReader, pBlockIter);

    pDumpInfo->rowIndex =
        doMergeRowsInFileBlockImpl(pBlockData, pDumpInfo->rowIndex, key, pMerger, &pReader->verRange, step);
H
Haojun Liao 已提交
3101
    if (pDumpInfo->rowIndex >= pDumpInfo->totalRows) {
3102 3103 3104 3105 3106 3107 3108
      *state = CHECK_FILEBLOCK_CONT;
    }
  }

  return TSDB_CODE_SUCCESS;
}

3109 3110
/*
 * Merge into pMerger every file-block row that shares the timestamp of the row at the current dump position.
 *
 * The current row (which supplies the key) is skipped; presumably the caller has already put it into pMerger.
 * If the run reaches the edge of the current block, overlapping neighbor blocks of the same table are loaded and
 * merged as well (see checkForNeighborFileBlock).
 *
 * Returns TSDB_CODE_SUCCESS, or the first error raised while loading a neighbor block.
 */
int32_t doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pScanInfo, STsdbReader* pReader,
                                SRowMerger* pMerger) {
  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;

  bool    asc = ASCENDING_TRAVERSE(pReader->order);
  int64_t key = pBlockData->aTSKEY[pDumpInfo->rowIndex];
  int32_t step = asc ? 1 : -1;

  pDumpInfo->rowIndex += step;
  if ((asc && pDumpInfo->rowIndex <= pBlockData->nRow - 1) || (!asc && pDumpInfo->rowIndex >= 0)) {
    pDumpInfo->rowIndex =
        doMergeRowsInFileBlockImpl(pBlockData, pDumpInfo->rowIndex, key, pMerger, &pReader->verRange, step);
  }

  // all rows are consumed, let's try next file block
  if ((asc && pDumpInfo->rowIndex >= pBlockData->nRow) || (!asc && pDumpInfo->rowIndex < 0)) {
    while (1) {
      CHECK_FILEBLOCK_STATE st = CHECK_FILEBLOCK_QUIT;

      SFileDataBlockInfo* pFileBlockInfo = getCurrentBlockInfo(&pReader->status.blockIter);
      if (pFileBlockInfo == NULL) {  // defensive: no current block, nothing more to merge
        break;
      }

      SDataBlk* pCurrentBlock = getCurrentBlock(&pReader->status.blockIter);
      int32_t   code = checkForNeighborFileBlock(pReader, pScanInfo, pCurrentBlock, pFileBlockInfo, pMerger, key, &st);
      if (code != TSDB_CODE_SUCCESS) {
        // previously dropped: a failed neighbor load silently produced a partially merged row
        return code;
      }

      if (st == CHECK_FILEBLOCK_QUIT) {
        break;
      }
    }
  }

  return TSDB_CODE_SUCCESS;
}

H
Hongze Cheng 已提交
3140
int32_t doMergeRowsInLastBlock(SLastBlockReader* pLastBlockReader, STableBlockScanInfo* pScanInfo, int64_t ts,
3141
                               SRowMerger* pMerger, SVersionRange* pVerRange) {
H
Haojun Liao 已提交
3142
  pScanInfo->lastKey = ts;
3143
  while (nextRowFromLastBlocks(pLastBlockReader, pScanInfo, pVerRange)) {
3144 3145
    int64_t next1 = getCurrentKeyInLastBlock(pLastBlockReader);
    if (next1 == ts) {
3146
      TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
3147 3148 3149 3150 3151 3152 3153 3154 3155
      tRowMerge(pMerger, &fRow1);
    } else {
      break;
    }
  }

  return TSDB_CODE_SUCCESS;
}

3156 3157
/*
 * Produce the output row for *pRow from one in-memory buffer (mem or imem), merging it with the following rows
 * of the same buffer that share its timestamp.
 *
 * If the next valid row has a different timestamp (or none exists), *pTSRow is set to the buffer's own row and
 * *freeTSRow to false: the caller must not free it. Otherwise a newly merged row is returned with *freeTSRow set
 * to true, and the caller owns it.
 *
 * Returns TSDB_CODE_SUCCESS, terrno when a schema lookup fails, or the merger's error code. The merger is released
 * on every path after it has been initialized.
 */
int32_t doMergeMemTableMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter, SArray* pDelList, STSRow** pTSRow,
                                 STsdbReader* pReader, bool* freeTSRow) {
  TSDBROW current = *pRow;

  // if the timestamp of the next valid row differs (or there is no next row), return the current row directly
  pIter->hasVal = tsdbTbDataIterNext(pIter->iter);
  TSDBROW* pNextRow = pIter->hasVal ? getValidMemRow(pIter, pDelList, pReader) : NULL;
  if (pNextRow == NULL || current.pTSRow->ts != pNextRow->pTSRow->ts) {
    *pTSRow = current.pTSRow;
    *freeTSRow = false;
    return TSDB_CODE_SUCCESS;
  }

  // get the correct schema for data in memory
  terrno = 0;
  STSchema* pTSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(&current), pReader, uid);
  if (pTSchema == NULL) {
    return terrno;
  }

  if (pReader->pSchema == NULL) {
    pReader->pSchema = pTSchema;
  }

  SRowMerger merge = {0};
  int32_t    code = tRowMergerInit2(&merge, pReader->pSchema, &current, pTSchema);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  // from here on the merger owns resources: every exit goes through _end so it is cleared
  STSchema* pTSchema1 = doGetSchemaForTSRow(TSDBROW_SVERSION(pNextRow), pReader, uid);
  if (pTSchema1 == NULL) {
    code = terrno;
    goto _end;
  }

  tRowMergerAdd(&merge, pNextRow, pTSchema1);

  code = doMergeRowsInBuf(pIter, uid, current.pTSRow->ts, pDelList, &merge, pReader);
  if (code != TSDB_CODE_SUCCESS) {
    goto _end;
  }

  code = tRowMergerGetRow(&merge, pTSRow);
  if (code == TSDB_CODE_SUCCESS) {
    *freeTSRow = true;
  }

_end:
  tRowMergerClear(&merge);
  return code;
}

3224
int32_t doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader,
H
Hongze Cheng 已提交
3225
                           STSRow** pTSRow) {
H
Haojun Liao 已提交
3226 3227
  SRowMerger merge = {0};

3228 3229 3230
  TSDBKEY k = TSDBROW_KEY(pRow);
  TSDBKEY ik = TSDBROW_KEY(piRow);

3231
  if (ASCENDING_TRAVERSE(pReader->order)) {  // ascending order imem --> mem
H
Haojun Liao 已提交
3232
    STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid);
3233

H
Haojun Liao 已提交
3234 3235 3236 3237 3238 3239 3240 3241 3242 3243
    int32_t code = tRowMergerInit(&merge, piRow, pSchema);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    code = doMergeRowsInBuf(&pBlockScanInfo->iiter, pBlockScanInfo->uid, ik.ts, pBlockScanInfo->delSkyline, &merge,
                            pReader);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }
3244

3245
    tRowMerge(&merge, pRow);
H
Haojun Liao 已提交
3246 3247 3248 3249 3250 3251
    code =
        doMergeRowsInBuf(&pBlockScanInfo->iter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge, pReader);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

3252
  } else {
H
Haojun Liao 已提交
3253
    STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid);
3254

H
Haojun Liao 已提交
3255
    int32_t code = tRowMergerInit(&merge, pRow, pSchema);
3256
    if (code != TSDB_CODE_SUCCESS || merge.pTSchema == NULL) {
H
Haojun Liao 已提交
3257 3258 3259 3260 3261 3262 3263 3264
      return code;
    }

    code =
        doMergeRowsInBuf(&pBlockScanInfo->iter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge, pReader);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }
3265 3266

    tRowMerge(&merge, piRow);
H
Haojun Liao 已提交
3267 3268 3269 3270 3271
    code = doMergeRowsInBuf(&pBlockScanInfo->iiter, pBlockScanInfo->uid, ik.ts, pBlockScanInfo->delSkyline, &merge,
                            pReader);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }
3272
  }
3273

3274 3275
  int32_t code = tRowMergerGetRow(&merge, pTSRow);
  return code;
3276 3277
}

3278 3279
/*
 * Fetch the next output row from the table's in-memory buffers (mem iterator `iter`, imem iterator `iiter`).
 *
 * Candidates at or beyond endKey in the scan direction are ignored. When both buffers hold a candidate, the one
 * that comes first in scan order is emitted. Equal timestamps are merged across both buffers.
 * *pTSRow stays untouched when no candidate remains; *freeTSRow tells the caller whether it owns the returned row.
 */
int32_t tsdbGetNextRowInMem(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader, STSRow** pTSRow, int64_t endKey,
                            bool* freeTSRow) {
  SIterInfo* pMemIter = &pBlockScanInfo->iter;
  SIterInfo* pIMemIter = &pBlockScanInfo->iiter;
  SArray*    pDelList = pBlockScanInfo->delSkyline;
  uint64_t   uid = pBlockScanInfo->uid;
  bool       asc = ASCENDING_TRAVERSE(pReader->order);

  TSDBROW* pRow = getValidMemRow(pMemIter, pDelList, pReader);
  TSDBROW* piRow = getValidMemRow(pIMemIter, pDelList, pReader);

  // drop candidates that are not before endKey in the scan direction
  if (pMemIter->hasVal) {
    TSDBKEY key = TSDBROW_KEY(pRow);
    if (asc ? (key.ts >= endKey) : (key.ts <= endKey)) {
      pRow = NULL;
    }
  }

  if (pIMemIter->hasVal) {
    TSDBKEY key = TSDBROW_KEY(piRow);
    if (asc ? (key.ts >= endKey) : (key.ts <= endKey)) {
      piRow = NULL;
    }
  }

  bool hasMem = pMemIter->hasVal && pRow != NULL;
  bool hasIMem = pIMemIter->hasVal && piRow != NULL;

  if (hasMem && hasIMem) {
    TSDBKEY k = TSDBROW_KEY(pRow);
    TSDBKEY ik = TSDBROW_KEY(piRow);

    if (ik.ts == k.ts) {
      *freeTSRow = true;
      return doMergeMemIMemRows(pRow, piRow, pBlockScanInfo, pReader, pTSRow);
    }

    bool iMemComesFirst = asc ? (ik.ts < k.ts) : (ik.ts > k.ts);
    if (iMemComesFirst) {
      return doMergeMemTableMultiRows(piRow, uid, pIMemIter, pDelList, pTSRow, pReader, freeTSRow);
    }
    return doMergeMemTableMultiRows(pRow, uid, pMemIter, pDelList, pTSRow, pReader, freeTSRow);
  }

  if (hasMem) {
    return doMergeMemTableMultiRows(pRow, uid, pMemIter, pDelList, pTSRow, pReader, freeTSRow);
  }

  if (hasIMem) {
    return doMergeMemTableMultiRows(piRow, uid, pIMemIter, pDelList, pTSRow, pReader, freeTSRow);
  }

  return TSDB_CODE_SUCCESS;
}

H
Haojun Liao 已提交
3335
/*
 * Append pTSRow as a new row of pBlock.
 *
 * The leading timestamp column is filled from pTSRow->ts. The other output columns are matched with the row's
 * schema by column id. Output columns missing from that schema are filled with NULL.
 *
 * Returns TSDB_CODE_SUCCESS, or terrno (block left unchanged) when the schema of the row's version is unavailable.
 */
int32_t doAppendRowFromTSRow(SSDataBlock* pBlock, STsdbReader* pReader, STSRow* pTSRow, uint64_t uid) {
  int32_t numOfRows = pBlock->info.rows;
  int32_t numOfCols = (int32_t)taosArrayGetSize(pBlock->pDataBlock);

  SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo;

  terrno = 0;
  STSchema* pSchema = doGetSchemaForTSRow(pTSRow->sver, pReader, uid);
  if (pSchema == NULL) {
    // the row cannot be decoded without its schema; bail out before touching the block instead of dereferencing NULL
    return terrno;
  }

  SColVal colVal = {0};
  int32_t i = 0, j = 0;

  SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
  if (pColInfoData->info.colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
    colDataAppend(pColInfoData, numOfRows, (const char*)&pTSRow->ts, false);
    i += 1;
  }

  // walk output columns and schema columns together, pairing them by column id
  while (i < numOfCols && j < pSchema->numOfCols) {
    pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
    col_id_t colId = pColInfoData->info.colId;

    if (colId == pSchema->columns[j].colId) {
      tTSRowGetVal(pTSRow, pSchema, j, &colVal);
      doCopyColVal(pColInfoData, numOfRows, i, &colVal, pSupInfo);
      i += 1;
      j += 1;
    } else if (colId < pSchema->columns[j].colId) {
      colDataAppendNULL(pColInfoData, numOfRows);
      i += 1;
    } else {
      j += 1;
    }
  }

  // set null value since current column does not exist in the "pSchema"
  while (i < numOfCols) {
    pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
    colDataAppendNULL(pColInfoData, numOfRows);
    i += 1;
  }

  pBlock->info.rows += 1;
  return TSDB_CODE_SUCCESS;
}

H
Hongze Cheng 已提交
3379 3380
/*
 * Append row `rowIndex` of the file block pBlockData as a new row of pResBlock.
 *
 * The leading timestamp output column (when present) is taken from aTSKEY. The remaining output columns are paired
 * with the block's stored columns by column id. The pairing loop assumes both sides are sorted by ascending id.
 * Output columns absent from the file block receive NULL. Always returns TSDB_CODE_SUCCESS.
 */
int32_t doAppendRowFromFileBlock(SSDataBlock* pResBlock, STsdbReader* pReader, SBlockData* pBlockData,
                                 int32_t rowIndex) {
  SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo;
  int32_t             dstRow = pResBlock->info.rows;
  int32_t             outCol = 0;
  int32_t             inCol = 0;

  SColumnInfoData* pTsCol = taosArrayGet(pResBlock->pDataBlock, outCol);
  if (pTsCol->info.colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
    colDataAppendInt64(pTsCol, dstRow, &pBlockData->aTSKEY[rowIndex]);
    outCol++;
  }

  SColVal cv = {0};
  int32_t numOfInputCols = pBlockData->aIdx->size;
  int32_t numOfOutputCols = pResBlock->pDataBlock->size;

  while (outCol < numOfOutputCols && inCol < numOfInputCols) {
    SColumnInfoData* pCol = TARRAY_GET_ELEM(pResBlock->pDataBlock, outCol);
    SColData*        pData = tBlockDataGetColDataByIdx(pBlockData, inCol);

    if (pData->cid < pCol->info.colId) {
      // stored column that the result block does not ask for
      inCol++;
    } else if (pData->cid == pCol->info.colId) {
      tColDataGetValue(pData, rowIndex, &cv);
      doCopyColVal(pCol, dstRow, outCol, &cv, pSupInfo);
      inCol++;
      outCol++;
    } else {
      // the specified column does not exist in file block, fill with null data
      colDataAppendNULL(pCol, dstRow);
      outCol++;
    }
  }

  for (; outCol < numOfOutputCols; ++outCol) {
    colDataAppendNULL(taosArrayGet(pResBlock->pDataBlock, outCol), dstRow);
  }

  pResBlock->info.rows += 1;
  return TSDB_CODE_SUCCESS;
}

3427 3428
/*
 * Fill pReader->pResBlock with rows from the table's in-memory buffers until both buffers are exhausted, no
 * candidate before endKey remains, or `capacity` rows have been produced.
 *
 * Returns TSDB_CODE_SUCCESS, or the first error reported while fetching or appending a row. Such errors used to be
 * swallowed, yielding a silently truncated block.
 */
int32_t buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, int64_t endKey, int32_t capacity,
                                  STsdbReader* pReader) {
  SSDataBlock* pBlock = pReader->pResBlock;

  while (1) {
    STSRow* pTSRow = NULL;
    bool    freeTSRow = false;

    int32_t code = tsdbGetNextRowInMem(pBlockScanInfo, pReader, &pTSRow, endKey, &freeTSRow);
    if (code != TSDB_CODE_SUCCESS) {
      if (freeTSRow && pTSRow != NULL) {
        taosMemoryFree(pTSRow);
      }
      return code;
    }

    if (pTSRow == NULL) {
      break;
    }

    code = doAppendRowFromTSRow(pBlock, pReader, pTSRow, pBlockScanInfo->uid);
    if (freeTSRow) {  // only merged rows are owned here; buffer rows belong to the skip list
      taosMemoryFree(pTSRow);
    }

    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    // no data in buffer, return immediately
    if (!(pBlockScanInfo->iter.hasVal || pBlockScanInfo->iiter.hasVal)) {
      break;
    }

    if (pBlock->info.rows >= capacity) {
      break;
    }
  }

  ASSERT(pBlock->info.rows <= capacity);
  return TSDB_CODE_SUCCESS;
}
H
Hongze Cheng 已提交
3457

3458 3459
// TODO refactor: with createDataBlockScanInfo
/*
 * Replace the tables scanned by pReader with the `num` entries of pTableList, an array of STableKeyInfo.
 * Existing per-table scan info is cleared and the table map emptied. Each new table is then inserted with
 * lastKey = 0.
 * Returns TSDB_CODE_SUCCESS (the tdb-module constant TDB_CODE_SUCCESS was returned before).
 */
int32_t tsdbSetTableList(STsdbReader* pReader, const void* pTableList, int32_t num) {
  ASSERT(pReader != NULL);

  STableBlockScanInfo* p = NULL;
  while ((p = taosHashIterate(pReader->status.pTableMap, p)) != NULL) {
    clearBlockScanInfo(p);
  }

  taosHashClear(pReader->status.pTableMap);

  // the list is only read, so keep it const instead of casting the qualifier away
  const STableKeyInfo* pList = (const STableKeyInfo*)pTableList;
  for (int32_t i = 0; i < num; ++i) {
    STableBlockScanInfo info = {.lastKey = 0, .uid = pList[i].uid};
    taosHashPut(pReader->status.pTableMap, &info.uid, sizeof(uint64_t), &info, sizeof(info));
  }

  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
3478 3479 3480 3481 3482 3483
/* Return the meta layer's index handle (metaGetIdx), or NULL when no meta handle is supplied. */
void* tsdbGetIdx(SMeta* pMeta) {
  return (pMeta != NULL) ? metaGetIdx(pMeta) : NULL;
}
dengyihao's avatar
dengyihao 已提交
3484

dengyihao's avatar
dengyihao 已提交
3485 3486 3487 3488 3489 3490
/* Return the meta layer's inverted-index handle (metaGetIvtIdx), or NULL when no meta handle is supplied. */
void* tsdbGetIvtIdx(SMeta* pMeta) {
  return (pMeta != NULL) ? metaGetIvtIdx(pMeta) : NULL;
}
L
Liu Jicong 已提交
3491

H
Hongze Cheng 已提交
3492
/* Upper bound of the data-version range accepted by this reader. */
uint64_t getReaderMaxVersion(STsdbReader* pReader) {
  uint64_t maxVer = pReader->verRange.maxVer;
  return maxVer;
}
3493

3494 3495 3496 3497 3498 3499 3500 3501 3502 3503 3504 3505 3506 3507 3508
/*
 * Position a reader at the start of its scan.
 * Initializes the file-set iterator over the read snapshot and resets the block iterator. If there is at least one
 * file, the first file block is prepared. Otherwise the reader is switched to in-memory-only reading.
 */
static int32_t doOpenReaderImpl(STsdbReader* pReader) {
  SDataBlockIter* pIter = &pReader->status.blockIter;

  initFilesetIterator(&pReader->status.fileIter, pReader->pReadSnap->fs.aDFileSet, pReader);
  resetDataBlockIterator(pIter, pReader->order);

  if (pReader->status.fileIter.numOfFiles != 0) {
    return initForFirstBlockInFile(pReader, pIter);
  }

  // no data in files, let's try buffer in memory
  pReader->status.loadFromFile = false;
  return TSDB_CODE_SUCCESS;
}

H
refact  
Hongze Cheng 已提交
3509
// ====================================== EXPOSED APIs ======================================
3510 3511
/*
 * Create a reader for `numOfTables` tables (pTableList is an STableKeyInfo array) under the conditions in pCond.
 *
 * For TIMEWINDOW_RANGE_EXTERNAL the main reader scans (skey, ekey) exclusively. Two capacity-1 inner readers fetch
 * the single row that precedes the window in scan order (innerReader[0]) and the one that follows it
 * (innerReader[1]).
 *
 * pCond is used as scratch space while the readers are created, and its window and order are restored before
 * returning. On failure the partially built reader is released, *ppReader is NULL, and the error code is returned.
 */
int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, void* pTableList, int32_t numOfTables,
                       STsdbReader** ppReader, const char* idstr) {
  STsdbReader* pReader = NULL;
  STimeWindow  window = pCond->twindows;
  int32_t      order = pCond->order;

  if (pCond->type == TIMEWINDOW_RANGE_EXTERNAL) {
    pCond->twindows.skey += 1;
    pCond->twindows.ekey -= 1;
  }

  int32_t code = tsdbReaderCreate(pVnode, pCond, ppReader, 4096, idstr);
  if (code != TSDB_CODE_SUCCESS) {
    goto _err;
  }

  // check for query time window
  pReader = *ppReader;
  if (isEmptyQueryTimeWindow(&pReader->window) && pCond->type == TIMEWINDOW_RANGE_CONTAINED) {
    tsdbDebug("%p query window not overlaps with the data set, no result returned, %s", pReader, pReader->idStr);
    pCond->twindows = window;
    pCond->order = order;
    return TSDB_CODE_SUCCESS;
  }

  if (pCond->type == TIMEWINDOW_RANGE_EXTERNAL) {
    // innerReader[0]: the row preceding the window in scan order, read in the opposite direction
    if (order == TSDB_ORDER_ASC) {
      pCond->twindows.skey = INT64_MIN;
      pCond->twindows.ekey = window.skey;
      pCond->order = TSDB_ORDER_DESC;
    } else {
      pCond->twindows.skey = window.ekey;
      pCond->twindows.ekey = INT64_MAX;
      pCond->order = TSDB_ORDER_ASC;
    }

    // here we only need one more row, so the capacity is set to be ONE.
    code = tsdbReaderCreate(pVnode, pCond, &pReader->innerReader[0], 1, idstr);
    if (code != TSDB_CODE_SUCCESS) {
      goto _err;
    }

    // innerReader[1]: the row following the window in scan order, read in the scan direction.
    // For descending scans that is the part below window.skey (this used to be window.ekey, which made the reader
    // re-read the main window instead of the row beyond it).
    if (order == TSDB_ORDER_ASC) {
      pCond->twindows.skey = window.ekey;
      pCond->twindows.ekey = INT64_MAX;
    } else {
      pCond->twindows.skey = INT64_MIN;
      pCond->twindows.ekey = window.skey;
    }
    pCond->order = order;

    code = tsdbReaderCreate(pVnode, pCond, &pReader->innerReader[1], 1, idstr);
    if (code != TSDB_CODE_SUCCESS) {
      goto _err;
    }
  }

  // NOTE: the endVersion in pCond is the data version not schema version, so pCond->endVersion is not correct here.
  if (pCond->suid != 0) {
    pReader->pSchema = metaGetTbTSchema(pReader->pTsdb->pVnode->pMeta, pReader->suid, -1, 1);
    if (pReader->pSchema == NULL) {
      tsdbError("failed to get table schema, suid:%" PRIu64 ", ver:-1, %s", pReader->suid, pReader->idStr);
    }
  } else if (numOfTables > 0) {
    STableKeyInfo* pKey = pTableList;
    pReader->pSchema = metaGetTbTSchema(pReader->pTsdb->pVnode->pMeta, pKey->uid, -1, 1);
    if (pReader->pSchema == NULL) {
      tsdbError("failed to get table schema, uid:%" PRIu64 ", ver:-1, %s", pKey->uid, pReader->idStr);
    }
  }

  STsdbReader* p = pReader->innerReader[0] != NULL ? pReader->innerReader[0] : pReader;

  pReader->status.pTableMap = createDataBlockScanInfo(p, pTableList, numOfTables);
  if (pReader->status.pTableMap == NULL) {
    code = TSDB_CODE_TDB_OUT_OF_MEMORY;
    goto _err;
  }

  if (numOfTables > 0) {
    code = tsdbTakeReadSnap(pReader->pTsdb, &pReader->pReadSnap, pReader->idStr);
    if (code != TSDB_CODE_SUCCESS) {
      goto _err;
    }

    if (pReader->type == TIMEWINDOW_RANGE_CONTAINED) {
      code = doOpenReaderImpl(pReader);
      if (code != TSDB_CODE_SUCCESS) {
        goto _err;
      }
    } else {
      STsdbReader* pPrevReader = pReader->innerReader[0];
      STsdbReader* pNextReader = pReader->innerReader[1];

      // the inner readers borrow these objects from the main reader; tsdbReaderClose detaches them again
      // we need only one row
      pPrevReader->capacity = 1;
      pPrevReader->status.pTableMap = pReader->status.pTableMap;
      pPrevReader->pSchema = pReader->pSchema;
      pPrevReader->pMemSchema = pReader->pMemSchema;
      pPrevReader->pReadSnap = pReader->pReadSnap;

      pNextReader->capacity = 1;
      pNextReader->status.pTableMap = pReader->status.pTableMap;
      pNextReader->pSchema = pReader->pSchema;
      pNextReader->pMemSchema = pReader->pMemSchema;
      pNextReader->pReadSnap = pReader->pReadSnap;

      code = doOpenReaderImpl(pPrevReader);
      if (code != TSDB_CODE_SUCCESS) {
        goto _err;
      }
    }
  }

  tsdbDebug("%p total numOfTable:%d in this query %s", pReader, numOfTables, pReader->idStr);
  pCond->twindows = window;
  pCond->order = order;
  return code;

_err:
  tsdbError("failed to create data reader, code:%s %s", tstrerror(code), idstr);
  pCond->twindows = window;
  pCond->order = order;

  if (pReader != NULL) {
    // tsdbReaderClose expects either both inner readers or none; release a lone first inner reader here.
    // It cannot share state with pReader yet, since sharing happens only after both are created.
    if (pReader->innerReader[0] != NULL && pReader->innerReader[1] == NULL) {
      tsdbReaderClose(pReader->innerReader[0]);
      pReader->innerReader[0] = NULL;
    }
    tsdbReaderClose(pReader);
  }

  *ppReader = NULL;
  return code;
}

/*
 * Release a reader and everything it owns, and log its I/O cost summary. Passing NULL is a no-op.
 *
 * Inner readers of an external-range query borrow the table map, read snapshot and schemas of this reader (they
 * are assigned in tsdbReaderOpen). Those borrowed pointers are detached before each inner reader is closed, so the
 * shared objects are released exactly once, by this reader. Each inner reader is handled independently, so a
 * half-built pair is safe to close.
 */
void tsdbReaderClose(STsdbReader* pReader) {
  if (pReader == NULL) {
    return;
  }

  for (int32_t k = 0; k < 2; ++k) {
    STsdbReader* p = pReader->innerReader[k];
    if (p == NULL) {
      continue;
    }

    p->status.pTableMap = NULL;
    p->pReadSnap = NULL;
    p->pSchema = NULL;
    p->pMemSchema = NULL;

    tsdbReaderClose(p);
    pReader->innerReader[k] = NULL;
  }

  SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo;

  taosMemoryFreeClear(pSupInfo->plist);
  taosMemoryFree(pSupInfo->colIds);

  taosArrayDestroy(pSupInfo->pColAgg);
  for (int32_t i = 0; i < blockDataGetNumOfCols(pReader->pResBlock); ++i) {
    if (pSupInfo->buildBuf[i] != NULL) {
      taosMemoryFreeClear(pSupInfo->buildBuf[i]);
    }
  }

  taosMemoryFree(pSupInfo->buildBuf);
  tBlockDataDestroy(&pReader->status.fileBlockData, true);

  cleanupDataBlockIterator(&pReader->status.blockIter);

  size_t numOfTables = taosHashGetSize(pReader->status.pTableMap);
  destroyBlockScanInfo(pReader->status.pTableMap);
  blockDataDestroy(pReader->pResBlock);

  if (pReader->pFileReader != NULL) {
    tsdbDataFReaderClose(&pReader->pFileReader);
  }

  tsdbUntakeReadSnap(pReader->pTsdb, pReader->pReadSnap, pReader->idStr);

  taosMemoryFree(pReader->status.uidCheckInfo.tableUidList);
  SIOCostSummary* pCost = &pReader->cost;

  SFilesetIter* pFilesetIter = &pReader->status.fileIter;
  if (pFilesetIter->pLastBlockReader != NULL) {
    SLastBlockReader* pLReader = pFilesetIter->pLastBlockReader;
    tMergeTreeClose(&pLReader->mergeTree);

    getLastBlockLoadInfo(pLReader->pInfo, &pCost->lastBlockLoad, &pCost->lastBlockLoadTime);

    pLReader->pInfo = destroyLastBlockLoadInfo(pLReader->pInfo);
    taosMemoryFree(pLReader);
  }

  tsdbDebug(
      "%p :io-cost summary: head-file:%" PRIu64 ", head-file time:%.2f ms, SMA:%" PRId64
      " SMA-time:%.2f ms, fileBlocks:%" PRId64
      ", fileBlocks-load-time:%.2f ms, "
      "build in-memory-block-time:%.2f ms, lastBlocks:%" PRId64 ", lastBlocks-time:%.2f ms, composed-blocks:%" PRId64
      ", composed-blocks-time:%.2fms, STableBlockScanInfo size:%.2f Kb %s",
      pReader, pCost->headFileLoad, pCost->headFileLoadTime, pCost->smaDataLoad, pCost->smaLoadTime, pCost->numOfBlocks,
      pCost->blockLoadTime, pCost->buildmemBlock, pCost->lastBlockLoad, pCost->lastBlockLoadTime, pCost->composedBlocks,
      pCost->buildComposedBlockTime, numOfTables * sizeof(STableBlockScanInfo) / 1000.0, pReader->idStr);

  taosMemoryFree(pReader->idStr);
  taosMemoryFree(pReader->pSchema);
  if (pReader->pMemSchema != pReader->pSchema) {
    taosMemoryFree(pReader->pMemSchema);
  }
  taosMemoryFreeClear(pReader);
}

3717
/*
 * Build the next result block of a single reader into pReader->pResBlock (cleared first).
 * File blocks are tried first when the reader still loads from files. If they yield nothing, the in-memory
 * buffers are used. Returns true when the block holds at least one row; a file-read error yields false.
 */
static bool doTsdbNextDataBlock(STsdbReader* pReader) {
  // cleanup the data that belongs to the previous data block
  SSDataBlock* pBlock = pReader->pResBlock;
  blockDataCleanup(pBlock);

  if (pReader->status.loadFromFile) {
    if (buildBlockFromFiles(pReader) != TSDB_CODE_SUCCESS) {
      return false;
    }

    if (pBlock->info.rows > 0) {
      return true;
    }
  }

  // either nothing is on disk or the files produced no rows: fall back to the buffers in memory
  buildBlockFromBufferSequentially(pReader);
  return pBlock->info.rows > 0;
}

3744 3745 3746 3747 3748
/*
 * Advance the reader to its next non-empty result block; returns false when the scan is finished or failed.
 *
 * External-range queries run in three steps recorded in pReader->step:
 *   EXTERNAL_ROWS_PREV -> innerReader[0], EXTERNAL_ROWS_MAIN -> this reader, EXTERNAL_ROWS_NEXT -> innerReader[1].
 *
 * On an open failure terrno is set and false is returned. Previously the error code itself was returned and thus
 * converted to `true`.
 */
bool tsdbNextDataBlock(STsdbReader* pReader) {
  if (isEmptyQueryTimeWindow(&pReader->window)) {
    return false;
  }

  if (pReader->innerReader[0] != NULL && pReader->step == 0) {
    bool ret = doTsdbNextDataBlock(pReader->innerReader[0]);
    pReader->step = EXTERNAL_ROWS_PREV;
    if (ret) {
      return ret;
    }
  }

  if (pReader->step == EXTERNAL_ROWS_PREV) {
    // prepare for the main scan
    int32_t code = doOpenReaderImpl(pReader);
    resetDataBlockScanInfo(pReader->status.pTableMap, pReader->innerReader[0]->window.ekey);

    if (code != TSDB_CODE_SUCCESS) {
      terrno = code;
      return false;
    }

    pReader->step = EXTERNAL_ROWS_MAIN;
  }

  bool ret = doTsdbNextDataBlock(pReader);
  if (ret) {
    return ret;
  }

  if (pReader->innerReader[1] != NULL && pReader->step == EXTERNAL_ROWS_MAIN) {
    // prepare for the next row scan
    int32_t code = doOpenReaderImpl(pReader->innerReader[1]);
    resetDataBlockScanInfo(pReader->innerReader[1]->status.pTableMap, pReader->window.ekey);
    if (code != TSDB_CODE_SUCCESS) {
      terrno = code;
      return false;
    }

    bool ret1 = doTsdbNextDataBlock(pReader->innerReader[1]);
    pReader->step = EXTERNAL_ROWS_NEXT;
    if (ret1) {
      return ret1;
    }
  }

  return false;
}

3792
/*
 * Report whether the table `uid` is registered in the reader's table map. No block is loaded; a missing entry
 * means there is no data block for that table.
 */
bool tsdbTableNextDataBlock(STsdbReader* pReader, uint64_t uid) {
  return taosHashGet(pReader->status.pTableMap, &uid, sizeof(uid)) != NULL;
}

H
Haojun Liao 已提交
3801 3802 3803 3804 3805
/* Copy row count, table uid and time window of pReader's current result block into the out-parameters. */
static void setBlockInfo(const STsdbReader* pReader, int32_t* rows, uint64_t* uid, STimeWindow* pWindow) {
  ASSERT(pReader != NULL);

  *pWindow = pReader->pResBlock->info.window;
  *uid = pReader->pResBlock->info.uid;
  *rows = pReader->pResBlock->info.rows;
}

H
Haojun Liao 已提交
3808
/*
 * Describe the block most recently produced by tsdbNextDataBlock.
 * For external-range queries the block comes from the reader of the current step: innerReader[0] for
 * EXTERNAL_ROWS_PREV, this reader for EXTERNAL_ROWS_MAIN, and innerReader[1] otherwise.
 */
void tsdbRetrieveDataBlockInfo(const STsdbReader* pReader, int32_t* rows, uint64_t* uid, STimeWindow* pWindow) {
  const STsdbReader* pSource = pReader;

  if (pReader->type == TIMEWINDOW_RANGE_EXTERNAL && pReader->step != EXTERNAL_ROWS_MAIN) {
    pSource = (pReader->step == EXTERNAL_ROWS_PREV) ? pReader->innerReader[0] : pReader->innerReader[1];
  }

  setBlockInfo(pSource, rows, uid, pWindow);
}

3822
/*
 * Load the block-level SMA (min/max/null-count aggregates) for the current file block.
 *
 * pBlockStatis: on success set to pReader->suppInfo.plist; set to NULL when no SMA is
 *               available (external-range reader, composed block, or block without SMA).
 * allHave:      true only when the SMA was loaded and every matched column had its
 *               BSMA flag on; false otherwise.
 * Returns TSDB_CODE_SUCCESS, or the error code from tsdbReadBlockSma.
 */
int32_t tsdbRetrieveDatablockSMA(STsdbReader* pReader, SColumnDataAgg*** pBlockStatis, bool* allHave) {
  int32_t code = 0;
  *allHave = false;

  // SMA is not served for TIMEWINDOW_RANGE_EXTERNAL readers (they read through inner readers)
  if (pReader->type == TIMEWINDOW_RANGE_EXTERNAL) {
    *pBlockStatis = NULL;
    return TSDB_CODE_SUCCESS;
  }

  // there is no statistics data for composed block
  if (pReader->status.composedDataBlock) {
    *pBlockStatis = NULL;
    return TSDB_CODE_SUCCESS;
  }

  SFileDataBlockInfo* pFBlock = getCurrentBlockInfo(&pReader->status.blockIter);
  SDataBlk*           pBlock = getCurrentBlock(&pReader->status.blockIter);
  SBlockLoadSuppInfo* pSup = &pReader->suppInfo;

  if (!tDataBlkHasSma(pBlock)) {
    *pBlockStatis = NULL;
    return TSDB_CODE_SUCCESS;
  }

  code = tsdbReadBlockSma(pReader->pFileReader, pBlock, pSup->pColAgg);
  if (code != TSDB_CODE_SUCCESS) {
    tsdbDebug("vgId:%d, failed to load block SMA for uid %" PRIu64 ", code:%s, %s", TD_VID(pReader->pTsdb->pVnode),
              pFBlock->uid, tstrerror(code), pReader->idStr);
    return code;
  }

  *allHave = true;

  // always load the first primary timestamp column data: its aggregate is synthesized
  // from the result block's time window instead of being read from the file
  SColumnDataAgg* pTsAgg = &pSup->tsColAgg;
  pTsAgg->numOfNull = 0;
  pTsAgg->colId = PRIMARYKEY_TIMESTAMP_COL_ID;
  pTsAgg->min = pReader->pResBlock->info.window.skey;
  pTsAgg->max = pReader->pResBlock->info.window.ekey;
  pSup->plist[0] = pTsAgg;

  // Pair each loaded aggregate with the requested column of the same colId.
  // NOTE(review): this merge walk assumes both pColAgg and colIds are sorted by colId
  // ascending — confirm against tsdbReadBlockSma and the colIds setup.
  size_t numOfCols = blockDataGetNumOfCols(pReader->pResBlock);
  size_t size = taosArrayGetSize(pSup->pColAgg);
  size_t i = 0, j = 0;

  while (j < numOfCols && i < size) {
    SColumnDataAgg* pAgg = taosArrayGet(pSup->pColAgg, i);
    if (pAgg->colId == pSup->colIds[j]) {
      // NOTE(review): the schema is indexed by i (position in pColAgg), not by the
      // column's own schema slot — confirm the two always coincide.
      if (IS_BSMA_ON(&(pReader->pSchema->columns[i]))) {
        pSup->plist[j] = pAgg;
      } else {
        *allHave = false;
      }
      i += 1;
      j += 1;
    } else if (pAgg->colId < pSup->colIds[j]) {
      i += 1;
    } else {
      j += 1;
    }
  }

  pReader->cost.smaDataLoad += 1;
  *pBlockStatis = pSup->plist;

  tsdbDebug("vgId:%d, succeed to load block SMA for uid %" PRIu64 ", %s", TD_VID(pReader->pTsdb->pVnode), pFBlock->uid,
            pReader->idStr);

  return code;
}

3898
/*
 * Return the column array of the current result block.
 * For a composed block the data is already in pResBlock. Otherwise the current file
 * block is loaded and copied into pResBlock. On failure terrno is set and NULL is returned.
 */
static SArray* doRetrieveDataBlock(STsdbReader* pReader) {
  SReaderStatus* pStatus = &pReader->status;

  // composed blocks were materialized into pResBlock already
  if (pStatus->composedDataBlock) {
    return pReader->pResBlock->pDataBlock;
  }

  SFileDataBlockInfo*  pFileBlock = getCurrentBlockInfo(&pStatus->blockIter);
  STableBlockScanInfo* pScanInfo = taosHashGet(pStatus->pTableMap, &pFileBlock->uid, sizeof(pFileBlock->uid));
  if (pScanInfo == NULL) {
    terrno = TSDB_CODE_INVALID_PARA;
    tsdbError("failed to locate the uid:%" PRIu64 " in query table uid list, total tables:%d, %s", pFileBlock->uid,
              taosHashGetSize(pStatus->pTableMap), pReader->idStr);
    return NULL;
  }

  int32_t code = doLoadFileBlockData(pReader, &pStatus->blockIter, &pStatus->fileBlockData, pScanInfo->uid);
  if (code == TSDB_CODE_SUCCESS) {
    copyBlockDataToSDataBlock(pReader, pScanInfo);
    return pReader->pResBlock->pDataBlock;
  }

  // loading failed: discard the partially filled block data and report the error
  tBlockDataDestroy(&pStatus->fileBlockData, 1);
  terrno = code;
  return NULL;
}

3925 3926 3927 3928 3929 3930 3931 3932 3933 3934 3935 3936
/*
 * Return the column data of the current block.
 * A TIMEWINDOW_RANGE_EXTERNAL reader delegates the EXTERNAL_ROWS_PREV and
 * EXTERNAL_ROWS_NEXT steps to innerReader[0] and innerReader[1]; every other case is
 * served by pReader itself. pIdList is currently unused.
 */
SArray* tsdbRetrieveDataBlock(STsdbReader* pReader, SArray* pIdList) {
  (void)pIdList;

  STsdbReader* pTarget = pReader;
  if (pReader->type == TIMEWINDOW_RANGE_EXTERNAL) {
    switch (pReader->step) {
      case EXTERNAL_ROWS_PREV:
        pTarget = pReader->innerReader[0];
        break;
      case EXTERNAL_ROWS_NEXT:
        pTarget = pReader->innerReader[1];
        break;
      default:
        break;
    }
  }

  return doRetrieveDataBlock(pTarget);
}

H
Haojun Liao 已提交
3937
/*
 * Rewind the reader so that it scans again using the order and time window in pCond.
 * The file reader is closed, the file-set and block iterators are rebuilt, and every
 * table's scan position is reset. Returns TSDB_CODE_SUCCESS immediately, without resetting
 * anything, when the current window is empty or no read snapshot is held.
 */
int32_t tsdbReaderReset(STsdbReader* pReader, SQueryTableDataCond* pCond) {
  if (isEmptyQueryTimeWindow(&pReader->window) || pReader->pReadSnap == NULL) {
    return TSDB_CODE_SUCCESS;
  }

  SDataBlockIter* pBlockIter = &pReader->status.blockIter;

  pReader->order = pCond->order;
  pReader->type = TIMEWINDOW_RANGE_CONTAINED;
  pReader->status.loadFromFile = true;
  pReader->status.pTableIter = NULL;
  pReader->window = updateQueryTimeWindow(pReader->pTsdb, &pCond->twindows);

  // clear the synthesized timestamp aggregate and the first SMA slot
  memset(&pReader->suppInfo.tsColAgg, 0, sizeof(SColumnDataAgg));
  memset(pReader->suppInfo.plist, 0, POINTER_BYTES);
  pReader->suppInfo.tsColAgg.colId = PRIMARYKEY_TIMESTAMP_COL_ID;

  tsdbDataFReaderClose(&pReader->pFileReader);

  int32_t numOfTables = taosHashGetSize(pReader->status.pTableMap);

  initFilesetIterator(&pReader->status.fileIter, pReader->pReadSnap->fs.aDFileSet, pReader);
  resetDataBlockIterator(pBlockIter, pReader->order);

  // place each table's last key just outside the window edge the scan starts from
  int64_t startKey = ASCENDING_TRAVERSE(pReader->order) ? pReader->window.skey - 1 : pReader->window.ekey + 1;
  resetDataBlockScanInfo(pReader->status.pTableMap, startKey);

  int32_t code = TSDB_CODE_SUCCESS;

  if (pReader->status.fileIter.numOfFiles == 0) {
    // nothing on disk: the scan will only read the in-memory buffers
    pReader->status.loadFromFile = false;
  } else {
    code = initForFirstBlockInFile(pReader, pBlockIter);
    if (code != TSDB_CODE_SUCCESS) {
      tsdbError("%p reset reader failed, numOfTables:%d, query range:%" PRId64 " - %" PRId64 " in query %s", pReader,
                numOfTables, pReader->window.skey, pReader->window.ekey, pReader->idStr);
      return code;
    }
  }

  tsdbDebug("%p reset reader, suid:%" PRIu64 ", numOfTables:%d, skey:%" PRId64 ", query range:%" PRId64 " - %" PRId64
            " in query %s",
            pReader, pReader->suid, numOfTables, pCond->twindows.skey, pReader->window.skey, pReader->window.ekey,
            pReader->idStr);

  return code;
}
H
Hongze Cheng 已提交
3985

3986 3987 3988
/*
 * Map a block's row count onto a histogram bucket of width bucketRange, counting
 * upward from startRow.
 *
 * Returns 0 when bucketRange is not positive (avoids division by zero when the
 * configured min and max rows are equal) or when numOfRows is below startRow
 * (avoids a negative index). The upper end is not clamped here, so callers must still
 * bound the result by the histogram length.
 */
static int32_t getBucketIndex(int32_t startRow, int32_t bucketRange, int32_t numOfRows) {
  if (bucketRange <= 0 || numOfRows < startRow) {
    return 0;
  }

  return (numOfRows - startRow) / bucketRange;
}
H
Hongze Cheng 已提交
3989

3990 3991 3992 3993
/*
 * Walk all data blocks in the reader's file sets and accumulate block-distribution
 * statistics into pTableBlockInfo: total/min/max rows, small-block count, block and
 * file counts, and a histogram of block row counts.
 * Returns TSDB_CODE_SUCCESS, or the error from initForFirstBlockInFile.
 */
int32_t tsdbGetFileBlocksDistInfo(STsdbReader* pReader, STableBlockDistInfo* pTableBlockInfo) {
  int32_t code = TSDB_CODE_SUCCESS;
  pTableBlockInfo->totalSize = 0;
  pTableBlockInfo->totalRows = 0;

  // find the start data block in file
  SReaderStatus* pStatus = &pReader->status;

  STsdbCfg* pc = &pReader->pTsdb->pVnode->config.tsdbCfg;
  pTableBlockInfo->defMinRows = pc->minRows;
  pTableBlockInfo->defMaxRows = pc->maxRows;

  // number of histogram slots; every computed index is clamped into [0, numOfBuckets - 1]
  const int32_t numOfBuckets =
      (int32_t)(sizeof(pTableBlockInfo->blockRowsHisto) / sizeof(pTableBlockInfo->blockRowsHisto[0]));

  // Spread [minRows, maxRows] over 20 buckets. Force a width of at least 1 so that
  // equal min/max rows cannot cause a division by zero.
  int32_t bucketRange = ceil((pc->maxRows - pc->minRows) / 20.0);
  if (bucketRange <= 0) {
    bucketRange = 1;
  }

  // NOTE(review): this +1 is added on top of fileIter.numOfFiles below — confirm it is
  // not double counting.
  pTableBlockInfo->numOfFiles += 1;

  int32_t numOfTables = (int32_t)taosHashGetSize(pStatus->pTableMap);
  int     defaultRows = 4096;  // blocks with fewer rows are counted as "small"

  SDataBlockIter* pBlockIter = &pStatus->blockIter;
  pTableBlockInfo->numOfFiles += pStatus->fileIter.numOfFiles;

  if (pBlockIter->numOfBlocks > 0) {
    pTableBlockInfo->numOfBlocks += pBlockIter->numOfBlocks;
  }

  pTableBlockInfo->numOfTables = numOfTables;
  bool hasNext = (pBlockIter->numOfBlocks > 0);

  while (true) {
    if (hasNext) {
      SDataBlk* pBlock = getCurrentBlock(pBlockIter);

      int32_t numOfRows = pBlock->nRow;
      pTableBlockInfo->totalRows += numOfRows;

      if (numOfRows > pTableBlockInfo->maxRows) {
        pTableBlockInfo->maxRows = numOfRows;
      }

      if (numOfRows < pTableBlockInfo->minRows) {
        pTableBlockInfo->minRows = numOfRows;
      }

      if (numOfRows < defaultRows) {
        pTableBlockInfo->numOfSmallBlocks += 1;
      }

      // A block at maxRows (or one outside [minRows, maxRows]) can map past either end
      // of the histogram, so clamp the index before writing.
      int32_t bucketIndex = getBucketIndex(pTableBlockInfo->defMinRows, bucketRange, numOfRows);
      if (bucketIndex < 0) {
        bucketIndex = 0;
      } else if (bucketIndex >= numOfBuckets) {
        bucketIndex = numOfBuckets - 1;
      }
      pTableBlockInfo->blockRowsHisto[bucketIndex]++;

      hasNext = blockIteratorNext(&pStatus->blockIter, pReader->idStr);
    } else {
      // current file set exhausted: advance to the next one, stop when none remain
      code = initForFirstBlockInFile(pReader, pBlockIter);
      if ((code != TSDB_CODE_SUCCESS) || (pReader->status.loadFromFile == false)) {
        break;
      }

      pTableBlockInfo->numOfBlocks += pBlockIter->numOfBlocks;
      hasNext = (pBlockIter->numOfBlocks > 0);
    }
  }

  return code;
}
H
Hongze Cheng 已提交
4058

H
refact  
Hongze Cheng 已提交
4059
/*
 * Sum the number of in-memory rows (mem + imem) for every table tracked by the reader.
 *
 * Rows are counted against the memtables pinned in the reader's read snapshot. The original
 * code tested the live pTsdb->mem/imem pointers, which can change after the snapshot is
 * taken; that could pass a NULL snapshot memtable to tsdbGetTbDataFromMemTable or skip
 * rows the snapshot still holds.
 * Side effect: iterates pReader->status.pTableIter over the table map, which leaves it NULL.
 */
int64_t tsdbGetNumOfRowsInMemTable(STsdbReader* pReader) {
  int64_t rows = 0;

  SReaderStatus* pStatus = &pReader->status;

  bool hasMem = (pReader->pReadSnap != NULL) && (pReader->pReadSnap->pMem != NULL);
  bool hasIMem = (pReader->pReadSnap != NULL) && (pReader->pReadSnap->pIMem != NULL);

  pStatus->pTableIter = taosHashIterate(pStatus->pTableMap, NULL);

  while (pStatus->pTableIter != NULL) {
    STableBlockScanInfo* pBlockScanInfo = pStatus->pTableIter;

    if (hasMem) {
      STbData* d = tsdbGetTbDataFromMemTable(pReader->pReadSnap->pMem, pReader->suid, pBlockScanInfo->uid);
      if (d != NULL) {
        rows += tsdbGetNRowsInTbData(d);
      }
    }

    if (hasIMem) {
      STbData* di = tsdbGetTbDataFromMemTable(pReader->pReadSnap->pIMem, pReader->suid, pBlockScanInfo->uid);
      if (di != NULL) {
        rows += tsdbGetNRowsInTbData(di);
      }
    }

    // current table is exhausted, let's try the next table
    pStatus->pTableIter = taosHashIterate(pStatus->pTableMap, pStatus->pTableIter);
  }

  return rows;
}
D
dapan1121 已提交
4090

L
Liu Jicong 已提交
4091
/*
 * Look up the schema of table `uid`.
 * For a child table, *suid receives its super-table uid and the schema version is taken
 * from the super-table entry. Otherwise *suid is 0 and the table's own schema version is
 * used (the table is asserted to be a normal table).
 * On lookup failure terrno is set to TSDB_CODE_TDB_INVALID_TABLE_ID and that code is returned.
 */
int32_t tsdbGetTableSchema(SVnode* pVnode, int64_t uid, STSchema** pSchema, int64_t* suid) {
  int32_t     sversion = 1;
  SMetaReader mr = {0};

  metaReaderInit(&mr, pVnode->pMeta, 0);
  if (metaGetTableEntryByUid(&mr, uid) != TSDB_CODE_SUCCESS) {
    goto _invalid_table;
  }

  *suid = 0;

  if (mr.me.type != TSDB_CHILD_TABLE) {
    ASSERT(mr.me.type == TSDB_NORMAL_TABLE);
    sversion = mr.me.ntbEntry.schemaRow.version;
  } else {
    // re-use the reader for the parent super table entry
    tDecoderClear(&mr.coder);
    *suid = mr.me.ctbEntry.suid;
    if (metaGetTableEntryByUid(&mr, *suid) != TSDB_CODE_SUCCESS) {
      goto _invalid_table;
    }
    sversion = mr.me.stbEntry.schemaRow.version;
  }

  metaReaderClear(&mr);
  *pSchema = metaGetTbTSchema(pVnode->pMeta, uid, sversion, 1);
  return TSDB_CODE_SUCCESS;

_invalid_table:
  terrno = TSDB_CODE_TDB_INVALID_TABLE_ID;
  metaReaderClear(&mr);
  return terrno;
}
H
Hongze Cheng 已提交
4125

H
Haojun Liao 已提交
4126
/*
 * Take a read snapshot of the tsdb: under the read lock, pin the current mem/imem
 * memtables (taking a reference on each) and take a reference on the file set.
 *
 * On success *ppSnap receives the new snapshot, which must be released with
 * tsdbUntakeReadSnap. On failure every reference taken so far is dropped, the partially
 * built snapshot is freed, *ppSnap is set to NULL and the error code is returned.
 */
int32_t tsdbTakeReadSnap(STsdb* pTsdb, STsdbReadSnap** ppSnap, const char* idStr) {
  int32_t code = 0;
  bool    fsRefed = false;

  // alloc
  STsdbReadSnap* pSnap = (STsdbReadSnap*)taosMemoryCalloc(1, sizeof(STsdbReadSnap));
  if (pSnap == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _exit;
  }

  // lock
  code = taosThreadRwlockRdlock(&pTsdb->rwLock);
  if (code) {
    code = TAOS_SYSTEM_ERROR(code);
    goto _exit;
  }

  // take snapshot
  pSnap->pMem = pTsdb->mem;
  pSnap->pIMem = pTsdb->imem;

  if (pSnap->pMem) {
    tsdbRefMemTable(pSnap->pMem);
  }

  if (pSnap->pIMem) {
    tsdbRefMemTable(pSnap->pIMem);
  }

  // fs
  code = tsdbFSRef(pTsdb, &pSnap->fs);
  if (code) {
    taosThreadRwlockUnlock(&pTsdb->rwLock);
    goto _exit;
  }
  fsRefed = true;

  // unlock
  code = taosThreadRwlockUnlock(&pTsdb->rwLock);
  if (code) {
    code = TAOS_SYSTEM_ERROR(code);
    goto _exit;
  }

  tsdbTrace("vgId:%d, take read snapshot, %s", TD_VID(pTsdb->pVnode), idStr);

_exit:
  if (code != 0 && pSnap != NULL) {
    // roll back whatever was acquired before the failure (pMem/pIMem are NULL from
    // calloc if the lock was never taken)
    if (pSnap->pMem) {
      tsdbUnrefMemTable(pSnap->pMem);
    }
    if (pSnap->pIMem) {
      tsdbUnrefMemTable(pSnap->pIMem);
    }
    if (fsRefed) {
      tsdbFSUnref(pTsdb, &pSnap->fs);
    }
    taosMemoryFree(pSnap);
    pSnap = NULL;
  }

  *ppSnap = pSnap;
  return code;
}

H
Haojun Liao 已提交
4174
/*
 * Release a snapshot obtained from tsdbTakeReadSnap: drop its memtable references and
 * its file-set reference, then free it. A NULL pSnap only emits the trace line.
 */
void tsdbUntakeReadSnap(STsdb* pTsdb, STsdbReadSnap* pSnap, const char* idStr) {
  if (pSnap != NULL) {
    // either memtable may be absent from the snapshot
    if (pSnap->pMem != NULL) {
      tsdbUnrefMemTable(pSnap->pMem);
    }
    if (pSnap->pIMem != NULL) {
      tsdbUnrefMemTable(pSnap->pIMem);
    }

    // file-set reference goes last, then the snapshot object itself
    tsdbFSUnref(pTsdb, &pSnap->fs);
    taosMemoryFree(pSnap);
  }

  tsdbTrace("vgId:%d, untake read snapshot, %s", TD_VID(pTsdb->pVnode), idStr);
}