tsdbRead.c 151.5 KB
Newer Older
H
hjxilinx 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

H
Haojun Liao 已提交
16
#include "osDef.h"
H
Hongze Cheng 已提交
17
#include "tsdb.h"

// True when the scan order `o` is ascending. The argument is parenthesized so that compound expressions
// (e.g. a conditional expression) are compared as a whole.
#define ASCENDING_TRAVERSE(o) ((o) == TSDB_ORDER_ASC)

// Tags a group of rows as PREV / MAIN / NEXT. Judging by the names, these are rows before, inside and after the
// main query range; the code that uses the tags is not visible here.
// NOTE(review): EXTERNAL_ROWS_NEXT (3) equals EXTERNAL_ROWS_PREV | EXTERNAL_ROWS_MAIN, so the values must only be
// compared for equality, never combined as a bit mask — confirm at the use sites.
typedef enum {
  EXTERNAL_ROWS_PREV = 1,
  EXTERNAL_ROWS_MAIN = 2,
  EXTERNAL_ROWS_NEXT = 3
} EContentData;

27
typedef struct {
dengyihao's avatar
dengyihao 已提交
28
  STbDataIter* iter;
29 30 31 32
  int32_t      index;
  bool         hasVal;
} SIterInfo;

33 34
typedef struct {
  int32_t numOfBlocks;
35
  int32_t numOfLastFiles;
36 37
} SBlockNumber;

38
typedef struct SBlockIndex {
39 40
  int32_t     ordinalIndex;
  int64_t     inFileOffset;
H
Haojun Liao 已提交
41
  STimeWindow window;  // todo replace it with overlap flag.
42 43
} SBlockIndex;

H
Haojun Liao 已提交
44
typedef struct STableBlockScanInfo {
dengyihao's avatar
dengyihao 已提交
45 46
  uint64_t  uid;
  TSKEY     lastKey;
H
Hongze Cheng 已提交
47
  SMapData  mapData;            // block info (compressed)
48
  SArray*   pBlockList;         // block data index list, SArray<SBlockIndex>
H
Hongze Cheng 已提交
49 50 51 52 53 54
  SIterInfo iter;               // mem buffer skip list iterator
  SIterInfo iiter;              // imem buffer skip list iterator
  SArray*   delSkyline;         // delete info for this table
  int32_t   fileDelIndex;       // file block delete index
  int32_t   lastBlockDelIndex;  // delete index for last block
  bool      iterInit;           // whether to initialize the in-memory skip list iterator or not
H
Haojun Liao 已提交
55 56 57
} STableBlockScanInfo;

typedef struct SBlockOrderWrapper {
dengyihao's avatar
dengyihao 已提交
58
  int64_t uid;
59
  int64_t offset;
H
Haojun Liao 已提交
60
} SBlockOrderWrapper;
H
Hongze Cheng 已提交
61 62

typedef struct SBlockOrderSupporter {
63 64 65 66
  SBlockOrderWrapper** pDataBlockInfo;
  int32_t*             indexPerTable;
  int32_t*             numOfBlocksPerTable;
  int32_t              numOfTables;
H
Hongze Cheng 已提交
67 68 69
} SBlockOrderSupporter;

typedef struct SIOCostSummary {
70 71 72
  int64_t numOfBlocks;
  double  blockLoadTime;
  double  buildmemBlock;
73
  int64_t headFileLoad;
74
  double  headFileLoadTime;
75
  int64_t smaDataLoad;
76
  double  smaLoadTime;
77 78
  int64_t lastBlockLoad;
  double  lastBlockLoadTime;
H
Haojun Liao 已提交
79 80
  int64_t composedBlocks;
  double  buildComposedBlockTime;
H
Haojun Liao 已提交
81
  double  createScanInfoList;
H
Hongze Cheng 已提交
82 83 84
} SIOCostSummary;

typedef struct SBlockLoadSuppInfo {
85
  SArray*          pColAgg;
86
  SColumnDataAgg   tsColAgg;
H
Haojun Liao 已提交
87 88
  int16_t*         colId;
  int16_t*         slotId;
89
  int32_t          numOfCols;
90
  char**           buildBuf;  // build string tmp buffer, todo remove it later after all string format being updated.
91
  bool             smaValid;  // the sma on all queried columns are activated
H
Hongze Cheng 已提交
92 93
} SBlockLoadSuppInfo;

94
typedef struct SLastBlockReader {
H
Hongze Cheng 已提交
95 96 97 98 99
  STimeWindow        window;
  SVersionRange      verRange;
  int32_t            order;
  uint64_t           uid;
  SMergeTree         mergeTree;
100
  SSttBlockLoadInfo* pInfo;
101 102
} SLastBlockReader;

103
typedef struct SFilesetIter {
H
Hongze Cheng 已提交
104 105 106
  int32_t           numOfFiles;  // number of total files
  int32_t           index;       // current accessed index in the list
  SArray*           pFileList;   // data file list
107
  int32_t           order;
H
Hongze Cheng 已提交
108
  SLastBlockReader* pLastBlockReader;  // last file block reader
109
} SFilesetIter;
H
Haojun Liao 已提交
110 111

typedef struct SFileDataBlockInfo {
112
  // index position in STableBlockScanInfo in order to check whether neighbor block overlaps with it
dengyihao's avatar
dengyihao 已提交
113
  uint64_t uid;
114
  int32_t  tbBlockIdx;
H
Haojun Liao 已提交
115 116 117
} SFileDataBlockInfo;

typedef struct SDataBlockIter {
118
  int32_t   numOfBlocks;
119
  int32_t   index;
H
Hongze Cheng 已提交
120
  SArray*   blockList;  // SArray<SFileDataBlockInfo>
121
  int32_t   order;
122
  SDataBlk  block;  // current SDataBlk data
123
  SHashObj* pTableMap;
H
Haojun Liao 已提交
124 125 126
} SDataBlockIter;

typedef struct SFileBlockDumpInfo {
dengyihao's avatar
dengyihao 已提交
127 128 129 130
  int32_t totalRows;
  int32_t rowIndex;
  int64_t lastKey;
  bool    allDumped;
H
Haojun Liao 已提交
131 132
} SFileBlockDumpInfo;

133
typedef struct SUidOrderCheckInfo {
134 135
  uint64_t* tableUidList;  // access table uid list in uid ascending order list
  int32_t   currentIndex;  // index in table uid list
136 137
} SUidOrderCheckInfo;

H
Haojun Liao 已提交
138
typedef struct SReaderStatus {
H
Hongze Cheng 已提交
139 140 141
  bool                  loadFromFile;       // check file stage
  bool                  composedDataBlock;  // the returned data block is a composed block or not
  SHashObj*             pTableMap;          // SHash<STableBlockScanInfo>
142
  STableBlockScanInfo** pTableIter;         // table iterator used in building in-memory buffer data blocks.
H
Hongze Cheng 已提交
143 144 145 146 147 148
  SUidOrderCheckInfo    uidCheckInfo;       // check all table in uid order
  SFileBlockDumpInfo    fBlockDumpInfo;
  SDFileSet*            pCurrentFileset;  // current opened file set
  SBlockData            fileBlockData;
  SFilesetIter          fileIter;
  SDataBlockIter        blockIter;
H
Haojun Liao 已提交
149 150
} SReaderStatus;

151
typedef struct SBlockInfoBuf {
H
Hongze Cheng 已提交
152 153 154
  int32_t currentIndex;
  SArray* pData;
  int32_t numPerBucket;
155 156
} SBlockInfoBuf;

H
Hongze Cheng 已提交
157
struct STsdbReader {
H
Haojun Liao 已提交
158 159 160
  STsdb*             pTsdb;
  uint64_t           suid;
  int16_t            order;
H
Haojun Liao 已提交
161
  bool               freeBlock;
H
Haojun Liao 已提交
162 163 164 165
  STimeWindow        window;  // the primary query time window that applies to all queries
  SSDataBlock*       pResBlock;
  int32_t            capacity;
  SReaderStatus      status;
166 167
  char*              idStr;  // query info handle, for debug purpose
  int32_t            type;   // query type: 1. retrieve all data blocks, 2. retrieve direct prev|next rows
H
Hongze Cheng 已提交
168
  SBlockLoadSuppInfo suppInfo;
H
Hongze Cheng 已提交
169
  STsdbReadSnap*     pReadSnap;
170
  SIOCostSummary     cost;
171 172
  STSchema*          pSchema;     // the newest version schema
  STSchema*          pMemSchema;  // the previous schema for in-memory data, to avoid load schema too many times
173 174 175
  SDataFReader*      pFileReader; // the file reader
  SDelFReader*       pDelFReader; // the del file reader
  SArray*            pDelIdx;     // del file block index;
176
  SVersionRange      verRange;
177 178 179
  SBlockInfoBuf      blockInfoBuf;
  int32_t            step;
  STsdbReader*       innerReader[2];
H
Hongze Cheng 已提交
180
};
H
Hongze Cheng 已提交
181

H
Haojun Liao 已提交
182
static SFileDataBlockInfo* getCurrentBlockInfo(SDataBlockIter* pBlockIter);
183 184
static int      buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, int64_t endKey, int32_t capacity,
                                          STsdbReader* pReader);
185
static TSDBROW* getValidMemRow(SIterInfo* pIter, const SArray* pDelList, STsdbReader* pReader);
186 187
static int32_t  doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pScanInfo, STsdbReader* pReader,
                                        SRowMerger* pMerger);
H
Hongze Cheng 已提交
188
static int32_t  doMergeRowsInLastBlock(SLastBlockReader* pLastBlockReader, STableBlockScanInfo* pScanInfo, int64_t ts,
189
                                       SRowMerger* pMerger, SVersionRange* pVerRange);
190
static int32_t  doMergeRowsInBuf(SIterInfo* pIter, uint64_t uid, int64_t ts, SArray* pDelList, SRowMerger* pMerger,
dengyihao's avatar
dengyihao 已提交
191
                                 STsdbReader* pReader);
H
Hongze Cheng 已提交
192
static int32_t  doAppendRowFromTSRow(SSDataBlock* pBlock, STsdbReader* pReader, STSRow* pTSRow,
H
Haojun Liao 已提交
193
                                     STableBlockScanInfo* pScanInfo);
194
static int32_t  doAppendRowFromFileBlock(SSDataBlock* pResBlock, STsdbReader* pReader, SBlockData* pBlockData,
H
Hongze Cheng 已提交
195
                                         int32_t rowIndex);
196
static void     setComposedBlockFlag(STsdbReader* pReader, bool composed);
H
Hongze Cheng 已提交
197 198
static bool     hasBeenDropped(const SArray* pDelList, int32_t* index, TSDBKEY* pKey, int32_t order,
                               SVersionRange* pVerRange);
199

H
Hongze Cheng 已提交
200 201 202 203
static int32_t doMergeMemTableMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter, SArray* pDelList,
                                        STSRow** pTSRow, STsdbReader* pReader, bool* freeTSRow);
static int32_t doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo* pBlockScanInfo,
                                  STsdbReader* pReader, STSRow** pTSRow);
204 205
static int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pBlockScanInfo, int64_t key,
                                     STsdbReader* pReader);
206

dengyihao's avatar
dengyihao 已提交
207 208 209 210
static int32_t initDelSkylineIterator(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader, STbData* pMemTbData,
                                      STbData* piMemTbData);
static STsdb*  getTsdbByRetentions(SVnode* pVnode, TSKEY winSKey, SRetention* retentions, const char* idstr,
                                   int8_t* pLevel);
211
static SVersionRange getQueryVerRange(SVnode* pVnode, SQueryTableDataCond* pCond, int8_t level);
H
Hongze Cheng 已提交
212 213 214
static int64_t       getCurrentKeyInLastBlock(SLastBlockReader* pLastBlockReader);
static bool          hasDataInLastBlock(SLastBlockReader* pLastBlockReader);
static int32_t       doBuildDataBlock(STsdbReader* pReader);
H
Haojun Liao 已提交
215
static TSDBKEY       getCurrentKeyInBuf(STableBlockScanInfo* pScanInfo, STsdbReader* pReader);
216
static bool          hasDataInFileBlock(const SBlockData* pBlockData, const SFileBlockDumpInfo* pDumpInfo);
217
static void          initBlockDumpInfo(STsdbReader* pReader, SDataBlockIter* pBlockIter);
218
static int32_t       getInitialDelIndex(const SArray* pDelSkyline, int32_t order);
H
Haojun Liao 已提交
219

220 221
static bool outOfTimeWindow(int64_t ts, STimeWindow* pWindow) { return (ts > pWindow->ekey) || (ts < pWindow->skey); }

H
Haojun Liao 已提交
222
static int32_t setColumnIdSlotList(SBlockLoadSuppInfo* pSupInfo, SColumnInfo* pCols, const int32_t* pSlotIdList, int32_t numOfCols) {
223
  pSupInfo->smaValid = true;
224
  pSupInfo->numOfCols = numOfCols;
H
Haojun Liao 已提交
225 226 227
  pSupInfo->colId = taosMemoryMalloc(numOfCols * (sizeof(int16_t)*2 + POINTER_BYTES));
  if (pSupInfo->colId == NULL) {
    taosMemoryFree(pSupInfo->colId);
H
Haojun Liao 已提交
228 229
    return TSDB_CODE_OUT_OF_MEMORY;
  }
H
Hongze Cheng 已提交
230

H
Haojun Liao 已提交
231 232
  pSupInfo->slotId = (int16_t*)((char*)pSupInfo->colId + (sizeof(int16_t) * numOfCols));
  pSupInfo->buildBuf = (char**) ((char*)pSupInfo->slotId + (sizeof(int16_t) * numOfCols));
H
Haojun Liao 已提交
233
  for (int32_t i = 0; i < numOfCols; ++i) {
H
Haojun Liao 已提交
234 235
    pSupInfo->colId[i] = pCols[i].colId;
    pSupInfo->slotId[i] = pSlotIdList[i];
236

H
Haojun Liao 已提交
237 238
    if (IS_VAR_DATA_TYPE(pCols[i].type)) {
      pSupInfo->buildBuf[i] = taosMemoryMalloc(pCols[i].bytes);
H
Haojun Liao 已提交
239 240
    } else {
      pSupInfo->buildBuf[i] = NULL;
241
    }
H
Haojun Liao 已提交
242
  }
H
Hongze Cheng 已提交
243

H
Haojun Liao 已提交
244 245
  return TSDB_CODE_SUCCESS;
}
H
Hongze Cheng 已提交
246

H
Haojun Liao 已提交
247
static int32_t updateBlockSMAInfo(STSchema* pSchema, SBlockLoadSuppInfo* pSupInfo) {
248 249 250 251
  int32_t i = 0, j = 0;

  while(i < pSchema->numOfCols && j < pSupInfo->numOfCols) {
    STColumn* pTCol = &pSchema->columns[i];
H
Haojun Liao 已提交
252
    if (pTCol->colId == pSupInfo->colId[j]) {
253 254
      if (!IS_BSMA_ON(pTCol)) {
        pSupInfo->smaValid = false;
H
Haojun Liao 已提交
255
        return TSDB_CODE_SUCCESS;
256 257 258 259
      }

      i += 1;
      j += 1;
H
Haojun Liao 已提交
260
    } else if (pTCol->colId < pSupInfo->colId[j]) {
261 262 263
      // do nothing
      i += 1;
    } else {
H
Haojun Liao 已提交
264
      return TSDB_CODE_INVALID_PARA;
265 266
    }
  }
H
Haojun Liao 已提交
267 268

  return TSDB_CODE_SUCCESS;
269 270
}

271
static int32_t initBlockScanInfoBuf(SBlockInfoBuf* pBuf, int32_t numOfTables) {
H
Hongze Cheng 已提交
272
  int32_t num = numOfTables / pBuf->numPerBucket;
273 274 275 276 277
  int32_t remainder = numOfTables % pBuf->numPerBucket;
  if (pBuf->pData == NULL) {
    pBuf->pData = taosArrayInit(num + 1, POINTER_BYTES);
  }

H
Hongze Cheng 已提交
278
  for (int32_t i = 0; i < num; ++i) {
279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299
    char* p = taosMemoryCalloc(pBuf->numPerBucket, sizeof(STableBlockScanInfo));
    if (p == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }

    taosArrayPush(pBuf->pData, &p);
  }

  if (remainder > 0) {
    char* p = taosMemoryCalloc(remainder, sizeof(STableBlockScanInfo));
    if (p == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }
    taosArrayPush(pBuf->pData, &p);
  }

  return TSDB_CODE_SUCCESS;
}

// Free every bucket of scan info objects and the bucket array itself.
// pData is reset to NULL, so a later initBlockScanInfoBuf() creates a fresh array instead of reusing the freed one.
static void clearBlockScanInfoBuf(SBlockInfoBuf* pBuf) {
  size_t num = taosArrayGetSize(pBuf->pData);
  for (size_t i = 0; i < num; ++i) {
    char** p = taosArrayGet(pBuf->pData, i);
    taosMemoryFree(*p);
  }

  taosArrayDestroy(pBuf->pData);
  pBuf->pData = NULL;
}

// Return the address of the index-th STableBlockScanInfo slot: bucket index / numPerBucket, entry
// index % numPerBucket inside that bucket. No bounds checking is done.
static void* getPosInBlockInfoBuf(SBlockInfoBuf* pBuf, int32_t index) {
  char**  pBucket = taosArrayGet(pBuf->pData, index / pBuf->numPerBucket);
  int32_t posInBucket = index % pBuf->numPerBucket;
  char*   pBase = *pBucket;

  return pBase + posInBucket * sizeof(STableBlockScanInfo);
}

