tsdbRead.c 126.3 KB
Newer Older
H
hjxilinx 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

H
Hongze Cheng 已提交
16
#include "tsdb.h"
H
Hongze Cheng 已提交
17 18

#if 0
L
Liu Jicong 已提交
19
#include "vnode.h"
20

dengyihao's avatar
dengyihao 已提交
21 22
#define EXTRA_BYTES                2
#define ASCENDING_TRAVERSE(o)      (o == TSDB_ORDER_ASC)
23
#define QH_GET_NUM_OF_COLS(handle) ((size_t)(taosArrayGetSize((handle)->pColumns)))
H
hjxilinx 已提交
24

H
Hongze Cheng 已提交
25 26 27 28
#define GET_FILE_DATA_BLOCK_INFO(_checkInfo, _block)                                      \
  ((SDataBlockInfo){.window = {.skey = (_block)->minKey.ts, .ekey = (_block)->maxKey.ts}, \
                    .numOfCols = (_block)->numOfCols,                                     \
                    .rows = (_block)->numOfRows,                                          \
29
                    .uid = (_checkInfo)->tableId})
H
Haojun Liao 已提交
30

H
hjxilinx 已提交
31
enum {
dengyihao's avatar
dengyihao 已提交
32 33
  TSDB_QUERY_TYPE_ALL = 1,
  TSDB_QUERY_TYPE_LAST = 2,
H
hjxilinx 已提交
34 35
};

36
enum {
dengyihao's avatar
dengyihao 已提交
37
  TSDB_CACHED_TYPE_NONE = 0,
38
  TSDB_CACHED_TYPE_LASTROW = 1,
dengyihao's avatar
dengyihao 已提交
39
  TSDB_CACHED_TYPE_LAST = 2,
40 41
};

42
typedef struct SQueryFilePos {
dengyihao's avatar
dengyihao 已提交
43 44 45 46 47 48 49
  int32_t     fid;
  int32_t     slot;
  int32_t     pos;
  int64_t     lastKey;
  int32_t     rows;
  bool        mixBlock;
  bool        blockCompleted;
50
  STimeWindow win;
51
} SQueryFilePos;
H
hjxilinx 已提交
52

53
typedef struct SDataBlockLoadInfo {
dengyihao's avatar
dengyihao 已提交
54 55 56 57
  SDFileSet* fileGroup;
  int32_t    slot;
  uint64_t   uid;
  SArray*    pLoadedCols;
58
} SDataBlockLoadInfo;
H
hjxilinx 已提交
59

60
typedef struct SLoadCompBlockInfo {
H
hjLiao 已提交
61
  int32_t tid; /* table tid */
62 63
  int32_t fileId;
} SLoadCompBlockInfo;
H
hjxilinx 已提交
64

65
enum {
dengyihao's avatar
dengyihao 已提交
66
  CHECKINFO_CHOSEN_MEM = 0,
67
  CHECKINFO_CHOSEN_IMEM = 1,
dengyihao's avatar
dengyihao 已提交
68
  CHECKINFO_CHOSEN_BOTH = 2  // for update=2(merge case)
69 70
};

71
typedef struct STableCheckInfo {
H
Hongze Cheng 已提交
72 73 74 75 76 77 78 79 80 81
  uint64_t     suid;
  uint64_t     tableId;
  TSKEY        lastKey;
  SBlockInfo*  pCompInfo;
  int32_t      compSize;
  int32_t      numOfBlocks : 29;  // number of qualified data blocks not the original blocks
  uint8_t      chosen : 2;        // indicate which iterator should move forward
  bool         initBuf : 1;       // whether to initialize the in-memory skip list iterator or not
  STbDataIter* iter;              // mem buffer skip list iterator
  STbDataIter* iiter;             // imem buffer skip list iterator
82
} STableCheckInfo;
83

84
typedef struct STableBlockInfo {
dengyihao's avatar
dengyihao 已提交
85 86
  SBlock*          compBlock;
  STableCheckInfo* pTableCheckInfo;
87
} STableBlockInfo;
88

89
typedef struct SBlockOrderSupporter {
dengyihao's avatar
dengyihao 已提交
90 91 92 93
  int32_t           numOfTables;
  STableBlockInfo** pDataBlockInfo;
  int32_t*          blockIndexArray;
  int32_t*          numOfBlocksPerTable;
94 95
} SBlockOrderSupporter;

H
Haojun Liao 已提交
96 97 98
typedef struct SIOCostSummary {
  int64_t blockLoadTime;
  int64_t statisInfoLoadTime;
H
Haojun Liao 已提交
99
  int64_t checkForNextTime;
100 101
  int64_t headFileLoad;
  int64_t headFileLoadTime;
H
Haojun Liao 已提交
102 103
} SIOCostSummary;

104
typedef struct SBlockLoadSuppInfo {
C
Cary Xu 已提交
105 106 107 108
  SColumnDataAgg*  pstatis;
  SColumnDataAgg** plist;
  SArray*          defaultLoadColumn;  // default load column
  int32_t*         slotIds;            // colId to slotId
109 110
} SBlockLoadSuppInfo;

111
typedef struct STsdbReadHandle {
C
Cary Xu 已提交
112
  STsdb*        pTsdb;
H
more  
Hongze Cheng 已提交
113
  uint64_t      suid;
C
Cary Xu 已提交
114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131
  SQueryFilePos cur;  // current position
  int16_t       order;
  STimeWindow   window;  // the primary query time window that applies to all queries
  //  SColumnDataAgg* statis;  // query level statistics, only one table block statistics info exists at any time
  //  SColumnDataAgg** pstatis;// the ptr array list to return to caller
  int32_t numOfBlocks;
  SArray* pColumns;  // column list, SColumnInfoData array list
  bool    locateStart;
  int32_t outputCapacity;
  int32_t realNumOfRows;
  SArray* pTableCheckInfo;  // SArray<STableCheckInfo>
  int32_t activeIndex;
  bool    checkFiles;               // check file stage
  int8_t  cachelastrow;             // check if last row cached
  bool    loadExternalRow;          // load time window external data rows
  bool    currentLoadExternalRows;  // current load external rows
  int32_t loadType;                 // block load type
  char*   idStr;                    // query info handle, for debug purpose
dengyihao's avatar
dengyihao 已提交
132 133 134 135 136
  int32_t type;  // query type: retrieve all data blocks, 2. retrieve only last row, 3. retrieve direct prev|next rows
  SDFileSet*         pFileGroup;
  SFSIter            fileIter;
  SReadH             rhelper;
  STableBlockInfo*   pDataBlockInfo;
C
Cary Xu 已提交
137 138 139 140
  SDataCols*         pDataCols;         // in order to hold current file data block
  int32_t            allocSize;         // allocated data block size
  SDataBlockLoadInfo dataBlockLoadInfo; /* record current block load information */
  SLoadCompBlockInfo compBlockLoadInfo; /* record current compblock information in SQueryAttr */
141
  SBlockLoadSuppInfo suppInfo;
C
Cary Xu 已提交
142 143 144 145
  SArray*            prev;  // previous row which is before than time window
  SArray*            next;  // next row which is after the query time window
  SIOCostSummary     cost;
  STSchema*          pSchema;
146
} STsdbReadHandle;
147

wmmhello's avatar
wmmhello 已提交
148 149
static STimeWindow updateLastrowForEachGroup(STableListInfo* pList);
static int32_t     checkForCachedLastRow(STsdbReadHandle* pTsdbReadHandle, STableListInfo* pList);
dengyihao's avatar
dengyihao 已提交
150
static int32_t     checkForCachedLast(STsdbReadHandle* pTsdbReadHandle);
H
Haojun Liao 已提交
151
// static int32_t tsdbGetCachedLastRow(STable* pTable, STSRow** pRes, TSKEY* lastKey);
H
Haojun Liao 已提交
152

H
Haojun Liao 已提交
153
static void    changeQueryHandleForInterpQuery(tsdbReaderT pHandle);
154
static void    doMergeTwoLevelData(STsdbReadHandle* pTsdbReadHandle, STableCheckInfo* pCheckInfo, SBlock* pBlock);
dengyihao's avatar
dengyihao 已提交
155 156
static int32_t tsdbReadRowsFromCache(STableCheckInfo* pCheckInfo, TSKEY maxKey, int maxRowsToRead, STimeWindow* win,
                                     STsdbReadHandle* pTsdbReadHandle);
157
static int32_t tsdbCheckInfoCompar(const void* key1, const void* key2);
dengyihao's avatar
dengyihao 已提交
158 159 160 161
// static int32_t doGetExternalRow(STsdbReadHandle* pTsdbReadHandle, int16_t type, void* pMemRef);
// static void*   doFreeColumnInfoData(SArray* pColumnInfoData);
// static void*   destroyTableCheckInfo(SArray* pTableCheckInfo);
static bool tsdbGetExternalRow(tsdbReaderT pHandle);
Y
TD-1733  
yihaoDeng 已提交
162

C
Cary Xu 已提交
163
static STsdb* getTsdbByRetentions(SVnode* pVnode, STsdbReadHandle* pReadHandle, TSKEY winSKey, SRetention* retentions);
C
Cary Xu 已提交
164

165
static void tsdbInitDataBlockLoadInfo(SDataBlockLoadInfo* pBlockLoadInfo) {
H
hjxilinx 已提交
166
  pBlockLoadInfo->slot = -1;
dengyihao's avatar
dengyihao 已提交
167
  pBlockLoadInfo->uid = 0;
H
hjxilinx 已提交
168
  pBlockLoadInfo->fileGroup = NULL;
H
hjxilinx 已提交
169 170
}

171
static void tsdbInitCompBlockLoadInfo(SLoadCompBlockInfo* pCompBlockLoadInfo) {
H
hjLiao 已提交
172
  pCompBlockLoadInfo->tid = -1;
173 174
  pCompBlockLoadInfo->fileId = -1;
}
H
hjxilinx 已提交
175

176 177
static SArray* getColumnIdList(STsdbReadHandle* pTsdbReadHandle) {
  size_t numOfCols = QH_GET_NUM_OF_COLS(pTsdbReadHandle);
H
Haojun Liao 已提交
178 179 180 181
  assert(numOfCols <= TSDB_MAX_COLUMNS);

  SArray* pIdList = taosArrayInit(numOfCols, sizeof(int16_t));
  for (int32_t i = 0; i < numOfCols; ++i) {
182
    SColumnInfoData* pCol = taosArrayGet(pTsdbReadHandle->pColumns, i);
H
Haojun Liao 已提交
183 184 185 186 187 188
    taosArrayPush(pIdList, &pCol->info.colId);
  }

  return pIdList;
}

189 190
static SArray* getDefaultLoadColumns(STsdbReadHandle* pTsdbReadHandle, bool loadTS) {
  SArray* pLocalIdList = getColumnIdList(pTsdbReadHandle);
H
Haojun Liao 已提交
191 192 193 194 195

  // check if the primary time stamp column needs to load
  int16_t colId = *(int16_t*)taosArrayGet(pLocalIdList, 0);

  // the primary timestamp column does not be included in the the specified load column list, add it
H
Haojun Liao 已提交
196 197
  if (loadTS && colId != PRIMARYKEY_TIMESTAMP_COL_ID) {
    int16_t columnId = PRIMARYKEY_TIMESTAMP_COL_ID;
H
Haojun Liao 已提交
198 199 200 201 202 203
    taosArrayInsert(pLocalIdList, 0, &columnId);
  }

  return pLocalIdList;
}

H
Haojun Liao 已提交
204
int64_t tsdbGetNumOfRowsInMemTable(tsdbReaderT* pHandle) {
dengyihao's avatar
dengyihao 已提交
205
  STsdbReadHandle* pTsdbReadHandle = (STsdbReadHandle*)pHandle;
H
Haojun Liao 已提交
206

H
refact  
Hongze Cheng 已提交
207 208
  int64_t    rows = 0;
  SMemTable* pMemTable = NULL;  // pTsdbReadHandle->pMemTable;
dengyihao's avatar
dengyihao 已提交
209 210 211
  if (pMemTable == NULL) {
    return rows;
  }
H
Haojun Liao 已提交
212 213 214 215 216

  size_t size = taosArrayGetSize(pTsdbReadHandle->pTableCheckInfo);
  for (int32_t i = 0; i < size; ++i) {
    STableCheckInfo* pCheckInfo = taosArrayGet(pTsdbReadHandle->pTableCheckInfo, i);

dengyihao's avatar
dengyihao 已提交
217 218 219 220 221 222 223 224
    //    if (pMemT && pCheckInfo->tableId < pMemT->maxTables) {
    //      pMem = pMemT->tData[pCheckInfo->tableId];
    //      rows += (pMem && pMem->uid == pCheckInfo->tableId) ? pMem->numOfRows : 0;
    //    }
    //    if (pIMemT && pCheckInfo->tableId < pIMemT->maxTables) {
    //      pIMem = pIMemT->tData[pCheckInfo->tableId];
    //      rows += (pIMem && pIMem->uid == pCheckInfo->tableId) ? pIMem->numOfRows : 0;
    //    }
H
Haojun Liao 已提交
225 226 227
  }
  return rows;
}
228

wmmhello's avatar
wmmhello 已提交
229 230 231
static SArray* createCheckInfoFromTableGroup(STsdbReadHandle* pTsdbReadHandle, STableListInfo* pTableList) {
  size_t tableSize = taosArrayGetSize(pTableList->pTableList);
  assert(tableSize >= 1);
H
Haojun Liao 已提交
232 233

  // allocate buffer in order to load data blocks from file
wmmhello's avatar
wmmhello 已提交
234
  SArray* pTableCheckInfo = taosArrayInit(tableSize, sizeof(STableCheckInfo));
H
Haojun Liao 已提交
235 236 237 238 239
  if (pTableCheckInfo == NULL) {
    return NULL;
  }

  // todo apply the lastkey of table check to avoid to load header file
wmmhello's avatar
wmmhello 已提交
240 241
  for (int32_t j = 0; j < tableSize; ++j) {
    STableKeyInfo* pKeyInfo = (STableKeyInfo*)taosArrayGet(pTableList->pTableList, j);
H
Haojun Liao 已提交
242

wmmhello's avatar
wmmhello 已提交
243
    STableCheckInfo info = {.lastKey = pKeyInfo->lastKey, .tableId = pKeyInfo->uid};
H
more  
Hongze Cheng 已提交
244
    info.suid = pTsdbReadHandle->suid;
wmmhello's avatar
wmmhello 已提交
245 246
    if (ASCENDING_TRAVERSE(pTsdbReadHandle->order)) {
      if (info.lastKey == INT64_MIN || info.lastKey < pTsdbReadHandle->window.skey) {
247
        info.lastKey = pTsdbReadHandle->window.skey;
H
Haojun Liao 已提交
248 249
      }

wmmhello's avatar
wmmhello 已提交
250 251 252
      assert(info.lastKey >= pTsdbReadHandle->window.skey && info.lastKey <= pTsdbReadHandle->window.ekey);
    } else {
      info.lastKey = pTsdbReadHandle->window.skey;
H
Haojun Liao 已提交
253
    }
wmmhello's avatar
wmmhello 已提交
254 255

    taosArrayPush(pTableCheckInfo, &info);
dengyihao's avatar
dengyihao 已提交
256 257
    tsdbDebug("%p check table uid:%" PRId64 " from lastKey:%" PRId64 " %s", pTsdbReadHandle, info.tableId, info.lastKey,
              pTsdbReadHandle->idStr);
H
Haojun Liao 已提交
258 259
  }

260
  // TODO  group table according to the tag value.
261
  taosArraySort(pTableCheckInfo, tsdbCheckInfoCompar);
H
Haojun Liao 已提交
262 263 264
  return pTableCheckInfo;
}

265 266
static void resetCheckInfo(STsdbReadHandle* pTsdbReadHandle) {
  size_t numOfTables = taosArrayGetSize(pTsdbReadHandle->pTableCheckInfo);
H
Haojun Liao 已提交
267 268 269 270
  assert(numOfTables >= 1);

  // todo apply the lastkey of table check to avoid to load header file
  for (int32_t i = 0; i < numOfTables; ++i) {
dengyihao's avatar
dengyihao 已提交
271
    STableCheckInfo* pCheckInfo = (STableCheckInfo*)taosArrayGet(pTsdbReadHandle->pTableCheckInfo, i);
272
    pCheckInfo->lastKey = pTsdbReadHandle->window.skey;
H
Hongze Cheng 已提交
273 274
    pCheckInfo->iter = tsdbTbDataIterDestroy(pCheckInfo->iter);
    pCheckInfo->iiter = tsdbTbDataIterDestroy(pCheckInfo->iiter);
275
    pCheckInfo->initBuf = false;
H
Haojun Liao 已提交
276

277 278
    if (ASCENDING_TRAVERSE(pTsdbReadHandle->order)) {
      assert(pCheckInfo->lastKey >= pTsdbReadHandle->window.skey);
279
    } else {
280
      assert(pCheckInfo->lastKey <= pTsdbReadHandle->window.skey);
281
    }
H
Haojun Liao 已提交
282 283 284
  }
}

H
Haojun Liao 已提交
285 286 287
// only one table, not need to sort again
static SArray* createCheckInfoFromCheckInfo(STableCheckInfo* pCheckInfo, TSKEY skey, SArray** psTable) {
  SArray* pNew = taosArrayInit(1, sizeof(STableCheckInfo));
D
fix bug  
dapan1121 已提交
288

dengyihao's avatar
dengyihao 已提交
289
  STableCheckInfo info = {.lastKey = skey};
H
Haojun Liao 已提交
290

H
Haojun Liao 已提交
291 292
  info.tableId = pCheckInfo->tableId;
  taosArrayPush(pNew, &info);
H
Haojun Liao 已提交
293 294 295
  return pNew;
}

296 297
static bool emptyQueryTimewindow(STsdbReadHandle* pTsdbReadHandle) {
  assert(pTsdbReadHandle != NULL);
298

299
  STimeWindow* w = &pTsdbReadHandle->window;
dengyihao's avatar
dengyihao 已提交
300
  bool         asc = ASCENDING_TRAVERSE(pTsdbReadHandle->order);
301 302 303 304

  return ((asc && w->skey > w->ekey) || (!asc && w->ekey > w->skey));
}

305 306
// Update the query time window according to the data time to live(TTL) information, in order to avoid to return
// the expired data to client, even it is queried already.
307
static int64_t getEarliestValidTimestamp(STsdb* pTsdb) {
C
Cary Xu 已提交
308
  STsdbKeepCfg* pCfg = REPO_KEEP_CFG(pTsdb);
309 310

  int64_t now = taosGetTimestamp(pCfg->precision);
311
  return now - (tsTickPerMin[pCfg->precision] * pCfg->keep2) + 1;  // needs to add one tick
312 313
}

314 315
static void setQueryTimewindow(STsdbReadHandle* pTsdbReadHandle, SQueryTableDataCond* pCond, int32_t tWinIdx) {
  pTsdbReadHandle->window = pCond->twindows[tWinIdx];
316

317
  bool    updateTs = false;
318 319 320 321
  int64_t startTs = getEarliestValidTimestamp(pTsdbReadHandle->pTsdb);
  if (ASCENDING_TRAVERSE(pTsdbReadHandle->order)) {
    if (startTs > pTsdbReadHandle->window.skey) {
      pTsdbReadHandle->window.skey = startTs;
322
      pCond->twindows[tWinIdx].skey = startTs;
323
      updateTs = true;
324 325
    }
  } else {
326 327
    if (startTs > pTsdbReadHandle->window.ekey) {
      pTsdbReadHandle->window.ekey = startTs;
328
      pCond->twindows[tWinIdx].ekey = startTs;
329
      updateTs = true;
330 331 332
    }
  }

333
  if (updateTs) {
H
Haojun Liao 已提交
334
    tsdbDebug("%p update the query time window, old:%" PRId64 " - %" PRId64 ", new:%" PRId64 " - %" PRId64 ", %s",
L
Liu Jicong 已提交
335 336
              pTsdbReadHandle, pCond->twindows[tWinIdx].skey, pCond->twindows[tWinIdx].ekey,
              pTsdbReadHandle->window.skey, pTsdbReadHandle->window.ekey, pTsdbReadHandle->idStr);
337
  }
338
}
C
Cary Xu 已提交
339

C
Cary Xu 已提交
340
static STsdb* getTsdbByRetentions(SVnode* pVnode, STsdbReadHandle* pReadHandle, TSKEY winSKey, SRetention* retentions) {
C
Cary Xu 已提交
341
  if (VND_IS_RSMA(pVnode)) {
C
Cary Xu 已提交
342
    int     level = 0;
C
Cary Xu 已提交
343
    int64_t now = taosGetTimestamp(pVnode->config.tsdbCfg.precision);
C
Cary Xu 已提交
344

C
Cary Xu 已提交
345
    for (int i = 0; i < TSDB_RETENTION_MAX; ++i) {
C
Cary Xu 已提交
346 347 348 349 350
      SRetention* pRetention = retentions + level;
      if (pRetention->keep <= 0) {
        if (level > 0) {
          --level;
        }
C
Cary Xu 已提交
351 352
        break;
      }
C
Cary Xu 已提交
353
      if ((now - pRetention->keep) <= winSKey) {
C
Cary Xu 已提交
354
        break;
C
Cary Xu 已提交
355 356
      }
      ++level;
C
Cary Xu 已提交
357
    }
C
Cary Xu 已提交
358

C
Cary Xu 已提交
359
    if (level == TSDB_RETENTION_L0) {
S
Shengliang Guan 已提交
360
      tsdbDebug("vgId:%d, read handle %p rsma level %d is selected to query", TD_VID(pVnode), pReadHandle,
dengyihao's avatar
dengyihao 已提交
361
                TSDB_RETENTION_L0);
C
Cary Xu 已提交
362 363
      return VND_RSMA0(pVnode);
    } else if (level == TSDB_RETENTION_L1) {
S
Shengliang Guan 已提交
364
      tsdbDebug("vgId:%d, read handle %p rsma level %d is selected to query", TD_VID(pVnode), pReadHandle,
dengyihao's avatar
dengyihao 已提交
365
                TSDB_RETENTION_L1);
C
Cary Xu 已提交
366 367
      return VND_RSMA1(pVnode);
    } else {
S
Shengliang Guan 已提交
368
      tsdbDebug("vgId:%d, read handle %p rsma level %d is selected to query", TD_VID(pVnode), pReadHandle,
dengyihao's avatar
dengyihao 已提交
369
                TSDB_RETENTION_L2);
C
Cary Xu 已提交
370 371
      return VND_RSMA2(pVnode);
    }
C
Cary Xu 已提交
372
  }
C
Cary Xu 已提交
373
  return VND_TSDB(pVnode);
C
Cary Xu 已提交
374 375
}

376
static STsdbReadHandle* tsdbQueryTablesImpl(SVnode* pVnode, SQueryTableDataCond* pCond, uint64_t qId, uint64_t taskId) {
wafwerar's avatar
wafwerar 已提交
377
  STsdbReadHandle* pReadHandle = taosMemoryCalloc(1, sizeof(STsdbReadHandle));
378
  if (pReadHandle == NULL) {
379
    goto _end;
380
  }
H
Haojun Liao 已提交
381

382
  STsdb* pTsdb = getTsdbByRetentions(pVnode, pReadHandle, pCond->twindows[0].skey, pVnode->config.tsdbCfg.retentions);
C
Cary Xu 已提交
383

dengyihao's avatar
dengyihao 已提交
384
  pReadHandle->order = pCond->order;
C
Cary Xu 已提交
385
  pReadHandle->pTsdb = pTsdb;
dengyihao's avatar
dengyihao 已提交
386 387 388 389 390 391
  pReadHandle->type = TSDB_QUERY_TYPE_ALL;
  pReadHandle->cur.fid = INT32_MIN;
  pReadHandle->cur.win = TSWINDOW_INITIALIZER;
  pReadHandle->checkFiles = true;
  pReadHandle->activeIndex = 0;  // current active table index
  pReadHandle->allocSize = 0;
392
  pReadHandle->locateStart = false;
dengyihao's avatar
dengyihao 已提交
393
  pReadHandle->loadType = pCond->type;
394

H
Hongze Cheng 已提交
395
  pReadHandle->suid = pCond->suid;
dengyihao's avatar
dengyihao 已提交
396
  pReadHandle->outputCapacity = 4096;  //((STsdb*)tsdb)->config.maxRowsPerFileBlock;
397 398 399
  pReadHandle->loadExternalRow = pCond->loadExternalRows;
  pReadHandle->currentLoadExternalRows = pCond->loadExternalRows;

H
Haojun Liao 已提交
400
  char buf[128] = {0};
dengyihao's avatar
dengyihao 已提交
401
  snprintf(buf, tListLen(buf), "TID:0x%" PRIx64 " QID:0x%" PRIx64, taskId, qId);
H
Haojun Liao 已提交
402 403
  pReadHandle->idStr = strdup(buf);

C
Cary Xu 已提交
404
  if (tsdbInitReadH(&pReadHandle->rhelper, pReadHandle->pTsdb) != 0) {
405
    goto _end;
B
Bomin Zhang 已提交
406
  }
H
Haojun Liao 已提交
407

408
  assert(pCond != NULL);
409
  setQueryTimewindow(pReadHandle, pCond, 0);
410

411
  if (pCond->numOfCols > 0) {
H
Haojun Liao 已提交
412
    int32_t rowLen = 0;
dengyihao's avatar
dengyihao 已提交
413
    for (int32_t i = 0; i < pCond->numOfCols; ++i) {
H
Haojun Liao 已提交
414 415 416
      rowLen += pCond->colList[i].bytes;
    }

417 418 419 420 421 422
    // make sure the output SSDataBlock size be less than 2MB.
    int32_t TWOMB = 2 * 1024 * 1024;
    if (pReadHandle->outputCapacity * rowLen > TWOMB) {
      pReadHandle->outputCapacity = TWOMB / rowLen;
    }

423
    // allocate buffer in order to load data blocks from file
424 425
    pReadHandle->suppInfo.pstatis = taosMemoryCalloc(pCond->numOfCols, sizeof(SColumnDataAgg));
    if (pReadHandle->suppInfo.pstatis == NULL) {
426
      goto _end;
427
    }
H
Haojun Liao 已提交
428

429
    // todo: use list instead of array?
430 431
    pReadHandle->pColumns = taosArrayInit(pCond->numOfCols, sizeof(SColumnInfoData));
    if (pReadHandle->pColumns == NULL) {
432
      goto _end;
433
    }
H
Haojun Liao 已提交
434

435 436 437
    for (int32_t i = 0; i < pCond->numOfCols; ++i) {
      SColumnInfoData colInfo = {{0}, 0};
      colInfo.info = pCond->colList[i];
H
Haojun Liao 已提交
438

439
      int32_t code = colInfoDataEnsureCapacity(&colInfo, 0, pReadHandle->outputCapacity);
440
      if (code != TSDB_CODE_SUCCESS) {
441
        goto _end;
442
      }
443

444
      taosArrayPush(pReadHandle->pColumns, &colInfo);
B
Bomin Zhang 已提交
445
    }
H
Haojun Liao 已提交
446

447
    pReadHandle->suppInfo.defaultLoadColumn = getDefaultLoadColumns(pReadHandle, true);
448 449 450 451

    size_t size = taosArrayGetSize(pReadHandle->suppInfo.defaultLoadColumn);
    pReadHandle->suppInfo.slotIds = taosMemoryCalloc(size, sizeof(int32_t));
    pReadHandle->suppInfo.plist = taosMemoryCalloc(size, POINTER_BYTES);
H
Haojun Liao 已提交
452
  }
453

C
Cary Xu 已提交
454
  pReadHandle->pDataCols = tdNewDataCols(1000, pVnode->config.tsdbCfg.maxRows);
455
  if (pReadHandle->pDataCols == NULL) {
H
Haojun Liao 已提交
456
    tsdbError("%p failed to malloc buf for pDataCols, %s", pReadHandle, pReadHandle->idStr);
H
Haojun Liao 已提交
457
    terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
458
    goto _end;
H
hjxilinx 已提交
459
  }
460

461 462
  tsdbInitDataBlockLoadInfo(&pReadHandle->dataBlockLoadInfo);
  tsdbInitCompBlockLoadInfo(&pReadHandle->compBlockLoadInfo);
463

H
Haojun Liao 已提交
464
  return (tsdbReaderT)pReadHandle;
465

dengyihao's avatar
dengyihao 已提交
466
_end:
467
  tsdbCleanupReadHandle(pReadHandle);
468
  terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
469
  return NULL;
H
hjxilinx 已提交
470 471
}

