tsdbRead.c 138.2 KB
Newer Older
H
hjxilinx 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

H
Haojun Liao 已提交
16
#include "osDef.h"
H
Hongze Cheng 已提交
17
#include "tsdb.h"
18

H
Hongze Cheng 已提交
19
// True when the scan order `o` is ascending. The argument is parenthesized so that
// low-precedence expressions (e.g. a conditional expression) compare as a whole.
#define ASCENDING_TRAVERSE(o) ((o) == TSDB_ORDER_ASC)
H
Hongze Cheng 已提交
20

21 22 23 24 25 26
// Identifies which row set is being addressed: the rows before, inside, or after the main range.
// NOTE(review): the values are sequential (1, 2, 3), not independent bit flags -- 0x3 equals
// (EXTERNAL_ROWS_PREV | EXTERNAL_ROWS_MAIN); confirm no caller combines them with bitwise OR.
typedef enum {
  EXTERNAL_ROWS_PREV = 0x1,
  EXTERNAL_ROWS_MAIN = 0x2,
  EXTERNAL_ROWS_NEXT = 0x3,
} EContentData;

27
typedef struct {
dengyihao's avatar
dengyihao 已提交
28
  STbDataIter* iter;
29 30 31 32
  int32_t      index;
  bool         hasVal;
} SIterInfo;

33 34
// Block counters collected while loading one file set.
typedef struct {
  int32_t numOfBlocks;     // data blocks that passed the time-range and version-range checks
  int32_t numOfLastFiles;  // number of stt ("last") files in the file set
} SBlockNumber;

H
Haojun Liao 已提交
38
typedef struct STableBlockScanInfo {
dengyihao's avatar
dengyihao 已提交
39 40
  uint64_t  uid;
  TSKEY     lastKey;
H
Hongze Cheng 已提交
41 42 43 44 45 46 47 48
  SMapData  mapData;            // block info (compressed)
  SArray*   pBlockList;         // block data index list
  SIterInfo iter;               // mem buffer skip list iterator
  SIterInfo iiter;              // imem buffer skip list iterator
  SArray*   delSkyline;         // delete info for this table
  int32_t   fileDelIndex;       // file block delete index
  int32_t   lastBlockDelIndex;  // delete index for last block
  bool      iterInit;           // whether to initialize the in-memory skip list iterator or not
H
Haojun Liao 已提交
49 50 51
} STableBlockScanInfo;

// Pairs a table uid with an offset; presumably the sort key used when ordering blocks across
// tables -- TODO confirm against the code that fills it.
typedef struct SBlockOrderWrapper {
  int64_t uid;
  int64_t offset;
} SBlockOrderWrapper;
H
Hongze Cheng 已提交
55 56

typedef struct SBlockOrderSupporter {
57 58 59 60
  SBlockOrderWrapper** pDataBlockInfo;
  int32_t*             indexPerTable;
  int32_t*             numOfBlocksPerTable;
  int32_t              numOfTables;
H
Hongze Cheng 已提交
61 62 63
} SBlockOrderSupporter;

// Accumulated I/O statistics of one reader. Time fields updated in this file are in milliseconds.
typedef struct SIOCostSummary {
  int64_t numOfBlocks;       // data blocks plus stt files counted while loading file sets
  double  blockLoadTime;
  double  buildmemBlock;
  int64_t headFileLoad;      // number of file sets opened
  double  headFileLoadTime;  // time spent loading block indexes and block info
  int64_t smaDataLoad;
  double  smaLoadTime;
  int64_t lastBlockLoad;     // filled from the last-block load info when moving to the next file set
  double  lastBlockLoadTime;
  int64_t composedBlocks;
  double  buildComposedBlockTime;
} SIOCostSummary;

typedef struct SBlockLoadSuppInfo {
78
  SArray*          pColAgg;
79
  SColumnDataAgg   tsColAgg;
C
Cary Xu 已提交
80
  SColumnDataAgg** plist;
81
  int16_t*         colIds;  // column ids for loading file block data
82
  int32_t          numOfCols;
83
  char**           buildBuf;  // build string tmp buffer, todo remove it later after all string format being updated.
H
Hongze Cheng 已提交
84 85
} SBlockLoadSuppInfo;

86
typedef struct SLastBlockReader {
H
Hongze Cheng 已提交
87 88 89 90 91
  STimeWindow        window;
  SVersionRange      verRange;
  int32_t            order;
  uint64_t           uid;
  SMergeTree         mergeTree;
92
  SSttBlockLoadInfo* pInfo;
93 94
} SLastBlockReader;

95
typedef struct SFilesetIter {
H
Hongze Cheng 已提交
96 97 98
  int32_t           numOfFiles;  // number of total files
  int32_t           index;       // current accessed index in the list
  SArray*           pFileList;   // data file list
99
  int32_t           order;
H
Hongze Cheng 已提交
100
  SLastBlockReader* pLastBlockReader;  // last file block reader
101
} SFilesetIter;
H
Haojun Liao 已提交
102 103

// Identifies one file data block by its owning table and its position in that table's block list.
typedef struct SFileDataBlockInfo {
  // index position in STableBlockScanInfo in order to check whether neighbor block overlaps with it
  uint64_t uid;
  int32_t  tbBlockIdx;
} SFileDataBlockInfo;

typedef struct SDataBlockIter {
110
  int32_t   numOfBlocks;
111
  int32_t   index;
H
Hongze Cheng 已提交
112
  SArray*   blockList;  // SArray<SFileDataBlockInfo>
113
  int32_t   order;
114
  SDataBlk  block;  // current SDataBlk data
115
  SHashObj* pTableMap;
H
Haojun Liao 已提交
116 117 118
} SDataBlockIter;

// Progress of copying the current file block into the result block.
typedef struct SFileBlockDumpInfo {
  int32_t totalRows;
  int32_t rowIndex;
  int64_t lastKey;    // set one step past the block's boundary key once the block is fully dumped
  bool    allDumped;  // true once the whole block has been consumed
} SFileBlockDumpInfo;

125
// Cursor over the queried table uids.
typedef struct SUidOrderCheckInfo {
  uint64_t* tableUidList;  // access table uid list in uid ascending order list
  int32_t   currentIndex;  // index in table uid list
} SUidOrderCheckInfo;

H
Haojun Liao 已提交
130
typedef struct SReaderStatus {
131
  bool                 loadFromFile;       // check file stage
132
  bool                 composedDataBlock;  // the returned data block is a composed block or not
133 134
  SHashObj*            pTableMap;          // SHash<STableBlockScanInfo>
  STableBlockScanInfo* pTableIter;         // table iterator used in building in-memory buffer data blocks.
135
  SUidOrderCheckInfo   uidCheckInfo;       // check all table in uid order
136
  SFileBlockDumpInfo   fBlockDumpInfo;
137 138 139 140
  SDFileSet*           pCurrentFileset;  // current opened file set
  SBlockData           fileBlockData;
  SFilesetIter         fileIter;
  SDataBlockIter       blockIter;
H
Haojun Liao 已提交
141 142
} SReaderStatus;

H
Hongze Cheng 已提交
143
struct STsdbReader {
H
Haojun Liao 已提交
144 145 146 147 148 149 150
  STsdb*             pTsdb;
  uint64_t           suid;
  int16_t            order;
  STimeWindow        window;  // the primary query time window that applies to all queries
  SSDataBlock*       pResBlock;
  int32_t            capacity;
  SReaderStatus      status;
151 152
  char*              idStr;  // query info handle, for debug purpose
  int32_t            type;   // query type: 1. retrieve all data blocks, 2. retrieve direct prev|next rows
H
Hongze Cheng 已提交
153
  SBlockLoadSuppInfo suppInfo;
H
Hongze Cheng 已提交
154
  STsdbReadSnap*     pReadSnap;
155
  SIOCostSummary     cost;
156 157
  STSchema*          pSchema;     // the newest version schema
  STSchema*          pMemSchema;  // the previous schema for in-memory data, to avoid load schema too many times
158 159
  SDataFReader*      pFileReader;
  SVersionRange      verRange;
160

161 162
  int32_t      step;
  STsdbReader* innerReader[2];
H
Hongze Cheng 已提交
163
};
H
Hongze Cheng 已提交
164

H
Haojun Liao 已提交
165
static SFileDataBlockInfo* getCurrentBlockInfo(SDataBlockIter* pBlockIter);
166 167
static int      buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, int64_t endKey, int32_t capacity,
                                          STsdbReader* pReader);
168
static TSDBROW* getValidMemRow(SIterInfo* pIter, const SArray* pDelList, STsdbReader* pReader);
169 170
static int32_t  doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pScanInfo, STsdbReader* pReader,
                                        SRowMerger* pMerger);
H
Hongze Cheng 已提交
171
static int32_t  doMergeRowsInLastBlock(SLastBlockReader* pLastBlockReader, STableBlockScanInfo* pScanInfo, int64_t ts,
172
                                       SRowMerger* pMerger, SVersionRange* pVerRange);
173
static int32_t  doMergeRowsInBuf(SIterInfo* pIter, uint64_t uid, int64_t ts, SArray* pDelList, SRowMerger* pMerger,
dengyihao's avatar
dengyihao 已提交
174
                                 STsdbReader* pReader);
H
Haojun Liao 已提交
175
static int32_t  doAppendRowFromTSRow(SSDataBlock* pBlock, STsdbReader* pReader, STSRow* pTSRow, uint64_t uid);
176
static int32_t  doAppendRowFromFileBlock(SSDataBlock* pResBlock, STsdbReader* pReader, SBlockData* pBlockData,
H
Hongze Cheng 已提交
177
                                         int32_t rowIndex);
178
static void     setComposedBlockFlag(STsdbReader* pReader, bool composed);
179
static bool hasBeenDropped(const SArray* pDelList, int32_t* index, TSDBKEY* pKey, int32_t order, SVersionRange* pRange);
180

H
Hongze Cheng 已提交
181 182 183 184
static int32_t doMergeMemTableMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter, SArray* pDelList,
                                        STSRow** pTSRow, STsdbReader* pReader, bool* freeTSRow);
static int32_t doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo* pBlockScanInfo,
                                  STsdbReader* pReader, STSRow** pTSRow);
185 186
static int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pBlockScanInfo, int64_t key,
                                     STsdbReader* pReader);
187

dengyihao's avatar
dengyihao 已提交
188 189 190 191
static int32_t initDelSkylineIterator(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader, STbData* pMemTbData,
                                      STbData* piMemTbData);
static STsdb*  getTsdbByRetentions(SVnode* pVnode, TSKEY winSKey, SRetention* retentions, const char* idstr,
                                   int8_t* pLevel);
192
static SVersionRange getQueryVerRange(SVnode* pVnode, SQueryTableDataCond* pCond, int8_t level);
H
Hongze Cheng 已提交
193 194 195
static int64_t       getCurrentKeyInLastBlock(SLastBlockReader* pLastBlockReader);
static bool          hasDataInLastBlock(SLastBlockReader* pLastBlockReader);
static int32_t       doBuildDataBlock(STsdbReader* pReader);
H
Haojun Liao 已提交
196
static TSDBKEY       getCurrentKeyInBuf(STableBlockScanInfo* pScanInfo, STsdbReader* pReader);
197 198
static bool          hasDataInFileBlock(const SBlockData* pBlockData, const SFileBlockDumpInfo* pDumpInfo);
static bool          hasDataInLastBlock(SLastBlockReader* pLastBlockReader);
H
Haojun Liao 已提交
199

200 201
// Returns true when ts falls outside the closed interval [pWindow->skey, pWindow->ekey].
static bool outOfTimeWindow(int64_t ts, STimeWindow* pWindow) {
  const bool inside = (ts >= pWindow->skey) && (ts <= pWindow->ekey);
  return !inside;
}

202 203 204
// Records the column id of every column of pBlock in the reader's load-support info and
// allocates a scratch buffer for each variable-length column.
//
// Returns TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY when an allocation fails.
// When the two arrays cannot be allocated they are released and the fields are reset, so no
// dangling pointers remain in the reader. When a per-column scratch buffer cannot be allocated,
// the arrays stay attached to the reader for its normal teardown to release.
// NOTE(review): confirm that tsdbReaderClose releases colIds/buildBuf and tolerates NULL slots.
static int32_t setColumnIdSlotList(STsdbReader* pReader, SSDataBlock* pBlock) {
  SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo;

  size_t numOfCols = blockDataGetNumOfCols(pBlock);

  pSupInfo->numOfCols = numOfCols;
  pSupInfo->colIds = taosMemoryMalloc(numOfCols * sizeof(int16_t));
  pSupInfo->buildBuf = taosMemoryCalloc(numOfCols, POINTER_BYTES);
  if (pSupInfo->buildBuf == NULL || pSupInfo->colIds == NULL) {
    taosMemoryFree(pSupInfo->colIds);
    taosMemoryFree(pSupInfo->buildBuf);

    // do not leave freed pointers behind: the reader is torn down later and would free them again
    pSupInfo->colIds = NULL;
    pSupInfo->buildBuf = NULL;
    pSupInfo->numOfCols = 0;
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  for (size_t i = 0; i < numOfCols; ++i) {
    SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, i);
    pSupInfo->colIds[i] = pCol->info.colId;

    if (IS_VAR_DATA_TYPE(pCol->info.type)) {
      pSupInfo->buildBuf[i] = taosMemoryMalloc(pCol->info.bytes);
      if (pSupInfo->buildBuf[i] == NULL) {
        // remaining slots are still NULL because buildBuf was calloc'ed
        return TSDB_CODE_OUT_OF_MEMORY;
      }
    }
  }

  return TSDB_CODE_SUCCESS;
}
H
Hongze Cheng 已提交
227