// NOTE: speedup the whole processing by preparing the buffer for STableBlockScanInfo in batch model
H
Haojun Liao 已提交
315
static SHashObj* createDataBlockScanInfo(STsdbReader* pTsdbReader, SBlockInfoBuf* pBuf, const STableKeyInfo* idList, int32_t numOfTables) {
H
Haojun Liao 已提交
316
  // allocate buffer in order to load data blocks from file
317
  // todo use simple hash instead, optimize the memory consumption
318 319 320
  SHashObj* pTableMap =
      taosHashInit(numOfTables, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
  if (pTableMap == NULL) {
H
Haojun Liao 已提交
321 322 323
    return NULL;
  }

H
Haojun Liao 已提交
324
  int64_t st = taosGetTimestampUs();
H
Haojun Liao 已提交
325
  initBlockScanInfoBuf(pBuf, numOfTables);
H
Haojun Liao 已提交
326

327
  for (int32_t j = 0; j < numOfTables; ++j) {
H
Haojun Liao 已提交
328
    STableBlockScanInfo* pScanInfo = getPosInBlockInfoBuf(pBuf, j);
329 330 331 332 333 334 335 336 337 338
    pScanInfo->uid = idList[j].uid;
    if (ASCENDING_TRAVERSE(pTsdbReader->order)) {
      int64_t skey = pTsdbReader->window.skey;
      pScanInfo->lastKey = (skey > INT64_MIN) ? (skey - 1) : skey;
    } else {
      int64_t ekey = pTsdbReader->window.ekey;
      pScanInfo->lastKey = (ekey < INT64_MAX) ? (ekey + 1) : ekey;
    }

    taosHashPut(pTableMap, &pScanInfo->uid, sizeof(uint64_t), &pScanInfo, POINTER_BYTES);
H
Hongze Cheng 已提交
339 340
    tsdbTrace("%p check table uid:%" PRId64 " from lastKey:%" PRId64 " %s", pTsdbReader, pScanInfo->uid,
              pScanInfo->lastKey, pTsdbReader->idStr);
H
Haojun Liao 已提交
341 342
  }

H
Haojun Liao 已提交
343 344 345 346
  pTsdbReader->cost.createScanInfoList = (taosGetTimestampUs() - st) / 1000.0;
  tsdbDebug("%p create %d tables scan-info, size:%.2f Kb, elapsed time:%.2f ms, %s", pTsdbReader, numOfTables,
            (sizeof(STableBlockScanInfo) * numOfTables) / 1024.0, pTsdbReader->cost.createScanInfoList,
            pTsdbReader->idStr);
347

348
  return pTableMap;
H
Hongze Cheng 已提交
349
}
H
Hongze Cheng 已提交
350

351 352
static void resetAllDataBlockScanInfo(SHashObj* pTableMap, int64_t ts) {
  STableBlockScanInfo** p = NULL;
dengyihao's avatar
dengyihao 已提交
353
  while ((p = taosHashIterate(pTableMap, p)) != NULL) {
H
Hongze Cheng 已提交
354
    STableBlockScanInfo* pInfo = *(STableBlockScanInfo**)p;
355 356

    pInfo->iterInit = false;
H
Haojun Liao 已提交
357
    pInfo->iter.hasVal = false;
358
    pInfo->iiter.hasVal = false;
H
Haojun Liao 已提交
359

360 361
    if (pInfo->iter.iter != NULL) {
      pInfo->iter.iter = tsdbTbDataIterDestroy(pInfo->iter.iter);
362 363
    }

H
Haojun Liao 已提交
364 365 366 367
    if (pInfo->iiter.iter != NULL) {
      pInfo->iiter.iter = tsdbTbDataIterDestroy(pInfo->iiter.iter);
    }

368 369
    pInfo->delSkyline = taosArrayDestroy(pInfo->delSkyline);
    pInfo->lastKey = ts;
370 371 372
  }
}

373 374
static void clearBlockScanInfo(STableBlockScanInfo* p) {
  p->iterInit = false;
H
Haojun Liao 已提交
375 376

  p->iter.hasVal = false;
377
  p->iiter.hasVal = false;
378

379 380 381
  if (p->iter.iter != NULL) {
    p->iter.iter = tsdbTbDataIterDestroy(p->iter.iter);
  }
382

383 384 385
  if (p->iiter.iter != NULL) {
    p->iiter.iter = tsdbTbDataIterDestroy(p->iiter.iter);
  }
386

387 388 389 390
  p->delSkyline = taosArrayDestroy(p->delSkyline);
  p->pBlockList = taosArrayDestroy(p->pBlockList);
  tMapDataClear(&p->mapData);
}
391

H
Haojun Liao 已提交
392
static void destroyAllBlockScanInfo(SHashObj* pTableMap) {
393
  void* p = NULL;
H
Haojun Liao 已提交
394
  while ((p = taosHashIterate(pTableMap, p)) != NULL) {
395
    clearBlockScanInfo(*(STableBlockScanInfo**)p);
396 397 398 399 400
  }

  taosHashCleanup(pTableMap);
}

401
static bool isEmptyQueryTimeWindow(STimeWindow* pWindow) {
402
  return pWindow->skey > pWindow->ekey;
H
Haojun Liao 已提交
403
}
H
Hongze Cheng 已提交
404

405 406 407
// Update the query time window according to the data time to live(TTL) information, in order to avoid to return
// the expired data to client, even it is queried already.
static STimeWindow updateQueryTimeWindow(STsdb* pTsdb, STimeWindow* pWindow) {
dengyihao's avatar
dengyihao 已提交
408
  STsdbKeepCfg* pCfg = &pTsdb->keepCfg;
H
Hongze Cheng 已提交
409

410
  int64_t now = taosGetTimestamp(pCfg->precision);
dengyihao's avatar
dengyihao 已提交
411
  int64_t earilyTs = now - (tsTickPerMin[pCfg->precision] * pCfg->keep2) + 1;  // needs to add one tick
412

dengyihao's avatar
dengyihao 已提交
413
  STimeWindow win = *pWindow;
414 415 416 417 418 419
  if (win.skey < earilyTs) {
    win.skey = earilyTs;
  }

  return win;
}
H
Hongze Cheng 已提交
420

H
Haojun Liao 已提交
421
static void limitOutputBufferSize(const SQueryTableDataCond* pCond, int32_t* capacity) {
H
Haojun Liao 已提交
422 423 424 425 426 427
  int32_t rowLen = 0;
  for (int32_t i = 0; i < pCond->numOfCols; ++i) {
    rowLen += pCond->colList[i].bytes;
  }

  // make sure the output SSDataBlock size be less than 2MB.
H
Haojun Liao 已提交
428 429 430
  const int32_t TWOMB = 2 * 1024 * 1024;
  if ((*capacity) * rowLen > TWOMB) {
    (*capacity) = TWOMB / rowLen;
H
Haojun Liao 已提交
431 432 433 434
  }
}

// init file iterator
435
static int32_t initFilesetIterator(SFilesetIter* pIter, SArray* aDFileSet, STsdbReader* pReader) {
H
Hongze Cheng 已提交
436
  size_t numOfFileset = taosArrayGetSize(aDFileSet);
437

438 439
  pIter->index = ASCENDING_TRAVERSE(pReader->order) ? -1 : numOfFileset;
  pIter->order = pReader->order;
H
Hongze Cheng 已提交
440
  pIter->pFileList = aDFileSet;
441
  pIter->numOfFiles = numOfFileset;
H
Haojun Liao 已提交
442

443 444 445 446
  if (pIter->pLastBlockReader == NULL) {
    pIter->pLastBlockReader = taosMemoryCalloc(1, sizeof(struct SLastBlockReader));
    if (pIter->pLastBlockReader == NULL) {
      int32_t code = TSDB_CODE_OUT_OF_MEMORY;
447
      tsdbError("failed to prepare the last block iterator, since:%s %s", tstrerror(code), pReader->idStr);
448 449
      return code;
    }
450 451
  }

452 453 454 455 456 457 458 459
  SLastBlockReader* pLReader = pIter->pLastBlockReader;
  pLReader->order = pReader->order;
  pLReader->window = pReader->window;
  pLReader->verRange = pReader->verRange;

  pLReader->uid = 0;
  tMergeTreeClose(&pLReader->mergeTree);

460
  if (pLReader->pInfo == NULL) {
461
    // here we ignore the first column, which is always be the primary timestamp column
462
    pLReader->pInfo =
H
Haojun Liao 已提交
463
        tCreateLastBlockLoadInfo(pReader->pSchema, &pReader->suppInfo.colId[1], pReader->suppInfo.numOfCols - 1);
H
Haojun Liao 已提交
464 465 466 467
    if (pLReader->pInfo == NULL) {
      tsdbDebug("init fileset iterator failed, code:%s %s", tstrerror(terrno), pReader->idStr);
      return terrno;
    }
468 469
  }

470
  tsdbDebug("init fileset iterator, total files:%d %s", pIter->numOfFiles, pReader->idStr);
H
Haojun Liao 已提交
471 472 473
  return TSDB_CODE_SUCCESS;
}

474
static bool filesetIteratorNext(SFilesetIter* pIter, STsdbReader* pReader) {
475 476
  bool    asc = ASCENDING_TRAVERSE(pIter->order);
  int32_t step = asc ? 1 : -1;
477 478 479
  pIter->index += step;

  if ((asc && pIter->index >= pIter->numOfFiles) || ((!asc) && pIter->index < 0)) {
H
Haojun Liao 已提交
480 481 482
    return false;
  }

H
Haojun Liao 已提交
483 484 485
  SIOCostSummary* pSum = &pReader->cost;
  getLastBlockLoadInfo(pIter->pLastBlockReader->pInfo, &pSum->lastBlockLoad, &pReader->cost.lastBlockLoadTime);

486 487
  pIter->pLastBlockReader->uid = 0;
  tMergeTreeClose(&pIter->pLastBlockReader->mergeTree);
488
  resetLastBlockLoadInfo(pIter->pLastBlockReader->pInfo);
489

H
Haojun Liao 已提交
490 491
  // check file the time range of coverage
  STimeWindow win = {0};
H
Hongze Cheng 已提交
492

493
  while (1) {
H
Haojun Liao 已提交
494 495 496
    if (pReader->pFileReader != NULL) {
      tsdbDataFReaderClose(&pReader->pFileReader);
    }
497

498
    pReader->status.pCurrentFileset = (SDFileSet*)taosArrayGet(pIter->pFileList, pIter->index);
H
Haojun Liao 已提交
499

500 501 502 503
    int32_t code = tsdbDataFReaderOpen(&pReader->pFileReader, pReader->pTsdb, pReader->status.pCurrentFileset);
    if (code != TSDB_CODE_SUCCESS) {
      goto _err;
    }
H
Haojun Liao 已提交
504

505 506
    pReader->cost.headFileLoad += 1;

507 508 509 510 511 512 513 514 515 516 517 518
    int32_t fid = pReader->status.pCurrentFileset->fid;
    tsdbFidKeyRange(fid, pReader->pTsdb->keepCfg.days, pReader->pTsdb->keepCfg.precision, &win.skey, &win.ekey);

    // current file are no longer overlapped with query time window, ignore remain files
    if ((asc && win.skey > pReader->window.ekey) || (!asc && win.ekey < pReader->window.skey)) {
      tsdbDebug("%p remain files are not qualified for qrange:%" PRId64 "-%" PRId64 ", ignore, %s", pReader,
                pReader->window.skey, pReader->window.ekey, pReader->idStr);
      return false;
    }

    if ((asc && (win.ekey < pReader->window.skey)) || ((!asc) && (win.skey > pReader->window.ekey))) {
      pIter->index += step;
519 520 521
      if ((asc && pIter->index >= pIter->numOfFiles) || ((!asc) && pIter->index < 0)) {
        return false;
      }
522 523
      continue;
    }
C
Cary Xu 已提交
524

525
    tsdbDebug("%p file found fid:%d for qrange:%" PRId64 "-%" PRId64 ", %s", pReader, fid, pReader->window.skey,
526
              pReader->window.ekey, pReader->idStr);
527 528
    return true;
  }
529

H
Haojun Liao 已提交
530
_err:
H
Haojun Liao 已提交
531 532 533
  return false;
}

534
static void resetDataBlockIterator(SDataBlockIter* pIter, int32_t order) {
535 536
  pIter->order = order;
  pIter->index = -1;
537
  pIter->numOfBlocks = 0;
538 539 540 541 542 543 544
  if (pIter->blockList == NULL) {
    pIter->blockList = taosArrayInit(4, sizeof(SFileDataBlockInfo));
  } else {
    taosArrayClear(pIter->blockList);
  }
}

L
Liu Jicong 已提交
545
static void cleanupDataBlockIterator(SDataBlockIter* pIter) { taosArrayDestroy(pIter->blockList); }
H
Haojun Liao 已提交
546

H
Haojun Liao 已提交
547
static void initReaderStatus(SReaderStatus* pStatus) {
dengyihao's avatar
dengyihao 已提交
548 549
  pStatus->pTableIter = NULL;
  pStatus->loadFromFile = true;
H
Haojun Liao 已提交
550 551
}

552 553 554 555 556 557 558 559
static SSDataBlock* createResBlock(SQueryTableDataCond* pCond, int32_t capacity) {
  SSDataBlock* pResBlock = createDataBlock();
  if (pResBlock == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  for (int32_t i = 0; i < pCond->numOfCols; ++i) {
H
Haojun Liao 已提交
560
    SColumnInfoData colInfo = {0};
561 562 563 564 565 566 567 568 569 570 571 572 573
    colInfo.info = pCond->colList[i];
    blockDataAppendColInfo(pResBlock, &colInfo);
  }

  int32_t code = blockDataEnsureCapacity(pResBlock, capacity);
  if (code != TSDB_CODE_SUCCESS) {
    terrno = code;
    taosMemoryFree(pResBlock);
    return NULL;
  }
  return pResBlock;
}

574
static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, STsdbReader** ppReader, int32_t capacity,
H
Haojun Liao 已提交
575
                                SSDataBlock* pResBlock, const char* idstr) {
H
Haojun Liao 已提交
576
  int32_t      code = 0;
577
  int8_t       level = 0;
H
Haojun Liao 已提交
578
  STsdbReader* pReader = (STsdbReader*)taosMemoryCalloc(1, sizeof(*pReader));
H
Hongze Cheng 已提交
579 580
  if (pReader == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
H
Haojun Liao 已提交
581
    goto _end;
H
Hongze Cheng 已提交
582 583
  }

C
Cary Xu 已提交
584
  if (VND_IS_TSMA(pVnode)) {
H
Haojun Liao 已提交
585
    tsdbDebug("vgId:%d, tsma is selected to query, %s", TD_VID(pVnode), idstr);
C
Cary Xu 已提交
586 587
  }

H
Haojun Liao 已提交
588
  initReaderStatus(&pReader->status);
589

L
Liu Jicong 已提交
590
  pReader->pTsdb = getTsdbByRetentions(pVnode, pCond->twindows.skey, pVnode->config.tsdbCfg.retentions, idstr, &level);
dengyihao's avatar
dengyihao 已提交
591 592
  pReader->suid = pCond->suid;
  pReader->order = pCond->order;
593
  pReader->capacity = capacity;
H
Haojun Liao 已提交
594
  pReader->pResBlock = pResBlock;
dengyihao's avatar
dengyihao 已提交
595 596
  pReader->idStr = (idstr != NULL) ? strdup(idstr) : NULL;
  pReader->verRange = getQueryVerRange(pVnode, pCond, level);
597
  pReader->type = pCond->type;
598
  pReader->window = updateQueryTimeWindow(pReader->pTsdb, &pCond->twindows);
H
Hongze Cheng 已提交
599
  pReader->blockInfoBuf.numPerBucket = 1000;  // 1000 tables per bucket
H
Hongze Cheng 已提交
600

H
Haojun Liao 已提交
601 602 603 604 605 606 607 608 609
  if (pReader->pResBlock == NULL) {
    pReader->freeBlock = true;
    pReader->pResBlock = createResBlock(pCond, pReader->capacity);
    if (pReader->pResBlock == NULL) {
      code = terrno;
      goto _end;
    }
  }

H
Haojun Liao 已提交
610 611 612 613 614 615
  if (pCond->numOfCols <= 0) {
    tsdbError("vgId:%d, invalid column number %d in query cond, %s", TD_VID(pVnode), pCond->numOfCols, idstr);
    code = TSDB_CODE_INVALID_PARA;
    goto _end;
  }

H
Haojun Liao 已提交
616
  // todo refactor.
617
  limitOutputBufferSize(pCond, &pReader->capacity);
618

619 620
  // allocate buffer in order to load data blocks from file
  SBlockLoadSuppInfo* pSup = &pReader->suppInfo;
621
  pSup->pColAgg = taosArrayInit(pCond->numOfCols, sizeof(SColumnDataAgg));
H
Haojun Liao 已提交
622
  if (pSup->pColAgg == NULL) {
623 624 625
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _end;
  }
H
Haojun Liao 已提交
626

627 628
  pSup->tsColAgg.colId = PRIMARYKEY_TIMESTAMP_COL_ID;

H
Hongze Cheng 已提交
629
  code = tBlockDataCreate(&pReader->status.fileBlockData);
H
Haojun Liao 已提交
630 631 632 633 634
  if (code != TSDB_CODE_SUCCESS) {
    terrno = code;
    goto _end;
  }

H
Haojun Liao 已提交
635
  setColumnIdSlotList(&pReader->suppInfo, pCond->colList, pCond->pSlotList, pCond->numOfCols);
636

H
Hongze Cheng 已提交
637 638
  *ppReader = pReader;
  return code;
H
Hongze Cheng 已提交
639

H
Haojun Liao 已提交
640
_end:
H
Haojun Liao 已提交
641
  tsdbReaderClose(pReader);
H
Hongze Cheng 已提交
642 643 644
  *ppReader = NULL;
  return code;
}
H
Hongze Cheng 已提交
645

H
Haojun Liao 已提交
646
static int32_t doLoadBlockIndex(STsdbReader* pReader, SDataFReader* pFileReader, SArray* pIndexList) {
647
  SArray* aBlockIdx = taosArrayInit(8, sizeof(SBlockIdx));
H
Hongze Cheng 已提交
648

649
  int64_t st = taosGetTimestampUs();
H
Hongze Cheng 已提交
650
  int32_t code = tsdbReadBlockIdx(pFileReader, aBlockIdx);
H
Haojun Liao 已提交
651
  if (code != TSDB_CODE_SUCCESS) {
652
    goto _end;
H
Haojun Liao 已提交
653
  }
H
Hongze Cheng 已提交
654

655 656
  size_t num = taosArrayGetSize(aBlockIdx);
  if (num == 0) {
H
Haojun Liao 已提交
657
    taosArrayDestroy(aBlockIdx);
H
Haojun Liao 已提交
658 659
    return TSDB_CODE_SUCCESS;
  }
H
Hongze Cheng 已提交
660

661 662 663 664
  int64_t et1 = taosGetTimestampUs();

  SBlockIdx* pBlockIdx = NULL;
  for (int32_t i = 0; i < num; ++i) {
665
    pBlockIdx = (SBlockIdx*)taosArrayGet(aBlockIdx, i);
H
Haojun Liao 已提交
666

667
    // uid check
H
Hongze Cheng 已提交
668
    if (pBlockIdx->suid != pReader->suid) {
H
Haojun Liao 已提交
669 670 671 672
      continue;
    }

    // this block belongs to a table that is not queried.
H
Haojun Liao 已提交
673
    void* p = taosHashGet(pReader->status.pTableMap, &pBlockIdx->uid, sizeof(uint64_t));
H
Haojun Liao 已提交
674 675 676 677
    if (p == NULL) {
      continue;
    }

H
Haojun Liao 已提交
678
    STableBlockScanInfo* pScanInfo = *(STableBlockScanInfo**)p;
H
Haojun Liao 已提交
679
    if (pScanInfo->pBlockList == NULL) {
680
      pScanInfo->pBlockList = taosArrayInit(4, sizeof(SBlockIndex));
H
Haojun Liao 已提交
681 682
    }

H
Hongze Cheng 已提交
683
    taosArrayPush(pIndexList, pBlockIdx);
H
Haojun Liao 已提交
684
  }
H
Hongze Cheng 已提交
685

686
  int64_t et2 = taosGetTimestampUs();
687
  tsdbDebug("load block index for %d tables completed, elapsed time:%.2f ms, set blockIdx:%.2f ms, size:%.2f Kb %s",
688
            (int32_t)num, (et1 - st) / 1000.0, (et2 - et1) / 1000.0, num * sizeof(SBlockIdx) / 1024.0, pReader->idStr);
689 690 691

  pReader->cost.headFileLoadTime += (et1 - st) / 1000.0;

H
Haojun Liao 已提交
692
_end:
H
Hongze Cheng 已提交
693
  taosArrayDestroy(aBlockIdx);
H
Haojun Liao 已提交
694 695
  return code;
}
H
Hongze Cheng 已提交
696

697
static void cleanupTableScanInfo(SHashObj* pTableMap) {
698
  STableBlockScanInfo** px = NULL;
dengyihao's avatar
dengyihao 已提交
699
  while (1) {
700
    px = taosHashIterate(pTableMap, px);
701 702 703 704
    if (px == NULL) {
      break;
    }

705
    // reset the index in last block when handing a new file
706 707
    tMapDataClear(&(*px)->mapData);
    taosArrayClear((*px)->pBlockList);
708
  }
709 710
}

711
/**
 * Load the data-block list of every table in pIndexList from the current file, keeping for each table only the
 * blocks that overlap both the query time window and the query version range.
 *
 * The kept blocks are appended to each table's pBlockList as SBlockIndex entries.
 *
 * @param pReader     the reader; pReader->pFileReader supplies the block info, pReader->status.pTableMap the tables.
 * @param pIndexList  SArray<SBlockIdx> of the tables to load; every uid is expected to be in the table map.
 * @param pBlockNum   out: numOfBlocks is incremented once per kept block; numOfLastFiles is set to the number of stt
 *                    files of the current file set.
 * @return TSDB_CODE_SUCCESS; TSDB_CODE_INVALID_PARA when a uid is missing from the table map; the error of
 *         tsdbReadDataBlk; or TSDB_CODE_OUT_OF_MEMORY.
 */
static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, SBlockNumber* pBlockNum) {
  int32_t numOfQTable = 0;
  size_t  sizeInDisk = 0;
  size_t  numOfTables = taosArrayGetSize(pIndexList);

  int64_t st = taosGetTimestampUs();
  cleanupTableScanInfo(pReader->status.pTableMap);

  for (int32_t i = 0; i < numOfTables; ++i) {
    SBlockIdx* pBlockIdx = taosArrayGet(pIndexList, i);

    STableBlockScanInfo** ppScanInfo =
        taosHashGet(pReader->status.pTableMap, &pBlockIdx->uid, sizeof(int64_t));
    if (ppScanInfo == NULL) {
      // the index list should only hold queried tables; never dereference a missing entry
      tsdbError("%p failed to locate the uid:%" PRIu64 " in query table uid list, %s", pReader,
                (uint64_t)pBlockIdx->uid, pReader->idStr);
      return TSDB_CODE_INVALID_PARA;
    }

    STableBlockScanInfo* pScanInfo = *ppScanInfo;

    tMapDataReset(&pScanInfo->mapData);
    int32_t code = tsdbReadDataBlk(pReader->pFileReader, pBlockIdx, &pScanInfo->mapData);
    if (code != TSDB_CODE_SUCCESS) {
      // do not go on with a partially filled block map
      tsdbError("%p failed to load data block info, uid:%" PRIu64 ", code:%s %s", pReader, (uint64_t)pBlockIdx->uid,
                tstrerror(code), pReader->idStr);
      tMapDataClear(&pScanInfo->mapData);
      return code;
    }

    taosArrayEnsureCap(pScanInfo->pBlockList, pScanInfo->mapData.nItem);
    sizeInDisk += pScanInfo->mapData.nData;

    SDataBlk block = {0};
    for (int32_t j = 0; j < pScanInfo->mapData.nItem; ++j) {
      tGetDataBlk(pScanInfo->mapData.pData + pScanInfo->mapData.aOffset[j], &block);

      // 1. time range check
      if (block.minKey.ts > pReader->window.ekey || block.maxKey.ts < pReader->window.skey) {
        continue;
      }

      // 2. version range check
      if (block.minVer > pReader->verRange.maxVer || block.maxVer < pReader->verRange.minVer) {
        continue;
      }

      SBlockIndex bIndex = {.ordinalIndex = j, .inFileOffset = block.aSubBlock->offset};
      bIndex.window = (STimeWindow){.skey = block.minKey.ts, .ekey = block.maxKey.ts};

      void* p1 = taosArrayPush(pScanInfo->pBlockList, &bIndex);
      if (p1 == NULL) {
        tMapDataClear(&pScanInfo->mapData);
        return TSDB_CODE_OUT_OF_MEMORY;
      }

      pBlockNum->numOfBlocks += 1;
    }

    if (pScanInfo->pBlockList != NULL && taosArrayGetSize(pScanInfo->pBlockList) > 0) {
      numOfQTable += 1;
    }
  }

  pBlockNum->numOfLastFiles = pReader->pFileReader->pSet->nSttF;
  int32_t total = pBlockNum->numOfLastFiles + pBlockNum->numOfBlocks;

  double el = (taosGetTimestampUs() - st) / 1000.0;
  // numOfTables is a size_t: print it through an explicit int32_t cast instead of a mismatched %ld
  tsdbDebug(
      "load block of %d tables completed, blocks:%d in %d tables, last-files:%d, block-info-size:%.2f Kb, elapsed "
      "time:%.2f ms %s",
      (int32_t)numOfTables, pBlockNum->numOfBlocks, numOfQTable, pBlockNum->numOfLastFiles, sizeInDisk / 1024.0, el,
      pReader->idStr);

  pReader->cost.numOfBlocks += total;
  pReader->cost.headFileLoadTime += el;

  return TSDB_CODE_SUCCESS;
}
H
Hongze Cheng 已提交
778

779
/**
 * Mark the current file block as fully consumed.
 * lastKey is set one key beyond maxKey in the scan direction: maxKey + 1 for ascending, maxKey - 1 for descending.
 */
static void setBlockAllDumped(SFileBlockDumpInfo* pDumpInfo, int64_t maxKey, int32_t order) {
  pDumpInfo->allDumped = true;
  pDumpInfo->lastKey = ASCENDING_TRAVERSE(order) ? (maxKey + 1) : (maxKey - 1);
}

785 786
/**
 * Append one column value at row rowIndex of pColInfoData.
 *
 * Fixed-width values are appended directly, flagged as NULL when the value is not COL_VAL_IS_VALUE.
 * Var-length values that are not COL_VAL_IS_VALUE are appended as NULL. Otherwise they are first staged in
 * pSup->buildBuf[colIndex] behind a var-data length header, and that buffer is appended.
 */
static void doCopyColVal(SColumnInfoData* pColInfoData, int32_t rowIndex, int32_t colIndex, SColVal* pColVal,
                         SBlockLoadSuppInfo* pSup) {
  bool isNull = !COL_VAL_IS_VALUE(pColVal);

  if (!IS_VAR_DATA_TYPE(pColVal->type)) {
    colDataAppend(pColInfoData, rowIndex, (const char*)&pColVal->value, isNull);
    return;
  }

  if (isNull) {
    colDataAppendNULL(pColInfoData, rowIndex);
    return;
  }

  char* pStage = pSup->buildBuf[colIndex];
  varDataSetLen(pStage, pColVal->value.nData);
  ASSERT(pColVal->value.nData <= pColInfoData->info.bytes);

  // pData may be NULL when nData is 0, so only copy a non-empty payload
  if (pColVal->value.nData > 0) {
    memcpy(varDataVal(pStage), pColVal->value.pData, pColVal->value.nData);
  }

  colDataAppend(pColInfoData, rowIndex, pStage, false);
}

804
/**
 * Return the block info at the iterator's current index, or NULL when the iterator's block list is empty.
 */
static SFileDataBlockInfo* getCurrentBlockInfo(SDataBlockIter* pBlockIter) {
  size_t size = taosArrayGetSize(pBlockIter->blockList);
  if (size > 0) {
    return taosArrayGet(pBlockIter->blockList, pBlockIter->index);
  }

  // an empty list must agree with the recorded block count
  ASSERT(pBlockIter->numOfBlocks == size);
  return NULL;
}

H
Hongze Cheng 已提交
815
/** Return the iterator's decoded current block (the SDataBlk stored in pBlockIter->block). */
static SDataBlk* getCurrentBlock(SDataBlockIter* pBlockIter) {
  SDataBlk* pCurrent = &pBlockIter->block;
  return pCurrent;
}
816

H
Haojun Liao 已提交
817 818 819 820 821 822
/**
 * Binary search in keyList (sorted ascending, num > 0) starting from position pos.
 *
 * order == TSDB_ORDER_ASC: search keyList[pos .. num-1] and return the position of the largest key <= key,
 *                          or -1 when key < keyList[pos].
 * otherwise (descending):  search keyList[0 .. pos] and return the position of the smallest key >= key,
 *                          or -1 when key > keyList[pos].
 */
static int doBinarySearchKey(TSKEY* keyList, int num, int pos, TSKEY key, int order) {
  assert(pos >= 0 && pos < num);
  assert(num > 0);

  if (order == TSDB_ORDER_ASC) {
    if (key < keyList[pos]) {
      return -1;
    }

    int lo = pos;
    int hi = num - 1;
    for (;;) {
      if (key >= keyList[hi]) return hi;
      if (key <= keyList[lo]) return lo;
      if (hi - lo <= 1) return lo;  // keyList[lo] < key < keyList[hi]

      int mid = lo + (hi - lo + 1) / 2;
      if (keyList[mid] == key) return mid;
      if (keyList[mid] > key) {
        hi = mid;
      } else {
        lo = mid;
      }
    }
  }

  // descending: walk from pos towards position 0
  if (key > keyList[pos]) {
    return -1;
  }

  int upper = pos;
  int lower = 0;
  for (;;) {
    if (key <= keyList[lower]) return lower;
    if (key >= keyList[upper]) return upper;
    if (upper - lower <= 1) return upper;  // keyList[lower] < key < keyList[upper]

    int mid = upper - (upper - lower + 1) / 2;
    if (keyList[mid] == key) return mid;
    if (keyList[mid] < key) {
      lower = mid;
    } else {
      upper = mid;
    }
  }
}

H
Haojun Liao 已提交
867
/**
 * Return the last row position (in scan order) of pBlock, searching from pos, whose key is still inside the query
 * time window. When the window covers the block's end in scan order, the block boundary is returned directly:
 * nRow - 1 for ascending, 0 for descending. Otherwise doBinarySearchKey is used, and its -1 (no such row) is
 * passed through.
 */
static int32_t getEndPosInDataBlock(STsdbReader* pReader, SBlockData* pBlockData, SDataBlk* pBlock, int32_t pos) {
  if (ASCENDING_TRAVERSE(pReader->order)) {
    if (pReader->window.ekey >= pBlock->maxKey.ts) {
      return pBlock->nRow - 1;
    }
    return doBinarySearchKey(pBlockData->aTSKEY, pBlock->nRow, pos, pReader->window.ekey, pReader->order);
  }

  if (pReader->window.skey <= pBlock->minKey.ts) {
    return 0;
  }
  return doBinarySearchKey(pBlockData->aTSKEY, pBlock->nRow, pos, pReader->window.skey, pReader->order);
}

H
Haojun Liao 已提交
884
/**
 * Copy dumpedRows primary timestamps into pColData, starting at pDumpInfo->rowIndex in scan order.
 *
 * Ascending: rows [rowIndex, rowIndex + dumpedRows) are copied as-is.
 * Descending: rows [rowIndex - dumpedRows + 1, rowIndex] are copied and then reversed in place, so that output
 * row 0 holds the key of source row rowIndex.
 */
static void copyPrimaryTsCol(const SBlockData* pBlockData, SFileBlockDumpInfo* pDumpInfo, SColumnInfoData* pColData,
                             int32_t dumpedRows, bool asc) {
  int32_t  first = asc ? pDumpInfo->rowIndex : (pDumpInfo->rowIndex - dumpedRows + 1);
  int64_t* pDst = (int64_t*)pColData->pData;

  memcpy(pDst, &pBlockData->aTSKEY[first], dumpedRows * sizeof(int64_t));
  if (asc) {
    return;
  }

  for (int32_t lo = 0, hi = dumpedRows - 1; lo < hi; ++lo, --hi) {
    int64_t tmp = pDst[lo];
    pDst[lo] = pDst[hi];
    pDst[hi] = tmp;
  }
}

H
Haojun Liao 已提交
904 905 906 907 908 909 910 911 912 913 914 915 916
// Reverse, in place, an array of num fixed-width elements that are each `bytes` bytes wide.
static void doReverseFixedLenArray(void* pArray, int32_t num, int32_t bytes) {
  int32_t mid = num / 2;

  switch (bytes) {
    case sizeof(int64_t): {
      int64_t* pv = (int64_t*)pArray;
      for (int32_t j = 0; j < mid; ++j) {
        int64_t t = pv[j];
        pv[j] = pv[num - j - 1];
        pv[num - j - 1] = t;
      }
      break;
    }
    case sizeof(int32_t): {
      int32_t* pv = (int32_t*)pArray;
      for (int32_t j = 0; j < mid; ++j) {
        int32_t t = pv[j];
        pv[j] = pv[num - j - 1];
        pv[num - j - 1] = t;
      }
      break;
    }
    case sizeof(int16_t): {
      int16_t* pv = (int16_t*)pArray;
      for (int32_t j = 0; j < mid; ++j) {
        int16_t t = pv[j];
        pv[j] = pv[num - j - 1];
        pv[num - j - 1] = t;
      }
      break;
    }
    case sizeof(int8_t): {
      int8_t* pv = (int8_t*)pArray;
      for (int32_t j = 0; j < mid; ++j) {
        int8_t t = pv[j];
        pv[j] = pv[num - j - 1];
        pv[num - j - 1] = t;
      }
      break;
    }
    default: {
      // any other element width: swap the elements byte by byte
      char* pv = (char*)pArray;
      for (int32_t j = 0; j < mid; ++j) {
        char* pl = pv + (int64_t)j * bytes;
        char* pr = pv + (int64_t)(num - j - 1) * bytes;
        for (int32_t k = 0; k < bytes; ++k) {
          char t = pl[k];
          pl[k] = pr[k];
          pr[k] = t;
        }
      }
      break;
    }
  }
}

/**
 * A faster version of the copy procedure for fixed-width columns (the caller uses it for IS_MATHABLE_TYPE columns).
 *
 * 1. dumpedRows values starting at pDumpInfo->rowIndex in scan order are copied with a single memcpy.
 * 2. For a descending scan, the rows [rowIndex - dumpedRows + 1, rowIndex] are copied and then reversed in place.
 *    The reversal uses the element width of the copied data, so it always matches the memcpy.
 * 3. When the column is not all values (flag != HAS_VALUE), the NULL bitmap of the output is filled row by row.
 */
static void copyNumericCols(const SColData* pData, SFileBlockDumpInfo* pDumpInfo, SColumnInfoData* pColData,
                            int32_t dumpedRows, bool asc) {
  int32_t        bytes = tDataTypes[pData->type].bytes;
  int32_t        startIndex = asc ? pDumpInfo->rowIndex : (pDumpInfo->rowIndex - dumpedRows + 1);
  const uint8_t* p = pData->pData + bytes * startIndex;

  // 1. copy data in a batch model
  memcpy(pColData->pData, p, dumpedRows * bytes);

  // 2. reverse the array list in case of descending order scan data block
  if (!asc) {
    doReverseFixedLenArray(pColData->pData, dumpedRows, bytes);
  }

  // 3. if the null value exists, check items one-by-one
  if (pData->flag != HAS_VALUE) {
    int32_t step = asc ? 1 : -1;
    for (int32_t j = pDumpInfo->rowIndex, rowIndex = 0; rowIndex < dumpedRows; j += step, ++rowIndex) {
      uint8_t v = tColDataGetBitValue(pData, j);
      // NOTE(review): 0/1 are treated as "no value" (NONE/NULL) here; confirm against tColDataGetBitValue
      if (v == 0 || v == 1) {
        colDataSetNull_f(pColData->nullbitmap, rowIndex);
        pColData->hasNull = true;
      }
    }
  }
}

995
/**
 * Copy rows of the currently loaded file block (pReader->status.fileBlockData) into pReader->pResBlock.
 *
 * Rows are copied in scan order, starting at pDumpInfo->rowIndex, up to the end of the query time window or the
 * block, and at most pReader->capacity rows. On the first call for a block, the start row is first moved to the
 * first row inside the query time window.
 *
 * Output columns that are absent from the file block are filled with NULL. pDumpInfo->rowIndex is advanced past
 * the copied rows. The block is marked as fully dumped when no rows remain, or when the next row is outside the
 * time window.
 *
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_INVALID_PARA when the start position inside the block cannot be located.
 */
static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo) {
  SReaderStatus*      pStatus = &pReader->status;
  SDataBlockIter*     pBlockIter = &pStatus->blockIter;
  SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo;
  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
  SBlockData*         pBlockData = &pStatus->fileBlockData;
  SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(pBlockIter);
  SDataBlk*           pBlock = getCurrentBlock(pBlockIter);
  SSDataBlock*        pResBlock = pReader->pResBlock;
  int32_t             numOfOutputCols = pSupInfo->numOfCols;

  SColVal cv = {0};
  int64_t st = taosGetTimestampUs();
  bool    asc = ASCENDING_TRAVERSE(pReader->order);
  int32_t step = asc ? 1 : -1;

  // Dumping starts at the block's first row in scan order. If the query window does not cover that first key,
  // locate the first row inside the window by searching in the reverse direction.
  bool atBlockStart = asc ? (pDumpInfo->rowIndex == 0) : (pDumpInfo->rowIndex == pBlock->nRow - 1);
  bool windowCoversStart =
      asc ? (pReader->window.skey <= pBlock->minKey.ts) : (pReader->window.ekey >= pBlock->maxKey.ts);

  if (atBlockStart && !windowCoversStart) {
    int32_t searchFrom = asc ? pBlock->nRow - 1 : 0;
    int32_t searchOrder = asc ? TSDB_ORDER_DESC : TSDB_ORDER_ASC;
    int64_t startKey = asc ? pReader->window.skey : pReader->window.ekey;
    pDumpInfo->rowIndex = doBinarySearchKey(pBlockData->aTSKEY, pBlock->nRow, searchFrom, startKey, searchOrder);

    if (pDumpInfo->rowIndex < 0) {
      tsdbError(
          "%p failed to locate the start position in current block, global index:%d, table index:%d, brange:%" PRId64
          "-%" PRId64 ", minVer:%" PRId64 ", maxVer:%" PRId64 " %s",
          pReader, pBlockIter->index, pBlockInfo->tbBlockIdx, pBlock->minKey.ts, pBlock->maxKey.ts, pBlock->minVer,
          pBlock->maxVer, pReader->idStr);
      return TSDB_CODE_INVALID_PARA;
    }
  }

  // time window check
  int32_t endIndex = getEndPosInDataBlock(pReader, pBlockData, pBlock, pDumpInfo->rowIndex);
  if (endIndex == -1) {
    setBlockAllDumped(pDumpInfo, pReader->window.ekey, pReader->order);
    return TSDB_CODE_SUCCESS;
  }

  endIndex += step;
  int32_t dumpedRows = asc ? (endIndex - pDumpInfo->rowIndex) : (pDumpInfo->rowIndex - endIndex);
  if (dumpedRows > pReader->capacity) {  // output buffer check
    dumpedRows = pReader->capacity;
  }

  int32_t outCol = 0;  // position in pSupInfo->colId / pSupInfo->slotId

  // a requested primary timestamp column always comes first
  SColumnInfoData* pColData = taosArrayGet(pResBlock->pDataBlock, pSupInfo->slotId[outCol]);
  if (pSupInfo->colId[outCol] == PRIMARYKEY_TIMESTAMP_COL_ID) {
    copyPrimaryTsCol(pBlockData, pDumpInfo, pColData, dumpedRows, asc);
    outCol += 1;
  }

  // merge the (column id ordered) requested columns with the columns stored in the file block
  int32_t fileCol = 0;
  int32_t numOfFileCols = pBlockData->nColData;
  while (outCol < numOfOutputCols && fileCol < numOfFileCols) {
    SColData* pData = tBlockDataGetColDataByIdx(pBlockData, fileCol);

    if (pData->cid < pSupInfo->colId[outCol]) {
      fileCol += 1;  // stored column is not requested
      continue;
    }

    pColData = taosArrayGet(pResBlock->pDataBlock, pSupInfo->slotId[outCol]);

    if (pData->cid > pSupInfo->colId[outCol]) {
      // the specified column does not exist in file block, fill with null data
      colDataAppendNNULL(pColData, 0, dumpedRows);
      outCol += 1;
      continue;
    }

    // same column id on both sides
    if (pData->flag == HAS_NONE || pData->flag == HAS_NULL || pData->flag == (HAS_NULL | HAS_NONE)) {
      colDataAppendNNULL(pColData, 0, dumpedRows);
    } else if (IS_MATHABLE_TYPE(pColData->info.type)) {
      copyNumericCols(pData, pDumpInfo, pColData, dumpedRows, asc);
    } else {  // varchar/nchar type
      int32_t outRow = 0;
      for (int32_t j = pDumpInfo->rowIndex; outRow < dumpedRows; j += step) {
        tColDataGetValue(pData, j, &cv);
        doCopyColVal(pColData, outRow++, outCol, &cv, pSupInfo);
      }
    }

    fileCol += 1;
    outCol += 1;
  }

  // fill the mis-matched columns with null value
  for (; outCol < numOfOutputCols; ++outCol) {
    pColData = taosArrayGet(pResBlock->pDataBlock, pSupInfo->slotId[outCol]);
    colDataAppendNNULL(pColData, 0, dumpedRows);
  }

  pResBlock->info.dataLoad = 1;
  pResBlock->info.rows = dumpedRows;
  pDumpInfo->rowIndex += step * dumpedRows;

  // check if current block are all handled
  bool rowsRemain = pDumpInfo->rowIndex >= 0 && pDumpInfo->rowIndex < pBlock->nRow;
  if (!rowsRemain) {
    int64_t boundary = asc ? pBlock->maxKey.ts : pBlock->minKey.ts;
    setBlockAllDumped(pDumpInfo, boundary, pReader->order);
  } else {
    int64_t nextTs = pBlockData->aTSKEY[pDumpInfo->rowIndex];
    if (outOfTimeWindow(nextTs, &pReader->window)) {
      // the remaining rows are outside the query time window, skip the rest of this block
      setBlockAllDumped(pDumpInfo, nextTs, pReader->order);
    }
  }

  double elapsedTime = (taosGetTimestampUs() - st) / 1000.0;
  pReader->cost.blockLoadTime += elapsedTime;

  int32_t unDumpedRows = asc ? pBlock->nRow - pDumpInfo->rowIndex : pDumpInfo->rowIndex + 1;
  tsdbDebug("%p copy file block to sdatablock, global index:%d, table index:%d, brange:%" PRId64 "-%" PRId64
            ", rows:%d, remain:%d, minVer:%" PRId64 ", maxVer:%" PRId64 ", uid:%" PRIu64 " elapsed time:%.2f ms, %s",
            pReader, pBlockIter->index, pBlockInfo->tbBlockIdx, pBlock->minKey.ts, pBlock->maxKey.ts, dumpedRows,
            unDumpedRows, pBlock->minVer, pBlock->maxVer, pBlockInfo->uid, elapsedTime, pReader->idStr);

  return TSDB_CODE_SUCCESS;
}

