tsdbRead.c 143.0 KB
Newer Older
H
hjxilinx 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

H
Hongze Cheng 已提交
16
#include "tsdb.h"
17

dengyihao's avatar
dengyihao 已提交
18 19
#define EXTRA_BYTES                2
#define ASCENDING_TRAVERSE(o)      (o == TSDB_ORDER_ASC)
20
#define QH_GET_NUM_OF_COLS(handle) ((size_t)(taosArrayGetSize((handle)->pColumns)))
H
hjxilinx 已提交
21

H
Haojun Liao 已提交
22 23 24 25
#define GET_FILE_DATA_BLOCK_INFO(_checkInfo, _block)                                   \
  ((SDataBlockInfo){.window = {.skey = (_block)->keyFirst, .ekey = (_block)->keyLast}, \
                    .numOfCols = (_block)->numOfCols,                                  \
                    .rows = (_block)->numOfRows,                                       \
26
                    .uid = (_checkInfo)->tableId})
H
Haojun Liao 已提交
27

H
hjxilinx 已提交
28
enum {
dengyihao's avatar
dengyihao 已提交
29 30
  TSDB_QUERY_TYPE_ALL = 1,
  TSDB_QUERY_TYPE_LAST = 2,
H
hjxilinx 已提交
31 32
};

33
enum {
dengyihao's avatar
dengyihao 已提交
34
  TSDB_CACHED_TYPE_NONE = 0,
35
  TSDB_CACHED_TYPE_LASTROW = 1,
dengyihao's avatar
dengyihao 已提交
36
  TSDB_CACHED_TYPE_LAST = 2,
37 38
};

39
typedef struct SQueryFilePos {
dengyihao's avatar
dengyihao 已提交
40 41 42 43 44 45 46
  int32_t     fid;
  int32_t     slot;
  int32_t     pos;
  int64_t     lastKey;
  int32_t     rows;
  bool        mixBlock;
  bool        blockCompleted;
47
  STimeWindow win;
48
} SQueryFilePos;
H
hjxilinx 已提交
49

50
typedef struct SDataBlockLoadInfo {
dengyihao's avatar
dengyihao 已提交
51 52 53 54
  SDFileSet* fileGroup;
  int32_t    slot;
  uint64_t   uid;
  SArray*    pLoadedCols;
55
} SDataBlockLoadInfo;
H
hjxilinx 已提交
56

57
typedef struct SLoadCompBlockInfo {
H
hjLiao 已提交
58
  int32_t tid; /* table tid */
59 60
  int32_t fileId;
} SLoadCompBlockInfo;
H
hjxilinx 已提交
61

62
enum {
dengyihao's avatar
dengyihao 已提交
63
  CHECKINFO_CHOSEN_MEM = 0,
64
  CHECKINFO_CHOSEN_IMEM = 1,
dengyihao's avatar
dengyihao 已提交
65
  CHECKINFO_CHOSEN_BOTH = 2  // for update=2(merge case)
66 67
};

68
typedef struct STableCheckInfo {
dengyihao's avatar
dengyihao 已提交
69 70 71 72 73 74 75 76 77
  uint64_t           tableId;
  TSKEY              lastKey;
  SBlockInfo*        pCompInfo;
  int32_t            compSize;
  int32_t            numOfBlocks : 29;  // number of qualified data blocks not the original blocks
  uint8_t            chosen : 2;        // indicate which iterator should move forward
  bool               initBuf : 1;       // whether to initialize the in-memory skip list iterator or not
  SSkipListIterator* iter;              // mem buffer skip list iterator
  SSkipListIterator* iiter;             // imem buffer skip list iterator
78
} STableCheckInfo;
79

80
typedef struct STableBlockInfo {
dengyihao's avatar
dengyihao 已提交
81 82
  SBlock*          compBlock;
  STableCheckInfo* pTableCheckInfo;
83
} STableBlockInfo;
84

85
typedef struct SBlockOrderSupporter {
dengyihao's avatar
dengyihao 已提交
86 87 88 89
  int32_t           numOfTables;
  STableBlockInfo** pDataBlockInfo;
  int32_t*          blockIndexArray;
  int32_t*          numOfBlocksPerTable;
90 91
} SBlockOrderSupporter;

H
Haojun Liao 已提交
92 93 94
typedef struct SIOCostSummary {
  int64_t blockLoadTime;
  int64_t statisInfoLoadTime;
H
Haojun Liao 已提交
95
  int64_t checkForNextTime;
96 97
  int64_t headFileLoad;
  int64_t headFileLoadTime;
H
Haojun Liao 已提交
98 99
} SIOCostSummary;

100
typedef struct SBlockLoadSuppInfo {
C
Cary Xu 已提交
101 102 103 104
  SColumnDataAgg*  pstatis;
  SColumnDataAgg** plist;
  SArray*          defaultLoadColumn;  // default load column
  int32_t*         slotIds;            // colId to slotId
105 106
} SBlockLoadSuppInfo;

107
typedef struct STsdbReadHandle {
C
Cary Xu 已提交
108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126
  STsdb*        pTsdb;
  SQueryFilePos cur;  // current position
  int16_t       order;
  STimeWindow   window;  // the primary query time window that applies to all queries
  //  SColumnDataAgg* statis;  // query level statistics, only one table block statistics info exists at any time
  //  SColumnDataAgg** pstatis;// the ptr array list to return to caller
  int32_t numOfBlocks;
  SArray* pColumns;  // column list, SColumnInfoData array list
  bool    locateStart;
  int32_t outputCapacity;
  int32_t realNumOfRows;
  SArray* pTableCheckInfo;  // SArray<STableCheckInfo>
  int32_t activeIndex;
  bool    checkFiles;               // check file stage
  int8_t  cachelastrow;             // check if last row cached
  bool    loadExternalRow;          // load time window external data rows
  bool    currentLoadExternalRows;  // current load external rows
  int32_t loadType;                 // block load type
  char*   idStr;                    // query info handle, for debug purpose
dengyihao's avatar
dengyihao 已提交
127 128 129 130 131
  int32_t type;  // query type: retrieve all data blocks, 2. retrieve only last row, 3. retrieve direct prev|next rows
  SDFileSet*         pFileGroup;
  SFSIter            fileIter;
  SReadH             rhelper;
  STableBlockInfo*   pDataBlockInfo;
C
Cary Xu 已提交
132 133 134 135
  SDataCols*         pDataCols;         // in order to hold current file data block
  int32_t            allocSize;         // allocated data block size
  SDataBlockLoadInfo dataBlockLoadInfo; /* record current block load information */
  SLoadCompBlockInfo compBlockLoadInfo; /* record current compblock information in SQueryAttr */
136
  SBlockLoadSuppInfo suppInfo;
C
Cary Xu 已提交
137 138 139 140
  SArray*            prev;  // previous row which is before than time window
  SArray*            next;  // next row which is after the query time window
  SIOCostSummary     cost;
  STSchema*          pSchema;
141
} STsdbReadHandle;
142

H
Haojun Liao 已提交
143 144 145
typedef struct STableGroupSupporter {
  int32_t    numOfCols;
  SColIndex* pCols;
146
  SSchema*   pTagSchema;
H
Haojun Liao 已提交
147 148
} STableGroupSupporter;

dengyihao's avatar
dengyihao 已提交
149 150 151 152 153
int32_t tsdbQueryTableList(void* pMeta, SArray* pRes, void* filterInfo);

static STimeWindow updateLastrowForEachGroup(STableGroupInfo* groupList);
static int32_t     checkForCachedLastRow(STsdbReadHandle* pTsdbReadHandle, STableGroupInfo* groupList);
static int32_t     checkForCachedLast(STsdbReadHandle* pTsdbReadHandle);
H
Haojun Liao 已提交
154
// static int32_t tsdbGetCachedLastRow(STable* pTable, STSRow** pRes, TSKEY* lastKey);
H
Haojun Liao 已提交
155

H
Haojun Liao 已提交
156
static void    changeQueryHandleForInterpQuery(tsdbReaderT pHandle);
157
static void    doMergeTwoLevelData(STsdbReadHandle* pTsdbReadHandle, STableCheckInfo* pCheckInfo, SBlock* pBlock);
158
static int32_t binarySearchForKey(char* pValue, int num, TSKEY key, int order);
dengyihao's avatar
dengyihao 已提交
159 160
static int32_t tsdbReadRowsFromCache(STableCheckInfo* pCheckInfo, TSKEY maxKey, int maxRowsToRead, STimeWindow* win,
                                     STsdbReadHandle* pTsdbReadHandle);
161
static int32_t tsdbCheckInfoCompar(const void* key1, const void* key2);
dengyihao's avatar
dengyihao 已提交
162 163 164 165
// static int32_t doGetExternalRow(STsdbReadHandle* pTsdbReadHandle, int16_t type, void* pMemRef);
// static void*   doFreeColumnInfoData(SArray* pColumnInfoData);
// static void*   destroyTableCheckInfo(SArray* pTableCheckInfo);
static bool tsdbGetExternalRow(tsdbReaderT pHandle);
Y
TD-1733  
yihaoDeng 已提交
166

C
Cary Xu 已提交
167 168
static STsdb* getTsdbByRetentions(SVnode* pVnode, TSKEY winSKey, SRetention* retentions);

169
static void tsdbInitDataBlockLoadInfo(SDataBlockLoadInfo* pBlockLoadInfo) {
H
hjxilinx 已提交
170
  pBlockLoadInfo->slot = -1;
dengyihao's avatar
dengyihao 已提交
171
  pBlockLoadInfo->uid = 0;
H
hjxilinx 已提交
172
  pBlockLoadInfo->fileGroup = NULL;
H
hjxilinx 已提交
173 174
}

175
static void tsdbInitCompBlockLoadInfo(SLoadCompBlockInfo* pCompBlockLoadInfo) {
H
hjLiao 已提交
176
  pCompBlockLoadInfo->tid = -1;
177 178
  pCompBlockLoadInfo->fileId = -1;
}
H
hjxilinx 已提交
179

180 181
static SArray* getColumnIdList(STsdbReadHandle* pTsdbReadHandle) {
  size_t numOfCols = QH_GET_NUM_OF_COLS(pTsdbReadHandle);
H
Haojun Liao 已提交
182 183 184 185
  assert(numOfCols <= TSDB_MAX_COLUMNS);

  SArray* pIdList = taosArrayInit(numOfCols, sizeof(int16_t));
  for (int32_t i = 0; i < numOfCols; ++i) {
186
    SColumnInfoData* pCol = taosArrayGet(pTsdbReadHandle->pColumns, i);
H
Haojun Liao 已提交
187 188 189 190 191 192
    taosArrayPush(pIdList, &pCol->info.colId);
  }

  return pIdList;
}

193 194
static SArray* getDefaultLoadColumns(STsdbReadHandle* pTsdbReadHandle, bool loadTS) {
  SArray* pLocalIdList = getColumnIdList(pTsdbReadHandle);
H
Haojun Liao 已提交
195 196 197 198 199

  // check if the primary time stamp column needs to load
  int16_t colId = *(int16_t*)taosArrayGet(pLocalIdList, 0);

  // the primary timestamp column does not be included in the the specified load column list, add it
H
Haojun Liao 已提交
200 201
  if (loadTS && colId != PRIMARYKEY_TIMESTAMP_COL_ID) {
    int16_t columnId = PRIMARYKEY_TIMESTAMP_COL_ID;
H
Haojun Liao 已提交
202 203 204 205 206 207
    taosArrayInsert(pLocalIdList, 0, &columnId);
  }

  return pLocalIdList;
}

H
Haojun Liao 已提交
208
int64_t tsdbGetNumOfRowsInMemTable(tsdbReaderT* pHandle) {
dengyihao's avatar
dengyihao 已提交
209
  STsdbReadHandle* pTsdbReadHandle = (STsdbReadHandle*)pHandle;
H
Haojun Liao 已提交
210

dengyihao's avatar
dengyihao 已提交
211 212 213 214 215
  int64_t        rows = 0;
  STsdbMemTable* pMemTable = NULL;  // pTsdbReadHandle->pMemTable;
  if (pMemTable == NULL) {
    return rows;
  }
H
Haojun Liao 已提交
216

dengyihao's avatar
dengyihao 已提交
217 218
  //  STableData* pMem  = NULL;
  //  STableData* pIMem = NULL;
H
Haojun Liao 已提交
219

dengyihao's avatar
dengyihao 已提交
220 221
  //  SMemTable* pMemT = pMemRef->snapshot.mem;
  //  SMemTable* pIMemT = pMemRef->snapshot.imem;
H
Haojun Liao 已提交
222 223 224 225 226

  size_t size = taosArrayGetSize(pTsdbReadHandle->pTableCheckInfo);
  for (int32_t i = 0; i < size; ++i) {
    STableCheckInfo* pCheckInfo = taosArrayGet(pTsdbReadHandle->pTableCheckInfo, i);

dengyihao's avatar
dengyihao 已提交
227 228 229 230 231 232 233 234
    //    if (pMemT && pCheckInfo->tableId < pMemT->maxTables) {
    //      pMem = pMemT->tData[pCheckInfo->tableId];
    //      rows += (pMem && pMem->uid == pCheckInfo->tableId) ? pMem->numOfRows : 0;
    //    }
    //    if (pIMemT && pCheckInfo->tableId < pIMemT->maxTables) {
    //      pIMem = pIMemT->tData[pCheckInfo->tableId];
    //      rows += (pIMem && pIMem->uid == pCheckInfo->tableId) ? pIMem->numOfRows : 0;
    //    }
H
Haojun Liao 已提交
235 236 237
  }
  return rows;
}
238

239 240 241
static SArray* createCheckInfoFromTableGroup(STsdbReadHandle* pTsdbReadHandle, STableGroupInfo* pGroupList) {
  size_t numOfGroup = taosArrayGetSize(pGroupList->pGroupList);
  assert(numOfGroup >= 1);
H
Haojun Liao 已提交
242 243 244 245 246 247 248 249

  // allocate buffer in order to load data blocks from file
  SArray* pTableCheckInfo = taosArrayInit(pGroupList->numOfTables, sizeof(STableCheckInfo));
  if (pTableCheckInfo == NULL) {
    return NULL;
  }

  // todo apply the lastkey of table check to avoid to load header file
250
  for (int32_t i = 0; i < numOfGroup; ++i) {
dengyihao's avatar
dengyihao 已提交
251
    SArray* group = *(SArray**)taosArrayGet(pGroupList->pGroupList, i);
H
Haojun Liao 已提交
252 253 254 255 256

    size_t gsize = taosArrayGetSize(group);
    assert(gsize > 0);

    for (int32_t j = 0; j < gsize; ++j) {
dengyihao's avatar
dengyihao 已提交
257
      STableKeyInfo* pKeyInfo = (STableKeyInfo*)taosArrayGet(group, j);
H
Haojun Liao 已提交
258

dengyihao's avatar
dengyihao 已提交
259
      STableCheckInfo info = {.lastKey = pKeyInfo->lastKey, .tableId = pKeyInfo->uid};
260 261 262
      if (ASCENDING_TRAVERSE(pTsdbReadHandle->order)) {
        if (info.lastKey == INT64_MIN || info.lastKey < pTsdbReadHandle->window.skey) {
          info.lastKey = pTsdbReadHandle->window.skey;
263 264
        }

265
        assert(info.lastKey >= pTsdbReadHandle->window.skey && info.lastKey <= pTsdbReadHandle->window.ekey);
H
Haojun Liao 已提交
266
      } else {
267
        info.lastKey = pTsdbReadHandle->window.skey;
H
Haojun Liao 已提交
268 269 270
      }

      taosArrayPush(pTableCheckInfo, &info);
dengyihao's avatar
dengyihao 已提交
271 272
      tsdbDebug("%p check table uid:%" PRId64 " from lastKey:%" PRId64 " %s", pTsdbReadHandle, info.tableId,
                info.lastKey, pTsdbReadHandle->idStr);
H
Haojun Liao 已提交
273 274 275
    }
  }

276
  // TODO  group table according to the tag value.
277
  taosArraySort(pTableCheckInfo, tsdbCheckInfoCompar);
H
Haojun Liao 已提交
278 279 280
  return pTableCheckInfo;
}

281 282
static void resetCheckInfo(STsdbReadHandle* pTsdbReadHandle) {
  size_t numOfTables = taosArrayGetSize(pTsdbReadHandle->pTableCheckInfo);
H
Haojun Liao 已提交
283 284 285 286
  assert(numOfTables >= 1);

  // todo apply the lastkey of table check to avoid to load header file
  for (int32_t i = 0; i < numOfTables; ++i) {
dengyihao's avatar
dengyihao 已提交
287
    STableCheckInfo* pCheckInfo = (STableCheckInfo*)taosArrayGet(pTsdbReadHandle->pTableCheckInfo, i);
288
    pCheckInfo->lastKey = pTsdbReadHandle->window.skey;
dengyihao's avatar
dengyihao 已提交
289 290
    pCheckInfo->iter = tSkipListDestroyIter(pCheckInfo->iter);
    pCheckInfo->iiter = tSkipListDestroyIter(pCheckInfo->iiter);
291
    pCheckInfo->initBuf = false;
H
Haojun Liao 已提交
292

293 294
    if (ASCENDING_TRAVERSE(pTsdbReadHandle->order)) {
      assert(pCheckInfo->lastKey >= pTsdbReadHandle->window.skey);
295
    } else {
296
      assert(pCheckInfo->lastKey <= pTsdbReadHandle->window.skey);
297
    }
H
Haojun Liao 已提交
298 299 300
  }
}

H
Haojun Liao 已提交
301 302 303
// only one table, not need to sort again
static SArray* createCheckInfoFromCheckInfo(STableCheckInfo* pCheckInfo, TSKEY skey, SArray** psTable) {
  SArray* pNew = taosArrayInit(1, sizeof(STableCheckInfo));
D
fix bug  
dapan1121 已提交
304

dengyihao's avatar
dengyihao 已提交
305
  STableCheckInfo info = {.lastKey = skey};
H
Haojun Liao 已提交
306

H
Haojun Liao 已提交
307 308
  info.tableId = pCheckInfo->tableId;
  taosArrayPush(pNew, &info);
H
Haojun Liao 已提交
309 310 311
  return pNew;
}

312 313
static bool emptyQueryTimewindow(STsdbReadHandle* pTsdbReadHandle) {
  assert(pTsdbReadHandle != NULL);
314

315
  STimeWindow* w = &pTsdbReadHandle->window;
dengyihao's avatar
dengyihao 已提交
316
  bool         asc = ASCENDING_TRAVERSE(pTsdbReadHandle->order);
317 318 319 320

  return ((asc && w->skey > w->ekey) || (!asc && w->ekey > w->skey));
}

321 322
// Update the query time window according to the data time to live(TTL) information, in order to avoid to return
// the expired data to client, even it is queried already.
323
static int64_t getEarliestValidTimestamp(STsdb* pTsdb) {
H
Hongze Cheng 已提交
324
  STsdbCfg* pCfg = REPO_CFG(pTsdb);
325 326

  int64_t now = taosGetTimestamp(pCfg->precision);
H
Hongze Cheng 已提交
327
  return now - (tsTickPerDay[pCfg->precision] * pCfg->keep2) + 1;  // needs to add one tick
328 329
}

330
static void setQueryTimewindow(STsdbReadHandle* pTsdbReadHandle, SQueryTableDataCond* pCond) {
331
  pTsdbReadHandle->window = pCond->twindow;
332

333
  bool    updateTs = false;
334 335 336 337
  int64_t startTs = getEarliestValidTimestamp(pTsdbReadHandle->pTsdb);
  if (ASCENDING_TRAVERSE(pTsdbReadHandle->order)) {
    if (startTs > pTsdbReadHandle->window.skey) {
      pTsdbReadHandle->window.skey = startTs;
338 339
      pCond->twindow.skey = startTs;
      updateTs = true;
340 341
    }
  } else {
342 343
    if (startTs > pTsdbReadHandle->window.ekey) {
      pTsdbReadHandle->window.ekey = startTs;
344 345
      pCond->twindow.ekey = startTs;
      updateTs = true;
346 347 348
    }
  }

349
  if (updateTs) {
H
Haojun Liao 已提交
350 351 352
    tsdbDebug("%p update the query time window, old:%" PRId64 " - %" PRId64 ", new:%" PRId64 " - %" PRId64 ", %s",
              pTsdbReadHandle, pCond->twindow.skey, pCond->twindow.ekey, pTsdbReadHandle->window.skey,
              pTsdbReadHandle->window.ekey, pTsdbReadHandle->idStr);
353
  }
354
}
C
Cary Xu 已提交
355 356 357
#if 0
int           nQUERY = 0;
#endif
C
Cary Xu 已提交
358 359
static STsdb* getTsdbByRetentions(SVnode* pVnode, TSKEY winSKey, SRetention* retentions) {
  if (vnodeIsRollup(pVnode)) {
C
Cary Xu 已提交
360
    int level = 0;
C
Cary Xu 已提交
361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386
#if 1
    int64_t now = taosGetTimestamp(pVnode->config.tsdbCfg.precision);
    for (int i = 0; i < TSDB_RETENTION_MAX; ++i) {
      SRetention* pRetention = retentions + i;
      if (pRetention->keep <= 0 || (now - pRetention->keep) >= winSKey) {
        break;
      }
    }
#endif
#if 0
    ++nQUERY;
    if(nQUERY%3 == 0) {
      level = 2;
    } else if(nQUERY%2 == 0) {
      level = 1;
    } else {
      level = 0;
    }
#endif
    if (level == TSDB_RETENTION_L0) {
      return VND_RSMA0(pVnode);
    } else if (level == TSDB_RETENTION_L1) {
      return VND_RSMA1(pVnode);
    } else {
      return VND_RSMA2(pVnode);
    }
C
Cary Xu 已提交
387 388 389 390
  }
  return pVnode->pTsdb;
}

391
static STsdbReadHandle* tsdbQueryTablesImpl(SVnode* pVnode, SQueryTableDataCond* pCond, uint64_t qId, uint64_t taskId) {
wafwerar's avatar
wafwerar 已提交
392
  STsdbReadHandle* pReadHandle = taosMemoryCalloc(1, sizeof(STsdbReadHandle));
393
  if (pReadHandle == NULL) {
394
    goto _end;
395
  }
H
Haojun Liao 已提交
396

C
Cary Xu 已提交
397 398
  STsdb* pTsdb = getTsdbByRetentions(pVnode, pCond->twindow.skey, pVnode->config.tsdbCfg.retentions);

dengyihao's avatar
dengyihao 已提交
399
  pReadHandle->order = pCond->order;
C
Cary Xu 已提交
400
  pReadHandle->pTsdb = pTsdb;
dengyihao's avatar
dengyihao 已提交
401 402 403 404 405 406
  pReadHandle->type = TSDB_QUERY_TYPE_ALL;
  pReadHandle->cur.fid = INT32_MIN;
  pReadHandle->cur.win = TSWINDOW_INITIALIZER;
  pReadHandle->checkFiles = true;
  pReadHandle->activeIndex = 0;  // current active table index
  pReadHandle->allocSize = 0;
407
  pReadHandle->locateStart = false;
dengyihao's avatar
dengyihao 已提交
408
  pReadHandle->loadType = pCond->type;
409

dengyihao's avatar
dengyihao 已提交
410
  pReadHandle->outputCapacity = 4096;  //((STsdb*)tsdb)->config.maxRowsPerFileBlock;
411 412 413
  pReadHandle->loadExternalRow = pCond->loadExternalRows;
  pReadHandle->currentLoadExternalRows = pCond->loadExternalRows;

H
Haojun Liao 已提交
414
  char buf[128] = {0};
dengyihao's avatar
dengyihao 已提交
415
  snprintf(buf, tListLen(buf), "TID:0x%" PRIx64 " QID:0x%" PRIx64, taskId, qId);
H
Haojun Liao 已提交
416 417
  pReadHandle->idStr = strdup(buf);

C
Cary Xu 已提交
418
  if (tsdbInitReadH(&pReadHandle->rhelper, pReadHandle->pTsdb) != 0) {
419
    goto _end;
B
Bomin Zhang 已提交
420
  }
H
Haojun Liao 已提交
421

422 423
  assert(pCond != NULL);
  setQueryTimewindow(pReadHandle, pCond);
424

425 426
  if (pCond->numOfCols > 0) {
    // allocate buffer in order to load data blocks from file
427 428
    pReadHandle->suppInfo.pstatis = taosMemoryCalloc(pCond->numOfCols, sizeof(SColumnDataAgg));
    if (pReadHandle->suppInfo.pstatis == NULL) {
429
      goto _end;
430
    }
H
Haojun Liao 已提交
431

432
    // todo: use list instead of array?
433 434
    pReadHandle->pColumns = taosArrayInit(pCond->numOfCols, sizeof(SColumnInfoData));
    if (pReadHandle->pColumns == NULL) {
435
      goto _end;
436
    }
H
Haojun Liao 已提交
437

438 439 440
    for (int32_t i = 0; i < pCond->numOfCols; ++i) {
      SColumnInfoData colInfo = {{0}, 0};
      colInfo.info = pCond->colList[i];
H
Haojun Liao 已提交
441

442
      int32_t code = colInfoDataEnsureCapacity(&colInfo, 0, pReadHandle->outputCapacity);
443
      if (code != TSDB_CODE_SUCCESS) {
444
        goto _end;
445
      }
446

447
      taosArrayPush(pReadHandle->pColumns, &colInfo);
B
Bomin Zhang 已提交
448
    }
H
Haojun Liao 已提交
449

450
    pReadHandle->suppInfo.defaultLoadColumn = getDefaultLoadColumns(pReadHandle, true);
C
Cary Xu 已提交
451 452 453 454
    pReadHandle->suppInfo.slotIds =
        taosMemoryMalloc(sizeof(int32_t) * taosArrayGetSize(pReadHandle->suppInfo.defaultLoadColumn));
    pReadHandle->suppInfo.plist =
        taosMemoryCalloc(taosArrayGetSize(pReadHandle->suppInfo.defaultLoadColumn), POINTER_BYTES);
H
Haojun Liao 已提交
455
  }
456

C
Cary Xu 已提交
457
  pReadHandle->pDataCols = tdNewDataCols(1000, pVnode->config.tsdbCfg.maxRows);
458
  if (pReadHandle->pDataCols == NULL) {
H
Haojun Liao 已提交
459
    tsdbError("%p failed to malloc buf for pDataCols, %s", pReadHandle, pReadHandle->idStr);
H
Haojun Liao 已提交
460
    terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
461
    goto _end;
H
hjxilinx 已提交
462
  }
463

464 465
  tsdbInitDataBlockLoadInfo(&pReadHandle->dataBlockLoadInfo);
  tsdbInitCompBlockLoadInfo(&pReadHandle->compBlockLoadInfo);
466

H
Haojun Liao 已提交
467
  return (tsdbReaderT)pReadHandle;
468

dengyihao's avatar
dengyihao 已提交
469
_end:
470
  tsdbCleanupReadHandle(pReadHandle);
471
  terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
472
  return NULL;
H
hjxilinx 已提交
473 474
}

475
tsdbReaderT* tsdbQueryTables(SVnode* pVnode, SQueryTableDataCond* pCond, STableGroupInfo* groupList, uint64_t qId,
dengyihao's avatar
dengyihao 已提交
476
                             uint64_t taskId) {
477
  STsdbReadHandle* pTsdbReadHandle = tsdbQueryTablesImpl(pVnode, pCond, qId, taskId);
478
  if (pTsdbReadHandle == NULL) {
479 480 481
    return NULL;
  }

482
  if (emptyQueryTimewindow(pTsdbReadHandle)) {
dengyihao's avatar
dengyihao 已提交
483
    return (tsdbReaderT*)pTsdbReadHandle;
484
  }
H
Haojun Liao 已提交
485 486

  // todo apply the lastkey of table check to avoid to load header file
487
  pTsdbReadHandle->pTableCheckInfo = createCheckInfoFromTableGroup(pTsdbReadHandle, groupList);
488
  if (pTsdbReadHandle->pTableCheckInfo == NULL) {
dengyihao's avatar
dengyihao 已提交
489
    //    tsdbCleanupReadHandle(pTsdbReadHandle);
H
Haojun Liao 已提交
490 491 492 493
    terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
    return NULL;
  }

C
Cary Xu 已提交
494
  STableCheckInfo* pCheckInfo = taosArrayGet(pTsdbReadHandle->pTableCheckInfo, 0);
495

496
  pTsdbReadHandle->pSchema = metaGetTbTSchema(pVnode->pMeta, pCheckInfo->tableId, 0);
C
Cary Xu 已提交
497
  int32_t  numOfCols = taosArrayGetSize(pTsdbReadHandle->suppInfo.defaultLoadColumn);
498 499 500 501 502
  int16_t* ids = pTsdbReadHandle->suppInfo.defaultLoadColumn->pData;

  STSchema* pSchema = pTsdbReadHandle->pSchema;

  int32_t i = 0, j = 0;
C
Cary Xu 已提交
503
  while (i < numOfCols && j < pSchema->numOfCols) {
504 505 506 507 508 509 510 511 512 513 514 515
    if (ids[i] == pSchema->columns[j].colId) {
      pTsdbReadHandle->suppInfo.slotIds[i] = j;
      i++;
      j++;
    } else if (ids[i] > pSchema->columns[j].colId) {
      j++;
    } else {
      //    tsdbCleanupReadHandle(pTsdbReadHandle);
      terrno = TSDB_CODE_INVALID_PARA;
      return NULL;
    }
  }
516

dengyihao's avatar
dengyihao 已提交
517 518 519
  tsdbDebug("%p total numOfTable:%" PRIzu " in this query, group %" PRIzu " %s", pTsdbReadHandle,
            taosArrayGetSize(pTsdbReadHandle->pTableCheckInfo), taosArrayGetSize(groupList->pGroupList),
            pTsdbReadHandle->idStr);
520

dengyihao's avatar
dengyihao 已提交
521
  return (tsdbReaderT)pTsdbReadHandle;
H
Haojun Liao 已提交
522 523
}

524
void tsdbResetReadHandle(tsdbReaderT queryHandle, SQueryTableDataCond* pCond) {
525
  STsdbReadHandle* pTsdbReadHandle = queryHandle;
H
Haojun Liao 已提交
526

527 528 529
  if (emptyQueryTimewindow(pTsdbReadHandle)) {
    if (pCond->order != pTsdbReadHandle->order) {
      pTsdbReadHandle->order = pCond->order;
wafwerar's avatar
wafwerar 已提交
530
      TSWAP(pTsdbReadHandle->window.skey, pTsdbReadHandle->window.ekey);
531 532 533 534 535
    }

    return;
  }

dengyihao's avatar
dengyihao 已提交
536 537 538 539 540 541 542
  pTsdbReadHandle->order = pCond->order;
  pTsdbReadHandle->window = pCond->twindow;
  pTsdbReadHandle->type = TSDB_QUERY_TYPE_ALL;
  pTsdbReadHandle->cur.fid = -1;
  pTsdbReadHandle->cur.win = TSWINDOW_INITIALIZER;
  pTsdbReadHandle->checkFiles = true;
  pTsdbReadHandle->activeIndex = 0;  // current active table index
543 544
  pTsdbReadHandle->locateStart = false;
  pTsdbReadHandle->loadExternalRow = pCond->loadExternalRows;
H
Haojun Liao 已提交
545 546

  if (ASCENDING_TRAVERSE(pCond->order)) {
547
    assert(pTsdbReadHandle->window.skey <= pTsdbReadHandle->window.ekey);
H
Haojun Liao 已提交
548
  } else {
549
    assert(pTsdbReadHandle->window.skey >= pTsdbReadHandle->window.ekey);
H
Haojun Liao 已提交
550 551 552
  }

  // allocate buffer in order to load data blocks from file
553 554
  memset(pTsdbReadHandle->suppInfo.pstatis, 0, sizeof(SColumnDataAgg));
  memset(pTsdbReadHandle->suppInfo.plist, 0, POINTER_BYTES);
H
Haojun Liao 已提交
555

556 557
  tsdbInitDataBlockLoadInfo(&pTsdbReadHandle->dataBlockLoadInfo);
  tsdbInitCompBlockLoadInfo(&pTsdbReadHandle->compBlockLoadInfo);
H
Haojun Liao 已提交
558

559
  resetCheckInfo(pTsdbReadHandle);
H
Haojun Liao 已提交
560 561
}

