tsdbRead.c 118.4 KB
Newer Older
H
hjxilinx 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

H
Hongze Cheng 已提交
16
#include "tsdb.h"
17
// True when the scan order `o` is ascending. The argument is parenthesized so that compound
// expressions (e.g. `a & b`) are evaluated as a whole before the comparison.
#define ASCENDING_TRAVERSE(o) ((o) == TSDB_ORDER_ASC)
H
Hongze Cheng 已提交
18

19
typedef struct {
dengyihao's avatar
dengyihao 已提交
20
  STbDataIter* iter;
21 22 23 24
  int32_t      index;
  bool         hasVal;
} SIterInfo;

H
Haojun Liao 已提交
25
typedef struct STableBlockScanInfo {
dengyihao's avatar
dengyihao 已提交
26 27 28 29 30 31 32 33 34
  uint64_t  uid;
  TSKEY     lastKey;
  SBlockIdx blockIdx;
  SArray*   pBlockList;  // block data index list
  SIterInfo iter;        // mem buffer skip list iterator
  SIterInfo iiter;       // imem buffer skip list iterator
  SArray*   delSkyline;  // delete info for this table
  int32_t   fileDelIndex;
  bool      iterInit;  // whether to initialize the in-memory skip list iterator or not
H
Haojun Liao 已提交
35 36 37
} STableBlockScanInfo;

typedef struct SBlockOrderWrapper {
dengyihao's avatar
dengyihao 已提交
38 39
  int64_t uid;
  SBlock* pBlock;
H
Haojun Liao 已提交
40
} SBlockOrderWrapper;
41

42
typedef struct SBlockOrderSupporter {
43 44 45 46
  SBlockOrderWrapper** pDataBlockInfo;
  int32_t*             indexPerTable;
  int32_t*             numOfBlocksPerTable;
  int32_t              numOfTables;
47 48
} SBlockOrderSupporter;

H
Haojun Liao 已提交
49 50 51
// I/O cost counters collected by a reader.
typedef struct SIOCostSummary {
  int64_t blockLoadTime;
  int64_t statisInfoLoadTime;
  int64_t checkForNextTime;
  int64_t headFileLoad;
  int64_t headFileLoadTime;
} SIOCostSummary;

57
typedef struct SBlockLoadSuppInfo {
C
Cary Xu 已提交
58 59
  SColumnDataAgg*  pstatis;
  SColumnDataAgg** plist;
60 61 62
  int16_t*         colIds;    // column ids for loading file block data
  int32_t*         slotIds;   // colId to slotId
  char**           buildBuf;  // build string tmp buffer, todo remove it later after all string format being updated.
63 64
} SBlockLoadSuppInfo;

65
typedef struct SFilesetIter {
66 67 68 69
  int32_t numOfFiles;  // number of total files
  int32_t index;       // current accessed index in the list
  SArray* pFileList;   // data file list
  int32_t order;
70
} SFilesetIter;
H
Haojun Liao 已提交
71 72

// Identifies one file data block: the owning table and the block's position in that table's list.
typedef struct SFileDataBlockInfo {
  // index position in STableBlockScanInfo in order to check whether neighbor block overlaps with it
  int32_t  tbBlockIdx;
  uint64_t uid;
} SFileDataBlockInfo;

typedef struct SDataBlockIter {
dengyihao's avatar
dengyihao 已提交
79 80 81 82
  int32_t numOfBlocks;
  int32_t index;
  SArray* blockList;  // SArray<SFileDataBlockInfo>
  int32_t order;
H
Haojun Liao 已提交
83 84 85
} SDataBlockIter;

// Progress of copying the current file block into the result block.
typedef struct SFileBlockDumpInfo {
  int32_t totalRows;
  int32_t rowIndex;
  int64_t lastKey;
  bool    allDumped;  // set by setBlockAllDumped() once the whole block has been consumed
} SFileBlockDumpInfo;

H
Haojun Liao 已提交
92
// Version range [minVer, maxVer] a query is allowed to see; used to filter file blocks.
typedef struct SVersionRange {
  uint64_t minVer;
  uint64_t maxVer;
} SVersionRange;

H
Haojun Liao 已提交
97
typedef struct SReaderStatus {
dengyihao's avatar
dengyihao 已提交
98 99
  bool                 loadFromFile;  // check file stage
  SHashObj*            pTableMap;     // SHash<STableBlockScanInfo>
100
  STableBlockScanInfo* pTableIter;    // table iterator used in building in-memory buffer data blocks.
101
  SFileBlockDumpInfo   fBlockDumpInfo;
102

dengyihao's avatar
dengyihao 已提交
103 104 105 106 107
  SDFileSet*     pCurrentFileset;  // current opened file set
  SBlockData     fileBlockData;
  SFilesetIter   fileIter;
  SDataBlockIter blockIter;
  bool           composedDataBlock;  // the returned data block is a composed block or not
H
Haojun Liao 已提交
108 109
} SReaderStatus;

H
Hongze Cheng 已提交
110
struct STsdbReader {
H
Haojun Liao 已提交
111 112 113 114 115 116 117
  STsdb*             pTsdb;
  uint64_t           suid;
  int16_t            order;
  STimeWindow        window;  // the primary query time window that applies to all queries
  SSDataBlock*       pResBlock;
  int32_t            capacity;
  SReaderStatus      status;
118 119
  char*              idStr;  // query info handle, for debug purpose
  int32_t            type;   // query type: 1. retrieve all data blocks, 2. retrieve direct prev|next rows
H
Hongze Cheng 已提交
120
  SBlockLoadSuppInfo suppInfo;
121

dengyihao's avatar
dengyihao 已提交
122 123 124 125
  SIOCostSummary cost;
  STSchema*      pSchema;
  SDataFReader*  pFileReader;
  SVersionRange  verRange;
H
Haojun Liao 已提交
126
#if 0
127 128
  SArray*            prev;  // previous row which is before than time window
  SArray*            next;  // next row which is after the query time window
H
Haojun Liao 已提交
129
  SFileBlockInfo*   pDataBlockInfo;
C
Cary Xu 已提交
130 131 132 133
  SDataCols*         pDataCols;         // in order to hold current file data block
  int32_t            allocSize;         // allocated data block size
  SDataBlockLoadInfo dataBlockLoadInfo; /* record current block load information */
  SLoadCompBlockInfo compBlockLoadInfo; /* record current compblock information in SQueryAttr */
H
Haojun Liao 已提交
134 135 136 137 138 139
  //  SDFileSet* pFileGroup;
  // SFSIter            fileIter;
  // SReadH             rhelper;
  //  SColumnDataAgg* statis;  // query level statistics, only one table block statistics info exists at any time
  //  SColumnDataAgg** pstatis;// the ptr array list to return to caller
#endif
H
Hongze Cheng 已提交
140
};
H
Haojun Liao 已提交
141

H
Haojun Liao 已提交
142
static SFileDataBlockInfo* getCurrentBlockInfo(SDataBlockIter* pBlockIter);
143 144
static int      buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, int64_t endKey, int32_t capacity,
                                          STsdbReader* pReader);
145
static TSDBROW* getValidRow(SIterInfo* pIter, const SArray* pDelList, STsdbReader* pReader);
146 147
static int32_t  doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pScanInfo, STsdbReader* pReader,
                                        SRowMerger* pMerger);
dengyihao's avatar
dengyihao 已提交
148 149
static int32_t  doMergeRowsInBuf(SIterInfo* pIter, int64_t ts, SArray* pDelList, SRowMerger* pMerger,
                                 STsdbReader* pReader);
150 151 152
static int32_t  doAppendOneRow(SSDataBlock* pBlock, STsdbReader* pReader, STSRow* pTSRow);
static void     setComposedBlockFlag(STsdbReader* pReader, bool composed);
static void     updateSchema(TSDBROW* pRow, uint64_t uid, STsdbReader* pReader);
153
static bool     hasBeenDropped(const SArray* pDelList, int32_t* index, TSDBKEY* pKey);
154

dengyihao's avatar
dengyihao 已提交
155 156
static void doMergeMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter, SArray* pDelList, STSRow** pTSRow,
                             STsdbReader* pReader);
157 158
static void doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader,
                               STSRow** pTSRow);
dengyihao's avatar
dengyihao 已提交
159 160 161 162
static int32_t initDelSkylineIterator(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader, STbData* pMemTbData,
                                      STbData* piMemTbData);
static STsdb*  getTsdbByRetentions(SVnode* pVnode, TSKEY winSKey, SRetention* retentions, const char* idstr,
                                   int8_t* pLevel);
163
static SVersionRange getQueryVerRange(SVnode* pVnode, SQueryTableDataCond* pCond, int8_t level);
H
Haojun Liao 已提交
164

165 166 167
/**
 * Fill pReader->suppInfo with the column id list of pBlock and with one scratch buffer per
 * variable-length column.
 *
 * For every column i of pBlock, colIds[i] receives the column id. For variable-length columns
 * buildBuf[i] is allocated with pCol->info.bytes bytes; for all other columns it stays NULL.
 *
 * @param pReader reader whose suppInfo is filled
 * @param pBlock  result block that defines the column set
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY when any allocation fails. On failure
 *         every buffer allocated here is released and colIds/buildBuf are reset to NULL, so a
 *         later cleanup of the reader cannot free them a second time.
 */
static int32_t setColumnIdSlotList(STsdbReader* pReader, SSDataBlock* pBlock) {
  SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo;

  size_t numOfCols = blockDataGetNumOfCols(pBlock);

  pSupInfo->colIds = taosMemoryMalloc(numOfCols * sizeof(int16_t));
  pSupInfo->buildBuf = taosMemoryCalloc(numOfCols, POINTER_BYTES);
  if (pSupInfo->buildBuf == NULL || pSupInfo->colIds == NULL) {
    goto _oom;
  }

  for (size_t i = 0; i < numOfCols; ++i) {
    SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, i);
    pSupInfo->colIds[i] = pCol->info.colId;

    if (IS_VAR_DATA_TYPE(pCol->info.type)) {
      pSupInfo->buildBuf[i] = taosMemoryMalloc(pCol->info.bytes);
      if (pSupInfo->buildBuf[i] == NULL) {
        goto _oom;
      }
    }
  }

  return TSDB_CODE_SUCCESS;

_oom:
  // buildBuf is zero-initialized, so entries that were never allocated are NULL here
  if (pSupInfo->buildBuf != NULL) {
    for (size_t i = 0; i < numOfCols; ++i) {
      taosMemoryFree(pSupInfo->buildBuf[i]);
    }
  }

  taosMemoryFree(pSupInfo->buildBuf);
  taosMemoryFree(pSupInfo->colIds);
  pSupInfo->buildBuf = NULL;
  pSupInfo->colIds = NULL;
  return TSDB_CODE_OUT_OF_MEMORY;
}

190
/**
 * Create the uid -> STableBlockScanInfo map for the tables of a query.
 *
 * One entry is inserted per element of idList. The entry's lastKey starts at the reader's
 * window.skey for both traversal orders (for ascending order it is raised from 0 to skey when
 * it is below skey).
 *
 * @param pTsdbReader reader that supplies the order and the query window
 * @param idList      tables to scan
 * @param numOfTables number of elements in idList
 * @return the new hash map, or NULL when the map itself cannot be created
 */
