tsdbRead.c 100.5 KB
Newer Older
H
hjxilinx 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

H
Hongze Cheng 已提交
16
#include "tsdb.h"
17
// Evaluates to non-zero when the traversal order `o` is ascending (equal to TSDB_ORDER_ASC).
// The parameter is parenthesized so that composite arguments, e.g. `ASCENDING_TRAVERSE(x ? a : b)`,
// are compared as a whole instead of binding to `==` first.
#define ASCENDING_TRAVERSE(o) ((o) == TSDB_ORDER_ASC)
H
Hongze Cheng 已提交
18

19 20 21 22 23 24
/*
 * Tags for "external rows" content. Judging by the names, these are the rows before the main range,
 * the main range itself, and the rows after it.
 * NOTE(review): the values are 1, 2, 3. If these are ever OR-ed together as bit flags, NEXT (3)
 * collides with PREV | MAIN — confirm they are only used as plain enumerators.
 */
typedef enum {
  EXTERNAL_ROWS_PREV = 1,
  EXTERNAL_ROWS_MAIN = 2,
  EXTERNAL_ROWS_NEXT = 3
} EContentData;

25
typedef struct {
dengyihao's avatar
dengyihao 已提交
26
  STbDataIter* iter;
27 28 29 30
  int32_t      index;
  bool         hasVal;
} SIterInfo;

H
Haojun Liao 已提交
31
typedef struct STableBlockScanInfo {
dengyihao's avatar
dengyihao 已提交
32 33
  uint64_t  uid;
  TSKEY     lastKey;
34
  SMapData  mapData;     // block info (compressed)
dengyihao's avatar
dengyihao 已提交
35 36 37 38 39 40
  SArray*   pBlockList;  // block data index list
  SIterInfo iter;        // mem buffer skip list iterator
  SIterInfo iiter;       // imem buffer skip list iterator
  SArray*   delSkyline;  // delete info for this table
  int32_t   fileDelIndex;
  bool      iterInit;  // whether to initialize the in-memory skip list iterator or not
H
Haojun Liao 已提交
41 42 43
} STableBlockScanInfo;

/* Pairs a table uid with an offset. By its name, it is used when ordering data blocks across tables. */
typedef struct SBlockOrderWrapper {
  int64_t uid;    /* table uid */
  int64_t offset;
} SBlockOrderWrapper;
H
Hongze Cheng 已提交
47 48

typedef struct SBlockOrderSupporter {
49 50 51 52
  SBlockOrderWrapper** pDataBlockInfo;
  int32_t*             indexPerTable;
  int32_t*             numOfBlocksPerTable;
  int32_t              numOfTables;
H
Hongze Cheng 已提交
53 54 55
} SBlockOrderSupporter;

/*
 * I/O cost counters accumulated by a reader. blockLoadTime and headFileLoadTime are accumulated
 * in milliseconds (microsecond deltas divided by 1000.0).
 */
typedef struct SIOCostSummary {
  int64_t numOfBlocks;       /* data blocks selected for scanning */
  double  blockLoadTime;
  double  buildmemBlock;
  int64_t headFileLoad;      /* number of data file readers opened */
  double  headFileLoadTime;
  int64_t smaData;
  double  smaLoadTime;
} SIOCostSummary;

typedef struct SBlockLoadSuppInfo {
66
  SArray*          pColAgg;
67
  SColumnDataAgg   tsColAgg;
C
Cary Xu 已提交
68
  SColumnDataAgg** plist;
69 70
  int16_t*         colIds;    // column ids for loading file block data
  char**           buildBuf;  // build string tmp buffer, todo remove it later after all string format being updated.
H
Hongze Cheng 已提交
71 72
} SBlockLoadSuppInfo;

73
typedef struct SFilesetIter {
H
Hongze Cheng 已提交
74 75 76 77
  int32_t numOfFiles;  // number of total files
  int32_t index;       // current accessed index in the list
  SArray* pFileList;   // data file list
  int32_t order;
78
} SFilesetIter;
H
Haojun Liao 已提交
79 80

/* Identifies one data block in the current file set. */
typedef struct SFileDataBlockInfo {
  uint64_t uid;         /* owning table uid */
  /* Index position in STableBlockScanInfo, in order to check whether a neighbor block overlaps with it. */
  int32_t  tbBlockIdx;
} SFileDataBlockInfo;

typedef struct SDataBlockIter {
dengyihao's avatar
dengyihao 已提交
87 88 89 90
  int32_t numOfBlocks;
  int32_t index;
  SArray* blockList;  // SArray<SFileDataBlockInfo>
  int32_t order;
91 92
  SBlock  block;      // current SBlock data
  SHashObj* pTableMap;
H
Haojun Liao 已提交
93 94 95
} SDataBlockIter;

/* Progress of copying the currently loaded file block into the result block. */
typedef struct SFileBlockDumpInfo {
  int32_t totalRows;
  int32_t rowIndex;   /* next row of the loaded block data to copy */
  int64_t lastKey;
  bool    allDumped;  /* true once the whole block has been handed out */
} SFileBlockDumpInfo;

H
Haojun Liao 已提交
102
/* Inclusive data version range a query reads; blocks entirely outside it are skipped. */
typedef struct SVersionRange {
  uint64_t minVer;
  uint64_t maxVer;
} SVersionRange;

H
Haojun Liao 已提交
107
typedef struct SReaderStatus {
dengyihao's avatar
dengyihao 已提交
108 109
  bool                 loadFromFile;  // check file stage
  SHashObj*            pTableMap;     // SHash<STableBlockScanInfo>
110
  STableBlockScanInfo* pTableIter;    // table iterator used in building in-memory buffer data blocks.
111
  SFileBlockDumpInfo   fBlockDumpInfo;
112 113 114 115 116
  SDFileSet*           pCurrentFileset;  // current opened file set
  SBlockData           fileBlockData;
  SFilesetIter         fileIter;
  SDataBlockIter       blockIter;
  bool                 composedDataBlock;  // the returned data block is a composed block or not
H
Haojun Liao 已提交
117 118
} SReaderStatus;

H
Hongze Cheng 已提交
119
struct STsdbReader {
H
Haojun Liao 已提交
120 121 122 123 124 125 126
  STsdb*             pTsdb;
  uint64_t           suid;
  int16_t            order;
  STimeWindow        window;  // the primary query time window that applies to all queries
  SSDataBlock*       pResBlock;
  int32_t            capacity;
  SReaderStatus      status;
127 128
  char*              idStr;   // query info handle, for debug purpose
  int32_t            type;    // query type: 1. retrieve all data blocks, 2. retrieve direct prev|next rows
H
Hongze Cheng 已提交
129
  SBlockLoadSuppInfo suppInfo;
H
Hongze Cheng 已提交
130
  STsdbReadSnap*     pReadSnap;
131 132 133 134
  SIOCostSummary     cost;
  STSchema*          pSchema;
  SDataFReader*      pFileReader;
  SVersionRange      verRange;
135

136 137
  int32_t            step;
  STsdbReader*       innerReader[2];
H
Hongze Cheng 已提交
138
};
H
Hongze Cheng 已提交
139

H
Haojun Liao 已提交
140
static SFileDataBlockInfo* getCurrentBlockInfo(SDataBlockIter* pBlockIter);
141 142
static int      buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, int64_t endKey, int32_t capacity,
                                          STsdbReader* pReader);
143
static TSDBROW* getValidRow(SIterInfo* pIter, const SArray* pDelList, STsdbReader* pReader);
144 145
static int32_t  doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pScanInfo, STsdbReader* pReader,
                                        SRowMerger* pMerger);
dengyihao's avatar
dengyihao 已提交
146 147
static int32_t  doMergeRowsInBuf(SIterInfo* pIter, int64_t ts, SArray* pDelList, SRowMerger* pMerger,
                                 STsdbReader* pReader);
148 149 150
static int32_t  doAppendOneRow(SSDataBlock* pBlock, STsdbReader* pReader, STSRow* pTSRow);
static void     setComposedBlockFlag(STsdbReader* pReader, bool composed);
static void     updateSchema(TSDBROW* pRow, uint64_t uid, STsdbReader* pReader);
151
static bool     hasBeenDropped(const SArray* pDelList, int32_t* index, TSDBKEY* pKey, int32_t order);
152

dengyihao's avatar
dengyihao 已提交
153 154
static void doMergeMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter, SArray* pDelList, STSRow** pTSRow,
                             STsdbReader* pReader);
155 156
static void doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader,
                               STSRow** pTSRow);
dengyihao's avatar
dengyihao 已提交
157 158 159 160
static int32_t initDelSkylineIterator(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader, STbData* pMemTbData,
                                      STbData* piMemTbData);
static STsdb*  getTsdbByRetentions(SVnode* pVnode, TSKEY winSKey, SRetention* retentions, const char* idstr,
                                   int8_t* pLevel);
161
static SVersionRange getQueryVerRange(SVnode* pVnode, SQueryTableDataCond* pCond, int8_t level);
H
Haojun Liao 已提交
162

163 164 165
/*
 * Record, for every column of the result block pBlock, its column id (pReader->suppInfo.colIds) and,
 * for var-data columns, a temporary build buffer of pCol->info.bytes bytes (suppInfo.buildBuf).
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY when any allocation fails.
 */
static int32_t setColumnIdSlotList(STsdbReader* pReader, SSDataBlock* pBlock) {
  SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo;
  size_t              numOfCols = blockDataGetNumOfCols(pBlock);

  pSupInfo->colIds = taosMemoryMalloc(numOfCols * sizeof(int16_t));
  pSupInfo->buildBuf = taosMemoryCalloc(numOfCols, POINTER_BYTES);
  if (pSupInfo->buildBuf == NULL || pSupInfo->colIds == NULL) {
    taosMemoryFree(pSupInfo->colIds);
    taosMemoryFree(pSupInfo->buildBuf);
    // reset the fields so that a later cleanup of the reader cannot free them a second time
    pSupInfo->colIds = NULL;
    pSupInfo->buildBuf = NULL;
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  for (int32_t i = 0; i < numOfCols; ++i) {
    SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, i);
    pSupInfo->colIds[i] = pCol->info.colId;

    if (IS_VAR_DATA_TYPE(pCol->info.type)) {
      pSupInfo->buildBuf[i] = taosMemoryMalloc(pCol->info.bytes);
      if (pSupInfo->buildBuf[i] == NULL) {
        // buffers allocated so far stay attached to pSupInfo.
        // NOTE(review): presumably released when the reader is closed — confirm in tsdbReaderClose.
        return TSDB_CODE_OUT_OF_MEMORY;
      }
    }
  }

  return TSDB_CODE_SUCCESS;
}
H
Hongze Cheng 已提交
187

188
/*
 * Build the per-table scan-info map (uid -> STableBlockScanInfo) for the numOfTables tables in idList.
 * Each entry starts scanning at the query window's start key.
 * Returns NULL if the map cannot be created or filled.
 */
