tsdbRead.c 106.3 KB
Newer Older
H
hjxilinx 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

H
Hongze Cheng 已提交
16
#include "tsdb.h"
17
// Evaluates to non-zero when the scan order `o` equals TSDB_ORDER_ASC.
// The argument is parenthesized so that a compound expression (e.g. `a ? b : c`) is compared as a whole.
#define ASCENDING_TRAVERSE(o) ((o) == TSDB_ORDER_ASC)
H
Hongze Cheng 已提交
18

19
typedef struct {
dengyihao's avatar
dengyihao 已提交
20
  STbDataIter* iter;
21 22 23 24
  int32_t      index;
  bool         hasVal;
} SIterInfo;

H
Haojun Liao 已提交
25
typedef struct STableBlockScanInfo {
dengyihao's avatar
dengyihao 已提交
26 27 28 29 30 31 32 33 34
  uint64_t  uid;
  TSKEY     lastKey;
  SBlockIdx blockIdx;
  SArray*   pBlockList;  // block data index list
  SIterInfo iter;        // mem buffer skip list iterator
  SIterInfo iiter;       // imem buffer skip list iterator
  SArray*   delSkyline;  // delete info for this table
  int32_t   fileDelIndex;
  bool      iterInit;  // whether to initialize the in-memory skip list iterator or not
H
Haojun Liao 已提交
35 36 37
} STableBlockScanInfo;

typedef struct SBlockOrderWrapper {
dengyihao's avatar
dengyihao 已提交
38 39
  int64_t uid;
  SBlock* pBlock;
H
Haojun Liao 已提交
40
} SBlockOrderWrapper;
H
Hongze Cheng 已提交
41 42

typedef struct SBlockOrderSupporter {
43 44 45 46
  SBlockOrderWrapper** pDataBlockInfo;
  int32_t*             indexPerTable;
  int32_t*             numOfBlocksPerTable;
  int32_t              numOfTables;
H
Hongze Cheng 已提交
47 48 49
} SBlockOrderSupporter;

// Accumulated I/O cost counters of one reader.
typedef struct SIOCostSummary {
  int64_t blockLoadTime;     // accumulated in microseconds by the block loading/copy routines
  int64_t smaLoadTime;
  int64_t checkForNextTime;
  int64_t headFileLoad;
  int64_t headFileLoadTime;
} SIOCostSummary;

typedef struct SBlockLoadSuppInfo {
58
  SArray*          pColAgg;
59
  SColumnDataAgg   tsColAgg;
C
Cary Xu 已提交
60
  SColumnDataAgg** plist;
61 62 63
  int16_t*         colIds;    // column ids for loading file block data
  int32_t*         slotIds;   // colId to slotId
  char**           buildBuf;  // build string tmp buffer, todo remove it later after all string format being updated.
H
Hongze Cheng 已提交
64 65
} SBlockLoadSuppInfo;

66
typedef struct SFilesetIter {
67 68 69 70
  int32_t          numOfFiles;  // number of total files
  int32_t          index;       // current accessed index in the list
  SArray*          pFileList;   // data file list
  int32_t          order;
71
} SFilesetIter;
H
Haojun Liao 已提交
72 73

// Identifies one file data block: owning table and position inside that table's block list.
typedef struct SFileDataBlockInfo {
  // index position in STableBlockScanInfo in order to check whether neighbor block overlaps with it
  int32_t  tbBlockIdx;
  uint64_t uid;
} SFileDataBlockInfo;

typedef struct SDataBlockIter {
dengyihao's avatar
dengyihao 已提交
80 81 82 83
  int32_t numOfBlocks;
  int32_t index;
  SArray* blockList;  // SArray<SFileDataBlockInfo>
  int32_t order;
H
Haojun Liao 已提交
84 85 86
} SDataBlockIter;

// Progress of copying the currently loaded file block into the result block.
typedef struct SFileBlockDumpInfo {
  int32_t totalRows;
  int32_t rowIndex;   // next row of the file block to copy
  int64_t lastKey;
  bool    allDumped;  // set by setBlockAllDumped(), cleared when a new block is loaded
} SFileBlockDumpInfo;

H
Haojun Liao 已提交
93
// Version range [minVer, maxVer] that file blocks are checked against.
typedef struct SVersionRange {
  uint64_t minVer;
  uint64_t maxVer;
} SVersionRange;

H
Haojun Liao 已提交
98
typedef struct SReaderStatus {
dengyihao's avatar
dengyihao 已提交
99 100
  bool                 loadFromFile;  // check file stage
  SHashObj*            pTableMap;     // SHash<STableBlockScanInfo>
101
  STableBlockScanInfo* pTableIter;    // table iterator used in building in-memory buffer data blocks.
102
  SFileBlockDumpInfo   fBlockDumpInfo;
103

dengyihao's avatar
dengyihao 已提交
104 105 106 107 108
  SDFileSet*     pCurrentFileset;  // current opened file set
  SBlockData     fileBlockData;
  SFilesetIter   fileIter;
  SDataBlockIter blockIter;
  bool           composedDataBlock;  // the returned data block is a composed block or not
H
Haojun Liao 已提交
109 110
} SReaderStatus;

H
Hongze Cheng 已提交
111
struct STsdbReader {
H
Haojun Liao 已提交
112 113 114 115 116 117 118
  STsdb*             pTsdb;
  uint64_t           suid;
  int16_t            order;
  STimeWindow        window;  // the primary query time window that applies to all queries
  SSDataBlock*       pResBlock;
  int32_t            capacity;
  SReaderStatus      status;
119 120
  char*              idStr;  // query info handle, for debug purpose
  int32_t            type;   // query type: 1. retrieve all data blocks, 2. retrieve direct prev|next rows
H
Hongze Cheng 已提交
121
  SBlockLoadSuppInfo suppInfo;
122

H
Hongze Cheng 已提交
123 124
  SIOCostSummary     cost;
  STSchema*          pSchema;
125 126
  SDataFReader*      pFileReader;
  SVersionRange      verRange;
H
Hongze Cheng 已提交
127
};
H
Hongze Cheng 已提交
128

H
Haojun Liao 已提交
129
static SFileDataBlockInfo* getCurrentBlockInfo(SDataBlockIter* pBlockIter);
130 131
static int      buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, int64_t endKey, int32_t capacity,
                                          STsdbReader* pReader);
132
static TSDBROW* getValidRow(SIterInfo* pIter, const SArray* pDelList, STsdbReader* pReader);
133 134
static int32_t  doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pScanInfo, STsdbReader* pReader,
                                        SRowMerger* pMerger);
dengyihao's avatar
dengyihao 已提交
135 136
static int32_t  doMergeRowsInBuf(SIterInfo* pIter, int64_t ts, SArray* pDelList, SRowMerger* pMerger,
                                 STsdbReader* pReader);
137 138 139
static int32_t  doAppendOneRow(SSDataBlock* pBlock, STsdbReader* pReader, STSRow* pTSRow);
static void     setComposedBlockFlag(STsdbReader* pReader, bool composed);
static void     updateSchema(TSDBROW* pRow, uint64_t uid, STsdbReader* pReader);
140
static bool     hasBeenDropped(const SArray* pDelList, int32_t* index, TSDBKEY* pKey, int32_t order);
141

dengyihao's avatar
dengyihao 已提交
142 143
static void doMergeMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter, SArray* pDelList, STSRow** pTSRow,
                             STsdbReader* pReader);
144 145
static void doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader,
                               STSRow** pTSRow);
dengyihao's avatar
dengyihao 已提交
146 147 148 149
static int32_t initDelSkylineIterator(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader, STbData* pMemTbData,
                                      STbData* piMemTbData);
static STsdb*  getTsdbByRetentions(SVnode* pVnode, TSKEY winSKey, SRetention* retentions, const char* idstr,
                                   int8_t* pLevel);
150
static SVersionRange getQueryVerRange(SVnode* pVnode, SQueryTableDataCond* pCond, int8_t level);
H
Haojun Liao 已提交
151

152 153 154
// Fills pReader->suppInfo from the columns of pBlock:
//   - colIds[i]   receives the column id of the i-th column of pBlock;
//   - buildBuf[i] receives a scratch buffer of `info.bytes` bytes for every variable-length column
//                 and stays NULL for the other columns.
// Returns TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY when any allocation fails. On failure everything
// allocated here is released and colIds/buildBuf are reset to NULL, so that a later cleanup of the reader
// cannot free them a second time.
static int32_t setColumnIdSlotList(STsdbReader* pReader, SSDataBlock* pBlock) {
  SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo;

  size_t numOfCols = blockDataGetNumOfCols(pBlock);

  pSupInfo->colIds = taosMemoryMalloc(numOfCols * sizeof(int16_t));
  pSupInfo->buildBuf = taosMemoryCalloc(numOfCols, POINTER_BYTES);
  if (pSupInfo->buildBuf == NULL || pSupInfo->colIds == NULL) {
    goto _oom;
  }

  for (size_t i = 0; i < numOfCols; ++i) {
    SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, i);
    pSupInfo->colIds[i] = pCol->info.colId;

    if (IS_VAR_DATA_TYPE(pCol->info.type)) {
      pSupInfo->buildBuf[i] = taosMemoryMalloc(pCol->info.bytes);
      if (pSupInfo->buildBuf[i] == NULL) {
        goto _oom;
      }
    }
  }

  return TSDB_CODE_SUCCESS;

_oom:
  if (pSupInfo->buildBuf != NULL) {
    // buildBuf was zero-initialized, so slots that were never allocated are still NULL
    for (size_t i = 0; i < numOfCols; ++i) {
      taosMemoryFree(pSupInfo->buildBuf[i]);
    }
  }

  taosMemoryFree(pSupInfo->colIds);
  taosMemoryFree(pSupInfo->buildBuf);
  pSupInfo->colIds = NULL;
  pSupInfo->buildBuf = NULL;
  return TSDB_CODE_OUT_OF_MEMORY;
}
H
Hongze Cheng 已提交
176