472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504
static int32_t setCurrentSchema(SVnode* pVnode, STsdbReadHandle* pTsdbReadHandle) {
  STableCheckInfo* pCheckInfo = taosArrayGet(pTsdbReadHandle->pTableCheckInfo, 0);

  int32_t sversion = 1;

  SMetaReader mr = {0};
  metaReaderInit(&mr, pVnode->pMeta, 0);
  int32_t code = metaGetTableEntryByUid(&mr, pCheckInfo->tableId);
  if (code != TSDB_CODE_SUCCESS) {
    terrno = TSDB_CODE_TDB_INVALID_TABLE_ID;
    metaReaderClear(&mr);
    return terrno;
  }

  if (mr.me.type == TSDB_CHILD_TABLE) {
    tb_uid_t suid = mr.me.ctbEntry.suid;
    code = metaGetTableEntryByUid(&mr, suid);
    if (code != TSDB_CODE_SUCCESS) {
      terrno = TSDB_CODE_TDB_INVALID_TABLE_ID;
      metaReaderClear(&mr);
      return terrno;
    }
    sversion = mr.me.stbEntry.schemaRow.version;
  } else {
    ASSERT(mr.me.type == TSDB_NORMAL_TABLE);
    sversion = mr.me.ntbEntry.schemaRow.version;
  }

  metaReaderClear(&mr);
  pTsdbReadHandle->pSchema = metaGetTbTSchema(pVnode->pMeta, pCheckInfo->tableId, sversion);
  return TSDB_CODE_SUCCESS;
}

wmmhello's avatar
wmmhello 已提交
505
tsdbReaderT* tsdbQueryTables(SVnode* pVnode, SQueryTableDataCond* pCond, STableListInfo* tableList, uint64_t qId,
dengyihao's avatar
dengyihao 已提交
506
                             uint64_t taskId) {
507
  STsdbReadHandle* pTsdbReadHandle = tsdbQueryTablesImpl(pVnode, pCond, qId, taskId);
508
  if (pTsdbReadHandle == NULL) {
509 510 511
    return NULL;
  }

512
  if (emptyQueryTimewindow(pTsdbReadHandle)) {
dengyihao's avatar
dengyihao 已提交
513
    return (tsdbReaderT*)pTsdbReadHandle;
514
  }
H
Haojun Liao 已提交
515 516

  // todo apply the lastkey of table check to avoid to load header file
wmmhello's avatar
wmmhello 已提交
517
  pTsdbReadHandle->pTableCheckInfo = createCheckInfoFromTableGroup(pTsdbReadHandle, tableList);
518
  if (pTsdbReadHandle->pTableCheckInfo == NULL) {
dengyihao's avatar
dengyihao 已提交
519
    //    tsdbCleanupReadHandle(pTsdbReadHandle);
H
Haojun Liao 已提交
520 521 522 523
    terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
    return NULL;
  }

524 525 526 527 528
  int32_t code = setCurrentSchema(pVnode, pTsdbReadHandle);
  if (code != TSDB_CODE_SUCCESS) {
    terrno = code;
    return NULL;
  }
529

C
Cary Xu 已提交
530
  int32_t  numOfCols = taosArrayGetSize(pTsdbReadHandle->suppInfo.defaultLoadColumn);
531 532 533 534 535
  int16_t* ids = pTsdbReadHandle->suppInfo.defaultLoadColumn->pData;

  STSchema* pSchema = pTsdbReadHandle->pSchema;

  int32_t i = 0, j = 0;
C
Cary Xu 已提交
536
  while (i < numOfCols && j < pSchema->numOfCols) {
537 538 539 540 541 542 543 544 545 546 547 548
    if (ids[i] == pSchema->columns[j].colId) {
      pTsdbReadHandle->suppInfo.slotIds[i] = j;
      i++;
      j++;
    } else if (ids[i] > pSchema->columns[j].colId) {
      j++;
    } else {
      //    tsdbCleanupReadHandle(pTsdbReadHandle);
      terrno = TSDB_CODE_INVALID_PARA;
      return NULL;
    }
  }
549

wmmhello's avatar
wmmhello 已提交
550 551
  tsdbDebug("%p total numOfTable:%" PRIzu " in this query, table %" PRIzu " %s", pTsdbReadHandle,
            taosArrayGetSize(pTsdbReadHandle->pTableCheckInfo), taosArrayGetSize(tableList->pTableList),
dengyihao's avatar
dengyihao 已提交
552
            pTsdbReadHandle->idStr);
553

dengyihao's avatar
dengyihao 已提交
554
  return (tsdbReaderT)pTsdbReadHandle;
H
Haojun Liao 已提交
555 556
}

557
void tsdbResetReadHandle(tsdbReaderT queryHandle, SQueryTableDataCond* pCond, int32_t tWinIdx) {
558
  STsdbReadHandle* pTsdbReadHandle = queryHandle;
H
Haojun Liao 已提交
559

560 561 562
  if (emptyQueryTimewindow(pTsdbReadHandle)) {
    if (pCond->order != pTsdbReadHandle->order) {
      pTsdbReadHandle->order = pCond->order;
wafwerar's avatar
wafwerar 已提交
563
      TSWAP(pTsdbReadHandle->window.skey, pTsdbReadHandle->window.ekey);
564 565 566 567 568
    }

    return;
  }

dengyihao's avatar
dengyihao 已提交
569
  pTsdbReadHandle->order = pCond->order;
570
  setQueryTimewindow(pTsdbReadHandle, pCond, tWinIdx);
dengyihao's avatar
dengyihao 已提交
571 572 573 574 575
  pTsdbReadHandle->type = TSDB_QUERY_TYPE_ALL;
  pTsdbReadHandle->cur.fid = -1;
  pTsdbReadHandle->cur.win = TSWINDOW_INITIALIZER;
  pTsdbReadHandle->checkFiles = true;
  pTsdbReadHandle->activeIndex = 0;  // current active table index
576 577
  pTsdbReadHandle->locateStart = false;
  pTsdbReadHandle->loadExternalRow = pCond->loadExternalRows;
H
Haojun Liao 已提交
578 579

  if (ASCENDING_TRAVERSE(pCond->order)) {
580
    assert(pTsdbReadHandle->window.skey <= pTsdbReadHandle->window.ekey);
H
Haojun Liao 已提交
581
  } else {
582
    assert(pTsdbReadHandle->window.skey >= pTsdbReadHandle->window.ekey);
H
Haojun Liao 已提交
583 584 585
  }

  // allocate buffer in order to load data blocks from file
586 587
  memset(pTsdbReadHandle->suppInfo.pstatis, 0, sizeof(SColumnDataAgg));
  memset(pTsdbReadHandle->suppInfo.plist, 0, POINTER_BYTES);
H
Haojun Liao 已提交
588

589 590
  tsdbInitDataBlockLoadInfo(&pTsdbReadHandle->dataBlockLoadInfo);
  tsdbInitCompBlockLoadInfo(&pTsdbReadHandle->compBlockLoadInfo);
H
Haojun Liao 已提交
591

592
  resetCheckInfo(pTsdbReadHandle);
H
Haojun Liao 已提交
593 594
}

L
Liu Jicong 已提交
595 596
void tsdbResetQueryHandleForNewTable(tsdbReaderT queryHandle, SQueryTableDataCond* pCond, STableListInfo* tableList,
                                     int32_t tWinIdx) {
597
  STsdbReadHandle* pTsdbReadHandle = queryHandle;
H
Haojun Liao 已提交
598

dengyihao's avatar
dengyihao 已提交
599
  pTsdbReadHandle->order = pCond->order;
600
  pTsdbReadHandle->window = pCond->twindows[tWinIdx];
dengyihao's avatar
dengyihao 已提交
601 602 603 604 605
  pTsdbReadHandle->type = TSDB_QUERY_TYPE_ALL;
  pTsdbReadHandle->cur.fid = -1;
  pTsdbReadHandle->cur.win = TSWINDOW_INITIALIZER;
  pTsdbReadHandle->checkFiles = true;
  pTsdbReadHandle->activeIndex = 0;  // current active table index
606 607
  pTsdbReadHandle->locateStart = false;
  pTsdbReadHandle->loadExternalRow = pCond->loadExternalRows;
H
Haojun Liao 已提交
608 609

  if (ASCENDING_TRAVERSE(pCond->order)) {
610
    assert(pTsdbReadHandle->window.skey <= pTsdbReadHandle->window.ekey);
H
Haojun Liao 已提交
611
  } else {
612
    assert(pTsdbReadHandle->window.skey >= pTsdbReadHandle->window.ekey);
H
Haojun Liao 已提交
613 614 615
  }

  // allocate buffer in order to load data blocks from file
616 617
  memset(pTsdbReadHandle->suppInfo.pstatis, 0, sizeof(SColumnDataAgg));
  memset(pTsdbReadHandle->suppInfo.plist, 0, POINTER_BYTES);
H
Haojun Liao 已提交
618

619 620
  tsdbInitDataBlockLoadInfo(&pTsdbReadHandle->dataBlockLoadInfo);
  tsdbInitCompBlockLoadInfo(&pTsdbReadHandle->compBlockLoadInfo);
H
Haojun Liao 已提交
621

H
Haojun Liao 已提交
622
  SArray* pTable = NULL;
dengyihao's avatar
dengyihao 已提交
623
  //  STsdbMeta* pMeta = tsdbGetMeta(pTsdbReadHandle->pTsdb);
H
Haojun Liao 已提交
624

dengyihao's avatar
dengyihao 已提交
625
  //  pTsdbReadHandle->pTableCheckInfo = destroyTableCheckInfo(pTsdbReadHandle->pTableCheckInfo);
H
Haojun Liao 已提交
626

dengyihao's avatar
dengyihao 已提交
627 628
  pTsdbReadHandle->pTableCheckInfo = NULL;  // createCheckInfoFromTableGroup(pTsdbReadHandle, groupList, pMeta,
                                            // &pTable);
629
  if (pTsdbReadHandle->pTableCheckInfo == NULL) {
dengyihao's avatar
dengyihao 已提交
630
    //    tsdbCleanupReadHandle(pTsdbReadHandle);
H
Haojun Liao 已提交
631 632
    terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
  }
H
Haojun Liao 已提交
633

dengyihao's avatar
dengyihao 已提交
634 635
  //  pTsdbReadHandle->prev = doFreeColumnInfoData(pTsdbReadHandle->prev);
  //  pTsdbReadHandle->next = doFreeColumnInfoData(pTsdbReadHandle->next);
H
Haojun Liao 已提交
636 637
}

wmmhello's avatar
wmmhello 已提交
638
tsdbReaderT tsdbQueryLastRow(SVnode* pVnode, SQueryTableDataCond* pCond, STableListInfo* pList, uint64_t qId,
dengyihao's avatar
dengyihao 已提交
639
                             uint64_t taskId) {
640
  pCond->twindows[0] = updateLastrowForEachGroup(pList);
H
Haojun Liao 已提交
641 642

  // no qualified table
wmmhello's avatar
wmmhello 已提交
643
  if (taosArrayGetSize(pList->pTableList) == 0) {
H
Haojun Liao 已提交
644 645 646
    return NULL;
  }

wmmhello's avatar
wmmhello 已提交
647
  STsdbReadHandle* pTsdbReadHandle = (STsdbReadHandle*)tsdbQueryTables(pVnode, pCond, pList, qId, taskId);
648
  if (pTsdbReadHandle == NULL) {
649 650 651
    return NULL;
  }

wmmhello's avatar
wmmhello 已提交
652
  int32_t code = checkForCachedLastRow(pTsdbReadHandle, pList);
dengyihao's avatar
dengyihao 已提交
653
  if (code != TSDB_CODE_SUCCESS) {  // set the numOfTables to be 0
H
Haojun Liao 已提交
654 655 656
    terrno = code;
    return NULL;
  }
H
Haojun Liao 已提交
657

658
  assert(pCond->order == TSDB_ORDER_ASC && pCond->twindows[0].skey <= pCond->twindows[0].ekey);
659 660
  if (pTsdbReadHandle->cachelastrow) {
    pTsdbReadHandle->type = TSDB_QUERY_TYPE_LAST;
D
init  
dapan1121 已提交
661
  }
dengyihao's avatar
dengyihao 已提交
662

663
  return pTsdbReadHandle;
D
init  
dapan1121 已提交
664 665
}

666
#if 0
H
refact  
Hongze Cheng 已提交
667
tsdbReaderT tsdbQueryCacheLastT(STsdb *tsdb, SQueryTableDataCond *pCond, STableGroupInfo *groupList, uint64_t qId, SMemTable* pMemRef) {
668
  STsdbReadHandle *pTsdbReadHandle = (STsdbReadHandle*) tsdbQueryTablesT(tsdb, pCond, groupList, qId, pMemRef);
669
  if (pTsdbReadHandle == NULL) {
670 671 672
    return NULL;
  }

673
  int32_t code = checkForCachedLast(pTsdbReadHandle);
D
init  
dapan1121 已提交
674 675 676 677 678
  if (code != TSDB_CODE_SUCCESS) { // set the numOfTables to be 0
    terrno = code;
    return NULL;
  }

679 680
  if (pTsdbReadHandle->cachelastrow) {
    pTsdbReadHandle->type = TSDB_QUERY_TYPE_LAST;
D
fix bug  
dapan1121 已提交
681
  }
D
init  
dapan1121 已提交
682
  
683
  return pTsdbReadHandle;
H
hjxilinx 已提交
684 685
}

686
#endif
dengyihao's avatar
dengyihao 已提交
687
SArray* tsdbGetQueriedTableList(tsdbReaderT* pHandle) {
688
  assert(pHandle != NULL);
H
Haojun Liao 已提交
689

dengyihao's avatar
dengyihao 已提交
690
  STsdbReadHandle* pTsdbReadHandle = (STsdbReadHandle*)pHandle;
H
Haojun Liao 已提交
691

dengyihao's avatar
dengyihao 已提交
692
  size_t  size = taosArrayGetSize(pTsdbReadHandle->pTableCheckInfo);
693
  SArray* res = taosArrayInit(size, POINTER_BYTES);
694 695 696
  return res;
}

H
Haojun Liao 已提交
697
// leave only one table for each group
dengyihao's avatar
dengyihao 已提交
698
// static STableGroupInfo* trimTableGroup(STimeWindow* window, STableGroupInfo* pGroupList) {
wmmhello's avatar
wmmhello 已提交
699 700 701 702 703 704 705 706 707 708 709 710 711 712 713 714 715 716 717 718 719 720 721 722 723 724 725 726 727 728
//  assert(pGroupList);
//  size_t numOfGroup = taosArrayGetSize(pGroupList->pGroupList);
//
//  STableGroupInfo* pNew = taosMemoryCalloc(1, sizeof(STableGroupInfo));
//  pNew->pGroupList = taosArrayInit(numOfGroup, POINTER_BYTES);
//
//  for (int32_t i = 0; i < numOfGroup; ++i) {
//    SArray* oneGroup = taosArrayGetP(pGroupList->pGroupList, i);
//    size_t  numOfTables = taosArrayGetSize(oneGroup);
//
//    SArray* px = taosArrayInit(4, sizeof(STableKeyInfo));
//    for (int32_t j = 0; j < numOfTables; ++j) {
//      STableKeyInfo* pInfo = (STableKeyInfo*)taosArrayGet(oneGroup, j);
//      //      if (window->skey <= pInfo->lastKey && ((STable*)pInfo->pTable)->lastKey != TSKEY_INITIAL_VAL) {
//      //        taosArrayPush(px, pInfo);
//      //        pNew->numOfTables += 1;
//      //        break;
//      //      }
//    }
//
//    // there are no data in this group
//    if (taosArrayGetSize(px) == 0) {
//      taosArrayDestroy(px);
//    } else {
//      taosArrayPush(pNew->pGroupList, &px);
//    }
//  }
//
//  return pNew;
//}
729

dengyihao's avatar
dengyihao 已提交
730
// tsdbReaderT tsdbQueryRowsInExternalWindow(SVnode* pVnode, SQueryTableDataCond* pCond, STableGroupInfo* groupList,
wmmhello's avatar
wmmhello 已提交
731 732 733 734 735 736 737 738 739 740 741 742 743 744 745 746 747 748 749 750 751
//                                          uint64_t qId, uint64_t taskId) {
//  STableGroupInfo* pNew = trimTableGroup(&pCond->twindow, groupList);
//
//  if (pNew->numOfTables == 0) {
//    tsdbDebug("update query time range to invalidate time window");
//
//    assert(taosArrayGetSize(pNew->pGroupList) == 0);
//    bool asc = ASCENDING_TRAVERSE(pCond->order);
//    if (asc) {
//      pCond->twindow.ekey = pCond->twindow.skey - 1;
//    } else {
//      pCond->twindow.skey = pCond->twindow.ekey - 1;
//    }
//  }
//
//  STsdbReadHandle* pTsdbReadHandle = (STsdbReadHandle*)tsdbQueryTables(pVnode, pCond, pNew, qId, taskId);
//  pTsdbReadHandle->loadExternalRow = true;
//  pTsdbReadHandle->currentLoadExternalRows = true;
//
//  return pTsdbReadHandle;
//}
752

753
static bool initTableMemIterator(STsdbReadHandle* pHandle, STableCheckInfo* pCheckInfo) {
754
  if (pCheckInfo->initBuf) {
755 756
    return true;
  }
H
Haojun Liao 已提交
757

758
  pCheckInfo->initBuf = true;
759
  int32_t order = pHandle->order;
H
Haojun Liao 已提交
760

H
Hongze Cheng 已提交
761 762
  STbData* pMem = NULL;
  STbData* pIMem = NULL;
H
Hongze Cheng 已提交
763
  int8_t   backward = (pHandle->order == TSDB_ORDER_DESC) ? 1 : 0;
764

765
  TSKEY tLastKey = keyToTkey(pCheckInfo->lastKey);
766
  if (pHandle->pTsdb->mem != NULL) {
H
Hongze Cheng 已提交
767
    tsdbGetTbDataFromMemTable(pHandle->pTsdb->mem, pCheckInfo->suid, pCheckInfo->tableId, &pMem);
768
    if (pMem != NULL) {
H
Hongze Cheng 已提交
769
      tsdbTbDataIterCreate(pMem, &(TSDBKEY){.version = 0, .ts = tLastKey}, backward, &pCheckInfo->iter);
H
Haojun Liao 已提交
770
    }
771
  }
H
Haojun Liao 已提交
772

773
  if (pHandle->pTsdb->imem != NULL) {
H
Hongze Cheng 已提交
774
    tsdbGetTbDataFromMemTable(pHandle->pTsdb->mem, pCheckInfo->suid, pCheckInfo->tableId, &pIMem);
775
    if (pIMem != NULL) {
H
Hongze Cheng 已提交
776
      tsdbTbDataIterCreate(pIMem, &(TSDBKEY){.version = 0, .ts = tLastKey}, backward, &pCheckInfo->iiter);
H
Haojun Liao 已提交
777
    }
778
  }
H
Haojun Liao 已提交
779

780 781 782 783
  // both iterators are NULL, no data in buffer right now
  if (pCheckInfo->iter == NULL && pCheckInfo->iiter == NULL) {
    return false;
  }
H
Haojun Liao 已提交
784

H
Hongze Cheng 已提交
785 786 787 788
  bool memEmpty =
      (pCheckInfo->iter == NULL) || (pCheckInfo->iter != NULL && !tsdbTbDataIterGet(pCheckInfo->iter, NULL));
  bool imemEmpty =
      (pCheckInfo->iiter == NULL) || (pCheckInfo->iiter != NULL && !tsdbTbDataIterGet(pCheckInfo->iiter, NULL));
dengyihao's avatar
dengyihao 已提交
789
  if (memEmpty && imemEmpty) {  // buffer is empty
790 791
    return false;
  }
H
Haojun Liao 已提交
792

793
  if (!memEmpty) {
H
Hongze Cheng 已提交
794
    TSDBROW row;
H
Haojun Liao 已提交
795

H
Hongze Cheng 已提交
796 797
    tsdbTbDataIterGet(pCheckInfo->iter, &row);
    TSKEY key = row.pTSRow->ts;  // first timestamp in buffer
798
    tsdbDebug("%p uid:%" PRId64 ", check data in mem from skey:%" PRId64 ", order:%d, ts range in buf:%" PRId64
dengyihao's avatar
dengyihao 已提交
799
              "-%" PRId64 ", lastKey:%" PRId64 ", numOfRows:%" PRId64 ", %s",
H
Hongze Cheng 已提交
800 801
              pHandle, pCheckInfo->tableId, key, order, pMem->minKey.ts, pMem->maxKey.ts, pCheckInfo->lastKey,
              pMem->sl.size, pHandle->idStr);
H
Haojun Liao 已提交
802 803 804 805 806 807 808

    if (ASCENDING_TRAVERSE(order)) {
      assert(pCheckInfo->lastKey <= key);
    } else {
      assert(pCheckInfo->lastKey >= key);
    }

809
  } else {
dengyihao's avatar
dengyihao 已提交
810
    tsdbDebug("%p uid:%" PRId64 ", no data in mem, %s", pHandle, pCheckInfo->tableId, pHandle->idStr);
811
  }
H
Haojun Liao 已提交
812

813
  if (!imemEmpty) {
H
Hongze Cheng 已提交
814
    TSDBROW row;
H
Haojun Liao 已提交
815

H
Hongze Cheng 已提交
816 817
    tsdbTbDataIterGet(pCheckInfo->iter, &row);
    TSKEY key = row.pTSRow->ts;  // first timestamp in buffer
818
    tsdbDebug("%p uid:%" PRId64 ", check data in imem from skey:%" PRId64 ", order:%d, ts range in buf:%" PRId64
dengyihao's avatar
dengyihao 已提交
819
              "-%" PRId64 ", lastKey:%" PRId64 ", numOfRows:%" PRId64 ", %s",
H
Hongze Cheng 已提交
820 821
              pHandle, pCheckInfo->tableId, key, order, pIMem->minKey.ts, pIMem->maxKey.ts, pCheckInfo->lastKey,
              pIMem->sl.size, pHandle->idStr);
H
Haojun Liao 已提交
822 823 824 825 826 827

    if (ASCENDING_TRAVERSE(order)) {
      assert(pCheckInfo->lastKey <= key);
    } else {
      assert(pCheckInfo->lastKey >= key);
    }
828
  } else {
dengyihao's avatar
dengyihao 已提交
829
    tsdbDebug("%p uid:%" PRId64 ", no data in imem, %s", pHandle, pCheckInfo->tableId, pHandle->idStr);
830
  }
H
Haojun Liao 已提交
831

832 833 834
  return true;
}

H
Haojun Liao 已提交
835
static void destroyTableMemIterator(STableCheckInfo* pCheckInfo) {
H
Hongze Cheng 已提交
836 837
  tsdbTbDataIterDestroy(pCheckInfo->iter);
  tsdbTbDataIterDestroy(pCheckInfo->iiter);
H
Haojun Liao 已提交
838 839
}

C
Cary Xu 已提交
840
static TSKEY extractFirstTraverseKey(STableCheckInfo* pCheckInfo, int32_t order, int32_t update, TDRowVerT maxVer) {
H
Hongze Cheng 已提交
841
  TSDBROW row = {0};
H
Haojun Liao 已提交
842
  STSRow *rmem = NULL, *rimem = NULL;
H
Hongze Cheng 已提交
843

844
  if (pCheckInfo->iter) {
H
Hongze Cheng 已提交
845 846
    if (tsdbTbDataIterGet(pCheckInfo->iter, &row)) {
      rmem = row.pTSRow;
847 848 849 850
    }
  }

  if (pCheckInfo->iiter) {
H
Hongze Cheng 已提交
851 852
    if (tsdbTbDataIterGet(pCheckInfo->iiter, &row)) {
      rimem = row.pTSRow;
853 854 855 856 857 858 859 860 861
    }
  }

  if (rmem == NULL && rimem == NULL) {
    return TSKEY_INITIAL_VAL;
  }

  if (rmem != NULL && rimem == NULL) {
    pCheckInfo->chosen = CHECKINFO_CHOSEN_MEM;
H
Haojun Liao 已提交
862
    return TD_ROW_KEY(rmem);
863 864 865 866
  }

  if (rmem == NULL && rimem != NULL) {
    pCheckInfo->chosen = CHECKINFO_CHOSEN_IMEM;
H
Haojun Liao 已提交
867
    return TD_ROW_KEY(rimem);
868 869
  }

H
Haojun Liao 已提交
870 871
  TSKEY r1 = TD_ROW_KEY(rmem);
  TSKEY r2 = TD_ROW_KEY(rimem);
872 873

  if (r1 == r2) {
C
Cary Xu 已提交
874
#if 0
dengyihao's avatar
dengyihao 已提交
875
    if (update == TD_ROW_DISCARD_UPDATE) {
876 877
      pCheckInfo->chosen = CHECKINFO_CHOSEN_IMEM;
      tSkipListIterNext(pCheckInfo->iter);
dengyihao's avatar
dengyihao 已提交
878
    } else if (update == TD_ROW_OVERWRITE_UPDATE) {
879 880 881 882 883
      pCheckInfo->chosen = CHECKINFO_CHOSEN_MEM;
      tSkipListIterNext(pCheckInfo->iiter);
    } else {
      pCheckInfo->chosen = CHECKINFO_CHOSEN_BOTH;
    }
C
Cary Xu 已提交
884 885 886 887 888
#endif
    if (TD_SUPPORT_UPDATE(update)) {
      pCheckInfo->chosen = CHECKINFO_CHOSEN_BOTH;
    } else {
      pCheckInfo->chosen = CHECKINFO_CHOSEN_IMEM;
H
Hongze Cheng 已提交
889
      tsdbTbDataIterNext(pCheckInfo->iter);
C
Cary Xu 已提交
890
    }
891 892 893 894
    return r1;
  } else if (r1 < r2 && ASCENDING_TRAVERSE(order)) {
    pCheckInfo->chosen = CHECKINFO_CHOSEN_MEM;
    return r1;
dengyihao's avatar
dengyihao 已提交
895
  } else {
896 897 898 899 900
    pCheckInfo->chosen = CHECKINFO_CHOSEN_IMEM;
    return r2;
  }
}

C
Cary Xu 已提交
901 902
static STSRow* getSRowInTableMem(STableCheckInfo* pCheckInfo, int32_t order, int32_t update, STSRow** extraRow,
                                 TDRowVerT maxVer) {
H
Hongze Cheng 已提交
903
  TSDBROW row;
H
Haojun Liao 已提交
904
  STSRow *rmem = NULL, *rimem = NULL;
H
Haojun Liao 已提交
905
  if (pCheckInfo->iter) {
H
Hongze Cheng 已提交
906 907
    if (tsdbTbDataIterGet(pCheckInfo->iter, &row)) {
      rmem = row.pTSRow;
H
Haojun Liao 已提交
908 909
    }
  }
910

H
Haojun Liao 已提交
911
  if (pCheckInfo->iiter) {
H
Hongze Cheng 已提交
912 913
    if (tsdbTbDataIterGet(pCheckInfo->iiter, &row)) {
      rimem = row.pTSRow;
H
Haojun Liao 已提交
914 915
    }
  }
916

H
Haojun Liao 已提交
917 918
  if (rmem == NULL && rimem == NULL) {
    return NULL;
H
Haojun Liao 已提交
919
  }
920

H
Haojun Liao 已提交
921
  if (rmem != NULL && rimem == NULL) {
H
Haojun Liao 已提交
922 923 924
    pCheckInfo->chosen = 0;
    return rmem;
  }
925

H
Haojun Liao 已提交
926
  if (rmem == NULL && rimem != NULL) {
H
Haojun Liao 已提交
927 928 929
    pCheckInfo->chosen = 1;
    return rimem;
  }
930

H
Haojun Liao 已提交
931 932
  TSKEY r1 = TD_ROW_KEY(rmem);
  TSKEY r2 = TD_ROW_KEY(rimem);
H
Haojun Liao 已提交
933

934
  if (r1 == r2) {
C
Cary Xu 已提交
935
#if 0
936
    if (update == TD_ROW_DISCARD_UPDATE) {
H
TD-1439  
Hongze Cheng 已提交
937
      tSkipListIterNext(pCheckInfo->iter);
938
      pCheckInfo->chosen = CHECKINFO_CHOSEN_IMEM;
H
TD-1439  
Hongze Cheng 已提交
939
      return rimem;
dengyihao's avatar
dengyihao 已提交
940
    } else if (update == TD_ROW_OVERWRITE_UPDATE) {
H
TD-1439  
Hongze Cheng 已提交
941
      tSkipListIterNext(pCheckInfo->iiter);
942 943 944 945
      pCheckInfo->chosen = CHECKINFO_CHOSEN_MEM;
      return rmem;
    } else {
      pCheckInfo->chosen = CHECKINFO_CHOSEN_BOTH;
H
Haojun Liao 已提交
946
      *extraRow = rimem;
H
TD-1439  
Hongze Cheng 已提交
947 948
      return rmem;
    }
C
Cary Xu 已提交
949 950 951 952 953 954
#endif
    if (TD_SUPPORT_UPDATE(update)) {
      pCheckInfo->chosen = CHECKINFO_CHOSEN_BOTH;
      *extraRow = rimem;
      return rmem;
    } else {
H
Hongze Cheng 已提交
955
      tsdbTbDataIterNext(pCheckInfo->iter);
C
Cary Xu 已提交
956 957 958
      pCheckInfo->chosen = CHECKINFO_CHOSEN_IMEM;
      return rimem;
    }
H
Haojun Liao 已提交
959 960 961
  } else {
    if (ASCENDING_TRAVERSE(order)) {
      if (r1 < r2) {
962
        pCheckInfo->chosen = CHECKINFO_CHOSEN_MEM;
H
Haojun Liao 已提交
963 964
        return rmem;
      } else {
965
        pCheckInfo->chosen = CHECKINFO_CHOSEN_IMEM;
H
Haojun Liao 已提交
966 967 968 969
        return rimem;
      }
    } else {
      if (r1 < r2) {
970
        pCheckInfo->chosen = CHECKINFO_CHOSEN_IMEM;
H
Haojun Liao 已提交
971 972
        return rimem;
      } else {
973
        pCheckInfo->chosen = CHECKINFO_CHOSEN_IMEM;
H
Haojun Liao 已提交
974 975 976 977
        return rmem;
      }
    }
  }
H
Haojun Liao 已提交
978 979
}

