tsdbRead.c 126.4 KB
Newer Older
H
hjxilinx 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

H
Hongze Cheng 已提交
16
#include "tsdb.h"
L
Liu Jicong 已提交
17
#include "vnode.h"
18

dengyihao's avatar
dengyihao 已提交
19 20
#define EXTRA_BYTES                2
#define ASCENDING_TRAVERSE(o)      (o == TSDB_ORDER_ASC)
21
#define QH_GET_NUM_OF_COLS(handle) ((size_t)(taosArrayGetSize((handle)->pColumns)))
H
hjxilinx 已提交
22

H
Hongze Cheng 已提交
23 24 25 26
#define GET_FILE_DATA_BLOCK_INFO(_checkInfo, _block)                                      \
  ((SDataBlockInfo){.window = {.skey = (_block)->minKey.ts, .ekey = (_block)->maxKey.ts}, \
                    .numOfCols = (_block)->numOfCols,                                     \
                    .rows = (_block)->numOfRows,                                          \
27
                    .uid = (_checkInfo)->tableId})
H
Haojun Liao 已提交
28

H
hjxilinx 已提交
29
enum {
dengyihao's avatar
dengyihao 已提交
30 31
  TSDB_QUERY_TYPE_ALL = 1,
  TSDB_QUERY_TYPE_LAST = 2,
H
hjxilinx 已提交
32 33
};

34
enum {
dengyihao's avatar
dengyihao 已提交
35
  TSDB_CACHED_TYPE_NONE = 0,
36
  TSDB_CACHED_TYPE_LASTROW = 1,
dengyihao's avatar
dengyihao 已提交
37
  TSDB_CACHED_TYPE_LAST = 2,
38 39
};

40
typedef struct SQueryFilePos {
dengyihao's avatar
dengyihao 已提交
41 42 43 44 45 46 47
  int32_t     fid;
  int32_t     slot;
  int32_t     pos;
  int64_t     lastKey;
  int32_t     rows;
  bool        mixBlock;
  bool        blockCompleted;
48
  STimeWindow win;
49
} SQueryFilePos;
H
hjxilinx 已提交
50

51
typedef struct SDataBlockLoadInfo {
dengyihao's avatar
dengyihao 已提交
52 53 54 55
  SDFileSet* fileGroup;
  int32_t    slot;
  uint64_t   uid;
  SArray*    pLoadedCols;
56
} SDataBlockLoadInfo;
H
hjxilinx 已提交
57

58
typedef struct SLoadCompBlockInfo {
H
hjLiao 已提交
59
  int32_t tid; /* table tid */
60 61
  int32_t fileId;
} SLoadCompBlockInfo;
H
hjxilinx 已提交
62

63
enum {
dengyihao's avatar
dengyihao 已提交
64
  CHECKINFO_CHOSEN_MEM = 0,
65
  CHECKINFO_CHOSEN_IMEM = 1,
dengyihao's avatar
dengyihao 已提交
66
  CHECKINFO_CHOSEN_BOTH = 2  // for update=2(merge case)
67 68
};

69
typedef struct STableCheckInfo {
H
Hongze Cheng 已提交
70 71 72 73 74 75 76 77 78 79
  uint64_t     suid;
  uint64_t     tableId;
  TSKEY        lastKey;
  SBlockInfo*  pCompInfo;
  int32_t      compSize;
  int32_t      numOfBlocks : 29;  // number of qualified data blocks not the original blocks
  uint8_t      chosen : 2;        // indicate which iterator should move forward
  bool         initBuf : 1;       // whether to initialize the in-memory skip list iterator or not
  STbDataIter* iter;              // mem buffer skip list iterator
  STbDataIter* iiter;             // imem buffer skip list iterator
80
} STableCheckInfo;
81

82
typedef struct STableBlockInfo {
dengyihao's avatar
dengyihao 已提交
83 84
  SBlock*          compBlock;
  STableCheckInfo* pTableCheckInfo;
85
} STableBlockInfo;
86

87
typedef struct SBlockOrderSupporter {
dengyihao's avatar
dengyihao 已提交
88 89 90 91
  int32_t           numOfTables;
  STableBlockInfo** pDataBlockInfo;
  int32_t*          blockIndexArray;
  int32_t*          numOfBlocksPerTable;
92 93
} SBlockOrderSupporter;

H
Haojun Liao 已提交
94 95 96
typedef struct SIOCostSummary {
  int64_t blockLoadTime;
  int64_t statisInfoLoadTime;
H
Haojun Liao 已提交
97
  int64_t checkForNextTime;
98 99
  int64_t headFileLoad;
  int64_t headFileLoadTime;
H
Haojun Liao 已提交
100 101
} SIOCostSummary;

102
typedef struct SBlockLoadSuppInfo {
C
Cary Xu 已提交
103 104 105 106
  SColumnDataAgg*  pstatis;
  SColumnDataAgg** plist;
  SArray*          defaultLoadColumn;  // default load column
  int32_t*         slotIds;            // colId to slotId
107 108
} SBlockLoadSuppInfo;

109
typedef struct STsdbReadHandle {
C
Cary Xu 已提交
110
  STsdb*        pTsdb;
H
more  
Hongze Cheng 已提交
111
  uint64_t      suid;
C
Cary Xu 已提交
112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129
  SQueryFilePos cur;  // current position
  int16_t       order;
  STimeWindow   window;  // the primary query time window that applies to all queries
  //  SColumnDataAgg* statis;  // query level statistics, only one table block statistics info exists at any time
  //  SColumnDataAgg** pstatis;// the ptr array list to return to caller
  int32_t numOfBlocks;
  SArray* pColumns;  // column list, SColumnInfoData array list
  bool    locateStart;
  int32_t outputCapacity;
  int32_t realNumOfRows;
  SArray* pTableCheckInfo;  // SArray<STableCheckInfo>
  int32_t activeIndex;
  bool    checkFiles;               // check file stage
  int8_t  cachelastrow;             // check if last row cached
  bool    loadExternalRow;          // load time window external data rows
  bool    currentLoadExternalRows;  // current load external rows
  int32_t loadType;                 // block load type
  char*   idStr;                    // query info handle, for debug purpose
dengyihao's avatar
dengyihao 已提交
130 131 132 133 134
  int32_t type;  // query type: retrieve all data blocks, 2. retrieve only last row, 3. retrieve direct prev|next rows
  SDFileSet*         pFileGroup;
  SFSIter            fileIter;
  SReadH             rhelper;
  STableBlockInfo*   pDataBlockInfo;
C
Cary Xu 已提交
135 136 137 138
  SDataCols*         pDataCols;         // in order to hold current file data block
  int32_t            allocSize;         // allocated data block size
  SDataBlockLoadInfo dataBlockLoadInfo; /* record current block load information */
  SLoadCompBlockInfo compBlockLoadInfo; /* record current compblock information in SQueryAttr */
139
  SBlockLoadSuppInfo suppInfo;
C
Cary Xu 已提交
140 141 142 143
  SArray*            prev;  // previous row which is before than time window
  SArray*            next;  // next row which is after the query time window
  SIOCostSummary     cost;
  STSchema*          pSchema;
144
} STsdbReadHandle;
145

wmmhello's avatar
wmmhello 已提交
146 147
static STimeWindow updateLastrowForEachGroup(STableListInfo* pList);
static int32_t     checkForCachedLastRow(STsdbReadHandle* pTsdbReadHandle, STableListInfo* pList);
dengyihao's avatar
dengyihao 已提交
148
static int32_t     checkForCachedLast(STsdbReadHandle* pTsdbReadHandle);
H
Haojun Liao 已提交
149
// static int32_t tsdbGetCachedLastRow(STable* pTable, STSRow** pRes, TSKEY* lastKey);
H
Haojun Liao 已提交
150

H
Haojun Liao 已提交
151
static void    changeQueryHandleForInterpQuery(tsdbReaderT pHandle);
152
static void    doMergeTwoLevelData(STsdbReadHandle* pTsdbReadHandle, STableCheckInfo* pCheckInfo, SBlock* pBlock);
dengyihao's avatar
dengyihao 已提交
153 154
static int32_t tsdbReadRowsFromCache(STableCheckInfo* pCheckInfo, TSKEY maxKey, int maxRowsToRead, STimeWindow* win,
                                     STsdbReadHandle* pTsdbReadHandle);
155
static int32_t tsdbCheckInfoCompar(const void* key1, const void* key2);
dengyihao's avatar
dengyihao 已提交
156 157 158 159
// static int32_t doGetExternalRow(STsdbReadHandle* pTsdbReadHandle, int16_t type, void* pMemRef);
// static void*   doFreeColumnInfoData(SArray* pColumnInfoData);
// static void*   destroyTableCheckInfo(SArray* pTableCheckInfo);
static bool tsdbGetExternalRow(tsdbReaderT pHandle);
Y
TD-1733  
yihaoDeng 已提交
160

C
Cary Xu 已提交
161
static STsdb* getTsdbByRetentions(SVnode* pVnode, STsdbReadHandle* pReadHandle, TSKEY winSKey, SRetention* retentions);
C
Cary Xu 已提交
162

163
static void tsdbInitDataBlockLoadInfo(SDataBlockLoadInfo* pBlockLoadInfo) {
H
hjxilinx 已提交
164
  pBlockLoadInfo->slot = -1;
dengyihao's avatar
dengyihao 已提交
165
  pBlockLoadInfo->uid = 0;
H
hjxilinx 已提交
166
  pBlockLoadInfo->fileGroup = NULL;
H
hjxilinx 已提交
167 168
}

169
static void tsdbInitCompBlockLoadInfo(SLoadCompBlockInfo* pCompBlockLoadInfo) {
H
hjLiao 已提交
170
  pCompBlockLoadInfo->tid = -1;
171 172
  pCompBlockLoadInfo->fileId = -1;
}
H
hjxilinx 已提交
173

174 175
static SArray* getColumnIdList(STsdbReadHandle* pTsdbReadHandle) {
  size_t numOfCols = QH_GET_NUM_OF_COLS(pTsdbReadHandle);
H
Haojun Liao 已提交
176 177 178 179
  assert(numOfCols <= TSDB_MAX_COLUMNS);

  SArray* pIdList = taosArrayInit(numOfCols, sizeof(int16_t));
  for (int32_t i = 0; i < numOfCols; ++i) {
180
    SColumnInfoData* pCol = taosArrayGet(pTsdbReadHandle->pColumns, i);
H
Haojun Liao 已提交
181 182 183 184 185 186
    taosArrayPush(pIdList, &pCol->info.colId);
  }

  return pIdList;
}

187 188
static SArray* getDefaultLoadColumns(STsdbReadHandle* pTsdbReadHandle, bool loadTS) {
  SArray* pLocalIdList = getColumnIdList(pTsdbReadHandle);
H
Haojun Liao 已提交
189 190 191 192 193

  // check if the primary time stamp column needs to load
  int16_t colId = *(int16_t*)taosArrayGet(pLocalIdList, 0);

  // the primary timestamp column does not be included in the the specified load column list, add it
H
Haojun Liao 已提交
194 195
  if (loadTS && colId != PRIMARYKEY_TIMESTAMP_COL_ID) {
    int16_t columnId = PRIMARYKEY_TIMESTAMP_COL_ID;
H
Haojun Liao 已提交
196 197 198 199 200 201
    taosArrayInsert(pLocalIdList, 0, &columnId);
  }

  return pLocalIdList;
}

H
Haojun Liao 已提交
202
int64_t tsdbGetNumOfRowsInMemTable(tsdbReaderT* pHandle) {
dengyihao's avatar
dengyihao 已提交
203
  STsdbReadHandle* pTsdbReadHandle = (STsdbReadHandle*)pHandle;
H
Haojun Liao 已提交
204

H
refact  
Hongze Cheng 已提交
205 206
  int64_t    rows = 0;
  SMemTable* pMemTable = NULL;  // pTsdbReadHandle->pMemTable;
dengyihao's avatar
dengyihao 已提交
207 208 209
  if (pMemTable == NULL) {
    return rows;
  }
H
Haojun Liao 已提交
210 211 212 213 214

  size_t size = taosArrayGetSize(pTsdbReadHandle->pTableCheckInfo);
  for (int32_t i = 0; i < size; ++i) {
    STableCheckInfo* pCheckInfo = taosArrayGet(pTsdbReadHandle->pTableCheckInfo, i);

dengyihao's avatar
dengyihao 已提交
215 216 217 218 219 220 221 222
    //    if (pMemT && pCheckInfo->tableId < pMemT->maxTables) {
    //      pMem = pMemT->tData[pCheckInfo->tableId];
    //      rows += (pMem && pMem->uid == pCheckInfo->tableId) ? pMem->numOfRows : 0;
    //    }
    //    if (pIMemT && pCheckInfo->tableId < pIMemT->maxTables) {
    //      pIMem = pIMemT->tData[pCheckInfo->tableId];
    //      rows += (pIMem && pIMem->uid == pCheckInfo->tableId) ? pIMem->numOfRows : 0;
    //    }
H
Haojun Liao 已提交
223 224 225
  }
  return rows;
}
226

wmmhello's avatar
wmmhello 已提交
227 228 229
static SArray* createCheckInfoFromTableGroup(STsdbReadHandle* pTsdbReadHandle, STableListInfo* pTableList) {
  size_t tableSize = taosArrayGetSize(pTableList->pTableList);
  assert(tableSize >= 1);
H
Haojun Liao 已提交
230 231

  // allocate buffer in order to load data blocks from file
wmmhello's avatar
wmmhello 已提交
232
  SArray* pTableCheckInfo = taosArrayInit(tableSize, sizeof(STableCheckInfo));
H
Haojun Liao 已提交
233 234 235 236 237
  if (pTableCheckInfo == NULL) {
    return NULL;
  }

  // todo apply the lastkey of table check to avoid to load header file
wmmhello's avatar
wmmhello 已提交
238 239
  for (int32_t j = 0; j < tableSize; ++j) {
    STableKeyInfo* pKeyInfo = (STableKeyInfo*)taosArrayGet(pTableList->pTableList, j);
H
Haojun Liao 已提交
240

wmmhello's avatar
wmmhello 已提交
241
    STableCheckInfo info = {.lastKey = pKeyInfo->lastKey, .tableId = pKeyInfo->uid};
H
more  
Hongze Cheng 已提交
242
    info.suid = pTsdbReadHandle->suid;
wmmhello's avatar
wmmhello 已提交
243 244
    if (ASCENDING_TRAVERSE(pTsdbReadHandle->order)) {
      if (info.lastKey == INT64_MIN || info.lastKey < pTsdbReadHandle->window.skey) {
245
        info.lastKey = pTsdbReadHandle->window.skey;
H
Haojun Liao 已提交
246 247
      }

wmmhello's avatar
wmmhello 已提交
248 249 250
      assert(info.lastKey >= pTsdbReadHandle->window.skey && info.lastKey <= pTsdbReadHandle->window.ekey);
    } else {
      info.lastKey = pTsdbReadHandle->window.skey;
H
Haojun Liao 已提交
251
    }
wmmhello's avatar
wmmhello 已提交
252 253

    taosArrayPush(pTableCheckInfo, &info);
dengyihao's avatar
dengyihao 已提交
254 255
    tsdbDebug("%p check table uid:%" PRId64 " from lastKey:%" PRId64 " %s", pTsdbReadHandle, info.tableId, info.lastKey,
              pTsdbReadHandle->idStr);
H
Haojun Liao 已提交
256 257
  }

258
  // TODO  group table according to the tag value.
259
  taosArraySort(pTableCheckInfo, tsdbCheckInfoCompar);
H
Haojun Liao 已提交
260 261 262
  return pTableCheckInfo;
}

263 264
static void resetCheckInfo(STsdbReadHandle* pTsdbReadHandle) {
  size_t numOfTables = taosArrayGetSize(pTsdbReadHandle->pTableCheckInfo);
H
Haojun Liao 已提交
265 266 267 268
  assert(numOfTables >= 1);

  // todo apply the lastkey of table check to avoid to load header file
  for (int32_t i = 0; i < numOfTables; ++i) {
dengyihao's avatar
dengyihao 已提交
269
    STableCheckInfo* pCheckInfo = (STableCheckInfo*)taosArrayGet(pTsdbReadHandle->pTableCheckInfo, i);
270
    pCheckInfo->lastKey = pTsdbReadHandle->window.skey;
H
Hongze Cheng 已提交
271 272
    pCheckInfo->iter = tsdbTbDataIterDestroy(pCheckInfo->iter);
    pCheckInfo->iiter = tsdbTbDataIterDestroy(pCheckInfo->iiter);
273
    pCheckInfo->initBuf = false;
H
Haojun Liao 已提交
274

275 276
    if (ASCENDING_TRAVERSE(pTsdbReadHandle->order)) {
      assert(pCheckInfo->lastKey >= pTsdbReadHandle->window.skey);
277
    } else {
278
      assert(pCheckInfo->lastKey <= pTsdbReadHandle->window.skey);
279
    }
H
Haojun Liao 已提交
280 281 282
  }
}

H
Haojun Liao 已提交
283 284 285
// only one table, not need to sort again
static SArray* createCheckInfoFromCheckInfo(STableCheckInfo* pCheckInfo, TSKEY skey, SArray** psTable) {
  SArray* pNew = taosArrayInit(1, sizeof(STableCheckInfo));
D
fix bug  
dapan1121 已提交
286

dengyihao's avatar
dengyihao 已提交
287
  STableCheckInfo info = {.lastKey = skey};
H
Haojun Liao 已提交
288

H
Haojun Liao 已提交
289 290
  info.tableId = pCheckInfo->tableId;
  taosArrayPush(pNew, &info);
H
Haojun Liao 已提交
291 292 293
  return pNew;
}

294 295
static bool emptyQueryTimewindow(STsdbReadHandle* pTsdbReadHandle) {
  assert(pTsdbReadHandle != NULL);
296

297
  STimeWindow* w = &pTsdbReadHandle->window;
dengyihao's avatar
dengyihao 已提交
298
  bool         asc = ASCENDING_TRAVERSE(pTsdbReadHandle->order);
299 300 301 302

  return ((asc && w->skey > w->ekey) || (!asc && w->ekey > w->skey));
}

303 304
// Update the query time window according to the data time to live(TTL) information, in order to avoid to return
// the expired data to client, even it is queried already.
305
static int64_t getEarliestValidTimestamp(STsdb* pTsdb) {
C
Cary Xu 已提交
306
  STsdbKeepCfg* pCfg = REPO_KEEP_CFG(pTsdb);
307 308

  int64_t now = taosGetTimestamp(pCfg->precision);
309
  return now - (tsTickPerMin[pCfg->precision] * pCfg->keep2) + 1;  // needs to add one tick
310 311
}

312 313
static void setQueryTimewindow(STsdbReadHandle* pTsdbReadHandle, SQueryTableDataCond* pCond, int32_t tWinIdx) {
  pTsdbReadHandle->window = pCond->twindows[tWinIdx];
314

315
  bool    updateTs = false;
316 317 318 319
  int64_t startTs = getEarliestValidTimestamp(pTsdbReadHandle->pTsdb);
  if (ASCENDING_TRAVERSE(pTsdbReadHandle->order)) {
    if (startTs > pTsdbReadHandle->window.skey) {
      pTsdbReadHandle->window.skey = startTs;
320
      pCond->twindows[tWinIdx].skey = startTs;
321
      updateTs = true;
322 323
    }
  } else {
324 325
    if (startTs > pTsdbReadHandle->window.ekey) {
      pTsdbReadHandle->window.ekey = startTs;
326
      pCond->twindows[tWinIdx].ekey = startTs;
327
      updateTs = true;
328 329 330
    }
  }

331
  if (updateTs) {
H
Haojun Liao 已提交
332
    tsdbDebug("%p update the query time window, old:%" PRId64 " - %" PRId64 ", new:%" PRId64 " - %" PRId64 ", %s",
L
Liu Jicong 已提交
333 334
              pTsdbReadHandle, pCond->twindows[tWinIdx].skey, pCond->twindows[tWinIdx].ekey,
              pTsdbReadHandle->window.skey, pTsdbReadHandle->window.ekey, pTsdbReadHandle->idStr);
335
  }
336
}
C
Cary Xu 已提交
337

C
Cary Xu 已提交
338
static STsdb* getTsdbByRetentions(SVnode* pVnode, STsdbReadHandle* pReadHandle, TSKEY winSKey, SRetention* retentions) {
C
Cary Xu 已提交
339
  if (VND_IS_RSMA(pVnode)) {
C
Cary Xu 已提交
340
    int     level = 0;
C
Cary Xu 已提交
341
    int64_t now = taosGetTimestamp(pVnode->config.tsdbCfg.precision);
C
Cary Xu 已提交
342

C
Cary Xu 已提交
343
    for (int i = 0; i < TSDB_RETENTION_MAX; ++i) {
C
Cary Xu 已提交
344 345 346 347 348
      SRetention* pRetention = retentions + level;
      if (pRetention->keep <= 0) {
        if (level > 0) {
          --level;
        }
C
Cary Xu 已提交
349 350
        break;
      }
C
Cary Xu 已提交
351
      if ((now - pRetention->keep) <= winSKey) {
C
Cary Xu 已提交
352
        break;
C
Cary Xu 已提交
353 354
      }
      ++level;
C
Cary Xu 已提交
355
    }
C
Cary Xu 已提交
356

C
Cary Xu 已提交
357
    if (level == TSDB_RETENTION_L0) {
S
Shengliang Guan 已提交
358
      tsdbDebug("vgId:%d, read handle %p rsma level %d is selected to query", TD_VID(pVnode), pReadHandle,
dengyihao's avatar
dengyihao 已提交
359
                TSDB_RETENTION_L0);
C
Cary Xu 已提交
360 361
      return VND_RSMA0(pVnode);
    } else if (level == TSDB_RETENTION_L1) {
S
Shengliang Guan 已提交
362
      tsdbDebug("vgId:%d, read handle %p rsma level %d is selected to query", TD_VID(pVnode), pReadHandle,
dengyihao's avatar
dengyihao 已提交
363
                TSDB_RETENTION_L1);
C
Cary Xu 已提交
364 365
      return VND_RSMA1(pVnode);
    } else {
S
Shengliang Guan 已提交
366
      tsdbDebug("vgId:%d, read handle %p rsma level %d is selected to query", TD_VID(pVnode), pReadHandle,
dengyihao's avatar
dengyihao 已提交
367
                TSDB_RETENTION_L2);
C
Cary Xu 已提交
368 369
      return VND_RSMA2(pVnode);
    }
C
Cary Xu 已提交
370
  }
C
Cary Xu 已提交
371
  return VND_TSDB(pVnode);
C
Cary Xu 已提交
372 373
}

374
static STsdbReadHandle* tsdbQueryTablesImpl(SVnode* pVnode, SQueryTableDataCond* pCond, uint64_t qId, uint64_t taskId) {
wafwerar's avatar
wafwerar 已提交
375
  STsdbReadHandle* pReadHandle = taosMemoryCalloc(1, sizeof(STsdbReadHandle));
376
  if (pReadHandle == NULL) {
377
    goto _end;
378
  }
H
Haojun Liao 已提交
379

380
  STsdb* pTsdb = getTsdbByRetentions(pVnode, pReadHandle, pCond->twindows[0].skey, pVnode->config.tsdbCfg.retentions);
C
Cary Xu 已提交
381

dengyihao's avatar
dengyihao 已提交
382
  pReadHandle->order = pCond->order;
C
Cary Xu 已提交
383
  pReadHandle->pTsdb = pTsdb;
dengyihao's avatar
dengyihao 已提交
384 385 386 387 388 389
  pReadHandle->type = TSDB_QUERY_TYPE_ALL;
  pReadHandle->cur.fid = INT32_MIN;
  pReadHandle->cur.win = TSWINDOW_INITIALIZER;
  pReadHandle->checkFiles = true;
  pReadHandle->activeIndex = 0;  // current active table index
  pReadHandle->allocSize = 0;
390
  pReadHandle->locateStart = false;
dengyihao's avatar
dengyihao 已提交
391
  pReadHandle->loadType = pCond->type;
392

H
Hongze Cheng 已提交
393
  pReadHandle->suid = pCond->suid;
dengyihao's avatar
dengyihao 已提交
394
  pReadHandle->outputCapacity = 4096;  //((STsdb*)tsdb)->config.maxRowsPerFileBlock;
395 396 397
  pReadHandle->loadExternalRow = pCond->loadExternalRows;
  pReadHandle->currentLoadExternalRows = pCond->loadExternalRows;

H
Haojun Liao 已提交
398
  char buf[128] = {0};
dengyihao's avatar
dengyihao 已提交
399
  snprintf(buf, tListLen(buf), "TID:0x%" PRIx64 " QID:0x%" PRIx64, taskId, qId);
H
Haojun Liao 已提交
400 401
  pReadHandle->idStr = strdup(buf);

C
Cary Xu 已提交
402
  if (tsdbInitReadH(&pReadHandle->rhelper, pReadHandle->pTsdb) != 0) {
403
    goto _end;
B
Bomin Zhang 已提交
404
  }
H
Haojun Liao 已提交
405

406
  assert(pCond != NULL);
407
  setQueryTimewindow(pReadHandle, pCond, 0);
408

409
  if (pCond->numOfCols > 0) {
H
Haojun Liao 已提交
410
    int32_t rowLen = 0;
dengyihao's avatar
dengyihao 已提交
411
    for (int32_t i = 0; i < pCond->numOfCols; ++i) {
H
Haojun Liao 已提交
412 413 414
      rowLen += pCond->colList[i].bytes;
    }

415 416 417 418 419 420
    // make sure the output SSDataBlock size be less than 2MB.
    int32_t TWOMB = 2 * 1024 * 1024;
    if (pReadHandle->outputCapacity * rowLen > TWOMB) {
      pReadHandle->outputCapacity = TWOMB / rowLen;
    }

421
    // allocate buffer in order to load data blocks from file
422 423
    pReadHandle->suppInfo.pstatis = taosMemoryCalloc(pCond->numOfCols, sizeof(SColumnDataAgg));
    if (pReadHandle->suppInfo.pstatis == NULL) {
424
      goto _end;
425
    }
H
Haojun Liao 已提交
426

427
    // todo: use list instead of array?
428 429
    pReadHandle->pColumns = taosArrayInit(pCond->numOfCols, sizeof(SColumnInfoData));
    if (pReadHandle->pColumns == NULL) {
430
      goto _end;
431
    }
H
Haojun Liao 已提交
432

433 434 435
    for (int32_t i = 0; i < pCond->numOfCols; ++i) {
      SColumnInfoData colInfo = {{0}, 0};
      colInfo.info = pCond->colList[i];
H
Haojun Liao 已提交
436

437
      int32_t code = colInfoDataEnsureCapacity(&colInfo, 0, pReadHandle->outputCapacity);
438
      if (code != TSDB_CODE_SUCCESS) {
439
        goto _end;
440
      }
441

442
      taosArrayPush(pReadHandle->pColumns, &colInfo);
B
Bomin Zhang 已提交
443
    }
H
Haojun Liao 已提交
444

445
    pReadHandle->suppInfo.defaultLoadColumn = getDefaultLoadColumns(pReadHandle, true);
446 447 448 449

    size_t size = taosArrayGetSize(pReadHandle->suppInfo.defaultLoadColumn);
    pReadHandle->suppInfo.slotIds = taosMemoryCalloc(size, sizeof(int32_t));
    pReadHandle->suppInfo.plist = taosMemoryCalloc(size, POINTER_BYTES);
H
Haojun Liao 已提交
450
  }
451

C
Cary Xu 已提交
452
  pReadHandle->pDataCols = tdNewDataCols(1000, pVnode->config.tsdbCfg.maxRows);
453
  if (pReadHandle->pDataCols == NULL) {
H
Haojun Liao 已提交
454
    tsdbError("%p failed to malloc buf for pDataCols, %s", pReadHandle, pReadHandle->idStr);
H
Haojun Liao 已提交
455
    terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
456
    goto _end;
H
hjxilinx 已提交
457
  }
458

459 460
  tsdbInitDataBlockLoadInfo(&pReadHandle->dataBlockLoadInfo);
  tsdbInitCompBlockLoadInfo(&pReadHandle->compBlockLoadInfo);
461

H
Haojun Liao 已提交
462
  return (tsdbReaderT)pReadHandle;
463

dengyihao's avatar
dengyihao 已提交
464
_end:
465
  tsdbCleanupReadHandle(pReadHandle);
466
  terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
467
  return NULL;
H
hjxilinx 已提交
468 469
}

