tsdbRead.c 126.8 KB
Newer Older
H
hjxilinx 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

H
Hongze Cheng 已提交
16
#include "tsdb.h"
L
Liu Jicong 已提交
17
#include "vnode.h"
18

dengyihao's avatar
dengyihao 已提交
19 20
#define EXTRA_BYTES                2
// True when the scan order `o` is ascending. The argument is parenthesized so that compound expressions
// (e.g. a conditional) are compared as a whole rather than being split by operator precedence.
#define ASCENDING_TRAVERSE(o)      ((o) == TSDB_ORDER_ASC)
// Number of output columns registered on a read handle (element count of (handle)->pColumns).
#define QH_GET_NUM_OF_COLS(handle) ((size_t)(taosArrayGetSize((handle)->pColumns)))
H
hjxilinx 已提交
22

H
Hongze Cheng 已提交
23 24 25 26
/*
 * Build an SDataBlockInfo for the file data block `_block` of the table tracked by `_checkInfo`.
 * The window covers the block's [minKey.ts, maxKey.ts] range; rows/numOfCols are copied from the block.
 */
#define GET_FILE_DATA_BLOCK_INFO(_checkInfo, _block)                               \
  ((SDataBlockInfo){                                                               \
      .window    = {.skey = (_block)->minKey.ts, .ekey = (_block)->maxKey.ts},     \
      .numOfCols = (_block)->numOfCols,                                            \
      .rows      = (_block)->numOfRows,                                            \
      .uid       = (_checkInfo)->tableId})
H
Haojun Liao 已提交
28

H
hjxilinx 已提交
29
/* Values stored in STsdbReadHandle::type. */
enum {
  TSDB_QUERY_TYPE_ALL  = 1,  // retrieve all data blocks
  TSDB_QUERY_TYPE_LAST = 2,  // retrieve only the last row
};

34
/* Cache kinds for last-row style queries (NOTE(review): presumably the values of STsdbReadHandle::cachelastrow). */
enum {
  TSDB_CACHED_TYPE_NONE    = 0,
  TSDB_CACHED_TYPE_LASTROW = 1,
  TSDB_CACHED_TYPE_LAST    = 2,
};

40
typedef struct SQueryFilePos {
dengyihao's avatar
dengyihao 已提交
41 42 43 44 45 46 47
  int32_t     fid;
  int32_t     slot;
  int32_t     pos;
  int64_t     lastKey;
  int32_t     rows;
  bool        mixBlock;
  bool        blockCompleted;
48
  STimeWindow win;
49
} SQueryFilePos;
H
hjxilinx 已提交
50

51
typedef struct SDataBlockLoadInfo {
dengyihao's avatar
dengyihao 已提交
52 53 54 55
  SDFileSet* fileGroup;
  int32_t    slot;
  uint64_t   uid;
  SArray*    pLoadedCols;
56
} SDataBlockLoadInfo;
H
hjxilinx 已提交
57

58
/* Identifies the comp-block info currently loaded; both fields are reset to -1 by tsdbInitCompBlockLoadInfo(). */
typedef struct SLoadCompBlockInfo {
  int32_t tid;     // table tid
  int32_t fileId;
} SLoadCompBlockInfo;
H
hjxilinx 已提交
62

63
/* Values of STableCheckInfo::chosen: which in-memory iterator should move forward. */
enum {
  CHECKINFO_CHOSEN_MEM  = 0,
  CHECKINFO_CHOSEN_IMEM = 1,
  CHECKINFO_CHOSEN_BOTH = 2,  // for update=2 (merge case)
};

69
typedef struct STableCheckInfo {
dengyihao's avatar
dengyihao 已提交
70 71 72 73 74 75 76 77 78
  uint64_t           tableId;
  TSKEY              lastKey;
  SBlockInfo*        pCompInfo;
  int32_t            compSize;
  int32_t            numOfBlocks : 29;  // number of qualified data blocks not the original blocks
  uint8_t            chosen : 2;        // indicate which iterator should move forward
  bool               initBuf : 1;       // whether to initialize the in-memory skip list iterator or not
  SSkipListIterator* iter;              // mem buffer skip list iterator
  SSkipListIterator* iiter;             // imem buffer skip list iterator
79
} STableCheckInfo;
80

81
typedef struct STableBlockInfo {
dengyihao's avatar
dengyihao 已提交
82 83
  SBlock*          compBlock;
  STableCheckInfo* pTableCheckInfo;
84
} STableBlockInfo;
85

86
typedef struct SBlockOrderSupporter {
dengyihao's avatar
dengyihao 已提交
87 88 89 90
  int32_t           numOfTables;
  STableBlockInfo** pDataBlockInfo;
  int32_t*          blockIndexArray;
  int32_t*          numOfBlocksPerTable;
91 92
} SBlockOrderSupporter;

H
Haojun Liao 已提交
93 94 95
/* I/O cost counters accumulated by a read handle (STsdbReadHandle::cost). */
typedef struct SIOCostSummary {
  int64_t blockLoadTime;
  int64_t statisInfoLoadTime;
  int64_t checkForNextTime;
  int64_t headFileLoad;
  int64_t headFileLoadTime;
} SIOCostSummary;

101
typedef struct SBlockLoadSuppInfo {
C
Cary Xu 已提交
102 103 104 105
  SColumnDataAgg*  pstatis;
  SColumnDataAgg** plist;
  SArray*          defaultLoadColumn;  // default load column
  int32_t*         slotIds;            // colId to slotId
106 107
} SBlockLoadSuppInfo;

108
typedef struct STsdbReadHandle {
C
Cary Xu 已提交
109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127
  STsdb*        pTsdb;
  SQueryFilePos cur;  // current position
  int16_t       order;
  STimeWindow   window;  // the primary query time window that applies to all queries
  //  SColumnDataAgg* statis;  // query level statistics, only one table block statistics info exists at any time
  //  SColumnDataAgg** pstatis;// the ptr array list to return to caller
  int32_t numOfBlocks;
  SArray* pColumns;  // column list, SColumnInfoData array list
  bool    locateStart;
  int32_t outputCapacity;
  int32_t realNumOfRows;
  SArray* pTableCheckInfo;  // SArray<STableCheckInfo>
  int32_t activeIndex;
  bool    checkFiles;               // check file stage
  int8_t  cachelastrow;             // check if last row cached
  bool    loadExternalRow;          // load time window external data rows
  bool    currentLoadExternalRows;  // current load external rows
  int32_t loadType;                 // block load type
  char*   idStr;                    // query info handle, for debug purpose
dengyihao's avatar
dengyihao 已提交
128 129 130 131 132
  int32_t type;  // query type: retrieve all data blocks, 2. retrieve only last row, 3. retrieve direct prev|next rows
  SDFileSet*         pFileGroup;
  SFSIter            fileIter;
  SReadH             rhelper;
  STableBlockInfo*   pDataBlockInfo;
C
Cary Xu 已提交
133 134 135 136
  SDataCols*         pDataCols;         // in order to hold current file data block
  int32_t            allocSize;         // allocated data block size
  SDataBlockLoadInfo dataBlockLoadInfo; /* record current block load information */
  SLoadCompBlockInfo compBlockLoadInfo; /* record current compblock information in SQueryAttr */
137
  SBlockLoadSuppInfo suppInfo;
C
Cary Xu 已提交
138 139 140 141
  SArray*            prev;  // previous row which is before than time window
  SArray*            next;  // next row which is after the query time window
  SIOCostSummary     cost;
  STSchema*          pSchema;
142
} STsdbReadHandle;
143

wmmhello's avatar
wmmhello 已提交
144 145
static STimeWindow updateLastrowForEachGroup(STableListInfo* pList);
static int32_t     checkForCachedLastRow(STsdbReadHandle* pTsdbReadHandle, STableListInfo* pList);
dengyihao's avatar
dengyihao 已提交
146
static int32_t     checkForCachedLast(STsdbReadHandle* pTsdbReadHandle);
H
Haojun Liao 已提交
147
// static int32_t tsdbGetCachedLastRow(STable* pTable, STSRow** pRes, TSKEY* lastKey);
H
Haojun Liao 已提交
148

H
Haojun Liao 已提交
149
static void    changeQueryHandleForInterpQuery(tsdbReaderT pHandle);
150
static void    doMergeTwoLevelData(STsdbReadHandle* pTsdbReadHandle, STableCheckInfo* pCheckInfo, SBlock* pBlock);
dengyihao's avatar
dengyihao 已提交
151 152
static int32_t tsdbReadRowsFromCache(STableCheckInfo* pCheckInfo, TSKEY maxKey, int maxRowsToRead, STimeWindow* win,
                                     STsdbReadHandle* pTsdbReadHandle);
153
static int32_t tsdbCheckInfoCompar(const void* key1, const void* key2);
dengyihao's avatar
dengyihao 已提交
154 155 156 157
// static int32_t doGetExternalRow(STsdbReadHandle* pTsdbReadHandle, int16_t type, void* pMemRef);
// static void*   doFreeColumnInfoData(SArray* pColumnInfoData);
// static void*   destroyTableCheckInfo(SArray* pTableCheckInfo);
static bool tsdbGetExternalRow(tsdbReaderT pHandle);
Y
TD-1733  
yihaoDeng 已提交
158

C
Cary Xu 已提交
159
static STsdb* getTsdbByRetentions(SVnode* pVnode, STsdbReadHandle* pReadHandle, TSKEY winSKey, SRetention* retentions);
C
Cary Xu 已提交
160

161
/* Mark the block load info as "nothing loaded": no file group, slot -1, uid 0. pLoadedCols is left untouched. */
static void tsdbInitDataBlockLoadInfo(SDataBlockLoadInfo* pBlockLoadInfo) {
  pBlockLoadInfo->fileGroup = NULL;
  pBlockLoadInfo->uid = 0;
  pBlockLoadInfo->slot = -1;
}

167
/* Mark the comp block load info as "nothing loaded" by setting both identifiers to -1. */
static void tsdbInitCompBlockLoadInfo(SLoadCompBlockInfo* pCompBlockLoadInfo) {
  pCompBlockLoadInfo->fileId = -1;
  pCompBlockLoadInfo->tid = -1;
}
H
hjxilinx 已提交
171

172 173
/*
 * Collect the column id of every output column registered on the read handle, in pColumns order.
 * Returns a newly allocated SArray<int16_t> owned by the caller, or NULL if the array cannot be allocated.
 */
static SArray* getColumnIdList(STsdbReadHandle* pTsdbReadHandle) {
  size_t numOfCols = QH_GET_NUM_OF_COLS(pTsdbReadHandle);
  assert(numOfCols <= TSDB_MAX_COLUMNS);

  SArray* pIdList = taosArrayInit(numOfCols, sizeof(int16_t));
  if (pIdList == NULL) {
    return NULL;
  }

  for (size_t i = 0; i < numOfCols; ++i) {
    SColumnInfoData* pCol = taosArrayGet(pTsdbReadHandle->pColumns, i);
    taosArrayPush(pIdList, &pCol->info.colId);
  }

  return pIdList;
}

185 186
/*
 * Build the default load-column id list for the read handle.
 * When loadTS is true, the primary timestamp column id is placed first if it is not already there
 * (this also covers an empty column list, which previously dereferenced a NULL element).
 * Returns a newly allocated SArray<int16_t> owned by the caller, or NULL on allocation failure.
 */
static SArray* getDefaultLoadColumns(STsdbReadHandle* pTsdbReadHandle, bool loadTS) {
  SArray* pLocalIdList = getColumnIdList(pTsdbReadHandle);
  if (pLocalIdList == NULL) {
    return NULL;
  }

  if (!loadTS) {
    return pLocalIdList;
  }

  // the primary timestamp column is not included in the specified load column list, add it
  bool hasTsFirst = taosArrayGetSize(pLocalIdList) > 0 &&
                    *(int16_t*)taosArrayGet(pLocalIdList, 0) == PRIMARYKEY_TIMESTAMP_COL_ID;
  if (!hasTsFirst) {
    int16_t columnId = PRIMARYKEY_TIMESTAMP_COL_ID;
    taosArrayInsert(pLocalIdList, 0, &columnId);
  }

  return pLocalIdList;
}

H
Haojun Liao 已提交
200
/*
 * Count the in-memory rows of the queried tables.
 * The mem-table reference is currently hard-coded to NULL, so this always returns 0.
 */
int64_t tsdbGetNumOfRowsInMemTable(tsdbReaderT* pHandle) {
  STsdbReadHandle* pReadHandle = (STsdbReadHandle*)pHandle;
  int64_t          numOfRows = 0;

  STsdbMemTable* pMemTable = NULL;  // pReadHandle->pMemTable is not available yet
  if (pMemTable == NULL) {
    return numOfRows;
  }

  size_t numOfTables = taosArrayGetSize(pReadHandle->pTableCheckInfo);
  for (size_t idx = 0; idx < numOfTables; ++idx) {
    STableCheckInfo* pCheckInfo = taosArrayGet(pReadHandle->pTableCheckInfo, idx);
    (void)pCheckInfo;  // TODO: add the mem/imem row counts of this table once the mem tables are wired in
  }

  return numOfRows;
}
224

wmmhello's avatar
wmmhello 已提交
225 226 227
/*
 * Create one STableCheckInfo per table in pTableList, sorted with tsdbCheckInfoCompar.
 * Ascending scans start each table at max(lastKey, window.skey) (INT64_MIN is treated as "unset");
 * descending scans start each table at window.skey.
 * Returns a new SArray<STableCheckInfo>, or NULL if the array cannot be allocated.
 */
static SArray* createCheckInfoFromTableGroup(STsdbReadHandle* pTsdbReadHandle, STableListInfo* pTableList) {
  size_t numOfTables = taosArrayGetSize(pTableList->pTableList);
  assert(numOfTables >= 1);

  // allocate buffer in order to load data blocks from file
  SArray* pCheckInfoList = taosArrayInit(numOfTables, sizeof(STableCheckInfo));
  if (pCheckInfoList == NULL) {
    return NULL;
  }

  const STimeWindow* pWin = &pTsdbReadHandle->window;
  const bool         asc = ASCENDING_TRAVERSE(pTsdbReadHandle->order);

  // todo apply the lastkey of table check to avoid to load header file
  for (int32_t idx = 0; idx < numOfTables; ++idx) {
    STableKeyInfo*  pKeyInfo = (STableKeyInfo*)taosArrayGet(pTableList->pTableList, idx);
    STableCheckInfo info = {.lastKey = pKeyInfo->lastKey, .tableId = pKeyInfo->uid};

    if (!asc) {
      info.lastKey = pWin->skey;
    } else {
      bool unsetOrBeforeStart = (info.lastKey == INT64_MIN) || (info.lastKey < pWin->skey);
      if (unsetOrBeforeStart) {
        info.lastKey = pWin->skey;
      }
      assert(info.lastKey >= pWin->skey && info.lastKey <= pWin->ekey);
    }

    taosArrayPush(pCheckInfoList, &info);
    tsdbDebug("%p check table uid:%" PRId64 " from lastKey:%" PRId64 " %s", pTsdbReadHandle, info.tableId, info.lastKey,
              pTsdbReadHandle->idStr);
  }

  // TODO  group table according to the tag value.
  taosArraySort(pCheckInfoList, tsdbCheckInfoCompar);
  return pCheckInfoList;
}

260 261
/*
 * Rewind every table's check info to the start of the query window: lastKey = window.skey,
 * both skip list iterators destroyed (set to what tSkipListDestroyIter returns) and initBuf cleared.
 */
static void resetCheckInfo(STsdbReadHandle* pTsdbReadHandle) {
  SArray* pList = pTsdbReadHandle->pTableCheckInfo;
  size_t  numOfTables = taosArrayGetSize(pList);
  assert(numOfTables >= 1);

  const bool asc = ASCENDING_TRAVERSE(pTsdbReadHandle->order);

  // todo apply the lastkey of table check to avoid to load header file
  for (size_t idx = 0; idx < numOfTables; ++idx) {
    STableCheckInfo* pInfo = (STableCheckInfo*)taosArrayGet(pList, idx);

    pInfo->lastKey = pTsdbReadHandle->window.skey;
    pInfo->iter = tSkipListDestroyIter(pInfo->iter);
    pInfo->iiter = tSkipListDestroyIter(pInfo->iiter);
    pInfo->initBuf = false;

    if (asc) {
      assert(pInfo->lastKey >= pTsdbReadHandle->window.skey);
    } else {
      assert(pInfo->lastKey <= pTsdbReadHandle->window.skey);
    }
  }
}

H
Haojun Liao 已提交
280 281 282
// Build a single-entry check-info array for the table tracked by pCheckInfo, starting at skey.
// Only one table is involved, so no sorting is needed. psTable is currently unused.
// Returns NULL (without leaking) if the array cannot be allocated or filled.
static SArray* createCheckInfoFromCheckInfo(STableCheckInfo* pCheckInfo, TSKEY skey, SArray** psTable) {
  (void)psTable;

  SArray* pNew = taosArrayInit(1, sizeof(STableCheckInfo));
  if (pNew == NULL) {
    return NULL;
  }

  STableCheckInfo info = {.lastKey = skey, .tableId = pCheckInfo->tableId};
  if (taosArrayPush(pNew, &info) == NULL) {
    taosArrayDestroy(pNew);
    return NULL;
  }

  return pNew;
}

291 292
/*
 * Report whether the handle's query window is empty for its scan order:
 * ascending needs skey <= ekey, descending needs skey >= ekey.
 */
static bool emptyQueryTimewindow(STsdbReadHandle* pTsdbReadHandle) {
  assert(pTsdbReadHandle != NULL);

  const STimeWindow* pWin = &pTsdbReadHandle->window;
  if (ASCENDING_TRAVERSE(pTsdbReadHandle->order)) {
    return pWin->skey > pWin->ekey;
  }

  return pWin->ekey > pWin->skey;
}

300 301
// Update the query time window according to the data time to live(TTL) information, in order to avoid to return
// the expired data to client, even it is queried already.
302
static int64_t getEarliestValidTimestamp(STsdb* pTsdb) {
C
Cary Xu 已提交
303
  STsdbKeepCfg* pCfg = REPO_KEEP_CFG(pTsdb);
304 305

  int64_t now = taosGetTimestamp(pCfg->precision);
306
  return now - (tsTickPerMin[pCfg->precision] * pCfg->keep2) + 1;  // needs to add one tick
307 308
}

309 310
/*
 * Load query window tWinIdx from pCond into the handle, clipping it to the earliest still-valid timestamp
 * (database keep). The clipped bound is written back into pCond->twindows[tWinIdx] as well.
 * Ascending scans clip skey; descending scans clip ekey.
 */
static void setQueryTimewindow(STsdbReadHandle* pTsdbReadHandle, SQueryTableDataCond* pCond, int32_t tWinIdx) {
  STimeWindow*      pCondWin = &pCond->twindows[tWinIdx];
  const STimeWindow origWin = *pCondWin;  // snapshot before clipping, so the log reports the real old window

  pTsdbReadHandle->window = origWin;

  bool    updateTs = false;
  int64_t startTs = getEarliestValidTimestamp(pTsdbReadHandle->pTsdb);
  if (ASCENDING_TRAVERSE(pTsdbReadHandle->order)) {
    if (startTs > pTsdbReadHandle->window.skey) {
      pTsdbReadHandle->window.skey = startTs;
      pCondWin->skey = startTs;
      updateTs = true;
    }
  } else {
    if (startTs > pTsdbReadHandle->window.ekey) {
      pTsdbReadHandle->window.ekey = startTs;
      pCondWin->ekey = startTs;
      updateTs = true;
    }
  }

  if (updateTs) {
    tsdbDebug("%p update the query time window, old:%" PRId64 " - %" PRId64 ", new:%" PRId64 " - %" PRId64 ", %s",
              pTsdbReadHandle, origWin.skey, origWin.ekey, pTsdbReadHandle->window.skey,
              pTsdbReadHandle->window.ekey, pTsdbReadHandle->idStr);
  }
}
C
Cary Xu 已提交
334

C
Cary Xu 已提交
335
/*
 * Pick the tsdb instance to read from. Non-RSMA vnodes always use VND_TSDB(pVnode).
 * For RSMA vnodes, walk the retention levels in order and stop at the first level whose keep range
 * (now - keep) still covers winSKey; a level with keep <= 0 ends the walk and falls back one level.
 * Level 0 maps to VND_RSMA0, level 1 to VND_RSMA1, anything else to VND_RSMA2.
 */
static STsdb* getTsdbByRetentions(SVnode* pVnode, STsdbReadHandle* pReadHandle, TSKEY winSKey, SRetention* retentions) {
  if (!VND_IS_RSMA(pVnode)) {
    return VND_TSDB(pVnode);
  }

  int     level = 0;
  int64_t now = taosGetTimestamp(pVnode->config.tsdbCfg.precision);

  // the increment of `level` is skipped whenever the body breaks out, exactly as a trailing ++level would be
  for (int i = 0; i < TSDB_RETENTION_MAX; ++i, ++level) {
    SRetention* pRetention = retentions + level;

    if (pRetention->keep <= 0) {
      if (level > 0) {
        --level;
      }
      break;
    }

    if ((now - pRetention->keep) <= winSKey) {
      break;
    }
  }

  STsdb* pSelected = NULL;
  int    selectedLevel = 0;
  if (level == TSDB_RETENTION_L0) {
    selectedLevel = TSDB_RETENTION_L0;
    pSelected = VND_RSMA0(pVnode);
  } else if (level == TSDB_RETENTION_L1) {
    selectedLevel = TSDB_RETENTION_L1;
    pSelected = VND_RSMA1(pVnode);
  } else {
    selectedLevel = TSDB_RETENTION_L2;
    pSelected = VND_RSMA2(pVnode);
  }

  tsdbDebug("vgId:%d, read handle %p rsma level %d is selected to query", TD_VID(pVnode), pReadHandle,
            selectedLevel);
  return pSelected;
}

371
/*
 * Allocate and initialize a read handle for the query condition pCond.
 *
 * The tsdb instance is chosen by getTsdbByRetentions() from the start of the first query window. When pCond
 * requests output columns, this also allocates:
 *   - per-column output buffers, with the capacity capped so one output block stays under 2MB;
 *   - the block statistics buffers;
 *   - the default load-column list, with the primary timestamp column first.
 *
 * Returns the new handle. On failure it returns NULL with terrno set to TSDB_CODE_TDB_OUT_OF_MEMORY, and any
 * partially built handle is released through tsdbCleanupReadHandle().
 */
static STsdbReadHandle* tsdbQueryTablesImpl(SVnode* pVnode, SQueryTableDataCond* pCond, uint64_t qId, uint64_t taskId) {
  // pCond is dereferenced before any other validation, so check it up front.
  assert(pCond != NULL);

  STsdbReadHandle* pReadHandle = taosMemoryCalloc(1, sizeof(STsdbReadHandle));
  if (pReadHandle == NULL) {
    goto _end;
  }

  STsdb* pTsdb = getTsdbByRetentions(pVnode, pReadHandle, pCond->twindows[0].skey, pVnode->config.tsdbCfg.retentions);

  pReadHandle->order = pCond->order;
  pReadHandle->pTsdb = pTsdb;
  pReadHandle->type = TSDB_QUERY_TYPE_ALL;
  pReadHandle->cur.fid = INT32_MIN;
  pReadHandle->cur.win = TSWINDOW_INITIALIZER;
  pReadHandle->checkFiles = true;
  pReadHandle->activeIndex = 0;  // current active table index
  pReadHandle->allocSize = 0;
  pReadHandle->locateStart = false;
  pReadHandle->loadType = pCond->type;

  pReadHandle->outputCapacity = 4096;  //((STsdb*)tsdb)->config.maxRowsPerFileBlock;
  pReadHandle->loadExternalRow = pCond->loadExternalRows;
  pReadHandle->currentLoadExternalRows = pCond->loadExternalRows;

  char buf[128] = {0};
  snprintf(buf, tListLen(buf), "TID:0x%" PRIx64 " QID:0x%" PRIx64, taskId, qId);
  pReadHandle->idStr = strdup(buf);
  if (pReadHandle->idStr == NULL) {
    goto _end;
  }

  if (tsdbInitReadH(&pReadHandle->rhelper, pReadHandle->pTsdb) != 0) {
    goto _end;
  }

  setQueryTimewindow(pReadHandle, pCond, 0);

  if (pCond->numOfCols > 0) {
    int32_t rowLen = 0;
    for (int32_t i = 0; i < pCond->numOfCols; ++i) {
      rowLen += pCond->colList[i].bytes;
    }

    // make sure the output SSDataBlock size be less than 2MB. The product is computed in 64 bits because a
    // wide row times the default capacity can exceed INT32_MAX.
    const int32_t TWOMB = 2 * 1024 * 1024;
    if ((int64_t)pReadHandle->outputCapacity * rowLen > TWOMB) {
      pReadHandle->outputCapacity = TWOMB / rowLen;
    }

    // allocate buffer in order to load data blocks from file
    pReadHandle->suppInfo.pstatis = taosMemoryCalloc(pCond->numOfCols, sizeof(SColumnDataAgg));
    if (pReadHandle->suppInfo.pstatis == NULL) {
      goto _end;
    }

    // todo: use list instead of array?
    pReadHandle->pColumns = taosArrayInit(pCond->numOfCols, sizeof(SColumnInfoData));
    if (pReadHandle->pColumns == NULL) {
      goto _end;
    }

    for (int32_t i = 0; i < pCond->numOfCols; ++i) {
      SColumnInfoData colInfo = {{0}, 0};
      colInfo.info = pCond->colList[i];

      int32_t code = colInfoDataEnsureCapacity(&colInfo, 0, pReadHandle->outputCapacity);
      if (code != TSDB_CODE_SUCCESS) {
        goto _end;
      }

      taosArrayPush(pReadHandle->pColumns, &colInfo);
    }

    pReadHandle->suppInfo.defaultLoadColumn = getDefaultLoadColumns(pReadHandle, true);
    if (pReadHandle->suppInfo.defaultLoadColumn == NULL) {
      goto _end;
    }

    size_t size = taosArrayGetSize(pReadHandle->suppInfo.defaultLoadColumn);
    pReadHandle->suppInfo.slotIds = taosMemoryCalloc(size, sizeof(int32_t));
    pReadHandle->suppInfo.plist = taosMemoryCalloc(size, POINTER_BYTES);
    if (pReadHandle->suppInfo.slotIds == NULL || pReadHandle->suppInfo.plist == NULL) {
      goto _end;
    }
  }

  pReadHandle->pDataCols = tdNewDataCols(1000, pVnode->config.tsdbCfg.maxRows);
  if (pReadHandle->pDataCols == NULL) {
    tsdbError("%p failed to malloc buf for pDataCols, %s", pReadHandle, pReadHandle->idStr);
    terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
    goto _end;
  }

  tsdbInitDataBlockLoadInfo(&pReadHandle->dataBlockLoadInfo);
  tsdbInitCompBlockLoadInfo(&pReadHandle->compBlockLoadInfo);

  return (tsdbReaderT)pReadHandle;

_end:
  tsdbCleanupReadHandle(pReadHandle);
  terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
  return NULL;
}