228
// Builds the uid -> STableBlockScanInfo hash map for the numOfTables tables listed in idList.
//
// Each entry's lastKey starts one tick before the window start for an ascending scan, or one
// tick after the window end for a descending scan; the value is left unchanged where the
// adjustment would overflow int64.
// Returns the new map, or NULL when the map itself cannot be created. A table that cannot be
// inserted is logged and skipped.
static SHashObj* createDataBlockScanInfo(STsdbReader* pTsdbReader, const STableKeyInfo* idList, int32_t numOfTables) {
  // allocate buffer in order to load data blocks from file
  // todo use simple hash instead, optimize the memory consumption
  SHashObj* pTableMap =
      taosHashInit(numOfTables, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
  if (pTableMap == NULL) {
    return NULL;
  }

  for (int32_t j = 0; j < numOfTables; ++j) {
    STableBlockScanInfo info = {.lastKey = 0, .uid = idList[j].uid};
    if (ASCENDING_TRAVERSE(pTsdbReader->order)) {
      int64_t skey = pTsdbReader->window.skey;
      info.lastKey = (skey > INT64_MIN) ? (skey - 1) : skey;
    } else {
      int64_t ekey = pTsdbReader->window.ekey;
      info.lastKey = (ekey < INT64_MAX) ? (ekey + 1) : ekey;
    }

    if (taosHashPut(pTableMap, &info.uid, sizeof(uint64_t), &info, sizeof(info)) != 0) {
      // previously ignored silently; report it so a table missing from the scan can be diagnosed
      tsdbError("%p failed to add table uid:%" PRId64 " into scan-info map, %s", pTsdbReader, info.uid,
                pTsdbReader->idStr);
      continue;
    }

    tsdbDebug("%p check table uid:%" PRId64 " from lastKey:%" PRId64 " %s", pTsdbReader, info.uid, info.lastKey,
              pTsdbReader->idStr);
  }

  tsdbDebug("%p create %d tables scan-info, size:%.2f Kb, %s", pTsdbReader, numOfTables,
            (sizeof(STableBlockScanInfo) * numOfTables) / 1024.0, pTsdbReader->idStr);

  return pTableMap;
}
H
Hongze Cheng 已提交
257

H
Haojun Liao 已提交
258
// Rewinds every table's scan state so the scan can start over with lastKey = ts.
//
// Both in-memory iterators (mem and imem) are destroyed and their hasVal flags are cleared;
// iterInit is cleared so the iterators are rebuilt on next use. Destroying only the mem iterator,
// as was done before, leaked the imem iterator once it was re-created.
static void resetDataBlockScanInfo(SHashObj* pTableMap, int64_t ts) {
  STableBlockScanInfo* p = NULL;

  while ((p = taosHashIterate(pTableMap, p)) != NULL) {
    p->iterInit = false;
    p->iter.hasVal = false;
    p->iiter.hasVal = false;

    if (p->iter.iter != NULL) {
      p->iter.iter = tsdbTbDataIterDestroy(p->iter.iter);
    }

    if (p->iiter.iter != NULL) {
      p->iiter.iter = tsdbTbDataIterDestroy(p->iiter.iter);
    }

    p->delSkyline = taosArrayDestroy(p->delSkyline);
    p->lastKey = ts;
  }
}

273 274 275
// Releases everything owned by one table's scan info: both in-memory iterators, the delete
// skyline, the block index list and the block map data. The struct itself is not freed.
static void clearBlockScanInfo(STableBlockScanInfo* p) {
  p->iterInit = false;
  p->iiter.hasVal = false;

  if (p->iter.iter != NULL) {
    p->iter.iter = tsdbTbDataIterDestroy(p->iter.iter);
  }

  if (p->iiter.iter != NULL) {
    p->iiter.iter = tsdbTbDataIterDestroy(p->iiter.iter);
  }

  p->delSkyline = taosArrayDestroy(p->delSkyline);
  p->pBlockList = taosArrayDestroy(p->pBlockList);
  tMapDataClear(&p->mapData);
}
289

290 291 292 293
// Clears every table's scan info and then destroys the hash map that holds them.
static void destroyBlockScanInfo(SHashObj* pTableMap) {
  STableBlockScanInfo* p = NULL;
  while ((p = taosHashIterate(pTableMap, p)) != NULL) {
    clearBlockScanInfo(p);
  }

  taosHashCleanup(pTableMap);
}

299
// Returns true when the window contains no timestamp at all, i.e. its start is after its end.
static bool isEmptyQueryTimeWindow(STimeWindow* pWindow) {
  ASSERT(pWindow != NULL);
  return pWindow->skey > pWindow->ekey;
}
H
Hongze Cheng 已提交
303

304 305 306
// Clamps the start of the query window to the retention boundary so that expired data is never
// returned to the client, even if it is still physically present.
//
// The boundary is now - keep2 * ticks-per-minute + 1 in the tsdb's precision. Only skey is
// adjusted; the returned window may therefore be empty (skey > ekey). *pWindow is not modified.
static STimeWindow updateQueryTimeWindow(STsdb* pTsdb, STimeWindow* pWindow) {
  STsdbKeepCfg* pCfg = &pTsdb->keepCfg;

  int64_t now = taosGetTimestamp(pCfg->precision);
  int64_t earliestTs = now - (tsTickPerMin[pCfg->precision] * pCfg->keep2) + 1;  // needs to add one tick

  STimeWindow win = *pWindow;
  if (win.skey < earliestTs) {
    win.skey = earliestTs;
  }

  return win;
}
H
Hongze Cheng 已提交
319

H
Haojun Liao 已提交
320
// Shrinks *capacity (rows per output block) so that capacity * row-length stays within 2MB.
//
// The row length is the sum of the byte widths of all queried columns. The product is computed
// in 64 bits: with wide rows, capacity * rowLen does not fit in int32_t and the overflowed value
// could skip the limit altogether.
// NOTE(review): a row wider than 2MB still yields a capacity of 0, as before -- confirm callers
// tolerate that.
static void limitOutputBufferSize(const SQueryTableDataCond* pCond, int32_t* capacity) {
  int64_t rowLen = 0;
  for (int32_t i = 0; i < pCond->numOfCols; ++i) {
    rowLen += pCond->colList[i].bytes;
  }

  // make sure the output SSDataBlock size be less than 2MB.
  const int64_t TWOMB = 2 * 1024 * 1024;
  if ((int64_t)(*capacity) * rowLen > TWOMB) {
    (*capacity) = (int32_t)(TWOMB / rowLen);
  }
}

// init file iterator
334
static int32_t initFilesetIterator(SFilesetIter* pIter, SArray* aDFileSet, STsdbReader* pReader) {
H
Hongze Cheng 已提交
335
  size_t numOfFileset = taosArrayGetSize(aDFileSet);
336

337 338
  pIter->index = ASCENDING_TRAVERSE(pReader->order) ? -1 : numOfFileset;
  pIter->order = pReader->order;
H
Hongze Cheng 已提交
339
  pIter->pFileList = aDFileSet;
340
  pIter->numOfFiles = numOfFileset;
H
Haojun Liao 已提交
341

342 343 344 345
  if (pIter->pLastBlockReader == NULL) {
    pIter->pLastBlockReader = taosMemoryCalloc(1, sizeof(struct SLastBlockReader));
    if (pIter->pLastBlockReader == NULL) {
      int32_t code = TSDB_CODE_OUT_OF_MEMORY;
346
      tsdbError("failed to prepare the last block iterator, since:%s %s", tstrerror(code), pReader->idStr);
347 348
      return code;
    }
349 350
  }

351 352 353 354 355 356 357 358
  SLastBlockReader* pLReader = pIter->pLastBlockReader;
  pLReader->order = pReader->order;
  pLReader->window = pReader->window;
  pLReader->verRange = pReader->verRange;

  pLReader->uid = 0;
  tMergeTreeClose(&pLReader->mergeTree);

359
  if (pLReader->pInfo == NULL) {
360
    // here we ignore the first column, which is always be the primary timestamp column
361 362
    pLReader->pInfo =
        tCreateLastBlockLoadInfo(pReader->pSchema, &pReader->suppInfo.colIds[1], pReader->suppInfo.numOfCols - 1);
H
Haojun Liao 已提交
363 364 365 366
    if (pLReader->pInfo == NULL) {
      tsdbDebug("init fileset iterator failed, code:%s %s", tstrerror(terrno), pReader->idStr);
      return terrno;
    }
367 368
  }

369
  tsdbDebug("init fileset iterator, total files:%d %s", pIter->numOfFiles, pReader->idStr);
H
Haojun Liao 已提交
370 371 372
  return TSDB_CODE_SUCCESS;
}

373
// Advances the iterator to the next file set, in scan order, whose key range overlaps the
// reader's query window, and opens it as pReader->pFileReader.
//
// Returns true when such a file set has been opened. Returns false when the list is exhausted,
// when the remaining file sets lie entirely past the query window, or when a file set cannot be
// opened; the open failure is logged because the bool result cannot carry the error code.
// Before moving on, the last-block load statistics are folded into the reader's cost summary
// and the last-block reader is reset.
static bool filesetIteratorNext(SFilesetIter* pIter, STsdbReader* pReader) {
  bool    asc = ASCENDING_TRAVERSE(pIter->order);
  int32_t step = asc ? 1 : -1;
  pIter->index += step;

  if ((asc && pIter->index >= pIter->numOfFiles) || ((!asc) && pIter->index < 0)) {
    return false;
  }

  SIOCostSummary* pSum = &pReader->cost;
  getLastBlockLoadInfo(pIter->pLastBlockReader->pInfo, &pSum->lastBlockLoad, &pReader->cost.lastBlockLoadTime);

  pIter->pLastBlockReader->uid = 0;
  tMergeTreeClose(&pIter->pLastBlockReader->mergeTree);
  resetLastBlockLoadInfo(pIter->pLastBlockReader->pInfo);

  // check file the time range of coverage
  STimeWindow win = {0};

  while (1) {
    // close the previously opened file set before opening the next candidate
    if (pReader->pFileReader != NULL) {
      tsdbDataFReaderClose(&pReader->pFileReader);
    }

    pReader->status.pCurrentFileset = (SDFileSet*)taosArrayGet(pIter->pFileList, pIter->index);

    int32_t code = tsdbDataFReaderOpen(&pReader->pFileReader, pReader->pTsdb, pReader->status.pCurrentFileset);
    if (code != TSDB_CODE_SUCCESS) {
      tsdbError("%p failed to open fileset at index:%d, code:%s %s", pReader, pIter->index, tstrerror(code),
                pReader->idStr);
      return false;
    }

    pReader->cost.headFileLoad += 1;

    int32_t fid = pReader->status.pCurrentFileset->fid;
    tsdbFidKeyRange(fid, pReader->pTsdb->keepCfg.days, pReader->pTsdb->keepCfg.precision, &win.skey, &win.ekey);

    // current file are no longer overlapped with query time window, ignore remain files
    if ((asc && win.skey > pReader->window.ekey) || (!asc && win.ekey < pReader->window.skey)) {
      tsdbDebug("%p remain files are not qualified for qrange:%" PRId64 "-%" PRId64 ", ignore, %s", pReader,
                pReader->window.skey, pReader->window.ekey, pReader->idStr);
      return false;
    }

    // this file set lies before the window in scan order: skip it and try the next one
    if ((asc && (win.ekey < pReader->window.skey)) || ((!asc) && (win.skey > pReader->window.ekey))) {
      pIter->index += step;
      if ((asc && pIter->index >= pIter->numOfFiles) || ((!asc) && pIter->index < 0)) {
        return false;
      }
      continue;
    }

    tsdbDebug("%p file found fid:%d for qrange:%" PRId64 "-%" PRId64 ", %s", pReader, fid, pReader->window.skey,
              pReader->window.ekey, pReader->idStr);
    return true;
  }
}

433
// Resets the block iterator for a new file set: sets the order, rewinds the index to -1, zeroes
// the block count and empties the block list (creating the list on first use).
// NOTE(review): the taosArrayInit result is not checked here; blockList may stay NULL on OOM.
static void resetDataBlockIterator(SDataBlockIter* pIter, int32_t order) {
  pIter->order = order;
  pIter->index = -1;
  pIter->numOfBlocks = 0;
  if (pIter->blockList == NULL) {
    pIter->blockList = taosArrayInit(4, sizeof(SFileDataBlockInfo));
  } else {
    taosArrayClear(pIter->blockList);
  }
}

L
Liu Jicong 已提交
444
// Releases the iterator's block list. The field is overwritten with taosArrayDestroy's return
// value (the same idiom used for delSkyline/pBlockList in this file) so that no dangling pointer
// is left for a later reset or a second cleanup.
static void cleanupDataBlockIterator(SDataBlockIter* pIter) {
  pIter->blockList = taosArrayDestroy(pIter->blockList);
}
H
Haojun Liao 已提交
445

H
Haojun Liao 已提交
446
// Puts the reader status into its initial state: no current table iterator, and the scan
// starts in the file stage.
static void initReaderStatus(SReaderStatus* pStatus) {
  pStatus->pTableIter = NULL;
  pStatus->loadFromFile = true;
}

451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473
// Creates the reader's output block with one column per queried column and room for
// `capacity` rows.
//
// Returns the new block, or NULL with terrno set when the block cannot be created or sized.
static SSDataBlock* createResBlock(SQueryTableDataCond* pCond, int32_t capacity) {
  SSDataBlock* pResBlock = createDataBlock();
  if (pResBlock == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  for (int32_t i = 0; i < pCond->numOfCols; ++i) {
    SColumnInfoData colInfo = {{0}, 0};
    colInfo.info = pCond->colList[i];
    blockDataAppendColInfo(pResBlock, &colInfo);
  }

  int32_t code = blockDataEnsureCapacity(pResBlock, capacity);
  if (code != TSDB_CODE_SUCCESS) {
    terrno = code;
    // the block already owns its column array (and possibly some column buffers); releasing only
    // the block shell with taosMemoryFree would leak them
    blockDataDestroy(pResBlock);
    return NULL;
  }

  return pResBlock;
}

474 475
// Allocates and initialises a reader for the query described by pCond.
//
// pVnode    vnode that owns the data; the tsdb is picked according to the retention settings
// pCond     query condition (columns, time window, order, type)
// ppReader  receives the new reader on success, NULL on failure
// capacity  requested rows per output block; lowered if a block would exceed the size limit
// idstr     optional identifier used in log messages; copied when not NULL
//
// Returns 0 on success or an error code. On failure the partially built reader is released
// through tsdbReaderClose().
static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, STsdbReader** ppReader, int32_t capacity,
                                const char* idstr) {
  int32_t      code = 0;
  int8_t       level = 0;
  STsdbReader* pReader = (STsdbReader*)taosMemoryCalloc(1, sizeof(*pReader));
  if (pReader == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _end;
  }

  if (VND_IS_TSMA(pVnode)) {
    tsdbDebug("vgId:%d, tsma is selected to query", TD_VID(pVnode));
  }

  initReaderStatus(&pReader->status);

  pReader->pTsdb = getTsdbByRetentions(pVnode, pCond->twindows.skey, pVnode->config.tsdbCfg.retentions, idstr, &level);
  pReader->suid = pCond->suid;
  pReader->order = pCond->order;
  pReader->capacity = capacity;
  pReader->idStr = (idstr != NULL) ? strdup(idstr) : NULL;
  pReader->verRange = getQueryVerRange(pVnode, pCond, level);
  pReader->type = pCond->type;
  pReader->window = updateQueryTimeWindow(pReader->pTsdb, &pCond->twindows);

  ASSERT(pCond->numOfCols > 0);

  limitOutputBufferSize(pCond, &pReader->capacity);

  // allocate buffer in order to load data blocks from file
  SBlockLoadSuppInfo* pSup = &pReader->suppInfo;
  pSup->pColAgg = taosArrayInit(4, sizeof(SColumnDataAgg));
  pSup->plist = taosMemoryCalloc(pCond->numOfCols, POINTER_BYTES);
  if (pSup->pColAgg == NULL || pSup->plist == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _end;
  }

  pSup->tsColAgg.colId = PRIMARYKEY_TIMESTAMP_COL_ID;

  code = tBlockDataCreate(&pReader->status.fileBlockData);
  if (code != TSDB_CODE_SUCCESS) {
    terrno = code;
    goto _end;
  }

  pReader->pResBlock = createResBlock(pCond, pReader->capacity);
  if (pReader->pResBlock == NULL) {
    code = terrno;
    goto _end;
  }

  // the column-id/scratch-buffer setup allocates memory; a failure here used to be ignored,
  // leaving a reader whose colIds/buildBuf could not be used safely
  code = setColumnIdSlotList(pReader, pReader->pResBlock);
  if (code != TSDB_CODE_SUCCESS) {
    terrno = code;
    goto _end;
  }

  *ppReader = pReader;
  return code;

_end:
  tsdbReaderClose(pReader);
  *ppReader = NULL;
  return code;
}
H
Hongze Cheng 已提交
536

H
Haojun Liao 已提交
537
// Reads the block index of the file set opened in pFileReader and appends to pIndexList the
// entries that belong to the queried super table (suid) and to a table present in the reader's
// table map. Each matching table gets an (empty) block index list if it has none yet.
//
// Returns TSDB_CODE_SUCCESS, the error from tsdbReadBlockIdx, or TSDB_CODE_OUT_OF_MEMORY when an
// array cannot be allocated or grown.
static int32_t doLoadBlockIndex(STsdbReader* pReader, SDataFReader* pFileReader, SArray* pIndexList) {
  SArray* aBlockIdx = taosArrayInit(8, sizeof(SBlockIdx));
  if (aBlockIdx == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  int64_t st = taosGetTimestampUs();
  int32_t code = tsdbReadBlockIdx(pFileReader, aBlockIdx);
  if (code != TSDB_CODE_SUCCESS) {
    goto _end;
  }

  size_t num = taosArrayGetSize(aBlockIdx);
  if (num == 0) {
    // nothing to filter; skip the statistics, as an empty index costs nothing worth recording
    goto _end;
  }

  int64_t et1 = taosGetTimestampUs();

  SBlockIdx* pBlockIdx = NULL;
  for (size_t i = 0; i < num; ++i) {
    pBlockIdx = (SBlockIdx*)taosArrayGet(aBlockIdx, i);

    // super table uid check
    if (pBlockIdx->suid != pReader->suid) {
      continue;
    }

    // this block belongs to a table that is not queried.
    void* p = taosHashGet(pReader->status.pTableMap, &pBlockIdx->uid, sizeof(uint64_t));
    if (p == NULL) {
      continue;
    }

    STableBlockScanInfo* pScanInfo = p;
    if (pScanInfo->pBlockList == NULL) {
      pScanInfo->pBlockList = taosArrayInit(4, sizeof(int32_t));
      if (pScanInfo->pBlockList == NULL) {
        code = TSDB_CODE_OUT_OF_MEMORY;
        goto _end;
      }
    }

    if (taosArrayPush(pIndexList, pBlockIdx) == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto _end;
    }
  }

  int64_t et2 = taosGetTimestampUs();
  tsdbDebug("load block index for %d tables completed, elapsed time:%.2f ms, set blockIdx:%.2f ms, size:%.2f Kb %s",
            (int32_t)num, (et1 - st) / 1000.0, (et2 - et1) / 1000.0, num * sizeof(SBlockIdx) / 1024.0, pReader->idStr);

  pReader->cost.headFileLoadTime += (et1 - st) / 1000.0;

_end:
  taosArrayDestroy(aBlockIdx);
  return code;
}
H
Hongze Cheng 已提交
587

588
// Drops the per-file block information of every table in the map (block map data and block
// index list) so that the next file set starts from a clean state. Iterators, delete info and
// lastKey are left untouched.
static void cleanupTableScanInfo(SHashObj* pTableMap) {
  STableBlockScanInfo* px = NULL;
  while ((px = taosHashIterate(pTableMap, px)) != NULL) {
    // reset the index in last block when handing a new file
    tMapDataClear(&px->mapData);
    taosArrayClear(px->pBlockList);
  }
}

602
// Loads the data-block descriptors of every table listed in pIndexList from the current file
// set and records, per table, the indexes of the blocks that overlap both the query time window
// and the query version range.
//
// pBlockNum->numOfBlocks is increased by the number of selected blocks and
// pBlockNum->numOfLastFiles is set to the number of stt files of the file set.
// Returns TSDB_CODE_SUCCESS, the error reported by tsdbReadDataBlk, or TSDB_CODE_OUT_OF_MEMORY.
static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, SBlockNumber* pBlockNum) {
  int32_t numOfQTable = 0;
  size_t  sizeInDisk = 0;
  size_t  numOfTables = taosArrayGetSize(pIndexList);

  int64_t st = taosGetTimestampUs();
  cleanupTableScanInfo(pReader->status.pTableMap);

  for (size_t i = 0; i < numOfTables; ++i) {
    SBlockIdx* pBlockIdx = taosArrayGet(pIndexList, i);

    STableBlockScanInfo* pScanInfo = taosHashGet(pReader->status.pTableMap, &pBlockIdx->uid, sizeof(int64_t));

    tMapDataReset(&pScanInfo->mapData);
    int32_t code = tsdbReadDataBlk(pReader->pFileReader, pBlockIdx, &pScanInfo->mapData);
    if (code != TSDB_CODE_SUCCESS) {
      // a failed read leaves mapData without valid items; do not scan it
      return code;
    }

    sizeInDisk += pScanInfo->mapData.nData;
    for (int32_t j = 0; j < pScanInfo->mapData.nItem; ++j) {
      SDataBlk block = {0};
      tMapDataGetItemByIdx(&pScanInfo->mapData, j, &block, tGetDataBlk);

      // 1. time range check
      if (block.minKey.ts > pReader->window.ekey || block.maxKey.ts < pReader->window.skey) {
        continue;
      }

      // 2. version range check
      if (block.minVer > pReader->verRange.maxVer || block.maxVer < pReader->verRange.minVer) {
        continue;
      }

      void* p = taosArrayPush(pScanInfo->pBlockList, &j);
      if (p == NULL) {
        tMapDataClear(&pScanInfo->mapData);
        return TSDB_CODE_OUT_OF_MEMORY;
      }

      pBlockNum->numOfBlocks += 1;
    }

    if (pScanInfo->pBlockList != NULL && taosArrayGetSize(pScanInfo->pBlockList) > 0) {
      numOfQTable += 1;
    }
  }

  pBlockNum->numOfLastFiles = pReader->pFileReader->pSet->nSttF;
  int32_t total = pBlockNum->numOfLastFiles + pBlockNum->numOfBlocks;

  double el = (taosGetTimestampUs() - st) / 1000.0;
  // numOfTables is a size_t: print it through an explicit cast instead of %ld, which is only
  // correct where long and size_t have the same width
  tsdbDebug(
      "load block of %d tables completed, blocks:%d in %d tables, last-files:%d, block-info-size:%.2f Kb, elapsed "
      "time:%.2f ms %s",
      (int32_t)numOfTables, pBlockNum->numOfBlocks, numOfQTable, pBlockNum->numOfLastFiles, sizeInDisk / 1024.0, el,
      pReader->idStr);

  pReader->cost.numOfBlocks += total;
  pReader->cost.headFileLoadTime += el;

  return TSDB_CODE_SUCCESS;
}
H
Hongze Cheng 已提交
662

663
// Marks the current file block as fully consumed and moves lastKey one step past maxKey in the
// scan direction (+1 for ascending, -1 for descending).
static void setBlockAllDumped(SFileBlockDumpInfo* pDumpInfo, int64_t maxKey, int32_t order) {
  int32_t step = ASCENDING_TRAVERSE(order) ? 1 : -1;
  pDumpInfo->allDumped = true;
  pDumpInfo->lastKey = maxKey + step;
}

669 670
// Writes one column value into row rowIndex of pColInfoData.
//
// Variable-length values are first assembled (length header + payload) in the per-column scratch
// buffer pSup->buildBuf[colIndex] and then appended; a missing value is appended as NULL.
// Fixed-length values are appended directly from pColVal->value.
static void doCopyColVal(SColumnInfoData* pColInfoData, int32_t rowIndex, int32_t colIndex, SColVal* pColVal,
                         SBlockLoadSuppInfo* pSup) {
  if (IS_VAR_DATA_TYPE(pColVal->type)) {
    if (!COL_VAL_IS_VALUE(pColVal)) {
      colDataAppendNULL(pColInfoData, rowIndex);
    } else {
      // check the payload size before anything is written into the scratch buffer
      ASSERT(pColVal->value.nData <= pColInfoData->info.bytes);
      varDataSetLen(pSup->buildBuf[colIndex], pColVal->value.nData);
      memcpy(varDataVal(pSup->buildBuf[colIndex]), pColVal->value.pData, pColVal->value.nData);
      colDataAppend(pColInfoData, rowIndex, pSup->buildBuf[colIndex], false);
    }
  } else {
    colDataAppend(pColInfoData, rowIndex, (const char*)&pColVal->value, !COL_VAL_IS_VALUE(pColVal));
  }
}

685
// Returns the block-info entry at the iterator's current index, or NULL when the block list is
// empty.
// NOTE(review): the index itself is not range-checked here (it is -1 right after a reset) --
// confirm callers advance the iterator before calling.
static SFileDataBlockInfo* getCurrentBlockInfo(SDataBlockIter* pBlockIter) {
  if (taosArrayGetSize(pBlockIter->blockList) == 0) {
    ASSERT(pBlockIter->numOfBlocks == taosArrayGetSize(pBlockIter->blockList));
    return NULL;
  }

  SFileDataBlockInfo* pBlockInfo = taosArrayGet(pBlockIter->blockList, pBlockIter->index);
  return pBlockInfo;
}

H
Hongze Cheng 已提交
695
// Returns a pointer to the SDataBlk copy embedded in the iterator (never NULL for a valid iterator).
static SDataBlk* getCurrentBlock(SDataBlockIter* pBlockIter) { return &pBlockIter->block; }
696

H
Haojun Liao 已提交
697 698 699 700 701 702 703 704 705 706 707 708 709 710 711 712 713 714 715 716 717 718 719 720 721 722 723 724 725 726 727 728 729 730 731 732 733 734 735 736 737 738 739 740 741 742 743 744 745 746 747 748 749 750 751 752 753 754 755 756 757 758 759 760 761 762 763 764 765 766 767 768 769
// Binary search over `num` timestamps stored in pValue (viewed as a TSKEY array).
//
// pValue  timestamp array; the comparisons below assume it is sorted ascending for
//         TSDB_ORDER_ASC -- presumably descending for TSDB_ORDER_DESC, TODO confirm with callers
// num     number of timestamps; an empty array yields -1
// key     timestamp to locate
// order   TSDB_ORDER_ASC or TSDB_ORDER_DESC (asserted)
//
// Returns the index of `key` when it is found; otherwise the first/last index when the key lies
// at or beyond the corresponding end of the list, or -1 when no position qualifies.
int32_t binarySearchForTs(char* pValue, int num, TSKEY key, int order) {
  int32_t midPos = -1;
  int32_t numOfRows;

  ASSERT(order == TSDB_ORDER_ASC || order == TSDB_ORDER_DESC);

  // with no element, lastPos would be -1 and keyList[lastPos] an out-of-bounds read
  if (num <= 0) {
    return -1;
  }

  TSKEY*  keyList = (TSKEY*)pValue;
  int32_t firstPos = 0;
  int32_t lastPos = num - 1;

  if (order == TSDB_ORDER_DESC) {
    // find the first position which is smaller than the key
    while (1) {
      if (key >= keyList[firstPos]) return firstPos;
      if (key == keyList[lastPos]) return lastPos;

      if (key < keyList[lastPos]) {
        lastPos += 1;
        if (lastPos >= num) {
          return -1;
        } else {
          return lastPos;
        }
      }

      numOfRows = lastPos - firstPos + 1;
      midPos = (numOfRows >> 1) + firstPos;

      if (key < keyList[midPos]) {
        firstPos = midPos + 1;
      } else if (key > keyList[midPos]) {
        lastPos = midPos - 1;
      } else {
        break;
      }
    }

  } else {
    // find the first position which is bigger than the key
    while (1) {
      if (key <= keyList[firstPos]) return firstPos;
      if (key == keyList[lastPos]) return lastPos;

      if (key > keyList[lastPos]) {
        lastPos = lastPos + 1;
        if (lastPos >= num)
          return -1;
        else
          return lastPos;
      }

      numOfRows = lastPos - firstPos + 1;
      midPos = (numOfRows >> 1u) + firstPos;

      if (key < keyList[midPos]) {
        lastPos = midPos - 1;
      } else if (key > keyList[midPos]) {
        firstPos = midPos + 1;
      } else {
        break;
      }
    }
  }

  return midPos;
}

/**
 * Binary search for `key` in keyList, starting from row `pos`.
 *
 * TSDB_ORDER_ASC : the range [pos, num - 1] is searched; the result is a row whose key is <= `key`
 *                  (the row just before the first greater key), or -1 if key < keyList[pos].
 * TSDB_ORDER_DESC: the range [0, pos] is searched; the result is a row whose key is >= `key`
 *                  (the row just after the last smaller key), or -1 if key > keyList[pos].
 * An exact hit on a probe returns that probe index immediately.
 * The narrowing steps presume keyList is sorted in ascending order -- TODO confirm against callers.
 */
static int doBinarySearchKey(TSKEY* keyList, int num, int pos, TSKEY key, int order) {
  // sanity check of the start position
  assert(pos >= 0 && pos < num);
  assert(num > 0);

  int anchor = pos;  // end of the range that starts at the caller supplied row

  if (order == TSDB_ORDER_ASC) {
    int bound = num - 1;  // opposite end of the range

    if (key < keyList[pos]) {
      return -1;
    }

    for (;;) {
      // can we answer right away?
      if (key >= keyList[bound]) {
        return bound;
      }
      if (key <= keyList[anchor] || bound - anchor <= 1) {
        return anchor;
      }

      // shrink the range around the probe
      const int   probe = anchor + (bound - anchor + 1) / 2;
      const TSKEY probeKey = keyList[probe];
      if (probeKey > key) {
        bound = probe;
      } else if (probeKey < key) {
        anchor = probe;
      } else {
        return probe;
      }
    }
  }

  // TSDB_ORDER_DESC: the range grows downwards from pos to row 0
  int bound = 0;

  if (key > keyList[pos]) {
    return -1;
  }

  for (;;) {
    // can we answer right away?
    if (key <= keyList[bound]) {
      return bound;
    }
    if (key >= keyList[anchor] || anchor - bound <= 1) {
      return anchor;
    }

    // shrink the range around the probe
    const int   probe = anchor - (anchor - bound + 1) / 2;
    const TSKEY probeKey = keyList[probe];
    if (probeKey < key) {
      bound = probe;
    } else if (probeKey > key) {
      anchor = probe;
    } else {
      return probe;
    }
  }
}

/**
 * Find the last row (in scan direction) of the block that still belongs to the query window.
 *
 * When the window reaches to or beyond the block border in scan direction, the border row is
 * returned directly (nRow - 1 for ascending, 0 for descending). Otherwise the position is
 * determined by doBinarySearchKey() starting at `pos`, whose -1 result is passed through.
 */
int32_t getEndPosInDataBlock(STsdbReader* pReader, SBlockData* pBlockData, SDataBlk* pBlock, int32_t pos) {
  const bool asc = ASCENDING_TRAVERSE(pReader->order);

  if (asc) {
    if (pReader->window.ekey >= pBlock->maxKey.ts) {
      return pBlock->nRow - 1;
    }
  } else {
    if (pReader->window.skey <= pBlock->minKey.ts) {
      return 0;
    }
  }

  // the window ends inside the block: search for the border key
  int64_t borderKey = asc ? pReader->window.ekey : pReader->window.skey;
  int32_t endPos = doBinarySearchKey(pBlockData->aTSKEY, pBlock->nRow, pos, borderKey, pReader->order);
  return endPos;
}

831
/**
 * Copy rows of the currently loaded file block (pReader->status.fileBlockData) into pReader->pResBlock.
 *
 * Steps:
 *  1. If the dump cursor still sits on the first row in scan direction and the query window does not
 *     cover that row, move the cursor to the first row inside the window.
 *  2. Determine the last row to copy (getEndPosInDataBlock) and clamp the row count to pReader->capacity.
 *  3. Copy the timestamp column, then every requested column; columns that are missing in the file
 *     block are filled with NULL.
 *  4. Advance the dump cursor and mark the block as completely dumped when nothing is left.
 *
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_INVALID_PARA when no start row can be located in the block.
 */
static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo) {
  SReaderStatus*  pStatus = &pReader->status;
  SDataBlockIter* pBlockIter = &pStatus->blockIter;

  SBlockData*         pBlockData = &pStatus->fileBlockData;
  SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(pBlockIter);
  SDataBlk*           pBlock = getCurrentBlock(pBlockIter);
  SSDataBlock*        pResBlock = pReader->pResBlock;
  int32_t             numOfOutputCols = blockDataGetNumOfCols(pResBlock);

  SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo;
  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;

  SColVal cv = {0};
  int64_t st = taosGetTimestampUs();
  bool    asc = ASCENDING_TRAVERSE(pReader->order);
  int32_t step = asc ? 1 : -1;

  // the start row only needs to be searched while the cursor is on the first row in scan direction
  // and the query window begins somewhere inside the block
  bool atFirstRow = asc ? (pDumpInfo->rowIndex == 0) : (pDumpInfo->rowIndex == pBlock->nRow - 1);
  bool windowCoversFirstRow =
      asc ? (pReader->window.skey <= pBlock->minKey.ts) : (pReader->window.ekey >= pBlock->maxKey.ts);

  if (atFirstRow && !windowCoversFirstRow) {
    // search from the opposite end, in the reversed order, to land on the first qualified row
    int32_t pos = asc ? pBlock->nRow - 1 : 0;
    int32_t order = asc ? TSDB_ORDER_DESC : TSDB_ORDER_ASC;
    int64_t key = asc ? pReader->window.skey : pReader->window.ekey;

    int32_t startIndex = doBinarySearchKey(pBlockData->aTSKEY, pBlock->nRow, pos, key, order);
    if (startIndex < 0) {
      // -1 must never become the row cursor: it would be used as an array index/memcpy source below
      tsdbError("%p failed to locate the start position in current block, global index:%d, table index:%d, brange:%" PRId64
                "-%" PRId64 ", minVer:%" PRId64 ", maxVer:%" PRId64 " %s",
                pReader, pBlockIter->index, pBlockInfo->tbBlockIdx, pBlock->minKey.ts, pBlock->maxKey.ts,
                pBlock->minVer, pBlock->maxVer, pReader->idStr);
      return TSDB_CODE_INVALID_PARA;
    }

    pDumpInfo->rowIndex = startIndex;
  }

  // time window check
  int32_t endIndex = getEndPosInDataBlock(pReader, pBlockData, pBlock, pDumpInfo->rowIndex);
  if (endIndex == -1) {
    setBlockAllDumped(pDumpInfo, pReader->window.ekey, pReader->order);
    return TSDB_CODE_SUCCESS;
  }

  // make endIndex exclusive in scan direction, then count the rows in between
  endIndex += step;
  int32_t remain = asc ? (endIndex - pDumpInfo->rowIndex) : (pDumpInfo->rowIndex - endIndex);
  if (remain > pReader->capacity) {  // output buffer check
    remain = pReader->capacity;
  }

  int32_t rowIndex = 0;

  int32_t          i = 0;
  SColumnInfoData* pColData = taosArrayGet(pResBlock->pDataBlock, i);
  if (pColData->info.colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
    if (asc) {
      // rows are contiguous in scan direction, copy them in one go
      memcpy(pColData->pData, &pBlockData->aTSKEY[pDumpInfo->rowIndex], remain * sizeof(int64_t));
    } else {
      for (int32_t j = pDumpInfo->rowIndex; rowIndex < remain; j += step) {
        colDataAppendInt64(pColData, rowIndex++, &pBlockData->aTSKEY[j]);
      }
    }

    i += 1;
  }

  // walk the output columns (i) and the file block columns (colIndex) in parallel, matching by column id
  int32_t colIndex = 0;
  int32_t num = taosArrayGetSize(pBlockData->aIdx);
  while (i < numOfOutputCols && colIndex < num) {
    rowIndex = 0;
    pColData = taosArrayGet(pResBlock->pDataBlock, i);

    SColData* pData = tBlockDataGetColDataByIdx(pBlockData, colIndex);
    if (pData->cid < pColData->info.colId) {
      colIndex += 1;
    } else if (pData->cid == pColData->info.colId) {
      if (pData->flag == HAS_NONE || pData->flag == HAS_NULL || pData->flag == (HAS_NULL | HAS_NONE)) {
        colDataAppendNNULL(pColData, 0, remain);
      } else {
        if (IS_NUMERIC_TYPE(pColData->info.type) && asc) {
          uint8_t* p = pData->pData + tDataTypes[pData->type].bytes * pDumpInfo->rowIndex;
          memcpy(pColData->pData, p, remain * tDataTypes[pData->type].bytes);

          // null value exists, check one-by-one
          if (pData->flag != HAS_VALUE) {
            for (int32_t j = pDumpInfo->rowIndex; rowIndex < remain; j += step, rowIndex++) {
              uint8_t v = tColDataGetBitValue(pData, j);
              if (v == 0 || v == 1) {
                colDataSetNull_f(pColData->nullbitmap, rowIndex);
              }
            }
          }
        } else {
          for (int32_t j = pDumpInfo->rowIndex; rowIndex < remain; j += step) {
            tColDataGetValue(pData, j, &cv);
            doCopyColVal(pColData, rowIndex++, i, &cv, pSupInfo);
          }
        }
      }

      colIndex += 1;
      i += 1;
    } else {  // the specified column does not exist in file block, fill with null data
      colDataAppendNNULL(pColData, 0, remain);
      i += 1;
    }
  }

  // fill the mis-matched columns with null value
  while (i < numOfOutputCols) {
    pColData = taosArrayGet(pResBlock->pDataBlock, i);
    colDataAppendNNULL(pColData, 0, remain);
    i += 1;
  }

  pResBlock->info.rows = remain;
  pDumpInfo->rowIndex += step * remain;

  // check if current block are all handled
  if (pDumpInfo->rowIndex >= 0 && pDumpInfo->rowIndex < pBlock->nRow) {
    int64_t ts = pBlockData->aTSKEY[pDumpInfo->rowIndex];
    if (outOfTimeWindow(ts, &pReader->window)) {  // the remain data has out of query time window, ignore current block
      setBlockAllDumped(pDumpInfo, ts, pReader->order);
    }
  } else {
    int64_t ts = asc ? pBlock->maxKey.ts : pBlock->minKey.ts;
    setBlockAllDumped(pDumpInfo, ts, pReader->order);
  }

  double elapsedTime = (taosGetTimestampUs() - st) / 1000.0;
  pReader->cost.blockLoadTime += elapsedTime;

  int32_t unDumpedRows = asc ? pBlock->nRow - pDumpInfo->rowIndex : pDumpInfo->rowIndex + 1;
  tsdbDebug("%p copy file block to sdatablock, global index:%d, table index:%d, brange:%" PRId64 "-%" PRId64
                ", rows:%d, remain:%d, minVer:%" PRId64 ", maxVer:%" PRId64 ", elapsed time:%.2f ms, %s",
            pReader, pBlockIter->index, pBlockInfo->tbBlockIdx, pBlock->minKey.ts, pBlock->maxKey.ts, remain,
            unDumpedRows, pBlock->minVer, pBlock->maxVer, elapsedTime, pReader->idStr);

  return TSDB_CODE_SUCCESS;
}

966 967
/**
 * Read the file block the iterator currently points at into pBlockData.
 *
 * pBlockData is reset and re-initialised for table `uid` (columns taken from pReader->suppInfo,
 * skipping the first column id) before tsdbReadDataBlock() is called. On success the elapsed time
 * is added to pReader->cost.blockLoadTime and the dump state is flagged as "not all dumped".
 *
 * @return TSDB_CODE_SUCCESS, or the error code of tBlockDataInit()/tsdbReadDataBlock().
 */
static int32_t doLoadFileBlockData(STsdbReader* pReader, SDataBlockIter* pBlockIter, SBlockData* pBlockData,
                                   uint64_t uid) {
  const int64_t startUs = taosGetTimestampUs();

  tBlockDataReset(pBlockData);

  TABLEID tableId = {.suid = pReader->suid, .uid = uid};
  int32_t code = tBlockDataInit(pBlockData, &tableId, pReader->pSchema, &pReader->suppInfo.colIds[1],
                                pReader->suppInfo.numOfCols - 1);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  SFileDataBlockInfo* pCurInfo = getCurrentBlockInfo(pBlockIter);
  ASSERT(pCurInfo != NULL);

  SDataBlk* pCurBlock = getCurrentBlock(pBlockIter);

  code = tsdbReadDataBlock(pReader->pFileReader, pCurBlock, pBlockData);
  if (code != TSDB_CODE_SUCCESS) {
    tsdbError("%p error occurs in loading file block, global index:%d, table index:%d, brange:%" PRId64 "-%" PRId64
                  ", rows:%d, code:%s %s",
              pReader, pBlockIter->index, pCurInfo->tbBlockIdx, pCurBlock->minKey.ts, pCurBlock->maxKey.ts,
              pCurBlock->nRow, tstrerror(code), pReader->idStr);
    return code;
  }

  const double loadMs = (taosGetTimestampUs() - startUs) / 1000.0;

  tsdbDebug("%p load file block into buffer, global index:%d, index in table block list:%d, brange:%" PRId64 "-%" PRId64
                ", rows:%d, minVer:%" PRId64 ", maxVer:%" PRId64 ", elapsed time:%.2f ms, %s",
            pReader, pBlockIter->index, pCurInfo->tbBlockIdx, pCurBlock->minKey.ts, pCurBlock->maxKey.ts,
            pCurBlock->nRow, pCurBlock->minVer, pCurBlock->maxVer, loadMs, pReader->idStr);

  pReader->cost.blockLoadTime += loadMs;

  // a freshly loaded block has not been dumped yet
  pReader->status.fBlockDumpInfo.allDumped = false;

  return TSDB_CODE_SUCCESS;
}
H
Hongze Cheng 已提交
1004