static SHashObj* createDataBlockScanInfo(STsdbReader* pTsdbReader, const STableKeyInfo* idList, int32_t numOfTables) {
  // allocate buffer in order to load data blocks from file
  // todo use simple hash instead
  SHashObj* pTableMap =
      taosHashInit(numOfTables, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
  if (pTableMap == NULL) {
    return NULL;
  }

  // todo apply the lastkey of table check to avoid to load header file
  for (int32_t j = 0; j < numOfTables; ++j) {
    STableBlockScanInfo info = {.lastKey = 0, .uid = idList[j].uid};
    if (ASCENDING_TRAVERSE(pTsdbReader->order)) {
      if (info.lastKey == INT64_MIN || info.lastKey < pTsdbReader->window.skey) {
        info.lastKey = pTsdbReader->window.skey;
      }

      ASSERT(info.lastKey >= pTsdbReader->window.skey && info.lastKey <= pTsdbReader->window.ekey);
    } else {
      info.lastKey = pTsdbReader->window.skey;
    }

    // NOTE(review): the result of taosHashPut() is not checked; a failed insert goes unnoticed.
    taosHashPut(pTableMap, &info.uid, sizeof(uint64_t), &info, sizeof(info));

    // uid is unsigned, so it is printed with PRIu64
    tsdbDebug("%p check table uid:%" PRIu64 " from lastKey:%" PRId64 " %s", pTsdbReader, info.uid, info.lastKey,
              pTsdbReader->idStr);
  }

  return pTableMap;
}

220 221
/**
 * Reset the in-memory scan state of every table in pTableMap.
 *
 * For each entry the mem iterator and the delete skyline are destroyed and their pointers are
 * cleared, so that a later reset or cleanup does not touch freed memory.
 *
 * NOTE(review): iiter.hasVal is cleared but iiter.iter is not destroyed, while iter.iter is
 * destroyed but iter.hasVal is left as is. This asymmetry is kept from the original code;
 * confirm that it is intended.
 */
static void resetDataBlockScanInfo(SHashObj* pTableMap) {
  STableBlockScanInfo* p = NULL;

  while ((p = taosHashIterate(pTableMap, p)) != NULL) {
    p->iterInit = false;
    p->iiter.hasVal = false;

    if (p->iter.iter != NULL) {
      tsdbTbDataIterDestroy(p->iter.iter);
      p->iter.iter = NULL;
    }

    taosArrayDestroy(p->delSkyline);
    p->delSkyline = NULL;
  }
}

234
// Returns true when the window is empty, i.e. its start key is greater than its end key.
// pWindow must not be NULL.
static bool isEmptyQueryTimeWindow(STimeWindow* pWindow) {
  ASSERT(pWindow != NULL);
  return pWindow->skey > pWindow->ekey;
}

239 240
// Update the query time window according to the data time to live(TTL) information, in order to avoid to return
// the expired data to client, even it is queried already.
241
static STimeWindow updateQueryTimeWindow(STsdb* pTsdb, STimeWindow* pWindow) {
dengyihao's avatar
dengyihao 已提交
242
  STsdbKeepCfg* pCfg = &pTsdb->keepCfg;
243 244

  int64_t now = taosGetTimestamp(pCfg->precision);
dengyihao's avatar
dengyihao 已提交
245
  int64_t earilyTs = now - (tsTickPerMin[pCfg->precision] * pCfg->keep2) + 1;  // needs to add one tick
246

dengyihao's avatar
dengyihao 已提交
247
  STimeWindow win = *pWindow;
248 249
  if (win.skey < earilyTs) {
    win.skey = earilyTs;
250 251
  }

252
  return win;
253
}
C
Cary Xu 已提交
254

H
Haojun Liao 已提交
255
// todo remove this
H
Hongze Cheng 已提交
256 257
// Currently a no-op: the whole body is commented out and none of the parameters is used.
// The previous implementation is kept below for reference only.
static void setQueryTimewindow(STsdbReader* pReader, SQueryTableDataCond* pCond, int32_t tWinIdx) {
  // pReader->window = pCond->twindows[tWinIdx];

  // bool    updateTs = false;
  // int64_t startTs = updateQueryTimeWindow(pReader->pTsdb);
  // if (ASCENDING_TRAVERSE(pReader->order)) {
  //   if (startTs > pReader->window.skey) {
  //     pReader->window.skey = startTs;
  //     pCond->twindows[tWinIdx].skey = startTs;
  //     updateTs = true;
  //   }
  // } else {
  //   if (startTs > pReader->window.ekey) {
  //     pReader->window.ekey = startTs;
  //     pCond->twindows[tWinIdx].ekey = startTs;
  //     updateTs = true;
  //   }
  // }

  // if (updateTs) {
  //   tsdbDebug("%p update the query time window, old:%" PRId64 " - %" PRId64 ", new:%" PRId64 " - %" PRId64 ", %s",
  //             pReader, pCond->twindows[tWinIdx].skey, pCond->twindows[tWinIdx].ekey, pReader->window.skey,
  //             pReader->window.ekey, pReader->idStr);
  // }
}

H
Haojun Liao 已提交
282
/**
 * Shrink *capacity (a row count) so that one output block stays within 2MB.
 *
 * The row length is the sum of the byte widths of all queried columns. The size check is done
 * in 64-bit arithmetic so that `capacity * rowLen` cannot overflow, and the result never drops
 * below one row (a single row wider than 2MB would otherwise yield a capacity of 0).
 *
 * @param pCond    query condition that supplies the column list
 * @param capacity in/out: requested row capacity, lowered in place when needed
 */
static void limitOutputBufferSize(const SQueryTableDataCond* pCond, int32_t* capacity) {
  int64_t rowLen = 0;
  for (int32_t i = 0; i < pCond->numOfCols; ++i) {
    rowLen += pCond->colList[i].bytes;
  }

  // make sure the output SSDataBlock size be less than 2MB.
  const int64_t TWOMB = 2 * 1024 * 1024;
  if ((int64_t)(*capacity) * rowLen > TWOMB) {
    int64_t rows = TWOMB / rowLen;
    (*capacity) = (rows > 0) ? (int32_t)rows : 1;
  }
}
H
Haojun Liao 已提交
294

H
Haojun Liao 已提交
295
// init file iterator
296
/**
 * Initialize a file set iterator over a private copy of pFState->aDFileSet.
 *
 * The cursor starts one position before the first file for ascending order (-1) and one
 * position after the last file for descending order (numOfFiles), so that the first call to
 * filesetIteratorNext() lands on the first file to visit.
 *
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY when the file list cannot be duplicated
 */
static int32_t initFilesetIterator(SFilesetIter* pIter, const STsdbFSState* pFState, int32_t order, const char* idstr) {
  int32_t numOfFileset = (int32_t)taosArrayGetSize(pFState->aDFileSet);

  pIter->index = ASCENDING_TRAVERSE(order) ? -1 : numOfFileset;
  pIter->order = order;
  pIter->numOfFiles = numOfFileset;

  pIter->pFileList = taosArrayDup(pFState->aDFileSet);
  if (pIter->pFileList == NULL && numOfFileset > 0) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  tsdbDebug("init fileset iterator, total files:%d %s", pIter->numOfFiles, idstr);
  return TSDB_CODE_SUCCESS;
}
307

308
/**
 * Advance the iterator to the next file set that overlaps the query window and open it.
 *
 * Files that lie entirely before the window (in traversal direction) are skipped. As soon as a
 * file lies entirely past the window, the remaining files are ignored. The key range of a file
 * is derived from its fid alone, so a file reader is opened only for the file that is accepted.
 *
 * On success pReader->status.pCurrentFileset and pReader->pFileReader refer to the accepted
 * file set.
 *
 * @return true when a qualified file set has been opened; false when the list is exhausted,
 *         the remaining files are out of range, or the file reader cannot be opened.
 */
static bool filesetIteratorNext(SFilesetIter* pIter, STsdbReader* pReader) {
  bool    asc = ASCENDING_TRAVERSE(pIter->order);
  int32_t step = asc ? 1 : -1;
  pIter->index += step;

  // check file the time range of coverage
  STimeWindow win = {0};

  while (1) {
    // re-checked on every round: skipping a file below may move the cursor past either end
    if ((asc && pIter->index >= pIter->numOfFiles) || ((!asc) && pIter->index < 0)) {
      return false;
    }

    pReader->status.pCurrentFileset = (SDFileSet*)taosArrayGet(pIter->pFileList, pIter->index);

    int32_t fid = pReader->status.pCurrentFileset->fid;
    tsdbFidKeyRange(fid, pReader->pTsdb->keepCfg.days, pReader->pTsdb->keepCfg.precision, &win.skey, &win.ekey);

    // current file are no longer overlapped with query time window, ignore remain files
    if ((asc && win.skey > pReader->window.ekey) || (!asc && win.ekey < pReader->window.skey)) {
      tsdbDebug("%p remain files are not qualified for qrange:%" PRId64 "-%" PRId64 ", ignore, %s", pReader,
                pReader->window.skey, pReader->window.ekey, pReader->idStr);
      return false;
    }

    // the query window has not been reached yet, try the next file
    if ((asc && (win.ekey < pReader->window.skey)) || ((!asc) && (win.skey > pReader->window.ekey))) {
      pIter->index += step;
      continue;
    }

    int32_t code = tsdbDataFReaderOpen(&pReader->pFileReader, pReader->pTsdb, pReader->status.pCurrentFileset);
    if (code != TSDB_CODE_SUCCESS) {
      return false;
    }

    tsdbDebug("%p file found fid:%d for qrange:%" PRId64 "-%" PRId64 ", %s", pReader, fid, pReader->window.skey,
              pReader->window.ekey, pReader->idStr);
    return true;
  }
}

352
/**
 * Reset a data block iterator for a new scan in the given order.
 *
 * The cursor and the block count are set to -1. The block list is created on first use and
 * only emptied on later calls, so repeated resets do not leak the previous list.
 *
 * pIter->blockList must be either NULL or a valid array on entry (the iterator embedded in a
 * zero-allocated STsdbReader satisfies this).
 */
static void resetDataBlockIterator(SDataBlockIter* pIter, int32_t order) {
  pIter->order = order;
  pIter->index = -1;
  pIter->numOfBlocks = -1;

  if (pIter->blockList == NULL) {
    pIter->blockList = taosArrayInit(4, sizeof(SFileDataBlockInfo));
  } else {
    taosArrayClear(pIter->blockList);
  }
}

H
Haojun Liao 已提交
359
// Put the reader status into its initial state: no table iterator, and the file stage is
// checked first (loadFromFile = true).
static void initReaderStatus(SReaderStatus* pStatus) {
  pStatus->pTableIter = NULL;
  pStatus->loadFromFile = true;
}

364 365 366 367
/**
 * Create the result data block for a query: one column per entry of pCond->colList, with
 * room for `capacity` rows.
 *
 * @return the new block, or NULL on failure with terrno set to the error code. On failure the
 *         partially built block is destroyed together with its columns.
 */
static SSDataBlock* createResBlock(SQueryTableDataCond* pCond, int32_t capacity) {
  SSDataBlock* pResBlock = createDataBlock();
  if (pResBlock == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  for (int32_t i = 0; i < pCond->numOfCols; ++i) {
    SColumnInfoData colInfo = {{0}, 0};
    colInfo.info = pCond->colList[i];
    blockDataAppendColInfo(pResBlock, &colInfo);
  }

  int32_t code = blockDataEnsureCapacity(pResBlock, capacity);
  if (code != TSDB_CODE_SUCCESS) {
    terrno = code;
    // release the column list as well, not only the block header
    blockDataDestroy(pResBlock);
    return NULL;
  }

  return pResBlock;
}
386

H
Haojun Liao 已提交
387 388
/**
 * Allocate and initialize a reader for the given query condition.
 *
 * The tsdb instance is selected with getTsdbByRetentions() from the start key of the first
 * query window. The row capacity starts at 4096 and is lowered by limitOutputBufferSize().
 *
 * @param pVnode   vnode that owns the data
 * @param pCond    query condition (numOfCols must be > 0)
 * @param ppReader out: the new reader on success, NULL on failure
 * @param idstr    optional id string for logging; it is duplicated
 * @return 0 on success, otherwise an error code. On failure after the reader has been
 *         allocated, the partially built reader is released with tsdbReaderClose().
 */
static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, STsdbReader** ppReader, const char* idstr) {
  int32_t      code = 0;
  int8_t       level = 0;
  STsdbReader* pReader = (STsdbReader*)taosMemoryCalloc(1, sizeof(*pReader));
  if (pReader == NULL) {
    // nothing to release yet, and a NULL reader must not be handed to tsdbReaderClose()
    *ppReader = NULL;
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  initReaderStatus(&pReader->status);

  pReader->pTsdb =
      getTsdbByRetentions(pVnode, pCond->twindows[0].skey, pVnode->config.tsdbCfg.retentions, idstr, &level);
  pReader->suid = pCond->suid;
  pReader->order = pCond->order;
  pReader->capacity = 4096;
  pReader->idStr = (idstr != NULL) ? strdup(idstr) : NULL;
  pReader->verRange = getQueryVerRange(pVnode, pCond, level);
  pReader->type = pCond->type;
  // NOTE(review): this uses pVnode->pTsdb, not the pReader->pTsdb selected above; confirm.
  pReader->window = updateQueryTimeWindow(pVnode->pTsdb, pCond->twindows);

  // todo remove this
  setQueryTimewindow(pReader, pCond, 0);
  ASSERT(pCond->numOfCols > 0);

  limitOutputBufferSize(pCond, &pReader->capacity);

  // allocate buffer in order to load data blocks from file
  SBlockLoadSuppInfo* pSup = &pReader->suppInfo;
  pSup->pstatis = taosMemoryCalloc(pCond->numOfCols, sizeof(SColumnDataAgg));
  pSup->plist = taosMemoryCalloc(pCond->numOfCols, POINTER_BYTES);
  if (pSup->pstatis == NULL || pSup->plist == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _end;
  }

  pReader->pResBlock = createResBlock(pCond, pReader->capacity);
  if (pReader->pResBlock == NULL) {
    code = terrno;
    goto _end;
  }

  code = setColumnIdSlotList(pReader, pReader->pResBlock);
  if (code != TSDB_CODE_SUCCESS) {
    // setColumnIdSlotList() releases its own buffers on failure; clear the pointers so that
    // the cleanup below cannot free them again
    pSup->colIds = NULL;
    pSup->buildBuf = NULL;
    goto _end;
  }

  *ppReader = pReader;
  return code;

_end:
  tsdbReaderClose(pReader);
  *ppReader = NULL;
  return code;
}

H
Hongze Cheng 已提交
440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471
// void tsdbResetQueryHandleForNewTable(STsdbReader* queryHandle, SQueryTableDataCond* pCond, STableListInfo* tableList,
//                                      int32_t tWinIdx) {
//   STsdbReader* pTsdbReadHandle = queryHandle;

//   pTsdbReadHandle->order = pCond->order;
//   pTsdbReadHandle->window = pCond->twindows[tWinIdx];
//   pTsdbReadHandle->type = TSDB_QUERY_TYPE_ALL;
//   pTsdbReadHandle->cur.fid = -1;
//   pTsdbReadHandle->cur.win = TSWINDOW_INITIALIZER;
//   pTsdbReadHandle->checkFiles = true;
//   pTsdbReadHandle->activeIndex = 0;  // current active table index
//   pTsdbReadHandle->locateStart = false;
//   pTsdbReadHandle->loadExternalRow = pCond->loadExternalRows;

//   if (ASCENDING_TRAVERSE(pCond->order)) {
//     assert(pTsdbReadHandle->window.skey <= pTsdbReadHandle->window.ekey);
//   } else {
//     assert(pTsdbReadHandle->window.skey >= pTsdbReadHandle->window.ekey);
//   }

//   // allocate buffer in order to load data blocks from file
//   memset(pTsdbReadHandle->suppInfo.pstatis, 0, sizeof(SColumnDataAgg));
//   memset(pTsdbReadHandle->suppInfo.plist, 0, POINTER_BYTES);

//   tsdbInitDataBlockLoadInfo(&pTsdbReadHandle->dataBlockLoadInfo);
//   tsdbInitCompBlockLoadInfo(&pTsdbReadHandle->compBlockLoadInfo);

//   SArray* pTable = NULL;
//   //  STsdbMeta* pMeta = tsdbGetMeta(pTsdbReadHandle->pTsdb);

//   //  pTsdbReadHandle->pTableCheckInfo = destroyTableCheckInfo(pTsdbReadHandle->pTableCheckInfo);

H
Haojun Liao 已提交
472
//   pTsdbReadHandle->pTableCheckInfo = NULL;  // createDataBlockScanInfo(pTsdbReadHandle, groupList, pMeta,
H
Hongze Cheng 已提交
473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492
//                                             // &pTable);
//   if (pTsdbReadHandle->pTableCheckInfo == NULL) {
//     //    tsdbReaderClose(pTsdbReadHandle);
//     terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
//   }

//   //  pTsdbReadHandle->prev = doFreeColumnInfoData(pTsdbReadHandle->prev);
//   //  pTsdbReadHandle->next = doFreeColumnInfoData(pTsdbReadHandle->next);
// }

// SArray* tsdbGetQueriedTableList(STsdbReader** pHandle) {
//   assert(pHandle != NULL);

//   STsdbReader* pTsdbReadHandle = (STsdbReader*)pHandle;

//   size_t  size = taosArrayGetSize(pTsdbReadHandle->pTableCheckInfo);
//   SArray* res = taosArrayInit(size, POINTER_BYTES);
//   return res;
// }

493 494
// static TSKEY extractFirstTraverseKey(STableBlockScanInfo* pCheckInfo, int32_t order, int32_t update, TDRowVerT
// maxVer) {
H
Hongze Cheng 已提交
495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543
//   TSDBROW row = {0};
//   STSRow *rmem = NULL, *rimem = NULL;

//   if (pCheckInfo->iter) {
//     if (tsdbTbDataIterGet(pCheckInfo->iter, &row)) {
//       rmem = row.pTSRow;
//     }
//   }

//   if (pCheckInfo->iiter) {
//     if (tsdbTbDataIterGet(pCheckInfo->iiter, &row)) {
//       rimem = row.pTSRow;
//     }
//   }

//   if (rmem == NULL && rimem == NULL) {
//     return TSKEY_INITIAL_VAL;
//   }

//   if (rmem != NULL && rimem == NULL) {
//     pCheckInfo->chosen = CHECKINFO_CHOSEN_MEM;
//     return TD_ROW_KEY(rmem);
//   }

//   if (rmem == NULL && rimem != NULL) {
//     pCheckInfo->chosen = CHECKINFO_CHOSEN_IMEM;
//     return TD_ROW_KEY(rimem);
//   }

//   TSKEY r1 = TD_ROW_KEY(rmem);
//   TSKEY r2 = TD_ROW_KEY(rimem);

//   if (r1 == r2) {
//     if (TD_SUPPORT_UPDATE(update)) {
//       pCheckInfo->chosen = CHECKINFO_CHOSEN_BOTH;
//     } else {
//       pCheckInfo->chosen = CHECKINFO_CHOSEN_IMEM;
//       tsdbTbDataIterNext(pCheckInfo->iter);
//     }
//     return r1;
//   } else if (r1 < r2 && ASCENDING_TRAVERSE(order)) {
//     pCheckInfo->chosen = CHECKINFO_CHOSEN_MEM;
//     return r1;
//   } else {
//     pCheckInfo->chosen = CHECKINFO_CHOSEN_IMEM;
//     return r2;
//   }
// }

H
Haojun Liao 已提交
544
// static bool moveToNextRowInMem(STableBlockScanInfo* pCheckInfo) {
H
Hongze Cheng 已提交
545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577
//   bool hasNext = false;
//   if (pCheckInfo->chosen == CHECKINFO_CHOSEN_MEM) {
//     if (pCheckInfo->iter != NULL) {
//       hasNext = tsdbTbDataIterNext(pCheckInfo->iter);
//     }

//     if (hasNext) {
//       return hasNext;
//     }

//     if (pCheckInfo->iiter != NULL) {
//       return tsdbTbDataIterGet(pCheckInfo->iiter, NULL);
//     }
//   } else if (pCheckInfo->chosen == CHECKINFO_CHOSEN_IMEM) {
//     if (pCheckInfo->iiter != NULL) {
//       hasNext = tsdbTbDataIterNext(pCheckInfo->iiter);
//     }

//     if (hasNext) {
//       return hasNext;
//     }

//     if (pCheckInfo->iter != NULL) {
//       return tsdbTbDataIterGet(pCheckInfo->iter, NULL);
//     }
//   } else {
//     if (pCheckInfo->iter != NULL) {
//       hasNext = tsdbTbDataIterNext(pCheckInfo->iter);
//     }
//     if (pCheckInfo->iiter != NULL) {
//       hasNext = tsdbTbDataIterNext(pCheckInfo->iiter) || hasNext;
//     }
//   }
H
Hongze Cheng 已提交
578

H
Hongze Cheng 已提交
579 580
//   return hasNext;
// }
H
Hongze Cheng 已提交
581

H
Hongze Cheng 已提交
582 583 584
// static int32_t binarySearchForBlock(SBlock* pBlock, int32_t numOfBlocks, TSKEY skey, int32_t order) {
//   int32_t firstSlot = 0;
//   int32_t lastSlot = numOfBlocks - 1;
H
Hongze Cheng 已提交
585

H
Hongze Cheng 已提交
586
//   int32_t midSlot = firstSlot;
H
Hongze Cheng 已提交
587

H
Hongze Cheng 已提交
588 589 590
//   while (1) {
//     numOfBlocks = lastSlot - firstSlot + 1;
//     midSlot = (firstSlot + (numOfBlocks >> 1));
H
Hongze Cheng 已提交
591

H
Hongze Cheng 已提交
592
//     if (numOfBlocks == 1) break;
H
Hongze Cheng 已提交
593

H
Hongze Cheng 已提交
594 595 596 597 598 599 600 601 602 603 604
//     if (skey > pBlock[midSlot].maxKey.ts) {
//       if (numOfBlocks == 2) break;
//       if ((order == TSDB_ORDER_DESC) && (skey < pBlock[midSlot + 1].minKey.ts)) break;
//       firstSlot = midSlot + 1;
//     } else if (skey < pBlock[midSlot].minKey.ts) {
//       if ((order == TSDB_ORDER_ASC) && (skey > pBlock[midSlot - 1].maxKey.ts)) break;
//       lastSlot = midSlot - 1;
//     } else {
//       break;  // got the slot
//     }
//   }
H
Hongze Cheng 已提交
605

H
Hongze Cheng 已提交
606 607
//   return midSlot;
// }
H
Hongze Cheng 已提交
608

H
Haojun Liao 已提交
609
/**
 * Read the block index of the current file and collect the entries of the queried tables.
 *
 * An index entry is kept when its suid equals the reader's suid and its uid is present in
 * pReader->status.pTableMap. For each kept entry the table's blockIdx is updated, its
 * pBlockList is created on first use, and the entry is appended to pIndexList.
 *
 * @param pReader     reader that owns the table map
 * @param pFileReader reader of the file set whose index is loaded
 * @param pIndexList  out: receives the qualified SBlockIdx entries (by value)
 * @return TSDB_CODE_SUCCESS (also when the index is empty), the error of tsdbReadBlockIdx(),
 *         or TSDB_CODE_OUT_OF_MEMORY. The temporary index array is released on every path.
 */
static int32_t doLoadBlockIndex(STsdbReader* pReader, SDataFReader* pFileReader, SArray* pIndexList) {
  size_t  numOfIdx = 0;
  SArray* aBlockIdx = taosArrayInit(0, sizeof(SBlockIdx));
  if (aBlockIdx == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  int32_t code = tsdbReadBlockIdx(pFileReader, aBlockIdx, NULL);
  if (code != TSDB_CODE_SUCCESS) {
    goto _end;
  }

  // an empty index simply skips the loop and is released at _end
  numOfIdx = taosArrayGetSize(aBlockIdx);
  for (size_t i = 0; i < numOfIdx; ++i) {
    SBlockIdx* pBlockIdx = (SBlockIdx*)taosArrayGet(aBlockIdx, i);

    // uid check
    if (pBlockIdx->suid != pReader->suid) {
      continue;
    }

    // this block belongs to a table that is not queried.
    STableBlockScanInfo* pScanInfo = taosHashGet(pReader->status.pTableMap, &pBlockIdx->uid, sizeof(uint64_t));
    if (pScanInfo == NULL) {
      continue;
    }

    // todo: not valid info in bockIndex
    // time range check
    //    if (pBlockIdx->minKey > pReader->window.ekey || pBlockIdx->maxKey < pReader->window.skey) {
    //      continue;
    //    }

    // version check
    //    if (pBlockIdx->minVersion > pReader->verRange.maxVer || pBlockIdx->maxVersion < pReader->verRange.minVer) {
    //      continue;
    //    }

    if (pScanInfo->pBlockList == NULL) {
      pScanInfo->pBlockList = taosArrayInit(16, sizeof(SBlock));
      if (pScanInfo->pBlockList == NULL) {
        code = TSDB_CODE_OUT_OF_MEMORY;
        goto _end;
      }
    }

    pScanInfo->blockIdx = *pBlockIdx;
    if (taosArrayPush(pIndexList, pBlockIdx) == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto _end;
    }
  }

_end:
  taosArrayDestroy(aBlockIdx);
  return code;
}

662 663
/**
 * Load the block headers of every table in pIndexList from the current file and keep the
 * blocks that overlap the query time window and the query version range.
 *
 * The block lists of all tables in the table map are emptied first. Qualified blocks are
 * appended to the owning table's pBlockList.
 *
 * @param pReader          reader (supplies file reader, table map, window and version range)
 * @param pIndexList       SBlockIdx entries produced by doLoadBlockIndex()
 * @param numOfValidTables out: number of tables that have at least one qualified block
 *                         (reset to 0 here)
 * @param numOfBlocks      in/out: incremented once per qualified block; it is NOT reset here,
 *                         the caller is expected to initialize it
 * @return TSDB_CODE_SUCCESS, the error of tsdbReadBlock(), or TSDB_CODE_OUT_OF_MEMORY. The
 *         per-table map data is released on every path.
 */
static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, uint32_t* numOfValidTables,
                               int32_t* numOfBlocks) {
  size_t numOfTables = taosArrayGetSize(pIndexList);

  *numOfValidTables = 0;

  // forget the blocks collected for the previous file set
  STableBlockScanInfo* px = NULL;
  while ((px = taosHashIterate(pReader->status.pTableMap, px)) != NULL) {
    taosArrayClear(px->pBlockList);
  }

  for (size_t i = 0; i < numOfTables; ++i) {
    SBlockIdx* pBlockIdx = taosArrayGet(pIndexList, i);

    SMapData mapData = {0};
    tMapDataReset(&mapData);

    int32_t code = tsdbReadBlock(pReader->pFileReader, pBlockIdx, &mapData, NULL);
    if (code != TSDB_CODE_SUCCESS) {
      tMapDataClear(&mapData);
      return code;
    }

    STableBlockScanInfo* pScanInfo = taosHashGet(pReader->status.pTableMap, &pBlockIdx->uid, sizeof(uint64_t));
    if (pScanInfo == NULL) {
      // not expected: pIndexList only holds tables found in the map
      tMapDataClear(&mapData);
      continue;
    }

    if (pScanInfo->pBlockList == NULL) {
      // the block list could not be allocated when the index was loaded
      tMapDataClear(&mapData);
      return TSDB_CODE_OUT_OF_MEMORY;
    }

    for (int32_t j = 0; j < mapData.nItem; ++j) {
      SBlock block = {0};

      tMapDataGetItemByIdx(&mapData, j, &block, tGetBlock);

      // 1. time range check
      if (block.minKey.ts > pReader->window.ekey || block.maxKey.ts < pReader->window.skey) {
        continue;
      }

      // 2. version range check
      if (block.minVersion > pReader->verRange.maxVer || block.maxVersion < pReader->verRange.minVer) {
        continue;
      }

      void* p = taosArrayPush(pScanInfo->pBlockList, &block);
      if (p == NULL) {
        tMapDataClear(&mapData);
        return TSDB_CODE_OUT_OF_MEMORY;
      }

      (*numOfBlocks) += 1;
    }

    // the blocks were copied by value into pBlockList, the raw map data is no longer needed
    tMapDataClear(&mapData);

    if (taosArrayGetSize(pScanInfo->pBlockList) > 0) {
      (*numOfValidTables) += 1;
    }
  }

  return TSDB_CODE_SUCCESS;
}
H
Haojun Liao 已提交
716

717 718
// todo remove pblock parameter
// Mark the current file block as completely consumed and move lastKey one tick past the
// block's maxKey in traversal direction (+1 for ascending, -1 for descending order).
//
// NOTE(review): for descending order the last row consumed is presumably the block's minKey,
// yet maxKey is used for both orders here; confirm against the callers.
static void setBlockAllDumped(SFileBlockDumpInfo* pDumpInfo, SBlock* pBlock, int32_t order) {
  int32_t step = ASCENDING_TRAVERSE(order) ? 1 : -1;

  pDumpInfo->allDumped = true;
  pDumpInfo->lastKey = pBlock->maxKey.ts + step;
}
H
Haojun Liao 已提交
724

725 726
/**
 * Append one column value to row `rowIndex` of a result column.
 *
 * - Variable-length types: a NULL/none value is appended as NULL. Otherwise the value is first
 *   assembled in pSup->buildBuf[colIndex] (length header followed by the data) and appended
 *   from there.
 * - All other types: the value is appended directly, flagged as NULL when it is null or none.
 *
 * NOTE(review): the copy into buildBuf[colIndex] is not bounds-checked here; it assumes the
 * buffer is large enough for the header plus pColVal->value.nData bytes. Confirm at the
 * allocation site.
 */
static void doCopyColVal(SColumnInfoData* pColInfoData, int32_t rowIndex, int32_t colIndex, SColVal* pColVal,
                         SBlockLoadSuppInfo* pSup) {
  if (IS_VAR_DATA_TYPE(pColVal->type)) {
    if (pColVal->isNull || pColVal->isNone) {
      colDataAppendNULL(pColInfoData, rowIndex);
    } else {
      varDataSetLen(pSup->buildBuf[colIndex], pColVal->value.nData);
      memcpy(varDataVal(pSup->buildBuf[colIndex]), pColVal->value.pData, pColVal->value.nData);
      colDataAppend(pColInfoData, rowIndex, pSup->buildBuf[colIndex], false);
    }
  } else {
    colDataAppend(pColInfoData, rowIndex, (const char*)&pColVal->value, pColVal->isNull || pColVal->isNone);
  }
}
H
Haojun Liao 已提交
739

740
// Copy rows of the currently loaded file block (pReader->status.fileBlockData) into the
// result block pReader->pResBlock, starting at the dump cursor pDumpInfo->rowIndex and
// walking in the scan direction. At most pReader->capacity rows are copied per call.
//
// Column handling:
//  - if the first result column is the primary timestamp column it is filled from aTSKEY;
//  - every other result column is matched by column id against the columns present in the
//    file block; a result column that has no counterpart is filled with NULLs.
//
// On return pResBlock->info.rows holds the number of rows produced and the dump cursor has
// been advanced by that many rows. Always returns TSDB_CODE_SUCCESS.
static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo) {
  SReaderStatus*  pStatus = &pReader->status;
  SDataBlockIter* pBlockIter = &pStatus->blockIter;

  SBlockData*         pBlockData = &pStatus->fileBlockData;
  SFileDataBlockInfo* pFBlock = getCurrentBlockInfo(pBlockIter);
  SBlock*             pBlock = taosArrayGet(pBlockScanInfo->pBlockList, pFBlock->tbBlockIdx);
  SSDataBlock*        pResBlock = pReader->pResBlock;
  int32_t             numOfCols = blockDataGetNumOfCols(pResBlock);

  SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo;
  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;

  int64_t st = taosGetTimestampUs();

  SColVal cv = {0};
  int32_t colIndex = 0;

  bool    asc = ASCENDING_TRAVERSE(pReader->order);
  int32_t step = asc ? 1 : -1;

  // number of rows to emit in this call: whatever is left in the scan direction, capped by
  // the capacity of the result block
  int32_t remain = asc ? (pBlockData->nRow - pDumpInfo->rowIndex) : (pDumpInfo->rowIndex + 1);
  if (remain > pReader->capacity) {
    remain = pReader->capacity;
  }

  // The copy loops below are bounded by the row count instead of an end index, so that a
  // capacity-capped descending scan (where the end index lies below the start index) copies
  // the expected rows. The index range check is purely defensive.
  int32_t          i = 0;
  SColumnInfoData* pColData = taosArrayGet(pResBlock->pDataBlock, i);
  if (pColData->info.colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
    int32_t j = pDumpInfo->rowIndex;
    for (int32_t k = 0; k < remain && j >= 0 && j < pBlockData->nRow; ++k, j += step) {
      colDataAppend(pColData, k, (const char*)&pBlockData->aTSKEY[j], false);
    }
    i += 1;
  }

  while (i < numOfCols && colIndex < taosArrayGetSize(pBlockData->aIdx)) {
    pColData = taosArrayGet(pResBlock->pDataBlock, i);

    SColData* pData = tBlockDataGetColDataByIdx(pBlockData, colIndex);

    if (pData->cid == pColData->info.colId) {
      int32_t copied = 0;
      int32_t j = pDumpInfo->rowIndex;
      for (; copied < remain && j >= 0 && j < pBlockData->nRow; j += step) {
        tColDataGetValue(pData, j, &cv);
        doCopyColVal(pColData, copied++, i, &cv, pSupInfo);
      }

      // only meaningful when rows were copied one by one; the NULL-fill branch below does
      // not count rows
      ASSERT(copied == remain);
      colIndex += 1;
    } else {  // the specified column does not exist in file block, fill with null data
      colDataAppendNNULL(pColData, 0, remain);
    }

    i += 1;
  }

  // result columns beyond the last column stored in the file block are all NULL
  while (i < numOfCols) {
    pColData = taosArrayGet(pResBlock->pDataBlock, i);
    colDataAppendNNULL(pColData, 0, remain);
    i += 1;
  }

  pResBlock->info.rows = remain;
  pDumpInfo->rowIndex += step * remain;

  // NOTE(review): the block is marked as fully dumped even when the copy above was capped by
  // pReader->capacity and rows are still pending (see unDumpedRows below) -- confirm whether
  // callers rely on this before making it conditional.
  setBlockAllDumped(pDumpInfo, pBlock, pReader->order);

  int64_t elapsedTime = (taosGetTimestampUs() - st);
  pReader->cost.blockLoadTime += elapsedTime;

  int32_t unDumpedRows = asc ? pBlock->nRow - pDumpInfo->rowIndex : pDumpInfo->rowIndex + 1;
  tsdbDebug("%p load file block into buffer, global index:%d, table index:%d, brange:%" PRId64 "-%" PRId64
            ", rows:%d, remain:%d, minVer:%" PRId64 ", maxVer:%" PRId64 ", elapsed time:%" PRId64 " us, %s",
            pReader, pBlockIter->index, pFBlock->tbBlockIdx, pBlock->minKey.ts, pBlock->maxKey.ts, remain, unDumpedRows,
            pBlock->minVersion, pBlock->maxVersion, elapsedTime, pReader->idStr);

  return TSDB_CODE_SUCCESS;
}
823

824
// todo consider the output buffer size
825 826
// Read the column data of the block the iterator currently points at into pBlockData.
// The columns requested are pReader->suppInfo.colIds, one per column of the result block.
// On success the elapsed time is added to pReader->cost.blockLoadTime, the dump state is
// reset (allDumped = false) and TSDB_CODE_SUCCESS is returned; on failure the error is
// logged and the code from tsdbReadColData is returned unchanged.
static int32_t doLoadFileBlockData(STsdbReader* pReader, SDataBlockIter* pBlockIter,
                                   STableBlockScanInfo* pBlockScanInfo, SBlockData* pBlockData) {
  int64_t startTs = taosGetTimestampUs();

  SFileDataBlockInfo* pFBlock = getCurrentBlockInfo(pBlockIter);
  SBlock*             pBlock = taosArrayGet(pBlockScanInfo->pBlockList, pFBlock->tbBlockIdx);
  int32_t             numOfCols = blockDataGetNumOfCols(pReader->pResBlock);

  // NOTE(review): these scratch buffers are handed to tsdbReadColData and never released
  // here -- confirm who owns whatever the callee stores in them.
  uint8_t* pBuf1 = NULL;
  uint8_t* pBuf2 = NULL;

  int32_t code = tsdbReadColData(pReader->pFileReader, &pBlockScanInfo->blockIdx, pBlock, pReader->suppInfo.colIds,
                                 numOfCols, pBlockData, &pBuf1, &pBuf2);
  if (code != TSDB_CODE_SUCCESS) {
    tsdbError("%p error occurs in loading file block, global index:%d, table index:%d, brange:%" PRId64 "-%" PRId64
              ", rows:%d, %s",
              pReader, pBlockIter->index, pFBlock->tbBlockIdx, pBlock->minKey.ts, pBlock->maxKey.ts, pBlock->nRow,
              pReader->idStr);
    return code;
  }

  int64_t elapsedTime = taosGetTimestampUs() - startTs;
  pReader->cost.blockLoadTime += elapsedTime;

  // a freshly loaded block has not been handed out yet
  pReader->status.fBlockDumpInfo.allDumped = false;

  tsdbDebug("%p load file block into buffer, global index:%d, table index:%d, brange:%" PRId64 "-%" PRId64
            ", rows:%d, minVer:%" PRId64 ", maxVer:%" PRId64 ", elapsed time:%" PRId64 " us, %s",
            pReader, pBlockIter->index, pFBlock->tbBlockIdx, pBlock->minKey.ts, pBlock->maxKey.ts, pBlock->nRow,
            pBlock->minVersion, pBlock->maxVersion, elapsedTime, pReader->idStr);
  return TSDB_CODE_SUCCESS;
}
861

H
Hongze Cheng 已提交
862 863 864 865 866 867 868 869 870 871 872 873 874 875 876 877 878 879 880 881 882 883 884 885 886 887 888 889 890 891 892 893 894 895 896 897 898 899 900 901 902 903 904 905 906 907 908 909 910 911 912 913 914 915 916 917 918 919
// static int doBinarySearchKey(char* pValue, int num, TSKEY key, int order) {
//   int    firstPos, lastPos, midPos = -1;
//   int    numOfRows;
//   TSKEY* keyList;

//   assert(order == TSDB_ORDER_ASC || order == TSDB_ORDER_DESC);

//   if (num <= 0) return -1;

//   keyList = (TSKEY*)pValue;
//   firstPos = 0;
//   lastPos = num - 1;

//   if (order == TSDB_ORDER_DESC) {
//     // find the first position which is smaller than the key
//     while (1) {
//       if (key >= keyList[lastPos]) return lastPos;
//       if (key == keyList[firstPos]) return firstPos;
//       if (key < keyList[firstPos]) return firstPos - 1;

//       numOfRows = lastPos - firstPos + 1;
//       midPos = (numOfRows >> 1) + firstPos;

//       if (key < keyList[midPos]) {
//         lastPos = midPos - 1;
//       } else if (key > keyList[midPos]) {
//         firstPos = midPos + 1;
//       } else {
//         break;
//       }
//     }

//   } else {
//     // find the first position which is bigger than the key
//     while (1) {
//       if (key <= keyList[firstPos]) return firstPos;
//       if (key == keyList[lastPos]) return lastPos;

//       if (key > keyList[lastPos]) {
//         lastPos = lastPos + 1;
//         if (lastPos >= num)
//           return -1;
//         else
//           return lastPos;
//       }

//       numOfRows = lastPos - firstPos + 1;
//       midPos = (numOfRows >> 1) + firstPos;

//       if (key < keyList[midPos]) {
//         lastPos = midPos - 1;
//       } else if (key > keyList[midPos]) {
//         firstPos = midPos + 1;
//       } else {
//         break;
//       }
//     }
//   }
H
Hongze Cheng 已提交
920

H
Hongze Cheng 已提交
921 922 923 924 925 926 927 928 929 930 931 932 933 934 935 936 937 938 939 940 941 942 943 944 945 946
//   return midPos;
// }
// static int32_t mergeTwoRowFromMem(STsdbReader* pTsdbReadHandle, int32_t capacity, int32_t* curRow, STSRow* row1,
//                                   STSRow* row2, int32_t numOfCols, uint64_t uid, STSchema* pSchema1, STSchema*
//                                   pSchema2, bool update, TSKEY* lastRowKey) {
// #if 1
//   STSchema* pSchema;
//   STSRow*   row;
//   int16_t   colId;
//   int16_t   offset;

//   bool     isRow1DataRow = TD_IS_TP_ROW(row1);
//   bool     isRow2DataRow;
//   bool     isChosenRowDataRow;
//   int32_t  chosen_itr;
//   SCellVal sVal = {0};
//   TSKEY    rowKey = TSKEY_INITIAL_VAL;
//   int32_t  nResult = 0;
//   int32_t  mergeOption = 0;  // 0 discard 1 overwrite 2 merge

//   // the schema version info is embeded in STSRow
//   int32_t numOfColsOfRow1 = 0;

//   if (pSchema1 == NULL) {
//     pSchema1 = metaGetTbTSchema(REPO_META(pTsdbReadHandle->pTsdb), uid, TD_ROW_SVER(row1));
//   }
H
Hongze Cheng 已提交
947

H
Hongze Cheng 已提交
948 949 950 951 952 953 954 955 956 957 958 959 960
// #ifdef TD_DEBUG_PRINT_ROW
//   char   flags[70] = {0};
//   STsdb* pTsdb = pTsdbReadHandle->rhelper.pRepo;
//   snprintf(flags, 70, "%s:%d vgId:%d dir:%s row1%s=NULL,row2%s=NULL", __func__, __LINE__, TD_VID(pTsdb->pVnode),
//            pTsdb->dir, row1 ? "!" : "", row2 ? "!" : "");
//   tdSRowPrint(row1, pSchema1, flags);
// #endif

//   if (isRow1DataRow) {
//     numOfColsOfRow1 = schemaNCols(pSchema1);
//   } else {
//     numOfColsOfRow1 = tdRowGetNCols(row1);
//   }
H
Hongze Cheng 已提交
961

H
Hongze Cheng 已提交
962 963 964 965 966 967 968 969 970 971 972 973
//   int32_t numOfColsOfRow2 = 0;
//   if (row2) {
//     isRow2DataRow = TD_IS_TP_ROW(row2);
//     if (pSchema2 == NULL) {
//       pSchema2 = metaGetTbTSchema(REPO_META(pTsdbReadHandle->pTsdb), uid, TD_ROW_SVER(row2));
//     }
//     if (isRow2DataRow) {
//       numOfColsOfRow2 = schemaNCols(pSchema2);
//     } else {
//       numOfColsOfRow2 = tdRowGetNCols(row2);
//     }
//   }
H
Hongze Cheng 已提交
974

H
Hongze Cheng 已提交
975 976 977 978 979 980 981 982 983 984 985 986 987 988 989 990 991 992 993 994 995 996 997 998 999 1000 1001 1002 1003 1004 1005 1006 1007 1008 1009 1010 1011 1012 1013 1014 1015 1016 1017 1018 1019 1020 1021 1022 1023 1024 1025 1026 1027 1028 1029 1030 1031 1032 1033 1034 1035 1036 1037 1038 1039 1040 1041 1042 1043 1044 1045 1046 1047 1048 1049 1050 1051 1052 1053 1054 1055 1056 1057 1058 1059 1060 1061 1062 1063 1064 1065 1066 1067 1068 1069 1070 1071 1072 1073 1074 1075 1076 1077 1078 1079 1080 1081 1082 1083 1084 1085 1086 1087 1088 1089 1090 1091 1092 1093 1094 1095 1096 1097 1098 1099 1100 1101 1102 1103 1104
//   int32_t i = 0, j = 0, k = 0;
//   while (i < numOfCols && (j < numOfColsOfRow1 || k < numOfColsOfRow2)) {
//     SColumnInfoData* pColInfo = taosArrayGet(pTsdbReadHandle->pColumns, i);

//     int32_t colIdOfRow1;
//     if (j >= numOfColsOfRow1) {
//       colIdOfRow1 = INT32_MAX;
//     } else if (isRow1DataRow) {
//       colIdOfRow1 = pSchema1->columns[j].colId;
//     } else {
//       colIdOfRow1 = tdKvRowColIdAt(row1, j);
//     }

//     int32_t colIdOfRow2;
//     if (k >= numOfColsOfRow2) {
//       colIdOfRow2 = INT32_MAX;
//     } else if (isRow2DataRow) {
//       colIdOfRow2 = pSchema2->columns[k].colId;
//     } else {
//       colIdOfRow2 = tdKvRowColIdAt(row2, k);
//     }

//     if (colIdOfRow1 < colIdOfRow2) {  // the most probability
//       if (colIdOfRow1 < pColInfo->info.colId) {
//         ++j;
//         continue;
//       }
//       row = row1;
//       pSchema = pSchema1;
//       isChosenRowDataRow = isRow1DataRow;
//       chosen_itr = j;
//     } else if (colIdOfRow1 == colIdOfRow2) {
//       if (colIdOfRow1 < pColInfo->info.colId) {
//         ++j;
//         ++k;
//         continue;
//       }
//       row = row1;
//       pSchema = pSchema1;
//       isChosenRowDataRow = isRow1DataRow;
//       chosen_itr = j;
//     } else {
//       if (colIdOfRow2 < pColInfo->info.colId) {
//         ++k;
//         continue;
//       }
//       row = row2;
//       pSchema = pSchema2;
//       chosen_itr = k;
//       isChosenRowDataRow = isRow2DataRow;
//     }

//     if (isChosenRowDataRow) {
//       colId = pSchema->columns[chosen_itr].colId;
//       offset = pSchema->columns[chosen_itr].offset;
//       // TODO: use STSRowIter
//       tdSTpRowGetVal(row, colId, pSchema->columns[chosen_itr].type, pSchema->flen, offset, chosen_itr - 1, &sVal);
//       if (colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
//         rowKey = *(TSKEY*)sVal.val;
//         if (rowKey != *lastRowKey) {
//           mergeOption = 1;
//           if (*lastRowKey != TSKEY_INITIAL_VAL) {
//             ++(*curRow);
//           }
//           *lastRowKey = rowKey;
//           ++nResult;
//         } else if (update) {
//           mergeOption = 2;
//         } else {
//           mergeOption = 0;
//           break;
//         }
//       }
//     } else {
//       // TODO: use STSRowIter
//       if (chosen_itr == 0) {
//         colId = PRIMARYKEY_TIMESTAMP_COL_ID;
//         tdSKvRowGetVal(row, PRIMARYKEY_TIMESTAMP_COL_ID, -1, -1, &sVal);
//         rowKey = *(TSKEY*)sVal.val;
//         if (rowKey != *lastRowKey) {
//           mergeOption = 1;
//           if (*lastRowKey != TSKEY_INITIAL_VAL) {
//             ++(*curRow);
//           }
//           *lastRowKey = rowKey;
//           ++nResult;
//         } else if (update) {
//           mergeOption = 2;
//         } else {
//           mergeOption = 0;
//           break;
//         }
//       } else {
//         SKvRowIdx* pColIdx = tdKvRowColIdxAt(row, chosen_itr - 1);
//         colId = pColIdx->colId;
//         offset = pColIdx->offset;
//         tdSKvRowGetVal(row, colId, offset, chosen_itr - 1, &sVal);
//       }
//     }

//     ASSERT(rowKey != TSKEY_INITIAL_VAL);

//     if (colId == pColInfo->info.colId) {
//       if (tdValTypeIsNorm(sVal.valType)) {
//         colDataAppend(pColInfo, *curRow, sVal.val, false);
//       } else if (tdValTypeIsNull(sVal.valType)) {
//         colDataAppend(pColInfo, *curRow, NULL, true);
//       } else if (tdValTypeIsNone(sVal.valType)) {
//         // TODO: Set null if nothing append for this row
//         if (mergeOption == 1) {
//           colDataAppend(pColInfo, *curRow, NULL, true);
//         }
//       } else {
//         ASSERT(0);
//       }

//       ++i;

//       if (row == row1) {
//         ++j;
//       } else {
//         ++k;
//       }
//     } else {
//       if (mergeOption == 1) {
//         colDataAppend(pColInfo, *curRow, NULL, true);
//       }
//       ++i;
//     }
//   }
H
Hongze Cheng 已提交
1105

H
Hongze Cheng 已提交
1106 1107 1108 1109 1110 1111 1112
//   if (mergeOption == 1) {
//     while (i < numOfCols) {  // the remain columns are all null data
//       SColumnInfoData* pColInfo = taosArrayGet(pTsdbReadHandle->pColumns, i);
//       colDataAppend(pColInfo, *curRow, NULL, true);
//       ++i;
//     }
//   }
H
Hongze Cheng 已提交
1113

H
Hongze Cheng 已提交
1114 1115 1116
//   return nResult;
// #endif
// }
H
Hongze Cheng 已提交
1117

H
Hongze Cheng 已提交
1118 1119
// static void doCheckGeneratedBlockRange(STsdbReader* pTsdbReadHandle) {
//   SQueryFilePos* cur = &pTsdbReadHandle->cur;
H
Hongze Cheng 已提交
1120

H
Hongze Cheng 已提交
1121 1122 1123 1124 1125 1126
//   if (cur->rows > 0) {
//     if (ASCENDING_TRAVERSE(pTsdbReadHandle->order)) {
//       assert(cur->win.skey >= pTsdbReadHandle->window.skey && cur->win.ekey <= pTsdbReadHandle->window.ekey);
//     } else {
//       assert(cur->win.skey >= pTsdbReadHandle->window.ekey && cur->win.ekey <= pTsdbReadHandle->window.skey);
//     }
H
Hongze Cheng 已提交
1127

H
Hongze Cheng 已提交
1128 1129 1130 1131 1132
//     SColumnInfoData* pColInfoData = taosArrayGet(pTsdbReadHandle->pColumns, 0);
//     assert(cur->win.skey == ((TSKEY*)pColInfoData->pData)[0] &&
//            cur->win.ekey == ((TSKEY*)pColInfoData->pData)[cur->rows - 1]);
//   } else {
//     cur->win = pTsdbReadHandle->window;
H
Hongze Cheng 已提交
1133

H
Hongze Cheng 已提交
1134 1135 1136 1137
//     int32_t step = ASCENDING_TRAVERSE(pTsdbReadHandle->order) ? 1 : -1;
//     cur->lastKey = pTsdbReadHandle->window.ekey + step;
//   }
// }
H
Hongze Cheng 已提交
1138

H
Haojun Liao 已提交
1139
// static void copyAllRemainRowsFromFileBlock(STsdbReader* pTsdbReadHandle, STableBlockScanInfo* pCheckInfo,
H
Hongze Cheng 已提交
1140 1141
//                                            SDataBlockInfo* pBlockInfo, int32_t endPos) {
//   SQueryFilePos* cur = &pTsdbReadHandle->cur;
H
Hongze Cheng 已提交
1142

H
Hongze Cheng 已提交
1143 1144
//   SDataCols* pCols = pTsdbReadHandle->rhelper.pDCols[0];
//   TSKEY*     tsArray = pCols->cols[0].pData;
H
Hongze Cheng 已提交
1145

H
Hongze Cheng 已提交
1146
//   bool ascScan = ASCENDING_TRAVERSE(pTsdbReadHandle->order);
H
Hongze Cheng 已提交
1147

H
Hongze Cheng 已提交
1148
//   int32_t step = ascScan ? 1 : -1;
H
Hongze Cheng 已提交
1149

H
Hongze Cheng 已提交
1150 1151
//   int32_t start = cur->pos;
//   int32_t end = endPos;
H
Hongze Cheng 已提交
1152

H
Hongze Cheng 已提交
1153 1154 1155
//   if (!ascScan) {
//     TSWAP(start, end);
//   }
H
Hongze Cheng 已提交
1156

H
Hongze Cheng 已提交
1157 1158
//   assert(pTsdbReadHandle->outputCapacity >= (end - start + 1));
//   int32_t numOfRows = doCopyRowsFromFileBlock(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, 0, start, end);
H
Hongze Cheng 已提交
1159

H
Hongze Cheng 已提交
1160 1161 1162 1163 1164
//   // the time window should always be ascending order: skey <= ekey
//   cur->win = (STimeWindow){.skey = tsArray[start], .ekey = tsArray[end]};
//   cur->mixBlock = (numOfRows != pBlockInfo->rows);
//   cur->lastKey = tsArray[endPos] + step;
//   cur->blockCompleted = (ascScan ? (endPos == pBlockInfo->rows - 1) : (endPos == 0));
H
Hongze Cheng 已提交
1165

H
Hongze Cheng 已提交
1166 1167 1168 1169
//   // The value of pos may be -1 or pBlockInfo->rows, and it is invalid in both cases.
//   int32_t pos = endPos + step;
//   updateInfoAfterMerge(pTsdbReadHandle, pCheckInfo, numOfRows, pos);
//   doCheckGeneratedBlockRange(pTsdbReadHandle);
H
Hongze Cheng 已提交
1170

H
Hongze Cheng 已提交
1171 1172 1173 1174
//   tsdbDebug("%p uid:%" PRIu64 ", data block created, mixblock:%d, brange:%" PRIu64 "-%" PRIu64 " rows:%d, %s",
//             pTsdbReadHandle, pCheckInfo->tableId, cur->mixBlock, cur->win.skey, cur->win.ekey, cur->rows,
//             pTsdbReadHandle->idStr);
// }
H
Hongze Cheng 已提交
1175

H
Hongze Cheng 已提交
1176 1177
// // only return the qualified data to client in terms of query time window, data rows in the same block but do not
// // be included in the query time window will be discarded
H
Haojun Liao 已提交
1178
// static void doMergeTwoLevelData(STsdbReader* pTsdbReadHandle, STableBlockScanInfo* pCheckInfo, SBlock* pBlock) {
H
Hongze Cheng 已提交

//   SQueryFilePos* cur = &pTsdbReadHandle->cur;
//   SDataBlockInfo blockInfo = GET_FILE_DATA_BLOCK_INFO(pCheckInfo, pBlock);
//   STsdbCfg*      pCfg = REPO_CFG(pTsdbReadHandle->pTsdb);

//   initTableMemIterator(pTsdbReadHandle, pCheckInfo);

//   SDataCols* pCols = pTsdbReadHandle->rhelper.pDCols[0];
//   assert(pCols->cols[0].type == TSDB_DATA_TYPE_TIMESTAMP && pCols->cols[0].colId == PRIMARYKEY_TIMESTAMP_COL_ID &&
//          cur->pos >= 0 && cur->pos < pBlock->numOfRows);
//   // Even Multi-Version supported, the records with duplicated TSKEY would be merged inside of tsdbLoadData
//   interface. TSKEY* tsArray = pCols->cols[0].pData; assert(pCols->numOfRows == pBlock->numOfRows && tsArray[0] ==
//   pBlock->minKey.ts &&
//          tsArray[pBlock->numOfRows - 1] == pBlock->maxKey.ts);

//   bool    ascScan = ASCENDING_TRAVERSE(pTsdbReadHandle->order);
//   int32_t step = ascScan ? 1 : -1;

//   // for search the endPos, so the order needs to reverse
//   int32_t order = ascScan ? TSDB_ORDER_DESC : TSDB_ORDER_ASC;

//   int32_t numOfCols = (int32_t)(QH_GET_NUM_OF_COLS(pTsdbReadHandle));
//   int32_t endPos = getEndPosInDataBlock(pTsdbReadHandle, &blockInfo);

//   STimeWindow* pWin = &blockInfo.window;
//   tsdbDebug("%p uid:%" PRIu64 " start merge data block, file block range:%" PRIu64 "-%" PRIu64
//             " rows:%d, start:%d, end:%d, %s",
//             pTsdbReadHandle, pCheckInfo->tableId, pWin->skey, pWin->ekey, blockInfo.rows, cur->pos, endPos,
//             pTsdbReadHandle->idStr);

//   // compared with the data from in-memory buffer, to generate the correct timestamp array list
//   int32_t numOfRows = 0;
//   int32_t curRow = 0;

//   int16_t   rv1 = -1;
//   int16_t   rv2 = -1;
//   STSchema* pSchema1 = NULL;
//   STSchema* pSchema2 = NULL;

//   int32_t pos = cur->pos;
//   cur->win = TSWINDOW_INITIALIZER;
//   bool adjustPos = false;

//   // no data in buffer, load data from file directly
//   if (pCheckInfo->iiter == NULL && pCheckInfo->iter == NULL) {
//     copyAllRemainRowsFromFileBlock(pTsdbReadHandle, pCheckInfo, &blockInfo, endPos);
//     return;
//   } else if (pCheckInfo->iter != NULL || pCheckInfo->iiter != NULL) {
//     SSkipListNode* node = NULL;
//     TSKEY          lastKeyAppend = TSKEY_INITIAL_VAL;

//     do {
//       STSRow* row2 = NULL;
//       STSRow* row1 = getSRowInTableMem(pCheckInfo, pTsdbReadHandle->order, pCfg->update, &row2, TD_VER_MAX);
//       if (row1 == NULL) {
//         break;
//       }

//       TSKEY key = TD_ROW_KEY(row1);
//       if ((key > pTsdbReadHandle->window.ekey && ascScan) || (key < pTsdbReadHandle->window.ekey && !ascScan)) {
//         break;
//       }

//       if (adjustPos) {
//         if (key == lastKeyAppend) {
//           pos -= step;
//         }
//         adjustPos = false;
//       }

//       if (((pos > endPos || tsArray[pos] > pTsdbReadHandle->window.ekey) && ascScan) ||
//           ((pos < endPos || tsArray[pos] < pTsdbReadHandle->window.ekey) && !ascScan)) {
//         break;
//       }

//       if ((key < tsArray[pos] && ascScan) || (key > tsArray[pos] && !ascScan)) {
//         if (rv1 != TD_ROW_SVER(row1)) {
//           //          pSchema1 = tsdbGetTableSchemaByVersion(pTable, memRowVersion(row1));
//           rv1 = TD_ROW_SVER(row1);
//         }
//         if (row2 && rv2 != TD_ROW_SVER(row2)) {
//           //          pSchema2 = tsdbGetTableSchemaByVersion(pTable, memRowVersion(row2));
//           rv2 = TD_ROW_SVER(row2);
//         }

//         numOfRows +=
//             mergeTwoRowFromMem(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, &curRow, row1, row2, numOfCols,
//                                pCheckInfo->tableId, pSchema1, pSchema2, pCfg->update, &lastKeyAppend);
//         if (cur->win.skey == TSKEY_INITIAL_VAL) {
//           cur->win.skey = key;
//         }

//         cur->win.ekey = key;
//         cur->lastKey = key + step;
//         cur->mixBlock = true;
//         moveToNextRowInMem(pCheckInfo);
//       } else if (key == tsArray[pos]) {  // data in buffer has the same timestamp of data in file block, ignore it
//         if (TD_SUPPORT_UPDATE(pCfg->update)) {
//           if (lastKeyAppend != key) {
//             if (lastKeyAppend != TSKEY_INITIAL_VAL) {
//               ++curRow;
//             }
//             lastKeyAppend = key;
//           }
//           // load data from file firstly
//           numOfRows = doCopyRowsFromFileBlock(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, curRow, pos, pos);

//           if (rv1 != TD_ROW_SVER(row1)) {
//             rv1 = TD_ROW_SVER(row1);
//           }
//           if (row2 && rv2 != TD_ROW_SVER(row2)) {
//             rv2 = TD_ROW_SVER(row2);
//           }

//           // still assign data into current row
//           numOfRows +=
//               mergeTwoRowFromMem(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, &curRow, row1, row2, numOfCols,
//                                  pCheckInfo->tableId, pSchema1, pSchema2, pCfg->update, &lastKeyAppend);

//           if (cur->win.skey == TSKEY_INITIAL_VAL) {
//             cur->win.skey = key;
//           }

//           cur->win.ekey = key;
//           cur->lastKey = key + step;
//           cur->mixBlock = true;

//           moveToNextRowInMem(pCheckInfo);

//           pos += step;
//           adjustPos = true;
//         } else {
//           // discard the memory record
//           moveToNextRowInMem(pCheckInfo);
//         }
//       } else if ((key > tsArray[pos] && ascScan) || (key < tsArray[pos] && !ascScan)) {
//         if (cur->win.skey == TSKEY_INITIAL_VAL) {
//           cur->win.skey = tsArray[pos];
//         }

//         int32_t end = doBinarySearchKey(pCols->cols[0].pData, pCols->numOfRows, key, order);
//         assert(end != -1);

//         if (tsArray[end] == key) {  // the value of key in cache equals to the end timestamp value, ignore it
// #if 0
//           if (pCfg->update == TD_ROW_DISCARD_UPDATE) {
//             moveToNextRowInMem(pCheckInfo);
//           } else {
//             end -= step;
//           }
// #endif
//           if (!TD_SUPPORT_UPDATE(pCfg->update)) {
//             moveToNextRowInMem(pCheckInfo);
//           } else {
//             end -= step;
//           }
//         }

//         int32_t qstart = 0, qend = 0;
//         getQualifiedRowsPos(pTsdbReadHandle, pos, end, numOfRows, &qstart, &qend);

//         if ((lastKeyAppend != TSKEY_INITIAL_VAL) && (lastKeyAppend != (ascScan ? tsArray[qstart] : tsArray[qend]))) {
//           ++curRow;
//         }

//         numOfRows = doCopyRowsFromFileBlock(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, curRow, qstart, qend);
//         pos += (qend - qstart + 1) * step;
//         if (numOfRows > 0) {
//           curRow = numOfRows - 1;
//         }

//         cur->win.ekey = ascScan ? tsArray[qend] : tsArray[qstart];
//         cur->lastKey = cur->win.ekey + step;
//         lastKeyAppend = cur->win.ekey;
//       }
//     } while (numOfRows < pTsdbReadHandle->outputCapacity);

//     if (numOfRows < pTsdbReadHandle->outputCapacity) {
//       /**
//        * if cache is empty, load remain file block data. In contrast, if there are remain data in cache, do NOT
//        * copy them all to result buffer, since it may be overlapped with file data block.
//        */
//       if (node == NULL || ((TD_ROW_KEY((STSRow*)SL_GET_NODE_DATA(node)) > pTsdbReadHandle->window.ekey) && ascScan)
//       ||
//           ((TD_ROW_KEY((STSRow*)SL_GET_NODE_DATA(node)) < pTsdbReadHandle->window.ekey) && !ascScan)) {
//         // no data in cache or data in cache is greater than the ekey of time window, load data from file block
//         if (cur->win.skey == TSKEY_INITIAL_VAL) {
//           cur->win.skey = tsArray[pos];
//         }

//         int32_t start = -1, end = -1;
//         getQualifiedRowsPos(pTsdbReadHandle, pos, endPos, numOfRows, &start, &end);

//         numOfRows = doCopyRowsFromFileBlock(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, numOfRows, start, end);
//         pos += (end - start + 1) * step;

//         cur->win.ekey = ascScan ? tsArray[end] : tsArray[start];
//         cur->lastKey = cur->win.ekey + step;
//         cur->mixBlock = true;
//       }
//     }
//   }
H
Hongze Cheng 已提交
1380

H
Hongze Cheng 已提交
1381 1382
//   cur->blockCompleted = (((pos > endPos || cur->lastKey > pTsdbReadHandle->window.ekey) && ascScan) ||
//                          ((pos < endPos || cur->lastKey < pTsdbReadHandle->window.ekey) && !ascScan));
H
Hongze Cheng 已提交
1383

H
Hongze Cheng 已提交
1384 1385 1386
//   if (!ascScan) {
//     TSWAP(cur->win.skey, cur->win.ekey);
//   }
H
Hongze Cheng 已提交
1387

H
Hongze Cheng 已提交
1388 1389
//   updateInfoAfterMerge(pTsdbReadHandle, pCheckInfo, numOfRows, pos);
//   doCheckGeneratedBlockRange(pTsdbReadHandle);
H
Hongze Cheng 已提交
1390

H
Hongze Cheng 已提交
1391 1392 1393 1394
//   tsdbDebug("%p uid:%" PRIu64 ", data block created, mixblock:%d, brange:%" PRIu64 "-%" PRIu64 " rows:%d, %s",
//             pTsdbReadHandle, pCheckInfo->tableId, cur->mixBlock, cur->win.skey, cur->win.ekey, cur->rows,
//             pTsdbReadHandle->idStr);
// }
H
Hongze Cheng 已提交
1395

H
Hongze Cheng 已提交
1396 1397 1398 1399 1400 1401 1402 1403 1404 1405 1406 1407 1408 1409 1410 1411 1412 1413 1414 1415 1416 1417 1418 1419 1420 1421 1422 1423 1424 1425 1426 1427 1428 1429 1430 1431 1432 1433 1434 1435 1436 1437 1438 1439 1440 1441 1442 1443 1444 1445 1446 1447 1448 1449 1450 1451
// int32_t binarySearchForKey(char* pValue, int num, TSKEY key, int order) {
//   int    firstPos, lastPos, midPos = -1;
//   int    numOfRows;
//   TSKEY* keyList;

//   if (num <= 0) return -1;

//   keyList = (TSKEY*)pValue;
//   firstPos = 0;
//   lastPos = num - 1;

//   if (order == TSDB_ORDER_DESC) {
//     // find the first position which is smaller than the key
//     while (1) {
//       if (key >= keyList[lastPos]) return lastPos;
//       if (key == keyList[firstPos]) return firstPos;
//       if (key < keyList[firstPos]) return firstPos - 1;

//       numOfRows = lastPos - firstPos + 1;
//       midPos = (numOfRows >> 1) + firstPos;

//       if (key < keyList[midPos]) {
//         lastPos = midPos - 1;
//       } else if (key > keyList[midPos]) {
//         firstPos = midPos + 1;
//       } else {
//         break;
//       }
//     }

//   } else {
//     // find the first position which is bigger than the key
//     while (1) {
//       if (key <= keyList[firstPos]) return firstPos;
//       if (key == keyList[lastPos]) return lastPos;

//       if (key > keyList[lastPos]) {
//         lastPos = lastPos + 1;
//         if (lastPos >= num)
//           return -1;
//         else
//           return lastPos;
//       }

//       numOfRows = lastPos - firstPos + 1;
//       midPos = (numOfRows >> 1) + firstPos;

//       if (key < keyList[midPos]) {
//         lastPos = midPos - 1;
//       } else if (key > keyList[midPos]) {
//         firstPos = midPos + 1;
//       } else {
//         break;
//       }
//     }
//   }
H
Hongze Cheng 已提交
1452

H
Hongze Cheng 已提交
1453 1454
//   return midPos;
// }
H
Hongze Cheng 已提交
1455

H
Haojun Liao 已提交
1456 1457 1458
// Release every buffer referenced by the supporter: the two per-table counter arrays, the
// wrapper array of each of the first pSup->numOfTables tables, and finally the array that
// holds those per-table pointers.
static void cleanupBlockOrderSupporter(SBlockOrderSupporter* pSup) {
  taosMemoryFreeClear(pSup->numOfBlocksPerTable);
  taosMemoryFreeClear(pSup->indexPerTable);

  int32_t tableIdx = 0;
  while (tableIdx < pSup->numOfTables) {
    SBlockOrderWrapper* pWrappers = pSup->pDataBlockInfo[tableIdx];
    taosMemoryFreeClear(pWrappers);
    tableIdx += 1;
  }

  taosMemoryFreeClear(pSup->pDataBlockInfo);
}

H
Haojun Liao 已提交
1468 1469
// Allocate the zero-filled per-table arrays of the supporter, each sized for `numOfTables`
// entries (numOfTables must be >= 1).
// pSup->numOfTables is reset to 0: it counts the tables whose block list has actually been
// attached, and cleanupBlockOrderSupporter() walks pDataBlockInfo up to that count, so it
// must be well defined before the cleanup on the failure path below can run.
// Returns TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY (with everything released) if any
// allocation fails.
static int32_t initBlockOrderSupporter(SBlockOrderSupporter* pSup, int32_t numOfTables) {
  ASSERT(numOfTables >= 1);

  pSup->numOfTables = 0;
  pSup->numOfBlocksPerTable = taosMemoryCalloc(1, sizeof(int32_t) * numOfTables);
  pSup->indexPerTable = taosMemoryCalloc(1, sizeof(int32_t) * numOfTables);
  pSup->pDataBlockInfo = taosMemoryCalloc(1, POINTER_BYTES * numOfTables);

  if (pSup->numOfBlocksPerTable == NULL || pSup->indexPerTable == NULL || pSup->pDataBlockInfo == NULL) {
    cleanupBlockOrderSupporter(pSup);
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  return TSDB_CODE_SUCCESS;
}
H
Haojun Liao 已提交
1482

H
Haojun Liao 已提交
1483
static int32_t fileDataBlockOrderCompar(const void* pLeft, const void* pRight, void* param) {
1484
  int32_t leftIndex = *(int32_t*)pLeft;
H
Haojun Liao 已提交
1485
  int32_t rightIndex = *(int32_t*)pRight;
H
Haojun Liao 已提交
1486

H
Haojun Liao 已提交
1487
  SBlockOrderSupporter* pSupporter = (SBlockOrderSupporter*)param;
H
Haojun Liao 已提交
1488

H
Haojun Liao 已提交
1489 1490
  int32_t leftTableBlockIndex = pSupporter->indexPerTable[leftIndex];
  int32_t rightTableBlockIndex = pSupporter->indexPerTable[rightIndex];
1491

H
Haojun Liao 已提交
1492 1493 1494 1495 1496 1497
  if (leftTableBlockIndex > pSupporter->numOfBlocksPerTable[leftIndex]) {
    /* left block is empty */
    return 1;
  } else if (rightTableBlockIndex > pSupporter->numOfBlocksPerTable[rightIndex]) {
    /* right block is empty */
    return -1;
1498
  }
H
Haojun Liao 已提交
1499

1500
  SBlockOrderWrapper* pLeftBlock = &pSupporter->pDataBlockInfo[leftIndex][leftTableBlockIndex];
H
Haojun Liao 已提交
1501
  SBlockOrderWrapper* pRightBlock = &pSupporter->pDataBlockInfo[rightIndex][rightTableBlockIndex];
H
Haojun Liao 已提交
1502

H
Haojun Liao 已提交
1503 1504
  return pLeftBlock->pBlock->aSubBlock[0].offset > pRightBlock->pBlock->aSubBlock[0].offset ? 1 : -1;
}
dengyihao's avatar
dengyihao 已提交
1505

H
Haojun Liao 已提交
1506
/**
 * Fill pBlockIter->blockList with one SFileDataBlockInfo per data block of the tables in
 * pReader->status.pTableMap, ordered by the file offset of each block's first sub-block, and position the
 * iterator on the first entry (ascending scan) or the last entry (descending scan).
 *
 * Tables whose pBlockList is NULL or empty are skipped. When exactly one table contributes blocks they are
 * appended in their existing order and no merge is performed.
 *
 * @param pReader     reader providing the scan order, the table map and the id string used in log output
 * @param pBlockIter  iterator to refill; its blockList is cleared first
 * @param numOfBlocks number of blocks expected over all tables (asserted against the number collected)
 * @return TSDB_CODE_SUCCESS, or an out-of-memory code when an allocation or the merge tree creation fails
 */
static int32_t initBlockIterator(STsdbReader* pReader, SDataBlockIter* pBlockIter, int32_t numOfBlocks) {
  bool asc = ASCENDING_TRAVERSE(pReader->order);

  pBlockIter->numOfBlocks = numOfBlocks;
  taosArrayClear(pBlockIter->blockList);

  // access data blocks according to the offset of each block in asc/desc order.
  int32_t numOfTables = (int32_t)taosHashGetSize(pReader->status.pTableMap);

  SBlockOrderSupporter sup = {0};

  int32_t code = initBlockOrderSupporter(&sup, numOfTables);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  // collect the block list of every table that owns at least one block
  int32_t cnt = 0;
  void*   ptr = NULL;
  while (1) {
    ptr = taosHashIterate(pReader->status.pTableMap, ptr);
    if (ptr == NULL) {
      break;
    }

    STableBlockScanInfo* pTableScanInfo = (STableBlockScanInfo*)ptr;
    if (pTableScanInfo->pBlockList == NULL || taosArrayGetSize(pTableScanInfo->pBlockList) == 0) {
      continue;
    }

    size_t num = taosArrayGetSize(pTableScanInfo->pBlockList);
    sup.numOfBlocksPerTable[sup.numOfTables] = num;

    char* buf = taosMemoryMalloc(sizeof(SBlockOrderWrapper) * num);
    if (buf == NULL) {
      // NOTE(review): the hash iteration is abandoned midway here; confirm whether the hash API requires an
      // explicit cancel call before returning.
      cleanupBlockOrderSupporter(&sup);
      return TSDB_CODE_TDB_OUT_OF_MEMORY;
    }

    sup.pDataBlockInfo[sup.numOfTables] = (SBlockOrderWrapper*)buf;
    for (int32_t k = 0; k < num; ++k) {
      SBlockOrderWrapper wrapper = {0};
      wrapper.pBlock = (SBlock*)taosArrayGet(pTableScanInfo->pBlockList, k);
      wrapper.uid = pTableScanInfo->uid;

      sup.pDataBlockInfo[sup.numOfTables][k] = wrapper;
      cnt++;
    }

    sup.numOfTables += 1;
  }

  ASSERT(numOfBlocks == cnt);

  // since there is only one table qualified, blocks are not sorted
  if (sup.numOfTables == 1) {
    for (int32_t i = 0; i < numOfBlocks; ++i) {
      SFileDataBlockInfo blockInfo = {.uid = sup.pDataBlockInfo[0][i].uid, .tbBlockIdx = i};
      taosArrayPush(pBlockIter->blockList, &blockInfo);
    }
    tsdbDebug("%p create blocks info struct completed for one table, %d blocks not sorted %s", pReader, cnt,
              pReader->idStr);

    pBlockIter->index = asc ? 0 : (numOfBlocks - 1);

    // the supporter arrays are only scratch space; release them on this path as well
    cleanupBlockOrderSupporter(&sup);
    return TSDB_CODE_SUCCESS;
  }

  tsdbDebug("%p create data blocks info struct completed, %d blocks in %d tables %s", pReader, cnt, sup.numOfTables,
            pReader->idStr);

  ASSERT(cnt <= numOfBlocks && sup.numOfTables <= numOfTables);

  // keep the full-width return code so that no error value can be truncated to zero
  SMultiwayMergeTreeInfo* pTree = NULL;
  int32_t                 ret = tMergeTreeCreate(&pTree, sup.numOfTables, &sup, fileDataBlockOrderCompar);
  if (ret != TSDB_CODE_SUCCESS) {
    cleanupBlockOrderSupporter(&sup);
    return TSDB_CODE_TDB_OUT_OF_MEMORY;
  }

  // repeatedly take the table whose current block has the smallest offset
  int32_t numOfTotal = 0;
  while (numOfTotal < cnt) {
    int32_t pos = tMergeTreeGetChosenIndex(pTree);
    int32_t index = sup.indexPerTable[pos]++;

    SFileDataBlockInfo blockInfo = {.uid = sup.pDataBlockInfo[pos][index].uid, .tbBlockIdx = index};
    taosArrayPush(pBlockIter->blockList, &blockInfo);

    // set data block index overflow, in order to disable the offset comparator
    if (sup.indexPerTable[pos] >= sup.numOfBlocksPerTable[pos]) {
      sup.indexPerTable[pos] = sup.numOfBlocksPerTable[pos] + 1;
    }

    numOfTotal += 1;
    tMergeTreeAdjust(pTree, tMergeTreeGetAdjustIndex(pTree));
  }

  tsdbDebug("%p %d data blocks sort completed, %s", pReader, cnt, pReader->idStr);
  cleanupBlockOrderSupporter(&sup);
  taosMemoryFree(pTree);

  pBlockIter->index = asc ? 0 : (numOfBlocks - 1);
  return TSDB_CODE_SUCCESS;
}
1608

H
Haojun Liao 已提交
1609
/**
 * Advance the block iterator by one entry in its scan direction.
 *
 * @return false (index untouched) when the iterator already sits on the last entry of an ascending scan or
 *         on index 0 (or below) of a descending scan; true after the index has been moved
 */
static bool blockIteratorNext(SDataBlockIter* pBlockIter) {
  if (ASCENDING_TRAVERSE(pBlockIter->order)) {
    if (pBlockIter->index >= pBlockIter->numOfBlocks - 1) {
      return false;
    }

    pBlockIter->index += 1;
  } else {
    if (pBlockIter->index <= 0) {
      return false;
    }

    pBlockIter->index -= 1;
  }

  return true;
}

1621 1622 1623
/**
 * This is an two rectangles overlap cases.
 */
1624
static int32_t dataBlockPartiallyRequired(STimeWindow* pWindow, SVersionRange* pVerRange, SBlock* pBlock) {
1625 1626 1627 1628
  return (pWindow->ekey < pBlock->maxKey.ts && pWindow->ekey >= pBlock->minKey.ts) ||
         (pWindow->skey > pBlock->minKey.ts && pWindow->skey <= pBlock->maxKey.ts) ||
         (pVerRange->minVer > pBlock->minVersion && pVerRange->minVer <= pBlock->maxVersion) ||
         (pVerRange->maxVer < pBlock->maxVersion && pVerRange->maxVer >= pBlock->minVersion);
H
Haojun Liao 已提交
1629
}
H
Haojun Liao 已提交
1630

H
Haojun Liao 已提交
1631 1632 1633 1634
// Return the block list entry the iterator currently points at (whatever taosArrayGet yields for index).
static SFileDataBlockInfo* getCurrentBlockInfo(SDataBlockIter* pBlockIter) {
  return (SFileDataBlockInfo*)taosArrayGet(pBlockIter->blockList, pBlockIter->index);
}
H
Haojun Liao 已提交
1635

1636 1637
/**
 * Look up the block that follows pFBlockInfo in the same table's block list, in scan direction.
 *
 * @param pFBlockInfo         current block; its tbBlockIdx is the position inside the table's block list
 * @param pTableBlockScanInfo table whose pBlockList is consulted
 * @param nextIndex           out: position of the neighbour inside pBlockList (written only when one exists)
 * @param order               scan order
 * @return the neighbouring block, or NULL when the current block is the last one in scan direction
 */
static SBlock* getNeighborBlockOfSameTable(SFileDataBlockInfo* pFBlockInfo, STableBlockScanInfo* pTableBlockScanInfo,
                                           int32_t* nextIndex, int32_t order) {
  if (ASCENDING_TRAVERSE(order)) {
    // already at the tail of the list
    if (pFBlockInfo->tbBlockIdx >= taosArrayGetSize(pTableBlockScanInfo->pBlockList) - 1) {
      return NULL;
    }

    *nextIndex = pFBlockInfo->tbBlockIdx + 1;
  } else {
    // already at the head of the list
    if (pFBlockInfo->tbBlockIdx == 0) {
      return NULL;
    }

    *nextIndex = pFBlockInfo->tbBlockIdx - 1;
  }

  return taosArrayGet(pTableBlockScanInfo->pBlockList, *nextIndex);
}
1653

1654 1655
/**
 * Search the iterator's block list, starting at its current index and walking in scan direction, for the
 * entry with the same uid and tbBlockIdx as pFBlockInfo.
 *
 * @return position of the matching entry; a miss trips ASSERT(0) and yields -1
 */
static int32_t findFileBlockInfoIndex(SDataBlockIter* pBlockIter, SFileDataBlockInfo* pFBlockInfo) {
  ASSERT(pBlockIter != NULL && pFBlockInfo != NULL);

  const int32_t step = ASCENDING_TRAVERSE(pBlockIter->order) ? 1 : -1;

  for (int32_t pos = pBlockIter->index; pos < pBlockIter->numOfBlocks && pos >= 0; pos += step) {
    SFileDataBlockInfo* pCandidate = taosArrayGet(pBlockIter->blockList, pos);

    bool sameTable = (pCandidate->uid == pFBlockInfo->uid);
    if (sameTable && pCandidate->tbBlockIdx == pFBlockInfo->tbBlockIdx) {
      return pos;
    }
  }

  ASSERT(0);
  return -1;
}

1673
/**
 * Advance the iterator by step and make the entry found at position index the iterator's current entry,
 * moving it inside the block list when it is not already at the new current position.
 *
 * @param pBlockIter iterator whose index and blockList are updated
 * @param index      position of the entry that has to become current
 * @param step       amount added to the iterator index
 * @return -1 when index is outside [0, numOfBlocks), otherwise TSDB_CODE_SUCCESS
 */
static int32_t setFileBlockActiveInBlockIter(SDataBlockIter* pBlockIter, int32_t index, int32_t step) {
  bool outOfRange = (index < 0) || (index >= pBlockIter->numOfBlocks);
  if (outOfRange) {
    return -1;
  }

  // copy the entry out first: removing it from the list invalidates the slot
  SFileDataBlockInfo moved = *(SFileDataBlockInfo*)taosArrayGet(pBlockIter->blockList, index);
  pBlockIter->index += step;

  if (index == pBlockIter->index) {
    return TSDB_CODE_SUCCESS;
  }

  taosArrayRemove(pBlockIter->blockList, index);
  taosArrayInsert(pBlockIter->blockList, pBlockIter->index, &moved);

  SFileDataBlockInfo* pPlaced = taosArrayGet(pBlockIter->blockList, pBlockIter->index);
  ASSERT(pPlaced->uid == moved.uid && pPlaced->tbBlockIdx == moved.tbBlockIdx);

  return TSDB_CODE_SUCCESS;
}
1691

1692 1693 1694 1695 1696 1697 1698
/**
 * Report whether a block touches its neighbour in scan direction: for an ascending scan the block's largest
 * timestamp equals the neighbour's smallest one, for a descending scan the block's smallest timestamp
 * equals the neighbour's largest one.
 */
static bool overlapWithNeighborBlock(SBlock* pBlock, SBlock* pNeighbor, int32_t order) {
  return ASCENDING_TRAVERSE(order) ? (pBlock->maxKey.ts == pNeighbor->minKey.ts)
                                   : (pBlock->minKey.ts == pNeighbor->maxKey.ts);
}
H
Haojun Liao 已提交
1700

1701
/**
 * Report whether the buffer key lies at or before the block in scan direction: key.ts <= minKey.ts for an
 * ascending scan, key.ts >= maxKey.ts for a descending scan. A key still holding TSKEY_INITIAL_VAL never
 * qualifies.
 */
static bool bufferDataInFileBlockGap(int32_t order, TSDBKEY key, SBlock* pBlock) {
  if (key.ts == TSKEY_INITIAL_VAL) {
    return false;
  }

  if (ASCENDING_TRAVERSE(order)) {
    return key.ts <= pBlock->minKey.ts;
  }

  return key.ts >= pBlock->maxKey.ts;
}
1707

H
Haojun Liao 已提交
1708
/**
 * Report whether key.ts falls inside the block's timestamp range while the block's version range
 * intersects the requested version range.
 */
static bool keyOverlapFileBlock(TSDBKEY key, SBlock* pBlock, SVersionRange* pVerRange) {
  bool tsInsideBlock = (key.ts >= pBlock->minKey.ts) && (key.ts <= pBlock->maxKey.ts);
  bool versionsIntersect = (pBlock->maxVersion >= pVerRange->minVer) && (pBlock->minVersion <= pVerRange->maxVer);

  return tsInsideBlock && versionsIntersect;
}
H
Haojun Liao 已提交
1712

1713 1714 1715 1716
// 1. the version of all rows should be less than the endVersion
// 2. current block should not overlap with next neighbor block
// 3. current timestamp should not be overlap with each other
// 4. output buffer should be large enough to hold all rows in current block
1717 1718
static bool fileBlockShouldLoad(STsdbReader* pReader, SFileDataBlockInfo* pFBlock, SBlock* pBlock,
                                STableBlockScanInfo* pScanInfo, TSDBKEY key) {
1719 1720
  int32_t neighborIndex = 0;
  SBlock* pNeighbor = getNeighborBlockOfSameTable(pFBlock, pScanInfo, &neighborIndex, pReader->order);
H
Haojun Liao 已提交
1721

1722 1723 1724 1725
  bool overlapWithNeighbor = false;
  if (pNeighbor) {
    overlapWithNeighbor = overlapWithNeighborBlock(pBlock, pNeighbor, pReader->order);
  }
1726

1727 1728 1729
  bool hasDup = false;
  if (pBlock->nSubBlock == 1) {
    hasDup = pBlock->hasDup;
1730
  } else {
1731
    hasDup = true;
1732
  }
H
Haojun Liao 已提交
1733

1734 1735
  return (overlapWithNeighbor || hasDup || dataBlockPartiallyRequired(&pReader->window, &pReader->verRange, pBlock) ||
          keyOverlapFileBlock(key, pBlock, &pReader->verRange) || (pBlock->nRow > pReader->capacity));
1736 1737
}

1738
/**
 * Fill the reader's result block from the table's in-memory iterators via buildDataBlockFromBufImpl, then
 * refresh the block's timestamp window, stamp it with the table uid and flag it as a composed block.
 *
 * @param pReader        reader owning the result block
 * @param pBlockScanInfo table whose mem/imem iterators are consumed
 * @param endKey         bound handed to buildDataBlockFromBufImpl
 * @return TSDB_CODE_SUCCESS when neither iterator has a value, otherwise the code of buildDataBlockFromBufImpl
 */
static int32_t buildDataBlockFromBuf(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo, int64_t endKey) {
  bool bufHasRows = pBlockScanInfo->iiter.hasVal || pBlockScanInfo->iter.hasVal;
  if (!bufHasRows) {
    return TSDB_CODE_SUCCESS;
  }

  SSDataBlock* pResBlock = pReader->pResBlock;

  int64_t startUs = taosGetTimestampUs();
  int32_t code = buildDataBlockFromBufImpl(pBlockScanInfo, endKey, pReader->capacity, pReader);

  blockDataUpdateTsWindow(pResBlock, 0);
  pResBlock->info.uid = pBlockScanInfo->uid;

  setComposedBlockFlag(pReader, true);

  int64_t elapsedTime = taosGetTimestampUs() - startUs;
  tsdbDebug("%p build data block from cache completed, elapsed time:%" PRId64
            " us, numOfRows:%d, numOfCols:%d, brange: %" PRId64 " - %" PRId64 " %s",
            pReader, elapsedTime, pResBlock->info.rows, (int32_t)blockDataGetNumOfCols(pResBlock),
            pResBlock->info.window.skey, pResBlock->info.window.ekey, pReader->idStr);
  return code;
}

1761
static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo, TSDBROW* pRow,
1762
                                     STSRow* pTSRow, SIterInfo* pIter, int64_t key) {
1763 1764 1765
  SRowMerger          merge = {0};
  SBlockData*         pBlockData = &pReader->status.fileBlockData;
  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
H
Haojun Liao 已提交
1766

1767
  TSDBKEY k = TSDBROW_KEY(pRow);
1768
  TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
1769
  SArray* pDelList = pBlockScanInfo->delSkyline;
1770

1771 1772 1773 1774
  // ascending order traverse
  if (ASCENDING_TRAVERSE(pReader->order)) {
    if (key < k.ts) {
      tRowMergerInit(&merge, &fRow, pReader->pSchema);
1775

1776 1777 1778
      doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
      tRowMergerGetRow(&merge, &pTSRow);
    } else if (k.ts < key) {  // k.ts < key
1779
      doMergeMultiRows(pRow, pBlockScanInfo->uid, pIter, pDelList, &pTSRow, pReader);
1780 1781 1782
    } else {  // k.ts == key, ascending order: file block ----> imem rows -----> mem rows
      tRowMergerInit(&merge, &fRow, pReader->pSchema);
      doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
1783

1784
      tRowMerge(&merge, pRow);
1785
      doMergeRowsInBuf(pIter, k.ts, pBlockScanInfo->delSkyline, &merge, pReader);
1786

1787
      tRowMergerGetRow(&merge, &pTSRow);
1788
    }
1789 1790
  } else {  // descending order scan
    if (key < k.ts) {
1791
      doMergeMultiRows(pRow, pBlockScanInfo->uid, pIter, pDelList, &pTSRow, pReader);
1792 1793
    } else if (k.ts < key) {
      tRowMergerInit(&merge, &fRow, pReader->pSchema);
1794

1795 1796 1797 1798
      doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
      tRowMergerGetRow(&merge, &pTSRow);
    } else {  // descending order: mem rows -----> imem rows ------> file block
      updateSchema(pRow, pBlockScanInfo->uid, pReader);
1799

1800
      tRowMergerInit(&merge, pRow, pReader->pSchema);
1801
      doMergeRowsInBuf(pIter, k.ts, pBlockScanInfo->delSkyline, &merge, pReader);
1802

1803 1804 1805 1806
      tRowMerge(&merge, &fRow);
      doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);

      tRowMergerGetRow(&merge, &pTSRow);
1807 1808
    }
  }
1809

1810
  tRowMergerClear(&merge);
1811 1812
  doAppendOneRow(pReader->pResBlock, pReader, pTSRow);
  return TSDB_CODE_SUCCESS;
1813 1814
}

