tsdbRead.c 104.2 KB
Newer Older
H
hjxilinx 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

H
Hongze Cheng 已提交
16
#include "tsdb.h"
17
// True when the traversal order `o` equals TSDB_ORDER_ASC. The argument is parenthesized so that
// compound expressions (e.g. a conditional expression) expand with the intended precedence.
#define ASCENDING_TRAVERSE(o) ((o) == TSDB_ORDER_ASC)
H
Hongze Cheng 已提交
18

19 20 21 22 23 24
// Content selector constants with the values 1, 2 and 3.
// NOTE(review): presumably these tag the previous / main / next row sets of a query -- confirm at the use sites.
typedef enum {
  EXTERNAL_ROWS_PREV = 1,
  EXTERNAL_ROWS_MAIN = 2,
  EXTERNAL_ROWS_NEXT = 3,
} EContentData;

25
typedef struct {
dengyihao's avatar
dengyihao 已提交
26
  STbDataIter* iter;
27 28 29 30
  int32_t      index;
  bool         hasVal;
} SIterInfo;

H
Haojun Liao 已提交
31
typedef struct STableBlockScanInfo {
dengyihao's avatar
dengyihao 已提交
32 33
  uint64_t  uid;
  TSKEY     lastKey;
34
  SMapData  mapData;     // block info (compressed)
dengyihao's avatar
dengyihao 已提交
35 36 37 38 39 40
  SArray*   pBlockList;  // block data index list
  SIterInfo iter;        // mem buffer skip list iterator
  SIterInfo iiter;       // imem buffer skip list iterator
  SArray*   delSkyline;  // delete info for this table
  int32_t   fileDelIndex;
  bool      iterInit;  // whether to initialize the in-memory skip list iterator or not
H
Haojun Liao 已提交
41 42 43
} STableBlockScanInfo;

// Pairs a table uid with an offset value; used as the element type of SBlockOrderSupporter.pDataBlockInfo.
typedef struct SBlockOrderWrapper {
  int64_t uid;
  int64_t offset;
} SBlockOrderWrapper;
H
Hongze Cheng 已提交
47 48

typedef struct SBlockOrderSupporter {
49 50 51 52
  SBlockOrderWrapper** pDataBlockInfo;
  int32_t*             indexPerTable;
  int32_t*             numOfBlocksPerTable;
  int32_t              numOfTables;
H
Hongze Cheng 已提交
53 54 55
} SBlockOrderSupporter;

// Counters and accumulated timings for the I/O performed by a reader.
// The *Time fields in this file are accumulated from microsecond deltas divided by 1000.0 (milliseconds).
typedef struct SIOCostSummary {
  int64_t numOfBlocks;
  double  blockLoadTime;
  double  buildmemBlock;
  int64_t headFileLoad;
  double  headFileLoadTime;
  int64_t smaData;
  double  smaLoadTime;
} SIOCostSummary;

typedef struct SBlockLoadSuppInfo {
66
  SArray*          pColAgg;
67
  SColumnDataAgg   tsColAgg;
C
Cary Xu 已提交
68
  SColumnDataAgg** plist;
69 70
  int16_t*         colIds;    // column ids for loading file block data
  char**           buildBuf;  // build string tmp buffer, todo remove it later after all string format being updated.
H
Hongze Cheng 已提交
71 72
} SBlockLoadSuppInfo;

73
typedef struct SFilesetIter {
H
Hongze Cheng 已提交
74 75 76 77
  int32_t numOfFiles;  // number of total files
  int32_t index;       // current accessed index in the list
  SArray* pFileList;   // data file list
  int32_t order;
78
} SFilesetIter;
H
Haojun Liao 已提交
79 80

// Identifies one file data block by owning table and position within that table's block list.
typedef struct SFileDataBlockInfo {
  // index position in STableBlockScanInfo in order to check whether neighbor block overlaps with it
  uint64_t uid;
  int32_t  tbBlockIdx;
} SFileDataBlockInfo;

typedef struct SDataBlockIter {
87 88 89 90 91
  int32_t   numOfBlocks;
  int32_t   index;
  SArray*   blockList;  // SArray<SFileDataBlockInfo>
  int32_t   order;
  SBlock    block;  // current SBlock data
92
  SHashObj* pTableMap;
H
Haojun Liao 已提交
93 94 95
} SDataBlockIter;

// Progress of copying the currently loaded file block into the result block.
typedef struct SFileBlockDumpInfo {
  int32_t totalRows;
  int32_t rowIndex;   // next row of the file block to copy
  int64_t lastKey;
  bool    allDumped;  // set by setBlockAllDumped(), cleared when a new file block is loaded
} SFileBlockDumpInfo;

H
Haojun Liao 已提交
102
// Inclusive range of data versions a query may see; blocks entirely outside it are skipped when loading.
typedef struct SVersionRange {
  uint64_t minVer;
  uint64_t maxVer;
} SVersionRange;

H
Haojun Liao 已提交
107
typedef struct SReaderStatus {
dengyihao's avatar
dengyihao 已提交
108 109
  bool                 loadFromFile;  // check file stage
  SHashObj*            pTableMap;     // SHash<STableBlockScanInfo>
110
  STableBlockScanInfo* pTableIter;    // table iterator used in building in-memory buffer data blocks.
111
  SFileBlockDumpInfo   fBlockDumpInfo;
112 113 114 115 116
  SDFileSet*           pCurrentFileset;  // current opened file set
  SBlockData           fileBlockData;
  SFilesetIter         fileIter;
  SDataBlockIter       blockIter;
  bool                 composedDataBlock;  // the returned data block is a composed block or not
H
Haojun Liao 已提交
117 118
} SReaderStatus;

H
Hongze Cheng 已提交
119
struct STsdbReader {
H
Haojun Liao 已提交
120 121 122 123 124 125 126
  STsdb*             pTsdb;
  uint64_t           suid;
  int16_t            order;
  STimeWindow        window;  // the primary query time window that applies to all queries
  SSDataBlock*       pResBlock;
  int32_t            capacity;
  SReaderStatus      status;
127 128
  char*              idStr;  // query info handle, for debug purpose
  int32_t            type;   // query type: 1. retrieve all data blocks, 2. retrieve direct prev|next rows
H
Hongze Cheng 已提交
129
  SBlockLoadSuppInfo suppInfo;
H
Hongze Cheng 已提交
130
  STsdbReadSnap*     pReadSnap;
131 132 133 134
  SIOCostSummary     cost;
  STSchema*          pSchema;
  SDataFReader*      pFileReader;
  SVersionRange      verRange;
135

136 137
  int32_t      step;
  STsdbReader* innerReader[2];
H
Hongze Cheng 已提交
138
};
H
Hongze Cheng 已提交
139

H
Haojun Liao 已提交
140
static SFileDataBlockInfo* getCurrentBlockInfo(SDataBlockIter* pBlockIter);
141 142
static int      buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, int64_t endKey, int32_t capacity,
                                          STsdbReader* pReader);
143
static TSDBROW* getValidRow(SIterInfo* pIter, const SArray* pDelList, STsdbReader* pReader);
144 145
static int32_t  doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pScanInfo, STsdbReader* pReader,
                                        SRowMerger* pMerger);
146
static int32_t  doMergeRowsInBuf(SIterInfo* pIter, uint64_t uid, int64_t ts, SArray* pDelList, SRowMerger* pMerger,
dengyihao's avatar
dengyihao 已提交
147
                                 STsdbReader* pReader);
148
static int32_t  doAppendRowFromTSRow(SSDataBlock* pBlock, STsdbReader* pReader, STSRow* pTSRow);
149 150
static int32_t  doAppendRowFromBlock(SSDataBlock* pResBlock, STsdbReader* pReader, SBlockData* pBlockData,
                                     int32_t rowIndex);
151 152
static void     setComposedBlockFlag(STsdbReader* pReader, bool composed);
static void     updateSchema(TSDBROW* pRow, uint64_t uid, STsdbReader* pReader);
153
static bool     hasBeenDropped(const SArray* pDelList, int32_t* index, TSDBKEY* pKey, int32_t order);
154

dengyihao's avatar
dengyihao 已提交
155 156
static void doMergeMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter, SArray* pDelList, STSRow** pTSRow,
                             STsdbReader* pReader);
157 158
static void doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader,
                               STSRow** pTSRow);
dengyihao's avatar
dengyihao 已提交
159 160 161 162
static int32_t initDelSkylineIterator(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader, STbData* pMemTbData,
                                      STbData* piMemTbData);
static STsdb*  getTsdbByRetentions(SVnode* pVnode, TSKEY winSKey, SRetention* retentions, const char* idstr,
                                   int8_t* pLevel);
163
static SVersionRange getQueryVerRange(SVnode* pVnode, SQueryTableDataCond* pCond, int8_t level);
H
Haojun Liao 已提交
164

165 166 167
// Populate pReader->suppInfo from the column layout of pBlock: colIds[i] receives the id of column i,
// and buildBuf[i] receives a scratch buffer of the column's byte width for variable-length columns.
//
// @param pReader reader whose suppInfo.colIds / suppInfo.buildBuf are assigned
// @param pBlock  block whose columns define the layout
// @return TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY when any allocation failed
//
// On failure of the two top-level arrays both pointers are released and reset to NULL. When only a
// per-column scratch buffer fails, the arrays stay in place (the failed slot is NULL, colIds is fully
// populated) and the error is reported to the caller.
static int32_t setColumnIdSlotList(STsdbReader* pReader, SSDataBlock* pBlock) {
  SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo;

  size_t numOfCols = blockDataGetNumOfCols(pBlock);

  pSupInfo->colIds = taosMemoryMalloc(numOfCols * sizeof(int16_t));
  pSupInfo->buildBuf = taosMemoryCalloc(numOfCols, POINTER_BYTES);
  if (pSupInfo->buildBuf == NULL || pSupInfo->colIds == NULL) {
    taosMemoryFree(pSupInfo->colIds);
    taosMemoryFree(pSupInfo->buildBuf);
    // never leave freed pointers behind: a later teardown would otherwise release them a second time
    pSupInfo->colIds = NULL;
    pSupInfo->buildBuf = NULL;
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  int32_t code = TSDB_CODE_SUCCESS;
  for (size_t i = 0; i < numOfCols; ++i) {
    SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, i);
    pSupInfo->colIds[i] = pCol->info.colId;

    if (IS_VAR_DATA_TYPE(pCol->info.type)) {
      pSupInfo->buildBuf[i] = taosMemoryMalloc(pCol->info.bytes);
      if (pSupInfo->buildBuf[i] == NULL) {
        // keep filling colIds so the arrays stay fully initialized, but report the failure
        code = TSDB_CODE_OUT_OF_MEMORY;
      }
    }
  }

  return code;
}
H
Hongze Cheng 已提交
189