H
Haojun Liao 已提交
1005 1006 1007
/**
 * Release every buffer owned by the block order supporter: the two per-table int arrays, each
 * per-table wrapper array and finally the array of wrapper pointers.
 *
 * Safe to call on a partially initialised supporter (any member may be NULL) and safe to call more
 * than once, because the per-table loop is skipped when the pointer array itself is gone.
 */
static void cleanupBlockOrderSupporter(SBlockOrderSupporter* pSup) {
  if (pSup == NULL) {
    return;
  }

  taosMemoryFreeClear(pSup->numOfBlocksPerTable);
  taosMemoryFreeClear(pSup->indexPerTable);

  // pDataBlockInfo is NULL when its allocation failed or after a previous cleanup; indexing it
  // with numOfTables > 0 would dereference NULL
  if (pSup->pDataBlockInfo != NULL) {
    for (int32_t i = 0; i < pSup->numOfTables; ++i) {
      SBlockOrderWrapper* pBlockInfo = pSup->pDataBlockInfo[i];
      taosMemoryFreeClear(pBlockInfo);
    }

    taosMemoryFreeClear(pSup->pDataBlockInfo);
  }
}
H
Hongze Cheng 已提交
1016

H
Haojun Liao 已提交
1017 1018
/**
 * Allocate the three zero-filled per-table arrays of the block order supporter, each with
 * numOfTables entries (numOfTables must be >= 1, asserted).
 *
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY after releasing whatever was allocated.
 */
static int32_t initBlockOrderSupporter(SBlockOrderSupporter* pSup, int32_t numOfTables) {
  ASSERT(numOfTables >= 1);

  pSup->numOfBlocksPerTable = taosMemoryCalloc(1, sizeof(int32_t) * numOfTables);
  pSup->indexPerTable = taosMemoryCalloc(1, sizeof(int32_t) * numOfTables);
  pSup->pDataBlockInfo = taosMemoryCalloc(1, POINTER_BYTES * numOfTables);

  const bool allAllocated =
      (pSup->numOfBlocksPerTable != NULL) && (pSup->indexPerTable != NULL) && (pSup->pDataBlockInfo != NULL);
  if (allAllocated) {
    return TSDB_CODE_SUCCESS;
  }

  // at least one allocation failed: give back the others
  cleanupBlockOrderSupporter(pSup);
  return TSDB_CODE_OUT_OF_MEMORY;
}
H
Hongze Cheng 已提交
1031

H
Haojun Liao 已提交
1032
static int32_t fileDataBlockOrderCompar(const void* pLeft, const void* pRight, void* param) {
1033
  int32_t leftIndex = *(int32_t*)pLeft;
H
Haojun Liao 已提交
1034
  int32_t rightIndex = *(int32_t*)pRight;
H
Hongze Cheng 已提交
1035

H
Haojun Liao 已提交
1036
  SBlockOrderSupporter* pSupporter = (SBlockOrderSupporter*)param;
H
Hongze Cheng 已提交
1037

H
Haojun Liao 已提交
1038 1039
  int32_t leftTableBlockIndex = pSupporter->indexPerTable[leftIndex];
  int32_t rightTableBlockIndex = pSupporter->indexPerTable[rightIndex];
H
Hongze Cheng 已提交
1040

H
Haojun Liao 已提交
1041 1042 1043 1044 1045 1046 1047
  if (leftTableBlockIndex > pSupporter->numOfBlocksPerTable[leftIndex]) {
    /* left block is empty */
    return 1;
  } else if (rightTableBlockIndex > pSupporter->numOfBlocksPerTable[rightIndex]) {
    /* right block is empty */
    return -1;
  }
H
Hongze Cheng 已提交
1048

1049
  SBlockOrderWrapper* pLeftBlock = &pSupporter->pDataBlockInfo[leftIndex][leftTableBlockIndex];
H
Haojun Liao 已提交
1050
  SBlockOrderWrapper* pRightBlock = &pSupporter->pDataBlockInfo[rightIndex][rightTableBlockIndex];
H
Hongze Cheng 已提交
1051

1052 1053 1054
  return pLeftBlock->offset > pRightBlock->offset ? 1 : -1;
}

H
Haojun Liao 已提交
1055
/**
 * Decode the block descriptor that belongs to the iterator's current position into pBlockIter->block.
 *
 * The current SFileDataBlockInfo names a table (uid) and an index into that table's block list; the
 * block list entry in turn is the index of the descriptor inside the table's mapData.
 *
 * @return TSDB_CODE_SUCCESS (also when the iterator has no current block info), or
 *         TSDB_CODE_INVALID_PARA when the table or its block list entry cannot be found.
 */
static int32_t doSetCurrentBlock(SDataBlockIter* pBlockIter, const char* idStr) {
  SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(pBlockIter);
  if (pBlockInfo == NULL) {
    return TSDB_CODE_SUCCESS;
  }

  STableBlockScanInfo* pScanInfo = taosHashGet(pBlockIter->pTableMap, &pBlockInfo->uid, sizeof(pBlockInfo->uid));
  if (pScanInfo == NULL) {
    tsdbError("failed to locate the uid:%" PRIu64 " in query table uid list, %s", pBlockInfo->uid, idStr);
    return TSDB_CODE_INVALID_PARA;
  }

  int32_t* mapDataIndex = taosArrayGet(pScanInfo->pBlockList, pBlockInfo->tbBlockIdx);
  if (mapDataIndex == NULL) {
    // no such entry in the table block list: do not dereference the lookup result
    tsdbError("failed to locate the block index:%d of uid:%" PRIu64 " in table block list, %s",
              (int32_t)pBlockInfo->tbBlockIdx, pBlockInfo->uid, idStr);
    return TSDB_CODE_INVALID_PARA;
  }

  tMapDataGetItemByIdx(&pScanInfo->mapData, *mapDataIndex, &pBlockIter->block, tGetDataBlk);
  return TSDB_CODE_SUCCESS;
}
H
Hongze Cheng 已提交
1074

1075
/**
 * Build the block access list of the iterator for the current file.
 *
 * Every table in pReader->status.pTableMap that has a non-empty block list contributes its blocks.
 * With a single contributing table the blocks are appended in list order; with several tables a
 * multiway merge tree orders all blocks by their file offset (see fileDataBlockOrderCompar).
 * Afterwards the iterator is positioned on the first block in scan direction (index 0 for
 * ascending, numOfBlocks - 1 for descending) and that block's descriptor is decoded.
 *
 * @param numOfBlocks  total number of blocks of all tables; asserted to equal the number collected
 * @return TSDB_CODE_SUCCESS, an out-of-memory code, or the error of doSetCurrentBlock().
 */
static int32_t initBlockIterator(STsdbReader* pReader, SDataBlockIter* pBlockIter, int32_t numOfBlocks) {
  bool asc = ASCENDING_TRAVERSE(pReader->order);

  pBlockIter->numOfBlocks = numOfBlocks;
  taosArrayClear(pBlockIter->blockList);
  pBlockIter->pTableMap = pReader->status.pTableMap;

  // access data blocks according to the offset of each block in asc/desc order.
  int32_t numOfTables = (int32_t)taosHashGetSize(pReader->status.pTableMap);

  int64_t st = taosGetTimestampUs();

  SBlockOrderSupporter sup = {0};
  int32_t              code = initBlockOrderSupporter(&sup, numOfTables);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  // collect (uid, file offset) of every block, one wrapper array per contributing table
  int32_t cnt = 0;
  void*   ptr = NULL;
  while (1) {
    ptr = taosHashIterate(pReader->status.pTableMap, ptr);
    if (ptr == NULL) {
      break;
    }

    STableBlockScanInfo* pTableScanInfo = (STableBlockScanInfo*)ptr;
    if (pTableScanInfo->pBlockList == NULL || taosArrayGetSize(pTableScanInfo->pBlockList) == 0) {
      continue;
    }

    size_t num = taosArrayGetSize(pTableScanInfo->pBlockList);
    sup.numOfBlocksPerTable[sup.numOfTables] = (int32_t)num;

    char* buf = taosMemoryMalloc(sizeof(SBlockOrderWrapper) * num);
    if (buf == NULL) {
      // NOTE(review): this returns while a taosHashIterate() cursor is still live -- confirm whether
      // the hash iterator has to be cancelled explicitly before leaving the loop early.
      cleanupBlockOrderSupporter(&sup);
      return TSDB_CODE_TDB_OUT_OF_MEMORY;
    }

    sup.pDataBlockInfo[sup.numOfTables] = (SBlockOrderWrapper*)buf;
    SDataBlk block = {0};
    for (size_t k = 0; k < num; ++k) {
      SBlockOrderWrapper wrapper = {0};

      int32_t* mapDataIndex = taosArrayGet(pTableScanInfo->pBlockList, k);
      tMapDataGetItemByIdx(&pTableScanInfo->mapData, *mapDataIndex, &block, tGetDataBlk);

      wrapper.uid = pTableScanInfo->uid;
      wrapper.offset = block.aSubBlock[0].offset;

      sup.pDataBlockInfo[sup.numOfTables][k] = wrapper;
      cnt++;
    }

    sup.numOfTables += 1;
  }

  ASSERT(numOfBlocks == cnt);

  // since there is only one table qualified, blocks are not sorted
  if (sup.numOfTables == 1) {
    for (int32_t i = 0; i < numOfBlocks; ++i) {
      SFileDataBlockInfo blockInfo = {.uid = sup.pDataBlockInfo[0][i].uid, .tbBlockIdx = i};
      if (taosArrayPush(pBlockIter->blockList, &blockInfo) == NULL) {
        cleanupBlockOrderSupporter(&sup);
        return TSDB_CODE_TDB_OUT_OF_MEMORY;
      }
    }

    int64_t et = taosGetTimestampUs();
    tsdbDebug("%p create blocks info struct completed for one table, %d blocks not sorted, elapsed time:%.2f ms %s",
              pReader, numOfBlocks, (et - st) / 1000.0, pReader->idStr);

    pBlockIter->index = asc ? 0 : (numOfBlocks - 1);
    cleanupBlockOrderSupporter(&sup);

    // report a failed lookup of the first block instead of silently claiming success
    return doSetCurrentBlock(pBlockIter, pReader->idStr);
  }

  tsdbDebug("%p create data blocks info struct completed, %d blocks in %d tables %s", pReader, cnt, sup.numOfTables,
            pReader->idStr);

  ASSERT(cnt <= numOfBlocks && sup.numOfTables <= numOfTables);

  // keep the full-width status: narrowing it to uint8_t could turn an error code into 0
  SMultiwayMergeTreeInfo* pTree = NULL;
  int32_t                 ret = tMergeTreeCreate(&pTree, sup.numOfTables, &sup, fileDataBlockOrderCompar);
  if (ret != TSDB_CODE_SUCCESS) {
    cleanupBlockOrderSupporter(&sup);
    return TSDB_CODE_TDB_OUT_OF_MEMORY;
  }

  int32_t numOfTotal = 0;
  while (numOfTotal < cnt) {
    int32_t pos = tMergeTreeGetChosenIndex(pTree);
    int32_t index = sup.indexPerTable[pos]++;

    SFileDataBlockInfo blockInfo = {.uid = sup.pDataBlockInfo[pos][index].uid, .tbBlockIdx = index};
    if (taosArrayPush(pBlockIter->blockList, &blockInfo) == NULL) {
      cleanupBlockOrderSupporter(&sup);
      taosMemoryFree(pTree);
      return TSDB_CODE_TDB_OUT_OF_MEMORY;
    }

    // set data block index overflow, in order to disable the offset comparator
    if (sup.indexPerTable[pos] >= sup.numOfBlocksPerTable[pos]) {
      sup.indexPerTable[pos] = sup.numOfBlocksPerTable[pos] + 1;
    }

    numOfTotal += 1;
    tMergeTreeAdjust(pTree, tMergeTreeGetAdjustIndex(pTree));
  }

  int64_t et = taosGetTimestampUs();
  tsdbDebug("%p %d data blocks access order completed, elapsed time:%.2f ms %s", pReader, numOfBlocks,
            (et - st) / 1000.0, pReader->idStr);
  cleanupBlockOrderSupporter(&sup);
  taosMemoryFree(pTree);

  pBlockIter->index = asc ? 0 : (numOfBlocks - 1);

  // report a failed lookup of the first block instead of silently claiming success
  return doSetCurrentBlock(pBlockIter, pReader->idStr);
}
H
Hongze Cheng 已提交
1192

H
Haojun Liao 已提交
1193
/**
 * Advance the iterator by one block in scan direction and decode the new current block.
 *
 * @return false when the iterator already stands on the last block in scan direction (nothing is
 *         changed in that case), true otherwise.
 */
static bool blockIteratorNext(SDataBlockIter* pBlockIter, const char* idStr) {
  if (ASCENDING_TRAVERSE(pBlockIter->order)) {
    if (pBlockIter->index >= pBlockIter->numOfBlocks - 1) {
      return false;
    }
    pBlockIter->index += 1;
  } else {
    if (pBlockIter->index <= 0) {
      return false;
    }
    pBlockIter->index -= 1;
  }

  doSetCurrentBlock(pBlockIter, idStr);
  return true;
}

1207 1208 1209
/**
 * This is an two rectangles overlap cases.
 */
H
Hongze Cheng 已提交
1210
static int32_t dataBlockPartiallyRequired(STimeWindow* pWindow, SVersionRange* pVerRange, SDataBlk* pBlock) {
1211 1212
  return (pWindow->ekey < pBlock->maxKey.ts && pWindow->ekey >= pBlock->minKey.ts) ||
         (pWindow->skey > pBlock->minKey.ts && pWindow->skey <= pBlock->maxKey.ts) ||
H
Hongze Cheng 已提交
1213 1214
         (pVerRange->minVer > pBlock->minVer && pVerRange->minVer <= pBlock->maxVer) ||
         (pVerRange->maxVer < pBlock->maxVer && pVerRange->maxVer >= pBlock->minVer);
H
Haojun Liao 已提交
1215
}
H
Hongze Cheng 已提交
1216

H
Haojun Liao 已提交
1217
/**
 * Decode the descriptor of the block that follows pBlockInfo in scan direction within the same table.
 *
 * @param nextIndex  out: index of the neighbor in the table's block list; only written when a
 *                   neighbor exists
 * @return a heap allocated SDataBlk that the caller must release with taosMemoryFree(), or NULL when
 *         there is no neighbor in scan direction. NULL is also returned when the descriptor cannot be
 *         allocated or looked up, so callers cannot tell those cases apart.
 */
static SDataBlk* getNeighborBlockOfSameTable(SFileDataBlockInfo* pBlockInfo, STableBlockScanInfo* pTableBlockScanInfo,
                                             int32_t* nextIndex, int32_t order) {
  bool    asc = ASCENDING_TRAVERSE(order);
  int64_t numOfBlocks = (int64_t)taosArrayGetSize(pTableBlockScanInfo->pBlockList);

  // do the range check in signed arithmetic: "size - 1" on the unsigned size wraps for an empty list
  int64_t current = (int64_t)pBlockInfo->tbBlockIdx;
  int64_t neighbor = current + (asc ? 1 : -1);
  if (current < 0 || neighbor < 0 || neighbor >= numOfBlocks) {
    return NULL;
  }

  int32_t* indexInMapdata = taosArrayGet(pTableBlockScanInfo->pBlockList, (size_t)neighbor);
  if (indexInMapdata == NULL) {
    return NULL;
  }

  SDataBlk* pBlock = taosMemoryCalloc(1, sizeof(SDataBlk));
  if (pBlock == NULL) {
    // out of memory: decoding into a NULL descriptor would crash
    return NULL;
  }

  *nextIndex = (int32_t)neighbor;
  tMapDataGetItemByIdx(&pTableBlockScanInfo->mapData, *indexInMapdata, pBlock, tGetDataBlk);
  return pBlock;
}

/**
 * Search the iterator's block list, from the current index onwards in scan direction, for the entry
 * with the same uid and table block index as pFBlockInfo.
 *
 * @return the position in the block list; a miss trips ASSERT(0) and yields -1.
 */
static int32_t findFileBlockInfoIndex(SDataBlockIter* pBlockIter, SFileDataBlockInfo* pFBlockInfo) {
  ASSERT(pBlockIter != NULL && pFBlockInfo != NULL);

  const int32_t stride = ASCENDING_TRAVERSE(pBlockIter->order) ? 1 : -1;

  for (int32_t cursor = pBlockIter->index; cursor >= 0 && cursor < pBlockIter->numOfBlocks; cursor += stride) {
    SFileDataBlockInfo* pCandidate = taosArrayGet(pBlockIter->blockList, cursor);
    if (pCandidate->uid != pFBlockInfo->uid) {
      continue;
    }

    if (pCandidate->tbBlockIdx == pFBlockInfo->tbBlockIdx) {
      return cursor;
    }
  }

  // the requested block must be part of the list
  ASSERT(0);
  return -1;
}

1257
/**
 * Make the block stored at list position `index` the iterator's next block.
 *
 * The iterator index is moved by `step`; if the requested entry is not already at that position it
 * is removed from its old slot and re-inserted at the new iterator index. The current block
 * descriptor is then refreshed.
 *
 * @return -1 when `index` is outside the block list, TSDB_CODE_SUCCESS otherwise.
 */
static int32_t setFileBlockActiveInBlockIter(SDataBlockIter* pBlockIter, int32_t index, int32_t step) {
  const bool inRange = (index >= 0) && (index < pBlockIter->numOfBlocks);
  if (!inRange) {
    return -1;
  }

  // keep a copy: the slot is about to be removed from the list
  SFileDataBlockInfo picked = *(SFileDataBlockInfo*)taosArrayGet(pBlockIter->blockList, index);

  const int32_t target = pBlockIter->index + step;
  pBlockIter->index = target;

  if (target != index) {
    taosArrayRemove(pBlockIter->blockList, index);
    taosArrayInsert(pBlockIter->blockList, pBlockIter->index, &picked);

    SFileDataBlockInfo* pBlockInfo = taosArrayGet(pBlockIter->blockList, pBlockIter->index);
    ASSERT(pBlockInfo->uid == picked.uid && pBlockInfo->tbBlockIdx == picked.tbBlockIdx);
  }

  doSetCurrentBlock(pBlockIter, "");
  return TSDB_CODE_SUCCESS;
}

H
Hongze Cheng 已提交
1277
// A block overlaps its neighbor in scan direction when the two share the border timestamp:
// ascending compares this block's maxKey with the neighbor's minKey, descending the other way round.
static bool overlapWithNeighborBlock(SDataBlk* pBlock, SDataBlk* pNeighbor, int32_t order) {
  const bool asc = ASCENDING_TRAVERSE(order);

  if (!asc) {
    return pBlock->minKey.ts == pNeighbor->maxKey.ts;
  }

  return pBlock->maxKey.ts == pNeighbor->minKey.ts;
}
H
Hongze Cheng 已提交
1285

H
Hongze Cheng 已提交
1286
// True when the buffer key is valid (not TSKEY_INITIAL_VAL) and is reached before the block's
// first row in scan direction: key <= minKey for ascending, key >= maxKey for descending.
static bool bufferDataInFileBlockGap(int32_t order, TSDBKEY key, SDataBlk* pBlock) {
  if (key.ts == TSKEY_INITIAL_VAL) {
    return false;
  }

  if (ASCENDING_TRAVERSE(order)) {
    return key.ts <= pBlock->minKey.ts;
  }

  return key.ts >= pBlock->maxKey.ts;
}
H
Hongze Cheng 已提交
1292

H
Hongze Cheng 已提交
1293
// True when key.ts lies inside [minKey.ts, maxKey.ts] of the block and the block's version range
// [minVer, maxVer] intersects the queried version range.
static bool keyOverlapFileBlock(TSDBKEY key, SDataBlk* pBlock, SVersionRange* pVerRange) {
  if (key.ts < pBlock->minKey.ts || key.ts > pBlock->maxKey.ts) {
    return false;
  }

  if (pBlock->maxVer < pVerRange->minVer) {
    return false;
  }

  return pBlock->minVer <= pVerRange->maxVer;
}

H
Hongze Cheng 已提交
1298
/**
 * Scan the table's delete skyline forward from startIndex and report whether a delete point can
 * affect the given block.
 *
 * A point inside [minKey.ts, maxKey.ts] counts when its version is >= the block's minVer. A point
 * before minKey.ts counts when its version is >= minVer and the following point is at or after
 * minKey.ts. The scan stops at the first point beyond maxKey.ts.
 */
static bool doCheckforDatablockOverlapFrom(STableBlockScanInfo* pBlockScanInfo, const SDataBlk* pBlock,
                                           int32_t startIndex) {
  size_t num = taosArrayGetSize(pBlockScanInfo->delSkyline);

  // a negative start never entered the loop before either (it was compared as a huge unsigned value)
  if (startIndex < 0) {
    return false;
  }

  for (size_t i = (size_t)startIndex; i < num; i += 1) {
    TSDBKEY* p = taosArrayGet(pBlockScanInfo->delSkyline, i);
    if (p->ts >= pBlock->minKey.ts && p->ts <= pBlock->maxKey.ts) {
      if (p->version >= pBlock->minVer) {
        return true;
      }
    } else if (p->ts < pBlock->minKey.ts) {  // p->ts < pBlock->minKey.ts
      if (p->version >= pBlock->minVer) {
        if (i + 1 < num) {
          TSDBKEY* pnext = taosArrayGet(pBlockScanInfo->delSkyline, i + 1);
          if (pnext->ts >= pBlock->minKey.ts) {
            return true;
          }
        } else {  // it must be the last point
          ASSERT(p->version == 0);
        }
      }
    } else {  // (p->ts > pBlock->maxKey.ts) {
      return false;
    }
  }

  return false;
}

// Same check, started at the table's current file delete cursor (fileDelIndex).
static bool doCheckforDatablockOverlap(STableBlockScanInfo* pBlockScanInfo, const SDataBlk* pBlock) {
  return doCheckforDatablockOverlapFrom(pBlockScanInfo, pBlock, pBlockScanInfo->fileDelIndex);
}

/**
 * Check whether the delete skyline of the table overlaps the given file block.
 *
 * @return false when there is no (or an empty) skyline or the block lies completely outside the
 *         skyline's timestamp range; otherwise the result of scanning the skyline points.
 */
static bool overlapWithDelSkyline(STableBlockScanInfo* pBlockScanInfo, const SDataBlk* pBlock, int32_t order) {
  if (pBlockScanInfo->delSkyline == NULL) {
    return false;
  }

  // an empty skyline has no first/last point to compare with
  int32_t numOfPoints = (int32_t)taosArrayGetSize(pBlockScanInfo->delSkyline);
  if (numOfPoints <= 0) {
    return false;
  }

  // ts is not overlap
  TSDBKEY* pFirst = taosArrayGet(pBlockScanInfo->delSkyline, 0);
  TSDBKEY* pLast = taosArrayGetLast(pBlockScanInfo->delSkyline);
  if (pBlock->minKey.ts > pLast->ts || pBlock->maxKey.ts < pFirst->ts) {
    return false;
  }

  // version is not overlap
  if (ASCENDING_TRAVERSE(order)) {
    return doCheckforDatablockOverlap(pBlockScanInfo, pBlock);
  }

  // Descending scan: the points that matter for this block sit at or before the delete cursor, so
  // walk back to the first point that is not after minKey.ts and start the forward scan there.
  // The walked-back position was previously computed but never used.
  int32_t index = pBlockScanInfo->fileDelIndex;
  if (index < 0) {
    // NOTE(review): treated as "no overlap", matching the forward scan's handling of a negative
    // cursor -- confirm that fileDelIndex can never be negative here.
    return false;
  }

  if (index >= numOfPoints) {
    index = numOfPoints - 1;
  }

  while (index > 0) {
    TSDBKEY* p = taosArrayGet(pBlockScanInfo->delSkyline, index);
    if (p->ts <= pBlock->minKey.ts) {  // find the first point that is not greater than the minKey.ts of dataBlock.
      break;
    }

    index -= 1;
  }

  return doCheckforDatablockOverlapFrom(pBlockScanInfo, pBlock, index);
}

H
Haojun Liao 已提交
1356 1357 1358 1359 1360 1361 1362 1363 1364 1365 1366 1367 1368
// Per-block flags filled in by getBlockToLoadInfo(); fileBlockShouldLoad() and isCleanFileDataBlock()
// combine them to decide how a file block has to be handled.
typedef struct {
  bool overlapWithNeighborBlock;  // result of overlapWithNeighborBlock() when a neighbor block of the table exists
  bool hasDupTs;                  // pBlock->hasDup for a block with one sub block, true otherwise
  bool overlapWithDelInfo;        // result of overlapWithDelSkyline() for the block
  bool overlapWithLastBlock;      // current key of the last block reader lies in [minKey.ts, maxKey.ts]
  bool overlapWithKeyInBuf;       // result of keyOverlapFileBlock() for the key in buffer
  bool partiallyRequired;         // result of dataBlockPartiallyRequired() for the reader's window/version range
  bool moreThanCapcity;           // pBlock->nRow > pReader->capacity
} SDataBlockToLoadInfo;

/**
 * Evaluate every criterion that decides whether a file block needs special handling and store the
 * results in pInfo.
 *
 * overlapWithNeighborBlock and overlapWithLastBlock are only written when a neighbor block / data
 * in the last block reader exists; the caller is expected to pass a zero-initialised pInfo.
 */
static void getBlockToLoadInfo(SDataBlockToLoadInfo* pInfo, SFileDataBlockInfo* pBlockInfo, SDataBlk* pBlock,
                               STableBlockScanInfo* pScanInfo, TSDBKEY keyInBuf, SLastBlockReader* pLastBlockReader,
                               STsdbReader* pReader) {
  // overlap with the next block of the same table in scan direction
  int32_t   neighborIdx = 0;
  SDataBlk* pNext = getNeighborBlockOfSameTable(pBlockInfo, pScanInfo, &neighborIdx, pReader->order);
  if (pNext != NULL) {
    pInfo->overlapWithNeighborBlock = overlapWithNeighborBlock(pBlock, pNext, pReader->order);
    taosMemoryFree(pNext);
  }

  // duplicated timestamps of different versions inside this block
  if (pBlock->nSubBlock == 1) {
    pInfo->hasDupTs = pBlock->hasDup;
  } else {
    pInfo->hasDupTs = true;
  }

  pInfo->overlapWithDelInfo = overlapWithDelSkyline(pScanInfo, pBlock, pReader->order);

  // current key of the last block reader inside the block's key range
  if (hasDataInLastBlock(pLastBlockReader)) {
    int64_t lastBlockTs = getCurrentKeyInLastBlock(pLastBlockReader);
    pInfo->overlapWithLastBlock = (pBlock->maxKey.ts >= lastBlockTs) && (pBlock->minKey.ts <= lastBlockTs);
  }

  pInfo->moreThanCapcity = pBlock->nRow > pReader->capacity;
  pInfo->partiallyRequired = dataBlockPartiallyRequired(&pReader->window, &pReader->verRange, pBlock);
  pInfo->overlapWithKeyInBuf = keyOverlapFileBlock(keyInBuf, pBlock, &pReader->verRange);
}
1391