static SHashObj* createDataBlockScanInfo(STsdbReader* pTsdbReader, const STableKeyInfo* idList, int32_t numOfTables) {
  // allocate buffer in order to load data blocks from file
  // todo use simple hash instead, optimize the memory consumption
  SHashObj* pTableMap =
      taosHashInit(numOfTables, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
  if (pTableMap == NULL) {
    return NULL;
  }

  for (int32_t j = 0; j < numOfTables; ++j) {
    STableBlockScanInfo info = {.lastKey = 0, .uid = idList[j].uid};

    // Start from the window's start key. The previous code only did so when skey > 0 (lastKey was
    // initialised to 0), which skipped keys in [skey, 0) and violated the assertion below for
    // windows lying entirely before 0.
    info.lastKey = pTsdbReader->window.skey;
    if (ASCENDING_TRAVERSE(pTsdbReader->order)) {
      ASSERT(info.lastKey >= pTsdbReader->window.skey && info.lastKey <= pTsdbReader->window.ekey);
    }
    // NOTE(review): descending scans also start from window.skey here (unchanged behaviour); confirm this
    // is intended rather than window.ekey.

    if (taosHashPut(pTableMap, &info.uid, sizeof(uint64_t), &info, sizeof(info)) != 0) {
      tsdbError("%p failed to add table uid:%" PRIu64 " into scan-info map, %s", pTsdbReader, info.uid,
                pTsdbReader->idStr);
      // no entry owns any allocated memory yet, so releasing the map itself is sufficient
      taosHashCleanup(pTableMap);
      return NULL;
    }

    tsdbDebug("%p check table uid:%" PRIu64 " from lastKey:%" PRId64 " %s", pTsdbReader, info.uid, info.lastKey,
              pTsdbReader->idStr);
  }

  tsdbDebug("%p create %d tables scan-info, size:%.2f Kb, %s", pTsdbReader, numOfTables,
            (sizeof(STableBlockScanInfo) * numOfTables) / 1024.0, pTsdbReader->idStr);

  return pTableMap;
}
H
Hongze Cheng 已提交
219

220 221 222
/*
 * Reset every per-table scan info in pTableMap so that the in-memory scan can start over.
 * Both the mem and imem iterators are destroyed (previously only the mem one was, leaking the imem
 * iterator), and the delete skyline is dropped. Block list and block map data are kept.
 */
static void resetDataBlockScanInfo(SHashObj* pTableMap) {
  STableBlockScanInfo* p = NULL;

  while ((p = taosHashIterate(pTableMap, p)) != NULL) {
    p->iterInit = false;
    p->iter.hasVal = false;
    p->iiter.hasVal = false;

    if (p->iter.iter != NULL) {
      p->iter.iter = tsdbTbDataIterDestroy(p->iter.iter);
    }

    if (p->iiter.iter != NULL) {
      p->iiter.iter = tsdbTbDataIterDestroy(p->iiter.iter);
    }

    p->delSkyline = taosArrayDestroy(p->delSkyline);
  }
}

234 235 236 237 238 239 240 241
/*
 * Release all resources held by each per-table scan info in pTableMap, then the map itself:
 * - the mem and imem iterators
 * - the delete skyline
 * - the block list
 * - the block map data
 */
static void destroyBlockScanInfo(SHashObj* pTableMap) {
  for (STableBlockScanInfo* pInfo = taosHashIterate(pTableMap, NULL); pInfo != NULL;
       pInfo = taosHashIterate(pTableMap, pInfo)) {
    pInfo->iterInit = false;
    pInfo->iiter.hasVal = false;

    SIterInfo* pMemIter = &pInfo->iter;
    if (pMemIter->iter != NULL) {
      pMemIter->iter = tsdbTbDataIterDestroy(pMemIter->iter);
    }

    SIterInfo* pIMemIter = &pInfo->iiter;
    if (pIMemIter->iter != NULL) {
      pIMemIter->iter = tsdbTbDataIterDestroy(pIMemIter->iter);
    }

    pInfo->delSkyline = taosArrayDestroy(pInfo->delSkyline);
    pInfo->pBlockList = taosArrayDestroy(pInfo->pBlockList);
    tMapDataClear(&pInfo->mapData);
  }

  taosHashCleanup(pTableMap);
}

257
/* A time window is empty when its end key lies before its start key. pWindow must not be NULL. */
static bool isEmptyQueryTimeWindow(STimeWindow* pWindow) {
  ASSERT(pWindow != NULL);
  bool empty = (pWindow->ekey < pWindow->skey);
  return empty;
}
H
Hongze Cheng 已提交
261

262 263 264
/*
 * Return a copy of *pWindow whose start key is raised to the retention (TTL) horizon of pTsdb.
 * This ensures expired data is never returned to the client, even if it is still physically present.
 * The horizon is "now" minus keep2 ticks-per-minute units, i.e. keep2 is presumably in minutes,
 * plus one tick.
 */
static STimeWindow updateQueryTimeWindow(STsdb* pTsdb, STimeWindow* pWindow) {
  STsdbKeepCfg* pKeepCfg = &pTsdb->keepCfg;
  STimeWindow   adjusted = *pWindow;

  int64_t nowTs = taosGetTimestamp(pKeepCfg->precision);
  // the oldest key still retained; one tick is added so the boundary key itself is excluded
  int64_t earliestTs = nowTs - (tsTickPerMin[pKeepCfg->precision] * pKeepCfg->keep2) + 1;

  if (adjusted.skey < earliestTs) {
    adjusted.skey = earliestTs;
  }

  return adjusted;
}
H
Hongze Cheng 已提交
277

H
Haojun Liao 已提交
278
/*
 * Shrink *capacity (rows per output block) so that one output SSDataBlock stays within 2MB, given the
 * summed byte width of the queried columns in pCond.
 * The arithmetic is done in 64 bits: `capacity * rowLen` can exceed INT32_MAX for wide rows.
 * The result is never below one row. A single row wider than 2MB therefore still yields capacity 1
 * instead of 0.
 */
static void limitOutputBufferSize(const SQueryTableDataCond* pCond, int32_t* capacity) {
  int64_t rowLen = 0;
  for (int32_t i = 0; i < pCond->numOfCols; ++i) {
    rowLen += pCond->colList[i].bytes;
  }

  // make sure the output SSDataBlock size be less than 2MB.
  const int64_t TWOMB = 2 * 1024 * 1024;
  if (rowLen > 0 && (int64_t)(*capacity) * rowLen > TWOMB) {
    int64_t limited = TWOMB / rowLen;
    (*capacity) = (limited > 0) ? (int32_t)limited : 1;
  }
}

// init file iterator
H
Hongze Cheng 已提交
292 293
static int32_t initFilesetIterator(SFilesetIter* pIter, SArray* aDFileSet, int32_t order, const char* idstr) {
  size_t numOfFileset = taosArrayGetSize(aDFileSet);
294

295 296
  pIter->index = ASCENDING_TRAVERSE(order) ? -1 : numOfFileset;
  pIter->order = order;
H
Hongze Cheng 已提交
297
  pIter->pFileList = aDFileSet;
298
  pIter->numOfFiles = numOfFileset;
H
Haojun Liao 已提交
299

H
Haojun Liao 已提交
300
  tsdbDebug("init fileset iterator, total files:%d %s", pIter->numOfFiles, idstr);
H
Haojun Liao 已提交
301 302 303
  return TSDB_CODE_SUCCESS;
}

304
/*
 * Advance pIter, in pIter->order, to the next data file set whose key range overlaps the query window,
 * and open a data file reader on it.
 *
 * Returns true when such a file set is found. pReader->status.pCurrentFileset and pReader->pFileReader
 * then refer to it.
 * Returns false when:
 * - no file set is left;
 * - the remaining file sets lie beyond the query window;
 * - the reader cannot be opened (an error is logged).
 *
 * The key range of a file set is derived from its fid alone, so non-overlapping file sets are now
 * skipped without opening them. Previously every visited file set was opened first.
 */
static bool filesetIteratorNext(SFilesetIter* pIter, STsdbReader* pReader) {
  bool    asc = ASCENDING_TRAVERSE(pIter->order);
  int32_t step = asc ? 1 : -1;

  for (pIter->index += step; asc ? (pIter->index < pIter->numOfFiles) : (pIter->index >= 0); pIter->index += step) {
    // release the reader of the previously visited file set
    if (pReader->pFileReader != NULL) {
      tsdbDataFReaderClose(&pReader->pFileReader);
    }

    pReader->status.pCurrentFileset = (SDFileSet*)taosArrayGet(pIter->pFileList, pIter->index);

    // check the time range covered by this file set
    int32_t     fid = pReader->status.pCurrentFileset->fid;
    STimeWindow win = {0};
    tsdbFidKeyRange(fid, pReader->pTsdb->keepCfg.days, pReader->pTsdb->keepCfg.precision, &win.skey, &win.ekey);

    // current file are no longer overlapped with query time window, ignore remain files
    if ((asc && win.skey > pReader->window.ekey) || (!asc && win.ekey < pReader->window.skey)) {
      tsdbDebug("%p remain files are not qualified for qrange:%" PRId64 "-%" PRId64 ", ignore, %s", pReader,
                pReader->window.skey, pReader->window.ekey, pReader->idStr);
      return false;
    }

    // this file set lies entirely before the query window (in traversal order): skip it
    if ((asc && win.ekey < pReader->window.skey) || (!asc && win.skey > pReader->window.ekey)) {
      continue;
    }

    int32_t code = tsdbDataFReaderOpen(&pReader->pFileReader, pReader->pTsdb, pReader->status.pCurrentFileset);
    if (code != TSDB_CODE_SUCCESS) {
      tsdbError("%p failed to open data file reader, fid:%d, code:0x%x, %s", pReader, fid, code, pReader->idStr);
      return false;
    }

    pReader->cost.headFileLoad += 1;

    tsdbDebug("%p file found fid:%d for qrange:%" PRId64 "-%" PRId64 ", %s", pReader, fid, pReader->window.skey,
              pReader->window.ekey, pReader->idStr);
    return true;
  }

  return false;
}

357
/*
 * Rewind the data block iterator for a new file set:
 * - set the traversal order and table map;
 * - mark it as positioned before the first block;
 * - empty its block list, creating the list on first use.
 */
static void resetDataBlockIterator(SDataBlockIter* pIter, int32_t order, SHashObj* pTableMap) {
  pIter->order = order;
  pIter->index = -1;
  pIter->numOfBlocks = -1;
  pIter->pTableMap = pTableMap;

  if (pIter->blockList != NULL) {
    taosArrayClear(pIter->blockList);
  } else {
    pIter->blockList = taosArrayInit(4, sizeof(SFileDataBlockInfo));
  }
}

L
Liu Jicong 已提交
369
/*
 * Free the block list of the iterator. The field is reset to NULL (taosArrayDestroy returns NULL) so a
 * later resetDataBlockIterator() or repeated cleanup does not touch freed memory.
 */
static void cleanupDataBlockIterator(SDataBlockIter* pIter) { pIter->blockList = taosArrayDestroy(pIter->blockList); }
H
Haojun Liao 已提交
370

H
Haojun Liao 已提交
371
/* Put a freshly created reader status into the file-scanning stage with no in-memory table cursor yet. */
static void initReaderStatus(SReaderStatus* pStatus) {
  pStatus->loadFromFile = true;
  pStatus->pTableIter = NULL;
}

376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398
/*
 * Create the reader's output block with one column per queried column in pCond, and room for
 * `capacity` rows.
 * Returns NULL and sets terrno on failure.
 */
static SSDataBlock* createResBlock(SQueryTableDataCond* pCond, int32_t capacity) {
  SSDataBlock* pResBlock = createDataBlock();
  if (pResBlock == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  for (int32_t i = 0; i < pCond->numOfCols; ++i) {
    SColumnInfoData colInfo = {{0}, 0};
    colInfo.info = pCond->colList[i];
    blockDataAppendColInfo(pResBlock, &colInfo);
  }

  int32_t code = blockDataEnsureCapacity(pResBlock, capacity);
  if (code != TSDB_CODE_SUCCESS) {
    terrno = code;
    // Release the column array and any column buffers, not only the block header.
    // A plain free of the block struct leaked them.
    blockDataDestroy(pResBlock);
    return NULL;
  }

  return pResBlock;
}

399
/*
 * Allocate and initialize a reader over pVnode for the query described by pCond.
 *
 * On success, *ppReader receives the reader and TSDB_CODE_SUCCESS is returned.
 * On failure, *ppReader is set to NULL, anything allocated so far is released through
 * tsdbReaderClose(), and the error code is returned.
 */
static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, STsdbReader** ppReader, int32_t capacity,
                                const char* idstr) {
  int32_t code = TSDB_CODE_SUCCESS;
  int8_t  level = 0;

  STsdbReader* pReader = (STsdbReader*)taosMemoryCalloc(1, sizeof(*pReader));
  if (pReader == NULL) {
    // nothing has been allocated yet: do not hand a NULL reader to tsdbReaderClose()
    *ppReader = NULL;
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  initReaderStatus(&pReader->status);

  pReader->pTsdb = getTsdbByRetentions(pVnode, pCond->twindows.skey, pVnode->config.tsdbCfg.retentions, idstr, &level);
  pReader->suid = pCond->suid;
  pReader->order = pCond->order;
  pReader->capacity = capacity;
  pReader->idStr = (idstr != NULL) ? strdup(idstr) : NULL;
  if (idstr != NULL && pReader->idStr == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _end;
  }

  pReader->verRange = getQueryVerRange(pVnode, pCond, level);
  pReader->type = pCond->type;
  pReader->window = updateQueryTimeWindow(pReader->pTsdb, &pCond->twindows);

  ASSERT(pCond->numOfCols > 0);

  limitOutputBufferSize(pCond, &pReader->capacity);

  // allocate buffer in order to load data blocks from file
  SBlockLoadSuppInfo* pSup = &pReader->suppInfo;
  pSup->pColAgg = taosArrayInit(4, sizeof(SColumnDataAgg));
  pSup->plist = taosMemoryCalloc(pCond->numOfCols, POINTER_BYTES);
  if (pSup->pColAgg == NULL || pSup->plist == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _end;
  }

  pSup->tsColAgg.colId = PRIMARYKEY_TIMESTAMP_COL_ID;

  code = tBlockDataInit(&pReader->status.fileBlockData);
  if (code != TSDB_CODE_SUCCESS) {
    terrno = code;
    goto _end;
  }

  pReader->pResBlock = createResBlock(pCond, pReader->capacity);
  if (pReader->pResBlock == NULL) {
    code = terrno;
    goto _end;
  }

  // the column id list and string build buffers are required when copying file block data
  code = setColumnIdSlotList(pReader, pReader->pResBlock);
  if (code != TSDB_CODE_SUCCESS) {
    goto _end;
  }

  *ppReader = pReader;
  return code;

_end:
  tsdbReaderClose(pReader);
  *ppReader = NULL;
  return code;
}
H
Hongze Cheng 已提交
456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488

// void tsdbResetQueryHandleForNewTable(STsdbReader* queryHandle, SQueryTableDataCond* pCond, STableListInfo* tableList,
//                                      int32_t tWinIdx) {
//   STsdbReader* pTsdbReadHandle = queryHandle;

//   pTsdbReadHandle->order = pCond->order;
//   pTsdbReadHandle->window = pCond->twindows[tWinIdx];
//   pTsdbReadHandle->type = TSDB_QUERY_TYPE_ALL;
//   pTsdbReadHandle->cur.fid = -1;
//   pTsdbReadHandle->cur.win = TSWINDOW_INITIALIZER;
//   pTsdbReadHandle->checkFiles = true;
//   pTsdbReadHandle->activeIndex = 0;  // current active table index
//   pTsdbReadHandle->locateStart = false;
//   pTsdbReadHandle->loadExternalRow = pCond->loadExternalRows;

//   if (ASCENDING_TRAVERSE(pCond->order)) {
//     assert(pTsdbReadHandle->window.skey <= pTsdbReadHandle->window.ekey);
//   } else {
//     assert(pTsdbReadHandle->window.skey >= pTsdbReadHandle->window.ekey);
//   }

//   // allocate buffer in order to load data blocks from file
//   memset(pTsdbReadHandle->suppInfo.pstatis, 0, sizeof(SColumnDataAgg));
//   memset(pTsdbReadHandle->suppInfo.plist, 0, POINTER_BYTES);

//   tsdbInitDataBlockLoadInfo(&pTsdbReadHandle->dataBlockLoadInfo);
//   tsdbInitCompBlockLoadInfo(&pTsdbReadHandle->compBlockLoadInfo);

//   SArray* pTable = NULL;
//   //  STsdbMeta* pMeta = tsdbGetMeta(pTsdbReadHandle->pTsdb);

//   //  pTsdbReadHandle->pTableCheckInfo = destroyTableCheckInfo(pTsdbReadHandle->pTableCheckInfo);

H
Haojun Liao 已提交
489
//   pTsdbReadHandle->pTableCheckInfo = NULL;  // createDataBlockScanInfo(pTsdbReadHandle, groupList, pMeta,
H
Hongze Cheng 已提交
490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512
//                                             // &pTable);
//   if (pTsdbReadHandle->pTableCheckInfo == NULL) {
//     //    tsdbReaderClose(pTsdbReadHandle);
//     terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
//   }

//   //  pTsdbReadHandle->prev = doFreeColumnInfoData(pTsdbReadHandle->prev);
//   //  pTsdbReadHandle->next = doFreeColumnInfoData(pTsdbReadHandle->next);
// }

// SArray* tsdbGetQueriedTableList(STsdbReader** pHandle) {
//   assert(pHandle != NULL);

//   STsdbReader* pTsdbReadHandle = (STsdbReader*)pHandle;

//   size_t  size = taosArrayGetSize(pTsdbReadHandle->pTableCheckInfo);
//   SArray* res = taosArrayInit(size, POINTER_BYTES);
//   return res;
// }

// static int32_t binarySearchForBlock(SBlock* pBlock, int32_t numOfBlocks, TSKEY skey, int32_t order) {
//   int32_t firstSlot = 0;
//   int32_t lastSlot = numOfBlocks - 1;
H
Hongze Cheng 已提交
513

H
Hongze Cheng 已提交
514
//   int32_t midSlot = firstSlot;
H
Hongze Cheng 已提交
515

H
Hongze Cheng 已提交
516 517 518
//   while (1) {
//     numOfBlocks = lastSlot - firstSlot + 1;
//     midSlot = (firstSlot + (numOfBlocks >> 1));
H
Hongze Cheng 已提交
519

H
Hongze Cheng 已提交
520
//     if (numOfBlocks == 1) break;
H
Hongze Cheng 已提交
521

H
Hongze Cheng 已提交
522 523 524 525 526 527 528 529 530 531 532
//     if (skey > pBlock[midSlot].maxKey.ts) {
//       if (numOfBlocks == 2) break;
//       if ((order == TSDB_ORDER_DESC) && (skey < pBlock[midSlot + 1].minKey.ts)) break;
//       firstSlot = midSlot + 1;
//     } else if (skey < pBlock[midSlot].minKey.ts) {
//       if ((order == TSDB_ORDER_ASC) && (skey > pBlock[midSlot - 1].maxKey.ts)) break;
//       lastSlot = midSlot - 1;
//     } else {
//       break;  // got the slot
//     }
//   }
H
Hongze Cheng 已提交
533

H
Hongze Cheng 已提交
534 535
//   return midSlot;
// }
H
Hongze Cheng 已提交
536

H
Haojun Liao 已提交
537
/*
 * Read the block index (SBlockIdx entries) of the currently opened data file through pFileReader.
 * Entries that belong to a queried table — same suid as the reader and present in status.pTableMap —
 * are appended to pIndexList, and that table's pBlockList array is created if missing.
 * Returns TSDB_CODE_SUCCESS or an error code. The temporary index array is released on every path;
 * previously it leaked when the file had no index entries.
 */
static int32_t doLoadBlockIndex(STsdbReader* pReader, SDataFReader* pFileReader, SArray* pIndexList) {
  SArray* aBlockIdx = taosArrayInit(8, sizeof(SBlockIdx));
  if (aBlockIdx == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  int64_t st = taosGetTimestampUs();
  int32_t code = tsdbReadBlockIdx(pFileReader, aBlockIdx, NULL);
  if (code != TSDB_CODE_SUCCESS) {
    goto _end;
  }

  size_t num = taosArrayGetSize(aBlockIdx);
  if (num == 0) {
    goto _end;  // code is TSDB_CODE_SUCCESS here
  }

  int64_t et1 = taosGetTimestampUs();

  for (int32_t i = 0; i < num; ++i) {
    SBlockIdx* pBlockIdx = (SBlockIdx*)taosArrayGet(aBlockIdx, i);

    // uid check
    if (pBlockIdx->suid != pReader->suid) {
      continue;
    }

    // this block belongs to a table that is not queried.
    STableBlockScanInfo* pScanInfo = taosHashGet(pReader->status.pTableMap, &pBlockIdx->uid, sizeof(uint64_t));
    if (pScanInfo == NULL) {
      continue;
    }

    if (pScanInfo->pBlockList == NULL) {
      pScanInfo->pBlockList = taosArrayInit(4, sizeof(int32_t));
      if (pScanInfo->pBlockList == NULL) {
        code = TSDB_CODE_OUT_OF_MEMORY;
        goto _end;
      }
    }

    if (taosArrayPush(pIndexList, pBlockIdx) == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto _end;
    }
  }

  int64_t et2 = taosGetTimestampUs();
  tsdbDebug("load block index for %d tables completed, elapsed time:%.2f ms, set blockIdx:%.2f ms, size:%.2f Kb %s",
            (int32_t)num, (et1 - st) / 1000.0, (et2 - et1) / 1000.0, num * sizeof(SBlockIdx) / 1024.0,
            pReader->idStr);

  pReader->cost.headFileLoadTime += (et1 - st) / 1000.0;

_end:
  taosArrayDestroy(aBlockIdx);
  return code;
}
H
Hongze Cheng 已提交
587

588 589
/*
 * For every table in pIndexList (block index entries from doLoadBlockIndex()):
 * - load its block map into the table's scan info (mapData);
 * - record in pBlockList the positions of the blocks that overlap the query time window and version range.
 *
 * Outputs:
 * - *numOfValidTables is set to the number of tables with at least one such block.
 * - *numOfBlocks is increased by the number of selected blocks.
 *   NOTE(review): it is not zeroed here; presumably the caller initializes it.
 *
 * Returns TSDB_CODE_SUCCESS, the error from tsdbReadBlock(), or TSDB_CODE_OUT_OF_MEMORY.
 */
static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, uint32_t* numOfValidTables,
                               int32_t* numOfBlocks) {
  size_t numOfTables = taosArrayGetSize(pIndexList);
  *numOfValidTables = 0;

  int64_t st = taosGetTimestampUs();
  size_t  size = 0;

  // drop the block info loaded for the previous file set
  STableBlockScanInfo* px = NULL;
  while ((px = taosHashIterate(pReader->status.pTableMap, px)) != NULL) {
    tMapDataClear(&px->mapData);
    taosArrayClear(px->pBlockList);
  }

  for (int32_t i = 0; i < numOfTables; ++i) {
    SBlockIdx*           pBlockIdx = taosArrayGet(pIndexList, i);
    STableBlockScanInfo* pScanInfo = taosHashGet(pReader->status.pTableMap, &pBlockIdx->uid, sizeof(int64_t));

    tMapDataReset(&pScanInfo->mapData);
    int32_t code = tsdbReadBlock(pReader->pFileReader, pBlockIdx, &pScanInfo->mapData, NULL);
    if (code != TSDB_CODE_SUCCESS) {
      tsdbError("%p failed to load block info of table uid:%" PRIu64 ", code:0x%x %s", pReader,
                (uint64_t)pBlockIdx->uid, code, pReader->idStr);
      return code;
    }

    size += pScanInfo->mapData.nData;
    for (int32_t j = 0; j < pScanInfo->mapData.nItem; ++j) {
      SBlock block = {0};
      tMapDataGetItemByIdx(&pScanInfo->mapData, j, &block, tGetBlock);

      // 1. time range check
      if (block.minKey.ts > pReader->window.ekey || block.maxKey.ts < pReader->window.skey) {
        continue;
      }

      // 2. version range check
      if (block.minVersion > pReader->verRange.maxVer || block.maxVersion < pReader->verRange.minVer) {
        continue;
      }

      void* p = taosArrayPush(pScanInfo->pBlockList, &j);
      if (p == NULL) {
        tMapDataClear(&pScanInfo->mapData);
        return TSDB_CODE_OUT_OF_MEMORY;
      }

      (*numOfBlocks) += 1;
    }

    if (pScanInfo->pBlockList != NULL && taosArrayGetSize(pScanInfo->pBlockList) > 0) {
      (*numOfValidTables) += 1;
    }
  }

  double el = (taosGetTimestampUs() - st) / 1000.0;
  tsdbDebug("load block of %d tables completed, blocks:%d in %u tables, size:%.2f Kb, elapsed time:%.2f ms %s",
            (int32_t)numOfTables, *numOfBlocks, *numOfValidTables, size / 1024.0, el, pReader->idStr);

  pReader->cost.numOfBlocks += (*numOfBlocks);
  pReader->cost.headFileLoadTime += el;

  return TSDB_CODE_SUCCESS;
}
H
Hongze Cheng 已提交
653

654 655
/*
 * Mark the current file block as completely handed out.
 * lastKey is set one step past the block's last key in traversal order:
 * - ascending: maxKey + 1;
 * - descending: minKey - 1.
 * Previously the descending case used maxKey - 1, which points back inside the block.
 * todo remove pblock parameter
 */
static void setBlockAllDumped(SFileBlockDumpInfo* pDumpInfo, SBlock* pBlock, int32_t order) {
  bool    asc = ASCENDING_TRAVERSE(order);
  int32_t step = asc ? 1 : -1;

  pDumpInfo->allDumped = true;
  pDumpInfo->lastKey = (asc ? pBlock->maxKey.ts : pBlock->minKey.ts) + step;
}

662 663
/*
 * Append the column value pColVal at row rowIndex of pColInfoData. Null/none values become NULL.
 * Var-data values are first assembled into a length-prefixed var-data string in
 * pSup->buildBuf[colIndex], then appended.
 * NOTE(review): assumes nData fits in buildBuf[colIndex] (allocated with the column's `bytes` in
 * setColumnIdSlotList) — confirm bytes includes the var-data header.
 */
static void doCopyColVal(SColumnInfoData* pColInfoData, int32_t rowIndex, int32_t colIndex, SColVal* pColVal,
                         SBlockLoadSuppInfo* pSup) {
  bool isNullVal = pColVal->isNull || pColVal->isNone;

  if (!IS_VAR_DATA_TYPE(pColVal->type)) {
    colDataAppend(pColInfoData, rowIndex, (const char*)&pColVal->value, isNullVal);
    return;
  }

  if (isNullVal) {
    colDataAppendNULL(pColInfoData, rowIndex);
    return;
  }

  char* pBuf = pSup->buildBuf[colIndex];
  varDataSetLen(pBuf, pColVal->value.nData);
  memcpy(varDataVal(pBuf), pColVal->value.pData, pColVal->value.nData);
  colDataAppend(pColInfoData, rowIndex, pBuf, false);
}

677 678 679 680 681 682 683 684 685
/* Return the SFileDataBlockInfo at the iterator's current position in its block list. */
static SFileDataBlockInfo* getCurrentBlockInfo(SDataBlockIter* pBlockIter) {
  return (SFileDataBlockInfo*)taosArrayGet(pBlockIter->blockList, pBlockIter->index);
}

/* Return the SBlock currently held by the iterator (stored by value inside it). */
static SBlock* getCurrentBlock(SDataBlockIter* pBlockIter) {
  SBlock* pCurrent = &pBlockIter->block;
  return pCurrent;
}

686
/*
 * Copy rows of the current file block into pReader->pResBlock.
 * - The block data must already be loaded into status.fileBlockData.
 * - Copying starts at fBlockDumpInfo.rowIndex and moves in the query order.
 * - At most pReader->capacity rows are copied per call.
 * - Result columns absent from the file block are filled with NULL.
 * - fBlockDumpInfo.rowIndex is advanced past the copied rows.
 * - The block is marked all-dumped only once no rows remain.
 *
 * Fixes:
 * - Every copy loop now runs exactly `remain` times. The old `j < endIndex` condition never ran in
 *   descending order when more than `capacity` rows remained, because endIndex lay below the start.
 * - A block that still has rows is no longer reported as all-dumped.
 *
 * pBlockScanInfo is currently unused.
 */
static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo) {
  SReaderStatus*      pStatus = &pReader->status;
  SDataBlockIter*     pBlockIter = &pStatus->blockIter;
  SBlockData*         pBlockData = &pStatus->fileBlockData;
  SFileDataBlockInfo* pFBlock = getCurrentBlockInfo(pBlockIter);
  SBlock*             pBlock = getCurrentBlock(pBlockIter);
  SSDataBlock*        pResBlock = pReader->pResBlock;
  int32_t             numOfCols = blockDataGetNumOfCols(pResBlock);
  SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo;
  SFileBlockDumpInfo* pDumpInfo = &pStatus->fBlockDumpInfo;

  int64_t st = taosGetTimestampUs();

  bool    asc = ASCENDING_TRAVERSE(pReader->order);
  int32_t step = asc ? 1 : -1;

  // rows left in the block in traversal order, bounded by the output buffer capacity
  int32_t remain = asc ? (pBlockData->nRow - pDumpInfo->rowIndex) : (pDumpInfo->rowIndex + 1);
  if (remain > pReader->capacity) {
    remain = pReader->capacity;
  }

  SColVal cv = {0};
  int32_t colIndex = 0;  // position in the file block's column list
  int32_t i = 0;         // position in the result block's column list

  // the primary timestamp column is stored separately in the block data
  SColumnInfoData* pColData = taosArrayGet(pResBlock->pDataBlock, i);
  if (pColData->info.colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
    for (int32_t rowIndex = 0, j = pDumpInfo->rowIndex; rowIndex < remain; ++rowIndex, j += step) {
      colDataAppend(pColData, rowIndex, (const char*)&pBlockData->aTSKEY[j], false);
    }
    i += 1;
  }

  // both column lists are walked in parallel; a result column with no match in the file block gets NULLs
  while (i < numOfCols && colIndex < taosArrayGetSize(pBlockData->aIdx)) {
    pColData = taosArrayGet(pResBlock->pDataBlock, i);
    SColData* pData = tBlockDataGetColDataByIdx(pBlockData, colIndex);

    if (pData->cid == pColData->info.colId) {
      for (int32_t rowIndex = 0, j = pDumpInfo->rowIndex; rowIndex < remain; ++rowIndex, j += step) {
        tColDataGetValue(pData, j, &cv);
        doCopyColVal(pColData, rowIndex, i, &cv, pSupInfo);
      }
      colIndex += 1;
    } else {  // the specified column does not exist in file block, fill with null data
      colDataAppendNNULL(pColData, 0, remain);
    }

    i += 1;
  }

  // remaining result columns have no counterpart in the file block
  for (; i < numOfCols; ++i) {
    pColData = taosArrayGet(pResBlock->pDataBlock, i);
    colDataAppendNNULL(pColData, 0, remain);
  }

  pResBlock->info.rows = remain;
  pDumpInfo->rowIndex += step * remain;

  // the block is exhausted only when the cursor has left the valid row range
  if (pDumpInfo->rowIndex < 0 || pDumpInfo->rowIndex >= pBlockData->nRow) {
    setBlockAllDumped(pDumpInfo, pBlock, pReader->order);
  }

  double elapsedTime = (taosGetTimestampUs() - st) / 1000.0;
  pReader->cost.blockLoadTime += elapsedTime;

  int32_t unDumpedRows = asc ? pBlock->nRow - pDumpInfo->rowIndex : pDumpInfo->rowIndex + 1;
  tsdbDebug("%p load file block into buffer, global index:%d, table index:%d, brange:%" PRId64 "-%" PRId64
            ", rows:%d, remain:%d, minVer:%" PRId64 ", maxVer:%" PRId64 ", elapsed time:%.2f ms, %s",
            pReader, pBlockIter->index, pFBlock->tbBlockIdx, pBlock->minKey.ts, pBlock->maxKey.ts, remain, unDumpedRows,
            pBlock->minVersion, pBlock->maxVersion, elapsedTime, pReader->idStr);

  return TSDB_CODE_SUCCESS;
}

770 771
/*
 * Read the columns requested by pReader->suppInfo.colIds of the current file block (the one pBlockIter points at)
 * into pBlockData, and mark the block as not yet dumped.
 *
 * @return TSDB_CODE_SUCCESS, or the error code returned by tsdbReadColData.
 */
static int32_t doLoadFileBlockData(STsdbReader* pReader, SDataBlockIter* pBlockIter,
                                   STableBlockScanInfo* pBlockScanInfo, SBlockData* pBlockData) {
  int64_t startTs = taosGetTimestampUs();

  SFileDataBlockInfo* pFBlock = getCurrentBlockInfo(pBlockIter);
  SBlock*             pBlock = getCurrentBlock(pBlockIter);
  int32_t             numOfCols = blockDataGetNumOfCols(pReader->pResBlock);
  SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo;

  SBlockIdx blockIdx = {.suid = pReader->suid, .uid = pBlockScanInfo->uid};
  int32_t   code = tsdbReadColData(pReader->pFileReader, &blockIdx, pBlock, pSupInfo->colIds, numOfCols, pBlockData,
                                   NULL, NULL);
  if (code != TSDB_CODE_SUCCESS) {
    tsdbError("%p error occurs in loading file block, global index:%d, table index:%d, brange:%" PRId64 "-%" PRId64
              ", rows:%d, %s",
              pReader, pBlockIter->index, pFBlock->tbBlockIdx, pBlock->minKey.ts, pBlock->maxKey.ts, pBlock->nRow,
              pReader->idStr);
    return code;
  }

  // account the load cost before announcing the freshly loaded block
  double elapsedTime = (taosGetTimestampUs() - startTs) / 1000.0;
  pReader->cost.blockLoadTime += elapsedTime;

  pReader->status.fBlockDumpInfo.allDumped = false;
  tsdbDebug("%p load file block into buffer, global index:%d, table index:%d, brange:%" PRId64 "-%" PRId64
            ", rows:%d, minVer:%" PRId64 ", maxVer:%" PRId64 ", elapsed time:%.2f ms, %s",
            pReader, pBlockIter->index, pFBlock->tbBlockIdx, pBlock->minKey.ts, pBlock->maxKey.ts, pBlock->nRow,
            pBlock->minVersion, pBlock->maxVersion, elapsedTime, pReader->idStr);

  return TSDB_CODE_SUCCESS;
}
H
Hongze Cheng 已提交
808

H
Haojun Liao 已提交
809 810 811
/*
 * Release every buffer owned by the block-order supporter: the per-table counters/cursors, each table's
 * SBlockOrderWrapper array (one per table in [0, numOfTables)), and the array holding those pointers.
 */
static void cleanupBlockOrderSupporter(SBlockOrderSupporter* pSup) {
  taosMemoryFreeClear(pSup->numOfBlocksPerTable);
  taosMemoryFreeClear(pSup->indexPerTable);

  int32_t tableIdx = 0;
  while (tableIdx < pSup->numOfTables) {
    taosMemoryFreeClear(pSup->pDataBlockInfo[tableIdx]);
    tableIdx += 1;
  }

  taosMemoryFreeClear(pSup->pDataBlockInfo);
}
H
Hongze Cheng 已提交
820

H
Haojun Liao 已提交
821 822
/*
 * Allocate the zero-filled per-table arrays of the block-order supporter for numOfTables tables.
 *
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY after releasing whatever was allocated.
 */
static int32_t initBlockOrderSupporter(SBlockOrderSupporter* pSup, int32_t numOfTables) {
  ASSERT(numOfTables >= 1);

  pSup->numOfBlocksPerTable = taosMemoryCalloc(numOfTables, sizeof(int32_t));
  pSup->indexPerTable = taosMemoryCalloc(numOfTables, sizeof(int32_t));
  pSup->pDataBlockInfo = taosMemoryCalloc(numOfTables, POINTER_BYTES);

  bool allAllocated =
      (pSup->numOfBlocksPerTable != NULL) && (pSup->indexPerTable != NULL) && (pSup->pDataBlockInfo != NULL);
  if (!allAllocated) {
    cleanupBlockOrderSupporter(pSup);
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  return TSDB_CODE_SUCCESS;
}
H
Hongze Cheng 已提交
835

H
Haojun Liao 已提交
836
static int32_t fileDataBlockOrderCompar(const void* pLeft, const void* pRight, void* param) {
837
  int32_t leftIndex = *(int32_t*)pLeft;
H
Haojun Liao 已提交
838
  int32_t rightIndex = *(int32_t*)pRight;
H
Hongze Cheng 已提交
839

H
Haojun Liao 已提交
840
  SBlockOrderSupporter* pSupporter = (SBlockOrderSupporter*)param;
H
Hongze Cheng 已提交
841

H
Haojun Liao 已提交
842 843
  int32_t leftTableBlockIndex = pSupporter->indexPerTable[leftIndex];
  int32_t rightTableBlockIndex = pSupporter->indexPerTable[rightIndex];
H
Hongze Cheng 已提交
844

H
Haojun Liao 已提交
845 846 847 848 849 850 851
  if (leftTableBlockIndex > pSupporter->numOfBlocksPerTable[leftIndex]) {
    /* left block is empty */
    return 1;
  } else if (rightTableBlockIndex > pSupporter->numOfBlocksPerTable[rightIndex]) {
    /* right block is empty */
    return -1;
  }
H
Hongze Cheng 已提交
852

853
  SBlockOrderWrapper* pLeftBlock = &pSupporter->pDataBlockInfo[leftIndex][leftTableBlockIndex];
H
Haojun Liao 已提交
854
  SBlockOrderWrapper* pRightBlock = &pSupporter->pDataBlockInfo[rightIndex][rightTableBlockIndex];
H
Hongze Cheng 已提交
855

856 857 858 859 860 861 862 863 864 865 866 867 868 869 870
  return pLeftBlock->offset > pRightBlock->offset ? 1 : -1;
}

/*
 * Decode the SBlock descriptor of the iterator's current block into pBlockIter->block.
 *
 * The block info from getCurrentBlockInfo (presumably the entry at pBlockIter->index) gives the table uid and the
 * position inside that table's pBlockList; pBlockList in turn gives the item position inside the table's mapData.
 */
static int32_t doSetCurrentBlock(SDataBlockIter* pBlockIter) {
  SFileDataBlockInfo*  pCurrent = getCurrentBlockInfo(pBlockIter);
  STableBlockScanInfo* pScanInfo = taosHashGet(pBlockIter->pTableMap, &pCurrent->uid, sizeof(pCurrent->uid));

  int32_t itemIndex = *(int32_t*)taosArrayGet(pScanInfo->pBlockList, pCurrent->tbBlockIdx);
  tMapDataGetItemByIdx(&pScanInfo->mapData, itemIndex, &pBlockIter->block, tGetBlock);

  return TSDB_CODE_SUCCESS;
}
H
Hongze Cheng 已提交
872

H
Haojun Liao 已提交
873
/*
 * Build the access order of all file data blocks collected in the reader's table map.
 *
 * The blocks of every table with a non-empty pBlockList are gathered. When more than one table has blocks, they are
 * merged across tables through a merge tree ordered by fileDataBlockOrderCompar (offset of each block's first
 * sub-block). With a single table the blocks keep their pBlockList order.
 *
 * On success pBlockIter->index is 0 (asc) or numOfBlocks - 1 (desc) and the current block has been set.
 *
 * @param numOfBlocks  total number of blocks over all tables; asserted to equal the number actually collected.
 * @return TSDB_CODE_SUCCESS, the error from initBlockOrderSupporter, or TSDB_CODE_TDB_OUT_OF_MEMORY.
 */
static int32_t initBlockIterator(STsdbReader* pReader, SDataBlockIter* pBlockIter, int32_t numOfBlocks) {
  bool asc = ASCENDING_TRAVERSE(pReader->order);

  pBlockIter->numOfBlocks = numOfBlocks;
  taosArrayClear(pBlockIter->blockList);

  // access data blocks according to the offset of each block in asc/desc order.
  int32_t numOfTables = (int32_t)taosHashGetSize(pReader->status.pTableMap);

  int64_t st = taosGetTimestampUs();

  SBlockOrderSupporter sup = {0};
  int32_t              code = initBlockOrderSupporter(&sup, numOfTables);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  int32_t cnt = 0;
  void*   ptr = NULL;
  while (1) {
    ptr = taosHashIterate(pReader->status.pTableMap, ptr);
    if (ptr == NULL) {
      break;
    }

    STableBlockScanInfo* pTableScanInfo = (STableBlockScanInfo*)ptr;
    if (pTableScanInfo->pBlockList == NULL || taosArrayGetSize(pTableScanInfo->pBlockList) == 0) {
      continue;
    }

    size_t num = taosArrayGetSize(pTableScanInfo->pBlockList);
    sup.numOfBlocksPerTable[sup.numOfTables] = num;

    char* buf = taosMemoryMalloc(sizeof(SBlockOrderWrapper) * num);
    if (buf == NULL) {
      // the iteration is abandoned half-way, so the hash iterator must be released explicitly
      taosHashCancelIterate(pReader->status.pTableMap, ptr);
      cleanupBlockOrderSupporter(&sup);
      return TSDB_CODE_TDB_OUT_OF_MEMORY;
    }

    sup.pDataBlockInfo[sup.numOfTables] = (SBlockOrderWrapper*)buf;
    SBlock block = {0};
    for (int32_t k = 0; k < num; ++k) {
      int32_t* mapDataIndex = taosArrayGet(pTableScanInfo->pBlockList, k);
      tMapDataGetItemByIdx(&pTableScanInfo->mapData, *mapDataIndex, &block, tGetBlock);

      // blocks are ordered by the offset recorded for their first sub-block
      SBlockOrderWrapper wrapper = {.uid = pTableScanInfo->uid, .offset = block.aSubBlock[0].offset};
      sup.pDataBlockInfo[sup.numOfTables][k] = wrapper;
      cnt++;
    }

    sup.numOfTables += 1;
  }

  ASSERT(numOfBlocks == cnt);

  // since there is only one table qualified, blocks are not sorted
  if (sup.numOfTables == 1) {
    for (int32_t i = 0; i < numOfBlocks; ++i) {
      SFileDataBlockInfo blockInfo = {.uid = sup.pDataBlockInfo[0][i].uid, .tbBlockIdx = i};
      taosArrayPush(pBlockIter->blockList, &blockInfo);
    }

    int64_t et = taosGetTimestampUs();
    tsdbDebug("%p create blocks info struct completed for one table, %d blocks not sorted, elapsed time:%.2f ms %s", pReader, cnt,
              (et - st)/1000.0, pReader->idStr);

    pBlockIter->index = asc ? 0 : (numOfBlocks - 1);
    cleanupBlockOrderSupporter(&sup);
    doSetCurrentBlock(pBlockIter);
    return TSDB_CODE_SUCCESS;
  }

  tsdbDebug("%p create data blocks info struct completed, %d blocks in %d tables %s", pReader, cnt, sup.numOfTables,
            pReader->idStr);

  assert(cnt <= numOfBlocks && sup.numOfTables <= numOfTables);

  SMultiwayMergeTreeInfo* pTree = NULL;
  // keep the full int32_t width: narrowing the error code to uint8_t could turn a failure into 0 (success)
  int32_t ret = tMergeTreeCreate(&pTree, sup.numOfTables, &sup, fileDataBlockOrderCompar);
  if (ret != TSDB_CODE_SUCCESS) {
    cleanupBlockOrderSupporter(&sup);
    return TSDB_CODE_TDB_OUT_OF_MEMORY;
  }

  int32_t numOfTotal = 0;
  while (numOfTotal < cnt) {
    int32_t pos = tMergeTreeGetChosenIndex(pTree);
    int32_t index = sup.indexPerTable[pos]++;

    SFileDataBlockInfo blockInfo = {.uid = sup.pDataBlockInfo[pos][index].uid, .tbBlockIdx = index};
    taosArrayPush(pBlockIter->blockList, &blockInfo);

    // set data block index overflow, in order to disable the offset comparator
    if (sup.indexPerTable[pos] >= sup.numOfBlocksPerTable[pos]) {
      sup.indexPerTable[pos] = sup.numOfBlocksPerTable[pos] + 1;
    }

    numOfTotal += 1;
    tMergeTreeAdjust(pTree, tMergeTreeGetAdjustIndex(pTree));
  }

  int64_t et = taosGetTimestampUs();
  tsdbDebug("%p %d data blocks access order completed, elapsed time:%.2f ms %s", pReader, cnt, (et-st)/1000.0, pReader->idStr);
  cleanupBlockOrderSupporter(&sup);
  taosMemoryFree(pTree);

  pBlockIter->index = asc ? 0 : (numOfBlocks - 1);
  doSetCurrentBlock(pBlockIter);

  return TSDB_CODE_SUCCESS;
}
H
Hongze Cheng 已提交
988

H
Haojun Liao 已提交
989
/*
 * Move the block iterator one position in scan order and load that block's descriptor.
 *
 * @return false, leaving the iterator untouched, when it already sits on the last block in scan order.
 */
static bool blockIteratorNext(SDataBlockIter* pBlockIter) {
  bool asc = ASCENDING_TRAVERSE(pBlockIter->order);

  bool atEnd = asc ? (pBlockIter->index >= pBlockIter->numOfBlocks - 1) : (pBlockIter->index <= 0);
  if (atEnd) {
    return false;
  }

  pBlockIter->index += asc ? 1 : -1;
  doSetCurrentBlock(pBlockIter);
  return true;
}

1003 1004 1005
/**
 * This is an two rectangles overlap cases.
 */
1006
static int32_t dataBlockPartiallyRequired(STimeWindow* pWindow, SVersionRange* pVerRange, SBlock* pBlock) {
1007 1008 1009 1010
  return (pWindow->ekey < pBlock->maxKey.ts && pWindow->ekey >= pBlock->minKey.ts) ||
         (pWindow->skey > pBlock->minKey.ts && pWindow->skey <= pBlock->maxKey.ts) ||
         (pVerRange->minVer > pBlock->minVersion && pVerRange->minVer <= pBlock->maxVersion) ||
         (pVerRange->maxVer < pBlock->maxVersion && pVerRange->maxVer >= pBlock->minVersion);
H
Haojun Liao 已提交
1011
}
H
Hongze Cheng 已提交
1012

1013 1014
/*
 * Fetch the block that follows pFBlockInfo, in scan order, within the same table.
 *
 * @param nextIndex  out: position of the neighbor in pTableBlockScanInfo->pBlockList (set only when a neighbor exists).
 * @return a heap copy of the neighbor SBlock that the caller releases with taosMemoryFree(), or NULL when pFBlockInfo
 *         is the table's last block in scan order, or when the copy cannot be allocated.
 */
static SBlock* getNeighborBlockOfSameTable(SFileDataBlockInfo* pFBlockInfo, STableBlockScanInfo* pTableBlockScanInfo,
                                           int32_t* nextIndex, int32_t order) {
  bool    asc = ASCENDING_TRAVERSE(order);
  int32_t numOfBlocks = (int32_t)taosArrayGetSize(pTableBlockScanInfo->pBlockList);

  // compare with signed arithmetic: with an empty list, (size_t)0 - 1 would wrap to SIZE_MAX and pass the check
  if (asc && pFBlockInfo->tbBlockIdx >= numOfBlocks - 1) {
    return NULL;
  }

  if (!asc && pFBlockInfo->tbBlockIdx <= 0) {
    return NULL;
  }

  int32_t step = asc ? 1 : -1;
  *nextIndex = pFBlockInfo->tbBlockIdx + step;

  SBlock* pBlock = taosMemoryCalloc(1, sizeof(SBlock));
  if (pBlock == NULL) {
    // NOTE: callers treat NULL as "no neighbor"; this at least avoids dereferencing a failed allocation
    return NULL;
  }

  int32_t* indexInMapdata = taosArrayGet(pTableBlockScanInfo->pBlockList, *nextIndex);
  tMapDataGetItemByIdx(&pTableBlockScanInfo->mapData, *indexInMapdata, pBlock, tGetBlock);
  return pBlock;
}

/*
 * Search blockList, starting from the iterator's current position and moving in scan order, for the entry with the
 * same table uid and table block index as pFBlockInfo.
 *
 * @return its position in blockList; -1 (after ASSERT(0)) if it is not found.
 */
static int32_t findFileBlockInfoIndex(SDataBlockIter* pBlockIter, SFileDataBlockInfo* pFBlockInfo) {
  ASSERT(pBlockIter != NULL && pFBlockInfo != NULL);

  int32_t step = ASCENDING_TRAVERSE(pBlockIter->order) ? 1 : -1;

  for (int32_t pos = pBlockIter->index; pos >= 0 && pos < pBlockIter->numOfBlocks; pos += step) {
    SFileDataBlockInfo* pCandidate = taosArrayGet(pBlockIter->blockList, pos);
    bool matched = (pCandidate->uid == pFBlockInfo->uid) && (pCandidate->tbBlockIdx == pFBlockInfo->tbBlockIdx);
    if (matched) {
      return pos;
    }
  }

  ASSERT(0);
  return -1;
}

1053
/*
 * Advance the iterator by `step` and make the block stored at `index` the block at the new position, moving it within
 * blockList when necessary; then load its descriptor.
 *
 * @return -1 when index is outside [0, numOfBlocks), otherwise TSDB_CODE_SUCCESS.
 */
static int32_t setFileBlockActiveInBlockIter(SDataBlockIter* pBlockIter, int32_t index, int32_t step) {
  bool outOfRange = (index < 0) || (index >= pBlockIter->numOfBlocks);
  if (outOfRange) {
    return -1;
  }

  // copy the entry out first: its slot is removed from the array below
  SFileDataBlockInfo target = *(SFileDataBlockInfo*)taosArrayGet(pBlockIter->blockList, index);

  int32_t activePos = pBlockIter->index + step;
  pBlockIter->index = activePos;

  if (index != activePos) {
    taosArrayRemove(pBlockIter->blockList, index);
    taosArrayInsert(pBlockIter->blockList, activePos, &target);

    SFileDataBlockInfo* pMoved = taosArrayGet(pBlockIter->blockList, activePos);
    ASSERT(pMoved->uid == target.uid && pMoved->tbBlockIdx == target.tbBlockIdx);
  }

  doSetCurrentBlock(pBlockIter);
  return TSDB_CODE_SUCCESS;
}

/*
 * Report whether pBlock and its next neighbor in scan order share a boundary timestamp: in ascending scans pBlock's
 * maxKey is compared with the neighbor's minKey, in descending scans pBlock's minKey with the neighbor's maxKey.
 */
static bool overlapWithNeighborBlock(SBlock* pBlock, SBlock* pNeighbor, int32_t order) {
  if (!ASCENDING_TRAVERSE(order)) {
    return pBlock->minKey.ts == pNeighbor->maxKey.ts;
  }

  return pBlock->maxKey.ts == pNeighbor->minKey.ts;
}
H
Hongze Cheng 已提交
1081

1082
/*
 * Report whether the buffered key comes no later than pBlock in scan order: key.ts <= minKey in ascending scans, or
 * key.ts >= maxKey in descending scans. A key equal to TSKEY_INITIAL_VAL never qualifies.
 */
static bool bufferDataInFileBlockGap(int32_t order, TSDBKEY key, SBlock* pBlock) {
  if (key.ts == TSKEY_INITIAL_VAL) {
    return false;
  }

  if (ASCENDING_TRAVERSE(order)) {
    return key.ts <= pBlock->minKey.ts;
  }

  return key.ts >= pBlock->maxKey.ts;
}
H
Hongze Cheng 已提交
1088

H
Haojun Liao 已提交
1089
/*
 * Report whether key.ts lies inside pBlock's [minKey.ts, maxKey.ts] range while the block's version span intersects
 * the reader's version range.
 */
static bool keyOverlapFileBlock(TSDBKEY key, SBlock* pBlock, SVersionRange* pVerRange) {
  bool keyInsideBlock = (key.ts >= pBlock->minKey.ts) && (key.ts <= pBlock->maxKey.ts);
  if (!keyInsideBlock) {
    return false;
  }

  return (pBlock->maxVersion >= pVerRange->minVer) && (pBlock->minVersion <= pVerRange->maxVer);
}

1094 1095 1096 1097 1098 1099 1100 1101 1102 1103 1104 1105 1106 1107 1108 1109 1110 1111 1112 1113 1114 1115 1116 1117 1118 1119 1120 1121 1122 1123 1124 1125 1126 1127
/*
 * Scan the table's delete skyline (an array of TSDBKEY {ts, version}) forward from startIndex and report whether
 * any delete point may affect rows of pBlock.
 *
 * - A point inside [minKey.ts, maxKey.ts] whose version >= pBlock->minVersion overlaps.
 * - A point before minKey.ts whose version >= pBlock->minVersion overlaps when the following point reaches into the
 *   block (its ts >= minKey.ts). If that following point is not the last one, its version must also be
 *   >= pBlock->minVersion.
 * - The scan stops at the first point after maxKey.ts.
 */
static bool doCheckforDatablockOverlapFrom(STableBlockScanInfo* pBlockScanInfo, const SBlock* pBlock,
                                           int32_t startIndex) {
  size_t num = taosArrayGetSize(pBlockScanInfo->delSkyline);

  for (int32_t i = startIndex; i < num; i += 1) {
    TSDBKEY* p = taosArrayGet(pBlockScanInfo->delSkyline, i);
    if (p->ts >= pBlock->minKey.ts && p->ts <= pBlock->maxKey.ts) {
      if (p->version >= pBlock->minVersion) {
        return true;
      }
    } else if (p->ts < pBlock->minKey.ts) {  // p->ts < pBlock->minKey.ts
      if (p->version >= pBlock->minVersion) {
        if (i < num - 1) {
          TSDBKEY* pnext = taosArrayGet(pBlockScanInfo->delSkyline, i + 1);
          if (i + 1 == num - 1) {  // pnext is the last point
            if (pnext->ts >= pBlock->minKey.ts) {
              return true;
            }
          } else {
            if (pnext->ts >= pBlock->minKey.ts && pnext->version >= pBlock->minVersion) {
              return true;
            }
          }
        } else {  // it must be the last point
          ASSERT(p->version == 0);
        }
      }
    } else {  // (p->ts > pBlock->maxKey.ts) {
      return false;
    }
  }

  return false;
}

/* Same as doCheckforDatablockOverlapFrom, starting from the table's current fileDelIndex. */
static bool doCheckforDatablockOverlap(STableBlockScanInfo* pBlockScanInfo, const SBlock* pBlock) {
  return doCheckforDatablockOverlapFrom(pBlockScanInfo, pBlock, pBlockScanInfo->fileDelIndex);
}

/*
 * Report whether delete records of the table may affect rows of pBlock, so the block cannot be returned as a whole.
 */
static bool overlapWithDelSkyline(STableBlockScanInfo* pBlockScanInfo, const SBlock* pBlock, int32_t order) {
  if (pBlockScanInfo->delSkyline == NULL) {
    return false;
  }

  int32_t num = (int32_t)taosArrayGetSize(pBlockScanInfo->delSkyline);
  if (num == 0) {  // no first/last point to compare against
    return false;
  }

  // ts is not overlap
  TSDBKEY* pFirst = taosArrayGet(pBlockScanInfo->delSkyline, 0);
  TSDBKEY* pLast = taosArrayGetLast(pBlockScanInfo->delSkyline);
  if (pBlock->minKey.ts > pLast->ts || pBlock->maxKey.ts < pFirst->ts) {
    return false;
  }

  // version is not overlap
  if (ASCENDING_TRAVERSE(order)) {
    return doCheckforDatablockOverlap(pBlockScanInfo, pBlock);
  }

  // Descending scan: delete points covering this block may lie before fileDelIndex. Walk back to the first point
  // whose ts is not greater than the block's minKey.ts, and scan forward from there.
  int32_t index = pBlockScanInfo->fileDelIndex;
  if (index >= num) {
    index = num - 1;
  }
  if (index < 0) {
    index = 0;
  }

  while (index > 0) {
    TSDBKEY* p = taosArrayGet(pBlockScanInfo->delSkyline, index);
    if (p->ts <= pBlock->minKey.ts) {
      break;
    }
    index -= 1;
  }

  // the previous implementation computed `index` but then scanned from fileDelIndex, missing earlier points
  return doCheckforDatablockOverlapFrom(pBlockScanInfo, pBlock, index);
}

1158 1159 1160 1161
// 1. the version of all rows should be less than the endVersion
// 2. current block should not overlap with next neighbor block
// 3. current timestamp should not be overlap with each other
// 4. output buffer should be large enough to hold all rows in current block
1162
// 5. delete info should not overlap with current block data
1163 1164
static bool fileBlockShouldLoad(STsdbReader* pReader, SFileDataBlockInfo* pFBlock, SBlock* pBlock,
                                STableBlockScanInfo* pScanInfo, TSDBKEY key) {
1165 1166 1167
  int32_t neighborIndex = 0;
  SBlock* pNeighbor = getNeighborBlockOfSameTable(pFBlock, pScanInfo, &neighborIndex, pReader->order);

1168
  // overlap with neighbor
1169 1170 1171
  bool overlapWithNeighbor = false;
  if (pNeighbor) {
    overlapWithNeighbor = overlapWithNeighborBlock(pBlock, pNeighbor, pReader->order);
1172
    taosMemoryFree(pNeighbor);
1173 1174
  }

1175
  // has duplicated ts of different version in this block
L
Liu Jicong 已提交
1176 1177
  bool hasDup = (pBlock->nSubBlock == 1) ? pBlock->hasDup : true;
  bool overlapWithDel = overlapWithDelSkyline(pScanInfo, pBlock, pReader->order);
1178

1179
  return (overlapWithNeighbor || hasDup || dataBlockPartiallyRequired(&pReader->window, &pReader->verRange, pBlock) ||
1180
          keyOverlapFileBlock(key, pBlock, &pReader->verRange) || (pBlock->nRow > pReader->capacity) || overlapWithDel);
H
Haojun Liao 已提交
1181 1182
}

1183
/*
 * Fill pReader->pResBlock with rows of the table taken only from the mem/imem buffers, up to endKey and the reader's
 * capacity. Does nothing when neither buffer iterator has a value.
 *
 * @return the code returned by buildDataBlockFromBufImpl (the result block is finalized either way).
 */
static int32_t buildDataBlockFromBuf(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo, int64_t endKey) {
  bool hasBufferedRows = pBlockScanInfo->iiter.hasVal || pBlockScanInfo->iter.hasVal;
  if (!hasBufferedRows) {
    return TSDB_CODE_SUCCESS;
  }

  SSDataBlock* pResBlock = pReader->pResBlock;
  int64_t      startTs = taosGetTimestampUs();

  int32_t code = buildDataBlockFromBufImpl(pBlockScanInfo, endKey, pReader->capacity, pReader);

  blockDataUpdateTsWindow(pResBlock, 0);
  pResBlock->info.uid = pBlockScanInfo->uid;
  setComposedBlockFlag(pReader, true);

  double elapsedTime = (taosGetTimestampUs() - startTs) / 1000.0;
  tsdbDebug(
      "%p build data block from cache completed, elapsed time:%.2f ms, numOfRows:%d, brange: %" PRId64
      " - %" PRId64 " %s",
      pReader, elapsedTime, pResBlock->info.rows, pResBlock->info.window.skey, pResBlock->info.window.ekey,
      pReader->idStr);

  pReader->cost.buildmemBlock += elapsedTime;
  return code;
}

1208
static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo, TSDBROW* pRow,
H
Haojun Liao 已提交
1209
                                     SIterInfo* pIter, int64_t key) {
1210
  SRowMerger          merge = {0};
H
Haojun Liao 已提交
1211
  STSRow*             pTSRow = NULL;
1212 1213 1214 1215
  SBlockData*         pBlockData = &pReader->status.fileBlockData;
  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;

  TSDBKEY k = TSDBROW_KEY(pRow);
1216
  TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
1217
  SArray* pDelList = pBlockScanInfo->delSkyline;
1218

1219 1220 1221 1222 1223 1224 1225 1226
  // ascending order traverse
  if (ASCENDING_TRAVERSE(pReader->order)) {
    if (key < k.ts) {
      tRowMergerInit(&merge, &fRow, pReader->pSchema);

      doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
      tRowMergerGetRow(&merge, &pTSRow);
    } else if (k.ts < key) {  // k.ts < key
1227
      doMergeMultiRows(pRow, pBlockScanInfo->uid, pIter, pDelList, &pTSRow, pReader);
1228 1229 1230
    } else {  // k.ts == key, ascending order: file block ----> imem rows -----> mem rows
      tRowMergerInit(&merge, &fRow, pReader->pSchema);
      doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
1231 1232

      tRowMerge(&merge, pRow);
1233
      doMergeRowsInBuf(pIter, k.ts, pBlockScanInfo->delSkyline, &merge, pReader);
1234 1235

      tRowMergerGetRow(&merge, &pTSRow);
1236
    }
1237 1238
  } else {  // descending order scan
    if (key < k.ts) {
1239
      doMergeMultiRows(pRow, pBlockScanInfo->uid, pIter, pDelList, &pTSRow, pReader);
1240 1241
    } else if (k.ts < key) {
      tRowMergerInit(&merge, &fRow, pReader->pSchema);
1242

1243 1244 1245 1246 1247 1248
      doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
      tRowMergerGetRow(&merge, &pTSRow);
    } else {  // descending order: mem rows -----> imem rows ------> file block
      updateSchema(pRow, pBlockScanInfo->uid, pReader);

      tRowMergerInit(&merge, pRow, pReader->pSchema);
1249
      doMergeRowsInBuf(pIter, k.ts, pBlockScanInfo->delSkyline, &merge, pReader);
1250 1251 1252 1253 1254 1255

      tRowMerge(&merge, &fRow);
      doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);

      tRowMergerGetRow(&merge, &pTSRow);
    }
1256 1257
  }

1258
  tRowMergerClear(&merge);
1259
  doAppendOneRow(pReader->pResBlock, pReader, pTSRow);
H
Haojun Liao 已提交
1260 1261

  taosMemoryFree(pTSRow);
1262 1263 1264
  return TSDB_CODE_SUCCESS;
}