470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502
static int32_t setCurrentSchema(SVnode* pVnode, STsdbReadHandle* pTsdbReadHandle) {
  STableCheckInfo* pCheckInfo = taosArrayGet(pTsdbReadHandle->pTableCheckInfo, 0);

  int32_t sversion = 1;

  SMetaReader mr = {0};
  metaReaderInit(&mr, pVnode->pMeta, 0);
  int32_t code = metaGetTableEntryByUid(&mr, pCheckInfo->tableId);
  if (code != TSDB_CODE_SUCCESS) {
    terrno = TSDB_CODE_TDB_INVALID_TABLE_ID;
    metaReaderClear(&mr);
    return terrno;
  }

  if (mr.me.type == TSDB_CHILD_TABLE) {
    tb_uid_t suid = mr.me.ctbEntry.suid;
    code = metaGetTableEntryByUid(&mr, suid);
    if (code != TSDB_CODE_SUCCESS) {
      terrno = TSDB_CODE_TDB_INVALID_TABLE_ID;
      metaReaderClear(&mr);
      return terrno;
    }
    sversion = mr.me.stbEntry.schemaRow.version;
  } else {
    ASSERT(mr.me.type == TSDB_NORMAL_TABLE);
    sversion = mr.me.ntbEntry.schemaRow.version;
  }

  metaReaderClear(&mr);
  pTsdbReadHandle->pSchema = metaGetTbTSchema(pVnode->pMeta, pCheckInfo->tableId, sversion);
  return TSDB_CODE_SUCCESS;
}

503
tsdbReaderT tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, STableListInfo* tableList, uint64_t qId,
L
Liu Jicong 已提交
504
                            uint64_t taskId) {
505
  STsdbReadHandle* pTsdbReadHandle = tsdbQueryTablesImpl(pVnode, pCond, qId, taskId);
506
  if (pTsdbReadHandle == NULL) {
507 508 509
    return NULL;
  }

510
  if (emptyQueryTimewindow(pTsdbReadHandle)) {
511
    return (tsdbReaderT)pTsdbReadHandle;
512
  }
H
Haojun Liao 已提交
513 514

  // todo apply the lastkey of table check to avoid to load header file
wmmhello's avatar
wmmhello 已提交
515
  pTsdbReadHandle->pTableCheckInfo = createCheckInfoFromTableGroup(pTsdbReadHandle, tableList);
516
  if (pTsdbReadHandle->pTableCheckInfo == NULL) {
dengyihao's avatar
dengyihao 已提交
517
    //    tsdbCleanupReadHandle(pTsdbReadHandle);
H
Haojun Liao 已提交
518 519 520 521
    terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
    return NULL;
  }

522 523 524 525 526
  int32_t code = setCurrentSchema(pVnode, pTsdbReadHandle);
  if (code != TSDB_CODE_SUCCESS) {
    terrno = code;
    return NULL;
  }
527

C
Cary Xu 已提交
528
  int32_t  numOfCols = taosArrayGetSize(pTsdbReadHandle->suppInfo.defaultLoadColumn);
529 530 531 532 533
  int16_t* ids = pTsdbReadHandle->suppInfo.defaultLoadColumn->pData;

  STSchema* pSchema = pTsdbReadHandle->pSchema;

  int32_t i = 0, j = 0;
C
Cary Xu 已提交
534
  while (i < numOfCols && j < pSchema->numOfCols) {
535 536 537 538 539 540 541 542 543 544 545 546
    if (ids[i] == pSchema->columns[j].colId) {
      pTsdbReadHandle->suppInfo.slotIds[i] = j;
      i++;
      j++;
    } else if (ids[i] > pSchema->columns[j].colId) {
      j++;
    } else {
      //    tsdbCleanupReadHandle(pTsdbReadHandle);
      terrno = TSDB_CODE_INVALID_PARA;
      return NULL;
    }
  }
547

wmmhello's avatar
wmmhello 已提交
548 549
  tsdbDebug("%p total numOfTable:%" PRIzu " in this query, table %" PRIzu " %s", pTsdbReadHandle,
            taosArrayGetSize(pTsdbReadHandle->pTableCheckInfo), taosArrayGetSize(tableList->pTableList),
dengyihao's avatar
dengyihao 已提交
550
            pTsdbReadHandle->idStr);
551

dengyihao's avatar
dengyihao 已提交
552
  return (tsdbReaderT)pTsdbReadHandle;
H
Haojun Liao 已提交
553 554
}

555
void tsdbResetReadHandle(tsdbReaderT queryHandle, SQueryTableDataCond* pCond, int32_t tWinIdx) {
556
  STsdbReadHandle* pTsdbReadHandle = queryHandle;
H
Haojun Liao 已提交
557

558 559 560
  if (emptyQueryTimewindow(pTsdbReadHandle)) {
    if (pCond->order != pTsdbReadHandle->order) {
      pTsdbReadHandle->order = pCond->order;
wafwerar's avatar
wafwerar 已提交
561
      TSWAP(pTsdbReadHandle->window.skey, pTsdbReadHandle->window.ekey);
562 563 564 565 566
    }

    return;
  }

dengyihao's avatar
dengyihao 已提交
567
  pTsdbReadHandle->order = pCond->order;
568
  setQueryTimewindow(pTsdbReadHandle, pCond, tWinIdx);
dengyihao's avatar
dengyihao 已提交
569 570 571 572 573
  pTsdbReadHandle->type = TSDB_QUERY_TYPE_ALL;
  pTsdbReadHandle->cur.fid = -1;
  pTsdbReadHandle->cur.win = TSWINDOW_INITIALIZER;
  pTsdbReadHandle->checkFiles = true;
  pTsdbReadHandle->activeIndex = 0;  // current active table index
574 575
  pTsdbReadHandle->locateStart = false;
  pTsdbReadHandle->loadExternalRow = pCond->loadExternalRows;
H
Haojun Liao 已提交
576 577

  if (ASCENDING_TRAVERSE(pCond->order)) {
578
    assert(pTsdbReadHandle->window.skey <= pTsdbReadHandle->window.ekey);
H
Haojun Liao 已提交
579
  } else {
580
    assert(pTsdbReadHandle->window.skey >= pTsdbReadHandle->window.ekey);
H
Haojun Liao 已提交
581 582 583
  }

  // allocate buffer in order to load data blocks from file
584 585
  memset(pTsdbReadHandle->suppInfo.pstatis, 0, sizeof(SColumnDataAgg));
  memset(pTsdbReadHandle->suppInfo.plist, 0, POINTER_BYTES);
H
Haojun Liao 已提交
586

587 588
  tsdbInitDataBlockLoadInfo(&pTsdbReadHandle->dataBlockLoadInfo);
  tsdbInitCompBlockLoadInfo(&pTsdbReadHandle->compBlockLoadInfo);
H
Haojun Liao 已提交
589

590
  resetCheckInfo(pTsdbReadHandle);
H
Haojun Liao 已提交
591 592
}

L
Liu Jicong 已提交
593 594
void tsdbResetQueryHandleForNewTable(tsdbReaderT queryHandle, SQueryTableDataCond* pCond, STableListInfo* tableList,
                                     int32_t tWinIdx) {
595
  STsdbReadHandle* pTsdbReadHandle = queryHandle;
H
Haojun Liao 已提交
596

dengyihao's avatar
dengyihao 已提交
597
  pTsdbReadHandle->order = pCond->order;
598
  pTsdbReadHandle->window = pCond->twindows[tWinIdx];
dengyihao's avatar
dengyihao 已提交
599 600 601 602 603
  pTsdbReadHandle->type = TSDB_QUERY_TYPE_ALL;
  pTsdbReadHandle->cur.fid = -1;
  pTsdbReadHandle->cur.win = TSWINDOW_INITIALIZER;
  pTsdbReadHandle->checkFiles = true;
  pTsdbReadHandle->activeIndex = 0;  // current active table index
604 605
  pTsdbReadHandle->locateStart = false;
  pTsdbReadHandle->loadExternalRow = pCond->loadExternalRows;
H
Haojun Liao 已提交
606 607

  if (ASCENDING_TRAVERSE(pCond->order)) {
608
    assert(pTsdbReadHandle->window.skey <= pTsdbReadHandle->window.ekey);
H
Haojun Liao 已提交
609
  } else {
610
    assert(pTsdbReadHandle->window.skey >= pTsdbReadHandle->window.ekey);
H
Haojun Liao 已提交
611 612 613
  }

  // allocate buffer in order to load data blocks from file
614 615
  memset(pTsdbReadHandle->suppInfo.pstatis, 0, sizeof(SColumnDataAgg));
  memset(pTsdbReadHandle->suppInfo.plist, 0, POINTER_BYTES);
H
Haojun Liao 已提交
616

617 618
  tsdbInitDataBlockLoadInfo(&pTsdbReadHandle->dataBlockLoadInfo);
  tsdbInitCompBlockLoadInfo(&pTsdbReadHandle->compBlockLoadInfo);
H
Haojun Liao 已提交
619

H
Haojun Liao 已提交
620
  SArray* pTable = NULL;
dengyihao's avatar
dengyihao 已提交
621
  //  STsdbMeta* pMeta = tsdbGetMeta(pTsdbReadHandle->pTsdb);
H
Haojun Liao 已提交
622

dengyihao's avatar
dengyihao 已提交
623
  //  pTsdbReadHandle->pTableCheckInfo = destroyTableCheckInfo(pTsdbReadHandle->pTableCheckInfo);
H
Haojun Liao 已提交
624

dengyihao's avatar
dengyihao 已提交
625 626
  pTsdbReadHandle->pTableCheckInfo = NULL;  // createCheckInfoFromTableGroup(pTsdbReadHandle, groupList, pMeta,
                                            // &pTable);
627
  if (pTsdbReadHandle->pTableCheckInfo == NULL) {
dengyihao's avatar
dengyihao 已提交
628
    //    tsdbCleanupReadHandle(pTsdbReadHandle);
H
Haojun Liao 已提交
629 630
    terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
  }
H
Haojun Liao 已提交
631

dengyihao's avatar
dengyihao 已提交
632 633
  //  pTsdbReadHandle->prev = doFreeColumnInfoData(pTsdbReadHandle->prev);
  //  pTsdbReadHandle->next = doFreeColumnInfoData(pTsdbReadHandle->next);
H
Haojun Liao 已提交
634 635
}

wmmhello's avatar
wmmhello 已提交
636
tsdbReaderT tsdbQueryLastRow(SVnode* pVnode, SQueryTableDataCond* pCond, STableListInfo* pList, uint64_t qId,
dengyihao's avatar
dengyihao 已提交
637
                             uint64_t taskId) {
638
  pCond->twindows[0] = updateLastrowForEachGroup(pList);
H
Haojun Liao 已提交
639 640

  // no qualified table
wmmhello's avatar
wmmhello 已提交
641
  if (taosArrayGetSize(pList->pTableList) == 0) {
H
Haojun Liao 已提交
642 643 644
    return NULL;
  }

L
Liu Jicong 已提交
645
  STsdbReadHandle* pTsdbReadHandle = (STsdbReadHandle*)tsdbReaderOpen(pVnode, pCond, pList, qId, taskId);
646
  if (pTsdbReadHandle == NULL) {
647 648 649
    return NULL;
  }

wmmhello's avatar
wmmhello 已提交
650
  int32_t code = checkForCachedLastRow(pTsdbReadHandle, pList);
dengyihao's avatar
dengyihao 已提交
651
  if (code != TSDB_CODE_SUCCESS) {  // set the numOfTables to be 0
H
Haojun Liao 已提交
652 653 654
    terrno = code;
    return NULL;
  }
H
Haojun Liao 已提交
655

656
  assert(pCond->order == TSDB_ORDER_ASC && pCond->twindows[0].skey <= pCond->twindows[0].ekey);
657 658
  if (pTsdbReadHandle->cachelastrow) {
    pTsdbReadHandle->type = TSDB_QUERY_TYPE_LAST;
D
init  
dapan1121 已提交
659
  }
dengyihao's avatar
dengyihao 已提交
660

661
  return pTsdbReadHandle;
D
init  
dapan1121 已提交
662 663
}

664
#if 0
H
refact  
Hongze Cheng 已提交
665
tsdbReaderT tsdbQueryCacheLastT(STsdb *tsdb, SQueryTableDataCond *pCond, STableGroupInfo *groupList, uint64_t qId, SMemTable* pMemRef) {
666
  STsdbReadHandle *pTsdbReadHandle = (STsdbReadHandle*) tsdbQueryTablesT(tsdb, pCond, groupList, qId, pMemRef);
667
  if (pTsdbReadHandle == NULL) {
668 669 670
    return NULL;
  }

671
  int32_t code = checkForCachedLast(pTsdbReadHandle);
D
init  
dapan1121 已提交
672 673 674 675 676
  if (code != TSDB_CODE_SUCCESS) { // set the numOfTables to be 0
    terrno = code;
    return NULL;
  }

677 678
  if (pTsdbReadHandle->cachelastrow) {
    pTsdbReadHandle->type = TSDB_QUERY_TYPE_LAST;
D
fix bug  
dapan1121 已提交
679
  }
D
init  
dapan1121 已提交
680
  
681
  return pTsdbReadHandle;
H
hjxilinx 已提交
682 683
}

684
#endif
dengyihao's avatar
dengyihao 已提交
685
SArray* tsdbGetQueriedTableList(tsdbReaderT* pHandle) {
686
  assert(pHandle != NULL);
H
Haojun Liao 已提交
687

dengyihao's avatar
dengyihao 已提交
688
  STsdbReadHandle* pTsdbReadHandle = (STsdbReadHandle*)pHandle;
H
Haojun Liao 已提交
689

dengyihao's avatar
dengyihao 已提交
690
  size_t  size = taosArrayGetSize(pTsdbReadHandle->pTableCheckInfo);
691
  SArray* res = taosArrayInit(size, POINTER_BYTES);
692 693 694
  return res;
}

H
Haojun Liao 已提交
695
// leave only one table for each group
dengyihao's avatar
dengyihao 已提交
696
// static STableGroupInfo* trimTableGroup(STimeWindow* window, STableGroupInfo* pGroupList) {
wmmhello's avatar
wmmhello 已提交
697 698 699 700 701 702 703 704 705 706 707 708 709 710 711 712 713 714 715 716 717 718 719 720 721 722 723 724 725 726
//  assert(pGroupList);
//  size_t numOfGroup = taosArrayGetSize(pGroupList->pGroupList);
//
//  STableGroupInfo* pNew = taosMemoryCalloc(1, sizeof(STableGroupInfo));
//  pNew->pGroupList = taosArrayInit(numOfGroup, POINTER_BYTES);
//
//  for (int32_t i = 0; i < numOfGroup; ++i) {
//    SArray* oneGroup = taosArrayGetP(pGroupList->pGroupList, i);
//    size_t  numOfTables = taosArrayGetSize(oneGroup);
//
//    SArray* px = taosArrayInit(4, sizeof(STableKeyInfo));
//    for (int32_t j = 0; j < numOfTables; ++j) {
//      STableKeyInfo* pInfo = (STableKeyInfo*)taosArrayGet(oneGroup, j);
//      //      if (window->skey <= pInfo->lastKey && ((STable*)pInfo->pTable)->lastKey != TSKEY_INITIAL_VAL) {
//      //        taosArrayPush(px, pInfo);
//      //        pNew->numOfTables += 1;
//      //        break;
//      //      }
//    }
//
//    // there are no data in this group
//    if (taosArrayGetSize(px) == 0) {
//      taosArrayDestroy(px);
//    } else {
//      taosArrayPush(pNew->pGroupList, &px);
//    }
//  }
//
//  return pNew;
//}
727

dengyihao's avatar
dengyihao 已提交
728
// tsdbReaderT tsdbQueryRowsInExternalWindow(SVnode* pVnode, SQueryTableDataCond* pCond, STableGroupInfo* groupList,
wmmhello's avatar
wmmhello 已提交
729 730 731 732 733 734 735 736 737 738 739 740 741 742 743 744 745 746 747 748 749
//                                          uint64_t qId, uint64_t taskId) {
//  STableGroupInfo* pNew = trimTableGroup(&pCond->twindow, groupList);
//
//  if (pNew->numOfTables == 0) {
//    tsdbDebug("update query time range to invalidate time window");
//
//    assert(taosArrayGetSize(pNew->pGroupList) == 0);
//    bool asc = ASCENDING_TRAVERSE(pCond->order);
//    if (asc) {
//      pCond->twindow.ekey = pCond->twindow.skey - 1;
//    } else {
//      pCond->twindow.skey = pCond->twindow.ekey - 1;
//    }
//  }
//
//  STsdbReadHandle* pTsdbReadHandle = (STsdbReadHandle*)tsdbQueryTables(pVnode, pCond, pNew, qId, taskId);
//  pTsdbReadHandle->loadExternalRow = true;
//  pTsdbReadHandle->currentLoadExternalRows = true;
//
//  return pTsdbReadHandle;
//}
750

751
static bool initTableMemIterator(STsdbReadHandle* pHandle, STableCheckInfo* pCheckInfo) {
752
  if (pCheckInfo->initBuf) {
753 754
    return true;
  }
H
Haojun Liao 已提交
755

756
  pCheckInfo->initBuf = true;
757
  int32_t order = pHandle->order;
H
Haojun Liao 已提交
758

H
Hongze Cheng 已提交
759 760
  STbData* pMem = NULL;
  STbData* pIMem = NULL;
H
Hongze Cheng 已提交
761
  int8_t   backward = (pHandle->order == TSDB_ORDER_DESC) ? 1 : 0;
762

763
  TSKEY tLastKey = keyToTkey(pCheckInfo->lastKey);
764
  if (pHandle->pTsdb->mem != NULL) {
H
Hongze Cheng 已提交
765
    tsdbGetTbDataFromMemTable(pHandle->pTsdb->mem, pCheckInfo->suid, pCheckInfo->tableId, &pMem);
766
    if (pMem != NULL) {
H
Hongze Cheng 已提交
767
      tsdbTbDataIterCreate(pMem, &(TSDBKEY){.version = 0, .ts = tLastKey}, backward, &pCheckInfo->iter);
H
Haojun Liao 已提交
768
    }
769
  }
H
Haojun Liao 已提交
770

771
  if (pHandle->pTsdb->imem != NULL) {
H
Hongze Cheng 已提交
772
    tsdbGetTbDataFromMemTable(pHandle->pTsdb->mem, pCheckInfo->suid, pCheckInfo->tableId, &pIMem);
773
    if (pIMem != NULL) {
H
Hongze Cheng 已提交
774
      tsdbTbDataIterCreate(pIMem, &(TSDBKEY){.version = 0, .ts = tLastKey}, backward, &pCheckInfo->iiter);
H
Haojun Liao 已提交
775
    }
776
  }
H
Haojun Liao 已提交
777

778 779 780 781
  // both iterators are NULL, no data in buffer right now
  if (pCheckInfo->iter == NULL && pCheckInfo->iiter == NULL) {
    return false;
  }
H
Haojun Liao 已提交
782

H
Hongze Cheng 已提交
783 784 785 786
  bool memEmpty =
      (pCheckInfo->iter == NULL) || (pCheckInfo->iter != NULL && !tsdbTbDataIterGet(pCheckInfo->iter, NULL));
  bool imemEmpty =
      (pCheckInfo->iiter == NULL) || (pCheckInfo->iiter != NULL && !tsdbTbDataIterGet(pCheckInfo->iiter, NULL));
dengyihao's avatar
dengyihao 已提交
787
  if (memEmpty && imemEmpty) {  // buffer is empty
788 789
    return false;
  }
H
Haojun Liao 已提交
790

791
  if (!memEmpty) {
H
Hongze Cheng 已提交
792
    TSDBROW row;
H
Haojun Liao 已提交
793

H
Hongze Cheng 已提交
794 795
    tsdbTbDataIterGet(pCheckInfo->iter, &row);
    TSKEY key = row.pTSRow->ts;  // first timestamp in buffer
796
    tsdbDebug("%p uid:%" PRId64 ", check data in mem from skey:%" PRId64 ", order:%d, ts range in buf:%" PRId64
dengyihao's avatar
dengyihao 已提交
797
              "-%" PRId64 ", lastKey:%" PRId64 ", numOfRows:%" PRId64 ", %s",
H
Hongze Cheng 已提交
798 799
              pHandle, pCheckInfo->tableId, key, order, pMem->minKey.ts, pMem->maxKey.ts, pCheckInfo->lastKey,
              pMem->sl.size, pHandle->idStr);
H
Haojun Liao 已提交
800 801 802 803 804 805 806

    if (ASCENDING_TRAVERSE(order)) {
      assert(pCheckInfo->lastKey <= key);
    } else {
      assert(pCheckInfo->lastKey >= key);
    }

807
  } else {
dengyihao's avatar
dengyihao 已提交
808
    tsdbDebug("%p uid:%" PRId64 ", no data in mem, %s", pHandle, pCheckInfo->tableId, pHandle->idStr);
809
  }
H
Haojun Liao 已提交
810

811
  if (!imemEmpty) {
H
Hongze Cheng 已提交
812
    TSDBROW row;
H
Haojun Liao 已提交
813

H
Hongze Cheng 已提交
814 815
    tsdbTbDataIterGet(pCheckInfo->iter, &row);
    TSKEY key = row.pTSRow->ts;  // first timestamp in buffer
816
    tsdbDebug("%p uid:%" PRId64 ", check data in imem from skey:%" PRId64 ", order:%d, ts range in buf:%" PRId64
dengyihao's avatar
dengyihao 已提交
817
              "-%" PRId64 ", lastKey:%" PRId64 ", numOfRows:%" PRId64 ", %s",
H
Hongze Cheng 已提交
818 819
              pHandle, pCheckInfo->tableId, key, order, pIMem->minKey.ts, pIMem->maxKey.ts, pCheckInfo->lastKey,
              pIMem->sl.size, pHandle->idStr);
H
Haojun Liao 已提交
820 821 822 823 824 825

    if (ASCENDING_TRAVERSE(order)) {
      assert(pCheckInfo->lastKey <= key);
    } else {
      assert(pCheckInfo->lastKey >= key);
    }
826
  } else {
dengyihao's avatar
dengyihao 已提交
827
    tsdbDebug("%p uid:%" PRId64 ", no data in imem, %s", pHandle, pCheckInfo->tableId, pHandle->idStr);
828
  }
H
Haojun Liao 已提交
829

830 831 832
  return true;
}

H
Haojun Liao 已提交
833
static void destroyTableMemIterator(STableCheckInfo* pCheckInfo) {
H
Hongze Cheng 已提交
834 835
  tsdbTbDataIterDestroy(pCheckInfo->iter);
  tsdbTbDataIterDestroy(pCheckInfo->iiter);
H
Haojun Liao 已提交
836 837
}

C
Cary Xu 已提交
838
static TSKEY extractFirstTraverseKey(STableCheckInfo* pCheckInfo, int32_t order, int32_t update, TDRowVerT maxVer) {
H
Hongze Cheng 已提交
839
  TSDBROW row = {0};
H
Haojun Liao 已提交
840
  STSRow *rmem = NULL, *rimem = NULL;
H
Hongze Cheng 已提交
841

842
  if (pCheckInfo->iter) {
H
Hongze Cheng 已提交
843 844
    if (tsdbTbDataIterGet(pCheckInfo->iter, &row)) {
      rmem = row.pTSRow;
845 846 847 848
    }
  }

  if (pCheckInfo->iiter) {
H
Hongze Cheng 已提交
849 850
    if (tsdbTbDataIterGet(pCheckInfo->iiter, &row)) {
      rimem = row.pTSRow;
851 852 853 854 855 856 857 858 859
    }
  }

  if (rmem == NULL && rimem == NULL) {
    return TSKEY_INITIAL_VAL;
  }

  if (rmem != NULL && rimem == NULL) {
    pCheckInfo->chosen = CHECKINFO_CHOSEN_MEM;
H
Haojun Liao 已提交
860
    return TD_ROW_KEY(rmem);
861 862 863 864
  }

  if (rmem == NULL && rimem != NULL) {
    pCheckInfo->chosen = CHECKINFO_CHOSEN_IMEM;
H
Haojun Liao 已提交
865
    return TD_ROW_KEY(rimem);
866 867
  }

H
Haojun Liao 已提交
868 869
  TSKEY r1 = TD_ROW_KEY(rmem);
  TSKEY r2 = TD_ROW_KEY(rimem);
870 871

  if (r1 == r2) {
C
Cary Xu 已提交
872
#if 0
dengyihao's avatar
dengyihao 已提交
873
    if (update == TD_ROW_DISCARD_UPDATE) {
874 875
      pCheckInfo->chosen = CHECKINFO_CHOSEN_IMEM;
      tSkipListIterNext(pCheckInfo->iter);
dengyihao's avatar
dengyihao 已提交
876
    } else if (update == TD_ROW_OVERWRITE_UPDATE) {
877 878 879 880 881
      pCheckInfo->chosen = CHECKINFO_CHOSEN_MEM;
      tSkipListIterNext(pCheckInfo->iiter);
    } else {
      pCheckInfo->chosen = CHECKINFO_CHOSEN_BOTH;
    }
C
Cary Xu 已提交
882 883 884 885 886
#endif
    if (TD_SUPPORT_UPDATE(update)) {
      pCheckInfo->chosen = CHECKINFO_CHOSEN_BOTH;
    } else {
      pCheckInfo->chosen = CHECKINFO_CHOSEN_IMEM;
H
Hongze Cheng 已提交
887
      tsdbTbDataIterNext(pCheckInfo->iter);
C
Cary Xu 已提交
888
    }
889 890 891 892
    return r1;
  } else if (r1 < r2 && ASCENDING_TRAVERSE(order)) {
    pCheckInfo->chosen = CHECKINFO_CHOSEN_MEM;
    return r1;
dengyihao's avatar
dengyihao 已提交
893
  } else {
894 895 896 897 898
    pCheckInfo->chosen = CHECKINFO_CHOSEN_IMEM;
    return r2;
  }
}

C
Cary Xu 已提交
899 900
static STSRow* getSRowInTableMem(STableCheckInfo* pCheckInfo, int32_t order, int32_t update, STSRow** extraRow,
                                 TDRowVerT maxVer) {
H
Hongze Cheng 已提交
901
  TSDBROW row;
H
Haojun Liao 已提交
902
  STSRow *rmem = NULL, *rimem = NULL;
H
Haojun Liao 已提交
903
  if (pCheckInfo->iter) {
H
Hongze Cheng 已提交
904 905
    if (tsdbTbDataIterGet(pCheckInfo->iter, &row)) {
      rmem = row.pTSRow;
H
Haojun Liao 已提交
906 907
    }
  }
908

H
Haojun Liao 已提交
909
  if (pCheckInfo->iiter) {
H
Hongze Cheng 已提交
910 911
    if (tsdbTbDataIterGet(pCheckInfo->iiter, &row)) {
      rimem = row.pTSRow;
H
Haojun Liao 已提交
912 913
    }
  }
914

H
Haojun Liao 已提交
915 916
  if (rmem == NULL && rimem == NULL) {
    return NULL;
H
Haojun Liao 已提交
917
  }
918

H
Haojun Liao 已提交
919
  if (rmem != NULL && rimem == NULL) {
H
Haojun Liao 已提交
920 921 922
    pCheckInfo->chosen = 0;
    return rmem;
  }
923

H
Haojun Liao 已提交
924
  if (rmem == NULL && rimem != NULL) {
H
Haojun Liao 已提交
925 926 927
    pCheckInfo->chosen = 1;
    return rimem;
  }
928

H
Haojun Liao 已提交
929 930
  TSKEY r1 = TD_ROW_KEY(rmem);
  TSKEY r2 = TD_ROW_KEY(rimem);
H
Haojun Liao 已提交
931

932
  if (r1 == r2) {
C
Cary Xu 已提交
933
#if 0
934
    if (update == TD_ROW_DISCARD_UPDATE) {
H
TD-1439  
Hongze Cheng 已提交
935
      tSkipListIterNext(pCheckInfo->iter);
936
      pCheckInfo->chosen = CHECKINFO_CHOSEN_IMEM;
H
TD-1439  
Hongze Cheng 已提交
937
      return rimem;
dengyihao's avatar
dengyihao 已提交
938
    } else if (update == TD_ROW_OVERWRITE_UPDATE) {
H
TD-1439  
Hongze Cheng 已提交
939
      tSkipListIterNext(pCheckInfo->iiter);
940 941 942 943
      pCheckInfo->chosen = CHECKINFO_CHOSEN_MEM;
      return rmem;
    } else {
      pCheckInfo->chosen = CHECKINFO_CHOSEN_BOTH;
H
Haojun Liao 已提交
944
      *extraRow = rimem;
H
TD-1439  
Hongze Cheng 已提交
945 946
      return rmem;
    }
C
Cary Xu 已提交
947 948 949 950 951 952
#endif
    if (TD_SUPPORT_UPDATE(update)) {
      pCheckInfo->chosen = CHECKINFO_CHOSEN_BOTH;
      *extraRow = rimem;
      return rmem;
    } else {
H
Hongze Cheng 已提交
953
      tsdbTbDataIterNext(pCheckInfo->iter);
C
Cary Xu 已提交
954 955 956
      pCheckInfo->chosen = CHECKINFO_CHOSEN_IMEM;
      return rimem;
    }
H
Haojun Liao 已提交
957 958 959
  } else {
    if (ASCENDING_TRAVERSE(order)) {
      if (r1 < r2) {
960
        pCheckInfo->chosen = CHECKINFO_CHOSEN_MEM;
H
Haojun Liao 已提交
961 962
        return rmem;
      } else {
963
        pCheckInfo->chosen = CHECKINFO_CHOSEN_IMEM;
H
Haojun Liao 已提交
964 965 966 967
        return rimem;
      }
    } else {
      if (r1 < r2) {
968
        pCheckInfo->chosen = CHECKINFO_CHOSEN_IMEM;
H
Haojun Liao 已提交
969 970
        return rimem;
      } else {
971
        pCheckInfo->chosen = CHECKINFO_CHOSEN_IMEM;
H
Haojun Liao 已提交
972 973 974 975
        return rmem;
      }
    }
  }
H
Haojun Liao 已提交
976 977
}

