tsdbRead.c 137.8 KB
Newer Older
H
hjxilinx 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

H
Haojun Liao 已提交
16
#include "osDef.h"
H
Hongze Cheng 已提交
17
#include "tsdb.h"
18

H
Hongze Cheng 已提交
19
// True when the traversal order `o` is TSDB_ORDER_ASC. The argument is parenthesized so that
// a compound expression (e.g. a ternary) is compared as a whole.
#define ASCENDING_TRAVERSE(o) ((o) == TSDB_ORDER_ASC)
H
Hongze Cheng 已提交
20

21 22 23 24 25 26
// Tags for the three kinds of external row content: previous, main and next.
// NOTE(review): the exact meaning is inferred from the enumerator names only — confirm against the callers.
typedef enum {
  EXTERNAL_ROWS_PREV = 0x1,
  EXTERNAL_ROWS_MAIN = 0x2,
  EXTERNAL_ROWS_NEXT = 0x3,
} EContentData;

27
typedef struct {
dengyihao's avatar
dengyihao 已提交
28
  STbDataIter* iter;
29 30 31 32
  int32_t      index;
  bool         hasVal;
} SIterInfo;

33 34
// Block counters gathered while loading one file set.
typedef struct {
  int32_t numOfBlocks;     // data blocks that passed the time/version range checks
  int32_t numOfLastFiles;  // number of stt ("last") files in the file set
} SBlockNumber;

H
Haojun Liao 已提交
38
typedef struct STableBlockScanInfo {
dengyihao's avatar
dengyihao 已提交
39 40
  uint64_t  uid;
  TSKEY     lastKey;
H
Hongze Cheng 已提交
41 42 43 44 45 46 47 48
  SMapData  mapData;            // block info (compressed)
  SArray*   pBlockList;         // block data index list
  SIterInfo iter;               // mem buffer skip list iterator
  SIterInfo iiter;              // imem buffer skip list iterator
  SArray*   delSkyline;         // delete info for this table
  int32_t   fileDelIndex;       // file block delete index
  int32_t   lastBlockDelIndex;  // delete index for last block
  bool      iterInit;           // whether to initialize the in-memory skip list iterator or not
H
Haojun Liao 已提交
49 50 51
} STableBlockScanInfo;

// (uid, offset) pair describing one block entry.
// NOTE(review): judging by the name it is used when ordering blocks — confirm against the users.
typedef struct SBlockOrderWrapper {
  int64_t uid;
  int64_t offset;
} SBlockOrderWrapper;
H
Hongze Cheng 已提交
55 56

typedef struct SBlockOrderSupporter {
57 58 59 60
  SBlockOrderWrapper** pDataBlockInfo;
  int32_t*             indexPerTable;
  int32_t*             numOfBlocksPerTable;
  int32_t              numOfTables;
H
Hongze Cheng 已提交
61 62 63
} SBlockOrderSupporter;

// Accumulated I/O statistics of one reader.
typedef struct SIOCostSummary {
  int64_t numOfBlocks;  // data blocks + last files counted while loading file blocks
  double  blockLoadTime;
  double  buildmemBlock;
  int64_t headFileLoad;      // incremented once per opened file set
  double  headFileLoadTime;  // in ms
  int64_t smaDataLoad;
  double  smaLoadTime;
  int64_t lastBlockLoad;
  double  lastBlockLoadTime;
  int64_t composedBlocks;
  double  buildComposedBlockTime;
} SIOCostSummary;

typedef struct SBlockLoadSuppInfo {
78
  SArray*          pColAgg;
79
  SColumnDataAgg   tsColAgg;
C
Cary Xu 已提交
80
  SColumnDataAgg** plist;
81
  int16_t*         colIds;  // column ids for loading file block data
82
  int32_t          numOfCols;
83
  char**           buildBuf;  // build string tmp buffer, todo remove it later after all string format being updated.
H
Hongze Cheng 已提交
84 85
} SBlockLoadSuppInfo;

86
typedef struct SLastBlockReader {
H
Hongze Cheng 已提交
87 88 89 90 91
  STimeWindow        window;
  SVersionRange      verRange;
  int32_t            order;
  uint64_t           uid;
  SMergeTree         mergeTree;
92
  SSttBlockLoadInfo* pInfo;
93 94
} SLastBlockReader;

95
typedef struct SFilesetIter {
H
Hongze Cheng 已提交
96 97 98
  int32_t           numOfFiles;  // number of total files
  int32_t           index;       // current accessed index in the list
  SArray*           pFileList;   // data file list
99
  int32_t           order;
H
Hongze Cheng 已提交
100
  SLastBlockReader* pLastBlockReader;  // last file block reader
101
} SFilesetIter;
H
Haojun Liao 已提交
102 103

// Locates one file data block: the owning table and the block's position for that table.
typedef struct SFileDataBlockInfo {
  // index position in STableBlockScanInfo in order to check whether neighbor block overlaps with it
  uint64_t uid;
  int32_t  tbBlockIdx;
} SFileDataBlockInfo;

typedef struct SDataBlockIter {
110
  int32_t   numOfBlocks;
111
  int32_t   index;
H
Hongze Cheng 已提交
112
  SArray*   blockList;  // SArray<SFileDataBlockInfo>
113
  int32_t   order;
114
  SDataBlk  block;  // current SDataBlk data
115
  SHashObj* pTableMap;
H
Haojun Liao 已提交
116 117 118
} SDataBlockIter;

// Progress of dumping the current file block into the result block.
typedef struct SFileBlockDumpInfo {
  int32_t totalRows;
  int32_t rowIndex;
  int64_t lastKey;    // set one step past the block's max key when the block is fully dumped
  bool    allDumped;
} SFileBlockDumpInfo;

125
// Cursor used to visit the queried tables in ascending uid order.
typedef struct SUidOrderCheckInfo {
  uint64_t* tableUidList;  // access table uid list in uid ascending order list
  int32_t   currentIndex;  // index in table uid list
} SUidOrderCheckInfo;

H
Haojun Liao 已提交
130
typedef struct SReaderStatus {
131
  bool                 loadFromFile;       // check file stage
132
  bool                 composedDataBlock;  // the returned data block is a composed block or not
133 134
  SHashObj*            pTableMap;          // SHash<STableBlockScanInfo>
  STableBlockScanInfo* pTableIter;         // table iterator used in building in-memory buffer data blocks.
135
  SUidOrderCheckInfo   uidCheckInfo;       // check all table in uid order
136
  SFileBlockDumpInfo   fBlockDumpInfo;
137 138 139 140
  SDFileSet*           pCurrentFileset;  // current opened file set
  SBlockData           fileBlockData;
  SFilesetIter         fileIter;
  SDataBlockIter       blockIter;
H
Haojun Liao 已提交
141 142
} SReaderStatus;

H
Hongze Cheng 已提交
143
struct STsdbReader {
H
Haojun Liao 已提交
144 145 146 147 148 149 150
  STsdb*             pTsdb;
  uint64_t           suid;
  int16_t            order;
  STimeWindow        window;  // the primary query time window that applies to all queries
  SSDataBlock*       pResBlock;
  int32_t            capacity;
  SReaderStatus      status;
151 152
  char*              idStr;  // query info handle, for debug purpose
  int32_t            type;   // query type: 1. retrieve all data blocks, 2. retrieve direct prev|next rows
H
Hongze Cheng 已提交
153
  SBlockLoadSuppInfo suppInfo;
H
Hongze Cheng 已提交
154
  STsdbReadSnap*     pReadSnap;
155
  SIOCostSummary     cost;
156 157
  STSchema*          pSchema;     // the newest version schema
  STSchema*          pMemSchema;  // the previous schema for in-memory data, to avoid load schema too many times
158 159
  SDataFReader*      pFileReader;
  SVersionRange      verRange;
160

161 162
  int32_t      step;
  STsdbReader* innerReader[2];
H
Hongze Cheng 已提交
163
};
H
Hongze Cheng 已提交
164

H
Haojun Liao 已提交
165
static SFileDataBlockInfo* getCurrentBlockInfo(SDataBlockIter* pBlockIter);
166 167
static int      buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, int64_t endKey, int32_t capacity,
                                          STsdbReader* pReader);
168
static TSDBROW* getValidMemRow(SIterInfo* pIter, const SArray* pDelList, STsdbReader* pReader);
169 170
static int32_t  doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pScanInfo, STsdbReader* pReader,
                                        SRowMerger* pMerger);
H
Hongze Cheng 已提交
171
static int32_t  doMergeRowsInLastBlock(SLastBlockReader* pLastBlockReader, STableBlockScanInfo* pScanInfo, int64_t ts,
172
                                       SRowMerger* pMerger, SVersionRange* pVerRange);
173
static int32_t  doMergeRowsInBuf(SIterInfo* pIter, uint64_t uid, int64_t ts, SArray* pDelList, SRowMerger* pMerger,
dengyihao's avatar
dengyihao 已提交
174
                                 STsdbReader* pReader);
H
Haojun Liao 已提交
175
static int32_t  doAppendRowFromTSRow(SSDataBlock* pBlock, STsdbReader* pReader, STSRow* pTSRow, uint64_t uid);
176
static int32_t  doAppendRowFromFileBlock(SSDataBlock* pResBlock, STsdbReader* pReader, SBlockData* pBlockData,
H
Hongze Cheng 已提交
177
                                         int32_t rowIndex);
178
static void     setComposedBlockFlag(STsdbReader* pReader, bool composed);
179
static bool hasBeenDropped(const SArray* pDelList, int32_t* index, TSDBKEY* pKey, int32_t order, SVersionRange* pRange);
180

H
Hongze Cheng 已提交
181 182 183 184
static int32_t doMergeMemTableMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter, SArray* pDelList,
                                        STSRow** pTSRow, STsdbReader* pReader, bool* freeTSRow);
static int32_t doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo* pBlockScanInfo,
                                  STsdbReader* pReader, STSRow** pTSRow);
185 186
static int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pBlockScanInfo, int64_t key,
                                     STsdbReader* pReader);
187

dengyihao's avatar
dengyihao 已提交
188 189 190 191
static int32_t initDelSkylineIterator(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader, STbData* pMemTbData,
                                      STbData* piMemTbData);
static STsdb*  getTsdbByRetentions(SVnode* pVnode, TSKEY winSKey, SRetention* retentions, const char* idstr,
                                   int8_t* pLevel);
192
static SVersionRange getQueryVerRange(SVnode* pVnode, SQueryTableDataCond* pCond, int8_t level);
H
Hongze Cheng 已提交
193 194 195
static int64_t       getCurrentKeyInLastBlock(SLastBlockReader* pLastBlockReader);
static bool          hasDataInLastBlock(SLastBlockReader* pLastBlockReader);
static int32_t       doBuildDataBlock(STsdbReader* pReader);
H
Haojun Liao 已提交
196
static TSDBKEY       getCurrentKeyInBuf(STableBlockScanInfo* pScanInfo, STsdbReader* pReader);
197 198
static bool          hasDataInFileBlock(const SBlockData* pBlockData, const SFileBlockDumpInfo* pDumpInfo);
static bool          hasDataInLastBlock(SLastBlockReader* pLastBlockReader);
H
Haojun Liao 已提交
199

200 201
// Returns true when ts lies outside the closed interval [pWindow->skey, pWindow->ekey].
static bool outOfTimeWindow(int64_t ts, STimeWindow* pWindow) {
  const bool inside = (ts <= pWindow->ekey) && (ts >= pWindow->skey);
  return !inside;
}

202 203 204
// Fills pReader->suppInfo from the columns of pBlock: the column id of every column and, for
// variable-length columns, a scratch buffer of the column's byte width.
//
// Returns TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY when an allocation fails. When the two
// top-level arrays cannot be allocated, suppInfo is left with NULL arrays and a zero column
// count. When only a per-column buffer fails, everything allocated so far stays owned by
// suppInfo and all column ids are still recorded.
static int32_t setColumnIdSlotList(STsdbReader* pReader, SSDataBlock* pBlock) {
  SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo;

  size_t numOfCols = blockDataGetNumOfCols(pBlock);

  pSupInfo->numOfCols = numOfCols;
  pSupInfo->colIds = taosMemoryMalloc(numOfCols * sizeof(int16_t));
  pSupInfo->buildBuf = taosMemoryCalloc(numOfCols, POINTER_BYTES);
  if (pSupInfo->buildBuf == NULL || pSupInfo->colIds == NULL) {
    taosMemoryFree(pSupInfo->colIds);
    taosMemoryFree(pSupInfo->buildBuf);

    // do not leave dangling pointers or a stale column count behind for later cleanup code
    pSupInfo->colIds = NULL;
    pSupInfo->buildBuf = NULL;
    pSupInfo->numOfCols = 0;
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  int32_t code = TSDB_CODE_SUCCESS;
  for (int32_t i = 0; i < numOfCols; ++i) {
    SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, i);
    pSupInfo->colIds[i] = pCol->info.colId;

    if (IS_VAR_DATA_TYPE(pCol->info.type)) {
      pSupInfo->buildBuf[i] = taosMemoryMalloc(pCol->info.bytes);
      if (pSupInfo->buildBuf[i] == NULL) {
        // keep going so that every column id is still recorded, but report the failure
        code = TSDB_CODE_OUT_OF_MEMORY;
      }
    }
  }

  return code;
}
H
Hongze Cheng 已提交
227

