tsdbRead.c 109.7 KB
Newer Older
H
hjxilinx 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

H
Hongze Cheng 已提交
16
#include "tsdb.h"
17
// True when the given scan order is ascending. The argument is parenthesized so that
// compound expressions (e.g. `a | b`, `c ? x : y`) are compared as a whole.
#define ASCENDING_TRAVERSE(o) ((o) == TSDB_ORDER_ASC)
H
Hongze Cheng 已提交
18

19
typedef struct {
dengyihao's avatar
dengyihao 已提交
20
  STbDataIter* iter;
21 22 23 24
  int32_t      index;
  bool         hasVal;
} SIterInfo;

H
Haojun Liao 已提交
25
typedef struct STableBlockScanInfo {
dengyihao's avatar
dengyihao 已提交
26 27 28 29 30 31 32 33 34
  uint64_t  uid;
  TSKEY     lastKey;
  SBlockIdx blockIdx;
  SArray*   pBlockList;  // block data index list
  SIterInfo iter;        // mem buffer skip list iterator
  SIterInfo iiter;       // imem buffer skip list iterator
  SArray*   delSkyline;  // delete info for this table
  int32_t   fileDelIndex;
  bool      iterInit;  // whether to initialize the in-memory skip list iterator or not
H
Haojun Liao 已提交
35 36 37
} STableBlockScanInfo;

typedef struct SBlockOrderWrapper {
dengyihao's avatar
dengyihao 已提交
38 39
  int64_t uid;
  SBlock* pBlock;
H
Haojun Liao 已提交
40
} SBlockOrderWrapper;
H
Hongze Cheng 已提交
41 42

typedef struct SBlockOrderSupporter {
43 44 45 46
  SBlockOrderWrapper** pDataBlockInfo;
  int32_t*             indexPerTable;
  int32_t*             numOfBlocksPerTable;
  int32_t              numOfTables;
H
Hongze Cheng 已提交
47 48 49
} SBlockOrderSupporter;

// Accumulated I/O cost counters for one reader.
// NOTE(review): time units are not established in this part of the file.
typedef struct SIOCostSummary {
  int64_t blockLoadTime;     // time spent loading data blocks
  int64_t smaLoadTime;       // time spent loading block SMA (pre-aggregation) data
  int64_t checkForNextTime;  // time spent checking for the next block/fileset
  int64_t headFileLoad;      // number of head file loads
  int64_t headFileLoadTime;  // time spent loading head files
} SIOCostSummary;

typedef struct SBlockLoadSuppInfo {
58
  SArray*          pColAgg;
59
  SColumnDataAgg   tsColAgg;
C
Cary Xu 已提交
60
  SColumnDataAgg** plist;
61 62
  int16_t*         colIds;    // column ids for loading file block data
  char**           buildBuf;  // build string tmp buffer, todo remove it later after all string format being updated.
H
Hongze Cheng 已提交
63 64
} SBlockLoadSuppInfo;

65
typedef struct SFilesetIter {
H
Hongze Cheng 已提交
66 67 68 69
  int32_t numOfFiles;  // number of total files
  int32_t index;       // current accessed index in the list
  SArray* pFileList;   // data file list
  int32_t order;
70
} SFilesetIter;
H
Haojun Liao 已提交
71 72

// Identifies one file data block: which table it belongs to and where it sits in that table's block list.
typedef struct SFileDataBlockInfo {
  // index position in STableBlockScanInfo in order to check whether neighbor block overlaps with it
  int32_t  tbBlockIdx;
  uint64_t uid;  // uid of the owning table
} SFileDataBlockInfo;

typedef struct SDataBlockIter {
dengyihao's avatar
dengyihao 已提交
79 80 81 82
  int32_t numOfBlocks;
  int32_t index;
  SArray* blockList;  // SArray<SFileDataBlockInfo>
  int32_t order;
H
Haojun Liao 已提交
83 84 85
} SDataBlockIter;

// Progress of copying the current file block into the result block.
typedef struct SFileBlockDumpInfo {
  int32_t totalRows;  // NOTE(review): presumably the row count of the block being dumped - confirm
  int32_t rowIndex;   // row position inside the block from which the next copy starts
  int64_t lastKey;    // set by setBlockAllDumped() to the block's max key +/- 1 depending on order
  bool    allDumped;  // true once the whole block has been consumed
} SFileBlockDumpInfo;

H
Haojun Liao 已提交
92
// Range of data versions visible to a query; blocks entirely outside [minVer, maxVer] are skipped.
typedef struct SVersionRange {
  uint64_t minVer;
  uint64_t maxVer;
} SVersionRange;

H
Haojun Liao 已提交
97
typedef struct SReaderStatus {
dengyihao's avatar
dengyihao 已提交
98 99
  bool                 loadFromFile;  // check file stage
  SHashObj*            pTableMap;     // SHash<STableBlockScanInfo>
100
  STableBlockScanInfo* pTableIter;    // table iterator used in building in-memory buffer data blocks.
101
  SFileBlockDumpInfo   fBlockDumpInfo;
102

dengyihao's avatar
dengyihao 已提交
103 104 105 106 107
  SDFileSet*     pCurrentFileset;  // current opened file set
  SBlockData     fileBlockData;
  SFilesetIter   fileIter;
  SDataBlockIter blockIter;
  bool           composedDataBlock;  // the returned data block is a composed block or not
H
Haojun Liao 已提交
108 109
} SReaderStatus;

H
Hongze Cheng 已提交
110
struct STsdbReader {
H
Haojun Liao 已提交
111 112 113 114 115 116 117
  STsdb*             pTsdb;
  uint64_t           suid;
  int16_t            order;
  STimeWindow        window;  // the primary query time window that applies to all queries
  SSDataBlock*       pResBlock;
  int32_t            capacity;
  SReaderStatus      status;
118 119
  char*              idStr;  // query info handle, for debug purpose
  int32_t            type;   // query type: 1. retrieve all data blocks, 2. retrieve direct prev|next rows
H
Hongze Cheng 已提交
120
  SBlockLoadSuppInfo suppInfo;
H
Hongze Cheng 已提交
121 122
  SMemTable*         pMem;
  SMemTable*         pIMem;
123

L
Liu Jicong 已提交
124 125 126 127
  SIOCostSummary cost;
  STSchema*      pSchema;
  SDataFReader*  pFileReader;
  SVersionRange  verRange;
H
Hongze Cheng 已提交
128
};
H
Hongze Cheng 已提交
129

H
Haojun Liao 已提交
130
static SFileDataBlockInfo* getCurrentBlockInfo(SDataBlockIter* pBlockIter);
131 132
static int      buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, int64_t endKey, int32_t capacity,
                                          STsdbReader* pReader);
133
static TSDBROW* getValidRow(SIterInfo* pIter, const SArray* pDelList, STsdbReader* pReader);
134 135
static int32_t  doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pScanInfo, STsdbReader* pReader,
                                        SRowMerger* pMerger);
dengyihao's avatar
dengyihao 已提交
136 137
static int32_t  doMergeRowsInBuf(SIterInfo* pIter, int64_t ts, SArray* pDelList, SRowMerger* pMerger,
                                 STsdbReader* pReader);
138 139 140
static int32_t  doAppendOneRow(SSDataBlock* pBlock, STsdbReader* pReader, STSRow* pTSRow);
static void     setComposedBlockFlag(STsdbReader* pReader, bool composed);
static void     updateSchema(TSDBROW* pRow, uint64_t uid, STsdbReader* pReader);
141
static bool     hasBeenDropped(const SArray* pDelList, int32_t* index, TSDBKEY* pKey, int32_t order);
142

dengyihao's avatar
dengyihao 已提交
143 144
static void doMergeMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter, SArray* pDelList, STSRow** pTSRow,
                             STsdbReader* pReader);
145 146
static void doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader,
                               STSRow** pTSRow);
dengyihao's avatar
dengyihao 已提交
147 148 149 150
static int32_t initDelSkylineIterator(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader, STbData* pMemTbData,
                                      STbData* piMemTbData);
static STsdb*  getTsdbByRetentions(SVnode* pVnode, TSKEY winSKey, SRetention* retentions, const char* idstr,
                                   int8_t* pLevel);
151
static SVersionRange getQueryVerRange(SVnode* pVnode, SQueryTableDataCond* pCond, int8_t level);
H
Haojun Liao 已提交
152

153 154 155
/**
 * Build the per-column helper arrays of pReader->suppInfo from the layout of the result block.
 *
 * colIds[i] receives the column id of the i-th column of pBlock. buildBuf[i] receives a scratch buffer of
 * pCol->info.bytes bytes when that column is a variable-length type and stays NULL otherwise.
 *
 * @param pReader reader whose suppInfo is filled in
 * @param pBlock  result block describing the output columns
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY when any allocation fails. On failure every buffer
 *         allocated here is released and colIds/buildBuf are reset to NULL so no stale pointer is left behind.
 */
static int32_t setColumnIdSlotList(STsdbReader* pReader, SSDataBlock* pBlock) {
  SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo;
  size_t              numOfCols = blockDataGetNumOfCols(pBlock);

  pSupInfo->colIds = taosMemoryMalloc(numOfCols * sizeof(int16_t));
  pSupInfo->buildBuf = taosMemoryCalloc(numOfCols, POINTER_BYTES);
  if (pSupInfo->buildBuf == NULL || pSupInfo->colIds == NULL) {
    goto _oom;
  }

  for (size_t i = 0; i < numOfCols; ++i) {
    SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, i);
    pSupInfo->colIds[i] = pCol->info.colId;

    if (IS_VAR_DATA_TYPE(pCol->info.type)) {
      pSupInfo->buildBuf[i] = taosMemoryMalloc(pCol->info.bytes);
      if (pSupInfo->buildBuf[i] == NULL) {
        goto _oom;
      }
    }
  }

  return TSDB_CODE_SUCCESS;

_oom:
  if (pSupInfo->buildBuf != NULL) {
    // the pointer array was calloc'ed, so slots that were never filled are NULL and safe to pass to free
    for (size_t i = 0; i < numOfCols; ++i) {
      taosMemoryFree(pSupInfo->buildBuf[i]);
    }
  }

  taosMemoryFree(pSupInfo->colIds);
  taosMemoryFree(pSupInfo->buildBuf);
  pSupInfo->colIds = NULL;
  pSupInfo->buildBuf = NULL;
  return TSDB_CODE_OUT_OF_MEMORY;
}
H
Hongze Cheng 已提交
177

178
/**
 * Create the uid -> STableBlockScanInfo map for the tables taking part in the query.
 *
 * Every entry starts with lastKey derived from the start of the reader's query window.
 *
 * @param pTsdbReader reader providing the traverse order and query window
 * @param idList      tables to scan
 * @param numOfTables number of entries in idList
 * @return the new hash map, or NULL if the map could not be created
 */