978
static bool moveToNextRowInMem(STableCheckInfo* pCheckInfo) {
H
Haojun Liao 已提交
979
  bool hasNext = false;
980
  if (pCheckInfo->chosen == CHECKINFO_CHOSEN_MEM) {
H
Haojun Liao 已提交
981
    if (pCheckInfo->iter != NULL) {
H
Hongze Cheng 已提交
982
      hasNext = tsdbTbDataIterNext(pCheckInfo->iter);
H
Haojun Liao 已提交
983
    }
984

H
Haojun Liao 已提交
985 986 987
    if (hasNext) {
      return hasNext;
    }
988

H
Haojun Liao 已提交
989
    if (pCheckInfo->iiter != NULL) {
H
Hongze Cheng 已提交
990
      return tsdbTbDataIterGet(pCheckInfo->iiter, NULL);
H
Haojun Liao 已提交
991
    }
dengyihao's avatar
dengyihao 已提交
992
  } else if (pCheckInfo->chosen == CHECKINFO_CHOSEN_IMEM) {
993
    if (pCheckInfo->iiter != NULL) {
H
Hongze Cheng 已提交
994
      hasNext = tsdbTbDataIterNext(pCheckInfo->iiter);
995
    }
996

997 998 999
    if (hasNext) {
      return hasNext;
    }
1000

1001
    if (pCheckInfo->iter != NULL) {
H
Hongze Cheng 已提交
1002
      return tsdbTbDataIterGet(pCheckInfo->iter, NULL);
H
Haojun Liao 已提交
1003
    }
1004 1005
  } else {
    if (pCheckInfo->iter != NULL) {
H
Hongze Cheng 已提交
1006
      hasNext = tsdbTbDataIterNext(pCheckInfo->iter);
1007 1008
    }
    if (pCheckInfo->iiter != NULL) {
H
Hongze Cheng 已提交
1009
      hasNext = tsdbTbDataIterNext(pCheckInfo->iiter) || hasNext;
1010
    }
H
Haojun Liao 已提交
1011
  }
1012

H
Haojun Liao 已提交
1013 1014 1015
  return hasNext;
}

1016
static bool hasMoreDataInCache(STsdbReadHandle* pHandle) {
H
Hongze Cheng 已提交
1017
  STsdbCfg* pCfg = REPO_CFG(pHandle->pTsdb);
dengyihao's avatar
dengyihao 已提交
1018
  size_t    size = taosArrayGetSize(pHandle->pTableCheckInfo);
1019
  assert(pHandle->activeIndex < size && pHandle->activeIndex >= 0 && size >= 1);
D
dapan1121 已提交
1020
  pHandle->cur.fid = INT32_MIN;
H
Haojun Liao 已提交
1021

1022
  STableCheckInfo* pCheckInfo = taosArrayGet(pHandle->pTableCheckInfo, pHandle->activeIndex);
H
Haojun Liao 已提交
1023 1024 1025 1026
  if (!pCheckInfo->initBuf) {
    initTableMemIterator(pHandle, pCheckInfo);
  }

C
Cary Xu 已提交
1027
  STSRow* row = getSRowInTableMem(pCheckInfo, pHandle->order, pCfg->update, NULL, TD_VER_MAX);
H
Haojun Liao 已提交
1028
  if (row == NULL) {
1029 1030
    return false;
  }
1031

H
Haojun Liao 已提交
1032
  pCheckInfo->lastKey = TD_ROW_KEY(row);  // first timestamp in buffer
dengyihao's avatar
dengyihao 已提交
1033 1034
  tsdbDebug("%p uid:%" PRId64 ", check data in buffer from skey:%" PRId64 ", order:%d, %s", pHandle,
            pCheckInfo->tableId, pCheckInfo->lastKey, pHandle->order, pHandle->idStr);
H
Haojun Liao 已提交
1035

1036
  // all data in mem are checked already.
1037 1038
  if ((pCheckInfo->lastKey > pHandle->window.ekey && ASCENDING_TRAVERSE(pHandle->order)) ||
      (pCheckInfo->lastKey < pHandle->window.ekey && !ASCENDING_TRAVERSE(pHandle->order))) {
1039 1040
    return false;
  }
H
Haojun Liao 已提交
1041

dengyihao's avatar
dengyihao 已提交
1042
  int32_t      step = ASCENDING_TRAVERSE(pHandle->order) ? 1 : -1;
1043
  STimeWindow* win = &pHandle->cur.win;
H
Haojun Liao 已提交
1044
  pHandle->cur.rows = tsdbReadRowsFromCache(pCheckInfo, pHandle->window.ekey, pHandle->outputCapacity, win, pHandle);
H
Haojun Liao 已提交
1045

1046 1047 1048 1049
  // update the last key value
  pCheckInfo->lastKey = win->ekey + step;
  pHandle->cur.lastKey = win->ekey + step;
  pHandle->cur.mixBlock = true;
1050

1051
  if (!ASCENDING_TRAVERSE(pHandle->order)) {
wafwerar's avatar
wafwerar 已提交
1052
    TSWAP(win->skey, win->ekey);
1053
  }
H
Haojun Liao 已提交
1054

1055
  return true;
1056
}
H
hjxilinx 已提交
1057

1058 1059
static int32_t getFileIdFromKey(TSKEY key, int32_t daysPerFile, int32_t precision) {
  assert(precision >= TSDB_TIME_PRECISION_MICRO || precision <= TSDB_TIME_PRECISION_NANO);
1060 1061 1062
  if (key == TSKEY_INITIAL_VAL) {
    return INT32_MIN;
  }
H
Haojun Liao 已提交
1063

D
dapan1121 已提交
1064
  if (key < 0) {
1065
    key -= (daysPerFile * tsTickPerMin[precision]);
D
dapan1121 已提交
1066
  }
dengyihao's avatar
dengyihao 已提交
1067

1068
  int64_t fid = (int64_t)(key / (daysPerFile * tsTickPerMin[precision]));  // set the starting fileId
dengyihao's avatar
dengyihao 已提交
1069
  if (fid < 0L && llabs(fid) > INT32_MAX) {                                // data value overflow for INT32
1070 1071
    fid = INT32_MIN;
  }
H
Haojun Liao 已提交
1072

1073
  if (fid > 0L && fid > INT32_MAX) {
1074 1075
    fid = INT32_MAX;
  }
H
Haojun Liao 已提交
1076

S
TD-1057  
Shengliang Guan 已提交
1077
  return (int32_t)fid;
1078 1079
}

H
refact  
Hongze Cheng 已提交
1080
static int32_t binarySearchForBlock(SBlock* pBlock, int32_t numOfBlocks, TSKEY skey, int32_t order) {
1081 1082
  int32_t firstSlot = 0;
  int32_t lastSlot = numOfBlocks - 1;
H
Haojun Liao 已提交
1083

1084
  int32_t midSlot = firstSlot;
H
Haojun Liao 已提交
1085

1086 1087 1088
  while (1) {
    numOfBlocks = lastSlot - firstSlot + 1;
    midSlot = (firstSlot + (numOfBlocks >> 1));
H
Haojun Liao 已提交
1089

1090
    if (numOfBlocks == 1) break;
H
Haojun Liao 已提交
1091

H
Hongze Cheng 已提交
1092
    if (skey > pBlock[midSlot].maxKey.ts) {
1093
      if (numOfBlocks == 2) break;
H
Hongze Cheng 已提交
1094
      if ((order == TSDB_ORDER_DESC) && (skey < pBlock[midSlot + 1].minKey.ts)) break;
1095
      firstSlot = midSlot + 1;
H
Hongze Cheng 已提交
1096 1097
    } else if (skey < pBlock[midSlot].minKey.ts) {
      if ((order == TSDB_ORDER_ASC) && (skey > pBlock[midSlot - 1].maxKey.ts)) break;
1098 1099 1100 1101 1102
      lastSlot = midSlot - 1;
    } else {
      break;  // got the slot
    }
  }
H
Haojun Liao 已提交
1103

1104 1105
  return midSlot;
}
1106

dengyihao's avatar
dengyihao 已提交
1107
static int32_t loadBlockInfo(STsdbReadHandle* pTsdbReadHandle, int32_t index, int32_t* numOfBlocks) {
H
Haojun Liao 已提交
1108
  int32_t code = 0;
H
Haojun Liao 已提交
1109

1110
  STableCheckInfo* pCheckInfo = taosArrayGet(pTsdbReadHandle->pTableCheckInfo, index);
H
Haojun Liao 已提交
1111
  pCheckInfo->numOfBlocks = 0;
1112

H
Hongze Cheng 已提交
1113
  STable table = {.uid = pCheckInfo->tableId, .suid = pCheckInfo->suid};
1114
  table.pSchema = pTsdbReadHandle->pSchema;
H
Haojun Liao 已提交
1115 1116

  if (tsdbSetReadTable(&pTsdbReadHandle->rhelper, &table) != TSDB_CODE_SUCCESS) {
H
Haojun Liao 已提交
1117 1118 1119
    code = terrno;
    return code;
  }
1120

1121
  SBlockIdx* compIndex = pTsdbReadHandle->rhelper.pBlkIdx;
H
Hongze Cheng 已提交
1122

H
Haojun Liao 已提交
1123
  // no data block in this file, try next file
1124
  if (compIndex == NULL || compIndex->uid != pCheckInfo->tableId) {
H
Haojun Liao 已提交
1125 1126
    return 0;  // no data blocks in the file belongs to pCheckInfo->pTable
  }
1127

H
Haojun Liao 已提交
1128 1129 1130
  if (pCheckInfo->compSize < (int32_t)compIndex->len) {
    assert(compIndex->len > 0);

wafwerar's avatar
wafwerar 已提交
1131
    char* t = taosMemoryRealloc(pCheckInfo->pCompInfo, compIndex->len);
H
Haojun Liao 已提交
1132 1133 1134 1135
    if (t == NULL) {
      terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
      code = TSDB_CODE_TDB_OUT_OF_MEMORY;
      return code;
1136 1137
    }

H
Haojun Liao 已提交
1138 1139 1140
    pCheckInfo->pCompInfo = (SBlockInfo*)t;
    pCheckInfo->compSize = compIndex->len;
  }
1141

1142
  if (tsdbLoadBlockInfo(&(pTsdbReadHandle->rhelper), (void*)(pCheckInfo->pCompInfo)) < 0) {
H
Hongze Cheng 已提交
1143 1144
    return terrno;
  }
H
Haojun Liao 已提交
1145
  SBlockInfo* pCompInfo = pCheckInfo->pCompInfo;
1146

H
Haojun Liao 已提交
1147
  TSKEY s = TSKEY_INITIAL_VAL, e = TSKEY_INITIAL_VAL;
1148

1149
  if (ASCENDING_TRAVERSE(pTsdbReadHandle->order)) {
dengyihao's avatar
dengyihao 已提交
1150 1151
    assert(pCheckInfo->lastKey <= pTsdbReadHandle->window.ekey &&
           pTsdbReadHandle->window.skey <= pTsdbReadHandle->window.ekey);
H
Haojun Liao 已提交
1152
  } else {
dengyihao's avatar
dengyihao 已提交
1153 1154
    assert(pCheckInfo->lastKey >= pTsdbReadHandle->window.ekey &&
           pTsdbReadHandle->window.skey >= pTsdbReadHandle->window.ekey);
H
Haojun Liao 已提交
1155
  }
1156

dengyihao's avatar
dengyihao 已提交
1157 1158
  s = TMIN(pCheckInfo->lastKey, pTsdbReadHandle->window.ekey);
  e = TMAX(pCheckInfo->lastKey, pTsdbReadHandle->window.ekey);
1159

H
Haojun Liao 已提交
1160 1161 1162
  // discard the unqualified data block based on the query time window
  int32_t start = binarySearchForBlock(pCompInfo->blocks, compIndex->numOfBlocks, s, TSDB_ORDER_ASC);
  int32_t end = start;
H
TD-100  
hzcheng 已提交
1163

H
Hongze Cheng 已提交
1164
  if (s > pCompInfo->blocks[start].maxKey.ts) {
H
Haojun Liao 已提交
1165 1166
    return 0;
  }
1167

H
Haojun Liao 已提交
1168
  // todo speedup the procedure of located end block
H
Hongze Cheng 已提交
1169
  while (end < (int32_t)compIndex->numOfBlocks && (pCompInfo->blocks[end].minKey.ts <= e)) {
H
Haojun Liao 已提交
1170 1171
    end += 1;
  }
1172

H
Haojun Liao 已提交
1173
  pCheckInfo->numOfBlocks = (end - start);
1174

H
Haojun Liao 已提交
1175 1176 1177
  if (start > 0) {
    memmove(pCompInfo->blocks, &pCompInfo->blocks[start], pCheckInfo->numOfBlocks * sizeof(SBlock));
  }
1178

H
Haojun Liao 已提交
1179 1180 1181
  (*numOfBlocks) += pCheckInfo->numOfBlocks;
  return 0;
}
1182

1183
static int32_t getFileCompInfo(STsdbReadHandle* pTsdbReadHandle, int32_t* numOfBlocks) {
H
Haojun Liao 已提交
1184 1185 1186 1187
  // load all the comp offset value for all tables in this file
  int32_t code = TSDB_CODE_SUCCESS;
  *numOfBlocks = 0;

1188
  pTsdbReadHandle->cost.headFileLoad += 1;
1189 1190
  int64_t s = taosGetTimestampUs();

H
Haojun Liao 已提交
1191
  size_t numOfTables = 0;
1192 1193 1194 1195
  if (pTsdbReadHandle->loadType == BLOCK_LOAD_TABLE_SEQ_ORDER) {
    code = loadBlockInfo(pTsdbReadHandle, pTsdbReadHandle->activeIndex, numOfBlocks);
  } else if (pTsdbReadHandle->loadType == BLOCK_LOAD_OFFSET_SEQ_ORDER) {
    numOfTables = taosArrayGetSize(pTsdbReadHandle->pTableCheckInfo);
1196

H
Haojun Liao 已提交
1197
    for (int32_t i = 0; i < numOfTables; ++i) {
1198
      code = loadBlockInfo(pTsdbReadHandle, i, numOfBlocks);
H
Haojun Liao 已提交
1199
      if (code != TSDB_CODE_SUCCESS) {
1200 1201
        int64_t e = taosGetTimestampUs();

1202
        pTsdbReadHandle->cost.headFileLoadTime += (e - s);
H
Haojun Liao 已提交
1203 1204 1205 1206 1207
        return code;
      }
    }
  } else {
    assert(0);
1208
  }
1209

1210
  int64_t e = taosGetTimestampUs();
1211
  pTsdbReadHandle->cost.headFileLoadTime += (e - s);
H
Haojun Liao 已提交
1212
  return code;
1213 1214
}

dengyihao's avatar
dengyihao 已提交
1215 1216
static int32_t doLoadFileDataBlock(STsdbReadHandle* pTsdbReadHandle, SBlock* pBlock, STableCheckInfo* pCheckInfo,
                                   int32_t slotIndex) {
H
Haojun Liao 已提交
1217
  int64_t st = taosGetTimestampUs();
1218

C
Cary Xu 已提交
1219
  int32_t code = tdInitDataCols(pTsdbReadHandle->pDataCols, pTsdbReadHandle->pSchema);
H
Haojun Liao 已提交
1220
  if (code != TSDB_CODE_SUCCESS) {
H
Haojun Liao 已提交
1221
    tsdbError("%p failed to malloc buf for pDataCols, %s", pTsdbReadHandle, pTsdbReadHandle->idStr);
H
Haojun Liao 已提交
1222 1223 1224 1225
    terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
    goto _error;
  }

1226
  code = tdInitDataCols(pTsdbReadHandle->rhelper.pDCols[0], pTsdbReadHandle->pSchema);
H
Haojun Liao 已提交
1227
  if (code != TSDB_CODE_SUCCESS) {
H
Haojun Liao 已提交
1228
    tsdbError("%p failed to malloc buf for rhelper.pDataCols[0], %s", pTsdbReadHandle, pTsdbReadHandle->idStr);
H
Haojun Liao 已提交
1229 1230 1231 1232
    terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
    goto _error;
  }

1233
  code = tdInitDataCols(pTsdbReadHandle->rhelper.pDCols[1], pTsdbReadHandle->pSchema);
H
Haojun Liao 已提交
1234
  if (code != TSDB_CODE_SUCCESS) {
H
Haojun Liao 已提交
1235
    tsdbError("%p failed to malloc buf for rhelper.pDataCols[1], %s", pTsdbReadHandle, pTsdbReadHandle->idStr);
H
Haojun Liao 已提交
1236 1237 1238
    terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
    goto _error;
  }
1239

1240
  int16_t* colIds = pTsdbReadHandle->suppInfo.defaultLoadColumn->pData;
H
Haojun Liao 已提交
1241

dengyihao's avatar
dengyihao 已提交
1242 1243
  int32_t ret = tsdbLoadBlockDataCols(&(pTsdbReadHandle->rhelper), pBlock, pCheckInfo->pCompInfo, colIds,
                                      (int)(QH_GET_NUM_OF_COLS(pTsdbReadHandle)), true);
H
Haojun Liao 已提交
1244
  if (ret != TSDB_CODE_SUCCESS) {
H
Haojun Liao 已提交
1245 1246 1247
    int32_t c = terrno;
    assert(c != TSDB_CODE_SUCCESS);
    goto _error;
H
Haojun Liao 已提交
1248
  }
1249

1250
  SDataBlockLoadInfo* pBlockLoadInfo = &pTsdbReadHandle->dataBlockLoadInfo;
1251

1252 1253
  pBlockLoadInfo->fileGroup = pTsdbReadHandle->pFileGroup;
  pBlockLoadInfo->slot = pTsdbReadHandle->cur.slot;
H
Haojun Liao 已提交
1254
  pBlockLoadInfo->uid = pCheckInfo->tableId;
1255

1256
  SDataCols* pCols = pTsdbReadHandle->rhelper.pDCols[0];
1257
  assert(pCols->numOfRows != 0 && pCols->numOfRows <= pBlock->numOfRows);
1258

1259
  pBlock->numOfRows = pCols->numOfRows;
H
Haojun Liao 已提交
1260

1261
  // Convert from TKEY to TSKEY for primary timestamp column if current block has timestamp before 1970-01-01T00:00:00Z
H
Hongze Cheng 已提交
1262
  if (pBlock->minKey.ts < 0 && colIds[0] == PRIMARYKEY_TIMESTAMP_COL_ID) {
1263
    int64_t* src = pCols->cols[0].pData;
dengyihao's avatar
dengyihao 已提交
1264
    for (int32_t i = 0; i < pBlock->numOfRows; ++i) {
1265 1266 1267 1268
      src[i] = tdGetKey(src[i]);
    }
  }

H
Haojun Liao 已提交
1269
  int64_t elapsedTime = (taosGetTimestampUs() - st);
1270
  pTsdbReadHandle->cost.blockLoadTime += elapsedTime;
1271

dengyihao's avatar
dengyihao 已提交
1272 1273
  tsdbDebug("%p load file block into buffer, index:%d, brange:%" PRId64 "-%" PRId64 ", rows:%d, elapsed time:%" PRId64
            " us, %s",
H
Hongze Cheng 已提交
1274
            pTsdbReadHandle, slotIndex, pBlock->minKey.ts, pBlock->maxKey.ts, pBlock->numOfRows, elapsedTime,
dengyihao's avatar
dengyihao 已提交
1275
            pTsdbReadHandle->idStr);
H
Haojun Liao 已提交
1276
  return TSDB_CODE_SUCCESS;
H
Haojun Liao 已提交
1277 1278 1279 1280

_error:
  pBlock->numOfRows = 0;

dengyihao's avatar
dengyihao 已提交
1281
  tsdbError("%p error occurs in loading file block, index:%d, brange:%" PRId64 "-%" PRId64 ", rows:%d, %s",
H
Hongze Cheng 已提交
1282 1283
            pTsdbReadHandle, slotIndex, pBlock->minKey.ts, pBlock->maxKey.ts, pBlock->numOfRows,
            pTsdbReadHandle->idStr);
H
Haojun Liao 已提交
1284
  return terrno;
H
hjxilinx 已提交
1285 1286
}

1287
static int32_t getEndPosInDataBlock(STsdbReadHandle* pTsdbReadHandle, SDataBlockInfo* pBlockInfo);
dengyihao's avatar
dengyihao 已提交
1288 1289 1290 1291 1292 1293 1294
static int32_t doCopyRowsFromFileBlock(STsdbReadHandle* pTsdbReadHandle, int32_t capacity, int32_t numOfRows,
                                       int32_t start, int32_t end);
static void    doCheckGeneratedBlockRange(STsdbReadHandle* pTsdbReadHandle);
static void    copyAllRemainRowsFromFileBlock(STsdbReadHandle* pTsdbReadHandle, STableCheckInfo* pCheckInfo,
                                              SDataBlockInfo* pBlockInfo, int32_t endPos);

static int32_t handleDataMergeIfNeeded(STsdbReadHandle* pTsdbReadHandle, SBlock* pBlock, STableCheckInfo* pCheckInfo) {
1295
  SQueryFilePos* cur = &pTsdbReadHandle->cur;
H
Hongze Cheng 已提交
1296
  STsdbCfg*      pCfg = REPO_CFG(pTsdbReadHandle->pTsdb);
H
Haojun Liao 已提交
1297
  SDataBlockInfo binfo = GET_FILE_DATA_BLOCK_INFO(pCheckInfo, pBlock);
1298
  TSKEY          key;
dengyihao's avatar
dengyihao 已提交
1299
  int32_t        code = TSDB_CODE_SUCCESS;
1300

1301
  /*bool hasData = */ initTableMemIterator(pTsdbReadHandle, pCheckInfo);
H
Haojun Liao 已提交
1302 1303
  assert(cur->pos >= 0 && cur->pos <= binfo.rows);

C
Cary Xu 已提交
1304
  key = extractFirstTraverseKey(pCheckInfo, pTsdbReadHandle->order, pCfg->update, TD_VER_MAX);
1305

H
Haojun Liao 已提交
1306
  if (key != TSKEY_INITIAL_VAL) {
dengyihao's avatar
dengyihao 已提交
1307
    tsdbDebug("%p key in mem:%" PRId64 ", %s", pTsdbReadHandle, key, pTsdbReadHandle->idStr);
H
Haojun Liao 已提交
1308
  } else {
H
Haojun Liao 已提交
1309
    tsdbDebug("%p no data in mem, %s", pTsdbReadHandle, pTsdbReadHandle->idStr);
H
Haojun Liao 已提交
1310
  }
H
Haojun Liao 已提交
1311

1312 1313
  bool ascScan = ASCENDING_TRAVERSE(pTsdbReadHandle->order);

1314 1315
  if ((ascScan && (key != TSKEY_INITIAL_VAL && key <= binfo.window.ekey)) ||
      (!ascScan && (key != TSKEY_INITIAL_VAL && key >= binfo.window.skey))) {
1316 1317 1318
    bool cacheDataInFileBlockHole = (ascScan && (key != TSKEY_INITIAL_VAL && key < binfo.window.skey)) ||
                                    (!ascScan && (key != TSKEY_INITIAL_VAL && key > binfo.window.ekey));
    if (cacheDataInFileBlockHole) {
H
Haojun Liao 已提交
1319
      // do not load file block into buffer
1320
      int32_t step = ascScan ? 1 : -1;
H
Haojun Liao 已提交
1321

1322
      TSKEY maxKey = ascScan ? (binfo.window.skey - step) : (binfo.window.ekey - step);
dengyihao's avatar
dengyihao 已提交
1323 1324
      cur->rows =
          tsdbReadRowsFromCache(pCheckInfo, maxKey, pTsdbReadHandle->outputCapacity, &cur->win, pTsdbReadHandle);
1325
      pTsdbReadHandle->realNumOfRows = cur->rows;
H
Haojun Liao 已提交
1326 1327 1328

      // update the last key value
      pCheckInfo->lastKey = cur->win.ekey + step;
1329 1330

      if (!ascScan) {
wafwerar's avatar
wafwerar 已提交
1331
        TSWAP(cur->win.skey, cur->win.ekey);
H
Haojun Liao 已提交
1332
      }
H
Haojun Liao 已提交
1333

H
Haojun Liao 已提交
1334 1335
      cur->mixBlock = true;
      cur->blockCompleted = false;
H
Haojun Liao 已提交
1336
      return code;
H
Haojun Liao 已提交
1337
    }
H
Haojun Liao 已提交
1338

1339
    // return error, add test cases
1340
    if ((code = doLoadFileDataBlock(pTsdbReadHandle, pBlock, pCheckInfo, cur->slot)) != TSDB_CODE_SUCCESS) {
H
Haojun Liao 已提交
1341
      return code;
1342 1343
    }

1344
    doMergeTwoLevelData(pTsdbReadHandle, pCheckInfo, pBlock);
1345
  } else {
1346 1347 1348 1349 1350
    /*
     * no data in cache, only load data from file
     * during the query processing, data in cache will not be checked anymore.
     * Here the buffer is not enough, so only part of file block can be loaded into memory buffer
     */
1351
    int32_t endPos = getEndPosInDataBlock(pTsdbReadHandle, &binfo);
1352

1353 1354
    bool wholeBlockReturned = ((abs(cur->pos - endPos) + 1) == binfo.rows);
    if (wholeBlockReturned) {
1355
      pTsdbReadHandle->realNumOfRows = binfo.rows;
1356 1357

      cur->rows = binfo.rows;
dengyihao's avatar
dengyihao 已提交
1358
      cur->win = binfo.window;
1359
      cur->mixBlock = false;
H
Haojun Liao 已提交
1360 1361
      cur->blockCompleted = true;

1362
      if (ascScan) {
H
Haojun Liao 已提交
1363 1364 1365 1366 1367 1368
        cur->lastKey = binfo.window.ekey + 1;
        cur->pos = binfo.rows;
      } else {
        cur->lastKey = binfo.window.skey - 1;
        cur->pos = -1;
      }
dengyihao's avatar
dengyihao 已提交
1369
    } else {  // partially copy to dest buffer
1370
      // make sure to only load once
dengyihao's avatar
dengyihao 已提交
1371
      bool firstTimeExtract = ((cur->pos == 0 && ascScan) || (cur->pos == binfo.rows - 1 && (!ascScan)));
1372 1373 1374 1375 1376 1377 1378
      if (pTsdbReadHandle->outputCapacity < binfo.rows && firstTimeExtract) {
        code = doLoadFileDataBlock(pTsdbReadHandle, pBlock, pCheckInfo, cur->slot);
        if (code != TSDB_CODE_SUCCESS) {
          return code;
        }
      }

1379
      copyAllRemainRowsFromFileBlock(pTsdbReadHandle, pCheckInfo, &binfo, endPos);
1380 1381
      cur->mixBlock = true;
    }
1382

1383
    if (pTsdbReadHandle->outputCapacity >= binfo.rows) {
1384
      ASSERT(cur->blockCompleted || cur->mixBlock);
1385 1386
    }

H
Haojun Liao 已提交
1387
    if (cur->rows == binfo.rows) {
dengyihao's avatar
dengyihao 已提交
1388
      tsdbDebug("%p whole file block qualified, brange:%" PRId64 "-%" PRId64 ", rows:%d, lastKey:%" PRId64 ", %s",
H
Haojun Liao 已提交
1389
                pTsdbReadHandle, cur->win.skey, cur->win.ekey, cur->rows, cur->lastKey, pTsdbReadHandle->idStr);
H
Haojun Liao 已提交
1390
    } else {
dengyihao's avatar
dengyihao 已提交
1391 1392 1393 1394
      tsdbDebug("%p create data block from remain file block, brange:%" PRId64 "-%" PRId64
                ", rows:%d, total:%d, lastKey:%" PRId64 ", %s",
                pTsdbReadHandle, cur->win.skey, cur->win.ekey, cur->rows, binfo.rows, cur->lastKey,
                pTsdbReadHandle->idStr);
H
Haojun Liao 已提交
1395
    }
1396
  }
H
Haojun Liao 已提交
1397 1398

  return code;
1399 1400
}