1815 1816 1817
/**
 * Emit one output row from three sources of the same table: the current row of the loaded file block, the
 * current row of the mem iterator and the current row of the imem iterator. Both in-memory iterators must
 * currently yield a row (asserted).
 *
 * The source holding the next timestamp in scan order is emitted; sources sharing that timestamp are merged
 * into a single row, in the order file -> imem -> mem for an ascending scan and mem -> imem -> file for a
 * descending scan. Every branch leaves its result in pTSRow; the shared tail releases the merger and appends
 * the row to the result block.
 *
 * @param pReader        reader owning the file block data, schema and result block
 * @param pBlockScanInfo table being scanned, including its mem/imem iterators and delete skyline
 * @return always TSDB_CODE_SUCCESS
 */
static int32_t doMergeThreeLevelRows(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo) {
  SRowMerger merge = {0};
  STSRow*    pTSRow = NULL;
  TSDBROW    fRow = {0};

  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
  SBlockData*         pBlockData = &pReader->status.fileBlockData;
  SArray*             pDelList = pBlockScanInfo->delSkyline;

  TSDBROW* pRow = getValidRow(&pBlockScanInfo->iter, pDelList, pReader);
  TSDBROW* piRow = getValidRow(&pBlockScanInfo->iiter, pDelList, pReader);
  ASSERT(pRow != NULL && piRow != NULL);

  // timestamp of the current file block row
  int64_t key = pBlockData->aTSKEY[pDumpInfo->rowIndex];

  uint64_t uid = pBlockScanInfo->uid;

  TSDBKEY k = TSDBROW_KEY(pRow);
  TSDBKEY ik = TSDBROW_KEY(piRow);

  if (ASCENDING_TRAVERSE(pReader->order)) {
    if (key <= k.ts && key <= ik.ts) {
      // [1&2] key <= [k.ts && ik.ts]: the file row comes first, buffer rows with the same ts are merged in
      fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
      tRowMergerInit(&merge, &fRow, pReader->pSchema);

      doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);

      if (ik.ts == key) {
        tRowMerge(&merge, piRow);
        doMergeRowsInBuf(&pBlockScanInfo->iiter, key, pDelList, &merge, pReader);
      }

      if (k.ts == key) {
        tRowMerge(&merge, pRow);
        doMergeRowsInBuf(&pBlockScanInfo->iter, key, pDelList, &merge, pReader);
      }

      tRowMergerGetRow(&merge, &pTSRow);
    } else if (ik.ts < k.ts) {
      // key > ik.ts || key > k.ts holds from here on; key may still equal the larger buffer timestamp
      // [3] ik.ts < key <= k.ts
      // [4] ik.ts < k.ts <= key
      doMergeMultiRows(piRow, uid, &pBlockScanInfo->iiter, pDelList, &pTSRow, pReader);
    } else if (k.ts < ik.ts) {
      // [5] k.ts < key   <= ik.ts
      // [6] k.ts < ik.ts <= key
      doMergeMultiRows(pRow, uid, &pBlockScanInfo->iter, pDelList, &pTSRow, pReader);
    } else {
      // [7] k.ts == ik.ts < key
      ASSERT(key > ik.ts && key > k.ts);

      doMergeMemIMemRows(pRow, piRow, pBlockScanInfo, pReader, &pTSRow);
    }
  } else {  // descending order scan
    if (k.ts >= ik.ts && k.ts >= key) {
      // [1/2] k.ts >= ik.ts && k.ts >= key: the mem row comes first
      updateSchema(pRow, uid, pReader);

      // NOTE(review): the file timestamp `key` (not k.ts) is handed to doMergeRowsInBuf in this branch even
      // when k.ts > key; kept as found, confirm against doMergeRowsInBuf.
      tRowMergerInit(&merge, pRow, pReader->pSchema);
      doMergeRowsInBuf(&pBlockScanInfo->iter, key, pDelList, &merge, pReader);

      if (ik.ts == k.ts) {
        tRowMerge(&merge, piRow);
        doMergeRowsInBuf(&pBlockScanInfo->iiter, key, pDelList, &merge, pReader);
      }

      if (k.ts == key) {
        fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
        tRowMerge(&merge, &fRow);
        doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
      }

      tRowMergerGetRow(&merge, &pTSRow);
    } else if (ik.ts > key) {
      // k.ts < ik.ts || k.ts < key holds from here on; k.ts may still equal ik.ts when both are below key
      // [3] ik.ts > k.ts >= Key
      // [4] ik.ts > key >= k.ts
      doMergeMultiRows(piRow, uid, &pBlockScanInfo->iiter, pDelList, &pTSRow, pReader);
    } else if (key > ik.ts) {
      // [5] key > ik.ts > k.ts
      // [6] key > k.ts > ik.ts
      fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
      tRowMergerInit(&merge, &fRow, pReader->pSchema);

      doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
      tRowMergerGetRow(&merge, &pTSRow);
    } else {
      // [7] key = ik.ts > k.ts: imem rows ----> file block, all through the same initialised merger
      updateSchema(piRow, uid, pReader);

      tRowMergerInit(&merge, piRow, pReader->pSchema);
      doMergeRowsInBuf(&pBlockScanInfo->iiter, ik.ts, pDelList, &merge, pReader);

      fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
      tRowMerge(&merge, &fRow);
      doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);

      tRowMergerGetRow(&merge, &pTSRow);
    }
  }

  // NOTE(review): the row stored in pTSRow is not released in this function; confirm who owns it.
  tRowMergerClear(&merge);
  doAppendOneRow(pReader->pResBlock, pReader, pTSRow);
  return TSDB_CODE_SUCCESS;
}
1944