228
// Builds the uid -> STableBlockScanInfo map for the numOfTables tables in idList.
// Each entry's lastKey starts one tick outside the reader's query window on the side the scan
// starts from (skey - 1 for ascending, ekey + 1 for descending), clamped at the int64 limits.
// Returns NULL when the hash table cannot be created.
static SHashObj* createDataBlockScanInfo(STsdbReader* pTsdbReader, const STableKeyInfo* idList, int32_t numOfTables) {
  // allocate buffer in order to load data blocks from file
  // todo use simple hash instead, optimize the memory consumption
  SHashObj* pScanMap =
      taosHashInit(numOfTables, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
  if (pScanMap == NULL) {
    return NULL;
  }

  for (int32_t idx = 0; idx < numOfTables; ++idx) {
    STableBlockScanInfo scanInfo = {.lastKey = 0, .uid = idList[idx].uid};

    if (!ASCENDING_TRAVERSE(pTsdbReader->order)) {
      int64_t endKey = pTsdbReader->window.ekey;
      scanInfo.lastKey = (endKey < INT64_MAX) ? (endKey + 1) : endKey;
    } else {
      int64_t startKey = pTsdbReader->window.skey;
      scanInfo.lastKey = (startKey > INT64_MIN) ? (startKey - 1) : startKey;
    }

    taosHashPut(pScanMap, &scanInfo.uid, sizeof(uint64_t), &scanInfo, sizeof(scanInfo));
    tsdbDebug("%p check table uid:%" PRId64 " from lastKey:%" PRId64 " %s", pTsdbReader, scanInfo.uid,
              scanInfo.lastKey, pTsdbReader->idStr);
  }

  tsdbDebug("%p create %d tables scan-info, size:%.2f Kb, %s", pTsdbReader, numOfTables,
            (sizeof(STableBlockScanInfo) * numOfTables) / 1024.0, pTsdbReader->idStr);

  return pScanMap;
}
H
Hongze Cheng 已提交
257

H
Haojun Liao 已提交
258
// Rewinds every table's scan info so that a new scan can start from key `ts`:
// both in-memory iterators are released and marked empty, the delete skyline is dropped and
// lastKey is set to ts. The block list and the map data are kept.
static void resetDataBlockScanInfo(SHashObj* pTableMap, int64_t ts) {
  STableBlockScanInfo* p = NULL;

  while ((p = taosHashIterate(pTableMap, p)) != NULL) {
    p->iterInit = false;
    p->iter.hasVal = false;
    p->iiter.hasVal = false;

    if (p->iter.iter != NULL) {
      p->iter.iter = tsdbTbDataIterDestroy(p->iter.iter);
    }

    // the imem iterator must be released as well, otherwise it is lost when the iterators are re-created
    if (p->iiter.iter != NULL) {
      p->iiter.iter = tsdbTbDataIterDestroy(p->iiter.iter);
    }

    p->delSkyline = taosArrayDestroy(p->delSkyline);
    p->lastKey = ts;
  }
}

273 274 275
// Releases everything owned by one table's scan info: both in-memory iterators, the delete
// skyline, the block index list and the map data. The struct itself is not freed.
static void clearBlockScanInfo(STableBlockScanInfo* p) {
  p->iterInit = false;
  // both iterators are destroyed below, so neither may still claim to hold a value
  p->iter.hasVal = false;
  p->iiter.hasVal = false;

  if (p->iter.iter != NULL) {
    p->iter.iter = tsdbTbDataIterDestroy(p->iter.iter);
  }

  if (p->iiter.iter != NULL) {
    p->iiter.iter = tsdbTbDataIterDestroy(p->iiter.iter);
  }

  p->delSkyline = taosArrayDestroy(p->delSkyline);
  p->pBlockList = taosArrayDestroy(p->pBlockList);
  tMapDataClear(&p->mapData);
}
289

290 291 292 293
// Clears every entry of the table map with clearBlockScanInfo() and then destroys the map.
static void destroyBlockScanInfo(SHashObj* pTableMap) {
  for (STableBlockScanInfo* pInfo = taosHashIterate(pTableMap, NULL); pInfo != NULL;
       pInfo = taosHashIterate(pTableMap, pInfo)) {
    clearBlockScanInfo(pInfo);
  }

  taosHashCleanup(pTableMap);
}

299
// A time window is empty when its end key precedes its start key.
static bool isEmptyQueryTimeWindow(STimeWindow* pWindow) {
  ASSERT(pWindow != NULL);

  const bool isEmpty = (pWindow->ekey < pWindow->skey);
  return isEmpty;
}
H
Hongze Cheng 已提交
303

304 305 306
// Clips the start of the query time window to the oldest timestamp that is still retained
// (now - keep2, scaled by the ticks-per-minute of the configured precision, plus one tick),
// so that expired data is never returned to the client. The end of the window is not changed.
static STimeWindow updateQueryTimeWindow(STsdb* pTsdb, STimeWindow* pWindow) {
  STsdbKeepCfg* pKeepCfg = &pTsdb->keepCfg;

  int64_t current = taosGetTimestamp(pKeepCfg->precision);
  int64_t earliestTs = current - (tsTickPerMin[pKeepCfg->precision] * pKeepCfg->keep2) + 1;  // needs to add one tick

  STimeWindow clipped = *pWindow;
  clipped.skey = (clipped.skey < earliestTs) ? earliestTs : clipped.skey;
  return clipped;
}
H
Hongze Cheng 已提交
319

H
Haojun Liao 已提交
320
// Shrinks *capacity (a row count) so that capacity * rowLen does not exceed 2MB, where rowLen
// is the sum of the byte widths of all queried columns. *capacity is left untouched when the
// output block already fits.
static void limitOutputBufferSize(const SQueryTableDataCond* pCond, int32_t* capacity) {
  // accumulate and multiply in 64 bits: wide rows times a large capacity overflow int32_t
  int64_t rowLen = 0;
  for (int32_t i = 0; i < pCond->numOfCols; ++i) {
    rowLen += pCond->colList[i].bytes;
  }

  // make sure the output SSDataBlock size be less than 2MB.
  const int64_t TWOMB = 2 * 1024 * 1024;
  if ((int64_t)(*capacity) * rowLen > TWOMB) {
    (*capacity) = (int32_t)(TWOMB / rowLen);
  }
}

// init file iterator
334
static int32_t initFilesetIterator(SFilesetIter* pIter, SArray* aDFileSet, STsdbReader* pReader) {
H
Hongze Cheng 已提交
335
  size_t numOfFileset = taosArrayGetSize(aDFileSet);
336

337 338
  pIter->index = ASCENDING_TRAVERSE(pReader->order) ? -1 : numOfFileset;
  pIter->order = pReader->order;
H
Hongze Cheng 已提交
339
  pIter->pFileList = aDFileSet;
340
  pIter->numOfFiles = numOfFileset;
H
Haojun Liao 已提交
341

342 343 344 345
  if (pIter->pLastBlockReader == NULL) {
    pIter->pLastBlockReader = taosMemoryCalloc(1, sizeof(struct SLastBlockReader));
    if (pIter->pLastBlockReader == NULL) {
      int32_t code = TSDB_CODE_OUT_OF_MEMORY;
346
      tsdbError("failed to prepare the last block iterator, since:%s %s", tstrerror(code), pReader->idStr);
347 348
      return code;
    }
349 350
  }

351 352 353 354 355 356 357 358
  SLastBlockReader* pLReader = pIter->pLastBlockReader;
  pLReader->order = pReader->order;
  pLReader->window = pReader->window;
  pLReader->verRange = pReader->verRange;

  pLReader->uid = 0;
  tMergeTreeClose(&pLReader->mergeTree);

359
  if (pLReader->pInfo == NULL) {
360
    // here we ignore the first column, which is always be the primary timestamp column
361 362
    pLReader->pInfo =
        tCreateLastBlockLoadInfo(pReader->pSchema, &pReader->suppInfo.colIds[1], pReader->suppInfo.numOfCols - 1);
H
Haojun Liao 已提交
363 364 365 366
    if (pLReader->pInfo == NULL) {
      tsdbDebug("init fileset iterator failed, code:%s %s", tstrerror(terrno), pReader->idStr);
      return terrno;
    }
367 368
  }

369
  tsdbDebug("init fileset iterator, total files:%d %s", pIter->numOfFiles, pReader->idStr);
H
Haojun Liao 已提交
370 371 372
  return TSDB_CODE_SUCCESS;
}

373
// Advances the iterator to the next file set that overlaps the reader's query window and
// opens a data file reader on it (closing the previously opened one).
//
// Returns true when such a file set was opened. Returns false when the file sets are
// exhausted, when the remaining ones lie beyond the query window, or when a file set cannot
// be opened; in the last case the error is logged and stored in terrno.
static bool filesetIteratorNext(SFilesetIter* pIter, STsdbReader* pReader) {
  bool    asc = ASCENDING_TRAVERSE(pIter->order);
  int32_t step = asc ? 1 : -1;
  pIter->index += step;

  if ((asc && pIter->index >= pIter->numOfFiles) || ((!asc) && pIter->index < 0)) {
    return false;
  }

  // collect the statistics of the last-block reader before it is reset for the next file set
  SIOCostSummary* pSum = &pReader->cost;
  getLastBlockLoadInfo(pIter->pLastBlockReader->pInfo, &pSum->lastBlockLoad, &pReader->cost.lastBlockLoadTime);

  pIter->pLastBlockReader->uid = 0;
  tMergeTreeClose(&pIter->pLastBlockReader->mergeTree);
  resetLastBlockLoadInfo(pIter->pLastBlockReader->pInfo);

  // check file the time range of coverage
  STimeWindow win = {0};

  while (1) {
    if (pReader->pFileReader != NULL) {
      tsdbDataFReaderClose(&pReader->pFileReader);
    }

    pReader->status.pCurrentFileset = (SDFileSet*)taosArrayGet(pIter->pFileList, pIter->index);

    int32_t code = tsdbDataFReaderOpen(&pReader->pFileReader, pReader->pTsdb, pReader->status.pCurrentFileset);
    if (code != TSDB_CODE_SUCCESS) {
      // the bool return value cannot carry the error code, so record it instead of dropping it
      terrno = code;
      tsdbError("%p failed to open data file reader, fid:%d, since:%s %s", pReader,
                pReader->status.pCurrentFileset->fid, tstrerror(code), pReader->idStr);
      return false;
    }

    pReader->cost.headFileLoad += 1;

    int32_t fid = pReader->status.pCurrentFileset->fid;
    tsdbFidKeyRange(fid, pReader->pTsdb->keepCfg.days, pReader->pTsdb->keepCfg.precision, &win.skey, &win.ekey);

    // current file are no longer overlapped with query time window, ignore remain files
    if ((asc && win.skey > pReader->window.ekey) || (!asc && win.ekey < pReader->window.skey)) {
      tsdbDebug("%p remain files are not qualified for qrange:%" PRId64 "-%" PRId64 ", ignore, %s", pReader,
                pReader->window.skey, pReader->window.ekey, pReader->idStr);
      return false;
    }

    // this file set ends before the window starts (in traversal order): skip to the next one
    if ((asc && (win.ekey < pReader->window.skey)) || ((!asc) && (win.skey > pReader->window.ekey))) {
      pIter->index += step;
      if ((asc && pIter->index >= pIter->numOfFiles) || ((!asc) && pIter->index < 0)) {
        return false;
      }
      continue;
    }

    tsdbDebug("%p file found fid:%d for qrange:%" PRId64 "-%" PRId64 ", %s", pReader, fid, pReader->window.skey,
              pReader->window.ekey, pReader->idStr);
    return true;
  }
}

433
// Rewinds the block iterator for a new file set: index -1, no blocks, given order.
// The block list is emptied when it exists and created otherwise.
static void resetDataBlockIterator(SDataBlockIter* pIter, int32_t order) {
  pIter->numOfBlocks = 0;
  pIter->index = -1;
  pIter->order = order;

  if (pIter->blockList != NULL) {
    taosArrayClear(pIter->blockList);
    return;
  }

  pIter->blockList = taosArrayInit(4, sizeof(SFileDataBlockInfo));
}

L
Liu Jicong 已提交
444
// Releases the block list of the iterator. The pointer is cleared as well, because
// resetDataBlockIterator() treats a non-NULL blockList as a live array.
static void cleanupDataBlockIterator(SDataBlockIter* pIter) {
  pIter->blockList = taosArrayDestroy(pIter->blockList);
}
H
Haojun Liao 已提交
445

H
Haojun Liao 已提交
446
// Puts the reader status into its initial state: file stage first, no table iterator yet.
static void initReaderStatus(SReaderStatus* pStatus) {
  pStatus->loadFromFile = true;
  pStatus->pTableIter = NULL;
}

451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473
// Creates the result block of a reader: one column per entry of pCond->colList, with room
// for `capacity` rows.
// Returns NULL and sets terrno on failure; a partially built block is fully destroyed.
static SSDataBlock* createResBlock(SQueryTableDataCond* pCond, int32_t capacity) {
  SSDataBlock* pResBlock = createDataBlock();
  if (pResBlock == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  int32_t code = TSDB_CODE_SUCCESS;
  for (int32_t i = 0; i < pCond->numOfCols; ++i) {
    SColumnInfoData colInfo = {{0}, 0};
    colInfo.info = pCond->colList[i];

    code = blockDataAppendColInfo(pResBlock, &colInfo);
    if (code != TSDB_CODE_SUCCESS) {
      break;
    }
  }

  if (code == TSDB_CODE_SUCCESS) {
    code = blockDataEnsureCapacity(pResBlock, capacity);
  }

  if (code != TSDB_CODE_SUCCESS) {
    terrno = code;
    // blockDataDestroy also releases the column array, which a plain free of the block would leak
    blockDataDestroy(pResBlock);
    return NULL;
  }

  return pResBlock;
}

474 475
// Allocates and initializes a reader for the query condition pCond.
//
// pVnode    vnode to read from; the tsdb instance is chosen by getTsdbByRetentions()
// pCond     query condition (columns, order, time window, ...); must have at least one column
// ppReader  receives the new reader, or NULL on failure
// capacity  requested row capacity of the result block; may be reduced by limitOutputBufferSize()
// idstr     optional id string for logging; copied when not NULL
//
// Returns 0 on success. On failure the partially built reader is released with
// tsdbReaderClose() and an error code is returned.
static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, STsdbReader** ppReader, int32_t capacity,
                                const char* idstr) {
  int32_t      code = 0;
  int8_t       level = 0;
  STsdbReader* pReader = (STsdbReader*)taosMemoryCalloc(1, sizeof(*pReader));
  if (pReader == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _end;
  }

  if (VND_IS_TSMA(pVnode)) {
    tsdbDebug("vgId:%d, tsma is selected to query", TD_VID(pVnode));
  }

  initReaderStatus(&pReader->status);

  pReader->pTsdb = getTsdbByRetentions(pVnode, pCond->twindows.skey, pVnode->config.tsdbCfg.retentions, idstr, &level);
  pReader->suid = pCond->suid;
  pReader->order = pCond->order;
  pReader->capacity = capacity;
  pReader->idStr = (idstr != NULL) ? strdup(idstr) : NULL;
  pReader->verRange = getQueryVerRange(pVnode, pCond, level);
  pReader->type = pCond->type;
  pReader->window = updateQueryTimeWindow(pReader->pTsdb, &pCond->twindows);

  ASSERT(pCond->numOfCols > 0);

  limitOutputBufferSize(pCond, &pReader->capacity);

  // allocate buffer in order to load data blocks from file
  SBlockLoadSuppInfo* pSup = &pReader->suppInfo;
  pSup->pColAgg = taosArrayInit(4, sizeof(SColumnDataAgg));
  pSup->plist = taosMemoryCalloc(pCond->numOfCols, POINTER_BYTES);
  if (pSup->pColAgg == NULL || pSup->plist == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _end;
  }

  pSup->tsColAgg.colId = PRIMARYKEY_TIMESTAMP_COL_ID;

  code = tBlockDataCreate(&pReader->status.fileBlockData);
  if (code != TSDB_CODE_SUCCESS) {
    terrno = code;
    goto _end;
  }

  pReader->pResBlock = createResBlock(pCond, pReader->capacity);
  if (pReader->pResBlock == NULL) {
    code = terrno;
    goto _end;
  }

  // a reader without its column id list / build buffers must not be handed out
  code = setColumnIdSlotList(pReader, pReader->pResBlock);
  if (code != TSDB_CODE_SUCCESS) {
    terrno = code;
    goto _end;
  }

  *ppReader = pReader;
  return code;

_end:
  tsdbReaderClose(pReader);
  *ppReader = NULL;
  return code;
}
H
Hongze Cheng 已提交
536

H
Haojun Liao 已提交
537
// Reads the block index of the currently opened file set and appends to pIndexList every
// SBlockIdx that belongs to the queried super table (suid) and to one of the queried tables.
// The pBlockList of each matching table is created on demand.
//
// Returns TSDB_CODE_SUCCESS (also when the file set has no block index), the error reported
// by tsdbReadBlockIdx(), or TSDB_CODE_OUT_OF_MEMORY.
static int32_t doLoadBlockIndex(STsdbReader* pReader, SDataFReader* pFileReader, SArray* pIndexList) {
  SArray* aBlockIdx = taosArrayInit(8, sizeof(SBlockIdx));
  if (aBlockIdx == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  int64_t st = taosGetTimestampUs();
  int32_t code = tsdbReadBlockIdx(pFileReader, aBlockIdx);
  if (code != TSDB_CODE_SUCCESS) {
    goto _end;
  }

  size_t num = taosArrayGetSize(aBlockIdx);
  if (num == 0) {
    goto _end;
  }

  int64_t et1 = taosGetTimestampUs();

  SBlockIdx* pBlockIdx = NULL;
  for (int32_t i = 0; i < num; ++i) {
    pBlockIdx = (SBlockIdx*)taosArrayGet(aBlockIdx, i);

    // uid check
    if (pBlockIdx->suid != pReader->suid) {
      continue;
    }

    // this block belongs to a table that is not queried.
    void* p = taosHashGet(pReader->status.pTableMap, &pBlockIdx->uid, sizeof(uint64_t));
    if (p == NULL) {
      continue;
    }

    STableBlockScanInfo* pScanInfo = p;
    if (pScanInfo->pBlockList == NULL) {
      pScanInfo->pBlockList = taosArrayInit(4, sizeof(int32_t));
      if (pScanInfo->pBlockList == NULL) {
        code = TSDB_CODE_OUT_OF_MEMORY;
        goto _end;
      }
    }

    if (taosArrayPush(pIndexList, pBlockIdx) == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto _end;
    }
  }

  int64_t et2 = taosGetTimestampUs();
  tsdbDebug("load block index for %d tables completed, elapsed time:%.2f ms, set blockIdx:%.2f ms, size:%.2f Kb %s",
            (int32_t)num, (et1 - st) / 1000.0, (et2 - et1) / 1000.0, num * sizeof(SBlockIdx) / 1024.0, pReader->idStr);

  pReader->cost.headFileLoadTime += (et1 - st) / 1000.0;

_end:
  taosArrayDestroy(aBlockIdx);
  return code;
}
H
Hongze Cheng 已提交
587

588
// Drops the per-file state (map data and block index list content) of every table in the map.
static void cleanupTableScanInfo(SHashObj* pTableMap) {
  STableBlockScanInfo* pInfo = NULL;

  while ((pInfo = taosHashIterate(pTableMap, pInfo)) != NULL) {
    // reset the index in last block when handing a new file
    tMapDataClear(&pInfo->mapData);
    taosArrayClear(pInfo->pBlockList);
  }
}

602
// Loads the data block descriptors of every table listed in pIndexList from the current file
// set and records, per table, the indexes of the blocks that overlap both the query time
// window and the query version range.
//
// pBlockNum->numOfBlocks is incremented for every selected block and pBlockNum->numOfLastFiles
// is set to the number of stt files of the file set.
//
// Returns TSDB_CODE_SUCCESS, the error reported by tsdbReadDataBlk(), or TSDB_CODE_OUT_OF_MEMORY.
static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, SBlockNumber* pBlockNum) {
  int32_t numOfQTable = 0;
  size_t  sizeInDisk = 0;
  size_t  numOfTables = taosArrayGetSize(pIndexList);

  int64_t st = taosGetTimestampUs();
  cleanupTableScanInfo(pReader->status.pTableMap);

  for (int32_t i = 0; i < numOfTables; ++i) {
    SBlockIdx* pBlockIdx = taosArrayGet(pIndexList, i);

    STableBlockScanInfo* pScanInfo = taosHashGet(pReader->status.pTableMap, &pBlockIdx->uid, sizeof(int64_t));

    tMapDataReset(&pScanInfo->mapData);
    int32_t code = tsdbReadDataBlk(pReader->pFileReader, pBlockIdx, &pScanInfo->mapData);
    if (code != TSDB_CODE_SUCCESS) {
      // do not iterate over map data that was not (completely) loaded
      return code;
    }

    sizeInDisk += pScanInfo->mapData.nData;
    for (int32_t j = 0; j < pScanInfo->mapData.nItem; ++j) {
      SDataBlk block = {0};
      tMapDataGetItemByIdx(&pScanInfo->mapData, j, &block, tGetDataBlk);

      // 1. time range check
      if (block.minKey.ts > pReader->window.ekey || block.maxKey.ts < pReader->window.skey) {
        continue;
      }

      // 2. version range check
      if (block.minVer > pReader->verRange.maxVer || block.maxVer < pReader->verRange.minVer) {
        continue;
      }

      void* p = taosArrayPush(pScanInfo->pBlockList, &j);
      if (p == NULL) {
        tMapDataClear(&pScanInfo->mapData);
        return TSDB_CODE_OUT_OF_MEMORY;
      }

      pBlockNum->numOfBlocks += 1;
    }

    if (pScanInfo->pBlockList != NULL && taosArrayGetSize(pScanInfo->pBlockList) > 0) {
      numOfQTable += 1;
    }
  }

  pBlockNum->numOfLastFiles = pReader->pFileReader->pSet->nSttF;
  int32_t total = pBlockNum->numOfLastFiles + pBlockNum->numOfBlocks;

  double el = (taosGetTimestampUs() - st) / 1000.0;
  // numOfTables is a size_t: print it through a fixed-width cast instead of %ld
  tsdbDebug(
      "load block of %" PRId64 " tables completed, blocks:%d in %d tables, last-files:%d, block-info-size:%.2f Kb, "
      "elapsed time:%.2f ms %s",
      (int64_t)numOfTables, pBlockNum->numOfBlocks, numOfQTable, pBlockNum->numOfLastFiles, sizeInDisk / 1024.0, el,
      pReader->idStr);

  pReader->cost.numOfBlocks += total;
  pReader->cost.headFileLoadTime += el;

  return TSDB_CODE_SUCCESS;
}
H
Hongze Cheng 已提交
662

663
// Marks the current file block as completely dumped and moves lastKey one tick past maxKey
// in traversal direction (+1 for ascending order, -1 otherwise).
static void setBlockAllDumped(SFileBlockDumpInfo* pDumpInfo, int64_t maxKey, int32_t order) {
  pDumpInfo->allDumped = true;

  if (ASCENDING_TRAVERSE(order)) {
    pDumpInfo->lastKey = maxKey + 1;
  } else {
    pDumpInfo->lastKey = maxKey - 1;
  }
}

669 670
// Appends the column value pColVal to row rowIndex of pColInfoData.
// Fixed-length values are appended directly (as NULL when the value is not set).
// Variable-length values are first assembled (length header + payload) in the scratch buffer
// pSup->buildBuf[colIndex] and appended from there; unset values are appended as NULL.
static void doCopyColVal(SColumnInfoData* pColInfoData, int32_t rowIndex, int32_t colIndex, SColVal* pColVal,
                         SBlockLoadSuppInfo* pSup) {
  if (!IS_VAR_DATA_TYPE(pColVal->type)) {
    colDataAppend(pColInfoData, rowIndex, (const char*)&pColVal->value, !COL_VAL_IS_VALUE(pColVal));
    return;
  }

  if (!COL_VAL_IS_VALUE(pColVal)) {
    colDataAppendNULL(pColInfoData, rowIndex);
    return;
  }

  char* pScratch = pSup->buildBuf[colIndex];

  varDataSetLen(pScratch, pColVal->value.nData);
  ASSERT(pColVal->value.nData <= pColInfoData->info.bytes);
  memcpy(varDataVal(pScratch), pColVal->value.pData, pColVal->value.nData);
  colDataAppend(pColInfoData, rowIndex, pScratch, false);
}

685
// Returns the block info at the iterator's current index, or NULL when the block list is empty.
static SFileDataBlockInfo* getCurrentBlockInfo(SDataBlockIter* pBlockIter) {
  if (taosArrayGetSize(pBlockIter->blockList) != 0) {
    return taosArrayGet(pBlockIter->blockList, pBlockIter->index);
  }

  // an empty list must agree with the cached block count
  ASSERT(pBlockIter->numOfBlocks == taosArrayGetSize(pBlockIter->blockList));
  return NULL;
}

H
Hongze Cheng 已提交
695
// Returns the address of the SDataBlk cached inside the iterator.
static SDataBlk* getCurrentBlock(SDataBlockIter* pBlockIter) {
  SDataBlk* pCurrent = &pBlockIter->block;
  return pCurrent;
}
696

H
Haojun Liao 已提交
697 698 699 700 701 702 703 704 705 706 707 708 709 710 711 712 713 714 715 716 717 718 719 720 721 722 723 724 725 726 727 728 729 730 731 732 733 734 735 736 737 738 739 740 741 742 743 744 745 746 747 748 749 750 751 752 753 754 755 756 757 758 759 760 761 762 763 764 765 766 767 768 769
// Binary search for `key` in the timestamp array pValue (interpreted as TSKEY[num]).
//
// pValue  array of `num` TSKEY values
// num     number of elements; keyList[0] is read unconditionally, so num must be > 0
// key     timestamp to look for
// order   TSDB_ORDER_ASC or TSDB_ORDER_DESC (asserted)
//
// Returns the index of an element equal to key when one is hit; otherwise the position chosen
// by the order-specific early exits below, or -1 when that position would be >= num.
// NOTE(review): the comparisons imply the array is sorted ascending for TSDB_ORDER_ASC and
// descending for TSDB_ORDER_DESC — confirm against the callers.
int32_t binarySearchForTs(char* pValue, int num, TSKEY key, int order) {
  int32_t midPos = -1;
  int32_t numOfRows;

  ASSERT(order == TSDB_ORDER_ASC || order == TSDB_ORDER_DESC);

  TSKEY*  keyList = (TSKEY*)pValue;
  int32_t firstPos = 0;
  int32_t lastPos = num - 1;

  if (order == TSDB_ORDER_DESC) {
    // find the first position whose value is not larger than the key
    while (1) {
      // key is not below the element at firstPos: firstPos is the answer
      if (key >= keyList[firstPos]) return firstPos;
      if (key == keyList[lastPos]) return lastPos;

      // key is below the element at lastPos: the answer is the next position, if it exists
      if (key < keyList[lastPos]) {
        lastPos += 1;
        if (lastPos >= num) {
          return -1;
        } else {
          return lastPos;
        }
      }

      numOfRows = lastPos - firstPos + 1;
      midPos = (numOfRows >> 1) + firstPos;

      if (key < keyList[midPos]) {
        firstPos = midPos + 1;
      } else if (key > keyList[midPos]) {
        lastPos = midPos - 1;
      } else {
        // exact match at midPos
        break;
      }
    }

  } else {
    // find the first position whose value is not smaller than the key
    while (1) {
      // key is not above the element at firstPos: firstPos is the answer
      if (key <= keyList[firstPos]) return firstPos;
      if (key == keyList[lastPos]) return lastPos;

      // key is above the element at lastPos: the answer is the next position, if it exists
      if (key > keyList[lastPos]) {
        lastPos = lastPos + 1;
        if (lastPos >= num)
          return -1;
        else
          return lastPos;
      }

      numOfRows = lastPos - firstPos + 1;
      midPos = (numOfRows >> 1u) + firstPos;

      if (key < keyList[midPos]) {
        lastPos = midPos - 1;
      } else if (key > keyList[midPos]) {
        firstPos = midPos + 1;
      } else {
        // exact match at midPos
        break;
      }
    }
  }

  // only reached through the exact-match breaks above
  return midPos;
}

/**
 * Search the key list for `key`, starting from the row `pos` and moving towards one end of the list.
 *
 * TSDB_ORDER_ASC : the range [pos, num - 1] is searched; -1 is returned when key < keyList[pos],
 *                  otherwise an index in that range whose key is <= `key` (the last one when there is
 *                  no exact match).
 * otherwise (DESC): the range [0, pos] is searched; -1 is returned when key > keyList[pos],
 *                  otherwise an index in that range whose key is >= `key` (the first one when there is
 *                  no exact match).
 */
static int doBinarySearchKey(TSKEY* keyList, int num, int pos, TSKEY key, int order) {
  // sanity check of the start position
  assert(pos >= 0 && pos < num);
  assert(num > 0);

  int anchor = pos;  // the side of the range that starts at the given position

  if (order != TSDB_ORDER_ASC) {
    // DESC: the other side of the range is the head of the list
    int bound = 0;
    if (key > keyList[pos]) return -1;

    for (;;) {
      // check whether one of the two borders is already the answer
      if (key <= keyList[bound]) return bound;
      if (key >= keyList[anchor]) return anchor;
      if (anchor - bound <= 1) return anchor;

      // shrink the range around the probe position
      int   probe = anchor - (anchor - bound + 1) / 2;
      TSKEY probeKey = keyList[probe];
      if (probeKey < key) {
        bound = probe;
      } else if (probeKey > key) {
        anchor = probe;
      } else {
        return probe;
      }
    }
  }

  // ASC: the other side of the range is the tail of the list
  int bound = num - 1;
  if (key < keyList[pos]) return -1;

  for (;;) {
    // check whether one of the two borders is already the answer
    if (key >= keyList[bound]) return bound;
    if (key <= keyList[anchor]) return anchor;
    if (bound - anchor <= 1) return anchor;

    // shrink the range around the probe position
    int   probe = anchor + (bound - anchor + 1) / 2;
    TSKEY probeKey = keyList[probe];
    if (probeKey > key) {
      bound = probe;
    } else if (probeKey < key) {
      anchor = probe;
    } else {
      return probe;
    }
  }
}

/**
 * Locate the last row (in traverse order) of the file block that is still inside the query time window.
 *
 * @param pReader     reader that carries the scan order and the query time window
 * @param pBlockData  loaded block data; its timestamp column is searched when needed
 * @param pBlock      block descriptor (row count and min/max key)
 * @param pos         row index the scan currently stands at
 *
 * @return the end row index, or -1 when doBinarySearchKey() reports that no row starting from `pos`
 *         qualifies.
 */
int32_t getEndPosInDataBlock(STsdbReader* pReader, SBlockData* pBlockData, SDataBlk* pBlock, int32_t pos) {
  bool asc = ASCENDING_TRAVERSE(pReader->order);

  // the whole tail of the block is covered by the window: no search required
  if (asc && pReader->window.ekey >= pBlock->maxKey.ts) {
    return pBlock->nRow - 1;
  }

  // the whole head of the block is covered by the window: no search required
  if (!asc && pReader->window.skey <= pBlock->minKey.ts) {
    return 0;
  }

  // An ascending scan stops at the upper bound of the window, a descending scan stops at the lower
  // bound. Using ekey for both directions makes the descending search compare against the wrong bound.
  TSKEY key = asc ? pReader->window.ekey : pReader->window.skey;
  return doBinarySearchKey(pBlockData->aTSKEY, pBlock->nRow, pos, key, pReader->order);
}

830
/**
 * Copy the rows of the current file block that fall into the query time window into the result block.
 *
 * The copy starts at pDumpInfo->rowIndex, moves in scan order, and is limited by the reader capacity.
 * After the copy pDumpInfo->rowIndex points to the next row to be dumped, and the block is flagged as
 * completely dumped when no qualified row is left.
 *
 * @param pReader         the tsdb reader
 * @param pBlockScanInfo  scan info of the table that owns the block (not used by this function)
 * @return TSDB_CODE_SUCCESS
 */
static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo) {
  SReaderStatus*  pStatus = &pReader->status;
  SDataBlockIter* pBlockIter = &pStatus->blockIter;

  SBlockData*         pBlockData = &pStatus->fileBlockData;
  SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(pBlockIter);
  SDataBlk*           pBlock = getCurrentBlock(pBlockIter);
  SSDataBlock*        pResBlock = pReader->pResBlock;
  int32_t             numOfOutputCols = blockDataGetNumOfCols(pResBlock);

  SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo;
  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;

  SColVal cv = {0};
  int64_t st = taosGetTimestampUs();
  bool    asc = ASCENDING_TRAVERSE(pReader->order);
  int32_t step = asc ? 1 : -1;

  // The scan stands on the first row (in scan order) of the block: make sure that row is inside the window.
  if ((pDumpInfo->rowIndex == 0 && asc) || (pDumpInfo->rowIndex == pBlock->nRow - 1 && (!asc))) {
    bool startInWindow =
        asc ? (pReader->window.skey <= pBlock->minKey.ts) : (pReader->window.ekey >= pBlock->maxKey.ts);

    if (!startInWindow) {
      // Search from the opposite end of the block, in reversed order, for the first qualified row.
      // An ascending scan enters the window at skey; a descending scan enters it at ekey.
      int32_t pos = asc ? pBlock->nRow - 1 : 0;
      int32_t order = asc ? TSDB_ORDER_DESC : TSDB_ORDER_ASC;
      TSKEY   key = asc ? pReader->window.skey : pReader->window.ekey;
      pDumpInfo->rowIndex = doBinarySearchKey(pBlockData->aTSKEY, pBlock->nRow, pos, key, order);
    }
  }

  // time window check
  int32_t endIndex = getEndPosInDataBlock(pReader, pBlockData, pBlock, pDumpInfo->rowIndex);
  if (endIndex == -1) {
    setBlockAllDumped(pDumpInfo, pReader->window.ekey, pReader->order);
    return TSDB_CODE_SUCCESS;
  }

  // make endIndex exclusive so that the distance to rowIndex is the number of rows
  endIndex += step;
  int32_t remain = asc ? (endIndex - pDumpInfo->rowIndex) : (pDumpInfo->rowIndex - endIndex);
  if (remain > pReader->capacity) {  // output buffer check
    remain = pReader->capacity;
  }

  int32_t rowIndex = 0;

  // the primary timestamp column, if it is the first output column
  int32_t          i = 0;
  SColumnInfoData* pColData = taosArrayGet(pResBlock->pDataBlock, i);
  if (pColData->info.colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
    if (asc) {
      // rows are contiguous and already in output order
      memcpy(pColData->pData, &pBlockData->aTSKEY[pDumpInfo->rowIndex], remain * sizeof(int64_t));
    } else {
      for (int32_t j = pDumpInfo->rowIndex; rowIndex < remain; j += step) {
        colDataAppendInt64(pColData, rowIndex++, &pBlockData->aTSKEY[j]);
      }
    }

    i += 1;
  }

  // merge-walk the output columns and the columns of the file block; both are advanced by column id
  int32_t colIndex = 0;
  int32_t num = taosArrayGetSize(pBlockData->aIdx);
  while (i < numOfOutputCols && colIndex < num) {
    rowIndex = 0;
    pColData = taosArrayGet(pResBlock->pDataBlock, i);

    SColData* pData = tBlockDataGetColDataByIdx(pBlockData, colIndex);
    if (pData->cid < pColData->info.colId) {
      colIndex += 1;
    } else if (pData->cid == pColData->info.colId) {
      if (pData->flag == HAS_NONE || pData->flag == HAS_NULL || pData->flag == (HAS_NULL | HAS_NONE)) {
        // no real value in this column at all
        colDataAppendNNULL(pColData, 0, remain);
      } else {
        if (IS_NUMERIC_TYPE(pColData->info.type) && asc) {
          // fixed-width values in output order: bulk copy
          uint8_t* p = pData->pData + tDataTypes[pData->type].bytes * pDumpInfo->rowIndex;
          memcpy(pColData->pData, p, remain * tDataTypes[pData->type].bytes);

          // null value exists, check one-by-one
          if (pData->flag != HAS_VALUE) {
            for (int32_t j = pDumpInfo->rowIndex; rowIndex < remain; j += step, rowIndex++) {
              uint8_t v = tColDataGetBitValue(pData, j);
              if (v == 0 || v == 1) {
                colDataSetNull_f(pColData->nullbitmap, rowIndex);
              }
            }
          }
        } else {
          for (int32_t j = pDumpInfo->rowIndex; rowIndex < remain; j += step) {
            tColDataGetValue(pData, j, &cv);
            doCopyColVal(pColData, rowIndex++, i, &cv, pSupInfo);
          }
        }
      }

      colIndex += 1;
      i += 1;
    } else {  // the specified column does not exist in file block, fill with null data
      colDataAppendNNULL(pColData, 0, remain);
      i += 1;
    }
  }

  // fill the mis-matched columns with null value
  while (i < numOfOutputCols) {
    pColData = taosArrayGet(pResBlock->pDataBlock, i);
    colDataAppendNNULL(pColData, 0, remain);
    i += 1;
  }

  pResBlock->info.rows = remain;
  pDumpInfo->rowIndex += step * remain;

  // check if current block are all handled
  if (pDumpInfo->rowIndex >= 0 && pDumpInfo->rowIndex < pBlock->nRow) {
    int64_t ts = pBlockData->aTSKEY[pDumpInfo->rowIndex];
    if (outOfTimeWindow(ts, &pReader->window)) {  // the remaining data is out of the query time window
      setBlockAllDumped(pDumpInfo, ts, pReader->order);
    }
  } else {
    int64_t ts = asc ? pBlock->maxKey.ts : pBlock->minKey.ts;
    setBlockAllDumped(pDumpInfo, ts, pReader->order);
  }

  double elapsedTime = (taosGetTimestampUs() - st) / 1000.0;
  pReader->cost.blockLoadTime += elapsedTime;

  int32_t unDumpedRows = asc ? pBlock->nRow - pDumpInfo->rowIndex : pDumpInfo->rowIndex + 1;
  tsdbDebug("%p copy file block to sdatablock, global index:%d, table index:%d, brange:%" PRId64 "-%" PRId64
            ", rows:%d, remain:%d, minVer:%" PRId64 ", maxVer:%" PRId64 ", elapsed time:%.2f ms, %s",
            pReader, pBlockIter->index, pBlockInfo->tbBlockIdx, pBlock->minKey.ts, pBlock->maxKey.ts, remain,
            unDumpedRows, pBlock->minVer, pBlock->maxVer, elapsedTime, pReader->idStr);

  return TSDB_CODE_SUCCESS;
}

964 965
/**
 * Load the file block the iterator currently points at into pBlockData.
 *
 * The block data buffer is reset and re-initialized for the given table before the read. On success
 * the elapsed time is added to the reader cost and the dump info is flagged as "not dumped yet".
 *
 * @return TSDB_CODE_SUCCESS, or the error code of tBlockDataInit()/tsdbReadDataBlock()
 */
static int32_t doLoadFileBlockData(STsdbReader* pReader, SDataBlockIter* pBlockIter, SBlockData* pBlockData,
                                   uint64_t uid) {
  int64_t startUs = taosGetTimestampUs();

  // prepare the buffer; colIds[0] is skipped, hence the "+1 / -1" on the column list
  tBlockDataReset(pBlockData);

  TABLEID tid = {.suid = pReader->suid, .uid = uid};
  int32_t code =
      tBlockDataInit(pBlockData, &tid, pReader->pSchema, &pReader->suppInfo.colIds[1], pReader->suppInfo.numOfCols - 1);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(pBlockIter);
  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
  ASSERT(pBlockInfo != NULL);

  SDataBlk* pBlock = getCurrentBlock(pBlockIter);

  code = tsdbReadDataBlock(pReader->pFileReader, pBlock, pBlockData);
  if (code != TSDB_CODE_SUCCESS) {
    tsdbError("%p error occurs in loading file block, global index:%d, table index:%d, brange:%" PRId64 "-%" PRId64
              ", rows:%d, code:%s %s",
              pReader, pBlockIter->index, pBlockInfo->tbBlockIdx, pBlock->minKey.ts, pBlock->maxKey.ts, pBlock->nRow,
              tstrerror(code), pReader->idStr);
    return code;
  }

  int64_t endUs = taosGetTimestampUs();
  double  elapsedTime = (endUs - startUs) / 1000.0;

  tsdbDebug("%p load file block into buffer, global index:%d, index in table block list:%d, brange:%" PRId64 "-%" PRId64
            ", rows:%d, minVer:%" PRId64 ", maxVer:%" PRId64 ", elapsed time:%.2f ms, %s",
            pReader, pBlockIter->index, pBlockInfo->tbBlockIdx, pBlock->minKey.ts, pBlock->maxKey.ts, pBlock->nRow,
            pBlock->minVer, pBlock->maxVer, elapsedTime, pReader->idStr);

  // book keeping: a freshly loaded block has not been dumped yet
  pReader->cost.blockLoadTime += elapsedTime;
  pDumpInfo->allDumped = false;

  return TSDB_CODE_SUCCESS;
}
H
Hongze Cheng 已提交
1002