1401 1402
static int32_t binarySearchForKey(char* pValue, int num, TSKEY key, int order);

dengyihao's avatar
dengyihao 已提交
1403 1404
static int32_t loadFileDataBlock(STsdbReadHandle* pTsdbReadHandle, SBlock* pBlock, STableCheckInfo* pCheckInfo,
                                 bool* exists) {
1405
  SQueryFilePos* cur = &pTsdbReadHandle->cur;
dengyihao's avatar
dengyihao 已提交
1406 1407
  int32_t        code = TSDB_CODE_SUCCESS;
  bool           asc = ASCENDING_TRAVERSE(pTsdbReadHandle->order);
1408

1409
  if (asc) {
H
Haojun Liao 已提交
1410
    // query ended in/started from current block
H
Hongze Cheng 已提交
1411
    if (pTsdbReadHandle->window.ekey < pBlock->maxKey.ts || pCheckInfo->lastKey > pBlock->minKey.ts) {
1412
      if ((code = doLoadFileDataBlock(pTsdbReadHandle, pBlock, pCheckInfo, cur->slot)) != TSDB_CODE_SUCCESS) {
H
Haojun Liao 已提交
1413 1414
        *exists = false;
        return code;
1415
      }
1416

1417
      SDataCols* pTSCol = pTsdbReadHandle->rhelper.pDCols[0];
H
Haojun Liao 已提交
1418
      assert(pTSCol->cols->type == TSDB_DATA_TYPE_TIMESTAMP && pTSCol->numOfRows == pBlock->numOfRows);
H
Haojun Liao 已提交
1419

H
Hongze Cheng 已提交
1420
      if (pCheckInfo->lastKey > pBlock->minKey.ts) {
1421
        cur->pos =
1422
            binarySearchForKey(pTSCol->cols[0].pData, pBlock->numOfRows, pCheckInfo->lastKey, pTsdbReadHandle->order);
1423 1424 1425
      } else {
        cur->pos = 0;
      }
H
Haojun Liao 已提交
1426

H
Hongze Cheng 已提交
1427
      assert(pCheckInfo->lastKey <= pBlock->maxKey.ts);
1428
      doMergeTwoLevelData(pTsdbReadHandle, pCheckInfo, pBlock);
1429
    } else {  // the whole block is loaded in to buffer
dengyihao's avatar
dengyihao 已提交
1430
      cur->pos = asc ? 0 : (pBlock->numOfRows - 1);
1431
      code = handleDataMergeIfNeeded(pTsdbReadHandle, pBlock, pCheckInfo);
H
[td-32]  
hjxilinx 已提交
1432
    }
dengyihao's avatar
dengyihao 已提交
1433
  } else {  // desc order, query ended in current block
H
Hongze Cheng 已提交
1434
    if (pTsdbReadHandle->window.ekey > pBlock->minKey.ts || pCheckInfo->lastKey < pBlock->maxKey.ts) {
1435
      if ((code = doLoadFileDataBlock(pTsdbReadHandle, pBlock, pCheckInfo, cur->slot)) != TSDB_CODE_SUCCESS) {
H
Haojun Liao 已提交
1436 1437
        *exists = false;
        return code;
1438
      }
H
Haojun Liao 已提交
1439

1440
      SDataCols* pTsCol = pTsdbReadHandle->rhelper.pDCols[0];
H
Hongze Cheng 已提交
1441
      if (pCheckInfo->lastKey < pBlock->maxKey.ts) {
dengyihao's avatar
dengyihao 已提交
1442 1443
        cur->pos =
            binarySearchForKey(pTsCol->cols[0].pData, pBlock->numOfRows, pCheckInfo->lastKey, pTsdbReadHandle->order);
1444
      } else {
H
Haojun Liao 已提交
1445
        cur->pos = pBlock->numOfRows - 1;
1446
      }
H
Haojun Liao 已提交
1447

H
Hongze Cheng 已提交
1448
      assert(pCheckInfo->lastKey >= pBlock->minKey.ts);
1449
      doMergeTwoLevelData(pTsdbReadHandle, pCheckInfo, pBlock);
1450
    } else {
dengyihao's avatar
dengyihao 已提交
1451
      cur->pos = asc ? 0 : (pBlock->numOfRows - 1);
1452
      code = handleDataMergeIfNeeded(pTsdbReadHandle, pBlock, pCheckInfo);
H
[td-32]  
hjxilinx 已提交
1453
    }
1454
  }
1455

1456
  *exists = pTsdbReadHandle->realNumOfRows > 0;
H
Haojun Liao 已提交
1457
  return code;
H
[td-32]  
hjxilinx 已提交
1458 1459
}

1460
static int doBinarySearchKey(char* pValue, int num, TSKEY key, int order) {
1461
  int    firstPos, lastPos, midPos = -1;
H
Haojun Liao 已提交
1462
  int    numOfRows;
1463 1464
  TSKEY* keyList;

1465
  assert(order == TSDB_ORDER_ASC || order == TSDB_ORDER_DESC);
H
Haojun Liao 已提交
1466

1467
  if (num <= 0) return -1;
1468 1469

  keyList = (TSKEY*)pValue;
1470 1471
  firstPos = 0;
  lastPos = num - 1;
1472

1473
  if (order == TSDB_ORDER_DESC) {
1474 1475 1476 1477 1478
    // find the first position which is smaller than the key
    while (1) {
      if (key >= keyList[lastPos]) return lastPos;
      if (key == keyList[firstPos]) return firstPos;
      if (key < keyList[firstPos]) return firstPos - 1;
1479

H
Haojun Liao 已提交
1480 1481
      numOfRows = lastPos - firstPos + 1;
      midPos = (numOfRows >> 1) + firstPos;
1482

1483 1484 1485 1486 1487 1488 1489 1490
      if (key < keyList[midPos]) {
        lastPos = midPos - 1;
      } else if (key > keyList[midPos]) {
        firstPos = midPos + 1;
      } else {
        break;
      }
    }
1491

1492 1493 1494 1495 1496
  } else {
    // find the first position which is bigger than the key
    while (1) {
      if (key <= keyList[firstPos]) return firstPos;
      if (key == keyList[lastPos]) return lastPos;
1497

1498 1499 1500 1501 1502 1503 1504
      if (key > keyList[lastPos]) {
        lastPos = lastPos + 1;
        if (lastPos >= num)
          return -1;
        else
          return lastPos;
      }
1505

H
Haojun Liao 已提交
1506 1507
      numOfRows = lastPos - firstPos + 1;
      midPos = (numOfRows >> 1) + firstPos;
1508

1509 1510 1511 1512 1513 1514 1515 1516 1517
      if (key < keyList[midPos]) {
        lastPos = midPos - 1;
      } else if (key > keyList[midPos]) {
        firstPos = midPos + 1;
      } else {
        break;
      }
    }
  }
1518

1519 1520 1521
  return midPos;
}

dengyihao's avatar
dengyihao 已提交
1522 1523
static int32_t doCopyRowsFromFileBlock(STsdbReadHandle* pTsdbReadHandle, int32_t capacity, int32_t numOfRows,
                                       int32_t start, int32_t end) {
1524
  SDataCols* pCols = pTsdbReadHandle->rhelper.pDCols[0];
dengyihao's avatar
dengyihao 已提交
1525
  TSKEY*     tsArray = pCols->cols[0].pData;
H
Haojun Liao 已提交
1526

1527
  int32_t num = end - start + 1;
H
Haojun Liao 已提交
1528 1529 1530 1531 1532 1533
  assert(num >= 0);

  if (num == 0) {
    return numOfRows;
  }

1534 1535
  bool    ascScan = ASCENDING_TRAVERSE(pTsdbReadHandle->order);
  int32_t trueStart = ascScan ? start : end;
1536
  int32_t trueEnd = ascScan ? end : start;
1537 1538
  int32_t step = ascScan ? 1 : -1;

1539
  int32_t requiredNumOfCols = (int32_t)taosArrayGetSize(pTsdbReadHandle->pColumns);
H
Haojun Liao 已提交
1540

dengyihao's avatar
dengyihao 已提交
1541
  // data in buffer has greater timestamp, copy data in file block
1542
  int32_t i = 0, j = 0;
dengyihao's avatar
dengyihao 已提交
1543
  while (i < requiredNumOfCols && j < pCols->numOfCols) {
1544
    SColumnInfoData* pColInfo = taosArrayGet(pTsdbReadHandle->pColumns, i);
1545 1546 1547 1548 1549 1550 1551

    SDataCol* src = &pCols->cols[j];
    if (src->colId < pColInfo->info.colId) {
      j++;
      continue;
    }

L
Liu Jicong 已提交
1552
    if (!isAllRowsNull(src) && pColInfo->info.colId == src->colId) {
1553
      if (!IS_VAR_DATA_TYPE(pColInfo->info.type)) {  // todo opt performance
dengyihao's avatar
dengyihao 已提交
1554
        //        memmove(pData, (char*)src->pData + bytes * start, bytes * num);
1555
        int32_t rowIndex = numOfRows;
1556
        for (int32_t k = trueStart; ((ascScan && k <= trueEnd) || (!ascScan && k >= trueEnd)); k += step, ++rowIndex) {
1557
          SCellVal sVal = {0};
C
Cary Xu 已提交
1558
          if (tdGetColDataOfRow(&sVal, src, k, pCols->bitmapMode) < 0) {
1559 1560
            TASSERT(0);
          }
1561

C
Cary Xu 已提交
1562
          if (sVal.valType == TD_VTYPE_NORM) {
1563
            colDataAppend(pColInfo, rowIndex, sVal.val, false);
C
Cary Xu 已提交
1564 1565
          } else {
            colDataAppendNULL(pColInfo, rowIndex);
1566 1567 1568
          }
        }
      } else {  // handle the var-string
1569 1570
        int32_t rowIndex = numOfRows;

1571
        // todo refactor, only copy one-by-one
1572
        for (int32_t k = trueStart; ((ascScan && k <= trueEnd) || (!ascScan && k >= trueEnd)); k += step, ++rowIndex) {
1573
          SCellVal sVal = {0};
dengyihao's avatar
dengyihao 已提交
1574
          if (tdGetColDataOfRow(&sVal, src, k, pCols->bitmapMode) < 0) {
H
Haojun Liao 已提交
1575 1576
            TASSERT(0);
          }
1577

C
Cary Xu 已提交
1578
          if (sVal.valType == TD_VTYPE_NORM) {
1579
            colDataAppend(pColInfo, rowIndex, sVal.val, false);
C
Cary Xu 已提交
1580 1581
          } else {
            colDataAppendNULL(pColInfo, rowIndex);
1582
          }
1583 1584
        }
      }
1585 1586 1587

      j++;
      i++;
H
Hongze Cheng 已提交
1588
    } else {  // pColInfo->info.colId < src->colId, it is a NULL data
1589
      colDataAppendNNULL(pColInfo, numOfRows, num);
1590
      i++;
1591 1592
    }
  }
1593

dengyihao's avatar
dengyihao 已提交
1594
  while (i < requiredNumOfCols) {  // the remain columns are all null data
1595
    SColumnInfoData* pColInfo = taosArrayGet(pTsdbReadHandle->pColumns, i);
1596
    colDataAppendNNULL(pColInfo, numOfRows, num);
1597
    i++;
1598
  }
H
Haojun Liao 已提交
1599

1600 1601
  pTsdbReadHandle->cur.win.ekey = tsArray[trueEnd];
  pTsdbReadHandle->cur.lastKey = tsArray[trueEnd] + step;
1602

1603
  return numOfRows + num;
1604 1605
}

C
Cary Xu 已提交
1606 1607 1608 1609 1610 1611 1612 1613 1614 1615 1616 1617 1618 1619 1620 1621 1622 1623 1624 1625
/**
 * @brief  // TODO fix bug for reverse copy data problem
 *        Note: row1 always has high priority
 *
 * @param pTsdbReadHandle
 * @param capacity
 * @param curRow
 * @param row1
 * @param row2
 * @param numOfCols
 * @param uid
 * @param pSchema1
 * @param pSchema2
 * @param update
 * @param lastRowKey
 * @return int32_t The quantity of rows appended
 */
static int32_t mergeTwoRowFromMem(STsdbReadHandle* pTsdbReadHandle, int32_t capacity, int32_t* curRow, STSRow* row1,
                                  STSRow* row2, int32_t numOfCols, uint64_t uid, STSchema* pSchema1, STSchema* pSchema2,
                                  bool update, TSKEY* lastRowKey) {
H
Haojun Liao 已提交
1626
#if 1
dengyihao's avatar
dengyihao 已提交
1627 1628 1629 1630 1631 1632 1633 1634 1635
  STSchema* pSchema;
  STSRow*   row;
  int16_t   colId;
  int16_t   offset;

  bool     isRow1DataRow = TD_IS_TP_ROW(row1);
  bool     isRow2DataRow;
  bool     isChosenRowDataRow;
  int32_t  chosen_itr;
H
Haojun Liao 已提交
1636
  SCellVal sVal = {0};
C
Cary Xu 已提交
1637 1638
  TSKEY    rowKey = TSKEY_INITIAL_VAL;
  int32_t  nResult = 0;
C
Cary Xu 已提交
1639
  int32_t  mergeOption = 0;  // 0 discard 1 overwrite 2 merge
1640

H
Haojun Liao 已提交
1641
  // the schema version info is embeded in STSRow
1642 1643 1644
  int32_t numOfColsOfRow1 = 0;

  if (pSchema1 == NULL) {
1645
    pSchema1 = metaGetTbTSchema(REPO_META(pTsdbReadHandle->pTsdb), uid, TD_ROW_SVER(row1));
1646
  }
1647

C
Cary Xu 已提交
1648
#ifdef TD_DEBUG_PRINT_ROW
C
Cary Xu 已提交
1649 1650 1651 1652 1653
  char   flags[70] = {0};
  STsdb* pTsdb = pTsdbReadHandle->rhelper.pRepo;
  snprintf(flags, 70, "%s:%d vgId:%d dir:%s row1%s=NULL,row2%s=NULL", __func__, __LINE__, TD_VID(pTsdb->pVnode),
           pTsdb->dir, row1 ? "!" : "", row2 ? "!" : "");
  tdSRowPrint(row1, pSchema1, flags);
C
Cary Xu 已提交
1654 1655
#endif

dengyihao's avatar
dengyihao 已提交
1656
  if (isRow1DataRow) {
1657
    numOfColsOfRow1 = schemaNCols(pSchema1);
H
Haojun Liao 已提交
1658
  } else {
H
Haojun Liao 已提交
1659
    numOfColsOfRow1 = tdRowGetNCols(row1);
D
fix bug  
dapan1121 已提交
1660
  }
1661

1662
  int32_t numOfColsOfRow2 = 0;
dengyihao's avatar
dengyihao 已提交
1663
  if (row2) {
H
Haojun Liao 已提交
1664
    isRow2DataRow = TD_IS_TP_ROW(row2);
1665
    if (pSchema2 == NULL) {
1666
      pSchema2 = metaGetTbTSchema(REPO_META(pTsdbReadHandle->pTsdb), uid, TD_ROW_SVER(row2));
1667
    }
dengyihao's avatar
dengyihao 已提交
1668
    if (isRow2DataRow) {
1669 1670
      numOfColsOfRow2 = schemaNCols(pSchema2);
    } else {
H
Haojun Liao 已提交
1671
      numOfColsOfRow2 = tdRowGetNCols(row2);
1672 1673
    }
  }
C
Cary Xu 已提交
1674

1675
  int32_t i = 0, j = 0, k = 0;
dengyihao's avatar
dengyihao 已提交
1676
  while (i < numOfCols && (j < numOfColsOfRow1 || k < numOfColsOfRow2)) {
1677
    SColumnInfoData* pColInfo = taosArrayGet(pTsdbReadHandle->pColumns, i);
1678 1679

    int32_t colIdOfRow1;
dengyihao's avatar
dengyihao 已提交
1680
    if (j >= numOfColsOfRow1) {
1681
      colIdOfRow1 = INT32_MAX;
dengyihao's avatar
dengyihao 已提交
1682
    } else if (isRow1DataRow) {
1683 1684
      colIdOfRow1 = pSchema1->columns[j].colId;
    } else {
C
Cary Xu 已提交
1685
      colIdOfRow1 = tdKvRowColIdAt(row1, j);
1686 1687 1688
    }

    int32_t colIdOfRow2;
dengyihao's avatar
dengyihao 已提交
1689
    if (k >= numOfColsOfRow2) {
1690
      colIdOfRow2 = INT32_MAX;
dengyihao's avatar
dengyihao 已提交
1691
    } else if (isRow2DataRow) {
1692 1693
      colIdOfRow2 = pSchema2->columns[k].colId;
    } else {
C
Cary Xu 已提交
1694
      colIdOfRow2 = tdKvRowColIdAt(row2, k);
1695 1696
    }

C
Cary Xu 已提交
1697
    if (colIdOfRow1 < colIdOfRow2) {  // the most probability
dengyihao's avatar
dengyihao 已提交
1698
      if (colIdOfRow1 < pColInfo->info.colId) {
C
Cary Xu 已提交
1699
        ++j;
C
Cary Xu 已提交
1700 1701
        continue;
      }
1702 1703 1704 1705
      row = row1;
      pSchema = pSchema1;
      isChosenRowDataRow = isRow1DataRow;
      chosen_itr = j;
C
Cary Xu 已提交
1706
    } else if (colIdOfRow1 == colIdOfRow2) {
dengyihao's avatar
dengyihao 已提交
1707
      if (colIdOfRow1 < pColInfo->info.colId) {
C
Cary Xu 已提交
1708 1709
        ++j;
        ++k;
1710
        continue;
C
Cary Xu 已提交
1711
      }
1712 1713 1714 1715 1716
      row = row1;
      pSchema = pSchema1;
      isChosenRowDataRow = isRow1DataRow;
      chosen_itr = j;
    } else {
dengyihao's avatar
dengyihao 已提交
1717
      if (colIdOfRow2 < pColInfo->info.colId) {
C
Cary Xu 已提交
1718
        ++k;
1719 1720 1721 1722 1723 1724 1725
        continue;
      }
      row = row2;
      pSchema = pSchema2;
      chosen_itr = k;
      isChosenRowDataRow = isRow2DataRow;
    }
C
Cary Xu 已提交
1726

dengyihao's avatar
dengyihao 已提交
1727
    if (isChosenRowDataRow) {
1728 1729
      colId = pSchema->columns[chosen_itr].colId;
      offset = pSchema->columns[chosen_itr].offset;
C
Cary Xu 已提交
1730 1731
      // TODO: use STSRowIter
      tdSTpRowGetVal(row, colId, pSchema->columns[chosen_itr].type, pSchema->flen, offset, chosen_itr - 1, &sVal);
C
Cary Xu 已提交
1732 1733 1734
      if (colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
        rowKey = *(TSKEY*)sVal.val;
        if (rowKey != *lastRowKey) {
C
Cary Xu 已提交
1735
          mergeOption = 1;
C
Cary Xu 已提交
1736 1737 1738
          if (*lastRowKey != TSKEY_INITIAL_VAL) {
            ++(*curRow);
          }
1739
          *lastRowKey = rowKey;
C
Cary Xu 已提交
1740
          ++nResult;
C
Cary Xu 已提交
1741
        } else if (update) {
C
Cary Xu 已提交
1742 1743 1744 1745
          mergeOption = 2;
        } else {
          mergeOption = 0;
          break;
C
Cary Xu 已提交
1746 1747
        }
      }
1748
    } else {
C
Cary Xu 已提交
1749 1750 1751 1752
      // TODO: use STSRowIter
      if (chosen_itr == 0) {
        colId = PRIMARYKEY_TIMESTAMP_COL_ID;
        tdSKvRowGetVal(row, PRIMARYKEY_TIMESTAMP_COL_ID, -1, -1, &sVal);
C
Cary Xu 已提交
1753 1754
        rowKey = *(TSKEY*)sVal.val;
        if (rowKey != *lastRowKey) {
C
Cary Xu 已提交
1755
          mergeOption = 1;
C
Cary Xu 已提交
1756 1757 1758
          if (*lastRowKey != TSKEY_INITIAL_VAL) {
            ++(*curRow);
          }
1759
          *lastRowKey = rowKey;
C
Cary Xu 已提交
1760
          ++nResult;
C
Cary Xu 已提交
1761
        } else if (update) {
C
Cary Xu 已提交
1762 1763 1764 1765
          mergeOption = 2;
        } else {
          mergeOption = 0;
          break;
C
Cary Xu 已提交
1766
        }
C
Cary Xu 已提交
1767 1768 1769 1770 1771 1772
      } else {
        SKvRowIdx* pColIdx = tdKvRowColIdxAt(row, chosen_itr - 1);
        colId = pColIdx->colId;
        offset = pColIdx->offset;
        tdSKvRowGetVal(row, colId, offset, chosen_itr - 1, &sVal);
      }
1773 1774
    }

C
Cary Xu 已提交
1775 1776
    ASSERT(rowKey != TSKEY_INITIAL_VAL);

1777
    if (colId == pColInfo->info.colId) {
H
Haojun Liao 已提交
1778
      if (tdValTypeIsNorm(sVal.valType)) {
C
Cary Xu 已提交
1779 1780 1781 1782 1783
        colDataAppend(pColInfo, *curRow, sVal.val, false);
      } else if (tdValTypeIsNull(sVal.valType)) {
        colDataAppend(pColInfo, *curRow, NULL, true);
      } else if (tdValTypeIsNone(sVal.valType)) {
        // TODO: Set null if nothing append for this row
C
Cary Xu 已提交
1784
        if (mergeOption == 1) {
C
Cary Xu 已提交
1785 1786 1787 1788
          colDataAppend(pColInfo, *curRow, NULL, true);
        }
      } else {
        ASSERT(0);
1789
      }
H
Haojun Liao 已提交
1790

C
Cary Xu 已提交
1791
      ++i;
C
Cary Xu 已提交
1792

dengyihao's avatar
dengyihao 已提交
1793
      if (row == row1) {
C
Cary Xu 已提交
1794
        ++j;
1795
      } else {
C
Cary Xu 已提交
1796
        ++k;
1797 1798
      }
    } else {
C
Cary Xu 已提交
1799
      if (mergeOption == 1) {
C
Cary Xu 已提交
1800
        colDataAppend(pColInfo, *curRow, NULL, true);
C
Cary Xu 已提交
1801
      }
C
Cary Xu 已提交
1802
      ++i;
1803
    }
1804
  }
1805

C
Cary Xu 已提交
1806
  if (mergeOption == 1) {
dengyihao's avatar
dengyihao 已提交
1807
    while (i < numOfCols) {  // the remain columns are all null data
1808
      SColumnInfoData* pColInfo = taosArrayGet(pTsdbReadHandle->pColumns, i);
C
Cary Xu 已提交
1809 1810
      colDataAppend(pColInfo, *curRow, NULL, true);
      ++i;
1811 1812
    }
  }
C
Cary Xu 已提交
1813 1814

  return nResult;
H
Haojun Liao 已提交
1815
#endif
1816
}
1817

dengyihao's avatar
dengyihao 已提交
1818 1819
static void getQualifiedRowsPos(STsdbReadHandle* pTsdbReadHandle, int32_t startPos, int32_t endPos,
                                int32_t numOfExisted, int32_t* start, int32_t* end) {
1820 1821
  *start = -1;

1822
  if (ASCENDING_TRAVERSE(pTsdbReadHandle->order)) {
1823
    int32_t remain = endPos - startPos + 1;
1824 1825
    if (remain + numOfExisted > pTsdbReadHandle->outputCapacity) {
      *end = (pTsdbReadHandle->outputCapacity - numOfExisted) + startPos - 1;
H
Haojun Liao 已提交
1826 1827
    } else {
      *end = endPos;
1828 1829 1830 1831 1832
    }

    *start = startPos;
  } else {
    int32_t remain = (startPos - endPos) + 1;
1833 1834
    if (remain + numOfExisted > pTsdbReadHandle->outputCapacity) {
      *end = startPos + 1 - (pTsdbReadHandle->outputCapacity - numOfExisted);
H
Haojun Liao 已提交
1835 1836
    } else {
      *end = endPos;
1837 1838 1839 1840 1841 1842 1843
    }

    *start = *end;
    *end = startPos;
  }
}

dengyihao's avatar
dengyihao 已提交
1844 1845
static void updateInfoAfterMerge(STsdbReadHandle* pTsdbReadHandle, STableCheckInfo* pCheckInfo, int32_t numOfRows,
                                 int32_t endPos) {
1846
  SQueryFilePos* cur = &pTsdbReadHandle->cur;
1847 1848

  pCheckInfo->lastKey = cur->lastKey;
1849
  pTsdbReadHandle->realNumOfRows = numOfRows;
1850 1851 1852 1853
  cur->rows = numOfRows;
  cur->pos = endPos;
}

1854 1855
static void doCheckGeneratedBlockRange(STsdbReadHandle* pTsdbReadHandle) {
  SQueryFilePos* cur = &pTsdbReadHandle->cur;
H
Haojun Liao 已提交
1856 1857

  if (cur->rows > 0) {
1858 1859
    if (ASCENDING_TRAVERSE(pTsdbReadHandle->order)) {
      assert(cur->win.skey >= pTsdbReadHandle->window.skey && cur->win.ekey <= pTsdbReadHandle->window.ekey);
H
Haojun Liao 已提交
1860
    } else {
1861
      assert(cur->win.skey >= pTsdbReadHandle->window.ekey && cur->win.ekey <= pTsdbReadHandle->window.skey);
H
Haojun Liao 已提交
1862 1863
    }

1864
    SColumnInfoData* pColInfoData = taosArrayGet(pTsdbReadHandle->pColumns, 0);
dengyihao's avatar
dengyihao 已提交
1865 1866
    assert(cur->win.skey == ((TSKEY*)pColInfoData->pData)[0] &&
           cur->win.ekey == ((TSKEY*)pColInfoData->pData)[cur->rows - 1]);
H
Haojun Liao 已提交
1867
  } else {
1868
    cur->win = pTsdbReadHandle->window;
H
Haojun Liao 已提交
1869

dengyihao's avatar
dengyihao 已提交
1870
    int32_t step = ASCENDING_TRAVERSE(pTsdbReadHandle->order) ? 1 : -1;
1871
    cur->lastKey = pTsdbReadHandle->window.ekey + step;
H
Haojun Liao 已提交
1872 1873 1874
  }
}