980
static bool moveToNextRowInMem(STableCheckInfo* pCheckInfo) {
H
Haojun Liao 已提交
981
  bool hasNext = false;
982
  if (pCheckInfo->chosen == CHECKINFO_CHOSEN_MEM) {
H
Haojun Liao 已提交
983
    if (pCheckInfo->iter != NULL) {
H
Hongze Cheng 已提交
984
      hasNext = tsdbTbDataIterNext(pCheckInfo->iter);
H
Haojun Liao 已提交
985
    }
986

H
Haojun Liao 已提交
987 988 989
    if (hasNext) {
      return hasNext;
    }
990

H
Haojun Liao 已提交
991
    if (pCheckInfo->iiter != NULL) {
H
Hongze Cheng 已提交
992
      return tsdbTbDataIterGet(pCheckInfo->iiter, NULL);
H
Haojun Liao 已提交
993
    }
dengyihao's avatar
dengyihao 已提交
994
  } else if (pCheckInfo->chosen == CHECKINFO_CHOSEN_IMEM) {
995
    if (pCheckInfo->iiter != NULL) {
H
Hongze Cheng 已提交
996
      hasNext = tsdbTbDataIterNext(pCheckInfo->iiter);
997
    }
998

999 1000 1001
    if (hasNext) {
      return hasNext;
    }
1002

1003
    if (pCheckInfo->iter != NULL) {
H
Hongze Cheng 已提交
1004
      return tsdbTbDataIterGet(pCheckInfo->iter, NULL);
H
Haojun Liao 已提交
1005
    }
1006 1007
  } else {
    if (pCheckInfo->iter != NULL) {
H
Hongze Cheng 已提交
1008
      hasNext = tsdbTbDataIterNext(pCheckInfo->iter);
1009 1010
    }
    if (pCheckInfo->iiter != NULL) {
H
Hongze Cheng 已提交
1011
      hasNext = tsdbTbDataIterNext(pCheckInfo->iiter) || hasNext;
1012
    }
H
Haojun Liao 已提交
1013
  }
1014

H
Haojun Liao 已提交
1015 1016 1017
  return hasNext;
}

1018
static bool hasMoreDataInCache(STsdbReadHandle* pHandle) {
H
Hongze Cheng 已提交
1019
  STsdbCfg* pCfg = REPO_CFG(pHandle->pTsdb);
dengyihao's avatar
dengyihao 已提交
1020
  size_t    size = taosArrayGetSize(pHandle->pTableCheckInfo);
1021
  assert(pHandle->activeIndex < size && pHandle->activeIndex >= 0 && size >= 1);
D
dapan1121 已提交
1022
  pHandle->cur.fid = INT32_MIN;
H
Haojun Liao 已提交
1023

1024
  STableCheckInfo* pCheckInfo = taosArrayGet(pHandle->pTableCheckInfo, pHandle->activeIndex);
H
Haojun Liao 已提交
1025 1026 1027 1028
  if (!pCheckInfo->initBuf) {
    initTableMemIterator(pHandle, pCheckInfo);
  }

C
Cary Xu 已提交
1029
  STSRow* row = getSRowInTableMem(pCheckInfo, pHandle->order, pCfg->update, NULL, TD_VER_MAX);
H
Haojun Liao 已提交
1030
  if (row == NULL) {
1031 1032
    return false;
  }
1033

H
Haojun Liao 已提交
1034
  pCheckInfo->lastKey = TD_ROW_KEY(row);  // first timestamp in buffer
dengyihao's avatar
dengyihao 已提交
1035 1036
  tsdbDebug("%p uid:%" PRId64 ", check data in buffer from skey:%" PRId64 ", order:%d, %s", pHandle,
            pCheckInfo->tableId, pCheckInfo->lastKey, pHandle->order, pHandle->idStr);
H
Haojun Liao 已提交
1037

1038
  // all data in mem are checked already.
1039 1040
  if ((pCheckInfo->lastKey > pHandle->window.ekey && ASCENDING_TRAVERSE(pHandle->order)) ||
      (pCheckInfo->lastKey < pHandle->window.ekey && !ASCENDING_TRAVERSE(pHandle->order))) {
1041 1042
    return false;
  }
H
Haojun Liao 已提交
1043

dengyihao's avatar
dengyihao 已提交
1044
  int32_t      step = ASCENDING_TRAVERSE(pHandle->order) ? 1 : -1;
1045
  STimeWindow* win = &pHandle->cur.win;
H
Haojun Liao 已提交
1046
  pHandle->cur.rows = tsdbReadRowsFromCache(pCheckInfo, pHandle->window.ekey, pHandle->outputCapacity, win, pHandle);
H
Haojun Liao 已提交
1047

1048 1049 1050 1051
  // update the last key value
  pCheckInfo->lastKey = win->ekey + step;
  pHandle->cur.lastKey = win->ekey + step;
  pHandle->cur.mixBlock = true;
1052

1053
  if (!ASCENDING_TRAVERSE(pHandle->order)) {
wafwerar's avatar
wafwerar 已提交
1054
    TSWAP(win->skey, win->ekey);
1055
  }
H
Haojun Liao 已提交
1056

1057
  return true;
1058
}
H
hjxilinx 已提交
1059

1060 1061
static int32_t getFileIdFromKey(TSKEY key, int32_t daysPerFile, int32_t precision) {
  assert(precision >= TSDB_TIME_PRECISION_MICRO || precision <= TSDB_TIME_PRECISION_NANO);
1062 1063 1064
  if (key == TSKEY_INITIAL_VAL) {
    return INT32_MIN;
  }
H
Haojun Liao 已提交
1065

D
dapan1121 已提交
1066
  if (key < 0) {
1067
    key -= (daysPerFile * tsTickPerMin[precision]);
D
dapan1121 已提交
1068
  }
dengyihao's avatar
dengyihao 已提交
1069

1070
  int64_t fid = (int64_t)(key / (daysPerFile * tsTickPerMin[precision]));  // set the starting fileId
dengyihao's avatar
dengyihao 已提交
1071
  if (fid < 0L && llabs(fid) > INT32_MAX) {                                // data value overflow for INT32
1072 1073
    fid = INT32_MIN;
  }
H
Haojun Liao 已提交
1074

1075
  if (fid > 0L && fid > INT32_MAX) {
1076 1077
    fid = INT32_MAX;
  }
H
Haojun Liao 已提交
1078

S
TD-1057  
Shengliang Guan 已提交
1079
  return (int32_t)fid;
1080 1081
}

H
refact  
Hongze Cheng 已提交
1082
static int32_t binarySearchForBlock(SBlock* pBlock, int32_t numOfBlocks, TSKEY skey, int32_t order) {
1083 1084
  int32_t firstSlot = 0;
  int32_t lastSlot = numOfBlocks - 1;
H
Haojun Liao 已提交
1085

1086
  int32_t midSlot = firstSlot;
H
Haojun Liao 已提交
1087

1088 1089 1090
  while (1) {
    numOfBlocks = lastSlot - firstSlot + 1;
    midSlot = (firstSlot + (numOfBlocks >> 1));
H
Haojun Liao 已提交
1091

1092
    if (numOfBlocks == 1) break;
H
Haojun Liao 已提交
1093

H
Hongze Cheng 已提交
1094
    if (skey > pBlock[midSlot].maxKey.ts) {
1095
      if (numOfBlocks == 2) break;
H
Hongze Cheng 已提交
1096
      if ((order == TSDB_ORDER_DESC) && (skey < pBlock[midSlot + 1].minKey.ts)) break;
1097
      firstSlot = midSlot + 1;
H
Hongze Cheng 已提交
1098 1099
    } else if (skey < pBlock[midSlot].minKey.ts) {
      if ((order == TSDB_ORDER_ASC) && (skey > pBlock[midSlot - 1].maxKey.ts)) break;
1100 1101 1102 1103 1104
      lastSlot = midSlot - 1;
    } else {
      break;  // got the slot
    }
  }
H
Haojun Liao 已提交
1105

1106 1107
  return midSlot;
}
1108

dengyihao's avatar
dengyihao 已提交
1109
static int32_t loadBlockInfo(STsdbReadHandle* pTsdbReadHandle, int32_t index, int32_t* numOfBlocks) {
H
Haojun Liao 已提交
1110
  int32_t code = 0;
H
Haojun Liao 已提交
1111

1112
  STableCheckInfo* pCheckInfo = taosArrayGet(pTsdbReadHandle->pTableCheckInfo, index);
H
Haojun Liao 已提交
1113
  pCheckInfo->numOfBlocks = 0;
1114

H
Hongze Cheng 已提交
1115
  STable table = {.uid = pCheckInfo->tableId, .suid = pCheckInfo->suid};
1116
  table.pSchema = pTsdbReadHandle->pSchema;
H
Haojun Liao 已提交
1117 1118

  if (tsdbSetReadTable(&pTsdbReadHandle->rhelper, &table) != TSDB_CODE_SUCCESS) {
H
Haojun Liao 已提交
1119 1120 1121
    code = terrno;
    return code;
  }
1122

1123
  SBlockIdx* compIndex = pTsdbReadHandle->rhelper.pBlkIdx;
H
Hongze Cheng 已提交
1124

H
Haojun Liao 已提交
1125
  // no data block in this file, try next file
1126
  if (compIndex == NULL || compIndex->uid != pCheckInfo->tableId) {
H
Haojun Liao 已提交
1127 1128
    return 0;  // no data blocks in the file belongs to pCheckInfo->pTable
  }
1129

H
Haojun Liao 已提交
1130 1131 1132
  if (pCheckInfo->compSize < (int32_t)compIndex->len) {
    assert(compIndex->len > 0);

wafwerar's avatar
wafwerar 已提交
1133
    char* t = taosMemoryRealloc(pCheckInfo->pCompInfo, compIndex->len);
H
Haojun Liao 已提交
1134 1135 1136 1137
    if (t == NULL) {
      terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
      code = TSDB_CODE_TDB_OUT_OF_MEMORY;
      return code;
1138 1139
    }

H
Haojun Liao 已提交
1140 1141 1142
    pCheckInfo->pCompInfo = (SBlockInfo*)t;
    pCheckInfo->compSize = compIndex->len;
  }
1143

1144
  if (tsdbLoadBlockInfo(&(pTsdbReadHandle->rhelper), (void*)(pCheckInfo->pCompInfo)) < 0) {
H
Hongze Cheng 已提交
1145 1146
    return terrno;
  }
H
Haojun Liao 已提交
1147
  SBlockInfo* pCompInfo = pCheckInfo->pCompInfo;
1148

H
Haojun Liao 已提交
1149
  TSKEY s = TSKEY_INITIAL_VAL, e = TSKEY_INITIAL_VAL;
1150

1151
  if (ASCENDING_TRAVERSE(pTsdbReadHandle->order)) {
dengyihao's avatar
dengyihao 已提交
1152 1153
    assert(pCheckInfo->lastKey <= pTsdbReadHandle->window.ekey &&
           pTsdbReadHandle->window.skey <= pTsdbReadHandle->window.ekey);
H
Haojun Liao 已提交
1154
  } else {
dengyihao's avatar
dengyihao 已提交
1155 1156
    assert(pCheckInfo->lastKey >= pTsdbReadHandle->window.ekey &&
           pTsdbReadHandle->window.skey >= pTsdbReadHandle->window.ekey);
H
Haojun Liao 已提交
1157
  }
1158

dengyihao's avatar
dengyihao 已提交
1159 1160
  s = TMIN(pCheckInfo->lastKey, pTsdbReadHandle->window.ekey);
  e = TMAX(pCheckInfo->lastKey, pTsdbReadHandle->window.ekey);
1161

H
Haojun Liao 已提交
1162 1163 1164
  // discard the unqualified data block based on the query time window
  int32_t start = binarySearchForBlock(pCompInfo->blocks, compIndex->numOfBlocks, s, TSDB_ORDER_ASC);
  int32_t end = start;
H
TD-100  
hzcheng 已提交
1165

H
Hongze Cheng 已提交
1166
  if (s > pCompInfo->blocks[start].maxKey.ts) {
H
Haojun Liao 已提交
1167 1168
    return 0;
  }
1169

H
Haojun Liao 已提交
1170
  // todo speedup the procedure of located end block
H
Hongze Cheng 已提交
1171
  while (end < (int32_t)compIndex->numOfBlocks && (pCompInfo->blocks[end].minKey.ts <= e)) {
H
Haojun Liao 已提交
1172 1173
    end += 1;
  }
1174

H
Haojun Liao 已提交
1175
  pCheckInfo->numOfBlocks = (end - start);
1176

H
Haojun Liao 已提交
1177 1178 1179
  if (start > 0) {
    memmove(pCompInfo->blocks, &pCompInfo->blocks[start], pCheckInfo->numOfBlocks * sizeof(SBlock));
  }
1180

H
Haojun Liao 已提交
1181 1182 1183
  (*numOfBlocks) += pCheckInfo->numOfBlocks;
  return 0;
}
1184

1185
static int32_t getFileCompInfo(STsdbReadHandle* pTsdbReadHandle, int32_t* numOfBlocks) {
H
Haojun Liao 已提交
1186 1187 1188 1189
  // load all the comp offset value for all tables in this file
  int32_t code = TSDB_CODE_SUCCESS;
  *numOfBlocks = 0;

1190
  pTsdbReadHandle->cost.headFileLoad += 1;
1191 1192
  int64_t s = taosGetTimestampUs();

H
Haojun Liao 已提交
1193
  size_t numOfTables = 0;
1194 1195 1196 1197
  if (pTsdbReadHandle->loadType == BLOCK_LOAD_TABLE_SEQ_ORDER) {
    code = loadBlockInfo(pTsdbReadHandle, pTsdbReadHandle->activeIndex, numOfBlocks);
  } else if (pTsdbReadHandle->loadType == BLOCK_LOAD_OFFSET_SEQ_ORDER) {
    numOfTables = taosArrayGetSize(pTsdbReadHandle->pTableCheckInfo);
1198

H
Haojun Liao 已提交
1199
    for (int32_t i = 0; i < numOfTables; ++i) {
1200
      code = loadBlockInfo(pTsdbReadHandle, i, numOfBlocks);
H
Haojun Liao 已提交
1201
      if (code != TSDB_CODE_SUCCESS) {
1202 1203
        int64_t e = taosGetTimestampUs();

1204
        pTsdbReadHandle->cost.headFileLoadTime += (e - s);
H
Haojun Liao 已提交
1205 1206 1207 1208 1209
        return code;
      }
    }
  } else {
    assert(0);
1210
  }
1211

1212
  int64_t e = taosGetTimestampUs();
1213
  pTsdbReadHandle->cost.headFileLoadTime += (e - s);
H
Haojun Liao 已提交
1214
  return code;
1215 1216
}

dengyihao's avatar
dengyihao 已提交
1217 1218
static int32_t doLoadFileDataBlock(STsdbReadHandle* pTsdbReadHandle, SBlock* pBlock, STableCheckInfo* pCheckInfo,
                                   int32_t slotIndex) {
H
Haojun Liao 已提交
1219
  int64_t st = taosGetTimestampUs();
1220

C
Cary Xu 已提交
1221
  int32_t code = tdInitDataCols(pTsdbReadHandle->pDataCols, pTsdbReadHandle->pSchema);
H
Haojun Liao 已提交
1222
  if (code != TSDB_CODE_SUCCESS) {
H
Haojun Liao 已提交
1223
    tsdbError("%p failed to malloc buf for pDataCols, %s", pTsdbReadHandle, pTsdbReadHandle->idStr);
H
Haojun Liao 已提交
1224 1225 1226 1227
    terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
    goto _error;
  }

1228
  code = tdInitDataCols(pTsdbReadHandle->rhelper.pDCols[0], pTsdbReadHandle->pSchema);
H
Haojun Liao 已提交
1229
  if (code != TSDB_CODE_SUCCESS) {
H
Haojun Liao 已提交
1230
    tsdbError("%p failed to malloc buf for rhelper.pDataCols[0], %s", pTsdbReadHandle, pTsdbReadHandle->idStr);
H
Haojun Liao 已提交
1231 1232 1233 1234
    terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
    goto _error;
  }

1235
  code = tdInitDataCols(pTsdbReadHandle->rhelper.pDCols[1], pTsdbReadHandle->pSchema);
H
Haojun Liao 已提交
1236
  if (code != TSDB_CODE_SUCCESS) {
H
Haojun Liao 已提交
1237
    tsdbError("%p failed to malloc buf for rhelper.pDataCols[1], %s", pTsdbReadHandle, pTsdbReadHandle->idStr);
H
Haojun Liao 已提交
1238 1239 1240
    terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
    goto _error;
  }
1241

1242
  int16_t* colIds = pTsdbReadHandle->suppInfo.defaultLoadColumn->pData;
H
Haojun Liao 已提交
1243

dengyihao's avatar
dengyihao 已提交
1244 1245
  int32_t ret = tsdbLoadBlockDataCols(&(pTsdbReadHandle->rhelper), pBlock, pCheckInfo->pCompInfo, colIds,
                                      (int)(QH_GET_NUM_OF_COLS(pTsdbReadHandle)), true);
H
Haojun Liao 已提交
1246
  if (ret != TSDB_CODE_SUCCESS) {
H
Haojun Liao 已提交
1247 1248 1249
    int32_t c = terrno;
    assert(c != TSDB_CODE_SUCCESS);
    goto _error;
H
Haojun Liao 已提交
1250
  }
1251

1252
  SDataBlockLoadInfo* pBlockLoadInfo = &pTsdbReadHandle->dataBlockLoadInfo;
1253

1254 1255
  pBlockLoadInfo->fileGroup = pTsdbReadHandle->pFileGroup;
  pBlockLoadInfo->slot = pTsdbReadHandle->cur.slot;
H
Haojun Liao 已提交
1256
  pBlockLoadInfo->uid = pCheckInfo->tableId;
1257

1258
  SDataCols* pCols = pTsdbReadHandle->rhelper.pDCols[0];
1259
  assert(pCols->numOfRows != 0 && pCols->numOfRows <= pBlock->numOfRows);
1260

1261
  pBlock->numOfRows = pCols->numOfRows;
H
Haojun Liao 已提交
1262

1263
  // Convert from TKEY to TSKEY for primary timestamp column if current block has timestamp before 1970-01-01T00:00:00Z
H
Hongze Cheng 已提交
1264
  if (pBlock->minKey.ts < 0 && colIds[0] == PRIMARYKEY_TIMESTAMP_COL_ID) {
1265
    int64_t* src = pCols->cols[0].pData;
dengyihao's avatar
dengyihao 已提交
1266
    for (int32_t i = 0; i < pBlock->numOfRows; ++i) {
1267 1268 1269 1270
      src[i] = tdGetKey(src[i]);
    }
  }

H
Haojun Liao 已提交
1271
  int64_t elapsedTime = (taosGetTimestampUs() - st);
1272
  pTsdbReadHandle->cost.blockLoadTime += elapsedTime;
1273

dengyihao's avatar
dengyihao 已提交
1274 1275
  tsdbDebug("%p load file block into buffer, index:%d, brange:%" PRId64 "-%" PRId64 ", rows:%d, elapsed time:%" PRId64
            " us, %s",
H
Hongze Cheng 已提交
1276
            pTsdbReadHandle, slotIndex, pBlock->minKey.ts, pBlock->maxKey.ts, pBlock->numOfRows, elapsedTime,
dengyihao's avatar
dengyihao 已提交
1277
            pTsdbReadHandle->idStr);
H
Haojun Liao 已提交
1278
  return TSDB_CODE_SUCCESS;
H
Haojun Liao 已提交
1279 1280 1281 1282

_error:
  pBlock->numOfRows = 0;

dengyihao's avatar
dengyihao 已提交
1283
  tsdbError("%p error occurs in loading file block, index:%d, brange:%" PRId64 "-%" PRId64 ", rows:%d, %s",
H
Hongze Cheng 已提交
1284 1285
            pTsdbReadHandle, slotIndex, pBlock->minKey.ts, pBlock->maxKey.ts, pBlock->numOfRows,
            pTsdbReadHandle->idStr);
H
Haojun Liao 已提交
1286
  return terrno;
H
hjxilinx 已提交
1287 1288
}

1289
static int32_t getEndPosInDataBlock(STsdbReadHandle* pTsdbReadHandle, SDataBlockInfo* pBlockInfo);
dengyihao's avatar
dengyihao 已提交
1290 1291 1292 1293 1294 1295 1296
static int32_t doCopyRowsFromFileBlock(STsdbReadHandle* pTsdbReadHandle, int32_t capacity, int32_t numOfRows,
                                       int32_t start, int32_t end);
static void    doCheckGeneratedBlockRange(STsdbReadHandle* pTsdbReadHandle);
static void    copyAllRemainRowsFromFileBlock(STsdbReadHandle* pTsdbReadHandle, STableCheckInfo* pCheckInfo,
                                              SDataBlockInfo* pBlockInfo, int32_t endPos);

static int32_t handleDataMergeIfNeeded(STsdbReadHandle* pTsdbReadHandle, SBlock* pBlock, STableCheckInfo* pCheckInfo) {
1297
  SQueryFilePos* cur = &pTsdbReadHandle->cur;
H
Hongze Cheng 已提交
1298
  STsdbCfg*      pCfg = REPO_CFG(pTsdbReadHandle->pTsdb);
H
Haojun Liao 已提交
1299
  SDataBlockInfo binfo = GET_FILE_DATA_BLOCK_INFO(pCheckInfo, pBlock);
1300
  TSKEY          key;
dengyihao's avatar
dengyihao 已提交
1301
  int32_t        code = TSDB_CODE_SUCCESS;
1302

1303
  /*bool hasData = */ initTableMemIterator(pTsdbReadHandle, pCheckInfo);
H
Haojun Liao 已提交
1304 1305
  assert(cur->pos >= 0 && cur->pos <= binfo.rows);

C
Cary Xu 已提交
1306
  key = extractFirstTraverseKey(pCheckInfo, pTsdbReadHandle->order, pCfg->update, TD_VER_MAX);
1307

H
Haojun Liao 已提交
1308
  if (key != TSKEY_INITIAL_VAL) {
dengyihao's avatar
dengyihao 已提交
1309
    tsdbDebug("%p key in mem:%" PRId64 ", %s", pTsdbReadHandle, key, pTsdbReadHandle->idStr);
H
Haojun Liao 已提交
1310
  } else {
H
Haojun Liao 已提交
1311
    tsdbDebug("%p no data in mem, %s", pTsdbReadHandle, pTsdbReadHandle->idStr);
H
Haojun Liao 已提交
1312
  }
H
Haojun Liao 已提交
1313

1314 1315
  bool ascScan = ASCENDING_TRAVERSE(pTsdbReadHandle->order);

1316 1317
  if ((ascScan && (key != TSKEY_INITIAL_VAL && key <= binfo.window.ekey)) ||
      (!ascScan && (key != TSKEY_INITIAL_VAL && key >= binfo.window.skey))) {
1318 1319 1320
    bool cacheDataInFileBlockHole = (ascScan && (key != TSKEY_INITIAL_VAL && key < binfo.window.skey)) ||
                                    (!ascScan && (key != TSKEY_INITIAL_VAL && key > binfo.window.ekey));
    if (cacheDataInFileBlockHole) {
H
Haojun Liao 已提交
1321
      // do not load file block into buffer
1322
      int32_t step = ascScan ? 1 : -1;
H
Haojun Liao 已提交
1323

1324
      TSKEY maxKey = ascScan ? (binfo.window.skey - step) : (binfo.window.ekey - step);
dengyihao's avatar
dengyihao 已提交
1325 1326
      cur->rows =
          tsdbReadRowsFromCache(pCheckInfo, maxKey, pTsdbReadHandle->outputCapacity, &cur->win, pTsdbReadHandle);
1327
      pTsdbReadHandle->realNumOfRows = cur->rows;
H
Haojun Liao 已提交
1328 1329 1330

      // update the last key value
      pCheckInfo->lastKey = cur->win.ekey + step;
1331 1332

      if (!ascScan) {
wafwerar's avatar
wafwerar 已提交
1333
        TSWAP(cur->win.skey, cur->win.ekey);
H
Haojun Liao 已提交
1334
      }
H
Haojun Liao 已提交
1335

H
Haojun Liao 已提交
1336 1337
      cur->mixBlock = true;
      cur->blockCompleted = false;
H
Haojun Liao 已提交
1338
      return code;
H
Haojun Liao 已提交
1339
    }
H
Haojun Liao 已提交
1340

1341
    // return error, add test cases
1342
    if ((code = doLoadFileDataBlock(pTsdbReadHandle, pBlock, pCheckInfo, cur->slot)) != TSDB_CODE_SUCCESS) {
H
Haojun Liao 已提交
1343
      return code;
1344 1345
    }

1346
    doMergeTwoLevelData(pTsdbReadHandle, pCheckInfo, pBlock);
1347
  } else {
1348 1349 1350 1351 1352
    /*
     * no data in cache, only load data from file
     * during the query processing, data in cache will not be checked anymore.
     * Here the buffer is not enough, so only part of file block can be loaded into memory buffer
     */
1353
    int32_t endPos = getEndPosInDataBlock(pTsdbReadHandle, &binfo);
1354

1355 1356
    bool wholeBlockReturned = ((abs(cur->pos - endPos) + 1) == binfo.rows);
    if (wholeBlockReturned) {
1357
      pTsdbReadHandle->realNumOfRows = binfo.rows;
1358 1359

      cur->rows = binfo.rows;
dengyihao's avatar
dengyihao 已提交
1360
      cur->win = binfo.window;
1361
      cur->mixBlock = false;
H
Haojun Liao 已提交
1362 1363
      cur->blockCompleted = true;

1364
      if (ascScan) {
H
Haojun Liao 已提交
1365 1366 1367 1368 1369 1370
        cur->lastKey = binfo.window.ekey + 1;
        cur->pos = binfo.rows;
      } else {
        cur->lastKey = binfo.window.skey - 1;
        cur->pos = -1;
      }
dengyihao's avatar
dengyihao 已提交
1371
    } else {  // partially copy to dest buffer
1372
      // make sure to only load once
dengyihao's avatar
dengyihao 已提交
1373
      bool firstTimeExtract = ((cur->pos == 0 && ascScan) || (cur->pos == binfo.rows - 1 && (!ascScan)));
1374 1375 1376 1377 1378 1379 1380
      if (pTsdbReadHandle->outputCapacity < binfo.rows && firstTimeExtract) {
        code = doLoadFileDataBlock(pTsdbReadHandle, pBlock, pCheckInfo, cur->slot);
        if (code != TSDB_CODE_SUCCESS) {
          return code;
        }
      }

1381
      copyAllRemainRowsFromFileBlock(pTsdbReadHandle, pCheckInfo, &binfo, endPos);
1382 1383
      cur->mixBlock = true;
    }
1384

1385
    if (pTsdbReadHandle->outputCapacity >= binfo.rows) {
1386
      ASSERT(cur->blockCompleted || cur->mixBlock);
1387 1388
    }

H
Haojun Liao 已提交
1389
    if (cur->rows == binfo.rows) {
dengyihao's avatar
dengyihao 已提交
1390
      tsdbDebug("%p whole file block qualified, brange:%" PRId64 "-%" PRId64 ", rows:%d, lastKey:%" PRId64 ", %s",
H
Haojun Liao 已提交
1391
                pTsdbReadHandle, cur->win.skey, cur->win.ekey, cur->rows, cur->lastKey, pTsdbReadHandle->idStr);
H
Haojun Liao 已提交
1392
    } else {
dengyihao's avatar
dengyihao 已提交
1393 1394 1395 1396
      tsdbDebug("%p create data block from remain file block, brange:%" PRId64 "-%" PRId64
                ", rows:%d, total:%d, lastKey:%" PRId64 ", %s",
                pTsdbReadHandle, cur->win.skey, cur->win.ekey, cur->rows, binfo.rows, cur->lastKey,
                pTsdbReadHandle->idStr);
H
Haojun Liao 已提交
1397
    }
1398
  }
H
Haojun Liao 已提交
1399 1400

  return code;
1401 1402
}