177
// Builds the uid -> STableBlockScanInfo hash map for the numOfTables tables listed in idList.
// Every entry starts with lastKey set to the start key of the reader's query window.
// Returns NULL when the hash map cannot be created.
// NOTE(review): the result of taosHashPut() is not checked -- confirm whether an insert failure must be reported.
static SHashObj* createDataBlockScanInfo(STsdbReader* pTsdbReader, const STableKeyInfo* idList, int32_t numOfTables) {
  // allocate buffer in order to load data blocks from file
  // todo use simple hash instead
  SHashObj* pMap = taosHashInit(numOfTables, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
  if (pMap == NULL) {
    return NULL;
  }

  // todo apply the lastkey of table check to avoid to load header file
  for (int32_t idx = 0; idx < numOfTables; ++idx) {
    STableBlockScanInfo scanInfo = {.lastKey = 0, .uid = idList[idx].uid};

    if (!ASCENDING_TRAVERSE(pTsdbReader->order)) {
      scanInfo.lastKey = pTsdbReader->window.skey;
    } else {
      if (scanInfo.lastKey == INT64_MIN || scanInfo.lastKey < pTsdbReader->window.skey) {
        scanInfo.lastKey = pTsdbReader->window.skey;
      }

      ASSERT(scanInfo.lastKey >= pTsdbReader->window.skey && scanInfo.lastKey <= pTsdbReader->window.ekey);
    }

    taosHashPut(pMap, &scanInfo.uid, sizeof(uint64_t), &scanInfo, sizeof(scanInfo));
    tsdbDebug("%p check table uid:%" PRId64 " from lastKey:%" PRId64 " %s", pTsdbReader, scanInfo.uid,
              scanInfo.lastKey, pTsdbReader->idStr);
  }

  return pMap;
}
H
Hongze Cheng 已提交
206

207 208 209
// Resets the in-memory scan state of every table in pTableMap: marks the iterators as uninitialized,
// destroys the mem iterator and releases the delete skyline.
// The destroyed pointers are cleared afterwards; otherwise a second reset (or the final cleanup) would
// destroy the same iterator/array again.
static void resetDataBlockScanInfo(SHashObj* pTableMap) {
  STableBlockScanInfo* p = NULL;

  while ((p = taosHashIterate(pTableMap, p)) != NULL) {
    p->iterInit = false;
    p->iiter.hasVal = false;

    if (p->iter.iter != NULL) {
      tsdbTbDataIterDestroy(p->iter.iter);
      p->iter.iter = NULL;
    }

    taosArrayDestroy(p->delSkyline);
    p->delSkyline = NULL;
  }
}

221
// Returns true when the window cannot contain any timestamp, i.e. its start key lies after its end key.
// pWindow must not be NULL.
static bool isEmptyQueryTimeWindow(STimeWindow* pWindow) {
  ASSERT(pWindow != NULL);

  const bool startsAfterEnd = (pWindow->ekey < pWindow->skey);
  return startsAfterEnd;
}
H
Hongze Cheng 已提交
225

226 227 228
// Clamps the start of the query time window according to the keep (time-to-live) configuration of pTsdb,
// so that expired data is not returned to the client even if it is still stored.
// Returns the adjusted copy; *pWindow itself is not modified.
static STimeWindow updateQueryTimeWindow(STsdb* pTsdb, STimeWindow* pWindow) {
  STsdbKeepCfg* pKeepCfg = &pTsdb->keepCfg;

  int64_t current = taosGetTimestamp(pKeepCfg->precision);
  // needs to add one tick
  int64_t earliestTs = current - (tsTickPerMin[pKeepCfg->precision] * pKeepCfg->keep2) + 1;

  STimeWindow adjusted = *pWindow;
  adjusted.skey = (adjusted.skey < earliestTs) ? earliestTs : adjusted.skey;
  return adjusted;
}
H
Hongze Cheng 已提交
241

H
Haojun Liao 已提交
242
// Shrinks *capacity (a number of rows) so that capacity * <sum of the column widths in pCond> does not
// exceed 2MB. *capacity is left untouched when the block already fits.
// The arithmetic is done in 64 bits: the 32-bit product capacity * rowLen could overflow for wide rows.
static void limitOutputBufferSize(const SQueryTableDataCond* pCond, int32_t* capacity) {
  int64_t rowLen = 0;
  for (int32_t i = 0; i < pCond->numOfCols; ++i) {
    rowLen += pCond->colList[i].bytes;
  }

  // make sure the output SSDataBlock size be less than 2MB.
  const int64_t TWOMB = 2 * 1024 * 1024;
  if (rowLen > 0 && (int64_t)(*capacity) * rowLen > TWOMB) {
    (*capacity) = (int32_t)(TWOMB / rowLen);
  }
}

// init file iterator
256
static int32_t initFilesetIterator(SFilesetIter* pIter, const STsdbFSState* pFState, int32_t order, const char* idstr) {
257 258
  size_t numOfFileset = taosArrayGetSize(pFState->aDFileSet);

259 260 261
  pIter->index = ASCENDING_TRAVERSE(order) ? -1 : numOfFileset;
  pIter->order = order;
  pIter->pFileList = taosArrayDup(pFState->aDFileSet);
262
  pIter->numOfFiles = numOfFileset;
H
Haojun Liao 已提交
263

H
Haojun Liao 已提交
264
  tsdbDebug("init fileset iterator, total files:%d %s", pIter->numOfFiles, idstr);
H
Haojun Liao 已提交
265 266 267
  return TSDB_CODE_SUCCESS;
}

268
// Advances the iterator to the next file set whose key range overlaps the query window of pReader and opens
// a file reader on it (pReader->pFileReader, pReader->status.pCurrentFileset).
// Returns true when such a file set was found; false when the list is exhausted, when the remaining file sets
// lie beyond the query window, or when opening the file reader fails.
// NOTE(review): a file reader is opened for every visited file set, including the skipped ones, and is not
// closed in this function -- confirm that the previously opened reader is released elsewhere.
static bool filesetIteratorNext(SFilesetIter* pIter, STsdbReader* pReader) {
  bool    asc = ASCENDING_TRAVERSE(pIter->order);
  int32_t step = asc ? 1 : -1;
  pIter->index += step;

  // check file the time range of coverage
  STimeWindow win = {0};

  // The range is re-checked on every round: skipping a file set below moves the index as well, and it must
  // never be used to access the list once it has left [0, numOfFiles).
  while (pIter->index >= 0 && pIter->index < pIter->numOfFiles) {
    pReader->status.pCurrentFileset = (SDFileSet*)taosArrayGet(pIter->pFileList, pIter->index);

    int32_t code = tsdbDataFReaderOpen(&pReader->pFileReader, pReader->pTsdb, pReader->status.pCurrentFileset);
    if (code != TSDB_CODE_SUCCESS) {
      return false;
    }

    int32_t fid = pReader->status.pCurrentFileset->fid;
    tsdbFidKeyRange(fid, pReader->pTsdb->keepCfg.days, pReader->pTsdb->keepCfg.precision, &win.skey, &win.ekey);

    // current file are no longer overlapped with query time window, ignore remain files
    if ((asc && win.skey > pReader->window.ekey) || (!asc && win.ekey < pReader->window.skey)) {
      tsdbDebug("%p remain files are not qualified for qrange:%" PRId64 "-%" PRId64 ", ignore, %s", pReader,
                pReader->window.skey, pReader->window.ekey, pReader->idStr);
      return false;
    }

    // the query window has not been reached yet, try the next file set
    if ((asc && (win.ekey < pReader->window.skey)) || ((!asc) && (win.skey > pReader->window.ekey))) {
      pIter->index += step;
      continue;
    }

    tsdbDebug("%p file found fid:%d for qrange:%" PRId64 "-%" PRId64 ", %s", pReader, fid, pReader->window.skey,
              pReader->window.ekey, pReader->idStr);
    return true;
  }

  return false;
}

312
// Puts the data block iterator into its initial state for the given order and gives it a new, empty block list.
// NOTE(review): the previous blockList pointer is overwritten without being released here -- confirm that the
// caller destroys it before calling this function.
static void resetDataBlockIterator(SDataBlockIter* pIter, int32_t order) {
  pIter->blockList = taosArrayInit(4, sizeof(SFileDataBlockInfo));
  pIter->numOfBlocks = -1;
  pIter->index = -1;
  pIter->order = order;
}

H
Haojun Liao 已提交
319
// Initial reader status: the reader starts in the file stage and no table iterator is selected yet.
static void initReaderStatus(SReaderStatus* pStatus) {
  pStatus->loadFromFile = true;
  pStatus->pTableIter = NULL;
}

324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346
// Creates the result data block: one column per entry of pCond->colList, with room for `capacity` rows.
// Returns NULL and sets terrno on failure.
static SSDataBlock* createResBlock(SQueryTableDataCond* pCond, int32_t capacity) {
  SSDataBlock* pResBlock = createDataBlock();
  if (pResBlock == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  for (int32_t i = 0; i < pCond->numOfCols; ++i) {
    SColumnInfoData colInfo = {{0}, 0};
    colInfo.info = pCond->colList[i];
    blockDataAppendColInfo(pResBlock, &colInfo);
  }

  int32_t code = blockDataEnsureCapacity(pResBlock, capacity);
  if (code != TSDB_CODE_SUCCESS) {
    terrno = code;
    // release the column list and the column buffers as well, not only the block structure itself
    blockDataDestroy(pResBlock);
    return NULL;
  }

  return pResBlock;
}

H
Haojun Liao 已提交
347 348
// Allocates and initializes a reader for the query condition pCond.
// On success *ppReader receives the new reader and 0 is returned. On failure the partially built reader is
// passed to tsdbReaderClose(), *ppReader is set to NULL and the error code is returned.
static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, STsdbReader** ppReader, const char* idstr) {
  int32_t      code = 0;
  int8_t       level = 0;
  STsdbReader* pReader = (STsdbReader*)taosMemoryCalloc(1, sizeof(*pReader));
  if (pReader == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _end;
  }

  initReaderStatus(&pReader->status);

  pReader->pTsdb =
      getTsdbByRetentions(pVnode, pCond->twindows.skey, pVnode->config.tsdbCfg.retentions, idstr, &level);
  pReader->suid = pCond->suid;
  pReader->order = pCond->order;
  pReader->capacity = 4096;
  pReader->idStr = (idstr != NULL) ? strdup(idstr) : NULL;
  pReader->verRange = getQueryVerRange(pVnode, pCond, level);
  pReader->type = pCond->type;
  pReader->window = updateQueryTimeWindow(pVnode->pTsdb, &pCond->twindows);

  ASSERT(pCond->numOfCols > 0);

  limitOutputBufferSize(pCond, &pReader->capacity);

  // allocate buffer in order to load data blocks from file
  SBlockLoadSuppInfo* pSup = &pReader->suppInfo;
  pSup->pColAgg = taosArrayInit(4, sizeof(SColumnDataAgg));
  pSup->plist = taosMemoryCalloc(pCond->numOfCols, POINTER_BYTES);
  if (pSup->pColAgg == NULL || pSup->plist == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _end;
  }

  pSup->tsColAgg.colId = PRIMARYKEY_TIMESTAMP_COL_ID;

  pReader->pResBlock = createResBlock(pCond, pReader->capacity);
  if (pReader->pResBlock == NULL) {
    code = terrno;
    goto _end;
  }

  // the column id list and the string build buffers are required by the block loading code; do not hand out
  // a reader when they could not be allocated
  code = setColumnIdSlotList(pReader, pReader->pResBlock);
  if (code != TSDB_CODE_SUCCESS) {
    goto _end;
  }

  *ppReader = pReader;
  return code;

_end:
  tsdbReaderClose(pReader);
  *ppReader = NULL;
  return code;
}
H
Hongze Cheng 已提交
399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431

// void tsdbResetQueryHandleForNewTable(STsdbReader* queryHandle, SQueryTableDataCond* pCond, STableListInfo* tableList,
//                                      int32_t tWinIdx) {
//   STsdbReader* pTsdbReadHandle = queryHandle;

//   pTsdbReadHandle->order = pCond->order;
//   pTsdbReadHandle->window = pCond->twindows[tWinIdx];
//   pTsdbReadHandle->type = TSDB_QUERY_TYPE_ALL;
//   pTsdbReadHandle->cur.fid = -1;
//   pTsdbReadHandle->cur.win = TSWINDOW_INITIALIZER;
//   pTsdbReadHandle->checkFiles = true;
//   pTsdbReadHandle->activeIndex = 0;  // current active table index
//   pTsdbReadHandle->locateStart = false;
//   pTsdbReadHandle->loadExternalRow = pCond->loadExternalRows;

//   if (ASCENDING_TRAVERSE(pCond->order)) {
//     assert(pTsdbReadHandle->window.skey <= pTsdbReadHandle->window.ekey);
//   } else {
//     assert(pTsdbReadHandle->window.skey >= pTsdbReadHandle->window.ekey);
//   }

//   // allocate buffer in order to load data blocks from file
//   memset(pTsdbReadHandle->suppInfo.pstatis, 0, sizeof(SColumnDataAgg));
//   memset(pTsdbReadHandle->suppInfo.plist, 0, POINTER_BYTES);

//   tsdbInitDataBlockLoadInfo(&pTsdbReadHandle->dataBlockLoadInfo);
//   tsdbInitCompBlockLoadInfo(&pTsdbReadHandle->compBlockLoadInfo);

//   SArray* pTable = NULL;
//   //  STsdbMeta* pMeta = tsdbGetMeta(pTsdbReadHandle->pTsdb);

//   //  pTsdbReadHandle->pTableCheckInfo = destroyTableCheckInfo(pTsdbReadHandle->pTableCheckInfo);

H
Haojun Liao 已提交
432
//   pTsdbReadHandle->pTableCheckInfo = NULL;  // createDataBlockScanInfo(pTsdbReadHandle, groupList, pMeta,
H
Hongze Cheng 已提交
433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452
//                                             // &pTable);
//   if (pTsdbReadHandle->pTableCheckInfo == NULL) {
//     //    tsdbReaderClose(pTsdbReadHandle);
//     terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
//   }

//   //  pTsdbReadHandle->prev = doFreeColumnInfoData(pTsdbReadHandle->prev);
//   //  pTsdbReadHandle->next = doFreeColumnInfoData(pTsdbReadHandle->next);
// }

// SArray* tsdbGetQueriedTableList(STsdbReader** pHandle) {
//   assert(pHandle != NULL);

//   STsdbReader* pTsdbReadHandle = (STsdbReader*)pHandle;

//   size_t  size = taosArrayGetSize(pTsdbReadHandle->pTableCheckInfo);
//   SArray* res = taosArrayInit(size, POINTER_BYTES);
//   return res;
// }

453 454
// static TSKEY extractFirstTraverseKey(STableBlockScanInfo* pCheckInfo, int32_t order, int32_t update, TDRowVerT
// maxVer) {
H
Hongze Cheng 已提交
455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503
//   TSDBROW row = {0};
//   STSRow *rmem = NULL, *rimem = NULL;

//   if (pCheckInfo->iter) {
//     if (tsdbTbDataIterGet(pCheckInfo->iter, &row)) {
//       rmem = row.pTSRow;
//     }
//   }

//   if (pCheckInfo->iiter) {
//     if (tsdbTbDataIterGet(pCheckInfo->iiter, &row)) {
//       rimem = row.pTSRow;
//     }
//   }

//   if (rmem == NULL && rimem == NULL) {
//     return TSKEY_INITIAL_VAL;
//   }

//   if (rmem != NULL && rimem == NULL) {
//     pCheckInfo->chosen = CHECKINFO_CHOSEN_MEM;
//     return TD_ROW_KEY(rmem);
//   }

//   if (rmem == NULL && rimem != NULL) {
//     pCheckInfo->chosen = CHECKINFO_CHOSEN_IMEM;
//     return TD_ROW_KEY(rimem);
//   }

//   TSKEY r1 = TD_ROW_KEY(rmem);
//   TSKEY r2 = TD_ROW_KEY(rimem);

//   if (r1 == r2) {
//     if (TD_SUPPORT_UPDATE(update)) {
//       pCheckInfo->chosen = CHECKINFO_CHOSEN_BOTH;
//     } else {
//       pCheckInfo->chosen = CHECKINFO_CHOSEN_IMEM;
//       tsdbTbDataIterNext(pCheckInfo->iter);
//     }
//     return r1;
//   } else if (r1 < r2 && ASCENDING_TRAVERSE(order)) {
//     pCheckInfo->chosen = CHECKINFO_CHOSEN_MEM;
//     return r1;
//   } else {
//     pCheckInfo->chosen = CHECKINFO_CHOSEN_IMEM;
//     return r2;
//   }
// }

H
Haojun Liao 已提交
504
// static bool moveToNextRowInMem(STableBlockScanInfo* pCheckInfo) {
H
Hongze Cheng 已提交
505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537
//   bool hasNext = false;
//   if (pCheckInfo->chosen == CHECKINFO_CHOSEN_MEM) {
//     if (pCheckInfo->iter != NULL) {
//       hasNext = tsdbTbDataIterNext(pCheckInfo->iter);
//     }

//     if (hasNext) {
//       return hasNext;
//     }

//     if (pCheckInfo->iiter != NULL) {
//       return tsdbTbDataIterGet(pCheckInfo->iiter, NULL);
//     }
//   } else if (pCheckInfo->chosen == CHECKINFO_CHOSEN_IMEM) {
//     if (pCheckInfo->iiter != NULL) {
//       hasNext = tsdbTbDataIterNext(pCheckInfo->iiter);
//     }

//     if (hasNext) {
//       return hasNext;
//     }

//     if (pCheckInfo->iter != NULL) {
//       return tsdbTbDataIterGet(pCheckInfo->iter, NULL);
//     }
//   } else {
//     if (pCheckInfo->iter != NULL) {
//       hasNext = tsdbTbDataIterNext(pCheckInfo->iter);
//     }
//     if (pCheckInfo->iiter != NULL) {
//       hasNext = tsdbTbDataIterNext(pCheckInfo->iiter) || hasNext;
//     }
//   }
H
Hongze Cheng 已提交
538

H
Hongze Cheng 已提交
539 540
//   return hasNext;
// }
H
Hongze Cheng 已提交
541

H
Hongze Cheng 已提交
542 543 544
// static int32_t binarySearchForBlock(SBlock* pBlock, int32_t numOfBlocks, TSKEY skey, int32_t order) {
//   int32_t firstSlot = 0;
//   int32_t lastSlot = numOfBlocks - 1;
H
Hongze Cheng 已提交
545

H
Hongze Cheng 已提交
546
//   int32_t midSlot = firstSlot;
H
Hongze Cheng 已提交
547

H
Hongze Cheng 已提交
548 549 550
//   while (1) {
//     numOfBlocks = lastSlot - firstSlot + 1;
//     midSlot = (firstSlot + (numOfBlocks >> 1));
H
Hongze Cheng 已提交
551

H
Hongze Cheng 已提交
552
//     if (numOfBlocks == 1) break;
H
Hongze Cheng 已提交
553

H
Hongze Cheng 已提交
554 555 556 557 558 559 560 561 562 563 564
//     if (skey > pBlock[midSlot].maxKey.ts) {
//       if (numOfBlocks == 2) break;
//       if ((order == TSDB_ORDER_DESC) && (skey < pBlock[midSlot + 1].minKey.ts)) break;
//       firstSlot = midSlot + 1;
//     } else if (skey < pBlock[midSlot].minKey.ts) {
//       if ((order == TSDB_ORDER_ASC) && (skey > pBlock[midSlot - 1].maxKey.ts)) break;
//       lastSlot = midSlot - 1;
//     } else {
//       break;  // got the slot
//     }
//   }
H
Hongze Cheng 已提交
565

H
Hongze Cheng 已提交
566 567
//   return midSlot;
// }
H
Hongze Cheng 已提交
568

H
Haojun Liao 已提交
569
// Reads the block index of the file opened by pFileReader and appends to pIndexList every entry that belongs
// to the queried super table (suid) and to a table present in pReader->status.pTableMap. The block index is
// also stored in the scan info of the table, whose block list is created on first use.
// Returns the code of tsdbReadBlockIdx(); the temporary index array is released on every path.
static int32_t doLoadBlockIndex(STsdbReader* pReader, SDataFReader* pFileReader, SArray* pIndexList) {
  SArray* aBlockIdx = taosArrayInit(0, sizeof(SBlockIdx));

  int32_t code = tsdbReadBlockIdx(pFileReader, aBlockIdx, NULL);
  if (code != TSDB_CODE_SUCCESS) {
    goto _end;
  }

  size_t numOfIdx = taosArrayGetSize(aBlockIdx);
  if (numOfIdx == 0) {
    // nothing to collect; leave through the common exit so that aBlockIdx is destroyed
    goto _end;
  }

  SBlockIdx* pBlockIdx;
  for (size_t i = 0; i < numOfIdx; ++i) {
    pBlockIdx = (SBlockIdx*)taosArrayGet(aBlockIdx, i);

    // uid check
    if (pBlockIdx->suid != pReader->suid) {
      continue;
    }

    // this block belongs to a table that is not queried.
    void* p = taosHashGet(pReader->status.pTableMap, &pBlockIdx->uid, sizeof(uint64_t));
    if (p == NULL) {
      continue;
    }

    // todo: not valid info in bockIndex
    // time range check
    //    if (pBlockIdx->minKey > pReader->window.ekey || pBlockIdx->maxKey < pReader->window.skey) {
    //      continue;
    //    }

    // version check
    //    if (pBlockIdx->minVersion > pReader->verRange.maxVer || pBlockIdx->maxVersion < pReader->verRange.minVer) {
    //      continue;
    //    }

    STableBlockScanInfo* pScanInfo = p;
    if (pScanInfo->pBlockList == NULL) {
      pScanInfo->pBlockList = taosArrayInit(16, sizeof(SBlock));
    }

    pScanInfo->blockIdx = *pBlockIdx;
    taosArrayPush(pIndexList, pBlockIdx);
  }

_end:
  taosArrayDestroy(aBlockIdx);
  return code;
}
H
Hongze Cheng 已提交
621

622 623
// For every block index in pIndexList, reads the block map of the table and appends to the table's block list
// the blocks that overlap both the query time window and the query version range.
// *numOfValidTables is reset and then counts the listed tables that end up with at least one block;
// *numOfBlocks is incremented once per accepted block (it is not reset here).
// Returns TSDB_CODE_SUCCESS, the error code of tsdbReadBlock(), or TSDB_CODE_OUT_OF_MEMORY.
static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, uint32_t* numOfValidTables,
                               int32_t* numOfBlocks) {
  size_t numOfTables = taosArrayGetSize(pIndexList);

  *numOfValidTables = 0;

  // drop the blocks collected for the previous file set
  STableBlockScanInfo* px = NULL;
  while ((px = taosHashIterate(pReader->status.pTableMap, px)) != NULL) {
    taosArrayClear(px->pBlockList);
  }

  for (size_t i = 0; i < numOfTables; ++i) {
    SBlockIdx* pBlockIdx = taosArrayGet(pIndexList, i);

    SMapData mapData = {0};
    tMapDataReset(&mapData);

    int32_t code = tsdbReadBlock(pReader->pFileReader, pBlockIdx, &mapData, NULL);
    if (code != TSDB_CODE_SUCCESS) {
      tMapDataClear(&mapData);
      return code;
    }

    STableBlockScanInfo* pScanInfo = taosHashGet(pReader->status.pTableMap, &pBlockIdx->uid, sizeof(int64_t));
    for (int32_t j = 0; j < mapData.nItem; ++j) {
      SBlock block = {0};

      tMapDataGetItemByIdx(&mapData, j, &block, tGetBlock);

      // 1. time range check
      if (block.minKey.ts > pReader->window.ekey || block.maxKey.ts < pReader->window.skey) {
        continue;
      }

      // 2. version range check
      if (block.minVersion > pReader->verRange.maxVer || block.maxVersion < pReader->verRange.minVer) {
        continue;
      }

      void* p = taosArrayPush(pScanInfo->pBlockList, &block);
      if (p == NULL) {
        tMapDataClear(&mapData);
        return TSDB_CODE_OUT_OF_MEMORY;
      }

      (*numOfBlocks) += 1;
    }

    // the buffers filled by tsdbReadBlock() belong to this function; release them before the next table
    tMapDataClear(&mapData);

    if (pScanInfo->pBlockList != NULL && taosArrayGetSize(pScanInfo->pBlockList) > 0) {
      (*numOfValidTables) += 1;
    }
  }

  return TSDB_CODE_SUCCESS;
}
H
Hongze Cheng 已提交
676

677 678
// todo remove pblock parameter
// Marks the current file block as completely dumped and moves lastKey just past the block in the traverse
// direction: one tick after the largest key for ascending order, one tick before the smallest key for
// descending order (a descending scan leaves the block through its minimum key).
static void setBlockAllDumped(SFileBlockDumpInfo* pDumpInfo, SBlock* pBlock, int32_t order) {
  pDumpInfo->allDumped = true;

  if (ASCENDING_TRAVERSE(order)) {
    pDumpInfo->lastKey = pBlock->maxKey.ts + 1;
  } else {
    pDumpInfo->lastKey = pBlock->minKey.ts - 1;
  }
}

685 686
// Appends one column value to row `rowIndex` of pColInfoData.
// Fixed-length values are appended directly, with the null flag set when the value is NULL or NONE.
// Variable-length values are first packed (length + payload) into the scratch buffer
// pSup->buildBuf[colIndex]; a NULL or NONE variable-length value is appended as NULL.
static void doCopyColVal(SColumnInfoData* pColInfoData, int32_t rowIndex, int32_t colIndex, SColVal* pColVal,
                         SBlockLoadSuppInfo* pSup) {
  if (!IS_VAR_DATA_TYPE(pColVal->type)) {
    colDataAppend(pColInfoData, rowIndex, (const char*)&pColVal->value, pColVal->isNull || pColVal->isNone);
    return;
  }

  if (pColVal->isNull || pColVal->isNone) {
    colDataAppendNULL(pColInfoData, rowIndex);
    return;
  }

  varDataSetLen(pSup->buildBuf[colIndex], pColVal->value.nData);
  memcpy(varDataVal(pSup->buildBuf[colIndex]), pColVal->value.pData, pColVal->value.nData);
  colDataAppend(pColInfoData, rowIndex, pSup->buildBuf[colIndex], false);
}

700
// Copies rows of the loaded file block (pReader->status.fileBlockData) into the result block, starting at
// fBlockDumpInfo.rowIndex and walking in the traverse direction. At most pReader->capacity rows are copied.
// Result columns that have no counterpart in the file block are filled with NULL.
// Always returns TSDB_CODE_SUCCESS.
// NOTE(review): setBlockAllDumped() is called even when only `capacity` rows of a larger block were copied --
// confirm that callers do not rely on allDumped to tell a partial dump from a complete one.
static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo) {
  SReaderStatus*  pStatus = &pReader->status;
  SDataBlockIter* pBlockIter = &pStatus->blockIter;

  SBlockData*         pBlockData = &pStatus->fileBlockData;
  SFileDataBlockInfo* pFBlock = getCurrentBlockInfo(pBlockIter);
  SBlock*             pBlock = taosArrayGet(pBlockScanInfo->pBlockList, pFBlock->tbBlockIdx);
  SSDataBlock*        pResBlock = pReader->pResBlock;
  int32_t             numOfCols = blockDataGetNumOfCols(pResBlock);

  SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo;
  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;

  int64_t st = taosGetTimestampUs();

  SColVal cv = {0};
  int32_t colIndex = 0;

  bool    asc = ASCENDING_TRAVERSE(pReader->order);
  int32_t step = asc ? 1 : -1;

  // rows left in the traverse direction, limited by the capacity of the result block
  int32_t remain = asc ? (pBlockData->nRow - pDumpInfo->rowIndex) : (pDumpInfo->rowIndex + 1);
  if (remain > pReader->capacity) {
    remain = pReader->capacity;
  }

  const int32_t startRow = pDumpInfo->rowIndex;
  const int32_t numOfFileRows = pBlockData->nRow;
  const int32_t numOfFileCols = (int32_t)taosArrayGetSize(pBlockData->aIdx);

  // The copy loops below are bounded by the row count, not by an end index, so that they stop correctly in
  // both directions; the source index is additionally kept inside [0, nRow).
  int32_t          i = 0;
  SColumnInfoData* pColData = taosArrayGet(pResBlock->pDataBlock, i);
  if (pColData->info.colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
    int32_t rowIndex = 0;
    for (int32_t j = startRow; rowIndex < remain && j >= 0 && j < numOfFileRows; j += step) {
      colDataAppend(pColData, rowIndex++, (const char*)&pBlockData->aTSKEY[j], false);
    }
    i += 1;
  }

  while (i < numOfCols && colIndex < numOfFileCols) {
    pColData = taosArrayGet(pResBlock->pDataBlock, i);

    SColData* pData = tBlockDataGetColDataByIdx(pBlockData, colIndex);

    if (pData->cid == pColData->info.colId) {
      int32_t rowIndex = 0;
      for (int32_t j = startRow; rowIndex < remain && j >= 0 && j < numOfFileRows; j += step) {
        tColDataGetValue(pData, j, &cv);
        doCopyColVal(pColData, rowIndex++, i, &cv, pSupInfo);
      }

      // only columns copied from the file block are expected to produce exactly `remain` rows
      ASSERT(rowIndex == remain);
      colIndex += 1;
    } else {  // the specified column does not exist in file block, fill with null data
      colDataAppendNNULL(pColData, 0, remain);
    }

    i += 1;
  }

  // the file block has no more columns, the remaining result columns are all null
  while (i < numOfCols) {
    pColData = taosArrayGet(pResBlock->pDataBlock, i);
    colDataAppendNNULL(pColData, 0, remain);
    i += 1;
  }

  pResBlock->info.rows = remain;
  pDumpInfo->rowIndex += step * remain;

  setBlockAllDumped(pDumpInfo, pBlock, pReader->order);

  int64_t elapsedTime = (taosGetTimestampUs() - st);
  pReader->cost.blockLoadTime += elapsedTime;

  int32_t unDumpedRows = asc ? pBlock->nRow - pDumpInfo->rowIndex : pDumpInfo->rowIndex + 1;
  tsdbDebug("%p load file block into buffer, global index:%d, table index:%d, brange:%" PRId64 "-%" PRId64
            ", rows:%d, remain:%d, minVer:%" PRId64 ", maxVer:%" PRId64 ", elapsed time:%" PRId64 " us, %s",
            pReader, pBlockIter->index, pFBlock->tbBlockIdx, pBlock->minKey.ts, pBlock->maxKey.ts, remain, unDumpedRows,
            pBlock->minVersion, pBlock->maxVersion, elapsedTime, pReader->idStr);

  return TSDB_CODE_SUCCESS;
}

// todo consider the output buffer size
785 786
/**
 * Read the column data of the file block currently selected by pBlockIter into pBlockData.
 *
 * The block descriptor is fetched from pBlockScanInfo->pBlockList using the tbBlockIdx of the iterator's
 * current entry, and the columns listed in pReader->suppInfo.colIds are requested from pReader->pFileReader.
 * On success the elapsed time is added to pReader->cost.blockLoadTime and the dump state is reset
 * (allDumped = false).
 *
 * @param pReader        reader that owns the file reader, result block and cost counters
 * @param pBlockIter     iterator positioned on the block to load
 * @param pBlockScanInfo scan state of the table that owns the block
 * @param pBlockData     destination for the loaded column data
 * @return TSDB_CODE_SUCCESS, or the error code returned by tsdbReadColData
 */