H
Haojun Liao 已提交
1392 1393 1394 1395 1396 1397 1398 1399 1400 1401 1402 1403 1404 1405
/**
 * Decide whether the file block has to be loaded. It must be loaded as soon as one of the flags
 * collected by getBlockToLoadInfo() is set:
 *  - it overlaps the neighbor block of the same table, or contains duplicated timestamps
 *  - it is only partially covered by the query window / version range
 *  - the key in buffer, the delete info or the last block overlaps it
 *  - it holds more rows than the output buffer capacity
 * The flags are written to the debug log when the block has to be loaded.
 */
static bool fileBlockShouldLoad(STsdbReader* pReader, SFileDataBlockInfo* pBlockInfo, SDataBlk* pBlock,
                                STableBlockScanInfo* pScanInfo, TSDBKEY keyInBuf, SLastBlockReader* pLastBlockReader) {
  SDataBlockToLoadInfo flags = {0};
  getBlockToLoadInfo(&flags, pBlockInfo, pBlock, pScanInfo, keyInBuf, pLastBlockReader, pReader);

  const bool mustLoad = flags.overlapWithNeighborBlock || flags.hasDupTs || flags.partiallyRequired ||
                        flags.overlapWithKeyInBuf || flags.moreThanCapcity || flags.overlapWithDelInfo ||
                        flags.overlapWithLastBlock;
  if (!mustLoad) {
    return false;
  }

  // log the reason why load the datablock for profile
  tsdbDebug("%p uid:%" PRIu64
            " need to load the datablock, overlapwithneighborblock:%d, hasDup:%d, partiallyRequired:%d, "
            "overlapWithKey:%d, greaterThanBuf:%d, overlapWithDel:%d, overlapWithlastBlock:%d, %s",
            pReader, pBlockInfo->uid, flags.overlapWithNeighborBlock, flags.hasDupTs, flags.partiallyRequired,
            flags.overlapWithKeyInBuf, flags.moreThanCapcity, flags.overlapWithDelInfo, flags.overlapWithLastBlock,
            pReader->idStr);

  return true;
}

H
Haojun Liao 已提交
1420 1421 1422 1423 1424 1425 1426 1427 1428
// A file block is "clean" when it neither overlaps its neighbor block, the key in buffer, the delete
// info or the last block, nor contains duplicated timestamps.
static bool isCleanFileDataBlock(STsdbReader* pReader, SFileDataBlockInfo* pBlockInfo, SDataBlk* pBlock,
                                 STableBlockScanInfo* pScanInfo, TSDBKEY keyInBuf, SLastBlockReader* pLastBlockReader) {
  SDataBlockToLoadInfo flags = {0};
  getBlockToLoadInfo(&flags, pBlockInfo, pBlock, pScanInfo, keyInBuf, pLastBlockReader, pReader);

  const bool dirty = flags.overlapWithNeighborBlock || flags.hasDupTs || flags.overlapWithKeyInBuf ||
                     flags.overlapWithDelInfo || flags.overlapWithLastBlock;
  return !dirty;
}

1429
/**
 * Fill pReader->pResBlock with rows taken from the in-memory buffers (mem/imem iterators) of the table.
 *
 * Nothing happens when neither iterator has a value. Otherwise buildDataBlockFromBufImpl() builds
 * the rows, the block's time window and uid are updated, the block is flagged as composed and the
 * elapsed time is added to pReader->cost.buildmemBlock.
 *
 * @return TSDB_CODE_SUCCESS when there is nothing to read, else the code of buildDataBlockFromBufImpl().
 */
static int32_t buildDataBlockFromBuf(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo, int64_t endKey) {
  const bool memHasRows = pBlockScanInfo->iter.hasVal;
  const bool imemHasRows = pBlockScanInfo->iiter.hasVal;
  if (!imemHasRows && !memHasRows) {
    return TSDB_CODE_SUCCESS;
  }

  SSDataBlock*  pResult = pReader->pResBlock;
  const int64_t startUs = taosGetTimestampUs();

  int32_t code = buildDataBlockFromBufImpl(pBlockScanInfo, endKey, pReader->capacity, pReader);

  blockDataUpdateTsWindow(pResult, 0);
  pResult->info.uid = pBlockScanInfo->uid;

  setComposedBlockFlag(pReader, true);

  const double buildMs = (taosGetTimestampUs() - startUs) / 1000.0;
  tsdbDebug("%p build data block from cache completed, elapsed time:%.2f ms, numOfRows:%d, brange:%" PRId64
                " - %" PRId64 " %s",
            pReader, buildMs, pResult->info.rows, pResult->info.window.skey, pResult->info.window.ekey,
            pReader->idStr);

  pReader->cost.buildmemBlock += buildMs;
  return code;
}

1454 1455
// Fast path for a file block row whose timestamp is not duplicated by its direct neighbor.
// The row at pDumpInfo->rowIndex is appended to the result block and the index is advanced when
//   1. the row is not the border row of the block in the scan direction, and
//   2. the next row in the scan direction has a timestamp different from `key`.
// Returns true when the row was copied; false means the caller has to take the merge path.
static bool tryCopyDistinctRowFromFileBlock(STsdbReader* pReader, SBlockData* pBlockData, int64_t key,
                                            SFileBlockDumpInfo* pDumpInfo) {
  int32_t step = 0;

  if (pReader->order == TSDB_ORDER_ASC) {
    if (pDumpInfo->rowIndex >= pDumpInfo->totalRows - 1) {  // last row, no successor to compare with
      return false;
    }
    step = 1;
  } else if (pReader->order == TSDB_ORDER_DESC) {
    if (pDumpInfo->rowIndex <= 0) {  // first row, no predecessor to compare with
      return false;
    }
    step = -1;
  } else {
    return false;
  }

  int64_t neighborKey = pBlockData->aTSKEY[pDumpInfo->rowIndex + step];
  if (neighborKey == key) {  // duplicated timestamp, merge is required
    return false;
  }

  doAppendRowFromFileBlock(pReader->pResBlock, pReader, pBlockData, pDumpInfo->rowIndex);
  pDumpInfo->rowIndex += step;
  return true;
}

1474 1475
// Advance the merge tree of the last block reader until it is positioned on a row for which
// hasBeenDropped() returns false. Returns false once the merge tree is exhausted.
static bool nextRowFromLastBlocks(SLastBlockReader* pLastBlockReader, STableBlockScanInfo* pBlockScanInfo,
                                  SVersionRange* pVerRange) {
  for (;;) {
    if (!tMergeTreeNext(&pLastBlockReader->mergeTree)) {
      return false;
    }

    TSDBROW current = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
    TSDBKEY rowKey = TSDBROW_KEY(&current);

    bool dropped = hasBeenDropped(pBlockScanInfo->delSkyline, &pBlockScanInfo->lastBlockDelIndex, &rowKey,
                                  pLastBlockReader->order, pVerRange);
    if (!dropped) {
      return true;
    }
  }
}

// Fast path for a last (stt) block row: move the last block reader to its next valid row and, unless that
// row carries the same timestamp `ts`, append `fRow` directly to the result block.
// Returns true when fRow was appended; false when the next row has the same timestamp, so the caller
// must merge. In both cases the last block reader has already been advanced.
static bool tryCopyDistinctRowFromSttBlock(TSDBROW* fRow, SLastBlockReader* pLastBlockReader,
                                           STableBlockScanInfo* pScanInfo, int64_t ts, STsdbReader* pReader) {
  bool hasNext = nextRowFromLastBlocks(pLastBlockReader, pScanInfo, &pReader->verRange);

  // the current key may only be read while the merge tree still has a row
  if (hasNext && getCurrentKeyInLastBlock(pLastBlockReader) == ts) {
    return false;
  }

  doAppendRowFromFileBlock(pReader->pResBlock, pReader, fRow->pBlockData, fRow->iRow);
  return true;
}

H
Haojun Liao 已提交
1508 1509 1510
// Return the schema matching `sversion` for table `uid`.
// pReader->pSchema is loaded on first use and returned when its version matches. Otherwise
// pReader->pMemSchema serves as a one-entry cache: it is returned when its version matches, and is
// released and re-fetched through metaGetTbTSchemaEx() when it does not.
// Returns NULL on failure, with terrno set to the code reported by metaGetTbTSchemaEx().
static FORCE_INLINE STSchema* doGetSchemaForTSRow(int32_t sversion, STsdbReader* pReader, uint64_t uid) {
  if (pReader->pSchema == NULL) {
    pReader->pSchema = metaGetTbTSchema(pReader->pTsdb->pVnode->pMeta, uid, -1, 1);
  }

  if (pReader->pSchema != NULL && pReader->pSchema->version == sversion) {
    return pReader->pSchema;
  }

  if (pReader->pMemSchema != NULL) {
    if (pReader->pMemSchema->version == sversion) {
      return pReader->pMemSchema;
    }

    // cached schema has another version: drop it and load the requested one
    taosMemoryFreeClear(pReader->pMemSchema);
    int32_t reloadCode =
        metaGetTbTSchemaEx(pReader->pTsdb->pVnode->pMeta, pReader->suid, uid, sversion, &pReader->pMemSchema);
    if (reloadCode == TSDB_CODE_SUCCESS && pReader->pMemSchema != NULL) {
      return pReader->pMemSchema;
    }

    terrno = reloadCode;
    return NULL;
  }

  // nothing cached yet
  int32_t loadCode =
      metaGetTbTSchemaEx(pReader->pTsdb->pVnode->pMeta, pReader->suid, uid, sversion, &pReader->pMemSchema);
  if (loadCode != TSDB_CODE_SUCCESS) {
    terrno = loadCode;
    return NULL;
  }

  return pReader->pMemSchema;
}

1543
static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo, TSDBROW* pRow,
1544 1545 1546 1547 1548 1549
                                     SIterInfo* pIter, int64_t key, SLastBlockReader* pLastBlockReader) {
  SRowMerger          merge = {0};
  STSRow*             pTSRow = NULL;
  SBlockData*         pBlockData = &pReader->status.fileBlockData;
  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;

1550
  int64_t tsLast = INT64_MIN;
1551
  if (hasDataInLastBlock(pLastBlockReader)) {
1552 1553
    tsLast = getCurrentKeyInLastBlock(pLastBlockReader);
  }
1554

H
Hongze Cheng 已提交
1555 1556
  TSDBKEY k = TSDBROW_KEY(pRow);
  TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
1557

1558 1559
  int64_t minKey = 0;
  if (pReader->order == TSDB_ORDER_ASC) {
H
Hongze Cheng 已提交
1560
    minKey = INT64_MAX;  // chosen the minimum value
1561
    if (minKey > tsLast && hasDataInLastBlock(pLastBlockReader)) {
1562 1563
      minKey = tsLast;
    }
1564

1565 1566 1567
    if (minKey > k.ts) {
      minKey = k.ts;
    }
1568

1569
    if (minKey > key && hasDataInFileBlock(pBlockData, pDumpInfo)) {
1570 1571 1572 1573
      minKey = key;
    }
  } else {
    minKey = INT64_MIN;
1574
    if (minKey < tsLast && hasDataInLastBlock(pLastBlockReader)) {
1575 1576 1577 1578 1579 1580 1581
      minKey = tsLast;
    }

    if (minKey < k.ts) {
      minKey = k.ts;
    }

1582
    if (minKey < key && hasDataInFileBlock(pBlockData, pDumpInfo)) {
1583 1584
      minKey = key;
    }
1585 1586 1587 1588
  }

  bool init = false;

1589
  // ASC: file block ---> last block -----> imem -----> mem
H
Hongze Cheng 已提交
1590
  // DESC: mem -----> imem -----> last block -----> file block
1591 1592
  if (pReader->order == TSDB_ORDER_ASC) {
    if (minKey == key) {
1593
      init = true;
H
Haojun Liao 已提交
1594 1595 1596 1597
      int32_t code = tRowMergerInit(&merge, &fRow, pReader->pSchema);
      if (code != TSDB_CODE_SUCCESS) {
        return code;
      }
1598
      doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
1599 1600
    }

1601
    if (minKey == tsLast) {
1602
      TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
H
Haojun Liao 已提交
1603 1604 1605
      if (init) {
        tRowMerge(&merge, &fRow1);
      } else {
1606
        init = true;
H
Haojun Liao 已提交
1607 1608 1609 1610
        int32_t code = tRowMergerInit(&merge, &fRow1, pReader->pSchema);
        if (code != TSDB_CODE_SUCCESS) {
          return code;
        }
1611
      }
1612
      doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, &merge, &pReader->verRange);
1613
    }
1614

1615
    if (minKey == k.ts) {
H
Haojun Liao 已提交
1616 1617 1618
      if (init) {
        tRowMerge(&merge, pRow);
      } else {
1619 1620
        init = true;
        STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid);
H
Haojun Liao 已提交
1621 1622 1623 1624 1625 1626 1627 1628
        int32_t   code = tRowMergerInit(&merge, pRow, pSchema);
        if (code != TSDB_CODE_SUCCESS) {
          return code;
        }
      }
      int32_t code = doMergeRowsInBuf(pIter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge, pReader);
      if (code != TSDB_CODE_SUCCESS) {
        return code;
1629 1630 1631 1632 1633
      }
    }
  } else {
    if (minKey == k.ts) {
      init = true;
1634
      STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid);
H
Haojun Liao 已提交
1635 1636 1637 1638 1639 1640
      int32_t   code = tRowMergerInit(&merge, pRow, pSchema);
      if (code != TSDB_CODE_SUCCESS) {
        return code;
      }

      code = doMergeRowsInBuf(pIter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge, pReader);
1641
      if (code != TSDB_CODE_SUCCESS || merge.pTSchema == NULL) {
H
Haojun Liao 已提交
1642 1643
        return code;
      }
1644 1645
    }

1646
    if (minKey == tsLast) {
1647
      TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
H
Haojun Liao 已提交
1648 1649 1650
      if (init) {
        tRowMerge(&merge, &fRow1);
      } else {
1651
        init = true;
H
Haojun Liao 已提交
1652 1653 1654 1655
        int32_t code = tRowMergerInit(&merge, &fRow1, pReader->pSchema);
        if (code != TSDB_CODE_SUCCESS) {
          return code;
        }
1656
      }
1657
      doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, &merge, &pReader->verRange);
1658 1659 1660
    }

    if (minKey == key) {
H
Haojun Liao 已提交
1661 1662 1663
      if (init) {
        tRowMerge(&merge, &fRow);
      } else {
1664
        init = true;
H
Haojun Liao 已提交
1665 1666 1667 1668
        int32_t code = tRowMergerInit(&merge, &fRow, pReader->pSchema);
        if (code != TSDB_CODE_SUCCESS) {
          return code;
        }
1669 1670 1671
      }
      doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
    }
1672 1673
  }

1674 1675 1676 1677 1678
  int32_t code = tRowMergerGetRow(&merge, &pTSRow);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

1679 1680 1681 1682 1683 1684 1685
  doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo->uid);

  taosMemoryFree(pTSRow);
  tRowMergerClear(&merge);
  return TSDB_CODE_SUCCESS;
}

1686 1687 1688
// Emit one row for the current timestamp of the last block reader into pReader->pResBlock.
//   mergeBlockData: when false, pBlockData is not touched (callers pass NULL) and only last block rows
//                   are considered.
// When the file block is not involved (mergeBlockData is false, or the current file block row has another
// timestamp), the last block row is copied directly if its successor has a different timestamp, otherwise
// all last block rows with that timestamp are merged. When the current file block row has the same
// timestamp, the last block rows and the file block rows are merged together.
// Returns TSDB_CODE_SUCCESS or the error reported by the row merger; the merger is released on all paths
// that initialized it.
static int32_t doMergeFileBlockAndLastBlock(SLastBlockReader* pLastBlockReader, STsdbReader* pReader,
                                            STableBlockScanInfo* pBlockScanInfo, SBlockData* pBlockData,
                                            bool mergeBlockData) {
  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
  int64_t             tsLastBlock = getCurrentKeyInLastBlock(pLastBlockReader);

  STSRow*    pTSRow = NULL;
  SRowMerger merge = {0};
  int32_t    code = TSDB_CODE_SUCCESS;
  TSDBROW    fRow = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
  tsdbTrace("fRow ptr:%p, %d, uid:%" PRIu64 ", %s", fRow.pBlockData, fRow.iRow, pLastBlockReader->uid, pReader->idStr);

  // pBlockData is only dereferenced when the caller asked for the file block to be merged
  bool mergeWithFileBlock = mergeBlockData && (tsLastBlock == pBlockData->aTSKEY[pDumpInfo->rowIndex]);

  if (!mergeWithFileBlock) {
    // only rows of the last block take part; this call also advances the last block reader
    if (tryCopyDistinctRowFromSttBlock(&fRow, pLastBlockReader, pBlockScanInfo, tsLastBlock, pReader)) {
      return TSDB_CODE_SUCCESS;
    }

    code = tRowMergerInit(&merge, &fRow, pReader->pSchema);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    // the reader now points at the next row, which has the same timestamp as fRow
    TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
    tRowMerge(&merge, &fRow1);
    doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLastBlock, &merge, &pReader->verRange);
  } else {
    // the last block row and the file block row share the timestamp: merge both sources
    code = tRowMergerInit(&merge, &fRow, pReader->pSchema);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLastBlock, &merge, &pReader->verRange);

    // merge with block data if ts == key
    if (tsLastBlock == pBlockData->aTSKEY[pDumpInfo->rowIndex]) {
      doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
    }
  }

  code = tRowMergerGetRow(&merge, &pTSRow);
  if (code == TSDB_CODE_SUCCESS) {
    doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo->uid);
    taosMemoryFree(pTSRow);
  }

  // release the merger even when no row could be produced
  tRowMergerClear(&merge);
  return code;
}

1749 1750
// Produce the next result row when only file data (data block and/or last block) is involved, i.e. no
// in-memory row takes part.
//   key: timestamp of the current file block row (meaningful only when hasDataInFileBlock() is true)
// Cases:
//   - file block has data, last block has none: rows of the file block only
//   - both have data, ascending scan: the file block row alone when key < ts, a merge of both when key == ts
//   - both have data, descending scan: delegated to doMergeFileBlockAndLastBlock() with block data merging
//   - file block has no data: delegated to doMergeFileBlockAndLastBlock() for the last block only
// Returns TSDB_CODE_SUCCESS or the error reported by the row merger; the merger is released on all paths
// that initialized it.
static int32_t mergeFileBlockAndLastBlock(STsdbReader* pReader, SLastBlockReader* pLastBlockReader, int64_t key,
                                          STableBlockScanInfo* pBlockScanInfo, SBlockData* pBlockData) {
  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;

  if (!hasDataInFileBlock(pBlockData, pDumpInfo)) {  // only last block exists
    return doMergeFileBlockAndLastBlock(pLastBlockReader, pReader, pBlockScanInfo, NULL, false);
  }

  // no last block available, only data block exists
  if (!hasDataInLastBlock(pLastBlockReader)) {
    return mergeRowsInFileBlocks(pBlockData, pBlockScanInfo, key, pReader);
  }

  // current row of the file block and current timestamp of the last block
  TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
  int64_t ts = getCurrentKeyInLastBlock(pLastBlockReader);
  ASSERT(ts >= key);

  if (!ASCENDING_TRAVERSE(pReader->order)) {  // desc order
    return doMergeFileBlockAndLastBlock(pLastBlockReader, pReader, pBlockScanInfo, pBlockData, true);
  }

  if (key < ts) {  // imem, mem are all empty, file blocks (data blocks and last block) exist
    return mergeRowsInFileBlocks(pBlockData, pBlockScanInfo, key, pReader);
  }

  if (key != ts) {  // key > ts contradicts the assertion above
    ASSERT(0);
    return TSDB_CODE_SUCCESS;
  }

  // key == ts: merge the rows of the file block and of the last block
  STSRow*    pTSRow = NULL;
  SRowMerger merge = {0};

  int32_t code = tRowMergerInit(&merge, &fRow, pReader->pSchema);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);

  TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
  tRowMerge(&merge, &fRow1);

  doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, ts, &merge, &pReader->verRange);

  code = tRowMergerGetRow(&merge, &pTSRow);
  if (code == TSDB_CODE_SUCCESS) {
    doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo->uid);
    taosMemoryFree(pTSRow);
  }

  // release the merger even when no row could be produced
  tRowMergerClear(&merge);
  return code;
}

1805 1806
// Merge the rows of all four levels (mem, imem, last block, file block) that carry the next timestamp to
// emit, and append the merged row to pReader->pResBlock. Both in-memory iterators must have a valid row.
// The timestamp to emit ("minKey") is the smallest candidate for an ascending scan and the largest one for a
// descending scan. Sources are fed to the merger in the order
//   ASC:  file block -----> last block -----> imem -----> mem
//   DESC: mem -----> imem -----> last block -----> file block
// Returns TSDB_CODE_SUCCESS or the first error met. A failed schema lookup is reported as an error, and once
// the merger has been successfully initialized it is released on every exit path.
static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo, SBlockData* pBlockData,
                                     SLastBlockReader* pLastBlockReader) {
  SRowMerger          merge = {0};
  STSRow*             pTSRow = NULL;
  int32_t             code = TSDB_CODE_SUCCESS;
  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
  SArray*             pDelList = pBlockScanInfo->delSkyline;

  TSDBROW* pRow = getValidMemRow(&pBlockScanInfo->iter, pDelList, pReader);
  TSDBROW* piRow = getValidMemRow(&pBlockScanInfo->iiter, pDelList, pReader);
  ASSERT(pRow != NULL && piRow != NULL);

  int64_t tsLast = INT64_MIN;
  if (hasDataInLastBlock(pLastBlockReader)) {
    tsLast = getCurrentKeyInLastBlock(pLastBlockReader);
  }

  int64_t key = hasDataInFileBlock(pBlockData, pDumpInfo) ? pBlockData->aTSKEY[pDumpInfo->rowIndex] : INT64_MIN;

  TSDBKEY k = TSDBROW_KEY(pRow);
  TSDBKEY ik = TSDBROW_KEY(piRow);

  int64_t minKey = 0;
  if (ASCENDING_TRAVERSE(pReader->order)) {
    minKey = INT64_MAX;  // let's find the minimum
    if (minKey > k.ts) {
      minKey = k.ts;
    }

    if (minKey > ik.ts) {
      minKey = ik.ts;
    }

    if (minKey > key && hasDataInFileBlock(pBlockData, pDumpInfo)) {
      minKey = key;
    }

    if (minKey > tsLast && hasDataInLastBlock(pLastBlockReader)) {
      minKey = tsLast;
    }
  } else {
    minKey = INT64_MIN;  // let's find the maximum ts value
    if (minKey < k.ts) {
      minKey = k.ts;
    }

    if (minKey < ik.ts) {
      minKey = ik.ts;
    }

    if (minKey < key && hasDataInFileBlock(pBlockData, pDumpInfo)) {
      minKey = key;
    }

    if (minKey < tsLast && hasDataInLastBlock(pLastBlockReader)) {
      minKey = tsLast;
    }
  }

  // true only after tRowMergerInit() succeeded, i.e. when `merge` must be cleared before returning
  bool init = false;

  if (ASCENDING_TRAVERSE(pReader->order)) {
    if (minKey == key) {
      TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
      code = tRowMergerInit(&merge, &fRow, pReader->pSchema);
      if (code != TSDB_CODE_SUCCESS) {
        return code;
      }
      init = true;

      doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
    }

    if (minKey == tsLast) {
      TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
      if (init) {
        tRowMerge(&merge, &fRow1);
      } else {
        code = tRowMergerInit(&merge, &fRow1, pReader->pSchema);
        if (code != TSDB_CODE_SUCCESS) {
          return code;
        }
        init = true;
      }

      doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, &merge, &pReader->verRange);
    }

    if (minKey == ik.ts) {
      if (init) {
        tRowMerge(&merge, piRow);
      } else {
        STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(piRow), pReader, pBlockScanInfo->uid);
        if (pSchema == NULL) {  // schema lookup failed: report it instead of returning success
          return (terrno != TSDB_CODE_SUCCESS) ? terrno : TSDB_CODE_INVALID_PARA;
        }

        code = tRowMergerInit(&merge, piRow, pSchema);
        if (code != TSDB_CODE_SUCCESS) {
          return code;
        }
        init = true;
      }

      code = doMergeRowsInBuf(&pBlockScanInfo->iiter, pBlockScanInfo->uid, ik.ts, pBlockScanInfo->delSkyline, &merge,
                              pReader);
      if (code != TSDB_CODE_SUCCESS) {
        goto _end;
      }
    }

    if (minKey == k.ts) {
      if (init) {
        if (merge.pTSchema == NULL) {
          goto _end;
        }

        tRowMerge(&merge, pRow);
      } else {
        STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid);
        if (pSchema == NULL) {  // schema lookup failed, the merger cannot be set up without it
          return (terrno != TSDB_CODE_SUCCESS) ? terrno : TSDB_CODE_INVALID_PARA;
        }

        code = tRowMergerInit(&merge, pRow, pSchema);
        if (code != TSDB_CODE_SUCCESS) {
          return code;
        }
        init = true;
      }

      code = doMergeRowsInBuf(&pBlockScanInfo->iter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge,
                              pReader);
      if (code != TSDB_CODE_SUCCESS) {
        goto _end;
      }
    }
  } else {
    if (minKey == k.ts) {
      STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid);
      if (pSchema == NULL) {  // schema lookup failed, the merger cannot be set up without it
        return (terrno != TSDB_CODE_SUCCESS) ? terrno : TSDB_CODE_INVALID_PARA;
      }

      code = tRowMergerInit(&merge, pRow, pSchema);
      if (code != TSDB_CODE_SUCCESS) {
        return code;
      }
      init = true;

      code = doMergeRowsInBuf(&pBlockScanInfo->iter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge,
                              pReader);
      if (code != TSDB_CODE_SUCCESS) {
        goto _end;
      }
    }

    if (minKey == ik.ts) {
      if (init) {
        tRowMerge(&merge, piRow);
      } else {
        STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(piRow), pReader, pBlockScanInfo->uid);
        if (pSchema == NULL) {  // schema lookup failed, the merger cannot be set up without it
          return (terrno != TSDB_CODE_SUCCESS) ? terrno : TSDB_CODE_INVALID_PARA;
        }

        code = tRowMergerInit(&merge, piRow, pSchema);
        if (code != TSDB_CODE_SUCCESS) {
          return code;
        }
        init = true;
      }

      code = doMergeRowsInBuf(&pBlockScanInfo->iiter, pBlockScanInfo->uid, ik.ts, pBlockScanInfo->delSkyline, &merge,
                              pReader);
      if (code != TSDB_CODE_SUCCESS) {
        goto _end;
      }
    }

    if (minKey == tsLast) {
      TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
      if (init) {
        tRowMerge(&merge, &fRow1);
      } else {
        code = tRowMergerInit(&merge, &fRow1, pReader->pSchema);
        if (code != TSDB_CODE_SUCCESS) {
          return code;
        }
        init = true;
      }

      doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, &merge, &pReader->verRange);
    }

    if (minKey == key) {
      TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
      if (!init) {
        code = tRowMergerInit(&merge, &fRow, pReader->pSchema);
        if (code != TSDB_CODE_SUCCESS) {
          return code;
        }
        init = true;
      } else {
        if (merge.pTSchema == NULL) {
          goto _end;
        }

        tRowMerge(&merge, &fRow);
      }

      doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
    }
  }

  if (merge.pTSchema == NULL) {
    goto _end;
  }

  code = tRowMergerGetRow(&merge, &pTSRow);
  if (code != TSDB_CODE_SUCCESS) {
    goto _end;
  }

  doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo->uid);
  taosMemoryFree(pTSRow);