1123 1124
/**
 * Read the iterator's current file block of table `uid` into pBlockData.
 *
 * pBlockData is reset and initialized with pReader->pSchema and the column ids pReader->suppInfo.colId[1..numOfCols-1].
 * colId[0] is skipped; presumably it is the primary timestamp column (TODO confirm).
 * On success the block is marked as not yet dumped, and the load time is added to pReader->cost.blockLoadTime.
 *
 * @return TSDB_CODE_SUCCESS, or the error of tBlockDataInit / tsdbReadDataBlock.
 */
static int32_t doLoadFileBlockData(STsdbReader* pReader, SDataBlockIter* pBlockIter, SBlockData* pBlockData,
                                   uint64_t uid) {
  int64_t st = taosGetTimestampUs();

  tBlockDataReset(pBlockData);

  TABLEID             tid = {.suid = pReader->suid, .uid = uid};
  SBlockLoadSuppInfo* pSup = &pReader->suppInfo;
  int32_t             code = tBlockDataInit(pBlockData, &tid, pReader->pSchema, &pSup->colId[1], pSup->numOfCols - 1);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(pBlockIter);
  SDataBlk*           pBlock = getCurrentBlock(pBlockIter);

  code = tsdbReadDataBlock(pReader->pFileReader, pBlock, pBlockData);
  if (code != TSDB_CODE_SUCCESS) {
    tsdbError("%p error occurs in loading file block, global index:%d, table index:%d, brange:%" PRId64 "-%" PRId64
              ", rows:%d, code:%s %s",
              pReader, pBlockIter->index, pBlockInfo->tbBlockIdx, pBlock->minKey.ts, pBlock->maxKey.ts, pBlock->nRow,
              tstrerror(code), pReader->idStr);
    return code;
  }

  double elapsedTime = (taosGetTimestampUs() - st) / 1000.0;
  tsdbDebug("%p load file block into buffer, global index:%d, index in table block list:%d, brange:%" PRId64 "-%" PRId64
            ", rows:%d, minVer:%" PRId64 ", maxVer:%" PRId64 ", elapsed time:%.2f ms, %s",
            pReader, pBlockIter->index, pBlockInfo->tbBlockIdx, pBlock->minKey.ts, pBlock->maxKey.ts, pBlock->nRow,
            pBlock->minVer, pBlock->maxVer, elapsedTime, pReader->idStr);

  pReader->cost.blockLoadTime += elapsedTime;
  pReader->status.fBlockDumpInfo.allDumped = false;

  return TSDB_CODE_SUCCESS;
}
H
Hongze Cheng 已提交
1160

H
Haojun Liao 已提交
1161 1162 1163
/**
 * Release every buffer owned by pSup.
 *
 * It is safe on a partially initialized supporter, e.g. when the pDataBlockInfo allocation failed. It is also safe
 * to call more than once: freed pointers are cleared and numOfTables is reset.
 */
static void cleanupBlockOrderSupporter(SBlockOrderSupporter* pSup) {
  taosMemoryFreeClear(pSup->numOfBlocksPerTable);
  taosMemoryFreeClear(pSup->indexPerTable);

  // pDataBlockInfo may be NULL while numOfTables is non-zero (failed init, or a second cleanup call)
  if (pSup->pDataBlockInfo != NULL) {
    for (int32_t i = 0; i < pSup->numOfTables; ++i) {
      taosMemoryFreeClear(pSup->pDataBlockInfo[i]);
    }
    taosMemoryFreeClear(pSup->pDataBlockInfo);
  }

  pSup->numOfTables = 0;
}
H
Hongze Cheng 已提交
1172

H
Haojun Liao 已提交
1173 1174
/**
 * Allocate the zero-initialized per-table arrays of pSup for numOfTables tables.
 *
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY after releasing whatever was already allocated.
 */
static int32_t initBlockOrderSupporter(SBlockOrderSupporter* pSup, int32_t numOfTables) {
  // pass element count and element size separately (calloc convention) instead of a pre-multiplied byte count,
  // so the allocator can reject an overflowing count * size
  pSup->numOfBlocksPerTable = taosMemoryCalloc(numOfTables, sizeof(int32_t));
  pSup->indexPerTable = taosMemoryCalloc(numOfTables, sizeof(int32_t));
  pSup->pDataBlockInfo = taosMemoryCalloc(numOfTables, POINTER_BYTES);

  if (pSup->numOfBlocksPerTable == NULL || pSup->indexPerTable == NULL || pSup->pDataBlockInfo == NULL) {
    cleanupBlockOrderSupporter(pSup);
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  return TSDB_CODE_SUCCESS;
}
H
Hongze Cheng 已提交
1185

H
Haojun Liao 已提交
1186
static int32_t fileDataBlockOrderCompar(const void* pLeft, const void* pRight, void* param) {
1187
  int32_t leftIndex = *(int32_t*)pLeft;
H
Haojun Liao 已提交
1188
  int32_t rightIndex = *(int32_t*)pRight;
H
Hongze Cheng 已提交
1189

H
Haojun Liao 已提交
1190
  SBlockOrderSupporter* pSupporter = (SBlockOrderSupporter*)param;
H
Hongze Cheng 已提交
1191

H
Haojun Liao 已提交
1192 1193
  int32_t leftTableBlockIndex = pSupporter->indexPerTable[leftIndex];
  int32_t rightTableBlockIndex = pSupporter->indexPerTable[rightIndex];
H
Hongze Cheng 已提交
1194

H
Haojun Liao 已提交
1195 1196 1197 1198 1199 1200 1201
  if (leftTableBlockIndex > pSupporter->numOfBlocksPerTable[leftIndex]) {
    /* left block is empty */
    return 1;
  } else if (rightTableBlockIndex > pSupporter->numOfBlocksPerTable[rightIndex]) {
    /* right block is empty */
    return -1;
  }
H
Hongze Cheng 已提交
1202

1203
  SBlockOrderWrapper* pLeftBlock = &pSupporter->pDataBlockInfo[leftIndex][leftTableBlockIndex];
H
Haojun Liao 已提交
1204
  SBlockOrderWrapper* pRightBlock = &pSupporter->pDataBlockInfo[rightIndex][rightTableBlockIndex];
H
Hongze Cheng 已提交
1205

1206 1207 1208
  return pLeftBlock->offset > pRightBlock->offset ? 1 : -1;
}

H
Haojun Liao 已提交
1209
/**
 * Decode the SDataBlk at the iterator's current position into pBlockIter->block.
 * Nothing is done when there is no current block (getCurrentBlockInfo returns NULL).
 *
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_INVALID_PARA when the block's table is not in pBlockIter->pTableMap or
 *         its tbBlockIdx is not a valid position in the table's pBlockList.
 */
static int32_t doSetCurrentBlock(SDataBlockIter* pBlockIter, const char* idStr) {
  SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(pBlockIter);
  if (pBlockInfo == NULL) {
    return TSDB_CODE_SUCCESS;
  }

  STableBlockScanInfo** pScanInfo = taosHashGet(pBlockIter->pTableMap, &pBlockInfo->uid, sizeof(pBlockInfo->uid));
  if (pScanInfo == NULL) {
    tsdbError("failed to locate the uid:%" PRIu64 " in query table uid list, %s", pBlockInfo->uid, idStr);
    return TSDB_CODE_INVALID_PARA;
  }

  // guard against an out-of-range table block index instead of dereferencing NULL
  SBlockIndex* pIndex = taosArrayGet((*pScanInfo)->pBlockList, pBlockInfo->tbBlockIdx);
  if (pIndex == NULL) {
    tsdbError("failed to locate the block index:%d of uid:%" PRIu64 ", %s", pBlockInfo->tbBlockIdx, pBlockInfo->uid,
              idStr);
    return TSDB_CODE_INVALID_PARA;
  }

  tMapDataGetItemByIdx(&(*pScanInfo)->mapData, pIndex->ordinalIndex, &pBlockIter->block, tGetDataBlk);
  return TSDB_CODE_SUCCESS;
}
H
Hongze Cheng 已提交
1228

1229
/**
 * Build the global visiting order of the qualified data blocks of the current file.
 *
 * Every table in pReader->status.pTableMap with a non-empty pBlockList contributes its blocks. With a single such
 * table, the blocks keep their list order. Otherwise the blocks of all tables are merged by their offset in the
 * file, using a multiway merge tree. The iterator is then positioned on the first block (ascending scan) or the
 * last block (descending scan) and that block is decoded.
 *
 * @param numOfBlocks  total number of blocks over all tables' pBlockList; must match what is found in the map.
 * @return TSDB_CODE_SUCCESS; TSDB_CODE_OUT_OF_MEMORY; TSDB_CODE_INVALID_PARA when numOfBlocks does not match the
 *         table map; or the error returned by tMergeTreeCreate / doSetCurrentBlock.
 */
static int32_t initBlockIterator(STsdbReader* pReader, SDataBlockIter* pBlockIter, int32_t numOfBlocks) {
  bool asc = ASCENDING_TRAVERSE(pReader->order);

  SBlockOrderSupporter sup = {0};
  pBlockIter->numOfBlocks = numOfBlocks;
  taosArrayClear(pBlockIter->blockList);
  pBlockIter->pTableMap = pReader->status.pTableMap;

  // access data blocks according to the offset of each block in asc/desc order.
  int32_t numOfTables = (int32_t)taosHashGetSize(pReader->status.pTableMap);

  int64_t st = taosGetTimestampUs();
  int32_t code = initBlockOrderSupporter(&sup, numOfTables);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  int32_t cnt = 0;
  void*   ptr = NULL;
  while ((ptr = taosHashIterate(pReader->status.pTableMap, ptr)) != NULL) {
    STableBlockScanInfo* pTableScanInfo = *(STableBlockScanInfo**)ptr;
    if (pTableScanInfo->pBlockList == NULL || taosArrayGetSize(pTableScanInfo->pBlockList) == 0) {
      continue;
    }

    size_t              num = taosArrayGetSize(pTableScanInfo->pBlockList);
    SBlockOrderWrapper* pWrapper = taosMemoryMalloc(sizeof(SBlockOrderWrapper) * num);
    if (pWrapper == NULL) {
      // leaving the iteration early: the hash iterator must be released explicitly
      taosHashCancelIterate(pReader->status.pTableMap, ptr);
      cleanupBlockOrderSupporter(&sup);
      return TSDB_CODE_OUT_OF_MEMORY;
    }

    sup.numOfBlocksPerTable[sup.numOfTables] = num;
    sup.pDataBlockInfo[sup.numOfTables] = pWrapper;

    for (int32_t k = 0; k < num; ++k) {
      SBlockIndex* pIndex = taosArrayGet(pTableScanInfo->pBlockList, k);
      pWrapper[k] = (SBlockOrderWrapper){.uid = pTableScanInfo->uid, .offset = pIndex->inFileOffset};
      cnt++;
    }

    sup.numOfTables += 1;
  }

  // The loops below index the per-table arrays with numOfBlocks/cnt, so any mismatch must be rejected.
  // Tables without blocks in this file are normal and must not be treated as an error.
  if (numOfBlocks != cnt) {
    tsdbError("%p inconsistent number of file blocks, expect:%d, actual:%d %s", pReader, numOfBlocks, cnt,
              pReader->idStr);
    cleanupBlockOrderSupporter(&sup);
    return TSDB_CODE_INVALID_PARA;
  }

  // since there is only one table qualified, blocks are not sorted
  if (sup.numOfTables == 1) {
    for (int32_t i = 0; i < numOfBlocks; ++i) {
      SFileDataBlockInfo blockInfo = {.uid = sup.pDataBlockInfo[0][i].uid, .tbBlockIdx = i};
      if (taosArrayPush(pBlockIter->blockList, &blockInfo) == NULL) {
        cleanupBlockOrderSupporter(&sup);
        return TSDB_CODE_OUT_OF_MEMORY;
      }
    }

    int64_t et = taosGetTimestampUs();
    tsdbDebug("%p create blocks info struct completed for one table, %d blocks not sorted, elapsed time:%.2f ms %s",
              pReader, numOfBlocks, (et - st) / 1000.0, pReader->idStr);

    pBlockIter->index = asc ? 0 : (numOfBlocks - 1);
    cleanupBlockOrderSupporter(&sup);
    return doSetCurrentBlock(pBlockIter, pReader->idStr);
  }

  tsdbDebug("%p create data blocks info struct completed, %d blocks in %d tables %s", pReader, cnt, sup.numOfTables,
            pReader->idStr);

  SMultiwayMergeTreeInfo* pTree = NULL;

  // keep the full int32_t error code; storing it in a narrower type could turn an error into "success"
  code = tMergeTreeCreate(&pTree, sup.numOfTables, &sup, fileDataBlockOrderCompar);
  if (code != TSDB_CODE_SUCCESS) {
    cleanupBlockOrderSupporter(&sup);
    return code;
  }

  int32_t numOfTotal = 0;
  while (numOfTotal < cnt) {
    int32_t pos = tMergeTreeGetChosenIndex(pTree);
    int32_t index = sup.indexPerTable[pos]++;

    SFileDataBlockInfo blockInfo = {.uid = sup.pDataBlockInfo[pos][index].uid, .tbBlockIdx = index};
    if (taosArrayPush(pBlockIter->blockList, &blockInfo) == NULL) {
      cleanupBlockOrderSupporter(&sup);
      taosMemoryFree(pTree);
      return TSDB_CODE_OUT_OF_MEMORY;
    }

    // set data block index overflow, in order to disable the offset comparator
    if (sup.indexPerTable[pos] >= sup.numOfBlocksPerTable[pos]) {
      sup.indexPerTable[pos] = sup.numOfBlocksPerTable[pos] + 1;
    }

    numOfTotal += 1;
    tMergeTreeAdjust(pTree, tMergeTreeGetAdjustIndex(pTree));
  }

  int64_t et = taosGetTimestampUs();
  tsdbDebug("%p %d data blocks access order completed, elapsed time:%.2f ms %s", pReader, numOfBlocks,
            (et - st) / 1000.0, pReader->idStr);
  cleanupBlockOrderSupporter(&sup);
  taosMemoryFree(pTree);

  pBlockIter->index = asc ? 0 : (numOfBlocks - 1);
  return doSetCurrentBlock(pBlockIter, pReader->idStr);
}
H
Hongze Cheng 已提交
1341

H
Haojun Liao 已提交
1342
/**
 * Move the iterator one block forward in scan direction and decode the new current block.
 * Returns false, leaving the iterator untouched, when it is already on the last block in that direction.
 */
static bool blockIteratorNext(SDataBlockIter* pBlockIter, const char* idStr) {
  if (ASCENDING_TRAVERSE(pBlockIter->order)) {
    if (pBlockIter->index >= pBlockIter->numOfBlocks - 1) {
      return false;
    }
    pBlockIter->index += 1;
  } else {
    if (pBlockIter->index <= 0) {
      return false;
    }
    pBlockIter->index -= 1;
  }

  doSetCurrentBlock(pBlockIter, idStr);
  return true;
}

1356 1357 1358
/**
 * This is an two rectangles overlap cases.
 */
H
Hongze Cheng 已提交
1359
static int32_t dataBlockPartiallyRequired(STimeWindow* pWindow, SVersionRange* pVerRange, SDataBlk* pBlock) {
1360 1361
  return (pWindow->ekey < pBlock->maxKey.ts && pWindow->ekey >= pBlock->minKey.ts) ||
         (pWindow->skey > pBlock->minKey.ts && pWindow->skey <= pBlock->maxKey.ts) ||
H
Hongze Cheng 已提交
1362 1363
         (pVerRange->minVer > pBlock->minVer && pVerRange->minVer <= pBlock->maxVer) ||
         (pVerRange->maxVer < pBlock->maxVer && pVerRange->maxVer >= pBlock->minVer);
H
Haojun Liao 已提交
1364
}
H
Hongze Cheng 已提交
1365

1366
/**
 * Fetch the next block of the same table in scan order.
 *
 * @param pBlockInfo           current block; tbBlockIdx is its position in pTableBlockScanInfo->pBlockList.
 * @param pTableBlockScanInfo  the table owning the block.
 * @param nextIndex            out: position of the neighbor in pBlockList (written only when true is returned).
 * @param order                scan order.
 * @param pBlockIndex          out: copy of the neighbor's SBlockIndex (written only when true is returned).
 * @return false when no neighbor exists in scan direction.
 */
static bool getNeighborBlockOfSameTable(SFileDataBlockInfo* pBlockInfo, STableBlockScanInfo* pTableBlockScanInfo,
                                        int32_t* nextIndex, int32_t order, SBlockIndex* pBlockIndex) {
  // Use a signed count. Comparing against size_t(size) - 1 wraps around for an empty list, which would let an
  // out-of-range neighbor through to a NULL taosArrayGet result.
  int32_t numOfBlocks = (int32_t)taosArrayGetSize(pTableBlockScanInfo->pBlockList);
  int32_t step = ASCENDING_TRAVERSE(order) ? 1 : -1;
  int32_t neighbor = pBlockInfo->tbBlockIdx + step;

  if (neighbor < 0 || neighbor >= numOfBlocks) {
    return false;
  }

  *nextIndex = neighbor;
  *pBlockIndex = *(SBlockIndex*)taosArrayGet(pTableBlockScanInfo->pBlockList, neighbor);
  return true;
}

/**
 * Search pBlockIter->blockList for the entry with the same uid and tbBlockIdx as pFBlockInfo. The search starts at
 * the iterator's current index and moves in scan direction.
 * Returns its position, or -1 when it is not found before the end of the list.
 */
static int32_t findFileBlockInfoIndex(SDataBlockIter* pBlockIter, SFileDataBlockInfo* pFBlockInfo) {
  int32_t step = ASCENDING_TRAVERSE(pBlockIter->order) ? 1 : -1;

  for (int32_t i = pBlockIter->index; i < pBlockIter->numOfBlocks && i >= 0; i += step) {
    SFileDataBlockInfo* pCandidate = taosArrayGet(pBlockIter->blockList, i);
    bool                sameBlock = (pCandidate->uid == pFBlockInfo->uid) &&
                     (pCandidate->tbBlockIdx == pFBlockInfo->tbBlockIdx);
    if (sameBlock) {
      return i;
    }
  }

  return -1;
}

1400
/*
 * Advance pBlockIter->index by step and make the entry currently stored at position `index` the active block.
 * When the entry is not already at the new iterator position, it is moved there (removed, then re-inserted).
 * Finally doSetCurrentBlock refreshes the iterator's current block.
 * Returns -1 when index is outside [0, numOfBlocks), TSDB_CODE_SUCCESS otherwise.
 */
static int32_t setFileBlockActiveInBlockIter(SDataBlockIter* pBlockIter, int32_t index, int32_t step) {
  bool outOfRange = (index < 0) || (index >= pBlockIter->numOfBlocks);
  if (outOfRange) {
    return -1;
  }

  // copy by value: the slot it lives in is removed below
  SFileDataBlockInfo target = *(SFileDataBlockInfo*)taosArrayGet(pBlockIter->blockList, index);

  pBlockIter->index += step;
  int32_t dst = pBlockIter->index;

  if (index != dst) {
    taosArrayRemove(pBlockIter->blockList, index);
    taosArrayInsert(pBlockIter->blockList, dst, &target);

    SFileDataBlockInfo* pMoved = taosArrayGet(pBlockIter->blockList, dst);
    ASSERT(pMoved->uid == target.uid && pMoved->tbBlockIdx == target.tbBlockIdx);
  }

  doSetCurrentBlock(pBlockIter, "");
  return TSDB_CODE_SUCCESS;
}

H
Haojun Liao 已提交
1420
// todo: this attribute could be acquired during extractin the global ordered block list.
1421
static bool overlapWithNeighborBlock(SDataBlk* pBlock, SBlockIndex* pNeighborBlockIndex, int32_t order) {
1422 1423
  // it is the last block in current file, no chance to overlap with neighbor blocks.
  if (ASCENDING_TRAVERSE(order)) {
1424
    return pBlock->maxKey.ts == pNeighborBlockIndex->window.skey;
1425
  } else {
1426
    return pBlock->minKey.ts == pNeighborBlockIndex->window.ekey;
1427
  }
H
Haojun Liao 已提交
1428
}
H
Hongze Cheng 已提交
1429

H
Hongze Cheng 已提交
1430
/*
 * Return true when the buffered key is set (ts != TSKEY_INITIAL_VAL) and is reached no later than pBlock in scan
 * order: key.ts <= minKey.ts for ascending scans, key.ts >= maxKey.ts for descending scans.
 */
static bool bufferDataInFileBlockGap(int32_t order, TSDBKEY key, SDataBlk* pBlock) {
  if (key.ts == TSKEY_INITIAL_VAL) {
    return false;
  }

  if (ASCENDING_TRAVERSE(order)) {
    return key.ts <= pBlock->minKey.ts;
  }

  return key.ts >= pBlock->maxKey.ts;
}
H
Hongze Cheng 已提交
1436

H
Hongze Cheng 已提交
1437
/*
 * Return true when key.ts lies in [minKey.ts, maxKey.ts] of pBlock and the block's version span
 * [minVer, maxVer] intersects the requested version range [pVerRange->minVer, pVerRange->maxVer].
 */
static bool keyOverlapFileBlock(TSDBKEY key, SDataBlk* pBlock, SVersionRange* pVerRange) {
  bool tsInBlock = (pBlock->minKey.ts <= key.ts) && (key.ts <= pBlock->maxKey.ts);
  if (!tsInBlock) {
    return false;
  }

  bool verIntersects = (pVerRange->minVer <= pBlock->maxVer) && (pBlock->minVer <= pVerRange->maxVer);
  return verIntersects;
}

H
Hongze Cheng 已提交
1442 1443
/*
 * Walk the delete skyline of the table from startIndex and decide whether a delete record may touch pBlock.
 * Returns true as soon as a point whose version is >= pBlock->minVer either
 *   - lies inside [minKey.ts, maxKey.ts] of the block, or
 *   - lies before the block while the following skyline point already reaches the block's minKey.ts.
 * Returns false at the first point beyond the block's maxKey.ts (the walk assumes the skyline is ordered by ts),
 * or when the skyline is exhausted.
 */
static bool doCheckforDatablockOverlap(STableBlockScanInfo* pBlockScanInfo, const SDataBlk* pBlock,
                                       int32_t startIndex) {
  SArray* pSkyline = pBlockScanInfo->delSkyline;
  size_t  num = taosArrayGetSize(pSkyline);
  TSKEY   blockSKey = pBlock->minKey.ts;
  TSKEY   blockEKey = pBlock->maxKey.ts;

  for (size_t i = (size_t)startIndex; i < num; ++i) {
    TSDBKEY* pPoint = taosArrayGet(pSkyline, i);

    if (pPoint->ts > blockEKey) {
      return false;  // past the end of the block
    }

    if (pPoint->version < pBlock->minVer) {
      continue;  // point version is below the block's minVer, skip it
    }

    if (pPoint->ts >= blockSKey) {
      return true;  // point inside the block's ts range
    }

    // pPoint->ts < blockSKey: overlapping when the next point already reaches the block
    if (i + 1 < num) {
      TSDBKEY* pNext = taosArrayGet(pSkyline, i + 1);
      if (pNext->ts >= blockSKey) {
        return true;
      }
    } else {  // it must be the last point
      ASSERT(pPoint->version == 0);
    }
  }

  return false;
}

H
Hongze Cheng 已提交
1471
/*
 * Check whether the delete records of the table (pBlockScanInfo->delSkyline) may affect the rows of pBlock.
 *
 * Returns false right away when there are no delete records, or when the block's ts range lies entirely outside
 * the [first, last] ts range of the skyline. Otherwise the decision is delegated to doCheckforDatablockOverlap:
 *   - ascending scans start at fileDelIndex;
 *   - descending scans first walk back from fileDelIndex to the first point whose ts is not greater than the
 *     block's minKey.ts (or to index 0), and start there.
 */
static bool overlapWithDelSkyline(STableBlockScanInfo* pBlockScanInfo, const SDataBlk* pBlock, int32_t order) {
  // An allocated but empty skyline has no first/last element; the pointers fetched below would then be
  // dereferenced, so treat it exactly like "no delete info".
  if (pBlockScanInfo->delSkyline == NULL || taosArrayGetSize(pBlockScanInfo->delSkyline) == 0) {
    return false;
  }

  // ts ranges do not overlap
  TSDBKEY* pFirst = taosArrayGet(pBlockScanInfo->delSkyline, 0);
  TSDBKEY* pLast = taosArrayGetLast(pBlockScanInfo->delSkyline);
  if (pBlock->minKey.ts > pLast->ts || pBlock->maxKey.ts < pFirst->ts) {
    return false;
  }

  // ts ranges overlap, check the versions point by point
  if (ASCENDING_TRAVERSE(order)) {
    return doCheckforDatablockOverlap(pBlockScanInfo, pBlock, pBlockScanInfo->fileDelIndex);
  }

  int32_t index = pBlockScanInfo->fileDelIndex;
  while (1) {
    TSDBKEY* p = taosArrayGet(pBlockScanInfo->delSkyline, index);
    if (p->ts > pBlock->minKey.ts && index > 0) {
      index -= 1;
    } else {  // find the first point that is smaller than the minKey.ts of dataBlock.
      break;
    }
  }

  return doCheckforDatablockOverlap(pBlockScanInfo, pBlock, index);
}

H
Haojun Liao 已提交
1501 1502 1503 1504 1505 1506 1507 1508 1509 1510 1511 1512 1513
/* Reasons, filled in by getBlockToLoadInfo, why a file data block may have to be loaded instead of used as-is. */
typedef struct {
  bool overlapWithNeighborBlock;  // shares its boundary ts with the adjacent block of the same table (scan order)
  bool hasDupTs;                  // block flagged hasDup, or it has sub-blocks (nSubBlock != 1)
  bool overlapWithDelInfo;        // the table's delete skyline may affect rows of the block
  bool overlapWithLastBlock;      // current row of the last (stt) blocks falls inside the block's ts range
  bool overlapWithKeyInBuf;       // buffered key lies in the block's ts range and its version range intersects
  bool partiallyRequired;         // query window / version range boundary falls strictly inside the block
  bool moreThanCapcity;           // block holds more rows than the reader's capacity
} SDataBlockToLoadInfo;

/*
 * Evaluate every condition that can force pBlock to be loaded and record the results in *pInfo.
 * Fields whose condition cannot be evaluated (no neighbour block, no last-block data) are left as the caller
 * initialised them.
 */
static void getBlockToLoadInfo(SDataBlockToLoadInfo* pInfo, SFileDataBlockInfo* pBlockInfo, SDataBlk* pBlock,
                               STableBlockScanInfo* pScanInfo, TSDBKEY keyInBuf, SLastBlockReader* pLastBlockReader,
                               STsdbReader* pReader) {
  int32_t     order = pReader->order;
  int32_t     nextIdx = 0;
  SBlockIndex neighbor = {0};

  // boundary shared with the adjacent block of the same table
  if (getNeighborBlockOfSameTable(pBlockInfo, pScanInfo, &nextIdx, order, &neighbor)) {
    pInfo->overlapWithNeighborBlock = overlapWithNeighborBlock(pBlock, &neighbor, order);
  }

  // duplicated ts of different versions: a block with sub-blocks is always treated as having them
  if (pBlock->nSubBlock == 1) {
    pInfo->hasDupTs = pBlock->hasDup;
  } else {
    pInfo->hasDupTs = true;
  }

  pInfo->overlapWithDelInfo = overlapWithDelSkyline(pScanInfo, pBlock, order);

  // the current row of the last (stt) blocks lies inside the block's ts range
  if (hasDataInLastBlock(pLastBlockReader)) {
    int64_t tsLast = getCurrentKeyInLastBlock(pLastBlockReader);
    pInfo->overlapWithLastBlock = (pBlock->minKey.ts <= tsLast) && (tsLast <= pBlock->maxKey.ts);
  }

  pInfo->moreThanCapcity = pBlock->nRow > pReader->capacity;
  pInfo->partiallyRequired = dataBlockPartiallyRequired(&pReader->window, &pReader->verRange, pBlock);
  pInfo->overlapWithKeyInBuf = keyOverlapFileBlock(keyInBuf, pBlock, &pReader->verRange);
}
1537

H
Haojun Liao 已提交
1538 1539 1540 1541 1542 1543 1544 1545 1546 1547 1548 1549 1550 1551
// A file block can be used without loading its data only when none of the following holds:
// 1. it shares a boundary ts with the adjacent block of the same table
// 2. it may contain duplicated ts
// 3. the query time window or version range covers it only partially
// 4. the buffered (mem/imem) key overlaps it
// 5. it holds more rows than the output buffer capacity
// 6. delete info overlaps it
// 7. the current row of the last (stt) blocks overlaps it
static bool fileBlockShouldLoad(STsdbReader* pReader, SFileDataBlockInfo* pBlockInfo, SDataBlk* pBlock,
                                STableBlockScanInfo* pScanInfo, TSDBKEY keyInBuf, SLastBlockReader* pLastBlockReader) {
  SDataBlockToLoadInfo info = {0};
  getBlockToLoadInfo(&info, pBlockInfo, pBlock, pScanInfo, keyInBuf, pLastBlockReader, pReader);

  bool mustLoad = info.overlapWithNeighborBlock || info.hasDupTs || info.partiallyRequired ||
                  info.overlapWithKeyInBuf || info.moreThanCapcity || info.overlapWithDelInfo ||
                  info.overlapWithLastBlock;
  if (!mustLoad) {
    return false;
  }

  // log the reason why the data block is loaded, for profiling
  tsdbDebug("%p uid:%" PRIu64 " need to load the datablock, overlapneighbor:%d, hasDup:%d, partiallyRequired:%d, "
            "overlapWithKey:%d, greaterThanBuf:%d, overlapWithDel:%d, overlapWithlastBlock:%d, %s",
            pReader, pBlockInfo->uid, info.overlapWithNeighborBlock, info.hasDupTs, info.partiallyRequired,
            info.overlapWithKeyInBuf, info.moreThanCapcity, info.overlapWithDelInfo, info.overlapWithLastBlock,
            pReader->idStr);
  return true;
}

H
Haojun Liao 已提交
1565 1566 1567 1568 1569 1570 1571 1572 1573
/*
 * A file block is "clean" when nothing else may merge into its rows: no shared boundary with the neighbour block,
 * no duplicated ts, no overlap with the buffered key, the delete info or the last (stt) blocks.
 * partiallyRequired and moreThanCapcity are not considered here.
 */
static bool isCleanFileDataBlock(STsdbReader* pReader, SFileDataBlockInfo* pBlockInfo, SDataBlk* pBlock,
                                 STableBlockScanInfo* pScanInfo, TSDBKEY keyInBuf, SLastBlockReader* pLastBlockReader) {
  SDataBlockToLoadInfo info = {0};
  getBlockToLoadInfo(&info, pBlockInfo, pBlock, pScanInfo, keyInBuf, pLastBlockReader, pReader);

  return !info.overlapWithNeighborBlock && !info.hasDupTs && !info.overlapWithKeyInBuf && !info.overlapWithDelInfo &&
         !info.overlapWithLastBlock;
}

1574
/*
 * Fill pReader->pResBlock with rows of the table taken from the in-memory iterators (iter / iiter) up to endKey.
 * Does nothing when neither iterator has a value. Afterwards updates the block's ts window and uid, marks the
 * result as a composed block and adds the elapsed time (ms) to pReader->cost.buildmemBlock.
 * Returns the code reported by buildDataBlockFromBufImpl.
 */
static int32_t buildDataBlockFromBuf(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo, int64_t endKey) {
  bool hasBufData = pBlockScanInfo->iter.hasVal || pBlockScanInfo->iiter.hasVal;
  if (!hasBufData) {
    return TSDB_CODE_SUCCESS;
  }

  SSDataBlock* pResBlock = pReader->pResBlock;
  int64_t      startUs = taosGetTimestampUs();

  int32_t code = buildDataBlockFromBufImpl(pBlockScanInfo, endKey, pReader->capacity, pReader);

  blockDataUpdateTsWindow(pResBlock, pReader->suppInfo.slotId[0]);
  pResBlock->info.id.uid = pBlockScanInfo->uid;
  setComposedBlockFlag(pReader, true);

  double elapsedMs = (taosGetTimestampUs() - startUs) / 1000.0;
  tsdbDebug("%p build data block from cache completed, elapsed time:%.2f ms, numOfRows:%d, brange:%" PRId64
            " - %" PRId64 " %s",
            pReader, elapsedMs, pResBlock->info.rows, pResBlock->info.window.skey, pResBlock->info.window.ekey,
            pReader->idStr);
  pReader->cost.buildmemBlock += elapsedMs;

  return code;
}

1599 1600
/*
 * Fast path for file block rows. If the row at pDumpInfo->rowIndex is not the last one in scan direction and the
 * next row in that direction has a different timestamp than key, append the row as-is (no merge needed) and
 * advance rowIndex by one step in scan direction.
 * Returns true when the row was copied, false when the caller has to merge it.
 */
static bool tryCopyDistinctRowFromFileBlock(STsdbReader* pReader, SBlockData* pBlockData, int64_t key,
                                            SFileBlockDumpInfo* pDumpInfo) {
  bool    asc = (pReader->order == TSDB_ORDER_ASC);
  int32_t step = asc ? 1 : -1;

  // a border row has no neighbour in scan direction, leave it to the merge path
  bool hasNextRow = asc ? (pDumpInfo->rowIndex < pDumpInfo->totalRows - 1) : (pDumpInfo->rowIndex > 0);
  if (!hasNextRow) {
    return false;
  }

  if (pBlockData->aTSKEY[pDumpInfo->rowIndex + step] == key) {
    return false;  // duplicated timestamp follows, rows must be merged
  }

  doAppendRowFromFileBlock(pReader->pResBlock, pReader, pBlockData, pDumpInfo->rowIndex);
  pDumpInfo->rowIndex += step;
  return true;
}

1619 1620
/*
 * Advance the last-block (stt) merge tree to the next row for which hasBeenDropped (checked against the table's
 * delete skyline and pVerRange) reports false.
 * Returns true when such a row is current, false when the merge tree is exhausted.
 */
static bool nextRowFromLastBlocks(SLastBlockReader* pLastBlockReader, STableBlockScanInfo* pBlockScanInfo,
                                  SVersionRange* pVerRange) {
  while (tMergeTreeNext(&pLastBlockReader->mergeTree)) {
    TSDBROW row = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
    TSDBKEY rowKey = TSDBROW_KEY(&row);

    bool dropped = hasBeenDropped(pBlockScanInfo->delSkyline, &pBlockScanInfo->lastBlockDelIndex, &rowKey,
                                  pLastBlockReader->order, pVerRange);
    if (!dropped) {
      return true;
    }
  }

  return false;
}

/*
 * Advance the last-block reader past the current row and, when the current row (fRow, timestamp ts) turns out
 * to be unique — the stt data is exhausted or the next row has another timestamp — append fRow directly.
 * Returns true when fRow was appended, false when rows with the same ts follow and must be merged.
 */
static bool tryCopyDistinctRowFromSttBlock(TSDBROW* fRow, SLastBlockReader* pLastBlockReader,
                                           STableBlockScanInfo* pScanInfo, int64_t ts, STsdbReader* pReader) {
  bool hasNext = nextRowFromLastBlocks(pLastBlockReader, pScanInfo, &pReader->verRange);

  bool distinct = !hasNext || (getCurrentKeyInLastBlock(pLastBlockReader) != ts);
  if (!distinct) {
    return false;
  }

  doAppendRowFromFileBlock(pReader->pResBlock, pReader, fRow->pBlockData, fRow->iRow);
  return true;
}

H
Haojun Liao 已提交
1653 1654 1655
/*
 * Return the table schema matching sversion for a row read from the in-memory buffers.
 *
 * pReader->pSchema caches the newest schema (fetched lazily with metaGetTbTSchema(..., -1, 1)) and is returned
 * when its version matches. Otherwise pReader->pMemSchema caches one older version: it is reused when its version
 * matches, and is replaced through metaGetTbTSchemaEx otherwise.
 * Returns NULL and sets terrno to the lookup code when metaGetTbTSchemaEx fails.
 */
static FORCE_INLINE STSchema* doGetSchemaForTSRow(int32_t sversion, STsdbReader* pReader, uint64_t uid) {
  // always keep the newest schema version in pReader->pSchema
  if (pReader->pSchema == NULL) {
    pReader->pSchema = metaGetTbTSchema(pReader->pTsdb->pVnode->pMeta, uid, -1, 1);
  }

  if (pReader->pSchema != NULL && pReader->pSchema->version == sversion) {
    return pReader->pSchema;
  }

  if (pReader->pMemSchema != NULL) {
    if (pReader->pMemSchema->version == sversion) {
      return pReader->pMemSchema;
    }

    // cached older schema has another version: drop it and fetch the requested one
    taosMemoryFreeClear(pReader->pMemSchema);
    int32_t code =
        metaGetTbTSchemaEx(pReader->pTsdb->pVnode->pMeta, pReader->suid, uid, sversion, &pReader->pMemSchema);
    if (code == TSDB_CODE_SUCCESS && pReader->pMemSchema != NULL) {
      return pReader->pMemSchema;
    }

    terrno = code;
    return NULL;
  }

  int32_t code = metaGetTbTSchemaEx(pReader->pTsdb->pVnode->pMeta, pReader->suid, uid, sversion, &pReader->pMemSchema);
  if (code == TSDB_CODE_SUCCESS) {
    return pReader->pMemSchema;
  }

  terrno = code;
  return NULL;
}

1688
static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo, TSDBROW* pRow,
1689 1690 1691 1692 1693 1694
                                     SIterInfo* pIter, int64_t key, SLastBlockReader* pLastBlockReader) {
  SRowMerger          merge = {0};
  STSRow*             pTSRow = NULL;
  SBlockData*         pBlockData = &pReader->status.fileBlockData;
  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;

1695
  int64_t tsLast = INT64_MIN;
1696
  if (hasDataInLastBlock(pLastBlockReader)) {
1697 1698
    tsLast = getCurrentKeyInLastBlock(pLastBlockReader);
  }
1699

H
Hongze Cheng 已提交
1700 1701
  TSDBKEY k = TSDBROW_KEY(pRow);
  TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
1702

1703 1704
  int64_t minKey = 0;
  if (pReader->order == TSDB_ORDER_ASC) {
H
Hongze Cheng 已提交
1705
    minKey = INT64_MAX;  // chosen the minimum value
1706
    if (minKey > tsLast && hasDataInLastBlock(pLastBlockReader)) {
1707 1708
      minKey = tsLast;
    }
1709

1710 1711 1712
    if (minKey > k.ts) {
      minKey = k.ts;
    }
1713

1714
    if (minKey > key && hasDataInFileBlock(pBlockData, pDumpInfo)) {
1715 1716 1717 1718
      minKey = key;
    }
  } else {
    minKey = INT64_MIN;
1719
    if (minKey < tsLast && hasDataInLastBlock(pLastBlockReader)) {
1720 1721 1722 1723 1724 1725 1726
      minKey = tsLast;
    }

    if (minKey < k.ts) {
      minKey = k.ts;
    }

1727
    if (minKey < key && hasDataInFileBlock(pBlockData, pDumpInfo)) {
1728 1729
      minKey = key;
    }
1730 1731 1732 1733
  }

  bool init = false;

1734
  // ASC: file block ---> last block -----> imem -----> mem
H
Hongze Cheng 已提交
1735
  // DESC: mem -----> imem -----> last block -----> file block
1736 1737
  if (pReader->order == TSDB_ORDER_ASC) {
    if (minKey == key) {
1738
      init = true;
H
Haojun Liao 已提交
1739 1740 1741 1742
      int32_t code = tRowMergerInit(&merge, &fRow, pReader->pSchema);
      if (code != TSDB_CODE_SUCCESS) {
        return code;
      }
1743
      doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
1744 1745
    }

1746
    if (minKey == tsLast) {
1747
      TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
H
Haojun Liao 已提交
1748 1749 1750
      if (init) {
        tRowMerge(&merge, &fRow1);
      } else {
1751
        init = true;
H
Haojun Liao 已提交
1752 1753 1754 1755
        int32_t code = tRowMergerInit(&merge, &fRow1, pReader->pSchema);
        if (code != TSDB_CODE_SUCCESS) {
          return code;
        }
1756
      }
1757
      doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, &merge, &pReader->verRange);
1758
    }
1759

1760
    if (minKey == k.ts) {
K
kailixu 已提交
1761 1762 1763 1764
      STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid);
      if (pSchema == NULL) {
        return terrno;
      }
H
Haojun Liao 已提交
1765
      if (init) {
K
kailixu 已提交
1766
        tRowMergerAdd(&merge, pRow, pSchema);
H
Haojun Liao 已提交
1767
      } else {
1768
        init = true;
H
Haojun Liao 已提交
1769 1770 1771 1772 1773 1774 1775 1776
        int32_t   code = tRowMergerInit(&merge, pRow, pSchema);
        if (code != TSDB_CODE_SUCCESS) {
          return code;
        }
      }
      int32_t code = doMergeRowsInBuf(pIter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge, pReader);
      if (code != TSDB_CODE_SUCCESS) {
        return code;
1777 1778 1779 1780 1781
      }
    }
  } else {
    if (minKey == k.ts) {
      init = true;
1782
      STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid);
H
Haojun Liao 已提交
1783 1784 1785 1786 1787 1788
      int32_t   code = tRowMergerInit(&merge, pRow, pSchema);
      if (code != TSDB_CODE_SUCCESS) {
        return code;
      }

      code = doMergeRowsInBuf(pIter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge, pReader);
1789
      if (code != TSDB_CODE_SUCCESS || merge.pTSchema == NULL) {
H
Haojun Liao 已提交
1790 1791
        return code;
      }
1792 1793
    }