1403 1404
static int32_t binarySearchForKey(char* pValue, int num, TSKEY key, int order);

dengyihao's avatar
dengyihao 已提交
1405 1406
static int32_t loadFileDataBlock(STsdbReadHandle* pTsdbReadHandle, SBlock* pBlock, STableCheckInfo* pCheckInfo,
                                 bool* exists) {
1407
  SQueryFilePos* cur = &pTsdbReadHandle->cur;
dengyihao's avatar
dengyihao 已提交
1408 1409
  int32_t        code = TSDB_CODE_SUCCESS;
  bool           asc = ASCENDING_TRAVERSE(pTsdbReadHandle->order);
1410

1411
  if (asc) {
H
Haojun Liao 已提交
1412
    // query ended in/started from current block
H
Hongze Cheng 已提交
1413
    if (pTsdbReadHandle->window.ekey < pBlock->maxKey.ts || pCheckInfo->lastKey > pBlock->minKey.ts) {
1414
      if ((code = doLoadFileDataBlock(pTsdbReadHandle, pBlock, pCheckInfo, cur->slot)) != TSDB_CODE_SUCCESS) {
H
Haojun Liao 已提交
1415 1416
        *exists = false;
        return code;
1417
      }
1418

1419
      SDataCols* pTSCol = pTsdbReadHandle->rhelper.pDCols[0];
H
Haojun Liao 已提交
1420
      assert(pTSCol->cols->type == TSDB_DATA_TYPE_TIMESTAMP && pTSCol->numOfRows == pBlock->numOfRows);
H
Haojun Liao 已提交
1421

H
Hongze Cheng 已提交
1422
      if (pCheckInfo->lastKey > pBlock->minKey.ts) {
1423
        cur->pos =
1424
            binarySearchForKey(pTSCol->cols[0].pData, pBlock->numOfRows, pCheckInfo->lastKey, pTsdbReadHandle->order);
1425 1426 1427
      } else {
        cur->pos = 0;
      }
H
Haojun Liao 已提交
1428

H
Hongze Cheng 已提交
1429
      assert(pCheckInfo->lastKey <= pBlock->maxKey.ts);
1430
      doMergeTwoLevelData(pTsdbReadHandle, pCheckInfo, pBlock);
1431
    } else {  // the whole block is loaded in to buffer
dengyihao's avatar
dengyihao 已提交
1432
      cur->pos = asc ? 0 : (pBlock->numOfRows - 1);
1433
      code = handleDataMergeIfNeeded(pTsdbReadHandle, pBlock, pCheckInfo);
H
[td-32]  
hjxilinx 已提交
1434
    }
dengyihao's avatar
dengyihao 已提交
1435
  } else {  // desc order, query ended in current block
H
Hongze Cheng 已提交
1436
    if (pTsdbReadHandle->window.ekey > pBlock->minKey.ts || pCheckInfo->lastKey < pBlock->maxKey.ts) {
1437
      if ((code = doLoadFileDataBlock(pTsdbReadHandle, pBlock, pCheckInfo, cur->slot)) != TSDB_CODE_SUCCESS) {
H
Haojun Liao 已提交
1438 1439
        *exists = false;
        return code;
1440
      }
H
Haojun Liao 已提交
1441

1442
      SDataCols* pTsCol = pTsdbReadHandle->rhelper.pDCols[0];
H
Hongze Cheng 已提交
1443
      if (pCheckInfo->lastKey < pBlock->maxKey.ts) {
dengyihao's avatar
dengyihao 已提交
1444 1445
        cur->pos =
            binarySearchForKey(pTsCol->cols[0].pData, pBlock->numOfRows, pCheckInfo->lastKey, pTsdbReadHandle->order);
1446
      } else {
H
Haojun Liao 已提交
1447
        cur->pos = pBlock->numOfRows - 1;
1448
      }
H
Haojun Liao 已提交
1449

H
Hongze Cheng 已提交
1450
      assert(pCheckInfo->lastKey >= pBlock->minKey.ts);
1451
      doMergeTwoLevelData(pTsdbReadHandle, pCheckInfo, pBlock);
1452
    } else {
dengyihao's avatar
dengyihao 已提交
1453
      cur->pos = asc ? 0 : (pBlock->numOfRows - 1);
1454
      code = handleDataMergeIfNeeded(pTsdbReadHandle, pBlock, pCheckInfo);
H
[td-32]  
hjxilinx 已提交
1455
    }
1456
  }
1457

1458
  *exists = pTsdbReadHandle->realNumOfRows > 0;
H
Haojun Liao 已提交
1459
  return code;
H
[td-32]  
hjxilinx 已提交
1460 1461
}

1462
static int doBinarySearchKey(char* pValue, int num, TSKEY key, int order) {
1463
  int    firstPos, lastPos, midPos = -1;
H
Haojun Liao 已提交
1464
  int    numOfRows;
1465 1466
  TSKEY* keyList;

1467
  assert(order == TSDB_ORDER_ASC || order == TSDB_ORDER_DESC);
H
Haojun Liao 已提交
1468

1469
  if (num <= 0) return -1;
1470 1471

  keyList = (TSKEY*)pValue;
1472 1473
  firstPos = 0;
  lastPos = num - 1;
1474

1475
  if (order == TSDB_ORDER_DESC) {
1476 1477 1478 1479 1480
    // find the first position which is smaller than the key
    while (1) {
      if (key >= keyList[lastPos]) return lastPos;
      if (key == keyList[firstPos]) return firstPos;
      if (key < keyList[firstPos]) return firstPos - 1;
1481

H
Haojun Liao 已提交
1482 1483
      numOfRows = lastPos - firstPos + 1;
      midPos = (numOfRows >> 1) + firstPos;
1484

1485 1486 1487 1488 1489 1490 1491 1492
      if (key < keyList[midPos]) {
        lastPos = midPos - 1;
      } else if (key > keyList[midPos]) {
        firstPos = midPos + 1;
      } else {
        break;
      }
    }
1493

1494 1495 1496 1497 1498
  } else {
    // find the first position which is bigger than the key
    while (1) {
      if (key <= keyList[firstPos]) return firstPos;
      if (key == keyList[lastPos]) return lastPos;
1499

1500 1501 1502 1503 1504 1505 1506
      if (key > keyList[lastPos]) {
        lastPos = lastPos + 1;
        if (lastPos >= num)
          return -1;
        else
          return lastPos;
      }
1507

H
Haojun Liao 已提交
1508 1509
      numOfRows = lastPos - firstPos + 1;
      midPos = (numOfRows >> 1) + firstPos;
1510

1511 1512 1513 1514 1515 1516 1517 1518 1519
      if (key < keyList[midPos]) {
        lastPos = midPos - 1;
      } else if (key > keyList[midPos]) {
        firstPos = midPos + 1;
      } else {
        break;
      }
    }
  }
1520

1521 1522 1523
  return midPos;
}

dengyihao's avatar
dengyihao 已提交
1524 1525
static int32_t doCopyRowsFromFileBlock(STsdbReadHandle* pTsdbReadHandle, int32_t capacity, int32_t numOfRows,
                                       int32_t start, int32_t end) {
1526
  SDataCols* pCols = pTsdbReadHandle->rhelper.pDCols[0];
dengyihao's avatar
dengyihao 已提交
1527
  TSKEY*     tsArray = pCols->cols[0].pData;
H
Haojun Liao 已提交
1528

1529
  int32_t num = end - start + 1;
H
Haojun Liao 已提交
1530 1531 1532 1533 1534 1535
  assert(num >= 0);

  if (num == 0) {
    return numOfRows;
  }

1536 1537
  bool    ascScan = ASCENDING_TRAVERSE(pTsdbReadHandle->order);
  int32_t trueStart = ascScan ? start : end;
1538
  int32_t trueEnd = ascScan ? end : start;
1539 1540
  int32_t step = ascScan ? 1 : -1;

1541
  int32_t requiredNumOfCols = (int32_t)taosArrayGetSize(pTsdbReadHandle->pColumns);
H
Haojun Liao 已提交
1542

dengyihao's avatar
dengyihao 已提交
1543
  // data in buffer has greater timestamp, copy data in file block
1544
  int32_t i = 0, j = 0;
dengyihao's avatar
dengyihao 已提交
1545
  while (i < requiredNumOfCols && j < pCols->numOfCols) {
1546
    SColumnInfoData* pColInfo = taosArrayGet(pTsdbReadHandle->pColumns, i);
1547 1548 1549 1550 1551 1552 1553

    SDataCol* src = &pCols->cols[j];
    if (src->colId < pColInfo->info.colId) {
      j++;
      continue;
    }

L
Liu Jicong 已提交
1554
    if (!isAllRowsNull(src) && pColInfo->info.colId == src->colId) {
1555
      if (!IS_VAR_DATA_TYPE(pColInfo->info.type)) {  // todo opt performance
dengyihao's avatar
dengyihao 已提交
1556
        //        memmove(pData, (char*)src->pData + bytes * start, bytes * num);
1557
        int32_t rowIndex = numOfRows;
1558
        for (int32_t k = trueStart; ((ascScan && k <= trueEnd) || (!ascScan && k >= trueEnd)); k += step, ++rowIndex) {
1559
          SCellVal sVal = {0};
C
Cary Xu 已提交
1560
          if (tdGetColDataOfRow(&sVal, src, k, pCols->bitmapMode) < 0) {
1561 1562
            TASSERT(0);
          }
1563

C
Cary Xu 已提交
1564
          if (sVal.valType == TD_VTYPE_NORM) {
1565
            colDataAppend(pColInfo, rowIndex, sVal.val, false);
C
Cary Xu 已提交
1566 1567
          } else {
            colDataAppendNULL(pColInfo, rowIndex);
1568 1569 1570
          }
        }
      } else {  // handle the var-string
1571 1572
        int32_t rowIndex = numOfRows;

1573
        // todo refactor, only copy one-by-one
1574
        for (int32_t k = trueStart; ((ascScan && k <= trueEnd) || (!ascScan && k >= trueEnd)); k += step, ++rowIndex) {
1575
          SCellVal sVal = {0};
dengyihao's avatar
dengyihao 已提交
1576
          if (tdGetColDataOfRow(&sVal, src, k, pCols->bitmapMode) < 0) {
H
Haojun Liao 已提交
1577 1578
            TASSERT(0);
          }
1579

C
Cary Xu 已提交
1580
          if (sVal.valType == TD_VTYPE_NORM) {
1581
            colDataAppend(pColInfo, rowIndex, sVal.val, false);
C
Cary Xu 已提交
1582 1583
          } else {
            colDataAppendNULL(pColInfo, rowIndex);
1584
          }
1585 1586
        }
      }
1587 1588 1589

      j++;
      i++;
H
Hongze Cheng 已提交
1590
    } else {  // pColInfo->info.colId < src->colId, it is a NULL data
1591
      colDataAppendNNULL(pColInfo, numOfRows, num);
1592
      i++;
1593 1594
    }
  }
1595

dengyihao's avatar
dengyihao 已提交
1596
  while (i < requiredNumOfCols) {  // the remain columns are all null data
1597
    SColumnInfoData* pColInfo = taosArrayGet(pTsdbReadHandle->pColumns, i);
1598
    colDataAppendNNULL(pColInfo, numOfRows, num);
1599
    i++;
1600
  }
H
Haojun Liao 已提交
1601

1602 1603
  pTsdbReadHandle->cur.win.ekey = tsArray[trueEnd];
  pTsdbReadHandle->cur.lastKey = tsArray[trueEnd] + step;
1604

1605
  return numOfRows + num;
1606 1607
}

C
Cary Xu 已提交
1608 1609 1610 1611 1612 1613 1614 1615 1616 1617 1618 1619 1620 1621 1622 1623 1624 1625 1626 1627
/**
 * @brief  // TODO fix bug for reverse copy data problem
 *        Note: row1 always has high priority
 *
 * @param pTsdbReadHandle
 * @param capacity
 * @param curRow
 * @param row1
 * @param row2
 * @param numOfCols
 * @param uid
 * @param pSchema1
 * @param pSchema2
 * @param update
 * @param lastRowKey
 * @return int32_t The quantity of rows appended
 */
static int32_t mergeTwoRowFromMem(STsdbReadHandle* pTsdbReadHandle, int32_t capacity, int32_t* curRow, STSRow* row1,
                                  STSRow* row2, int32_t numOfCols, uint64_t uid, STSchema* pSchema1, STSchema* pSchema2,
                                  bool update, TSKEY* lastRowKey) {
H
Haojun Liao 已提交
1628
#if 1
dengyihao's avatar
dengyihao 已提交
1629 1630 1631 1632 1633 1634 1635 1636 1637
  STSchema* pSchema;
  STSRow*   row;
  int16_t   colId;
  int16_t   offset;

  bool     isRow1DataRow = TD_IS_TP_ROW(row1);
  bool     isRow2DataRow;
  bool     isChosenRowDataRow;
  int32_t  chosen_itr;
H
Haojun Liao 已提交
1638
  SCellVal sVal = {0};
C
Cary Xu 已提交
1639 1640
  TSKEY    rowKey = TSKEY_INITIAL_VAL;
  int32_t  nResult = 0;
C
Cary Xu 已提交
1641
  int32_t  mergeOption = 0;  // 0 discard 1 overwrite 2 merge
1642

H
Haojun Liao 已提交
1643
  // the schema version info is embeded in STSRow
1644 1645 1646
  int32_t numOfColsOfRow1 = 0;

  if (pSchema1 == NULL) {
1647
    pSchema1 = metaGetTbTSchema(REPO_META(pTsdbReadHandle->pTsdb), uid, TD_ROW_SVER(row1));
1648
  }
1649

C
Cary Xu 已提交
1650
#ifdef TD_DEBUG_PRINT_ROW
C
Cary Xu 已提交
1651 1652 1653 1654 1655
  char   flags[70] = {0};
  STsdb* pTsdb = pTsdbReadHandle->rhelper.pRepo;
  snprintf(flags, 70, "%s:%d vgId:%d dir:%s row1%s=NULL,row2%s=NULL", __func__, __LINE__, TD_VID(pTsdb->pVnode),
           pTsdb->dir, row1 ? "!" : "", row2 ? "!" : "");
  tdSRowPrint(row1, pSchema1, flags);
C
Cary Xu 已提交
1656 1657
#endif

dengyihao's avatar
dengyihao 已提交
1658
  if (isRow1DataRow) {
1659
    numOfColsOfRow1 = schemaNCols(pSchema1);
H
Haojun Liao 已提交
1660
  } else {
H
Haojun Liao 已提交
1661
    numOfColsOfRow1 = tdRowGetNCols(row1);
D
fix bug  
dapan1121 已提交
1662
  }
1663

1664
  int32_t numOfColsOfRow2 = 0;
dengyihao's avatar
dengyihao 已提交
1665
  if (row2) {
H
Haojun Liao 已提交
1666
    isRow2DataRow = TD_IS_TP_ROW(row2);
1667
    if (pSchema2 == NULL) {
1668
      pSchema2 = metaGetTbTSchema(REPO_META(pTsdbReadHandle->pTsdb), uid, TD_ROW_SVER(row2));
1669
    }
dengyihao's avatar
dengyihao 已提交
1670
    if (isRow2DataRow) {
1671 1672
      numOfColsOfRow2 = schemaNCols(pSchema2);
    } else {
H
Haojun Liao 已提交
1673
      numOfColsOfRow2 = tdRowGetNCols(row2);
1674 1675
    }
  }
C
Cary Xu 已提交
1676

1677
  int32_t i = 0, j = 0, k = 0;
dengyihao's avatar
dengyihao 已提交
1678
  while (i < numOfCols && (j < numOfColsOfRow1 || k < numOfColsOfRow2)) {
1679
    SColumnInfoData* pColInfo = taosArrayGet(pTsdbReadHandle->pColumns, i);
1680 1681

    int32_t colIdOfRow1;
dengyihao's avatar
dengyihao 已提交
1682
    if (j >= numOfColsOfRow1) {
1683
      colIdOfRow1 = INT32_MAX;
dengyihao's avatar
dengyihao 已提交
1684
    } else if (isRow1DataRow) {
1685 1686
      colIdOfRow1 = pSchema1->columns[j].colId;
    } else {
C
Cary Xu 已提交
1687
      colIdOfRow1 = tdKvRowColIdAt(row1, j);
1688 1689 1690
    }

    int32_t colIdOfRow2;
dengyihao's avatar
dengyihao 已提交
1691
    if (k >= numOfColsOfRow2) {
1692
      colIdOfRow2 = INT32_MAX;
dengyihao's avatar
dengyihao 已提交
1693
    } else if (isRow2DataRow) {
1694 1695
      colIdOfRow2 = pSchema2->columns[k].colId;
    } else {
C
Cary Xu 已提交
1696
      colIdOfRow2 = tdKvRowColIdAt(row2, k);
1697 1698
    }

C
Cary Xu 已提交
1699
    if (colIdOfRow1 < colIdOfRow2) {  // the most probability
dengyihao's avatar
dengyihao 已提交
1700
      if (colIdOfRow1 < pColInfo->info.colId) {
C
Cary Xu 已提交
1701
        ++j;
C
Cary Xu 已提交
1702 1703
        continue;
      }
1704 1705 1706 1707
      row = row1;
      pSchema = pSchema1;
      isChosenRowDataRow = isRow1DataRow;
      chosen_itr = j;
C
Cary Xu 已提交
1708
    } else if (colIdOfRow1 == colIdOfRow2) {
dengyihao's avatar
dengyihao 已提交
1709
      if (colIdOfRow1 < pColInfo->info.colId) {
C
Cary Xu 已提交
1710 1711
        ++j;
        ++k;
1712
        continue;
C
Cary Xu 已提交
1713
      }
1714 1715 1716 1717 1718
      row = row1;
      pSchema = pSchema1;
      isChosenRowDataRow = isRow1DataRow;
      chosen_itr = j;
    } else {
dengyihao's avatar
dengyihao 已提交
1719
      if (colIdOfRow2 < pColInfo->info.colId) {
C
Cary Xu 已提交
1720
        ++k;
1721 1722 1723 1724 1725 1726 1727
        continue;
      }
      row = row2;
      pSchema = pSchema2;
      chosen_itr = k;
      isChosenRowDataRow = isRow2DataRow;
    }
C
Cary Xu 已提交
1728

dengyihao's avatar
dengyihao 已提交
1729
    if (isChosenRowDataRow) {
1730 1731
      colId = pSchema->columns[chosen_itr].colId;
      offset = pSchema->columns[chosen_itr].offset;
C
Cary Xu 已提交
1732 1733
      // TODO: use STSRowIter
      tdSTpRowGetVal(row, colId, pSchema->columns[chosen_itr].type, pSchema->flen, offset, chosen_itr - 1, &sVal);
C
Cary Xu 已提交
1734 1735 1736
      if (colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
        rowKey = *(TSKEY*)sVal.val;
        if (rowKey != *lastRowKey) {
C
Cary Xu 已提交
1737
          mergeOption = 1;
C
Cary Xu 已提交
1738 1739 1740
          if (*lastRowKey != TSKEY_INITIAL_VAL) {
            ++(*curRow);
          }
1741
          *lastRowKey = rowKey;
C
Cary Xu 已提交
1742
          ++nResult;
C
Cary Xu 已提交
1743
        } else if (update) {
C
Cary Xu 已提交
1744 1745 1746 1747
          mergeOption = 2;
        } else {
          mergeOption = 0;
          break;
C
Cary Xu 已提交
1748 1749
        }
      }
1750
    } else {
C
Cary Xu 已提交
1751 1752 1753 1754
      // TODO: use STSRowIter
      if (chosen_itr == 0) {
        colId = PRIMARYKEY_TIMESTAMP_COL_ID;
        tdSKvRowGetVal(row, PRIMARYKEY_TIMESTAMP_COL_ID, -1, -1, &sVal);
C
Cary Xu 已提交
1755 1756
        rowKey = *(TSKEY*)sVal.val;
        if (rowKey != *lastRowKey) {
C
Cary Xu 已提交
1757
          mergeOption = 1;
C
Cary Xu 已提交
1758 1759 1760
          if (*lastRowKey != TSKEY_INITIAL_VAL) {
            ++(*curRow);
          }
1761
          *lastRowKey = rowKey;
C
Cary Xu 已提交
1762
          ++nResult;
C
Cary Xu 已提交
1763
        } else if (update) {
C
Cary Xu 已提交
1764 1765 1766 1767
          mergeOption = 2;
        } else {
          mergeOption = 0;
          break;
C
Cary Xu 已提交
1768
        }
C
Cary Xu 已提交
1769 1770 1771 1772 1773 1774
      } else {
        SKvRowIdx* pColIdx = tdKvRowColIdxAt(row, chosen_itr - 1);
        colId = pColIdx->colId;
        offset = pColIdx->offset;
        tdSKvRowGetVal(row, colId, offset, chosen_itr - 1, &sVal);
      }
1775 1776
    }

C
Cary Xu 已提交
1777 1778
    ASSERT(rowKey != TSKEY_INITIAL_VAL);

1779
    if (colId == pColInfo->info.colId) {
H
Haojun Liao 已提交
1780
      if (tdValTypeIsNorm(sVal.valType)) {
C
Cary Xu 已提交
1781 1782 1783 1784 1785
        colDataAppend(pColInfo, *curRow, sVal.val, false);
      } else if (tdValTypeIsNull(sVal.valType)) {
        colDataAppend(pColInfo, *curRow, NULL, true);
      } else if (tdValTypeIsNone(sVal.valType)) {
        // TODO: Set null if nothing append for this row
C
Cary Xu 已提交
1786
        if (mergeOption == 1) {
C
Cary Xu 已提交
1787 1788 1789 1790
          colDataAppend(pColInfo, *curRow, NULL, true);
        }
      } else {
        ASSERT(0);
1791
      }
H
Haojun Liao 已提交
1792

C
Cary Xu 已提交
1793
      ++i;
C
Cary Xu 已提交
1794

dengyihao's avatar
dengyihao 已提交
1795
      if (row == row1) {
C
Cary Xu 已提交
1796
        ++j;
1797
      } else {
C
Cary Xu 已提交
1798
        ++k;
1799 1800
      }
    } else {
C
Cary Xu 已提交
1801
      if (mergeOption == 1) {
C
Cary Xu 已提交
1802
        colDataAppend(pColInfo, *curRow, NULL, true);
C
Cary Xu 已提交
1803
      }
C
Cary Xu 已提交
1804
      ++i;
1805
    }
1806
  }
1807

C
Cary Xu 已提交
1808
  if (mergeOption == 1) {
dengyihao's avatar
dengyihao 已提交
1809
    while (i < numOfCols) {  // the remain columns are all null data
1810
      SColumnInfoData* pColInfo = taosArrayGet(pTsdbReadHandle->pColumns, i);
C
Cary Xu 已提交
1811 1812
      colDataAppend(pColInfo, *curRow, NULL, true);
      ++i;
1813 1814
    }
  }
C
Cary Xu 已提交
1815 1816

  return nResult;
H
Haojun Liao 已提交
1817
#endif
1818
}
1819

dengyihao's avatar
dengyihao 已提交
1820 1821
static void getQualifiedRowsPos(STsdbReadHandle* pTsdbReadHandle, int32_t startPos, int32_t endPos,
                                int32_t numOfExisted, int32_t* start, int32_t* end) {
1822 1823
  *start = -1;

1824
  if (ASCENDING_TRAVERSE(pTsdbReadHandle->order)) {
1825
    int32_t remain = endPos - startPos + 1;
1826 1827
    if (remain + numOfExisted > pTsdbReadHandle->outputCapacity) {
      *end = (pTsdbReadHandle->outputCapacity - numOfExisted) + startPos - 1;
H
Haojun Liao 已提交
1828 1829
    } else {
      *end = endPos;
1830 1831 1832 1833 1834
    }

    *start = startPos;
  } else {
    int32_t remain = (startPos - endPos) + 1;
1835 1836
    if (remain + numOfExisted > pTsdbReadHandle->outputCapacity) {
      *end = startPos + 1 - (pTsdbReadHandle->outputCapacity - numOfExisted);
H
Haojun Liao 已提交
1837 1838
    } else {
      *end = endPos;
1839 1840 1841 1842 1843 1844 1845
    }

    *start = *end;
    *end = startPos;
  }
}

dengyihao's avatar
dengyihao 已提交
1846 1847
static void updateInfoAfterMerge(STsdbReadHandle* pTsdbReadHandle, STableCheckInfo* pCheckInfo, int32_t numOfRows,
                                 int32_t endPos) {
1848
  SQueryFilePos* cur = &pTsdbReadHandle->cur;
1849 1850

  pCheckInfo->lastKey = cur->lastKey;
1851
  pTsdbReadHandle->realNumOfRows = numOfRows;
1852 1853 1854 1855
  cur->rows = numOfRows;
  cur->pos = endPos;
}

1856 1857
static void doCheckGeneratedBlockRange(STsdbReadHandle* pTsdbReadHandle) {
  SQueryFilePos* cur = &pTsdbReadHandle->cur;
H
Haojun Liao 已提交
1858 1859

  if (cur->rows > 0) {
1860 1861
    if (ASCENDING_TRAVERSE(pTsdbReadHandle->order)) {
      assert(cur->win.skey >= pTsdbReadHandle->window.skey && cur->win.ekey <= pTsdbReadHandle->window.ekey);
H
Haojun Liao 已提交
1862
    } else {
1863
      assert(cur->win.skey >= pTsdbReadHandle->window.ekey && cur->win.ekey <= pTsdbReadHandle->window.skey);
H
Haojun Liao 已提交
1864 1865
    }

1866
    SColumnInfoData* pColInfoData = taosArrayGet(pTsdbReadHandle->pColumns, 0);
dengyihao's avatar
dengyihao 已提交
1867 1868
    assert(cur->win.skey == ((TSKEY*)pColInfoData->pData)[0] &&
           cur->win.ekey == ((TSKEY*)pColInfoData->pData)[cur->rows - 1]);
H
Haojun Liao 已提交
1869
  } else {
1870
    cur->win = pTsdbReadHandle->window;
H
Haojun Liao 已提交
1871

dengyihao's avatar
dengyihao 已提交
1872
    int32_t step = ASCENDING_TRAVERSE(pTsdbReadHandle->order) ? 1 : -1;
1873
    cur->lastKey = pTsdbReadHandle->window.ekey + step;
H
Haojun Liao 已提交
1874 1875 1876
  }
}