dengyihao's avatar
dengyihao 已提交
1875 1876
static void copyAllRemainRowsFromFileBlock(STsdbReadHandle* pTsdbReadHandle, STableCheckInfo* pCheckInfo,
                                           SDataBlockInfo* pBlockInfo, int32_t endPos) {
1877
  SQueryFilePos* cur = &pTsdbReadHandle->cur;
H
Haojun Liao 已提交
1878

1879
  SDataCols* pCols = pTsdbReadHandle->rhelper.pDCols[0];
dengyihao's avatar
dengyihao 已提交
1880
  TSKEY*     tsArray = pCols->cols[0].pData;
H
Haojun Liao 已提交
1881

1882
  bool ascScan = ASCENDING_TRAVERSE(pTsdbReadHandle->order);
H
Haojun Liao 已提交
1883

dengyihao's avatar
dengyihao 已提交
1884
  int32_t step = ascScan ? 1 : -1;
H
Haojun Liao 已提交
1885 1886 1887 1888

  int32_t start = cur->pos;
  int32_t end = endPos;

1889
  if (!ascScan) {
wafwerar's avatar
wafwerar 已提交
1890
    TSWAP(start, end);
H
Haojun Liao 已提交
1891 1892
  }

1893 1894
  assert(pTsdbReadHandle->outputCapacity >= (end - start + 1));
  int32_t numOfRows = doCopyRowsFromFileBlock(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, 0, start, end);
H
Haojun Liao 已提交
1895 1896

  // the time window should always be ascending order: skey <= ekey
dengyihao's avatar
dengyihao 已提交
1897
  cur->win = (STimeWindow){.skey = tsArray[start], .ekey = tsArray[end]};
H
Haojun Liao 已提交
1898
  cur->mixBlock = (numOfRows != pBlockInfo->rows);
dengyihao's avatar
dengyihao 已提交
1899 1900
  cur->lastKey = tsArray[endPos] + step;
  cur->blockCompleted = (ascScan ? (endPos == pBlockInfo->rows - 1) : (endPos == 0));
H
Haojun Liao 已提交
1901

H
Haojun Liao 已提交
1902
  // The value of pos may be -1 or pBlockInfo->rows, and it is invalid in both cases.
1903
  int32_t pos = endPos + step;
1904 1905
  updateInfoAfterMerge(pTsdbReadHandle, pCheckInfo, numOfRows, pos);
  doCheckGeneratedBlockRange(pTsdbReadHandle);
H
Haojun Liao 已提交
1906

dengyihao's avatar
dengyihao 已提交
1907 1908 1909
  tsdbDebug("%p uid:%" PRIu64 ", data block created, mixblock:%d, brange:%" PRIu64 "-%" PRIu64 " rows:%d, %s",
            pTsdbReadHandle, pCheckInfo->tableId, cur->mixBlock, cur->win.skey, cur->win.ekey, cur->rows,
            pTsdbReadHandle->idStr);
H
Haojun Liao 已提交
1910 1911
}

1912
int32_t getEndPosInDataBlock(STsdbReadHandle* pTsdbReadHandle, SDataBlockInfo* pBlockInfo) {
H
Haojun Liao 已提交
1913 1914
  // NOTE: reverse the order to find the end position in data block
  int32_t endPos = -1;
1915
  bool    ascScan = ASCENDING_TRAVERSE(pTsdbReadHandle->order);
dengyihao's avatar
dengyihao 已提交
1916
  int32_t order = ascScan ? TSDB_ORDER_DESC : TSDB_ORDER_ASC;
H
Haojun Liao 已提交
1917

1918
  SQueryFilePos* cur = &pTsdbReadHandle->cur;
dengyihao's avatar
dengyihao 已提交
1919
  SDataCols*     pCols = pTsdbReadHandle->rhelper.pDCols[0];
H
Haojun Liao 已提交
1920

1921 1922 1923 1924 1925 1926 1927 1928 1929 1930 1931 1932
  if (pTsdbReadHandle->outputCapacity >= pBlockInfo->rows) {
    if (ascScan && pTsdbReadHandle->window.ekey >= pBlockInfo->window.ekey) {
      endPos = pBlockInfo->rows - 1;
      cur->mixBlock = (cur->pos != 0);
    } else if ((!ascScan) && pTsdbReadHandle->window.ekey <= pBlockInfo->window.skey) {
      endPos = 0;
      cur->mixBlock = (cur->pos != pBlockInfo->rows - 1);
    } else {
      assert(pCols->numOfRows > 0);
      endPos = doBinarySearchKey(pCols->cols[0].pData, pCols->numOfRows, pTsdbReadHandle->window.ekey, order);
      cur->mixBlock = true;
    }
H
Haojun Liao 已提交
1933
  } else {
1934
    if (ascScan && pTsdbReadHandle->window.ekey >= pBlockInfo->window.ekey) {
1935
      endPos = TMIN(cur->pos + pTsdbReadHandle->outputCapacity - 1, pBlockInfo->rows - 1);
1936
    } else if ((!ascScan) && pTsdbReadHandle->window.ekey <= pBlockInfo->window.skey) {
1937
      endPos = TMAX(cur->pos - pTsdbReadHandle->outputCapacity + 1, 0);
1938 1939 1940 1941 1942 1943 1944 1945 1946 1947 1948 1949 1950 1951 1952
    } else {
      ASSERT(pCols->numOfRows > 0);
      endPos = doBinarySearchKey(pCols->cols[0].pData, pCols->numOfRows, pTsdbReadHandle->window.ekey, order);

      // current data is more than the capacity
      int32_t size = abs(cur->pos - endPos) + 1;
      if (size > pTsdbReadHandle->outputCapacity) {
        int32_t delta = size - pTsdbReadHandle->outputCapacity;
        if (ascScan) {
          endPos -= delta;
        } else {
          endPos += delta;
        }
      }
    }
H
Haojun Liao 已提交
1953 1954 1955 1956 1957 1958
    cur->mixBlock = true;
  }

  return endPos;
}

H
[td-32]  
hjxilinx 已提交
1959 1960
// only return the qualified data to client in terms of query time window, data rows in the same block but do not
// be included in the query time window will be discarded
1961 1962
static void doMergeTwoLevelData(STsdbReadHandle* pTsdbReadHandle, STableCheckInfo* pCheckInfo, SBlock* pBlock) {
  SQueryFilePos* cur = &pTsdbReadHandle->cur;
1963
  SDataBlockInfo blockInfo = GET_FILE_DATA_BLOCK_INFO(pCheckInfo, pBlock);
H
Hongze Cheng 已提交
1964
  STsdbCfg*      pCfg = REPO_CFG(pTsdbReadHandle->pTsdb);
H
Haojun Liao 已提交
1965

1966
  initTableMemIterator(pTsdbReadHandle, pCheckInfo);
1967

1968 1969
  SDataCols* pCols = pTsdbReadHandle->rhelper.pDCols[0];
  assert(pCols->cols[0].type == TSDB_DATA_TYPE_TIMESTAMP && pCols->cols[0].colId == PRIMARYKEY_TIMESTAMP_COL_ID &&
dengyihao's avatar
dengyihao 已提交
1970
         cur->pos >= 0 && cur->pos < pBlock->numOfRows);
1971
  // Even Multi-Version supported, the records with duplicated TSKEY would be merged inside of tsdbLoadData interface.
1972
  TSKEY* tsArray = pCols->cols[0].pData;
H
Hongze Cheng 已提交
1973 1974
  assert(pCols->numOfRows == pBlock->numOfRows && tsArray[0] == pBlock->minKey.ts &&
         tsArray[pBlock->numOfRows - 1] == pBlock->maxKey.ts);
1975

dengyihao's avatar
dengyihao 已提交
1976
  bool    ascScan = ASCENDING_TRAVERSE(pTsdbReadHandle->order);
1977 1978
  int32_t step = ascScan ? 1 : -1;

1979
  // for search the endPos, so the order needs to reverse
1980
  int32_t order = ascScan ? TSDB_ORDER_DESC : TSDB_ORDER_ASC;
1981

1982 1983
  int32_t numOfCols = (int32_t)(QH_GET_NUM_OF_COLS(pTsdbReadHandle));
  int32_t endPos = getEndPosInDataBlock(pTsdbReadHandle, &blockInfo);
H
Haojun Liao 已提交
1984

1985
  STimeWindow* pWin = &blockInfo.window;
H
Hongze Cheng 已提交
1986
  tsdbDebug("%p uid:%" PRIu64 " start merge data block, file block range:%" PRIu64 "-%" PRIu64
dengyihao's avatar
dengyihao 已提交
1987 1988 1989
            " rows:%d, start:%d, end:%d, %s",
            pTsdbReadHandle, pCheckInfo->tableId, pWin->skey, pWin->ekey, blockInfo.rows, cur->pos, endPos,
            pTsdbReadHandle->idStr);
H
Haojun Liao 已提交
1990

1991 1992
  // compared with the data from in-memory buffer, to generate the correct timestamp array list
  int32_t numOfRows = 0;
C
Cary Xu 已提交
1993
  int32_t curRow = 0;
H
Haojun Liao 已提交
1994

dengyihao's avatar
dengyihao 已提交
1995 1996
  int16_t   rv1 = -1;
  int16_t   rv2 = -1;
1997 1998
  STSchema* pSchema1 = NULL;
  STSchema* pSchema2 = NULL;
D
fix bug  
dapan1121 已提交
1999

H
Haojun Liao 已提交
2000 2001
  int32_t pos = cur->pos;
  cur->win = TSWINDOW_INITIALIZER;
2002
  bool adjustPos = false;
2003

2004 2005
  // no data in buffer, load data from file directly
  if (pCheckInfo->iiter == NULL && pCheckInfo->iter == NULL) {
2006
    copyAllRemainRowsFromFileBlock(pTsdbReadHandle, pCheckInfo, &blockInfo, endPos);
2007
    return;
2008
  } else if (pCheckInfo->iter != NULL || pCheckInfo->iiter != NULL) {
2009
    SSkipListNode* node = NULL;
C
Cary Xu 已提交
2010
    TSKEY          lastKeyAppend = TSKEY_INITIAL_VAL;
C
Cary Xu 已提交
2011

2012
    do {
H
Haojun Liao 已提交
2013
      STSRow* row2 = NULL;
C
Cary Xu 已提交
2014
      STSRow* row1 = getSRowInTableMem(pCheckInfo, pTsdbReadHandle->order, pCfg->update, &row2, TD_VER_MAX);
2015
      if (row1 == NULL) {
H
[td-32]  
hjxilinx 已提交
2016
        break;
2017
      }
2018

H
Haojun Liao 已提交
2019
      TSKEY key = TD_ROW_KEY(row1);
2020
      if ((key > pTsdbReadHandle->window.ekey && ascScan) || (key < pTsdbReadHandle->window.ekey && !ascScan)) {
2021 2022 2023
        break;
      }

2024 2025 2026 2027 2028 2029 2030
      if (adjustPos) {
        if (key == lastKeyAppend) {
          pos -= step;
        }
        adjustPos = false;
      }

2031 2032
      if (((pos > endPos || tsArray[pos] > pTsdbReadHandle->window.ekey) && ascScan) ||
          ((pos < endPos || tsArray[pos] < pTsdbReadHandle->window.ekey) && !ascScan)) {
2033 2034 2035
        break;
      }

2036
      if ((key < tsArray[pos] && ascScan) || (key > tsArray[pos] && !ascScan)) {
H
Haojun Liao 已提交
2037
        if (rv1 != TD_ROW_SVER(row1)) {
dengyihao's avatar
dengyihao 已提交
2038
          //          pSchema1 = tsdbGetTableSchemaByVersion(pTable, memRowVersion(row1));
H
Haojun Liao 已提交
2039
          rv1 = TD_ROW_SVER(row1);
C
Cary Xu 已提交
2040
        }
dengyihao's avatar
dengyihao 已提交
2041 2042
        if (row2 && rv2 != TD_ROW_SVER(row2)) {
          //          pSchema2 = tsdbGetTableSchemaByVersion(pTable, memRowVersion(row2));
H
Haojun Liao 已提交
2043
          rv2 = TD_ROW_SVER(row2);
2044
        }
dengyihao's avatar
dengyihao 已提交
2045

C
Cary Xu 已提交
2046 2047 2048
        numOfRows +=
            mergeTwoRowFromMem(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, &curRow, row1, row2, numOfCols,
                               pCheckInfo->tableId, pSchema1, pSchema2, pCfg->update, &lastKeyAppend);
2049 2050 2051
        if (cur->win.skey == TSKEY_INITIAL_VAL) {
          cur->win.skey = key;
        }
2052

2053
        cur->win.ekey = key;
dengyihao's avatar
dengyihao 已提交
2054
        cur->lastKey = key + step;
2055
        cur->mixBlock = true;
2056
        moveToNextRowInMem(pCheckInfo);
2057
      } else if (key == tsArray[pos]) {  // data in buffer has the same timestamp of data in file block, ignore it
C
Cary Xu 已提交
2058
#if 0
H
TD-1439  
Hongze Cheng 已提交
2059
        if (pCfg->update) {
dengyihao's avatar
dengyihao 已提交
2060
          if (pCfg->update == TD_ROW_PARTIAL_UPDATE) {
2061
            doCopyRowsFromFileBlock(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, numOfRows, pos, pos);
D
fix bug  
dapan1121 已提交
2062
          }
H
Haojun Liao 已提交
2063
          if (rv1 != TD_ROW_SVER(row1)) {
dengyihao's avatar
dengyihao 已提交
2064
            //            pSchema1 = tsdbGetTableSchemaByVersion(pTable, memRowVersion(row1));
H
Haojun Liao 已提交
2065
            rv1 = TD_ROW_SVER(row1);
2066
          }
dengyihao's avatar
dengyihao 已提交
2067 2068
          if (row2 && rv2 != TD_ROW_SVER(row2)) {
            //            pSchema2 = tsdbGetTableSchemaByVersion(pTable, memRowVersion(row2));
H
Haojun Liao 已提交
2069
            rv2 = TD_ROW_SVER(row2);
2070
          }
dengyihao's avatar
dengyihao 已提交
2071

2072
          bool forceSetNull = pCfg->update != TD_ROW_PARTIAL_UPDATE;
dengyihao's avatar
dengyihao 已提交
2073
          mergeTwoRowFromMem(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, numOfRows, row1, row2, numOfCols,
C
Cary Xu 已提交
2074
                             pCheckInfo->tableId, pSchema1, pSchema2, forceSetNull, &lastRowKey);
H
TD-1439  
Hongze Cheng 已提交
2075 2076 2077 2078 2079 2080 2081 2082 2083
          numOfRows += 1;
          if (cur->win.skey == TSKEY_INITIAL_VAL) {
            cur->win.skey = key;
          }

          cur->win.ekey = key;
          cur->lastKey = key + step;
          cur->mixBlock = true;

C
Cary Xu 已提交
2084 2085 2086 2087 2088 2089 2090
          moveToNextRowInMem(pCheckInfo);
          pos += step;
        } else {
          moveToNextRowInMem(pCheckInfo);
        }
#endif
        if (TD_SUPPORT_UPDATE(pCfg->update)) {
2091 2092 2093 2094
          if (lastKeyAppend != key) {
            if (lastKeyAppend != TSKEY_INITIAL_VAL) {
              ++curRow;
            }
2095
            lastKeyAppend = key;
2096 2097
          }
          // load data from file firstly
C
Cary Xu 已提交
2098
          numOfRows = doCopyRowsFromFileBlock(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, curRow, pos, pos);
C
Cary Xu 已提交
2099 2100 2101 2102 2103 2104 2105

          if (rv1 != TD_ROW_SVER(row1)) {
            rv1 = TD_ROW_SVER(row1);
          }
          if (row2 && rv2 != TD_ROW_SVER(row2)) {
            rv2 = TD_ROW_SVER(row2);
          }
2106 2107

          // still assign data into current row
dengyihao's avatar
dengyihao 已提交
2108 2109 2110
          numOfRows +=
              mergeTwoRowFromMem(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, &curRow, row1, row2, numOfCols,
                                 pCheckInfo->tableId, pSchema1, pSchema2, pCfg->update, &lastKeyAppend);
C
Cary Xu 已提交
2111 2112 2113 2114 2115 2116 2117 2118 2119

          if (cur->win.skey == TSKEY_INITIAL_VAL) {
            cur->win.skey = key;
          }

          cur->win.ekey = key;
          cur->lastKey = key + step;
          cur->mixBlock = true;

H
TD-1439  
Hongze Cheng 已提交
2120
          moveToNextRowInMem(pCheckInfo);
2121

H
TD-1439  
Hongze Cheng 已提交
2122
          pos += step;
2123
          adjustPos = true;
H
TD-1439  
Hongze Cheng 已提交
2124
        } else {
2125
          // discard the memory record
H
TD-1439  
Hongze Cheng 已提交
2126 2127
          moveToNextRowInMem(pCheckInfo);
        }
2128
      } else if ((key > tsArray[pos] && ascScan) || (key < tsArray[pos] && !ascScan)) {
2129 2130 2131
        if (cur->win.skey == TSKEY_INITIAL_VAL) {
          cur->win.skey = tsArray[pos];
        }
2132

2133
        int32_t end = doBinarySearchKey(pCols->cols[0].pData, pCols->numOfRows, key, order);
2134 2135
        assert(end != -1);

dengyihao's avatar
dengyihao 已提交
2136
        if (tsArray[end] == key) {  // the value of key in cache equals to the end timestamp value, ignore it
C
Cary Xu 已提交
2137
#if 0
2138
          if (pCfg->update == TD_ROW_DISCARD_UPDATE) {
H
Hongze Cheng 已提交
2139 2140 2141 2142
            moveToNextRowInMem(pCheckInfo);
          } else {
            end -= step;
          }
C
Cary Xu 已提交
2143 2144 2145 2146 2147 2148
#endif
          if (!TD_SUPPORT_UPDATE(pCfg->update)) {
            moveToNextRowInMem(pCheckInfo);
          } else {
            end -= step;
          }
H
Haojun Liao 已提交
2149
        }
2150

2151
        int32_t qstart = 0, qend = 0;
2152
        getQualifiedRowsPos(pTsdbReadHandle, pos, end, numOfRows, &qstart, &qend);
2153

2154
        if ((lastKeyAppend != TSKEY_INITIAL_VAL) && (lastKeyAppend != (ascScan ? tsArray[qstart] : tsArray[qend]))) {
C
Cary Xu 已提交
2155 2156
          ++curRow;
        }
2157

C
Cary Xu 已提交
2158
        numOfRows = doCopyRowsFromFileBlock(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, curRow, qstart, qend);
2159
        pos += (qend - qstart + 1) * step;
C
Cary Xu 已提交
2160 2161 2162
        if (numOfRows > 0) {
          curRow = numOfRows - 1;
        }
2163

2164
        cur->win.ekey = ascScan ? tsArray[qend] : tsArray[qstart];
dengyihao's avatar
dengyihao 已提交
2165
        cur->lastKey = cur->win.ekey + step;
C
Cary Xu 已提交
2166
        lastKeyAppend = cur->win.ekey;
2167
      }
2168
    } while (numOfRows < pTsdbReadHandle->outputCapacity);
H
Haojun Liao 已提交
2169

2170
    if (numOfRows < pTsdbReadHandle->outputCapacity) {
H
Haojun Liao 已提交
2171 2172 2173 2174
      /**
       * if cache is empty, load remain file block data. In contrast, if there are remain data in cache, do NOT
       * copy them all to result buffer, since it may be overlapped with file data block.
       */
dengyihao's avatar
dengyihao 已提交
2175
      if (node == NULL || ((TD_ROW_KEY((STSRow*)SL_GET_NODE_DATA(node)) > pTsdbReadHandle->window.ekey) && ascScan) ||
2176
          ((TD_ROW_KEY((STSRow*)SL_GET_NODE_DATA(node)) < pTsdbReadHandle->window.ekey) && !ascScan)) {
2177 2178 2179 2180 2181
        // no data in cache or data in cache is greater than the ekey of time window, load data from file block
        if (cur->win.skey == TSKEY_INITIAL_VAL) {
          cur->win.skey = tsArray[pos];
        }

2182
        int32_t start = -1, end = -1;
2183
        getQualifiedRowsPos(pTsdbReadHandle, pos, endPos, numOfRows, &start, &end);
2184

2185
        numOfRows = doCopyRowsFromFileBlock(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, numOfRows, start, end);
2186
        pos += (end - start + 1) * step;
2187

2188
        cur->win.ekey = ascScan ? tsArray[end] : tsArray[start];
dengyihao's avatar
dengyihao 已提交
2189
        cur->lastKey = cur->win.ekey + step;
H
Haojun Liao 已提交
2190
        cur->mixBlock = true;
2191
      }
2192 2193
    }
  }
H
Haojun Liao 已提交
2194

2195
  cur->blockCompleted = (((pos > endPos || cur->lastKey > pTsdbReadHandle->window.ekey) && ascScan) ||
dengyihao's avatar
dengyihao 已提交
2196
                         ((pos < endPos || cur->lastKey < pTsdbReadHandle->window.ekey) && !ascScan));
2197

2198
  if (!ascScan) {
wafwerar's avatar
wafwerar 已提交
2199
    TSWAP(cur->win.skey, cur->win.ekey);
2200
  }
2201

2202 2203
  updateInfoAfterMerge(pTsdbReadHandle, pCheckInfo, numOfRows, pos);
  doCheckGeneratedBlockRange(pTsdbReadHandle);
H
Haojun Liao 已提交
2204

dengyihao's avatar
dengyihao 已提交
2205 2206 2207
  tsdbDebug("%p uid:%" PRIu64 ", data block created, mixblock:%d, brange:%" PRIu64 "-%" PRIu64 " rows:%d, %s",
            pTsdbReadHandle, pCheckInfo->tableId, cur->mixBlock, cur->win.skey, cur->win.ekey, cur->rows,
            pTsdbReadHandle->idStr);
2208 2209
}

2210
int32_t binarySearchForKey(char* pValue, int num, TSKEY key, int order) {
H
[td-32]  
hjxilinx 已提交
2211
  int    firstPos, lastPos, midPos = -1;
H
Haojun Liao 已提交
2212
  int    numOfRows;
2213 2214
  TSKEY* keyList;

H
[td-32]  
hjxilinx 已提交
2215
  if (num <= 0) return -1;
2216 2217

  keyList = (TSKEY*)pValue;
H
[td-32]  
hjxilinx 已提交
2218 2219
  firstPos = 0;
  lastPos = num - 1;
2220

2221
  if (order == TSDB_ORDER_DESC) {
H
[td-32]  
hjxilinx 已提交
2222 2223 2224 2225 2226
    // find the first position which is smaller than the key
    while (1) {
      if (key >= keyList[lastPos]) return lastPos;
      if (key == keyList[firstPos]) return firstPos;
      if (key < keyList[firstPos]) return firstPos - 1;
2227

H
Haojun Liao 已提交
2228 2229
      numOfRows = lastPos - firstPos + 1;
      midPos = (numOfRows >> 1) + firstPos;
2230

H
[td-32]  
hjxilinx 已提交
2231 2232 2233 2234 2235 2236 2237 2238
      if (key < keyList[midPos]) {
        lastPos = midPos - 1;
      } else if (key > keyList[midPos]) {
        firstPos = midPos + 1;
      } else {
        break;
      }
    }
2239

H
[td-32]  
hjxilinx 已提交
2240 2241 2242 2243 2244
  } else {
    // find the first position which is bigger than the key
    while (1) {
      if (key <= keyList[firstPos]) return firstPos;
      if (key == keyList[lastPos]) return lastPos;
2245

H
[td-32]  
hjxilinx 已提交
2246 2247 2248 2249 2250 2251 2252
      if (key > keyList[lastPos]) {
        lastPos = lastPos + 1;
        if (lastPos >= num)
          return -1;
        else
          return lastPos;
      }
2253

H
Haojun Liao 已提交
2254 2255
      numOfRows = lastPos - firstPos + 1;
      midPos = (numOfRows >> 1) + firstPos;
2256

H
[td-32]  
hjxilinx 已提交
2257 2258 2259 2260 2261 2262 2263 2264 2265
      if (key < keyList[midPos]) {
        lastPos = midPos - 1;
      } else if (key > keyList[midPos]) {
        firstPos = midPos + 1;
      } else {
        break;
      }
    }
  }
2266

H
[td-32]  
hjxilinx 已提交
2267 2268 2269
  return midPos;
}

2270
static void cleanBlockOrderSupporter(SBlockOrderSupporter* pSupporter, int32_t numOfTables) {
wafwerar's avatar
wafwerar 已提交
2271 2272
  taosMemoryFreeClear(pSupporter->numOfBlocksPerTable);
  taosMemoryFreeClear(pSupporter->blockIndexArray);
2273 2274

  for (int32_t i = 0; i < numOfTables; ++i) {
H
Haojun Liao 已提交
2275
    STableBlockInfo* pBlockInfo = pSupporter->pDataBlockInfo[i];
wafwerar's avatar
wafwerar 已提交
2276
    taosMemoryFreeClear(pBlockInfo);
2277 2278
  }

wafwerar's avatar
wafwerar 已提交
2279
  taosMemoryFreeClear(pSupporter->pDataBlockInfo);
2280 2281 2282 2283 2284 2285 2286 2287 2288 2289 2290
}

static int32_t dataBlockOrderCompar(const void* pLeft, const void* pRight, void* param) {
  int32_t leftTableIndex = *(int32_t*)pLeft;
  int32_t rightTableIndex = *(int32_t*)pRight;

  SBlockOrderSupporter* pSupporter = (SBlockOrderSupporter*)param;

  int32_t leftTableBlockIndex = pSupporter->blockIndexArray[leftTableIndex];
  int32_t rightTableBlockIndex = pSupporter->blockIndexArray[rightTableIndex];

2291
  if (leftTableBlockIndex > pSupporter->numOfBlocksPerTable[leftTableIndex]) {
2292 2293
    /* left block is empty */
    return 1;
2294
  } else if (rightTableBlockIndex > pSupporter->numOfBlocksPerTable[rightTableIndex]) {
2295 2296 2297 2298 2299 2300 2301
    /* right block is empty */
    return -1;
  }

  STableBlockInfo* pLeftBlockInfoEx = &pSupporter->pDataBlockInfo[leftTableIndex][leftTableBlockIndex];
  STableBlockInfo* pRightBlockInfoEx = &pSupporter->pDataBlockInfo[rightTableIndex][rightTableBlockIndex];

H
Haojun Liao 已提交
2302
  //    assert(pLeftBlockInfoEx->compBlock->offset != pRightBlockInfoEx->compBlock->offset);
dengyihao's avatar
dengyihao 已提交
2303
#if 0  // TODO: temporarily comment off requested by Dr. Liao
H
Haojun Liao 已提交
2304 2305
  if (pLeftBlockInfoEx->compBlock->offset == pRightBlockInfoEx->compBlock->offset &&
      pLeftBlockInfoEx->compBlock->last == pRightBlockInfoEx->compBlock->last) {
B
Bomin Zhang 已提交
2306
    tsdbError("error in header file, two block with same offset:%" PRId64, (int64_t)pLeftBlockInfoEx->compBlock->offset);
2307
  }
H
Haojun Liao 已提交
2308
#endif
2309

H
Haojun Liao 已提交
2310
  return pLeftBlockInfoEx->compBlock->offset > pRightBlockInfoEx->compBlock->offset ? 1 : -1;
2311 2312
}