dengyihao's avatar
dengyihao 已提交
1945 1946
/**
 * Check whether the file block row at pDumpInfo->rowIndex is visible to the query: its version must lie in
 * the reader's version range, its timestamp in the reader's time window, and hasBeenDropped must not report
 * it as deleted according to the table's delete skyline.
 */
static bool isValidFileBlockRow(SBlockData* pBlockData, SFileBlockDumpInfo* pDumpInfo,
                                STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader) {
  int64_t ver = pBlockData->aVersion[pDumpInfo->rowIndex];
  bool    versionVisible = (ver >= pReader->verRange.minVer) && (ver <= pReader->verRange.maxVer);
  if (!versionVisible) {
    return false;
  }

  int64_t ts = pBlockData->aTSKEY[pDumpInfo->rowIndex];
  bool    tsInWindow = (ts >= pReader->window.skey) && (ts <= pReader->window.ekey);
  if (!tsInWindow) {
    return false;
  }

  TSDBKEY rowKey = {.ts = ts, .version = ver};
  return !hasBeenDropped(pBlockScanInfo->delSkyline, &pBlockScanInfo->fileDelIndex, &rowKey);
}
H
Haojun Liao 已提交
1965

1966
// Return true when ts lies outside the closed interval [pWindow->skey, pWindow->ekey].
static bool outOfTimeWindow(int64_t ts, STimeWindow* pWindow) {
  bool insideWindow = (ts >= pWindow->skey) && (ts <= pWindow->ekey);
  return !insideWindow;
}
H
Haojun Liao 已提交
1967