1794
    if (minKey == tsLast) {
1795
      TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
H
Haojun Liao 已提交
1796 1797 1798
      if (init) {
        tRowMerge(&merge, &fRow1);
      } else {
1799
        init = true;
H
Haojun Liao 已提交
1800 1801 1802 1803
        int32_t code = tRowMergerInit(&merge, &fRow1, pReader->pSchema);
        if (code != TSDB_CODE_SUCCESS) {
          return code;
        }
1804
      }
1805
      doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, &merge, &pReader->verRange);
1806 1807 1808
    }

    if (minKey == key) {
H
Haojun Liao 已提交
1809 1810 1811
      if (init) {
        tRowMerge(&merge, &fRow);
      } else {
1812
        init = true;
H
Haojun Liao 已提交
1813 1814 1815 1816
        int32_t code = tRowMergerInit(&merge, &fRow, pReader->pSchema);
        if (code != TSDB_CODE_SUCCESS) {
          return code;
        }
1817 1818 1819
      }
      doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
    }
1820 1821
  }

1822 1823 1824 1825 1826
  int32_t code = tRowMergerGetRow(&merge, &pTSRow);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

1827
  doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo);
1828 1829 1830 1831 1832 1833

  taosMemoryFree(pTSRow);
  tRowMergerClear(&merge);
  return TSDB_CODE_SUCCESS;
}

1834 1835 1836
/*
 * Emit the row at the current position of the last (stt) blocks, merging duplicates where needed.
 *
 * When mergeBlockData is false, or the stt row's ts differs from the current file block row, the stt row is
 * copied directly if no other stt row shares its ts. Otherwise all stt rows with that ts are merged.
 * When mergeBlockData is true and both ts are equal, the stt rows and the file block rows with that ts are
 * merged together.
 * pBlockData is only read when mergeBlockData is true.
 *
 * Returns TSDB_CODE_SUCCESS or the first error encountered. The merger is released on every path.
 */
static int32_t doMergeFileBlockAndLastBlock(SLastBlockReader* pLastBlockReader, STsdbReader* pReader,
                                            STableBlockScanInfo* pBlockScanInfo, SBlockData* pBlockData,
                                            bool mergeBlockData) {
  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
  int64_t             tsLastBlock = getCurrentKeyInLastBlock(pLastBlockReader);

  STSRow*    pTSRow = NULL;
  SRowMerger merge = {0};
  TSDBROW    fRow = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
  tsdbTrace("fRow ptr:%p, %d, uid:%" PRIu64 ", %s", fRow.pBlockData, fRow.iRow, pLastBlockReader->uid, pReader->idStr);

  int32_t code = TSDB_CODE_SUCCESS;

  if ((!mergeBlockData) || (tsLastBlock != pBlockData->aTSKEY[pDumpInfo->rowIndex])) {
    // only the last block contributes to this ts
    if (tryCopyDistinctRowFromSttBlock(&fRow, pLastBlockReader, pBlockScanInfo, tsLastBlock, pReader)) {
      pBlockScanInfo->lastKey = tsLastBlock;
      return TSDB_CODE_SUCCESS;
    }

    // tryCopyDistinctRowFromSttBlock already advanced the merge tree to the next stt row carrying the same ts
    code = tRowMergerInit(&merge, &fRow, pReader->pSchema);
    if (code != TSDB_CODE_SUCCESS) {
      goto _end;
    }

    TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
    tRowMerge(&merge, &fRow1);
    doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLastBlock, &merge, &pReader->verRange);
  } else {  // the stt row and the file block row share the same ts: merge both sources
    code = tRowMergerInit(&merge, &fRow, pReader->pSchema);
    if (code != TSDB_CODE_SUCCESS) {
      goto _end;
    }

    doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLastBlock, &merge, &pReader->verRange);

    // merge with block data if ts == key
    if (tsLastBlock == pBlockData->aTSKEY[pDumpInfo->rowIndex]) {
      doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
    }
  }

  code = tRowMergerGetRow(&merge, &pTSRow);
  if (code != TSDB_CODE_SUCCESS) {
    goto _end;
  }

  doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo);

_end:
  // single exit: the merger must be released on error paths as well
  taosMemoryFree(pTSRow);
  tRowMergerClear(&merge);
  return code;
}

1897 1898
/*
 * Produce the next output row when only file data (data block rows and last/stt block rows) is involved.
 *   - only stt data left:             delegate to doMergeFileBlockAndLastBlock(..., NULL, false);
 *   - only data block rows left:      mergeRowsInFileBlocks;
 *   - descending scan with both:      doMergeFileBlockAndLastBlock(..., pBlockData, true);
 *   - ascending scan, key < stt ts:   mergeRowsInFileBlocks;
 *   - ascending scan, key == stt ts:  merge all versions from both sources and append the merged row.
 * Expects the stt ts not to be smaller than key (asserted).
 * Returns TSDB_CODE_SUCCESS or the first error encountered; the merger is always released.
 */
static int32_t mergeFileBlockAndLastBlock(STsdbReader* pReader, SLastBlockReader* pLastBlockReader, int64_t key,
                                          STableBlockScanInfo* pBlockScanInfo, SBlockData* pBlockData) {
  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;

  if (!hasDataInFileBlock(pBlockData, pDumpInfo)) {  // only last block exists
    return doMergeFileBlockAndLastBlock(pLastBlockReader, pReader, pBlockScanInfo, NULL, false);
  }

  // no last block available, only data block exists
  if (!hasDataInLastBlock(pLastBlockReader)) {
    return mergeRowsInFileBlocks(pBlockData, pBlockScanInfo, key, pReader);
  }

  // row in last file block
  TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
  int64_t ts = getCurrentKeyInLastBlock(pLastBlockReader);
  ASSERT(ts >= key);

  if (!ASCENDING_TRAVERSE(pReader->order)) {
    return doMergeFileBlockAndLastBlock(pLastBlockReader, pReader, pBlockScanInfo, pBlockData, true);
  }

  if (key < ts) {  // imem, mem are all empty, file blocks (data blocks and last block) exist
    return mergeRowsInFileBlocks(pBlockData, pBlockScanInfo, key, pReader);
  }

  if (key != ts) {
    return TSDB_CODE_SUCCESS;
  }

  // key == ts: merge the data block rows and the last block rows sharing this timestamp
  STSRow*    pTSRow = NULL;
  SRowMerger merge = {0};

  int32_t code = tRowMergerInit(&merge, &fRow, pReader->pSchema);
  if (code == TSDB_CODE_SUCCESS) {
    doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);

    TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
    tRowMerge(&merge, &fRow1);
    doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, ts, &merge, &pReader->verRange);

    code = tRowMergerGetRow(&merge, &pTSRow);
    if (code == TSDB_CODE_SUCCESS) {
      doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo);
    }
  }

  // released on both success and failure; previously a tRowMergerGetRow failure leaked the merger
  taosMemoryFree(pTSRow);
  tRowMergerClear(&merge);
  return code;
}

1952 1953
/*
 * Merge every version of the next timestamp to output across all four sources and append the merged row.
 * The sources are the file data block, the last (stt) blocks, imem (iiter) and mem (iter).
 * The next timestamp is the smallest (ascending scan) or largest (descending scan) current key among them.
 * Merge order -- ASC: file block -> last block -> imem -> mem; DESC: mem -> imem -> last block -> file block.
 *
 * Returns TSDB_CODE_SUCCESS or the first error encountered (terrno when a schema lookup fails). When the merger
 * ends up without a schema, nothing is appended and the current code is returned. The merger and the produced
 * row are released on every path.
 */
static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo, SBlockData* pBlockData,
                                     SLastBlockReader* pLastBlockReader) {
  SRowMerger          merge = {0};
  STSRow*             pTSRow = NULL;
  int32_t             code = TSDB_CODE_SUCCESS;
  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
  SArray*             pDelList = pBlockScanInfo->delSkyline;

  TSDBROW* pRow = getValidMemRow(&pBlockScanInfo->iter, pDelList, pReader);
  TSDBROW* piRow = getValidMemRow(&pBlockScanInfo->iiter, pDelList, pReader);

  int64_t tsLast = INT64_MIN;
  if (hasDataInLastBlock(pLastBlockReader)) {
    tsLast = getCurrentKeyInLastBlock(pLastBlockReader);
  }

  int64_t key = hasDataInFileBlock(pBlockData, pDumpInfo) ? pBlockData->aTSKEY[pDumpInfo->rowIndex] : INT64_MIN;

  TSDBKEY k = TSDBROW_KEY(pRow);
  TSDBKEY ik = TSDBROW_KEY(piRow);

  int64_t minKey = 0;
  if (ASCENDING_TRAVERSE(pReader->order)) {
    minKey = INT64_MAX;  // let's find the minimum
    if (minKey > k.ts) {
      minKey = k.ts;
    }

    if (minKey > ik.ts) {
      minKey = ik.ts;
    }

    if (minKey > key && hasDataInFileBlock(pBlockData, pDumpInfo)) {
      minKey = key;
    }

    if (minKey > tsLast && hasDataInLastBlock(pLastBlockReader)) {
      minKey = tsLast;
    }
  } else {
    minKey = INT64_MIN;  // let's find the maximum ts value
    if (minKey < k.ts) {
      minKey = k.ts;
    }

    if (minKey < ik.ts) {
      minKey = ik.ts;
    }

    if (minKey < key && hasDataInFileBlock(pBlockData, pDumpInfo)) {
      minKey = key;
    }

    if (minKey < tsLast && hasDataInLastBlock(pLastBlockReader)) {
      minKey = tsLast;
    }
  }

  bool init = false;

  // ASC: file block -----> last block -----> imem -----> mem
  // DESC: mem -----> imem -----> last block -----> file block
  if (ASCENDING_TRAVERSE(pReader->order)) {
    if (minKey == key) {
      init = true;
      TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
      code = tRowMergerInit(&merge, &fRow, pReader->pSchema);
      if (code != TSDB_CODE_SUCCESS) {
        goto _end;
      }

      doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
    }

    if (minKey == tsLast) {
      TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
      if (init) {
        tRowMerge(&merge, &fRow1);
      } else {
        init = true;
        code = tRowMergerInit(&merge, &fRow1, pReader->pSchema);
        if (code != TSDB_CODE_SUCCESS) {
          goto _end;
        }
      }

      doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, &merge, &pReader->verRange);
    }

    if (minKey == ik.ts) {
      if (init) {
        tRowMerge(&merge, piRow);
      } else {
        init = true;
        STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(piRow), pReader, pBlockScanInfo->uid);
        if (pSchema == NULL) {
          code = terrno;  // `code` is still TSDB_CODE_SUCCESS here, returning it would hide the failure
          goto _end;
        }

        code = tRowMergerInit(&merge, piRow, pSchema);
        if (code != TSDB_CODE_SUCCESS) {
          goto _end;
        }
      }

      code = doMergeRowsInBuf(&pBlockScanInfo->iiter, pBlockScanInfo->uid, ik.ts, pBlockScanInfo->delSkyline, &merge,
                              pReader);
      if (code != TSDB_CODE_SUCCESS) {
        goto _end;
      }
    }

    if (minKey == k.ts) {
      if (init) {
        if (merge.pTSchema == NULL) {
          goto _end;
        }

        tRowMerge(&merge, pRow);
      } else {
        STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid);
        if (pSchema == NULL) {
          code = terrno;
          goto _end;
        }

        code = tRowMergerInit(&merge, pRow, pSchema);
        if (code != TSDB_CODE_SUCCESS) {
          goto _end;
        }
      }

      code = doMergeRowsInBuf(&pBlockScanInfo->iter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge,
                              pReader);
      if (code != TSDB_CODE_SUCCESS) {
        goto _end;
      }
    }
  } else {
    if (minKey == k.ts) {
      init = true;
      STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid);
      if (pSchema == NULL) {
        code = terrno;
        goto _end;
      }

      code = tRowMergerInit(&merge, pRow, pSchema);
      if (code != TSDB_CODE_SUCCESS) {
        goto _end;
      }

      code = doMergeRowsInBuf(&pBlockScanInfo->iter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge,
                              pReader);
      if (code != TSDB_CODE_SUCCESS) {
        goto _end;
      }
    }

    if (minKey == ik.ts) {
      if (init) {
        tRowMerge(&merge, piRow);
      } else {
        init = true;
        STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(piRow), pReader, pBlockScanInfo->uid);
        if (pSchema == NULL) {
          code = terrno;
          goto _end;
        }

        code = tRowMergerInit(&merge, piRow, pSchema);
        if (code != TSDB_CODE_SUCCESS) {
          goto _end;
        }
      }

      code = doMergeRowsInBuf(&pBlockScanInfo->iiter, pBlockScanInfo->uid, ik.ts, pBlockScanInfo->delSkyline, &merge,
                              pReader);
      if (code != TSDB_CODE_SUCCESS) {
        goto _end;
      }
    }

    if (minKey == tsLast) {
      TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
      if (init) {
        tRowMerge(&merge, &fRow1);
      } else {
        init = true;
        code = tRowMergerInit(&merge, &fRow1, pReader->pSchema);
        if (code != TSDB_CODE_SUCCESS) {
          goto _end;
        }
      }

      doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, &merge, &pReader->verRange);
    }

    if (minKey == key) {
      TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
      if (!init) {
        code = tRowMergerInit(&merge, &fRow, pReader->pSchema);
        if (code != TSDB_CODE_SUCCESS) {
          goto _end;
        }
      } else {
        if (merge.pTSchema == NULL) {
          goto _end;
        }

        tRowMerge(&merge, &fRow);
      }

      doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
    }
  }

  if (merge.pTSchema == NULL) {
    goto _end;
  }

  code = tRowMergerGetRow(&merge, &pTSRow);
  if (code != TSDB_CODE_SUCCESS) {
    goto _end;
  }

  doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo);

_end:
  // single exit: the merger (and the merged row, if any) must be released on error paths as well
  taosMemoryFree(pTSRow);
  tRowMergerClear(&merge);
  return code;
}

2165 2166 2167 2168 2169 2170 2171 2172 2173 2174 2175 2176 2177 2178 2179 2180 2181 2182 2183 2184 2185 2186 2187 2188
/*
 * Lazily create the mem / imem skip-list iterators of one table and build its delete skyline.
 *
 * The iterators start at the query-window boundary that matches the scan order: (window.skey, verRange.minVer)
 * for ascending scans, (window.ekey, verRange.maxVer) for descending scans. Once pBlockScanInfo->iterInit is set
 * the function is a no-op.
 *
 * Returns TSDB_CODE_SUCCESS, the error reported by tsdbTbDataIterCreate, or the error reported by
 * initDelSkylineIterator.
 */
static int32_t initMemDataIterator(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader) {
  if (pBlockScanInfo->iterInit) {
    return TSDB_CODE_SUCCESS;
  }

  int32_t code = TSDB_CODE_SUCCESS;

  TSDBKEY startKey = {0};
  if (ASCENDING_TRAVERSE(pReader->order)) {
    startKey = (TSDBKEY){.ts = pReader->window.skey, .version = pReader->verRange.minVer};
  } else {
    startKey = (TSDBKEY){.ts = pReader->window.ekey, .version = pReader->verRange.maxVer};
  }

  int32_t backward = (!ASCENDING_TRAVERSE(pReader->order));

  STbData* d = NULL;
  if (pReader->pReadSnap->pMem != NULL) {
    d = tsdbGetTbDataFromMemTable(pReader->pReadSnap->pMem, pReader->suid, pBlockScanInfo->uid);
    if (d != NULL) {
      code = tsdbTbDataIterCreate(d, &startKey, backward, &pBlockScanInfo->iter.iter);
      if (code == TSDB_CODE_SUCCESS) {
        pBlockScanInfo->iter.hasVal = (tsdbTbDataIterGet(pBlockScanInfo->iter.iter) != NULL);

        tsdbDebug("%p uid:%" PRIu64 ", check data in mem from skey:%" PRId64 ", order:%d, ts range in buf:%" PRId64
                  "-%" PRId64 " %s",
                  pReader, pBlockScanInfo->uid, startKey.ts, pReader->order, d->minKey, d->maxKey, pReader->idStr);
      } else {
        // this branch handles the *mem* table (the message used to say "imem")
        tsdbError("%p uid:%" PRIu64 ", failed to create iterator for mem, code:%s, %s", pReader, pBlockScanInfo->uid,
                  tstrerror(code), pReader->idStr);
        return code;
      }
    }
  } else {
    tsdbDebug("%p uid:%" PRIu64 ", no data in mem, %s", pReader, pBlockScanInfo->uid, pReader->idStr);
  }

  STbData* di = NULL;
  if (pReader->pReadSnap->pIMem != NULL) {
    di = tsdbGetTbDataFromMemTable(pReader->pReadSnap->pIMem, pReader->suid, pBlockScanInfo->uid);
    if (di != NULL) {
      code = tsdbTbDataIterCreate(di, &startKey, backward, &pBlockScanInfo->iiter.iter);
      if (code == TSDB_CODE_SUCCESS) {
        pBlockScanInfo->iiter.hasVal = (tsdbTbDataIterGet(pBlockScanInfo->iiter.iter) != NULL);

        tsdbDebug("%p uid:%" PRIu64 ", check data in imem from skey:%" PRId64 ", order:%d, ts range in buf:%" PRId64
                  "-%" PRId64 " %s",
                  pReader, pBlockScanInfo->uid, startKey.ts, pReader->order, di->minKey, di->maxKey, pReader->idStr);
      } else {
        // this branch handles the *imem* table (the message used to say "mem")
        tsdbError("%p uid:%" PRIu64 ", failed to create iterator for imem, code:%s, %s", pReader, pBlockScanInfo->uid,
                  tstrerror(code), pReader->idStr);
        return code;
      }
    }
  } else {
    tsdbDebug("%p uid:%" PRIu64 ", no data in imem, %s", pReader, pBlockScanInfo->uid, pReader->idStr);
  }

  // Propagate a failure to build the delete skyline instead of silently dropping it. The iterators above already
  // exist, so the table is marked initialized either way; a retry must not create (and leak) a second pair.
  code = initDelSkylineIterator(pBlockScanInfo, pReader, d, di);
  pBlockScanInfo->iterInit = true;
  return code;
}