H
Haojun Liao 已提交
1003 1004 1005
// Release every buffer owned by the block order supporter: the two per-table counters, the per-table
// wrapper arrays (numOfTables of them), and finally the array holding the wrapper array pointers.
static void cleanupBlockOrderSupporter(SBlockOrderSupporter* pSup) {
  taosMemoryFreeClear(pSup->numOfBlocksPerTable);
  taosMemoryFreeClear(pSup->indexPerTable);

  int32_t tableIdx = 0;
  while (tableIdx < pSup->numOfTables) {
    SBlockOrderWrapper* pWrappers = pSup->pDataBlockInfo[tableIdx];
    taosMemoryFreeClear(pWrappers);
    tableIdx += 1;
  }

  taosMemoryFreeClear(pSup->pDataBlockInfo);
}
H
Hongze Cheng 已提交
1014

H
Haojun Liao 已提交
1015 1016
/**
 * Allocate the zero-initialized per-table arrays of the block order supporter.
 *
 * @param pSup         supporter to initialize
 * @param numOfTables  number of slots to allocate, must be >= 1
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY (everything allocated so far is released)
 */
static int32_t initBlockOrderSupporter(SBlockOrderSupporter* pSup, int32_t numOfTables) {
  ASSERT(numOfTables >= 1);

  pSup->numOfBlocksPerTable = taosMemoryCalloc(1, sizeof(int32_t) * numOfTables);
  pSup->indexPerTable = taosMemoryCalloc(1, sizeof(int32_t) * numOfTables);
  pSup->pDataBlockInfo = taosMemoryCalloc(1, POINTER_BYTES * numOfTables);

  bool allAllocated =
      (pSup->numOfBlocksPerTable != NULL) && (pSup->indexPerTable != NULL) && (pSup->pDataBlockInfo != NULL);
  if (allAllocated) {
    return TSDB_CODE_SUCCESS;
  }

  cleanupBlockOrderSupporter(pSup);
  return TSDB_CODE_OUT_OF_MEMORY;
}
H
Hongze Cheng 已提交
1029

H
Haojun Liao 已提交
1030
static int32_t fileDataBlockOrderCompar(const void* pLeft, const void* pRight, void* param) {
1031
  int32_t leftIndex = *(int32_t*)pLeft;
H
Haojun Liao 已提交
1032
  int32_t rightIndex = *(int32_t*)pRight;
H
Hongze Cheng 已提交
1033

H
Haojun Liao 已提交
1034
  SBlockOrderSupporter* pSupporter = (SBlockOrderSupporter*)param;
H
Hongze Cheng 已提交
1035

H
Haojun Liao 已提交
1036 1037
  int32_t leftTableBlockIndex = pSupporter->indexPerTable[leftIndex];
  int32_t rightTableBlockIndex = pSupporter->indexPerTable[rightIndex];
H
Hongze Cheng 已提交
1038

H
Haojun Liao 已提交
1039 1040 1041 1042 1043 1044 1045
  if (leftTableBlockIndex > pSupporter->numOfBlocksPerTable[leftIndex]) {
    /* left block is empty */
    return 1;
  } else if (rightTableBlockIndex > pSupporter->numOfBlocksPerTable[rightIndex]) {
    /* right block is empty */
    return -1;
  }
H
Hongze Cheng 已提交
1046

1047
  SBlockOrderWrapper* pLeftBlock = &pSupporter->pDataBlockInfo[leftIndex][leftTableBlockIndex];
H
Haojun Liao 已提交
1048
  SBlockOrderWrapper* pRightBlock = &pSupporter->pDataBlockInfo[rightIndex][rightTableBlockIndex];
H
Hongze Cheng 已提交
1049

1050 1051 1052
  return pLeftBlock->offset > pRightBlock->offset ? 1 : -1;
}

H
Haojun Liao 已提交
1053
/**
 * Decode the block descriptor of the block the iterator currently points at into pBlockIter->block.
 *
 * @return TSDB_CODE_SUCCESS (also when the iterator has no current block info), or
 *         TSDB_CODE_INVALID_PARA when the table of the block is not found in the table map
 */
static int32_t doSetCurrentBlock(SDataBlockIter* pBlockIter, const char* idStr) {
  SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(pBlockIter);
  if (pBlockInfo == NULL) {
    return TSDB_CODE_SUCCESS;
  }

  STableBlockScanInfo* pScanInfo = taosHashGet(pBlockIter->pTableMap, &pBlockInfo->uid, sizeof(pBlockInfo->uid));
  if (pScanInfo == NULL) {
    tsdbError("failed to locate the uid:%" PRIu64 " in query table uid list, %s", pBlockInfo->uid, idStr);
    return TSDB_CODE_INVALID_PARA;
  }

  // the block list stores the position of the block inside the (compressed) map data
  int32_t* mapDataIndex = taosArrayGet(pScanInfo->pBlockList, pBlockInfo->tbBlockIdx);
  tMapDataGetItemByIdx(&pScanInfo->mapData, *mapDataIndex, &pBlockIter->block, tGetDataBlk);

#if 0
  qDebug("check file block, table uid:%"PRIu64" index:%d offset:%"PRId64", ", pScanInfo->uid, *mapDataIndex, pBlockIter->block.aSubBlock[0].offset);
#endif

  return TSDB_CODE_SUCCESS;
}
H
Hongze Cheng 已提交
1072

1073
/**
 * Build the list of file blocks to visit, ordered by the file offset of each block.
 *
 * The blocks of every table in pReader->status.pTableMap are collected. When only one table has blocks
 * they are taken in their existing order; otherwise a multiway merge on the block offsets produces the
 * access order. The iterator is positioned on the first block in scan order afterwards.
 *
 * @param pReader      the tsdb reader
 * @param pBlockIter   iterator to (re)initialize
 * @param numOfBlocks  total number of blocks, must equal the number of blocks found in the table map
 * @return TSDB_CODE_SUCCESS, or an out-of-memory error code
 */
static int32_t initBlockIterator(STsdbReader* pReader, SDataBlockIter* pBlockIter, int32_t numOfBlocks) {
  bool asc = ASCENDING_TRAVERSE(pReader->order);

  pBlockIter->numOfBlocks = numOfBlocks;
  taosArrayClear(pBlockIter->blockList);
  pBlockIter->pTableMap = pReader->status.pTableMap;

  // access data blocks according to the offset of each block in asc/desc order.
  int32_t numOfTables = (int32_t)taosHashGetSize(pReader->status.pTableMap);

  int64_t st = taosGetTimestampUs();

  SBlockOrderSupporter sup = {0};
  int32_t              code = initBlockOrderSupporter(&sup, numOfTables);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  // collect (uid, offset) of every block, one wrapper array per table that owns blocks
  int32_t cnt = 0;
  void*   ptr = NULL;
  while (1) {
    ptr = taosHashIterate(pReader->status.pTableMap, ptr);
    if (ptr == NULL) {
      break;
    }

    STableBlockScanInfo* pTableScanInfo = (STableBlockScanInfo*)ptr;
    if (pTableScanInfo->pBlockList == NULL || taosArrayGetSize(pTableScanInfo->pBlockList) == 0) {
      continue;
    }

    size_t num = taosArrayGetSize(pTableScanInfo->pBlockList);
    sup.numOfBlocksPerTable[sup.numOfTables] = num;

    char* buf = taosMemoryMalloc(sizeof(SBlockOrderWrapper) * num);
    if (buf == NULL) {
      cleanupBlockOrderSupporter(&sup);
      return TSDB_CODE_TDB_OUT_OF_MEMORY;
    }

    sup.pDataBlockInfo[sup.numOfTables] = (SBlockOrderWrapper*)buf;
    SDataBlk block = {0};
    for (int32_t k = 0; k < num; ++k) {
      SBlockOrderWrapper wrapper = {0};

      int32_t* mapDataIndex = taosArrayGet(pTableScanInfo->pBlockList, k);
      tMapDataGetItemByIdx(&pTableScanInfo->mapData, *mapDataIndex, &block, tGetDataBlk);

      wrapper.uid = pTableScanInfo->uid;
      wrapper.offset = block.aSubBlock[0].offset;

      sup.pDataBlockInfo[sup.numOfTables][k] = wrapper;
      cnt++;
    }

    sup.numOfTables += 1;
  }

  ASSERT(numOfBlocks == cnt);

  // since there is only one table qualified, blocks are not sorted
  if (sup.numOfTables == 1) {
    for (int32_t i = 0; i < numOfBlocks; ++i) {
      SFileDataBlockInfo blockInfo = {.uid = sup.pDataBlockInfo[0][i].uid, .tbBlockIdx = i};
      taosArrayPush(pBlockIter->blockList, &blockInfo);
    }

    int64_t et = taosGetTimestampUs();
    tsdbDebug("%p create blocks info struct completed for one table, %d blocks not sorted, elapsed time:%.2f ms %s",
              pReader, numOfBlocks, (et - st) / 1000.0, pReader->idStr);

    pBlockIter->index = asc ? 0 : (numOfBlocks - 1);
    cleanupBlockOrderSupporter(&sup);
    doSetCurrentBlock(pBlockIter, pReader->idStr);
    return TSDB_CODE_SUCCESS;
  }

  tsdbDebug("%p create data blocks info struct completed, %d blocks in %d tables %s", pReader, cnt, sup.numOfTables,
            pReader->idStr);

  ASSERT(cnt <= numOfBlocks && sup.numOfTables <= numOfTables);

  // Keep the full-width status code: squeezing it into a uint8_t drops the upper bytes, and an error
  // code whose lowest byte is zero would then be mistaken for success.
  SMultiwayMergeTreeInfo* pTree = NULL;
  int32_t                 ret = tMergeTreeCreate(&pTree, sup.numOfTables, &sup, fileDataBlockOrderCompar);
  if (ret != TSDB_CODE_SUCCESS) {
    cleanupBlockOrderSupporter(&sup);
    return TSDB_CODE_TDB_OUT_OF_MEMORY;
  }

  // repeatedly take the table whose current block has the smallest offset
  int32_t numOfTotal = 0;
  while (numOfTotal < cnt) {
    int32_t pos = tMergeTreeGetChosenIndex(pTree);
    int32_t index = sup.indexPerTable[pos]++;

    SFileDataBlockInfo blockInfo = {.uid = sup.pDataBlockInfo[pos][index].uid, .tbBlockIdx = index};
    taosArrayPush(pBlockIter->blockList, &blockInfo);

    // set data block index overflow, in order to disable the offset comparator
    if (sup.indexPerTable[pos] >= sup.numOfBlocksPerTable[pos]) {
      sup.indexPerTable[pos] = sup.numOfBlocksPerTable[pos] + 1;
    }

    numOfTotal += 1;
    tMergeTreeAdjust(pTree, tMergeTreeGetAdjustIndex(pTree));
  }

  int64_t et = taosGetTimestampUs();
  tsdbDebug("%p %d data blocks access order completed, elapsed time:%.2f ms %s", pReader, numOfBlocks,
            (et - st) / 1000.0, pReader->idStr);
  cleanupBlockOrderSupporter(&sup);
  taosMemoryFree(pTree);

  pBlockIter->index = asc ? 0 : (numOfBlocks - 1);
  doSetCurrentBlock(pBlockIter, pReader->idStr);

  return TSDB_CODE_SUCCESS;
}
H
Hongze Cheng 已提交
1190

H
Haojun Liao 已提交
1191
// Move the iterator one block forward in scan order and decode that block.
// Returns false, leaving the iterator untouched, when it already stands on the last block.
static bool blockIteratorNext(SDataBlockIter* pBlockIter, const char* idStr) {
  bool asc = ASCENDING_TRAVERSE(pBlockIter->order);

  bool onLastBlock = asc ? (pBlockIter->index >= pBlockIter->numOfBlocks - 1) : (pBlockIter->index <= 0);
  if (onLastBlock) {
    return false;
  }

  if (asc) {
    pBlockIter->index += 1;
  } else {
    pBlockIter->index += -1;
  }

  doSetCurrentBlock(pBlockIter, idStr);
  return true;
}

1205 1206 1207
/**
 * This is an two rectangles overlap cases.
 */
H
Hongze Cheng 已提交
1208
static int32_t dataBlockPartiallyRequired(STimeWindow* pWindow, SVersionRange* pVerRange, SDataBlk* pBlock) {
1209 1210
  return (pWindow->ekey < pBlock->maxKey.ts && pWindow->ekey >= pBlock->minKey.ts) ||
         (pWindow->skey > pBlock->minKey.ts && pWindow->skey <= pBlock->maxKey.ts) ||
H
Hongze Cheng 已提交
1211 1212
         (pVerRange->minVer > pBlock->minVer && pVerRange->minVer <= pBlock->maxVer) ||
         (pVerRange->maxVer < pBlock->maxVer && pVerRange->maxVer >= pBlock->minVer);
H
Haojun Liao 已提交
1213
}
H
Hongze Cheng 已提交
1214

H
Haojun Liao 已提交
1215
/**
 * Fetch the descriptor of the next block (in scan order) that belongs to the same table.
 *
 * @param pBlockInfo           the current block
 * @param pTableBlockScanInfo  scan info of the table, owner of the block list and the map data
 * @param nextIndex            [out] index of the neighbor in the block list of the table
 * @param order                scan order
 *
 * @return a heap allocated descriptor that the caller releases with taosMemoryFree(), or NULL when the
 *         current block is the last one in scan order or the allocation failed
 */
static SDataBlk* getNeighborBlockOfSameTable(SFileDataBlockInfo* pBlockInfo, STableBlockScanInfo* pTableBlockScanInfo,
                                             int32_t* nextIndex, int32_t order) {
  bool asc = ASCENDING_TRAVERSE(order);
  if (asc && pBlockInfo->tbBlockIdx >= taosArrayGetSize(pTableBlockScanInfo->pBlockList) - 1) {
    return NULL;
  }

  if (!asc && pBlockInfo->tbBlockIdx == 0) {
    return NULL;
  }

  int32_t step = asc ? 1 : -1;
  *nextIndex = pBlockInfo->tbBlockIdx + step;

  SDataBlk* pBlock = taosMemoryCalloc(1, sizeof(SDataBlk));
  if (pBlock == NULL) {
    // out of memory: report "no neighbor" instead of decoding into a NULL pointer
    return NULL;
  }

  int32_t* indexInMapdata = taosArrayGet(pTableBlockScanInfo->pBlockList, *nextIndex);

  tMapDataGetItemByIdx(&pTableBlockScanInfo->mapData, *indexInMapdata, pBlock, tGetDataBlk);
  return pBlock;
}

// Starting from the current position of the iterator and moving in scan order, find the position of the
// block that has the same uid and the same per-table block index as pFBlockInfo.
// The block is expected to exist: reaching the end of the list trips the assertion and yields -1.
static int32_t findFileBlockInfoIndex(SDataBlockIter* pBlockIter, SFileDataBlockInfo* pFBlockInfo) {
  ASSERT(pBlockIter != NULL && pFBlockInfo != NULL);

  int32_t step = ASCENDING_TRAVERSE(pBlockIter->order) ? 1 : -1;

  for (int32_t cur = pBlockIter->index; cur >= 0 && cur < pBlockIter->numOfBlocks; cur += step) {
    SFileDataBlockInfo* pCandidate = taosArrayGet(pBlockIter->blockList, cur);

    bool sameTable = (pCandidate->uid == pFBlockInfo->uid);
    if (sameTable && pCandidate->tbBlockIdx == pFBlockInfo->tbBlockIdx) {
      return cur;
    }
  }

  ASSERT(0);
  return -1;
}

1255
/**
 * Advance the iterator by `step` and make the block stored at `index` the current block. When the block
 * is not already at the new iterator position it is moved there inside the block list.
 *
 * @return -1 when `index` is out of range, TSDB_CODE_SUCCESS otherwise
 */
static int32_t setFileBlockActiveInBlockIter(SDataBlockIter* pBlockIter, int32_t index, int32_t step) {
  bool inRange = (index >= 0) && (index < pBlockIter->numOfBlocks);
  if (!inRange) {
    return -1;
  }

  // keep a copy: the slot is removed from the list below
  SFileDataBlockInfo* pSrc = taosArrayGet(pBlockIter->blockList, index);
  SFileDataBlockInfo  moved = *pSrc;

  pBlockIter->index += step;
  int32_t target = pBlockIter->index;

  if (target != index) {
    taosArrayRemove(pBlockIter->blockList, index);
    taosArrayInsert(pBlockIter->blockList, target, &moved);

    SFileDataBlockInfo* pBlockInfo = taosArrayGet(pBlockIter->blockList, pBlockIter->index);
    ASSERT(pBlockInfo->uid == moved.uid && pBlockInfo->tbBlockIdx == moved.tbBlockIdx);
  }

  doSetCurrentBlock(pBlockIter, "");
  return TSDB_CODE_SUCCESS;
}

H
Hongze Cheng 已提交
1275
// Two neighboring blocks are reported as overlapping when they share the border timestamp: the max key
// of the current block equals the min key of the neighbor for an ascending scan, and the other way
// round for a descending scan.
static bool overlapWithNeighborBlock(SDataBlk* pBlock, SDataBlk* pNeighbor, int32_t order) {
  if (!ASCENDING_TRAVERSE(order)) {
    return pBlock->minKey.ts == pNeighbor->maxKey.ts;
  }

  return pBlock->maxKey.ts == pNeighbor->minKey.ts;
}
H
Hongze Cheng 已提交
1283

H
Hongze Cheng 已提交
1284
// Check whether the key coming from the buffer is located before the file block in scan order:
// not after the min key for an ascending scan, not before the max key for a descending scan.
// A key whose timestamp is TSKEY_INITIAL_VAL never qualifies.
static bool bufferDataInFileBlockGap(int32_t order, TSDBKEY key, SDataBlk* pBlock) {
  if (key.ts == TSKEY_INITIAL_VAL) {
    return false;
  }

  if (ASCENDING_TRAVERSE(order)) {
    return key.ts <= pBlock->minKey.ts;
  }

  return key.ts >= pBlock->maxKey.ts;
}
H
Hongze Cheng 已提交
1290

H
Hongze Cheng 已提交
1291
// The key overlaps with the file block when its timestamp lies inside the key range of the block and
// the version range of the block intersects the required version range.
static bool keyOverlapFileBlock(TSDBKEY key, SDataBlk* pBlock, SVersionRange* pVerRange) {
  bool tsInside = (key.ts >= pBlock->minKey.ts) && (key.ts <= pBlock->maxKey.ts);
  if (!tsInside) {
    return false;
  }

  if (pBlock->maxVer < pVerRange->minVer) {
    return false;
  }

  return pBlock->minVer <= pVerRange->maxVer;
}

H
Hongze Cheng 已提交
1296
// Walk the delete skyline forward from startIndex and report whether a delete point whose version is
// not older than the oldest version of the block covers a part of the key range of the block.
static bool doCheckDatablockOverlapFrom(STableBlockScanInfo* pBlockScanInfo, const SDataBlk* pBlock,
                                        int32_t startIndex) {
  size_t num = taosArrayGetSize(pBlockScanInfo->delSkyline);

  for (int32_t i = startIndex; i < num; i += 1) {
    TSDBKEY* p = taosArrayGet(pBlockScanInfo->delSkyline, i);
    if (p->ts >= pBlock->minKey.ts && p->ts <= pBlock->maxKey.ts) {
      // the point is inside the block
      if (p->version >= pBlock->minVer) {
        return true;
      }
    } else if (p->ts < pBlock->minKey.ts) {  // p->ts < pBlock->minKey.ts
      // the point is in front of the block: it matters only if its segment reaches into the block
      if (p->version >= pBlock->minVer) {
        if (i < num - 1) {
          TSDBKEY* pnext = taosArrayGet(pBlockScanInfo->delSkyline, i + 1);
          if (pnext->ts >= pBlock->minKey.ts) {
            return true;
          }
        } else {  // it must be the last point
          ASSERT(p->version == 0);
        }
      }
    } else {  // (p->ts > pBlock->maxKey.ts) {
      return false;
    }
  }

  return false;
}

// Same check as above, starting from the file delete index recorded for the table.
static bool doCheckforDatablockOverlap(STableBlockScanInfo* pBlockScanInfo, const SDataBlk* pBlock) {
  return doCheckDatablockOverlapFrom(pBlockScanInfo, pBlock, pBlockScanInfo->fileDelIndex);
}

/**
 * Check whether the delete skyline of the table overlaps with the given file block.
 *
 * @param pBlockScanInfo  scan info of the table, owner of the skyline and of the file delete index
 * @param pBlock          the file block to check
 * @param order           scan order
 * @return true when a delete point affects the block, false otherwise (also without any skyline)
 */
static bool overlapWithDelSkyline(STableBlockScanInfo* pBlockScanInfo, const SDataBlk* pBlock, int32_t order) {
  // no delete info at all; an empty list must not be dereferenced below
  if (pBlockScanInfo->delSkyline == NULL || taosArrayGetSize(pBlockScanInfo->delSkyline) == 0) {
    return false;
  }

  // ts is not overlap
  TSDBKEY* pFirst = taosArrayGet(pBlockScanInfo->delSkyline, 0);
  TSDBKEY* pLast = taosArrayGetLast(pBlockScanInfo->delSkyline);
  if (pBlock->minKey.ts > pLast->ts || pBlock->maxKey.ts < pFirst->ts) {
    return false;
  }

  // version is not overlap
  if (ASCENDING_TRAVERSE(order)) {
    return doCheckforDatablockOverlap(pBlockScanInfo, pBlock);
  }

  // Descending scan: step back to the first point that is not larger than the min key of the block.
  int32_t index = pBlockScanInfo->fileDelIndex;
  while (1) {
    TSDBKEY* p = taosArrayGet(pBlockScanInfo->delSkyline, index);
    if (p->ts > pBlock->minKey.ts && index > 0) {
      index -= 1;
    } else {  // find the first point that is smaller than the minKey.ts of dataBlock.
      break;
    }
  }

  // start the forward check from the point located above; the result of the search used to be dropped
  return doCheckDatablockOverlapFrom(pBlockScanInfo, pBlock, index);
}

H
Haojun Liao 已提交
1354 1355 1356 1357 1358 1359 1360 1361 1362 1363 1364 1365 1366
// Set of flags describing one file block; filled by getBlockToLoadInfo() and evaluated by
// fileBlockShouldLoad() and isCleanFileDataBlock().
typedef struct {
  bool overlapWithNeighborBlock;  // shares the border timestamp with the next block of the same table
  bool hasDupTs;                  // the block may hold duplicated timestamps
  bool overlapWithDelInfo;        // the delete skyline of the table overlaps with the block
  bool overlapWithLastBlock;      // the current key of the last block reader lies inside the block
  bool overlapWithKeyInBuf;       // the key from the buffer overlaps with the block
  bool partiallyRequired;         // a border of the time window/version range cuts through the block
  bool moreThanCapcity;           // the block has more rows than the capacity of the reader
} SDataBlockToLoadInfo;

/**
 * Evaluate all the properties of a file block that decide whether its data has to be loaded.
 *
 * Only the flags whose precondition holds are written (neighbor overlap needs a neighbor block, last
 * block overlap needs data in the last block reader), so pInfo is expected to be zeroed by the caller.
 */
static void getBlockToLoadInfo(SDataBlockToLoadInfo* pInfo, SFileDataBlockInfo* pBlockInfo, SDataBlk* pBlock,
                               STableBlockScanInfo* pScanInfo, TSDBKEY keyInBuf, SLastBlockReader* pLastBlockReader,
                               STsdbReader* pReader) {
  // overlap with neighbor
  int32_t   nextIdx = 0;
  SDataBlk* pNext = getNeighborBlockOfSameTable(pBlockInfo, pScanInfo, &nextIdx, pReader->order);
  if (pNext != NULL) {
    pInfo->overlapWithNeighborBlock = overlapWithNeighborBlock(pBlock, pNext, pReader->order);
    taosMemoryFree(pNext);
  }

  // has duplicated ts of different version in this block
  if (pBlock->nSubBlock == 1) {
    pInfo->hasDupTs = pBlock->hasDup;
  } else {
    pInfo->hasDupTs = true;
  }

  pInfo->overlapWithDelInfo = overlapWithDelSkyline(pScanInfo, pBlock, pReader->order);

  // the current key of the last block falls into the key range of this block
  if (hasDataInLastBlock(pLastBlockReader)) {
    int64_t tsLast = getCurrentKeyInLastBlock(pLastBlockReader);
    pInfo->overlapWithLastBlock = (pBlock->maxKey.ts >= tsLast) && (pBlock->minKey.ts <= tsLast);
  }

  pInfo->moreThanCapcity = pBlock->nRow > pReader->capacity;
  pInfo->partiallyRequired = dataBlockPartiallyRequired(&pReader->window, &pReader->verRange, pBlock);
  pInfo->overlapWithKeyInBuf = keyOverlapFileBlock(keyInBuf, pBlock, &pReader->verRange);
}
1389

H
Haojun Liao 已提交
1390 1391 1392 1393 1394 1395 1396 1397 1398 1399 1400 1401 1402 1403
// The data of a file block has to be loaded as soon as one of the following holds:
// 1. the block shares the border timestamp with the next block of the same table
// 2. the block may contain duplicated timestamps
// 3. the time window or the version range covers the block only partially
// 4. the key from the buffer overlaps with the block
// 5. the output buffer is not large enough to hold all rows of the block
// 6. the delete info overlaps with the data of the block
// 7. the current key of the last block falls into the block
static bool fileBlockShouldLoad(STsdbReader* pReader, SFileDataBlockInfo* pBlockInfo, SDataBlk* pBlock,
                                STableBlockScanInfo* pScanInfo, TSDBKEY keyInBuf, SLastBlockReader* pLastBlockReader) {
  SDataBlockToLoadInfo info = {0};
  getBlockToLoadInfo(&info, pBlockInfo, pBlock, pScanInfo, keyInBuf, pLastBlockReader, pReader);

  bool dirty = info.overlapWithNeighborBlock || info.hasDupTs || info.overlapWithKeyInBuf ||
               info.overlapWithDelInfo || info.overlapWithLastBlock;
  bool needLoad = dirty || info.partiallyRequired || info.moreThanCapcity;

  if (!needLoad) {
    return false;
  }

  // log the reason why load the datablock for profile
  tsdbDebug("%p uid:%" PRIu64
            " need to load the datablock, overlapwithneighborblock:%d, hasDup:%d, partiallyRequired:%d, "
            "overlapWithKey:%d, greaterThanBuf:%d, overlapWithDel:%d, overlapWithlastBlock:%d, %s",
            pReader, pBlockInfo->uid, info.overlapWithNeighborBlock, info.hasDupTs, info.partiallyRequired,
            info.overlapWithKeyInBuf, info.moreThanCapcity, info.overlapWithDelInfo, info.overlapWithLastBlock,
            pReader->idStr);

  return true;
}

H
Haojun Liao 已提交
1418 1419 1420 1421 1422 1423 1424 1425 1426
// A file block is clean when it neither overlaps with its neighbor block, the key in the buffer, the
// delete info or the last block, nor may contain duplicated timestamps.
static bool isCleanFileDataBlock(STsdbReader* pReader, SFileDataBlockInfo* pBlockInfo, SDataBlk* pBlock,
                                 STableBlockScanInfo* pScanInfo, TSDBKEY keyInBuf, SLastBlockReader* pLastBlockReader) {
  SDataBlockToLoadInfo info = {0};
  getBlockToLoadInfo(&info, pBlockInfo, pBlock, pScanInfo, keyInBuf, pLastBlockReader, pReader);

  return !info.overlapWithNeighborBlock && !info.hasDupTs && !info.overlapWithKeyInBuf &&
         !info.overlapWithDelInfo && !info.overlapWithLastBlock;
}

