tsdbRead.c 120.9 KB
Newer Older
H
hjxilinx 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

H
Haojun Liao 已提交
16
#include "osDef.h"
H
Hongze Cheng 已提交
17
#include "tsdb.h"
18
// True when the traversal order `o` is ascending. The argument is parenthesized so that any expression
// (e.g. a conditional expression) is compared as a whole against TSDB_ORDER_ASC.
#define ASCENDING_TRAVERSE(o) ((o) == TSDB_ORDER_ASC)
H
Hongze Cheng 已提交
19

20 21 22 23 24 25
// Tags for the groups of rows a piece of content belongs to.
// NOTE(review): the values are sequential codes (NEXT == PREV | MAIN numerically), so they must not be
// combined as bit flags; presumably PREV/NEXT refer to rows outside the main query range — confirm.
typedef enum {
  EXTERNAL_ROWS_PREV = 1,
  EXTERNAL_ROWS_MAIN = 2,
  EXTERNAL_ROWS_NEXT = 3,
} EContentData;

26
typedef struct {
dengyihao's avatar
dengyihao 已提交
27
  STbDataIter* iter;
28 29 30 31
  int32_t      index;
  bool         hasVal;
} SIterInfo;

32 33 34 35 36
// Number of qualified blocks found in one file set, as counted by doLoadFileBlock().
typedef struct {
  int32_t numOfBlocks;      // blocks from the data (head) file that passed time/version filtering
  int32_t numOfLastBlocks;  // last-file blocks that passed suid/version filtering
} SBlockNumber;

H
Haojun Liao 已提交
37
typedef struct STableBlockScanInfo {
dengyihao's avatar
dengyihao 已提交
38 39
  uint64_t  uid;
  TSKEY     lastKey;
40 41 42 43 44
  SMapData  mapData;      // block info (compressed)
  SArray*   pBlockList;   // block data index list
  SIterInfo iter;         // mem buffer skip list iterator
  SIterInfo iiter;        // imem buffer skip list iterator
  SArray*   delSkyline;   // delete info for this table
dengyihao's avatar
dengyihao 已提交
45
  int32_t   fileDelIndex;
46 47
  bool      iterInit;     // whether to initialize the in-memory skip list iterator or not
  int16_t   indexInBlockL;// row position in last block
H
Haojun Liao 已提交
48 49 50
} STableBlockScanInfo;

// (uid, offset) pair describing one file block of one table.
// NOTE(review): presumably used as the sort key when interleaving blocks of different tables — confirm.
typedef struct SBlockOrderWrapper {
  int64_t uid;
  int64_t offset;
} SBlockOrderWrapper;
H
Hongze Cheng 已提交
54 55

typedef struct SBlockOrderSupporter {
56 57 58 59
  SBlockOrderWrapper** pDataBlockInfo;
  int32_t*             indexPerTable;
  int32_t*             numOfBlocksPerTable;
  int32_t              numOfTables;
H
Hongze Cheng 已提交
60 61 62
} SBlockOrderSupporter;

// Counters and timings collected by the reader for diagnostics.
typedef struct SIOCostSummary {
  int64_t numOfBlocks;       // blocks found while loading head files
  double  blockLoadTime;
  double  buildmemBlock;
  int64_t headFileLoad;      // number of data file readers opened
  double  headFileLoadTime;  // milliseconds spent loading block index / block info
  int64_t smaData;
  double  smaLoadTime;
} SIOCostSummary;

typedef struct SBlockLoadSuppInfo {
73
  SArray*          pColAgg;
74
  SColumnDataAgg   tsColAgg;
C
Cary Xu 已提交
75
  SColumnDataAgg** plist;
76 77
  int16_t*         colIds;    // column ids for loading file block data
  char**           buildBuf;  // build string tmp buffer, todo remove it later after all string format being updated.
H
Hongze Cheng 已提交
78 79
} SBlockLoadSuppInfo;

80 81 82 83 84 85 86 87 88 89 90 91 92 93 94
// Inclusive range of data versions visible to the query.
typedef struct SVersionRange {
  uint64_t minVer;
  uint64_t maxVer;
} SVersionRange;

// Cursor over the blocks of the "last" file of the current file set.
typedef struct SLastBlockReader {
  SArray*       pBlockL;            // SArray<SBlockL>
  int32_t       currentBlockIndex;
  SBlockData    lastBlockData;      // data of the currently loaded last block
  STimeWindow   window;
  SVersionRange verRange;
  uint64_t      uid;
  int32_t       rowIndex;
} SLastBlockReader;

95
typedef struct SFilesetIter {
96 97 98
  int32_t numOfFiles;    // number of total files
  int32_t index;         // current accessed index in the list
  SArray* pFileList;     // data file list
H
Hongze Cheng 已提交
99
  int32_t order;
100
  SLastBlockReader* pLastBlockReader; // last file block reader
101
} SFilesetIter;
H
Haojun Liao 已提交
102 103

// Locates one file block: the owning table and the block's index position in that table's
// STableBlockScanInfo, used to check whether a neighbouring block overlaps with it.
typedef struct SFileDataBlockInfo {
  uint64_t uid;
  int32_t  tbBlockIdx;
} SFileDataBlockInfo;

typedef struct SDataBlockIter {
110
  int32_t   numOfBlocks;
111
  int32_t   index;
112
  SArray*   blockList;      // SArray<SFileDataBlockInfo>
113
  int32_t   order;
114
  SBlock    block;          // current SBlock data
115
  SHashObj* pTableMap;
H
Haojun Liao 已提交
116 117 118
} SDataBlockIter;

// Progress of copying the rows of the current file block into the result block.
typedef struct SFileBlockDumpInfo {
  int32_t totalRows;
  int32_t rowIndex;   // next row to copy
  int64_t lastKey;    // set one step past the block's max key once the block is fully dumped
  bool    allDumped;  // true once every row of the block has been consumed
} SFileBlockDumpInfo;

typedef struct SReaderStatus {
dengyihao's avatar
dengyihao 已提交
126 127
  bool                 loadFromFile;  // check file stage
  SHashObj*            pTableMap;     // SHash<STableBlockScanInfo>
128
  STableBlockScanInfo* pTableIter;    // table iterator used in building in-memory buffer data blocks.
129
  SFileBlockDumpInfo   fBlockDumpInfo;
130 131 132 133 134
  SDFileSet*           pCurrentFileset;  // current opened file set
  SBlockData           fileBlockData;
  SFilesetIter         fileIter;
  SDataBlockIter       blockIter;
  bool                 composedDataBlock;  // the returned data block is a composed block or not
H
Haojun Liao 已提交
135 136
} SReaderStatus;

H
Hongze Cheng 已提交
137
struct STsdbReader {
H
Haojun Liao 已提交
138 139 140 141 142 143 144
  STsdb*             pTsdb;
  uint64_t           suid;
  int16_t            order;
  STimeWindow        window;  // the primary query time window that applies to all queries
  SSDataBlock*       pResBlock;
  int32_t            capacity;
  SReaderStatus      status;
145 146
  char*              idStr;  // query info handle, for debug purpose
  int32_t            type;   // query type: 1. retrieve all data blocks, 2. retrieve direct prev|next rows
H
Hongze Cheng 已提交
147
  SBlockLoadSuppInfo suppInfo;
H
Hongze Cheng 已提交
148
  STsdbReadSnap*     pReadSnap;
149
  SIOCostSummary     cost;
150 151
  STSchema*          pSchema;     // the newest version schema
  STSchema*          pMemSchema;  // the previous schema for in-memory data, to avoid load schema too many times
152 153
  SDataFReader*      pFileReader;
  SVersionRange      verRange;
154

155 156
  int32_t      step;
  STsdbReader* innerReader[2];
H
Hongze Cheng 已提交
157
};
H
Hongze Cheng 已提交
158

H
Haojun Liao 已提交
159
static SFileDataBlockInfo* getCurrentBlockInfo(SDataBlockIter* pBlockIter);
160 161
static int      buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, int64_t endKey, int32_t capacity,
                                          STsdbReader* pReader);
162
static TSDBROW* getValidRow(SIterInfo* pIter, const SArray* pDelList, STsdbReader* pReader);
163 164
static int32_t  doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pScanInfo, STsdbReader* pReader,
                                        SRowMerger* pMerger);
165
static int32_t  doMergeRowsInLastBlock(SLastBlockReader* pLastBlockReader, int64_t ts, SRowMerger* pMerger);
166
static int32_t  doMergeRowsInBuf(SIterInfo* pIter, uint64_t uid, int64_t ts, SArray* pDelList, SRowMerger* pMerger,
dengyihao's avatar
dengyihao 已提交
167
                                 STsdbReader* pReader);
H
Haojun Liao 已提交
168
static int32_t  doAppendRowFromTSRow(SSDataBlock* pBlock, STsdbReader* pReader, STSRow* pTSRow, uint64_t uid);
169
static int32_t  doAppendRowFromFileBlock(SSDataBlock* pResBlock, STsdbReader* pReader, SBlockData* pBlockData,
170
                                     int32_t rowIndex);
171
static void     setComposedBlockFlag(STsdbReader* pReader, bool composed);
172
static bool     hasBeenDropped(const SArray* pDelList, int32_t* index, TSDBKEY* pKey, int32_t order);
173

dengyihao's avatar
dengyihao 已提交
174
static void doMergeMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter, SArray* pDelList, STSRow** pTSRow,
175
                             STsdbReader* pReader, bool* freeTSRow);
176 177
static void doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader,
                               STSRow** pTSRow);
dengyihao's avatar
dengyihao 已提交
178 179 180 181
static int32_t initDelSkylineIterator(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader, STbData* pMemTbData,
                                      STbData* piMemTbData);
static STsdb*  getTsdbByRetentions(SVnode* pVnode, TSKEY winSKey, SRetention* retentions, const char* idstr,
                                   int8_t* pLevel);
182
static SVersionRange getQueryVerRange(SVnode* pVnode, SQueryTableDataCond* pCond, int8_t level);
183 184
static int64_t getCurrentKeyInLastBlock(SLastBlockReader* pLastBlockReader);
static bool hasDataInLastBlock(SLastBlockReader* pLastBlockReader);
H
Haojun Liao 已提交
185

186 187 188
// Fill pReader->suppInfo with the column-id list of pBlock and one scratch buffer per variable-length column.
//
// After success, colIds[i] holds the column id of the i-th column of pBlock. buildBuf[i] points to a buffer of
// info.bytes bytes for var-data columns and is NULL for fixed-length columns.
//
// Returns TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY if any allocation fails. On failure every buffer allocated
// here is released and both suppInfo pointers are reset to NULL, so no dangling pointer is left in the reader.
static int32_t setColumnIdSlotList(STsdbReader* pReader, SSDataBlock* pBlock) {
  SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo;

  size_t numOfCols = blockDataGetNumOfCols(pBlock);

  pSupInfo->colIds = taosMemoryMalloc(numOfCols * sizeof(int16_t));
  pSupInfo->buildBuf = taosMemoryCalloc(numOfCols, POINTER_BYTES);
  if (pSupInfo->buildBuf == NULL || pSupInfo->colIds == NULL) {
    goto _err;
  }

  for (size_t i = 0; i < numOfCols; ++i) {
    SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, i);
    pSupInfo->colIds[i] = pCol->info.colId;

    if (IS_VAR_DATA_TYPE(pCol->info.type)) {
      pSupInfo->buildBuf[i] = taosMemoryMalloc(pCol->info.bytes);
      if (pSupInfo->buildBuf[i] == NULL) {
        goto _err;
      }
    }
  }

  return TSDB_CODE_SUCCESS;

_err:
  // buildBuf was calloc'ed, so entries that were never allocated are NULL and safe to free
  if (pSupInfo->buildBuf != NULL) {
    for (size_t i = 0; i < numOfCols; ++i) {
      taosMemoryFree(pSupInfo->buildBuf[i]);
    }
  }

  taosMemoryFree(pSupInfo->buildBuf);
  taosMemoryFree(pSupInfo->colIds);
  pSupInfo->buildBuf = NULL;
  pSupInfo->colIds = NULL;
  return TSDB_CODE_OUT_OF_MEMORY;
}
H
Hongze Cheng 已提交
210