static SHashObj* createDataBlockScanInfo(STsdbReader* pTsdbReader, const STableKeyInfo* idList, int32_t numOfTables) {
  // allocate buffer in order to load data blocks from file
  // todo use simple hash instead
  SHashObj* pTableMap =
      taosHashInit(numOfTables, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
  if (pTableMap == NULL) {
    return NULL;
  }

  const STimeWindow* pWin = &pTsdbReader->window;
  const bool         ascending = ASCENDING_TRAVERSE(pTsdbReader->order);

  for (int32_t idx = 0; idx < numOfTables; ++idx) {
    STableBlockScanInfo scanInfo = {.lastKey = 0, .uid = idList[idx].uid};

    if (!ascending) {
      scanInfo.lastKey = pWin->skey;
    } else {
      if (scanInfo.lastKey == INT64_MIN || scanInfo.lastKey < pWin->skey) {
        scanInfo.lastKey = pWin->skey;
      }

      // NOTE(review): lastKey starts at 0, so a window lying entirely below 0 trips this assertion - confirm intent
      ASSERT(scanInfo.lastKey >= pWin->skey && scanInfo.lastKey <= pWin->ekey);
    }

    // NOTE(review): the result of taosHashPut is not checked
    taosHashPut(pTableMap, &scanInfo.uid, sizeof(uint64_t), &scanInfo, sizeof(scanInfo));
    tsdbDebug("%p check table uid:%" PRId64 " from lastKey:%" PRId64 " %s", pTsdbReader, scanInfo.uid,
              scanInfo.lastKey, pTsdbReader->idStr);
  }

  return pTableMap;
}
H
Hongze Cheng 已提交
206

207 208 209
/**
 * Reset the in-memory scan state of every table in the map so the iterators are rebuilt on next use.
 *
 * Both the mem and the imem iterator are released and flagged as empty, matching what
 * destroyBlockScanInfo() releases; the delete skyline is dropped as well. The block list and the map
 * itself are kept.
 */
static void resetDataBlockScanInfo(SHashObj* pTableMap) {
  STableBlockScanInfo* p = NULL;

  while ((p = taosHashIterate(pTableMap, p)) != NULL) {
    p->iterInit = false;
    p->iter.hasVal = false;
    p->iiter.hasVal = false;

    if (p->iter.iter != NULL) {
      p->iter.iter = tsdbTbDataIterDestroy(p->iter.iter);
    }

    // release the imem iterator too, otherwise it is left allocated while iterInit says "not initialized"
    if (p->iiter.iter != NULL) {
      p->iiter.iter = tsdbTbDataIterDestroy(p->iiter.iter);
    }

    p->delSkyline = taosArrayDestroy(p->delSkyline);
  }
}

221 222 223 224 225 226 227 228
/**
 * Release everything owned by the entries of the table map (iterators, delete skyline, block list)
 * and then the map itself.
 */
static void destroyBlockScanInfo(SHashObj* pTableMap) {
  for (STableBlockScanInfo* pInfo = taosHashIterate(pTableMap, NULL); pInfo != NULL;
       pInfo = taosHashIterate(pTableMap, pInfo)) {
    pInfo->iterInit = false;
    pInfo->iiter.hasVal = false;

    SIterInfo* pMemIter = &pInfo->iter;
    if (pMemIter->iter != NULL) {
      pMemIter->iter = tsdbTbDataIterDestroy(pMemIter->iter);
    }

    SIterInfo* pIMemIter = &pInfo->iiter;
    if (pIMemIter->iter != NULL) {
      pIMemIter->iter = tsdbTbDataIterDestroy(pIMemIter->iter);
    }

    pInfo->delSkyline = taosArrayDestroy(pInfo->delSkyline);
    pInfo->pBlockList = taosArrayDestroy(pInfo->pBlockList);
  }

  taosHashCleanup(pTableMap);
}

243
// A query window is empty when its end key lies before its start key.
static bool isEmptyQueryTimeWindow(STimeWindow* pWindow) {
  ASSERT(pWindow != NULL);

  const bool endBeforeStart = pWindow->ekey < pWindow->skey;
  return endBeforeStart;
}
H
Hongze Cheng 已提交
247

248 249 250
// Clip the query time window according to the data retention (keep2) setting, so that expired data is
// never returned to the client even if the query asks for it.
// Returns a copy of *pWindow whose skey is raised to the earliest non-expired timestamp when necessary.
static STimeWindow updateQueryTimeWindow(STsdb* pTsdb, STimeWindow* pWindow) {
  STsdbKeepCfg* pKeepCfg = &pTsdb->keepCfg;

  const int64_t current = taosGetTimestamp(pKeepCfg->precision);
  const int64_t keepSpan = tsTickPerMin[pKeepCfg->precision] * pKeepCfg->keep2;
  const int64_t earliestTs = current - keepSpan + 1;  // needs to add one tick

  STimeWindow clipped = *pWindow;
  clipped.skey = (clipped.skey < earliestTs) ? earliestTs : clipped.skey;
  return clipped;
}
H
Hongze Cheng 已提交
263

H
Haojun Liao 已提交
264
/**
 * Shrink *capacity (a row count) so that one output block of the queried columns stays below 2MB.
 *
 * The row length is the sum of the byte widths of all queried columns. The size check is done in 64-bit
 * arithmetic so that a large capacity times a wide row cannot overflow a signed 32-bit integer.
 *
 * @param pCond    query condition providing the column list
 * @param capacity in/out: requested row capacity, lowered in place when the block would exceed 2MB
 */
static void limitOutputBufferSize(const SQueryTableDataCond* pCond, int32_t* capacity) {
  int64_t rowLen = 0;
  for (int32_t i = 0; i < pCond->numOfCols; ++i) {
    rowLen += pCond->colList[i].bytes;
  }

  // make sure the output SSDataBlock size be less than 2MB.
  const int64_t TWOMB = 2 * 1024 * 1024;
  if ((int64_t)(*capacity) * rowLen > TWOMB) {
    (*capacity) = (int32_t)(TWOMB / rowLen);
  }
}

// init file iterator
278
static int32_t initFilesetIterator(SFilesetIter* pIter, const STsdbFSState* pFState, int32_t order, const char* idstr) {
279 280
  size_t numOfFileset = taosArrayGetSize(pFState->aDFileSet);

281 282 283
  pIter->index = ASCENDING_TRAVERSE(order) ? -1 : numOfFileset;
  pIter->order = order;
  pIter->pFileList = taosArrayDup(pFState->aDFileSet);
284
  pIter->numOfFiles = numOfFileset;
H
Haojun Liao 已提交
285

H
Haojun Liao 已提交
286
  tsdbDebug("init fileset iterator, total files:%d %s", pIter->numOfFiles, idstr);
H
Haojun Liao 已提交
287 288 289
  return TSDB_CODE_SUCCESS;
}

L
Liu Jicong 已提交
290
// Release the file list copy held by the iterator and clear the pointer so it cannot be freed twice.
static void cleanupFilesetIterator(SFilesetIter* pIter) { pIter->pFileList = taosArrayDestroy(pIter->pFileList); }
291

292
/**
 * Advance the iterator to the next file set that overlaps the reader's query time window and open it.
 *
 * Any previously opened file reader is closed first. File sets that end before (ascending) or start
 * after (descending) the query window are skipped; once a file set lies beyond the window in traverse
 * direction the remaining ones are ignored.
 *
 * @return true when a qualifying file set has been opened (pReader->pFileReader and
 *         pReader->status.pCurrentFileset refer to it); false when no file set is left or opening failed.
 */
static bool filesetIteratorNext(SFilesetIter* pIter, STsdbReader* pReader) {
  bool    asc = ASCENDING_TRAVERSE(pIter->order);
  int32_t step = asc ? 1 : -1;
  pIter->index += step;

  if ((asc && pIter->index >= pIter->numOfFiles) || ((!asc) && pIter->index < 0)) {
    return false;
  }

  // check file the time range of coverage
  STimeWindow win = {0};

  while (1) {
    if (pReader->pFileReader != NULL) {
      tsdbDataFReaderClose(&pReader->pFileReader);
    }

    pReader->status.pCurrentFileset = (SDFileSet*)taosArrayGet(pIter->pFileList, pIter->index);

    int32_t code = tsdbDataFReaderOpen(&pReader->pFileReader, pReader->pTsdb, pReader->status.pCurrentFileset);
    if (code != TSDB_CODE_SUCCESS) {
      goto _err;
    }

    int32_t fid = pReader->status.pCurrentFileset->fid;
    tsdbFidKeyRange(fid, pReader->pTsdb->keepCfg.days, pReader->pTsdb->keepCfg.precision, &win.skey, &win.ekey);

    // current file are no longer overlapped with query time window, ignore remain files
    if ((asc && win.skey > pReader->window.ekey) || (!asc && win.ekey < pReader->window.skey)) {
      tsdbDebug("%p remain files are not qualified for qrange:%" PRId64 "-%" PRId64 ", ignore, %s", pReader,
                pReader->window.skey, pReader->window.ekey, pReader->idStr);
      return false;
    }

    // this file set lies entirely before the window in traverse direction: try the next one
    if ((asc && (win.ekey < pReader->window.skey)) || ((!asc) && (win.skey > pReader->window.ekey))) {
      pIter->index += step;
      if ((asc && pIter->index >= pIter->numOfFiles) || ((!asc) && pIter->index < 0)) {
        return false;
      }
      continue;
    }

    // the file set overlaps the query window and is now open
    tsdbDebug("%p file found fid:%d for qrange:%" PRId64 "-%" PRId64 ", %s", pReader, fid, pReader->window.skey,
              pReader->window.ekey, pReader->idStr);
    return true;
  }

_err:
  // NOTE(review): the open error code is dropped here; callers only see "no more files"
  return false;
}

343
// Put the block iterator back to its initial state for the given order. The block list is created on
// first use and merely emptied afterwards.
static void resetDataBlockIterator(SDataBlockIter* pIter, int32_t order) {
  if (pIter->blockList != NULL) {
    taosArrayClear(pIter->blockList);
  } else {
    pIter->blockList = taosArrayInit(4, sizeof(SFileDataBlockInfo));
  }

  pIter->numOfBlocks = -1;
  pIter->index = -1;
  pIter->order = order;
}

L
Liu Jicong 已提交
354
// Release the block list held by the iterator and clear the pointer so it cannot be freed twice.
static void cleanupDataBlockIterator(SDataBlockIter* pIter) { pIter->blockList = taosArrayDestroy(pIter->blockList); }
H
Haojun Liao 已提交
355

H
Haojun Liao 已提交
356
// Initial traversal state: start with the file stage and no current table.
static void initReaderStatus(SReaderStatus* pStatus) {
  pStatus->loadFromFile = true;
  pStatus->pTableIter = NULL;
}

361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383
/**
 * Create the output data block with one column per entry of pCond->colList and room for `capacity` rows.
 *
 * @return the new block, or NULL on failure with terrno set to the error code.
 */
static SSDataBlock* createResBlock(SQueryTableDataCond* pCond, int32_t capacity) {
  SSDataBlock* pResBlock = createDataBlock();
  if (pResBlock == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  for (int32_t i = 0; i < pCond->numOfCols; ++i) {
    SColumnInfoData colInfo = {{0}, 0};
    colInfo.info = pCond->colList[i];
    blockDataAppendColInfo(pResBlock, &colInfo);
  }

  int32_t code = blockDataEnsureCapacity(pResBlock, capacity);
  if (code != TSDB_CODE_SUCCESS) {
    terrno = code;
    // the block already owns its column array, so it must be torn down as a block, not as a plain buffer
    blockDataDestroy(pResBlock);
    return NULL;
  }

  return pResBlock;
}

H
Haojun Liao 已提交
384 385
/**
 * Allocate a reader and initialise it from the query condition.
 *
 * @param pVnode   vnode owning the data
 * @param pCond    query condition (time window, order, column list, ...)
 * @param ppReader out: the new reader on success, NULL on failure
 * @param idstr    optional id string for logging; duplicated into the reader when not NULL
 * @return 0 on success, otherwise an error code; on failure the partially built reader is passed to
 *         tsdbReaderClose().
 */
static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, STsdbReader** ppReader, const char* idstr) {
  int8_t  level = 0;
  int32_t code = 0;

  STsdbReader* pReader = (STsdbReader*)taosMemoryCalloc(1, sizeof(*pReader));
  if (pReader == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _end;
  }

  initReaderStatus(&pReader->status);

  // the level reported here is needed below to derive the visible version range
  pReader->pTsdb = getTsdbByRetentions(pVnode, pCond->twindows.skey, pVnode->config.tsdbCfg.retentions, idstr, &level);

  pReader->suid = pCond->suid;
  pReader->order = pCond->order;
  pReader->type = pCond->type;
  pReader->capacity = 4096;

  if (idstr != NULL) {
    pReader->idStr = strdup(idstr);
  } else {
    pReader->idStr = NULL;
  }

  pReader->verRange = getQueryVerRange(pVnode, pCond, level);
  pReader->window = updateQueryTimeWindow(pReader->pTsdb, &pCond->twindows);

  ASSERT(pCond->numOfCols > 0);

  limitOutputBufferSize(pCond, &pReader->capacity);

  // allocate buffer in order to load data blocks from file
  SBlockLoadSuppInfo* pSuppInfo = &pReader->suppInfo;
  pSuppInfo->pColAgg = taosArrayInit(4, sizeof(SColumnDataAgg));
  pSuppInfo->plist = taosMemoryCalloc(pCond->numOfCols, POINTER_BYTES);
  if (pSuppInfo->plist == NULL || pSuppInfo->pColAgg == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _end;
  }

  pSuppInfo->tsColAgg.colId = PRIMARYKEY_TIMESTAMP_COL_ID;

  pReader->pResBlock = createResBlock(pCond, pReader->capacity);
  if (pReader->pResBlock == NULL) {
    code = terrno;
    goto _end;
  }

  // NOTE(review): the return code of setColumnIdSlotList() is ignored; an allocation failure there goes unnoticed
  setColumnIdSlotList(pReader, pReader->pResBlock);

  *ppReader = pReader;
  return code;

_end:
  tsdbReaderClose(pReader);
  *ppReader = NULL;
  return code;
}
H
Hongze Cheng 已提交
435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467

// void tsdbResetQueryHandleForNewTable(STsdbReader* queryHandle, SQueryTableDataCond* pCond, STableListInfo* tableList,
//                                      int32_t tWinIdx) {
//   STsdbReader* pTsdbReadHandle = queryHandle;

//   pTsdbReadHandle->order = pCond->order;
//   pTsdbReadHandle->window = pCond->twindows[tWinIdx];
//   pTsdbReadHandle->type = TSDB_QUERY_TYPE_ALL;
//   pTsdbReadHandle->cur.fid = -1;
//   pTsdbReadHandle->cur.win = TSWINDOW_INITIALIZER;
//   pTsdbReadHandle->checkFiles = true;
//   pTsdbReadHandle->activeIndex = 0;  // current active table index
//   pTsdbReadHandle->locateStart = false;
//   pTsdbReadHandle->loadExternalRow = pCond->loadExternalRows;

//   if (ASCENDING_TRAVERSE(pCond->order)) {
//     assert(pTsdbReadHandle->window.skey <= pTsdbReadHandle->window.ekey);
//   } else {
//     assert(pTsdbReadHandle->window.skey >= pTsdbReadHandle->window.ekey);
//   }

//   // allocate buffer in order to load data blocks from file
//   memset(pTsdbReadHandle->suppInfo.pstatis, 0, sizeof(SColumnDataAgg));
//   memset(pTsdbReadHandle->suppInfo.plist, 0, POINTER_BYTES);

//   tsdbInitDataBlockLoadInfo(&pTsdbReadHandle->dataBlockLoadInfo);
//   tsdbInitCompBlockLoadInfo(&pTsdbReadHandle->compBlockLoadInfo);

//   SArray* pTable = NULL;
//   //  STsdbMeta* pMeta = tsdbGetMeta(pTsdbReadHandle->pTsdb);

//   //  pTsdbReadHandle->pTableCheckInfo = destroyTableCheckInfo(pTsdbReadHandle->pTableCheckInfo);

H
Haojun Liao 已提交
468
//   pTsdbReadHandle->pTableCheckInfo = NULL;  // createDataBlockScanInfo(pTsdbReadHandle, groupList, pMeta,
H
Hongze Cheng 已提交
469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488
//                                             // &pTable);
//   if (pTsdbReadHandle->pTableCheckInfo == NULL) {
//     //    tsdbReaderClose(pTsdbReadHandle);
//     terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
//   }

//   //  pTsdbReadHandle->prev = doFreeColumnInfoData(pTsdbReadHandle->prev);
//   //  pTsdbReadHandle->next = doFreeColumnInfoData(pTsdbReadHandle->next);
// }

// SArray* tsdbGetQueriedTableList(STsdbReader** pHandle) {
//   assert(pHandle != NULL);

//   STsdbReader* pTsdbReadHandle = (STsdbReader*)pHandle;

//   size_t  size = taosArrayGetSize(pTsdbReadHandle->pTableCheckInfo);
//   SArray* res = taosArrayInit(size, POINTER_BYTES);
//   return res;
// }

489 490
// static TSKEY extractFirstTraverseKey(STableBlockScanInfo* pCheckInfo, int32_t order, int32_t update, TDRowVerT
// maxVer) {
H
Hongze Cheng 已提交
491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539
//   TSDBROW row = {0};
//   STSRow *rmem = NULL, *rimem = NULL;

//   if (pCheckInfo->iter) {
//     if (tsdbTbDataIterGet(pCheckInfo->iter, &row)) {
//       rmem = row.pTSRow;
//     }
//   }

//   if (pCheckInfo->iiter) {
//     if (tsdbTbDataIterGet(pCheckInfo->iiter, &row)) {
//       rimem = row.pTSRow;
//     }
//   }

//   if (rmem == NULL && rimem == NULL) {
//     return TSKEY_INITIAL_VAL;
//   }

//   if (rmem != NULL && rimem == NULL) {
//     pCheckInfo->chosen = CHECKINFO_CHOSEN_MEM;
//     return TD_ROW_KEY(rmem);
//   }

//   if (rmem == NULL && rimem != NULL) {
//     pCheckInfo->chosen = CHECKINFO_CHOSEN_IMEM;
//     return TD_ROW_KEY(rimem);
//   }

//   TSKEY r1 = TD_ROW_KEY(rmem);
//   TSKEY r2 = TD_ROW_KEY(rimem);

//   if (r1 == r2) {
//     if (TD_SUPPORT_UPDATE(update)) {
//       pCheckInfo->chosen = CHECKINFO_CHOSEN_BOTH;
//     } else {
//       pCheckInfo->chosen = CHECKINFO_CHOSEN_IMEM;
//       tsdbTbDataIterNext(pCheckInfo->iter);
//     }
//     return r1;
//   } else if (r1 < r2 && ASCENDING_TRAVERSE(order)) {
//     pCheckInfo->chosen = CHECKINFO_CHOSEN_MEM;
//     return r1;
//   } else {
//     pCheckInfo->chosen = CHECKINFO_CHOSEN_IMEM;
//     return r2;
//   }
// }

H
Haojun Liao 已提交
540
// static bool moveToNextRowInMem(STableBlockScanInfo* pCheckInfo) {
H
Hongze Cheng 已提交
541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573
//   bool hasNext = false;
//   if (pCheckInfo->chosen == CHECKINFO_CHOSEN_MEM) {
//     if (pCheckInfo->iter != NULL) {
//       hasNext = tsdbTbDataIterNext(pCheckInfo->iter);
//     }

//     if (hasNext) {
//       return hasNext;
//     }

//     if (pCheckInfo->iiter != NULL) {
//       return tsdbTbDataIterGet(pCheckInfo->iiter, NULL);
//     }
//   } else if (pCheckInfo->chosen == CHECKINFO_CHOSEN_IMEM) {
//     if (pCheckInfo->iiter != NULL) {
//       hasNext = tsdbTbDataIterNext(pCheckInfo->iiter);
//     }

//     if (hasNext) {
//       return hasNext;
//     }

//     if (pCheckInfo->iter != NULL) {
//       return tsdbTbDataIterGet(pCheckInfo->iter, NULL);
//     }
//   } else {
//     if (pCheckInfo->iter != NULL) {
//       hasNext = tsdbTbDataIterNext(pCheckInfo->iter);
//     }
//     if (pCheckInfo->iiter != NULL) {
//       hasNext = tsdbTbDataIterNext(pCheckInfo->iiter) || hasNext;
//     }
//   }
H
Hongze Cheng 已提交
574

H
Hongze Cheng 已提交
575 576
//   return hasNext;
// }
H
Hongze Cheng 已提交
577

H
Hongze Cheng 已提交
578 579 580
// static int32_t binarySearchForBlock(SBlock* pBlock, int32_t numOfBlocks, TSKEY skey, int32_t order) {
//   int32_t firstSlot = 0;
//   int32_t lastSlot = numOfBlocks - 1;
H
Hongze Cheng 已提交
581

H
Hongze Cheng 已提交
582
//   int32_t midSlot = firstSlot;
H
Hongze Cheng 已提交
583

H
Hongze Cheng 已提交
584 585 586
//   while (1) {
//     numOfBlocks = lastSlot - firstSlot + 1;
//     midSlot = (firstSlot + (numOfBlocks >> 1));
H
Hongze Cheng 已提交
587

H
Hongze Cheng 已提交
588
//     if (numOfBlocks == 1) break;
H
Hongze Cheng 已提交
589

H
Hongze Cheng 已提交
590 591 592 593 594 595 596 597 598 599 600
//     if (skey > pBlock[midSlot].maxKey.ts) {
//       if (numOfBlocks == 2) break;
//       if ((order == TSDB_ORDER_DESC) && (skey < pBlock[midSlot + 1].minKey.ts)) break;
//       firstSlot = midSlot + 1;
//     } else if (skey < pBlock[midSlot].minKey.ts) {
//       if ((order == TSDB_ORDER_ASC) && (skey > pBlock[midSlot - 1].maxKey.ts)) break;
//       lastSlot = midSlot - 1;
//     } else {
//       break;  // got the slot
//     }
//   }
H
Hongze Cheng 已提交
601

H
Hongze Cheng 已提交
602 603
//   return midSlot;
// }
H
Hongze Cheng 已提交
604

H
Haojun Liao 已提交
605
/**
 * Read the block index of the current data file and collect the entries that belong to queried tables.
 *
 * An entry is kept when its suid matches the reader's suid and its uid is present in the reader's table
 * map. For each kept entry the table's scan info gets a copy of the index entry (and a block list, created
 * on first use) and the entry is appended to pIndexList.
 *
 * @param pReader     reader providing suid and table map
 * @param pFileReader file reader to load the index from
 * @param pIndexList  out: SArray<SBlockIdx> receiving the qualifying entries
 * @return TSDB_CODE_SUCCESS (also for an empty index), the error of tsdbReadBlockIdx(), or
 *         TSDB_CODE_OUT_OF_MEMORY. The temporary index array is released on every path.
 */
static int32_t doLoadBlockIndex(STsdbReader* pReader, SDataFReader* pFileReader, SArray* pIndexList) {
  SArray* aBlockIdx = taosArrayInit(0, sizeof(SBlockIdx));
  if (aBlockIdx == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  int32_t code = tsdbReadBlockIdx(pFileReader, aBlockIdx, NULL);
  if (code != TSDB_CODE_SUCCESS) {
    goto _end;
  }

  // an empty index simply skips the loop and falls through to the common cleanup
  for (int32_t i = 0; i < (int32_t)taosArrayGetSize(aBlockIdx); ++i) {
    SBlockIdx* pBlockIdx = (SBlockIdx*)taosArrayGet(aBlockIdx, i);

    // uid check
    if (pBlockIdx->suid != pReader->suid) {
      continue;
    }

    // this block belongs to a table that is not queried.
    void* p = taosHashGet(pReader->status.pTableMap, &pBlockIdx->uid, sizeof(uint64_t));
    if (p == NULL) {
      continue;
    }

    // todo: not valid info in bockIndex
    // time range check
    //    if (pBlockIdx->minKey > pReader->window.ekey || pBlockIdx->maxKey < pReader->window.skey) {
    //      continue;
    //    }

    // version check
    //    if (pBlockIdx->minVersion > pReader->verRange.maxVer || pBlockIdx->maxVersion < pReader->verRange.minVer) {
    //      continue;
    //    }

    STableBlockScanInfo* pScanInfo = p;
    if (pScanInfo->pBlockList == NULL) {
      pScanInfo->pBlockList = taosArrayInit(16, sizeof(SBlock));
      if (pScanInfo->pBlockList == NULL) {
        code = TSDB_CODE_OUT_OF_MEMORY;
        goto _end;
      }
    }

    pScanInfo->blockIdx = *pBlockIdx;
    if (taosArrayPush(pIndexList, pBlockIdx) == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto _end;
    }
  }

_end:
  taosArrayDestroy(aBlockIdx);
  return code;
}
H
Hongze Cheng 已提交
657

658 659
/**
 * Load the block lists of the tables named in pIndexList from the current file and keep the blocks that
 * overlap both the query time window and the query version range.
 *
 * The block list of every table in the map is emptied first. Each qualifying block is appended to its
 * table's block list.
 *
 * @param pReader          reader providing file reader, table map, window and version range
 * @param pIndexList       SArray<SBlockIdx> produced by doLoadBlockIndex()
 * @param numOfValidTables out: number of tables that ended up with at least one block (reset to 0 here)
 * @param numOfBlocks      in/out: incremented once per qualifying block; NOT reset here
 * @return TSDB_CODE_SUCCESS, the error code of tsdbReadBlock(), or TSDB_CODE_OUT_OF_MEMORY
 */
static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, uint32_t* numOfValidTables,
                               int32_t* numOfBlocks) {
  size_t numOfTables = taosArrayGetSize(pIndexList);

  *numOfValidTables = 0;

  // drop the blocks remembered from the previous file set
  STableBlockScanInfo* px = NULL;
  while ((px = taosHashIterate(pReader->status.pTableMap, px)) != NULL) {
    taosArrayClear(px->pBlockList);
  }

  for (size_t i = 0; i < numOfTables; ++i) {
    SBlockIdx* pBlockIdx = taosArrayGet(pIndexList, i);

    SMapData mapData = {0};
    tMapDataReset(&mapData);

    int32_t code = tsdbReadBlock(pReader->pFileReader, pBlockIdx, &mapData, NULL);
    if (code != TSDB_CODE_SUCCESS) {
      // do not filter blocks out of a map that failed to load
      tMapDataClear(&mapData);
      return code;
    }

    STableBlockScanInfo* pScanInfo = taosHashGet(pReader->status.pTableMap, &pBlockIdx->uid, sizeof(int64_t));
    for (int32_t j = 0; j < mapData.nItem; ++j) {
      SBlock block = {0};

      tMapDataGetItemByIdx(&mapData, j, &block, tGetBlock);

      // 1. time range check
      if (block.minKey.ts > pReader->window.ekey || block.maxKey.ts < pReader->window.skey) {
        continue;
      }

      // 2. version range check
      if (block.minVersion > pReader->verRange.maxVer || block.maxVersion < pReader->verRange.minVer) {
        continue;
      }

      void* p = taosArrayPush(pScanInfo->pBlockList, &block);
      if (p == NULL) {
        tMapDataClear(&mapData);
        return TSDB_CODE_OUT_OF_MEMORY;
      }

      (*numOfBlocks) += 1;
    }

    tMapDataClear(&mapData);
    if (pScanInfo->pBlockList != NULL && taosArrayGetSize(pScanInfo->pBlockList) > 0) {
      (*numOfValidTables) += 1;
    }
  }

  return TSDB_CODE_SUCCESS;
}
H
Hongze Cheng 已提交
714

715 716
// todo remove pblock parameter
// Mark the current file block as fully consumed and move lastKey one tick past the block's max key in
// traverse direction (max key + 1 for ascending order, max key - 1 otherwise).
static void setBlockAllDumped(SFileBlockDumpInfo* pDumpInfo, SBlock* pBlock, int32_t order) {
  pDumpInfo->allDumped = true;

  if (ASCENDING_TRAVERSE(order)) {
    pDumpInfo->lastKey = pBlock->maxKey.ts + 1;
  } else {
    pDumpInfo->lastKey = pBlock->maxKey.ts - 1;
  }
}

723 724
/**
 * Append one column value to row `rowIndex` of the output column.
 *
 * Fixed-length values are appended directly, with the NULL flag set when the value is null or none.
 * Variable-length values are first assembled (length header + payload) in the scratch buffer
 * pSup->buildBuf[colIndex]; null/none values are appended as NULL.
 */
static void doCopyColVal(SColumnInfoData* pColInfoData, int32_t rowIndex, int32_t colIndex, SColVal* pColVal,
                         SBlockLoadSuppInfo* pSup) {
  const bool noValue = pColVal->isNull || pColVal->isNone;

  if (!IS_VAR_DATA_TYPE(pColVal->type)) {
    colDataAppend(pColInfoData, rowIndex, (const char*)&pColVal->value, noValue);
    return;
  }

  if (noValue) {
    colDataAppendNULL(pColInfoData, rowIndex);
    return;
  }

  char* pBuf = pSup->buildBuf[colIndex];
  varDataSetLen(pBuf, pColVal->value.nData);
  memcpy(varDataVal(pBuf), pColVal->value.pData, pColVal->value.nData);
  colDataAppend(pColInfoData, rowIndex, pBuf, false);
}

738
/**
 * Copy rows of the loaded file block (pReader->status.fileBlockData) into the
 * reader's result block, starting at the current dump position and walking in
 * scan order.
 *
 * At most pReader->capacity rows are copied. Result columns whose column id is
 * not matched by the next file-block column are filled with NULL. On return
 * pResBlock->info.rows holds the number of rows produced and the dump position
 * has been advanced by that many rows in scan direction.
 *
 * @param pReader        reader that owns the loaded block data and the result block
 * @param pBlockScanInfo scan info of the table the current file block belongs to
 * @return TSDB_CODE_SUCCESS
 */
static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo) {
  SReaderStatus*  pStatus = &pReader->status;
  SDataBlockIter* pBlockIter = &pStatus->blockIter;

  SBlockData*         pBlockData = &pStatus->fileBlockData;
  SFileDataBlockInfo* pFBlock = getCurrentBlockInfo(pBlockIter);
  SBlock*             pBlock = taosArrayGet(pBlockScanInfo->pBlockList, pFBlock->tbBlockIdx);
  SSDataBlock*        pResBlock = pReader->pResBlock;
  int32_t             numOfCols = blockDataGetNumOfCols(pResBlock);

  SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo;
  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;

  int64_t st = taosGetTimestampUs();

  SColVal cv = {0};
  int32_t colIndex = 0;

  bool    asc = ASCENDING_TRAVERSE(pReader->order);
  int32_t step = asc ? 1 : -1;

  // rows left in scan direction, capped by the capacity of the result block
  int32_t remain = asc ? (pBlockData->nRow - pDumpInfo->rowIndex) : (pDumpInfo->rowIndex + 1);
  if (remain > pReader->capacity) {
    remain = pReader->capacity;
  }

  int32_t rowIndex = 0;

  int32_t          i = 0;
  SColumnInfoData* pColData = taosArrayGet(pResBlock->pDataBlock, i);
  if (pColData->info.colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
    // The loop is bounded by the row count instead of an end index, so that a
    // descending partial dump copies rows as well; the range test on j keeps
    // every access inside the loaded block.
    for (int32_t j = pDumpInfo->rowIndex; rowIndex < remain && j >= 0 && j < pBlockData->nRow; j += step) {
      colDataAppend(pColData, rowIndex++, (const char*)&pBlockData->aTSKEY[j], false);
    }
    i += 1;
  }

  while (i < numOfCols && colIndex < taosArrayGetSize(pBlockData->aIdx)) {
    rowIndex = 0;
    pColData = taosArrayGet(pResBlock->pDataBlock, i);

    SColData* pData = tBlockDataGetColDataByIdx(pBlockData, colIndex);

    if (pData->cid == pColData->info.colId) {
      for (int32_t j = pDumpInfo->rowIndex; rowIndex < remain && j >= 0 && j < pBlockData->nRow; j += step) {
        tColDataGetValue(pData, j, &cv);
        doCopyColVal(pColData, rowIndex++, i, &cv, pSupInfo);
      }

      // only the copy path counts rows, so the check belongs here and not on the NULL-fill path
      ASSERT(rowIndex == remain);
      colIndex += 1;
    } else {  // the specified column does not exist in file block, fill with null data
      colDataAppendNNULL(pColData, 0, remain);
    }

    i += 1;
  }

  // result columns beyond the last file-block column have no data at all
  while (i < numOfCols) {
    pColData = taosArrayGet(pResBlock->pDataBlock, i);
    colDataAppendNNULL(pColData, 0, remain);
    i += 1;
  }

  pResBlock->info.rows = remain;
  pDumpInfo->rowIndex += step * remain;

  // NOTE(review): the block is flagged as fully dumped even when only `capacity` rows were
  // copied; confirm that callers never request a partial dump or handle the remainder elsewhere.
  setBlockAllDumped(pDumpInfo, pBlock, pReader->order);

  int64_t elapsedTime = (taosGetTimestampUs() - st);
  pReader->cost.blockLoadTime += elapsedTime;

  int32_t unDumpedRows = asc ? pBlock->nRow - pDumpInfo->rowIndex : pDumpInfo->rowIndex + 1;
  tsdbDebug("%p load file block into buffer, global index:%d, table index:%d, brange:%" PRId64 "-%" PRId64
            ", rows:%d, remain:%d, minVer:%" PRId64 ", maxVer:%" PRId64 ", elapsed time:%" PRId64 " us, %s",
            pReader, pBlockIter->index, pFBlock->tbBlockIdx, pBlock->minKey.ts, pBlock->maxKey.ts, remain, unDumpedRows,
            pBlock->minVersion, pBlock->maxVersion, elapsedTime, pReader->idStr);

  return TSDB_CODE_SUCCESS;
}