dengyihao's avatar
dengyihao 已提交
2229 2230
/*
 * Decide whether the file-block row under pDumpInfo->rowIndex may be returned for this table.
 *
 * A row is rejected when any of the following holds:
 *  - it belongs to another table (multi-table blocks carry a per-row uid array);
 *  - its version is outside pReader->verRange;
 *  - its timestamp is outside pReader->window;
 *  - hasBeenDropped() reports it as deleted. That call also advances pBlockScanInfo->fileDelIndex.
 */
static bool isValidFileBlockRow(SBlockData* pBlockData, SFileBlockDumpInfo* pDumpInfo,
                                STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader) {
  if (pBlockData->aUid != NULL) {
    uint64_t rowUid = pBlockData->aUid[pDumpInfo->rowIndex];
    if (rowUid != pBlockScanInfo->uid) {
      return false;
    }
  }

  int64_t rowVer = pBlockData->aVersion[pDumpInfo->rowIndex];
  bool    verVisible = (rowVer >= pReader->verRange.minVer) && (rowVer <= pReader->verRange.maxVer);
  if (!verVisible) {
    return false;
  }

  int64_t rowTs = pBlockData->aTSKEY[pDumpInfo->rowIndex];
  bool    tsInWindow = (rowTs >= pReader->window.skey) && (rowTs <= pReader->window.ekey);
  if (!tsInWindow) {
    return false;
  }

  TSDBKEY rowKey = {.ts = rowTs, .version = rowVer};
  bool    dropped = hasBeenDropped(pBlockScanInfo->delSkyline, &pBlockScanInfo->fileDelIndex, &rowKey, pReader->order,
                                   &pReader->verRange);
  return !dropped;
}

2259
/*
 * Point the last-block (stt) reader at the table described by pScanInfo and report whether it has rows.
 *
 * If the reader is already bound to this uid, nothing is reopened. Otherwise:
 *  - the previous merge tree is closed;
 *  - the table's mem iterators are initialized;
 *  - a new merge tree is opened whose window resumes just past pScanInfo->lastKey in scan order.
 * Returns false when opening the merge tree fails or no row is available.
 */
static bool initLastBlockReader(SLastBlockReader* pLBlockReader, STableBlockScanInfo* pScanInfo, STsdbReader* pReader) {
  bool alreadyBound = (pLBlockReader->uid == pScanInfo->uid);
  if (alreadyBound) {
    return hasDataInLastBlock(pLBlockReader);
  }

  // a non-zero uid means a merge tree from the previously visited table is still open
  if (pLBlockReader->uid != 0) {
    tMergeTreeClose(&pLBlockReader->mergeTree);
  }

  initMemDataIterator(pScanInfo, pReader);
  pLBlockReader->uid = pScanInfo->uid;

  // skip the keys that have already been returned for this table
  STimeWindow resumeWin = pLBlockReader->window;
  if (!ASCENDING_TRAVERSE(pLBlockReader->order)) {
    resumeWin.ekey = pScanInfo->lastKey + (-1);
  } else {
    resumeWin.skey = pScanInfo->lastKey + 1;
  }

  int32_t code = tMergeTreeOpen(&pLBlockReader->mergeTree, (pLBlockReader->order == TSDB_ORDER_DESC),
                                pReader->pFileReader, pReader->suid, pScanInfo->uid, &resumeWin,
                                &pLBlockReader->verRange, pLBlockReader->pInfo, false, pReader->idStr);
  return (code == TSDB_CODE_SUCCESS) ? nextRowFromLastBlocks(pLBlockReader, pScanInfo, &pReader->verRange) : false;
}

2290
/* Timestamp of the row the last-block merge tree is currently positioned on. */
static int64_t getCurrentKeyInLastBlock(SLastBlockReader* pLastBlockReader) {
  TSDBROW current = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
  int64_t ts = TSDBROW_TS(&current);
  return ts;
}

H
Hongze Cheng 已提交
2295
/* The last-block reader still has rows while its merge-tree iterator has not been released (non-NULL). */
static bool hasDataInLastBlock(SLastBlockReader* pLastBlockReader) {
  bool exhausted = (pLastBlockReader->mergeTree.pIter == NULL);
  return !exhausted;
}
2296 2297

bool hasDataInFileBlock(const SBlockData* pBlockData, const SFileBlockDumpInfo* pDumpInfo) {
H
Haojun Liao 已提交
2298 2299
  if ((pBlockData->nRow > 0) && (pBlockData->nRow != pDumpInfo->totalRows)) {
    return false; // this is an invalid result.
2300
  }
2301
  return pBlockData->nRow > 0 && (!pDumpInfo->allDumped);
2302
}
2303

2304 2305
/*
 * Emit the file-block row at the dump cursor (timestamp `key`) into the result block.
 *
 * When tryCopyDistinctRowFromFileBlock succeeds, the row is copied directly. Otherwise, every file-block row
 * sharing that timestamp is merged through an SRowMerger and the merged row is appended.
 *
 * Returns TSDB_CODE_SUCCESS or the error reported by the row merger. The merger is released on every path after
 * a successful init.
 */
int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pBlockScanInfo, int64_t key,
                              STsdbReader* pReader) {
  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;

  if (tryCopyDistinctRowFromFileBlock(pReader, pBlockData, key, pDumpInfo)) {
    pBlockScanInfo->lastKey = key;
    return TSDB_CODE_SUCCESS;
  }

  TSDBROW    fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
  STSRow*    pTSRow = NULL;
  SRowMerger merge = {0};

  int32_t code = tRowMergerInit(&merge, &fRow, pReader->pSchema);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);

  code = tRowMergerGetRow(&merge, &pTSRow);
  if (code != TSDB_CODE_SUCCESS) {
    // the merger was initialized above; release it here too, otherwise its buffers leak on this error path
    tRowMergerClear(&merge);
    return code;
  }

  doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo);

  taosMemoryFree(pTSRow);
  tRowMergerClear(&merge);
  return TSDB_CODE_SUCCESS;
}

H
Haojun Liao 已提交
2335 2336
/*
 * Produce one merged output row for a table from whichever sources currently hold data.
 *
 * Possible sources are mem, imem, the loaded file block and the last block. Dispatch:
 *  - both mem and imem have a valid row: full multi-level merge;
 *  - only imem has data: merge it with file block and last block;
 *  - only mem has data: merge it with file block and last block;
 *  - neither buffer has data: merge the file block with the last block.
 * `key` is INT64_MIN when the file block has no pending rows.
 */
static int32_t buildComposedDataBlockImpl(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo,
                                          SBlockData* pBlockData, SLastBlockReader* pLastBlockReader) {
  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;

  bool    fileRowPending = (pBlockData->nRow > 0) && !pDumpInfo->allDumped;
  int64_t key = fileRowPending ? pBlockData->aTSKEY[pDumpInfo->rowIndex] : INT64_MIN;

  // getValidMemRow may clear hasVal, so the flags are re-read below rather than cached
  TSDBROW* pRow =
      pBlockScanInfo->iter.hasVal ? getValidMemRow(&pBlockScanInfo->iter, pBlockScanInfo->delSkyline, pReader) : NULL;
  TSDBROW* piRow =
      pBlockScanInfo->iiter.hasVal ? getValidMemRow(&pBlockScanInfo->iiter, pBlockScanInfo->delSkyline, pReader) : NULL;

  if ((pRow != NULL) && (piRow != NULL)) {
    return doMergeMultiLevelRows(pReader, pBlockScanInfo, pBlockData, pLastBlockReader);
  }

  if (pBlockScanInfo->iiter.hasVal) {  // imem + file + last block
    return doMergeBufAndFileRows(pReader, pBlockScanInfo, piRow, &pBlockScanInfo->iiter, key, pLastBlockReader);
  }

  if (pBlockScanInfo->iter.hasVal) {  // mem + file + last block
    return doMergeBufAndFileRows(pReader, pBlockScanInfo, pRow, &pBlockScanInfo->iter, key, pLastBlockReader);
  }

  return mergeFileBlockAndLastBlock(pReader, pLastBlockReader, key, pBlockScanInfo, pBlockData);
}

H
Haojun Liao 已提交
2368 2369 2370 2371 2372 2373 2374 2375 2376 2377 2378 2379 2380 2381 2382 2383 2384 2385 2386 2387 2388 2389 2390 2391 2392 2393 2394 2395 2396 2397 2398 2399 2400 2401 2402 2403 2404 2405 2406 2407
/*
 * If the current file block overlaps the next block of the same table in scan order, load that neighbor and
 * make it the current block.
 *
 * On success *loadNeighbor tells whether a neighbor was loaded. Returns the error from doLoadFileBlockData, or
 * TSDB_CODE_SUCCESS.
 */
static int32_t loadNeighborIfOverlap(SFileDataBlockInfo* pBlockInfo, STableBlockScanInfo* pBlockScanInfo,
                                     STsdbReader* pReader, bool* loadNeighbor) {
  SBlockIndex nxtBIndex = {0};
  int32_t     nextIndex = -1;

  *loadNeighbor = false;

  SDataBlk* pBlock = getCurrentBlock(&pReader->status.blockIter);

  if (!getNeighborBlockOfSameTable(pBlockInfo, pBlockScanInfo, &nextIndex, pReader->order, &nxtBIndex)) {
    return TSDB_CODE_SUCCESS;  // no neighbor block at all
  }

  if (!overlapWithNeighborBlock(pBlock, &nxtBIndex, pReader->order)) {
    return TSDB_CODE_SUCCESS;  // neighbor exists but does not overlap
  }

  SReaderStatus*  pStatus = &pReader->status;
  SDataBlockIter* pBlockIter = &pStatus->blockIter;
  int32_t         step = ASCENDING_TRAVERSE(pReader->order) ? 1 : -1;

  // locate the neighbor in the scan list and move it to the active position
  SFileDataBlockInfo target = {.uid = pBlockInfo->uid, .tbBlockIdx = nextIndex};
  int32_t            neighborIndex = findFileBlockInfoIndex(pBlockIter, &target);
  setFileBlockActiveInBlockIter(pBlockIter, neighborIndex, step);

  // load it as the currently accessed file data block
  int32_t code = doLoadFileBlockData(pReader, pBlockIter, &pStatus->fileBlockData, pBlockInfo->uid);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  initBlockDumpInfo(pReader, pBlockIter);
  *loadNeighbor = true;
  return TSDB_CODE_SUCCESS;
}

2408 2409 2410 2411 2412 2413 2414 2415 2416 2417 2418 2419 2420
/*
 * Finalize the result block after rows were composed into it, and update the reader's statistics.
 *
 * Sets the owner uid (0 without scan info), marks the data as loaded and refreshes the ts window from the
 * primary-key slot. Flags the block as composed, counts it, and adds `el` (milliseconds) to the compose timer.
 */
static void updateComposedBlockInfo(STsdbReader* pReader, double el, STableBlockScanInfo* pBlockScanInfo) {
  SSDataBlock*    pResBlock = pReader->pResBlock;
  SDataBlockInfo* pInfo = &pResBlock->info;

  pInfo->id.uid = (pBlockScanInfo == NULL) ? 0 : pBlockScanInfo->uid;
  pInfo->dataLoad = 1;
  blockDataUpdateTsWindow(pResBlock, pReader->suppInfo.slotId[0]);

  setComposedBlockFlag(pReader, true);

  pReader->cost.composedBlocks++;
  pReader->cost.buildComposedBlockTime = pReader->cost.buildComposedBlockTime + el;
}

2421
/*
 * Build the result block for the current file data block by merging, row by row, with the table's buffered rows
 * and last-block rows.
 *
 * A "clean" block that fits into the result capacity is copied directly (in descending order only when the last
 * block has no pending rows). Otherwise qualified rows are composed until one of these happens:
 *  - the file block (and any overlapping neighbor) is exhausted;
 *  - the result block is full;
 *  - an error occurs.
 *
 * When there is no current file block, the table under pReader->status.pTableIter is used.
 * Returns TSDB_CODE_SUCCESS or the first error encountered. Statistics are updated on every exit path.
 */
static int32_t buildComposedDataBlock(STsdbReader* pReader) {
  int32_t code = TSDB_CODE_SUCCESS;

  SSDataBlock* pResBlock = pReader->pResBlock;

  SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(&pReader->status.blockIter);
  SLastBlockReader*   pLastBlockReader = pReader->status.fileIter.pLastBlockReader;

  bool    asc = ASCENDING_TRAVERSE(pReader->order);
  int64_t st = taosGetTimestampUs();
  int32_t step = asc ? 1 : -1;
  double  el = 0;

  STableBlockScanInfo* pBlockScanInfo = NULL;
  if (pBlockInfo != NULL) {
    void* p = taosHashGet(pReader->status.pTableMap, &pBlockInfo->uid, sizeof(pBlockInfo->uid));
    if (p == NULL) {
      code = TSDB_CODE_INVALID_PARA;
      tsdbError("failed to locate the uid:%" PRIu64 " in query table uid list, total tables:%d, %s", pBlockInfo->uid,
                taosHashGetSize(pReader->status.pTableMap), pReader->idStr);
      goto _end;
    }

    pBlockScanInfo = *(STableBlockScanInfo**)p;

    SDataBlk* pBlock = getCurrentBlock(&pReader->status.blockIter);
    TSDBKEY   keyInBuf = getCurrentKeyInBuf(pBlockScanInfo, pReader);

    // it is a clean block, load it directly
    if (isCleanFileDataBlock(pReader, pBlockInfo, pBlock, pBlockScanInfo, keyInBuf, pLastBlockReader) &&
        pBlock->nRow <= pReader->capacity) {
      // in desc order the block is copied directly only when no last-block rows are still pending
      if (asc || !hasDataInLastBlock(pLastBlockReader)) {
        copyBlockDataToSDataBlock(pReader, pBlockScanInfo);

        // record the last key value
        pBlockScanInfo->lastKey = asc ? pBlock->maxKey.ts : pBlock->minKey.ts;
        goto _end;
      }
    }
  } else {  // file blocks not exist
    if (pReader->status.pTableIter == NULL) {
      // nothing to compose for: avoid dereferencing a NULL table iterator
      code = TSDB_CODE_INVALID_PARA;
      tsdbError("no table available to build composed data block, %s", pReader->idStr);
      goto _end;
    }

    pBlockScanInfo = *pReader->status.pTableIter;
  }

  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
  SBlockData*         pBlockData = &pReader->status.fileBlockData;

  while (1) {
    bool hasBlockData = false;

    // find the first qualified row in data block
    while (pBlockData->nRow > 0) {
      if (isValidFileBlockRow(pBlockData, pDumpInfo, pBlockScanInfo, pReader)) {
        hasBlockData = true;
        break;
      }

      pDumpInfo->rowIndex += step;

      SDataBlk* pBlock = getCurrentBlock(&pReader->status.blockIter);
      if (pDumpInfo->rowIndex >= pBlock->nRow || pDumpInfo->rowIndex < 0) {
        pBlockInfo = getCurrentBlockInfo(&pReader->status.blockIter);  // NOTE: get the new block info

        // continue check for the next file block if the last ts in the current block
        // is overlapped with the next neighbor block
        bool loadNeighbor = false;
        code = loadNeighborIfOverlap(pBlockInfo, pBlockScanInfo, pReader, &loadNeighbor);
        if ((!loadNeighbor) || (code != 0)) {
          setBlockAllDumped(pDumpInfo, pBlock->maxKey.ts, pReader->order);
          break;
        }
      }
    }

    // no data in last block and block, no need to proceed.
    if (hasBlockData == false) {
      break;
    }

    // stop at the first merge failure instead of silently continuing with a half-built block
    code = buildComposedDataBlockImpl(pReader, pBlockScanInfo, pBlockData, pLastBlockReader);
    if (code != TSDB_CODE_SUCCESS) {
      goto _end;
    }

    // currently loaded file data block is consumed
    if ((pBlockData->nRow > 0) && (pDumpInfo->rowIndex >= pBlockData->nRow || pDumpInfo->rowIndex < 0)) {
      SDataBlk* pBlock = getCurrentBlock(&pReader->status.blockIter);
      setBlockAllDumped(pDumpInfo, pBlock->maxKey.ts, pReader->order);
      break;
    }

    if (pResBlock->info.rows >= pReader->capacity) {
      break;
    }
  }

_end:
  el = (taosGetTimestampUs() - st) / 1000.0;
  updateComposedBlockInfo(pReader, el, pBlockScanInfo);

  if (pResBlock->info.rows > 0) {
    tsdbDebug("%p uid:%" PRIu64 ", composed data block created, brange:%" PRIu64 "-%" PRIu64
              " rows:%d, elapsed time:%.2f ms %s",
              pReader, pResBlock->info.id.uid, pResBlock->info.window.skey, pResBlock->info.window.ekey,
              pResBlock->info.rows, el, pReader->idStr);
  }

  return code;
}

/*
 * Record how the reader's current result block was produced.
 *
 * updateComposedBlockInfo sets true for row-by-row composed blocks. doBuildDataBlock sets false when a whole file
 * block is returned as-is.
 */
void setComposedBlockFlag(STsdbReader* pReader, bool composed) {
  SReaderStatus* pStatus = &pReader->status;
  pStatus->composedDataBlock = composed;
}

2529 2530 2531 2532 2533 2534 2535 2536
/*
 * Starting cursor into a delete skyline for the given scan order.
 *
 * Ascending scans start at 0 and descending scans at the last element. A NULL skyline always yields 0.
 */
int32_t getInitialDelIndex(const SArray* pDelSkyline, int32_t order) {
  if (pDelSkyline != NULL && !ASCENDING_TRAVERSE(order)) {
    return taosArrayGetSize(pDelSkyline) - 1;
  }

  return 0;
}

dengyihao's avatar
dengyihao 已提交
2537 2538
/*
 * Build the delete skyline of one table and reset every delete-list cursor of its scan info.
 *
 * Delete records are collected from:
 *  - the del file (when pReader->pDelIdx has an entry for this table);
 *  - the mem table (pMemTbData->pHead list);
 *  - the imem table (piMemTbData->pHead list).
 * They are folded by tsdbBuildDeleteSkyline into pBlockScanInfo->delSkyline.
 *
 * The function is a no-op once delSkyline exists. On failure delSkyline is left NULL (never half-built), so a
 * later call can retry instead of being short-circuited by the guard above.
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_OUT_OF_MEMORY, or the error from reading/building the delete data.
 */