1968 1969 1970
/**
 * Append one merged row to the reader's result block, choosing the merge routine from the sources that
 * currently hold data:
 *  - mem and imem both have a value: three-level merge (file + imem + mem)
 *  - only imem, or only mem, has a value: two-level merge of that buffer with the file block
 *  - neither has a value: the file block row (merged by doMergeRowsInFileBlocks) is emitted alone
 *
 * @param pReader        reader owning the file block data, schema and result block
 * @param pBlockScanInfo table being scanned
 * @return the code of the selected merge routine; TSDB_CODE_SUCCESS on the file-only path
 */
static int32_t buildComposedDataBlockImpl(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo) {
  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
  SBlockData*         pBlockData = &pReader->status.fileBlockData;

  SRowMerger merge = {0};
  STSRow*    pTSRow = NULL;

  // timestamp of the current file block row, and the current row of each buffer iterator
  int64_t  key = pBlockData->aTSKEY[pDumpInfo->rowIndex];
  TSDBROW* pRow = getValidRow(&pBlockScanInfo->iter, pBlockScanInfo->delSkyline, pReader);
  TSDBROW* piRow = getValidRow(&pBlockScanInfo->iiter, pBlockScanInfo->delSkyline, pReader);

  if (pBlockScanInfo->iter.hasVal && pBlockScanInfo->iiter.hasVal) {
    return doMergeThreeLevelRows(pReader, pBlockScanInfo);
  }

  // imem + file
  if (pBlockScanInfo->iiter.hasVal) {
    return doMergeBufAndFileRows(pReader, pBlockScanInfo, piRow, pTSRow, &pBlockScanInfo->iiter, key);
  }

  // mem + file
  if (pBlockScanInfo->iter.hasVal) {
    return doMergeBufAndFileRows(pReader, pBlockScanInfo, pRow, pTSRow, &pBlockScanInfo->iter, key);
  }

  // imem & mem are all empty, only file exist
  TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
  tRowMergerInit(&merge, &fRow, pReader->pSchema);
  doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
  tRowMergerGetRow(&merge, &pTSRow);

  // release the merger as doMergeBufAndFileRows does once the row has been extracted
  // NOTE(review): the row stored in pTSRow is not released in this function; confirm who owns it.
  tRowMergerClear(&merge);
  doAppendOneRow(pReader->pResBlock, pReader, pTSRow);

  return TSDB_CODE_SUCCESS;
}
2002

2003
/**
 * Fill the reader's result block with rows composed from the loaded file block and the table's in-memory
 * buffers. Rows are produced one at a time until the file block is consumed in scan direction or the result
 * block holds pReader->capacity rows. File rows rejected by isValidFileBlockRow are skipped.
 *
 * Afterwards the result block is stamped with the table uid, its timestamp window is refreshed and it is
 * flagged as a composed block.
 *
 * @param pReader        reader owning the file block data and the result block
 * @param pBlockScanInfo table being scanned
 * @return TSDB_CODE_SUCCESS, or the first error reported by buildComposedDataBlockImpl
 */
static int32_t buildComposedDataBlock(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo) {
  SSDataBlock* pResBlock = pReader->pResBlock;

  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
  SBlockData*         pBlockData = &pReader->status.fileBlockData;
  int32_t             step = ASCENDING_TRAVERSE(pReader->order) ? 1 : -1;

  while (1) {
    // skip file rows that are not visible to this query
    if (!isValidFileBlockRow(pBlockData, pDumpInfo, pBlockScanInfo, pReader)) {
      pDumpInfo->rowIndex += step;

      SFileDataBlockInfo* pFBlock = getCurrentBlockInfo(&pReader->status.blockIter);
      SBlock*             pBlock = taosArrayGet(pBlockScanInfo->pBlockList, pFBlock->tbBlockIdx);

      if (pDumpInfo->rowIndex >= pBlock->nRow || pDumpInfo->rowIndex < 0) {
        setBlockAllDumped(pDumpInfo, pBlock, pReader->order);
        break;
      }

      continue;
    }

    int32_t code = buildComposedDataBlockImpl(pReader, pBlockScanInfo);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    // the current block is looked up again on every pass instead of being cached across the merge call
    SFileDataBlockInfo* pFBlock = getCurrentBlockInfo(&pReader->status.blockIter);
    SBlock*             pBlock = taosArrayGet(pBlockScanInfo->pBlockList, pFBlock->tbBlockIdx);

    // currently loaded file data block is consumed
    if (pDumpInfo->rowIndex >= pBlock->nRow || pDumpInfo->rowIndex < 0) {
      setBlockAllDumped(pDumpInfo, pBlock, pReader->order);
      break;
    }

    if (pResBlock->info.rows >= pReader->capacity) {
      break;
    }
  }

  pResBlock->info.uid = pBlockScanInfo->uid;
  blockDataUpdateTsWindow(pResBlock, 0);

  setComposedBlockFlag(pReader, true);

  // window keys are signed timestamps: print them with PRId64, as the other block-range logs in this file do
  tsdbDebug("%p uid:%" PRIu64 ", composed data block created, brange:%" PRId64 "-%" PRId64 " rows:%d, %s", pReader,
            pBlockScanInfo->uid, pResBlock->info.window.skey, pResBlock->info.window.ekey, pResBlock->info.rows,
            pReader->idStr);

  return TSDB_CODE_SUCCESS;
}

2056
// Record in the reader status whether the current result block was composed from several row sources.
void setComposedBlockFlag(STsdbReader* pReader, bool composed) {
  pReader->status.composedDataBlock = composed;
}
2057

2058
/**
 * Create, once per table, the iterators over the table's data in the mem and imem tables and build the
 * table's delete skyline. Does nothing when pBlockScanInfo->iterInit is already set.
 *
 * The iterators start at (window.skey, verRange.minVer) and move forward for an ascending scan; they start
 * at (window.ekey, verRange.maxVer) and move backward for a descending scan.
 *
 * @param pBlockScanInfo table whose iter/iiter and delSkyline are initialised
 * @param pReader        reader providing the tsdb handle, time window, version range and scan order
 * @return TSDB_CODE_SUCCESS, or the error of tsdbTbDataIterCreate when an iterator cannot be created
 */
static int32_t initMemDataIterator(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader) {
  if (pBlockScanInfo->iterInit) {
    return TSDB_CODE_SUCCESS;
  }

  int32_t code = TSDB_CODE_SUCCESS;

  TSDBKEY startKey = {0};
  if (ASCENDING_TRAVERSE(pReader->order)) {
    startKey = (TSDBKEY){.ts = pReader->window.skey, .version = pReader->verRange.minVer};
  } else {
    startKey = (TSDBKEY){.ts = pReader->window.ekey, .version = pReader->verRange.maxVer};
  }

  int32_t backward = (!ASCENDING_TRAVERSE(pReader->order));

  // iterator over the table's rows in the mem table
  STbData* d = NULL;
  if (pReader->pTsdb->mem != NULL) {
    tsdbGetTbDataFromMemTable(pReader->pTsdb->mem, pReader->suid, pBlockScanInfo->uid, &d);
    if (d != NULL) {
      code = tsdbTbDataIterCreate(d, &startKey, backward, &pBlockScanInfo->iter.iter);
      if (code == TSDB_CODE_SUCCESS) {
        pBlockScanInfo->iter.hasVal = (tsdbTbDataIterGet(pBlockScanInfo->iter.iter) != NULL);

        tsdbDebug("%p uid:%" PRId64 ", check data in mem from skey:%" PRId64 ", order:%d, ts range in buf:%" PRId64
                  "-%" PRId64 " %s",
                  pReader, pBlockScanInfo->uid, startKey.ts, pReader->order, d->minKey, d->maxKey, pReader->idStr);
      } else {
        tsdbError("%p uid:%" PRId64 ", failed to create iterator for mem, code:%s, %s", pReader, pBlockScanInfo->uid,
                  tstrerror(code), pReader->idStr);
        return code;
      }
    }
  } else {
    tsdbDebug("%p uid:%" PRId64 ", no data in mem, %s", pReader, pBlockScanInfo->uid, pReader->idStr);
  }

  // iterator over the table's rows in the imem table
  STbData* di = NULL;
  if (pReader->pTsdb->imem != NULL) {
    tsdbGetTbDataFromMemTable(pReader->pTsdb->imem, pReader->suid, pBlockScanInfo->uid, &di);
    if (di != NULL) {
      code = tsdbTbDataIterCreate(di, &startKey, backward, &pBlockScanInfo->iiter.iter);
      if (code == TSDB_CODE_SUCCESS) {
        pBlockScanInfo->iiter.hasVal = (tsdbTbDataIterGet(pBlockScanInfo->iiter.iter) != NULL);

        tsdbDebug("%p uid:%" PRId64 ", check data in imem from skey:%" PRId64 ", order:%d, ts range in buf:%" PRId64
                  "-%" PRId64 " %s",
                  pReader, pBlockScanInfo->uid, startKey.ts, pReader->order, di->minKey, di->maxKey, pReader->idStr);
      } else {
        tsdbError("%p uid:%" PRId64 ", failed to create iterator for imem, code:%s, %s", pReader, pBlockScanInfo->uid,
                  tstrerror(code), pReader->idStr);
        return code;
      }
    }
  } else {
    tsdbDebug("%p uid:%" PRId64 ", no data in imem, %s", pReader, pBlockScanInfo->uid, pReader->idStr);
  }

  // NOTE(review): the return code of initDelSkylineIterator is not examined here.
  initDelSkylineIterator(pBlockScanInfo, pReader, d, di);

  pBlockScanInfo->iterInit = true;
  return TSDB_CODE_SUCCESS;
}
2121

dengyihao's avatar
dengyihao 已提交
2122 2123
/**
 * Build the delete skyline of a table, once, from the delete records found in the delete file and in the
 * given mem/imem table data, and initialise the delete cursors (iter.index, iiter.index, fileDelIndex) for
 * the reader's scan direction. Does nothing when the skyline already exists.
 *
 * The skyline array is only created when at least one delete record was collected.
 *
 * @param pBlockScanInfo table whose delSkyline and delete cursors are set
 * @param pReader        reader providing the tsdb handle, suid and scan order
 * @param pMemTbData     table data in the mem table, may be NULL
 * @param piMemTbData    table data in the imem table, may be NULL
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_OUT_OF_MEMORY, or the error of a failing delete file / skyline call
 */