190
// Build the uid -> STableBlockScanInfo map for the tables of a query.
//
// @param pTsdbReader reader supplying the traversal order, query window and id string
// @param idList      array of numOfTables table keys
// @param numOfTables number of entries in idList
// @return the new hash map, or NULL when the map could not be created
//
// Every entry starts with lastKey set to the start key of the reader's query window.
// NOTE(review): the result of taosHashPut is not checked here -- confirm whether a failed insert
// needs to be reported.
static SHashObj* createDataBlockScanInfo(STsdbReader* pTsdbReader, const STableKeyInfo* idList, int32_t numOfTables) {
  // allocate buffer in order to load data blocks from file
  // todo use simple hash instead, optimize the memory consumption
  SHashObj* pTableMap =
      taosHashInit(numOfTables, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
  if (pTableMap == NULL) {
    return NULL;
  }

  for (int32_t j = 0; j < numOfTables; ++j) {
    STableBlockScanInfo info = {.lastKey = 0, .uid = idList[j].uid};
    if (ASCENDING_TRAVERSE(pTsdbReader->order)) {
      if (info.lastKey == INT64_MIN || info.lastKey < pTsdbReader->window.skey) {
        info.lastKey = pTsdbReader->window.skey;
      }

      ASSERT(info.lastKey >= pTsdbReader->window.skey && info.lastKey <= pTsdbReader->window.ekey);
    } else {
      info.lastKey = pTsdbReader->window.skey;
    }

    taosHashPut(pTableMap, &info.uid, sizeof(uint64_t), &info, sizeof(info));
    // uid is uint64_t, so it is printed with the unsigned conversion
    tsdbDebug("%p check table uid:%" PRIu64 " from lastKey:%" PRId64 " %s", pTsdbReader, info.uid, info.lastKey,
              pTsdbReader->idStr);
  }

  tsdbDebug("%p create %d tables scan-info, size:%.2f Kb, %s", pTsdbReader, numOfTables,
            (sizeof(STableBlockScanInfo) * numOfTables) / 1024.0, pTsdbReader->idStr);

  return pTableMap;
}
H
Hongze Cheng 已提交
221

222 223 224
// Reset the in-memory iteration state of every table in the map so the iterators are rebuilt on next use.
// Both the mem and imem iterators are released, matching what destroyBlockScanInfo() does; the
// per-table block list and map data are kept.
//
// @param pTableMap hash map of STableBlockScanInfo entries
static void resetDataBlockScanInfo(SHashObj* pTableMap) {
  STableBlockScanInfo* p = NULL;

  while ((p = taosHashIterate(pTableMap, p)) != NULL) {
    p->iterInit = false;
    p->iter.hasVal = false;
    p->iiter.hasVal = false;

    if (p->iter.iter != NULL) {
      p->iter.iter = tsdbTbDataIterDestroy(p->iter.iter);
    }

    // release the imem iterator as well; otherwise it stays allocated once iterInit is cleared
    if (p->iiter.iter != NULL) {
      p->iiter.iter = tsdbTbDataIterDestroy(p->iiter.iter);
    }

    p->delSkyline = taosArrayDestroy(p->delSkyline);
  }
}

236 237 238 239 240 241 242 243
// Release everything owned by each STableBlockScanInfo in the map (both iterators, the delete skyline,
// the block list and the map data) and then destroy the map itself.
//
// @param pTableMap hash map of STableBlockScanInfo entries; must not be used after this call
static void destroyBlockScanInfo(SHashObj* pTableMap) {
  STableBlockScanInfo* p = NULL;

  while ((p = taosHashIterate(pTableMap, p)) != NULL) {
    p->iterInit = false;
    p->iiter.hasVal = false;

    if (p->iter.iter != NULL) {
      p->iter.iter = tsdbTbDataIterDestroy(p->iter.iter);
    }

    if (p->iiter.iter != NULL) {
      p->iiter.iter = tsdbTbDataIterDestroy(p->iiter.iter);
    }

    p->delSkyline = taosArrayDestroy(p->delSkyline);
    p->pBlockList = taosArrayDestroy(p->pBlockList);
    tMapDataClear(&p->mapData);
  }

  taosHashCleanup(pTableMap);
}

259
// A time window is empty when its start key lies after its end key.
//
// @param pWindow window to test; must not be NULL
// @return true when pWindow->skey > pWindow->ekey
static bool isEmptyQueryTimeWindow(STimeWindow* pWindow) {
  ASSERT(pWindow != NULL);
  return pWindow->skey > pWindow->ekey;
}
H
Hongze Cheng 已提交
263

264 265 266
// Clamp the start of the query time window to the retention (keep) boundary so that expired data is
// never returned to the client, even if the query asks for it.
//
// @param pTsdb   supplies keepCfg (precision and keep2)
// @param pWindow requested window; not modified
// @return a copy of *pWindow whose skey is raised to the earliest retained timestamp when needed
static STimeWindow updateQueryTimeWindow(STsdb* pTsdb, STimeWindow* pWindow) {
  STsdbKeepCfg* pCfg = &pTsdb->keepCfg;

  int64_t now = taosGetTimestamp(pCfg->precision);
  int64_t earliestTs = now - (tsTickPerMin[pCfg->precision] * pCfg->keep2) + 1;  // needs to add one tick

  STimeWindow win = *pWindow;
  if (win.skey < earliestTs) {
    win.skey = earliestTs;
  }

  return win;
}
H
Hongze Cheng 已提交
279

H
Haojun Liao 已提交
280
// Shrink *capacity (a row count) so that capacity * rowLength does not exceed 2 MB, where the row length
// is the sum of the byte widths of the queried columns.
//
// @param pCond    query condition supplying numOfCols and colList[i].bytes
// @param capacity in/out row capacity; only ever reduced
static void limitOutputBufferSize(const SQueryTableDataCond* pCond, int32_t* capacity) {
  // accumulate and multiply in 64 bits: capacity * rowLen can exceed INT32_MAX for wide rows
  int64_t rowLen = 0;
  for (int32_t i = 0; i < pCond->numOfCols; ++i) {
    rowLen += pCond->colList[i].bytes;
  }

  // make sure the output SSDataBlock size be less than 2MB.
  const int64_t TWOMB = 2 * 1024 * 1024;
  if ((int64_t)(*capacity) * rowLen > TWOMB) {
    (*capacity) = (int32_t)(TWOMB / rowLen);
  }
}

// init file iterator
H
Hongze Cheng 已提交
294 295
static int32_t initFilesetIterator(SFilesetIter* pIter, SArray* aDFileSet, int32_t order, const char* idstr) {
  size_t numOfFileset = taosArrayGetSize(aDFileSet);
296

297 298
  pIter->index = ASCENDING_TRAVERSE(order) ? -1 : numOfFileset;
  pIter->order = order;
H
Hongze Cheng 已提交
299
  pIter->pFileList = aDFileSet;
300
  pIter->numOfFiles = numOfFileset;
H
Haojun Liao 已提交
301

H
Haojun Liao 已提交
302
  tsdbDebug("init fileset iterator, total files:%d %s", pIter->numOfFiles, idstr);
H
Haojun Liao 已提交
303 304 305
  return TSDB_CODE_SUCCESS;
}

306
// Advance to the next file set whose key range overlaps the reader's query window and open it.
//
// @param pIter   file set iterator; its index is advanced in the iterator's order
// @param pReader reader; pFileReader and status.pCurrentFileset are updated
// @return true when a qualifying file set is open; false when the list is exhausted, the remaining
//         file sets cannot overlap the window, or opening a file set failed
//
// NOTE(review): the error code of a failed open is not propagated; the caller only sees `false`.
static bool filesetIteratorNext(SFilesetIter* pIter, STsdbReader* pReader) {
  bool    asc = ASCENDING_TRAVERSE(pIter->order);
  int32_t step = asc ? 1 : -1;
  pIter->index += step;

  if ((asc && pIter->index >= pIter->numOfFiles) || ((!asc) && pIter->index < 0)) {
    return false;
  }

  // check file the time range of coverage
  STimeWindow win = {0};

  while (1) {
    // close the previously opened file set before moving on
    if (pReader->pFileReader != NULL) {
      tsdbDataFReaderClose(&pReader->pFileReader);
    }

    pReader->status.pCurrentFileset = (SDFileSet*)taosArrayGet(pIter->pFileList, pIter->index);

    int32_t code = tsdbDataFReaderOpen(&pReader->pFileReader, pReader->pTsdb, pReader->status.pCurrentFileset);
    if (code != TSDB_CODE_SUCCESS) {
      goto _err;
    }

    pReader->cost.headFileLoad += 1;

    int32_t fid = pReader->status.pCurrentFileset->fid;
    tsdbFidKeyRange(fid, pReader->pTsdb->keepCfg.days, pReader->pTsdb->keepCfg.precision, &win.skey, &win.ekey);

    // current file are no longer overlapped with query time window, ignore remain files
    if ((asc && win.skey > pReader->window.ekey) || (!asc && win.ekey < pReader->window.skey)) {
      tsdbDebug("%p remain files are not qualified for qrange:%" PRId64 "-%" PRId64 ", ignore, %s", pReader,
                pReader->window.skey, pReader->window.ekey, pReader->idStr);
      return false;
    }

    // this file set lies entirely before the window (in traversal direction): skip it and try the next one
    if ((asc && (win.ekey < pReader->window.skey)) || ((!asc) && (win.skey > pReader->window.ekey))) {
      pIter->index += step;
      if ((asc && pIter->index >= pIter->numOfFiles) || ((!asc) && pIter->index < 0)) {
        return false;
      }
      continue;
    }

    tsdbDebug("%p file found fid:%d for qrange:%" PRId64 "-%" PRId64 ", %s", pReader, fid, pReader->window.skey,
              pReader->window.ekey, pReader->idStr);
    return true;
  }

_err:
  return false;
}

359
// Put the data block iterator back into its "no blocks loaded" state for a new traversal.
// The block list is created on first use and merely emptied on later calls.
//
// @param pIter     iterator to reset
// @param order     traversal order to record
// @param pTableMap table map to associate with the iterator
static void resetDataBlockIterator(SDataBlockIter* pIter, int32_t order, SHashObj* pTableMap) {
  pIter->order = order;
  pIter->index = -1;
  pIter->numOfBlocks = -1;
  if (pIter->blockList == NULL) {
    pIter->blockList = taosArrayInit(4, sizeof(SFileDataBlockInfo));
  } else {
    taosArrayClear(pIter->blockList);
  }
  pIter->pTableMap = pTableMap;
}

L
Liu Jicong 已提交
371
// Release the iterator's block list. The pointer is overwritten with the value returned by
// taosArrayDestroy (the same idiom this file uses for delSkyline / pBlockList) so that a later
// resetDataBlockIterator() does not operate on a freed array.
static void cleanupDataBlockIterator(SDataBlockIter* pIter) {
  pIter->blockList = taosArrayDestroy(pIter->blockList);
}
H
Haojun Liao 已提交
372

H
Haojun Liao 已提交
373
// Set the initial traversal state: no current table and the file stage enabled.
//
// @param pStatus status structure to initialize (only pTableIter and loadFromFile are written)
static void initReaderStatus(SReaderStatus* pStatus) {
  pStatus->pTableIter = NULL;
  pStatus->loadFromFile = true;
}

378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400
// Create the result data block for a query: one column per entry of pCond->colList, with room for
// `capacity` rows.
//
// @param pCond    query condition supplying numOfCols and colList
// @param capacity number of rows to reserve
// @return the new block, or NULL with terrno set when creation or reservation failed
static SSDataBlock* createResBlock(SQueryTableDataCond* pCond, int32_t capacity) {
  SSDataBlock* pResBlock = createDataBlock();
  if (pResBlock == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  for (int32_t i = 0; i < pCond->numOfCols; ++i) {
    SColumnInfoData colInfo = {{0}, 0};
    colInfo.info = pCond->colList[i];
    blockDataAppendColInfo(pResBlock, &colInfo);
  }

  int32_t code = blockDataEnsureCapacity(pResBlock, capacity);
  if (code != TSDB_CODE_SUCCESS) {
    terrno = code;
    // destroy the whole block: freeing only the struct would leak the column array and any
    // column buffers that were already allocated
    blockDataDestroy(pResBlock);
    return NULL;
  }

  return pResBlock;
}

401 402
// Allocate and initialize a reader for the given query condition.
//
// @param pVnode   vnode to read from
// @param pCond    query condition (time window, order, column list, ...); numOfCols must be > 0
// @param ppReader receives the new reader on success, NULL on failure
// @param capacity requested row capacity of the result block; may be reduced to bound the block size
// @param idstr    optional id string, duplicated into the reader for logging
// @return TSDB_CODE_SUCCESS or an error code; on failure everything acquired so far is released
//         through tsdbReaderClose()
static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, STsdbReader** ppReader, int32_t capacity,
                                const char* idstr) {
  int32_t      code = 0;
  int8_t       level = 0;
  STsdbReader* pReader = (STsdbReader*)taosMemoryCalloc(1, sizeof(*pReader));
  if (pReader == NULL) {
    // nothing has been acquired yet, so there is nothing to close
    *ppReader = NULL;
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  initReaderStatus(&pReader->status);

  pReader->pTsdb = getTsdbByRetentions(pVnode, pCond->twindows.skey, pVnode->config.tsdbCfg.retentions, idstr, &level);
  pReader->suid = pCond->suid;
  pReader->order = pCond->order;
  pReader->capacity = capacity;
  pReader->idStr = (idstr != NULL) ? strdup(idstr) : NULL;
  pReader->verRange = getQueryVerRange(pVnode, pCond, level);
  pReader->type = pCond->type;
  pReader->window = updateQueryTimeWindow(pReader->pTsdb, &pCond->twindows);

  ASSERT(pCond->numOfCols > 0);

  limitOutputBufferSize(pCond, &pReader->capacity);

  // allocate buffer in order to load data blocks from file
  SBlockLoadSuppInfo* pSup = &pReader->suppInfo;
  pSup->pColAgg = taosArrayInit(4, sizeof(SColumnDataAgg));
  pSup->plist = taosMemoryCalloc(pCond->numOfCols, POINTER_BYTES);
  if (pSup->pColAgg == NULL || pSup->plist == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _end;
  }

  pSup->tsColAgg.colId = PRIMARYKEY_TIMESTAMP_COL_ID;

  code = tBlockDataInit(&pReader->status.fileBlockData);
  if (code != TSDB_CODE_SUCCESS) {
    terrno = code;
    goto _end;
  }

  pReader->pResBlock = createResBlock(pCond, pReader->capacity);
  if (pReader->pResBlock == NULL) {
    code = terrno;
    goto _end;
  }

  // a reader without its column-id / scratch buffers is unusable, so do not hand it out
  // NOTE(review): confirm tsdbReaderClose tolerates the state setColumnIdSlotList leaves behind on failure
  code = setColumnIdSlotList(pReader, pReader->pResBlock);
  if (code != TSDB_CODE_SUCCESS) {
    goto _end;
  }

  *ppReader = pReader;
  return code;

_end:
  tsdbReaderClose(pReader);
  *ppReader = NULL;
  return code;
}
H
Hongze Cheng 已提交
459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491

// void tsdbResetQueryHandleForNewTable(STsdbReader* queryHandle, SQueryTableDataCond* pCond, STableListInfo* tableList,
//                                      int32_t tWinIdx) {
//   STsdbReader* pTsdbReadHandle = queryHandle;

//   pTsdbReadHandle->order = pCond->order;
//   pTsdbReadHandle->window = pCond->twindows[tWinIdx];
//   pTsdbReadHandle->type = TSDB_QUERY_TYPE_ALL;
//   pTsdbReadHandle->cur.fid = -1;
//   pTsdbReadHandle->cur.win = TSWINDOW_INITIALIZER;
//   pTsdbReadHandle->checkFiles = true;
//   pTsdbReadHandle->activeIndex = 0;  // current active table index
//   pTsdbReadHandle->locateStart = false;
//   pTsdbReadHandle->loadExternalRow = pCond->loadExternalRows;

//   if (ASCENDING_TRAVERSE(pCond->order)) {
//     assert(pTsdbReadHandle->window.skey <= pTsdbReadHandle->window.ekey);
//   } else {
//     assert(pTsdbReadHandle->window.skey >= pTsdbReadHandle->window.ekey);
//   }

//   // allocate buffer in order to load data blocks from file
//   memset(pTsdbReadHandle->suppInfo.pstatis, 0, sizeof(SColumnDataAgg));
//   memset(pTsdbReadHandle->suppInfo.plist, 0, POINTER_BYTES);

//   tsdbInitDataBlockLoadInfo(&pTsdbReadHandle->dataBlockLoadInfo);
//   tsdbInitCompBlockLoadInfo(&pTsdbReadHandle->compBlockLoadInfo);

//   SArray* pTable = NULL;
//   //  STsdbMeta* pMeta = tsdbGetMeta(pTsdbReadHandle->pTsdb);

//   //  pTsdbReadHandle->pTableCheckInfo = destroyTableCheckInfo(pTsdbReadHandle->pTableCheckInfo);

H
Haojun Liao 已提交
492
//   pTsdbReadHandle->pTableCheckInfo = NULL;  // createDataBlockScanInfo(pTsdbReadHandle, groupList, pMeta,
H
Hongze Cheng 已提交
493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515
//                                             // &pTable);
//   if (pTsdbReadHandle->pTableCheckInfo == NULL) {
//     //    tsdbReaderClose(pTsdbReadHandle);
//     terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
//   }

//   //  pTsdbReadHandle->prev = doFreeColumnInfoData(pTsdbReadHandle->prev);
//   //  pTsdbReadHandle->next = doFreeColumnInfoData(pTsdbReadHandle->next);
// }

// SArray* tsdbGetQueriedTableList(STsdbReader** pHandle) {
//   assert(pHandle != NULL);

//   STsdbReader* pTsdbReadHandle = (STsdbReader*)pHandle;

//   size_t  size = taosArrayGetSize(pTsdbReadHandle->pTableCheckInfo);
//   SArray* res = taosArrayInit(size, POINTER_BYTES);
//   return res;
// }

// static int32_t binarySearchForBlock(SBlock* pBlock, int32_t numOfBlocks, TSKEY skey, int32_t order) {
//   int32_t firstSlot = 0;
//   int32_t lastSlot = numOfBlocks - 1;
H
Hongze Cheng 已提交
516

H
Hongze Cheng 已提交
517
//   int32_t midSlot = firstSlot;
H
Hongze Cheng 已提交
518

H
Hongze Cheng 已提交
519 520 521
//   while (1) {
//     numOfBlocks = lastSlot - firstSlot + 1;
//     midSlot = (firstSlot + (numOfBlocks >> 1));
H
Hongze Cheng 已提交
522

H
Hongze Cheng 已提交
523
//     if (numOfBlocks == 1) break;
H
Hongze Cheng 已提交
524

H
Hongze Cheng 已提交
525 526 527 528 529 530 531 532 533 534 535
//     if (skey > pBlock[midSlot].maxKey.ts) {
//       if (numOfBlocks == 2) break;
//       if ((order == TSDB_ORDER_DESC) && (skey < pBlock[midSlot + 1].minKey.ts)) break;
//       firstSlot = midSlot + 1;
//     } else if (skey < pBlock[midSlot].minKey.ts) {
//       if ((order == TSDB_ORDER_ASC) && (skey > pBlock[midSlot - 1].maxKey.ts)) break;
//       lastSlot = midSlot - 1;
//     } else {
//       break;  // got the slot
//     }
//   }
H
Hongze Cheng 已提交
536

H
Hongze Cheng 已提交
537 538
//   return midSlot;
// }
H
Hongze Cheng 已提交
539

H
Haojun Liao 已提交
540
// Read the block index of the opened file set and append to pIndexList the entries that belong to the
// queried super table and to a table present in the reader's table map.
//
// @param pReader     reader supplying suid, the table map, cost counters and id string
// @param pFileReader opened data file reader
// @param pIndexList  output list; matching SBlockIdx entries are appended
// @return TSDB_CODE_SUCCESS, the error from tsdbReadBlockIdx, or TSDB_CODE_OUT_OF_MEMORY
//
// The temporary index array is released on every exit path.
static int32_t doLoadBlockIndex(STsdbReader* pReader, SDataFReader* pFileReader, SArray* pIndexList) {
  SArray* aBlockIdx = taosArrayInit(8, sizeof(SBlockIdx));
  if (aBlockIdx == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  int64_t st = taosGetTimestampUs();
  int32_t code = tsdbReadBlockIdx(pFileReader, aBlockIdx, NULL);
  if (code != TSDB_CODE_SUCCESS) {
    goto _end;
  }

  size_t num = taosArrayGetSize(aBlockIdx);
  if (num == 0) {
    // nothing to filter; leave through the common exit so the temporary array is destroyed, not just cleared
    goto _end;
  }

  int64_t et1 = taosGetTimestampUs();

  SBlockIdx* pBlockIdx = NULL;
  for (size_t i = 0; i < num; ++i) {
    pBlockIdx = (SBlockIdx*)taosArrayGet(aBlockIdx, i);

    // super table uid check
    if (pBlockIdx->suid != pReader->suid) {
      continue;
    }

    // this block belongs to a table that is not queried.
    void* p = taosHashGet(pReader->status.pTableMap, &pBlockIdx->uid, sizeof(uint64_t));
    if (p == NULL) {
      continue;
    }

    STableBlockScanInfo* pScanInfo = p;
    if (pScanInfo->pBlockList == NULL) {
      pScanInfo->pBlockList = taosArrayInit(4, sizeof(int32_t));
      if (pScanInfo->pBlockList == NULL) {
        code = TSDB_CODE_OUT_OF_MEMORY;
        goto _end;
      }
    }

    if (taosArrayPush(pIndexList, pBlockIdx) == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto _end;
    }
  }

  int64_t et2 = taosGetTimestampUs();
  tsdbDebug("load block index for %d tables completed, elapsed time:%.2f ms, set blockIdx:%.2f ms, size:%.2f Kb %s",
            (int32_t)num, (et1 - st) / 1000.0, (et2 - et1) / 1000.0, num * sizeof(SBlockIdx) / 1024.0, pReader->idStr);

  pReader->cost.headFileLoadTime += (et1 - st) / 1000.0;

_end:
  taosArrayDestroy(aBlockIdx);
  return code;
}
H
Hongze Cheng 已提交
590

591 592
// For every table listed in pIndexList, read its block map from the current file set and record, in
// the table's pBlockList, the indices of the blocks that overlap both the query time window and the
// query version range.
//
// @param pReader          reader supplying the file reader, table map, window and version range
// @param pIndexList       SBlockIdx entries selected by doLoadBlockIndex()
// @param numOfValidTables output: number of tables with at least one qualifying block (reset to 0 here)
// @param numOfBlocks      in/out: incremented once per qualifying block (not reset here)
// @return TSDB_CODE_SUCCESS, the error from tsdbReadBlock, or TSDB_CODE_OUT_OF_MEMORY
static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, uint32_t* numOfValidTables,
                               int32_t* numOfBlocks) {
  size_t numOfTables = taosArrayGetSize(pIndexList);
  *numOfValidTables = 0;

  int64_t st = taosGetTimestampUs();
  size_t  size = 0;

  // drop the block maps and block lists left over from the previously processed file set
  STableBlockScanInfo* px = NULL;
  while ((px = taosHashIterate(pReader->status.pTableMap, px)) != NULL) {
    tMapDataClear(&px->mapData);
    taosArrayClear(px->pBlockList);
  }

  for (size_t i = 0; i < numOfTables; ++i) {
    SBlockIdx* pBlockIdx = taosArrayGet(pIndexList, i);

    STableBlockScanInfo* pScanInfo = taosHashGet(pReader->status.pTableMap, &pBlockIdx->uid, sizeof(int64_t));

    tMapDataReset(&pScanInfo->mapData);
    // a failed read must not be treated as "table has no blocks"
    int32_t code = tsdbReadBlock(pReader->pFileReader, pBlockIdx, &pScanInfo->mapData, NULL);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    size += pScanInfo->mapData.nData;
    for (int32_t j = 0; j < pScanInfo->mapData.nItem; ++j) {
      SBlock block = {0};
      tMapDataGetItemByIdx(&pScanInfo->mapData, j, &block, tGetBlock);

      // 1. time range check
      if (block.minKey.ts > pReader->window.ekey || block.maxKey.ts < pReader->window.skey) {
        continue;
      }

      // 2. version range check
      if (block.minVersion > pReader->verRange.maxVer || block.maxVersion < pReader->verRange.minVer) {
        continue;
      }

      void* p = taosArrayPush(pScanInfo->pBlockList, &j);
      if (p == NULL) {
        tMapDataClear(&pScanInfo->mapData);
        return TSDB_CODE_OUT_OF_MEMORY;
      }

      (*numOfBlocks) += 1;
    }

    if (pScanInfo->pBlockList != NULL && taosArrayGetSize(pScanInfo->pBlockList) > 0) {
      (*numOfValidTables) += 1;
    }
  }

  double el = (taosGetTimestampUs() - st) / 1000.0;
  // numOfTables is a size_t: cast it to match the %d conversion; size is reported in Kb (1024 bytes)
  tsdbDebug("load block of %d tables completed, blocks:%d in %d tables, size:%.2f Kb, elapsed time:%.2f ms %s",
            (int32_t)numOfTables, *numOfBlocks, *numOfValidTables, size / 1024.0, el, pReader->idStr);

  pReader->cost.numOfBlocks += (*numOfBlocks);
  pReader->cost.headFileLoadTime += el;

  return TSDB_CODE_SUCCESS;
}
H
Hongze Cheng 已提交
656

657 658
// todo remove pblock parameter
// Mark the current file block as completely dumped and move lastKey one step past the block's maximum
// key (+1 for ascending order, -1 otherwise).
// NOTE(review): maxKey is used for both orders -- confirm that this is intended for descending scans.
//
// @param pDumpInfo dump state to update
// @param pBlock    block whose maxKey.ts is the basis for lastKey
// @param order     traversal order
static void setBlockAllDumped(SFileBlockDumpInfo* pDumpInfo, SBlock* pBlock, int32_t order) {
  int32_t step = ASCENDING_TRAVERSE(order) ? 1 : -1;

  pDumpInfo->allDumped = true;
  pDumpInfo->lastKey = pBlock->maxKey.ts + step;
}

665 666
// Append one column value to row `rowIndex` of pColInfoData.
// Variable-length values are first staged in pSup->buildBuf[colIndex] (length header followed by the
// payload) and appended from there; NULL / NONE values are appended as NULL.
//
// @param pColInfoData destination column
// @param rowIndex     destination row
// @param colIndex     index into pSup->buildBuf for the staging buffer of this column
// @param pColVal      value to copy
// @param pSup         supplies the per-column staging buffers
static void doCopyColVal(SColumnInfoData* pColInfoData, int32_t rowIndex, int32_t colIndex, SColVal* pColVal,
                         SBlockLoadSuppInfo* pSup) {
  if (IS_VAR_DATA_TYPE(pColVal->type)) {
    if (pColVal->isNull || pColVal->isNone) {
      colDataAppendNULL(pColInfoData, rowIndex);
    } else {
      varDataSetLen(pSup->buildBuf[colIndex], pColVal->value.nData);
      memcpy(varDataVal(pSup->buildBuf[colIndex]), pColVal->value.pData, pColVal->value.nData);
      colDataAppend(pColInfoData, rowIndex, pSup->buildBuf[colIndex], false);
    }
  } else {
    colDataAppend(pColInfoData, rowIndex, (const char*)&pColVal->value, pColVal->isNull || pColVal->isNone);
  }
}

680 681 682 683 684
// Look up the block-list entry at the iterator's current index.
static SFileDataBlockInfo* getCurrentBlockInfo(SDataBlockIter* pBlockIter) {
  return taosArrayGet(pBlockIter->blockList, pBlockIter->index);
}

685
// Address of the SBlock cached inside the iterator (no copy is made).
static SBlock* getCurrentBlock(SDataBlockIter* pBlockIter) {
  SBlock* pCurrent = &pBlockIter->block;
  return pCurrent;
}
686

687
// Copy rows of the loaded file block (pReader->status.fileBlockData) into the reader's result block,
// starting at the dump position and walking in the reader's order, at most pReader->capacity rows.
// Result columns that are absent from the file block are filled with NULLs.
//
// The row count is computed once and used for every column, so ascending and descending scans copy
// the same number of rows even when the block holds more rows than the capacity.
//
// @param pReader        reader holding the loaded file block, the result block and the dump state
// @param pBlockScanInfo scan info of the owning table (not read in this function)
// @return TSDB_CODE_SUCCESS
//
// NOTE(review): setBlockAllDumped() is called even when the capacity limit cut the copy short --
// confirm how callers resume a partially dumped block.
static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo) {
  SReaderStatus*  pStatus = &pReader->status;
  SDataBlockIter* pBlockIter = &pStatus->blockIter;

  SBlockData*         pBlockData = &pStatus->fileBlockData;
  SFileDataBlockInfo* pFBlock = getCurrentBlockInfo(pBlockIter);
  SBlock*             pBlock = getCurrentBlock(pBlockIter);
  SSDataBlock*        pResBlock = pReader->pResBlock;
  int32_t             numOfOutputCols = blockDataGetNumOfCols(pResBlock);

  SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo;
  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;

  SColVal cv = {0};
  int64_t st = taosGetTimestampUs();
  bool    asc = ASCENDING_TRAVERSE(pReader->order);
  int32_t step = asc ? 1 : -1;

  int32_t rowIndex = 0;

  // rows left in traversal direction, bounded by the output capacity
  int32_t remain = asc ? (pBlockData->nRow - pDumpInfo->rowIndex) : (pDumpInfo->rowIndex + 1);
  if (remain > pReader->capacity) {
    remain = pReader->capacity;
  }

  int32_t          i = 0;
  SColumnInfoData* pColData = taosArrayGet(pResBlock->pDataBlock, i);
  if (pColData->info.colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
    for (int32_t k = 0, j = pDumpInfo->rowIndex; k < remain; ++k, j += step) {
      colDataAppend(pColData, rowIndex++, (const char*)&pBlockData->aTSKEY[j], false);
    }
    i += 1;
  }

  int32_t colIndex = 0;
  int32_t num = taosArrayGetSize(pBlockData->aIdx);
  while (i < numOfOutputCols && colIndex < num) {
    rowIndex = 0;
    pColData = taosArrayGet(pResBlock->pDataBlock, i);

    SColData* pData = tBlockDataGetColDataByIdx(pBlockData, colIndex);

    if (pData->cid == pColData->info.colId) {
      for (int32_t k = 0, j = pDumpInfo->rowIndex; k < remain; ++k, j += step) {
        tColDataGetValue(pData, j, &cv);
        doCopyColVal(pColData, rowIndex++, i, &cv, pSupInfo);
      }
      colIndex += 1;
      ASSERT(rowIndex == remain);
    } else {  // the specified column does not exist in file block, fill with null data
      colDataAppendNNULL(pColData, 0, remain);
    }

    i += 1;
  }

  // remaining output columns have no counterpart in the file block
  while (i < numOfOutputCols) {
    pColData = taosArrayGet(pResBlock->pDataBlock, i);
    colDataAppendNNULL(pColData, 0, remain);
    i += 1;
  }

  pResBlock->info.rows = remain;
  pDumpInfo->rowIndex += step * remain;

  setBlockAllDumped(pDumpInfo, pBlock, pReader->order);

  double elapsedTime = (taosGetTimestampUs() - st) / 1000.0;
  pReader->cost.blockLoadTime += elapsedTime;

  int32_t unDumpedRows = asc ? pBlock->nRow - pDumpInfo->rowIndex : pDumpInfo->rowIndex + 1;
  tsdbDebug("%p copy file block to sdatablock, global index:%d, table index:%d, brange:%" PRId64 "-%" PRId64
            ", rows:%d, remain:%d, minVer:%" PRId64 ", maxVer:%" PRId64 ", elapsed time:%.2f ms, %s",
            pReader, pBlockIter->index, pFBlock->tbBlockIdx, pBlock->minKey.ts, pBlock->maxKey.ts, remain, unDumpedRows,
            pBlock->minVersion, pBlock->maxVersion, elapsedTime, pReader->idStr);

  return TSDB_CODE_SUCCESS;
}

770 771
/**
 * Read the column data of the block the iterator currently points at into pBlockData.
 *
 * Only the columns listed in pReader->suppInfo.colIds are requested; the number of requested
 * columns is the column count of the reader's result block. On success the file-block dump
 * state is reset (allDumped = false) and the load time is added to the reader's cost counters.
 *
 * @param pReader         reader that owns the file reader, result block and cost counters
 * @param pBlockIter      iterator positioned on the block to load
 * @param pBlockScanInfo  scan info of the table owning the block (supplies the uid)
 * @param pBlockData      output buffer for the loaded column data
 * @return TSDB_CODE_SUCCESS, or the error code reported by tsdbReadColData
 */
static int32_t doLoadFileBlockData(STsdbReader* pReader, SDataBlockIter* pBlockIter,
                                   STableBlockScanInfo* pBlockScanInfo, SBlockData* pBlockData) {
  const int64_t startUs = taosGetTimestampUs();

  SFileDataBlockInfo* pInfo = getCurrentBlockInfo(pBlockIter);
  SBlock*             pCurrent = getCurrentBlock(pBlockIter);
  int32_t             numOfCols = blockDataGetNumOfCols(pReader->pResBlock);

  SBlockIdx idx = {.suid = pReader->suid, .uid = pBlockScanInfo->uid};

  int32_t code = tsdbReadColData(pReader->pFileReader, &idx, pCurrent, pReader->suppInfo.colIds, numOfCols,
                                 pBlockData, NULL, NULL);
  if (code != TSDB_CODE_SUCCESS) {
    tsdbError("%p error occurs in loading file block, global index:%d, table index:%d, brange:%" PRId64 "-%" PRId64
              ", rows:%d, %s",
              pReader, pBlockIter->index, pInfo->tbBlockIdx, pCurrent->minKey.ts, pCurrent->maxKey.ts, pCurrent->nRow,
              pReader->idStr);
    return code;
  }

  double elapsedTime = (taosGetTimestampUs() - startUs) / 1000.0;
  pReader->cost.blockLoadTime += elapsedTime;

  // a freshly loaded block has not been dumped to the result block yet
  pReader->status.fBlockDumpInfo.allDumped = false;

  tsdbDebug("%p load file block into buffer, global index:%d, table index:%d, brange:%" PRId64 "-%" PRId64
            ", rows:%d, minVer:%" PRId64 ", maxVer:%" PRId64 ", elapsed time:%.2f ms, %s",
            pReader, pBlockIter->index, pInfo->tbBlockIdx, pCurrent->minKey.ts, pCurrent->maxKey.ts, pCurrent->nRow,
            pCurrent->minVersion, pCurrent->maxVersion, elapsedTime, pReader->idStr);

  return TSDB_CODE_SUCCESS;
}
H
Hongze Cheng 已提交
808

H
Haojun Liao 已提交
809 810 811
/**
 * Release every buffer owned by the block order supporter.
 *
 * Safe to call on a partially initialised supporter: the per-table wrapper arrays are only
 * walked when the pointer array itself was allocated, so a failed initBlockOrderSupporter()
 * cannot lead to a NULL dereference here regardless of the value of numOfTables.
 *
 * @param pSup supporter whose buffers are freed; the struct itself is not freed
 */
static void cleanupBlockOrderSupporter(SBlockOrderSupporter* pSup) {
  taosMemoryFreeClear(pSup->numOfBlocksPerTable);
  taosMemoryFreeClear(pSup->indexPerTable);

  if (pSup->pDataBlockInfo == NULL) {
    return;
  }

  // only the first numOfTables slots have been filled by the caller
  for (int32_t i = 0; i < pSup->numOfTables; ++i) {
    taosMemoryFreeClear(pSup->pDataBlockInfo[i]);
  }

  taosMemoryFreeClear(pSup->pDataBlockInfo);
}
H
Hongze Cheng 已提交
820

H
Haojun Liao 已提交
821 822
/**
 * Allocate the zero-filled per-table arrays of the block order supporter.
 *
 * numOfTables of the supporter is reset to 0: it is the running count of tables whose wrapper
 * array has been attached, and cleanupBlockOrderSupporter() relies on it. The arrays are
 * requested in (count, element size) form so that the size multiplication is left to the
 * allocator instead of being done by hand.
 *
 * @param pSup         supporter to initialise
 * @param numOfTables  capacity of the per-table arrays, must be >= 1
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY when any allocation fails (everything
 *         allocated so far is released before returning)
 */
static int32_t initBlockOrderSupporter(SBlockOrderSupporter* pSup, int32_t numOfTables) {
  ASSERT(numOfTables >= 1);

  pSup->numOfTables = 0;
  pSup->numOfBlocksPerTable = taosMemoryCalloc(numOfTables, sizeof(int32_t));
  pSup->indexPerTable = taosMemoryCalloc(numOfTables, sizeof(int32_t));
  pSup->pDataBlockInfo = taosMemoryCalloc(numOfTables, POINTER_BYTES);

  if (pSup->numOfBlocksPerTable == NULL || pSup->indexPerTable == NULL || pSup->pDataBlockInfo == NULL) {
    cleanupBlockOrderSupporter(pSup);
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  return TSDB_CODE_SUCCESS;
}
H
Hongze Cheng 已提交
835

H
Haojun Liao 已提交
836
static int32_t fileDataBlockOrderCompar(const void* pLeft, const void* pRight, void* param) {
837
  int32_t leftIndex = *(int32_t*)pLeft;
H
Haojun Liao 已提交
838
  int32_t rightIndex = *(int32_t*)pRight;
H
Hongze Cheng 已提交
839

H
Haojun Liao 已提交
840
  SBlockOrderSupporter* pSupporter = (SBlockOrderSupporter*)param;
H
Hongze Cheng 已提交
841

H
Haojun Liao 已提交
842 843
  int32_t leftTableBlockIndex = pSupporter->indexPerTable[leftIndex];
  int32_t rightTableBlockIndex = pSupporter->indexPerTable[rightIndex];
H
Hongze Cheng 已提交
844

H
Haojun Liao 已提交
845 846 847 848 849 850 851
  if (leftTableBlockIndex > pSupporter->numOfBlocksPerTable[leftIndex]) {
    /* left block is empty */
    return 1;
  } else if (rightTableBlockIndex > pSupporter->numOfBlocksPerTable[rightIndex]) {
    /* right block is empty */
    return -1;
  }
H
Hongze Cheng 已提交
852

853
  SBlockOrderWrapper* pLeftBlock = &pSupporter->pDataBlockInfo[leftIndex][leftTableBlockIndex];
H
Haojun Liao 已提交
854
  SBlockOrderWrapper* pRightBlock = &pSupporter->pDataBlockInfo[rightIndex][rightTableBlockIndex];
H
Hongze Cheng 已提交
855

856 857 858 859
  return pLeftBlock->offset > pRightBlock->offset ? 1 : -1;
}

/**
 * Decode the block the iterator currently points at into pBlockIter->block.
 *
 * The current SFileDataBlockInfo names a table (uid) and a position in that table's block list;
 * the list entry is the index into the table's compressed block map from which the block is decoded.
 *
 * @return always TSDB_CODE_SUCCESS
 */
static int32_t doSetCurrentBlock(SDataBlockIter* pBlockIter) {
  SFileDataBlockInfo*  pInfo = getCurrentBlockInfo(pBlockIter);
  STableBlockScanInfo* pTableInfo = taosHashGet(pBlockIter->pTableMap, &pInfo->uid, sizeof(pInfo->uid));

  int32_t* pMapIndex = taosArrayGet(pTableInfo->pBlockList, pInfo->tbBlockIdx);
  tMapDataGetItemByIdx(&pTableInfo->mapData, *pMapIndex, &pBlockIter->block, tGetBlock);

  return TSDB_CODE_SUCCESS;
}
H
Hongze Cheng 已提交
872

H
Haojun Liao 已提交
873
/**
 * Build the global access order of all file blocks that have to be visited.
 *
 * Every table in pReader->status.pTableMap that has a non-empty block list contributes its
 * blocks. With a single contributing table the blocks keep their list order; otherwise a
 * multiway merge orders the blocks of all tables by file offset. The iterator is then
 * positioned on the first block (ascending) or last block (descending) and that block is decoded.
 *
 * @param pReader      reader holding the table map and traverse order
 * @param pBlockIter   iterator whose blockList/index/numOfBlocks are (re)initialised
 * @param numOfBlocks  total number of blocks over all tables; must match the block lists
 * @return TSDB_CODE_SUCCESS, or an out-of-memory error code
 */
static int32_t initBlockIterator(STsdbReader* pReader, SDataBlockIter* pBlockIter, int32_t numOfBlocks) {
  bool asc = ASCENDING_TRAVERSE(pReader->order);

  pBlockIter->numOfBlocks = numOfBlocks;
  taosArrayClear(pBlockIter->blockList);

  // access data blocks according to the offset of each block in asc/desc order.
  int32_t numOfTables = (int32_t)taosHashGetSize(pReader->status.pTableMap);

  int64_t st = taosGetTimestampUs();

  SBlockOrderSupporter sup = {0};
  int32_t              code = initBlockOrderSupporter(&sup, numOfTables);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  // collect (uid, offset) of every block, table by table
  int32_t cnt = 0;
  void*   ptr = NULL;
  while ((ptr = taosHashIterate(pReader->status.pTableMap, ptr)) != NULL) {
    STableBlockScanInfo* pTableScanInfo = (STableBlockScanInfo*)ptr;
    if (pTableScanInfo->pBlockList == NULL || taosArrayGetSize(pTableScanInfo->pBlockList) == 0) {
      continue;
    }

    size_t num = taosArrayGetSize(pTableScanInfo->pBlockList);
    sup.numOfBlocksPerTable[sup.numOfTables] = (int32_t)num;

    SBlockOrderWrapper* pWrappers = taosMemoryMalloc(sizeof(SBlockOrderWrapper) * num);
    if (pWrappers == NULL) {
      // NOTE(review): the hash iteration is abandoned here without an explicit cancel call;
      // confirm whether the hash API requires one on early exit.
      cleanupBlockOrderSupporter(&sup);
      return TSDB_CODE_TDB_OUT_OF_MEMORY;
    }

    sup.pDataBlockInfo[sup.numOfTables] = pWrappers;

    SBlock block = {0};
    for (size_t k = 0; k < num; ++k) {
      int32_t* mapDataIndex = taosArrayGet(pTableScanInfo->pBlockList, k);
      tMapDataGetItemByIdx(&pTableScanInfo->mapData, *mapDataIndex, &block, tGetBlock);

      SBlockOrderWrapper wrapper = {.uid = pTableScanInfo->uid, .offset = block.aSubBlock[0].offset};
      pWrappers[k] = wrapper;
      cnt++;
    }

    sup.numOfTables += 1;
  }

  ASSERT(numOfBlocks == cnt);

  if (sup.numOfTables == 1) {
    // since there is only one table qualified, blocks are not sorted
    for (int32_t i = 0; i < numOfBlocks; ++i) {
      SFileDataBlockInfo blockInfo = {.uid = sup.pDataBlockInfo[0][i].uid, .tbBlockIdx = i};
      taosArrayPush(pBlockIter->blockList, &blockInfo);
    }

    int64_t et = taosGetTimestampUs();
    tsdbDebug("%p create blocks info struct completed for one table, %d blocks not sorted, elapsed time:%.2f ms %s",
              pReader, cnt, (et - st) / 1000.0, pReader->idStr);
  } else {
    tsdbDebug("%p create data blocks info struct completed, %d blocks in %d tables %s", pReader, cnt, sup.numOfTables,
              pReader->idStr);

    ASSERT(cnt <= numOfBlocks && sup.numOfTables <= numOfTables);

    // keep the full-width return code: narrowing it could turn an error code into 0
    SMultiwayMergeTreeInfo* pTree = NULL;
    int32_t                 ret = tMergeTreeCreate(&pTree, sup.numOfTables, &sup, fileDataBlockOrderCompar);
    if (ret != TSDB_CODE_SUCCESS) {
      cleanupBlockOrderSupporter(&sup);
      return TSDB_CODE_TDB_OUT_OF_MEMORY;
    }

    int32_t numOfTotal = 0;
    while (numOfTotal < cnt) {
      int32_t pos = tMergeTreeGetChosenIndex(pTree);
      int32_t index = sup.indexPerTable[pos]++;

      SFileDataBlockInfo blockInfo = {.uid = sup.pDataBlockInfo[pos][index].uid, .tbBlockIdx = index};
      taosArrayPush(pBlockIter->blockList, &blockInfo);

      // set data block index overflow, in order to disable the offset comparator
      if (sup.indexPerTable[pos] >= sup.numOfBlocksPerTable[pos]) {
        sup.indexPerTable[pos] = sup.numOfBlocksPerTable[pos] + 1;
      }

      numOfTotal += 1;
      tMergeTreeAdjust(pTree, tMergeTreeGetAdjustIndex(pTree));
    }

    int64_t et = taosGetTimestampUs();
    tsdbDebug("%p %d data blocks access order completed, elapsed time:%.2f ms %s", pReader, cnt, (et - st) / 1000.0,
              pReader->idStr);

    taosMemoryFree(pTree);
  }

  cleanupBlockOrderSupporter(&sup);

  pBlockIter->index = asc ? 0 : (numOfBlocks - 1);
  doSetCurrentBlock(pBlockIter);

  return TSDB_CODE_SUCCESS;
}
H
Hongze Cheng 已提交
989

H
Haojun Liao 已提交
990
/**
 * Advance the block iterator by one block in its traverse direction and decode that block.
 *
 * @return false when the iterator already stands on the last block of its direction
 *         (nothing is changed in that case), true otherwise
 */
static bool blockIteratorNext(SDataBlockIter* pBlockIter) {
  if (ASCENDING_TRAVERSE(pBlockIter->order)) {
    if (pBlockIter->index >= pBlockIter->numOfBlocks - 1) {
      return false;
    }
    pBlockIter->index += 1;
  } else {
    if (pBlockIter->index <= 0) {
      return false;
    }
    pBlockIter->index -= 1;
  }

  doSetCurrentBlock(pBlockIter);
  return true;
}

1004 1005 1006
/**
 * This is an two rectangles overlap cases.
 */
1007
static int32_t dataBlockPartiallyRequired(STimeWindow* pWindow, SVersionRange* pVerRange, SBlock* pBlock) {
1008 1009 1010 1011
  return (pWindow->ekey < pBlock->maxKey.ts && pWindow->ekey >= pBlock->minKey.ts) ||
         (pWindow->skey > pBlock->minKey.ts && pWindow->skey <= pBlock->maxKey.ts) ||
         (pVerRange->minVer > pBlock->minVersion && pVerRange->minVer <= pBlock->maxVersion) ||
         (pVerRange->maxVer < pBlock->maxVersion && pVerRange->maxVer >= pBlock->minVersion);
H
Haojun Liao 已提交
1012
}
H
Hongze Cheng 已提交
1013

1014 1015
/**
 * Decode the block that follows pFBlockInfo in the block list of the same table, in traverse order.
 *
 * @param pFBlockInfo          current block (its tbBlockIdx is the position in the table's block list)
 * @param pTableBlockScanInfo  scan info of the owning table
 * @param nextIndex            out: position of the neighbor in the block list; only written when
 *                             a neighbor exists
 * @param order                traverse order
 * @return a heap-allocated copy of the neighbor block that the caller must release with
 *         taosMemoryFree(), or NULL when the current block is the last one in traverse order.
 *         NULL is also returned when the allocation fails.
 *         NOTE(review): callers cannot tell the out-of-memory case from "no neighbor".
 */
static SBlock* getNeighborBlockOfSameTable(SFileDataBlockInfo* pFBlockInfo, STableBlockScanInfo* pTableBlockScanInfo,
                                           int32_t* nextIndex, int32_t order) {
  bool asc = ASCENDING_TRAVERSE(order);

  if (asc) {
    // compare in signed arithmetic so an empty list cannot wrap "size - 1" around
    int32_t numOfBlocks = (int32_t)taosArrayGetSize(pTableBlockScanInfo->pBlockList);
    if (pFBlockInfo->tbBlockIdx >= numOfBlocks - 1) {
      return NULL;
    }
  } else if (pFBlockInfo->tbBlockIdx == 0) {
    return NULL;
  }

  SBlock* pBlock = taosMemoryCalloc(1, sizeof(SBlock));
  if (pBlock == NULL) {
    return NULL;
  }

  int32_t step = asc ? 1 : -1;
  *nextIndex = pFBlockInfo->tbBlockIdx + step;

  int32_t* indexInMapdata = taosArrayGet(pTableBlockScanInfo->pBlockList, *nextIndex);
  tMapDataGetItemByIdx(&pTableBlockScanInfo->mapData, *indexInMapdata, pBlock, tGetBlock);
  return pBlock;
}

/**
 * Locate a block (identified by uid and tbBlockIdx) in the iterator's block list.
 *
 * The search starts at the iterator's current position and proceeds in traverse direction.
 *
 * @return position of the matching entry; asserts and returns -1 when no entry matches
 */
static int32_t findFileBlockInfoIndex(SDataBlockIter* pBlockIter, SFileDataBlockInfo* pFBlockInfo) {
  ASSERT(pBlockIter != NULL && pFBlockInfo != NULL);

  int32_t direction = ASCENDING_TRAVERSE(pBlockIter->order) ? 1 : -1;

  for (int32_t pos = pBlockIter->index; pos >= 0 && pos < pBlockIter->numOfBlocks; pos += direction) {
    SFileDataBlockInfo* pCandidate = taosArrayGet(pBlockIter->blockList, pos);

    bool sameTable = (pCandidate->uid == pFBlockInfo->uid);
    if (sameTable && pCandidate->tbBlockIdx == pFBlockInfo->tbBlockIdx) {
      return pos;
    }
  }

  ASSERT(0);
  return -1;
}

1054
/**
 * Make the block stored at position `index` the iterator's next current block.
 *
 * The iterator position is moved by `step`; when the requested entry is not already at the new
 * position it is removed from `index` and re-inserted there. The new current block is decoded.
 *
 * @return TSDB_CODE_SUCCESS, or -1 when `index` is outside the block list (nothing is changed)
 */
static int32_t setFileBlockActiveInBlockIter(SDataBlockIter* pBlockIter, int32_t index, int32_t step) {
  bool inRange = (index >= 0) && (index < pBlockIter->numOfBlocks);
  if (!inRange) {
    return -1;
  }

  // copy the entry first: the array is modified below
  SFileDataBlockInfo moved = *(SFileDataBlockInfo*)taosArrayGet(pBlockIter->blockList, index);

  pBlockIter->index += step;
  int32_t target = pBlockIter->index;

  if (target != index) {
    taosArrayRemove(pBlockIter->blockList, index);
    taosArrayInsert(pBlockIter->blockList, target, &moved);

    SFileDataBlockInfo* pPlaced = taosArrayGet(pBlockIter->blockList, target);
    ASSERT(pPlaced->uid == moved.uid && pPlaced->tbBlockIdx == moved.tbBlockIdx);
  }

  doSetCurrentBlock(pBlockIter);
  return TSDB_CODE_SUCCESS;
}

/**
 * Check whether a block touches its neighbor in traverse direction.
 *
 * Ascending: the block's max timestamp equals the neighbor's min timestamp.
 * Descending: the block's min timestamp equals the neighbor's max timestamp.
 */
static bool overlapWithNeighborBlock(SBlock* pBlock, SBlock* pNeighbor, int32_t order) {
  bool asc = ASCENDING_TRAVERSE(order);

  int64_t blockEdge = asc ? pBlock->maxKey.ts : pBlock->minKey.ts;
  int64_t neighborEdge = asc ? pNeighbor->minKey.ts : pNeighbor->maxKey.ts;

  return blockEdge == neighborEdge;
}
H
Hongze Cheng 已提交
1082

1083
/**
 * Check whether a buffered key lies before the file block in traverse direction.
 *
 * A key whose timestamp is TSKEY_INITIAL_VAL never qualifies. Ascending: key.ts <= block min
 * timestamp. Descending: key.ts >= block max timestamp.
 */
static bool bufferDataInFileBlockGap(int32_t order, TSDBKEY key, SBlock* pBlock) {
  if (key.ts == TSKEY_INITIAL_VAL) {
    return false;
  }

  if (ASCENDING_TRAVERSE(order)) {
    return key.ts <= pBlock->minKey.ts;
  }

  return key.ts >= pBlock->maxKey.ts;
}
H
Hongze Cheng 已提交
1089

H
Haojun Liao 已提交
1090
/**
 * Check whether a key's timestamp falls inside the block's time range while the block's
 * version range intersects the requested version range.
 */
static bool keyOverlapFileBlock(TSDBKEY key, SBlock* pBlock, SVersionRange* pVerRange) {
  if (key.ts < pBlock->minKey.ts || key.ts > pBlock->maxKey.ts) {
    return false;
  }

  if (pBlock->maxVersion < pVerRange->minVer) {
    return false;
  }

  return pBlock->minVersion <= pVerRange->maxVer;
}

1095 1096 1097 1098 1099 1100 1101 1102 1103 1104 1105 1106 1107 1108 1109 1110 1111 1112 1113 1114 1115 1116 1117 1118 1119 1120 1121 1122 1123 1124 1125 1126 1127 1128
static bool doCheckforDatablockOverlap(STableBlockScanInfo* pBlockScanInfo, const SBlock* pBlock) {
  size_t num = taosArrayGetSize(pBlockScanInfo->delSkyline);

  for (int32_t i = pBlockScanInfo->fileDelIndex; i < num; i += 1) {
    TSDBKEY* p = taosArrayGet(pBlockScanInfo->delSkyline, i);
    if (p->ts >= pBlock->minKey.ts && p->ts <= pBlock->maxKey.ts) {
      if (p->version >= pBlock->minVersion) {
        return true;
      }
    } else if (p->ts < pBlock->minKey.ts) {  // p->ts < pBlock->minKey.ts
      if (p->version >= pBlock->minVersion) {
        if (i < num - 1) {
          TSDBKEY* pnext = taosArrayGet(pBlockScanInfo->delSkyline, i + 1);
          if (i + 1 == num - 1) {  // pnext is the last point
            if (pnext->ts >= pBlock->minKey.ts) {
              return true;
            }
          } else {
            if (pnext->ts >= pBlock->minKey.ts && pnext->version >= pBlock->minVersion) {
              return true;
            }
          }
        } else {  // it must be the last point
          ASSERT(p->version == 0);
        }
      }
    } else {  // (p->ts > pBlock->maxKey.ts) {
      return false;
    }
  }

  return false;
}

1129
static bool overlapWithDelSkyline(STableBlockScanInfo* pBlockScanInfo, const SBlock* pBlock, int32_t order) {
1130 1131 1132 1133
  if (pBlockScanInfo->delSkyline == NULL) {
    return false;
  }

1134
  // ts is not overlap
1135
  TSDBKEY* pFirst = taosArrayGet(pBlockScanInfo->delSkyline, 0);
L
Liu Jicong 已提交
1136
  TSDBKEY* pLast = taosArrayGetLast(pBlockScanInfo->delSkyline);
1137 1138 1139 1140 1141
  if (pBlock->minKey.ts > pLast->ts || pBlock->maxKey.ts < pFirst->ts) {
    return false;
  }

  // version is not overlap
1142 1143 1144 1145
  if (ASCENDING_TRAVERSE(order)) {
    return doCheckforDatablockOverlap(pBlockScanInfo, pBlock);
  } else {
    int32_t index = pBlockScanInfo->fileDelIndex;
1146
    while (1) {
1147 1148 1149 1150 1151
      TSDBKEY* p = taosArrayGet(pBlockScanInfo->delSkyline, index);
      if (p->ts > pBlock->minKey.ts && index > 0) {
        index -= 1;
      } else {  // find the first point that is smaller than the minKey.ts of dataBlock.
        break;
1152 1153 1154
      }
    }

1155 1156
    return doCheckforDatablockOverlap(pBlockScanInfo, pBlock);
  }
1157 1158
}

1159 1160 1161 1162
// 1. the version of all rows should be less than the endVersion
// 2. current block should not overlap with next neighbor block
// 3. current timestamp should not be overlap with each other
// 4. output buffer should be large enough to hold all rows in current block
1163
// 5. delete info should not overlap with current block data
1164 1165
static bool fileBlockShouldLoad(STsdbReader* pReader, SFileDataBlockInfo* pFBlock, SBlock* pBlock,
                                STableBlockScanInfo* pScanInfo, TSDBKEY key) {
1166 1167 1168
  int32_t neighborIndex = 0;
  SBlock* pNeighbor = getNeighborBlockOfSameTable(pFBlock, pScanInfo, &neighborIndex, pReader->order);

1169
  // overlap with neighbor
1170 1171 1172
  bool overlapWithNeighbor = false;
  if (pNeighbor) {
    overlapWithNeighbor = overlapWithNeighborBlock(pBlock, pNeighbor, pReader->order);
1173
    taosMemoryFree(pNeighbor);
1174 1175
  }

1176
  // has duplicated ts of different version in this block
L
Liu Jicong 已提交
1177 1178
  bool hasDup = (pBlock->nSubBlock == 1) ? pBlock->hasDup : true;
  bool overlapWithDel = overlapWithDelSkyline(pScanInfo, pBlock, pReader->order);
1179

1180
  return (overlapWithNeighbor || hasDup || dataBlockPartiallyRequired(&pReader->window, &pReader->verRange, pBlock) ||
1181
          keyOverlapFileBlock(key, pBlock, &pReader->verRange) || (pBlock->nRow > pReader->capacity) || overlapWithDel);
H
Haojun Liao 已提交
1182 1183
}

1184
/**
 * Fill the reader's result block from the in-memory buffers (mem/imem) of one table.
 *
 * Nothing is done when neither buffer iterator has a value. Otherwise rows up to endKey are
 * composed by buildDataBlockFromBufImpl(), the time window of the result block is refreshed,
 * the block is tagged with the table uid and flagged as a composed block.
 *
 * @param pReader         reader owning the result block and cost counters
 * @param pBlockScanInfo  scan info of the table whose buffers are read
 * @param endKey          key bound handed to buildDataBlockFromBufImpl
 * @return TSDB_CODE_SUCCESS when there is nothing to read, otherwise the code returned by
 *         buildDataBlockFromBufImpl
 */
static int32_t buildDataBlockFromBuf(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo, int64_t endKey) {
  if (!(pBlockScanInfo->iiter.hasVal || pBlockScanInfo->iter.hasVal)) {
    return TSDB_CODE_SUCCESS;
  }

  SSDataBlock* pBlock = pReader->pResBlock;

  int64_t st = taosGetTimestampUs();
  int32_t code = buildDataBlockFromBufImpl(pBlockScanInfo, endKey, pReader->capacity, pReader);

  blockDataUpdateTsWindow(pBlock, 0);
  pBlock->info.uid = pBlockScanInfo->uid;

  setComposedBlockFlag(pReader, true);

  double elapsedTime = (taosGetTimestampUs() - st) / 1000.0;
  // every argument needs its own conversion: a missing '%' would shift the following
  // arguments and feed an integer to "%s"
  tsdbDebug("%p build data block from cache completed, elapsed time:%.2f ms, numOfRows:%d, brange:%" PRId64
            " - %" PRId64 " %s",
            pReader, elapsedTime, pBlock->info.rows, pBlock->info.window.skey, pBlock->info.window.ekey,
            pReader->idStr);

  pReader->cost.buildmemBlock += elapsedTime;
  return code;
}

1209
static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo, TSDBROW* pRow,
H
Haojun Liao 已提交
1210
                                     SIterInfo* pIter, int64_t key) {
1211
  SRowMerger          merge = {0};
H
Haojun Liao 已提交
1212
  STSRow*             pTSRow = NULL;
1213 1214 1215 1216
  SBlockData*         pBlockData = &pReader->status.fileBlockData;
  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;

  TSDBKEY k = TSDBROW_KEY(pRow);
1217
  TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
1218
  SArray* pDelList = pBlockScanInfo->delSkyline;
1219

1220 1221 1222 1223 1224 1225 1226 1227
  // ascending order traverse
  if (ASCENDING_TRAVERSE(pReader->order)) {
    if (key < k.ts) {
      tRowMergerInit(&merge, &fRow, pReader->pSchema);

      doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
      tRowMergerGetRow(&merge, &pTSRow);
    } else if (k.ts < key) {  // k.ts < key
1228
      doMergeMultiRows(pRow, pBlockScanInfo->uid, pIter, pDelList, &pTSRow, pReader);
1229 1230 1231
    } else {  // k.ts == key, ascending order: file block ----> imem rows -----> mem rows
      tRowMergerInit(&merge, &fRow, pReader->pSchema);
      doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
1232 1233

      tRowMerge(&merge, pRow);
1234
      doMergeRowsInBuf(pIter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge, pReader);
1235 1236

      tRowMergerGetRow(&merge, &pTSRow);
1237
    }
1238 1239
  } else {  // descending order scan
    if (key < k.ts) {
1240
      doMergeMultiRows(pRow, pBlockScanInfo->uid, pIter, pDelList, &pTSRow, pReader);
1241 1242
    } else if (k.ts < key) {
      tRowMergerInit(&merge, &fRow, pReader->pSchema);
1243

1244 1245 1246 1247 1248 1249
      doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
      tRowMergerGetRow(&merge, &pTSRow);
    } else {  // descending order: mem rows -----> imem rows ------> file block
      updateSchema(pRow, pBlockScanInfo->uid, pReader);

      tRowMergerInit(&merge, pRow, pReader->pSchema);
1250
      doMergeRowsInBuf(pIter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge, pReader);
1251 1252 1253 1254 1255 1256

      tRowMerge(&merge, &fRow);
      doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);

      tRowMergerGetRow(&merge, &pTSRow);
    }
1257 1258
  }

1259
  tRowMergerClear(&merge);
1260
  doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow);