int32_t initDelSkylineIterator(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader, STbData* pMemTbData,
                               STbData* piMemTbData) {
  if (pBlockScanInfo->delSkyline != NULL) {
    return TSDB_CODE_SUCCESS;
  }

  int32_t code = TSDB_CODE_SUCCESS;
  SArray* pDelData = taosArrayInit(4, sizeof(SDelData));
  if (pDelData == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  SDelFile* pDelFile = pReader->pReadSnap->fs.pDelFile;
  if (pDelFile && taosArrayGetSize(pReader->pDelIdx) > 0) {
    SDelIdx  idx = {.suid = pReader->suid, .uid = pBlockScanInfo->uid};
    SDelIdx* pIdx = taosArraySearch(pReader->pDelIdx, &idx, tCmprDelIdx, TD_EQ);

    if (pIdx != NULL) {
      code = tsdbReadDelData(pReader->pDelFReader, pIdx, pDelData);
      if (code != TSDB_CODE_SUCCESS) {
        goto _err;
      }
    }
  }

  // delete records still kept in the mem table
  if (pMemTbData != NULL) {
    for (SDelData* p = pMemTbData->pHead; p != NULL; p = p->pNext) {
      if (taosArrayPush(pDelData, p) == NULL) {
        code = TSDB_CODE_OUT_OF_MEMORY;
        goto _err;
      }
    }
  }

  // delete records still kept in the imem table
  if (piMemTbData != NULL) {
    for (SDelData* p = piMemTbData->pHead; p != NULL; p = p->pNext) {
      if (taosArrayPush(pDelData, p) == NULL) {
        code = TSDB_CODE_OUT_OF_MEMORY;
        goto _err;
      }
    }
  }

  if (taosArrayGetSize(pDelData) > 0) {
    pBlockScanInfo->delSkyline = taosArrayInit(4, sizeof(TSDBKEY));
    if (pBlockScanInfo->delSkyline == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto _err;
    }

    code = tsdbBuildDeleteSkyline(pDelData, 0, (int32_t)(taosArrayGetSize(pDelData) - 1), pBlockScanInfo->delSkyline);
    if (code != TSDB_CODE_SUCCESS) {
      // drop the partial skyline; otherwise the early-return guard would treat it as complete forever
      taosArrayDestroy(pBlockScanInfo->delSkyline);
      pBlockScanInfo->delSkyline = NULL;
      goto _err;
    }
  }

  taosArrayDestroy(pDelData);

  int32_t index = getInitialDelIndex(pBlockScanInfo->delSkyline, pReader->order);

  pBlockScanInfo->iter.index = index;
  pBlockScanInfo->iiter.index = index;
  pBlockScanInfo->fileDelIndex = index;
  pBlockScanInfo->lastBlockDelIndex = index;

  return TSDB_CODE_SUCCESS;

_err:
  taosArrayDestroy(pDelData);
  return code;
}

H
Haojun Liao 已提交
2596
/*
 * Key of the next visible buffered row of a table.
 *
 * When both mem and imem have a valid row, the one that comes first in scan order wins:
 *  - ascending: the smaller ts wins, and a tie returns the mem key;
 *  - descending: the larger ts wins, and a tie returns the imem key.
 * Without any buffered row, {TSKEY_MIN} (asc) or {TSKEY_MAX} (desc) is returned, with version 0.
 */
TSDBKEY getCurrentKeyInBuf(STableBlockScanInfo* pScanInfo, STsdbReader* pReader) {
  bool    asc = ASCENDING_TRAVERSE(pReader->order);
  TSDBKEY noKey = {.ts = asc ? TSKEY_MIN : TSKEY_MAX};

  // mem is probed before imem; keep this order because getValidMemRow works on the iterators
  TSDBROW* pMemRow = getValidMemRow(&pScanInfo->iter, pScanInfo->delSkyline, pReader);
  TSDBROW* pIMemRow = getValidMemRow(&pScanInfo->iiter, pScanInfo->delSkyline, pReader);

  if (pMemRow == NULL) {
    if (pIMemRow == NULL) {
      return noKey;
    }
    return TSDBROW_KEY(pIMemRow);
  }

  TSDBKEY memKey = TSDBROW_KEY(pMemRow);
  if (pIMemRow == NULL) {
    return memKey;
  }

  TSDBKEY imemKey = TSDBROW_KEY(pIMemRow);
  if (asc) {
    return (imemKey.ts < memKey.ts) ? imemKey : memKey;
  }

  return (memKey.ts <= imemKey.ts) ? imemKey : memKey;
}

2630
/*
 * Advance the file-set iterator to the next file set that holds data for the queried tables.
 *
 * A file set qualifies when it has data blocks or stt (last) files for those tables; pBlockNum receives the
 * counts found. Afterwards the del-file reader and its index are opened lazily when the read snapshot has a del
 * file.
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_OUT_OF_MEMORY, or the error from loading the block index, the blocks, or
 * the del file. On a del-index read failure pReader->pDelIdx is reset to NULL.
 */
static int32_t moveToNextFile(STsdbReader* pReader, SBlockNumber* pBlockNum) {
  SReaderStatus* pStatus = &pReader->status;
  int32_t        code = TSDB_CODE_SUCCESS;

  pBlockNum->numOfBlocks = 0;
  pBlockNum->numOfLastFiles = 0;

  size_t  numOfTables = taosHashGetSize(pReader->status.pTableMap);
  SArray* pIndexList = taosArrayInit(numOfTables, sizeof(SBlockIdx));
  if (pIndexList == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  while (1) {
    bool hasNext = filesetIteratorNext(&pStatus->fileIter, pReader);
    if (!hasNext) {  // no data files on disk
      break;
    }

    taosArrayClear(pIndexList);
    code = doLoadBlockIndex(pReader, pReader->pFileReader, pIndexList);
    if (code != TSDB_CODE_SUCCESS) {
      taosArrayDestroy(pIndexList);
      return code;
    }

    if (taosArrayGetSize(pIndexList) > 0 || pReader->pFileReader->pSet->nSttF > 0) {
      code = doLoadFileBlock(pReader, pIndexList, pBlockNum);
      if (code != TSDB_CODE_SUCCESS) {
        taosArrayDestroy(pIndexList);
        return code;
      }

      if (pBlockNum->numOfBlocks + pBlockNum->numOfLastFiles > 0) {
        break;
      }
    }

    // no blocks in current file, try next files
  }

  taosArrayDestroy(pIndexList);

  if (pReader->pReadSnap != NULL) {
    SDelFile* pDelFile = pReader->pReadSnap->fs.pDelFile;
    if (pReader->pDelFReader == NULL && pDelFile != NULL) {
      code = tsdbDelFReaderOpen(&pReader->pDelFReader, pDelFile, pReader->pTsdb);
      if (code != TSDB_CODE_SUCCESS) {
        return code;
      }

      pReader->pDelIdx = taosArrayInit(4, sizeof(SDelIdx));
      if (pReader->pDelIdx == NULL) {
        return TSDB_CODE_OUT_OF_MEMORY;
      }

      code = tsdbReadDelIdx(pReader->pDelFReader, pReader->pDelIdx);
      if (code != TSDB_CODE_SUCCESS) {
        // clear the field so later cleanup does not destroy the same array a second time
        taosArrayDestroy(pReader->pDelIdx);
        pReader->pDelIdx = NULL;
        return code;
      }
    }
  }

  return TSDB_CODE_SUCCESS;
}

2694
/*
 * qsort-style comparator for uint64_t uids.
 *
 * Returns -1, 0 or 1. The two-comparison form avoids the overflow a subtraction of unsigned 64-bit values would
 * risk.
 */
static int32_t uidComparFunc(const void* p1, const void* p2) {
  const uint64_t lhs = *(const uint64_t*)p1;
  const uint64_t rhs = *(const uint64_t*)p2;

  return (int32_t)(lhs > rhs) - (int32_t)(lhs < rhs);
}
2703

2704
/*
 * Fill pOrderCheckInfo->tableUidList with the uid of every table in pStatus->pTableMap, sorted ascending.
 *
 * The list must already hold at least taosHashGetSize(pStatus->pTableMap) entries. `order` is currently unused:
 * the uids are always sorted ascending.
 */
static void extractOrderedTableUidList(SUidOrderCheckInfo* pOrderCheckInfo, SReaderStatus* pStatus, int32_t order) {
  int32_t numOfTables = taosHashGetSize(pStatus->pTableMap);
  int32_t pos = 0;

  (void)order;

  for (void* p = taosHashIterate(pStatus->pTableMap, NULL); p != NULL; p = taosHashIterate(pStatus->pTableMap, p)) {
    STableBlockScanInfo* pScanInfo = *(STableBlockScanInfo**)p;
    pOrderCheckInfo->tableUidList[pos] = pScanInfo->uid;
    pos += 1;
  }

  taosSort(pOrderCheckInfo->tableUidList, numOfTables, sizeof(uint64_t), uidComparFunc);
}

2718 2719 2720 2721 2722 2723 2724 2725 2726 2727 2728 2729
/*
 * Reset, for every table in the table map, the last-block delete cursor (lastBlockDelIndex) to the starting
 * position of its delete skyline for the given scan order.
 */
static void resetScanBlockLastBlockDelIndex(SReaderStatus* pStatus, int32_t order) {
  for (void* p = taosHashIterate(pStatus->pTableMap, NULL); p != NULL; p = taosHashIterate(pStatus->pTableMap, p)) {
    STableBlockScanInfo* pScanInfo = *(STableBlockScanInfo**)p;
    pScanInfo->lastBlockDelIndex = getInitialDelIndex(pScanInfo->delSkyline, order);
  }
}

2730 2731 2732
/*
 * Prepare the uid-ordered table walk used when reading last blocks sequentially.
 *
 * First call: allocate and fill the sorted uid list, then point pStatus->pTableIter at the smallest uid.
 * Later calls, when pTableIter is NULL: rewind to the first uid. If that uid is no longer in the table map, the
 * map has changed and the list is reallocated and rebuilt.
 * pTableIter may still be NULL on return if the lookup fails.
 *
 * NOTE(review): the list is only rebuilt when its first uid disappears. If the table map grew while the first
 * uid survived, the list may be shorter than taosHashGetSize(pTableMap), which moveToNextTable uses as its bound
 * — worth confirming.
 *
 * Returns TSDB_CODE_SUCCESS or TSDB_CODE_OUT_OF_MEMORY.
 */
static int32_t initOrderCheckInfo(SUidOrderCheckInfo* pOrderCheckInfo, STsdbReader* pReader) {
  SReaderStatus* pStatus = &pReader->status;

  int32_t total = taosHashGetSize(pStatus->pTableMap);
  if (total == 0) {
    return TSDB_CODE_SUCCESS;
  }

  if (pOrderCheckInfo->tableUidList == NULL) {
    pOrderCheckInfo->currentIndex = 0;
    pOrderCheckInfo->tableUidList = taosMemoryMalloc(total * sizeof(uint64_t));
    if (pOrderCheckInfo->tableUidList == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }

    extractOrderedTableUidList(pOrderCheckInfo, pStatus, pReader->order);

    uint64_t firstUid = pOrderCheckInfo->tableUidList[0];
    pStatus->pTableIter = taosHashGet(pStatus->pTableMap, &firstUid, sizeof(firstUid));
    return TSDB_CODE_SUCCESS;
  }

  // a live table iterator means the walk is still in progress: nothing to do
  if (pStatus->pTableIter != NULL) {
    return TSDB_CODE_SUCCESS;
  }

  pOrderCheckInfo->currentIndex = 0;

  uint64_t firstUid = pOrderCheckInfo->tableUidList[pOrderCheckInfo->currentIndex];
  pStatus->pTableIter = taosHashGet(pStatus->pTableMap, &firstUid, sizeof(firstUid));
  if (pStatus->pTableIter != NULL) {
    return TSDB_CODE_SUCCESS;
  }

  // the tableMap has already updated: rebuild the ordered uid list
  void* pNewList = taosMemoryRealloc(pOrderCheckInfo->tableUidList, total * sizeof(uint64_t));
  if (pNewList == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  pOrderCheckInfo->tableUidList = pNewList;
  extractOrderedTableUidList(pOrderCheckInfo, pStatus, pReader->order);

  firstUid = pOrderCheckInfo->tableUidList[0];
  pStatus->pTableIter = taosHashGet(pStatus->pTableMap, &firstUid, sizeof(firstUid));
  return TSDB_CODE_SUCCESS;
}

2773
/*
 * Step the uid-ordered walk to the next table.
 *
 * Returns true and sets pStatus->pTableIter when the next uid is found in the table map. Returns false (leaving
 * pTableIter NULL) when the walk is past the end or the uid lookup fails.
 */
static bool moveToNextTable(SUidOrderCheckInfo* pOrderedCheckInfo, SReaderStatus* pStatus) {
  pOrderedCheckInfo->currentIndex++;

  bool pastEnd = (pOrderedCheckInfo->currentIndex >= taosHashGetSize(pStatus->pTableMap));
  if (pastEnd) {
    pStatus->pTableIter = NULL;
    return false;
  }

  uint64_t nextUid = pOrderedCheckInfo->tableUidList[pOrderedCheckInfo->currentIndex];
  pStatus->pTableIter = taosHashGet(pStatus->pTableMap, &nextUid, sizeof(nextUid));
  return !(pStatus->pTableIter == NULL);
}

2785
/*
 * Build the next result block purely from last (stt) blocks, walking the tables in ascending uid order.
 *
 * Tables without last-block rows are skipped. Returns as soon as a non-empty result block has been produced, or
 * when all tables are exhausted.
 * Returns TSDB_CODE_SUCCESS, or the first error from initOrderCheckInfo or the row merge.
 */
static int32_t doLoadLastBlockSequentially(STsdbReader* pReader) {
  SReaderStatus*    pStatus = &pReader->status;
  SLastBlockReader* pLastBlockReader = pStatus->fileIter.pLastBlockReader;

  SUidOrderCheckInfo* pOrderedCheckInfo = &pStatus->uidCheckInfo;
  int32_t             code = initOrderCheckInfo(pOrderedCheckInfo, pReader);
  if (code != TSDB_CODE_SUCCESS || (taosHashGetSize(pStatus->pTableMap) == 0)) {
    return code;
  }

  // initOrderCheckInfo leaves pTableIter NULL when the uid lookup fails; never dereference it in that case
  if (pStatus->pTableIter == NULL) {
    return TSDB_CODE_SUCCESS;
  }

  SSDataBlock* pResBlock = pReader->pResBlock;

  while (1) {
    // load the last data block of current table
    STableBlockScanInfo* pScanInfo = *(STableBlockScanInfo**)pStatus->pTableIter;

    bool hasVal = initLastBlockReader(pLastBlockReader, pScanInfo, pReader);
    if (!hasVal) {
      bool hasNexTable = moveToNextTable(pOrderedCheckInfo, pStatus);
      if (!hasNexTable) {
        return TSDB_CODE_SUCCESS;
      }

      continue;
    }

    int64_t st = taosGetTimestampUs();
    while (hasDataInLastBlock(pLastBlockReader)) {
      // report merge failures instead of silently dropping them
      code = buildComposedDataBlockImpl(pReader, pScanInfo, &pReader->status.fileBlockData, pLastBlockReader);
      if (code != TSDB_CODE_SUCCESS) {
        return code;
      }

      if (pResBlock->info.rows >= pReader->capacity) {
        break;
      }
    }

    double el = (taosGetTimestampUs() - st) / 1000.0;
    updateComposedBlockInfo(pReader, el, pScanInfo);

    if (pResBlock->info.rows > 0) {
      tsdbDebug("%p uid:%" PRIu64 ", composed data block created, brange:%" PRIu64 "-%" PRIu64
                " rows:%d, elapsed time:%.2f ms %s",
                pReader, pResBlock->info.id.uid, pResBlock->info.window.skey, pResBlock->info.window.ekey,
                pResBlock->info.rows, el, pReader->idStr);
      return TSDB_CODE_SUCCESS;
    }

    // current table is exhausted, let's try next table
    bool hasNexTable = moveToNextTable(pOrderedCheckInfo, pStatus);
    if (!hasNexTable) {
      return TSDB_CODE_SUCCESS;
    }
  }
}

2845
/*
 * Produce the next result block for the file data block the block iterator currently points at.
 *
 *  - If the block must be read, load it and compose it row by row with the other sources.
 *  - If buffered (mem/imem) rows fall before the block in scan order, emit only those rows, up to the block
 *    boundary.
 *  - Otherwise, in descending order with pending last-block rows, emit the last-block rows first.
 *  - Otherwise return the whole block as-is, without loading its column data.
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_INVALID_PARA when the block's table is unknown, or the first load/merge
 * error.
 */
static int32_t doBuildDataBlock(STsdbReader* pReader) {
  int32_t code = TSDB_CODE_SUCCESS;

  SReaderStatus*      pStatus = &pReader->status;
  SDataBlockIter*     pBlockIter = &pStatus->blockIter;
  SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(pBlockIter);
  SLastBlockReader*   pLastBlockReader = pReader->status.fileIter.pLastBlockReader;

  ASSERT(pBlockInfo != NULL);

  // taosHashGet returns NULL for an unknown uid: test the lookup result before dereferencing it
  void*                p = taosHashGet(pReader->status.pTableMap, &pBlockInfo->uid, sizeof(pBlockInfo->uid));
  STableBlockScanInfo* pScanInfo = (p != NULL) ? *(STableBlockScanInfo**)p : NULL;
  if (pScanInfo == NULL) {
    tsdbError("failed to get table scan-info, %s", pReader->idStr);
    code = TSDB_CODE_INVALID_PARA;
    return code;
  }

  SDataBlk* pBlock = getCurrentBlock(pBlockIter);

  initLastBlockReader(pLastBlockReader, pScanInfo, pReader);
  TSDBKEY keyInBuf = getCurrentKeyInBuf(pScanInfo, pReader);

  if (fileBlockShouldLoad(pReader, pBlockInfo, pBlock, pScanInfo, keyInBuf, pLastBlockReader)) {
    code = doLoadFileBlockData(pReader, pBlockIter, &pStatus->fileBlockData, pScanInfo->uid);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    // build composed data block
    code = buildComposedDataBlock(pReader);
  } else if (bufferDataInFileBlockGap(pReader->order, keyInBuf, pBlock)) {
    // data in memory that are earlier than current file block
    // rows in buffer should be less than the file block in asc, greater than file block in desc
    int64_t endKey = (ASCENDING_TRAVERSE(pReader->order)) ? pBlock->minKey.ts : pBlock->maxKey.ts;
    code = buildDataBlockFromBuf(pReader, pScanInfo, endKey);
  } else if (hasDataInLastBlock(pLastBlockReader) && !ASCENDING_TRAVERSE(pReader->order)) {
    // only return the rows in last block
    int64_t tsLast = getCurrentKeyInLastBlock(pLastBlockReader);
    ASSERT(tsLast >= pBlock->maxKey.ts);

    SBlockData* pBData = &pReader->status.fileBlockData;
    tBlockDataReset(pBData);

    SSDataBlock* pResBlock = pReader->pResBlock;
    tsdbDebug("load data in last block firstly, due to desc scan data, %s", pReader->idStr);

    int64_t st = taosGetTimestampUs();

    // drain the last block until it is exhausted or the result block is full
    while (hasDataInLastBlock(pLastBlockReader)) {
      code = buildComposedDataBlockImpl(pReader, pScanInfo, &pReader->status.fileBlockData, pLastBlockReader);
      if (code != TSDB_CODE_SUCCESS) {
        return code;
      }

      if (pResBlock->info.rows >= pReader->capacity) {
        break;
      }
    }

    double el = (taosGetTimestampUs() - st) / 1000.0;
    updateComposedBlockInfo(pReader, el, pScanInfo);

    if (pResBlock->info.rows > 0) {
      tsdbDebug("%p uid:%" PRIu64 ", composed data block created, brange:%" PRIu64 "-%" PRIu64
                " rows:%d, elapsed time:%.2f ms %s",
                pReader, pResBlock->info.id.uid, pResBlock->info.window.skey, pResBlock->info.window.ekey,
                pResBlock->info.rows, el, pReader->idStr);
    }
  } else {  // whole block is required, return it directly
    SDataBlockInfo* pInfo = &pReader->pResBlock->info;
    pInfo->rows = pBlock->nRow;
    pInfo->id.uid = pScanInfo->uid;
    pInfo->window = (STimeWindow){.skey = pBlock->minKey.ts, .ekey = pBlock->maxKey.ts};
    setComposedBlockFlag(pReader, false);
    setBlockAllDumped(&pStatus->fBlockDumpInfo, pBlock->maxKey.ts, pReader->order);

    // update the last key for the corresponding table
    pScanInfo->lastKey = ASCENDING_TRAVERSE(pReader->order) ? pInfo->window.ekey : pInfo->window.skey;
    tsdbDebug("%p uid:%" PRIu64 " clean file block retrieved from file, global index:%d, "
              "table index:%d, rows:%d, brange:%" PRId64 "-%" PRId64 ", %s",
              pReader, pScanInfo->uid, pBlockIter->index, pBlockInfo->tbBlockIdx, pBlock->nRow, pBlock->minKey.ts,
              pBlock->maxKey.ts, pReader->idStr);
  }

  return code;
}

H
Haojun Liao 已提交
2946
// Fill pReader->pResBlock purely from the in-memory buffers, visiting the tables of
// pReader->status.pTableMap one after another.
//
// The table iterator lives in pReader->status.pTableIter, so a later call resumes with the
// table that produced the last block. Returns TSDB_CODE_SUCCESS as soon as a table yields
// rows or when every table has been visited; any error from buildDataBlockFromBuf is
// returned unchanged.
static int32_t buildBlockFromBufferSequentially(STsdbReader* pReader) {
  SReaderStatus* pStatus = &pReader->status;

  // the first call starts the iteration over the table map
  if (pStatus->pTableIter == NULL) {
    pStatus->pTableIter = taosHashIterate(pStatus->pTableMap, NULL);
  }

  while (pStatus->pTableIter != NULL) {
    STableBlockScanInfo* pScanInfo = *(STableBlockScanInfo**)pStatus->pTableIter;
    initMemDataIterator(pScanInfo, pReader);

    // no upper/lower bound: consume the whole buffer in scan direction
    int64_t endKey = ASCENDING_TRAVERSE(pReader->order) ? INT64_MAX : INT64_MIN;
    int32_t code = buildDataBlockFromBuf(pReader, pScanInfo, endKey);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    if (pReader->pResBlock->info.rows > 0) {
      return TSDB_CODE_SUCCESS;
    }

    // this table has nothing (more) in memory, move to the next one
    pStatus->pTableIter = taosHashIterate(pStatus->pTableMap, pStatus->pTableIter);
  }

  return TSDB_CODE_SUCCESS;
}

2978
// set the correct start position in case of the first/last file block, according to the query time window
2979
void initBlockDumpInfo(STsdbReader* pReader, SDataBlockIter* pBlockIter) {
H
Hongze Cheng 已提交
2980
  SDataBlk* pBlock = getCurrentBlock(pBlockIter);
2981

2982 2983 2984
  SReaderStatus* pStatus = &pReader->status;

  SFileBlockDumpInfo* pDumpInfo = &pStatus->fBlockDumpInfo;
2985 2986 2987

  pDumpInfo->totalRows = pBlock->nRow;
  pDumpInfo->allDumped = false;
2988
  pDumpInfo->rowIndex = ASCENDING_TRAVERSE(pReader->order) ? 0 : pBlock->nRow - 1;
2989 2990
}

2991
// Move to the next file set and prepare the block iterator for it.
//
// - When no file set with data is left, pReader->status.loadFromFile is set to false and
//   TSDB_CODE_SUCCESS is returned.
// - When the file set has data blocks, the block iterator is built over them.
// - When it has only last-block files, the block data and the iterator are reset.
// In the last two cases the dump cursor is positioned on the current block.
//
// Errors from moveToNextFile/initBlockIterator are returned. The dump cursor is then left
// untouched, because the iterator may not be usable.
static int32_t initForFirstBlockInFile(STsdbReader* pReader, SDataBlockIter* pBlockIter) {
  SBlockNumber num = {0};

  int32_t code = moveToNextFile(pReader, &num);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  // all data files are consumed, try data in buffer
  if (num.numOfBlocks + num.numOfLastFiles == 0) {
    pReader->status.loadFromFile = false;
    return code;
  }

  // initialize the block iterator for a new fileset
  if (num.numOfBlocks > 0) {
    code = initBlockIterator(pReader, pBlockIter, num.numOfBlocks);
    if (code != TSDB_CODE_SUCCESS) {
      // do not position the dump cursor on an iterator that failed to initialize
      return code;
    }
  } else {  // no block data, only last block exists
    tBlockDataReset(&pReader->status.fileBlockData);
    resetDataBlockIterator(pBlockIter, pReader->order);
  }

  // set the correct start position according to the query time window
  initBlockDumpInfo(pReader, pBlockIter);
  return TSDB_CODE_SUCCESS;
}

3018
// True when the current file block is only partially consumed: it is not fully dumped yet, and
// the cursor has already moved off its starting row (row 0 when ascending, the last row when
// descending).
static bool fileBlockPartiallyRead(SFileBlockDumpInfo* pDumpInfo, bool asc) {
  if (pDumpInfo->allDumped) {
    return false;
  }

  if (asc) {
    return pDumpInfo->rowIndex > 0;
  }

  return pDumpInfo->rowIndex < (pDumpInfo->totalRows - 1);
}

3023
// Build the next result block from the data files: data blocks first, then the last-block files
// of each file set.
//
// Returns TSDB_CODE_SUCCESS with rows in pReader->pResBlock, or with
// pReader->status.loadFromFile == false once every file set has been consumed. Otherwise it
// returns the first error encountered.
//
// The `_begin` label is entered whenever the current file set has no (remaining) data blocks;
// from there its last-block files are scanned before moving to the next file set.
static int32_t buildBlockFromFiles(STsdbReader* pReader) {
  int32_t code = TSDB_CODE_SUCCESS;
  bool    asc = ASCENDING_TRAVERSE(pReader->order);

  SDataBlockIter* pBlockIter = &pReader->status.blockIter;

  if (pBlockIter->numOfBlocks == 0) {
  _begin:
    code = doLoadLastBlockSequentially(pReader);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    if (pReader->pResBlock->info.rows > 0) {
      return TSDB_CODE_SUCCESS;
    }

    // all data blocks are checked in this last block file, now let's try the next file
    if (pReader->status.pTableIter == NULL) {
      code = initForFirstBlockInFile(pReader, pBlockIter);

      // error happens or all the data files are completely checked
      if ((code != TSDB_CODE_SUCCESS) || (pReader->status.loadFromFile == false)) {
        return code;
      }

      // this file set has no data blocks: scan its last block files (if any) instead
      if (pBlockIter->numOfBlocks == 0) {
        resetScanBlockLastBlockDelIndex(&pReader->status, pReader->order);
        goto _begin;
      }
    }

    code = doBuildDataBlock(pReader);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    if (pReader->pResBlock->info.rows > 0) {
      return TSDB_CODE_SUCCESS;
    }
  }

  while (1) {
    SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;

    if (fileBlockPartiallyRead(pDumpInfo, asc)) {  // file data block is partially loaded
      code = buildComposedDataBlock(pReader);
    } else {
      // the current block is exhausted, try the next file block
      if (pDumpInfo->allDumped) {
        // try next data block in current file
        bool hasNext = blockIteratorNext(&pReader->status.blockIter, pReader->idStr);
        if (hasNext) {  // check for the next block in the block accessed order list
          initBlockDumpInfo(pReader, pBlockIter);
        } else {
          if (pReader->status.pCurrentFileset->nSttF > 0) {
            // data blocks of the current file set are exhausted; scan its last block files next
            tBlockDataReset(&pReader->status.fileBlockData);
            resetDataBlockIterator(pBlockIter, pReader->order);
            resetScanBlockLastBlockDelIndex(&pReader->status, pReader->order);
            goto _begin;
          } else {
            code = initForFirstBlockInFile(pReader, pBlockIter);

            // error happens or all the data files are completely checked
            if ((code != TSDB_CODE_SUCCESS) || (pReader->status.loadFromFile == false)) {
              return code;
            }

            // this file does not have blocks, let's start check the last block file
            if (pBlockIter->numOfBlocks == 0) {
              resetScanBlockLastBlockDelIndex(&pReader->status, pReader->order);
              goto _begin;
            }
          }
        }
      }

      code = doBuildDataBlock(pReader);
    }

    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    if (pReader->pResBlock->info.rows > 0) {
      return TSDB_CODE_SUCCESS;
    }
  }
}
H
refact  
Hongze Cheng 已提交
3114

3115 3116
// Choose the tsdb instance a query on pVnode should read, based on the query start key winSKey.
//
// For an rsma vnode the retention levels are walked in order. The first level whose keep
// window still reaches winSKey is selected, stopping at the last configured level. The window
// is `now - keep`, widened by tsQueryRsmaTolerance. The selected level is written to *pLevel
// and logged with idStr. For any other vnode the default tsdb is returned and *pLevel is not
// written.
static STsdb* getTsdbByRetentions(SVnode* pVnode, TSKEY winSKey, SRetention* retentions, const char* idStr,
                                  int8_t* pLevel) {
  if (VND_IS_RSMA(pVnode)) {
    int8_t  level = 0;
    int8_t  precision = pVnode->config.tsdbCfg.precision;
    int64_t now = taosGetTimestamp(precision);

    // Convert the tolerance to the vnode precision. It is presumably in milliseconds, given the
    // 1 / 1000 / 1000000 factors. The product is computed in 64 bits because `long` is only
    // 32 bits wide on LLP64 targets, where `tolerance * 1000000L` could overflow.
    int64_t unit = (precision == TSDB_TIME_PRECISION_MILLI)   ? 1LL
                   : (precision == TSDB_TIME_PRECISION_MICRO) ? 1000LL
                                                              : 1000000LL;
    int64_t offset = (int64_t)tsQueryRsmaTolerance * unit;

    for (int8_t i = 0; i < TSDB_RETENTION_MAX; ++i) {
      SRetention* pRetention = retentions + level;
      if (pRetention->keep <= 0) {
        // level not configured: fall back to the previous (last configured) one
        if (level > 0) {
          --level;
        }
        break;
      }
      if ((now - pRetention->keep) <= (winSKey + offset)) {
        break;
      }
      ++level;
    }

    const char* str = (idStr != NULL) ? idStr : "";

    if (level == TSDB_RETENTION_L0) {
      *pLevel = TSDB_RETENTION_L0;
      tsdbDebug("vgId:%d, rsma level %d is selected to query %s", TD_VID(pVnode), TSDB_RETENTION_L0, str);
      return VND_RSMA0(pVnode);
    } else if (level == TSDB_RETENTION_L1) {
      *pLevel = TSDB_RETENTION_L1;
      tsdbDebug("vgId:%d, rsma level %d is selected to query %s", TD_VID(pVnode), TSDB_RETENTION_L1, str);
      return VND_RSMA1(pVnode);
    } else {
      *pLevel = TSDB_RETENTION_L2;
      tsdbDebug("vgId:%d, rsma level %d is selected to query %s", TD_VID(pVnode), TSDB_RETENTION_L2, str);
      return VND_RSMA2(pVnode);
    }
  }

  return VND_TSDB(pVnode);
}

H
Haojun Liao 已提交
3159
// Compute the [minVer, maxVer] data-version range a query may read.
//
// startVersion == -1 maps to 0. endVersion == -1 maps to the vnode's applied version, and an
// explicit endVersion is capped at that applied version. `level` is not used here.
SVersionRange getQueryVerRange(SVnode* pVnode, SQueryTableDataCond* pCond, int8_t level) {
  SVersionRange range = {0};

  range.minVer = (pCond->startVersion == -1) ? 0 : pCond->startVersion;

  if (pCond->endVersion == -1) {
    // no end version given by the user: read up to the newest applied version
    range.maxVer = pVnode->state.applied;
  } else if (pCond->endVersion > pVnode->state.applied) {
    range.maxVer = pVnode->state.applied;
  } else {
    range.maxVer = pCond->endVersion;
  }

  return range;
}

3173
// Check whether the row identified by pKey (timestamp + version) is covered by a deletion in
// pDelList.
//
// pDelList holds TSDBKEY entries in timestamp order. The row counts as deleted when it lies
// between two neighbouring entries and the version stored in the lower-timestamp entry of that
// pair is not older than the row's version. A pair whose lower entry has version 0 (while the
// upper one is > 0) is not a consecutive deletion range and is skipped.
// NOTE(review): this reads like a delete "skyline" encoding; confirm against its producer.
//
// *index is a cursor into pDelList, moved in the scan direction given by `order`. Lookups with
// monotonically moving keys therefore do not rescan the list. In ascending order the deletion
// version is additionally checked against pVerRange.
//
// Returns false for a NULL or empty list.
bool hasBeenDropped(const SArray* pDelList, int32_t* index, TSDBKEY* pKey, int32_t order, SVersionRange* pVerRange) {
  if (pDelList == NULL) {
    return false;
  }

  size_t  num = taosArrayGetSize(pDelList);
  bool    asc = ASCENDING_TRAVERSE(order);
  int32_t step = asc ? 1 : -1;

  // An empty list deletes nothing. Without this check `num - 1` wraps around and every branch
  // below would read an element of an empty array.
  if (num == 0) {
    return false;
  }

  if (asc) {
    if (*index >= num - 1) {
      TSDBKEY* last = taosArrayGetLast(pDelList);
      ASSERT(pKey->ts >= last->ts);

      if (pKey->ts > last->ts) {
        return false;
      } else if (pKey->ts == last->ts) {
        // a single entry has no predecessor (`num - 2` would wrap), so it cannot close a range
        if (num < 2) {
          return false;
        }

        TSDBKEY* prev = taosArrayGet(pDelList, num - 2);
        return (prev->version >= pKey->version && prev->version <= pVerRange->maxVer && prev->version >= pVerRange->minVer);
      }
    } else {
      TSDBKEY* pCurrent = taosArrayGet(pDelList, *index);
      TSDBKEY* pNext = taosArrayGet(pDelList, (*index) + 1);

      if (pKey->ts < pCurrent->ts) {
        return false;
      }

      if (pCurrent->ts <= pKey->ts && pNext->ts >= pKey->ts && pCurrent->version >= pKey->version &&
          pVerRange->maxVer >= pCurrent->version) {
        return true;
      }

      while (pNext->ts <= pKey->ts && (*index) < num - 1) {
        (*index) += 1;

        if ((*index) < num - 1) {
          pCurrent = taosArrayGet(pDelList, *index);
          pNext = taosArrayGet(pDelList, (*index) + 1);

          // it is not a consecutive deletion range, ignore it
          if (pCurrent->version == 0 && pNext->version > 0) {
            continue;
          }

          if (pCurrent->ts <= pKey->ts && pNext->ts >= pKey->ts && pCurrent->version >= pKey->version &&
              pVerRange->maxVer >= pCurrent->version) {
            return true;
          }
        }
      }

      return false;
    }
  } else {
    if (*index <= 0) {
      TSDBKEY* pFirst = taosArrayGet(pDelList, 0);

      if (pKey->ts < pFirst->ts) {
        return false;
      } else if (pKey->ts == pFirst->ts) {
        return pFirst->version >= pKey->version;
      } else {
        ASSERT(0);
      }
    } else {
      TSDBKEY* pCurrent = taosArrayGet(pDelList, *index);
      TSDBKEY* pPrev = taosArrayGet(pDelList, (*index) - 1);

      if (pKey->ts > pCurrent->ts) {
        return false;
      }

      if (pPrev->ts <= pKey->ts && pCurrent->ts >= pKey->ts && pPrev->version >= pKey->version) {
        return true;
      }

      while (pPrev->ts >= pKey->ts && (*index) > 1) {
        (*index) += step;

        if ((*index) >= 1) {
          pCurrent = taosArrayGet(pDelList, *index);
          pPrev = taosArrayGet(pDelList, (*index) - 1);

          // it is not a consecutive deletion range, ignore it
          if (pCurrent->version > 0 && pPrev->version == 0) {
            continue;
          }

          if (pPrev->ts <= pKey->ts && pCurrent->ts >= pKey->ts && pPrev->version >= pKey->version) {
            return true;
          }
        }
      }

      return false;
    }
  }

  return false;
}

3275
// Return the first row, starting at the iterator's current position, that this reader may see.
//
// A row is visible when its version lies inside pReader->verRange and hasBeenDropped() does not
// report it deleted in pDelList. Invisible rows are stepped over. When the iterator is exhausted
// or reaches a row outside pReader->window, pIter->hasVal ends up false and NULL is returned.
TSDBROW* getValidMemRow(SIterInfo* pIter, const SArray* pDelList, STsdbReader* pReader) {
  while (pIter->hasVal) {
    TSDBROW* pCandidate = tsdbTbDataIterGet(pIter->iter);
    TSDBKEY  key = TSDBROW_KEY(pCandidate);

    if (outOfTimeWindow(key.ts, &pReader->window)) {
      // rows beyond the query window are never returned: stop the iterator here
      pIter->hasVal = false;
      break;
    }

    bool inVerRange = (key.version >= pReader->verRange.minVer) && (key.version <= pReader->verRange.maxVer);
    if (inVerRange && !hasBeenDropped(pDelList, &pIter->index, &key, pReader->order, &pReader->verRange)) {
      return pCandidate;
    }

    pIter->hasVal = tsdbTbDataIterNext(pIter->iter);
  }

  return NULL;
}
H
Hongze Cheng 已提交
3313

3314 3315
// Merge into pMerger every following visible in-memory row whose timestamp equals ts.
//
// The iterator is advanced before anything is merged, so the row at its current position is
// skipped (presumably the caller has merged it already). Merging stops at the first visible
// row with another timestamp, or when the buffer runs out. Returns terrno if the schema of a
// row cannot be obtained, TSDB_CODE_SUCCESS otherwise.
int32_t doMergeRowsInBuf(SIterInfo* pIter, uint64_t uid, int64_t ts, SArray* pDelList, SRowMerger* pMerger,
                         STsdbReader* pReader) {
  for (;;) {
    pIter->hasVal = tsdbTbDataIterNext(pIter->iter);
    if (!pIter->hasVal) {
      return TSDB_CODE_SUCCESS;
    }

    // skip invisible rows; NULL means nothing visible is left
    TSDBROW* pNext = getValidMemRow(pIter, pDelList, pReader);
    if (pNext == NULL || TSDBROW_KEY(pNext).ts != ts) {
      return TSDB_CODE_SUCCESS;
    }

    STSchema* pRowSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pNext), pReader, uid);
    if (pRowSchema == NULL) {
      return terrno;
    }

    tRowMergerAdd(pMerger, pNext, pRowSchema);
  }
}

3345
static int32_t doMergeRowsInFileBlockImpl(SBlockData* pBlockData, int32_t rowIndex, int64_t key, SRowMerger* pMerger,
3346
                                          SVersionRange* pVerRange, int32_t step) {
3347
  while (rowIndex < pBlockData->nRow && rowIndex >= 0 && pBlockData->aTSKEY[rowIndex] == key) {
3348
    if (pBlockData->aVersion[rowIndex] > pVerRange->maxVer || pBlockData->aVersion[rowIndex] < pVerRange->minVer) {
3349
      rowIndex += step;
3350 3351 3352 3353 3354 3355 3356 3357 3358 3359 3360 3361 3362 3363 3364 3365
      continue;
    }

    TSDBROW fRow = tsdbRowFromBlockData(pBlockData, rowIndex);
    tRowMerge(pMerger, &fRow);
    rowIndex += step;
  }

  return rowIndex;
}

// Result of checking a neighbour file block while merging rows that share one timestamp.
typedef enum {
  CHECK_FILEBLOCK_CONT = 1,  // the neighbour block was consumed to its end; check the next one too
  CHECK_FILEBLOCK_QUIT = 2,  // stop looking at further neighbour blocks
} CHECK_FILEBLOCK_STATE;

H
Hongze Cheng 已提交
3366
static int32_t checkForNeighborFileBlock(STsdbReader* pReader, STableBlockScanInfo* pScanInfo, SDataBlk* pBlock,
3367 3368
                                         SFileDataBlockInfo* pFBlock, SRowMerger* pMerger, int64_t key,
                                         CHECK_FILEBLOCK_STATE* state) {
3369
  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
3370
  SBlockData*         pBlockData = &pReader->status.fileBlockData;
3371

3372
  *state = CHECK_FILEBLOCK_QUIT;
3373
  int32_t step = ASCENDING_TRAVERSE(pReader->order) ? 1 : -1;
3374

H
Haojun Liao 已提交
3375 3376
  bool loadNeighbor = true;
  int32_t code = loadNeighborIfOverlap(pFBlock, pScanInfo, pReader, &loadNeighbor);
3377

H
Haojun Liao 已提交
3378
  if (loadNeighbor && (code == TSDB_CODE_SUCCESS)) {
3379 3380
    pDumpInfo->rowIndex =
        doMergeRowsInFileBlockImpl(pBlockData, pDumpInfo->rowIndex, key, pMerger, &pReader->verRange, step);
H
Haojun Liao 已提交
3381
    if (pDumpInfo->rowIndex >= pDumpInfo->totalRows) {
3382 3383 3384 3385
      *state = CHECK_FILEBLOCK_CONT;
    }
  }

H
Haojun Liao 已提交
3386
  return code;
3387 3388
}

3389 3390
// Merge into pMerger all file-block rows that share the timestamp of the row under the dump cursor.
//
// The row under the cursor itself is stepped over (presumably merged by the caller already).
// First, the following rows of the current block (in scan direction) with the same timestamp
// are merged. If that runs off the end of the block, neighbour blocks are examined through
// checkForNeighborFileBlock() for as long as they keep ending with the same key.
//
// Returns the first error reported by checkForNeighborFileBlock(), TSDB_CODE_SUCCESS otherwise.
int32_t doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pScanInfo, STsdbReader* pReader,
                                SRowMerger* pMerger) {
  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;

  bool    asc = ASCENDING_TRAVERSE(pReader->order);
  int64_t key = pBlockData->aTSKEY[pDumpInfo->rowIndex];
  int32_t step = asc ? 1 : -1;

  pDumpInfo->rowIndex += step;
  if ((pDumpInfo->rowIndex <= pBlockData->nRow - 1 && asc) || (pDumpInfo->rowIndex >= 0 && !asc)) {
    pDumpInfo->rowIndex =
        doMergeRowsInFileBlockImpl(pBlockData, pDumpInfo->rowIndex, key, pMerger, &pReader->verRange, step);
  }

  // all rows are consumed, let's try next file block
  if ((pDumpInfo->rowIndex >= pBlockData->nRow && asc) || (pDumpInfo->rowIndex < 0 && !asc)) {
    while (1) {
      SFileDataBlockInfo* pFileBlockInfo = getCurrentBlockInfo(&pReader->status.blockIter);
      if (pFileBlockInfo == NULL) {
        // no current data block (e.g. an empty block iterator): there is no neighbour to check
        break;
      }

      SDataBlk* pCurrentBlock = getCurrentBlock(&pReader->status.blockIter);

      CHECK_FILEBLOCK_STATE st = CHECK_FILEBLOCK_QUIT;
      int32_t code = checkForNeighborFileBlock(pReader, pScanInfo, pCurrentBlock, pFileBlockInfo, pMerger, key, &st);
      if (code != TSDB_CODE_SUCCESS) {
        return code;
      }

      if (st == CHECK_FILEBLOCK_QUIT) {
        break;
      }
    }
  }

  return TSDB_CODE_SUCCESS;
}