1265 1266 1267 1268
/*
 * Emit one row into pReader->pResBlock by merging the current file row with the head rows of both the mem (iter) and
 * imem (iiter) buffers. Both buffer iterators must have a valid row.
 *
 * The source(s) holding the smallest timestamp (ascending) or largest timestamp (descending) are consumed. When
 * several sources share that timestamp they are merged: ascending order is file block -> imem -> mem, descending
 * order is mem -> imem -> file block.
 *
 * The produced row is appended, then released together with the row merger on every path.
 */
static int32_t doMergeThreeLevelRows(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo) {
  SRowMerger merge = {0};
  STSRow*    pTSRow = NULL;

  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
  SBlockData*         pBlockData = &pReader->status.fileBlockData;
  SArray*             pDelList = pBlockScanInfo->delSkyline;

  TSDBROW* pRow = getValidRow(&pBlockScanInfo->iter, pDelList, pReader);
  TSDBROW* piRow = getValidRow(&pBlockScanInfo->iiter, pDelList, pReader);
  ASSERT(pRow != NULL && piRow != NULL);

  int64_t  key = pBlockData->aTSKEY[pDumpInfo->rowIndex];
  uint64_t uid = pBlockScanInfo->uid;

  TSDBKEY k = TSDBROW_KEY(pRow);
  TSDBKEY ik = TSDBROW_KEY(piRow);

  if (ASCENDING_TRAVERSE(pReader->order)) {
    if (key <= k.ts && key <= ik.ts) {
      // [1&2] key <= [k.ts && ik.ts]: file block ---> imem rows ---> mem rows
      TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
      tRowMergerInit(&merge, &fRow, pReader->pSchema);
      doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);

      if (ik.ts == key) {
        tRowMerge(&merge, piRow);
        doMergeRowsInBuf(&pBlockScanInfo->iiter, key, pDelList, &merge, pReader);
      }

      if (k.ts == key) {
        tRowMerge(&merge, pRow);
        doMergeRowsInBuf(&pBlockScanInfo->iter, key, pDelList, &merge, pReader);
      }

      tRowMergerGetRow(&merge, &pTSRow);
    } else if (ik.ts < k.ts) {
      // [3] ik.ts < key <= k.ts
      // [4] ik.ts < k.ts <= key
      doMergeMultiRows(piRow, uid, &pBlockScanInfo->iiter, pDelList, &pTSRow, pReader);
    } else if (k.ts < ik.ts) {
      // [5] k.ts < key   <= ik.ts   (key == ik.ts is valid here; the imem row is consumed later)
      // [6] k.ts < ik.ts <= key
      doMergeMultiRows(pRow, uid, &pBlockScanInfo->iter, pDelList, &pTSRow, pReader);
    } else {
      // [7] k.ts == ik.ts < key
      ASSERT(key > ik.ts && key > k.ts);
      doMergeMemIMemRows(pRow, piRow, pBlockScanInfo, pReader, &pTSRow);
    }
  } else {  // descending order scan
    if (k.ts >= ik.ts && k.ts >= key) {
      // [1/2] k.ts >= ik.ts && k.ts >= key: mem rows ---> imem rows ---> file block
      updateSchema(pRow, uid, pReader);

      tRowMergerInit(&merge, pRow, pReader->pSchema);
      // buffered duplicates are keyed on the mem row's timestamp, not on the file key
      doMergeRowsInBuf(&pBlockScanInfo->iter, k.ts, pDelList, &merge, pReader);

      if (ik.ts == k.ts) {
        tRowMerge(&merge, piRow);
        doMergeRowsInBuf(&pBlockScanInfo->iiter, k.ts, pDelList, &merge, pReader);
      }

      if (k.ts == key) {
        TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
        tRowMerge(&merge, &fRow);
        doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
      }

      tRowMergerGetRow(&merge, &pTSRow);
    } else if (ik.ts > key) {
      // [3] ik.ts > k.ts >= key
      // [4] ik.ts > key >= k.ts
      doMergeMultiRows(piRow, uid, &pBlockScanInfo->iiter, pDelList, &pTSRow, pReader);
    } else if (key > ik.ts) {
      // [5] key > ik.ts > k.ts
      // [6] key > k.ts >= ik.ts   (k.ts == ik.ts is valid here)
      TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
      tRowMergerInit(&merge, &fRow, pReader->pSchema);
      doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
      tRowMergerGetRow(&merge, &pTSRow);
    } else {
      // [7] key == ik.ts > k.ts: imem rows ---> file block, merged into one row
      ASSERT(key == ik.ts && k.ts < ik.ts);

      updateSchema(piRow, uid, pReader);
      tRowMergerInit(&merge, piRow, pReader->pSchema);
      doMergeRowsInBuf(&pBlockScanInfo->iiter, ik.ts, pDelList, &merge, pReader);

      TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
      tRowMerge(&merge, &fRow);
      doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);

      tRowMergerGetRow(&merge, &pTSRow);
    }
  }

  tRowMergerClear(&merge);
  doAppendOneRow(pReader->pResBlock, pReader, pTSRow);
  taosMemoryFree(pTSRow);
  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