H
Haojun Liao 已提交
1261 1262

  taosMemoryFree(pTSRow);
1263 1264 1265
  return TSDB_CODE_SUCCESS;
}

1266 1267 1268 1269
/**
 * Produce the next output row when the mem buffer, the imem buffer and the loaded file block
 * all have a current row.
 *
 * With key = timestamp of the current file row, k = key of the mem row and ik = key of the
 * imem row, the source(s) holding the next timestamp in traverse order are merged into one row
 * which is appended to the reader's result block.
 *
 * The merged row and the merger are released on every path, in the same way as in
 * doMergeBufAndFileRows().
 *
 * @return always TSDB_CODE_SUCCESS
 */
static int32_t doMergeThreeLevelRows(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo) {
  SRowMerger merge = {0};
  STSRow*    pTSRow = NULL;

  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
  SBlockData*         pBlockData = &pReader->status.fileBlockData;
  SArray*             pDelList = pBlockScanInfo->delSkyline;

  TSDBROW* pRow = getValidRow(&pBlockScanInfo->iter, pDelList, pReader);
  TSDBROW* piRow = getValidRow(&pBlockScanInfo->iiter, pDelList, pReader);
  ASSERT(pRow != NULL && piRow != NULL);

  int64_t  key = pBlockData->aTSKEY[pDumpInfo->rowIndex];
  uint64_t uid = pBlockScanInfo->uid;

  TSDBKEY k = TSDBROW_KEY(pRow);
  TSDBKEY ik = TSDBROW_KEY(piRow);

  if (ASCENDING_TRAVERSE(pReader->order)) {
    if (key <= k.ts && key <= ik.ts) {
      // [1&2] key <= [k.ts && ik.ts]: the file row comes first, buffers join on equal timestamps
      TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
      tRowMergerInit(&merge, &fRow, pReader->pSchema);

      doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);

      if (ik.ts == key) {
        tRowMerge(&merge, piRow);
        doMergeRowsInBuf(&pBlockScanInfo->iiter, uid, key, pBlockScanInfo->delSkyline, &merge, pReader);
      }

      if (k.ts == key) {
        tRowMerge(&merge, pRow);
        doMergeRowsInBuf(&pBlockScanInfo->iter, uid, key, pBlockScanInfo->delSkyline, &merge, pReader);
      }

      tRowMergerGetRow(&merge, &pTSRow);
    } else if (ik.ts < k.ts) {
      // here key > k.ts || key > ik.ts; key == ik.ts is still possible when k.ts < key (case [5])
      // [3] ik.ts < key <= k.ts
      // [4] ik.ts < k.ts <= key
      doMergeMultiRows(piRow, uid, &pBlockScanInfo->iiter, pDelList, &pTSRow, pReader);
    } else if (k.ts < ik.ts) {
      // [5] k.ts < key   <= ik.ts
      // [6] k.ts < ik.ts <= key
      doMergeMultiRows(pRow, uid, &pBlockScanInfo->iter, pDelList, &pTSRow, pReader);
    } else {
      // [7] k.ts == ik.ts < key
      ASSERT(key > ik.ts && key > k.ts);
      doMergeMemIMemRows(pRow, piRow, pBlockScanInfo, pReader, &pTSRow);
    }
  } else {  // descending order scan
    if (k.ts >= ik.ts && k.ts >= key) {
      // [1/2] k.ts >= ik.ts && k.ts >= key: the mem row comes first
      updateSchema(pRow, uid, pReader);

      tRowMergerInit(&merge, pRow, pReader->pSchema);
      // NOTE(review): the timestamp handed to doMergeRowsInBuf is the file key, which differs
      // from k.ts when k.ts > key; kept as in the original, confirm that this is intended.
      doMergeRowsInBuf(&pBlockScanInfo->iter, uid, key, pBlockScanInfo->delSkyline, &merge, pReader);

      if (ik.ts == k.ts) {
        tRowMerge(&merge, piRow);
        doMergeRowsInBuf(&pBlockScanInfo->iiter, uid, key, pBlockScanInfo->delSkyline, &merge, pReader);
      }

      if (k.ts == key) {
        TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
        tRowMerge(&merge, &fRow);
        doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
      }

      tRowMergerGetRow(&merge, &pTSRow);
    } else if (ik.ts > key) {
      // here k.ts < ik.ts || k.ts < key; k.ts == ik.ts is still possible when both are < key
      // [3] ik.ts > k.ts >= Key
      // [4] ik.ts > key >= k.ts
      doMergeMultiRows(piRow, uid, &pBlockScanInfo->iiter, pDelList, &pTSRow, pReader);
    } else if (key > ik.ts) {
      // [5] key > ik.ts > k.ts
      // [6] key > k.ts > ik.ts
      TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
      tRowMergerInit(&merge, &fRow, pReader->pSchema);

      doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
      tRowMergerGetRow(&merge, &pTSRow);
    } else {
      // [7] key == ik.ts > k.ts: merge the imem rows first and then the file rows. The merger
      // has to be initialised from the imem row before the file row can be merged into it
      // (same sequence as the descending equal-timestamp case of doMergeBufAndFileRows).
      updateSchema(piRow, uid, pReader);

      tRowMergerInit(&merge, piRow, pReader->pSchema);
      doMergeRowsInBuf(&pBlockScanInfo->iiter, uid, ik.ts, pBlockScanInfo->delSkyline, &merge, pReader);

      TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
      tRowMerge(&merge, &fRow);
      doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);

      tRowMergerGetRow(&merge, &pTSRow);
    }
  }

  doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow);

  taosMemoryFree(pTSRow);
  tRowMergerClear(&merge);
  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