H
Hongze Cheng 已提交
3420
int32_t doMergeRowsInLastBlock(SLastBlockReader* pLastBlockReader, STableBlockScanInfo* pScanInfo, int64_t ts,
3421 3422
                               SRowMerger* pMerger, SVersionRange* pVerRange) {
  while (nextRowFromLastBlocks(pLastBlockReader, pScanInfo, pVerRange)) {
3423 3424
    int64_t next1 = getCurrentKeyInLastBlock(pLastBlockReader);
    if (next1 == ts) {
3425
      TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
3426 3427 3428 3429 3430 3431 3432 3433 3434
      tRowMerge(pMerger, &fRow1);
    } else {
      break;
    }
  }

  return TSDB_CODE_SUCCESS;
}

3435 3436
// Produce the output row for in-memory row pRow, merging in the following visible rows of the
// same iterator that carry the same timestamp.
//
// If the next visible row has a different timestamp, or there is none, *pTSRow is pRow's own
// row and *freeTSRow is false. Otherwise the rows are merged into a new row returned in
// *pTSRow, and *freeTSRow is true: the caller has to release it.
//
// Returns terrno when a row schema cannot be obtained, or the error of the merger. The merger
// is released on every path, including errors.
int32_t doMergeMemTableMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter, SArray* pDelList, STSRow** pTSRow,
                                 STsdbReader* pReader, bool* freeTSRow) {
  TSDBROW* pNextRow = NULL;
  TSDBROW  current = *pRow;

  {  // if the next valid row has a different ts (or does not exist), return the current row directly
    pIter->hasVal = tsdbTbDataIterNext(pIter->iter);
    if (pIter->hasVal) {  // has next point in mem/imem
      pNextRow = getValidMemRow(pIter, pDelList, pReader);
    }

    if (pNextRow == NULL || current.pTSRow->ts != pNextRow->pTSRow->ts) {
      *pTSRow = current.pTSRow;
      *freeTSRow = false;
      return TSDB_CODE_SUCCESS;
    }
  }

  SRowMerger merge = {0};
  STSchema*  pTSchema1 = NULL;
  int32_t    code = TSDB_CODE_SUCCESS;

  // get the correct schema for data in memory
  terrno = 0;
  STSchema* pTSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(&current), pReader, uid);
  if (pTSchema == NULL) {
    return terrno;  // the merger holds nothing yet
  }

  if (pReader->pSchema == NULL) {
    pReader->pSchema = pTSchema;
  }

  code = tRowMergerInit2(&merge, pReader->pSchema, &current, pTSchema);
  if (code != TSDB_CODE_SUCCESS) {
    goto _end;
  }

  pTSchema1 = doGetSchemaForTSRow(TSDBROW_SVERSION(pNextRow), pReader, uid);
  if (pTSchema1 == NULL) {
    code = terrno;
    goto _end;
  }

  tRowMergerAdd(&merge, pNextRow, pTSchema1);

  code = doMergeRowsInBuf(pIter, uid, current.pTSRow->ts, pDelList, &merge, pReader);
  if (code != TSDB_CODE_SUCCESS) {
    goto _end;
  }

  code = tRowMergerGetRow(&merge, pTSRow);
  if (code != TSDB_CODE_SUCCESS) {
    goto _end;
  }

  // *pTSRow was built by the merger, so the caller owns it
  *freeTSRow = true;

_end:
  // released on success and on every error path; assumes tRowMergerClear accepts a merger whose
  // initialisation failed (merge starts zero-initialised)
  tRowMergerClear(&merge);
  return code;
}

3503
int32_t doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader,
H
Hongze Cheng 已提交
3504
                           STSRow** pTSRow) {
H
Haojun Liao 已提交
3505 3506
  SRowMerger merge = {0};

3507 3508 3509
  TSDBKEY k = TSDBROW_KEY(pRow);
  TSDBKEY ik = TSDBROW_KEY(piRow);

3510
  if (ASCENDING_TRAVERSE(pReader->order)) {  // ascending order imem --> mem
H
Haojun Liao 已提交
3511
    STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid);
3512

H
Haojun Liao 已提交
3513 3514 3515 3516 3517 3518 3519 3520 3521 3522
    int32_t code = tRowMergerInit(&merge, piRow, pSchema);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    code = doMergeRowsInBuf(&pBlockScanInfo->iiter, pBlockScanInfo->uid, ik.ts, pBlockScanInfo->delSkyline, &merge,
                            pReader);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }
3523

3524
    tRowMerge(&merge, pRow);
H
Haojun Liao 已提交
3525 3526 3527 3528 3529 3530
    code =
        doMergeRowsInBuf(&pBlockScanInfo->iter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge, pReader);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

3531
  } else {
H
Haojun Liao 已提交
3532
    STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid);
3533

H
Haojun Liao 已提交
3534
    int32_t code = tRowMergerInit(&merge, pRow, pSchema);
3535
    if (code != TSDB_CODE_SUCCESS || merge.pTSchema == NULL) {
H
Haojun Liao 已提交
3536 3537 3538 3539 3540 3541 3542 3543
      return code;
    }

    code =
        doMergeRowsInBuf(&pBlockScanInfo->iter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge, pReader);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }
3544 3545

    tRowMerge(&merge, piRow);
H
Haojun Liao 已提交
3546 3547 3548 3549 3550
    code = doMergeRowsInBuf(&pBlockScanInfo->iiter, pBlockScanInfo->uid, ik.ts, pBlockScanInfo->delSkyline, &merge,
                            pReader);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }
3551
  }
3552

3553
  int32_t code = tRowMergerGetRow(&merge, pTSRow);
H
Haojun Liao 已提交
3554 3555
  tRowMergerClear(&merge);

3556
  return code;
3557 3558
}

3559 3560
// Fetch the next visible row of one table from its mem (iter) and imem (iiter) buffers.
//
// Rows at or beyond endKey in the scan direction are ignored. When both buffers still have a
// row, the row that comes first in scan order is used. If the timestamps are equal, the two
// rows are merged by doMergeMemIMemRows and *freeTSRow is set to true. Otherwise
// doMergeMemTableMultiRows decides *pTSRow and *freeTSRow. *pTSRow is left untouched when
// neither buffer has a usable row.
int32_t tsdbGetNextRowInMem(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader, STSRow** pTSRow, int64_t endKey,
                            bool* freeTSRow) {
  SArray*  pDelList = pBlockScanInfo->delSkyline;
  uint64_t uid = pBlockScanInfo->uid;
  bool     asc = ASCENDING_TRAVERSE(pReader->order);

  TSDBROW* pRow = getValidMemRow(&pBlockScanInfo->iter, pDelList, pReader);
  TSDBROW* piRow = getValidMemRow(&pBlockScanInfo->iiter, pDelList, pReader);

  // rows at or past endKey (in scan direction) are not part of this request
  if (pBlockScanInfo->iter.hasVal) {
    int64_t ts = TSDBROW_KEY(pRow).ts;
    if (asc ? (ts >= endKey) : (ts <= endKey)) {
      pRow = NULL;
    }
  }

  if (pBlockScanInfo->iiter.hasVal) {
    int64_t its = TSDBROW_KEY(piRow).ts;
    if (asc ? (its >= endKey) : (its <= endKey)) {
      piRow = NULL;
    }
  }

  bool hasMemRow = pBlockScanInfo->iter.hasVal && pRow != NULL;
  bool hasIMemRow = pBlockScanInfo->iiter.hasVal && piRow != NULL;

  if (hasMemRow && hasIMemRow) {
    TSDBKEY k = TSDBROW_KEY(pRow);
    TSDBKEY ik = TSDBROW_KEY(piRow);

    if (ik.ts == k.ts) {
      *freeTSRow = true;
      return doMergeMemIMemRows(pRow, piRow, pBlockScanInfo, pReader, pTSRow);
    }

    // the timestamps differ: take the row that comes first in scan order
    bool iMemFirst = asc ? (ik.ts < k.ts) : (ik.ts > k.ts);
    if (iMemFirst) {
      return doMergeMemTableMultiRows(piRow, uid, &pBlockScanInfo->iiter, pDelList, pTSRow, pReader, freeTSRow);
    }
    return doMergeMemTableMultiRows(pRow, uid, &pBlockScanInfo->iter, pDelList, pTSRow, pReader, freeTSRow);
  }

  if (hasMemRow) {
    return doMergeMemTableMultiRows(pRow, uid, &pBlockScanInfo->iter, pDelList, pTSRow, pReader, freeTSRow);
  }

  if (hasIMemRow) {
    return doMergeMemTableMultiRows(piRow, uid, &pBlockScanInfo->iiter, pDelList, pTSRow, pReader, freeTSRow);
  }

  return TSDB_CODE_SUCCESS;
}

H
Hongze Cheng 已提交
3616 3617
// Append the in-memory row pTSRow as a new row of pBlock and record its timestamp as
// pScanInfo->lastKey.
//
// Output columns follow pReader->suppInfo. The primary timestamp column is taken from
// pTSRow->ts. A column present in the row's schema is copied from the row; a column missing
// from the schema (e.g. added after the row was written) is set to NULL.
//
// Returns terrno, and appends nothing, when the schema of the row's version cannot be
// obtained. Returns TSDB_CODE_SUCCESS otherwise.
int32_t doAppendRowFromTSRow(SSDataBlock* pBlock, STsdbReader* pReader, STSRow* pTSRow,
                             STableBlockScanInfo* pScanInfo) {
  int32_t outputRowIndex = pBlock->info.rows;
  int64_t uid = pScanInfo->uid;

  SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo;
  STSchema*           pSchema = doGetSchemaForTSRow(pTSRow->sver, pReader, uid);
  if (pSchema == NULL) {
    // without the row's schema its columns cannot be decoded
    return terrno;
  }

  SColVal colVal = {0};
  int32_t i = 0, j = 0;

  if (i < pSupInfo->numOfCols && pSupInfo->colId[i] == PRIMARYKEY_TIMESTAMP_COL_ID) {
    SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, pSupInfo->slotId[i]);
    ((int64_t*)pColData->pData)[outputRowIndex] = pTSRow->ts;
    i += 1;
  }

  // walk the output columns and the schema columns side by side; the loop assumes both lists are
  // ordered by colId
  while (i < pSupInfo->numOfCols && j < pSchema->numOfCols) {
    col_id_t colId = pSupInfo->colId[i];

    if (colId == pSchema->columns[j].colId) {
      SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, pSupInfo->slotId[i]);

      tTSRowGetVal(pTSRow, pSchema, j, &colVal);
      doCopyColVal(pColInfoData, outputRowIndex, i, &colVal, pSupInfo);
      i += 1;
      j += 1;
    } else if (colId < pSchema->columns[j].colId) {
      SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, pSupInfo->slotId[i]);

      colDataAppendNULL(pColInfoData, outputRowIndex);
      i += 1;
    } else if (colId > pSchema->columns[j].colId) {
      j += 1;
    }
  }

  // set null value since current column does not exist in the "pSchema"
  while (i < pSupInfo->numOfCols) {
    SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, pSupInfo->slotId[i]);
    colDataAppendNULL(pColInfoData, outputRowIndex);
    i += 1;
  }

  pBlock->info.dataLoad = 1;
  pBlock->info.rows += 1;
  pScanInfo->lastKey = pTSRow->ts;
  return TSDB_CODE_SUCCESS;
}

H
Hongze Cheng 已提交
3666 3667
// Append row rowIndex of file block pBlockData as a new row of pResBlock.
//
// Output columns follow pReader->suppInfo. The primary timestamp is taken from
// pBlockData->aTSKEY. Columns found in the block are copied; output columns that the block
// does not contain are filled with NULL. Always returns TSDB_CODE_SUCCESS.
int32_t doAppendRowFromFileBlock(SSDataBlock* pResBlock, STsdbReader* pReader, SBlockData* pBlockData,
                                 int32_t rowIndex) {
  int32_t i = 0, j = 0;
  int32_t outputRowIndex = pResBlock->info.rows;

  SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo;
  int32_t             numOfInputCols = pBlockData->nColData;
  int32_t             numOfOutputCols = pSupInfo->numOfCols;

  if (i < numOfOutputCols && pSupInfo->colId[i] == PRIMARYKEY_TIMESTAMP_COL_ID) {
    SColumnInfoData* pColData = taosArrayGet(pResBlock->pDataBlock, pSupInfo->slotId[i]);
    ((int64_t*)pColData->pData)[outputRowIndex] = pBlockData->aTSKEY[rowIndex];
    i += 1;
  }

  SColVal cv = {0};

  // walk output columns and block columns side by side; the loop assumes both are ordered by colId
  while (i < numOfOutputCols && j < numOfInputCols) {
    SColData* pData = tBlockDataGetColDataByIdx(pBlockData, j);
    if (pData->cid < pSupInfo->colId[i]) {
      j += 1;
      continue;
    }

    SColumnInfoData* pCol = TARRAY_GET_ELEM(pResBlock->pDataBlock, pSupInfo->slotId[i]);
    if (pData->cid == pSupInfo->colId[i]) {
      tColDataGetValue(pData, rowIndex, &cv);
      doCopyColVal(pCol, outputRowIndex, i, &cv, pSupInfo);
      j += 1;
    } else {
      // pData->cid > pSupInfo->colId[i]: the requested column does not exist in this file
      // block, so fill it with NULL. The comparison uses the same colId source as the two
      // checks above, so no output column can be skipped without being written.
      colDataAppendNULL(pCol, outputRowIndex);
    }

    i += 1;
  }

  while (i < numOfOutputCols) {
    SColumnInfoData* pCol = taosArrayGet(pResBlock->pDataBlock, pSupInfo->slotId[i]);
    colDataAppendNULL(pCol, outputRowIndex);
    i += 1;
  }

  pResBlock->info.dataLoad = 1;
  pResBlock->info.rows += 1;
  return TSDB_CODE_SUCCESS;
}

3713 3714
// Append rows of one table from its mem/imem buffers to pReader->pResBlock.
//
// Rows come, already merged per timestamp, from tsdbGetNextRowInMem(). Appending stops when no
// row before endKey is left, both buffers are exhausted, or the block holds `capacity` rows.
// Errors from fetching or appending a row are returned to the caller instead of being dropped.
int32_t buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, int64_t endKey, int32_t capacity,
                                  STsdbReader* pReader) {
  SSDataBlock* pBlock = pReader->pResBlock;

  do {
    STSRow* pTSRow = NULL;
    bool    freeTSRow = false;

    int32_t code = tsdbGetNextRowInMem(pBlockScanInfo, pReader, &pTSRow, endKey, &freeTSRow);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    if (pTSRow == NULL) {
      break;
    }

    code = doAppendRowFromTSRow(pBlock, pReader, pTSRow, pBlockScanInfo);

    // a merged row is owned by us regardless of whether appending succeeded
    if (freeTSRow) {
      taosMemoryFree(pTSRow);
    }

    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    // no data in buffer, return immediately
    if (!(pBlockScanInfo->iter.hasVal || pBlockScanInfo->iiter.hasVal)) {
      break;
    }

    if (pBlock->info.rows >= capacity) {
      break;
    }
  } while (1);

  return TSDB_CODE_SUCCESS;
}
H
Hongze Cheng 已提交
3743

3744 3745
// TODO refactor: with createDataBlockScanInfo
/*
 * Replace the tables scanned by pReader with the `num` entries of pTableList (an array of
 * STableKeyInfo). Every existing scan info is cleared, the table map is emptied, and the
 * first `num` slots of pReader->blockInfoBuf are reused, keyed by the new uids.
 */
int32_t tsdbSetTableList(STsdbReader* pReader, const void* pTableList, int32_t num) {
  int32_t existing = taosHashGetSize(pReader->status.pTableMap);

  for (STableBlockScanInfo** pp = taosHashIterate(pReader->status.pTableMap, NULL); pp != NULL;
       pp = taosHashIterate(pReader->status.pTableMap, pp)) {
    clearBlockScanInfo(*pp);
  }

  // todo handle the case where size is less than the value of num
  ASSERT(existing >= num);

  taosHashClear(pReader->status.pTableMap);

  const STableKeyInfo* pKeys = (const STableKeyInfo*)pTableList;
  for (int32_t k = 0; k < num; ++k) {
    STableBlockScanInfo* pInfo = getPosInBlockInfoBuf(&pReader->blockInfoBuf, k);
    pInfo->uid = pKeys[k].uid;
    taosHashPut(pReader->status.pTableMap, &pInfo->uid, sizeof(uint64_t), &pInfo, POINTER_BYTES);
  }

  return TDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
3768 3769 3770 3771 3772 3773
/* Return the index handle obtained from metaGetIdx(pMeta), or NULL when pMeta is NULL. */
void* tsdbGetIdx(SMeta* pMeta) { return (pMeta != NULL) ? metaGetIdx(pMeta) : NULL; }
dengyihao's avatar
dengyihao 已提交
3774

dengyihao's avatar
dengyihao 已提交
3775 3776 3777 3778 3779 3780
/* Return the inverted-index handle obtained from metaGetIvtIdx(pMeta), or NULL when pMeta is NULL. */
void* tsdbGetIvtIdx(SMeta* pMeta) { return (pMeta != NULL) ? metaGetIvtIdx(pMeta) : NULL; }
L
Liu Jicong 已提交
3781

H
Hongze Cheng 已提交
3782
/* Upper bound of the version range (verRange.maxVer) this reader was created with. */
uint64_t getReaderMaxVersion(STsdbReader* pReader) {
  const uint64_t maxVer = pReader->verRange.maxVer;
  return maxVer;
}
3783

3784 3785 3786 3787 3788 3789 3790 3791 3792 3793 3794 3795 3796 3797 3798
/*
 * Position pReader at the start of its scan. The file-set iterator is set up over the read
 * snapshot's data file sets and the block iterator is reset for pReader->order. If there are
 * data files, the first file block is prepared. Otherwise loadFromFile is cleared so that
 * later reads use the in-memory buffers only.
 */
static int32_t doOpenReaderImpl(STsdbReader* pReader) {
  SReaderStatus*  pStatus = &pReader->status;
  SDataBlockIter* pBlockIter = &pStatus->blockIter;

  initFilesetIterator(&pStatus->fileIter, pReader->pReadSnap->fs.aDFileSet, pReader);
  resetDataBlockIterator(pBlockIter, pReader->order);

  if (pStatus->fileIter.numOfFiles != 0) {
    return initForFirstBlockInFile(pReader, pBlockIter);
  }

  // no data in files, let's try buffer in memory
  pStatus->loadFromFile = false;
  return TSDB_CODE_SUCCESS;
}

H
refact  
Hongze Cheng 已提交
3799
// ====================================== EXPOSED APIs ======================================
3800
int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, void* pTableList, int32_t numOfTables,
H
Haojun Liao 已提交
3801
                       SSDataBlock* pResBlock, STsdbReader** ppReader, const char* idstr) {
3802 3803 3804 3805 3806 3807
  STimeWindow window = pCond->twindows;
  if (pCond->type == TIMEWINDOW_RANGE_EXTERNAL) {
    pCond->twindows.skey += 1;
    pCond->twindows.ekey -= 1;
  }

H
Haojun Liao 已提交
3808 3809 3810 3811 3812 3813 3814 3815
  int32_t capacity = 0;
  if (pResBlock == NULL) {
    capacity = 4096;
  } else {
    capacity = pResBlock->info.capacity;
  }

  int32_t code = tsdbReaderCreate(pVnode, pCond, ppReader, capacity, pResBlock, idstr);
3816
  if (code != TSDB_CODE_SUCCESS) {
H
Haojun Liao 已提交
3817 3818
    goto _err;
  }
H
Hongze Cheng 已提交
3819

3820
  // check for query time window
H
Haojun Liao 已提交
3821
  STsdbReader* pReader = *ppReader;
3822
  if (isEmptyQueryTimeWindow(&pReader->window) && pCond->type == TIMEWINDOW_RANGE_CONTAINED) {
H
Haojun Liao 已提交
3823 3824 3825
    tsdbDebug("%p query window not overlaps with the data set, no result returned, %s", pReader, pReader->idStr);
    return TSDB_CODE_SUCCESS;
  }
H
Hongze Cheng 已提交
3826

3827 3828
  if (pCond->type == TIMEWINDOW_RANGE_EXTERNAL) {
    // update the SQueryTableDataCond to create inner reader
3829
    int32_t order = pCond->order;
3830
    if (order == TSDB_ORDER_ASC) {
3831
      pCond->twindows.ekey = window.skey;
3832 3833 3834
      pCond->twindows.skey = INT64_MIN;
      pCond->order = TSDB_ORDER_DESC;
    } else {
3835
      pCond->twindows.skey = window.ekey;
3836 3837 3838 3839
      pCond->twindows.ekey = INT64_MAX;
      pCond->order = TSDB_ORDER_ASC;
    }

3840
    // here we only need one more row, so the capacity is set to be ONE.
H
Haojun Liao 已提交
3841
    code = tsdbReaderCreate(pVnode, pCond, &pReader->innerReader[0], 1, pResBlock, idstr);
3842 3843 3844 3845 3846
    if (code != TSDB_CODE_SUCCESS) {
      goto _err;
    }

    if (order == TSDB_ORDER_ASC) {
3847
      pCond->twindows.skey = window.ekey;
3848
      pCond->twindows.ekey = INT64_MAX;
3849
    } else {
3850
      pCond->twindows.skey = INT64_MIN;
3851
      pCond->twindows.ekey = window.ekey;
3852
    }
3853 3854
    pCond->order = order;

H
Haojun Liao 已提交
3855
    code = tsdbReaderCreate(pVnode, pCond, &pReader->innerReader[1], 1, pResBlock, idstr);
3856 3857 3858 3859 3860
    if (code != TSDB_CODE_SUCCESS) {
      goto _err;
    }
  }

H
Haojun Liao 已提交
3861
  // NOTE: the endVersion in pCond is the data version not schema version, so pCond->endVersion is not correct here.
3862 3863
  //  no valid error code set in metaGetTbTSchema, so let's set the error code here.
  //  we should proceed in case of tmq processing.
3864
  if (pCond->suid != 0) {
3865
    pReader->pSchema = metaGetTbTSchema(pReader->pTsdb->pVnode->pMeta, pReader->suid, -1, 1);
H
Haojun Liao 已提交
3866
    if (pReader->pSchema == NULL) {
H
Haojun Liao 已提交
3867
      tsdbError("failed to get table schema, suid:%" PRIu64 ", ver:-1, %s", pReader->suid, pReader->idStr);
H
Haojun Liao 已提交
3868
    }
3869 3870
  } else if (numOfTables > 0) {
    STableKeyInfo* pKey = pTableList;
3871
    pReader->pSchema = metaGetTbTSchema(pReader->pTsdb->pVnode->pMeta, pKey->uid, -1, 1);
H
Haojun Liao 已提交
3872
    if (pReader->pSchema == NULL) {
H
Haojun Liao 已提交
3873
      tsdbError("failed to get table schema, uid:%" PRIu64 ", ver:-1, %s", pKey->uid, pReader->idStr);
H
Haojun Liao 已提交
3874
    }
3875 3876
  }

3877
  if (pReader->pSchema != NULL) {
H
Haojun Liao 已提交
3878 3879 3880 3881
    code = updateBlockSMAInfo(pReader->pSchema, &pReader->suppInfo);
    if (code != TSDB_CODE_SUCCESS) {
      goto _err;
    }
3882
  }
3883

3884
  STsdbReader* p = (pReader->innerReader[0] != NULL) ? pReader->innerReader[0] : pReader;
H
Haojun Liao 已提交
3885
  pReader->status.pTableMap = createDataBlockScanInfo(p, &pReader->blockInfoBuf, pTableList, numOfTables);
H
Haojun Liao 已提交
3886 3887
  if (pReader->status.pTableMap == NULL) {
    *ppReader = NULL;
S
Shengliang Guan 已提交
3888
    code = TSDB_CODE_OUT_OF_MEMORY;
H
Haojun Liao 已提交
3889 3890
    goto _err;
  }
H
Hongze Cheng 已提交
3891

3892
  if (numOfTables > 0) {
H
Haojun Liao 已提交
3893
    code = tsdbTakeReadSnap(pReader->pTsdb, &pReader->pReadSnap, pReader->idStr);
3894 3895 3896
    if (code != TSDB_CODE_SUCCESS) {
      goto _err;
    }
H
Hongze Cheng 已提交
3897

3898
    if (pReader->type == TIMEWINDOW_RANGE_CONTAINED) {
H
Haojun Liao 已提交
3899 3900
      code = doOpenReaderImpl(pReader);
      if (code != TSDB_CODE_SUCCESS) {
K
kailixu 已提交
3901
        goto _err;
3902
      }
3903
    } else {
H
Haojun Liao 已提交
3904 3905
      STsdbReader* pPrevReader = pReader->innerReader[0];
      STsdbReader* pNextReader = pReader->innerReader[1];
3906

H
Haojun Liao 已提交
3907 3908 3909 3910 3911 3912 3913 3914 3915 3916 3917 3918
      // we need only one row
      pPrevReader->capacity = 1;
      pPrevReader->status.pTableMap = pReader->status.pTableMap;
      pPrevReader->pSchema = pReader->pSchema;
      pPrevReader->pMemSchema = pReader->pMemSchema;
      pPrevReader->pReadSnap = pReader->pReadSnap;

      pNextReader->capacity = 1;
      pNextReader->status.pTableMap = pReader->status.pTableMap;
      pNextReader->pSchema = pReader->pSchema;
      pNextReader->pMemSchema = pReader->pMemSchema;
      pNextReader->pReadSnap = pReader->pReadSnap;
3919

H
Haojun Liao 已提交
3920
      code = doOpenReaderImpl(pPrevReader);
3921
      if (code != TSDB_CODE_SUCCESS) {
K
kailixu 已提交
3922
        goto _err;
3923
      }
3924 3925 3926
    }
  }

3927
  tsdbDebug("%p total numOfTable:%d in this query %s", pReader, numOfTables, pReader->idStr);
H
Hongze Cheng 已提交
3928
  return code;
H
Hongze Cheng 已提交
3929

3930
_err:
H
Haojun Liao 已提交
3931
  tsdbError("failed to create data reader, code:%s %s", tstrerror(code), idstr);
3932
  tsdbReaderClose(pReader);
H
Hongze Cheng 已提交
3933
  return code;
3934
}
H
refact  
Hongze Cheng 已提交
3935 3936

/*
 * Release every resource held by pReader, including its inner readers, then free it.
 * Passing NULL is a no-op.
 *
 * Inner readers borrow pTableMap, pReadSnap, pSchema and pMemSchema from the outer reader.
 * Those fields are detached before an inner reader is closed so they are released only once.
 */
void tsdbReaderClose(STsdbReader* pReader) {
  if (pReader == NULL) {
    return;
  }

  // Each inner reader is checked on its own: creation can fail after innerReader[0] exists
  // but before innerReader[1] does.
  for (int32_t k = 0; k < 2; ++k) {
    STsdbReader* pInner = pReader->innerReader[k];
    if (pInner == NULL) {
      continue;
    }

    pInner->status.pTableMap = NULL;
    pInner->pReadSnap = NULL;
    pInner->pSchema = NULL;
    pInner->pMemSchema = NULL;

    tsdbReaderClose(pInner);
    pReader->innerReader[k] = NULL;
  }

  SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo;

  taosArrayDestroy(pSupInfo->pColAgg);
  for (int32_t i = 0; i < pSupInfo->numOfCols; ++i) {
    if (pSupInfo->buildBuf[i] != NULL) {
      taosMemoryFreeClear(pSupInfo->buildBuf[i]);
    }
  }

  if (pReader->freeBlock) {
    pReader->pResBlock = blockDataDestroy(pReader->pResBlock);
  }

  taosMemoryFree(pSupInfo->colId);
  tBlockDataDestroy(&pReader->status.fileBlockData, true);
  cleanupDataBlockIterator(&pReader->status.blockIter);

  size_t numOfTables = taosHashGetSize(pReader->status.pTableMap);
  if (pReader->status.pTableMap != NULL) {
    destroyAllBlockScanInfo(pReader->status.pTableMap);
    clearBlockScanInfoBuf(&pReader->blockInfoBuf);
  }

  if (pReader->pFileReader != NULL) {
    tsdbDataFReaderClose(&pReader->pFileReader);
  }

  if (pReader->pDelFReader != NULL) {
    tsdbDelFReaderClose(&pReader->pDelFReader);
  }

  if (pReader->pDelIdx != NULL) {
    taosArrayDestroy(pReader->pDelIdx);
    pReader->pDelIdx = NULL;
  }

  tsdbUntakeReadSnap(pReader->pTsdb, pReader->pReadSnap, pReader->idStr);

  taosMemoryFree(pReader->status.uidCheckInfo.tableUidList);
  SIOCostSummary* pCost = &pReader->cost;

  SFilesetIter* pFilesetIter = &pReader->status.fileIter;
  if (pFilesetIter->pLastBlockReader != NULL) {
    SLastBlockReader* pLReader = pFilesetIter->pLastBlockReader;
    tMergeTreeClose(&pLReader->mergeTree);

    getLastBlockLoadInfo(pLReader->pInfo, &pCost->lastBlockLoad, &pCost->lastBlockLoadTime);

    pLReader->pInfo = destroyLastBlockLoadInfo(pLReader->pInfo);
    taosMemoryFree(pLReader);
  }

  tsdbDebug("%p :io-cost summary: head-file:%" PRIu64 ", head-file time:%.2f ms, SMA:%" PRId64
            " SMA-time:%.2f ms, fileBlocks:%" PRId64 ", fileBlocks-load-time:%.2f ms, "
            "build in-memory-block-time:%.2f ms, lastBlocks:%" PRId64 ", lastBlocks-time:%.2f ms, composed-blocks:%" PRId64
            ", composed-blocks-time:%.2fms, STableBlockScanInfo size:%.2f Kb, createTime:%.2f ms, %s",
            pReader, pCost->headFileLoad, pCost->headFileLoadTime, pCost->smaDataLoad, pCost->smaLoadTime,
            pCost->numOfBlocks, pCost->blockLoadTime, pCost->buildmemBlock, pCost->lastBlockLoad,
            pCost->lastBlockLoadTime, pCost->composedBlocks, pCost->buildComposedBlockTime,
            numOfTables * sizeof(STableBlockScanInfo) / 1000.0, pCost->createScanInfoList, pReader->idStr);

  taosMemoryFree(pReader->idStr);

  // pMemSchema may alias pSchema. Compare the two pointers BEFORE either is freed: a pointer's
  // value is indeterminate once the object it points to has been deallocated.
  if (pReader->pMemSchema != pReader->pSchema) {
    taosMemoryFree(pReader->pMemSchema);
  }
  taosMemoryFree(pReader->pSchema);

  taosMemoryFreeClear(pReader);
}