562
void tsdbResetQueryHandleForNewTable(tsdbReaderT queryHandle, SQueryTableDataCond* pCond, STableGroupInfo* groupList) {
563
  STsdbReadHandle* pTsdbReadHandle = queryHandle;
H
Haojun Liao 已提交
564

dengyihao's avatar
dengyihao 已提交
565 566 567 568 569 570 571
  pTsdbReadHandle->order = pCond->order;
  pTsdbReadHandle->window = pCond->twindow;
  pTsdbReadHandle->type = TSDB_QUERY_TYPE_ALL;
  pTsdbReadHandle->cur.fid = -1;
  pTsdbReadHandle->cur.win = TSWINDOW_INITIALIZER;
  pTsdbReadHandle->checkFiles = true;
  pTsdbReadHandle->activeIndex = 0;  // current active table index
572 573
  pTsdbReadHandle->locateStart = false;
  pTsdbReadHandle->loadExternalRow = pCond->loadExternalRows;
H
Haojun Liao 已提交
574 575

  if (ASCENDING_TRAVERSE(pCond->order)) {
576
    assert(pTsdbReadHandle->window.skey <= pTsdbReadHandle->window.ekey);
H
Haojun Liao 已提交
577
  } else {
578
    assert(pTsdbReadHandle->window.skey >= pTsdbReadHandle->window.ekey);
H
Haojun Liao 已提交
579 580 581
  }

  // allocate buffer in order to load data blocks from file
582 583
  memset(pTsdbReadHandle->suppInfo.pstatis, 0, sizeof(SColumnDataAgg));
  memset(pTsdbReadHandle->suppInfo.plist, 0, POINTER_BYTES);
H
Haojun Liao 已提交
584

585 586
  tsdbInitDataBlockLoadInfo(&pTsdbReadHandle->dataBlockLoadInfo);
  tsdbInitCompBlockLoadInfo(&pTsdbReadHandle->compBlockLoadInfo);
H
Haojun Liao 已提交
587

H
Haojun Liao 已提交
588
  SArray* pTable = NULL;
dengyihao's avatar
dengyihao 已提交
589
  //  STsdbMeta* pMeta = tsdbGetMeta(pTsdbReadHandle->pTsdb);
H
Haojun Liao 已提交
590

dengyihao's avatar
dengyihao 已提交
591
  //  pTsdbReadHandle->pTableCheckInfo = destroyTableCheckInfo(pTsdbReadHandle->pTableCheckInfo);
H
Haojun Liao 已提交
592

dengyihao's avatar
dengyihao 已提交
593 594
  pTsdbReadHandle->pTableCheckInfo = NULL;  // createCheckInfoFromTableGroup(pTsdbReadHandle, groupList, pMeta,
                                            // &pTable);
595
  if (pTsdbReadHandle->pTableCheckInfo == NULL) {
dengyihao's avatar
dengyihao 已提交
596
    //    tsdbCleanupReadHandle(pTsdbReadHandle);
H
Haojun Liao 已提交
597 598
    terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
  }
H
Haojun Liao 已提交
599

dengyihao's avatar
dengyihao 已提交
600 601
  //  pTsdbReadHandle->prev = doFreeColumnInfoData(pTsdbReadHandle->prev);
  //  pTsdbReadHandle->next = doFreeColumnInfoData(pTsdbReadHandle->next);
H
Haojun Liao 已提交
602 603
}

604
tsdbReaderT tsdbQueryLastRow(SVnode* pVnode, SQueryTableDataCond* pCond, STableGroupInfo* groupList, uint64_t qId,
dengyihao's avatar
dengyihao 已提交
605
                             uint64_t taskId) {
606
  pCond->twindow = updateLastrowForEachGroup(groupList);
H
Haojun Liao 已提交
607 608 609 610 611 612

  // no qualified table
  if (groupList->numOfTables == 0) {
    return NULL;
  }

613
  STsdbReadHandle* pTsdbReadHandle = (STsdbReadHandle*)tsdbQueryTables(pVnode, pCond, groupList, qId, taskId);
614
  if (pTsdbReadHandle == NULL) {
615 616 617
    return NULL;
  }

618
  int32_t code = checkForCachedLastRow(pTsdbReadHandle, groupList);
dengyihao's avatar
dengyihao 已提交
619
  if (code != TSDB_CODE_SUCCESS) {  // set the numOfTables to be 0
H
Haojun Liao 已提交
620 621 622
    terrno = code;
    return NULL;
  }
H
Haojun Liao 已提交
623 624

  assert(pCond->order == TSDB_ORDER_ASC && pCond->twindow.skey <= pCond->twindow.ekey);
625 626
  if (pTsdbReadHandle->cachelastrow) {
    pTsdbReadHandle->type = TSDB_QUERY_TYPE_LAST;
D
init  
dapan1121 已提交
627
  }
dengyihao's avatar
dengyihao 已提交
628

629
  return pTsdbReadHandle;
D
init  
dapan1121 已提交
630 631
}

632
#if 0
633 634
tsdbReaderT tsdbQueryCacheLastT(STsdb *tsdb, SQueryTableDataCond *pCond, STableGroupInfo *groupList, uint64_t qId, STsdbMemTable* pMemRef) {
  STsdbReadHandle *pTsdbReadHandle = (STsdbReadHandle*) tsdbQueryTablesT(tsdb, pCond, groupList, qId, pMemRef);
635
  if (pTsdbReadHandle == NULL) {
636 637 638
    return NULL;
  }

639
  int32_t code = checkForCachedLast(pTsdbReadHandle);
D
init  
dapan1121 已提交
640 641 642 643 644
  if (code != TSDB_CODE_SUCCESS) { // set the numOfTables to be 0
    terrno = code;
    return NULL;
  }

645 646
  if (pTsdbReadHandle->cachelastrow) {
    pTsdbReadHandle->type = TSDB_QUERY_TYPE_LAST;
D
fix bug  
dapan1121 已提交
647
  }
D
init  
dapan1121 已提交
648
  
649
  return pTsdbReadHandle;
H
hjxilinx 已提交
650 651
}

652
#endif
dengyihao's avatar
dengyihao 已提交
653
SArray* tsdbGetQueriedTableList(tsdbReaderT* pHandle) {
654
  assert(pHandle != NULL);
H
Haojun Liao 已提交
655

dengyihao's avatar
dengyihao 已提交
656
  STsdbReadHandle* pTsdbReadHandle = (STsdbReadHandle*)pHandle;
H
Haojun Liao 已提交
657

dengyihao's avatar
dengyihao 已提交
658
  size_t  size = taosArrayGetSize(pTsdbReadHandle->pTableCheckInfo);
659
  SArray* res = taosArrayInit(size, POINTER_BYTES);
660 661 662
  return res;
}

H
Haojun Liao 已提交
663 664 665 666 667
// leave only one table for each group
static STableGroupInfo* trimTableGroup(STimeWindow* window, STableGroupInfo* pGroupList) {
  assert(pGroupList);
  size_t numOfGroup = taosArrayGetSize(pGroupList->pGroupList);

wafwerar's avatar
wafwerar 已提交
668
  STableGroupInfo* pNew = taosMemoryCalloc(1, sizeof(STableGroupInfo));
Y
yihaoDeng 已提交
669
  pNew->pGroupList = taosArrayInit(numOfGroup, POINTER_BYTES);
H
Haojun Liao 已提交
670

dengyihao's avatar
dengyihao 已提交
671
  for (int32_t i = 0; i < numOfGroup; ++i) {
H
Haojun Liao 已提交
672
    SArray* oneGroup = taosArrayGetP(pGroupList->pGroupList, i);
dengyihao's avatar
dengyihao 已提交
673
    size_t  numOfTables = taosArrayGetSize(oneGroup);
H
Haojun Liao 已提交
674 675 676 677

    SArray* px = taosArrayInit(4, sizeof(STableKeyInfo));
    for (int32_t j = 0; j < numOfTables; ++j) {
      STableKeyInfo* pInfo = (STableKeyInfo*)taosArrayGet(oneGroup, j);
dengyihao's avatar
dengyihao 已提交
678 679 680 681 682
      //      if (window->skey <= pInfo->lastKey && ((STable*)pInfo->pTable)->lastKey != TSKEY_INITIAL_VAL) {
      //        taosArrayPush(px, pInfo);
      //        pNew->numOfTables += 1;
      //        break;
      //      }
H
Haojun Liao 已提交
683 684 685 686 687 688 689 690 691 692 693 694 695
    }

    // there are no data in this group
    if (taosArrayGetSize(px) == 0) {
      taosArrayDestroy(px);
    } else {
      taosArrayPush(pNew->pGroupList, &px);
    }
  }

  return pNew;
}

696
tsdbReaderT tsdbQueryRowsInExternalWindow(SVnode* pVnode, SQueryTableDataCond* pCond, STableGroupInfo* groupList,
697
                                          uint64_t qId, uint64_t taskId) {
H
Haojun Liao 已提交
698 699
  STableGroupInfo* pNew = trimTableGroup(&pCond->twindow, groupList);

700 701 702 703 704 705 706 707 708 709 710 711
  if (pNew->numOfTables == 0) {
    tsdbDebug("update query time range to invalidate time window");

    assert(taosArrayGetSize(pNew->pGroupList) == 0);
    bool asc = ASCENDING_TRAVERSE(pCond->order);
    if (asc) {
      pCond->twindow.ekey = pCond->twindow.skey - 1;
    } else {
      pCond->twindow.skey = pCond->twindow.ekey - 1;
    }
  }

712
  STsdbReadHandle* pTsdbReadHandle = (STsdbReadHandle*)tsdbQueryTables(pVnode, pCond, pNew, qId, taskId);
713 714
  pTsdbReadHandle->loadExternalRow = true;
  pTsdbReadHandle->currentLoadExternalRows = true;
715

716
  return pTsdbReadHandle;
717 718
}

719
static bool initTableMemIterator(STsdbReadHandle* pHandle, STableCheckInfo* pCheckInfo) {
720
  if (pCheckInfo->initBuf) {
721 722
    return true;
  }
H
Haojun Liao 已提交
723

724
  pCheckInfo->initBuf = true;
725
  int32_t order = pHandle->order;
H
Haojun Liao 已提交
726

727 728 729
  STbData** pMem = NULL;
  STbData** pIMem = NULL;

730
  TSKEY tLastKey = keyToTkey(pCheckInfo->lastKey);
731 732 733
  if (pHandle->pTsdb->mem != NULL) {
    pMem = taosHashGet(pHandle->pTsdb->mem->pHashIdx, &pCheckInfo->tableId, sizeof(pCheckInfo->tableId));
    if (pMem != NULL) {
H
Haojun Liao 已提交
734
      pCheckInfo->iter =
735
          tSkipListCreateIterFromVal((*pMem)->pData, (const char*)&tLastKey, TSDB_DATA_TYPE_TIMESTAMP, order);
H
Haojun Liao 已提交
736
    }
737
  }
H
Haojun Liao 已提交
738

739 740 741
  if (pHandle->pTsdb->imem != NULL) {
    pIMem = taosHashGet(pHandle->pTsdb->imem->pHashIdx, &pCheckInfo->tableId, sizeof(pCheckInfo->tableId));
    if (pIMem != NULL) {
H
Haojun Liao 已提交
742
      pCheckInfo->iiter =
743
          tSkipListCreateIterFromVal((*pIMem)->pData, (const char*)&tLastKey, TSDB_DATA_TYPE_TIMESTAMP, order);
H
Haojun Liao 已提交
744
    }
745
  }
H
Haojun Liao 已提交
746

747 748 749 750
  // both iterators are NULL, no data in buffer right now
  if (pCheckInfo->iter == NULL && pCheckInfo->iiter == NULL) {
    return false;
  }
H
Haojun Liao 已提交
751

dengyihao's avatar
dengyihao 已提交
752
  bool memEmpty = (pCheckInfo->iter == NULL) || (pCheckInfo->iter != NULL && !tSkipListIterNext(pCheckInfo->iter));
753
  bool imemEmpty = (pCheckInfo->iiter == NULL) || (pCheckInfo->iiter != NULL && !tSkipListIterNext(pCheckInfo->iiter));
dengyihao's avatar
dengyihao 已提交
754
  if (memEmpty && imemEmpty) {  // buffer is empty
755 756
    return false;
  }
H
Haojun Liao 已提交
757

758 759 760
  if (!memEmpty) {
    SSkipListNode* node = tSkipListIterGet(pCheckInfo->iter);
    assert(node != NULL);
H
Haojun Liao 已提交
761

H
Haojun Liao 已提交
762 763
    STSRow* row = (STSRow*)SL_GET_NODE_DATA(node);
    TSKEY   key = TD_ROW_KEY(row);  // first timestamp in buffer
764
    tsdbDebug("%p uid:%" PRId64 ", check data in mem from skey:%" PRId64 ", order:%d, ts range in buf:%" PRId64
dengyihao's avatar
dengyihao 已提交
765 766 767
              "-%" PRId64 ", lastKey:%" PRId64 ", numOfRows:%" PRId64 ", %s",
              pHandle, pCheckInfo->tableId, key, order, (*pMem)->keyMin, (*pMem)->keyMax, pCheckInfo->lastKey,
              (*pMem)->nrows, pHandle->idStr);
H
Haojun Liao 已提交
768 769 770 771 772 773 774

    if (ASCENDING_TRAVERSE(order)) {
      assert(pCheckInfo->lastKey <= key);
    } else {
      assert(pCheckInfo->lastKey >= key);
    }

775
  } else {
dengyihao's avatar
dengyihao 已提交
776
    tsdbDebug("%p uid:%" PRId64 ", no data in mem, %s", pHandle, pCheckInfo->tableId, pHandle->idStr);
777
  }
H
Haojun Liao 已提交
778

779 780 781
  if (!imemEmpty) {
    SSkipListNode* node = tSkipListIterGet(pCheckInfo->iiter);
    assert(node != NULL);
H
Haojun Liao 已提交
782

H
Haojun Liao 已提交
783 784
    STSRow* row = (STSRow*)SL_GET_NODE_DATA(node);
    TSKEY   key = TD_ROW_KEY(row);  // first timestamp in buffer
785
    tsdbDebug("%p uid:%" PRId64 ", check data in imem from skey:%" PRId64 ", order:%d, ts range in buf:%" PRId64
dengyihao's avatar
dengyihao 已提交
786 787 788
              "-%" PRId64 ", lastKey:%" PRId64 ", numOfRows:%" PRId64 ", %s",
              pHandle, pCheckInfo->tableId, key, order, (*pIMem)->keyMin, (*pIMem)->keyMax, pCheckInfo->lastKey,
              (*pIMem)->nrows, pHandle->idStr);
H
Haojun Liao 已提交
789 790 791 792 793 794

    if (ASCENDING_TRAVERSE(order)) {
      assert(pCheckInfo->lastKey <= key);
    } else {
      assert(pCheckInfo->lastKey >= key);
    }
795
  } else {
dengyihao's avatar
dengyihao 已提交
796
    tsdbDebug("%p uid:%" PRId64 ", no data in imem, %s", pHandle, pCheckInfo->tableId, pHandle->idStr);
797
  }
H
Haojun Liao 已提交
798

799 800 801
  return true;
}

H
Haojun Liao 已提交
802 803 804 805 806
static void destroyTableMemIterator(STableCheckInfo* pCheckInfo) {
  tSkipListDestroyIter(pCheckInfo->iter);
  tSkipListDestroyIter(pCheckInfo->iiter);
}

807
static TSKEY extractFirstTraverseKey(STableCheckInfo* pCheckInfo, int32_t order, int32_t update) {
H
Haojun Liao 已提交
808
  STSRow *rmem = NULL, *rimem = NULL;
809 810 811
  if (pCheckInfo->iter) {
    SSkipListNode* node = tSkipListIterGet(pCheckInfo->iter);
    if (node != NULL) {
H
Haojun Liao 已提交
812
      rmem = (STSRow*)SL_GET_NODE_DATA(node);
813 814 815 816 817 818
    }
  }

  if (pCheckInfo->iiter) {
    SSkipListNode* node = tSkipListIterGet(pCheckInfo->iiter);
    if (node != NULL) {
H
Haojun Liao 已提交
819
      rimem = (STSRow*)SL_GET_NODE_DATA(node);
820 821 822 823 824 825 826 827 828
    }
  }

  if (rmem == NULL && rimem == NULL) {
    return TSKEY_INITIAL_VAL;
  }

  if (rmem != NULL && rimem == NULL) {
    pCheckInfo->chosen = CHECKINFO_CHOSEN_MEM;
H
Haojun Liao 已提交
829
    return TD_ROW_KEY(rmem);
830 831 832 833
  }

  if (rmem == NULL && rimem != NULL) {
    pCheckInfo->chosen = CHECKINFO_CHOSEN_IMEM;
H
Haojun Liao 已提交
834
    return TD_ROW_KEY(rimem);
835 836
  }

H
Haojun Liao 已提交
837 838
  TSKEY r1 = TD_ROW_KEY(rmem);
  TSKEY r2 = TD_ROW_KEY(rimem);
839 840

  if (r1 == r2) {
dengyihao's avatar
dengyihao 已提交
841
    if (update == TD_ROW_DISCARD_UPDATE) {
842 843
      pCheckInfo->chosen = CHECKINFO_CHOSEN_IMEM;
      tSkipListIterNext(pCheckInfo->iter);
dengyihao's avatar
dengyihao 已提交
844
    } else if (update == TD_ROW_OVERWRITE_UPDATE) {
845 846 847 848 849 850 851 852 853
      pCheckInfo->chosen = CHECKINFO_CHOSEN_MEM;
      tSkipListIterNext(pCheckInfo->iiter);
    } else {
      pCheckInfo->chosen = CHECKINFO_CHOSEN_BOTH;
    }
    return r1;
  } else if (r1 < r2 && ASCENDING_TRAVERSE(order)) {
    pCheckInfo->chosen = CHECKINFO_CHOSEN_MEM;
    return r1;
dengyihao's avatar
dengyihao 已提交
854
  } else {
855 856 857 858 859
    pCheckInfo->chosen = CHECKINFO_CHOSEN_IMEM;
    return r2;
  }
}

H
Haojun Liao 已提交
860 861
static STSRow* getSRowInTableMem(STableCheckInfo* pCheckInfo, int32_t order, int32_t update, STSRow** extraRow) {
  STSRow *rmem = NULL, *rimem = NULL;
H
Haojun Liao 已提交
862 863 864
  if (pCheckInfo->iter) {
    SSkipListNode* node = tSkipListIterGet(pCheckInfo->iter);
    if (node != NULL) {
H
Haojun Liao 已提交
865
      rmem = (STSRow*)SL_GET_NODE_DATA(node);
H
Haojun Liao 已提交
866 867
    }
  }
868

H
Haojun Liao 已提交
869 870 871
  if (pCheckInfo->iiter) {
    SSkipListNode* node = tSkipListIterGet(pCheckInfo->iiter);
    if (node != NULL) {
H
Haojun Liao 已提交
872
      rimem = (STSRow*)SL_GET_NODE_DATA(node);
H
Haojun Liao 已提交
873 874
    }
  }
875

H
Haojun Liao 已提交
876 877
  if (rmem == NULL && rimem == NULL) {
    return NULL;
H
Haojun Liao 已提交
878
  }
879

H
Haojun Liao 已提交
880
  if (rmem != NULL && rimem == NULL) {
H
Haojun Liao 已提交
881 882 883
    pCheckInfo->chosen = 0;
    return rmem;
  }
884

H
Haojun Liao 已提交
885
  if (rmem == NULL && rimem != NULL) {
H
Haojun Liao 已提交
886 887 888
    pCheckInfo->chosen = 1;
    return rimem;
  }
889

H
Haojun Liao 已提交
890 891
  TSKEY r1 = TD_ROW_KEY(rmem);
  TSKEY r2 = TD_ROW_KEY(rimem);
H
Haojun Liao 已提交
892

893 894
  if (r1 == r2) {
    if (update == TD_ROW_DISCARD_UPDATE) {
H
TD-1439  
Hongze Cheng 已提交
895
      tSkipListIterNext(pCheckInfo->iter);
896
      pCheckInfo->chosen = CHECKINFO_CHOSEN_IMEM;
H
TD-1439  
Hongze Cheng 已提交
897
      return rimem;
dengyihao's avatar
dengyihao 已提交
898
    } else if (update == TD_ROW_OVERWRITE_UPDATE) {
H
TD-1439  
Hongze Cheng 已提交
899
      tSkipListIterNext(pCheckInfo->iiter);
900 901 902 903
      pCheckInfo->chosen = CHECKINFO_CHOSEN_MEM;
      return rmem;
    } else {
      pCheckInfo->chosen = CHECKINFO_CHOSEN_BOTH;
H
Haojun Liao 已提交
904
      *extraRow = rimem;
H
TD-1439  
Hongze Cheng 已提交
905 906
      return rmem;
    }
H
Haojun Liao 已提交
907 908 909
  } else {
    if (ASCENDING_TRAVERSE(order)) {
      if (r1 < r2) {
910
        pCheckInfo->chosen = CHECKINFO_CHOSEN_MEM;
H
Haojun Liao 已提交
911 912
        return rmem;
      } else {
913
        pCheckInfo->chosen = CHECKINFO_CHOSEN_IMEM;
H
Haojun Liao 已提交
914 915 916 917
        return rimem;
      }
    } else {
      if (r1 < r2) {
918
        pCheckInfo->chosen = CHECKINFO_CHOSEN_IMEM;
H
Haojun Liao 已提交
919 920
        return rimem;
      } else {
921
        pCheckInfo->chosen = CHECKINFO_CHOSEN_IMEM;
H
Haojun Liao 已提交
922 923 924 925
        return rmem;
      }
    }
  }
H
Haojun Liao 已提交
926 927
}

928
static bool moveToNextRowInMem(STableCheckInfo* pCheckInfo) {
H
Haojun Liao 已提交
929
  bool hasNext = false;
930
  if (pCheckInfo->chosen == CHECKINFO_CHOSEN_MEM) {
H
Haojun Liao 已提交
931 932 933
    if (pCheckInfo->iter != NULL) {
      hasNext = tSkipListIterNext(pCheckInfo->iter);
    }
934

H
Haojun Liao 已提交
935 936 937
    if (hasNext) {
      return hasNext;
    }
938

H
Haojun Liao 已提交
939 940 941
    if (pCheckInfo->iiter != NULL) {
      return tSkipListIterGet(pCheckInfo->iiter) != NULL;
    }
dengyihao's avatar
dengyihao 已提交
942
  } else if (pCheckInfo->chosen == CHECKINFO_CHOSEN_IMEM) {
943 944 945
    if (pCheckInfo->iiter != NULL) {
      hasNext = tSkipListIterNext(pCheckInfo->iiter);
    }
946

947 948 949
    if (hasNext) {
      return hasNext;
    }
950

951 952
    if (pCheckInfo->iter != NULL) {
      return tSkipListIterGet(pCheckInfo->iter) != NULL;
H
Haojun Liao 已提交
953
    }
954 955 956 957 958 959 960
  } else {
    if (pCheckInfo->iter != NULL) {
      hasNext = tSkipListIterNext(pCheckInfo->iter);
    }
    if (pCheckInfo->iiter != NULL) {
      hasNext = tSkipListIterNext(pCheckInfo->iiter) || hasNext;
    }
H
Haojun Liao 已提交
961
  }
962

H
Haojun Liao 已提交
963 964 965
  return hasNext;
}

966
static bool hasMoreDataInCache(STsdbReadHandle* pHandle) {
H
Hongze Cheng 已提交
967
  STsdbCfg* pCfg = REPO_CFG(pHandle->pTsdb);
dengyihao's avatar
dengyihao 已提交
968
  size_t    size = taosArrayGetSize(pHandle->pTableCheckInfo);
969
  assert(pHandle->activeIndex < size && pHandle->activeIndex >= 0 && size >= 1);
D
dapan1121 已提交
970
  pHandle->cur.fid = INT32_MIN;
H
Haojun Liao 已提交
971

972
  STableCheckInfo* pCheckInfo = taosArrayGet(pHandle->pTableCheckInfo, pHandle->activeIndex);
H
Haojun Liao 已提交
973 974 975 976
  if (!pCheckInfo->initBuf) {
    initTableMemIterator(pHandle, pCheckInfo);
  }

H
Haojun Liao 已提交
977
  STSRow* row = getSRowInTableMem(pCheckInfo, pHandle->order, pCfg->update, NULL);
H
Haojun Liao 已提交
978
  if (row == NULL) {
979 980
    return false;
  }
981

H
Haojun Liao 已提交
982
  pCheckInfo->lastKey = TD_ROW_KEY(row);  // first timestamp in buffer
dengyihao's avatar
dengyihao 已提交
983 984
  tsdbDebug("%p uid:%" PRId64 ", check data in buffer from skey:%" PRId64 ", order:%d, %s", pHandle,
            pCheckInfo->tableId, pCheckInfo->lastKey, pHandle->order, pHandle->idStr);
H
Haojun Liao 已提交
985

986
  // all data in mem are checked already.
987 988
  if ((pCheckInfo->lastKey > pHandle->window.ekey && ASCENDING_TRAVERSE(pHandle->order)) ||
      (pCheckInfo->lastKey < pHandle->window.ekey && !ASCENDING_TRAVERSE(pHandle->order))) {
989 990
    return false;
  }
H
Haojun Liao 已提交
991

dengyihao's avatar
dengyihao 已提交
992
  int32_t      step = ASCENDING_TRAVERSE(pHandle->order) ? 1 : -1;
993
  STimeWindow* win = &pHandle->cur.win;
H
Haojun Liao 已提交
994
  pHandle->cur.rows = tsdbReadRowsFromCache(pCheckInfo, pHandle->window.ekey, pHandle->outputCapacity, win, pHandle);
H
Haojun Liao 已提交
995

996 997 998 999
  // update the last key value
  pCheckInfo->lastKey = win->ekey + step;
  pHandle->cur.lastKey = win->ekey + step;
  pHandle->cur.mixBlock = true;
1000

1001
  if (!ASCENDING_TRAVERSE(pHandle->order)) {
wafwerar's avatar
wafwerar 已提交
1002
    TSWAP(win->skey, win->ekey);
1003
  }
H
Haojun Liao 已提交
1004

1005
  return true;
1006
}
H
hjxilinx 已提交
1007

1008 1009
static int32_t getFileIdFromKey(TSKEY key, int32_t daysPerFile, int32_t precision) {
  assert(precision >= TSDB_TIME_PRECISION_MICRO || precision <= TSDB_TIME_PRECISION_NANO);
1010 1011 1012
  if (key == TSKEY_INITIAL_VAL) {
    return INT32_MIN;
  }
H
Haojun Liao 已提交
1013

D
dapan1121 已提交
1014
  if (key < 0) {
1015
    key -= (daysPerFile * tsTickPerDay[precision]);
D
dapan1121 已提交
1016
  }
dengyihao's avatar
dengyihao 已提交
1017

1018
  int64_t fid = (int64_t)(key / (daysPerFile * tsTickPerDay[precision]));  // set the starting fileId
dengyihao's avatar
dengyihao 已提交
1019
  if (fid < 0L && llabs(fid) > INT32_MAX) {                                // data value overflow for INT32
1020 1021
    fid = INT32_MIN;
  }
H
Haojun Liao 已提交
1022

1023
  if (fid > 0L && fid > INT32_MAX) {
1024 1025
    fid = INT32_MAX;
  }
H
Haojun Liao 已提交
1026

S
TD-1057  
Shengliang Guan 已提交
1027
  return (int32_t)fid;
1028 1029
}

H
refact  
Hongze Cheng 已提交
1030
static int32_t binarySearchForBlock(SBlock* pBlock, int32_t numOfBlocks, TSKEY skey, int32_t order) {
1031 1032
  int32_t firstSlot = 0;
  int32_t lastSlot = numOfBlocks - 1;
H
Haojun Liao 已提交
1033

1034
  int32_t midSlot = firstSlot;
H
Haojun Liao 已提交
1035

1036 1037 1038
  while (1) {
    numOfBlocks = lastSlot - firstSlot + 1;
    midSlot = (firstSlot + (numOfBlocks >> 1));
H
Haojun Liao 已提交
1039

1040
    if (numOfBlocks == 1) break;
H
Haojun Liao 已提交
1041

1042 1043 1044 1045 1046 1047 1048 1049 1050 1051 1052
    if (skey > pBlock[midSlot].keyLast) {
      if (numOfBlocks == 2) break;
      if ((order == TSDB_ORDER_DESC) && (skey < pBlock[midSlot + 1].keyFirst)) break;
      firstSlot = midSlot + 1;
    } else if (skey < pBlock[midSlot].keyFirst) {
      if ((order == TSDB_ORDER_ASC) && (skey > pBlock[midSlot - 1].keyLast)) break;
      lastSlot = midSlot - 1;
    } else {
      break;  // got the slot
    }
  }
H
Haojun Liao 已提交
1053

1054 1055
  return midSlot;
}
1056