1396 1397
/*
 * Decide whether the file row at pDumpInfo->rowIndex should be returned. Its version must lie in the reader's version
 * range, its timestamp must lie in the reader's time window, and hasBeenDropped must not report it as deleted.
 * hasBeenDropped receives &pBlockScanInfo->fileDelIndex and may update it; it is only called for in-range rows.
 */
static bool isValidFileBlockRow(SBlockData* pBlockData, SFileBlockDumpInfo* pDumpInfo,
                                STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader) {
  int32_t row = pDumpInfo->rowIndex;

  int64_t ver = pBlockData->aVersion[row];
  bool    verInRange = (ver >= pReader->verRange.minVer) && (ver <= pReader->verRange.maxVer);
  if (!verInRange) {
    return false;
  }

  int64_t ts = pBlockData->aTSKEY[row];
  bool    tsInWindow = (ts >= pReader->window.skey) && (ts <= pReader->window.ekey);
  if (!tsInWindow) {
    return false;
  }

  TSDBKEY rowKey = {.ts = ts, .version = ver};
  return !hasBeenDropped(pBlockScanInfo->delSkyline, &pBlockScanInfo->fileDelIndex, &rowKey, pReader->order);
}

1417
/* Report whether ts falls outside the inclusive window [pWindow->skey, pWindow->ekey]. */
static bool outOfTimeWindow(int64_t ts, STimeWindow* pWindow) {
  bool insideWindow = (ts >= pWindow->skey) && (ts <= pWindow->ekey);
  return !insideWindow;
}
1418

1419 1420 1421 1422 1423
/*
 * Emit the next row for the current file row. Depending on which buffer iterators still have a value (checked after
 * getValidRow has been called on both), the row is merged with mem + imem, with imem only, with mem only, or taken
 * from the file block alone.
 */
static int32_t buildComposedDataBlockImpl(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo) {
  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
  SBlockData*         pBlockData = &pReader->status.fileBlockData;
  SIterInfo*          pMemIter = &pBlockScanInfo->iter;
  SIterInfo*          pIMemIter = &pBlockScanInfo->iiter;

  int64_t fileKey = pBlockData->aTSKEY[pDumpInfo->rowIndex];

  // both lookups run before hasVal is inspected; getValidRow presumably refreshes hasVal (TODO confirm)
  TSDBROW* pMemRow = getValidRow(pMemIter, pBlockScanInfo->delSkyline, pReader);
  TSDBROW* pIMemRow = getValidRow(pIMemIter, pBlockScanInfo->delSkyline, pReader);

  if (pMemIter->hasVal && pIMemIter->hasVal) {
    return doMergeThreeLevelRows(pReader, pBlockScanInfo);
  }

  if (pIMemIter->hasVal) {  // imem + file
    return doMergeBufAndFileRows(pReader, pBlockScanInfo, pIMemRow, pIMemIter, fileKey);
  }

  if (pMemIter->hasVal) {  // mem + file
    return doMergeBufAndFileRows(pReader, pBlockScanInfo, pMemRow, pMemIter, fileKey);
  }

  // imem & mem are all empty, only file exist
  TSDBROW    fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
  STSRow*    pTSRow = NULL;
  SRowMerger merge = {0};

  tRowMergerInit(&merge, &fRow, pReader->pSchema);
  doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
  tRowMergerGetRow(&merge, &pTSRow);
  doAppendOneRow(pReader->pResBlock, pReader, pTSRow);

  taosMemoryFree(pTSRow);
  tRowMergerClear(&merge);
  return TSDB_CODE_SUCCESS;
}

1457
/*
 * Fill pReader->pResBlock by walking the current file block row by row, in scan order, from pDumpInfo->rowIndex.
 *
 * - Rows rejected by isValidFileBlockRow are skipped.
 * - Every other row is merged with the buffered data by buildComposedDataBlockImpl.
 * - The walk stops when the file block is exhausted (and then marked as all dumped) or when the result block reaches
 *   the reader's capacity.
 *
 * @return TSDB_CODE_SUCCESS, or the first error reported by buildComposedDataBlockImpl.
 */
static int32_t buildComposedDataBlock(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo) {
  SSDataBlock* pResBlock = pReader->pResBlock;

  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
  SBlockData*         pBlockData = &pReader->status.fileBlockData;
  int32_t             step = ASCENDING_TRAVERSE(pReader->order) ? 1 : -1;

  int64_t st = taosGetTimestampUs();

  while (1) {
    // skip rows outside the version range / time window or covered by delete records
    if (!isValidFileBlockRow(pBlockData, pDumpInfo, pBlockScanInfo, pReader)) {
      pDumpInfo->rowIndex += step;

      SBlock* pBlock = getCurrentBlock(&pReader->status.blockIter);
      if (pDumpInfo->rowIndex >= pBlock->nRow || pDumpInfo->rowIndex < 0) {
        setBlockAllDumped(pDumpInfo, pBlock, pReader->order);
        break;
      }

      continue;
    }

    int32_t code = buildComposedDataBlockImpl(pReader, pBlockScanInfo);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    // the current block is re-read: merging may have touched the iterator's block state
    SBlock* pBlock = getCurrentBlock(&pReader->status.blockIter);

    // currently loaded file data block is consumed
    if (pDumpInfo->rowIndex >= pBlock->nRow || pDumpInfo->rowIndex < 0) {
      setBlockAllDumped(pDumpInfo, pBlock, pReader->order);
      break;
    }

    if (pResBlock->info.rows >= pReader->capacity) {
      break;
    }
  }

  pResBlock->info.uid = pBlockScanInfo->uid;
  blockDataUpdateTsWindow(pResBlock, 0);

  setComposedBlockFlag(pReader, true);
  int64_t et = taosGetTimestampUs();

  // window bounds are signed 64-bit timestamps, so they are printed with PRId64
  tsdbDebug("%p uid:%" PRIu64 ", composed data block created, brange:%" PRId64 "-%" PRId64 " rows:%d, elapsed time:%.2f ms %s", pReader,
            pBlockScanInfo->uid, pResBlock->info.window.skey, pResBlock->info.window.ekey, pResBlock->info.rows,
            (et - st)/1000.0, pReader->idStr);

  return TSDB_CODE_SUCCESS;
}