static int32_t doLoadFileBlockData(STsdbReader* pReader, SDataBlockIter* pBlockIter,
                                   STableBlockScanInfo* pBlockScanInfo, SBlockData* pBlockData) {
  int64_t startUs = taosGetTimestampUs();

  SFileDataBlockInfo* pCurInfo = getCurrentBlockInfo(pBlockIter);
  SBlock*             pFileBlock = taosArrayGet(pBlockScanInfo->pBlockList, pCurInfo->tbBlockIdx);
  int32_t             nCols = blockDataGetNumOfCols(pReader->pResBlock);
  SBlockLoadSuppInfo* pSupp = &pReader->suppInfo;

  // NOTE(review): these scratch buffers are handed to tsdbReadColData and never released in this function;
  // confirm who owns them after the call.
  uint8_t* pBuf = NULL;
  uint8_t* pBuf1 = NULL;

  int32_t code = tsdbReadColData(pReader->pFileReader, &pBlockScanInfo->blockIdx, pFileBlock, pSupp->colIds, nCols,
                                 pBlockData, &pBuf, &pBuf1);
  if (code != TSDB_CODE_SUCCESS) {
    tsdbError("%p error occurs in loading file block, global index:%d, table index:%d, brange:%" PRId64 "-%" PRId64
              ", rows:%d, %s",
              pReader, pBlockIter->index, pCurInfo->tbBlockIdx, pFileBlock->minKey.ts, pFileBlock->maxKey.ts,
              pFileBlock->nRow, pReader->idStr);
    return code;
  }

  int64_t costUs = taosGetTimestampUs() - startUs;
  pReader->cost.blockLoadTime += costUs;

  // a freshly loaded block has not been dumped to the result block yet
  pReader->status.fBlockDumpInfo.allDumped = false;

  tsdbDebug("%p load file block into buffer, global index:%d, table index:%d, brange:%" PRId64 "-%" PRId64
            ", rows:%d, minVer:%" PRId64 ", maxVer:%" PRId64 ", elapsed time:%" PRId64 " us, %s",
            pReader, pBlockIter->index, pCurInfo->tbBlockIdx, pFileBlock->minKey.ts, pFileBlock->maxKey.ts,
            pFileBlock->nRow, pFileBlock->minVersion, pFileBlock->maxVersion, costUs, pReader->idStr);
  return TSDB_CODE_SUCCESS;
}
H
Hongze Cheng 已提交
821

H
Hongze Cheng 已提交
822 823 824 825 826 827 828 829 830 831 832 833 834 835 836 837 838 839 840 841 842 843 844 845 846 847 848 849 850 851 852 853 854 855 856 857 858 859 860 861 862 863 864 865 866 867 868 869 870 871 872 873 874 875 876 877 878 879
// static int doBinarySearchKey(char* pValue, int num, TSKEY key, int order) {
//   int    firstPos, lastPos, midPos = -1;
//   int    numOfRows;
//   TSKEY* keyList;

//   assert(order == TSDB_ORDER_ASC || order == TSDB_ORDER_DESC);

//   if (num <= 0) return -1;

//   keyList = (TSKEY*)pValue;
//   firstPos = 0;
//   lastPos = num - 1;

//   if (order == TSDB_ORDER_DESC) {
//     // find the first position which is smaller than the key
//     while (1) {
//       if (key >= keyList[lastPos]) return lastPos;
//       if (key == keyList[firstPos]) return firstPos;
//       if (key < keyList[firstPos]) return firstPos - 1;

//       numOfRows = lastPos - firstPos + 1;
//       midPos = (numOfRows >> 1) + firstPos;

//       if (key < keyList[midPos]) {
//         lastPos = midPos - 1;
//       } else if (key > keyList[midPos]) {
//         firstPos = midPos + 1;
//       } else {
//         break;
//       }
//     }

//   } else {
//     // find the first position which is bigger than the key
//     while (1) {
//       if (key <= keyList[firstPos]) return firstPos;
//       if (key == keyList[lastPos]) return lastPos;

//       if (key > keyList[lastPos]) {
//         lastPos = lastPos + 1;
//         if (lastPos >= num)
//           return -1;
//         else
//           return lastPos;
//       }

//       numOfRows = lastPos - firstPos + 1;
//       midPos = (numOfRows >> 1) + firstPos;

//       if (key < keyList[midPos]) {
//         lastPos = midPos - 1;
//       } else if (key > keyList[midPos]) {
//         firstPos = midPos + 1;
//       } else {
//         break;
//       }
//     }
//   }
H
Hongze Cheng 已提交
880

H
Hongze Cheng 已提交
881 882
//   return midPos;
// }
H
Hongze Cheng 已提交
883

H
Hongze Cheng 已提交
884 885
// static void doCheckGeneratedBlockRange(STsdbReader* pTsdbReadHandle) {
//   SQueryFilePos* cur = &pTsdbReadHandle->cur;
H
Hongze Cheng 已提交
886

H
Hongze Cheng 已提交
887 888 889 890 891 892
//   if (cur->rows > 0) {
//     if (ASCENDING_TRAVERSE(pTsdbReadHandle->order)) {
//       assert(cur->win.skey >= pTsdbReadHandle->window.skey && cur->win.ekey <= pTsdbReadHandle->window.ekey);
//     } else {
//       assert(cur->win.skey >= pTsdbReadHandle->window.ekey && cur->win.ekey <= pTsdbReadHandle->window.skey);
//     }
H
Hongze Cheng 已提交
893

H
Hongze Cheng 已提交
894 895 896 897 898
//     SColumnInfoData* pColInfoData = taosArrayGet(pTsdbReadHandle->pColumns, 0);
//     assert(cur->win.skey == ((TSKEY*)pColInfoData->pData)[0] &&
//            cur->win.ekey == ((TSKEY*)pColInfoData->pData)[cur->rows - 1]);
//   } else {
//     cur->win = pTsdbReadHandle->window;
H
Hongze Cheng 已提交
899

H
Hongze Cheng 已提交
900 901 902 903
//     int32_t step = ASCENDING_TRAVERSE(pTsdbReadHandle->order) ? 1 : -1;
//     cur->lastKey = pTsdbReadHandle->window.ekey + step;
//   }
// }
H
Hongze Cheng 已提交
904

H
Haojun Liao 已提交
905
// static void copyAllRemainRowsFromFileBlock(STsdbReader* pTsdbReadHandle, STableBlockScanInfo* pCheckInfo,
H
Hongze Cheng 已提交
906 907
//                                            SDataBlockInfo* pBlockInfo, int32_t endPos) {
//   SQueryFilePos* cur = &pTsdbReadHandle->cur;
H
Hongze Cheng 已提交
908

H
Hongze Cheng 已提交
909 910
//   SDataCols* pCols = pTsdbReadHandle->rhelper.pDCols[0];
//   TSKEY*     tsArray = pCols->cols[0].pData;
H
Hongze Cheng 已提交
911

H
Hongze Cheng 已提交
912
//   bool ascScan = ASCENDING_TRAVERSE(pTsdbReadHandle->order);
H
Hongze Cheng 已提交
913

H
Hongze Cheng 已提交
914
//   int32_t step = ascScan ? 1 : -1;
H
Hongze Cheng 已提交
915

H
Hongze Cheng 已提交
916 917
//   int32_t start = cur->pos;
//   int32_t end = endPos;
H
Hongze Cheng 已提交
918

H
Hongze Cheng 已提交
919 920 921
//   if (!ascScan) {
//     TSWAP(start, end);
//   }
H
Hongze Cheng 已提交
922

H
Hongze Cheng 已提交
923 924
//   assert(pTsdbReadHandle->outputCapacity >= (end - start + 1));
//   int32_t numOfRows = doCopyRowsFromFileBlock(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, 0, start, end);
H
Hongze Cheng 已提交
925

H
Hongze Cheng 已提交
926 927 928 929 930
//   // the time window should always be ascending order: skey <= ekey
//   cur->win = (STimeWindow){.skey = tsArray[start], .ekey = tsArray[end]};
//   cur->mixBlock = (numOfRows != pBlockInfo->rows);
//   cur->lastKey = tsArray[endPos] + step;
//   cur->blockCompleted = (ascScan ? (endPos == pBlockInfo->rows - 1) : (endPos == 0));
H
Hongze Cheng 已提交
931

H
Hongze Cheng 已提交
932 933 934 935
//   // The value of pos may be -1 or pBlockInfo->rows, and it is invalid in both cases.
//   int32_t pos = endPos + step;
//   updateInfoAfterMerge(pTsdbReadHandle, pCheckInfo, numOfRows, pos);
//   doCheckGeneratedBlockRange(pTsdbReadHandle);
H
Hongze Cheng 已提交
936

H
Hongze Cheng 已提交
937 938 939 940
//   tsdbDebug("%p uid:%" PRIu64 ", data block created, mixblock:%d, brange:%" PRIu64 "-%" PRIu64 " rows:%d, %s",
//             pTsdbReadHandle, pCheckInfo->tableId, cur->mixBlock, cur->win.skey, cur->win.ekey, cur->rows,
//             pTsdbReadHandle->idStr);
// }
H
Hongze Cheng 已提交
941

H
Hongze Cheng 已提交
942 943
// // only return the qualified data to client in terms of query time window, data rows in the same block but do not
// // be included in the query time window will be discarded
H
Haojun Liao 已提交
944
// static void doMergeTwoLevelData(STsdbReader* pTsdbReadHandle, STableBlockScanInfo* pCheckInfo, SBlock* pBlock) {
H
Hongze Cheng 已提交
945 946 947 948 949 950 951 952 953 954 955 956 957 958 959 960 961 962 963 964 965 966 967 968 969 970 971 972 973 974 975 976 977 978 979 980 981 982 983 984 985 986 987 988 989 990 991 992 993 994 995 996 997 998 999 1000 1001 1002 1003 1004 1005 1006 1007 1008 1009 1010 1011 1012 1013 1014 1015 1016 1017 1018 1019 1020 1021 1022 1023 1024 1025 1026 1027 1028 1029 1030 1031 1032 1033 1034 1035 1036 1037 1038 1039 1040 1041 1042 1043 1044 1045 1046 1047 1048 1049 1050 1051 1052 1053 1054 1055 1056 1057 1058 1059 1060 1061 1062 1063 1064 1065 1066 1067 1068 1069 1070 1071 1072 1073 1074 1075 1076 1077 1078 1079 1080 1081 1082 1083 1084 1085 1086 1087 1088 1089 1090 1091 1092 1093 1094 1095 1096 1097 1098 1099 1100 1101 1102 1103 1104 1105 1106 1107 1108 1109 1110 1111 1112 1113 1114 1115 1116 1117 1118 1119 1120 1121 1122 1123 1124 1125 1126 1127 1128 1129 1130 1131 1132 1133 1134 1135 1136 1137 1138 1139 1140 1141 1142 1143 1144 1145
//   SQueryFilePos* cur = &pTsdbReadHandle->cur;
//   SDataBlockInfo blockInfo = GET_FILE_DATA_BLOCK_INFO(pCheckInfo, pBlock);
//   STsdbCfg*      pCfg = REPO_CFG(pTsdbReadHandle->pTsdb);

//   initTableMemIterator(pTsdbReadHandle, pCheckInfo);

//   SDataCols* pCols = pTsdbReadHandle->rhelper.pDCols[0];
//   assert(pCols->cols[0].type == TSDB_DATA_TYPE_TIMESTAMP && pCols->cols[0].colId == PRIMARYKEY_TIMESTAMP_COL_ID &&
//          cur->pos >= 0 && cur->pos < pBlock->numOfRows);
//   // Even Multi-Version supported, the records with duplicated TSKEY would be merged inside of tsdbLoadData
//   interface. TSKEY* tsArray = pCols->cols[0].pData; assert(pCols->numOfRows == pBlock->numOfRows && tsArray[0] ==
//   pBlock->minKey.ts &&
//          tsArray[pBlock->numOfRows - 1] == pBlock->maxKey.ts);

//   bool    ascScan = ASCENDING_TRAVERSE(pTsdbReadHandle->order);
//   int32_t step = ascScan ? 1 : -1;

//   // for search the endPos, so the order needs to reverse
//   int32_t order = ascScan ? TSDB_ORDER_DESC : TSDB_ORDER_ASC;

//   int32_t numOfCols = (int32_t)(QH_GET_NUM_OF_COLS(pTsdbReadHandle));
//   int32_t endPos = getEndPosInDataBlock(pTsdbReadHandle, &blockInfo);

//   STimeWindow* pWin = &blockInfo.window;
//   tsdbDebug("%p uid:%" PRIu64 " start merge data block, file block range:%" PRIu64 "-%" PRIu64
//             " rows:%d, start:%d, end:%d, %s",
//             pTsdbReadHandle, pCheckInfo->tableId, pWin->skey, pWin->ekey, blockInfo.rows, cur->pos, endPos,
//             pTsdbReadHandle->idStr);

//   // compared with the data from in-memory buffer, to generate the correct timestamp array list
//   int32_t numOfRows = 0;
//   int32_t curRow = 0;

//   int16_t   rv1 = -1;
//   int16_t   rv2 = -1;
//   STSchema* pSchema1 = NULL;
//   STSchema* pSchema2 = NULL;

//   int32_t pos = cur->pos;
//   cur->win = TSWINDOW_INITIALIZER;
//   bool adjustPos = false;

//   // no data in buffer, load data from file directly
//   if (pCheckInfo->iiter == NULL && pCheckInfo->iter == NULL) {
//     copyAllRemainRowsFromFileBlock(pTsdbReadHandle, pCheckInfo, &blockInfo, endPos);
//     return;
//   } else if (pCheckInfo->iter != NULL || pCheckInfo->iiter != NULL) {
//     SSkipListNode* node = NULL;
//     TSKEY          lastKeyAppend = TSKEY_INITIAL_VAL;

//     do {
//       STSRow* row2 = NULL;
//       STSRow* row1 = getSRowInTableMem(pCheckInfo, pTsdbReadHandle->order, pCfg->update, &row2, TD_VER_MAX);
//       if (row1 == NULL) {
//         break;
//       }

//       TSKEY key = TD_ROW_KEY(row1);
//       if ((key > pTsdbReadHandle->window.ekey && ascScan) || (key < pTsdbReadHandle->window.ekey && !ascScan)) {
//         break;
//       }

//       if (adjustPos) {
//         if (key == lastKeyAppend) {
//           pos -= step;
//         }
//         adjustPos = false;
//       }

//       if (((pos > endPos || tsArray[pos] > pTsdbReadHandle->window.ekey) && ascScan) ||
//           ((pos < endPos || tsArray[pos] < pTsdbReadHandle->window.ekey) && !ascScan)) {
//         break;
//       }

//       if ((key < tsArray[pos] && ascScan) || (key > tsArray[pos] && !ascScan)) {
//         if (rv1 != TD_ROW_SVER(row1)) {
//           //          pSchema1 = tsdbGetTableSchemaByVersion(pTable, memRowVersion(row1));
//           rv1 = TD_ROW_SVER(row1);
//         }
//         if (row2 && rv2 != TD_ROW_SVER(row2)) {
//           //          pSchema2 = tsdbGetTableSchemaByVersion(pTable, memRowVersion(row2));
//           rv2 = TD_ROW_SVER(row2);
//         }

//         numOfRows +=
//             mergeTwoRowFromMem(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, &curRow, row1, row2, numOfCols,
//                                pCheckInfo->tableId, pSchema1, pSchema2, pCfg->update, &lastKeyAppend);
//         if (cur->win.skey == TSKEY_INITIAL_VAL) {
//           cur->win.skey = key;
//         }

//         cur->win.ekey = key;
//         cur->lastKey = key + step;
//         cur->mixBlock = true;
//         moveToNextRowInMem(pCheckInfo);
//       } else if (key == tsArray[pos]) {  // data in buffer has the same timestamp of data in file block, ignore it
//         if (TD_SUPPORT_UPDATE(pCfg->update)) {
//           if (lastKeyAppend != key) {
//             if (lastKeyAppend != TSKEY_INITIAL_VAL) {
//               ++curRow;
//             }
//             lastKeyAppend = key;
//           }
//           // load data from file firstly
//           numOfRows = doCopyRowsFromFileBlock(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, curRow, pos, pos);

//           if (rv1 != TD_ROW_SVER(row1)) {
//             rv1 = TD_ROW_SVER(row1);
//           }
//           if (row2 && rv2 != TD_ROW_SVER(row2)) {
//             rv2 = TD_ROW_SVER(row2);
//           }

//           // still assign data into current row
//           numOfRows +=
//               mergeTwoRowFromMem(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, &curRow, row1, row2, numOfCols,
//                                  pCheckInfo->tableId, pSchema1, pSchema2, pCfg->update, &lastKeyAppend);

//           if (cur->win.skey == TSKEY_INITIAL_VAL) {
//             cur->win.skey = key;
//           }

//           cur->win.ekey = key;
//           cur->lastKey = key + step;
//           cur->mixBlock = true;

//           moveToNextRowInMem(pCheckInfo);

//           pos += step;
//           adjustPos = true;
//         } else {
//           // discard the memory record
//           moveToNextRowInMem(pCheckInfo);
//         }
//       } else if ((key > tsArray[pos] && ascScan) || (key < tsArray[pos] && !ascScan)) {
//         if (cur->win.skey == TSKEY_INITIAL_VAL) {
//           cur->win.skey = tsArray[pos];
//         }

//         int32_t end = doBinarySearchKey(pCols->cols[0].pData, pCols->numOfRows, key, order);
//         assert(end != -1);

//         if (tsArray[end] == key) {  // the value of key in cache equals to the end timestamp value, ignore it
// #if 0
//           if (pCfg->update == TD_ROW_DISCARD_UPDATE) {
//             moveToNextRowInMem(pCheckInfo);
//           } else {
//             end -= step;
//           }
// #endif
//           if (!TD_SUPPORT_UPDATE(pCfg->update)) {
//             moveToNextRowInMem(pCheckInfo);
//           } else {
//             end -= step;
//           }
//         }

//         int32_t qstart = 0, qend = 0;
//         getQualifiedRowsPos(pTsdbReadHandle, pos, end, numOfRows, &qstart, &qend);

//         if ((lastKeyAppend != TSKEY_INITIAL_VAL) && (lastKeyAppend != (ascScan ? tsArray[qstart] : tsArray[qend]))) {
//           ++curRow;
//         }

//         numOfRows = doCopyRowsFromFileBlock(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, curRow, qstart, qend);
//         pos += (qend - qstart + 1) * step;
//         if (numOfRows > 0) {
//           curRow = numOfRows - 1;
//         }

//         cur->win.ekey = ascScan ? tsArray[qend] : tsArray[qstart];
//         cur->lastKey = cur->win.ekey + step;
//         lastKeyAppend = cur->win.ekey;
//       }
//     } while (numOfRows < pTsdbReadHandle->outputCapacity);

//     if (numOfRows < pTsdbReadHandle->outputCapacity) {
//       /**
//        * if cache is empty, load remain file block data. In contrast, if there are remain data in cache, do NOT
//        * copy them all to result buffer, since it may be overlapped with file data block.
//        */
//       if (node == NULL || ((TD_ROW_KEY((STSRow*)SL_GET_NODE_DATA(node)) > pTsdbReadHandle->window.ekey) && ascScan)
//       ||
//           ((TD_ROW_KEY((STSRow*)SL_GET_NODE_DATA(node)) < pTsdbReadHandle->window.ekey) && !ascScan)) {
//         // no data in cache or data in cache is greater than the ekey of time window, load data from file block
//         if (cur->win.skey == TSKEY_INITIAL_VAL) {
//           cur->win.skey = tsArray[pos];
//         }

//         int32_t start = -1, end = -1;
//         getQualifiedRowsPos(pTsdbReadHandle, pos, endPos, numOfRows, &start, &end);

//         numOfRows = doCopyRowsFromFileBlock(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, numOfRows, start, end);
//         pos += (end - start + 1) * step;

//         cur->win.ekey = ascScan ? tsArray[end] : tsArray[start];
//         cur->lastKey = cur->win.ekey + step;
//         cur->mixBlock = true;
//       }
//     }
//   }
H
Hongze Cheng 已提交
1146

H
Hongze Cheng 已提交
1147 1148
//   cur->blockCompleted = (((pos > endPos || cur->lastKey > pTsdbReadHandle->window.ekey) && ascScan) ||
//                          ((pos < endPos || cur->lastKey < pTsdbReadHandle->window.ekey) && !ascScan));
H
Hongze Cheng 已提交
1149

H
Hongze Cheng 已提交
1150 1151 1152
//   if (!ascScan) {
//     TSWAP(cur->win.skey, cur->win.ekey);
//   }
H
Hongze Cheng 已提交
1153

H
Hongze Cheng 已提交
1154 1155
//   updateInfoAfterMerge(pTsdbReadHandle, pCheckInfo, numOfRows, pos);
//   doCheckGeneratedBlockRange(pTsdbReadHandle);
H
Hongze Cheng 已提交
1156

H
Hongze Cheng 已提交
1157 1158 1159 1160
//   tsdbDebug("%p uid:%" PRIu64 ", data block created, mixblock:%d, brange:%" PRIu64 "-%" PRIu64 " rows:%d, %s",
//             pTsdbReadHandle, pCheckInfo->tableId, cur->mixBlock, cur->win.skey, cur->win.ekey, cur->rows,
//             pTsdbReadHandle->idStr);
// }
H
Hongze Cheng 已提交
1161

H
Haojun Liao 已提交
1162 1163 1164
/**
 * Release everything owned by a block order supporter: the per-table wrapper arrays (the first
 * pSup->numOfTables entries of pDataBlockInfo), the pointer table itself, and the two per-table counters.
 */
static void cleanupBlockOrderSupporter(SBlockOrderSupporter* pSup) {
  taosMemoryFreeClear(pSup->numOfBlocksPerTable);
  taosMemoryFreeClear(pSup->indexPerTable);

  int32_t tableIdx = 0;
  while (tableIdx < pSup->numOfTables) {
    // free through a local copy; the slot itself goes away with the pointer table below
    SBlockOrderWrapper* pPerTable = pSup->pDataBlockInfo[tableIdx];
    taosMemoryFreeClear(pPerTable);
    tableIdx += 1;
  }

  taosMemoryFreeClear(pSup->pDataBlockInfo);
}
H
Hongze Cheng 已提交
1173