_end:
  if (init) {
    tRowMergerClear(&merge);
  }
  return code;
}

2019 2020 2021 2022 2023 2024 2025 2026 2027 2028 2029 2030 2031 2032 2033 2034 2035 2036 2037 2038 2039 2040 2041 2042 2043
// Create, once per table, the skip list iterators over the mem and imem tables of the read snapshot and
// initialize the delete skyline iterator.
// The start key is (window.skey, minVer) for an ascending scan and (window.ekey, maxVer) for a descending
// one; for a descending scan the iterators are created in backward mode.
// Returns TSDB_CODE_SUCCESS (also when the iterators were already initialized) or the error reported by
// tsdbTbDataIterCreate(); on failure pBlockScanInfo->iterInit stays false.
static int32_t initMemDataIterator(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader) {
  if (pBlockScanInfo->iterInit) {
    return TSDB_CODE_SUCCESS;
  }

  int32_t code = TSDB_CODE_SUCCESS;

  TSDBKEY startKey = {0};
  if (ASCENDING_TRAVERSE(pReader->order)) {
    startKey = (TSDBKEY){.ts = pReader->window.skey, .version = pReader->verRange.minVer};
  } else {
    startKey = (TSDBKEY){.ts = pReader->window.ekey, .version = pReader->verRange.maxVer};
  }

  int32_t backward = (!ASCENDING_TRAVERSE(pReader->order));

  // iterator over the mem table
  STbData* d = NULL;
  if (pReader->pReadSnap->pMem != NULL) {
    d = tsdbGetTbDataFromMemTable(pReader->pReadSnap->pMem, pReader->suid, pBlockScanInfo->uid);
    if (d != NULL) {
      code = tsdbTbDataIterCreate(d, &startKey, backward, &pBlockScanInfo->iter.iter);
      if (code == TSDB_CODE_SUCCESS) {
        pBlockScanInfo->iter.hasVal = (tsdbTbDataIterGet(pBlockScanInfo->iter.iter) != NULL);

        tsdbDebug("%p uid:%" PRIu64 ", check data in mem from skey:%" PRId64 ", order:%d, ts range in buf:%" PRId64
                  "-%" PRId64 " %s",
                  pReader, pBlockScanInfo->uid, startKey.ts, pReader->order, d->minKey, d->maxKey, pReader->idStr);
      } else {
        tsdbError("%p uid:%" PRIu64 ", failed to create iterator for mem, code:%s, %s", pReader, pBlockScanInfo->uid,
                  tstrerror(code), pReader->idStr);
        return code;
      }
    }
  } else {
    tsdbDebug("%p uid:%" PRIu64 ", no data in mem, %s", pReader, pBlockScanInfo->uid, pReader->idStr);
  }

  // iterator over the imem table
  STbData* di = NULL;
  if (pReader->pReadSnap->pIMem != NULL) {
    di = tsdbGetTbDataFromMemTable(pReader->pReadSnap->pIMem, pReader->suid, pBlockScanInfo->uid);
    if (di != NULL) {
      code = tsdbTbDataIterCreate(di, &startKey, backward, &pBlockScanInfo->iiter.iter);
      if (code == TSDB_CODE_SUCCESS) {
        pBlockScanInfo->iiter.hasVal = (tsdbTbDataIterGet(pBlockScanInfo->iiter.iter) != NULL);

        tsdbDebug("%p uid:%" PRIu64 ", check data in imem from skey:%" PRId64 ", order:%d, ts range in buf:%" PRId64
                  "-%" PRId64 " %s",
                  pReader, pBlockScanInfo->uid, startKey.ts, pReader->order, di->minKey, di->maxKey, pReader->idStr);
      } else {
        tsdbError("%p uid:%" PRIu64 ", failed to create iterator for imem, code:%s, %s", pReader, pBlockScanInfo->uid,
                  tstrerror(code), pReader->idStr);
        return code;
      }
    }
  } else {
    tsdbDebug("%p uid:%" PRIu64 ", no data in imem, %s", pReader, pBlockScanInfo->uid, pReader->idStr);
  }

  initDelSkylineIterator(pBlockScanInfo, pReader, d, di);

  pBlockScanInfo->iterInit = true;
  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
2083 2084
// Check whether the file block row at pDumpInfo->rowIndex should be returned for the scanned table.
// A row is rejected when
//   - the block carries per-row uids (aUid != NULL) and the row belongs to another table,
//   - its version lies outside pReader->verRange,
//   - its timestamp lies outside pReader->window, or
//   - hasBeenDropped() reports it as deleted according to the table's delete skyline.
static bool isValidFileBlockRow(SBlockData* pBlockData, SFileBlockDumpInfo* pDumpInfo,
                                STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader) {
  // block holding rows of several tables: the row must belong to the scanned one
  if (pBlockData->aUid != NULL) {
    uint64_t rowUid = pBlockData->aUid[pDumpInfo->rowIndex];
    if (rowUid != pBlockScanInfo->uid) {
      return false;
    }
  }

  int64_t rowVer = pBlockData->aVersion[pDumpInfo->rowIndex];
  bool    verInRange = (rowVer >= pReader->verRange.minVer) && (rowVer <= pReader->verRange.maxVer);
  if (!verInRange) {
    return false;
  }

  int64_t rowTs = pBlockData->aTSKEY[pDumpInfo->rowIndex];
  bool    tsInWindow = (rowTs >= pReader->window.skey) && (rowTs <= pReader->window.ekey);
  if (!tsInWindow) {
    return false;
  }

  TSDBKEY rowKey = {.ts = rowTs, .version = rowVer};
  return !hasBeenDropped(pBlockScanInfo->delSkyline, &pBlockScanInfo->fileDelIndex, &rowKey, pReader->order,
                         &pReader->verRange);
}

2113
// Bind the last block reader to the table described by pScanInfo and position it on its first valid row.
// Returns true right away when the reader is already bound to this table. Otherwise a merge tree opened
// for another table (uid != 0) is closed, the in-memory iterators are initialized, and a new merge tree is
// opened on the reader's window, narrowed to start just after (ASC) or end just before (DESC)
// pScanInfo->lastKey.
// Returns false when the merge tree cannot be opened or holds no valid row.
// NOTE(review): the result of initMemDataIterator() is not checked, and the uid is recorded before
// tMergeTreeOpen() is known to succeed -- confirm this is intended.
static bool initLastBlockReader(SLastBlockReader* pLBlockReader, STableBlockScanInfo* pScanInfo, STsdbReader* pReader) {
  if (pLBlockReader->uid == pScanInfo->uid) {
    return true;
  }

  if (pLBlockReader->uid != 0) {
    tMergeTreeClose(&pLBlockReader->mergeTree);
  }

  initMemDataIterator(pScanInfo, pReader);
  pLBlockReader->uid = pScanInfo->uid;

  STimeWindow range = pLBlockReader->window;
  if (ASCENDING_TRAVERSE(pLBlockReader->order)) {
    range.skey = pScanInfo->lastKey + 1;
  } else {
    range.ekey = pScanInfo->lastKey - 1;
  }

  bool    descScan = (pLBlockReader->order == TSDB_ORDER_DESC);
  int32_t code = tMergeTreeOpen(&pLBlockReader->mergeTree, descScan, pReader->pFileReader, pReader->suid,
                                pScanInfo->uid, &range, &pLBlockReader->verRange, pLBlockReader->pInfo, false,
                                pReader->idStr);
  if (code != TSDB_CODE_SUCCESS) {
    return false;
  }

  return nextRowFromLastBlocks(pLBlockReader, pScanInfo, &pReader->verRange);
}

2144
// Timestamp of the row the last block reader's merge tree is currently positioned on.
static int64_t getCurrentKeyInLastBlock(SLastBlockReader* pLastBlockReader) {
  TSDBROW current = tMergeTreeGetRow(&pLastBlockReader->mergeTree);

  int64_t ts = TSDBROW_TS(&current);
  return ts;
}

H
Hongze Cheng 已提交
2149
// Reports whether the merge tree of the last block reader currently holds an iterator (pIter != NULL).
static bool hasDataInLastBlock(SLastBlockReader* pLastBlockReader) {
  if (pLastBlockReader->mergeTree.pIter == NULL) {
    return false;
  }

  return true;
}
2150 2151 2152 2153

bool hasDataInFileBlock(const SBlockData* pBlockData, const SFileBlockDumpInfo* pDumpInfo) {
  if (pBlockData->nRow > 0) {
    ASSERT(pBlockData->nRow == pDumpInfo->totalRows);
2154 2155
  }

2156
  return pBlockData->nRow > 0 && (!pDumpInfo->allDumped);
2157
}
2158

2159 2160
// Emit the current row of the file block (timestamp `key`) into pReader->pResBlock.
// The row is copied directly when its neighbor in the scan direction has another timestamp; otherwise all
// file block rows with that timestamp are merged through a row merger first.
// Returns TSDB_CODE_SUCCESS or the error reported by the row merger; the merger is released on all paths
// that initialized it.
int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pBlockScanInfo, int64_t key,
                              STsdbReader* pReader) {
  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;

  // fast path: no duplicated timestamp, no merge required
  if (tryCopyDistinctRowFromFileBlock(pReader, pBlockData, key, pDumpInfo)) {
    return TSDB_CODE_SUCCESS;
  }

  TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);

  STSRow*    pTSRow = NULL;
  SRowMerger merge = {0};

  int32_t code = tRowMergerInit(&merge, &fRow, pReader->pSchema);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);

  code = tRowMergerGetRow(&merge, &pTSRow);
  if (code == TSDB_CODE_SUCCESS) {
    doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo->uid);
    taosMemoryFree(pTSRow);
  }

  // release the merger even when no row could be produced
  tRowMergerClear(&merge);
  return code;
}

H
Haojun Liao 已提交
2189 2190
// Produce the next composed row by dispatching on which sources currently hold data:
//   mem and imem both have a value -> doMergeMultiLevelRows()
//   only imem has a value          -> doMergeBufAndFileRows() with the imem iterator
//   only mem has a value           -> doMergeBufAndFileRows() with the mem iterator
//   neither                        -> mergeFileBlockAndLastBlock()
// `key` is the timestamp of the current file block row, or INT64_MIN when the file block is empty or
// already dumped.
static int32_t buildComposedDataBlockImpl(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo,
                                          SBlockData* pBlockData, SLastBlockReader* pLastBlockReader) {
  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
  SIterInfo*          pMemIter = &pBlockScanInfo->iter;
  SIterInfo*          pImemIter = &pBlockScanInfo->iiter;

  int64_t key = INT64_MIN;
  if (pBlockData->nRow > 0 && (!pDumpInfo->allDumped)) {
    key = pBlockData->aTSKEY[pDumpInfo->rowIndex];
  }

  if (pMemIter->hasVal && pImemIter->hasVal) {
    return doMergeMultiLevelRows(pReader, pBlockScanInfo, pBlockData, pLastBlockReader);
  }

  TSDBROW* pMemRow = NULL;
  TSDBROW* pImemRow = NULL;

  if (pMemIter->hasVal) {
    pMemRow = getValidMemRow(pMemIter, pBlockScanInfo->delSkyline, pReader);
  }

  if (pImemIter->hasVal) {
    pImemRow = getValidMemRow(pImemIter, pBlockScanInfo->delSkyline, pReader);
  }

  // hasVal is deliberately tested again after the rows were fetched
  // NOTE(review): presumably getValidMemRow() may clear hasVal while skipping rows -- confirm.

  // imem + file + last block
  if (pImemIter->hasVal) {
    return doMergeBufAndFileRows(pReader, pBlockScanInfo, pImemRow, pImemIter, key, pLastBlockReader);
  }

  // mem + file + last block
  if (pMemIter->hasVal) {
    return doMergeBufAndFileRows(pReader, pBlockScanInfo, pMemRow, pMemIter, key, pLastBlockReader);
  }

  // files data blocks + last block
  return mergeFileBlockAndLastBlock(pReader, pLastBlockReader, key, pBlockScanInfo, pBlockData);
}

2221
// Fill pReader->pResBlock by repeatedly merging rows from the current file block, the last block and
// the in-memory buffers until the file block is consumed, the result block reaches pReader->capacity,
// or no source has data left. A "clean" file block is copied directly without merging.
//
// Returns TSDB_CODE_SUCCESS, TSDB_CODE_INVALID_PARA when the uid of the current block is not in the
// table map, or the error reported by the merge step.
static int32_t buildComposedDataBlock(STsdbReader* pReader) {
  int32_t code = TSDB_CODE_SUCCESS;

  SSDataBlock* pResBlock = pReader->pResBlock;

  SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(&pReader->status.blockIter);
  SLastBlockReader*   pLastBlockReader = pReader->status.fileIter.pLastBlockReader;

  int64_t st = taosGetTimestampUs();

  STableBlockScanInfo* pBlockScanInfo = NULL;
  if (pBlockInfo != NULL) {
    pBlockScanInfo = taosHashGet(pReader->status.pTableMap, &pBlockInfo->uid, sizeof(pBlockInfo->uid));
    if (pBlockScanInfo == NULL) {
      code = TSDB_CODE_INVALID_PARA;
      tsdbError("failed to locate the uid:%" PRIu64 " in query table uid list, total tables:%d, %s", pBlockInfo->uid,
                taosHashGetSize(pReader->status.pTableMap), pReader->idStr);
      goto _end;
    }

    SDataBlk* pBlock = getCurrentBlock(&pReader->status.blockIter);
    TSDBKEY   keyInBuf = getCurrentKeyInBuf(pBlockScanInfo, pReader);

    // it is a clean block, load it directly
    if (isCleanFileDataBlock(pReader, pBlockInfo, pBlock, pBlockScanInfo, keyInBuf, pLastBlockReader)) {
      if (pReader->order == TSDB_ORDER_ASC ||
          (pReader->order == TSDB_ORDER_DESC && (!hasDataInLastBlock(pLastBlockReader)))) {
        copyBlockDataToSDataBlock(pReader, pBlockScanInfo);
        goto _end;
      }
    }
  } else {  // file blocks not exist
    pBlockScanInfo = pReader->status.pTableIter;
  }

  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
  SBlockData*         pBlockData = &pReader->status.fileBlockData;
  int32_t             step = ASCENDING_TRAVERSE(pReader->order) ? 1 : -1;

  while (1) {
    bool hasBlockData = false;

    // find the first qualified row in the loaded file data block
    while (pBlockData->nRow > 0) {
      if (isValidFileBlockRow(pBlockData, pDumpInfo, pBlockScanInfo, pReader)) {
        hasBlockData = true;
        break;
      }

      pDumpInfo->rowIndex += step;

      SDataBlk* pBlock = getCurrentBlock(&pReader->status.blockIter);
      if (pDumpInfo->rowIndex >= pBlock->nRow || pDumpInfo->rowIndex < 0) {
        setBlockAllDumped(pDumpInfo, pBlock->maxKey.ts, pReader->order);
        break;
      }
    }

    bool hasBlockLData = hasDataInLastBlock(pLastBlockReader);

    // no data in last block and block, no need to proceed.
    if ((hasBlockData == false) && (hasBlockLData == false)) {
      break;
    }

    // propagate merge failures instead of silently dropping them
    code = buildComposedDataBlockImpl(pReader, pBlockScanInfo, pBlockData, pLastBlockReader);
    if (code != TSDB_CODE_SUCCESS) {
      goto _end;
    }

    // currently loaded file data block is consumed
    if ((pBlockData->nRow > 0) && (pDumpInfo->rowIndex >= pBlockData->nRow || pDumpInfo->rowIndex < 0)) {
      SDataBlk* pBlock = getCurrentBlock(&pReader->status.blockIter);
      setBlockAllDumped(pDumpInfo, pBlock->maxKey.ts, pReader->order);
      break;
    }

    if (pResBlock->info.rows >= pReader->capacity) {
      break;
    }
  }

_end:
  // the bookkeeping below runs on both the success and the failure path
  pResBlock->info.uid = (pBlockScanInfo != NULL) ? pBlockScanInfo->uid : 0;
  blockDataUpdateTsWindow(pResBlock, 0);

  setComposedBlockFlag(pReader, true);
  double el = (taosGetTimestampUs() - st) / 1000.0;

  pReader->cost.composedBlocks += 1;
  pReader->cost.buildComposedBlockTime += el;

  if (pResBlock->info.rows > 0) {
    tsdbDebug("%p uid:%" PRIu64 ", composed data block created, brange:%" PRIu64 "-%" PRIu64
                  " rows:%d, elapsed time:%.2f ms %s",
              pReader, pResBlock->info.uid, pResBlock->info.window.skey, pResBlock->info.window.ekey,
              pResBlock->info.rows, el, pReader->idStr);
  }

  return code;
}

// Record in the reader status whether the current result block was produced by merging (composed).
void setComposedBlockFlag(STsdbReader* pReader, bool composed) {
  SReaderStatus* pStatus = &pReader->status;
  pStatus->composedDataBlock = composed;
}