// todo consider the output buffer size
823 824
static int32_t doLoadFileBlockData(STsdbReader* pReader, SDataBlockIter* pBlockIter,
                                   STableBlockScanInfo* pBlockScanInfo, SBlockData* pBlockData) {
825 826 827 828 829 830 831 832 833 834
  int64_t st = taosGetTimestampUs();

  SFileDataBlockInfo* pFBlock = getCurrentBlockInfo(pBlockIter);
  SBlock*             pBlock = taosArrayGet(pBlockScanInfo->pBlockList, pFBlock->tbBlockIdx);
  SSDataBlock*        pResBlock = pReader->pResBlock;
  int32_t             numOfCols = blockDataGetNumOfCols(pResBlock);

  SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo;
  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;

H
Hongze Cheng 已提交
835 836
  int32_t code = tsdbReadColData(pReader->pFileReader, &pBlockScanInfo->blockIdx, pBlock, pSupInfo->colIds, numOfCols,
                                 pBlockData, NULL, NULL);
837 838 839 840 841 842 843 844 845
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  int64_t elapsedTime = (taosGetTimestampUs() - st);
  pReader->cost.blockLoadTime += elapsedTime;

  pDumpInfo->allDumped = false;
  tsdbDebug("%p load file block into buffer, global index:%d, table index:%d, brange:%" PRId64 "-%" PRId64
846
            ", rows:%d, minVer:%" PRId64 ", maxVer:%" PRId64 ", elapsed time:%" PRId64 " us, %s",
847
            pReader, pBlockIter->index, pFBlock->tbBlockIdx, pBlock->minKey.ts, pBlock->maxKey.ts, pBlock->nRow,
H
Haojun Liao 已提交
848 849
            pBlock->minVersion, pBlock->maxVersion, elapsedTime, pReader->idStr);
  return TSDB_CODE_SUCCESS;
H
Haojun Liao 已提交
850 851

_error:
H
Haojun Liao 已提交
852 853 854 855 856
  tsdbError("%p error occurs in loading file block, global index:%d, table index:%d, brange:%" PRId64 "-%" PRId64
            ", rows:%d, %s",
            pReader, pBlockIter->index, pFBlock->tbBlockIdx, pBlock->minKey.ts, pBlock->maxKey.ts, pBlock->nRow,
            pReader->idStr);
  return code;
H
Haojun Liao 已提交
857
}
H
Hongze Cheng 已提交
858

H
Hongze Cheng 已提交
859 860 861 862 863 864 865 866 867 868 869 870 871 872 873 874 875 876 877 878 879 880 881 882 883 884 885 886 887 888 889 890 891 892 893 894 895 896 897 898 899 900 901 902 903 904 905 906 907 908 909 910 911 912 913 914 915 916
// static int doBinarySearchKey(char* pValue, int num, TSKEY key, int order) {
//   int    firstPos, lastPos, midPos = -1;
//   int    numOfRows;
//   TSKEY* keyList;

//   assert(order == TSDB_ORDER_ASC || order == TSDB_ORDER_DESC);

//   if (num <= 0) return -1;

//   keyList = (TSKEY*)pValue;
//   firstPos = 0;
//   lastPos = num - 1;

//   if (order == TSDB_ORDER_DESC) {
//     // find the first position which is smaller than the key
//     while (1) {
//       if (key >= keyList[lastPos]) return lastPos;
//       if (key == keyList[firstPos]) return firstPos;
//       if (key < keyList[firstPos]) return firstPos - 1;

//       numOfRows = lastPos - firstPos + 1;
//       midPos = (numOfRows >> 1) + firstPos;

//       if (key < keyList[midPos]) {
//         lastPos = midPos - 1;
//       } else if (key > keyList[midPos]) {
//         firstPos = midPos + 1;
//       } else {
//         break;
//       }
//     }

//   } else {
//     // find the first position which is bigger than the key
//     while (1) {
//       if (key <= keyList[firstPos]) return firstPos;
//       if (key == keyList[lastPos]) return lastPos;

//       if (key > keyList[lastPos]) {
//         lastPos = lastPos + 1;
//         if (lastPos >= num)
//           return -1;
//         else
//           return lastPos;
//       }

//       numOfRows = lastPos - firstPos + 1;
//       midPos = (numOfRows >> 1) + firstPos;

//       if (key < keyList[midPos]) {
//         lastPos = midPos - 1;
//       } else if (key > keyList[midPos]) {
//         firstPos = midPos + 1;
//       } else {
//         break;
//       }
//     }
//   }
H
Hongze Cheng 已提交
917

H
Hongze Cheng 已提交
918 919
//   return midPos;
// }
H
Hongze Cheng 已提交
920

H
Hongze Cheng 已提交
921 922
// static void doCheckGeneratedBlockRange(STsdbReader* pTsdbReadHandle) {
//   SQueryFilePos* cur = &pTsdbReadHandle->cur;
H
Hongze Cheng 已提交
923

H
Hongze Cheng 已提交
924 925 926 927 928 929
//   if (cur->rows > 0) {
//     if (ASCENDING_TRAVERSE(pTsdbReadHandle->order)) {
//       assert(cur->win.skey >= pTsdbReadHandle->window.skey && cur->win.ekey <= pTsdbReadHandle->window.ekey);
//     } else {
//       assert(cur->win.skey >= pTsdbReadHandle->window.ekey && cur->win.ekey <= pTsdbReadHandle->window.skey);
//     }
H
Hongze Cheng 已提交
930

H
Hongze Cheng 已提交
931 932 933 934 935
//     SColumnInfoData* pColInfoData = taosArrayGet(pTsdbReadHandle->pColumns, 0);
//     assert(cur->win.skey == ((TSKEY*)pColInfoData->pData)[0] &&
//            cur->win.ekey == ((TSKEY*)pColInfoData->pData)[cur->rows - 1]);
//   } else {
//     cur->win = pTsdbReadHandle->window;
H
Hongze Cheng 已提交
936

H
Hongze Cheng 已提交
937 938 939 940
//     int32_t step = ASCENDING_TRAVERSE(pTsdbReadHandle->order) ? 1 : -1;
//     cur->lastKey = pTsdbReadHandle->window.ekey + step;
//   }
// }
H
Hongze Cheng 已提交
941

H
Haojun Liao 已提交
942
// static void copyAllRemainRowsFromFileBlock(STsdbReader* pTsdbReadHandle, STableBlockScanInfo* pCheckInfo,
H
Hongze Cheng 已提交
943 944
//                                            SDataBlockInfo* pBlockInfo, int32_t endPos) {
//   SQueryFilePos* cur = &pTsdbReadHandle->cur;
H
Hongze Cheng 已提交
945

H
Hongze Cheng 已提交
946 947
//   SDataCols* pCols = pTsdbReadHandle->rhelper.pDCols[0];
//   TSKEY*     tsArray = pCols->cols[0].pData;
H
Hongze Cheng 已提交
948

H
Hongze Cheng 已提交
949
//   bool ascScan = ASCENDING_TRAVERSE(pTsdbReadHandle->order);
H
Hongze Cheng 已提交
950

H
Hongze Cheng 已提交
951
//   int32_t step = ascScan ? 1 : -1;
H
Hongze Cheng 已提交
952

H
Hongze Cheng 已提交
953 954
//   int32_t start = cur->pos;
//   int32_t end = endPos;
H
Hongze Cheng 已提交
955

H
Hongze Cheng 已提交
956 957 958
//   if (!ascScan) {
//     TSWAP(start, end);
//   }
H
Hongze Cheng 已提交
959

H
Hongze Cheng 已提交
960 961
//   assert(pTsdbReadHandle->outputCapacity >= (end - start + 1));
//   int32_t numOfRows = doCopyRowsFromFileBlock(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, 0, start, end);
H
Hongze Cheng 已提交
962

H
Hongze Cheng 已提交
963 964 965 966 967
//   // the time window should always be ascending order: skey <= ekey
//   cur->win = (STimeWindow){.skey = tsArray[start], .ekey = tsArray[end]};
//   cur->mixBlock = (numOfRows != pBlockInfo->rows);
//   cur->lastKey = tsArray[endPos] + step;
//   cur->blockCompleted = (ascScan ? (endPos == pBlockInfo->rows - 1) : (endPos == 0));
H
Hongze Cheng 已提交
968

H
Hongze Cheng 已提交
969 970 971 972
//   // The value of pos may be -1 or pBlockInfo->rows, and it is invalid in both cases.
//   int32_t pos = endPos + step;
//   updateInfoAfterMerge(pTsdbReadHandle, pCheckInfo, numOfRows, pos);
//   doCheckGeneratedBlockRange(pTsdbReadHandle);
H
Hongze Cheng 已提交
973

H
Hongze Cheng 已提交
974 975 976 977
//   tsdbDebug("%p uid:%" PRIu64 ", data block created, mixblock:%d, brange:%" PRIu64 "-%" PRIu64 " rows:%d, %s",
//             pTsdbReadHandle, pCheckInfo->tableId, cur->mixBlock, cur->win.skey, cur->win.ekey, cur->rows,
//             pTsdbReadHandle->idStr);
// }
H
Hongze Cheng 已提交
978

H
Hongze Cheng 已提交
979 980
// // only return the qualified data to client in terms of query time window, data rows in the same block but do not
// // be included in the query time window will be discarded
H
Haojun Liao 已提交
981
// static void doMergeTwoLevelData(STsdbReader* pTsdbReadHandle, STableBlockScanInfo* pCheckInfo, SBlock* pBlock) {
H
Hongze Cheng 已提交
982 983 984 985 986 987 988 989 990 991 992 993 994 995 996 997 998 999 1000 1001 1002 1003 1004 1005 1006 1007 1008 1009 1010 1011 1012 1013 1014 1015 1016 1017 1018 1019 1020 1021 1022 1023 1024 1025 1026 1027 1028 1029 1030 1031 1032 1033 1034 1035 1036 1037 1038 1039 1040 1041 1042 1043 1044 1045 1046 1047 1048 1049 1050 1051 1052 1053 1054 1055 1056 1057 1058 1059 1060 1061 1062 1063 1064 1065 1066 1067 1068 1069 1070 1071 1072 1073 1074 1075 1076 1077 1078 1079 1080 1081 1082 1083 1084 1085 1086 1087 1088 1089 1090 1091 1092 1093 1094 1095 1096 1097 1098 1099 1100 1101 1102 1103 1104 1105 1106 1107 1108 1109 1110 1111 1112 1113 1114 1115 1116 1117 1118 1119 1120 1121 1122 1123 1124 1125 1126 1127 1128 1129 1130 1131 1132 1133 1134 1135 1136 1137 1138 1139 1140 1141 1142 1143 1144 1145 1146 1147 1148 1149 1150 1151 1152 1153 1154 1155 1156 1157 1158 1159 1160 1161 1162 1163 1164 1165 1166 1167 1168 1169 1170 1171 1172 1173 1174 1175 1176 1177 1178 1179 1180 1181 1182
//   SQueryFilePos* cur = &pTsdbReadHandle->cur;
//   SDataBlockInfo blockInfo = GET_FILE_DATA_BLOCK_INFO(pCheckInfo, pBlock);
//   STsdbCfg*      pCfg = REPO_CFG(pTsdbReadHandle->pTsdb);

//   initTableMemIterator(pTsdbReadHandle, pCheckInfo);

//   SDataCols* pCols = pTsdbReadHandle->rhelper.pDCols[0];
//   assert(pCols->cols[0].type == TSDB_DATA_TYPE_TIMESTAMP && pCols->cols[0].colId == PRIMARYKEY_TIMESTAMP_COL_ID &&
//          cur->pos >= 0 && cur->pos < pBlock->numOfRows);
//   // Even Multi-Version supported, the records with duplicated TSKEY would be merged inside of tsdbLoadData
//   interface. TSKEY* tsArray = pCols->cols[0].pData; assert(pCols->numOfRows == pBlock->numOfRows && tsArray[0] ==
//   pBlock->minKey.ts &&
//          tsArray[pBlock->numOfRows - 1] == pBlock->maxKey.ts);

//   bool    ascScan = ASCENDING_TRAVERSE(pTsdbReadHandle->order);
//   int32_t step = ascScan ? 1 : -1;

//   // for search the endPos, so the order needs to reverse
//   int32_t order = ascScan ? TSDB_ORDER_DESC : TSDB_ORDER_ASC;

//   int32_t numOfCols = (int32_t)(QH_GET_NUM_OF_COLS(pTsdbReadHandle));
//   int32_t endPos = getEndPosInDataBlock(pTsdbReadHandle, &blockInfo);

//   STimeWindow* pWin = &blockInfo.window;
//   tsdbDebug("%p uid:%" PRIu64 " start merge data block, file block range:%" PRIu64 "-%" PRIu64
//             " rows:%d, start:%d, end:%d, %s",
//             pTsdbReadHandle, pCheckInfo->tableId, pWin->skey, pWin->ekey, blockInfo.rows, cur->pos, endPos,
//             pTsdbReadHandle->idStr);

//   // compared with the data from in-memory buffer, to generate the correct timestamp array list
//   int32_t numOfRows = 0;
//   int32_t curRow = 0;

//   int16_t   rv1 = -1;
//   int16_t   rv2 = -1;
//   STSchema* pSchema1 = NULL;
//   STSchema* pSchema2 = NULL;

//   int32_t pos = cur->pos;
//   cur->win = TSWINDOW_INITIALIZER;
//   bool adjustPos = false;

//   // no data in buffer, load data from file directly
//   if (pCheckInfo->iiter == NULL && pCheckInfo->iter == NULL) {
//     copyAllRemainRowsFromFileBlock(pTsdbReadHandle, pCheckInfo, &blockInfo, endPos);
//     return;
//   } else if (pCheckInfo->iter != NULL || pCheckInfo->iiter != NULL) {
//     SSkipListNode* node = NULL;
//     TSKEY          lastKeyAppend = TSKEY_INITIAL_VAL;

//     do {
//       STSRow* row2 = NULL;
//       STSRow* row1 = getSRowInTableMem(pCheckInfo, pTsdbReadHandle->order, pCfg->update, &row2, TD_VER_MAX);
//       if (row1 == NULL) {
//         break;
//       }

//       TSKEY key = TD_ROW_KEY(row1);
//       if ((key > pTsdbReadHandle->window.ekey && ascScan) || (key < pTsdbReadHandle->window.ekey && !ascScan)) {
//         break;
//       }

//       if (adjustPos) {
//         if (key == lastKeyAppend) {
//           pos -= step;
//         }
//         adjustPos = false;
//       }

//       if (((pos > endPos || tsArray[pos] > pTsdbReadHandle->window.ekey) && ascScan) ||
//           ((pos < endPos || tsArray[pos] < pTsdbReadHandle->window.ekey) && !ascScan)) {
//         break;
//       }

//       if ((key < tsArray[pos] && ascScan) || (key > tsArray[pos] && !ascScan)) {
//         if (rv1 != TD_ROW_SVER(row1)) {
//           //          pSchema1 = tsdbGetTableSchemaByVersion(pTable, memRowVersion(row1));
//           rv1 = TD_ROW_SVER(row1);
//         }
//         if (row2 && rv2 != TD_ROW_SVER(row2)) {
//           //          pSchema2 = tsdbGetTableSchemaByVersion(pTable, memRowVersion(row2));
//           rv2 = TD_ROW_SVER(row2);
//         }

//         numOfRows +=
//             mergeTwoRowFromMem(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, &curRow, row1, row2, numOfCols,
//                                pCheckInfo->tableId, pSchema1, pSchema2, pCfg->update, &lastKeyAppend);
//         if (cur->win.skey == TSKEY_INITIAL_VAL) {
//           cur->win.skey = key;
//         }

//         cur->win.ekey = key;
//         cur->lastKey = key + step;
//         cur->mixBlock = true;
//         moveToNextRowInMem(pCheckInfo);
//       } else if (key == tsArray[pos]) {  // data in buffer has the same timestamp of data in file block, ignore it
//         if (TD_SUPPORT_UPDATE(pCfg->update)) {
//           if (lastKeyAppend != key) {
//             if (lastKeyAppend != TSKEY_INITIAL_VAL) {
//               ++curRow;
//             }
//             lastKeyAppend = key;
//           }
//           // load data from file firstly
//           numOfRows = doCopyRowsFromFileBlock(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, curRow, pos, pos);

//           if (rv1 != TD_ROW_SVER(row1)) {
//             rv1 = TD_ROW_SVER(row1);
//           }
//           if (row2 && rv2 != TD_ROW_SVER(row2)) {
//             rv2 = TD_ROW_SVER(row2);
//           }

//           // still assign data into current row
//           numOfRows +=
//               mergeTwoRowFromMem(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, &curRow, row1, row2, numOfCols,
//                                  pCheckInfo->tableId, pSchema1, pSchema2, pCfg->update, &lastKeyAppend);

//           if (cur->win.skey == TSKEY_INITIAL_VAL) {
//             cur->win.skey = key;
//           }

//           cur->win.ekey = key;
//           cur->lastKey = key + step;
//           cur->mixBlock = true;

//           moveToNextRowInMem(pCheckInfo);

//           pos += step;
//           adjustPos = true;
//         } else {
//           // discard the memory record
//           moveToNextRowInMem(pCheckInfo);
//         }
//       } else if ((key > tsArray[pos] && ascScan) || (key < tsArray[pos] && !ascScan)) {
//         if (cur->win.skey == TSKEY_INITIAL_VAL) {
//           cur->win.skey = tsArray[pos];
//         }

//         int32_t end = doBinarySearchKey(pCols->cols[0].pData, pCols->numOfRows, key, order);
//         assert(end != -1);

//         if (tsArray[end] == key) {  // the value of key in cache equals to the end timestamp value, ignore it
// #if 0
//           if (pCfg->update == TD_ROW_DISCARD_UPDATE) {
//             moveToNextRowInMem(pCheckInfo);
//           } else {
//             end -= step;
//           }
// #endif
//           if (!TD_SUPPORT_UPDATE(pCfg->update)) {
//             moveToNextRowInMem(pCheckInfo);
//           } else {
//             end -= step;
//           }
//         }

//         int32_t qstart = 0, qend = 0;
//         getQualifiedRowsPos(pTsdbReadHandle, pos, end, numOfRows, &qstart, &qend);

//         if ((lastKeyAppend != TSKEY_INITIAL_VAL) && (lastKeyAppend != (ascScan ? tsArray[qstart] : tsArray[qend]))) {
//           ++curRow;
//         }

//         numOfRows = doCopyRowsFromFileBlock(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, curRow, qstart, qend);
//         pos += (qend - qstart + 1) * step;
//         if (numOfRows > 0) {
//           curRow = numOfRows - 1;
//         }

//         cur->win.ekey = ascScan ? tsArray[qend] : tsArray[qstart];
//         cur->lastKey = cur->win.ekey + step;
//         lastKeyAppend = cur->win.ekey;
//       }
//     } while (numOfRows < pTsdbReadHandle->outputCapacity);

//     if (numOfRows < pTsdbReadHandle->outputCapacity) {
//       /**
//        * if cache is empty, load remain file block data. In contrast, if there are remain data in cache, do NOT
//        * copy them all to result buffer, since it may be overlapped with file data block.
//        */
//       if (node == NULL || ((TD_ROW_KEY((STSRow*)SL_GET_NODE_DATA(node)) > pTsdbReadHandle->window.ekey) && ascScan)
//       ||
//           ((TD_ROW_KEY((STSRow*)SL_GET_NODE_DATA(node)) < pTsdbReadHandle->window.ekey) && !ascScan)) {
//         // no data in cache or data in cache is greater than the ekey of time window, load data from file block
//         if (cur->win.skey == TSKEY_INITIAL_VAL) {
//           cur->win.skey = tsArray[pos];
//         }

//         int32_t start = -1, end = -1;
//         getQualifiedRowsPos(pTsdbReadHandle, pos, endPos, numOfRows, &start, &end);

//         numOfRows = doCopyRowsFromFileBlock(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, numOfRows, start, end);
//         pos += (end - start + 1) * step;

//         cur->win.ekey = ascScan ? tsArray[end] : tsArray[start];
//         cur->lastKey = cur->win.ekey + step;
//         cur->mixBlock = true;
//       }
//     }
//   }
H
Hongze Cheng 已提交
1183

H
Hongze Cheng 已提交
1184 1185
//   cur->blockCompleted = (((pos > endPos || cur->lastKey > pTsdbReadHandle->window.ekey) && ascScan) ||
//                          ((pos < endPos || cur->lastKey < pTsdbReadHandle->window.ekey) && !ascScan));
H
Hongze Cheng 已提交
1186

H
Hongze Cheng 已提交
1187 1188 1189
//   if (!ascScan) {
//     TSWAP(cur->win.skey, cur->win.ekey);
//   }
H
Hongze Cheng 已提交
1190

H
Hongze Cheng 已提交
1191 1192
//   updateInfoAfterMerge(pTsdbReadHandle, pCheckInfo, numOfRows, pos);
//   doCheckGeneratedBlockRange(pTsdbReadHandle);
H
Hongze Cheng 已提交
1193

H
Hongze Cheng 已提交
1194 1195 1196 1197
//   tsdbDebug("%p uid:%" PRIu64 ", data block created, mixblock:%d, brange:%" PRIu64 "-%" PRIu64 " rows:%d, %s",
//             pTsdbReadHandle, pCheckInfo->tableId, cur->mixBlock, cur->win.skey, cur->win.ekey, cur->rows,
//             pTsdbReadHandle->idStr);
// }
H
Hongze Cheng 已提交
1198

H
Haojun Liao 已提交
1199 1200 1201
/**
 * Release every buffer owned by the block order supporter: the two per-table
 * counter arrays, each per-table wrapper array, and the array of wrapper arrays.
 *
 * @param pSup supporter to clean up; the structure itself is not freed
 */