211
// Build the per-table scan-info map for the numOfTables tables listed in idList.
//
// The map is keyed by the table uid (uint64_t) and stores a copy of an STableBlockScanInfo whose lastKey is
// initialised from the reader's query window and whose indexInBlockL is -1.
// Returns NULL if the hash table cannot be created.
static SHashObj* createDataBlockScanInfo(STsdbReader* pTsdbReader, const STableKeyInfo* idList, int32_t numOfTables) {
  // allocate buffer in order to load data blocks from file
  // todo use simple hash instead, optimize the memory consumption
  SHashObj* pTableMap =
      taosHashInit(numOfTables, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
  if (pTableMap == NULL) {
    return NULL;
  }

  for (int32_t j = 0; j < numOfTables; ++j) {
    STableBlockScanInfo info = {.lastKey = 0, .uid = idList[j].uid, .indexInBlockL = -1};
    if (ASCENDING_TRAVERSE(pTsdbReader->order)) {
      if (info.lastKey == INT64_MIN || info.lastKey < pTsdbReader->window.skey) {
        info.lastKey = pTsdbReader->window.skey;
      }

      ASSERT(info.lastKey >= pTsdbReader->window.skey && info.lastKey <= pTsdbReader->window.ekey);
    } else {
      // NOTE(review): descending scans also start from window.skey here — confirm this is intended.
      info.lastKey = pTsdbReader->window.skey;
    }

    taosHashPut(pTableMap, &info.uid, sizeof(uint64_t), &info, sizeof(info));
    // uid is uint64_t, so it must be printed with PRIu64 (PRId64 expects a signed value)
    tsdbDebug("%p check table uid:%" PRIu64 " from lastKey:%" PRId64 " %s", pTsdbReader, info.uid, info.lastKey,
              pTsdbReader->idStr);
  }

  tsdbDebug("%p create %d tables scan-info, size:%.2f Kb, %s", pTsdbReader, numOfTables,
            (sizeof(STableBlockScanInfo) * numOfTables) / 1024.0, pTsdbReader->idStr);

  return pTableMap;
}
H
Hongze Cheng 已提交
242

243 244 245
// Reset the in-memory scan state of every table in pTableMap so that iterators are rebuilt on the next access.
//
// Both skip-list iterators (mem and imem) are destroyed and marked as having no value, and the delete skyline is
// released. The old code destroyed only the mem iterator, leaking iiter.iter and leaving iter.hasVal stale;
// this now mirrors the iterator handling in destroyBlockScanInfo(). File block info (mapData, pBlockList) is kept.
static void resetDataBlockScanInfo(SHashObj* pTableMap) {
  STableBlockScanInfo* p = NULL;

  while ((p = taosHashIterate(pTableMap, p)) != NULL) {
    p->iterInit = false;
    p->iter.hasVal = false;
    p->iiter.hasVal = false;

    if (p->iter.iter != NULL) {
      p->iter.iter = tsdbTbDataIterDestroy(p->iter.iter);
    }

    if (p->iiter.iter != NULL) {
      p->iiter.iter = tsdbTbDataIterDestroy(p->iiter.iter);
    }

    p->delSkyline = taosArrayDestroy(p->delSkyline);
  }
}

257 258 259 260 261 262 263 264
// Release everything owned by the entries of pTableMap: both skip-list iterators, the delete skyline, the
// qualified-block list and the block map data. The hash table itself is then cleaned up.
static void destroyBlockScanInfo(SHashObj* pTableMap) {
  for (STableBlockScanInfo* pInfo = taosHashIterate(pTableMap, NULL); pInfo != NULL;
       pInfo = taosHashIterate(pTableMap, pInfo)) {
    pInfo->iterInit = false;
    pInfo->iiter.hasVal = false;

    if (pInfo->iter.iter != NULL) {
      pInfo->iter.iter = tsdbTbDataIterDestroy(pInfo->iter.iter);
    }
    if (pInfo->iiter.iter != NULL) {
      pInfo->iiter.iter = tsdbTbDataIterDestroy(pInfo->iiter.iter);
    }

    pInfo->delSkyline = taosArrayDestroy(pInfo->delSkyline);
    pInfo->pBlockList = taosArrayDestroy(pInfo->pBlockList);
    tMapDataClear(&pInfo->mapData);
  }

  taosHashCleanup(pTableMap);
}

280
// A window is empty when its start key lies strictly after its end key.
static bool isEmptyQueryTimeWindow(STimeWindow* pWindow) {
  ASSERT(pWindow != NULL);

  bool empty = (pWindow->skey > pWindow->ekey);
  return empty;
}
H
Hongze Cheng 已提交
284

285 286 287
// Clamp the query window to the data retention period (keepCfg.keep2, expressed in minutes and converted to
// ticks of the tsdb precision), so that expired data is never returned to the client even if it is still on disk.
// Only the start key may move forward; the end key is returned unchanged.
static STimeWindow updateQueryTimeWindow(STsdb* pTsdb, STimeWindow* pWindow) {
  STsdbKeepCfg* pKeepCfg = &pTsdb->keepCfg;

  int64_t nowTs = taosGetTimestamp(pKeepCfg->precision);
  int64_t keepSpan = tsTickPerMin[pKeepCfg->precision] * pKeepCfg->keep2;
  int64_t earliestTs = nowTs - keepSpan + 1;  // needs to add one tick

  STimeWindow result = *pWindow;
  result.skey = (result.skey < earliestTs) ? earliestTs : result.skey;
  return result;
}
H
Hongze Cheng 已提交
300

H
Haojun Liao 已提交
301
// Shrink *capacity (rows per output block) so that an SSDataBlock holding that many rows of the queried columns
// stays within 2MB. The row width is the sum of the declared byte widths in pCond->colList.
//
// The size product is computed in 64 bits, because capacity * rowLen can exceed INT32_MAX for wide schemas
// (signed overflow is undefined behavior). The capacity never drops below one row, so a single row wider than
// 2MB still produces a usable block instead of a zero-capacity one.
static void limitOutputBufferSize(const SQueryTableDataCond* pCond, int32_t* capacity) {
  int64_t rowLen = 0;
  for (int32_t i = 0; i < pCond->numOfCols; ++i) {
    rowLen += pCond->colList[i].bytes;
  }

  // make sure the output SSDataBlock size be less than 2MB.
  const int64_t TWOMB = 2 * 1024 * 1024;
  if ((int64_t)(*capacity) * rowLen > TWOMB) {
    int64_t limited = TWOMB / rowLen;  // rowLen > 0 here, since the product above is positive
    (*capacity) = (limited > 0) ? (int32_t)limited : 1;
  }
}

// init file iterator
H
Hongze Cheng 已提交
315 316
static int32_t initFilesetIterator(SFilesetIter* pIter, SArray* aDFileSet, int32_t order, const char* idstr) {
  size_t numOfFileset = taosArrayGetSize(aDFileSet);
317

318 319
  pIter->index = ASCENDING_TRAVERSE(order) ? -1 : numOfFileset;
  pIter->order = order;
H
Hongze Cheng 已提交
320
  pIter->pFileList = aDFileSet;
321
  pIter->numOfFiles = numOfFileset;
H
Haojun Liao 已提交
322

323 324 325 326 327 328 329 330 331
  if (pIter->pLastBlockReader == NULL) {
    pIter->pLastBlockReader = taosMemoryCalloc(1, sizeof(struct SLastBlockReader));
    if (pIter->pLastBlockReader == NULL) {
      int32_t code = TSDB_CODE_OUT_OF_MEMORY;
      tsdbError("failed to prepare the last block iterator, code:%d %s", tstrerror(code), idstr);
      return code;
    }

    pIter->pLastBlockReader->pBlockL = taosArrayInit(4, sizeof(SBlockL));
332 333
  }

H
Haojun Liao 已提交
334
  tsdbDebug("init fileset iterator, total files:%d %s", pIter->numOfFiles, idstr);
H
Haojun Liao 已提交
335 336 337
  return TSDB_CODE_SUCCESS;
}

338
// Advance pIter to the next file set (in the iterator's order) whose key range overlaps the query window, and open
// a data file reader on it.
//
// Returns true when such a file set is found. pReader->status.pCurrentFileset then points at it and
// pReader->pFileReader is open on it. Returns false in three cases:
//   - the file list is exhausted;
//   - the remaining file sets lie entirely beyond the query window;
//   - a data file reader cannot be opened. In this case terrno is set and an error is logged, instead of the
//     failure being swallowed silently.
static bool filesetIteratorNext(SFilesetIter* pIter, STsdbReader* pReader) {
  bool    asc = ASCENDING_TRAVERSE(pIter->order);
  int32_t step = asc ? 1 : -1;

  // check file the time range of coverage
  STimeWindow win = {0};

  for (pIter->index += step; asc ? (pIter->index < pIter->numOfFiles) : (pIter->index >= 0); pIter->index += step) {
    if (pReader->pFileReader != NULL) {
      tsdbDataFReaderClose(&pReader->pFileReader);
    }

    pReader->status.pCurrentFileset = (SDFileSet*)taosArrayGet(pIter->pFileList, pIter->index);

    int32_t code = tsdbDataFReaderOpen(&pReader->pFileReader, pReader->pTsdb, pReader->status.pCurrentFileset);
    if (code != TSDB_CODE_SUCCESS) {
      terrno = code;
      tsdbError("%p failed to open data file reader, fid:%d, code:%s %s", pReader,
                pReader->status.pCurrentFileset->fid, tstrerror(code), pReader->idStr);
      return false;
    }

    pReader->cost.headFileLoad += 1;

    int32_t fid = pReader->status.pCurrentFileset->fid;
    tsdbFidKeyRange(fid, pReader->pTsdb->keepCfg.days, pReader->pTsdb->keepCfg.precision, &win.skey, &win.ekey);

    // current file are no longer overlapped with query time window, ignore remain files
    if ((asc && win.skey > pReader->window.ekey) || (!asc && win.ekey < pReader->window.skey)) {
      tsdbDebug("%p remain files are not qualified for qrange:%" PRId64 "-%" PRId64 ", ignore, %s", pReader,
                pReader->window.skey, pReader->window.ekey, pReader->idStr);
      return false;
    }

    // this file lies entirely before the query window in traversal order: try the next one
    if ((asc && (win.ekey < pReader->window.skey)) || ((!asc) && (win.skey > pReader->window.ekey))) {
      continue;
    }

    tsdbDebug("%p file found fid:%d for qrange:%" PRId64 "-%" PRId64 ", %s", pReader, fid, pReader->window.skey,
              pReader->window.ekey, pReader->idStr);
    return true;
  }

  return false;
}

391
// Rewind the block iterator for a new file set.
// The block list is emptied, or created on first use. Index and block count go back to -1, and the order and
// table map are (re)bound.
static void resetDataBlockIterator(SDataBlockIter* pIter, int32_t order, SHashObj* pTableMap) {
  if (pIter->blockList != NULL) {
    taosArrayClear(pIter->blockList);
  } else {
    pIter->blockList = taosArrayInit(4, sizeof(SFileDataBlockInfo));
  }

  pIter->order = order;
  pIter->index = -1;
  pIter->numOfBlocks = -1;
  pIter->pTableMap = pTableMap;
}

L
Liu Jicong 已提交
403
// Release the block list owned by the iterator. The pointer is reset to NULL (taosArrayDestroy returns NULL), so a
// repeated cleanup is harmless and a later resetDataBlockIterator() allocates a fresh list instead of reusing freed
// memory.
static void cleanupDataBlockIterator(SDataBlockIter* pIter) { pIter->blockList = taosArrayDestroy(pIter->blockList); }
H
Haojun Liao 已提交
404

H
Haojun Liao 已提交
405
// Put the reader status into its initial stage: start with the file-loading stage and have no table iterator yet.
static void initReaderStatus(SReaderStatus* pStatus) {
  pStatus->loadFromFile = true;
  pStatus->pTableIter = NULL;
}

410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432
// Create the result block with one column per entry of pCond->colList, pre-sized for `capacity` rows.
//
// Returns the new block, or NULL with terrno set on failure. On failure the partially built block is released with
// blockDataDestroy(), which also frees the column array; the old code used a plain taosMemoryFree and leaked it.
static SSDataBlock* createResBlock(SQueryTableDataCond* pCond, int32_t capacity) {
  SSDataBlock* pResBlock = createDataBlock();
  if (pResBlock == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  int32_t code = TSDB_CODE_SUCCESS;
  for (int32_t i = 0; i < pCond->numOfCols; ++i) {
    SColumnInfoData colInfo = {{0}, 0};
    colInfo.info = pCond->colList[i];
    code = blockDataAppendColInfo(pResBlock, &colInfo);
    if (code != TSDB_CODE_SUCCESS) {
      goto _err;
    }
  }

  code = blockDataEnsureCapacity(pResBlock, capacity);
  if (code != TSDB_CODE_SUCCESS) {
    goto _err;
  }

  return pResBlock;

_err:
  terrno = code;
  blockDataDestroy(pResBlock);
  return NULL;
}

433 434
// Allocate and initialise a reader for the query described by pCond on pVnode.
//
// On success *ppReader receives the new reader and TSDB_CODE_SUCCESS is returned. On any failure the partially
// built reader is passed to tsdbReaderClose(), *ppReader is set to NULL and the error code is returned. This
// includes failures of strdup() and setColumnIdSlotList(), whose results were previously ignored.
static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, STsdbReader** ppReader, int32_t capacity,
                                const char* idstr) {
  int32_t      code = 0;
  int8_t       level = 0;
  STsdbReader* pReader = (STsdbReader*)taosMemoryCalloc(1, sizeof(*pReader));
  if (pReader == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _end;
  }

  if (VND_IS_TSMA(pVnode)) {
    tsdbDebug("vgId:%d, tsma is selected to query", TD_VID(pVnode));
  }

  initReaderStatus(&pReader->status);

  pReader->pTsdb = getTsdbByRetentions(pVnode, pCond->twindows.skey, pVnode->config.tsdbCfg.retentions, idstr, &level);
  pReader->suid = pCond->suid;
  pReader->order = pCond->order;
  pReader->capacity = capacity;
  pReader->idStr = (idstr != NULL) ? strdup(idstr) : NULL;
  if (idstr != NULL && pReader->idStr == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _end;
  }

  pReader->verRange = getQueryVerRange(pVnode, pCond, level);
  pReader->type = pCond->type;
  pReader->window = updateQueryTimeWindow(pReader->pTsdb, &pCond->twindows);

  ASSERT(pCond->numOfCols > 0);

  limitOutputBufferSize(pCond, &pReader->capacity);

  // allocate buffer in order to load data blocks from file
  SBlockLoadSuppInfo* pSup = &pReader->suppInfo;
  pSup->pColAgg = taosArrayInit(4, sizeof(SColumnDataAgg));
  pSup->plist = taosMemoryCalloc(pCond->numOfCols, POINTER_BYTES);
  if (pSup->pColAgg == NULL || pSup->plist == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _end;
  }

  pSup->tsColAgg.colId = PRIMARYKEY_TIMESTAMP_COL_ID;

  code = tBlockDataCreate(&pReader->status.fileBlockData);
  if (code != TSDB_CODE_SUCCESS) {
    terrno = code;
    goto _end;
  }

  pReader->pResBlock = createResBlock(pCond, pReader->capacity);
  if (pReader->pResBlock == NULL) {
    code = terrno;
    goto _end;
  }

  code = setColumnIdSlotList(pReader, pReader->pResBlock);
  if (code != TSDB_CODE_SUCCESS) {
    goto _end;
  }

  *ppReader = pReader;
  return code;

_end:
  tsdbReaderClose(pReader);
  *ppReader = NULL;
  return code;
}
H
Hongze Cheng 已提交
495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527

// void tsdbResetQueryHandleForNewTable(STsdbReader* queryHandle, SQueryTableDataCond* pCond, STableListInfo* tableList,
//                                      int32_t tWinIdx) {
//   STsdbReader* pTsdbReadHandle = queryHandle;

//   pTsdbReadHandle->order = pCond->order;
//   pTsdbReadHandle->window = pCond->twindows[tWinIdx];
//   pTsdbReadHandle->type = TSDB_QUERY_TYPE_ALL;
//   pTsdbReadHandle->cur.fid = -1;
//   pTsdbReadHandle->cur.win = TSWINDOW_INITIALIZER;
//   pTsdbReadHandle->checkFiles = true;
//   pTsdbReadHandle->activeIndex = 0;  // current active table index
//   pTsdbReadHandle->locateStart = false;
//   pTsdbReadHandle->loadExternalRow = pCond->loadExternalRows;

//   if (ASCENDING_TRAVERSE(pCond->order)) {
//     assert(pTsdbReadHandle->window.skey <= pTsdbReadHandle->window.ekey);
//   } else {
//     assert(pTsdbReadHandle->window.skey >= pTsdbReadHandle->window.ekey);
//   }

//   // allocate buffer in order to load data blocks from file
//   memset(pTsdbReadHandle->suppInfo.pstatis, 0, sizeof(SColumnDataAgg));
//   memset(pTsdbReadHandle->suppInfo.plist, 0, POINTER_BYTES);

//   tsdbInitDataBlockLoadInfo(&pTsdbReadHandle->dataBlockLoadInfo);
//   tsdbInitCompBlockLoadInfo(&pTsdbReadHandle->compBlockLoadInfo);

//   SArray* pTable = NULL;
//   //  STsdbMeta* pMeta = tsdbGetMeta(pTsdbReadHandle->pTsdb);

//   //  pTsdbReadHandle->pTableCheckInfo = destroyTableCheckInfo(pTsdbReadHandle->pTableCheckInfo);

H
Haojun Liao 已提交
528
//   pTsdbReadHandle->pTableCheckInfo = NULL;  // createDataBlockScanInfo(pTsdbReadHandle, groupList, pMeta,
H
Hongze Cheng 已提交
529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551
//                                             // &pTable);
//   if (pTsdbReadHandle->pTableCheckInfo == NULL) {
//     //    tsdbReaderClose(pTsdbReadHandle);
//     terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
//   }

//   //  pTsdbReadHandle->prev = doFreeColumnInfoData(pTsdbReadHandle->prev);
//   //  pTsdbReadHandle->next = doFreeColumnInfoData(pTsdbReadHandle->next);
// }

// SArray* tsdbGetQueriedTableList(STsdbReader** pHandle) {
//   assert(pHandle != NULL);

//   STsdbReader* pTsdbReadHandle = (STsdbReader*)pHandle;

//   size_t  size = taosArrayGetSize(pTsdbReadHandle->pTableCheckInfo);
//   SArray* res = taosArrayInit(size, POINTER_BYTES);
//   return res;
// }

// static int32_t binarySearchForBlock(SBlock* pBlock, int32_t numOfBlocks, TSKEY skey, int32_t order) {
//   int32_t firstSlot = 0;
//   int32_t lastSlot = numOfBlocks - 1;
H
Hongze Cheng 已提交
552

H
Hongze Cheng 已提交
553
//   int32_t midSlot = firstSlot;
H
Hongze Cheng 已提交
554

H
Hongze Cheng 已提交
555 556 557
//   while (1) {
//     numOfBlocks = lastSlot - firstSlot + 1;
//     midSlot = (firstSlot + (numOfBlocks >> 1));
H
Hongze Cheng 已提交
558

H
Hongze Cheng 已提交
559
//     if (numOfBlocks == 1) break;
H
Hongze Cheng 已提交
560

H
Hongze Cheng 已提交
561 562 563 564 565 566 567 568 569 570 571
//     if (skey > pBlock[midSlot].maxKey.ts) {
//       if (numOfBlocks == 2) break;
//       if ((order == TSDB_ORDER_DESC) && (skey < pBlock[midSlot + 1].minKey.ts)) break;
//       firstSlot = midSlot + 1;
//     } else if (skey < pBlock[midSlot].minKey.ts) {
//       if ((order == TSDB_ORDER_ASC) && (skey > pBlock[midSlot - 1].maxKey.ts)) break;
//       lastSlot = midSlot - 1;
//     } else {
//       break;  // got the slot
//     }
//   }
H
Hongze Cheng 已提交
572

H
Hongze Cheng 已提交
573 574
//   return midSlot;
// }
H
Hongze Cheng 已提交
575

H
Haojun Liao 已提交
576
// Read the block index (SBlockIdx list) of the current data file and append to pIndexList every entry that
// belongs to the queried super table (suid) and to a table present in pReader->status.pTableMap. The
// qualified-block list of each such table is allocated on first use.
//
// Returns TSDB_CODE_SUCCESS, the error of tsdbReadBlockIdx(), or TSDB_CODE_OUT_OF_MEMORY. The temporary index
// array is released on every path; previously the empty-index path only cleared it and leaked it.
static int32_t doLoadBlockIndex(STsdbReader* pReader, SDataFReader* pFileReader, SArray* pIndexList) {
  SArray* aBlockIdx = taosArrayInit(8, sizeof(SBlockIdx));
  if (aBlockIdx == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  int64_t st = taosGetTimestampUs();
  int32_t code = tsdbReadBlockIdx(pFileReader, aBlockIdx);
  if (code != TSDB_CODE_SUCCESS) {
    goto _end;
  }

  size_t num = taosArrayGetSize(aBlockIdx);
  if (num == 0) {
    goto _end;  // nothing to load; code is TSDB_CODE_SUCCESS here
  }

  int64_t et1 = taosGetTimestampUs();

  for (size_t i = 0; i < num; ++i) {
    SBlockIdx* pBlockIdx = (SBlockIdx*)taosArrayGet(aBlockIdx, i);

    // super table check
    if (pBlockIdx->suid != pReader->suid) {
      continue;
    }

    // this block belongs to a table that is not queried.
    STableBlockScanInfo* pScanInfo = taosHashGet(pReader->status.pTableMap, &pBlockIdx->uid, sizeof(uint64_t));
    if (pScanInfo == NULL) {
      continue;
    }

    if (pScanInfo->pBlockList == NULL) {
      pScanInfo->pBlockList = taosArrayInit(4, sizeof(int32_t));
      if (pScanInfo->pBlockList == NULL) {
        code = TSDB_CODE_OUT_OF_MEMORY;
        goto _end;
      }
    }

    if (taosArrayPush(pIndexList, pBlockIdx) == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto _end;
    }
  }

  int64_t et2 = taosGetTimestampUs();
  tsdbDebug("load block index for %d tables completed, elapsed time:%.2f ms, set blockIdx:%.2f ms, size:%.2f Kb %s",
            (int32_t)num, (et1 - st) / 1000.0, (et2 - et1) / 1000.0, num * sizeof(SBlockIdx) / 1024.0, pReader->idStr);

  pReader->cost.headFileLoadTime += (et1 - st) / 1000.0;

_end:
  taosArrayDestroy(aBlockIdx);
  return code;
}
H
Hongze Cheng 已提交
626

627 628 629
// Collect the data blocks of the current file set that the query must read.
//
// For every table in pIndexList, its SBlock map is read into pScanInfo->mapData. The indexes of the blocks
// overlapping the query time window and version range are stored in pScanInfo->pBlockList. Last-file blocks of the
// queried super table that pass the version check are appended to pQualifiedLastBlock. pBlockNum accumulates the
// numbers of qualified data blocks and last blocks.
//
// Returns TSDB_CODE_SUCCESS, the error of tsdbReadBlock(), or TSDB_CODE_OUT_OF_MEMORY.
static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, SArray* pLastBlockIndex,
                               SBlockNumber* pBlockNum, SArray* pQualifiedLastBlock) {
  int32_t numOfQTable = 0;
  size_t  numOfTables = taosArrayGetSize(pIndexList);

  int64_t st = taosGetTimestampUs();
  size_t  size = 0;

  // drop the block info collected for the previously scanned file set
  STableBlockScanInfo* px = NULL;
  while ((px = taosHashIterate(pReader->status.pTableMap, px)) != NULL) {
    tMapDataClear(&px->mapData);
    taosArrayClear(px->pBlockList);
  }

  for (size_t i = 0; i < numOfTables; ++i) {
    SBlockIdx*           pBlockIdx = taosArrayGet(pIndexList, i);
    STableBlockScanInfo* pScanInfo = taosHashGet(pReader->status.pTableMap, &pBlockIdx->uid, sizeof(int64_t));

    tMapDataReset(&pScanInfo->mapData);
    int32_t code = tsdbReadBlock(pReader->pFileReader, pBlockIdx, &pScanInfo->mapData);
    if (code != TSDB_CODE_SUCCESS) {
      // do not filter a partially read block map
      tMapDataClear(&pScanInfo->mapData);
      return code;
    }

    size += pScanInfo->mapData.nData;
    for (int32_t j = 0; j < pScanInfo->mapData.nItem; ++j) {
      SBlock block = {0};
      tMapDataGetItemByIdx(&pScanInfo->mapData, j, &block, tGetBlock);

      // 1. time range check
      if (block.minKey.ts > pReader->window.ekey || block.maxKey.ts < pReader->window.skey) {
        continue;
      }

      // 2. version range check
      if (block.minVer > pReader->verRange.maxVer || block.maxVer < pReader->verRange.minVer) {
        continue;
      }

      if (taosArrayPush(pScanInfo->pBlockList, &j) == NULL) {
        tMapDataClear(&pScanInfo->mapData);
        return TSDB_CODE_OUT_OF_MEMORY;
      }

      pBlockNum->numOfBlocks += 1;
    }

    if (pScanInfo->pBlockList != NULL && taosArrayGetSize(pScanInfo->pBlockList) > 0) {
      numOfQTable += 1;
    }
  }

  size_t numOfLast = taosArrayGetSize(pLastBlockIndex);
  for (size_t i = 0; i < numOfLast; ++i) {
    SBlockL* pLastBlock = taosArrayGet(pLastBlockIndex, i);
    if (pLastBlock->suid != pReader->suid) {
      continue;
    }

    // todo: time range check for last blocks is not implemented yet

    // version range check
    if (pLastBlock->minVer > pReader->verRange.maxVer || pLastBlock->maxVer < pReader->verRange.minVer) {
      continue;
    }

    if (taosArrayPush(pQualifiedLastBlock, pLastBlock) == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }
    pBlockNum->numOfLastBlocks += 1;
  }

  int32_t total = pBlockNum->numOfLastBlocks + pBlockNum->numOfBlocks;

  double el = (taosGetTimestampUs() - st) / 1000.0;
  // numOfTables is a size_t: cast it for %d; sizes are reported in Kb (1024 bytes) like elsewhere in this file
  tsdbDebug("load block of %d tables completed, blocks:%d in %d tables, lastBlock:%d, size:%.2f Kb, elapsed time:%.2f ms %s",
            (int32_t)numOfTables, total, numOfQTable, pBlockNum->numOfLastBlocks, size / 1024.0, el, pReader->idStr);

  pReader->cost.numOfBlocks += total;
  pReader->cost.headFileLoadTime += el;

  return TSDB_CODE_SUCCESS;
}
H
Hongze Cheng 已提交
717

718 719
// Mark the current file block as completely dumped, and set lastKey one step past the block's max key in the
// traversal direction (+1 ascending, -1 descending).
// NOTE(review): in descending order this yields maxKey - 1 rather than minKey - 1 — confirm that is intended.
// todo remove pblock parameter
static void setBlockAllDumped(SFileBlockDumpInfo* pDumpInfo, SBlock* pBlock, int32_t order) {
  pDumpInfo->allDumped = true;
  pDumpInfo->lastKey = pBlock->maxKey.ts + (ASCENDING_TRAVERSE(order) ? 1 : -1);
}

726 727
// Append one column value to pColInfoData at rowIndex.
// Null/none values become NULL cells. A variable-length value is first re-encoded into the per-column build buffer
// pSup->buildBuf[colIndex] (length header + payload) and then appended. A fixed-length value is appended directly
// from pColVal->value.
static void doCopyColVal(SColumnInfoData* pColInfoData, int32_t rowIndex, int32_t colIndex, SColVal* pColVal,
                         SBlockLoadSuppInfo* pSup) {
  bool isNullVal = pColVal->isNull || pColVal->isNone;

  if (!IS_VAR_DATA_TYPE(pColVal->type)) {
    colDataAppend(pColInfoData, rowIndex, (const char*)&pColVal->value, isNullVal);
    return;
  }

  if (isNullVal) {
    colDataAppendNULL(pColInfoData, rowIndex);
    return;
  }

  char* pBuf = pSup->buildBuf[colIndex];
  varDataSetLen(pBuf, pColVal->value.nData);
  ASSERT(pColVal->value.nData <= pColInfoData->info.bytes);
  memcpy(varDataVal(pBuf), pColVal->value.pData, pColVal->value.nData);
  colDataAppend(pColInfoData, rowIndex, pBuf, false);
}

742
/*
 * Return the block list entry at the iterator's current position, or NULL when the iterator's block list is
 * empty.
 */
static SFileDataBlockInfo* getCurrentBlockInfo(SDataBlockIter* pBlockIter) {
  size_t numOfEntries = taosArrayGetSize(pBlockIter->blockList);
  if (numOfEntries > 0) {
    return taosArrayGet(pBlockIter->blockList, pBlockIter->index);
  }

  ASSERT(pBlockIter->numOfBlocks == numOfEntries);
  return NULL;
}

752
/* Return the SBlock decoded for the iterator's current position (cached inside the iterator). */
static SBlock* getCurrentBlock(SDataBlockIter* pBlockIter) {
  SBlock* pCurrent = &pBlockIter->block;
  return pCurrent;
}
753

754
/*
 * Copy rows of the loaded file block (pReader->status.fileBlockData) into pReader->pResBlock.
 *
 * The copy starts at pDumpInfo->rowIndex, walks in the scan order, and is limited to pReader->capacity rows.
 * Output columns whose id does not appear in the file block are filled with NULL.
 *
 * The block is marked as fully dumped only once the walk has run past the block boundary. When the output
 * capacity truncates the copy, pDumpInfo->rowIndex is left at the first row still to be copied.
 * Always returns TSDB_CODE_SUCCESS.
 */
static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo) {
  SReaderStatus*  pStatus = &pReader->status;
  SDataBlockIter* pBlockIter = &pStatus->blockIter;

  SBlockData*         pBlockData = &pStatus->fileBlockData;
  SFileDataBlockInfo* pFBlock = getCurrentBlockInfo(pBlockIter);
  SBlock*             pBlock = getCurrentBlock(pBlockIter);
  SSDataBlock*        pResBlock = pReader->pResBlock;
  int32_t             numOfOutputCols = blockDataGetNumOfCols(pResBlock);

  SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo;
  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;

  SColVal cv = {0};
  int64_t st = taosGetTimestampUs();
  bool    asc = ASCENDING_TRAVERSE(pReader->order);
  int32_t step = asc ? 1 : -1;

  // rows left in the block in the scan direction, clamped to the output buffer capacity
  int32_t remain = asc ? (pBlockData->nRow - pDumpInfo->rowIndex) : (pDumpInfo->rowIndex + 1);
  if (remain > pReader->capacity) {
    remain = pReader->capacity;
  }

  // The copy loops are driven by a row count instead of an end index: an end index compared with "<" never
  // admits a single row in a capacity-truncated descending scan.
  int32_t startIndex = pDumpInfo->rowIndex;
  int32_t rowIndex = 0;
  int32_t i = 0;

  SColumnInfoData* pColData = taosArrayGet(pResBlock->pDataBlock, i);
  if (pColData != NULL && pColData->info.colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
    for (int32_t k = 0, j = startIndex; k < remain; ++k, j += step) {
      colDataAppend(pColData, rowIndex++, (const char*)&pBlockData->aTSKEY[j], false);
    }
    i += 1;
  }

  int32_t colIndex = 0;
  int32_t num = taosArrayGetSize(pBlockData->aIdx);
  while (i < numOfOutputCols && colIndex < num) {
    rowIndex = 0;
    pColData = taosArrayGet(pResBlock->pDataBlock, i);

    SColData* pData = tBlockDataGetColDataByIdx(pBlockData, colIndex);
    if (pData->cid == pColData->info.colId) {
      for (int32_t k = 0, j = startIndex; k < remain; ++k, j += step) {
        tColDataGetValue(pData, j, &cv);
        doCopyColVal(pColData, rowIndex++, i, &cv, pSupInfo);
      }
      colIndex += 1;
      ASSERT(rowIndex == remain);
    } else {  // the specified column does not exist in file block, fill with null data
      colDataAppendNNULL(pColData, 0, remain);
    }

    i += 1;
  }

  // output columns beyond the columns present in the file block are all NULL
  while (i < numOfOutputCols) {
    pColData = taosArrayGet(pResBlock->pDataBlock, i);
    colDataAppendNNULL(pColData, 0, remain);
    i += 1;
  }

  pResBlock->info.rows = remain;
  pDumpInfo->rowIndex += step * remain;

  // only a copy that ran past the block boundary consumed the whole block; otherwise keep the rest for later
  if (pDumpInfo->rowIndex < 0 || pDumpInfo->rowIndex >= pBlockData->nRow) {
    setBlockAllDumped(pDumpInfo, pBlock, pReader->order);
  }

  double elapsedTime = (taosGetTimestampUs() - st) / 1000.0;
  pReader->cost.blockLoadTime += elapsedTime;

  int32_t unDumpedRows = asc ? pBlock->nRow - pDumpInfo->rowIndex : pDumpInfo->rowIndex + 1;
  tsdbDebug("%p copy file block to sdatablock, global index:%d, table index:%d, brange:%" PRId64 "-%" PRId64
            ", rows:%d, remain:%d, minVer:%" PRId64 ", maxVer:%" PRId64 ", elapsed time:%.2f ms, %s",
            pReader, pBlockIter->index, pFBlock->tbBlockIdx, pBlock->minKey.ts, pBlock->maxKey.ts, remain, unDumpedRows,
            pBlock->minVer, pBlock->maxVer, elapsedTime, pReader->idStr);

  return TSDB_CODE_SUCCESS;
}

837
/*
 * Load the data of the file block at the iterator's current position into pBlockData, then reset
 * pReader->status.fBlockDumpInfo.allDumped to false so the rows can be copied out.
 * The measured load time is added to pReader->cost.blockLoadTime.
 *
 * Returns TSDB_CODE_SUCCESS, or the error code returned by tsdbReadDataBlock.
 * NOTE(review): when there is no current block (pBlockInfo == NULL) nothing is loaded, but allDumped is still
 * reset to false. The last-block loading path meant for that case is disabled (#if 0) and unfinished.
 */
static int32_t doLoadFileBlockData(STsdbReader* pReader, SDataBlockIter* pBlockIter, SBlockData* pBlockData) {
  int64_t st = taosGetTimestampUs();
  double elapsedTime = 0;
  int32_t code = 0;

  SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(pBlockIter);
  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;

  if (pBlockInfo != NULL) {
    // regular data block: read it via the SBlock decoded for the current iterator position
    SBlock* pBlock = getCurrentBlock(pBlockIter);
    code = tsdbReadDataBlock(pReader->pFileReader, pBlock, pBlockData);
    if (code != TSDB_CODE_SUCCESS) {
      tsdbError("%p error occurs in loading file block, global index:%d, table index:%d, brange:%" PRId64 "-%" PRId64
                ", rows:%d, code:%s %s",
                pReader, pBlockIter->index, pBlockInfo->tbBlockIdx, pBlock->minKey.ts, pBlock->maxKey.ts, pBlock->nRow,
                tstrerror(code), pReader->idStr);
      goto _error;
    }

    elapsedTime = (taosGetTimestampUs() - st) / 1000.0;

    tsdbDebug("%p load file block into buffer, global index:%d, table index:%d, brange:%" PRId64 "-%" PRId64
                  ", rows:%d, minVer:%" PRId64 ", maxVer:%" PRId64 ", elapsed time:%.2f ms, %s",
              pReader, pBlockIter->index, pBlockInfo->tbBlockIdx, pBlock->minKey.ts, pBlock->maxKey.ts, pBlock->nRow,
              pBlock->minVer, pBlock->maxVer, elapsedTime, pReader->idStr);
  } else {
    // Work in progress: loading from the last block. Disabled; the code below refers to variables that are
    // not declared in this function (pBlockL, index).
#if 0
    SLastBlockReader* pLastBlockReader = pReader->status.fileIter.pLastBlockReader;

    uint64_t uid = pBlockInfo->uid;
    SArray*  pBlocks = pLastBlockReader->pBlockL;

    pLastBlockReader->currentBlockIndex = -1;

    // find the correct SBlockL
    for(int32_t i = 0; i < taosArrayGetSize(pBlocks); ++i) {
      SBlockL* pBlock = taosArrayGet(pBlocks, i);
      if (pBlock->minUid >= uid && pBlock->maxUid <= uid) {
        pLastBlockReader->currentBlockIndex = i;
        break;
      }
    }

//    SBlockL* pBlockL = taosArrayGet(pLastBlockReader->pBlockL, *index);
    code = tsdbReadLastBlock(pReader->pFileReader, pBlockL, pBlockData);
    if (code != TSDB_CODE_SUCCESS) {
      tsdbDebug("%p error occurs in loading last block into buffer, last block index:%d, total:%d brange:%" PRId64 "-%" PRId64
                    ", rows:%d, minVer:%" PRId64 ", maxVer:%" PRId64 ", code:%s %s",
                pReader, *index, pBlockIter->numOfBlocks.numOfLastBlocks, 0, 0, pBlockL->nRow,
                pBlockL->minVer, pBlockL->maxVer, tstrerror(code), pReader->idStr);
      goto _error;
    }

    tsdbDebug("%p load last file block into buffer, last block index:%d, total:%d brange:%" PRId64 "-%" PRId64
                  ", rows:%d, minVer:%" PRId64 ", maxVer:%" PRId64 ", elapsed time:%.2f ms, %s",
              pReader, *index, pBlockIter->numOfBlocks.numOfLastBlocks, 0, 0, pBlockL->nRow,
              pBlockL->minVer, pBlockL->maxVer, elapsedTime, pReader->idStr);
#endif
  }

  pReader->cost.blockLoadTime += elapsedTime;
  // freshly loaded data: none of its rows has been copied out yet
  pDumpInfo->allDumped = false;

  return TSDB_CODE_SUCCESS;

_error:
  return code;
}
H
Hongze Cheng 已提交
905