4031
/*
 * Fill pReader->pResBlock with the next block of this single reader.
 *
 * Returns false right away when the reader has no tables, or when building from files fails.
 * While loadFromFile is set, file blocks are tried first. If they produce no rows, or the
 * reader is not loading from files, the in-memory buffer is consulted instead.
 * Returns true when the result block holds at least one row.
 */
static bool doTsdbNextDataBlock(STsdbReader* pReader) {
  SReaderStatus* pStatus = &pReader->status;
  SSDataBlock*   pBlock = pReader->pResBlock;

  // cleanup the data that belongs to the previous data block
  blockDataCleanup(pBlock);

  if (taosHashGetSize(pStatus->pTableMap) == 0) {
    return false;
  }

  if (pStatus->loadFromFile) {
    if (buildBlockFromFiles(pReader) != TSDB_CODE_SUCCESS) {
      return false;
    }

    if (pBlock->info.rows > 0) {
      return true;
    }
  }

  // nothing (more) from files, let's try the buffer
  buildBlockFromBufferSequentially(pReader);
  return pBlock->info.rows > 0;
}

4059 4060 4061 4062 4063
/*
 * Advance the reader to its next non-empty result block. Returns true when one is available.
 *
 * When inner readers exist (TIMEWINDOW_RANGE_EXTERNAL), pReader->step walks through three
 * phases:
 *   EXTERNAL_ROWS_PREV - row preceding the window (innerReader[0])
 *   EXTERNAL_ROWS_MAIN - the window itself (this reader)
 *   EXTERNAL_ROWS_NEXT - row following the window (innerReader[1])
 *
 * If a phase cannot be opened, false is returned and terrno holds the error code. The return
 * type is bool, so a raw nonzero error code would otherwise be read as "block available".
 */
bool tsdbNextDataBlock(STsdbReader* pReader) {
  if (isEmptyQueryTimeWindow(&pReader->window)) {
    return false;
  }

  if (pReader->innerReader[0] != NULL && pReader->step == 0) {
    bool ret = doTsdbNextDataBlock(pReader->innerReader[0]);
    pReader->step = EXTERNAL_ROWS_PREV;
    if (ret) {
      return ret;
    }
  }

  if (pReader->step == EXTERNAL_ROWS_PREV) {
    // prepare for the main scan
    int32_t code = doOpenReaderImpl(pReader);
    resetAllDataBlockScanInfo(pReader->status.pTableMap, pReader->innerReader[0]->window.ekey);

    if (code != TSDB_CODE_SUCCESS) {
      terrno = code;
      return false;
    }

    pReader->step = EXTERNAL_ROWS_MAIN;
  }

  bool ret = doTsdbNextDataBlock(pReader);
  if (ret) {
    return ret;
  }

  if (pReader->innerReader[1] != NULL && pReader->step == EXTERNAL_ROWS_MAIN) {
    // prepare for the next row scan
    int32_t code = doOpenReaderImpl(pReader->innerReader[1]);
    resetAllDataBlockScanInfo(pReader->innerReader[1]->status.pTableMap, pReader->window.ekey);
    if (code != TSDB_CODE_SUCCESS) {
      terrno = code;
      return false;
    }

    bool ret1 = doTsdbNextDataBlock(pReader->innerReader[1]);
    pReader->step = EXTERNAL_ROWS_NEXT;
    if (ret1) {
      return ret1;
    }
  }

  return false;
}

H
Haojun Liao 已提交
4107 4108
/* Copy the row count, table uid and time window of pReader's current result block into the out-params. */
static void setBlockInfo(const STsdbReader* pReader, int32_t* rows, uint64_t* uid, STimeWindow* pWindow) {
  const SSDataBlock* pRes = pReader->pResBlock;

  *pWindow = pRes->info.window;
  *uid = pRes->info.id.uid;
  *rows = pRes->info.rows;
}

H
Haojun Liao 已提交
4113
/*
 * Report row count, uid and time window of the current result block.
 *
 * For TIMEWINDOW_RANGE_EXTERNAL readers, the block belongs to the reader of the current step:
 * MAIN -> pReader, PREV -> innerReader[0], anything else -> innerReader[1].
 * All other readers report their own block.
 */
void tsdbRetrieveDataBlockInfo(const STsdbReader* pReader, int32_t* rows, uint64_t* uid, STimeWindow* pWindow) {
  const STsdbReader* pSource = pReader;

  if (pReader->type == TIMEWINDOW_RANGE_EXTERNAL) {
    switch (pReader->step) {
      case EXTERNAL_ROWS_MAIN:
        pSource = pReader;
        break;
      case EXTERNAL_ROWS_PREV:
        pSource = pReader->innerReader[0];
        break;
      default:
        pSource = pReader->innerReader[1];
        break;
    }
  }

  setBlockInfo(pSource, rows, uid, pWindow);
}

4127

4128 4129
/*
 * Complete pSup->pColAgg so it holds one aggregate per requested column, in colId order.
 *
 * pTsAgg (the synthesized primary-timestamp aggregate) is inserted at the front. Every
 * requested non-timestamp column in pSup->colId[0..numOfCols) without a stored SMA entry gets
 * an "all NULL" aggregate (numOfNull == numOfRows) inserted at its position. The merge walk
 * assumes both pSup->colId and pColAgg are sorted by ascending colId.
 *
 * The array size is re-read after the ts insert and bumped after every null insert. Otherwise
 * the tail of the array is never inspected, and columns after the last stored SMA entry never
 * receive a null aggregate.
 */
static void doFillNullColSMA(SBlockLoadSuppInfo* pSup, int32_t numOfRows, int32_t numOfCols,
    SColumnDataAgg* pTsAgg) {
  taosArrayInsert(pSup->pColAgg, 0, pTsAgg);

  int32_t size = (int32_t)taosArrayGetSize(pSup->pColAgg);
  int32_t i = 0, j = 0;

  while (j < numOfCols && i < size) {
    SColumnDataAgg* pAgg = taosArrayGet(pSup->pColAgg, i);
    if (pAgg->colId == pSup->colId[j]) {
      i += 1;
      j += 1;
    } else if (pAgg->colId < pSup->colId[j]) {
      i += 1;
    } else {  // pSup->colId[j] < pAgg->colId: requested column has no stored SMA
      if (pSup->colId[j] != PRIMARYKEY_TIMESTAMP_COL_ID) {
        SColumnDataAgg nullColAgg = {.colId = pSup->colId[j], .numOfNull = numOfRows};
        taosArrayInsert(pSup->pColAgg, i, &nullColAgg);
        i += 1;     // step past the element just inserted
        size += 1;  // keep the bound in sync with the grown array
      }
      j += 1;
    }
  }

  // requested columns whose colId is larger than every stored SMA entry
  for (; j < numOfCols; ++j) {
    if (pSup->colId[j] != PRIMARYKEY_TIMESTAMP_COL_ID) {
      SColumnDataAgg nullColAgg = {.colId = pSup->colId[j], .numOfNull = numOfRows};
      taosArrayInsert(pSup->pColAgg, i, &nullColAgg);
      i += 1;
    }
  }
}

4152
/*
 * Load the block-level SMA (pre-computed column aggregates) of the current file block and
 * expose it through pDataBlock->pBlockAgg.
 *
 * *allHave is set to true and pBlockAgg is filled only when the current file block carries SMA
 * data and belongs to the table of the current result block. In every other case (external
 * range reader, composed block, SMA disabled, uid mismatch, block without SMA) it returns
 * TSDB_CODE_SUCCESS with *allHave == false and pBlockAgg == NULL.
 *
 * Returns the tsdbReadBlockSma error, or TSDB_CODE_OUT_OF_MEMORY when the slot array cannot be
 * allocated.
 */
int32_t tsdbRetrieveDatablockSMA(STsdbReader* pReader, SSDataBlock* pDataBlock, bool* allHave) {
  SColumnDataAgg*** pBlockSMA = &pDataBlock->pBlockAgg;

  int32_t code = 0;
  *allHave = false;
  *pBlockSMA = NULL;

  if (pReader->type == TIMEWINDOW_RANGE_EXTERNAL) {
    return TSDB_CODE_SUCCESS;
  }

  // there is no statistics data for composed block
  if (pReader->status.composedDataBlock || (!pReader->suppInfo.smaValid)) {
    return TSDB_CODE_SUCCESS;
  }

  SFileDataBlockInfo* pFBlock = getCurrentBlockInfo(&pReader->status.blockIter);
  SBlockLoadSuppInfo* pSup = &pReader->suppInfo;

  if (pReader->pResBlock->info.id.uid != pFBlock->uid) {
    return TSDB_CODE_SUCCESS;
  }

  SDataBlk* pBlock = getCurrentBlock(&pReader->status.blockIter);
  if (tDataBlkHasSma(pBlock)) {
    code = tsdbReadBlockSma(pReader->pFileReader, pBlock, pSup->pColAgg);
    if (code != TSDB_CODE_SUCCESS) {
      tsdbDebug("vgId:%d, failed to load block SMA for uid %" PRIu64 ", code:%s, %s", 0, pFBlock->uid, tstrerror(code),
                pReader->idStr);
      return code;
    }
  } else {
    *pBlockSMA = NULL;
    return TSDB_CODE_SUCCESS;
  }

  *allHave = true;

  // always load the first primary timestamp column data
  SColumnDataAgg* pTsAgg = &pSup->tsColAgg;

  pTsAgg->numOfNull = 0;
  pTsAgg->colId = PRIMARYKEY_TIMESTAMP_COL_ID;
  pTsAgg->min = pReader->pResBlock->info.window.skey;
  pTsAgg->max = pReader->pResBlock->info.window.ekey;

  // update the number of NULL data rows
  size_t numOfCols = pSup->numOfCols;

  // ensure capacity
  if (pDataBlock->pDataBlock) {
    size_t colsNum = taosArrayGetSize(pDataBlock->pDataBlock);
    taosArrayEnsureCap(pSup->pColAgg, colsNum);
  }

  SSDataBlock* pResBlock = pReader->pResBlock;
  if (pResBlock->pBlockAgg == NULL) {
    size_t num = taosArrayGetSize(pResBlock->pDataBlock);
    pResBlock->pBlockAgg = taosMemoryCalloc(num, sizeof(SColumnDataAgg));
    if (pResBlock->pBlockAgg == NULL) {
      // slot array is indexed below; do not dereference a failed allocation
      *allHave = false;
      return TSDB_CODE_OUT_OF_MEMORY;
    }
  }

  // clear the slots filled below so none keeps pointing at an aggregate of an earlier block
  for (size_t k = 0; k < numOfCols; ++k) {
    pResBlock->pBlockAgg[pSup->slotId[k]] = NULL;
  }

  // do fill all null column value SMA info
  doFillNullColSMA(pSup, pBlock->nRow, numOfCols, pTsAgg);
  size_t size = taosArrayGetSize(pSup->pColAgg);

  int32_t i = 0, j = 0;
  while (j < numOfCols && i < size) {
    SColumnDataAgg* pAgg = taosArrayGet(pSup->pColAgg, i);
    if (pAgg->colId == pSup->colId[j]) {
      pResBlock->pBlockAgg[pSup->slotId[j]] = pAgg;
      i += 1;
      j += 1;
    } else if (pAgg->colId < pSup->colId[j]) {
      i += 1;
    } else if (pSup->colId[j] < pAgg->colId) {
      // expected only for the primary timestamp column, whose aggregate is synthesized above
      pResBlock->pBlockAgg[pSup->slotId[j]] = &pSup->tsColAgg;
      j += 1;
    }
  }

  *pBlockSMA = pResBlock->pBlockAgg;
  pReader->cost.smaDataLoad += 1;

  tsdbDebug("vgId:%d, succeed to load block SMA for uid %" PRIu64 ", %s", 0, pFBlock->uid, pReader->idStr);
  return code;
}

H
Haojun Liao 已提交
4240
/*
 * Return the current result block of pReader, loading the current file block if needed.
 *
 * Composed blocks are returned as-is from pResBlock. Otherwise the current file block is loaded
 * for its table and copied into pResBlock. Returns NULL and sets terrno on failure:
 * TSDB_CODE_INVALID_PARA when the block's uid is not in the table map, or the load error.
 */
static SSDataBlock* doRetrieveDataBlock(STsdbReader* pReader) {
  SReaderStatus* pStatus = &pReader->status;

  if (pStatus->composedDataBlock) {
    return pReader->pResBlock;
  }

  SFileDataBlockInfo*   pBlockInfo = getCurrentBlockInfo(&pStatus->blockIter);
  STableBlockScanInfo** ppScanInfo =
      taosHashGet(pStatus->pTableMap, &pBlockInfo->uid, sizeof(pBlockInfo->uid));

  // check the lookup result before dereferencing it: an unknown uid yields NULL
  if (ppScanInfo == NULL || *ppScanInfo == NULL) {
    terrno = TSDB_CODE_INVALID_PARA;
    tsdbError("failed to locate the uid:%" PRIu64 " in query table uid list, total tables:%d, %s", pBlockInfo->uid,
              taosHashGetSize(pReader->status.pTableMap), pReader->idStr);
    return NULL;
  }

  STableBlockScanInfo* pBlockScanInfo = *ppScanInfo;

  int32_t code = doLoadFileBlockData(pReader, &pStatus->blockIter, &pStatus->fileBlockData, pBlockScanInfo->uid);
  if (code != TSDB_CODE_SUCCESS) {
    tBlockDataDestroy(&pStatus->fileBlockData, 1);
    terrno = code;
    return NULL;
  }

  copyBlockDataToSDataBlock(pReader, pBlockScanInfo);
  return pReader->pResBlock;
}

H
Haojun Liao 已提交
4268
/*
 * Return the current result block.
 *
 * For TIMEWINDOW_RANGE_EXTERNAL readers, step PREV reads from innerReader[0] and step NEXT
 * from innerReader[1]. Every other case reads from pReader itself. pIdList is not used.
 */
SSDataBlock* tsdbRetrieveDataBlock(STsdbReader* pReader, SArray* pIdList) {
  STsdbReader* pSource = pReader;

  if (pReader->type == TIMEWINDOW_RANGE_EXTERNAL) {
    if (pReader->step == EXTERNAL_ROWS_PREV) {
      pSource = pReader->innerReader[0];
    } else if (pReader->step == EXTERNAL_ROWS_NEXT) {
      pSource = pReader->innerReader[1];
    }
  }

  return doRetrieveDataBlock(pSource);
}

H
Haojun Liao 已提交
4280
/*
 * Re-arm pReader for a new scan described by pCond (order and time window).
 *
 * Does nothing when the current window is empty or no read snapshot was taken. Otherwise it:
 *   - switches the reader to TIMEWINDOW_RANGE_CONTAINED;
 *   - closes any open data-file reader;
 *   - restarts the file-set and block iterators;
 *   - resets every table's scan position to just outside the new window;
 *   - prepares the first file block, or falls back to the in-memory buffers when there are
 *     no data files.
 * Returns the error from initForFirstBlockInFile, if any.
 */
int32_t tsdbReaderReset(STsdbReader* pReader, SQueryTableDataCond* pCond) {
  if (isEmptyQueryTimeWindow(&pReader->window) || pReader->pReadSnap == NULL) {
    return TSDB_CODE_SUCCESS;
  }

  SReaderStatus*  pStatus = &pReader->status;
  SDataBlockIter* pBlockIter = &pStatus->blockIter;

  pReader->order = pCond->order;
  pReader->type = TIMEWINDOW_RANGE_CONTAINED;
  pStatus->loadFromFile = true;
  pStatus->pTableIter = NULL;
  pReader->window = updateQueryTimeWindow(pReader->pTsdb, &pCond->twindows);

  // start again from a blank synthesized timestamp-column aggregate
  SColumnDataAgg* pTsAgg = &pReader->suppInfo.tsColAgg;
  memset(pTsAgg, 0, sizeof(*pTsAgg));
  pTsAgg->colId = PRIMARYKEY_TIMESTAMP_COL_ID;

  tsdbDataFReaderClose(&pReader->pFileReader);

  int32_t numOfTables = taosHashGetSize(pStatus->pTableMap);

  initFilesetIterator(&pStatus->fileIter, pReader->pReadSnap->fs.aDFileSet, pReader);
  resetDataBlockIterator(pBlockIter, pReader->order);

  // one tick before the window start (ascending) or after its end (descending)
  int64_t restartKey = ASCENDING_TRAVERSE(pReader->order) ? pReader->window.skey - 1 : pReader->window.ekey + 1;
  resetAllDataBlockScanInfo(pStatus->pTableMap, restartKey);

  int32_t code = TSDB_CODE_SUCCESS;

  if (pStatus->fileIter.numOfFiles == 0) {
    // no data in files, let's try buffer in memory
    pStatus->loadFromFile = false;
  } else {
    code = initForFirstBlockInFile(pReader, pBlockIter);
    if (code != TSDB_CODE_SUCCESS) {
      tsdbError("%p reset reader failed, numOfTables:%d, query range:%" PRId64 " - %" PRId64 " in query %s", pReader,
                numOfTables, pReader->window.skey, pReader->window.ekey, pReader->idStr);
      return code;
    }
  }

  tsdbDebug("%p reset reader, suid:%" PRIu64 ", numOfTables:%d, skey:%" PRId64 ", query range:%" PRId64 " - %" PRId64
            " in query %s",
            pReader, pReader->suid, numOfTables, pCond->twindows.skey, pReader->window.skey, pReader->window.ekey,
            pReader->idStr);

  return code;
}
H
Hongze Cheng 已提交
4328

4329 4330 4331
/*
 * Histogram bucket for a block holding numOfRows rows, with buckets of width bucketRange
 * starting at startRow.
 *
 * Blocks with numOfRows <= startRow map to bucket 0 instead of a negative index. A
 * non-positive bucketRange (e.g. maxRows == minRows) also maps to bucket 0 instead of dividing
 * by zero. The caller must still clamp the result to the size of its histogram.
 */
static int32_t getBucketIndex(int32_t startRow, int32_t bucketRange, int32_t numOfRows) {
  if (bucketRange <= 0 || numOfRows <= startRow) {
    return 0;
  }
  return (numOfRows - startRow) / bucketRange;
}
H
Hongze Cheng 已提交
4332

4333 4334 4335 4336
/*
 * Walk every data block of every file set visible to pReader and collect block-distribution
 * statistics into pTableBlockInfo: sizes, row totals, min/max rows, number of small blocks,
 * and a row-count histogram with 20 buckets spanning [minRows, maxRows] of the vnode config.
 *
 * The histogram index is clamped to [0, 20). Blocks below minRows, or at/above maxRows when
 * the range divides evenly, would otherwise be written outside blockRowsHisto. The bucket
 * width is forced to at least 1 so that maxRows == minRows cannot cause a division by zero.
 * NOTE(review): assumes blockRowsHisto has 20 entries, matching the 20 buckets used for the
 * width; confirm against STableBlockDistInfo.
 *
 * Returns the error from initForFirstBlockInFile, if any.
 */
int32_t tsdbGetFileBlocksDistInfo(STsdbReader* pReader, STableBlockDistInfo* pTableBlockInfo) {
  int32_t code = TSDB_CODE_SUCCESS;
  pTableBlockInfo->totalSize = 0;
  pTableBlockInfo->totalRows = 0;

  // find the start data block in file
  SReaderStatus* pStatus = &pReader->status;

  STsdbCfg* pc = &pReader->pTsdb->pVnode->config.tsdbCfg;
  pTableBlockInfo->defMinRows = pc->minRows;
  pTableBlockInfo->defMaxRows = pc->maxRows;

  const int32_t numOfBuckets = 20;
  int32_t       bucketRange = ceil((pc->maxRows - pc->minRows) / (double)numOfBuckets);
  if (bucketRange <= 0) {
    bucketRange = 1;
  }

  pTableBlockInfo->numOfFiles += 1;

  int32_t numOfTables = (int32_t)taosHashGetSize(pStatus->pTableMap);
  int     defaultRows = 4096;

  SDataBlockIter* pBlockIter = &pStatus->blockIter;
  pTableBlockInfo->numOfFiles += pStatus->fileIter.numOfFiles;

  if (pBlockIter->numOfBlocks > 0) {
    pTableBlockInfo->numOfBlocks += pBlockIter->numOfBlocks;
  }

  pTableBlockInfo->numOfTables = numOfTables;
  bool hasNext = (pBlockIter->numOfBlocks > 0);

  while (true) {
    if (hasNext) {
      SDataBlk* pBlock = getCurrentBlock(pBlockIter);

      int32_t numOfRows = pBlock->nRow;
      pTableBlockInfo->totalRows += numOfRows;

      if (numOfRows > pTableBlockInfo->maxRows) {
        pTableBlockInfo->maxRows = numOfRows;
      }

      if (numOfRows < pTableBlockInfo->minRows) {
        pTableBlockInfo->minRows = numOfRows;
      }

      if (numOfRows < defaultRows) {
        pTableBlockInfo->numOfSmallBlocks += 1;
      }

      pTableBlockInfo->totalSize += pBlock->aSubBlock[0].szBlock;

      int32_t bucketIndex = getBucketIndex(pTableBlockInfo->defMinRows, bucketRange, numOfRows);
      if (bucketIndex < 0) {
        bucketIndex = 0;
      } else if (bucketIndex >= numOfBuckets) {
        bucketIndex = numOfBuckets - 1;
      }
      pTableBlockInfo->blockRowsHisto[bucketIndex]++;

      hasNext = blockIteratorNext(&pStatus->blockIter, pReader->idStr);
    } else {
      // current file set exhausted: move on to the first block of the next one
      code = initForFirstBlockInFile(pReader, pBlockIter);
      if ((code != TSDB_CODE_SUCCESS) || (pReader->status.loadFromFile == false)) {
        break;
      }

      pTableBlockInfo->numOfBlocks += pBlockIter->numOfBlocks;
      hasNext = (pBlockIter->numOfBlocks > 0);
    }
  }

  return code;
}
H
Hongze Cheng 已提交
4403

H
refact  
Hongze Cheng 已提交
4404
/*
 * Sum the rows buffered in the read snapshot's mem and imem tables over every table in the
 * reader's table map. pReader->status.pTableIter is used as the iteration cursor and is NULL
 * when the function returns.
 */
int64_t tsdbGetNumOfRowsInMemTable(STsdbReader* pReader) {
  SReaderStatus* pStatus = &pReader->status;
  STsdbReadSnap* pSnap = pReader->pReadSnap;
  int64_t        total = 0;

  for (pStatus->pTableIter = taosHashIterate(pStatus->pTableMap, NULL); pStatus->pTableIter != NULL;
       pStatus->pTableIter = taosHashIterate(pStatus->pTableMap, pStatus->pTableIter)) {
    uint64_t uid = (*(STableBlockScanInfo**)pStatus->pTableIter)->uid;

    if (pSnap->pMem != NULL) {
      STbData* pMemData = tsdbGetTbDataFromMemTable(pSnap->pMem, pReader->suid, uid);
      if (pMemData != NULL) {
        total += tsdbGetNRowsInTbData(pMemData);
      }
    }

    if (pSnap->pIMem != NULL) {
      STbData* pIMemData = tsdbGetTbDataFromMemTable(pSnap->pIMem, pReader->suid, uid);
      if (pIMemData != NULL) {
        total += tsdbGetNRowsInTbData(pIMemData);
      }
    }
  }

  return total;
}
D
dapan1121 已提交
4435

L
Liu Jicong 已提交
4436
/*
 * Fetch the schema of table `uid` into *pSchema.
 *
 * For a child table, *suid receives its super-table uid and the schema version is read from
 * the super-table entry. For a normal table, *suid is 0 and the table's own version is used.
 *
 * Error returns (terrno is set to the same code):
 *   TSDB_CODE_TDB_INVALID_TABLE_ID - a table entry cannot be loaded;
 *   TSDB_CODE_INVALID_PARA         - any other table type.
 * Otherwise returns TSDB_CODE_SUCCESS, even if metaGetTbTSchema yields NULL.
 */
int32_t tsdbGetTableSchema(SVnode* pVnode, int64_t uid, STSchema** pSchema, int64_t* suid) {
  SMetaReader mr = {0};
  metaReaderInit(&mr, pVnode->pMeta, 0);

  if (metaGetTableEntryByUidCache(&mr, uid) != TSDB_CODE_SUCCESS) {
    terrno = TSDB_CODE_TDB_INVALID_TABLE_ID;
    metaReaderClear(&mr);
    return terrno;
  }

  *suid = 0;

  int32_t sversion = 1;
  switch (mr.me.type) {
    case TSDB_CHILD_TABLE:
      tDecoderClear(&mr.coder);
      *suid = mr.me.ctbEntry.suid;
      if (metaGetTableEntryByUidCache(&mr, *suid) != TSDB_CODE_SUCCESS) {
        terrno = TSDB_CODE_TDB_INVALID_TABLE_ID;
        metaReaderClear(&mr);
        return terrno;
      }
      sversion = mr.me.stbEntry.schemaRow.version;
      break;

    case TSDB_NORMAL_TABLE:
      sversion = mr.me.ntbEntry.schemaRow.version;
      break;

    default:
      terrno = TSDB_CODE_INVALID_PARA;
      metaReaderClear(&mr);
      return terrno;
  }

  metaReaderClear(&mr);
  *pSchema = metaGetTbTSchema(pVnode->pMeta, uid, sversion, 1);

  return TSDB_CODE_SUCCESS;
}
H
Hongze Cheng 已提交
4473

H
Haojun Liao 已提交
4474
/*
 * Take a read snapshot of pTsdb and return it in *ppSnap. Under the tsdb read lock, the
 * current mem and imem tables and the data file set are referenced.
 *
 * On failure, everything acquired here is released again, *ppSnap is set to NULL and the error
 * code is returned, so callers never receive a half-built snapshot. Error codes:
 *   TSDB_CODE_OUT_OF_MEMORY          - allocation failed;
 *   TAOS_SYSTEM_ERROR(...)           - locking or unlocking failed;
 *   the error returned by tsdbFSRef.
 */
int32_t tsdbTakeReadSnap(STsdb* pTsdb, STsdbReadSnap** ppSnap, const char* idStr) {
  int32_t code = 0;

  // alloc
  STsdbReadSnap* pSnap = (STsdbReadSnap*)taosMemoryCalloc(1, sizeof(STsdbReadSnap));
  *ppSnap = pSnap;
  if (pSnap == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _exit;
  }

  // lock
  code = taosThreadRwlockRdlock(&pTsdb->rwLock);
  if (code) {
    code = TAOS_SYSTEM_ERROR(code);
    goto _free;
  }

  // take snapshot
  pSnap->pMem = pTsdb->mem;
  pSnap->pIMem = pTsdb->imem;

  if (pSnap->pMem) {
    tsdbRefMemTable(pSnap->pMem);
  }

  if (pSnap->pIMem) {
    tsdbRefMemTable(pSnap->pIMem);
  }

  // fs
  code = tsdbFSRef(pTsdb, &pSnap->fs);
  if (code) {
    taosThreadRwlockUnlock(&pTsdb->rwLock);
    goto _release;
  }

  // unlock
  code = taosThreadRwlockUnlock(&pTsdb->rwLock);
  if (code) {
    code = TAOS_SYSTEM_ERROR(code);
    goto _release;
  }

  tsdbTrace("vgId:%d, take read snapshot, %s", TD_VID(pTsdb->pVnode), idStr);
  return code;

_release:
  // Drop the references taken above. tsdbFSRef may have stopped part-way; the partially
  // referenced file set is released the same way tsdbUntakeReadSnap releases a full one.
  if (pSnap->pMem) {
    tsdbUnrefMemTable(pSnap->pMem);
  }
  if (pSnap->pIMem) {
    tsdbUnrefMemTable(pSnap->pIMem);
  }
  tsdbFSUnref(pTsdb, &pSnap->fs);

_free:
  taosMemoryFree(pSnap);
  *ppSnap = NULL;

_exit:
  return code;
}

H
Haojun Liao 已提交
4522
/*
 * Release a snapshot produced by tsdbTakeReadSnap. The mem/imem table references and the
 * file-set reference are dropped and the snapshot is freed. pSnap may be NULL; the trace line
 * is logged in both cases.
 */
void tsdbUntakeReadSnap(STsdb* pTsdb, STsdbReadSnap* pSnap, const char* idStr) {
  if (pSnap == NULL) {
    tsdbTrace("vgId:%d, untake read snapshot, %s", TD_VID(pTsdb->pVnode), idStr);
    return;
  }

  if (pSnap->pMem != NULL) {
    tsdbUnrefMemTable(pSnap->pMem);
  }
  if (pSnap->pIMem != NULL) {
    tsdbUnrefMemTable(pSnap->pIMem);
  }

  tsdbFSUnref(pTsdb, &pSnap->fs);
  taosMemoryFree(pSnap);

  tsdbTrace("vgId:%d, untake read snapshot, %s", TD_VID(pTsdb->pVnode), idStr);
}