dengyihao's avatar
dengyihao 已提交
2322 2323
// Build the delete skyline of one table and reset every delete cursor of its scan info.
//
// Delete records are gathered from the delete file of the read snapshot (if there is one) and from the
// delete lists of the two in-memory table data objects, then turned into pBlockScanInfo->delSkyline.
// Nothing is done when the skyline already exists. When no delete record is found the skyline stays
// NULL and the cursors are still initialized.
//
// Returns TSDB_CODE_SUCCESS, TSDB_CODE_OUT_OF_MEMORY on allocation failure, or the error code reported
// by the delete-file reader / skyline builder.
int32_t initDelSkylineIterator(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader, STbData* pMemTbData,
                               STbData* piMemTbData) {
  if (pBlockScanInfo->delSkyline != NULL) {
    return TSDB_CODE_SUCCESS;
  }

  int32_t code = TSDB_CODE_SUCCESS;
  STsdb*  pTsdb = pReader->pTsdb;

  SArray* pDelData = taosArrayInit(4, sizeof(SDelData));
  if (pDelData == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  SDelFile* pDelFile = pReader->pReadSnap->fs.pDelFile;
  if (pDelFile) {
    SDelFReader* pDelFReader = NULL;
    code = tsdbDelFReaderOpen(&pDelFReader, pDelFile, pTsdb);
    if (code != TSDB_CODE_SUCCESS) {
      goto _err;
    }

    SArray* aDelIdx = taosArrayInit(4, sizeof(SDelIdx));
    if (aDelIdx == NULL) {
      // code still holds success from the open call; report the allocation failure explicitly
      code = TSDB_CODE_OUT_OF_MEMORY;
      tsdbDelFReaderClose(&pDelFReader);
      goto _err;
    }

    code = tsdbReadDelIdx(pDelFReader, aDelIdx);
    if (code != TSDB_CODE_SUCCESS) {
      taosArrayDestroy(aDelIdx);
      tsdbDelFReaderClose(&pDelFReader);
      goto _err;
    }

    SDelIdx  idx = {.suid = pReader->suid, .uid = pBlockScanInfo->uid};
    SDelIdx* pIdx = taosArraySearch(aDelIdx, &idx, tCmprDelIdx, TD_EQ);

    if (pIdx != NULL) {
      code = tsdbReadDelData(pDelFReader, pIdx, pDelData);
    }

    // release the file resources before looking at the read result
    taosArrayDestroy(aDelIdx);
    tsdbDelFReaderClose(&pDelFReader);

    if (code != TSDB_CODE_SUCCESS) {
      goto _err;
    }
  }

  // append the delete records kept in the in-memory table data objects
  SDelData* p = NULL;
  if (pMemTbData != NULL) {
    p = pMemTbData->pHead;
    while (p) {
      taosArrayPush(pDelData, p);
      p = p->pNext;
    }
  }

  if (piMemTbData != NULL) {
    p = piMemTbData->pHead;
    while (p) {
      taosArrayPush(pDelData, p);
      p = p->pNext;
    }
  }

  if (taosArrayGetSize(pDelData) > 0) {
    pBlockScanInfo->delSkyline = taosArrayInit(4, sizeof(TSDBKEY));
    if (pBlockScanInfo->delSkyline == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto _err;
    }

    code = tsdbBuildDeleteSkyline(pDelData, 0, (int32_t)(taosArrayGetSize(pDelData) - 1), pBlockScanInfo->delSkyline);
  }

  taosArrayDestroy(pDelData);

  // ascending scans start at the first skyline point, descending scans at the last one
  pBlockScanInfo->iter.index =
      ASCENDING_TRAVERSE(pReader->order) ? 0 : (int32_t)taosArrayGetSize(pBlockScanInfo->delSkyline) - 1;
  pBlockScanInfo->iiter.index = pBlockScanInfo->iter.index;
  pBlockScanInfo->fileDelIndex = pBlockScanInfo->iter.index;
  pBlockScanInfo->lastBlockDelIndex = pBlockScanInfo->iter.index;
  return code;

_err:
  taosArrayDestroy(pDelData);
  return code;
}

H
Haojun Liao 已提交
2404
// Return the key of the next valid row held in the in-memory buffers (mem and imem) of a table.
//
// When both buffers have a valid row, the one that comes first in the scan order is returned: the
// smaller timestamp for ascending scans, the larger one for descending scans. When only one buffer has
// a valid row, its key is returned. When neither has one, the returned key carries
// ts == TSKEY_INITIAL_VAL.
//
// Note: getValidMemRow() may advance the iterators past invalid rows.
TSDBKEY getCurrentKeyInBuf(STableBlockScanInfo* pScanInfo, STsdbReader* pReader) {
  TSDBKEY key = {.ts = TSKEY_INITIAL_VAL};
  bool    hasKey = false;

  TSDBROW* pRow = getValidMemRow(&pScanInfo->iter, pScanInfo->delSkyline, pReader);
  if (pRow != NULL) {
    key = TSDBROW_KEY(pRow);
    hasKey = true;
  }

  pRow = getValidMemRow(&pScanInfo->iiter, pScanInfo->delSkyline, pReader);
  if (pRow != NULL) {
    TSDBKEY k = TSDBROW_KEY(pRow);

    // never compare against the sentinel: an imem-only row must always be reported
    if (!hasKey) {
      key = k;
    } else if (ASCENDING_TRAVERSE(pReader->order)) {
      if (key.ts > k.ts) {
        key = k;
      }
    } else {
      if (key.ts < k.ts) {
        key = k;
      }
    }
  }

  return key;
}

2422
// Advance the fileset iterator until a fileset that has something to read for the queried tables is
// found, i.e. pBlockNum->numOfBlocks + pBlockNum->numOfLastFiles > 0, or until the filesets are
// exhausted (both counters stay 0 in that case).
//
// Returns TSDB_CODE_SUCCESS, TSDB_CODE_OUT_OF_MEMORY when the index list cannot be allocated, or the
// error reported while loading the block index / file blocks.
static int32_t moveToNextFile(STsdbReader* pReader, SBlockNumber* pBlockNum) {
  SReaderStatus* pStatus = &pReader->status;
  pBlockNum->numOfBlocks = 0;
  pBlockNum->numOfLastFiles = 0;

  size_t  numOfTables = taosHashGetSize(pStatus->pTableMap);
  SArray* pIndexList = taosArrayInit(numOfTables, sizeof(SBlockIdx));
  if (pIndexList == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  int32_t code = TSDB_CODE_SUCCESS;

  // the loop ends when there are no more data files on disk, on error, or when data is found
  while (filesetIteratorNext(&pStatus->fileIter, pReader)) {
    taosArrayClear(pIndexList);

    code = doLoadBlockIndex(pReader, pReader->pFileReader, pIndexList);
    if (code != TSDB_CODE_SUCCESS) {
      break;
    }

    if (taosArrayGetSize(pIndexList) > 0 || pReader->pFileReader->pSet->nSttF > 0) {
      code = doLoadFileBlock(pReader, pIndexList, pBlockNum);
      if (code != TSDB_CODE_SUCCESS) {
        break;
      }

      if (pBlockNum->numOfBlocks + pBlockNum->numOfLastFiles > 0) {
        break;
      }
    }

    // no blocks in current file, try next files
  }

  // single release point for the index list on every path
  taosArrayDestroy(pIndexList);
  return code;
}

2462
// Three-way comparison of two uint64_t values passed by address: -1, 0 or 1.
static int32_t uidComparFunc(const void* p1, const void* p2) {
  const uint64_t lhs = *(const uint64_t*)p1;
  const uint64_t rhs = *(const uint64_t*)p2;

  if (lhs < rhs) {
    return -1;
  }

  return (lhs > rhs) ? 1 : 0;
}
2471

2472
// Copy the uid of every entry in the table map into pOrderCheckInfo->tableUidList and sort the list
// in ascending uid order. The list must already be large enough to hold one uid per map entry.
static void extractOrderedTableUidList(SUidOrderCheckInfo* pOrderCheckInfo, SReaderStatus* pStatus) {
  int32_t numOfUids = taosHashGetSize(pStatus->pTableMap);
  int32_t pos = 0;

  for (void* pIter = taosHashIterate(pStatus->pTableMap, NULL); pIter != NULL;
       pIter = taosHashIterate(pStatus->pTableMap, pIter)) {
    STableBlockScanInfo* pScanInfo = pIter;
    pOrderCheckInfo->tableUidList[pos] = pScanInfo->uid;
    pos += 1;
  }

  taosSort(pOrderCheckInfo->tableUidList, numOfUids, sizeof(uint64_t), uidComparFunc);
}

2486
// Prepare the uid-ordered table traversal used when reading last blocks.
//
// On first use the sorted uid list is allocated and built. On later calls, when the table cursor
// (pStatus->pTableIter) is NULL, the traversal restarts from the first uid; if that uid can no longer
// be found in the table map, the list is rebuilt from the current map content.
// Returns TSDB_CODE_OUT_OF_MEMORY when the list cannot be (re)allocated.
static int32_t initOrderCheckInfo(SUidOrderCheckInfo* pOrderCheckInfo, SReaderStatus* pStatus) {
  int32_t numOfTables = taosHashGetSize(pStatus->pTableMap);
  if (numOfTables == 0) {
    return TSDB_CODE_SUCCESS;
  }

  // first call: create the ordered uid list
  if (pOrderCheckInfo->tableUidList == NULL) {
    pOrderCheckInfo->currentIndex = 0;
    pOrderCheckInfo->tableUidList = taosMemoryMalloc(numOfTables * sizeof(uint64_t));
    if (pOrderCheckInfo->tableUidList == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }

    extractOrderedTableUidList(pOrderCheckInfo, pStatus);

    uint64_t firstUid = pOrderCheckInfo->tableUidList[0];
    pStatus->pTableIter = taosHashGet(pStatus->pTableMap, &firstUid, sizeof(firstUid));
    return TSDB_CODE_SUCCESS;
  }

  // a traversal is already in progress, keep the cursor as it is
  if (pStatus->pTableIter != NULL) {
    return TSDB_CODE_SUCCESS;
  }

  // it is the last block of a new file: restart from the first uid
  pOrderCheckInfo->currentIndex = 0;
  uint64_t uid = pOrderCheckInfo->tableUidList[pOrderCheckInfo->currentIndex];
  pStatus->pTableIter = taosHashGet(pStatus->pTableMap, &uid, sizeof(uid));
  if (pStatus->pTableIter != NULL) {
    return TSDB_CODE_SUCCESS;
  }

  // the tableMap has already been updated, rebuild the uid list
  void* pNewList = taosMemoryRealloc(pOrderCheckInfo->tableUidList, numOfTables * sizeof(uint64_t));
  if (pNewList == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  pOrderCheckInfo->tableUidList = pNewList;
  extractOrderedTableUidList(pOrderCheckInfo, pStatus);

  uid = pOrderCheckInfo->tableUidList[0];
  pStatus->pTableIter = taosHashGet(pStatus->pTableMap, &uid, sizeof(uid));
  return TSDB_CODE_SUCCESS;
}

2527
// Step the uid-ordered cursor to the next table. Returns false and clears pStatus->pTableIter when the
// cursor has moved past the number of tables in the table map; otherwise points pTableIter at the scan
// info of the next uid and returns true.
static bool moveToNextTable(SUidOrderCheckInfo* pOrderedCheckInfo, SReaderStatus* pStatus) {
  pOrderedCheckInfo->currentIndex += 1;

  if (pOrderedCheckInfo->currentIndex < taosHashGetSize(pStatus->pTableMap)) {
    uint64_t nextUid = pOrderedCheckInfo->tableUidList[pOrderedCheckInfo->currentIndex];
    pStatus->pTableIter = taosHashGet(pStatus->pTableMap, &nextUid, sizeof(nextUid));
    ASSERT(pStatus->pTableIter != NULL);
    return true;
  }

  pStatus->pTableIter = NULL;
  return false;
}

2540
// Walk the tables in uid order and build a result block from the last block of the first table that
// yields rows. Returns with an empty result block when every table has been visited.
static int32_t doLoadLastBlockSequentially(STsdbReader* pReader) {
  SReaderStatus*      pStatus = &pReader->status;
  SLastBlockReader*   pLastBlockReader = pStatus->fileIter.pLastBlockReader;
  SUidOrderCheckInfo* pOrderedCheckInfo = &pStatus->uidCheckInfo;

  int32_t code = initOrderCheckInfo(pOrderedCheckInfo, pStatus);
  if (code != TSDB_CODE_SUCCESS || (taosHashGetSize(pStatus->pTableMap) == 0)) {
    return code;
  }

  for (;;) {
    // load the last data block of current table
    STableBlockScanInfo* pScanInfo = pStatus->pTableIter;

    if (initLastBlockReader(pLastBlockReader, pScanInfo, pReader)) {
      code = doBuildDataBlock(pReader);
      if (code != TSDB_CODE_SUCCESS) {
        return code;
      }

      if (pReader->pResBlock->info.rows > 0) {
        return TSDB_CODE_SUCCESS;
      }
    }

    // nothing to read for the current table (or it is exhausted), let's try next table
    if (!moveToNextTable(pOrderedCheckInfo, pStatus)) {
      return TSDB_CODE_SUCCESS;
    }
  }
}

2579
// Build the next result block for the table of the current file block, or for the table under the
// table cursor when the block iterator has no current block. Depending on how the file block relates
// to the buffered rows and the last block, the result is a composed (merged) block, a block built from
// the in-memory buffer only, or the description of a whole file block returned without loading it.
static int32_t doBuildDataBlock(STsdbReader* pReader) {
  SReaderStatus*      pStatus = &pReader->status;
  SDataBlockIter*     pBlockIter = &pStatus->blockIter;
  SLastBlockReader*   pLastBlockReader = pStatus->fileIter.pLastBlockReader;
  SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(pBlockIter);
  bool                hasFileBlock = (pBlockInfo != NULL);

  STableBlockScanInfo* pScanInfo = NULL;
  if (hasFileBlock) {
    pScanInfo = taosHashGet(pStatus->pTableMap, &pBlockInfo->uid, sizeof(pBlockInfo->uid));
  } else {
    pScanInfo = pStatus->pTableIter;
  }

  if (pScanInfo == NULL) {
    tsdbError("failed to get table scan-info, %s", pReader->idStr);
    return TSDB_CODE_INVALID_PARA;
  }

  SDataBlk* pBlock = NULL;
  if (hasFileBlock) {
    pBlock = getCurrentBlock(pBlockIter);
  }

  initLastBlockReader(pLastBlockReader, pScanInfo, pReader);
  TSDBKEY keyInBuf = getCurrentKeyInBuf(pScanInfo, pReader);

  // no file block at all: build data block from last data file
  if (!hasFileBlock) {
    ASSERT(pBlockIter->numOfBlocks == 0);
    return buildComposedDataBlock(pReader);
  }

  // the file block has to be loaded and merged with the other sources
  if (fileBlockShouldLoad(pReader, pBlockInfo, pBlock, pScanInfo, keyInBuf, pLastBlockReader)) {
    int32_t code = doLoadFileBlockData(pReader, pBlockIter, &pStatus->fileBlockData, pScanInfo->uid);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    return buildComposedDataBlock(pReader);
  }

  // buffered rows lie before the file block in scan order:
  // less than the file block in asc, greater than the file block in desc
  if (bufferDataInFileBlockGap(pReader->order, keyInBuf, pBlock)) {
    int64_t endKey = pBlock->maxKey.ts;
    if (ASCENDING_TRAVERSE(pReader->order)) {
      endKey = pBlock->minKey.ts;
    }

    return buildDataBlockFromBuf(pReader, pScanInfo, endKey);
  }

  // descending scan with pending last-block rows: only return the rows in last block
  if (hasDataInLastBlock(pLastBlockReader) && !ASCENDING_TRAVERSE(pReader->order)) {
    int64_t tsLast = getCurrentKeyInLastBlock(pLastBlockReader);
    ASSERT(tsLast >= pBlock->maxKey.ts);
    tBlockDataReset(&pStatus->fileBlockData);

    tsdbDebug("load data in last block firstly, due to desc scan data, %s", pReader->idStr);
    return buildComposedDataBlock(pReader);
  }

  // whole block is required, return it directly
  SDataBlockInfo* pInfo = &pReader->pResBlock->info;
  pInfo->rows = pBlock->nRow;
  pInfo->uid = pScanInfo->uid;
  pInfo->window = (STimeWindow){.skey = pBlock->minKey.ts, .ekey = pBlock->maxKey.ts};
  setComposedBlockFlag(pReader, false);
  setBlockAllDumped(&pStatus->fBlockDumpInfo, pBlock->maxKey.ts, pReader->order);

  return TSDB_CODE_SUCCESS;
}

H
Haojun Liao 已提交
2646
// Build a result block from the in-memory buffers only, visiting the tables in table-map iteration
// order and resuming from pStatus->pTableIter. Returns as soon as one table yields rows; the result
// block stays empty once every table has been visited.
static int32_t buildBlockFromBufferSequentially(STsdbReader* pReader) {
  SReaderStatus* pStatus = &pReader->status;

  // start from the first table when no traversal is in progress
  if (pStatus->pTableIter == NULL) {
    pStatus->pTableIter = taosHashIterate(pStatus->pTableMap, NULL);
  }

  while (pStatus->pTableIter != NULL) {
    STableBlockScanInfo* pBlockScanInfo = pStatus->pTableIter;
    initMemDataIterator(pBlockScanInfo, pReader);

    // no upper/lower bound: read everything the buffer holds for this table
    int64_t endKey = INT64_MIN;
    if (ASCENDING_TRAVERSE(pReader->order)) {
      endKey = INT64_MAX;
    }

    int32_t code = buildDataBlockFromBuf(pReader, pBlockScanInfo, endKey);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    if (pReader->pResBlock->info.rows > 0) {
      return TSDB_CODE_SUCCESS;
    }

    // current table is exhausted, let's try the next table
    pStatus->pTableIter = taosHashIterate(pStatus->pTableMap, pStatus->pTableIter);
  }

  return TSDB_CODE_SUCCESS;
}

2678
// set the correct start position in case of the first/last file block, according to the query time window
2679
static void initBlockDumpInfo(STsdbReader* pReader, SDataBlockIter* pBlockIter) {
H
Hongze Cheng 已提交
2680
  SDataBlk* pBlock = getCurrentBlock(pBlockIter);
2681

2682 2683 2684
  SReaderStatus* pStatus = &pReader->status;

  SFileBlockDumpInfo* pDumpInfo = &pStatus->fBlockDumpInfo;
2685 2686 2687

  pDumpInfo->totalRows = pBlock->nRow;
  pDumpInfo->allDumped = false;
2688
  pDumpInfo->rowIndex = ASCENDING_TRAVERSE(pReader->order) ? 0 : pBlock->nRow - 1;
2689 2690
}

2691
// Move to the next fileset that has data and prepare the block iterator and dump state for it.
//
// When no fileset has data left, pReader->status.loadFromFile is set to false and TSDB_CODE_SUCCESS is
// returned so that the caller can switch to the in-memory buffers. When the fileset only has last-block
// data, the loaded file block data and the block iterator are reset instead of being initialized.
// Returns the error code of moveToNextFile()/initBlockIterator() on failure.
static int32_t initForFirstBlockInFile(STsdbReader* pReader, SDataBlockIter* pBlockIter) {
  SBlockNumber num = {0};

  int32_t code = moveToNextFile(pReader, &num);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  // all data files are consumed, try data in buffer
  if (num.numOfBlocks + num.numOfLastFiles == 0) {
    pReader->status.loadFromFile = false;
    return code;
  }

  // initialize the block iterator for a new fileset
  if (num.numOfBlocks > 0) {
    code = initBlockIterator(pReader, pBlockIter, num.numOfBlocks);
    if (code != TSDB_CODE_SUCCESS) {
      // do not read the current block from an iterator that failed to initialize
      return code;
    }
  } else {  // no block data, only last block exists
    tBlockDataReset(&pReader->status.fileBlockData);
    resetDataBlockIterator(pBlockIter, pReader->order);
  }

  // set the correct start position according to the query time window
  initBlockDumpInfo(pReader, pBlockIter);
  return code;
}

2718
// True when the current file block has been started but not finished: it is not marked as fully dumped
// and the row cursor has left its start position (row 0 for asc, the last row for desc).
static bool fileBlockPartiallyRead(SFileBlockDumpInfo* pDumpInfo, bool asc) {
  if (pDumpInfo->allDumped) {
    return false;
  }

  if (asc) {
    return pDumpInfo->rowIndex > 0;
  }

  return pDumpInfo->rowIndex < (pDumpInfo->totalRows - 1);
}

2723
// Produce the next non-empty result block from the data files.
//
// The function alternates between two phases: reading last blocks table by table (the `_begin` section)
// and reading the ordinary file blocks of the current fileset (the loop below). It returns when a
// result block with rows has been built, when an error occurs, or when all filesets are consumed
// (pReader->status.loadFromFile becomes false).
static int32_t buildBlockFromFiles(STsdbReader* pReader) {
  int32_t code = TSDB_CODE_SUCCESS;
  bool    asc = ASCENDING_TRAVERSE(pReader->order);

  SReaderStatus*      pStatus = &pReader->status;
  SDataBlockIter*     pBlockIter = &pStatus->blockIter;
  SFileBlockDumpInfo* pDumpInfo = &pStatus->fBlockDumpInfo;

  if (pBlockIter->numOfBlocks == 0) {
  _begin:
    code = doLoadLastBlockSequentially(pReader);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    if (pReader->pResBlock->info.rows > 0) {
      return TSDB_CODE_SUCCESS;
    }

    // all data blocks are checked in this last block file, now let's try the next file
    if (pStatus->pTableIter == NULL) {
      code = initForFirstBlockInFile(pReader, pBlockIter);

      // error happens or all the data files are completely checked
      if ((code != TSDB_CODE_SUCCESS) || (!pStatus->loadFromFile)) {
        return code;
      }

      // this fileset has no ordinary data blocks, check its last block file if it exists
      if (pBlockIter->numOfBlocks == 0) {
        goto _begin;
      }
    }

    code = doBuildDataBlock(pReader);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    if (pReader->pResBlock->info.rows > 0) {
      return TSDB_CODE_SUCCESS;
    }
  }

  for (;;) {
    if (fileBlockPartiallyRead(pDumpInfo, asc)) {
      // file data block is partially loaded, keep merging from it
      code = buildComposedDataBlock(pReader);
    } else {
      // current block is exhausted, try the next file block
      if (pDumpInfo->allDumped) {
        if (blockIteratorNext(&pStatus->blockIter, pReader->idStr)) {
          // next block in the block accessed order list of the current file
          initBlockDumpInfo(pReader, pBlockIter);
        } else if (pStatus->pCurrentFileset->nSttF > 0) {
          // data blocks in current file are exhausted, read its last block files next
          tBlockDataReset(&pStatus->fileBlockData);
          resetDataBlockIterator(pBlockIter, pReader->order);
          goto _begin;
        } else {
          code = initForFirstBlockInFile(pReader, pBlockIter);

          // error happens or all the data files are completely checked
          if ((code != TSDB_CODE_SUCCESS) || (!pStatus->loadFromFile)) {
            return code;
          }

          // this file does not have blocks, let's start check the last block file
          if (pBlockIter->numOfBlocks == 0) {
            goto _begin;
          }
        }
      }

      code = doBuildDataBlock(pReader);
    }

    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    if (pReader->pResBlock->info.rows > 0) {
      return TSDB_CODE_SUCCESS;
    }
  }
}
H
refact  
Hongze Cheng 已提交
2811

2812 2813
// Select the tsdb instance to query. For a non-rsma vnode VND_TSDB(pVnode) is returned and *pLevel is
// left untouched. For an rsma vnode the retention level is chosen by comparing the query start key
// (plus the tolerance tsQueryRsmaTolerance scaled to the vnode precision) against now - keep of each
// level; the chosen level is stored in *pLevel and the matching rsma tsdb is returned.
static STsdb* getTsdbByRetentions(SVnode* pVnode, TSKEY winSKey, SRetention* retentions, const char* idStr,
                                  int8_t* pLevel) {
  if (!VND_IS_RSMA(pVnode)) {
    return VND_TSDB(pVnode);
  }

  int8_t  precision = pVnode->config.tsdbCfg.precision;
  int64_t now = taosGetTimestamp(precision);
  int64_t offset = tsQueryRsmaTolerance * ((precision == TSDB_TIME_PRECISION_MILLI)   ? 1L
                                           : (precision == TSDB_TIME_PRECISION_MICRO) ? 1000L
                                                                                      : 1000000L);

  int8_t level = 0;
  for (int8_t i = 0; i < TSDB_RETENTION_MAX; ++i) {
    SRetention* pRetention = &retentions[level];

    // this level is not configured: fall back to the previous one, if any
    if (pRetention->keep <= 0) {
      if (level > 0) {
        level -= 1;
      }
      break;
    }

    // the query start key is still covered by this level
    if ((now - pRetention->keep) <= (winSKey + offset)) {
      break;
    }

    level += 1;
  }

  const char* str = (idStr == NULL) ? "" : idStr;

  STsdb* pTsdb = NULL;
  if (level == TSDB_RETENTION_L0) {
    *pLevel = TSDB_RETENTION_L0;
    pTsdb = VND_RSMA0(pVnode);
  } else if (level == TSDB_RETENTION_L1) {
    *pLevel = TSDB_RETENTION_L1;
    pTsdb = VND_RSMA1(pVnode);
  } else {
    *pLevel = TSDB_RETENTION_L2;
    pTsdb = VND_RSMA2(pVnode);
  }

  tsdbDebug("vgId:%d, rsma level %d is selected to query %s", TD_VID(pVnode), *pLevel, str);
  return pTsdb;
}

H
Haojun Liao 已提交
2856
// Compute the version range visible to a query. A start version of -1 maps to 0. An end version of -1
// (not specified by the user) or one beyond the applied version of the vnode maps to the applied
// version. The level argument is not used.
SVersionRange getQueryVerRange(SVnode* pVnode, SQueryTableDataCond* pCond, int8_t level) {
  int64_t startVer = 0;
  if (pCond->startVersion != -1) {
    startVer = pCond->startVersion;
  }

  // default to the current maximum version of the vnode, narrow it only for a smaller explicit bound
  int64_t endVer = pVnode->state.applied;
  if (pCond->endVersion != -1 && !(pCond->endVersion > pVnode->state.applied)) {
    endVer = pCond->endVersion;
  }

  return (SVersionRange){.minVer = startVer, .maxVer = endVer};
}

2870
// Check whether the row identified by pKey (ts + version) is covered by a delete range.
//
// pDelList holds TSDBKEY points and *index is a cursor into it that this function may advance in the
// scan direction (forward for ascending order, backward for descending order). A NULL list means that
// nothing has been deleted. In ascending order a range only counts when its version does not exceed
// pVerRange->maxVer.
bool hasBeenDropped(const SArray* pDelList, int32_t* index, TSDBKEY* pKey, int32_t order, SVersionRange* pVerRange) {
  ASSERT(pKey != NULL);
  if (pDelList == NULL) {
    return false;
  }

  size_t num = taosArrayGetSize(pDelList);

  if (ASCENDING_TRAVERSE(order)) {
    // the cursor already sits on the last point
    if (*index >= num - 1) {
      TSDBKEY* pLast = taosArrayGetLast(pDelList);
      ASSERT(pKey->ts >= pLast->ts);

      if (pKey->ts > pLast->ts) {
        return false;
      }

      if (pKey->ts == pLast->ts) {
        TSDBKEY* pBeforeLast = taosArrayGet(pDelList, num - 2);
        return (pBeforeLast->version >= pKey->version);
      }

      return false;
    }

    TSDBKEY* pCurrent = taosArrayGet(pDelList, *index);
    TSDBKEY* pNext = taosArrayGet(pDelList, (*index) + 1);

    // the key lies before the range under the cursor
    if (pKey->ts < pCurrent->ts) {
      return false;
    }

    if (pCurrent->ts <= pKey->ts && pNext->ts >= pKey->ts && pCurrent->version >= pKey->version &&
        pVerRange->maxVer >= pCurrent->version) {
      return true;
    }

    // move the cursor forward while the key is at or beyond the end of the current range
    while (pNext->ts <= pKey->ts && (*index) < num - 1) {
      (*index) += 1;

      if ((*index) >= num - 1) {
        continue;
      }

      pCurrent = taosArrayGet(pDelList, *index);
      pNext = taosArrayGet(pDelList, (*index) + 1);

      // it is not a consecutive deletion range, ignore it
      if (pCurrent->version == 0 && pNext->version > 0) {
        continue;
      }

      if (pCurrent->ts <= pKey->ts && pNext->ts >= pKey->ts && pCurrent->version >= pKey->version &&
          pVerRange->maxVer >= pCurrent->version) {
        return true;
      }
    }

    return false;
  }

  // descending order
  if (*index <= 0) {
    TSDBKEY* pFirst = taosArrayGet(pDelList, 0);

    if (pKey->ts < pFirst->ts) {
      return false;
    }

    if (pKey->ts == pFirst->ts) {
      return pFirst->version >= pKey->version;
    }

    ASSERT(0);
    return false;
  }

  TSDBKEY* pCurrent = taosArrayGet(pDelList, *index);
  TSDBKEY* pPrev = taosArrayGet(pDelList, (*index) - 1);

  // the key lies after the range under the cursor
  if (pKey->ts > pCurrent->ts) {
    return false;
  }

  if (pPrev->ts <= pKey->ts && pCurrent->ts >= pKey->ts && pPrev->version >= pKey->version) {
    return true;
  }

  // move the cursor backward while the key is at or before the start of the current range
  while (pPrev->ts >= pKey->ts && (*index) > 1) {
    (*index) -= 1;

    if ((*index) < 1) {
      continue;
    }

    pCurrent = taosArrayGet(pDelList, *index);
    pPrev = taosArrayGet(pDelList, (*index) - 1);

    // it is not a consecutive deletion range, ignore it
    if (pCurrent->version > 0 && pPrev->version == 0) {
      continue;
    }

    if (pPrev->ts <= pKey->ts && pCurrent->ts >= pKey->ts && pPrev->version >= pKey->version) {
      return true;
    }
  }

  return false;
}

2972
// Return the current row of an in-memory iterator if it is visible to the reader, skipping forward over
// rows whose version is outside pReader->verRange or that are covered by a delete range.
//
// Returns NULL when the iterator has no value, runs out of rows, or reaches a row outside the query
// time window; in the latter two cases pIter->hasVal is cleared.
TSDBROW* getValidMemRow(SIterInfo* pIter, const SArray* pDelList, STsdbReader* pReader) {
  if (!pIter->hasVal) {
    return NULL;
  }

  TSDBROW* pRow = tsdbTbDataIterGet(pIter->iter);
  TSDBKEY  key = {.ts = pRow->pTSRow->ts, .version = pRow->version};

  for (;;) {
    if (outOfTimeWindow(key.ts, &pReader->window)) {
      pIter->hasVal = false;
      return NULL;
    }

    // it is a valid data version and the row has not been deleted
    bool versionVisible = (key.version <= pReader->verRange.maxVer && key.version >= pReader->verRange.minVer);
    if (versionVisible && (!hasBeenDropped(pDelList, &pIter->index, &key, pReader->order, &pReader->verRange))) {
      return pRow;
    }

    // not visible, move on to the next row of the iterator
    pIter->hasVal = tsdbTbDataIterNext(pIter->iter);
    if (!pIter->hasVal) {
      return NULL;
    }

    pRow = tsdbTbDataIterGet(pIter->iter);
    key = TSDBROW_KEY(pRow);
  }
}
H
Hongze Cheng 已提交
3010

3011 3012
// Advance the in-memory iterator and add every following valid row whose timestamp equals ts to the
// row merger. Stops at the first row with a different timestamp, when no valid row is left, or when the
// iterator is exhausted. Returns terrno when the schema of a row cannot be obtained.
int32_t doMergeRowsInBuf(SIterInfo* pIter, uint64_t uid, int64_t ts, SArray* pDelList, SRowMerger* pMerger,
                         STsdbReader* pReader) {
  for (;;) {
    pIter->hasVal = tsdbTbDataIterNext(pIter->iter);
    if (!pIter->hasVal) {
      return TSDB_CODE_SUCCESS;
    }

    // data exists but may not be valid
    TSDBROW* pNextRow = getValidMemRow(pIter, pDelList, pReader);
    if (pNextRow == NULL) {
      return TSDB_CODE_SUCCESS;
    }

    // ts is not identical, quit
    TSDBKEY rowKey = TSDBROW_KEY(pNextRow);
    if (rowKey.ts != ts) {
      return TSDB_CODE_SUCCESS;
    }

    STSchema* pTSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pNextRow), pReader, uid);
    if (pTSchema == NULL) {
      return terrno;
    }

    tRowMergerAdd(pMerger, pNextRow, pTSchema);
  }
}

3042
static int32_t doMergeRowsInFileBlockImpl(SBlockData* pBlockData, int32_t rowIndex, int64_t key, SRowMerger* pMerger,
3043
                                          SVersionRange* pVerRange, int32_t step) {
3044 3045
  while (pBlockData->aTSKEY[rowIndex] == key && rowIndex < pBlockData->nRow && rowIndex >= 0) {
    if (pBlockData->aVersion[rowIndex] > pVerRange->maxVer || pBlockData->aVersion[rowIndex] < pVerRange->minVer) {
3046
      rowIndex += step;
3047 3048 3049 3050 3051 3052 3053 3054 3055 3056 3057 3058 3059 3060 3061 3062
      continue;
    }

    TSDBROW fRow = tsdbRowFromBlockData(pBlockData, rowIndex);
    tRowMerge(pMerger, &fRow);
    rowIndex += step;
  }

  return rowIndex;
}

// Outcome reported by checkForNeighborFileBlock() through its `state` argument.
typedef enum {
  // the neighbor block has been consumed completely, the following block must be examined as well
  CHECK_FILEBLOCK_CONT = 1,
  // stop examining neighbor blocks
  CHECK_FILEBLOCK_QUIT = 2,
} CHECK_FILEBLOCK_STATE;

H
Hongze Cheng 已提交
3063
static int32_t checkForNeighborFileBlock(STsdbReader* pReader, STableBlockScanInfo* pScanInfo, SDataBlk* pBlock,
3064 3065
                                         SFileDataBlockInfo* pFBlock, SRowMerger* pMerger, int64_t key,
                                         CHECK_FILEBLOCK_STATE* state) {
3066
  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
3067
  SBlockData*         pBlockData = &pReader->status.fileBlockData;
3068

3069
  *state = CHECK_FILEBLOCK_QUIT;
3070
  int32_t step = ASCENDING_TRAVERSE(pReader->order) ? 1 : -1;
3071

H
Hongze Cheng 已提交
3072 3073
  int32_t   nextIndex = -1;
  SDataBlk* pNeighborBlock = getNeighborBlockOfSameTable(pFBlock, pScanInfo, &nextIndex, pReader->order);
3074
  if (pNeighborBlock == NULL) {  // do nothing
3075 3076 3077 3078
    return 0;
  }

  bool overlap = overlapWithNeighborBlock(pBlock, pNeighborBlock, pReader->order);
3079 3080
  taosMemoryFree(pNeighborBlock);

3081
  if (overlap) {  // load next block
3082
    SReaderStatus*  pStatus = &pReader->status;
3083 3084
    SDataBlockIter* pBlockIter = &pStatus->blockIter;

3085
    // 1. find the next neighbor block in the scan block list
3086
    SFileDataBlockInfo fb = {.uid = pFBlock->uid, .tbBlockIdx = nextIndex};
3087
    int32_t            neighborIndex = findFileBlockInfoIndex(pBlockIter, &fb);
3088

3089
    // 2. remove it from the scan block list
3090
    setFileBlockActiveInBlockIter(pBlockIter, neighborIndex, step);
3091

3092
    // 3. load the neighbor block, and set it to be the currently accessed file data block
3093
    int32_t code = doLoadFileBlockData(pReader, pBlockIter, &pStatus->fileBlockData, pFBlock->uid);
3094 3095 3096 3097
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

3098
    // 4. check the data values
3099 3100 3101 3102
    initBlockDumpInfo(pReader, pBlockIter);

    pDumpInfo->rowIndex =
        doMergeRowsInFileBlockImpl(pBlockData, pDumpInfo->rowIndex, key, pMerger, &pReader->verRange, step);
H
Haojun Liao 已提交
3103
    if (pDumpInfo->rowIndex >= pDumpInfo->totalRows) {
3104 3105 3106 3107 3108 3109 3110
      *state = CHECK_FILEBLOCK_CONT;
    }
  }

  return TSDB_CODE_SUCCESS;
}

3111 3112
// Merges all file-block rows that share the timestamp of the current row into pMerger.
//
// The timestamp of the row at pDumpInfo->rowIndex is taken as the merge key. The row index is then
// advanced in scan direction and every following row with the same key is merged. When the current
// block is used up, overlapping neighbor blocks of the same table are loaded and merged as well.
//
// Returns TSDB_CODE_SUCCESS, or the error code reported while a neighbor block was being loaded.
int32_t doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pScanInfo, STsdbReader* pReader,
                                SRowMerger* pMerger) {
  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;

  bool    asc = ASCENDING_TRAVERSE(pReader->order);
  int64_t key = pBlockData->aTSKEY[pDumpInfo->rowIndex];
  int32_t step = asc ? 1 : -1;

  pDumpInfo->rowIndex += step;
  if ((pDumpInfo->rowIndex <= pBlockData->nRow - 1 && asc) || (pDumpInfo->rowIndex >= 0 && !asc)) {
    pDumpInfo->rowIndex =
        doMergeRowsInFileBlockImpl(pBlockData, pDumpInfo->rowIndex, key, pMerger, &pReader->verRange, step);
  }

  // all rows are consumed, let's try next file block
  if ((pDumpInfo->rowIndex >= pBlockData->nRow && asc) || (pDumpInfo->rowIndex < 0 && !asc)) {
    while (1) {
      CHECK_FILEBLOCK_STATE st = CHECK_FILEBLOCK_QUIT;

      SFileDataBlockInfo* pFileBlockInfo = getCurrentBlockInfo(&pReader->status.blockIter);
      SDataBlk*           pCurrentBlock = getCurrentBlock(&pReader->status.blockIter);

      // do not swallow a failure to load the neighbor block: the merged row would be incomplete
      int32_t code = checkForNeighborFileBlock(pReader, pScanInfo, pCurrentBlock, pFileBlockInfo, pMerger, key, &st);
      if (code != TSDB_CODE_SUCCESS) {
        return code;
      }

      if (st == CHECK_FILEBLOCK_QUIT) {
        break;
      }
    }
  }

  return TSDB_CODE_SUCCESS;
}