466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498
/*
 * Resolve the schema version of the first table in the handle's check-info list and cache its STSchema in
 * pTsdbReadHandle->pSchema. Child tables take the version from their super table's schemaRow; normal tables
 * use their own.
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_TDB_INVALID_TABLE_ID (also stored in terrno) if the table, its
 * super table or the schema cannot be found.
 */
static int32_t setCurrentSchema(SVnode* pVnode, STsdbReadHandle* pTsdbReadHandle) {
  STableCheckInfo* pCheckInfo = taosArrayGet(pTsdbReadHandle->pTableCheckInfo, 0);
  if (pCheckInfo == NULL) {
    terrno = TSDB_CODE_TDB_INVALID_TABLE_ID;
    return terrno;
  }

  int32_t sversion = 1;

  SMetaReader mr = {0};
  metaReaderInit(&mr, pVnode->pMeta, 0);
  int32_t code = metaGetTableEntryByUid(&mr, pCheckInfo->tableId);
  if (code != TSDB_CODE_SUCCESS) {
    terrno = TSDB_CODE_TDB_INVALID_TABLE_ID;
    metaReaderClear(&mr);
    return terrno;
  }

  if (mr.me.type == TSDB_CHILD_TABLE) {
    tb_uid_t suid = mr.me.ctbEntry.suid;
    code = metaGetTableEntryByUid(&mr, suid);
    if (code != TSDB_CODE_SUCCESS) {
      terrno = TSDB_CODE_TDB_INVALID_TABLE_ID;
      metaReaderClear(&mr);
      return terrno;
    }
    sversion = mr.me.stbEntry.schemaRow.version;
  } else {
    ASSERT(mr.me.type == TSDB_NORMAL_TABLE);
    sversion = mr.me.ntbEntry.schemaRow.version;
  }

  metaReaderClear(&mr);

  pTsdbReadHandle->pSchema = metaGetTbTSchema(pVnode->pMeta, pCheckInfo->tableId, sversion);
  if (pTsdbReadHandle->pSchema == NULL) {
    // callers dereference pSchema immediately, so a missing schema must be reported as an error
    terrno = TSDB_CODE_TDB_INVALID_TABLE_ID;
    return terrno;
  }

  return TSDB_CODE_SUCCESS;
}

wmmhello's avatar
wmmhello 已提交
499
/*
 * Create a read handle over the tables in tableList for query condition pCond.
 *
 * If the query window is empty after clipping, the handle is returned without table check info. Otherwise:
 *   - the per-table check info is built;
 *   - the current schema is resolved;
 *   - each default load column id is mapped to its slot in that schema. Both lists are walked in ascending
 *     column-id order; a requested id that is not in the schema yields TSDB_CODE_INVALID_PARA.
 *
 * Returns the handle, or NULL with terrno set. On failure the partially built handle is released instead of
 * being leaked.
 */
tsdbReaderT* tsdbQueryTables(SVnode* pVnode, SQueryTableDataCond* pCond, STableListInfo* tableList, uint64_t qId,
                             uint64_t taskId) {
  STsdbReadHandle* pTsdbReadHandle = tsdbQueryTablesImpl(pVnode, pCond, qId, taskId);
  if (pTsdbReadHandle == NULL) {
    return NULL;
  }

  if (emptyQueryTimewindow(pTsdbReadHandle)) {
    return (tsdbReaderT*)pTsdbReadHandle;
  }

  int32_t code = TSDB_CODE_SUCCESS;

  // todo apply the lastkey of table check to avoid to load header file
  pTsdbReadHandle->pTableCheckInfo = createCheckInfoFromTableGroup(pTsdbReadHandle, tableList);
  if (pTsdbReadHandle->pTableCheckInfo == NULL) {
    code = TSDB_CODE_TDB_OUT_OF_MEMORY;
    goto _error;
  }

  code = setCurrentSchema(pVnode, pTsdbReadHandle);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  // defaultLoadColumn is only allocated when output columns were requested
  SArray* pLoadCols = pTsdbReadHandle->suppInfo.defaultLoadColumn;
  if (pLoadCols != NULL) {
    int32_t   numOfCols = taosArrayGetSize(pLoadCols);
    int16_t*  ids = pLoadCols->pData;
    STSchema* pSchema = pTsdbReadHandle->pSchema;

    int32_t i = 0, j = 0;
    while (i < numOfCols && j < pSchema->numOfCols) {
      if (ids[i] == pSchema->columns[j].colId) {
        pTsdbReadHandle->suppInfo.slotIds[i] = j;
        i++;
        j++;
      } else if (ids[i] > pSchema->columns[j].colId) {
        j++;
      } else {
        code = TSDB_CODE_INVALID_PARA;
        goto _error;
      }
    }
  }

  tsdbDebug("%p total numOfTable:%" PRIzu " in this query, table %" PRIzu " %s", pTsdbReadHandle,
            taosArrayGetSize(pTsdbReadHandle->pTableCheckInfo), taosArrayGetSize(tableList->pTableList),
            pTsdbReadHandle->idStr);

  return (tsdbReaderT)pTsdbReadHandle;

_error:
  tsdbCleanupReadHandle(pTsdbReadHandle);
  terrno = code;
  return NULL;
}

551
/*
 * Reset a read handle so that the same tables can be scanned again with window tWinIdx and order from pCond.
 * If the current window is empty, only the order is updated (swapping the window bounds when it changes).
 */
void tsdbResetReadHandle(tsdbReaderT queryHandle, SQueryTableDataCond* pCond, int32_t tWinIdx) {
  STsdbReadHandle* pTsdbReadHandle = queryHandle;

  if (emptyQueryTimewindow(pTsdbReadHandle)) {
    if (pCond->order != pTsdbReadHandle->order) {
      pTsdbReadHandle->order = pCond->order;
      TSWAP(pTsdbReadHandle->window.skey, pTsdbReadHandle->window.ekey);
    }

    return;
  }

  pTsdbReadHandle->order = pCond->order;
  setQueryTimewindow(pTsdbReadHandle, pCond, tWinIdx);
  pTsdbReadHandle->type = TSDB_QUERY_TYPE_ALL;
  pTsdbReadHandle->cur.fid = -1;
  pTsdbReadHandle->cur.win = TSWINDOW_INITIALIZER;
  pTsdbReadHandle->checkFiles = true;
  pTsdbReadHandle->activeIndex = 0;  // current active table index
  pTsdbReadHandle->locateStart = false;
  pTsdbReadHandle->loadExternalRow = pCond->loadExternalRows;

  if (ASCENDING_TRAVERSE(pCond->order)) {
    assert(pTsdbReadHandle->window.skey <= pTsdbReadHandle->window.ekey);
  } else {
    assert(pTsdbReadHandle->window.skey >= pTsdbReadHandle->window.ekey);
  }

  // The statistics buffers are only allocated when the handle was created with output columns,
  // so they may be NULL here; memset on NULL is undefined behavior.
  if (pTsdbReadHandle->suppInfo.pstatis != NULL) {
    memset(pTsdbReadHandle->suppInfo.pstatis, 0, sizeof(SColumnDataAgg));
  }
  if (pTsdbReadHandle->suppInfo.plist != NULL) {
    memset(pTsdbReadHandle->suppInfo.plist, 0, POINTER_BYTES);
  }

  tsdbInitDataBlockLoadInfo(&pTsdbReadHandle->dataBlockLoadInfo);
  tsdbInitCompBlockLoadInfo(&pTsdbReadHandle->compBlockLoadInfo);

  resetCheckInfo(pTsdbReadHandle);
}

L
Liu Jicong 已提交
589 590
/*
 * Reset a read handle for a new table list: copies order and window tWinIdx from pCond (without clipping)
 * and resets the scan position and load bookkeeping.
 *
 * NOTE(review): rebuilding the table check info from tableList is not implemented. pTableCheckInfo is set
 * to NULL without releasing the previous array, and terrno is always set to TSDB_CODE_TDB_OUT_OF_MEMORY.
 */
void tsdbResetQueryHandleForNewTable(tsdbReaderT queryHandle, SQueryTableDataCond* pCond, STableListInfo* tableList,
                                     int32_t tWinIdx) {
  STsdbReadHandle* pTsdbReadHandle = queryHandle;
  (void)tableList;

  pTsdbReadHandle->order = pCond->order;
  pTsdbReadHandle->window = pCond->twindows[tWinIdx];
  pTsdbReadHandle->type = TSDB_QUERY_TYPE_ALL;
  pTsdbReadHandle->cur.fid = -1;
  pTsdbReadHandle->cur.win = TSWINDOW_INITIALIZER;
  pTsdbReadHandle->checkFiles = true;
  pTsdbReadHandle->activeIndex = 0;  // current active table index
  pTsdbReadHandle->locateStart = false;
  pTsdbReadHandle->loadExternalRow = pCond->loadExternalRows;

  if (ASCENDING_TRAVERSE(pCond->order)) {
    assert(pTsdbReadHandle->window.skey <= pTsdbReadHandle->window.ekey);
  } else {
    assert(pTsdbReadHandle->window.skey >= pTsdbReadHandle->window.ekey);
  }

  // The statistics buffers only exist when the handle was created with output columns.
  if (pTsdbReadHandle->suppInfo.pstatis != NULL) {
    memset(pTsdbReadHandle->suppInfo.pstatis, 0, sizeof(SColumnDataAgg));
  }
  if (pTsdbReadHandle->suppInfo.plist != NULL) {
    memset(pTsdbReadHandle->suppInfo.plist, 0, POINTER_BYTES);
  }

  tsdbInitDataBlockLoadInfo(&pTsdbReadHandle->dataBlockLoadInfo);
  tsdbInitCompBlockLoadInfo(&pTsdbReadHandle->compBlockLoadInfo);

  // TODO: createCheckInfoFromTableGroup(pTsdbReadHandle, tableList) once the old check info can be released.
  pTsdbReadHandle->pTableCheckInfo = NULL;
  terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
}

wmmhello's avatar
wmmhello 已提交
632
/*
 * Create a read handle for a last-row query over pList.
 * The first query window is replaced by updateLastrowForEachGroup(pList). Returns NULL when no table remains.
 * When the last row is cached, the handle type is switched to TSDB_QUERY_TYPE_LAST.
 * On failure returns NULL with terrno set; the handle is released rather than leaked.
 */
tsdbReaderT tsdbQueryLastRow(SVnode* pVnode, SQueryTableDataCond* pCond, STableListInfo* pList, uint64_t qId,
                             uint64_t taskId) {
  // NOTE(review): the emptiness check is kept after this call, as in the original; updateLastrowForEachGroup
  // may adjust pList — confirm before reordering.
  pCond->twindows[0] = updateLastrowForEachGroup(pList);

  // no qualified table
  if (taosArrayGetSize(pList->pTableList) == 0) {
    return NULL;
  }

  STsdbReadHandle* pTsdbReadHandle = (STsdbReadHandle*)tsdbQueryTables(pVnode, pCond, pList, qId, taskId);
  if (pTsdbReadHandle == NULL) {
    return NULL;
  }

  int32_t code = checkForCachedLastRow(pTsdbReadHandle, pList);
  if (code != TSDB_CODE_SUCCESS) {
    tsdbCleanupReadHandle(pTsdbReadHandle);
    terrno = code;
    return NULL;
  }

  assert(pCond->order == TSDB_ORDER_ASC && pCond->twindows[0].skey <= pCond->twindows[0].ekey);
  if (pTsdbReadHandle->cachelastrow) {
    pTsdbReadHandle->type = TSDB_QUERY_TYPE_LAST;
  }

  return pTsdbReadHandle;
}

660
// Disabled (not compiled): cached "last" query variant.
// NOTE(review): it references tsdbQueryTablesT and STableGroupInfo; confirm they exist before re-enabling.
#if 0
tsdbReaderT tsdbQueryCacheLastT(STsdb *tsdb, SQueryTableDataCond *pCond, STableGroupInfo *groupList, uint64_t qId, STsdbMemTable* pMemRef) {
  STsdbReadHandle *pTsdbReadHandle = (STsdbReadHandle*) tsdbQueryTablesT(tsdb, pCond, groupList, qId, pMemRef);
  if (pTsdbReadHandle == NULL) {
    return NULL;
  }

  int32_t code = checkForCachedLast(pTsdbReadHandle);
  if (code != TSDB_CODE_SUCCESS) { // set the numOfTables to be 0
    terrno = code;
    return NULL;
  }

  if (pTsdbReadHandle->cachelastrow) {
    pTsdbReadHandle->type = TSDB_QUERY_TYPE_LAST;
  }

  return pTsdbReadHandle;
}
#endif
dengyihao's avatar
dengyihao 已提交
681
/*
 * Allocate an array with room for one pointer per table check info held by the reader.
 *
 * NOTE(review): nothing is pushed into the array, so callers always receive an empty list;
 * confirm whether it was meant to be populated. pHandle is cast straight to
 * STsdbReadHandle* (not dereferenced), so callers presumably pass the reader handle itself.
 */
SArray* tsdbGetQueriedTableList(tsdbReaderT* pHandle) {
  assert(pHandle != NULL);

  STsdbReadHandle* pReader = (STsdbReadHandle*)pHandle;
  return taosArrayInit(taosArrayGetSize(pReader->pTableCheckInfo), POINTER_BYTES);
}

// leave only one table for each group
dengyihao's avatar
dengyihao 已提交
692
// static STableGroupInfo* trimTableGroup(STimeWindow* window, STableGroupInfo* pGroupList) {
wmmhello's avatar
wmmhello 已提交
693 694 695 696 697 698 699 700 701 702 703 704 705 706 707 708 709 710 711 712 713 714 715 716 717 718 719 720 721 722
//  assert(pGroupList);
//  size_t numOfGroup = taosArrayGetSize(pGroupList->pGroupList);
//
//  STableGroupInfo* pNew = taosMemoryCalloc(1, sizeof(STableGroupInfo));
//  pNew->pGroupList = taosArrayInit(numOfGroup, POINTER_BYTES);
//
//  for (int32_t i = 0; i < numOfGroup; ++i) {
//    SArray* oneGroup = taosArrayGetP(pGroupList->pGroupList, i);
//    size_t  numOfTables = taosArrayGetSize(oneGroup);
//
//    SArray* px = taosArrayInit(4, sizeof(STableKeyInfo));
//    for (int32_t j = 0; j < numOfTables; ++j) {
//      STableKeyInfo* pInfo = (STableKeyInfo*)taosArrayGet(oneGroup, j);
//      //      if (window->skey <= pInfo->lastKey && ((STable*)pInfo->pTable)->lastKey != TSKEY_INITIAL_VAL) {
//      //        taosArrayPush(px, pInfo);
//      //        pNew->numOfTables += 1;
//      //        break;
//      //      }
//    }
//
//    // there are no data in this group
//    if (taosArrayGetSize(px) == 0) {
//      taosArrayDestroy(px);
//    } else {
//      taosArrayPush(pNew->pGroupList, &px);
//    }
//  }
//
//  return pNew;
//}
723

dengyihao's avatar
dengyihao 已提交
724
// tsdbReaderT tsdbQueryRowsInExternalWindow(SVnode* pVnode, SQueryTableDataCond* pCond, STableGroupInfo* groupList,
wmmhello's avatar
wmmhello 已提交
725 726 727 728 729 730 731 732 733 734 735 736 737 738 739 740 741 742 743 744 745
//                                          uint64_t qId, uint64_t taskId) {
//  STableGroupInfo* pNew = trimTableGroup(&pCond->twindow, groupList);
//
//  if (pNew->numOfTables == 0) {
//    tsdbDebug("update query time range to invalidate time window");
//
//    assert(taosArrayGetSize(pNew->pGroupList) == 0);
//    bool asc = ASCENDING_TRAVERSE(pCond->order);
//    if (asc) {
//      pCond->twindow.ekey = pCond->twindow.skey - 1;
//    } else {
//      pCond->twindow.skey = pCond->twindow.ekey - 1;
//    }
//  }
//
//  STsdbReadHandle* pTsdbReadHandle = (STsdbReadHandle*)tsdbQueryTables(pVnode, pCond, pNew, qId, taskId);
//  pTsdbReadHandle->loadExternalRow = true;
//  pTsdbReadHandle->currentLoadExternalRows = true;
//
//  return pTsdbReadHandle;
//}
746

747
/*
 * Lazily create the table's iterators over the `mem` and `imem` skiplists, starting from
 * pCheckInfo->lastKey and walking in the reader's order.
 *
 * Initialisation runs once per check info (guarded by initBuf); later calls return true
 * immediately. An iterator is only created when the table is found in that buffer's hash
 * index. Each created iterator is stepped once with tSkipListIterNext before its first
 * row is inspected.
 *
 * Returns false when no iterator was created or neither iterator yields a row, true otherwise.
 */
static bool initTableMemIterator(STsdbReadHandle* pHandle, STableCheckInfo* pCheckInfo) {
  if (pCheckInfo->initBuf) {
    return true;
  }
  pCheckInfo->initBuf = true;

  const int32_t order = pHandle->order;
  TSKEY         tLastKey = keyToTkey(pCheckInfo->lastKey);

  STbData** pMem = NULL;
  if (pHandle->pTsdb->mem != NULL) {
    pMem = taosHashGet(pHandle->pTsdb->mem->pHashIdx, &pCheckInfo->tableId, sizeof(pCheckInfo->tableId));
  }
  if (pMem != NULL) {
    pCheckInfo->iter =
        tSkipListCreateIterFromVal((*pMem)->pData, (const char*)&tLastKey, TSDB_DATA_TYPE_TIMESTAMP, order);
  }

  STbData** pIMem = NULL;
  if (pHandle->pTsdb->imem != NULL) {
    pIMem = taosHashGet(pHandle->pTsdb->imem->pHashIdx, &pCheckInfo->tableId, sizeof(pCheckInfo->tableId));
  }
  if (pIMem != NULL) {
    pCheckInfo->iiter =
        tSkipListCreateIterFromVal((*pIMem)->pData, (const char*)&tLastKey, TSDB_DATA_TYPE_TIMESTAMP, order);
  }

  if (pCheckInfo->iter == NULL && pCheckInfo->iiter == NULL) {
    return false;  // no iterator: nothing buffered for this table
  }

  // Step each iterator onto its first row; a missing iterator counts as having no row.
  const bool memHasRow = (pCheckInfo->iter != NULL) && tSkipListIterNext(pCheckInfo->iter);
  const bool imemHasRow = (pCheckInfo->iiter != NULL) && tSkipListIterNext(pCheckInfo->iiter);
  if (!memHasRow && !imemHasRow) {
    return false;  // iterators exist but yield no row
  }

  if (!memHasRow) {
    tsdbDebug("%p uid:%" PRId64 ", no data in mem, %s", pHandle, pCheckInfo->tableId, pHandle->idStr);
  } else {
    SSkipListNode* pNode = tSkipListIterGet(pCheckInfo->iter);
    assert(pNode != NULL);

    STSRow* pRow = (STSRow*)SL_GET_NODE_DATA(pNode);
    TSKEY   firstKey = TD_ROW_KEY(pRow);  // first timestamp in buffer
    tsdbDebug("%p uid:%" PRId64 ", check data in mem from skey:%" PRId64 ", order:%d, ts range in buf:%" PRId64
              "-%" PRId64 ", lastKey:%" PRId64 ", numOfRows:%" PRId64 ", %s",
              pHandle, pCheckInfo->tableId, firstKey, order, (*pMem)->keyMin, (*pMem)->keyMax, pCheckInfo->lastKey,
              (*pMem)->nrows, pHandle->idStr);

    // the iterator must not start before lastKey in scan order
    assert(ASCENDING_TRAVERSE(order) ? (pCheckInfo->lastKey <= firstKey) : (pCheckInfo->lastKey >= firstKey));
  }

  if (!imemHasRow) {
    tsdbDebug("%p uid:%" PRId64 ", no data in imem, %s", pHandle, pCheckInfo->tableId, pHandle->idStr);
  } else {
    SSkipListNode* pNode = tSkipListIterGet(pCheckInfo->iiter);
    assert(pNode != NULL);

    STSRow* pRow = (STSRow*)SL_GET_NODE_DATA(pNode);
    TSKEY   firstKey = TD_ROW_KEY(pRow);  // first timestamp in buffer
    tsdbDebug("%p uid:%" PRId64 ", check data in imem from skey:%" PRId64 ", order:%d, ts range in buf:%" PRId64
              "-%" PRId64 ", lastKey:%" PRId64 ", numOfRows:%" PRId64 ", %s",
              pHandle, pCheckInfo->tableId, firstKey, order, (*pIMem)->keyMin, (*pIMem)->keyMax, pCheckInfo->lastKey,
              (*pIMem)->nrows, pHandle->idStr);

    assert(ASCENDING_TRAVERSE(order) ? (pCheckInfo->lastKey <= firstKey) : (pCheckInfo->lastKey >= firstKey));
  }

  return true;
}

/*
 * Release the table's mem/imem skiplist iterators.
 *
 * Either iterator may never have been created; the original code already passes such NULL
 * iterators to tSkipListDestroyIter. The fields are reset to NULL afterwards so that a
 * repeated cleanup, or a later read through this check info, cannot touch freed iterators.
 */
static void destroyTableMemIterator(STableCheckInfo* pCheckInfo) {
  tSkipListDestroyIter(pCheckInfo->iter);
  pCheckInfo->iter = NULL;

  tSkipListDestroyIter(pCheckInfo->iiter);
  pCheckInfo->iiter = NULL;
}

/*
 * Peek at the current rows of the mem (iter) and imem (iiter) iterators and return the
 * timestamp that comes first in scan order. pCheckInfo->chosen records which buffer the
 * returned key belongs to:
 *   - only one iterator has a row        -> that buffer;
 *   - keys differ                        -> ascending picks the smaller key, descending the larger;
 *   - keys equal, update supported       -> CHECKINFO_CHOSEN_BOTH;
 *   - keys equal, update not supported   -> CHECKINFO_CHOSEN_IMEM, and the mem iterator is
 *                                           stepped past its duplicate row.
 *
 * maxVer is not used yet (row-version filtering is still a TODO).
 * Returns TSKEY_INITIAL_VAL when neither iterator has a current row.
 */