/*
 * Store the composedDataBlock flag in the reader status. buildComposedDataBlock() sets it to true, and
 * doBuildDataBlock() sets it to false when a whole file block is returned directly.
 */
void setComposedBlockFlag(STsdbReader* pReader, bool composed) {
  SReaderStatus* pStatus = &pReader->status;
  pStatus->composedDataBlock = composed;
}

1511
/*
 * Lazily create the iterators over this table's rows in the pMem and pIMem memtables of the read snapshot,
 * then build the table's delete skyline. Does nothing once pBlockScanInfo->iterInit is set.
 *
 * The iterators are positioned at the query start key for the scan order:
 * (window.skey, verRange.minVer) when ascending, (window.ekey, verRange.maxVer) when descending.
 *
 * Returns TSDB_CODE_SUCCESS, the error code of tsdbTbDataIterCreate() (iterInit stays false in that case),
 * or the error code of initDelSkylineIterator().
 */
static int32_t initMemDataIterator(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader) {
  if (pBlockScanInfo->iterInit) {
    return TSDB_CODE_SUCCESS;
  }

  int32_t code = TSDB_CODE_SUCCESS;

  TSDBKEY startKey = {0};
  if (ASCENDING_TRAVERSE(pReader->order)) {
    startKey = (TSDBKEY){.ts = pReader->window.skey, .version = pReader->verRange.minVer};
  } else {
    startKey = (TSDBKEY){.ts = pReader->window.ekey, .version = pReader->verRange.maxVer};
  }

  int32_t backward = (!ASCENDING_TRAVERSE(pReader->order));

  STbData* d = NULL;
  if (pReader->pReadSnap->pMem != NULL) {
    tsdbGetTbDataFromMemTable(pReader->pReadSnap->pMem, pReader->suid, pBlockScanInfo->uid, &d);
    if (d != NULL) {
      code = tsdbTbDataIterCreate(d, &startKey, backward, &pBlockScanInfo->iter.iter);
      if (code == TSDB_CODE_SUCCESS) {
        pBlockScanInfo->iter.hasVal = (tsdbTbDataIterGet(pBlockScanInfo->iter.iter) != NULL);

        tsdbDebug("%p uid:%" PRIu64 ", check data in mem from skey:%" PRId64 ", order:%d, ts range in buf:%" PRId64
                  "-%" PRId64 " %s",
                  pReader, pBlockScanInfo->uid, startKey.ts, pReader->order, d->minKey, d->maxKey, pReader->idStr);
      } else {
        // this branch handles the mem (not imem) iterator
        tsdbError("%p uid:%" PRIu64 ", failed to create iterator for mem, code:%s, %s", pReader, pBlockScanInfo->uid,
                  tstrerror(code), pReader->idStr);
        return code;
      }
    }
  } else {
    tsdbDebug("%p uid:%" PRIu64 ", no data in mem, %s", pReader, pBlockScanInfo->uid, pReader->idStr);
  }

  STbData* di = NULL;
  if (pReader->pReadSnap->pIMem != NULL) {
    tsdbGetTbDataFromMemTable(pReader->pReadSnap->pIMem, pReader->suid, pBlockScanInfo->uid, &di);
    if (di != NULL) {
      code = tsdbTbDataIterCreate(di, &startKey, backward, &pBlockScanInfo->iiter.iter);
      if (code == TSDB_CODE_SUCCESS) {
        pBlockScanInfo->iiter.hasVal = (tsdbTbDataIterGet(pBlockScanInfo->iiter.iter) != NULL);

        tsdbDebug("%p uid:%" PRIu64 ", check data in imem from skey:%" PRId64 ", order:%d, ts range in buf:%" PRId64
                  "-%" PRId64 " %s",
                  pReader, pBlockScanInfo->uid, startKey.ts, pReader->order, di->minKey, di->maxKey, pReader->idStr);
      } else {
        // this branch handles the imem iterator
        tsdbError("%p uid:%" PRIu64 ", failed to create iterator for imem, code:%s, %s", pReader, pBlockScanInfo->uid,
                  tstrerror(code), pReader->idStr);
        return code;
      }
    }
  } else {
    tsdbDebug("%p uid:%" PRIu64 ", no data in imem, %s", pReader, pBlockScanInfo->uid, pReader->idStr);
  }

  code = initDelSkylineIterator(pBlockScanInfo, pReader, d, di);

  // Mark the iterators as created even when the skyline could not be built. Otherwise a later call would create
  // new memtable iterators on top of the ones already stored above.
  pBlockScanInfo->iterInit = true;
  return code;
}

dengyihao's avatar
dengyihao 已提交
1575 1576
/*
 * Build pBlockScanInfo->delSkyline for one table, then reset the skyline cursors (iter.index, iiter.index,
 * fileDelIndex) to the start of the skyline for the scan order. Does nothing if the skyline already exists.
 *
 * The skyline is built with tsdbBuildDeleteSkyline() from the table's delete records, collected from:
 *   - the delete file of the read snapshot;
 *   - the delete lists of pMemTbData and piMemTbData (either may be NULL).
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_OUT_OF_MEMORY, or the error code of the delete-file reader or
 * tsdbBuildDeleteSkyline().
 */