H
Haojun Liao 已提交
906 907 908
/*
 * Release every buffer owned by the block order supporter.
 *
 * The per-table wrapper arrays pDataBlockInfo[0 .. numOfTables-1] are freed before the pointer table itself.
 * Every pointer is reset to NULL and numOfTables to 0, so a repeated call is harmless. The pointer table may
 * be NULL when initBlockOrderSupporter failed part-way through its allocations.
 */
static void cleanupBlockOrderSupporter(SBlockOrderSupporter* pSup) {
  taosMemoryFreeClear(pSup->numOfBlocksPerTable);
  taosMemoryFreeClear(pSup->indexPerTable);

  if (pSup->pDataBlockInfo != NULL) {
    for (int32_t i = 0; i < pSup->numOfTables; ++i) {
      taosMemoryFreeClear(pSup->pDataBlockInfo[i]);
    }
  }

  taosMemoryFreeClear(pSup->pDataBlockInfo);
  pSup->numOfTables = 0;
}
H
Hongze Cheng 已提交
917

H
Haojun Liao 已提交
918 919
/*
 * Allocate the zero-filled per-table bookkeeping arrays (block counts, cursors and the pointer table for the
 * per-table wrapper arrays) for numOfTables tables.
 * Returns TSDB_CODE_OUT_OF_MEMORY, after releasing whatever was allocated, if any allocation fails.
 */
static int32_t initBlockOrderSupporter(SBlockOrderSupporter* pSup, int32_t numOfTables) {
  ASSERT(numOfTables >= 1);

  size_t int32ArrayBytes = sizeof(int32_t) * numOfTables;
  pSup->numOfBlocksPerTable = taosMemoryCalloc(1, int32ArrayBytes);
  pSup->indexPerTable = taosMemoryCalloc(1, int32ArrayBytes);
  pSup->pDataBlockInfo = taosMemoryCalloc(1, POINTER_BYTES * numOfTables);

  bool allAllocated =
      (pSup->numOfBlocksPerTable != NULL) && (pSup->indexPerTable != NULL) && (pSup->pDataBlockInfo != NULL);
  if (allAllocated) {
    return TSDB_CODE_SUCCESS;
  }

  cleanupBlockOrderSupporter(pSup);
  return TSDB_CODE_OUT_OF_MEMORY;
}
H
Hongze Cheng 已提交
932

H
Haojun Liao 已提交
933
static int32_t fileDataBlockOrderCompar(const void* pLeft, const void* pRight, void* param) {
934
  int32_t leftIndex = *(int32_t*)pLeft;
H
Haojun Liao 已提交
935
  int32_t rightIndex = *(int32_t*)pRight;
H
Hongze Cheng 已提交
936

H
Haojun Liao 已提交
937
  SBlockOrderSupporter* pSupporter = (SBlockOrderSupporter*)param;
H
Hongze Cheng 已提交
938

H
Haojun Liao 已提交
939 940
  int32_t leftTableBlockIndex = pSupporter->indexPerTable[leftIndex];
  int32_t rightTableBlockIndex = pSupporter->indexPerTable[rightIndex];
H
Hongze Cheng 已提交
941

H
Haojun Liao 已提交
942 943 944 945 946 947 948
  if (leftTableBlockIndex > pSupporter->numOfBlocksPerTable[leftIndex]) {
    /* left block is empty */
    return 1;
  } else if (rightTableBlockIndex > pSupporter->numOfBlocksPerTable[rightIndex]) {
    /* right block is empty */
    return -1;
  }
H
Hongze Cheng 已提交
949

950
  SBlockOrderWrapper* pLeftBlock = &pSupporter->pDataBlockInfo[leftIndex][leftTableBlockIndex];
H
Haojun Liao 已提交
951
  SBlockOrderWrapper* pRightBlock = &pSupporter->pDataBlockInfo[rightIndex][rightTableBlockIndex];
H
Hongze Cheng 已提交
952

953 954 955 956
  return pLeftBlock->offset > pRightBlock->offset ? 1 : -1;
}

/*
 * Decode the SBlock for the iterator's current position into pBlockIter->block.
 *
 * The entry's tbBlockIdx selects a slot in the owning table's pBlockList. The value in that slot is the item
 * index inside the table's mapData. Nothing happens when the iterator has no current entry.
 * Always returns TSDB_CODE_SUCCESS.
 */
static int32_t doSetCurrentBlock(SDataBlockIter* pBlockIter) {
  SFileDataBlockInfo* pFBlock = getCurrentBlockInfo(pBlockIter);
  if (pFBlock == NULL) {
    return TSDB_CODE_SUCCESS;
  }

  STableBlockScanInfo* pScanInfo = taosHashGet(pBlockIter->pTableMap, &pFBlock->uid, sizeof(pFBlock->uid));
  int32_t*             pMapDataIndex = taosArrayGet(pScanInfo->pBlockList, pFBlock->tbBlockIdx);
  tMapDataGetItemByIdx(&pScanInfo->mapData, *pMapDataIndex, &pBlockIter->block, tGetBlock);

  return TSDB_CODE_SUCCESS;
}
H
Hongze Cheng 已提交
970

971
/*
 * Build the block access order of the current file into pBlockIter->blockList.
 *
 * Every table in pReader->status.pTableMap with a non-empty pBlockList contributes one entry per block.
 * - With a single table, that table's order is kept unchanged.
 * - With several tables, the per-table lists are merged through a merge tree driven by
 *   fileDataBlockOrderCompar, which compares the file offset of each block (aSubBlock[0].offset).
 *
 * Afterwards the iterator is positioned at the first entry (ascending) or the last entry (descending), and
 * that block is decoded. numOfBlocks must equal the total number of listed blocks.
 * Returns TSDB_CODE_SUCCESS, or an out-of-memory code.
 */
static int32_t initBlockIterator(STsdbReader* pReader, SDataBlockIter* pBlockIter, int32_t numOfBlocks) {
  bool asc = ASCENDING_TRAVERSE(pReader->order);

  pBlockIter->numOfBlocks = numOfBlocks;
  taosArrayClear(pBlockIter->blockList);

  // access data blocks according to the offset of each block in asc/desc order.
  int32_t numOfTables = (int32_t)taosHashGetSize(pReader->status.pTableMap);

  int64_t st = taosGetTimestampUs();

  SBlockOrderSupporter sup = {0};
  int32_t              code = initBlockOrderSupporter(&sup, numOfTables);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  int32_t cnt = 0;
  void*   ptr = NULL;
  while (1) {
    ptr = taosHashIterate(pReader->status.pTableMap, ptr);
    if (ptr == NULL) {
      break;
    }

    STableBlockScanInfo* pTableScanInfo = (STableBlockScanInfo*)ptr;
    if (pTableScanInfo->pBlockList == NULL || taosArrayGetSize(pTableScanInfo->pBlockList) == 0) {
      continue;
    }

    size_t num = taosArrayGetSize(pTableScanInfo->pBlockList);
    sup.numOfBlocksPerTable[sup.numOfTables] = num;

    char* buf = taosMemoryMalloc(sizeof(SBlockOrderWrapper) * num);
    if (buf == NULL) {
      // leaving the iteration early: release the reference the hash iterator holds on the current node
      taosHashCancelIterate(pReader->status.pTableMap, ptr);
      cleanupBlockOrderSupporter(&sup);
      return TSDB_CODE_TDB_OUT_OF_MEMORY;
    }

    sup.pDataBlockInfo[sup.numOfTables] = (SBlockOrderWrapper*)buf;
    SBlock block = {0};
    for (int32_t k = 0; k < num; ++k) {
      SBlockOrderWrapper wrapper = {0};

      int32_t* mapDataIndex = taosArrayGet(pTableScanInfo->pBlockList, k);
      tMapDataGetItemByIdx(&pTableScanInfo->mapData, *mapDataIndex, &block, tGetBlock);

      wrapper.uid = pTableScanInfo->uid;
      wrapper.offset = block.aSubBlock[0].offset;

      sup.pDataBlockInfo[sup.numOfTables][k] = wrapper;
      cnt++;
    }

    sup.numOfTables += 1;
  }

  ASSERT(numOfBlocks == cnt);

  // since there is only one table qualified, blocks are not sorted
  if (sup.numOfTables == 1) {
    for (int32_t i = 0; i < numOfBlocks; ++i) {
      SFileDataBlockInfo blockInfo = {.uid = sup.pDataBlockInfo[0][i].uid, .tbBlockIdx = i};
      taosArrayPush(pBlockIter->blockList, &blockInfo);
    }

    int64_t et = taosGetTimestampUs();
    tsdbDebug("%p create blocks info struct completed for one table, %d blocks not sorted, elapsed time:%.2f ms %s",
              pReader, numOfBlocks, (et - st) / 1000.0, pReader->idStr);

    pBlockIter->index = asc ? 0 : (numOfBlocks - 1);
    cleanupBlockOrderSupporter(&sup);
    doSetCurrentBlock(pBlockIter);
    return TSDB_CODE_SUCCESS;
  }

  tsdbDebug("%p create data blocks info struct completed, %d blocks in %d tables %s", pReader, cnt, sup.numOfTables,
            pReader->idStr);

  ASSERT(cnt <= numOfBlocks && sup.numOfTables <= numOfTables);

  SMultiwayMergeTreeInfo* pTree = NULL;
  // tMergeTreeCreate reports an int32_t code; storing it in a uint8_t could truncate a failure to 0 (success)
  int32_t ret = tMergeTreeCreate(&pTree, sup.numOfTables, &sup, fileDataBlockOrderCompar);
  if (ret != TSDB_CODE_SUCCESS) {
    cleanupBlockOrderSupporter(&sup);
    return TSDB_CODE_TDB_OUT_OF_MEMORY;
  }

  int32_t numOfTotal = 0;
  while (numOfTotal < cnt) {
    int32_t pos = tMergeTreeGetChosenIndex(pTree);
    int32_t index = sup.indexPerTable[pos]++;

    SFileDataBlockInfo blockInfo = {.uid = sup.pDataBlockInfo[pos][index].uid, .tbBlockIdx = index};
    taosArrayPush(pBlockIter->blockList, &blockInfo);

    // set data block index overflow, in order to disable the offset comparator
    if (sup.indexPerTable[pos] >= sup.numOfBlocksPerTable[pos]) {
      sup.indexPerTable[pos] = sup.numOfBlocksPerTable[pos] + 1;
    }

    numOfTotal += 1;
    tMergeTreeAdjust(pTree, tMergeTreeGetAdjustIndex(pTree));
  }

  int64_t et = taosGetTimestampUs();
  tsdbDebug("%p %d data blocks access order completed, elapsed time:%.2f ms %s", pReader, numOfBlocks, (et - st) / 1000.0,
            pReader->idStr);
  cleanupBlockOrderSupporter(&sup);
  taosMemoryFree(pTree);

  pBlockIter->index = asc ? 0 : (numOfBlocks - 1);
  doSetCurrentBlock(pBlockIter);

  return TSDB_CODE_SUCCESS;
}
H
Hongze Cheng 已提交
1087

H
Haojun Liao 已提交
1088
/*
 * Advance the iterator by one block in the scan direction and decode the new current block.
 * Returns false, leaving the iterator unchanged, when it already sits on the final block for that direction.
 */
static bool blockIteratorNext(SDataBlockIter* pBlockIter) {
  if (ASCENDING_TRAVERSE(pBlockIter->order)) {
    if (pBlockIter->index >= pBlockIter->numOfBlocks - 1) {
      return false;
    }
    pBlockIter->index += 1;
  } else {
    if (pBlockIter->index <= 0) {
      return false;
    }
    pBlockIter->index -= 1;
  }

  doSetCurrentBlock(pBlockIter);
  return true;
}

1102 1103 1104
/**
 * This is an two rectangles overlap cases.
 */
1105
static int32_t dataBlockPartiallyRequired(STimeWindow* pWindow, SVersionRange* pVerRange, SBlock* pBlock) {
1106 1107
  return (pWindow->ekey < pBlock->maxKey.ts && pWindow->ekey >= pBlock->minKey.ts) ||
         (pWindow->skey > pBlock->minKey.ts && pWindow->skey <= pBlock->maxKey.ts) ||
H
Hongze Cheng 已提交
1108 1109
         (pVerRange->minVer > pBlock->minVer && pVerRange->minVer <= pBlock->maxVer) ||
         (pVerRange->maxVer < pBlock->maxVer && pVerRange->maxVer >= pBlock->minVer);
H
Haojun Liao 已提交
1110
}
H
Hongze Cheng 已提交
1111

1112 1113
/*
 * Fetch the block that follows pFBlockInfo in the scan direction within the same table.
 *
 * On success, *nextIndex receives that block's position in pTableBlockScanInfo->pBlockList. The function then
 * returns a heap-allocated copy of the decoded SBlock, which the caller must release with taosMemoryFree.
 *
 * Returns NULL when pFBlockInfo is already the last block in the scan direction. It also returns NULL when the
 * copy cannot be allocated; the caller then sees "no neighbor".
 */
static SBlock* getNeighborBlockOfSameTable(SFileDataBlockInfo* pFBlockInfo, STableBlockScanInfo* pTableBlockScanInfo,
                                           int32_t* nextIndex, int32_t order) {
  bool    asc = ASCENDING_TRAVERSE(order);
  int32_t numOfBlocks = (int32_t)taosArrayGetSize(pTableBlockScanInfo->pBlockList);

  // compare in signed arithmetic: "size - 1" computed on the size_t result wraps around for an empty list
  if (asc && pFBlockInfo->tbBlockIdx >= numOfBlocks - 1) {
    return NULL;
  }

  if (!asc && pFBlockInfo->tbBlockIdx <= 0) {
    return NULL;
  }

  int32_t step = asc ? 1 : -1;
  *nextIndex = pFBlockInfo->tbBlockIdx + step;

  SBlock* pBlock = taosMemoryCalloc(1, sizeof(SBlock));
  if (pBlock == NULL) {
    return NULL;
  }

  int32_t* indexInMapdata = taosArrayGet(pTableBlockScanInfo->pBlockList, *nextIndex);
  tMapDataGetItemByIdx(&pTableBlockScanInfo->mapData, *indexInMapdata, pBlock, tGetBlock);
  return pBlock;
}

/*
 * Find the position of pFBlockInfo (matched by uid and tbBlockIdx) in the iterator's block list.
 * The search starts at the iterator's current position and moves in the scan direction.
 * Returns that position; asserts and returns -1 when the entry is not found.
 */
static int32_t findFileBlockInfoIndex(SDataBlockIter* pBlockIter, SFileDataBlockInfo* pFBlockInfo) {
  ASSERT(pBlockIter != NULL && pFBlockInfo != NULL);

  int32_t step = ASCENDING_TRAVERSE(pBlockIter->order) ? 1 : -1;
  for (int32_t pos = pBlockIter->index; pos >= 0 && pos < pBlockIter->numOfBlocks; pos += step) {
    SFileDataBlockInfo* pCandidate = taosArrayGet(pBlockIter->blockList, pos);
    bool                sameTable = (pCandidate->uid == pFBlockInfo->uid);
    if (sameTable && pCandidate->tbBlockIdx == pFBlockInfo->tbBlockIdx) {
      return pos;
    }
  }

  ASSERT(0);
  return -1;
}

1152
/*
 * Make the entry stored at position `index` the next block to visit.
 *
 * The iterator position is advanced by `step`. If the entry is not already at the new position, it is moved
 * there. The entry is then decoded as the current block.
 * Returns -1 when `index` is outside the block list, TSDB_CODE_SUCCESS otherwise.
 */
static int32_t setFileBlockActiveInBlockIter(SDataBlockIter* pBlockIter, int32_t index, int32_t step) {
  bool outOfRange = (index < 0) || (index >= pBlockIter->numOfBlocks);
  if (outOfRange) {
    return -1;
  }

  // copy by value: the slot is overwritten by the remove/insert below
  SFileDataBlockInfo target = *(SFileDataBlockInfo*)taosArrayGet(pBlockIter->blockList, index);

  pBlockIter->index += step;
  int32_t dest = pBlockIter->index;

  if (dest != index) {
    taosArrayRemove(pBlockIter->blockList, index);
    taosArrayInsert(pBlockIter->blockList, dest, &target);

    SFileDataBlockInfo* pMoved = taosArrayGet(pBlockIter->blockList, dest);
    ASSERT(pMoved->uid == target.uid && pMoved->tbBlockIdx == target.tbBlockIdx);
  }

  doSetCurrentBlock(pBlockIter);
  return TSDB_CODE_SUCCESS;
}

/*
 * Check whether pBlock and the neighbouring block in the scan direction share their boundary timestamp.
 * Ascending: pBlock's maxKey equals the neighbour's minKey.
 * Descending: pBlock's minKey equals the neighbour's maxKey.
 */
static bool overlapWithNeighborBlock(SBlock* pBlock, SBlock* pNeighbor, int32_t order) {
  bool    asc = ASCENDING_TRAVERSE(order);
  int64_t blockEdge = asc ? pBlock->maxKey.ts : pBlock->minKey.ts;
  int64_t neighborEdge = asc ? pNeighbor->minKey.ts : pNeighbor->maxKey.ts;

  return blockEdge == neighborEdge;
}
H
Hongze Cheng 已提交
1180

1181
/*
 * Check whether `key` (presumably the next buffered row's key; confirm at the call site) lies before pBlock in
 * the scan direction.
 * Ascending: at or before the block's minKey. Descending: at or after the block's maxKey.
 * An unset key (TSKEY_INITIAL_VAL) never qualifies.
 */
static bool bufferDataInFileBlockGap(int32_t order, TSDBKEY key, SBlock* pBlock) {
  if (key.ts == TSKEY_INITIAL_VAL) {
    return false;
  }

  if (ASCENDING_TRAVERSE(order)) {
    return key.ts <= pBlock->minKey.ts;
  }

  return key.ts >= pBlock->maxKey.ts;
}
H
Hongze Cheng 已提交
1187

H
Haojun Liao 已提交
1188
/*
 * Check whether key.ts falls inside pBlock's [minKey, maxKey] range and the block's version range intersects
 * the query version range.
 */
static bool keyOverlapFileBlock(TSDBKEY key, SBlock* pBlock, SVersionRange* pVerRange) {
  bool tsInside = (key.ts >= pBlock->minKey.ts) && (key.ts <= pBlock->maxKey.ts);
  bool versionsIntersect = (pBlock->minVer <= pVerRange->maxVer) && (pBlock->maxVer >= pVerRange->minVer);

  return tsInside && versionsIntersect;
}

1193 1194 1195 1196 1197 1198
/*
 * Walk the table's delete skyline from pBlockScanInfo->fileDelIndex and report whether a skyline point with
 * version >= pBlock->minVer may cover rows of pBlock.
 * The walk stops with false at the first point whose ts is beyond pBlock->maxKey.ts.
 *
 * NOTE(review): the following is assumed and should be confirmed against the skyline builder:
 * - the skyline is sorted by ts;
 * - each point (ts, version) gives the delete version in force from ts up to the next point;
 * - the final point has version 0.
 */
static bool doCheckforDatablockOverlap(STableBlockScanInfo* pBlockScanInfo, const SBlock* pBlock) {
  size_t num = taosArrayGetSize(pBlockScanInfo->delSkyline);

  for (int32_t i = pBlockScanInfo->fileDelIndex; i < num; i += 1) {
    TSDBKEY* p = taosArrayGet(pBlockScanInfo->delSkyline, i);
    if (p->ts >= pBlock->minKey.ts && p->ts <= pBlock->maxKey.ts) {
      // the point lies inside the block's time range
      if (p->version >= pBlock->minVer) {
        return true;
      }
    } else if (p->ts < pBlock->minKey.ts) {  // p->ts < pBlock->minKey.ts
      // The point starts before the block. Its version only matters if the segment it starts reaches into
      // the block, which is decided by the next point.
      if (p->version >= pBlock->minVer) {
        if (i < num - 1) {
          TSDBKEY* pnext = taosArrayGet(pBlockScanInfo->delSkyline, i + 1);
          if (i + 1 == num - 1) {  // pnext is the last point
            if (pnext->ts >= pBlock->minKey.ts) {
              return true;
            }
          } else {
            if (pnext->ts >= pBlock->minKey.ts && pnext->version >= pBlock->minVer) {
              return true;
            }
          }
        } else {  // it must be the last point
          ASSERT(p->version == 0);
        }
      }
    } else {  // (p->ts > pBlock->maxKey.ts) {
      // later points are even further past the block
      return false;
    }
  }

  return false;
}

1227
/*
 * Check whether the table's delete skyline may remove rows of pBlock.
 *
 * Returns false when the skyline is missing or empty, or when its [first.ts, last.ts] span does not intersect
 * the block's [minKey, maxKey]. Otherwise the result of doCheckforDatablockOverlap is returned.
 */
static bool overlapWithDelSkyline(STableBlockScanInfo* pBlockScanInfo, const SBlock* pBlock, int32_t order) {
  // an empty skyline has no first/last point; taosArrayGet/taosArrayGetLast would yield NULL
  if (pBlockScanInfo->delSkyline == NULL || taosArrayGetSize(pBlockScanInfo->delSkyline) == 0) {
    return false;
  }

  // ts is not overlap
  TSDBKEY* pFirst = taosArrayGet(pBlockScanInfo->delSkyline, 0);
  TSDBKEY* pLast = taosArrayGetLast(pBlockScanInfo->delSkyline);
  if (pBlock->minKey.ts > pLast->ts || pBlock->maxKey.ts < pFirst->ts) {
    return false;
  }

  // NOTE(review): the descending branch used to walk back from fileDelIndex to the first skyline point not
  // after pBlock->minKey.ts, but the index it found was never used. Both scan directions start the version
  // check at pBlockScanInfo->fileDelIndex. Confirm whether the descending scan was meant to start from that
  // point instead.
  (void)order;
  return doCheckforDatablockOverlap(pBlockScanInfo, pBlock);
}

1257 1258 1259 1260
// Decide whether a file block has to be loaded and processed row by row. A block can only be taken as a whole
// when all of the following hold:
// 1. the version of every row is within the query version range;
// 2. the block does not overlap with the next neighbor block of the same table;
// 3. no timestamp is duplicated inside the block;
// 4. the output buffer is large enough to hold all rows of the block;
// 5. no delete info overlaps with the block data;
// additionally, the block must not overlap the buffered key `key`, be cut by the query window, or overlap the
// current last block.
// Returns true as soon as any of these conditions is violated.
static bool fileBlockShouldLoad(STsdbReader* pReader, SFileDataBlockInfo* pFBlock, SBlock* pBlock,
                                STableBlockScanInfo* pScanInfo, TSDBKEY key, SLastBlockReader* pLastBlockReader) {
  int32_t neighborIndex = 0;
  SBlock* pNeighbor = getNeighborBlockOfSameTable(pFBlock, pScanInfo, &neighborIndex, pReader->order);

  // overlap with neighbor
  bool overlapWithNeighbor = false;
  if (pNeighbor) {
    overlapWithNeighbor = overlapWithNeighborBlock(pBlock, pNeighbor, pReader->order);
    taosMemoryFree(pNeighbor);
  }

  // has duplicated ts of different version in this block
  // (a block made of several sub-blocks is conservatively treated as having duplicates)
  bool hasDup = (pBlock->nSubBlock == 1) ? pBlock->hasDup : true;
  bool overlapWithDel = overlapWithDelSkyline(pScanInfo, pBlock, pReader->order);

  // todo: compare against each key of the last block to tell whether it really overlaps; for now only the
  // [minKey, maxKey] ranges of the two blocks are compared
  bool overlapWithlastBlock = false;
  if (hasDataInLastBlock(pLastBlockReader)) {
    SBlockL *pBlockL = taosArrayGet(pLastBlockReader->pBlockL, pLastBlockReader->currentBlockIndex);
//    int64_t ts = getCurrentKeyInLastBlock(pLastBlockReader);
    overlapWithlastBlock = !(pBlock->maxKey.ts < pBlockL->minKey || pBlock->minKey.ts > pBlockL->maxKey);
  }

  return (overlapWithNeighbor || hasDup || dataBlockPartiallyRequired(&pReader->window, &pReader->verRange, pBlock) ||
          keyOverlapFileBlock(key, pBlock, &pReader->verRange) || (pBlock->nRow > pReader->capacity) ||
          overlapWithDel || overlapWithlastBlock);
}