1427
/**
 * Build the result block from the rows kept in the two in-memory iterators (iter/iiter) of the table.
 *
 * Nothing is done when neither iterator has a value. Otherwise the rows up to endKey are composed into
 * pReader->pResBlock, the block is tagged with the table uid and marked as a composed block.
 *
 * @return TSDB_CODE_SUCCESS when there is nothing to read, otherwise the code of
 *         buildDataBlockFromBufImpl()
 */
static int32_t buildDataBlockFromBuf(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo, int64_t endKey) {
  bool hasBufData = pBlockScanInfo->iiter.hasVal || pBlockScanInfo->iter.hasVal;
  if (!hasBufData) {
    return TSDB_CODE_SUCCESS;
  }

  SSDataBlock* pBlock = pReader->pResBlock;

  int64_t startUs = taosGetTimestampUs();
  int32_t code = buildDataBlockFromBufImpl(pBlockScanInfo, endKey, pReader->capacity, pReader);

  // finalize the meta data of the result block
  blockDataUpdateTsWindow(pBlock, 0);
  pBlock->info.uid = pBlockScanInfo->uid;

  setComposedBlockFlag(pReader, true);

  double elapsedTime = (taosGetTimestampUs() - startUs) / 1000.0;
  tsdbDebug("%p build data block from cache completed, elapsed time:%.2f ms, numOfRows:%d, brange:%" PRId64
            " - %" PRId64 " %s",
            pReader, elapsedTime, pBlock->info.rows, pBlock->info.window.skey, pBlock->info.window.ekey,
            pReader->idStr);

  pReader->cost.buildmemBlock += elapsedTime;
  return code;
}

1452 1453
// Fast path for a file-block row whose timestamp is not repeated by its neighbour in scan direction.
// The row at pDumpInfo->rowIndex is appended to pReader->pResBlock and rowIndex is advanced only when
//   1. the row is not the border row of the block in scan direction, and
//   2. the neighbouring row's timestamp differs from key.
// Returns true when the row was copied, false when the caller has to take the merge path.
static bool tryCopyDistinctRowFromFileBlock(STsdbReader* pReader, SBlockData* pBlockData, int64_t key,
                                            SFileBlockDumpInfo* pDumpInfo) {
  bool asc = (pReader->order == TSDB_ORDER_ASC);
  bool desc = (pReader->order == TSDB_ORDER_DESC);

  bool hasNeighbor = (asc && pDumpInfo->rowIndex < pDumpInfo->totalRows - 1) || (desc && pDumpInfo->rowIndex > 0);
  if (!hasNeighbor) {
    return false;
  }

  int32_t step = asc ? 1 : -1;
  int64_t neighborKey = pBlockData->aTSKEY[pDumpInfo->rowIndex + step];
  if (neighborKey == key) {  // duplicated timestamp, merge is required
    return false;
  }

  doAppendRowFromFileBlock(pReader->pResBlock, pReader, pBlockData, pDumpInfo->rowIndex);
  pDumpInfo->rowIndex += step;
  return true;
}

1472 1473
// Advance the last-block merge tree until it is positioned on a row for which hasBeenDropped() reports
// false. Returns true when such a row is found, false once tMergeTreeNext() reports no more rows.
static bool nextRowFromLastBlocks(SLastBlockReader* pLastBlockReader, STableBlockScanInfo* pBlockScanInfo,
                                  SVersionRange* pVerRange) {
  while (tMergeTreeNext(&pLastBlockReader->mergeTree)) {
    TSDBROW curRow = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
    TSDBKEY curKey = TSDBROW_KEY(&curRow);

    bool dropped = hasBeenDropped(pBlockScanInfo->delSkyline, &pBlockScanInfo->lastBlockDelIndex, &curKey,
                                  pLastBlockReader->order, pVerRange);
    if (!dropped) {
      return true;
    }
  }

  return false;
}

// Fast path for a last-block row (fRow) with timestamp ts. The last block reader is first moved to its
// next valid row; fRow is appended to pReader->pResBlock unless that next row carries the same timestamp.
// Returns true when fRow was copied, false when the next row duplicates ts and a merge is required.
// Note that the reader has been advanced in both cases.
static bool tryCopyDistinctRowFromSttBlock(TSDBROW* fRow, SLastBlockReader* pLastBlockReader,
                                           STableBlockScanInfo* pScanInfo, int64_t ts, STsdbReader* pReader) {
  bool hasNext = nextRowFromLastBlocks(pLastBlockReader, pScanInfo, &pReader->verRange);
  if (hasNext && getCurrentKeyInLastBlock(pLastBlockReader) == ts) {
    return false;
  }

  // either no further row exists, or the next row has a different timestamp
  doAppendRowFromFileBlock(pReader->pResBlock, pReader, fRow->pBlockData, fRow->iRow);
  return true;
}

H
Haojun Liao 已提交
1506 1507 1508
// Return the schema with version sversion for table uid.
// pReader->pSchema is loaded on first use and returned when its version matches; otherwise the schema
// cached in pReader->pMemSchema is used, being (re)loaded through metaGetTbTSchemaEx() when it is missing
// or has another version. On failure terrno is set to the returned code and NULL is returned.
static FORCE_INLINE STSchema* doGetSchemaForTSRow(int32_t sversion, STsdbReader* pReader, uint64_t uid) {
  // always set the newest schema version in pReader->pSchema
  if (pReader->pSchema == NULL) {
    pReader->pSchema = metaGetTbTSchema(pReader->pTsdb->pVnode->pMeta, uid, -1, 1);
  }

  if (pReader->pSchema != NULL && pReader->pSchema->version == sversion) {
    return pReader->pSchema;
  }

  int32_t code = TSDB_CODE_SUCCESS;

  if (pReader->pMemSchema != NULL) {
    if (pReader->pMemSchema->version == sversion) {
      return pReader->pMemSchema;
    }

    // the cached schema has another version: drop it and load the requested one
    taosMemoryFreeClear(pReader->pMemSchema);
    code = metaGetTbTSchemaEx(pReader->pTsdb->pVnode->pMeta, pReader->suid, uid, sversion, &pReader->pMemSchema);
    if (code == TSDB_CODE_SUCCESS && pReader->pMemSchema != NULL) {
      return pReader->pMemSchema;
    }

    terrno = code;
    return NULL;
  }

  // nothing cached yet
  code = metaGetTbTSchemaEx(pReader->pTsdb->pVnode->pMeta, pReader->suid, uid, sversion, &pReader->pMemSchema);
  if (code == TSDB_CODE_SUCCESS) {
    return pReader->pMemSchema;
  }

  terrno = code;
  return NULL;
}

1541
static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo, TSDBROW* pRow,
1542 1543 1544 1545 1546 1547
                                     SIterInfo* pIter, int64_t key, SLastBlockReader* pLastBlockReader) {
  SRowMerger          merge = {0};
  STSRow*             pTSRow = NULL;
  SBlockData*         pBlockData = &pReader->status.fileBlockData;
  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;

1548
  int64_t tsLast = INT64_MIN;
1549
  if (hasDataInLastBlock(pLastBlockReader)) {
1550 1551
    tsLast = getCurrentKeyInLastBlock(pLastBlockReader);
  }
1552

H
Hongze Cheng 已提交
1553 1554
  TSDBKEY k = TSDBROW_KEY(pRow);
  TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
1555

1556 1557
  int64_t minKey = 0;
  if (pReader->order == TSDB_ORDER_ASC) {
H
Hongze Cheng 已提交
1558
    minKey = INT64_MAX;  // chosen the minimum value
1559
    if (minKey > tsLast && hasDataInLastBlock(pLastBlockReader)) {
1560 1561
      minKey = tsLast;
    }
1562

1563 1564 1565
    if (minKey > k.ts) {
      minKey = k.ts;
    }
1566

1567
    if (minKey > key && hasDataInFileBlock(pBlockData, pDumpInfo)) {
1568 1569 1570 1571
      minKey = key;
    }
  } else {
    minKey = INT64_MIN;
1572
    if (minKey < tsLast && hasDataInLastBlock(pLastBlockReader)) {
1573 1574 1575 1576 1577 1578 1579
      minKey = tsLast;
    }

    if (minKey < k.ts) {
      minKey = k.ts;
    }

1580
    if (minKey < key && hasDataInFileBlock(pBlockData, pDumpInfo)) {
1581 1582
      minKey = key;
    }
1583 1584 1585 1586
  }

  bool init = false;

1587
  // ASC: file block ---> last block -----> imem -----> mem
H
Hongze Cheng 已提交
1588
  // DESC: mem -----> imem -----> last block -----> file block
1589 1590
  if (pReader->order == TSDB_ORDER_ASC) {
    if (minKey == key) {
1591
      init = true;
H
Haojun Liao 已提交
1592 1593 1594 1595
      int32_t code = tRowMergerInit(&merge, &fRow, pReader->pSchema);
      if (code != TSDB_CODE_SUCCESS) {
        return code;
      }
1596
      doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
1597 1598
    }

1599
    if (minKey == tsLast) {
1600
      TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
H
Haojun Liao 已提交
1601 1602 1603
      if (init) {
        tRowMerge(&merge, &fRow1);
      } else {
1604
        init = true;
H
Haojun Liao 已提交
1605 1606 1607 1608
        int32_t code = tRowMergerInit(&merge, &fRow1, pReader->pSchema);
        if (code != TSDB_CODE_SUCCESS) {
          return code;
        }
1609
      }
1610
      doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, &merge, &pReader->verRange);
1611
    }
1612

1613
    if (minKey == k.ts) {
H
Haojun Liao 已提交
1614 1615 1616
      if (init) {
        tRowMerge(&merge, pRow);
      } else {
1617 1618
        init = true;
        STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid);
H
Haojun Liao 已提交
1619 1620 1621 1622 1623 1624 1625 1626
        int32_t   code = tRowMergerInit(&merge, pRow, pSchema);
        if (code != TSDB_CODE_SUCCESS) {
          return code;
        }
      }
      int32_t code = doMergeRowsInBuf(pIter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge, pReader);
      if (code != TSDB_CODE_SUCCESS) {
        return code;
1627 1628 1629 1630 1631
      }
    }
  } else {
    if (minKey == k.ts) {
      init = true;
1632
      STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid);
H
Haojun Liao 已提交
1633 1634 1635 1636 1637 1638
      int32_t   code = tRowMergerInit(&merge, pRow, pSchema);
      if (code != TSDB_CODE_SUCCESS) {
        return code;
      }

      code = doMergeRowsInBuf(pIter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge, pReader);
1639
      if (code != TSDB_CODE_SUCCESS || merge.pTSchema == NULL) {
H
Haojun Liao 已提交
1640 1641
        return code;
      }
1642 1643
    }

1644
    if (minKey == tsLast) {
1645
      TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
H
Haojun Liao 已提交
1646 1647 1648
      if (init) {
        tRowMerge(&merge, &fRow1);
      } else {
1649
        init = true;
H
Haojun Liao 已提交
1650 1651 1652 1653
        int32_t code = tRowMergerInit(&merge, &fRow1, pReader->pSchema);
        if (code != TSDB_CODE_SUCCESS) {
          return code;
        }
1654
      }
1655
      doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, &merge, &pReader->verRange);
1656 1657 1658
    }

    if (minKey == key) {
H
Haojun Liao 已提交
1659 1660 1661
      if (init) {
        tRowMerge(&merge, &fRow);
      } else {
1662
        init = true;
H
Haojun Liao 已提交
1663 1664 1665 1666
        int32_t code = tRowMergerInit(&merge, &fRow, pReader->pSchema);
        if (code != TSDB_CODE_SUCCESS) {
          return code;
        }
1667 1668 1669
      }
      doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
    }
1670 1671
  }

1672 1673 1674 1675 1676
  int32_t code = tRowMergerGetRow(&merge, &pTSRow);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

1677 1678 1679 1680 1681 1682 1683
  doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo->uid);

  taosMemoryFree(pTSRow);
  tRowMergerClear(&merge);
  return TSDB_CODE_SUCCESS;
}

1684 1685 1686
// Emit one row into pReader->pResBlock starting from the current row of the last block reader.
//
//   pBlockData      file block data; only dereferenced when mergeBlockData is true (callers pass NULL
//                   together with mergeBlockData == false)
//   mergeBlockData  whether the current file block row takes part in the merge
//
// Two cases are handled:
//   1. the file block does not take part, or its current timestamp differs from the last block's:
//      only last-block rows are involved. A row with a unique timestamp is copied directly, otherwise
//      all last-block rows with that timestamp are merged.
//   2. the file block's current timestamp equals the last block's: the rows of both sources are merged.
//
// Returns TSDB_CODE_SUCCESS or the error code of tRowMergerInit()/tRowMergerGetRow(). The merger is
// cleared on every path that follows a successful tRowMergerInit().
static int32_t doMergeFileBlockAndLastBlock(SLastBlockReader* pLastBlockReader, STsdbReader* pReader,
                                            STableBlockScanInfo* pBlockScanInfo, SBlockData* pBlockData,
                                            bool mergeBlockData) {
  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
  int64_t             tsLastBlock = getCurrentKeyInLastBlock(pLastBlockReader);

  STSRow*    pTSRow = NULL;
  SRowMerger merge = {0};
  TSDBROW    fRow = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
  tsdbTrace("fRow ptr:%p, %d, uid:%" PRIu64 ", %s", fRow.pBlockData, fRow.iRow, pLastBlockReader->uid, pReader->idStr);

  // mergeBlockData is tested first so that a NULL pBlockData is never dereferenced
  bool lastBlockOnly = (!mergeBlockData) || (tsLastBlock != pBlockData->aTSKEY[pDumpInfo->rowIndex]);

  // the row is unique in the last block: it has been copied, and the reader has been advanced
  if (lastBlockOnly && tryCopyDistinctRowFromSttBlock(&fRow, pLastBlockReader, pBlockScanInfo, tsLastBlock, pReader)) {
    return TSDB_CODE_SUCCESS;
  }

  int32_t code = tRowMergerInit(&merge, &fRow, pReader->pSchema);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  if (lastBlockOnly) {
    // tryCopyDistinctRowFromSttBlock() moved the reader to the next row, which has the same timestamp
    TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
    tRowMerge(&merge, &fRow1);
    doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLastBlock, &merge, &pReader->verRange);
  } else {
    // the file block row has the same timestamp as the last block row: merge both sources
    doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLastBlock, &merge, &pReader->verRange);
    ASSERT(mergeBlockData);

    // merge with block data if ts == key
    if (tsLastBlock == pBlockData->aTSKEY[pDumpInfo->rowIndex]) {
      doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
    }
  }

  code = tRowMergerGetRow(&merge, &pTSRow);
  if (code == TSDB_CODE_SUCCESS) {
    doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo->uid);
    taosMemoryFree(pTSRow);
  }

  // release the merger on the error path as well
  tRowMergerClear(&merge);
  return code;
}

1747 1748
// Emit one row into pReader->pResBlock from the file block and/or the last block; this is the path used
// when no in-memory buffer row takes part.
//
//   key  current timestamp of the file block as supplied by the caller
//
// Dispatch:
//   - no data in the file block            -> last block only
//   - no data in the last block            -> file block only
//   - descending scan                      -> doMergeFileBlockAndLastBlock() with the file block included
//   - ascending scan, key <  last block ts -> file block only
//   - ascending scan, key == last block ts -> rows of both sources are merged here
//
// Returns TSDB_CODE_SUCCESS or the first error code reported by a callee. The merger is cleared on
// every path that follows a successful tRowMergerInit().
static int32_t mergeFileBlockAndLastBlock(STsdbReader* pReader, SLastBlockReader* pLastBlockReader, int64_t key,
                                          STableBlockScanInfo* pBlockScanInfo, SBlockData* pBlockData) {
  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;

  // only last block exists
  if (!hasDataInFileBlock(pBlockData, pDumpInfo)) {
    return doMergeFileBlockAndLastBlock(pLastBlockReader, pReader, pBlockScanInfo, NULL, false);
  }

  // no last block available, only data block exists
  if (!hasDataInLastBlock(pLastBlockReader)) {
    return mergeRowsInFileBlocks(pBlockData, pBlockScanInfo, key, pReader);
  }

  // current row of the file block and current timestamp of the last block
  TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
  int64_t ts = getCurrentKeyInLastBlock(pLastBlockReader);
  ASSERT(ts >= key);

  if (!ASCENDING_TRAVERSE(pReader->order)) {  // desc order
    return doMergeFileBlockAndLastBlock(pLastBlockReader, pReader, pBlockScanInfo, pBlockData, true);
  }

  if (key < ts) {  // imem, mem are all empty, file blocks (data blocks and last block) exist
    return mergeRowsInFileBlocks(pBlockData, pBlockScanInfo, key, pReader);
  }

  if (key > ts) {  // excluded by the assertion above
    ASSERT(0);
    return TSDB_CODE_SUCCESS;
  }

  // key == ts: merge the rows of the file block and of the last block
  STSRow*    pTSRow = NULL;
  SRowMerger merge = {0};

  int32_t code = tRowMergerInit(&merge, &fRow, pReader->pSchema);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);

  TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
  tRowMerge(&merge, &fRow1);

  doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, ts, &merge, &pReader->verRange);

  code = tRowMergerGetRow(&merge, &pTSRow);
  if (code == TSDB_CODE_SUCCESS) {
    doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo->uid);
    taosMemoryFree(pTSRow);
  }

  // release the merger on the error path as well
  tRowMergerClear(&merge);
  return code;
}

1803 1804
// Merge the rows that share the next timestamp to be emitted from all four sources -- mem, imem, the
// current file block and the last block -- into a single row and append it to pReader->pResBlock.
// Both buffer iterators are expected to yield a valid row (asserted below).
//
// The chosen key is the smallest candidate for ascending scans and the largest for descending scans;
// the file block and the last block only take part when they hold data.
// Sources are fed to the merger in this order:
//   ASC:  file block -----> last block -----> imem -----> mem
//   DESC: mem -----> imem -----> last block -----> file block
//
// Returns TSDB_CODE_SUCCESS or the first error code reported by a callee. Once the merger has been
// initialized successfully, the error exits of doMergeRowsInBuf()/tRowMergerGetRow() pass through
// tRowMergerClear(), the same release call the success path performs.
// NOTE(review): a NULL schema from doGetSchemaForTSRow() for the imem row, and a merger whose pTSchema
// is NULL, make this function return the current code (TSDB_CODE_SUCCESS) without emitting a row; this
// behavior is kept as is -- confirm that silently skipping the row is intended.
static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo, SBlockData* pBlockData,
                                     SLastBlockReader* pLastBlockReader) {
  SRowMerger          merge = {0};
  STSRow*             pTSRow = NULL;
  int32_t             code = TSDB_CODE_SUCCESS;
  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
  SArray*             pDelList = pBlockScanInfo->delSkyline;

  TSDBROW* pRow = getValidMemRow(&pBlockScanInfo->iter, pDelList, pReader);
  TSDBROW* piRow = getValidMemRow(&pBlockScanInfo->iiter, pDelList, pReader);
  ASSERT(pRow != NULL && piRow != NULL);

  int64_t tsLast = INT64_MIN;
  if (hasDataInLastBlock(pLastBlockReader)) {
    tsLast = getCurrentKeyInLastBlock(pLastBlockReader);
  }

  int64_t key = hasDataInFileBlock(pBlockData, pDumpInfo) ? pBlockData->aTSKEY[pDumpInfo->rowIndex] : INT64_MIN;

  TSDBKEY k = TSDBROW_KEY(pRow);
  TSDBKEY ik = TSDBROW_KEY(piRow);

  int64_t minKey = 0;
  if (ASCENDING_TRAVERSE(pReader->order)) {
    minKey = INT64_MAX;  // let's find the minimum
    if (minKey > k.ts) {
      minKey = k.ts;
    }

    if (minKey > ik.ts) {
      minKey = ik.ts;
    }

    if (minKey > key && hasDataInFileBlock(pBlockData, pDumpInfo)) {
      minKey = key;
    }

    if (minKey > tsLast && hasDataInLastBlock(pLastBlockReader)) {
      minKey = tsLast;
    }
  } else {
    minKey = INT64_MIN;  // let's find the maximum ts value
    if (minKey < k.ts) {
      minKey = k.ts;
    }

    if (minKey < ik.ts) {
      minKey = ik.ts;
    }

    if (minKey < key && hasDataInFileBlock(pBlockData, pDumpInfo)) {
      minKey = key;
    }

    if (minKey < tsLast && hasDataInLastBlock(pLastBlockReader)) {
      minKey = tsLast;
    }
  }

  // becomes true once tRowMergerInit() has succeeded; later sources are added with tRowMerge()
  bool init = false;

  if (ASCENDING_TRAVERSE(pReader->order)) {
    if (minKey == key) {
      TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
      code = tRowMergerInit(&merge, &fRow, pReader->pSchema);
      if (code != TSDB_CODE_SUCCESS) {
        return code;
      }

      init = true;
      doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
    }

    if (minKey == tsLast) {
      TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
      if (init) {
        tRowMerge(&merge, &fRow1);
      } else {
        code = tRowMergerInit(&merge, &fRow1, pReader->pSchema);
        if (code != TSDB_CODE_SUCCESS) {
          return code;
        }

        init = true;
      }

      doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, &merge, &pReader->verRange);
    }

    if (minKey == ik.ts) {
      if (init) {
        tRowMerge(&merge, piRow);
      } else {
        STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(piRow), pReader, pBlockScanInfo->uid);
        if (pSchema == NULL) {
          return code;
        }

        code = tRowMergerInit(&merge, piRow, pSchema);
        if (code != TSDB_CODE_SUCCESS) {
          return code;
        }

        init = true;
      }

      code = doMergeRowsInBuf(&pBlockScanInfo->iiter, pBlockScanInfo->uid, ik.ts, pBlockScanInfo->delSkyline, &merge,
                              pReader);
      if (code != TSDB_CODE_SUCCESS) {
        goto _end;  // the merger is initialized at this point, release it
      }
    }

    if (minKey == k.ts) {
      if (init) {
        if (merge.pTSchema == NULL) {
          return code;
        }

        tRowMerge(&merge, pRow);
      } else {
        STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid);
        code = tRowMergerInit(&merge, pRow, pSchema);
        if (code != TSDB_CODE_SUCCESS) {
          return code;
        }

        init = true;
      }

      code = doMergeRowsInBuf(&pBlockScanInfo->iter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge,
                              pReader);
      if (code != TSDB_CODE_SUCCESS) {
        goto _end;
      }
    }
  } else {
    if (minKey == k.ts) {
      STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid);
      code = tRowMergerInit(&merge, pRow, pSchema);
      if (code != TSDB_CODE_SUCCESS) {
        return code;
      }

      init = true;

      code = doMergeRowsInBuf(&pBlockScanInfo->iter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge,
                              pReader);
      if (code != TSDB_CODE_SUCCESS) {
        goto _end;
      }
    }

    if (minKey == ik.ts) {
      if (init) {
        tRowMerge(&merge, piRow);
      } else {
        STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(piRow), pReader, pBlockScanInfo->uid);
        code = tRowMergerInit(&merge, piRow, pSchema);
        if (code != TSDB_CODE_SUCCESS) {
          return code;
        }

        init = true;
      }

      code = doMergeRowsInBuf(&pBlockScanInfo->iiter, pBlockScanInfo->uid, ik.ts, pBlockScanInfo->delSkyline, &merge,
                              pReader);
      if (code != TSDB_CODE_SUCCESS) {
        goto _end;
      }
    }

    if (minKey == tsLast) {
      TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
      if (init) {
        tRowMerge(&merge, &fRow1);
      } else {
        code = tRowMergerInit(&merge, &fRow1, pReader->pSchema);
        if (code != TSDB_CODE_SUCCESS) {
          return code;
        }

        init = true;
      }

      doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, &merge, &pReader->verRange);
    }

    if (minKey == key) {
      TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
      if (!init) {
        code = tRowMergerInit(&merge, &fRow, pReader->pSchema);
        if (code != TSDB_CODE_SUCCESS) {
          return code;
        }

        init = true;
      } else {
        if (merge.pTSchema == NULL) {
          return code;
        }

        tRowMerge(&merge, &fRow);
      }

      doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
    }
  }

  if (merge.pTSchema == NULL) {
    return code;
  }

  code = tRowMergerGetRow(&merge, &pTSRow);
  if (code != TSDB_CODE_SUCCESS) {
    goto _end;
  }

  doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo->uid);
  taosMemoryFree(pTSRow);

_end:
  tRowMergerClear(&merge);
  return code;
}