int32_t initDelSkylineIterator(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader, STbData* pMemTbData,
                               STbData* piMemTbData) {
  if (pBlockScanInfo->delSkyline != NULL) {
    return TSDB_CODE_SUCCESS;
  }

  int32_t code = TSDB_CODE_SUCCESS;
  STsdb*  pTsdb = pReader->pTsdb;

  SArray* pDelData = taosArrayInit(4, sizeof(SDelData));
  if (pDelData == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  // 1. delete records persisted in the delete file
  SDelFile* pDelFile = pReader->pReadSnap->fs.pDelFile;
  if (pDelFile) {
    SDelFReader* pDelFReader = NULL;
    code = tsdbDelFReaderOpen(&pDelFReader, pDelFile, pTsdb, NULL);
    if (code != TSDB_CODE_SUCCESS) {
      goto _err;
    }

    SArray* aDelIdx = taosArrayInit(4, sizeof(SDelIdx));
    if (aDelIdx == NULL) {
      tsdbDelFReaderClose(&pDelFReader);
      code = TSDB_CODE_OUT_OF_MEMORY;  // must not reach _err with code still equal to TSDB_CODE_SUCCESS
      goto _err;
    }

    code = tsdbReadDelIdx(pDelFReader, aDelIdx, NULL);
    if (code != TSDB_CODE_SUCCESS) {
      taosArrayDestroy(aDelIdx);
      tsdbDelFReaderClose(&pDelFReader);
      goto _err;
    }

    SDelIdx  idx = {.suid = pReader->suid, .uid = pBlockScanInfo->uid};
    SDelIdx* pIdx = taosArraySearch(aDelIdx, &idx, tCmprDelIdx, TD_EQ);

    if (pIdx != NULL) {
      code = tsdbReadDelData(pDelFReader, pIdx, pDelData, NULL);
    }

    taosArrayDestroy(aDelIdx);
    tsdbDelFReaderClose(&pDelFReader);

    if (code != TSDB_CODE_SUCCESS) {
      goto _err;
    }
  }

  // 2. delete records kept with the in-memory table data
  SDelData* p = NULL;
  if (pMemTbData != NULL) {
    p = pMemTbData->pHead;
    while (p) {
      taosArrayPush(pDelData, p);
      p = p->pNext;
    }
  }

  if (piMemTbData != NULL) {
    p = piMemTbData->pHead;
    while (p) {
      taosArrayPush(pDelData, p);
      p = p->pNext;
    }
  }

  if (taosArrayGetSize(pDelData) > 0) {
    pBlockScanInfo->delSkyline = taosArrayInit(4, sizeof(TSDBKEY));
    if (pBlockScanInfo->delSkyline == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto _err;
    }
    code = tsdbBuildDeleteSkyline(pDelData, 0, (int32_t)(taosArrayGetSize(pDelData) - 1), pBlockScanInfo->delSkyline);
  }

  taosArrayDestroy(pDelData);

  // Ascending scans start at the first skyline point, descending scans at the last one. Without a skyline this
  // gives -1 (assumes taosArrayGetSize(NULL) returns 0, as the original expression already did).
  pBlockScanInfo->iter.index =
      ASCENDING_TRAVERSE(pReader->order) ? 0 : (int32_t)taosArrayGetSize(pBlockScanInfo->delSkyline) - 1;
  pBlockScanInfo->iiter.index = pBlockScanInfo->iter.index;
  pBlockScanInfo->fileDelIndex = pBlockScanInfo->iter.index;
  return code;

_err:
  taosArrayDestroy(pDelData);
  return code;
}

1656 1657 1658
/*
 * Return the key of the next valid in-memory row of the table that owns the current file block. Both the
 * mem and the imem iterator are consulted; the key that comes first in scan order wins: the smaller ts when
 * ascending, the larger ts when descending.
 *
 * The memtable iterators of that table are lazily initialized first.
 * Returns a key with ts == TSKEY_INITIAL_VAL when neither iterator has a valid row.
 */
static TSDBKEY getCurrentKeyInBuf(SDataBlockIter* pBlockIter, STsdbReader* pReader) {
  bool    asc = ASCENDING_TRAVERSE(pReader->order);
  TSDBKEY key = {.ts = TSKEY_INITIAL_VAL};
  TSDBKEY ikey = {.ts = TSKEY_INITIAL_VAL};
  bool    hasKey = false;
  bool    hasIKey = false;

  SFileDataBlockInfo*  pFBlock = getCurrentBlockInfo(pBlockIter);
  STableBlockScanInfo* pScanInfo = taosHashGet(pReader->status.pTableMap, &pFBlock->uid, sizeof(pFBlock->uid));

  initMemDataIterator(pScanInfo, pReader);

  TSDBROW* pRow = getValidRow(&pScanInfo->iter, pScanInfo->delSkyline, pReader);
  if (pRow != NULL) {
    hasKey = true;
    key = TSDBROW_KEY(pRow);
  }

  pRow = getValidRow(&pScanInfo->iiter, pScanInfo->delSkyline, pReader);
  if (pRow != NULL) {
    hasIKey = true;
    ikey = TSDBROW_KEY(pRow);
  }

  // Presence is tracked explicitly. Comparing a real key against the TSKEY_INITIAL_VAL sentinel would hide a row
  // that exists only in imem whenever the sentinel orders before every real key.
  if (!hasKey) {
    return ikey;  // only imem has data, or neither has (ikey.ts is then TSKEY_INITIAL_VAL)
  }

  if (!hasIKey) {
    return key;
  }

  if (asc) {
    return (key.ts <= ikey.ts) ? key : ikey;
  }

  return (key.ts >= ikey.ts) ? key : ikey;
}

H
Haojun Liao 已提交
1679 1680
/*
 * Advance the file-set iterator until a data file is found that holds blocks of at least one queried
 * table (doLoadFileBlock() reports more than zero valid tables), loading its block index and block list on
 * the way. numOfBlocks is passed through to doLoadFileBlock().
 *
 * Returns TSDB_CODE_SUCCESS (also when no further data file exists), TSDB_CODE_OUT_OF_MEMORY, or the error
 * code of doLoadBlockIndex()/doLoadFileBlock().
 */
static int32_t moveToNextFile(STsdbReader* pReader, int32_t* numOfBlocks) {
  SReaderStatus* pStatus = &pReader->status;

  size_t  numOfTables = taosHashGetSize(pStatus->pTableMap);
  SArray* pIndexList = taosArrayInit(numOfTables, sizeof(SBlockIdx));
  if (pIndexList == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  int32_t code = TSDB_CODE_SUCCESS;
  while (1) {
    bool hasNext = filesetIteratorNext(&pStatus->fileIter, pReader);
    if (!hasNext) {  // no data files on disk
      break;
    }

    taosArrayClear(pIndexList);
    code = doLoadBlockIndex(pReader, pReader->pFileReader, pIndexList);
    if (code != TSDB_CODE_SUCCESS) {
      break;
    }

    if (taosArrayGetSize(pIndexList) > 0) {
      uint32_t numOfValidTable = 0;
      code = doLoadFileBlock(pReader, pIndexList, &numOfValidTable, numOfBlocks);
      if (code != TSDB_CODE_SUCCESS || numOfValidTable > 0) {
        break;
      }
    }
    // no blocks in current file, try next files
  }

  taosArrayDestroy(pIndexList);
  return code;
}

1717 1718 1719
/*
 * Produce the next result block for the current file block. The strategy depends on the next in-memory key
 * of the same table:
 *   - fileBlockShouldLoad():      load the file block and merge it with the buffered rows;
 *   - bufferDataInFileBlockGap(): emit the buffered rows up to the file block's first key in scan order;
 *   - otherwise:                  describe the whole file block in the result block info (no data is loaded
 *                                 here) and mark it as fully dumped.
 */
static int32_t doBuildDataBlock(STsdbReader* pReader) {
  SReaderStatus*       pStatus = &pReader->status;
  SDataBlockIter*      pIter = &pStatus->blockIter;
  SFileDataBlockInfo*  pBlockInfo = getCurrentBlockInfo(pIter);
  STableBlockScanInfo* pTableInfo = taosHashGet(pStatus->pTableMap, &pBlockInfo->uid, sizeof(pBlockInfo->uid));
  SBlock*              pFileBlock = getCurrentBlock(pIter);
  TSDBKEY              bufKey = getCurrentKeyInBuf(pIter, pReader);

  if (fileBlockShouldLoad(pReader, pBlockInfo, pFileBlock, pTableInfo, bufKey)) {
    SBlockData* pData = &pStatus->fileBlockData;
    tBlockDataReset(pData);
    tBlockDataClearData(pData);

    int32_t ret = doLoadFileBlockData(pReader, pIter, pTableInfo, pData);
    if (ret != TSDB_CODE_SUCCESS) {
      return ret;
    }

    // merge the loaded file rows with the buffered rows
    return buildComposedDataBlock(pReader, pTableInfo);
  }

  if (bufferDataInFileBlockGap(pReader->order, bufKey, pFileBlock)) {
    // todo rows in buffer should be less than the file block in asc, greater than file block in desc
    int64_t endKey = ASCENDING_TRAVERSE(pReader->order) ? pFileBlock->minKey.ts : pFileBlock->maxKey.ts;
    return buildDataBlockFromBuf(pReader, pTableInfo, endKey);
  }

  // the whole block is required: return it directly
  SDataBlockInfo* pInfo = &pReader->pResBlock->info;
  pInfo->rows = pFileBlock->nRow;
  pInfo->uid = pTableInfo->uid;
  pInfo->window = (STimeWindow){.skey = pFileBlock->minKey.ts, .ekey = pFileBlock->maxKey.ts};
  setComposedBlockFlag(pReader, false);
  setBlockAllDumped(&pStatus->fBlockDumpInfo, pFileBlock, pReader->order);

  return TSDB_CODE_SUCCESS;
}

H
Haojun Liao 已提交
1756
/*
 * Build the next result block purely from in-memory data, visiting the tables of pStatus->pTableMap one
 * after another. pStatus->pTableIter keeps the position between calls.
 *
 * Returns TSDB_CODE_SUCCESS as soon as a non-empty result block is produced or all tables are exhausted.
 * Otherwise returns the error code of initMemDataIterator()/buildDataBlockFromBuf().
 */
static int32_t buildBlockFromBufferSequentially(STsdbReader* pReader) {
  SReaderStatus* pStatus = &pReader->status;

  while (1) {
    if (pStatus->pTableIter == NULL) {
      pStatus->pTableIter = taosHashIterate(pStatus->pTableMap, NULL);
      if (pStatus->pTableIter == NULL) {
        return TSDB_CODE_SUCCESS;
      }
    }

    STableBlockScanInfo* pBlockScanInfo = pStatus->pTableIter;

    // report initialization failures instead of scanning a table whose memtable iterators were not created
    int32_t code = initMemDataIterator(pBlockScanInfo, pReader);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    int64_t endKey = (ASCENDING_TRAVERSE(pReader->order)) ? INT64_MAX : INT64_MIN;
    code = buildDataBlockFromBuf(pReader, pBlockScanInfo, endKey);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    if (pReader->pResBlock->info.rows > 0) {
      return TSDB_CODE_SUCCESS;
    }

    // current table is exhausted, let's try the next table
    pStatus->pTableIter = taosHashIterate(pStatus->pTableMap, pStatus->pTableIter);
    if (pStatus->pTableIter == NULL) {
      return TSDB_CODE_SUCCESS;
    }
  }
}

1788
// set the correct start position in case of the first/last file block, according to the query time window
1789
static void initBlockDumpInfo(STsdbReader* pReader, SDataBlockIter* pBlockIter) {
1790
  SBlock* pBlock = getCurrentBlock(pBlockIter);
1791

1792 1793 1794
  SReaderStatus* pStatus = &pReader->status;

  SFileBlockDumpInfo* pDumpInfo = &pStatus->fBlockDumpInfo;
1795 1796 1797

  pDumpInfo->totalRows = pBlock->nRow;
  pDumpInfo->allDumped = false;
1798
  pDumpInfo->rowIndex = ASCENDING_TRAVERSE(pReader->order) ? 0 : pBlock->nRow - 1;
1799 1800
}

1801
/*
 * Move to the next data file that holds blocks of the queried tables. Then build the block iterator over
 * its blocks and reset the dump cursor for the first of them.
 *
 * When no such file remains, status.loadFromFile is set to false so that the caller continues with
 * in-memory data. Returns TSDB_CODE_SUCCESS or the error code of moveToNextFile()/initBlockIterator().
 */
static int32_t initForFirstBlockInFile(STsdbReader* pReader, SDataBlockIter* pBlockIter) {
  int32_t numOfBlocks = 0;
  int32_t code = moveToNextFile(pReader, &numOfBlocks);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  // all data files are consumed, try data in buffer
  if (numOfBlocks == 0) {
    pReader->status.loadFromFile = false;
    return code;
  }

  // initialize the block iterator for a new fileset
  code = initBlockIterator(pReader, pBlockIter, numOfBlocks);
  if (code != TSDB_CODE_SUCCESS) {
    // the iterator may not point at a valid block, so initBlockDumpInfo() must not dereference it
    return code;
  }

  // start at the first row of the current block in scan order
  initBlockDumpInfo(pReader, pBlockIter);
  return TSDB_CODE_SUCCESS;
}

1822
/*
 * True when the current file block has been started but not finished. That is, it is not marked as fully
 * dumped, and the row cursor has moved past the first row in scan order (row 0 when ascending,
 * totalRows - 1 when descending).
 */
static bool fileBlockPartiallyRead(SFileBlockDumpInfo* pDumpInfo, bool asc) {
  if (pDumpInfo->allDumped) {
    return false;
  }

  if (asc) {
    return pDumpInfo->rowIndex > 0;
  }

  return pDumpInfo->rowIndex < (pDumpInfo->totalRows - 1);
}

1827
/*
 * Produce the next non-empty result block from the data files. On each step:
 *   - a partially consumed file block is continued first;
 *   - otherwise the reader advances to the next block of the current file, or to the first block of the
 *     next file when the current one is exhausted.
 *
 * Returns when a result block with rows is ready, when no data file remains (status.loadFromFile becomes
 * false), or on error.
 */
static int32_t buildBlockFromFiles(STsdbReader* pReader) {
  bool                asc = ASCENDING_TRAVERSE(pReader->order);
  SReaderStatus*      pStatus = &pReader->status;
  SDataBlockIter*     pBlockIter = &pStatus->blockIter;
  SFileBlockDumpInfo* pDumpInfo = &pStatus->fBlockDumpInfo;

  for (;;) {
    SFileDataBlockInfo*  pBlockInfo = getCurrentBlockInfo(pBlockIter);
    STableBlockScanInfo* pScanInfo = taosHashGet(pStatus->pTableMap, &pBlockInfo->uid, sizeof(pBlockInfo->uid));

    int32_t code = TSDB_CODE_SUCCESS;
    if (fileBlockPartiallyRead(pDumpInfo, asc)) {
      // resume the file data block that is partially loaded
      code = buildComposedDataBlock(pReader, pScanInfo);
    } else {
      if (pDumpInfo->allDumped) {
        if (blockIteratorNext(pBlockIter)) {
          // next block in the block access order of the current file
          initBlockDumpInfo(pReader, pBlockIter);
        } else {
          // the blocks of the current file are exhausted: move on to the next file
          code = initForFirstBlockInFile(pReader, pBlockIter);

          // error happens or all the data files are completely checked
          if (code != TSDB_CODE_SUCCESS || !pStatus->loadFromFile) {
            return code;
          }
        }
      }

      // current block is not loaded yet, or data in buffer may overlap with the file block.
      code = doBuildDataBlock(pReader);
    }

    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    if (pReader->pResBlock->info.rows > 0) {
      return TSDB_CODE_SUCCESS;
    }
  }
}
H
refact  
Hongze Cheng 已提交
1871

1872 1873
/*
 * Select the tsdb instance to query. A non-RSMA vnode always uses VND_TSDB(pVnode).
 *
 * For an RSMA vnode, the retention levels are walked in index order:
 *   - the walk stops at the first level whose keep window satisfies (now - keep) <= winSKey;
 *   - it steps back one level when a level with keep <= 0 is reached;
 *   - if every level is exhausted, level 2 is used.
 * The chosen level is written to *pLevel and the matching VND_RSMA* instance is returned.
 */
static STsdb* getTsdbByRetentions(SVnode* pVnode, TSKEY winSKey, SRetention* retentions, const char* idStr,
                                  int8_t* pLevel) {
  if (!VND_IS_RSMA(pVnode)) {
    return VND_TSDB(pVnode);
  }

  int64_t now = taosGetTimestamp(pVnode->config.tsdbCfg.precision);
  int8_t  level = 0;

  while (level < TSDB_RETENTION_MAX) {
    SRetention* pRetention = &retentions[level];
    if (pRetention->keep <= 0) {
      if (level > 0) {
        level--;
      }
      break;
    }

    if (now - pRetention->keep <= winSKey) {
      break;
    }

    level++;
  }

  int32_t     vgId = TD_VID(pVnode);
  const char* str = (idStr == NULL) ? "" : idStr;

  int8_t selected;
  STsdb* pTsdb;
  if (level == TSDB_RETENTION_L0) {
    selected = TSDB_RETENTION_L0;
    pTsdb = VND_RSMA0(pVnode);
  } else if (level == TSDB_RETENTION_L1) {
    selected = TSDB_RETENTION_L1;
    pTsdb = VND_RSMA1(pVnode);
  } else {
    selected = TSDB_RETENTION_L2;
    pTsdb = VND_RSMA2(pVnode);
  }

  *pLevel = selected;
  tsdbDebug("vgId:%d, rsma level %d is selected to query %s", vgId, (int)selected, str);
  return pTsdb;
}

H
Haojun Liao 已提交
1913
/*
 * Compute the version range visible to a query:
 *   - minVer is pCond->startVersion, with -1 meaning 0;
 *   - maxVer is pCond->endVersion, clamped to the vnode's applied version (pVnode->state.applied); an
 *     endVersion of -1 means "up to the applied version".
 * The level argument is not used.
 */
SVersionRange getQueryVerRange(SVnode* pVnode, SQueryTableDataCond* pCond, int8_t level) {
  int64_t applied = pVnode->state.applied;

  SVersionRange range = {0};
  range.minVer = (pCond->startVersion == -1) ? 0 : pCond->startVersion;

  if (pCond->endVersion != -1 && pCond->endVersion <= applied) {
    range.maxVer = pCond->endVersion;
  } else {
    range.maxVer = applied;
  }

  return range;
}

H
Hongze Cheng 已提交
1927 1928 1929 1930
// // todo not unref yet, since it is not support multi-group interpolation query
// static UNUSED_FUNC void changeQueryHandleForInterpQuery(STsdbReader* pHandle) {
//   // filter the queried time stamp in the first place
//   STsdbReader* pTsdbReadHandle = (STsdbReader*)pHandle;
H
refact  
Hongze Cheng 已提交
1931

H
Hongze Cheng 已提交
1932 1933
//   // starts from the buffer in case of descending timestamp order check data blocks
//   size_t numOfTables = taosArrayGetSize(pTsdbReadHandle->pTableCheckInfo);
H
refact  
Hongze Cheng 已提交
1934

H
Hongze Cheng 已提交
1935 1936
//   int32_t i = 0;
//   while (i < numOfTables) {
H
Haojun Liao 已提交
1937
//     STableBlockScanInfo* pCheckInfo = taosArrayGet(pTsdbReadHandle->pTableCheckInfo, i);
H
refact  
Hongze Cheng 已提交
1938

H
Hongze Cheng 已提交
1939 1940 1941 1942 1943 1944 1945 1946 1947 1948 1949 1950 1951 1952
//     // the first qualified table for interpolation query
//     //    if ((pTsdbReadHandle->window.skey <= pCheckInfo->pTableObj->lastKey) &&
//     //        (pCheckInfo->pTableObj->lastKey != TSKEY_INITIAL_VAL)) {
//     //      break;
//     //    }

//     i++;
//   }

//   // there are no data in all the tables
//   if (i == numOfTables) {
//     return;
//   }

H
Haojun Liao 已提交
1953
//   STableBlockScanInfo info = *(STableBlockScanInfo*)taosArrayGet(pTsdbReadHandle->pTableCheckInfo, i);
H
Hongze Cheng 已提交
1954 1955 1956 1957 1958 1959
//   taosArrayClear(pTsdbReadHandle->pTableCheckInfo);

//   info.lastKey = pTsdbReadHandle->window.skey;
//   taosArrayPush(pTsdbReadHandle->pTableCheckInfo, &info);
// }

1960
/*
 * Check whether the row key *pKey is covered by the delete skyline pDelList.
 *
 * pDelList holds the TSDBKEY points of a table's delete skyline; callers pass
 * STableBlockScanInfo::delSkyline, which is built with tsdbBuildDeleteSkyline().
 * NOTE(review): the entries are assumed to be ordered by ts — confirm.
 *
 * The key counts as dropped when:
 *   - its ts lies between two adjacent skyline points (both ends inclusive), and
 *   - its version is not greater than the version of the lower-indexed of those two points.
 *
 * *index is a cursor into pDelList. It is moved forward (ascending order) or backward (descending order)
 * while keys are checked, so successive calls presumably must pass keys in scan order.
 *
 * Returns false when pDelList is NULL.
 */
bool hasBeenDropped(const SArray* pDelList, int32_t* index, TSDBKEY* pKey, int32_t order) {
1961 1962 1963 1964
  ASSERT(pKey != NULL);
  // no delete skyline for this table: nothing can have been dropped
  if (pDelList == NULL) {
    return false;
  }
L
Liu Jicong 已提交
1965 1966 1967
  size_t  num = taosArrayGetSize(pDelList);
  bool    asc = ASCENDING_TRAVERSE(order);
  int32_t step = asc ? 1 : -1;
1968

1969 1970 1971 1972 1973 1974
  if (asc) {
    // The cursor is at (or past) the last skyline point.
    // NOTE(review): *index (int32_t) is compared with num - 1 (size_t), so a negative *index converts to a
    // very large unsigned value and also takes this branch.
    if (*index >= num - 1) {
      TSDBKEY* last = taosArrayGetLast(pDelList);
      ASSERT(pKey->ts >= last->ts);

      if (pKey->ts > last->ts) {
1975
        return false;
1976 1977 1978
      } else if (pKey->ts == last->ts) {
        // NOTE(review): entry num - 2 does not exist when the skyline has fewer than two points.
        TSDBKEY* prev = taosArrayGet(pDelList, num - 2);
        return (prev->version >= pKey->version);
1979 1980
      }
      // pKey->ts < last->ts is only reachable when ASSERT is compiled out; it falls through to the final
      // "return false" below.
    } else {
1981 1982 1983 1984 1985 1986 1987 1988 1989 1990 1991 1992 1993 1994 1995 1996 1997 1998 1999 2000 2001 2002 2003 2004 2005 2006 2007 2008 2009 2010
      // check the range [*index, *index + 1] first
      TSDBKEY* pCurrent = taosArrayGet(pDelList, *index);
      TSDBKEY* pNext = taosArrayGet(pDelList, (*index) + 1);

      // the key precedes the range the cursor points at
      if (pKey->ts < pCurrent->ts) {
        return false;
      }

      if (pCurrent->ts <= pKey->ts && pNext->ts >= pKey->ts && pCurrent->version >= pKey->version) {
        return true;
      }

      // move the cursor forward while the key is at or beyond the end of the current range
      while (pNext->ts <= pKey->ts && (*index) < num - 1) {
        (*index) += 1;

        if ((*index) < num - 1) {
          pCurrent = taosArrayGet(pDelList, *index);
          pNext = taosArrayGet(pDelList, (*index) + 1);

          // it is not a consecutive deletion range, ignore it
          if (pCurrent->version == 0 && pNext->version > 0) {
            continue;
          }

          if (pCurrent->ts <= pKey->ts && pNext->ts >= pKey->ts && pCurrent->version >= pKey->version) {
            return true;
          }
        }
      }

      return false;
2011 2012
    }
  } else {
2013 2014
    // The cursor is at the first skyline point (or before it).
    if (*index <= 0) {
      TSDBKEY* pFirst = taosArrayGet(pDelList, 0);
2015

2016 2017 2018 2019 2020 2021 2022
      if (pKey->ts < pFirst->ts) {
        return false;
      } else if (pKey->ts == pFirst->ts) {
        return pFirst->version >= pKey->version;
      } else {
        // Not expected. When ASSERT is compiled out, control falls through to the final "return false".
        ASSERT(0);
      }
2023
    } else {
2024 2025 2026 2027 2028 2029 2030 2031 2032 2033 2034 2035 2036 2037 2038 2039 2040 2041 2042 2043 2044 2045 2046 2047 2048 2049 2050
      // check the range [*index - 1, *index] first
      TSDBKEY* pCurrent = taosArrayGet(pDelList, *index);
      TSDBKEY* pPrev = taosArrayGet(pDelList, (*index) - 1);

      // the key lies after the range the cursor points at
      if (pKey->ts > pCurrent->ts) {
        return false;
      }

      if (pPrev->ts <= pKey->ts && pCurrent->ts >= pKey->ts && pPrev->version >= pKey->version) {
        return true;
      }

      // move the cursor backward (step is -1 here) while the start of the current range is at or after the key
      while (pPrev->ts >= pKey->ts && (*index) > 1) {
        (*index) += step;

        if ((*index) >= 1) {
          pCurrent = taosArrayGet(pDelList, *index);
          pPrev = taosArrayGet(pDelList, (*index) - 1);

          // it is not a consecutive deletion range, ignore it
          if (pCurrent->version > 0 && pPrev->version == 0) {
            continue;
          }

          if (pPrev->ts <= pKey->ts && pCurrent->ts >= pKey->ts && pPrev->version >= pKey->version) {
            return true;
          }
        }
2051 2052 2053 2054 2055
      }

      return false;
    }
  }
2056 2057

  return false;
2058 2059 2060 2061
}

/*
 * Return the first row, starting at the iterator's current position, that:
 *   - lies inside the query time window,
 *   - has a version inside pReader->verRange, and
 *   - is not covered by the delete skyline pDelList.
 *
 * The iterator is advanced past rejected rows. NULL is returned, and pIter->hasVal cleared, when the
 * iterator is exhausted or a row outside the query time window is reached.
 */
TSDBROW* getValidRow(SIterInfo* pIter, const SArray* pDelList, STsdbReader* pReader) {
  if (!pIter->hasVal) {
    return NULL;
  }

  TSDBROW* pRow = tsdbTbDataIterGet(pIter->iter);
  for (;;) {
    TSDBKEY key = TSDBROW_KEY(pRow);

    // a row outside the query time window ends the scan of this iterator
    if (outOfTimeWindow(key.ts, &pReader->window)) {
      pIter->hasVal = false;
      return NULL;
    }

    // it is a valid data version
    bool versionOk = (key.version >= pReader->verRange.minVer) && (key.version <= pReader->verRange.maxVer);
    if (versionOk && !hasBeenDropped(pDelList, &pIter->index, &key, pReader->order)) {
      return pRow;
    }

    // skip the rejected row
    pIter->hasVal = tsdbTbDataIterNext(pIter->iter);
    if (!pIter->hasVal) {
      return NULL;
    }

    pRow = tsdbTbDataIterGet(pIter->iter);
  }
}
H
Hongze Cheng 已提交
2098

dengyihao's avatar
dengyihao 已提交
2099
/*
 * Advance pIter and merge into pMerger every following valid row (see getValidRow()) whose timestamp
 * equals ts. The row at the iterator's current position is not merged here.
 *
 * Stops at the first valid row with a different timestamp, or when no valid row remains.
 * Always returns TSDB_CODE_SUCCESS.
 */
int32_t doMergeRowsInBuf(SIterInfo* pIter, int64_t ts, SArray* pDelList, SRowMerger* pMerger, STsdbReader* pReader) {
  for (;;) {
    pIter->hasVal = tsdbTbDataIterNext(pIter->iter);
    if (!pIter->hasVal) {
      return TSDB_CODE_SUCCESS;
    }

    // data exists but may not be valid: getValidRow() skips rows outside the version range or deleted
    TSDBROW* pNextRow = getValidRow(pIter, pDelList, pReader);
    if (pNextRow == NULL) {
      return TSDB_CODE_SUCCESS;
    }

    // a different timestamp belongs to the next output row
    TSDBKEY nextKey = TSDBROW_KEY(pNextRow);
    if (nextKey.ts != ts) {
      return TSDB_CODE_SUCCESS;
    }

    tRowMerge(pMerger, pNextRow);
  }
}

2124
static int32_t doMergeRowsInFileBlockImpl(SBlockData* pBlockData, int32_t rowIndex, int64_t key, SRowMerger* pMerger,
2125
                                          SVersionRange* pVerRange, int32_t step) {
2126 2127
  while (pBlockData->aTSKEY[rowIndex] == key && rowIndex < pBlockData->nRow && rowIndex >= 0) {
    if (pBlockData->aVersion[rowIndex] > pVerRange->maxVer || pBlockData->aVersion[rowIndex] < pVerRange->minVer) {
2128
      rowIndex += step;
2129 2130 2131 2132 2133 2134 2135 2136 2137 2138 2139 2140 2141 2142 2143 2144 2145
      continue;
    }

    TSDBROW fRow = tsdbRowFromBlockData(pBlockData, rowIndex);
    tRowMerge(pMerger, &fRow);
    rowIndex += step;
  }

  return rowIndex;
}

/* Outcome reported by checkForNeighborFileBlock() through its state argument. */
typedef enum {
  CHECK_FILEBLOCK_CONT = 1,  // the merge reached the end of the neighbor block: check the next neighbor too
  CHECK_FILEBLOCK_QUIT = 2,  // nothing more to merge from neighbor blocks
} CHECK_FILEBLOCK_STATE;

static int32_t checkForNeighborFileBlock(STsdbReader* pReader, STableBlockScanInfo* pScanInfo, SBlock* pBlock,
2146 2147
                                         SFileDataBlockInfo* pFBlock, SRowMerger* pMerger, int64_t key,
                                         CHECK_FILEBLOCK_STATE* state) {
2148
  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
2149
  SBlockData*         pBlockData = &pReader->status.fileBlockData;
2150

2151
  *state = CHECK_FILEBLOCK_QUIT;
2152
  int32_t step = ASCENDING_TRAVERSE(pReader->order) ? 1 : -1;
2153 2154 2155

  int32_t nextIndex = -1;
  SBlock* pNeighborBlock = getNeighborBlockOfSameTable(pFBlock, pScanInfo, &nextIndex, pReader->order);
2156
  if (pNeighborBlock == NULL) {  // do nothing
2157 2158 2159 2160
    return 0;
  }

  bool overlap = overlapWithNeighborBlock(pBlock, pNeighborBlock, pReader->order);
2161 2162
  taosMemoryFree(pNeighborBlock);

2163
  if (overlap) {  // load next block
2164
    SReaderStatus*  pStatus = &pReader->status;
2165 2166
    SDataBlockIter* pBlockIter = &pStatus->blockIter;

2167
    // 1. find the next neighbor block in the scan block list
2168
    SFileDataBlockInfo fb = {.uid = pFBlock->uid, .tbBlockIdx = nextIndex};
2169
    int32_t            neighborIndex = findFileBlockInfoIndex(pBlockIter, &fb);
2170

2171
    // 2. remove it from the scan block list
2172
    setFileBlockActiveInBlockIter(pBlockIter, neighborIndex, step);
2173

2174
    // 3. load the neighbor block, and set it to be the currently accessed file data block
H
Haojun Liao 已提交
2175 2176
    tBlockDataReset(&pStatus->fileBlockData);
    tBlockDataClearData(&pStatus->fileBlockData);
2177 2178 2179 2180 2181
    int32_t code = doLoadFileBlockData(pReader, pBlockIter, pScanInfo, &pStatus->fileBlockData);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

2182
    // 4. check the data values
2183 2184 2185 2186
    initBlockDumpInfo(pReader, pBlockIter);

    pDumpInfo->rowIndex =
        doMergeRowsInFileBlockImpl(pBlockData, pDumpInfo->rowIndex, key, pMerger, &pReader->verRange, step);
H
Haojun Liao 已提交
2187
    if (pDumpInfo->rowIndex >= pDumpInfo->totalRows) {
2188 2189 2190 2191 2192 2193 2194
      *state = CHECK_FILEBLOCK_CONT;
    }
  }

  return TSDB_CODE_SUCCESS;
}

2195 2196
/*
 * Merge into pMerger the rows that follow the current row (pDumpInfo->rowIndex) of the loaded file block and
 * share its timestamp, using doMergeRowsInFileBlockImpl(), which skips rows outside the query version range.
 *
 * When the block end is reached, overlapping neighbor blocks of the same table are loaded and merged as well
 * (see checkForNeighborFileBlock()). pDumpInfo->rowIndex is left at the first unconsumed row.
 *
 * Returns TSDB_CODE_SUCCESS or the error raised while loading a neighbor block.
 */
int32_t doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pScanInfo, STsdbReader* pReader,
                                SRowMerger* pMerger) {
  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;

  bool    asc = ASCENDING_TRAVERSE(pReader->order);
  int64_t key = pBlockData->aTSKEY[pDumpInfo->rowIndex];
  int32_t step = asc ? 1 : -1;

  pDumpInfo->rowIndex += step;
  // in descending order the index can drop to -1, which must not be used as a row index
  if (pDumpInfo->rowIndex >= 0 && pDumpInfo->rowIndex < pBlockData->nRow) {
    pDumpInfo->rowIndex =
        doMergeRowsInFileBlockImpl(pBlockData, pDumpInfo->rowIndex, key, pMerger, &pReader->verRange, step);
  }

  // all rows are consumed, let's try next file block
  if ((pDumpInfo->rowIndex >= pBlockData->nRow && asc) || (pDumpInfo->rowIndex < 0 && !asc)) {
    while (1) {
      CHECK_FILEBLOCK_STATE st = CHECK_FILEBLOCK_QUIT;

      SFileDataBlockInfo* pFileBlockInfo = getCurrentBlockInfo(&pReader->status.blockIter);
      SBlock*             pCurrentBlock = getCurrentBlock(&pReader->status.blockIter);

      int32_t code = checkForNeighborFileBlock(pReader, pScanInfo, pCurrentBlock, pFileBlockInfo, pMerger, key, &st);
      if (code != TSDB_CODE_SUCCESS) {
        return code;
      }

      if (st == CHECK_FILEBLOCK_QUIT) {
        break;
      }
    }
  }

  return TSDB_CODE_SUCCESS;
}

2226
/*
 * Make pReader->pSchema match the schema version (sversion) of pRow. When no schema is cached, or the cached
 * one has a different version, the schema is (re)loaded with metaGetTbTSchemaEx() for table uid under
 * pReader->suid.
 */
void updateSchema(TSDBROW* pRow, uint64_t uid, STsdbReader* pReader) {
  int32_t sversion = TSDBROW_SVERSION(pRow);

  if (pReader->pSchema != NULL) {
    // the cached schema already matches this row
    if (pReader->pSchema->version == sversion) {
      return;
    }
    taosMemoryFreeClear(pReader->pSchema);
  }

  metaGetTbTSchemaEx(pReader->pTsdb->pVnode->pMeta, pReader->suid, uid, sversion, &pReader->pSchema);
}

dengyihao's avatar
dengyihao 已提交
2237 2238
/*
 * Merge pRow with the following rows of pIter that share its timestamp (see doMergeRowsInBuf()), using the
 * schema that matches pRow's version, and store the merged row in *pTSRow.
 */
void doMergeMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter, SArray* pDelList, STSRow** pTSRow,
                      STsdbReader* pReader) {
  TSDBKEY    rowKey = TSDBROW_KEY(pRow);
  SRowMerger merger = {0};

  updateSchema(pRow, uid, pReader);
  tRowMergerInit(&merger, pRow, pReader->pSchema);

  doMergeRowsInBuf(pIter, rowKey.ts, pDelList, &merger, pReader);

  tRowMergerGetRow(&merger, pTSRow);
  tRowMergerClear(&merger);
}