static TSKEY extractFirstTraverseKey(STableCheckInfo* pCheckInfo, int32_t order, int32_t update, TDRowVerT maxVer) {
  STSRow *rmem = NULL, *rimem = NULL;

  if (pCheckInfo->iter) {
    SSkipListNode* node = tSkipListIterGet(pCheckInfo->iter);
    if (node != NULL) {
      rmem = (STSRow*)SL_GET_NODE_DATA(node);
      // TODO: skip the row when TD_ROW_VER(rmem) > maxVer
    }
  }

  if (pCheckInfo->iiter) {
    SSkipListNode* node = tSkipListIterGet(pCheckInfo->iiter);
    if (node != NULL) {
      rimem = (STSRow*)SL_GET_NODE_DATA(node);
      // TODO: skip the row when TD_ROW_VER(rimem) > maxVer
    }
  }

  if (rmem == NULL && rimem == NULL) {
    return TSKEY_INITIAL_VAL;
  }

  if (rimem == NULL) {
    pCheckInfo->chosen = CHECKINFO_CHOSEN_MEM;
    return TD_ROW_KEY(rmem);
  }

  if (rmem == NULL) {
    pCheckInfo->chosen = CHECKINFO_CHOSEN_IMEM;
    return TD_ROW_KEY(rimem);
  }

  TSKEY r1 = TD_ROW_KEY(rmem);
  TSKEY r2 = TD_ROW_KEY(rimem);

  if (r1 == r2) {
    if (TD_SUPPORT_UPDATE(update)) {
      pCheckInfo->chosen = CHECKINFO_CHOSEN_BOTH;
    } else {
      // keep the imem row and drop the duplicate mem row
      pCheckInfo->chosen = CHECKINFO_CHOSEN_IMEM;
      tSkipListIterNext(pCheckInfo->iter);
    }
    return r1;
  }

  // Keys differ: an ascending scan visits the smaller key first, a descending scan the larger one.
  bool memComesFirst = ASCENDING_TRAVERSE(order) ? (r1 < r2) : (r1 > r2);
  if (memComesFirst) {
    pCheckInfo->chosen = CHECKINFO_CHOSEN_MEM;
    return r1;
  }

  pCheckInfo->chosen = CHECKINFO_CHOSEN_IMEM;
  return r2;
}

/*
 * Return the row to consume next from the table's mem/imem buffers in scan order, and
 * record in pCheckInfo->chosen which buffer it came from (moveToNextRowInMem later
 * advances the iterator(s) named by chosen).
 *
 * order    - TSDB_ORDER_ASC or TSDB_ORDER_DESC.
 * update   - table update config, tested with TD_SUPPORT_UPDATE.
 * extraRow - optional, may be NULL (hasMoreDataInCache passes NULL). When both buffers hold
 *            the same key and updates are supported, it receives the imem row while the
 *            mem row is returned and chosen is CHECKINFO_CHOSEN_BOTH.
 * maxVer   - not used yet (row-version filtering is still a TODO).
 *
 * With equal keys and no update support, the duplicate mem row is skipped and the imem
 * row is returned. Returns NULL when neither iterator has a current row.
 */
static STSRow* getSRowInTableMem(STableCheckInfo* pCheckInfo, int32_t order, int32_t update, STSRow** extraRow,
                                 TDRowVerT maxVer) {
  STSRow *rmem = NULL, *rimem = NULL;

  if (pCheckInfo->iter) {
    SSkipListNode* node = tSkipListIterGet(pCheckInfo->iter);
    if (node != NULL) {
      rmem = (STSRow*)SL_GET_NODE_DATA(node);
      // TODO (skiplist refactor): skip the row when TD_ROW_VER(rmem) > maxVer
    }
  }

  if (pCheckInfo->iiter) {
    SSkipListNode* node = tSkipListIterGet(pCheckInfo->iiter);
    if (node != NULL) {
      rimem = (STSRow*)SL_GET_NODE_DATA(node);
      // TODO (skiplist refactor): skip the row when TD_ROW_VER(rimem) > maxVer
    }
  }

  if (rmem == NULL && rimem == NULL) {
    return NULL;
  }

  if (rimem == NULL) {
    pCheckInfo->chosen = CHECKINFO_CHOSEN_MEM;
    return rmem;
  }

  if (rmem == NULL) {
    pCheckInfo->chosen = CHECKINFO_CHOSEN_IMEM;
    return rimem;
  }

  TSKEY r1 = TD_ROW_KEY(rmem);
  TSKEY r2 = TD_ROW_KEY(rimem);

  if (r1 == r2) {
    if (TD_SUPPORT_UPDATE(update)) {
      pCheckInfo->chosen = CHECKINFO_CHOSEN_BOTH;
      if (extraRow != NULL) {  // callers that do not need the imem twin pass NULL
        *extraRow = rimem;
      }
      return rmem;
    }

    // keep the imem row and drop the duplicate mem row
    tSkipListIterNext(pCheckInfo->iter);
    pCheckInfo->chosen = CHECKINFO_CHOSEN_IMEM;
    return rimem;
  }

  // Keys differ: an ascending scan visits the smaller key first, a descending scan the
  // larger one. chosen must name the buffer of the returned row.
  bool memComesFirst = ASCENDING_TRAVERSE(order) ? (r1 < r2) : (r1 > r2);
  if (memComesFirst) {
    pCheckInfo->chosen = CHECKINFO_CHOSEN_MEM;
    return rmem;
  }

  pCheckInfo->chosen = CHECKINFO_CHOSEN_IMEM;
  return rimem;
}

/*
 * Advance past the row last returned from the buffers, according to pCheckInfo->chosen.
 *
 * - CHECKINFO_CHOSEN_MEM:  step the mem iterator; if it is exhausted (or absent), report
 *                          whether the imem iterator still has a current row.
 * - CHECKINFO_CHOSEN_IMEM: the mirror image, stepping imem and falling back to mem.
 * - otherwise (both):      step both iterators, mem first.
 *
 * Returns true when at least one buffer still has a row to read.
 */
static bool moveToNextRowInMem(STableCheckInfo* pCheckInfo) {
  if (pCheckInfo->chosen == CHECKINFO_CHOSEN_MEM) {
    if (pCheckInfo->iter != NULL && tSkipListIterNext(pCheckInfo->iter)) {
      return true;
    }
    return (pCheckInfo->iiter != NULL) && (tSkipListIterGet(pCheckInfo->iiter) != NULL);
  }

  if (pCheckInfo->chosen == CHECKINFO_CHOSEN_IMEM) {
    if (pCheckInfo->iiter != NULL && tSkipListIterNext(pCheckInfo->iiter)) {
      return true;
    }
    return (pCheckInfo->iter != NULL) && (tSkipListIterGet(pCheckInfo->iter) != NULL);
  }

  // Both buffers supplied the last row: both iterators must move on.
  bool memHasNext = (pCheckInfo->iter != NULL) && tSkipListIterNext(pCheckInfo->iter);
  bool imemHasNext = (pCheckInfo->iiter != NULL) && tSkipListIterNext(pCheckInfo->iiter);
  return memHasNext || imemHasNext;
}

/*
 * Serve the active table (pHandle->activeIndex) purely from the mem/imem buffers.
 *
 * Takes the next buffered row. If there is none, or it already lies beyond window.ekey in
 * scan order, returns false. Otherwise it reads up to outputCapacity rows (bounded by
 * window.ekey) via tsdbReadRowsFromCache into cur. It then moves pCheckInfo->lastKey and
 * cur.lastKey one step past cur.win.ekey, marks the result as a mixed block, and returns
 * true. For descending scans cur.win.skey/ekey are swapped at the end (presumably so the
 * window reads low-to-high).
 * cur.fid is set to INT32_MIN on every call.
 */
static bool hasMoreDataInCache(STsdbReadHandle* pHandle) {
  STsdbCfg* pCfg = REPO_CFG(pHandle->pTsdb);
  size_t    numOfCheckInfo = taosArrayGetSize(pHandle->pTableCheckInfo);
  assert(pHandle->activeIndex < numOfCheckInfo && pHandle->activeIndex >= 0 && numOfCheckInfo >= 1);
  pHandle->cur.fid = INT32_MIN;

  STableCheckInfo* pCheckInfo = taosArrayGet(pHandle->pTableCheckInfo, pHandle->activeIndex);
  if (!pCheckInfo->initBuf) {
    initTableMemIterator(pHandle, pCheckInfo);
  }

  STSRow* pRow = getSRowInTableMem(pCheckInfo, pHandle->order, pCfg->update, NULL, TD_VER_MAX);
  if (pRow == NULL) {
    return false;
  }

  const bool  asc = ASCENDING_TRAVERSE(pHandle->order);
  const TSKEY endKey = pHandle->window.ekey;

  pCheckInfo->lastKey = TD_ROW_KEY(pRow);  // first timestamp in buffer
  tsdbDebug("%p uid:%" PRId64 ", check data in buffer from skey:%" PRId64 ", order:%d, %s", pHandle,
            pCheckInfo->tableId, pCheckInfo->lastKey, pHandle->order, pHandle->idStr);

  bool pastWindowEnd = asc ? (pCheckInfo->lastKey > endKey) : (pCheckInfo->lastKey < endKey);
  if (pastWindowEnd) {
    return false;  // all buffered rows inside the window have been consumed
  }

  int32_t      step = asc ? 1 : -1;
  STimeWindow* win = &pHandle->cur.win;
  pHandle->cur.rows = tsdbReadRowsFromCache(pCheckInfo, endKey, pHandle->outputCapacity, win, pHandle);

  TSKEY nextKey = win->ekey + step;
  pCheckInfo->lastKey = nextKey;
  pHandle->cur.lastKey = nextKey;
  pHandle->cur.mixBlock = true;

  if (!asc) {
    TSWAP(win->skey, win->ekey);
  }

  return true;
}

/*
 * Map a timestamp to the id of the data file that covers it.
 *
 * Each file spans `daysPerFile * tsTickPerMin[precision]` ticks. Despite its name,
 * daysPerFile is therefore a number of minutes. The id is floor(key / span): keys in
 * [f * span, (f + 1) * span - 1] map to file f, negative (pre-1970) keys included.
 * The result is clamped to the int32_t range.
 *
 * Returns INT32_MIN for TSKEY_INITIAL_VAL.
 */
static int32_t getFileIdFromKey(TSKEY key, int32_t daysPerFile, int32_t precision) {
  // precision indexes tsTickPerMin, so it must be one of the known precisions
  assert(precision >= TSDB_TIME_PRECISION_MILLI && precision <= TSDB_TIME_PRECISION_NANO);
  if (key == TSKEY_INITIAL_VAL) {
    return INT32_MIN;
  }

  int64_t span = (int64_t)daysPerFile * tsTickPerMin[precision];
  assert(span > 0);

  // C integer division truncates toward zero. For negative keys, shift by one so the result
  // is the floor, e.g. span = 10: keys -1 .. -10 -> -1, key -11 -> -2.
  int64_t fid = (key >= 0) ? (key / span) : ((key + 1) / span - 1);

  if (fid < INT32_MIN) {
    fid = INT32_MIN;
  } else if (fid > INT32_MAX) {
    fid = INT32_MAX;
  }

  return (int32_t)fid;
}

/*
 * Binary search over pBlock[0 .. numOfBlocks - 1] for the block relevant to skey.
 *
 * Assumes the blocks are sorted by key and do not overlap, and that numOfBlocks > 0
 * (an empty array would still read pBlock[0]). Returns the index of the block whose
 * [minKey, maxKey] contains skey, when there is one. When skey falls into the gap between
 * two blocks, an ascending search normally stops at the block after the gap and a
 * descending search at the block before it. Callers must still compare skey with the
 * returned block's range.
 */
static int32_t binarySearchForBlock(SBlock* pBlock, int32_t numOfBlocks, TSKEY skey, int32_t order) {
  int32_t lo = 0;
  int32_t hi = numOfBlocks - 1;
  int32_t mid = lo;

  for (;;) {
    int32_t remaining = hi - lo + 1;
    mid = lo + (remaining >> 1);

    if (remaining == 1) {
      break;
    }

    if (skey > pBlock[mid].maxKey.ts) {
      if (remaining == 2) {
        break;
      }
      if (order == TSDB_ORDER_DESC && skey < pBlock[mid + 1].minKey.ts) {
        break;  // skey lies in the gap right after mid
      }
      lo = mid + 1;
    } else if (skey < pBlock[mid].minKey.ts) {
      if (order == TSDB_ORDER_ASC && skey > pBlock[mid - 1].maxKey.ts) {
        break;  // skey lies in the gap right before mid
      }
      hi = mid - 1;
    } else {
      break;  // pBlock[mid] contains skey
    }
  }

  return mid;
}

/*
 * Load the block index of table `index` from the current file and keep only the blocks
 * relevant to the range between pCheckInfo->lastKey and window.ekey (either order).
 *
 * Blocks ending before the range start, and blocks starting after its end, are dropped.
 * The survivors are moved to the front of pCheckInfo->pCompInfo->blocks. Their count is
 * stored in pCheckInfo->numOfBlocks and added to *numOfBlocks.
 *
 * Returns 0 on success, including when the table has no qualified block in this file.
 * Returns terrno when the read helper fails, or TSDB_CODE_TDB_OUT_OF_MEMORY when the
 * block-info buffer cannot be grown.
 */
static int32_t loadBlockInfo(STsdbReadHandle* pTsdbReadHandle, int32_t index, int32_t* numOfBlocks) {
  STableCheckInfo* pCheckInfo = taosArrayGet(pTsdbReadHandle->pTableCheckInfo, index);
  pCheckInfo->numOfBlocks = 0;

  STable table = {.uid = pCheckInfo->tableId, .tid = pCheckInfo->tableId, .pSchema = pTsdbReadHandle->pSchema};
  if (tsdbSetReadTable(&pTsdbReadHandle->rhelper, &table) != TSDB_CODE_SUCCESS) {
    return terrno;
  }

  SBlockIdx* compIndex = pTsdbReadHandle->rhelper.pBlkIdx;
  if (compIndex == NULL || compIndex->uid != pCheckInfo->tableId) {
    return 0;  // this file holds no block of the table, try the next file
  }

  // grow the per-table block-info buffer when this file's index needs more room
  if (pCheckInfo->compSize < (int32_t)compIndex->len) {
    assert(compIndex->len > 0);

    char* pNewBuf = taosMemoryRealloc(pCheckInfo->pCompInfo, compIndex->len);
    if (pNewBuf == NULL) {
      terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
      return TSDB_CODE_TDB_OUT_OF_MEMORY;
    }

    pCheckInfo->pCompInfo = (SBlockInfo*)pNewBuf;
    pCheckInfo->compSize = compIndex->len;
  }

  if (tsdbLoadBlockInfo(&(pTsdbReadHandle->rhelper), (void*)(pCheckInfo->pCompInfo)) < 0) {
    return terrno;
  }
  SBlockInfo* pCompInfo = pCheckInfo->pCompInfo;

  if (ASCENDING_TRAVERSE(pTsdbReadHandle->order)) {
    assert(pCheckInfo->lastKey <= pTsdbReadHandle->window.ekey &&
           pTsdbReadHandle->window.skey <= pTsdbReadHandle->window.ekey);
  } else {
    assert(pCheckInfo->lastKey >= pTsdbReadHandle->window.ekey &&
           pTsdbReadHandle->window.skey >= pTsdbReadHandle->window.ekey);
  }

  TSKEY rangeStart = TMIN(pCheckInfo->lastKey, pTsdbReadHandle->window.ekey);
  TSKEY rangeEnd = TMAX(pCheckInfo->lastKey, pTsdbReadHandle->window.ekey);

  // discard the blocks outside the query time range
  int32_t first = binarySearchForBlock(pCompInfo->blocks, compIndex->numOfBlocks, rangeStart, TSDB_ORDER_ASC);
  if (rangeStart > pCompInfo->blocks[first].maxKey.ts) {
    return 0;
  }

  // todo speedup the procedure of located end block
  int32_t last = first;
  while (last < (int32_t)compIndex->numOfBlocks && pCompInfo->blocks[last].minKey.ts <= rangeEnd) {
    ++last;
  }

  pCheckInfo->numOfBlocks = last - first;
  if (first > 0) {
    memmove(pCompInfo->blocks, &pCompInfo->blocks[first], pCheckInfo->numOfBlocks * sizeof(SBlock));
  }

  *numOfBlocks += pCheckInfo->numOfBlocks;
  return 0;
}

/*
 * Load the block infos of the current file for the tables this reader scans.
 *
 * BLOCK_LOAD_TABLE_SEQ_ORDER loads only the active table. BLOCK_LOAD_OFFSET_SEQ_ORDER
 * loads every table and stops at the first failure. Any other load type is a programming
 * error (assert). *numOfBlocks is reset to 0 and then receives the number of qualified
 * blocks. cost.headFileLoad is incremented and the elapsed time is added to
 * cost.headFileLoadTime, on failure as well.
 *
 * Returns TSDB_CODE_SUCCESS or the first error reported by loadBlockInfo.
 */
static int32_t getFileCompInfo(STsdbReadHandle* pTsdbReadHandle, int32_t* numOfBlocks) {
  int32_t code = TSDB_CODE_SUCCESS;
  *numOfBlocks = 0;

  pTsdbReadHandle->cost.headFileLoad += 1;
  int64_t startUs = taosGetTimestampUs();

  if (pTsdbReadHandle->loadType == BLOCK_LOAD_TABLE_SEQ_ORDER) {
    code = loadBlockInfo(pTsdbReadHandle, pTsdbReadHandle->activeIndex, numOfBlocks);
  } else if (pTsdbReadHandle->loadType == BLOCK_LOAD_OFFSET_SEQ_ORDER) {
    size_t numOfTables = taosArrayGetSize(pTsdbReadHandle->pTableCheckInfo);
    for (int32_t i = 0; i < numOfTables && code == TSDB_CODE_SUCCESS; ++i) {
      code = loadBlockInfo(pTsdbReadHandle, i, numOfBlocks);
    }
  } else {
    assert(0);
  }

  pTsdbReadHandle->cost.headFileLoadTime += (taosGetTimestampUs() - startUs);
  return code;
}

/*
 * Read the default load columns (suppInfo.defaultLoadColumn) of file block pBlock through
 * the read helper.
 *
 * The three column buffers (pDataCols and rhelper.pDCols[0..1]) are initialised for the
 * reader's schema first. On success:
 *   - dataBlockLoadInfo records the file group, slot and table uid of the loaded block;
 *   - pBlock->numOfRows is set to the row count found in rhelper.pDCols[0];
 *   - primary timestamps are converted with tdGetKey when the block starts before
 *     1970-01-01 (minKey < 0);
 *   - the elapsed time is added to cost.blockLoadTime.
 *
 * slotIndex is only used in log messages. On failure pBlock->numOfRows is set to 0 and
 * terrno is returned.
 */
static int32_t doLoadFileDataBlock(STsdbReadHandle* pTsdbReadHandle, SBlock* pBlock, STableCheckInfo* pCheckInfo,
                                   int32_t slotIndex) {
  int64_t startUs = taosGetTimestampUs();

  if (tdInitDataCols(pTsdbReadHandle->pDataCols, pTsdbReadHandle->pSchema) != TSDB_CODE_SUCCESS) {
    tsdbError("%p failed to malloc buf for pDataCols, %s", pTsdbReadHandle, pTsdbReadHandle->idStr);
    terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
    goto _error;
  }

  if (tdInitDataCols(pTsdbReadHandle->rhelper.pDCols[0], pTsdbReadHandle->pSchema) != TSDB_CODE_SUCCESS) {
    tsdbError("%p failed to malloc buf for rhelper.pDataCols[0], %s", pTsdbReadHandle, pTsdbReadHandle->idStr);
    terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
    goto _error;
  }

  if (tdInitDataCols(pTsdbReadHandle->rhelper.pDCols[1], pTsdbReadHandle->pSchema) != TSDB_CODE_SUCCESS) {
    tsdbError("%p failed to malloc buf for rhelper.pDataCols[1], %s", pTsdbReadHandle, pTsdbReadHandle->idStr);
    terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
    goto _error;
  }

  int16_t* colIds = pTsdbReadHandle->suppInfo.defaultLoadColumn->pData;
  int      numOfCols = (int)(QH_GET_NUM_OF_COLS(pTsdbReadHandle));

  if (tsdbLoadBlockDataCols(&(pTsdbReadHandle->rhelper), pBlock, pCheckInfo->pCompInfo, colIds, numOfCols, true) !=
      TSDB_CODE_SUCCESS) {
    assert(terrno != TSDB_CODE_SUCCESS);  // the loader is expected to set terrno on failure
    goto _error;
  }

  SDataBlockLoadInfo* pBlockLoadInfo = &pTsdbReadHandle->dataBlockLoadInfo;
  pBlockLoadInfo->fileGroup = pTsdbReadHandle->pFileGroup;
  pBlockLoadInfo->slot = pTsdbReadHandle->cur.slot;
  pBlockLoadInfo->uid = pCheckInfo->tableId;

  SDataCols* pCols = pTsdbReadHandle->rhelper.pDCols[0];
  assert(pCols->numOfRows != 0 && pCols->numOfRows <= pBlock->numOfRows);
  pBlock->numOfRows = pCols->numOfRows;

  // Convert from TKEY to TSKEY for primary timestamp column if current block has timestamp before 1970-01-01T00:00:00Z
  if (pBlock->minKey.ts < 0 && colIds[0] == PRIMARYKEY_TIMESTAMP_COL_ID) {
    int64_t* pTs = pCols->cols[0].pData;
    for (int32_t row = 0; row < pBlock->numOfRows; ++row) {
      pTs[row] = tdGetKey(pTs[row]);
    }
  }

  int64_t elapsedTime = taosGetTimestampUs() - startUs;
  pTsdbReadHandle->cost.blockLoadTime += elapsedTime;

  tsdbDebug("%p load file block into buffer, index:%d, brange:%" PRId64 "-%" PRId64 ", rows:%d, elapsed time:%" PRId64
            " us, %s",
            pTsdbReadHandle, slotIndex, pBlock->minKey.ts, pBlock->maxKey.ts, pBlock->numOfRows, elapsedTime,
            pTsdbReadHandle->idStr);
  return TSDB_CODE_SUCCESS;

_error:
  pBlock->numOfRows = 0;

  tsdbError("%p error occurs in loading file block, index:%d, brange:%" PRId64 "-%" PRId64 ", rows:%d, %s",
            pTsdbReadHandle, slotIndex, pBlock->minKey.ts, pBlock->maxKey.ts, pBlock->numOfRows,
            pTsdbReadHandle->idStr);
  return terrno;
}

// Forward declarations of static helpers defined later in this translation unit.
static void    copyAllRemainRowsFromFileBlock(STsdbReadHandle* pTsdbReadHandle, STableCheckInfo* pCheckInfo,
                                              SDataBlockInfo* pBlockInfo, int32_t endPos);
static void    doCheckGeneratedBlockRange(STsdbReadHandle* pTsdbReadHandle);
static int32_t doCopyRowsFromFileBlock(STsdbReadHandle* pTsdbReadHandle, int32_t capacity, int32_t numOfRows,
                                       int32_t start, int32_t end);
static int32_t getEndPosInDataBlock(STsdbReadHandle* pTsdbReadHandle, SDataBlockInfo* pBlockInfo);

/*
 * Produce the next chunk of the result for file block pBlock of a table, taking the
 * table's rows still buffered in mem/imem into account.
 *
 * 1. The next buffered row precedes the block in scan order (it lies in the gap before
 *    the block): buffered rows up to just before the block are returned through
 *    tsdbReadRowsFromCache, and the block itself is not loaded.
 * 2. The next buffered row falls inside the block's range: the block is loaded and merged
 *    with the buffer by doMergeTwoLevelData.
 * 3. The buffer is empty, or its next row comes after the block in scan order: only the
 *    file block contributes.
 *    - If the rows from cur->pos to the end position span the whole block, the block is
 *      handed out whole without being loaded here.
 *    - Otherwise the remaining rows are copied. The block is loaded first when it exceeds
 *      the output capacity and this is the first extraction from it.
 *
 * Returns TSDB_CODE_SUCCESS, or the error returned by doLoadFileDataBlock.
 */