2017 2018 2019 2020 2021 2022 2023 2024 2025 2026 2027 2028 2029 2030 2031 2032 2033 2034 2035 2036 2037 2038 2039 2040 2041
// Create the mem and imem iterators of one table (at most once per table, guarded by iterInit) and
// then build its delete skyline through initDelSkylineIterator().
//
// The iterators start at the lower bound of the query window/version range for ascending scans and at
// the upper bound for descending scans. An iterator is only created when the snapshot has the
// corresponding memtable and that memtable has data for the table; iter.hasVal / iiter.hasVal tell
// whether the created iterator is positioned on a row.
//
// Returns TSDB_CODE_SUCCESS, or the code of tsdbTbDataIterCreate() on failure, in which case iterInit
// stays false.
static int32_t initMemDataIterator(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader) {
  if (pBlockScanInfo->iterInit) {
    return TSDB_CODE_SUCCESS;
  }

  int32_t code = TSDB_CODE_SUCCESS;

  TSDBKEY startKey = {0};
  if (ASCENDING_TRAVERSE(pReader->order)) {
    startKey = (TSDBKEY){.ts = pReader->window.skey, .version = pReader->verRange.minVer};
  } else {
    startKey = (TSDBKEY){.ts = pReader->window.ekey, .version = pReader->verRange.maxVer};
  }

  int32_t backward = (!ASCENDING_TRAVERSE(pReader->order));

  // mem buffer
  STbData* d = NULL;
  if (pReader->pReadSnap->pMem != NULL) {
    d = tsdbGetTbDataFromMemTable(pReader->pReadSnap->pMem, pReader->suid, pBlockScanInfo->uid);
    if (d != NULL) {
      code = tsdbTbDataIterCreate(d, &startKey, backward, &pBlockScanInfo->iter.iter);
      if (code == TSDB_CODE_SUCCESS) {
        pBlockScanInfo->iter.hasVal = (tsdbTbDataIterGet(pBlockScanInfo->iter.iter) != NULL);

        tsdbDebug("%p uid:%" PRIu64 ", check data in mem from skey:%" PRId64 ", order:%d, ts range in buf:%" PRId64
                  "-%" PRId64 " %s",
                  pReader, pBlockScanInfo->uid, startKey.ts, pReader->order, d->minKey, d->maxKey, pReader->idStr);
      } else {
        tsdbError("%p uid:%" PRIu64 ", failed to create iterator for mem, code:%s, %s", pReader, pBlockScanInfo->uid,
                  tstrerror(code), pReader->idStr);
        return code;
      }
    }
  } else {
    tsdbDebug("%p uid:%" PRIu64 ", no data in mem, %s", pReader, pBlockScanInfo->uid, pReader->idStr);
  }

  // imem buffer
  STbData* di = NULL;
  if (pReader->pReadSnap->pIMem != NULL) {
    di = tsdbGetTbDataFromMemTable(pReader->pReadSnap->pIMem, pReader->suid, pBlockScanInfo->uid);
    if (di != NULL) {
      code = tsdbTbDataIterCreate(di, &startKey, backward, &pBlockScanInfo->iiter.iter);
      if (code == TSDB_CODE_SUCCESS) {
        pBlockScanInfo->iiter.hasVal = (tsdbTbDataIterGet(pBlockScanInfo->iiter.iter) != NULL);

        tsdbDebug("%p uid:%" PRIu64 ", check data in imem from skey:%" PRId64 ", order:%d, ts range in buf:%" PRId64
                  "-%" PRId64 " %s",
                  pReader, pBlockScanInfo->uid, startKey.ts, pReader->order, di->minKey, di->maxKey, pReader->idStr);
      } else {
        tsdbError("%p uid:%" PRIu64 ", failed to create iterator for imem, code:%s, %s", pReader, pBlockScanInfo->uid,
                  tstrerror(code), pReader->idStr);
        return code;
      }
    }
  } else {
    tsdbDebug("%p uid:%" PRIu64 ", no data in imem, %s", pReader, pBlockScanInfo->uid, pReader->idStr);
  }

  initDelSkylineIterator(pBlockScanInfo, pReader, d, di);

  pBlockScanInfo->iterInit = true;
  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
2081 2082
// Check whether the file-block row at pDumpInfo->rowIndex has to be returned for the table described
// by pBlockScanInfo. The row is rejected when
//   - the block carries a per-row uid array and the row belongs to another table,
//   - its version lies outside pReader->verRange,
//   - its timestamp lies outside pReader->window, or
//   - hasBeenDropped() reports it as deleted (this call may move pBlockScanInfo->fileDelIndex).
static bool isValidFileBlockRow(SBlockData* pBlockData, SFileBlockDumpInfo* pDumpInfo,
                                STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader) {
  // a block with a uid array is a multi-table data block
  if (pBlockData->aUid != NULL) {
    uint64_t rowUid = pBlockData->aUid[pDumpInfo->rowIndex];
    if (rowUid != pBlockScanInfo->uid) {  // row of another table, move to next row
      return false;
    }
  }

  // version range
  int64_t rowVer = pBlockData->aVersion[pDumpInfo->rowIndex];
  bool    verInRange = (rowVer >= pReader->verRange.minVer) && (rowVer <= pReader->verRange.maxVer);
  if (!verInRange) {
    return false;
  }

  // time range
  int64_t rowTs = pBlockData->aTSKEY[pDumpInfo->rowIndex];
  bool    tsInRange = (rowTs >= pReader->window.skey) && (rowTs <= pReader->window.ekey);
  if (!tsInRange) {
    return false;
  }

  // delete info; only consulted for rows that passed all the checks above
  TSDBKEY rowKey = {.ts = rowTs, .version = rowVer};
  return !hasBeenDropped(pBlockScanInfo->delSkyline, &pBlockScanInfo->fileDelIndex, &rowKey, pReader->order,
                         &pReader->verRange);
}

2111
// Bind the last block reader to the table described by pScanInfo and position it on the first valid row.
// Nothing is done (and true is returned) when the reader is already bound to this table. Otherwise a
// merge tree opened for a previous table is closed, the table's in-memory iterators are initialized and
// a new merge tree is opened over the reader's window, narrowed so that it starts right after
// pScanInfo->lastKey in scan direction.
// Returns false when the merge tree cannot be opened or holds no valid row.
// NOTE(review): the return code of initMemDataIterator() is not checked here.
static bool initLastBlockReader(SLastBlockReader* pLBlockReader, STableBlockScanInfo* pScanInfo, STsdbReader* pReader) {
  // the last block reader has been initialized for this table.
  if (pLBlockReader->uid == pScanInfo->uid) {
    return true;
  }

  if (pLBlockReader->uid != 0) {
    tMergeTreeClose(&pLBlockReader->mergeTree);
  }

  initMemDataIterator(pScanInfo, pReader);
  pLBlockReader->uid = pScanInfo->uid;

  // skip everything up to and including lastKey
  STimeWindow win = pLBlockReader->window;
  if (ASCENDING_TRAVERSE(pLBlockReader->order)) {
    win.skey = pScanInfo->lastKey + 1;
  } else {
    win.ekey = pScanInfo->lastKey - 1;
  }

  int32_t code = tMergeTreeOpen(&pLBlockReader->mergeTree, (pLBlockReader->order == TSDB_ORDER_DESC),
                                pReader->pFileReader, pReader->suid, pScanInfo->uid, &win, &pLBlockReader->verRange,
                                pLBlockReader->pInfo, false, pReader->idStr);
  if (code == TSDB_CODE_SUCCESS) {
    return nextRowFromLastBlocks(pLBlockReader, pScanInfo, &pReader->verRange);
  }

  return false;
}

2142
// Timestamp of the row the last block reader's merge tree currently yields.
// Callers in this file check hasDataInLastBlock() before relying on the result.
static int64_t getCurrentKeyInLastBlock(SLastBlockReader* pLastBlockReader) {
  TSDBROW curRow = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
  int64_t curTs = TSDBROW_TS(&curRow);
  return curTs;
}

H
Hongze Cheng 已提交
2147
// The last block reader has data as long as its merge tree has a current iterator.
static bool hasDataInLastBlock(SLastBlockReader* pLastBlockReader) {
  if (pLastBlockReader->mergeTree.pIter == NULL) {
    return false;
  }

  return true;
}
2148 2149 2150 2151

bool hasDataInFileBlock(const SBlockData* pBlockData, const SFileBlockDumpInfo* pDumpInfo) {
  if (pBlockData->nRow > 0) {
    ASSERT(pBlockData->nRow == pDumpInfo->totalRows);
2152 2153
  }

2154
  return pBlockData->nRow > 0 && (!pDumpInfo->allDumped);
2155
}
2156

2157 2158
// Emit one row into pReader->pResBlock from the current file block only.
//
//   key  timestamp of the file block row at the current dump position
//
// A row whose timestamp is not repeated by its neighbour is copied directly by
// tryCopyDistinctRowFromFileBlock(); otherwise the rows are merged through doMergeRowsInFileBlocks()
// and the merged row is appended.
//
// Returns TSDB_CODE_SUCCESS or the error code of tRowMergerInit()/tRowMergerGetRow(). The merger is
// cleared on every path that follows a successful tRowMergerInit().
int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pBlockScanInfo, int64_t key,
                              STsdbReader* pReader) {
  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
  if (tryCopyDistinctRowFromFileBlock(pReader, pBlockData, key, pDumpInfo)) {
    return TSDB_CODE_SUCCESS;
  }

  TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);

  STSRow*    pTSRow = NULL;
  SRowMerger merge = {0};

  int32_t code = tRowMergerInit(&merge, &fRow, pReader->pSchema);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);

  code = tRowMergerGetRow(&merge, &pTSRow);
  if (code == TSDB_CODE_SUCCESS) {
    doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo->uid);
    taosMemoryFree(pTSRow);
  }

  // release the merger on the error path as well
  tRowMergerClear(&merge);
  return code;
}

H
Haojun Liao 已提交
2187 2188
// Emit one row of the composed block, choosing the merge routine by the sources that currently hold data:
//   mem and imem              -> doMergeMultiLevelRows()
//   imem only                 -> doMergeBufAndFileRows() with the imem iterator
//   mem only                  -> doMergeBufAndFileRows() with the mem iterator
//   neither                   -> mergeFileBlockAndLastBlock()
// The file block's current timestamp (INT64_MIN when the block has no pending row) is passed to the
// routines that take a key. Returns the code of the selected routine.
static int32_t buildComposedDataBlockImpl(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo,
                                          SBlockData* pBlockData, SLastBlockReader* pLastBlockReader) {
  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;

  int64_t key = INT64_MIN;
  if (pBlockData->nRow > 0 && (!pDumpInfo->allDumped)) {
    key = pBlockData->aTSKEY[pDumpInfo->rowIndex];
  }

  if (pBlockScanInfo->iter.hasVal && pBlockScanInfo->iiter.hasVal) {
    return doMergeMultiLevelRows(pReader, pBlockScanInfo, pBlockData, pLastBlockReader);
  }

  TSDBROW* pMemRow = NULL;
  TSDBROW* pImemRow = NULL;

  if (pBlockScanInfo->iter.hasVal) {
    pMemRow = getValidMemRow(&pBlockScanInfo->iter, pBlockScanInfo->delSkyline, pReader);
  }

  if (pBlockScanInfo->iiter.hasVal) {
    pImemRow = getValidMemRow(&pBlockScanInfo->iiter, pBlockScanInfo->delSkyline, pReader);
  }

  // NOTE(review): hasVal is tested again after getValidMemRow(); presumably that call may clear the
  // flag when no valid row is left -- confirm against getValidMemRow().

  // imem + file + last block
  if (pBlockScanInfo->iiter.hasVal) {
    return doMergeBufAndFileRows(pReader, pBlockScanInfo, pImemRow, &pBlockScanInfo->iiter, key, pLastBlockReader);
  }

  // mem + file + last block
  if (pBlockScanInfo->iter.hasVal) {
    return doMergeBufAndFileRows(pReader, pBlockScanInfo, pMemRow, &pBlockScanInfo->iter, key, pLastBlockReader);
  }

  // files data blocks + last block
  return mergeFileBlockAndLastBlock(pReader, pLastBlockReader, key, pBlockScanInfo, pBlockData);
}

2219
/**
 * Build a composed result block for the current table by repeatedly merging rows from
 * the loaded file block, the last block and the in-memory buffers.
 *
 * A clean file block is copied into the result block directly. Otherwise rows are merged
 * one at a time until the sources are exhausted, the loaded file block is consumed, the
 * result block reaches pReader->capacity, or a merge step fails.
 *
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_INVALID_PARA when the block's uid is not in the
 *         table map, or the first error reported by buildComposedDataBlockImpl().
 */
static int32_t buildComposedDataBlock(STsdbReader* pReader) {
  int32_t code = TSDB_CODE_SUCCESS;

  SSDataBlock* pResBlock = pReader->pResBlock;

  SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(&pReader->status.blockIter);
  SLastBlockReader*   pLastBlockReader = pReader->status.fileIter.pLastBlockReader;

  int64_t st = taosGetTimestampUs();

  STableBlockScanInfo* pBlockScanInfo = NULL;
  if (pBlockInfo != NULL) {
    pBlockScanInfo = taosHashGet(pReader->status.pTableMap, &pBlockInfo->uid, sizeof(pBlockInfo->uid));
    if (pBlockScanInfo == NULL) {
      code = TSDB_CODE_INVALID_PARA;
      tsdbError("failed to locate the uid:%" PRIu64 " in query table uid list, total tables:%d, %s", pBlockInfo->uid,
                taosHashGetSize(pReader->status.pTableMap), pReader->idStr);
      goto _end;
    }

    SDataBlk* pBlock = getCurrentBlock(&pReader->status.blockIter);
    TSDBKEY   keyInBuf = getCurrentKeyInBuf(pBlockScanInfo, pReader);

    // it is a clean block, load it directly
    if (isCleanFileDataBlock(pReader, pBlockInfo, pBlock, pBlockScanInfo, keyInBuf, pLastBlockReader)) {
      if (pReader->order == TSDB_ORDER_ASC ||
          (pReader->order == TSDB_ORDER_DESC && (!hasDataInLastBlock(pLastBlockReader)))) {
        copyBlockDataToSDataBlock(pReader, pBlockScanInfo);
        goto _end;
      }
    }
  } else {  // file blocks not exist
    pBlockScanInfo = pReader->status.pTableIter;
  }

  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
  SBlockData*         pBlockData = &pReader->status.fileBlockData;
  int32_t             step = ASCENDING_TRAVERSE(pReader->order) ? 1 : -1;

  while (1) {
    bool hasBlockData = false;

    // find the first qualified row in the loaded file data block
    while (pBlockData->nRow > 0) {
      if (isValidFileBlockRow(pBlockData, pDumpInfo, pBlockScanInfo, pReader)) {
        hasBlockData = true;
        break;
      }

      pDumpInfo->rowIndex += step;

      SDataBlk* pBlock = getCurrentBlock(&pReader->status.blockIter);
      if (pDumpInfo->rowIndex >= pBlock->nRow || pDumpInfo->rowIndex < 0) {
        setBlockAllDumped(pDumpInfo, pBlock->maxKey.ts, pReader->order);
        break;
      }
    }

    bool hasBlockLData = hasDataInLastBlock(pLastBlockReader);

    // no data in last block and block, no need to proceed.
    if ((hasBlockData == false) && (hasBlockLData == false)) {
      break;
    }

    // propagate merge failures instead of silently dropping them
    code = buildComposedDataBlockImpl(pReader, pBlockScanInfo, pBlockData, pLastBlockReader);
    if (code != TSDB_CODE_SUCCESS) {
      break;
    }

    // currently loaded file data block is consumed
    if ((pBlockData->nRow > 0) && (pDumpInfo->rowIndex >= pBlockData->nRow || pDumpInfo->rowIndex < 0)) {
      SDataBlk* pBlock = getCurrentBlock(&pReader->status.blockIter);
      setBlockAllDumped(pDumpInfo, pBlock->maxKey.ts, pReader->order);
      break;
    }

    if (pResBlock->info.rows >= pReader->capacity) {
      break;
    }
  }

_end:
  pResBlock->info.uid = (pBlockScanInfo != NULL) ? pBlockScanInfo->uid : 0;
  blockDataUpdateTsWindow(pResBlock, 0);

  setComposedBlockFlag(pReader, true);
  double el = (taosGetTimestampUs() - st) / 1000.0;

  // cost statistics are updated on every exit path, including failures
  pReader->cost.composedBlocks += 1;
  pReader->cost.buildComposedBlockTime += el;

  if (pResBlock->info.rows > 0) {
    tsdbDebug("%p uid:%" PRIu64 ", composed data block created, brange:%" PRIu64 "-%" PRIu64
              " rows:%d, elapsed time:%.2f ms %s",
              pReader, pResBlock->info.uid, pResBlock->info.window.skey, pResBlock->info.window.ekey,
              pResBlock->info.rows, el, pReader->idStr);
  }

  return code;
}

// Store in the reader status whether the current result block is a composed (merged) block.
void setComposedBlockFlag(STsdbReader* pReader, bool composed) {
  pReader->status.composedDataBlock = composed;
}

dengyihao's avatar
dengyihao 已提交
2320 2321
/**
 * Build the delete skyline of one table and reset the delete indexes that walk it.
 *
 * Delete records are gathered from the delete file of the read snapshot (if any) and from
 * the delete lists of the two in-memory table data objects. When at least one record is
 * found, the skyline is built into pBlockScanInfo->delSkyline. Nothing is done when the
 * skyline already exists.
 *
 * @param pBlockScanInfo scan state of the table; receives delSkyline and the delete indexes
 * @param pReader        reader providing the snapshot, suid and scan order
 * @param pMemTbData     mem table data, may be NULL
 * @param piMemTbData    imem table data, may be NULL
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_OUT_OF_MEMORY on allocation failure, or the error
 *         reported by the delete-file reader / skyline builder.
 */