int32_t initDelSkylineIterator(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader, STbData* pMemTbData,
                               STbData* piMemTbData) {
  if (pBlockScanInfo->delSkyline != NULL) {
    return TSDB_CODE_SUCCESS;
  }

  int32_t code = 0;
  STsdb*  pTsdb = pReader->pTsdb;
  SArray* aDelIdx = NULL;

  SArray* pDelData = taosArrayInit(4, sizeof(SDelData));
  if (pDelData == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  // delete records persisted in the delete file
  SDelFile* pDelFile = tsdbFSStateGetDelFile(pTsdb->fs->cState);
  if (pDelFile) {
    // NOTE(review): pDelFReader is opened here but never closed in this function; the matching close call is
    // not visible from this part of the file - confirm and release it on every path.
    SDelFReader* pDelFReader = NULL;
    code = tsdbDelFReaderOpen(&pDelFReader, pDelFile, pTsdb, NULL);
    if (code) {
      goto _err;
    }

    aDelIdx = taosArrayInit(4, sizeof(SDelIdx));
    if (aDelIdx == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto _err;
    }

    code = tsdbReadDelIdx(pDelFReader, aDelIdx, NULL);
    if (code) {
      goto _err;
    }

    SDelIdx  idx = {.suid = pReader->suid, .uid = pBlockScanInfo->uid};
    SDelIdx* pIdx = taosArraySearch(aDelIdx, &idx, tCmprDelIdx, TD_EQ);

    // the table has delete records in the file only when its index entry exists
    if (pIdx != NULL) {
      code = tsdbReadDelData(pDelFReader, pIdx, pDelData, NULL);
      if (code != TSDB_CODE_SUCCESS) {
        goto _err;
      }
    }

    taosArrayDestroy(aDelIdx);
    aDelIdx = NULL;
  }

  // delete records still held in the mem table
  SDelData* p = NULL;
  if (pMemTbData != NULL) {
    p = pMemTbData->pHead;
    while (p) {
      taosArrayPush(pDelData, p);
      p = p->pNext;
    }
  }

  // delete records still held in the imem table
  if (piMemTbData != NULL) {
    p = piMemTbData->pHead;
    while (p) {
      taosArrayPush(pDelData, p);
      p = p->pNext;
    }
  }

  if (taosArrayGetSize(pDelData) > 0) {
    pBlockScanInfo->delSkyline = taosArrayInit(4, sizeof(TSDBKEY));
    if (pBlockScanInfo->delSkyline == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto _err;
    }

    code = tsdbBuildDeleteSkyline(pDelData, 0, (int32_t)(taosArrayGetSize(pDelData) - 1), pBlockScanInfo->delSkyline);
  }

  taosArrayDestroy(pDelData);

  // cursors start at the head of the skyline for an ascending scan and at its tail for a descending scan
  pBlockScanInfo->iter.index =
      ASCENDING_TRAVERSE(pReader->order) ? 0 : taosArrayGetSize(pBlockScanInfo->delSkyline) - 1;
  pBlockScanInfo->iiter.index = pBlockScanInfo->iter.index;
  pBlockScanInfo->fileDelIndex = pBlockScanInfo->iter.index;
  return code;

_err:
  if (aDelIdx != NULL) {
    taosArrayDestroy(aDelIdx);
  }
  taosArrayDestroy(pDelData);
  return code;
}
2193

2194 2195
/**
 * Return the key of the next in-memory row, in scan direction, for the table owning the iterator's current
 * block: the smaller of the current mem and imem keys for an ascending scan, the larger one for a
 * descending scan. When only one of the two buffers yields a row, that row's key is returned.
 *
 * The in-memory iterators of the table are initialised on first use.
 *
 * @param pBlockIter block iterator whose current entry identifies the table
 * @param pReader    reader providing the table map and scan order
 * @return the selected key, or a key whose ts is TSKEY_INITIAL_VAL when neither buffer yields a row
 */
static TSDBKEY getCurrentKeyInBuf(SDataBlockIter* pBlockIter, STsdbReader* pReader) {
  TSDBKEY key = {.ts = TSKEY_INITIAL_VAL};
  bool    hasKey = false;

  SFileDataBlockInfo*  pFBlock = getCurrentBlockInfo(pBlockIter);
  STableBlockScanInfo* pScanInfo = taosHashGet(pReader->status.pTableMap, &pFBlock->uid, sizeof(pFBlock->uid));

  // NOTE(review): the return code of initMemDataIterator is not examined here.
  initMemDataIterator(pScanInfo, pReader);

  TSDBROW* pRow = getValidRow(&pScanInfo->iter, pScanInfo->delSkyline, pReader);
  if (pRow != NULL) {
    key = TSDBROW_KEY(pRow);
    hasKey = true;
  }

  pRow = getValidRow(&pScanInfo->iiter, pScanInfo->delSkyline, pReader);
  if (pRow != NULL) {
    TSDBKEY ik = TSDBROW_KEY(pRow);

    // without a mem key the imem key always wins; otherwise take the one that comes first in scan order
    bool imemFirst = ASCENDING_TRAVERSE(pReader->order) ? (ik.ts < key.ts) : (ik.ts > key.ts);
    if (!hasKey || imemFirst) {
      key = ik;
    }
  }

  return key;
}
2216

H
Haojun Liao 已提交
2217 2218 2219 2220
/**
 * Advance the fileset iterator until a file is found in which doLoadFileBlock reports at least one valid
 * table, or until no file is left.
 *
 * @param pReader     reader owning the fileset iterator and the file reader
 * @param numOfBlocks out: block count reported by doLoadFileBlock for the file that was loaded last
 * @return TSDB_CODE_SUCCESS (also when the files are exhausted), TSDB_CODE_OUT_OF_MEMORY, or the error of
 *         doLoadBlockIndex / doLoadFileBlock
 */
static int32_t moveToNextFile(STsdbReader* pReader, int32_t* numOfBlocks) {
  SReaderStatus* pStatus = &pReader->status;
  int32_t        code = TSDB_CODE_SUCCESS;

  // one scratch list is reused for every file and released on every exit path
  SArray* pIndexList = taosArrayInit(4, sizeof(SBlockIdx));
  if (pIndexList == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  while (1) {
    bool hasNext = filesetIteratorNext(&pStatus->fileIter, pReader);
    if (!hasNext) {  // no data files on disk
      break;
    }

    taosArrayClear(pIndexList);
    code = doLoadBlockIndex(pReader, pReader->pFileReader, pIndexList);
    if (code != TSDB_CODE_SUCCESS) {
      break;
    }

    if (taosArrayGetSize(pIndexList) > 0) {
      uint32_t numOfValidTable = 0;
      code = doLoadFileBlock(pReader, pIndexList, &numOfValidTable, numOfBlocks);
      if (code != TSDB_CODE_SUCCESS) {
        break;
      }

      if (numOfValidTable > 0) {
        break;
      }
    }

    // no blocks in current file, try next files
  }

  taosArrayDestroy(pIndexList);
  return code;
}
2249

2250 2251
// Produce the next result block for the file block the block iterator currently points at. One of three
// paths is taken:
//   1. fileBlockShouldLoad() is true: the file block data is loaded and a composed block is built;
//   2. bufferDataInFileBlockGap() is true: rows are taken from the in-memory buffer only, bounded by the
//      min key (ascending) or max key (descending) of the file block;
//   3. otherwise the result block info is filled from the file block header alone and the block is marked
//      as completely dumped.
// Returns TSDB_CODE_SUCCESS or the error code of the failing step.
static int32_t doBuildDataBlock(STsdbReader* pReader) {
  SReaderStatus*  pStatus = &pReader->status;
  SDataBlockIter* pBlockIter = &pStatus->blockIter;

  SFileDataBlockInfo*  pBlockInfo = getCurrentBlockInfo(pBlockIter);
  STableBlockScanInfo* pTableInfo = taosHashGet(pStatus->pTableMap, &pBlockInfo->uid, sizeof(pBlockInfo->uid));
  SBlock*              pFileBlock = taosArrayGet(pTableInfo->pBlockList, pBlockInfo->tbBlockIdx);

  TSDBKEY bufKey = getCurrentKeyInBuf(pBlockIter, pReader);

  if (fileBlockShouldLoad(pReader, pBlockInfo, pFileBlock, pTableInfo, bufKey)) {
    tBlockDataInit(&pStatus->fileBlockData);

    int32_t code = doLoadFileBlockData(pReader, pBlockIter, pTableInfo, &pStatus->fileBlockData);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    // build composed data block
    return buildComposedDataBlock(pReader, pTableInfo);
  }

  if (bufferDataInFileBlockGap(pReader->order, bufKey, pFileBlock)) {
    // data in memory that are earlier than current file block
    // todo rows in buffer should be less than the file block in asc, greater than file block in desc
    int64_t endKey = (ASCENDING_TRAVERSE(pReader->order)) ? pFileBlock->minKey.ts : pFileBlock->maxKey.ts;
    return buildDataBlockFromBuf(pReader, pTableInfo, endKey);
  }

  // whole block is required, return it directly
  SDataBlockInfo* pResInfo = &pReader->pResBlock->info;
  pResInfo->rows = pFileBlock->nRow;
  pResInfo->uid = pTableInfo->uid;
  pResInfo->window = (STimeWindow){.skey = pFileBlock->minKey.ts, .ekey = pFileBlock->maxKey.ts};

  setComposedBlockFlag(pReader, false);
  setBlockAllDumped(&pStatus->fBlockDumpInfo, pFileBlock, pReader->order);
  return TSDB_CODE_SUCCESS;
}
2287

H
Haojun Liao 已提交
2288
// Fill the result block from the in-memory buffers, walking the tables of status.pTableMap one after another.
// The walk resumes at status.pTableIter (or starts from the first table when it is NULL) and stops as soon as
// a table contributes at least one row, an error occurs, or all tables have been visited.
// Returns TSDB_CODE_SUCCESS unless buildDataBlockFromBuf() fails.
static int32_t buildBlockFromBufferSequentially(STsdbReader* pReader) {
  SReaderStatus* pStatus = &pReader->status;

  if (pStatus->pTableIter == NULL) {
    pStatus->pTableIter = taosHashIterate(pStatus->pTableMap, NULL);
  }

  while (pStatus->pTableIter != NULL) {
    STableBlockScanInfo* pTableInfo = pStatus->pTableIter;
    initMemDataIterator(pTableInfo, pReader);

    // no file block limits the scan here, so use the widest possible end key for the traverse order
    int64_t endKey = (ASCENDING_TRAVERSE(pReader->order)) ? INT64_MAX : INT64_MIN;

    int32_t code = buildDataBlockFromBuf(pReader, pTableInfo, endKey);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    if (pReader->pResBlock->info.rows > 0) {
      return TSDB_CODE_SUCCESS;
    }

    // current table is exhausted, let's try the next table
    pStatus->pTableIter = taosHashIterate(pStatus->pTableMap, pStatus->pTableIter);
  }

  return TSDB_CODE_SUCCESS;
}
2319

2320
// set the correct start position in case of the first/last file block, according to the query time window
2321
static void initBlockDumpInfo(STsdbReader* pReader, SDataBlockIter* pBlockIter) {
2322
  SFileDataBlockInfo*  pFBlock = getCurrentBlockInfo(pBlockIter);
2323
  STableBlockScanInfo* pScanInfo = taosHashGet(pReader->status.pTableMap, &pFBlock->uid, sizeof(pFBlock->uid));
2324
  SBlock*              pBlock = taosArrayGet(pScanInfo->pBlockList, pFBlock->tbBlockIdx);
2325

2326
  SReaderStatus* pStatus = &pReader->status;
2327

2328
  SFileBlockDumpInfo* pDumpInfo = &pStatus->fBlockDumpInfo;
2329 2330 2331

  pDumpInfo->totalRows = pBlock->nRow;
  pDumpInfo->allDumped = false;
2332
  pDumpInfo->rowIndex = ASCENDING_TRAVERSE(pReader->order) ? 0 : pBlock->nRow - 1;
2333 2334
}

2335
// Move to the next file set and prepare the reader for its first data block.
//
// @param pReader     reader to advance
// @param pBlockIter  block iterator that is (re)initialized for the new file set
// @return TSDB_CODE_SUCCESS, or the error code of moveToNextFile()/initBlockIterator()
//
// When moveToNextFile() reports zero blocks, status.loadFromFile is set to false and no iterator is built.
static int32_t initForFirstBlockInFile(STsdbReader* pReader, SDataBlockIter* pBlockIter) {
  int32_t numOfBlocks = 0;
  int32_t code = moveToNextFile(pReader, &numOfBlocks);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  // all data files are consumed, try data in buffer
  if (numOfBlocks == 0) {
    pReader->status.loadFromFile = false;
    return code;
  }

  // initialize the block iterator for a new fileset
  code = initBlockIterator(pReader, pBlockIter, numOfBlocks);
  if (code != TSDB_CODE_SUCCESS) {
    // do not touch the dump info: the iterator may not reference a usable block after a failure
    return code;
  }

  // set the correct start position according to the query time window
  initBlockDumpInfo(pReader, pBlockIter);
  return TSDB_CODE_SUCCESS;
}

2356
// Tell whether the current file block has been started but not finished: it is not flagged as completely
// dumped and the row cursor has already left its start position (row 0 in ascending order, the last row in
// descending order).
static bool fileBlockPartiallyRead(SFileBlockDumpInfo* pDumpInfo, bool asc) {
  if (pDumpInfo->allDumped) {
    return false;
  }

  if (asc) {
    return pDumpInfo->rowIndex > 0;
  }

  return pDumpInfo->rowIndex < (pDumpInfo->totalRows - 1);
}
H
Haojun Liao 已提交
2360

2361
// Build the next non-empty result block from data files (merged with buffered rows where required).
//
// Each round either continues a partially read file block, or -- when the current block is completely dumped --
// moves on to the next block of the file set or to the next file set, and then builds a block for it. The
// loop ends when the result block holds rows, when an error occurs, or when initForFirstBlockInFile() turned
// status.loadFromFile off because the files are exhausted.
static int32_t buildBlockFromFiles(STsdbReader* pReader) {
  const bool asc = ASCENDING_TRAVERSE(pReader->order);

  SReaderStatus*      pStatus = &pReader->status;
  SDataBlockIter*     pBlockIter = &pStatus->blockIter;
  SFileBlockDumpInfo* pDumpInfo = &pStatus->fBlockDumpInfo;

  for (;;) {
    // looked up before the iterator is possibly advanced below; only used for the partially-read case
    SFileDataBlockInfo*  pBlockInfo = getCurrentBlockInfo(pBlockIter);
    STableBlockScanInfo* pTableInfo = taosHashGet(pStatus->pTableMap, &pBlockInfo->uid, sizeof(pBlockInfo->uid));

    int32_t code = TSDB_CODE_SUCCESS;
    if (fileBlockPartiallyRead(pDumpInfo, asc)) {
      // file data block is partially loaded
      code = buildComposedDataBlock(pReader, pTableInfo);
    } else {
      if (pDumpInfo->allDumped) {
        // current block is exhausted, try the next data block in the current file
        if (blockIteratorNext(pBlockIter)) {
          initBlockDumpInfo(pReader, pBlockIter);
        } else {
          // data blocks in current file are exhausted, let's try the next file now
          code = initForFirstBlockInFile(pReader, pBlockIter);

          // error happens or all the data files are completely checked
          if ((code != TSDB_CODE_SUCCESS) || !pStatus->loadFromFile) {
            return code;
          }
        }
      }

      // current block is not loaded yet, or data in buffer may overlap with the file block.
      code = doBuildDataBlock(pReader);
    }

    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    if (pReader->pResBlock->info.rows > 0) {
      return TSDB_CODE_SUCCESS;
    }
  }
}
H
Haojun Liao 已提交
2405

2406 2407
// Select the STsdb instance to query for a vnode.
//
// For a non-rsma vnode VND_TSDB(pVnode) is returned and *pLevel is left untouched. For an rsma vnode the
// retention levels are scanned from level 0 upwards:
//   - a level whose keep value is <= 0 ends the scan and the previous level (if any) is used;
//   - the first level for which (now - keep) <= winSKey is used;
//   - when the scan runs past all TSDB_RETENTION_MAX levels, the last branch (level 2) is used.
//
// @param pVnode      vnode to read from
// @param winSKey     start key of the query window
// @param retentions  array of retention settings, indexed by level
// @param idStr       identifier used in the debug log, may be NULL
// @param pLevel      output: selected retention level (written for rsma vnodes only)
// @return the STsdb handle that matches the selected level
static STsdb* getTsdbByRetentions(SVnode* pVnode, TSKEY winSKey, SRetention* retentions, const char* idStr,
                                  int8_t* pLevel) {
  if (!VND_IS_RSMA(pVnode)) {
    return VND_TSDB(pVnode);
  }

  int8_t  level = 0;
  int64_t now = taosGetTimestamp(pVnode->config.tsdbCfg.precision);

  for (int8_t i = 0; i < TSDB_RETENTION_MAX; ++i) {
    SRetention* pRetention = retentions + level;
    if (pRetention->keep <= 0) {
      if (level > 0) {
        --level;
      }
      break;
    }

    if ((now - pRetention->keep) <= winSKey) {
      break;
    }

    ++level;
  }

  int32_t     vgId = TD_VID(pVnode);
  const char* str = (idStr != NULL) ? idStr : "";

  // The format string used to contain a "%p" conversion without a matching argument, which shifted every
  // following argument by one; each conversion now has exactly one argument of the matching type.
  if (level == TSDB_RETENTION_L0) {
    *pLevel = TSDB_RETENTION_L0;
    tsdbDebug("vgId:%d, rsma level %d is selected to query %s", vgId, TSDB_RETENTION_L0, str);
    return VND_RSMA0(pVnode);
  } else if (level == TSDB_RETENTION_L1) {
    *pLevel = TSDB_RETENTION_L1;
    tsdbDebug("vgId:%d, rsma level %d is selected to query %s", vgId, TSDB_RETENTION_L1, str);
    return VND_RSMA1(pVnode);
  } else {
    *pLevel = TSDB_RETENTION_L2;
    tsdbDebug("vgId:%d, rsma level %d is selected to query %s", vgId, TSDB_RETENTION_L2, str);
    return VND_RSMA2(pVnode);
  }
}
H
Haojun Liao 已提交
2446

2447 2448 2449
// Build the version range visible to a query. The lower bound is always pCond->startVersion; the upper bound
// is tdRSmaGetMaxSubmitVer() of the given level for an rsma vnode and pVnode->state.applied otherwise.
static SVersionRange getQueryVerRange(SVnode* pVnode, SQueryTableDataCond* pCond, int8_t level) {
  SVersionRange range = {.minVer = pCond->startVersion};

  if (VND_IS_RSMA(pVnode)) {
    range.maxVer = tdRSmaGetMaxSubmitVer(pVnode->pSma, level);
  } else {
    range.maxVer = pVnode->state.applied;
  }

  return range;
}

H
Hongze Cheng 已提交
2455 2456 2457 2458
// // todo not unref yet, since it is not support multi-group interpolation query
// static UNUSED_FUNC void changeQueryHandleForInterpQuery(STsdbReader* pHandle) {
//   // filter the queried time stamp in the first place
//   STsdbReader* pTsdbReadHandle = (STsdbReader*)pHandle;
H
Haojun Liao 已提交
2459

H
Hongze Cheng 已提交
2460 2461
//   // starts from the buffer in case of descending timestamp order check data blocks
//   size_t numOfTables = taosArrayGetSize(pTsdbReadHandle->pTableCheckInfo);
H
Haojun Liao 已提交
2462

H
Hongze Cheng 已提交
2463 2464
//   int32_t i = 0;
//   while (i < numOfTables) {
H
Haojun Liao 已提交
2465
//     STableBlockScanInfo* pCheckInfo = taosArrayGet(pTsdbReadHandle->pTableCheckInfo, i);
H
Haojun Liao 已提交
2466

H
Hongze Cheng 已提交
2467 2468 2469 2470 2471
//     // the first qualified table for interpolation query
//     //    if ((pTsdbReadHandle->window.skey <= pCheckInfo->pTableObj->lastKey) &&
//     //        (pCheckInfo->pTableObj->lastKey != TSKEY_INITIAL_VAL)) {
//     //      break;
//     //    }
H
Haojun Liao 已提交
2472

H
Hongze Cheng 已提交
2473 2474
//     i++;
//   }
2475

H
Hongze Cheng 已提交
2476 2477 2478 2479
//   // there are no data in all the tables
//   if (i == numOfTables) {
//     return;
//   }
H
Haojun Liao 已提交
2480

H
Haojun Liao 已提交
2481
//   STableBlockScanInfo info = *(STableBlockScanInfo*)taosArrayGet(pTsdbReadHandle->pTableCheckInfo, i);
H
Hongze Cheng 已提交
2482
//   taosArrayClear(pTsdbReadHandle->pTableCheckInfo);
H
Haojun Liao 已提交
2483

H
Hongze Cheng 已提交
2484 2485 2486
//   info.lastKey = pTsdbReadHandle->window.skey;
//   taosArrayPush(pTsdbReadHandle->pTableCheckInfo, &info);
// }
H
Haojun Liao 已提交
2487

2488 2489 2490 2491 2492
// Check whether the row identified by pKey is covered by the delete list.
//
// pDelList is read as a sequence of TSDBKEY points ordered by timestamp; two neighbouring points
// (pCurrent, pNext) cover pKey when pCurrent->ts <= pKey->ts <= pNext->ts and pCurrent->version >=
// pKey->version. *index is the caller's cursor into the list and is moved forward past intervals that end
// before pKey->ts, so that later calls with larger timestamps do not rescan them.
//
// @param pDelList  delete list, may be NULL or empty (nothing is dropped then)
// @param index     in/out cursor into pDelList
// @param pKey      key (timestamp + version) of the row to test, must not be NULL
// @return true when the row is covered by a delete entry, false otherwise
//
// Fixes compared with the previous implementation:
//   - pCurrent/pNext are refreshed while the cursor advances (the old loop tested a stale pNext and always ran
//     the cursor to the end of the list), and the coverage test is repeated for every interval reached;
//   - lists with zero or one element no longer cause an unsigned underflow / out-of-range access;
//   - every path returns a value.
bool hasBeenDropped(const SArray* pDelList, int32_t* index, TSDBKEY* pKey) {
  ASSERT(pKey != NULL);
  if (pDelList == NULL) {
    return false;
  }

  size_t num = taosArrayGetSize(pDelList);
  if (num == 0) {
    return false;
  }

  // the cursor already sits on the last point: only the last point itself can still match
  if ((size_t)(*index) >= num - 1) {
    TSDBKEY* last = taosArrayGetLast(pDelList);
    if (pKey->ts > last->ts) {
      return false;
    }

    if (pKey->ts == last->ts) {
      if (num < 2) {
        // NOTE(review): a single-point list has no preceding point to take the version from; treated as
        // "not dropped" -- confirm this matches how the list is built
        return false;
      }

      TSDBKEY* prev = taosArrayGet(pDelList, num - 2);
      return prev->version >= pKey->version;
    }

    // the key lies before the last point although the cursor is at the end; not expected
    ASSERT(0);
    return false;
  }

  TSDBKEY* pCurrent = taosArrayGet(pDelList, *index);
  TSDBKEY* pNext = taosArrayGet(pDelList, (*index) + 1);

  while (1) {
    if (pCurrent->ts <= pKey->ts && pNext->ts >= pKey->ts && pCurrent->version >= pKey->version) {
      return true;
    }

    // the current interval does not end before the key, so no later interval has to be skipped
    if (pNext->ts >= pKey->ts) {
      return false;
    }

    // the current interval ends before the key: move the cursor to the next interval
    (*index) += 1;
    if ((size_t)(*index) >= num - 1) {
      // the key is later than the last point of the list
      return false;
    }

    pCurrent = taosArrayGet(pDelList, *index);
    pNext = taosArrayGet(pDelList, (*index) + 1);
  }
}
2524

2525 2526
// Return the first row at or after the iterator's current position that is valid for the reader, or NULL.
//
// A row is valid when its version lies inside pReader->verRange and hasBeenDropped() does not report it as
// deleted. Rows that fail this test are skipped by advancing the iterator. As soon as a row outside the
// reader's time window is met, or the iterator runs out of rows, pIter->hasVal is set to false and NULL is
// returned. NULL is also returned immediately when pIter->hasVal is already false.
TSDBROW* getValidRow(SIterInfo* pIter, const SArray* pDelList, STsdbReader* pReader) {
  if (!pIter->hasVal) {
    return NULL;
  }

  for (;;) {
    TSDBROW* pCandidate = tsdbTbDataIterGet(pIter->iter);
    TSDBKEY  rowKey = TSDBROW_KEY(pCandidate);

    if (outOfTimeWindow(rowKey.ts, &pReader->window)) {
      pIter->hasVal = false;
      return NULL;
    }

    // the version check comes first: hasBeenDropped() moves the delete cursor and must only see rows whose
    // version is visible
    bool versionVisible =
        (rowKey.version <= pReader->verRange.maxVer) && (rowKey.version >= pReader->verRange.minVer);
    if (versionVisible && !hasBeenDropped(pDelList, &pIter->index, &rowKey)) {
      return pCandidate;
    }

    // the row is not usable, step to the next one
    pIter->hasVal = tsdbTbDataIterNext(pIter->iter);
    if (!pIter->hasVal) {
      return NULL;
    }
  }
}
H
Haojun Liao 已提交
2563

dengyihao's avatar
dengyihao 已提交
2564
// Merge into pMerger every following valid row of the iterator that carries the timestamp ts.
//
// The iterator is advanced first, so the row it pointed at on entry is not merged here. Merging stops when the
// iterator is exhausted, when getValidRow() finds no further valid row, or when the next valid row has a
// different timestamp (the iterator then stays on that row). Always returns TSDB_CODE_SUCCESS.
int32_t doMergeRowsInBuf(SIterInfo* pIter, int64_t ts, SArray* pDelList, SRowMerger* pMerger, STsdbReader* pReader) {
  for (;;) {
    pIter->hasVal = tsdbTbDataIterNext(pIter->iter);
    if (!pIter->hasVal) {
      break;
    }

    // data exists but may not be valid
    TSDBROW* pNextRow = getValidRow(pIter, pDelList, pReader);
    if (pNextRow == NULL) {
      break;
    }

    // only rows with an identical timestamp take part in the merge
    TSDBKEY nextKey = TSDBROW_KEY(pNextRow);
    if (nextKey.ts != ts) {
      break;
    }

    tRowMerge(pMerger, pNextRow);
  }

  return TSDB_CODE_SUCCESS;
}
H
Haojun Liao 已提交
2588

2589
static int32_t doMergeRowsInFileBlockImpl(SBlockData* pBlockData, int32_t rowIndex, int64_t key, SRowMerger* pMerger,
2590
                                          SVersionRange* pVerRange, int32_t step) {
2591 2592
  while (pBlockData->aTSKEY[rowIndex] == key && rowIndex < pBlockData->nRow && rowIndex >= 0) {
    if (pBlockData->aVersion[rowIndex] > pVerRange->maxVer || pBlockData->aVersion[rowIndex] < pVerRange->minVer) {
H
Haojun Liao 已提交
2593 2594 2595
      continue;
    }

2596 2597 2598 2599 2600 2601 2602
    TSDBROW fRow = tsdbRowFromBlockData(pBlockData, rowIndex);
    tRowMerge(pMerger, &fRow);
    rowIndex += step;
  }

  return rowIndex;
}
2603

2604 2605 2606 2607
// Result reported by checkForNeighborFileBlock() through its `state` output parameter; the caller
// doMergeRowsInFileBlocks() keeps checking neighbor blocks until it receives CHECK_FILEBLOCK_QUIT.
typedef enum {
  CHECK_FILEBLOCK_CONT = 0x1,  // the neighbor block was consumed up to its end, check the following block too
  CHECK_FILEBLOCK_QUIT = 0x2,  // default: stop checking neighbor blocks
} CHECK_FILEBLOCK_STATE;
H
Haojun Liao 已提交
2608

2609
static int32_t checkForNeighborFileBlock(STsdbReader* pReader, STableBlockScanInfo* pScanInfo, SBlock* pBlock,
2610 2611
                                         SFileDataBlockInfo* pFBlock, SRowMerger* pMerger, int64_t key,
                                         CHECK_FILEBLOCK_STATE* state) {
2612
  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
2613
  SBlockData*         pBlockData = &pReader->status.fileBlockData;
2614

2615
  *state = CHECK_FILEBLOCK_QUIT;
2616
  int32_t step = ASCENDING_TRAVERSE(pReader->order) ? 1 : -1;
H
Haojun Liao 已提交
2617

2618 2619
  int32_t nextIndex = -1;
  SBlock* pNeighborBlock = getNeighborBlockOfSameTable(pFBlock, pScanInfo, &nextIndex, pReader->order);
2620
  if (pNeighborBlock == NULL) {  // do nothing
2621 2622
    return 0;
  }
2623

2624 2625
  bool overlap = overlapWithNeighborBlock(pBlock, pNeighborBlock, pReader->order);
  if (overlap) {  // load next block
2626
    SReaderStatus*  pStatus = &pReader->status;
2627
    SDataBlockIter* pBlockIter = &pStatus->blockIter;
H
Haojun Liao 已提交
2628

2629
    // 1. find the next neighbor block in the scan block list
2630
    SFileDataBlockInfo fb = {.uid = pFBlock->uid, .tbBlockIdx = nextIndex};
2631
    int32_t            neighborIndex = findFileBlockInfoIndex(pBlockIter, &fb);
H
Haojun Liao 已提交
2632

2633
    // 2. remove it from the scan block list
2634
    setFileBlockActiveInBlockIter(pBlockIter, neighborIndex, step);
2635

2636
    // 3. load the neighbor block, and set it to be the currently accessed file data block
2637 2638 2639 2640 2641
    int32_t code = doLoadFileBlockData(pReader, pBlockIter, pScanInfo, &pStatus->fileBlockData);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

2642
    // 4. check the data values
2643 2644 2645 2646 2647 2648 2649 2650 2651 2652 2653 2654 2655
    initBlockDumpInfo(pReader, pBlockIter);

    pDumpInfo->rowIndex =
        doMergeRowsInFileBlockImpl(pBlockData, pDumpInfo->rowIndex, key, pMerger, &pReader->verRange, step);

    if (pDumpInfo->rowIndex >= pBlock->nRow) {
      *state = CHECK_FILEBLOCK_CONT;
    }
  }

  return TSDB_CODE_SUCCESS;
}

2656 2657
// Merge all file rows that share the timestamp of the row the dump cursor currently points at.
//
// The cursor is moved one step past the current row; the following rows of the same block with the same
// timestamp are merged into pMerger. When the block is consumed completely, the overlapping neighbor blocks of
// the same table are checked as well until one of them reports CHECK_FILEBLOCK_QUIT.
//
// @param pBlockData  block data of the currently loaded file block
// @param pScanInfo   scan info of the table being read
// @param pReader     reader holding the dump cursor (status.fBlockDumpInfo) and the block iterator
// @param pMerger     merger that receives the rows
// @return TSDB_CODE_SUCCESS, or the error code returned while a neighbor block was loaded
//
// The row index is checked against both ends of the block (it becomes -1 in descending order), and a failure
// of checkForNeighborFileBlock() is reported to the caller instead of being dropped.
int32_t doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pScanInfo, STsdbReader* pReader,
                                SRowMerger* pMerger) {
  SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;

  bool    asc = ASCENDING_TRAVERSE(pReader->order);
  int64_t key = pBlockData->aTSKEY[pDumpInfo->rowIndex];
  int32_t step = asc ? 1 : -1;

  pDumpInfo->rowIndex += step;
  if (pDumpInfo->rowIndex >= 0 && pDumpInfo->rowIndex < pBlockData->nRow) {
    pDumpInfo->rowIndex =
        doMergeRowsInFileBlockImpl(pBlockData, pDumpInfo->rowIndex, key, pMerger, &pReader->verRange, step);
  }

  // all rows are consumed, let's try next file block
  if ((pDumpInfo->rowIndex >= pBlockData->nRow && asc) || (pDumpInfo->rowIndex < 0 && !asc)) {
    while (1) {
      CHECK_FILEBLOCK_STATE st = CHECK_FILEBLOCK_QUIT;

      SFileDataBlockInfo* pFileBlockInfo = getCurrentBlockInfo(&pReader->status.blockIter);
      SBlock*             pCurrentBlock = taosArrayGet(pScanInfo->pBlockList, pFileBlockInfo->tbBlockIdx);

      int32_t code = checkForNeighborFileBlock(pReader, pScanInfo, pCurrentBlock, pFileBlockInfo, pMerger, key, &st);
      if (code != TSDB_CODE_SUCCESS) {
        return code;
      }

      if (st == CHECK_FILEBLOCK_QUIT) {
        break;
      }
    }
  }

  return TSDB_CODE_SUCCESS;
}

2687
// Make sure pReader->pSchema matches the schema version of pRow: nothing happens when a schema with the same
// version is already cached; otherwise a cached schema (if any) is released and the schema of that version is
// fetched for table `uid` through metaGetTbTSchema().
void updateSchema(TSDBROW* pRow, uint64_t uid, STsdbReader* pReader) {
  int32_t rowSchemaVer = TSDBROW_SVERSION(pRow);

  if (pReader->pSchema != NULL) {
    if (pReader->pSchema->version == rowSchemaVer) {
      return;  // cached schema is still the right one
    }

    taosMemoryFreeClear(pReader->pSchema);
  }

  pReader->pSchema = metaGetTbTSchema(pReader->pTsdb->pVnode->pMeta, uid, rowSchemaVer);
}

dengyihao's avatar
dengyihao 已提交
2698 2699
// Merge pRow with all following rows of the same timestamp delivered by pIter, and hand the merged row back
// through pTSRow. The reader's cached schema is refreshed for pRow before the merger is initialized.
void doMergeMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter, SArray* pDelList, STSRow** pTSRow,
                      STsdbReader* pReader) {
  TSDBKEY rowKey = TSDBROW_KEY(pRow);

  // the merger works on pReader->pSchema, which has to match the schema version of the first row
  updateSchema(pRow, uid, pReader);

  SRowMerger merger = {0};
  tRowMergerInit(&merger, pRow, pReader->pSchema);
  doMergeRowsInBuf(pIter, rowKey.ts, pDelList, &merger, pReader);
  tRowMergerGetRow(&merger, pTSRow);
}