dengyihao's avatar
dengyihao 已提交
1057
static int32_t loadBlockInfo(STsdbReadHandle* pTsdbReadHandle, int32_t index, int32_t* numOfBlocks) {
H
Haojun Liao 已提交
1058
  int32_t code = 0;
H
Haojun Liao 已提交
1059

1060
  STableCheckInfo* pCheckInfo = taosArrayGet(pTsdbReadHandle->pTableCheckInfo, index);
H
Haojun Liao 已提交
1061
  pCheckInfo->numOfBlocks = 0;
1062

H
Haojun Liao 已提交
1063
  STable table = {.uid = pCheckInfo->tableId, .tid = pCheckInfo->tableId};
1064
  table.pSchema = pTsdbReadHandle->pSchema;
H
Haojun Liao 已提交
1065 1066

  if (tsdbSetReadTable(&pTsdbReadHandle->rhelper, &table) != TSDB_CODE_SUCCESS) {
H
Haojun Liao 已提交
1067 1068 1069
    code = terrno;
    return code;
  }
1070

1071
  SBlockIdx* compIndex = pTsdbReadHandle->rhelper.pBlkIdx;
H
Hongze Cheng 已提交
1072

H
Haojun Liao 已提交
1073
  // no data block in this file, try next file
1074
  if (compIndex == NULL || compIndex->uid != pCheckInfo->tableId) {
H
Haojun Liao 已提交
1075 1076
    return 0;  // no data blocks in the file belongs to pCheckInfo->pTable
  }
1077

H
Haojun Liao 已提交
1078 1079 1080
  if (pCheckInfo->compSize < (int32_t)compIndex->len) {
    assert(compIndex->len > 0);

wafwerar's avatar
wafwerar 已提交
1081
    char* t = taosMemoryRealloc(pCheckInfo->pCompInfo, compIndex->len);
H
Haojun Liao 已提交
1082 1083 1084 1085
    if (t == NULL) {
      terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
      code = TSDB_CODE_TDB_OUT_OF_MEMORY;
      return code;
1086 1087
    }

H
Haojun Liao 已提交
1088 1089 1090
    pCheckInfo->pCompInfo = (SBlockInfo*)t;
    pCheckInfo->compSize = compIndex->len;
  }
1091

1092
  if (tsdbLoadBlockInfo(&(pTsdbReadHandle->rhelper), (void*)(pCheckInfo->pCompInfo)) < 0) {
H
Hongze Cheng 已提交
1093 1094
    return terrno;
  }
H
Haojun Liao 已提交
1095
  SBlockInfo* pCompInfo = pCheckInfo->pCompInfo;
1096

H
Haojun Liao 已提交
1097
  TSKEY s = TSKEY_INITIAL_VAL, e = TSKEY_INITIAL_VAL;
1098

1099
  if (ASCENDING_TRAVERSE(pTsdbReadHandle->order)) {
dengyihao's avatar
dengyihao 已提交
1100 1101
    assert(pCheckInfo->lastKey <= pTsdbReadHandle->window.ekey &&
           pTsdbReadHandle->window.skey <= pTsdbReadHandle->window.ekey);
H
Haojun Liao 已提交
1102
  } else {
dengyihao's avatar
dengyihao 已提交
1103 1104
    assert(pCheckInfo->lastKey >= pTsdbReadHandle->window.ekey &&
           pTsdbReadHandle->window.skey >= pTsdbReadHandle->window.ekey);
H
Haojun Liao 已提交
1105
  }
1106

dengyihao's avatar
dengyihao 已提交
1107 1108
  s = TMIN(pCheckInfo->lastKey, pTsdbReadHandle->window.ekey);
  e = TMAX(pCheckInfo->lastKey, pTsdbReadHandle->window.ekey);
1109

H
Haojun Liao 已提交
1110 1111 1112
  // discard the unqualified data block based on the query time window
  int32_t start = binarySearchForBlock(pCompInfo->blocks, compIndex->numOfBlocks, s, TSDB_ORDER_ASC);
  int32_t end = start;
H
TD-100  
hzcheng 已提交
1113

H
Haojun Liao 已提交
1114 1115 1116
  if (s > pCompInfo->blocks[start].keyLast) {
    return 0;
  }
1117

H
Haojun Liao 已提交
1118 1119 1120 1121
  // todo speedup the procedure of located end block
  while (end < (int32_t)compIndex->numOfBlocks && (pCompInfo->blocks[end].keyFirst <= e)) {
    end += 1;
  }
1122

H
Haojun Liao 已提交
1123
  pCheckInfo->numOfBlocks = (end - start);
1124

H
Haojun Liao 已提交
1125 1126 1127
  if (start > 0) {
    memmove(pCompInfo->blocks, &pCompInfo->blocks[start], pCheckInfo->numOfBlocks * sizeof(SBlock));
  }
1128

H
Haojun Liao 已提交
1129 1130 1131
  (*numOfBlocks) += pCheckInfo->numOfBlocks;
  return 0;
}
1132

1133
static int32_t getFileCompInfo(STsdbReadHandle* pTsdbReadHandle, int32_t* numOfBlocks) {
H
Haojun Liao 已提交
1134 1135 1136 1137
  // load all the comp offset value for all tables in this file
  int32_t code = TSDB_CODE_SUCCESS;
  *numOfBlocks = 0;

1138
  pTsdbReadHandle->cost.headFileLoad += 1;
1139 1140
  int64_t s = taosGetTimestampUs();

H
Haojun Liao 已提交
1141
  size_t numOfTables = 0;
1142 1143 1144 1145
  if (pTsdbReadHandle->loadType == BLOCK_LOAD_TABLE_SEQ_ORDER) {
    code = loadBlockInfo(pTsdbReadHandle, pTsdbReadHandle->activeIndex, numOfBlocks);
  } else if (pTsdbReadHandle->loadType == BLOCK_LOAD_OFFSET_SEQ_ORDER) {
    numOfTables = taosArrayGetSize(pTsdbReadHandle->pTableCheckInfo);
1146

H
Haojun Liao 已提交
1147
    for (int32_t i = 0; i < numOfTables; ++i) {
1148
      code = loadBlockInfo(pTsdbReadHandle, i, numOfBlocks);
H
Haojun Liao 已提交
1149
      if (code != TSDB_CODE_SUCCESS) {
1150 1151
        int64_t e = taosGetTimestampUs();

1152
        pTsdbReadHandle->cost.headFileLoadTime += (e - s);
H
Haojun Liao 已提交
1153 1154 1155 1156 1157
        return code;
      }
    }
  } else {
    assert(0);
1158
  }
1159

1160
  int64_t e = taosGetTimestampUs();
1161
  pTsdbReadHandle->cost.headFileLoadTime += (e - s);
H
Haojun Liao 已提交
1162
  return code;
1163 1164
}

dengyihao's avatar
dengyihao 已提交
1165 1166
static int32_t doLoadFileDataBlock(STsdbReadHandle* pTsdbReadHandle, SBlock* pBlock, STableCheckInfo* pCheckInfo,
                                   int32_t slotIndex) {
H
Haojun Liao 已提交
1167
  int64_t st = taosGetTimestampUs();
1168

C
Cary Xu 已提交
1169
  int32_t code = tdInitDataCols(pTsdbReadHandle->pDataCols, pTsdbReadHandle->pSchema);
H
Haojun Liao 已提交
1170
  if (code != TSDB_CODE_SUCCESS) {
H
Haojun Liao 已提交
1171
    tsdbError("%p failed to malloc buf for pDataCols, %s", pTsdbReadHandle, pTsdbReadHandle->idStr);
H
Haojun Liao 已提交
1172 1173 1174 1175
    terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
    goto _error;
  }

1176
  code = tdInitDataCols(pTsdbReadHandle->rhelper.pDCols[0], pTsdbReadHandle->pSchema);
H
Haojun Liao 已提交
1177
  if (code != TSDB_CODE_SUCCESS) {
H
Haojun Liao 已提交
1178
    tsdbError("%p failed to malloc buf for rhelper.pDataCols[0], %s", pTsdbReadHandle, pTsdbReadHandle->idStr);
H
Haojun Liao 已提交
1179 1180 1181 1182
    terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
    goto _error;
  }

1183
  code = tdInitDataCols(pTsdbReadHandle->rhelper.pDCols[1], pTsdbReadHandle->pSchema);
H
Haojun Liao 已提交
1184
  if (code != TSDB_CODE_SUCCESS) {
H
Haojun Liao 已提交
1185
    tsdbError("%p failed to malloc buf for rhelper.pDataCols[1], %s", pTsdbReadHandle, pTsdbReadHandle->idStr);
H
Haojun Liao 已提交
1186 1187 1188
    terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
    goto _error;
  }
1189

1190
  int16_t* colIds = pTsdbReadHandle->suppInfo.defaultLoadColumn->pData;
H
Haojun Liao 已提交
1191

dengyihao's avatar
dengyihao 已提交
1192 1193
  int32_t ret = tsdbLoadBlockDataCols(&(pTsdbReadHandle->rhelper), pBlock, pCheckInfo->pCompInfo, colIds,
                                      (int)(QH_GET_NUM_OF_COLS(pTsdbReadHandle)), true);
H
Haojun Liao 已提交
1194
  if (ret != TSDB_CODE_SUCCESS) {
H
Haojun Liao 已提交
1195 1196 1197
    int32_t c = terrno;
    assert(c != TSDB_CODE_SUCCESS);
    goto _error;
H
Haojun Liao 已提交
1198
  }
1199

1200
  SDataBlockLoadInfo* pBlockLoadInfo = &pTsdbReadHandle->dataBlockLoadInfo;
1201

1202 1203
  pBlockLoadInfo->fileGroup = pTsdbReadHandle->pFileGroup;
  pBlockLoadInfo->slot = pTsdbReadHandle->cur.slot;
H
Haojun Liao 已提交
1204
  pBlockLoadInfo->uid = pCheckInfo->tableId;
1205

1206
  SDataCols* pCols = pTsdbReadHandle->rhelper.pDCols[0];
1207
  assert(pCols->numOfRows != 0 && pCols->numOfRows <= pBlock->numOfRows);
1208

1209
  pBlock->numOfRows = pCols->numOfRows;
H
Haojun Liao 已提交
1210

1211
  // Convert from TKEY to TSKEY for primary timestamp column if current block has timestamp before 1970-01-01T00:00:00Z
dengyihao's avatar
dengyihao 已提交
1212
  if (pBlock->keyFirst < 0 && colIds[0] == PRIMARYKEY_TIMESTAMP_COL_ID) {
1213
    int64_t* src = pCols->cols[0].pData;
dengyihao's avatar
dengyihao 已提交
1214
    for (int32_t i = 0; i < pBlock->numOfRows; ++i) {
1215 1216 1217 1218
      src[i] = tdGetKey(src[i]);
    }
  }

H
Haojun Liao 已提交
1219
  int64_t elapsedTime = (taosGetTimestampUs() - st);
1220
  pTsdbReadHandle->cost.blockLoadTime += elapsedTime;
1221

dengyihao's avatar
dengyihao 已提交
1222 1223 1224 1225
  tsdbDebug("%p load file block into buffer, index:%d, brange:%" PRId64 "-%" PRId64 ", rows:%d, elapsed time:%" PRId64
            " us, %s",
            pTsdbReadHandle, slotIndex, pBlock->keyFirst, pBlock->keyLast, pBlock->numOfRows, elapsedTime,
            pTsdbReadHandle->idStr);
H
Haojun Liao 已提交
1226
  return TSDB_CODE_SUCCESS;
H
Haojun Liao 已提交
1227 1228 1229 1230

_error:
  pBlock->numOfRows = 0;

dengyihao's avatar
dengyihao 已提交
1231
  tsdbError("%p error occurs in loading file block, index:%d, brange:%" PRId64 "-%" PRId64 ", rows:%d, %s",
H
Haojun Liao 已提交
1232
            pTsdbReadHandle, slotIndex, pBlock->keyFirst, pBlock->keyLast, pBlock->numOfRows, pTsdbReadHandle->idStr);
H
Haojun Liao 已提交
1233
  return terrno;
H
hjxilinx 已提交
1234 1235
}

1236
static int32_t getEndPosInDataBlock(STsdbReadHandle* pTsdbReadHandle, SDataBlockInfo* pBlockInfo);
dengyihao's avatar
dengyihao 已提交
1237 1238 1239 1240 1241 1242 1243 1244
static int32_t doCopyRowsFromFileBlock(STsdbReadHandle* pTsdbReadHandle, int32_t capacity, int32_t numOfRows,
                                       int32_t start, int32_t end);
static void    moveDataToFront(STsdbReadHandle* pTsdbReadHandle, int32_t numOfRows, int32_t numOfCols);
static void    doCheckGeneratedBlockRange(STsdbReadHandle* pTsdbReadHandle);
static void    copyAllRemainRowsFromFileBlock(STsdbReadHandle* pTsdbReadHandle, STableCheckInfo* pCheckInfo,
                                              SDataBlockInfo* pBlockInfo, int32_t endPos);

static int32_t handleDataMergeIfNeeded(STsdbReadHandle* pTsdbReadHandle, SBlock* pBlock, STableCheckInfo* pCheckInfo) {
1245
  SQueryFilePos* cur = &pTsdbReadHandle->cur;
H
Hongze Cheng 已提交
1246
  STsdbCfg*      pCfg = REPO_CFG(pTsdbReadHandle->pTsdb);
H
Haojun Liao 已提交
1247
  SDataBlockInfo binfo = GET_FILE_DATA_BLOCK_INFO(pCheckInfo, pBlock);
1248
  TSKEY          key;
dengyihao's avatar
dengyihao 已提交
1249
  int32_t        code = TSDB_CODE_SUCCESS;
1250

1251
  /*bool hasData = */ initTableMemIterator(pTsdbReadHandle, pCheckInfo);
H
Haojun Liao 已提交
1252 1253
  assert(cur->pos >= 0 && cur->pos <= binfo.rows);

1254
  key = extractFirstTraverseKey(pCheckInfo, pTsdbReadHandle->order, pCfg->update);
1255

H
Haojun Liao 已提交
1256
  if (key != TSKEY_INITIAL_VAL) {
dengyihao's avatar
dengyihao 已提交
1257
    tsdbDebug("%p key in mem:%" PRId64 ", %s", pTsdbReadHandle, key, pTsdbReadHandle->idStr);
H
Haojun Liao 已提交
1258
  } else {
H
Haojun Liao 已提交
1259
    tsdbDebug("%p no data in mem, %s", pTsdbReadHandle, pTsdbReadHandle->idStr);
H
Haojun Liao 已提交
1260
  }
H
Haojun Liao 已提交
1261

1262 1263
  bool ascScan = ASCENDING_TRAVERSE(pTsdbReadHandle->order);

1264 1265 1266 1267
  if ((ascScan && (key != TSKEY_INITIAL_VAL && key <= binfo.window.ekey)) ||
      (!ascScan && (key != TSKEY_INITIAL_VAL && key >= binfo.window.skey))) {
    if ((ascScan && (key != TSKEY_INITIAL_VAL && key < binfo.window.skey)) ||
        (!ascScan && (key != TSKEY_INITIAL_VAL && key > binfo.window.ekey))) {
H
Haojun Liao 已提交
1268
      // do not load file block into buffer
1269
      int32_t step = ASCENDING_TRAVERSE(pTsdbReadHandle->order) ? 1 : -1;
H
Haojun Liao 已提交
1270

dengyihao's avatar
dengyihao 已提交
1271 1272 1273 1274
      TSKEY maxKey =
          ASCENDING_TRAVERSE(pTsdbReadHandle->order) ? (binfo.window.skey - step) : (binfo.window.ekey - step);
      cur->rows =
          tsdbReadRowsFromCache(pCheckInfo, maxKey, pTsdbReadHandle->outputCapacity, &cur->win, pTsdbReadHandle);
1275
      pTsdbReadHandle->realNumOfRows = cur->rows;
H
Haojun Liao 已提交
1276 1277 1278

      // update the last key value
      pCheckInfo->lastKey = cur->win.ekey + step;
1279
      if (!ASCENDING_TRAVERSE(pTsdbReadHandle->order)) {
wafwerar's avatar
wafwerar 已提交
1280
        TSWAP(cur->win.skey, cur->win.ekey);
H
Haojun Liao 已提交
1281
      }
H
Haojun Liao 已提交
1282

H
Haojun Liao 已提交
1283 1284
      cur->mixBlock = true;
      cur->blockCompleted = false;
H
Haojun Liao 已提交
1285
      return code;
H
Haojun Liao 已提交
1286
    }
H
Haojun Liao 已提交
1287

1288
    // return error, add test cases
1289
    if ((code = doLoadFileDataBlock(pTsdbReadHandle, pBlock, pCheckInfo, cur->slot)) != TSDB_CODE_SUCCESS) {
H
Haojun Liao 已提交
1290
      return code;
1291 1292
    }

1293
    doMergeTwoLevelData(pTsdbReadHandle, pCheckInfo, pBlock);
1294
  } else {
1295 1296 1297 1298 1299 1300
    /*
     * no data in cache, only load data from file
     * during the query processing, data in cache will not be checked anymore.
     *
     * Here the buffer is not enough, so only part of file block can be loaded into memory buffer
     */
1301 1302
    assert(pTsdbReadHandle->outputCapacity >= binfo.rows);
    int32_t endPos = getEndPosInDataBlock(pTsdbReadHandle, &binfo);
1303

1304 1305
    if ((cur->pos == 0 && endPos == binfo.rows - 1 && ascScan) ||
        (cur->pos == (binfo.rows - 1) && endPos == 0 && (!ascScan))) {
1306
      pTsdbReadHandle->realNumOfRows = binfo.rows;
1307 1308

      cur->rows = binfo.rows;
dengyihao's avatar
dengyihao 已提交
1309
      cur->win = binfo.window;
1310
      cur->mixBlock = false;
H
Haojun Liao 已提交
1311 1312
      cur->blockCompleted = true;

1313
      if (ascScan) {
H
Haojun Liao 已提交
1314 1315 1316 1317 1318 1319
        cur->lastKey = binfo.window.ekey + 1;
        cur->pos = binfo.rows;
      } else {
        cur->lastKey = binfo.window.skey - 1;
        cur->pos = -1;
      }
dengyihao's avatar
dengyihao 已提交
1320
    } else {  // partially copy to dest buffer
1321
      copyAllRemainRowsFromFileBlock(pTsdbReadHandle, pCheckInfo, &binfo, endPos);
1322 1323
      cur->mixBlock = true;
    }
1324

H
Haojun Liao 已提交
1325
    assert(cur->blockCompleted);
H
Haojun Liao 已提交
1326
    if (cur->rows == binfo.rows) {
dengyihao's avatar
dengyihao 已提交
1327
      tsdbDebug("%p whole file block qualified, brange:%" PRId64 "-%" PRId64 ", rows:%d, lastKey:%" PRId64 ", %s",
H
Haojun Liao 已提交
1328
                pTsdbReadHandle, cur->win.skey, cur->win.ekey, cur->rows, cur->lastKey, pTsdbReadHandle->idStr);
H
Haojun Liao 已提交
1329
    } else {
dengyihao's avatar
dengyihao 已提交
1330 1331 1332 1333
      tsdbDebug("%p create data block from remain file block, brange:%" PRId64 "-%" PRId64
                ", rows:%d, total:%d, lastKey:%" PRId64 ", %s",
                pTsdbReadHandle, cur->win.skey, cur->win.ekey, cur->rows, binfo.rows, cur->lastKey,
                pTsdbReadHandle->idStr);
H
Haojun Liao 已提交
1334
    }
1335
  }
H
Haojun Liao 已提交
1336 1337

  return code;
1338 1339
}

dengyihao's avatar
dengyihao 已提交
1340 1341
static int32_t loadFileDataBlock(STsdbReadHandle* pTsdbReadHandle, SBlock* pBlock, STableCheckInfo* pCheckInfo,
                                 bool* exists) {
1342
  SQueryFilePos* cur = &pTsdbReadHandle->cur;
dengyihao's avatar
dengyihao 已提交
1343 1344
  int32_t        code = TSDB_CODE_SUCCESS;
  bool           asc = ASCENDING_TRAVERSE(pTsdbReadHandle->order);
1345

1346
  if (asc) {
H
Haojun Liao 已提交
1347
    // query ended in/started from current block
1348 1349
    if (pTsdbReadHandle->window.ekey < pBlock->keyLast || pCheckInfo->lastKey > pBlock->keyFirst) {
      if ((code = doLoadFileDataBlock(pTsdbReadHandle, pBlock, pCheckInfo, cur->slot)) != TSDB_CODE_SUCCESS) {
H
Haojun Liao 已提交
1350 1351
        *exists = false;
        return code;
1352
      }
1353

1354
      SDataCols* pTSCol = pTsdbReadHandle->rhelper.pDCols[0];
H
Haojun Liao 已提交
1355
      assert(pTSCol->cols->type == TSDB_DATA_TYPE_TIMESTAMP && pTSCol->numOfRows == pBlock->numOfRows);
H
Haojun Liao 已提交
1356

1357 1358
      if (pCheckInfo->lastKey > pBlock->keyFirst) {
        cur->pos =
1359
            binarySearchForKey(pTSCol->cols[0].pData, pBlock->numOfRows, pCheckInfo->lastKey, pTsdbReadHandle->order);
1360 1361 1362
      } else {
        cur->pos = 0;
      }
H
Haojun Liao 已提交
1363

H
Haojun Liao 已提交
1364
      assert(pCheckInfo->lastKey <= pBlock->keyLast);
1365
      doMergeTwoLevelData(pTsdbReadHandle, pCheckInfo, pBlock);
1366
    } else {  // the whole block is loaded in to buffer
dengyihao's avatar
dengyihao 已提交
1367
      cur->pos = asc ? 0 : (pBlock->numOfRows - 1);
1368
      code = handleDataMergeIfNeeded(pTsdbReadHandle, pBlock, pCheckInfo);
H
[td-32]  
hjxilinx 已提交
1369
    }
dengyihao's avatar
dengyihao 已提交
1370
  } else {  // desc order, query ended in current block
1371 1372
    if (pTsdbReadHandle->window.ekey > pBlock->keyFirst || pCheckInfo->lastKey < pBlock->keyLast) {
      if ((code = doLoadFileDataBlock(pTsdbReadHandle, pBlock, pCheckInfo, cur->slot)) != TSDB_CODE_SUCCESS) {
H
Haojun Liao 已提交
1373 1374
        *exists = false;
        return code;
1375
      }
H
Haojun Liao 已提交
1376

1377
      SDataCols* pTsCol = pTsdbReadHandle->rhelper.pDCols[0];
1378
      if (pCheckInfo->lastKey < pBlock->keyLast) {
dengyihao's avatar
dengyihao 已提交
1379 1380
        cur->pos =
            binarySearchForKey(pTsCol->cols[0].pData, pBlock->numOfRows, pCheckInfo->lastKey, pTsdbReadHandle->order);
1381
      } else {
H
Haojun Liao 已提交
1382
        cur->pos = pBlock->numOfRows - 1;
1383
      }
H
Haojun Liao 已提交
1384

H
Haojun Liao 已提交
1385
      assert(pCheckInfo->lastKey >= pBlock->keyFirst);
1386
      doMergeTwoLevelData(pTsdbReadHandle, pCheckInfo, pBlock);
1387
    } else {
dengyihao's avatar
dengyihao 已提交
1388
      cur->pos = asc ? 0 : (pBlock->numOfRows - 1);
1389
      code = handleDataMergeIfNeeded(pTsdbReadHandle, pBlock, pCheckInfo);
H
[td-32]  
hjxilinx 已提交
1390
    }
1391
  }
1392

1393
  *exists = pTsdbReadHandle->realNumOfRows > 0;
H
Haojun Liao 已提交
1394
  return code;
H
[td-32]  
hjxilinx 已提交
1395 1396
}

1397
static int doBinarySearchKey(char* pValue, int num, TSKEY key, int order) {
1398
  int    firstPos, lastPos, midPos = -1;
H
Haojun Liao 已提交
1399
  int    numOfRows;
1400 1401
  TSKEY* keyList;

1402
  assert(order == TSDB_ORDER_ASC || order == TSDB_ORDER_DESC);
H
Haojun Liao 已提交
1403

1404
  if (num <= 0) return -1;
1405 1406

  keyList = (TSKEY*)pValue;
1407 1408
  firstPos = 0;
  lastPos = num - 1;
1409

1410
  if (order == TSDB_ORDER_DESC) {
1411 1412 1413 1414 1415
    // find the first position which is smaller than the key
    while (1) {
      if (key >= keyList[lastPos]) return lastPos;
      if (key == keyList[firstPos]) return firstPos;
      if (key < keyList[firstPos]) return firstPos - 1;
1416

H
Haojun Liao 已提交
1417 1418
      numOfRows = lastPos - firstPos + 1;
      midPos = (numOfRows >> 1) + firstPos;
1419

1420 1421 1422 1423 1424 1425 1426 1427
      if (key < keyList[midPos]) {
        lastPos = midPos - 1;
      } else if (key > keyList[midPos]) {
        firstPos = midPos + 1;
      } else {
        break;
      }
    }
1428

1429 1430 1431 1432 1433
  } else {
    // find the first position which is bigger than the key
    while (1) {
      if (key <= keyList[firstPos]) return firstPos;
      if (key == keyList[lastPos]) return lastPos;
1434

1435 1436 1437 1438 1439 1440 1441
      if (key > keyList[lastPos]) {
        lastPos = lastPos + 1;
        if (lastPos >= num)
          return -1;
        else
          return lastPos;
      }
1442

H
Haojun Liao 已提交
1443 1444
      numOfRows = lastPos - firstPos + 1;
      midPos = (numOfRows >> 1) + firstPos;
1445

1446 1447 1448 1449 1450 1451 1452 1453 1454
      if (key < keyList[midPos]) {
        lastPos = midPos - 1;
      } else if (key > keyList[midPos]) {
        firstPos = midPos + 1;
      } else {
        break;
      }
    }
  }
1455

1456 1457 1458
  return midPos;
}

dengyihao's avatar
dengyihao 已提交
1459 1460
static int32_t doCopyRowsFromFileBlock(STsdbReadHandle* pTsdbReadHandle, int32_t capacity, int32_t numOfRows,
                                       int32_t start, int32_t end) {
1461
  SDataCols* pCols = pTsdbReadHandle->rhelper.pDCols[0];
dengyihao's avatar
dengyihao 已提交
1462
  TSKEY*     tsArray = pCols->cols[0].pData;
H
Haojun Liao 已提交
1463

1464
  int32_t num = end - start + 1;
H
Haojun Liao 已提交
1465 1466 1467 1468 1469 1470
  assert(num >= 0);

  if (num == 0) {
    return numOfRows;
  }

1471 1472
  bool    ascScan = ASCENDING_TRAVERSE(pTsdbReadHandle->order);
  int32_t trueStart = ascScan ? start : end;
1473
  int32_t trueEnd = ascScan ? end : start;
1474 1475
  int32_t step = ascScan ? 1 : -1;

1476
  int32_t requiredNumOfCols = (int32_t)taosArrayGetSize(pTsdbReadHandle->pColumns);
H
Haojun Liao 已提交
1477

dengyihao's avatar
dengyihao 已提交
1478
  // data in buffer has greater timestamp, copy data in file block
1479
  int32_t i = 0, j = 0;
dengyihao's avatar
dengyihao 已提交
1480
  while (i < requiredNumOfCols && j < pCols->numOfCols) {
1481
    SColumnInfoData* pColInfo = taosArrayGet(pTsdbReadHandle->pColumns, i);
1482 1483 1484 1485 1486 1487 1488

    SDataCol* src = &pCols->cols[j];
    if (src->colId < pColInfo->info.colId) {
      j++;
      continue;
    }

L
Liu Jicong 已提交
1489
    if (!isAllRowsNull(src) && pColInfo->info.colId == src->colId) {
1490
      if (!IS_VAR_DATA_TYPE(pColInfo->info.type)) {  // todo opt performance
dengyihao's avatar
dengyihao 已提交
1491
        //        memmove(pData, (char*)src->pData + bytes * start, bytes * num);
1492
        int32_t rowIndex = numOfRows;
1493
        for (int32_t k = trueStart; ((ascScan && k <= trueEnd) || (!ascScan && k >= trueEnd)); k += step, ++rowIndex) {
1494
          SCellVal sVal = {0};
C
Cary Xu 已提交
1495
          if (tdGetColDataOfRow(&sVal, src, k, pCols->bitmapMode) < 0) {
1496 1497
            TASSERT(0);
          }
1498

1499
          if (sVal.valType == TD_VTYPE_NULL) {
1500
            colDataAppendNULL(pColInfo, rowIndex);
1501
          } else {
1502
            colDataAppend(pColInfo, rowIndex, sVal.val, false);
1503 1504 1505
          }
        }
      } else {  // handle the var-string
1506 1507
        int32_t rowIndex = numOfRows;

1508
        // todo refactor, only copy one-by-one
1509
        for (int32_t k = trueStart; ((ascScan && k <= trueEnd) || (!ascScan && k >= trueEnd)); k += step, ++rowIndex) {
1510
          SCellVal sVal = {0};
dengyihao's avatar
dengyihao 已提交
1511
          if (tdGetColDataOfRow(&sVal, src, k, pCols->bitmapMode) < 0) {
H
Haojun Liao 已提交
1512 1513
            TASSERT(0);
          }
1514

1515
          if (sVal.valType == TD_VTYPE_NULL) {
1516
            colDataAppendNULL(pColInfo, rowIndex);
1517
          } else {
1518
            colDataAppend(pColInfo, rowIndex, sVal.val, false);
1519
          }
1520 1521
        }
      }
1522 1523 1524

      j++;
      i++;
H
Hongze Cheng 已提交
1525
    } else {  // pColInfo->info.colId < src->colId, it is a NULL data
1526
      colDataAppendNNULL(pColInfo, numOfRows, num);
1527
      i++;
1528 1529
    }
  }
1530

dengyihao's avatar
dengyihao 已提交
1531
  while (i < requiredNumOfCols) {  // the remain columns are all null data
1532
    SColumnInfoData* pColInfo = taosArrayGet(pTsdbReadHandle->pColumns, i);
1533
    colDataAppendNNULL(pColInfo, numOfRows, num);
1534
    i++;
1535
  }
H
Haojun Liao 已提交
1536

1537 1538
  pTsdbReadHandle->cur.win.ekey = tsArray[trueEnd];
  pTsdbReadHandle->cur.lastKey = tsArray[trueEnd] + step;
1539

1540
  return numOfRows + num;
1541 1542
}