int32_t initDelSkylineIterator(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader, STbData* pMemTbData,
                               STbData* piMemTbData) {
  if (pBlockScanInfo->delSkyline != NULL) {
    return TSDB_CODE_SUCCESS;
  }

  int32_t code = 0;
  STsdb*  pTsdb = pReader->pTsdb;

  SArray* pDelData = taosArrayInit(4, sizeof(SDelData));
  if (pDelData == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  SDelFile* pDelFile = pReader->pReadSnap->fs.pDelFile;
  if (pDelFile) {
    SDelFReader* pDelFReader = NULL;
    code = tsdbDelFReaderOpen(&pDelFReader, pDelFile, pTsdb);
    if (code != TSDB_CODE_SUCCESS) {
      goto _err;
    }

    SArray* aDelIdx = taosArrayInit(4, sizeof(SDelIdx));
    if (aDelIdx == NULL) {
      // report the failure: code is still 0 (success) at this point
      code = TSDB_CODE_OUT_OF_MEMORY;
      tsdbDelFReaderClose(&pDelFReader);
      goto _err;
    }

    code = tsdbReadDelIdx(pDelFReader, aDelIdx);
    if (code != TSDB_CODE_SUCCESS) {
      taosArrayDestroy(aDelIdx);
      tsdbDelFReaderClose(&pDelFReader);
      goto _err;
    }

    SDelIdx  idx = {.suid = pReader->suid, .uid = pBlockScanInfo->uid};
    SDelIdx* pIdx = taosArraySearch(aDelIdx, &idx, tCmprDelIdx, TD_EQ);

    if (pIdx != NULL) {
      code = tsdbReadDelData(pDelFReader, pIdx, pDelData);
    }

    // the index array and the file reader are released before the read result is checked
    taosArrayDestroy(aDelIdx);
    tsdbDelFReaderClose(&pDelFReader);

    if (code != TSDB_CODE_SUCCESS) {
      goto _err;
    }
  }

  // append the delete records kept in the mem and imem table data
  SDelData* p = NULL;
  if (pMemTbData != NULL) {
    p = pMemTbData->pHead;
    while (p) {
      taosArrayPush(pDelData, p);
      p = p->pNext;
    }
  }

  if (piMemTbData != NULL) {
    p = piMemTbData->pHead;
    while (p) {
      taosArrayPush(pDelData, p);
      p = p->pNext;
    }
  }

  if (taosArrayGetSize(pDelData) > 0) {
    pBlockScanInfo->delSkyline = taosArrayInit(4, sizeof(TSDBKEY));
    if (pBlockScanInfo->delSkyline == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto _err;
    }

    code = tsdbBuildDeleteSkyline(pDelData, 0, (int32_t)(taosArrayGetSize(pDelData) - 1), pBlockScanInfo->delSkyline);
  }

  taosArrayDestroy(pDelData);

  // start at the first skyline entry for ascending scans, at the last one otherwise
  pBlockScanInfo->iter.index =
      ASCENDING_TRAVERSE(pReader->order) ? 0 : taosArrayGetSize(pBlockScanInfo->delSkyline) - 1;
  pBlockScanInfo->iiter.index = pBlockScanInfo->iter.index;
  pBlockScanInfo->fileDelIndex = pBlockScanInfo->iter.index;
  pBlockScanInfo->lastBlockDelIndex = pBlockScanInfo->iter.index;
  return code;

_err:
  taosArrayDestroy(pDelData);
  return code;
}

H
Haojun Liao 已提交
2402
/**
 * Return the smaller of the current valid keys of the mem and imem iterators of a table.
 *
 * The result starts as {.ts = TSKEY_INITIAL_VAL}; it is replaced by the mem key when the
 * mem iterator has a valid row, and by the imem key when that key has a smaller timestamp.
 *
 * NOTE(review): when only the imem iterator has a valid row, the imem key is taken only if
 * its timestamp is below TSKEY_INITIAL_VAL - confirm this is the intended behavior.
 */
TSDBKEY getCurrentKeyInBuf(STableBlockScanInfo* pScanInfo, STsdbReader* pReader) {
  TSDBKEY key = {.ts = TSKEY_INITIAL_VAL};

  TSDBROW* pMemRow = getValidMemRow(&pScanInfo->iter, pScanInfo->delSkyline, pReader);
  if (pMemRow != NULL) {
    key = TSDBROW_KEY(pMemRow);
  }

  TSDBROW* pImemRow = getValidMemRow(&pScanInfo->iiter, pScanInfo->delSkyline, pReader);
  if (pImemRow == NULL) {
    return key;
  }

  TSDBKEY imemKey = TSDBROW_KEY(pImemRow);
  if (key.ts > imemKey.ts) {
    return imemKey;
  }

  return key;
}

2420
/**
 * Advance the fileset iterator to the next fileset that contains data for the queried tables.
 *
 * For every fileset the block index is loaded; when it is not empty, or the fileset has
 * stt files, the file blocks are loaded and counted into pBlockNum. The search stops at the
 * first fileset with at least one data block or last file, or when no fileset is left.
 *
 * @param pReader   reader owning the fileset iterator
 * @param pBlockNum output counters, reset to zero on entry
 * @return TSDB_CODE_SUCCESS (also when no more filesets exist), TSDB_CODE_OUT_OF_MEMORY when
 *         the index list cannot be allocated, or the error from loading the index / blocks.
 */
static int32_t moveToNextFile(STsdbReader* pReader, SBlockNumber* pBlockNum) {
  SReaderStatus* pStatus = &pReader->status;
  int32_t        code = TSDB_CODE_SUCCESS;

  pBlockNum->numOfBlocks = 0;
  pBlockNum->numOfLastFiles = 0;

  size_t  numOfTables = taosHashGetSize(pReader->status.pTableMap);
  SArray* pIndexList = taosArrayInit(numOfTables, sizeof(SBlockIdx));
  if (pIndexList == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  // the loop ends when no data files are left on disk
  while (filesetIteratorNext(&pStatus->fileIter, pReader)) {
    taosArrayClear(pIndexList);

    code = doLoadBlockIndex(pReader, pReader->pFileReader, pIndexList);
    if (code != TSDB_CODE_SUCCESS) {
      break;
    }

    if (taosArrayGetSize(pIndexList) > 0 || pReader->pFileReader->pSet->nSttF > 0) {
      code = doLoadFileBlock(pReader, pIndexList, pBlockNum);
      if (code != TSDB_CODE_SUCCESS) {
        break;
      }

      if (pBlockNum->numOfBlocks + pBlockNum->numOfLastFiles > 0) {
        break;
      }
    }

    // no blocks in current file, try next files
  }

  // single release point for the index list on success and failure
  taosArrayDestroy(pIndexList);
  return code;
}

2460
// Three-way comparison of two uint64_t values referenced by p1 and p2: -1, 0 or 1.
static int32_t uidComparFunc(const void* p1, const void* p2) {
  const uint64_t lhs = *(const uint64_t*)p1;
  const uint64_t rhs = *(const uint64_t*)p2;

  if (lhs < rhs) {
    return -1;
  }

  return (lhs > rhs) ? 1 : 0;
}
2469

2470
// Copy the uid of every entry of pStatus->pTableMap into pOrderCheckInfo->tableUidList and
// sort the list in ascending uid order. The list must have room for all map entries.
static void extractOrderedTableUidList(SUidOrderCheckInfo* pOrderCheckInfo, SReaderStatus* pStatus) {
  int32_t numOfTables = taosHashGetSize(pStatus->pTableMap);
  int32_t pos = 0;

  for (void* pIter = taosHashIterate(pStatus->pTableMap, NULL); pIter != NULL;
       pIter = taosHashIterate(pStatus->pTableMap, pIter)) {
    STableBlockScanInfo* pScanInfo = pIter;
    pOrderCheckInfo->tableUidList[pos++] = pScanInfo->uid;
  }

  taosSort(pOrderCheckInfo->tableUidList, numOfTables, sizeof(uint64_t), uidComparFunc);
}

2484
/**
 * Prepare the uid-ordered table list and point pStatus->pTableIter at its first table.
 *
 * - Empty table map: nothing to do.
 * - No list yet: allocate it, fill and sort it, start at index 0.
 * - List exists and pTableIter is NULL: restart at index 0; if the first uid is no longer in
 *   the table map, the list is resized and rebuilt from the map.
 *
 * @return TSDB_CODE_SUCCESS or TSDB_CODE_OUT_OF_MEMORY.
 */
static int32_t initOrderCheckInfo(SUidOrderCheckInfo* pOrderCheckInfo, SReaderStatus* pStatus) {
  int32_t numOfTables = taosHashGetSize(pStatus->pTableMap);
  if (numOfTables == 0) {
    return TSDB_CODE_SUCCESS;
  }

  uint64_t uid = 0;

  if (pOrderCheckInfo->tableUidList == NULL) {
    pOrderCheckInfo->currentIndex = 0;
    pOrderCheckInfo->tableUidList = taosMemoryMalloc(numOfTables * sizeof(uint64_t));
    if (pOrderCheckInfo->tableUidList == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }

    extractOrderedTableUidList(pOrderCheckInfo, pStatus);
    uid = pOrderCheckInfo->tableUidList[0];
    pStatus->pTableIter = taosHashGet(pStatus->pTableMap, &uid, sizeof(uid));
    return TSDB_CODE_SUCCESS;
  }

  // a table is still being iterated, keep the current position
  if (pStatus->pTableIter != NULL) {
    return TSDB_CODE_SUCCESS;
  }

  // it is the last block of a new file: restart from the first table
  pOrderCheckInfo->currentIndex = 0;
  uid = pOrderCheckInfo->tableUidList[pOrderCheckInfo->currentIndex];
  pStatus->pTableIter = taosHashGet(pStatus->pTableMap, &uid, sizeof(uid));
  if (pStatus->pTableIter != NULL) {
    return TSDB_CODE_SUCCESS;
  }

  // the tableMap has already updated: rebuild the ordered uid list
  void* pResized = taosMemoryRealloc(pOrderCheckInfo->tableUidList, numOfTables * sizeof(uint64_t));
  if (pResized == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  pOrderCheckInfo->tableUidList = pResized;
  extractOrderedTableUidList(pOrderCheckInfo, pStatus);

  uid = pOrderCheckInfo->tableUidList[0];
  pStatus->pTableIter = taosHashGet(pStatus->pTableMap, &uid, sizeof(uid));
  return TSDB_CODE_SUCCESS;
}

2525
// Step to the next uid of the ordered list and look its table up in the table map.
// Returns false, with pStatus->pTableIter set to NULL, once the index reaches the map size.
static bool moveToNextTable(SUidOrderCheckInfo* pOrderedCheckInfo, SReaderStatus* pStatus) {
  pOrderedCheckInfo->currentIndex += 1;

  bool exhausted = (pOrderedCheckInfo->currentIndex >= taosHashGetSize(pStatus->pTableMap));
  if (exhausted) {
    pStatus->pTableIter = NULL;
    return false;
  }

  uint64_t nextUid = pOrderedCheckInfo->tableUidList[pOrderedCheckInfo->currentIndex];
  pStatus->pTableIter = taosHashGet(pStatus->pTableMap, &nextUid, sizeof(nextUid));
  ASSERT(pStatus->pTableIter != NULL);
  return true;
}

2538
/**
 * Walk the tables in uid order and build a result block from the last block of the first
 * table that yields rows.
 *
 * @return TSDB_CODE_SUCCESS when rows were produced or every table has been visited (the
 *         caller distinguishes the two through pReader->pResBlock->info.rows), otherwise
 *         the error from initOrderCheckInfo() or doBuildDataBlock().
 */
static int32_t doLoadLastBlockSequentially(STsdbReader* pReader) {
  SReaderStatus*      pStatus = &pReader->status;
  SLastBlockReader*   pLastBlockReader = pStatus->fileIter.pLastBlockReader;
  SUidOrderCheckInfo* pOrderedCheckInfo = &pStatus->uidCheckInfo;

  int32_t code = initOrderCheckInfo(pOrderedCheckInfo, pStatus);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  if (taosHashGetSize(pStatus->pTableMap) == 0) {
    return code;
  }

  do {
    // load the last data block of current table
    STableBlockScanInfo* pScanInfo = pStatus->pTableIter;

    if (initLastBlockReader(pLastBlockReader, pScanInfo, pReader)) {
      code = doBuildDataBlock(pReader);
      if (code != TSDB_CODE_SUCCESS) {
        return code;
      }

      if (pReader->pResBlock->info.rows > 0) {
        return TSDB_CODE_SUCCESS;
      }
    }

    // current table has nothing (more) to offer, let's try next table
  } while (moveToNextTable(pOrderedCheckInfo, pStatus));

  return TSDB_CODE_SUCCESS;
}

2577
/**
 * Produce the next result block for the table at the current iterator position.
 *
 * Cases, checked in this order:
 *  1. no current file block: compose from the last file and the buffers;
 *  2. the file block has to be loaded: load it, then compose;
 *  3. buffered rows lie in the gap before the file block: build from the buffer only;
 *  4. descending scan with data in the last block: compose from the last block first;
 *  5. otherwise the whole file block is returned by metadata only (not composed).
 *
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_INVALID_PARA when no scan info is found, or the
 *         error of the routine that was invoked.
 */
static int32_t doBuildDataBlock(STsdbReader* pReader) {
  SReaderStatus*      pStatus = &pReader->status;
  SDataBlockIter*     pBlockIter = &pStatus->blockIter;
  SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(pBlockIter);
  SLastBlockReader*   pLastBlockReader = pReader->status.fileIter.pLastBlockReader;

  STableBlockScanInfo* pScanInfo = NULL;
  if (pBlockInfo != NULL) {
    pScanInfo = taosHashGet(pReader->status.pTableMap, &pBlockInfo->uid, sizeof(pBlockInfo->uid));
  } else {
    pScanInfo = pReader->status.pTableIter;
  }

  if (pScanInfo == NULL) {
    tsdbError("failed to get table scan-info, %s", pReader->idStr);
    return TSDB_CODE_INVALID_PARA;
  }

  SDataBlk* pBlock = NULL;
  if (pBlockInfo != NULL) {
    pBlock = getCurrentBlock(pBlockIter);
  }

  initLastBlockReader(pLastBlockReader, pScanInfo, pReader);
  TSDBKEY keyInBuf = getCurrentKeyInBuf(pScanInfo, pReader);

  // build data block from last data file
  if (pBlockInfo == NULL) {
    ASSERT(pBlockIter->numOfBlocks == 0);
    return buildComposedDataBlock(pReader);
  }

  if (fileBlockShouldLoad(pReader, pBlockInfo, pBlock, pScanInfo, keyInBuf, pLastBlockReader)) {
    int32_t loadCode = doLoadFileBlockData(pReader, pBlockIter, &pStatus->fileBlockData, pScanInfo->uid);
    if (loadCode != TSDB_CODE_SUCCESS) {
      return loadCode;
    }

    // build composed data block
    return buildComposedDataBlock(pReader);
  }

  if (bufferDataInFileBlockGap(pReader->order, keyInBuf, pBlock)) {
    // data in memory that are earlier than current file block
    // rows in buffer should be less than the file block in asc, greater than file block in desc
    int64_t endKey = (ASCENDING_TRAVERSE(pReader->order)) ? pBlock->minKey.ts : pBlock->maxKey.ts;
    return buildDataBlockFromBuf(pReader, pScanInfo, endKey);
  }

  if (hasDataInLastBlock(pLastBlockReader) && !ASCENDING_TRAVERSE(pReader->order)) {
    // only return the rows in last block
    int64_t tsLast = getCurrentKeyInLastBlock(pLastBlockReader);
    ASSERT(tsLast >= pBlock->maxKey.ts);
    tBlockDataReset(&pReader->status.fileBlockData);

    tsdbDebug("load data in last block firstly, due to desc scan data, %s", pReader->idStr);
    return buildComposedDataBlock(pReader);
  }

  // whole block is required, return it directly
  SDataBlockInfo* pInfo = &pReader->pResBlock->info;
  pInfo->rows = pBlock->nRow;
  pInfo->uid = pScanInfo->uid;
  pInfo->window = (STimeWindow){.skey = pBlock->minKey.ts, .ekey = pBlock->maxKey.ts};
  setComposedBlockFlag(pReader, false);
  setBlockAllDumped(&pStatus->fBlockDumpInfo, pBlock->maxKey.ts, pReader->order);

  return TSDB_CODE_SUCCESS;
}

H
Haojun Liao 已提交
2644
/**
 * Build a result block from the in-memory buffers only, visiting the tables of the table
 * map one after another until one of them yields rows.
 *
 * Iteration resumes from pStatus->pTableIter when it is set, otherwise it starts from the
 * first entry of the table map.
 *
 * @return TSDB_CODE_SUCCESS when rows were produced or all tables are exhausted, otherwise
 *         the error from buildDataBlockFromBuf().
 */
static int32_t buildBlockFromBufferSequentially(STsdbReader* pReader) {
  SReaderStatus* pStatus = &pReader->status;

  if (pStatus->pTableIter == NULL) {
    pStatus->pTableIter = taosHashIterate(pStatus->pTableMap, NULL);
  }

  while (pStatus->pTableIter != NULL) {
    STableBlockScanInfo* pBlockScanInfo = pStatus->pTableIter;
    initMemDataIterator(pBlockScanInfo, pReader);

    // no upper/lower bound from a file block: read the buffer to its end
    int64_t endKey = (ASCENDING_TRAVERSE(pReader->order)) ? INT64_MAX : INT64_MIN;
    int32_t code = buildDataBlockFromBuf(pReader, pBlockScanInfo, endKey);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    if (pReader->pResBlock->info.rows > 0) {
      return TSDB_CODE_SUCCESS;
    }

    // current table is exhausted, let's try the next table
    pStatus->pTableIter = taosHashIterate(pStatus->pTableMap, pStatus->pTableIter);
  }

  return TSDB_CODE_SUCCESS;
}

2676
// set the correct start position in case of the first/last file block, according to the query time window
2677
static void initBlockDumpInfo(STsdbReader* pReader, SDataBlockIter* pBlockIter) {
H
Hongze Cheng 已提交
2678
  SDataBlk* pBlock = getCurrentBlock(pBlockIter);
2679

2680 2681 2682
  SReaderStatus* pStatus = &pReader->status;

  SFileBlockDumpInfo* pDumpInfo = &pStatus->fBlockDumpInfo;
2683 2684 2685

  pDumpInfo->totalRows = pBlock->nRow;
  pDumpInfo->allDumped = false;
2686
  pDumpInfo->rowIndex = ASCENDING_TRAVERSE(pReader->order) ? 0 : pBlock->nRow - 1;
2687 2688
}

2689
/**
 * Move to the next fileset that has data and prepare the block iterator and dump cursor
 * for its first block.
 *
 * When no fileset with data is left, pReader->status.loadFromFile is cleared and success is
 * returned so that the caller can fall back to the in-memory buffers.
 *
 * @return TSDB_CODE_SUCCESS, or the error from moveToNextFile() / initBlockIterator().
 */
static int32_t initForFirstBlockInFile(STsdbReader* pReader, SDataBlockIter* pBlockIter) {
  SBlockNumber num = {0};

  int32_t code = moveToNextFile(pReader, &num);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  // all data files are consumed, try data in buffer
  if (num.numOfBlocks + num.numOfLastFiles == 0) {
    pReader->status.loadFromFile = false;
    return code;
  }

  // initialize the block iterator for a new fileset
  if (num.numOfBlocks > 0) {
    code = initBlockIterator(pReader, pBlockIter, num.numOfBlocks);
    if (code != TSDB_CODE_SUCCESS) {
      // do not touch the dump cursor when the iterator could not be set up
      return code;
    }
  } else {  // no block data, only last block exists
    tBlockDataReset(&pReader->status.fileBlockData);
    resetDataBlockIterator(pBlockIter, pReader->order);
  }

  // set the correct start position according to the query time window
  initBlockDumpInfo(pReader, pBlockIter);
  return code;
}

2716
// True when the current file block is not fully dumped and the dump cursor has already
// left its starting row: past row 0 when ascending, before the last row when descending.
static bool fileBlockPartiallyRead(SFileBlockDumpInfo* pDumpInfo, bool asc) {
  if (pDumpInfo->allDumped) {
    return false;
  }

  if (asc) {
    return pDumpInfo->rowIndex > 0;
  }

  return pDumpInfo->rowIndex < (pDumpInfo->totalRows - 1);
}

2721
/**
 * Produce the next non-empty result block from the data files.
 *
 * Two phases share the `_begin` label:
 *  - last-block phase (`_begin`): entered when the current fileset has no data blocks, or
 *    re-entered from the main loop; tables are read from the last block file and, once all
 *    tables are done, the next fileset is opened.
 *  - data-block phase (main loop): continues a partially read block, or steps to the next
 *    block / fileset and builds a block from it.
 *
 * @return TSDB_CODE_SUCCESS when rows were produced or the files are exhausted (in which
 *         case pReader->status.loadFromFile has been cleared), otherwise the first error.
 */
static int32_t buildBlockFromFiles(STsdbReader* pReader) {
  int32_t         code = TSDB_CODE_SUCCESS;
  bool            asc = ASCENDING_TRAVERSE(pReader->order);
  SReaderStatus*  pStatus = &pReader->status;
  SDataBlockIter* pBlockIter = &pStatus->blockIter;

  if (pBlockIter->numOfBlocks == 0) {
  _begin:
    code = doLoadLastBlockSequentially(pReader);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    if (pReader->pResBlock->info.rows > 0) {
      return TSDB_CODE_SUCCESS;
    }

    // all data blocks are checked in this last block file, now let's try the next file
    if (pStatus->pTableIter == NULL) {
      code = initForFirstBlockInFile(pReader, pBlockIter);

      // error happens or all the data files are completely checked
      if ((code != TSDB_CODE_SUCCESS) || (!pStatus->loadFromFile)) {
        return code;
      }

      // this file does not have data files, let's start check the last block file if exists
      if (pBlockIter->numOfBlocks == 0) {
        goto _begin;
      }
    }

    code = doBuildDataBlock(pReader);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    if (pReader->pResBlock->info.rows > 0) {
      return TSDB_CODE_SUCCESS;
    }
  }

  for (;;) {
    SFileBlockDumpInfo* pDumpInfo = &pStatus->fBlockDumpInfo;

    if (fileBlockPartiallyRead(pDumpInfo, asc)) {
      // file data block is partially loaded
      code = buildComposedDataBlock(pReader);
    } else {
      // current block are exhausted, try the next file block
      if (pDumpInfo->allDumped) {
        if (blockIteratorNext(&pStatus->blockIter, pReader->idStr)) {
          // check for the next block in the block accessed order list
          initBlockDumpInfo(pReader, pBlockIter);
        } else if (pStatus->pCurrentFileset->nSttF > 0) {
          // data blocks in current file are exhausted, let's try the next file now
          tBlockDataReset(&pStatus->fileBlockData);
          resetDataBlockIterator(pBlockIter, pReader->order);
          goto _begin;
        } else {
          code = initForFirstBlockInFile(pReader, pBlockIter);

          // error happens or all the data files are completely checked
          if ((code != TSDB_CODE_SUCCESS) || (!pStatus->loadFromFile)) {
            return code;
          }

          // this file does not have blocks, let's start check the last block file
          if (pBlockIter->numOfBlocks == 0) {
            goto _begin;
          }
        }
      }

      code = doBuildDataBlock(pReader);
    }

    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    if (pReader->pResBlock->info.rows > 0) {
      return TSDB_CODE_SUCCESS;
    }
  }
}
H
refact  
Hongze Cheng 已提交
2809

2810 2811
/**
 * Select the tsdb instance to query.
 *
 * For a non-rsma vnode the plain tsdb is returned and *pLevel is left untouched. For an rsma
 * vnode the retention levels are scanned from level 0 upwards: scanning stops at the first
 * level whose `keep` is not positive (stepping back one level when possible) or whose
 * retention window, extended by tsQueryRsmaTolerance, still covers winSKey.
 *
 * @param pVnode     vnode being queried
 * @param winSKey    start key of the query time window
 * @param retentions retention array indexed by level
 * @param idStr      query id used for logging, may be NULL
 * @param pLevel     receives the selected retention level (rsma vnodes only)
 */
static STsdb* getTsdbByRetentions(SVnode* pVnode, TSKEY winSKey, SRetention* retentions, const char* idStr,
                                  int8_t* pLevel) {
  if (!VND_IS_RSMA(pVnode)) {
    return VND_TSDB(pVnode);
  }

  int8_t  precision = pVnode->config.tsdbCfg.precision;
  int64_t now = taosGetTimestamp(precision);

  // tolerance is scaled to the time unit of the vnode precision
  int64_t offset = tsQueryRsmaTolerance * ((precision == TSDB_TIME_PRECISION_MILLI)   ? 1L
                                           : (precision == TSDB_TIME_PRECISION_MICRO) ? 1000L
                                                                                      : 1000000L);

  int8_t level = 0;
  for (; level < TSDB_RETENTION_MAX; ++level) {
    SRetention* pRetention = &retentions[level];

    if (pRetention->keep <= 0) {
      // this level is not configured, fall back to the previous one if there is any
      if (level > 0) {
        --level;
      }
      break;
    }

    if ((now - pRetention->keep) <= (winSKey + offset)) {
      break;
    }
  }

  const char* str = (idStr != NULL) ? idStr : "";
  STsdb*      pTsdb = NULL;

  if (level == TSDB_RETENTION_L0) {
    *pLevel = TSDB_RETENTION_L0;
    pTsdb = VND_RSMA0(pVnode);
  } else if (level == TSDB_RETENTION_L1) {
    *pLevel = TSDB_RETENTION_L1;
    pTsdb = VND_RSMA1(pVnode);
  } else {
    *pLevel = TSDB_RETENTION_L2;
    pTsdb = VND_RSMA2(pVnode);
  }

  tsdbDebug("vgId:%d, rsma level %d is selected to query %s", TD_VID(pVnode), *pLevel, str);
  return pTsdb;
}

H
Haojun Liao 已提交
2854
/**
 * Compute the version range visible to a query.
 *
 * minVer is the requested start version, or 0 when it is -1. maxVer is the requested end
 * version capped at the vnode's applied version; an end version of -1 (not specified by the
 * user) selects the applied version itself. The `level` argument is not used here.
 */
SVersionRange getQueryVerRange(SVnode* pVnode, SQueryTableDataCond* pCond, int8_t level) {
  int64_t startVer = 0;
  if (pCond->startVersion != -1) {
    startVer = pCond->startVersion;
  }

  // default to the current maximum version of the vnode
  int64_t endVer = pVnode->state.applied;
  if (pCond->endVersion != -1 && !(pCond->endVersion > pVnode->state.applied)) {
    endVer = pCond->endVersion;
  }

  return (SVersionRange){.minVer = startVer, .maxVer = endVer};
}

2868
/**
 * Check whether the row identified by pKey is covered by a delete range of the skyline.
 *
 * pDelList holds TSDBKEY skyline points; consecutive points delimit a range whose delete
 * version is stored on the point that starts the range. *index is a cursor into the list
 * that is advanced (ascending) or moved back (descending) as keys progress, so successive
 * calls are expected to pass keys in scan order.
 *
 * A range drops the row when it contains pKey->ts, its delete version is not older than the
 * row version, and the delete version is visible to the query (<= pVerRange->maxVer). The
 * visibility test is applied to the interior ranges in both scan directions; the ascending
 * branch already did this, the descending branch previously did not.
 *
 * @param pDelList  delete skyline, NULL means nothing is deleted
 * @param index     in/out cursor into pDelList
 * @param pKey      timestamp and version of the row, must not be NULL
 * @param order     scan order
 * @param pVerRange version range of the query
 * @return true when the row is deleted.
 */
bool hasBeenDropped(const SArray* pDelList, int32_t* index, TSDBKEY* pKey, int32_t order, SVersionRange* pVerRange) {
  ASSERT(pKey != NULL);
  if (pDelList == NULL) {
    return false;
  }

  size_t num = taosArrayGetSize(pDelList);
  bool   asc = ASCENDING_TRAVERSE(order);

  if (asc) {
    if (*index >= num - 1) {
      // cursor is on the last skyline point
      TSDBKEY* last = taosArrayGetLast(pDelList);
      ASSERT(pKey->ts >= last->ts);

      if (pKey->ts > last->ts) {
        return false;
      } else if (pKey->ts == last->ts) {
        TSDBKEY* prev = taosArrayGet(pDelList, num - 2);
        return (prev->version >= pKey->version);
      }
    } else {
      TSDBKEY* pCurrent = taosArrayGet(pDelList, *index);
      TSDBKEY* pNext = taosArrayGet(pDelList, (*index) + 1);

      if (pKey->ts < pCurrent->ts) {
        return false;
      }

      if (pCurrent->ts <= pKey->ts && pNext->ts >= pKey->ts && pCurrent->version >= pKey->version &&
          pVerRange->maxVer >= pCurrent->version) {
        return true;
      }

      // advance the cursor until the range that may contain the key is reached
      while (pNext->ts <= pKey->ts && (*index) < num - 1) {
        (*index) += 1;

        if ((*index) < num - 1) {
          pCurrent = taosArrayGet(pDelList, *index);
          pNext = taosArrayGet(pDelList, (*index) + 1);

          // it is not a consecutive deletion range, ignore it
          if (pCurrent->version == 0 && pNext->version > 0) {
            continue;
          }

          if (pCurrent->ts <= pKey->ts && pNext->ts >= pKey->ts && pCurrent->version >= pKey->version &&
              pVerRange->maxVer >= pCurrent->version) {
            return true;
          }
        }
      }

      return false;
    }
  } else {
    if (*index <= 0) {
      // cursor is on the first skyline point
      TSDBKEY* pFirst = taosArrayGet(pDelList, 0);

      if (pKey->ts < pFirst->ts) {
        return false;
      } else if (pKey->ts == pFirst->ts) {
        return pFirst->version >= pKey->version;
      } else {
        ASSERT(0);
      }
    } else {
      TSDBKEY* pCurrent = taosArrayGet(pDelList, *index);
      TSDBKEY* pPrev = taosArrayGet(pDelList, (*index) - 1);

      if (pKey->ts > pCurrent->ts) {
        return false;
      }

      // the range [pPrev, pCurrent] carries the delete version of pPrev
      if (pPrev->ts <= pKey->ts && pCurrent->ts >= pKey->ts && pPrev->version >= pKey->version &&
          pVerRange->maxVer >= pPrev->version) {
        return true;
      }

      // move the cursor back until the range that may contain the key is reached
      while (pPrev->ts >= pKey->ts && (*index) > 1) {
        (*index) -= 1;

        if ((*index) >= 1) {
          pCurrent = taosArrayGet(pDelList, *index);
          pPrev = taosArrayGet(pDelList, (*index) - 1);

          // it is not a consecutive deletion range, ignore it
          if (pCurrent->version > 0 && pPrev->version == 0) {
            continue;
          }

          if (pPrev->ts <= pKey->ts && pCurrent->ts >= pKey->ts && pPrev->version >= pKey->version &&
              pVerRange->maxVer >= pPrev->version) {
            return true;
          }
        }
      }

      return false;
    }
  }

  return false;
}

2970
/**
 * Return the current row of an in-memory iterator, skipping rows that are not visible.
 *
 * Starting at the iterator's current position, rows are skipped while their version is
 * outside pReader->verRange or they are covered by the delete skyline. The search ends with
 * NULL when the iterator is exhausted or a row falls outside the query time window; in both
 * cases pIter->hasVal ends up false.
 *
 * @param pIter    iterator state; hasVal and the delete index may be updated
 * @param pDelList delete skyline of the table, may be NULL
 * @param pReader  reader supplying the time window, version range and scan order
 */
TSDBROW* getValidMemRow(SIterInfo* pIter, const SArray* pDelList, STsdbReader* pReader) {
  if (!pIter->hasVal) {
    return NULL;
  }

  TSDBROW* pRow = tsdbTbDataIterGet(pIter->iter);
  TSDBKEY  key = {.ts = pRow->pTSRow->ts, .version = pRow->version};

  for (;;) {
    if (outOfTimeWindow(key.ts, &pReader->window)) {
      pIter->hasVal = false;
      return NULL;
    }

    // it is a valid data version that has not been deleted
    bool versionVisible = (key.version <= pReader->verRange.maxVer && key.version >= pReader->verRange.minVer);
    if (versionVisible && !hasBeenDropped(pDelList, &pIter->index, &key, pReader->order, &pReader->verRange)) {
      return pRow;
    }

    // step to the next row of the iterator
    pIter->hasVal = tsdbTbDataIterNext(pIter->iter);
    if (!pIter->hasVal) {
      return NULL;
    }

    pRow = tsdbTbDataIterGet(pIter->iter);
    key = TSDBROW_KEY(pRow);
  }
}
H
Hongze Cheng 已提交
3008

3009 3010
/**
 * Merge into pMerger every following row of an in-memory iterator whose timestamp equals ts.
 *
 * The iterator is advanced first, so the row it currently points at is not merged here.
 * Merging stops when the iterator is exhausted, no valid row remains, or a row with a
 * different timestamp is reached.
 *
 * @return TSDB_CODE_SUCCESS, or terrno when the schema of a row cannot be obtained.
 */
int32_t doMergeRowsInBuf(SIterInfo* pIter, uint64_t uid, int64_t ts, SArray* pDelList, SRowMerger* pMerger,
                         STsdbReader* pReader) {
  for (;;) {
    pIter->hasVal = tsdbTbDataIterNext(pIter->iter);
    if (!pIter->hasVal) {
      return TSDB_CODE_SUCCESS;
    }

    // data exists but not valid
    TSDBROW* pRow = getValidMemRow(pIter, pDelList, pReader);
    if (pRow == NULL) {
      return TSDB_CODE_SUCCESS;
    }

    // ts is not identical, quit
    TSDBKEY rowKey = TSDBROW_KEY(pRow);
    if (rowKey.ts != ts) {
      return TSDB_CODE_SUCCESS;
    }

    STSchema* pTSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, uid);
    if (pTSchema == NULL) {
      return terrno;
    }

    tRowMergerAdd(pMerger, pRow, pTSchema);
  }
}

3040
static int32_t doMergeRowsInFileBlockImpl(SBlockData* pBlockData, int32_t rowIndex, int64_t key, SRowMerger* pMerger,
3041
                                          SVersionRange* pVerRange, int32_t step) {
3042 3043
  while (pBlockData->aTSKEY[rowIndex] == key && rowIndex < pBlockData->nRow && rowIndex >= 0) {
    if (pBlockData->aVersion[rowIndex] > pVerRange->maxVer || pBlockData->aVersion[rowIndex] < pVerRange->minVer) {
3044
      rowIndex += step;
3045 3046 3047 3048 3049 3050 3051 3052 3053 3054 3055 3056 3057 3058 3059 3060
      continue;
    }

    TSDBROW fRow = tsdbRowFromBlockData(pBlockData, rowIndex);
    tRowMerge(pMerger, &fRow);
    rowIndex += step;
  }

  return rowIndex;
}

// Result of probing the neighbor file block while merging rows with an identical timestamp.
typedef enum {
  CHECK_FILEBLOCK_CONT = 0x1,  // the loaded neighbor block was consumed up to its end, keep probing the next one
  CHECK_FILEBLOCK_QUIT = 0x2,  // stop probing; this is the value set before any neighbor block is examined
} CHECK_FILEBLOCK_STATE;

H
Hongze Cheng 已提交
3061
static int32_t checkForNeighborFileBlock(STsdbReader* pReader, STableBlockScanInfo* pScanInfo, SDataBlk* pBlock,
3062 3063
                                         SFileDataBlockInfo* pFBlock, SRowMerger* pMerger, int64_t key,
                                         CHECK_FILEBLOCK_STATE* state) {
3064
  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
3065
  SBlockData*         pBlockData = &pReader->status.fileBlockData;
3066

3067
  *state = CHECK_FILEBLOCK_QUIT;
3068
  int32_t step = ASCENDING_TRAVERSE(pReader->order) ? 1 : -1;
3069

H
Hongze Cheng 已提交
3070 3071
  int32_t   nextIndex = -1;
  SDataBlk* pNeighborBlock = getNeighborBlockOfSameTable(pFBlock, pScanInfo, &nextIndex, pReader->order);
3072
  if (pNeighborBlock == NULL) {  // do nothing
3073 3074 3075 3076
    return 0;
  }

  bool overlap = overlapWithNeighborBlock(pBlock, pNeighborBlock, pReader->order);
3077 3078
  taosMemoryFree(pNeighborBlock);

3079
  if (overlap) {  // load next block
3080
    SReaderStatus*  pStatus = &pReader->status;
3081 3082
    SDataBlockIter* pBlockIter = &pStatus->blockIter;

3083
    // 1. find the next neighbor block in the scan block list
3084
    SFileDataBlockInfo fb = {.uid = pFBlock->uid, .tbBlockIdx = nextIndex};
3085
    int32_t            neighborIndex = findFileBlockInfoIndex(pBlockIter, &fb);
3086

3087
    // 2. remove it from the scan block list
3088
    setFileBlockActiveInBlockIter(pBlockIter, neighborIndex, step);
3089

3090
    // 3. load the neighbor block, and set it to be the currently accessed file data block
3091
    int32_t code = doLoadFileBlockData(pReader, pBlockIter, &pStatus->fileBlockData, pFBlock->uid);
3092 3093 3094 3095
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

3096
    // 4. check the data values
3097 3098 3099 3100
    initBlockDumpInfo(pReader, pBlockIter);

    pDumpInfo->rowIndex =
        doMergeRowsInFileBlockImpl(pBlockData, pDumpInfo->rowIndex, key, pMerger, &pReader->verRange, step);
H
Haojun Liao 已提交
3101
    if (pDumpInfo->rowIndex >= pDumpInfo->totalRows) {
3102 3103 3104 3105 3106 3107 3108
      *state = CHECK_FILEBLOCK_CONT;
    }
  }

  return TSDB_CODE_SUCCESS;
}

3109 3110
/*
 * Merge the rows of the current file block (and, if required, of overlapping
 * neighbor blocks) that have the same timestamp as the row the dump cursor
 * currently points at.
 *
 * The cursor (pReader->status.fBlockDumpInfo.rowIndex) is moved past all the
 * merged rows. When the current block is exhausted, neighbor blocks are probed
 * one after another until checkForNeighborFileBlock reports
 * CHECK_FILEBLOCK_QUIT.
 *
 * Returns TSDB_CODE_SUCCESS, or the error reported while loading a neighbor
 * block (previously that error was silently dropped).
 */
int32_t doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pScanInfo, STsdbReader* pReader,
                                SRowMerger* pMerger) {
  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;

  bool    asc = ASCENDING_TRAVERSE(pReader->order);
  int64_t key = pBlockData->aTSKEY[pDumpInfo->rowIndex];
  int32_t step = asc ? 1 : -1;

  // the current row has been handled by the caller, start from its successor
  pDumpInfo->rowIndex += step;
  if ((pDumpInfo->rowIndex <= pBlockData->nRow - 1 && asc) || (pDumpInfo->rowIndex >= 0 && !asc)) {
    pDumpInfo->rowIndex =
        doMergeRowsInFileBlockImpl(pBlockData, pDumpInfo->rowIndex, key, pMerger, &pReader->verRange, step);
  }

  // all rows are consumed, let's try next file block
  if ((pDumpInfo->rowIndex >= pBlockData->nRow && asc) || (pDumpInfo->rowIndex < 0 && !asc)) {
    while (1) {
      CHECK_FILEBLOCK_STATE st = CHECK_FILEBLOCK_QUIT;

      SFileDataBlockInfo* pFileBlockInfo = getCurrentBlockInfo(&pReader->status.blockIter);
      SDataBlk*           pCurrentBlock = getCurrentBlock(&pReader->status.blockIter);

      int32_t code = checkForNeighborFileBlock(pReader, pScanInfo, pCurrentBlock, pFileBlockInfo, pMerger, key, &st);
      if (code != TSDB_CODE_SUCCESS) {
        return code;
      }

      if (st == CHECK_FILEBLOCK_QUIT) {
        break;
      }
    }
  }

  return TSDB_CODE_SUCCESS;
}

H
Hongze Cheng 已提交
3140
int32_t doMergeRowsInLastBlock(SLastBlockReader* pLastBlockReader, STableBlockScanInfo* pScanInfo, int64_t ts,
3141
                               SRowMerger* pMerger, SVersionRange* pVerRange) {
H
Haojun Liao 已提交
3142
  pScanInfo->lastKey = ts;
3143
  while (nextRowFromLastBlocks(pLastBlockReader, pScanInfo, pVerRange)) {
3144 3145
    int64_t next1 = getCurrentKeyInLastBlock(pLastBlockReader);
    if (next1 == ts) {
3146
      TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
3147 3148 3149 3150 3151 3152 3153 3154 3155
      tRowMerge(pMerger, &fRow1);
    } else {
      break;
    }
  }

  return TSDB_CODE_SUCCESS;
}

3156 3157
/*
 * Produce the next output row from one in-memory iterator (mem or imem).
 *
 * If the row following `pRow` is missing, invalid, or has a different
 * timestamp, `pRow` itself is handed out: *pTSRow points to the row owned by
 * the buffer and *freeTSRow is false.
 *
 * Otherwise all rows sharing the timestamp are merged, *pTSRow receives the
 * merged row and *freeTSRow is set to true so that the caller releases it.
 *
 * Returns TSDB_CODE_SUCCESS, terrno when a schema cannot be found, or the
 * error code of the failing merge step. The row merger is released on every
 * path once it has been initialized.
 */
int32_t doMergeMemTableMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter, SArray* pDelList, STSRow** pTSRow,
                                 STsdbReader* pReader, bool* freeTSRow) {
  TSDBROW* pNextRow = NULL;
  TSDBROW  current = *pRow;

  // peek at the next valid row in mem/imem
  pIter->hasVal = tsdbTbDataIterNext(pIter->iter);
  if (pIter->hasVal) {
    pNextRow = getValidMemRow(pIter, pDelList, pReader);
  }

  // no duplicated timestamp follows, return the current row directly
  if (pNextRow == NULL || current.pTSRow->ts != pNextRow->pTSRow->ts) {
    *pTSRow = current.pTSRow;
    *freeTSRow = false;
    return TSDB_CODE_SUCCESS;
  }

  SRowMerger merge = {0};

  // get the correct schema for data in memory
  terrno = 0;
  STSchema* pTSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(&current), pReader, uid);
  if (pTSchema == NULL) {
    return terrno;
  }

  if (pReader->pSchema == NULL) {
    pReader->pSchema = pTSchema;
  }

  int32_t code = tRowMergerInit2(&merge, pReader->pSchema, &current, pTSchema);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  // from here on the merger is initialized and must be cleared on every exit
  STSchema* pTSchema1 = doGetSchemaForTSRow(TSDBROW_SVERSION(pNextRow), pReader, uid);
  if (pTSchema1 == NULL) {
    code = terrno;
    goto _end;
  }

  tRowMergerAdd(&merge, pNextRow, pTSchema1);

  code = doMergeRowsInBuf(pIter, uid, current.pTSRow->ts, pDelList, &merge, pReader);
  if (code != TSDB_CODE_SUCCESS) {
    goto _end;
  }

  code = tRowMergerGetRow(&merge, pTSRow);
  if (code != TSDB_CODE_SUCCESS) {
    goto _end;
  }

  // the merged row is a new object that the caller has to free
  *freeTSRow = true;