H
Hongze Cheng 已提交
3142
int32_t doMergeRowsInLastBlock(SLastBlockReader* pLastBlockReader, STableBlockScanInfo* pScanInfo, int64_t ts,
3143
                               SRowMerger* pMerger, SVersionRange* pVerRange) {
H
Haojun Liao 已提交
3144
  pScanInfo->lastKey = ts;
3145
  while (nextRowFromLastBlocks(pLastBlockReader, pScanInfo, pVerRange)) {
3146 3147
    int64_t next1 = getCurrentKeyInLastBlock(pLastBlockReader);
    if (next1 == ts) {
3148
      TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
3149 3150 3151 3152 3153 3154 3155 3156 3157
      tRowMerge(pMerger, &fRow1);
    } else {
      break;
    }
  }

  return TSDB_CODE_SUCCESS;
}

3158 3159
// Produces the output row for `pRow`, merging it with the following rows of the same in-memory
// iterator that share its timestamp.
//
// The iterator is advanced once. If there is no valid next row, or the next valid row has a different
// timestamp, the row is returned as is: *pTSRow points to the row owned by the buffer and *freeTSRow is
// set to false. Otherwise all rows with the same timestamp are merged, *pTSRow receives the row built
// by tRowMergerGetRow() and *freeTSRow is set to true so that the caller releases it.
//
// Returns TSDB_CODE_SUCCESS, terrno when a schema cannot be obtained, or the error code of the failed
// merge step. The row merger is released on every path once it has been initialized.
int32_t doMergeMemTableMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter, SArray* pDelList, STSRow** pTSRow,
                                 STsdbReader* pReader, bool* freeTSRow) {
  TSDBROW* pNextRow = NULL;
  TSDBROW  current = *pRow;

  // if the timestamp of the next valid row has a different ts, return current row directly
  pIter->hasVal = tsdbTbDataIterNext(pIter->iter);
  if (pIter->hasVal) {  // has next point in mem/imem
    pNextRow = getValidMemRow(pIter, pDelList, pReader);
  }

  if (pNextRow == NULL || current.pTSRow->ts != pNextRow->pTSRow->ts) {
    *pTSRow = current.pTSRow;
    *freeTSRow = false;
    return TSDB_CODE_SUCCESS;
  }

  SRowMerger merge = {0};

  // get the correct schema for data in memory
  terrno = 0;
  STSchema* pTSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(&current), pReader, uid);
  if (pTSchema == NULL) {
    return terrno;
  }

  if (pReader->pSchema == NULL) {
    pReader->pSchema = pTSchema;
  }

  int32_t code = tRowMergerInit2(&merge, pReader->pSchema, &current, pTSchema);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  // from here on the merger is initialized and has to be cleared on every exit path
  STSchema* pTSchema1 = doGetSchemaForTSRow(TSDBROW_SVERSION(pNextRow), pReader, uid);
  if (pTSchema1 == NULL) {
    code = terrno;  // capture the error before any cleanup runs
    tRowMergerClear(&merge);
    return code;
  }

  tRowMergerAdd(&merge, pNextRow, pTSchema1);

  code = doMergeRowsInBuf(pIter, uid, current.pTSRow->ts, pDelList, &merge, pReader);
  if (code == TSDB_CODE_SUCCESS) {
    code = tRowMergerGetRow(&merge, pTSRow);
  }

  tRowMergerClear(&merge);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  *freeTSRow = true;
  return TSDB_CODE_SUCCESS;
}

3226
int32_t doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader,
H
Hongze Cheng 已提交
3227
                           STSRow** pTSRow) {
H
Haojun Liao 已提交
3228 3229
  SRowMerger merge = {0};

3230 3231 3232
  TSDBKEY k = TSDBROW_KEY(pRow);
  TSDBKEY ik = TSDBROW_KEY(piRow);

3233
  if (ASCENDING_TRAVERSE(pReader->order)) {  // ascending order imem --> mem
H
Haojun Liao 已提交
3234
    STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid);
3235

H
Haojun Liao 已提交
3236 3237 3238 3239 3240 3241 3242 3243 3244 3245
    int32_t code = tRowMergerInit(&merge, piRow, pSchema);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    code = doMergeRowsInBuf(&pBlockScanInfo->iiter, pBlockScanInfo->uid, ik.ts, pBlockScanInfo->delSkyline, &merge,
                            pReader);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }
3246

3247
    tRowMerge(&merge, pRow);
H
Haojun Liao 已提交
3248 3249 3250 3251 3252 3253
    code =
        doMergeRowsInBuf(&pBlockScanInfo->iter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge, pReader);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

3254
  } else {
H
Haojun Liao 已提交
3255
    STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid);
3256

H
Haojun Liao 已提交
3257
    int32_t code = tRowMergerInit(&merge, pRow, pSchema);
3258
    if (code != TSDB_CODE_SUCCESS || merge.pTSchema == NULL) {
H
Haojun Liao 已提交
3259 3260 3261 3262 3263 3264 3265 3266
      return code;
    }

    code =
        doMergeRowsInBuf(&pBlockScanInfo->iter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge, pReader);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }
3267 3268

    tRowMerge(&merge, piRow);
H
Haojun Liao 已提交
3269 3270 3271 3272 3273
    code = doMergeRowsInBuf(&pBlockScanInfo->iiter, pBlockScanInfo->uid, ik.ts, pBlockScanInfo->delSkyline, &merge,
                            pReader);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }
3274
  }
3275

3276 3277
  int32_t code = tRowMergerGetRow(&merge, pTSRow);
  return code;
3278 3279
}

3280 3281
// Fetches the next output row from the in-memory buffers (mem and imem) of one table.
//
// A candidate row is ignored when its timestamp has reached `endKey` in scan direction (ts >= endKey
// for ascending, ts <= endKey for descending scans). If both buffers provide a candidate, the one that
// comes first in scan order is emitted; candidates with equal timestamps are merged into one row.
//
// *pTSRow receives the row and *freeTSRow tells the caller whether it has to release it. *pTSRow is
// not written when neither buffer provides a row.
//
// Returns TSDB_CODE_SUCCESS or the error code of the failed merge.
int32_t tsdbGetNextRowInMem(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader, STSRow** pTSRow, int64_t endKey,
                            bool* freeTSRow) {
  SIterInfo* pMemIter = &pBlockScanInfo->iter;
  SIterInfo* pImemIter = &pBlockScanInfo->iiter;

  TSDBROW* pRow = getValidMemRow(pMemIter, pBlockScanInfo->delSkyline, pReader);
  TSDBROW* piRow = getValidMemRow(pImemIter, pBlockScanInfo->delSkyline, pReader);

  SArray*  pDelList = pBlockScanInfo->delSkyline;
  uint64_t uid = pBlockScanInfo->uid;

  // todo refactor
  bool asc = ASCENDING_TRAVERSE(pReader->order);

  // drop the candidates that are located at or beyond the end key
  if (pMemIter->hasVal) {
    TSDBKEY memKey = TSDBROW_KEY(pRow);
    bool    beyondEnd = asc ? (memKey.ts >= endKey) : (memKey.ts <= endKey);
    if (beyondEnd) {
      pRow = NULL;
    }
  }

  if (pImemIter->hasVal) {
    TSDBKEY imemKey = TSDBROW_KEY(piRow);
    bool    beyondEnd = asc ? (imemKey.ts >= endKey) : (imemKey.ts <= endKey);
    if (beyondEnd) {
      piRow = NULL;
    }
  }

  bool memReady = pMemIter->hasVal && (pRow != NULL);
  bool imemReady = pImemIter->hasVal && (piRow != NULL);

  if (memReady && imemReady) {
    TSDBKEY k = TSDBROW_KEY(pRow);
    TSDBKEY ik = TSDBROW_KEY(piRow);

    if (ik.ts == k.ts) {
      // identical timestamp in both buffers: build one merged row, which the caller has to free
      *freeTSRow = true;
      return doMergeMemIMemRows(pRow, piRow, pBlockScanInfo, pReader, pTSRow);
    }

    // different timestamps: emit the row that comes first in scan order
    bool imemFirst = asc ? (ik.ts < k.ts) : (ik.ts > k.ts);
    if (imemFirst) {
      return doMergeMemTableMultiRows(piRow, uid, pImemIter, pDelList, pTSRow, pReader, freeTSRow);
    }

    return doMergeMemTableMultiRows(pRow, uid, pMemIter, pDelList, pTSRow, pReader, freeTSRow);
  }

  if (memReady) {
    return doMergeMemTableMultiRows(pRow, pBlockScanInfo->uid, pMemIter, pDelList, pTSRow, pReader, freeTSRow);
  }

  if (imemReady) {
    return doMergeMemTableMultiRows(piRow, uid, pImemIter, pDelList, pTSRow, pReader, freeTSRow);
  }

  return TSDB_CODE_SUCCESS;
}

H
Haojun Liao 已提交
3337
// Appends one STSRow to the result block as a new row at index pBlock->info.rows.
//
// The primary timestamp column is filled first when it is the first column of the block. The remaining
// block columns and the schema columns are then walked in parallel, matched by column id; block columns
// that do not exist in the schema of the row are filled with NULL.
//
// Returns TSDB_CODE_SUCCESS, or terrno when the schema of the row cannot be obtained. In the latter
// case the block is left untouched.
int32_t doAppendRowFromTSRow(SSDataBlock* pBlock, STsdbReader* pReader, STSRow* pTSRow, uint64_t uid) {
  int32_t numOfRows = pBlock->info.rows;
  int32_t numOfCols = (int32_t)taosArrayGetSize(pBlock->pDataBlock);

  SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo;
  STSchema*           pSchema = doGetSchemaForTSRow(pTSRow->sver, pReader, uid);
  if (pSchema == NULL) {
    // bail out before anything is written, the schema is dereferenced below
    return terrno;
  }

  SColVal colVal = {0};
  int32_t i = 0, j = 0;

  SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
  if (pColInfoData->info.colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
    colDataAppend(pColInfoData, numOfRows, (const char*)&pTSRow->ts, false);
    i += 1;
  }

  while (i < numOfCols && j < pSchema->numOfCols) {
    pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
    col_id_t colId = pColInfoData->info.colId;

    if (colId == pSchema->columns[j].colId) {
      tTSRowGetVal(pTSRow, pSchema, j, &colVal);
      doCopyColVal(pColInfoData, numOfRows, i, &colVal, pSupInfo);
      i += 1;
      j += 1;
    } else if (colId < pSchema->columns[j].colId) {
      // the requested column is not part of the schema of this row
      colDataAppendNULL(pColInfoData, numOfRows);
      i += 1;
    } else {
      // the schema column is not requested, skip it
      j += 1;
    }
  }

  // set null value since current column does not exist in the "pSchema"
  while (i < numOfCols) {
    pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
    colDataAppendNULL(pColInfoData, numOfRows);
    i += 1;
  }

  pBlock->info.rows += 1;
  return TSDB_CODE_SUCCESS;
}

H
Hongze Cheng 已提交
3381 3382
// Copies row `rowIndex` of a file block into the result block as a new row at index
// pResBlock->info.rows.
//
// The timestamp is written first when the first output column is the primary timestamp column. The
// output columns and the columns of the file block are then walked in parallel, matched by column id;
// output columns without a counterpart in the file block are filled with NULL.
//
// Always returns TSDB_CODE_SUCCESS.
int32_t doAppendRowFromFileBlock(SSDataBlock* pResBlock, STsdbReader* pReader, SBlockData* pBlockData,
                                 int32_t rowIndex) {
  SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo;

  int32_t dstRow = pResBlock->info.rows;
  int32_t outIdx = 0;
  int32_t inIdx = 0;

  SColumnInfoData* pFirstCol = taosArrayGet(pResBlock->pDataBlock, outIdx);
  if (pFirstCol->info.colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
    colDataAppendInt64(pFirstCol, dstRow, &pBlockData->aTSKEY[rowIndex]);
    outIdx += 1;
  }

  SColVal cv = {0};
  int32_t numOfInputCols = pBlockData->aIdx->size;
  int32_t numOfOutputCols = pResBlock->pDataBlock->size;

  while (outIdx < numOfOutputCols && inIdx < numOfInputCols) {
    SColumnInfoData* pDst = TARRAY_GET_ELEM(pResBlock->pDataBlock, outIdx);
    SColData*        pSrc = tBlockDataGetColDataByIdx(pBlockData, inIdx);

    if (pSrc->cid < pDst->info.colId) {
      // this file column is not requested, try the next one against the same output column
      inIdx += 1;
    } else {
      if (pSrc->cid == pDst->info.colId) {
        tColDataGetValue(pSrc, rowIndex, &cv);
        doCopyColVal(pDst, dstRow, outIdx, &cv, pSupInfo);
        inIdx += 1;
      } else {
        // the specified column does not exist in file block, fill with null data
        colDataAppendNULL(pDst, dstRow);
      }

      outIdx += 1;
    }
  }

  // output columns left over after the file columns ran out are set to NULL
  for (; outIdx < numOfOutputCols; outIdx += 1) {
    SColumnInfoData* pDst = taosArrayGet(pResBlock->pDataBlock, outIdx);
    colDataAppendNULL(pDst, dstRow);
  }

  pResBlock->info.rows += 1;
  return TSDB_CODE_SUCCESS;
}

3429 3430
// Fills the result block of the reader with rows taken from the in-memory buffers of one table.
//
// Rows are fetched with tsdbGetNextRowInMem() and appended until no row is returned, both buffer
// iterators are exhausted, or the block holds `capacity` rows.
//
// Returns TSDB_CODE_SUCCESS, or the error code of the row fetch / row append that failed. Rows that
// were appended before the failure stay in the block.
int32_t buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, int64_t endKey, int32_t capacity,
                                  STsdbReader* pReader) {
  SSDataBlock* pBlock = pReader->pResBlock;

  do {
    STSRow* pTSRow = NULL;
    bool    freeTSRow = false;

    int32_t code = tsdbGetNextRowInMem(pBlockScanInfo, pReader, &pTSRow, endKey, &freeTSRow);
    if (code != TSDB_CODE_SUCCESS) {
      // report the failure instead of presenting a truncated block as a complete result
      return code;
    }

    if (pTSRow == NULL) {
      break;
    }

    code = doAppendRowFromTSRow(pBlock, pReader, pTSRow, pBlockScanInfo->uid);
    if (freeTSRow) {
      // the row was built by a merge and is owned by this function
      taosMemoryFree(pTSRow);
    }

    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    // no data in buffer, return immediately
    if (!(pBlockScanInfo->iter.hasVal || pBlockScanInfo->iiter.hasVal)) {
      break;
    }

    if (pBlock->info.rows >= capacity) {
      break;
    }
  } while (1);

  ASSERT(pBlock->info.rows <= capacity);
  return TSDB_CODE_SUCCESS;
}
H
Hongze Cheng 已提交
3459

3460 3461
// TODO refactor: with createDataBlockScanInfo
// Replaces the set of tables scanned by the reader.
//
// The scan info of every table currently held in pReader->status.pTableMap is cleared and the map is
// emptied. Afterwards one fresh entry (lastKey = 0) is inserted for each of the `num` STableKeyInfo
// items of pTableList, keyed by the table uid.
//
// Always returns TDB_CODE_SUCCESS; the result of taosHashPut() is not inspected.
int32_t tsdbSetTableList(STsdbReader* pReader, const void* pTableList, int32_t num) {
  ASSERT(pReader != NULL);

  // release the per-table state before the entries are removed from the map
  STableBlockScanInfo* pScanInfo = taosHashIterate(pReader->status.pTableMap, NULL);
  while (pScanInfo != NULL) {
    clearBlockScanInfo(pScanInfo);
    pScanInfo = taosHashIterate(pReader->status.pTableMap, pScanInfo);
  }

  taosHashClear(pReader->status.pTableMap);

  STableKeyInfo* pKeys = (STableKeyInfo*)pTableList;

  int32_t idx = 0;
  while (idx < num) {
    STableBlockScanInfo entry = {.lastKey = 0, .uid = pKeys[idx].uid};
    taosHashPut(pReader->status.pTableMap, &entry.uid, sizeof(uint64_t), &entry, sizeof(entry));
    idx += 1;
  }

  return TDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
3480 3481 3482 3483 3484 3485
// Returns the result of metaGetIdx() for the given meta handle, or NULL when pMeta is NULL.
void* tsdbGetIdx(SMeta* pMeta) {
  void* pIdx = NULL;

  if (pMeta != NULL) {
    pIdx = metaGetIdx(pMeta);
  }

  return pIdx;
}
dengyihao's avatar
dengyihao 已提交
3486

dengyihao's avatar
dengyihao 已提交
3487 3488 3489 3490 3491 3492
// Returns the result of metaGetIvtIdx() for the given meta handle, or NULL when pMeta is NULL.
void* tsdbGetIvtIdx(SMeta* pMeta) {
  void* pIvtIdx = NULL;

  if (pMeta != NULL) {
    pIvtIdx = metaGetIvtIdx(pMeta);
  }

  return pIvtIdx;
}
L
Liu Jicong 已提交
3493

H
Hongze Cheng 已提交
3494
// Returns the upper bound (maxVer) of the version range the reader is restricted to.
uint64_t getReaderMaxVersion(STsdbReader* pReader) {
  return pReader->verRange.maxVer;
}
3495

3496 3497 3498 3499 3500
// Prepares a reader for scanning: sets up the file set iterator from the read snapshot and resets the
// data block iterator to the scan order of the reader.
//
// If the snapshot contains no files the reader is switched to buffer-only mode (loadFromFile = false);
// otherwise the first block of the first file is initialized.
//
// Returns TSDB_CODE_SUCCESS or the result of initForFirstBlockInFile().
static int32_t doOpenReaderImpl(STsdbReader* pReader) {
  SReaderStatus*  pStatus = &pReader->status;
  SDataBlockIter* pBlockIter = &pStatus->blockIter;

  initFilesetIterator(&pStatus->fileIter, pReader->pReadSnap->fs.aDFileSet, pReader);
  resetDataBlockIterator(pBlockIter, pReader->order);

  if (pStatus->fileIter.numOfFiles != 0) {
    return initForFirstBlockInFile(pReader, pBlockIter);
  }

  // no data in files, let's try buffer in memory
  pStatus->loadFromFile = false;
  return TSDB_CODE_SUCCESS;
}

H
refact  
Hongze Cheng 已提交
3512
// ====================================== EXPOSED APIs ======================================
3513 3514
// Creates and opens a tsdb reader for the tables in pTableList.
//
// For a TIMEWINDOW_RANGE_EXTERNAL condition the main reader scans the query window shrunk by one on
// both sides, and two additional single-row inner readers are created: innerReader[0] scans the range
// on one side of the window in reversed order, innerReader[1] the range on the other side in the
// original order. Both inner readers share the table map, schemas and read snapshot of the main
// reader. Note that pCond->twindows is modified in this case and not restored.
//
// On success *ppReader holds the new reader. Returns TSDB_CODE_SUCCESS or an error code.
//
// NOTE(review): only the "table map could not be created" failure closes the reader and resets
// *ppReader; the other failure paths leave *ppReader set -- confirm who owns the reader then.
int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, void* pTableList, int32_t numOfTables,
                       STsdbReader** ppReader, const char* idstr) {
  int32_t      code = TSDB_CODE_SUCCESS;
  STsdbReader* pReader = NULL;

  // keep the window supplied by the caller, pCond->twindows is rewritten below
  STimeWindow origWindow = pCond->twindows;
  if (pCond->type == TIMEWINDOW_RANGE_EXTERNAL) {
    pCond->twindows.skey += 1;
    pCond->twindows.ekey -= 1;
  }

  code = tsdbReaderCreate(pVnode, pCond, ppReader, 4096, idstr);
  if (code != TSDB_CODE_SUCCESS) {
    goto _err;
  }

  // check for query time window
  pReader = *ppReader;
  if (isEmptyQueryTimeWindow(&pReader->window) && pCond->type == TIMEWINDOW_RANGE_CONTAINED) {
    tsdbDebug("%p query window not overlaps with the data set, no result returned, %s", pReader, pReader->idStr);
    return TSDB_CODE_SUCCESS;
  }

  if (pCond->type == TIMEWINDOW_RANGE_EXTERNAL) {
    // update the SQueryTableDataCond to create inner reader
    int32_t origOrder = pCond->order;
    bool    ascQuery = (origOrder == TSDB_ORDER_ASC);

    // first inner reader: scans in the order opposite to the query
    if (ascQuery) {
      pCond->twindows.skey = INT64_MIN;
      pCond->twindows.ekey = origWindow.skey;
      pCond->order = TSDB_ORDER_DESC;
    } else {
      pCond->twindows.skey = origWindow.ekey;
      pCond->twindows.ekey = INT64_MAX;
      pCond->order = TSDB_ORDER_ASC;
    }

    // here we only need one more row, so the capacity is set to be ONE.
    code = tsdbReaderCreate(pVnode, pCond, &pReader->innerReader[0], 1, idstr);
    if (code != TSDB_CODE_SUCCESS) {
      goto _err;
    }

    // second inner reader: scans in the order of the query
    if (ascQuery) {
      pCond->twindows.skey = origWindow.ekey;
      pCond->twindows.ekey = INT64_MAX;
    } else {
      pCond->twindows.skey = INT64_MIN;
      pCond->twindows.ekey = origWindow.ekey;
    }
    pCond->order = origOrder;

    code = tsdbReaderCreate(pVnode, pCond, &pReader->innerReader[1], 1, idstr);
    if (code != TSDB_CODE_SUCCESS) {
      goto _err;
    }
  }

  // NOTE: the endVersion in pCond is the data version not schema version, so pCond->endVersion is not correct here.
  if (pCond->suid != 0) {
    pReader->pSchema = metaGetTbTSchema(pReader->pTsdb->pVnode->pMeta, pReader->suid, -1, 1);
    if (pReader->pSchema == NULL) {
      tsdbError("failed to get table schema, suid:%" PRIu64 ", ver:-1, %s", pReader->suid, pReader->idStr);
    }
  } else if (numOfTables > 0) {
    STableKeyInfo* pFirstKey = pTableList;
    pReader->pSchema = metaGetTbTSchema(pReader->pTsdb->pVnode->pMeta, pFirstKey->uid, -1, 1);
    if (pReader->pSchema == NULL) {
      tsdbError("failed to get table schema, uid:%" PRIu64 ", ver:-1, %s", pFirstKey->uid, pReader->idStr);
    }
  }

  // the scan info is built for the reader that runs first
  STsdbReader* pFirstReader = pReader;
  if (pReader->innerReader[0] != NULL) {
    pFirstReader = pReader->innerReader[0];
  }

  pReader->status.pTableMap = createDataBlockScanInfo(pFirstReader, pTableList, numOfTables);
  if (pReader->status.pTableMap == NULL) {
    tsdbReaderClose(pReader);
    *ppReader = NULL;

    code = TSDB_CODE_TDB_OUT_OF_MEMORY;
    goto _err;
  }

  if (numOfTables > 0) {
    code = tsdbTakeReadSnap(pReader->pTsdb, &pReader->pReadSnap, pReader->idStr);
    if (code != TSDB_CODE_SUCCESS) {
      goto _err;
    }

    STsdbReader* pToOpen = pReader;
    if (pReader->type != TIMEWINDOW_RANGE_CONTAINED) {
      // both inner readers need only one row and borrow the resources of the main reader
      for (int32_t i = 0; i < 2; ++i) {
        STsdbReader* pInner = pReader->innerReader[i];

        pInner->capacity = 1;
        pInner->status.pTableMap = pReader->status.pTableMap;
        pInner->pSchema = pReader->pSchema;
        pInner->pMemSchema = pReader->pMemSchema;
        pInner->pReadSnap = pReader->pReadSnap;
      }

      pToOpen = pReader->innerReader[0];
    }

    code = doOpenReaderImpl(pToOpen);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }
  }

  tsdbDebug("%p total numOfTable:%d in this query %s", pReader, numOfTables, pReader->idStr);
  return code;

_err:
  tsdbError("failed to create data reader, code:%s %s", tstrerror(code), idstr);
  return code;
}

// Closes a reader and releases everything it owns. Passing NULL is allowed and does nothing.
//
// The inner readers (if any) are closed first. The table map, read snapshot and schemas they reference
// are shared with the outer reader, therefore these references are dropped before an inner reader is
// closed so that the shared objects are released exactly once, by the outer reader.
void tsdbReaderClose(STsdbReader* pReader) {
  if (pReader == NULL) {
    return;
  }

  // Each inner reader is checked on its own: innerReader[1] may be missing although innerReader[0]
  // exists, e.g. when the creation of the second inner reader failed.
  for (int32_t i = 0; i < 2; ++i) {
    STsdbReader* p = pReader->innerReader[i];
    if (p == NULL) {
      continue;
    }

    p->status.pTableMap = NULL;
    p->pReadSnap = NULL;
    p->pSchema = NULL;
    p->pMemSchema = NULL;

    tsdbReaderClose(p);
  }

  SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo;

  taosMemoryFreeClear(pSupInfo->plist);
  taosMemoryFree(pSupInfo->colIds);

  taosArrayDestroy(pSupInfo->pColAgg);
  for (int32_t i = 0; i < blockDataGetNumOfCols(pReader->pResBlock); ++i) {
    if (pSupInfo->buildBuf[i] != NULL) {
      taosMemoryFreeClear(pSupInfo->buildBuf[i]);
    }
  }

  taosMemoryFree(pSupInfo->buildBuf);
  tBlockDataDestroy(&pReader->status.fileBlockData, true);

  cleanupDataBlockIterator(&pReader->status.blockIter);

  // remember the number of tables for the summary below, the map is destroyed right away
  size_t numOfTables = taosHashGetSize(pReader->status.pTableMap);
  destroyBlockScanInfo(pReader->status.pTableMap);
  blockDataDestroy(pReader->pResBlock);

  if (pReader->pFileReader != NULL) {
    tsdbDataFReaderClose(&pReader->pFileReader);
  }

  tsdbUntakeReadSnap(pReader->pTsdb, pReader->pReadSnap, pReader->idStr);

  taosMemoryFree(pReader->status.uidCheckInfo.tableUidList);
  SIOCostSummary* pCost = &pReader->cost;

  SFilesetIter* pFilesetIter = &pReader->status.fileIter;
  if (pFilesetIter->pLastBlockReader != NULL) {
    SLastBlockReader* pLReader = pFilesetIter->pLastBlockReader;
    tMergeTreeClose(&pLReader->mergeTree);

    // collect the last-block statistics before the load info is destroyed
    getLastBlockLoadInfo(pLReader->pInfo, &pCost->lastBlockLoad, &pCost->lastBlockLoadTime);

    pLReader->pInfo = destroyLastBlockLoadInfo(pLReader->pInfo);
    taosMemoryFree(pLReader);
  }

  tsdbDebug(
      "%p :io-cost summary: head-file:%" PRIu64 ", head-file time:%.2f ms, SMA:%" PRId64
      " SMA-time:%.2f ms, fileBlocks:%" PRId64
      ", fileBlocks-load-time:%.2f ms, "
      "build in-memory-block-time:%.2f ms, lastBlocks:%" PRId64 ", lastBlocks-time:%.2f ms, composed-blocks:%" PRId64
      ", composed-blocks-time:%.2fms, STableBlockScanInfo size:%.2f Kb %s",
      pReader, pCost->headFileLoad, pCost->headFileLoadTime, pCost->smaDataLoad, pCost->smaLoadTime, pCost->numOfBlocks,
      pCost->blockLoadTime, pCost->buildmemBlock, pCost->lastBlockLoad, pCost->lastBlockLoadTime, pCost->composedBlocks,
      pCost->buildComposedBlockTime, numOfTables * sizeof(STableBlockScanInfo) / 1000.0, pReader->idStr);

  taosMemoryFree(pReader->idStr);
  taosMemoryFree(pReader->pSchema);
  // pMemSchema may be the very same object as pSchema, do not free it twice
  if (pReader->pMemSchema != pReader->pSchema) {
    taosMemoryFree(pReader->pMemSchema);
  }
  taosMemoryFreeClear(pReader);
}