H
Haojun Liao 已提交
1174 1175
/**
 * Allocate the zero-filled per-table arrays of a block order supporter for numOfTables tables.
 * pSup->numOfTables is not modified here.
 *
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY (after releasing whatever was allocated)
 */
static int32_t initBlockOrderSupporter(SBlockOrderSupporter* pSup, int32_t numOfTables) {
  ASSERT(numOfTables >= 1);

  pSup->numOfBlocksPerTable = taosMemoryCalloc(1, sizeof(int32_t) * numOfTables);
  pSup->indexPerTable = taosMemoryCalloc(1, sizeof(int32_t) * numOfTables);
  pSup->pDataBlockInfo = taosMemoryCalloc(1, POINTER_BYTES * numOfTables);

  bool allAllocated =
      (pSup->numOfBlocksPerTable != NULL) && (pSup->indexPerTable != NULL) && (pSup->pDataBlockInfo != NULL);
  if (allAllocated) {
    return TSDB_CODE_SUCCESS;
  }

  cleanupBlockOrderSupporter(pSup);
  return TSDB_CODE_OUT_OF_MEMORY;
}
H
Hongze Cheng 已提交
1188

H
Haojun Liao 已提交
1189
static int32_t fileDataBlockOrderCompar(const void* pLeft, const void* pRight, void* param) {
1190
  int32_t leftIndex = *(int32_t*)pLeft;
H
Haojun Liao 已提交
1191
  int32_t rightIndex = *(int32_t*)pRight;
H
Hongze Cheng 已提交
1192

H
Haojun Liao 已提交
1193
  SBlockOrderSupporter* pSupporter = (SBlockOrderSupporter*)param;
H
Hongze Cheng 已提交
1194

H
Haojun Liao 已提交
1195 1196
  int32_t leftTableBlockIndex = pSupporter->indexPerTable[leftIndex];
  int32_t rightTableBlockIndex = pSupporter->indexPerTable[rightIndex];
H
Hongze Cheng 已提交
1197

H
Haojun Liao 已提交
1198 1199 1200 1201 1202 1203 1204
  if (leftTableBlockIndex > pSupporter->numOfBlocksPerTable[leftIndex]) {
    /* left block is empty */
    return 1;
  } else if (rightTableBlockIndex > pSupporter->numOfBlocksPerTable[rightIndex]) {
    /* right block is empty */
    return -1;
  }
H
Hongze Cheng 已提交
1205

1206
  SBlockOrderWrapper* pLeftBlock = &pSupporter->pDataBlockInfo[leftIndex][leftTableBlockIndex];
H
Haojun Liao 已提交
1207
  SBlockOrderWrapper* pRightBlock = &pSupporter->pDataBlockInfo[rightIndex][rightTableBlockIndex];
H
Hongze Cheng 已提交
1208

H
Haojun Liao 已提交
1209 1210
  return pLeftBlock->pBlock->aSubBlock[0].offset > pRightBlock->pBlock->aSubBlock[0].offset ? 1 : -1;
}
H
Hongze Cheng 已提交
1211

H
Haojun Liao 已提交
1212
/**
 * (Re)build pBlockIter->blockList from the block lists of all tables in pReader->status.pTableMap.
 *
 * Tables without blocks are skipped. When exactly one table contributes blocks, its blocks are appended in
 * list order without sorting; otherwise a multiway merge tree driven by fileDataBlockOrderCompar emits the
 * blocks of all tables ordered by file offset. On success pBlockIter->index is set to the first block for
 * ascending scans and to the last block for descending scans.
 *
 * @param pReader      reader providing the table map, scan order and log id
 * @param pBlockIter   iterator to populate; its blockList is cleared first
 * @param numOfBlocks  total number of blocks over all tables; must match the number actually found
 * @return TSDB_CODE_SUCCESS, the error of initBlockOrderSupporter, or TSDB_CODE_TDB_OUT_OF_MEMORY
 */
static int32_t initBlockIterator(STsdbReader* pReader, SDataBlockIter* pBlockIter, int32_t numOfBlocks) {
  bool asc = ASCENDING_TRAVERSE(pReader->order);

  pBlockIter->numOfBlocks = numOfBlocks;
  taosArrayClear(pBlockIter->blockList);

  // access data blocks according to the offset of each block in asc/desc order.
  int32_t numOfTables = (int32_t)taosHashGetSize(pReader->status.pTableMap);

  SBlockOrderSupporter sup = {0};

  int32_t code = initBlockOrderSupporter(&sup, numOfTables);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  int32_t cnt = 0;
  void*   ptr = NULL;
  while (1) {
    ptr = taosHashIterate(pReader->status.pTableMap, ptr);
    if (ptr == NULL) {
      break;
    }

    STableBlockScanInfo* pTableScanInfo = (STableBlockScanInfo*)ptr;
    if (pTableScanInfo->pBlockList == NULL || taosArrayGetSize(pTableScanInfo->pBlockList) == 0) {
      continue;
    }

    size_t num = taosArrayGetSize(pTableScanInfo->pBlockList);
    sup.numOfBlocksPerTable[sup.numOfTables] = num;

    char* buf = taosMemoryMalloc(sizeof(SBlockOrderWrapper) * num);
    if (buf == NULL) {
      // NOTE(review): the hash iteration is abandoned here without an explicit cancel; confirm whether
      // taosHashIterate requires one on early exit.
      cleanupBlockOrderSupporter(&sup);
      return TSDB_CODE_TDB_OUT_OF_MEMORY;
    }

    sup.pDataBlockInfo[sup.numOfTables] = (SBlockOrderWrapper*)buf;
    for (int32_t k = 0; k < (int32_t)num; ++k) {
      SBlockOrderWrapper wrapper = {0};
      wrapper.pBlock = (SBlock*)taosArrayGet(pTableScanInfo->pBlockList, k);
      wrapper.uid = pTableScanInfo->uid;

      sup.pDataBlockInfo[sup.numOfTables][k] = wrapper;
      cnt++;
    }

    // only count the table once its wrapper array is in place, so cleanup never sees a half-filled slot
    sup.numOfTables += 1;
  }

  ASSERT(numOfBlocks == cnt);

  // since there is only one table qualified, blocks are not sorted
  if (sup.numOfTables == 1) {
    for (int32_t i = 0; i < numOfBlocks; ++i) {
      SFileDataBlockInfo blockInfo = {.uid = sup.pDataBlockInfo[0][i].uid, .tbBlockIdx = i};
      taosArrayPush(pBlockIter->blockList, &blockInfo);
    }
    tsdbDebug("%p create blocks info struct completed for one table, %d blocks not sorted %s", pReader, cnt,
              pReader->idStr);

    pBlockIter->index = asc ? 0 : (numOfBlocks - 1);

    // the supporter is only scratch space; release it on this fast path as well
    cleanupBlockOrderSupporter(&sup);
    return TSDB_CODE_SUCCESS;
  }

  tsdbDebug("%p create data blocks info struct completed, %d blocks in %d tables %s", pReader, cnt, sup.numOfTables,
            pReader->idStr);

  assert(cnt <= numOfBlocks && sup.numOfTables <= numOfTables);

  SMultiwayMergeTreeInfo* pTree = NULL;
  // keep the full-width result so that no non-zero error code can be truncated to "success"
  int32_t ret = tMergeTreeCreate(&pTree, sup.numOfTables, &sup, fileDataBlockOrderCompar);
  if (ret != TSDB_CODE_SUCCESS) {
    cleanupBlockOrderSupporter(&sup);
    return TSDB_CODE_TDB_OUT_OF_MEMORY;
  }

  int32_t numOfTotal = 0;
  while (numOfTotal < cnt) {
    int32_t pos = tMergeTreeGetChosenIndex(pTree);
    int32_t index = sup.indexPerTable[pos]++;

    SFileDataBlockInfo blockInfo = {.uid = sup.pDataBlockInfo[pos][index].uid, .tbBlockIdx = index};
    taosArrayPush(pBlockIter->blockList, &blockInfo);

    // set data block index overflow, in order to disable the offset comparator
    if (sup.indexPerTable[pos] >= sup.numOfBlocksPerTable[pos]) {
      sup.indexPerTable[pos] = sup.numOfBlocksPerTable[pos] + 1;
    }

    numOfTotal += 1;
    tMergeTreeAdjust(pTree, tMergeTreeGetAdjustIndex(pTree));
  }

  tsdbDebug("%p %d data blocks sort completed, %s", pReader, cnt, pReader->idStr);
  cleanupBlockOrderSupporter(&sup);
  taosMemoryFree(pTree);

  pBlockIter->index = asc ? 0 : (numOfBlocks - 1);
  return TSDB_CODE_SUCCESS;
}
H
Hongze Cheng 已提交
1314

H
Haojun Liao 已提交
1315
/**
 * Advance the iterator by one block in its scan direction.
 *
 * @return false (index unchanged) when the iterator already sits on the last block of an ascending scan
 *         or on the first block of a descending scan; true otherwise
 */
static bool blockIteratorNext(SDataBlockIter* pBlockIter) {
  if (ASCENDING_TRAVERSE(pBlockIter->order)) {
    if (pBlockIter->index >= pBlockIter->numOfBlocks - 1) {
      return false;
    }
    pBlockIter->index += 1;
  } else {
    if (pBlockIter->index <= 0) {
      return false;
    }
    pBlockIter->index += -1;
  }

  return true;
}

1327 1328 1329
/**
 * This is an two rectangles overlap cases.
 */
1330
static int32_t dataBlockPartiallyRequired(STimeWindow* pWindow, SVersionRange* pVerRange, SBlock* pBlock) {
1331 1332 1333 1334
  return (pWindow->ekey < pBlock->maxKey.ts && pWindow->ekey >= pBlock->minKey.ts) ||
         (pWindow->skey > pBlock->minKey.ts && pWindow->skey <= pBlock->maxKey.ts) ||
         (pVerRange->minVer > pBlock->minVersion && pVerRange->minVer <= pBlock->maxVersion) ||
         (pVerRange->maxVer < pBlock->maxVersion && pVerRange->maxVer >= pBlock->minVersion);
H
Haojun Liao 已提交
1335
}
H
Hongze Cheng 已提交
1336

H
Haojun Liao 已提交
1337 1338 1339 1340
// Return the block list entry the iterator currently points at (pBlockIter->blockList[pBlockIter->index]).
static SFileDataBlockInfo* getCurrentBlockInfo(SDataBlockIter* pBlockIter) {
  return (SFileDataBlockInfo*)taosArrayGet(pBlockIter->blockList, pBlockIter->index);
}
H
Hongze Cheng 已提交
1341

1342 1343
/**
 * Find the block that follows pFBlockInfo in scan direction within the same table's block list.
 *
 * @param nextIndex  receives the neighbor's index in pTableBlockScanInfo->pBlockList; left untouched
 *                   when there is no neighbor
 * @return the neighbor block, or NULL when the current block is the last one in scan direction
 */
static SBlock* getNeighborBlockOfSameTable(SFileDataBlockInfo* pFBlockInfo, STableBlockScanInfo* pTableBlockScanInfo,
                                           int32_t* nextIndex, int32_t order) {
  if (ASCENDING_TRAVERSE(order)) {
    // already on the last block of the table
    if (pFBlockInfo->tbBlockIdx >= taosArrayGetSize(pTableBlockScanInfo->pBlockList) - 1) {
      return NULL;
    }
    *nextIndex = pFBlockInfo->tbBlockIdx + 1;
  } else {
    // already on the first block of the table
    if (pFBlockInfo->tbBlockIdx == 0) {
      return NULL;
    }
    *nextIndex = pFBlockInfo->tbBlockIdx + (-1);
  }

  return (SBlock*)taosArrayGet(pTableBlockScanInfo->pBlockList, *nextIndex);
}

/**
 * Search the iterator's block list, starting at the current position and moving in scan direction, for the
 * entry with the same uid and tbBlockIdx as pFBlockInfo.
 *
 * @return the position of the matching entry; asserts (and returns -1) when no entry matches
 */
static int32_t findFileBlockInfoIndex(SDataBlockIter* pBlockIter, SFileDataBlockInfo* pFBlockInfo) {
  ASSERT(pBlockIter != NULL && pFBlockInfo != NULL);

  const int32_t stride = ASCENDING_TRAVERSE(pBlockIter->order) ? 1 : -1;

  for (int32_t pos = pBlockIter->index; pos < pBlockIter->numOfBlocks && pos >= 0; pos += stride) {
    SFileDataBlockInfo* pCandidate = taosArrayGet(pBlockIter->blockList, pos);

    bool sameTable = (pCandidate->uid == pFBlockInfo->uid);
    if (sameTable && pCandidate->tbBlockIdx == pFBlockInfo->tbBlockIdx) {
      return pos;
    }
  }

  ASSERT(0);
  return -1;
}

1379
/**
 * Advance the iterator by step and make the entry currently stored at position index the active one.
 *
 * If the entry is not already at the new iterator position, it is removed from its old slot and re-inserted
 * at the new position.
 *
 * @return -1 when index is outside [0, numOfBlocks); TSDB_CODE_SUCCESS otherwise
 */
static int32_t setFileBlockActiveInBlockIter(SDataBlockIter* pBlockIter, int32_t index, int32_t step) {
  bool inRange = (index >= 0) && (index < pBlockIter->numOfBlocks);
  if (!inRange) {
    return -1;
  }

  // take a copy first: the slot is about to be removed from the list
  SFileDataBlockInfo moved = *(SFileDataBlockInfo*)taosArrayGet(pBlockIter->blockList, index);
  pBlockIter->index += step;

  if (index == pBlockIter->index) {
    return TSDB_CODE_SUCCESS;
  }

  taosArrayRemove(pBlockIter->blockList, index);
  taosArrayInsert(pBlockIter->blockList, pBlockIter->index, &moved);

  SFileDataBlockInfo* pActive = taosArrayGet(pBlockIter->blockList, pBlockIter->index);
  ASSERT(pActive->uid == moved.uid && pActive->tbBlockIdx == moved.tbBlockIdx);

  return TSDB_CODE_SUCCESS;
}

// Two adjacent blocks are considered overlapping when they share the boundary timestamp in scan direction:
// the current block's max key equals the neighbor's min key (ascending), or its min key equals the
// neighbor's max key (descending).
static bool overlapWithNeighborBlock(SBlock* pBlock, SBlock* pNeighbor, int32_t order) {
  return ASCENDING_TRAVERSE(order) ? (pBlock->maxKey.ts == pNeighbor->minKey.ts)
                                   : (pBlock->minKey.ts == pNeighbor->maxKey.ts);
}
H
Hongze Cheng 已提交
1406

1407
// Returns true when key holds a real timestamp (not TSKEY_INITIAL_VAL) that is reached no later than the
// block in scan direction: key.ts <= block min key (ascending) or key.ts >= block max key (descending).
static bool bufferDataInFileBlockGap(int32_t order, TSDBKEY key, SBlock* pBlock) {
  if (key.ts == TSKEY_INITIAL_VAL) {
    return false;
  }

  if (ASCENDING_TRAVERSE(order)) {
    return key.ts <= pBlock->minKey.ts;
  }

  return key.ts >= pBlock->maxKey.ts;
}
H
Hongze Cheng 已提交
1413

H
Haojun Liao 已提交
1414
// Returns true when key.ts lies inside the block's [minKey.ts, maxKey.ts] range and the block's version
// range intersects the requested version range.
static bool keyOverlapFileBlock(TSDBKEY key, SBlock* pBlock, SVersionRange* pVerRange) {
  if (key.ts < pBlock->minKey.ts || key.ts > pBlock->maxKey.ts) {
    return false;
  }

  if (pBlock->maxVersion < pVerRange->minVer) {
    return false;
  }

  return pBlock->minVersion <= pVerRange->maxVer;
}

1419
/**
 * Check whether any delete-skyline point of the table may affect the given file block.
 *
 * The skyline is walked from pBlockScanInfo->fileDelIndex in scan direction. A point whose timestamp lies
 * inside the block's [minKey.ts, maxKey.ts] range and whose version is not lower than the block's minimum
 * version counts as an overlap. The walk stops as soon as it has moved past the block in scan direction.
 *
 * @param pBlockScanInfo  scan state holding delSkyline (ordered by ascending timestamp, as the first/last
 *                        bound check below relies on) and fileDelIndex
 * @param pBlock          file block to test
 * @param order           scan order (TSDB_ORDER_ASC walks upward, otherwise downward)
 * @return true when an overlapping point is found; false otherwise, including when there is no skyline,
 *         the skyline is empty, or the walk runs off either end of the skyline
 */
static bool overlapWithDelSkyline(STableBlockScanInfo* pBlockScanInfo, const SBlock* pBlock, int32_t order) {
  if (pBlockScanInfo->delSkyline == NULL) {
    return false;
  }

  // signed on purpose: the descending walk below must be able to compare against index -1
  int32_t num = (int32_t)taosArrayGetSize(pBlockScanInfo->delSkyline);
  if (num <= 0) {
    return false;
  }

  TSDBKEY* pFirst = taosArrayGet(pBlockScanInfo->delSkyline, 0);
  TSDBKEY* pLast = taosArrayGetLast(pBlockScanInfo->delSkyline);

  // ts is not overlap
  if (pBlock->minKey.ts > pLast->ts || pBlock->maxKey.ts < pFirst->ts) {
    return false;
  }

  bool    asc = ASCENDING_TRAVERSE(order);
  int32_t step = asc ? 1 : -1;

  for (int32_t i = pBlockScanInfo->fileDelIndex; i >= 0 && i < num; i += step) {
    TSDBKEY* p = taosArrayGet(pBlockScanInfo->delSkyline, i);

    if (p->ts >= pBlock->minKey.ts && p->ts <= pBlock->maxKey.ts) {
      if (p->version >= pBlock->minVersion) {
        return true;
      }
      continue;
    }

    // the remaining points lie entirely beyond the block in scan direction
    bool passedBlock = asc ? (p->ts > pBlock->maxKey.ts) : (p->ts < pBlock->minKey.ts);
    if (passedBlock) {
      return false;
    }
  }

  // Running out of points without a match is a legitimate outcome (e.g. the last in-range point carries a
  // version lower than the block's minimum version), so it is reported as "no overlap".
  return false;
}

1451 1452 1453 1454
// 1. the version of all rows should be less than the endVersion
// 2. current block should not overlap with next neighbor block
// 3. current timestamp should not be overlap with each other
// 4. output buffer should be large enough to hold all rows in current block
1455
// 5. delete info should not overlap with current block data
1456 1457
static bool fileBlockShouldLoad(STsdbReader* pReader, SFileDataBlockInfo* pFBlock, SBlock* pBlock,
                                STableBlockScanInfo* pScanInfo, TSDBKEY key) {
1458 1459 1460
  int32_t neighborIndex = 0;
  SBlock* pNeighbor = getNeighborBlockOfSameTable(pFBlock, pScanInfo, &neighborIndex, pReader->order);

1461
  // overlap with neighbor
1462 1463 1464 1465 1466
  bool overlapWithNeighbor = false;
  if (pNeighbor) {
    overlapWithNeighbor = overlapWithNeighborBlock(pBlock, pNeighbor, pReader->order);
  }

1467 1468
  // has duplicated ts of different version in this block
  bool hasDup = (pBlock->nSubBlock == 1)? pBlock->hasDup:true;
1469
  bool overlapWithDel= overlapWithDelSkyline(pScanInfo, pBlock, pReader->order);
1470

1471
  return (overlapWithNeighbor || hasDup || dataBlockPartiallyRequired(&pReader->window, &pReader->verRange, pBlock) ||
1472
          keyOverlapFileBlock(key, pBlock, &pReader->verRange) || (pBlock->nRow > pReader->capacity) || overlapWithDel);
H
Haojun Liao 已提交
1473 1474
}

1475
// Fill pReader->pResBlock with rows taken from the mem/imem iterators of one table, bounded by endKey.
// Returns TSDB_CODE_SUCCESS right away when neither iterator has a value; otherwise returns the
// code produced by buildDataBlockFromBufImpl.
static int32_t buildDataBlockFromBuf(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo, int64_t endKey) {
  if (!pBlockScanInfo->iiter.hasVal && !pBlockScanInfo->iter.hasVal) {
    return TSDB_CODE_SUCCESS;
  }

  SSDataBlock* pRes = pReader->pResBlock;
  int64_t      startUs = taosGetTimestampUs();

  int32_t ret = buildDataBlockFromBufImpl(pBlockScanInfo, endKey, pReader->capacity, pReader);

  // the result block metadata is refreshed whatever the build result is
  blockDataUpdateTsWindow(pRes, 0);
  pRes->info.uid = pBlockScanInfo->uid;
  setComposedBlockFlag(pReader, true);

  int64_t costUs = taosGetTimestampUs() - startUs;
  tsdbDebug("%p build data block from cache completed, elapsed time:%" PRId64
            " us, numOfRows:%d, numOfCols:%d, brange: %" PRId64 " - %" PRId64 " %s",
            pReader, costUs, pRes->info.rows, (int32_t)blockDataGetNumOfCols(pRes), pRes->info.window.skey,
            pRes->info.window.ekey, pReader->idStr);

  return ret;
}