1397 1398
/**
 * Check whether the file-block row at pDumpInfo->rowIndex is visible to the query.
 *
 * A row is rejected when its version is outside the reader's version range, when its timestamp
 * is outside the reader's time window, or when hasBeenDropped() reports it against the table's
 * delete skyline.
 */
static bool isValidFileBlockRow(SBlockData* pBlockData, SFileBlockDumpInfo* pDumpInfo,
                                STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader) {
  int64_t rowVer = pBlockData->aVersion[pDumpInfo->rowIndex];
  bool    verInRange = (rowVer >= pReader->verRange.minVer) && (rowVer <= pReader->verRange.maxVer);
  if (!verInRange) {
    return false;
  }

  int64_t rowTs = pBlockData->aTSKEY[pDumpInfo->rowIndex];
  bool    tsInWindow = (rowTs >= pReader->window.skey) && (rowTs <= pReader->window.ekey);
  if (!tsInWindow) {
    return false;
  }

  TSDBKEY rowKey = {.ts = rowTs, .version = rowVer};
  return !hasBeenDropped(pBlockScanInfo->delSkyline, &pBlockScanInfo->fileDelIndex, &rowKey, pReader->order);
}

1418
// Returns true when ts lies outside the closed interval [pWindow->skey, pWindow->ekey].
static bool outOfTimeWindow(int64_t ts, STimeWindow* pWindow) {
  bool inside = (ts >= pWindow->skey) && (ts <= pWindow->ekey);
  return !inside;
}
1419

1420 1421 1422 1423 1424
/**
 * Append one output row to the reader's result block, combining the current file-block row
 * with whatever the mem/imem buffers of the table currently offer.
 *
 *  - both buffers have a row: three-way merge (doMergeThreeLevelRows);
 *  - only one buffer has a row: two-way merge with the file row (doMergeBufAndFileRows);
 *  - neither buffer has a row: the file row is emitted. If the adjacent file row in traverse
 *    direction exists and has a different timestamp, the row is copied directly and the row
 *    index advanced; otherwise the rows sharing the timestamp are merged.
 *
 * @return the code of the merge routine that was used, TSDB_CODE_SUCCESS on the file-only paths
 */
static int32_t buildComposedDataBlockImpl(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo) {
  SFileBlockDumpInfo* pDump = &pReader->status.fBlockDumpInfo;
  SBlockData*         pFileData = &pReader->status.fileBlockData;

  int64_t  fileKey = pFileData->aTSKEY[pDump->rowIndex];
  TSDBROW* pMemRow = getValidRow(&pBlockScanInfo->iter, pBlockScanInfo->delSkyline, pReader);
  TSDBROW* pImemRow = getValidRow(&pBlockScanInfo->iiter, pBlockScanInfo->delSkyline, pReader);

  // the hasVal flags are read after getValidRow() has been called on both iterators
  bool hasMem = pBlockScanInfo->iter.hasVal;
  bool hasImem = pBlockScanInfo->iiter.hasVal;

  if (hasMem && hasImem) {
    return doMergeThreeLevelRows(pReader, pBlockScanInfo);
  }

  // imem + file
  if (hasImem) {
    return doMergeBufAndFileRows(pReader, pBlockScanInfo, pImemRow, &pBlockScanInfo->iiter, fileKey);
  }

  // mem + file
  if (hasMem) {
    return doMergeBufAndFileRows(pReader, pBlockScanInfo, pMemRow, &pBlockScanInfo->iter, fileKey);
  }

  // imem & mem are all empty, only file exist

  // fast path: the row is not a border row and the adjacent row has another timestamp
  bool hasAdjacentRow = false;
  if (pReader->order == TSDB_ORDER_ASC) {
    hasAdjacentRow = pDump->rowIndex < pDump->totalRows - 1;
  } else if (pReader->order == TSDB_ORDER_DESC) {
    hasAdjacentRow = pDump->rowIndex > 0;
  }

  if (hasAdjacentRow) {
    int32_t step = (pReader->order == TSDB_ORDER_ASC) ? 1 : -1;
    int64_t adjacentKey = pFileData->aTSKEY[pDump->rowIndex + step];
    if (adjacentKey != fileKey) {  // merge is not needed
      doAppendRowFromBlock(pReader->pResBlock, pReader, pFileData, pDump->rowIndex);
      pDump->rowIndex += step;
      return TSDB_CODE_SUCCESS;
    }
  }

  // slow path: merge all file rows that share the timestamp
  TSDBROW    fileRow = tsdbRowFromBlockData(pFileData, pDump->rowIndex);
  STSRow*    pMerged = NULL;
  SRowMerger merger = {0};

  tRowMergerInit(&merger, &fileRow, pReader->pSchema);
  doMergeRowsInFileBlocks(pFileData, pBlockScanInfo, pReader, &merger);
  tRowMergerGetRow(&merger, &pMerged);
  doAppendRowFromTSRow(pReader->pResBlock, pReader, pMerged);

  taosMemoryFree(pMerged);
  tRowMergerClear(&merger);
  return TSDB_CODE_SUCCESS;
}

1473
/**
 * Fill pReader->pResBlock with rows produced by merging the currently loaded file
 * block with the buffered rows of the table described by pBlockScanInfo.
 *
 * Rows are appended one merged row per iteration until either the loaded file block
 * is consumed or the result block holds at least pReader->capacity rows.
 *
 * @param pReader         reader that owns the result block and the file block state
 * @param pBlockScanInfo  scan state of the table the current file block belongs to
 * @return TSDB_CODE_SUCCESS, or the first error reported while merging a row
 */
static int32_t buildComposedDataBlock(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo) {
  SSDataBlock* pResBlock = pReader->pResBlock;

  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
  SBlockData*         pBlockData = &pReader->status.fileBlockData;
  int32_t             step = ASCENDING_TRAVERSE(pReader->order) ? 1 : -1;

  int64_t st = taosGetTimestampUs();

  while (1) {
    // todo check the validate of row in file block
    if (!isValidFileBlockRow(pBlockData, pDumpInfo, pBlockScanInfo, pReader)) {
      // skip the invalid row; stop once the block has no more rows in scan direction
      pDumpInfo->rowIndex += step;

      SBlock* pSkippedIn = getCurrentBlock(&pReader->status.blockIter);
      if (pDumpInfo->rowIndex >= pSkippedIn->nRow || pDumpInfo->rowIndex < 0) {
        setBlockAllDumped(pDumpInfo, pSkippedIn, pReader->order);
        break;
      }

      continue;
    }

    // do not silently drop a failure of the row merge
    int32_t code = buildComposedDataBlockImpl(pReader, pBlockScanInfo);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    SBlock* pBlock = getCurrentBlock(&pReader->status.blockIter);

    // currently loaded file data block is consumed
    if (pDumpInfo->rowIndex >= pBlock->nRow || pDumpInfo->rowIndex < 0) {
      setBlockAllDumped(pDumpInfo, pBlock, pReader->order);
      break;
    }

    if (pResBlock->info.rows >= pReader->capacity) {
      break;
    }
  }

  pResBlock->info.uid = pBlockScanInfo->uid;
  blockDataUpdateTsWindow(pResBlock, 0);

  setComposedBlockFlag(pReader, true);
  int64_t et = taosGetTimestampUs();

  tsdbDebug("%p uid:%" PRIu64 ", composed data block created, brange:%" PRIu64 "-%" PRIu64
            " rows:%d, elapsed time:%.2f ms %s",
            pReader, pBlockScanInfo->uid, pResBlock->info.window.skey, pResBlock->info.window.ekey,
            pResBlock->info.rows, (et - st) / 1000.0, pReader->idStr);

  return TSDB_CODE_SUCCESS;
}

// Store the `composed` flag in the reader status (status.composedDataBlock).
void setComposedBlockFlag(STsdbReader* pReader, bool composed) {
  SReaderStatus* pStatus = &pReader->status;
  pStatus->composedDataBlock = composed;
}