static int32_t handleDataMergeIfNeeded(STsdbReadHandle* pTsdbReadHandle, SBlock* pBlock, STableCheckInfo* pCheckInfo) {
  SQueryFilePos* cur = &pTsdbReadHandle->cur;
  STsdbCfg*      pCfg = REPO_CFG(pTsdbReadHandle->pTsdb);
  SDataBlockInfo binfo = GET_FILE_DATA_BLOCK_INFO(pCheckInfo, pBlock);

  // result not needed: the key extracted below tells whether the buffers hold data
  initTableMemIterator(pTsdbReadHandle, pCheckInfo);
  assert(cur->pos >= 0 && cur->pos <= binfo.rows);

  TSKEY key = extractFirstTraverseKey(pCheckInfo, pTsdbReadHandle->order, pCfg->update, TD_VER_MAX);
  bool  hasBufferedRow = (key != TSKEY_INITIAL_VAL);
  if (hasBufferedRow) {
    tsdbDebug("%p key in mem:%" PRId64 ", %s", pTsdbReadHandle, key, pTsdbReadHandle->idStr);
  } else {
    tsdbDebug("%p no data in mem, %s", pTsdbReadHandle, pTsdbReadHandle->idStr);
  }

  bool ascScan = ASCENDING_TRAVERSE(pTsdbReadHandle->order);
  bool bufferReachesBlock =
      hasBufferedRow && (ascScan ? (key <= binfo.window.ekey) : (key >= binfo.window.skey));

  if (bufferReachesBlock) {
    bool bufferBeforeBlock = ascScan ? (key < binfo.window.skey) : (key > binfo.window.ekey);
    if (bufferBeforeBlock) {
      // do not load file block into buffer
      int32_t step = ascScan ? 1 : -1;
      TSKEY   maxKey = ascScan ? (binfo.window.skey - step) : (binfo.window.ekey - step);

      cur->rows =
          tsdbReadRowsFromCache(pCheckInfo, maxKey, pTsdbReadHandle->outputCapacity, &cur->win, pTsdbReadHandle);
      pTsdbReadHandle->realNumOfRows = cur->rows;

      // update the last key value
      pCheckInfo->lastKey = cur->win.ekey + step;
      if (!ascScan) {
        TSWAP(cur->win.skey, cur->win.ekey);
      }

      cur->mixBlock = true;
      cur->blockCompleted = false;
      return TSDB_CODE_SUCCESS;
    }

    int32_t code = doLoadFileDataBlock(pTsdbReadHandle, pBlock, pCheckInfo, cur->slot);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    doMergeTwoLevelData(pTsdbReadHandle, pCheckInfo, pBlock);
    return TSDB_CODE_SUCCESS;
  }

  // Only the file block contributes; the output buffer may be too small to take all of it.
  int32_t endPos = getEndPosInDataBlock(pTsdbReadHandle, &binfo);
  if (abs(cur->pos - endPos) + 1 == binfo.rows) {
    // every row of the block qualifies: return the block as a whole
    pTsdbReadHandle->realNumOfRows = binfo.rows;

    cur->rows = binfo.rows;
    cur->win = binfo.window;
    cur->mixBlock = false;
    cur->blockCompleted = true;
    cur->lastKey = ascScan ? (binfo.window.ekey + 1) : (binfo.window.skey - 1);
    cur->pos = ascScan ? binfo.rows : -1;
  } else {
    // partially copy to dest buffer; load the block only once, on the first extraction
    bool firstTimeExtract = ascScan ? (cur->pos == 0) : (cur->pos == binfo.rows - 1);
    if (pTsdbReadHandle->outputCapacity < binfo.rows && firstTimeExtract) {
      int32_t code = doLoadFileDataBlock(pTsdbReadHandle, pBlock, pCheckInfo, cur->slot);
      if (code != TSDB_CODE_SUCCESS) {
        return code;
      }
    }

    copyAllRemainRowsFromFileBlock(pTsdbReadHandle, pCheckInfo, &binfo, endPos);
    cur->mixBlock = true;
  }

  if (pTsdbReadHandle->outputCapacity >= binfo.rows) {
    ASSERT(cur->blockCompleted || cur->mixBlock);
  }

  if (cur->rows == binfo.rows) {
    tsdbDebug("%p whole file block qualified, brange:%" PRId64 "-%" PRId64 ", rows:%d, lastKey:%" PRId64 ", %s",
              pTsdbReadHandle, cur->win.skey, cur->win.ekey, cur->rows, cur->lastKey, pTsdbReadHandle->idStr);
  } else {
    tsdbDebug("%p create data block from remain file block, brange:%" PRId64 "-%" PRId64
              ", rows:%d, total:%d, lastKey:%" PRId64 ", %s",
              pTsdbReadHandle, cur->win.skey, cur->win.ekey, cur->rows, binfo.rows, cur->lastKey,
              pTsdbReadHandle->idStr);
  }

  return TSDB_CODE_SUCCESS;
}

// Forward declaration (definition not in this chunk). loadFileDataBlock uses it to position the
// cursor at a table's lastKey inside a loaded block's timestamp column (`pValue`, `num` entries),
// honoring the scan direction `order`.
static int32_t binarySearchForKey(char* pValue, int num, TSKEY key, int order);

dengyihao's avatar
dengyihao 已提交
1419 1420
/*
 * Position the file cursor inside `pBlock` for `pCheckInfo` and produce output rows from it.
 *
 * When the block straddles a boundary of the remaining scan (the query window ends inside it, or the
 * table's lastKey lies strictly inside it), the block data is loaded, the cursor is moved to lastKey
 * with binarySearchForKey, and file rows are merged with in-memory rows by doMergeTwoLevelData.
 * Otherwise the cursor is put on the block's first row in traversal order and the block is handed to
 * handleDataMergeIfNeeded.
 *
 * On return *exists tells whether any row was produced (realNumOfRows > 0). If loading the block
 * fails, *exists is set to false and the load error code is returned.
 */
static int32_t loadFileDataBlock(STsdbReadHandle* pTsdbReadHandle, SBlock* pBlock, STableCheckInfo* pCheckInfo,
                                 bool* exists) {
  SQueryFilePos* pPos = &pTsdbReadHandle->cur;
  bool           ascOrder = ASCENDING_TRAVERSE(pTsdbReadHandle->order);
  int32_t        ret = TSDB_CODE_SUCCESS;

  // does the query end, or the resume point, fall inside this block?
  bool straddles;
  if (ascOrder) {
    straddles = (pTsdbReadHandle->window.ekey < pBlock->maxKey.ts) || (pCheckInfo->lastKey > pBlock->minKey.ts);
  } else {
    straddles = (pTsdbReadHandle->window.ekey > pBlock->minKey.ts) || (pCheckInfo->lastKey < pBlock->maxKey.ts);
  }

  if (!straddles) {
    // the whole block is loaded in to buffer: start from its first row in traversal order
    pPos->pos = ascOrder ? 0 : (pBlock->numOfRows - 1);
    ret = handleDataMergeIfNeeded(pTsdbReadHandle, pBlock, pCheckInfo);
  } else {
    ret = doLoadFileDataBlock(pTsdbReadHandle, pBlock, pCheckInfo, pPos->slot);
    if (ret != TSDB_CODE_SUCCESS) {
      *exists = false;
      return ret;
    }

    SDataCols* pTsCols = pTsdbReadHandle->rhelper.pDCols[0];
    if (ascOrder) {
      assert(pTsCols->cols->type == TSDB_DATA_TYPE_TIMESTAMP && pTsCols->numOfRows == pBlock->numOfRows);

      if (pCheckInfo->lastKey > pBlock->minKey.ts) {
        pPos->pos =
            binarySearchForKey(pTsCols->cols[0].pData, pBlock->numOfRows, pCheckInfo->lastKey, pTsdbReadHandle->order);
      } else {
        pPos->pos = 0;
      }

      assert(pCheckInfo->lastKey <= pBlock->maxKey.ts);
    } else {
      if (pCheckInfo->lastKey < pBlock->maxKey.ts) {
        pPos->pos =
            binarySearchForKey(pTsCols->cols[0].pData, pBlock->numOfRows, pCheckInfo->lastKey, pTsdbReadHandle->order);
      } else {
        pPos->pos = pBlock->numOfRows - 1;
      }

      assert(pCheckInfo->lastKey >= pBlock->minKey.ts);
    }

    doMergeTwoLevelData(pTsdbReadHandle, pCheckInfo, pBlock);
  }

  *exists = pTsdbReadHandle->realNumOfRows > 0;
  return ret;
}

1476
/*
 * Binary search over the ascending timestamp array `pValue` of `num` keys.
 *
 * - TSDB_ORDER_DESC: returns the last index whose key is <= `key`, or -1 if every key is greater.
 * - TSDB_ORDER_ASC:  returns the first index whose key is >= `key`, or -1 if every key is smaller.
 * Returns -1 when `num` <= 0. On an exact hit the matching index is returned.
 */
static int doBinarySearchKey(char* pValue, int num, TSKEY key, int order) {
  assert(order == TSDB_ORDER_ASC || order == TSDB_ORDER_DESC);

  if (num <= 0) {
    return -1;
  }

  const TSKEY* keys = (const TSKEY*)pValue;
  int          lo = 0;
  int          hi = num - 1;

  if (order == TSDB_ORDER_DESC) {
    // find the last position whose key is not bigger than `key`
    for (;;) {
      if (key >= keys[hi]) {
        return hi;
      }
      if (key == keys[lo]) {
        return lo;
      }
      if (key < keys[lo]) {
        return lo - 1;
      }

      int mid = lo + ((hi - lo + 1) >> 1);
      if (key == keys[mid]) {
        return mid;
      }
      if (key < keys[mid]) {
        hi = mid - 1;
      } else {
        lo = mid + 1;
      }
    }
  }

  // ascending: find the first position whose key is not smaller than `key`
  for (;;) {
    if (key <= keys[lo]) {
      return lo;
    }
    if (key == keys[hi]) {
      return hi;
    }
    if (key > keys[hi]) {
      return (hi + 1 < num) ? (hi + 1) : -1;
    }

    int mid = lo + ((hi - lo + 1) >> 1);
    if (key == keys[mid]) {
      return mid;
    }
    if (key < keys[mid]) {
      hi = mid - 1;
    } else {
      lo = mid + 1;
    }
  }
}

dengyihao's avatar
dengyihao 已提交
1538 1539
/*
 * Copy rows [start, end] (inclusive, ascending indexes, start <= end + 1) of the loaded file block into
 * the output columns, appending at output row `numOfRows`. For a descending scan the rows are emitted
 * from `end` down to `start`.
 *
 * Output columns are matched to stored block columns by colId. An output column that is absent from
 * the block, or whose stored column is all-NULL, receives `num` NULLs.
 *
 * Side effects: cur.win.ekey and cur.lastKey are set from the last copied key (in scan order).
 * Returns numOfRows plus the number of rows copied.
 */
static int32_t doCopyRowsFromFileBlock(STsdbReadHandle* pTsdbReadHandle, int32_t capacity, int32_t numOfRows,
                                       int32_t start, int32_t end) {
  (void)capacity;  // not consulted here; callers bound [start, end] to the remaining output space

  SDataCols* pCols = pTsdbReadHandle->rhelper.pDCols[0];
  TSKEY*     tsArray = pCols->cols[0].pData;

  int32_t num = end - start + 1;
  assert(num >= 0);

  if (num == 0) {
    return numOfRows;
  }

  bool    ascScan = ASCENDING_TRAVERSE(pTsdbReadHandle->order);
  int32_t trueStart = ascScan ? start : end;
  int32_t trueEnd = ascScan ? end : start;
  int32_t step = ascScan ? 1 : -1;

  int32_t requiredNumOfCols = (int32_t)taosArrayGetSize(pTsdbReadHandle->pColumns);

  // requested columns and stored block columns are walked together in colId order
  int32_t i = 0, j = 0;
  while (i < requiredNumOfCols && j < pCols->numOfCols) {
    SColumnInfoData* pColInfo = taosArrayGet(pTsdbReadHandle->pColumns, i);

    SDataCol* src = &pCols->cols[j];
    if (src->colId < pColInfo->info.colId) {
      j++;  // stored column not requested by the query
      continue;
    }

    if (!isAllRowsNull(src) && pColInfo->info.colId == src->colId) {
      // The fixed-length and var-length paths used to be two byte-identical loops; one suffices.
      // todo opt performance: rows are copied one by one
      for (int32_t n = 0, k = trueStart; n < num; ++n, k += step) {
        SCellVal sVal = {0};
        if (tdGetColDataOfRow(&sVal, src, k, pCols->bitmapMode) < 0) {
          TASSERT(0);
        }

        if (sVal.valType == TD_VTYPE_NORM) {
          colDataAppend(pColInfo, numOfRows + n, sVal.val, false);
        } else {
          colDataAppendNULL(pColInfo, numOfRows + n);
        }
      }

      j++;
      i++;
    } else {
      // either the requested column is absent from this block (colId < src->colId),
      // or the stored column holds only NULLs: emit `num` NULLs
      colDataAppendNNULL(pColInfo, numOfRows, num);
      i++;
    }
  }

  // requested columns beyond the last stored column are all null data
  while (i < requiredNumOfCols) {
    SColumnInfoData* pColInfo = taosArrayGet(pTsdbReadHandle->pColumns, i);
    colDataAppendNNULL(pColInfo, numOfRows, num);
    i++;
  }

  pTsdbReadHandle->cur.win.ekey = tsArray[trueEnd];
  pTsdbReadHandle->cur.lastKey = tsArray[trueEnd] + step;

  return numOfRows + num;
}

C
Cary Xu 已提交
1622 1623 1624 1625 1626 1627 1628 1629 1630 1631 1632 1633 1634 1635 1636 1637 1638 1639 1640 1641
/**
 * @brief Write one in-memory row (row1, optionally combined with row2) into output row *curRow.
 *        Note: row1 always has high priority — when both rows carry a column, row1's value is used.
 *        TODO: fix bug for reverse copy data problem.
 *
 * The columns of row1/row2 are walked in colId order alongside the requested output columns
 * (pTsdbReadHandle->pColumns). When the primary timestamp column is reached:
 *   - a key different from *lastRowKey starts a new output row (advancing *curRow unless no row was
 *     emitted yet), stores the key in *lastRowKey and counts one new row;
 *   - an equal key with `update` set merges this row into the current output row;
 *   - an equal key without `update` discards the row and stops.
 * When a new output row was started, requested columns without a value are filled with NULL.
 *
 * @param pTsdbReadHandle reader handle whose pColumns are the output columns
 * @param capacity        not consulted
 * @param curRow          in/out index of the output row being written
 * @param row1            highest-priority row
 * @param row2            secondary row, may be NULL
 * @param numOfCols       number of output columns to consider
 * @param uid             table uid used to look up a missing schema
 * @param pSchema1        schema of row1; fetched with metaGetTbTSchema when NULL
 * @param pSchema2        schema of row2; fetched with metaGetTbTSchema when NULL
 * @param update          whether a row with an already-emitted key may be merged into it
 * @param lastRowKey      in/out key of the last emitted row
 * @return int32_t        number of new output rows started by this call
 *
 * NOTE(review): schemas fetched here are not released before returning; confirm the ownership
 * contract of metaGetTbTSchema (possible leak, since callers currently pass NULL).
 */
static int32_t mergeTwoRowFromMem(STsdbReadHandle* pTsdbReadHandle, int32_t capacity, int32_t* curRow, STSRow* row1,
                                  STSRow* row2, int32_t numOfCols, uint64_t uid, STSchema* pSchema1, STSchema* pSchema2,
                                  bool update, TSKEY* lastRowKey) {
  bool     row1IsTp = TD_IS_TP_ROW(row1);
  bool     row2IsTp = false;
  SCellVal sVal = {0};
  TSKEY    rowKey = TSKEY_INITIAL_VAL;
  int32_t  appended = 0;
  int32_t  mergeOption = 0;  // 0 discard 1 overwrite 2 merge

  // the schema version info is embedded in STSRow
  if (pSchema1 == NULL) {
    pSchema1 = metaGetTbTSchema(REPO_META(pTsdbReadHandle->pTsdb), uid, TD_ROW_SVER(row1));
  }

#ifdef TD_DEBUG_PRINT_ROW
  char   flags[70] = {0};
  STsdb* pTsdb = pTsdbReadHandle->rhelper.pRepo;
  snprintf(flags, 70, "%s:%d vgId:%d dir:%s row1%s=NULL,row2%s=NULL", __func__, __LINE__, TD_VID(pTsdb->pVnode),
           pTsdb->dir, row1 ? "!" : "", row2 ? "!" : "");
  tdSRowPrint(row1, pSchema1, flags);
#endif

  int32_t nCols1;
  if (row1IsTp) {
    nCols1 = schemaNCols(pSchema1);
  } else {
    nCols1 = tdRowGetNCols(row1);
  }

  int32_t nCols2 = 0;
  if (row2 != NULL) {
    row2IsTp = TD_IS_TP_ROW(row2);
    if (pSchema2 == NULL) {
      pSchema2 = metaGetTbTSchema(REPO_META(pTsdbReadHandle->pTsdb), uid, TD_ROW_SVER(row2));
    }
    if (row2IsTp) {
      nCols2 = schemaNCols(pSchema2);
    } else {
      nCols2 = tdRowGetNCols(row2);
    }
  }

  int32_t outIdx = 0, idx1 = 0, idx2 = 0;
  while (outIdx < numOfCols && (idx1 < nCols1 || idx2 < nCols2)) {
    SColumnInfoData* pColInfo = taosArrayGet(pTsdbReadHandle->pColumns, outIdx);

    // exhausted rows report INT32_MAX so the other row is always picked
    int32_t colId1 = INT32_MAX;
    if (idx1 < nCols1) {
      if (row1IsTp) {
        colId1 = pSchema1->columns[idx1].colId;
      } else {
        colId1 = tdKvRowColIdAt(row1, idx1);
      }
    }

    int32_t colId2 = INT32_MAX;
    if (idx2 < nCols2) {
      if (row2IsTp) {
        colId2 = pSchema2->columns[idx2].colId;
      } else {
        colId2 = tdKvRowColIdAt(row2, idx2);
      }
    }

    STSRow*   pRow;
    STSchema* pSchema;
    bool      pickedTp;
    int32_t   pickedIdx;
    if (colId1 <= colId2) {  // row1 wins ties: it has the higher priority
      if (colId1 < pColInfo->info.colId) {
        ++idx1;
        if (colId1 == colId2) {
          ++idx2;
        }
        continue;
      }
      pRow = row1;
      pSchema = pSchema1;
      pickedTp = row1IsTp;
      pickedIdx = idx1;
    } else {
      if (colId2 < pColInfo->info.colId) {
        ++idx2;
        continue;
      }
      pRow = row2;
      pSchema = pSchema2;
      pickedTp = row2IsTp;
      pickedIdx = idx2;
    }

    // TODO: use STSRowIter
    int16_t colId;
    bool    isKeyCol;
    if (pickedTp) {
      colId = pSchema->columns[pickedIdx].colId;
      int16_t offset = pSchema->columns[pickedIdx].offset;
      tdSTpRowGetVal(pRow, colId, pSchema->columns[pickedIdx].type, pSchema->flen, offset, pickedIdx - 1, &sVal);
      isKeyCol = (colId == PRIMARYKEY_TIMESTAMP_COL_ID);
    } else if (pickedIdx == 0) {
      colId = PRIMARYKEY_TIMESTAMP_COL_ID;
      tdSKvRowGetVal(pRow, PRIMARYKEY_TIMESTAMP_COL_ID, -1, -1, &sVal);
      isKeyCol = true;
    } else {
      SKvRowIdx* pColIdx = tdKvRowColIdxAt(pRow, pickedIdx - 1);
      colId = pColIdx->colId;
      int16_t offset = pColIdx->offset;
      tdSKvRowGetVal(pRow, colId, offset, pickedIdx - 1, &sVal);
      isKeyCol = false;
    }

    if (isKeyCol) {
      rowKey = *(TSKEY*)sVal.val;
      if (rowKey == *lastRowKey) {
        if (!update) {
          mergeOption = 0;  // duplicated key and updates disabled: drop this row
          break;
        }
        mergeOption = 2;
      } else {
        mergeOption = 1;
        if (*lastRowKey != TSKEY_INITIAL_VAL) {
          ++(*curRow);
        }
        *lastRowKey = rowKey;
        ++appended;
      }
    }

    ASSERT(rowKey != TSKEY_INITIAL_VAL);

    if (colId != pColInfo->info.colId) {
      // neither row carries this output column
      if (mergeOption == 1) {
        colDataAppend(pColInfo, *curRow, NULL, true);
      }
      ++outIdx;
      continue;
    }

    if (tdValTypeIsNorm(sVal.valType)) {
      colDataAppend(pColInfo, *curRow, sVal.val, false);
    } else if (tdValTypeIsNull(sVal.valType)) {
      colDataAppend(pColInfo, *curRow, NULL, true);
    } else if (tdValTypeIsNone(sVal.valType)) {
      // TODO: Set null if nothing append for this row
      if (mergeOption == 1) {
        colDataAppend(pColInfo, *curRow, NULL, true);
      }
    } else {
      ASSERT(0);
    }

    ++outIdx;
    if (pRow == row1) {
      ++idx1;
    } else {
      ++idx2;
    }
  }

  if (mergeOption == 1) {
    // a new output row was started: the remaining columns are all null data
    for (; outIdx < numOfCols; ++outIdx) {
      SColumnInfoData* pColInfo = taosArrayGet(pTsdbReadHandle->pColumns, outIdx);
      colDataAppend(pColInfo, *curRow, NULL, true);
    }
  }

  return appended;
}
1833

dengyihao's avatar
dengyihao 已提交
1834 1835
/*
 * Compute the ascending index range [*start, *end] of block rows to take, scanning from `startPos`
 * towards `endPos` in traversal order while `numOfExisted` rows already occupy the output buffer.
 * The range is clipped so the output never exceeds outputCapacity rows.
 */
static void getQualifiedRowsPos(STsdbReadHandle* pTsdbReadHandle, int32_t startPos, int32_t endPos,
                                int32_t numOfExisted, int32_t* start, int32_t* end) {
  *start = -1;

  if (!ASCENDING_TRAVERSE(pTsdbReadHandle->order)) {
    // descending: rows run from startPos down to endPos
    int32_t count = (startPos - endPos) + 1;
    int32_t lowest = endPos;
    if (count + numOfExisted > pTsdbReadHandle->outputCapacity) {
      lowest = startPos + 1 - (pTsdbReadHandle->outputCapacity - numOfExisted);
    }

    *start = lowest;
    *end = startPos;
    return;
  }

  // ascending: rows run from startPos up to endPos
  int32_t count = endPos - startPos + 1;
  if (count + numOfExisted > pTsdbReadHandle->outputCapacity) {
    *end = (pTsdbReadHandle->outputCapacity - numOfExisted) + startPos - 1;
  } else {
    *end = endPos;
  }

  *start = startPos;
}

dengyihao's avatar
dengyihao 已提交
1860 1861
/*
 * Record the outcome of producing a block: the cursor keeps `numOfRows` rows and moves to `endPos`,
 * the reader's realNumOfRows is updated, and the table resumes from the cursor's lastKey.
 */
static void updateInfoAfterMerge(STsdbReadHandle* pTsdbReadHandle, STableCheckInfo* pCheckInfo, int32_t numOfRows,
                                 int32_t endPos) {
  SQueryFilePos* pPos = &pTsdbReadHandle->cur;

  pPos->rows = numOfRows;
  pPos->pos = endPos;
  pTsdbReadHandle->realNumOfRows = numOfRows;

  // the table continues from where the cursor stopped
  pCheckInfo->lastKey = pPos->lastKey;
}

1870 1871
/*
 * Validate the cursor range of the block that was just generated.
 *
 * If no rows were produced, the cursor window is set to the query window and lastKey is moved one
 * step past the query end in scan direction. Otherwise, assert that the cursor window lies inside
 * the query window and matches the first/last timestamps of output column 0.
 */
static void doCheckGeneratedBlockRange(STsdbReadHandle* pTsdbReadHandle) {
  SQueryFilePos* pPos = &pTsdbReadHandle->cur;
  bool           asc = ASCENDING_TRAVERSE(pTsdbReadHandle->order);

  if (pPos->rows <= 0) {
    pPos->win = pTsdbReadHandle->window;
    pPos->lastKey = pTsdbReadHandle->window.ekey + (asc ? 1 : -1);
    return;
  }

  // for a descending scan the query window is given as [ekey, skey]
  if (asc) {
    assert(pPos->win.skey >= pTsdbReadHandle->window.skey && pPos->win.ekey <= pTsdbReadHandle->window.ekey);
  } else {
    assert(pPos->win.skey >= pTsdbReadHandle->window.ekey && pPos->win.ekey <= pTsdbReadHandle->window.skey);
  }

  // NOTE(review): for DESC scans doCopyRowsFromFileBlock writes the highest block index first, while
  // copyAllRemainRowsFromFileBlock sets win.skey to the lowest one; confirm this check holds for DESC.
  SColumnInfoData* pColInfoData = taosArrayGet(pTsdbReadHandle->pColumns, 0);
  assert(pPos->win.skey == ((TSKEY*)pColInfoData->pData)[0] &&
         pPos->win.ekey == ((TSKEY*)pColInfoData->pData)[pPos->rows - 1]);
}

dengyihao's avatar
dengyihao 已提交
1891 1892
/*
 * Copy every row from the cursor position cur->pos up to `endPos` (inclusive, in traversal order) of
 * the loaded file block straight into the output buffer, without merging in-memory rows.
 *
 * Afterwards the cursor window (always ascending: skey <= ekey), lastKey, mixBlock and blockCompleted
 * are updated. The table's lastKey and the cursor position are then refreshed via
 * updateInfoAfterMerge (position = endPos + step, which may be -1 or rows), and the result is
 * sanity-checked.
 */