1498
static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo, TSDBROW* pRow,
1499
                                     STSRow* pTSRow, SIterInfo* pIter, int64_t key) {
1500 1501 1502 1503 1504
  SRowMerger          merge = {0};
  SBlockData*         pBlockData = &pReader->status.fileBlockData;
  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;

  TSDBKEY k = TSDBROW_KEY(pRow);
1505
  TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
1506
  SArray* pDelList = pBlockScanInfo->delSkyline;
1507

1508 1509 1510 1511 1512 1513 1514 1515
  // ascending order traverse
  if (ASCENDING_TRAVERSE(pReader->order)) {
    if (key < k.ts) {
      tRowMergerInit(&merge, &fRow, pReader->pSchema);

      doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
      tRowMergerGetRow(&merge, &pTSRow);
    } else if (k.ts < key) {  // k.ts < key
1516
      doMergeMultiRows(pRow, pBlockScanInfo->uid, pIter, pDelList, &pTSRow, pReader);
1517 1518 1519
    } else {  // k.ts == key, ascending order: file block ----> imem rows -----> mem rows
      tRowMergerInit(&merge, &fRow, pReader->pSchema);
      doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
1520 1521

      tRowMerge(&merge, pRow);
1522
      doMergeRowsInBuf(pIter, k.ts, pBlockScanInfo->delSkyline, &merge, pReader);
1523 1524

      tRowMergerGetRow(&merge, &pTSRow);
1525
    }
1526 1527
  } else {  // descending order scan
    if (key < k.ts) {
1528
      doMergeMultiRows(pRow, pBlockScanInfo->uid, pIter, pDelList, &pTSRow, pReader);
1529 1530
    } else if (k.ts < key) {
      tRowMergerInit(&merge, &fRow, pReader->pSchema);
1531

1532 1533 1534 1535 1536 1537
      doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
      tRowMergerGetRow(&merge, &pTSRow);
    } else {  // descending order: mem rows -----> imem rows ------> file block
      updateSchema(pRow, pBlockScanInfo->uid, pReader);

      tRowMergerInit(&merge, pRow, pReader->pSchema);
1538
      doMergeRowsInBuf(pIter, k.ts, pBlockScanInfo->delSkyline, &merge, pReader);
1539 1540 1541 1542 1543 1544

      tRowMerge(&merge, &fRow);
      doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);

      tRowMergerGetRow(&merge, &pTSRow);
    }
1545 1546
  }

1547
  tRowMergerClear(&merge);
1548 1549 1550 1551
  doAppendOneRow(pReader->pResBlock, pReader, pTSRow);
  return TSDB_CODE_SUCCESS;
}

1552 1553 1554 1555
// Three-way merge of the current rows of the mem iterator, the imem iterator and the loaded file
// block. Exactly one output row is appended to pReader->pResBlock per call.
// Both buffer iterators must currently point at a valid row (asserted below).
//   key  - timestamp of the current file block row
//   k    - key of the current mem row
//   ik   - key of the current imem row
// Returns TSDB_CODE_SUCCESS, or -1 if no case matched (guarded by ASSERT(0)).
static int32_t doMergeThreeLevelRows(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo) {
  SRowMerger merge = {0};
  STSRow*    pTSRow = NULL;

  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
  SBlockData*         pBlockData = &pReader->status.fileBlockData;
  SArray*             pDelList = pBlockScanInfo->delSkyline;

  TSDBROW* pRow = getValidRow(&pBlockScanInfo->iter, pDelList, pReader);
  TSDBROW* piRow = getValidRow(&pBlockScanInfo->iiter, pDelList, pReader);
  ASSERT(pRow != NULL && piRow != NULL);

  int64_t  key = pBlockData->aTSKEY[pDumpInfo->rowIndex];
  uint64_t uid = pBlockScanInfo->uid;

  TSDBKEY k = TSDBROW_KEY(pRow);
  TSDBKEY ik = TSDBROW_KEY(piRow);

  if (ASCENDING_TRAVERSE(pReader->order)) {
    // [1&2] key <= [k.ts && ik.ts]: file row first, then imem, then mem rows of the same timestamp
    if (key <= k.ts && key <= ik.ts) {
      TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
      tRowMergerInit(&merge, &fRow, pReader->pSchema);

      doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);

      if (ik.ts == key) {
        tRowMerge(&merge, piRow);
        doMergeRowsInBuf(&pBlockScanInfo->iiter, key, pDelList, &merge, pReader);
      }

      if (k.ts == key) {
        tRowMerge(&merge, pRow);
        doMergeRowsInBuf(&pBlockScanInfo->iter, key, pDelList, &merge, pReader);
      }

      tRowMergerGetRow(&merge, &pTSRow);
      doAppendOneRow(pReader->pResBlock, pReader, pTSRow);
      return TSDB_CODE_SUCCESS;
    } else {  // key > ik.ts || key > k.ts
      // NOTE(review): k.ts < key == ik.ts also reaches this branch (it is case [5] below), which
      // this assertion rejects - confirm whether callers really exclude it.
      ASSERT(key != ik.ts);

      // [3] ik.ts < key <= k.ts
      // [4] ik.ts < k.ts <= key
      if (ik.ts < k.ts) {
        doMergeMultiRows(piRow, uid, &pBlockScanInfo->iiter, pDelList, &pTSRow, pReader);
        doAppendOneRow(pReader->pResBlock, pReader, pTSRow);
        return TSDB_CODE_SUCCESS;
      }

      // [5] k.ts < key   <= ik.ts
      // [6] k.ts < ik.ts <= key
      if (k.ts < ik.ts) {
        doMergeMultiRows(pRow, uid, &pBlockScanInfo->iter, pDelList, &pTSRow, pReader);
        doAppendOneRow(pReader->pResBlock, pReader, pTSRow);
        return TSDB_CODE_SUCCESS;
      }

      // [7] k.ts == ik.ts < key
      if (k.ts == ik.ts) {
        ASSERT(key > ik.ts && key > k.ts);

        doMergeMemIMemRows(pRow, piRow, pBlockScanInfo, pReader, &pTSRow);
        doAppendOneRow(pReader->pResBlock, pReader, pTSRow);
        return TSDB_CODE_SUCCESS;
      }
    }
  } else {  // descending order scan
    // [1/2] k.ts >= ik.ts && k.ts >= key: mem row first, then imem, then file rows of the same timestamp
    if (k.ts >= ik.ts && k.ts >= key) {
      updateSchema(pRow, uid, pReader);

      tRowMergerInit(&merge, pRow, pReader->pSchema);
      // merge on the timestamp of the mem row (k.ts); the file key may be smaller in this branch
      doMergeRowsInBuf(&pBlockScanInfo->iter, k.ts, pDelList, &merge, pReader);

      if (ik.ts == k.ts) {
        tRowMerge(&merge, piRow);
        doMergeRowsInBuf(&pBlockScanInfo->iiter, k.ts, pDelList, &merge, pReader);
      }

      if (k.ts == key) {
        TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
        tRowMerge(&merge, &fRow);
        doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
      }

      tRowMergerGetRow(&merge, &pTSRow);
      doAppendOneRow(pReader->pResBlock, pReader, pTSRow);
      return TSDB_CODE_SUCCESS;
    } else {
      // NOTE(review): key > k.ts == ik.ts also reaches this branch (the file row is then emitted
      // by case [5]/[6] below), which this assertion rejects - confirm against callers.
      ASSERT(ik.ts != k.ts);

      // [3] ik.ts > k.ts >= Key
      // [4] ik.ts > key >= k.ts
      if (ik.ts > key) {
        doMergeMultiRows(piRow, uid, &pBlockScanInfo->iiter, pDelList, &pTSRow, pReader);
        doAppendOneRow(pReader->pResBlock, pReader, pTSRow);
        return TSDB_CODE_SUCCESS;
      }

      // [5] key > ik.ts > k.ts
      // [6] key > k.ts > ik.ts
      if (key > ik.ts) {
        TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
        tRowMergerInit(&merge, &fRow, pReader->pSchema);

        doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
        tRowMergerGetRow(&merge, &pTSRow);
        doAppendOneRow(pReader->pResBlock, pReader, pTSRow);
        return TSDB_CODE_SUCCESS;
      }

      // [7] key = ik.ts > k.ts: imem rows first, then the file rows of the same timestamp.
      // The merger must be initialized with the imem row before the file row is merged into it.
      if (key == ik.ts) {
        updateSchema(piRow, uid, pReader);

        tRowMergerInit(&merge, piRow, pReader->pSchema);
        doMergeRowsInBuf(&pBlockScanInfo->iiter, ik.ts, pDelList, &merge, pReader);

        TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
        tRowMerge(&merge, &fRow);
        doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);

        tRowMergerGetRow(&merge, &pTSRow);
        doAppendOneRow(pReader->pResBlock, pReader, pTSRow);
        return TSDB_CODE_SUCCESS;
      }
    }
  }

  ASSERT(0);
  return -1;
}

dengyihao's avatar
dengyihao 已提交
1683 1684
// Check whether the file block row at pDumpInfo->rowIndex qualifies for output: its version must
// lie in the reader's version range, its timestamp in the query window, and it must not be
// covered by the delete skyline of the table.
static bool isValidFileBlockRow(SBlockData* pBlockData, SFileBlockDumpInfo* pDumpInfo,
                                STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader) {
  int64_t rowVer = pBlockData->aVersion[pDumpInfo->rowIndex];
  bool    verInRange = (rowVer >= pReader->verRange.minVer) && (rowVer <= pReader->verRange.maxVer);
  if (!verInRange) {
    return false;
  }

  int64_t rowTs = pBlockData->aTSKEY[pDumpInfo->rowIndex];
  bool    tsInRange = (rowTs >= pReader->window.skey) && (rowTs <= pReader->window.ekey);
  if (!tsInRange) {
    return false;
  }

  // hasBeenDropped may advance fileDelIndex while searching the skyline
  TSDBKEY rowKey = {.ts = rowTs, .version = rowVer};
  return !hasBeenDropped(pBlockScanInfo->delSkyline, &pBlockScanInfo->fileDelIndex, &rowKey, pReader->order);
}

1704
// Return true when ts lies outside the closed interval [pWindow->skey, pWindow->ekey].
static bool outOfTimeWindow(int64_t ts, STimeWindow* pWindow) {
  bool inside = (ts <= pWindow->ekey) && (ts >= pWindow->skey);
  return !inside;
}
1705

1706 1707 1708 1709 1710 1711 1712 1713
// Produce one output row from the current file block row and whatever buffer iterators still have
// data, dispatching on which of mem / imem currently hold a value.
static int32_t buildComposedDataBlockImpl(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo) {
  SBlockData*         pFileData = &pReader->status.fileBlockData;
  SFileBlockDumpInfo* pDump = &pReader->status.fBlockDumpInfo;

  STSRow* pMerged = NULL;
  int64_t fileKey = pFileData->aTSKEY[pDump->rowIndex];

  // both iterators are probed first (mem, then imem); hasVal is inspected only afterwards
  TSDBROW* pMemRow = getValidRow(&pBlockScanInfo->iter, pBlockScanInfo->delSkyline, pReader);
  TSDBROW* pImemRow = getValidRow(&pBlockScanInfo->iiter, pBlockScanInfo->delSkyline, pReader);

  // mem + imem + file
  if (pBlockScanInfo->iter.hasVal && pBlockScanInfo->iiter.hasVal) {
    return doMergeThreeLevelRows(pReader, pBlockScanInfo);
  }

  // imem + file
  if (pBlockScanInfo->iiter.hasVal) {
    return doMergeBufAndFileRows(pReader, pBlockScanInfo, pImemRow, pMerged, &pBlockScanInfo->iiter, fileKey);
  }

  // mem + file
  if (pBlockScanInfo->iter.hasVal) {
    return doMergeBufAndFileRows(pReader, pBlockScanInfo, pMemRow, pMerged, &pBlockScanInfo->iter, fileKey);
  }

  // both buffers are empty, only the file block contributes
  SRowMerger merger = {0};
  TSDBROW    fileRow = tsdbRowFromBlockData(pFileData, pDump->rowIndex);

  tRowMergerInit(&merger, &fileRow, pReader->pSchema);
  doMergeRowsInFileBlocks(pFileData, pBlockScanInfo, pReader, &merger);
  tRowMergerGetRow(&merger, &pMerged);
  doAppendOneRow(pReader->pResBlock, pReader, pMerged);

  return TSDB_CODE_SUCCESS;
}

1741
// Build a composed result block from the loaded file block merged with the buffer data of one
// table. Rows are produced until the file block is consumed or the result block reaches the
// reader capacity. Always returns TSDB_CODE_SUCCESS.
static int32_t buildComposedDataBlock(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo) {
  SSDataBlock*        pOut = pReader->pResBlock;
  SBlockData*         pFileData = &pReader->status.fileBlockData;
  SFileBlockDumpInfo* pDump = &pReader->status.fBlockDumpInfo;
  int32_t             delta = ASCENDING_TRAVERSE(pReader->order) ? 1 : -1;

  for (;;) {
    // todo check the validate of row in file block
    bool rowOk = isValidFileBlockRow(pFileData, pDump, pBlockScanInfo, pReader);
    if (rowOk) {
      buildComposedDataBlockImpl(pReader, pBlockScanInfo);
    } else {
      // skip the disqualified row
      pDump->rowIndex += delta;
    }

    SFileDataBlockInfo* pCur = getCurrentBlockInfo(&pReader->status.blockIter);
    SBlock*             pFileBlock = taosArrayGet(pBlockScanInfo->pBlockList, pCur->tbBlockIdx);

    // currently loaded file data block is consumed
    if (pDump->rowIndex < 0 || pDump->rowIndex >= pFileBlock->nRow) {
      setBlockAllDumped(pDump, pFileBlock, pReader->order);
      break;
    }

    // the capacity is checked only after a row has actually been produced
    if (rowOk && pOut->info.rows >= pReader->capacity) {
      break;
    }
  }

  pOut->info.uid = pBlockScanInfo->uid;
  blockDataUpdateTsWindow(pOut, 0);
  setComposedBlockFlag(pReader, true);

  tsdbDebug("%p uid:%" PRIu64 ", composed data block created, brange:%" PRIu64 "-%" PRIu64 " rows:%d, %s", pReader,
            pBlockScanInfo->uid, pOut->info.window.skey, pOut->info.window.ekey, pOut->info.rows,
            pReader->idStr);

  return TSDB_CODE_SUCCESS;
}

// Record in the reader status whether the current result block is a composed (merged) block.
void setComposedBlockFlag(STsdbReader* pReader, bool composed) {
  SReaderStatus* pStatus = &pReader->status;
  pStatus->composedDataBlock = composed;
}