2250 2251
void doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader,
                        STSRow** pTSRow) {
H
Haojun Liao 已提交
2252 2253
  SRowMerger merge = {0};

2254 2255 2256
  TSDBKEY k = TSDBROW_KEY(pRow);
  TSDBKEY ik = TSDBROW_KEY(piRow);

2257 2258 2259 2260
  if (ASCENDING_TRAVERSE(pReader->order)) {  // ascending order imem --> mem
    updateSchema(piRow, pBlockScanInfo->uid, pReader);

    tRowMergerInit(&merge, piRow, pReader->pSchema);
2261
    doMergeRowsInBuf(&pBlockScanInfo->iiter, ik.ts, pBlockScanInfo->delSkyline, &merge, pReader);
2262

2263
    tRowMerge(&merge, pRow);
2264
    doMergeRowsInBuf(&pBlockScanInfo->iter, k.ts, pBlockScanInfo->delSkyline, &merge, pReader);
2265 2266
  } else {
    updateSchema(pRow, pBlockScanInfo->uid, pReader);
2267

2268
    tRowMergerInit(&merge, pRow, pReader->pSchema);
2269
    doMergeRowsInBuf(&pBlockScanInfo->iter, k.ts, pBlockScanInfo->delSkyline, &merge, pReader);
2270 2271

    tRowMerge(&merge, piRow);
2272
    doMergeRowsInBuf(&pBlockScanInfo->iiter, k.ts, pBlockScanInfo->delSkyline, &merge, pReader);
2273
  }
2274 2275 2276 2277

  tRowMergerGetRow(&merge, pTSRow);
}

2278 2279
int32_t tsdbGetNextRowInMem(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader, STSRow** pTSRow,
                            int64_t endKey) {
2280 2281
  TSDBROW* pRow = getValidRow(&pBlockScanInfo->iter, pBlockScanInfo->delSkyline, pReader);
  TSDBROW* piRow = getValidRow(&pBlockScanInfo->iiter, pBlockScanInfo->delSkyline, pReader);
dengyihao's avatar
dengyihao 已提交
2282
  SArray*  pDelList = pBlockScanInfo->delSkyline;
H
Haojun Liao 已提交
2283

2284 2285
  // todo refactor
  bool asc = ASCENDING_TRAVERSE(pReader->order);
2286
  if (pBlockScanInfo->iter.hasVal) {
2287 2288 2289 2290 2291 2292
    TSDBKEY k = TSDBROW_KEY(pRow);
    if ((k.ts >= endKey && asc) || (k.ts <= endKey && !asc)) {
      pRow = NULL;
    }
  }

2293
  if (pBlockScanInfo->iiter.hasVal) {
2294 2295 2296 2297 2298 2299
    TSDBKEY k = TSDBROW_KEY(piRow);
    if ((k.ts >= endKey && asc) || (k.ts <= endKey && !asc)) {
      piRow = NULL;
    }
  }

2300
  if (pBlockScanInfo->iter.hasVal && pBlockScanInfo->iiter.hasVal && pRow != NULL && piRow != NULL) {
2301
    TSDBKEY k = TSDBROW_KEY(pRow);
2302
    TSDBKEY ik = TSDBROW_KEY(piRow);
H
Haojun Liao 已提交
2303

2304
    if (ik.ts < k.ts) {  // ik.ts < k.ts
2305
      doMergeMultiRows(piRow, pBlockScanInfo->uid, &pBlockScanInfo->iiter, pDelList, pTSRow, pReader);
2306
    } else if (k.ts < ik.ts) {
2307
      doMergeMultiRows(pRow, pBlockScanInfo->uid, &pBlockScanInfo->iter, pDelList, pTSRow, pReader);
2308 2309
    } else {  // ik.ts == k.ts
      doMergeMemIMemRows(pRow, piRow, pBlockScanInfo, pReader, pTSRow);
H
Haojun Liao 已提交
2310
    }
2311 2312

    return TSDB_CODE_SUCCESS;
H
Haojun Liao 已提交
2313 2314
  }

2315 2316
  if (pBlockScanInfo->iter.hasVal && pRow != NULL) {
    doMergeMultiRows(pRow, pBlockScanInfo->uid, &pBlockScanInfo->iter, pDelList, pTSRow, pReader);
H
Haojun Liao 已提交
2317 2318 2319
    return TSDB_CODE_SUCCESS;
  }

2320 2321
  if (pBlockScanInfo->iiter.hasVal && piRow != NULL) {
    doMergeMultiRows(piRow, pBlockScanInfo->uid, &pBlockScanInfo->iiter, pDelList, pTSRow, pReader);
H
Haojun Liao 已提交
2322 2323 2324 2325 2326 2327
    return TSDB_CODE_SUCCESS;
  }

  return TSDB_CODE_SUCCESS;
}

2328 2329 2330 2331
int32_t doAppendOneRow(SSDataBlock* pBlock, STsdbReader* pReader, STSRow* pTSRow) {
  int32_t numOfRows = pBlock->info.rows;
  int32_t numOfCols = (int32_t)taosArrayGetSize(pBlock->pDataBlock);

2332
  SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo;
2333
  STSchema*           pSchema = pReader->pSchema;
2334

2335
  SColVal colVal = {0};
2336
  int32_t i = 0, j = 0;
H
Haojun Liao 已提交
2337

2338 2339 2340 2341 2342 2343 2344 2345 2346 2347 2348 2349 2350 2351 2352 2353 2354 2355 2356 2357
  SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
  if (pColInfoData->info.colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
    colDataAppend(pColInfoData, numOfRows, (const char*)&pTSRow->ts, false);
    i += 1;
  }

  while (i < numOfCols && j < pSchema->numOfCols) {
    pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
    col_id_t colId = pColInfoData->info.colId;

    if (colId == pSchema->columns[j].colId) {
      tTSRowGetVal(pTSRow, pReader->pSchema, j, &colVal);
      doCopyColVal(pColInfoData, numOfRows, i, &colVal, pSupInfo);
      i += 1;
      j += 1;
    } else if (colId < pSchema->columns[j].colId) {
      colDataAppendNULL(pColInfoData, numOfRows);
      i += 1;
    } else if (colId > pSchema->columns[j].colId) {
      j += 1;
2358
    }
2359 2360
  }

2361
  // set null value since current column does not exist in the "pSchema"
2362
  while (i < numOfCols) {
2363 2364 2365 2366 2367
    pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
    colDataAppendNULL(pColInfoData, numOfRows);
    i += 1;
  }

2368 2369 2370 2371
  pBlock->info.rows += 1;
  return TSDB_CODE_SUCCESS;
}

2372 2373
int32_t buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, int64_t endKey, int32_t capacity,
                                  STsdbReader* pReader) {
H
Haojun Liao 已提交
2374 2375 2376 2377
  SSDataBlock* pBlock = pReader->pResBlock;

  do {
    STSRow* pTSRow = NULL;
2378
    tsdbGetNextRowInMem(pBlockScanInfo, pReader, &pTSRow, endKey);
2379 2380
    if (pTSRow == NULL) {
      break;
H
Haojun Liao 已提交
2381 2382
    }

2383
    doAppendOneRow(pBlock, pReader, pTSRow);
2384
    taosMemoryFree(pTSRow);
H
Haojun Liao 已提交
2385 2386

    // no data in buffer, return immediately
2387
    if (!(pBlockScanInfo->iter.hasVal || pBlockScanInfo->iiter.hasVal)) {
H
Haojun Liao 已提交
2388 2389 2390
      break;
    }

2391
    if (pBlock->info.rows >= capacity) {
H
Haojun Liao 已提交
2392 2393 2394 2395
      break;
    }
  } while (1);

2396
  ASSERT(pBlock->info.rows <= capacity);
H
Haojun Liao 已提交
2397 2398
  return TSDB_CODE_SUCCESS;
}
H
Hongze Cheng 已提交
2399

2400
// todo refactor, use arraylist instead
H
Hongze Cheng 已提交
2401
int32_t tsdbSetTableId(STsdbReader* pReader, int64_t uid) {
2402 2403 2404 2405 2406
  ASSERT(pReader != NULL);
  taosHashClear(pReader->status.pTableMap);

  STableBlockScanInfo info = {.lastKey = 0, .uid = uid};
  taosHashPut(pReader->status.pTableMap, &info.uid, sizeof(uint64_t), &info, sizeof(info));
H
Hongze Cheng 已提交
2407 2408 2409
  return TDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
2410 2411 2412 2413 2414 2415
void* tsdbGetIdx(SMeta* pMeta) {
  if (pMeta == NULL) {
    return NULL;
  }
  return metaGetIdx(pMeta);
}
dengyihao's avatar
dengyihao 已提交
2416

dengyihao's avatar
dengyihao 已提交
2417 2418 2419 2420 2421 2422
void* tsdbGetIvtIdx(SMeta* pMeta) {
  if (pMeta == NULL) {
    return NULL;
  }
  return metaGetIvtIdx(pMeta);
}
L
Liu Jicong 已提交
2423

C
Cary Xu 已提交
2424 2425 2426 2427 2428 2429 2430 2431 2432 2433
/**
 * @brief Get all suids since suid
 *
 * @param pMeta
 * @param suid return all suids in one vnode if suid is 0
 * @param list
 * @return int32_t
 */
int32_t tsdbGetStbIdList(SMeta* pMeta, int64_t suid, SArray* list) {
  SMStbCursor* pCur = metaOpenStbCursor(pMeta, suid);
L
Liu Jicong 已提交
2434
  if (!pCur) {
C
Cary Xu 已提交
2435 2436
    return TSDB_CODE_FAILED;
  }
C
Cary Xu 已提交
2437 2438 2439 2440 2441 2442 2443 2444 2445 2446 2447 2448 2449 2450

  while (1) {
    tb_uid_t id = metaStbCursorNext(pCur);
    if (id == 0) {
      break;
    }

    taosArrayPush(list, &id);
  }

  metaCloseStbCursor(pCur);
  return TSDB_CODE_SUCCESS;
}

H
refact  
Hongze Cheng 已提交
2451
// ====================================== EXPOSED APIs ======================================
2452 2453
int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, SArray* pTableList, STsdbReader** ppReader,
                       const char* idstr) {
2454 2455
  int32_t code = tsdbReaderCreate(pVnode, pCond, ppReader, 4096, idstr);
  if (code != TSDB_CODE_SUCCESS) {
H
Haojun Liao 已提交
2456 2457
    goto _err;
  }
H
Hongze Cheng 已提交
2458

2459
  // check for query time window
H
Haojun Liao 已提交
2460
  STsdbReader* pReader = *ppReader;
2461
  if (isEmptyQueryTimeWindow(&pReader->window)) {
H
Haojun Liao 已提交
2462 2463 2464
    tsdbDebug("%p query window not overlaps with the data set, no result returned, %s", pReader, pReader->idStr);
    return TSDB_CODE_SUCCESS;
  }
H
Hongze Cheng 已提交
2465

2466 2467 2468 2469 2470 2471 2472 2473 2474 2475 2476 2477 2478 2479 2480 2481 2482 2483 2484 2485 2486 2487 2488 2489 2490 2491 2492 2493 2494 2495 2496 2497 2498 2499 2500 2501 2502 2503 2504
  if (pCond->type == TIMEWINDOW_RANGE_EXTERNAL) {
    // update the SQueryTableDataCond to create inner reader
    STimeWindow w = pCond->twindows;
    int32_t order = pCond->order;
    if (order == TSDB_ORDER_ASC) {
      pCond->twindows.ekey = pCond->twindows.skey;
      pCond->twindows.skey = INT64_MIN;
      pCond->order = TSDB_ORDER_DESC;
    } else {
      pCond->twindows.skey = pCond->twindows.ekey;
      pCond->twindows.ekey = INT64_MAX;
      pCond->order = TSDB_ORDER_ASC;
    }

    code = tsdbReaderCreate(pVnode, pCond, &pReader->innerReader[0], 1, idstr);
    if (code != TSDB_CODE_SUCCESS) {
      goto _err;
    }

    if (order == TSDB_ORDER_ASC) {
      pCond->twindows.skey = w.ekey;
      pCond->twindows.ekey = INT64_MAX;
    } else  {
      pCond->twindows.skey = INT64_MIN;
      pCond->twindows.ekey = w.ekey;
    }
    code = tsdbReaderCreate(pVnode, pCond, &pReader->innerReader[1], 1, idstr);
    if (code != TSDB_CODE_SUCCESS) {
      goto _err;
    }
  }

  if (pCond->suid != 0) {
    pReader->pSchema = metaGetTbTSchema(pReader->pTsdb->pVnode->pMeta, pReader->suid, -1);
  } else if (taosArrayGetSize(pTableList) > 0) {
    STableKeyInfo* pKey = taosArrayGet(pTableList, 0);
    pReader->pSchema = metaGetTbTSchema(pReader->pTsdb->pVnode->pMeta, pKey->uid, -1);
  }

2505 2506
  int32_t numOfTables = taosArrayGetSize(pTableList);
  pReader->status.pTableMap = createDataBlockScanInfo(pReader, pTableList->pData, numOfTables);
H
Haojun Liao 已提交
2507 2508 2509
  if (pReader->status.pTableMap == NULL) {
    tsdbReaderClose(pReader);
    *ppReader = NULL;
H
Haojun Liao 已提交
2510

H
Haojun Liao 已提交
2511 2512 2513
    code = TSDB_CODE_TDB_OUT_OF_MEMORY;
    goto _err;
  }
H
Hongze Cheng 已提交
2514

H
Hongze Cheng 已提交
2515
  code = tsdbTakeReadSnap(pReader->pTsdb, &pReader->pReadSnap);
2516 2517 2518
  if (code != TSDB_CODE_SUCCESS) {
    goto _err;
  }
H
Hongze Cheng 已提交
2519

2520 2521
  if (pReader->type == TIMEWINDOW_RANGE_CONTAINED) {
    SDataBlockIter* pBlockIter = &pReader->status.blockIter;
2522

2523
    initFilesetIterator(&pReader->status.fileIter, pReader->pReadSnap->fs.aDFileSet, pReader->order, pReader->idStr);
2524
    resetDataBlockIterator(&pReader->status.blockIter, pReader->order, pReader->status.pTableMap);
2525 2526 2527 2528 2529 2530 2531 2532 2533 2534

    // no data in files, let's try buffer in memory
    if (pReader->status.fileIter.numOfFiles == 0) {
      pReader->status.loadFromFile = false;
    } else {
      code = initForFirstBlockInFile(pReader, pBlockIter);
      if (code != TSDB_CODE_SUCCESS) {
        return code;
      }
    }
2535
  } else {
2536 2537 2538 2539
    STsdbReader* pPrevReader = pReader->innerReader[0];
    SDataBlockIter* pBlockIter = &pPrevReader->status.blockIter;

    initFilesetIterator(&pPrevReader->status.fileIter, pPrevReader->pReadSnap->fs.aDFileSet, pPrevReader->order, pPrevReader->idStr);
2540
    resetDataBlockIterator(&pPrevReader->status.blockIter, pPrevReader->order, pReader->status.pTableMap);
2541 2542 2543 2544 2545 2546 2547 2548 2549

    // no data in files, let's try buffer in memory
    if (pPrevReader->status.fileIter.numOfFiles == 0) {
      pPrevReader->status.loadFromFile = false;
    } else {
      code = initForFirstBlockInFile(pPrevReader, pBlockIter);
      if (code != TSDB_CODE_SUCCESS) {
        return code;
      }
2550 2551 2552
    }
  }

2553
  tsdbDebug("%p total numOfTable:%d in this query %s", pReader, numOfTables, pReader->idStr);
H
Hongze Cheng 已提交
2554
  return code;
H
Hongze Cheng 已提交
2555 2556

_err:
2557
  tsdbError("failed to create data reader, code: %s %s", tstrerror(code), pReader->idStr);
H
Hongze Cheng 已提交
2558
  return code;
H
refact  
Hongze Cheng 已提交
2559 2560 2561
}

void tsdbReaderClose(STsdbReader* pReader) {
2562 2563
  if (pReader == NULL) {
    return;
2564
  }
H
refact  
Hongze Cheng 已提交
2565

2566 2567
  SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo;

H
Hongze Cheng 已提交
2568
  tsdbUntakeReadSnap(pReader->pTsdb, pReader->pReadSnap);
H
Hongze Cheng 已提交
2569

2570 2571 2572 2573
  taosMemoryFreeClear(pSupInfo->plist);
  taosMemoryFree(pSupInfo->colIds);

  taosArrayDestroy(pSupInfo->pColAgg);
L
Liu Jicong 已提交
2574
  for (int32_t i = 0; i < blockDataGetNumOfCols(pReader->pResBlock); ++i) {
2575 2576 2577 2578 2579
    if (pSupInfo->buildBuf[i] != NULL) {
      taosMemoryFreeClear(pSupInfo->buildBuf[i]);
    }
  }
  taosMemoryFree(pSupInfo->buildBuf);
H
Haojun Liao 已提交
2580
  tBlockDataClear(&pReader->status.fileBlockData, true);
2581 2582

  cleanupDataBlockIterator(&pReader->status.blockIter);
2583 2584

  size_t numOfTables = taosHashGetSize(pReader->status.pTableMap);
2585
  destroyBlockScanInfo(pReader->status.pTableMap);
2586
  blockDataDestroy(pReader->pResBlock);
2587

H
Haojun Liao 已提交
2588 2589 2590
  if (pReader->pFileReader != NULL) {
    tsdbDataFReaderClose(&pReader->pFileReader);
  }
H
refact  
Hongze Cheng 已提交
2591

2592
  SIOCostSummary* pCost = &pReader->cost;
H
refact  
Hongze Cheng 已提交
2593

2594
  tsdbDebug("%p :io-cost summary: head-file:%" PRIu64 ", head-file time:%.2f ms, SMA:%"PRId64" SMA-time:%.2f ms, "
2595 2596
            "fileBlocks:%"PRId64", fileBlocks-time:%.2f ms, build in-memory-block-time:%.2f ms, STableBlockScanInfo "
                                "size:%.2f Kb %s",
2597
            pReader, pCost->headFileLoad, pCost->headFileLoadTime, pCost->smaData, pCost->smaLoadTime,
2598 2599
            pCost->numOfBlocks, pCost->blockLoadTime, pCost->buildmemBlock,
            numOfTables * sizeof(STableBlockScanInfo) /1000.0, pReader->idStr);
H
refact  
Hongze Cheng 已提交
2600

2601 2602 2603
  taosMemoryFree(pReader->idStr);
  taosMemoryFree(pReader->pSchema);
  taosMemoryFreeClear(pReader);
H
refact  
Hongze Cheng 已提交
2604 2605
}

2606
static bool doTsdbNextDataBlock(STsdbReader* pReader) {
H
Haojun Liao 已提交
2607
  // cleanup the data that belongs to the previous data block
2608 2609
  SSDataBlock* pBlock = pReader->pResBlock;
  blockDataCleanup(pBlock);
H
Hongze Cheng 已提交
2610

2611
  SReaderStatus* pStatus = &pReader->status;
H
Haojun Liao 已提交
2612

2613 2614 2615 2616 2617
  if (pStatus->loadFromFile) {
    int32_t code = buildBlockFromFiles(pReader);
    if (code != TSDB_CODE_SUCCESS) {
      return false;
    }
2618

2619 2620 2621
    if (pBlock->info.rows > 0) {
      return true;
    } else {
H
Haojun Liao 已提交
2622
      buildBlockFromBufferSequentially(pReader);
2623
      return pBlock->info.rows > 0;
H
Haojun Liao 已提交
2624
    }
2625 2626 2627
  } else {  // no data in files, let's try the buffer
    buildBlockFromBufferSequentially(pReader);
    return pBlock->info.rows > 0;
H
Haojun Liao 已提交
2628
  }
2629

2630
  return false;
H
refact  
Hongze Cheng 已提交
2631 2632
}

2633 2634 2635 2636 2637 2638 2639 2640 2641 2642 2643 2644 2645 2646 2647 2648 2649 2650 2651 2652 2653 2654 2655 2656 2657 2658 2659 2660 2661 2662 2663 2664 2665 2666 2667 2668 2669
bool tsdbNextDataBlock(STsdbReader* pReader) {
  if (isEmptyQueryTimeWindow(&pReader->window)) {
    return false;
  }

  if (pReader->innerReader[0] != NULL) {
    bool ret = doTsdbNextDataBlock(pReader->innerReader[0]);
    if (ret) {
      pReader->step = EXTERNAL_ROWS_PREV;
      return ret;
    }

    tsdbReaderClose(pReader->innerReader[0]);
    pReader->innerReader[0] = NULL;
  }

  pReader->step = EXTERNAL_ROWS_MAIN;
  bool ret = doTsdbNextDataBlock(pReader);
  if (ret) {
    return ret;
  }

  if (pReader->innerReader[1] != NULL) {
    bool ret1 = doTsdbNextDataBlock(pReader->innerReader[1]);
    if (ret1) {
      pReader->step = EXTERNAL_ROWS_NEXT;
      return ret1;
    }

    tsdbReaderClose(pReader->innerReader[1]);
    pReader->innerReader[1] = NULL;
  }

  return false;
}