H
Haojun Liao 已提交
1543
// TODO fix bug for reverse copy data problem
1544
// Note: row1 always has high priority
H
Haojun Liao 已提交
1545 1546 1547 1548
static void mergeTwoRowFromMem(STsdbReadHandle* pTsdbReadHandle, int32_t capacity, int32_t numOfRows, STSRow* row1,
                               STSRow* row2, int32_t numOfCols, uint64_t uid, STSchema* pSchema1, STSchema* pSchema2,
                               bool forceSetNull) {
#if 1
dengyihao's avatar
dengyihao 已提交
1549 1550 1551 1552 1553 1554 1555 1556 1557
  STSchema* pSchema;
  STSRow*   row;
  int16_t   colId;
  int16_t   offset;

  bool     isRow1DataRow = TD_IS_TP_ROW(row1);
  bool     isRow2DataRow;
  bool     isChosenRowDataRow;
  int32_t  chosen_itr;
H
Haojun Liao 已提交
1558
  SCellVal sVal = {0};
1559

H
Haojun Liao 已提交
1560
  // the schema version info is embeded in STSRow
1561 1562 1563
  int32_t numOfColsOfRow1 = 0;

  if (pSchema1 == NULL) {
H
Hongze Cheng 已提交
1564
    pSchema1 = metaGetTbTSchema(REPO_META(pTsdbReadHandle->pTsdb), uid, TD_ROW_SVER(row1));
1565
  }
1566

C
Cary Xu 已提交
1567 1568 1569 1570
#ifdef TD_DEBUG_PRINT_ROW
  tdSRowPrint(row1, pSchema1, __func__);
#endif

dengyihao's avatar
dengyihao 已提交
1571
  if (isRow1DataRow) {
1572
    numOfColsOfRow1 = schemaNCols(pSchema1);
H
Haojun Liao 已提交
1573
  } else {
H
Haojun Liao 已提交
1574
    numOfColsOfRow1 = tdRowGetNCols(row1);
D
fix bug  
dapan1121 已提交
1575
  }
1576

1577
  int32_t numOfColsOfRow2 = 0;
dengyihao's avatar
dengyihao 已提交
1578
  if (row2) {
H
Haojun Liao 已提交
1579
    isRow2DataRow = TD_IS_TP_ROW(row2);
1580
    if (pSchema2 == NULL) {
H
Hongze Cheng 已提交
1581
      pSchema2 = metaGetTbTSchema(REPO_META(pTsdbReadHandle->pTsdb), uid, TD_ROW_SVER(row2));
1582
    }
dengyihao's avatar
dengyihao 已提交
1583
    if (isRow2DataRow) {
1584 1585
      numOfColsOfRow2 = schemaNCols(pSchema2);
    } else {
H
Haojun Liao 已提交
1586
      numOfColsOfRow2 = tdRowGetNCols(row2);
1587 1588
    }
  }
C
Cary Xu 已提交
1589

1590
  int32_t i = 0, j = 0, k = 0;
dengyihao's avatar
dengyihao 已提交
1591
  while (i < numOfCols && (j < numOfColsOfRow1 || k < numOfColsOfRow2)) {
1592
    SColumnInfoData* pColInfo = taosArrayGet(pTsdbReadHandle->pColumns, i);
1593 1594

    int32_t colIdOfRow1;
dengyihao's avatar
dengyihao 已提交
1595
    if (j >= numOfColsOfRow1) {
1596
      colIdOfRow1 = INT32_MAX;
dengyihao's avatar
dengyihao 已提交
1597
    } else if (isRow1DataRow) {
1598 1599
      colIdOfRow1 = pSchema1->columns[j].colId;
    } else {
C
Cary Xu 已提交
1600
      colIdOfRow1 = tdKvRowColIdAt(row1, j);
1601 1602 1603
    }

    int32_t colIdOfRow2;
dengyihao's avatar
dengyihao 已提交
1604
    if (k >= numOfColsOfRow2) {
1605
      colIdOfRow2 = INT32_MAX;
dengyihao's avatar
dengyihao 已提交
1606
    } else if (isRow2DataRow) {
1607 1608
      colIdOfRow2 = pSchema2->columns[k].colId;
    } else {
C
Cary Xu 已提交
1609
      colIdOfRow2 = tdKvRowColIdAt(row2, k);
1610 1611
    }

dengyihao's avatar
dengyihao 已提交
1612 1613
    if (colIdOfRow1 == colIdOfRow2) {
      if (colIdOfRow1 < pColInfo->info.colId) {
C
Cary Xu 已提交
1614
        j++;
1615
        k++;
C
Cary Xu 已提交
1616 1617
        continue;
      }
1618 1619 1620 1621
      row = row1;
      pSchema = pSchema1;
      isChosenRowDataRow = isRow1DataRow;
      chosen_itr = j;
dengyihao's avatar
dengyihao 已提交
1622 1623
    } else if (colIdOfRow1 < colIdOfRow2) {
      if (colIdOfRow1 < pColInfo->info.colId) {
1624 1625
        j++;
        continue;
C
Cary Xu 已提交
1626
      }
1627 1628 1629 1630 1631
      row = row1;
      pSchema = pSchema1;
      isChosenRowDataRow = isRow1DataRow;
      chosen_itr = j;
    } else {
dengyihao's avatar
dengyihao 已提交
1632
      if (colIdOfRow2 < pColInfo->info.colId) {
1633 1634 1635 1636 1637 1638 1639 1640
        k++;
        continue;
      }
      row = row2;
      pSchema = pSchema2;
      chosen_itr = k;
      isChosenRowDataRow = isRow2DataRow;
    }
dengyihao's avatar
dengyihao 已提交
1641
    if (isChosenRowDataRow) {
1642 1643
      colId = pSchema->columns[chosen_itr].colId;
      offset = pSchema->columns[chosen_itr].offset;
C
Cary Xu 已提交
1644 1645
      // TODO: use STSRowIter
      tdSTpRowGetVal(row, colId, pSchema->columns[chosen_itr].type, pSchema->flen, offset, chosen_itr - 1, &sVal);
1646
    } else {
C
Cary Xu 已提交
1647 1648 1649 1650 1651 1652 1653 1654 1655 1656
      // TODO: use STSRowIter
      if (chosen_itr == 0) {
        colId = PRIMARYKEY_TIMESTAMP_COL_ID;
        tdSKvRowGetVal(row, PRIMARYKEY_TIMESTAMP_COL_ID, -1, -1, &sVal);
      } else {
        SKvRowIdx* pColIdx = tdKvRowColIdxAt(row, chosen_itr - 1);
        colId = pColIdx->colId;
        offset = pColIdx->offset;
        tdSKvRowGetVal(row, colId, offset, chosen_itr - 1, &sVal);
      }
1657 1658 1659
    }

    if (colId == pColInfo->info.colId) {
H
Haojun Liao 已提交
1660
      if (tdValTypeIsNorm(sVal.valType)) {
H
Haojun Liao 已提交
1661
        colDataAppend(pColInfo, numOfRows, sVal.val, false);
H
Haojun Liao 已提交
1662
      } else if (forceSetNull) {
H
Haojun Liao 已提交
1663
        colDataAppend(pColInfo, numOfRows, NULL, true);
1664
      }
H
Haojun Liao 已提交
1665

1666
      i++;
C
Cary Xu 已提交
1667

dengyihao's avatar
dengyihao 已提交
1668
      if (row == row1) {
C
Cary Xu 已提交
1669
        j++;
1670 1671 1672 1673
      } else {
        k++;
      }
    } else {
dengyihao's avatar
dengyihao 已提交
1674
      if (forceSetNull) {
H
Haojun Liao 已提交
1675
        colDataAppend(pColInfo, numOfRows, NULL, true);
C
Cary Xu 已提交
1676
      }
1677
      i++;
1678
    }
1679
  }
1680

dengyihao's avatar
dengyihao 已提交
1681 1682
  if (forceSetNull) {
    while (i < numOfCols) {  // the remain columns are all null data
1683
      SColumnInfoData* pColInfo = taosArrayGet(pTsdbReadHandle->pColumns, i);
H
Haojun Liao 已提交
1684
      colDataAppend(pColInfo, numOfRows, NULL, true);
1685
      i++;
1686 1687
    }
  }
H
Haojun Liao 已提交
1688
#endif
1689
}
1690

1691 1692
static void moveDataToFront(STsdbReadHandle* pTsdbReadHandle, int32_t numOfRows, int32_t numOfCols) {
  if (numOfRows == 0 || ASCENDING_TRAVERSE(pTsdbReadHandle->order)) {
1693 1694 1695 1696
    return;
  }

  // if the buffer is not full in case of descending order query, move the data in the front of the buffer
1697 1698
  if (numOfRows < pTsdbReadHandle->outputCapacity) {
    int32_t emptySize = pTsdbReadHandle->outputCapacity - numOfRows;
dengyihao's avatar
dengyihao 已提交
1699
    for (int32_t i = 0; i < numOfCols; ++i) {
1700
      SColumnInfoData* pColInfo = taosArrayGet(pTsdbReadHandle->pColumns, i);
dengyihao's avatar
dengyihao 已提交
1701 1702
      memmove((char*)pColInfo->pData, (char*)pColInfo->pData + emptySize * pColInfo->info.bytes,
              numOfRows * pColInfo->info.bytes);
1703 1704 1705 1706
    }
  }
}

dengyihao's avatar
dengyihao 已提交
1707 1708
static void getQualifiedRowsPos(STsdbReadHandle* pTsdbReadHandle, int32_t startPos, int32_t endPos,
                                int32_t numOfExisted, int32_t* start, int32_t* end) {
1709 1710
  *start = -1;

1711
  if (ASCENDING_TRAVERSE(pTsdbReadHandle->order)) {
1712
    int32_t remain = endPos - startPos + 1;
1713 1714
    if (remain + numOfExisted > pTsdbReadHandle->outputCapacity) {
      *end = (pTsdbReadHandle->outputCapacity - numOfExisted) + startPos - 1;
H
Haojun Liao 已提交
1715 1716
    } else {
      *end = endPos;
1717 1718 1719 1720 1721
    }

    *start = startPos;
  } else {
    int32_t remain = (startPos - endPos) + 1;
1722 1723
    if (remain + numOfExisted > pTsdbReadHandle->outputCapacity) {
      *end = startPos + 1 - (pTsdbReadHandle->outputCapacity - numOfExisted);
H
Haojun Liao 已提交
1724 1725
    } else {
      *end = endPos;
1726 1727 1728 1729 1730 1731 1732
    }

    *start = *end;
    *end = startPos;
  }
}

dengyihao's avatar
dengyihao 已提交
1733 1734
static void updateInfoAfterMerge(STsdbReadHandle* pTsdbReadHandle, STableCheckInfo* pCheckInfo, int32_t numOfRows,
                                 int32_t endPos) {
1735
  SQueryFilePos* cur = &pTsdbReadHandle->cur;
1736 1737

  pCheckInfo->lastKey = cur->lastKey;
1738
  pTsdbReadHandle->realNumOfRows = numOfRows;
1739 1740 1741 1742
  cur->rows = numOfRows;
  cur->pos = endPos;
}

1743 1744
static void doCheckGeneratedBlockRange(STsdbReadHandle* pTsdbReadHandle) {
  SQueryFilePos* cur = &pTsdbReadHandle->cur;
H
Haojun Liao 已提交
1745 1746

  if (cur->rows > 0) {
1747 1748
    if (ASCENDING_TRAVERSE(pTsdbReadHandle->order)) {
      assert(cur->win.skey >= pTsdbReadHandle->window.skey && cur->win.ekey <= pTsdbReadHandle->window.ekey);
H
Haojun Liao 已提交
1749
    } else {
1750
      assert(cur->win.skey >= pTsdbReadHandle->window.ekey && cur->win.ekey <= pTsdbReadHandle->window.skey);
H
Haojun Liao 已提交
1751 1752
    }

1753
    SColumnInfoData* pColInfoData = taosArrayGet(pTsdbReadHandle->pColumns, 0);
dengyihao's avatar
dengyihao 已提交
1754 1755
    assert(cur->win.skey == ((TSKEY*)pColInfoData->pData)[0] &&
           cur->win.ekey == ((TSKEY*)pColInfoData->pData)[cur->rows - 1]);
H
Haojun Liao 已提交
1756
  } else {
1757
    cur->win = pTsdbReadHandle->window;
H
Haojun Liao 已提交
1758

dengyihao's avatar
dengyihao 已提交
1759
    int32_t step = ASCENDING_TRAVERSE(pTsdbReadHandle->order) ? 1 : -1;
1760
    cur->lastKey = pTsdbReadHandle->window.ekey + step;
H
Haojun Liao 已提交
1761 1762 1763
  }
}

dengyihao's avatar
dengyihao 已提交
1764 1765
static void copyAllRemainRowsFromFileBlock(STsdbReadHandle* pTsdbReadHandle, STableCheckInfo* pCheckInfo,
                                           SDataBlockInfo* pBlockInfo, int32_t endPos) {
1766
  SQueryFilePos* cur = &pTsdbReadHandle->cur;
H
Haojun Liao 已提交
1767

1768
  SDataCols* pCols = pTsdbReadHandle->rhelper.pDCols[0];
dengyihao's avatar
dengyihao 已提交
1769
  TSKEY*     tsArray = pCols->cols[0].pData;
H
Haojun Liao 已提交
1770

dengyihao's avatar
dengyihao 已提交
1771
  int32_t step = ASCENDING_TRAVERSE(pTsdbReadHandle->order) ? 1 : -1;
1772
  int32_t numOfCols = (int32_t)(QH_GET_NUM_OF_COLS(pTsdbReadHandle));
H
Haojun Liao 已提交
1773 1774 1775 1776 1777 1778

  int32_t pos = cur->pos;

  int32_t start = cur->pos;
  int32_t end = endPos;

1779
  if (!ASCENDING_TRAVERSE(pTsdbReadHandle->order)) {
wafwerar's avatar
wafwerar 已提交
1780
    TSWAP(start, end);
H
Haojun Liao 已提交
1781 1782
  }

1783 1784
  assert(pTsdbReadHandle->outputCapacity >= (end - start + 1));
  int32_t numOfRows = doCopyRowsFromFileBlock(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, 0, start, end);
H
Haojun Liao 已提交
1785 1786

  // the time window should always be ascending order: skey <= ekey
dengyihao's avatar
dengyihao 已提交
1787
  cur->win = (STimeWindow){.skey = tsArray[start], .ekey = tsArray[end]};
H
Haojun Liao 已提交
1788
  cur->mixBlock = (numOfRows != pBlockInfo->rows);
H
Haojun Liao 已提交
1789
  cur->lastKey = tsArray[endPos] + step;
H
Haojun Liao 已提交
1790
  cur->blockCompleted = true;
H
Haojun Liao 已提交
1791 1792

  // if the buffer is not full in case of descending order query, move the data in the front of the buffer
1793
  moveDataToFront(pTsdbReadHandle, numOfRows, numOfCols);
H
Haojun Liao 已提交
1794 1795 1796

  // The value of pos may be -1 or pBlockInfo->rows, and it is invalid in both cases.
  pos = endPos + step;
1797 1798
  updateInfoAfterMerge(pTsdbReadHandle, pCheckInfo, numOfRows, pos);
  doCheckGeneratedBlockRange(pTsdbReadHandle);
H
Haojun Liao 已提交
1799

dengyihao's avatar
dengyihao 已提交
1800 1801 1802
  tsdbDebug("%p uid:%" PRIu64 ", data block created, mixblock:%d, brange:%" PRIu64 "-%" PRIu64 " rows:%d, %s",
            pTsdbReadHandle, pCheckInfo->tableId, cur->mixBlock, cur->win.skey, cur->win.ekey, cur->rows,
            pTsdbReadHandle->idStr);
H
Haojun Liao 已提交
1803 1804
}

1805
int32_t getEndPosInDataBlock(STsdbReadHandle* pTsdbReadHandle, SDataBlockInfo* pBlockInfo) {
H
Haojun Liao 已提交
1806 1807
  // NOTE: reverse the order to find the end position in data block
  int32_t endPos = -1;
dengyihao's avatar
dengyihao 已提交
1808
  int32_t order = ASCENDING_TRAVERSE(pTsdbReadHandle->order) ? TSDB_ORDER_DESC : TSDB_ORDER_ASC;
H
Haojun Liao 已提交
1809

1810
  SQueryFilePos* cur = &pTsdbReadHandle->cur;
dengyihao's avatar
dengyihao 已提交
1811
  SDataCols*     pCols = pTsdbReadHandle->rhelper.pDCols[0];
H
Haojun Liao 已提交
1812

1813
  if (ASCENDING_TRAVERSE(pTsdbReadHandle->order) && pTsdbReadHandle->window.ekey >= pBlockInfo->window.ekey) {
H
Haojun Liao 已提交
1814 1815
    endPos = pBlockInfo->rows - 1;
    cur->mixBlock = (cur->pos != 0);
1816
  } else if (!ASCENDING_TRAVERSE(pTsdbReadHandle->order) && pTsdbReadHandle->window.ekey <= pBlockInfo->window.skey) {
H
Haojun Liao 已提交
1817 1818 1819 1820
    endPos = 0;
    cur->mixBlock = (cur->pos != pBlockInfo->rows - 1);
  } else {
    assert(pCols->numOfRows > 0);
1821
    endPos = doBinarySearchKey(pCols->cols[0].pData, pCols->numOfRows, pTsdbReadHandle->window.ekey, order);
H
Haojun Liao 已提交
1822 1823 1824 1825 1826 1827
    cur->mixBlock = true;
  }

  return endPos;
}

H
[td-32]  
hjxilinx 已提交
1828 1829
// only return the qualified data to client in terms of query time window, data rows in the same block but do not
// be included in the query time window will be discarded
1830 1831
static void doMergeTwoLevelData(STsdbReadHandle* pTsdbReadHandle, STableCheckInfo* pCheckInfo, SBlock* pBlock) {
  SQueryFilePos* cur = &pTsdbReadHandle->cur;
1832
  SDataBlockInfo blockInfo = GET_FILE_DATA_BLOCK_INFO(pCheckInfo, pBlock);
H
Hongze Cheng 已提交
1833
  STsdbCfg*      pCfg = REPO_CFG(pTsdbReadHandle->pTsdb);
H
Haojun Liao 已提交
1834

1835
  initTableMemIterator(pTsdbReadHandle, pCheckInfo);
1836

1837 1838
  SDataCols* pCols = pTsdbReadHandle->rhelper.pDCols[0];
  assert(pCols->cols[0].type == TSDB_DATA_TYPE_TIMESTAMP && pCols->cols[0].colId == PRIMARYKEY_TIMESTAMP_COL_ID &&
dengyihao's avatar
dengyihao 已提交
1839
         cur->pos >= 0 && cur->pos < pBlock->numOfRows);
H
Haojun Liao 已提交
1840

1841
  TSKEY* tsArray = pCols->cols[0].pData;
dengyihao's avatar
dengyihao 已提交
1842 1843
  assert(pCols->numOfRows == pBlock->numOfRows && tsArray[0] == pBlock->keyFirst &&
         tsArray[pBlock->numOfRows - 1] == pBlock->keyLast);
1844 1845

  // for search the endPos, so the order needs to reverse
dengyihao's avatar
dengyihao 已提交
1846
  int32_t order = (pTsdbReadHandle->order == TSDB_ORDER_ASC) ? TSDB_ORDER_DESC : TSDB_ORDER_ASC;
1847

dengyihao's avatar
dengyihao 已提交
1848
  int32_t step = ASCENDING_TRAVERSE(pTsdbReadHandle->order) ? 1 : -1;
1849
  int32_t numOfCols = (int32_t)(QH_GET_NUM_OF_COLS(pTsdbReadHandle));
1850

H
Haojun Liao 已提交
1851
  STable* pTable = NULL;
1852
  int32_t endPos = getEndPosInDataBlock(pTsdbReadHandle, &blockInfo);
H
Haojun Liao 已提交
1853

H
Hongze Cheng 已提交
1854 1855
  tsdbDebug("%p uid:%" PRIu64 " start merge data block, file block range:%" PRIu64 "-%" PRIu64
            " rows:%d, start:%d, end:%d, %s",
dengyihao's avatar
dengyihao 已提交
1856 1857
            pTsdbReadHandle, pCheckInfo->tableId, blockInfo.window.skey, blockInfo.window.ekey, blockInfo.rows,
            cur->pos, endPos, pTsdbReadHandle->idStr);
H
Haojun Liao 已提交
1858

1859 1860
  // compared with the data from in-memory buffer, to generate the correct timestamp array list
  int32_t numOfRows = 0;
H
Haojun Liao 已提交
1861

dengyihao's avatar
dengyihao 已提交
1862 1863
  int16_t   rv1 = -1;
  int16_t   rv2 = -1;
1864 1865
  STSchema* pSchema1 = NULL;
  STSchema* pSchema2 = NULL;
D
fix bug  
dapan1121 已提交
1866

H
Haojun Liao 已提交
1867 1868
  int32_t pos = cur->pos;
  cur->win = TSWINDOW_INITIALIZER;
1869

1870 1871
  // no data in buffer, load data from file directly
  if (pCheckInfo->iiter == NULL && pCheckInfo->iter == NULL) {
1872
    copyAllRemainRowsFromFileBlock(pTsdbReadHandle, pCheckInfo, &blockInfo, endPos);
1873
    return;
1874
  } else if (pCheckInfo->iter != NULL || pCheckInfo->iiter != NULL) {
1875 1876
    SSkipListNode* node = NULL;
    do {
H
Haojun Liao 已提交
1877 1878
      STSRow* row2 = NULL;
      STSRow* row1 = getSRowInTableMem(pCheckInfo, pTsdbReadHandle->order, pCfg->update, &row2);
1879
      if (row1 == NULL) {
H
[td-32]  
hjxilinx 已提交
1880
        break;
1881
      }
1882

H
Haojun Liao 已提交
1883
      TSKEY key = TD_ROW_KEY(row1);
1884 1885
      if ((key > pTsdbReadHandle->window.ekey && ASCENDING_TRAVERSE(pTsdbReadHandle->order)) ||
          (key < pTsdbReadHandle->window.ekey && !ASCENDING_TRAVERSE(pTsdbReadHandle->order))) {
1886 1887 1888
        break;
      }

dengyihao's avatar
dengyihao 已提交
1889 1890 1891 1892
      if (((pos > endPos || tsArray[pos] > pTsdbReadHandle->window.ekey) &&
           ASCENDING_TRAVERSE(pTsdbReadHandle->order)) ||
          ((pos < endPos || tsArray[pos] < pTsdbReadHandle->window.ekey) &&
           !ASCENDING_TRAVERSE(pTsdbReadHandle->order))) {
1893 1894 1895
        break;
      }

1896 1897
      if ((key < tsArray[pos] && ASCENDING_TRAVERSE(pTsdbReadHandle->order)) ||
          (key > tsArray[pos] && !ASCENDING_TRAVERSE(pTsdbReadHandle->order))) {
H
Haojun Liao 已提交
1898
        if (rv1 != TD_ROW_SVER(row1)) {
dengyihao's avatar
dengyihao 已提交
1899
          //          pSchema1 = tsdbGetTableSchemaByVersion(pTable, memRowVersion(row1));
H
Haojun Liao 已提交
1900
          rv1 = TD_ROW_SVER(row1);
C
Cary Xu 已提交
1901
        }
dengyihao's avatar
dengyihao 已提交
1902 1903
        if (row2 && rv2 != TD_ROW_SVER(row2)) {
          //          pSchema2 = tsdbGetTableSchemaByVersion(pTable, memRowVersion(row2));
H
Haojun Liao 已提交
1904
          rv2 = TD_ROW_SVER(row2);
1905
        }
dengyihao's avatar
dengyihao 已提交
1906 1907 1908

        mergeTwoRowFromMem(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, numOfRows, row1, row2, numOfCols,
                           pCheckInfo->tableId, pSchema1, pSchema2, true);
1909 1910 1911 1912
        numOfRows += 1;
        if (cur->win.skey == TSKEY_INITIAL_VAL) {
          cur->win.skey = key;
        }
1913

1914
        cur->win.ekey = key;
dengyihao's avatar
dengyihao 已提交
1915
        cur->lastKey = key + step;
1916 1917
        cur->mixBlock = true;

1918
        moveToNextRowInMem(pCheckInfo);
1919
      } else if (key == tsArray[pos]) {  // data in buffer has the same timestamp of data in file block, ignore it
H
TD-1439  
Hongze Cheng 已提交
1920
        if (pCfg->update) {
dengyihao's avatar
dengyihao 已提交
1921
          if (pCfg->update == TD_ROW_PARTIAL_UPDATE) {
1922
            doCopyRowsFromFileBlock(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, numOfRows, pos, pos);
D
fix bug  
dapan1121 已提交
1923
          }
H
Haojun Liao 已提交
1924
          if (rv1 != TD_ROW_SVER(row1)) {
dengyihao's avatar
dengyihao 已提交
1925
            //            pSchema1 = tsdbGetTableSchemaByVersion(pTable, memRowVersion(row1));
H
Haojun Liao 已提交
1926
            rv1 = TD_ROW_SVER(row1);
1927
          }
dengyihao's avatar
dengyihao 已提交
1928 1929
          if (row2 && rv2 != TD_ROW_SVER(row2)) {
            //            pSchema2 = tsdbGetTableSchemaByVersion(pTable, memRowVersion(row2));
H
Haojun Liao 已提交
1930
            rv2 = TD_ROW_SVER(row2);
1931
          }
dengyihao's avatar
dengyihao 已提交
1932

1933
          bool forceSetNull = pCfg->update != TD_ROW_PARTIAL_UPDATE;
dengyihao's avatar
dengyihao 已提交
1934 1935
          mergeTwoRowFromMem(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, numOfRows, row1, row2, numOfCols,
                             pCheckInfo->tableId, pSchema1, pSchema2, forceSetNull);
H
TD-1439  
Hongze Cheng 已提交
1936 1937 1938 1939 1940 1941 1942 1943 1944 1945 1946 1947 1948 1949
          numOfRows += 1;
          if (cur->win.skey == TSKEY_INITIAL_VAL) {
            cur->win.skey = key;
          }

          cur->win.ekey = key;
          cur->lastKey = key + step;
          cur->mixBlock = true;

          moveToNextRowInMem(pCheckInfo);
          pos += step;
        } else {
          moveToNextRowInMem(pCheckInfo);
        }
1950
      } else if ((key > tsArray[pos] && ASCENDING_TRAVERSE(pTsdbReadHandle->order)) ||
dengyihao's avatar
dengyihao 已提交
1951
                 (key < tsArray[pos] && !ASCENDING_TRAVERSE(pTsdbReadHandle->order))) {
1952 1953 1954
        if (cur->win.skey == TSKEY_INITIAL_VAL) {
          cur->win.skey = tsArray[pos];
        }
1955

1956
        int32_t end = doBinarySearchKey(pCols->cols[0].pData, pCols->numOfRows, key, order);
1957 1958
        assert(end != -1);

dengyihao's avatar
dengyihao 已提交
1959
        if (tsArray[end] == key) {  // the value of key in cache equals to the end timestamp value, ignore it
1960
          if (pCfg->update == TD_ROW_DISCARD_UPDATE) {
H
Hongze Cheng 已提交
1961 1962 1963 1964
            moveToNextRowInMem(pCheckInfo);
          } else {
            end -= step;
          }
H
Haojun Liao 已提交
1965
        }
1966

1967
        int32_t qstart = 0, qend = 0;
1968
        getQualifiedRowsPos(pTsdbReadHandle, pos, end, numOfRows, &qstart, &qend);
1969

1970
        numOfRows = doCopyRowsFromFileBlock(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, numOfRows, qstart, qend);
1971 1972
        pos += (qend - qstart + 1) * step;

dengyihao's avatar
dengyihao 已提交
1973 1974
        cur->win.ekey = ASCENDING_TRAVERSE(pTsdbReadHandle->order) ? tsArray[qend] : tsArray[qstart];
        cur->lastKey = cur->win.ekey + step;
1975
      }
1976
    } while (numOfRows < pTsdbReadHandle->outputCapacity);
H
Haojun Liao 已提交
1977

1978
    if (numOfRows < pTsdbReadHandle->outputCapacity) {
H
Haojun Liao 已提交
1979 1980 1981 1982
      /**
       * if cache is empty, load remain file block data. In contrast, if there are remain data in cache, do NOT
       * copy them all to result buffer, since it may be overlapped with file data block.
       */
1983
      if (node == NULL ||
H
Haojun Liao 已提交
1984
          ((TD_ROW_KEY((STSRow*)SL_GET_NODE_DATA(node)) > pTsdbReadHandle->window.ekey) &&
1985
           ASCENDING_TRAVERSE(pTsdbReadHandle->order)) ||
H
Haojun Liao 已提交
1986
          ((TD_ROW_KEY((STSRow*)SL_GET_NODE_DATA(node)) < pTsdbReadHandle->window.ekey) &&
1987
           !ASCENDING_TRAVERSE(pTsdbReadHandle->order))) {
1988 1989 1990 1991 1992
        // no data in cache or data in cache is greater than the ekey of time window, load data from file block
        if (cur->win.skey == TSKEY_INITIAL_VAL) {
          cur->win.skey = tsArray[pos];
        }

1993
        int32_t start = -1, end = -1;
1994
        getQualifiedRowsPos(pTsdbReadHandle, pos, endPos, numOfRows, &start, &end);
1995

1996
        numOfRows = doCopyRowsFromFileBlock(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, numOfRows, start, end);
1997
        pos += (end - start + 1) * step;
1998

dengyihao's avatar
dengyihao 已提交
1999 2000
        cur->win.ekey = ASCENDING_TRAVERSE(pTsdbReadHandle->order) ? tsArray[end] : tsArray[start];
        cur->lastKey = cur->win.ekey + step;
H
Haojun Liao 已提交
2001
        cur->mixBlock = true;
2002
      }
2003 2004
    }
  }
H
Haojun Liao 已提交
2005 2006

  cur->blockCompleted =
2007 2008
      (((pos > endPos || cur->lastKey > pTsdbReadHandle->window.ekey) && ASCENDING_TRAVERSE(pTsdbReadHandle->order)) ||
       ((pos < endPos || cur->lastKey < pTsdbReadHandle->window.ekey) && !ASCENDING_TRAVERSE(pTsdbReadHandle->order)));
2009

2010
  if (!ASCENDING_TRAVERSE(pTsdbReadHandle->order)) {
wafwerar's avatar
wafwerar 已提交
2011
    TSWAP(cur->win.skey, cur->win.ekey);
2012
  }
2013

2014 2015 2016
  moveDataToFront(pTsdbReadHandle, numOfRows, numOfCols);
  updateInfoAfterMerge(pTsdbReadHandle, pCheckInfo, numOfRows, pos);
  doCheckGeneratedBlockRange(pTsdbReadHandle);
H
Haojun Liao 已提交
2017

dengyihao's avatar
dengyihao 已提交
2018 2019 2020
  tsdbDebug("%p uid:%" PRIu64 ", data block created, mixblock:%d, brange:%" PRIu64 "-%" PRIu64 " rows:%d, %s",
            pTsdbReadHandle, pCheckInfo->tableId, cur->mixBlock, cur->win.skey, cur->win.ekey, cur->rows,
            pTsdbReadHandle->idStr);
2021 2022
}

2023
int32_t binarySearchForKey(char* pValue, int num, TSKEY key, int order) {
H
[td-32]  
hjxilinx 已提交
2024
  int    firstPos, lastPos, midPos = -1;
H
Haojun Liao 已提交
2025
  int    numOfRows;
2026 2027
  TSKEY* keyList;

H
[td-32]  
hjxilinx 已提交
2028
  if (num <= 0) return -1;
2029 2030

  keyList = (TSKEY*)pValue;
H
[td-32]  
hjxilinx 已提交
2031 2032
  firstPos = 0;
  lastPos = num - 1;
2033

2034
  if (order == TSDB_ORDER_DESC) {
H
[td-32]  
hjxilinx 已提交
2035 2036 2037 2038 2039
    // find the first position which is smaller than the key
    while (1) {
      if (key >= keyList[lastPos]) return lastPos;
      if (key == keyList[firstPos]) return firstPos;
      if (key < keyList[firstPos]) return firstPos - 1;
2040

H
Haojun Liao 已提交
2041 2042
      numOfRows = lastPos - firstPos + 1;
      midPos = (numOfRows >> 1) + firstPos;
2043

H
[td-32]  
hjxilinx 已提交
2044 2045 2046 2047 2048 2049 2050 2051
      if (key < keyList[midPos]) {
        lastPos = midPos - 1;
      } else if (key > keyList[midPos]) {
        firstPos = midPos + 1;
      } else {
        break;
      }
    }
2052

H
[td-32]  
hjxilinx 已提交
2053 2054 2055 2056 2057
  } else {
    // find the first position which is bigger than the key
    while (1) {
      if (key <= keyList[firstPos]) return firstPos;
      if (key == keyList[lastPos]) return lastPos;
2058

H
[td-32]  
hjxilinx 已提交
2059 2060 2061 2062 2063 2064 2065
      if (key > keyList[lastPos]) {
        lastPos = lastPos + 1;
        if (lastPos >= num)
          return -1;
        else
          return lastPos;
      }
2066

H
Haojun Liao 已提交
2067 2068
      numOfRows = lastPos - firstPos + 1;
      midPos = (numOfRows >> 1) + firstPos;
2069

H
[td-32]  
hjxilinx 已提交
2070 2071 2072 2073 2074 2075 2076 2077 2078
      if (key < keyList[midPos]) {
        lastPos = midPos - 1;
      } else if (key > keyList[midPos]) {
        firstPos = midPos + 1;
      } else {
        break;
      }
    }
  }
2079

H
[td-32]  
hjxilinx 已提交
2080 2081 2082
  return midPos;
}

2083
static void cleanBlockOrderSupporter(SBlockOrderSupporter* pSupporter, int32_t numOfTables) {
wafwerar's avatar
wafwerar 已提交
2084 2085
  taosMemoryFreeClear(pSupporter->numOfBlocksPerTable);
  taosMemoryFreeClear(pSupporter->blockIndexArray);
2086 2087

  for (int32_t i = 0; i < numOfTables; ++i) {
H
Haojun Liao 已提交
2088
    STableBlockInfo* pBlockInfo = pSupporter->pDataBlockInfo[i];
wafwerar's avatar
wafwerar 已提交
2089
    taosMemoryFreeClear(pBlockInfo);
2090 2091
  }

wafwerar's avatar
wafwerar 已提交
2092
  taosMemoryFreeClear(pSupporter->pDataBlockInfo);
2093 2094 2095 2096 2097 2098 2099 2100 2101 2102 2103
}