1291
/*
 * Fill pReader->pResBlock with rows of one table taken from its in-memory iterators.
 * Per the STableBlockScanInfo field comments, iter walks the mem buffer and iiter the imem buffer.
 * endKey and pReader->capacity are forwarded to buildDataBlockFromBufImpl.
 *
 * After the build, the result block's time window is refreshed, its uid is set, and it is flagged as a
 * composed block.
 * Returns TSDB_CODE_SUCCESS right away when neither iterator has a value; otherwise the code from
 * buildDataBlockFromBufImpl. The build time is added to pReader->cost.buildmemBlock.
 */
static int32_t buildDataBlockFromBuf(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo, int64_t endKey) {
  bool hasBufferedRows = pBlockScanInfo->iter.hasVal || pBlockScanInfo->iiter.hasVal;
  if (!hasBufferedRows) {
    return TSDB_CODE_SUCCESS;
  }

  SSDataBlock* pResBlock = pReader->pResBlock;
  int64_t      startTs = taosGetTimestampUs();

  int32_t code = buildDataBlockFromBufImpl(pBlockScanInfo, endKey, pReader->capacity, pReader);

  blockDataUpdateTsWindow(pResBlock, 0);
  pResBlock->info.uid = pBlockScanInfo->uid;
  setComposedBlockFlag(pReader, true);

  double elapsedTime = (taosGetTimestampUs() - startTs) / 1000.0;
  tsdbDebug("%p build data block from cache completed, elapsed time:%.2f ms, numOfRows:%d, brange:%" PRId64
            " - %" PRId64 " %s",
            pReader, elapsedTime, pResBlock->info.rows, pResBlock->info.window.skey, pResBlock->info.window.ekey,
            pReader->idStr);

  pReader->cost.buildmemBlock += elapsedTime;
  return code;
}

1316 1317
/*
 * Fast path for a file row that needs no merge.
 *
 * The row at pDumpInfo->rowIndex is appended to pReader->pResBlock directly when:
 * - it is not the last row in the scan direction, and
 * - the following row in that direction has a timestamp different from `key`.
 * In that case rowIndex advances one step and the function returns true. Otherwise it returns false and the
 * caller must merge.
 */
static bool tryCopyDistinctRowFromFileBlock(STsdbReader* pReader, SBlockData* pBlockData, int64_t key,
                                            SFileBlockDumpInfo* pDumpInfo) {
  bool asc = (pReader->order == TSDB_ORDER_ASC);
  bool desc = (pReader->order == TSDB_ORDER_DESC);

  // a border row has no following row to compare with
  bool hasFollowingRow = (asc && pDumpInfo->rowIndex < pDumpInfo->totalRows - 1) || (desc && pDumpInfo->rowIndex > 0);
  if (!hasFollowingRow) {
    return false;
  }

  int32_t step = asc ? 1 : -1;
  if (pBlockData->aTSKEY[pDumpInfo->rowIndex + step] == key) {
    return false;  // duplicated timestamp: rows of different versions must be merged
  }

  doAppendRowFromFileBlock(pReader->pResBlock, pReader, pBlockData, pDumpInfo->rowIndex);
  pDumpInfo->rowIndex += step;
  return true;
}

H
Haojun Liao 已提交
1336 1337 1338 1339 1340 1341
/*
 * Return the schema with version `sversion` for table `uid`.
 *
 * pReader->pSchema caches the newest schema, loaded lazily with version -1. Any other version is served from
 * a single-entry cache in pReader->pMemSchema, which is replaced whenever a different version is requested.
 * Returns NULL when the requested schema cannot be fetched.
 */
static FORCE_INLINE STSchema* doGetSchemaForTSRow(int32_t sversion, STsdbReader* pReader, uint64_t uid) {
  // always set the newest schema version in pReader->pSchema
  if (pReader->pSchema == NULL) {
    pReader->pSchema = metaGetTbTSchema(pReader->pTsdb->pVnode->pMeta, uid, -1);
  }

  if (pReader->pSchema && sversion == pReader->pSchema->version) {
    return pReader->pSchema;
  }

  if (pReader->pMemSchema != NULL && pReader->pMemSchema->version == sversion) {
    return pReader->pMemSchema;
  }

  // Drop the cached schema of another version. FreeClear resets the pointer, so a failed fetch below cannot
  // leave a dangling pointer in the reader to be returned now or reused on the next call.
  taosMemoryFreeClear(pReader->pMemSchema);

  int32_t code = metaGetTbTSchemaEx(pReader->pTsdb->pVnode->pMeta, pReader->suid, uid, sversion, &pReader->pMemSchema);
  if (code != TSDB_CODE_SUCCESS) {
    return NULL;
  }

  return pReader->pMemSchema;
}

1361
/*
 * Emit one output row into pReader->pResBlock by combining the current file-block row with the buffered
 * row pRow taken from pIter.
 * - The file row sits at pDumpInfo->rowIndex; `key` is its timestamp.
 * - pIter is one of the table's in-memory iterators.
 *
 * Ascending scan (the smaller timestamp goes first):
 *   key < pRow.ts : only the file row(s) at `key` are emitted, via the distinct-row fast path when possible,
 *                   otherwise by merging the file rows sharing that timestamp.
 *   pRow.ts < key : only buffered rows at pRow.ts are emitted (doMergeMultiRows).
 *   equal         : file rows are merged first, then pRow and the following buffered rows with the same ts.
 * Descending scan mirrors this with the larger timestamp first. On equal keys the buffered rows are merged
 * first, using the schema matching pRow's version, and the file rows after them.
 *
 * Always returns TSDB_CODE_SUCCESS. A merged row is freed here only when freeTSRow reports ownership.
 */
static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo, TSDBROW* pRow,
                                     SIterInfo* pIter, int64_t key) {
  SRowMerger          merge = {0};
  STSRow*             pTSRow = NULL;
  SBlockData*         pBlockData = &pReader->status.fileBlockData;
  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;

  TSDBKEY  k = TSDBROW_KEY(pRow);
  TSDBROW  fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
  SArray*  pDelList = pBlockScanInfo->delSkyline;
  bool     freeTSRow = false;
  uint64_t uid = pBlockScanInfo->uid;

  // ascending order traverse
  if (ASCENDING_TRAVERSE(pReader->order)) {
    if (key < k.ts) {
      // imem & mem are all empty, only file exist
      if (tryCopyDistinctRowFromFileBlock(pReader, pBlockData, key, pDumpInfo)) {
        return TSDB_CODE_SUCCESS;
      } else {
        tRowMergerInit(&merge, &fRow, pReader->pSchema);
        doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
        tRowMergerGetRow(&merge, &pTSRow);
        freeTSRow = true;
      }
    } else if (k.ts < key) {  // k.ts < key
      doMergeMultiRows(pRow, pBlockScanInfo->uid, pIter, pDelList, &pTSRow, pReader, &freeTSRow);
    } else {  // k.ts == key, ascending order: file block ----> imem rows -----> mem rows
      tRowMergerInit(&merge, &fRow, pReader->pSchema);
      doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);

      tRowMerge(&merge, pRow);
      doMergeRowsInBuf(pIter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge, pReader);

      tRowMergerGetRow(&merge, &pTSRow);
      freeTSRow = true;
    }
  } else {  // descending order scan
    if (key < k.ts) {
      doMergeMultiRows(pRow, pBlockScanInfo->uid, pIter, pDelList, &pTSRow, pReader, &freeTSRow);
    } else if (k.ts < key) {
      if (tryCopyDistinctRowFromFileBlock(pReader, pBlockData, key, pDumpInfo)) {
        return TSDB_CODE_SUCCESS;
      } else {
        tRowMergerInit(&merge, &fRow, pReader->pSchema);
        doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
        tRowMergerGetRow(&merge, &pTSRow);
        freeTSRow = true;
      }
    } else {  // descending order: mem rows -----> imem rows ------> file block
      // the buffered row may carry an older schema version than pReader->pSchema
      STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid);

      tRowMergerInit(&merge, pRow, pSchema);
      doMergeRowsInBuf(pIter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge, pReader);

      tRowMerge(&merge, &fRow);
      doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);

      tRowMergerGetRow(&merge, &pTSRow);
      freeTSRow = true;
    }
  }

  tRowMergerClear(&merge);
  doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, uid);

  if (freeTSRow) {
    taosMemoryFree(pTSRow);
  }

  return TSDB_CODE_SUCCESS;
}

1434
/*
 * Merge the current candidate rows of one table held in mem (iter), imem (iiter) and the loaded file block,
 * and append exactly one merged row to pReader->pResBlock.
 *
 * Both the mem and the imem iterator must hold a valid row (asserted). The file candidate is the row at
 * pReader->status.fBlockDumpInfo.rowIndex of pBlockData.
 *
 * Ascending scans emit the smallest timestamp first: when the file row is the smallest (or tied) it seeds the
 * merger, and imem rows, then mem rows, with the same timestamp are merged on top of it.
 * Descending scans emit the largest timestamp first: when the mem row is the largest (or tied) it seeds the
 * merger, followed by imem rows and then file rows with the same timestamp.
 * The numbered cases below cover every ordering of key / k.ts / ik.ts, so every call appends one row.
 *
 * The merged row and the row merger are always released before returning.
 *
 * Returns TSDB_CODE_SUCCESS.
 */
static int32_t doMergeThreeLevelRows(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo, SBlockData* pBlockData) {
  SRowMerger merge = {0};
  STSRow*    pTSRow = NULL;
  bool       freeTSRow = false;

  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
  SArray*             pDelList = pBlockScanInfo->delSkyline;
  uint64_t            uid = pBlockScanInfo->uid;

  TSDBROW* pRow = getValidRow(&pBlockScanInfo->iter, pDelList, pReader);
  TSDBROW* piRow = getValidRow(&pBlockScanInfo->iiter, pDelList, pReader);
  ASSERT(pRow != NULL && piRow != NULL);

  int64_t key = pBlockData->aTSKEY[pDumpInfo->rowIndex];
  TSDBKEY k = TSDBROW_KEY(pRow);
  TSDBKEY ik = TSDBROW_KEY(piRow);

  if (ASCENDING_TRAVERSE(pReader->order)) {
    if (key <= k.ts && key <= ik.ts) {
      // [1&2] key <= [k.ts && ik.ts]: the file row seeds the merger
      TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
      tRowMergerInit(&merge, &fRow, pReader->pSchema);
      doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);

      if (ik.ts == key) {
        tRowMerge(&merge, piRow);
        doMergeRowsInBuf(&pBlockScanInfo->iiter, uid, key, pDelList, &merge, pReader);
      }

      if (k.ts == key) {
        tRowMerge(&merge, pRow);
        doMergeRowsInBuf(&pBlockScanInfo->iter, uid, key, pDelList, &merge, pReader);
      }

      tRowMergerGetRow(&merge, &pTSRow);
      freeTSRow = true;
    } else if (ik.ts < k.ts) {
      // [3] ik.ts < key <= k.ts
      // [4] ik.ts < k.ts <= key
      doMergeMultiRows(piRow, uid, &pBlockScanInfo->iiter, pDelList, &pTSRow, pReader, &freeTSRow);
    } else if (k.ts < ik.ts) {
      // [5] k.ts < key   <= ik.ts  (key == ik.ts is reachable here, so key != ik.ts must not be asserted)
      // [6] k.ts < ik.ts <= key
      doMergeMultiRows(pRow, uid, &pBlockScanInfo->iter, pDelList, &pTSRow, pReader, &freeTSRow);
    } else {
      // [7] k.ts == ik.ts < key
      ASSERT(key > ik.ts && key > k.ts);
      doMergeMemIMemRows(pRow, piRow, pBlockScanInfo, pReader, &pTSRow);
      freeTSRow = true;
    }
  } else {  // descending order scan
    if (k.ts >= ik.ts && k.ts >= key) {
      // [1/2] k.ts >= ik.ts && k.ts >= key: the mem row seeds the merger.
      // doMergeRowsInBuf() receives the timestamp of the buffer row being merged (k.ts), as in
      // doMergeBufAndFileRows(); the file key may be smaller than k.ts in this branch.
      STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, uid);
      tRowMergerInit(&merge, pRow, pSchema);
      doMergeRowsInBuf(&pBlockScanInfo->iter, uid, k.ts, pDelList, &merge, pReader);

      if (ik.ts == k.ts) {
        tRowMerge(&merge, piRow);
        doMergeRowsInBuf(&pBlockScanInfo->iiter, uid, k.ts, pDelList, &merge, pReader);
      }

      if (k.ts == key) {
        TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
        tRowMerge(&merge, &fRow);
        doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
      }

      tRowMergerGetRow(&merge, &pTSRow);
      freeTSRow = true;
    } else if (ik.ts > key) {
      // [3] ik.ts > k.ts >= key
      // [4] ik.ts > key >= k.ts
      doMergeMultiRows(piRow, uid, &pBlockScanInfo->iiter, pDelList, &pTSRow, pReader, &freeTSRow);
    } else if (key > ik.ts) {
      // [5] key > ik.ts > k.ts
      // [6] key > k.ts > ik.ts
      // [6'] key > k.ts == ik.ts (not covered by [1/2], since k.ts < key)
      TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
      tRowMergerInit(&merge, &fRow, pReader->pSchema);
      doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
      tRowMergerGetRow(&merge, &pTSRow);
      freeTSRow = true;
    } else {
      // [7] key == ik.ts > k.ts: the imem row seeds the merger, then the file rows with the same timestamp
      // are merged on top. The merger must be initialised here before tRowMerge() is applied to it.
      STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(piRow), pReader, uid);
      tRowMergerInit(&merge, piRow, pSchema);
      doMergeRowsInBuf(&pBlockScanInfo->iiter, uid, ik.ts, pDelList, &merge, pReader);

      TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
      tRowMerge(&merge, &fRow);
      doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
      tRowMergerGetRow(&merge, &pTSRow);
      freeTSRow = true;
    }
  }

  // Clearing a merger that was never initialised (doMergeMultiRows paths) mirrors doMergeBufAndFileRows().
  tRowMergerClear(&merge);
  doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, uid);

  if (freeTSRow) {
    taosMemoryFree(pTSRow);
  }

  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
1577 1578
/*
 * Decide whether the file-block row at pDumpInfo->rowIndex may be returned for the table of pBlockScanInfo.
 *
 * A row is rejected when:
 *  - the block carries per-row uids (aUid != NULL, i.e. a multi-table block) and the row belongs to another table;
 *  - its version lies outside pReader->verRange;
 *  - its timestamp lies outside pReader->window;
 *  - hasBeenDropped() reports it as covered by the table's delete skyline (which may advance fileDelIndex).
 */
static bool isValidFileBlockRow(SBlockData* pBlockData, SFileBlockDumpInfo* pDumpInfo,
                                STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader) {
  bool foreignRow = (pBlockData->aUid != NULL) && (pBlockData->aUid[pDumpInfo->rowIndex] != pBlockScanInfo->uid);
  if (foreignRow) {
    return false;
  }

  int64_t rowVer = pBlockData->aVersion[pDumpInfo->rowIndex];
  bool    verInRange = (rowVer >= pReader->verRange.minVer) && (rowVer <= pReader->verRange.maxVer);
  if (!verInRange) {
    return false;
  }

  int64_t rowTs = pBlockData->aTSKEY[pDumpInfo->rowIndex];
  bool    tsInRange = (rowTs >= pReader->window.skey) && (rowTs <= pReader->window.ekey);
  if (!tsInRange) {
    return false;
  }

  TSDBKEY rowKey = {.ts = rowTs, .version = rowVer};
  return !hasBeenDropped(pBlockScanInfo->delSkyline, &pBlockScanInfo->fileDelIndex, &rowKey, pReader->order);
}

1606
/* Return true when ts lies outside the closed interval [pWindow->skey, pWindow->ekey]. */
static bool outOfTimeWindow(int64_t ts, STimeWindow* pWindow) {
  bool inside = (ts >= pWindow->skey) && (ts <= pWindow->ekey);
  return !inside;
}
1607

1608 1609 1610 1611 1612 1613 1614 1615 1616 1617 1618 1619 1620 1621 1622 1623 1624 1625 1626 1627 1628 1629 1630 1631 1632 1633 1634 1635 1636 1637 1638 1639 1640 1641 1642 1643 1644 1645 1646 1647 1648 1649 1650 1651 1652 1653 1654 1655 1656 1657 1658 1659 1660 1661 1662 1663 1664 1665 1666 1667 1668 1669 1670 1671 1672 1673 1674 1675 1676
/*
 * Reset the per-table cursor of the last block reader.
 *
 * Stores the table uid, the query time window and the version range used for row filtering, and the row
 * position from which nextRowInLastBlock() starts scanning (the next examined row is startPos + 1).
 */
static void initLastBlockReader(SLastBlockReader* pLastBlockReader, uint64_t uid, STimeWindow* pWin,
                                SVersionRange* pVerRange, int16_t startPos) {
  pLastBlockReader->rowIndex = startPos;
  pLastBlockReader->verRange = *pVerRange;
  pLastBlockReader->window = *pWin;
  pLastBlockReader->uid = uid;
}

/*
 * Advance the last block reader to the next row that belongs to pLastBlockReader->uid and lies inside both
 * pLastBlockReader->window and pLastBlockReader->verRange. Only ascending scans are handled.
 *
 * Returns true and sets rowIndex to that row when one exists.
 * Returns false when no further qualifying row exists; rowIndex is then parked at lastBlockData.nRow so that
 * hasDataInLastBlock() reports the exhaustion consistently.
 */
static bool nextRowInLastBlock(SLastBlockReader *pLastBlockReader) {
  SBlockData* pBlockData = &pLastBlockReader->lastBlockData;
  if (pLastBlockReader->rowIndex >= pBlockData->nRow) {
    return false;
  }

  for (int32_t i = pLastBlockReader->rowIndex + 1; i < pBlockData->nRow; ++i) {
    // rows of other tables; like isValidFileBlockRow(), only filter by uid when per-row uids are present
    if (pBlockData->aUid != NULL && pBlockData->aUid[i] != pLastBlockReader->uid) {
      continue;
    }

    if (pBlockData->aTSKEY[i] < pLastBlockReader->window.skey) {
      continue;
    }

    if (pBlockData->aVersion[i] < pLastBlockReader->verRange.minVer) {
      continue;
    }

    // no data any more: assumes the rows of one table are stored in ascending timestamp order
    if (pBlockData->aTSKEY[i] > pLastBlockReader->window.ekey) {
      break;
    }

    // versions are not ordered by timestamp, so a too-new version only disqualifies this row,
    // later rows of the same table may still be visible
    if (pBlockData->aVersion[i] > pLastBlockReader->verRange.maxVer) {
      continue;
    }

    pLastBlockReader->rowIndex = i;
    return true;
  }

  pLastBlockReader->rowIndex = pBlockData->nRow;
  return false;
}

/* Snapshot the current row position of the last block reader, for a later restoreState(). */
static int32_t saveCurrentState(SLastBlockReader* pLastBlockReader) {
  int32_t state = pLastBlockReader->rowIndex;
  return state;
}

/* Move the last block reader back to a row position previously returned by saveCurrentState(). */
static void restoreState(SLastBlockReader* pLastBlockReader, int32_t state) { pLastBlockReader->rowIndex = state; }

/*
 * Timestamp of the row the last block reader currently points at.
 * No bounds check: callers must make sure hasDataInLastBlock() is true first.
 */
static int64_t getCurrentKeyInLastBlock(SLastBlockReader* pLastBlockReader) {
  const TSKEY* pKeys = pLastBlockReader->lastBlockData.aTSKEY;
  return pKeys[pLastBlockReader->rowIndex];
}

// todo handle desc order
/* True while the reader's row position is still inside the loaded last block data. */
static bool hasDataInLastBlock(SLastBlockReader* pLastBlockReader) {
  return pLastBlockReader->rowIndex < pLastBlockReader->lastBlockData.nRow;
}

/*
 * Append one merged row for the table of pBlockScanInfo to pReader->pResBlock.
 *
 * The candidates are the current rows of:
 *  - mem (iter) and imem (iiter);
 *  - the loaded file block (pBlockData, at fBlockDumpInfo.rowIndex);
 *  - the last block reader.
 *
 * While mem and/or imem still hold rows, the work is delegated to doMergeThreeLevelRows() or
 * doMergeBufAndFileRows(). NOTE: those paths do not consult the last block yet.
 * Otherwise the file row and the last block row are compared by timestamp (ascending order only, see todo).
 * The last block key is only read while hasDataInLastBlock() is true, because getCurrentKeyInLastBlock()
 * indexes aTSKEY without a bounds check.
 *
 * Returns TSDB_CODE_SUCCESS, or the code of the delegated merge function.
 */
static int32_t buildComposedDataBlockImpl(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo, SBlockData* pBlockData, SLastBlockReader *pLastBlockReader) {
  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;

  int64_t key = INT64_MIN;
  if (pBlockData->nRow > 0) {
    key = pBlockData->aTSKEY[pDumpInfo->rowIndex];
  }

  TSDBROW* pRow = getValidRow(&pBlockScanInfo->iter, pBlockScanInfo->delSkyline, pReader);
  TSDBROW* piRow = getValidRow(&pBlockScanInfo->iiter, pBlockScanInfo->delSkyline, pReader);

  // mem + imem + file
  if (pBlockScanInfo->iter.hasVal && pBlockScanInfo->iiter.hasVal) {
    return doMergeThreeLevelRows(pReader, pBlockScanInfo, pBlockData);
  }

  // imem + file
  if (pBlockScanInfo->iiter.hasVal) {
    return doMergeBufAndFileRows(pReader, pBlockScanInfo, piRow, &pBlockScanInfo->iiter, key);
  }

  // mem + file
  if (pBlockScanInfo->iter.hasVal) {
    return doMergeBufAndFileRows(pReader, pBlockScanInfo, pRow, &pBlockScanInfo->iter, key);
  }

  bool    hasLastRow = hasDataInLastBlock(pLastBlockReader);
  int64_t tsLast = hasLastRow ? getCurrentKeyInLastBlock(pLastBlockReader) : INT64_MIN;

  STSRow*    pTSRow = NULL;
  SRowMerger merge = {0};

  if (pBlockData->nRow > 0) {
    TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);

    if (hasLastRow && tsLast < key) {
      // the last block row comes first: emit it (merged with its duplicates) before the file row
      TSDBROW lRow = tsdbRowFromBlockData(&pLastBlockReader->lastBlockData, pLastBlockReader->rowIndex);
      tRowMergerInit(&merge, &lRow, pReader->pSchema);
      doMergeRowsInLastBlock(pLastBlockReader, tsLast, &merge);
    } else if (hasLastRow && tsLast == key) {
      // same timestamp in file block and last block: merge both
      tRowMergerInit(&merge, &fRow, pReader->pSchema);
      doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
      doMergeRowsInLastBlock(pLastBlockReader, tsLast, &merge);
    } else {  // ts > key or no last block row, asc; todo handle desc
      // imem & mem are all empty, only file rows are emitted here
      if (tryCopyDistinctRowFromFileBlock(pReader, pBlockData, key, pDumpInfo)) {
        return TSDB_CODE_SUCCESS;
      }

      tRowMergerInit(&merge, &fRow, pReader->pSchema);
      doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
    }
  } else {  // only last block exists
    if (!hasLastRow) {
      return TSDB_CODE_SUCCESS;  // nothing left to emit for this table
    }

    TSDBROW lRow = tsdbRowFromBlockData(&pLastBlockReader->lastBlockData, pLastBlockReader->rowIndex);
    tRowMergerInit(&merge, &lRow, pReader->pSchema);
    doMergeRowsInLastBlock(pLastBlockReader, tsLast, &merge);
  }

  tRowMergerGetRow(&merge, &pTSRow);
  doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo->uid);

  taosMemoryFree(pTSRow);
  tRowMergerClear(&merge);
  return TSDB_CODE_SUCCESS;
}