static void copyAllRemainRowsFromFileBlock(STsdbReadHandle* pTsdbReadHandle, STableCheckInfo* pCheckInfo,
                                           SDataBlockInfo* pBlockInfo, int32_t endPos) {
  SQueryFilePos* cur = &pTsdbReadHandle->cur;

  SDataCols* pCols = pTsdbReadHandle->rhelper.pDCols[0];
  TSKEY*     tsArray = pCols->cols[0].pData;

  bool    ascScan = ASCENDING_TRAVERSE(pTsdbReadHandle->order);
  int32_t step = ascScan ? 1 : -1;

  // doCopyRowsFromFileBlock takes the range in ascending index order
  int32_t start = cur->pos;
  int32_t end = endPos;
  if (!ascScan) {
    TSWAP(start, end);
  }

  assert(pTsdbReadHandle->outputCapacity >= (end - start + 1));
  int32_t numOfRows = doCopyRowsFromFileBlock(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, 0, start, end);

  // the time window should always be ascending order: skey <= ekey
  cur->win = (STimeWindow){.skey = tsArray[start], .ekey = tsArray[end]};
  cur->mixBlock = (numOfRows != pBlockInfo->rows);
  cur->lastKey = tsArray[endPos] + step;
  cur->blockCompleted = (ascScan ? (endPos == pBlockInfo->rows - 1) : (endPos == 0));

  // The value of pos may be -1 or pBlockInfo->rows, and it is invalid in both cases.
  int32_t pos = endPos + step;
  updateInfoAfterMerge(pTsdbReadHandle, pCheckInfo, numOfRows, pos);
  doCheckGeneratedBlockRange(pTsdbReadHandle);

  // The window bounds are printed with PRId64, like the other cursor-range logs in this file;
  // PRIu64 would show negative timestamps as huge unsigned values.
  tsdbDebug("%p uid:%" PRIu64 ", data block created, mixblock:%d, brange:%" PRId64 "-%" PRId64 " rows:%d, %s",
            pTsdbReadHandle, pCheckInfo->tableId, cur->mixBlock, cur->win.skey, cur->win.ekey, cur->rows,
            pTsdbReadHandle->idStr);
}

1928
/*
 * Return the index of the last row of the current file block (in traversal order) to produce in this
 * round, starting from cur->pos, and set cur->mixBlock accordingly.
 *
 * The end is bounded by the query window end (window.ekey) and, when the block has more rows than the
 * output buffer can hold, by outputCapacity. The key search uses the reverse of the scan order so that
 * doBinarySearchKey yields the last qualifying row in scan direction.
 */
int32_t getEndPosInDataBlock(STsdbReadHandle* pTsdbReadHandle, SDataBlockInfo* pBlockInfo) {
  SQueryFilePos* cur = &pTsdbReadHandle->cur;
  SDataCols*     pCols = pTsdbReadHandle->rhelper.pDCols[0];
  bool           ascScan = ASCENDING_TRAVERSE(pTsdbReadHandle->order);

  bool fitsInOutput = pTsdbReadHandle->outputCapacity >= pBlockInfo->rows;

  // does the query window reach past this block's far end (in scan direction)?
  bool coversTail;
  if (ascScan) {
    coversTail = pTsdbReadHandle->window.ekey >= pBlockInfo->window.ekey;
  } else {
    coversTail = pTsdbReadHandle->window.ekey <= pBlockInfo->window.skey;
  }

  int32_t endPos = -1;

  if (coversTail) {
    if (fitsInOutput) {
      if (ascScan) {
        endPos = pBlockInfo->rows - 1;
        cur->mixBlock = (cur->pos != 0);
      } else {
        endPos = 0;
        cur->mixBlock = (cur->pos != pBlockInfo->rows - 1);
      }
      return endPos;
    }

    if (ascScan) {
      endPos = TMIN(cur->pos + pTsdbReadHandle->outputCapacity - 1, pBlockInfo->rows - 1);
    } else {
      endPos = TMAX(cur->pos - pTsdbReadHandle->outputCapacity + 1, 0);
    }
  } else {
    if (fitsInOutput) {
      assert(pCols->numOfRows > 0);
    } else {
      ASSERT(pCols->numOfRows > 0);
    }

    // NOTE: reverse the order to find the end position in data block
    int32_t searchOrder = ascScan ? TSDB_ORDER_DESC : TSDB_ORDER_ASC;
    endPos = doBinarySearchKey(pCols->cols[0].pData, pCols->numOfRows, pTsdbReadHandle->window.ekey, searchOrder);

    if (!fitsInOutput) {
      // current data is more than the capacity: pull the end back towards cur->pos
      int32_t size = abs(cur->pos - endPos) + 1;
      if (size > pTsdbReadHandle->outputCapacity) {
        int32_t delta = size - pTsdbReadHandle->outputCapacity;
        endPos += ascScan ? -delta : delta;
      }
    }
  }

  cur->mixBlock = true;
  return endPos;
}

H
[td-32]  
hjxilinx 已提交
1975 1976
// only return the qualified data to client in terms of query time window, data rows in the same block but do not
// be included in the query time window will be discarded
1977 1978
static void doMergeTwoLevelData(STsdbReadHandle* pTsdbReadHandle, STableCheckInfo* pCheckInfo, SBlock* pBlock) {
  SQueryFilePos* cur = &pTsdbReadHandle->cur;
1979
  SDataBlockInfo blockInfo = GET_FILE_DATA_BLOCK_INFO(pCheckInfo, pBlock);
H
Hongze Cheng 已提交
1980
  STsdbCfg*      pCfg = REPO_CFG(pTsdbReadHandle->pTsdb);
H
Haojun Liao 已提交
1981

1982
  initTableMemIterator(pTsdbReadHandle, pCheckInfo);
1983

1984 1985
  SDataCols* pCols = pTsdbReadHandle->rhelper.pDCols[0];
  assert(pCols->cols[0].type == TSDB_DATA_TYPE_TIMESTAMP && pCols->cols[0].colId == PRIMARYKEY_TIMESTAMP_COL_ID &&
dengyihao's avatar
dengyihao 已提交
1986
         cur->pos >= 0 && cur->pos < pBlock->numOfRows);
1987
  // Even Multi-Version supported, the records with duplicated TSKEY would be merged inside of tsdbLoadData interface.
1988
  TSKEY* tsArray = pCols->cols[0].pData;
H
Hongze Cheng 已提交
1989 1990
  assert(pCols->numOfRows == pBlock->numOfRows && tsArray[0] == pBlock->minKey.ts &&
         tsArray[pBlock->numOfRows - 1] == pBlock->maxKey.ts);
1991

dengyihao's avatar
dengyihao 已提交
1992
  bool    ascScan = ASCENDING_TRAVERSE(pTsdbReadHandle->order);
1993 1994
  int32_t step = ascScan ? 1 : -1;

1995
  // for search the endPos, so the order needs to reverse
1996
  int32_t order = ascScan ? TSDB_ORDER_DESC : TSDB_ORDER_ASC;
1997

1998 1999
  int32_t numOfCols = (int32_t)(QH_GET_NUM_OF_COLS(pTsdbReadHandle));
  int32_t endPos = getEndPosInDataBlock(pTsdbReadHandle, &blockInfo);
H
Haojun Liao 已提交
2000

2001
  STimeWindow* pWin = &blockInfo.window;
H
Hongze Cheng 已提交
2002
  tsdbDebug("%p uid:%" PRIu64 " start merge data block, file block range:%" PRIu64 "-%" PRIu64
dengyihao's avatar
dengyihao 已提交
2003 2004 2005
            " rows:%d, start:%d, end:%d, %s",
            pTsdbReadHandle, pCheckInfo->tableId, pWin->skey, pWin->ekey, blockInfo.rows, cur->pos, endPos,
            pTsdbReadHandle->idStr);
H
Haojun Liao 已提交
2006

2007 2008
  // compared with the data from in-memory buffer, to generate the correct timestamp array list
  int32_t numOfRows = 0;
C
Cary Xu 已提交
2009
  int32_t curRow = 0;
H
Haojun Liao 已提交
2010

dengyihao's avatar
dengyihao 已提交
2011 2012
  int16_t   rv1 = -1;
  int16_t   rv2 = -1;
2013 2014
  STSchema* pSchema1 = NULL;
  STSchema* pSchema2 = NULL;
D
fix bug  
dapan1121 已提交
2015

H
Haojun Liao 已提交
2016 2017
  int32_t pos = cur->pos;
  cur->win = TSWINDOW_INITIALIZER;
2018
  bool adjustPos = false;
2019

2020 2021
  // no data in buffer, load data from file directly
  if (pCheckInfo->iiter == NULL && pCheckInfo->iter == NULL) {
2022
    copyAllRemainRowsFromFileBlock(pTsdbReadHandle, pCheckInfo, &blockInfo, endPos);
2023
    return;
2024
  } else if (pCheckInfo->iter != NULL || pCheckInfo->iiter != NULL) {
2025
    SSkipListNode* node = NULL;
C
Cary Xu 已提交
2026
    TSKEY          lastKeyAppend = TSKEY_INITIAL_VAL;
C
Cary Xu 已提交
2027

2028
    do {
H
Haojun Liao 已提交
2029
      STSRow* row2 = NULL;
C
Cary Xu 已提交
2030
      STSRow* row1 = getSRowInTableMem(pCheckInfo, pTsdbReadHandle->order, pCfg->update, &row2, TD_VER_MAX);
2031
      if (row1 == NULL) {
H
[td-32]  
hjxilinx 已提交
2032
        break;
2033
      }
2034

H
Haojun Liao 已提交
2035
      TSKEY key = TD_ROW_KEY(row1);
2036
      if ((key > pTsdbReadHandle->window.ekey && ascScan) || (key < pTsdbReadHandle->window.ekey && !ascScan)) {
2037 2038 2039
        break;
      }

2040 2041 2042 2043 2044 2045 2046
      if (adjustPos) {
        if (key == lastKeyAppend) {
          pos -= step;
        }
        adjustPos = false;
      }

2047 2048
      if (((pos > endPos || tsArray[pos] > pTsdbReadHandle->window.ekey) && ascScan) ||
          ((pos < endPos || tsArray[pos] < pTsdbReadHandle->window.ekey) && !ascScan)) {
2049 2050 2051
        break;
      }

2052
      if ((key < tsArray[pos] && ascScan) || (key > tsArray[pos] && !ascScan)) {
H
Haojun Liao 已提交
2053
        if (rv1 != TD_ROW_SVER(row1)) {
dengyihao's avatar
dengyihao 已提交
2054
          //          pSchema1 = tsdbGetTableSchemaByVersion(pTable, memRowVersion(row1));
H
Haojun Liao 已提交
2055
          rv1 = TD_ROW_SVER(row1);
C
Cary Xu 已提交
2056
        }
dengyihao's avatar
dengyihao 已提交
2057 2058
        if (row2 && rv2 != TD_ROW_SVER(row2)) {
          //          pSchema2 = tsdbGetTableSchemaByVersion(pTable, memRowVersion(row2));
H
Haojun Liao 已提交
2059
          rv2 = TD_ROW_SVER(row2);
2060
        }
dengyihao's avatar
dengyihao 已提交
2061

C
Cary Xu 已提交
2062 2063 2064
        numOfRows +=
            mergeTwoRowFromMem(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, &curRow, row1, row2, numOfCols,
                               pCheckInfo->tableId, pSchema1, pSchema2, pCfg->update, &lastKeyAppend);
2065 2066 2067
        if (cur->win.skey == TSKEY_INITIAL_VAL) {
          cur->win.skey = key;
        }
2068

2069
        cur->win.ekey = key;
dengyihao's avatar
dengyihao 已提交
2070
        cur->lastKey = key + step;
2071
        cur->mixBlock = true;
2072
        moveToNextRowInMem(pCheckInfo);
2073
      } else if (key == tsArray[pos]) {  // data in buffer has the same timestamp of data in file block, ignore it
C
Cary Xu 已提交
2074
#if 0
H
TD-1439  
Hongze Cheng 已提交
2075
        if (pCfg->update) {
dengyihao's avatar
dengyihao 已提交
2076
          if (pCfg->update == TD_ROW_PARTIAL_UPDATE) {
2077
            doCopyRowsFromFileBlock(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, numOfRows, pos, pos);
D
fix bug  
dapan1121 已提交
2078
          }
H
Haojun Liao 已提交
2079
          if (rv1 != TD_ROW_SVER(row1)) {
dengyihao's avatar
dengyihao 已提交
2080
            //            pSchema1 = tsdbGetTableSchemaByVersion(pTable, memRowVersion(row1));
H
Haojun Liao 已提交
2081
            rv1 = TD_ROW_SVER(row1);
2082
          }
dengyihao's avatar
dengyihao 已提交
2083 2084
          if (row2 && rv2 != TD_ROW_SVER(row2)) {
            //            pSchema2 = tsdbGetTableSchemaByVersion(pTable, memRowVersion(row2));
H
Haojun Liao 已提交
2085
            rv2 = TD_ROW_SVER(row2);
2086
          }
dengyihao's avatar
dengyihao 已提交
2087

2088
          bool forceSetNull = pCfg->update != TD_ROW_PARTIAL_UPDATE;
dengyihao's avatar
dengyihao 已提交
2089
          mergeTwoRowFromMem(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, numOfRows, row1, row2, numOfCols,
C
Cary Xu 已提交
2090
                             pCheckInfo->tableId, pSchema1, pSchema2, forceSetNull, &lastRowKey);
H
TD-1439  
Hongze Cheng 已提交
2091 2092 2093 2094 2095 2096 2097 2098 2099
          numOfRows += 1;
          if (cur->win.skey == TSKEY_INITIAL_VAL) {
            cur->win.skey = key;
          }

          cur->win.ekey = key;
          cur->lastKey = key + step;
          cur->mixBlock = true;

C
Cary Xu 已提交
2100 2101 2102 2103 2104 2105 2106
          moveToNextRowInMem(pCheckInfo);
          pos += step;
        } else {
          moveToNextRowInMem(pCheckInfo);
        }
#endif
        if (TD_SUPPORT_UPDATE(pCfg->update)) {
2107 2108 2109 2110
          if (lastKeyAppend != key) {
            if (lastKeyAppend != TSKEY_INITIAL_VAL) {
              ++curRow;
            }
2111
            lastKeyAppend = key;
2112 2113
          }
          // load data from file firstly
C
Cary Xu 已提交
2114
          numOfRows = doCopyRowsFromFileBlock(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, curRow, pos, pos);
C
Cary Xu 已提交
2115 2116 2117 2118 2119 2120 2121

          if (rv1 != TD_ROW_SVER(row1)) {
            rv1 = TD_ROW_SVER(row1);
          }
          if (row2 && rv2 != TD_ROW_SVER(row2)) {
            rv2 = TD_ROW_SVER(row2);
          }
2122 2123

          // still assign data into current row
dengyihao's avatar
dengyihao 已提交
2124 2125 2126
          numOfRows +=
              mergeTwoRowFromMem(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, &curRow, row1, row2, numOfCols,
                                 pCheckInfo->tableId, pSchema1, pSchema2, pCfg->update, &lastKeyAppend);
C
Cary Xu 已提交
2127 2128 2129 2130 2131 2132 2133 2134 2135

          if (cur->win.skey == TSKEY_INITIAL_VAL) {
            cur->win.skey = key;
          }

          cur->win.ekey = key;
          cur->lastKey = key + step;
          cur->mixBlock = true;

H
TD-1439  
Hongze Cheng 已提交
2136
          moveToNextRowInMem(pCheckInfo);
2137

H
TD-1439  
Hongze Cheng 已提交
2138
          pos += step;
2139
          adjustPos = true;
H
TD-1439  
Hongze Cheng 已提交
2140
        } else {
2141
          // discard the memory record
H
TD-1439  
Hongze Cheng 已提交
2142 2143
          moveToNextRowInMem(pCheckInfo);
        }
2144
      } else if ((key > tsArray[pos] && ascScan) || (key < tsArray[pos] && !ascScan)) {
2145 2146 2147
        if (cur->win.skey == TSKEY_INITIAL_VAL) {
          cur->win.skey = tsArray[pos];
        }
2148

2149
        int32_t end = doBinarySearchKey(pCols->cols[0].pData, pCols->numOfRows, key, order);
2150 2151
        assert(end != -1);

dengyihao's avatar
dengyihao 已提交
2152
        if (tsArray[end] == key) {  // the value of key in cache equals to the end timestamp value, ignore it
C
Cary Xu 已提交
2153
#if 0
2154
          if (pCfg->update == TD_ROW_DISCARD_UPDATE) {
H
Hongze Cheng 已提交
2155 2156 2157 2158
            moveToNextRowInMem(pCheckInfo);
          } else {
            end -= step;
          }
C
Cary Xu 已提交
2159 2160 2161 2162 2163 2164
#endif
          if (!TD_SUPPORT_UPDATE(pCfg->update)) {
            moveToNextRowInMem(pCheckInfo);
          } else {
            end -= step;
          }
H
Haojun Liao 已提交
2165
        }
2166

2167
        int32_t qstart = 0, qend = 0;
2168
        getQualifiedRowsPos(pTsdbReadHandle, pos, end, numOfRows, &qstart, &qend);
2169

2170
        if ((lastKeyAppend != TSKEY_INITIAL_VAL) && (lastKeyAppend != (ascScan ? tsArray[qstart] : tsArray[qend]))) {
C
Cary Xu 已提交
2171 2172
          ++curRow;
        }
2173

C
Cary Xu 已提交
2174
        numOfRows = doCopyRowsFromFileBlock(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, curRow, qstart, qend);
2175
        pos += (qend - qstart + 1) * step;
C
Cary Xu 已提交
2176 2177 2178
        if (numOfRows > 0) {
          curRow = numOfRows - 1;
        }
2179

2180
        cur->win.ekey = ascScan ? tsArray[qend] : tsArray[qstart];
dengyihao's avatar
dengyihao 已提交
2181
        cur->lastKey = cur->win.ekey + step;
C
Cary Xu 已提交
2182
        lastKeyAppend = cur->win.ekey;
2183
      }
2184
    } while (numOfRows < pTsdbReadHandle->outputCapacity);
H
Haojun Liao 已提交
2185

2186
    if (numOfRows < pTsdbReadHandle->outputCapacity) {
H
Haojun Liao 已提交
2187 2188 2189 2190
      /**
       * if cache is empty, load remain file block data. In contrast, if there are remain data in cache, do NOT
       * copy them all to result buffer, since it may be overlapped with file data block.
       */
dengyihao's avatar
dengyihao 已提交
2191
      if (node == NULL || ((TD_ROW_KEY((STSRow*)SL_GET_NODE_DATA(node)) > pTsdbReadHandle->window.ekey) && ascScan) ||
2192
          ((TD_ROW_KEY((STSRow*)SL_GET_NODE_DATA(node)) < pTsdbReadHandle->window.ekey) && !ascScan)) {
2193 2194 2195 2196 2197
        // no data in cache or data in cache is greater than the ekey of time window, load data from file block
        if (cur->win.skey == TSKEY_INITIAL_VAL) {
          cur->win.skey = tsArray[pos];
        }

2198
        int32_t start = -1, end = -1;
2199
        getQualifiedRowsPos(pTsdbReadHandle, pos, endPos, numOfRows, &start, &end);
2200

2201
        numOfRows = doCopyRowsFromFileBlock(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, numOfRows, start, end);
2202
        pos += (end - start + 1) * step;
2203

2204
        cur->win.ekey = ascScan ? tsArray[end] : tsArray[start];
dengyihao's avatar
dengyihao 已提交
2205
        cur->lastKey = cur->win.ekey + step;
H
Haojun Liao 已提交
2206
        cur->mixBlock = true;
2207
      }
2208 2209
    }
  }
H
Haojun Liao 已提交
2210

2211
  cur->blockCompleted = (((pos > endPos || cur->lastKey > pTsdbReadHandle->window.ekey) && ascScan) ||
dengyihao's avatar
dengyihao 已提交
2212
                         ((pos < endPos || cur->lastKey < pTsdbReadHandle->window.ekey) && !ascScan));
2213

2214
  if (!ascScan) {
wafwerar's avatar
wafwerar 已提交
2215
    TSWAP(cur->win.skey, cur->win.ekey);
2216
  }
2217

2218 2219
  updateInfoAfterMerge(pTsdbReadHandle, pCheckInfo, numOfRows, pos);
  doCheckGeneratedBlockRange(pTsdbReadHandle);
H
Haojun Liao 已提交
2220

dengyihao's avatar
dengyihao 已提交
2221 2222 2223
  tsdbDebug("%p uid:%" PRIu64 ", data block created, mixblock:%d, brange:%" PRIu64 "-%" PRIu64 " rows:%d, %s",
            pTsdbReadHandle, pCheckInfo->tableId, cur->mixBlock, cur->win.skey, cur->win.ekey, cur->rows,
            pTsdbReadHandle->idStr);
2224 2225
}

2226
/*
 * Binary search for `key` in a buffer of `num` TSKEY values sorted in ascending order.
 *
 * order == TSDB_ORDER_DESC: returns the last position whose value is <= key, or -1 when every value is greater.
 * any other order:          returns the first position whose value is >= key, or -1 when every value is smaller.
 * An exact hit returns the matching position. Returns -1 when num <= 0.
 */
int32_t binarySearchForKey(char* pValue, int num, TSKEY key, int order) {
  if (num <= 0) {
    return -1;
  }

  const TSKEY* keys = (const TSKEY*)pValue;
  int          lo = 0;
  int          hi = num - 1;

  if (order == TSDB_ORDER_DESC) {
    for (;;) {
      // settle the search from the current bounds whenever possible
      if (key >= keys[hi]) return hi;
      if (key == keys[lo]) return lo;
      if (key < keys[lo]) return lo - 1;

      int mid = lo + ((hi - lo + 1) >> 1);
      if (key < keys[mid]) {
        hi = mid - 1;
      } else if (key > keys[mid]) {
        lo = mid + 1;
      } else {
        return mid;
      }
    }
  }

  for (;;) {
    if (key <= keys[lo]) return lo;
    if (key == keys[hi]) return hi;
    if (key > keys[hi]) return (hi + 1 < num) ? hi + 1 : -1;

    int mid = lo + ((hi - lo + 1) >> 1);
    if (key < keys[mid]) {
      hi = mid - 1;
    } else if (key > keys[mid]) {
      lo = mid + 1;
    } else {
      return mid;
    }
  }
}

/*
 * Release every buffer owned by an SBlockOrderSupporter.
 * numOfTables is the number of leading pDataBlockInfo entries whose per-table block arrays are freed.
 */
static void cleanBlockOrderSupporter(SBlockOrderSupporter* pSupporter, int32_t numOfTables) {
  taosMemoryFreeClear(pSupporter->numOfBlocksPerTable);
  taosMemoryFreeClear(pSupporter->blockIndexArray);

  STableBlockInfo** ppTableBlocks = pSupporter->pDataBlockInfo;
  for (int32_t t = 0; t < numOfTables; t++) {
    taosMemoryFreeClear(ppTableBlocks[t]);
  }

  taosMemoryFreeClear(pSupporter->pDataBlockInfo);
}

static int32_t dataBlockOrderCompar(const void* pLeft, const void* pRight, void* param) {
  int32_t leftTableIndex = *(int32_t*)pLeft;
  int32_t rightTableIndex = *(int32_t*)pRight;

  SBlockOrderSupporter* pSupporter = (SBlockOrderSupporter*)param;

  int32_t leftTableBlockIndex = pSupporter->blockIndexArray[leftTableIndex];
  int32_t rightTableBlockIndex = pSupporter->blockIndexArray[rightTableIndex];

2307
  if (leftTableBlockIndex > pSupporter->numOfBlocksPerTable[leftTableIndex]) {
2308 2309
    /* left block is empty */
    return 1;
2310
  } else if (rightTableBlockIndex > pSupporter->numOfBlocksPerTable[rightTableIndex]) {
2311 2312 2313 2314 2315 2316 2317
    /* right block is empty */
    return -1;
  }

  STableBlockInfo* pLeftBlockInfoEx = &pSupporter->pDataBlockInfo[leftTableIndex][leftTableBlockIndex];
  STableBlockInfo* pRightBlockInfoEx = &pSupporter->pDataBlockInfo[rightTableIndex][rightTableBlockIndex];

H
Haojun Liao 已提交
2318
  //    assert(pLeftBlockInfoEx->compBlock->offset != pRightBlockInfoEx->compBlock->offset);
dengyihao's avatar
dengyihao 已提交
2319
#if 0  // TODO: temporarily comment off requested by Dr. Liao
H
Haojun Liao 已提交
2320 2321
  if (pLeftBlockInfoEx->compBlock->offset == pRightBlockInfoEx->compBlock->offset &&
      pLeftBlockInfoEx->compBlock->last == pRightBlockInfoEx->compBlock->last) {
B
Bomin Zhang 已提交
2322
    tsdbError("error in header file, two block with same offset:%" PRId64, (int64_t)pLeftBlockInfoEx->compBlock->offset);
2323
  }
H
Haojun Liao 已提交
2324
#endif
2325

H
Haojun Liao 已提交
2326
  return pLeftBlockInfoEx->compBlock->offset > pRightBlockInfoEx->compBlock->offset ? 1 : -1;
2327 2328
}

2329
/*
 * Collect the data blocks of all tables in the current file set into pTsdbReadHandle->pDataBlockInfo.
 * With more than one qualified table the blocks are merged in ascending file-offset order
 * (see dataBlockOrderCompar); with exactly one table they are copied unsorted.
 *
 * numOfBlocks:      total number of blocks; must equal the sum of numOfBlocks over pTableCheckInfo.
 * numOfAllocBlocks: out, set to numOfBlocks once the output buffer is large enough.
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_TDB_OUT_OF_MEMORY when an allocation or the merge tree fails.
 */