static int32_t dataBlockOrderCompar(const void* pLeft, const void* pRight, void* param) {
  int32_t leftTableIndex = *(int32_t*)pLeft;
  int32_t rightTableIndex = *(int32_t*)pRight;

  SBlockOrderSupporter* pSupporter = (SBlockOrderSupporter*)param;

  int32_t leftTableBlockIndex = pSupporter->blockIndexArray[leftTableIndex];
  int32_t rightTableBlockIndex = pSupporter->blockIndexArray[rightTableIndex];

2104
  if (leftTableBlockIndex > pSupporter->numOfBlocksPerTable[leftTableIndex]) {
2105 2106
    /* left block is empty */
    return 1;
2107
  } else if (rightTableBlockIndex > pSupporter->numOfBlocksPerTable[rightTableIndex]) {
2108 2109 2110 2111 2112 2113 2114
    /* right block is empty */
    return -1;
  }

  STableBlockInfo* pLeftBlockInfoEx = &pSupporter->pDataBlockInfo[leftTableIndex][leftTableBlockIndex];
  STableBlockInfo* pRightBlockInfoEx = &pSupporter->pDataBlockInfo[rightTableIndex][rightTableBlockIndex];

H
Haojun Liao 已提交
2115
  //    assert(pLeftBlockInfoEx->compBlock->offset != pRightBlockInfoEx->compBlock->offset);
dengyihao's avatar
dengyihao 已提交
2116
#if 0  // TODO: temporarily comment off requested by Dr. Liao
H
Haojun Liao 已提交
2117 2118
  if (pLeftBlockInfoEx->compBlock->offset == pRightBlockInfoEx->compBlock->offset &&
      pLeftBlockInfoEx->compBlock->last == pRightBlockInfoEx->compBlock->last) {
B
Bomin Zhang 已提交
2119
    tsdbError("error in header file, two block with same offset:%" PRId64, (int64_t)pLeftBlockInfoEx->compBlock->offset);
2120
  }
H
Haojun Liao 已提交
2121
#endif
2122

H
Haojun Liao 已提交
2123
  return pLeftBlockInfoEx->compBlock->offset > pRightBlockInfoEx->compBlock->offset ? 1 : -1;
2124 2125
}

2126
static int32_t createDataBlocksInfo(STsdbReadHandle* pTsdbReadHandle, int32_t numOfBlocks, int32_t* numOfAllocBlocks) {
H
Haojun Liao 已提交
2127 2128
  size_t size = sizeof(STableBlockInfo) * numOfBlocks;

2129 2130
  if (pTsdbReadHandle->allocSize < size) {
    pTsdbReadHandle->allocSize = (int32_t)size;
wafwerar's avatar
wafwerar 已提交
2131
    char* tmp = taosMemoryRealloc(pTsdbReadHandle->pDataBlockInfo, pTsdbReadHandle->allocSize);
H
Haojun Liao 已提交
2132 2133 2134 2135
    if (tmp == NULL) {
      return TSDB_CODE_TDB_OUT_OF_MEMORY;
    }

dengyihao's avatar
dengyihao 已提交
2136
    pTsdbReadHandle->pDataBlockInfo = (STableBlockInfo*)tmp;
2137 2138
  }

2139
  memset(pTsdbReadHandle->pDataBlockInfo, 0, size);
2140 2141
  *numOfAllocBlocks = numOfBlocks;

H
Haojun Liao 已提交
2142
  // access data blocks according to the offset of each block in asc/desc order.
2143
  int32_t numOfTables = (int32_t)taosArrayGetSize(pTsdbReadHandle->pTableCheckInfo);
2144

2145 2146
  SBlockOrderSupporter sup = {0};
  sup.numOfTables = numOfTables;
wafwerar's avatar
wafwerar 已提交
2147 2148 2149
  sup.numOfBlocksPerTable = taosMemoryCalloc(1, sizeof(int32_t) * numOfTables);
  sup.blockIndexArray = taosMemoryCalloc(1, sizeof(int32_t) * numOfTables);
  sup.pDataBlockInfo = taosMemoryCalloc(1, POINTER_BYTES * numOfTables);
2150

2151
  if (sup.numOfBlocksPerTable == NULL || sup.blockIndexArray == NULL || sup.pDataBlockInfo == NULL) {
2152
    cleanBlockOrderSupporter(&sup, 0);
2153
    return TSDB_CODE_TDB_OUT_OF_MEMORY;
2154
  }
H
Haojun Liao 已提交
2155

2156
  int32_t cnt = 0;
2157
  int32_t numOfQualTables = 0;
H
Haojun Liao 已提交
2158

2159
  for (int32_t j = 0; j < numOfTables; ++j) {
2160
    STableCheckInfo* pTableCheck = (STableCheckInfo*)taosArrayGet(pTsdbReadHandle->pTableCheckInfo, j);
2161 2162 2163
    if (pTableCheck->numOfBlocks <= 0) {
      continue;
    }
H
Haojun Liao 已提交
2164

H
refact  
Hongze Cheng 已提交
2165
    SBlock* pBlock = pTableCheck->pCompInfo->blocks;
2166
    sup.numOfBlocksPerTable[numOfQualTables] = pTableCheck->numOfBlocks;
2167

wafwerar's avatar
wafwerar 已提交
2168
    char* buf = taosMemoryMalloc(sizeof(STableBlockInfo) * pTableCheck->numOfBlocks);
2169
    if (buf == NULL) {
2170
      cleanBlockOrderSupporter(&sup, numOfQualTables);
2171
      return TSDB_CODE_TDB_OUT_OF_MEMORY;
2172 2173
    }

2174
    sup.pDataBlockInfo[numOfQualTables] = (STableBlockInfo*)buf;
2175 2176

    for (int32_t k = 0; k < pTableCheck->numOfBlocks; ++k) {
H
Haojun Liao 已提交
2177
      STableBlockInfo* pBlockInfo = &sup.pDataBlockInfo[numOfQualTables][k];
2178

H
Haojun Liao 已提交
2179 2180
      pBlockInfo->compBlock = &pBlock[k];
      pBlockInfo->pTableCheckInfo = pTableCheck;
2181 2182 2183
      cnt++;
    }

2184
    numOfQualTables++;
2185 2186
  }

H
Haojun Liao 已提交
2187
  assert(numOfBlocks == cnt);
2188

H
Haojun Liao 已提交
2189 2190
  // since there is only one table qualified, blocks are not sorted
  if (numOfQualTables == 1) {
2191
    memcpy(pTsdbReadHandle->pDataBlockInfo, sup.pDataBlockInfo[0], sizeof(STableBlockInfo) * numOfBlocks);
H
Haojun Liao 已提交
2192
    cleanBlockOrderSupporter(&sup, numOfQualTables);
2193

H
Haojun Liao 已提交
2194
    tsdbDebug("%p create data blocks info struct completed for 1 table, %d blocks not sorted %s", pTsdbReadHandle, cnt,
dengyihao's avatar
dengyihao 已提交
2195
              pTsdbReadHandle->idStr);
H
Haojun Liao 已提交
2196 2197
    return TSDB_CODE_SUCCESS;
  }
2198

H
Haojun Liao 已提交
2199
  tsdbDebug("%p create data blocks info struct completed, %d blocks in %d tables %s", pTsdbReadHandle, cnt,
dengyihao's avatar
dengyihao 已提交
2200
            numOfQualTables, pTsdbReadHandle->idStr);
2201

2202
  assert(cnt <= numOfBlocks && numOfQualTables <= numOfTables);  // the pTableQueryInfo[j]->numOfBlocks may be 0
2203
  sup.numOfTables = numOfQualTables;
2204

2205
  SMultiwayMergeTreeInfo* pTree = NULL;
dengyihao's avatar
dengyihao 已提交
2206
  uint8_t                 ret = tMergeTreeCreate(&pTree, sup.numOfTables, &sup, dataBlockOrderCompar);
2207 2208
  if (ret != TSDB_CODE_SUCCESS) {
    cleanBlockOrderSupporter(&sup, numOfTables);
2209
    return TSDB_CODE_TDB_OUT_OF_MEMORY;
2210 2211 2212 2213 2214
  }

  int32_t numOfTotal = 0;

  while (numOfTotal < cnt) {
2215
    int32_t pos = tMergeTreeGetChosenIndex(pTree);
2216 2217
    int32_t index = sup.blockIndexArray[pos]++;

H
Haojun Liao 已提交
2218
    STableBlockInfo* pBlocksInfo = sup.pDataBlockInfo[pos];
2219
    pTsdbReadHandle->pDataBlockInfo[numOfTotal++] = pBlocksInfo[index];
2220 2221

    // set data block index overflow, in order to disable the offset comparator
2222 2223
    if (sup.blockIndexArray[pos] >= sup.numOfBlocksPerTable[pos]) {
      sup.blockIndexArray[pos] = sup.numOfBlocksPerTable[pos] + 1;
2224
    }
2225

H
Haojun Liao 已提交
2226
    tMergeTreeAdjust(pTree, tMergeTreeGetAdjustIndex(pTree));
2227 2228 2229 2230 2231
  }

  /*
   * available when no import exists
   * for(int32_t i = 0; i < cnt - 1; ++i) {
H
Haojun Liao 已提交
2232
   *   assert((*pDataBlockInfo)[i].compBlock->offset < (*pDataBlockInfo)[i+1].compBlock->offset);
2233 2234 2235
   * }
   */

H
Haojun Liao 已提交
2236
  tsdbDebug("%p %d data blocks sort completed, %s", pTsdbReadHandle, cnt, pTsdbReadHandle->idStr);
2237
  cleanBlockOrderSupporter(&sup, numOfTables);
wafwerar's avatar
wafwerar 已提交
2238
  taosMemoryFree(pTree);
2239 2240 2241 2242

  return TSDB_CODE_SUCCESS;
}

2243
static int32_t getFirstFileDataBlock(STsdbReadHandle* pTsdbReadHandle, bool* exists);
H
Haojun Liao 已提交
2244

dengyihao's avatar
dengyihao 已提交
2245 2246
static int32_t getDataBlockRv(STsdbReadHandle* pTsdbReadHandle, STableBlockInfo* pNext, bool* exists) {
  int32_t        step = ASCENDING_TRAVERSE(pTsdbReadHandle->order) ? 1 : -1;
2247
  SQueryFilePos* cur = &pTsdbReadHandle->cur;
H
Haojun Liao 已提交
2248

dengyihao's avatar
dengyihao 已提交
2249
  while (1) {
2250
    int32_t code = loadFileDataBlock(pTsdbReadHandle, pNext->compBlock, pNext->pTableCheckInfo, exists);
H
Haojun Liao 已提交
2251 2252 2253 2254
    if (code != TSDB_CODE_SUCCESS || *exists) {
      return code;
    }

2255 2256
    if ((cur->slot == pTsdbReadHandle->numOfBlocks - 1 && ASCENDING_TRAVERSE(pTsdbReadHandle->order)) ||
        (cur->slot == 0 && !ASCENDING_TRAVERSE(pTsdbReadHandle->order))) {
H
Haojun Liao 已提交
2257
      // all data blocks in current file has been checked already, try next file if exists
2258
      return getFirstFileDataBlock(pTsdbReadHandle, exists);
H
Haojun Liao 已提交
2259 2260 2261 2262
    } else {  // next block of the same file
      cur->slot += step;
      cur->mixBlock = false;
      cur->blockCompleted = false;
2263
      pNext = &pTsdbReadHandle->pDataBlockInfo[cur->slot];
H
Haojun Liao 已提交
2264 2265 2266 2267
    }
  }
}

2268 2269 2270
static int32_t getFirstFileDataBlock(STsdbReadHandle* pTsdbReadHandle, bool* exists) {
  pTsdbReadHandle->numOfBlocks = 0;
  SQueryFilePos* cur = &pTsdbReadHandle->cur;
H
Haojun Liao 已提交
2271 2272 2273

  int32_t code = TSDB_CODE_SUCCESS;

2274
  int32_t numOfBlocks = 0;
2275
  int32_t numOfTables = (int32_t)taosArrayGetSize(pTsdbReadHandle->pTableCheckInfo);
2276

H
Hongze Cheng 已提交
2277
  STsdbCfg*   pCfg = REPO_CFG(pTsdbReadHandle->pTsdb);
2278 2279
  STimeWindow win = TSWINDOW_INITIALIZER;

H
Hongze Cheng 已提交
2280
  while (true) {
2281
    tsdbRLockFS(REPO_FS(pTsdbReadHandle->pTsdb));
H
Hongze Cheng 已提交
2282

2283 2284
    if ((pTsdbReadHandle->pFileGroup = tsdbFSIterNext(&pTsdbReadHandle->fileIter)) == NULL) {
      tsdbUnLockFS(REPO_FS(pTsdbReadHandle->pTsdb));
H
Hongze Cheng 已提交
2285 2286 2287
      break;
    }

H
refact  
Hongze Cheng 已提交
2288
    tsdbGetFidKeyRange(pCfg->days, pCfg->precision, pTsdbReadHandle->pFileGroup->fid, &win.skey, &win.ekey);
2289 2290

    // current file are not overlapped with query time window, ignore remain files
2291 2292 2293
    if ((ASCENDING_TRAVERSE(pTsdbReadHandle->order) && win.skey > pTsdbReadHandle->window.ekey) ||
        (!ASCENDING_TRAVERSE(pTsdbReadHandle->order) && win.ekey < pTsdbReadHandle->window.ekey)) {
      tsdbUnLockFS(REPO_FS(pTsdbReadHandle->pTsdb));
H
Haojun Liao 已提交
2294 2295
      tsdbDebug("%p remain files are not qualified for qrange:%" PRId64 "-%" PRId64 ", ignore, %s", pTsdbReadHandle,
                pTsdbReadHandle->window.skey, pTsdbReadHandle->window.ekey, pTsdbReadHandle->idStr);
2296 2297
      pTsdbReadHandle->pFileGroup = NULL;
      assert(pTsdbReadHandle->numOfBlocks == 0);
2298 2299 2300
      break;
    }

2301 2302
    if (tsdbSetAndOpenReadFSet(&pTsdbReadHandle->rhelper, pTsdbReadHandle->pFileGroup) < 0) {
      tsdbUnLockFS(REPO_FS(pTsdbReadHandle->pTsdb));
H
Hongze Cheng 已提交
2303 2304 2305 2306
      code = terrno;
      break;
    }

2307
    tsdbUnLockFS(REPO_FS(pTsdbReadHandle->pTsdb));
H
Hongze Cheng 已提交
2308

2309
    if (tsdbLoadBlockIdx(&pTsdbReadHandle->rhelper) < 0) {
H
Hongze Cheng 已提交
2310 2311 2312 2313
      code = terrno;
      break;
    }

2314
    if ((code = getFileCompInfo(pTsdbReadHandle, &numOfBlocks)) != TSDB_CODE_SUCCESS) {
2315 2316
      break;
    }
H
Haojun Liao 已提交
2317

H
Haojun Liao 已提交
2318 2319
    tsdbDebug("%p %d blocks found in file for %d table(s), fid:%d, %s", pTsdbReadHandle, numOfBlocks, numOfTables,
              pTsdbReadHandle->pFileGroup->fid, pTsdbReadHandle->idStr);
H
Haojun Liao 已提交
2320

2321 2322 2323 2324
    assert(numOfBlocks >= 0);
    if (numOfBlocks == 0) {
      continue;
    }
H
Haojun Liao 已提交
2325

2326
    // todo return error code to query engine
dengyihao's avatar
dengyihao 已提交
2327 2328
    if ((code = createDataBlocksInfo(pTsdbReadHandle, numOfBlocks, &pTsdbReadHandle->numOfBlocks)) !=
        TSDB_CODE_SUCCESS) {
2329 2330
      break;
    }
H
Haojun Liao 已提交
2331

2332 2333
    assert(numOfBlocks >= pTsdbReadHandle->numOfBlocks);
    if (pTsdbReadHandle->numOfBlocks > 0) {
2334 2335 2336
      break;
    }
  }
H
Haojun Liao 已提交
2337

2338
  // no data in file anymore
2339
  if (pTsdbReadHandle->numOfBlocks <= 0 || code != TSDB_CODE_SUCCESS) {
H
Haojun Liao 已提交
2340
    if (code == TSDB_CODE_SUCCESS) {
2341
      assert(pTsdbReadHandle->pFileGroup == NULL);
H
Haojun Liao 已提交
2342 2343
    }

D
dapan1121 已提交
2344
    cur->fid = INT32_MIN;  // denote that there are no data in file anymore
H
Haojun Liao 已提交
2345 2346
    *exists = false;
    return code;
2347
  }
H
Haojun Liao 已提交
2348

2349
  assert(pTsdbReadHandle->pFileGroup != NULL && pTsdbReadHandle->numOfBlocks > 0);
dengyihao's avatar
dengyihao 已提交
2350
  cur->slot = ASCENDING_TRAVERSE(pTsdbReadHandle->order) ? 0 : pTsdbReadHandle->numOfBlocks - 1;
2351
  cur->fid = pTsdbReadHandle->pFileGroup->fid;
H
Haojun Liao 已提交
2352

2353 2354
  STableBlockInfo* pBlockInfo = &pTsdbReadHandle->pDataBlockInfo[cur->slot];
  return getDataBlockRv(pTsdbReadHandle, pBlockInfo, exists);
H
Haojun Liao 已提交
2355 2356 2357 2358 2359 2360 2361
}

static bool isEndFileDataBlock(SQueryFilePos* cur, int32_t numOfBlocks, bool ascTrav) {
  assert(cur != NULL && numOfBlocks > 0);
  return (cur->slot == numOfBlocks - 1 && ascTrav) || (cur->slot == 0 && !ascTrav);
}

2362
static void moveToNextDataBlockInCurrentFile(STsdbReadHandle* pTsdbReadHandle) {
dengyihao's avatar
dengyihao 已提交
2363
  int32_t step = ASCENDING_TRAVERSE(pTsdbReadHandle->order) ? 1 : -1;
H
Haojun Liao 已提交
2364

2365 2366
  SQueryFilePos* cur = &pTsdbReadHandle->cur;
  assert(cur->slot < pTsdbReadHandle->numOfBlocks && cur->slot >= 0);
H
Haojun Liao 已提交
2367 2368

  cur->slot += step;
dengyihao's avatar
dengyihao 已提交
2369
  cur->mixBlock = false;
H
Haojun Liao 已提交
2370
  cur->blockCompleted = false;
2371
}
H
Haojun Liao 已提交
2372 2373

int32_t tsdbGetFileBlocksDistInfo(tsdbReaderT* queryHandle, STableBlockDistInfo* pTableBlockInfo) {
dengyihao's avatar
dengyihao 已提交
2374
  STsdbReadHandle* pTsdbReadHandle = (STsdbReadHandle*)queryHandle;
H
Haojun Liao 已提交
2375

H
Haojun Liao 已提交
2376
  pTableBlockInfo->totalSize = 0;
2377
  pTableBlockInfo->totalRows = 0;
H
Haojun Liao 已提交
2378

2379
  STsdbFS* pFileHandle = REPO_FS(pTsdbReadHandle->pTsdb);
H
Haojun Liao 已提交
2380 2381

  // find the start data block in file
2382
  pTsdbReadHandle->locateStart = true;
H
Hongze Cheng 已提交
2383
  STsdbCfg* pCfg = REPO_CFG(pTsdbReadHandle->pTsdb);
H
refact  
Hongze Cheng 已提交
2384
  int32_t   fid = getFileIdFromKey(pTsdbReadHandle->window.skey, pCfg->days, pCfg->precision);
H
Haojun Liao 已提交
2385 2386

  tsdbRLockFS(pFileHandle);
2387 2388
  tsdbFSIterInit(&pTsdbReadHandle->fileIter, pFileHandle, pTsdbReadHandle->order);
  tsdbFSIterSeek(&pTsdbReadHandle->fileIter, fid);
H
Haojun Liao 已提交
2389 2390
  tsdbUnLockFS(pFileHandle);

H
Haojun Liao 已提交
2391
  pTableBlockInfo->numOfFiles += 1;
H
Haojun Liao 已提交
2392

H
Haojun Liao 已提交
2393
  int32_t     code = TSDB_CODE_SUCCESS;
H
Haojun Liao 已提交
2394
  int32_t     numOfBlocks = 0;
2395
  int32_t     numOfTables = (int32_t)taosArrayGetSize(pTsdbReadHandle->pTableCheckInfo);
dengyihao's avatar
dengyihao 已提交
2396
  int         defaultRows = 4096;  // TSDB_DEFAULT_BLOCK_ROWS(pCfg->maxRowsPerFileBlock);
H
Haojun Liao 已提交
2397 2398
  STimeWindow win = TSWINDOW_INITIALIZER;

H
Haojun Liao 已提交
2399 2400
  bool ascTraverse = ASCENDING_TRAVERSE(pTsdbReadHandle->order);

H
Haojun Liao 已提交
2401 2402
  while (true) {
    numOfBlocks = 0;
2403
    tsdbRLockFS(REPO_FS(pTsdbReadHandle->pTsdb));
H
Haojun Liao 已提交
2404

2405 2406
    if ((pTsdbReadHandle->pFileGroup = tsdbFSIterNext(&pTsdbReadHandle->fileIter)) == NULL) {
      tsdbUnLockFS(REPO_FS(pTsdbReadHandle->pTsdb));
H
Haojun Liao 已提交
2407 2408 2409
      break;
    }

H
refact  
Hongze Cheng 已提交
2410
    tsdbGetFidKeyRange(pCfg->days, pCfg->precision, pTsdbReadHandle->pFileGroup->fid, &win.skey, &win.ekey);
H
Haojun Liao 已提交
2411 2412

    // current file are not overlapped with query time window, ignore remain files
dengyihao's avatar
dengyihao 已提交
2413 2414
    if ((ascTraverse && win.skey > pTsdbReadHandle->window.ekey) ||
        (!ascTraverse && win.ekey < pTsdbReadHandle->window.ekey)) {
2415
      tsdbUnLockFS(REPO_FS(pTsdbReadHandle->pTsdb));
H
Haojun Liao 已提交
2416 2417
      tsdbDebug("%p remain files are not qualified for qrange:%" PRId64 "-%" PRId64 ", ignore, %s", pTsdbReadHandle,
                pTsdbReadHandle->window.skey, pTsdbReadHandle->window.ekey, pTsdbReadHandle->idStr);
2418
      pTsdbReadHandle->pFileGroup = NULL;
H
Haojun Liao 已提交
2419 2420 2421
      break;
    }

H
Haojun Liao 已提交
2422
    pTableBlockInfo->numOfFiles += 1;
2423 2424
    if (tsdbSetAndOpenReadFSet(&pTsdbReadHandle->rhelper, pTsdbReadHandle->pFileGroup) < 0) {
      tsdbUnLockFS(REPO_FS(pTsdbReadHandle->pTsdb));
H
Haojun Liao 已提交
2425 2426 2427 2428
      code = terrno;
      break;
    }

2429
    tsdbUnLockFS(REPO_FS(pTsdbReadHandle->pTsdb));
H
Haojun Liao 已提交
2430

2431
    if (tsdbLoadBlockIdx(&pTsdbReadHandle->rhelper) < 0) {
H
Haojun Liao 已提交
2432 2433 2434 2435
      code = terrno;
      break;
    }

2436
    if ((code = getFileCompInfo(pTsdbReadHandle, &numOfBlocks)) != TSDB_CODE_SUCCESS) {
H
Haojun Liao 已提交
2437 2438 2439
      break;
    }

H
Haojun Liao 已提交
2440 2441
    tsdbDebug("%p %d blocks found in file for %d table(s), fid:%d, %s", pTsdbReadHandle, numOfBlocks, numOfTables,
              pTsdbReadHandle->pFileGroup->fid, pTsdbReadHandle->idStr);
H
Haojun Liao 已提交
2442 2443 2444 2445 2446 2447

    if (numOfBlocks == 0) {
      continue;
    }

    for (int32_t i = 0; i < numOfTables; ++i) {
2448
      STableCheckInfo* pCheckInfo = taosArrayGet(pTsdbReadHandle->pTableCheckInfo, i);
H
Haojun Liao 已提交
2449 2450 2451

      SBlock* pBlock = pCheckInfo->pCompInfo->blocks;
      for (int32_t j = 0; j < pCheckInfo->numOfBlocks; ++j) {
H
Haojun Liao 已提交
2452
        pTableBlockInfo->totalSize += pBlock[j].len;
H
Haojun Liao 已提交
2453

H
Haojun Liao 已提交
2454
        int32_t numOfRows = pBlock[j].numOfRows;
2455
        pTableBlockInfo->totalRows += numOfRows;
H
Haojun Liao 已提交
2456 2457 2458 2459 2460 2461 2462 2463 2464 2465 2466
        if (numOfRows > pTableBlockInfo->maxRows) {
          pTableBlockInfo->maxRows = numOfRows;
        }

        if (numOfRows < pTableBlockInfo->minRows) {
          pTableBlockInfo->minRows = numOfRows;
        }

        if (numOfRows < defaultRows) {
          pTableBlockInfo->numOfSmallBlocks += 1;
        }
dengyihao's avatar
dengyihao 已提交
2467 2468 2469
        //        int32_t  stepIndex = (numOfRows-1)/TSDB_BLOCK_DIST_STEP_ROWS;
        //        SFileBlockInfo *blockInfo = (SFileBlockInfo*)taosArrayGet(pTableBlockInfo->dataBlockInfos, stepIndex);
        //        blockInfo->numBlocksOfStep++;
H
Haojun Liao 已提交
2470 2471 2472 2473 2474 2475 2476
      }
    }
  }

  return code;
}

2477 2478 2479
static int32_t getDataBlocksInFiles(STsdbReadHandle* pTsdbReadHandle, bool* exists) {
  STsdbFS*       pFileHandle = REPO_FS(pTsdbReadHandle->pTsdb);
  SQueryFilePos* cur = &pTsdbReadHandle->cur;
2480 2481

  // find the start data block in file
2482 2483
  if (!pTsdbReadHandle->locateStart) {
    pTsdbReadHandle->locateStart = true;
H
Hongze Cheng 已提交
2484
    STsdbCfg* pCfg = REPO_CFG(pTsdbReadHandle->pTsdb);
H
refact  
Hongze Cheng 已提交
2485
    int32_t   fid = getFileIdFromKey(pTsdbReadHandle->window.skey, pCfg->days, pCfg->precision);
H
Haojun Liao 已提交
2486

H
Hongze Cheng 已提交
2487
    tsdbRLockFS(pFileHandle);
2488 2489
    tsdbFSIterInit(&pTsdbReadHandle->fileIter, pFileHandle, pTsdbReadHandle->order);
    tsdbFSIterSeek(&pTsdbReadHandle->fileIter, fid);
H
Hongze Cheng 已提交
2490
    tsdbUnLockFS(pFileHandle);
2491

2492
    return getFirstFileDataBlock(pTsdbReadHandle, exists);
2493
  } else {
2494
    // check if current file block is all consumed
2495
    STableBlockInfo* pBlockInfo = &pTsdbReadHandle->pDataBlockInfo[cur->slot];
2496
    STableCheckInfo* pCheckInfo = pBlockInfo->pTableCheckInfo;
H
Haojun Liao 已提交
2497

2498
    // current block is done, try next
H
Haojun Liao 已提交
2499
    if ((!cur->mixBlock) || cur->blockCompleted) {
H
Haojun Liao 已提交
2500
      // all data blocks in current file has been checked already, try next file if exists
2501
    } else {
H
Haojun Liao 已提交
2502 2503
      tsdbDebug("%p continue in current data block, index:%d, pos:%d, %s", pTsdbReadHandle, cur->slot, cur->pos,
                pTsdbReadHandle->idStr);
2504 2505
      int32_t code = handleDataMergeIfNeeded(pTsdbReadHandle, pBlockInfo->compBlock, pCheckInfo);
      *exists = (pTsdbReadHandle->realNumOfRows > 0);
H
Haojun Liao 已提交
2506

H
Haojun Liao 已提交
2507 2508 2509 2510 2511 2512 2513
      if (code != TSDB_CODE_SUCCESS || *exists) {
        return code;
      }
    }

    // current block is empty, try next block in file
    // all data blocks in current file has been checked already, try next file if exists
2514 2515
    if (isEndFileDataBlock(cur, pTsdbReadHandle->numOfBlocks, ASCENDING_TRAVERSE(pTsdbReadHandle->order))) {
      return getFirstFileDataBlock(pTsdbReadHandle, exists);
H
Haojun Liao 已提交
2516
    } else {
2517 2518 2519
      moveToNextDataBlockInCurrentFile(pTsdbReadHandle);
      STableBlockInfo* pNext = &pTsdbReadHandle->pDataBlockInfo[cur->slot];
      return getDataBlockRv(pTsdbReadHandle, pNext, exists);
2520 2521
    }
  }
2522 2523
}

2524 2525
static bool doHasDataInBuffer(STsdbReadHandle* pTsdbReadHandle) {
  size_t numOfTables = taosArrayGetSize(pTsdbReadHandle->pTableCheckInfo);
dengyihao's avatar
dengyihao 已提交
2526

2527 2528
  while (pTsdbReadHandle->activeIndex < numOfTables) {
    if (hasMoreDataInCache(pTsdbReadHandle)) {
2529 2530
      return true;
    }
H
Haojun Liao 已提交
2531

2532
    pTsdbReadHandle->activeIndex += 1;
2533
  }
H
Haojun Liao 已提交
2534

2535 2536 2537
  return false;
}

dengyihao's avatar
dengyihao 已提交
2538
// todo not unref yet, since it is not support multi-group interpolation query
H
Haojun Liao 已提交
2539
static UNUSED_FUNC void changeQueryHandleForInterpQuery(tsdbReaderT pHandle) {
H
Haojun Liao 已提交
2540
  // filter the queried time stamp in the first place
dengyihao's avatar
dengyihao 已提交
2541
  STsdbReadHandle* pTsdbReadHandle = (STsdbReadHandle*)pHandle;
H
Haojun Liao 已提交
2542 2543

  // starts from the buffer in case of descending timestamp order check data blocks
2544
  size_t numOfTables = taosArrayGetSize(pTsdbReadHandle->pTableCheckInfo);
H
Haojun Liao 已提交
2545 2546

  int32_t i = 0;
dengyihao's avatar
dengyihao 已提交
2547
  while (i < numOfTables) {
2548
    STableCheckInfo* pCheckInfo = taosArrayGet(pTsdbReadHandle->pTableCheckInfo, i);
H
Haojun Liao 已提交
2549 2550

    // the first qualified table for interpolation query
dengyihao's avatar
dengyihao 已提交
2551 2552 2553 2554
    //    if ((pTsdbReadHandle->window.skey <= pCheckInfo->pTableObj->lastKey) &&
    //        (pCheckInfo->pTableObj->lastKey != TSKEY_INITIAL_VAL)) {
    //      break;
    //    }
H
Haojun Liao 已提交
2555 2556 2557 2558 2559 2560 2561 2562 2563

    i++;
  }

  // there are no data in all the tables
  if (i == numOfTables) {
    return;
  }

dengyihao's avatar
dengyihao 已提交
2564
  STableCheckInfo info = *(STableCheckInfo*)taosArrayGet(pTsdbReadHandle->pTableCheckInfo, i);
2565
  taosArrayClear(pTsdbReadHandle->pTableCheckInfo);
H
Haojun Liao 已提交
2566

2567 2568
  info.lastKey = pTsdbReadHandle->window.skey;
  taosArrayPush(pTsdbReadHandle->pTableCheckInfo, &info);
H
Haojun Liao 已提交
2569 2570 2571
}