static void cleanupBlockOrderSupporter(SBlockOrderSupporter* pSup) {
  taosMemoryFreeClear(pSup->numOfBlocksPerTable);
  taosMemoryFreeClear(pSup->indexPerTable);

  // only the first numOfTables slots have been populated
  int32_t slot = 0;
  while (slot < pSup->numOfTables) {
    SBlockOrderWrapper* pWrappers = pSup->pDataBlockInfo[slot];
    taosMemoryFreeClear(pWrappers);
    slot += 1;
  }

  taosMemoryFreeClear(pSup->pDataBlockInfo);
}
H
Hongze Cheng 已提交
1210

H
Haojun Liao 已提交
1211 1212
/**
 * Allocate the zero-initialized per-table arrays of the block order supporter.
 *
 * @param pSup        supporter to initialize
 * @param numOfTables number of table slots to allocate; must be at least 1
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY after releasing whatever was allocated
 */
static int32_t initBlockOrderSupporter(SBlockOrderSupporter* pSup, int32_t numOfTables) {
  ASSERT(numOfTables >= 1);

  pSup->numOfBlocksPerTable = taosMemoryCalloc(1, sizeof(int32_t) * numOfTables);
  pSup->indexPerTable = taosMemoryCalloc(1, sizeof(int32_t) * numOfTables);
  pSup->pDataBlockInfo = taosMemoryCalloc(1, POINTER_BYTES * numOfTables);

  bool allAllocated =
      (pSup->numOfBlocksPerTable != NULL) && (pSup->indexPerTable != NULL) && (pSup->pDataBlockInfo != NULL);
  if (allAllocated) {
    return TSDB_CODE_SUCCESS;
  }

  // at least one allocation failed: drop the ones that succeeded
  cleanupBlockOrderSupporter(pSup);
  return TSDB_CODE_OUT_OF_MEMORY;
}
H
Hongze Cheng 已提交
1225

H
Haojun Liao 已提交
1226
static int32_t fileDataBlockOrderCompar(const void* pLeft, const void* pRight, void* param) {
1227
  int32_t leftIndex = *(int32_t*)pLeft;
H
Haojun Liao 已提交
1228
  int32_t rightIndex = *(int32_t*)pRight;
H
Hongze Cheng 已提交
1229

H
Haojun Liao 已提交
1230
  SBlockOrderSupporter* pSupporter = (SBlockOrderSupporter*)param;
H
Hongze Cheng 已提交
1231

H
Haojun Liao 已提交
1232 1233
  int32_t leftTableBlockIndex = pSupporter->indexPerTable[leftIndex];
  int32_t rightTableBlockIndex = pSupporter->indexPerTable[rightIndex];
H
Hongze Cheng 已提交
1234

H
Haojun Liao 已提交
1235 1236 1237 1238 1239 1240 1241
  if (leftTableBlockIndex > pSupporter->numOfBlocksPerTable[leftIndex]) {
    /* left block is empty */
    return 1;
  } else if (rightTableBlockIndex > pSupporter->numOfBlocksPerTable[rightIndex]) {
    /* right block is empty */
    return -1;
  }
H
Hongze Cheng 已提交
1242

1243
  SBlockOrderWrapper* pLeftBlock = &pSupporter->pDataBlockInfo[leftIndex][leftTableBlockIndex];
H
Haojun Liao 已提交
1244
  SBlockOrderWrapper* pRightBlock = &pSupporter->pDataBlockInfo[rightIndex][rightTableBlockIndex];
H
Hongze Cheng 已提交
1245

H
Haojun Liao 已提交
1246 1247
  return pLeftBlock->pBlock->aSubBlock[0].offset > pRightBlock->pBlock->aSubBlock[0].offset ? 1 : -1;
}
H
Hongze Cheng 已提交
1248

H
Haojun Liao 已提交
1249
/**
 * Rebuild the block list of the iterator from the per-table block lists of the reader.
 *
 * With a single qualified table the blocks are appended in their stored order.
 * With several tables the blocks are merged through a multiway merge tree using
 * fileDataBlockOrderCompar, i.e. by the file offset of each block. The iterator
 * index is placed on the first block for ascending scans and on the last block
 * for descending scans.
 *
 * @param pReader     reader whose status.pTableMap holds the per-table scan info
 * @param pBlockIter  iterator to (re)initialize; its blockList is cleared first
 * @param numOfBlocks total number of blocks over all tables
 * @return TSDB_CODE_SUCCESS, the code of initBlockOrderSupporter, or
 *         TSDB_CODE_TDB_OUT_OF_MEMORY when an allocation fails
 */
static int32_t initBlockIterator(STsdbReader* pReader, SDataBlockIter* pBlockIter, int32_t numOfBlocks) {
  bool asc = ASCENDING_TRAVERSE(pReader->order);

  pBlockIter->numOfBlocks = numOfBlocks;
  taosArrayClear(pBlockIter->blockList);

  // access data blocks according to the offset of each block in asc/desc order.
  int32_t numOfTables = (int32_t)taosHashGetSize(pReader->status.pTableMap);

  SBlockOrderSupporter sup = {0};

  int32_t code = initBlockOrderSupporter(&sup, numOfTables);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  int32_t cnt = 0;
  void*   ptr = NULL;
  while (1) {
    ptr = taosHashIterate(pReader->status.pTableMap, ptr);
    if (ptr == NULL) {
      break;
    }

    STableBlockScanInfo* pTableScanInfo = (STableBlockScanInfo*)ptr;
    if (pTableScanInfo->pBlockList == NULL || taosArrayGetSize(pTableScanInfo->pBlockList) == 0) {
      continue;
    }

    size_t num = taosArrayGetSize(pTableScanInfo->pBlockList);
    sup.numOfBlocksPerTable[sup.numOfTables] = (int32_t)num;

    char* buf = taosMemoryMalloc(sizeof(SBlockOrderWrapper) * num);
    if (buf == NULL) {
      // NOTE(review): the hash iteration is abandoned here; confirm whether it has to be cancelled explicitly.
      cleanupBlockOrderSupporter(&sup);
      return TSDB_CODE_TDB_OUT_OF_MEMORY;
    }

    sup.pDataBlockInfo[sup.numOfTables] = (SBlockOrderWrapper*)buf;
    for (size_t k = 0; k < num; ++k) {
      SBlockOrderWrapper wrapper = {0};
      wrapper.pBlock = (SBlock*)taosArrayGet(pTableScanInfo->pBlockList, k);
      wrapper.uid = pTableScanInfo->uid;

      sup.pDataBlockInfo[sup.numOfTables][k] = wrapper;
      cnt++;
    }

    sup.numOfTables += 1;
  }

  ASSERT(numOfBlocks == cnt);

  // since there is only one table qualified, blocks are not sorted
  if (sup.numOfTables == 1) {
    for (int32_t i = 0; i < numOfBlocks; ++i) {
      SFileDataBlockInfo blockInfo = {.uid = sup.pDataBlockInfo[0][i].uid, .tbBlockIdx = i};
      if (taosArrayPush(pBlockIter->blockList, &blockInfo) == NULL) {
        // a short block list must not be reported as success
        cleanupBlockOrderSupporter(&sup);
        return TSDB_CODE_TDB_OUT_OF_MEMORY;
      }
    }
    tsdbDebug("%p create blocks info struct completed for one table, %d blocks not sorted %s", pReader, cnt,
              pReader->idStr);

    pBlockIter->index = asc ? 0 : (numOfBlocks - 1);

    cleanupBlockOrderSupporter(&sup);
    return TSDB_CODE_SUCCESS;
  }

  tsdbDebug("%p create data blocks info struct completed, %d blocks in %d tables %s", pReader, cnt, sup.numOfTables,
            pReader->idStr);

  assert(cnt <= numOfBlocks && sup.numOfTables <= numOfTables);

  SMultiwayMergeTreeInfo* pTree = NULL;
  // keep the full width of the status code; a narrower type could truncate an error to 0
  int32_t ret = tMergeTreeCreate(&pTree, sup.numOfTables, &sup, fileDataBlockOrderCompar);
  if (ret != TSDB_CODE_SUCCESS) {
    cleanupBlockOrderSupporter(&sup);
    return TSDB_CODE_TDB_OUT_OF_MEMORY;
  }

  int32_t numOfTotal = 0;
  while (numOfTotal < cnt) {
    int32_t pos = tMergeTreeGetChosenIndex(pTree);
    int32_t index = sup.indexPerTable[pos]++;

    SFileDataBlockInfo blockInfo = {.uid = sup.pDataBlockInfo[pos][index].uid, .tbBlockIdx = index};
    if (taosArrayPush(pBlockIter->blockList, &blockInfo) == NULL) {
      cleanupBlockOrderSupporter(&sup);
      taosMemoryFree(pTree);
      return TSDB_CODE_TDB_OUT_OF_MEMORY;
    }

    // set data block index overflow, in order to disable the offset comparator
    if (sup.indexPerTable[pos] >= sup.numOfBlocksPerTable[pos]) {
      sup.indexPerTable[pos] = sup.numOfBlocksPerTable[pos] + 1;
    }

    numOfTotal += 1;
    tMergeTreeAdjust(pTree, tMergeTreeGetAdjustIndex(pTree));
  }

  tsdbDebug("%p %d data blocks sort completed, %s", pReader, cnt, pReader->idStr);
  cleanupBlockOrderSupporter(&sup);
  taosMemoryFree(pTree);

  pBlockIter->index = asc ? 0 : (numOfBlocks - 1);
  return TSDB_CODE_SUCCESS;
}
H
Hongze Cheng 已提交
1353

H
Haojun Liao 已提交
1354
/**
 * Move the iterator one block forward in its scan direction.
 *
 * @param pBlockIter iterator to advance
 * @return false when the iterator already sits on the last block of its direction
 *         (index is left unchanged), true after the index has been moved
 */
static bool blockIteratorNext(SDataBlockIter* pBlockIter) {
  if (ASCENDING_TRAVERSE(pBlockIter->order)) {
    if (pBlockIter->index >= pBlockIter->numOfBlocks - 1) {
      return false;
    }
    pBlockIter->index += 1;
  } else {
    if (pBlockIter->index <= 0) {
      return false;
    }
    pBlockIter->index += -1;
  }

  return true;
}

1366 1367 1368
/**
 * This is an two rectangles overlap cases.
 */
1369
static int32_t dataBlockPartiallyRequired(STimeWindow* pWindow, SVersionRange* pVerRange, SBlock* pBlock) {
1370 1371 1372 1373
  return (pWindow->ekey < pBlock->maxKey.ts && pWindow->ekey >= pBlock->minKey.ts) ||
         (pWindow->skey > pBlock->minKey.ts && pWindow->skey <= pBlock->maxKey.ts) ||
         (pVerRange->minVer > pBlock->minVersion && pVerRange->minVer <= pBlock->maxVersion) ||
         (pVerRange->maxVer < pBlock->maxVersion && pVerRange->maxVer >= pBlock->minVersion);
H
Haojun Liao 已提交
1374
}
H
Hongze Cheng 已提交
1375

H
Haojun Liao 已提交
1376 1377 1378 1379
// Return the block list entry located at the iterator's current index.
static SFileDataBlockInfo* getCurrentBlockInfo(SDataBlockIter* pBlockIter) {
  return (SFileDataBlockInfo*)taosArrayGet(pBlockIter->blockList, pBlockIter->index);
}
H
Hongze Cheng 已提交
1380

1381 1382
/**
 * Fetch the block that follows pFBlockInfo in scan direction within the same table.
 *
 * @param pFBlockInfo         current block; its tbBlockIdx is the position in the table's block list
 * @param pTableBlockScanInfo scan info holding the block list of the table
 * @param nextIndex           out: index of the neighbor in the block list; untouched when NULL is returned
 * @param order               scan order; TSDB_ORDER_ASC is ascending, anything else descending
 * @return the neighbor block, or NULL when no neighbor exists in that direction
 */
static SBlock* getNeighborBlockOfSameTable(SFileDataBlockInfo* pFBlockInfo, STableBlockScanInfo* pTableBlockScanInfo,
                                           int32_t* nextIndex, int32_t order) {
  int32_t step = ASCENDING_TRAVERSE(order) ? 1 : -1;

  // Work in signed arithmetic: "size - 1" on the unsigned size wraps around for
  // an empty list and would let an out-of-range index through.
  int32_t numOfBlocks = (int32_t)taosArrayGetSize(pTableBlockScanInfo->pBlockList);
  int32_t neighbor = pFBlockInfo->tbBlockIdx + step;

  if (neighbor < 0 || neighbor >= numOfBlocks) {
    return NULL;
  }

  *nextIndex = neighbor;
  SBlock* pNext = taosArrayGet(pTableBlockScanInfo->pBlockList, *nextIndex);
  return pNext;
}

/**
 * Search the iterator's block list, starting at the current index and walking in
 * scan direction, for the entry with the same uid and tbBlockIdx as pFBlockInfo.
 *
 * @return index of the matching entry; asserts and returns -1 when nothing matches
 */
static int32_t findFileBlockInfoIndex(SDataBlockIter* pBlockIter, SFileDataBlockInfo* pFBlockInfo) {
  ASSERT(pBlockIter != NULL && pFBlockInfo != NULL);

  int32_t stride = ASCENDING_TRAVERSE(pBlockIter->order) ? 1 : -1;

  for (int32_t pos = pBlockIter->index; pos < pBlockIter->numOfBlocks && pos >= 0; pos += stride) {
    SFileDataBlockInfo* pCandidate = taosArrayGet(pBlockIter->blockList, pos);

    bool sameTable = (pCandidate->uid == pFBlockInfo->uid);
    if (sameTable && pCandidate->tbBlockIdx == pFBlockInfo->tbBlockIdx) {
      return pos;
    }
  }

  ASSERT(0);
  return -1;
}

1418
/**
 * Advance the iterator by step and make the block list entry found at `index`
 * the entry at the new iterator position, relocating it inside the list when
 * it is not already there.
 *
 * @param pBlockIter iterator to update
 * @param index      current position of the entry that becomes active
 * @param step       amount added to the iterator index
 * @return -1 when index is outside [0, numOfBlocks), TSDB_CODE_SUCCESS otherwise
 */
static int32_t setFileBlockActiveInBlockIter(SDataBlockIter* pBlockIter, int32_t index, int32_t step) {
  if (index < 0 || index >= pBlockIter->numOfBlocks) {
    return -1;
  }

  // take a copy first: the entry is removed from the list below
  SFileDataBlockInfo moved = *(SFileDataBlockInfo*)taosArrayGet(pBlockIter->blockList, index);
  pBlockIter->index += step;

  if (index == pBlockIter->index) {
    return TSDB_CODE_SUCCESS;
  }

  taosArrayRemove(pBlockIter->blockList, index);
  taosArrayInsert(pBlockIter->blockList, pBlockIter->index, &moved);

  SFileDataBlockInfo* pActive = taosArrayGet(pBlockIter->blockList, pBlockIter->index);
  ASSERT(pActive->uid == moved.uid && pActive->tbBlockIdx == moved.tbBlockIdx);

  return TSDB_CODE_SUCCESS;
}

// Report whether pBlock touches pNeighbor on the boundary timestamp in scan direction:
// ascending compares the block's max key with the neighbor's min key, descending compares
// the block's min key with the neighbor's max key.
static bool overlapWithNeighborBlock(SBlock* pBlock, SBlock* pNeighbor, int32_t order) {
  return ASCENDING_TRAVERSE(order) ? (pBlock->maxKey.ts == pNeighbor->minKey.ts)
                                   : (pBlock->minKey.ts == pNeighbor->maxKey.ts);
}
H
Hongze Cheng 已提交
1445

1446
// Report whether the current buffer key lies at or before the file block in scan direction:
// key.ts <= block min key for ascending scans, key.ts >= block max key for descending scans.
// A key still holding TSKEY_INITIAL_VAL never qualifies.
static bool bufferDataInFileBlockGap(int32_t order, TSDBKEY key, SBlock* pBlock) {
  if (key.ts == TSKEY_INITIAL_VAL) {
    return false;
  }

  if (ASCENDING_TRAVERSE(order)) {
    return key.ts <= pBlock->minKey.ts;
  }

  return key.ts >= pBlock->maxKey.ts;
}
H
Hongze Cheng 已提交
1452

H
Haojun Liao 已提交
1453
// Report whether key.ts falls inside the block's [minKey.ts, maxKey.ts] range while the block's
// version range intersects the requested version range.
static bool keyOverlapFileBlock(TSDBKEY key, SBlock* pBlock, SVersionRange* pVerRange) {
  if (key.ts < pBlock->minKey.ts || key.ts > pBlock->maxKey.ts) {
    return false;
  }

  if (pBlock->maxVersion < pVerRange->minVer) {
    return false;
  }

  return pBlock->minVersion <= pVerRange->maxVer;
}

1458 1459 1460 1461 1462 1463 1464 1465 1466 1467 1468 1469 1470 1471 1472 1473 1474 1475 1476 1477 1478 1479 1480 1481 1482 1483 1484 1485 1486 1487 1488 1489 1490 1491
static bool doCheckforDatablockOverlap(STableBlockScanInfo* pBlockScanInfo, const SBlock* pBlock) {
  size_t num = taosArrayGetSize(pBlockScanInfo->delSkyline);

  for (int32_t i = pBlockScanInfo->fileDelIndex; i < num; i += 1) {
    TSDBKEY* p = taosArrayGet(pBlockScanInfo->delSkyline, i);
    if (p->ts >= pBlock->minKey.ts && p->ts <= pBlock->maxKey.ts) {
      if (p->version >= pBlock->minVersion) {
        return true;
      }
    } else if (p->ts < pBlock->minKey.ts) {  // p->ts < pBlock->minKey.ts
      if (p->version >= pBlock->minVersion) {
        if (i < num - 1) {
          TSDBKEY* pnext = taosArrayGet(pBlockScanInfo->delSkyline, i + 1);
          if (i + 1 == num - 1) {  // pnext is the last point
            if (pnext->ts >= pBlock->minKey.ts) {
              return true;
            }
          } else {
            if (pnext->ts >= pBlock->minKey.ts && pnext->version >= pBlock->minVersion) {
              return true;
            }
          }
        } else {  // it must be the last point
          ASSERT(p->version == 0);
        }
      }
    } else {  // (p->ts > pBlock->maxKey.ts) {
      return false;
    }
  }

  return false;
}

1492
static bool overlapWithDelSkyline(STableBlockScanInfo* pBlockScanInfo, const SBlock* pBlock, int32_t order) {
1493 1494 1495 1496
  if (pBlockScanInfo->delSkyline == NULL) {
    return false;
  }

1497
  // ts is not overlap
1498
  TSDBKEY* pFirst = taosArrayGet(pBlockScanInfo->delSkyline, 0);
L
Liu Jicong 已提交
1499
  TSDBKEY* pLast = taosArrayGetLast(pBlockScanInfo->delSkyline);
1500 1501 1502 1503 1504
  if (pBlock->minKey.ts > pLast->ts || pBlock->maxKey.ts < pFirst->ts) {
    return false;
  }

  // version is not overlap
1505 1506 1507 1508
  if (ASCENDING_TRAVERSE(order)) {
    return doCheckforDatablockOverlap(pBlockScanInfo, pBlock);
  } else {
    int32_t index = pBlockScanInfo->fileDelIndex;
1509
    while (1) {
1510 1511 1512 1513 1514
      TSDBKEY* p = taosArrayGet(pBlockScanInfo->delSkyline, index);
      if (p->ts > pBlock->minKey.ts && index > 0) {
        index -= 1;
      } else {  // find the first point that is smaller than the minKey.ts of dataBlock.
        break;
1515 1516 1517
      }
    }

1518 1519
    return doCheckforDatablockOverlap(pBlockScanInfo, pBlock);
  }
1520 1521
}

1522 1523 1524 1525
// 1. the version of all rows should be less than the endVersion
// 2. current block should not overlap with next neighbor block
// 3. current timestamp should not be overlap with each other
// 4. output buffer should be large enough to hold all rows in current block
1526
// 5. delete info should not overlap with current block data
1527 1528
static bool fileBlockShouldLoad(STsdbReader* pReader, SFileDataBlockInfo* pFBlock, SBlock* pBlock,
                                STableBlockScanInfo* pScanInfo, TSDBKEY key) {
1529 1530 1531
  int32_t neighborIndex = 0;
  SBlock* pNeighbor = getNeighborBlockOfSameTable(pFBlock, pScanInfo, &neighborIndex, pReader->order);

1532
  // overlap with neighbor
1533 1534 1535 1536 1537
  bool overlapWithNeighbor = false;
  if (pNeighbor) {
    overlapWithNeighbor = overlapWithNeighborBlock(pBlock, pNeighbor, pReader->order);
  }

1538
  // has duplicated ts of different version in this block
L
Liu Jicong 已提交
1539 1540
  bool hasDup = (pBlock->nSubBlock == 1) ? pBlock->hasDup : true;
  bool overlapWithDel = overlapWithDelSkyline(pScanInfo, pBlock, pReader->order);
1541

1542
  return (overlapWithNeighbor || hasDup || dataBlockPartiallyRequired(&pReader->window, &pReader->verRange, pBlock) ||
1543
          keyOverlapFileBlock(key, pBlock, &pReader->verRange) || (pBlock->nRow > pReader->capacity) || overlapWithDel);
H
Haojun Liao 已提交
1544 1545
}

1546
// Fill the reader's result block with rows taken from the mem/imem iterators of the table, up to
// endKey. Does nothing when neither iterator has a value. The result block is marked as a
// composed block and the elapsed time is logged. Returns the code of buildDataBlockFromBufImpl.
static int32_t buildDataBlockFromBuf(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo, int64_t endKey) {
  if (!pBlockScanInfo->iiter.hasVal && !pBlockScanInfo->iter.hasVal) {
    return TSDB_CODE_SUCCESS;
  }

  SSDataBlock* pResBlock = pReader->pResBlock;
  int64_t      startUs = taosGetTimestampUs();

  int32_t code = buildDataBlockFromBufImpl(pBlockScanInfo, endKey, pReader->capacity, pReader);

  blockDataUpdateTsWindow(pResBlock, 0);
  pResBlock->info.uid = pBlockScanInfo->uid;
  setComposedBlockFlag(pReader, true);

  int64_t elapsedTime = taosGetTimestampUs() - startUs;
  tsdbDebug("%p build data block from cache completed, elapsed time:%" PRId64
            " us, numOfRows:%d, numOfCols:%d, brange: %" PRId64 " - %" PRId64 " %s",
            pReader, elapsedTime, pResBlock->info.rows, (int32_t)blockDataGetNumOfCols(pResBlock),
            pResBlock->info.window.skey, pResBlock->info.window.ekey, pReader->idStr);

  return code;
}