static int32_t createDataBlocksInfo(STsdbReadHandle* pTsdbReadHandle, int32_t numOfBlocks, int32_t* numOfAllocBlocks) {
  size_t size = sizeof(STableBlockInfo) * numOfBlocks;

  if (pTsdbReadHandle->allocSize < size) {
    char* tmp = taosMemoryRealloc(pTsdbReadHandle->pDataBlockInfo, (int32_t)size);
    if (tmp == NULL) {
      // Leave both the old buffer and its recorded capacity untouched: recording the new size before a
      // failed realloc would let a later, smaller request skip the realloc and overrun the old buffer.
      return TSDB_CODE_TDB_OUT_OF_MEMORY;
    }

    pTsdbReadHandle->pDataBlockInfo = (STableBlockInfo*)tmp;
    pTsdbReadHandle->allocSize = (int32_t)size;
  }

  memset(pTsdbReadHandle->pDataBlockInfo, 0, size);
  *numOfAllocBlocks = numOfBlocks;

  // access data blocks according to the offset of each block in asc/desc order.
  int32_t numOfTables = (int32_t)taosArrayGetSize(pTsdbReadHandle->pTableCheckInfo);

  SBlockOrderSupporter sup = {0};
  sup.numOfTables = numOfTables;
  sup.numOfBlocksPerTable = taosMemoryCalloc(1, sizeof(int32_t) * numOfTables);
  sup.blockIndexArray = taosMemoryCalloc(1, sizeof(int32_t) * numOfTables);
  sup.pDataBlockInfo = taosMemoryCalloc(1, POINTER_BYTES * numOfTables);

  if (sup.numOfBlocksPerTable == NULL || sup.blockIndexArray == NULL || sup.pDataBlockInfo == NULL) {
    cleanBlockOrderSupporter(&sup, 0);
    return TSDB_CODE_TDB_OUT_OF_MEMORY;
  }

  int32_t cnt = 0;
  int32_t numOfQualTables = 0;

  for (int32_t j = 0; j < numOfTables; ++j) {
    STableCheckInfo* pTableCheck = (STableCheckInfo*)taosArrayGet(pTsdbReadHandle->pTableCheckInfo, j);
    if (pTableCheck->numOfBlocks <= 0) {
      continue;  // this table has no block in the current file set
    }

    SBlock* pBlock = pTableCheck->pCompInfo->blocks;
    sup.numOfBlocksPerTable[numOfQualTables] = pTableCheck->numOfBlocks;

    STableBlockInfo* pTableBlocks = taosMemoryMalloc(sizeof(STableBlockInfo) * pTableCheck->numOfBlocks);
    if (pTableBlocks == NULL) {
      cleanBlockOrderSupporter(&sup, numOfQualTables);
      return TSDB_CODE_TDB_OUT_OF_MEMORY;
    }

    sup.pDataBlockInfo[numOfQualTables] = pTableBlocks;

    for (int32_t k = 0; k < pTableCheck->numOfBlocks; ++k) {
      pTableBlocks[k].compBlock = &pBlock[k];
      pTableBlocks[k].pTableCheckInfo = pTableCheck;
      cnt++;
    }

    numOfQualTables++;
  }

  assert(numOfBlocks == cnt);

  // since there is only one table qualified, blocks are not sorted
  if (numOfQualTables == 1) {
    memcpy(pTsdbReadHandle->pDataBlockInfo, sup.pDataBlockInfo[0], sizeof(STableBlockInfo) * numOfBlocks);
    cleanBlockOrderSupporter(&sup, numOfQualTables);

    tsdbDebug("%p create data blocks info struct completed for 1 table, %d blocks not sorted %s", pTsdbReadHandle, cnt,
              pTsdbReadHandle->idStr);
    return TSDB_CODE_SUCCESS;
  }

  tsdbDebug("%p create data blocks info struct completed, %d blocks in %d tables %s", pTsdbReadHandle, cnt,
            numOfQualTables, pTsdbReadHandle->idStr);

  assert(cnt <= numOfBlocks && numOfQualTables <= numOfTables);  // the pTableQueryInfo[j]->numOfBlocks may be 0
  sup.numOfTables = numOfQualTables;

  SMultiwayMergeTreeInfo* pTree = NULL;
  // keep the full int32_t result: narrowing an error code to uint8_t can turn a failure into 0 (success)
  int32_t ret = tMergeTreeCreate(&pTree, sup.numOfTables, &sup, dataBlockOrderCompar);
  if (ret != TSDB_CODE_SUCCESS) {
    cleanBlockOrderSupporter(&sup, numOfQualTables);
    return TSDB_CODE_TDB_OUT_OF_MEMORY;
  }

  int32_t numOfTotal = 0;
  while (numOfTotal < cnt) {
    int32_t pos = tMergeTreeGetChosenIndex(pTree);
    int32_t index = sup.blockIndexArray[pos]++;

    STableBlockInfo* pBlocksInfo = sup.pDataBlockInfo[pos];
    pTsdbReadHandle->pDataBlockInfo[numOfTotal++] = pBlocksInfo[index];

    // set data block index overflow, in order to disable the offset comparator
    if (sup.blockIndexArray[pos] >= sup.numOfBlocksPerTable[pos]) {
      sup.blockIndexArray[pos] = sup.numOfBlocksPerTable[pos] + 1;
    }

    tMergeTreeAdjust(pTree, tMergeTreeGetAdjustIndex(pTree));
  }

  /*
   * available when no import exists
   * for(int32_t i = 0; i < cnt - 1; ++i) {
   *   assert((*pDataBlockInfo)[i].compBlock->offset < (*pDataBlockInfo)[i+1].compBlock->offset);
   * }
   */

  tsdbDebug("%p %d data blocks sort completed, %s", pTsdbReadHandle, cnt, pTsdbReadHandle->idStr);
  cleanBlockOrderSupporter(&sup, numOfQualTables);
  taosMemoryFree(pTree);

  return TSDB_CODE_SUCCESS;
}

static int32_t getFirstFileDataBlock(STsdbReadHandle* pTsdbReadHandle, bool* exists);
H
Haojun Liao 已提交
2447

2448
static int32_t getDataBlock(STsdbReadHandle* pTsdbReadHandle, STableBlockInfo* pNext, bool* exists) {
dengyihao's avatar
dengyihao 已提交
2449
  int32_t        step = ASCENDING_TRAVERSE(pTsdbReadHandle->order) ? 1 : -1;
2450
  SQueryFilePos* cur = &pTsdbReadHandle->cur;
H
Haojun Liao 已提交
2451

dengyihao's avatar
dengyihao 已提交
2452
  while (1) {
2453
    int32_t code = loadFileDataBlock(pTsdbReadHandle, pNext->compBlock, pNext->pTableCheckInfo, exists);
H
Haojun Liao 已提交
2454 2455 2456 2457
    if (code != TSDB_CODE_SUCCESS || *exists) {
      return code;
    }

2458 2459
    if ((cur->slot == pTsdbReadHandle->numOfBlocks - 1 && ASCENDING_TRAVERSE(pTsdbReadHandle->order)) ||
        (cur->slot == 0 && !ASCENDING_TRAVERSE(pTsdbReadHandle->order))) {
H
Haojun Liao 已提交
2460
      // all data blocks in current file has been checked already, try next file if exists
2461
      return getFirstFileDataBlock(pTsdbReadHandle, exists);
H
Haojun Liao 已提交
2462 2463 2464 2465
    } else {  // next block of the same file
      cur->slot += step;
      cur->mixBlock = false;
      cur->blockCompleted = false;
2466
      pNext = &pTsdbReadHandle->pDataBlockInfo[cur->slot];
H
Haojun Liao 已提交
2467 2468 2469 2470
    }
  }
}

2471 2472 2473
/*
 * Advance the file-set iterator to the next file that overlaps the query window and has at least one data block
 * of the queried tables. Build that file's block list (createDataBlocksInfo) and load its first block in traversal
 * order through getDataBlock().
 *
 * When no further file qualifies, or an error occurs, cur->fid is set to INT32_MIN and *exists to false. The
 * function then returns the error code, or TSDB_CODE_SUCCESS when the files are simply exhausted.
 */
static int32_t getFirstFileDataBlock(STsdbReadHandle* pTsdbReadHandle, bool* exists) {
  pTsdbReadHandle->numOfBlocks = 0;

  SQueryFilePos* cur = &pTsdbReadHandle->cur;
  STsdbKeepCfg*  pKeepCfg = REPO_KEEP_CFG(pTsdbReadHandle->pTsdb);
  int32_t        numOfTables = (int32_t)taosArrayGetSize(pTsdbReadHandle->pTableCheckInfo);
  bool           asc = ASCENDING_TRAVERSE(pTsdbReadHandle->order);
  STimeWindow    fileWin = TSWINDOW_INITIALIZER;
  int32_t        blocksInFile = 0;
  int32_t        code = TSDB_CODE_SUCCESS;

  for (;;) {
    // the iterator is advanced and the read file set opened while holding the FS read lock
    tsdbRLockFS(REPO_FS(pTsdbReadHandle->pTsdb));

    pTsdbReadHandle->pFileGroup = tsdbFSIterNext(&pTsdbReadHandle->fileIter);
    if (pTsdbReadHandle->pFileGroup == NULL) {
      tsdbUnLockFS(REPO_FS(pTsdbReadHandle->pTsdb));
      break;  // no file left
    }

    tsdbGetFidKeyRange(pKeepCfg->days, pKeepCfg->precision, pTsdbReadHandle->pFileGroup->fid, &fileWin.skey,
                       &fileWin.ekey);

    // this file is past the query window in traversal direction: ignore it and all remaining files
    bool outOfRange =
        asc ? (fileWin.skey > pTsdbReadHandle->window.ekey) : (fileWin.ekey < pTsdbReadHandle->window.ekey);
    if (outOfRange) {
      tsdbUnLockFS(REPO_FS(pTsdbReadHandle->pTsdb));
      tsdbDebug("%p remain files are not qualified for qrange:%" PRId64 "-%" PRId64 ", ignore, %s", pTsdbReadHandle,
                pTsdbReadHandle->window.skey, pTsdbReadHandle->window.ekey, pTsdbReadHandle->idStr);

      pTsdbReadHandle->pFileGroup = NULL;
      assert(pTsdbReadHandle->numOfBlocks == 0);
      break;
    }

    if (tsdbSetAndOpenReadFSet(&pTsdbReadHandle->rhelper, pTsdbReadHandle->pFileGroup) < 0) {
      tsdbUnLockFS(REPO_FS(pTsdbReadHandle->pTsdb));
      code = terrno;
      break;
    }

    tsdbUnLockFS(REPO_FS(pTsdbReadHandle->pTsdb));

    if (tsdbLoadBlockIdx(&pTsdbReadHandle->rhelper) < 0) {
      code = terrno;
      break;
    }

    code = getFileCompInfo(pTsdbReadHandle, &blocksInFile);
    if (code != TSDB_CODE_SUCCESS) {
      break;
    }

    tsdbDebug("%p %d blocks found in file for %d table(s), fid:%d, %s", pTsdbReadHandle, blocksInFile, numOfTables,
              pTsdbReadHandle->pFileGroup->fid, pTsdbReadHandle->idStr);

    assert(blocksInFile >= 0);
    if (blocksInFile == 0) {
      continue;  // nothing for the queried tables in this file, try the next one
    }

    // todo return error code to query engine
    code = createDataBlocksInfo(pTsdbReadHandle, blocksInFile, &pTsdbReadHandle->numOfBlocks);
    if (code != TSDB_CODE_SUCCESS) {
      break;
    }

    assert(blocksInFile >= pTsdbReadHandle->numOfBlocks);
    if (pTsdbReadHandle->numOfBlocks > 0) {
      break;  // found a file with data
    }
  }

  // no data in file anymore, or an error occurred
  if (pTsdbReadHandle->numOfBlocks <= 0 || code != TSDB_CODE_SUCCESS) {
    assert(code != TSDB_CODE_SUCCESS || pTsdbReadHandle->pFileGroup == NULL);

    cur->fid = INT32_MIN;  // denote that there are no data in file anymore
    *exists = false;
    return code;
  }

  assert(pTsdbReadHandle->pFileGroup != NULL && pTsdbReadHandle->numOfBlocks > 0);
  cur->slot = asc ? 0 : pTsdbReadHandle->numOfBlocks - 1;
  cur->fid = pTsdbReadHandle->pFileGroup->fid;

  return getDataBlock(pTsdbReadHandle, &pTsdbReadHandle->pDataBlockInfo[cur->slot], exists);
}

/* True when cur->slot is the last block of the current file in the given traversal direction. */
static bool isEndFileDataBlock(SQueryFilePos* cur, int32_t numOfBlocks, bool ascTrav) {
  assert(cur != NULL && numOfBlocks > 0);

  int32_t lastSlot = ascTrav ? (numOfBlocks - 1) : 0;
  return cur->slot == lastSlot;
}

/*
 * Step cur->slot to the neighbouring block of the current file: forward for ascending order, backward otherwise.
 * The per-block progress flags (mixBlock, blockCompleted) are reset.
 */
static void moveToNextDataBlockInCurrentFile(STsdbReadHandle* pTsdbReadHandle) {
  SQueryFilePos* cur = &pTsdbReadHandle->cur;
  assert(cur->slot < pTsdbReadHandle->numOfBlocks && cur->slot >= 0);

  if (ASCENDING_TRAVERSE(pTsdbReadHandle->order)) {
    cur->slot += 1;
  } else {
    cur->slot -= 1;
  }

  cur->mixBlock = false;
  cur->blockCompleted = false;
}

/*
 * Gather block-distribution statistics over the files that overlap the query window.
 *
 * Collected values: total block length, total rows, per-block min/max rows, the number of blocks with fewer
 * than 4096 rows, and the number of files visited.
 * totalSize and totalRows are reset here. minRows, maxRows, numOfSmallBlocks and numOfFiles are accumulated
 * onto whatever the caller initialised them to.
 * Returns TSDB_CODE_SUCCESS or the error code of the first failing file operation.
 */
int32_t tsdbGetFileBlocksDistInfo(tsdbReaderT* queryHandle, STableBlockDistInfo* pTableBlockInfo) {
  STsdbReadHandle* pTsdbReadHandle = (STsdbReadHandle*)queryHandle;

  pTableBlockInfo->totalSize = 0;
  pTableBlockInfo->totalRows = 0;

  STsdbFS* pFileHandle = REPO_FS(pTsdbReadHandle->pTsdb);

  // find the start data block in file
  pTsdbReadHandle->locateStart = true;
  STsdbKeepCfg* pCfg = REPO_KEEP_CFG(pTsdbReadHandle->pTsdb);
  int32_t       fid = getFileIdFromKey(pTsdbReadHandle->window.skey, pCfg->days, pCfg->precision);

  tsdbRLockFS(pFileHandle);
  tsdbFSIterInit(&pTsdbReadHandle->fileIter, pFileHandle, pTsdbReadHandle->order);
  tsdbFSIterSeek(&pTsdbReadHandle->fileIter, fid);
  tsdbUnLockFS(pFileHandle);

  // NOTE(review): numOfFiles is also incremented for every qualified file inside the loop below, so this extra
  // increment looks like an over-count by one; kept unchanged because consumers of the statistic may rely on it.
  pTableBlockInfo->numOfFiles += 1;

  int32_t     code = TSDB_CODE_SUCCESS;
  int32_t     numOfBlocks = 0;
  int32_t     numOfTables = (int32_t)taosArrayGetSize(pTsdbReadHandle->pTableCheckInfo);
  int         defaultRows = 4096;  // TSDB_DEFAULT_BLOCK_ROWS(pCfg->maxRowsPerFileBlock);
  STimeWindow win = TSWINDOW_INITIALIZER;

  bool ascTraverse = ASCENDING_TRAVERSE(pTsdbReadHandle->order);

  while (true) {
    numOfBlocks = 0;
    tsdbRLockFS(REPO_FS(pTsdbReadHandle->pTsdb));

    if ((pTsdbReadHandle->pFileGroup = tsdbFSIterNext(&pTsdbReadHandle->fileIter)) == NULL) {
      tsdbUnLockFS(REPO_FS(pTsdbReadHandle->pTsdb));
      break;
    }

    tsdbGetFidKeyRange(pCfg->days, pCfg->precision, pTsdbReadHandle->pFileGroup->fid, &win.skey, &win.ekey);

    // current file are not overlapped with query time window, ignore remain files
    if ((ascTraverse && win.skey > pTsdbReadHandle->window.ekey) ||
        (!ascTraverse && win.ekey < pTsdbReadHandle->window.ekey)) {
      tsdbUnLockFS(REPO_FS(pTsdbReadHandle->pTsdb));
      tsdbDebug("%p remain files are not qualified for qrange:%" PRId64 "-%" PRId64 ", ignore, %s", pTsdbReadHandle,
                pTsdbReadHandle->window.skey, pTsdbReadHandle->window.ekey, pTsdbReadHandle->idStr);
      pTsdbReadHandle->pFileGroup = NULL;
      break;
    }

    pTableBlockInfo->numOfFiles += 1;
    if (tsdbSetAndOpenReadFSet(&pTsdbReadHandle->rhelper, pTsdbReadHandle->pFileGroup) < 0) {
      tsdbUnLockFS(REPO_FS(pTsdbReadHandle->pTsdb));
      code = terrno;
      break;
    }

    tsdbUnLockFS(REPO_FS(pTsdbReadHandle->pTsdb));

    if (tsdbLoadBlockIdx(&pTsdbReadHandle->rhelper) < 0) {
      code = terrno;
      break;
    }

    if ((code = getFileCompInfo(pTsdbReadHandle, &numOfBlocks)) != TSDB_CODE_SUCCESS) {
      break;
    }

    tsdbDebug("%p %d blocks found in file for %d table(s), fid:%d, %s", pTsdbReadHandle, numOfBlocks, numOfTables,
              pTsdbReadHandle->pFileGroup->fid, pTsdbReadHandle->idStr);

    if (numOfBlocks == 0) {
      continue;
    }

    for (int32_t i = 0; i < numOfTables; ++i) {
      STableCheckInfo* pCheckInfo = taosArrayGet(pTsdbReadHandle->pTableCheckInfo, i);
      if (pCheckInfo->numOfBlocks <= 0) {
        // no block of this table in the current file: skip it without touching pCompInfo,
        // the same way createDataBlocksInfo() does
        continue;
      }

      SBlock* pBlock = pCheckInfo->pCompInfo->blocks;
      for (int32_t j = 0; j < pCheckInfo->numOfBlocks; ++j) {
        int32_t numOfRows = pBlock[j].numOfRows;

        pTableBlockInfo->totalSize += pBlock[j].len;
        pTableBlockInfo->totalRows += numOfRows;

        if (numOfRows > pTableBlockInfo->maxRows) {
          pTableBlockInfo->maxRows = numOfRows;
        }

        if (numOfRows < pTableBlockInfo->minRows) {
          pTableBlockInfo->minRows = numOfRows;
        }

        if (numOfRows < defaultRows) {
          pTableBlockInfo->numOfSmallBlocks += 1;
        }
        //        int32_t  stepIndex = (numOfRows-1)/TSDB_BLOCK_DIST_STEP_ROWS;
        //        SFileBlockInfo *blockInfo = (SFileBlockInfo*)taosArrayGet(pTableBlockInfo->dataBlockInfos, stepIndex);
        //        blockInfo->numBlocksOfStep++;
      }
    }
  }

  return code;
}

/*
 * Produce the next data block from the files.
 *
 * First call (locateStart unset): position the file iterator at the file containing window.skey and load the
 * first qualified block.
 * Later calls: if the current block is a mixed block (cur->mixBlock) that is not completed, keep merging it.
 * Otherwise move to the next block of the current file, or to the next file after its last block.
 */
static int32_t getDataBlocksInFiles(STsdbReadHandle* pTsdbReadHandle, bool* exists) {
  STsdbFS*       pFileHandle = REPO_FS(pTsdbReadHandle->pTsdb);
  SQueryFilePos* cur = &pTsdbReadHandle->cur;

  if (!pTsdbReadHandle->locateStart) {
    // find the start data block in file
    pTsdbReadHandle->locateStart = true;

    STsdbKeepCfg* pCfg = REPO_KEEP_CFG(pTsdbReadHandle->pTsdb);
    int32_t       fid = getFileIdFromKey(pTsdbReadHandle->window.skey, pCfg->days, pCfg->precision);

    tsdbRLockFS(pFileHandle);
    tsdbFSIterInit(&pTsdbReadHandle->fileIter, pFileHandle, pTsdbReadHandle->order);
    tsdbFSIterSeek(&pTsdbReadHandle->fileIter, fid);
    tsdbUnLockFS(pFileHandle);

    return getFirstFileDataBlock(pTsdbReadHandle, exists);
  }

  // the current block is still being consumed: continue merging it
  if (cur->mixBlock && !cur->blockCompleted) {
    STableBlockInfo* pBlockInfo = &pTsdbReadHandle->pDataBlockInfo[cur->slot];

    tsdbDebug("%p continue in current data block, index:%d, pos:%d, %s", pTsdbReadHandle, cur->slot, cur->pos,
              pTsdbReadHandle->idStr);
    int32_t code = handleDataMergeIfNeeded(pTsdbReadHandle, pBlockInfo->compBlock, pBlockInfo->pTableCheckInfo);
    *exists = (pTsdbReadHandle->realNumOfRows > 0);

    if (code != TSDB_CODE_SUCCESS || *exists) {
      return code;
    }
  }

  // current block is empty or done: all blocks of this file checked -> try next file, else next block
  if (isEndFileDataBlock(cur, pTsdbReadHandle->numOfBlocks, ASCENDING_TRAVERSE(pTsdbReadHandle->order))) {
    return getFirstFileDataBlock(pTsdbReadHandle, exists);
  }

  moveToNextDataBlockInCurrentFile(pTsdbReadHandle);
  return getDataBlock(pTsdbReadHandle, &pTsdbReadHandle->pDataBlockInfo[cur->slot], exists);
}

/*
 * Advance activeIndex to the first table (starting from the current one) that still has data in the in-memory
 * buffer. Returns true if such a table exists; otherwise returns false with activeIndex == number of tables.
 */
static bool doHasDataInBuffer(STsdbReadHandle* pTsdbReadHandle) {
  size_t numOfTables = taosArrayGetSize(pTsdbReadHandle->pTableCheckInfo);

  for (; pTsdbReadHandle->activeIndex < numOfTables; pTsdbReadHandle->activeIndex += 1) {
    if (hasMoreDataInCache(pTsdbReadHandle)) {
      return true;
    }
  }

  return false;
}

// todo not unref yet, since it is not support multi-group interpolation query
H
Haojun Liao 已提交
2742
static UNUSED_FUNC void changeQueryHandleForInterpQuery(tsdbReaderT pHandle) {
H
Haojun Liao 已提交
2743
  // filter the queried time stamp in the first place
dengyihao's avatar
dengyihao 已提交
2744
  STsdbReadHandle* pTsdbReadHandle = (STsdbReadHandle*)pHandle;
H
Haojun Liao 已提交
2745 2746

  // starts from the buffer in case of descending timestamp order check data blocks
2747
  size_t numOfTables = taosArrayGetSize(pTsdbReadHandle->pTableCheckInfo);
H
Haojun Liao 已提交
2748 2749

  int32_t i = 0;
dengyihao's avatar
dengyihao 已提交
2750
  while (i < numOfTables) {
2751
    STableCheckInfo* pCheckInfo = taosArrayGet(pTsdbReadHandle->pTableCheckInfo, i);
H
Haojun Liao 已提交
2752 2753

    // the first qualified table for interpolation query
dengyihao's avatar
dengyihao 已提交
2754 2755 2756 2757
    //    if ((pTsdbReadHandle->window.skey <= pCheckInfo->pTableObj->lastKey) &&
    //        (pCheckInfo->pTableObj->lastKey != TSKEY_INITIAL_VAL)) {
    //      break;
    //    }
H
Haojun Liao 已提交
2758 2759 2760 2761 2762 2763 2764 2765 2766

    i++;
  }

  // there are no data in all the tables
  if (i == numOfTables) {
    return;
  }

dengyihao's avatar
dengyihao 已提交
2767
  STableCheckInfo info = *(STableCheckInfo*)taosArrayGet(pTsdbReadHandle->pTableCheckInfo, i);
2768
  taosArrayClear(pTsdbReadHandle->pTableCheckInfo);
H
Haojun Liao 已提交
2769

2770 2771
  info.lastKey = pTsdbReadHandle->window.skey;
  taosArrayPush(pTsdbReadHandle->pTableCheckInfo, &info);
H
Haojun Liao 已提交
2772 2773 2774
}