static int tsdbReadRowsFromCache(STableCheckInfo* pCheckInfo, TSKEY maxKey, int maxRowsToRead, STimeWindow* win,
2572
                                 STsdbReadHandle* pTsdbReadHandle) {
dengyihao's avatar
dengyihao 已提交
2573 2574
  int       numOfRows = 0;
  int32_t   numOfCols = (int32_t)taosArrayGetSize(pTsdbReadHandle->pColumns);
H
Hongze Cheng 已提交
2575
  STsdbCfg* pCfg = REPO_CFG(pTsdbReadHandle->pTsdb);
H
Haojun Liao 已提交
2576 2577
  win->skey = TSKEY_INITIAL_VAL;

dengyihao's avatar
dengyihao 已提交
2578 2579
  int64_t   st = taosGetTimestampUs();
  int16_t   rv = -1;
D
fix bug  
dapan1121 已提交
2580
  STSchema* pSchema = NULL;
H
Haojun Liao 已提交
2581 2582

  do {
H
Haojun Liao 已提交
2583
    STSRow* row = getSRowInTableMem(pCheckInfo, pTsdbReadHandle->order, pCfg->update, NULL);
H
Haojun Liao 已提交
2584 2585 2586 2587
    if (row == NULL) {
      break;
    }

H
Haojun Liao 已提交
2588
    TSKEY key = TD_ROW_KEY(row);
dengyihao's avatar
dengyihao 已提交
2589 2590 2591 2592
    if ((key > maxKey && ASCENDING_TRAVERSE(pTsdbReadHandle->order)) ||
        (key < maxKey && !ASCENDING_TRAVERSE(pTsdbReadHandle->order))) {
      tsdbDebug("%p key:%" PRIu64 " beyond qrange:%" PRId64 " - %" PRId64 ", no more data in buffer", pTsdbReadHandle,
                key, pTsdbReadHandle->window.skey, pTsdbReadHandle->window.ekey);
H
Haojun Liao 已提交
2593 2594 2595 2596 2597 2598 2599 2600 2601

      break;
    }

    if (win->skey == INT64_MIN) {
      win->skey = key;
    }

    win->ekey = key;
H
Haojun Liao 已提交
2602
    if (rv != TD_ROW_SVER(row)) {
H
Hongze Cheng 已提交
2603
      pSchema = metaGetTbTSchema(REPO_META(pTsdbReadHandle->pTsdb), pCheckInfo->tableId, 0);
H
Haojun Liao 已提交
2604
      rv = TD_ROW_SVER(row);
D
fix bug  
dapan1121 已提交
2605
    }
dengyihao's avatar
dengyihao 已提交
2606 2607
    mergeTwoRowFromMem(pTsdbReadHandle, maxRowsToRead, numOfRows, row, NULL, numOfCols, pCheckInfo->tableId, pSchema,
                       NULL, true);
H
Haojun Liao 已提交
2608 2609 2610 2611 2612 2613

    if (++numOfRows >= maxRowsToRead) {
      moveToNextRowInMem(pCheckInfo);
      break;
    }

dengyihao's avatar
dengyihao 已提交
2614
  } while (moveToNextRowInMem(pCheckInfo));
H
Haojun Liao 已提交
2615 2616 2617 2618

  assert(numOfRows <= maxRowsToRead);

  // if the buffer is not full in case of descending order query, move the data in the front of the buffer
2619
  if (!ASCENDING_TRAVERSE(pTsdbReadHandle->order) && numOfRows < maxRowsToRead) {
H
Haojun Liao 已提交
2620 2621
    int32_t emptySize = maxRowsToRead - numOfRows;

dengyihao's avatar
dengyihao 已提交
2622
    for (int32_t i = 0; i < numOfCols; ++i) {
2623
      SColumnInfoData* pColInfo = taosArrayGet(pTsdbReadHandle->pColumns, i);
dengyihao's avatar
dengyihao 已提交
2624 2625
      memmove((char*)pColInfo->pData, (char*)pColInfo->pData + emptySize * pColInfo->info.bytes,
              numOfRows * pColInfo->info.bytes);
H
Haojun Liao 已提交
2626 2627 2628 2629
    }
  }

  int64_t elapsedTime = taosGetTimestampUs() - st;
dengyihao's avatar
dengyihao 已提交
2630 2631
  tsdbDebug("%p build data block from cache completed, elapsed time:%" PRId64 " us, numOfRows:%d, numOfCols:%d, %s",
            pTsdbReadHandle, elapsedTime, numOfRows, numOfCols, pTsdbReadHandle->idStr);
H
Haojun Liao 已提交
2632 2633 2634 2635

  return numOfRows;
}

2636 2637
static int32_t getAllTableList(SMeta* pMeta, uint64_t uid, SArray* list) {
  SMCtbCursor* pCur = metaOpenCtbCursor(pMeta, uid);
H
Haojun Liao 已提交
2638

2639 2640 2641 2642 2643
  while (1) {
    tb_uid_t id = metaCtbCursorNext(pCur);
    if (id == 0) {
      break;
    }
H
Haojun Liao 已提交
2644

2645
    STableKeyInfo info = {.lastKey = TSKEY_INITIAL_VAL, uid = id};
H
Haojun Liao 已提交
2646 2647 2648
    taosArrayPush(list, &info);
  }

2649
  metaCloseCtbCurosr(pCur);
H
Haojun Liao 已提交
2650 2651 2652 2653 2654 2655 2656 2657
  return TSDB_CODE_SUCCESS;
}

static void destroyHelper(void* param) {
  if (param == NULL) {
    return;
  }

dengyihao's avatar
dengyihao 已提交
2658 2659 2660 2661 2662 2663
  //  tQueryInfo* pInfo = (tQueryInfo*)param;
  //  if (pInfo->optr != TSDB_RELATION_IN) {
  //    taosMemoryFreeClear(pInfo->q);
  //  } else {
  //    taosHashCleanup((SHashObj *)(pInfo->q));
  //  }
H
Haojun Liao 已提交
2664

wafwerar's avatar
wafwerar 已提交
2665
  taosMemoryFree(param);
H
Haojun Liao 已提交
2666 2667
}

dengyihao's avatar
dengyihao 已提交
2668 2669
#define TSDB_PREV_ROW 0x1
#define TSDB_NEXT_ROW 0x2
2670

dengyihao's avatar
dengyihao 已提交
2671
static bool loadBlockOfActiveTable(STsdbReadHandle* pTsdbReadHandle) {
2672
  if (pTsdbReadHandle->checkFiles) {
H
Haojun Liao 已提交
2673 2674
    // check if the query range overlaps with the file data block
    bool exists = true;
H
Haojun Liao 已提交
2675

2676
    int32_t code = getDataBlocksInFiles(pTsdbReadHandle, &exists);
H
Haojun Liao 已提交
2677
    if (code != TSDB_CODE_SUCCESS) {
2678
      pTsdbReadHandle->checkFiles = false;
H
Haojun Liao 已提交
2679 2680
      return false;
    }
H
Haojun Liao 已提交
2681

H
Haojun Liao 已提交
2682
    if (exists) {
dengyihao's avatar
dengyihao 已提交
2683
      tsdbRetrieveDataBlock((tsdbReaderT*)pTsdbReadHandle, NULL);
2684 2685 2686
      if (pTsdbReadHandle->currentLoadExternalRows && pTsdbReadHandle->window.skey == pTsdbReadHandle->window.ekey) {
        SColumnInfoData* pColInfo = taosArrayGet(pTsdbReadHandle->pColumns, 0);
        assert(*(int64_t*)pColInfo->pData == pTsdbReadHandle->window.skey);
H
Haojun Liao 已提交
2687 2688
      }

dengyihao's avatar
dengyihao 已提交
2689
      pTsdbReadHandle->currentLoadExternalRows = false;  // clear the flag, since the exact matched row is found.
H
Haojun Liao 已提交
2690 2691
      return exists;
    }
H
Haojun Liao 已提交
2692

2693
    pTsdbReadHandle->checkFiles = false;
H
Haojun Liao 已提交
2694
  }
H
Haojun Liao 已提交
2695

2696 2697
  if (hasMoreDataInCache(pTsdbReadHandle)) {
    pTsdbReadHandle->currentLoadExternalRows = false;
H
Haojun Liao 已提交
2698 2699
    return true;
  }
H
Haojun Liao 已提交
2700

H
Haojun Liao 已提交
2701
  // current result is empty
dengyihao's avatar
dengyihao 已提交
2702 2703 2704
  if (pTsdbReadHandle->currentLoadExternalRows && pTsdbReadHandle->window.skey == pTsdbReadHandle->window.ekey &&
      pTsdbReadHandle->cur.rows == 0) {
    //    STsdbMemTable* pMemRef = pTsdbReadHandle->pMemTable;
H
Haojun Liao 已提交
2705

dengyihao's avatar
dengyihao 已提交
2706 2707
    //    doGetExternalRow(pTsdbReadHandle, TSDB_PREV_ROW, pMemRef);
    //    doGetExternalRow(pTsdbReadHandle, TSDB_NEXT_ROW, pMemRef);
H
Haojun Liao 已提交
2708

2709
    bool result = tsdbGetExternalRow(pTsdbReadHandle);
H
Haojun Liao 已提交
2710

dengyihao's avatar
dengyihao 已提交
2711 2712
    //    pTsdbReadHandle->prev = doFreeColumnInfoData(pTsdbReadHandle->prev);
    //    pTsdbReadHandle->next = doFreeColumnInfoData(pTsdbReadHandle->next);
2713
    pTsdbReadHandle->currentLoadExternalRows = false;
H
Haojun Liao 已提交
2714 2715

    return result;
2716
  }
H
Haojun Liao 已提交
2717

H
Haojun Liao 已提交
2718 2719
  return false;
}
2720

2721
static bool loadCachedLastRow(STsdbReadHandle* pTsdbReadHandle) {
H
Haojun Liao 已提交
2722
  // the last row is cached in buffer, return it directly.
2723
  // here note that the pTsdbReadHandle->window must be the TS_INITIALIZER
dengyihao's avatar
dengyihao 已提交
2724
  int32_t numOfCols = (int32_t)(QH_GET_NUM_OF_COLS(pTsdbReadHandle));
2725
  size_t  numOfTables = taosArrayGetSize(pTsdbReadHandle->pTableCheckInfo);
H
Haojun Liao 已提交
2726 2727
  assert(numOfTables > 0 && numOfCols > 0);

2728
  SQueryFilePos* cur = &pTsdbReadHandle->cur;
2729

dengyihao's avatar
dengyihao 已提交
2730 2731 2732
  STSRow* pRow = NULL;
  TSKEY   key = TSKEY_INITIAL_VAL;
  int32_t step = ASCENDING_TRAVERSE(pTsdbReadHandle->order) ? 1 : -1;
2733 2734 2735

  if (++pTsdbReadHandle->activeIndex < numOfTables) {
    STableCheckInfo* pCheckInfo = taosArrayGet(pTsdbReadHandle->pTableCheckInfo, pTsdbReadHandle->activeIndex);
dengyihao's avatar
dengyihao 已提交
2736 2737 2738 2739 2740 2741
    //    int32_t ret = tsdbGetCachedLastRow(pCheckInfo->pTableObj, &pRow, &key);
    //    if (ret != TSDB_CODE_SUCCESS) {
    //      return false;
    //    }
    mergeTwoRowFromMem(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, 0, pRow, NULL, numOfCols, pCheckInfo->tableId,
                       NULL, NULL, true);
wafwerar's avatar
wafwerar 已提交
2742
    taosMemoryFreeClear(pRow);
H
Haojun Liao 已提交
2743

H
Haojun Liao 已提交
2744 2745 2746
    // update the last key value
    pCheckInfo->lastKey = key + step;

dengyihao's avatar
dengyihao 已提交
2747 2748
    cur->rows = 1;  // only one row
    cur->lastKey = key + step;
H
Haojun Liao 已提交
2749 2750 2751 2752 2753
    cur->mixBlock = true;
    cur->win.skey = key;
    cur->win.ekey = key;

    return true;
2754
  }
H
Haojun Liao 已提交
2755

H
Haojun Liao 已提交
2756 2757 2758
  return false;
}

dengyihao's avatar
dengyihao 已提交
2759
// static bool loadCachedLast(STsdbReadHandle* pTsdbReadHandle) {
2760 2761 2762 2763 2764 2765 2766 2767 2768 2769 2770 2771 2772 2773 2774 2775 2776 2777 2778
//  // the last row is cached in buffer, return it directly.
//  // here note that the pTsdbReadHandle->window must be the TS_INITIALIZER
//  int32_t tgNumOfCols = (int32_t)QH_GET_NUM_OF_COLS(pTsdbReadHandle);
//  size_t  numOfTables = taosArrayGetSize(pTsdbReadHandle->pTableCheckInfo);
//  int32_t numOfRows = 0;
//  assert(numOfTables > 0 && tgNumOfCols > 0);
//  SQueryFilePos* cur = &pTsdbReadHandle->cur;
//  TSKEY priKey = TSKEY_INITIAL_VAL;
//  int32_t priIdx = -1;
//  SColumnInfoData* pColInfo = NULL;
//
//  while (++pTsdbReadHandle->activeIndex < numOfTables) {
//    STableCheckInfo* pCheckInfo = taosArrayGet(pTsdbReadHandle->pTableCheckInfo, pTsdbReadHandle->activeIndex);
//    STable* pTable = pCheckInfo->pTableObj;
//    char* pData = NULL;
//
//    int32_t numOfCols = pTable->maxColNum;
//
//    if (pTable->lastCols == NULL || pTable->maxColNum <= 0) {
dengyihao's avatar
dengyihao 已提交
2779 2780
//      tsdbWarn("no last cached for table %s, uid:%" PRIu64 ",tid:%d", pTable->name->data, pTable->uid,
//      pTable->tableId); continue;

//    }
//
//    int32_t i = 0, j = 0;
//    while(i < tgNumOfCols && j < numOfCols) {
//      pColInfo = taosArrayGet(pTsdbReadHandle->pColumns, i);
//      if (pTable->lastCols[j].colId < pColInfo->info.colId) {
//        j++;
//        continue;
//      } else if (pTable->lastCols[j].colId > pColInfo->info.colId) {
//        i++;
//        continue;
//      }
//
//      pData = (char*)pColInfo->pData + numOfRows * pColInfo->info.bytes;
//
//      if (pTable->lastCols[j].bytes > 0) {
//        void* value = pTable->lastCols[j].pData;
//        switch (pColInfo->info.type) {
//          case TSDB_DATA_TYPE_BINARY:
//          case TSDB_DATA_TYPE_NCHAR:
//            memcpy(pData, value, varDataTLen(value));
//            break;
//          case TSDB_DATA_TYPE_NULL:
//          case TSDB_DATA_TYPE_BOOL:
//          case TSDB_DATA_TYPE_TINYINT:
//          case TSDB_DATA_TYPE_UTINYINT:
//            *(uint8_t *)pData = *(uint8_t *)value;
//            break;
//          case TSDB_DATA_TYPE_SMALLINT:
//          case TSDB_DATA_TYPE_USMALLINT:
//            *(uint16_t *)pData = *(uint16_t *)value;
//            break;
//          case TSDB_DATA_TYPE_INT:
//          case TSDB_DATA_TYPE_UINT:
//            *(uint32_t *)pData = *(uint32_t *)value;
//            break;
//          case TSDB_DATA_TYPE_BIGINT:
//          case TSDB_DATA_TYPE_UBIGINT:
//            *(uint64_t *)pData = *(uint64_t *)value;
//            break;
//          case TSDB_DATA_TYPE_FLOAT:
//            SET_FLOAT_PTR(pData, value);
//            break;
//          case TSDB_DATA_TYPE_DOUBLE:
//            SET_DOUBLE_PTR(pData, value);
//            break;
//          case TSDB_DATA_TYPE_TIMESTAMP:
//            if (pColInfo->info.colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
//              priKey = tdGetKey(*(TKEY *)value);
//              priIdx = i;
//
//              i++;
//              j++;
//              continue;
//            } else {
//              *(TSKEY *)pData = *(TSKEY *)value;
//            }
//            break;
//          default:
//            memcpy(pData, value, pColInfo->info.bytes);
//        }
//
//        for (int32_t n = 0; n < tgNumOfCols; ++n) {
//          if (n == i) {
//            continue;
//          }
//
//          pColInfo = taosArrayGet(pTsdbReadHandle->pColumns, n);
//          pData = (char*)pColInfo->pData + numOfRows * pColInfo->info.bytes;;
//
//          if (pColInfo->info.colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
////            *(TSKEY *)pData = pTable->lastCols[j].ts;
//            continue;
//          }
//
//          if (pColInfo->info.type == TSDB_DATA_TYPE_BINARY || pColInfo->info.type == TSDB_DATA_TYPE_NCHAR) {
//            setVardataNull(pData, pColInfo->info.type);
//          } else {
//            setNull(pData, pColInfo->info.type, pColInfo->info.bytes);
//          }
//        }
//
//        numOfRows++;
//        assert(numOfRows < pTsdbReadHandle->outputCapacity);
//      }
//
//      i++;
//      j++;
//    }
//
//    // leave the real ts column as the last row, because last function only (not stable) use the last row as res
//    if (priKey != TSKEY_INITIAL_VAL) {
//      pColInfo = taosArrayGet(pTsdbReadHandle->pColumns, priIdx);
//      pData = (char*)pColInfo->pData + numOfRows * pColInfo->info.bytes;
//
//      *(TSKEY *)pData = priKey;
//
//      for (int32_t n = 0; n < tgNumOfCols; ++n) {
//        if (n == priIdx) {
//          continue;
//        }
//
//        pColInfo = taosArrayGet(pTsdbReadHandle->pColumns, n);
//        pData = (char*)pColInfo->pData + numOfRows * pColInfo->info.bytes;;
//
//        assert (pColInfo->info.colId != PRIMARYKEY_TIMESTAMP_COL_ID);
//
//        if (pColInfo->info.type == TSDB_DATA_TYPE_BINARY || pColInfo->info.type == TSDB_DATA_TYPE_NCHAR) {
//          setVardataNull(pData, pColInfo->info.type);
//        } else {
//          setNull(pData, pColInfo->info.type, pColInfo->info.bytes);
//        }
//      }
//
//      numOfRows++;
//    }
//
//    if (numOfRows > 0) {
//      cur->rows     = numOfRows;
//      cur->mixBlock = true;
//
//      return true;
//    }
//  }
//
//  return false;
//}

static bool loadDataBlockFromTableSeq(STsdbReadHandle* pTsdbReadHandle) {
  size_t numOfTables = taosArrayGetSize(pTsdbReadHandle->pTableCheckInfo);
H
Haojun Liao 已提交
2911 2912 2913
  assert(numOfTables > 0);

  int64_t stime = taosGetTimestampUs();
H
Haojun Liao 已提交
2914

dengyihao's avatar
dengyihao 已提交
2915
  while (pTsdbReadHandle->activeIndex < numOfTables) {
2916
    if (loadBlockOfActiveTable(pTsdbReadHandle)) {
H
Haojun Liao 已提交
2917 2918 2919
      return true;
    }

2920
    STableCheckInfo* pCheckInfo = taosArrayGet(pTsdbReadHandle->pTableCheckInfo, pTsdbReadHandle->activeIndex);
H
Haojun Liao 已提交
2921 2922
    pCheckInfo->numOfBlocks = 0;

2923 2924
    pTsdbReadHandle->activeIndex += 1;
    pTsdbReadHandle->locateStart = false;
dengyihao's avatar
dengyihao 已提交
2925 2926
    pTsdbReadHandle->checkFiles = true;
    pTsdbReadHandle->cur.rows = 0;
2927
    pTsdbReadHandle->currentLoadExternalRows = pTsdbReadHandle->loadExternalRow;
H
Haojun Liao 已提交
2928 2929 2930 2931

    terrno = TSDB_CODE_SUCCESS;

    int64_t elapsedTime = taosGetTimestampUs() - stime;
2932
    pTsdbReadHandle->cost.checkForNextTime += elapsedTime;
H
Haojun Liao 已提交
2933 2934 2935
  }

  return false;
2936 2937
}

H
Haojun Liao 已提交
2938
// handle data in cache situation
H
Haojun Liao 已提交
2939
bool tsdbNextDataBlock(tsdbReaderT pHandle) {
dengyihao's avatar
dengyihao 已提交
2940
  STsdbReadHandle* pTsdbReadHandle = (STsdbReadHandle*)pHandle;
Y
yihaoDeng 已提交
2941

dengyihao's avatar
dengyihao 已提交
2942
  for (int32_t i = 0; i < taosArrayGetSize(pTsdbReadHandle->pColumns); ++i) {
2943 2944 2945 2946
    SColumnInfoData* pColInfo = taosArrayGet(pTsdbReadHandle->pColumns, i);
    colInfoDataCleanup(pColInfo, pTsdbReadHandle->outputCapacity);
  }

2947
  if (emptyQueryTimewindow(pTsdbReadHandle)) {
dengyihao's avatar
dengyihao 已提交
2948 2949
    tsdbDebug("%p query window not overlaps with the data set, no result returned, %s", pTsdbReadHandle,
              pTsdbReadHandle->idStr);
2950 2951 2952
    return false;
  }

Y
yihaoDeng 已提交
2953 2954 2955
  int64_t stime = taosGetTimestampUs();
  int64_t elapsedTime = stime;

2956
  // TODO refactor: remove "type"
2957 2958
  if (pTsdbReadHandle->type == TSDB_QUERY_TYPE_LAST) {
    if (pTsdbReadHandle->cachelastrow == TSDB_CACHED_TYPE_LASTROW) {
dengyihao's avatar
dengyihao 已提交
2959
      //      return loadCachedLastRow(pTsdbReadHandle);
2960
    } else if (pTsdbReadHandle->cachelastrow == TSDB_CACHED_TYPE_LAST) {
dengyihao's avatar
dengyihao 已提交
2961
      //      return loadCachedLast(pTsdbReadHandle);
D
init  
dapan1121 已提交
2962
    }
H
Haojun Liao 已提交
2963
  }
Y
yihaoDeng 已提交
2964

2965 2966
  if (pTsdbReadHandle->loadType == BLOCK_LOAD_TABLE_SEQ_ORDER) {
    return loadDataBlockFromTableSeq(pTsdbReadHandle);
dengyihao's avatar
dengyihao 已提交
2967
  } else {  // loadType == RR and Offset Order
2968
    if (pTsdbReadHandle->checkFiles) {
H
Haojun Liao 已提交
2969 2970 2971
      // check if the query range overlaps with the file data block
      bool exists = true;

2972
      int32_t code = getDataBlocksInFiles(pTsdbReadHandle, &exists);
H
Haojun Liao 已提交
2973
      if (code != TSDB_CODE_SUCCESS) {
2974 2975
        pTsdbReadHandle->activeIndex = 0;
        pTsdbReadHandle->checkFiles = false;
H
Haojun Liao 已提交
2976 2977 2978 2979 2980

        return false;
      }

      if (exists) {
2981
        pTsdbReadHandle->cost.checkForNextTime += (taosGetTimestampUs() - stime);
H
Haojun Liao 已提交
2982 2983
        return exists;
      }
Y
yihaoDeng 已提交
2984

2985 2986
      pTsdbReadHandle->activeIndex = 0;
      pTsdbReadHandle->checkFiles = false;
Y
yihaoDeng 已提交
2987 2988
    }

H
Haojun Liao 已提交
2989
    // TODO: opt by consider the scan order
2990
    bool ret = doHasDataInBuffer(pTsdbReadHandle);
H
Haojun Liao 已提交
2991
    terrno = TSDB_CODE_SUCCESS;
Y
yihaoDeng 已提交
2992

H
Haojun Liao 已提交
2993
    elapsedTime = taosGetTimestampUs() - stime;
2994
    pTsdbReadHandle->cost.checkForNextTime += elapsedTime;
H
Haojun Liao 已提交
2995
    return ret;
Y
yihaoDeng 已提交
2996 2997
  }
}
2998

dengyihao's avatar
dengyihao 已提交
2999
// static int32_t doGetExternalRow(STsdbReadHandle* pTsdbReadHandle, int16_t type, STsdbMemTable* pMemRef) {
3000 3001 3002 3003 3004 3005 3006 3007 3008 3009 3010 3011 3012 3013 3014 3015 3016 3017 3018 3019 3020 3021 3022 3023 3024 3025 3026 3027 3028 3029 3030 3031 3032 3033
//  STsdbReadHandle* pSecQueryHandle = NULL;
//
//  if (type == TSDB_PREV_ROW && pTsdbReadHandle->prev) {
//    return TSDB_CODE_SUCCESS;
//  }
//
//  if (type == TSDB_NEXT_ROW && pTsdbReadHandle->next) {
//    return TSDB_CODE_SUCCESS;
//  }
//
//  // prepare the structure
//  int32_t numOfCols = (int32_t) QH_GET_NUM_OF_COLS(pTsdbReadHandle);
//
//  if (type == TSDB_PREV_ROW) {
//    pTsdbReadHandle->prev = taosArrayInit(numOfCols, sizeof(SColumnInfoData));
//    if (pTsdbReadHandle->prev == NULL) {
//      terrno = TSDB_CODE_QRY_OUT_OF_MEMORY;
//      goto out_of_memory;
//    }
//  } else {
//    pTsdbReadHandle->next = taosArrayInit(numOfCols, sizeof(SColumnInfoData));
//    if (pTsdbReadHandle->next == NULL) {
//      terrno = TSDB_CODE_QRY_OUT_OF_MEMORY;
//      goto out_of_memory;
//    }
//  }
//
//  SArray* row = (type == TSDB_PREV_ROW)? pTsdbReadHandle->prev : pTsdbReadHandle->next;
//
//  for (int32_t i = 0; i < numOfCols; ++i) {
//    SColumnInfoData* pCol = taosArrayGet(pTsdbReadHandle->pColumns, i);
//
//    SColumnInfoData colInfo = {{0}, 0};
//    colInfo.info = pCol->info;
wafwerar's avatar
wafwerar 已提交
3034
//    colInfo.pData = taosMemoryCalloc(1, pCol->info.bytes);
3035 3036 3037 3038 3039 3040 3041 3042 3043
//    if (colInfo.pData == NULL) {
//      terrno = TSDB_CODE_QRY_OUT_OF_MEMORY;
//      goto out_of_memory;
//    }
//
//    taosArrayPush(row, &colInfo);
//  }
//
//  // load the previous row
3044
//  SQueryTableDataCond cond = {.numOfCols = numOfCols, .loadExternalRows = false, .type = BLOCK_LOAD_OFFSET_SEQ_ORDER};
3045 3046 3047 3048 3049 3050 3051 3052
//  if (type == TSDB_PREV_ROW) {
//    cond.order = TSDB_ORDER_DESC;
//    cond.twindow = (STimeWindow){pTsdbReadHandle->window.skey, INT64_MIN};
//  } else {
//    cond.order = TSDB_ORDER_ASC;
//    cond.twindow = (STimeWindow){pTsdbReadHandle->window.skey, INT64_MAX};
//  }
//
wafwerar's avatar
wafwerar 已提交
3053
//  cond.colList = taosMemoryCalloc(cond.numOfCols, sizeof(SColumnInfo));
3054 3055 3056 3057 3058 3059 3060 3061 3062 3063
//  if (cond.colList == NULL) {
//    terrno = TSDB_CODE_QRY_OUT_OF_MEMORY;
//    goto out_of_memory;
//  }
//
//  for (int32_t i = 0; i < cond.numOfCols; ++i) {
//    SColumnInfoData* pColInfoData = taosArrayGet(pTsdbReadHandle->pColumns, i);
//    memcpy(&cond.colList[i], &pColInfoData->info, sizeof(SColumnInfo));
//  }
//
H
Haojun Liao 已提交
3064
//  pSecQueryHandle = tsdbQueryTablesImpl(pTsdbReadHandle->pTsdb, &cond, pTsdbReadHandle->idStr, pMemRef);
wafwerar's avatar
wafwerar 已提交
3065
//  taosMemoryFreeClear(cond.colList);
3066 3067 3068 3069 3070 3071 3072 3073 3074 3075 3076 3077 3078 3079 3080 3081 3082 3083 3084 3085 3086 3087 3088 3089 3090 3091 3092 3093 3094 3095 3096 3097 3098 3099 3100 3101 3102 3103
//
//  // current table, only one table
//  STableCheckInfo* pCurrent = taosArrayGet(pTsdbReadHandle->pTableCheckInfo, pTsdbReadHandle->activeIndex);
//
//  SArray* psTable = NULL;
//  pSecQueryHandle->pTableCheckInfo = createCheckInfoFromCheckInfo(pCurrent, pSecQueryHandle->window.skey, &psTable);
//  if (pSecQueryHandle->pTableCheckInfo == NULL) {
//    taosArrayDestroy(psTable);
//    terrno = TSDB_CODE_QRY_OUT_OF_MEMORY;
//    goto out_of_memory;
//  }
//
//
//  tsdbMayTakeMemSnapshot(pSecQueryHandle, psTable);
//  if (!tsdbNextDataBlock((void*)pSecQueryHandle)) {
//    // no result in current query, free the corresponding result rows structure
//    if (type == TSDB_PREV_ROW) {
//      pTsdbReadHandle->prev = doFreeColumnInfoData(pTsdbReadHandle->prev);
//    } else {
//      pTsdbReadHandle->next = doFreeColumnInfoData(pTsdbReadHandle->next);
//    }
//
//    goto out_of_memory;
//  }
//
//  SDataBlockInfo blockInfo = {{0}, 0};
//  tsdbRetrieveDataBlockInfo((void*)pSecQueryHandle, &blockInfo);
//  tsdbRetrieveDataBlock((void*)pSecQueryHandle, pSecQueryHandle->defaultLoadColumn);
//
//  row = (type == TSDB_PREV_ROW)? pTsdbReadHandle->prev:pTsdbReadHandle->next;
//  int32_t pos = (type == TSDB_PREV_ROW)?pSecQueryHandle->cur.rows - 1:0;
//
//  for (int32_t i = 0; i < numOfCols; ++i) {
//    SColumnInfoData* pCol = taosArrayGet(row, i);
//    SColumnInfoData* s = taosArrayGet(pSecQueryHandle->pColumns, i);
//    memcpy((char*)pCol->pData, (char*)s->pData + s->info.bytes * pos, pCol->info.bytes);
//  }
//
dengyihao's avatar
dengyihao 已提交
3104
// out_of_memory:
3105
//  tsdbCleanupReadHandle(pSecQueryHandle);
3106 3107 3108
//  return terrno;
//}