1569
static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo, TSDBROW* pRow,
1570
                                     STSRow* pTSRow, SIterInfo* pIter, int64_t key) {
1571 1572 1573 1574 1575
  SRowMerger          merge = {0};
  SBlockData*         pBlockData = &pReader->status.fileBlockData;
  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;

  TSDBKEY k = TSDBROW_KEY(pRow);
1576
  TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
1577
  SArray* pDelList = pBlockScanInfo->delSkyline;
1578

1579 1580 1581 1582 1583 1584 1585 1586
  // ascending order traverse
  if (ASCENDING_TRAVERSE(pReader->order)) {
    if (key < k.ts) {
      tRowMergerInit(&merge, &fRow, pReader->pSchema);

      doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
      tRowMergerGetRow(&merge, &pTSRow);
    } else if (k.ts < key) {  // k.ts < key
1587
      doMergeMultiRows(pRow, pBlockScanInfo->uid, pIter, pDelList, &pTSRow, pReader);
1588 1589 1590
    } else {  // k.ts == key, ascending order: file block ----> imem rows -----> mem rows
      tRowMergerInit(&merge, &fRow, pReader->pSchema);
      doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
1591 1592

      tRowMerge(&merge, pRow);
1593
      doMergeRowsInBuf(pIter, k.ts, pBlockScanInfo->delSkyline, &merge, pReader);
1594 1595

      tRowMergerGetRow(&merge, &pTSRow);
1596
    }
1597 1598
  } else {  // descending order scan
    if (key < k.ts) {
1599
      doMergeMultiRows(pRow, pBlockScanInfo->uid, pIter, pDelList, &pTSRow, pReader);
1600 1601
    } else if (k.ts < key) {
      tRowMergerInit(&merge, &fRow, pReader->pSchema);
1602

1603 1604 1605 1606 1607 1608
      doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
      tRowMergerGetRow(&merge, &pTSRow);
    } else {  // descending order: mem rows -----> imem rows ------> file block
      updateSchema(pRow, pBlockScanInfo->uid, pReader);

      tRowMergerInit(&merge, pRow, pReader->pSchema);
1609
      doMergeRowsInBuf(pIter, k.ts, pBlockScanInfo->delSkyline, &merge, pReader);
1610 1611 1612 1613 1614 1615

      tRowMerge(&merge, &fRow);
      doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);

      tRowMergerGetRow(&merge, &pTSRow);
    }
1616 1617
  }

1618
  tRowMergerClear(&merge);
1619 1620 1621 1622
  doAppendOneRow(pReader->pResBlock, pReader, pTSRow);
  return TSDB_CODE_SUCCESS;
}

1623 1624 1625 1626
// Produce one output row from three sources that all currently hold a row: the file block
// (timestamp `key`), the mem iterator (`k`) and the imem iterator (`ik`), then append it to the
// result block. The source(s) holding the next timestamp in scan order are merged; sources with
// the same timestamp are merged in the order file -> imem -> mem for ascending scans and
// mem -> imem -> file for descending scans.
//
// Changes compared with the previous revision:
//  - descending case [7] initialises the merger from the imem row before merging the file row
//    (the merger was used uninitialised before);
//  - descending case [1/2] merges buffer rows that share k.ts, not the file timestamp;
//  - two assertions that rejected legal inputs (asc: key == ik.ts > k.ts, desc: k.ts == ik.ts < key)
//    are removed;
//  - the merger is cleared after the row is extracted, as doMergeBufAndFileRows does.
static int32_t doMergeThreeLevelRows(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo) {
  SRowMerger merge = {0};
  STSRow*    pTSRow = NULL;

  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
  SBlockData*         pBlockData = &pReader->status.fileBlockData;
  SArray*             pDelList = pBlockScanInfo->delSkyline;

  TSDBROW* pRow = getValidRow(&pBlockScanInfo->iter, pDelList, pReader);
  TSDBROW* piRow = getValidRow(&pBlockScanInfo->iiter, pDelList, pReader);
  ASSERT(pRow != NULL && piRow != NULL);

  int64_t  key = pBlockData->aTSKEY[pDumpInfo->rowIndex];
  uint64_t uid = pBlockScanInfo->uid;

  TSDBKEY k = TSDBROW_KEY(pRow);
  TSDBKEY ik = TSDBROW_KEY(piRow);

  if (ASCENDING_TRAVERSE(pReader->order)) {
    // [1&2] key <= [k.ts && ik.ts]
    if (key <= k.ts && key <= ik.ts) {
      TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
      tRowMergerInit(&merge, &fRow, pReader->pSchema);

      doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);

      if (ik.ts == key) {
        tRowMerge(&merge, piRow);
        doMergeRowsInBuf(&pBlockScanInfo->iiter, key, pBlockScanInfo->delSkyline, &merge, pReader);
      }

      if (k.ts == key) {
        tRowMerge(&merge, pRow);
        doMergeRowsInBuf(&pBlockScanInfo->iter, key, pBlockScanInfo->delSkyline, &merge, pReader);
      }

      tRowMergerGetRow(&merge, &pTSRow);
      tRowMergerClear(&merge);
      doAppendOneRow(pReader->pResBlock, pReader, pTSRow);
      return TSDB_CODE_SUCCESS;
    } else {  // key > ik.ts || key > k.ts
      // note: key == ik.ts is legal here when k.ts < key (case [5])

      // [3] ik.ts < key <= k.ts
      // [4] ik.ts < k.ts <= key
      if (ik.ts < k.ts) {
        doMergeMultiRows(piRow, uid, &pBlockScanInfo->iiter, pDelList, &pTSRow, pReader);
        doAppendOneRow(pReader->pResBlock, pReader, pTSRow);
        return TSDB_CODE_SUCCESS;
      }

      // [5] k.ts < key   <= ik.ts
      // [6] k.ts < ik.ts <= key
      if (k.ts < ik.ts) {
        doMergeMultiRows(pRow, uid, &pBlockScanInfo->iter, pDelList, &pTSRow, pReader);
        doAppendOneRow(pReader->pResBlock, pReader, pTSRow);
        return TSDB_CODE_SUCCESS;
      }

      // [7] k.ts == ik.ts < key
      if (k.ts == ik.ts) {
        ASSERT(key > ik.ts && key > k.ts);

        doMergeMemIMemRows(pRow, piRow, pBlockScanInfo, pReader, &pTSRow);
        doAppendOneRow(pReader->pResBlock, pReader, pTSRow);
        return TSDB_CODE_SUCCESS;
      }
    }
  } else {  // descending order scan
    // [1/2] k.ts >= ik.ts && k.ts >= key
    if (k.ts >= ik.ts && k.ts >= key) {
      updateSchema(pRow, uid, pReader);

      tRowMergerInit(&merge, pRow, pReader->pSchema);
      // merge the remaining mem rows that share the timestamp of the row the merger started with
      doMergeRowsInBuf(&pBlockScanInfo->iter, k.ts, pBlockScanInfo->delSkyline, &merge, pReader);

      if (ik.ts == k.ts) {
        tRowMerge(&merge, piRow);
        doMergeRowsInBuf(&pBlockScanInfo->iiter, k.ts, pBlockScanInfo->delSkyline, &merge, pReader);
      }

      if (k.ts == key) {
        TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
        tRowMerge(&merge, &fRow);
        doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
      }

      tRowMergerGetRow(&merge, &pTSRow);
      tRowMergerClear(&merge);
      doAppendOneRow(pReader->pResBlock, pReader, pTSRow);
      return TSDB_CODE_SUCCESS;
    } else {
      // note: ik.ts == k.ts is legal here when both are smaller than key; the file row is
      // emitted by the `key > ik.ts` branch below in that case

      // [3] ik.ts > k.ts >= Key
      // [4] ik.ts > key >= k.ts
      if (ik.ts > key) {
        doMergeMultiRows(piRow, uid, &pBlockScanInfo->iiter, pDelList, &pTSRow, pReader);
        doAppendOneRow(pReader->pResBlock, pReader, pTSRow);
        return TSDB_CODE_SUCCESS;
      }

      // [5] key > ik.ts > k.ts
      // [6] key > k.ts > ik.ts
      if (key > ik.ts) {
        TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
        tRowMergerInit(&merge, &fRow, pReader->pSchema);

        doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
        tRowMergerGetRow(&merge, &pTSRow);
        tRowMergerClear(&merge);
        doAppendOneRow(pReader->pResBlock, pReader, pTSRow);
        return TSDB_CODE_SUCCESS;
      }

      // [7] key = ik.ts > k.ts: imem rows first, then the file rows with the same timestamp
      if (key == ik.ts) {
        updateSchema(piRow, uid, pReader);

        tRowMergerInit(&merge, piRow, pReader->pSchema);
        doMergeRowsInBuf(&pBlockScanInfo->iiter, ik.ts, pBlockScanInfo->delSkyline, &merge, pReader);

        TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
        tRowMerge(&merge, &fRow);
        doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);

        tRowMergerGetRow(&merge, &pTSRow);
        tRowMergerClear(&merge);
        doAppendOneRow(pReader->pResBlock, pReader, pTSRow);
        return TSDB_CODE_SUCCESS;
      }
    }
  }

  // every ordering of (key, k.ts, ik.ts) is handled above
  ASSERT(0);
  return -1;
}

dengyihao's avatar
dengyihao 已提交
1754 1755
// Check whether the file-block row at pDumpInfo->rowIndex may be returned to the caller: its
// version must lie in the reader's version range, its timestamp in the reader's time window, and
// it must not be reported as dropped by the table's delete skyline.
static bool isValidFileBlockRow(SBlockData* pBlockData, SFileBlockDumpInfo* pDumpInfo,
                                STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader) {
  const int32_t rowIdx = pDumpInfo->rowIndex;

  // version range
  int64_t ver = pBlockData->aVersion[rowIdx];
  bool    verInRange = (ver >= pReader->verRange.minVer) && (ver <= pReader->verRange.maxVer);
  if (!verInRange) {
    return false;
  }

  // time window
  int64_t ts = pBlockData->aTSKEY[rowIdx];
  bool    tsInWindow = (ts >= pReader->window.skey) && (ts <= pReader->window.ekey);
  if (!tsInWindow) {
    return false;
  }

  // delete skyline
  TSDBKEY rowKey = {.ts = ts, .version = ver};
  return !hasBeenDropped(pBlockScanInfo->delSkyline, &pBlockScanInfo->fileDelIndex, &rowKey, pReader->order);
}

1775
// Return true when ts lies outside the closed interval [pWindow->skey, pWindow->ekey].
static bool outOfTimeWindow(int64_t ts, STimeWindow* pWindow) {
  bool inside = (ts >= pWindow->skey) && (ts <= pWindow->ekey);
  return !inside;
}
1776

1777 1778 1779 1780 1781 1782 1783 1784
// Emit one merged row into the result block, choosing the merge routine from the sources that
// currently hold data: mem + imem + file, imem + file, mem + file, or the file block alone.
static int32_t buildComposedDataBlockImpl(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo) {
  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
  SBlockData*         pBlockData = &pReader->status.fileBlockData;

  SRowMerger merge = {0};
  STSRow*    pTSRow = NULL;

  int64_t key = pBlockData->aTSKEY[pDumpInfo->rowIndex];

  // both iterators are positioned on a valid row before their hasVal flags are inspected
  TSDBROW* pMemRow = getValidRow(&pBlockScanInfo->iter, pBlockScanInfo->delSkyline, pReader);
  TSDBROW* pIMemRow = getValidRow(&pBlockScanInfo->iiter, pBlockScanInfo->delSkyline, pReader);

  const bool hasMem = pBlockScanInfo->iter.hasVal;
  const bool hasIMem = pBlockScanInfo->iiter.hasVal;

  // mem + imem + file
  if (hasMem && hasIMem) {
    return doMergeThreeLevelRows(pReader, pBlockScanInfo);
  }

  // imem + file
  if (hasIMem) {
    return doMergeBufAndFileRows(pReader, pBlockScanInfo, pIMemRow, pTSRow, &pBlockScanInfo->iiter, key);
  }

  // mem + file
  if (hasMem) {
    return doMergeBufAndFileRows(pReader, pBlockScanInfo, pMemRow, pTSRow, &pBlockScanInfo->iter, key);
  }

  // imem & mem are all empty, only file exist
  TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
  tRowMergerInit(&merge, &fRow, pReader->pSchema);
  doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
  tRowMergerGetRow(&merge, &pTSRow);
  doAppendOneRow(pReader->pResBlock, pReader, pTSRow);

  return TSDB_CODE_SUCCESS;
}

1812
// Build a composed result block from the currently loaded file block merged with the buffered
// rows of the same table. Rows are produced until the file block is fully consumed, the result
// block reaches the reader's capacity, or a merge step fails.
//
// Returns TSDB_CODE_SUCCESS, or the error code of the failing merge step. The merge result was
// ignored before, so a failing step that did not advance the row index could spin forever.
static int32_t buildComposedDataBlock(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo) {
  SSDataBlock* pResBlock = pReader->pResBlock;

  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
  SBlockData*         pBlockData = &pReader->status.fileBlockData;
  int32_t             step = ASCENDING_TRAVERSE(pReader->order) ? 1 : -1;
  int32_t             code = TSDB_CODE_SUCCESS;

  while (1) {
    // skip rows that are filtered out by version range, time window or delete skyline
    if (!isValidFileBlockRow(pBlockData, pDumpInfo, pBlockScanInfo, pReader)) {
      pDumpInfo->rowIndex += step;

      SFileDataBlockInfo* pFBlock = getCurrentBlockInfo(&pReader->status.blockIter);
      SBlock*             pBlock = taosArrayGet(pBlockScanInfo->pBlockList, pFBlock->tbBlockIdx);

      if (pDumpInfo->rowIndex >= pBlock->nRow || pDumpInfo->rowIndex < 0) {
        setBlockAllDumped(pDumpInfo, pBlock, pReader->order);
        break;
      }

      continue;
    }

    code = buildComposedDataBlockImpl(pReader, pBlockScanInfo);
    if (code != TSDB_CODE_SUCCESS) {
      break;
    }

    SFileDataBlockInfo* pFBlock = getCurrentBlockInfo(&pReader->status.blockIter);
    SBlock*             pBlock = taosArrayGet(pBlockScanInfo->pBlockList, pFBlock->tbBlockIdx);

    // currently loaded file data block is consumed
    if (pDumpInfo->rowIndex >= pBlock->nRow || pDumpInfo->rowIndex < 0) {
      setBlockAllDumped(pDumpInfo, pBlock, pReader->order);
      break;
    }

    if (pResBlock->info.rows >= pReader->capacity) {
      break;
    }
  }

  // the rows appended so far stay in the result block; keep its metadata consistent
  pResBlock->info.uid = pBlockScanInfo->uid;
  blockDataUpdateTsWindow(pResBlock, 0);

  setComposedBlockFlag(pReader, true);

  // window keys are signed timestamps, print them as such (same as buildDataBlockFromBuf)
  tsdbDebug("%p uid:%" PRIu64 ", composed data block created, brange:%" PRId64 "-%" PRId64 " rows:%d, %s", pReader,
            pBlockScanInfo->uid, pResBlock->info.window.skey, pResBlock->info.window.ekey, pResBlock->info.rows,
            pReader->idStr);

  return code;
}

// Record in the reader status whether the current result block was composed row by row
// (true) or refers to a whole file block (false).
void setComposedBlockFlag(STsdbReader* pReader, bool composed) {
  SReaderStatus* pStatus = &pReader->status;
  pStatus->composedDataBlock = composed;
}

1867
// Create, once per table, the skip-list iterators over the mem and imem buffers and build the
// table's delete skyline. The iterators start at the query window's start key for ascending
// scans and at its end key for descending scans.
//
// Returns TSDB_CODE_SUCCESS if the table is already initialised or everything succeeded, the
// error of tsdbTbDataIterCreate if an iterator could not be created (the table is then not
// marked as initialised), or the error of initDelSkylineIterator.
static int32_t initMemDataIterator(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader) {
  if (pBlockScanInfo->iterInit) {
    return TSDB_CODE_SUCCESS;
  }

  int32_t code = TSDB_CODE_SUCCESS;

  TSDBKEY startKey = {0};
  if (ASCENDING_TRAVERSE(pReader->order)) {
    startKey = (TSDBKEY){.ts = pReader->window.skey, .version = pReader->verRange.minVer};
  } else {
    startKey = (TSDBKEY){.ts = pReader->window.ekey, .version = pReader->verRange.maxVer};
  }

  int32_t backward = (!ASCENDING_TRAVERSE(pReader->order));

  // mem buffer
  STbData* d = NULL;
  if (pReader->pMem != NULL) {
    tsdbGetTbDataFromMemTable(pReader->pMem, pReader->suid, pBlockScanInfo->uid, &d);
    if (d != NULL) {
      code = tsdbTbDataIterCreate(d, &startKey, backward, &pBlockScanInfo->iter.iter);
      if (code == TSDB_CODE_SUCCESS) {
        pBlockScanInfo->iter.hasVal = (tsdbTbDataIterGet(pBlockScanInfo->iter.iter) != NULL);

        tsdbDebug("%p uid:%" PRId64 ", check data in mem from skey:%" PRId64 ", order:%d, ts range in buf:%" PRId64
                  "-%" PRId64 " %s",
                  pReader, pBlockScanInfo->uid, startKey.ts, pReader->order, d->minKey, d->maxKey, pReader->idStr);
      } else {
        tsdbError("%p uid:%" PRId64 ", failed to create iterator for mem, code:%s, %s", pReader, pBlockScanInfo->uid,
                  tstrerror(code), pReader->idStr);
        return code;
      }
    }
  } else {
    tsdbDebug("%p uid:%" PRId64 ", no data in mem, %s", pReader, pBlockScanInfo->uid, pReader->idStr);
  }

  // imem buffer
  STbData* di = NULL;
  if (pReader->pIMem != NULL) {
    tsdbGetTbDataFromMemTable(pReader->pIMem, pReader->suid, pBlockScanInfo->uid, &di);
    if (di != NULL) {
      code = tsdbTbDataIterCreate(di, &startKey, backward, &pBlockScanInfo->iiter.iter);
      if (code == TSDB_CODE_SUCCESS) {
        pBlockScanInfo->iiter.hasVal = (tsdbTbDataIterGet(pBlockScanInfo->iiter.iter) != NULL);

        tsdbDebug("%p uid:%" PRId64 ", check data in imem from skey:%" PRId64 ", order:%d, ts range in buf:%" PRId64
                  "-%" PRId64 " %s",
                  pReader, pBlockScanInfo->uid, startKey.ts, pReader->order, di->minKey, di->maxKey, pReader->idStr);
      } else {
        tsdbError("%p uid:%" PRId64 ", failed to create iterator for imem, code:%s, %s", pReader, pBlockScanInfo->uid,
                  tstrerror(code), pReader->idStr);
        return code;
      }
    }
  } else {
    tsdbDebug("%p uid:%" PRId64 ", no data in imem, %s", pReader, pBlockScanInfo->uid, pReader->idStr);
  }

  code = initDelSkylineIterator(pBlockScanInfo, pReader, d, di);

  // the iterators exist at this point; mark the table as initialised even if the skyline could
  // not be built so that they are not created a second time, and report the failure
  pBlockScanInfo->iterInit = true;
  return code;
}