3720
// Builds the next result block of a single reader.
//
// The result block is emptied first. If the reader still loads from files, the block is built from the
// files; when that yields no rows (or the reader does not load from files at all), the block is built
// from the in-memory buffers.
//
// Returns true if the result block contains at least one row, false if it is empty or building the
// block from files failed.
static bool doTsdbNextDataBlock(STsdbReader* pReader) {
  SReaderStatus* pStatus = &pReader->status;
  SSDataBlock*   pResBlock = pReader->pResBlock;

  // cleanup the data that belongs to the previous data block
  blockDataCleanup(pResBlock);

  if (pStatus->loadFromFile) {
    if (buildBlockFromFiles(pReader) != TSDB_CODE_SUCCESS) {
      return false;
    }

    if (pResBlock->info.rows > 0) {
      return true;
    }
  }

  // no (more) data in files, let's try the buffer
  buildBlockFromBufferSequentially(pReader);
  return pResBlock->info.rows > 0;
}

3747 3748 3749 3750 3751
// Advances the reader to the next data block.
//
// A reader with inner readers works in three steps: the block of innerReader[0] is delivered first
// (EXTERNAL_ROWS_PREV), then the blocks of the main scan (EXTERNAL_ROWS_MAIN), and finally the block
// of innerReader[1] (EXTERNAL_ROWS_NEXT). A reader without inner readers only performs the main scan.
//
// Returns true if a data block is available, false if there is no more data or an error occurred.
// When opening the main or the trailing reader fails, the error code is stored in terrno.
bool tsdbNextDataBlock(STsdbReader* pReader) {
  if (isEmptyQueryTimeWindow(&pReader->window)) {
    return false;
  }

  if (pReader->innerReader[0] != NULL && pReader->step == 0) {
    bool ret = doTsdbNextDataBlock(pReader->innerReader[0]);
    pReader->step = EXTERNAL_ROWS_PREV;
    if (ret) {
      return ret;
    }
  }

  if (pReader->step == EXTERNAL_ROWS_PREV) {
    // prepare for the main scan
    int32_t code = doOpenReaderImpl(pReader);
    resetDataBlockScanInfo(pReader->status.pTableMap, pReader->innerReader[0]->window.ekey);

    if (code != TSDB_CODE_SUCCESS) {
      // an error code must not be returned here: converted to bool it would read "block available"
      terrno = code;
      return false;
    }

    pReader->step = EXTERNAL_ROWS_MAIN;
  }

  bool ret = doTsdbNextDataBlock(pReader);
  if (ret) {
    return ret;
  }

  if (pReader->innerReader[1] != NULL && pReader->step == EXTERNAL_ROWS_MAIN) {
    // prepare for the next row scan
    int32_t code = doOpenReaderImpl(pReader->innerReader[1]);
    resetDataBlockScanInfo(pReader->innerReader[1]->status.pTableMap, pReader->window.ekey);
    if (code != TSDB_CODE_SUCCESS) {
      terrno = code;
      return false;
    }

    bool ret1 = doTsdbNextDataBlock(pReader->innerReader[1]);
    pReader->step = EXTERNAL_ROWS_NEXT;
    if (ret1) {
      return ret1;
    }
  }

  return false;
}

3795
// Returns true if the table with the given uid is registered in the table map of the reader, false if
// there is no entry (and therefore no data block) for it.
bool tsdbTableNextDataBlock(STsdbReader* pReader, uint64_t uid) {
  STableBlockScanInfo* pEntry = taosHashGet(pReader->status.pTableMap, &uid, sizeof(uid));
  return (pEntry != NULL);
}

3804
// Copies the row count, table uid and time window of the reader's current result block into
// pDataBlockInfo. Neither argument may be NULL.
static void setBlockInfo(STsdbReader* pReader, SDataBlockInfo* pDataBlockInfo) {
  ASSERT(pDataBlockInfo != NULL && pReader != NULL);

  SSDataBlock* pRes = pReader->pResBlock;

  pDataBlockInfo->window = pRes->info.window;
  pDataBlockInfo->uid = pRes->info.uid;
  pDataBlockInfo->rows = pRes->info.rows;
}

3811 3812
/**
 * Fill pDataBlockInfo from the reader that produced the current block.
 *
 * For an external-range reader the source depends on the scan step: the main
 * step uses pReader itself, the "prev" step uses innerReader[0], and every
 * other step uses innerReader[1]. All other reader types use pReader directly.
 *
 * @param pReader         top-level reader
 * @param pDataBlockInfo  destination block info
 */
void tsdbRetrieveDataBlockInfo(STsdbReader* pReader, SDataBlockInfo* pDataBlockInfo) {
  STsdbReader* pSource = pReader;

  if (pReader->type == TIMEWINDOW_RANGE_EXTERNAL && pReader->step != EXTERNAL_ROWS_MAIN) {
    pSource = (pReader->step == EXTERNAL_ROWS_PREV) ? pReader->innerReader[0] : pReader->innerReader[1];
  }

  setBlockInfo(pSource, pDataBlockInfo);
}

3825
/**
 * Load the pre-computed column aggregates (block SMA) of the current file block.
 *
 * @param pReader       reader positioned on a file block
 * @param pBlockStatis  out: set to pReader->suppInfo.plist when statistics were loaded,
 *                      or to NULL when none are available
 * @param allHave       out: true only when SMA data was loaded and no matched column
 *                      had block SMA switched off
 * @return TSDB_CODE_SUCCESS when statistics are unavailable, the value returned by
 *         tsdbReadBlockSma() otherwise
 */
int32_t tsdbRetrieveDatablockSMA(STsdbReader* pReader, SColumnDataAgg*** pBlockStatis, bool* allHave) {
  *allHave = false;

  // external-range readers and composed blocks have no statistics data
  if (pReader->type == TIMEWINDOW_RANGE_EXTERNAL || pReader->status.composedDataBlock) {
    *pBlockStatis = NULL;
    return TSDB_CODE_SUCCESS;
  }

  SFileDataBlockInfo* pFBlock = getCurrentBlockInfo(&pReader->status.blockIter);
  SDataBlk*           pBlock = getCurrentBlock(&pReader->status.blockIter);
  int64_t             stime = taosGetTimestampUs();
  SBlockLoadSuppInfo* pSup = &pReader->suppInfo;

  // the block was written without SMA
  if (!tDataBlkHasSma(pBlock)) {
    *pBlockStatis = NULL;
    return TSDB_CODE_SUCCESS;
  }

  int32_t code = tsdbReadBlockSma(pReader->pFileReader, pBlock, pSup->pColAgg);
  if (code != TSDB_CODE_SUCCESS) {
    tsdbDebug("vgId:%d, failed to load block SMA for uid %" PRIu64 ", code:%s, %s", 0, pFBlock->uid, tstrerror(code),
              pReader->idStr);
    return code;
  }

  *allHave = true;

  // slot 0 always carries the primary timestamp column, derived from the result block window
  SColumnDataAgg* pTsAgg = &pSup->tsColAgg;
  pTsAgg->colId = PRIMARYKEY_TIMESTAMP_COL_ID;
  pTsAgg->numOfNull = 0;
  pTsAgg->min = pReader->pResBlock->info.window.skey;
  pTsAgg->max = pReader->pResBlock->info.window.ekey;
  pSup->plist[0] = pTsAgg;

  // merge-walk the loaded aggregates against the requested column ids
  size_t  numOfCols = blockDataGetNumOfCols(pReader->pResBlock);
  int32_t aggIdx = 0;
  int32_t colIdx = 0;

  while (colIdx < numOfCols && aggIdx < taosArrayGetSize(pSup->pColAgg)) {
    SColumnDataAgg* pAgg = taosArrayGet(pSup->pColAgg, aggIdx);

    if (pAgg->colId < pSup->colIds[colIdx]) {
      aggIdx += 1;  // aggregate for a column that was not requested
      continue;
    }

    if (pSup->colIds[colIdx] < pAgg->colId) {
      colIdx += 1;  // requested column has no aggregate in this block
      continue;
    }

    // NOTE(review): the schema column is looked up with the aggregate index rather than by
    // column id -- confirm that pColAgg and pSchema->columns are position-aligned.
    if (IS_BSMA_ON(&(pReader->pSchema->columns[aggIdx]))) {
      pSup->plist[colIdx] = pAgg;
    } else {
      *allHave = false;
    }

    aggIdx += 1;
    colIdx += 1;
  }

  double elapsed = (taosGetTimestampUs() - stime) / 1000.0;
  pReader->cost.smaLoadTime += elapsed;
  pReader->cost.smaDataLoad += 1;

  *pBlockStatis = pSup->plist;

  tsdbDebug("vgId:%d, succeed to load block SMA for uid %" PRIu64 ", elapsed time:%.2f ms, %s", 0, pFBlock->uid,
            elapsed, pReader->idStr);

  return code;
}

3903
/**
 * Return the column array of the reader's result block, loading the current
 * file block into it first when the block is not a composed one.
 *
 * @param pReader  reader positioned on the block to retrieve
 * @return pReader->pResBlock->pDataBlock on success; NULL with terrno set when the
 *         block's table is not in the table map or the file block cannot be loaded
 */
static SArray* doRetrieveDataBlock(STsdbReader* pReader) {
  SReaderStatus* pStatus = &pReader->status;

  if (!pStatus->composedDataBlock) {
    SFileDataBlockInfo*  pBlockInfo = getCurrentBlockInfo(&pStatus->blockIter);
    STableBlockScanInfo* pScanInfo = taosHashGet(pStatus->pTableMap, &pBlockInfo->uid, sizeof(pBlockInfo->uid));
    if (pScanInfo == NULL) {
      terrno = TSDB_CODE_INVALID_PARA;
      tsdbError("failed to locate the uid:%" PRIu64 " in query table uid list, total tables:%d, %s", pBlockInfo->uid,
                taosHashGetSize(pReader->status.pTableMap), pReader->idStr);
      return NULL;
    }

    int32_t code = doLoadFileBlockData(pReader, &pStatus->blockIter, &pStatus->fileBlockData, pScanInfo->uid);
    if (code != TSDB_CODE_SUCCESS) {
      // drop whatever was partially loaded before reporting the failure
      tBlockDataDestroy(&pStatus->fileBlockData, 1);
      terrno = code;
      return NULL;
    }

    copyBlockDataToSDataBlock(pReader, pScanInfo);
  }

  // a composed block has already been materialised in the result block
  return pReader->pResBlock->pDataBlock;
}

3930 3931 3932 3933 3934 3935 3936 3937 3938 3939 3940 3941
/**
 * Retrieve the column data of the current block.
 *
 * For an external-range reader the "prev" and "next" steps are served by
 * innerReader[0] and innerReader[1] respectively; every other case is served
 * by pReader itself.
 *
 * @param pReader  top-level reader
 * @param pIdList  not used by this function
 * @return the column array of the selected reader's result block, or NULL on failure
 */
SArray* tsdbRetrieveDataBlock(STsdbReader* pReader, SArray* pIdList) {
  STsdbReader* pTarget = pReader;

  if (pReader->type == TIMEWINDOW_RANGE_EXTERNAL) {
    switch (pReader->step) {
      case EXTERNAL_ROWS_PREV:
        pTarget = pReader->innerReader[0];
        break;
      case EXTERNAL_ROWS_NEXT:
        pTarget = pReader->innerReader[1];
        break;
      default:
        break;
    }
  }

  return doRetrieveDataBlock(pTarget);
}

H
Haojun Liao 已提交
3942
/**
 * Rewind the reader so that it can be scanned again with the conditions in pCond.
 *
 * Nothing is done (and success is returned) when the reader's current time window
 * is empty or when it holds no read snapshot.
 *
 * @param pReader  reader to reset
 * @param pCond    new query condition; its order and time window are applied
 * @return 0 on success, or the error returned by initForFirstBlockInFile()
 */
int32_t tsdbReaderReset(STsdbReader* pReader, SQueryTableDataCond* pCond) {
  if (isEmptyQueryTimeWindow(&pReader->window) || pReader->pReadSnap == NULL) {
    return TSDB_CODE_SUCCESS;
  }

  SReaderStatus* pStatus = &pReader->status;

  pReader->order = pCond->order;
  pReader->type = TIMEWINDOW_RANGE_CONTAINED;
  pStatus->loadFromFile = true;
  pStatus->pTableIter = NULL;
  pReader->window = updateQueryTimeWindow(pReader->pTsdb, &pCond->twindows);

  // clear the timestamp aggregate and the first statistics slot
  memset(&pReader->suppInfo.tsColAgg, 0, sizeof(SColumnDataAgg));
  memset(pReader->suppInfo.plist, 0, POINTER_BYTES);
  pReader->suppInfo.tsColAgg.colId = PRIMARYKEY_TIMESTAMP_COL_ID;

  tsdbDataFReaderClose(&pReader->pFileReader);

  int32_t numOfTables = taosHashGetSize(pStatus->pTableMap);

  initFilesetIterator(&pStatus->fileIter, pReader->pReadSnap->fs.aDFileSet, pReader);
  resetDataBlockIterator(&pStatus->blockIter, pReader->order);

  // start one step outside the window so that the first key inside it is not skipped
  int64_t startKey;
  if (ASCENDING_TRAVERSE(pReader->order)) {
    startKey = pReader->window.skey - 1;
  } else {
    startKey = pReader->window.ekey + 1;
  }
  resetDataBlockScanInfo(pStatus->pTableMap, startKey);

  int32_t         code = 0;
  SDataBlockIter* pBlockIter = &pStatus->blockIter;

  if (pStatus->fileIter.numOfFiles != 0) {
    code = initForFirstBlockInFile(pReader, pBlockIter);
    if (code != TSDB_CODE_SUCCESS) {
      tsdbError("%p reset reader failed, numOfTables:%d, query range:%" PRId64 " - %" PRId64 " in query %s", pReader,
                numOfTables, pReader->window.skey, pReader->window.ekey, pReader->idStr);
      return code;
    }
  } else {
    // no data in files, let's try buffer in memory
    pStatus->loadFromFile = false;
  }

  tsdbDebug("%p reset reader, suid:%" PRIu64 ", numOfTables:%d, skey:%" PRId64 ", query range:%" PRId64 " - %" PRId64
            " in query %s",
            pReader, pReader->suid, numOfTables, pCond->twindows.skey, pReader->window.skey, pReader->window.ekey,
            pReader->idStr);

  return code;
}
H
Hongze Cheng 已提交
3990

3991 3992 3993
/**
 * Map a block's row count to the index of its histogram bucket.
 *
 * @param startRow     row count at which the first bucket starts
 * @param bucketRange  width of one bucket, in rows
 * @param numOfRows    number of rows in the block
 * @return (numOfRows - startRow) / bucketRange, or 0 when the block is smaller than
 *         startRow or when bucketRange is not positive
 *
 * The two guards keep the result usable as an array subscript: without them a
 * block below startRow yields a negative index and a zero bucketRange (min and
 * max rows configured equal) divides by zero. The upper bound is not checked
 * here because the number of buckets is not known to this helper.
 */
static int32_t getBucketIndex(int32_t startRow, int32_t bucketRange, int32_t numOfRows) {
  if (bucketRange <= 0 || numOfRows < startRow) {
    return 0;
  }

  return (numOfRows - startRow) / bucketRange;
}
H
Hongze Cheng 已提交
3994

3995 3996 3997 3998
/**
 * Walk every remaining file block of the reader and accumulate block distribution
 * statistics (row totals, min/max rows, small-block count, row-count histogram).
 *
 * totalSize and totalRows are reset on entry; the other counters of
 * pTableBlockInfo are accumulated on top of their incoming values.
 *
 * @param pReader          reader whose block iterator is traversed
 * @param pTableBlockInfo  in/out statistics
 * @return TSDB_CODE_SUCCESS, or the error returned by initForFirstBlockInFile()
 */
int32_t tsdbGetFileBlocksDistInfo(STsdbReader* pReader, STableBlockDistInfo* pTableBlockInfo) {
  int32_t        code = TSDB_CODE_SUCCESS;
  SReaderStatus* pStatus = &pReader->status;
  STsdbCfg*      pCfg = &pReader->pTsdb->pVnode->config.tsdbCfg;

  // blocks with fewer rows than this are counted as small
  const int smallBlockRows = 4096;

  pTableBlockInfo->totalSize = 0;
  pTableBlockInfo->totalRows = 0;
  pTableBlockInfo->defMinRows = pCfg->minRows;
  pTableBlockInfo->defMaxRows = pCfg->maxRows;

  // the configured [minRows, maxRows] span is divided into 20 buckets
  int32_t bucketRange = ceil((pCfg->maxRows - pCfg->minRows) / 20.0);

  pTableBlockInfo->numOfFiles += 1;

  int32_t         numOfTables = (int32_t)taosHashGetSize(pStatus->pTableMap);
  SDataBlockIter* pBlockIter = &pStatus->blockIter;

  pTableBlockInfo->numOfFiles += pStatus->fileIter.numOfFiles;
  if (pBlockIter->numOfBlocks > 0) {
    pTableBlockInfo->numOfBlocks += pBlockIter->numOfBlocks;
  }
  pTableBlockInfo->numOfTables = numOfTables;

  bool hasNext = (pBlockIter->numOfBlocks > 0);
  for (;;) {
    if (!hasNext) {
      // current file is exhausted: move on to the next one that has blocks
      code = initForFirstBlockInFile(pReader, pBlockIter);
      if ((code != TSDB_CODE_SUCCESS) || !pReader->status.loadFromFile) {
        break;
      }

      pTableBlockInfo->numOfBlocks += pBlockIter->numOfBlocks;
      hasNext = (pBlockIter->numOfBlocks > 0);
      continue;
    }

    SDataBlk* pBlock = getCurrentBlock(pBlockIter);
    int32_t   nRows = pBlock->nRow;

    pTableBlockInfo->totalRows += nRows;
    if (nRows > pTableBlockInfo->maxRows) {
      pTableBlockInfo->maxRows = nRows;
    }
    if (nRows < pTableBlockInfo->minRows) {
      pTableBlockInfo->minRows = nRows;
    }
    if (nRows < smallBlockRows) {
      pTableBlockInfo->numOfSmallBlocks += 1;
    }

    // NOTE(review): the index is used unchecked -- confirm that it cannot exceed the
    // capacity of blockRowsHisto when nRows equals the configured maxRows.
    int32_t bucketIndex = getBucketIndex(pTableBlockInfo->defMinRows, bucketRange, nRows);
    pTableBlockInfo->blockRowsHisto[bucketIndex]++;

    hasNext = blockIteratorNext(&pStatus->blockIter, pReader->idStr);
  }

  return code;
}
H
Hongze Cheng 已提交
4063

H
refact  
Hongze Cheng 已提交
4064
/**
 * Count the rows buffered in memory for every table of the reader.
 *
 * The rows are taken from the memtables captured in the reader's read snapshot
 * (pReadSnap->pMem and pReadSnap->pIMem). The presence checks are made on those
 * snapshot pointers, i.e. on the very objects that are dereferenced, instead of
 * on the live pTsdb->mem / pTsdb->imem pointers, which are not what is read here.
 *
 * As a side effect pReader->status.pTableIter is used as the iteration cursor
 * and is NULL when the function returns.
 *
 * @param pReader  reader whose table map is walked
 * @return total number of rows found in the snapshot's memtables; 0 when the
 *         reader holds no snapshot
 */
int64_t tsdbGetNumOfRowsInMemTable(STsdbReader* pReader) {
  int64_t        rows = 0;
  SReaderStatus* pStatus = &pReader->status;
  bool           hasMem = (pReader->pReadSnap != NULL) && (pReader->pReadSnap->pMem != NULL);
  bool           hasIMem = (pReader->pReadSnap != NULL) && (pReader->pReadSnap->pIMem != NULL);

  pStatus->pTableIter = taosHashIterate(pStatus->pTableMap, NULL);
  while (pStatus->pTableIter != NULL) {
    STableBlockScanInfo* pBlockScanInfo = pStatus->pTableIter;

    if (hasMem) {
      STbData* d = tsdbGetTbDataFromMemTable(pReader->pReadSnap->pMem, pReader->suid, pBlockScanInfo->uid);
      if (d != NULL) {
        rows += tsdbGetNRowsInTbData(d);
      }
    }

    if (hasIMem) {
      STbData* di = tsdbGetTbDataFromMemTable(pReader->pReadSnap->pIMem, pReader->suid, pBlockScanInfo->uid);
      if (di != NULL) {
        rows += tsdbGetNRowsInTbData(di);
      }
    }

    // current table is done, move on to the next table
    pStatus->pTableIter = taosHashIterate(pStatus->pTableMap, pStatus->pTableIter);
  }

  return rows;
}
D
dapan1121 已提交
4095

L
Liu Jicong 已提交
4096
/**
 * Fetch the schema of the table uid together with the uid of its super table.
 *
 * For a child table the schema version is read from the super table entry and
 * *suid receives the super table uid; for a normal table the version comes from
 * the table's own entry and *suid is 0.
 *
 * @param pVnode   vnode whose meta store is queried
 * @param uid      table uid
 * @param pSchema  out: result of metaGetTbTSchema() for uid at the resolved version
 * @param suid     out: super table uid, or 0
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_TDB_INVALID_TABLE_ID (also stored in terrno)
 *         when a meta entry cannot be found
 */
int32_t tsdbGetTableSchema(SVnode* pVnode, int64_t uid, STSchema** pSchema, int64_t* suid) {
  int32_t     sversion = 1;
  SMetaReader mr = {0};

  metaReaderInit(&mr, pVnode->pMeta, 0);

  if (metaGetTableEntryByUid(&mr, uid) != TSDB_CODE_SUCCESS) {
    goto _invalid_table;
  }

  *suid = 0;

  if (mr.me.type != TSDB_CHILD_TABLE) {
    ASSERT(mr.me.type == TSDB_NORMAL_TABLE);
    sversion = mr.me.ntbEntry.schemaRow.version;
  } else {
    // the reader is reused for a second lookup, so release its decoder first
    tDecoderClear(&mr.coder);
    *suid = mr.me.ctbEntry.suid;

    if (metaGetTableEntryByUid(&mr, *suid) != TSDB_CODE_SUCCESS) {
      goto _invalid_table;
    }
    sversion = mr.me.stbEntry.schemaRow.version;
  }

  metaReaderClear(&mr);
  *pSchema = metaGetTbTSchema(pVnode->pMeta, uid, sversion, 1);

  return TSDB_CODE_SUCCESS;

_invalid_table:
  terrno = TSDB_CODE_TDB_INVALID_TABLE_ID;
  metaReaderClear(&mr);
  return terrno;
}
H
Hongze Cheng 已提交
4130

H
Haojun Liao 已提交
4131
/**
 * Take a read snapshot of the tsdb: reference the current mem/imem tables and the
 * file system state while holding the tsdb read lock.
 *
 * @param pTsdb   tsdb to snapshot
 * @param ppSnap  out: newly allocated snapshot on success, NULL on failure
 * @param idStr   identifier used in the trace log
 * @return 0 on success; TSDB_CODE_OUT_OF_MEMORY, a system error code from the lock
 *         calls, or the error returned by tsdbFSRef() otherwise
 *
 * On any failure everything acquired so far is released again (memtable
 * references, file system reference, the snapshot object itself) and *ppSnap is
 * reset to NULL, so nothing is leaked and a later tsdbUntakeReadSnap() on the
 * result is a no-op.
 */
int32_t tsdbTakeReadSnap(STsdb* pTsdb, STsdbReadSnap** ppSnap, const char* idStr) {
  int32_t code = 0;

  // alloc
  *ppSnap = (STsdbReadSnap*)taosMemoryCalloc(1, sizeof(STsdbReadSnap));
  if (*ppSnap == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  // lock
  code = taosThreadRwlockRdlock(&pTsdb->rwLock);
  if (code) {
    code = TAOS_SYSTEM_ERROR(code);
    goto _err;
  }

  // take snapshot
  (*ppSnap)->pMem = pTsdb->mem;
  (*ppSnap)->pIMem = pTsdb->imem;

  if ((*ppSnap)->pMem) {
    tsdbRefMemTable((*ppSnap)->pMem);
  }

  if ((*ppSnap)->pIMem) {
    tsdbRefMemTable((*ppSnap)->pIMem);
  }

  // fs
  code = tsdbFSRef(pTsdb, &(*ppSnap)->fs);
  if (code) {
    taosThreadRwlockUnlock(&pTsdb->rwLock);
    goto _err;
  }

  // unlock
  code = taosThreadRwlockUnlock(&pTsdb->rwLock);
  if (code) {
    code = TAOS_SYSTEM_ERROR(code);
    tsdbFSUnref(pTsdb, &(*ppSnap)->fs);
    goto _err;
  }

  tsdbTrace("vgId:%d, take read snapshot, %s", TD_VID(pTsdb->pVnode), idStr);
  return code;

_err:
  // pMem/pIMem are still NULL (calloc) when the failure happened before they were referenced
  if ((*ppSnap)->pMem) {
    tsdbUnrefMemTable((*ppSnap)->pMem);
  }

  if ((*ppSnap)->pIMem) {
    tsdbUnrefMemTable((*ppSnap)->pIMem);
  }

  taosMemoryFree(*ppSnap);
  *ppSnap = NULL;
  return code;
}

H
Haojun Liao 已提交
4179
/**
 * Release a read snapshot: drop the references it holds on the mem/imem tables
 * and on the file system state, then free the snapshot object.
 *
 * A NULL pSnap is accepted; in that case only the trace line is emitted.
 *
 * @param pTsdb  tsdb the snapshot was taken from
 * @param pSnap  snapshot to release, may be NULL
 * @param idStr  identifier used in the trace log
 */
void tsdbUntakeReadSnap(STsdb* pTsdb, STsdbReadSnap* pSnap, const char* idStr) {
  if (pSnap != NULL) {
    if (pSnap->pMem != NULL) {
      tsdbUnrefMemTable(pSnap->pMem);
    }

    if (pSnap->pIMem != NULL) {
      tsdbUnrefMemTable(pSnap->pIMem);
    }

    tsdbFSUnref(pTsdb, &pSnap->fs);
    taosMemoryFree(pSnap);
  }

  tsdbTrace("vgId:%d, untake read snapshot, %s", TD_VID(pTsdb->pVnode), idStr);
}