static int tsdbReadRowsFromCache(STableCheckInfo* pCheckInfo, TSKEY maxKey, int maxRowsToRead, STimeWindow* win,
2775
                                 STsdbReadHandle* pTsdbReadHandle) {
dengyihao's avatar
dengyihao 已提交
2776
  int       numOfRows = 0;
C
Cary Xu 已提交
2777
  int       curRows = 0;
dengyihao's avatar
dengyihao 已提交
2778
  int32_t   numOfCols = (int32_t)taosArrayGetSize(pTsdbReadHandle->pColumns);
H
Hongze Cheng 已提交
2779
  STsdbCfg* pCfg = REPO_CFG(pTsdbReadHandle->pTsdb);
H
Haojun Liao 已提交
2780 2781
  win->skey = TSKEY_INITIAL_VAL;

dengyihao's avatar
dengyihao 已提交
2782 2783
  int64_t   st = taosGetTimestampUs();
  int16_t   rv = -1;
D
fix bug  
dapan1121 已提交
2784
  STSchema* pSchema = NULL;
C
Cary Xu 已提交
2785 2786
  TSKEY     lastRowKey = TSKEY_INITIAL_VAL;

H
Haojun Liao 已提交
2787
  do {
C
Cary Xu 已提交
2788
    STSRow* row = getSRowInTableMem(pCheckInfo, pTsdbReadHandle->order, pCfg->update, NULL, TD_VER_MAX);
H
Haojun Liao 已提交
2789 2790 2791 2792
    if (row == NULL) {
      break;
    }

H
Haojun Liao 已提交
2793
    TSKEY key = TD_ROW_KEY(row);
dengyihao's avatar
dengyihao 已提交
2794 2795 2796 2797
    if ((key > maxKey && ASCENDING_TRAVERSE(pTsdbReadHandle->order)) ||
        (key < maxKey && !ASCENDING_TRAVERSE(pTsdbReadHandle->order))) {
      tsdbDebug("%p key:%" PRIu64 " beyond qrange:%" PRId64 " - %" PRId64 ", no more data in buffer", pTsdbReadHandle,
                key, pTsdbReadHandle->window.skey, pTsdbReadHandle->window.ekey);
H
Haojun Liao 已提交
2798 2799 2800 2801 2802 2803 2804 2805 2806

      break;
    }

    if (win->skey == INT64_MIN) {
      win->skey = key;
    }

    win->ekey = key;
H
Haojun Liao 已提交
2807
    if (rv != TD_ROW_SVER(row)) {
2808
      pSchema = metaGetTbTSchema(REPO_META(pTsdbReadHandle->pTsdb), pCheckInfo->tableId, TD_ROW_SVER(row));
H
Haojun Liao 已提交
2809
      rv = TD_ROW_SVER(row);
D
fix bug  
dapan1121 已提交
2810
    }
C
Cary Xu 已提交
2811 2812
    numOfRows += mergeTwoRowFromMem(pTsdbReadHandle, maxRowsToRead, &curRows, row, NULL, numOfCols, pCheckInfo->tableId,
                                    pSchema, NULL, pCfg->update, &lastRowKey);
H
Haojun Liao 已提交
2813

C
Cary Xu 已提交
2814
    if (numOfRows >= maxRowsToRead) {
H
Haojun Liao 已提交
2815 2816 2817 2818
      moveToNextRowInMem(pCheckInfo);
      break;
    }

dengyihao's avatar
dengyihao 已提交
2819
  } while (moveToNextRowInMem(pCheckInfo));
H
Haojun Liao 已提交
2820

C
Cary Xu 已提交
2821
  taosMemoryFreeClear(pSchema);  // free the STSChema
H
Haojun Liao 已提交
2822 2823 2824
  assert(numOfRows <= maxRowsToRead);

  int64_t elapsedTime = taosGetTimestampUs() - st;
dengyihao's avatar
dengyihao 已提交
2825 2826
  tsdbDebug("%p build data block from cache completed, elapsed time:%" PRId64 " us, numOfRows:%d, numOfCols:%d, %s",
            pTsdbReadHandle, elapsedTime, numOfRows, numOfCols, pTsdbReadHandle->idStr);
H
Haojun Liao 已提交
2827 2828 2829 2830

  return numOfRows;
}

dengyihao's avatar
dengyihao 已提交
2831 2832 2833 2834 2835 2836
/* Return the index handle obtained from metaGetIdx(), or NULL when no meta is given. */
void* tsdbGetIdx(SMeta* pMeta) {
  return (pMeta != NULL) ? metaGetIdx(pMeta) : NULL;
}
dengyihao's avatar
dengyihao 已提交
2837 2838 2839 2840 2841 2842
/* Return the inverted-index handle obtained from metaGetIvtIdx(), or NULL when no meta is given. */
void* tsdbGetIvtIdx(SMeta* pMeta) {
  return (pMeta != NULL) ? metaGetIvtIdx(pMeta) : NULL;
}
wmmhello's avatar
wmmhello 已提交
2843
/*
 * Append one STableKeyInfo per child table of `uid` to `list`.
 *
 * Each entry carries the child table uid and lastKey = TSKEY_INITIAL_VAL.
 * Always returns TSDB_CODE_SUCCESS.
 */
int32_t tsdbGetAllTableList(SMeta* pMeta, uint64_t uid, SArray* list) {
  SMCtbCursor* pCur = metaOpenCtbCursor(pMeta, uid);

  // metaCtbCursorNext() returns 0 once all child tables are visited
  for (tb_uid_t id = metaCtbCursorNext(pCur); id != 0; id = metaCtbCursorNext(pCur)) {
    // Both members are designated. The former `uid = id` was an assignment to the function
    // parameter used as a positional initializer, so the uid member was not set as intended.
    STableKeyInfo info = {.lastKey = TSKEY_INITIAL_VAL, .uid = id};
    taosArrayPush(list, &info);
  }

  metaCloseCtbCursor(pCur);
  return TSDB_CODE_SUCCESS;
}

L
Liu Jicong 已提交
2860 2861 2862 2863 2864 2865 2866 2867 2868 2869 2870 2871 2872 2873 2874 2875
/*
 * Append the uid (tb_uid_t) of every child table of super table `suid` to `list`.
 * Always returns TSDB_CODE_SUCCESS.
 */
int32_t tsdbGetCtbIdList(SMeta* pMeta, int64_t suid, SArray* list) {
  SMCtbCursor* pCursor = metaOpenCtbCursor(pMeta, suid);

  // a zero uid marks the end of the cursor
  for (tb_uid_t childUid = metaCtbCursorNext(pCursor); childUid != 0; childUid = metaCtbCursorNext(pCursor)) {
    taosArrayPush(list, &childUid);
  }

  metaCloseCtbCursor(pCursor);
  return TSDB_CODE_SUCCESS;
}

H
Haojun Liao 已提交
2876 2877 2878 2879 2880
/*
 * Release a helper object allocated with the taos memory allocator.
 * NULL is accepted and ignored.
 * TODO: once tQueryInfo is used again, free its payload first (pInfo->q, or the hash
 * object for the IN operator) before releasing the object itself.
 */
static void destroyHelper(void* param) {
  if (param != NULL) {
    taosMemoryFree(param);
  }
}

dengyihao's avatar
dengyihao 已提交
2891 2892
/* Bit flags that select which external row to fetch: the one before or after the query point. */
#define TSDB_PREV_ROW (1 << 0)
#define TSDB_NEXT_ROW (1 << 1)
2893

dengyihao's avatar
dengyihao 已提交
2894
/*
 * Try to produce the next data block for the table at pTsdbReadHandle->activeIndex.
 *
 * Sources are tried in this order:
 *   1. data file blocks, while checkFiles is set;
 *   2. the in-memory buffer (hasMoreDataInCache);
 *   3. the external prev/next rows, only when external rows are requested, the query
 *      window is a single point and no row has been produced.
 * Returns true when a block (or the external rows) is available.
 */
static bool loadBlockOfActiveTable(STsdbReadHandle* pTsdbReadHandle) {
  STsdbReadHandle* pReader = pTsdbReadHandle;

  if (pReader->checkFiles) {
    // check if the query range overlaps with the file data block
    bool    found = true;
    int32_t code = getDataBlocksInFiles(pReader, &found);
    if (code != TSDB_CODE_SUCCESS) {
      pReader->checkFiles = false;
      return false;
    }

    if (!found) {
      // data files are exhausted for this table, fall back to the buffer below
      pReader->checkFiles = false;
    } else {
      tsdbRetrieveDataBlock((tsdbReaderT*)pReader, NULL);

      bool singlePoint = pReader->currentLoadExternalRows && pReader->window.skey == pReader->window.ekey;
      if (singlePoint) {
        SColumnInfoData* pTsCol = taosArrayGet(pReader->pColumns, 0);
        assert(*(int64_t*)pTsCol->pData == pReader->window.skey);
      }

      // the exactly matched row is found, so the external rows are no longer needed
      pReader->currentLoadExternalRows = false;
      return true;
    }
  }

  if (hasMoreDataInCache(pReader)) {
    pReader->currentLoadExternalRows = false;
    return true;
  }

  // the current result is empty: only a single-point window falls back to external rows
  bool useExternalRows = pReader->currentLoadExternalRows && pReader->window.skey == pReader->window.ekey &&
                         pReader->cur.rows == 0;
  if (!useExternalRows) {
    return false;
  }

  bool loaded = tsdbGetExternalRow(pReader);
  pReader->currentLoadExternalRows = false;
  return loaded;
}
2943

2944
/*
 * Serve the next table's last row from the last-row cache as a one-row block.
 *
 * Advances activeIndex by one. The reader's window is expected to be TS_INITIALIZER.
 * Returns false when all tables are visited or no cached last row is available.
 *
 * The cache lookup (tsdbGetCachedLastRow) is currently disabled, so no row is ever
 * available. The previous code still merged the NULL row, reported one row and computed
 * TSKEY_INITIAL_VAL + step. For descending order that is INT64_MIN - 1, which is
 * signed-integer overflow (undefined behavior).
 */
static bool loadCachedLastRow(STsdbReadHandle* pTsdbReadHandle) {
  int32_t numOfCols = (int32_t)(QH_GET_NUM_OF_COLS(pTsdbReadHandle));
  size_t  numOfTables = taosArrayGetSize(pTsdbReadHandle->pTableCheckInfo);
  assert(numOfTables > 0 && numOfCols > 0);

  if (++pTsdbReadHandle->activeIndex >= numOfTables) {
    return false;
  }

  STableCheckInfo* pCheckInfo = taosArrayGet(pTsdbReadHandle->pTableCheckInfo, pTsdbReadHandle->activeIndex);

  STSRow* pRow = NULL;
  TSKEY   key = TSKEY_INITIAL_VAL;
  // TODO: fetch the cached row once tsdbGetCachedLastRow() is available again:
  //   if (tsdbGetCachedLastRow(pCheckInfo->pTableObj, &pRow, &key) != TSDB_CODE_SUCCESS) return false;
  if (pRow == NULL) {
    // no cached last row for this table, nothing to load
    return false;
  }

  SQueryFilePos* cur = &pTsdbReadHandle->cur;
  int32_t        step = ASCENDING_TRAVERSE(pTsdbReadHandle->order) ? 1 : -1;
  TSKEY          lastRowKey = TSKEY_INITIAL_VAL;
  int32_t        curRow = 0;

  mergeTwoRowFromMem(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, &curRow, pRow, NULL, numOfCols,
                     pCheckInfo->tableId, NULL, NULL, true, &lastRowKey);
  taosMemoryFreeClear(pRow);

  // update the last key value
  pCheckInfo->lastKey = key + step;

  cur->rows = 1;  // only one row
  cur->lastKey = key + step;
  cur->mixBlock = true;
  cur->win.skey = key;
  cur->win.ekey = key;

  return true;
}

dengyihao's avatar
dengyihao 已提交
2984
// static bool loadCachedLast(STsdbReadHandle* pTsdbReadHandle) {
2985 2986 2987 2988 2989 2990 2991 2992 2993 2994 2995 2996 2997 2998 2999 3000 3001 3002 3003
//  // the last row is cached in buffer, return it directly.
//  // here note that the pTsdbReadHandle->window must be the TS_INITIALIZER
//  int32_t tgNumOfCols = (int32_t)QH_GET_NUM_OF_COLS(pTsdbReadHandle);
//  size_t  numOfTables = taosArrayGetSize(pTsdbReadHandle->pTableCheckInfo);
//  int32_t numOfRows = 0;
//  assert(numOfTables > 0 && tgNumOfCols > 0);
//  SQueryFilePos* cur = &pTsdbReadHandle->cur;
//  TSKEY priKey = TSKEY_INITIAL_VAL;
//  int32_t priIdx = -1;
//  SColumnInfoData* pColInfo = NULL;
//
//  while (++pTsdbReadHandle->activeIndex < numOfTables) {
//    STableCheckInfo* pCheckInfo = taosArrayGet(pTsdbReadHandle->pTableCheckInfo, pTsdbReadHandle->activeIndex);
//    STable* pTable = pCheckInfo->pTableObj;
//    char* pData = NULL;
//
//    int32_t numOfCols = pTable->maxColNum;
//
//    if (pTable->lastCols == NULL || pTable->maxColNum <= 0) {
dengyihao's avatar
dengyihao 已提交
3004 3005
//      tsdbWarn("no last cached for table %s, uid:%" PRIu64 ",tid:%d", pTable->name->data, pTable->uid,
//      pTable->tableId); continue;
3006 3007 3008 3009 3010 3011 3012 3013 3014 3015 3016 3017 3018 3019 3020 3021 3022 3023 3024 3025 3026 3027 3028 3029 3030 3031 3032 3033 3034 3035 3036 3037 3038 3039 3040 3041 3042 3043 3044 3045 3046 3047 3048 3049 3050 3051 3052 3053 3054 3055 3056 3057 3058 3059 3060 3061 3062 3063 3064 3065 3066 3067 3068 3069 3070 3071 3072 3073 3074 3075 3076 3077 3078 3079 3080 3081 3082 3083 3084 3085 3086 3087 3088 3089 3090 3091 3092 3093 3094 3095 3096 3097 3098 3099 3100 3101 3102 3103 3104 3105 3106 3107 3108 3109 3110 3111 3112 3113 3114 3115 3116 3117 3118 3119 3120 3121 3122 3123 3124 3125 3126 3127 3128 3129 3130 3131 3132 3133 3134 3135
//    }
//
//    int32_t i = 0, j = 0;
//    while(i < tgNumOfCols && j < numOfCols) {
//      pColInfo = taosArrayGet(pTsdbReadHandle->pColumns, i);
//      if (pTable->lastCols[j].colId < pColInfo->info.colId) {
//        j++;
//        continue;
//      } else if (pTable->lastCols[j].colId > pColInfo->info.colId) {
//        i++;
//        continue;
//      }
//
//      pData = (char*)pColInfo->pData + numOfRows * pColInfo->info.bytes;
//
//      if (pTable->lastCols[j].bytes > 0) {
//        void* value = pTable->lastCols[j].pData;
//        switch (pColInfo->info.type) {
//          case TSDB_DATA_TYPE_BINARY:
//          case TSDB_DATA_TYPE_NCHAR:
//            memcpy(pData, value, varDataTLen(value));
//            break;
//          case TSDB_DATA_TYPE_NULL:
//          case TSDB_DATA_TYPE_BOOL:
//          case TSDB_DATA_TYPE_TINYINT:
//          case TSDB_DATA_TYPE_UTINYINT:
//            *(uint8_t *)pData = *(uint8_t *)value;
//            break;
//          case TSDB_DATA_TYPE_SMALLINT:
//          case TSDB_DATA_TYPE_USMALLINT:
//            *(uint16_t *)pData = *(uint16_t *)value;
//            break;
//          case TSDB_DATA_TYPE_INT:
//          case TSDB_DATA_TYPE_UINT:
//            *(uint32_t *)pData = *(uint32_t *)value;
//            break;
//          case TSDB_DATA_TYPE_BIGINT:
//          case TSDB_DATA_TYPE_UBIGINT:
//            *(uint64_t *)pData = *(uint64_t *)value;
//            break;
//          case TSDB_DATA_TYPE_FLOAT:
//            SET_FLOAT_PTR(pData, value);
//            break;
//          case TSDB_DATA_TYPE_DOUBLE:
//            SET_DOUBLE_PTR(pData, value);
//            break;
//          case TSDB_DATA_TYPE_TIMESTAMP:
//            if (pColInfo->info.colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
//              priKey = tdGetKey(*(TKEY *)value);
//              priIdx = i;
//
//              i++;
//              j++;
//              continue;
//            } else {
//              *(TSKEY *)pData = *(TSKEY *)value;
//            }
//            break;
//          default:
//            memcpy(pData, value, pColInfo->info.bytes);
//        }
//
//        for (int32_t n = 0; n < tgNumOfCols; ++n) {
//          if (n == i) {
//            continue;
//          }
//
//          pColInfo = taosArrayGet(pTsdbReadHandle->pColumns, n);
//          pData = (char*)pColInfo->pData + numOfRows * pColInfo->info.bytes;;
//
//          if (pColInfo->info.colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
////            *(TSKEY *)pData = pTable->lastCols[j].ts;
//            continue;
//          }
//
//          if (pColInfo->info.type == TSDB_DATA_TYPE_BINARY || pColInfo->info.type == TSDB_DATA_TYPE_NCHAR) {
//            setVardataNull(pData, pColInfo->info.type);
//          } else {
//            setNull(pData, pColInfo->info.type, pColInfo->info.bytes);
//          }
//        }
//
//        numOfRows++;
//        assert(numOfRows < pTsdbReadHandle->outputCapacity);
//      }
//
//      i++;
//      j++;
//    }
//
//    // leave the real ts column as the last row, because last function only (not stable) use the last row as res
//    if (priKey != TSKEY_INITIAL_VAL) {
//      pColInfo = taosArrayGet(pTsdbReadHandle->pColumns, priIdx);
//      pData = (char*)pColInfo->pData + numOfRows * pColInfo->info.bytes;
//
//      *(TSKEY *)pData = priKey;
//
//      for (int32_t n = 0; n < tgNumOfCols; ++n) {
//        if (n == priIdx) {
//          continue;
//        }
//
//        pColInfo = taosArrayGet(pTsdbReadHandle->pColumns, n);
//        pData = (char*)pColInfo->pData + numOfRows * pColInfo->info.bytes;;
//
//        assert (pColInfo->info.colId != PRIMARYKEY_TIMESTAMP_COL_ID);
//
//        if (pColInfo->info.type == TSDB_DATA_TYPE_BINARY || pColInfo->info.type == TSDB_DATA_TYPE_NCHAR) {
//          setVardataNull(pData, pColInfo->info.type);
//        } else {
//          setNull(pData, pColInfo->info.type, pColInfo->info.bytes);
//        }
//      }
//
//      numOfRows++;
//    }
//
//    if (numOfRows > 0) {
//      cur->rows     = numOfRows;
//      cur->mixBlock = true;
//
//      return true;
//    }
//  }
//
//  return false;
//}

/*
 * Table-sequential loading: starting at activeIndex, return as soon as one table yields a block.
 *
 * For each exhausted table, the per-table state is reset before moving to the next table:
 * numOfBlocks, locateStart, checkFiles, cur.rows and currentLoadExternalRows.
 * Returns true when a block was loaded, false when every table is exhausted.
 */
static bool loadDataBlockFromTableSeq(STsdbReadHandle* pTsdbReadHandle) {
  size_t numOfTables = taosArrayGetSize(pTsdbReadHandle->pTableCheckInfo);
  assert(numOfTables > 0);

  int64_t stime = taosGetTimestampUs();

  while (pTsdbReadHandle->activeIndex < numOfTables) {
    if (loadBlockOfActiveTable(pTsdbReadHandle)) {
      return true;
    }

    STableCheckInfo* pCheckInfo = taosArrayGet(pTsdbReadHandle->pTableCheckInfo, pTsdbReadHandle->activeIndex);
    pCheckInfo->numOfBlocks = 0;

    pTsdbReadHandle->activeIndex += 1;
    pTsdbReadHandle->locateStart = false;
    pTsdbReadHandle->checkFiles = true;
    pTsdbReadHandle->cur.rows = 0;
    pTsdbReadHandle->currentLoadExternalRows = pTsdbReadHandle->loadExternalRow;

    terrno = TSDB_CODE_SUCCESS;

    // Add only the time since the previous checkpoint. Adding the total elapsed time on
    // every iteration (as before) counted the earlier tables again and again.
    int64_t now = taosGetTimestampUs();
    pTsdbReadHandle->cost.checkForNextTime += (now - stime);
    stime = now;
  }

  return false;
}

H
Haojun Liao 已提交
3163
// handle data in cache situation
H
Haojun Liao 已提交
3164
bool tsdbNextDataBlock(tsdbReaderT pHandle) {
dengyihao's avatar
dengyihao 已提交
3165
  STsdbReadHandle* pTsdbReadHandle = (STsdbReadHandle*)pHandle;
Y
yihaoDeng 已提交
3166

3167 3168
  size_t numOfCols = taosArrayGetSize(pTsdbReadHandle->pColumns);
  for (int32_t i = 0; i < numOfCols; ++i) {
3169 3170 3171 3172
    SColumnInfoData* pColInfo = taosArrayGet(pTsdbReadHandle->pColumns, i);
    colInfoDataCleanup(pColInfo, pTsdbReadHandle->outputCapacity);
  }

3173
  if (emptyQueryTimewindow(pTsdbReadHandle)) {
dengyihao's avatar
dengyihao 已提交
3174 3175
    tsdbDebug("%p query window not overlaps with the data set, no result returned, %s", pTsdbReadHandle,
              pTsdbReadHandle->idStr);
3176 3177 3178
    return false;
  }

Y
yihaoDeng 已提交
3179 3180 3181
  int64_t stime = taosGetTimestampUs();
  int64_t elapsedTime = stime;

3182
  // TODO refactor: remove "type"
3183 3184
  if (pTsdbReadHandle->type == TSDB_QUERY_TYPE_LAST) {
    if (pTsdbReadHandle->cachelastrow == TSDB_CACHED_TYPE_LASTROW) {
dengyihao's avatar
dengyihao 已提交
3185
      //      return loadCachedLastRow(pTsdbReadHandle);
3186
    } else if (pTsdbReadHandle->cachelastrow == TSDB_CACHED_TYPE_LAST) {
dengyihao's avatar
dengyihao 已提交
3187
      //      return loadCachedLast(pTsdbReadHandle);
D
init  
dapan1121 已提交
3188
    }
H
Haojun Liao 已提交
3189
  }
Y
yihaoDeng 已提交
3190

3191 3192
  if (pTsdbReadHandle->loadType == BLOCK_LOAD_TABLE_SEQ_ORDER) {
    return loadDataBlockFromTableSeq(pTsdbReadHandle);
dengyihao's avatar
dengyihao 已提交
3193
  } else {  // loadType == RR and Offset Order
3194
    if (pTsdbReadHandle->checkFiles) {
H
Haojun Liao 已提交
3195 3196 3197
      // check if the query range overlaps with the file data block
      bool exists = true;

3198
      int32_t code = getDataBlocksInFiles(pTsdbReadHandle, &exists);
H
Haojun Liao 已提交
3199
      if (code != TSDB_CODE_SUCCESS) {
3200 3201
        pTsdbReadHandle->activeIndex = 0;
        pTsdbReadHandle->checkFiles = false;
H
Haojun Liao 已提交
3202 3203 3204 3205 3206

        return false;
      }

      if (exists) {
3207
        pTsdbReadHandle->cost.checkForNextTime += (taosGetTimestampUs() - stime);
H
Haojun Liao 已提交
3208 3209
        return exists;
      }
Y
yihaoDeng 已提交
3210

3211 3212
      pTsdbReadHandle->activeIndex = 0;
      pTsdbReadHandle->checkFiles = false;
Y
yihaoDeng 已提交
3213 3214
    }

H
Haojun Liao 已提交
3215
    // TODO: opt by consider the scan order
3216
    bool ret = doHasDataInBuffer(pTsdbReadHandle);
H
Haojun Liao 已提交
3217
    terrno = TSDB_CODE_SUCCESS;
Y
yihaoDeng 已提交
3218

H
Haojun Liao 已提交
3219
    elapsedTime = taosGetTimestampUs() - stime;
3220
    pTsdbReadHandle->cost.checkForNextTime += elapsedTime;
H
Haojun Liao 已提交
3221
    return ret;
Y
yihaoDeng 已提交
3222 3223
  }
}
3224