dengyihao's avatar
dengyihao 已提交
1877 1878
static void copyAllRemainRowsFromFileBlock(STsdbReadHandle* pTsdbReadHandle, STableCheckInfo* pCheckInfo,
                                           SDataBlockInfo* pBlockInfo, int32_t endPos) {
1879
  SQueryFilePos* cur = &pTsdbReadHandle->cur;
H
Haojun Liao 已提交
1880

1881
  SDataCols* pCols = pTsdbReadHandle->rhelper.pDCols[0];
dengyihao's avatar
dengyihao 已提交
1882
  TSKEY*     tsArray = pCols->cols[0].pData;
H
Haojun Liao 已提交
1883

1884
  bool ascScan = ASCENDING_TRAVERSE(pTsdbReadHandle->order);
H
Haojun Liao 已提交
1885

dengyihao's avatar
dengyihao 已提交
1886
  int32_t step = ascScan ? 1 : -1;
H
Haojun Liao 已提交
1887 1888 1889 1890

  int32_t start = cur->pos;
  int32_t end = endPos;

1891
  if (!ascScan) {
wafwerar's avatar
wafwerar 已提交
1892
    TSWAP(start, end);
H
Haojun Liao 已提交
1893 1894
  }

1895 1896
  assert(pTsdbReadHandle->outputCapacity >= (end - start + 1));
  int32_t numOfRows = doCopyRowsFromFileBlock(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, 0, start, end);
H
Haojun Liao 已提交
1897 1898

  // the time window should always be ascending order: skey <= ekey
dengyihao's avatar
dengyihao 已提交
1899
  cur->win = (STimeWindow){.skey = tsArray[start], .ekey = tsArray[end]};
H
Haojun Liao 已提交
1900
  cur->mixBlock = (numOfRows != pBlockInfo->rows);
dengyihao's avatar
dengyihao 已提交
1901 1902
  cur->lastKey = tsArray[endPos] + step;
  cur->blockCompleted = (ascScan ? (endPos == pBlockInfo->rows - 1) : (endPos == 0));
H
Haojun Liao 已提交
1903

H
Haojun Liao 已提交
1904
  // The value of pos may be -1 or pBlockInfo->rows, and it is invalid in both cases.
1905
  int32_t pos = endPos + step;
1906 1907
  updateInfoAfterMerge(pTsdbReadHandle, pCheckInfo, numOfRows, pos);
  doCheckGeneratedBlockRange(pTsdbReadHandle);
H
Haojun Liao 已提交
1908

dengyihao's avatar
dengyihao 已提交
1909 1910 1911
  tsdbDebug("%p uid:%" PRIu64 ", data block created, mixblock:%d, brange:%" PRIu64 "-%" PRIu64 " rows:%d, %s",
            pTsdbReadHandle, pCheckInfo->tableId, cur->mixBlock, cur->win.skey, cur->win.ekey, cur->rows,
            pTsdbReadHandle->idStr);
H
Haojun Liao 已提交
1912 1913
}

1914
int32_t getEndPosInDataBlock(STsdbReadHandle* pTsdbReadHandle, SDataBlockInfo* pBlockInfo) {
H
Haojun Liao 已提交
1915 1916
  // NOTE: reverse the order to find the end position in data block
  int32_t endPos = -1;
1917
  bool    ascScan = ASCENDING_TRAVERSE(pTsdbReadHandle->order);
dengyihao's avatar
dengyihao 已提交
1918
  int32_t order = ascScan ? TSDB_ORDER_DESC : TSDB_ORDER_ASC;
H
Haojun Liao 已提交
1919

1920
  SQueryFilePos* cur = &pTsdbReadHandle->cur;
dengyihao's avatar
dengyihao 已提交
1921
  SDataCols*     pCols = pTsdbReadHandle->rhelper.pDCols[0];
H
Haojun Liao 已提交
1922

1923 1924 1925 1926 1927 1928 1929 1930 1931 1932 1933 1934
  if (pTsdbReadHandle->outputCapacity >= pBlockInfo->rows) {
    if (ascScan && pTsdbReadHandle->window.ekey >= pBlockInfo->window.ekey) {
      endPos = pBlockInfo->rows - 1;
      cur->mixBlock = (cur->pos != 0);
    } else if ((!ascScan) && pTsdbReadHandle->window.ekey <= pBlockInfo->window.skey) {
      endPos = 0;
      cur->mixBlock = (cur->pos != pBlockInfo->rows - 1);
    } else {
      assert(pCols->numOfRows > 0);
      endPos = doBinarySearchKey(pCols->cols[0].pData, pCols->numOfRows, pTsdbReadHandle->window.ekey, order);
      cur->mixBlock = true;
    }
H
Haojun Liao 已提交
1935
  } else {
1936
    if (ascScan && pTsdbReadHandle->window.ekey >= pBlockInfo->window.ekey) {
1937
      endPos = TMIN(cur->pos + pTsdbReadHandle->outputCapacity - 1, pBlockInfo->rows - 1);
1938
    } else if ((!ascScan) && pTsdbReadHandle->window.ekey <= pBlockInfo->window.skey) {
1939
      endPos = TMAX(cur->pos - pTsdbReadHandle->outputCapacity + 1, 0);
1940 1941 1942 1943 1944 1945 1946 1947 1948 1949 1950 1951 1952 1953 1954
    } else {
      ASSERT(pCols->numOfRows > 0);
      endPos = doBinarySearchKey(pCols->cols[0].pData, pCols->numOfRows, pTsdbReadHandle->window.ekey, order);

      // current data is more than the capacity
      int32_t size = abs(cur->pos - endPos) + 1;
      if (size > pTsdbReadHandle->outputCapacity) {
        int32_t delta = size - pTsdbReadHandle->outputCapacity;
        if (ascScan) {
          endPos -= delta;
        } else {
          endPos += delta;
        }
      }
    }
H
Haojun Liao 已提交
1955 1956 1957 1958 1959 1960
    cur->mixBlock = true;
  }

  return endPos;
}

H
[td-32]  
hjxilinx 已提交
1961 1962
// only return the qualified data to client in terms of query time window, data rows in the same block but do not
// be included in the query time window will be discarded
1963 1964
static void doMergeTwoLevelData(STsdbReadHandle* pTsdbReadHandle, STableCheckInfo* pCheckInfo, SBlock* pBlock) {
  SQueryFilePos* cur = &pTsdbReadHandle->cur;
1965
  SDataBlockInfo blockInfo = GET_FILE_DATA_BLOCK_INFO(pCheckInfo, pBlock);
H
Hongze Cheng 已提交
1966
  STsdbCfg*      pCfg = REPO_CFG(pTsdbReadHandle->pTsdb);
H
Haojun Liao 已提交
1967

1968
  initTableMemIterator(pTsdbReadHandle, pCheckInfo);
1969

1970 1971
  SDataCols* pCols = pTsdbReadHandle->rhelper.pDCols[0];
  assert(pCols->cols[0].type == TSDB_DATA_TYPE_TIMESTAMP && pCols->cols[0].colId == PRIMARYKEY_TIMESTAMP_COL_ID &&
dengyihao's avatar
dengyihao 已提交
1972
         cur->pos >= 0 && cur->pos < pBlock->numOfRows);
1973
  // Even Multi-Version supported, the records with duplicated TSKEY would be merged inside of tsdbLoadData interface.
1974
  TSKEY* tsArray = pCols->cols[0].pData;
H
Hongze Cheng 已提交
1975 1976
  assert(pCols->numOfRows == pBlock->numOfRows && tsArray[0] == pBlock->minKey.ts &&
         tsArray[pBlock->numOfRows - 1] == pBlock->maxKey.ts);
1977

dengyihao's avatar
dengyihao 已提交
1978
  bool    ascScan = ASCENDING_TRAVERSE(pTsdbReadHandle->order);
1979 1980
  int32_t step = ascScan ? 1 : -1;

1981
  // for search the endPos, so the order needs to reverse
1982
  int32_t order = ascScan ? TSDB_ORDER_DESC : TSDB_ORDER_ASC;
1983

1984 1985
  int32_t numOfCols = (int32_t)(QH_GET_NUM_OF_COLS(pTsdbReadHandle));
  int32_t endPos = getEndPosInDataBlock(pTsdbReadHandle, &blockInfo);
H
Haojun Liao 已提交
1986

1987
  STimeWindow* pWin = &blockInfo.window;
H
Hongze Cheng 已提交
1988
  tsdbDebug("%p uid:%" PRIu64 " start merge data block, file block range:%" PRIu64 "-%" PRIu64
dengyihao's avatar
dengyihao 已提交
1989 1990 1991
            " rows:%d, start:%d, end:%d, %s",
            pTsdbReadHandle, pCheckInfo->tableId, pWin->skey, pWin->ekey, blockInfo.rows, cur->pos, endPos,
            pTsdbReadHandle->idStr);
H
Haojun Liao 已提交
1992

1993 1994
  // compared with the data from in-memory buffer, to generate the correct timestamp array list
  int32_t numOfRows = 0;
C
Cary Xu 已提交
1995
  int32_t curRow = 0;
H
Haojun Liao 已提交
1996

dengyihao's avatar
dengyihao 已提交
1997 1998
  int16_t   rv1 = -1;
  int16_t   rv2 = -1;
1999 2000
  STSchema* pSchema1 = NULL;
  STSchema* pSchema2 = NULL;
D
fix bug  
dapan1121 已提交
2001

H
Haojun Liao 已提交
2002 2003
  int32_t pos = cur->pos;
  cur->win = TSWINDOW_INITIALIZER;
2004
  bool adjustPos = false;
2005

2006 2007
  // no data in buffer, load data from file directly
  if (pCheckInfo->iiter == NULL && pCheckInfo->iter == NULL) {
2008
    copyAllRemainRowsFromFileBlock(pTsdbReadHandle, pCheckInfo, &blockInfo, endPos);
2009
    return;
2010
  } else if (pCheckInfo->iter != NULL || pCheckInfo->iiter != NULL) {
2011
    SSkipListNode* node = NULL;
C
Cary Xu 已提交
2012
    TSKEY          lastKeyAppend = TSKEY_INITIAL_VAL;
C
Cary Xu 已提交
2013

2014
    do {
H
Haojun Liao 已提交
2015
      STSRow* row2 = NULL;
C
Cary Xu 已提交
2016
      STSRow* row1 = getSRowInTableMem(pCheckInfo, pTsdbReadHandle->order, pCfg->update, &row2, TD_VER_MAX);
2017
      if (row1 == NULL) {
H
[td-32]  
hjxilinx 已提交
2018
        break;
2019
      }
2020

H
Haojun Liao 已提交
2021
      TSKEY key = TD_ROW_KEY(row1);
2022
      if ((key > pTsdbReadHandle->window.ekey && ascScan) || (key < pTsdbReadHandle->window.ekey && !ascScan)) {
2023 2024 2025
        break;
      }

2026 2027 2028 2029 2030 2031 2032
      if (adjustPos) {
        if (key == lastKeyAppend) {
          pos -= step;
        }
        adjustPos = false;
      }

2033 2034
      if (((pos > endPos || tsArray[pos] > pTsdbReadHandle->window.ekey) && ascScan) ||
          ((pos < endPos || tsArray[pos] < pTsdbReadHandle->window.ekey) && !ascScan)) {
2035 2036 2037
        break;
      }

2038
      if ((key < tsArray[pos] && ascScan) || (key > tsArray[pos] && !ascScan)) {
H
Haojun Liao 已提交
2039
        if (rv1 != TD_ROW_SVER(row1)) {
dengyihao's avatar
dengyihao 已提交
2040
          //          pSchema1 = tsdbGetTableSchemaByVersion(pTable, memRowVersion(row1));
H
Haojun Liao 已提交
2041
          rv1 = TD_ROW_SVER(row1);
C
Cary Xu 已提交
2042
        }
dengyihao's avatar
dengyihao 已提交
2043 2044
        if (row2 && rv2 != TD_ROW_SVER(row2)) {
          //          pSchema2 = tsdbGetTableSchemaByVersion(pTable, memRowVersion(row2));
H
Haojun Liao 已提交
2045
          rv2 = TD_ROW_SVER(row2);
2046
        }
dengyihao's avatar
dengyihao 已提交
2047

C
Cary Xu 已提交
2048 2049 2050
        numOfRows +=
            mergeTwoRowFromMem(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, &curRow, row1, row2, numOfCols,
                               pCheckInfo->tableId, pSchema1, pSchema2, pCfg->update, &lastKeyAppend);
2051 2052 2053
        if (cur->win.skey == TSKEY_INITIAL_VAL) {
          cur->win.skey = key;
        }
2054

2055
        cur->win.ekey = key;
dengyihao's avatar
dengyihao 已提交
2056
        cur->lastKey = key + step;
2057
        cur->mixBlock = true;
2058
        moveToNextRowInMem(pCheckInfo);
2059
      } else if (key == tsArray[pos]) {  // data in buffer has the same timestamp of data in file block, ignore it
C
Cary Xu 已提交
2060
#if 0
H
TD-1439  
Hongze Cheng 已提交
2061
        if (pCfg->update) {
dengyihao's avatar
dengyihao 已提交
2062
          if (pCfg->update == TD_ROW_PARTIAL_UPDATE) {
2063
            doCopyRowsFromFileBlock(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, numOfRows, pos, pos);
D
fix bug  
dapan1121 已提交
2064
          }
H
Haojun Liao 已提交
2065
          if (rv1 != TD_ROW_SVER(row1)) {
dengyihao's avatar
dengyihao 已提交
2066
            //            pSchema1 = tsdbGetTableSchemaByVersion(pTable, memRowVersion(row1));
H
Haojun Liao 已提交
2067
            rv1 = TD_ROW_SVER(row1);
2068
          }
dengyihao's avatar
dengyihao 已提交
2069 2070
          if (row2 && rv2 != TD_ROW_SVER(row2)) {
            //            pSchema2 = tsdbGetTableSchemaByVersion(pTable, memRowVersion(row2));
H
Haojun Liao 已提交
2071
            rv2 = TD_ROW_SVER(row2);
2072
          }
dengyihao's avatar
dengyihao 已提交
2073

2074
          bool forceSetNull = pCfg->update != TD_ROW_PARTIAL_UPDATE;
dengyihao's avatar
dengyihao 已提交
2075
          mergeTwoRowFromMem(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, numOfRows, row1, row2, numOfCols,
C
Cary Xu 已提交
2076
                             pCheckInfo->tableId, pSchema1, pSchema2, forceSetNull, &lastRowKey);
H
TD-1439  
Hongze Cheng 已提交
2077 2078 2079 2080 2081 2082 2083 2084 2085
          numOfRows += 1;
          if (cur->win.skey == TSKEY_INITIAL_VAL) {
            cur->win.skey = key;
          }

          cur->win.ekey = key;
          cur->lastKey = key + step;
          cur->mixBlock = true;

C
Cary Xu 已提交
2086 2087 2088 2089 2090 2091 2092
          moveToNextRowInMem(pCheckInfo);
          pos += step;
        } else {
          moveToNextRowInMem(pCheckInfo);
        }
#endif
        if (TD_SUPPORT_UPDATE(pCfg->update)) {
2093 2094 2095 2096
          if (lastKeyAppend != key) {
            if (lastKeyAppend != TSKEY_INITIAL_VAL) {
              ++curRow;
            }
2097
            lastKeyAppend = key;
2098 2099
          }
          // load data from file firstly
C
Cary Xu 已提交
2100
          numOfRows = doCopyRowsFromFileBlock(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, curRow, pos, pos);
C
Cary Xu 已提交
2101 2102 2103 2104 2105 2106 2107

          if (rv1 != TD_ROW_SVER(row1)) {
            rv1 = TD_ROW_SVER(row1);
          }
          if (row2 && rv2 != TD_ROW_SVER(row2)) {
            rv2 = TD_ROW_SVER(row2);
          }
2108 2109

          // still assign data into current row
dengyihao's avatar
dengyihao 已提交
2110 2111 2112
          numOfRows +=
              mergeTwoRowFromMem(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, &curRow, row1, row2, numOfCols,
                                 pCheckInfo->tableId, pSchema1, pSchema2, pCfg->update, &lastKeyAppend);
C
Cary Xu 已提交
2113 2114 2115 2116 2117 2118 2119 2120 2121

          if (cur->win.skey == TSKEY_INITIAL_VAL) {
            cur->win.skey = key;
          }

          cur->win.ekey = key;
          cur->lastKey = key + step;
          cur->mixBlock = true;

H
TD-1439  
Hongze Cheng 已提交
2122
          moveToNextRowInMem(pCheckInfo);
2123

H
TD-1439  
Hongze Cheng 已提交
2124
          pos += step;
2125
          adjustPos = true;
H
TD-1439  
Hongze Cheng 已提交
2126
        } else {
2127
          // discard the memory record
H
TD-1439  
Hongze Cheng 已提交
2128 2129
          moveToNextRowInMem(pCheckInfo);
        }
2130
      } else if ((key > tsArray[pos] && ascScan) || (key < tsArray[pos] && !ascScan)) {
2131 2132 2133
        if (cur->win.skey == TSKEY_INITIAL_VAL) {
          cur->win.skey = tsArray[pos];
        }
2134

2135
        int32_t end = doBinarySearchKey(pCols->cols[0].pData, pCols->numOfRows, key, order);
2136 2137
        assert(end != -1);

dengyihao's avatar
dengyihao 已提交
2138
        if (tsArray[end] == key) {  // the value of key in cache equals to the end timestamp value, ignore it
C
Cary Xu 已提交
2139
#if 0
2140
          if (pCfg->update == TD_ROW_DISCARD_UPDATE) {
H
Hongze Cheng 已提交
2141 2142 2143 2144
            moveToNextRowInMem(pCheckInfo);
          } else {
            end -= step;
          }
C
Cary Xu 已提交
2145 2146 2147 2148 2149 2150
#endif
          if (!TD_SUPPORT_UPDATE(pCfg->update)) {
            moveToNextRowInMem(pCheckInfo);
          } else {
            end -= step;
          }
H
Haojun Liao 已提交
2151
        }
2152

2153
        int32_t qstart = 0, qend = 0;
2154
        getQualifiedRowsPos(pTsdbReadHandle, pos, end, numOfRows, &qstart, &qend);
2155

2156
        if ((lastKeyAppend != TSKEY_INITIAL_VAL) && (lastKeyAppend != (ascScan ? tsArray[qstart] : tsArray[qend]))) {
C
Cary Xu 已提交
2157 2158
          ++curRow;
        }
2159

C
Cary Xu 已提交
2160
        numOfRows = doCopyRowsFromFileBlock(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, curRow, qstart, qend);
2161
        pos += (qend - qstart + 1) * step;
C
Cary Xu 已提交
2162 2163 2164
        if (numOfRows > 0) {
          curRow = numOfRows - 1;
        }
2165

2166
        cur->win.ekey = ascScan ? tsArray[qend] : tsArray[qstart];
dengyihao's avatar
dengyihao 已提交
2167
        cur->lastKey = cur->win.ekey + step;
C
Cary Xu 已提交
2168
        lastKeyAppend = cur->win.ekey;
2169
      }
2170
    } while (numOfRows < pTsdbReadHandle->outputCapacity);
H
Haojun Liao 已提交
2171

2172
    if (numOfRows < pTsdbReadHandle->outputCapacity) {
H
Haojun Liao 已提交
2173 2174 2175 2176
      /**
       * if cache is empty, load remain file block data. In contrast, if there are remain data in cache, do NOT
       * copy them all to result buffer, since it may be overlapped with file data block.
       */
dengyihao's avatar
dengyihao 已提交
2177
      if (node == NULL || ((TD_ROW_KEY((STSRow*)SL_GET_NODE_DATA(node)) > pTsdbReadHandle->window.ekey) && ascScan) ||
2178
          ((TD_ROW_KEY((STSRow*)SL_GET_NODE_DATA(node)) < pTsdbReadHandle->window.ekey) && !ascScan)) {
2179 2180 2181 2182 2183
        // no data in cache or data in cache is greater than the ekey of time window, load data from file block
        if (cur->win.skey == TSKEY_INITIAL_VAL) {
          cur->win.skey = tsArray[pos];
        }

2184
        int32_t start = -1, end = -1;
2185
        getQualifiedRowsPos(pTsdbReadHandle, pos, endPos, numOfRows, &start, &end);
2186

2187
        numOfRows = doCopyRowsFromFileBlock(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, numOfRows, start, end);
2188
        pos += (end - start + 1) * step;
2189

2190
        cur->win.ekey = ascScan ? tsArray[end] : tsArray[start];
dengyihao's avatar
dengyihao 已提交
2191
        cur->lastKey = cur->win.ekey + step;
H
Haojun Liao 已提交
2192
        cur->mixBlock = true;
2193
      }
2194 2195
    }
  }
H
Haojun Liao 已提交
2196

2197
  cur->blockCompleted = (((pos > endPos || cur->lastKey > pTsdbReadHandle->window.ekey) && ascScan) ||
dengyihao's avatar
dengyihao 已提交
2198
                         ((pos < endPos || cur->lastKey < pTsdbReadHandle->window.ekey) && !ascScan));
2199

2200
  if (!ascScan) {
wafwerar's avatar
wafwerar 已提交
2201
    TSWAP(cur->win.skey, cur->win.ekey);
2202
  }
2203

2204 2205
  updateInfoAfterMerge(pTsdbReadHandle, pCheckInfo, numOfRows, pos);
  doCheckGeneratedBlockRange(pTsdbReadHandle);
H
Haojun Liao 已提交
2206

dengyihao's avatar
dengyihao 已提交
2207 2208 2209
  tsdbDebug("%p uid:%" PRIu64 ", data block created, mixblock:%d, brange:%" PRIu64 "-%" PRIu64 " rows:%d, %s",
            pTsdbReadHandle, pCheckInfo->tableId, cur->mixBlock, cur->win.skey, cur->win.ekey, cur->rows,
            pTsdbReadHandle->idStr);
2210 2211
}

2212
int32_t binarySearchForKey(char* pValue, int num, TSKEY key, int order) {
H
[td-32]  
hjxilinx 已提交
2213
  int    firstPos, lastPos, midPos = -1;
H
Haojun Liao 已提交
2214
  int    numOfRows;
2215 2216
  TSKEY* keyList;

H
[td-32]  
hjxilinx 已提交
2217
  if (num <= 0) return -1;
2218 2219

  keyList = (TSKEY*)pValue;
H
[td-32]  
hjxilinx 已提交
2220 2221
  firstPos = 0;
  lastPos = num - 1;
2222

2223
  if (order == TSDB_ORDER_DESC) {
H
[td-32]  
hjxilinx 已提交
2224 2225 2226 2227 2228
    // find the first position which is smaller than the key
    while (1) {
      if (key >= keyList[lastPos]) return lastPos;
      if (key == keyList[firstPos]) return firstPos;
      if (key < keyList[firstPos]) return firstPos - 1;
2229

H
Haojun Liao 已提交
2230 2231
      numOfRows = lastPos - firstPos + 1;
      midPos = (numOfRows >> 1) + firstPos;
2232

H
[td-32]  
hjxilinx 已提交
2233 2234 2235 2236 2237 2238 2239 2240
      if (key < keyList[midPos]) {
        lastPos = midPos - 1;
      } else if (key > keyList[midPos]) {
        firstPos = midPos + 1;
      } else {
        break;
      }
    }
2241

H
[td-32]  
hjxilinx 已提交
2242 2243 2244 2245 2246
  } else {
    // find the first position which is bigger than the key
    while (1) {
      if (key <= keyList[firstPos]) return firstPos;
      if (key == keyList[lastPos]) return lastPos;
2247

H
[td-32]  
hjxilinx 已提交
2248 2249 2250 2251 2252 2253 2254
      if (key > keyList[lastPos]) {
        lastPos = lastPos + 1;
        if (lastPos >= num)
          return -1;
        else
          return lastPos;
      }
2255

H
Haojun Liao 已提交
2256 2257
      numOfRows = lastPos - firstPos + 1;
      midPos = (numOfRows >> 1) + firstPos;
2258

H
[td-32]  
hjxilinx 已提交
2259 2260 2261 2262 2263 2264 2265 2266 2267
      if (key < keyList[midPos]) {
        lastPos = midPos - 1;
      } else if (key > keyList[midPos]) {
        firstPos = midPos + 1;
      } else {
        break;
      }
    }
  }
2268

H
[td-32]  
hjxilinx 已提交
2269 2270 2271
  return midPos;
}

2272
static void cleanBlockOrderSupporter(SBlockOrderSupporter* pSupporter, int32_t numOfTables) {
wafwerar's avatar
wafwerar 已提交
2273 2274
  taosMemoryFreeClear(pSupporter->numOfBlocksPerTable);
  taosMemoryFreeClear(pSupporter->blockIndexArray);
2275 2276

  for (int32_t i = 0; i < numOfTables; ++i) {
H
Haojun Liao 已提交
2277
    STableBlockInfo* pBlockInfo = pSupporter->pDataBlockInfo[i];
wafwerar's avatar
wafwerar 已提交
2278
    taosMemoryFreeClear(pBlockInfo);
2279 2280
  }

wafwerar's avatar
wafwerar 已提交
2281
  taosMemoryFreeClear(pSupporter->pDataBlockInfo);
2282 2283 2284 2285 2286 2287 2288 2289 2290 2291 2292
}

static int32_t dataBlockOrderCompar(const void* pLeft, const void* pRight, void* param) {
  int32_t leftTableIndex = *(int32_t*)pLeft;
  int32_t rightTableIndex = *(int32_t*)pRight;

  SBlockOrderSupporter* pSupporter = (SBlockOrderSupporter*)param;

  int32_t leftTableBlockIndex = pSupporter->blockIndexArray[leftTableIndex];
  int32_t rightTableBlockIndex = pSupporter->blockIndexArray[rightTableIndex];

2293
  if (leftTableBlockIndex > pSupporter->numOfBlocksPerTable[leftTableIndex]) {
2294 2295
    /* left block is empty */
    return 1;
2296
  } else if (rightTableBlockIndex > pSupporter->numOfBlocksPerTable[rightTableIndex]) {
2297 2298 2299 2300 2301 2302 2303
    /* right block is empty */
    return -1;
  }

  STableBlockInfo* pLeftBlockInfoEx = &pSupporter->pDataBlockInfo[leftTableIndex][leftTableBlockIndex];
  STableBlockInfo* pRightBlockInfoEx = &pSupporter->pDataBlockInfo[rightTableIndex][rightTableBlockIndex];

H
Haojun Liao 已提交
2304
  //    assert(pLeftBlockInfoEx->compBlock->offset != pRightBlockInfoEx->compBlock->offset);
dengyihao's avatar
dengyihao 已提交
2305
#if 0  // TODO: temporarily comment off requested by Dr. Liao
H
Haojun Liao 已提交
2306 2307
  if (pLeftBlockInfoEx->compBlock->offset == pRightBlockInfoEx->compBlock->offset &&
      pLeftBlockInfoEx->compBlock->last == pRightBlockInfoEx->compBlock->last) {
B
Bomin Zhang 已提交
2308
    tsdbError("error in header file, two block with same offset:%" PRId64, (int64_t)pLeftBlockInfoEx->compBlock->offset);
2309
  }
H
Haojun Liao 已提交
2310
#endif
2311

H
Haojun Liao 已提交
2312
  return pLeftBlockInfoEx->compBlock->offset > pRightBlockInfoEx->compBlock->offset ? 1 : -1;
2313 2314
}