2313
static int32_t createDataBlocksInfo(STsdbReadHandle* pTsdbReadHandle, int32_t numOfBlocks, int32_t* numOfAllocBlocks) {
H
Haojun Liao 已提交
2314 2315
  size_t size = sizeof(STableBlockInfo) * numOfBlocks;

2316 2317
  if (pTsdbReadHandle->allocSize < size) {
    pTsdbReadHandle->allocSize = (int32_t)size;
wafwerar's avatar
wafwerar 已提交
2318
    char* tmp = taosMemoryRealloc(pTsdbReadHandle->pDataBlockInfo, pTsdbReadHandle->allocSize);
H
Haojun Liao 已提交
2319 2320 2321 2322
    if (tmp == NULL) {
      return TSDB_CODE_TDB_OUT_OF_MEMORY;
    }

dengyihao's avatar
dengyihao 已提交
2323
    pTsdbReadHandle->pDataBlockInfo = (STableBlockInfo*)tmp;
2324 2325
  }

2326
  memset(pTsdbReadHandle->pDataBlockInfo, 0, size);
2327 2328
  *numOfAllocBlocks = numOfBlocks;

H
Haojun Liao 已提交
2329
  // access data blocks according to the offset of each block in asc/desc order.
2330
  int32_t numOfTables = (int32_t)taosArrayGetSize(pTsdbReadHandle->pTableCheckInfo);
2331

2332 2333
  SBlockOrderSupporter sup = {0};
  sup.numOfTables = numOfTables;
wafwerar's avatar
wafwerar 已提交
2334 2335 2336
  sup.numOfBlocksPerTable = taosMemoryCalloc(1, sizeof(int32_t) * numOfTables);
  sup.blockIndexArray = taosMemoryCalloc(1, sizeof(int32_t) * numOfTables);
  sup.pDataBlockInfo = taosMemoryCalloc(1, POINTER_BYTES * numOfTables);
2337

2338
  if (sup.numOfBlocksPerTable == NULL || sup.blockIndexArray == NULL || sup.pDataBlockInfo == NULL) {
2339
    cleanBlockOrderSupporter(&sup, 0);
2340
    return TSDB_CODE_TDB_OUT_OF_MEMORY;
2341
  }
H
Haojun Liao 已提交
2342

2343
  int32_t cnt = 0;
2344
  int32_t numOfQualTables = 0;
H
Haojun Liao 已提交
2345

2346
  for (int32_t j = 0; j < numOfTables; ++j) {
2347
    STableCheckInfo* pTableCheck = (STableCheckInfo*)taosArrayGet(pTsdbReadHandle->pTableCheckInfo, j);
2348 2349 2350
    if (pTableCheck->numOfBlocks <= 0) {
      continue;
    }
H
Haojun Liao 已提交
2351

H
refact  
Hongze Cheng 已提交
2352
    SBlock* pBlock = pTableCheck->pCompInfo->blocks;
2353
    sup.numOfBlocksPerTable[numOfQualTables] = pTableCheck->numOfBlocks;
2354

wafwerar's avatar
wafwerar 已提交
2355
    char* buf = taosMemoryMalloc(sizeof(STableBlockInfo) * pTableCheck->numOfBlocks);
2356
    if (buf == NULL) {
2357
      cleanBlockOrderSupporter(&sup, numOfQualTables);
2358
      return TSDB_CODE_TDB_OUT_OF_MEMORY;
2359 2360
    }

2361
    sup.pDataBlockInfo[numOfQualTables] = (STableBlockInfo*)buf;
2362 2363

    for (int32_t k = 0; k < pTableCheck->numOfBlocks; ++k) {
H
Haojun Liao 已提交
2364
      STableBlockInfo* pBlockInfo = &sup.pDataBlockInfo[numOfQualTables][k];
2365

H
Haojun Liao 已提交
2366 2367
      pBlockInfo->compBlock = &pBlock[k];
      pBlockInfo->pTableCheckInfo = pTableCheck;
2368 2369 2370
      cnt++;
    }

2371
    numOfQualTables++;
2372 2373
  }

H
Haojun Liao 已提交
2374
  assert(numOfBlocks == cnt);
2375

H
Haojun Liao 已提交
2376 2377
  // since there is only one table qualified, blocks are not sorted
  if (numOfQualTables == 1) {
2378
    memcpy(pTsdbReadHandle->pDataBlockInfo, sup.pDataBlockInfo[0], sizeof(STableBlockInfo) * numOfBlocks);
H
Haojun Liao 已提交
2379
    cleanBlockOrderSupporter(&sup, numOfQualTables);
2380

H
Haojun Liao 已提交
2381
    tsdbDebug("%p create data blocks info struct completed for 1 table, %d blocks not sorted %s", pTsdbReadHandle, cnt,
dengyihao's avatar
dengyihao 已提交
2382
              pTsdbReadHandle->idStr);
H
Haojun Liao 已提交
2383 2384
    return TSDB_CODE_SUCCESS;
  }
2385

H
Haojun Liao 已提交
2386
  tsdbDebug("%p create data blocks info struct completed, %d blocks in %d tables %s", pTsdbReadHandle, cnt,
dengyihao's avatar
dengyihao 已提交
2387
            numOfQualTables, pTsdbReadHandle->idStr);
2388

2389
  assert(cnt <= numOfBlocks && numOfQualTables <= numOfTables);  // the pTableQueryInfo[j]->numOfBlocks may be 0
2390
  sup.numOfTables = numOfQualTables;
2391

2392
  SMultiwayMergeTreeInfo* pTree = NULL;
dengyihao's avatar
dengyihao 已提交
2393
  uint8_t                 ret = tMergeTreeCreate(&pTree, sup.numOfTables, &sup, dataBlockOrderCompar);
2394 2395
  if (ret != TSDB_CODE_SUCCESS) {
    cleanBlockOrderSupporter(&sup, numOfTables);
2396
    return TSDB_CODE_TDB_OUT_OF_MEMORY;
2397 2398 2399 2400 2401
  }

  int32_t numOfTotal = 0;

  while (numOfTotal < cnt) {
2402
    int32_t pos = tMergeTreeGetChosenIndex(pTree);
2403 2404
    int32_t index = sup.blockIndexArray[pos]++;

H
Haojun Liao 已提交
2405
    STableBlockInfo* pBlocksInfo = sup.pDataBlockInfo[pos];
2406
    pTsdbReadHandle->pDataBlockInfo[numOfTotal++] = pBlocksInfo[index];
2407 2408

    // set data block index overflow, in order to disable the offset comparator
2409 2410
    if (sup.blockIndexArray[pos] >= sup.numOfBlocksPerTable[pos]) {
      sup.blockIndexArray[pos] = sup.numOfBlocksPerTable[pos] + 1;
2411
    }
2412

H
Haojun Liao 已提交
2413
    tMergeTreeAdjust(pTree, tMergeTreeGetAdjustIndex(pTree));
2414 2415 2416 2417 2418
  }

  /*
   * available when no import exists
   * for(int32_t i = 0; i < cnt - 1; ++i) {
H
Haojun Liao 已提交
2419
   *   assert((*pDataBlockInfo)[i].compBlock->offset < (*pDataBlockInfo)[i+1].compBlock->offset);
2420 2421 2422
   * }
   */

H
Haojun Liao 已提交
2423
  tsdbDebug("%p %d data blocks sort completed, %s", pTsdbReadHandle, cnt, pTsdbReadHandle->idStr);
2424
  cleanBlockOrderSupporter(&sup, numOfTables);
wafwerar's avatar
wafwerar 已提交
2425
  taosMemoryFree(pTree);
2426 2427 2428 2429

  return TSDB_CODE_SUCCESS;
}

2430
static int32_t getFirstFileDataBlock(STsdbReadHandle* pTsdbReadHandle, bool* exists);
H
Haojun Liao 已提交
2431

2432
static int32_t getDataBlock(STsdbReadHandle* pTsdbReadHandle, STableBlockInfo* pNext, bool* exists) {
dengyihao's avatar
dengyihao 已提交
2433
  int32_t        step = ASCENDING_TRAVERSE(pTsdbReadHandle->order) ? 1 : -1;
2434
  SQueryFilePos* cur = &pTsdbReadHandle->cur;
H
Haojun Liao 已提交
2435

dengyihao's avatar
dengyihao 已提交
2436
  while (1) {
2437
    int32_t code = loadFileDataBlock(pTsdbReadHandle, pNext->compBlock, pNext->pTableCheckInfo, exists);
H
Haojun Liao 已提交
2438 2439 2440 2441
    if (code != TSDB_CODE_SUCCESS || *exists) {
      return code;
    }

2442 2443
    if ((cur->slot == pTsdbReadHandle->numOfBlocks - 1 && ASCENDING_TRAVERSE(pTsdbReadHandle->order)) ||
        (cur->slot == 0 && !ASCENDING_TRAVERSE(pTsdbReadHandle->order))) {
H
Haojun Liao 已提交
2444
      // all data blocks in current file has been checked already, try next file if exists
2445
      return getFirstFileDataBlock(pTsdbReadHandle, exists);
H
Haojun Liao 已提交
2446 2447 2448 2449
    } else {  // next block of the same file
      cur->slot += step;
      cur->mixBlock = false;
      cur->blockCompleted = false;
2450
      pNext = &pTsdbReadHandle->pDataBlockInfo[cur->slot];
H
Haojun Liao 已提交
2451 2452 2453 2454
    }
  }
}

2455 2456 2457
static int32_t getFirstFileDataBlock(STsdbReadHandle* pTsdbReadHandle, bool* exists) {
  pTsdbReadHandle->numOfBlocks = 0;
  SQueryFilePos* cur = &pTsdbReadHandle->cur;
H
Haojun Liao 已提交
2458 2459 2460

  int32_t code = TSDB_CODE_SUCCESS;

2461
  int32_t numOfBlocks = 0;
2462
  int32_t numOfTables = (int32_t)taosArrayGetSize(pTsdbReadHandle->pTableCheckInfo);
2463

C
Cary Xu 已提交
2464
  STsdbKeepCfg* pCfg = REPO_KEEP_CFG(pTsdbReadHandle->pTsdb);
C
Cary Xu 已提交
2465
  STimeWindow   win = TSWINDOW_INITIALIZER;
2466

H
Hongze Cheng 已提交
2467
  while (true) {
2468
    tsdbRLockFS(REPO_FS(pTsdbReadHandle->pTsdb));
H
Hongze Cheng 已提交
2469

2470 2471
    if ((pTsdbReadHandle->pFileGroup = tsdbFSIterNext(&pTsdbReadHandle->fileIter)) == NULL) {
      tsdbUnLockFS(REPO_FS(pTsdbReadHandle->pTsdb));
H
Hongze Cheng 已提交
2472 2473 2474
      break;
    }

H
refact  
Hongze Cheng 已提交
2475
    tsdbGetFidKeyRange(pCfg->days, pCfg->precision, pTsdbReadHandle->pFileGroup->fid, &win.skey, &win.ekey);
2476 2477

    // current file are not overlapped with query time window, ignore remain files
2478 2479 2480
    if ((ASCENDING_TRAVERSE(pTsdbReadHandle->order) && win.skey > pTsdbReadHandle->window.ekey) ||
        (!ASCENDING_TRAVERSE(pTsdbReadHandle->order) && win.ekey < pTsdbReadHandle->window.ekey)) {
      tsdbUnLockFS(REPO_FS(pTsdbReadHandle->pTsdb));
H
Haojun Liao 已提交
2481 2482
      tsdbDebug("%p remain files are not qualified for qrange:%" PRId64 "-%" PRId64 ", ignore, %s", pTsdbReadHandle,
                pTsdbReadHandle->window.skey, pTsdbReadHandle->window.ekey, pTsdbReadHandle->idStr);
2483 2484
      pTsdbReadHandle->pFileGroup = NULL;
      assert(pTsdbReadHandle->numOfBlocks == 0);
2485 2486 2487
      break;
    }

2488 2489
    if (tsdbSetAndOpenReadFSet(&pTsdbReadHandle->rhelper, pTsdbReadHandle->pFileGroup) < 0) {
      tsdbUnLockFS(REPO_FS(pTsdbReadHandle->pTsdb));
H
Hongze Cheng 已提交
2490 2491 2492 2493
      code = terrno;
      break;
    }

2494
    tsdbUnLockFS(REPO_FS(pTsdbReadHandle->pTsdb));
H
Hongze Cheng 已提交
2495

2496
    if (tsdbLoadBlockIdx(&pTsdbReadHandle->rhelper) < 0) {
H
Hongze Cheng 已提交
2497 2498 2499 2500
      code = terrno;
      break;
    }

2501
    if ((code = getFileCompInfo(pTsdbReadHandle, &numOfBlocks)) != TSDB_CODE_SUCCESS) {
2502 2503
      break;
    }
H
Haojun Liao 已提交
2504

H
Haojun Liao 已提交
2505 2506
    tsdbDebug("%p %d blocks found in file for %d table(s), fid:%d, %s", pTsdbReadHandle, numOfBlocks, numOfTables,
              pTsdbReadHandle->pFileGroup->fid, pTsdbReadHandle->idStr);
H
Haojun Liao 已提交
2507

2508 2509 2510 2511
    assert(numOfBlocks >= 0);
    if (numOfBlocks == 0) {
      continue;
    }
H
Haojun Liao 已提交
2512

2513
    // todo return error code to query engine
dengyihao's avatar
dengyihao 已提交
2514 2515
    if ((code = createDataBlocksInfo(pTsdbReadHandle, numOfBlocks, &pTsdbReadHandle->numOfBlocks)) !=
        TSDB_CODE_SUCCESS) {
2516 2517
      break;
    }
H
Haojun Liao 已提交
2518

2519 2520
    assert(numOfBlocks >= pTsdbReadHandle->numOfBlocks);
    if (pTsdbReadHandle->numOfBlocks > 0) {
2521 2522 2523
      break;
    }
  }
H
Haojun Liao 已提交
2524

2525
  // no data in file anymore
2526
  if (pTsdbReadHandle->numOfBlocks <= 0 || code != TSDB_CODE_SUCCESS) {
H
Haojun Liao 已提交
2527
    if (code == TSDB_CODE_SUCCESS) {
2528
      assert(pTsdbReadHandle->pFileGroup == NULL);
H
Haojun Liao 已提交
2529 2530
    }

D
dapan1121 已提交
2531
    cur->fid = INT32_MIN;  // denote that there are no data in file anymore
H
Haojun Liao 已提交
2532 2533
    *exists = false;
    return code;
2534
  }
H
Haojun Liao 已提交
2535

2536
  assert(pTsdbReadHandle->pFileGroup != NULL && pTsdbReadHandle->numOfBlocks > 0);
dengyihao's avatar
dengyihao 已提交
2537
  cur->slot = ASCENDING_TRAVERSE(pTsdbReadHandle->order) ? 0 : pTsdbReadHandle->numOfBlocks - 1;
2538
  cur->fid = pTsdbReadHandle->pFileGroup->fid;
H
Haojun Liao 已提交
2539

2540
  STableBlockInfo* pBlockInfo = &pTsdbReadHandle->pDataBlockInfo[cur->slot];
2541
  return getDataBlock(pTsdbReadHandle, pBlockInfo, exists);
H
Haojun Liao 已提交
2542 2543 2544 2545 2546 2547 2548
}

static bool isEndFileDataBlock(SQueryFilePos* cur, int32_t numOfBlocks, bool ascTrav) {
  assert(cur != NULL && numOfBlocks > 0);
  return (cur->slot == numOfBlocks - 1 && ascTrav) || (cur->slot == 0 && !ascTrav);
}

2549
static void moveToNextDataBlockInCurrentFile(STsdbReadHandle* pTsdbReadHandle) {
dengyihao's avatar
dengyihao 已提交
2550
  int32_t step = ASCENDING_TRAVERSE(pTsdbReadHandle->order) ? 1 : -1;
H
Haojun Liao 已提交
2551

2552 2553
  SQueryFilePos* cur = &pTsdbReadHandle->cur;
  assert(cur->slot < pTsdbReadHandle->numOfBlocks && cur->slot >= 0);
H
Haojun Liao 已提交
2554 2555

  cur->slot += step;
dengyihao's avatar
dengyihao 已提交
2556
  cur->mixBlock = false;
H
Haojun Liao 已提交
2557
  cur->blockCompleted = false;
2558
}
H
Haojun Liao 已提交
2559

2560 2561 2562 2563
static int32_t getBucketIndex(int32_t startRow, int32_t bucketRange, int32_t numOfRows) {
  return (numOfRows - startRow) / bucketRange;
}

H
Haojun Liao 已提交
2564
int32_t tsdbGetFileBlocksDistInfo(tsdbReaderT* queryHandle, STableBlockDistInfo* pTableBlockInfo) {
dengyihao's avatar
dengyihao 已提交
2565
  STsdbReadHandle* pTsdbReadHandle = (STsdbReadHandle*)queryHandle;
H
Haojun Liao 已提交
2566

H
Haojun Liao 已提交
2567
  pTableBlockInfo->totalSize = 0;
2568
  pTableBlockInfo->totalRows = 0;
H
Haojun Liao 已提交
2569

2570
  STsdbFS* pFileHandle = REPO_FS(pTsdbReadHandle->pTsdb);
H
Haojun Liao 已提交
2571 2572

  // find the start data block in file
2573
  pTsdbReadHandle->locateStart = true;
C
Cary Xu 已提交
2574 2575
  STsdbKeepCfg* pCfg = REPO_KEEP_CFG(pTsdbReadHandle->pTsdb);
  int32_t       fid = getFileIdFromKey(pTsdbReadHandle->window.skey, pCfg->days, pCfg->precision);
H
Haojun Liao 已提交
2576 2577

  tsdbRLockFS(pFileHandle);
2578 2579
  tsdbFSIterInit(&pTsdbReadHandle->fileIter, pFileHandle, pTsdbReadHandle->order);
  tsdbFSIterSeek(&pTsdbReadHandle->fileIter, fid);
H
Haojun Liao 已提交
2580 2581
  tsdbUnLockFS(pFileHandle);

2582 2583 2584 2585 2586 2587
  STsdbCfg* pc = REPO_CFG(pTsdbReadHandle->pTsdb);
  pTableBlockInfo->defMinRows = pc->minRows;
  pTableBlockInfo->defMaxRows = pc->maxRows;

  int32_t bucketRange = ceil((pc->maxRows - pc->minRows) / 20.0);

H
Haojun Liao 已提交
2588
  pTableBlockInfo->numOfFiles += 1;
H
Haojun Liao 已提交
2589

H
Haojun Liao 已提交
2590
  int32_t     code = TSDB_CODE_SUCCESS;
H
Haojun Liao 已提交
2591
  int32_t     numOfBlocks = 0;
2592
  int32_t     numOfTables = (int32_t)taosArrayGetSize(pTsdbReadHandle->pTableCheckInfo);
2593
  int         defaultRows = 4096;
H
Haojun Liao 已提交
2594 2595 2596 2597
  STimeWindow win = TSWINDOW_INITIALIZER;

  while (true) {
    numOfBlocks = 0;
2598
    tsdbRLockFS(REPO_FS(pTsdbReadHandle->pTsdb));
H
Haojun Liao 已提交
2599

2600 2601
    if ((pTsdbReadHandle->pFileGroup = tsdbFSIterNext(&pTsdbReadHandle->fileIter)) == NULL) {
      tsdbUnLockFS(REPO_FS(pTsdbReadHandle->pTsdb));
H
Haojun Liao 已提交
2602 2603 2604
      break;
    }

H
refact  
Hongze Cheng 已提交
2605
    tsdbGetFidKeyRange(pCfg->days, pCfg->precision, pTsdbReadHandle->pFileGroup->fid, &win.skey, &win.ekey);
H
Haojun Liao 已提交
2606 2607

    // current file are not overlapped with query time window, ignore remain files
2608
    if ((win.skey > pTsdbReadHandle->window.ekey)/* || (!ascTraverse && win.ekey < pTsdbReadHandle->window.ekey)*/) {
2609
      tsdbUnLockFS(REPO_FS(pTsdbReadHandle->pTsdb));
H
Haojun Liao 已提交
2610 2611
      tsdbDebug("%p remain files are not qualified for qrange:%" PRId64 "-%" PRId64 ", ignore, %s", pTsdbReadHandle,
                pTsdbReadHandle->window.skey, pTsdbReadHandle->window.ekey, pTsdbReadHandle->idStr);
2612
      pTsdbReadHandle->pFileGroup = NULL;
H
Haojun Liao 已提交
2613 2614 2615
      break;
    }

H
Haojun Liao 已提交
2616
    pTableBlockInfo->numOfFiles += 1;
2617 2618
    if (tsdbSetAndOpenReadFSet(&pTsdbReadHandle->rhelper, pTsdbReadHandle->pFileGroup) < 0) {
      tsdbUnLockFS(REPO_FS(pTsdbReadHandle->pTsdb));
H
Haojun Liao 已提交
2619 2620 2621 2622
      code = terrno;
      break;
    }

2623
    tsdbUnLockFS(REPO_FS(pTsdbReadHandle->pTsdb));
H
Haojun Liao 已提交
2624

2625
    if (tsdbLoadBlockIdx(&pTsdbReadHandle->rhelper) < 0) {
H
Haojun Liao 已提交
2626 2627 2628 2629
      code = terrno;
      break;
    }

2630
    if ((code = getFileCompInfo(pTsdbReadHandle, &numOfBlocks)) != TSDB_CODE_SUCCESS) {
H
Haojun Liao 已提交
2631 2632 2633
      break;
    }

H
Haojun Liao 已提交
2634 2635
    tsdbDebug("%p %d blocks found in file for %d table(s), fid:%d, %s", pTsdbReadHandle, numOfBlocks, numOfTables,
              pTsdbReadHandle->pFileGroup->fid, pTsdbReadHandle->idStr);
H
Haojun Liao 已提交
2636 2637 2638 2639 2640

    if (numOfBlocks == 0) {
      continue;
    }

2641 2642
    pTableBlockInfo->numOfBlocks += numOfBlocks;

H
Haojun Liao 已提交
2643
    for (int32_t i = 0; i < numOfTables; ++i) {
2644
      STableCheckInfo* pCheckInfo = taosArrayGet(pTsdbReadHandle->pTableCheckInfo, i);
H
Haojun Liao 已提交
2645 2646

      SBlock* pBlock = pCheckInfo->pCompInfo->blocks;
2647

H
Haojun Liao 已提交
2648
      for (int32_t j = 0; j < pCheckInfo->numOfBlocks; ++j) {
H
Haojun Liao 已提交
2649
        pTableBlockInfo->totalSize += pBlock[j].len;
H
Haojun Liao 已提交
2650

H
Haojun Liao 已提交
2651
        int32_t numOfRows = pBlock[j].numOfRows;
2652
        pTableBlockInfo->totalRows += numOfRows;
2653

H
Haojun Liao 已提交
2654 2655 2656 2657 2658 2659 2660 2661 2662 2663 2664
        if (numOfRows > pTableBlockInfo->maxRows) {
          pTableBlockInfo->maxRows = numOfRows;
        }

        if (numOfRows < pTableBlockInfo->minRows) {
          pTableBlockInfo->minRows = numOfRows;
        }

        if (numOfRows < defaultRows) {
          pTableBlockInfo->numOfSmallBlocks += 1;
        }
2665 2666 2667

        int32_t bucketIndex = getBucketIndex(pTableBlockInfo->defMinRows, bucketRange, numOfRows);
        pTableBlockInfo->blockRowsHisto[bucketIndex]++;
H
Haojun Liao 已提交
2668 2669 2670 2671
      }
    }
  }

2672
  pTableBlockInfo->numOfTables = numOfTables;
H
Haojun Liao 已提交
2673 2674 2675
  return code;
}

2676 2677 2678
static int32_t getDataBlocksInFiles(STsdbReadHandle* pTsdbReadHandle, bool* exists) {
  STsdbFS*       pFileHandle = REPO_FS(pTsdbReadHandle->pTsdb);
  SQueryFilePos* cur = &pTsdbReadHandle->cur;
2679 2680

  // find the start data block in file
2681 2682
  if (!pTsdbReadHandle->locateStart) {
    pTsdbReadHandle->locateStart = true;
C
Cary Xu 已提交
2683 2684
    STsdbKeepCfg* pCfg = REPO_KEEP_CFG(pTsdbReadHandle->pTsdb);
    int32_t       fid = getFileIdFromKey(pTsdbReadHandle->window.skey, pCfg->days, pCfg->precision);
H
Haojun Liao 已提交
2685

H
Hongze Cheng 已提交
2686
    tsdbRLockFS(pFileHandle);
2687 2688
    tsdbFSIterInit(&pTsdbReadHandle->fileIter, pFileHandle, pTsdbReadHandle->order);
    tsdbFSIterSeek(&pTsdbReadHandle->fileIter, fid);
H
Hongze Cheng 已提交
2689
    tsdbUnLockFS(pFileHandle);
2690

2691
    return getFirstFileDataBlock(pTsdbReadHandle, exists);
2692
  } else {
2693
    // check if current file block is all consumed
2694
    STableBlockInfo* pBlockInfo = &pTsdbReadHandle->pDataBlockInfo[cur->slot];
2695
    STableCheckInfo* pCheckInfo = pBlockInfo->pTableCheckInfo;
H
Haojun Liao 已提交
2696

2697
    // current block is done, try next
H
Haojun Liao 已提交
2698
    if ((!cur->mixBlock) || cur->blockCompleted) {
H
Haojun Liao 已提交
2699
      // all data blocks in current file has been checked already, try next file if exists
2700
    } else {
H
Haojun Liao 已提交
2701 2702
      tsdbDebug("%p continue in current data block, index:%d, pos:%d, %s", pTsdbReadHandle, cur->slot, cur->pos,
                pTsdbReadHandle->idStr);
2703 2704
      int32_t code = handleDataMergeIfNeeded(pTsdbReadHandle, pBlockInfo->compBlock, pCheckInfo);
      *exists = (pTsdbReadHandle->realNumOfRows > 0);
H
Haojun Liao 已提交
2705

H
Haojun Liao 已提交
2706 2707 2708 2709 2710 2711 2712
      if (code != TSDB_CODE_SUCCESS || *exists) {
        return code;
      }
    }

    // current block is empty, try next block in file
    // all data blocks in current file has been checked already, try next file if exists
2713 2714
    if (isEndFileDataBlock(cur, pTsdbReadHandle->numOfBlocks, ASCENDING_TRAVERSE(pTsdbReadHandle->order))) {
      return getFirstFileDataBlock(pTsdbReadHandle, exists);
H
Haojun Liao 已提交
2715
    } else {
2716 2717
      moveToNextDataBlockInCurrentFile(pTsdbReadHandle);
      STableBlockInfo* pNext = &pTsdbReadHandle->pDataBlockInfo[cur->slot];
2718
      return getDataBlock(pTsdbReadHandle, pNext, exists);
2719 2720
    }
  }
2721 2722
}

2723 2724
static bool doHasDataInBuffer(STsdbReadHandle* pTsdbReadHandle) {
  size_t numOfTables = taosArrayGetSize(pTsdbReadHandle->pTableCheckInfo);
dengyihao's avatar
dengyihao 已提交
2725

2726 2727
  while (pTsdbReadHandle->activeIndex < numOfTables) {
    if (hasMoreDataInCache(pTsdbReadHandle)) {
2728 2729
      return true;
    }
H
Haojun Liao 已提交
2730

2731
    pTsdbReadHandle->activeIndex += 1;
2732
  }
H
Haojun Liao 已提交
2733

2734 2735 2736
  return false;
}

dengyihao's avatar
dengyihao 已提交
2737
// todo not unref yet, since it is not support multi-group interpolation query
H
Haojun Liao 已提交
2738
static UNUSED_FUNC void changeQueryHandleForInterpQuery(tsdbReaderT pHandle) {
H
Haojun Liao 已提交
2739
  // filter the queried time stamp in the first place
dengyihao's avatar
dengyihao 已提交
2740
  STsdbReadHandle* pTsdbReadHandle = (STsdbReadHandle*)pHandle;
H
Haojun Liao 已提交
2741 2742

  // starts from the buffer in case of descending timestamp order check data blocks
2743
  size_t numOfTables = taosArrayGetSize(pTsdbReadHandle->pTableCheckInfo);
H
Haojun Liao 已提交
2744 2745

  int32_t i = 0;
dengyihao's avatar
dengyihao 已提交
2746
  while (i < numOfTables) {
2747
    STableCheckInfo* pCheckInfo = taosArrayGet(pTsdbReadHandle->pTableCheckInfo, i);
H
Haojun Liao 已提交
2748 2749

    // the first qualified table for interpolation query
dengyihao's avatar
dengyihao 已提交
2750 2751 2752 2753
    //    if ((pTsdbReadHandle->window.skey <= pCheckInfo->pTableObj->lastKey) &&
    //        (pCheckInfo->pTableObj->lastKey != TSKEY_INITIAL_VAL)) {
    //      break;
    //    }
H
Haojun Liao 已提交
2754 2755 2756 2757 2758 2759 2760 2761 2762

    i++;
  }

  // there are no data in all the tables
  if (i == numOfTables) {
    return;
  }

dengyihao's avatar
dengyihao 已提交
2763
  STableCheckInfo info = *(STableCheckInfo*)taosArrayGet(pTsdbReadHandle->pTableCheckInfo, i);
2764
  taosArrayClear(pTsdbReadHandle->pTableCheckInfo);
H
Haojun Liao 已提交
2765

2766 2767
  info.lastKey = pTsdbReadHandle->window.skey;
  taosArrayPush(pTsdbReadHandle->pTableCheckInfo, &info);
H
Haojun Liao 已提交
2768 2769 2770
}