/*
 * Fill pReader->pResBlock with merged rows of a single table.
 *
 * Rows are drawn from the current file block (if any), the last block and the in-memory buffers. Filling stops
 * when any of these holds:
 *  - the file block is consumed;
 *  - neither the file block nor the last block has rows left;
 *  - the result block reaches pReader->capacity.
 *
 * The table is the one of the current file block, or pReader->status.pTableIter when there is no current
 * file block.
 *
 * Returns TSDB_CODE_SUCCESS, or the first error reported by buildComposedDataBlockImpl().
 */
static int32_t buildComposedDataBlock(STsdbReader* pReader) {
  SSDataBlock* pResBlock = pReader->pResBlock;

  SFileDataBlockInfo*  pBlockInfo = getCurrentBlockInfo(&pReader->status.blockIter);
  STableBlockScanInfo* pBlockScanInfo = NULL;
  if (pBlockInfo != NULL) {
    pBlockScanInfo = taosHashGet(pReader->status.pTableMap, &pBlockInfo->uid, sizeof(pBlockInfo->uid));
  } else {
    pBlockScanInfo = pReader->status.pTableIter;
  }

  SLastBlockReader* pLastBlockReader = pReader->status.fileIter.pLastBlockReader;

  int16_t startIndex = (pBlockInfo != NULL) ? pBlockScanInfo->indexInBlockL : -1;
  initLastBlockReader(pLastBlockReader, pBlockScanInfo->uid, &pReader->window, &pReader->verRange, startIndex);

  // position on the first qualified last-block row; availability is re-checked in the loop below
  (void)nextRowInLastBlock(pLastBlockReader);

  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
  SBlockData*         pBlockData = &pReader->status.fileBlockData;
  int32_t             step = ASCENDING_TRAVERSE(pReader->order) ? 1 : -1;

  int64_t st = taosGetTimestampUs();

  while (1) {
    // skip file block rows that do not qualify (other table, version/time range, deleted)
    if (pBlockData->nRow > 0 && !isValidFileBlockRow(pBlockData, pDumpInfo, pBlockScanInfo, pReader)) {
      pDumpInfo->rowIndex += step;

      SBlock* pBlock = getCurrentBlock(&pReader->status.blockIter);
      if (pDumpInfo->rowIndex >= pBlock->nRow || pDumpInfo->rowIndex < 0) {
        setBlockAllDumped(pDumpInfo, pBlock, pReader->order);
        break;
      }

      continue;
    }

    // stop only when neither the file block nor the last block has anything left; an exhausted last block
    // must not prevent the remaining file block rows from being emitted
    if (pBlockData->nRow == 0 && !hasDataInLastBlock(pLastBlockReader)) {
      break;
    }

    int32_t code = buildComposedDataBlockImpl(pReader, pBlockScanInfo, pBlockData, pLastBlockReader);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    // currently loaded file data block is consumed
    if (pBlockData->nRow > 0) {
      SBlock* pBlock = getCurrentBlock(&pReader->status.blockIter);
      if (pDumpInfo->rowIndex >= pBlock->nRow || pDumpInfo->rowIndex < 0) {
        setBlockAllDumped(pDumpInfo, pBlock, pReader->order);
        break;
      }
    }

    if (pResBlock->info.rows >= pReader->capacity) {
      break;
    }
  }

  pResBlock->info.uid = pBlockScanInfo->uid;
  blockDataUpdateTsWindow(pResBlock, 0);

  setComposedBlockFlag(pReader, true);
  int64_t et = taosGetTimestampUs();

  tsdbDebug("%p uid:%" PRIu64 ", composed data block created, brange:%" PRId64 "-%" PRId64
            " rows:%d, elapsed time:%.2f ms %s",
            pReader, pBlockScanInfo->uid, pResBlock->info.window.skey, pResBlock->info.window.ekey,
            pResBlock->info.rows, (et - st) / 1000.0, pReader->idStr);

  return TSDB_CODE_SUCCESS;
}

/* Record in the reader status whether the current result block was composed row by row. */
void setComposedBlockFlag(STsdbReader* pReader, bool composed) {
  SReaderStatus* pStatus = &pReader->status;
  pStatus->composedDataBlock = composed;
}

1852
/*
 * Lazily create the mem (iter) and imem (iiter) skip-list iterators of one table, then its delete skyline.
 *
 * Iterators start at (window.skey, verRange.minVer) and move forward for ascending scans. They start at
 * (window.ekey, verRange.maxVer) and move backward for descending scans.
 * iterInit is set only after everything succeeded, so later calls become no-ops.
 *
 * Returns TSDB_CODE_SUCCESS, or the error of tsdbTbDataIterCreate()/initDelSkylineIterator().
 */