dengyihao's avatar
dengyihao 已提交
1931 1932
// Build the delete skyline of the table (if it has not been built yet) from the delete records
// found in the delete file and in the mem / imem table data, and position the skyline cursors
// of the mem iterator, the imem iterator and the file reader at the first point (ascending) or
// the last point (descending).
//
// pMemTbData / piMemTbData may be NULL when the table has no data in the respective buffer.
// Returns TSDB_CODE_SUCCESS, TSDB_CODE_OUT_OF_MEMORY on allocation failure, or the error code
// of the failing delete-file / skyline routine.
int32_t initDelSkylineIterator(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader, STbData* pMemTbData,
                               STbData* piMemTbData) {
  if (pBlockScanInfo->delSkyline != NULL) {
    return TSDB_CODE_SUCCESS;
  }

  int32_t code = TSDB_CODE_SUCCESS;
  STsdb*  pTsdb = pReader->pTsdb;

  SArray* pDelData = taosArrayInit(4, sizeof(SDelData));
  if (pDelData == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  // 1. delete records persisted in the delete file
  SDelFile* pDelFile = tsdbFSStateGetDelFile(pTsdb->pFS->cState);
  if (pDelFile) {
    SDelFReader* pDelFReader = NULL;
    code = tsdbDelFReaderOpen(&pDelFReader, pDelFile, pTsdb, NULL);
    if (code != TSDB_CODE_SUCCESS) {
      goto _err;
    }

    SArray* aDelIdx = taosArrayInit(4, sizeof(SDelIdx));
    if (aDelIdx == NULL) {
      // previously this path returned success because code was still 0
      code = TSDB_CODE_OUT_OF_MEMORY;
      tsdbDelFReaderClose(&pDelFReader);
      goto _err;
    }

    code = tsdbReadDelIdx(pDelFReader, aDelIdx, NULL);
    if (code == TSDB_CODE_SUCCESS) {
      SDelIdx  idx = {.suid = pReader->suid, .uid = pBlockScanInfo->uid};
      SDelIdx* pIdx = taosArraySearch(aDelIdx, &idx, tCmprDelIdx, TD_EQ);

      if (pIdx != NULL) {
        code = tsdbReadDelData(pDelFReader, pIdx, pDelData, NULL);
      }
    }

    taosArrayDestroy(aDelIdx);
    tsdbDelFReaderClose(&pDelFReader);

    if (code != TSDB_CODE_SUCCESS) {
      goto _err;
    }
  }

  // 2. delete records still held in the mem / imem table data
  SDelData* p = NULL;
  if (pMemTbData != NULL) {
    p = pMemTbData->pHead;
    while (p) {
      taosArrayPush(pDelData, p);
      p = p->pNext;
    }
  }

  if (piMemTbData != NULL) {
    p = piMemTbData->pHead;
    while (p) {
      taosArrayPush(pDelData, p);
      p = p->pNext;
    }
  }

  // 3. build the skyline; it stays NULL when the table has no delete records at all
  if (taosArrayGetSize(pDelData) > 0) {
    pBlockScanInfo->delSkyline = taosArrayInit(4, sizeof(TSDBKEY));
    if (pBlockScanInfo->delSkyline == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto _err;
    }

    code = tsdbBuildDeleteSkyline(pDelData, 0, (int32_t)(taosArrayGetSize(pDelData) - 1), pBlockScanInfo->delSkyline);
  }

  taosArrayDestroy(pDelData);

  // explicit signed arithmetic: an absent/empty skyline yields -1 for descending scans
  int32_t startIndex = 0;
  if (!ASCENDING_TRAVERSE(pReader->order)) {
    startIndex = (int32_t)taosArrayGetSize(pBlockScanInfo->delSkyline) - 1;
  }

  pBlockScanInfo->iter.index = startIndex;
  pBlockScanInfo->iiter.index = startIndex;
  pBlockScanInfo->fileDelIndex = startIndex;
  return code;

_err:
  taosArrayDestroy(pDelData);
  return code;
}

2012 2013 2014
// Return the key of the next buffered (mem / imem) row, in scan order, of the table that owns
// the current file block. When both buffers hold a row, the smaller key is returned for
// ascending scans and the larger one for descending scans. When neither buffer holds a valid
// row the returned key has ts == TSKEY_INITIAL_VAL.
//
// Previously the smaller key was always chosen, and the imem key was compared against the
// TSKEY_INITIAL_VAL sentinel when mem had no row, so it could be dropped.
static TSDBKEY getCurrentKeyInBuf(SDataBlockIter* pBlockIter, STsdbReader* pReader) {
  TSDBKEY key = {.ts = TSKEY_INITIAL_VAL};

  SFileDataBlockInfo*  pFBlock = getCurrentBlockInfo(pBlockIter);
  STableBlockScanInfo* pScanInfo = taosHashGet(pReader->status.pTableMap, &pFBlock->uid, sizeof(pFBlock->uid));

  initMemDataIterator(pScanInfo, pReader);

  bool     hasMemKey = false;
  TSDBROW* pRow = getValidRow(&pScanInfo->iter, pScanInfo->delSkyline, pReader);
  if (pRow != NULL) {
    key = TSDBROW_KEY(pRow);
    hasMemKey = true;
  }

  pRow = getValidRow(&pScanInfo->iiter, pScanInfo->delSkyline, pReader);
  if (pRow != NULL) {
    TSDBKEY ik = TSDBROW_KEY(pRow);

    if (!hasMemKey) {
      key = ik;  // only imem holds data
    } else if (ASCENDING_TRAVERSE(pReader->order)) {
      if (ik.ts < key.ts) {
        key = ik;
      }
    } else {
      if (ik.ts > key.ts) {
        key = ik;
      }
    }
  }

  return key;
}

H
Haojun Liao 已提交
2035 2036
// Advance the file-set iterator until a file set is found that contains blocks of at least one
// of the queried tables, loading its block index and block list on the way.
//
// *numOfBlocks is filled in by doLoadFileBlock; it is left untouched when no further file set
// exists. Returns TSDB_CODE_SUCCESS (also when the files are exhausted),
// TSDB_CODE_OUT_OF_MEMORY if the temporary index list cannot be allocated, or the error code
// of the failing load routine.
static int32_t moveToNextFile(STsdbReader* pReader, int32_t* numOfBlocks) {
  SReaderStatus* pStatus = &pReader->status;
  int32_t        code = TSDB_CODE_SUCCESS;

  SArray* pIndexList = taosArrayInit(4, sizeof(SBlockIdx));
  if (pIndexList == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  while (1) {
    bool hasNext = filesetIteratorNext(&pStatus->fileIter, pReader);
    if (!hasNext) {  // no data files on disk
      break;
    }

    taosArrayClear(pIndexList);
    code = doLoadBlockIndex(pReader, pReader->pFileReader, pIndexList);
    if (code != TSDB_CODE_SUCCESS) {
      break;
    }

    if (taosArrayGetSize(pIndexList) > 0) {
      uint32_t numOfValidTable = 0;
      code = doLoadFileBlock(pReader, pIndexList, &numOfValidTable, numOfBlocks);
      if (code != TSDB_CODE_SUCCESS) {
        break;
      }

      if (numOfValidTable > 0) {
        break;
      }
    }
    // no blocks in current file, try next files
  }

  // single exit: the temporary index list is released on every path
  taosArrayDestroy(pIndexList);
  return code;
}

2071 2072 2073
// Build the next result block for the file block the block iterator currently points at:
//  - load the file block and merge it with buffered rows when fileBlockShouldLoad says so;
//  - otherwise return buffered rows first when the buffer key precedes the block in scan order;
//  - otherwise describe the whole file block in the result block info without loading its rows.
static int32_t doBuildDataBlock(STsdbReader* pReader) {
  SReaderStatus*  pStatus = &pReader->status;
  SDataBlockIter* pBlockIter = &pStatus->blockIter;

  SFileDataBlockInfo*  pFBlock = getCurrentBlockInfo(pBlockIter);
  STableBlockScanInfo* pScanInfo = taosHashGet(pStatus->pTableMap, &pFBlock->uid, sizeof(pFBlock->uid));
  SBlock*              pBlock = taosArrayGet(pScanInfo->pBlockList, pFBlock->tbBlockIdx);

  TSDBKEY key = getCurrentKeyInBuf(pBlockIter, pReader);

  if (fileBlockShouldLoad(pReader, pFBlock, pBlock, pScanInfo, key)) {
    tBlockDataInit(&pStatus->fileBlockData);

    int32_t loadCode = doLoadFileBlockData(pReader, pBlockIter, pScanInfo, &pStatus->fileBlockData);
    if (loadCode != TSDB_CODE_SUCCESS) {
      return loadCode;
    }

    // build composed data block
    return buildComposedDataBlock(pReader, pScanInfo);
  }

  if (bufferDataInFileBlockGap(pReader->order, key, pBlock)) {
    // buffered rows come before the current file block in scan order; emit them up to the
    // block boundary
    // todo rows in buffer should be less than the file block in asc, greater than file block in desc
    int64_t endKey = pBlock->maxKey.ts;
    if (ASCENDING_TRAVERSE(pReader->order)) {
      endKey = pBlock->minKey.ts;
    }

    return buildDataBlockFromBuf(pReader, pScanInfo, endKey);
  }

  // whole block is required, return it directly
  SDataBlockInfo* pInfo = &pReader->pResBlock->info;
  pInfo->rows = pBlock->nRow;
  pInfo->uid = pScanInfo->uid;
  pInfo->window = (STimeWindow){.skey = pBlock->minKey.ts, .ekey = pBlock->maxKey.ts};

  setComposedBlockFlag(pReader, false);
  setBlockAllDumped(&pStatus->fBlockDumpInfo, pBlock, pReader->order);

  return TSDB_CODE_SUCCESS;
}

H
Haojun Liao 已提交
2109
// Walk the table map, resuming at pStatus->pTableIter, and fill the result block from the
// mem/imem buffers of the first table that still yields rows. Returns TSDB_CODE_SUCCESS when a
// non-empty block was produced or when every table is exhausted, otherwise the error code of
// buildDataBlockFromBuf.
static int32_t buildBlockFromBufferSequentially(STsdbReader* pReader) {
  SReaderStatus* pStatus = &pReader->status;

  // start from the first table when no iteration is in progress
  if (pStatus->pTableIter == NULL) {
    pStatus->pTableIter = taosHashIterate(pStatus->pTableMap, NULL);
  }

  while (pStatus->pTableIter != NULL) {
    STableBlockScanInfo* pTableInfo = pStatus->pTableIter;
    initMemDataIterator(pTableInfo, pReader);

    // no file block bounds the buffer here, so take everything in scan direction
    int64_t endKey = INT64_MIN;
    if (ASCENDING_TRAVERSE(pReader->order)) {
      endKey = INT64_MAX;
    }

    int32_t code = buildDataBlockFromBuf(pReader, pTableInfo, endKey);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    if (pReader->pResBlock->info.rows > 0) {
      return TSDB_CODE_SUCCESS;
    }

    // current table is exhausted, let's try the next table
    pStatus->pTableIter = taosHashIterate(pStatus->pTableMap, pStatus->pTableIter);
  }

  return TSDB_CODE_SUCCESS;
}

2141
// set the correct start position in case of the first/last file block, according to the query time window
2142
static void initBlockDumpInfo(STsdbReader* pReader, SDataBlockIter* pBlockIter) {
2143
  SFileDataBlockInfo*  pFBlock = getCurrentBlockInfo(pBlockIter);
2144
  STableBlockScanInfo* pScanInfo = taosHashGet(pReader->status.pTableMap, &pFBlock->uid, sizeof(pFBlock->uid));
2145
  SBlock*              pBlock = taosArrayGet(pScanInfo->pBlockList, pFBlock->tbBlockIdx);
2146

2147 2148 2149
  SReaderStatus* pStatus = &pReader->status;

  SFileBlockDumpInfo* pDumpInfo = &pStatus->fBlockDumpInfo;
2150 2151 2152

  pDumpInfo->totalRows = pBlock->nRow;
  pDumpInfo->allDumped = false;
2153
  pDumpInfo->rowIndex = ASCENDING_TRAVERSE(pReader->order) ? 0 : pBlock->nRow - 1;
2154 2155
}

2156
// Move to the next file set that holds blocks of the queried tables and prepare the block
// iterator and the dump state for its first block. When no such file set exists,
// pReader->status.loadFromFile is cleared so that the caller falls back to buffered data.
//
// Returns TSDB_CODE_SUCCESS or the error code of moveToNextFile / initBlockIterator. The dump
// state is only initialised when the block iterator was set up successfully.
static int32_t initForFirstBlockInFile(STsdbReader* pReader, SDataBlockIter* pBlockIter) {
  int32_t numOfBlocks = 0;
  int32_t code = moveToNextFile(pReader, &numOfBlocks);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  // all data files are consumed, try data in buffer
  if (numOfBlocks == 0) {
    pReader->status.loadFromFile = false;
    return code;
  }

  // initialize the block iterator for a new fileset
  code = initBlockIterator(pReader, pBlockIter, numOfBlocks);
  if (code != TSDB_CODE_SUCCESS) {
    // do not touch the dump state through an iterator that failed to initialise
    return code;
  }

  // set the correct start position according to the query time window
  initBlockDumpInfo(pReader, pBlockIter);
  return code;
}

2177
// Report whether the current file block has been started but not finished: it is not marked
// all-dumped and the row cursor has already moved off its starting row (row 0 for ascending
// scans, the last row for descending scans).
static bool fileBlockPartiallyRead(SFileBlockDumpInfo* pDumpInfo, bool asc) {
  if (pDumpInfo->allDumped) {
    return false;
  }

  if (asc) {
    return pDumpInfo->rowIndex > 0;
  }

  return pDumpInfo->rowIndex < (pDumpInfo->totalRows - 1);
}

2182
// Produce the next non-empty result block from the data files.
//
// Loops until pReader->pResBlock holds at least one row, an error occurs, or the files are
// exhausted (pReader->status.loadFromFile becomes false). A partially consumed file block is
// continued through buildComposedDataBlock(); otherwise the next block (possibly in the next
// file) is selected and handed to doBuildDataBlock().
static int32_t buildBlockFromFiles(STsdbReader* pReader) {
  const bool          ascending = ASCENDING_TRAVERSE(pReader->order);
  SReaderStatus*      pStatus = &pReader->status;
  SDataBlockIter*     pIter = &pStatus->blockIter;
  SFileBlockDumpInfo* pDump = &pStatus->fBlockDumpInfo;

  for (;;) {
    int32_t code = TSDB_CODE_SUCCESS;

    SFileDataBlockInfo*  pCurInfo = getCurrentBlockInfo(pIter);
    STableBlockScanInfo* pTableInfo = taosHashGet(pStatus->pTableMap, &pCurInfo->uid, sizeof(pCurInfo->uid));

    if (fileBlockPartiallyRead(pDump, ascending)) {
      // the file block is partially loaded, keep composing from it
      code = buildComposedDataBlock(pReader, pTableInfo);
    } else {
      if (pDump->allDumped) {
        // the current block is exhausted: step to the next block of this file, or to the next file
        if (blockIteratorNext(pIter)) {
          initBlockDumpInfo(pReader, pIter);
        } else {
          code = initForFirstBlockInFile(pReader, pIter);

          // stop on error, or when every data file has been checked
          if (code != TSDB_CODE_SUCCESS || !pStatus->loadFromFile) {
            return code;
          }
        }
      }

      // the block is not loaded yet, or data in buffer may overlap with the file block
      code = doBuildDataBlock(pReader);
    }

    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    if (pReader->pResBlock->info.rows > 0) {
      return TSDB_CODE_SUCCESS;
    }
  }
}
H
refact  
Hongze Cheng 已提交
2226

2227 2228
// Choose the tsdb instance that serves a query whose window starts at winSKey.
//
// For a non-rsma vnode the plain tsdb is returned and *pLevel is left untouched. For an rsma
// vnode the retention levels are walked in order and the first level whose "keep" span still
// covers winSKey is selected; the chosen level is stored in *pLevel and logged together with
// idStr (an empty string is logged when idStr is NULL).
static STsdb* getTsdbByRetentions(SVnode* pVnode, TSKEY winSKey, SRetention* retentions, const char* idStr,
                                  int8_t* pLevel) {
  if (!VND_IS_RSMA(pVnode)) {
    return VND_TSDB(pVnode);
  }

  int64_t now = taosGetTimestamp(pVnode->config.tsdbCfg.precision);
  int8_t  level = 0;

  for (int8_t round = 0; round < TSDB_RETENTION_MAX; ++round) {
    SRetention* pRetention = &retentions[level];

    if (pRetention->keep <= 0) {
      // this level is not configured, fall back to the previous one (if any)
      if (level > 0) {
        level -= 1;
      }
      break;
    }

    if ((now - pRetention->keep) <= winSKey) {
      break;
    }

    level += 1;
  }

  int32_t     vgId = TD_VID(pVnode);
  const char* str = (idStr == NULL) ? "" : idStr;

  STsdb*  pSelected = NULL;
  int32_t selectedLevel = 0;

  if (level == TSDB_RETENTION_L0) {
    selectedLevel = TSDB_RETENTION_L0;
    pSelected = VND_RSMA0(pVnode);
  } else if (level == TSDB_RETENTION_L1) {
    selectedLevel = TSDB_RETENTION_L1;
    pSelected = VND_RSMA1(pVnode);
  } else {
    selectedLevel = TSDB_RETENTION_L2;
    pSelected = VND_RSMA2(pVnode);
  }

  *pLevel = selectedLevel;
  tsdbDebug("vgId:%d, rsma level %d is selected to query %s", vgId, selectedLevel, str);
  return pSelected;
}

H
Haojun Liao 已提交
2268
// Build the [minVer, maxVer] version range used by a query.
//
// minVer is pCond->startVersion, or 0 when the start version is -1 (unspecified).
// maxVer is, for an rsma vnode, the value of tdRSmaGetMaxSubmitVer() for the given level;
// otherwise it is pCond->endVersion capped at the vnode's applied version, and the applied
// version itself when the end version is -1 (unspecified).
SVersionRange getQueryVerRange(SVnode* pVnode, SQueryTableDataCond* pCond, int8_t level) {
  SVersionRange range = {0};

  int64_t startVer = 0;
  if (pCond->startVersion != -1) {
    startVer = pCond->startVersion;
  }
  range.minVer = startVer;

  if (VND_IS_RSMA(pVnode)) {
    range.maxVer = tdRSmaGetMaxSubmitVer(pVnode->pSma, level);
    return range;
  }

  // the end version never exceeds what the vnode has applied so far
  int64_t endVer = pVnode->state.applied;
  if (pCond->endVersion != -1 && !(pCond->endVersion > pVnode->state.applied)) {
    endVer = pCond->endVersion;
  }
  range.maxVer = endVer;

  return range;
}

H
Hongze Cheng 已提交
2286 2287 2288 2289
// // todo not unref yet, since it is not support multi-group interpolation query
// static UNUSED_FUNC void changeQueryHandleForInterpQuery(STsdbReader* pHandle) {
//   // filter the queried time stamp in the first place
//   STsdbReader* pTsdbReadHandle = (STsdbReader*)pHandle;
H
refact  
Hongze Cheng 已提交
2290

H
Hongze Cheng 已提交
2291 2292
//   // starts from the buffer in case of descending timestamp order check data blocks
//   size_t numOfTables = taosArrayGetSize(pTsdbReadHandle->pTableCheckInfo);
H
refact  
Hongze Cheng 已提交
2293

H
Hongze Cheng 已提交
2294 2295
//   int32_t i = 0;
//   while (i < numOfTables) {
H
Haojun Liao 已提交
2296
//     STableBlockScanInfo* pCheckInfo = taosArrayGet(pTsdbReadHandle->pTableCheckInfo, i);
H
refact  
Hongze Cheng 已提交
2297

H
Hongze Cheng 已提交
2298 2299 2300 2301 2302 2303 2304 2305 2306 2307 2308 2309 2310 2311
//     // the first qualified table for interpolation query
//     //    if ((pTsdbReadHandle->window.skey <= pCheckInfo->pTableObj->lastKey) &&
//     //        (pCheckInfo->pTableObj->lastKey != TSKEY_INITIAL_VAL)) {
//     //      break;
//     //    }

//     i++;
//   }

//   // there are no data in all the tables
//   if (i == numOfTables) {
//     return;
//   }

H
Haojun Liao 已提交
2312
//   STableBlockScanInfo info = *(STableBlockScanInfo*)taosArrayGet(pTsdbReadHandle->pTableCheckInfo, i);
H
Hongze Cheng 已提交
2313 2314 2315 2316 2317 2318
//   taosArrayClear(pTsdbReadHandle->pTableCheckInfo);

//   info.lastKey = pTsdbReadHandle->window.skey;
//   taosArrayPush(pTsdbReadHandle->pTableCheckInfo, &info);
// }

2319
// Check whether the row identified by pKey is covered by a delete range in pDelList.
//
// pDelList holds TSDBKEY entries; two consecutive entries [cur, next] form a range and a key is
// dropped when cur.ts <= key.ts <= next.ts and cur.version >= key.version. *index is the scan
// cursor into pDelList; it is advanced (ascending order) or moved back (descending order) while
// searching, so consecutive calls continue where the previous one stopped.
//
// Returns false when pDelList is NULL or empty.
bool hasBeenDropped(const SArray* pDelList, int32_t* index, TSDBKEY* pKey, int32_t order) {
  ASSERT(pKey != NULL);
  if (pDelList == NULL) {
    return false;
  }

  size_t num = taosArrayGetSize(pDelList);
  if (num == 0) {
    // nothing can be dropped; this also keeps the unsigned "num - 1" below from wrapping around
    return false;
  }

  bool    asc = ASCENDING_TRAVERSE(order);
  int32_t step = asc ? 1 : -1;

  if (asc) {
    if (*index >= num - 1) {
      // the cursor is already on the last entry
      TSDBKEY* last = taosArrayGetLast(pDelList);
      ASSERT(pKey->ts >= last->ts);

      if (pKey->ts > last->ts) {
        return false;
      } else if (pKey->ts == last->ts) {
        if (num < 2) {
          // a single entry has no preceding entry to form a range with
          return false;
        }

        TSDBKEY* prev = taosArrayGet(pDelList, num - 2);
        return (prev->version >= pKey->version);
      }
    } else {
      TSDBKEY* pCurrent = taosArrayGet(pDelList, *index);
      TSDBKEY* pNext = taosArrayGet(pDelList, (*index) + 1);

      if (pKey->ts < pCurrent->ts) {
        return false;
      }

      if (pCurrent->ts <= pKey->ts && pNext->ts >= pKey->ts && pCurrent->version >= pKey->version) {
        return true;
      }

      // move the cursor forward until the range that may contain the key is reached
      while (pNext->ts <= pKey->ts && (*index) < num - 1) {
        (*index) += 1;

        if ((*index) < num - 1) {
          pCurrent = taosArrayGet(pDelList, *index);
          pNext = taosArrayGet(pDelList, (*index) + 1);

          // it is not a consecutive deletion range, ignore it
          if (pCurrent->version == 0 && pNext->version > 0) {
            continue;
          }

          if (pCurrent->ts <= pKey->ts && pNext->ts >= pKey->ts && pCurrent->version >= pKey->version) {
            return true;
          }
        }
      }

      return false;
    }
  } else {
    if (*index <= 0) {
      // the cursor is already on the first entry
      TSDBKEY* pFirst = taosArrayGet(pDelList, 0);

      if (pKey->ts < pFirst->ts) {
        return false;
      } else if (pKey->ts == pFirst->ts) {
        return pFirst->version >= pKey->version;
      } else {
        ASSERT(0);
      }
    } else {
      TSDBKEY* pCurrent = taosArrayGet(pDelList, *index);
      TSDBKEY* pPrev = taosArrayGet(pDelList, (*index) - 1);

      if (pKey->ts > pCurrent->ts) {
        return false;
      }

      if (pPrev->ts <= pKey->ts && pCurrent->ts >= pKey->ts && pPrev->version >= pKey->version) {
        return true;
      }

      // move the cursor backward until the range that may contain the key is reached
      while (pPrev->ts >= pKey->ts && (*index) > 1) {
        (*index) += step;

        if ((*index) >= 1) {
          pCurrent = taosArrayGet(pDelList, *index);
          pPrev = taosArrayGet(pDelList, (*index) - 1);

          // it is not a consecutive deletion range, ignore it
          if (pCurrent->version > 0 && pPrev->version == 0) {
            continue;
          }

          if (pPrev->ts <= pKey->ts && pCurrent->ts >= pKey->ts && pPrev->version >= pKey->version) {
            return true;
          }
        }
      }

      return false;
    }
  }

  return false;
}

// Return the row under the in-memory iterator if it is visible to this reader, skipping forward
// over rows that are outside the reader's version range or covered by a delete range.
//
// Returns NULL (and leaves pIter->hasVal false) when the iterator is exhausted or when a row
// outside the reader's time window is reached.
TSDBROW* getValidRow(SIterInfo* pIter, const SArray* pDelList, STsdbReader* pReader) {
  if (!pIter->hasVal) {
    return NULL;
  }

  for (;;) {
    TSDBROW* pCandidate = tsdbTbDataIterGet(pIter->iter);
    TSDBKEY  rowKey = TSDBROW_KEY(pCandidate);

    if (outOfTimeWindow(rowKey.ts, &pReader->window)) {
      pIter->hasVal = false;
      return NULL;
    }

    // it is a valid data version and it has not been deleted
    bool inVerRange = (rowKey.version <= pReader->verRange.maxVer) && (rowKey.version >= pReader->verRange.minVer);
    if (inVerRange && !hasBeenDropped(pDelList, &pIter->index, &rowKey, pReader->order)) {
      return pCandidate;
    }

    // not visible, try the next row in the buffer
    pIter->hasVal = tsdbTbDataIterNext(pIter->iter);
    if (!pIter->hasVal) {
      return NULL;
    }
  }
}
H
Hongze Cheng 已提交
2457

dengyihao's avatar
dengyihao 已提交
2458
// Merge into pMerger every following valid row of the in-memory iterator that carries the
// timestamp ts. The iterator is advanced first, so the row it currently points at is assumed to
// have been merged by the caller already. Stops at the first row with a different timestamp, or
// when no valid row is left. Always returns TSDB_CODE_SUCCESS.
int32_t doMergeRowsInBuf(SIterInfo* pIter, int64_t ts, SArray* pDelList, SRowMerger* pMerger, STsdbReader* pReader) {
  while ((pIter->hasVal = tsdbTbDataIterNext(pIter->iter))) {
    // data exists but may not be valid
    TSDBROW* pNextRow = getValidRow(pIter, pDelList, pReader);
    if (pNextRow == NULL) {
      break;
    }

    // ts is not identical, quit
    TSDBKEY nextKey = TSDBROW_KEY(pNextRow);
    if (nextKey.ts != ts) {
      break;
    }

    tRowMerge(pMerger, pNextRow);
  }

  return TSDB_CODE_SUCCESS;
}

2483
static int32_t doMergeRowsInFileBlockImpl(SBlockData* pBlockData, int32_t rowIndex, int64_t key, SRowMerger* pMerger,
2484
                                          SVersionRange* pVerRange, int32_t step) {
2485 2486
  while (pBlockData->aTSKEY[rowIndex] == key && rowIndex < pBlockData->nRow && rowIndex >= 0) {
    if (pBlockData->aVersion[rowIndex] > pVerRange->maxVer || pBlockData->aVersion[rowIndex] < pVerRange->minVer) {
2487
      rowIndex += step;
2488 2489 2490 2491 2492 2493 2494 2495 2496 2497 2498 2499 2500 2501 2502 2503 2504
      continue;
    }

    TSDBROW fRow = tsdbRowFromBlockData(pBlockData, rowIndex);
    tRowMerge(pMerger, &fRow);
    rowIndex += step;
  }

  return rowIndex;
}

// Result reported by checkForNeighborFileBlock() through its "state" out-parameter; the caller
// keeps probing neighbor blocks while it receives CHECK_FILEBLOCK_CONT.
typedef enum {
  CHECK_FILEBLOCK_CONT = 0x1,  // the neighbor block was consumed completely, check the one after it
  CHECK_FILEBLOCK_QUIT = 0x2,  // stop: no overlapping neighbor block, or it still has unread rows
} CHECK_FILEBLOCK_STATE;

static int32_t checkForNeighborFileBlock(STsdbReader* pReader, STableBlockScanInfo* pScanInfo, SBlock* pBlock,
2505 2506
                                         SFileDataBlockInfo* pFBlock, SRowMerger* pMerger, int64_t key,
                                         CHECK_FILEBLOCK_STATE* state) {
2507
  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
2508
  SBlockData*         pBlockData = &pReader->status.fileBlockData;
2509

2510
  *state = CHECK_FILEBLOCK_QUIT;
2511
  int32_t step = ASCENDING_TRAVERSE(pReader->order) ? 1 : -1;
2512 2513 2514

  int32_t nextIndex = -1;
  SBlock* pNeighborBlock = getNeighborBlockOfSameTable(pFBlock, pScanInfo, &nextIndex, pReader->order);
2515
  if (pNeighborBlock == NULL) {  // do nothing
2516 2517 2518 2519 2520
    return 0;
  }

  bool overlap = overlapWithNeighborBlock(pBlock, pNeighborBlock, pReader->order);
  if (overlap) {  // load next block
2521
    SReaderStatus*  pStatus = &pReader->status;
2522 2523
    SDataBlockIter* pBlockIter = &pStatus->blockIter;

2524
    // 1. find the next neighbor block in the scan block list
2525
    SFileDataBlockInfo fb = {.uid = pFBlock->uid, .tbBlockIdx = nextIndex};
2526
    int32_t            neighborIndex = findFileBlockInfoIndex(pBlockIter, &fb);
2527

2528
    // 2. remove it from the scan block list
2529
    setFileBlockActiveInBlockIter(pBlockIter, neighborIndex, step);
2530

2531
    // 3. load the neighbor block, and set it to be the currently accessed file data block
2532 2533 2534 2535 2536
    int32_t code = doLoadFileBlockData(pReader, pBlockIter, pScanInfo, &pStatus->fileBlockData);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

2537
    // 4. check the data values
2538 2539 2540 2541 2542 2543 2544 2545 2546 2547 2548 2549 2550
    initBlockDumpInfo(pReader, pBlockIter);

    pDumpInfo->rowIndex =
        doMergeRowsInFileBlockImpl(pBlockData, pDumpInfo->rowIndex, key, pMerger, &pReader->verRange, step);

    if (pDumpInfo->rowIndex >= pBlock->nRow) {
      *state = CHECK_FILEBLOCK_CONT;
    }
  }

  return TSDB_CODE_SUCCESS;
}

2551 2552
// Merge into pMerger all remaining file rows that share the timestamp of the row currently under
// the file-block cursor. The row under the cursor is assumed to have been merged by the caller,
// so the cursor is stepped first. If the current block is consumed completely, overlapping
// neighbor blocks of the same table are checked as well.
//
// Returns TSDB_CODE_SUCCESS, or the error reported while loading a neighbor block.
int32_t doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pScanInfo, STsdbReader* pReader,
                                SRowMerger* pMerger) {
  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;

  bool    asc = ASCENDING_TRAVERSE(pReader->order);
  int64_t key = pBlockData->aTSKEY[pDumpInfo->rowIndex];
  int32_t step = asc ? 1 : -1;

  pDumpInfo->rowIndex += step;

  // a descending scan may step to -1, so both ends have to be checked before the block is read
  if (pDumpInfo->rowIndex >= 0 && pDumpInfo->rowIndex <= pBlockData->nRow - 1) {
    pDumpInfo->rowIndex =
        doMergeRowsInFileBlockImpl(pBlockData, pDumpInfo->rowIndex, key, pMerger, &pReader->verRange, step);
  }

  // all rows are consumed, let's try next file block
  if ((pDumpInfo->rowIndex >= pBlockData->nRow && asc) || (pDumpInfo->rowIndex < 0 && !asc)) {
    while (1) {
      CHECK_FILEBLOCK_STATE st = CHECK_FILEBLOCK_QUIT;

      SFileDataBlockInfo* pFileBlockInfo = getCurrentBlockInfo(&pReader->status.blockIter);
      SBlock*             pCurrentBlock = taosArrayGet(pScanInfo->pBlockList, pFileBlockInfo->tbBlockIdx);

      int32_t code = checkForNeighborFileBlock(pReader, pScanInfo, pCurrentBlock, pFileBlockInfo, pMerger, key, &st);
      if (code != TSDB_CODE_SUCCESS) {
        // do not swallow a failed block load
        return code;
      }

      if (st == CHECK_FILEBLOCK_QUIT) {
        break;
      }
    }
  }

  return TSDB_CODE_SUCCESS;
}

2582
// Make sure pReader->pSchema matches the schema version of pRow: keep the cached schema when the
// versions agree, otherwise release it (if any) and fetch the schema for the row's version.
void updateSchema(TSDBROW* pRow, uint64_t uid, STsdbReader* pReader) {
  int32_t rowSver = TSDBROW_SVERSION(pRow);

  if (pReader->pSchema != NULL) {
    if (pReader->pSchema->version == rowSver) {
      return;  // cached schema is still the right one
    }

    taosMemoryFreeClear(pReader->pSchema);
  }

  metaGetTbTSchemaEx(pReader->pTsdb->pVnode->pMeta, pReader->suid, uid, rowSver, &pReader->pSchema);
}

dengyihao's avatar
dengyihao 已提交
2593 2594
// Merge pRow with all following rows of the same timestamp from one in-memory iterator and hand
// the merged row back through pTSRow.
void doMergeMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter, SArray* pDelList, STSRow** pTSRow,
                      STsdbReader* pReader) {
  // the merger is initialized with the reader's schema, so refresh that first
  updateSchema(pRow, uid, pReader);

  TSDBKEY    rowKey = TSDBROW_KEY(pRow);
  SRowMerger merger = {0};

  tRowMergerInit(&merger, pRow, pReader->pSchema);
  doMergeRowsInBuf(pIter, rowKey.ts, pDelList, &merger, pReader);

  tRowMergerGetRow(&merger, pTSRow);
  tRowMergerClear(&merger);
}