2315
static int32_t createDataBlocksInfo(STsdbReadHandle* pTsdbReadHandle, int32_t numOfBlocks, int32_t* numOfAllocBlocks) {
H
Haojun Liao 已提交
2316 2317
  size_t size = sizeof(STableBlockInfo) * numOfBlocks;

2318 2319
  if (pTsdbReadHandle->allocSize < size) {
    pTsdbReadHandle->allocSize = (int32_t)size;
wafwerar's avatar
wafwerar 已提交
2320
    char* tmp = taosMemoryRealloc(pTsdbReadHandle->pDataBlockInfo, pTsdbReadHandle->allocSize);
H
Haojun Liao 已提交
2321 2322 2323 2324
    if (tmp == NULL) {
      return TSDB_CODE_TDB_OUT_OF_MEMORY;
    }

dengyihao's avatar
dengyihao 已提交
2325
    pTsdbReadHandle->pDataBlockInfo = (STableBlockInfo*)tmp;
2326 2327
  }

2328
  memset(pTsdbReadHandle->pDataBlockInfo, 0, size);
2329 2330
  *numOfAllocBlocks = numOfBlocks;

H
Haojun Liao 已提交
2331
  // access data blocks according to the offset of each block in asc/desc order.
2332
  int32_t numOfTables = (int32_t)taosArrayGetSize(pTsdbReadHandle->pTableCheckInfo);
2333

2334 2335
  SBlockOrderSupporter sup = {0};
  sup.numOfTables = numOfTables;
wafwerar's avatar
wafwerar 已提交
2336 2337 2338
  sup.numOfBlocksPerTable = taosMemoryCalloc(1, sizeof(int32_t) * numOfTables);
  sup.blockIndexArray = taosMemoryCalloc(1, sizeof(int32_t) * numOfTables);
  sup.pDataBlockInfo = taosMemoryCalloc(1, POINTER_BYTES * numOfTables);
2339

2340
  if (sup.numOfBlocksPerTable == NULL || sup.blockIndexArray == NULL || sup.pDataBlockInfo == NULL) {
2341
    cleanBlockOrderSupporter(&sup, 0);
2342
    return TSDB_CODE_TDB_OUT_OF_MEMORY;
2343
  }
H
Haojun Liao 已提交
2344

2345
  int32_t cnt = 0;
2346
  int32_t numOfQualTables = 0;
H
Haojun Liao 已提交
2347

2348
  for (int32_t j = 0; j < numOfTables; ++j) {
2349
    STableCheckInfo* pTableCheck = (STableCheckInfo*)taosArrayGet(pTsdbReadHandle->pTableCheckInfo, j);
2350 2351 2352
    if (pTableCheck->numOfBlocks <= 0) {
      continue;
    }
H
Haojun Liao 已提交
2353

H
refact  
Hongze Cheng 已提交
2354
    SBlock* pBlock = pTableCheck->pCompInfo->blocks;
2355
    sup.numOfBlocksPerTable[numOfQualTables] = pTableCheck->numOfBlocks;
2356

wafwerar's avatar
wafwerar 已提交
2357
    char* buf = taosMemoryMalloc(sizeof(STableBlockInfo) * pTableCheck->numOfBlocks);
2358
    if (buf == NULL) {
2359
      cleanBlockOrderSupporter(&sup, numOfQualTables);
2360
      return TSDB_CODE_TDB_OUT_OF_MEMORY;
2361 2362
    }

2363
    sup.pDataBlockInfo[numOfQualTables] = (STableBlockInfo*)buf;
2364 2365

    for (int32_t k = 0; k < pTableCheck->numOfBlocks; ++k) {
H
Haojun Liao 已提交
2366
      STableBlockInfo* pBlockInfo = &sup.pDataBlockInfo[numOfQualTables][k];
2367

H
Haojun Liao 已提交
2368 2369
      pBlockInfo->compBlock = &pBlock[k];
      pBlockInfo->pTableCheckInfo = pTableCheck;
2370 2371 2372
      cnt++;
    }

2373
    numOfQualTables++;
2374 2375
  }

H
Haojun Liao 已提交
2376
  assert(numOfBlocks == cnt);
2377

H
Haojun Liao 已提交
2378 2379
  // since there is only one table qualified, blocks are not sorted
  if (numOfQualTables == 1) {
2380
    memcpy(pTsdbReadHandle->pDataBlockInfo, sup.pDataBlockInfo[0], sizeof(STableBlockInfo) * numOfBlocks);
H
Haojun Liao 已提交
2381
    cleanBlockOrderSupporter(&sup, numOfQualTables);
2382

H
Haojun Liao 已提交
2383
    tsdbDebug("%p create data blocks info struct completed for 1 table, %d blocks not sorted %s", pTsdbReadHandle, cnt,
dengyihao's avatar
dengyihao 已提交
2384
              pTsdbReadHandle->idStr);
H
Haojun Liao 已提交
2385 2386
    return TSDB_CODE_SUCCESS;
  }
2387

H
Haojun Liao 已提交
2388
  tsdbDebug("%p create data blocks info struct completed, %d blocks in %d tables %s", pTsdbReadHandle, cnt,
dengyihao's avatar
dengyihao 已提交
2389
            numOfQualTables, pTsdbReadHandle->idStr);
2390

2391
  assert(cnt <= numOfBlocks && numOfQualTables <= numOfTables);  // the pTableQueryInfo[j]->numOfBlocks may be 0
2392
  sup.numOfTables = numOfQualTables;
2393

2394
  SMultiwayMergeTreeInfo* pTree = NULL;
dengyihao's avatar
dengyihao 已提交
2395
  uint8_t                 ret = tMergeTreeCreate(&pTree, sup.numOfTables, &sup, dataBlockOrderCompar);
2396 2397
  if (ret != TSDB_CODE_SUCCESS) {
    cleanBlockOrderSupporter(&sup, numOfTables);
2398
    return TSDB_CODE_TDB_OUT_OF_MEMORY;
2399 2400 2401 2402 2403
  }

  int32_t numOfTotal = 0;

  while (numOfTotal < cnt) {
2404
    int32_t pos = tMergeTreeGetChosenIndex(pTree);
2405 2406
    int32_t index = sup.blockIndexArray[pos]++;

H
Haojun Liao 已提交
2407
    STableBlockInfo* pBlocksInfo = sup.pDataBlockInfo[pos];
2408
    pTsdbReadHandle->pDataBlockInfo[numOfTotal++] = pBlocksInfo[index];
2409 2410

    // set data block index overflow, in order to disable the offset comparator
2411 2412
    if (sup.blockIndexArray[pos] >= sup.numOfBlocksPerTable[pos]) {
      sup.blockIndexArray[pos] = sup.numOfBlocksPerTable[pos] + 1;
2413
    }
2414

H
Haojun Liao 已提交
2415
    tMergeTreeAdjust(pTree, tMergeTreeGetAdjustIndex(pTree));
2416 2417 2418 2419 2420
  }

  /*
   * available when no import exists
   * for(int32_t i = 0; i < cnt - 1; ++i) {
H
Haojun Liao 已提交
2421
   *   assert((*pDataBlockInfo)[i].compBlock->offset < (*pDataBlockInfo)[i+1].compBlock->offset);
2422 2423 2424
   * }
   */

H
Haojun Liao 已提交
2425
  tsdbDebug("%p %d data blocks sort completed, %s", pTsdbReadHandle, cnt, pTsdbReadHandle->idStr);
2426
  cleanBlockOrderSupporter(&sup, numOfTables);
wafwerar's avatar
wafwerar 已提交
2427
  taosMemoryFree(pTree);
2428 2429 2430 2431

  return TSDB_CODE_SUCCESS;
}

2432
static int32_t getFirstFileDataBlock(STsdbReadHandle* pTsdbReadHandle, bool* exists);
H
Haojun Liao 已提交
2433

2434
static int32_t getDataBlock(STsdbReadHandle* pTsdbReadHandle, STableBlockInfo* pNext, bool* exists) {
dengyihao's avatar
dengyihao 已提交
2435
  int32_t        step = ASCENDING_TRAVERSE(pTsdbReadHandle->order) ? 1 : -1;
2436
  SQueryFilePos* cur = &pTsdbReadHandle->cur;
H
Haojun Liao 已提交
2437

dengyihao's avatar
dengyihao 已提交
2438
  while (1) {
2439
    int32_t code = loadFileDataBlock(pTsdbReadHandle, pNext->compBlock, pNext->pTableCheckInfo, exists);
H
Haojun Liao 已提交
2440 2441 2442 2443
    if (code != TSDB_CODE_SUCCESS || *exists) {
      return code;
    }

2444 2445
    if ((cur->slot == pTsdbReadHandle->numOfBlocks - 1 && ASCENDING_TRAVERSE(pTsdbReadHandle->order)) ||
        (cur->slot == 0 && !ASCENDING_TRAVERSE(pTsdbReadHandle->order))) {
H
Haojun Liao 已提交
2446
      // all data blocks in current file has been checked already, try next file if exists
2447
      return getFirstFileDataBlock(pTsdbReadHandle, exists);
H
Haojun Liao 已提交
2448 2449 2450 2451
    } else {  // next block of the same file
      cur->slot += step;
      cur->mixBlock = false;
      cur->blockCompleted = false;
2452
      pNext = &pTsdbReadHandle->pDataBlockInfo[cur->slot];
H
Haojun Liao 已提交
2453 2454 2455 2456
    }
  }
}

2457 2458 2459
static int32_t getFirstFileDataBlock(STsdbReadHandle* pTsdbReadHandle, bool* exists) {
  pTsdbReadHandle->numOfBlocks = 0;
  SQueryFilePos* cur = &pTsdbReadHandle->cur;
H
Haojun Liao 已提交
2460 2461 2462

  int32_t code = TSDB_CODE_SUCCESS;

2463
  int32_t numOfBlocks = 0;
2464
  int32_t numOfTables = (int32_t)taosArrayGetSize(pTsdbReadHandle->pTableCheckInfo);
2465

C
Cary Xu 已提交
2466
  STsdbKeepCfg* pCfg = REPO_KEEP_CFG(pTsdbReadHandle->pTsdb);
C
Cary Xu 已提交
2467
  STimeWindow   win = TSWINDOW_INITIALIZER;
2468

H
Hongze Cheng 已提交
2469
  while (true) {
2470
    tsdbRLockFS(REPO_FS(pTsdbReadHandle->pTsdb));
H
Hongze Cheng 已提交
2471

2472 2473
    if ((pTsdbReadHandle->pFileGroup = tsdbFSIterNext(&pTsdbReadHandle->fileIter)) == NULL) {
      tsdbUnLockFS(REPO_FS(pTsdbReadHandle->pTsdb));
H
Hongze Cheng 已提交
2474 2475 2476
      break;
    }

H
refact  
Hongze Cheng 已提交
2477
    tsdbGetFidKeyRange(pCfg->days, pCfg->precision, pTsdbReadHandle->pFileGroup->fid, &win.skey, &win.ekey);
2478 2479

    // current file are not overlapped with query time window, ignore remain files
2480 2481 2482
    if ((ASCENDING_TRAVERSE(pTsdbReadHandle->order) && win.skey > pTsdbReadHandle->window.ekey) ||
        (!ASCENDING_TRAVERSE(pTsdbReadHandle->order) && win.ekey < pTsdbReadHandle->window.ekey)) {
      tsdbUnLockFS(REPO_FS(pTsdbReadHandle->pTsdb));
H
Haojun Liao 已提交
2483 2484
      tsdbDebug("%p remain files are not qualified for qrange:%" PRId64 "-%" PRId64 ", ignore, %s", pTsdbReadHandle,
                pTsdbReadHandle->window.skey, pTsdbReadHandle->window.ekey, pTsdbReadHandle->idStr);
2485 2486
      pTsdbReadHandle->pFileGroup = NULL;
      assert(pTsdbReadHandle->numOfBlocks == 0);
2487 2488 2489
      break;
    }

2490 2491
    if (tsdbSetAndOpenReadFSet(&pTsdbReadHandle->rhelper, pTsdbReadHandle->pFileGroup) < 0) {
      tsdbUnLockFS(REPO_FS(pTsdbReadHandle->pTsdb));
H
Hongze Cheng 已提交
2492 2493 2494 2495
      code = terrno;
      break;
    }

2496
    tsdbUnLockFS(REPO_FS(pTsdbReadHandle->pTsdb));
H
Hongze Cheng 已提交
2497

2498
    if (tsdbLoadBlockIdx(&pTsdbReadHandle->rhelper) < 0) {
H
Hongze Cheng 已提交
2499 2500 2501 2502
      code = terrno;
      break;
    }

2503
    if ((code = getFileCompInfo(pTsdbReadHandle, &numOfBlocks)) != TSDB_CODE_SUCCESS) {
2504 2505
      break;
    }
H
Haojun Liao 已提交
2506

H
Haojun Liao 已提交
2507 2508
    tsdbDebug("%p %d blocks found in file for %d table(s), fid:%d, %s", pTsdbReadHandle, numOfBlocks, numOfTables,
              pTsdbReadHandle->pFileGroup->fid, pTsdbReadHandle->idStr);
H
Haojun Liao 已提交
2509

2510 2511 2512 2513
    assert(numOfBlocks >= 0);
    if (numOfBlocks == 0) {
      continue;
    }
H
Haojun Liao 已提交
2514

2515
    // todo return error code to query engine
dengyihao's avatar
dengyihao 已提交
2516 2517
    if ((code = createDataBlocksInfo(pTsdbReadHandle, numOfBlocks, &pTsdbReadHandle->numOfBlocks)) !=
        TSDB_CODE_SUCCESS) {
2518 2519
      break;
    }
H
Haojun Liao 已提交
2520

2521 2522
    assert(numOfBlocks >= pTsdbReadHandle->numOfBlocks);
    if (pTsdbReadHandle->numOfBlocks > 0) {
2523 2524 2525
      break;
    }
  }
H
Haojun Liao 已提交
2526

2527
  // no data in file anymore
2528
  if (pTsdbReadHandle->numOfBlocks <= 0 || code != TSDB_CODE_SUCCESS) {
H
Haojun Liao 已提交
2529
    if (code == TSDB_CODE_SUCCESS) {
2530
      assert(pTsdbReadHandle->pFileGroup == NULL);
H
Haojun Liao 已提交
2531 2532
    }

D
dapan1121 已提交
2533
    cur->fid = INT32_MIN;  // denote that there are no data in file anymore
H
Haojun Liao 已提交
2534 2535
    *exists = false;
    return code;
2536
  }
H
Haojun Liao 已提交
2537

2538
  assert(pTsdbReadHandle->pFileGroup != NULL && pTsdbReadHandle->numOfBlocks > 0);
dengyihao's avatar
dengyihao 已提交
2539
  cur->slot = ASCENDING_TRAVERSE(pTsdbReadHandle->order) ? 0 : pTsdbReadHandle->numOfBlocks - 1;
2540
  cur->fid = pTsdbReadHandle->pFileGroup->fid;
H
Haojun Liao 已提交
2541

2542
  STableBlockInfo* pBlockInfo = &pTsdbReadHandle->pDataBlockInfo[cur->slot];
2543
  return getDataBlock(pTsdbReadHandle, pBlockInfo, exists);
H
Haojun Liao 已提交
2544 2545 2546 2547 2548 2549 2550
}

static bool isEndFileDataBlock(SQueryFilePos* cur, int32_t numOfBlocks, bool ascTrav) {
  assert(cur != NULL && numOfBlocks > 0);
  return (cur->slot == numOfBlocks - 1 && ascTrav) || (cur->slot == 0 && !ascTrav);
}

2551
static void moveToNextDataBlockInCurrentFile(STsdbReadHandle* pTsdbReadHandle) {
dengyihao's avatar
dengyihao 已提交
2552
  int32_t step = ASCENDING_TRAVERSE(pTsdbReadHandle->order) ? 1 : -1;
H
Haojun Liao 已提交
2553

2554 2555
  SQueryFilePos* cur = &pTsdbReadHandle->cur;
  assert(cur->slot < pTsdbReadHandle->numOfBlocks && cur->slot >= 0);
H
Haojun Liao 已提交
2556 2557

  cur->slot += step;
dengyihao's avatar
dengyihao 已提交
2558
  cur->mixBlock = false;
H
Haojun Liao 已提交
2559
  cur->blockCompleted = false;
2560
}
H
Haojun Liao 已提交
2561

2562 2563 2564 2565
static int32_t getBucketIndex(int32_t startRow, int32_t bucketRange, int32_t numOfRows) {
  return (numOfRows - startRow) / bucketRange;
}

H
Haojun Liao 已提交
2566
int32_t tsdbGetFileBlocksDistInfo(tsdbReaderT* queryHandle, STableBlockDistInfo* pTableBlockInfo) {
dengyihao's avatar
dengyihao 已提交
2567
  STsdbReadHandle* pTsdbReadHandle = (STsdbReadHandle*)queryHandle;
H
Haojun Liao 已提交
2568

H
Haojun Liao 已提交
2569
  pTableBlockInfo->totalSize = 0;
2570
  pTableBlockInfo->totalRows = 0;
H
Haojun Liao 已提交
2571

2572
  STsdbFS* pFileHandle = REPO_FS(pTsdbReadHandle->pTsdb);
H
Haojun Liao 已提交
2573 2574

  // find the start data block in file
2575
  pTsdbReadHandle->locateStart = true;
C
Cary Xu 已提交
2576 2577
  STsdbKeepCfg* pCfg = REPO_KEEP_CFG(pTsdbReadHandle->pTsdb);
  int32_t       fid = getFileIdFromKey(pTsdbReadHandle->window.skey, pCfg->days, pCfg->precision);
H
Haojun Liao 已提交
2578 2579

  tsdbRLockFS(pFileHandle);
2580 2581
  tsdbFSIterInit(&pTsdbReadHandle->fileIter, pFileHandle, pTsdbReadHandle->order);
  tsdbFSIterSeek(&pTsdbReadHandle->fileIter, fid);
H
Haojun Liao 已提交
2582 2583
  tsdbUnLockFS(pFileHandle);

2584 2585 2586 2587 2588 2589
  STsdbCfg* pc = REPO_CFG(pTsdbReadHandle->pTsdb);
  pTableBlockInfo->defMinRows = pc->minRows;
  pTableBlockInfo->defMaxRows = pc->maxRows;

  int32_t bucketRange = ceil((pc->maxRows - pc->minRows) / 20.0);

H
Haojun Liao 已提交
2590
  pTableBlockInfo->numOfFiles += 1;
H
Haojun Liao 已提交
2591

H
Haojun Liao 已提交
2592
  int32_t     code = TSDB_CODE_SUCCESS;
H
Haojun Liao 已提交
2593
  int32_t     numOfBlocks = 0;
2594
  int32_t     numOfTables = (int32_t)taosArrayGetSize(pTsdbReadHandle->pTableCheckInfo);
2595
  int         defaultRows = 4096;
H
Haojun Liao 已提交
2596 2597 2598 2599
  STimeWindow win = TSWINDOW_INITIALIZER;

  while (true) {
    numOfBlocks = 0;
2600
    tsdbRLockFS(REPO_FS(pTsdbReadHandle->pTsdb));
H
Haojun Liao 已提交
2601

2602 2603
    if ((pTsdbReadHandle->pFileGroup = tsdbFSIterNext(&pTsdbReadHandle->fileIter)) == NULL) {
      tsdbUnLockFS(REPO_FS(pTsdbReadHandle->pTsdb));
H
Haojun Liao 已提交
2604 2605 2606
      break;
    }

H
refact  
Hongze Cheng 已提交
2607
    tsdbGetFidKeyRange(pCfg->days, pCfg->precision, pTsdbReadHandle->pFileGroup->fid, &win.skey, &win.ekey);
H
Haojun Liao 已提交
2608 2609

    // current file are not overlapped with query time window, ignore remain files
2610
    if ((win.skey > pTsdbReadHandle->window.ekey)/* || (!ascTraverse && win.ekey < pTsdbReadHandle->window.ekey)*/) {
2611
      tsdbUnLockFS(REPO_FS(pTsdbReadHandle->pTsdb));
H
Haojun Liao 已提交
2612 2613
      tsdbDebug("%p remain files are not qualified for qrange:%" PRId64 "-%" PRId64 ", ignore, %s", pTsdbReadHandle,
                pTsdbReadHandle->window.skey, pTsdbReadHandle->window.ekey, pTsdbReadHandle->idStr);
2614
      pTsdbReadHandle->pFileGroup = NULL;
H
Haojun Liao 已提交
2615 2616 2617
      break;
    }

H
Haojun Liao 已提交
2618
    pTableBlockInfo->numOfFiles += 1;
2619 2620
    if (tsdbSetAndOpenReadFSet(&pTsdbReadHandle->rhelper, pTsdbReadHandle->pFileGroup) < 0) {
      tsdbUnLockFS(REPO_FS(pTsdbReadHandle->pTsdb));
H
Haojun Liao 已提交
2621 2622 2623 2624
      code = terrno;
      break;
    }

2625
    tsdbUnLockFS(REPO_FS(pTsdbReadHandle->pTsdb));
H
Haojun Liao 已提交
2626

2627
    if (tsdbLoadBlockIdx(&pTsdbReadHandle->rhelper) < 0) {
H
Haojun Liao 已提交
2628 2629 2630 2631
      code = terrno;
      break;
    }

2632
    if ((code = getFileCompInfo(pTsdbReadHandle, &numOfBlocks)) != TSDB_CODE_SUCCESS) {
H
Haojun Liao 已提交
2633 2634 2635
      break;
    }

H
Haojun Liao 已提交
2636 2637
    tsdbDebug("%p %d blocks found in file for %d table(s), fid:%d, %s", pTsdbReadHandle, numOfBlocks, numOfTables,
              pTsdbReadHandle->pFileGroup->fid, pTsdbReadHandle->idStr);
H
Haojun Liao 已提交
2638 2639 2640 2641 2642

    if (numOfBlocks == 0) {
      continue;
    }

2643 2644
    pTableBlockInfo->numOfBlocks += numOfBlocks;

H
Haojun Liao 已提交
2645
    for (int32_t i = 0; i < numOfTables; ++i) {
2646
      STableCheckInfo* pCheckInfo = taosArrayGet(pTsdbReadHandle->pTableCheckInfo, i);
H
Haojun Liao 已提交
2647 2648

      SBlock* pBlock = pCheckInfo->pCompInfo->blocks;
2649

H
Haojun Liao 已提交
2650
      for (int32_t j = 0; j < pCheckInfo->numOfBlocks; ++j) {
H
Haojun Liao 已提交
2651
        pTableBlockInfo->totalSize += pBlock[j].len;
H
Haojun Liao 已提交
2652

H
Haojun Liao 已提交
2653
        int32_t numOfRows = pBlock[j].numOfRows;
2654
        pTableBlockInfo->totalRows += numOfRows;
2655

H
Haojun Liao 已提交
2656 2657 2658 2659 2660 2661 2662 2663 2664 2665 2666
        if (numOfRows > pTableBlockInfo->maxRows) {
          pTableBlockInfo->maxRows = numOfRows;
        }

        if (numOfRows < pTableBlockInfo->minRows) {
          pTableBlockInfo->minRows = numOfRows;
        }

        if (numOfRows < defaultRows) {
          pTableBlockInfo->numOfSmallBlocks += 1;
        }
2667 2668 2669

        int32_t bucketIndex = getBucketIndex(pTableBlockInfo->defMinRows, bucketRange, numOfRows);
        pTableBlockInfo->blockRowsHisto[bucketIndex]++;
H
Haojun Liao 已提交
2670 2671 2672 2673
      }
    }
  }

2674
  pTableBlockInfo->numOfTables = numOfTables;
H
Haojun Liao 已提交
2675 2676 2677
  return code;
}

2678 2679 2680
static int32_t getDataBlocksInFiles(STsdbReadHandle* pTsdbReadHandle, bool* exists) {
  STsdbFS*       pFileHandle = REPO_FS(pTsdbReadHandle->pTsdb);
  SQueryFilePos* cur = &pTsdbReadHandle->cur;
2681 2682

  // find the start data block in file
2683 2684
  if (!pTsdbReadHandle->locateStart) {
    pTsdbReadHandle->locateStart = true;
C
Cary Xu 已提交
2685 2686
    STsdbKeepCfg* pCfg = REPO_KEEP_CFG(pTsdbReadHandle->pTsdb);
    int32_t       fid = getFileIdFromKey(pTsdbReadHandle->window.skey, pCfg->days, pCfg->precision);
H
Haojun Liao 已提交
2687

H
Hongze Cheng 已提交
2688
    tsdbRLockFS(pFileHandle);
2689 2690
    tsdbFSIterInit(&pTsdbReadHandle->fileIter, pFileHandle, pTsdbReadHandle->order);
    tsdbFSIterSeek(&pTsdbReadHandle->fileIter, fid);
H
Hongze Cheng 已提交
2691
    tsdbUnLockFS(pFileHandle);
2692

2693
    return getFirstFileDataBlock(pTsdbReadHandle, exists);
2694
  } else {
2695
    // check if current file block is all consumed
2696
    STableBlockInfo* pBlockInfo = &pTsdbReadHandle->pDataBlockInfo[cur->slot];
2697
    STableCheckInfo* pCheckInfo = pBlockInfo->pTableCheckInfo;
H
Haojun Liao 已提交
2698

2699
    // current block is done, try next
H
Haojun Liao 已提交
2700
    if ((!cur->mixBlock) || cur->blockCompleted) {
H
Haojun Liao 已提交
2701
      // all data blocks in current file has been checked already, try next file if exists
2702
    } else {
H
Haojun Liao 已提交
2703 2704
      tsdbDebug("%p continue in current data block, index:%d, pos:%d, %s", pTsdbReadHandle, cur->slot, cur->pos,
                pTsdbReadHandle->idStr);
2705 2706
      int32_t code = handleDataMergeIfNeeded(pTsdbReadHandle, pBlockInfo->compBlock, pCheckInfo);
      *exists = (pTsdbReadHandle->realNumOfRows > 0);
H
Haojun Liao 已提交
2707

H
Haojun Liao 已提交
2708 2709 2710 2711 2712 2713 2714
      if (code != TSDB_CODE_SUCCESS || *exists) {
        return code;
      }
    }

    // current block is empty, try next block in file
    // all data blocks in current file has been checked already, try next file if exists
2715 2716
    if (isEndFileDataBlock(cur, pTsdbReadHandle->numOfBlocks, ASCENDING_TRAVERSE(pTsdbReadHandle->order))) {
      return getFirstFileDataBlock(pTsdbReadHandle, exists);
H
Haojun Liao 已提交
2717
    } else {
2718 2719
      moveToNextDataBlockInCurrentFile(pTsdbReadHandle);
      STableBlockInfo* pNext = &pTsdbReadHandle->pDataBlockInfo[cur->slot];
2720
      return getDataBlock(pTsdbReadHandle, pNext, exists);
2721 2722
    }
  }
2723 2724
}

2725 2726
static bool doHasDataInBuffer(STsdbReadHandle* pTsdbReadHandle) {
  size_t numOfTables = taosArrayGetSize(pTsdbReadHandle->pTableCheckInfo);
dengyihao's avatar
dengyihao 已提交
2727

2728 2729
  while (pTsdbReadHandle->activeIndex < numOfTables) {
    if (hasMoreDataInCache(pTsdbReadHandle)) {
2730 2731
      return true;
    }
H
Haojun Liao 已提交
2732

2733
    pTsdbReadHandle->activeIndex += 1;
2734
  }
H
Haojun Liao 已提交
2735

2736 2737 2738
  return false;
}

dengyihao's avatar
dengyihao 已提交
2739
// todo not unref yet, since it is not support multi-group interpolation query
H
Haojun Liao 已提交
2740
static UNUSED_FUNC void changeQueryHandleForInterpQuery(tsdbReaderT pHandle) {
H
Haojun Liao 已提交
2741
  // filter the queried time stamp in the first place
dengyihao's avatar
dengyihao 已提交
2742
  STsdbReadHandle* pTsdbReadHandle = (STsdbReadHandle*)pHandle;
H
Haojun Liao 已提交
2743 2744

  // starts from the buffer in case of descending timestamp order check data blocks
2745
  size_t numOfTables = taosArrayGetSize(pTsdbReadHandle->pTableCheckInfo);
H
Haojun Liao 已提交
2746 2747

  int32_t i = 0;
dengyihao's avatar
dengyihao 已提交
2748
  while (i < numOfTables) {
2749
    STableCheckInfo* pCheckInfo = taosArrayGet(pTsdbReadHandle->pTableCheckInfo, i);
H
Haojun Liao 已提交
2750 2751

    // the first qualified table for interpolation query
dengyihao's avatar
dengyihao 已提交
2752 2753 2754 2755
    //    if ((pTsdbReadHandle->window.skey <= pCheckInfo->pTableObj->lastKey) &&
    //        (pCheckInfo->pTableObj->lastKey != TSKEY_INITIAL_VAL)) {
    //      break;
    //    }
H
Haojun Liao 已提交
2756 2757 2758 2759 2760 2761 2762 2763 2764

    i++;
  }

  // there are no data in all the tables
  if (i == numOfTables) {
    return;
  }

dengyihao's avatar
dengyihao 已提交
2765
  STableCheckInfo info = *(STableCheckInfo*)taosArrayGet(pTsdbReadHandle->pTableCheckInfo, i);
2766
  taosArrayClear(pTsdbReadHandle->pTableCheckInfo);
H
Haojun Liao 已提交
2767

2768 2769
  info.lastKey = pTsdbReadHandle->window.skey;
  taosArrayPush(pTsdbReadHandle->pTableCheckInfo, &info);
H
Haojun Liao 已提交
2770 2771 2772
}