static int32_t initMemDataIterator(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader) {
  if (pBlockScanInfo->iterInit) {
    return TSDB_CODE_SUCCESS;
  }

  int32_t code = TSDB_CODE_SUCCESS;

  TSDBKEY startKey = {0};
  if (ASCENDING_TRAVERSE(pReader->order)) {
    startKey = (TSDBKEY){.ts = pReader->window.skey, .version = pReader->verRange.minVer};
  } else {
    startKey = (TSDBKEY){.ts = pReader->window.ekey, .version = pReader->verRange.maxVer};
  }

  int32_t backward = (!ASCENDING_TRAVERSE(pReader->order));

  STbData* d = NULL;
  if (pReader->pReadSnap->pMem != NULL) {
    d = tsdbGetTbDataFromMemTable(pReader->pReadSnap->pMem, pReader->suid, pBlockScanInfo->uid);
    if (d != NULL) {
      code = tsdbTbDataIterCreate(d, &startKey, backward, &pBlockScanInfo->iter.iter);
      if (code == TSDB_CODE_SUCCESS) {
        pBlockScanInfo->iter.hasVal = (tsdbTbDataIterGet(pBlockScanInfo->iter.iter) != NULL);

        tsdbDebug("%p uid:%" PRIu64 ", check data in mem from skey:%" PRId64 ", order:%d, ts range in buf:%" PRId64
                  "-%" PRId64 " %s",
                  pReader, pBlockScanInfo->uid, startKey.ts, pReader->order, d->minKey, d->maxKey, pReader->idStr);
      } else {
        tsdbError("%p uid:%" PRIu64 ", failed to create iterator for mem, code:%s, %s", pReader, pBlockScanInfo->uid,
                  tstrerror(code), pReader->idStr);
        return code;
      }
    }
  } else {
    tsdbDebug("%p uid:%" PRIu64 ", no data in mem, %s", pReader, pBlockScanInfo->uid, pReader->idStr);
  }

  STbData* di = NULL;
  if (pReader->pReadSnap->pIMem != NULL) {
    di = tsdbGetTbDataFromMemTable(pReader->pReadSnap->pIMem, pReader->suid, pBlockScanInfo->uid);
    if (di != NULL) {
      code = tsdbTbDataIterCreate(di, &startKey, backward, &pBlockScanInfo->iiter.iter);
      if (code == TSDB_CODE_SUCCESS) {
        pBlockScanInfo->iiter.hasVal = (tsdbTbDataIterGet(pBlockScanInfo->iiter.iter) != NULL);

        tsdbDebug("%p uid:%" PRIu64 ", check data in imem from skey:%" PRId64 ", order:%d, ts range in buf:%" PRId64
                  "-%" PRId64 " %s",
                  pReader, pBlockScanInfo->uid, startKey.ts, pReader->order, di->minKey, di->maxKey, pReader->idStr);
      } else {
        tsdbError("%p uid:%" PRIu64 ", failed to create iterator for imem, code:%s, %s", pReader, pBlockScanInfo->uid,
                  tstrerror(code), pReader->idStr);
        return code;
      }
    }
  } else {
    tsdbDebug("%p uid:%" PRIu64 ", no data in imem, %s", pReader, pBlockScanInfo->uid, pReader->idStr);
  }

  code = initDelSkylineIterator(pBlockScanInfo, pReader, d, di);
  if (code != TSDB_CODE_SUCCESS) {
    tsdbError("%p uid:%" PRIu64 ", failed to init delete skyline, code:%s, %s", pReader, pBlockScanInfo->uid,
              tstrerror(code), pReader->idStr);
    return code;
  }

  pBlockScanInfo->iterInit = true;
  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
1916 1917
/*
 * Build the delete skyline of one table, unless it already exists.
 *
 * Delete records are collected from three sources, in this order:
 *  - the delete file, looked up by (pReader->suid, uid);
 *  - the in-memory delete lists of mem (pMemTbData);
 *  - the in-memory delete lists of imem (piMemTbData).
 * They are then reduced with tsdbBuildDeleteSkyline() into pBlockScanInfo->delSkyline (TSDBKEY entries).
 * delSkyline stays NULL when there are no delete records.
 *
 * iter.index, iiter.index and fileDelIndex are set to the first skyline entry for ascending scans and to the
 * last entry for descending scans.
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_OUT_OF_MEMORY, or the error of the delete-file / skyline functions.
 */
int32_t initDelSkylineIterator(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader, STbData* pMemTbData,
                               STbData* piMemTbData) {
  if (pBlockScanInfo->delSkyline != NULL) {
    return TSDB_CODE_SUCCESS;
  }

  int32_t code = TSDB_CODE_SUCCESS;
  STsdb*  pTsdb = pReader->pTsdb;

  SArray* pDelData = taosArrayInit(4, sizeof(SDelData));
  if (pDelData == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  SDelFile* pDelFile = pReader->pReadSnap->fs.pDelFile;
  if (pDelFile) {
    SDelFReader* pDelFReader = NULL;
    code = tsdbDelFReaderOpen(&pDelFReader, pDelFile, pTsdb);
    if (code != TSDB_CODE_SUCCESS) {
      goto _err;
    }

    SArray* aDelIdx = taosArrayInit(4, sizeof(SDelIdx));
    if (aDelIdx == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;  // previously fell through to _err with code == 0, i.e. "success"
      tsdbDelFReaderClose(&pDelFReader);
      goto _err;
    }

    code = tsdbReadDelIdx(pDelFReader, aDelIdx);
    if (code != TSDB_CODE_SUCCESS) {
      taosArrayDestroy(aDelIdx);
      tsdbDelFReaderClose(&pDelFReader);
      goto _err;
    }

    SDelIdx  idx = {.suid = pReader->suid, .uid = pBlockScanInfo->uid};
    SDelIdx* pIdx = taosArraySearch(aDelIdx, &idx, tCmprDelIdx, TD_EQ);

    if (pIdx != NULL) {
      code = tsdbReadDelData(pDelFReader, pIdx, pDelData);
    }

    taosArrayDestroy(aDelIdx);
    tsdbDelFReaderClose(&pDelFReader);

    if (code != TSDB_CODE_SUCCESS) {
      goto _err;
    }
  }

  SDelData* p = NULL;
  if (pMemTbData != NULL) {
    for (p = pMemTbData->pHead; p != NULL; p = p->pNext) {
      if (taosArrayPush(pDelData, p) == NULL) {
        code = TSDB_CODE_OUT_OF_MEMORY;
        goto _err;
      }
    }
  }

  if (piMemTbData != NULL) {
    for (p = piMemTbData->pHead; p != NULL; p = p->pNext) {
      if (taosArrayPush(pDelData, p) == NULL) {
        code = TSDB_CODE_OUT_OF_MEMORY;
        goto _err;
      }
    }
  }

  if (taosArrayGetSize(pDelData) > 0) {
    pBlockScanInfo->delSkyline = taosArrayInit(4, sizeof(TSDBKEY));
    if (pBlockScanInfo->delSkyline == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto _err;
    }

    code = tsdbBuildDeleteSkyline(pDelData, 0, (int32_t)(taosArrayGetSize(pDelData) - 1), pBlockScanInfo->delSkyline);
  }

  taosArrayDestroy(pDelData);

  // Convert the size before subtracting: "size_t - 1" wraps to SIZE_MAX for an empty/NULL skyline and the
  // narrowing back to int32_t is implementation-defined.
  int32_t numOfSkylineKeys = (int32_t)taosArrayGetSize(pBlockScanInfo->delSkyline);
  pBlockScanInfo->iter.index = ASCENDING_TRAVERSE(pReader->order) ? 0 : numOfSkylineKeys - 1;
  pBlockScanInfo->iiter.index = pBlockScanInfo->iter.index;
  pBlockScanInfo->fileDelIndex = pBlockScanInfo->iter.index;
  return code;

_err:
  taosArrayDestroy(pDelData);
  return code;
}

1997 1998 1999
/*
 * Smallest key among the current valid rows of mem and imem for the table of the current file block.
 *
 * The table's in-memory iterators are initialised on demand.
 * Returns {.ts = TSKEY_INITIAL_VAL} when neither buffer holds a valid row.
 */
static TSDBKEY getCurrentKeyInBuf(SDataBlockIter* pBlockIter, STsdbReader* pReader) {
  TSDBKEY key = {.ts = TSKEY_INITIAL_VAL};
  bool    hasKey = false;

  SFileDataBlockInfo*  pFBlock = getCurrentBlockInfo(pBlockIter);
  STableBlockScanInfo* pScanInfo = taosHashGet(pReader->status.pTableMap, &pFBlock->uid, sizeof(pFBlock->uid));

  initMemDataIterator(pScanInfo, pReader);

  TSDBROW* pRow = getValidRow(&pScanInfo->iter, pScanInfo->delSkyline, pReader);
  if (pRow != NULL) {
    key = TSDBROW_KEY(pRow);
    hasKey = true;
  }

  pRow = getValidRow(&pScanInfo->iiter, pScanInfo->delSkyline, pReader);
  if (pRow != NULL) {
    TSDBKEY k = TSDBROW_KEY(pRow);
    // Track "found in mem" explicitly instead of comparing with the TSKEY_INITIAL_VAL sentinel.
    // A sentinel that sorts below every real key would otherwise make an imem-only key be discarded.
    if (!hasKey || key.ts > k.ts) {
      key = k;
    }
  }

  return key;
}

2020
/*
 * Advance the file-set iterator until a data file with qualifying blocks is found, or files run out.
 *
 * For each file, the block index and the last-block list (pLastBlockReader->pBlockL) are loaded.
 * doLoadFileBlock() then fills pBlockNum and collects the qualifying last blocks.
 * When some blocks qualify, pBlockL is replaced by the qualifying last blocks and the search stops.
 *
 * Returns TSDB_CODE_SUCCESS (also when no file has data), TSDB_CODE_OUT_OF_MEMORY, or the error of the
 * block-index / last-block loaders. All temporary arrays are released on every path.
 */
static int32_t moveToNextFile(STsdbReader* pReader, SBlockNumber* pBlockNum) {
  SReaderStatus* pStatus = &pReader->status;

  size_t  numOfTables = taosHashGetSize(pReader->status.pTableMap);
  SArray* pIndexList = taosArrayInit(numOfTables, sizeof(SBlockIdx));
  if (pIndexList == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  while (1) {
    bool hasNext = filesetIteratorNext(&pStatus->fileIter, pReader);
    if (!hasNext) {  // no data files on disk
      break;
    }

    taosArrayClear(pIndexList);
    int32_t code = doLoadBlockIndex(pReader, pReader->pFileReader, pIndexList);
    if (code != TSDB_CODE_SUCCESS) {
      taosArrayDestroy(pIndexList);
      return code;
    }

    SArray* pLastBlocks = pStatus->fileIter.pLastBlockReader->pBlockL;
    code = tsdbReadBlockL(pReader->pFileReader, pLastBlocks);
    if (code != TSDB_CODE_SUCCESS) {
      taosArrayDestroy(pIndexList);
      return code;
    }

    if (taosArrayGetSize(pIndexList) == 0 && taosArrayGetSize(pLastBlocks) == 0) {
      continue;  // no blocks in current file, try next files
    }

    SArray* pQLastBlock = taosArrayInit(4, sizeof(SBlockL));
    if (pQLastBlock == NULL) {
      taosArrayDestroy(pIndexList);
      return TSDB_CODE_OUT_OF_MEMORY;
    }

    code = doLoadFileBlock(pReader, pIndexList, pLastBlocks, pBlockNum, pQLastBlock);
    if (code != TSDB_CODE_SUCCESS) {
      taosArrayDestroy(pIndexList);
      taosArrayDestroy(pQLastBlock);
      return code;
    }

    bool found = (pBlockNum->numOfBlocks + pBlockNum->numOfLastBlocks > 0);
    if (found) {
      ASSERT(taosArrayGetSize(pQLastBlock) == pBlockNum->numOfLastBlocks);
      taosArrayClear(pLastBlocks);
      taosArrayAddAll(pLastBlocks, pQLastBlock);
    }

    // released on both paths; it used to leak whenever a file had no qualifying blocks
    taosArrayDestroy(pQLastBlock);

    if (found) {
      break;
    }
    // no blocks in current file, try next files
  }

  taosArrayDestroy(pIndexList);
  return TSDB_CODE_SUCCESS;
}

2073 2074 2075 2076 2077 2078 2079 2080 2081 2082 2083 2084 2085 2086 2087 2088 2089 2090 2091 2092 2093 2094 2095 2096 2097 2098 2099 2100 2101 2102 2103 2104 2105 2106 2107 2108 2109 2110 2111 2112 2113 2114 2115 2116 2117 2118 2119 2120 2121 2122 2123 2124 2125 2126 2127 2128 2129 2130 2131 2132 2133 2134 2135 2136 2137 2138
/*
 * Find the last block (SBlockL) in pLastBlockReader->pBlockL whose [minUid, maxUid] range contains uid,
 * and load its rows into pLastBlockReader->lastBlockData.
 *
 * currentBlockIndex is set to the index of the chosen SBlockL, or to -1 when no last block covers uid.
 * In the -1 case nothing is loaded and TSDB_CODE_SUCCESS is returned.
 *
 * Returns the error of tBlockDataCreate()/tBlockDataInit()/tsdbReadLastBlock() on failure. A read failure
 * used to be swallowed and reported as success.
 */
static int32_t doLoadRelatedLastBlock(SLastBlockReader* pLastBlockReader, uint64_t uid, STsdbReader* pReader) {
  SArray*  pBlocks = pLastBlockReader->pBlockL;
  SBlockL* pBlock = NULL;

  pLastBlockReader->currentBlockIndex = -1;

  // find the correct SBlockL
  int32_t numOfBlocks = (int32_t)taosArrayGetSize(pBlocks);
  for (int32_t i = 0; i < numOfBlocks; ++i) {
    SBlockL* p = taosArrayGet(pBlocks, i);
    if (p->minUid <= uid && p->maxUid >= uid) {
      pLastBlockReader->currentBlockIndex = i;
      pBlock = p;
      break;
    }
  }

  if (pLastBlockReader->currentBlockIndex == -1) {
    return TSDB_CODE_SUCCESS;
  }

  // NOTE(review): tBlockDataCreate() runs on every call. If it allocates, buffers created by a previous call
  // may leak; confirm its semantics.
  int32_t code = tBlockDataCreate(&pLastBlockReader->lastBlockData);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  code = tBlockDataInit(&pLastBlockReader->lastBlockData, pReader->suid, pReader->suid ? 0 : uid, pReader->pSchema);
  if (code != TSDB_CODE_SUCCESS) {
    tsdbError("%p failed to init last block data, uid:%" PRIu64 ", code:%s %s", pReader, uid, tstrerror(code),
              pReader->idStr);
    return code;
  }

  code = tsdbReadLastBlock(pReader->pFileReader, pBlock, &pLastBlockReader->lastBlockData);
  if (code != TSDB_CODE_SUCCESS) {
    tsdbError("%p error occurs in loading last block into buffer, last block index:%d, total:%d, uid:%" PRIu64
              ", code:%s %s",
              pReader, (int32_t)pLastBlockReader->currentBlockIndex, numOfBlocks, uid, tstrerror(code), pReader->idStr);
    return code;
  }

  return TSDB_CODE_SUCCESS;
}

/*
 * Move pReader->status.pTableIter to the next table of pTableMap, or to the first table when the iteration
 * has not started yet. Then load the last block whose uid range covers that table.
 *
 * Returns TSDB_CODE_SUCCESS without loading anything once every table has been visited. Otherwise returns the
 * result of doLoadRelatedLastBlock().
 */
static int32_t doLoadLastBlockSequentially(STsdbReader* pReader) {
  SReaderStatus* pStatus = &pReader->status;

  // A NULL cursor makes taosHashIterate() yield the first entry, so "first table" and "next table" are the
  // same call.
  pStatus->pTableIter = taosHashIterate(pStatus->pTableMap, pStatus->pTableIter);
  if (pStatus->pTableIter == NULL) {
    return TSDB_CODE_SUCCESS;
  }

  // find the last block that contain the specified block uid
  // todo check for all empty table
  return doLoadRelatedLastBlock(pStatus->fileIter.pLastBlockReader, pStatus->pTableIter->uid, pReader);
}

2139 2140 2141
/*
 * Produce the next result block from the current file block, or from the last blocks when the block
 * iterator has no current block.
 *
 * Three outcomes, depending on fileBlockShouldLoad() and bufferDataInFileBlockGap():
 *  - the file block is loaded and rows are merged into a composed block;
 *  - only the buffered rows that precede the block are emitted;
 *  - the whole block is described as-is without loading it.
 *
 * Returns TSDB_CODE_SUCCESS, or the first error from loading the last block, initialising or loading the
 * file block data, or building the block.
 */
static int32_t doBuildDataBlock(STsdbReader* pReader) {
  int32_t code = TSDB_CODE_SUCCESS;

  SReaderStatus*  pStatus = &pReader->status;
  SDataBlockIter* pBlockIter = &pStatus->blockIter;

  SBlock*              pBlock = NULL;
  TSDBKEY              key = {0};
  STableBlockScanInfo* pScanInfo = NULL;
  SFileDataBlockInfo*  pBlockInfo = getCurrentBlockInfo(pBlockIter);
  SLastBlockReader*    pLastBlockReader = pReader->status.fileIter.pLastBlockReader;

  if (pBlockInfo != NULL) {
    pScanInfo = taosHashGet(pStatus->pTableMap, &pBlockInfo->uid, sizeof(pBlockInfo->uid));
    pBlock = getCurrentBlock(pBlockIter);
    key = getCurrentKeyInBuf(pBlockIter, pReader);

    code = doLoadRelatedLastBlock(pLastBlockReader, pScanInfo->uid, pReader);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    initLastBlockReader(pLastBlockReader, pBlockInfo->uid, &pReader->window, &pReader->verRange, pScanInfo->indexInBlockL);
    (void)nextRowInLastBlock(pLastBlockReader);
  } else {
    ASSERT(pBlockIter->numOfBlocks == 0);
  }

  if (pBlockInfo == NULL || fileBlockShouldLoad(pReader, pBlockInfo, pBlock, pScanInfo, key, pLastBlockReader)) {
    if (pBlockInfo != NULL) {
      tBlockDataReset(&pStatus->fileBlockData);
      code = tBlockDataInit(&pStatus->fileBlockData, pReader->suid, pScanInfo->uid, pReader->pSchema);
      if (code != TSDB_CODE_SUCCESS) {
        return code;
      }

      code = doLoadFileBlockData(pReader, pBlockIter, &pStatus->fileBlockData);
      if (code != TSDB_CODE_SUCCESS) {
        return code;
      }
    }

    // build composed data block
    code = buildComposedDataBlock(pReader);
  } else if (bufferDataInFileBlockGap(pReader->order, key, pBlock)) {
    // data in memory that are earlier than current file block
    // todo rows in buffer should be less than the file block in asc, greater than file block in desc
    int64_t endKey = (ASCENDING_TRAVERSE(pReader->order)) ? pBlock->minKey.ts : pBlock->maxKey.ts;
    code = buildDataBlockFromBuf(pReader, pScanInfo, endKey);
  } else {  // whole block is required, return it directly
    SDataBlockInfo* pInfo = &pReader->pResBlock->info;
    pInfo->rows = pBlock->nRow;
    pInfo->uid = pScanInfo->uid;
    pInfo->window = (STimeWindow){.skey = pBlock->minKey.ts, .ekey = pBlock->maxKey.ts};
    setComposedBlockFlag(pReader, false);
    setBlockAllDumped(&pStatus->fBlockDumpInfo, pBlock, pReader->order);
  }

  return code;
}

H
Haojun Liao 已提交
2200
static int32_t buildBlockFromBufferSequentially(STsdbReader* pReader) {
2201 2202
  SReaderStatus* pStatus = &pReader->status;

2203
  while (1) {
2204 2205 2206
    if (pStatus->pTableIter == NULL) {
      pStatus->pTableIter = taosHashIterate(pStatus->pTableMap, NULL);
      if (pStatus->pTableIter == NULL) {
H
Haojun Liao 已提交
2207
        return TSDB_CODE_SUCCESS;
2208 2209 2210 2211
      }
    }

    STableBlockScanInfo* pBlockScanInfo = pStatus->pTableIter;
2212
    initMemDataIterator(pBlockScanInfo, pReader);
2213

2214
    int64_t endKey = (ASCENDING_TRAVERSE(pReader->order)) ? INT64_MAX : INT64_MIN;
2215
    int32_t code = buildDataBlockFromBuf(pReader, pBlockScanInfo, endKey);
H
Haojun Liao 已提交
2216 2217 2218 2219
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

2220
    if (pReader->pResBlock->info.rows > 0) {
H
Haojun Liao 已提交
2221
      return TSDB_CODE_SUCCESS;
2222 2223 2224 2225 2226
    }

    // current table is exhausted, let's try the next table
    pStatus->pTableIter = taosHashIterate(pStatus->pTableMap, pStatus->pTableIter);
    if (pStatus->pTableIter == NULL) {
H
Haojun Liao 已提交
2227
      return TSDB_CODE_SUCCESS;
2228 2229 2230 2231
    }
  }
}

2232
// set the correct start position in case of the first/last file block, according to the query time window
2233
static void initBlockDumpInfo(STsdbReader* pReader, SDataBlockIter* pBlockIter) {
2234
  SBlock* pBlock = getCurrentBlock(pBlockIter);
2235

2236 2237 2238
  SReaderStatus* pStatus = &pReader->status;

  SFileBlockDumpInfo* pDumpInfo = &pStatus->fBlockDumpInfo;
2239 2240 2241

  pDumpInfo->totalRows = pBlock->nRow;
  pDumpInfo->allDumped = false;
2242
  pDumpInfo->rowIndex = ASCENDING_TRAVERSE(pReader->order) ? 0 : pBlock->nRow - 1;
2243 2244
}

2245
static int32_t initForFirstBlockInFile(STsdbReader* pReader, SDataBlockIter* pBlockIter) {
2246 2247
  SBlockNumber num = {0};

2248
  int32_t code = moveToNextFile(pReader, &num);
2249 2250 2251 2252 2253
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  // all data files are consumed, try data in buffer
2254
  if (num.numOfBlocks + num.numOfLastBlocks == 0) {
2255 2256 2257 2258 2259
    pReader->status.loadFromFile = false;
    return code;
  }

  // initialize the block iterator for a new fileset
2260 2261 2262 2263 2264
  if (num.numOfBlocks > 0) {
    code = initBlockIterator(pReader, pBlockIter, num.numOfBlocks);
  } else {
    pBlockIter->numOfBlocks = 0;
  }
2265 2266

  // set the correct start position according to the query time window
2267
  initBlockDumpInfo(pReader, pBlockIter);
2268 2269 2270
  return code;
}

2271
static bool fileBlockPartiallyRead(SFileBlockDumpInfo* pDumpInfo, bool asc) {
2272 2273
  return (!pDumpInfo->allDumped) &&
         ((pDumpInfo->rowIndex > 0 && asc) || (pDumpInfo->rowIndex < (pDumpInfo->totalRows - 1) && (!asc)));
2274 2275
}

2276
static int32_t buildBlockFromFiles(STsdbReader* pReader) {
H
Haojun Liao 已提交
2277
  int32_t code = TSDB_CODE_SUCCESS;
2278 2279
  bool    asc = ASCENDING_TRAVERSE(pReader->order);

2280 2281
  SDataBlockIter* pBlockIter = &pReader->status.blockIter;

2282 2283 2284 2285 2286 2287 2288 2289 2290 2291 2292 2293 2294 2295 2296 2297 2298 2299 2300 2301 2302 2303 2304 2305 2306 2307 2308 2309 2310 2311 2312 2313
  if (pBlockIter->numOfBlocks == 0) {
    _begin:
    code = doLoadLastBlockSequentially(pReader);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    // all data blocks are check in last file, now let's try the next file
    if (pReader->status.pTableIter == NULL) {
      code = initForFirstBlockInFile(pReader, pBlockIter);

      // error happens or all the data files are completely checked
      if ((code != TSDB_CODE_SUCCESS) || (pReader->status.loadFromFile == false)) {
        return code;
      }

      // this file does not have blocks, let's start check the last block file
      if (pBlockIter->numOfBlocks == 0) {
        goto _begin;
      }
    }

    code = doBuildDataBlock(pReader);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    if (pReader->pResBlock->info.rows > 0) {
      return TSDB_CODE_SUCCESS;
    }
  }

2314
  while (1) {
2315 2316
    SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;

2317
    if (fileBlockPartiallyRead(pDumpInfo, asc)) {  // file data block is partially loaded
2318
      code = buildComposedDataBlock(pReader);
2319 2320 2321 2322 2323 2324 2325
    } else {
      // current block are exhausted, try the next file block
      if (pDumpInfo->allDumped) {
        // try next data block in current file
        bool hasNext = blockIteratorNext(&pReader->status.blockIter);
        if (hasNext) {  // check for the next block in the block accessed order list
          initBlockDumpInfo(pReader, pBlockIter);
2326 2327 2328 2329 2330 2331 2332
        } else if (taosArrayGetSize(pReader->status.fileIter.pLastBlockReader->pBlockL) > 0) {  // data blocks in current file are exhausted, let's try the next file now
          // todo dump all data in last block if exists.
          pBlockIter->numOfBlocks = 0;
          taosArrayClear(pBlockIter->blockList);
          tBlockDataReset(&pReader->status.fileBlockData);
          goto _begin;
        } else {
2333 2334 2335 2336 2337 2338
          code = initForFirstBlockInFile(pReader, pBlockIter);

          // error happens or all the data files are completely checked
          if ((code != TSDB_CODE_SUCCESS) || (pReader->status.loadFromFile == false)) {
            return code;
          }
2339 2340 2341 2342 2343

          // this file does not have blocks, let's start check the last block file
          if (pBlockIter->numOfBlocks == 0) {
            goto _begin;
          }
2344
        }
H
Haojun Liao 已提交
2345
      }
2346 2347

      code = doBuildDataBlock(pReader);
2348 2349
    }

2350 2351 2352 2353 2354 2355 2356 2357
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    if (pReader->pResBlock->info.rows > 0) {
      return TSDB_CODE_SUCCESS;
    }
  }
2358
}
H
refact  
Hongze Cheng 已提交
2359

2360 2361
static STsdb* getTsdbByRetentions(SVnode* pVnode, TSKEY winSKey, SRetention* retentions, const char* idStr,
                                  int8_t* pLevel) {
2362
  if (VND_IS_RSMA(pVnode)) {
2363
    int8_t  level = 0;
2364 2365
    int64_t now = taosGetTimestamp(pVnode->config.tsdbCfg.precision);

2366
    for (int8_t i = 0; i < TSDB_RETENTION_MAX; ++i) {
2367 2368 2369 2370 2371 2372 2373 2374 2375 2376 2377 2378 2379
      SRetention* pRetention = retentions + level;
      if (pRetention->keep <= 0) {
        if (level > 0) {
          --level;
        }
        break;
      }
      if ((now - pRetention->keep) <= winSKey) {
        break;
      }
      ++level;
    }

2380
    const char* str = (idStr != NULL) ? idStr : "";
2381 2382

    if (level == TSDB_RETENTION_L0) {
2383
      *pLevel = TSDB_RETENTION_L0;
C
Cary Xu 已提交
2384
      tsdbDebug("vgId:%d, rsma level %d is selected to query %s", TD_VID(pVnode), TSDB_RETENTION_L0, str);
2385 2386
      return VND_RSMA0(pVnode);
    } else if (level == TSDB_RETENTION_L1) {
2387
      *pLevel = TSDB_RETENTION_L1;
C
Cary Xu 已提交
2388
      tsdbDebug("vgId:%d, rsma level %d is selected to query %s", TD_VID(pVnode), TSDB_RETENTION_L1, str);
2389 2390
      return VND_RSMA1(pVnode);
    } else {
2391
      *pLevel = TSDB_RETENTION_L2;
C
Cary Xu 已提交
2392
      tsdbDebug("vgId:%d, rsma level %d is selected to query %s", TD_VID(pVnode), TSDB_RETENTION_L2, str);
2393 2394 2395 2396 2397 2398 2399
      return VND_RSMA2(pVnode);
    }
  }

  return VND_TSDB(pVnode);
}

H
Haojun Liao 已提交
2400
SVersionRange getQueryVerRange(SVnode* pVnode, SQueryTableDataCond* pCond, int8_t level) {
L
Liu Jicong 已提交
2401
  int64_t startVer = (pCond->startVersion == -1) ? 0 : pCond->startVersion;
H
Haojun Liao 已提交
2402 2403

  int64_t endVer = 0;
L
Liu Jicong 已提交
2404 2405
  if (pCond->endVersion ==
      -1) {  // user not specified end version, set current maximum version of vnode as the endVersion
H
Haojun Liao 已提交
2406 2407
    endVer = pVnode->state.applied;
  } else {
L
Liu Jicong 已提交
2408
    endVer = (pCond->endVersion > pVnode->state.applied) ? pVnode->state.applied : pCond->endVersion;
2409 2410
  }

H
Haojun Liao 已提交
2411
  return (SVersionRange){.minVer = startVer, .maxVer = endVer};
2412 2413
}

H
Hongze Cheng 已提交
2414 2415 2416 2417
// // todo not unref yet, since it is not support multi-group interpolation query
// static UNUSED_FUNC void changeQueryHandleForInterpQuery(STsdbReader* pHandle) {
//   // filter the queried time stamp in the first place
//   STsdbReader* pTsdbReadHandle = (STsdbReader*)pHandle;
H
refact  
Hongze Cheng 已提交
2418

H
Hongze Cheng 已提交
2419 2420
//   // starts from the buffer in case of descending timestamp order check data blocks
//   size_t numOfTables = taosArrayGetSize(pTsdbReadHandle->pTableCheckInfo);
H
refact  
Hongze Cheng 已提交
2421

H
Hongze Cheng 已提交
2422 2423
//   int32_t i = 0;
//   while (i < numOfTables) {
H
Haojun Liao 已提交
2424
//     STableBlockScanInfo* pCheckInfo = taosArrayGet(pTsdbReadHandle->pTableCheckInfo, i);
H
refact  
Hongze Cheng 已提交
2425

H
Hongze Cheng 已提交
2426 2427 2428 2429 2430 2431 2432 2433 2434 2435 2436 2437 2438 2439
//     // the first qualified table for interpolation query
//     //    if ((pTsdbReadHandle->window.skey <= pCheckInfo->pTableObj->lastKey) &&
//     //        (pCheckInfo->pTableObj->lastKey != TSKEY_INITIAL_VAL)) {
//     //      break;
//     //    }

//     i++;
//   }

//   // there are no data in all the tables
//   if (i == numOfTables) {
//     return;
//   }

H
Haojun Liao 已提交
2440
//   STableBlockScanInfo info = *(STableBlockScanInfo*)taosArrayGet(pTsdbReadHandle->pTableCheckInfo, i);
H
Hongze Cheng 已提交
2441 2442 2443 2444 2445 2446
//   taosArrayClear(pTsdbReadHandle->pTableCheckInfo);

//   info.lastKey = pTsdbReadHandle->window.skey;
//   taosArrayPush(pTsdbReadHandle->pTableCheckInfo, &info);
// }

2447
bool hasBeenDropped(const SArray* pDelList, int32_t* index, TSDBKEY* pKey, int32_t order) {
2448 2449 2450 2451
  ASSERT(pKey != NULL);
  if (pDelList == NULL) {
    return false;
  }
L
Liu Jicong 已提交
2452 2453 2454
  size_t  num = taosArrayGetSize(pDelList);
  bool    asc = ASCENDING_TRAVERSE(order);
  int32_t step = asc ? 1 : -1;
2455

2456 2457 2458 2459 2460 2461
  if (asc) {
    if (*index >= num - 1) {
      TSDBKEY* last = taosArrayGetLast(pDelList);
      ASSERT(pKey->ts >= last->ts);

      if (pKey->ts > last->ts) {
2462
        return false;
2463 2464 2465
      } else if (pKey->ts == last->ts) {
        TSDBKEY* prev = taosArrayGet(pDelList, num - 2);
        return (prev->version >= pKey->version);
2466 2467
      }
    } else {
2468 2469 2470 2471 2472 2473 2474 2475 2476 2477 2478 2479 2480 2481 2482 2483 2484 2485 2486 2487 2488 2489 2490 2491 2492 2493 2494 2495 2496 2497
      TSDBKEY* pCurrent = taosArrayGet(pDelList, *index);
      TSDBKEY* pNext = taosArrayGet(pDelList, (*index) + 1);

      if (pKey->ts < pCurrent->ts) {
        return false;
      }

      if (pCurrent->ts <= pKey->ts && pNext->ts >= pKey->ts && pCurrent->version >= pKey->version) {
        return true;
      }

      while (pNext->ts <= pKey->ts && (*index) < num - 1) {
        (*index) += 1;

        if ((*index) < num - 1) {
          pCurrent = taosArrayGet(pDelList, *index);
          pNext = taosArrayGet(pDelList, (*index) + 1);

          // it is not a consecutive deletion range, ignore it
          if (pCurrent->version == 0 && pNext->version > 0) {
            continue;
          }

          if (pCurrent->ts <= pKey->ts && pNext->ts >= pKey->ts && pCurrent->version >= pKey->version) {
            return true;
          }
        }
      }

      return false;
2498 2499
    }
  } else {
2500 2501
    if (*index <= 0) {
      TSDBKEY* pFirst = taosArrayGet(pDelList, 0);
2502

2503 2504 2505 2506 2507 2508 2509
      if (pKey->ts < pFirst->ts) {
        return false;
      } else if (pKey->ts == pFirst->ts) {
        return pFirst->version >= pKey->version;
      } else {
        ASSERT(0);
      }
2510
    } else {
2511 2512 2513 2514 2515 2516 2517 2518 2519 2520 2521 2522 2523 2524 2525 2526 2527 2528 2529 2530 2531 2532 2533 2534 2535 2536 2537
      TSDBKEY* pCurrent = taosArrayGet(pDelList, *index);
      TSDBKEY* pPrev = taosArrayGet(pDelList, (*index) - 1);

      if (pKey->ts > pCurrent->ts) {
        return false;
      }

      if (pPrev->ts <= pKey->ts && pCurrent->ts >= pKey->ts && pPrev->version >= pKey->version) {
        return true;
      }

      while (pPrev->ts >= pKey->ts && (*index) > 1) {
        (*index) += step;

        if ((*index) >= 1) {
          pCurrent = taosArrayGet(pDelList, *index);
          pPrev = taosArrayGet(pDelList, (*index) - 1);

          // it is not a consecutive deletion range, ignore it
          if (pCurrent->version > 0 && pPrev->version == 0) {
            continue;
          }

          if (pPrev->ts <= pKey->ts && pCurrent->ts >= pKey->ts && pPrev->version >= pKey->version) {
            return true;
          }
        }
2538 2539 2540 2541 2542
      }

      return false;
    }
  }
2543 2544

  return false;
2545 2546 2547 2548
}

TSDBROW* getValidRow(SIterInfo* pIter, const SArray* pDelList, STsdbReader* pReader) {
  if (!pIter->hasVal) {
H
Haojun Liao 已提交
2549 2550
    return NULL;
  }
H
Hongze Cheng 已提交
2551

2552
  TSDBROW* pRow = tsdbTbDataIterGet(pIter->iter);
2553
  TSDBKEY  key = {.ts = pRow->pTSRow->ts, .version = pRow->version};
2554
  if (outOfTimeWindow(key.ts, &pReader->window)) {
2555
    pIter->hasVal = false;
H
Haojun Liao 已提交
2556 2557
    return NULL;
  }
H
Hongze Cheng 已提交
2558

2559
  // it is a valid data version
dengyihao's avatar
dengyihao 已提交
2560
  if ((key.version <= pReader->verRange.maxVer && key.version >= pReader->verRange.minVer) &&
2561
      (!hasBeenDropped(pDelList, &pIter->index, &key, pReader->order))) {
H
Haojun Liao 已提交
2562 2563
    return pRow;
  }
H
Hongze Cheng 已提交
2564

2565
  while (1) {
2566 2567
    pIter->hasVal = tsdbTbDataIterNext(pIter->iter);
    if (!pIter->hasVal) {
H
Haojun Liao 已提交
2568 2569
      return NULL;
    }
H
Hongze Cheng 已提交
2570

2571
    pRow = tsdbTbDataIterGet(pIter->iter);
H
Hongze Cheng 已提交
2572

H
Haojun Liao 已提交
2573
    key = TSDBROW_KEY(pRow);
2574
    if (outOfTimeWindow(key.ts, &pReader->window)) {
2575
      pIter->hasVal = false;
H
Haojun Liao 已提交
2576 2577
      return NULL;
    }
H
Hongze Cheng 已提交
2578

dengyihao's avatar
dengyihao 已提交
2579
    if (key.version <= pReader->verRange.maxVer && key.version >= pReader->verRange.minVer &&
2580
        (!hasBeenDropped(pDelList, &pIter->index, &key, pReader->order))) {
H
Haojun Liao 已提交
2581 2582 2583 2584
      return pRow;
    }
  }
}
H
Hongze Cheng 已提交
2585

2586 2587
int32_t doMergeRowsInBuf(SIterInfo* pIter, uint64_t uid, int64_t ts, SArray* pDelList, SRowMerger* pMerger,
                         STsdbReader* pReader) {
H
Haojun Liao 已提交
2588
  while (1) {
2589 2590
    pIter->hasVal = tsdbTbDataIterNext(pIter->iter);
    if (!pIter->hasVal) {
H
Haojun Liao 已提交
2591 2592
      break;
    }
H
Hongze Cheng 已提交
2593

2594
    // data exists but not valid
2595
    TSDBROW* pRow = getValidRow(pIter, pDelList, pReader);
2596 2597 2598 2599 2600
    if (pRow == NULL) {
      break;
    }

    // ts is not identical, quit
H
Haojun Liao 已提交
2601
    TSDBKEY k = TSDBROW_KEY(pRow);
2602
    if (k.ts != ts) {
H
Haojun Liao 已提交
2603 2604 2605
      break;
    }

H
Haojun Liao 已提交
2606
    STSchema* pTSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, uid);
2607
    tRowMergerAdd(pMerger, pRow, pTSchema);
H
Haojun Liao 已提交
2608 2609 2610 2611 2612
  }

  return TSDB_CODE_SUCCESS;
}

2613
static int32_t doMergeRowsInFileBlockImpl(SBlockData* pBlockData, int32_t rowIndex, int64_t key, SRowMerger* pMerger,
2614
                                          SVersionRange* pVerRange, int32_t step) {
2615 2616
  while (pBlockData->aTSKEY[rowIndex] == key && rowIndex < pBlockData->nRow && rowIndex >= 0) {
    if (pBlockData->aVersion[rowIndex] > pVerRange->maxVer || pBlockData->aVersion[rowIndex] < pVerRange->minVer) {
2617
      rowIndex += step;
2618 2619 2620 2621 2622 2623 2624 2625 2626 2627 2628 2629 2630 2631 2632 2633 2634
      continue;
    }

    TSDBROW fRow = tsdbRowFromBlockData(pBlockData, rowIndex);
    tRowMerge(pMerger, &fRow);
    rowIndex += step;
  }

  return rowIndex;
}

typedef enum {
  CHECK_FILEBLOCK_CONT = 0x1,
  CHECK_FILEBLOCK_QUIT = 0x2,
} CHECK_FILEBLOCK_STATE;

static int32_t checkForNeighborFileBlock(STsdbReader* pReader, STableBlockScanInfo* pScanInfo, SBlock* pBlock,
2635 2636
                                         SFileDataBlockInfo* pFBlock, SRowMerger* pMerger, int64_t key,
                                         CHECK_FILEBLOCK_STATE* state) {
2637
  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
2638
  SBlockData*         pBlockData = &pReader->status.fileBlockData;
2639

2640
  *state = CHECK_FILEBLOCK_QUIT;
2641
  int32_t step = ASCENDING_TRAVERSE(pReader->order) ? 1 : -1;
2642 2643 2644

  int32_t nextIndex = -1;
  SBlock* pNeighborBlock = getNeighborBlockOfSameTable(pFBlock, pScanInfo, &nextIndex, pReader->order);
2645
  if (pNeighborBlock == NULL) {  // do nothing
2646 2647 2648 2649
    return 0;
  }

  bool overlap = overlapWithNeighborBlock(pBlock, pNeighborBlock, pReader->order);
2650 2651
  taosMemoryFree(pNeighborBlock);

2652
  if (overlap) {  // load next block
2653
    SReaderStatus*  pStatus = &pReader->status;
2654 2655
    SDataBlockIter* pBlockIter = &pStatus->blockIter;

2656
    // 1. find the next neighbor block in the scan block list
2657
    SFileDataBlockInfo fb = {.uid = pFBlock->uid, .tbBlockIdx = nextIndex};
2658
    int32_t            neighborIndex = findFileBlockInfoIndex(pBlockIter, &fb);
2659

2660
    // 2. remove it from the scan block list
2661
    setFileBlockActiveInBlockIter(pBlockIter, neighborIndex, step);
2662

2663
    // 3. load the neighbor block, and set it to be the currently accessed file data block
H
Haojun Liao 已提交
2664
    tBlockDataReset(&pStatus->fileBlockData);
2665
    int32_t code = doLoadFileBlockData(pReader, pBlockIter, &pStatus->fileBlockData);
2666 2667 2668 2669
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

2670
    // 4. check the data values
2671 2672 2673 2674
    initBlockDumpInfo(pReader, pBlockIter);

    pDumpInfo->rowIndex =
        doMergeRowsInFileBlockImpl(pBlockData, pDumpInfo->rowIndex, key, pMerger, &pReader->verRange, step);
H
Haojun Liao 已提交
2675
    if (pDumpInfo->rowIndex >= pDumpInfo->totalRows) {
2676 2677 2678 2679 2680 2681 2682
      *state = CHECK_FILEBLOCK_CONT;
    }
  }

  return TSDB_CODE_SUCCESS;
}

2683 2684
int32_t doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pScanInfo, STsdbReader* pReader,
                                SRowMerger* pMerger) {
2685 2686
  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;

2687
  bool    asc = ASCENDING_TRAVERSE(pReader->order);
2688
  int64_t key = pBlockData->aTSKEY[pDumpInfo->rowIndex];
2689
  int32_t step = asc ? 1 : -1;
2690

2691
  pDumpInfo->rowIndex += step;
2692
  if ((pDumpInfo->rowIndex <= pBlockData->nRow - 1 && asc) || (pDumpInfo->rowIndex >= 0 && !asc)) {
2693 2694 2695
    pDumpInfo->rowIndex =
        doMergeRowsInFileBlockImpl(pBlockData, pDumpInfo->rowIndex, key, pMerger, &pReader->verRange, step);
  }
2696

2697 2698 2699 2700
  // all rows are consumed, let's try next file block
  if ((pDumpInfo->rowIndex >= pBlockData->nRow && asc) || (pDumpInfo->rowIndex < 0 && !asc)) {
    while (1) {
      CHECK_FILEBLOCK_STATE st;
2701

2702
      SFileDataBlockInfo* pFileBlockInfo = getCurrentBlockInfo(&pReader->status.blockIter);
2703
      SBlock*             pCurrentBlock = getCurrentBlock(&pReader->status.blockIter);
2704 2705 2706
      checkForNeighborFileBlock(pReader, pScanInfo, pCurrentBlock, pFileBlockInfo, pMerger, key, &st);
      if (st == CHECK_FILEBLOCK_QUIT) {
        break;
2707
      }
2708
    }
H
Haojun Liao 已提交
2709
  }
2710

H
Haojun Liao 已提交
2711 2712 2713
  return TSDB_CODE_SUCCESS;
}

2714 2715 2716 2717 2718 2719 2720 2721 2722 2723 2724 2725 2726 2727 2728
// todo support desc order
int32_t doMergeRowsInLastBlock(SLastBlockReader* pLastBlockReader, int64_t ts, SRowMerger* pMerger) {
  while(nextRowInLastBlock(pLastBlockReader)) {
    int64_t next1 = getCurrentKeyInLastBlock(pLastBlockReader);
    if (next1 == ts) {
      TSDBROW fRow1 = tsdbRowFromBlockData(&pLastBlockReader->lastBlockData, pLastBlockReader->rowIndex);
      tRowMerge(pMerger, &fRow1);
    } else {
      break;
    }
  }

  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
2729
void doMergeMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter, SArray* pDelList, STSRow** pTSRow,
2730
                      STsdbReader* pReader, bool* freeTSRow) {
H
Haojun Liao 已提交
2731
  TSDBROW* pNextRow = NULL;
2732
  TSDBROW  current = *pRow;
2733

2734 2735
  {  // if the timestamp of the next valid row has a different ts, return current row directly
    pIter->hasVal = tsdbTbDataIterNext(pIter->iter);
2736

2737 2738 2739 2740 2741
    if (!pIter->hasVal) {
      *pTSRow = current.pTSRow;
      *freeTSRow = false;
      return;
    } else {  // has next point in mem/imem
H
Haojun Liao 已提交
2742
      pNextRow = getValidRow(pIter, pDelList, pReader);
2743 2744 2745 2746 2747 2748
      if (pNextRow == NULL) {
        *pTSRow = current.pTSRow;
        *freeTSRow = false;
        return;
      }

H
Haojun Liao 已提交
2749
      if (current.pTSRow->ts != pNextRow->pTSRow->ts) {
2750 2751 2752 2753
        *pTSRow = current.pTSRow;
        *freeTSRow = false;
        return;
      }
2754
    }
2755 2756
  }

2757 2758
  SRowMerger merge = {0};

2759
  // get the correct schema for data in memory
H
Haojun Liao 已提交
2760
  STSchema* pTSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(&current), pReader, uid);
H
Haojun Liao 已提交
2761

2762 2763
  if (pReader->pSchema == NULL) {
    pReader->pSchema = pTSchema;
2764
  }
H
Haojun Liao 已提交
2765

H
Haojun Liao 已提交
2766 2767 2768 2769 2770 2771
  tRowMergerInit2(&merge, pReader->pSchema, &current, pTSchema);

  STSchema* pTSchema1 = doGetSchemaForTSRow(TSDBROW_SVERSION(pNextRow), pReader, uid);
  tRowMergerAdd(&merge, pNextRow, pTSchema1);

  doMergeRowsInBuf(pIter, uid, current.pTSRow->ts, pDelList, &merge, pReader);
2772
  tRowMergerGetRow(&merge, pTSRow);
2773
  tRowMergerClear(&merge);
M
Minglei Jin 已提交
2774

2775
  *freeTSRow = true;
2776 2777
}