2710 2711
// Merge a mem row (pRow) and an imem row (piRow) together with the following rows of each iterator, and
// return the merged row through pTSRow.
//
// In ascending order the imem row seeds the merger and the mem row is merged on top of it; in descending order
// the roles are swapped. The schema is refreshed from whichever row seeds the merger.
void doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader,
                        STSRow** pTSRow) {
  TSDBKEY memKey = TSDBROW_KEY(pRow);
  TSDBKEY imemKey = TSDBROW_KEY(piRow);

  bool asc = ASCENDING_TRAVERSE(pReader->order);

  // ascending order: imem --> mem, descending order: mem --> imem
  TSDBROW*   pSeedRow = asc ? piRow : pRow;
  TSDBROW*   pOtherRow = asc ? pRow : piRow;
  SIterInfo* pSeedIter = asc ? &pBlockScanInfo->iiter : &pBlockScanInfo->iter;
  SIterInfo* pOtherIter = asc ? &pBlockScanInfo->iter : &pBlockScanInfo->iiter;
  int64_t    seedTs = asc ? imemKey.ts : memKey.ts;

  SRowMerger merger = {0};

  updateSchema(pSeedRow, pBlockScanInfo->uid, pReader);
  tRowMergerInit(&merger, pSeedRow, pReader->pSchema);
  doMergeRowsInBuf(pSeedIter, seedTs, pBlockScanInfo->delSkyline, &merger, pReader);

  // the second pass uses the timestamp of the mem row in both orders
  tRowMerge(&merger, pOtherRow);
  doMergeRowsInBuf(pOtherIter, memKey.ts, pBlockScanInfo->delSkyline, &merger, pReader);

  tRowMergerGetRow(&merger, pTSRow);
}
H
Haojun Liao 已提交
2737

2738 2739
// Produce the next merged row from the in-memory buffers (mem and imem) of one table.
//
// A candidate row is dropped when its timestamp has reached endKey (>= endKey in ascending order, <= endKey in
// descending order). If both buffers still offer a row, the one with the smaller timestamp is merged, or both
// are merged together when the timestamps are equal; if only one buffer offers a row, that row is merged.
// *pTSRow is only written when a row was merged. Always returns TSDB_CODE_SUCCESS.
int32_t tsdbGetNextRowInMem(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader, STSRow** pTSRow,
                            int64_t endKey) {
  SArray*  pDelList = pBlockScanInfo->delSkyline;
  TSDBROW* pRow = getValidRow(&pBlockScanInfo->iter, pDelList, pReader);
  TSDBROW* piRow = getValidRow(&pBlockScanInfo->iiter, pDelList, pReader);

  // todo refactor
  bool asc = ASCENDING_TRAVERSE(pReader->order);
  bool memHasVal = pBlockScanInfo->iter.hasVal;
  bool imemHasVal = pBlockScanInfo->iiter.hasVal;

  // rows that already reached the end key are not part of this round
  if (memHasVal) {
    TSDBKEY memKey = TSDBROW_KEY(pRow);
    bool    beyondEnd = asc ? (memKey.ts >= endKey) : (memKey.ts <= endKey);
    if (beyondEnd) {
      pRow = NULL;
    }
  }

  if (imemHasVal) {
    TSDBKEY imemKey = TSDBROW_KEY(piRow);
    bool    beyondEnd = asc ? (imemKey.ts >= endKey) : (imemKey.ts <= endKey);
    if (beyondEnd) {
      piRow = NULL;
    }
  }

  bool useMem = memHasVal && (pRow != NULL);
  bool useIMem = imemHasVal && (piRow != NULL);

  if (useMem && useIMem) {
    TSDBKEY k = TSDBROW_KEY(pRow);
    TSDBKEY ik = TSDBROW_KEY(piRow);

    if (ik.ts < k.ts) {
      doMergeMultiRows(piRow, pBlockScanInfo->uid, &pBlockScanInfo->iiter, pDelList, pTSRow, pReader);
    } else if (k.ts < ik.ts) {
      doMergeMultiRows(pRow, pBlockScanInfo->uid, &pBlockScanInfo->iter, pDelList, pTSRow, pReader);
    } else {  // ik.ts == k.ts
      doMergeMemIMemRows(pRow, piRow, pBlockScanInfo, pReader, pTSRow);
    }
  } else if (useMem) {
    doMergeMultiRows(pRow, pBlockScanInfo->uid, &pBlockScanInfo->iter, pDelList, pTSRow, pReader);
  } else if (useIMem) {
    doMergeMultiRows(piRow, pBlockScanInfo->uid, &pBlockScanInfo->iiter, pDelList, pTSRow, pReader);
  }

  return TSDB_CODE_SUCCESS;
}
H
Haojun Liao 已提交
2787

2788 2789 2790
// Append one row to the result block at position pBlock->info.rows and increase the row count by one.
//
// If the first result column is the primary timestamp column it receives pTSRow->ts. The remaining result
// columns are matched against the columns of pReader->pSchema by column id, both lists being walked in
// increasing order: a match copies the value from the row, a result column without a schema column of the
// same id receives NULL. Always returns TSDB_CODE_SUCCESS.
int32_t doAppendOneRow(SSDataBlock* pBlock, STsdbReader* pReader, STSRow* pTSRow) {
  const int32_t rowPos = pBlock->info.rows;
  const int32_t numOfOutputCols = (int32_t)taosArrayGetSize(pBlock->pDataBlock);

  SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo;
  STSchema*           pSchema = pReader->pSchema;

  SColVal colVal = {0};
  int32_t outIdx = 0;
  int32_t schemaIdx = 0;

  SColumnInfoData* pOutCol = taosArrayGet(pBlock->pDataBlock, outIdx);
  if (pOutCol->info.colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
    colDataAppend(pOutCol, rowPos, (const char*)&pTSRow->ts, false);
    outIdx += 1;
  }

  for (; outIdx < numOfOutputCols; ++outIdx) {
    pOutCol = taosArrayGet(pBlock->pDataBlock, outIdx);
    col_id_t colId = pOutCol->info.colId;

    // skip the schema columns that are not requested
    while (schemaIdx < pSchema->numOfCols && pSchema->columns[schemaIdx].colId < colId) {
      schemaIdx += 1;
    }

    if (schemaIdx < pSchema->numOfCols && pSchema->columns[schemaIdx].colId == colId) {
      tTSRowGetVal(pTSRow, pSchema, schemaIdx, &colVal);
      doCopyColVal(pOutCol, rowPos, outIdx, &colVal, pSupInfo);
      schemaIdx += 1;
    } else {
      // set null value since current column does not exist in the "pSchema"
      colDataAppendNULL(pOutCol, rowPos);
    }
  }

  pBlock->info.rows += 1;
  return TSDB_CODE_SUCCESS;
}

2832 2833
// Move rows from the in-memory buffers of one table into the reader's result block until no further row
// before endKey is available, both buffer iterators are exhausted, or the block holds `capacity` rows.
// Always returns TSDB_CODE_SUCCESS.
//
// NOTE(review): the row handed out through pTSRow is not released here -- confirm who owns it.
int32_t buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, int64_t endKey, int32_t capacity,
                                  STsdbReader* pReader) {
  SSDataBlock* pResBlock = pReader->pResBlock;

  for (;;) {
    STSRow* pMergedRow = NULL;
    tsdbGetNextRowInMem(pBlockScanInfo, pReader, &pMergedRow, endKey);
    if (pMergedRow == NULL) {
      break;
    }

    doAppendOneRow(pResBlock, pReader, pMergedRow);

    // no data in buffer, return immediately
    bool bufferDrained = !(pBlockScanInfo->iter.hasVal || pBlockScanInfo->iiter.hasVal);
    if (bufferDrained || pResBlock->info.rows >= capacity) {
      break;
    }
  }

  ASSERT(pResBlock->info.rows <= capacity);
  return TSDB_CODE_SUCCESS;
}
H
Haojun Liao 已提交
2858

2859
// todo refactor, use arraylist instead
H
Hongze Cheng 已提交
2860
// Replace the set of tables scanned by the reader with the single table `uid`: the table map is cleared and
// one fresh scan-info entry (lastKey = 0, all other fields zero) is inserted for it.
//
// @param pReader  reader to update, must not be NULL
// @param uid      uid of the table to scan
// @return TSDB_CODE_SUCCESS (the same success code every other reader function in this file returns)
int32_t tsdbSetTableId(STsdbReader* pReader, int64_t uid) {
  ASSERT(pReader != NULL);
  taosHashClear(pReader->status.pTableMap);

  STableBlockScanInfo info = {.lastKey = 0, .uid = uid};
  // key size follows the uid field itself, as the lookups on this map do
  taosHashPut(pReader->status.pTableMap, &info.uid, sizeof(info.uid), &info, sizeof(info));
  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
2869 2870 2871 2872 2873 2874
// NULL-tolerant wrapper around metaGetIdx(): returns NULL when pMeta is NULL.
void* tsdbGetIdx(SMeta* pMeta) {
  return (pMeta != NULL) ? metaGetIdx(pMeta) : NULL;
}
dengyihao's avatar
dengyihao 已提交
2875

dengyihao's avatar
dengyihao 已提交
2876 2877 2878 2879 2880 2881
// NULL-tolerant wrapper around metaGetIvtIdx(): returns NULL when pMeta is NULL.
void* tsdbGetIvtIdx(SMeta* pMeta) {
  return (pMeta != NULL) ? metaGetIvtIdx(pMeta) : NULL;
}
L
Liu Jicong 已提交
2882

C
Cary Xu 已提交
2883 2884 2885 2886 2887 2888 2889 2890 2891 2892
/**
 * @brief Collect the uids delivered by a super table cursor that is opened at suid
 *
 * @param pMeta meta handle the cursor is opened on
 * @param suid start position of the cursor; return all suids in one vnode if suid is 0
 * @param list output array, every uid read from the cursor is appended to it
 * @return int32_t TSDB_CODE_FAILED if the cursor cannot be opened, otherwise TSDB_CODE_SUCCESS
 */
int32_t tsdbGetStbIdList(SMeta* pMeta, int64_t suid, SArray* list) {
  SMStbCursor* pCursor = metaOpenStbCursor(pMeta, suid);
  if (pCursor == NULL) {
    return TSDB_CODE_FAILED;
  }

  // a uid of 0 marks the end of the cursor
  for (tb_uid_t id = metaStbCursorNext(pCursor); id != 0; id = metaStbCursorNext(pCursor)) {
    taosArrayPush(list, &id);
  }

  metaCloseStbCursor(pCursor);
  return TSDB_CODE_SUCCESS;
}

H
Hongze Cheng 已提交
2910 2911 2912 2913
// static void destroyHelper(void* param) {
//   if (param == NULL) {
//     return;
//   }
H
Haojun Liao 已提交
2914

H
Hongze Cheng 已提交
2915 2916 2917 2918 2919 2920
//   //  tQueryInfo* pInfo = (tQueryInfo*)param;
//   //  if (pInfo->optr != TSDB_RELATION_IN) {
//   //    taosMemoryFreeClear(pInfo->q);
//   //  } else {
//   //    taosHashCleanup((SHashObj *)(pInfo->q));
//   //  }
H
Haojun Liao 已提交
2921

H
Hongze Cheng 已提交
2922 2923
//   taosMemoryFree(param);
// }
2924

H
Hongze Cheng 已提交
2925 2926
// #define TSDB_PREV_ROW 0x1
// #define TSDB_NEXT_ROW 0x2
H
Haojun Liao 已提交
2927

H
Hongze Cheng 已提交
2928 2929 2930 2931
// static bool loadBlockOfActiveTable(STsdbReader* pTsdbReadHandle) {
//   if (pTsdbReadHandle->checkFiles) {
//     // check if the query range overlaps with the file data block
//     bool exists = true;
H
Haojun Liao 已提交
2932

2933
//     int32_t code = buildBlockFromFiles(pTsdbReadHandle, &exists);
H
Hongze Cheng 已提交
2934 2935 2936 2937
//     if (code != TSDB_CODE_SUCCESS) {
//       pTsdbReadHandle->checkFiles = false;
//       return false;
//     }
H
Haojun Liao 已提交
2938

H
Hongze Cheng 已提交
2939 2940 2941 2942 2943 2944
//     if (exists) {
//       tsdbRetrieveDataBlock((STsdbReader**)pTsdbReadHandle, NULL);
//       if (pTsdbReadHandle->currentLoadExternalRows && pTsdbReadHandle->window.skey == pTsdbReadHandle->window.ekey) {
//         SColumnInfoData* pColInfo = taosArrayGet(pTsdbReadHandle->pColumns, 0);
//         assert(*(int64_t*)pColInfo->pData == pTsdbReadHandle->window.skey);
//       }
H
Haojun Liao 已提交
2945

H
Hongze Cheng 已提交
2946 2947 2948
//       pTsdbReadHandle->currentLoadExternalRows = false;  // clear the flag, since the exact matched row is found.
//       return exists;
//     }
H
Haojun Liao 已提交
2949

H
Hongze Cheng 已提交
2950 2951
//     pTsdbReadHandle->checkFiles = false;
//   }
H
Haojun Liao 已提交
2952

H
Hongze Cheng 已提交
2953 2954 2955 2956 2957 2958 2959 2960 2961
//   if (hasMoreDataInCache(pTsdbReadHandle)) {
//     pTsdbReadHandle->currentLoadExternalRows = false;
//     return true;
//   }

//   // current result is empty
//   if (pTsdbReadHandle->currentLoadExternalRows && pTsdbReadHandle->window.skey == pTsdbReadHandle->window.ekey &&
//       pTsdbReadHandle->cur.rows == 0) {
//     //    SMemTable* pMemRef = pTsdbReadHandle->pMemTable;
H
Haojun Liao 已提交
2962

H
Hongze Cheng 已提交
2963 2964
//     //    doGetExternalRow(pTsdbReadHandle, TSDB_PREV_ROW, pMemRef);
//     //    doGetExternalRow(pTsdbReadHandle, TSDB_NEXT_ROW, pMemRef);
H
Haojun Liao 已提交
2965

H
Hongze Cheng 已提交
2966 2967 2968 2969 2970 2971 2972 2973
//     bool result = tsdbGetExternalRow(pTsdbReadHandle);

//     //    pTsdbReadHandle->prev = doFreeColumnInfoData(pTsdbReadHandle->prev);
//     //    pTsdbReadHandle->next = doFreeColumnInfoData(pTsdbReadHandle->next);
//     pTsdbReadHandle->currentLoadExternalRows = false;

//     return result;
//   }
H
Haojun Liao 已提交
2974

H
Hongze Cheng 已提交
2975 2976 2977 2978 2979 2980
//   return false;
// }

// static bool loadDataBlockFromTableSeq(STsdbReader* pTsdbReadHandle) {
//   size_t numOfTables = taosArrayGetSize(pTsdbReadHandle->pTableCheckInfo);
//   assert(numOfTables > 0);
H
Haojun Liao 已提交
2981

H
Hongze Cheng 已提交
2982 2983 2984 2985 2986 2987 2988
//   int64_t stime = taosGetTimestampUs();

//   while (pTsdbReadHandle->activeIndex < numOfTables) {
//     if (loadBlockOfActiveTable(pTsdbReadHandle)) {
//       return true;
//     }

H
Haojun Liao 已提交
2989
//     STableBlockScanInfo* pCheckInfo = taosArrayGet(pTsdbReadHandle->pTableCheckInfo, pTsdbReadHandle->activeIndex);
H
Hongze Cheng 已提交
2990 2991 2992 2993 2994 2995 2996 2997 2998 2999 3000 3001 3002 3003 3004 3005 3006 3007 3008 3009 3010 3011 3012 3013 3014 3015 3016 3017 3018 3019 3020 3021 3022 3023 3024 3025 3026 3027 3028 3029 3030 3031 3032 3033 3034 3035 3036 3037 3038 3039 3040 3041
//     pCheckInfo->numOfBlocks = 0;

//     pTsdbReadHandle->activeIndex += 1;
//     pTsdbReadHandle->locateStart = false;
//     pTsdbReadHandle->checkFiles = true;
//     pTsdbReadHandle->cur.rows = 0;
//     pTsdbReadHandle->currentLoadExternalRows = pTsdbReadHandle->loadExternalRow;

//     terrno = TSDB_CODE_SUCCESS;

//     int64_t elapsedTime = taosGetTimestampUs() - stime;
//     pTsdbReadHandle->cost.checkForNextTime += elapsedTime;
//   }

//   return false;
// }

// bool tsdbGetExternalRow(STsdbReader* pHandle) {
//   STsdbReader*   pTsdbReadHandle = (STsdbReader*)pHandle;
//   SQueryFilePos* cur = &pTsdbReadHandle->cur;

//   cur->fid = INT32_MIN;
//   cur->mixBlock = true;
//   if (pTsdbReadHandle->prev == NULL || pTsdbReadHandle->next == NULL) {
//     cur->rows = 0;
//     return false;
//   }

//   int32_t numOfCols = (int32_t)QH_GET_NUM_OF_COLS(pTsdbReadHandle);
//   for (int32_t i = 0; i < numOfCols; ++i) {
//     SColumnInfoData* pColInfoData = taosArrayGet(pTsdbReadHandle->pColumns, i);
//     SColumnInfoData* first = taosArrayGet(pTsdbReadHandle->prev, i);

//     memcpy(pColInfoData->pData, first->pData, pColInfoData->info.bytes);

//     SColumnInfoData* sec = taosArrayGet(pTsdbReadHandle->next, i);
//     memcpy(((char*)pColInfoData->pData) + pColInfoData->info.bytes, sec->pData, pColInfoData->info.bytes);

//     if (i == 0 && pColInfoData->info.type == TSDB_DATA_TYPE_TIMESTAMP) {
//       cur->win.skey = *(TSKEY*)pColInfoData->pData;
//       cur->win.ekey = *(TSKEY*)(((char*)pColInfoData->pData) + TSDB_KEYSIZE);
//     }
//   }

//   cur->rows = 2;
//   return true;
// }

// static void* doFreeColumnInfoData(SArray* pColumnInfoData) {
//   if (pColumnInfoData == NULL) {
//     return NULL;
//   }
H
Haojun Liao 已提交
3042

H
Hongze Cheng 已提交
3043 3044 3045 3046 3047
//   size_t cols = taosArrayGetSize(pColumnInfoData);
//   for (int32_t i = 0; i < cols; ++i) {
//     SColumnInfoData* pColInfo = taosArrayGet(pColumnInfoData, i);
//     colDataDestroy(pColInfo);
//   }
3048

H
Hongze Cheng 已提交
3049 3050 3051
//   taosArrayDestroy(pColumnInfoData);
//   return NULL;
// }
H
Haojun Liao 已提交
3052

H
Hongze Cheng 已提交
3053 3054 3055
// static void* destroyTableCheckInfo(SArray* pTableCheckInfo) {
//   size_t size = taosArrayGetSize(pTableCheckInfo);
//   for (int32_t i = 0; i < size; ++i) {
H
Haojun Liao 已提交
3056
//     STableBlockScanInfo* p = taosArrayGet(pTableCheckInfo, i);
H
Hongze Cheng 已提交
3057
//     destroyTableMemIterator(p);
3058

H
Hongze Cheng 已提交
3059 3060
//     taosMemoryFreeClear(p->pCompInfo);
//   }
3061

H
Hongze Cheng 已提交
3062 3063 3064
//   taosArrayDestroy(pTableCheckInfo);
//   return NULL;
// }
H
Haojun Liao 已提交
3065

H
refact  
Hongze Cheng 已提交
3066
// ====================================== EXPOSED APIs ======================================
3067 3068
/**
 * Create a reader for the given query condition and table list and position it
 * on the first candidate block.
 *
 * @param pVnode      vnode passed through to tsdbReaderCreate()
 * @param pCond       query condition; pCond->suid selects how the schema is resolved
 * @param pTableList  array of STableKeyInfo describing the tables to scan
 * @param ppReader    [out] receives the new reader; reset to NULL when the
 *                    table scan map cannot be created
 * @param idstr       query identifier, used for logging
 * @return TSDB_CODE_SUCCESS (also for an empty query window), otherwise the
 *         error code of the failing step
 */
int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, SArray* pTableList, STsdbReader** ppReader,
                       const char* idstr) {
  STsdbReader* pReader = NULL;
  int32_t      numOfTables = 0;

  int32_t code = tsdbReaderCreate(pVnode, pCond, ppReader, idstr);
  if (code) {
    goto _err;
  }

  pReader = *ppReader;

  // super-table query: resolve the schema through the suid; otherwise use the first table in the list
  if (pCond->suid != 0) {
    pReader->pSchema = metaGetTbTSchema(pReader->pTsdb->pVnode->pMeta, pReader->suid, -1);
    ASSERT(pReader->pSchema);
  } else if (taosArrayGetSize(pTableList) > 0) {
    STableKeyInfo* pKey = taosArrayGet(pTableList, 0);
    pReader->pSchema = metaGetTbTSchema(pReader->pTsdb->pVnode->pMeta, pKey->uid, -1);
  }

  if (isEmptyQueryTimeWindow(&pReader->window)) {
    tsdbDebug("%p query window not overlaps with the data set, no result returned, %s", pReader, pReader->idStr);
    return TSDB_CODE_SUCCESS;
  }

  numOfTables = taosArrayGetSize(pTableList);
  pReader->status.pTableMap = createDataBlockScanInfo(pReader, pTableList->pData, numOfTables);
  if (pReader->status.pTableMap == NULL) {
    // the reader is released here, so it must not be touched again (including in _err)
    tsdbReaderClose(pReader);
    *ppReader = NULL;

    code = TSDB_CODE_TDB_OUT_OF_MEMORY;
    goto _err;
  }

  SDataBlockIter* pBlockIter = &pReader->status.blockIter;

  STsdbFSState* pFState = pReader->pTsdb->fs->cState;
  initFilesetIterator(&pReader->status.fileIter, pFState, pReader->order, pReader->idStr);
  resetDataBlockIterator(&pReader->status.blockIter, pReader->order);

  // no data in files, let's try buffer in memory
  if (pReader->status.fileIter.numOfFiles == 0) {
    pReader->status.loadFromFile = false;
  } else {
    code = initForFirstBlockInFile(pReader, pBlockIter);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }
  }

  tsdbDebug("%p total numOfTable:%d in this query %s", pReader, numOfTables, pReader->idStr);
  return code;