1528
/**
 * Lazily create the mem and imem iterators of one table and build its delete skyline.
 *
 * The iterators start at the query window start (ascending) or end (descending).
 * The work is done at most once per table; later calls return immediately once
 * pBlockScanInfo->iterInit is set.
 *
 * @param pBlockScanInfo  per-table scan state that receives the iterators
 * @param pReader         reader providing the snapshot, window, version range and order
 * @return TSDB_CODE_SUCCESS, or the error of the iterator creation / delete skyline setup
 */
static int32_t initMemDataIterator(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader) {
  if (pBlockScanInfo->iterInit) {
    return TSDB_CODE_SUCCESS;
  }

  int32_t code = TSDB_CODE_SUCCESS;

  TSDBKEY startKey = {0};
  if (ASCENDING_TRAVERSE(pReader->order)) {
    startKey = (TSDBKEY){.ts = pReader->window.skey, .version = pReader->verRange.minVer};
  } else {
    startKey = (TSDBKEY){.ts = pReader->window.ekey, .version = pReader->verRange.maxVer};
  }

  int32_t backward = (!ASCENDING_TRAVERSE(pReader->order));

  // mem table iterator
  STbData* d = NULL;
  if (pReader->pReadSnap->pMem != NULL) {
    tsdbGetTbDataFromMemTable(pReader->pReadSnap->pMem, pReader->suid, pBlockScanInfo->uid, &d);
    if (d != NULL) {
      code = tsdbTbDataIterCreate(d, &startKey, backward, &pBlockScanInfo->iter.iter);
      if (code != TSDB_CODE_SUCCESS) {
        tsdbError("%p uid:%" PRId64 ", failed to create iterator for mem, code:%s, %s", pReader, pBlockScanInfo->uid,
                  tstrerror(code), pReader->idStr);
        return code;
      }

      pBlockScanInfo->iter.hasVal = (tsdbTbDataIterGet(pBlockScanInfo->iter.iter) != NULL);

      tsdbDebug("%p uid:%" PRId64 ", check data in mem from skey:%" PRId64 ", order:%d, ts range in buf:%" PRId64
                "-%" PRId64 " %s",
                pReader, pBlockScanInfo->uid, startKey.ts, pReader->order, d->minKey, d->maxKey, pReader->idStr);
    }
  } else {
    tsdbDebug("%p uid:%" PRId64 ", no data in mem, %s", pReader, pBlockScanInfo->uid, pReader->idStr);
  }

  // imem table iterator
  STbData* di = NULL;
  if (pReader->pReadSnap->pIMem != NULL) {
    tsdbGetTbDataFromMemTable(pReader->pReadSnap->pIMem, pReader->suid, pBlockScanInfo->uid, &di);
    if (di != NULL) {
      code = tsdbTbDataIterCreate(di, &startKey, backward, &pBlockScanInfo->iiter.iter);
      if (code != TSDB_CODE_SUCCESS) {
        tsdbError("%p uid:%" PRId64 ", failed to create iterator for imem, code:%s, %s", pReader, pBlockScanInfo->uid,
                  tstrerror(code), pReader->idStr);
        return code;
      }

      pBlockScanInfo->iiter.hasVal = (tsdbTbDataIterGet(pBlockScanInfo->iiter.iter) != NULL);

      tsdbDebug("%p uid:%" PRId64 ", check data in imem from skey:%" PRId64 ", order:%d, ts range in buf:%" PRId64
                "-%" PRId64 " %s",
                pReader, pBlockScanInfo->uid, startKey.ts, pReader->order, di->minKey, di->maxKey, pReader->idStr);
    }
  } else {
    tsdbDebug("%p uid:%" PRId64 ", no data in imem, %s", pReader, pBlockScanInfo->uid, pReader->idStr);
  }

  code = initDelSkylineIterator(pBlockScanInfo, pReader, d, di);
  if (code != TSDB_CODE_SUCCESS) {
    tsdbError("%p uid:%" PRId64 ", failed to build delete skyline, code:%s, %s", pReader, pBlockScanInfo->uid,
              tstrerror(code), pReader->idStr);
  }

  // The iterators exist at this point; mark them initialized even when the skyline failed,
  // so that a later call does not create (and leak) a second pair of iterators.
  pBlockScanInfo->iterInit = true;
  return code;
}

dengyihao's avatar
dengyihao 已提交
1592 1593
/**
 * Collect the delete records of one table (delete file + mem + imem) and build the
 * delete skyline stored in pBlockScanInfo->delSkyline. Also positions the three
 * skyline cursors (iter.index, iiter.index, fileDelIndex) at the first entry for an
 * ascending scan, or at the last entry for a descending scan.
 *
 * Does nothing when a skyline already exists for the table.
 *
 * @param pBlockScanInfo  per-table scan state that receives the skyline
 * @param pReader         reader providing the snapshot and scan order
 * @param pMemTbData      table data in the mem table, may be NULL
 * @param piMemTbData     table data in the imem table, may be NULL
 * @return TSDB_CODE_SUCCESS or an error code (I/O failure, out of memory)
 */