2606 2607
// Merge a row from the mem buffer (pRow) and a row from the imem buffer (piRow), together with
// all following rows of the same timestamp from both buffers, into one row returned via pTSRow.
//
// The merge order follows the scan order: imem first, then mem for an ascending scan, and the
// reverse for a descending scan. The only visible caller invokes this when both rows carry the
// same timestamp.
void doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader,
                        STSRow** pTSRow) {
  SRowMerger merge = {0};

  TSDBKEY k = TSDBROW_KEY(pRow);
  TSDBKEY ik = TSDBROW_KEY(piRow);

  if (ASCENDING_TRAVERSE(pReader->order)) {  // ascending order imem --> mem
    updateSchema(piRow, pBlockScanInfo->uid, pReader);

    tRowMergerInit(&merge, piRow, pReader->pSchema);
    doMergeRowsInBuf(&pBlockScanInfo->iiter, ik.ts, pBlockScanInfo->delSkyline, &merge, pReader);

    tRowMerge(&merge, pRow);
    doMergeRowsInBuf(&pBlockScanInfo->iter, k.ts, pBlockScanInfo->delSkyline, &merge, pReader);
  } else {  // descending order mem --> imem
    updateSchema(pRow, pBlockScanInfo->uid, pReader);

    tRowMergerInit(&merge, pRow, pReader->pSchema);
    doMergeRowsInBuf(&pBlockScanInfo->iter, k.ts, pBlockScanInfo->delSkyline, &merge, pReader);

    tRowMerge(&merge, piRow);
    doMergeRowsInBuf(&pBlockScanInfo->iiter, k.ts, pBlockScanInfo->delSkyline, &merge, pReader);
  }

  tRowMergerGetRow(&merge, pTSRow);

  // release the merger's resources, as doMergeMultiRows() does after fetching the row
  tRowMergerClear(&merge);
}

2634 2635
// Build the next merged row from the mem/imem buffers of one table.
//
// Only rows strictly before endKey in scan direction are considered (ts < endKey for an
// ascending scan, ts > endKey for a descending scan). When both buffers offer a row, the one
// with the smaller timestamp is merged; rows with equal timestamps are merged together.
// *pTSRow is written only when a row was produced. Always returns TSDB_CODE_SUCCESS.
int32_t tsdbGetNextRowInMem(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader, STSRow** pTSRow,
                            int64_t endKey) {
  SIterInfo* pMemIter = &pBlockScanInfo->iter;
  SIterInfo* pIMemIter = &pBlockScanInfo->iiter;
  SArray*    pDelList = pBlockScanInfo->delSkyline;

  TSDBROW* pRow = getValidRow(pMemIter, pDelList, pReader);
  TSDBROW* piRow = getValidRow(pIMemIter, pDelList, pReader);

  // todo refactor
  bool asc = ASCENDING_TRAVERSE(pReader->order);

  // discard candidates that reach or pass endKey in scan direction
  if (pMemIter->hasVal) {
    TSDBKEY memKey = TSDBROW_KEY(pRow);
    bool    beyondEnd = asc ? (memKey.ts >= endKey) : (memKey.ts <= endKey);
    if (beyondEnd) {
      pRow = NULL;
    }
  }

  if (pIMemIter->hasVal) {
    TSDBKEY imemKey = TSDBROW_KEY(piRow);
    bool    beyondEnd = asc ? (imemKey.ts >= endKey) : (imemKey.ts <= endKey);
    if (beyondEnd) {
      piRow = NULL;
    }
  }

  bool useMem = pMemIter->hasVal && (pRow != NULL);
  bool useIMem = pIMemIter->hasVal && (piRow != NULL);

  if (useMem && useIMem) {
    TSDBKEY k = TSDBROW_KEY(pRow);
    TSDBKEY ik = TSDBROW_KEY(piRow);

    if (ik.ts < k.ts) {
      doMergeMultiRows(piRow, pBlockScanInfo->uid, pIMemIter, pDelList, pTSRow, pReader);
    } else if (k.ts < ik.ts) {
      doMergeMultiRows(pRow, pBlockScanInfo->uid, pMemIter, pDelList, pTSRow, pReader);
    } else {  // ik.ts == k.ts
      doMergeMemIMemRows(pRow, piRow, pBlockScanInfo, pReader, pTSRow);
    }
  } else if (useMem) {
    doMergeMultiRows(pRow, pBlockScanInfo->uid, pMemIter, pDelList, pTSRow, pReader);
  } else if (useIMem) {
    doMergeMultiRows(piRow, pBlockScanInfo->uid, pIMemIter, pDelList, pTSRow, pReader);
  }

  return TSDB_CODE_SUCCESS;
}

2684 2685 2686 2687
// Append pTSRow as one new row at the end of pBlock.
//
// The output columns of pBlock and the columns of pReader->pSchema are walked in parallel and
// matched by column id. An output column without a counterpart in the schema receives NULL.
// pBlock->info.rows is increased by one. Always returns TSDB_CODE_SUCCESS.
int32_t doAppendOneRow(SSDataBlock* pBlock, STsdbReader* pReader, STSRow* pTSRow) {
  SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo;
  STSchema*           pSchema = pReader->pSchema;

  int32_t rowPos = pBlock->info.rows;
  int32_t numOfCols = (int32_t)taosArrayGetSize(pBlock->pDataBlock);

  SColVal colVal = {0};
  int32_t outIdx = 0;  // position in the output column list
  int32_t schIdx = 0;  // position in the schema column list

  // the primary timestamp, when requested, is the first output column
  SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, outIdx);
  if (pColInfoData->info.colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
    colDataAppend(pColInfoData, rowPos, (const char*)&pTSRow->ts, false);
    outIdx += 1;
  }

  while (outIdx < numOfCols) {
    pColInfoData = taosArrayGet(pBlock->pDataBlock, outIdx);

    if (schIdx >= pSchema->numOfCols) {
      // set null value since current column does not exist in the "pSchema"
      colDataAppendNULL(pColInfoData, rowPos);
      outIdx += 1;
      continue;
    }

    col_id_t colId = pColInfoData->info.colId;

    if (colId == pSchema->columns[schIdx].colId) {
      tTSRowGetVal(pTSRow, pReader->pSchema, schIdx, &colVal);
      doCopyColVal(pColInfoData, rowPos, outIdx, &colVal, pSupInfo);
      outIdx += 1;
      schIdx += 1;
    } else if (colId < pSchema->columns[schIdx].colId) {
      // the requested column is missing from the schema
      colDataAppendNULL(pColInfoData, rowPos);
      outIdx += 1;
    } else {
      // this schema column is not requested, skip it
      schIdx += 1;
    }
  }

  pBlock->info.rows += 1;
  return TSDB_CODE_SUCCESS;
}

2728 2729
// Fill pReader->pResBlock with merged rows taken from the mem/imem buffers of one table.
//
// Rows are appended until no further row before endKey is available, both buffer iterators are
// drained, or the block holds "capacity" rows. Always returns TSDB_CODE_SUCCESS.
int32_t buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, int64_t endKey, int32_t capacity,
                                  STsdbReader* pReader) {
  SSDataBlock* pBlock = pReader->pResBlock;

  for (;;) {
    STSRow* pMergedRow = NULL;
    tsdbGetNextRowInMem(pBlockScanInfo, pReader, &pMergedRow, endKey);
    if (pMergedRow == NULL) {
      break;
    }

    doAppendOneRow(pBlock, pReader, pMergedRow);
    taosMemoryFree(pMergedRow);

    // no data in buffer, return immediately
    bool bufferDrained = !pBlockScanInfo->iter.hasVal && !pBlockScanInfo->iiter.hasVal;
    if (bufferDrained || pBlock->info.rows >= capacity) {
      break;
    }
  }

  ASSERT(pBlock->info.rows <= capacity);
  return TSDB_CODE_SUCCESS;
}
H
Hongze Cheng 已提交
2755

2756
// todo refactor, use arraylist instead
H
Hongze Cheng 已提交
2757
// Replace the set of tables scanned by pReader with the single table "uid": the table map is
// cleared and one zero-initialized scan-info entry for uid is inserted.
int32_t tsdbSetTableId(STsdbReader* pReader, int64_t uid) {
  ASSERT(pReader != NULL);

  STableBlockScanInfo info = {0};
  info.uid = uid;
  info.lastKey = 0;

  taosHashClear(pReader->status.pTableMap);
  taosHashPut(pReader->status.pTableMap, &info.uid, sizeof(uint64_t), &info, sizeof(info));

  return TDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
2766 2767 2768 2769 2770 2771
// NULL-tolerant wrapper around metaGetIdx(): returns NULL when pMeta is NULL.
void* tsdbGetIdx(SMeta* pMeta) {
  return (pMeta != NULL) ? metaGetIdx(pMeta) : NULL;
}
dengyihao's avatar
dengyihao 已提交
2772

dengyihao's avatar
dengyihao 已提交
2773 2774 2775 2776 2777 2778
// NULL-tolerant wrapper around metaGetIvtIdx(): returns NULL when pMeta is NULL.
void* tsdbGetIvtIdx(SMeta* pMeta) {
  return (pMeta != NULL) ? metaGetIvtIdx(pMeta) : NULL;
}
L
Liu Jicong 已提交
2779

C
Cary Xu 已提交
2780 2781 2782 2783 2784 2785 2786 2787 2788 2789
/**
 * @brief Collect the super table ids produced by a cursor opened at suid
 *
 * @param pMeta meta handle the cursor is opened on
 * @param suid  start position; all suids in one vnode are returned if suid is 0
 * @param list  output array, every id delivered by the cursor is pushed to it
 * @return int32_t TSDB_CODE_FAILED if the cursor cannot be opened, otherwise TSDB_CODE_SUCCESS
 */
int32_t tsdbGetStbIdList(SMeta* pMeta, int64_t suid, SArray* list) {
  SMStbCursor* pCursor = metaOpenStbCursor(pMeta, suid);
  if (pCursor == NULL) {
    return TSDB_CODE_FAILED;
  }

  // an id of 0 marks the end of the cursor
  for (tb_uid_t stbId = metaStbCursorNext(pCursor); stbId != 0; stbId = metaStbCursorNext(pCursor)) {
    taosArrayPush(list, &stbId);
  }

  metaCloseStbCursor(pCursor);
  return TSDB_CODE_SUCCESS;
}

H
refact  
Hongze Cheng 已提交
2807
// ====================================== EXPOSED APIs ======================================
2808 2809
// Create a reader for the tables in pTableList and position it on the first file block.
//
// On success *ppReader holds the new reader. When the query window is empty the reader is
// returned right after creation, without a table map or a memory snapshot. When the table map
// cannot be created the reader is closed, *ppReader is reset to NULL and
// TSDB_CODE_TDB_OUT_OF_MEMORY is returned. A failure while positioning on the first file block
// is returned as-is; *ppReader still refers to the reader in that case.
int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, SArray* pTableList, STsdbReader** ppReader,
                       const char* idstr) {
  STsdbReader* pReader = NULL;

  int32_t code = tsdbReaderCreate(pVnode, pCond, ppReader, idstr);
  if (code) {
    goto _err;
  }

  pReader = *ppReader;

  // pick the schema of the super table, or of the first queried table when there is none
  if (pCond->suid != 0) {
    pReader->pSchema = metaGetTbTSchema(pReader->pTsdb->pVnode->pMeta, pReader->suid, -1);
  } else if (taosArrayGetSize(pTableList) > 0) {
    STableKeyInfo* pKey = taosArrayGet(pTableList, 0);
    pReader->pSchema = metaGetTbTSchema(pReader->pTsdb->pVnode->pMeta, pKey->uid, -1);
  }

  if (isEmptyQueryTimeWindow(&pReader->window)) {
    tsdbDebug("%p query window not overlaps with the data set, no result returned, %s", pReader, pReader->idStr);
    return TSDB_CODE_SUCCESS;
  }

  int32_t numOfTables = taosArrayGetSize(pTableList);
  pReader->status.pTableMap = createDataBlockScanInfo(pReader, pTableList->pData, numOfTables);
  if (pReader->status.pTableMap == NULL) {
    tsdbReaderClose(pReader);
    *ppReader = NULL;

    code = TSDB_CODE_TDB_OUT_OF_MEMORY;
    goto _err;
  }

  SDataBlockIter* pBlockIter = &pReader->status.blockIter;

  STsdbFSState* pFState = pReader->pTsdb->pFS->cState;
  initFilesetIterator(&pReader->status.fileIter, pFState, pReader->order, pReader->idStr);
  resetDataBlockIterator(&pReader->status.blockIter, pReader->order);

  // no data in files, let's try buffer in memory
  if (pReader->status.fileIter.numOfFiles == 0) {
    pReader->status.loadFromFile = false;
  } else {
    code = initForFirstBlockInFile(pReader, pBlockIter);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }
  }

  tsdbTakeMemSnapshot(pReader->pTsdb, &pReader->pMem, &pReader->pIMem);

  tsdbDebug("%p total numOfTable:%d in this query %s", pReader, numOfTables, pReader->idStr);
  return code;