H
Haojun Liao 已提交
3109
bool tsdbGetExternalRow(tsdbReaderT pHandle) {
dengyihao's avatar
dengyihao 已提交
3110 3111
  STsdbReadHandle* pTsdbReadHandle = (STsdbReadHandle*)pHandle;
  SQueryFilePos*   cur = &pTsdbReadHandle->cur;
H
Haojun Liao 已提交
3112

H
Haojun Liao 已提交
3113 3114
  cur->fid = INT32_MIN;
  cur->mixBlock = true;
3115
  if (pTsdbReadHandle->prev == NULL || pTsdbReadHandle->next == NULL) {
H
Haojun Liao 已提交
3116 3117
    cur->rows = 0;
    return false;
H
Haojun Liao 已提交
3118 3119
  }

dengyihao's avatar
dengyihao 已提交
3120
  int32_t numOfCols = (int32_t)QH_GET_NUM_OF_COLS(pTsdbReadHandle);
H
Haojun Liao 已提交
3121
  for (int32_t i = 0; i < numOfCols; ++i) {
3122 3123
    SColumnInfoData* pColInfoData = taosArrayGet(pTsdbReadHandle->pColumns, i);
    SColumnInfoData* first = taosArrayGet(pTsdbReadHandle->prev, i);
H
Haojun Liao 已提交
3124 3125 3126

    memcpy(pColInfoData->pData, first->pData, pColInfoData->info.bytes);

3127
    SColumnInfoData* sec = taosArrayGet(pTsdbReadHandle->next, i);
sangshuduo's avatar
sangshuduo 已提交
3128
    memcpy(((char*)pColInfoData->pData) + pColInfoData->info.bytes, sec->pData, pColInfoData->info.bytes);
H
Haojun Liao 已提交
3129 3130

    if (i == 0 && pColInfoData->info.type == TSDB_DATA_TYPE_TIMESTAMP) {
H
Haojun Liao 已提交
3131
      cur->win.skey = *(TSKEY*)pColInfoData->pData;
sangshuduo's avatar
sangshuduo 已提交
3132
      cur->win.ekey = *(TSKEY*)(((char*)pColInfoData->pData) + TSDB_KEYSIZE);
H
Haojun Liao 已提交
3133 3134 3135
    }
  }

H
Haojun Liao 已提交
3136 3137
  cur->rows = 2;
  return true;
3138 3139
}

3140
/*
3141
 * if lastRow == NULL, return TSDB_CODE_TDB_NO_CACHE_LAST_ROW
3142
 * else set pRes and return TSDB_CODE_SUCCESS and save lastKey
3143
 */
H
Haojun Liao 已提交
3144
// int32_t tsdbGetCachedLastRow(STable* pTable, STSRow** pRes, TSKEY* lastKey) {
3145 3146 3147 3148 3149 3150 3151 3152 3153 3154 3155 3156 3157 3158 3159 3160
//  int32_t code = TSDB_CODE_SUCCESS;
//
//  TSDB_RLOCK_TABLE(pTable);
//
//  if (!pTable->lastRow) {
//    code = TSDB_CODE_TDB_NO_CACHE_LAST_ROW;
//    goto out;
//  }
//
//  if (pRes) {
//    *pRes = tdMemRowDup(pTable->lastRow);
//    if (*pRes == NULL) {
//      code = TSDB_CODE_TDB_OUT_OF_MEMORY;
//    }
//  }
//
H
Haojun Liao 已提交
3161
// out:
3162 3163 3164 3165
//  TSDB_RUNLOCK_TABLE(pTable);
//  return code;
//}

3166
bool isTsdbCacheLastRow(tsdbReaderT* pReader) {
dengyihao's avatar
dengyihao 已提交
3167
  return ((STsdbReadHandle*)pReader)->cachelastrow > TSDB_CACHED_TYPE_NONE;
D
fix bug  
dapan1121 已提交
3168 3169
}

dengyihao's avatar
dengyihao 已提交
3170
int32_t checkForCachedLastRow(STsdbReadHandle* pTsdbReadHandle, STableGroupInfo* groupList) {
3171 3172
  assert(pTsdbReadHandle != NULL && groupList != NULL);

dengyihao's avatar
dengyihao 已提交
3173 3174 3175 3176 3177 3178 3179 3180 3181 3182 3183 3184 3185 3186 3187 3188 3189 3190 3191 3192 3193 3194 3195 3196
  //  TSKEY    key = TSKEY_INITIAL_VAL;
  //
  //  SArray* group = taosArrayGetP(groupList->pGroupList, 0);
  //  assert(group != NULL);
  //
  //  STableKeyInfo* pInfo = (STableKeyInfo*)taosArrayGet(group, 0);
  //
  //  int32_t code = 0;
  //
  //  if (((STable*)pInfo->pTable)->lastRow) {
  //    code = tsdbGetCachedLastRow(pInfo->pTable, NULL, &key);
  //    if (code != TSDB_CODE_SUCCESS) {
  //      pTsdbReadHandle->cachelastrow = TSDB_CACHED_TYPE_NONE;
  //    } else {
  //      pTsdbReadHandle->cachelastrow = TSDB_CACHED_TYPE_LASTROW;
  //    }
  //  }
  //
  //  // update the tsdb query time range
  //  if (pTsdbReadHandle->cachelastrow != TSDB_CACHED_TYPE_NONE) {
  //    pTsdbReadHandle->window      = TSWINDOW_INITIALIZER;
  //    pTsdbReadHandle->checkFiles  = false;
  //    pTsdbReadHandle->activeIndex = -1;  // start from -1
  //  }
H
Haojun Liao 已提交
3197

3198
  return TSDB_CODE_SUCCESS;
3199 3200
}

3201 3202
int32_t checkForCachedLast(STsdbReadHandle* pTsdbReadHandle) {
  assert(pTsdbReadHandle != NULL);
D
update  
dapan1121 已提交
3203 3204

  int32_t code = 0;
dengyihao's avatar
dengyihao 已提交
3205 3206 3207
  //  if (pTsdbReadHandle->pTsdb && atomic_load_8(&pTsdbReadHandle->pTsdb->hasCachedLastColumn)){
  //    pTsdbReadHandle->cachelastrow = TSDB_CACHED_TYPE_LAST;
  //  }
D
update  
dapan1121 已提交
3208 3209

  // update the tsdb query time range
3210
  if (pTsdbReadHandle->cachelastrow) {
dengyihao's avatar
dengyihao 已提交
3211
    pTsdbReadHandle->checkFiles = false;
3212
    pTsdbReadHandle->activeIndex = -1;  // start from -1
D
update  
dapan1121 已提交
3213 3214 3215 3216 3217
  }

  return code;
}

dengyihao's avatar
dengyihao 已提交
3218
STimeWindow updateLastrowForEachGroup(STableGroupInfo* groupList) {
H
Haojun Liao 已提交
3219
  STimeWindow window = {INT64_MAX, INT64_MIN};
H
Haojun Liao 已提交
3220

H
Haojun Liao 已提交
3221
  int32_t totalNumOfTable = 0;
3222
  SArray* emptyGroup = taosArrayInit(16, sizeof(int32_t));
H
Haojun Liao 已提交
3223

H
Haojun Liao 已提交
3224 3225
  // NOTE: starts from the buffer in case of descending timestamp order check data blocks
  size_t numOfGroups = taosArrayGetSize(groupList->pGroupList);
dengyihao's avatar
dengyihao 已提交
3226
  for (int32_t j = 0; j < numOfGroups; ++j) {
H
Haojun Liao 已提交
3227 3228
    SArray* pGroup = taosArrayGetP(groupList->pGroupList, j);
    TSKEY   key = TSKEY_INITIAL_VAL;
H
Haojun Liao 已提交
3229

H
Haojun Liao 已提交
3230
    STableKeyInfo keyInfo = {0};
H
Haojun Liao 已提交
3231

H
Haojun Liao 已提交
3232
    size_t numOfTables = taosArrayGetSize(pGroup);
dengyihao's avatar
dengyihao 已提交
3233 3234
    for (int32_t i = 0; i < numOfTables; ++i) {
      STableKeyInfo* pInfo = (STableKeyInfo*)taosArrayGet(pGroup, i);
H
Haojun Liao 已提交
3235

H
Haojun Liao 已提交
3236
      // if the lastKey equals to INT64_MIN, there is no data in this table
dengyihao's avatar
dengyihao 已提交
3237
      TSKEY lastKey = 0;  //((STable*)(pInfo->pTable))->lastKey;
H
Haojun Liao 已提交
3238 3239
      if (key < lastKey) {
        key = lastKey;
H
Haojun Liao 已提交
3240

dengyihao's avatar
dengyihao 已提交
3241
        //        keyInfo.pTable  = pInfo->pTable;
H
Haojun Liao 已提交
3242
        keyInfo.lastKey = key;
dengyihao's avatar
dengyihao 已提交
3243
        pInfo->lastKey = key;
H
Haojun Liao 已提交
3244

H
Haojun Liao 已提交
3245 3246 3247
        if (key < window.skey) {
          window.skey = key;
        }
3248

H
Haojun Liao 已提交
3249 3250 3251 3252
        if (key > window.ekey) {
          window.ekey = key;
        }
      }
3253
    }
H
Haojun Liao 已提交
3254

H
Haojun Liao 已提交
3255
    // more than one table in each group, only one table left for each group
dengyihao's avatar
dengyihao 已提交
3256 3257 3258 3259 3260 3261 3262 3263 3264 3265 3266 3267
    //    if (keyInfo.pTable != NULL) {
    //      totalNumOfTable++;
    //      if (taosArrayGetSize(pGroup) == 1) {
    //        // do nothing
    //      } else {
    //        taosArrayClear(pGroup);
    //        taosArrayPush(pGroup, &keyInfo);
    //      }
    //    } else {  // mark all the empty groups, and remove it later
    //      taosArrayDestroy(pGroup);
    //      taosArrayPush(emptyGroup, &j);
    //    }
3268
  }
H
Haojun Liao 已提交
3269

H
Haojun Liao 已提交
3270 3271 3272
  // window does not being updated, so set the original
  if (window.skey == INT64_MAX && window.ekey == INT64_MIN) {
    window = TSWINDOW_INITIALIZER;
H
Haojun Liao 已提交
3273
    assert(totalNumOfTable == 0 && taosArrayGetSize(groupList->pGroupList) == numOfGroups);
H
Haojun Liao 已提交
3274 3275
  }

dengyihao's avatar
dengyihao 已提交
3276
  taosArrayRemoveBatch(groupList->pGroupList, TARRAY_GET_START(emptyGroup), (int32_t)taosArrayGetSize(emptyGroup));
3277 3278
  taosArrayDestroy(emptyGroup);

H
Haojun Liao 已提交
3279
  groupList->numOfTables = totalNumOfTable;
H
Haojun Liao 已提交
3280
  return window;
H
hjxilinx 已提交
3281 3282
}

H
Haojun Liao 已提交
3283
void tsdbRetrieveDataBlockInfo(tsdbReaderT* pTsdbReadHandle, SDataBlockInfo* pDataBlockInfo) {
3284
  STsdbReadHandle* pHandle = (STsdbReadHandle*)pTsdbReadHandle;
dengyihao's avatar
dengyihao 已提交
3285
  SQueryFilePos*   cur = &pHandle->cur;
3286 3287

  uint64_t uid = 0;
H
Haojun Liao 已提交
3288

3289
  // there are data in file
D
dapan1121 已提交
3290
  if (pHandle->cur.fid != INT32_MIN) {
3291
    STableBlockInfo* pBlockInfo = &pHandle->pDataBlockInfo[cur->slot];
3292
    uid = pBlockInfo->pTableCheckInfo->tableId;
H
[td-32]  
hjxilinx 已提交
3293
  } else {
3294
    STableCheckInfo* pCheckInfo = taosArrayGet(pHandle->pTableCheckInfo, pHandle->activeIndex);
3295
    uid = pCheckInfo->tableId;
3296
  }
3297

dengyihao's avatar
dengyihao 已提交
3298 3299
  tsdbDebug("data block generated, uid:%" PRIu64 " numOfRows:%d, tsrange:%" PRId64 " - %" PRId64 " %s", uid, cur->rows,
            cur->win.skey, cur->win.ekey, pHandle->idStr);
3300

3301
  pDataBlockInfo->uid = uid;
3302 3303 3304 3305 3306 3307

#if 0
  // for multi-group data query processing test purpose
  pDataBlockInfo->groupId = uid;
#endif

dengyihao's avatar
dengyihao 已提交
3308
  pDataBlockInfo->rows = cur->rows;
H
Haojun Liao 已提交
3309
  pDataBlockInfo->window = cur->win;
S
TD-1057  
Shengliang Guan 已提交
3310
  pDataBlockInfo->numOfCols = (int32_t)(QH_GET_NUM_OF_COLS(pHandle));
3311
}
H
hjxilinx 已提交
3312

H
Haojun Liao 已提交
3313 3314 3315
/*
 * return null for mixed data block, if not a complete file data block, the statistics value will always return NULL
 */
3316
int32_t tsdbRetrieveDataBlockStatisInfo(tsdbReaderT* pTsdbReadHandle, SColumnDataAgg*** pBlockStatis, bool* allHave) {
dengyihao's avatar
dengyihao 已提交
3317
  STsdbReadHandle* pHandle = (STsdbReadHandle*)pTsdbReadHandle;
3318
  *allHave = false;
H
Haojun Liao 已提交
3319

H
Haojun Liao 已提交
3320 3321
  SQueryFilePos* c = &pHandle->cur;
  if (c->mixBlock) {
H
Haojun Liao 已提交
3322 3323 3324
    *pBlockStatis = NULL;
    return TSDB_CODE_SUCCESS;
  }
H
Haojun Liao 已提交
3325

H
Haojun Liao 已提交
3326 3327 3328 3329
  STableBlockInfo* pBlockInfo = &pHandle->pDataBlockInfo[c->slot];
  assert((c->slot >= 0 && c->slot < pHandle->numOfBlocks) || ((c->slot == pHandle->numOfBlocks) && (c->slot == 0)));

  // file block with sub-blocks has no statistics data
H
Haojun Liao 已提交
3330 3331 3332 3333
  if (pBlockInfo->compBlock->numOfSubBlocks > 1) {
    *pBlockStatis = NULL;
    return TSDB_CODE_SUCCESS;
  }
H
Haojun Liao 已提交
3334 3335

  int64_t stime = taosGetTimestampUs();
3336 3337
  int     statisStatus = tsdbLoadBlockStatis(&pHandle->rhelper, pBlockInfo->compBlock);
  if (statisStatus < TSDB_STATIS_OK) {
H
Hongze Cheng 已提交
3338
    return terrno;
3339 3340 3341
  } else if (statisStatus > TSDB_STATIS_OK) {
    *pBlockStatis = NULL;
    return TSDB_CODE_SUCCESS;
H
Hongze Cheng 已提交
3342
  }
H
Haojun Liao 已提交
3343

C
Cary Xu 已提交
3344 3345 3346
  tsdbDebug("vgId:%d succeed to load block statis part for uid %" PRIu64, REPO_ID(pHandle->pTsdb),
            TSDB_READ_TABLE_UID(&pHandle->rhelper));

3347
  int16_t* colIds = pHandle->suppInfo.defaultLoadColumn->pData;
H
Haojun Liao 已提交
3348

H
Haojun Liao 已提交
3349
  size_t numOfCols = QH_GET_NUM_OF_COLS(pHandle);
3350 3351 3352
  memset(pHandle->suppInfo.plist, 0, numOfCols * POINTER_BYTES);
  memset(pHandle->suppInfo.pstatis, 0, numOfCols * sizeof(SColumnDataAgg));

dengyihao's avatar
dengyihao 已提交
3353
  for (int32_t i = 0; i < numOfCols; ++i) {
3354
    pHandle->suppInfo.pstatis[i].colId = colIds[i];
3355
  }
H
Haojun Liao 已提交
3356

3357 3358
  *allHave = true;
  tsdbGetBlockStatis(&pHandle->rhelper, pHandle->suppInfo.pstatis, (int)numOfCols, pBlockInfo->compBlock);
H
Haojun Liao 已提交
3359 3360

  // always load the first primary timestamp column data
3361
  SColumnDataAgg* pPrimaryColStatis = &pHandle->suppInfo.pstatis[0];
3362
  assert(pPrimaryColStatis->colId == PRIMARYKEY_TIMESTAMP_COL_ID);
H
Haojun Liao 已提交
3363 3364 3365 3366

  pPrimaryColStatis->numOfNull = 0;
  pPrimaryColStatis->min = pBlockInfo->compBlock->keyFirst;
  pPrimaryColStatis->max = pBlockInfo->compBlock->keyLast;
3367
  pHandle->suppInfo.plist[0] = &pHandle->suppInfo.pstatis[0];
H
Haojun Liao 已提交
3368

dengyihao's avatar
dengyihao 已提交
3369
  // update the number of NULL data rows
3370
  int32_t* slotIds = pHandle->suppInfo.slotIds;
dengyihao's avatar
dengyihao 已提交
3371
  for (int32_t i = 1; i < numOfCols; ++i) {
3372
    ASSERT(colIds[i] == pHandle->pSchema->columns[slotIds[i]].colId);
C
Cary Xu 已提交
3373
    if (IS_BSMA_ON(&(pHandle->pSchema->columns[slotIds[i]]))) {
3374 3375 3376 3377 3378 3379 3380
      if (pHandle->suppInfo.pstatis[i].numOfNull == -1) {  // set the column data are all NULL
        pHandle->suppInfo.pstatis[i].numOfNull = pBlockInfo->compBlock->numOfRows;
      } else {
        pHandle->suppInfo.plist[i] = &pHandle->suppInfo.pstatis[i];
      }
    } else {
      *allHave = false;
H
Haojun Liao 已提交
3381 3382
    }
  }
H
Haojun Liao 已提交
3383 3384 3385 3386

  int64_t elapsed = taosGetTimestampUs() - stime;
  pHandle->cost.statisInfoLoadTime += elapsed;

3387
  *pBlockStatis = pHandle->suppInfo.plist;
3388
  return TSDB_CODE_SUCCESS;
H
hjxilinx 已提交
3389 3390
}

H
Haojun Liao 已提交
3391
SArray* tsdbRetrieveDataBlock(tsdbReaderT* pTsdbReadHandle, SArray* pIdList) {
H
[td-32]  
hjxilinx 已提交
3392
  /**
H
hjxilinx 已提交
3393
   * In the following two cases, the data has been loaded to SColumnInfoData.
H
[td-32]  
hjxilinx 已提交
3394 3395
   * 1. data is from cache, 2. data block is not completed qualified to query time range
   */
3396
  STsdbReadHandle* pHandle = (STsdbReadHandle*)pTsdbReadHandle;
D
dapan1121 已提交
3397
  if (pHandle->cur.fid == INT32_MIN) {
H
[td-32]  
hjxilinx 已提交
3398 3399
    return pHandle->pColumns;
  } else {
H
Haojun Liao 已提交
3400 3401
    STableBlockInfo* pBlockInfo = &pHandle->pDataBlockInfo[pHandle->cur.slot];
    STableCheckInfo* pCheckInfo = pBlockInfo->pTableCheckInfo;
3402

3403
    if (pHandle->cur.mixBlock) {
H
[td-32]  
hjxilinx 已提交
3404 3405
      return pHandle->pColumns;
    } else {
H
Haojun Liao 已提交
3406
      SDataBlockInfo binfo = GET_FILE_DATA_BLOCK_INFO(pCheckInfo, pBlockInfo->compBlock);
3407
      assert(pHandle->realNumOfRows <= binfo.rows);
H
Haojun Liao 已提交
3408

H
hjxilinx 已提交
3409 3410
      // data block has been loaded, todo extract method
      SDataBlockLoadInfo* pBlockLoadInfo = &pHandle->dataBlockLoadInfo;
H
Haojun Liao 已提交
3411

H
Hongze Cheng 已提交
3412
      if (pBlockLoadInfo->slot == pHandle->cur.slot && pBlockLoadInfo->fileGroup->fid == pHandle->cur.fid &&
H
Haojun Liao 已提交
3413
          pBlockLoadInfo->uid == pCheckInfo->tableId) {
H
hjxilinx 已提交
3414
        return pHandle->pColumns;
H
Haojun Liao 已提交
3415
      } else {  // only load the file block
H
refact  
Hongze Cheng 已提交
3416
        SBlock* pBlock = pBlockInfo->compBlock;
H
Haojun Liao 已提交
3417
        if (doLoadFileDataBlock(pHandle, pBlock, pCheckInfo, pHandle->cur.slot) != TSDB_CODE_SUCCESS) {
3418 3419
          return NULL;
        }
H
Haojun Liao 已提交
3420

H
Haojun Liao 已提交
3421
        int32_t numOfRows = doCopyRowsFromFileBlock(pHandle, pHandle->outputCapacity, 0, 0, pBlock->numOfRows - 1);
H
hjxilinx 已提交
3422 3423
        return pHandle->pColumns;
      }
H
[td-32]  
hjxilinx 已提交
3424 3425
    }
  }
H
hjxilinx 已提交
3426
}
3427
#if 0
3428
void filterPrepare(void* expr, void* param) {
3429
  tExprNode* pExpr = (tExprNode*)expr;
H
[td-32]  
hjxilinx 已提交
3430
  if (pExpr->_node.info != NULL) {
3431 3432
    return;
  }
3433

wafwerar's avatar
wafwerar 已提交
3434
  pExpr->_node.info = taosMemoryCalloc(1, sizeof(tQueryInfo));
H
Haojun Liao 已提交
3435

3436
  STSchema*   pTSSchema = (STSchema*) param;
H
hjxilinx 已提交
3437 3438 3439
  tQueryInfo* pInfo = pExpr->_node.info;
  tVariant*   pCond = pExpr->_node.pRight->pVal;
  SSchema*    pSchema = pExpr->_node.pLeft->pSchema;
3440

3441 3442
  pInfo->sch      = *pSchema;
  pInfo->optr     = pExpr->_node.optr;
Y
yihaoDeng 已提交
3443
  pInfo->compare  = getComparFunc(pInfo->sch.type, pInfo->optr);
H
Haojun Liao 已提交
3444
  pInfo->indexed  = pTSSchema->columns->colId == pInfo->sch.colId;
H
Haojun Liao 已提交
3445

H
hjxilinx 已提交
3446
  if (pInfo->optr == TSDB_RELATION_IN) {
Y
yihaoDeng 已提交
3447
     int dummy = -1;
3448
     SHashObj *pObj = NULL;
Y
yihaoDeng 已提交
3449 3450 3451 3452
     if (pInfo->sch.colId == TSDB_TBNAME_COLUMN_INDEX) {
        pObj = taosHashInit(256, taosGetDefaultHashFunction(pInfo->sch.type), true, false);
        SArray *arr = (SArray *)(pCond->arr);
        for (size_t i = 0; i < taosArrayGetSize(arr); i++) {
Y
yihaoDeng 已提交
3453
          char* p = taosArrayGetP(arr, i);
3454 3455
          strntolower_s(varDataVal(p), varDataVal(p), varDataLen(p));
          taosHashPut(pObj, varDataVal(p), varDataLen(p), &dummy, sizeof(dummy));
Y
yihaoDeng 已提交
3456 3457 3458 3459
        }
     } else {
       buildFilterSetFromBinary((void **)&pObj, pCond->pz, pCond->nLen);
     }
3460
     pInfo->q = (char *)pObj;
H
Haojun Liao 已提交
3461
  } else if (pCond != NULL) {
3462 3463 3464 3465
    uint32_t size = pCond->nLen * TSDB_NCHAR_SIZE;
    if (size < (uint32_t)pSchema->bytes) {
      size = pSchema->bytes;
    }
wafwerar's avatar
wafwerar 已提交
3466
    // to make sure tonchar does not cause invalid write, since the '\0' needs at least sizeof(TdUcs4) space.
wafwerar's avatar
wafwerar 已提交
3467
    pInfo->q = taosMemoryCalloc(1, size + TSDB_NCHAR_SIZE + VARSTR_HEADER_SIZE);
3468
    tVariantDump(pCond, pInfo->q, pSchema->type, true);
weixin_48148422's avatar
weixin_48148422 已提交
3469
  }
3470 3471
}

3472
#endif
3473

dengyihao's avatar
dengyihao 已提交
3474
static int32_t tableGroupComparFn(const void* p1, const void* p2, const void* param) {
3475
#if 0
3476
  STableGroupSupporter* pTableGroupSupp = (STableGroupSupporter*) param;
3477 3478
  STable* pTable1 = ((STableKeyInfo*) p1)->uid;
  STable* pTable2 = ((STableKeyInfo*) p2)->uid;
H
Haojun Liao 已提交
3479

3480 3481 3482
  for (int32_t i = 0; i < pTableGroupSupp->numOfCols; ++i) {
    SColIndex* pColIndex = &pTableGroupSupp->pCols[i];
    int32_t colIndex = pColIndex->colIndex;
H
Haojun Liao 已提交
3483

H
Haojun Liao 已提交
3484
    assert(colIndex >= TSDB_TBNAME_COLUMN_INDEX);
H
Haojun Liao 已提交
3485

3486 3487 3488 3489
    char *  f1 = NULL;
    char *  f2 = NULL;
    int32_t type = 0;
    int32_t bytes = 0;
H
Haojun Liao 已提交
3490

H
Haojun Liao 已提交
3491 3492 3493
    if (colIndex == TSDB_TBNAME_COLUMN_INDEX) {
      f1 = (char*) TABLE_NAME(pTable1);
      f2 = (char*) TABLE_NAME(pTable2);
3494
      type = TSDB_DATA_TYPE_BINARY;
3495
      bytes = tGetTbnameColumnSchema()->bytes;
3496
    } else {
Y
yihaoDeng 已提交
3497 3498 3499 3500 3501 3502 3503
      if (pTableGroupSupp->pTagSchema && colIndex < pTableGroupSupp->pTagSchema->numOfCols) {
        STColumn* pCol = schemaColAt(pTableGroupSupp->pTagSchema, colIndex);
        bytes = pCol->bytes;
        type = pCol->type;
        f1 = tdGetKVRowValOfCol(pTable1->tagVal, pCol->colId);
        f2 = tdGetKVRowValOfCol(pTable2->tagVal, pCol->colId);
      } 
3504
    }
H
Haojun Liao 已提交
3505 3506 3507 3508 3509 3510 3511 3512 3513 3514 3515 3516 3517 3518

    // this tags value may be NULL
    if (f1 == NULL && f2 == NULL) {
      continue;
    }

    if (f1 == NULL) {
      return -1;
    }

    if (f2 == NULL) {
      return 1;
    }

3519 3520 3521 3522 3523 3524 3525
    int32_t ret = doCompare(f1, f2, type, bytes);
    if (ret == 0) {
      continue;
    } else {
      return ret;
    }
  }
3526
#endif
3527 3528 3529
  return 0;
}

H
Haojun Liao 已提交
3530
static int tsdbCheckInfoCompar(const void* key1, const void* key2) {
3531
  if (((STableCheckInfo*)key1)->tableId < ((STableCheckInfo*)key2)->tableId) {
H
Haojun Liao 已提交
3532
    return -1;
3533
  } else if (((STableCheckInfo*)key1)->tableId > ((STableCheckInfo*)key2)->tableId) {
H
Haojun Liao 已提交
3534 3535 3536 3537 3538 3539 3540 3541 3542
    return 1;
  } else {
    ASSERT(false);
    return 0;
  }
}

void createTableGroupImpl(SArray* pGroups, SArray* pTableList, size_t numOfTables, TSKEY skey,
                          STableGroupSupporter* pSupp, __ext_compar_fn_t compareFn) {
3543
  STable* pTable = taosArrayGetP(pTableList, 0);
H
Haojun Liao 已提交
3544 3545
  SArray* g = taosArrayInit(16, sizeof(STableKeyInfo));

3546
  STableKeyInfo info = {.lastKey = skey};
H
Haojun Liao 已提交
3547
  taosArrayPush(g, &info);
3548

3549
  for (int32_t i = 1; i < numOfTables; ++i) {
3550 3551
    STable** prev = taosArrayGet(pTableList, i - 1);
    STable** p = taosArrayGet(pTableList, i);
H
Haojun Liao 已提交
3552

H
hjxilinx 已提交
3553
    int32_t ret = compareFn(prev, p, pSupp);
3554
    assert(ret == 0 || ret == -1);
H
Haojun Liao 已提交
3555

3556
    if (ret == 0) {
3557
      STableKeyInfo info1 = {.lastKey = skey};
H
Haojun Liao 已提交
3558
      taosArrayPush(g, &info1);
3559 3560
    } else {
      taosArrayPush(pGroups, &g);  // current group is ended, start a new group
H
Haojun Liao 已提交
3561 3562
      g = taosArrayInit(16, sizeof(STableKeyInfo));

3563
      STableKeyInfo info1 = {.lastKey = skey};
H
Haojun Liao 已提交
3564
      taosArrayPush(g, &info1);
3565 3566
    }
  }
H
Haojun Liao 已提交
3567

3568
  taosArrayPush(pGroups, &g);
3569 3570
}

dengyihao's avatar
dengyihao 已提交
3571 3572
SArray* createTableGroup(SArray* pTableList, SSchemaWrapper* pTagSchema, SColIndex* pCols, int32_t numOfOrderCols,
                         TSKEY skey) {
3573
  assert(pTableList != NULL);
3574
  SArray* pTableGroup = taosArrayInit(1, POINTER_BYTES);
H
Haojun Liao 已提交
3575

3576 3577
  size_t size = taosArrayGetSize(pTableList);
  if (size == 0) {
S
Shengliang Guan 已提交
3578
    tsdbDebug("no qualified tables");
3579 3580
    return pTableGroup;
  }
H
Haojun Liao 已提交
3581

dengyihao's avatar
dengyihao 已提交
3582
  if (numOfOrderCols == 0 || size == 1) {  // no group by tags clause or only one table
3583
    SArray* sa = taosArrayDup(pTableList);
H
Haojun Liao 已提交
3584 3585 3586 3587
    if (sa == NULL) {
      taosArrayDestroy(pTableGroup);
      return NULL;
    }
H
Haojun Liao 已提交
3588

3589
    taosArrayPush(pTableGroup, &sa);
S
TD-1057  
Shengliang Guan 已提交
3590
    tsdbDebug("all %" PRIzu " tables belong to one group", size);
3591
  } else {
H
Haojun Liao 已提交
3592 3593
    STableGroupSupporter sup = {0};
    sup.numOfCols = numOfOrderCols;
3594
    sup.pTagSchema = pTagSchema->pSchema;
H
Haojun Liao 已提交
3595 3596
    sup.pCols = pCols;

3597 3598
    taosqsort(pTableList->pData, size, sizeof(STableKeyInfo), &sup, tableGroupComparFn);
    createTableGroupImpl(pTableGroup, pTableList, size, skey, &sup, tableGroupComparFn);
3599
  }
H
Haojun Liao 已提交
3600

3601 3602 3603
  return pTableGroup;
}