int32_t initDelSkylineIterator(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader, STbData* pMemTbData,
                               STbData* piMemTbData) {
  if (pBlockScanInfo->delSkyline != NULL) {
    return TSDB_CODE_SUCCESS;
  }

  int32_t code = TSDB_CODE_SUCCESS;
  STsdb*  pTsdb = pReader->pTsdb;

  SArray* pDelData = taosArrayInit(4, sizeof(SDelData));
  if (pDelData == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  // 1. delete records persisted in the delete file
  SDelFile* pDelFile = pReader->pReadSnap->fs.pDelFile;
  if (pDelFile) {
    SDelFReader* pDelFReader = NULL;
    code = tsdbDelFReaderOpen(&pDelFReader, pDelFile, pTsdb, NULL);
    if (code != TSDB_CODE_SUCCESS) {
      goto _err;
    }

    SArray* aDelIdx = taosArrayInit(4, sizeof(SDelIdx));
    if (aDelIdx == NULL) {
      // report the failure instead of leaving `code` at success
      code = TSDB_CODE_OUT_OF_MEMORY;
      tsdbDelFReaderClose(&pDelFReader);
      goto _err;
    }

    code = tsdbReadDelIdx(pDelFReader, aDelIdx, NULL);
    if (code != TSDB_CODE_SUCCESS) {
      taosArrayDestroy(aDelIdx);
      tsdbDelFReaderClose(&pDelFReader);
      goto _err;
    }

    SDelIdx  idx = {.suid = pReader->suid, .uid = pBlockScanInfo->uid};
    SDelIdx* pIdx = taosArraySearch(aDelIdx, &idx, tCmprDelIdx, TD_EQ);

    if (pIdx != NULL) {
      code = tsdbReadDelData(pDelFReader, pIdx, pDelData, NULL);
    }

    taosArrayDestroy(aDelIdx);
    tsdbDelFReaderClose(&pDelFReader);

    if (code != TSDB_CODE_SUCCESS) {
      goto _err;
    }
  }

  // 2. delete records still held in the mem / imem table data
  for (SDelData* p = (pMemTbData != NULL) ? pMemTbData->pHead : NULL; p != NULL; p = p->pNext) {
    taosArrayPush(pDelData, p);
  }

  for (SDelData* p = (piMemTbData != NULL) ? piMemTbData->pHead : NULL; p != NULL; p = p->pNext) {
    taosArrayPush(pDelData, p);
  }

  // 3. build the skyline only when there is at least one delete record
  if (taosArrayGetSize(pDelData) > 0) {
    pBlockScanInfo->delSkyline = taosArrayInit(4, sizeof(TSDBKEY));
    if (pBlockScanInfo->delSkyline == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto _err;
    }

    code = tsdbBuildDeleteSkyline(pDelData, 0, (int32_t)(taosArrayGetSize(pDelData) - 1), pBlockScanInfo->delSkyline);
  }

  taosArrayDestroy(pDelData);

  // explicit signed arithmetic: an absent/empty skyline yields -1 for a descending scan
  int32_t numOfSkylinePoints =
      (pBlockScanInfo->delSkyline != NULL) ? (int32_t)taosArrayGetSize(pBlockScanInfo->delSkyline) : 0;

  pBlockScanInfo->iter.index = ASCENDING_TRAVERSE(pReader->order) ? 0 : numOfSkylinePoints - 1;
  pBlockScanInfo->iiter.index = pBlockScanInfo->iter.index;
  pBlockScanInfo->fileDelIndex = pBlockScanInfo->iter.index;
  return code;

_err:
  taosArrayDestroy(pDelData);
  return code;
}

1673 1674 1675
/**
 * Return the key of the next buffered row (mem or imem) of the table that owns the
 * current file block, i.e. the row the scan would emit first from the buffers.
 *
 * - ascending order: the smaller of the two current keys
 * - descending order: the larger of the two current keys
 * - only one buffer has a valid row: that row's key
 * - neither buffer has a valid row: a key whose ts is TSKEY_INITIAL_VAL
 */
static TSDBKEY getCurrentKeyInBuf(SDataBlockIter* pBlockIter, STsdbReader* pReader) {
  TSDBKEY key = {.ts = TSKEY_INITIAL_VAL};

  SFileDataBlockInfo*  pFBlock = getCurrentBlockInfo(pBlockIter);
  STableBlockScanInfo* pScanInfo = taosHashGet(pReader->status.pTableMap, &pFBlock->uid, sizeof(pFBlock->uid));

  initMemDataIterator(pScanInfo, pReader);

  bool     hasMemKey = false;
  TSDBROW* pRow = getValidRow(&pScanInfo->iter, pScanInfo->delSkyline, pReader);
  if (pRow != NULL) {
    key = TSDBROW_KEY(pRow);
    hasMemKey = true;
  }

  pRow = getValidRow(&pScanInfo->iiter, pScanInfo->delSkyline, pReader);
  if (pRow != NULL) {
    TSDBKEY ik = TSDBROW_KEY(pRow);

    if (!hasMemKey) {
      // only imem holds data: comparing against the initial sentinel would discard this key
      key = ik;
    } else if (ASCENDING_TRAVERSE(pReader->order)) {
      if (ik.ts < key.ts) {
        key = ik;
      }
    } else {
      if (ik.ts > key.ts) {
        key = ik;
      }
    }
  }

  return key;
}

H
Haojun Liao 已提交
1696 1697
/**
 * Advance the fileset iterator until a fileset is found for which doLoadFileBlock
 * reports at least one valid table, or until no fileset is left.
 *
 * @param pReader      reader whose fileset iterator is advanced
 * @param numOfBlocks  passed through to doLoadFileBlock
 * @return TSDB_CODE_SUCCESS, or the error of loading the block index / file blocks
 */
static int32_t moveToNextFile(STsdbReader* pReader, int32_t* numOfBlocks) {
  SReaderStatus* pStatus = &pReader->status;
  int32_t        code = TSDB_CODE_SUCCESS;

  size_t  tableCount = taosHashGetSize(pReader->status.pTableMap);
  SArray* pBlockIdxList = taosArrayInit(tableCount, sizeof(SBlockIdx));

  // the loop ends when there are no more data files on disk
  while (filesetIteratorNext(&pStatus->fileIter, pReader)) {
    taosArrayClear(pBlockIdxList);

    code = doLoadBlockIndex(pReader, pReader->pFileReader, pBlockIdxList);
    if (code != TSDB_CODE_SUCCESS) {
      break;
    }

    if (taosArrayGetSize(pBlockIdxList) == 0) {
      continue;  // no blocks in current file, try next files
    }

    uint32_t validTables = 0;
    code = doLoadFileBlock(pReader, pBlockIdxList, &validTables, numOfBlocks);
    if (code != TSDB_CODE_SUCCESS || validTables > 0) {
      break;
    }
  }

  // single cleanup point for both the success and the error path
  taosArrayDestroy(pBlockIdxList);
  return code;
}

1734 1735 1736
/**
 * Produce the next result block for the current file block. One of three paths is taken:
 *  1. fileBlockShouldLoad() is true: load the file block and build a composed block;
 *  2. bufferDataInFileBlockGap() is true: emit buffered rows up to the file block boundary;
 *  3. otherwise: only fill in the result block info (rows, uid, window) from the file
 *     block header and mark the file block as fully dumped.
 */
static int32_t doBuildDataBlock(STsdbReader* pReader) {
  SReaderStatus*  pStatus = &pReader->status;
  SDataBlockIter* pIter = &pStatus->blockIter;

  SFileDataBlockInfo*  pBlockInfo = getCurrentBlockInfo(pIter);
  STableBlockScanInfo* pTableInfo = taosHashGet(pStatus->pTableMap, &pBlockInfo->uid, sizeof(pBlockInfo->uid));
  SBlock*              pFileBlock = getCurrentBlock(pIter);
  TSDBKEY              bufKey = getCurrentKeyInBuf(pIter, pReader);

  if (fileBlockShouldLoad(pReader, pBlockInfo, pFileBlock, pTableInfo, bufKey)) {
    tBlockDataReset(&pStatus->fileBlockData);
    tBlockDataClearData(&pStatus->fileBlockData);

    int32_t loadCode = doLoadFileBlockData(pReader, pIter, pTableInfo, &pStatus->fileBlockData);
    if (loadCode != TSDB_CODE_SUCCESS) {
      return loadCode;
    }

    // build composed data block
    return buildComposedDataBlock(pReader, pTableInfo);
  }

  if (bufferDataInFileBlockGap(pReader->order, bufKey, pFileBlock)) {
    // buffered rows located before the current file block (in scan direction) go first
    int64_t boundary = pFileBlock->maxKey.ts;
    if (ASCENDING_TRAVERSE(pReader->order)) {
      boundary = pFileBlock->minKey.ts;
    }
    return buildDataBlockFromBuf(pReader, pTableInfo, boundary);
  }

  // whole block is required, return it directly
  SDataBlockInfo* pResInfo = &pReader->pResBlock->info;
  pResInfo->rows = pFileBlock->nRow;
  pResInfo->uid = pTableInfo->uid;
  pResInfo->window = (STimeWindow){.skey = pFileBlock->minKey.ts, .ekey = pFileBlock->maxKey.ts};

  setComposedBlockFlag(pReader, false);
  setBlockAllDumped(&pStatus->fBlockDumpInfo, pFileBlock, pReader->order);
  return TSDB_CODE_SUCCESS;
}

H
Haojun Liao 已提交
1773
/**
 * Build the next result block from the in-memory buffers only, visiting the tables of
 * pStatus->pTableMap one after another. Returns as soon as a table yields rows; when
 * every table is exhausted the result block is left without rows.
 *
 * @return TSDB_CODE_SUCCESS, or the error raised while setting up the buffer iterators
 *         or while building the block
 */
static int32_t buildBlockFromBufferSequentially(STsdbReader* pReader) {
  SReaderStatus* pStatus = &pReader->status;

  while (1) {
    if (pStatus->pTableIter == NULL) {
      pStatus->pTableIter = taosHashIterate(pStatus->pTableMap, NULL);
      if (pStatus->pTableIter == NULL) {
        return TSDB_CODE_SUCCESS;
      }
    }

    STableBlockScanInfo* pBlockScanInfo = pStatus->pTableIter;

    // a failed iterator setup must not be mistaken for "table has no buffered data"
    int32_t code = initMemDataIterator(pBlockScanInfo, pReader);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    int64_t endKey = (ASCENDING_TRAVERSE(pReader->order)) ? INT64_MAX : INT64_MIN;
    code = buildDataBlockFromBuf(pReader, pBlockScanInfo, endKey);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    if (pReader->pResBlock->info.rows > 0) {
      return TSDB_CODE_SUCCESS;
    }

    // current table is exhausted, let's try the next table
    pStatus->pTableIter = taosHashIterate(pStatus->pTableMap, pStatus->pTableIter);
    if (pStatus->pTableIter == NULL) {
      return TSDB_CODE_SUCCESS;
    }
  }
}

1805
// set the correct start position in case of the first/last file block, according to the query time window
1806
static void initBlockDumpInfo(STsdbReader* pReader, SDataBlockIter* pBlockIter) {
1807
  SBlock* pBlock = getCurrentBlock(pBlockIter);
1808

1809 1810 1811
  SReaderStatus* pStatus = &pReader->status;

  SFileBlockDumpInfo* pDumpInfo = &pStatus->fBlockDumpInfo;
1812 1813 1814

  pDumpInfo->totalRows = pBlock->nRow;
  pDumpInfo->allDumped = false;
1815
  pDumpInfo->rowIndex = ASCENDING_TRAVERSE(pReader->order) ? 0 : pBlock->nRow - 1;
1816 1817
}

1818
/**
 * Move to the next fileset that holds blocks and prepare the block iterator and the
 * dump state for its first block.
 *
 * When moveToNextFile() reports zero blocks, pReader->status.loadFromFile is set to
 * false so the caller falls back to the data in buffer.
 *
 * @return TSDB_CODE_SUCCESS, or the error of moving to the next file / initializing
 *         the block iterator
 */
static int32_t initForFirstBlockInFile(STsdbReader* pReader, SDataBlockIter* pBlockIter) {
  int32_t numOfBlocks = 0;
  int32_t code = moveToNextFile(pReader, &numOfBlocks);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  // all data files are consumed, try data in buffer
  if (numOfBlocks == 0) {
    pReader->status.loadFromFile = false;
    return code;
  }

  // initialize the block iterator for a new fileset
  code = initBlockIterator(pReader, pBlockIter, numOfBlocks);
  if (code != TSDB_CODE_SUCCESS) {
    // the iterator is not usable; do not read the "current block" from it
    return code;
  }

  // set the correct start position according to the query time window
  initBlockDumpInfo(pReader, pBlockIter);
  return code;
}

1839
// A file block counts as partially read when it is not flagged as fully dumped and the
// row cursor has already left its start position: index > 0 for an ascending scan,
// index < totalRows - 1 for a descending scan.
static bool fileBlockPartiallyRead(SFileBlockDumpInfo* pDumpInfo, bool asc) {
  if (pDumpInfo->allDumped) {
    return false;
  }

  if (asc) {
    return pDumpInfo->rowIndex > 0;
  }

  return pDumpInfo->rowIndex < (pDumpInfo->totalRows - 1);
}

1844
/**
 * Produce the next non-empty result block from the data files.
 *
 * Each round either continues with a partially read file block, or (after moving to
 * the next block / next fileset when the current block is fully dumped) builds a block
 * via doBuildDataBlock(). The loop ends on error, when a block with rows is available,
 * or when initForFirstBlockInFile() turns pReader->status.loadFromFile off.
 */
static int32_t buildBlockFromFiles(STsdbReader* pReader) {
  SReaderStatus*  pStatus = &pReader->status;
  SDataBlockIter* pBlockIter = &pStatus->blockIter;
  bool            asc = ASCENDING_TRAVERSE(pReader->order);

  for (;;) {
    SFileDataBlockInfo*  pCurInfo = getCurrentBlockInfo(&pReader->status.blockIter);
    STableBlockScanInfo* pTableInfo = taosHashGet(pReader->status.pTableMap, &pCurInfo->uid, sizeof(pCurInfo->uid));
    SFileBlockDumpInfo*  pDump = &pReader->status.fBlockDumpInfo;

    int32_t rc = TSDB_CODE_SUCCESS;

    if (fileBlockPartiallyRead(pDump, asc)) {
      // file data block is partially loaded
      rc = buildComposedDataBlock(pReader, pTableInfo);
    } else {
      if (pDump->allDumped) {
        // current block is exhausted: next block in this file, else the next file
        if (blockIteratorNext(&pReader->status.blockIter)) {
          initBlockDumpInfo(pReader, pBlockIter);
        } else {
          rc = initForFirstBlockInFile(pReader, pBlockIter);

          // error happens or all the data files are completely checked
          if ((rc != TSDB_CODE_SUCCESS) || (pReader->status.loadFromFile == false)) {
            return rc;
          }
        }
      }

      // current block is not loaded yet, or data in buffer may overlap with the file block.
      rc = doBuildDataBlock(pReader);
    }

    if (rc != TSDB_CODE_SUCCESS) {
      return rc;
    }

    if (pReader->pResBlock->info.rows > 0) {
      return TSDB_CODE_SUCCESS;
    }
  }
}
H
refact  
Hongze Cheng 已提交
1888

1889 1890
/**
 * Pick the tsdb instance to query.
 *
 * For a non-rsma vnode the plain tsdb is returned and *pLevel is not written.
 * For an rsma vnode the retention levels are walked from level 0: the walk stops at the
 * first level whose keep range (now - keep) still covers winSKey, or steps back one
 * level when a level with keep <= 0 is met. The chosen level is stored in *pLevel.
 */
static STsdb* getTsdbByRetentions(SVnode* pVnode, TSKEY winSKey, SRetention* retentions, const char* idStr,
                                  int8_t* pLevel) {
  if (!VND_IS_RSMA(pVnode)) {
    return VND_TSDB(pVnode);
  }

  int8_t  level = 0;
  int64_t now = taosGetTimestamp(pVnode->config.tsdbCfg.precision);

  for (int8_t round = 0; round < TSDB_RETENTION_MAX; ++round, ++level) {
    SRetention* pCandidate = &retentions[level];

    if (pCandidate->keep <= 0) {
      if (level > 0) {
        --level;
      }
      break;
    }

    if ((now - pCandidate->keep) <= winSKey) {
      break;
    }
  }

  int32_t     vgId = TD_VID(pVnode);
  const char* str = (idStr == NULL) ? "" : idStr;

  switch (level) {
    case TSDB_RETENTION_L0:
      *pLevel = TSDB_RETENTION_L0;
      tsdbDebug("vgId:%d, rsma level %d is selected to query %s", vgId, TSDB_RETENTION_L0, str);
      return VND_RSMA0(pVnode);

    case TSDB_RETENTION_L1:
      *pLevel = TSDB_RETENTION_L1;
      tsdbDebug("vgId:%d, rsma level %d is selected to query %s", vgId, TSDB_RETENTION_L1, str);
      return VND_RSMA1(pVnode);

    default:
      *pLevel = TSDB_RETENTION_L2;
      tsdbDebug("vgId:%d, rsma level %d is selected to query %s", vgId, TSDB_RETENTION_L2, str);
      return VND_RSMA2(pVnode);
  }
}

H
Haojun Liao 已提交
1930
/**
 * Compute the version range of a query.
 *
 * minVer: pCond->startVersion, or 0 when it is -1.
 * maxVer: pCond->endVersion capped at pVnode->state.applied; when the end version is -1
 *         (not specified by the user) the applied version itself is used.
 *
 * The `level` parameter is not used by this function.
 */
SVersionRange getQueryVerRange(SVnode* pVnode, SQueryTableDataCond* pCond, int8_t level) {
  int64_t minVer = 0;
  if (pCond->startVersion != -1) {
    minVer = pCond->startVersion;
  }

  int64_t maxVer = 0;
  if (pCond->endVersion == -1 || pCond->endVersion > pVnode->state.applied) {
    maxVer = pVnode->state.applied;
  } else {
    maxVer = pCond->endVersion;
  }

  return (SVersionRange){.minVer = minVer, .maxVer = maxVer};
}

H
Hongze Cheng 已提交
1944 1945 1946 1947
// // todo not unref yet, since it is not support multi-group interpolation query
// static UNUSED_FUNC void changeQueryHandleForInterpQuery(STsdbReader* pHandle) {
//   // filter the queried time stamp in the first place
//   STsdbReader* pTsdbReadHandle = (STsdbReader*)pHandle;
H
refact  
Hongze Cheng 已提交
1948

H
Hongze Cheng 已提交
1949 1950
//   // starts from the buffer in case of descending timestamp order check data blocks
//   size_t numOfTables = taosArrayGetSize(pTsdbReadHandle->pTableCheckInfo);
H
refact  
Hongze Cheng 已提交
1951

H
Hongze Cheng 已提交
1952 1953
//   int32_t i = 0;
//   while (i < numOfTables) {
H
Haojun Liao 已提交
1954
//     STableBlockScanInfo* pCheckInfo = taosArrayGet(pTsdbReadHandle->pTableCheckInfo, i);
H
refact  
Hongze Cheng 已提交
1955

H
Hongze Cheng 已提交
1956 1957 1958 1959 1960 1961 1962 1963 1964 1965 1966 1967 1968 1969
//     // the first qualified table for interpolation query
//     //    if ((pTsdbReadHandle->window.skey <= pCheckInfo->pTableObj->lastKey) &&
//     //        (pCheckInfo->pTableObj->lastKey != TSKEY_INITIAL_VAL)) {
//     //      break;
//     //    }

//     i++;
//   }

//   // there are no data in all the tables
//   if (i == numOfTables) {
//     return;
//   }

H
Haojun Liao 已提交
1970
//   STableBlockScanInfo info = *(STableBlockScanInfo*)taosArrayGet(pTsdbReadHandle->pTableCheckInfo, i);
H
Hongze Cheng 已提交
1971 1972 1973 1974 1975 1976
//   taosArrayClear(pTsdbReadHandle->pTableCheckInfo);

//   info.lastKey = pTsdbReadHandle->window.skey;
//   taosArrayPush(pTsdbReadHandle->pTableCheckInfo, &info);
// }

1977
bool hasBeenDropped(const SArray* pDelList, int32_t* index, TSDBKEY* pKey, int32_t order) {
1978 1979 1980 1981
  ASSERT(pKey != NULL);
  if (pDelList == NULL) {
    return false;
  }
L
Liu Jicong 已提交
1982 1983 1984
  size_t  num = taosArrayGetSize(pDelList);
  bool    asc = ASCENDING_TRAVERSE(order);
  int32_t step = asc ? 1 : -1;
1985

1986 1987 1988 1989 1990 1991
  if (asc) {
    if (*index >= num - 1) {
      TSDBKEY* last = taosArrayGetLast(pDelList);
      ASSERT(pKey->ts >= last->ts);

      if (pKey->ts > last->ts) {
1992
        return false;
1993 1994 1995
      } else if (pKey->ts == last->ts) {
        TSDBKEY* prev = taosArrayGet(pDelList, num - 2);
        return (prev->version >= pKey->version);
1996 1997
      }
    } else {
1998 1999 2000 2001 2002 2003 2004 2005 2006 2007 2008 2009 2010 2011 2012 2013 2014 2015 2016 2017 2018 2019 2020 2021 2022 2023 2024 2025 2026 2027
      TSDBKEY* pCurrent = taosArrayGet(pDelList, *index);
      TSDBKEY* pNext = taosArrayGet(pDelList, (*index) + 1);

      if (pKey->ts < pCurrent->ts) {
        return false;
      }

      if (pCurrent->ts <= pKey->ts && pNext->ts >= pKey->ts && pCurrent->version >= pKey->version) {
        return true;
      }

      while (pNext->ts <= pKey->ts && (*index) < num - 1) {
        (*index) += 1;

        if ((*index) < num - 1) {
          pCurrent = taosArrayGet(pDelList, *index);
          pNext = taosArrayGet(pDelList, (*index) + 1);

          // it is not a consecutive deletion range, ignore it
          if (pCurrent->version == 0 && pNext->version > 0) {
            continue;
          }

          if (pCurrent->ts <= pKey->ts && pNext->ts >= pKey->ts && pCurrent->version >= pKey->version) {
            return true;
          }
        }
      }

      return false;
2028 2029
    }
  } else {
2030 2031
    if (*index <= 0) {
      TSDBKEY* pFirst = taosArrayGet(pDelList, 0);
2032

2033 2034 2035 2036 2037 2038 2039
      if (pKey->ts < pFirst->ts) {
        return false;
      } else if (pKey->ts == pFirst->ts) {
        return pFirst->version >= pKey->version;
      } else {
        ASSERT(0);
      }
2040
    } else {
2041 2042 2043 2044 2045 2046 2047 2048 2049 2050 2051 2052 2053 2054 2055 2056 2057 2058 2059 2060 2061 2062 2063 2064 2065 2066 2067
      TSDBKEY* pCurrent = taosArrayGet(pDelList, *index);
      TSDBKEY* pPrev = taosArrayGet(pDelList, (*index) - 1);

      if (pKey->ts > pCurrent->ts) {
        return false;
      }

      if (pPrev->ts <= pKey->ts && pCurrent->ts >= pKey->ts && pPrev->version >= pKey->version) {
        return true;
      }

      while (pPrev->ts >= pKey->ts && (*index) > 1) {
        (*index) += step;

        if ((*index) >= 1) {
          pCurrent = taosArrayGet(pDelList, *index);
          pPrev = taosArrayGet(pDelList, (*index) - 1);

          // it is not a consecutive deletion range, ignore it
          if (pCurrent->version > 0 && pPrev->version == 0) {
            continue;
          }

          if (pPrev->ts <= pKey->ts && pCurrent->ts >= pKey->ts && pPrev->version >= pKey->version) {
            return true;
          }
        }
2068 2069 2070 2071 2072
      }

      return false;
    }
  }
2073 2074

  return false;
2075 2076 2077 2078
}

/**
 * Return the first row at or after the iterator's current position that is visible to
 * the reader: inside the reader's version range and not covered by the delete list.
 *
 * The iterator is advanced past invisible rows. NULL is returned (and pIter->hasVal is
 * cleared) when the iterator runs dry or a row outside the reader's time window is met.
 */
TSDBROW* getValidRow(SIterInfo* pIter, const SArray* pDelList, STsdbReader* pReader) {
  if (!pIter->hasVal) {
    return NULL;
  }

  TSDBROW* pCandidate = tsdbTbDataIterGet(pIter->iter);

  for (;;) {
    TSDBKEY rowKey = TSDBROW_KEY(pCandidate);

    if (outOfTimeWindow(rowKey.ts, &pReader->window)) {
      pIter->hasVal = false;
      return NULL;
    }

    // the delete check runs only for rows whose version is within range
    bool versionOk = (rowKey.version <= pReader->verRange.maxVer && rowKey.version >= pReader->verRange.minVer);
    if (versionOk && !hasBeenDropped(pDelList, &pIter->index, &rowKey, pReader->order)) {
      return pCandidate;
    }

    pIter->hasVal = tsdbTbDataIterNext(pIter->iter);
    if (!pIter->hasVal) {
      return NULL;
    }

    pCandidate = tsdbTbDataIterGet(pIter->iter);
  }
}
H
Hongze Cheng 已提交
2115

2116 2117
/**
 * Advance the buffer iterator and merge every following visible row that carries the
 * timestamp `ts` into pMerger. Stops at the first row with a different timestamp, when
 * no visible row is left, or when the iterator is exhausted.
 *
 * A row whose schema version differs from pReader->pSchema is merged with a schema
 * fetched for that version; the fetched schema is released again unless it has been
 * installed as pReader->pSchema (which happens when the reader had none).
 *
 * @return always TSDB_CODE_SUCCESS
 */
int32_t doMergeRowsInBuf(SIterInfo* pIter, uint64_t uid, int64_t ts, SArray* pDelList, SRowMerger* pMerger,
                         STsdbReader* pReader) {
  while ((pIter->hasVal = tsdbTbDataIterNext(pIter->iter))) {
    // data exists but not valid
    TSDBROW* pNextRow = getValidRow(pIter, pDelList, pReader);
    if (pNextRow == NULL) {
      break;
    }

    // ts is not identical, quit
    TSDBKEY nextKey = TSDBROW_KEY(pNextRow);
    if (nextKey.ts != ts) {
      break;
    }

    int32_t   rowSver = TSDBROW_SVERSION(pNextRow);
    STSchema* pRowSchema = pReader->pSchema;

    if (pRowSchema == NULL || rowSver != pRowSchema->version) {
      pRowSchema = NULL;
      metaGetTbTSchemaEx(pReader->pTsdb->pVnode->pMeta, pReader->suid, uid, rowSver, &pRowSchema);
      if (pReader->pSchema == NULL) {
        pReader->pSchema = pRowSchema;
      }
    }

    tRowMergerAdd(pMerger, pNextRow, pRowSchema);

    // a schema fetched only for this row is not kept
    if (pRowSchema != pReader->pSchema) {
      taosMemoryFree(pRowSchema);
    }
  }

  return TSDB_CODE_SUCCESS;
}

2157
static int32_t doMergeRowsInFileBlockImpl(SBlockData* pBlockData, int32_t rowIndex, int64_t key, SRowMerger* pMerger,
2158
                                          SVersionRange* pVerRange, int32_t step) {
2159 2160
  while (pBlockData->aTSKEY[rowIndex] == key && rowIndex < pBlockData->nRow && rowIndex >= 0) {
    if (pBlockData->aVersion[rowIndex] > pVerRange->maxVer || pBlockData->aVersion[rowIndex] < pVerRange->minVer) {
2161
      rowIndex += step;
2162 2163 2164 2165 2166 2167 2168 2169 2170 2171 2172 2173 2174 2175 2176 2177 2178
      continue;
    }

    TSDBROW fRow = tsdbRowFromBlockData(pBlockData, rowIndex);
    tRowMerge(pMerger, &fRow);
    rowIndex += step;
  }

  return rowIndex;
}

// Result of checkForNeighborFileBlock(), telling doMergeRowsInFileBlocks() whether to
// keep probing further neighbor blocks for rows with the same timestamp.
typedef enum {
  CHECK_FILEBLOCK_CONT = 0x1,  // the loaded neighbor block was consumed to its end; probe the next one
  CHECK_FILEBLOCK_QUIT = 0x2,  // stop probing; this is the state set on entry to the check
} CHECK_FILEBLOCK_STATE;

static int32_t checkForNeighborFileBlock(STsdbReader* pReader, STableBlockScanInfo* pScanInfo, SBlock* pBlock,
2179 2180
                                         SFileDataBlockInfo* pFBlock, SRowMerger* pMerger, int64_t key,
                                         CHECK_FILEBLOCK_STATE* state) {
2181
  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
2182
  SBlockData*         pBlockData = &pReader->status.fileBlockData;
2183

2184
  *state = CHECK_FILEBLOCK_QUIT;
2185
  int32_t step = ASCENDING_TRAVERSE(pReader->order) ? 1 : -1;
2186 2187 2188

  int32_t nextIndex = -1;
  SBlock* pNeighborBlock = getNeighborBlockOfSameTable(pFBlock, pScanInfo, &nextIndex, pReader->order);
2189
  if (pNeighborBlock == NULL) {  // do nothing
2190 2191 2192 2193
    return 0;
  }

  bool overlap = overlapWithNeighborBlock(pBlock, pNeighborBlock, pReader->order);
2194 2195
  taosMemoryFree(pNeighborBlock);

2196
  if (overlap) {  // load next block
2197
    SReaderStatus*  pStatus = &pReader->status;
2198 2199
    SDataBlockIter* pBlockIter = &pStatus->blockIter;

2200
    // 1. find the next neighbor block in the scan block list
2201
    SFileDataBlockInfo fb = {.uid = pFBlock->uid, .tbBlockIdx = nextIndex};
2202
    int32_t            neighborIndex = findFileBlockInfoIndex(pBlockIter, &fb);
2203

2204
    // 2. remove it from the scan block list
2205
    setFileBlockActiveInBlockIter(pBlockIter, neighborIndex, step);
2206

2207
    // 3. load the neighbor block, and set it to be the currently accessed file data block
H
Haojun Liao 已提交
2208 2209
    tBlockDataReset(&pStatus->fileBlockData);
    tBlockDataClearData(&pStatus->fileBlockData);
2210 2211 2212 2213 2214
    int32_t code = doLoadFileBlockData(pReader, pBlockIter, pScanInfo, &pStatus->fileBlockData);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

2215
    // 4. check the data values
2216 2217 2218 2219
    initBlockDumpInfo(pReader, pBlockIter);

    pDumpInfo->rowIndex =
        doMergeRowsInFileBlockImpl(pBlockData, pDumpInfo->rowIndex, key, pMerger, &pReader->verRange, step);
H
Haojun Liao 已提交
2220
    if (pDumpInfo->rowIndex >= pDumpInfo->totalRows) {
2221 2222 2223 2224 2225 2226 2227
      *state = CHECK_FILEBLOCK_CONT;
    }
  }

  return TSDB_CODE_SUCCESS;
}

2228 2229
/**
 * Merge all file-block rows that share the timestamp of the row at the current dump
 * position into pMerger. The row at the current position itself is expected to be in
 * the merger already; the cursor is moved past it first.
 *
 * When the current block is consumed to its end, overlapping neighbor blocks of the
 * same table are loaded and merged as well.
 *
 * @return TSDB_CODE_SUCCESS, or the error raised while loading a neighbor block
 */
int32_t doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pScanInfo, STsdbReader* pReader,
                                SRowMerger* pMerger) {
  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;

  bool    asc = ASCENDING_TRAVERSE(pReader->order);
  int64_t key = pBlockData->aTSKEY[pDumpInfo->rowIndex];
  int32_t step = asc ? 1 : -1;

  pDumpInfo->rowIndex += step;
  if ((pDumpInfo->rowIndex <= pBlockData->nRow - 1 && asc) || (pDumpInfo->rowIndex >= 0 && !asc)) {
    pDumpInfo->rowIndex =
        doMergeRowsInFileBlockImpl(pBlockData, pDumpInfo->rowIndex, key, pMerger, &pReader->verRange, step);
  }

  // all rows are consumed, let's try next file block
  if ((pDumpInfo->rowIndex >= pBlockData->nRow && asc) || (pDumpInfo->rowIndex < 0 && !asc)) {
    while (1) {
      CHECK_FILEBLOCK_STATE st = CHECK_FILEBLOCK_QUIT;

      SFileDataBlockInfo* pFileBlockInfo = getCurrentBlockInfo(&pReader->status.blockIter);
      SBlock*             pCurrentBlock = getCurrentBlock(&pReader->status.blockIter);

      // a failed neighbor load is reported instead of being treated as "no more neighbors"
      int32_t code = checkForNeighborFileBlock(pReader, pScanInfo, pCurrentBlock, pFileBlockInfo, pMerger, key, &st);
      if (code != TSDB_CODE_SUCCESS) {
        return code;
      }

      if (st == CHECK_FILEBLOCK_QUIT) {
        break;
      }
    }
  }

  return TSDB_CODE_SUCCESS;
}

2259
// Make pReader->pSchema match the schema version of pRow: nothing happens when the
// versions already agree; otherwise the held schema (if any) is released and the schema
// for the row's version is fetched from the meta store.
void updateSchema(TSDBROW* pRow, uint64_t uid, STsdbReader* pReader) {
  int32_t rowSver = TSDBROW_SVERSION(pRow);

  if (pReader->pSchema != NULL) {
    if (pReader->pSchema->version == rowSver) {
      return;
    }

    taosMemoryFreeClear(pReader->pSchema);
  }

  metaGetTbTSchemaEx(pReader->pTsdb->pVnode->pMeta, pReader->suid, uid, rowSver, &pReader->pSchema);
}

dengyihao's avatar
dengyihao 已提交
2270 2271
// Build one result row in *pTSRow, starting the merge from pRow and then handing the iterator pIter to
// doMergeRowsInBuf together with the timestamp of pRow.
//
// The schema used for pRow is the reader's cached schema when its version matches the row; otherwise a schema is
// fetched with metaGetTbTSchemaEx. A fetched schema is adopted as the reader's cached schema only when the reader
// had none, and is released before returning in every other case.
void doMergeMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter, SArray* pDelList, STSRow** pTSRow,
                      STsdbReader* pReader) {
  SRowMerger merger = {0};

  TSDBKEY rowKey = TSDBROW_KEY(pRow);
  int32_t rowSversion = TSDBROW_SVERSION(pRow);

  STSchema* pRowSchema = NULL;
  bool      cachedMatches = (pReader->pSchema != NULL) && (rowSversion == pReader->pSchema->version);
  if (cachedMatches) {
    pRowSchema = pReader->pSchema;
  } else {
    metaGetTbTSchemaEx(pReader->pTsdb->pVnode->pMeta, pReader->suid, uid, rowSversion, &pRowSchema);
    if (pReader->pSchema == NULL) {
      pReader->pSchema = pRowSchema;
    }
  }

  tRowMergerInit2(&merger, pReader->pSchema, pRow, pRowSchema);
  doMergeRowsInBuf(pIter, uid, rowKey.ts, pDelList, &merger, pReader);
  tRowMergerGetRow(&merger, pTSRow);
  tRowMergerClear(&merger);

  // a schema fetched only for this row is not kept
  if (pRowSchema != pReader->pSchema) {
    taosMemoryFree(pRowSchema);
  }
}

2297 2298
// Build one result row in *pTSRow from a row of the mem iterator (pRow) and a row of the imem iterator (piRow).
// In this file it is called from tsdbGetNextRowInMem for the case where both rows carry the same timestamp.
//
// Ascending scans start the merge from the imem row and then add the mem row; descending scans do it the other way
// round. After each starting row, doMergeRowsInBuf is given the corresponding iterator.
//
// The merger is released with tRowMergerClear once the row has been produced, in the same way doMergeMultiRows
// does; previously it was never cleared here.
void doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader,
                        STSRow** pTSRow) {
  SRowMerger merge = {0};

  TSDBKEY k = TSDBROW_KEY(pRow);
  TSDBKEY ik = TSDBROW_KEY(piRow);

  if (ASCENDING_TRAVERSE(pReader->order)) {  // ascending order imem --> mem
    updateSchema(piRow, pBlockScanInfo->uid, pReader);

    tRowMergerInit(&merge, piRow, pReader->pSchema);
    doMergeRowsInBuf(&pBlockScanInfo->iiter, pBlockScanInfo->uid, ik.ts, pBlockScanInfo->delSkyline, &merge, pReader);

    tRowMerge(&merge, pRow);
    doMergeRowsInBuf(&pBlockScanInfo->iter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge, pReader);
  } else {  // descending order mem --> imem
    updateSchema(pRow, pBlockScanInfo->uid, pReader);

    tRowMergerInit(&merge, pRow, pReader->pSchema);
    doMergeRowsInBuf(&pBlockScanInfo->iter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge, pReader);

    tRowMerge(&merge, piRow);
    // k.ts is passed for the imem iterator as well; the visible caller only gets here when k.ts == ik.ts
    doMergeRowsInBuf(&pBlockScanInfo->iiter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge, pReader);
  }

  tRowMergerGetRow(&merge, pTSRow);
  tRowMergerClear(&merge);
}