_end:
  tRowMergerClear(&merge);
  return code;
}

3224
int32_t doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader,
H
Hongze Cheng 已提交
3225
                           STSRow** pTSRow) {
H
Haojun Liao 已提交
3226 3227
  SRowMerger merge = {0};

3228 3229 3230
  TSDBKEY k = TSDBROW_KEY(pRow);
  TSDBKEY ik = TSDBROW_KEY(piRow);

3231
  if (ASCENDING_TRAVERSE(pReader->order)) {  // ascending order imem --> mem
H
Haojun Liao 已提交
3232
    STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid);
3233

H
Haojun Liao 已提交
3234 3235 3236 3237 3238 3239 3240 3241 3242 3243
    int32_t code = tRowMergerInit(&merge, piRow, pSchema);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    code = doMergeRowsInBuf(&pBlockScanInfo->iiter, pBlockScanInfo->uid, ik.ts, pBlockScanInfo->delSkyline, &merge,
                            pReader);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }
3244

3245
    tRowMerge(&merge, pRow);
H
Haojun Liao 已提交
3246 3247 3248 3249 3250 3251
    code =
        doMergeRowsInBuf(&pBlockScanInfo->iter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge, pReader);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

3252
  } else {
H
Haojun Liao 已提交
3253
    STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid);
3254

H
Haojun Liao 已提交
3255
    int32_t code = tRowMergerInit(&merge, pRow, pSchema);
3256
    if (code != TSDB_CODE_SUCCESS || merge.pTSchema == NULL) {
H
Haojun Liao 已提交
3257 3258 3259 3260 3261 3262 3263 3264
      return code;
    }

    code =
        doMergeRowsInBuf(&pBlockScanInfo->iter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge, pReader);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }
3265 3266

    tRowMerge(&merge, piRow);
H
Haojun Liao 已提交
3267 3268 3269 3270 3271
    code = doMergeRowsInBuf(&pBlockScanInfo->iiter, pBlockScanInfo->uid, ik.ts, pBlockScanInfo->delSkyline, &merge,
                            pReader);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }
3272
  }
3273

3274 3275
  int32_t code = tRowMergerGetRow(&merge, pTSRow);
  return code;
3276 3277
}

3278 3279
/*
 * Fetch the next row of a table from the in-memory buffers (mem and imem).
 *
 * Rows at or beyond `endKey` (in scan direction) are ignored. When both
 * buffers provide a row, the one that comes first in scan order is returned;
 * rows with an identical timestamp in both buffers are merged into one.
 *
 * *pTSRow is left untouched when neither buffer has a usable row.
 * *freeTSRow tells the caller whether the returned row has to be released.
 *
 * Returns TSDB_CODE_SUCCESS or the error code of the merge routine used.
 */
int32_t tsdbGetNextRowInMem(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader, STSRow** pTSRow, int64_t endKey,
                            bool* freeTSRow) {
  SIterInfo* pMemIter = &pBlockScanInfo->iter;
  SIterInfo* pIMemIter = &pBlockScanInfo->iiter;

  TSDBROW* pRow = getValidMemRow(pMemIter, pBlockScanInfo->delSkyline, pReader);
  TSDBROW* piRow = getValidMemRow(pIMemIter, pBlockScanInfo->delSkyline, pReader);

  SArray*  pDelList = pBlockScanInfo->delSkyline;
  uint64_t uid = pBlockScanInfo->uid;
  bool     asc = ASCENDING_TRAVERSE(pReader->order);

  // drop the mem row if it has reached the end key
  if (pMemIter->hasVal) {
    TSDBKEY memKey = TSDBROW_KEY(pRow);
    bool    reachedEnd = asc ? (memKey.ts >= endKey) : (memKey.ts <= endKey);
    if (reachedEnd) {
      pRow = NULL;
    }
  }

  // drop the imem row if it has reached the end key
  if (pIMemIter->hasVal) {
    TSDBKEY imemKey = TSDBROW_KEY(piRow);
    bool    reachedEnd = asc ? (imemKey.ts >= endKey) : (imemKey.ts <= endKey);
    if (reachedEnd) {
      piRow = NULL;
    }
  }

  bool memUsable = pMemIter->hasVal && (pRow != NULL);
  bool imemUsable = pIMemIter->hasVal && (piRow != NULL);

  if (memUsable && imemUsable) {
    TSDBKEY k = TSDBROW_KEY(pRow);
    TSDBKEY ik = TSDBROW_KEY(piRow);

    if (ik.ts == k.ts) {
      // both buffers hold the same timestamp, the merged row is always a new object
      *freeTSRow = true;
      return doMergeMemIMemRows(pRow, piRow, pBlockScanInfo, pReader, pTSRow);
    }

    // the timestamps differ, pick the buffer whose row comes first in scan order
    bool imemComesFirst = asc ? (ik.ts < k.ts) : (ik.ts > k.ts);
    if (imemComesFirst) {
      return doMergeMemTableMultiRows(piRow, uid, pIMemIter, pDelList, pTSRow, pReader, freeTSRow);
    }

    return doMergeMemTableMultiRows(pRow, uid, pMemIter, pDelList, pTSRow, pReader, freeTSRow);
  }

  if (memUsable) {
    return doMergeMemTableMultiRows(pRow, pBlockScanInfo->uid, pMemIter, pDelList, pTSRow, pReader, freeTSRow);
  }

  if (imemUsable) {
    return doMergeMemTableMultiRows(piRow, uid, pIMemIter, pDelList, pTSRow, pReader, freeTSRow);
  }

  return TSDB_CODE_SUCCESS;
}

H
Haojun Liao 已提交
3335
/*
 * Append one row (`pTSRow`) to the result block `pBlock`.
 *
 * The columns of the block and of the row schema are walked in parallel by
 * column id: matching columns are copied, block columns that are missing in
 * the schema are filled with NULL, schema columns that are not requested are
 * skipped.
 *
 * Returns TSDB_CODE_SUCCESS after the row count of the block was increased, or
 * terrno when the schema of the row cannot be found; in the latter case the
 * block is left unmodified.
 */
int32_t doAppendRowFromTSRow(SSDataBlock* pBlock, STsdbReader* pReader, STSRow* pTSRow, uint64_t uid) {
  int32_t numOfRows = pBlock->info.rows;
  int32_t numOfCols = (int32_t)taosArrayGetSize(pBlock->pDataBlock);

  SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo;

  // the lookup may fail; every other caller in this file checks for NULL as well
  STSchema* pSchema = doGetSchemaForTSRow(pTSRow->sver, pReader, uid);
  if (pSchema == NULL) {
    return terrno;
  }

  SColVal colVal = {0};
  int32_t i = 0, j = 0;

  // the primary timestamp is taken from the row header, not from the column data
  SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
  if (pColInfoData->info.colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
    colDataAppend(pColInfoData, numOfRows, (const char*)&pTSRow->ts, false);
    i += 1;
  }

  while (i < numOfCols && j < pSchema->numOfCols) {
    pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
    col_id_t colId = pColInfoData->info.colId;

    if (colId == pSchema->columns[j].colId) {
      tTSRowGetVal(pTSRow, pSchema, j, &colVal);
      doCopyColVal(pColInfoData, numOfRows, i, &colVal, pSupInfo);
      i += 1;
      j += 1;
    } else if (colId < pSchema->columns[j].colId) {
      // the requested column does not exist in this schema version
      colDataAppendNULL(pColInfoData, numOfRows);
      i += 1;
    } else {
      // the schema column is not requested
      j += 1;
    }
  }

  // set null value since current column does not exist in the "pSchema"
  while (i < numOfCols) {
    pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
    colDataAppendNULL(pColInfoData, numOfRows);
    i += 1;
  }

  pBlock->info.rows += 1;
  return TSDB_CODE_SUCCESS;
}

H
Hongze Cheng 已提交
3379 3380
/*
 * Copy row `rowIndex` of the file block `pBlockData` to the end of the result
 * block `pResBlock`.
 *
 * Output columns and file block columns are matched by column id; an output
 * column without a counterpart in the file block is filled with NULL.
 * Always returns TSDB_CODE_SUCCESS after increasing the row count of the
 * result block.
 */
int32_t doAppendRowFromFileBlock(SSDataBlock* pResBlock, STsdbReader* pReader, SBlockData* pBlockData,
                                 int32_t rowIndex) {
  SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo;

  int32_t dstRow = pResBlock->info.rows;
  int32_t outIdx = 0;
  int32_t inIdx = 0;

  // the timestamp column is served from the key array of the block
  SColumnInfoData* pFirstCol = taosArrayGet(pResBlock->pDataBlock, outIdx);
  if (pFirstCol->info.colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
    colDataAppendInt64(pFirstCol, dstRow, &pBlockData->aTSKEY[rowIndex]);
    outIdx += 1;
  }

  SColVal value = {0};
  int32_t numOfInputCols = pBlockData->aIdx->size;
  int32_t numOfOutputCols = pResBlock->pDataBlock->size;

  while (outIdx < numOfOutputCols && inIdx < numOfInputCols) {
    SColumnInfoData* pOutCol = TARRAY_GET_ELEM(pResBlock->pDataBlock, outIdx);
    SColData*        pInCol = tBlockDataGetColDataByIdx(pBlockData, inIdx);

    if (pInCol->cid < pOutCol->info.colId) {
      // the file block column is not requested, try the next one
      inIdx += 1;
    } else if (pInCol->cid == pOutCol->info.colId) {
      tColDataGetValue(pInCol, rowIndex, &value);
      doCopyColVal(pOutCol, dstRow, outIdx, &value, pSupInfo);
      inIdx += 1;
      outIdx += 1;
    } else {
      // the specified column does not exist in file block, fill with null data
      colDataAppendNULL(pOutCol, dstRow);
      outIdx += 1;
    }
  }

  // the remaining output columns have no data in the file block
  for (; outIdx < numOfOutputCols; outIdx += 1) {
    SColumnInfoData* pOutCol = taosArrayGet(pResBlock->pDataBlock, outIdx);
    colDataAppendNULL(pOutCol, dstRow);
  }

  pResBlock->info.rows += 1;
  return TSDB_CODE_SUCCESS;
}

3427 3428
/*
 * Fill the result block of the reader with rows of one table taken from the
 * in-memory buffers.
 *
 * Rows are appended until no row before `endKey` is left, both buffer
 * iterators are exhausted, or the block holds `capacity` rows.
 *
 * Returns TSDB_CODE_SUCCESS, or the first error reported while fetching or
 * appending a row (these errors used to be ignored).
 */
int32_t buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, int64_t endKey, int32_t capacity,
                                  STsdbReader* pReader) {
  SSDataBlock* pBlock = pReader->pResBlock;

  do {
    STSRow* pTSRow = NULL;
    bool    freeTSRow = false;

    int32_t code = tsdbGetNextRowInMem(pBlockScanInfo, pReader, &pTSRow, endKey, &freeTSRow);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    if (pTSRow == NULL) {
      break;
    }

    code = doAppendRowFromTSRow(pBlock, pReader, pTSRow, pBlockScanInfo->uid);

    // a merged row is owned by this function, release it before checking the result
    if (freeTSRow) {
      taosMemoryFree(pTSRow);
    }

    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    // no data in buffer, return immediately
    if (!(pBlockScanInfo->iter.hasVal || pBlockScanInfo->iiter.hasVal)) {
      break;
    }

    if (pBlock->info.rows >= capacity) {
      break;
    }
  } while (1);

  ASSERT(pBlock->info.rows <= capacity);
  return TSDB_CODE_SUCCESS;
}
H
Hongze Cheng 已提交
3457

3458 3459
// TODO refactor: with createDataBlockScanInfo
/*
 * Replace the set of tables scanned by the reader.
 *
 * Every existing scan info entry is cleared and removed from the table map,
 * then one fresh entry (lastKey = 0) is inserted for each of the `num`
 * elements of `pTableList`, which is read as an array of STableKeyInfo.
 *
 * Returns TSDB_CODE_SUCCESS.
 * NOTE(review): the result of taosHashPut is not checked - confirm whether an
 * insert failure has to be reported to the caller.
 */
int32_t tsdbSetTableList(STsdbReader* pReader, const void* pTableList, int32_t num) {
  ASSERT(pReader != NULL);

  STableBlockScanInfo* p = NULL;
  while ((p = taosHashIterate(pReader->status.pTableMap, p)) != NULL) {
    clearBlockScanInfo(p);
  }

  taosHashClear(pReader->status.pTableMap);

  // the list is only read here, so the const qualifier of the argument is kept
  const STableKeyInfo* pList = pTableList;
  for (int32_t i = 0; i < num; ++i) {
    STableBlockScanInfo info = {.lastKey = 0, .uid = pList[i].uid};
    taosHashPut(pReader->status.pTableMap, &info.uid, sizeof(uint64_t), &info, sizeof(info));
  }

  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
3477 3478 3479 3480 3481 3482
// Return the result of metaGetIdx() for the given meta object, or NULL when no meta object is supplied.
void* tsdbGetIdx(SMeta* pMeta) {
  return (pMeta != NULL) ? metaGetIdx(pMeta) : NULL;
}
dengyihao's avatar
dengyihao 已提交
3483

dengyihao's avatar
dengyihao 已提交
3484 3485 3486 3487 3488 3489
// Return the result of metaGetIvtIdx() for the given meta object, or NULL when no meta object is supplied.
void* tsdbGetIvtIdx(SMeta* pMeta) {
  return (pMeta != NULL) ? metaGetIvtIdx(pMeta) : NULL;
}
L
Liu Jicong 已提交
3490

H
Hongze Cheng 已提交
3491
// Return the upper bound (maxVer) of the version range the reader works with.
uint64_t getReaderMaxVersion(STsdbReader* pReader) {
  SVersionRange* pRange = &pReader->verRange;
  return pRange->maxVer;
}
3492

3493 3494 3495 3496 3497 3498 3499 3500 3501 3502 3503 3504 3505 3506 3507
/*
 * Prepare the file-set iterator and the data block iterator of a reader.
 *
 * When the read snapshot contains no files, the reader is switched to
 * buffer-only mode (loadFromFile = false) and TSDB_CODE_SUCCESS is returned;
 * otherwise the result of initForFirstBlockInFile is returned.
 */
static int32_t doOpenReaderImpl(STsdbReader* pReader) {
  SReaderStatus* pStatus = &pReader->status;

  initFilesetIterator(&pStatus->fileIter, pReader->pReadSnap->fs.aDFileSet, pReader);
  resetDataBlockIterator(&pStatus->blockIter, pReader->order);

  if (pStatus->fileIter.numOfFiles != 0) {
    return initForFirstBlockInFile(pReader, &pStatus->blockIter);
  }

  // no data in files, let's try buffer in memory
  pStatus->loadFromFile = false;
  return TSDB_CODE_SUCCESS;
}

H
refact  
Hongze Cheng 已提交
3508
// ====================================== EXPOSED APIs ======================================
3509 3510
/*
 * Create and open a data reader for the given tables and query condition.
 *
 * For TIMEWINDOW_RANGE_EXTERNAL conditions two additional inner readers with a
 * capacity of one row are created; they share the table map, the schemas and
 * the read snapshot of the main reader.
 *
 * NOTE: `pCond->twindows` is modified by this function for external ranges and
 * is not restored afterwards.
 *
 * On success *ppReader holds the opened reader. On any failure that happens
 * after the main reader has been created, the reader (including its inner
 * readers) is closed and *ppReader is reset to NULL, so nothing is leaked and
 * the caller cannot touch a half-initialized object.
 *
 * Returns TSDB_CODE_SUCCESS or the error code of the failing step.
 */
int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, void* pTableList, int32_t numOfTables,
                       STsdbReader** ppReader, const char* idstr) {
  STsdbReader* pReader = NULL;

  STimeWindow window = pCond->twindows;
  if (pCond->type == TIMEWINDOW_RANGE_EXTERNAL) {
    pCond->twindows.skey += 1;
    pCond->twindows.ekey -= 1;
  }

  int32_t code = tsdbReaderCreate(pVnode, pCond, ppReader, 4096, idstr);
  if (code != TSDB_CODE_SUCCESS) {
    goto _err;
  }

  // check for query time window
  pReader = *ppReader;
  if (isEmptyQueryTimeWindow(&pReader->window) && pCond->type == TIMEWINDOW_RANGE_CONTAINED) {
    tsdbDebug("%p query window not overlaps with the data set, no result returned, %s", pReader, pReader->idStr);
    return TSDB_CODE_SUCCESS;
  }

  if (pCond->type == TIMEWINDOW_RANGE_EXTERNAL) {
    // update the SQueryTableDataCond to create inner reader
    int32_t order = pCond->order;
    if (order == TSDB_ORDER_ASC) {
      pCond->twindows.ekey = window.skey;
      pCond->twindows.skey = INT64_MIN;
      pCond->order = TSDB_ORDER_DESC;
    } else {
      pCond->twindows.skey = window.ekey;
      pCond->twindows.ekey = INT64_MAX;
      pCond->order = TSDB_ORDER_ASC;
    }

    // here we only need one more row, so the capacity is set to be ONE.
    code = tsdbReaderCreate(pVnode, pCond, &pReader->innerReader[0], 1, idstr);
    if (code != TSDB_CODE_SUCCESS) {
      goto _err;
    }

    // NOTE(review): for the descending case the second inner reader ends at window.ekey,
    // while the ascending counterpart of the first inner reader ends at window.skey.
    // Confirm that window.ekey is intended here.
    if (order == TSDB_ORDER_ASC) {
      pCond->twindows.skey = window.ekey;
      pCond->twindows.ekey = INT64_MAX;
    } else {
      pCond->twindows.skey = INT64_MIN;
      pCond->twindows.ekey = window.ekey;
    }
    pCond->order = order;

    code = tsdbReaderCreate(pVnode, pCond, &pReader->innerReader[1], 1, idstr);
    if (code != TSDB_CODE_SUCCESS) {
      goto _err;
    }
  }

  // NOTE: the endVersion in pCond is the data version not schema version, so pCond->endVersion is not correct here.
  if (pCond->suid != 0) {
    pReader->pSchema = metaGetTbTSchema(pReader->pTsdb->pVnode->pMeta, pReader->suid, -1, 1);
    if (pReader->pSchema == NULL) {
      tsdbError("failed to get table schema, suid:%" PRIu64 ", ver:-1, %s", pReader->suid, pReader->idStr);
    }
  } else if (numOfTables > 0) {
    STableKeyInfo* pKey = pTableList;
    pReader->pSchema = metaGetTbTSchema(pReader->pTsdb->pVnode->pMeta, pKey->uid, -1, 1);
    if (pReader->pSchema == NULL) {
      tsdbError("failed to get table schema, uid:%" PRIu64 ", ver:-1, %s", pKey->uid, pReader->idStr);
    }
  }

  STsdbReader* p = pReader->innerReader[0] != NULL ? pReader->innerReader[0] : pReader;

  pReader->status.pTableMap = createDataBlockScanInfo(p, pTableList, numOfTables);
  if (pReader->status.pTableMap == NULL) {
    code = TSDB_CODE_TDB_OUT_OF_MEMORY;
    goto _err;
  }

  if (numOfTables > 0) {
    code = tsdbTakeReadSnap(pReader->pTsdb, &pReader->pReadSnap, pReader->idStr);
    if (code != TSDB_CODE_SUCCESS) {
      goto _err;
    }

    if (pReader->type == TIMEWINDOW_RANGE_CONTAINED) {
      code = doOpenReaderImpl(pReader);
      if (code != TSDB_CODE_SUCCESS) {
        goto _err;
      }
    } else {
      STsdbReader* pPrevReader = pReader->innerReader[0];
      STsdbReader* pNextReader = pReader->innerReader[1];

      // we need only one row
      pPrevReader->capacity = 1;
      pPrevReader->status.pTableMap = pReader->status.pTableMap;
      pPrevReader->pSchema = pReader->pSchema;
      pPrevReader->pMemSchema = pReader->pMemSchema;
      pPrevReader->pReadSnap = pReader->pReadSnap;

      pNextReader->capacity = 1;
      pNextReader->status.pTableMap = pReader->status.pTableMap;
      pNextReader->pSchema = pReader->pSchema;
      pNextReader->pMemSchema = pReader->pMemSchema;
      pNextReader->pReadSnap = pReader->pReadSnap;

      code = doOpenReaderImpl(pPrevReader);
      if (code != TSDB_CODE_SUCCESS) {
        goto _err;
      }

      code = doOpenReaderImpl(pNextReader);
      if (code != TSDB_CODE_SUCCESS) {
        goto _err;
      }

      code = doOpenReaderImpl(pReader);
      if (code != TSDB_CODE_SUCCESS) {
        goto _err;
      }
    }
  }

  tsdbDebug("%p total numOfTable:%d in this query %s", pReader, numOfTables, pReader->idStr);
  return code;

_err:
  tsdbError("failed to create data reader, code:%s %s", tstrerror(code), idstr);

  // pReader is still NULL when the creation of the main reader itself failed
  if (pReader != NULL) {
    // Only the first inner reader exists when the creation of the second one failed.
    // Close it here on its own, so that tsdbReaderClose() never has to deal with a
    // half-created pair of inner readers.
    if (pReader->innerReader[0] != NULL && pReader->innerReader[1] == NULL) {
      tsdbReaderClose(pReader->innerReader[0]);
      pReader->innerReader[0] = NULL;
    }

    tsdbReaderClose(pReader);
    *ppReader = NULL;
  }

  return code;
}

/*
 * Release a reader and everything it owns. A NULL reader is accepted and
 * ignored.
 *
 * Inner readers (external time window queries) only borrow the table map, the
 * read snapshot and the schemas of the main reader. These references are
 * dropped before the inner readers are closed, so the shared objects are
 * released exactly once, by the main reader. Each inner reader is checked for
 * NULL separately, hence a reader with only one inner reader can be closed
 * safely.
 */
void tsdbReaderClose(STsdbReader* pReader) {
  if (pReader == NULL) {
    return;
  }

  for (int32_t i = 0; i < 2; ++i) {
    STsdbReader* pInner = pReader->innerReader[i];
    if (pInner == NULL) {
      continue;
    }

    // detach the objects borrowed from the main reader
    pInner->status.pTableMap = NULL;
    pInner->pReadSnap = NULL;
    pInner->pSchema = NULL;
    pInner->pMemSchema = NULL;

    tsdbReaderClose(pInner);
    pReader->innerReader[i] = NULL;
  }

  SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo;

  taosMemoryFreeClear(pSupInfo->plist);
  taosMemoryFree(pSupInfo->colIds);

  taosArrayDestroy(pSupInfo->pColAgg);
  for (int32_t i = 0; i < blockDataGetNumOfCols(pReader->pResBlock); ++i) {
    if (pSupInfo->buildBuf[i] != NULL) {
      taosMemoryFreeClear(pSupInfo->buildBuf[i]);
    }
  }

  taosMemoryFree(pSupInfo->buildBuf);
  tBlockDataDestroy(&pReader->status.fileBlockData, true);

  cleanupDataBlockIterator(&pReader->status.blockIter);

  // the number of tables is only needed for the cost summary printed below
  size_t numOfTables = taosHashGetSize(pReader->status.pTableMap);
  destroyBlockScanInfo(pReader->status.pTableMap);
  blockDataDestroy(pReader->pResBlock);

  if (pReader->pFileReader != NULL) {
    tsdbDataFReaderClose(&pReader->pFileReader);
  }

  tsdbUntakeReadSnap(pReader->pTsdb, pReader->pReadSnap, pReader->idStr);

  taosMemoryFree(pReader->status.uidCheckInfo.tableUidList);
  SIOCostSummary* pCost = &pReader->cost;

  SFilesetIter* pFilesetIter = &pReader->status.fileIter;
  if (pFilesetIter->pLastBlockReader != NULL) {
    SLastBlockReader* pLReader = pFilesetIter->pLastBlockReader;
    tMergeTreeClose(&pLReader->mergeTree);

    // collect the load statistics of the last blocks before the load info is destroyed
    getLastBlockLoadInfo(pLReader->pInfo, &pCost->lastBlockLoad, &pCost->lastBlockLoadTime);

    pLReader->pInfo = destroyLastBlockLoadInfo(pLReader->pInfo);
    taosMemoryFree(pLReader);
  }

  tsdbDebug(
      "%p :io-cost summary: head-file:%" PRIu64 ", head-file time:%.2f ms, SMA:%" PRId64
      " SMA-time:%.2f ms, fileBlocks:%" PRId64
      ", fileBlocks-load-time:%.2f ms, "
      "build in-memory-block-time:%.2f ms, lastBlocks:%" PRId64 ", lastBlocks-time:%.2f ms, composed-blocks:%" PRId64
      ", composed-blocks-time:%.2fms, STableBlockScanInfo size:%.2f Kb %s",
      pReader, pCost->headFileLoad, pCost->headFileLoadTime, pCost->smaDataLoad, pCost->smaLoadTime, pCost->numOfBlocks,
      pCost->blockLoadTime, pCost->buildmemBlock, pCost->lastBlockLoad, pCost->lastBlockLoadTime, pCost->composedBlocks,
      pCost->buildComposedBlockTime, numOfTables * sizeof(STableBlockScanInfo) / 1000.0, pReader->idStr);

  taosMemoryFree(pReader->idStr);
  taosMemoryFree(pReader->pSchema);

  // pMemSchema may alias pSchema, do not release the same object twice
  if (pReader->pMemSchema != pReader->pSchema) {
    taosMemoryFree(pReader->pMemSchema);
  }
  taosMemoryFreeClear(pReader);
}