_err:
  // The reader is either not created or already released on this path, so it must not be
  // dereferenced here; log the caller supplied id string instead.
  tsdbError("failed to create data reader, code: %s %s", tstrerror(code), (idstr != NULL) ? idstr : "");
  return code;
}

// Release a reader and everything it owns: the memory snapshot, the column support buffers, the
// file/block iterators, the table map, the result block, the open file reader, the id string and
// the cached schema. The io-cost summary is logged before the reader itself is freed.
// Passing NULL is a no-op.
void tsdbReaderClose(STsdbReader* pReader) {
  if (pReader == NULL) {
    return;
  }

  tsdbUntakeMemSnapshot(pReader->pTsdb, pReader->pMem, pReader->pIMem);

  // column support info
  SBlockLoadSuppInfo* pSupp = &pReader->suppInfo;
  taosMemoryFreeClear(pSupp->plist);
  taosMemoryFree(pSupp->colIds);
  taosArrayDestroy(pSupp->pColAgg);

  // one build buffer may exist per column of the result block
  int32_t col = 0;
  while (col < blockDataGetNumOfCols(pReader->pResBlock)) {
    if (pSupp->buildBuf[col] != NULL) {
      taosMemoryFreeClear(pSupp->buildBuf[col]);
    }
    col += 1;
  }
  taosMemoryFree(pSupp->buildBuf);

  // scan state
  cleanupFilesetIterator(&pReader->status.fileIter);
  cleanupDataBlockIterator(&pReader->status.blockIter);
  destroyBlockScanInfo(pReader->status.pTableMap);
  blockDataDestroy(pReader->pResBlock);

  if (pReader->pFileReader != NULL) {
    tsdbDataFReaderClose(&pReader->pFileReader);
  }

#if 0
//   if (pReader->status.pTableScanInfo != NULL) {
//     pReader->status.pTableScanInfo = destroyTableCheckInfo(pReader->status.pTableScanInfo);
//   }

//   tsdbDestroyReadH(&pReader->rhelper);

//   tdFreeDataCols(pReader->pDataCols);
//   pReader->pDataCols = NULL;
//
//   pReader->prev = doFreeColumnInfoData(pReader->prev);
//   pReader->next = doFreeColumnInfoData(pReader->next);
#endif

  SIOCostSummary* pCost = &pReader->cost;

  tsdbDebug("%p :io-cost summary: head-file read cnt:%" PRIu64 ", head-file time:%" PRIu64 " us, statis-info:%" PRId64
            " us, datablock:%" PRId64 " us, check data:%" PRId64 " us, %s",
            pReader, pCost->headFileLoad, pCost->headFileLoadTime, pCost->smaLoadTime, pCost->blockLoadTime,
            pCost->checkForNextTime, pReader->idStr);

  // idStr is still needed by the log call above, so it is released last
  taosMemoryFree(pReader->idStr);
  taosMemoryFree(pReader->pSchema);
  taosMemoryFreeClear(pReader);
}

// Load the next result block into pReader->pResBlock.
//
// Returns true when the block holds at least one row. Returns false when the query window is
// empty, when building a block from files fails, or when no data is left. Only
// BLOCK_LOAD_OFFSET_ORDER produces data here; the branches for BLOCK_LOAD_TABLESEQ_ORDER and
// BLOCK_LOAD_EXTERN_ORDER are empty and yield false.
bool tsdbNextDataBlock(STsdbReader* pReader) {
  if (isEmptyQueryTimeWindow(&pReader->window)) {
    return false;
  }

  // cleanup the data that belongs to the previous data block
  SSDataBlock* pBlock = pReader->pResBlock;
  blockDataCleanup(pBlock);

  SReaderStatus* pStatus = &pReader->status;

  if (pReader->type == BLOCK_LOAD_OFFSET_ORDER) {
    if (pStatus->loadFromFile) {
      int32_t code = buildBlockFromFiles(pReader);
      if (code != TSDB_CODE_SUCCESS) {
        return false;
      }

      if (pBlock->info.rows > 0) {
        return true;
      }
    }

    // no (more) data in files, let's try the buffer
    buildBlockFromBufferSequentially(pReader);
    return pBlock->info.rows > 0;
  } else if (pReader->type == BLOCK_LOAD_TABLESEQ_ORDER) {
  } else if (pReader->type == BLOCK_LOAD_EXTERN_ORDER) {
  } else {
    ASSERT(0);
  }

  return false;
}

// Copy the row count, table uid and time window of the reader's current result
// block into the caller supplied descriptor.
//
// @param pReader        reader whose pResBlock is described; must not be NULL
// @param pDataBlockInfo output descriptor; must not be NULL
void tsdbRetrieveDataBlockInfo(STsdbReader* pReader, SDataBlockInfo* pDataBlockInfo) {
  ASSERT(pDataBlockInfo != NULL && pReader != NULL);

  SSDataBlock* pResBlock = pReader->pResBlock;

  pDataBlockInfo->rows = pResBlock->info.rows;
  pDataBlockInfo->uid = pResBlock->info.uid;
  pDataBlockInfo->window = pResBlock->info.window;
}

2964
// Load the pre-computed per-column aggregates (block SMA) of the file block the
// reader is currently positioned on.
//
// @param pReader      reader positioned on a file block
// @param pBlockStatis out: pReader->suppInfo.plist on success; NULL when the
//                     current block is a composed block or carries no SMA;
//                     left untouched when reading the SMA fails
// @param allHave      out: true only when SMA was loaded and every matched
//                     column has IS_BSMA_ON set; false otherwise
// @return TSDB_CODE_SUCCESS, or the error code returned by tsdbReadBlockSma()
int32_t tsdbRetrieveDatablockSMA(STsdbReader* pReader, SColumnDataAgg*** pBlockStatis, bool* allHave) {
  *allHave = false;

  // there is no statistics data for composed block
  if (pReader->status.composedDataBlock) {
    *pBlockStatis = NULL;
    return TSDB_CODE_SUCCESS;
  }

  SFileDataBlockInfo*  pCurInfo = getCurrentBlockInfo(&pReader->status.blockIter);
  STableBlockScanInfo* pScanInfo = taosHashGet(pReader->status.pTableMap, &pCurInfo->uid, sizeof(pCurInfo->uid));
  SBlock*              pFileBlock = taosArrayGet(pScanInfo->pBlockList, pCurInfo->tbBlockIdx);

  int64_t             startUs = taosGetTimestampUs();
  SBlockLoadSuppInfo* pSupp = &pReader->suppInfo;

  // blocks without SMA have nothing to report
  bool hasSma = tBlockHasSma(pFileBlock);
  if (!hasSma) {
    *pBlockStatis = NULL;
    return TSDB_CODE_SUCCESS;
  }

  int32_t code = tsdbReadBlockSma(pReader->pFileReader, pFileBlock, pSupp->pColAgg, NULL);
  if (code != TSDB_CODE_SUCCESS) {
    tsdbDebug("vgId:%d, failed to load block SMA for uid %" PRIu64 ", code:%s, %s", 0, pCurInfo->uid, tstrerror(code),
              pReader->idStr);
    return code;
  }

  *allHave = true;

  // the primary timestamp column always comes first; its aggregate is derived
  // from the time window of the result block
  SColumnDataAgg* pTsColAgg = &pSupp->tsColAgg;
  pTsColAgg->numOfNull = 0;
  pTsColAgg->colId = PRIMARYKEY_TIMESTAMP_COL_ID;
  pTsColAgg->min = pReader->pResBlock->info.window.skey;
  pTsColAgg->max = pReader->pResBlock->info.window.ekey;
  pSupp->plist[0] = pTsColAgg;

  // walk the loaded aggregates (i) and the requested column ids (j) in step,
  // advancing whichever side currently holds the smaller column id
  size_t numOfCols = blockDataGetNumOfCols(pReader->pResBlock);

  int32_t i = 0, j = 0;
  while (j < numOfCols && i < taosArrayGetSize(pSupp->pColAgg)) {
    SColumnDataAgg* pColAgg = taosArrayGet(pSupp->pColAgg, i);

    if (pColAgg->colId < pSupp->colIds[j]) {
      i += 1;
    } else if (pSupp->colIds[j] < pColAgg->colId) {
      j += 1;
    } else {
      // NOTE(review): the schema column is picked with i, the position inside
      // pColAgg - confirm that it always matches the schema position.
      if (IS_BSMA_ON(&(pReader->pSchema->columns[i]))) {
        pSupp->plist[j] = pColAgg;
      } else {
        *allHave = false;
      }

      i += 1;
      j += 1;
    }
  }

  int64_t elapsed = taosGetTimestampUs() - startUs;
  pReader->cost.smaLoadTime += elapsed;

  *pBlockStatis = pSupp->plist;

  tsdbDebug("vgId:%d, succeed to load block SMA for uid %" PRIu64 ", elapsed time:%" PRId64 "us, %s", 0, pCurInfo->uid,
            elapsed, pReader->idStr);

  return code;
}

H
Hongze Cheng 已提交
3037
// Return the column data of the reader's current result block.
//
// A composed block is already materialised in pResBlock and is returned as is.
// Otherwise the current file block is loaded into status.fileBlockData, copied
// into pResBlock and the temporary file block data is cleared again.
//
// @param pReader reader positioned on a block
// @param pIdList not referenced by this function
// @return pReader->pResBlock->pDataBlock on success; NULL with terrno set to the
//         failing code when initialising or loading the file block data fails
SArray* tsdbRetrieveDataBlock(STsdbReader* pReader, SArray* pIdList) {
  SReaderStatus* pStatus = &pReader->status;
  SSDataBlock*   pResBlock = pReader->pResBlock;

  if (pStatus->composedDataBlock) {
    return pResBlock->pDataBlock;
  }

  SFileDataBlockInfo*  pCurInfo = getCurrentBlockInfo(&pStatus->blockIter);
  STableBlockScanInfo* pScanInfo = taosHashGet(pStatus->pTableMap, &pCurInfo->uid, sizeof(pCurInfo->uid));

  int32_t code = tBlockDataInit(&pStatus->fileBlockData);
  if (code != TSDB_CODE_SUCCESS) {
    terrno = code;
    return NULL;
  }

  code = doLoadFileBlockData(pReader, &pStatus->blockIter, pScanInfo, &pStatus->fileBlockData);
  if (code == TSDB_CODE_SUCCESS) {
    copyBlockDataToSDataBlock(pReader, pScanInfo);
    tBlockDataClear(&pStatus->fileBlockData, 1);
    return pReader->pResBlock->pDataBlock;
  }

  // loading failed: release the temporary block data before reporting the error
  tBlockDataClear(&pStatus->fileBlockData, 1);
  terrno = code;
  return NULL;
}

H
Haojun Liao 已提交
3066
// Rewind a reader so that it can be scanned again with the order and time
// windows taken from pCond.
//
// Nothing is done (and TSDB_CODE_SUCCESS is returned) when the reader's current
// window is already empty. Otherwise the scan order, load type, query window,
// the timestamp column aggregate, the fileset iterator, the block iterator and
// the per-table scan info are reset, and the currently opened data file reader
// is closed. When at least one fileset exists the reader is positioned on the
// first block in file; otherwise status.loadFromFile is cleared so that only the
// in-memory buffers are read.
//
// @param pReader reader to reset
// @param pCond   new query condition (order and time windows are read)
// @return TSDB_CODE_SUCCESS, or the error code from initForFirstBlockInFile()
int32_t tsdbReaderReset(STsdbReader* pReader, SQueryTableDataCond* pCond) {
  if (isEmptyQueryTimeWindow(&pReader->window)) {
    return TSDB_CODE_SUCCESS;
  }

  SReaderStatus*  pStatus = &pReader->status;
  SDataBlockIter* pBlockIter = &pStatus->blockIter;

  pReader->order = pCond->order;
  pReader->type = BLOCK_LOAD_OFFSET_ORDER;
  pStatus->loadFromFile = true;
  pStatus->pTableIter = NULL;

  pReader->window = updateQueryTimeWindow(pReader->pTsdb, &pCond->twindows);

  // clear the timestamp column aggregate and the first slot of the aggregate
  // pointer list, then restore the column id wiped by the memset
  memset(&pReader->suppInfo.tsColAgg, 0, sizeof(SColumnDataAgg));
  memset(pReader->suppInfo.plist, 0, POINTER_BYTES);
  pReader->suppInfo.tsColAgg.colId = PRIMARYKEY_TIMESTAMP_COL_ID;

  // drop the data file reader left over from the previous scan (closed once)
  tsdbDataFReaderClose(&pReader->pFileReader);

  STsdbFSState* pFState = pReader->pTsdb->pFS->cState;
  initFilesetIterator(&pStatus->fileIter, pFState, pReader->order, pReader->idStr);
  resetDataBlockIterator(pBlockIter, pReader->order);
  resetDataBlockScanInfo(pStatus->pTableMap);

  int32_t code = 0;
  if (pStatus->fileIter.numOfFiles == 0) {
    // no data in files, let's try buffer in memory
    pStatus->loadFromFile = false;
  } else {
    code = initForFirstBlockInFile(pReader, pBlockIter);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }
  }

  // report the number of tables tracked by this reader
  int32_t numOfTables = (int32_t)taosHashGetSize(pStatus->pTableMap);

  tsdbDebug("%p reset reader, suid:%" PRIu64 ", numOfTables:%d, query range:%" PRId64 " - %" PRId64 " in query %s",
            pReader, pReader->suid, numOfTables, pReader->window.skey, pReader->window.ekey, pReader->idStr);
  return code;
}
H
Hongze Cheng 已提交
3111

3112 3113 3114
// Map a block's row count onto a histogram bucket.
//
// Buckets are bucketRange rows wide and start at startRow, so the result is
// (numOfRows - startRow) / bucketRange for every regular input.
// Two degenerate inputs are folded into bucket 0 instead of producing an
// unusable result:
//   - bucketRange <= 0 (e.g. minimum and maximum rows configured equal), which
//     would otherwise divide by zero;
//   - numOfRows <= startRow (an undersized block), which would otherwise yield
//     a negative index.
// The upper end is not clamped here; the caller owns the bucket count.
//
// @param startRow    row count at which bucket 0 begins
// @param bucketRange width of one bucket in rows
// @param numOfRows   number of rows in the block
// @return zero-based bucket index, never negative
static int32_t getBucketIndex(int32_t startRow, int32_t bucketRange, int32_t numOfRows) {
  if (bucketRange <= 0 || numOfRows <= startRow) {
    return 0;
  }

  return (numOfRows - startRow) / bucketRange;
}
H
Hongze Cheng 已提交
3115

3116 3117 3118 3119
// Collect block distribution statistics by visiting every file block reachable
// from the reader's current position.
//
// totalSize and totalRows are restarted from zero and defMinRows / defMaxRows /
// numOfTables are overwritten; numOfFiles, numOfBlocks, maxRows, minRows,
// numOfSmallBlocks and blockRowsHisto are accumulated on top of whatever the
// caller passed in.
//
// @param pReader         reader whose block iterator is consumed
// @param pTableBlockInfo in/out statistics
// @return TSDB_CODE_SUCCESS, or the error code from initForFirstBlockInFile()
int32_t tsdbGetFileBlocksDistInfo(STsdbReader* pReader, STableBlockDistInfo* pTableBlockInfo) {
  int32_t         code = TSDB_CODE_SUCCESS;
  SReaderStatus*  pStatus = &pReader->status;
  SDataBlockIter* pBlockIter = &pStatus->blockIter;
  STsdbCfg*       pCfg = &pReader->pTsdb->pVnode->config.tsdbCfg;

  // blocks with fewer rows than this are counted as small blocks
  int smallBlockRows = 4096;

  pTableBlockInfo->totalSize = 0;
  pTableBlockInfo->totalRows = 0;
  pTableBlockInfo->defMinRows = pCfg->minRows;
  pTableBlockInfo->defMaxRows = pCfg->maxRows;

  // the configured [minRows, maxRows] span is cut into 20 histogram steps
  int32_t bucketRange = ceil((pCfg->maxRows - pCfg->minRows) / 20.0);

  // NOTE(review): numOfFiles grows by one on top of the fileset count - confirm
  // that the extra increment is intended.
  pTableBlockInfo->numOfFiles += 1;
  pTableBlockInfo->numOfFiles += pStatus->fileIter.numOfFiles;

  if (pBlockIter->numOfBlocks > 0) {
    pTableBlockInfo->numOfBlocks += pBlockIter->numOfBlocks;
  }

  pTableBlockInfo->numOfTables = (int32_t)taosHashGetSize(pStatus->pTableMap);

  bool hasNext = (pBlockIter->numOfBlocks > 0);
  for (;;) {
    if (!hasNext) {
      // current block list is used up, (re)position on the first block in file
      code = initForFirstBlockInFile(pReader, pBlockIter);
      if ((code != TSDB_CODE_SUCCESS) || (pReader->status.loadFromFile == false)) {
        break;
      }

      pTableBlockInfo->numOfBlocks += pBlockIter->numOfBlocks;
      hasNext = (pBlockIter->numOfBlocks > 0);
      continue;
    }

    SFileDataBlockInfo*  pCurInfo = getCurrentBlockInfo(pBlockIter);
    STableBlockScanInfo* pScanInfo = taosHashGet(pStatus->pTableMap, &pCurInfo->uid, sizeof(pCurInfo->uid));
    SBlock*              pFileBlock = taosArrayGet(pScanInfo->pBlockList, pCurInfo->tbBlockIdx);

    int32_t rowsInBlock = pFileBlock->nRow;
    pTableBlockInfo->totalRows += rowsInBlock;

    if (rowsInBlock > pTableBlockInfo->maxRows) {
      pTableBlockInfo->maxRows = rowsInBlock;
    }

    if (rowsInBlock < pTableBlockInfo->minRows) {
      pTableBlockInfo->minRows = rowsInBlock;
    }

    if (rowsInBlock < smallBlockRows) {
      pTableBlockInfo->numOfSmallBlocks += 1;
    }

    int32_t bucket = getBucketIndex(pTableBlockInfo->defMinRows, bucketRange, rowsInBlock);
    pTableBlockInfo->blockRowsHisto[bucket]++;

    hasNext = blockIteratorNext(&pStatus->blockIter);
  }

  return code;
}
H
Hongze Cheng 已提交
3187

H
refact  
Hongze Cheng 已提交
3188
// Sum the rows held in the mem and imem tables for every table tracked by the
// reader.
//
// status.pTableIter is used as the iteration cursor over status.pTableMap, so
// it is NULL again once this function returns.
//
// @param pReader reader whose table map is walked
// @return total number of rows found in both in-memory tables
int64_t tsdbGetNumOfRowsInMemTable(STsdbReader* pReader) {
  SReaderStatus* pStatus = &pReader->status;
  int64_t        totalRows = 0;

  for (pStatus->pTableIter = taosHashIterate(pStatus->pTableMap, NULL); pStatus->pTableIter != NULL;
       pStatus->pTableIter = taosHashIterate(pStatus->pTableMap, pStatus->pTableIter)) {
    STableBlockScanInfo* pScanInfo = pStatus->pTableIter;

    // rows of this table in the mem table
    STbData* pMemData = NULL;
    if (pReader->pTsdb->mem != NULL) {
      tsdbGetTbDataFromMemTable(pReader->pTsdb->mem, pReader->suid, pScanInfo->uid, &pMemData);
      if (pMemData != NULL) {
        totalRows += tsdbGetNRowsInTbData(pMemData);
      }
    }

    // rows of this table in the imem table
    STbData* pImemData = NULL;
    if (pReader->pTsdb->imem != NULL) {
      tsdbGetTbDataFromMemTable(pReader->pTsdb->imem, pReader->suid, pScanInfo->uid, &pImemData);
      if (pImemData != NULL) {
        totalRows += tsdbGetNRowsInTbData(pImemData);
      }
    }
  }

  return totalRows;
}
D
dapan1121 已提交
3219

L
Liu Jicong 已提交
3220
// Look up the schema of table uid.
//
// For a child table the entry referenced by its suid is read as well and that
// entry's schema version is used; *suid then holds this suid. For a normal
// table *suid is 0. Any other table type trips the ASSERT.
//
// @param pVnode  vnode whose meta store is queried
// @param uid     table to look up
// @param pSchema out: result of metaGetTbTSchema() for uid and the resolved
//                schema version (not checked for NULL here)
// @param suid    out: suid of a child table, 0 otherwise; untouched when the
//                first lookup fails
// @return TSDB_CODE_SUCCESS; when a meta lookup fails terrno is set to
//         TSDB_CODE_TDB_INVALID_TABLE_ID and terrno is returned
int32_t tsdbGetTableSchema(SVnode* pVnode, int64_t uid, STSchema** pSchema, int64_t* suid) {
  int32_t     sversion = 1;
  SMetaReader mr = {0};

  metaReaderInit(&mr, pVnode->pMeta, 0);
  if (metaGetTableEntryByUid(&mr, uid) != TSDB_CODE_SUCCESS) {
    goto _invalid_table;
  }

  *suid = 0;

  if (mr.me.type != TSDB_CHILD_TABLE) {
    ASSERT(mr.me.type == TSDB_NORMAL_TABLE);
    sversion = mr.me.ntbEntry.schemaRow.version;
  } else {
    // reuse the meta reader to fetch the entry the child table points to
    tDecoderClear(&mr.coder);
    *suid = mr.me.ctbEntry.suid;
    if (metaGetTableEntryByUid(&mr, *suid) != TSDB_CODE_SUCCESS) {
      goto _invalid_table;
    }
    sversion = mr.me.stbEntry.schemaRow.version;
  }

  metaReaderClear(&mr);
  *pSchema = metaGetTbTSchema(pVnode->pMeta, uid, sversion);

  return TSDB_CODE_SUCCESS;

_invalid_table:
  terrno = TSDB_CODE_TDB_INVALID_TABLE_ID;
  metaReaderClear(&mr);
  return terrno;
}