static int tsdbReadRowsFromCache(STableCheckInfo* pCheckInfo, TSKEY maxKey, int maxRowsToRead, STimeWindow* win,
2773
                                 STsdbReadHandle* pTsdbReadHandle) {
dengyihao's avatar
dengyihao 已提交
2774
  int       numOfRows = 0;
C
Cary Xu 已提交
2775
  int       curRows = 0;
dengyihao's avatar
dengyihao 已提交
2776
  int32_t   numOfCols = (int32_t)taosArrayGetSize(pTsdbReadHandle->pColumns);
H
Hongze Cheng 已提交
2777
  STsdbCfg* pCfg = REPO_CFG(pTsdbReadHandle->pTsdb);
H
Haojun Liao 已提交
2778 2779
  win->skey = TSKEY_INITIAL_VAL;

dengyihao's avatar
dengyihao 已提交
2780 2781
  int64_t   st = taosGetTimestampUs();
  int16_t   rv = -1;
D
fix bug  
dapan1121 已提交
2782
  STSchema* pSchema = NULL;
C
Cary Xu 已提交
2783 2784
  TSKEY     lastRowKey = TSKEY_INITIAL_VAL;

H
Haojun Liao 已提交
2785
  do {
C
Cary Xu 已提交
2786
    STSRow* row = getSRowInTableMem(pCheckInfo, pTsdbReadHandle->order, pCfg->update, NULL, TD_VER_MAX);
H
Haojun Liao 已提交
2787 2788 2789 2790
    if (row == NULL) {
      break;
    }

H
Haojun Liao 已提交
2791
    TSKEY key = TD_ROW_KEY(row);
dengyihao's avatar
dengyihao 已提交
2792 2793 2794 2795
    if ((key > maxKey && ASCENDING_TRAVERSE(pTsdbReadHandle->order)) ||
        (key < maxKey && !ASCENDING_TRAVERSE(pTsdbReadHandle->order))) {
      tsdbDebug("%p key:%" PRIu64 " beyond qrange:%" PRId64 " - %" PRId64 ", no more data in buffer", pTsdbReadHandle,
                key, pTsdbReadHandle->window.skey, pTsdbReadHandle->window.ekey);
H
Haojun Liao 已提交
2796 2797 2798 2799 2800 2801 2802 2803 2804

      break;
    }

    if (win->skey == INT64_MIN) {
      win->skey = key;
    }

    win->ekey = key;
H
Haojun Liao 已提交
2805
    if (rv != TD_ROW_SVER(row)) {
2806
      pSchema = metaGetTbTSchema(REPO_META(pTsdbReadHandle->pTsdb), pCheckInfo->tableId, TD_ROW_SVER(row));
H
Haojun Liao 已提交
2807
      rv = TD_ROW_SVER(row);
D
fix bug  
dapan1121 已提交
2808
    }
C
Cary Xu 已提交
2809 2810
    numOfRows += mergeTwoRowFromMem(pTsdbReadHandle, maxRowsToRead, &curRows, row, NULL, numOfCols, pCheckInfo->tableId,
                                    pSchema, NULL, pCfg->update, &lastRowKey);
H
Haojun Liao 已提交
2811

C
Cary Xu 已提交
2812
    if (numOfRows >= maxRowsToRead) {
H
Haojun Liao 已提交
2813 2814 2815 2816
      moveToNextRowInMem(pCheckInfo);
      break;
    }

dengyihao's avatar
dengyihao 已提交
2817
  } while (moveToNextRowInMem(pCheckInfo));
H
Haojun Liao 已提交
2818

C
Cary Xu 已提交
2819
  taosMemoryFreeClear(pSchema);  // free the STSChema
H
Haojun Liao 已提交
2820 2821 2822
  assert(numOfRows <= maxRowsToRead);

  int64_t elapsedTime = taosGetTimestampUs() - st;
dengyihao's avatar
dengyihao 已提交
2823 2824
  tsdbDebug("%p build data block from cache completed, elapsed time:%" PRId64 " us, numOfRows:%d, numOfCols:%d, %s",
            pTsdbReadHandle, elapsedTime, numOfRows, numOfCols, pTsdbReadHandle->idStr);
H
Haojun Liao 已提交
2825 2826 2827 2828

  return numOfRows;
}

dengyihao's avatar
dengyihao 已提交
2829 2830 2831 2832 2833 2834
void* tsdbGetIdx(SMeta* pMeta) {
  if (pMeta == NULL) {
    return NULL;
  }
  return metaGetIdx(pMeta);
}
dengyihao's avatar
dengyihao 已提交
2835 2836 2837 2838 2839 2840
void* tsdbGetIvtIdx(SMeta* pMeta) {
  if (pMeta == NULL) {
    return NULL;
  }
  return metaGetIvtIdx(pMeta);
}
wmmhello's avatar
wmmhello 已提交
2841
int32_t tsdbGetAllTableList(SMeta* pMeta, uint64_t uid, SArray* list) {
2842
  SMCtbCursor* pCur = metaOpenCtbCursor(pMeta, uid);
H
Haojun Liao 已提交
2843

2844 2845 2846 2847 2848
  while (1) {
    tb_uid_t id = metaCtbCursorNext(pCur);
    if (id == 0) {
      break;
    }
H
Haojun Liao 已提交
2849

2850
    STableKeyInfo info = {.lastKey = TSKEY_INITIAL_VAL, uid = id};
H
Haojun Liao 已提交
2851 2852 2853
    taosArrayPush(list, &info);
  }

C
Cary Xu 已提交
2854
  metaCloseCtbCursor(pCur);
H
Haojun Liao 已提交
2855 2856 2857
  return TSDB_CODE_SUCCESS;
}

L
Liu Jicong 已提交
2858 2859 2860 2861 2862 2863 2864 2865 2866 2867 2868 2869 2870 2871 2872 2873
int32_t tsdbGetCtbIdList(SMeta* pMeta, int64_t suid, SArray* list) {
  SMCtbCursor* pCur = metaOpenCtbCursor(pMeta, suid);

  while (1) {
    tb_uid_t id = metaCtbCursorNext(pCur);
    if (id == 0) {
      break;
    }

    taosArrayPush(list, &id);
  }

  metaCloseCtbCursor(pCur);
  return TSDB_CODE_SUCCESS;
}

H
Haojun Liao 已提交
2874 2875 2876 2877 2878
static void destroyHelper(void* param) {
  if (param == NULL) {
    return;
  }

dengyihao's avatar
dengyihao 已提交
2879 2880 2881 2882 2883 2884
  //  tQueryInfo* pInfo = (tQueryInfo*)param;
  //  if (pInfo->optr != TSDB_RELATION_IN) {
  //    taosMemoryFreeClear(pInfo->q);
  //  } else {
  //    taosHashCleanup((SHashObj *)(pInfo->q));
  //  }
H
Haojun Liao 已提交
2885

wafwerar's avatar
wafwerar 已提交
2886
  taosMemoryFree(param);
H
Haojun Liao 已提交
2887 2888
}

dengyihao's avatar
dengyihao 已提交
2889 2890
#define TSDB_PREV_ROW 0x1
#define TSDB_NEXT_ROW 0x2
2891

dengyihao's avatar
dengyihao 已提交
2892
static bool loadBlockOfActiveTable(STsdbReadHandle* pTsdbReadHandle) {
2893
  if (pTsdbReadHandle->checkFiles) {
H
Haojun Liao 已提交
2894 2895
    // check if the query range overlaps with the file data block
    bool exists = true;
H
Haojun Liao 已提交
2896

2897
    int32_t code = getDataBlocksInFiles(pTsdbReadHandle, &exists);
H
Haojun Liao 已提交
2898
    if (code != TSDB_CODE_SUCCESS) {
2899
      pTsdbReadHandle->checkFiles = false;
H
Haojun Liao 已提交
2900 2901
      return false;
    }
H
Haojun Liao 已提交
2902

H
Haojun Liao 已提交
2903
    if (exists) {
dengyihao's avatar
dengyihao 已提交
2904
      tsdbRetrieveDataBlock((tsdbReaderT*)pTsdbReadHandle, NULL);
2905 2906 2907
      if (pTsdbReadHandle->currentLoadExternalRows && pTsdbReadHandle->window.skey == pTsdbReadHandle->window.ekey) {
        SColumnInfoData* pColInfo = taosArrayGet(pTsdbReadHandle->pColumns, 0);
        assert(*(int64_t*)pColInfo->pData == pTsdbReadHandle->window.skey);
H
Haojun Liao 已提交
2908 2909
      }

dengyihao's avatar
dengyihao 已提交
2910
      pTsdbReadHandle->currentLoadExternalRows = false;  // clear the flag, since the exact matched row is found.
H
Haojun Liao 已提交
2911 2912
      return exists;
    }
H
Haojun Liao 已提交
2913

2914
    pTsdbReadHandle->checkFiles = false;
H
Haojun Liao 已提交
2915
  }
H
Haojun Liao 已提交
2916

2917 2918
  if (hasMoreDataInCache(pTsdbReadHandle)) {
    pTsdbReadHandle->currentLoadExternalRows = false;
H
Haojun Liao 已提交
2919 2920
    return true;
  }
H
Haojun Liao 已提交
2921

H
Haojun Liao 已提交
2922
  // current result is empty
dengyihao's avatar
dengyihao 已提交
2923 2924
  if (pTsdbReadHandle->currentLoadExternalRows && pTsdbReadHandle->window.skey == pTsdbReadHandle->window.ekey &&
      pTsdbReadHandle->cur.rows == 0) {
H
refact  
Hongze Cheng 已提交
2925
    //    SMemTable* pMemRef = pTsdbReadHandle->pMemTable;
H
Haojun Liao 已提交
2926

dengyihao's avatar
dengyihao 已提交
2927 2928
    //    doGetExternalRow(pTsdbReadHandle, TSDB_PREV_ROW, pMemRef);
    //    doGetExternalRow(pTsdbReadHandle, TSDB_NEXT_ROW, pMemRef);
H
Haojun Liao 已提交
2929

2930
    bool result = tsdbGetExternalRow(pTsdbReadHandle);
H
Haojun Liao 已提交
2931

dengyihao's avatar
dengyihao 已提交
2932 2933
    //    pTsdbReadHandle->prev = doFreeColumnInfoData(pTsdbReadHandle->prev);
    //    pTsdbReadHandle->next = doFreeColumnInfoData(pTsdbReadHandle->next);
2934
    pTsdbReadHandle->currentLoadExternalRows = false;
H
Haojun Liao 已提交
2935 2936

    return result;
2937
  }
H
Haojun Liao 已提交
2938

H
Haojun Liao 已提交
2939 2940
  return false;
}
2941

2942
static bool loadCachedLastRow(STsdbReadHandle* pTsdbReadHandle) {
H
Haojun Liao 已提交
2943
  // the last row is cached in buffer, return it directly.
2944
  // here note that the pTsdbReadHandle->window must be the TS_INITIALIZER
dengyihao's avatar
dengyihao 已提交
2945
  int32_t numOfCols = (int32_t)(QH_GET_NUM_OF_COLS(pTsdbReadHandle));
2946
  size_t  numOfTables = taosArrayGetSize(pTsdbReadHandle->pTableCheckInfo);
H
Haojun Liao 已提交
2947 2948
  assert(numOfTables > 0 && numOfCols > 0);

2949
  SQueryFilePos* cur = &pTsdbReadHandle->cur;
2950

dengyihao's avatar
dengyihao 已提交
2951 2952 2953
  STSRow* pRow = NULL;
  TSKEY   key = TSKEY_INITIAL_VAL;
  int32_t step = ASCENDING_TRAVERSE(pTsdbReadHandle->order) ? 1 : -1;
C
Cary Xu 已提交
2954 2955
  TSKEY   lastRowKey = TSKEY_INITIAL_VAL;
  int32_t curRow = 0;
2956 2957 2958

  if (++pTsdbReadHandle->activeIndex < numOfTables) {
    STableCheckInfo* pCheckInfo = taosArrayGet(pTsdbReadHandle->pTableCheckInfo, pTsdbReadHandle->activeIndex);
dengyihao's avatar
dengyihao 已提交
2959 2960 2961 2962
    //    int32_t ret = tsdbGetCachedLastRow(pCheckInfo->pTableObj, &pRow, &key);
    //    if (ret != TSDB_CODE_SUCCESS) {
    //      return false;
    //    }
C
Cary Xu 已提交
2963 2964
    mergeTwoRowFromMem(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, &curRow, pRow, NULL, numOfCols,
                       pCheckInfo->tableId, NULL, NULL, true, &lastRowKey);
wafwerar's avatar
wafwerar 已提交
2965
    taosMemoryFreeClear(pRow);
H
Haojun Liao 已提交
2966

H
Haojun Liao 已提交
2967 2968 2969
    // update the last key value
    pCheckInfo->lastKey = key + step;

dengyihao's avatar
dengyihao 已提交
2970 2971
    cur->rows = 1;  // only one row
    cur->lastKey = key + step;
H
Haojun Liao 已提交
2972 2973 2974 2975 2976
    cur->mixBlock = true;
    cur->win.skey = key;
    cur->win.ekey = key;

    return true;
2977
  }
H
Haojun Liao 已提交
2978

H
Haojun Liao 已提交
2979 2980 2981
  return false;
}

dengyihao's avatar
dengyihao 已提交
2982
// static bool loadCachedLast(STsdbReadHandle* pTsdbReadHandle) {
2983 2984 2985 2986 2987 2988 2989 2990 2991 2992 2993 2994 2995 2996 2997 2998 2999 3000 3001
//  // the last row is cached in buffer, return it directly.
//  // here note that the pTsdbReadHandle->window must be the TS_INITIALIZER
//  int32_t tgNumOfCols = (int32_t)QH_GET_NUM_OF_COLS(pTsdbReadHandle);
//  size_t  numOfTables = taosArrayGetSize(pTsdbReadHandle->pTableCheckInfo);
//  int32_t numOfRows = 0;
//  assert(numOfTables > 0 && tgNumOfCols > 0);
//  SQueryFilePos* cur = &pTsdbReadHandle->cur;
//  TSKEY priKey = TSKEY_INITIAL_VAL;
//  int32_t priIdx = -1;
//  SColumnInfoData* pColInfo = NULL;
//
//  while (++pTsdbReadHandle->activeIndex < numOfTables) {
//    STableCheckInfo* pCheckInfo = taosArrayGet(pTsdbReadHandle->pTableCheckInfo, pTsdbReadHandle->activeIndex);
//    STable* pTable = pCheckInfo->pTableObj;
//    char* pData = NULL;
//
//    int32_t numOfCols = pTable->maxColNum;
//
//    if (pTable->lastCols == NULL || pTable->maxColNum <= 0) {
dengyihao's avatar
dengyihao 已提交
3002 3003
//      tsdbWarn("no last cached for table %s, uid:%" PRIu64 ",tid:%d", pTable->name->data, pTable->uid,
//      pTable->tableId); continue;
3004 3005 3006 3007 3008 3009 3010 3011 3012 3013 3014 3015 3016 3017 3018 3019 3020 3021 3022 3023 3024 3025 3026 3027 3028 3029 3030 3031 3032 3033 3034 3035 3036 3037 3038 3039 3040 3041 3042 3043 3044 3045 3046 3047 3048 3049 3050 3051 3052 3053 3054 3055 3056 3057 3058 3059 3060 3061 3062 3063 3064 3065 3066 3067 3068 3069 3070 3071 3072 3073 3074 3075 3076 3077 3078 3079 3080 3081 3082 3083 3084 3085 3086 3087 3088 3089 3090 3091 3092 3093 3094 3095 3096 3097 3098 3099 3100 3101 3102 3103 3104 3105 3106 3107 3108 3109 3110 3111 3112 3113 3114 3115 3116 3117 3118 3119 3120 3121 3122 3123 3124 3125 3126 3127 3128 3129 3130 3131 3132 3133
//    }
//
//    int32_t i = 0, j = 0;
//    while(i < tgNumOfCols && j < numOfCols) {
//      pColInfo = taosArrayGet(pTsdbReadHandle->pColumns, i);
//      if (pTable->lastCols[j].colId < pColInfo->info.colId) {
//        j++;
//        continue;
//      } else if (pTable->lastCols[j].colId > pColInfo->info.colId) {
//        i++;
//        continue;
//      }
//
//      pData = (char*)pColInfo->pData + numOfRows * pColInfo->info.bytes;
//
//      if (pTable->lastCols[j].bytes > 0) {
//        void* value = pTable->lastCols[j].pData;
//        switch (pColInfo->info.type) {
//          case TSDB_DATA_TYPE_BINARY:
//          case TSDB_DATA_TYPE_NCHAR:
//            memcpy(pData, value, varDataTLen(value));
//            break;
//          case TSDB_DATA_TYPE_NULL:
//          case TSDB_DATA_TYPE_BOOL:
//          case TSDB_DATA_TYPE_TINYINT:
//          case TSDB_DATA_TYPE_UTINYINT:
//            *(uint8_t *)pData = *(uint8_t *)value;
//            break;
//          case TSDB_DATA_TYPE_SMALLINT:
//          case TSDB_DATA_TYPE_USMALLINT:
//            *(uint16_t *)pData = *(uint16_t *)value;
//            break;
//          case TSDB_DATA_TYPE_INT:
//          case TSDB_DATA_TYPE_UINT:
//            *(uint32_t *)pData = *(uint32_t *)value;
//            break;
//          case TSDB_DATA_TYPE_BIGINT:
//          case TSDB_DATA_TYPE_UBIGINT:
//            *(uint64_t *)pData = *(uint64_t *)value;
//            break;
//          case TSDB_DATA_TYPE_FLOAT:
//            SET_FLOAT_PTR(pData, value);
//            break;
//          case TSDB_DATA_TYPE_DOUBLE:
//            SET_DOUBLE_PTR(pData, value);
//            break;
//          case TSDB_DATA_TYPE_TIMESTAMP:
//            if (pColInfo->info.colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
//              priKey = tdGetKey(*(TKEY *)value);
//              priIdx = i;
//
//              i++;
//              j++;
//              continue;
//            } else {
//              *(TSKEY *)pData = *(TSKEY *)value;
//            }
//            break;
//          default:
//            memcpy(pData, value, pColInfo->info.bytes);
//        }
//
//        for (int32_t n = 0; n < tgNumOfCols; ++n) {
//          if (n == i) {
//            continue;
//          }
//
//          pColInfo = taosArrayGet(pTsdbReadHandle->pColumns, n);
//          pData = (char*)pColInfo->pData + numOfRows * pColInfo->info.bytes;;
//
//          if (pColInfo->info.colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
////            *(TSKEY *)pData = pTable->lastCols[j].ts;
//            continue;
//          }
//
//          if (pColInfo->info.type == TSDB_DATA_TYPE_BINARY || pColInfo->info.type == TSDB_DATA_TYPE_NCHAR) {
//            setVardataNull(pData, pColInfo->info.type);
//          } else {
//            setNull(pData, pColInfo->info.type, pColInfo->info.bytes);
//          }
//        }
//
//        numOfRows++;
//        assert(numOfRows < pTsdbReadHandle->outputCapacity);
//      }
//
//      i++;
//      j++;
//    }
//
//    // leave the real ts column as the last row, because last function only (not stable) use the last row as res
//    if (priKey != TSKEY_INITIAL_VAL) {
//      pColInfo = taosArrayGet(pTsdbReadHandle->pColumns, priIdx);
//      pData = (char*)pColInfo->pData + numOfRows * pColInfo->info.bytes;
//
//      *(TSKEY *)pData = priKey;
//
//      for (int32_t n = 0; n < tgNumOfCols; ++n) {
//        if (n == priIdx) {
//          continue;
//        }
//
//        pColInfo = taosArrayGet(pTsdbReadHandle->pColumns, n);
//        pData = (char*)pColInfo->pData + numOfRows * pColInfo->info.bytes;;
//
//        assert (pColInfo->info.colId != PRIMARYKEY_TIMESTAMP_COL_ID);
//
//        if (pColInfo->info.type == TSDB_DATA_TYPE_BINARY || pColInfo->info.type == TSDB_DATA_TYPE_NCHAR) {
//          setVardataNull(pData, pColInfo->info.type);
//        } else {
//          setNull(pData, pColInfo->info.type, pColInfo->info.bytes);
//        }
//      }
//
//      numOfRows++;
//    }
//
//    if (numOfRows > 0) {
//      cur->rows     = numOfRows;
//      cur->mixBlock = true;
//
//      return true;
//    }
//  }
//
//  return false;
//}

static bool loadDataBlockFromTableSeq(STsdbReadHandle* pTsdbReadHandle) {
  size_t numOfTables = taosArrayGetSize(pTsdbReadHandle->pTableCheckInfo);
H
Haojun Liao 已提交
3134 3135 3136
  assert(numOfTables > 0);

  int64_t stime = taosGetTimestampUs();
H
Haojun Liao 已提交
3137

dengyihao's avatar
dengyihao 已提交
3138
  while (pTsdbReadHandle->activeIndex < numOfTables) {
3139
    if (loadBlockOfActiveTable(pTsdbReadHandle)) {
H
Haojun Liao 已提交
3140 3141 3142
      return true;
    }

3143
    STableCheckInfo* pCheckInfo = taosArrayGet(pTsdbReadHandle->pTableCheckInfo, pTsdbReadHandle->activeIndex);
H
Haojun Liao 已提交
3144 3145
    pCheckInfo->numOfBlocks = 0;

3146 3147
    pTsdbReadHandle->activeIndex += 1;
    pTsdbReadHandle->locateStart = false;
dengyihao's avatar
dengyihao 已提交
3148 3149
    pTsdbReadHandle->checkFiles = true;
    pTsdbReadHandle->cur.rows = 0;
3150
    pTsdbReadHandle->currentLoadExternalRows = pTsdbReadHandle->loadExternalRow;
H
Haojun Liao 已提交
3151 3152 3153 3154

    terrno = TSDB_CODE_SUCCESS;

    int64_t elapsedTime = taosGetTimestampUs() - stime;
3155
    pTsdbReadHandle->cost.checkForNextTime += elapsedTime;
H
Haojun Liao 已提交
3156 3157 3158
  }

  return false;
3159 3160
}

H
Haojun Liao 已提交
3161
// handle data in cache situation
H
Haojun Liao 已提交
3162
bool tsdbNextDataBlock(tsdbReaderT pHandle) {
dengyihao's avatar
dengyihao 已提交
3163
  STsdbReadHandle* pTsdbReadHandle = (STsdbReadHandle*)pHandle;
Y
yihaoDeng 已提交
3164

3165 3166
  size_t numOfCols = taosArrayGetSize(pTsdbReadHandle->pColumns);
  for (int32_t i = 0; i < numOfCols; ++i) {
3167 3168 3169 3170
    SColumnInfoData* pColInfo = taosArrayGet(pTsdbReadHandle->pColumns, i);
    colInfoDataCleanup(pColInfo, pTsdbReadHandle->outputCapacity);
  }

3171
  if (emptyQueryTimewindow(pTsdbReadHandle)) {
dengyihao's avatar
dengyihao 已提交
3172 3173
    tsdbDebug("%p query window not overlaps with the data set, no result returned, %s", pTsdbReadHandle,
              pTsdbReadHandle->idStr);
3174 3175 3176
    return false;
  }

Y
yihaoDeng 已提交
3177 3178 3179
  int64_t stime = taosGetTimestampUs();
  int64_t elapsedTime = stime;

3180
  // TODO refactor: remove "type"
3181 3182
  if (pTsdbReadHandle->type == TSDB_QUERY_TYPE_LAST) {
    if (pTsdbReadHandle->cachelastrow == TSDB_CACHED_TYPE_LASTROW) {
dengyihao's avatar
dengyihao 已提交
3183
      //      return loadCachedLastRow(pTsdbReadHandle);
3184
    } else if (pTsdbReadHandle->cachelastrow == TSDB_CACHED_TYPE_LAST) {
dengyihao's avatar
dengyihao 已提交
3185
      //      return loadCachedLast(pTsdbReadHandle);
D
init  
dapan1121 已提交
3186
    }
H
Haojun Liao 已提交
3187
  }
Y
yihaoDeng 已提交
3188

3189 3190
  if (pTsdbReadHandle->loadType == BLOCK_LOAD_TABLE_SEQ_ORDER) {
    return loadDataBlockFromTableSeq(pTsdbReadHandle);
dengyihao's avatar
dengyihao 已提交
3191
  } else {  // loadType == RR and Offset Order
3192
    if (pTsdbReadHandle->checkFiles) {
H
Haojun Liao 已提交
3193 3194 3195
      // check if the query range overlaps with the file data block
      bool exists = true;

3196
      int32_t code = getDataBlocksInFiles(pTsdbReadHandle, &exists);
H
Haojun Liao 已提交
3197
      if (code != TSDB_CODE_SUCCESS) {
3198 3199
        pTsdbReadHandle->activeIndex = 0;
        pTsdbReadHandle->checkFiles = false;
H
Haojun Liao 已提交
3200 3201 3202 3203 3204

        return false;
      }

      if (exists) {
3205
        pTsdbReadHandle->cost.checkForNextTime += (taosGetTimestampUs() - stime);
H
Haojun Liao 已提交
3206 3207
        return exists;
      }
Y
yihaoDeng 已提交
3208

3209 3210
      pTsdbReadHandle->activeIndex = 0;
      pTsdbReadHandle->checkFiles = false;
Y
yihaoDeng 已提交
3211 3212
    }

H
Haojun Liao 已提交
3213
    // TODO: opt by consider the scan order
3214
    bool ret = doHasDataInBuffer(pTsdbReadHandle);
H
Haojun Liao 已提交
3215
    terrno = TSDB_CODE_SUCCESS;
Y
yihaoDeng 已提交
3216

H
Haojun Liao 已提交
3217
    elapsedTime = taosGetTimestampUs() - stime;
3218
    pTsdbReadHandle->cost.checkForNextTime += elapsedTime;
H
Haojun Liao 已提交
3219
    return ret;
Y
yihaoDeng 已提交
3220 3221
  }
}
3222