static void setBlockInfo(STsdbReader* pReader, SDataBlockInfo* pDataBlockInfo) {
2670 2671 2672 2673
  ASSERT(pDataBlockInfo != NULL && pReader != NULL);
  pDataBlockInfo->rows = pReader->pResBlock->info.rows;
  pDataBlockInfo->uid = pReader->pResBlock->info.uid;
  pDataBlockInfo->window = pReader->pResBlock->info.window;
H
Hongze Cheng 已提交
2674 2675
}

2676 2677 2678 2679 2680 2681 2682 2683 2684 2685 2686 2687 2688 2689
void tsdbRetrieveDataBlockInfo(STsdbReader* pReader, SDataBlockInfo* pDataBlockInfo) {
  if (pReader->type == TIMEWINDOW_RANGE_EXTERNAL) {
    if (pReader->step ==  EXTERNAL_ROWS_MAIN) {
      setBlockInfo(pReader, pDataBlockInfo);
    } else if (pReader->step ==  EXTERNAL_ROWS_PREV) {
      setBlockInfo(pReader->innerReader[0], pDataBlockInfo);
    } else {
      setBlockInfo(pReader->innerReader[1], pDataBlockInfo);
    }
  } else {
    setBlockInfo(pReader, pDataBlockInfo);
  }
}

2690
int32_t tsdbRetrieveDatablockSMA(STsdbReader* pReader, SColumnDataAgg*** pBlockStatis, bool* allHave) {
H
Hongze Cheng 已提交
2691
  int32_t code = 0;
2692
  *allHave = false;
H
Hongze Cheng 已提交
2693

2694 2695 2696 2697 2698
  if(pReader->type == TIMEWINDOW_RANGE_EXTERNAL) {
    *pBlockStatis = NULL;
    return TSDB_CODE_SUCCESS;
  }

2699
  // there is no statistics data for composed block
2700 2701 2702 2703
  if (pReader->status.composedDataBlock) {
    *pBlockStatis = NULL;
    return TSDB_CODE_SUCCESS;
  }
H
Hongze Cheng 已提交
2704

2705
  SFileDataBlockInfo*  pFBlock = getCurrentBlockInfo(&pReader->status.blockIter);
H
Hongze Cheng 已提交
2706

2707
  SBlock* pBlock = getCurrentBlock(&pReader->status.blockIter);
2708
  int64_t stime = taosGetTimestampUs();
H
Hongze Cheng 已提交
2709

2710 2711
  SBlockLoadSuppInfo* pSup = &pReader->suppInfo;

2712
  if (tBlockHasSma(pBlock)) {
2713
    code = tsdbReadBlockSma(pReader->pFileReader, pBlock, pSup->pColAgg, NULL);
2714
    if (code != TSDB_CODE_SUCCESS) {
2715 2716
      tsdbDebug("vgId:%d, failed to load block SMA for uid %" PRIu64 ", code:%s, %s", 0, pFBlock->uid, tstrerror(code),
                pReader->idStr);
2717 2718
      return code;
    }
2719 2720 2721
  } else {
    *pBlockStatis = NULL;
    return TSDB_CODE_SUCCESS;
2722
  }
H
Hongze Cheng 已提交
2723

2724
  *allHave = true;
H
Hongze Cheng 已提交
2725

2726 2727
  // always load the first primary timestamp column data
  SColumnDataAgg* pTsAgg = &pSup->tsColAgg;
2728

2729 2730
  pTsAgg->numOfNull = 0;
  pTsAgg->colId = PRIMARYKEY_TIMESTAMP_COL_ID;
2731 2732 2733 2734 2735 2736 2737 2738 2739 2740 2741 2742 2743 2744 2745 2746
  pTsAgg->min = pReader->pResBlock->info.window.skey;
  pTsAgg->max = pReader->pResBlock->info.window.ekey;
  pSup->plist[0] = pTsAgg;

  // update the number of NULL data rows
  size_t numOfCols = blockDataGetNumOfCols(pReader->pResBlock);

  int32_t i = 0, j = 0;
  while (j < numOfCols && i < taosArrayGetSize(pSup->pColAgg)) {
    SColumnDataAgg* pAgg = taosArrayGet(pSup->pColAgg, i);
    if (pAgg->colId == pSup->colIds[j]) {
      if (IS_BSMA_ON(&(pReader->pSchema->columns[i]))) {
        pSup->plist[j] = pAgg;
      } else {
        *allHave = false;
      }
2747 2748
      i += 1;
      j += 1;
2749 2750 2751 2752 2753 2754 2755
    } else if (pAgg->colId < pSup->colIds[j]) {
      i += 1;
    } else if (pSup->colIds[j] < pAgg->colId) {
      j += 1;
    }
  }

2756
  double elapsed = (taosGetTimestampUs() - stime) / 1000.0;
2757
  pReader->cost.smaLoadTime += elapsed;
2758
  pReader->cost.smaData += 1;
2759 2760 2761

  *pBlockStatis = pSup->plist;

2762
  tsdbDebug("vgId:%d, succeed to load block SMA for uid %" PRIu64 ", elapsed time:%.2f ms, %s", 0, pFBlock->uid,
2763 2764
            elapsed, pReader->idStr);

H
Hongze Cheng 已提交
2765
  return code;
H
Hongze Cheng 已提交
2766 2767
}

2768
static SArray* doRetrieveDataBlock(STsdbReader* pReader) {
H
Haojun Liao 已提交
2769 2770 2771
  SReaderStatus* pStatus = &pReader->status;

  if (pStatus->composedDataBlock) {
2772
    return pReader->pResBlock->pDataBlock;
2773
  }
2774

2775
  SFileDataBlockInfo*  pFBlock = getCurrentBlockInfo(&pStatus->blockIter);
2776
  STableBlockScanInfo* pBlockScanInfo = taosHashGet(pStatus->pTableMap, &pFBlock->uid, sizeof(pFBlock->uid));
2777

H
Haojun Liao 已提交
2778 2779 2780
  tBlockDataReset(&pStatus->fileBlockData);
  tBlockDataClearData(&pStatus->fileBlockData);
  int32_t code = doLoadFileBlockData(pReader, &pStatus->blockIter, pBlockScanInfo, &pStatus->fileBlockData);
2781
  if (code != TSDB_CODE_SUCCESS) {
H
Hongze Cheng 已提交
2782
    tBlockDataClear(&pStatus->fileBlockData, 1);
H
Haojun Liao 已提交
2783

2784 2785
    terrno = code;
    return NULL;
2786
  }
2787 2788 2789

  copyBlockDataToSDataBlock(pReader, pBlockScanInfo);
  return pReader->pResBlock->pDataBlock;
H
Hongze Cheng 已提交
2790 2791
}

2792 2793 2794 2795 2796 2797 2798 2799 2800 2801 2802 2803
SArray* tsdbRetrieveDataBlock(STsdbReader* pReader, SArray* pIdList) {
  if (pReader->type == TIMEWINDOW_RANGE_EXTERNAL) {
    if (pReader->step == EXTERNAL_ROWS_PREV) {
      return doRetrieveDataBlock(pReader->innerReader[0]);
    } else if (pReader->step == EXTERNAL_ROWS_NEXT) {
      return doRetrieveDataBlock(pReader->innerReader[1]);
    }
  }

  return doRetrieveDataBlock(pReader);
}

H
Haojun Liao 已提交
2804
int32_t tsdbReaderReset(STsdbReader* pReader, SQueryTableDataCond* pCond) {
2805 2806 2807
  if (isEmptyQueryTimeWindow(&pReader->window)) {
    return TSDB_CODE_SUCCESS;
  }
H
Hongze Cheng 已提交
2808

L
Liu Jicong 已提交
2809
  pReader->order = pCond->order;
2810
  pReader->type  = TIMEWINDOW_RANGE_CONTAINED;
2811
  pReader->status.loadFromFile = true;
dengyihao's avatar
dengyihao 已提交
2812
  pReader->status.pTableIter = NULL;
H
Haojun Liao 已提交
2813
  pReader->window = updateQueryTimeWindow(pReader->pTsdb, &pCond->twindows);
H
Hongze Cheng 已提交
2814

2815
  // allocate buffer in order to load data blocks from file
2816
  memset(&pReader->suppInfo.tsColAgg, 0, sizeof(SColumnDataAgg));
2817 2818
  memset(pReader->suppInfo.plist, 0, POINTER_BYTES);

2819
  pReader->suppInfo.tsColAgg.colId = PRIMARYKEY_TIMESTAMP_COL_ID;
2820
  tsdbDataFReaderClose(&pReader->pFileReader);
2821

2822
  int32_t numOfTables = taosHashGetSize(pReader->status.pTableMap);
L
Liu Jicong 已提交
2823 2824
  tsdbDataFReaderClose(&pReader->pFileReader);

H
Hongze Cheng 已提交
2825
  initFilesetIterator(&pReader->status.fileIter, pReader->pReadSnap->fs.aDFileSet, pReader->order, pReader->idStr);
2826
  resetDataBlockIterator(&pReader->status.blockIter, pReader->order, pReader->status.pTableMap);
2827
  resetDataBlockScanInfo(pReader->status.pTableMap);
2828 2829

  int32_t code = 0;
2830 2831
  SDataBlockIter* pBlockIter = &pReader->status.blockIter;

2832 2833 2834 2835 2836 2837
  // no data in files, let's try buffer in memory
  if (pReader->status.fileIter.numOfFiles == 0) {
    pReader->status.loadFromFile = false;
  } else {
    code = initForFirstBlockInFile(pReader, pBlockIter);
    if (code != TSDB_CODE_SUCCESS) {
2838 2839
      tsdbError("%p reset reader failed, numOfTables:%d, query range:%" PRId64 " - %" PRId64 " in query %s",
                pReader, numOfTables, pReader->window.skey, pReader->window.ekey, pReader->idStr);
2840 2841 2842
      return code;
    }
  }
H
Hongze Cheng 已提交
2843

dengyihao's avatar
dengyihao 已提交
2844 2845
  tsdbDebug("%p reset reader, suid:%" PRIu64 ", numOfTables:%d, query range:%" PRId64 " - %" PRId64 " in query %s",
            pReader, pReader->suid, numOfTables, pReader->window.skey, pReader->window.ekey, pReader->idStr);
2846

2847
  return code;
H
Hongze Cheng 已提交
2848
}
H
Hongze Cheng 已提交
2849

2850 2851 2852
static int32_t getBucketIndex(int32_t startRow, int32_t bucketRange, int32_t numOfRows) {
  return (numOfRows - startRow) / bucketRange;
}
H
Hongze Cheng 已提交
2853

2854 2855 2856 2857
int32_t tsdbGetFileBlocksDistInfo(STsdbReader* pReader, STableBlockDistInfo* pTableBlockInfo) {
  int32_t code = TSDB_CODE_SUCCESS;
  pTableBlockInfo->totalSize = 0;
  pTableBlockInfo->totalRows = 0;
H
Hongze Cheng 已提交
2858

2859 2860
  // find the start data block in file
  SReaderStatus* pStatus = &pReader->status;
H
Hongze Cheng 已提交
2861

2862 2863 2864
  STsdbCfg* pc = &pReader->pTsdb->pVnode->config.tsdbCfg;
  pTableBlockInfo->defMinRows = pc->minRows;
  pTableBlockInfo->defMaxRows = pc->maxRows;
H
Hongze Cheng 已提交
2865

2866
  int32_t bucketRange = ceil((pc->maxRows - pc->minRows) / 20.0);
H
Hongze Cheng 已提交
2867

2868
  pTableBlockInfo->numOfFiles += 1;
H
Hongze Cheng 已提交
2869

2870 2871
  int32_t numOfTables = (int32_t)taosHashGetSize(pStatus->pTableMap);
  int     defaultRows = 4096;
H
Hongze Cheng 已提交
2872

2873 2874
  SDataBlockIter* pBlockIter = &pStatus->blockIter;
  pTableBlockInfo->numOfFiles += pStatus->fileIter.numOfFiles;
H
Haojun Liao 已提交
2875 2876 2877 2878

  if (pBlockIter->numOfBlocks > 0) {
    pTableBlockInfo->numOfBlocks += pBlockIter->numOfBlocks;
  }
H
Hongze Cheng 已提交
2879

2880
  pTableBlockInfo->numOfTables = numOfTables;
H
Haojun Liao 已提交
2881
  bool hasNext = (pBlockIter->numOfBlocks > 0);
H
Hongze Cheng 已提交
2882

2883 2884
  while (true) {
    if (hasNext) {
H
Haojun Liao 已提交
2885
      SBlock* pBlock = getCurrentBlock(pBlockIter);
H
Hongze Cheng 已提交
2886

2887 2888
      int32_t numOfRows = pBlock->nRow;
      pTableBlockInfo->totalRows += numOfRows;
H
Hongze Cheng 已提交
2889

2890 2891 2892
      if (numOfRows > pTableBlockInfo->maxRows) {
        pTableBlockInfo->maxRows = numOfRows;
      }
H
refact  
Hongze Cheng 已提交
2893

2894 2895 2896
      if (numOfRows < pTableBlockInfo->minRows) {
        pTableBlockInfo->minRows = numOfRows;
      }
H
refact  
Hongze Cheng 已提交
2897

2898 2899 2900
      if (numOfRows < defaultRows) {
        pTableBlockInfo->numOfSmallBlocks += 1;
      }
H
refact  
Hongze Cheng 已提交
2901

2902 2903
      int32_t bucketIndex = getBucketIndex(pTableBlockInfo->defMinRows, bucketRange, numOfRows);
      pTableBlockInfo->blockRowsHisto[bucketIndex]++;
2904 2905

      hasNext = blockIteratorNext(&pStatus->blockIter);
2906 2907 2908 2909 2910
    } else {
      code = initForFirstBlockInFile(pReader, pBlockIter);
      if ((code != TSDB_CODE_SUCCESS) || (pReader->status.loadFromFile == false)) {
        break;
      }
H
refact  
Hongze Cheng 已提交
2911

2912
      pTableBlockInfo->numOfBlocks += pBlockIter->numOfBlocks;
2913
      hasNext = (pBlockIter->numOfBlocks > 0);
2914
    }
H
refact  
Hongze Cheng 已提交
2915

H
Hongze Cheng 已提交
2916 2917
    //    tsdbDebug("%p %d blocks found in file for %d table(s), fid:%d, %s", pReader, numOfBlocks, numOfTables,
    //              pReader->pFileGroup->fid, pReader->idStr);
2918
  }
H
Hongze Cheng 已提交
2919

H
refact  
Hongze Cheng 已提交
2920 2921
  return code;
}
H
Hongze Cheng 已提交
2922

H
refact  
Hongze Cheng 已提交
2923
int64_t tsdbGetNumOfRowsInMemTable(STsdbReader* pReader) {
2924
  int64_t rows = 0;
H
Hongze Cheng 已提交
2925

2926 2927
  SReaderStatus* pStatus = &pReader->status;
  pStatus->pTableIter = taosHashIterate(pStatus->pTableMap, NULL);
H
Hongze Cheng 已提交
2928

2929 2930 2931 2932 2933
  while (pStatus->pTableIter != NULL) {
    STableBlockScanInfo* pBlockScanInfo = pStatus->pTableIter;

    STbData* d = NULL;
    if (pReader->pTsdb->mem != NULL) {
2934
      tsdbGetTbDataFromMemTable(pReader->pReadSnap->pMem, pReader->suid, pBlockScanInfo->uid, &d);
2935 2936 2937 2938 2939 2940 2941
      if (d != NULL) {
        rows += tsdbGetNRowsInTbData(d);
      }
    }

    STbData* di = NULL;
    if (pReader->pTsdb->imem != NULL) {
2942
      tsdbGetTbDataFromMemTable(pReader->pReadSnap->pIMem, pReader->suid, pBlockScanInfo->uid, &di);
2943 2944 2945 2946 2947 2948 2949 2950
      if (di != NULL) {
        rows += tsdbGetNRowsInTbData(di);
      }
    }

    // current table is exhausted, let's try the next table
    pStatus->pTableIter = taosHashIterate(pStatus->pTableMap, pStatus->pTableIter);
  }
H
Hongze Cheng 已提交
2951

H
refact  
Hongze Cheng 已提交
2952
  return rows;
H
Hongze Cheng 已提交
2953
}
D
dapan1121 已提交
2954

L
Liu Jicong 已提交
2955
int32_t tsdbGetTableSchema(SVnode* pVnode, int64_t uid, STSchema** pSchema, int64_t* suid) {
D
dapan1121 已提交
2956 2957 2958 2959 2960 2961 2962 2963 2964 2965 2966 2967
  int32_t sversion = 1;

  SMetaReader mr = {0};
  metaReaderInit(&mr, pVnode->pMeta, 0);
  int32_t code = metaGetTableEntryByUid(&mr, uid);
  if (code != TSDB_CODE_SUCCESS) {
    terrno = TSDB_CODE_TDB_INVALID_TABLE_ID;
    metaReaderClear(&mr);
    return terrno;
  }

  *suid = 0;
L
Liu Jicong 已提交
2968

D
dapan1121 已提交
2969
  if (mr.me.type == TSDB_CHILD_TABLE) {
D
dapan1121 已提交
2970
    tDecoderClear(&mr.coder);
D
dapan1121 已提交
2971 2972 2973 2974 2975 2976 2977 2978 2979 2980 2981 2982 2983 2984 2985
    *suid = mr.me.ctbEntry.suid;
    code = metaGetTableEntryByUid(&mr, *suid);
    if (code != TSDB_CODE_SUCCESS) {
      terrno = TSDB_CODE_TDB_INVALID_TABLE_ID;
      metaReaderClear(&mr);
      return terrno;
    }
    sversion = mr.me.stbEntry.schemaRow.version;
  } else {
    ASSERT(mr.me.type == TSDB_NORMAL_TABLE);
    sversion = mr.me.ntbEntry.schemaRow.version;
  }

  metaReaderClear(&mr);
  *pSchema = metaGetTbTSchema(pVnode->pMeta, uid, sversion);
L
Liu Jicong 已提交
2986

D
dapan1121 已提交
2987 2988
  return TSDB_CODE_SUCCESS;
}
H
Hongze Cheng 已提交
2989 2990 2991 2992 2993 2994 2995 2996 2997 2998 2999 3000 3001 3002 3003 3004 3005 3006 3007 3008 3009 3010 3011 3012 3013 3014 3015 3016 3017 3018

int32_t tsdbTakeReadSnap(STsdb* pTsdb, STsdbReadSnap** ppSnap) {
  int32_t code = 0;

  // alloc
  *ppSnap = (STsdbReadSnap*)taosMemoryCalloc(1, sizeof(STsdbReadSnap));
  if (*ppSnap == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _exit;
  }

  // lock
  code = taosThreadRwlockRdlock(&pTsdb->rwLock);
  if (code) {
    code = TAOS_SYSTEM_ERROR(code);
    goto _exit;
  }

  // take snapshot
  (*ppSnap)->pMem = pTsdb->mem;
  (*ppSnap)->pIMem = pTsdb->imem;

  if ((*ppSnap)->pMem) {
    tsdbRefMemTable((*ppSnap)->pMem);
  }

  if ((*ppSnap)->pIMem) {
    tsdbRefMemTable((*ppSnap)->pIMem);
  }

H
Hongze Cheng 已提交
3019
  // fs
H
Hongze Cheng 已提交
3020 3021 3022 3023 3024
  code = tsdbFSRef(pTsdb, &(*ppSnap)->fs);
  if (code) {
    taosThreadRwlockUnlock(&pTsdb->rwLock);
    goto _exit;
  }
H
Hongze Cheng 已提交
3025 3026 3027 3028 3029 3030 3031 3032

  // unlock
  code = taosThreadRwlockUnlock(&pTsdb->rwLock);
  if (code) {
    code = TAOS_SYSTEM_ERROR(code);
    goto _exit;
  }

H
Hongze Cheng 已提交
3033
  tsdbTrace("vgId:%d take read snapshot", TD_VID(pTsdb->pVnode));
H
Hongze Cheng 已提交
3034 3035 3036 3037 3038 3039 3040 3041 3042 3043 3044 3045 3046 3047
_exit:
  return code;
}

void tsdbUntakeReadSnap(STsdb* pTsdb, STsdbReadSnap* pSnap) {
  if (pSnap) {
    if (pSnap->pMem) {
      tsdbUnrefMemTable(pSnap->pMem);
    }

    if (pSnap->pIMem) {
      tsdbUnrefMemTable(pSnap->pIMem);
    }

H
Hongze Cheng 已提交
3048
    tsdbFSUnref(pTsdb, &pSnap->fs);
H
Hongze Cheng 已提交
3049
    taosMemoryFree(pSnap);
H
Hongze Cheng 已提交
3050
  }
H
Hongze Cheng 已提交
3051 3052

  tsdbTrace("vgId:%d untake read snapshot", TD_VID(pTsdb->pVnode));
H
Hongze Cheng 已提交
3053
}