dengyihao's avatar
dengyihao 已提交
3604
// static bool tableFilterFp(const void* pNode, void* param) {
3605 3606 3607 3608 3609 3610 3611 3612 3613 3614 3615 3616 3617 3618 3619 3620 3621 3622 3623 3624 3625 3626 3627 3628 3629 3630 3631 3632 3633 3634 3635 3636 3637 3638 3639 3640 3641 3642 3643 3644 3645 3646 3647 3648 3649 3650 3651 3652 3653 3654 3655 3656 3657 3658 3659 3660 3661 3662 3663 3664 3665 3666 3667 3668 3669 3670 3671 3672 3673 3674 3675 3676 3677 3678 3679 3680 3681 3682 3683 3684 3685 3686 3687
//  tQueryInfo* pInfo = (tQueryInfo*) param;
//
//  STable* pTable = (STable*)(SL_GET_NODE_DATA((SSkipListNode*)pNode));
//
//  char* val = NULL;
//  if (pInfo->sch.colId == TSDB_TBNAME_COLUMN_INDEX) {
//    val = (char*) TABLE_NAME(pTable);
//  } else {
//    val = tdGetKVRowValOfCol(pTable->tagVal, pInfo->sch.colId);
//  }
//
//  if (pInfo->optr == TSDB_RELATION_ISNULL || pInfo->optr == TSDB_RELATION_NOTNULL) {
//    if (pInfo->optr == TSDB_RELATION_ISNULL) {
//      return (val == NULL) || isNull(val, pInfo->sch.type);
//    } else if (pInfo->optr == TSDB_RELATION_NOTNULL) {
//      return (val != NULL) && (!isNull(val, pInfo->sch.type));
//    }
//  } else if (pInfo->optr == TSDB_RELATION_IN) {
//     int type = pInfo->sch.type;
//     if (type == TSDB_DATA_TYPE_BOOL || IS_SIGNED_NUMERIC_TYPE(type) || type == TSDB_DATA_TYPE_TIMESTAMP) {
//       int64_t v;
//       GET_TYPED_DATA(v, int64_t, pInfo->sch.type, val);
//       return NULL != taosHashGet((SHashObj *)pInfo->q, (char *)&v, sizeof(v));
//     } else if (IS_UNSIGNED_NUMERIC_TYPE(type)) {
//       uint64_t v;
//       GET_TYPED_DATA(v, uint64_t, pInfo->sch.type, val);
//       return NULL != taosHashGet((SHashObj *)pInfo->q, (char *)&v, sizeof(v));
//     }
//     else if (type == TSDB_DATA_TYPE_DOUBLE || type == TSDB_DATA_TYPE_FLOAT) {
//       double v;
//       GET_TYPED_DATA(v, double, pInfo->sch.type, val);
//       return NULL != taosHashGet((SHashObj *)pInfo->q, (char *)&v, sizeof(v));
//     } else if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_NCHAR){
//       return NULL != taosHashGet((SHashObj *)pInfo->q, varDataVal(val), varDataLen(val));
//     }
//
//  }
//
//  int32_t ret = 0;
//  if (val == NULL) { //the val is possible to be null, so check it out carefully
//    ret = -1; // val is missing in table tags value pairs
//  } else {
//    ret = pInfo->compare(val, pInfo->q);
//  }
//
//  switch (pInfo->optr) {
//    case TSDB_RELATION_EQUAL: {
//      return ret == 0;
//    }
//    case TSDB_RELATION_NOT_EQUAL: {
//      return ret != 0;
//    }
//    case TSDB_RELATION_GREATER_EQUAL: {
//      return ret >= 0;
//    }
//    case TSDB_RELATION_GREATER: {
//      return ret > 0;
//    }
//    case TSDB_RELATION_LESS_EQUAL: {
//      return ret <= 0;
//    }
//    case TSDB_RELATION_LESS: {
//      return ret < 0;
//    }
//    case TSDB_RELATION_LIKE: {
//      return ret == 0;
//    }
//    case TSDB_RELATION_MATCH: {
//      return ret == 0;
//    }
//    case TSDB_RELATION_NMATCH: {
//      return ret == 0;
//    }
//    case TSDB_RELATION_IN: {
//      return ret == 1;
//    }
//
//    default:
//      assert(false);
//  }
//
//  return true;
//}
H
Haojun Liao 已提交
3688

dengyihao's avatar
dengyihao 已提交
3689 3690
// static void getTableListfromSkipList(tExprNode *pExpr, SSkipList *pSkipList, SArray *result, SExprTraverseSupp
// *param);
3691

dengyihao's avatar
dengyihao 已提交
3692 3693
// static int32_t doQueryTableList(STable* pSTable, SArray* pRes, tExprNode* pExpr) {
//  //  // query according to the expression tree
3694
//  SExprTraverseSupp supp = {
dengyihao's avatar
dengyihao 已提交
3695
//      .nodeFilterFn = (__result_filter_fn_t)tableFilterFp,
3696 3697
//      .setupInfoFn = filterPrepare,
//      .pExtInfo = pSTable->tagSchema,
dengyihao's avatar
dengyihao 已提交
3698
//  };
3699 3700 3701 3702 3703
//
//  getTableListfromSkipList(pExpr, pSTable->pIndex, pRes, &supp);
//  tExprTreeDestroy(pExpr, destroyHelper);
//  return TSDB_CODE_SUCCESS;
//}
3704

H
Haojun Liao 已提交
3705
int32_t tsdbQuerySTableByTagCond(void* pMeta, uint64_t uid, TSKEY skey, const char* pTagCond, size_t len,
3706
                                 int16_t tagNameRelType, const char* tbnameCond, STableGroupInfo* pGroupInfo,
3707
                                 SColIndex* pColIndex, int32_t numOfCols, uint64_t reqId, uint64_t taskId) {
H
Hongze Cheng 已提交
3708 3709
  SMetaReader mr = {0};

H
Hongze Cheng 已提交
3710
  metaReaderInit(&mr, (SMeta*)pMeta, 0);
H
Hongze Cheng 已提交
3711 3712

  if (metaGetTableEntryByUid(&mr, uid) < 0) {
dengyihao's avatar
dengyihao 已提交
3713
    tsdbError("%p failed to get stable, uid:%" PRIu64 ", TID:0x%" PRIx64 " QID:0x%" PRIx64, pMeta, uid, taskId, reqId);
3714 3715
    terrno = TSDB_CODE_TDB_INVALID_TABLE_ID;
    goto _error;
C
Cary Xu 已提交
3716 3717
  } else {
    tsdbDebug("%p succeed to get stable, uid:%" PRIu64 ", TID:0x%" PRIx64 " QID:0x%" PRIx64, pMeta, uid, taskId, reqId);
3718
  }
H
Haojun Liao 已提交
3719

H
Hongze Cheng 已提交
3720
  if (mr.me.type != TSDB_SUPER_TABLE) {
dengyihao's avatar
dengyihao 已提交
3721 3722 3723
    tsdbError("%p query normal tag not allowed, uid:%" PRIu64 ", TID:0x%" PRIx64 " QID:0x%" PRIx64, pMeta, uid, taskId,
              reqId);
    terrno = TSDB_CODE_OPS_NOT_SUPPORT;  // basically, this error is caused by invalid sql issued by client
3724
    goto _error;
H
hjxilinx 已提交
3725
  }
3726

H
Hongze Cheng 已提交
3727 3728
  metaReaderClear(&mr);

dengyihao's avatar
dengyihao 已提交
3729 3730
  // NOTE: not add ref count for super table
  SArray*         res = taosArrayInit(8, sizeof(STableKeyInfo));
H
Haojun Liao 已提交
3731
  SSchemaWrapper* pTagSchema = metaGetTableSchema(pMeta, uid, 0, true);
H
Haojun Liao 已提交
3732

weixin_48148422's avatar
weixin_48148422 已提交
3733 3734
  // no tags and tbname condition, all child tables of this stable are involved
  if (tbnameCond == NULL && (pTagCond == NULL || len == 0)) {
H
Haojun Liao 已提交
3735
    int32_t ret = getAllTableList(pMeta, uid, res);
3736 3737
    if (ret != TSDB_CODE_SUCCESS) {
      goto _error;
3738
    }
3739

dengyihao's avatar
dengyihao 已提交
3740 3741
    pGroupInfo->numOfTables = (uint32_t)taosArrayGetSize(res);
    pGroupInfo->pGroupList = createTableGroup(res, pTagSchema, pColIndex, numOfCols, skey);
H
Haojun Liao 已提交
3742

dengyihao's avatar
dengyihao 已提交
3743 3744 3745
    tsdbDebug("%p no table name/tag condition, all tables qualified, numOfTables:%u, group:%zu, TID:0x%" PRIx64
              " QID:0x%" PRIx64,
              pMeta, pGroupInfo->numOfTables, taosArrayGetSize(pGroupInfo->pGroupList), taskId, reqId);
3746

3747
    taosArrayDestroy(res);
3748 3749
    return ret;
  }
3750

H
hjxilinx 已提交
3751
  int32_t ret = TSDB_CODE_SUCCESS;
3752

dengyihao's avatar
dengyihao 已提交
3753 3754
  SFilterInfo* filterInfo = NULL;
  ret = filterInitFromNode((SNode*)pTagCond, &filterInfo, 0);
dengyihao's avatar
dengyihao 已提交
3755 3756 3757 3758 3759 3760 3761 3762
  if (ret != TSDB_CODE_SUCCESS) {
    terrno = ret;
    return ret;
  }
  ret = tsdbQueryTableList(pMeta, res, filterInfo);
  pGroupInfo->numOfTables = (uint32_t)taosArrayGetSize(res);
  pGroupInfo->pGroupList = createTableGroup(res, pTagSchema, pColIndex, numOfCols, skey);

dengyihao's avatar
dengyihao 已提交
3763 3764
  // tsdbDebug("%p stable tid:%d, uid:%" PRIu64 " query, numOfTables:%u, belong to %" PRIzu " groups", tsdb,
  //          pTable->tableId, pTable->uid, pGroupInfo->numOfTables, taosArrayGetSize(pGroupInfo->pGroupList));
dengyihao's avatar
dengyihao 已提交
3765 3766 3767 3768 3769

  taosArrayDestroy(res);
  return ret;

_error:
3770
  return terrno;
3771
}
3772

dengyihao's avatar
dengyihao 已提交
3773 3774 3775 3776 3777
int32_t tsdbQueryTableList(void* pMeta, SArray* pRes, void* filterInfo) {
  // impl later

  return TSDB_CODE_SUCCESS;
}
H
Haojun Liao 已提交
3778
int32_t tsdbGetOneTableGroup(void* pMeta, uint64_t uid, TSKEY startKey, STableGroupInfo* pGroupInfo) {
H
Hongze Cheng 已提交
3779 3780
  SMetaReader mr = {0};

H
Hongze Cheng 已提交
3781
  metaReaderInit(&mr, (SMeta*)pMeta, 0);
H
Hongze Cheng 已提交
3782 3783

  if (metaGetTableEntryByUid(&mr, uid) < 0) {
3784 3785
    terrno = TSDB_CODE_TDB_INVALID_TABLE_ID;
    goto _error;
3786
  }
3787

H
Hongze Cheng 已提交
3788 3789
  metaReaderClear(&mr);

3790 3791
  pGroupInfo->numOfTables = 1;
  pGroupInfo->pGroupList = taosArrayInit(1, POINTER_BYTES);
H
Haojun Liao 已提交
3792

H
Haojun Liao 已提交
3793 3794
  SArray* group = taosArrayInit(1, sizeof(STableKeyInfo));

3795
  STableKeyInfo info = {.lastKey = startKey, .uid = uid};
H
Haojun Liao 已提交
3796
  taosArrayPush(group, &info);
H
Haojun Liao 已提交
3797

3798
  taosArrayPush(pGroupInfo->pGroupList, &group);
3799
  return TSDB_CODE_SUCCESS;
3800

dengyihao's avatar
dengyihao 已提交
3801
_error:
H
Hongze Cheng 已提交
3802
  metaReaderClear(&mr);
3803
  return terrno;
3804
}
3805

3806
#if 0
3807
int32_t tsdbGetTableGroupFromIdListT(STsdb* tsdb, SArray* pTableIdList, STableGroupInfo* pGroupInfo) {
B
Bomin Zhang 已提交
3808 3809 3810
  if (tsdbRLockRepoMeta(tsdb) < 0) {
    return terrno;
  }
3811 3812 3813 3814

  assert(pTableIdList != NULL);
  size_t size = taosArrayGetSize(pTableIdList);
  pGroupInfo->pGroupList = taosArrayInit(1, POINTER_BYTES);
H
Haojun Liao 已提交
3815
  SArray* group = taosArrayInit(1, sizeof(STableKeyInfo));
3816

B
Bomin Zhang 已提交
3817
  for(int32_t i = 0; i < size; ++i) {
3818 3819 3820 3821 3822 3823 3824 3825 3826 3827 3828
    STableIdInfo *id = taosArrayGet(pTableIdList, i);

    STable* pTable = tsdbGetTableByUid(tsdbGetMeta(tsdb), id->uid);
    if (pTable == NULL) {
      tsdbWarn("table uid:%"PRIu64", tid:%d has been drop already", id->uid, id->tid);
      continue;
    }

    if (pTable->type == TSDB_SUPER_TABLE) {
      tsdbError("direct query on super tale is not allowed, table uid:%"PRIu64", tid:%d", id->uid, id->tid);
      terrno = TSDB_CODE_QRY_INVALID_MSG;
D
fix bug  
dapan1121 已提交
3829 3830 3831
      tsdbUnlockRepoMeta(tsdb);
      taosArrayDestroy(group);
      return terrno;
3832 3833
    }

H
Haojun Liao 已提交
3834 3835
    STableKeyInfo info = {.pTable = pTable, .lastKey = id->key};
    taosArrayPush(group, &info);
3836 3837
  }

B
Bomin Zhang 已提交
3838 3839 3840 3841
  if (tsdbUnlockRepoMeta(tsdb) < 0) {
    taosArrayDestroy(group);
    return terrno;
  }
3842

sangshuduo's avatar
sangshuduo 已提交
3843
  pGroupInfo->numOfTables = (uint32_t) taosArrayGetSize(group);
B
Bomin Zhang 已提交
3844 3845 3846 3847 3848
  if (pGroupInfo->numOfTables > 0) {
    taosArrayPush(pGroupInfo->pGroupList, &group);
  } else {
    taosArrayDestroy(group);
  }
3849 3850 3851

  return TSDB_CODE_SUCCESS;
}
3852
#endif
3853 3854 3855 3856 3857 3858 3859 3860
static void* doFreeColumnInfoData(SArray* pColumnInfoData) {
  if (pColumnInfoData == NULL) {
    return NULL;
  }

  size_t cols = taosArrayGetSize(pColumnInfoData);
  for (int32_t i = 0; i < cols; ++i) {
    SColumnInfoData* pColInfo = taosArrayGet(pColumnInfoData, i);
wafwerar's avatar
wafwerar 已提交
3861
    taosMemoryFreeClear(pColInfo->pData);
3862 3863 3864 3865 3866 3867
  }

  taosArrayDestroy(pColumnInfoData);
  return NULL;
}

H
Haojun Liao 已提交
3868 3869 3870 3871 3872 3873
static void* destroyTableCheckInfo(SArray* pTableCheckInfo) {
  size_t size = taosArrayGetSize(pTableCheckInfo);
  for (int32_t i = 0; i < size; ++i) {
    STableCheckInfo* p = taosArrayGet(pTableCheckInfo, i);
    destroyTableMemIterator(p);

wafwerar's avatar
wafwerar 已提交
3874
    taosMemoryFreeClear(p->pCompInfo);
H
Haojun Liao 已提交
3875 3876 3877 3878 3879 3880
  }

  taosArrayDestroy(pTableCheckInfo);
  return NULL;
}

H
Haojun Liao 已提交
3881
void tsdbCleanupReadHandle(tsdbReaderT queryHandle) {
3882 3883
  STsdbReadHandle* pTsdbReadHandle = (STsdbReadHandle*)queryHandle;
  if (pTsdbReadHandle == NULL) {
3884 3885
    return;
  }
3886

3887
  pTsdbReadHandle->pColumns = doFreeColumnInfoData(pTsdbReadHandle->pColumns);
3888

3889
  taosArrayDestroy(pTsdbReadHandle->suppInfo.defaultLoadColumn);
wafwerar's avatar
wafwerar 已提交
3890
  taosMemoryFreeClear(pTsdbReadHandle->pDataBlockInfo);
3891 3892
  taosMemoryFreeClear(pTsdbReadHandle->suppInfo.pstatis);
  taosMemoryFreeClear(pTsdbReadHandle->suppInfo.plist);
3893

3894
  if (!emptyQueryTimewindow(pTsdbReadHandle)) {
dengyihao's avatar
dengyihao 已提交
3895
    //    tsdbMayUnTakeMemSnapshot(pTsdbReadHandle);
3896
  } else {
3897
    assert(pTsdbReadHandle->pTableCheckInfo == NULL);
3898 3899
  }

3900 3901
  if (pTsdbReadHandle->pTableCheckInfo != NULL) {
    pTsdbReadHandle->pTableCheckInfo = destroyTableCheckInfo(pTsdbReadHandle->pTableCheckInfo);
3902
  }
3903

3904
  tsdbDestroyReadH(&pTsdbReadHandle->rhelper);
H
Haojun Liao 已提交
3905

3906 3907
  tdFreeDataCols(pTsdbReadHandle->pDataCols);
  pTsdbReadHandle->pDataCols = NULL;
H
Haojun Liao 已提交
3908

3909 3910
  pTsdbReadHandle->prev = doFreeColumnInfoData(pTsdbReadHandle->prev);
  pTsdbReadHandle->next = doFreeColumnInfoData(pTsdbReadHandle->next);
3911

3912
  SIOCostSummary* pCost = &pTsdbReadHandle->cost;
3913

dengyihao's avatar
dengyihao 已提交
3914 3915 3916 3917
  tsdbDebug("%p :io-cost summary: head-file read cnt:%" PRIu64 ", head-file time:%" PRIu64 " us, statis-info:%" PRId64
            " us, datablock:%" PRId64 " us, check data:%" PRId64 " us, %s",
            pTsdbReadHandle, pCost->headFileLoad, pCost->headFileLoadTime, pCost->statisInfoLoadTime,
            pCost->blockLoadTime, pCost->checkForNextTime, pTsdbReadHandle->idStr);
H
Haojun Liao 已提交
3918

wafwerar's avatar
wafwerar 已提交
3919
  taosMemoryFreeClear(pTsdbReadHandle);
3920
}
3921

3922
#if 0
H
Haojun Liao 已提交
3923
void tsdbDestroyTableGroup(STableGroupInfo *pGroupList) {
3924 3925 3926 3927 3928 3929 3930 3931 3932 3933
  assert(pGroupList != NULL);

  size_t numOfGroup = taosArrayGetSize(pGroupList->pGroupList);

  for(int32_t i = 0; i < numOfGroup; ++i) {
    SArray* p = taosArrayGetP(pGroupList->pGroupList, i);

    size_t numOfTables = taosArrayGetSize(p);
    for(int32_t j = 0; j < numOfTables; ++j) {
      STable* pTable = taosArrayGetP(p, j);
3934 3935 3936 3937
      if (pTable != NULL) { // in case of handling retrieve data from tsdb
        tsdbUnRefTable(pTable);
      }
      //assert(pTable != NULL);
3938 3939 3940 3941 3942
    }

    taosArrayDestroy(p);
  }

3943
  taosHashCleanup(pGroupList->map);
3944
  taosArrayDestroy(pGroupList->pGroupList);
H
Haojun Liao 已提交
3945
  pGroupList->numOfTables = 0;
3946
}
H
Haojun Liao 已提交
3947 3948 3949 3950 3951 3952 3953

static void applyFilterToSkipListNode(SSkipList *pSkipList, tExprNode *pExpr, SArray *pResult, SExprTraverseSupp *param) {
  SSkipListIterator* iter = tSkipListCreateIter(pSkipList);

  // Scan each node in the skiplist by using iterator
  while (tSkipListIterNext(iter)) {
    SSkipListNode *pNode = tSkipListIterGet(iter);
H
Haojun Liao 已提交
3954
    if (exprTreeApplyFilter(pExpr, pNode, param)) {
H
Haojun Liao 已提交
3955 3956 3957 3958 3959 3960 3961 3962 3963 3964 3965 3966 3967 3968 3969 3970 3971 3972 3973 3974 3975 3976 3977
      taosArrayPush(pResult, &(SL_GET_NODE_DATA(pNode)));
    }
  }

  tSkipListDestroyIter(iter);
}

typedef struct {
  char*    v;
  int32_t  optr;
} SEndPoint;

typedef struct {
  SEndPoint* start;
  SEndPoint* end;
} SQueryCond;

// todo check for malloc failure
static int32_t setQueryCond(tQueryInfo *queryColInfo, SQueryCond* pCond) {
  int32_t optr = queryColInfo->optr;

  if (optr == TSDB_RELATION_GREATER || optr == TSDB_RELATION_GREATER_EQUAL ||
      optr == TSDB_RELATION_EQUAL || optr == TSDB_RELATION_NOT_EQUAL) {
wafwerar's avatar
wafwerar 已提交
3978
    pCond->start       = taosMemoryCalloc(1, sizeof(SEndPoint));
H
Haojun Liao 已提交
3979
    pCond->start->optr = queryColInfo->optr;
Y
yihaoDeng 已提交
3980
    pCond->start->v    = queryColInfo->q;
H
Haojun Liao 已提交
3981
  } else if (optr == TSDB_RELATION_LESS || optr == TSDB_RELATION_LESS_EQUAL) {
wafwerar's avatar
wafwerar 已提交
3982
    pCond->end       = taosMemoryCalloc(1, sizeof(SEndPoint));
H
Haojun Liao 已提交
3983
    pCond->end->optr = queryColInfo->optr;
Y
yihaoDeng 已提交
3984 3985
    pCond->end->v    = queryColInfo->q;
  } else if (optr == TSDB_RELATION_IN) {
wafwerar's avatar
wafwerar 已提交
3986
    pCond->start       = taosMemoryCalloc(1, sizeof(SEndPoint));
Y
yihaoDeng 已提交
3987 3988 3989
    pCond->start->optr = queryColInfo->optr;
    pCond->start->v    = queryColInfo->q; 
  } else if (optr == TSDB_RELATION_LIKE) {
H
Haojun Liao 已提交
3990
    assert(0);
3991 3992
  } else if (optr == TSDB_RELATION_MATCH) {
    assert(0);
3993 3994
  } else if (optr == TSDB_RELATION_NMATCH) {
    assert(0);
H
Haojun Liao 已提交
3995 3996 3997 3998 3999 4000 4001 4002 4003 4004 4005 4006 4007 4008 4009 4010 4011 4012 4013 4014 4015 4016 4017 4018 4019 4020 4021 4022 4023 4024 4025 4026 4027 4028 4029 4030 4031 4032 4033 4034 4035 4036 4037 4038 4039 4040 4041 4042 4043 4044 4045 4046 4047 4048 4049 4050 4051 4052 4053 4054 4055 4056 4057 4058 4059 4060 4061 4062 4063 4064 4065 4066 4067 4068 4069 4070 4071 4072 4073 4074 4075 4076 4077
  }

  return TSDB_CODE_SUCCESS;
}

static void queryIndexedColumn(SSkipList* pSkipList, tQueryInfo* pQueryInfo, SArray* result) {
  SSkipListIterator* iter = NULL;

  SQueryCond cond = {0};
  if (setQueryCond(pQueryInfo, &cond) != TSDB_CODE_SUCCESS) {
    //todo handle error
  }

  if (cond.start != NULL) {
    iter = tSkipListCreateIterFromVal(pSkipList, (char*) cond.start->v, pSkipList->type, TSDB_ORDER_ASC);
  } else {
    iter = tSkipListCreateIterFromVal(pSkipList, (char*)(cond.end ? cond.end->v: NULL), pSkipList->type, TSDB_ORDER_DESC);
  }

  if (cond.start != NULL) {
    int32_t optr = cond.start->optr;

    if (optr == TSDB_RELATION_EQUAL) {   // equals
      while(tSkipListIterNext(iter)) {
        SSkipListNode* pNode = tSkipListIterGet(iter);

        int32_t ret = pQueryInfo->compare(SL_GET_NODE_KEY(pSkipList, pNode), cond.start->v);
        if (ret != 0) {
          break;
        }

        STableKeyInfo info = {.pTable = (void*)SL_GET_NODE_DATA(pNode), .lastKey = TSKEY_INITIAL_VAL};
        taosArrayPush(result, &info);
      }
    } else if (optr == TSDB_RELATION_GREATER || optr == TSDB_RELATION_GREATER_EQUAL) { // greater equal
      bool comp = true;
      int32_t ret = 0;

      while(tSkipListIterNext(iter)) {
        SSkipListNode* pNode = tSkipListIterGet(iter);

        if (comp) {
          ret = pQueryInfo->compare(SL_GET_NODE_KEY(pSkipList, pNode), cond.start->v);
          assert(ret >= 0);
        }

        if (ret == 0 && optr == TSDB_RELATION_GREATER) {
          continue;
        } else {
          STableKeyInfo info = {.pTable = (void*)SL_GET_NODE_DATA(pNode), .lastKey = TSKEY_INITIAL_VAL};
          taosArrayPush(result, &info);
          comp = false;
        }
      }
    } else if (optr == TSDB_RELATION_NOT_EQUAL) {   // not equal
      bool comp = true;

      while(tSkipListIterNext(iter)) {
        SSkipListNode* pNode = tSkipListIterGet(iter);
        comp = comp && (pQueryInfo->compare(SL_GET_NODE_KEY(pSkipList, pNode), cond.start->v) == 0);
        if (comp) {
          continue;
        }

        STableKeyInfo info = {.pTable = (void*)SL_GET_NODE_DATA(pNode), .lastKey = TSKEY_INITIAL_VAL};
        taosArrayPush(result, &info);
      }

      tSkipListDestroyIter(iter);

      comp = true;
      iter = tSkipListCreateIterFromVal(pSkipList, (char*) cond.start->v, pSkipList->type, TSDB_ORDER_DESC);
      while(tSkipListIterNext(iter)) {
        SSkipListNode* pNode = tSkipListIterGet(iter);
        comp = comp && (pQueryInfo->compare(SL_GET_NODE_KEY(pSkipList, pNode), cond.start->v) == 0);
        if (comp) {
          continue;
        }

        STableKeyInfo info = {.pTable = (void*)SL_GET_NODE_DATA(pNode), .lastKey = TSKEY_INITIAL_VAL};
        taosArrayPush(result, &info);
      }

Y
yihaoDeng 已提交
4078 4079 4080 4081 4082 4083 4084 4085 4086 4087 4088 4089 4090
    } else if (optr == TSDB_RELATION_IN) {
      while(tSkipListIterNext(iter)) {
        SSkipListNode* pNode = tSkipListIterGet(iter);

        int32_t ret = pQueryInfo->compare(SL_GET_NODE_KEY(pSkipList, pNode), cond.start->v);
        if (ret != 0) {
          break;
        }

        STableKeyInfo info = {.pTable = (void*)SL_GET_NODE_DATA(pNode), .lastKey = TSKEY_INITIAL_VAL};
        taosArrayPush(result, &info);
      }
      
H
Haojun Liao 已提交
4091 4092 4093 4094 4095 4096 4097 4098 4099 4100 4101 4102 4103 4104 4105 4106 4107 4108 4109 4110 4111 4112 4113 4114 4115 4116 4117 4118 4119 4120 4121 4122 4123 4124 4125 4126 4127 4128 4129 4130 4131
    } else {
      assert(0);
    }
  } else {
    int32_t optr = cond.end ? cond.end->optr : TSDB_RELATION_INVALID;
    if (optr == TSDB_RELATION_LESS || optr == TSDB_RELATION_LESS_EQUAL) {
      bool    comp = true;
      int32_t ret = 0;

      while (tSkipListIterNext(iter)) {
        SSkipListNode *pNode = tSkipListIterGet(iter);

        if (comp) {
          ret = pQueryInfo->compare(SL_GET_NODE_KEY(pSkipList, pNode), cond.end->v);
          assert(ret <= 0);
        }

        if (ret == 0 && optr == TSDB_RELATION_LESS) {
          continue;
        } else {
          STableKeyInfo info = {.pTable = (void *)SL_GET_NODE_DATA(pNode), .lastKey = TSKEY_INITIAL_VAL};
          taosArrayPush(result, &info);
          comp = false;  // no need to compare anymore
        }
      }
    } else {
      assert(pQueryInfo->optr == TSDB_RELATION_ISNULL || pQueryInfo->optr == TSDB_RELATION_NOTNULL);

      while (tSkipListIterNext(iter)) {
        SSkipListNode *pNode = tSkipListIterGet(iter);

        bool isnull = isNull(SL_GET_NODE_KEY(pSkipList, pNode), pQueryInfo->sch.type);
        if ((pQueryInfo->optr == TSDB_RELATION_ISNULL && isnull) ||
            (pQueryInfo->optr == TSDB_RELATION_NOTNULL && (!isnull))) {
          STableKeyInfo info = {.pTable = (void *)SL_GET_NODE_DATA(pNode), .lastKey = TSKEY_INITIAL_VAL};
          taosArrayPush(result, &info);
        }
      }
    }
  }

wafwerar's avatar
wafwerar 已提交
4132 4133
  taosMemoryFree(cond.start);
  taosMemoryFree(cond.end);
H
Haojun Liao 已提交
4134 4135 4136 4137 4138 4139 4140 4141 4142 4143 4144 4145 4146 4147 4148 4149 4150 4151
  tSkipListDestroyIter(iter);
}

static void queryIndexlessColumn(SSkipList* pSkipList, tQueryInfo* pQueryInfo, SArray* res, __result_filter_fn_t filterFp) {
  SSkipListIterator* iter = tSkipListCreateIter(pSkipList);

  while (tSkipListIterNext(iter)) {
    bool addToResult = false;

    SSkipListNode *pNode = tSkipListIterGet(iter);

    char *pData = SL_GET_NODE_DATA(pNode);
    tstr *name = (tstr*) tsdbGetTableName((void*) pData);

    // todo speed up by using hash
    if (pQueryInfo->sch.colId == TSDB_TBNAME_COLUMN_INDEX) {
      if (pQueryInfo->optr == TSDB_RELATION_IN) {
        addToResult = pQueryInfo->compare(name, pQueryInfo->q);
4152 4153 4154
      } else if (pQueryInfo->optr == TSDB_RELATION_LIKE ||
                 pQueryInfo->optr == TSDB_RELATION_MATCH ||
                 pQueryInfo->optr == TSDB_RELATION_NMATCH) {
H
Haojun Liao 已提交
4155 4156 4157 4158 4159 4160 4161 4162 4163 4164 4165 4166 4167 4168 4169 4170
        addToResult = !pQueryInfo->compare(name, pQueryInfo->q);
      }
    } else {
      addToResult = filterFp(pNode, pQueryInfo);
    }

    if (addToResult) {
      STableKeyInfo info = {.pTable = (void*)pData, .lastKey = TSKEY_INITIAL_VAL};
      taosArrayPush(res, &info);
    }
  }

  tSkipListDestroyIter(iter);
}

// Apply the filter expression to each node in the skiplist to acquire the qualified nodes in skip list
dengyihao's avatar
dengyihao 已提交
4171 4172 4173 4174 4175 4176 4177 4178 4179 4180 4181 4182 4183 4184 4185 4186 4187 4188 4189 4190 4191 4192 4193 4194 4195 4196 4197 4198 4199 4200 4201 4202 4203
//void getTableListfromSkipList(tExprNode *pExpr, SSkipList *pSkipList, SArray *result, SExprTraverseSupp *param) {
//  if (pExpr == NULL) {
//    return;
//  }
//
//  tExprNode *pLeft  = pExpr->_node.pLeft;
//  tExprNode *pRight = pExpr->_node.pRight;
//
//  // column project
//  if (pLeft->nodeType != TSQL_NODE_EXPR && pRight->nodeType != TSQL_NODE_EXPR) {
//    assert(pLeft->nodeType == TSQL_NODE_COL && (pRight->nodeType == TSQL_NODE_VALUE || pRight->nodeType == TSQL_NODE_DUMMY));
//
//    param->setupInfoFn(pExpr, param->pExtInfo);
//
//    tQueryInfo *pQueryInfo = pExpr->_node.info;
//    if (pQueryInfo->indexed && (pQueryInfo->optr != TSDB_RELATION_LIKE
//                                && pQueryInfo->optr != TSDB_RELATION_MATCH && pQueryInfo->optr != TSDB_RELATION_NMATCH
//                                && pQueryInfo->optr != TSDB_RELATION_IN)) {
//      queryIndexedColumn(pSkipList, pQueryInfo, result);
//    } else {
//      queryIndexlessColumn(pSkipList, pQueryInfo, result, param->nodeFilterFn);
//    }
//
//    return;
//  }
//
//  // The value of hasPK is always 0.
//  uint8_t weight = pLeft->_node.hasPK + pRight->_node.hasPK;
//  assert(weight == 0 && pSkipList != NULL && taosArrayGetSize(result) == 0);
//
//  //apply the hierarchical filter expression to every node in skiplist to find the qualified nodes
//  applyFilterToSkipListNode(pSkipList, pExpr, result, param);
//}
L
Liu Jicong 已提交
4204
#endif