2325 2326
// Produce the next merged row from the in-memory buffers (mem and imem iterators) of one table.
//
// A candidate row whose timestamp has reached endKey in scan direction (>= endKey when ascending, <= endKey when
// descending) is ignored. When both iterators still offer a row, the one with the smaller timestamp is merged, or
// both are merged together when the timestamps are equal. *pTSRow is only written when a row was merged.
//
// Always returns TSDB_CODE_SUCCESS.
int32_t tsdbGetNextRowInMem(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader, STSRow** pTSRow,
                            int64_t endKey) {
  SIterInfo* pMemIter = &pBlockScanInfo->iter;
  SIterInfo* pIMemIter = &pBlockScanInfo->iiter;

  TSDBROW* pRow = getValidRow(pMemIter, pBlockScanInfo->delSkyline, pReader);
  TSDBROW* piRow = getValidRow(pIMemIter, pBlockScanInfo->delSkyline, pReader);
  SArray*  pDelList = pBlockScanInfo->delSkyline;
  uint64_t uid = pBlockScanInfo->uid;

  // todo refactor
  bool asc = ASCENDING_TRAVERSE(pReader->order);

  if (pMemIter->hasVal) {
    TSDBKEY key = TSDBROW_KEY(pRow);
    bool    beyondEnd = asc ? (key.ts >= endKey) : (key.ts <= endKey);
    if (beyondEnd) {
      pRow = NULL;
    }
  }

  if (pIMemIter->hasVal) {
    TSDBKEY key = TSDBROW_KEY(piRow);
    bool    beyondEnd = asc ? (key.ts >= endKey) : (key.ts <= endKey);
    if (beyondEnd) {
      piRow = NULL;
    }
  }

  bool memUsable = pMemIter->hasVal && (pRow != NULL);
  bool imemUsable = pIMemIter->hasVal && (piRow != NULL);

  if (memUsable && imemUsable) {
    TSDBKEY k = TSDBROW_KEY(pRow);
    TSDBKEY ik = TSDBROW_KEY(piRow);

    if (ik.ts < k.ts) {
      doMergeMultiRows(piRow, uid, pIMemIter, pDelList, pTSRow, pReader);
    } else if (k.ts < ik.ts) {
      doMergeMultiRows(pRow, uid, pMemIter, pDelList, pTSRow, pReader);
    } else {  // ik.ts == k.ts
      doMergeMemIMemRows(pRow, piRow, pBlockScanInfo, pReader, pTSRow);
    }
  } else if (memUsable) {
    doMergeMultiRows(pRow, uid, pMemIter, pDelList, pTSRow, pReader);
  } else if (imemUsable) {
    doMergeMultiRows(piRow, uid, pIMemIter, pDelList, pTSRow, pReader);
  }

  return TSDB_CODE_SUCCESS;
}

2375
// Append pTSRow as one more row at the end of pBlock.
//
// If the first output column is the primary timestamp column it receives pTSRow->ts. The remaining output columns
// and the columns of pReader->pSchema are then walked side by side by column id: matching ids copy the value,
// an output column whose id is smaller than the current schema column id is set to NULL, and schema columns that
// are not requested are skipped. Output columns left over at the end are set to NULL.
//
// Always returns TSDB_CODE_SUCCESS.
int32_t doAppendRowFromTSRow(SSDataBlock* pBlock, STsdbReader* pReader, STSRow* pTSRow) {
  int32_t rowPos = pBlock->info.rows;
  int32_t numOfOutCols = (int32_t)taosArrayGetSize(pBlock->pDataBlock);

  SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo;
  STSchema*           pSchema = pReader->pSchema;

  SColVal colVal = {0};
  int32_t outIdx = 0;
  int32_t schemaIdx = 0;

  SColumnInfoData* pOutCol = taosArrayGet(pBlock->pDataBlock, outIdx);
  if (pOutCol->info.colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
    colDataAppend(pOutCol, rowPos, (const char*)&pTSRow->ts, false);
    outIdx += 1;
  }

  while (outIdx < numOfOutCols && schemaIdx < pSchema->numOfCols) {
    pOutCol = taosArrayGet(pBlock->pDataBlock, outIdx);
    col_id_t wantedId = pOutCol->info.colId;

    if (wantedId < pSchema->columns[schemaIdx].colId) {
      // requested column is not part of the schema
      colDataAppendNULL(pOutCol, rowPos);
      outIdx += 1;
    } else if (wantedId == pSchema->columns[schemaIdx].colId) {
      tTSRowGetVal(pTSRow, pSchema, schemaIdx, &colVal);
      doCopyColVal(pOutCol, rowPos, outIdx, &colVal, pSupInfo);
      outIdx += 1;
      schemaIdx += 1;
    } else {
      // schema column is not requested
      schemaIdx += 1;
    }
  }

  // set null value since current column does not exist in the "pSchema"
  for (; outIdx < numOfOutCols; outIdx += 1) {
    pOutCol = taosArrayGet(pBlock->pDataBlock, outIdx);
    colDataAppendNULL(pOutCol, rowPos);
  }

  pBlock->info.rows += 1;
  return TSDB_CODE_SUCCESS;
}

2419 2420 2421 2422 2423 2424 2425 2426 2427
// Append row rowIndex of the file block pBlockData as one more row at the end of pResBlock.
//
// If the first output column is the primary timestamp column it receives pBlockData->aTSKEY[rowIndex]. The
// remaining output columns and the columns of the file block are then walked side by side by column id
// (this assumes both sides are ordered by ascending column id, as the merge in doAppendRowFromTSRow does):
//   - file column id <  output column id: the file column is not requested, skip it;
//   - file column id == output column id: copy the value;
//   - file column id >  output column id: the output column is missing in the file block, set it to NULL.
// Output columns left over at the end are set to NULL.
//
// The first case used to be treated like the third one without advancing the file column index, which made the
// current and every following output column NULL as soon as the file block held a column that was not requested.
//
// Always returns TSDB_CODE_SUCCESS.
int32_t doAppendRowFromBlock(SSDataBlock* pResBlock, STsdbReader* pReader, SBlockData* pBlockData, int32_t rowIndex) {
  int32_t i = 0, j = 0;
  int32_t outputRowIndex = pResBlock->info.rows;

  SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo;

  SColumnInfoData* pColData = taosArrayGet(pResBlock->pDataBlock, i);
  if (pColData->info.colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
    colDataAppendInt64(pColData, outputRowIndex, &pBlockData->aTSKEY[rowIndex]);
    i += 1;
  }

  SColVal cv = {0};
  int32_t numOfInputCols = taosArrayGetSize(pBlockData->aIdx);
  int32_t numOfOutputCols = blockDataGetNumOfCols(pResBlock);

  while (i < numOfOutputCols && j < numOfInputCols) {
    SColumnInfoData* pCol = taosArrayGet(pResBlock->pDataBlock, i);
    SColData*        pData = tBlockDataGetColDataByIdx(pBlockData, j);

    if (pData->cid < pCol->info.colId) {
      // column exists in the file block but is not requested
      j += 1;
    } else if (pData->cid == pCol->info.colId) {
      tColDataGetValue(pData, rowIndex, &cv);
      doCopyColVal(pCol, outputRowIndex, i, &cv, pSupInfo);
      i += 1;
      j += 1;
    } else {  // the specified column does not exist in file block, fill with null data
      colDataAppendNULL(pCol, outputRowIndex);
      i += 1;
    }
  }

  while (i < numOfOutputCols) {
    SColumnInfoData* pCol = taosArrayGet(pResBlock->pDataBlock, i);
    colDataAppendNULL(pCol, outputRowIndex);
    i += 1;
  }

  pResBlock->info.rows += 1;
  return TSDB_CODE_SUCCESS;
}

2460 2461
// Fill pReader->pResBlock with rows taken from the in-memory buffers of one table.
//
// Rows are fetched one at a time with tsdbGetNextRowInMem (bounded by endKey) and appended to the result block.
// Filling stops when no row is returned, when neither buffer iterator has a value left, or when the block holds
// at least `capacity` rows.
//
// Always returns TSDB_CODE_SUCCESS.
int32_t buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, int64_t endKey, int32_t capacity,
                                  STsdbReader* pReader) {
  SSDataBlock* pResBlock = pReader->pResBlock;

  for (;;) {
    STSRow* pNextRow = NULL;
    tsdbGetNextRowInMem(pBlockScanInfo, pReader, &pNextRow, endKey);
    if (pNextRow == NULL) {
      break;
    }

    doAppendRowFromTSRow(pResBlock, pReader, pNextRow);
    taosMemoryFree(pNextRow);

    // no data in buffer, return immediately
    bool bufferDrained = !pBlockScanInfo->iter.hasVal && !pBlockScanInfo->iiter.hasVal;
    if (bufferDrained || pResBlock->info.rows >= capacity) {
      break;
    }
  }

  ASSERT(pResBlock->info.rows <= capacity);
  return TSDB_CODE_SUCCESS;
}
H
Hongze Cheng 已提交
2487

2488
// todo refactor, use arraylist instead
H
Hongze Cheng 已提交
2489
// Make `uid` the only table scanned by pReader: the table map is emptied and a fresh scan-info entry
// (lastKey = 0, all other fields zero) is inserted for that uid.
//
// Returns TSDB_CODE_SUCCESS, the success code used by every other function of this reader.
int32_t tsdbSetTableId(STsdbReader* pReader, int64_t uid) {
  ASSERT(pReader != NULL);
  taosHashClear(pReader->status.pTableMap);

  STableBlockScanInfo info = {.lastKey = 0, .uid = uid};
  taosHashPut(pReader->status.pTableMap, &info.uid, sizeof(uint64_t), &info, sizeof(info));
  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
2498 2499 2500 2501 2502 2503
// Return metaGetIdx(pMeta), or NULL when pMeta is NULL.
void* tsdbGetIdx(SMeta* pMeta) {
  return (pMeta != NULL) ? metaGetIdx(pMeta) : NULL;
}
dengyihao's avatar
dengyihao 已提交
2504

dengyihao's avatar
dengyihao 已提交
2505 2506 2507 2508 2509 2510
// Return metaGetIvtIdx(pMeta), or NULL when pMeta is NULL.
void* tsdbGetIvtIdx(SMeta* pMeta) {
  return (pMeta != NULL) ? metaGetIvtIdx(pMeta) : NULL;
}
L
Liu Jicong 已提交
2511

2512 2513 2514 2515
// Return the upper bound (maxVer) of the version range stored in the reader.
uint64_t getReaderMaxVersion(STsdbReader* pReader) {
  uint64_t maxVersion = pReader->verRange.maxVer;
  return maxVersion;
}

C
Cary Xu 已提交
2516 2517 2518 2519 2520 2521 2522 2523 2524 2525
/**
 * @brief Get all suids since suid
 *
 * A super-table cursor is opened at suid and every id it yields is pushed to list, until the cursor returns 0.
 *
 * @param pMeta meta handle the cursor is opened on
 * @param suid  return all suids in one vnode if suid is 0
 * @param list  array that receives the ids (tb_uid_t elements)
 * @return int32_t TSDB_CODE_FAILED if the cursor cannot be opened, TSDB_CODE_SUCCESS otherwise
 */
int32_t tsdbGetStbIdList(SMeta* pMeta, int64_t suid, SArray* list) {
  SMStbCursor* pCursor = metaOpenStbCursor(pMeta, suid);
  if (pCursor == NULL) {
    return TSDB_CODE_FAILED;
  }

  // an id of 0 marks the end of the cursor
  for (tb_uid_t id = metaStbCursorNext(pCursor); id != 0; id = metaStbCursorNext(pCursor)) {
    taosArrayPush(list, &id);
  }

  metaCloseStbCursor(pCursor);
  return TSDB_CODE_SUCCESS;
}

H
refact  
Hongze Cheng 已提交
2543
// ====================================== EXPOSED APIs ======================================
2544 2545
// Create a data reader for the tables in pTableList and position it on the first file block.
//
// Steps, in order:
//   1. create the main reader (*ppReader); an empty query time window ends the setup successfully right there;
//   2. for TIMEWINDOW_RANGE_EXTERNAL conditions, create two inner readers with capacity 1 on rewritten time
//      windows (pCond->twindows and pCond->order are modified for that and not restored);
//   3. load a schema, build the per-table scan info map and take a read snapshot;
//   4. initialize the file-set and block iterators of the main reader (contained window) or of the first inner
//      reader (external window).
//
// Returns TSDB_CODE_SUCCESS or the error code of the failing step. When the table map cannot be created the reader
// is closed and *ppReader is set to NULL.
//
// The error log uses the caller-supplied idstr instead of pReader->idStr: on the error path pReader is either not
// assigned yet (first tsdbReaderCreate failed) or already released by tsdbReaderClose.
int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, SArray* pTableList, STsdbReader** ppReader,
                       const char* idstr) {
  int32_t code = tsdbReaderCreate(pVnode, pCond, ppReader, 4096, idstr);
  if (code != TSDB_CODE_SUCCESS) {
    goto _err;
  }

  // check for query time window
  STsdbReader* pReader = *ppReader;
  if (isEmptyQueryTimeWindow(&pReader->window)) {
    tsdbDebug("%p query window not overlaps with the data set, no result returned, %s", pReader, pReader->idStr);
    return TSDB_CODE_SUCCESS;
  }

  if (pCond->type == TIMEWINDOW_RANGE_EXTERNAL) {
    // update the SQueryTableDataCond to create inner reader
    STimeWindow w = pCond->twindows;
    int32_t     order = pCond->order;
    if (order == TSDB_ORDER_ASC) {
      pCond->twindows.ekey = pCond->twindows.skey;
      pCond->twindows.skey = INT64_MIN;
      pCond->order = TSDB_ORDER_DESC;
    } else {
      pCond->twindows.skey = pCond->twindows.ekey;
      pCond->twindows.ekey = INT64_MAX;
      pCond->order = TSDB_ORDER_ASC;
    }

    code = tsdbReaderCreate(pVnode, pCond, &pReader->innerReader[0], 1, idstr);
    if (code != TSDB_CODE_SUCCESS) {
      goto _err;
    }

    if (order == TSDB_ORDER_ASC) {
      pCond->twindows.skey = w.ekey;
      pCond->twindows.ekey = INT64_MAX;
    } else {
      pCond->twindows.skey = INT64_MIN;
      pCond->twindows.ekey = w.ekey;
    }
    code = tsdbReaderCreate(pVnode, pCond, &pReader->innerReader[1], 1, idstr);
    if (code != TSDB_CODE_SUCCESS) {
      goto _err;
    }
  }

  if (pCond->suid != 0) {
    pReader->pSchema = metaGetTbTSchema(pReader->pTsdb->pVnode->pMeta, pReader->suid, -1);
  } else if (taosArrayGetSize(pTableList) > 0) {
    STableKeyInfo* pKey = taosArrayGet(pTableList, 0);
    pReader->pSchema = metaGetTbTSchema(pReader->pTsdb->pVnode->pMeta, pKey->uid, -1);
  }

  int32_t numOfTables = taosArrayGetSize(pTableList);
  pReader->status.pTableMap = createDataBlockScanInfo(pReader, pTableList->pData, numOfTables);
  if (pReader->status.pTableMap == NULL) {
    tsdbReaderClose(pReader);
    *ppReader = NULL;

    code = TSDB_CODE_TDB_OUT_OF_MEMORY;
    goto _err;
  }

  code = tsdbTakeReadSnap(pReader->pTsdb, &pReader->pReadSnap);
  if (code != TSDB_CODE_SUCCESS) {
    goto _err;
  }

  if (pReader->type == TIMEWINDOW_RANGE_CONTAINED) {
    SDataBlockIter* pBlockIter = &pReader->status.blockIter;

    initFilesetIterator(&pReader->status.fileIter, pReader->pReadSnap->fs.aDFileSet, pReader->order, pReader->idStr);
    resetDataBlockIterator(&pReader->status.blockIter, pReader->order, pReader->status.pTableMap);

    // no data in files, let's try buffer in memory
    if (pReader->status.fileIter.numOfFiles == 0) {
      pReader->status.loadFromFile = false;
    } else {
      code = initForFirstBlockInFile(pReader, pBlockIter);
      if (code != TSDB_CODE_SUCCESS) {
        return code;
      }
    }
  } else {
    STsdbReader*    pPrevReader = pReader->innerReader[0];
    SDataBlockIter* pBlockIter = &pPrevReader->status.blockIter;

    // NOTE(review): pPrevReader->pReadSnap is dereferenced here although this function only takes a read snapshot
    // for pReader; confirm that tsdbReaderCreate (not visible here) provides one for the inner readers.
    initFilesetIterator(&pPrevReader->status.fileIter, pPrevReader->pReadSnap->fs.aDFileSet, pPrevReader->order,
                        pPrevReader->idStr);
    resetDataBlockIterator(&pPrevReader->status.blockIter, pPrevReader->order, pReader->status.pTableMap);

    // no data in files, let's try buffer in memory
    if (pPrevReader->status.fileIter.numOfFiles == 0) {
      pPrevReader->status.loadFromFile = false;
    } else {
      code = initForFirstBlockInFile(pPrevReader, pBlockIter);
      if (code != TSDB_CODE_SUCCESS) {
        return code;
      }
    }
  }

  tsdbDebug("%p total numOfTable:%d in this query %s", pReader, numOfTables, pReader->idStr);
  return code;

_err:
  // pReader must not be touched here, see the function comment
  tsdbError("failed to create data reader, code:%s %s", tstrerror(code), (idstr != NULL) ? idstr : "");
  return code;
}

// Release a reader and everything it owns; a NULL reader is ignored.
//
// Released in this order: the read snapshot, the column support buffers, the file block data, the block iterator,
// the table scan-info map, the result block and the data file reader. An io-cost summary is logged before the id
// string, the schema and the reader itself are freed.
void tsdbReaderClose(STsdbReader* pReader) {
  if (pReader == NULL) {
    return;
  }

  SBlockLoadSuppInfo* pSup = &pReader->suppInfo;

  tsdbUntakeReadSnap(pReader->pTsdb, pReader->pReadSnap);

  taosMemoryFreeClear(pSup->plist);
  taosMemoryFree(pSup->colIds);

  taosArrayDestroy(pSup->pColAgg);

  // one build buffer slot per column of the result block
  int32_t colIdx = 0;
  while (colIdx < blockDataGetNumOfCols(pReader->pResBlock)) {
    if (pSup->buildBuf[colIdx] != NULL) {
      taosMemoryFreeClear(pSup->buildBuf[colIdx]);
    }
    ++colIdx;
  }
  taosMemoryFree(pSup->buildBuf);

  tBlockDataClear(&pReader->status.fileBlockData, true);

  cleanupDataBlockIterator(&pReader->status.blockIter);

  // the table count is only needed for the summary below, but must be read before the map is destroyed
  size_t tableCount = taosHashGetSize(pReader->status.pTableMap);
  destroyBlockScanInfo(pReader->status.pTableMap);
  blockDataDestroy(pReader->pResBlock);

  if (pReader->pFileReader != NULL) {
    tsdbDataFReaderClose(&pReader->pFileReader);
  }

  SIOCostSummary* pCost = &pReader->cost;

  tsdbDebug("%p :io-cost summary: head-file:%" PRIu64 ", head-file time:%.2f ms, SMA:%" PRId64
            " SMA-time:%.2f ms, "
            "fileBlocks:%" PRId64
            ", fileBlocks-time:%.2f ms, build in-memory-block-time:%.2f ms, STableBlockScanInfo "
            "size:%.2f Kb %s",
            pReader, pCost->headFileLoad, pCost->headFileLoadTime, pCost->smaData, pCost->smaLoadTime,
            pCost->numOfBlocks, pCost->blockLoadTime, pCost->buildmemBlock,
            tableCount * sizeof(STableBlockScanInfo) / 1000.0, pReader->idStr);

  taosMemoryFree(pReader->idStr);
  taosMemoryFree(pReader->pSchema);
  taosMemoryFreeClear(pReader);
}

2701
// Build the next result block of one reader. Returns true when pReader->pResBlock holds at least one row.
//
// While the reader is still loading from files, a block is built from the files first; a failure there ends the
// call with false. If the files yielded no row, or the reader does not load from files, the block is built from
// the in-memory buffers instead.
static bool doTsdbNextDataBlock(STsdbReader* pReader) {
  // cleanup the data that belongs to the previous data block
  SSDataBlock* pResBlock = pReader->pResBlock;
  blockDataCleanup(pResBlock);

  if (pReader->status.loadFromFile) {
    if (buildBlockFromFiles(pReader) != TSDB_CODE_SUCCESS) {
      return false;
    }

    if (pResBlock->info.rows > 0) {
      return true;
    }
  }

  // no data in files (any more), let's try the buffer
  buildBlockFromBufferSequentially(pReader);
  return pResBlock->info.rows > 0;
}

2728 2729 2730 2731 2732 2733 2734 2735 2736 2737 2738 2739 2740 2741 2742 2743 2744 2745 2746 2747 2748 2749 2750 2751 2752 2753 2754 2755 2756 2757 2758 2759 2760 2761 2762 2763 2764
// Advance the reader to its next data block. Returns true when a block is available.
//
// Blocks are served from innerReader[0], then from the reader itself, then from innerReader[1]; pReader->step
// records which of the three produced the current block. An inner reader that has nothing more to offer is closed
// and its slot is set to NULL. An empty query time window yields false immediately.
bool tsdbNextDataBlock(STsdbReader* pReader) {
  if (isEmptyQueryTimeWindow(&pReader->window)) {
    return false;
  }

  STsdbReader* pPrev = pReader->innerReader[0];
  if (pPrev != NULL) {
    if (doTsdbNextDataBlock(pPrev)) {
      pReader->step = EXTERNAL_ROWS_PREV;
      return true;
    }

    tsdbReaderClose(pPrev);
    pReader->innerReader[0] = NULL;
  }

  pReader->step = EXTERNAL_ROWS_MAIN;
  if (doTsdbNextDataBlock(pReader)) {
    return true;
  }

  STsdbReader* pNext = pReader->innerReader[1];
  if (pNext != NULL) {
    if (doTsdbNextDataBlock(pNext)) {
      pReader->step = EXTERNAL_ROWS_NEXT;
      return true;
    }

    tsdbReaderClose(pNext);
    pReader->innerReader[1] = NULL;
  }

  return false;
}