3726
/*
 * Build the next result block of a single reader.
 *
 * The result block is cleaned first. Data files are consulted as long as the
 * reader is in file mode; if they yield no rows (or the reader is in
 * buffer-only mode) the in-memory buffers are tried.
 *
 * Returns true when the result block holds at least one row. A failure while
 * building a block from files is reported as false.
 */
static bool doTsdbNextDataBlock(STsdbReader* pReader) {
  SSDataBlock* pResult = pReader->pResBlock;

  // cleanup the data that belongs to the previous data block
  blockDataCleanup(pResult);

  if (pReader->status.loadFromFile) {
    if (buildBlockFromFiles(pReader) != TSDB_CODE_SUCCESS) {
      return false;
    }

    if (pResult->info.rows > 0) {
      return true;
    }
  }

  // no (more) data in files, let's try the buffer
  buildBlockFromBufferSequentially(pReader);
  return pResult->info.rows > 0;
}

3753 3754 3755 3756 3757
/*
 * Move the reader to its next data block.
 *
 * When inner readers exist, the blocks are produced in three stages tracked by
 * pReader->step: first the block of innerReader[0] (EXTERNAL_ROWS_PREV), then
 * the blocks of the reader itself (EXTERNAL_ROWS_MAIN) and finally the block
 * of innerReader[1] (EXTERNAL_ROWS_NEXT).
 *
 * Returns true when a block is available, false otherwise.
 */
bool tsdbNextDataBlock(STsdbReader* pReader) {
  if (isEmptyQueryTimeWindow(&pReader->window)) {
    return false;
  }

  // stage 1: the inner reader for the leading rows, visited once
  if (pReader->innerReader[0] != NULL && pReader->step == 0) {
    STsdbReader* pPrev = pReader->innerReader[0];

    bool hasPrev = doTsdbNextDataBlock(pPrev);
    resetDataBlockScanInfo(pPrev->status.pTableMap, pPrev->window.ekey);
    pReader->step = EXTERNAL_ROWS_PREV;

    if (hasPrev) {
      return true;
    }
  }

  // stage 2: the rows of the main reader
  if (pReader->step == EXTERNAL_ROWS_PREV) {
    pReader->step = EXTERNAL_ROWS_MAIN;
  }

  if (doTsdbNextDataBlock(pReader)) {
    return true;
  }

  // stage 3: the inner reader for the trailing rows, visited once after the main reader is drained
  if (pReader->innerReader[1] == NULL || pReader->step != EXTERNAL_ROWS_MAIN) {
    return false;
  }

  STsdbReader* pNext = pReader->innerReader[1];
  resetDataBlockScanInfo(pNext->status.pTableMap, pReader->window.ekey);

  bool hasNext = doTsdbNextDataBlock(pNext);
  pReader->step = EXTERNAL_ROWS_NEXT;
  return hasNext;
}

3789
// Tell whether the table identified by `uid` has an entry in the table map of the reader.
bool tsdbTableNextDataBlock(STsdbReader* pReader, uint64_t uid) {
  STableBlockScanInfo* pEntry = taosHashGet(pReader->status.pTableMap, &uid, sizeof(uid));

  // a missing entry means there is no data block for the table of given uid
  return pEntry != NULL;
}

3798
// Copy the row count, the table uid and the time window of the reader's result block to `pDataBlockInfo`.
static void setBlockInfo(STsdbReader* pReader, SDataBlockInfo* pDataBlockInfo) {
  ASSERT(pDataBlockInfo != NULL && pReader != NULL);

  SSDataBlock* pResult = pReader->pResBlock;

  pDataBlockInfo->rows = pResult->info.rows;
  pDataBlockInfo->uid = pResult->info.uid;
  pDataBlockInfo->window = pResult->info.window;
}

3805 3806
// Fill pDataBlockInfo from whichever reader produced the current block.
//
// For a TIMEWINDOW_RANGE_EXTERNAL reader the source depends on pReader->step:
// EXTERNAL_ROWS_MAIN uses the reader itself, EXTERNAL_ROWS_PREV uses
// innerReader[0] and any other step uses innerReader[1]. Every other reader
// type always uses the reader itself.
void tsdbRetrieveDataBlockInfo(STsdbReader* pReader, SDataBlockInfo* pDataBlockInfo) {
  STsdbReader* pSource = pReader;

  if (pReader->type == TIMEWINDOW_RANGE_EXTERNAL && pReader->step != EXTERNAL_ROWS_MAIN) {
    if (pReader->step == EXTERNAL_ROWS_PREV) {
      pSource = pReader->innerReader[0];
    } else {
      pSource = pReader->innerReader[1];
    }
  }

  setBlockInfo(pSource, pDataBlockInfo);
}

3819
// Load the pre-computed column aggregates (block SMA) of the current file block.
//
// Outputs:
//   *pBlockStatis - set to pReader->suppInfo.plist when aggregates were loaded,
//                   or to NULL when none are available (external-window reader,
//                   composed block, or a block without SMA). Left untouched
//                   when reading the SMA fails.
//   *allHave      - true only when SMA was loaded and every matched column has
//                   the BSMA flag on; false otherwise.
//
// Returns TSDB_CODE_SUCCESS, or the error code from tsdbReadBlockSma().
int32_t tsdbRetrieveDatablockSMA(STsdbReader* pReader, SColumnDataAgg*** pBlockStatis, bool* allHave) {
  *allHave = false;

  // neither an external-window reader nor a composed block carries block statistics
  if (pReader->type == TIMEWINDOW_RANGE_EXTERNAL || pReader->status.composedDataBlock) {
    *pBlockStatis = NULL;
    return TSDB_CODE_SUCCESS;
  }

  SFileDataBlockInfo* pFBlock = getCurrentBlockInfo(&pReader->status.blockIter);
  SDataBlk*           pBlock = getCurrentBlock(&pReader->status.blockIter);
  int64_t             stime = taosGetTimestampUs();
  SBlockLoadSuppInfo* pSup = &pReader->suppInfo;

  bool hasSma = tDataBlkHasSma(pBlock);
  if (!hasSma) {
    *pBlockStatis = NULL;
    return TSDB_CODE_SUCCESS;
  }

  int32_t code = tsdbReadBlockSma(pReader->pFileReader, pBlock, pSup->pColAgg);
  if (code != TSDB_CODE_SUCCESS) {
    tsdbDebug("vgId:%d, failed to load block SMA for uid %" PRIu64 ", code:%s, %s", 0, pFBlock->uid, tstrerror(code),
              pReader->idStr);
    return code;
  }

  *allHave = true;

  // slot 0 always describes the primary timestamp column; it is derived from
  // the result block's time window rather than read from the file
  SColumnDataAgg* pTsAgg = &pSup->tsColAgg;
  pTsAgg->colId = PRIMARYKEY_TIMESTAMP_COL_ID;
  pTsAgg->numOfNull = 0;
  pTsAgg->min = pReader->pResBlock->info.window.skey;
  pTsAgg->max = pReader->pResBlock->info.window.ekey;
  pSup->plist[0] = pTsAgg;

  // walk the loaded aggregates (index i) and the requested column ids (index j)
  // in step, advancing whichever side currently holds the smaller column id
  size_t numOfCols = blockDataGetNumOfCols(pReader->pResBlock);
  size_t numOfAggs = taosArrayGetSize(pSup->pColAgg);

  int32_t i = 0, j = 0;
  while (j < numOfCols && i < numOfAggs) {
    SColumnDataAgg* pAgg = taosArrayGet(pSup->pColAgg, i);

    if (pAgg->colId < pSup->colIds[j]) {
      i += 1;
      continue;
    }

    if (pSup->colIds[j] < pAgg->colId) {
      j += 1;
      continue;
    }

    // same column id on both sides
    // NOTE(review): the schema column is looked up with i, the position in the
    // loaded aggregate array - confirm this lines up with the schema column order.
    if (IS_BSMA_ON(&(pReader->pSchema->columns[i]))) {
      pSup->plist[j] = pAgg;
    } else {
      *allHave = false;
    }

    i += 1;
    j += 1;
  }

  double elapsed = (taosGetTimestampUs() - stime) / 1000.0;
  pReader->cost.smaLoadTime += elapsed;
  pReader->cost.smaDataLoad += 1;

  *pBlockStatis = pSup->plist;

  tsdbDebug("vgId:%d, succeed to load block SMA for uid %" PRIu64 ", elapsed time:%.2f ms, %s", 0, pFBlock->uid,
            elapsed, pReader->idStr);

  return code;
}

3897
// Return the column array of the reader's result block.
//
// A composed block is already in the result block and is returned as is.
// Otherwise the current file block is loaded and copied into the result block
// first. On failure NULL is returned and terrno holds the reason.
static SArray* doRetrieveDataBlock(STsdbReader* pReader) {
  SReaderStatus* pStatus = &pReader->status;

  if (pStatus->composedDataBlock) {
    return pReader->pResBlock->pDataBlock;
  }

  // the current file block must belong to one of the tables being queried
  SFileDataBlockInfo*  pBlockInfo = getCurrentBlockInfo(&pStatus->blockIter);
  STableBlockScanInfo* pScanInfo = taosHashGet(pStatus->pTableMap, &pBlockInfo->uid, sizeof(pBlockInfo->uid));
  if (pScanInfo == NULL) {
    terrno = TSDB_CODE_INVALID_PARA;
    tsdbError("failed to locate the uid:%" PRIu64 " in query table uid list, total tables:%d, %s", pBlockInfo->uid,
              taosHashGetSize(pReader->status.pTableMap), pReader->idStr);
    return NULL;
  }

  int32_t code = doLoadFileBlockData(pReader, &pStatus->blockIter, &pStatus->fileBlockData, pScanInfo->uid);
  if (code != TSDB_CODE_SUCCESS) {
    tBlockDataDestroy(&pStatus->fileBlockData, 1);
    terrno = code;
    return NULL;
  }

  copyBlockDataToSDataBlock(pReader, pScanInfo);
  return pReader->pResBlock->pDataBlock;
}

3924 3925 3926 3927 3928 3929 3930 3931 3932 3933 3934 3935
// Fetch the column data of the current block from the reader that produced it.
//
// For a TIMEWINDOW_RANGE_EXTERNAL reader, step EXTERNAL_ROWS_PREV selects
// innerReader[0] and step EXTERNAL_ROWS_NEXT selects innerReader[1]; in every
// other case the reader itself is used. pIdList is not used by this function.
SArray* tsdbRetrieveDataBlock(STsdbReader* pReader, SArray* pIdList) {
  STsdbReader* pTarget = pReader;

  if (pReader->type == TIMEWINDOW_RANGE_EXTERNAL) {
    if (pReader->step == EXTERNAL_ROWS_PREV) {
      pTarget = pReader->innerReader[0];
    } else if (pReader->step == EXTERNAL_ROWS_NEXT) {
      pTarget = pReader->innerReader[1];
    }
  }

  return doRetrieveDataBlock(pTarget);
}

H
Haojun Liao 已提交
3936
// Re-arm an existing reader for a new scan described by pCond.
//
// Nothing is done (and TSDB_CODE_SUCCESS is returned) when the reader's current
// time window is empty or it holds no read snapshot. Otherwise the scan order,
// time window, file-set iterator, block iterator and per-table scan state are
// reset, the open data file reader is closed, and the first block of the first
// file is positioned when there are files to read.
//
// Returns TSDB_CODE_SUCCESS or the error from initForFirstBlockInFile().
int32_t tsdbReaderReset(STsdbReader* pReader, SQueryTableDataCond* pCond) {
  if (isEmptyQueryTimeWindow(&pReader->window) || pReader->pReadSnap == NULL) {
    return TSDB_CODE_SUCCESS;
  }

  SReaderStatus*      pStatus = &pReader->status;
  SBlockLoadSuppInfo* pSup = &pReader->suppInfo;

  pReader->order = pCond->order;
  pReader->type = TIMEWINDOW_RANGE_CONTAINED;
  pStatus->loadFromFile = true;
  pStatus->pTableIter = NULL;
  pReader->window = updateQueryTimeWindow(pReader->pTsdb, &pCond->twindows);

  // wipe the cached timestamp-column aggregate and the first POINTER_BYTES
  // bytes of the aggregate pointer list
  memset(&pSup->tsColAgg, 0, sizeof(SColumnDataAgg));
  memset(pSup->plist, 0, POINTER_BYTES);

  pSup->tsColAgg.colId = PRIMARYKEY_TIMESTAMP_COL_ID;
  tsdbDataFReaderClose(&pReader->pFileReader);

  int32_t numOfTables = taosHashGetSize(pStatus->pTableMap);

  initFilesetIterator(&pStatus->fileIter, pReader->pReadSnap->fs.aDFileSet, pReader);
  resetDataBlockIterator(&pStatus->blockIter, pReader->order);

  // seed every table's scan position just outside the window on the side the scan starts from
  int64_t ts;
  if (ASCENDING_TRAVERSE(pReader->order)) {
    ts = pReader->window.skey - 1;
  } else {
    ts = pReader->window.ekey + 1;
  }
  resetDataBlockScanInfo(pStatus->pTableMap, ts);

  int32_t         code = 0;
  SDataBlockIter* pBlockIter = &pStatus->blockIter;

  if (pStatus->fileIter.numOfFiles != 0) {
    code = initForFirstBlockInFile(pReader, pBlockIter);
    if (code != TSDB_CODE_SUCCESS) {
      tsdbError("%p reset reader failed, numOfTables:%d, query range:%" PRId64 " - %" PRId64 " in query %s", pReader,
                numOfTables, pReader->window.skey, pReader->window.ekey, pReader->idStr);
      return code;
    }
  } else {
    // no data in files, only the in-memory buffers remain to be scanned
    pStatus->loadFromFile = false;
  }

  tsdbDebug("%p reset reader, suid:%" PRIu64 ", numOfTables:%d, skey:%" PRId64 ", query range:%" PRId64 " - %" PRId64
            " in query %s",
            pReader, pReader->suid, numOfTables, pCond->twindows.skey, pReader->window.skey, pReader->window.ekey,
            pReader->idStr);

  return code;
}
H
Hongze Cheng 已提交
3984

3985 3986 3987
// Map a block's row count onto a histogram bucket.
//
//   startRow    - row count at which bucket 0 begins
//   bucketRange - width of one bucket, in rows
//   numOfRows   - row count of the block being classified
//
// Returns (numOfRows - startRow) / bucketRange. Blocks at or below startRow
// fall into bucket 0 instead of yielding a negative index, and a non-positive
// bucketRange also yields 0 instead of dividing by zero. The upper end is not
// clamped here because the number of buckets is not known to this helper.
static int32_t getBucketIndex(int32_t startRow, int32_t bucketRange, int32_t numOfRows) {
  if (bucketRange <= 0 || numOfRows <= startRow) {
    return 0;
  }

  // widen before subtracting so that extreme arguments cannot overflow int32_t
  return (int32_t)(((int64_t)numOfRows - (int64_t)startRow) / bucketRange);
}
H
Hongze Cheng 已提交
3988

3989 3990 3991 3992
// Gather block distribution statistics over the data files visible to the reader.
//
// totalSize and totalRows are reset to 0; the remaining counters of
// pTableBlockInfo (numOfFiles, numOfBlocks, numOfSmallBlocks, maxRows, minRows,
// blockRowsHisto) are accumulated on top of the values the caller supplied.
// Blocks are visited through the reader's block iterator, moving on to the next
// file whenever the current one is exhausted.
//
// Returns TSDB_CODE_SUCCESS, or the error from initForFirstBlockInFile().
int32_t tsdbGetFileBlocksDistInfo(STsdbReader* pReader, STableBlockDistInfo* pTableBlockInfo) {
  int32_t        code = TSDB_CODE_SUCCESS;
  SReaderStatus* pStatus = &pReader->status;

  pTableBlockInfo->totalSize = 0;
  pTableBlockInfo->totalRows = 0;

  STsdbCfg* pCfg = &pReader->pTsdb->pVnode->config.tsdbCfg;
  pTableBlockInfo->defMinRows = pCfg->minRows;
  pTableBlockInfo->defMaxRows = pCfg->maxRows;

  // the configured [minRows, maxRows] span is cut into 20 histogram buckets
  int32_t bucketRange = ceil((pCfg->maxRows - pCfg->minRows) / 20.0);

  pTableBlockInfo->numOfFiles += 1;

  int32_t numOfTables = (int32_t)taosHashGetSize(pStatus->pTableMap);
  int     smallBlockRows = 4096;  // blocks with fewer rows than this count as "small"

  SDataBlockIter* pBlockIter = &pStatus->blockIter;
  pTableBlockInfo->numOfFiles += pStatus->fileIter.numOfFiles;

  if (pBlockIter->numOfBlocks > 0) {
    pTableBlockInfo->numOfBlocks += pBlockIter->numOfBlocks;
  }

  pTableBlockInfo->numOfTables = numOfTables;

  bool hasNext = (pBlockIter->numOfBlocks > 0);
  for (;;) {
    if (!hasNext) {
      // current file is exhausted: position on the first block of the next file, or stop
      code = initForFirstBlockInFile(pReader, pBlockIter);
      if ((code != TSDB_CODE_SUCCESS) || (pReader->status.loadFromFile == false)) {
        break;
      }

      pTableBlockInfo->numOfBlocks += pBlockIter->numOfBlocks;
      hasNext = (pBlockIter->numOfBlocks > 0);
      continue;
    }

    SDataBlk* pBlock = getCurrentBlock(pBlockIter);
    int32_t   numOfRows = pBlock->nRow;

    pTableBlockInfo->totalRows += numOfRows;

    if (numOfRows > pTableBlockInfo->maxRows) {
      pTableBlockInfo->maxRows = numOfRows;
    }

    if (numOfRows < pTableBlockInfo->minRows) {
      pTableBlockInfo->minRows = numOfRows;
    }

    if (numOfRows < smallBlockRows) {
      pTableBlockInfo->numOfSmallBlocks += 1;
    }

    // NOTE(review): bucketIndex is used as an array subscript without a range
    // check - confirm it always lies inside blockRowsHisto.
    int32_t bucketIndex = getBucketIndex(pTableBlockInfo->defMinRows, bucketRange, numOfRows);
    pTableBlockInfo->blockRowsHisto[bucketIndex]++;

    hasNext = blockIteratorNext(&pStatus->blockIter, pReader->idStr);
  }

  return code;
}
H
Hongze Cheng 已提交
4057

H
refact  
Hongze Cheng 已提交
4058
// Count the rows held in the in-memory tables (mem and imem) for every table in
// the reader's table map.
//
// The memtables consulted are the ones captured in the reader's read snapshot,
// so the NULL checks are made against the snapshot too. Testing the live
// pTsdb->mem / pTsdb->imem pointers instead can disagree with the snapshot
// and lead either to a NULL memtable being queried or to snapshot rows being
// skipped. A reader without a snapshot contributes no rows.
//
// Side effect: pReader->status.pTableIter is used as the iteration cursor and
// is NULL when the function returns.
int64_t tsdbGetNumOfRowsInMemTable(STsdbReader* pReader) {
  int64_t rows = 0;

  SReaderStatus* pStatus = &pReader->status;
  pStatus->pTableIter = taosHashIterate(pStatus->pTableMap, NULL);

  while (pStatus->pTableIter != NULL) {
    STableBlockScanInfo* pBlockScanInfo = pStatus->pTableIter;

    if (pReader->pReadSnap != NULL) {
      if (pReader->pReadSnap->pMem != NULL) {
        STbData* d = tsdbGetTbDataFromMemTable(pReader->pReadSnap->pMem, pReader->suid, pBlockScanInfo->uid);
        if (d != NULL) {
          rows += tsdbGetNRowsInTbData(d);
        }
      }

      if (pReader->pReadSnap->pIMem != NULL) {
        STbData* di = tsdbGetTbDataFromMemTable(pReader->pReadSnap->pIMem, pReader->suid, pBlockScanInfo->uid);
        if (di != NULL) {
          rows += tsdbGetNRowsInTbData(di);
        }
      }
    }

    // current table is done, move on to the next one
    pStatus->pTableIter = taosHashIterate(pStatus->pTableMap, pStatus->pTableIter);
  }

  return rows;
}
D
dapan1121 已提交
4089

L
Liu Jicong 已提交
4090
// Look up the schema of the table identified by uid.
//
// Outputs:
//   *suid    - the super table uid when uid names a child table, 0 otherwise.
//              Left untouched when the first meta lookup fails.
//   *pSchema - result of metaGetTbTSchema() for the table at the schema version
//              recorded in the meta store (the super table's version for a
//              child table).
//
// Returns TSDB_CODE_SUCCESS, or TSDB_CODE_TDB_INVALID_TABLE_ID (also stored in
// terrno) when a meta lookup fails.
int32_t tsdbGetTableSchema(SVnode* pVnode, int64_t uid, STSchema** pSchema, int64_t* suid) {
  int32_t     sversion = 1;
  SMetaReader mr = {0};

  metaReaderInit(&mr, pVnode->pMeta, 0);

  int32_t code = metaGetTableEntryByUid(&mr, uid);
  if (code != TSDB_CODE_SUCCESS) {
    terrno = TSDB_CODE_TDB_INVALID_TABLE_ID;
    metaReaderClear(&mr);
    return terrno;
  }

  *suid = 0;

  if (mr.me.type != TSDB_CHILD_TABLE) {
    ASSERT(mr.me.type == TSDB_NORMAL_TABLE);
    sversion = mr.me.ntbEntry.schemaRow.version;
  } else {
    // a child table takes its schema version from its super table entry:
    // clear the decoder, then reuse the same meta reader for the second lookup
    tDecoderClear(&mr.coder);
    *suid = mr.me.ctbEntry.suid;

    code = metaGetTableEntryByUid(&mr, *suid);
    if (code != TSDB_CODE_SUCCESS) {
      terrno = TSDB_CODE_TDB_INVALID_TABLE_ID;
      metaReaderClear(&mr);
      return terrno;
    }

    sversion = mr.me.stbEntry.schemaRow.version;
  }

  metaReaderClear(&mr);

  *pSchema = metaGetTbTSchema(pVnode->pMeta, uid, sversion, 1);
  return TSDB_CODE_SUCCESS;
}
H
Hongze Cheng 已提交
4124

H
Haojun Liao 已提交
4125
// Capture a read snapshot of pTsdb: the current mem and imem tables (each with
// a reference taken) plus a reference to the file system state.
//
//   pTsdb  - the tsdb instance to snapshot
//   ppSnap - receives the new snapshot on success and is set to NULL on failure
//   idStr  - identifier included in the trace log
//
// Returns 0 on success. On failure an error code is returned and everything
// acquired so far (memtable references, file system reference, the snapshot
// struct itself) is released, so the caller has nothing to clean up. A
// successful snapshot must be released with tsdbUntakeReadSnap().
int32_t tsdbTakeReadSnap(STsdb* pTsdb, STsdbReadSnap** ppSnap, const char* idStr) {
  int32_t code = 0;
  bool    fsRefTaken = false;

  *ppSnap = NULL;

  // alloc
  STsdbReadSnap* pSnap = (STsdbReadSnap*)taosMemoryCalloc(1, sizeof(STsdbReadSnap));
  if (pSnap == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _exit;
  }

  // lock
  code = taosThreadRwlockRdlock(&pTsdb->rwLock);
  if (code) {
    code = TAOS_SYSTEM_ERROR(code);
    goto _exit;
  }

  // take snapshot
  pSnap->pMem = pTsdb->mem;
  pSnap->pIMem = pTsdb->imem;

  if (pSnap->pMem) {
    tsdbRefMemTable(pSnap->pMem);
  }

  if (pSnap->pIMem) {
    tsdbRefMemTable(pSnap->pIMem);
  }

  // fs
  code = tsdbFSRef(pTsdb, &pSnap->fs);
  if (code) {
    taosThreadRwlockUnlock(&pTsdb->rwLock);
    goto _exit;
  }
  fsRefTaken = true;

  // unlock
  code = taosThreadRwlockUnlock(&pTsdb->rwLock);
  if (code) {
    code = TAOS_SYSTEM_ERROR(code);
    goto _exit;
  }

  // success: hand ownership to the caller
  *ppSnap = pSnap;
  pSnap = NULL;

  tsdbTrace("vgId:%d, take read snapshot, %s", TD_VID(pTsdb->pVnode), idStr);

_exit:
  // pSnap is still non-NULL only on a failure path; undo whatever was acquired.
  // The struct is zero-initialized, so members that were never set stay NULL.
  if (pSnap != NULL) {
    if (fsRefTaken) {
      tsdbFSUnref(pTsdb, &pSnap->fs);
    }

    if (pSnap->pIMem) {
      tsdbUnrefMemTable(pSnap->pIMem);
    }

    if (pSnap->pMem) {
      tsdbUnrefMemTable(pSnap->pMem);
    }

    taosMemoryFree(pSnap);
  }

  return code;
}

H
Haojun Liao 已提交
4173
// Release a read snapshot obtained from tsdbTakeReadSnap(): drop the memtable
// references it holds, drop its file system reference and free the snapshot
// itself. A NULL pSnap is accepted and only produces the trace log entry.
void tsdbUntakeReadSnap(STsdb* pTsdb, STsdbReadSnap* pSnap, const char* idStr) {
  if (pSnap != NULL) {
    if (pSnap->pMem != NULL) {
      tsdbUnrefMemTable(pSnap->pMem);
    }

    if (pSnap->pIMem != NULL) {
      tsdbUnrefMemTable(pSnap->pIMem);
    }

    tsdbFSUnref(pTsdb, &pSnap->fs);
    taosMemoryFree(pSnap);
  }

  tsdbTrace("vgId:%d, untake read snapshot, %s", TD_VID(pTsdb->pVnode), idStr);
}