_err:
  // Both paths that reach this label leave the reader unusable (never created, or already
  // closed), so log the caller-supplied id string instead of dereferencing the reader.
  tsdbError("failed to create data reader, code: %s %s", tstrerror(code), (idstr != NULL) ? idstr : "");
  return code;
}
H
Haojun Liao 已提交
3121

H
refact  
Hongze Cheng 已提交
3122
/**
 * Release a reader and the buffers it owns. Passing NULL is a no-op.
 *
 * Frees the result block, the statistics helper arrays, the id string, the
 * schema and finally the reader itself, after logging the I/O cost summary.
 *
 * NOTE(review): status.pTableMap is not released in this function -- confirm
 * whether it is freed elsewhere.
 */
void tsdbReaderClose(STsdbReader* pReader) {
  if (pReader == NULL) {
    return;
  }

  blockDataDestroy(pReader->pResBlock);

  taosMemoryFreeClear(pReader->suppInfo.pstatis);
  taosMemoryFreeClear(pReader->suppInfo.plist);
  taosMemoryFree(pReader->suppInfo.slotIds);

  // a reader with an empty query window is expected to have no table scan map
  if (isEmptyQueryTimeWindow(&pReader->window)) {
    ASSERT(pReader->status.pTableMap == NULL);
  }
  // else: tsdbMayUnTakeMemSnapshot(pTsdbReadHandle);  (disabled)

#if 0
//   if (pReader->status.pTableScanInfo != NULL) {
//     pReader->status.pTableScanInfo = destroyTableCheckInfo(pReader->status.pTableScanInfo);
//   }

//   tsdbDestroyReadH(&pReader->rhelper);

//   tdFreeDataCols(pReader->pDataCols);
//   pReader->pDataCols = NULL;
//
//   pReader->prev = doFreeColumnInfoData(pReader->prev);
//   pReader->next = doFreeColumnInfoData(pReader->next);
#endif

  // report the accumulated cost counters before the id string is freed
  SIOCostSummary* pSummary = &pReader->cost;
  tsdbDebug("%p :io-cost summary: head-file read cnt:%" PRIu64 ", head-file time:%" PRIu64 " us, statis-info:%" PRId64
            " us, datablock:%" PRId64 " us, check data:%" PRId64 " us, %s",
            pReader, pSummary->headFileLoad, pSummary->headFileLoadTime, pSummary->statisInfoLoadTime,
            pSummary->blockLoadTime, pSummary->checkForNextTime, pReader->idStr);

  taosMemoryFree(pReader->idStr);
  taosMemoryFree(pReader->pSchema);
  taosMemoryFreeClear(pReader);
}
Y
yihaoDeng 已提交
3163

H
refact  
Hongze Cheng 已提交
3164
/**
 * Advance the reader and fill pReader->pResBlock with the next batch of rows.
 *
 * Only BLOCK_LOAD_OFFSET_ORDER produces data: file blocks are tried first
 * (while status.loadFromFile is set) and the in-memory buffers are used when
 * the files yield no rows. The other two known load types are accepted but
 * return no data.
 *
 * @return true when the result block holds at least one row; false when there
 *         is nothing more to read, the query window is empty, or building a
 *         block from files failed
 */
bool tsdbNextDataBlock(STsdbReader* pReader) {
  if (isEmptyQueryTimeWindow(&pReader->window)) {
    return false;
  }

  // cleanup the data that belongs to the previous data block
  SSDataBlock* pBlock = pReader->pResBlock;
  blockDataCleanup(pBlock);

  if (pReader->type != BLOCK_LOAD_OFFSET_ORDER) {
    // table-sequence and external-row orders have no implementation here; any other value is a bug
    ASSERT(pReader->type == BLOCK_LOAD_TABLESEQ_ORDER || pReader->type == BLOCK_LOAD_EXTERN_ORDER);
    return false;
  }

  SReaderStatus* pStatus = &pReader->status;
  if (pStatus->loadFromFile) {
    int32_t code = buildBlockFromFiles(pReader);
    if (code != TSDB_CODE_SUCCESS) {
      return false;
    }

    if (pBlock->info.rows > 0) {
      return true;
    }
  }

  // no data in files (or files produced no rows), let's try the buffer
  buildBlockFromBufferSequentially(pReader);
  return pBlock->info.rows > 0;
}
3201

H
refact  
Hongze Cheng 已提交
3202
/**
 * Copy the row count, table uid and time window of the reader's current
 * result block into *pDataBlockInfo. Both arguments must be non-NULL.
 */
void tsdbRetrieveDataBlockInfo(STsdbReader* pReader, SDataBlockInfo* pDataBlockInfo) {
  ASSERT(pDataBlockInfo != NULL && pReader != NULL);

  const SSDataBlock* pRes = pReader->pResBlock;

  pDataBlockInfo->rows = pRes->info.rows;
  pDataBlockInfo->uid = pRes->info.uid;
  pDataBlockInfo->window = pRes->info.window;
}
H
Haojun Liao 已提交
3208

H
Hongze Cheng 已提交
3209
/**
 * Retrieve pre-computed column statistics for the current block.
 *
 * Statistics loading is currently disabled (see the commented legacy code
 * below), so this always reports "no statistics": *pBlockStatis is set to NULL
 * and *allHave to false on every path, so the caller never reads an
 * indeterminate pointer.
 *
 * @param pReader       reader whose current block is inspected
 * @param pBlockStatis  [out] always set to NULL
 * @param allHave       [out] always set to false
 * @return TSDB_CODE_SUCCESS for a composed block, 0 otherwise
 */
int32_t tsdbRetrieveDataBlockStatisInfo(STsdbReader* pReader, SColumnDataAgg*** pBlockStatis, bool* allHave) {
  int32_t code = 0;
  *allHave = false;
  *pBlockStatis = NULL;

  // a composed block has no statistics to hand out
  if (pReader->status.composedDataBlock) {
    return TSDB_CODE_SUCCESS;
  }

  // Legacy implementation, kept for reference until it is ported:
  // SFileBlockInfo* pBlockInfo = &pReader->pDataBlockInfo[c->slot];
  // assert((c->slot >= 0 && c->slot < pReader->numOfBlocks) || ((c->slot == pReader->numOfBlocks) && (c->slot == 0)));

  // // file block with sub-blocks has no statistics data
  // if (pBlockInfo->compBlock->numOfSubBlocks > 1) {
  //   *pBlockStatis = NULL;
  //   return TSDB_CODE_SUCCESS;
  // }

  // int64_t stime = taosGetTimestampUs();
  // int     statisStatus = tsdbLoadBlockStatis(&pReader->rhelper, pBlockInfo->compBlock);
  // if (statisStatus < TSDB_STATIS_OK) {
  //   return terrno;
  // } else if (statisStatus > TSDB_STATIS_OK) {
  //   *pBlockStatis = NULL;
  //   return TSDB_CODE_SUCCESS;
  // }

  // tsdbDebug("vgId:%d, succeed to load block statis part for uid %" PRIu64, REPO_ID(pReader->pTsdb),
  //           TSDB_READ_TABLE_UID(&pReader->rhelper));

  // int16_t* colIds = pReader->suppInfo.defaultLoadColumn->pData;

  // size_t numOfCols = QH_GET_NUM_OF_COLS(pReader);
  // memset(pReader->suppInfo.plist, 0, numOfCols * POINTER_BYTES);
  // memset(pReader->suppInfo.pstatis, 0, numOfCols * sizeof(SColumnDataAgg));

  // for (int32_t i = 0; i < numOfCols; ++i) {
  //   pReader->suppInfo.pstatis[i].colId = colIds[i];
  // }

  // *allHave = true;
  // tsdbGetBlockStatis(&pReader->rhelper, pReader->suppInfo.pstatis, (int)numOfCols, pBlockInfo->compBlock);

  // // always load the first primary timestamp column data
  // SColumnDataAgg* pPrimaryColStatis = &pReader->suppInfo.pstatis[0];
  // assert(pPrimaryColStatis->colId == PRIMARYKEY_TIMESTAMP_COL_ID);

  // pPrimaryColStatis->numOfNull = 0;
  // pPrimaryColStatis->min = pBlockInfo->compBlock->minKey.ts;
  // pPrimaryColStatis->max = pBlockInfo->compBlock->maxKey.ts;
  // pReader->suppInfo.plist[0] = &pReader->suppInfo.pstatis[0];

  // // update the number of NULL data rows
  // int32_t* slotIds = pReader->suppInfo.slotIds;
  // for (int32_t i = 1; i < numOfCols; ++i) {
  //   ASSERT(colIds[i] == pReader->pSchema->columns[slotIds[i]].colId);
  //   if (IS_BSMA_ON(&(pReader->pSchema->columns[slotIds[i]]))) {
  //     if (pReader->suppInfo.pstatis[i].numOfNull == -1) {  // set the column data are all NULL
  //       pReader->suppInfo.pstatis[i].numOfNull = pBlockInfo->compBlock->numOfRows;
  //     }

  //     pReader->suppInfo.plist[i] = &pReader->suppInfo.pstatis[i];
  //   } else {
  //     *allHave = false;
  //   }
  // }

  // int64_t elapsed = taosGetTimestampUs() - stime;
  // pReader->cost.statisInfoLoadTime += elapsed;

  // *pBlockStatis = pReader->suppInfo.plist;
  return code;
}

H
Hongze Cheng 已提交
3283
/**
 * Return the column data array of the current result block.
 *
 * For a composed block the result block is already filled and is returned
 * directly. Otherwise the current file block is loaded into
 * status.fileBlockData and copied into the result block first.
 *
 * @param pReader  reader positioned on a block
 * @param pIdList  not used by this implementation
 * @return pReader->pResBlock->pDataBlock, or NULL with terrno set to the
 *         failing step's code
 */
SArray* tsdbRetrieveDataBlock(STsdbReader* pReader, SArray* pIdList) {
  SReaderStatus* pStatus = &pReader->status;

  if (!pStatus->composedDataBlock) {
    // locate the scan info of the table that owns the current file block
    SFileDataBlockInfo*  pCurBlock = getCurrentBlockInfo(&pStatus->blockIter);
    STableBlockScanInfo* pScanInfo = taosHashGet(pStatus->pTableMap, &pCurBlock->uid, sizeof(pCurBlock->uid));

    int32_t code = tBlockDataInit(&pStatus->fileBlockData);
    if (code == TSDB_CODE_SUCCESS) {
      code = doLoadFileBlockData(pReader, &pStatus->blockIter, pScanInfo, &pStatus->fileBlockData);
    }

    // either step failing is reported the same way
    if (code != TSDB_CODE_SUCCESS) {
      terrno = code;
      return NULL;
    }

    copyBlockDataToSDataBlock(pReader, pScanInfo);
  }

  return pReader->pResBlock->pDataBlock;
}

3309 3310 3311
/**
 * Re-arm an existing reader for another time window of the same query.
 *
 * Does nothing (and returns success) when the reader's current window is
 * empty. Otherwise the window, order and load type are re-applied, the file-set
 * and block iterators and the per-table scan info are reset, and the reader is
 * positioned on the first block in file when any file exists.
 *
 * @param pReader  reader to reset
 * @param pCond    query condition supplying the order and the time windows
 * @param tWinIdx  index into pCond->twindows of the window to scan next
 * @return TSDB_CODE_SUCCESS / 0, or the code returned by initForFirstBlockInFile()
 */
int32_t tsdbReaderReset(STsdbReader* pReader, SQueryTableDataCond* pCond, int32_t tWinIdx) {
  if (isEmptyQueryTimeWindow(&pReader->window)) {
    return TSDB_CODE_SUCCESS;
  }

  setQueryTimewindow(pReader, pCond, tWinIdx);

  SReaderStatus* pStatus = &pReader->status;

  pReader->order = pCond->order;
  pReader->type = BLOCK_LOAD_OFFSET_ORDER;
  pStatus->loadFromFile = true;
  pStatus->pTableIter = NULL;

  pReader->window = updateQueryTimeWindow(pReader->pTsdb, &pCond->twindows[tWinIdx]);

  // NOTE(review): only one SColumnDataAgg / one pointer worth of bytes is cleared here --
  // confirm whether every column entry should be reset.
  memset(pReader->suppInfo.pstatis, 0, sizeof(SColumnDataAgg));
  memset(pReader->suppInfo.plist, 0, POINTER_BYTES);

  initFilesetIterator(&pStatus->fileIter, pReader->pTsdb->fs->cState, pReader->order, pReader->idStr);
  resetDataBlockIterator(&pStatus->blockIter, pReader->order);
  resetDataBlockScanInfo(pStatus->pTableMap);

  int32_t code = 0;
  if (pStatus->fileIter.numOfFiles != 0) {
    code = initForFirstBlockInFile(pReader, &pStatus->blockIter);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }
  } else {
    // no data in files, let's try buffer in memory
    pStatus->loadFromFile = false;
  }

  // todo set the correct numOfTables
  int32_t numOfTables = 1;
  tsdbDebug("%p reset reader, suid:%" PRIu64 ", numOfTables:%d, query range:%" PRId64 " - %" PRId64 " in query %s",
            pReader, pReader->suid, numOfTables, pReader->window.skey, pReader->window.ekey, pReader->idStr);
  return code;
}
H
Haojun Liao 已提交
3351

H
refact  
Hongze Cheng 已提交
3352
/**
 * Collect block distribution statistics for the tables of a reader.
 *
 * Currently a stub: neither argument is read or written and 0 is returned.
 * The commented code below is the earlier implementation, retained as a
 * reference for the port.
 */
int32_t tsdbGetFileBlocksDistInfo(STsdbReader* pReader, STableBlockDistInfo* pTableBlockInfo) {
  // both parameters are unused until the legacy logic below is ported
  (void)pReader;
  (void)pTableBlockInfo;

  int32_t code = 0;

  // pTableBlockInfo->totalSize = 0;
  // pTableBlockInfo->totalRows = 0;

  // STsdbFS* pFileHandle = REPO_FS(pReader->pTsdb);

  // // find the start data block in file
  // pReader->locateStart = true;
  // STsdbKeepCfg* pCfg = REPO_KEEP_CFG(pReader->pTsdb);
  // int32_t       fid = getFileIdFromKey(pReader->window.skey, pCfg->days, pCfg->precision);

  // tsdbRLockFS(pFileHandle);
  // tsdbFSIterInit(&pReader->fileIter, pFileHandle, pReader->order);
  // tsdbFSIterSeek(&pReader->fileIter, fid);
  // tsdbUnLockFS(pFileHandle);

  // STsdbCfg* pc = REPO_CFG(pReader->pTsdb);
  // pTableBlockInfo->defMinRows = pc->minRows;
  // pTableBlockInfo->defMaxRows = pc->maxRows;

  // int32_t bucketRange = ceil((pc->maxRows - pc->minRows) / 20.0);

  // pTableBlockInfo->numOfFiles += 1;

  // int32_t     code = TSDB_CODE_SUCCESS;
  // int32_t     numOfBlocks = 0;
  // int32_t     numOfTables = (int32_t)taosArrayGetSize(pReader->pTableCheckInfo);
  // int         defaultRows = 4096;
  // STimeWindow win = TSWINDOW_INITIALIZER;

  // while (true) {
  //   numOfBlocks = 0;
  //   tsdbRLockFS(REPO_FS(pReader->pTsdb));

  //   if ((pReader->pFileGroup = tsdbFSIterNext(&pReader->fileIter)) == NULL) {
  //     tsdbUnLockFS(REPO_FS(pReader->pTsdb));
  //     break;
  //   }

  //   tsdbGetFidKeyRange(pCfg->days, pCfg->precision, pReader->pFileGroup->fid, &win.skey, &win.ekey);

  //   // current file are not overlapped with query time window, ignore remain files
  //   if ((win.skey > pReader->window.ekey) /* || (!ascTraverse && win.ekey < pTsdbReadHandle->window.ekey)*/) {
  //     tsdbUnLockFS(REPO_FS(pReader->pTsdb));
  //     tsdbDebug("%p remain files are not qualified for qrange:%" PRId64 "-%" PRId64 ", ignore, %s", pReader,
  //               pReader->window.skey, pReader->window.ekey, pReader->idStr);
  //     pReader->pFileGroup = NULL;
  //     break;
  //   }

  //   pTableBlockInfo->numOfFiles += 1;
  //   if (tsdbSetAndOpenReadFSet(&pReader->rhelper, pReader->pFileGroup) < 0) {
  //     tsdbUnLockFS(REPO_FS(pReader->pTsdb));
  //     code = terrno;
  //     break;
  //   }

  //   tsdbUnLockFS(REPO_FS(pReader->pTsdb));

  //   if (tsdbLoadBlockIdx(&pReader->rhelper) < 0) {
  //     code = terrno;
  //     break;
  //   }

  //   if ((code = getFileCompInfo(pReader, &numOfBlocks)) != TSDB_CODE_SUCCESS) {
  //     break;
  //   }

  //   tsdbDebug("%p %d blocks found in file for %d table(s), fid:%d, %s", pReader, numOfBlocks, numOfTables,
  //             pReader->pFileGroup->fid, pReader->idStr);

  //   if (numOfBlocks == 0) {
  //     continue;
  //   }

  //   pTableBlockInfo->numOfBlocks += numOfBlocks;

  //   for (int32_t i = 0; i < numOfTables; ++i) {
  //     STableBlockScanInfo* pCheckInfo = taosArrayGet(pReader->pTableCheckInfo, i);

  //     SBlock* pBlock = pCheckInfo->pCompInfo->blocks;

  //     for (int32_t j = 0; j < pCheckInfo->numOfBlocks; ++j) {
  //       pTableBlockInfo->totalSize += pBlock[j].len;

  //       int32_t numOfRows = pBlock[j].numOfRows;
  //       pTableBlockInfo->totalRows += numOfRows;

  //       if (numOfRows > pTableBlockInfo->maxRows) {
  //         pTableBlockInfo->maxRows = numOfRows;
  //       }

  //       if (numOfRows < pTableBlockInfo->minRows) {
  //         pTableBlockInfo->minRows = numOfRows;
  //       }

  //       if (numOfRows < defaultRows) {
  //         pTableBlockInfo->numOfSmallBlocks += 1;
  //       }

  //       int32_t bucketIndex = getBucketIndex(pTableBlockInfo->defMinRows, bucketRange, numOfRows);
  //       pTableBlockInfo->blockRowsHisto[bucketIndex]++;
  //     }
  //   }
  // }

  // pTableBlockInfo->numOfTables = numOfTables;
  return code;
}
H
Haojun Liao 已提交
3462

H
refact  
Hongze Cheng 已提交
3463
/**
 * Sum the number of rows held in the mem and imem tables for every table in
 * the reader's scan map.
 *
 * The walk uses status.pTableIter as its cursor, so that field is NULL when
 * the function returns.
 *
 * @return total row count over all tables found in either memory table
 */
int64_t tsdbGetNumOfRowsInMemTable(STsdbReader* pReader) {
  int64_t        total = 0;
  SReaderStatus* pStatus = &pReader->status;

  for (pStatus->pTableIter = taosHashIterate(pStatus->pTableMap, NULL); pStatus->pTableIter != NULL;
       pStatus->pTableIter = taosHashIterate(pStatus->pTableMap, pStatus->pTableIter)) {
    STableBlockScanInfo* pScanInfo = pStatus->pTableIter;

    // rows of this table in the mem table, if one exists
    if (pReader->pTsdb->mem != NULL) {
      STbData* pMemData = NULL;
      tsdbGetTbDataFromMemTable(pReader->pTsdb->mem, pReader->suid, pScanInfo->uid, &pMemData);
      if (pMemData != NULL) {
        total += tsdbGetNRowsInTbData(pMemData);
      }
    }

    // rows of this table in the imem table, if one exists
    if (pReader->pTsdb->imem != NULL) {
      STbData* pImemData = NULL;
      tsdbGetTbDataFromMemTable(pReader->pTsdb->imem, pReader->suid, pScanInfo->uid, &pImemData);
      if (pImemData != NULL) {
        total += tsdbGetNRowsInTbData(pImemData);
      }
    }
  }

  return total;
}