// Copy rows, uid and time window of the reader's result block into pDataBlockInfo.
static void setBlockInfo(STsdbReader* pReader, SDataBlockInfo* pDataBlockInfo) {
  ASSERT(pDataBlockInfo != NULL && pReader != NULL);

  SSDataBlock* pResBlock = pReader->pResBlock;
  pDataBlockInfo->rows = pResBlock->info.rows;
  pDataBlockInfo->uid = pResBlock->info.uid;
  pDataBlockInfo->window = pResBlock->info.window;
}

2771 2772
// Fill pDataBlockInfo from the reader that produced the current block.
// For TIMEWINDOW_RANGE_EXTERNAL readers that is innerReader[0] when step is EXTERNAL_ROWS_PREV, the reader itself
// when step is EXTERNAL_ROWS_MAIN, and innerReader[1] for any other step; all other readers use themselves.
void tsdbRetrieveDataBlockInfo(STsdbReader* pReader, SDataBlockInfo* pDataBlockInfo) {
  STsdbReader* pSource = pReader;

  if (pReader->type == TIMEWINDOW_RANGE_EXTERNAL && pReader->step != EXTERNAL_ROWS_MAIN) {
    pSource = (pReader->step == EXTERNAL_ROWS_PREV) ? pReader->innerReader[0] : pReader->innerReader[1];
  }

  setBlockInfo(pSource, pDataBlockInfo);
}

2785
// Load the block-level aggregates (SMA) of the current file block.
//
// *pBlockStatis is set to NULL (with TSDB_CODE_SUCCESS) when the reader uses an external time window, when the
// current block is a composed block, or when the file block carries no SMA. Otherwise the aggregates are read into
// pSup->pColAgg and *pBlockStatis points to pSup->plist: slot 0 describes the primary timestamp column (min/max
// taken from the result block window), the other slots are matched to the loaded aggregates by column id.
//
// *allHave is true only if the SMA was loaded and no matched column had block SMA switched off.
// Returns the error code of tsdbReadBlockSma on failure, TSDB_CODE_SUCCESS otherwise.
int32_t tsdbRetrieveDatablockSMA(STsdbReader* pReader, SColumnDataAgg*** pBlockStatis, bool* allHave) {
  *allHave = false;

  // there is no statistics data for an external window reader or a composed block
  if (pReader->type == TIMEWINDOW_RANGE_EXTERNAL || pReader->status.composedDataBlock) {
    *pBlockStatis = NULL;
    return TSDB_CODE_SUCCESS;
  }

  SFileDataBlockInfo* pFBlock = getCurrentBlockInfo(&pReader->status.blockIter);
  SBlock*             pBlock = getCurrentBlock(&pReader->status.blockIter);
  int64_t             startUs = taosGetTimestampUs();

  SBlockLoadSuppInfo* pSup = &pReader->suppInfo;

  if (!tBlockHasSma(pBlock)) {
    *pBlockStatis = NULL;
    return TSDB_CODE_SUCCESS;
  }

  int32_t code = tsdbReadBlockSma(pReader->pFileReader, pBlock, pSup->pColAgg, NULL);
  if (code != TSDB_CODE_SUCCESS) {
    tsdbDebug("vgId:%d, failed to load block SMA for uid %" PRIu64 ", code:%s, %s", 0, pFBlock->uid, tstrerror(code),
              pReader->idStr);
    return code;
  }

  *allHave = true;

  // always load the first primary timestamp column data
  SColumnDataAgg* pTsAgg = &pSup->tsColAgg;

  pTsAgg->numOfNull = 0;
  pTsAgg->colId = PRIMARYKEY_TIMESTAMP_COL_ID;
  pTsAgg->min = pReader->pResBlock->info.window.skey;
  pTsAgg->max = pReader->pResBlock->info.window.ekey;
  pSup->plist[0] = pTsAgg;

  // update the number of NULL data rows
  size_t numOfCols = blockDataGetNumOfCols(pReader->pResBlock);

  // i walks the loaded aggregates, j walks the requested column ids
  int32_t i = 0, j = 0;
  while (j < numOfCols && i < taosArrayGetSize(pSup->pColAgg)) {
    SColumnDataAgg* pAgg = taosArrayGet(pSup->pColAgg, i);

    if (pAgg->colId < pSup->colIds[j]) {
      i += 1;
    } else if (pSup->colIds[j] < pAgg->colId) {
      j += 1;
    } else {
      // NOTE(review): the schema column is looked up with the aggregate index i; confirm that index i of pColAgg
      // and index i of pSchema->columns always refer to the same column.
      if (IS_BSMA_ON(&(pReader->pSchema->columns[i]))) {
        pSup->plist[j] = pAgg;
      } else {
        *allHave = false;
      }
      i += 1;
      j += 1;
    }
  }

  double elapsed = (taosGetTimestampUs() - startUs) / 1000.0;
  pReader->cost.smaLoadTime += elapsed;
  pReader->cost.smaData += 1;

  *pBlockStatis = pSup->plist;

  tsdbDebug("vgId:%d, succeed to load block SMA for uid %" PRIu64 ", elapsed time:%.2f ms, %s", 0, pFBlock->uid,
            elapsed, pReader->idStr);

  return code;
}

2863
// Return the column array of the reader's result block for the current data block.
//
// A composed block is already materialized in pResBlock and is returned as is. Otherwise the current file block
// is loaded and copied into pResBlock first. If loading fails, the file block data is cleared, terrno is set to
// the error code and NULL is returned.
static SArray* doRetrieveDataBlock(STsdbReader* pReader) {
  SReaderStatus* pStatus = &pReader->status;

  if (pStatus->composedDataBlock) {
    return pReader->pResBlock->pDataBlock;
  }

  SFileDataBlockInfo*  pFBlock = getCurrentBlockInfo(&pStatus->blockIter);
  STableBlockScanInfo* pScanInfo = taosHashGet(pStatus->pTableMap, &pFBlock->uid, sizeof(pFBlock->uid));

  tBlockDataReset(&pStatus->fileBlockData);
  tBlockDataClearData(&pStatus->fileBlockData);

  int32_t code = doLoadFileBlockData(pReader, &pStatus->blockIter, pScanInfo, &pStatus->fileBlockData);
  if (code == TSDB_CODE_SUCCESS) {
    copyBlockDataToSDataBlock(pReader, pScanInfo);
    return pReader->pResBlock->pDataBlock;
  }

  tBlockDataClear(&pStatus->fileBlockData, 1);

  terrno = code;
  return NULL;
}

2887 2888 2889 2890 2891 2892 2893 2894 2895 2896 2897 2898
// Return the column data of the current block, taken from the reader that produced it.
// For TIMEWINDOW_RANGE_EXTERNAL readers, step EXTERNAL_ROWS_PREV selects innerReader[0] and step EXTERNAL_ROWS_NEXT
// selects innerReader[1]; in every other case the reader itself is used. pIdList is not used.
SArray* tsdbRetrieveDataBlock(STsdbReader* pReader, SArray* pIdList) {
  STsdbReader* pSource = pReader;

  if (pReader->type == TIMEWINDOW_RANGE_EXTERNAL) {
    if (pReader->step == EXTERNAL_ROWS_PREV) {
      pSource = pReader->innerReader[0];
    } else if (pReader->step == EXTERNAL_ROWS_NEXT) {
      pSource = pReader->innerReader[1];
    }
  }

  return doRetrieveDataBlock(pSource);
}

H
Haojun Liao 已提交
2899
// Re-arm an existing reader for a new query condition.
//
// Order and time window are taken from pCond, the reader type becomes TIMEWINDOW_RANGE_CONTAINED, the timestamp
// aggregate and the first plist slot are zeroed, the open data file reader is closed, and the file-set iterator,
// block iterator and per-table scan info are reset. If the snapshot has no files the reader is switched to
// buffer-only mode, otherwise it is positioned on the first block in file.
//
// Nothing is done (TSDB_CODE_SUCCESS) when the reader's current time window is empty. Returns the error code of
// initForFirstBlockInFile on failure.
//
// The data file reader is closed exactly once, and only if one is open, matching tsdbReaderClose; the previous
// code issued the close call twice in a row.
int32_t tsdbReaderReset(STsdbReader* pReader, SQueryTableDataCond* pCond) {
  if (isEmptyQueryTimeWindow(&pReader->window)) {
    return TSDB_CODE_SUCCESS;
  }

  pReader->order = pCond->order;
  pReader->type = TIMEWINDOW_RANGE_CONTAINED;
  pReader->status.loadFromFile = true;
  pReader->status.pTableIter = NULL;
  pReader->window = updateQueryTimeWindow(pReader->pTsdb, &pCond->twindows);

  // allocate buffer in order to load data blocks from file
  memset(&pReader->suppInfo.tsColAgg, 0, sizeof(SColumnDataAgg));
  memset(pReader->suppInfo.plist, 0, POINTER_BYTES);

  pReader->suppInfo.tsColAgg.colId = PRIMARYKEY_TIMESTAMP_COL_ID;

  if (pReader->pFileReader != NULL) {
    tsdbDataFReaderClose(&pReader->pFileReader);
  }

  int32_t numOfTables = taosHashGetSize(pReader->status.pTableMap);

  initFilesetIterator(&pReader->status.fileIter, pReader->pReadSnap->fs.aDFileSet, pReader->order, pReader->idStr);
  resetDataBlockIterator(&pReader->status.blockIter, pReader->order, pReader->status.pTableMap);
  resetDataBlockScanInfo(pReader->status.pTableMap);

  int32_t         code = 0;
  SDataBlockIter* pBlockIter = &pReader->status.blockIter;

  // no data in files, let's try buffer in memory
  if (pReader->status.fileIter.numOfFiles == 0) {
    pReader->status.loadFromFile = false;
  } else {
    code = initForFirstBlockInFile(pReader, pBlockIter);
    if (code != TSDB_CODE_SUCCESS) {
      tsdbError("%p reset reader failed, numOfTables:%d, query range:%" PRId64 " - %" PRId64 " in query %s", pReader,
                numOfTables, pReader->window.skey, pReader->window.ekey, pReader->idStr);
      return code;
    }
  }

  tsdbDebug("%p reset reader, suid:%" PRIu64 ", numOfTables:%d, query range:%" PRId64 " - %" PRId64 " in query %s",
            pReader, pReader->suid, numOfTables, pReader->window.skey, pReader->window.ekey, pReader->idStr);

  return code;
}
H
Hongze Cheng 已提交
2944

2945 2946 2947
// Map a block row count to a histogram bucket: (numOfRows - startRow) / bucketRange.
//
// The result is used as an array index by the caller, so it is never negative: row counts below startRow land in
// bucket 0, and a non-positive bucketRange (which would divide by zero) also yields bucket 0.
// No upper bound is applied here because the number of buckets is not known to this helper.
static int32_t getBucketIndex(int32_t startRow, int32_t bucketRange, int32_t numOfRows) {
  if (bucketRange <= 0 || numOfRows <= startRow) {
    return 0;
  }

  return (numOfRows - startRow) / bucketRange;
}
H
Hongze Cheng 已提交
2948

2949 2950 2951 2952
// Collect block distribution statistics over the file blocks visible to the reader.
//
// totalSize and totalRows are reset; defMinRows/defMaxRows are copied from the vnode's tsdb config; numOfFiles,
// numOfBlocks, maxRows, minRows, numOfSmallBlocks and blockRowsHisto are accumulated on top of the values already
// present in pTableBlockInfo. Every block of the block iterator is visited; when the iterator is exhausted,
// initForFirstBlockInFile is used to move on, until it fails or the reader no longer loads from files.
//
// Returns the last code produced by initForFirstBlockInFile.
int32_t tsdbGetFileBlocksDistInfo(STsdbReader* pReader, STableBlockDistInfo* pTableBlockInfo) {
  int32_t code = TSDB_CODE_SUCCESS;
  pTableBlockInfo->totalSize = 0;
  pTableBlockInfo->totalRows = 0;

  // find the start data block in file
  SReaderStatus* pStatus = &pReader->status;

  STsdbCfg* pCfg = &pReader->pTsdb->pVnode->config.tsdbCfg;
  pTableBlockInfo->defMinRows = pCfg->minRows;
  pTableBlockInfo->defMaxRows = pCfg->maxRows;

  // width of one histogram bucket: the configured row range split into 20 parts
  int32_t bucketRange = ceil((pCfg->maxRows - pCfg->minRows) / 20.0);

  pTableBlockInfo->numOfFiles += 1;

  int32_t numOfTables = (int32_t)taosHashGetSize(pStatus->pTableMap);
  int     smallBlockLimit = 4096;

  SDataBlockIter* pBlockIter = &pStatus->blockIter;
  pTableBlockInfo->numOfFiles += pStatus->fileIter.numOfFiles;

  if (pBlockIter->numOfBlocks > 0) {
    pTableBlockInfo->numOfBlocks += pBlockIter->numOfBlocks;
  }

  pTableBlockInfo->numOfTables = numOfTables;
  bool hasNext = (pBlockIter->numOfBlocks > 0);

  for (;;) {
    if (!hasNext) {
      // current block list is exhausted, try to move on
      code = initForFirstBlockInFile(pReader, pBlockIter);
      if ((code != TSDB_CODE_SUCCESS) || !pStatus->loadFromFile) {
        break;
      }

      pTableBlockInfo->numOfBlocks += pBlockIter->numOfBlocks;
      hasNext = (pBlockIter->numOfBlocks > 0);
      continue;
    }

    SBlock* pBlock = getCurrentBlock(pBlockIter);

    int32_t numOfRows = pBlock->nRow;
    pTableBlockInfo->totalRows += numOfRows;

    if (numOfRows > pTableBlockInfo->maxRows) {
      pTableBlockInfo->maxRows = numOfRows;
    }

    if (numOfRows < pTableBlockInfo->minRows) {
      pTableBlockInfo->minRows = numOfRows;
    }

    if (numOfRows < smallBlockLimit) {
      pTableBlockInfo->numOfSmallBlocks += 1;
    }

    int32_t bucketIndex = getBucketIndex(pTableBlockInfo->defMinRows, bucketRange, numOfRows);
    pTableBlockInfo->blockRowsHisto[bucketIndex]++;

    hasNext = blockIteratorNext(&pStatus->blockIter);
  }

  return code;
}
H
Hongze Cheng 已提交
3017

H
refact  
Hongze Cheng 已提交
3018
// Count the rows held in the memtables of the reader's snapshot for every table in the reader's table map.
//
// For each table the rows of its data in the snapshot's pMem and pIMem memtables are added up. The NULL checks are
// made on the snapshot pointers that are actually passed to tsdbGetTbDataFromMemTable (previously pTsdb->mem and
// pTsdb->imem were tested while the snapshot pointers were used). A reader without a snapshot contributes 0 rows.
//
// pStatus->pTableIter is used as the iteration cursor and is NULL when the function returns.
int64_t tsdbGetNumOfRowsInMemTable(STsdbReader* pReader) {
  int64_t rows = 0;

  SReaderStatus* pStatus = &pReader->status;
  STsdbReadSnap* pSnap = pReader->pReadSnap;

  pStatus->pTableIter = taosHashIterate(pStatus->pTableMap, NULL);

  while (pStatus->pTableIter != NULL) {
    STableBlockScanInfo* pBlockScanInfo = pStatus->pTableIter;

    STbData* d = NULL;
    if (pSnap != NULL && pSnap->pMem != NULL) {
      tsdbGetTbDataFromMemTable(pSnap->pMem, pReader->suid, pBlockScanInfo->uid, &d);
      if (d != NULL) {
        rows += tsdbGetNRowsInTbData(d);
      }
    }

    STbData* di = NULL;
    if (pSnap != NULL && pSnap->pIMem != NULL) {
      tsdbGetTbDataFromMemTable(pSnap->pIMem, pReader->suid, pBlockScanInfo->uid, &di);
      if (di != NULL) {
        rows += tsdbGetNRowsInTbData(di);
      }
    }

    // current table is exhausted, let's try the next table
    pStatus->pTableIter = taosHashIterate(pStatus->pTableMap, pStatus->pTableIter);
  }

  return rows;
}
D
dapan1121 已提交
3049

L
Liu Jicong 已提交
3050
// Look up the schema of table uid.
//
// The table entry is read from the meta store. For a child table, *suid receives the super table uid and the
// schema version is taken from the super table entry; otherwise the table must be a normal table, *suid is 0 and
// the version comes from the table's own entry. *pSchema then receives metaGetTbTSchema(meta, uid, version).
//
// Returns TSDB_CODE_SUCCESS, or TSDB_CODE_TDB_INVALID_TABLE_ID (also stored in terrno) when an entry lookup fails.
int32_t tsdbGetTableSchema(SVnode* pVnode, int64_t uid, STSchema** pSchema, int64_t* suid) {
  int32_t sversion = 1;

  SMetaReader mr = {0};
  metaReaderInit(&mr, pVnode->pMeta, 0);

  if (metaGetTableEntryByUid(&mr, uid) != TSDB_CODE_SUCCESS) {
    terrno = TSDB_CODE_TDB_INVALID_TABLE_ID;
    metaReaderClear(&mr);
    return terrno;
  }

  *suid = 0;

  if (mr.me.type != TSDB_CHILD_TABLE) {
    ASSERT(mr.me.type == TSDB_NORMAL_TABLE);
    sversion = mr.me.ntbEntry.schemaRow.version;
  } else {
    // the decoder is cleared before the same meta reader is reused for the super table entry
    tDecoderClear(&mr.coder);
    *suid = mr.me.ctbEntry.suid;

    if (metaGetTableEntryByUid(&mr, *suid) != TSDB_CODE_SUCCESS) {
      terrno = TSDB_CODE_TDB_INVALID_TABLE_ID;
      metaReaderClear(&mr);
      return terrno;
    }

    sversion = mr.me.stbEntry.schemaRow.version;
  }

  metaReaderClear(&mr);
  *pSchema = metaGetTbTSchema(pVnode->pMeta, uid, sversion);

  return TSDB_CODE_SUCCESS;
}
H
Hongze Cheng 已提交
3084 3085 3086 3087 3088 3089 3090 3091 3092 3093 3094 3095 3096 3097 3098 3099 3100 3101 3102 3103 3104 3105 3106 3107 3108 3109 3110 3111 3112 3113

// Take a read snapshot of pTsdb: the current mem and imem tables (each referenced if present) and a reference to
// the file system state, all captured while holding the tsdb read lock.
//
// On success *ppSnap points to the new snapshot, to be released with tsdbUntakeReadSnap, and 0 is returned.
// On failure the error code is returned, every reference taken so far is dropped again, the snapshot memory is
// freed and *ppSnap is NULL, so the caller never sees a half-built snapshot (tsdbUntakeReadSnap accepts NULL).
int32_t tsdbTakeReadSnap(STsdb* pTsdb, STsdbReadSnap** ppSnap) {
  int32_t code = 0;
  bool    fsRefTaken = false;

  // alloc
  *ppSnap = (STsdbReadSnap*)taosMemoryCalloc(1, sizeof(STsdbReadSnap));
  if (*ppSnap == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _exit;
  }

  // lock
  code = taosThreadRwlockRdlock(&pTsdb->rwLock);
  if (code) {
    code = TAOS_SYSTEM_ERROR(code);
    goto _err;
  }

  // take snapshot
  (*ppSnap)->pMem = pTsdb->mem;
  (*ppSnap)->pIMem = pTsdb->imem;

  if ((*ppSnap)->pMem) {
    tsdbRefMemTable((*ppSnap)->pMem);
  }

  if ((*ppSnap)->pIMem) {
    tsdbRefMemTable((*ppSnap)->pIMem);
  }

  // fs
  code = tsdbFSRef(pTsdb, &(*ppSnap)->fs);
  if (code) {
    taosThreadRwlockUnlock(&pTsdb->rwLock);
    goto _err;
  }
  fsRefTaken = true;

  // unlock
  code = taosThreadRwlockUnlock(&pTsdb->rwLock);
  if (code) {
    code = TAOS_SYSTEM_ERROR(code);
    goto _err;
  }

  tsdbTrace("vgId:%d, take read snapshot", TD_VID(pTsdb->pVnode));
  return code;

_err:
  // undo whatever was acquired; pMem/pIMem are still NULL (calloc) if the lock could not be taken
  if (fsRefTaken) {
    tsdbFSUnref(pTsdb, &(*ppSnap)->fs);
  }

  if ((*ppSnap)->pMem) {
    tsdbUnrefMemTable((*ppSnap)->pMem);
  }

  if ((*ppSnap)->pIMem) {
    tsdbUnrefMemTable((*ppSnap)->pIMem);
  }

  taosMemoryFree(*ppSnap);
  *ppSnap = NULL;

_exit:
  return code;
}

// Release a read snapshot: drop the references on its mem and imem tables (if present) and on the file system
// state, then free the snapshot. A NULL snapshot is accepted; the trace line is written in either case.
void tsdbUntakeReadSnap(STsdb* pTsdb, STsdbReadSnap* pSnap) {
  if (pSnap != NULL) {
    if (pSnap->pMem != NULL) {
      tsdbUnrefMemTable(pSnap->pMem);
    }

    if (pSnap->pIMem != NULL) {
      tsdbUnrefMemTable(pSnap->pIMem);
    }

    tsdbFSUnref(pTsdb, &pSnap->fs);
    taosMemoryFree(pSnap);
  }

  tsdbTrace("vgId:%d, untake read snapshot", TD_VID(pTsdb->pVnode));
}