2778 2779
void doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader,
                        STSRow** pTSRow) {
H
Haojun Liao 已提交
2780 2781
  SRowMerger merge = {0};

2782 2783 2784
  TSDBKEY k = TSDBROW_KEY(pRow);
  TSDBKEY ik = TSDBROW_KEY(piRow);

2785
  if (ASCENDING_TRAVERSE(pReader->order)) {  // ascending order imem --> mem
H
Haojun Liao 已提交
2786
    STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid);
2787

H
Haojun Liao 已提交
2788
    tRowMergerInit(&merge, piRow, pSchema);
2789
    doMergeRowsInBuf(&pBlockScanInfo->iiter, pBlockScanInfo->uid, ik.ts, pBlockScanInfo->delSkyline, &merge, pReader);
2790

2791
    tRowMerge(&merge, pRow);
2792
    doMergeRowsInBuf(&pBlockScanInfo->iter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge, pReader);
2793
  } else {
H
Haojun Liao 已提交
2794
    STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid);
2795

H
Haojun Liao 已提交
2796
    tRowMergerInit(&merge, pRow, pSchema);
2797
    doMergeRowsInBuf(&pBlockScanInfo->iter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge, pReader);
2798 2799

    tRowMerge(&merge, piRow);
2800
    doMergeRowsInBuf(&pBlockScanInfo->iiter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge, pReader);
2801
  }
2802 2803 2804 2805

  tRowMergerGetRow(&merge, pTSRow);
}

2806 2807
int32_t tsdbGetNextRowInMem(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader, STSRow** pTSRow, int64_t endKey,
                            bool* freeTSRow) {
2808 2809
  TSDBROW* pRow = getValidRow(&pBlockScanInfo->iter, pBlockScanInfo->delSkyline, pReader);
  TSDBROW* piRow = getValidRow(&pBlockScanInfo->iiter, pBlockScanInfo->delSkyline, pReader);
dengyihao's avatar
dengyihao 已提交
2810
  SArray*  pDelList = pBlockScanInfo->delSkyline;
H
Haojun Liao 已提交
2811

2812 2813
  // todo refactor
  bool asc = ASCENDING_TRAVERSE(pReader->order);
2814
  if (pBlockScanInfo->iter.hasVal) {
2815 2816 2817 2818 2819 2820
    TSDBKEY k = TSDBROW_KEY(pRow);
    if ((k.ts >= endKey && asc) || (k.ts <= endKey && !asc)) {
      pRow = NULL;
    }
  }

2821
  if (pBlockScanInfo->iiter.hasVal) {
2822 2823 2824 2825 2826 2827
    TSDBKEY k = TSDBROW_KEY(piRow);
    if ((k.ts >= endKey && asc) || (k.ts <= endKey && !asc)) {
      piRow = NULL;
    }
  }

2828
  if (pBlockScanInfo->iter.hasVal && pBlockScanInfo->iiter.hasVal && pRow != NULL && piRow != NULL) {
2829
    TSDBKEY k = TSDBROW_KEY(pRow);
2830
    TSDBKEY ik = TSDBROW_KEY(piRow);
H
Haojun Liao 已提交
2831

2832
    if (ik.ts < k.ts) {  // ik.ts < k.ts
2833
      doMergeMultiRows(piRow, pBlockScanInfo->uid, &pBlockScanInfo->iiter, pDelList, pTSRow, pReader, freeTSRow);
2834
    } else if (k.ts < ik.ts) {
2835
      doMergeMultiRows(pRow, pBlockScanInfo->uid, &pBlockScanInfo->iter, pDelList, pTSRow, pReader, freeTSRow);
2836 2837
    } else {  // ik.ts == k.ts
      doMergeMemIMemRows(pRow, piRow, pBlockScanInfo, pReader, pTSRow);
2838
      *freeTSRow = true;
H
Haojun Liao 已提交
2839
    }
2840 2841

    return TSDB_CODE_SUCCESS;
H
Haojun Liao 已提交
2842 2843
  }

2844
  if (pBlockScanInfo->iter.hasVal && pRow != NULL) {
2845
    doMergeMultiRows(pRow, pBlockScanInfo->uid, &pBlockScanInfo->iter, pDelList, pTSRow, pReader, freeTSRow);
H
Haojun Liao 已提交
2846 2847 2848
    return TSDB_CODE_SUCCESS;
  }

2849
  if (pBlockScanInfo->iiter.hasVal && piRow != NULL) {
2850
    doMergeMultiRows(piRow, pBlockScanInfo->uid, &pBlockScanInfo->iiter, pDelList, pTSRow, pReader, freeTSRow);
H
Haojun Liao 已提交
2851 2852 2853 2854 2855 2856
    return TSDB_CODE_SUCCESS;
  }

  return TSDB_CODE_SUCCESS;
}

H
Haojun Liao 已提交
2857
int32_t doAppendRowFromTSRow(SSDataBlock* pBlock, STsdbReader* pReader, STSRow* pTSRow, uint64_t uid) {
2858 2859 2860
  int32_t numOfRows = pBlock->info.rows;
  int32_t numOfCols = (int32_t)taosArrayGetSize(pBlock->pDataBlock);

2861
  SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo;
2862
  STSchema*           pSchema = doGetSchemaForTSRow(pTSRow->sver, pReader, uid);
2863

2864
  SColVal colVal = {0};
2865
  int32_t i = 0, j = 0;
H
Haojun Liao 已提交
2866

2867 2868 2869 2870 2871 2872 2873 2874 2875 2876 2877
  SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
  if (pColInfoData->info.colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
    colDataAppend(pColInfoData, numOfRows, (const char*)&pTSRow->ts, false);
    i += 1;
  }

  while (i < numOfCols && j < pSchema->numOfCols) {
    pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
    col_id_t colId = pColInfoData->info.colId;

    if (colId == pSchema->columns[j].colId) {
H
Haojun Liao 已提交
2878
      tTSRowGetVal(pTSRow, pSchema, j, &colVal);
2879 2880 2881 2882 2883 2884 2885 2886
      doCopyColVal(pColInfoData, numOfRows, i, &colVal, pSupInfo);
      i += 1;
      j += 1;
    } else if (colId < pSchema->columns[j].colId) {
      colDataAppendNULL(pColInfoData, numOfRows);
      i += 1;
    } else if (colId > pSchema->columns[j].colId) {
      j += 1;
2887
    }
2888 2889
  }

2890
  // set null value since current column does not exist in the "pSchema"
2891
  while (i < numOfCols) {
2892 2893 2894 2895 2896
    pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
    colDataAppendNULL(pColInfoData, numOfRows);
    i += 1;
  }

2897 2898 2899 2900
  pBlock->info.rows += 1;
  return TSDB_CODE_SUCCESS;
}

2901
int32_t doAppendRowFromFileBlock(SSDataBlock* pResBlock, STsdbReader* pReader, SBlockData* pBlockData, int32_t rowIndex) {
2902 2903 2904 2905 2906 2907 2908 2909
  int32_t i = 0, j = 0;
  int32_t outputRowIndex = pResBlock->info.rows;

  SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo;

  SColumnInfoData* pColData = taosArrayGet(pResBlock->pDataBlock, i);
  if (pColData->info.colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
    colDataAppendInt64(pColData, outputRowIndex, &pBlockData->aTSKEY[rowIndex]);
2910
    i += 1;
2911 2912 2913 2914 2915 2916
  }

  SColVal cv = {0};
  int32_t numOfInputCols = taosArrayGetSize(pBlockData->aIdx);
  int32_t numOfOutputCols = blockDataGetNumOfCols(pResBlock);

2917
  while (i < numOfOutputCols && j < numOfInputCols) {
2918
    SColumnInfoData* pCol = taosArrayGet(pResBlock->pDataBlock, i);
2919
    SColData*        pData = tBlockDataGetColDataByIdx(pBlockData, j);
2920 2921

    if (pData->cid == pCol->info.colId) {
2922 2923
      tColDataGetValue(pData, rowIndex, &cv);
      doCopyColVal(pCol, outputRowIndex, i, &cv, pSupInfo);
2924 2925 2926 2927 2928 2929 2930 2931 2932 2933
      j += 1;
    } else {  // the specified column does not exist in file block, fill with null data
      colDataAppendNULL(pCol, outputRowIndex);
    }

    i += 1;
  }

  while (i < numOfOutputCols) {
    SColumnInfoData* pCol = taosArrayGet(pResBlock->pDataBlock, i);
2934
    colDataAppendNULL(pCol, outputRowIndex);
2935 2936 2937 2938 2939 2940 2941
    i += 1;
  }

  pResBlock->info.rows += 1;
  return TSDB_CODE_SUCCESS;
}

2942 2943
int32_t buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, int64_t endKey, int32_t capacity,
                                  STsdbReader* pReader) {
H
Haojun Liao 已提交
2944 2945 2946 2947
  SSDataBlock* pBlock = pReader->pResBlock;

  do {
    STSRow* pTSRow = NULL;
2948
    bool    freeTSRow = false;
2949
    tsdbGetNextRowInMem(pBlockScanInfo, pReader, &pTSRow, endKey, &freeTSRow);
2950 2951
    if (pTSRow == NULL) {
      break;
H
Haojun Liao 已提交
2952 2953
    }

H
Haojun Liao 已提交
2954
    doAppendRowFromTSRow(pBlock, pReader, pTSRow, pBlockScanInfo->uid);
2955 2956 2957
    if (freeTSRow) {
      taosMemoryFree(pTSRow);
    }
H
Haojun Liao 已提交
2958 2959

    // no data in buffer, return immediately
2960
    if (!(pBlockScanInfo->iter.hasVal || pBlockScanInfo->iiter.hasVal)) {
H
Haojun Liao 已提交
2961 2962 2963
      break;
    }

2964
    if (pBlock->info.rows >= capacity) {
H
Haojun Liao 已提交
2965 2966 2967 2968
      break;
    }
  } while (1);

2969
  ASSERT(pBlock->info.rows <= capacity);
H
Haojun Liao 已提交
2970 2971
  return TSDB_CODE_SUCCESS;
}
H
Hongze Cheng 已提交
2972

2973
// todo refactor, use arraylist instead
H
Hongze Cheng 已提交
2974
int32_t tsdbSetTableId(STsdbReader* pReader, int64_t uid) {
2975 2976 2977 2978 2979
  ASSERT(pReader != NULL);
  taosHashClear(pReader->status.pTableMap);

  STableBlockScanInfo info = {.lastKey = 0, .uid = uid};
  taosHashPut(pReader->status.pTableMap, &info.uid, sizeof(uint64_t), &info, sizeof(info));
H
Hongze Cheng 已提交
2980 2981 2982
  return TDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
2983 2984 2985 2986 2987 2988
void* tsdbGetIdx(SMeta* pMeta) {
  if (pMeta == NULL) {
    return NULL;
  }
  return metaGetIdx(pMeta);
}
dengyihao's avatar
dengyihao 已提交
2989

dengyihao's avatar
dengyihao 已提交
2990 2991 2992 2993 2994 2995
void* tsdbGetIvtIdx(SMeta* pMeta) {
  if (pMeta == NULL) {
    return NULL;
  }
  return metaGetIvtIdx(pMeta);
}
L
Liu Jicong 已提交
2996

H
Hongze Cheng 已提交
2997
uint64_t getReaderMaxVersion(STsdbReader* pReader) { return pReader->verRange.maxVer; }
2998

C
Cary Xu 已提交
2999 3000 3001 3002 3003 3004 3005 3006 3007 3008
/**
 * @brief Get all suids since suid
 *
 * @param pMeta
 * @param suid return all suids in one vnode if suid is 0
 * @param list
 * @return int32_t
 */
int32_t tsdbGetStbIdList(SMeta* pMeta, int64_t suid, SArray* list) {
  SMStbCursor* pCur = metaOpenStbCursor(pMeta, suid);
L
Liu Jicong 已提交
3009
  if (!pCur) {
C
Cary Xu 已提交
3010 3011
    return TSDB_CODE_FAILED;
  }
C
Cary Xu 已提交
3012 3013 3014 3015 3016 3017 3018 3019 3020 3021 3022 3023 3024 3025

  while (1) {
    tb_uid_t id = metaStbCursorNext(pCur);
    if (id == 0) {
      break;
    }

    taosArrayPush(list, &id);
  }

  metaCloseStbCursor(pCur);
  return TSDB_CODE_SUCCESS;
}

H
refact  
Hongze Cheng 已提交
3026
// ====================================== EXPOSED APIs ======================================
3027 3028
/**
 * Open a reader over the tables in `pTableList` that satisfy `pCond`.
 *
 * For a TIMEWINDOW_RANGE_EXTERNAL condition two extra readers of capacity 1 are created:
 * innerReader[0] scans from the window start in the reverse order (the "prev" row) and innerReader[1]
 * scans from the window end in the query's own order (the "next" row). `pCond` is rewritten temporarily
 * to build them and is restored before this function returns.
 *
 * @param pVnode     vnode to read from
 * @param pCond      query condition
 * @param pTableList array of STableKeyInfo describing the tables to scan
 * @param ppReader   receives the new reader; set to NULL if its table map cannot be built
 * @param idstr      query identifier used in log messages
 * @return TSDB_CODE_SUCCESS or an error code
 */
int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, SArray* pTableList, STsdbReader** ppReader,
                       const char* idstr) {
  // initialized up-front: the first failure below jumps to _err before the reader exists
  STsdbReader* pReader = NULL;

  int32_t code = tsdbReaderCreate(pVnode, pCond, ppReader, 4096, idstr);
  if (code != TSDB_CODE_SUCCESS) {
    goto _err;
  }

  // check for query time window
  pReader = *ppReader;
  if (isEmptyQueryTimeWindow(&pReader->window)) {
    tsdbDebug("%p query window not overlaps with the data set, no result returned, %s", pReader, pReader->idStr);
    return TSDB_CODE_SUCCESS;
  }

  if (pCond->type == TIMEWINDOW_RANGE_EXTERNAL) {
    // update the SQueryTableDataCond to create inner reader
    STimeWindow w = pCond->twindows;
    int32_t     order = pCond->order;
    if (order == TSDB_ORDER_ASC) {
      pCond->twindows.ekey = pCond->twindows.skey;
      pCond->twindows.skey = INT64_MIN;
      pCond->order = TSDB_ORDER_DESC;
    } else {
      pCond->twindows.skey = pCond->twindows.ekey;
      pCond->twindows.ekey = INT64_MAX;
      pCond->order = TSDB_ORDER_ASC;
    }

    // here we only need one more row, so the capacity is set to be ONE.
    code = tsdbReaderCreate(pVnode, pCond, &pReader->innerReader[0], 1, idstr);
    if (code == TSDB_CODE_SUCCESS) {
      // NOTE(review): for DESC the "next" window ends at w.ekey rather than w.skey — confirm this is intended
      if (order == TSDB_ORDER_ASC) {
        pCond->twindows.skey = w.ekey;
        pCond->twindows.ekey = INT64_MAX;
      } else {
        pCond->twindows.skey = INT64_MIN;
        pCond->twindows.ekey = w.ekey;
      }

      // the row beyond the window end must be searched in the query's own order; with the reversed order
      // left over from innerReader[0] the scan would start at the far end of the range instead
      pCond->order = order;
      code = tsdbReaderCreate(pVnode, pCond, &pReader->innerReader[1], 1, idstr);
    }

    // hand the caller back its original condition on every path
    pCond->twindows = w;
    pCond->order = order;

    if (code != TSDB_CODE_SUCCESS) {
      goto _err;
    }
  }

  if (pCond->suid != 0) {
    pReader->pSchema = metaGetTbTSchema(pReader->pTsdb->pVnode->pMeta, pReader->suid, -1);
  } else if (taosArrayGetSize(pTableList) > 0) {
    STableKeyInfo* pKey = taosArrayGet(pTableList, 0);
    pReader->pSchema = metaGetTbTSchema(pReader->pTsdb->pVnode->pMeta, pKey->uid, -1);
  }

  int32_t numOfTables = taosArrayGetSize(pTableList);
  pReader->status.pTableMap = createDataBlockScanInfo(pReader, pTableList->pData, numOfTables);
  if (pReader->status.pTableMap == NULL) {
    tsdbReaderClose(pReader);
    *ppReader = NULL;
    pReader = NULL;  // released above; _err must not touch it

    code = TSDB_CODE_TDB_OUT_OF_MEMORY;
    goto _err;
  }

  code = tsdbTakeReadSnap(pReader->pTsdb, &pReader->pReadSnap);
  if (code != TSDB_CODE_SUCCESS) {
    goto _err;
  }

  if (pReader->type == TIMEWINDOW_RANGE_CONTAINED) {
    SDataBlockIter* pBlockIter = &pReader->status.blockIter;

    initFilesetIterator(&pReader->status.fileIter, pReader->pReadSnap->fs.aDFileSet, pReader->order, pReader->idStr);
    resetDataBlockIterator(&pReader->status.blockIter, pReader->order, pReader->status.pTableMap);

    // no data in files, let's try buffer in memory
    if (pReader->status.fileIter.numOfFiles == 0) {
      pReader->status.loadFromFile = false;
    } else {
      code = initForFirstBlockInFile(pReader, pBlockIter);
      if (code != TSDB_CODE_SUCCESS) {
        goto _err;
      }
    }
  } else {
    // NOTE(review): in this branch only innerReader[0] gets a read snapshot and file-set iterator here; the main
    // reader's file-set iterator and innerReader[1]'s snapshot/iterator are not set up in this function — confirm
    // they are initialized elsewhere.
    STsdbReader*    pPrevReader = pReader->innerReader[0];
    SDataBlockIter* pBlockIter = &pPrevReader->status.blockIter;

    code = tsdbTakeReadSnap(pPrevReader->pTsdb, &pPrevReader->pReadSnap);
    if (code != TSDB_CODE_SUCCESS) {
      goto _err;
    }

    initFilesetIterator(&pPrevReader->status.fileIter, pPrevReader->pReadSnap->fs.aDFileSet, pPrevReader->order,
                        pPrevReader->idStr);
    resetDataBlockIterator(&pPrevReader->status.blockIter, pPrevReader->order, pReader->status.pTableMap);

    // no data in files, let's try buffer in memory
    if (pPrevReader->status.fileIter.numOfFiles == 0) {
      pPrevReader->status.loadFromFile = false;
    } else {
      code = initForFirstBlockInFile(pPrevReader, pBlockIter);
      if (code != TSDB_CODE_SUCCESS) {
        goto _err;
      }
    }
  }

  tsdbDebug("%p total numOfTable:%d in this query %s", pReader, numOfTables, pReader->idStr);
  return code;

_err:
  // log with the caller-supplied id: pReader may not exist yet, or may already have been released
  tsdbError("failed to create data reader, code:%s %s", tstrerror(code), (idstr != NULL) ? idstr : "");
  return code;
}

/**
 * Release a reader and everything it owns.
 *
 * This covers the inner readers of an external-range query, the read snapshot, the SMA/build buffers,
 * the block iterator, the table map, the result block, the data file reader and the schemas.
 * Passing NULL is a no-op.
 */
void tsdbReaderClose(STsdbReader* pReader) {
  if (pReader == NULL) {
    return;
  }

  // tsdbNextDataBlock releases the inner readers only once they are exhausted, so a reader closed earlier
  // still owns them. tsdbReaderClose(NULL) returns immediately, so this is safe when they are already gone.
  tsdbReaderClose(pReader->innerReader[0]);
  pReader->innerReader[0] = NULL;
  tsdbReaderClose(pReader->innerReader[1]);
  pReader->innerReader[1] = NULL;

  SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo;
  tsdbUntakeReadSnap(pReader->pTsdb, pReader->pReadSnap);

  taosMemoryFreeClear(pSupInfo->plist);
  taosMemoryFree(pSupInfo->colIds);

  taosArrayDestroy(pSupInfo->pColAgg);
  for (int32_t i = 0; i < blockDataGetNumOfCols(pReader->pResBlock); ++i) {
    if (pSupInfo->buildBuf[i] != NULL) {
      taosMemoryFreeClear(pSupInfo->buildBuf[i]);
    }
  }
  taosMemoryFree(pSupInfo->buildBuf);
  tBlockDataDestroy(&pReader->status.fileBlockData, true);

  cleanupDataBlockIterator(&pReader->status.blockIter);

  size_t numOfTables = taosHashGetSize(pReader->status.pTableMap);
  destroyBlockScanInfo(pReader->status.pTableMap);
  blockDataDestroy(pReader->pResBlock);

  if (pReader->pFileReader != NULL) {
    tsdbDataFReaderClose(&pReader->pFileReader);
  }

  SIOCostSummary* pCost = &pReader->cost;

  tsdbDebug("%p :io-cost summary: head-file:%" PRIu64 ", head-file time:%.2f ms, SMA:%" PRId64
            " SMA-time:%.2f ms, fileBlocks:%" PRId64
            ", fileBlocks-time:%.2f ms, "
            "build in-memory-block-time:%.2f ms, STableBlockScanInfo size:%.2f Kb %s",
            pReader, pCost->headFileLoad, pCost->headFileLoadTime, pCost->smaData, pCost->smaLoadTime,
            pCost->numOfBlocks, pCost->blockLoadTime, pCost->buildmemBlock,
            numOfTables * sizeof(STableBlockScanInfo) / 1000.0, pReader->idStr);

  taosMemoryFree(pReader->idStr);

  // compare while both pointers are still live: after free() a pointer value is indeterminate in C.
  // pMemSchema may alias pSchema, in which case it must be freed only once.
  if (pReader->pMemSchema != pReader->pSchema) {
    taosMemoryFree(pReader->pMemSchema);
  }
  taosMemoryFree(pReader->pSchema);

  taosMemoryFreeClear(pReader);
}

3191
/**
 * Fill the reader's result block with the next batch of rows.
 *
 * File blocks are tried first while status.loadFromFile is set. When they yield no rows, or files are not
 * being read, the in-memory buffers are used.
 *
 * @return true if the result block holds at least one row
 */
static bool doTsdbNextDataBlock(STsdbReader* pReader) {
  // drop whatever the previous call left in the result block
  SSDataBlock* pResBlock = pReader->pResBlock;
  blockDataCleanup(pResBlock);

  if (pReader->status.loadFromFile) {
    if (buildBlockFromFiles(pReader) != TSDB_CODE_SUCCESS) {
      return false;
    }
    if (pResBlock->info.rows > 0) {
      return true;
    }
  }

  // files are absent or produced nothing: fall back to the in-memory buffers
  buildBlockFromBufferSequentially(pReader);
  return pResBlock->info.rows > 0;
}