1796
// Create, once per table, the skip list iterators over the mem and imem tables and build the
// delete skyline of the table. The iterators start at the window start key (ascending scan) or
// the window end key (descending scan).
// Returns TSDB_CODE_SUCCESS, or the error code of tsdbTbDataIterCreate when an iterator cannot be
// created (iterInit stays false in that case).
static int32_t initMemDataIterator(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader) {
  if (pBlockScanInfo->iterInit) {
    return TSDB_CODE_SUCCESS;
  }

  int32_t code = TSDB_CODE_SUCCESS;

  TSDBKEY startKey = {0};
  if (ASCENDING_TRAVERSE(pReader->order)) {
    startKey = (TSDBKEY){.ts = pReader->window.skey, .version = pReader->verRange.minVer};
  } else {
    startKey = (TSDBKEY){.ts = pReader->window.ekey, .version = pReader->verRange.maxVer};
  }

  int32_t backward = (!ASCENDING_TRAVERSE(pReader->order));

  // iterator over the mem table
  STbData* d = NULL;
  if (pReader->pTsdb->mem != NULL) {
    tsdbGetTbDataFromMemTable(pReader->pTsdb->mem, pReader->suid, pBlockScanInfo->uid, &d);
    if (d != NULL) {
      code = tsdbTbDataIterCreate(d, &startKey, backward, &pBlockScanInfo->iter.iter);
      if (code == TSDB_CODE_SUCCESS) {
        pBlockScanInfo->iter.hasVal = (tsdbTbDataIterGet(pBlockScanInfo->iter.iter) != NULL);

        tsdbDebug("%p uid:%" PRId64 ", check data in mem from skey:%" PRId64 ", order:%d, ts range in buf:%" PRId64
                  "-%" PRId64 " %s",
                  pReader, pBlockScanInfo->uid, startKey.ts, pReader->order, d->minKey, d->maxKey, pReader->idStr);
      } else {
        tsdbError("%p uid:%" PRId64 ", failed to create iterator for mem, code:%s, %s", pReader, pBlockScanInfo->uid,
                  tstrerror(code), pReader->idStr);
        return code;
      }
    }
  } else {
    tsdbDebug("%p uid:%" PRId64 ", no data in mem, %s", pReader, pBlockScanInfo->uid, pReader->idStr);
  }

  // iterator over the imem table
  STbData* di = NULL;
  if (pReader->pTsdb->imem != NULL) {
    tsdbGetTbDataFromMemTable(pReader->pTsdb->imem, pReader->suid, pBlockScanInfo->uid, &di);
    if (di != NULL) {
      code = tsdbTbDataIterCreate(di, &startKey, backward, &pBlockScanInfo->iiter.iter);
      if (code == TSDB_CODE_SUCCESS) {
        pBlockScanInfo->iiter.hasVal = (tsdbTbDataIterGet(pBlockScanInfo->iiter.iter) != NULL);

        tsdbDebug("%p uid:%" PRId64 ", check data in imem from skey:%" PRId64 ", order:%d, ts range in buf:%" PRId64
                  "-%" PRId64 " %s",
                  pReader, pBlockScanInfo->uid, startKey.ts, pReader->order, di->minKey, di->maxKey, pReader->idStr);
      } else {
        tsdbError("%p uid:%" PRId64 ", failed to create iterator for imem, code:%s, %s", pReader, pBlockScanInfo->uid,
                  tstrerror(code), pReader->idStr);
        return code;
      }
    }
  } else {
    tsdbDebug("%p uid:%" PRId64 ", no data in imem, %s", pReader, pBlockScanInfo->uid, pReader->idStr);
  }

  // NOTE(review): the return code of initDelSkylineIterator is not checked here - confirm that a
  // failure to build the delete skyline may safely be ignored.
  initDelSkylineIterator(pBlockScanInfo, pReader, d, di);

  pBlockScanInfo->iterInit = true;
  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
1860 1861
// Build the delete skyline of one table from the delete records found in the del file, the mem
// table entry (pMemTbData) and the imem table entry (piMemTbData), and reset the skyline indexes
// used by the mem, imem and file readers according to the scan order.
// Does nothing when the skyline has already been built.
// Returns 0 on success, otherwise the error code of the failing tsdb call.
int32_t initDelSkylineIterator(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader, STbData* pMemTbData,
                               STbData* piMemTbData) {
  if (pBlockScanInfo->delSkyline != NULL) {
    return TSDB_CODE_SUCCESS;
  }

  int32_t code = 0;
  STsdb*  pTsdb = pReader->pTsdb;
  SArray* aDelIdx = NULL;

  SArray* pDelData = taosArrayInit(4, sizeof(SDelData));

  SDelFile* pDelFile = tsdbFSStateGetDelFile(pTsdb->fs->cState);
  if (pDelFile) {
    // NOTE(review): pDelFReader is opened here but never closed in this function - confirm the
    // matching close call of the del file reader API and release it on every path.
    SDelFReader* pDelFReader = NULL;
    code = tsdbDelFReaderOpen(&pDelFReader, pDelFile, pTsdb, NULL);
    if (code) {
      goto _err;
    }

    aDelIdx = taosArrayInit(4, sizeof(SDelIdx));
    if (aDelIdx == NULL) {
      // NOTE(review): code is still 0 here, so the allocation failure is reported as success
      goto _err;
    }

    code = tsdbReadDelIdx(pDelFReader, aDelIdx, NULL);
    if (code) {
      goto _err;
    }

    SDelIdx  idx = {.suid = pReader->suid, .uid = pBlockScanInfo->uid};
    SDelIdx* pIdx = taosArraySearch(aDelIdx, &idx, tCmprDelIdx, TD_EQ);

    // the del file may hold no entry for this table; only read the delete data when one exists
    if (pIdx != NULL) {
      code = tsdbReadDelData(pDelFReader, pIdx, pDelData, NULL);
      if (code != TSDB_CODE_SUCCESS) {
        goto _err;
      }
    }

    // the index list is only needed for the lookup above
    taosArrayDestroy(aDelIdx);
    aDelIdx = NULL;
  }

  // append the delete records kept in the mem table
  SDelData* p = NULL;
  if (pMemTbData != NULL) {
    p = pMemTbData->pHead;
    while (p) {
      taosArrayPush(pDelData, p);
      p = p->pNext;
    }
  }

  // append the delete records kept in the imem table
  if (piMemTbData != NULL) {
    p = piMemTbData->pHead;
    while (p) {
      taosArrayPush(pDelData, p);
      p = p->pNext;
    }
  }

  if (taosArrayGetSize(pDelData) > 0) {
    pBlockScanInfo->delSkyline = taosArrayInit(4, sizeof(TSDBKEY));
    code = tsdbBuildDeleteSkyline(pDelData, 0, (int32_t)(taosArrayGetSize(pDelData) - 1), pBlockScanInfo->delSkyline);
  }

  taosArrayDestroy(pDelData);

  // ascending scans start from the first skyline point, descending scans from the last one
  // (-1 when the skyline is empty)
  pBlockScanInfo->iter.index =
      ASCENDING_TRAVERSE(pReader->order) ? 0 : (int32_t)taosArrayGetSize(pBlockScanInfo->delSkyline) - 1;
  pBlockScanInfo->iiter.index = pBlockScanInfo->iter.index;
  pBlockScanInfo->fileDelIndex = pBlockScanInfo->iter.index;
  return code;

_err:
  if (aDelIdx != NULL) {
    taosArrayDestroy(aDelIdx);
  }
  taosArrayDestroy(pDelData);
  return code;
}

1932 1933 1934
// Return the smaller of the current valid keys of the mem and imem iterators of the table that
// owns the current file block. When neither iterator has a valid row, the returned key has
// ts == TSKEY_INITIAL_VAL. The iterators are created on first use.
static TSDBKEY getCurrentKeyInBuf(SDataBlockIter* pBlockIter, STsdbReader* pReader) {
  TSDBKEY key = {.ts = TSKEY_INITIAL_VAL};
  bool    hasKey = false;

  SFileDataBlockInfo*  pFBlock = getCurrentBlockInfo(pBlockIter);
  STableBlockScanInfo* pScanInfo = taosHashGet(pReader->status.pTableMap, &pFBlock->uid, sizeof(pFBlock->uid));

  initMemDataIterator(pScanInfo, pReader);
  TSDBROW* pRow = getValidRow(&pScanInfo->iter, pScanInfo->delSkyline, pReader);
  if (pRow != NULL) {
    key = TSDBROW_KEY(pRow);
    hasKey = true;
  }

  pRow = getValidRow(&pScanInfo->iiter, pScanInfo->delSkyline, pReader);
  if (pRow != NULL) {
    TSDBKEY k = TSDBROW_KEY(pRow);
    // do not compare against the initial sentinel: when the mem iterator has no valid row the
    // imem key is the only candidate and must be taken as is
    if (!hasKey || key.ts > k.ts) {
      key = k;
    }
  }

  // NOTE(review): the smaller key is chosen for both scan orders; confirm whether a descending
  // scan expects the larger of the two keys instead.
  return key;
}

H
Haojun Liao 已提交
1955 1956 1957 1958
// Advance the fileset iterator until a fileset holding at least one valid table is found, loading
// its block index and block lists on the way. *numOfBlocks is filled in by doLoadFileBlock.
// Returns TSDB_CODE_SUCCESS when a fileset was found or all filesets are exhausted, otherwise the
// error code of the failing load.
static int32_t moveToNextFile(STsdbReader* pReader, int32_t* numOfBlocks) {
  SReaderStatus* pStatus = &pReader->status;

  while (1) {
    bool hasNext = filesetIteratorNext(&pStatus->fileIter, pReader);
    if (!hasNext) {  // no data files on disk
      break;
    }

    // temporary list of the block indexes of the current fileset; released before leaving the iteration
    SArray* pIndexList = taosArrayInit(4, sizeof(SBlockIdx));
    int32_t code = doLoadBlockIndex(pReader, pReader->pFileReader, pIndexList);
    if (code != TSDB_CODE_SUCCESS) {
      taosArrayDestroy(pIndexList);
      return code;
    }

    uint32_t numOfValidTable = 0;
    if (taosArrayGetSize(pIndexList) > 0) {
      code = doLoadFileBlock(pReader, pIndexList, &numOfValidTable, numOfBlocks);
    }

    taosArrayDestroy(pIndexList);

    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    if (numOfValidTable > 0) {
      break;
    }

    // no blocks in current file, try next files
  }

  return TSDB_CODE_SUCCESS;
}

1988 1989 1990
// Build the next result block for the current file block of the block iterator. Depending on how
// the buffer data relates to the file block, the result is either a composed (merged) block, a
// block built purely from the buffers, or the untouched file block described by its metadata.
static int32_t doBuildDataBlock(STsdbReader* pReader) {
  SReaderStatus*  pStatus = &pReader->status;
  SDataBlockIter* pIter = &pStatus->blockIter;

  SFileDataBlockInfo*  pCur = getCurrentBlockInfo(pIter);
  STableBlockScanInfo* pScanInfo = taosHashGet(pStatus->pTableMap, &pCur->uid, sizeof(pCur->uid));
  SBlock*              pFileBlock = taosArrayGet(pScanInfo->pBlockList, pCur->tbBlockIdx);

  TSDBKEY bufKey = getCurrentKeyInBuf(pIter, pReader);

  if (fileBlockShouldLoad(pReader, pCur, pFileBlock, pScanInfo, bufKey)) {
    tBlockDataInit(&pStatus->fileBlockData);

    int32_t loadCode = doLoadFileBlockData(pReader, pIter, pScanInfo, &pStatus->fileBlockData);
    if (loadCode != TSDB_CODE_SUCCESS) {
      return loadCode;
    }

    // build composed data block
    return buildComposedDataBlock(pReader, pScanInfo);
  }

  if (bufferDataInFileBlockGap(pReader->order, bufKey, pFileBlock)) {
    // buffer data comes before the current file block in the scan order
    // todo rows in buffer should be less than the file block in asc, greater than file block in desc
    int64_t boundary = pFileBlock->maxKey.ts;
    if (ASCENDING_TRAVERSE(pReader->order)) {
      boundary = pFileBlock->minKey.ts;
    }

    return buildDataBlockFromBuf(pReader, pScanInfo, boundary);
  }

  // the whole block is required: only describe it, the data is not loaded here
  SDataBlockInfo* pResInfo = &pReader->pResBlock->info;
  pResInfo->rows = pFileBlock->nRow;
  pResInfo->uid = pScanInfo->uid;
  pResInfo->window = (STimeWindow){.skey = pFileBlock->minKey.ts, .ekey = pFileBlock->maxKey.ts};

  setComposedBlockFlag(pReader, false);
  setBlockAllDumped(&pStatus->fBlockDumpInfo, pFileBlock, pReader->order);

  return TSDB_CODE_SUCCESS;
}

H
Haojun Liao 已提交
2026
// Walk the table map and build a result block from the in-memory buffers of the first table that
// still yields rows. The position in the table map is kept in pStatus->pTableIter between calls.
// Returns TSDB_CODE_SUCCESS when a block was produced or all tables are exhausted, otherwise the
// error code of buildDataBlockFromBuf.
static int32_t buildBlockFromBufferSequentially(STsdbReader* pReader) {
  SReaderStatus* pStatus = &pReader->status;

  // no upper/lower bound: consume everything left in the buffers
  const int64_t endKey = (ASCENDING_TRAVERSE(pReader->order)) ? INT64_MAX : INT64_MIN;

  if (pStatus->pTableIter == NULL) {
    pStatus->pTableIter = taosHashIterate(pStatus->pTableMap, NULL);
  }

  while (pStatus->pTableIter != NULL) {
    STableBlockScanInfo* pTableInfo = pStatus->pTableIter;
    initMemDataIterator(pTableInfo, pReader);

    int32_t ret = buildDataBlockFromBuf(pReader, pTableInfo, endKey);
    if (ret != TSDB_CODE_SUCCESS) {
      return ret;
    }

    if (pReader->pResBlock->info.rows > 0) {
      return TSDB_CODE_SUCCESS;
    }

    // current table is exhausted, let's try the next table
    pStatus->pTableIter = taosHashIterate(pStatus->pTableMap, pStatus->pTableIter);
  }

  return TSDB_CODE_SUCCESS;
}

2058
// set the correct start position in case of the first/last file block, according to the query time window
2059
static void initBlockDumpInfo(STsdbReader* pReader, SDataBlockIter* pBlockIter) {
2060
  SFileDataBlockInfo*  pFBlock = getCurrentBlockInfo(pBlockIter);
2061
  STableBlockScanInfo* pScanInfo = taosHashGet(pReader->status.pTableMap, &pFBlock->uid, sizeof(pFBlock->uid));
2062
  SBlock*              pBlock = taosArrayGet(pScanInfo->pBlockList, pFBlock->tbBlockIdx);
2063

2064 2065 2066
  SReaderStatus* pStatus = &pReader->status;

  SFileBlockDumpInfo* pDumpInfo = &pStatus->fBlockDumpInfo;
2067 2068 2069

  pDumpInfo->totalRows = pBlock->nRow;
  pDumpInfo->allDumped = false;
2070
  pDumpInfo->rowIndex = ASCENDING_TRAVERSE(pReader->order) ? 0 : pBlock->nRow - 1;
2071 2072
}

2073
// Move to the next fileset that holds data and position the block iterator and the dump state on
// its first block. When no fileset is left, loadFromFile is cleared so that the caller switches
// to the in-memory buffers.
static int32_t initForFirstBlockInFile(STsdbReader* pReader, SDataBlockIter* pBlockIter) {
  int32_t nBlocks = 0;

  int32_t ret = moveToNextFile(pReader, &nBlocks);
  if (ret != TSDB_CODE_SUCCESS) {
    return ret;
  }

  if (nBlocks == 0) {
    // all data files are consumed, try data in buffer
    pReader->status.loadFromFile = false;
    return ret;
  }

  // initialize the block iterator for a new fileset
  ret = initBlockIterator(pReader, pBlockIter, nBlocks);

  // set the correct start position according to the query time window
  initBlockDumpInfo(pReader, pBlockIter);

  return ret;
}

2094
// Tell whether the current file block has been started but not finished: it is not flagged as
// fully dumped and the row cursor has already left its start position for the given scan order.
static bool fileBlockPartiallyRead(SFileBlockDumpInfo* pDumpInfo, bool asc) {
  if (pDumpInfo->allDumped) {
    return false;
  }

  if (asc) {
    return pDumpInfo->rowIndex > 0;
  }

  return pDumpInfo->rowIndex < (pDumpInfo->totalRows - 1);
}

2099
// Produce the next non-empty result block from the data files, moving on to the next block or
// fileset whenever the current one is consumed. Returns as soon as an error occurs, a block with
// rows is available, or all files are exhausted (loadFromFile is then false).
static int32_t buildBlockFromFiles(STsdbReader* pReader) {
  const bool      ascending = ASCENDING_TRAVERSE(pReader->order);
  SDataBlockIter* pIter = &pReader->status.blockIter;

  for (;;) {
    int32_t ret = TSDB_CODE_SUCCESS;

    SFileDataBlockInfo*  pCur = getCurrentBlockInfo(pIter);
    STableBlockScanInfo* pTableInfo = taosHashGet(pReader->status.pTableMap, &pCur->uid, sizeof(pCur->uid));
    SFileBlockDumpInfo*  pDump = &pReader->status.fBlockDumpInfo;

    if (fileBlockPartiallyRead(pDump, ascending)) {
      // file data block is partially loaded, keep composing from it
      ret = buildComposedDataBlock(pReader, pTableInfo);
    } else {
      if (pDump->allDumped) {
        // current block is exhausted: try the next block of this file, then the next file
        if (blockIteratorNext(pIter)) {
          initBlockDumpInfo(pReader, pIter);
        } else {
          ret = initForFirstBlockInFile(pReader, pIter);

          // error happens or all the data files are completely checked
          if ((ret != TSDB_CODE_SUCCESS) || (pReader->status.loadFromFile == false)) {
            return ret;
          }
        }
      }

      // current block is not loaded yet, or data in buffer may overlap with the file block.
      ret = doBuildDataBlock(pReader);
    }

    // stop on error, or once some rows are available (ret is TSDB_CODE_SUCCESS in that case)
    if (ret != TSDB_CODE_SUCCESS || pReader->pResBlock->info.rows > 0) {
      return ret;
    }
  }
}
H
refact  
Hongze Cheng 已提交
2143

2144 2145
// Select the tsdb instance to query. For a non-rsma vnode the default tsdb is returned and *pLevel
// is left untouched. For an rsma vnode the retention level is chosen by walking the retention list
// until a level is found whose keep period still covers winSKey; *pLevel receives that level.
//   retentions - array of at least TSDB_RETENTION_MAX retention settings
//   idStr      - optional id string used in the debug log, may be NULL
static STsdb* getTsdbByRetentions(SVnode* pVnode, TSKEY winSKey, SRetention* retentions, const char* idStr,
                                  int8_t* pLevel) {
  if (VND_IS_RSMA(pVnode)) {
    int8_t  level = 0;
    int64_t now = taosGetTimestamp(pVnode->config.tsdbCfg.precision);

    for (int8_t i = 0; i < TSDB_RETENTION_MAX; ++i) {
      SRetention* pRetention = retentions + level;
      if (pRetention->keep <= 0) {
        // this level is not configured, fall back to the previous one
        if (level > 0) {
          --level;
        }
        break;
      }
      if ((now - pRetention->keep) <= winSKey) {
        break;
      }
      ++level;
    }

    int32_t     vgId = TD_VID(pVnode);
    const char* str = (idStr != NULL) ? idStr : "";

    // the log format must match the three arguments passed (no reader handle is available here)
    if (level == TSDB_RETENTION_L0) {
      *pLevel = TSDB_RETENTION_L0;
      tsdbDebug("vgId:%d, rsma level %d is selected to query %s", vgId, TSDB_RETENTION_L0, str);
      return VND_RSMA0(pVnode);
    } else if (level == TSDB_RETENTION_L1) {
      *pLevel = TSDB_RETENTION_L1;
      tsdbDebug("vgId:%d, rsma level %d is selected to query %s", vgId, TSDB_RETENTION_L1, str);
      return VND_RSMA1(pVnode);
    } else {
      *pLevel = TSDB_RETENTION_L2;
      tsdbDebug("vgId:%d, rsma level %d is selected to query %s", vgId, TSDB_RETENTION_L2, str);
      return VND_RSMA2(pVnode);
    }
  }

  return VND_TSDB(pVnode);
}

H
Haojun Liao 已提交
2185 2186 2187
// Compute the version range of a query. The lower bound is the requested start version (0 when
// it is -1). For an rsma vnode the upper bound is the max submit version of the given level;
// otherwise it is the requested end version capped by the applied version of the vnode (the
// applied version itself when the end version is -1).
SVersionRange getQueryVerRange(SVnode* pVnode, SQueryTableDataCond* pCond, int8_t level) {
  int64_t minVer = pCond->startVersion;
  if (minVer == -1) {
    minVer = 0;
  }

  if (VND_IS_RSMA(pVnode)) {
    return (SVersionRange){.minVer = minVer, .maxVer = tdRSmaGetMaxSubmitVer(pVnode->pSma, level)};
  }

  int64_t maxVer = 0;
  if (pCond->endVersion == -1 || pCond->endVersion > pVnode->state.applied) {
    // end version not specified by the user, or beyond what has been applied so far
    maxVer = pVnode->state.applied;
  } else {
    maxVer = pCond->endVersion;
  }

  return (SVersionRange){.minVer = minVer, .maxVer = maxVer};
}

H
Hongze Cheng 已提交
2202 2203 2204 2205
// // todo not unref yet, since it is not support multi-group interpolation query
// static UNUSED_FUNC void changeQueryHandleForInterpQuery(STsdbReader* pHandle) {
//   // filter the queried time stamp in the first place
//   STsdbReader* pTsdbReadHandle = (STsdbReader*)pHandle;
H
refact  
Hongze Cheng 已提交
2206

H
Hongze Cheng 已提交
2207 2208
//   // starts from the buffer in case of descending timestamp order check data blocks
//   size_t numOfTables = taosArrayGetSize(pTsdbReadHandle->pTableCheckInfo);
H
refact  
Hongze Cheng 已提交
2209

H
Hongze Cheng 已提交
2210 2211
//   int32_t i = 0;
//   while (i < numOfTables) {
H
Haojun Liao 已提交
2212
//     STableBlockScanInfo* pCheckInfo = taosArrayGet(pTsdbReadHandle->pTableCheckInfo, i);
H
refact  
Hongze Cheng 已提交
2213

H
Hongze Cheng 已提交
2214 2215 2216 2217 2218 2219 2220 2221 2222 2223 2224 2225 2226 2227
//     // the first qualified table for interpolation query
//     //    if ((pTsdbReadHandle->window.skey <= pCheckInfo->pTableObj->lastKey) &&
//     //        (pCheckInfo->pTableObj->lastKey != TSKEY_INITIAL_VAL)) {
//     //      break;
//     //    }

//     i++;
//   }

//   // there are no data in all the tables
//   if (i == numOfTables) {
//     return;
//   }

H
Haojun Liao 已提交
2228
//   STableBlockScanInfo info = *(STableBlockScanInfo*)taosArrayGet(pTsdbReadHandle->pTableCheckInfo, i);
H
Hongze Cheng 已提交
2229 2230 2231 2232 2233 2234
//   taosArrayClear(pTsdbReadHandle->pTableCheckInfo);

//   info.lastKey = pTsdbReadHandle->window.skey;
//   taosArrayPush(pTsdbReadHandle->pTableCheckInfo, &info);
// }

2235
// Decide whether the row identified by pKey is hidden by an entry of the delete list.
//
// pDelList : array of TSDBKEY points, or NULL when there is nothing to check. Two adjacent points
//            (cur, next) are treated as a range: the key is dropped when
//            cur->ts <= key.ts <= next->ts and cur->version >= key.version.
// index    : in/out cursor into pDelList. The search starts at *index and the cursor is moved
//            forward (ascending) or backward (descending) while ranges are passed.
// pKey     : timestamp/version of the row being tested; must not be NULL.
// order    : traverse order; TSDB_ORDER_ASC selects the ascending branch.
//
// Returns true when the key is covered by a delete range, false otherwise.
bool hasBeenDropped(const SArray* pDelList, int32_t* index, TSDBKEY* pKey, int32_t order) {
  ASSERT(pKey != NULL);
  if (pDelList == NULL) {
    return false;
  }

  size_t num = taosArrayGetSize(pDelList);
  if (num == 0) {
    // An empty list cannot cover any key. Bail out before `num - 1` wraps around (size_t) and
    // before the element accesses below read past the end of the array.
    return false;
  }

  bool    asc = ASCENDING_TRAVERSE(order);
  int32_t step = asc ? 1 : -1;

  if (asc) {
    if (*index >= num - 1) {
      // the cursor already sits on the last point: only the tail of the list is left to check
      TSDBKEY* last = taosArrayGetLast(pDelList);
      ASSERT(pKey->ts >= last->ts);

      if (pKey->ts > last->ts) {
        return false;
      } else if (pKey->ts == last->ts) {
        // NOTE(review): reads element num - 2, so this path assumes the list holds at least two
        // points -- confirm that single-point lists cannot reach here.
        TSDBKEY* prev = taosArrayGet(pDelList, num - 2);
        return (prev->version >= pKey->version);
      }
    } else {
      TSDBKEY* pCurrent = taosArrayGet(pDelList, *index);
      TSDBKEY* pNext = taosArrayGet(pDelList, (*index) + 1);

      // the key lies before the current range
      if (pKey->ts < pCurrent->ts) {
        return false;
      }

      if (pCurrent->ts <= pKey->ts && pNext->ts >= pKey->ts && pCurrent->version >= pKey->version) {
        return true;
      }

      // walk forward until a range that may contain the key is found
      while (pNext->ts <= pKey->ts && (*index) < num - 1) {
        (*index) += 1;

        if ((*index) < num - 1) {
          pCurrent = taosArrayGet(pDelList, *index);
          pNext = taosArrayGet(pDelList, (*index) + 1);

          // it is not a consecutive deletion range, ignore it
          if (pCurrent->version == 0 && pNext->version > 0) {
            continue;
          }

          if (pCurrent->ts <= pKey->ts && pNext->ts >= pKey->ts && pCurrent->version >= pKey->version) {
            return true;
          }
        }
      }

      return false;
    }
  } else {
    if (*index <= 0) {
      // the cursor already sits on the first point: only the head of the list is left to check
      TSDBKEY* pFirst = taosArrayGet(pDelList, 0);

      if (pKey->ts < pFirst->ts) {
        return false;
      } else if (pKey->ts == pFirst->ts) {
        return pFirst->version >= pKey->version;
      } else {
        ASSERT(0);
      }
    } else {
      TSDBKEY* pCurrent = taosArrayGet(pDelList, *index);
      TSDBKEY* pPrev = taosArrayGet(pDelList, (*index) - 1);

      // the key lies after the current range
      if (pKey->ts > pCurrent->ts) {
        return false;
      }

      if (pPrev->ts <= pKey->ts && pCurrent->ts >= pKey->ts && pPrev->version >= pKey->version) {
        return true;
      }

      // walk backward until a range that may contain the key is found
      while (pPrev->ts >= pKey->ts && (*index) > 1) {
        (*index) += step;

        if ((*index) >= 1) {
          pCurrent = taosArrayGet(pDelList, *index);
          pPrev = taosArrayGet(pDelList, (*index) - 1);

          // it is not a consecutive deletion range, ignore it
          if (pCurrent->version > 0 && pPrev->version == 0) {
            continue;
          }

          if (pPrev->ts <= pKey->ts && pCurrent->ts >= pKey->ts && pPrev->version >= pKey->version) {
            return true;
          }
        }
      }

      return false;
    }
  }

  return false;
}

// Return the first row, starting at the iterator's current position, that this reader may see.
//
// A row is visible when its timestamp is inside pReader->window, its version is inside
// pReader->verRange, and hasBeenDropped() does not report it as deleted. Rows that fail the
// version/delete checks are skipped by advancing the iterator. When the iterator runs dry or a
// row falls outside the time window, pIter->hasVal is cleared and NULL is returned.
TSDBROW* getValidRow(SIterInfo* pIter, const SArray* pDelList, STsdbReader* pReader) {
  if (!pIter->hasVal) {
    return NULL;
  }

  for (;;) {
    TSDBROW* pCandidate = tsdbTbDataIterGet(pIter->iter);
    TSDBKEY  rowKey = TSDBROW_KEY(pCandidate);

    // leaving the query window ends the scan of this iterator
    if (outOfTimeWindow(rowKey.ts, &pReader->window)) {
      pIter->hasVal = false;
      return NULL;
    }

    // the delete list is only consulted for rows with a valid data version
    bool versionOk = (rowKey.version <= pReader->verRange.maxVer) && (rowKey.version >= pReader->verRange.minVer);
    if (versionOk && !hasBeenDropped(pDelList, &pIter->index, &rowKey, pReader->order)) {
      return pCandidate;
    }

    pIter->hasVal = tsdbTbDataIterNext(pIter->iter);
    if (!pIter->hasVal) {
      return NULL;
    }
  }
}
H
Hongze Cheng 已提交
2373

dengyihao's avatar
dengyihao 已提交
2374
// Merge into pMerger every following valid row of the buffer iterator whose timestamp equals ts.
//
// The iterator is advanced first, so the row it pointed at on entry is not merged here. The loop
// stops when the iterator is exhausted, when getValidRow() finds no visible row, or when the next
// visible row carries a different timestamp. Always returns TSDB_CODE_SUCCESS.
int32_t doMergeRowsInBuf(SIterInfo* pIter, int64_t ts, SArray* pDelList, SRowMerger* pMerger, STsdbReader* pReader) {
  while ((pIter->hasVal = tsdbTbDataIterNext(pIter->iter))) {
    // data exists but may not be valid
    TSDBROW* pNext = getValidRow(pIter, pDelList, pReader);
    if (pNext == NULL) {
      break;
    }

    // a different timestamp belongs to the next output row
    TSDBKEY nextKey = TSDBROW_KEY(pNext);
    if (nextKey.ts != ts) {
      break;
    }

    tRowMerge(pMerger, pNext);
  }

  return TSDB_CODE_SUCCESS;
}

2399
static int32_t doMergeRowsInFileBlockImpl(SBlockData* pBlockData, int32_t rowIndex, int64_t key, SRowMerger* pMerger,
2400
                                          SVersionRange* pVerRange, int32_t step) {
2401 2402 2403 2404 2405 2406 2407 2408 2409 2410 2411 2412 2413 2414 2415 2416 2417 2418 2419
  while (pBlockData->aTSKEY[rowIndex] == key && rowIndex < pBlockData->nRow && rowIndex >= 0) {
    if (pBlockData->aVersion[rowIndex] > pVerRange->maxVer || pBlockData->aVersion[rowIndex] < pVerRange->minVer) {
      continue;
    }

    TSDBROW fRow = tsdbRowFromBlockData(pBlockData, rowIndex);
    tRowMerge(pMerger, &fRow);
    rowIndex += step;
  }

  return rowIndex;
}

// Outcome reported by checkForNeighborFileBlock() through its `state` out-parameter.
typedef enum {
  CHECK_FILEBLOCK_CONT = 0x1,  // keep going: doMergeRowsInFileBlocks() checks the next neighbor block
  CHECK_FILEBLOCK_QUIT = 0x2,  // stop: doMergeRowsInFileBlocks() leaves its neighbor-checking loop
} CHECK_FILEBLOCK_STATE;

static int32_t checkForNeighborFileBlock(STsdbReader* pReader, STableBlockScanInfo* pScanInfo, SBlock* pBlock,
2420 2421
                                         SFileDataBlockInfo* pFBlock, SRowMerger* pMerger, int64_t key,
                                         CHECK_FILEBLOCK_STATE* state) {
2422
  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
2423
  SBlockData*         pBlockData = &pReader->status.fileBlockData;
2424

2425
  *state = CHECK_FILEBLOCK_QUIT;
2426
  int32_t step = ASCENDING_TRAVERSE(pReader->order) ? 1 : -1;
2427 2428 2429

  int32_t nextIndex = -1;
  SBlock* pNeighborBlock = getNeighborBlockOfSameTable(pFBlock, pScanInfo, &nextIndex, pReader->order);
2430
  if (pNeighborBlock == NULL) {  // do nothing
2431 2432 2433 2434 2435
    return 0;
  }

  bool overlap = overlapWithNeighborBlock(pBlock, pNeighborBlock, pReader->order);
  if (overlap) {  // load next block
2436
    SReaderStatus*  pStatus = &pReader->status;
2437 2438
    SDataBlockIter* pBlockIter = &pStatus->blockIter;

2439
    // 1. find the next neighbor block in the scan block list
2440
    SFileDataBlockInfo fb = {.uid = pFBlock->uid, .tbBlockIdx = nextIndex};
2441
    int32_t            neighborIndex = findFileBlockInfoIndex(pBlockIter, &fb);
2442

2443
    // 2. remove it from the scan block list
2444
    setFileBlockActiveInBlockIter(pBlockIter, neighborIndex, step);
2445

2446
    // 3. load the neighbor block, and set it to be the currently accessed file data block
2447 2448 2449 2450 2451
    int32_t code = doLoadFileBlockData(pReader, pBlockIter, pScanInfo, &pStatus->fileBlockData);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

2452
    // 4. check the data values
2453 2454 2455 2456 2457 2458 2459 2460 2461 2462 2463 2464 2465
    initBlockDumpInfo(pReader, pBlockIter);

    pDumpInfo->rowIndex =
        doMergeRowsInFileBlockImpl(pBlockData, pDumpInfo->rowIndex, key, pMerger, &pReader->verRange, step);

    if (pDumpInfo->rowIndex >= pBlock->nRow) {
      *state = CHECK_FILEBLOCK_CONT;
    }
  }

  return TSDB_CODE_SUCCESS;
}

2466 2467
// Merge all file-block rows that share the timestamp of the current dump position.
//
// The timestamp is taken from pBlockData at pReader->status.fBlockDumpInfo.rowIndex. The dump
// position is then moved one step in traverse order and the following rows with the same
// timestamp are merged into pMerger. When the block is exhausted, overlapping neighbor blocks of
// the same table are checked through checkForNeighborFileBlock() until it reports
// CHECK_FILEBLOCK_QUIT.
//
// Returns TSDB_CODE_SUCCESS, or the error code of a failed neighbor-block load.
int32_t doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pScanInfo, STsdbReader* pReader,
                                SRowMerger* pMerger) {
  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;

  bool    asc = ASCENDING_TRAVERSE(pReader->order);
  int64_t key = pBlockData->aTSKEY[pDumpInfo->rowIndex];
  int32_t step = asc ? 1 : -1;

  pDumpInfo->rowIndex += step;

  // only merge while the position is inside the block; in descending order it may have become -1
  if (pDumpInfo->rowIndex >= 0 && pDumpInfo->rowIndex <= pBlockData->nRow - 1) {
    pDumpInfo->rowIndex =
        doMergeRowsInFileBlockImpl(pBlockData, pDumpInfo->rowIndex, key, pMerger, &pReader->verRange, step);
  }

  // all rows are consumed, let's try next file block
  if ((pDumpInfo->rowIndex >= pBlockData->nRow && asc) || (pDumpInfo->rowIndex < 0 && !asc)) {
    while (1) {
      CHECK_FILEBLOCK_STATE st = CHECK_FILEBLOCK_QUIT;

      SFileDataBlockInfo* pFileBlockInfo = getCurrentBlockInfo(&pReader->status.blockIter);
      SBlock*             pCurrentBlock = taosArrayGet(pScanInfo->pBlockList, pFileBlockInfo->tbBlockIdx);

      // a failed block load must reach the caller instead of being reported as success
      int32_t code = checkForNeighborFileBlock(pReader, pScanInfo, pCurrentBlock, pFileBlockInfo, pMerger, key, &st);
      if (code != TSDB_CODE_SUCCESS) {
        return code;
      }

      if (st == CHECK_FILEBLOCK_QUIT) {
        break;
      }
    }
  }

  return TSDB_CODE_SUCCESS;
}