static int tsdbReadRowsFromCache(STableCheckInfo* pCheckInfo, TSKEY maxKey, int maxRowsToRead, STimeWindow* win,
2771
                                 STsdbReadHandle* pTsdbReadHandle) {
dengyihao's avatar
dengyihao 已提交
2772
  int       numOfRows = 0;
C
Cary Xu 已提交
2773
  int       curRows = 0;
dengyihao's avatar
dengyihao 已提交
2774
  int32_t   numOfCols = (int32_t)taosArrayGetSize(pTsdbReadHandle->pColumns);
H
Hongze Cheng 已提交
2775
  STsdbCfg* pCfg = REPO_CFG(pTsdbReadHandle->pTsdb);
H
Haojun Liao 已提交
2776 2777
  win->skey = TSKEY_INITIAL_VAL;

dengyihao's avatar
dengyihao 已提交
2778 2779
  int64_t   st = taosGetTimestampUs();
  int16_t   rv = -1;
D
fix bug  
dapan1121 已提交
2780
  STSchema* pSchema = NULL;
C
Cary Xu 已提交
2781 2782
  TSKEY     lastRowKey = TSKEY_INITIAL_VAL;

H
Haojun Liao 已提交
2783
  do {
C
Cary Xu 已提交
2784
    STSRow* row = getSRowInTableMem(pCheckInfo, pTsdbReadHandle->order, pCfg->update, NULL, TD_VER_MAX);
H
Haojun Liao 已提交
2785 2786 2787 2788
    if (row == NULL) {
      break;
    }

H
Haojun Liao 已提交
2789
    TSKEY key = TD_ROW_KEY(row);
dengyihao's avatar
dengyihao 已提交
2790 2791 2792 2793
    if ((key > maxKey && ASCENDING_TRAVERSE(pTsdbReadHandle->order)) ||
        (key < maxKey && !ASCENDING_TRAVERSE(pTsdbReadHandle->order))) {
      tsdbDebug("%p key:%" PRIu64 " beyond qrange:%" PRId64 " - %" PRId64 ", no more data in buffer", pTsdbReadHandle,
                key, pTsdbReadHandle->window.skey, pTsdbReadHandle->window.ekey);
H
Haojun Liao 已提交
2794 2795 2796 2797 2798 2799 2800 2801 2802

      break;
    }

    if (win->skey == INT64_MIN) {
      win->skey = key;
    }

    win->ekey = key;
H
Haojun Liao 已提交
2803
    if (rv != TD_ROW_SVER(row)) {
2804
      pSchema = metaGetTbTSchema(REPO_META(pTsdbReadHandle->pTsdb), pCheckInfo->tableId, TD_ROW_SVER(row));
H
Haojun Liao 已提交
2805
      rv = TD_ROW_SVER(row);
D
fix bug  
dapan1121 已提交
2806
    }
C
Cary Xu 已提交
2807 2808
    numOfRows += mergeTwoRowFromMem(pTsdbReadHandle, maxRowsToRead, &curRows, row, NULL, numOfCols, pCheckInfo->tableId,
                                    pSchema, NULL, pCfg->update, &lastRowKey);
H
Haojun Liao 已提交
2809

C
Cary Xu 已提交
2810
    if (numOfRows >= maxRowsToRead) {
H
Haojun Liao 已提交
2811 2812 2813 2814
      moveToNextRowInMem(pCheckInfo);
      break;
    }

dengyihao's avatar
dengyihao 已提交
2815
  } while (moveToNextRowInMem(pCheckInfo));
H
Haojun Liao 已提交
2816

C
Cary Xu 已提交
2817
  taosMemoryFreeClear(pSchema);  // free the STSChema
H
Haojun Liao 已提交
2818 2819 2820
  assert(numOfRows <= maxRowsToRead);

  int64_t elapsedTime = taosGetTimestampUs() - st;
dengyihao's avatar
dengyihao 已提交
2821 2822
  tsdbDebug("%p build data block from cache completed, elapsed time:%" PRId64 " us, numOfRows:%d, numOfCols:%d, %s",
            pTsdbReadHandle, elapsedTime, numOfRows, numOfCols, pTsdbReadHandle->idStr);
H
Haojun Liao 已提交
2823 2824 2825 2826

  return numOfRows;
}

dengyihao's avatar
dengyihao 已提交
2827 2828 2829 2830 2831 2832
void* tsdbGetIdx(SMeta* pMeta) {
  if (pMeta == NULL) {
    return NULL;
  }
  return metaGetIdx(pMeta);
}
dengyihao's avatar
dengyihao 已提交
2833 2834 2835 2836 2837 2838
void* tsdbGetIvtIdx(SMeta* pMeta) {
  if (pMeta == NULL) {
    return NULL;
  }
  return metaGetIvtIdx(pMeta);
}
wmmhello's avatar
wmmhello 已提交
2839
int32_t tsdbGetAllTableList(SMeta* pMeta, uint64_t uid, SArray* list) {
2840
  SMCtbCursor* pCur = metaOpenCtbCursor(pMeta, uid);
H
Haojun Liao 已提交
2841

2842 2843 2844 2845 2846
  while (1) {
    tb_uid_t id = metaCtbCursorNext(pCur);
    if (id == 0) {
      break;
    }
H
Haojun Liao 已提交
2847

2848
    STableKeyInfo info = {.lastKey = TSKEY_INITIAL_VAL, uid = id};
H
Haojun Liao 已提交
2849 2850 2851
    taosArrayPush(list, &info);
  }

C
Cary Xu 已提交
2852
  metaCloseCtbCursor(pCur);
H
Haojun Liao 已提交
2853 2854 2855
  return TSDB_CODE_SUCCESS;
}

L
Liu Jicong 已提交
2856 2857 2858 2859 2860 2861 2862 2863 2864 2865 2866 2867 2868 2869 2870 2871
int32_t tsdbGetCtbIdList(SMeta* pMeta, int64_t suid, SArray* list) {
  SMCtbCursor* pCur = metaOpenCtbCursor(pMeta, suid);

  while (1) {
    tb_uid_t id = metaCtbCursorNext(pCur);
    if (id == 0) {
      break;
    }

    taosArrayPush(list, &id);
  }

  metaCloseCtbCursor(pCur);
  return TSDB_CODE_SUCCESS;
}

H
Haojun Liao 已提交
2872 2873 2874 2875 2876
static void destroyHelper(void* param) {
  if (param == NULL) {
    return;
  }

dengyihao's avatar
dengyihao 已提交
2877 2878 2879 2880 2881 2882
  //  tQueryInfo* pInfo = (tQueryInfo*)param;
  //  if (pInfo->optr != TSDB_RELATION_IN) {
  //    taosMemoryFreeClear(pInfo->q);
  //  } else {
  //    taosHashCleanup((SHashObj *)(pInfo->q));
  //  }
H
Haojun Liao 已提交
2883

wafwerar's avatar
wafwerar 已提交
2884
  taosMemoryFree(param);
H
Haojun Liao 已提交
2885 2886
}

dengyihao's avatar
dengyihao 已提交
2887 2888
#define TSDB_PREV_ROW 0x1
#define TSDB_NEXT_ROW 0x2
2889

dengyihao's avatar
dengyihao 已提交
2890
static bool loadBlockOfActiveTable(STsdbReadHandle* pTsdbReadHandle) {
2891
  if (pTsdbReadHandle->checkFiles) {
H
Haojun Liao 已提交
2892 2893
    // check if the query range overlaps with the file data block
    bool exists = true;
H
Haojun Liao 已提交
2894

2895
    int32_t code = getDataBlocksInFiles(pTsdbReadHandle, &exists);
H
Haojun Liao 已提交
2896
    if (code != TSDB_CODE_SUCCESS) {
2897
      pTsdbReadHandle->checkFiles = false;
H
Haojun Liao 已提交
2898 2899
      return false;
    }
H
Haojun Liao 已提交
2900

H
Haojun Liao 已提交
2901
    if (exists) {
dengyihao's avatar
dengyihao 已提交
2902
      tsdbRetrieveDataBlock((tsdbReaderT*)pTsdbReadHandle, NULL);
2903 2904 2905
      if (pTsdbReadHandle->currentLoadExternalRows && pTsdbReadHandle->window.skey == pTsdbReadHandle->window.ekey) {
        SColumnInfoData* pColInfo = taosArrayGet(pTsdbReadHandle->pColumns, 0);
        assert(*(int64_t*)pColInfo->pData == pTsdbReadHandle->window.skey);
H
Haojun Liao 已提交
2906 2907
      }

dengyihao's avatar
dengyihao 已提交
2908
      pTsdbReadHandle->currentLoadExternalRows = false;  // clear the flag, since the exact matched row is found.
H
Haojun Liao 已提交
2909 2910
      return exists;
    }
H
Haojun Liao 已提交
2911

2912
    pTsdbReadHandle->checkFiles = false;
H
Haojun Liao 已提交
2913
  }
H
Haojun Liao 已提交
2914

2915 2916
  if (hasMoreDataInCache(pTsdbReadHandle)) {
    pTsdbReadHandle->currentLoadExternalRows = false;
H
Haojun Liao 已提交
2917 2918
    return true;
  }
H
Haojun Liao 已提交
2919

H
Haojun Liao 已提交
2920
  // current result is empty
dengyihao's avatar
dengyihao 已提交
2921 2922
  if (pTsdbReadHandle->currentLoadExternalRows && pTsdbReadHandle->window.skey == pTsdbReadHandle->window.ekey &&
      pTsdbReadHandle->cur.rows == 0) {
H
refact  
Hongze Cheng 已提交
2923
    //    SMemTable* pMemRef = pTsdbReadHandle->pMemTable;
H
Haojun Liao 已提交
2924

dengyihao's avatar
dengyihao 已提交
2925 2926
    //    doGetExternalRow(pTsdbReadHandle, TSDB_PREV_ROW, pMemRef);
    //    doGetExternalRow(pTsdbReadHandle, TSDB_NEXT_ROW, pMemRef);
H
Haojun Liao 已提交
2927

2928
    bool result = tsdbGetExternalRow(pTsdbReadHandle);
H
Haojun Liao 已提交
2929

dengyihao's avatar
dengyihao 已提交
2930 2931
    //    pTsdbReadHandle->prev = doFreeColumnInfoData(pTsdbReadHandle->prev);
    //    pTsdbReadHandle->next = doFreeColumnInfoData(pTsdbReadHandle->next);
2932
    pTsdbReadHandle->currentLoadExternalRows = false;
H
Haojun Liao 已提交
2933 2934

    return result;
2935
  }
H
Haojun Liao 已提交
2936

H
Haojun Liao 已提交
2937 2938
  return false;
}
2939

2940
static bool loadCachedLastRow(STsdbReadHandle* pTsdbReadHandle) {
H
Haojun Liao 已提交
2941
  // the last row is cached in buffer, return it directly.
2942
  // here note that the pTsdbReadHandle->window must be the TS_INITIALIZER
dengyihao's avatar
dengyihao 已提交
2943
  int32_t numOfCols = (int32_t)(QH_GET_NUM_OF_COLS(pTsdbReadHandle));
2944
  size_t  numOfTables = taosArrayGetSize(pTsdbReadHandle->pTableCheckInfo);
H
Haojun Liao 已提交
2945 2946
  assert(numOfTables > 0 && numOfCols > 0);

2947
  SQueryFilePos* cur = &pTsdbReadHandle->cur;
2948

dengyihao's avatar
dengyihao 已提交
2949 2950 2951
  STSRow* pRow = NULL;
  TSKEY   key = TSKEY_INITIAL_VAL;
  int32_t step = ASCENDING_TRAVERSE(pTsdbReadHandle->order) ? 1 : -1;
C
Cary Xu 已提交
2952 2953
  TSKEY   lastRowKey = TSKEY_INITIAL_VAL;
  int32_t curRow = 0;
2954 2955 2956

  if (++pTsdbReadHandle->activeIndex < numOfTables) {
    STableCheckInfo* pCheckInfo = taosArrayGet(pTsdbReadHandle->pTableCheckInfo, pTsdbReadHandle->activeIndex);
dengyihao's avatar
dengyihao 已提交
2957 2958 2959 2960
    //    int32_t ret = tsdbGetCachedLastRow(pCheckInfo->pTableObj, &pRow, &key);
    //    if (ret != TSDB_CODE_SUCCESS) {
    //      return false;
    //    }
C
Cary Xu 已提交
2961 2962
    mergeTwoRowFromMem(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, &curRow, pRow, NULL, numOfCols,
                       pCheckInfo->tableId, NULL, NULL, true, &lastRowKey);
wafwerar's avatar
wafwerar 已提交
2963
    taosMemoryFreeClear(pRow);
H
Haojun Liao 已提交
2964

H
Haojun Liao 已提交
2965 2966 2967
    // update the last key value
    pCheckInfo->lastKey = key + step;

dengyihao's avatar
dengyihao 已提交
2968 2969
    cur->rows = 1;  // only one row
    cur->lastKey = key + step;
H
Haojun Liao 已提交
2970 2971 2972 2973 2974
    cur->mixBlock = true;
    cur->win.skey = key;
    cur->win.ekey = key;

    return true;
2975
  }
H
Haojun Liao 已提交
2976

H
Haojun Liao 已提交
2977 2978 2979
  return false;
}

dengyihao's avatar
dengyihao 已提交
2980
// static bool loadCachedLast(STsdbReadHandle* pTsdbReadHandle) {
2981 2982 2983 2984 2985 2986 2987 2988 2989 2990 2991 2992 2993 2994 2995 2996 2997 2998 2999
//  // the last row is cached in buffer, return it directly.
//  // here note that the pTsdbReadHandle->window must be the TS_INITIALIZER
//  int32_t tgNumOfCols = (int32_t)QH_GET_NUM_OF_COLS(pTsdbReadHandle);
//  size_t  numOfTables = taosArrayGetSize(pTsdbReadHandle->pTableCheckInfo);
//  int32_t numOfRows = 0;
//  assert(numOfTables > 0 && tgNumOfCols > 0);
//  SQueryFilePos* cur = &pTsdbReadHandle->cur;
//  TSKEY priKey = TSKEY_INITIAL_VAL;
//  int32_t priIdx = -1;
//  SColumnInfoData* pColInfo = NULL;
//
//  while (++pTsdbReadHandle->activeIndex < numOfTables) {
//    STableCheckInfo* pCheckInfo = taosArrayGet(pTsdbReadHandle->pTableCheckInfo, pTsdbReadHandle->activeIndex);
//    STable* pTable = pCheckInfo->pTableObj;
//    char* pData = NULL;
//
//    int32_t numOfCols = pTable->maxColNum;
//
//    if (pTable->lastCols == NULL || pTable->maxColNum <= 0) {
dengyihao's avatar
dengyihao 已提交
3000 3001
//      tsdbWarn("no last cached for table %s, uid:%" PRIu64 ",tid:%d", pTable->name->data, pTable->uid,
//      pTable->tableId); continue;
3002 3003 3004 3005 3006 3007 3008 3009 3010 3011 3012 3013 3014 3015 3016 3017 3018 3019 3020 3021 3022 3023 3024 3025 3026 3027 3028 3029 3030 3031 3032 3033 3034 3035 3036 3037 3038 3039 3040 3041 3042 3043 3044 3045 3046 3047 3048 3049 3050 3051 3052 3053 3054 3055 3056 3057 3058 3059 3060 3061 3062 3063 3064 3065 3066 3067 3068 3069 3070 3071 3072 3073 3074 3075 3076 3077 3078 3079 3080 3081 3082 3083 3084 3085 3086 3087 3088 3089 3090 3091 3092 3093 3094 3095 3096 3097 3098 3099 3100 3101 3102 3103 3104 3105 3106 3107 3108 3109 3110 3111 3112 3113 3114 3115 3116 3117 3118 3119 3120 3121 3122 3123 3124 3125 3126 3127 3128 3129 3130 3131
//    }
//
//    int32_t i = 0, j = 0;
//    while(i < tgNumOfCols && j < numOfCols) {
//      pColInfo = taosArrayGet(pTsdbReadHandle->pColumns, i);
//      if (pTable->lastCols[j].colId < pColInfo->info.colId) {
//        j++;
//        continue;
//      } else if (pTable->lastCols[j].colId > pColInfo->info.colId) {
//        i++;
//        continue;
//      }
//
//      pData = (char*)pColInfo->pData + numOfRows * pColInfo->info.bytes;
//
//      if (pTable->lastCols[j].bytes > 0) {
//        void* value = pTable->lastCols[j].pData;
//        switch (pColInfo->info.type) {
//          case TSDB_DATA_TYPE_BINARY:
//          case TSDB_DATA_TYPE_NCHAR:
//            memcpy(pData, value, varDataTLen(value));
//            break;
//          case TSDB_DATA_TYPE_NULL:
//          case TSDB_DATA_TYPE_BOOL:
//          case TSDB_DATA_TYPE_TINYINT:
//          case TSDB_DATA_TYPE_UTINYINT:
//            *(uint8_t *)pData = *(uint8_t *)value;
//            break;
//          case TSDB_DATA_TYPE_SMALLINT:
//          case TSDB_DATA_TYPE_USMALLINT:
//            *(uint16_t *)pData = *(uint16_t *)value;
//            break;
//          case TSDB_DATA_TYPE_INT:
//          case TSDB_DATA_TYPE_UINT:
//            *(uint32_t *)pData = *(uint32_t *)value;
//            break;
//          case TSDB_DATA_TYPE_BIGINT:
//          case TSDB_DATA_TYPE_UBIGINT:
//            *(uint64_t *)pData = *(uint64_t *)value;
//            break;
//          case TSDB_DATA_TYPE_FLOAT:
//            SET_FLOAT_PTR(pData, value);
//            break;
//          case TSDB_DATA_TYPE_DOUBLE:
//            SET_DOUBLE_PTR(pData, value);
//            break;
//          case TSDB_DATA_TYPE_TIMESTAMP:
//            if (pColInfo->info.colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
//              priKey = tdGetKey(*(TKEY *)value);
//              priIdx = i;
//
//              i++;
//              j++;
//              continue;
//            } else {
//              *(TSKEY *)pData = *(TSKEY *)value;
//            }
//            break;
//          default:
//            memcpy(pData, value, pColInfo->info.bytes);
//        }
//
//        for (int32_t n = 0; n < tgNumOfCols; ++n) {
//          if (n == i) {
//            continue;
//          }
//
//          pColInfo = taosArrayGet(pTsdbReadHandle->pColumns, n);
//          pData = (char*)pColInfo->pData + numOfRows * pColInfo->info.bytes;;
//
//          if (pColInfo->info.colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
////            *(TSKEY *)pData = pTable->lastCols[j].ts;
//            continue;
//          }
//
//          if (pColInfo->info.type == TSDB_DATA_TYPE_BINARY || pColInfo->info.type == TSDB_DATA_TYPE_NCHAR) {
//            setVardataNull(pData, pColInfo->info.type);
//          } else {
//            setNull(pData, pColInfo->info.type, pColInfo->info.bytes);
//          }
//        }
//
//        numOfRows++;
//        assert(numOfRows < pTsdbReadHandle->outputCapacity);
//      }
//
//      i++;
//      j++;
//    }
//
//    // leave the real ts column as the last row, because last function only (not stable) use the last row as res
//    if (priKey != TSKEY_INITIAL_VAL) {
//      pColInfo = taosArrayGet(pTsdbReadHandle->pColumns, priIdx);
//      pData = (char*)pColInfo->pData + numOfRows * pColInfo->info.bytes;
//
//      *(TSKEY *)pData = priKey;
//
//      for (int32_t n = 0; n < tgNumOfCols; ++n) {
//        if (n == priIdx) {
//          continue;
//        }
//
//        pColInfo = taosArrayGet(pTsdbReadHandle->pColumns, n);
//        pData = (char*)pColInfo->pData + numOfRows * pColInfo->info.bytes;;
//
//        assert (pColInfo->info.colId != PRIMARYKEY_TIMESTAMP_COL_ID);
//
//        if (pColInfo->info.type == TSDB_DATA_TYPE_BINARY || pColInfo->info.type == TSDB_DATA_TYPE_NCHAR) {
//          setVardataNull(pData, pColInfo->info.type);
//        } else {
//          setNull(pData, pColInfo->info.type, pColInfo->info.bytes);
//        }
//      }
//
//      numOfRows++;
//    }
//
//    if (numOfRows > 0) {
//      cur->rows     = numOfRows;
//      cur->mixBlock = true;
//
//      return true;
//    }
//  }
//
//  return false;
//}

static bool loadDataBlockFromTableSeq(STsdbReadHandle* pTsdbReadHandle) {
  size_t numOfTables = taosArrayGetSize(pTsdbReadHandle->pTableCheckInfo);
H
Haojun Liao 已提交
3132 3133 3134
  assert(numOfTables > 0);

  int64_t stime = taosGetTimestampUs();
H
Haojun Liao 已提交
3135

dengyihao's avatar
dengyihao 已提交
3136
  while (pTsdbReadHandle->activeIndex < numOfTables) {
3137
    if (loadBlockOfActiveTable(pTsdbReadHandle)) {
H
Haojun Liao 已提交
3138 3139 3140
      return true;
    }

3141
    STableCheckInfo* pCheckInfo = taosArrayGet(pTsdbReadHandle->pTableCheckInfo, pTsdbReadHandle->activeIndex);
H
Haojun Liao 已提交
3142 3143
    pCheckInfo->numOfBlocks = 0;

3144 3145
    pTsdbReadHandle->activeIndex += 1;
    pTsdbReadHandle->locateStart = false;
dengyihao's avatar
dengyihao 已提交
3146 3147
    pTsdbReadHandle->checkFiles = true;
    pTsdbReadHandle->cur.rows = 0;
3148
    pTsdbReadHandle->currentLoadExternalRows = pTsdbReadHandle->loadExternalRow;
H
Haojun Liao 已提交
3149 3150 3151 3152

    terrno = TSDB_CODE_SUCCESS;

    int64_t elapsedTime = taosGetTimestampUs() - stime;
3153
    pTsdbReadHandle->cost.checkForNextTime += elapsedTime;
H
Haojun Liao 已提交
3154 3155 3156
  }

  return false;
3157 3158
}

H
Haojun Liao 已提交
3159
// handle data in cache situation
3160
// bool tsdbNextDataBlock(tsdbReaderT pHandle, uint64_t uid)
H
Haojun Liao 已提交
3161
bool tsdbNextDataBlock(tsdbReaderT pHandle) {
dengyihao's avatar
dengyihao 已提交
3162
  STsdbReadHandle* pTsdbReadHandle = (STsdbReadHandle*)pHandle;
Y
yihaoDeng 已提交
3163

3164 3165
  size_t numOfCols = taosArrayGetSize(pTsdbReadHandle->pColumns);
  for (int32_t i = 0; i < numOfCols; ++i) {
3166 3167 3168 3169
    SColumnInfoData* pColInfo = taosArrayGet(pTsdbReadHandle->pColumns, i);
    colInfoDataCleanup(pColInfo, pTsdbReadHandle->outputCapacity);
  }

3170
  if (emptyQueryTimewindow(pTsdbReadHandle)) {
dengyihao's avatar
dengyihao 已提交
3171 3172
    tsdbDebug("%p query window not overlaps with the data set, no result returned, %s", pTsdbReadHandle,
              pTsdbReadHandle->idStr);
3173 3174 3175
    return false;
  }

Y
yihaoDeng 已提交
3176 3177 3178
  int64_t stime = taosGetTimestampUs();
  int64_t elapsedTime = stime;

3179
  // TODO refactor: remove "type"
3180 3181
  if (pTsdbReadHandle->type == TSDB_QUERY_TYPE_LAST) {
    if (pTsdbReadHandle->cachelastrow == TSDB_CACHED_TYPE_LASTROW) {
dengyihao's avatar
dengyihao 已提交
3182
      //      return loadCachedLastRow(pTsdbReadHandle);
3183
    } else if (pTsdbReadHandle->cachelastrow == TSDB_CACHED_TYPE_LAST) {
dengyihao's avatar
dengyihao 已提交
3184
      //      return loadCachedLast(pTsdbReadHandle);
D
init  
dapan1121 已提交
3185
    }
H
Haojun Liao 已提交
3186
  }
Y
yihaoDeng 已提交
3187

3188 3189
  if (pTsdbReadHandle->loadType == BLOCK_LOAD_TABLE_SEQ_ORDER) {
    return loadDataBlockFromTableSeq(pTsdbReadHandle);
dengyihao's avatar
dengyihao 已提交
3190
  } else {  // loadType == RR and Offset Order
3191
    if (pTsdbReadHandle->checkFiles) {
H
Haojun Liao 已提交
3192 3193 3194
      // check if the query range overlaps with the file data block
      bool exists = true;

3195
      int32_t code = getDataBlocksInFiles(pTsdbReadHandle, &exists);
H
Haojun Liao 已提交
3196
      if (code != TSDB_CODE_SUCCESS) {
3197 3198
        pTsdbReadHandle->activeIndex = 0;
        pTsdbReadHandle->checkFiles = false;
H
Haojun Liao 已提交
3199 3200 3201 3202 3203

        return false;
      }

      if (exists) {
3204
        pTsdbReadHandle->cost.checkForNextTime += (taosGetTimestampUs() - stime);
H
Haojun Liao 已提交
3205 3206
        return exists;
      }
Y
yihaoDeng 已提交
3207

3208 3209
      pTsdbReadHandle->activeIndex = 0;
      pTsdbReadHandle->checkFiles = false;
Y
yihaoDeng 已提交
3210 3211
    }

H
Haojun Liao 已提交
3212
    // TODO: opt by consider the scan order
3213
    bool ret = doHasDataInBuffer(pTsdbReadHandle);
H
Haojun Liao 已提交
3214
    terrno = TSDB_CODE_SUCCESS;
Y
yihaoDeng 已提交
3215

H
Haojun Liao 已提交
3216
    elapsedTime = taosGetTimestampUs() - stime;
3217
    pTsdbReadHandle->cost.checkForNextTime += elapsedTime;
H
Haojun Liao 已提交
3218
    return ret;
Y
yihaoDeng 已提交
3219 3220
  }
}
3221