3218 3219 3220 3221 3222 3223 3224 3225 3226 3227 3228 3229 3230 3231 3232 3233 3234 3235 3236 3237 3238 3239 3240 3241 3242 3243 3244 3245 3246 3247 3248 3249 3250 3251 3252 3253 3254
/**
 * Advance the reader to its next data block.
 *
 * For an external-range query the sequence is:
 *   - at most one block from innerReader[0] (the row before the window),
 *   - then every block of the main scan,
 *   - then at most one block from innerReader[1] (the row after the window).
 * The inner readers are created with capacity 1 because only one extra row is wanted on each side, so
 * each of them is consulted only once. pReader->step records which reader produced the current block,
 * so that the retrieve functions read from the right place.
 *
 * NOTE(review): relies on pReader->step not starting out as EXTERNAL_ROWS_PREV (a zeroed reader) — confirm
 * in tsdbReaderCreate.
 *
 * @return true if a block is available, false when the scan is complete
 */
bool tsdbNextDataBlock(STsdbReader* pReader) {
  if (isEmptyQueryTimeWindow(&pReader->window)) {
    return false;
  }

  if (pReader->innerReader[0] != NULL) {
    // deliver the "prev" block once; on the following call the inner reader is released
    if (pReader->step != EXTERNAL_ROWS_PREV && doTsdbNextDataBlock(pReader->innerReader[0])) {
      pReader->step = EXTERNAL_ROWS_PREV;
      return true;
    }

    tsdbReaderClose(pReader->innerReader[0]);
    pReader->innerReader[0] = NULL;
  }

  if (pReader->innerReader[1] != NULL && pReader->step == EXTERNAL_ROWS_NEXT) {
    // the single "next" block has already been delivered: the scan is complete
    tsdbReaderClose(pReader->innerReader[1]);
    pReader->innerReader[1] = NULL;
    return false;
  }

  pReader->step = EXTERNAL_ROWS_MAIN;
  if (doTsdbNextDataBlock(pReader)) {
    return true;
  }

  if (pReader->innerReader[1] != NULL) {
    if (doTsdbNextDataBlock(pReader->innerReader[1])) {
      pReader->step = EXTERNAL_ROWS_NEXT;
      return true;
    }

    tsdbReaderClose(pReader->innerReader[1]);
    pReader->innerReader[1] = NULL;
  }

  return false;
}

/**
 * Copy the row count, table uid and time window of the reader's current result block into `pDataBlockInfo`.
 */
static void setBlockInfo(STsdbReader* pReader, SDataBlockInfo* pDataBlockInfo) {
  ASSERT(pDataBlockInfo != NULL && pReader != NULL);

  SSDataBlock* pRes = pReader->pResBlock;
  pDataBlockInfo->rows = pRes->info.rows;
  pDataBlockInfo->uid = pRes->info.uid;
  pDataBlockInfo->window = pRes->info.window;
}

3261 3262
/**
 * Describe the block returned by the last successful tsdbNextDataBlock call.
 *
 * For an external-range query the information comes from whichever reader produced that block, as
 * recorded in pReader->step: the main reader, innerReader[0] ("prev") or innerReader[1] ("next").
 */
void tsdbRetrieveDataBlockInfo(STsdbReader* pReader, SDataBlockInfo* pDataBlockInfo) {
  STsdbReader* pSource = pReader;

  if (pReader->type == TIMEWINDOW_RANGE_EXTERNAL) {
    switch (pReader->step) {
      case EXTERNAL_ROWS_MAIN:
        pSource = pReader;
        break;
      case EXTERNAL_ROWS_PREV:
        pSource = pReader->innerReader[0];
        break;
      default:
        pSource = pReader->innerReader[1];
        break;
    }
  }

  setBlockInfo(pSource, pDataBlockInfo);
}

3275
/**
 * Expose the block-level SMA (pre-computed per-column statistics) of the current file block.
 *
 * @param pReader      the reader
 * @param pBlockStatis receives pReader->suppInfo.plist, indexed like the result block's columns.
 *                     Slot 0 holds the timestamp aggregate built from the result block's window.
 *                     Receives NULL for external-range queries, composed blocks and blocks without SMA.
 * @param allHave      set to true only when SMA was loaded and no matched column had BSMA disabled
 * @return TSDB_CODE_SUCCESS, or the error returned by tsdbReadBlockSma
 */
int32_t tsdbRetrieveDatablockSMA(STsdbReader* pReader, SColumnDataAgg*** pBlockStatis, bool* allHave) {
  int32_t code = 0;
  *allHave = false;

  if (pReader->type == TIMEWINDOW_RANGE_EXTERNAL) {
    *pBlockStatis = NULL;
    return TSDB_CODE_SUCCESS;
  }

  // there is no statistics data for composed block
  if (pReader->status.composedDataBlock) {
    *pBlockStatis = NULL;
    return TSDB_CODE_SUCCESS;
  }

  SFileDataBlockInfo* pFBlock = getCurrentBlockInfo(&pReader->status.blockIter);
  SBlock*             pBlock = getCurrentBlock(&pReader->status.blockIter);
  int64_t             stime = taosGetTimestampUs();

  SBlockLoadSuppInfo* pSup = &pReader->suppInfo;

  if (!tBlockHasSma(pBlock)) {
    *pBlockStatis = NULL;
    return TSDB_CODE_SUCCESS;
  }

  code = tsdbReadBlockSma(pReader->pFileReader, pBlock, pSup->pColAgg);
  if (code != TSDB_CODE_SUCCESS) {
    tsdbError("vgId:%d, failed to load block SMA for uid %" PRIu64 ", code:%s, %s", TD_VID(pReader->pTsdb->pVnode),
              pFBlock->uid, tstrerror(code), pReader->idStr);
    return code;
  }

  *allHave = true;

  // always load the first primary timestamp column data
  SColumnDataAgg* pTsAgg = &pSup->tsColAgg;

  pTsAgg->numOfNull = 0;
  pTsAgg->colId = PRIMARYKEY_TIMESTAMP_COL_ID;
  pTsAgg->min = pReader->pResBlock->info.window.skey;
  pTsAgg->max = pReader->pResBlock->info.window.ekey;
  pSup->plist[0] = pTsAgg;

  // merge-join the loaded aggregates with the result block's column ids; both are walked by ascending colId.
  // NOTE(review): plist slots are written only for matched columns; the others keep what an earlier call stored.
  size_t numOfCols = blockDataGetNumOfCols(pReader->pResBlock);
  size_t numOfAggs = taosArrayGetSize(pSup->pColAgg);

  int32_t i = 0, j = 0;
  while (j < numOfCols && i < numOfAggs) {
    SColumnDataAgg* pAgg = taosArrayGet(pSup->pColAgg, i);
    if (pAgg->colId == pSup->colIds[j]) {
      // NOTE(review): schema columns are indexed by the pColAgg position i; this matches the column only if
      // pColAgg lists every schema column in schema order — confirm.
      if (IS_BSMA_ON(&(pReader->pSchema->columns[i]))) {
        pSup->plist[j] = pAgg;
      } else {
        *allHave = false;
      }
      i += 1;
      j += 1;
    } else if (pAgg->colId < pSup->colIds[j]) {
      i += 1;
    } else {
      j += 1;
    }
  }

  double elapsed = (taosGetTimestampUs() - stime) / 1000.0;
  pReader->cost.smaLoadTime += elapsed;
  pReader->cost.smaData += 1;

  *pBlockStatis = pSup->plist;

  tsdbDebug("vgId:%d, succeed to load block SMA for uid %" PRIu64 ", elapsed time:%.2f ms, %s",
            TD_VID(pReader->pTsdb->pVnode), pFBlock->uid, elapsed, pReader->idStr);

  return code;
}

3353
/**
 * Materialize the current block's column data in pReader->pResBlock and return its column array.
 *
 * A composed block is already materialized and is returned as-is. A file block is loaded into
 * status.fileBlockData and then copied into the result block.
 *
 * @return pReader->pResBlock->pDataBlock, or NULL with terrno set on failure
 */
static SArray* doRetrieveDataBlock(STsdbReader* pReader) {
  SReaderStatus* pStatus = &pReader->status;

  if (pStatus->composedDataBlock) {
    return pReader->pResBlock->pDataBlock;
  }

  SFileDataBlockInfo*  pFBlock = getCurrentBlockInfo(&pStatus->blockIter);
  STableBlockScanInfo* pBlockScanInfo = taosHashGet(pStatus->pTableMap, &pFBlock->uid, sizeof(pFBlock->uid));
  if (pBlockScanInfo == NULL) {
    // the block belongs to a table the reader does not track; dereferencing the lookup result would crash
    tsdbError("%p failed to find table uid:%" PRIu64 " in table map, %s", pReader, pFBlock->uid, pReader->idStr);
    terrno = TSDB_CODE_TDB_INVALID_TABLE_ID;
    return NULL;
  }

  tBlockDataReset(&pStatus->fileBlockData);
  int32_t code = tBlockDataInit(&pStatus->fileBlockData, pReader->suid, pBlockScanInfo->uid, pReader->pSchema);
  if (code != TSDB_CODE_SUCCESS) {
    terrno = code;
    return NULL;
  }

  code = doLoadFileBlockData(pReader, &pStatus->blockIter, &pStatus->fileBlockData);
  if (code != TSDB_CODE_SUCCESS) {
    tBlockDataDestroy(&pStatus->fileBlockData, 1);
    terrno = code;
    return NULL;
  }

  copyBlockDataToSDataBlock(pReader, pBlockScanInfo);
  return pReader->pResBlock->pDataBlock;
}

3381 3382 3383 3384 3385 3386 3387 3388 3389 3390 3391 3392
/**
 * Return the column data of the block produced by the last tsdbNextDataBlock call.
 *
 * For external-range queries the "prev" and "next" blocks are read from innerReader[0] and innerReader[1]
 * respectively; every other case reads from the main reader. `pIdList` is not used.
 */
SArray* tsdbRetrieveDataBlock(STsdbReader* pReader, SArray* pIdList) {
  STsdbReader* pTarget = pReader;

  if (pReader->type == TIMEWINDOW_RANGE_EXTERNAL) {
    if (pReader->step == EXTERNAL_ROWS_PREV) {
      pTarget = pReader->innerReader[0];
    } else if (pReader->step == EXTERNAL_ROWS_NEXT) {
      pTarget = pReader->innerReader[1];
    }
  }

  return doRetrieveDataBlock(pTarget);
}

H
Haojun Liao 已提交
3393
/**
 * Rewind a reader so that it scans again with `pCond`'s order and time window.
 *
 * The reader is switched to TIMEWINDOW_RANGE_CONTAINED, its SMA scratch state and data file reader are
 * reset, and the file-set / block iterators are rebuilt from the reader's existing read snapshot.
 *
 * @return TSDB_CODE_SUCCESS, or the error returned by initForFirstBlockInFile
 */
int32_t tsdbReaderReset(STsdbReader* pReader, SQueryTableDataCond* pCond) {
  // NOTE(review): this tests the window of the previous scan, before pCond's window is applied below
  if (isEmptyQueryTimeWindow(&pReader->window)) {
    return TSDB_CODE_SUCCESS;
  }

  pReader->order = pCond->order;
  pReader->type = TIMEWINDOW_RANGE_CONTAINED;
  pReader->status.loadFromFile = true;
  pReader->status.pTableIter = NULL;
  pReader->window = updateQueryTimeWindow(pReader->pTsdb, &pCond->twindows);

  // reset the SMA scratch state: the timestamp aggregate and the first plist slot
  memset(&pReader->suppInfo.tsColAgg, 0, sizeof(SColumnDataAgg));
  memset(pReader->suppInfo.plist, 0, POINTER_BYTES);

  pReader->suppInfo.tsColAgg.colId = PRIMARYKEY_TIMESTAMP_COL_ID;

  // release the data file reader of the previous scan (a single close is sufficient)
  if (pReader->pFileReader != NULL) {
    tsdbDataFReaderClose(&pReader->pFileReader);
  }

  int32_t numOfTables = taosHashGetSize(pReader->status.pTableMap);

  initFilesetIterator(&pReader->status.fileIter, pReader->pReadSnap->fs.aDFileSet, pReader->order, pReader->idStr);
  resetDataBlockIterator(&pReader->status.blockIter, pReader->order, pReader->status.pTableMap);
  resetDataBlockScanInfo(pReader->status.pTableMap);

  int32_t         code = 0;
  SDataBlockIter* pBlockIter = &pReader->status.blockIter;

  // no data in files, let's try buffer in memory
  if (pReader->status.fileIter.numOfFiles == 0) {
    pReader->status.loadFromFile = false;
  } else {
    code = initForFirstBlockInFile(pReader, pBlockIter);
    if (code != TSDB_CODE_SUCCESS) {
      tsdbError("%p reset reader failed, numOfTables:%d, query range:%" PRId64 " - %" PRId64 " in query %s", pReader,
                numOfTables, pReader->window.skey, pReader->window.ekey, pReader->idStr);
      return code;
    }
  }

  tsdbDebug("%p reset reader, suid:%" PRIu64 ", numOfTables:%d, query range:%" PRId64 " - %" PRId64 " in query %s",
            pReader, pReader->suid, numOfTables, pReader->window.skey, pReader->window.ekey, pReader->idStr);

  return code;
}
H
Hongze Cheng 已提交
3438

3439 3440 3441
/**
 * Map a block's row count to a row-count histogram bucket.
 *
 * @param startRow    row count at which bucket 0 starts
 * @param bucketRange width of each bucket, in rows
 * @param numOfRows   row count of the block being classified
 * @return the bucket index. Returns 0 when numOfRows is below startRow or bucketRange is not positive.
 *         The upper end is not clamped here; callers must bound it to their histogram size.
 */
static int32_t getBucketIndex(int32_t startRow, int32_t bucketRange, int32_t numOfRows) {
  // rows below startRow would give a negative index; a non-positive range would divide by zero
  if (bucketRange <= 0 || numOfRows < startRow) {
    return 0;
  }
  return (numOfRows - startRow) / bucketRange;
}
H
Hongze Cheng 已提交
3442

3443 3444 3445 3446
/**
 * Gather block distribution statistics by walking every data block of every file set.
 *
 * The statistics are row counts, block counts, min/max rows per block, the small-block count and a
 * row-count histogram.
 *
 * @param pReader         the reader; its block iterator is advanced until the files are exhausted
 * @param pTableBlockInfo accumulator; totalSize and totalRows are reset here, other counters are added to
 * @return TSDB_CODE_SUCCESS, or the error returned by initForFirstBlockInFile
 */
int32_t tsdbGetFileBlocksDistInfo(STsdbReader* pReader, STableBlockDistInfo* pTableBlockInfo) {
  // number of row-count histogram buckets
  // NOTE(review): assumes pTableBlockInfo->blockRowsHisto has at least this many entries — confirm.
  const int32_t numOfBuckets = 20;

  int32_t code = TSDB_CODE_SUCCESS;
  pTableBlockInfo->totalSize = 0;
  pTableBlockInfo->totalRows = 0;

  // find the start data block in file
  SReaderStatus* pStatus = &pReader->status;

  STsdbCfg* pc = &pReader->pTsdb->pVnode->config.tsdbCfg;
  pTableBlockInfo->defMinRows = pc->minRows;
  pTableBlockInfo->defMaxRows = pc->maxRows;

  int32_t bucketRange = ceil((pc->maxRows - pc->minRows) / (double)numOfBuckets);
  if (bucketRange < 1) {
    // maxRows == minRows: keep the bucket width positive so the index computation never divides by zero
    bucketRange = 1;
  }

  pTableBlockInfo->numOfFiles += 1;

  int32_t numOfTables = (int32_t)taosHashGetSize(pStatus->pTableMap);
  int     defaultRows = 4096;

  SDataBlockIter* pBlockIter = &pStatus->blockIter;
  pTableBlockInfo->numOfFiles += pStatus->fileIter.numOfFiles;

  if (pBlockIter->numOfBlocks > 0) {
    pTableBlockInfo->numOfBlocks += pBlockIter->numOfBlocks;
  }

  pTableBlockInfo->numOfTables = numOfTables;
  bool hasNext = (pBlockIter->numOfBlocks > 0);

  while (true) {
    if (hasNext) {
      SBlock* pBlock = getCurrentBlock(pBlockIter);

      int32_t numOfRows = pBlock->nRow;
      pTableBlockInfo->totalRows += numOfRows;

      if (numOfRows > pTableBlockInfo->maxRows) {
        pTableBlockInfo->maxRows = numOfRows;
      }

      if (numOfRows < pTableBlockInfo->minRows) {
        pTableBlockInfo->minRows = numOfRows;
      }

      if (numOfRows < defaultRows) {
        pTableBlockInfo->numOfSmallBlocks += 1;
      }

      // keep the index inside the histogram: blocks below minRows go to the first bucket, and a full block can
      // land one past the last bucket when (maxRows - minRows) is a multiple of the bucket width
      int32_t bucketIndex = getBucketIndex(pTableBlockInfo->defMinRows, bucketRange, numOfRows);
      if (bucketIndex < 0) {
        bucketIndex = 0;
      } else if (bucketIndex >= numOfBuckets) {
        bucketIndex = numOfBuckets - 1;
      }
      pTableBlockInfo->blockRowsHisto[bucketIndex]++;

      hasNext = blockIteratorNext(&pStatus->blockIter);
    } else {
      code = initForFirstBlockInFile(pReader, pBlockIter);
      if ((code != TSDB_CODE_SUCCESS) || (pReader->status.loadFromFile == false)) {
        break;
      }

      pTableBlockInfo->numOfBlocks += pBlockIter->numOfBlocks;
      hasNext = (pBlockIter->numOfBlocks > 0);
    }
  }

  return code;
}
H
Hongze Cheng 已提交
3511

H
refact  
Hongze Cheng 已提交
3512
/**
 * Count the rows held in the read snapshot's memtable and immutable memtable for every table in the reader's
 * table map.
 *
 * @param pReader the reader; status.pTableIter is used as the iteration cursor and is NULL on return
 * @return total number of in-memory rows
 */
int64_t tsdbGetNumOfRowsInMemTable(STsdbReader* pReader) {
  int64_t rows = 0;

  SReaderStatus* pStatus = &pReader->status;
  pStatus->pTableIter = taosHashIterate(pStatus->pTableMap, NULL);

  while (pStatus->pTableIter != NULL) {
    STableBlockScanInfo* pBlockScanInfo = pStatus->pTableIter;

    // test the snapshot's memtables, since those are what is queried. The live pTsdb->mem / pTsdb->imem may
    // differ from the snapshot's (in either direction), which could hand a NULL memtable on or skip rows.
    if (pReader->pReadSnap != NULL && pReader->pReadSnap->pMem != NULL) {
      STbData* d = tsdbGetTbDataFromMemTable(pReader->pReadSnap->pMem, pReader->suid, pBlockScanInfo->uid);
      if (d != NULL) {
        rows += tsdbGetNRowsInTbData(d);
      }
    }

    if (pReader->pReadSnap != NULL && pReader->pReadSnap->pIMem != NULL) {
      STbData* di = tsdbGetTbDataFromMemTable(pReader->pReadSnap->pIMem, pReader->suid, pBlockScanInfo->uid);
      if (di != NULL) {
        rows += tsdbGetNRowsInTbData(di);
      }
    }

    // current table is exhausted, let's try the next table
    pStatus->pTableIter = taosHashIterate(pStatus->pTableMap, pStatus->pTableIter);
  }

  return rows;
}
D
dapan1121 已提交
3543

L
Liu Jicong 已提交
3544
/**
 * Load the schema of table `uid`.
 *
 * For a child table the schema version is taken from its super table, and the super table uid is written
 * to *suid. For a normal table *suid is set to 0 and the table's own schema version is used.
 *
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_TDB_INVALID_TABLE_ID (also stored in terrno) when a lookup fails
 */
int32_t tsdbGetTableSchema(SVnode* pVnode, int64_t uid, STSchema** pSchema, int64_t* suid) {
  int32_t     schemaVer = 1;
  SMetaReader reader = {0};

  metaReaderInit(&reader, pVnode->pMeta, 0);
  if (metaGetTableEntryByUid(&reader, uid) != TSDB_CODE_SUCCESS) {
    goto _invalid_table;
  }

  *suid = 0;

  if (reader.me.type != TSDB_CHILD_TABLE) {
    ASSERT(reader.me.type == TSDB_NORMAL_TABLE);
    schemaVer = reader.me.ntbEntry.schemaRow.version;
  } else {
    // a child table carries no schema itself: re-read the entry of its super table
    tDecoderClear(&reader.coder);
    *suid = reader.me.ctbEntry.suid;
    if (metaGetTableEntryByUid(&reader, *suid) != TSDB_CODE_SUCCESS) {
      goto _invalid_table;
    }
    schemaVer = reader.me.stbEntry.schemaRow.version;
  }

  metaReaderClear(&reader);
  *pSchema = metaGetTbTSchema(pVnode->pMeta, uid, schemaVer);
  return TSDB_CODE_SUCCESS;

_invalid_table:
  terrno = TSDB_CODE_TDB_INVALID_TABLE_ID;
  metaReaderClear(&reader);
  return terrno;
}
H
Hongze Cheng 已提交
3578 3579 3580 3581 3582 3583 3584 3585 3586 3587 3588 3589 3590 3591 3592 3593 3594 3595 3596 3597 3598 3599 3600 3601 3602 3603 3604 3605 3606 3607

/**
 * Take a read snapshot of `pTsdb`.
 *
 * Under the tsdb read lock, a reference is taken on the current memtable, the immutable memtable and the
 * file-system state.
 *
 * @param pTsdb  the tsdb to snapshot
 * @param ppSnap receives the snapshot (release it with tsdbUntakeReadSnap); set to NULL on any failure,
 *               in which case nothing remains referenced or allocated
 * @return 0 on success, or an error code
 */
int32_t tsdbTakeReadSnap(STsdb* pTsdb, STsdbReadSnap** ppSnap) {
  int32_t code = 0;
  *ppSnap = NULL;

  // alloc
  STsdbReadSnap* pSnap = (STsdbReadSnap*)taosMemoryCalloc(1, sizeof(STsdbReadSnap));
  if (pSnap == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _exit;
  }

  // lock
  code = taosThreadRwlockRdlock(&pTsdb->rwLock);
  if (code) {
    code = TAOS_SYSTEM_ERROR(code);
    taosMemoryFree(pSnap);
    goto _exit;
  }

  // take snapshot
  pSnap->pMem = pTsdb->mem;
  pSnap->pIMem = pTsdb->imem;

  if (pSnap->pMem) {
    tsdbRefMemTable(pSnap->pMem);
  }

  if (pSnap->pIMem) {
    tsdbRefMemTable(pSnap->pIMem);
  }

  // fs
  code = tsdbFSRef(pTsdb, &pSnap->fs);
  if (code) {
    taosThreadRwlockUnlock(&pTsdb->rwLock);

    // drop the memtable references taken above (outside the lock, as tsdbUntakeReadSnap does)
    if (pSnap->pMem) {
      tsdbUnrefMemTable(pSnap->pMem);
    }
    if (pSnap->pIMem) {
      tsdbUnrefMemTable(pSnap->pIMem);
    }
    taosMemoryFree(pSnap);
    goto _exit;
  }

  // unlock
  code = taosThreadRwlockUnlock(&pTsdb->rwLock);
  if (code) {
    code = TAOS_SYSTEM_ERROR(code);
    // the snapshot is complete at this point; release every reference it holds
    tsdbUntakeReadSnap(pTsdb, pSnap);
    goto _exit;
  }

  *ppSnap = pSnap;
  tsdbTrace("vgId:%d, take read snapshot", TD_VID(pTsdb->pVnode));

_exit:
  return code;
}

/**
 * Release a snapshot obtained from tsdbTakeReadSnap.
 *
 * The memtable and immutable memtable references are dropped first, then the file-system reference, and
 * finally the snapshot holder is freed. A NULL snapshot is accepted; the trace line is emitted either way.
 */
void tsdbUntakeReadSnap(STsdb* pTsdb, STsdbReadSnap* pSnap) {
  if (pSnap != NULL) {
    if (pSnap->pMem != NULL) tsdbUnrefMemTable(pSnap->pMem);
    if (pSnap->pIMem != NULL) tsdbUnrefMemTable(pSnap->pIMem);

    tsdbFSUnref(pTsdb, &pSnap->fs);
    taosMemoryFree(pSnap);
  }

  tsdbTrace("vgId:%d, untake read snapshot", TD_VID(pTsdb->pVnode));
}