2497
// Make pReader->pSchema match the schema version of pRow.
//
// Nothing happens when the cached schema already has that version. Otherwise the cached schema
// (if any) is released and the schema of table `uid` at the row's version is fetched from meta.
void updateSchema(TSDBROW* pRow, uint64_t uid, STsdbReader* pReader) {
  int32_t rowSchemaVer = TSDBROW_SVERSION(pRow);

  bool cached = (pReader->pSchema != NULL);
  if (cached && pReader->pSchema->version == rowSchemaVer) {
    return;
  }

  if (cached) {
    taosMemoryFreeClear(pReader->pSchema);
  }

  pReader->pSchema = metaGetTbTSchema(pReader->pTsdb->pVnode->pMeta, uid, rowSchemaVer);
}

dengyihao's avatar
dengyihao 已提交
2508 2509
// Build one output row from pRow and the following rows of the same buffer iterator that carry
// the same timestamp. The reader's schema is first aligned with pRow through updateSchema();
// the merged row is handed back through pTSRow.
void doMergeMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter, SArray* pDelList, STSRow** pTSRow,
                      STsdbReader* pReader) {
  TSDBKEY    rowKey = TSDBROW_KEY(pRow);
  SRowMerger merger = {0};

  updateSchema(pRow, uid, pReader);
  tRowMergerInit(&merger, pRow, pReader->pSchema);

  doMergeRowsInBuf(pIter, rowKey.ts, pDelList, &merger, pReader);
  tRowMergerGetRow(&merger, pTSRow);
}

2520 2521
// Merge a row of the mem buffer (pRow) with a row of the imem buffer (piRow) into one output row.
//
// The merge order depends on the traverse order: ascending merges imem first and mem second,
// descending merges mem first and imem second. After each seed row, the following rows with the
// same timestamp of the matching iterator are merged as well. tsdbGetNextRowInMem() calls this
// only when both rows carry the same timestamp.
void doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader,
                        STSRow** pTSRow) {
  SRowMerger merger = {0};

  TSDBKEY memKey = TSDBROW_KEY(pRow);
  TSDBKEY imemKey = TSDBROW_KEY(piRow);

  bool asc = ASCENDING_TRAVERSE(pReader->order);

  // ascending order: imem --> mem, descending order: mem --> imem
  TSDBROW*   pFirstRow = asc ? piRow : pRow;
  TSDBROW*   pSecondRow = asc ? pRow : piRow;
  SIterInfo* pFirstIter = asc ? &pBlockScanInfo->iiter : &pBlockScanInfo->iter;
  SIterInfo* pSecondIter = asc ? &pBlockScanInfo->iter : &pBlockScanInfo->iiter;
  int64_t    firstTs = asc ? imemKey.ts : memKey.ts;
  int64_t    secondTs = memKey.ts;  // both orders pass the mem timestamp for the second merge

  updateSchema(pFirstRow, pBlockScanInfo->uid, pReader);

  tRowMergerInit(&merger, pFirstRow, pReader->pSchema);
  doMergeRowsInBuf(pFirstIter, firstTs, pBlockScanInfo->delSkyline, &merger, pReader);

  tRowMerge(&merger, pSecondRow);
  doMergeRowsInBuf(pSecondIter, secondTs, pBlockScanInfo->delSkyline, &merger, pReader);

  tRowMergerGetRow(&merger, pTSRow);
}

2548 2549
// Produce the next merged row from the in-memory buffers (mem and imem) of one table.
//
// A buffer row is ignored when its timestamp has reached endKey in traverse order
// (ts >= endKey ascending, ts <= endKey descending). When both buffers offer a row, the one with
// the smaller timestamp is merged; equal timestamps are merged across both buffers.
// *pTSRow is only written when a row was produced. Always returns TSDB_CODE_SUCCESS.
int32_t tsdbGetNextRowInMem(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader, STSRow** pTSRow,
                            int64_t endKey) {
  SIterInfo* pMemIter = &pBlockScanInfo->iter;
  SIterInfo* pIMemIter = &pBlockScanInfo->iiter;

  TSDBROW* pRow = getValidRow(pMemIter, pBlockScanInfo->delSkyline, pReader);
  TSDBROW* piRow = getValidRow(pIMemIter, pBlockScanInfo->delSkyline, pReader);
  SArray*  pDelList = pBlockScanInfo->delSkyline;

  // todo refactor
  bool asc = ASCENDING_TRAVERSE(pReader->order);

  if (pMemIter->hasVal) {
    TSDBKEY memKey = TSDBROW_KEY(pRow);
    if (asc ? (memKey.ts >= endKey) : (memKey.ts <= endKey)) {
      pRow = NULL;
    }
  }

  if (pIMemIter->hasVal) {
    TSDBKEY imemKey = TSDBROW_KEY(piRow);
    if (asc ? (imemKey.ts >= endKey) : (imemKey.ts <= endKey)) {
      piRow = NULL;
    }
  }

  bool useMem = pMemIter->hasVal && (pRow != NULL);
  bool useIMem = pIMemIter->hasVal && (piRow != NULL);

  if (useMem && useIMem) {
    TSDBKEY k = TSDBROW_KEY(pRow);
    TSDBKEY ik = TSDBROW_KEY(piRow);

    if (ik.ts == k.ts) {
      doMergeMemIMemRows(pRow, piRow, pBlockScanInfo, pReader, pTSRow);
    } else if (ik.ts < k.ts) {
      doMergeMultiRows(piRow, pBlockScanInfo->uid, pIMemIter, pDelList, pTSRow, pReader);
    } else {  // k.ts < ik.ts
      doMergeMultiRows(pRow, pBlockScanInfo->uid, pMemIter, pDelList, pTSRow, pReader);
    }
  } else if (useMem) {
    doMergeMultiRows(pRow, pBlockScanInfo->uid, pMemIter, pDelList, pTSRow, pReader);
  } else if (useIMem) {
    doMergeMultiRows(piRow, pBlockScanInfo->uid, pIMemIter, pDelList, pTSRow, pReader);
  }

  return TSDB_CODE_SUCCESS;
}

2598 2599 2600 2601
// Append pTSRow as a new row at the end of the result block.
//
// The block's columns and the columns of pReader->pSchema are walked in parallel by column id.
// A block column that has no counterpart in the schema receives NULL. If the first block column
// is the primary timestamp column, it is filled from pTSRow->ts. pBlock->info.rows is increased
// by one. Always returns TSDB_CODE_SUCCESS.
int32_t doAppendOneRow(SSDataBlock* pBlock, STsdbReader* pReader, STSRow* pTSRow) {
  int32_t rowPos = pBlock->info.rows;
  int32_t numOfOutputCols = (int32_t)taosArrayGetSize(pBlock->pDataBlock);

  STSchema*           pSchema = pReader->pSchema;
  SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo;

  SColVal colVal = {0};
  int32_t outIdx = 0;     // position in the block's column list
  int32_t schemaIdx = 0;  // position in the schema's column list

  SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, outIdx);
  if (pCol->info.colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
    colDataAppend(pCol, rowPos, (const char*)&pTSRow->ts, false);
    outIdx += 1;
  }

  while (outIdx < numOfOutputCols && schemaIdx < pSchema->numOfCols) {
    pCol = taosArrayGet(pBlock->pDataBlock, outIdx);
    col_id_t wantedId = pCol->info.colId;

    if (wantedId < pSchema->columns[schemaIdx].colId) {
      // requested column is missing from the schema
      colDataAppendNULL(pCol, rowPos);
      outIdx += 1;
    } else if (wantedId > pSchema->columns[schemaIdx].colId) {
      // schema column was not requested
      schemaIdx += 1;
    } else {
      tTSRowGetVal(pTSRow, pSchema, schemaIdx, &colVal);
      doCopyColVal(pCol, rowPos, outIdx, &colVal, pSupInfo);
      outIdx += 1;
      schemaIdx += 1;
    }
  }

  // set null value since current column does not exist in the "pSchema"
  for (; outIdx < numOfOutputCols; outIdx += 1) {
    pCol = taosArrayGet(pBlock->pDataBlock, outIdx);
    colDataAppendNULL(pCol, rowPos);
  }

  pBlock->info.rows += 1;
  return TSDB_CODE_SUCCESS;
}

2642 2643
// Fill pReader->pResBlock with rows taken from the in-memory buffers of one table.
//
// Rows are appended until tsdbGetNextRowInMem() yields nothing, both buffer iterators are
// exhausted, or the block holds `capacity` rows. Always returns TSDB_CODE_SUCCESS.
int32_t buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, int64_t endKey, int32_t capacity,
                                  STsdbReader* pReader) {
  SSDataBlock* pResBlock = pReader->pResBlock;

  for (;;) {
    // NOTE(review): the merged row is not released in this function -- confirm who owns it.
    STSRow* pMergedRow = NULL;
    tsdbGetNextRowInMem(pBlockScanInfo, pReader, &pMergedRow, endKey);
    if (pMergedRow == NULL) {
      break;
    }

    doAppendOneRow(pResBlock, pReader, pMergedRow);

    // no data in buffer, return immediately
    bool bufferDrained = !pBlockScanInfo->iter.hasVal && !pBlockScanInfo->iiter.hasVal;
    if (bufferDrained || pResBlock->info.rows >= capacity) {
      break;
    }
  }

  ASSERT(pResBlock->info.rows <= capacity);
  return TSDB_CODE_SUCCESS;
}
H
Hongze Cheng 已提交
2668

2669
// todo refactor, use arraylist instead
H
Hongze Cheng 已提交
2670
// Make `uid` the only table scanned by the reader: the table map is emptied and a fresh scan
// info entry (lastKey = 0) keyed by the uid is inserted.
int32_t tsdbSetTableId(STsdbReader* pReader, int64_t uid) {
  ASSERT(pReader != NULL);

  STableBlockScanInfo scanInfo = {.uid = uid, .lastKey = 0};

  taosHashClear(pReader->status.pTableMap);
  taosHashPut(pReader->status.pTableMap, &scanInfo.uid, sizeof(scanInfo.uid), &scanInfo, sizeof(scanInfo));

  return TDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
2679 2680 2681 2682 2683 2684
// NULL-tolerant wrapper around metaGetIdx(): yields NULL when pMeta is NULL.
void* tsdbGetIdx(SMeta* pMeta) {
  return (pMeta == NULL) ? NULL : metaGetIdx(pMeta);
}
dengyihao's avatar
dengyihao 已提交
2685

dengyihao's avatar
dengyihao 已提交
2686 2687 2688 2689 2690 2691
// NULL-tolerant wrapper around metaGetIvtIdx(): yields NULL when pMeta is NULL.
void* tsdbGetIvtIdx(SMeta* pMeta) {
  return (pMeta == NULL) ? NULL : metaGetIvtIdx(pMeta);
}
L
Liu Jicong 已提交
2692

C
Cary Xu 已提交
2693 2694 2695 2696 2697 2698 2699 2700 2701 2702
/**
 * @brief Collect the super-table ids returned by a meta cursor opened at suid
 *
 * @param pMeta meta handle the cursor is opened on
 * @param suid  start position of the cursor; 0 returns all suids in one vnode
 * @param list  array that receives every id produced by the cursor
 * @return int32_t TSDB_CODE_FAILED if the cursor cannot be opened, otherwise TSDB_CODE_SUCCESS
 */
int32_t tsdbGetStbIdList(SMeta* pMeta, int64_t suid, SArray* list) {
  SMStbCursor* pCursor = metaOpenStbCursor(pMeta, suid);
  if (pCursor == NULL) {
    return TSDB_CODE_FAILED;
  }

  // the cursor signals its end with id 0
  for (tb_uid_t id = metaStbCursorNext(pCursor); id != 0; id = metaStbCursorNext(pCursor)) {
    taosArrayPush(list, &id);
  }

  metaCloseStbCursor(pCursor);
  return TSDB_CODE_SUCCESS;
}

H
refact  
Hongze Cheng 已提交
2720
// ====================================== EXPOSED APIs ======================================
2721 2722
// Create a reader for the tables in pTableList and position it on the first data block.
//
// pVnode     : vnode the data is read from.
// pCond      : query condition; a non-zero suid selects the schema of the reader's super table.
// pTableList : array of STableKeyInfo, the tables to scan.
// ppReader   : receives the new reader. It is set to NULL when the table map cannot be built.
// idstr      : query id string used in log messages.
//
// Returns TSDB_CODE_SUCCESS, or an error code. When the query window is empty, the reader is
// returned without a table map and no data will be produced.
int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, SArray* pTableList, STsdbReader** ppReader,
                       const char* idstr) {
  STsdbReader* pReader = NULL;

  int32_t code = tsdbReaderCreate(pVnode, pCond, ppReader, idstr);
  if (code) {
    goto _err;
  }

  pReader = *ppReader;

  if (pCond->suid != 0) {
    pReader->pSchema = metaGetTbTSchema(pReader->pTsdb->pVnode->pMeta, pReader->suid, -1);
    ASSERT(pReader->pSchema);
  } else if (taosArrayGetSize(pTableList) > 0) {
    STableKeyInfo* pKey = taosArrayGet(pTableList, 0);
    pReader->pSchema = metaGetTbTSchema(pReader->pTsdb->pVnode->pMeta, pKey->uid, -1);
  }

  if (isEmptyQueryTimeWindow(&pReader->window)) {
    tsdbDebug("%p query window not overlaps with the data set, no result returned, %s", pReader, pReader->idStr);
    return TSDB_CODE_SUCCESS;
  }

  int32_t numOfTables = taosArrayGetSize(pTableList);
  pReader->status.pTableMap = createDataBlockScanInfo(pReader, pTableList->pData, numOfTables);
  if (pReader->status.pTableMap == NULL) {
    tsdbReaderClose(pReader);
    pReader = NULL;  // released above, must not be touched again
    *ppReader = NULL;

    code = TSDB_CODE_TDB_OUT_OF_MEMORY;
    goto _err;
  }

  SDataBlockIter* pBlockIter = &pReader->status.blockIter;

  STsdbFSState* pFState = pReader->pTsdb->fs->cState;
  initFilesetIterator(&pReader->status.fileIter, pFState, pReader->order, pReader->idStr);
  resetDataBlockIterator(&pReader->status.blockIter, pReader->order);

  // no data in files, let's try buffer in memory
  if (pReader->status.fileIter.numOfFiles == 0) {
    pReader->status.loadFromFile = false;
  } else {
    code = initForFirstBlockInFile(pReader, pBlockIter);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }
  }

  tsdbDebug("%p total numOfTable:%d in this query %s", pReader, numOfTables, pReader->idStr);
  return code;