H
refact  
Hongze Cheng 已提交
3223
// static int32_t doGetExternalRow(STsdbReadHandle* pTsdbReadHandle, int16_t type, SMemTable* pMemRef) {
3224 3225 3226 3227 3228 3229 3230 3231 3232 3233 3234 3235 3236 3237 3238 3239 3240 3241 3242 3243 3244 3245 3246 3247 3248 3249 3250 3251 3252 3253 3254 3255 3256 3257
//  STsdbReadHandle* pSecQueryHandle = NULL;
//
//  if (type == TSDB_PREV_ROW && pTsdbReadHandle->prev) {
//    return TSDB_CODE_SUCCESS;
//  }
//
//  if (type == TSDB_NEXT_ROW && pTsdbReadHandle->next) {
//    return TSDB_CODE_SUCCESS;
//  }
//
//  // prepare the structure
//  int32_t numOfCols = (int32_t) QH_GET_NUM_OF_COLS(pTsdbReadHandle);
//
//  if (type == TSDB_PREV_ROW) {
//    pTsdbReadHandle->prev = taosArrayInit(numOfCols, sizeof(SColumnInfoData));
//    if (pTsdbReadHandle->prev == NULL) {
//      terrno = TSDB_CODE_QRY_OUT_OF_MEMORY;
//      goto out_of_memory;
//    }
//  } else {
//    pTsdbReadHandle->next = taosArrayInit(numOfCols, sizeof(SColumnInfoData));
//    if (pTsdbReadHandle->next == NULL) {
//      terrno = TSDB_CODE_QRY_OUT_OF_MEMORY;
//      goto out_of_memory;
//    }
//  }
//
//  SArray* row = (type == TSDB_PREV_ROW)? pTsdbReadHandle->prev : pTsdbReadHandle->next;
//
//  for (int32_t i = 0; i < numOfCols; ++i) {
//    SColumnInfoData* pCol = taosArrayGet(pTsdbReadHandle->pColumns, i);
//
//    SColumnInfoData colInfo = {{0}, 0};
//    colInfo.info = pCol->info;
wafwerar's avatar
wafwerar 已提交
3258
//    colInfo.pData = taosMemoryCalloc(1, pCol->info.bytes);
3259 3260 3261 3262 3263 3264 3265 3266 3267
//    if (colInfo.pData == NULL) {
//      terrno = TSDB_CODE_QRY_OUT_OF_MEMORY;
//      goto out_of_memory;
//    }
//
//    taosArrayPush(row, &colInfo);
//  }
//
//  // load the previous row
3268
//  SQueryTableDataCond cond = {.numOfCols = numOfCols, .loadExternalRows = false, .type = BLOCK_LOAD_OFFSET_SEQ_ORDER};
3269 3270 3271 3272 3273 3274 3275 3276
//  if (type == TSDB_PREV_ROW) {
//    cond.order = TSDB_ORDER_DESC;
//    cond.twindow = (STimeWindow){pTsdbReadHandle->window.skey, INT64_MIN};
//  } else {
//    cond.order = TSDB_ORDER_ASC;
//    cond.twindow = (STimeWindow){pTsdbReadHandle->window.skey, INT64_MAX};
//  }
//
wafwerar's avatar
wafwerar 已提交
3277
//  cond.colList = taosMemoryCalloc(cond.numOfCols, sizeof(SColumnInfo));
3278 3279 3280 3281 3282 3283 3284 3285 3286 3287
//  if (cond.colList == NULL) {
//    terrno = TSDB_CODE_QRY_OUT_OF_MEMORY;
//    goto out_of_memory;
//  }
//
//  for (int32_t i = 0; i < cond.numOfCols; ++i) {
//    SColumnInfoData* pColInfoData = taosArrayGet(pTsdbReadHandle->pColumns, i);
//    memcpy(&cond.colList[i], &pColInfoData->info, sizeof(SColumnInfo));
//  }
//
H
Haojun Liao 已提交
3288
//  pSecQueryHandle = tsdbQueryTablesImpl(pTsdbReadHandle->pTsdb, &cond, pTsdbReadHandle->idStr, pMemRef);
wafwerar's avatar
wafwerar 已提交
3289
//  taosMemoryFreeClear(cond.colList);
3290 3291 3292 3293 3294 3295 3296 3297 3298 3299 3300 3301 3302 3303 3304 3305 3306 3307 3308 3309 3310 3311 3312 3313 3314 3315 3316 3317 3318 3319 3320 3321 3322 3323 3324 3325 3326 3327
//
//  // current table, only one table
//  STableCheckInfo* pCurrent = taosArrayGet(pTsdbReadHandle->pTableCheckInfo, pTsdbReadHandle->activeIndex);
//
//  SArray* psTable = NULL;
//  pSecQueryHandle->pTableCheckInfo = createCheckInfoFromCheckInfo(pCurrent, pSecQueryHandle->window.skey, &psTable);
//  if (pSecQueryHandle->pTableCheckInfo == NULL) {
//    taosArrayDestroy(psTable);
//    terrno = TSDB_CODE_QRY_OUT_OF_MEMORY;
//    goto out_of_memory;
//  }
//
//
//  tsdbMayTakeMemSnapshot(pSecQueryHandle, psTable);
//  if (!tsdbNextDataBlock((void*)pSecQueryHandle)) {
//    // no result in current query, free the corresponding result rows structure
//    if (type == TSDB_PREV_ROW) {
//      pTsdbReadHandle->prev = doFreeColumnInfoData(pTsdbReadHandle->prev);
//    } else {
//      pTsdbReadHandle->next = doFreeColumnInfoData(pTsdbReadHandle->next);
//    }
//
//    goto out_of_memory;
//  }
//
//  SDataBlockInfo blockInfo = {{0}, 0};
//  tsdbRetrieveDataBlockInfo((void*)pSecQueryHandle, &blockInfo);
//  tsdbRetrieveDataBlock((void*)pSecQueryHandle, pSecQueryHandle->defaultLoadColumn);
//
//  row = (type == TSDB_PREV_ROW)? pTsdbReadHandle->prev:pTsdbReadHandle->next;
//  int32_t pos = (type == TSDB_PREV_ROW)?pSecQueryHandle->cur.rows - 1:0;
//
//  for (int32_t i = 0; i < numOfCols; ++i) {
//    SColumnInfoData* pCol = taosArrayGet(row, i);
//    SColumnInfoData* s = taosArrayGet(pSecQueryHandle->pColumns, i);
//    memcpy((char*)pCol->pData, (char*)s->pData + s->info.bytes * pos, pCol->info.bytes);
//  }
//
dengyihao's avatar
dengyihao 已提交
3328
// out_of_memory:
3329
//  tsdbCleanupReadHandle(pSecQueryHandle);
3330 3331 3332
//  return terrno;
//}

H
Haojun Liao 已提交
3333
bool tsdbGetExternalRow(tsdbReaderT pHandle) {
dengyihao's avatar
dengyihao 已提交
3334 3335
  STsdbReadHandle* pTsdbReadHandle = (STsdbReadHandle*)pHandle;
  SQueryFilePos*   cur = &pTsdbReadHandle->cur;
H
Haojun Liao 已提交
3336

H
Haojun Liao 已提交
3337 3338
  cur->fid = INT32_MIN;
  cur->mixBlock = true;
3339
  if (pTsdbReadHandle->prev == NULL || pTsdbReadHandle->next == NULL) {
H
Haojun Liao 已提交
3340 3341
    cur->rows = 0;
    return false;
H
Haojun Liao 已提交
3342 3343
  }

dengyihao's avatar
dengyihao 已提交
3344
  int32_t numOfCols = (int32_t)QH_GET_NUM_OF_COLS(pTsdbReadHandle);
H
Haojun Liao 已提交
3345
  for (int32_t i = 0; i < numOfCols; ++i) {
3346 3347
    SColumnInfoData* pColInfoData = taosArrayGet(pTsdbReadHandle->pColumns, i);
    SColumnInfoData* first = taosArrayGet(pTsdbReadHandle->prev, i);
H
Haojun Liao 已提交
3348 3349 3350

    memcpy(pColInfoData->pData, first->pData, pColInfoData->info.bytes);

3351
    SColumnInfoData* sec = taosArrayGet(pTsdbReadHandle->next, i);
sangshuduo's avatar
sangshuduo 已提交
3352
    memcpy(((char*)pColInfoData->pData) + pColInfoData->info.bytes, sec->pData, pColInfoData->info.bytes);
H
Haojun Liao 已提交
3353 3354

    if (i == 0 && pColInfoData->info.type == TSDB_DATA_TYPE_TIMESTAMP) {
H
Haojun Liao 已提交
3355
      cur->win.skey = *(TSKEY*)pColInfoData->pData;
sangshuduo's avatar
sangshuduo 已提交
3356
      cur->win.ekey = *(TSKEY*)(((char*)pColInfoData->pData) + TSDB_KEYSIZE);
H
Haojun Liao 已提交
3357 3358 3359
    }
  }

H
Haojun Liao 已提交
3360 3361
  cur->rows = 2;
  return true;
3362 3363
}

3364
/*
3365
 * if lastRow == NULL, return TSDB_CODE_TDB_NO_CACHE_LAST_ROW
3366
 * else set pRes and return TSDB_CODE_SUCCESS and save lastKey
3367
 */
H
Haojun Liao 已提交
3368
// int32_t tsdbGetCachedLastRow(STable* pTable, STSRow** pRes, TSKEY* lastKey) {
3369 3370 3371 3372 3373 3374 3375 3376 3377 3378 3379 3380 3381 3382 3383 3384
//  int32_t code = TSDB_CODE_SUCCESS;
//
//  TSDB_RLOCK_TABLE(pTable);
//
//  if (!pTable->lastRow) {
//    code = TSDB_CODE_TDB_NO_CACHE_LAST_ROW;
//    goto out;
//  }
//
//  if (pRes) {
//    *pRes = tdMemRowDup(pTable->lastRow);
//    if (*pRes == NULL) {
//      code = TSDB_CODE_TDB_OUT_OF_MEMORY;
//    }
//  }
//
H
Haojun Liao 已提交
3385
// out:
3386 3387 3388 3389
//  TSDB_RUNLOCK_TABLE(pTable);
//  return code;
//}

3390
bool isTsdbCacheLastRow(tsdbReaderT* pReader) {
dengyihao's avatar
dengyihao 已提交
3391
  return ((STsdbReadHandle*)pReader)->cachelastrow > TSDB_CACHED_TYPE_NONE;
D
fix bug  
dapan1121 已提交
3392 3393
}

wmmhello's avatar
wmmhello 已提交
3394 3395
int32_t checkForCachedLastRow(STsdbReadHandle* pTsdbReadHandle, STableListInfo* tableList) {
  assert(pTsdbReadHandle != NULL && tableList != NULL);
3396

dengyihao's avatar
dengyihao 已提交
3397 3398 3399 3400 3401 3402 3403 3404 3405 3406 3407 3408 3409 3410 3411 3412 3413 3414 3415 3416 3417 3418 3419 3420
  //  TSKEY    key = TSKEY_INITIAL_VAL;
  //
  //  SArray* group = taosArrayGetP(groupList->pGroupList, 0);
  //  assert(group != NULL);
  //
  //  STableKeyInfo* pInfo = (STableKeyInfo*)taosArrayGet(group, 0);
  //
  //  int32_t code = 0;
  //
  //  if (((STable*)pInfo->pTable)->lastRow) {
  //    code = tsdbGetCachedLastRow(pInfo->pTable, NULL, &key);
  //    if (code != TSDB_CODE_SUCCESS) {
  //      pTsdbReadHandle->cachelastrow = TSDB_CACHED_TYPE_NONE;
  //    } else {
  //      pTsdbReadHandle->cachelastrow = TSDB_CACHED_TYPE_LASTROW;
  //    }
  //  }
  //
  //  // update the tsdb query time range
  //  if (pTsdbReadHandle->cachelastrow != TSDB_CACHED_TYPE_NONE) {
  //    pTsdbReadHandle->window      = TSWINDOW_INITIALIZER;
  //    pTsdbReadHandle->checkFiles  = false;
  //    pTsdbReadHandle->activeIndex = -1;  // start from -1
  //  }
H
Haojun Liao 已提交
3421

3422
  return TSDB_CODE_SUCCESS;
3423 3424
}

3425 3426
int32_t checkForCachedLast(STsdbReadHandle* pTsdbReadHandle) {
  assert(pTsdbReadHandle != NULL);
D
update  
dapan1121 已提交
3427 3428

  int32_t code = 0;
dengyihao's avatar
dengyihao 已提交
3429 3430 3431
  //  if (pTsdbReadHandle->pTsdb && atomic_load_8(&pTsdbReadHandle->pTsdb->hasCachedLastColumn)){
  //    pTsdbReadHandle->cachelastrow = TSDB_CACHED_TYPE_LAST;
  //  }
D
update  
dapan1121 已提交
3432 3433

  // update the tsdb query time range
3434
  if (pTsdbReadHandle->cachelastrow) {
dengyihao's avatar
dengyihao 已提交
3435
    pTsdbReadHandle->checkFiles = false;
3436
    pTsdbReadHandle->activeIndex = -1;  // start from -1
D
update  
dapan1121 已提交
3437 3438 3439 3440 3441
  }

  return code;
}

wmmhello's avatar
wmmhello 已提交
3442
STimeWindow updateLastrowForEachGroup(STableListInfo* pList) {
H
Haojun Liao 已提交
3443
  STimeWindow window = {INT64_MAX, INT64_MIN};
H
Haojun Liao 已提交
3444

dengyihao's avatar
dengyihao 已提交
3445 3446 3447 3448 3449 3450 3451 3452 3453 3454 3455 3456 3457 3458 3459 3460 3461 3462 3463 3464 3465 3466 3467 3468 3469 3470 3471 3472 3473 3474 3475 3476 3477 3478 3479 3480 3481 3482 3483 3484 3485 3486 3487 3488 3489 3490 3491 3492 3493 3494 3495 3496 3497 3498 3499 3500 3501 3502 3503
  //  int32_t totalNumOfTable = 0;
  //  SArray* emptyGroup = taosArrayInit(16, sizeof(int32_t));
  //
  //  // NOTE: starts from the buffer in case of descending timestamp order check data blocks
  //  size_t numOfGroups = taosArrayGetSize(groupList->pGroupList);
  //  for (int32_t j = 0; j < numOfGroups; ++j) {
  //    SArray* pGroup = taosArrayGetP(groupList->pGroupList, j);
  //    TSKEY   key = TSKEY_INITIAL_VAL;
  //
  //    STableKeyInfo keyInfo = {0};
  //
  //    size_t numOfTables = taosArrayGetSize(pGroup);
  //    for (int32_t i = 0; i < numOfTables; ++i) {
  //      STableKeyInfo* pInfo = (STableKeyInfo*)taosArrayGet(pGroup, i);
  //
  //      // if the lastKey equals to INT64_MIN, there is no data in this table
  //      TSKEY lastKey = 0;  //((STable*)(pInfo->pTable))->lastKey;
  //      if (key < lastKey) {
  //        key = lastKey;
  //
  //        //        keyInfo.pTable  = pInfo->pTable;
  //        keyInfo.lastKey = key;
  //        pInfo->lastKey = key;
  //
  //        if (key < window.skey) {
  //          window.skey = key;
  //        }
  //
  //        if (key > window.ekey) {
  //          window.ekey = key;
  //        }
  //      }
  //    }
  //
  //    // more than one table in each group, only one table left for each group
  //    //    if (keyInfo.pTable != NULL) {
  //    //      totalNumOfTable++;
  //    //      if (taosArrayGetSize(pGroup) == 1) {
  //    //        // do nothing
  //    //      } else {
  //    //        taosArrayClear(pGroup);
  //    //        taosArrayPush(pGroup, &keyInfo);
  //    //      }
  //    //    } else {  // mark all the empty groups, and remove it later
  //    //      taosArrayDestroy(pGroup);
  //    //      taosArrayPush(emptyGroup, &j);
  //    //    }
  //  }
  //
  //  // window does not being updated, so set the original
  //  if (window.skey == INT64_MAX && window.ekey == INT64_MIN) {
  //    window = TSWINDOW_INITIALIZER;
  //    assert(totalNumOfTable == 0 && taosArrayGetSize(groupList->pGroupList) == numOfGroups);
  //  }
  //
  //  taosArrayRemoveBatch(groupList->pGroupList, TARRAY_GET_START(emptyGroup), (int32_t)taosArrayGetSize(emptyGroup));
  //  taosArrayDestroy(emptyGroup);
  //
  //  groupList->numOfTables = totalNumOfTable;
H
Haojun Liao 已提交
3504
  return window;
H
hjxilinx 已提交
3505 3506
}

H
Haojun Liao 已提交
3507
void tsdbRetrieveDataBlockInfo(tsdbReaderT* pTsdbReadHandle, SDataBlockInfo* pDataBlockInfo) {
3508
  STsdbReadHandle* pHandle = (STsdbReadHandle*)pTsdbReadHandle;
dengyihao's avatar
dengyihao 已提交
3509
  SQueryFilePos*   cur = &pHandle->cur;
3510 3511

  uint64_t uid = 0;
H
Haojun Liao 已提交
3512

3513
  // there are data in file
D
dapan1121 已提交
3514
  if (pHandle->cur.fid != INT32_MIN) {
3515
    STableBlockInfo* pBlockInfo = &pHandle->pDataBlockInfo[cur->slot];
3516
    uid = pBlockInfo->pTableCheckInfo->tableId;
H
[td-32]  
hjxilinx 已提交
3517
  } else {
3518
    STableCheckInfo* pCheckInfo = taosArrayGet(pHandle->pTableCheckInfo, pHandle->activeIndex);
3519
    uid = pCheckInfo->tableId;
3520
  }
3521

dengyihao's avatar
dengyihao 已提交
3522 3523
  tsdbDebug("data block generated, uid:%" PRIu64 " numOfRows:%d, tsrange:%" PRId64 " - %" PRId64 " %s", uid, cur->rows,
            cur->win.skey, cur->win.ekey, pHandle->idStr);
3524

3525
  pDataBlockInfo->uid = uid;
3526 3527 3528 3529 3530 3531

#if 0
  // for multi-group data query processing test purpose
  pDataBlockInfo->groupId = uid;
#endif

dengyihao's avatar
dengyihao 已提交
3532
  pDataBlockInfo->rows = cur->rows;
H
Haojun Liao 已提交
3533
  pDataBlockInfo->window = cur->win;
3534
}
H
hjxilinx 已提交
3535

H
Haojun Liao 已提交
3536 3537 3538
/*
 * return null for mixed data block, if not a complete file data block, the statistics value will always return NULL
 */
3539
int32_t tsdbRetrieveDataBlockStatisInfo(tsdbReaderT* pTsdbReadHandle, SColumnDataAgg*** pBlockStatis, bool* allHave) {
dengyihao's avatar
dengyihao 已提交
3540
  STsdbReadHandle* pHandle = (STsdbReadHandle*)pTsdbReadHandle;
3541
  *allHave = false;
H
Haojun Liao 已提交
3542

H
Haojun Liao 已提交
3543 3544
  SQueryFilePos* c = &pHandle->cur;
  if (c->mixBlock) {
H
Haojun Liao 已提交
3545 3546 3547
    *pBlockStatis = NULL;
    return TSDB_CODE_SUCCESS;
  }
H
Haojun Liao 已提交
3548

H
Haojun Liao 已提交
3549 3550 3551 3552
  STableBlockInfo* pBlockInfo = &pHandle->pDataBlockInfo[c->slot];
  assert((c->slot >= 0 && c->slot < pHandle->numOfBlocks) || ((c->slot == pHandle->numOfBlocks) && (c->slot == 0)));

  // file block with sub-blocks has no statistics data
H
Haojun Liao 已提交
3553 3554 3555 3556
  if (pBlockInfo->compBlock->numOfSubBlocks > 1) {
    *pBlockStatis = NULL;
    return TSDB_CODE_SUCCESS;
  }
H
Haojun Liao 已提交
3557 3558

  int64_t stime = taosGetTimestampUs();
3559 3560
  int     statisStatus = tsdbLoadBlockStatis(&pHandle->rhelper, pBlockInfo->compBlock);
  if (statisStatus < TSDB_STATIS_OK) {
H
Hongze Cheng 已提交
3561
    return terrno;
3562 3563 3564
  } else if (statisStatus > TSDB_STATIS_OK) {
    *pBlockStatis = NULL;
    return TSDB_CODE_SUCCESS;
H
Hongze Cheng 已提交
3565
  }
H
Haojun Liao 已提交
3566

S
Shengliang Guan 已提交
3567
  tsdbDebug("vgId:%d, succeed to load block statis part for uid %" PRIu64, REPO_ID(pHandle->pTsdb),
C
Cary Xu 已提交
3568 3569
            TSDB_READ_TABLE_UID(&pHandle->rhelper));

3570
  int16_t* colIds = pHandle->suppInfo.defaultLoadColumn->pData;
H
Haojun Liao 已提交
3571

H
Haojun Liao 已提交
3572
  size_t numOfCols = QH_GET_NUM_OF_COLS(pHandle);
3573 3574 3575
  memset(pHandle->suppInfo.plist, 0, numOfCols * POINTER_BYTES);
  memset(pHandle->suppInfo.pstatis, 0, numOfCols * sizeof(SColumnDataAgg));

dengyihao's avatar
dengyihao 已提交
3576
  for (int32_t i = 0; i < numOfCols; ++i) {
3577
    pHandle->suppInfo.pstatis[i].colId = colIds[i];
3578
  }
H
Haojun Liao 已提交
3579

3580 3581
  *allHave = true;
  tsdbGetBlockStatis(&pHandle->rhelper, pHandle->suppInfo.pstatis, (int)numOfCols, pBlockInfo->compBlock);
H
Haojun Liao 已提交
3582 3583

  // always load the first primary timestamp column data
3584
  SColumnDataAgg* pPrimaryColStatis = &pHandle->suppInfo.pstatis[0];
3585
  assert(pPrimaryColStatis->colId == PRIMARYKEY_TIMESTAMP_COL_ID);
H
Haojun Liao 已提交
3586 3587

  pPrimaryColStatis->numOfNull = 0;
H
Hongze Cheng 已提交
3588 3589
  pPrimaryColStatis->min = pBlockInfo->compBlock->minKey.ts;
  pPrimaryColStatis->max = pBlockInfo->compBlock->maxKey.ts;
3590
  pHandle->suppInfo.plist[0] = &pHandle->suppInfo.pstatis[0];
H
Haojun Liao 已提交
3591

dengyihao's avatar
dengyihao 已提交
3592
  // update the number of NULL data rows
3593
  int32_t* slotIds = pHandle->suppInfo.slotIds;
dengyihao's avatar
dengyihao 已提交
3594
  for (int32_t i = 1; i < numOfCols; ++i) {
3595
    ASSERT(colIds[i] == pHandle->pSchema->columns[slotIds[i]].colId);
C
Cary Xu 已提交
3596
    if (IS_BSMA_ON(&(pHandle->pSchema->columns[slotIds[i]]))) {
3597 3598 3599
      if (pHandle->suppInfo.pstatis[i].numOfNull == -1) {  // set the column data are all NULL
        pHandle->suppInfo.pstatis[i].numOfNull = pBlockInfo->compBlock->numOfRows;
      }
3600 3601

      pHandle->suppInfo.plist[i] = &pHandle->suppInfo.pstatis[i];
3602 3603
    } else {
      *allHave = false;
H
Haojun Liao 已提交
3604 3605
    }
  }
H
Haojun Liao 已提交
3606 3607 3608 3609

  int64_t elapsed = taosGetTimestampUs() - stime;
  pHandle->cost.statisInfoLoadTime += elapsed;

3610
  *pBlockStatis = pHandle->suppInfo.plist;
3611
  return TSDB_CODE_SUCCESS;
H
hjxilinx 已提交
3612 3613
}

H
Haojun Liao 已提交
3614
SArray* tsdbRetrieveDataBlock(tsdbReaderT* pTsdbReadHandle, SArray* pIdList) {
H
[td-32]  
hjxilinx 已提交
3615
  /**
H
hjxilinx 已提交
3616
   * In the following two cases, the data has been loaded to SColumnInfoData.
H
[td-32]  
hjxilinx 已提交
3617 3618
   * 1. data is from cache, 2. data block is not completed qualified to query time range
   */
3619
  STsdbReadHandle* pHandle = (STsdbReadHandle*)pTsdbReadHandle;
D
dapan1121 已提交
3620
  if (pHandle->cur.fid == INT32_MIN) {
H
[td-32]  
hjxilinx 已提交
3621 3622
    return pHandle->pColumns;
  } else {
H
Haojun Liao 已提交
3623 3624
    STableBlockInfo* pBlockInfo = &pHandle->pDataBlockInfo[pHandle->cur.slot];
    STableCheckInfo* pCheckInfo = pBlockInfo->pTableCheckInfo;
3625

3626
    if (pHandle->cur.mixBlock) {
H
[td-32]  
hjxilinx 已提交
3627 3628
      return pHandle->pColumns;
    } else {
H
Haojun Liao 已提交
3629
      SDataBlockInfo binfo = GET_FILE_DATA_BLOCK_INFO(pCheckInfo, pBlockInfo->compBlock);
3630
      assert(pHandle->realNumOfRows <= binfo.rows);
H
Haojun Liao 已提交
3631

H
hjxilinx 已提交
3632 3633
      // data block has been loaded, todo extract method
      SDataBlockLoadInfo* pBlockLoadInfo = &pHandle->dataBlockLoadInfo;
H
Haojun Liao 已提交
3634

H
Hongze Cheng 已提交
3635
      if (pBlockLoadInfo->slot == pHandle->cur.slot && pBlockLoadInfo->fileGroup->fid == pHandle->cur.fid &&
H
Haojun Liao 已提交
3636
          pBlockLoadInfo->uid == pCheckInfo->tableId) {
H
hjxilinx 已提交
3637
        return pHandle->pColumns;
H
Haojun Liao 已提交
3638
      } else {  // only load the file block
H
refact  
Hongze Cheng 已提交
3639
        SBlock* pBlock = pBlockInfo->compBlock;
H
Haojun Liao 已提交
3640
        if (doLoadFileDataBlock(pHandle, pBlock, pCheckInfo, pHandle->cur.slot) != TSDB_CODE_SUCCESS) {
3641 3642
          return NULL;
        }
H
Haojun Liao 已提交
3643

H
Haojun Liao 已提交
3644
        int32_t numOfRows = doCopyRowsFromFileBlock(pHandle, pHandle->outputCapacity, 0, 0, pBlock->numOfRows - 1);
H
hjxilinx 已提交
3645 3646
        return pHandle->pColumns;
      }
H
[td-32]  
hjxilinx 已提交
3647 3648
    }
  }
H
hjxilinx 已提交
3649
}
3650

H
Haojun Liao 已提交
3651
static int tsdbCheckInfoCompar(const void* key1, const void* key2) {
3652
  if (((STableCheckInfo*)key1)->tableId < ((STableCheckInfo*)key2)->tableId) {
H
Haojun Liao 已提交
3653
    return -1;
3654
  } else if (((STableCheckInfo*)key1)->tableId > ((STableCheckInfo*)key2)->tableId) {
H
Haojun Liao 已提交
3655 3656 3657 3658 3659 3660 3661
    return 1;
  } else {
    ASSERT(false);
    return 0;
  }
}

3662 3663 3664 3665 3666 3667 3668 3669
static void* doFreeColumnInfoData(SArray* pColumnInfoData) {
  if (pColumnInfoData == NULL) {
    return NULL;
  }

  size_t cols = taosArrayGetSize(pColumnInfoData);
  for (int32_t i = 0; i < cols; ++i) {
    SColumnInfoData* pColInfo = taosArrayGet(pColumnInfoData, i);
wafwerar's avatar
wafwerar 已提交
3670
    taosMemoryFreeClear(pColInfo->pData);
3671 3672 3673 3674 3675 3676
  }

  taosArrayDestroy(pColumnInfoData);
  return NULL;
}

H
Haojun Liao 已提交
3677 3678 3679 3680 3681 3682
static void* destroyTableCheckInfo(SArray* pTableCheckInfo) {
  size_t size = taosArrayGetSize(pTableCheckInfo);
  for (int32_t i = 0; i < size; ++i) {
    STableCheckInfo* p = taosArrayGet(pTableCheckInfo, i);
    destroyTableMemIterator(p);

wafwerar's avatar
wafwerar 已提交
3683
    taosMemoryFreeClear(p->pCompInfo);
H
Haojun Liao 已提交
3684 3685 3686 3687 3688 3689
  }

  taosArrayDestroy(pTableCheckInfo);
  return NULL;
}

H
Haojun Liao 已提交
3690
void tsdbCleanupReadHandle(tsdbReaderT queryHandle) {
3691 3692
  STsdbReadHandle* pTsdbReadHandle = (STsdbReadHandle*)queryHandle;
  if (pTsdbReadHandle == NULL) {
3693 3694
    return;
  }
3695

3696
  pTsdbReadHandle->pColumns = doFreeColumnInfoData(pTsdbReadHandle->pColumns);
3697

3698
  taosArrayDestroy(pTsdbReadHandle->suppInfo.defaultLoadColumn);
wafwerar's avatar
wafwerar 已提交
3699
  taosMemoryFreeClear(pTsdbReadHandle->pDataBlockInfo);
3700 3701
  taosMemoryFreeClear(pTsdbReadHandle->suppInfo.pstatis);
  taosMemoryFreeClear(pTsdbReadHandle->suppInfo.plist);
3702

3703
  if (!emptyQueryTimewindow(pTsdbReadHandle)) {
dengyihao's avatar
dengyihao 已提交
3704
    //    tsdbMayUnTakeMemSnapshot(pTsdbReadHandle);
3705
  } else {
3706
    assert(pTsdbReadHandle->pTableCheckInfo == NULL);
3707 3708
  }

3709 3710
  if (pTsdbReadHandle->pTableCheckInfo != NULL) {
    pTsdbReadHandle->pTableCheckInfo = destroyTableCheckInfo(pTsdbReadHandle->pTableCheckInfo);
3711
  }
3712

3713
  tsdbDestroyReadH(&pTsdbReadHandle->rhelper);
H
Haojun Liao 已提交
3714

3715 3716
  tdFreeDataCols(pTsdbReadHandle->pDataCols);
  pTsdbReadHandle->pDataCols = NULL;
H
Haojun Liao 已提交
3717

3718 3719
  pTsdbReadHandle->prev = doFreeColumnInfoData(pTsdbReadHandle->prev);
  pTsdbReadHandle->next = doFreeColumnInfoData(pTsdbReadHandle->next);
3720

3721
  SIOCostSummary* pCost = &pTsdbReadHandle->cost;
3722

dengyihao's avatar
dengyihao 已提交
3723 3724 3725 3726
  tsdbDebug("%p :io-cost summary: head-file read cnt:%" PRIu64 ", head-file time:%" PRIu64 " us, statis-info:%" PRId64
            " us, datablock:%" PRId64 " us, check data:%" PRId64 " us, %s",
            pTsdbReadHandle, pCost->headFileLoad, pCost->headFileLoadTime, pCost->statisInfoLoadTime,
            pCost->blockLoadTime, pCost->checkForNextTime, pTsdbReadHandle->idStr);
H
Haojun Liao 已提交
3727

wafwerar's avatar
wafwerar 已提交
3728
  taosMemoryFreeClear(pTsdbReadHandle);
3729
}
H
Hongze Cheng 已提交
3730 3731

#endif