dengyihao's avatar
dengyihao 已提交
3225
// static int32_t doGetExternalRow(STsdbReadHandle* pTsdbReadHandle, int16_t type, STsdbMemTable* pMemRef) {
3226 3227 3228 3229 3230 3231 3232 3233 3234 3235 3236 3237 3238 3239 3240 3241 3242 3243 3244 3245 3246 3247 3248 3249 3250 3251 3252 3253 3254 3255 3256 3257 3258 3259
//  STsdbReadHandle* pSecQueryHandle = NULL;
//
//  if (type == TSDB_PREV_ROW && pTsdbReadHandle->prev) {
//    return TSDB_CODE_SUCCESS;
//  }
//
//  if (type == TSDB_NEXT_ROW && pTsdbReadHandle->next) {
//    return TSDB_CODE_SUCCESS;
//  }
//
//  // prepare the structure
//  int32_t numOfCols = (int32_t) QH_GET_NUM_OF_COLS(pTsdbReadHandle);
//
//  if (type == TSDB_PREV_ROW) {
//    pTsdbReadHandle->prev = taosArrayInit(numOfCols, sizeof(SColumnInfoData));
//    if (pTsdbReadHandle->prev == NULL) {
//      terrno = TSDB_CODE_QRY_OUT_OF_MEMORY;
//      goto out_of_memory;
//    }
//  } else {
//    pTsdbReadHandle->next = taosArrayInit(numOfCols, sizeof(SColumnInfoData));
//    if (pTsdbReadHandle->next == NULL) {
//      terrno = TSDB_CODE_QRY_OUT_OF_MEMORY;
//      goto out_of_memory;
//    }
//  }
//
//  SArray* row = (type == TSDB_PREV_ROW)? pTsdbReadHandle->prev : pTsdbReadHandle->next;
//
//  for (int32_t i = 0; i < numOfCols; ++i) {
//    SColumnInfoData* pCol = taosArrayGet(pTsdbReadHandle->pColumns, i);
//
//    SColumnInfoData colInfo = {{0}, 0};
//    colInfo.info = pCol->info;
wafwerar's avatar
wafwerar 已提交
3260
//    colInfo.pData = taosMemoryCalloc(1, pCol->info.bytes);
3261 3262 3263 3264 3265 3266 3267 3268 3269
//    if (colInfo.pData == NULL) {
//      terrno = TSDB_CODE_QRY_OUT_OF_MEMORY;
//      goto out_of_memory;
//    }
//
//    taosArrayPush(row, &colInfo);
//  }
//
//  // load the previous row
3270
//  SQueryTableDataCond cond = {.numOfCols = numOfCols, .loadExternalRows = false, .type = BLOCK_LOAD_OFFSET_SEQ_ORDER};
3271 3272 3273 3274 3275 3276 3277 3278
//  if (type == TSDB_PREV_ROW) {
//    cond.order = TSDB_ORDER_DESC;
//    cond.twindow = (STimeWindow){pTsdbReadHandle->window.skey, INT64_MIN};
//  } else {
//    cond.order = TSDB_ORDER_ASC;
//    cond.twindow = (STimeWindow){pTsdbReadHandle->window.skey, INT64_MAX};
//  }
//
wafwerar's avatar
wafwerar 已提交
3279
//  cond.colList = taosMemoryCalloc(cond.numOfCols, sizeof(SColumnInfo));
3280 3281 3282 3283 3284 3285 3286 3287 3288 3289
//  if (cond.colList == NULL) {
//    terrno = TSDB_CODE_QRY_OUT_OF_MEMORY;
//    goto out_of_memory;
//  }
//
//  for (int32_t i = 0; i < cond.numOfCols; ++i) {
//    SColumnInfoData* pColInfoData = taosArrayGet(pTsdbReadHandle->pColumns, i);
//    memcpy(&cond.colList[i], &pColInfoData->info, sizeof(SColumnInfo));
//  }
//
H
Haojun Liao 已提交
3290
//  pSecQueryHandle = tsdbQueryTablesImpl(pTsdbReadHandle->pTsdb, &cond, pTsdbReadHandle->idStr, pMemRef);
wafwerar's avatar
wafwerar 已提交
3291
//  taosMemoryFreeClear(cond.colList);
3292 3293 3294 3295 3296 3297 3298 3299 3300 3301 3302 3303 3304 3305 3306 3307 3308 3309 3310 3311 3312 3313 3314 3315 3316 3317 3318 3319 3320 3321 3322 3323 3324 3325 3326 3327 3328 3329
//
//  // current table, only one table
//  STableCheckInfo* pCurrent = taosArrayGet(pTsdbReadHandle->pTableCheckInfo, pTsdbReadHandle->activeIndex);
//
//  SArray* psTable = NULL;
//  pSecQueryHandle->pTableCheckInfo = createCheckInfoFromCheckInfo(pCurrent, pSecQueryHandle->window.skey, &psTable);
//  if (pSecQueryHandle->pTableCheckInfo == NULL) {
//    taosArrayDestroy(psTable);
//    terrno = TSDB_CODE_QRY_OUT_OF_MEMORY;
//    goto out_of_memory;
//  }
//
//
//  tsdbMayTakeMemSnapshot(pSecQueryHandle, psTable);
//  if (!tsdbNextDataBlock((void*)pSecQueryHandle)) {
//    // no result in current query, free the corresponding result rows structure
//    if (type == TSDB_PREV_ROW) {
//      pTsdbReadHandle->prev = doFreeColumnInfoData(pTsdbReadHandle->prev);
//    } else {
//      pTsdbReadHandle->next = doFreeColumnInfoData(pTsdbReadHandle->next);
//    }
//
//    goto out_of_memory;
//  }
//
//  SDataBlockInfo blockInfo = {{0}, 0};
//  tsdbRetrieveDataBlockInfo((void*)pSecQueryHandle, &blockInfo);
//  tsdbRetrieveDataBlock((void*)pSecQueryHandle, pSecQueryHandle->defaultLoadColumn);
//
//  row = (type == TSDB_PREV_ROW)? pTsdbReadHandle->prev:pTsdbReadHandle->next;
//  int32_t pos = (type == TSDB_PREV_ROW)?pSecQueryHandle->cur.rows - 1:0;
//
//  for (int32_t i = 0; i < numOfCols; ++i) {
//    SColumnInfoData* pCol = taosArrayGet(row, i);
//    SColumnInfoData* s = taosArrayGet(pSecQueryHandle->pColumns, i);
//    memcpy((char*)pCol->pData, (char*)s->pData + s->info.bytes * pos, pCol->info.bytes);
//  }
//
dengyihao's avatar
dengyihao 已提交
3330
// out_of_memory:
3331
//  tsdbCleanupReadHandle(pSecQueryHandle);
3332 3333 3334
//  return terrno;
//}

H
Haojun Liao 已提交
3335
/*
 * Build a two-row block from the external rows kept in prev (row 0) and next (row 1).
 *
 * The current position is marked as a mixed, non-file block (fid = INT32_MIN). When either
 * prev or next is missing, the block is empty and false is returned. When the first output
 * column is a timestamp, the block window spans the two copied keys.
 */
bool tsdbGetExternalRow(tsdbReaderT pHandle) {
  STsdbReadHandle* pReader = (STsdbReadHandle*)pHandle;
  SQueryFilePos*   pPos = &pReader->cur;

  pPos->fid = INT32_MIN;
  pPos->mixBlock = true;

  bool hasBothRows = (pReader->prev != NULL) && (pReader->next != NULL);
  if (!hasBothRows) {
    pPos->rows = 0;
    return false;
  }

  int32_t numOfCols = (int32_t)QH_GET_NUM_OF_COLS(pReader);
  for (int32_t col = 0; col < numOfCols; ++col) {
    SColumnInfoData* pDst = taosArrayGet(pReader->pColumns, col);
    SColumnInfoData* pPrev = taosArrayGet(pReader->prev, col);
    SColumnInfoData* pNext = taosArrayGet(pReader->next, col);

    size_t bytes = (size_t)pDst->info.bytes;
    char*  dst = (char*)pDst->pData;

    memcpy(dst, pPrev->pData, bytes);          // row 0: previous row
    memcpy(dst + bytes, pNext->pData, bytes);  // row 1: next row

    if (col == 0 && pDst->info.type == TSDB_DATA_TYPE_TIMESTAMP) {
      pPos->win.skey = *(TSKEY*)dst;
      pPos->win.ekey = *(TSKEY*)(dst + TSDB_KEYSIZE);
    }
  }

  pPos->rows = 2;
  return true;
}

3366
/*
3367
 * if lastRow == NULL, return TSDB_CODE_TDB_NO_CACHE_LAST_ROW
3368
 * else set pRes and return TSDB_CODE_SUCCESS and save lastKey
3369
 */
H
Haojun Liao 已提交
3370
// int32_t tsdbGetCachedLastRow(STable* pTable, STSRow** pRes, TSKEY* lastKey) {
3371 3372 3373 3374 3375 3376 3377 3378 3379 3380 3381 3382 3383 3384 3385 3386
//  int32_t code = TSDB_CODE_SUCCESS;
//
//  TSDB_RLOCK_TABLE(pTable);
//
//  if (!pTable->lastRow) {
//    code = TSDB_CODE_TDB_NO_CACHE_LAST_ROW;
//    goto out;
//  }
//
//  if (pRes) {
//    *pRes = tdMemRowDup(pTable->lastRow);
//    if (*pRes == NULL) {
//      code = TSDB_CODE_TDB_OUT_OF_MEMORY;
//    }
//  }
//
H
Haojun Liao 已提交
3387
// out:
3388 3389 3390 3391
//  TSDB_RUNLOCK_TABLE(pTable);
//  return code;
//}

3392
/* True when the reader is configured to serve results from a last-row/last cache. */
bool isTsdbCacheLastRow(tsdbReaderT* pReader) {
  STsdbReadHandle* pHandle = (STsdbReadHandle*)pReader;
  bool             usesCache = pHandle->cachelastrow > TSDB_CACHED_TYPE_NONE;
  return usesCache;
}

wmmhello's avatar
wmmhello 已提交
3396 3397
/*
 * Decide whether the query can be answered from the cached last row.
 *
 * Not implemented yet: the arguments are only validated and the reader is left unchanged.
 * TODO: look up the cached last row of the first table in tableList. On success set
 * cachelastrow to TSDB_CACHED_TYPE_LASTROW, reset window to TSWINDOW_INITIALIZER, clear
 * checkFiles and start activeIndex from -1.
 *
 * Always returns TSDB_CODE_SUCCESS.
 */
int32_t checkForCachedLastRow(STsdbReadHandle* pTsdbReadHandle, STableListInfo* tableList) {
  assert(pTsdbReadHandle != NULL);
  assert(tableList != NULL);

  return TSDB_CODE_SUCCESS;
}

3427 3428
/*
 * Prepare the reader for serving results from the last cache.
 *
 * When a cache type is already set, file scanning is disabled and activeIndex restarts
 * from -1. TODO: set cachelastrow to TSDB_CACHED_TYPE_LAST once the tsdb reports cached
 * last columns.
 *
 * Always returns TSDB_CODE_SUCCESS.
 */
int32_t checkForCachedLast(STsdbReadHandle* pTsdbReadHandle) {
  assert(pTsdbReadHandle != NULL);

  bool cacheEnabled = pTsdbReadHandle->cachelastrow != TSDB_CACHED_TYPE_NONE;
  if (cacheEnabled) {
    // update the tsdb query time range
    pTsdbReadHandle->checkFiles = false;
    pTsdbReadHandle->activeIndex = -1;  // start from -1
  }

  return TSDB_CODE_SUCCESS;
}

wmmhello's avatar
wmmhello 已提交
3444
/*
 * Compute the query window for a last-row query over pList.
 *
 * Not implemented yet: it always returns the empty window {INT64_MAX, INT64_MIN}.
 * TODO: for every group, keep only the table with the largest lastKey and widen the
 * window to cover those keys. Remove groups without data, and fall back to
 * TSWINDOW_INITIALIZER when no table holds data.
 */
STimeWindow updateLastrowForEachGroup(STableListInfo* pList) {
  STimeWindow window = {.skey = INT64_MAX, .ekey = INT64_MIN};
  return window;
}

H
Haojun Liao 已提交
3509
/*
 * Fill pDataBlockInfo with the description of the current data block of the reader.
 *
 * The owning table uid comes from the file block's check info when the current position is inside a
 * data file (cur.fid != INT32_MIN); otherwise it comes from the currently active table check info.
 * Row count and time window are copied from the current read position.
 */
void tsdbRetrieveDataBlockInfo(tsdbReaderT* pTsdbReadHandle, SDataBlockInfo* pDataBlockInfo) {
  STsdbReadHandle* pHandle = (STsdbReadHandle*)pTsdbReadHandle;
  SQueryFilePos*   pPos = &pHandle->cur;

  uint64_t uid = 0;
  if (pPos->fid == INT32_MIN) {
    // no file position: the data belongs to the active table
    STableCheckInfo* pActive = taosArrayGet(pHandle->pTableCheckInfo, pHandle->activeIndex);
    uid = pActive->tableId;
  } else {
    // data comes from a file block: the block info knows its table
    uid = pHandle->pDataBlockInfo[pPos->slot].pTableCheckInfo->tableId;
  }

  tsdbDebug("data block generated, uid:%" PRIu64 " numOfRows:%d, tsrange:%" PRId64 " - %" PRId64 " %s", uid, pPos->rows,
            pPos->win.skey, pPos->win.ekey, pHandle->idStr);

  pDataBlockInfo->uid = uid;

#if 0
  // for multi-group data query processing test purpose
  pDataBlockInfo->groupId = uid;
#endif

  pDataBlockInfo->rows = pPos->rows;
  pDataBlockInfo->window = pPos->win;
}
H
hjxilinx 已提交
3537

H
Haojun Liao 已提交
3538 3539 3540
/*
 * return null for mixed data block, if not a complete file data block, the statistics value will always return NULL
 */
3541
int32_t tsdbRetrieveDataBlockStatisInfo(tsdbReaderT* pTsdbReadHandle, SColumnDataAgg*** pBlockStatis, bool* allHave) {
dengyihao's avatar
dengyihao 已提交
3542
  STsdbReadHandle* pHandle = (STsdbReadHandle*)pTsdbReadHandle;
3543
  *allHave = false;
H
Haojun Liao 已提交
3544

H
Haojun Liao 已提交
3545 3546
  SQueryFilePos* c = &pHandle->cur;
  if (c->mixBlock) {
H
Haojun Liao 已提交
3547 3548 3549
    *pBlockStatis = NULL;
    return TSDB_CODE_SUCCESS;
  }
H
Haojun Liao 已提交
3550

H
Haojun Liao 已提交
3551 3552 3553 3554
  STableBlockInfo* pBlockInfo = &pHandle->pDataBlockInfo[c->slot];
  assert((c->slot >= 0 && c->slot < pHandle->numOfBlocks) || ((c->slot == pHandle->numOfBlocks) && (c->slot == 0)));

  // file block with sub-blocks has no statistics data
H
Haojun Liao 已提交
3555 3556 3557 3558
  if (pBlockInfo->compBlock->numOfSubBlocks > 1) {
    *pBlockStatis = NULL;
    return TSDB_CODE_SUCCESS;
  }
H
Haojun Liao 已提交
3559 3560

  int64_t stime = taosGetTimestampUs();
3561 3562
  int     statisStatus = tsdbLoadBlockStatis(&pHandle->rhelper, pBlockInfo->compBlock);
  if (statisStatus < TSDB_STATIS_OK) {
H
Hongze Cheng 已提交
3563
    return terrno;
3564 3565 3566
  } else if (statisStatus > TSDB_STATIS_OK) {
    *pBlockStatis = NULL;
    return TSDB_CODE_SUCCESS;
H
Hongze Cheng 已提交
3567
  }
H
Haojun Liao 已提交
3568

S
Shengliang Guan 已提交
3569
  tsdbDebug("vgId:%d, succeed to load block statis part for uid %" PRIu64, REPO_ID(pHandle->pTsdb),
C
Cary Xu 已提交
3570 3571
            TSDB_READ_TABLE_UID(&pHandle->rhelper));

3572
  int16_t* colIds = pHandle->suppInfo.defaultLoadColumn->pData;
H
Haojun Liao 已提交
3573

H
Haojun Liao 已提交
3574
  size_t numOfCols = QH_GET_NUM_OF_COLS(pHandle);
3575 3576 3577
  memset(pHandle->suppInfo.plist, 0, numOfCols * POINTER_BYTES);
  memset(pHandle->suppInfo.pstatis, 0, numOfCols * sizeof(SColumnDataAgg));

dengyihao's avatar
dengyihao 已提交
3578
  for (int32_t i = 0; i < numOfCols; ++i) {
3579
    pHandle->suppInfo.pstatis[i].colId = colIds[i];
3580
  }
H
Haojun Liao 已提交
3581

3582 3583
  *allHave = true;
  tsdbGetBlockStatis(&pHandle->rhelper, pHandle->suppInfo.pstatis, (int)numOfCols, pBlockInfo->compBlock);
H
Haojun Liao 已提交
3584 3585

  // always load the first primary timestamp column data
3586
  SColumnDataAgg* pPrimaryColStatis = &pHandle->suppInfo.pstatis[0];
3587
  assert(pPrimaryColStatis->colId == PRIMARYKEY_TIMESTAMP_COL_ID);
H
Haojun Liao 已提交
3588 3589

  pPrimaryColStatis->numOfNull = 0;
H
Hongze Cheng 已提交
3590 3591
  pPrimaryColStatis->min = pBlockInfo->compBlock->minKey.ts;
  pPrimaryColStatis->max = pBlockInfo->compBlock->maxKey.ts;
3592
  pHandle->suppInfo.plist[0] = &pHandle->suppInfo.pstatis[0];
H
Haojun Liao 已提交
3593

dengyihao's avatar
dengyihao 已提交
3594
  // update the number of NULL data rows
3595
  int32_t* slotIds = pHandle->suppInfo.slotIds;
dengyihao's avatar
dengyihao 已提交
3596
  for (int32_t i = 1; i < numOfCols; ++i) {
3597
    ASSERT(colIds[i] == pHandle->pSchema->columns[slotIds[i]].colId);
C
Cary Xu 已提交
3598
    if (IS_BSMA_ON(&(pHandle->pSchema->columns[slotIds[i]]))) {
3599 3600 3601
      if (pHandle->suppInfo.pstatis[i].numOfNull == -1) {  // set the column data are all NULL
        pHandle->suppInfo.pstatis[i].numOfNull = pBlockInfo->compBlock->numOfRows;
      }
3602 3603

      pHandle->suppInfo.plist[i] = &pHandle->suppInfo.pstatis[i];
3604 3605
    } else {
      *allHave = false;
H
Haojun Liao 已提交
3606 3607
    }
  }
H
Haojun Liao 已提交
3608 3609 3610 3611

  int64_t elapsed = taosGetTimestampUs() - stime;
  pHandle->cost.statisInfoLoadTime += elapsed;

3612
  *pBlockStatis = pHandle->suppInfo.plist;
3613
  return TSDB_CODE_SUCCESS;
H
hjxilinx 已提交
3614 3615
}

H
Haojun Liao 已提交
3616
/*
 * Return the column data of the current data block.
 *
 * The data is already present in pHandle->pColumns when:
 *   1. it comes from the cache (cur.fid == INT32_MIN);
 *   2. the file block is a mixed block (not completely qualified to the query time range);
 *   3. this exact file block (same slot, file group fid and table uid) has been loaded before.
 * Otherwise the file block is loaded and all of its rows are copied into pHandle->pColumns.
 *
 * @param pTsdbReadHandle the read handle
 * @param pIdList         not used by this function
 * @return pHandle->pColumns, or NULL if loading the file block fails
 */
SArray* tsdbRetrieveDataBlock(tsdbReaderT* pTsdbReadHandle, SArray* pIdList) {
  STsdbReadHandle* pHandle = (STsdbReadHandle*)pTsdbReadHandle;
  SQueryFilePos*   cur = &pHandle->cur;

  if (cur->fid == INT32_MIN || cur->mixBlock) {
    return pHandle->pColumns;
  }

  STableBlockInfo* pBlockInfo = &pHandle->pDataBlockInfo[cur->slot];
  STableCheckInfo* pCheckInfo = pBlockInfo->pTableCheckInfo;
  SBlock*          pBlock = pBlockInfo->compBlock;

  SDataBlockInfo binfo = GET_FILE_DATA_BLOCK_INFO(pCheckInfo, pBlock);
  assert(pHandle->realNumOfRows <= binfo.rows);
  (void)binfo;  // only referenced by the assert; silences unused warnings under NDEBUG

  // data block has been loaded already (fileGroup checked for NULL before dereferencing it)
  SDataBlockLoadInfo* pBlockLoadInfo = &pHandle->dataBlockLoadInfo;
  if (pBlockLoadInfo->slot == cur->slot && pBlockLoadInfo->fileGroup != NULL &&
      pBlockLoadInfo->fileGroup->fid == cur->fid && pBlockLoadInfo->uid == pCheckInfo->tableId) {
    return pHandle->pColumns;
  }

  // only load the file block
  if (doLoadFileDataBlock(pHandle, pBlock, pCheckInfo, cur->slot) != TSDB_CODE_SUCCESS) {
    return NULL;
  }

  // copy rows [0, numOfRows - 1] of the block into pColumns; the returned row count was never used here
  (void)doCopyRowsFromFileBlock(pHandle, pHandle->outputCapacity, 0, 0, pBlock->numOfRows - 1);
  return pHandle->pColumns;
}
3652

H
Haojun Liao 已提交
3653
static int tsdbCheckInfoCompar(const void* key1, const void* key2) {
3654
  if (((STableCheckInfo*)key1)->tableId < ((STableCheckInfo*)key2)->tableId) {
H
Haojun Liao 已提交
3655
    return -1;
3656
  } else if (((STableCheckInfo*)key1)->tableId > ((STableCheckInfo*)key2)->tableId) {
H
Haojun Liao 已提交
3657 3658 3659 3660 3661 3662 3663
    return 1;
  } else {
    ASSERT(false);
    return 0;
  }
}

3664 3665 3666 3667 3668 3669 3670 3671
/*
 * Free every column's data buffer, then destroy the array itself.
 * A NULL array is ignored. Always returns NULL so callers can write `p = doFreeColumnInfoData(p);`.
 */
static void* doFreeColumnInfoData(SArray* pColumnInfoData) {
  if (pColumnInfoData != NULL) {
    size_t numOfCols = taosArrayGetSize(pColumnInfoData);
    for (size_t j = 0; j < numOfCols; ++j) {
      SColumnInfoData* pCol = taosArrayGet(pColumnInfoData, j);
      taosMemoryFreeClear(pCol->pData);
    }

    taosArrayDestroy(pColumnInfoData);
  }

  return NULL;
}

H
Haojun Liao 已提交
3679 3680 3681 3682 3683 3684
/*
 * Release the resources held by each table check info, then destroy the array:
 * - the memory iterator, via destroyTableMemIterator();
 * - the pCompInfo buffer.
 * Always returns NULL so callers can write `p = destroyTableCheckInfo(p);`.
 */
static void* destroyTableCheckInfo(SArray* pTableCheckInfo) {
  size_t numOfTables = taosArrayGetSize(pTableCheckInfo);
  for (size_t k = 0; k < numOfTables; ++k) {
    STableCheckInfo* pCheckInfo = taosArrayGet(pTableCheckInfo, k);

    destroyTableMemIterator(pCheckInfo);
    taosMemoryFreeClear(pCheckInfo->pCompInfo);
  }

  taosArrayDestroy(pTableCheckInfo);
  return NULL;
}

H
Haojun Liao 已提交
3692
/*
 * Release a read handle and everything it owns:
 *   - column buffers (current, prev, next);
 *   - the default load column array;
 *   - the block info and statistics arrays;
 *   - the table check infos;
 *   - the read helper;
 *   - the data-column buffers.
 * An IO cost summary is logged before the handle itself is freed. A NULL handle is ignored.
 */
void tsdbCleanupReadHandle(tsdbReaderT queryHandle) {
  STsdbReadHandle* pReader = (STsdbReadHandle*)queryHandle;
  if (pReader == NULL) {
    return;
  }

  pReader->pColumns = doFreeColumnInfoData(pReader->pColumns);

  taosArrayDestroy(pReader->suppInfo.defaultLoadColumn);
  taosMemoryFreeClear(pReader->pDataBlockInfo);
  taosMemoryFreeClear(pReader->suppInfo.pstatis);
  taosMemoryFreeClear(pReader->suppInfo.plist);

  // With an empty query window the handle is expected to hold no table check infos.
  // NOTE: releasing the memory snapshot (tsdbMayUnTakeMemSnapshot) for non-empty windows is currently disabled.
  if (emptyQueryTimewindow(pReader)) {
    assert(pReader->pTableCheckInfo == NULL);
  }

  if (pReader->pTableCheckInfo != NULL) {
    pReader->pTableCheckInfo = destroyTableCheckInfo(pReader->pTableCheckInfo);
  }

  tsdbDestroyReadH(&pReader->rhelper);

  tdFreeDataCols(pReader->pDataCols);
  pReader->pDataCols = NULL;

  pReader->prev = doFreeColumnInfoData(pReader->prev);
  pReader->next = doFreeColumnInfoData(pReader->next);

  SIOCostSummary* pCost = &pReader->cost;
  tsdbDebug("%p :io-cost summary: head-file read cnt:%" PRIu64 ", head-file time:%" PRIu64 " us, statis-info:%" PRId64
            " us, datablock:%" PRId64 " us, check data:%" PRId64 " us, %s",
            pReader, pCost->headFileLoad, pCost->headFileLoadTime, pCost->statisInfoLoadTime,
            pCost->blockLoadTime, pCost->checkForNextTime, pReader->idStr);

  taosMemoryFreeClear(pReader);
}