_err:
  // The reader is either not created or already released on this path, so the id string comes
  // from the caller's argument instead of pReader->idStr.
  tsdbError("failed to create data reader, code: %s %s", tstrerror(code), (idstr != NULL) ? idstr : "");
  return code;
}

// Log the I/O cost summary of the reader and release the reader together with the buffers freed
// below (result block, column-aggregate helpers, id string, schema). NULL is accepted.
void tsdbReaderClose(STsdbReader* pReader) {
  if (pReader == NULL) {
    return;
  }

  // report first, while every field referenced by the message is still alive
  SIOCostSummary* pCost = &pReader->cost;
  tsdbDebug("%p :io-cost summary: head-file read cnt:%" PRIu64 ", head-file time:%" PRIu64 " us, statis-info:%" PRId64
            " us, datablock:%" PRId64 " us, check data:%" PRId64 " us, %s",
            pReader, pCost->headFileLoad, pCost->headFileLoadTime, pCost->smaLoadTime, pCost->blockLoadTime,
            pCost->checkForNextTime, pReader->idStr);

  // result block
  blockDataDestroy(pReader->pResBlock);

  // block-load support info
  SBlockLoadSuppInfo* pSup = &pReader->suppInfo;
  taosMemoryFreeClear(pSup->plist);
  taosArrayDestroy(pSup->pColAgg);
  taosMemoryFree(pSup->slotIds);

  // the reader itself and what it owns directly
  taosMemoryFree(pReader->idStr);
  taosMemoryFree(pReader->pSchema);
  taosMemoryFreeClear(pReader);
}

// Build the next result block in pReader->pResBlock.
//
// For BLOCK_LOAD_OFFSET_ORDER the block is built from files first (when loadFromFile is set);
// if the files yield no rows, the in-memory buffers are tried. The other two load types produce
// nothing here.
//
// Returns true when the result block holds at least one row, false when there is no more data,
// the query window is empty, or building the block from files failed.
bool tsdbNextDataBlock(STsdbReader* pReader) {
  if (isEmptyQueryTimeWindow(&pReader->window)) {
    return false;
  }

  // cleanup the data that belongs to the previous data block
  SSDataBlock* pBlock = pReader->pResBlock;
  blockDataCleanup(pBlock);

  SReaderStatus* pStatus = &pReader->status;

  if (pReader->type == BLOCK_LOAD_OFFSET_ORDER) {
    if (pStatus->loadFromFile) {
      int32_t code = buildBlockFromFiles(pReader);
      if (code != TSDB_CODE_SUCCESS) {
        return false;
      }

      if (pBlock->info.rows > 0) {
        return true;
      }
    }

    // no data in files (or none left), let's try the buffer
    buildBlockFromBufferSequentially(pReader);
    return pBlock->info.rows > 0;
  } else if (pReader->type == BLOCK_LOAD_TABLESEQ_ORDER) {
    // no block is produced for this load type
  } else if (pReader->type == BLOCK_LOAD_EXTERN_ORDER) {
    // no block is produced for this load type
  } else {
    ASSERT(0);
  }

  return false;
}

// Copy the row count, table uid and time window of the current result block into pDataBlockInfo.
void tsdbRetrieveDataBlockInfo(STsdbReader* pReader, SDataBlockInfo* pDataBlockInfo) {
  ASSERT(pDataBlockInfo != NULL && pReader != NULL);

  SSDataBlock* pResBlock = pReader->pResBlock;

  pDataBlockInfo->rows = pResBlock->info.rows;
  pDataBlockInfo->uid = pResBlock->info.uid;
  pDataBlockInfo->window = pResBlock->info.window;
}

2858
// Load the pre-computed column aggregates (SMA) of the current file block.
//
// pBlockStatis : receives pReader->suppInfo.plist on success, or NULL when the current block is
//                a composed block or the file block carries no SMA.
// allHave      : set to true when SMA data was loaded and no matched column had to be skipped;
//                false otherwise.
//
// Returns TSDB_CODE_SUCCESS or the error code of tsdbReadBlockSma().
int32_t tsdbRetrieveDatablockSMA(STsdbReader* pReader, SColumnDataAgg*** pBlockStatis, bool* allHave) {
  int32_t code = 0;
  *allHave = false;

  // there is no statistics data for composed block
  if (pReader->status.composedDataBlock) {
    *pBlockStatis = NULL;
    return TSDB_CODE_SUCCESS;
  }

  SReaderStatus*       pStatus = &pReader->status;
  SFileDataBlockInfo*  pFBlock = getCurrentBlockInfo(&pStatus->blockIter);
  STableBlockScanInfo* pBlockScanInfo = taosHashGet(pStatus->pTableMap, &pFBlock->uid, sizeof(pFBlock->uid));
  SBlock*              pBlock = taosArrayGet(pBlockScanInfo->pBlockList, pFBlock->tbBlockIdx);

  int64_t             stime = taosGetTimestampUs();
  SBlockLoadSuppInfo* pSup = &pReader->suppInfo;

  if (!tBlockHasSma(pBlock)) {
    *pBlockStatis = NULL;
    return TSDB_CODE_SUCCESS;
  }

  code = tsdbReadBlockSma(pReader->pFileReader, pBlock, pSup->pColAgg, NULL);
  if (code != TSDB_CODE_SUCCESS) {
    tsdbDebug("vgId:%d, failed to load block SMA for uid %" PRIu64 ", code:%s, %s", 0, pFBlock->uid, tstrerror(code),
              pReader->idStr);
    return code;
  }

  *allHave = true;

  // always load the first primary timestamp column data
  SColumnDataAgg* pTsAgg = &pSup->tsColAgg;

  pTsAgg->numOfNull = 0;
  pTsAgg->colId = PRIMARYKEY_TIMESTAMP_COL_ID;
  pTsAgg->min = pReader->pResBlock->info.window.skey;
  pTsAgg->max = pReader->pResBlock->info.window.ekey;
  pSup->plist[0] = pTsAgg;

  // update the number of NULL data rows
  size_t numOfCols = blockDataGetNumOfCols(pReader->pResBlock);

  // merge-walk the loaded aggregates (aggIdx) against the requested column ids (colIdx)
  int32_t aggIdx = 0;
  int32_t colIdx = 0;
  while (colIdx < numOfCols && aggIdx < taosArrayGetSize(pSup->pColAgg)) {
    SColumnDataAgg* pAgg = taosArrayGet(pSup->pColAgg, aggIdx);

    if (pAgg->colId < pSup->colIds[colIdx]) {
      aggIdx += 1;
    } else if (pSup->colIds[colIdx] < pAgg->colId) {
      colIdx += 1;
    } else {
      // NOTE(review): the schema column is picked by the aggregate index, not by column id --
      // confirm that pColAgg and pSchema->columns are aligned position by position.
      if (IS_BSMA_ON(&(pReader->pSchema->columns[aggIdx]))) {
        pSup->plist[colIdx] = pAgg;
      } else {
        *allHave = false;
      }

      aggIdx += 1;
      colIdx += 1;
    }
  }

  int64_t elapsed = taosGetTimestampUs() - stime;
  pReader->cost.smaLoadTime += elapsed;

  *pBlockStatis = pSup->plist;

  tsdbDebug("vgId:%d, succeed to load block SMA for uid %" PRIu64 ", elapsed time:%" PRId64 "us, %s", 0, pFBlock->uid,
            elapsed, pReader->idStr);

  return code;
}

H
Hongze Cheng 已提交
2931
// Return the column data array of the current result block.
//
// A composed block is returned as is. Otherwise the current file block is loaded and copied into
// the result block first; on failure terrno is set to the error code and NULL is returned.
// pIdList is not used.
SArray* tsdbRetrieveDataBlock(STsdbReader* pReader, SArray* pIdList) {
  SReaderStatus* pStatus = &pReader->status;

  if (!pStatus->composedDataBlock) {
    SFileDataBlockInfo*  pFBlock = getCurrentBlockInfo(&pStatus->blockIter);
    STableBlockScanInfo* pBlockScanInfo = taosHashGet(pStatus->pTableMap, &pFBlock->uid, sizeof(pFBlock->uid));

    int32_t code = tBlockDataInit(&pStatus->fileBlockData);
    if (code == TSDB_CODE_SUCCESS) {
      code = doLoadFileBlockData(pReader, &pStatus->blockIter, pBlockScanInfo, &pStatus->fileBlockData);
    }

    if (code != TSDB_CODE_SUCCESS) {
      terrno = code;
      return NULL;
    }

    copyBlockDataToSDataBlock(pReader, pBlockScanInfo);
  }

  return pReader->pResBlock->pDataBlock;
}

H
Haojun Liao 已提交
2957
// Rewind an existing reader so that it can be scanned again with the order and time windows of
// pCond. Nothing is done when the reader's current query window is empty.
//
// Returns TSDB_CODE_SUCCESS or the error code of initForFirstBlockInFile().
int32_t tsdbReaderReset(STsdbReader* pReader, SQueryTableDataCond* pCond) {
  if (isEmptyQueryTimeWindow(&pReader->window)) {
    return TSDB_CODE_SUCCESS;
  }

  SReaderStatus*      pStatus = &pReader->status;
  SBlockLoadSuppInfo* pSup = &pReader->suppInfo;

  pReader->order = pCond->order;
  pReader->type = BLOCK_LOAD_OFFSET_ORDER;
  pStatus->loadFromFile = true;
  pStatus->pTableIter = NULL;

  pReader->window = updateQueryTimeWindow(pReader->pTsdb, &pCond->twindows);

  // allocate buffer in order to load data blocks from file
  memset(&pSup->tsColAgg, 0, sizeof(SColumnDataAgg));
  memset(pSup->plist, 0, POINTER_BYTES);

  pSup->tsColAgg.colId = PRIMARYKEY_TIMESTAMP_COL_ID;

  // todo set the correct numOfTables
  int32_t numOfTables = 1;

  // restart the file-set and block iterators and the per-table scan state
  STsdbFSState* pFState = pReader->pTsdb->fs->cState;
  initFilesetIterator(&pStatus->fileIter, pFState, pReader->order, pReader->idStr);
  resetDataBlockIterator(&pStatus->blockIter, pReader->order);
  resetDataBlockScanInfo(pStatus->pTableMap);

  int32_t code = 0;
  if (pStatus->fileIter.numOfFiles == 0) {
    // no data in files, let's try buffer in memory
    pStatus->loadFromFile = false;
  } else {
    code = initForFirstBlockInFile(pReader, &pStatus->blockIter);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }
  }

  tsdbDebug("%p reset reader, suid:%" PRIu64 ", numOfTables:%d, query range:%" PRId64 " - %" PRId64 " in query %s",
            pReader, pReader->suid, numOfTables, pReader->window.skey, pReader->window.ekey, pReader->idStr);
  return code;
}
H
Hongze Cheng 已提交
2999

3000 3001 3002
// Map a block's row count to the index of its histogram bucket.
//
// startRow    : row count at which bucket 0 starts.
// bucketRange : number of row counts covered by one bucket.
// numOfRows   : row count of the block.
//
// Returns (numOfRows - startRow) / bucketRange. Blocks with fewer rows than startRow, and a
// non-positive bucketRange, are mapped to bucket 0 instead of producing a negative index or
// dividing by zero. The upper bound is not clamped here.
static int32_t getBucketIndex(int32_t startRow, int32_t bucketRange, int32_t numOfRows) {
  if (bucketRange <= 0 || numOfRows <= startRow) {
    return 0;
  }

  return (numOfRows - startRow) / bucketRange;
}
H
Hongze Cheng 已提交
3003

3004 3005 3006 3007
// Collect distribution statistics of the file blocks visible to the reader.
//
// totalSize and totalRows are reset; the file/block counters, the min/max row counts, the
// small-block counter and the row-count histogram of pTableBlockInfo are accumulated on top of
// the values already stored there. Every block of every file set is visited.
//
// Returns TSDB_CODE_SUCCESS or the error code of initForFirstBlockInFile().
int32_t tsdbGetFileBlocksDistInfo(STsdbReader* pReader, STableBlockDistInfo* pTableBlockInfo) {
  int32_t code = TSDB_CODE_SUCCESS;
  pTableBlockInfo->totalSize = 0;
  pTableBlockInfo->totalRows = 0;

  // find the start data block in file
  SReaderStatus* pStatus = &pReader->status;

  STsdbCfg* pc = &pReader->pTsdb->pVnode->config.tsdbCfg;
  pTableBlockInfo->defMinRows = pc->minRows;
  pTableBlockInfo->defMaxRows = pc->maxRows;

  int32_t bucketRange = ceil((pc->maxRows - pc->minRows) / 20.0);

  pTableBlockInfo->numOfFiles += 1;

  int32_t numOfTables = (int32_t)taosHashGetSize(pStatus->pTableMap);
  int     defaultRows = 4096;

  SDataBlockIter* pBlockIter = &pStatus->blockIter;
  pTableBlockInfo->numOfFiles += pStatus->fileIter.numOfFiles;
  pTableBlockInfo->numOfBlocks += pBlockIter->numOfBlocks;

  pTableBlockInfo->numOfTables = numOfTables;

  // only read from the iterator when it actually holds a block
  bool hasNext = (pBlockIter->numOfBlocks > 0);

  while (true) {
    if (hasNext) {
      SFileDataBlockInfo*  pFBlock = getCurrentBlockInfo(pBlockIter);
      STableBlockScanInfo* pScanInfo = taosHashGet(pStatus->pTableMap, &pFBlock->uid, sizeof(pFBlock->uid));
      SBlock*              pBlock = taosArrayGet(pScanInfo->pBlockList, pFBlock->tbBlockIdx);

      int32_t numOfRows = pBlock->nRow;
      pTableBlockInfo->totalRows += numOfRows;

      if (numOfRows > pTableBlockInfo->maxRows) {
        pTableBlockInfo->maxRows = numOfRows;
      }

      if (numOfRows < pTableBlockInfo->minRows) {
        pTableBlockInfo->minRows = numOfRows;
      }

      if (numOfRows < defaultRows) {
        pTableBlockInfo->numOfSmallBlocks += 1;
      }

      int32_t bucketIndex = getBucketIndex(pTableBlockInfo->defMinRows, bucketRange, numOfRows);
      pTableBlockInfo->blockRowsHisto[bucketIndex]++;

      hasNext = blockIteratorNext(&pStatus->blockIter);
    } else {
      // current file set is exhausted, move on to the next one
      code = initForFirstBlockInFile(pReader, pBlockIter);
      if ((code != TSDB_CODE_SUCCESS) || (pReader->status.loadFromFile == false)) {
        break;
      }

      pTableBlockInfo->numOfBlocks += pBlockIter->numOfBlocks;

      // walk the blocks of the new file set as well; without this only its block count is added
      // and its rows never reach the statistics
      hasNext = (pBlockIter->numOfBlocks > 0);
    }
  }

  return code;
}
H
Hongze Cheng 已提交
3076

H
refact  
Hongze Cheng 已提交
3077
/**
 * Count the rows buffered in memory for every table tracked by the reader.
 *
 * Walks pReader->status.pTableMap and, for each table entry, adds the row
 * counts reported by tsdbGetNRowsInTbData() for that table's data in the
 * `mem` and `imem` memtables of pReader->pTsdb (each is skipped when the
 * memtable pointer is NULL or the table has no data in it).
 *
 * Side effect: pReader->status.pTableIter is used as the iteration cursor and
 * is NULL when this function returns.
 *
 * @param pReader  reader whose table map and tsdb memtables are inspected
 * @return         sum of the per-table row counts found in mem and imem
 */
int64_t tsdbGetNumOfRowsInMemTable(STsdbReader* pReader) {
  SReaderStatus* pStatus = &pReader->status;
  int64_t        total = 0;

  for (pStatus->pTableIter = taosHashIterate(pStatus->pTableMap, NULL); pStatus->pTableIter != NULL;
       pStatus->pTableIter = taosHashIterate(pStatus->pTableMap, pStatus->pTableIter)) {
    STableBlockScanInfo* pScanInfo = pStatus->pTableIter;

    // rows of this table held in the mem buffer
    if (pReader->pTsdb->mem != NULL) {
      STbData* pMemData = NULL;
      tsdbGetTbDataFromMemTable(pReader->pTsdb->mem, pReader->suid, pScanInfo->uid, &pMemData);
      if (pMemData != NULL) {
        total += tsdbGetNRowsInTbData(pMemData);
      }
    }

    // rows of this table held in the imem buffer
    if (pReader->pTsdb->imem != NULL) {
      STbData* pImemData = NULL;
      tsdbGetTbDataFromMemTable(pReader->pTsdb->imem, pReader->suid, pScanInfo->uid, &pImemData);
      if (pImemData != NULL) {
        total += tsdbGetNRowsInTbData(pImemData);
      }
    }
  }

  return total;
}
D
dapan1121 已提交
3108 3109 3110 3111 3112 3113 3114 3115 3116 3117 3118 3119 3120 3121 3122 3123 3124 3125 3126 3127 3128 3129 3130 3131 3132 3133 3134 3135 3136 3137 3138 3139 3140 3141 3142 3143

/**
 * Resolve the schema of table `uid` and report its super-table uid.
 *
 * The table's meta entry is looked up first. For a child table, *suid is set
 * to the super table's uid and the schema version is read from the super
 * table's entry; otherwise the table is expected to be a normal table, *suid
 * is set to 0 and the table's own schema version is used. *pSchema is then
 * assigned the result of metaGetTbTSchema() for that version.
 *
 * @param pVnode   vnode whose meta store is queried
 * @param uid      uid of the table to resolve
 * @param pSchema  out: receives the schema returned by metaGetTbTSchema();
 *                 not written on failure
 * @param suid     out: super-table uid for a child table, 0 otherwise; not
 *                 written when the first lookup fails
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_TDB_INVALID_TABLE_ID (also stored
 *         in terrno) when either meta lookup fails
 *
 * NOTE(review): the value returned by metaGetTbTSchema() is not checked here;
 * whether it can be NULL, and who releases it, is not visible in this
 * function - confirm against the callers.
 */
int32_t tsdbGetTableSchema(SVnode* pVnode, int64_t uid, STSchema** pSchema, int64_t *suid) {
  SMetaReader mr = {0};
  int32_t     schemaVer = 1;

  metaReaderInit(&mr, pVnode->pMeta, 0);

  if (metaGetTableEntryByUid(&mr, uid) != TSDB_CODE_SUCCESS) {
    goto _err;
  }

  *suid = 0;

  if (mr.me.type != TSDB_CHILD_TABLE) {
    ASSERT(mr.me.type == TSDB_NORMAL_TABLE);
    schemaVer = mr.me.ntbEntry.schemaRow.version;
  } else {
    // a child table takes its schema version from the super table entry,
    // so reload the reader with the super table's entry
    *suid = mr.me.ctbEntry.suid;
    if (metaGetTableEntryByUid(&mr, *suid) != TSDB_CODE_SUCCESS) {
      goto _err;
    }
    schemaVer = mr.me.stbEntry.schemaRow.version;
  }

  metaReaderClear(&mr);
  *pSchema = metaGetTbTSchema(pVnode->pMeta, uid, schemaVer);

  return TSDB_CODE_SUCCESS;

_err:
  // shared failure path: record the error before releasing the reader
  terrno = TSDB_CODE_TDB_INVALID_TABLE_ID;
  metaReaderClear(&mr);
  return terrno;
}