H
refact  
Hongze Cheng 已提交
3222
// static int32_t doGetExternalRow(STsdbReadHandle* pTsdbReadHandle, int16_t type, SMemTable* pMemRef) {
3223 3224 3225 3226 3227 3228 3229 3230 3231 3232 3233 3234 3235 3236 3237 3238 3239 3240 3241 3242 3243 3244 3245 3246 3247 3248 3249 3250 3251 3252 3253 3254 3255 3256
//  STsdbReadHandle* pSecQueryHandle = NULL;
//
//  if (type == TSDB_PREV_ROW && pTsdbReadHandle->prev) {
//    return TSDB_CODE_SUCCESS;
//  }
//
//  if (type == TSDB_NEXT_ROW && pTsdbReadHandle->next) {
//    return TSDB_CODE_SUCCESS;
//  }
//
//  // prepare the structure
//  int32_t numOfCols = (int32_t) QH_GET_NUM_OF_COLS(pTsdbReadHandle);
//
//  if (type == TSDB_PREV_ROW) {
//    pTsdbReadHandle->prev = taosArrayInit(numOfCols, sizeof(SColumnInfoData));
//    if (pTsdbReadHandle->prev == NULL) {
//      terrno = TSDB_CODE_QRY_OUT_OF_MEMORY;
//      goto out_of_memory;
//    }
//  } else {
//    pTsdbReadHandle->next = taosArrayInit(numOfCols, sizeof(SColumnInfoData));
//    if (pTsdbReadHandle->next == NULL) {
//      terrno = TSDB_CODE_QRY_OUT_OF_MEMORY;
//      goto out_of_memory;
//    }
//  }
//
//  SArray* row = (type == TSDB_PREV_ROW)? pTsdbReadHandle->prev : pTsdbReadHandle->next;
//
//  for (int32_t i = 0; i < numOfCols; ++i) {
//    SColumnInfoData* pCol = taosArrayGet(pTsdbReadHandle->pColumns, i);
//
//    SColumnInfoData colInfo = {{0}, 0};
//    colInfo.info = pCol->info;
wafwerar's avatar
wafwerar 已提交
3257
//    colInfo.pData = taosMemoryCalloc(1, pCol->info.bytes);
3258 3259 3260 3261 3262 3263 3264 3265 3266
//    if (colInfo.pData == NULL) {
//      terrno = TSDB_CODE_QRY_OUT_OF_MEMORY;
//      goto out_of_memory;
//    }
//
//    taosArrayPush(row, &colInfo);
//  }
//
//  // load the previous row
3267
//  SQueryTableDataCond cond = {.numOfCols = numOfCols, .loadExternalRows = false, .type = BLOCK_LOAD_OFFSET_SEQ_ORDER};
3268 3269 3270 3271 3272 3273 3274 3275
//  if (type == TSDB_PREV_ROW) {
//    cond.order = TSDB_ORDER_DESC;
//    cond.twindow = (STimeWindow){pTsdbReadHandle->window.skey, INT64_MIN};
//  } else {
//    cond.order = TSDB_ORDER_ASC;
//    cond.twindow = (STimeWindow){pTsdbReadHandle->window.skey, INT64_MAX};
//  }
//
wafwerar's avatar
wafwerar 已提交
3276
//  cond.colList = taosMemoryCalloc(cond.numOfCols, sizeof(SColumnInfo));
3277 3278 3279 3280 3281 3282 3283 3284 3285 3286
//  if (cond.colList == NULL) {
//    terrno = TSDB_CODE_QRY_OUT_OF_MEMORY;
//    goto out_of_memory;
//  }
//
//  for (int32_t i = 0; i < cond.numOfCols; ++i) {
//    SColumnInfoData* pColInfoData = taosArrayGet(pTsdbReadHandle->pColumns, i);
//    memcpy(&cond.colList[i], &pColInfoData->info, sizeof(SColumnInfo));
//  }
//
H
Haojun Liao 已提交
3287
//  pSecQueryHandle = tsdbQueryTablesImpl(pTsdbReadHandle->pTsdb, &cond, pTsdbReadHandle->idStr, pMemRef);
wafwerar's avatar
wafwerar 已提交
3288
//  taosMemoryFreeClear(cond.colList);
3289 3290 3291 3292 3293 3294 3295 3296 3297 3298 3299 3300 3301 3302 3303 3304 3305 3306 3307 3308 3309 3310 3311 3312 3313 3314 3315 3316 3317 3318 3319 3320 3321 3322 3323 3324 3325 3326
//
//  // current table, only one table
//  STableCheckInfo* pCurrent = taosArrayGet(pTsdbReadHandle->pTableCheckInfo, pTsdbReadHandle->activeIndex);
//
//  SArray* psTable = NULL;
//  pSecQueryHandle->pTableCheckInfo = createCheckInfoFromCheckInfo(pCurrent, pSecQueryHandle->window.skey, &psTable);
//  if (pSecQueryHandle->pTableCheckInfo == NULL) {
//    taosArrayDestroy(psTable);
//    terrno = TSDB_CODE_QRY_OUT_OF_MEMORY;
//    goto out_of_memory;
//  }
//
//
//  tsdbMayTakeMemSnapshot(pSecQueryHandle, psTable);
//  if (!tsdbNextDataBlock((void*)pSecQueryHandle)) {
//    // no result in current query, free the corresponding result rows structure
//    if (type == TSDB_PREV_ROW) {
//      pTsdbReadHandle->prev = doFreeColumnInfoData(pTsdbReadHandle->prev);
//    } else {
//      pTsdbReadHandle->next = doFreeColumnInfoData(pTsdbReadHandle->next);
//    }
//
//    goto out_of_memory;
//  }
//
//  SDataBlockInfo blockInfo = {{0}, 0};
//  tsdbRetrieveDataBlockInfo((void*)pSecQueryHandle, &blockInfo);
//  tsdbRetrieveDataBlock((void*)pSecQueryHandle, pSecQueryHandle->defaultLoadColumn);
//
//  row = (type == TSDB_PREV_ROW)? pTsdbReadHandle->prev:pTsdbReadHandle->next;
//  int32_t pos = (type == TSDB_PREV_ROW)?pSecQueryHandle->cur.rows - 1:0;
//
//  for (int32_t i = 0; i < numOfCols; ++i) {
//    SColumnInfoData* pCol = taosArrayGet(row, i);
//    SColumnInfoData* s = taosArrayGet(pSecQueryHandle->pColumns, i);
//    memcpy((char*)pCol->pData, (char*)s->pData + s->info.bytes * pos, pCol->info.bytes);
//  }
//
dengyihao's avatar
dengyihao 已提交
3327
// out_of_memory:
3328
//  tsdbCleanupReadHandle(pSecQueryHandle);
3329 3330 3331
//  return terrno;
//}

H
Haojun Liao 已提交
3332
bool tsdbGetExternalRow(tsdbReaderT pHandle) {
dengyihao's avatar
dengyihao 已提交
3333 3334
  STsdbReadHandle* pTsdbReadHandle = (STsdbReadHandle*)pHandle;
  SQueryFilePos*   cur = &pTsdbReadHandle->cur;
H
Haojun Liao 已提交
3335

H
Haojun Liao 已提交
3336 3337
  cur->fid = INT32_MIN;
  cur->mixBlock = true;
3338
  if (pTsdbReadHandle->prev == NULL || pTsdbReadHandle->next == NULL) {
H
Haojun Liao 已提交
3339 3340
    cur->rows = 0;
    return false;
H
Haojun Liao 已提交
3341 3342
  }

dengyihao's avatar
dengyihao 已提交
3343
  int32_t numOfCols = (int32_t)QH_GET_NUM_OF_COLS(pTsdbReadHandle);
H
Haojun Liao 已提交
3344
  for (int32_t i = 0; i < numOfCols; ++i) {
3345 3346
    SColumnInfoData* pColInfoData = taosArrayGet(pTsdbReadHandle->pColumns, i);
    SColumnInfoData* first = taosArrayGet(pTsdbReadHandle->prev, i);
H
Haojun Liao 已提交
3347 3348 3349

    memcpy(pColInfoData->pData, first->pData, pColInfoData->info.bytes);

3350
    SColumnInfoData* sec = taosArrayGet(pTsdbReadHandle->next, i);
sangshuduo's avatar
sangshuduo 已提交
3351
    memcpy(((char*)pColInfoData->pData) + pColInfoData->info.bytes, sec->pData, pColInfoData->info.bytes);
H
Haojun Liao 已提交
3352 3353

    if (i == 0 && pColInfoData->info.type == TSDB_DATA_TYPE_TIMESTAMP) {
H
Haojun Liao 已提交
3354
      cur->win.skey = *(TSKEY*)pColInfoData->pData;
sangshuduo's avatar
sangshuduo 已提交
3355
      cur->win.ekey = *(TSKEY*)(((char*)pColInfoData->pData) + TSDB_KEYSIZE);
H
Haojun Liao 已提交
3356 3357 3358
    }
  }

H
Haojun Liao 已提交
3359 3360
  cur->rows = 2;
  return true;
3361 3362
}

3363
/*
3364
 * if lastRow == NULL, return TSDB_CODE_TDB_NO_CACHE_LAST_ROW
3365
 * else set pRes and return TSDB_CODE_SUCCESS and save lastKey
3366
 */
H
Haojun Liao 已提交
3367
// int32_t tsdbGetCachedLastRow(STable* pTable, STSRow** pRes, TSKEY* lastKey) {
3368 3369 3370 3371 3372 3373 3374 3375 3376 3377 3378 3379 3380 3381 3382 3383
//  int32_t code = TSDB_CODE_SUCCESS;
//
//  TSDB_RLOCK_TABLE(pTable);
//
//  if (!pTable->lastRow) {
//    code = TSDB_CODE_TDB_NO_CACHE_LAST_ROW;
//    goto out;
//  }
//
//  if (pRes) {
//    *pRes = tdMemRowDup(pTable->lastRow);
//    if (*pRes == NULL) {
//      code = TSDB_CODE_TDB_OUT_OF_MEMORY;
//    }
//  }
//
H
Haojun Liao 已提交
3384
// out:
3385 3386 3387 3388
//  TSDB_RUNLOCK_TABLE(pTable);
//  return code;
//}

3389
bool isTsdbCacheLastRow(tsdbReaderT* pReader) {
dengyihao's avatar
dengyihao 已提交
3390
  return ((STsdbReadHandle*)pReader)->cachelastrow > TSDB_CACHED_TYPE_NONE;
D
fix bug  
dapan1121 已提交
3391 3392
}

wmmhello's avatar
wmmhello 已提交
3393 3394
int32_t checkForCachedLastRow(STsdbReadHandle* pTsdbReadHandle, STableListInfo* tableList) {
  assert(pTsdbReadHandle != NULL && tableList != NULL);
3395

dengyihao's avatar
dengyihao 已提交
3396 3397 3398 3399 3400 3401 3402 3403 3404 3405 3406 3407 3408 3409 3410 3411 3412 3413 3414 3415 3416 3417 3418 3419
  //  TSKEY    key = TSKEY_INITIAL_VAL;
  //
  //  SArray* group = taosArrayGetP(groupList->pGroupList, 0);
  //  assert(group != NULL);
  //
  //  STableKeyInfo* pInfo = (STableKeyInfo*)taosArrayGet(group, 0);
  //
  //  int32_t code = 0;
  //
  //  if (((STable*)pInfo->pTable)->lastRow) {
  //    code = tsdbGetCachedLastRow(pInfo->pTable, NULL, &key);
  //    if (code != TSDB_CODE_SUCCESS) {
  //      pTsdbReadHandle->cachelastrow = TSDB_CACHED_TYPE_NONE;
  //    } else {
  //      pTsdbReadHandle->cachelastrow = TSDB_CACHED_TYPE_LASTROW;
  //    }
  //  }
  //
  //  // update the tsdb query time range
  //  if (pTsdbReadHandle->cachelastrow != TSDB_CACHED_TYPE_NONE) {
  //    pTsdbReadHandle->window      = TSWINDOW_INITIALIZER;
  //    pTsdbReadHandle->checkFiles  = false;
  //    pTsdbReadHandle->activeIndex = -1;  // start from -1
  //  }
H
Haojun Liao 已提交
3420

3421
  return TSDB_CODE_SUCCESS;
3422 3423
}

3424 3425
int32_t checkForCachedLast(STsdbReadHandle* pTsdbReadHandle) {
  assert(pTsdbReadHandle != NULL);
D
update  
dapan1121 已提交
3426 3427

  int32_t code = 0;
dengyihao's avatar
dengyihao 已提交
3428 3429 3430
  //  if (pTsdbReadHandle->pTsdb && atomic_load_8(&pTsdbReadHandle->pTsdb->hasCachedLastColumn)){
  //    pTsdbReadHandle->cachelastrow = TSDB_CACHED_TYPE_LAST;
  //  }
D
update  
dapan1121 已提交
3431 3432

  // update the tsdb query time range
3433
  if (pTsdbReadHandle->cachelastrow) {
dengyihao's avatar
dengyihao 已提交
3434
    pTsdbReadHandle->checkFiles = false;
3435
    pTsdbReadHandle->activeIndex = -1;  // start from -1
D
update  
dapan1121 已提交
3436 3437 3438 3439 3440
  }

  return code;
}

wmmhello's avatar
wmmhello 已提交
3441
STimeWindow updateLastrowForEachGroup(STableListInfo* pList) {
H
Haojun Liao 已提交
3442
  STimeWindow window = {INT64_MAX, INT64_MIN};
H
Haojun Liao 已提交
3443

dengyihao's avatar
dengyihao 已提交
3444 3445 3446 3447 3448 3449 3450 3451 3452 3453 3454 3455 3456 3457 3458 3459 3460 3461 3462 3463 3464 3465 3466 3467 3468 3469 3470 3471 3472 3473 3474 3475 3476 3477 3478 3479 3480 3481 3482 3483 3484 3485 3486 3487 3488 3489 3490 3491 3492 3493 3494 3495 3496 3497 3498 3499 3500 3501 3502
  //  int32_t totalNumOfTable = 0;
  //  SArray* emptyGroup = taosArrayInit(16, sizeof(int32_t));
  //
  //  // NOTE: starts from the buffer in case of descending timestamp order check data blocks
  //  size_t numOfGroups = taosArrayGetSize(groupList->pGroupList);
  //  for (int32_t j = 0; j < numOfGroups; ++j) {
  //    SArray* pGroup = taosArrayGetP(groupList->pGroupList, j);
  //    TSKEY   key = TSKEY_INITIAL_VAL;
  //
  //    STableKeyInfo keyInfo = {0};
  //
  //    size_t numOfTables = taosArrayGetSize(pGroup);
  //    for (int32_t i = 0; i < numOfTables; ++i) {
  //      STableKeyInfo* pInfo = (STableKeyInfo*)taosArrayGet(pGroup, i);
  //
  //      // if the lastKey equals to INT64_MIN, there is no data in this table
  //      TSKEY lastKey = 0;  //((STable*)(pInfo->pTable))->lastKey;
  //      if (key < lastKey) {
  //        key = lastKey;
  //
  //        //        keyInfo.pTable  = pInfo->pTable;
  //        keyInfo.lastKey = key;
  //        pInfo->lastKey = key;
  //
  //        if (key < window.skey) {
  //          window.skey = key;
  //        }
  //
  //        if (key > window.ekey) {
  //          window.ekey = key;
  //        }
  //      }
  //    }
  //
  //    // more than one table in each group, only one table left for each group
  //    //    if (keyInfo.pTable != NULL) {
  //    //      totalNumOfTable++;
  //    //      if (taosArrayGetSize(pGroup) == 1) {
  //    //        // do nothing
  //    //      } else {
  //    //        taosArrayClear(pGroup);
  //    //        taosArrayPush(pGroup, &keyInfo);
  //    //      }
  //    //    } else {  // mark all the empty groups, and remove it later
  //    //      taosArrayDestroy(pGroup);
  //    //      taosArrayPush(emptyGroup, &j);
  //    //    }
  //  }
  //
  //  // window does not being updated, so set the original
  //  if (window.skey == INT64_MAX && window.ekey == INT64_MIN) {
  //    window = TSWINDOW_INITIALIZER;
  //    assert(totalNumOfTable == 0 && taosArrayGetSize(groupList->pGroupList) == numOfGroups);
  //  }
  //
  //  taosArrayRemoveBatch(groupList->pGroupList, TARRAY_GET_START(emptyGroup), (int32_t)taosArrayGetSize(emptyGroup));
  //  taosArrayDestroy(emptyGroup);
  //
  //  groupList->numOfTables = totalNumOfTable;
H
Haojun Liao 已提交
3503
  return window;
H
hjxilinx 已提交
3504 3505
}

H
Haojun Liao 已提交
3506
void tsdbRetrieveDataBlockInfo(tsdbReaderT* pTsdbReadHandle, SDataBlockInfo* pDataBlockInfo) {
3507
  STsdbReadHandle* pHandle = (STsdbReadHandle*)pTsdbReadHandle;
dengyihao's avatar
dengyihao 已提交
3508
  SQueryFilePos*   cur = &pHandle->cur;
3509 3510

  uint64_t uid = 0;
H
Haojun Liao 已提交
3511

3512
  // there are data in file
D
dapan1121 已提交
3513
  if (pHandle->cur.fid != INT32_MIN) {
3514
    STableBlockInfo* pBlockInfo = &pHandle->pDataBlockInfo[cur->slot];
3515
    uid = pBlockInfo->pTableCheckInfo->tableId;
H
[td-32]  
hjxilinx 已提交
3516
  } else {
3517
    STableCheckInfo* pCheckInfo = taosArrayGet(pHandle->pTableCheckInfo, pHandle->activeIndex);
3518
    uid = pCheckInfo->tableId;
3519
  }
3520

dengyihao's avatar
dengyihao 已提交
3521 3522
  tsdbDebug("data block generated, uid:%" PRIu64 " numOfRows:%d, tsrange:%" PRId64 " - %" PRId64 " %s", uid, cur->rows,
            cur->win.skey, cur->win.ekey, pHandle->idStr);
3523

3524
  pDataBlockInfo->uid = uid;
3525 3526 3527 3528 3529 3530

#if 0
  // for multi-group data query processing test purpose
  pDataBlockInfo->groupId = uid;
#endif

dengyihao's avatar
dengyihao 已提交
3531
  pDataBlockInfo->rows = cur->rows;
H
Haojun Liao 已提交
3532
  pDataBlockInfo->window = cur->win;
3533
}
H
hjxilinx 已提交
3534

H
Haojun Liao 已提交
3535 3536 3537
/*
 * return null for mixed data block, if not a complete file data block, the statistics value will always return NULL
 */
3538
int32_t tsdbRetrieveDataBlockStatisInfo(tsdbReaderT* pTsdbReadHandle, SColumnDataAgg*** pBlockStatis, bool* allHave) {
dengyihao's avatar
dengyihao 已提交
3539
  STsdbReadHandle* pHandle = (STsdbReadHandle*)pTsdbReadHandle;
3540
  *allHave = false;
H
Haojun Liao 已提交
3541

H
Haojun Liao 已提交
3542 3543
  SQueryFilePos* c = &pHandle->cur;
  if (c->mixBlock) {
H
Haojun Liao 已提交
3544 3545 3546
    *pBlockStatis = NULL;
    return TSDB_CODE_SUCCESS;
  }
H
Haojun Liao 已提交
3547

H
Haojun Liao 已提交
3548 3549 3550 3551
  STableBlockInfo* pBlockInfo = &pHandle->pDataBlockInfo[c->slot];
  assert((c->slot >= 0 && c->slot < pHandle->numOfBlocks) || ((c->slot == pHandle->numOfBlocks) && (c->slot == 0)));

  // file block with sub-blocks has no statistics data
H
Haojun Liao 已提交
3552 3553 3554 3555
  if (pBlockInfo->compBlock->numOfSubBlocks > 1) {
    *pBlockStatis = NULL;
    return TSDB_CODE_SUCCESS;
  }
H
Haojun Liao 已提交
3556 3557

  int64_t stime = taosGetTimestampUs();
3558 3559
  int     statisStatus = tsdbLoadBlockStatis(&pHandle->rhelper, pBlockInfo->compBlock);
  if (statisStatus < TSDB_STATIS_OK) {
H
Hongze Cheng 已提交
3560
    return terrno;
3561 3562 3563
  } else if (statisStatus > TSDB_STATIS_OK) {
    *pBlockStatis = NULL;
    return TSDB_CODE_SUCCESS;
H
Hongze Cheng 已提交
3564
  }
H
Haojun Liao 已提交
3565

S
Shengliang Guan 已提交
3566
  tsdbDebug("vgId:%d, succeed to load block statis part for uid %" PRIu64, REPO_ID(pHandle->pTsdb),
C
Cary Xu 已提交
3567 3568
            TSDB_READ_TABLE_UID(&pHandle->rhelper));

3569
  int16_t* colIds = pHandle->suppInfo.defaultLoadColumn->pData;
H
Haojun Liao 已提交
3570

H
Haojun Liao 已提交
3571
  size_t numOfCols = QH_GET_NUM_OF_COLS(pHandle);
3572 3573 3574
  memset(pHandle->suppInfo.plist, 0, numOfCols * POINTER_BYTES);
  memset(pHandle->suppInfo.pstatis, 0, numOfCols * sizeof(SColumnDataAgg));

dengyihao's avatar
dengyihao 已提交
3575
  for (int32_t i = 0; i < numOfCols; ++i) {
3576
    pHandle->suppInfo.pstatis[i].colId = colIds[i];
3577
  }
H
Haojun Liao 已提交
3578

3579 3580
  *allHave = true;
  tsdbGetBlockStatis(&pHandle->rhelper, pHandle->suppInfo.pstatis, (int)numOfCols, pBlockInfo->compBlock);
H
Haojun Liao 已提交
3581 3582

  // always load the first primary timestamp column data
3583
  SColumnDataAgg* pPrimaryColStatis = &pHandle->suppInfo.pstatis[0];
3584
  assert(pPrimaryColStatis->colId == PRIMARYKEY_TIMESTAMP_COL_ID);
H
Haojun Liao 已提交
3585 3586

  pPrimaryColStatis->numOfNull = 0;
H
Hongze Cheng 已提交
3587 3588
  pPrimaryColStatis->min = pBlockInfo->compBlock->minKey.ts;
  pPrimaryColStatis->max = pBlockInfo->compBlock->maxKey.ts;
3589
  pHandle->suppInfo.plist[0] = &pHandle->suppInfo.pstatis[0];
H
Haojun Liao 已提交
3590

dengyihao's avatar
dengyihao 已提交
3591
  // update the number of NULL data rows
3592
  int32_t* slotIds = pHandle->suppInfo.slotIds;
dengyihao's avatar
dengyihao 已提交
3593
  for (int32_t i = 1; i < numOfCols; ++i) {
3594
    ASSERT(colIds[i] == pHandle->pSchema->columns[slotIds[i]].colId);
C
Cary Xu 已提交
3595
    if (IS_BSMA_ON(&(pHandle->pSchema->columns[slotIds[i]]))) {
3596 3597 3598
      if (pHandle->suppInfo.pstatis[i].numOfNull == -1) {  // set the column data are all NULL
        pHandle->suppInfo.pstatis[i].numOfNull = pBlockInfo->compBlock->numOfRows;
      }
3599 3600

      pHandle->suppInfo.plist[i] = &pHandle->suppInfo.pstatis[i];
3601 3602
    } else {
      *allHave = false;
H
Haojun Liao 已提交
3603 3604
    }
  }
H
Haojun Liao 已提交
3605 3606 3607 3608

  int64_t elapsed = taosGetTimestampUs() - stime;
  pHandle->cost.statisInfoLoadTime += elapsed;

3609
  *pBlockStatis = pHandle->suppInfo.plist;
3610
  return TSDB_CODE_SUCCESS;
H
hjxilinx 已提交
3611 3612
}

H
Haojun Liao 已提交
3613
SArray* tsdbRetrieveDataBlock(tsdbReaderT* pTsdbReadHandle, SArray* pIdList) {
H
[td-32]  
hjxilinx 已提交
3614
  /**
H
hjxilinx 已提交
3615
   * In the following two cases, the data has been loaded to SColumnInfoData.
H
[td-32]  
hjxilinx 已提交
3616 3617
   * 1. data is from cache, 2. data block is not completed qualified to query time range
   */
3618
  STsdbReadHandle* pHandle = (STsdbReadHandle*)pTsdbReadHandle;
D
dapan1121 已提交
3619
  if (pHandle->cur.fid == INT32_MIN) {
H
[td-32]  
hjxilinx 已提交
3620 3621
    return pHandle->pColumns;
  } else {
H
Haojun Liao 已提交
3622 3623
    STableBlockInfo* pBlockInfo = &pHandle->pDataBlockInfo[pHandle->cur.slot];
    STableCheckInfo* pCheckInfo = pBlockInfo->pTableCheckInfo;
3624

3625
    if (pHandle->cur.mixBlock) {
H
[td-32]  
hjxilinx 已提交
3626 3627
      return pHandle->pColumns;
    } else {
H
Haojun Liao 已提交
3628
      SDataBlockInfo binfo = GET_FILE_DATA_BLOCK_INFO(pCheckInfo, pBlockInfo->compBlock);
3629
      assert(pHandle->realNumOfRows <= binfo.rows);
H
Haojun Liao 已提交
3630

H
hjxilinx 已提交
3631 3632
      // data block has been loaded, todo extract method
      SDataBlockLoadInfo* pBlockLoadInfo = &pHandle->dataBlockLoadInfo;
H
Haojun Liao 已提交
3633

H
Hongze Cheng 已提交
3634
      if (pBlockLoadInfo->slot == pHandle->cur.slot && pBlockLoadInfo->fileGroup->fid == pHandle->cur.fid &&
H
Haojun Liao 已提交
3635
          pBlockLoadInfo->uid == pCheckInfo->tableId) {
H
hjxilinx 已提交
3636
        return pHandle->pColumns;
H
Haojun Liao 已提交
3637
      } else {  // only load the file block
H
refact  
Hongze Cheng 已提交
3638
        SBlock* pBlock = pBlockInfo->compBlock;
H
Haojun Liao 已提交
3639
        if (doLoadFileDataBlock(pHandle, pBlock, pCheckInfo, pHandle->cur.slot) != TSDB_CODE_SUCCESS) {
3640 3641
          return NULL;
        }
H
Haojun Liao 已提交
3642

H
Haojun Liao 已提交
3643
        int32_t numOfRows = doCopyRowsFromFileBlock(pHandle, pHandle->outputCapacity, 0, 0, pBlock->numOfRows - 1);
H
hjxilinx 已提交
3644 3645
        return pHandle->pColumns;
      }
H
[td-32]  
hjxilinx 已提交
3646 3647
    }
  }
H
hjxilinx 已提交
3648
}
3649

H
Haojun Liao 已提交
3650
static int tsdbCheckInfoCompar(const void* key1, const void* key2) {
3651
  if (((STableCheckInfo*)key1)->tableId < ((STableCheckInfo*)key2)->tableId) {
H
Haojun Liao 已提交
3652
    return -1;
3653
  } else if (((STableCheckInfo*)key1)->tableId > ((STableCheckInfo*)key2)->tableId) {
H
Haojun Liao 已提交
3654 3655 3656 3657 3658 3659 3660
    return 1;
  } else {
    ASSERT(false);
    return 0;
  }
}

3661 3662 3663 3664 3665 3666 3667 3668
static void* doFreeColumnInfoData(SArray* pColumnInfoData) {
  if (pColumnInfoData == NULL) {
    return NULL;
  }

  size_t cols = taosArrayGetSize(pColumnInfoData);
  for (int32_t i = 0; i < cols; ++i) {
    SColumnInfoData* pColInfo = taosArrayGet(pColumnInfoData, i);
H
Haojun Liao 已提交
3669
    colDataDestroy(pColInfo);
3670 3671 3672 3673 3674 3675
  }

  taosArrayDestroy(pColumnInfoData);
  return NULL;
}

H
Haojun Liao 已提交
3676 3677 3678 3679 3680 3681
static void* destroyTableCheckInfo(SArray* pTableCheckInfo) {
  size_t size = taosArrayGetSize(pTableCheckInfo);
  for (int32_t i = 0; i < size; ++i) {
    STableCheckInfo* p = taosArrayGet(pTableCheckInfo, i);
    destroyTableMemIterator(p);

wafwerar's avatar
wafwerar 已提交
3682
    taosMemoryFreeClear(p->pCompInfo);
H
Haojun Liao 已提交
3683 3684 3685 3686 3687 3688
  }

  taosArrayDestroy(pTableCheckInfo);
  return NULL;
}

H
Haojun Liao 已提交
3689
void tsdbCleanupReadHandle(tsdbReaderT queryHandle) {
3690 3691
  STsdbReadHandle* pTsdbReadHandle = (STsdbReadHandle*)queryHandle;
  if (pTsdbReadHandle == NULL) {
3692 3693
    return;
  }
3694

3695
  pTsdbReadHandle->pColumns = doFreeColumnInfoData(pTsdbReadHandle->pColumns);
3696

3697
  taosArrayDestroy(pTsdbReadHandle->suppInfo.defaultLoadColumn);
wafwerar's avatar
wafwerar 已提交
3698
  taosMemoryFreeClear(pTsdbReadHandle->pDataBlockInfo);
3699 3700
  taosMemoryFreeClear(pTsdbReadHandle->suppInfo.pstatis);
  taosMemoryFreeClear(pTsdbReadHandle->suppInfo.plist);
H
Haojun Liao 已提交
3701
  taosMemoryFree(pTsdbReadHandle->suppInfo.slotIds);
3702

3703
  if (!emptyQueryTimewindow(pTsdbReadHandle)) {
dengyihao's avatar
dengyihao 已提交
3704
    //    tsdbMayUnTakeMemSnapshot(pTsdbReadHandle);
3705
  } else {
3706
    assert(pTsdbReadHandle->pTableCheckInfo == NULL);
3707 3708
  }

3709 3710
  if (pTsdbReadHandle->pTableCheckInfo != NULL) {
    pTsdbReadHandle->pTableCheckInfo = destroyTableCheckInfo(pTsdbReadHandle->pTableCheckInfo);
3711
  }
3712

3713
  tsdbDestroyReadH(&pTsdbReadHandle->rhelper);
H
Haojun Liao 已提交
3714

3715 3716
  tdFreeDataCols(pTsdbReadHandle->pDataCols);
  pTsdbReadHandle->pDataCols = NULL;
H
Haojun Liao 已提交
3717

3718 3719
  pTsdbReadHandle->prev = doFreeColumnInfoData(pTsdbReadHandle->prev);
  pTsdbReadHandle->next = doFreeColumnInfoData(pTsdbReadHandle->next);
3720

3721
  SIOCostSummary* pCost = &pTsdbReadHandle->cost;
3722

dengyihao's avatar
dengyihao 已提交
3723 3724 3725 3726
  tsdbDebug("%p :io-cost summary: head-file read cnt:%" PRIu64 ", head-file time:%" PRIu64 " us, statis-info:%" PRId64
            " us, datablock:%" PRId64 " us, check data:%" PRId64 " us, %s",
            pTsdbReadHandle, pCost->headFileLoad, pCost->headFileLoadTime, pCost->statisInfoLoadTime,
            pCost->blockLoadTime, pCost->checkForNextTime, pTsdbReadHandle->idStr);
H
Haojun Liao 已提交
3727

H
Haojun Liao 已提交
3728 3729
  taosMemoryFree(pTsdbReadHandle->idStr);
  taosMemoryFree(pTsdbReadHandle->pSchema);
wafwerar's avatar
wafwerar 已提交
3730
  taosMemoryFreeClear(pTsdbReadHandle);
3731
}